From f4f2b26be6e26ef7fa62f9d542d95a4114877772 Mon Sep 17 00:00:00 2001 From: voidlizard Date: Thu, 29 May 2025 11:38:38 +0300 Subject: [PATCH] ncqCompact --- hbs2-storage-ncq/lib/HBS2/Storage/NCQ.hs | 123 ++++++++++++++++++----- hbs2-tests/test/TCQ.hs | 13 +++ 2 files changed, 110 insertions(+), 26 deletions(-) diff --git a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ.hs b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ.hs index 965333f5..0f0b9c00 100644 --- a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ.hs +++ b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ.hs @@ -277,6 +277,13 @@ ncqGetNewMergeName n@NCQStorage{} = do let (p,tpl) = splitFileName fn liftIO $ emptyTempFile p tpl + +ncqGetNewCompactName :: MonadIO m => NCQStorage -> m FilePath +ncqGetNewCompactName n@NCQStorage{} = do + let fn = ncqGetFileName n "compact-.data" + let (p,tpl) = splitFileName fn + liftIO $ emptyTempFile p tpl + ncqGetIndexFileName :: NCQStorage -> FileKey -> FilePath ncqGetIndexFileName ncq fk = do ncqGetFileName ncq (addExtension (dropExtension (BS8.unpack (coerce fk))) ".cq") @@ -1380,6 +1387,36 @@ withNCQ setopts p action = flip runContT pure do pure e +writeFiltered :: forall m . MonadIO m + => NCQStorage + -> FilePath + -> Handle + -> ( Integer -> Integer -> HashRef -> ByteString -> m Bool) + -> m () + +writeFiltered ncq fn out filt = do + ncqStorageScanDataFile ncq fn $ \o s k v -> do + skip <- filt o s k v <&> not + + when skip do + debug $ pretty k <+> pretty "skipped" + + unless skip $ liftIO do + BS.hPut out (LBS.toStrict (makeEntryLBS k v)) + + where + + makeEntryLBS h bs = do + let b = byteString (coerce @_ @ByteString h) + <> byteString bs + + let wbs = toLazyByteString b + let len = LBS.length wbs + let ws = byteString (N.bytestring32 (fromIntegral len)) + + toLazyByteString (ws <> b) + + ncqStorageMerge :: MonadUnliftIO m => NCQStorage -> m () ncqStorageMerge NCQStorage{..} = atomically $ modifyTVar ncqMergeReq succ @@ -1399,21 +1436,6 @@ ncqStorageMergeStep ncq@NCQStorage{..} = do where - writeFiltered :: forall m . MonadIO m - => FilePath - -> Handle - -> ( Integer -> Integer -> HashRef -> ByteString -> m Bool) - -> m () - - writeFiltered fn out filt = do - ncqStorageScanDataFile ncq fn $ \o s k v -> do - skip <- filt o s k v <&> not - - when skip do - debug $ pretty k <+> pretty "skipped" - - unless skip $ liftIO do - BS.hPut out (LBS.toStrict (makeEntryLBS k v)) mergeStep [] = none mergeStep [_] = none @@ -1450,12 +1472,12 @@ ncqStorageMergeStep ncq@NCQStorage{..} = do debug $ "SCAN FILE A" <+> pretty fDataNameA - writeFiltered fDataNameA fwh $ \_ _ _ v -> do + writeFiltered ncq fDataNameA fwh $ \_ _ _ v -> do pure $ not (ncqIsTomb (LBS.fromStrict v)) debug $ "SCAN FILE B" <+> pretty fDataNameA - writeFiltered fDataNameB fwh $ \_ _ k v -> do + writeFiltered ncq fDataNameB fwh $ \_ _ k v -> do let tomb = ncqIsTomb (LBS.fromStrict v) foundInA <- liftIO (nwayHashLookup nway mmIdx (coerce k)) <&> isJust let skip = tomb || foundInA @@ -1494,15 +1516,6 @@ ncqStorageMergeStep ncq@NCQStorage{..} = do r <- what unless r (throwIO (NCQMergeInvariantFailed (show e))) - makeEntryLBS h bs = do - let b = byteString (coerce @_ @ByteString h) - <> byteString bs - - let wbs = toLazyByteString b - let len = LBS.length wbs - let ws = byteString (N.bytestring32 (fromIntegral len)) - - toLazyByteString (ws <> b) posixToTimeSpec :: POSIXTime -> TimeSpec @@ -1517,6 +1530,8 @@ posixToTimeSpec pt = -- limit amount of tombs per one pass -- then remove all dead entries, -- then call again to remove tombs. etc +-- as for now, seems it should work up to 10TB +-- of storage ncqLinearScanForCompact :: MonadUnliftIO m => NCQStorage -> ( FileKey -> HashRef -> m () ) @@ -1530,6 +1545,8 @@ ncqLinearScanForCompact ncq@NCQStorage{..} action = do bodyCount <- newTVarIO 0 tombUse <- newTVarIO (mempty :: HashMap HashRef (FileKey, Int)) + -- TODO: explicit-unmap-files + flip fix (tracked, state0) $ \next -> \case ([], s) -> none ((fk,p,_):rest, state) -> do @@ -1589,3 +1606,57 @@ ncqLinearScanForCompact ncq@NCQStorage{..} action = do readTVarIO bodyCount +ncqCompact :: MonadUnliftIO m => NCQStorage -> m () +ncqCompact ncq@NCQStorage{..} = do + + q <- newTVarIO ( mempty :: HashMap FileKey (HashSet HashRef) ) + + ncqLinearScanForCompact ncq $ \fk h -> atomically do + modifyTVar q (HM.insertWith (<>) fk (HS.singleton h)) + + state0 <- readTVarIO q + + for_ (HM.toList state0) $ \(fk, es) -> do + notice $ "TO DELETE" <+> pretty fk <+> pretty (HS.size es) + + let fDataNameA = ncqGetDataFileName ncq fk + let fIndexNameA = ncqGetIndexFileName ncq fk + + flip runContT pure $ callCC \exit -> do + + + mfile <- ncqGetNewCompactName ncq + + ContT $ bracket none $ const do + rm mfile + + liftIO $ withBinaryFileAtomic mfile WriteMode $ \fwh -> do + + writeFiltered ncq fDataNameA fwh $ \_ _ k v -> do + pure $ not $ HS.member k es + + liftIO do + + result <- fileSize mfile + + if result == 0 then do + atomically $ modifyTVar ncqTrackedFiles (HPSQ.delete fk) + else do + + fossil <- ncqGetNewFossilName ncq + mv mfile fossil + + statA <- getFileStatus fDataNameA + + let ts = modificationTimeHiRes statA + setFileTimesHiRes fossil ts ts + + fname <- ncqIndexFile ncq fossil + + atomically do + let fp = fromString fname + modifyTVar ncqTrackedFiles (HPSQ.delete fk) + ncqAddTrackedFilesSTM ncq [(fp, posixToTimeSpec ts)] + + mapM_ rm [fDataNameA, fIndexNameA] + diff --git a/hbs2-tests/test/TCQ.hs b/hbs2-tests/test/TCQ.hs index d1f5cdeb..514365a1 100644 --- a/hbs2-tests/test/TCQ.hs +++ b/hbs2-tests/test/TCQ.hs @@ -230,6 +230,19 @@ main = do pure nil + entry $ bindMatch "ncq:compact" $ \syn -> lift do + + tcq <- case syn of + [ isOpaqueOf @TCQ -> Just tcq ] -> do + pure tcq + + e -> throwIO $ BadFormException @C (mkList e) + + ncq <- getNCQ tcq + ncqCompact ncq + + pure nil + entry $ bindMatch "ncq:merge" $ \syn -> lift do tcq <- case syn of