From 6c3dc290419c7c9b6346f8a09c14e50164e01a62 Mon Sep 17 00:00:00 2001 From: voidlizard Date: Wed, 30 Jul 2025 15:46:37 +0300 Subject: [PATCH] wip, data file merge --- hbs2-storage-ncq/hbs2-storage-ncq.cabal | 1 + hbs2-storage-ncq/lib/HBS2/Storage/NCQ3.hs | 1 + .../lib/HBS2/Storage/NCQ3/Internal.hs | 11 +- .../lib/HBS2/Storage/NCQ3/Internal/Fossil.hs | 181 ++++++++++++++++++ .../lib/HBS2/Storage/NCQ3/Internal/Index.hs | 12 +- .../lib/HBS2/Storage/NCQ3/Internal/Run.hs | 79 +++----- hbs2-tests/test/NCQ3.hs | 35 ++++ 7 files changed, 254 insertions(+), 66 deletions(-) create mode 100644 hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Fossil.hs diff --git a/hbs2-storage-ncq/hbs2-storage-ncq.cabal b/hbs2-storage-ncq/hbs2-storage-ncq.cabal index 3b0bda42..8ca9561c 100644 --- a/hbs2-storage-ncq/hbs2-storage-ncq.cabal +++ b/hbs2-storage-ncq/hbs2-storage-ncq.cabal @@ -71,6 +71,7 @@ library HBS2.Storage.NCQ3.Internal.Index HBS2.Storage.NCQ3.Internal.MMapCache HBS2.Storage.NCQ3.Internal.Files + HBS2.Storage.NCQ3.Internal.Fossil HBS2.Storage.NCQ HBS2.Storage.NCQ2 HBS2.Storage.NCQ2.Internal diff --git a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3.hs b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3.hs index d96417d7..114484bf 100644 --- a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3.hs +++ b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3.hs @@ -16,5 +16,6 @@ import HBS2.Storage.NCQ3.Internal import HBS2.Storage.NCQ3.Internal.Run import HBS2.Storage.NCQ3.Internal.State import HBS2.Storage.NCQ3.Internal.Memtable +import HBS2.Storage.NCQ3.Internal.Index diff --git a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal.hs b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal.hs index 99003b54..d55cdab1 100644 --- a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal.hs +++ b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal.hs @@ -45,7 +45,7 @@ ncqStorageOpen3 fp upd = do let ncqFsync = 16 * megabytes let ncqWriteQLen = 1024 * 4 let ncqMinLog = 512 * megabytes - let ncqMaxLog = 2 * ncqMinLog + let ncqMaxLog = 32 * gigabytes let ncqWriteBlock = max 128 $ ncqWriteQLen `div` 2 let ncqMaxCachedIndex = 16 let ncqMaxCachedData = 64 @@ -131,15 +131,6 @@ ncqPutBS ncq@NCQStorage3{..} mtp mhref bs' = ncqOperation ncq (pure $ fromMaybe where hash0 = HashRef (hashObject @HbSync bs') -ncqLocate :: MonadUnliftIO m => NCQStorage3 -> HashRef -> m (Maybe Location) -ncqLocate me@NCQStorage3{..} href = ncqOperation me (pure Nothing) do - answ <- newEmptyTMVarIO - - atomically do - modifyTVar ncqWrites succ - writeTQueue ncqReadReq (href, answ) - - atomically $ takeTMVar answ ncqTryLoadState :: forall m. MonadUnliftIO m => NCQStorage3 diff --git a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Fossil.hs b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Fossil.hs new file mode 100644 index 00000000..1108be91 --- /dev/null +++ b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Fossil.hs @@ -0,0 +1,181 @@ +module HBS2.Storage.NCQ3.Internal.Fossil where + +import HBS2.Storage.NCQ3.Internal.Prelude +import HBS2.Storage.NCQ3.Internal.Types +import HBS2.Storage.NCQ3.Internal.Files +import HBS2.Storage.NCQ3.Internal.Index +import HBS2.Storage.NCQ3.Internal.State + +import HBS2.Data.Types.Refs + +import Data.HashSet qualified as HS +import Data.List qualified as List +import Data.ByteString.Lazy qualified as LBS +import Data.ByteString qualified as BS +import Control.Monad.Trans.Cont +import Network.ByteOrder qualified as N +import Data.ByteString.Builder +import System.IO.Temp (emptyTempFile) + +import System.FilePath.Posix +import System.Posix.Files qualified as Posix +import System.Posix.IO as PosixBase +import System.Posix.Types as Posix +import System.Posix.Unistd +import System.Posix.IO.ByteString as Posix +import System.Posix.Files ( getFileStatus + , modificationTimeHiRes + , setFileTimesHiRes + , getFdStatus + , FileStatus(..) + , setFileMode + ) +import System.Posix.Files qualified as PFS + +import UnliftIO.IO.File + +{-HLINT ignore "Functor law"-} + +ncqFossilMergeStep :: forall m . MonadUnliftIO m + => NCQStorage3 + -> m Bool + +ncqFossilMergeStep me@NCQStorage3{..} = withSem ncqServiceSem $ flip runContT pure $ callCC \exit -> do + + debug "ncqFossilMergeStep" + + -- TODO: consider-sort-by-timestamps + files <- readTVarIO ncqState + <&> fmap DataFile . HS.toList . ncqStateFiles + <&> List.sortOn Down + + r' <- lift $ ncqFindMinPairOf me files + + r@(sumSize, f1, f2) <- ContT $ maybe1 r' (pure False) + + debug $ "for compacting" <+> pretty f1 <+> pretty f2 <+> pretty r <+> pretty ncqMaxLog + + when (fromIntegral sumSize > ncqMaxLog) $ exit False + + let (p,tpl) = splitFileName (ncqGetFileName me "merge-.merge") + + outFile <- liftIO $ emptyTempFile p tpl + + ContT $ bracket none $ const do + rm outFile + + liftIO $ withBinaryFileAtomic outFile WriteMode $ \fwh -> do + fd <- handleToFd fwh + + already <- newTVarIO (mempty :: HashSet HashRef ) + + for_ [f1, f2] $ \fi -> do + let fik = coerce fi + writeFiltered me (ncqGetFileName me fi) fd $ \_ _ k _ -> do + ncqLocate me k >>= \case + Nothing -> pure True + Just (InMemory{}) -> pure False + Just (InFossil fk _ _) -> do + let beWritten = fik >= fk + atomically do + here <- readTVar already <&> HS.member k + let proceed = not here && beWritten + when proceed (modifyTVar already (HS.insert k)) + pure proceed + + appendTailSection fd + + f3 <- DataFile <$> ncqGetNewFileKey me DataFile + + let newFile = ncqGetFileName me f3 + + mv outFile newFile + + ss <- liftIO (PFS.getFileStatus newFile) <&> fromIntegral . PFS.fileSize + + ncqStateUpdate me do + ncqStateAddFact (P (PData f3 ss)) + + lift $ ncqIndexFile me f3 + + ncqStateUpdate me do + ncqStateDelDataFile (coerce f1) + ncqStateDelDataFile (coerce f2) + + debug $ "COMPACTED" <+> pretty f1 <+> pretty f2 <+> "=>" <+> pretty f3 + + pure True + + +writeFiltered :: forall m . MonadIO m + => NCQStorage3 + -> FilePath + -> Fd + -> ( Integer -> Integer -> HashRef -> ByteString -> m Bool) + -> m () + +writeFiltered ncq fn out filt = do + ncqStorageScanDataFile ncq fn $ \o s k v -> do + skip <- filt o s k v <&> not + + when skip do + debug $ pretty k <+> pretty "skipped" + + unless skip $ liftIO do + void $ appendSection out (LBS.toStrict (makeEntryLBS k v)) + + where + + makeEntryLBS h bs = do + let b = byteString (coerce @_ @ByteString h) + <> byteString bs + + let wbs = toLazyByteString b + let len = LBS.length wbs + let ws = byteString (N.bytestring32 (fromIntegral len)) + + toLazyByteString (ws <> b) + + + +zeroSyncEntry :: ByteString +zeroSyncEntry = ncqMakeSectionBS (Just B) zeroHash zeroPayload + where zeroPayload = N.bytestring64 0 + zeroHash = HashRef (hashObject zeroPayload) +{-# INLINE zeroSyncEntry #-} + +zeroSyncEntrySize :: Word64 +zeroSyncEntrySize = fromIntegral (BS.length zeroSyncEntry) +{-# INLINE zeroSyncEntrySize #-} + +-- 1. It's M-record +-- 2. It's last w64be == fileSize +-- 3. It's hash == hash (bytestring64be fileSize) +-- 4. recovery-strategy: start-to-end, end-to-start +fileTailRecord :: Integral a => a -> ByteString +fileTailRecord w = do + -- on open: last w64be == fileSize + let paylo = N.bytestring64 (fromIntegral w + zeroSyncEntrySize) + let h = hashObject @HbSync paylo & coerce + ncqMakeSectionBS (Just M) h paylo +{-# INLINE fileTailRecord #-} + +appendSection :: forall m . MonadUnliftIO m + => Fd + -> ByteString + -> m Int -- (FOff, Int) + +appendSection fh sect = do + -- off <- liftIO $ fdSeek fh SeekFromEnd 0 + -- pure (fromIntegral off, fromIntegral len) + liftIO (Posix.fdWrite fh sect) <&> fromIntegral +{-# INLINE appendSection #-} + +appendTailSection :: MonadIO m => Fd -> m () +appendTailSection fh = liftIO do + s <- Posix.fileSize <$> Posix.getFdStatus fh + void (appendSection fh (fileTailRecord s)) +{-# INLINE appendTailSection #-} + + + diff --git a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Index.hs b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Index.hs index da3f01d3..2c693e84 100644 --- a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Index.hs +++ b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Index.hs @@ -49,6 +49,16 @@ ncqLookupIndex hx (mmaped, nway) = do {-# INLINE ncqLookupIndex #-} +ncqLocate :: MonadUnliftIO m => NCQStorage3 -> HashRef -> m (Maybe Location) +ncqLocate me@NCQStorage3{..} href = ncqOperation me (pure Nothing) do + answ <- newEmptyTMVarIO + + atomically do + -- modifyTVar ncqWrites succ + writeTQueue ncqReadReq (href, answ) + + atomically $ takeTMVar answ + ncqIndexFile :: MonadUnliftIO m => NCQStorage3 -> DataFile FileKey -> m (Maybe FilePath) ncqIndexFile n fk = runMaybeT do @@ -108,7 +118,7 @@ ncqIndexFile n fk = runMaybeT do ncqIndexCompactStep :: MonadUnliftIO m => NCQStorage3 -> m Bool -ncqIndexCompactStep me@NCQStorage3{..} = flip runContT pure $ callCC \exit -> do +ncqIndexCompactStep me@NCQStorage3{..} = withSem ncqServiceSem $ flip runContT pure $ callCC \exit -> do debug "ncqIndexCompactStep" diff --git a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Run.hs b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Run.hs index 98b8713b..3ec955a6 100644 --- a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Run.hs +++ b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Run.hs @@ -10,7 +10,7 @@ import HBS2.Storage.NCQ3.Internal.Index import HBS2.Storage.NCQ3.Internal.State import HBS2.Storage.NCQ3.Internal.Sweep import HBS2.Storage.NCQ3.Internal.MMapCache - +import HBS2.Storage.NCQ3.Internal.Fossil import Control.Monad.Trans.Cont import Control.Monad.Trans.Maybe @@ -118,21 +118,11 @@ ncqStorageRun3 ncq@NCQStorage3{..} = flip runContT pure do -- FIXME: timeout-hardcode pause @'Seconds 60 - spawnActivity $ postponed 10 $ forever $ void $ runMaybeT do - ema <- readTVarIO ncqWriteEMA + spawnActivity $ postponed 10 $ compactLoop 10 300 do + ncqIndexCompactStep ncq - when (ema > ncqIdleThrsh) $ pause @'Seconds 10 >> mzero - - compacted <- lift $ ncqIndexCompactStep ncq - - when compacted mzero - - k0 <- readTVarIO ncqStateKey - void $ lift $ race (pause @'Seconds 600) do - flip fix k0 $ \waitState k1 -> do - pause @'Seconds 60 - k2 <- readTVarIO ncqStateKey - when (k2 == k1) $ waitState k2 + spawnActivity $ postponed 15 $ compactLoop 10 600 do + ncqFossilMergeStep ncq flip fix RunNew $ \loop -> \case RunFin -> do @@ -256,6 +246,25 @@ ncqStorageRun3 ncq@NCQStorage3{..} = flip runContT pure do postponed n m = liftIO (pause @'Seconds n) >> m + compactLoop :: Timeout 'Seconds -> Timeout 'Seconds -> m Bool -> m () + compactLoop t1 t2 what = forever $ void $ runMaybeT do + ema <- readTVarIO ncqWriteEMA + + when (ema > ncqIdleThrsh) $ pause @'Seconds t1 >> mzero + + compacted <- lift what + + when compacted mzero + + k0 <- readTVarIO ncqStateKey + void $ lift $ race (pause @'Seconds t2) do + flip fix k0 $ \waitState k1 -> do + pause @'Seconds 60 + k2 <- readTVarIO ncqStateKey + when (k2 == k1) $ waitState k2 + + + data RunSt = RunNew | RunWrite (FileKey, Fd, Int, Int) @@ -263,43 +272,3 @@ data RunSt = | RunFin -zeroSyncEntry :: ByteString -zeroSyncEntry = ncqMakeSectionBS (Just B) zeroHash zeroPayload - where zeroPayload = N.bytestring64 0 - zeroHash = HashRef (hashObject zeroPayload) -{-# INLINE zeroSyncEntry #-} - -zeroSyncEntrySize :: Word64 -zeroSyncEntrySize = fromIntegral (BS.length zeroSyncEntry) -{-# INLINE zeroSyncEntrySize #-} - --- 1. It's M-record --- 2. It's last w64be == fileSize --- 3. It's hash == hash (bytestring64be fileSize) --- 4. recovery-strategy: start-to-end, end-to-start -fileTailRecord :: Integral a => a -> ByteString -fileTailRecord w = do - -- on open: last w64be == fileSize - let paylo = N.bytestring64 (fromIntegral w + zeroSyncEntrySize) - let h = hashObject @HbSync paylo & coerce - ncqMakeSectionBS (Just M) h paylo -{-# INLINE fileTailRecord #-} - -appendSection :: forall m . MonadUnliftIO m - => Fd - -> ByteString - -> m Int -- (FOff, Int) - -appendSection fh sect = do - -- off <- liftIO $ fdSeek fh SeekFromEnd 0 - -- pure (fromIntegral off, fromIntegral len) - liftIO (Posix.fdWrite fh sect) <&> fromIntegral -{-# INLINE appendSection #-} - -appendTailSection :: MonadIO m => Fd -> m () -appendTailSection fh = liftIO do - s <- Posix.fileSize <$> Posix.getFdStatus fh - void (appendSection fh (fileTailRecord s)) -{-# INLINE appendTailSection #-} - - diff --git a/hbs2-tests/test/NCQ3.hs b/hbs2-tests/test/NCQ3.hs index 8b1492d3..f61e24c3 100644 --- a/hbs2-tests/test/NCQ3.hs +++ b/hbs2-tests/test/NCQ3.hs @@ -16,6 +16,7 @@ import HBS2.Storage.Operations.ByteString import HBS2.Storage.NCQ3 import HBS2.Storage.NCQ3.Internal.Files import HBS2.Storage.NCQ3.Internal.Index +import HBS2.Storage.NCQ3.Internal.Fossil import HBS2.System.Logger.Simple.ANSI @@ -244,3 +245,37 @@ ncq3Tests = do liftIO $ assertBool (show $ "found" <+> pretty h) found + + entry $ bindMatch "test:ncq3:merge:fossil" $ nil_ \e -> do + + let (opts,args) = splitOpts [] e + let num = headDef 1000 [ fromIntegral n | LitIntVal n <- args ] + g <- liftIO MWC.createSystemRandom + + runTest $ \TestEnv{..} -> do + ncqWithStorage3 testEnvDir $ \sto@NCQStorage3{..} -> flip runContT pure do + + hst <- newTVarIO ( mempty :: HashSet HashRef ) + + notice $ "write" <+> pretty num + replicateM_ num do + n <- liftIO $ uniformRM (1024, 64*1024) g + bs <- liftIO $ genRandomBS g n + h <- lift $ ncqPutBS sto (Just B) Nothing bs + atomically $ modifyTVar hst (HS.insert h) + + lift (ncqFossilMergeStep sto) + + notice "merge done" + + pause @'Seconds 180 + + notice "check after compaction" + + h1 <- readTVarIO hst + + for_ h1 $ \h -> lift do + found <- ncqLocate sto h <&> isJust + liftIO $ assertBool (show $ "found" <+> pretty h) found + +