From a5dbfe5e0b7765f43dc9f383e7020beb0efd3aae Mon Sep 17 00:00:00 2001 From: voidlizard Date: Tue, 29 Jul 2025 18:04:08 +0300 Subject: [PATCH] wip, sweep routines --- hbs2-storage-ncq/hbs2-storage-ncq.cabal | 1 + .../lib/HBS2/Storage/NCQ3/Internal.hs | 9 ++- .../lib/HBS2/Storage/NCQ3/Internal/Files.hs | 24 +++++++ .../lib/HBS2/Storage/NCQ3/Internal/Index.hs | 63 +++++++++++++++++- .../lib/HBS2/Storage/NCQ3/Internal/Run.hs | 7 ++ .../lib/HBS2/Storage/NCQ3/Internal/State.hs | 10 ++- .../lib/HBS2/Storage/NCQ3/Internal/Sweep.hs | 64 +++++++++++++++++++ .../lib/HBS2/Storage/NCQ3/Internal/Types.hs | 23 +++++-- hbs2-tests/test/NCQ3.hs | 61 ++++++++++++++++++ 9 files changed, 252 insertions(+), 10 deletions(-) create mode 100644 hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Sweep.hs diff --git a/hbs2-storage-ncq/hbs2-storage-ncq.cabal b/hbs2-storage-ncq/hbs2-storage-ncq.cabal index eec50334..3b0bda42 100644 --- a/hbs2-storage-ncq/hbs2-storage-ncq.cabal +++ b/hbs2-storage-ncq/hbs2-storage-ncq.cabal @@ -65,6 +65,7 @@ library HBS2.Storage.NCQ3.Internal.Types HBS2.Storage.NCQ3.Internal.Prelude HBS2.Storage.NCQ3.Internal.State + HBS2.Storage.NCQ3.Internal.Sweep HBS2.Storage.NCQ3.Internal.Run HBS2.Storage.NCQ3.Internal.Memtable HBS2.Storage.NCQ3.Internal.Index diff --git a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal.hs b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal.hs index 46c24005..e609aa40 100644 --- a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal.hs +++ b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal.hs @@ -35,6 +35,8 @@ import System.Posix.Files ( getFileStatus ) import System.Posix.Files qualified as PFS import System.IO.MMap as MMap +import Control.Concurrent.STM qualified as STM +import Control.Concurrent.STM.TSem ncqStorageOpen3 :: MonadIO m => FilePath -> (NCQStorage3 -> NCQStorage3) -> m NCQStorage3 ncqStorageOpen3 fp upd = do @@ -71,6 +73,8 @@ ncqStorageOpen3 fp upd = do ncqOnRunWriteIdle <- newTVarIO none ncqSyncNo <- newTVarIO 0 ncqState <- newTVarIO mempty + ncqStateKey <- newTVarIO mempty + ncqServiceSem <- atomically $ newTSem 1 let ncq = NCQStorage3{..} & upd @@ -118,8 +122,8 @@ ncqPutBS ncq@NCQStorage3{..} mtp mhref bs' = ncqOperation ncq (pure $ fromMaybe putTMVar waiter h atomically do - nw <- readTVar ncqWrites <&> (`mod` V.length ncqWriteOps) modifyTVar ncqWrites succ + nw <- readTVar ncqWrites <&> (`mod` V.length ncqWriteOps) writeTQueue (ncqWriteOps ! nw) work atomically $ takeTMVar waiter @@ -180,9 +184,8 @@ ncqTryLoadState me@NCQStorage3{..} = do ncqIndexFile me dataFile - for_ (bad <> drop 3 (fmap snd rest)) $ \f -> do + for_ (bad <> fmap snd rest) $ \f -> do let old = ncqGetFileName me (StateFile f) - -- debug $ "rm old state" <+> pretty old rm old where diff --git a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Files.hs b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Files.hs index cd66f1c7..60159592 100644 --- a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Files.hs +++ b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Files.hs @@ -39,5 +39,29 @@ ncqListFilesBy me@NCQStorage3{..} filt = do pure $ List.sortOn ( Down . fst ) r +ncqFindMinPairOf :: forall fa m . (ToFileName fa, MonadUnliftIO m) + => NCQStorage3 + -> [fa] + -> m (Maybe (NCQFileSize, fa, fa)) +ncqFindMinPairOf sto lst = do + + let files = fmap (\x -> (x, ncqGetFileName sto x)) lst + + flip fix (files, Nothing) $ \next (fs, r) -> do + case fs of + [] -> pure r + [ _ ] -> pure r + ( s1 : s2 : ss ) -> do + size1 <- fsize (snd s1) + size2 <- fsize (snd s2) + let size = fromIntegral $ size1 + size2 + + case r of + Nothing -> next (s2 : ss, Just (size, fst s1, fst s2) ) + e@(Just (size0, _, _)) | size0 > size -> next (s2 : ss, Just (size, fst s1, fst s2) ) + | otherwise -> next (s2:ss, e) + + where fsize s = liftIO (PFS.getFileStatus s) <&> PFS.fileSize + diff --git a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Index.hs b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Index.hs index 19ee37d7..685f6bed 100644 --- a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Index.hs +++ b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Index.hs @@ -13,6 +13,8 @@ import Control.Monad.Trans.Cont import Control.Monad.Trans.Maybe import Data.ByteString qualified as BS import System.IO.MMap +import System.IO.Temp as Temp +import Streaming.Prelude qualified as S data IndexEntry = IndexEntry {-# UNPACK #-} !FileKey !Word64 !Word32 @@ -32,6 +34,12 @@ unpackIndexEntry entryBs = do emptyKey :: ByteString emptyKey = BS.replicate 32 0 +ncqIndexAlloc :: NWayHashAlloc +ncqIndexAlloc = nwayAllocDef 1.10 32 8 16 + +ncqIndexAllocForMerge :: NWayHashAlloc +ncqIndexAllocForMerge = nwayAllocDef 0.8 32 8 16 + ncqLookupIndex :: MonadUnliftIO m => HashRef -> (ByteString, NWayHash) @@ -67,7 +75,7 @@ ncqIndexFile n fk = runMaybeT do let (dir,name) = splitFileName fp let idxTemp = (dropExtension name <> "-") `addExtension` ".cq$" - result <- lift $ nwayWriteBatch (nwayAllocDef 1.10 32 8 16) dir idxTemp items + result <- lift $ nwayWriteBatch ncqIndexAlloc dir idxTemp items mv result dest @@ -96,6 +104,59 @@ ncqIndexFile n fk = runMaybeT do pure dest +{-HLINT ignore "Functor law"-} + +ncqIndexCompactStep :: MonadUnliftIO m + => NCQStorage3 + -> m Bool +ncqIndexCompactStep me@NCQStorage3{..} = flip runContT pure $ callCC \exit -> do + + idx <- readTVarIO ncqState + <&> fmap (IndexFile . snd) . ncqStateIndex + + r' <- lift $ ncqFindMinPairOf me idx + + (_, a, b) <- ContT $ maybe1 r' (pure False) + + let idx1Name = ncqGetFileName me a + let idx2Name = ncqGetFileName me b + + (bs1, nw1) <- lift (nwayHashMMapReadOnly idx1Name) >>= \case + Nothing -> err ("missed file" <+> pretty idx1Name) >> exit False + Just e -> pure e + + (bs2, nw2) <- lift (nwayHashMMapReadOnly idx2Name) >>= \case + Nothing -> err ("missed file" <+> pretty idx2Name) >> exit False + Just e -> pure e + + e <- S.toList_ do + nwayHashScanAll nw1 bs1 $ \_ k v -> unless (k == emptyKey) do + S.yield (k,v) + + nwayHashScanAll nw2 bs2 \_ k v -> unless (k == emptyKey) do + r <- liftIO (nwayHashLookup nw1 bs1 k) + unless (isJust r) do + S.yield (k,v) + + + let dir = ncqGetWorkDir me + + ts <- liftIO (PFS.getFileStatus idx1Name) <&> PFS.modificationTimeHiRes + + result <- lift $ nwayWriteBatch ncqIndexAllocForMerge dir "merged-.cq$" e + + liftIO $ PFS.setFileTimesHiRes result ts ts + + fki <- ncqGetNewFileKey me IndexFile + mv result (ncqGetFileName me (IndexFile fki)) + + ncqStateUpdate me do + ncqStateDelIndexFile (coerce a) + ncqStateDelIndexFile (coerce b) + ncqStateAddIndexFile ts fki + + pure True + ncqStorageScanDataFile :: MonadIO m => NCQStorage3 -> FilePath diff --git a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Run.hs b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Run.hs index 4ba3b546..e62d0711 100644 --- a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Run.hs +++ b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Run.hs @@ -8,6 +8,7 @@ import HBS2.Storage.NCQ3.Internal.Files import HBS2.Storage.NCQ3.Internal.Memtable import HBS2.Storage.NCQ3.Internal.Index import HBS2.Storage.NCQ3.Internal.State +import HBS2.Storage.NCQ3.Internal.Sweep import HBS2.Storage.NCQ3.Internal.MMapCache @@ -89,6 +90,12 @@ ncqStorageRun3 ncq@NCQStorage3{..} = flip runContT pure do spawnActivity measureWPS + spawnActivity $ forever do + withSem ncqServiceSem (ncqSweepObsoleteStates ncq) + pause @'Seconds 10 + + spawnActivity (ncqSweepLoop ncq) + flip fix RunNew $ \loop -> \case RunFin -> do debug "exit storage" diff --git a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/State.hs b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/State.hs index 478670c5..0b92adbb 100644 --- a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/State.hs +++ b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/State.hs @@ -41,9 +41,11 @@ ncqStateUpdate ncq@NCQStorage3{..} action = do readTVar ncqState unless (s1 == s0) do - snkFile <- ncqGetNewFileKey ncq StateFile <&> ncqGetFileName ncq . StateFile + key <- ncqGetNewFileKey ncq StateFile + let snkFile = ncqGetFileName ncq (StateFile key) liftIO $ withBinaryFileDurableAtomic snkFile WriteMode $ \fh -> do IO.hPrint fh (pretty s1) + atomically $ writeTVar ncqStateKey (Just key) ncqStateAddDataFile :: FileKey -> StateOP () ncqStateAddDataFile fk = do @@ -71,6 +73,12 @@ ncqStateAddIndexFile ts fk = do NCQStorage3{..} <- ask StateOP $ lift $ modifyTVar' ncqState (sortIndexes . over #ncqStateIndex ((Down ts, fk) :)) +ncqStateDelIndexFile :: FileKey -> StateOP () +ncqStateDelIndexFile fk = do + NCQStorage3{..} <- ask + StateOP $ lift $ modifyTVar' ncqState (over #ncqStateIndex $ filter f) + where f (_,b) = b /= fk + sortIndexes :: NCQState -> NCQState sortIndexes = over #ncqStateIndex (List.sortOn fst) diff --git a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Sweep.hs b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Sweep.hs new file mode 100644 index 00000000..a733a782 --- /dev/null +++ b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Sweep.hs @@ -0,0 +1,64 @@ +module HBS2.Storage.NCQ3.Internal.Sweep where + + +import HBS2.Storage.NCQ3.Internal.Prelude +import HBS2.Storage.NCQ3.Internal.Types +import HBS2.Storage.NCQ3.Internal.Files +import HBS2.Storage.NCQ3.Internal.State +import HBS2.Storage.NCQ3.Internal.Index + +import Data.Generics.Uniplate.Operations +import Data.Generics.Uniplate.Data() +import Data.List qualified as List +import Data.HashSet qualified as HS +import System.Posix.Files qualified as PFS +import Control.Monad.Trans.Maybe + +data SweepSt = SweepWaitIdle + | SweepCheckEMA SweepSt + | SweepSomething + + +ncqSweepLoop :: MonadUnliftIO m => NCQStorage3 -> m () +ncqSweepLoop me@NCQStorage3{..} = flip fix SweepWaitIdle $ \next -> \case + + SweepWaitIdle -> do + debug "SweepWaitIdle" + pause @'Seconds 10 + next (SweepCheckEMA SweepSomething) + + SweepCheckEMA who -> do + ema <- readTVarIO ncqWriteEMA + debug $ "SweepCheckEMA" <+> pretty ema + if ema < ncqIdleThrsh then do + next who + else + next SweepWaitIdle + + SweepSomething -> do + debug $ "SweepSomething" + pause @'Seconds 10 + next SweepWaitIdle + +ncqSweepObsoleteStates :: forall m . MonadUnliftIO m => NCQStorage3 -> m () +ncqSweepObsoleteStates me@NCQStorage3{..} = void $ runMaybeT do + debug $ "ncqSweepObsoleteStates" + + k <- readTVarIO ncqStateKey >>= toMPlus + + r <- liftIO $ try @_ @SomeException do + ts <- PFS.getFileStatus (ncqGetFileName me (StateFile k)) <&> PFS.modificationTimeHiRes + filez <- ncqListFilesBy me (List.isPrefixOf "s-") + + for_ filez $ \(t,f) -> do + + when (f /= k && t < ts) do + debug $ yellow "TO REMOVE" <+> pretty (toFileName (StateFile f)) + rm (ncqGetFileName me (StateFile f)) + lift do + + case r of + Left e -> err ("SweepStates failed" <+> viaShow e) + Right{} -> none + + diff --git a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Types.hs b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Types.hs index babc8d61..f4b96bba 100644 --- a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Types.hs +++ b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Types.hs @@ -4,12 +4,12 @@ module HBS2.Storage.NCQ3.Internal.Types where import HBS2.Storage.NCQ3.Internal.Prelude -import Data.Generics.Product import Numeric (readHex) +import Data.Data import Data.Set qualified as Set import Data.HashSet qualified as HS import Text.Printf --- import Lens.Micro.Platform +import Control.Concurrent.STM.TSem (TSem,waitTSem,signalTSem) data CachedData = CachedData !ByteString @@ -24,11 +24,15 @@ type StateVersion = Word64 newtype FileKey = FileKey Word32 deriving newtype (Eq,Ord,Show,Num,Enum,Real,Integral,Pretty,Hashable) + deriving stock (Data,Generic) deriving stock instance Eq (DataFile FileKey) deriving stock instance Ord (DataFile FileKey) deriving stock instance Eq (IndexFile FileKey) deriving stock instance Ord (IndexFile FileKey) +deriving stock instance Data (IndexFile FileKey) +deriving stock instance Data (DataFile FileKey) +deriving stock instance Data (StateFile FileKey) data NCQEntry = NCQEntry @@ -37,6 +41,7 @@ data NCQEntry = } type NCQOffset = Word64 +type NCQFileSize = NCQOffset type NCQSize = Word32 data Location = @@ -47,9 +52,10 @@ data Location = data Fact = FI (DataFile FileKey) (IndexFile FileKey) -- file X has index Y | P PData -- pending, not indexed - deriving stock (Eq,Ord) + deriving stock (Eq,Ord,Data) data PData = PData (DataFile FileKey) Word64 + deriving stock (Data) instance Ord PData where compare (PData a _) (PData b _) = compare a b @@ -65,7 +71,7 @@ data NCQState = , ncqStateVersion :: StateVersion , ncqStateFacts :: Set Fact } - deriving stock (Eq,Generic) + deriving stock (Eq,Generic,Data) data NCQStorage3 = NCQStorage3 @@ -86,6 +92,7 @@ data NCQStorage3 = , ncqMMapCachedData :: TVar (HashPSQ FileKey CachePrio CachedData) , ncqMemTable :: Vector Shard , ncqState :: TVar NCQState + , ncqStateKey :: TVar (Maybe FileKey) , ncqWrites :: TVar Int , ncqWriteEMA :: TVar Double -- for writes-per-seconds , ncqWriteQ :: TVar (Seq HashRef) @@ -96,10 +103,10 @@ data NCQStorage3 = , ncqSyncReq :: TVar Bool , ncqOnRunWriteIdle :: TVar (IO ()) , ncqSyncNo :: TVar Int + , ncqServiceSem :: TSem } - instance Monoid FileKey where mempty = FileKey 0 @@ -140,6 +147,12 @@ instance Pretty Location where ncqMakeFossilName :: FileKey -> FilePath ncqMakeFossilName = printf "f-%08x.data" . coerce @_ @Word32 +withSem :: forall a m . MonadUnliftIO m => TSem -> m a -> m a +withSem sem action = + bracket (atomically (waitTSem sem)) + (const $ atomically (signalTSem sem)) + (const action) + ncqState0 :: NCQState ncqState0 = NCQState{..} where diff --git a/hbs2-tests/test/NCQ3.hs b/hbs2-tests/test/NCQ3.hs index edba1b82..00c64a52 100644 --- a/hbs2-tests/test/NCQ3.hs +++ b/hbs2-tests/test/NCQ3.hs @@ -30,6 +30,7 @@ import Data.Config.Suckless.System import NCQTestCommon +import Data.HashSet qualified as HS import Test.Tasty.HUnit import Data.ByteString qualified as BS import Data.Ord @@ -39,6 +40,7 @@ import Control.Concurrent.STM qualified as STM import Data.List qualified as List import UnliftIO +{-HLINT ignore "Functor law"-} ncq3Tests :: forall m . MonadUnliftIO m => MakeDictM C m () ncq3Tests = do @@ -141,3 +143,62 @@ ncq3Tests = do notice $ "found:" <+> pretty (coerce @_ @HashRef k) <+> viaShow e e -> throwIO $ BadFormException @C (mkList e) + + + entry $ bindMatch "test:ncq3:merge" $ nil_ \e -> do + + let (opts,args) = splitOpts [] e + let num = headDef 1000 [ fromIntegral n | LitIntVal n <- args ] + g <- liftIO MWC.createSystemRandom + + runTest $ \TestEnv{..} -> do + ncqWithStorage3 testEnvDir $ \sto@NCQStorage3{..} -> do + notice $ "write" <+> pretty num + hst <- newTVarIO ( mempty :: HashSet HashRef ) + replicateM_ num do + n <- liftIO $ uniformRM (1024, 64*1024) g + bs <- liftIO $ genRandomBS g n + h <- ncqPutBS sto (Just B) Nothing bs + atomically $ modifyTVar hst (HS.insert h) + + idx <- readTVarIO ncqState + <&> ncqStateIndex + <&> fmap (IndexFile . snd) + + r <- ncqFindMinPairOf sto idx + notice $ pretty r + + fix $ \loop -> do + notice "compacting once" + w <- ncqIndexCompactStep sto + when w loop + + nstate <- readTVarIO ncqState + + notice $ "new state" <> line <> pretty nstate + + hss <- readTVarIO hst + + for_ hss $ \h -> do + found <- ncqLocate sto h <&> isJust + liftIO $ assertBool (show $ "found" <+> pretty h) found + + + entry $ bindMatch "test:ncq3:sweep" $ nil_ \e -> do + + let (opts,args) = splitOpts [] e + let num = headDef 1000 [ fromIntegral n | LitIntVal n <- args ] + g <- liftIO MWC.createSystemRandom + + runTest $ \TestEnv{..} -> do + ncqWithStorage3 testEnvDir $ \sto@NCQStorage3{..} -> do + notice $ "write" <+> pretty num + hst <- newTVarIO ( mempty :: HashSet HashRef ) + replicateM_ num do + n <- liftIO $ uniformRM (1024, 64*1024) g + bs <- liftIO $ genRandomBS g n + h <- ncqPutBS sto (Just B) Nothing bs + atomically $ modifyTVar hst (HS.insert h) + + pause @'Seconds 300 +