From 34585e000725d1cc8d017da29c73a09cc4f5d87e Mon Sep 17 00:00:00 2001 From: Dmitry Zuykov Date: Sat, 17 May 2025 10:05:11 +0300 Subject: [PATCH] wip --- Makefile | 1 + hbs2-storage-ncq/lib/HBS2/Storage/NCQ.hs | 63 +++- hbs2-tests/hbs2-tests.cabal | 21 ++ hbs2-tests/test/TestNCQ.hs | 299 ++++++++++++++++++ .../lib/Data/Config/Suckless/System.hs | 2 + 5 files changed, 374 insertions(+), 12 deletions(-) create mode 100644 hbs2-tests/test/TestNCQ.hs diff --git a/Makefile b/Makefile index d790e350..ae4264c4 100644 --- a/Makefile +++ b/Makefile @@ -28,6 +28,7 @@ BINS := \ git-remote-hbs23 \ hbs2-ncq \ tcq \ + test-ncq \ RT_DIR := tests/RT diff --git a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ.hs b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ.hs index 92abdc8c..4de6627b 100644 --- a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ.hs +++ b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ.hs @@ -26,6 +26,7 @@ import Control.Applicative import Data.ByteString.Builder import Network.ByteOrder qualified as N import Data.HashMap.Strict (HashMap) +import Control.Monad.Except import Control.Monad.Trans.Cont import Control.Monad.Trans.Maybe import Data.Ord (Down(..),comparing) @@ -155,6 +156,7 @@ data NCQStorage = , ncqCurrentFd :: TVar (Maybe (RFd,WFd)) , ncqCurrentUsage :: TVar (IntMap Int) , ncqCurrentReadReq :: TVar (Seq (Fd, Word64, Word64, TMVar ByteString)) + , ncqFsyncNum :: TVar Int , ncqFlushNow :: TVar [TQueue ()] , ncqMergeReq :: TVar Int , ncqOpenDone :: TMVar Bool @@ -166,7 +168,9 @@ instance MonadUnliftIO m => Storage NCQStorage HbSync LBS.ByteString m where putBlock ncq lbs = fmap coerce <$> ncqStoragePutBlock ncq lbs enqueueBlock ncq lbs = fmap coerce <$> ncqStoragePutBlock ncq lbs getBlock ncq h = ncqStorageGetBlock ncq (coerce h) - hasBlock ncq = hasBlock ncq . coerce + + hasBlock ncq = ncqStorageHasBlock ncq . coerce + delBlock ncq = ncqStorageDel ncq . coerce updateRef ncq k v = do @@ -335,6 +339,11 @@ ncqIndexFile n@NCQStorage{} fp' = do pure fp +ncqFsync :: MonadUnliftIO m => NCQStorage -> Fd -> m () +ncqFsync NCQStorage{..} fh = liftIO do + fileSynchronise fh + atomically $ modifyTVar ncqFsyncNum succ + ncqStorageStop :: MonadUnliftIO m => NCQStorage -> m () ncqStorageStop ncq@NCQStorage{..} = do debug "ncqStorageStop" @@ -513,16 +522,21 @@ ncqStorageRun ncq@NCQStorage{..} = flip runContT pure do pure () else void do liftIO (Posix.fdWrite fh (ws <> LBS.toStrict wbs)) + -- liftIO $ fileSynchronise fh - written' <- if written < syncData then do - pure (written + w) - else do - fileSynchronise fh - pure 0 + (written',sz) <- if written < syncData then do + pure (written + w,0) + else do + ncqFsync ncq fh + fsize <- getFdStatus fh <&> PFS.fileSize + pure (0,fromIntegral fsize) - ((h, (fromIntegral off, fromIntegral len)) : ) <$> next (written', rest) + if sz < ncqMinLog then do + ((h, (fromIntegral off, fromIntegral len)) : ) <$> next (written', rest) + else do + pure [(h, (fromIntegral off, fromIntegral len))] - fileSynchronise fh + ncqFsync ncq fh size <- fdSeek fh SeekFromEnd 0 writeBinaryFileDurable (ncqGetCurrentSizeName ncq) (N.bytestring64 (fromIntegral size)) @@ -641,6 +655,27 @@ ncqIsTomb lbs = do LBS.isPrefixOf "T" pre {-# INLINE ncqIsTomb #-} + +data HasBlockError = + LocationNotFound + | DataNotRead + | BlockIsTomb + deriving stock (Eq,Show,Typeable) + + +instance Exception HasBlockError + +ncqStorageHasBlockEither :: MonadUnliftIO m => NCQStorage -> HashRef -> m (Either HasBlockError Integer) +ncqStorageHasBlockEither ncq h = runExceptT do + location <- ncqLocate ncq h >>= orThrow LocationNotFound + let s = ncqLocatedSize location + if s > ncqPrefixLen then + pure (s - ncqPrefixLen) + else do + what <- lift (ncqStorageGet_ ncq location) >>= orThrow DataNotRead + when (ncqIsTomb what) $ throwIO BlockIsTomb + pure 0 + ncqStorageHasBlock :: MonadUnliftIO m => NCQStorage -> HashRef -> m (Maybe Integer) ncqStorageHasBlock ncq h = runMaybeT do location <- ncqLocate ncq h >>= toMPlus @@ -681,6 +716,8 @@ ncqLocatedSize = \case InCurrent (_,_,s) -> fromIntegral s InFossil _ (_,s) -> fromIntegral s +-- ncqFsync :: MonadUnliftIO m => NCQStorage{..} -> FilePath + evictIfNeededSTM :: NCQStorage -> Maybe Int -> STM () evictIfNeededSTM NCQStorage{..} howMany = do cur <- readTVar ncqCachedEntries @@ -866,6 +903,7 @@ ncqStorageDel ncq@NCQStorage{..} h = flip runContT pure $ callCC \exit -> do ncqLocate ncq h >>= atomically . \case Just (InFossil _ _) -> writeTombstone (WQItem False Nothing) + Just (InCurrent (fd,_,_)) -> do modifyTVar ncqStaged (IntMap.adjust (HPSQ.delete h) (fromIntegral fd)) writeTombstone (WQItem False Nothing) @@ -1032,11 +1070,11 @@ ncqStorageInit_ check path = do let ncqRoot = path - let ncqSyncSize = 64 * (1024 ^ 2) - let ncqMinLog = 512 * (1024 ^ 2) - let ncqMaxLog = 4 * (1024 ^ 3) + let ncqSyncSize = 64 * (1024 ^ 2) + let ncqMinLog = 1024 * (1024 ^ 2) + let ncqMaxLog = 4 * (1024 ^ 3) - let ncqMaxCached = 64 + let ncqMaxCached = 128 ncqSalt <- try @_ @IOException (liftIO $ BS.readFile seedPath) >>= orThrow NCQStorageSeedMissed @@ -1059,6 +1097,7 @@ ncqStorageInit_ check path = do ncqCurrentFd <- newTVarIO Nothing ncqIndexed <- newTVarIO mempty ncqMergeReq <- newTVarIO 0 + ncqFsyncNum <- newTVarIO 0 let currentName = ncqGetCurrentName_ path ncqGen diff --git a/hbs2-tests/hbs2-tests.cabal b/hbs2-tests/hbs2-tests.cabal index f603c264..29f22d7b 100644 --- a/hbs2-tests/hbs2-tests.cabal +++ b/hbs2-tests/hbs2-tests.cabal @@ -1200,3 +1200,24 @@ executable tcq , unix +executable test-ncq + import: shared-properties + import: common-deps + default-language: Haskell2010 + ghc-options: + hs-source-dirs: test + main-is: TestNCQ.hs + build-depends: + base, hbs2-core, hbs2-log-structured, hbs2-storage-ncq + , network + , string-conversions + , db-pipe + , suckless-conf + , network-byte-order + , text + , time + , mmap + , zstd + , unix + + diff --git a/hbs2-tests/test/TestNCQ.hs b/hbs2-tests/test/TestNCQ.hs new file mode 100644 index 00000000..fa8f1703 --- /dev/null +++ b/hbs2-tests/test/TestNCQ.hs @@ -0,0 +1,299 @@ +{-# Language AllowAmbiguousTypes #-} +{-# Language UndecidableInstances #-} +{-# Language MultiWayIf #-} +{-# Language RecordWildCards #-} +{-# Language ViewPatterns #-} +{-# OPTIONS_GHC -Wno-orphans #-} +module Main where + +import HBS2.Prelude.Plated +import HBS2.OrDie +import HBS2.Hash +import HBS2.Data.Types.Refs +import HBS2.Clock +import HBS2.Merkle + +import HBS2.Storage +import HBS2.Storage.Simple +import HBS2.Storage.Operations.ByteString + +import HBS2.System.Logger.Simple.ANSI + +import HBS2.Storage.NCQ +import HBS2.Data.Log.Structured.NCQ + +import HBS2.CLI.Run.Internal.Merkle + +import Data.Config.Suckless.Syntax +import Data.Config.Suckless.Script as SC +import Data.Config.Suckless.System + +import DBPipe.SQLite hiding (field) + +import Data.Bits +import Data.ByteString (ByteString) +import Data.ByteString qualified as BS +import Data.ByteString.Lazy qualified as LBS +import Data.Text.Encoding qualified as TE +import Data.ByteString.Char8 qualified as BS8 +import Data.ByteString.Builder +import Data.Maybe +import Data.Word +import Data.List qualified as List +import Data.Vector qualified as V +import Data.Vector ((!)) +import Control.Monad.Trans.Cont +import Control.Monad.Trans.Maybe +import Control.Monad.Except (runExceptT) +import Network.ByteOrder qualified as N +import Data.Coerce +import Data.HashPSQ qualified as HPSQ +import Data.HashSet qualified as HS +import Data.HashSet (HashSet) +import Data.HashMap.Strict (HashMap) +import Data.HashMap.Strict qualified as HM +import Data.IntMap qualified as IntMap +import Data.IntMap (IntMap) +import Data.Fixed +import System.Environment +import System.FilePath.Posix +import System.Directory +import System.Posix.Fcntl +import System.Posix.IO +import System.IO.MMap +import System.IO qualified as IO +import System.Exit (exitSuccess, exitFailure) +import System.Random +import Safe +import Lens.Micro.Platform +import Control.Concurrent.STM qualified as STM +import System.IO.Temp qualified as Temp + +import UnliftIO + +import Text.InterpolatedString.Perl6 (qc) + +import Streaming.Prelude qualified as S +import System.TimeIt + +import System.IO.Unsafe (unsafePerformIO) + +{- HLINT ignore "Functor law" -} + +setupLogger :: MonadIO m => m () +setupLogger = do + setLogging @DEBUG $ toStderr . logPrefix "[debug] " + setLogging @ERROR $ toStderr . logPrefix "[error] " + setLogging @WARN $ toStderr . logPrefix "[warn] " + setLogging @NOTICE $ toStdout . logPrefix "" + +flushLoggers :: MonadIO m => m () +flushLoggers = do + silence + +silence :: MonadIO m => m () +silence = do + setLoggingOff @DEBUG + setLoggingOff @ERROR + setLoggingOff @WARN + setLoggingOff @NOTICE + setLoggingOff @TRACE + +testNCQ1 :: MonadUnliftIO m + => Bool + -> FilePath + -> Int + -> m () + +testNCQ1 keep prefix n = flip runContT pure do + + mkdir prefix + + tmp <- liftIO (Temp.createTempDirectory prefix "ncq-test") + + let inputDir = tmp "input" + let ncqDir = tmp "ncq-test-data" + + for_ [inputDir] mkdir + + ContT $ bracket none $ const do + unless keep $ rm tmp + + twritten <- newTVarIO (mempty :: HashSet HashRef) + + nSize <- newTVarIO 0 + + fss <- for [1..n] $ \i -> liftIO do + let fname = inputDir show i <> ".bin" + size <- randomRIO (1, 256*1024) + atomically $ modifyTVar nSize (+size) + file <- LBS.take size <$> LBS.readFile "/dev/urandom" + BS.writeFile fname (BS.toStrict file) + pure fname + + ncq <- liftIO $ ncqStorageOpen ncqDir + r <- liftIO $ async (ncqStorageRun ncq) + + let sto = AnyStorage ncq + + nWq <- newTVarIO 0 + nCu <- newTVarIO 0 + nFo <- newTVarIO 0 + nMissed <- newTVarIO 0 + + let + updateStats :: forall m . MonadIO m => NCQStorage -> HashRef -> m (Maybe Location) + updateStats ncq h = do + w <- ncqLocate ncq (coerce h) + + case w of + Just (InWriteQueue _) -> atomically $ modifyTVar nWq succ + Just (InCurrent _) -> atomically $ modifyTVar nCu succ + Just (InFossil _ _) -> atomically $ modifyTVar nFo succ + Nothing -> atomically $ modifyTVar nMissed succ + + pure w + + + t1 <- ContT $ withAsync $ fix \loop -> do + + what <- readTVarIO twritten + p <- randomRIO (0.01, 0.5) + pause @'Seconds (realToFrac p) + + forConcurrently_ what $ \h -> do + + w <- updateStats ncq h + + what <- ncqStorageHasBlockEither ncq (coerce h) + case what of + Left LocationNotFound | isJust w -> do + error $ show $ "FUCKING RACE!" <+> pretty w + + Left e -> throwIO e + Right _ -> none + + done <- readTVarIO (ncqStopped ncq) + unless done loop + + link t1 + -- + + out <- newTQueueIO + + liftIO do + forConcurrently_ fss $ \f -> do + -- debug $ "process file" <+> pretty f + blk <- BS.readFile f + h <- putBlock sto (LBS.fromStrict blk) `orDie` ("Can't store block " <> f) + atomically do + writeTQueue out (HashRef h) + modifyTVar twritten (HS.insert (coerce h)) + + blkQ <- atomically do + STM.flushTQueue out + + notice $ "WAIT BLOCKS DONE" <+> pretty (List.length blkQ) + + lift $ ncqStorageFlush ncq + + for_ blkQ $ \h -> liftIO do + void $ updateStats ncq h + hasBlock sto (coerce h) + `orDie` show ("missed" <+> pretty h) + + liftIO $ ncqStorageStop ncq + + wait t1 + + let vars = zip [ "write-q" + , "current" + , "fossil" + , "missed" + , "size" + ] + [nWq, nCu, nFo, nMissed, nSize] + + liftIO $ wait r + + lift $ withNCQ id ncqDir $ \ncq1 -> do + for_ blkQ $ \h -> liftIO do + void $ updateStats ncq1 h + hasBlock (AnyStorage ncq1) (coerce h) >>= \case + Nothing -> print $ "missed" <+> pretty h + Just x -> none + + results <- for vars $ \(k,w) -> do + v <- readTVarIO w + pure $ mkList @C [ mkSym k, mkInt v] + + liftIO $ print $ pretty $ mkList (mkSym "results" : results) + + + +main :: IO () +main = do + + tvd <- newTVarIO mempty + + let dict = makeDict @C do + + entry $ bindMatch "--help" $ nil_ \case + HelpEntryBound what -> helpEntry what + [StringLike s] -> helpList False (Just s) + _ -> helpList False Nothing + + internalEntries + + entry $ bindMatch "#!" $ nil_ $ const none + + entry $ bindMatch "--run" $ \case + (StringLike what : args) -> liftIO do + + liftIO (readFile what) + <&> parseTop + >>= either (error.show) pure + >>= \syn -> do + runTM tvd do + + for_ (zip [1..] args) $ \(i,a) -> do + let n = Id ("$" <> fromString (show i)) + SC.bind n a + + SC.bind "$argv" (mkList args) + + evalTop syn + + e -> throwIO $ BadFormException @C (mkList e) + + entry $ bindMatch "debug" $ nil_ \case + + [ LitBoolVal False ] -> do + setLoggingOff @DEBUG + + [ StringLike "off" ] -> do + setLoggingOff @DEBUG + + _ -> + setLogging @DEBUG $ toStderr . logPrefix "[debug] " + + entry $ bindMatch "test:ncq:test1" $ nil_ $ \syn -> lift do + let (opts, argz) = splitOpts [("-n",1)] syn + let n = headDef 100 [ x | ListVal [ StringLike "-n", LitIntVal x ] <- opts ] + debug $ "ncq:test1" <+> pretty n + testNCQ1 False "./tmp-ncq" (fromIntegral n) + + setupLogger + + argz <- liftIO getArgs + + forms <- parseTop (unlines $ unwords <$> splitForms argz) + & either (error.show) pure + + atomically $ writeTVar tvd dict + + (runEval tvd forms >>= eatNil display) + `finally` flushLoggers + + + diff --git a/miscellaneous/suckless-conf/lib/Data/Config/Suckless/System.hs b/miscellaneous/suckless-conf/lib/Data/Config/Suckless/System.hs index 3aeed406..bc9b8436 100644 --- a/miscellaneous/suckless-conf/lib/Data/Config/Suckless/System.hs +++ b/miscellaneous/suckless-conf/lib/Data/Config/Suckless/System.hs @@ -125,3 +125,5 @@ sysTempDir = do pure $ if null tmp1 then tmp2 else tmp1 + +