From b0851401d75e4ff6b64077d39079c84988f0abf9 Mon Sep 17 00:00:00 2001 From: voidlizard Date: Thu, 29 May 2025 17:50:29 +0300 Subject: [PATCH] check for compact --- hbs2-storage-ncq/lib/HBS2/Storage/NCQ.hs | 103 ++++++++++++++--------- 1 file changed, 62 insertions(+), 41 deletions(-) diff --git a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ.hs b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ.hs index 3c158f1a..0cdff658 100644 --- a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ.hs +++ b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ.hs @@ -147,31 +147,33 @@ newtype WFd = WFd { unWfd :: Fd } data NCQStorage = NCQStorage - { ncqRoot :: FilePath - , ncqGen :: Int - , ncqSyncSize :: Int - , ncqMinLog :: Int - , ncqMaxLog :: Int - , ncqMaxCached :: Int - , ncqSalt :: HashRef - , ncqWriteQueue :: TVar (HashPSQ HashRef TimeSpec WQItem) - , ncqStaged :: TVar (IntMap (HashPSQ HashRef TimeSpec (Word64,Word64))) - , ncqIndexed :: TVar IntSet - , ncqIndexNow :: TVar Int - , ncqTrackedFiles :: TVar (HashPSQ FileKey FilePrio (Maybe CachedEntry)) - , ncqCachedEntries :: TVar Int - , ncqNotWritten :: TVar Word64 - , ncqLastWritten :: TVar TimeSpec - , ncqCurrentFd :: TVar (Maybe (RFd,WFd)) - , ncqCurrentUsage :: TVar (IntMap Int) - , ncqCurrentReadReq :: TVar (Seq (Fd, Word64, Word64, TMVar ByteString)) - , ncqLock :: TVar FL.FileLock - , ncqFsyncNum :: TVar Int - , ncqFlushNow :: TVar [TQueue ()] - , ncqMergeReq :: TVar Int - , ncqCompactReq :: TVar Int - , ncqOpenDone :: TMVar Bool - , ncqStopped :: TVar Bool + { ncqRoot :: FilePath + , ncqGen :: Int + , ncqSyncSize :: Int + , ncqMinLog :: Int + , ncqMaxLog :: Int + , ncqMaxCached :: Int + , ncqCompactTreshold :: Int + , ncqSalt :: HashRef + , ncqWriteQueue :: TVar (HashPSQ HashRef TimeSpec WQItem) + , ncqStaged :: TVar (IntMap (HashPSQ HashRef TimeSpec (Word64,Word64))) + , ncqIndexed :: TVar IntSet + , ncqIndexNow :: TVar Int + , ncqTrackedFiles :: TVar (HashPSQ FileKey FilePrio (Maybe CachedEntry)) + , ncqCachedEntries :: TVar Int + , ncqNotWritten :: TVar Word64 + , ncqLastWritten :: TVar TimeSpec + , ncqCurrentFd :: TVar (Maybe (RFd,WFd)) + , ncqCurrentUsage :: TVar (IntMap Int) + , ncqCurrentReadReq :: TVar (Seq (Fd, Word64, Word64, TMVar ByteString)) + , ncqLock :: TVar FL.FileLock + , ncqFsyncNum :: TVar Int + , ncqFlushNow :: TVar [TQueue ()] + , ncqMergeReq :: TVar Int + , ncqCompactReq :: TVar Int + , ncqCompactBusy :: TMVar () + , ncqOpenDone :: TMVar Bool + , ncqStopped :: TVar Bool } @@ -205,6 +207,11 @@ ncqDataOffset :: forall a b . (Integral a, Integral b) => a -> b ncqDataOffset base = fromIntegral base + ncqSLen + ncqKeyLen {-# INLINE ncqDataOffset #-} + +ncqFullTombLen :: forall a . Integral a => a +ncqFullTombLen = ncqSLen + ncqKeyLen + ncqPrefixLen + 0 +{-# INLINE ncqFullTombLen #-} + instance MonadUnliftIO m => Storage NCQStorage HbSync LBS.ByteString m where putBlock ncq lbs = fmap coerce <$> ncqStoragePutBlock ncq lbs enqueueBlock ncq lbs = fmap coerce <$> ncqStoragePutBlock ncq lbs @@ -414,16 +421,16 @@ ncqStorageRun ncq@NCQStorage{..} = flip runContT pure do debug "RUNNING STORAGE!" - reader <- makeReader - writer <- makeWriter indexQ - indexer <- makeIndexer writer indexQ - merge <- makeMerge - compact <- makeCompact - flagWatcher <- makeFlagWatcher + reader <- makeReader + writer <- makeWriter indexQ + indexer <- makeIndexer writer indexQ + merge <- makeMerge + compact <- makeCompact + checkCompact <- makeCheckCompact + flagWatcher <- makeFlagWatcher mapM_ waitCatch [writer,indexer,merge,compact] - -- mapM_ waitCatch [writer,indexer,refsWriter] -- ,indexer,refsWriter] - mapM_ cancel [reader,flagWatcher] + mapM_ cancel [reader,flagWatcher,checkCompact] where @@ -494,6 +501,15 @@ ncqStorageRun ncq@NCQStorage{..} = flip runContT pure do link reader pure reader + makeCheckCompact = do + ContT $ withAsync $ untilStopped do + pause @'Seconds 600 + debug "SCAN/CHECK FOR COMPACT" + profit <- ncqLinearScanForCompact ncq (\_ _ -> none) + -- FIXME: profit-hardcode + when (profit >= ncqCompactTreshold ) do + atomically $ modifyTVar ncqCompactReq succ + makeCompact = do me <- ContT $ withAsync $ untilStopped do @@ -1228,6 +1244,7 @@ ncqStorageInit_ check path = do let ncqSyncSize = 64 * (1024 ^ 2) let ncqMinLog = 1024 * (1024 ^ 2) let ncqMaxLog = 4 * (1024 ^ 3) + let ncqCompactTreshold = 128 * 1024^2 let ncqMaxCached = 128 @@ -1253,6 +1270,7 @@ ncqStorageInit_ check path = do ncqIndexed <- newTVarIO mempty ncqMergeReq <- newTVarIO 0 ncqCompactReq <- newTVarIO 0 + ncqCompactBusy <- newEmptyTMVarIO ncqFsyncNum <- newTVarIO 0 ncqLock <- newTVarIO ncqLock_ @@ -1572,13 +1590,16 @@ ncqLinearScanForCompact :: MonadUnliftIO m => NCQStorage -> ( FileKey -> HashRef -> m () ) -> m Int -ncqLinearScanForCompact ncq@NCQStorage{..} action = do +ncqLinearScanForCompact ncq@NCQStorage{..} action = flip runContT pure do + + ContT $ bracket ( atomically (takeTMVar ncqCompactBusy) ) $ const do + atomically $ putTMVar ncqCompactBusy () tracked <- readTVarIO ncqTrackedFiles <&> HPSQ.toList let state0 = mempty :: HashMap HashRef TimeSpec - bodyCount <- newTVarIO 0 + profit <- newTVarIO 0 tombUse <- newTVarIO (mempty :: HashMap HashRef (FileKey, Int)) -- TODO: explicit-unmap-files @@ -1590,7 +1611,7 @@ ncqLinearScanForCompact ncq@NCQStorage{..} action = do let cqFile = ncqGetIndexFileName ncq fk let dataFile = ncqGetDataFileName ncq fk - (mmaped,meta@NWayHash{..}) <- nwayHashMMapReadOnly cqFile + (mmaped,meta@NWayHash{..}) <- liftIO $ nwayHashMMapReadOnly cqFile >>= orThrow (NWayHashInvalidMetaData cqFile) let emptyKey = BS.replicate nwayKeySize 0 @@ -1610,9 +1631,9 @@ ncqLinearScanForCompact ncq@NCQStorage{..} action = do case HM.lookup kk state of Just ts | ts > timeSpecFromFilePrio p -> do atomically do - modifyTVar bodyCount succ + modifyTVar profit ( + (sz + ncqSLen) ) modifyTVar tombUse (HM.adjust (over _2 succ) kk) - lift $ action (fromString dataFile) kk + lift $ lift $ action (fromString dataFile) kk _ -> none @@ -1636,10 +1657,10 @@ ncqLinearScanForCompact ncq@NCQStorage{..} action = do let useless = [ (f,h) | (h, (f,n)) <- HM.toList use, n == 0 ] for_ useless $ \(f,h) -> do - atomically $ modifyTVar bodyCount succ - action f h + atomically $ modifyTVar profit (+ncqFullTombLen) + lift $ action f h - readTVarIO bodyCount + readTVarIO profit <&> fromIntegral ncqStorageCompact :: MonadUnliftIO m => NCQStorage -> m () ncqStorageCompact NCQStorage{..} = do