From 1b003ed124dae92e6ec34313377b49909cfdaf2c Mon Sep 17 00:00:00 2001 From: voidlizard Date: Thu, 31 Jul 2025 12:15:26 +0300 Subject: [PATCH] wip --- .../lib/HBS2/Storage/NCQ3/Internal.hs | 3 ++- .../lib/HBS2/Storage/NCQ3/Internal/Fossil.hs | 4 ++-- .../lib/HBS2/Storage/NCQ3/Internal/Run.hs | 21 ++++++++++++------- .../lib/HBS2/Storage/NCQ3/Internal/Types.hs | 1 + hbs2-tests/test/NCQ3.hs | 20 +++++++++++------- 5 files changed, 31 insertions(+), 18 deletions(-) diff --git a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal.hs b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal.hs index 88033220..bc741406 100644 --- a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal.hs +++ b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal.hs @@ -45,7 +45,7 @@ ncqStorageOpen3 fp upd = do let ncqFsync = 16 * megabytes let ncqWriteQLen = 1024 * 4 -- let ncqMinLog = 512 * megabytes - let ncqMinLog = 1 * gigabytes + let ncqMinLog = 1 * gigabytes let ncqMaxLog = 32 * gigabytes let ncqWriteBlock = max 128 $ ncqWriteQLen `div` 2 let ncqMaxCachedIndex = 64 @@ -67,6 +67,7 @@ ncqStorageOpen3 fp upd = do ncqWrites <- newTVarIO 0 ncqWriteEMA <- newTVarIO 0.0 ncqWriteOps <- V.fromList <$> replicateM wopNum newTQueueIO + ncqSyncOps <- newTQueueIO ncqReadReq <- newTQueueIO ncqAlive <- newTVarIO False ncqStopReq <- newTVarIO False diff --git a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Fossil.hs b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Fossil.hs index 30a5590f..bb91f397 100644 --- a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Fossil.hs +++ b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Fossil.hs @@ -171,10 +171,10 @@ appendSection fh sect = do liftIO (Posix.fdWrite fh sect) <&> fromIntegral {-# INLINE appendSection #-} -appendTailSection :: MonadIO m => Fd -> m () +appendTailSection :: MonadIO m => Fd -> m NCQFileSize appendTailSection fh = liftIO do s <- Posix.fileSize <$> Posix.getFdStatus fh - void (appendSection fh (fileTailRecord s)) + appendSection fh (fileTailRecord s) <&> (+ fromIntegral s) . fromIntegral {-# INLINE appendTailSection #-} diff --git a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Run.hs b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Run.hs index 3ec955a6..ebbeb48a 100644 --- a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Run.hs +++ b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Run.hs @@ -59,9 +59,7 @@ ncqStorageRun3 ncq@NCQStorage3{..} = flip runContT pure do if not stop then STM.retry else pure Nothing maybe1 what none $ \(fk :: FileKey, fh) -> do - closeFd fh - ncqIndexFile ncq (DataFile fk) - loop + closeFd fh >> ncqIndexFile ncq (DataFile fk) >> loop let shLast = V.length ncqWriteOps - 1 spawnActivity $ pooledForConcurrentlyN_ (V.length ncqWriteOps) [0..shLast] $ \i -> do @@ -94,6 +92,8 @@ ncqStorageRun3 ncq@NCQStorage3{..} = flip runContT pure do spawnActivity measureWPS + -- spawnActivity (ncqStateUpdateLoop ncq) + spawnActivity $ postponed 10 $ forever do ema <- readTVarIO ncqWriteEMA @@ -124,6 +124,7 @@ ncqStorageRun3 ncq@NCQStorage3{..} = flip runContT pure do spawnActivity $ postponed 15 $ compactLoop 10 600 do ncqFossilMergeStep ncq + flip fix RunNew $ \loop -> \case RunFin -> do debug "exit storage" @@ -141,17 +142,21 @@ ncqStorageRun3 ncq@NCQStorage3{..} = flip runContT pure do RunSync (fk, fh, w, total, continue) -> do - stop <- readTVarIO ncqStopReq - sync <- readTVarIO ncqSyncReq + (stop,sync) <- atomically do + (,) <$> readTVar ncqStopReq + <*> readTVar ncqSyncReq + -- <*> readTVar ncqWriteEMA let needClose = total >= ncqMinLog || stop rest <- if not (sync || needClose || w > ncqFsync) then pure w else do - appendTailSection fh >> liftIO (fileSynchronise fh) - ss <- liftIO (PFS.getFdStatus fh) <&> fromIntegral . PFS.fileSize + ss <- appendTailSection fh + liftIO (fileSynchronise fh) + + -- ss <- liftIO (PFS.getFdStatus fh) <&> fromIntegral . PFS.fileSize ncqStateUpdate ncq do ncqStateAddFact (P (PData (DataFile fk) ss)) @@ -207,7 +212,7 @@ ncqStorageRun3 ncq@NCQStorage3{..} = flip runContT pure do let written = sum ws loop $ RunSync (fk, fh, w + written, total' + written, True) - wait closer + mapM_ wait [closer] where setAlive = atomically $ writeTVar ncqAlive True diff --git a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Types.hs b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Types.hs index 0c70346c..e786b42b 100644 --- a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Types.hs +++ b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Types.hs @@ -96,6 +96,7 @@ data NCQStorage3 = , ncqWriteEMA :: TVar Double -- for writes-per-seconds , ncqWriteQ :: TVar (Seq HashRef) , ncqWriteOps :: Vector (TQueue (IO ())) + , ncqSyncOps :: TQueue (IO ()) , ncqReadReq :: TQueue (HashRef, TMVar (Maybe Location)) , ncqAlive :: TVar Bool , ncqStopReq :: TVar Bool diff --git a/hbs2-tests/test/NCQ3.hs b/hbs2-tests/test/NCQ3.hs index feb3016b..e8f9f171 100644 --- a/hbs2-tests/test/NCQ3.hs +++ b/hbs2-tests/test/NCQ3.hs @@ -395,14 +395,14 @@ testNCQ3Lookup1 syn TestEnv{..} = do g <- liftIO MWC.createSystemRandom - let (opts, argz) = splitOpts [("-m",0),("-M",0)] syn + let (opts, argz) = splitOpts [("-m",1),("-M",0)] syn let n = headDef 100000 [ fromIntegral x | LitIntVal x <- argz ] let nt = max 2 . headDef 1 $ [ fromIntegral x | LitIntVal x <- drop 1 argz ] let nl = headDef 3 $ [ fromIntegral x | LitIntVal x <- drop 2 argz ] let r = (64*1024, 256*1024) - let merge = headDef False [ True | ListVal [StringLike "-m"] <- opts ] + let merge = headDef 0 [ step | ListVal [StringLike "-m", LitIntVal step] <- opts ] let mergeFull = headDef False [ True | ListVal [StringLike "-M"] <- opts ] notice $ "insert" <+> pretty n <+> "random blocks of size" <+> parens (pretty r) <+> pretty opts @@ -413,6 +413,10 @@ testNCQ3Lookup1 syn TestEnv{..} = do res <- newTQueueIO + let ntimes n m = flip fix n $ \loop i -> do + r <- m + if r && i > 0 then loop (i - 1) else pure r + ncqWithStorage3 ncqDir $ \sto -> liftIO do pooledForConcurrentlyN_ 8 sizes $ \size -> do z <- genRandomBS g size @@ -422,12 +426,14 @@ testNCQ3Lookup1 syn TestEnv{..} = do hs <- atomically $ STM.flushTQueue thashes let wrap m = if | mergeFull -> notice "full merge" >> ncqIndexCompactFull sto >> m - | merge -> + | merge > 0 -> fix \next -> do - notice "run ncqIndexCompactStep" - left <- ncqIndexCompactStep sto - m - if left then next else none + notice $ "run ncqIndexCompactStep" <+> pretty merge + flip fix merge \inner i -> do + left <- ntimes merge (ncqIndexCompactStep sto) + m + if left then next else none + | otherwise -> m wrap do