From fc7a5c5e9f22b4b6070acc5d50cd53b0b9869649 Mon Sep 17 00:00:00 2001 From: voidlizard Date: Thu, 5 Jun 2025 15:43:48 +0300 Subject: [PATCH] fixing memory hunger on intensive write --- hbs2-storage-ncq/lib/HBS2/Storage/NCQ.hs | 121 +++++++++++++---------- hbs2-tests/test/TestNCQ.hs | 77 +++++++++++++-- 2 files changed, 135 insertions(+), 63 deletions(-) diff --git a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ.hs b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ.hs index 6296c3e0..ed493861 100644 --- a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ.hs +++ b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ.hs @@ -77,6 +77,7 @@ import System.Posix.Files qualified as PFS import System.IO.Error (catchIOError) import System.IO.MMap as MMap import System.IO.Temp (emptyTempFile) +import System.Mem -- import Foreign.Ptr -- import Foreign di import qualified Data.ByteString.Internal as BSI @@ -149,11 +150,13 @@ data NCQStorage = NCQStorage { ncqRoot :: FilePath , ncqGen :: Int + , ncqQLen :: Int , ncqSyncSize :: Int , ncqMinLog :: Int , ncqMaxSegments :: Int , ncqMaxCached :: Int , ncqCompactTreshold :: Int + , ncqCapabilities :: Int , ncqSalt :: HashRef , ncqWriteQueue :: TVar (HashPSQ HashRef TimeSpec WQItem) , ncqStaged :: TVar (IntMap (HashPSQ HashRef TimeSpec (Word64,Word64))) @@ -352,6 +355,11 @@ ncqAddTrackedFilesSTM NCQStorage{..} keys = do writeTVar ncqTrackedFiles new +ncqWaitForSlotSTM :: NCQStorage -> STM () +ncqWaitForSlotSTM NCQStorage{..} = do + s <- readTVar ncqWriteQueue <&> HPSQ.size + when ( s >= ncqQLen ) STM.retry + ncqListTrackedFiles :: MonadIO m => NCQStorage -> m [FilePath] ncqListTrackedFiles ncq = do let wd = ncqGetCurrentDir ncq @@ -429,9 +437,10 @@ ncqStorageRun ncq@NCQStorage{..} = flip runContT pure do checkCompact <- makeCheckCompact checkMerge <- makeCheckMerge flagWatcher <- makeFlagWatcher + sweep <- makeSweep mapM_ waitCatch [writer,indexer,merge,compact] - mapM_ cancel [reader,flagWatcher,checkCompact,checkMerge] + mapM_ cancel [reader,flagWatcher,checkCompact,checkMerge,sweep] where @@ -475,6 +484,35 @@ ncqStorageRun ncq@NCQStorage{..} = flip runContT pure do again + makeSweep = do + ContT $ withAsync $ liftIO $ fix \next -> do + pause @'Seconds 10 + + toClose <- atomically do + usage <- readTVar ncqCurrentUsage + staged <- readTVar ncqStaged + indexed <- readTVar ncqIndexed + + let (alive, dead) = List.partition (\(_, u) -> u > 0) (IntMap.toList usage) + + let closable = do + (f, _) <- dead + guard (IntSet.member f indexed) + guard (maybe True HPSQ.null (IntMap.lookup f staged)) + pure f + + writeTVar ncqCurrentUsage (IntMap.fromList alive) + writeTVar ncqIndexed (indexed `IntSet.difference` IntSet.fromList closable) + writeTVar ncqStaged (foldr IntMap.delete staged closable) + + pure closable + + for_ toClose $ \f -> do + debug $ "CLOSE FD" <+> pretty f + closeFd (fromIntegral f) + + next + makeReader = do cap <- getNumCapabilities reader <- ContT $ withAsync $ untilStopped do @@ -639,42 +677,36 @@ ncqStorageRun ncq@NCQStorage{..} = flip runContT pure do initQ <- readTVarIO ncqWriteQueue - wResult <- flip fix (0,initQ) \next (written,q) -> case HPSQ.minView q of - Nothing -> pure mempty - Just (h,_,WQItem{..},rest) -> do + wResult <- flip runContT pure $ callCC \exit -> do + flip fix (0,initQ,mempty) \next (written,q,rq) -> do - -- we really have to write tomb prefix here - let b = byteString (coerce @_ @ByteString h) - <> lazyByteString (fromMaybe (LBS.fromStrict ncqTombPrefix) wqData) + when (written >= syncData) $ exit rq - let wbs = toLazyByteString b - let len = LBS.length wbs - let ws = N.bytestring32 (fromIntegral len) - let w = ncqSLen + len + -- when (HPSQ.null q) $ exit rq - off <- fdSeek fh SeekFromEnd 0 + case HPSQ.minView q of + Nothing -> pure rq + Just (h,_,WQItem{..},rest) -> do - if isNothing wqData && wqNew then - pure () - else void do - liftIO (Posix.fdWrite fh (ws <> LBS.toStrict wbs)) - -- liftIO $ fileSynchronise fh + let b = byteString (coerce @_ @ByteString h) + <> lazyByteString (fromMaybe (LBS.fromStrict ncqTombPrefix) wqData) - (written',sz) <- if written < syncData then do - pure (written + w,0) - else do - ncqFsync ncq fh - fsize <- getFdStatus fh <&> PFS.fileSize - pure (0,fromIntegral fsize) + let wbs = toLazyByteString b + let len = LBS.length wbs + let ws = N.bytestring32 (fromIntegral len) + let w = ncqSLen + len + off <- liftIO $ fdSeek fh SeekFromEnd 0 - -- off <- fdSeek fh SeekFromEnd 0 <&> subtract (fromIntegral w) + ww <- if isNothing wqData && wqNew then + pure 0 + else do + liftIO (Posix.fdWrite fh (ws <> LBS.toStrict wbs)) + <&> fromIntegral - if sz < ncqMinLog then do - ((h, (fromIntegral off, fromIntegral len)) : ) <$> next (written', rest) - else do - pure [(h, (fromIntegral off, fromIntegral len))] + let item = (h, (fromIntegral off, fromIntegral len)) + next (written + ww, rest, item : rq ) ncqFsync ncq fh size <- fdSeek fh SeekFromEnd 0 @@ -737,30 +769,7 @@ ncqStorageRun ncq@NCQStorage{..} = flip runContT pure do debug $ "TRUNCATED, moved to" <+> pretty fossilized - - toClose <- atomically do - usage <- readTVar ncqCurrentUsage - staged <- readTVar ncqStaged - indexed <- readTVar ncqIndexed - - let (alive, dead) = List.partition (\(_, u) -> u > 0) (IntMap.toList usage) - - let closable = do - (f, _) <- dead - guard (IntSet.member f indexed) - guard (maybe True HPSQ.null (IntMap.lookup f staged)) - pure f - - writeTVar ncqCurrentUsage (IntMap.fromList alive) - writeTVar ncqIndexed (indexed `IntSet.difference` IntSet.fromList closable) - writeTVar ncqStaged (foldr IntMap.delete staged closable) - - pure closable - - for_ toClose $ \f -> do - debug $ "CLOSE FD" <+> pretty f - closeFd (fromIntegral f) - + -- ncqSweep -- ncqStoragePut_ :: MonadUnliftIO m => Bool @@ -781,6 +790,7 @@ ncqStoragePut_ check ncq@NCQStorage{..} h lbs = flip runContT pure $ callCC \exi now <- getTimeCoarse atomically do + ncqWaitForSlotSTM ncq let wqi = WQItem True (Just lbs) modifyTVar ncqWriteQueue (HPSQ.insert h now wqi) modifyTVar ncqNotWritten (+ (fromIntegral $ 4 + 32 + LBS.length lbs)) @@ -1001,6 +1011,8 @@ ncqStorageGet_ ncq@NCQStorage{..} = \case InCurrent (fd,o,l) -> do r <- atomically do a <- newEmptyTMVar + inRQ <- readTVar ncqCurrentReadReq <&> Seq.length + when (inRQ > 4 * ncqCapabilities) STM.retry modifyTVar ncqCurrentUsage (IntMap.insertWith (+) (fromIntegral fd) 1) modifyTVar ncqCurrentReadReq (|> (fd, o, l, a)) pure a @@ -1048,6 +1060,7 @@ ncqStorageDel ncq@NCQStorage{..} h = flip runContT pure $ callCC \exit -> do now <- getTimeCoarse let writeTombstone wq = do + ncqWaitForSlotSTM ncq let recordPrefixLen = ncqSLen + ncqKeyLen + ncqPrefixLen modifyTVar ncqWriteQueue (HPSQ.insert h now wq) modifyTVar ncqNotWritten (+ recordPrefixLen) @@ -1250,7 +1263,8 @@ ncqStorageInit_ check path = do let ncqRoot = path - let ncqSyncSize = 64 * (1024 ^ 2) + let ncqQLen = 32000 + let ncqSyncSize = 32 * (1024 ^ 2) let ncqMinLog = 1024 * (1024 ^ 2) let ncqMaxSegments = 64 let ncqCompactTreshold = 128 * 1024^2 @@ -1282,6 +1296,7 @@ ncqStorageInit_ check path = do ncqCompactBusy <- newTMVarIO () ncqFsyncNum <- newTVarIO 0 ncqLock <- newTVarIO ncqLock_ + ncqCapabilities <- getNumCapabilities let currentName = ncqGetCurrentName_ path ncqGen diff --git a/hbs2-tests/test/TestNCQ.hs b/hbs2-tests/test/TestNCQ.hs index 074326d7..0d3cecf2 100644 --- a/hbs2-tests/test/TestNCQ.hs +++ b/hbs2-tests/test/TestNCQ.hs @@ -71,6 +71,7 @@ import Safe import Lens.Micro.Platform import Control.Concurrent.STM qualified as STM import System.IO.Temp qualified as Temp +import System.Mem import UnliftIO @@ -177,6 +178,43 @@ testNCQFuckupRecovery1 TestEnv{..} = flip runContT pure do +testNCQLongWrite :: MonadUnliftIO m => Int -> TestEnv -> m () +testNCQLongWrite n TestEnv{..} = flip runContT pure do + let ncqDir = testEnvDir "ncq-simple" + + -- Step 1: Write block + lift $ withNCQ id ncqDir $ \ncq -> liftIO do + let sto = AnyStorage ncq + replicateM_ n do + size <- randomRIO (1, 256*1024) + let payload = LBS.replicate size 0x41 -- 0x41 = 'A' + h <- putBlock sto payload + assertBool "block written" (isJust h) + + +testNCQLongWriteRead :: MonadUnliftIO m => Int -> TestEnv -> m () +testNCQLongWriteRead n TestEnv{..} = flip runContT pure do + let ncqDir = testEnvDir "ncq-simple" + + wq <- newTQueueIO + + -- Step 1: Write block + lift $ withNCQ id ncqDir $ \ncq -> liftIO do + let sto = AnyStorage ncq + replicateM_ n do + size <- randomRIO (1, 256*1024) + let payload = LBS.replicate size 0x41 -- 0x41 = 'A' + h <- putBlock sto payload + assertBool "block written" (isJust h) + for_ h $ \hhh -> do + atomically $ writeTQueue wq (HashRef hhh) + + r <- atomically $ STM.flushTQueue wq + + for_ r $ \h -> do + s <- ncqLocate ncq h + assertBool "actually written" (isJust s) + testNCQSimple1 :: MonadUnliftIO m => TestEnv -> m () testNCQSimple1 TestEnv{..} = flip runContT pure do let ncqDir = testEnvDir "ncq-simple" @@ -300,7 +338,7 @@ testNCQ1 :: MonadUnliftIO m -> TestEnv -> m () -testNCQ1 n TestEnv{..} = flip runContT pure do +testNCQ1 n TestEnv{..} = flip runContT pure $ callCC \stop -> do let tmp = testEnvDir @@ -313,15 +351,26 @@ testNCQ1 n TestEnv{..} = flip runContT pure do nSize <- newTVarIO 0 - fss <- for [1..n] $ \i -> liftIO do - let fname = inputDir show i <> ".bin" - size <- randomRIO (1, 256*1024) - atomically $ modifyTVar nSize (+size) - file <- LBS.toStrict . LBS.take size <$> LBS.readFile "/dev/urandom" - BS.writeFile fname file - let ha = hashObject @HbSync file - pure (fname, (ha, fromIntegral $ BS.length file)) + tssQ <- newTQueueIO + forM_ [1..n] $ \i -> liftIO do + withBinaryFile "/dev/urandom" ReadMode \urandom -> do + let fname = inputDir show i <> ".bin" + size <- randomRIO (1, 256*1024) + atomically $ modifyTVar' nSize (+size) + file <- BS.copy <$> BS.hGetSome urandom size + BS.writeFile fname file + let !ha = hashObject @HbSync file + let !len = fromIntegral $ BS.length file + -- atomically $ writeTQueue tssQ (fname, (ha, fromIntegral $! BS.length file)) + -- l <- getFileSize fname + -- atomically $ writeTQueue tssQ (fname, (ha, l)) + atomically $ writeTQueue tssQ (fname, (ha, len)) + -- performGC + + fss <- atomically (STM.flushTQueue tssQ) + + stop () liftIO do withNCQ id ncqDir $ \ncq -> flip runContT pure do @@ -334,7 +383,7 @@ testNCQ1 n TestEnv{..} = flip runContT pure do written = readTVarIO twritten <&> HS.toList <&> fmap (,0.1) ContT $ withAsync $ forever do - polling (Polling 0.25 0.25) written $ \(HashRef hz) -> liftIO $ void $ asyncLinked do + polling (Polling 0.25 0.25) written $ \(HashRef hz) -> liftIO do what <- getBlock sto hz >>= orThrowUser ("block not found" <+> pretty hz) let h2 = hashObject @HbSync what @@ -529,6 +578,14 @@ main = do debug $ "test:ncq:fuckup-recovery1" runTest testNCQFuckupRecovery1 + entry $ bindMatch "test:ncq:long-write" $ nil_ $ \case + [ LitIntVal n ] -> runTest $ testNCQLongWrite (fromIntegral n) + e -> throwIO $ BadFormException @C (mkList e) + + entry $ bindMatch "test:ncq:long-write-read" $ nil_ $ \case + [ LitIntVal n ] -> runTest $ testNCQLongWriteRead (fromIntegral n) + e -> throwIO $ BadFormException @C (mkList e) + entry $ bindMatch "test:ncq:test-simple1" $ nil_ $ \case [] -> runTest $ testNCQSimple1 e -> throwIO $ BadFormException @C (mkList e)