diff --git a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal.hs b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal.hs index e609aa40..99003b54 100644 --- a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal.hs +++ b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal.hs @@ -73,7 +73,8 @@ ncqStorageOpen3 fp upd = do ncqOnRunWriteIdle <- newTVarIO none ncqSyncNo <- newTVarIO 0 ncqState <- newTVarIO mempty - ncqStateKey <- newTVarIO mempty + ncqStateKey <- newTVarIO (FileKey maxBound) + ncqStateUse <- newTVarIO mempty ncqServiceSem <- atomically $ newTSem 1 let ncq = NCQStorage3{..} & upd diff --git a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Index.hs b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Index.hs index 685f6bed..da3f01d3 100644 --- a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Index.hs +++ b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Index.hs @@ -90,7 +90,6 @@ ncqIndexFile n fk = runMaybeT do ncqStateUpdate n do ncqStateAddIndexFile ts fki ncqStateAddDataFile (coerce fk) - ncqStateAddFact (FI fk (IndexFile fki)) ncqStateDelFact (P (PData fk 0)) (bs,nw) <- toMPlus midx @@ -111,6 +110,8 @@ ncqIndexCompactStep :: MonadUnliftIO m -> m Bool ncqIndexCompactStep me@NCQStorage3{..} = flip runContT pure $ callCC \exit -> do + debug "ncqIndexCompactStep" + idx <- readTVarIO ncqState <&> fmap (IndexFile . snd) . ncqStateIndex @@ -150,6 +151,7 @@ ncqIndexCompactStep me@NCQStorage3{..} = flip runContT pure $ callCC \exit -> do fki <- ncqGetNewFileKey me IndexFile mv result (ncqGetFileName me (IndexFile fki)) + debug $ "state update" <+> pretty a <+> pretty b <+> "=>" <+> pretty fki ncqStateUpdate me do ncqStateDelIndexFile (coerce a) ncqStateDelIndexFile (coerce b) diff --git a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/MMapCache.hs b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/MMapCache.hs index b3cab583..0ba2e8ae 100644 --- a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/MMapCache.hs +++ b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/MMapCache.hs @@ -52,20 +52,17 @@ ncqGetCachedIndex ncq@NCQStorage3{..} = Nothing -> throwIO $ NCQStorageCantMapFile path Just (bs, nway) -> pure (CachedIndex bs nway) -ncqDelCachedIndex :: forall m . MonadUnliftIO m - => NCQStorage3 - -> FileKey - -> m () +ncqDelCachedIndexSTM :: NCQStorage3 + -> FileKey + -> STM () -ncqDelCachedIndex NCQStorage3{..} fk = - atomically (modifyTVar ncqMMapCachedIdx$ HPSQ.delete fk) +ncqDelCachedIndexSTM NCQStorage3{..} fk = + modifyTVar ncqMMapCachedIdx$ HPSQ.delete fk +ncqDelCachedDataSTM :: NCQStorage3 + -> FileKey + -> STM () -ncqDelCachedData :: forall m . MonadUnliftIO m - => NCQStorage3 - -> FileKey - -> m () - -ncqDelCachedData NCQStorage3{..} fk = - atomically (modifyTVar ncqMMapCachedData $ HPSQ.delete fk) +ncqDelCachedDataSTM NCQStorage3{..} fk = + modifyTVar ncqMMapCachedData $ HPSQ.delete fk diff --git a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Run.hs b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Run.hs index e62d0711..98b8713b 100644 --- a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Run.hs +++ b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Run.hs @@ -13,6 +13,7 @@ import HBS2.Storage.NCQ3.Internal.MMapCache import Control.Monad.Trans.Cont +import Control.Monad.Trans.Maybe import Network.ByteOrder qualified as N import Data.HashSet qualified as HS import Data.HashPSQ qualified as PSQ @@ -20,6 +21,7 @@ import Data.Vector qualified as V import Data.HashMap.Strict qualified as HM import Data.ByteString qualified as BS import Data.Sequence qualified as Seq +import Data.Fixed import System.FilePath.Posix import System.Posix.Files qualified as Posix import System.Posix.IO as PosixBase @@ -66,7 +68,7 @@ ncqStorageRun3 ncq@NCQStorage3{..} = flip runContT pure do let q = ncqWriteOps ! i forever (liftIO $ join $ atomically (readTQueue q)) - replicateM_ 2 $ spawnActivity $ fix \next -> do + replicateM_ 2 $ spawnActivity $ forever $ flip runContT pure $ callCC \exit -> do (h, answ) <- atomically $ readTQueue ncqReadReq let answer l = atomically (putTMVar answ l) @@ -75,26 +77,62 @@ ncqStorageRun3 ncq@NCQStorage3{..} = flip runContT pure do atomically (ncqLookupEntrySTM ncq h) >>= \case Nothing -> none - Just e -> answer (Just (InMemory (ncqEntryData e))) >> next + Just e -> answer (Just (InMemory (ncqEntryData e))) >> exit () + + ContT $ ncqWithState ncq NCQState{..} <- readTVarIO ncqState for_ ncqStateIndex $ \(_, fk) -> do - CachedIndex bs nw <- ncqGetCachedIndex ncq fk - ncqLookupIndex h (bs, nw) >>= \case - Just (IndexEntry fk o s) -> answer (Just (InFossil fk o s)) >> next + CachedIndex bs nw <- lift $ ncqGetCachedIndex ncq fk + lift (ncqLookupIndex h (bs, nw)) >>= \case + Just (IndexEntry fk o s) -> answer (Just (InFossil fk o s)) >> exit () Nothing -> none -- debug $ "NOT FOUND SHIT" <+> pretty h - answer Nothing >> next + answer Nothing >> exit () spawnActivity measureWPS - spawnActivity $ forever do - withSem ncqServiceSem (ncqSweepObsoleteStates ncq) - pause @'Seconds 10 + spawnActivity $ postponed 10 $ forever do - spawnActivity (ncqSweepLoop ncq) + ema <- readTVarIO ncqWriteEMA + + when ( ema < ncqIdleThrsh ) do + ncqSweepObsoleteStates ncq + + -- FIXME: timeout-hardcode + pause @'Seconds 60 + + spawnActivity $ forever do + pause @'Seconds 30 + ema <- readTVarIO ncqWriteEMA + debug $ "EMA" <+> pretty (realToFrac @_ @(Fixed E3) ema) + + spawnActivity $ postponed 10 $ forever do + ema <- readTVarIO ncqWriteEMA + + when ( ema < ncqIdleThrsh ) do + ncqSweepFiles ncq + + -- FIXME: timeout-hardcode + pause @'Seconds 60 + + spawnActivity $ postponed 10 $ forever $ void $ runMaybeT do + ema <- readTVarIO ncqWriteEMA + + when (ema > ncqIdleThrsh) $ pause @'Seconds 10 >> mzero + + compacted <- lift $ ncqIndexCompactStep ncq + + when compacted mzero + + k0 <- readTVarIO ncqStateKey + void $ lift $ race (pause @'Seconds 600) do + flip fix k0 $ \waitState k1 -> do + pause @'Seconds 60 + k2 <- readTVarIO ncqStateKey + when (k2 == k1) $ waitState k2 flip fix RunNew $ \loop -> \case RunFin -> do @@ -216,6 +254,8 @@ ncqStorageRun3 ncq@NCQStorage3{..} = flip runContT pure do alpha = 0.1 step = 1.00 + postponed n m = liftIO (pause @'Seconds n) >> m + data RunSt = RunNew | RunWrite (FileKey, Fd, Int, Int) diff --git a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/State.hs b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/State.hs index 0b92adbb..fb663c6d 100644 --- a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/State.hs +++ b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/State.hs @@ -4,6 +4,7 @@ module HBS2.Storage.NCQ3.Internal.State where import HBS2.Storage.NCQ3.Internal.Prelude import HBS2.Storage.NCQ3.Internal.Types import HBS2.Storage.NCQ3.Internal.Files +import HBS2.Storage.NCQ3.Internal.MMapCache import Data.Config.Suckless.Script @@ -14,6 +15,7 @@ import Control.Monad.Reader import Control.Monad.Trans.Maybe import Control.Monad.Trans.Cont import Data.HashSet qualified as HS +import Data.HashMap.Strict qualified as HM import Data.Set qualified as Set import Data.ByteString qualified as BS import UnliftIO.IO.File @@ -38,6 +40,7 @@ ncqStateUpdate ncq@NCQStorage3{..} action = do s1 <- atomically do void $ runReaderT (fromStateOp action) ncq + modifyTVar ncqWrites succ readTVar ncqState unless (s1 == s0) do @@ -45,7 +48,7 @@ ncqStateUpdate ncq@NCQStorage3{..} action = do let snkFile = ncqGetFileName ncq (StateFile key) liftIO $ withBinaryFileDurableAtomic snkFile WriteMode $ \fh -> do IO.hPrint fh (pretty s1) - atomically $ writeTVar ncqStateKey (Just key) + atomically $ writeTVar ncqStateKey key ncqStateAddDataFile :: FileKey -> StateOP () ncqStateAddDataFile fk = do @@ -53,6 +56,13 @@ ncqStateAddDataFile fk = do StateOP $ lift do modifyTVar ncqState (over #ncqStateFiles (HS.insert fk)) +ncqStateDelDataFile :: FileKey -> StateOP () +ncqStateDelDataFile fk = do + sto@NCQStorage3{..} <- ask + StateOP $ lift do + modifyTVar ncqState (over #ncqStateFiles (HS.delete fk)) + ncqDelCachedDataSTM sto fk + ncqStateAddFact :: Fact -> StateOP () ncqStateAddFact fact = do NCQStorage3{..} <- ask @@ -75,8 +85,11 @@ ncqStateAddIndexFile ts fk = do ncqStateDelIndexFile :: FileKey -> StateOP () ncqStateDelIndexFile fk = do - NCQStorage3{..} <- ask - StateOP $ lift $ modifyTVar' ncqState (over #ncqStateIndex $ filter f) + sto@NCQStorage3{..} <- ask + StateOP $ lift do + modifyTVar' ncqState (over #ncqStateIndex $ filter f) + ncqDelCachedIndexSTM sto fk + where f (_,b) = b /= fk sortIndexes :: NCQState -> NCQState @@ -94,6 +107,41 @@ ncqFileFastCheck fp = do unless ( BS.length mmaped == fromIntegral s ) do throwIO $ NCQFsckIssueExt (FsckInvalidFileSize (fromIntegral s)) +ncqStateCapture :: forall m . MonadUnliftIO m + => NCQStorage3 + -> m FileKey + +ncqStateCapture me@NCQStorage3{..} = do + atomically do + key <- readTVar ncqStateKey + stateUse <- readTVar ncqStateUse + case HM.lookup key stateUse of + Just (_, tv) -> modifyTVar tv succ + Nothing -> do + state <- readTVar ncqState + new <- (state,) <$> newTVar 1 + modifyTVar ncqStateUse (HM.insert key new) + pure key + +ncqStateDismiss :: forall m . MonadUnliftIO m + => NCQStorage3 + -> FileKey + -> m () +ncqStateDismiss me@NCQStorage3{..} key = atomically do + useMap <- readTVar ncqStateUse + case HM.lookup key useMap of + Nothing -> pure () + Just (_, tv) -> do + modifyTVar tv (max 0 . pred) + cnt <- readTVar tv + when (cnt <= 0) do + modifyTVar ncqStateUse (HM.delete key) + +ncqWithState :: forall a m . MonadUnliftIO m + => NCQStorage3 + -> ( FileKey -> m a ) + -> m a +ncqWithState sto = bracket (ncqStateCapture sto) (ncqStateDismiss sto) readStateMay :: forall m . MonadUnliftIO m => NCQStorage3 @@ -117,9 +165,6 @@ readStateMay sto key = fmap sortIndexes <$> do ListVal [SymbolVal "f", LitIntVal n] -> ncqState0 { ncqStateFiles = HS.singleton (fromIntegral n) } - ListVal [SymbolVal "fi", LitIntVal a, LitIntVal b] -> - ncqState0 { ncqStateFacts = Set.singleton (FI (DataFile (fromIntegral a)) (IndexFile (fromIntegral b))) } - ListVal [SymbolVal "fp", LitIntVal a, LitIntVal s] -> ncqState0 { ncqStateFacts = Set.singleton (P (PData (DataFile $ fromIntegral a) (fromIntegral s))) } diff --git a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Sweep.hs b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Sweep.hs index a733a782..3c88a72b 100644 --- a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Sweep.hs +++ b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Sweep.hs @@ -13,38 +13,46 @@ import Data.List qualified as List import Data.HashSet qualified as HS import System.Posix.Files qualified as PFS import Control.Monad.Trans.Maybe +import Data.HashMap.Strict qualified as HM -data SweepSt = SweepWaitIdle - | SweepCheckEMA SweepSt - | SweepSomething +ncqLiveKeys :: forall m . MonadUnliftIO m => NCQStorage3 -> m (HashSet FileKey) +ncqLiveKeys NCQStorage3{..} = do + + merged <- atomically do + s0 <- readTVar ncqState + readTVar ncqStateUse <&> (s0<>) . foldMap fst . HM.elems + + pure $ HS.fromList $ universeBi @_ @FileKey merged + +ncqSweepFiles :: forall m . MonadUnliftIO m => NCQStorage3 -> m () +ncqSweepFiles me@NCQStorage3{..} = withSem ncqServiceSem do + + debug "ncqSweepFiles" + + live <- ncqLiveKeys me -ncqSweepLoop :: MonadUnliftIO m => NCQStorage3 -> m () -ncqSweepLoop me@NCQStorage3{..} = flip fix SweepWaitIdle $ \next -> \case + debug $ "ALIVE" <+> pretty (HS.toList live) - SweepWaitIdle -> do - debug "SweepWaitIdle" - pause @'Seconds 10 - next (SweepCheckEMA SweepSomething) + fossils <- ncqListFilesBy me (List.isPrefixOf "f-") + indexes <- ncqListFilesBy me (List.isPrefixOf "i-") - SweepCheckEMA who -> do - ema <- readTVarIO ncqWriteEMA - debug $ "SweepCheckEMA" <+> pretty ema - if ema < ncqIdleThrsh then do - next who - else - next SweepWaitIdle + for_ indexes $ \(_, k) -> unless (HS.member k live) do + let fn = ncqGetFileName me (IndexFile k) + debug $ yellow "REMOVING" <+> pretty (takeFileName fn) + rm fn + + for_ fossils $ \(_, k) -> unless (HS.member k live) do + let fn = ncqGetFileName me (DataFile k) + debug $ yellow "REMOVING" <+> pretty (takeFileName fn) + rm fn - SweepSomething -> do - debug $ "SweepSomething" - pause @'Seconds 10 - next SweepWaitIdle ncqSweepObsoleteStates :: forall m . MonadUnliftIO m => NCQStorage3 -> m () -ncqSweepObsoleteStates me@NCQStorage3{..} = void $ runMaybeT do +ncqSweepObsoleteStates me@NCQStorage3{..} = withSem ncqServiceSem do debug $ "ncqSweepObsoleteStates" - k <- readTVarIO ncqStateKey >>= toMPlus + k <- readTVarIO ncqStateKey r <- liftIO $ try @_ @SomeException do ts <- PFS.getFileStatus (ncqGetFileName me (StateFile k)) <&> PFS.modificationTimeHiRes @@ -55,10 +63,9 @@ ncqSweepObsoleteStates me@NCQStorage3{..} = void $ runMaybeT do when (f /= k && t < ts) do debug $ yellow "TO REMOVE" <+> pretty (toFileName (StateFile f)) rm (ncqGetFileName me (StateFile f)) - lift do - case r of - Left e -> err ("SweepStates failed" <+> viaShow e) - Right{} -> none + case r of + Left e -> err ("SweepStates failed" <+> viaShow e) + Right{} -> none diff --git a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Types.hs b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Types.hs index f4b96bba..882d7f27 100644 --- a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Types.hs +++ b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Types.hs @@ -49,9 +49,7 @@ data Location = | InMemory {-# UNPACK #-} !ByteString -data Fact = - FI (DataFile FileKey) (IndexFile FileKey) -- file X has index Y - | P PData -- pending, not indexed +data Fact = P PData -- pending, not indexed deriving stock (Eq,Ord,Data) data PData = PData (DataFile FileKey) Word64 @@ -92,7 +90,8 @@ data NCQStorage3 = , ncqMMapCachedData :: TVar (HashPSQ FileKey CachePrio CachedData) , ncqMemTable :: Vector Shard , ncqState :: TVar NCQState - , ncqStateKey :: TVar (Maybe FileKey) + , ncqStateKey :: TVar FileKey + , ncqStateUse :: TVar (HashMap FileKey (NCQState, TVar Int)) , ncqWrites :: TVar Int , ncqWriteEMA :: TVar Double -- for writes-per-seconds , ncqWriteQ :: TVar (Seq HashRef) @@ -188,6 +187,5 @@ instance Pretty NCQState where | f <- Set.toList ncqStateFacts ] - pf (FI (DataFile a) (IndexFile b)) = "fi" <+> pretty a <+> pretty b pf (P (PData (DataFile a) s)) = "fp" <+> pretty a <+> pretty s diff --git a/hbs2-tests/test/NCQ3.hs b/hbs2-tests/test/NCQ3.hs index 00c64a52..8b1492d3 100644 --- a/hbs2-tests/test/NCQ3.hs +++ b/hbs2-tests/test/NCQ3.hs @@ -31,6 +31,7 @@ import Data.Config.Suckless.System import NCQTestCommon import Data.HashSet qualified as HS +import Data.HashMap.Strict qualified as HM import Test.Tasty.HUnit import Data.ByteString qualified as BS import Data.Ord @@ -38,6 +39,7 @@ import Data.Set qualified as Set import System.Random.MWC as MWC import Control.Concurrent.STM qualified as STM import Data.List qualified as List +import Control.Monad.Trans.Cont import UnliftIO {-HLINT ignore "Functor law"-} @@ -186,19 +188,59 @@ ncq3Tests = do entry $ bindMatch "test:ncq3:sweep" $ nil_ \e -> do + t0 <- getTimeCoarse + let (opts,args) = splitOpts [] e let num = headDef 1000 [ fromIntegral n | LitIntVal n <- args ] g <- liftIO MWC.createSystemRandom runTest $ \TestEnv{..} -> do - ncqWithStorage3 testEnvDir $ \sto@NCQStorage3{..} -> do - notice $ "write" <+> pretty num + ncqWithStorage3 testEnvDir $ \sto@NCQStorage3{..} -> flip runContT pure do + hst <- newTVarIO ( mempty :: HashSet HashRef ) + lostt <- newTVarIO 0 + req <- newTVarIO 0 + + ContT $ withAsync $ forever do + pause @'Seconds 20 + t <- getTimeCoarse <&> sec2 . (*1e-9) . realToFrac . toNanoSecs . (+ (-t0)) + l <- readTVarIO lostt + r <- readTVarIO req + pp <- readTVarIO ncqStateUse <&> HM.size + let c = if l > 0 then red else id + debug $ "Elapsed" <+> pretty t <+> pretty pp <+> pretty r <+> c (pretty l) + + ContT $ withAsync $ forever do + p <- liftIO $ uniformRM (0, 0.75) g + pause @'Seconds (realToFrac p) + hh <- readTVarIO hst + + when (HS.size hh > 0) do + + i <- liftIO $ uniformRM (0, HS.size hh - 1) g + let hi = HS.toList hh !! i + found <- ncqLocate sto hi <&> isJust + atomically $ modifyTVar req succ + + unless found do + err $ red "NOT FOUND" <+> pretty hi + atomically $ modifyTVar lostt succ + + notice $ "write" <+> pretty num replicateM_ num do n <- liftIO $ uniformRM (1024, 64*1024) g bs <- liftIO $ genRandomBS g n - h <- ncqPutBS sto (Just B) Nothing bs + h <- lift $ ncqPutBS sto (Just B) Nothing bs atomically $ modifyTVar hst (HS.insert h) - pause @'Seconds 300 + pause @'Seconds 180 + + notice "check after compaction" + + h1 <- readTVarIO hst + + for_ h1 $ \h -> lift do + found <- ncqLocate sto h <&> isJust + liftIO $ assertBool (show $ "found" <+> pretty h) found +