From 5a8ad51ee4fe4aad9ff568e012df6e9387d2f72e Mon Sep 17 00:00:00 2001 From: voidlizard Date: Tue, 3 Jun 2025 10:10:36 +0300 Subject: [PATCH] new merge --- hbs2-storage-ncq/lib/HBS2/Storage/NCQ.hs | 109 +++++++++++++---------- 1 file changed, 61 insertions(+), 48 deletions(-) diff --git a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ.hs b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ.hs index b9496131..6296c3e0 100644 --- a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ.hs +++ b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ.hs @@ -151,7 +151,7 @@ data NCQStorage = , ncqGen :: Int , ncqSyncSize :: Int , ncqMinLog :: Int - , ncqMaxLog :: Int + , ncqMaxSegments :: Int , ncqMaxCached :: Int , ncqCompactTreshold :: Int , ncqSalt :: HashRef @@ -427,10 +427,11 @@ ncqStorageRun ncq@NCQStorage{..} = flip runContT pure do merge <- makeMerge compact <- makeCompact checkCompact <- makeCheckCompact + checkMerge <- makeCheckMerge flagWatcher <- makeFlagWatcher mapM_ waitCatch [writer,indexer,merge,compact] - mapM_ cancel [reader,flagWatcher,checkCompact] + mapM_ cancel [reader,flagWatcher,checkCompact,checkMerge] where @@ -507,7 +508,7 @@ ncqStorageRun ncq@NCQStorage{..} = flip runContT pure do debug "SCAN/CHECK FOR COMPACT" profit <- ncqLinearScanForCompact ncq (\_ _ -> none) -- FIXME: profit-hardcode - when (profit >= ncqCompactTreshold ) do + when ( profit >= ncqCompactTreshold ) do atomically $ modifyTVar ncqCompactReq succ makeCompact = do @@ -536,6 +537,14 @@ ncqStorageRun ncq@NCQStorage{..} = flip runContT pure do link me pure me + makeCheckMerge = do + ContT $ withAsync $ untilStopped do + pause @'Seconds 600 + debug "CHECK FOR MERGE" + num <- readTVarIO ncqTrackedFiles <&> HPSQ.size + when (num > ncqMaxSegments) do + atomically $ modifyTVar ncqMergeReq succ + makeMerge = do me <- ContT $ withAsync $ untilStopped do micropause @'Seconds 10 @@ -1241,10 +1250,10 @@ ncqStorageInit_ check path = do let ncqRoot = path - let ncqSyncSize = 64 * (1024 ^ 2) - let ncqMinLog = 1024 * (1024 ^ 2) - let ncqMaxLog = 4 * (1024 ^ 3) - let ncqCompactTreshold = 128 * 1024^2 + let ncqSyncSize = 64 * (1024 ^ 2) + let ncqMinLog = 1024 * (1024 ^ 2) + let ncqMaxSegments = 64 + let ncqCompactTreshold = 128 * 1024^2 let ncqMaxCached = 128 @@ -1475,26 +1484,37 @@ ncqStorageMerge :: MonadUnliftIO m => NCQStorage -> m () ncqStorageMerge NCQStorage{..} = atomically $ modifyTVar ncqMergeReq succ ncqStorageMergeStep :: MonadUnliftIO m => NCQStorage -> m () -ncqStorageMergeStep ncq@NCQStorage{..} = do - tracked <- readTVarIO ncqTrackedFiles - <&> HPSQ.toList - <&> fmap (over _2 (coerce @_ @TimeSpec)) - <&> List.sortOn (view _2) - <&> List.take 2 +ncqStorageMergeStep ncq@NCQStorage{..} = flip runContT pure do + ContT $ bracket ( atomically (takeTMVar ncqCompactBusy) ) $ const do + atomically $ putTMVar ncqCompactBusy () - for_ tracked $ \(f, t, _) -> do - debug $ "FILE TO MERGE" <+> pretty (realToFrac @_ @(Fixed E6) t) <+> pretty f + liftIO do - mergeStep (fmap (view _1) tracked) + tracked <- readTVarIO ncqTrackedFiles <&> HPSQ.toList + files <- for tracked $ \(f,p,_) -> do + let fn = ncqGetDataFileName ncq f + sz <- liftIO (fileSize fn) + pure (f, sz, p) + + let found = flip fix (files, Nothing, Nothing) $ \next -> \case + ([], _, r) -> r + (a:b:rest, Nothing, _) -> + next (b:rest, Just (view _2 a + view _2 b), Just (a,b)) + + (a:b:rest, Just s, _ ) | view _2 a + view _2 b < s -> + next (b:rest, Just (view _2 a + view _2 b), Just (a,b)) + + (_:rest, s, r) -> + next (rest, s, r) + + case found of + Just (a,b) -> mergeStep a b + _ -> none where - - mergeStep [] = none - mergeStep [_] = none - - mergeStep [b,a] = do + mergeStep (a,_,p1) (b,_,p2) = do warn $ "merge" <+> pretty a <+> pretty b let fDataNameA = ncqGetDataFileName ncq a @@ -1503,8 +1523,8 @@ ncqStorageMergeStep ncq@NCQStorage{..} = do let fDataNameB = ncqGetDataFileName ncq b let fIndexNameB = ncqGetIndexFileName ncq b - warn $ "file A" <+> pretty fDataNameA <+> pretty fIndexNameA - warn $ "file B" <+> pretty fDataNameB <+> pretty fIndexNameB + warn $ "file A" <+> pretty (timeSpecFromFilePrio p1) <+> pretty fDataNameA <+> pretty fIndexNameA + warn $ "file B" <+> pretty (timeSpecFromFilePrio p2) <+> pretty fDataNameB <+> pretty fIndexNameB doesFileExist fDataNameA `orFail` ("not exist" <+> pretty fDataNameA) doesFileExist fDataNameB `orFail` ("not exist" <+> pretty fDataNameB) @@ -1526,52 +1546,45 @@ ncqStorageMergeStep ncq@NCQStorage{..} = do debug $ "SCAN FILE A" <+> pretty fDataNameA - writeFiltered ncq fDataNameA fwh $ \_ _ _ v -> do - pure $ not (ncqIsTomb (LBS.fromStrict v)) + writeFiltered ncq fDataNameA fwh $ \_ _ _ _ -> do + pure True debug $ "SCAN FILE B" <+> pretty fDataNameA - writeFiltered ncq fDataNameB fwh $ \_ _ k v -> do - let tomb = ncqIsTomb (LBS.fromStrict v) + writeFiltered ncq fDataNameB fwh $ \_ _ k _ -> do foundInA <- liftIO (nwayHashLookup nway mmIdx (coerce k)) <&> isJust - let skip = tomb || foundInA + let skip = foundInA pure $ not skip - result <- fileSize mfile - - when (result == 0) $ exit () liftIO do - fossil <- ncqGetNewFossilName ncq - mv mfile fossil + result <- fileSize mfile - statA <- getFileStatus fDataNameA - - let ts = modificationTimeHiRes statA - setFileTimesHiRes fossil ts ts - - fname <- ncqIndexFile ncq fossil + fp' <- if result == 0 then + pure Nothing + else do + fossil <- ncqGetNewFossilName ncq + mv mfile fossil + statA <- getFileStatus fDataNameA + let ts = modificationTimeHiRes statA + setFileTimesHiRes fossil ts ts + Just . (ts,) . fromString <$> ncqIndexFile ncq fossil atomically do - let fp = fromString fname modifyTVar ncqTrackedFiles (HPSQ.delete a) modifyTVar ncqTrackedFiles (HPSQ.delete b) - ncqAddTrackedFilesSTM ncq [(fp, posixToTimeSpec ts)] + for_ fp' $ \(ts,fp) -> do + ncqAddTrackedFilesSTM ncq [(fp, posixToTimeSpec ts)] mapM_ rm [fDataNameA, fDataNameB, fIndexNameB, fIndexNameA] - mergeStep _ = do - mergeError "assertion failed: more than 2 files to merge" - - mergeError d = throwIO (NCQMergeInvariantFailed (show d)) orFail what e = do r <- what unless r (throwIO (NCQMergeInvariantFailed (show e))) - posixToTimeSpec :: POSIXTime -> TimeSpec posixToTimeSpec pt = let (s, frac) = properFraction pt :: (Integer, POSIXTime) @@ -1692,8 +1705,8 @@ ncqCompact ncq@NCQStorage{..} = do liftIO do withBinaryFileAtomic mfile WriteMode $ \fwh -> do - writeFiltered ncq fDataNameA fwh $ \_ _ k v -> do - pure $ not $ HS.member k es + writeFiltered ncq fDataNameA fwh $ \_ _ k v -> do + pure $ not $ HS.member k es result <- fileSize mfile