fixing memory hunger on intensive write

This commit is contained in:
voidlizard 2025-06-05 15:43:48 +03:00
parent 050914ac7a
commit fc7a5c5e9f
2 changed files with 135 additions and 63 deletions

View File

@ -77,6 +77,7 @@ import System.Posix.Files qualified as PFS
import System.IO.Error (catchIOError) import System.IO.Error (catchIOError)
import System.IO.MMap as MMap import System.IO.MMap as MMap
import System.IO.Temp (emptyTempFile) import System.IO.Temp (emptyTempFile)
import System.Mem
-- import Foreign.Ptr -- import Foreign.Ptr
-- import Foreign di -- import Foreign di
import qualified Data.ByteString.Internal as BSI import qualified Data.ByteString.Internal as BSI
@ -149,11 +150,13 @@ data NCQStorage =
NCQStorage NCQStorage
{ ncqRoot :: FilePath { ncqRoot :: FilePath
, ncqGen :: Int , ncqGen :: Int
, ncqQLen :: Int
, ncqSyncSize :: Int , ncqSyncSize :: Int
, ncqMinLog :: Int , ncqMinLog :: Int
, ncqMaxSegments :: Int , ncqMaxSegments :: Int
, ncqMaxCached :: Int , ncqMaxCached :: Int
, ncqCompactTreshold :: Int , ncqCompactTreshold :: Int
, ncqCapabilities :: Int
, ncqSalt :: HashRef , ncqSalt :: HashRef
, ncqWriteQueue :: TVar (HashPSQ HashRef TimeSpec WQItem) , ncqWriteQueue :: TVar (HashPSQ HashRef TimeSpec WQItem)
, ncqStaged :: TVar (IntMap (HashPSQ HashRef TimeSpec (Word64,Word64))) , ncqStaged :: TVar (IntMap (HashPSQ HashRef TimeSpec (Word64,Word64)))
@ -352,6 +355,11 @@ ncqAddTrackedFilesSTM NCQStorage{..} keys = do
writeTVar ncqTrackedFiles new writeTVar ncqTrackedFiles new
ncqWaitForSlotSTM :: NCQStorage -> STM ()
ncqWaitForSlotSTM NCQStorage{..} = do
s <- readTVar ncqWriteQueue <&> HPSQ.size
when ( s >= ncqQLen ) STM.retry
ncqListTrackedFiles :: MonadIO m => NCQStorage -> m [FilePath] ncqListTrackedFiles :: MonadIO m => NCQStorage -> m [FilePath]
ncqListTrackedFiles ncq = do ncqListTrackedFiles ncq = do
let wd = ncqGetCurrentDir ncq let wd = ncqGetCurrentDir ncq
@ -429,9 +437,10 @@ ncqStorageRun ncq@NCQStorage{..} = flip runContT pure do
checkCompact <- makeCheckCompact checkCompact <- makeCheckCompact
checkMerge <- makeCheckMerge checkMerge <- makeCheckMerge
flagWatcher <- makeFlagWatcher flagWatcher <- makeFlagWatcher
sweep <- makeSweep
mapM_ waitCatch [writer,indexer,merge,compact] mapM_ waitCatch [writer,indexer,merge,compact]
mapM_ cancel [reader,flagWatcher,checkCompact,checkMerge] mapM_ cancel [reader,flagWatcher,checkCompact,checkMerge,sweep]
where where
@ -475,6 +484,35 @@ ncqStorageRun ncq@NCQStorage{..} = flip runContT pure do
again again
makeSweep = do
ContT $ withAsync $ liftIO $ fix \next -> do
pause @'Seconds 10
toClose <- atomically do
usage <- readTVar ncqCurrentUsage
staged <- readTVar ncqStaged
indexed <- readTVar ncqIndexed
let (alive, dead) = List.partition (\(_, u) -> u > 0) (IntMap.toList usage)
let closable = do
(f, _) <- dead
guard (IntSet.member f indexed)
guard (maybe True HPSQ.null (IntMap.lookup f staged))
pure f
writeTVar ncqCurrentUsage (IntMap.fromList alive)
writeTVar ncqIndexed (indexed `IntSet.difference` IntSet.fromList closable)
writeTVar ncqStaged (foldr IntMap.delete staged closable)
pure closable
for_ toClose $ \f -> do
debug $ "CLOSE FD" <+> pretty f
closeFd (fromIntegral f)
next
makeReader = do makeReader = do
cap <- getNumCapabilities cap <- getNumCapabilities
reader <- ContT $ withAsync $ untilStopped do reader <- ContT $ withAsync $ untilStopped do
@ -639,42 +677,36 @@ ncqStorageRun ncq@NCQStorage{..} = flip runContT pure do
initQ <- readTVarIO ncqWriteQueue initQ <- readTVarIO ncqWriteQueue
wResult <- flip fix (0,initQ) \next (written,q) -> case HPSQ.minView q of wResult <- flip runContT pure $ callCC \exit -> do
Nothing -> pure mempty
Just (h,_,WQItem{..},rest) -> do
flip fix (0,initQ,mempty) \next (written,q,rq) -> do
-- we really have to write tomb prefix here when (written >= syncData) $ exit rq
let b = byteString (coerce @_ @ByteString h)
<> lazyByteString (fromMaybe (LBS.fromStrict ncqTombPrefix) wqData)
let wbs = toLazyByteString b -- when (HPSQ.null q) $ exit rq
let len = LBS.length wbs
let ws = N.bytestring32 (fromIntegral len)
let w = ncqSLen + len
off <- fdSeek fh SeekFromEnd 0 case HPSQ.minView q of
Nothing -> pure rq
Just (h,_,WQItem{..},rest) -> do
if isNothing wqData && wqNew then let b = byteString (coerce @_ @ByteString h)
pure () <> lazyByteString (fromMaybe (LBS.fromStrict ncqTombPrefix) wqData)
else void do
liftIO (Posix.fdWrite fh (ws <> LBS.toStrict wbs))
-- liftIO $ fileSynchronise fh
(written',sz) <- if written < syncData then do let wbs = toLazyByteString b
pure (written + w,0) let len = LBS.length wbs
else do let ws = N.bytestring32 (fromIntegral len)
ncqFsync ncq fh let w = ncqSLen + len
fsize <- getFdStatus fh <&> PFS.fileSize
pure (0,fromIntegral fsize)
off <- liftIO $ fdSeek fh SeekFromEnd 0
-- off <- fdSeek fh SeekFromEnd 0 <&> subtract (fromIntegral w) ww <- if isNothing wqData && wqNew then
pure 0
else do
liftIO (Posix.fdWrite fh (ws <> LBS.toStrict wbs))
<&> fromIntegral
if sz < ncqMinLog then do let item = (h, (fromIntegral off, fromIntegral len))
((h, (fromIntegral off, fromIntegral len)) : ) <$> next (written', rest) next (written + ww, rest, item : rq )
else do
pure [(h, (fromIntegral off, fromIntegral len))]
ncqFsync ncq fh ncqFsync ncq fh
size <- fdSeek fh SeekFromEnd 0 size <- fdSeek fh SeekFromEnd 0
@ -737,30 +769,7 @@ ncqStorageRun ncq@NCQStorage{..} = flip runContT pure do
debug $ "TRUNCATED, moved to" <+> pretty fossilized debug $ "TRUNCATED, moved to" <+> pretty fossilized
-- ncqSweep
toClose <- atomically do
usage <- readTVar ncqCurrentUsage
staged <- readTVar ncqStaged
indexed <- readTVar ncqIndexed
let (alive, dead) = List.partition (\(_, u) -> u > 0) (IntMap.toList usage)
let closable = do
(f, _) <- dead
guard (IntSet.member f indexed)
guard (maybe True HPSQ.null (IntMap.lookup f staged))
pure f
writeTVar ncqCurrentUsage (IntMap.fromList alive)
writeTVar ncqIndexed (indexed `IntSet.difference` IntSet.fromList closable)
writeTVar ncqStaged (foldr IntMap.delete staged closable)
pure closable
for_ toClose $ \f -> do
debug $ "CLOSE FD" <+> pretty f
closeFd (fromIntegral f)
-- --
ncqStoragePut_ :: MonadUnliftIO m ncqStoragePut_ :: MonadUnliftIO m
=> Bool => Bool
@ -781,6 +790,7 @@ ncqStoragePut_ check ncq@NCQStorage{..} h lbs = flip runContT pure $ callCC \exi
now <- getTimeCoarse now <- getTimeCoarse
atomically do atomically do
ncqWaitForSlotSTM ncq
let wqi = WQItem True (Just lbs) let wqi = WQItem True (Just lbs)
modifyTVar ncqWriteQueue (HPSQ.insert h now wqi) modifyTVar ncqWriteQueue (HPSQ.insert h now wqi)
modifyTVar ncqNotWritten (+ (fromIntegral $ 4 + 32 + LBS.length lbs)) modifyTVar ncqNotWritten (+ (fromIntegral $ 4 + 32 + LBS.length lbs))
@ -1001,6 +1011,8 @@ ncqStorageGet_ ncq@NCQStorage{..} = \case
InCurrent (fd,o,l) -> do InCurrent (fd,o,l) -> do
r <- atomically do r <- atomically do
a <- newEmptyTMVar a <- newEmptyTMVar
inRQ <- readTVar ncqCurrentReadReq <&> Seq.length
when (inRQ > 4 * ncqCapabilities) STM.retry
modifyTVar ncqCurrentUsage (IntMap.insertWith (+) (fromIntegral fd) 1) modifyTVar ncqCurrentUsage (IntMap.insertWith (+) (fromIntegral fd) 1)
modifyTVar ncqCurrentReadReq (|> (fd, o, l, a)) modifyTVar ncqCurrentReadReq (|> (fd, o, l, a))
pure a pure a
@ -1048,6 +1060,7 @@ ncqStorageDel ncq@NCQStorage{..} h = flip runContT pure $ callCC \exit -> do
now <- getTimeCoarse now <- getTimeCoarse
let writeTombstone wq = do let writeTombstone wq = do
ncqWaitForSlotSTM ncq
let recordPrefixLen = ncqSLen + ncqKeyLen + ncqPrefixLen let recordPrefixLen = ncqSLen + ncqKeyLen + ncqPrefixLen
modifyTVar ncqWriteQueue (HPSQ.insert h now wq) modifyTVar ncqWriteQueue (HPSQ.insert h now wq)
modifyTVar ncqNotWritten (+ recordPrefixLen) modifyTVar ncqNotWritten (+ recordPrefixLen)
@ -1250,7 +1263,8 @@ ncqStorageInit_ check path = do
let ncqRoot = path let ncqRoot = path
let ncqSyncSize = 64 * (1024 ^ 2) let ncqQLen = 32000
let ncqSyncSize = 32 * (1024 ^ 2)
let ncqMinLog = 1024 * (1024 ^ 2) let ncqMinLog = 1024 * (1024 ^ 2)
let ncqMaxSegments = 64 let ncqMaxSegments = 64
let ncqCompactTreshold = 128 * 1024^2 let ncqCompactTreshold = 128 * 1024^2
@ -1282,6 +1296,7 @@ ncqStorageInit_ check path = do
ncqCompactBusy <- newTMVarIO () ncqCompactBusy <- newTMVarIO ()
ncqFsyncNum <- newTVarIO 0 ncqFsyncNum <- newTVarIO 0
ncqLock <- newTVarIO ncqLock_ ncqLock <- newTVarIO ncqLock_
ncqCapabilities <- getNumCapabilities
let currentName = ncqGetCurrentName_ path ncqGen let currentName = ncqGetCurrentName_ path ncqGen

View File

@ -71,6 +71,7 @@ import Safe
import Lens.Micro.Platform import Lens.Micro.Platform
import Control.Concurrent.STM qualified as STM import Control.Concurrent.STM qualified as STM
import System.IO.Temp qualified as Temp import System.IO.Temp qualified as Temp
import System.Mem
import UnliftIO import UnliftIO
@ -177,6 +178,43 @@ testNCQFuckupRecovery1 TestEnv{..} = flip runContT pure do
testNCQLongWrite :: MonadUnliftIO m => Int -> TestEnv -> m ()
testNCQLongWrite n TestEnv{..} = flip runContT pure do
let ncqDir = testEnvDir </> "ncq-simple"
-- Step 1: Write block
lift $ withNCQ id ncqDir $ \ncq -> liftIO do
let sto = AnyStorage ncq
replicateM_ n do
size <- randomRIO (1, 256*1024)
let payload = LBS.replicate size 0x41 -- 0x41 = 'A'
h <- putBlock sto payload
assertBool "block written" (isJust h)
testNCQLongWriteRead :: MonadUnliftIO m => Int -> TestEnv -> m ()
testNCQLongWriteRead n TestEnv{..} = flip runContT pure do
let ncqDir = testEnvDir </> "ncq-simple"
wq <- newTQueueIO
-- Step 1: Write block
lift $ withNCQ id ncqDir $ \ncq -> liftIO do
let sto = AnyStorage ncq
replicateM_ n do
size <- randomRIO (1, 256*1024)
let payload = LBS.replicate size 0x41 -- 0x41 = 'A'
h <- putBlock sto payload
assertBool "block written" (isJust h)
for_ h $ \hhh -> do
atomically $ writeTQueue wq (HashRef hhh)
r <- atomically $ STM.flushTQueue wq
for_ r $ \h -> do
s <- ncqLocate ncq h
assertBool "actually written" (isJust s)
testNCQSimple1 :: MonadUnliftIO m => TestEnv -> m () testNCQSimple1 :: MonadUnliftIO m => TestEnv -> m ()
testNCQSimple1 TestEnv{..} = flip runContT pure do testNCQSimple1 TestEnv{..} = flip runContT pure do
let ncqDir = testEnvDir </> "ncq-simple" let ncqDir = testEnvDir </> "ncq-simple"
@ -300,7 +338,7 @@ testNCQ1 :: MonadUnliftIO m
-> TestEnv -> TestEnv
-> m () -> m ()
testNCQ1 n TestEnv{..} = flip runContT pure do testNCQ1 n TestEnv{..} = flip runContT pure $ callCC \stop -> do
let tmp = testEnvDir let tmp = testEnvDir
@ -313,15 +351,26 @@ testNCQ1 n TestEnv{..} = flip runContT pure do
nSize <- newTVarIO 0 nSize <- newTVarIO 0
fss <- for [1..n] $ \i -> liftIO do tssQ <- newTQueueIO
let fname = inputDir </> show i <> ".bin"
size <- randomRIO (1, 256*1024)
atomically $ modifyTVar nSize (+size)
file <- LBS.toStrict . LBS.take size <$> LBS.readFile "/dev/urandom"
BS.writeFile fname file
let ha = hashObject @HbSync file
pure (fname, (ha, fromIntegral $ BS.length file))
forM_ [1..n] $ \i -> liftIO do
withBinaryFile "/dev/urandom" ReadMode \urandom -> do
let fname = inputDir </> show i <> ".bin"
size <- randomRIO (1, 256*1024)
atomically $ modifyTVar' nSize (+size)
file <- BS.copy <$> BS.hGetSome urandom size
BS.writeFile fname file
let !ha = hashObject @HbSync file
let !len = fromIntegral $ BS.length file
-- atomically $ writeTQueue tssQ (fname, (ha, fromIntegral $! BS.length file))
-- l <- getFileSize fname
-- atomically $ writeTQueue tssQ (fname, (ha, l))
atomically $ writeTQueue tssQ (fname, (ha, len))
-- performGC
fss <- atomically (STM.flushTQueue tssQ)
stop ()
liftIO do liftIO do
withNCQ id ncqDir $ \ncq -> flip runContT pure do withNCQ id ncqDir $ \ncq -> flip runContT pure do
@ -334,7 +383,7 @@ testNCQ1 n TestEnv{..} = flip runContT pure do
written = readTVarIO twritten <&> HS.toList <&> fmap (,0.1) written = readTVarIO twritten <&> HS.toList <&> fmap (,0.1)
ContT $ withAsync $ forever do ContT $ withAsync $ forever do
polling (Polling 0.25 0.25) written $ \(HashRef hz) -> liftIO $ void $ asyncLinked do polling (Polling 0.25 0.25) written $ \(HashRef hz) -> liftIO do
what <- getBlock sto hz >>= orThrowUser ("block not found" <+> pretty hz) what <- getBlock sto hz >>= orThrowUser ("block not found" <+> pretty hz)
let h2 = hashObject @HbSync what let h2 = hashObject @HbSync what
@ -529,6 +578,14 @@ main = do
debug $ "test:ncq:fuckup-recovery1" debug $ "test:ncq:fuckup-recovery1"
runTest testNCQFuckupRecovery1 runTest testNCQFuckupRecovery1
entry $ bindMatch "test:ncq:long-write" $ nil_ $ \case
[ LitIntVal n ] -> runTest $ testNCQLongWrite (fromIntegral n)
e -> throwIO $ BadFormException @C (mkList e)
entry $ bindMatch "test:ncq:long-write-read" $ nil_ $ \case
[ LitIntVal n ] -> runTest $ testNCQLongWriteRead (fromIntegral n)
e -> throwIO $ BadFormException @C (mkList e)
entry $ bindMatch "test:ncq:test-simple1" $ nil_ $ \case entry $ bindMatch "test:ncq:test-simple1" $ nil_ $ \case
[] -> runTest $ testNCQSimple1 [] -> runTest $ testNCQSimple1
e -> throwIO $ BadFormException @C (mkList e) e -> throwIO $ BadFormException @C (mkList e)