diff --git a/hbs2-storage-ncq/hbs2-storage-ncq.cabal b/hbs2-storage-ncq/hbs2-storage-ncq.cabal index 1afe66d8..ff70f760 100644 --- a/hbs2-storage-ncq/hbs2-storage-ncq.cabal +++ b/hbs2-storage-ncq/hbs2-storage-ncq.cabal @@ -76,7 +76,6 @@ library HBS2.Storage.NCQ3.Internal.Flags HBS2.Storage.NCQ3.Internal.Fsync HBS2.Storage.NCQ3.Internal.CLI - HBS2.Storage.NCQ HBS2.Storage.NCQ.Types -- other-modules: -- other-extensions: diff --git a/hbs2-tests/hbs2-tests.cabal b/hbs2-tests/hbs2-tests.cabal index 32644567..0a6a1748 100644 --- a/hbs2-tests/hbs2-tests.cabal +++ b/hbs2-tests/hbs2-tests.cabal @@ -1183,26 +1183,6 @@ executable test-scripts , zstd -executable tcq - import: shared-properties - import: common-deps - default-language: Haskell2010 - ghc-options: - hs-source-dirs: test - main-is: TCQ.hs - build-depends: - base, hbs2-core, hbs2-log-structured, hbs2-storage-ncq - , network - , string-conversions - , db-pipe - , suckless-conf - , network-byte-order - , text - , time - , mmap - , zstd - , unix - executable test-ncq import: shared-properties diff --git a/hbs2-tests/test/TCQ.hs b/hbs2-tests/test/TCQ.hs deleted file mode 100644 index 212ca50a..00000000 --- a/hbs2-tests/test/TCQ.hs +++ /dev/null @@ -1,786 +0,0 @@ -{-# Language AllowAmbiguousTypes #-} -{-# Language UndecidableInstances #-} -{-# Language MultiWayIf #-} -{-# Language RecordWildCards #-} -{-# Language ViewPatterns #-} -{-# OPTIONS_GHC -Wno-orphans #-} -module Main where - -import HBS2.Prelude.Plated -import HBS2.OrDie -import HBS2.Hash -import HBS2.Data.Types.Refs -import HBS2.Clock -import HBS2.Merkle - -import HBS2.Storage -import HBS2.Storage.Simple -import HBS2.Storage.Operations.ByteString -import HBS2.Storage.Operations.Delete -import HBS2.Net.Auth.Credentials -import HBS2.Peer.Proto.RefLog -import HBS2.Peer.Proto.LWWRef -import HBS2.Data.Types.SignedBox - -import HBS2.System.Logger.Simple.ANSI - -import HBS2.Storage.NCQ -import HBS2.Data.Log.Structured.SD -import HBS2.Data.Log.Structured.NCQ - -import HBS2.CLI.Run.Internal.Merkle - -import Data.Config.Suckless.Syntax -import Data.Config.Suckless.Script as SC -import Data.Config.Suckless.System as SF -import Data.Config.Suckless.Script.File as SF - -import DBPipe.SQLite hiding (field) - -import System.Random.MWC as MWC - -import System.IO.Temp as Temp -import Data.Bits -import Data.ByteString (ByteString) -import Data.ByteString qualified as BS -import Data.ByteString.Lazy qualified as LBS -import Data.Text.Encoding qualified as TE -import Data.ByteString.Char8 qualified as BS8 -import Data.ByteString.Builder -import Data.Maybe -import Data.Word -import Data.List qualified as List -import Data.Vector qualified as V -import Data.Vector ((!)) -import Control.Monad.Trans.Cont -import Control.Monad.Trans.Maybe -import Control.Monad.Except (runExceptT) -import Network.ByteOrder qualified as N -import Data.Coerce -import Data.HashPSQ qualified as HPSQ -import Data.HashSet qualified as HS -import Data.HashMap.Strict (HashMap) -import Data.HashMap.Strict qualified as HM -import Data.IntMap qualified as IntMap -import Data.IntMap (IntMap) -import Data.Fixed -import System.Environment -import System.Directory -import System.Posix.Fcntl -import System.Posix.IO -import System.IO.MMap -import System.IO qualified as IO -import System.Exit (exitSuccess, exitFailure) -import System.Random -import System.Random.Stateful -import Safe -import Lens.Micro.Platform -import Control.Concurrent.STM qualified as STM -import Data.Hashable - -import UnliftIO -import UnliftIO.Async - -import Text.InterpolatedString.Perl6 (qc) - -import Streaming.Prelude qualified as S -import System.TimeIt - -import System.IO.Unsafe (unsafePerformIO) - -{- HLINT ignore "Functor law" -} - -setupLogger :: MonadIO m => m () -setupLogger = do - -- setLogging @DEBUG $ toStderr . logPrefix "[debug] " - setLogging @ERROR $ toStderr . logPrefix "[error] " - setLogging @WARN $ toStderr . logPrefix "[warn] " - setLogging @NOTICE $ toStdout . logPrefix "" - -flushLoggers :: MonadIO m => m () -flushLoggers = do - silence - -silence :: MonadIO m => m () -silence = do - setLoggingOff @DEBUG - setLoggingOff @ERROR - setLoggingOff @WARN - setLoggingOff @NOTICE - setLoggingOff @TRACE - - -data TCQError = - TCQAlreadyOpen FilePath - | TCQGone FilePath - deriving stock (Show,Typeable) - -instance Exception TCQError - -newtype TCQ = - TCQ FilePath - deriving newtype (Eq,Ord,Show,Typeable) - - -main :: IO () -main = do - - instances <- newTVarIO (mempty :: HashMap FilePath (NCQStorage, Async ())) - - tvd <- newTVarIO mempty - - let runScript dict argz what = liftIO do - script <- either (error.show) pure $ parseTop what - runM dict do - bindCliArgs argz - void $ evalTop script - - let finalizeStorages = do - debug "finalize ncq" - r <- readTVarIO instances <&> HM.toList - mapM_ ncqStorageStop (fmap (fst.snd) r) - mapM_ wait (fmap (snd.snd) r) - - let getNCQ (TCQ p) = do - readTVarIO instances - <&> HM.lookup p - <&> fmap fst - >>= orThrow (TCQGone p) - - let getTCQ (TCQ p) = do - readTVarIO instances - <&> HM.lookup p - >>= orThrow (TCQGone p) - - let dict = makeDict @C do - - entry $ bindMatch "--help" $ nil_ \case - HelpEntryBound what -> helpEntry what - [StringLike s] -> helpList True (Just s) - _ -> helpList True Nothing - - - hidden $ internalEntries - - SF.entries - - entry $ bindMatch "#!" $ nil_ $ const none - - entry $ bindMatch "stdin" $ nil_ $ \case - argz -> do - liftIO getContents >>= runScript dict argz - - entry $ bindMatch "file" $ nil_ $ \case - ( StringLike fn : argz ) -> do - liftIO (readFile fn) >>= runScript dict argz - - e -> error (show $ pretty $ mkList e) - - - entry $ bindMatch "debug" $ nil_ \case - - [ LitBoolVal False ] -> do - setLoggingOff @DEBUG - - [ StringLike "off" ] -> do - setLoggingOff @DEBUG - - _ -> - setLogging @DEBUG $ toStderr . logPrefix "[debug] " - - entry $ bindMatch "ncq:open" $ \case - [ StringLike path ] -> do - debug $ "ncq:open" <+> pretty path - ncq <- ncqStorageOpen path - r <- async (ncqStorageRun ncq) - - e <- atomically do - already <- readTVar instances <&> HM.member path - if already then - pure $ Left $ TCQAlreadyOpen path - else do - modifyTVar instances (HM.insert path (ncq,r)) - pure $ Right () - - either throwIO pure e - - mkOpaque (TCQ path) - - e -> throwIO $ BadFormException @C (mkList e) - - - entry $ bindMatch "ncq:poke" $ \case - [ isOpaqueOf @TCQ -> Just tcq ] -> lift do - ncq <- getNCQ tcq - pure $ mkSym "okay" - - e -> throwIO $ BadFormException @C (mkList e) - - entry $ bindMatch "ncq:fossilize" $ nil_ \case - [ isOpaqueOf @TCQ -> Just tcq ] -> lift do - ncq <- getNCQ tcq - ncqIndexRightNow ncq - - e -> throwIO $ BadFormException @C (mkList e) - - entry $ bindMatch "ncq:merge:step" $ \syn -> lift do - - tcq <- case syn of - [ isOpaqueOf @TCQ -> Just tcq ] -> do - pure tcq - - e -> throwIO $ BadFormException @C (mkList e) - - ncq <- getNCQ tcq - ncqStorageMergeStep ncq - - pure nil - - entry $ bindMatch "ncq:compact" $ \syn -> lift do - - tcq <- case syn of - [ isOpaqueOf @TCQ -> Just tcq ] -> do - pure tcq - - e -> throwIO $ BadFormException @C (mkList e) - - ncq <- getNCQ tcq - ncqCompact ncq - - pure nil - - entry $ bindMatch "ncq:compact:scan" $ \syn -> lift do - - tcq <- case syn of - [ isOpaqueOf @TCQ -> Just tcq ] -> do - pure tcq - - e -> throwIO $ BadFormException @C (mkList e) - - ncq <- getNCQ tcq - r <- ncqLinearScanForCompact ncq (\_ _ -> none) - - pure $ mkInt r - - entry $ bindMatch "ncq:merge" $ \syn -> lift do - - tcq <- case syn of - [ isOpaqueOf @TCQ -> Just tcq ] -> do - pure tcq - - e -> throwIO $ BadFormException @C (mkList e) - - ncq <- getNCQ tcq - ncqStorageMerge ncq - - pure nil - - entry $ bindMatch "ncq:close" $ nil_ \case - [ isOpaqueOf @TCQ -> Just tcq ] -> lift do - ncq <- getNCQ tcq - ncqStorageStop ncq - - void $ runMaybeT do - (s,r) <- readTVarIO instances - <&> HM.lookup (coerce tcq) - >>= toMPlus - - wait r - atomically $ modifyTVar instances (HM.delete (coerce tcq)) - - e -> throwIO $ BadFormException @C (mkList e) - - - entry $ bindMatch "ncq:fsck" $ nil_ \case - [ StringLike fpath ] -> lift do - issues <- ncqFsck fpath - - for_ issues $ \i -> do - err $ viaShow i - - unless (List.null issues) exitFailure - - e -> throwIO $ BadFormException @C (mkList e) - - entry $ bindMatch "ncq:cached:entries" $ \case - [ isOpaqueOf @TCQ -> Just tcq ] -> lift do - NCQStorage{..} <- getNCQ tcq - readTVarIO ncqCachedEntries <&> mkInt - - e -> throwIO $ BadFormException @C (mkList e) - - entry $ bindMatch "ncq:locate" $ \case - [ isOpaqueOf @TCQ -> Just tcq, HashLike hash ] -> lift do - ncq <- getNCQ tcq - ncqLocate ncq hash >>= \case - Just x -> do - parseSyntax (show $ pretty x) & either (error.show) pure - - _ -> pure nil - - e -> throwIO $ BadFormException @C (mkList e) - - entry $ bindMatch "ncq:has" $ \case - [ isOpaqueOf @TCQ -> Just tcq, HashLike hash ] -> lift do - ncq <- getNCQ tcq - ncqStorageHasBlock ncq hash <&> maybe nil mkInt - - e -> throwIO $ BadFormException @C (mkList e) - - - entry $ bindMatch "ncq:del" $ nil_ \case - [ isOpaqueOf @TCQ -> Just tcq, HashLike hash ] -> lift do - ncq <- getNCQ tcq - ncqStorageDel ncq hash - - e -> throwIO $ BadFormException @C (mkList e) - - entry $ bindMatch "ncq:flush" $ nil_ \case - [ isOpaqueOf @TCQ -> Just tcq ] -> lift do - ncq <- getNCQ tcq - ncqStorageFlush ncq - - e -> throwIO $ BadFormException @C (mkList e) - - entry $ bindMatch "ncq:stop" $ nil_ \case - [ isOpaqueOf @TCQ -> Just tcq ] -> lift do - (ncq, w) <- getTCQ tcq - ncqStorageStop ncq - debug "wait storage to stop" - wait w - - e -> throwIO $ BadFormException @C (mkList e) - - entry $ bindMatch "ncq:set:ref" $ \case - [ isOpaqueOf @TCQ -> Just tcq, HashLike ref , HashLike val ] -> lift do - ncq <- getNCQ tcq - ncqStorageSetRef ncq ref val - pure nil - - e -> throwIO $ BadFormException @C (mkList e) - - entry $ bindMatch "ncq:del:ref" $ \case - [ isOpaqueOf @TCQ -> Just tcq , HashLike ref ] -> lift do - ncq <- getNCQ tcq - ncqStorageDelRef ncq ref - pure nil - - e -> throwIO $ BadFormException @C (mkList e) - - entry $ bindMatch "ncq:get:ref" $ \case - [ isOpaqueOf @TCQ -> Just tcq, HashLike w ] -> lift do - ncq <- getNCQ tcq - ref <- ncqStorageGetRef ncq w - debug $ "ref" <+> pretty w <+> pretty ref - pure $ maybe nil (mkSym . show . pretty) ref - - e -> throwIO $ BadFormException @C (mkList e) - - entry $ bindMatch "ncq:get:reflog" $ \case - [ isOpaqueOf @TCQ -> Just tcq, SignPubKeyLike reflog ] -> lift do - ncq <- getNCQ tcq - let sto = AnyStorage ncq - let ha = hashObject @HbSync (RefLogKey @HBS2Basic reflog) - debug $ "refhash" <+> pretty ha - ref <- getRef sto (RefLogKey @HBS2Basic reflog) - pure $ maybe nil (mkSym . show . pretty) ref - - e -> throwIO $ BadFormException @C (mkList e) - - entry $ bindMatch "ncq:get:lwwref" $ \case - [ isOpaqueOf @TCQ -> Just tcq, SignPubKeyLike lww ] -> lift do - ncq <- getNCQ tcq - let sto = AnyStorage ncq - val <- runMaybeT do - rv <- getRef sto (LWWRefKey @HBS2Basic lww) >>= toMPlus - getBlock sto rv >>= toMPlus - <&> unboxSignedBox @(LWWRef 'HBS2Basic) @HBS2Basic - >>= toMPlus - <&> snd - - pure $ maybe nil (mkSym . show . pretty) val - - e -> throwIO $ BadFormException @C (mkList e) - - entry $ bindMatch "ncq:refhash" $ \case - [ isOpaqueOf @TCQ -> Just tcq, HashLike w ] -> lift do - ncq <- getNCQ tcq - let rf = ncqRefHash ncq w - pure $ mkSym ( show $ pretty $ rf ) - - e -> throwIO $ BadFormException @C (mkList e) - - entry $ bindMatch "ncq:hash" $ \case - [ isOpaqueOf @ByteString -> Just bs ] -> lift do - pure $ mkSym ( show $ pretty $ hashObject @HbSync bs ) - - [ StringLike s ] -> lift do - pure $ mkSym ( show $ pretty $ hashObject @HbSync (BS8.pack s) ) - - e -> pure nil - - entry $ bindMatch "ncq:get" $ \case - [ isOpaqueOf @TCQ -> Just tcq, HashLike hash ] -> lift do - ncq <- getNCQ tcq - ncqStorageGetBlock ncq hash >>= maybe (pure nil) mkOpaque - - e -> throwIO $ BadFormException @C (mkList e) - - entry $ bindMatch "ncq:put" $ \syn -> do - (tcq,bs) <- case syn of - [ isOpaqueOf @TCQ -> Just tcq, isOpaqueOf @ByteString -> Just bs ] -> lift do - pure (tcq, LBS.fromStrict bs) - - [ isOpaqueOf @TCQ -> Just tcq, TextLike s ] -> lift do - pure (tcq, LBS.fromStrict (TE.encodeUtf8 s)) - - e -> throwIO $ BadFormException @C (mkList e) - - lift do - ncq <- getNCQ tcq - r <- ncqStoragePutBlock ncq bs - pure $ maybe nil (mkSym . show . pretty) r - - entry $ bindMatch "ncq:merkle:hashes" $ \case - [ isOpaqueOf @TCQ -> Just tcq, HashLike h ] -> lift do - ncq <- getNCQ tcq - liftIO do - let sto = AnyStorage ncq - mkList <$> S.toList_ do - walkMerkle (coerce h) (getBlock sto) $ \case - Left{} -> throwIO MissedBlockError - Right (hrr :: [HashRef]) -> do - forM_ hrr $ \hx -> do - S.yield (mkSym $ show $ pretty hx) - - e -> throwIO $ BadFormException @C (mkList e) - - - entry $ bindMatch "sqlite:nwrite" $ nil_ \case - [ LitIntVal tn', LitIntVal n ] -> lift do - - let tn = fromIntegral tn' - let num = fromIntegral n - - - g <- liftIO MWC.createSystemRandom - - for_ [1..tn] $ \tnn -> flip runContT pure do - - let fnv = num `quot` tnn - - mkdir "temp" - - let tmp = "temp" - - -- tmp <- ContT $ Temp.withTempDirectory "temp" "nwrite" - - dbf <- liftIO $ Temp.emptyTempFile tmp "nwrite-.db" - - db <- newDBPipeEnv dbPipeOptsDef dbf - - pipe <- ContT $ withAsync (runPipe db) - - tw <- newTVarIO 0 - - withDB db do - ddl "create table if not exists block (hash blob not null primary key, value blob)" - commitAll - - withDB db do - ddl [qc| - pragma journal_mode=WAL; - pragma synchronous=normal; - |] - - - t0 <- getTimeCoarse - - ss <- replicateM num $ liftIO $ MWC.uniformRM (64*1024, 256*1024) g - - liftIO $ pooledForConcurrentlyN_ tnn ss $ \size -> do - lbs <- uniformByteStringM size g <&> LBS.fromStrict - - let ha = hashObject @HbSync lbs - - let sql = [qc|insert into block (hash, value) values(?,?) on conflict (hash) do nothing |] - - withDB db do - insert sql (coerce @_ @ByteString ha, lbs) - atomically $ modifyTVar tw (+ (32 + size)) - - withDB db do - commitAll - - w <- readTVarIO tw - t1 <- getTimeCoarse - - let t = realToFrac (toNanoSecs (t1 - t0)) / 1e9 - let tsec = realToFrac @_ @(Fixed E2) t - - let total = realToFrac w - - let speed = if t > 0 then total / t else 0 - let totMegs = realToFrac @_ @(Fixed E2) $ total / (1024**2) - let speedMbs = realToFrac @_ @(Fixed E2) $ speed / (1024**2) - - notice $ pretty tnn <+> pretty (tsec) <+> pretty totMegs <+> pretty speedMbs - - none - - e -> throwIO $ BadFormException @C (mkList e) - - - entry $ bindMatch "sqlite:merkle:write" $ nil_ \case - [ StringLike dbf, StringLike fname ] -> lift do - db <- newDBPipeEnv dbPipeOptsDef dbf - - - withDB db do - ddl "create table if not exists block (hash blob not null primary key, value blob)" - commitAll - - withDB db do - ddl [qc| -pragma journal_mode=WAL; -pragma synchronous=normal; - |] - - flip runContT pure do - pipe <- ContT $ withAsync (runPipe db) - - lbs <- liftIO $ LBS.readFile fname - - chu <- S.toList_ (readChunkedBS lbs (256*1024)) - - let sql = [qc|insert into block (hash, value) values(?,?) on conflict (hash) do nothing |] - - withDB db do - hashes <- for chu $ \chunk -> do - let ha = hashObject @HbSync chunk - insert sql (coerce @_ @ByteString ha, chunk) - pure ha - - let pt = toPTree (MaxSize 1024) (MaxNum 256) hashes - - m <- makeMerkle 0 pt $ \(ha,_,bss) -> do - insert sql (coerce @_ @ByteString ha, bss) - - withDB db do - commitAll - - pure $ mkSym @C (show $ pretty m) - - e -> throwIO $ BadFormException @C (mkList e) - - entry $ bindMatch "ncq:merkle:write" $ \syn -> do - (tcq,fname) <- case syn of - [ isOpaqueOf @TCQ -> Just tcq, StringLike f ] -> lift do - pure (tcq, f) - - e -> throwIO $ BadFormException @C (mkList e) - - lift do - ncq <- getNCQ tcq - - lbs <- liftIO $ LBS.readFile fname - - chu <- S.toList_ (readChunkedBS lbs (256*1024)) - hashes <- forConcurrently chu $ \chunk -> do - ncqStoragePutBlock ncq chunk >>= orThrowUser "can't save" - - -- FIXME: handle-hardcode - let pt = toPTree (MaxSize 1024) (MaxNum 256) hashes -- FIXME: settings - - m <- makeMerkle 0 pt $ \(_,_,bss) -> liftIO do - void $ ncqStoragePutBlock ncq bss >>= orThrowUser "can't save" - - pure $ mkSym (show $ pretty m) - - - entry $ bindMatch "ncq:merkle:del" $ nil_ \syn -> do - (sto,root) <- case syn of - [ isOpaqueOf @TCQ -> Just tcq, HashLike root ] -> lift do - - ncq <- AnyStorage <$> getNCQ tcq - pure (ncq, root) - - e -> throwIO $ BadFormException @C (mkList e) - - lift do - deleteMerkleTree sto root - - - entry $ bindMatch "ncq:merkle:read:stdout" $ nil_ \syn -> do - (tcq,h) <- case syn of - [ isOpaqueOf @TCQ -> Just tcq, HashLike f ] -> lift do - pure (tcq, f) - - e -> throwIO $ BadFormException @C (mkList e) - - lift do - ncq <- getNCQ tcq - - lbs <- runExceptT (getTreeContents (AnyStorage ncq) h) - >>= orThrowPassIO - - LBS.putStr lbs - - - entry $ bindMatch "ncq:sd:scan:test" $ \case - [StringLike fn] -> liftIO do - - isDir <- SF.doesDirectoryExist fn - - files <- if not isDir then - pure [fn] - else do - S.toList_ do - glob ["**/*.data"] [] fn $ \f -> do - S.yield f - pure True - - - ttombs <- newTVarIO 0 - rrefs <- newTVarIO 0 - - for_ files $ \fp -> do - - mmaped <- liftIO $ mmapFileByteString fp Nothing - - runConsumeBS mmaped do - readSections $ \size bs -> do - let ssz = LBS.length bs - let (_, rest1) = LBS.splitAt 32 bs - let (prefix, _) = LBS.splitAt ncqPrefixLen rest1 & over _1 LBS.toStrict - - if | prefix == ncqTombPrefix -> do - atomically $ modifyTVar ttombs succ - | prefix == ncqRefPrefix -> do - atomically $ modifyTVar rrefs succ - | otherwise -> none - - r <- readTVarIO rrefs - t <- readTVarIO ttombs - - pure $ mkList [mkInt t, mkInt r] - - e -> throwIO $ BadFormException @C (mkList e) - - entry $ bindMatch "ncq:scan:for-compact" $ nil_ \syn -> do - ncq@NCQStorage{..} <- case syn of - [ isOpaqueOf @TCQ -> Just tcq ] -> lift $ getNCQ tcq - - e -> throwIO $ BadFormException @C (mkList e) - - ncqLinearScanForCompact ncq $ \fk h -> do - notice $ "TO DELETE" <+> pretty fk <+> pretty h - - entry $ bindMatch "ncq:nway:scan:cq:test" $ \case - [StringLike fn] -> liftIO do - - isDir <- SF.doesDirectoryExist fn - - files <- if not isDir then - pure [fn] - else do - S.toList_ do - glob ["**/*.cq"] [] fn $ \f -> do - S.yield f - pure True - - counters <- newTVarIO mempty - - for_ files $ \f -> do - - (mmaped,meta@NWayHash{..}) <- nwayHashMMapReadOnly f >>= orThrow (NWayHashInvalidMetaData f) - - let emptyKey = BS.replicate nwayKeySize 0 - nwayHashScanAll meta mmaped $ \o k v -> do - unless (k == emptyKey) do - atomically do - let k1 = hash k `mod` (2 ^ 17) - modifyTVar counters (IntMap.insertWith (+) k1 1) - - r <- readTVarIO counters <&> IntMap.size - pure $ mkInt r - - e -> throwIO $ BadFormException @C (mkList e) - - - entry $ bindMatch "ncq:nway:stats" $ \case - [StringLike fn] -> liftIO do - - mt_ <- newTVarIO 0 - total_ <- newTVarIO 0 - - (mmaped,meta@NWayHash{..}) <- nwayHashMMapReadOnly fn >>= orThrow (NWayHashInvalidMetaData fn) - - let emptyKey = BS.replicate nwayKeySize 0 - nwayHashScanAll meta mmaped $ \o k v -> do - atomically do - modifyTVar total_ succ - when (k == emptyKey) do - modifyTVar mt_ succ - - mt <- readTVarIO mt_ - total <- readTVarIO total_ - let used = total - mt - - let ratio = realToFrac @_ @(Fixed E3) (realToFrac used / realToFrac total) - - let stats = mkForm @C "stats" [ mkForm "empty" [mkInt mt] - , mkForm "used" [mkInt used] - , mkForm "total" [mkInt total] - , mkForm "ratio" [mkDouble ratio] - ] - - pure $ mkList [mkForm "metadata" [mkSyntax meta], stats] - - e -> throwIO $ BadFormException @C (mkList e) - - - setupLogger - - argz <- liftIO getArgs - - forms <- parseTop (unlines $ unwords <$> splitForms argz) - & either (error.show) pure - - atomically $ writeTVar tvd dict - - flip runContT pure do - - ContT $ bracket none $ const do - finalizeStorages - flushLoggers - - lift do - case forms of - - ( cmd@(ListVal [StringLike "file", StringLike fn]) : _ ) -> do - void $ run dict [cmd] - - ( cmd@(ListVal [StringLike "stdin"]) : _ ) -> do - void $ run dict [cmd] - - ( cmd@(ListVal [StringLike "--help"]) : _ ) -> do - void $ run dict [cmd] - - [] -> do - eof <- liftIO IO.isEOF - if eof then - void $ run dict [mkForm "help" []] - else do - what <- liftIO getContents - >>= either (error.show) pure . parseTop - - run dict what >>= eatNil display - - e -> void $ run dict e - - -- (runEval tvd forms >>= eatNil display) - -- `finally` (finalizeStorages >> flushLoggers) - - - diff --git a/hbs2-tests/test/TestNCQ.hs b/hbs2-tests/test/TestNCQ.hs index 425f244b..c07338d8 100644 --- a/hbs2-tests/test/TestNCQ.hs +++ b/hbs2-tests/test/TestNCQ.hs @@ -1,560 +1,19 @@ {-# Language AllowAmbiguousTypes #-} {-# Language UndecidableInstances #-} -{-# Language MultiWayIf #-} -{-# Language RecordWildCards #-} -{-# Language ViewPatterns #-} {-# OPTIONS_GHC -Wno-orphans #-} module Main where import HBS2.Prelude.Plated -import HBS2.OrDie -import HBS2.Hash -import HBS2.Data.Types.Refs -import HBS2.Misc.PrettyStuff -import HBS2.Clock -import HBS2.Merkle -import HBS2.Polling - -import HBS2.Storage -import HBS2.Storage.Simple -import HBS2.Storage.Operations.ByteString - import HBS2.System.Logger.Simple.ANSI - -import HBS2.Data.Log.Structured.SD -import HBS2.Storage.NCQ -import HBS2.Data.Log.Structured.NCQ - -import HBS2.CLI.Run.Internal.Merkle - import Data.Config.Suckless.Syntax import Data.Config.Suckless.Script as SC -import Data.Config.Suckless.System - import NCQTestCommon import NCQ3 - -import DBPipe.SQLite hiding (field) - -import Codec.Compression.Zstd qualified as Zstd - -import System.Posix.Files qualified as PFS -import Numeric (showHex) -import Data.Ord (Down(..)) -import Data.Char -import Data.Bits -import Data.ByteString (ByteString) -import Data.ByteString qualified as BS -import Data.ByteString.Lazy qualified as LBS -import Data.Text.Encoding qualified as TE -import Data.ByteString.Char8 qualified as BS8 -import Data.ByteString.Builder -import Data.Hashable (hash) -import Data.Maybe -import Data.Either -import Data.Word -import Data.List qualified as List -import Data.Vector qualified as V -import Data.Vector ((!)) -import Control.Monad.Trans.Cont -import Control.Monad.Trans.Maybe -import Control.Monad.Except (runExceptT) -import Network.ByteOrder qualified as N -import Data.Coerce -import Data.HashPSQ qualified as HPSQ -import Data.HashSet qualified as HS -import Data.HashSet (HashSet) -import Data.HashMap.Strict (HashMap) -import Data.HashMap.Strict qualified as HM -import Data.IntMap qualified as IntMap -import Data.IntMap (IntMap) -import Data.IntSet (IntSet) -import Data.IntSet qualified as IntSet -import Data.Fixed import System.Environment -import System.FilePath.Posix -import System.Directory -import System.Posix.Fcntl -import System.Posix.IO -import System.IO.MMap -import System.IO qualified as IO -import System.Exit (exitSuccess, exitFailure) -import System.Random -import System.Random.MWC as MWC -import System.Random.Stateful -import System.Random.Shuffle (shuffleM) -import Safe -import Lens.Micro.Platform -import Control.Concurrent.STM qualified as STM -import System.IO.Temp qualified as Temp -import System.Mem - import UnliftIO -import UnliftIO.Async - -import Test.Tasty.HUnit -import Text.InterpolatedString.Perl6 (qc) - -import Streaming.Prelude qualified as S -import System.TimeIt - -import System.IO.Unsafe (unsafePerformIO) - -import Data.BloomFilter.Easy as Bloom {- HLINT ignore "Functor law" -} - - - -testNCQFuckupRecovery1 :: MonadUnliftIO m - => TestEnv - -> m () - -testNCQFuckupRecovery1 TestEnv{..} = flip runContT pure do - - let ncqDir = testEnvDir "ncq" - - (cur,ha,h0) <- lift $ withNCQ id ncqDir $ \ncq -> do - let sto = AnyStorage ncq - - source <- LBS.take (100 * 1024^2) <$> liftIO (LBS.readFile "/dev/urandom") - - let h0 = hashObject @HbSync source - - hash <- runExceptT (writeAsMerkle sto source <&> HashRef) - >>= orThrowPassIO @_ @SomeException - - notice $ "stored" <+> pretty hash <+> pretty (LBS.length source) - - pure (ncqGetCurrentName ncq, hash, h0) - - liftIO do - ss <- randomRIO (1, 32*1024) - shit <- LBS.take ss <$> LBS.readFile "/dev/urandom" - BS.appendFile cur (LBS.toStrict shit) - newSize <- getFileSize cur - notice $ "CURRENT-FILE" <+> pretty cur <+> "successfully corrupted" <+> pretty newSize - - notice $ "CURRENT-FILE" <+> pretty cur - - lift $ withNCQ id ncqDir $ \ncq -> do - notice $ "REOPEN STORAGE" - let sto = AnyStorage ncq - - lbs <- runExceptT (getTreeContents sto ha) - >>= orThrowPassIO - - let h1 = hashObject @HbSync lbs - - when (h0 /= h1) do - error "corrupted state" - - notice $ "loaded" <+> pretty ha <+> pretty (LBS.length lbs) - - - -testNCQLongWrite :: MonadUnliftIO m => Int -> TestEnv -> m () -testNCQLongWrite n TestEnv{..} = flip runContT pure do - let ncqDir = testEnvDir "ncq-simple" - - -- Step 1: Write block - lift $ withNCQ id ncqDir $ \ncq -> liftIO do - let sto = AnyStorage ncq - replicateM_ n do - size <- randomRIO (1, 256*1024) - let payload = LBS.replicate size 0x41 -- 0x41 = 'A' - h <- putBlock sto payload - assertBool "block written" (isJust h) - - -testNCQLongWriteRead :: MonadUnliftIO m => Int -> TestEnv -> m () -testNCQLongWriteRead n TestEnv{..} = flip runContT pure do - let ncqDir = testEnvDir "ncq-simple" - - wq <- newTQueueIO - - -- Step 1: Write block - lift $ withNCQ id ncqDir $ \ncq -> liftIO do - let sto = AnyStorage ncq - replicateM_ n do - size <- randomRIO (1, 256*1024) - let payload = LBS.replicate size 0x41 -- 0x41 = 'A' - h <- putBlock sto payload - assertBool "block written" (isJust h) - for_ h $ \hhh -> do - atomically $ writeTQueue wq (HashRef hhh) - - r <- atomically $ STM.flushTQueue wq - - for_ r $ \h -> do - s <- ncqLocate ncq h - assertBool "actually written" (isJust s) - -testNCQSimple1 :: MonadUnliftIO m => TestEnv -> m () -testNCQSimple1 TestEnv{..} = flip runContT pure do - let ncqDir = testEnvDir "ncq-simple" - - for_ [ 0 .. 18 ] $ \s -> do - let size = 2 ^ s - let payload = LBS.replicate size 0x41 -- 0x41 = 'A' - let expectedHash = hashObject @HbSync payload - - -- Step 1: Write block - lift $ withNCQ id ncqDir $ \ncq -> do - let sto = AnyStorage ncq - h <- putBlock sto payload `orDie` "failed to write block" - liftIO $ assertBool "hashes match (write)" (h == expectedHash) - - -- Step 2: Read back - lift $ withNCQ id ncqDir $ \ncq -> do - let sto = AnyStorage ncq - blk <- getBlock sto (coerce expectedHash) `orDie` "block not found" - sx <- hasBlock sto (coerce expectedHash) - - loc <- ncqLocate ncq (coerce expectedHash) - >>= orThrowUser "not found" - - blk0 <- ncqStorageGet_ ncq loc - - let sblk0 = LBS.length <$> blk0 - - liftIO $ print $ "block size" - <+> pretty sx - <+> ";" - <+> pretty (LBS.length blk) - <+> ";" - <+> pretty size - <+> ";" - <+> pretty sblk0 - <+> pretty loc - - liftIO $ do - assertBool "block has correct length" (LBS.length blk == size) - assertBool "block contents are correct" (blk == payload) - - -testNCQSimple2 :: MonadUnliftIO m => Int -> TestEnv -> m () -testNCQSimple2 n TestEnv{..} = flip runContT pure do - let ncqDir = testEnvDir "ncq-simple2" - - let alph_ = V.fromList ['A' .. 'z'] - cnt <- newTVarIO 0 - - let alphx = liftIO do - i <- atomically $ stateTVar cnt (\x -> (x, succ x)) - pure $ alph_ ! ( i `mod` V.length alph_) - - -- Step 1: write N blocks - hashes <- lift $ withNCQ id ncqDir $ \ncq -> do - let sto = AnyStorage ncq - replicateM n do - size <- liftIO $ randomRIO (0, 256 * 1024) - chr <- alphx - let payload = LBS.replicate size (fromIntegral $ ord chr) - let h = hashObject @HbSync payload - h' <- putBlock sto payload `orDie` "putBlock failed" - loc <- ncqLocate ncq (coerce h) - s <- hasBlock sto h - w <- getBlock sto h - let w' = fromMaybe mempty w - - if w == Just payload then do - debug $ "okay" <+> pretty loc - else do - err $ pretty s <> "/" <> pretty size - <+> viaShow (LBS.take 48 w') - <+> ".." - <+> viaShow (LBS.take 8 $ LBS.reverse w') - <> line - <+> pretty loc - - error "ABORTED" - - liftIO $ assertBool "hash matches" (h == h') - pure (h, size, payload) - - let testRead ncq = do - let sto = AnyStorage ncq - forM_ hashes $ \(h, expectedSize, expectedPayload) -> do - loc <- ncqLocate ncq (coerce h) >>= orThrowUser "not found" - blk <- getBlock sto (coerce h) `orDie` "block not found" - sx <- hasBlock sto (coerce h) - blk0 <- ncqStorageGet_ ncq loc - let sblk0 = LBS.length <$> blk0 - let actualSize = LBS.length blk - - debug $ "block size" - <+> pretty sx - <+> ";" - <+> pretty actualSize - <+> ";" - <+> pretty expectedSize - <+> ";" - <+> pretty sblk0 - <+> pretty loc - - liftIO do - assertBool "size match" (actualSize == expectedSize) - assertBool "payload match" (blk == expectedPayload) - - -- Step 2: reopen and verify - lift $ withNCQ id ncqDir $ \ncq -> do - testRead ncq - -- ncqIndexRightNow ncq - pause @'Seconds 2 - - liftIO $ print $ "LAST PASS" - -- Step 3: reopen and verify - fossil - lift $ withNCQ id ncqDir $ \ncq -> do - testRead ncq - -testNCQ1 :: MonadUnliftIO m - => Int - -> TestEnv - -> m () - -testNCQ1 n TestEnv{..} = flip runContT pure $ callCC \stop -> do - - let tmp = testEnvDir - - let inputDir = tmp "input" - let ncqDir = tmp "ncq-test-data" - - for_ [inputDir] mkdir - - twritten <- newTVarIO (mempty :: HashSet HashRef) - - nSize <- newTVarIO 0 - - tssQ <- newTQueueIO - - forM_ [1..n] $ \i -> liftIO do - withBinaryFile "/dev/urandom" ReadMode \urandom -> do - let fname = inputDir show i <> ".bin" - size <- randomRIO (1, 256*1024) - atomically $ modifyTVar' nSize (+size) - file <- BS.copy <$> BS.hGetSome urandom size - BS.writeFile fname file - let !ha = hashObject @HbSync file - let !len = fromIntegral $ BS.length file - -- atomically $ writeTQueue tssQ (fname, (ha, fromIntegral $! BS.length file)) - -- l <- getFileSize fname - -- atomically $ writeTQueue tssQ (fname, (ha, l)) - atomically $ writeTQueue tssQ (fname, (ha, len)) - -- performGC - - fss <- atomically (STM.flushTQueue tssQ) - - stop () - - liftIO do - withNCQ id ncqDir $ \ncq -> flip runContT pure do - - let sto = AnyStorage ncq - let fileMap = HM.fromList [ (ha,(s,fn)) | (fn,(ha,s)) <- fss ] - - let - written :: forall m a . (Fractional a, MonadIO m) => m [(HashRef, a)] - written = readTVarIO twritten <&> HS.toList <&> fmap (,0.1) - - ContT $ withAsync $ forever do - polling (Polling 0.25 0.25) written $ \(HashRef hz) -> liftIO do - what <- getBlock sto hz >>= orThrowUser ("block not found" <+> pretty hz) - let h2 = hashObject @HbSync what - - (s,_) <- HM.lookup hz fileMap & orThrowUser "fileMap entry missed" - - ssz <- hasBlock sto hz - >>= orThrowUser ("block size not found" <+> pretty hz) - - when (ssz /= s) do - error $ show $ "size mismatch" <+> pretty hz - - when (hz /= h2) do - error $ show $ pretty "hash does not match" <+> pretty hz <+> pretty s - - liftIO $ forConcurrently_ fss $ \(fn, (ha,s)) -> do - co <- liftIO (BS.readFile fn) <&> LBS.fromStrict - h1 <- putBlock sto co >>= orThrowUser "block not written" - lbs2 <- getBlock sto ha >>= orThrowUser "block not found" - let h2 = hashObject @HbSync lbs2 - - when (ha /= h2 || h1 /= ha) do - error $ show $ pretty "hash does not match" <+> pretty h1 <+> pretty s - - atomically $ modifyTVar twritten (HS.insert (HashRef h1)) - - debug $ "putBlock" <+> pretty ha <+> pretty h2 - - liftIO $ forConcurrently_ fss $ \(fn, (ha,s)) -> do - lbs2 <- getBlock sto ha >>= orThrowUser "block not found" - let h2 = hashObject @HbSync lbs2 - - when (ha /= h2) do - error $ show $ pretty "hash does not match" <+> pretty ha <+> pretty s - - debug $ "getBlock" <+> pretty ha <+> pretty h2 - - liftIO do - withNCQ id ncqDir $ \ncq -> flip runContT pure do - - let sto = AnyStorage ncq - - for_ fss $ \(fn, (ha,s)) -> do - lbs2 <- getBlock sto ha >>= orThrowUser "block not found" - let h2 = hashObject @HbSync lbs2 - - when (ha /= h2) do - error $ show $ pretty "hash does not match" <+> pretty ha <+> pretty s - - debug $ "getBlock" <+> pretty ha <+> pretty h2 - - -testNCQTree1 :: MonadUnliftIO m - => Int - -> TestEnv - -> m () - -testNCQTree1 n TestEnv{..} = flip runContT pure do - - let size = 1024 * 1024 * fromIntegral n - - let tmp = testEnvDir - - let inputDir = tmp "input" - let ncqDir = tmp "ncq-test-data" - - treeLbs <- LBS.take size <$> liftIO (LBS.readFile ("/dev/urandom")) - - let h1 = hashObject @HbSync treeLbs - - lift $ withNCQ id ncqDir $ \ncq1 -> do - - let sto = AnyStorage ncq1 - - r <- createTreeWithMetadata sto Nothing mempty treeLbs - >>= orThrowPassIO - - lbs2 <- runExceptT (getTreeContents sto r) - >>= orThrowPassIO - - let h2 = hashObject @HbSync lbs2 - - - let l1 = LBS.length treeLbs - let l2 = LBS.length treeLbs - display (mkList @C [mkSym r, mkSym h1, mkSym h2, mkInt l1, mkInt l2]) - - liftIO $ assertBool "hashes equal" (h1 == h2) - - -- display (mkSym @C $ show $ pretty r) - -testNCQRefs1 :: MonadUnliftIO m - => Int - -> TestEnv - -> m () - -testNCQRefs1 n TestEnv{..} = flip runContT pure do - - let tmp = testEnvDir - - let ncqDir = tmp "ncq-test-data" - - refs <- liftIO $ replicateM n $ do - ref <- SomeRefKey <$> randomIO @Word64 - val <- randomIO @Word64 <&> hashObject . serialise - pure (ref, val) - - lift $ withNCQ id ncqDir $ \ncq -> do - let sto = AnyStorage ncq - - for_ refs $ \(k,v) -> do - updateRef sto k v - - for_ refs $ \(k,v0) -> liftIO do - v1 <- getRef sto k - assertBool "refs equal 1" (Just v0 == v1) - - notice $ "all" <+> pretty n <+> "refs found" - - debug "restart storage" - - lift $ withNCQ id ncqDir $ \ncq -> do - let sto = AnyStorage ncq - - for_ refs $ \(k,v0) -> liftIO do - v1 <- getRef sto k - assertBool "refs equal 2" (Just v0 == v1) - delRef sto k - - notice $ "all" <+> pretty n <+> "refs found after restart" - - for_ refs $ \(k,_) -> liftIO do - v1 <- getRef sto k - assertBool "ref deleted" (isNothing v1) - - notice $ "all" <+> pretty n <+> "refs deleted" - - -testNCQConcurrent1 :: MonadUnliftIO m - => Bool - -> Int - -> Int - -> TestEnv - -> m () - -testNCQConcurrent1 noRead tn n TestEnv{..} = flip runContT pure do - - let tmp = testEnvDir - let inputDir = tmp "input" - let ncqDir = tmp "ncq-test-data" - - debug "preparing" - - mkdir inputDir - - debug $ pretty inputDir - - filez <- liftIO $ pooledReplicateConcurrentlyN 8 n $ do - size <- randomRIO (64*1024, 256*1024) - w <- liftIO (randomIO :: IO Word8) - let tbs = BS.replicate size w -- replicateM size w <&> BS.pack - let ha = hashObject @HbSync tbs -- & show . pretty - let fn = inputDir show (pretty ha) - liftIO $ BS.writeFile fn tbs - pure (fn, ha, BS.length tbs) - - debug "done" - - let fnv = V.fromList filez - let ssz = sum [ s | (_,_,s) <- filez ] & realToFrac - - setLoggingOff @DEBUG - - for_ [1 .. tn] $ \tnn -> do - - (t,_) <- timeItT $ liftIO $ withNCQ id ncqDir $ \ncq1 -> do - - pooledForConcurrentlyN_ tnn fnv $ \(n,ha,_) -> do - co <- BS.readFile n <&> LBS.fromStrict - putBlock ncq1 co - - pooledReplicateConcurrentlyN_ tnn (10 * V.length fnv) do - unless noRead do - i <- randomRIO (0, V.length fnv - 1) - let (n,ha,_) = fnv ! i - sz <- getBlock ncq1 ha - none - - let tt = realToFrac @_ @(Fixed E2) t - let speed = ((ssz / (1024 **2)) / t) & realToFrac @_ @(Fixed E2) - notice $ pretty tnn <+> pretty tt <+> pretty speed - - rm ncqDir - - - main :: IO () main = do