From 7a357dd8c4bc7412e2b1f1e25091bb45c3101854 Mon Sep 17 00:00:00 2001 From: voidlizard Date: Fri, 22 Aug 2025 09:23:52 +0300 Subject: [PATCH] fixing wrong state on crash exit --- hbs2-cli/lib/HBS2/CLI/NCQ3/Migrate.hs | 10 +- hbs2-peer/app/Migrate.hs | 4 +- hbs2-peer/app/PeerMain.hs | 13 +- hbs2-storage-ncq/hbs2-storage-ncq.cabal | 1 + hbs2-storage-ncq/lib/HBS2/Storage/NCQ3.hs | 1 + .../lib/HBS2/Storage/NCQ3/Internal.hs | 134 +++---------- .../lib/HBS2/Storage/NCQ3/Internal/Flags.hs | 32 ++++ .../lib/HBS2/Storage/NCQ3/Internal/Fossil.hs | 2 +- .../lib/HBS2/Storage/NCQ3/Internal/Index.hs | 3 +- .../lib/HBS2/Storage/NCQ3/Internal/Run.hs | 177 ++++++++++++++---- .../lib/HBS2/Storage/NCQ3/Internal/State.hs | 2 +- .../lib/HBS2/Storage/NCQ3/Internal/Sweep.hs | 35 ++-- .../lib/HBS2/Storage/NCQ3/Internal/Types.hs | 83 ++++---- hbs2-tests/test/NCQ3.hs | 21 +++ hbs2-tests/test/NCQ3/Endurance.hs | 107 +++++++++-- 15 files changed, 410 insertions(+), 215 deletions(-) create mode 100644 hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Flags.hs diff --git a/hbs2-cli/lib/HBS2/CLI/NCQ3/Migrate.hs b/hbs2-cli/lib/HBS2/CLI/NCQ3/Migrate.hs index ac2bbc98..84f0a4c9 100644 --- a/hbs2-cli/lib/HBS2/CLI/NCQ3/Migrate.hs +++ b/hbs2-cli/lib/HBS2/CLI/NCQ3/Migrate.hs @@ -33,10 +33,6 @@ migrateEntries = do $ entry $ bindMatch "ncq3:migrate:ncq" $ nil_ $ \case [ StringLike src, StringLike dst] -> do - sto <- getStorage - - - api <- getClientAPI @PeerAPI @UNIX refs <- callRpcWaitMay @RpcPollList2 (1.0 :: Timeout 'Seconds) api (Nothing, Nothing) @@ -44,7 +40,11 @@ migrateEntries = do rrefs <- S.toList_ <$> for refs $ \(pk, s, _) -> case s of "reflog" -> S.yield (WrapRef $ RefLogKey @'HBS2Basic pk) - "refchan" -> S.yield (WrapRef $ RefChanLogKey @'HBS2Basic pk) + + "refchan" -> do + S.yield (WrapRef $ RefChanLogKey @'HBS2Basic pk) + S.yield (WrapRef $ RefChanHeadKey @'HBS2Basic pk) + "lwwref" -> S.yield (WrapRef $ LWWRefKey @'HBS2Basic pk) _ -> none diff --git a/hbs2-peer/app/Migrate.hs b/hbs2-peer/app/Migrate.hs index fa605f89..e07e91c8 100644 --- a/hbs2-peer/app/Migrate.hs +++ b/hbs2-peer/app/Migrate.hs @@ -101,7 +101,9 @@ migrate syn = flip runContT pure $ callCC \exit -> do rrefs <- S.toList_ <$> for refs $ \(pk, s, _) -> case s of "reflog" -> S.yield (WrapRef $ RefLogKey @'HBS2Basic pk) - "refchan" -> S.yield (WrapRef $ RefChanLogKey @'HBS2Basic pk) + "refchan" -> do + S.yield (WrapRef $ RefChanLogKey @'HBS2Basic pk) + S.yield (WrapRef $ RefChanHeadKey @'HBS2Basic pk) "lwwref" -> S.yield (WrapRef $ LWWRefKey @'HBS2Basic pk) _ -> none diff --git a/hbs2-peer/app/PeerMain.hs b/hbs2-peer/app/PeerMain.hs index 1b8409d2..9cdbfe62 100644 --- a/hbs2-peer/app/PeerMain.hs +++ b/hbs2-peer/app/PeerMain.hs @@ -31,8 +31,8 @@ import HBS2.Net.Proto.Notify import HBS2.Peer.Proto.Mailbox import HBS2.OrDie import HBS2.Storage.Simple -import HBS2.Storage.NCQ3 --- import HBS2.Storage.NCQ +-- import HBS2.Storage.NCQ3 +import HBS2.Storage.NCQ import HBS2.Storage.Operations.Missed import HBS2.Data.Detect @@ -822,13 +822,14 @@ runPeer opts = respawnOnError opts $ flip runContT pure do -- error "STOP" - let ncqPath = coerce pref "ncq3" - -- let ncqPath = coerce pref "ncq" + -- let ncqPath = coerce pref "ncq3" + let ncqPath = coerce pref "ncq" debug $ "storage prefix:" <+> pretty ncqPath -- s <- ContT $ ncqWithStorage ncqPath - s <- lift $ ncqStorageOpen ncqPath id + -- s <- lift $ ncqStorageOpen ncqPath id + s <- lift $ ncqStorageOpen ncqPath -- simpleStorageInit @HbSync (Just pref) let blk = liftIO . hasBlock s @@ -1399,7 +1400,7 @@ checkMigration prefix = flip runContT pure $ callCC \exit -> do already <- Sy.doesDirectoryExist migration when (L.null blocks && not already) do - checkNCQ1 + -- checkNCQ1 exit () let reason = if already then diff --git a/hbs2-storage-ncq/hbs2-storage-ncq.cabal b/hbs2-storage-ncq/hbs2-storage-ncq.cabal index 6238d8f5..f851dfd8 100644 --- a/hbs2-storage-ncq/hbs2-storage-ncq.cabal +++ b/hbs2-storage-ncq/hbs2-storage-ncq.cabal @@ -73,6 +73,7 @@ library HBS2.Storage.NCQ3.Internal.MMapCache HBS2.Storage.NCQ3.Internal.Files HBS2.Storage.NCQ3.Internal.Fossil + HBS2.Storage.NCQ3.Internal.Flags HBS2.Storage.NCQ HBS2.Storage.NCQ.Types -- other-modules: diff --git a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3.hs b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3.hs index 1316e9f0..e0795167 100644 --- a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3.hs +++ b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3.hs @@ -25,6 +25,7 @@ import HBS2.Storage.NCQ3.Internal.State import HBS2.Storage.NCQ3.Internal.Memtable import HBS2.Storage.NCQ3.Internal.Index import HBS2.Storage.NCQ3.Internal.Fossil +import HBS2.Storage.NCQ3.Internal.Flags as Exported diff --git a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal.hs b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal.hs index 240b347e..22e1cd17 100644 --- a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal.hs +++ b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal.hs @@ -27,20 +27,22 @@ import System.FileLock as FL ncqStorageOpen :: MonadIO m => FilePath -> (NCQStorage -> NCQStorage) -> m NCQStorage ncqStorageOpen fp upd = do - let ncqRoot = fp - let ncqGen = 0 + let ncqRoot = fp + let ncqGen = 0 -- let ncqFsync = 16 * megabytes - let ncqFsync = 16 * megabytes - let ncqWriteQLen = 1024 * 4 - let ncqMinLog = 512 * megabytes - let ncqMaxLog = 32 * gigabytes - let ncqWriteBlock = max 256 $ ncqWriteQLen `div` 2 - let ncqMaxCachedIndex = 64 - let ncqMaxCachedData = 64 - let ncqIdleThrsh = 50.0 - let ncqPostponeMerge = 300.0 - let ncqPostponeSweep = 2 * ncqPostponeMerge - let ncqSalt = "EstEFasxrCFqsGDxcY4haFcha9e4ZHRzsPbGUmDfdxLk" + let ncqFsync = 16 * megabytes + let ncqWriteQLen = 1024 * 4 + let ncqMinLog = 512 * megabytes + let ncqMaxLog = 32 * gigabytes + let ncqWriteBlock = max 256 $ ncqWriteQLen `div` 2 + let ncqMaxCachedIndex = 64 + let ncqMaxCachedData = 64 + let ncqIdleThrsh = 50.0 + let ncqPostponeService = 20 + let ncqSweepTime = 30.00 + let ncqMergeTimeA = 10.00 + let ncqMergeTimeB = 60.00 + let ncqSalt = "EstEFasxrCFqsGDxcY4haFcha9e4ZHRzsPbGUmDfdxLk" cap <- getNumCapabilities @@ -61,10 +63,12 @@ ncqStorageOpen fp upd = do ncqAlive <- newTVarIO False ncqStopReq <- newTVarIO False ncqSyncReq <- newTVarIO False + ncqSweepReq <- newTVarIO False + ncqMergeReq <- newTVarIO False ncqOnRunWriteIdle <- newTVarIO none ncqSyncNo <- newTVarIO 0 ncqState <- newTVarIO mempty - ncqStateKey <- newTVarIO (FileKey maxBound) + ncqStateKey <- newTVarIO ncqNullStateKey ncqStateUse <- newTVarIO mempty ncqServiceSem <- atomically $ newTSem 1 ncqRunSem <- atomically $ newTSem 1 @@ -75,17 +79,21 @@ ncqStorageOpen fp upd = do mkdir (ncqGetWorkDir ncq) - liftIO (FL.tryLockFile (ncqGetFileName ncq ".lock") Exclusive) - >>= orThrow NCQStorageCurrentAlreadyOpen - >>= atomically . writeTVar ncqFileLock . Just - - liftIO (ncqTryLoadState ncq) - pure ncq -ncqWithStorage :: MonadUnliftIO m => FilePath -> (NCQStorage -> m a) -> m a -ncqWithStorage fp action = flip runContT pure do - sto <- lift (ncqStorageOpen fp id) +{- HLINT ignore "Eta reduce" -} + +ncqWithStorage :: MonadUnliftIO m + => FilePath + -> (NCQStorage -> m a) -> m a +ncqWithStorage fp action = ncqWithStorage0 fp id action + +ncqWithStorage0 :: MonadUnliftIO m + => FilePath + -> (NCQStorage -> NCQStorage) + -> (NCQStorage -> m a) -> m a +ncqWithStorage0 fp tune action = flip runContT pure do + sto <- lift (ncqStorageOpen fp tune) w <- ContT $ withAsync (ncqStorageRun sto) link w r <- lift (action sto) @@ -200,86 +208,6 @@ ncqPutBS0 wait ncq@NCQStorage{..} mtp mhref bs' = ncqOperation ncq (pure $ fromM where hash0 = HashRef (hashObject @HbSync bs') -ncqTryLoadState :: forall m. MonadUnliftIO m - => NCQStorage - -> m () - -ncqTryLoadState me@NCQStorage{..} = withSem ncqServiceSem do - - stateFiles <- ncqListFilesBy me ( List.isPrefixOf "s-" ) - - r <- flip fix ([], ncqState0, stateFiles) $ \next -> \case - (r, s, []) -> pure (r,s,[]) - (l, s0, (_,s):ss) -> do - - readStateMay me s >>= \case - Nothing -> next (s : l, s0, ss) - Just ns -> do - ok <- checkState ns - if ok then - pure (l <> fmap snd ss, ns, ss) - else - next (s : l, s0, ss) - - let (bad, new@NCQState{..}, rest) = r - - atomically $ modifyTVar ncqState (<> new) - - for_ [ (d,s) | P (PData d s) <- Set.toList ncqStateFacts ] $ \(dataFile,s) -> do - - let path = ncqGetFileName me dataFile - realSize <- fileSize path - - let sizewtf = realSize /= fromIntegral s - - flip fix 0 $ \again i -> do - - good <- try @_ @NCQFsckException (ncqFileFastCheck path) - - let corrupted = isLeft good - - if not corrupted then do - debug $ yellow "indexing" <+> pretty dataFile - ncqIndexFile me Nothing dataFile - else do - - o <- ncqFileTryRecover path - warn $ "ncqFileTryRecover" <+> pretty path <+> pretty o <+> parens (pretty realSize) - - let best = if i < 1 then max s o else s - - warn $ red "trim" <+> pretty s <+> pretty best <+> red (pretty (fromIntegral best - realSize)) <+> pretty (takeFileName path) - - liftIO $ PFS.setFileSize path (fromIntegral best) - - if i <= 1 then again (succ i) else pure Nothing - - - for_ (bad <> fmap snd rest) $ \f -> do - let old = ncqGetFileName me (StateFile f) - rm old - - where - - -- TODO: created-but-not-indexed-file? - - checkState NCQState{..} = flip runContT pure $ callCC \exit -> do - - for_ ncqStateFiles $ \fk -> do - - let dataFile = ncqGetFileName me (DataFile fk) - here <- doesFileExist dataFile - - unless here $ exit False - - lift (try @_ @SomeException (ncqFileFastCheck dataFile)) >>= \case - Left e -> err (viaShow e) >> exit False - Right () -> none - - pure True - - - class IsTomb a where diff --git a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Flags.hs b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Flags.hs new file mode 100644 index 00000000..89cd4696 --- /dev/null +++ b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Flags.hs @@ -0,0 +1,32 @@ +module HBS2.Storage.NCQ3.Internal.Flags where + +import HBS2.Storage.NCQ3.Internal.Prelude +import HBS2.Storage.NCQ3.Internal.Types + +import Control.Concurrent.STM qualified as STM + +ncqSetFlagSTM :: TVar Bool -> STM () +ncqSetFlagSTM t = writeTVar t True + +ncqSetFlag :: MonadIO m => TVar Bool -> m () +ncqSetFlag t = atomically $ writeTVar t True + +ncqClearFlagSTM :: TVar Bool -> STM () +ncqClearFlagSTM t = writeTVar t False + +ncqClearFlag :: MonadIO m => TVar Bool -> m () +ncqClearFlag t = liftIO (atomically $ ncqClearFlagSTM t) + +ncqWaitFlagSTM :: TVar Bool -> STM Bool +ncqWaitFlagSTM t = do + val <- readTVar t + unless val STM.retry + writeTVar t False + pure val + +ncqGetFlagSTM :: TVar Bool -> STM Bool +ncqGetFlagSTM = readTVar + +ncqGetFlag :: MonadIO m => TVar Bool -> m Bool +ncqGetFlag = liftIO . readTVarIO + diff --git a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Fossil.hs b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Fossil.hs index 1fda301b..2a9399cc 100644 --- a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Fossil.hs +++ b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Fossil.hs @@ -62,7 +62,7 @@ ncqFossilMergeStep :: forall m . MonadUnliftIO m => NCQStorage -> m Bool -ncqFossilMergeStep me@NCQStorage{..} = withSem ncqServiceSem $ flip runContT pure $ callCC \exit -> do +ncqFossilMergeStep me@NCQStorage{..} = flip runContT pure $ callCC \exit -> do tmax <- liftIO getPOSIXTime >>= newTVarIO debug "ncqFossilMergeStep" diff --git a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Index.hs b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Index.hs index 98795b21..80ac56c4 100644 --- a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Index.hs +++ b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Index.hs @@ -156,7 +156,7 @@ ncqIndexCompactFull ncq = fix \again -> ncqIndexCompactStep :: MonadUnliftIO m => NCQStorage -> m Bool -ncqIndexCompactStep me@NCQStorage{..} = withSem ncqServiceSem $ flip runContT pure $ callCC \exit -> do +ncqIndexCompactStep me@NCQStorage{..} = flip runContT pure $ callCC \exit -> do debug "ncqIndexCompactStep" @@ -198,6 +198,7 @@ ncqIndexCompactStep me@NCQStorage{..} = withSem ncqServiceSem $ flip runContT pu liftIO $ PFS.setFileTimesHiRes result ts ts fki <- ncqGetNewFileKey me IndexFile + moveFile result (ncqGetFileName me (IndexFile fki)) debug $ "state update" <+> pretty a <+> pretty b <+> "=>" <+> pretty fki diff --git a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Run.hs b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Run.hs index dd8c7ee0..d25c63db 100644 --- a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Run.hs +++ b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Run.hs @@ -11,28 +11,143 @@ import HBS2.Storage.NCQ3.Internal.State import HBS2.Storage.NCQ3.Internal.Sweep import HBS2.Storage.NCQ3.Internal.MMapCache import HBS2.Storage.NCQ3.Internal.Fossil +import HBS2.Storage.NCQ3.Internal.Flags +import Control.Concurrent.STM qualified as STM import Control.Monad.Trans.Cont import Control.Monad.Trans.Maybe -import Data.HashSet qualified as HS -import Data.Vector qualified as V -import Data.Sequence qualified as Seq +import Data.Either import Data.Fixed +import Data.HashSet qualified as HS +import Data.HashMap.Strict qualified as HM +import Data.List qualified as List +import Data.Sequence qualified as Seq +import Data.Set qualified as Set +import Data.Vector qualified as V +import System.FileLock as FL +import System.Posix.Files qualified as PFS import System.Posix.IO as PosixBase +import System.Posix.IO.ByteString as Posix import System.Posix.Types as Posix import System.Posix.Unistd -import System.Posix.IO.ByteString as Posix -import Control.Concurrent.STM qualified as STM -import System.FileLock as FL ncqStorageStop :: forall m . MonadUnliftIO m => NCQStorage -> m () ncqStorageStop NCQStorage{..} = do atomically $ writeTVar ncqStopReq True + +ncqTryLoadState :: forall m. MonadUnliftIO m + => NCQStorage + -> m () + +ncqTryLoadState me@NCQStorage{..} = do + + debug "ncqTryLoadState" + + stateFiles <- ncqListFilesBy me ( List.isPrefixOf "s-" ) + <&> List.sortOn ( Down . snd ) + + r <- flip fix ([], ncqState0, stateFiles) $ \next -> \case + (r, s, []) -> pure (r,s,[]) + (l, s0, (_,s):ss) -> do + + readStateMay me s >>= \case + Nothing -> next (s : l, s0, ss) + Just ns -> do + ok <- checkState ns + debug $ "state status" <+> pretty s <+> pretty ok + if ok then + pure (l <> fmap snd ss, ns, ss) + else + next (s : l, s0, ss) + + let (bad, new@NCQState{..}, rest) = r + + atomically $ modifyTVar ncqState (<> new) + + for_ [ (d,s) | P (PData d s) <- Set.toList ncqStateFacts ] $ \(dataFile,s) -> do + + let path = ncqGetFileName me dataFile + realSize <- fileSize path + + let sizewtf = realSize /= fromIntegral s + + flip fix 0 $ \again i -> do + + good <- try @_ @NCQFsckException (ncqFileFastCheck path) + + let corrupted = isLeft good + + if not corrupted then do + debug $ yellow "indexing" <+> pretty dataFile + ncqIndexFile me Nothing dataFile + else do + + o <- ncqFileTryRecover path + warn $ "ncqFileTryRecover" <+> pretty path <+> pretty o <+> parens (pretty realSize) + + let best = if i < 1 then max s o else s + + warn $ red "trim" <+> pretty s <+> pretty best <+> red (pretty (fromIntegral best - realSize)) <+> pretty (takeFileName path) + + liftIO $ PFS.setFileSize path (fromIntegral best) + + if i <= 1 then again (succ i) else pure Nothing + + + for_ (bad <> fmap snd rest) $ \f -> do + let old = ncqGetFileName me (StateFile f) + rm old + + where + + -- TODO: created-but-not-indexed-file? + + checkState NCQState{..} = flip runContT pure $ callCC \exit -> do + + for_ ncqStateFiles $ \fk -> do + + let dataFile = ncqGetFileName me (DataFile fk) + here <- doesFileExist dataFile + + unless here $ exit False + + -- lift (try @_ @SomeException (ncqFileFastCheck dataFile)) >>= \case + -- Right () -> none + -- Left e -> do + -- warn (viaShow e) + -- let known = HM.lookup fk facts + -- fs <- fileSize dataFile + -- warn $ "file is incomplete (or damaged)" + -- <+> pretty dataFile + -- <+> "actual:" <+> pretty fs + -- <+> "known:" <+> pretty known + -- let ok = isJust known && Just (fromIntegral fs) >= known + -- unless ok $ exit False + + for_ ncqStateIndex $ \(_,fk) -> do + + let idxFile = ncqGetFileName me (IndexFile fk) + here <- doesFileExist idxFile + + unless here do + err $ red "missed index in state" <+> pretty idxFile + exit False + + pure True + + ncqStorageRun :: forall m . MonadUnliftIO m => NCQStorage -> m () ncqStorageRun ncq@NCQStorage{..} = withSem ncqRunSem $ flip runContT pure do + + debug "ncqStorageRun" + + liftIO (FL.tryLockFile (ncqGetFileName ncq ".lock") Exclusive) + >>= orThrow NCQStorageCurrentAlreadyOpen + >>= atomically . writeTVar ncqFileLock . Just + ContT $ bracket setAlive (const unsetAlive) ContT $ bracket none $ const $ liftIO do @@ -41,6 +156,8 @@ ncqStorageRun ncq@NCQStorage{..} = withSem ncqRunSem $ flip runContT pure do ContT $ bracket none $ const $ liftIO do debug "storage done" + liftIO (ncqTryLoadState ncq) + closeQ <- liftIO newTQueueIO closer <- spawnActivity $ liftIO $ fix \loop -> do @@ -86,7 +203,7 @@ ncqStorageRun ncq@NCQStorage{..} = withSem ncqRunSem $ flip runContT pure do -- debug $ "NOT FOUND SHIT" <+> pretty h answer Nothing >> exit () - -- spawnActivity measureWPS + spawnActivity measureWPS spawnActivity (ncqStateUpdateLoop ncq) @@ -95,29 +212,17 @@ ncqStorageRun ncq@NCQStorage{..} = withSem ncqRunSem $ flip runContT pure do ema <- readTVarIO ncqWriteEMA debug $ "EMA" <+> pretty (realToFrac @_ @(Fixed E3) ema) - spawnActivity $ postponed 30 $ forever do - lsInit <- ncqLiveKeys ncq <&> HS.size - void $ race (pause @'Seconds 30) do - flip fix lsInit $ \next ls0 -> do - (lsA,lsB) <- atomically do - ema <- readTVar ncqWriteEMA - ls1 <- ncqLiveKeysSTM ncq <&> HS.size + spawnActivity $ postponed ncqPostponeService $ forever do + ncqSweepObsoleteStates ncq + ncqSweepFiles ncq + void $ race (pause @'Seconds ncqSweepTime) do + atomically (ncqWaitFlagSTM ncqSweepReq) - if ls1 /= ls0 && ema < ncqIdleThrsh then - pure (ls0,ls1) - else - STM.retry - - debug $ "do sweep" <+> pretty lsA <+> pretty lsB - ncqSweepObsoleteStates ncq - ncqSweepFiles ncq - next lsB - - spawnActivity $ postponed 20 $ compactLoop 10 30 do - ncqIndexCompactStep ncq - - spawnActivity $ postponed 20 $ compactLoop 10 60 do - ncqFossilMergeStep ncq + spawnActivity $ postponed ncqPostponeService + $ compactLoop ncqMergeReq ncqMergeTimeA ncqMergeTimeB $ withSem ncqServiceSem do + a <- ncqFossilMergeStep ncq + b <- ncqIndexCompactStep ncq + pure $ a || b flip fix RunNew $ \loop -> \case RunFin -> do @@ -255,12 +360,18 @@ ncqStorageRun ncq@NCQStorage{..} = withSem ncqRunSem $ flip runContT pure do postponed n m = liftIO (pause @'Seconds n) >> m - compactLoop :: Timeout 'Seconds -> Timeout 'Seconds -> m Bool -> m () - compactLoop t1 t2 what = forever $ void $ runMaybeT do - ema <- readTVarIO ncqWriteEMA + compactLoop :: TVar Bool + -> Timeout 'Seconds + -> Timeout 'Seconds + -> m Bool + -> m () + compactLoop flag t1 t2 what = forever $ void $ runMaybeT do + ema <- readTVarIO ncqWriteEMA + fired <- ncqGetFlag flag - when (ema > ncqIdleThrsh) $ pause @'Seconds t1 >> mzero + when (ema > ncqIdleThrsh && not fired) $ pause @'Seconds t1 >> mzero + ncqClearFlag flag compacted <- lift what when compacted mzero diff --git a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/State.hs b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/State.hs index 4bd9ea8a..940f21e7 100644 --- a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/State.hs +++ b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/State.hs @@ -108,7 +108,7 @@ ncqStateDelIndexFile fk = do where f (_,b) = b /= fk sortIndexes :: NCQState -> NCQState -sortIndexes = over #ncqStateIndex (List.sortOn fst) +sortIndexes = over #ncqStateIndex sortIndexes0 ncqStateCapture :: forall m . MonadUnliftIO m diff --git a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Sweep.hs b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Sweep.hs index 8c71c031..fd537231 100644 --- a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Sweep.hs +++ b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Sweep.hs @@ -4,13 +4,15 @@ module HBS2.Storage.NCQ3.Internal.Sweep where import HBS2.Storage.NCQ3.Internal.Prelude import HBS2.Storage.NCQ3.Internal.Types import HBS2.Storage.NCQ3.Internal.Files +import HBS2.Storage.NCQ3.Internal.State -import Data.Generics.Uniplate.Operations +import Control.Monad.Trans.Cont import Data.Generics.Uniplate.Data() -import Data.List qualified as List -import Data.HashSet qualified as HS -import System.Posix.Files qualified as PFS +import Data.Generics.Uniplate.Operations import Data.HashMap.Strict qualified as HM +import Data.HashSet qualified as HS +import Data.List qualified as List +import System.Posix.Files qualified as PFS ncqLiveKeysSTM :: NCQStorage -> STM (HashSet FileKey) ncqLiveKeysSTM NCQStorage{..} = do @@ -24,19 +26,26 @@ ncqLiveKeysSTM NCQStorage{..} = do ncqLiveKeys :: forall m . MonadIO m => NCQStorage -> m (HashSet FileKey) ncqLiveKeys ncq = atomically $ ncqLiveKeysSTM ncq +{- HLINT ignore "Functor law"-} + ncqSweepFiles :: forall m . MonadUnliftIO m => NCQStorage -> m () -ncqSweepFiles me@NCQStorage{..} = withSem ncqServiceSem do +ncqSweepFiles me@NCQStorage{..} = do debug "ncqSweepFiles" - live <- ncqLiveKeys me - - - debug $ "ALIVE" <+> pretty (HS.toList live) - fossils <- ncqListFilesBy me (List.isPrefixOf "f-") indexes <- ncqListFilesBy me (List.isPrefixOf "i-") + stateFiles <- ncqListFilesBy me (List.isPrefixOf "s-") <&> fmap snd + + liveOnDisk <- for stateFiles (readStateMay me) + <&> mconcat . catMaybes + <&> HS.fromList . universeBi @_ @FileKey + + live <- ncqLiveKeys me <&> (<> liveOnDisk) + + debug $ "ALIVE" <+> pretty (HS.toList live) + for_ indexes $ \(_, k) -> unless (HS.member k live) do let fn = ncqGetFileName me (IndexFile k) debug $ yellow "REMOVING" <+> pretty (takeFileName fn) @@ -49,14 +58,18 @@ ncqSweepFiles me@NCQStorage{..} = withSem ncqServiceSem do ncqSweepObsoleteStates :: forall m . MonadUnliftIO m => NCQStorage -> m () -ncqSweepObsoleteStates me@NCQStorage{..} = withSem ncqServiceSem do +ncqSweepObsoleteStates me@NCQStorage{..} = flip runContT pure $ callCC \exit -> do debug $ "ncqSweepObsoleteStates" k <- readTVarIO ncqStateKey + when (k == ncqNullStateKey) $ exit () + r <- liftIO $ try @_ @SomeException do ts <- PFS.getFileStatus (ncqGetFileName me (StateFile k)) <&> PFS.modificationTimeHiRes + filez <- ncqListFilesBy me (List.isPrefixOf "s-") + <&> List.drop 1 . List.sortOn (Down . snd) -- delete old 10 states for_ filez $ \(t,f) -> do diff --git a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Types.hs b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Types.hs index c5dcd456..98105e89 100644 --- a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Types.hs +++ b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Types.hs @@ -7,6 +7,7 @@ import Numeric (readHex) import Data.Data import Data.Set qualified as Set import Data.HashSet qualified as HS +import Data.List qualified as List import Text.Printf import Control.Concurrent.STM.TSem (TSem,waitTSem,signalTSem) import System.FileLock (FileLock) @@ -83,42 +84,47 @@ data NCQState = data NCQStorage = NCQStorage - { ncqRoot :: FilePath - , ncqGen :: Int - , ncqSalt :: HashRef - , ncqPostponeMerge :: Timeout 'Seconds - , ncqPostponeSweep :: Timeout 'Seconds - , ncqFsync :: Int - , ncqWriteQLen :: Int - , ncqWriteBlock :: Int - , ncqMinLog :: Int - , ncqMaxLog :: Int - , ncqMaxCachedIndex :: Int - , ncqMaxCachedData :: Int - , ncqReadThreads :: Int - , ncqIdleThrsh :: Double - , ncqMMapCachedIdx :: TVar (HashPSQ FileKey CachePrio CachedIndex) - , ncqMMapCachedData :: TVar (HashPSQ FileKey CachePrio CachedData) - , ncqMemTable :: Vector Shard - , ncqState :: TVar NCQState - , ncqStateKey :: TVar FileKey - , ncqStateUse :: TVar (HashMap FileKey (NCQState, TVar Int)) - , ncqCurrentFossils :: TVar (HashSet FileKey) - , ncqWrites :: TVar Int - , ncqWriteEMA :: TVar Double -- for writes-per-seconds - , ncqWriteQ :: TVar (Seq HashRef) - , ncqWriteOps :: Vector (TQueue (IO ())) - , ncqSyncOps :: TQueue (IO ()) - , ncqReadReq :: TQueue (HashRef, TMVar (Maybe Location)) - , ncqAlive :: TVar Bool - , ncqStopReq :: TVar Bool - , ncqSyncReq :: TVar Bool - , ncqOnRunWriteIdle :: TVar (IO ()) - , ncqSyncNo :: TVar Int - , ncqServiceSem :: TSem - , ncqRunSem :: TSem - , ncqFileLock :: TVar (Maybe FileLock) + { ncqRoot :: FilePath + , ncqGen :: Int + , ncqSalt :: HashRef + , ncqPostponeService :: Timeout 'Seconds + , ncqSweepTime :: Timeout 'Seconds + , ncqMergeTimeA :: Timeout 'Seconds + , ncqMergeTimeB :: Timeout 'Seconds + , ncqFsync :: Int + , ncqWriteQLen :: Int + , ncqWriteBlock :: Int + , ncqMinLog :: Int + , ncqMaxLog :: Int + , ncqMaxCachedIndex :: Int + , ncqMaxCachedData :: Int + , ncqReadThreads :: Int + , ncqIdleThrsh :: Double + , ncqMMapCachedIdx :: TVar (HashPSQ FileKey CachePrio CachedIndex) + , ncqMMapCachedData :: TVar (HashPSQ FileKey CachePrio CachedData) + , ncqMemTable :: Vector Shard + , ncqState :: TVar NCQState + , ncqStateKey :: TVar FileKey + , ncqStateUse :: TVar (HashMap FileKey (NCQState, TVar Int)) + , ncqCurrentFossils :: TVar (HashSet FileKey) + , ncqWrites :: TVar Int + , ncqWriteEMA :: TVar Double -- for writes-per-seconds + , ncqWriteQ :: TVar (Seq HashRef) + , ncqWriteOps :: Vector (TQueue (IO ())) + , ncqSyncOps :: TQueue (IO ()) + , ncqReadReq :: TQueue (HashRef, TMVar (Maybe Location)) + , ncqAlive :: TVar Bool + , ncqStopReq :: TVar Bool + , ncqSyncReq :: TVar Bool + , ncqSweepReq :: TVar Bool + , ncqMergeReq :: TVar Bool + , ncqOnRunWriteIdle :: TVar (IO ()) + , ncqSyncNo :: TVar Int + , ncqServiceSem :: TSem + , ncqRunSem :: TSem + , ncqFileLock :: TVar (Maybe FileLock) } + deriving stock (Generic) instance Monoid FileKey where @@ -147,7 +153,7 @@ instance Semigroup NCQState where (<>) a b = NCQState files index seqq version facts where files = ncqStateFiles a <> ncqStateFiles b - index = ncqStateIndex a <> ncqStateIndex b + index = sortIndexes0 (ncqStateIndex a <> ncqStateIndex b) seqq = max (ncqStateFileSeq a) (ncqStateFileSeq b) version = max (ncqStateVersion a) (ncqStateVersion b) facts = ncqStateFacts a <> ncqStateFacts b @@ -208,6 +214,8 @@ instance Pretty NCQState where pf (P (PData (DataFile a) s)) = "fp" <+> pretty a <+> pretty s +sortIndexes0 :: [(Down POSIXTime, b)] -> [(Down POSIXTime, b)] +sortIndexes0 = List.sortOn fst ncqTombEntrySize :: NCQSize ncqTombEntrySize = ncqSLen + ncqKeyLen + ncqPrefixLen @@ -226,5 +234,6 @@ ncqDeferredWriteOpSTM NCQStorage{..} work = do logErr :: forall x a m . (Pretty x, MonadUnliftIO m) => x -> m a -> m a logErr loc m = handle (\(e::SomeException) -> err (pretty loc <> ":" <> viaShow e) >> throwIO e) m - +ncqNullStateKey :: FileKey +ncqNullStateKey = FileKey maxBound diff --git a/hbs2-tests/test/NCQ3.hs b/hbs2-tests/test/NCQ3.hs index 04dcff10..e0386167 100644 --- a/hbs2-tests/test/NCQ3.hs +++ b/hbs2-tests/test/NCQ3.hs @@ -868,6 +868,27 @@ ncq3Tests = do notice "re-opened storage test done" + entry $ bindMatch "test:ncq3:wrong-state" $ nil_ $ \e -> do + g <- liftIO MWC.createSystemRandom + let (opts,args) = splitOpts [] e + let path = headDef "." [x | StringLike x <- args ] + notice $ "root path" <+> pretty path + + let params = set #ncqPostponeService 1 + + ncqWithStorage0 path params $ \sto -> do + + void $ race (pause @'Seconds 600) $ forever do + p <- liftIO $ uniformRM (0,3.00) g + pause @'Seconds (realToFrac p) + n <- liftIO $ uniformRM (1,256*1024) g + s <- liftIO $ genRandomBS g n + h <- putBlock (AnyStorage sto) (LBS.fromStrict s) + debug $ "block written" <+> pretty h <+> pretty n + + none + + ncq3EnduranceTest ncq3EnduranceTestInProc diff --git a/hbs2-tests/test/NCQ3/Endurance.hs b/hbs2-tests/test/NCQ3/Endurance.hs index b8615396..9dbb345e 100644 --- a/hbs2-tests/test/NCQ3/Endurance.hs +++ b/hbs2-tests/test/NCQ3/Endurance.hs @@ -83,12 +83,16 @@ data EnduranceFSM = | EndurancePutBlk | EnduranceHasBlk | EnduranceGetBlk + | EnduranceHasSeedBlk | EnduranceDelBlk | EndurancePutRef | EnduranceGetRef | EnduranceDelRef | EnduranceStorm + | EnduranceCalm | EnduranceKill + | EnduranceMerge + | EnduranceSweep | EnduranceStop buildCDF :: [(s, Double)] -> (V.Vector s, U.Vector Double) @@ -157,6 +161,11 @@ validateTestResult logFile = do atomically $ modifyTVar blocks (HM.insert h (Left ())) _ -> none + entry $ bindMatch "has-seed-block-result" $ nil_ \case + [ HashLike _, LitIntVal _ ] -> none + [ HashLike h] -> err $ red "missed seed block" <+> pretty h + _ -> none + -- has-block-result entry $ bindMatch "has-block-result" $ nil_ \case [ HashLike h, LitIntVal n ] -> do @@ -255,18 +264,25 @@ ncq3EnduranceTest = do LitIntVal x -> fromIntegral x _ -> 0 - wIdle <- dbl <$> lookupValueDef (mkDouble 200.00) "w:idle" - wIdleDef <- dbl <$> lookupValueDef (mkDouble 0.25) "w:idle:def" - wPutBlk <- dbl <$> lookupValueDef (mkDouble 30.00) "w:putblk" - wGetBlk <- dbl <$> lookupValueDef (mkDouble 30.00) "w:getblk" - wHasBlk <- dbl <$> lookupValueDef (mkDouble 30.00) "w:hasblk" - wDelBlk <- dbl <$> lookupValueDef (mkDouble 3.00) "w:delblk" - wPutRef <- dbl <$> lookupValueDef (mkDouble 5.00) "w:putref" - wGetRef <- dbl <$> lookupValueDef (mkDouble 10.00) "w:getref" - wDelRef <- dbl <$> lookupValueDef (mkDouble 1.00) "w:delref" - wStorm <- dbl <$> lookupValueDef (mkDouble 0.80) "w:storm" - wKill <- dbl <$> lookupValueDef (mkDouble 0.0004) "w:kill" - wNum <- int <$> lookupValueDef (mkInt 10000) "w:num" + wSeed <- int <$> lookupValueDef (mkInt 1000) "w:seed" + wIdle <- dbl <$> lookupValueDef (mkDouble 200.00) "w:idle" + wIdleDef <- dbl <$> lookupValueDef (mkDouble 0.25) "w:idle:def" + wMaxBlk <- int <$> lookupValueDef (mkInt 65536) "w:maxblk" + wPutBlk <- dbl <$> lookupValueDef (mkDouble 30.00) "w:putblk" + wGetBlk <- dbl <$> lookupValueDef (mkDouble 30.00) "w:getblk" + wHasBlk <- dbl <$> lookupValueDef (mkDouble 30.00) "w:hasblk" + wDelBlk <- dbl <$> lookupValueDef (mkDouble 3.00) "w:delblk" + wPutRef <- dbl <$> lookupValueDef (mkDouble 5.00) "w:putref" + wGetRef <- dbl <$> lookupValueDef (mkDouble 10.00) "w:getref" + wDelRef <- dbl <$> lookupValueDef (mkDouble 1.00) "w:delref" + wStorm <- dbl <$> lookupValueDef (mkDouble 0.80) "w:storm" + wStormMin <- dbl <$> lookupValueDef (mkDouble 1.00) "w:stormmin" + wStormMax <- dbl <$> lookupValueDef (mkDouble 60.00) "w:stormmax" + wCalm <- dbl <$> lookupValueDef (mkDouble 0.001) "w:calm" + wKill <- dbl <$> lookupValueDef (mkDouble 0.00) "w:kill" + wMerge <- dbl <$> lookupValueDef (mkDouble 0.001) "w:merge" + wSweep <- dbl <$> lookupValueDef (mkDouble 0.001) "w:sweep" + wNum <- int <$> lookupValueDef (mkInt 10000) "w:num" runTest \TestEnv{..} -> do @@ -278,10 +294,12 @@ ncq3EnduranceTest = do rest <- newTVarIO n blocks <- newTVarIO ( HPSQ.empty :: HPSQ.HashPSQ HashRef Double () ) + seed <- newTVarIO ( HPSQ.empty :: HPSQ.HashPSQ HashRef Double () ) refs <- newTVarIO ( HPSQ.empty :: HPSQ.HashPSQ HashRef Double HashRef ) killed <- newTVarIO 0 let getRandomBlock = liftIO $ getRandomFromPSQ g blocks + let getRandomSeedBlock = liftIO $ getRandomFromPSQ g seed let getRandomRef = liftIO $ getRandomFromPSQ g refs let d = makeDict do @@ -321,12 +339,16 @@ ncq3EnduranceTest = do let actions = [ (EnduranceIdle, wIdle) , (EndurancePutBlk, wPutBlk) , (EnduranceGetBlk, wGetBlk) + , (EnduranceHasSeedBlk, wHasBlk) , (EnduranceHasBlk, wHasBlk) , (EnduranceDelBlk, wDelBlk) , (EndurancePutRef, wPutRef) , (EnduranceGetRef, wGetRef) , (EnduranceDelRef, wDelRef) , (EnduranceStorm, wStorm) + , (EnduranceCalm, wCalm) + , (EnduranceMerge, wMerge) + , (EnduranceSweep, wSweep) , (EnduranceKill, wKill) ] @@ -339,6 +361,21 @@ ncq3EnduranceTest = do , "test:ncq3:endurance:inner", testEnvDir ] & setStdin createPipe & setStdout createPipe + ncqWithStorage testEnvDir $ \sto -> do + replicateM_ wSeed do + n <- liftIO $ uniformRM (1, wMaxBlk) g + bs <- liftIO $ LBS.fromStrict <$> genRandomBS g n + putBlock (AnyStorage sto) bs >>= \case + Just h -> atomically $ modifyTVar seed (HPSQ.insert (HashRef h) 1.0 ()) + Nothing -> err $ red "can't write seed block" + + ncqWithStorage testEnvDir $ \sto -> do + seeds <- readTVarIO seed <&> HPSQ.toList + for_ seeds $ \(h,_,_) -> do + here <- hasBlock (AnyStorage sto) (coerce h) + unless (isJust here) do + err $ "missed seed block" <+> pretty h + fix \recover -> handle (\(e :: IOException) -> err (viaShow e) >> pause @'Seconds 1 >> recover) do flip runContT pure do @@ -354,7 +391,7 @@ ncq3EnduranceTest = do pread <- ContT $ withAsync $ fix \loop -> do liftIO (try @_ @IOException (IO.hGetLine outp)) >>= \case Left e | isEOFError e -> none - Left e -> err (viaShow e) + Left e -> err (viaShow e) >> throwIO e Right s -> do liftIO do appendFile logFile (s <> "\n") @@ -362,6 +399,8 @@ ncq3EnduranceTest = do putStrLn s loop + link pread + ContT $ withAsync $ forever do join $ atomically (readTQueue storms) @@ -403,7 +442,7 @@ ncq3EnduranceTest = do getNextState >>= loop EndurancePutBlk -> do - bsize <- liftIO $ uniformRM (1, 256*1024) g + bsize <- liftIO $ uniformRM (1, wMaxBlk) g liftIO $ IO.hPrint inp ("write-random-block" <+> viaShow bsize) atomically $ modifyTVar rest pred getNextState >>= loop @@ -422,6 +461,13 @@ ncq3EnduranceTest = do getNextState >>= loop + EnduranceHasSeedBlk -> do + blk <- getRandomSeedBlock + for_ blk $ \h -> do + liftIO $ IO.hPrint inp ("has-seed-block" <+> pretty h) + + getNextState >>= loop + EnduranceGetBlk -> do blk <- getRandomBlock for_ blk $ \h -> do @@ -448,6 +494,14 @@ ncq3EnduranceTest = do liftIO $ IO.hPrint inp ("del-ref" <+> pretty h) getNextState >>= loop + EnduranceMerge -> do + liftIO $ IO.hPrint inp "merge" + getNextState >>= loop + + EnduranceSweep -> do + liftIO $ IO.hPrint inp "sweep" + getNextState >>= loop + EnduranceKill -> do debug $ red "KILL" <+> viaShow pid cancel pread @@ -467,6 +521,12 @@ ncq3EnduranceTest = do notice $ "validate" <+> pretty logFile liftIO $ validateTestResult logFile + EnduranceCalm -> do + n <- liftIO $ uniformRM (0.5,10.00) g + debug $ "CALM" <+> pretty n + pause @'Seconds (realToFrac n) + getNextState >>= loop + EnduranceStorm -> do now <- getTimeCoarse @@ -482,7 +542,7 @@ ncq3EnduranceTest = do loop EnduranceIdle | otherwise -> do - t0 <- liftIO $ uniformRM (0,10.00) g + t0 <- liftIO $ uniformRM (wStormMin,wStormMax) g debug $ red "FIRE IN DA HOLE!" <+> pretty t0 atomically $ writeTQueue storms do atomically $ writeTVar idleTime 0 @@ -522,7 +582,7 @@ testEnduranceInner path = flip runContT pure $ callCC \exit -> do Right _ -> none where - dict g sto = makeDict @c @m do + dict g sto@NCQStorage{..} = makeDict @c @m do entry $ bindMatch "exit" $ const do pure $ mkSym "done" @@ -542,6 +602,13 @@ testEnduranceInner path = flip runContT pure $ callCC \exit -> do e -> throwIO (BadFormException @c (mkList e)) + entry $ bindMatch "has-seed-block" $ nil_ \case + [ HashLike h ] -> do + s <- hasBlock (AnyStorage sto) (coerce h) + liftIO $ print $ "has-seed-block-result" <+> pretty h <+> pretty s + + e -> throwIO (BadFormException @c (mkList e)) + entry $ bindMatch "get-block" $ nil_ \case [ HashLike h ] -> do s <- getBlock (AnyStorage sto) (coerce h) @@ -578,4 +645,12 @@ testEnduranceInner path = flip runContT pure $ callCC \exit -> do e -> throwIO (BadFormException @c (mkList e)) + entry $ bindMatch "merge" $ nil_ $ const do + ncqSetFlag ncqMergeReq + liftIO $ print $ "merge" + + entry $ bindMatch "sweep" $ nil_ $ const do + ncqSetFlag ncqSweepReq + liftIO $ print $ "sweep" +