From 0c71a7dab0441185add8f3e9d211a6511701efbc Mon Sep 17 00:00:00 2001 From: voidlizard Date: Tue, 22 Jul 2025 08:01:38 +0300 Subject: [PATCH] wip, wtf --- .../lib/HBS2/Storage/NCQ/Types.hs | 3 +- hbs2-storage-ncq/lib/HBS2/Storage/NCQ2.hs | 219 +++++++----------- hbs2-tests/test/TestNCQ.hs | 33 +-- 3 files changed, 93 insertions(+), 162 deletions(-) diff --git a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ/Types.hs b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ/Types.hs index 13159bd1..7703fd5f 100644 --- a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ/Types.hs +++ b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ/Types.hs @@ -35,6 +35,7 @@ data NCQStorageException = | NCQStorageCantOpenCurrent | NCQStorageBrokenCurrent | NCQMergeInvariantFailed String + | NCQCompactInvariantFailed String | NCQStorageCantLock FilePath | NCQStorageCantMapFile FilePath deriving stock (Show,Typeable) @@ -48,7 +49,7 @@ instance IsString FileKey where fromString = FileKey . BS8.pack . dropExtension . takeFileName instance Pretty FileKey where - pretty (FileKey s) = parens ("file-key" <+> pretty (BS8.unpack s)) + pretty (FileKey s) = pretty (BS8.unpack s) newtype DataFile a = DataFile a diff --git a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ2.hs b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ2.hs index 67959e08..d6b95e19 100644 --- a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ2.hs +++ b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ2.hs @@ -119,6 +119,12 @@ type NCQSize = Word32 type StateVersion = Word64 +data NCQIdxEntry = + NCQIdxEntry + { ncqIdxEntryOffset :: !NCQOffset + , ncqIdxEntrySize :: !NCQSize + } + data StateOP = D FileKey | F TimeSpec FileKey | P FileKey deriving (Eq,Ord,Show) @@ -372,6 +378,11 @@ ncqEntryUnwrap _ source = do Nothing -> (k, Left v) {-# INLINE ncqEntryUnwrap #-} + +ncqIdxIsTombSize :: NCQIdxEntry -> Bool +ncqIdxIsTombSize (NCQIdxEntry _ s) = s == ncqSLen + ncqKeyLen + ncqPrefixLen +{-# INLINE ncqIdxIsTombSize #-} + ncqIsTomb :: NCQStorage2 -> Location -> Bool ncqIsTomb me loc = case ncqEntryUnwrap me (ncqGetEntryBS me loc) of (_, Right (T, _)) -> True @@ -501,7 +512,7 @@ ncqSeekInFossils ncq@NCQStorage2{..} href action = useVersion ncq $ const do Just CachedEntry{..} -> do liftIO (ncqLookupIndex href (cachedMmapedIdx, cachedNway)) >>= \case Nothing -> go (i+1) 0 r - Just (offset, size) -> do + Just (NCQIdxEntry offset size) -> do now <- getTimeCoarse atomically $ writeTVar cachedTs now action (InFossil tfKey cachedMmapedData offset size) >>= \case @@ -514,17 +525,17 @@ ncqSeekInFossils ncq@NCQStorage2{..} href action = useVersion ncq $ const do ncqLookupIndex :: MonadUnliftIO m => HashRef -> (ByteString, NWayHash) - -> m (Maybe ( NCQOffset, NCQSize )) + -> m (Maybe NCQIdxEntry ) ncqLookupIndex hx (mmaped, nway) = do fmap decodeEntry <$> nwayHashLookup nway mmaped (coerce hx) {-# INLINE ncqLookupIndex #-} -decodeEntry :: ByteString -> ( NCQOffset, NCQSize ) +decodeEntry :: ByteString -> NCQIdxEntry decodeEntry entryBs = do let (p,r) = BS.splitAt 8 entryBs let off = fromIntegral (N.word64 p) let size = fromIntegral (N.word32 (BS.take 4 r)) - ( off, size ) + NCQIdxEntry off size {-# INLINE decodeEntry #-} ncqLocateActually :: MonadUnliftIO m => NCQStorage2 -> HashRef -> m (Maybe Location) @@ -593,7 +604,7 @@ ncqStorageRun2 ncq@NCQStorage2{..} = flip runContT pure do CachedEntry{..} -> do ncqLookupIndex h (cachedMmapedIdx, cachedNway) >>= \case Nothing -> none - Just (!offset,!size) -> do + Just (NCQIdxEntry offset size) -> do answer (Just (InFossil fk cachedMmapedData offset size)) next {-# INLINE lookupCached #-} @@ -1222,10 +1233,13 @@ ncqMergeStep ncq@NCQStorage2{..} = do let fIndexNameA = ncqGetFileName ncq $ toFileName (IndexFile a) let fDataNameB = ncqGetFileName ncq $ toFileName (DataFile b) + let fIndexNameB = ncqGetFileName ncq $ toFileName (IndexFile b) + -- TODO: proper-exception-handling doesFileExist fDataNameA `orFail` ("not exist" <+> pretty fDataNameA) doesFileExist fDataNameB `orFail` ("not exist" <+> pretty fDataNameB) doesFileExist fIndexNameA `orFail` ("not exist" <+> pretty fIndexNameA) + doesFileExist fIndexNameB `orFail` ("not exist" <+> pretty fIndexNameB) flip runContT pure $ callCC \exit -> do @@ -1280,159 +1294,102 @@ ncqMergeStep ncq@NCQStorage2{..} = do r <- what unless r (throwIO (NCQMergeInvariantFailed (show e))) +ncqCompactStep :: forall m . MonadUnliftIO m => NCQStorage2 -> m () +ncqCompactStep me@NCQStorage2{..} = withSem ncqMergeSem $ flip runContT pure $ callCC \exit -> do + ContT $ useVersion me --- ncqCompact :: MonadUnliftIO m => NCQStorage2 -> m () --- ncqCompact ncq@NCQStorage2{..} = withSem ncqMergeSem do + files <- lift (ncqListTrackedFiles me) + <&> filter (isNotPending . view _2) . V.toList + <&> fmap (view _1) + <&> zip [0 :: Int ..] + <&> IntMap.fromList --- q <- newTVarIO ( mempty :: HashMap FileKey (HashSet HashRef) ) + for_ (IntMap.elems files) $ \fk -> do --- ncqLinearScanForCompact ncq $ \fk h -> atomically do --- modifyTVar q (HM.insertWith (<>) fk (HS.singleton h)) + let datF = ncqGetFileName me (toFileName (DataFile fk)) + dataSize <- liftIO (fileSize datF) + garbage <- lift $ getGargabeSlow fk mempty --- state0 <- readTVarIO q + let realProfit = sum (HM.elems garbage) + let pfl = (realToFrac realProfit / realToFrac dataSize) & realToFrac @_ @(Fixed E6) + notice $ "profit" <+> pretty fk <+> pretty dataSize <+> pretty realProfit <+> pretty pfl --- for_ (HM.toList state0) $ \(fk, es) -> do --- trace $ "TO DELETE" <+> pretty fk <+> pretty (HS.size es) + -- (aIdx, fileA, nTombs) <- findFileA files >>= maybe (exit ()) pure --- let fDataNameA = ncqGetFileName ncq (toFileName $ DataFile fk) + -- notice $ green "compact: fileA" <+> pretty fileA <+> pretty aIdx <+> pretty nTombs --- flip runContT pure do + -- idxA <- lift (viewIndex fileA) + -- tombs <- lift (getTombsInIndex idxA) --- mfile <- ncqGetNewCompactName ncq + -- let (_,self,b) = IntMap.splitLookup aIdx files --- ContT $ bracket none $ const $ rm mfile + -- notice $ green "pretty" <+> viaShow b --- liftIO do --- withBinaryFileAtomic mfile WriteMode $ \fwh -> do --- writeFiltered ncq fDataNameA fwh $ \_ _ k v -> do --- pure $ not $ HS.member k es --- appendTailSection =<< handleToFd fwh + -- for_ (IntMap.elems b) $ \fk -> callCC \skip -> do + -- profit <- lift (getProfit fk tombs) --- result <- fileSize mfile + -- let datF = ncqGetFileName me (toFileName (DataFile fk)) + -- here <- doesFileExist datF --- if result == 0 then do --- atomically $ modifyTVar ncqTrackedFiles (HPSQ.delete fk) --- else do + -- unless here do + -- throwIO (NCQCompactInvariantFailed (show $ "fossil exists" <+> pretty fk)) --- fossil <- ncqGetNewFossilName ncq --- mv mfile fossil + -- dataSize <- liftIO (fileSize datF) --- statA <- getFileStatus fDataNameA + -- when (dataSize == 0) do + -- notice $ "skipped" <+> pretty fk <+> pretty dataSize <+> pretty profit + -- skip () --- let ts = modificationTimeHiRes statA --- setFileTimesHiRes fossil ts ts + -- garbage <- lift (getGargabeSlow fk mempty) + -- let realProfit = sum (HM.elems garbage) --- void $ ncqIndexFile ncq (DataFile (fromString fossil)) --- void $ ncqStateUpdate ncq [F (posixToTimeSpec ts) (fromString fossil)] + -- let pfl = (realToFrac realProfit / realToFrac dataSize) & realToFrac @_ @(Fixed E6) + -- notice $ "profit" <+> pretty fk <+> pretty profit <+> pretty dataSize <+> pretty pfl <+> pretty realProfit --- debug $ "compact done" <+> pretty (HM.size state0) + -- none + where --- NOTE: incremental --- now it may became incremental if we'll --- limit amount of tombs per one pass --- then remove all dead entries, --- then call again to remove tombs. etc --- as for now, seems it should work up to 10TB --- of storage -ncqLinearScanForCompact :: MonadUnliftIO m - => NCQStorage2 - -> ( FileKey -> HashRef -> m () ) - -> m Int -ncqLinearScanForCompact ncq@NCQStorage2{..} action = flip runContT pure do + -- findFileA files = lift do + -- tnums <- for (IntMap.toList files) $ \(i, fk) -> (i, fk,) . HS.size <$> (getTombsInIndex =<< viewIndex fk) + -- pure $ listToMaybe ( List.sortOn ( Down . view _3 ) tnums ) - ContT $ useVersion ncq + viewIndex fk = do + let idxf = ncqGetFileName me $ toFileName (IndexFile fk) + liftIO (nwayHashMMapReadOnly idxf) + >>= orThrow (NCQCompactInvariantFailed (show $ "index exists" <+> pretty fk)) - tracked <- readTVarIO ncqTrackedFiles <&> V.toList + getTombsInIndex :: MonadUnliftIO m => (ByteString, NWayHash) -> m (HashSet HashRef) + getTombsInIndex (idxBs, nway) = HS.fromList <$> S.toList_ do + nwayHashScanAll nway idxBs $ \_ k v -> do + when (k /= ncqEmptyKey && ncqIdxIsTombSize (decodeEntry v) ) do + S.yield (coerce @_ @HashRef k) - let state0 = mempty :: HashMap HashRef TimeSpec + getProfit :: MonadIO m => FileKey -> HashSet HashRef -> m NCQSize + getProfit fk tombs = do + (bs,nw) <- viewIndex fk + r <- S.toList_ $ nwayHashScanAll nw bs$ \_ k v -> do + when (HS.member (coerce k) tombs) $ S.yield $ ncqIdxEntrySize (decodeEntry v) + pure (sum r) - profit <- newTVarIO 0 - tombUse <- newTVarIO (mempty :: HashMap HashRef (FileKey, Int)) + getGargabeSlow :: MonadIO m => FileKey -> HashSet HashRef -> m (HashMap HashRef NCQSize) + getGargabeSlow fk tombs = do + let datFile = ncqGetFileName me (toFileName $ DataFile fk) + idx <- viewIndex fk - -- TODO: explicit-unmap-files + mmaped <- liftIO (mmapFileByteString datFile Nothing) - flip fix (tracked, state0) $ \next -> \case - ([], s) -> none - ((TrackedFile{..}):rest, state) -> do - e <- readTVarIO tfCached + r <- newTVarIO mempty + runConsumeBS mmaped do + readSections $ \size bs -> do + let k = coerce @_ @HashRef $ fst (ncqEntryUnwrap me (LBS.toStrict bs)) + found <- isJust <$> lift (ncqLookupIndex k idx) + let garbage = HS.member k tombs || not found + when garbage $ atomically do + modifyTVar' r (HM.insertWith (+) k (fromIntegral size)) - let cqFile = ncqGetFileName ncq (toFileName (IndexFile tfKey)) - let dataFile = ncqGetFileName ncq (toFileName (DataFile tfKey)) - - c <- doesFileExist cqFile - d <- doesFileExist dataFile - let pending = not (isNotPending e) - - if not c || not d || pending then - next (rest, state) - else do - - - (mmaped,meta@NWayHash{..}) <- liftIO $ nwayHashMMapReadOnly cqFile - >>= orThrow (NWayHashInvalidMetaData cqFile) - - notice $ "SCAN" <+> pretty cqFile - - let emptyKey = BS.replicate nwayKeySize 0 - - found <- S.toList_ do - nwayHashScanAll meta mmaped $ \o k entryBs -> do - unless (k == emptyKey) do - - let off = N.word64 (BS.take 8 entryBs) - let sz = N.word32 (BS.take 4 (BS.drop 8 entryBs)) - - -- debug $ "SCAN SHIT" <+> pretty tfKey <+> pretty off <+> pretty sz - - -- fast-n-dirty-check-for-deleted - when (sz <= ncqSLen + ncqKeyLen + ncqPrefixLen ) do - debug $ red "FOUND EMPTY RECORD" <+> pretty sz - S.yield off - - let kk = coerce k - - case HM.lookup kk state of - Just ts | ts > timeSpecFromFilePrio tfTime -> do - notice $ pretty kk <+> pretty (sz + ncqSLen) - atomically do - modifyTVar profit ( + (sz + ncqSLen) ) - modifyTVar tombUse (HM.adjust (over _2 succ) kk) - lift $ lift $ action (fromString dataFile) kk - - _ -> none - - notice "SURVIVED 2" - - newEntries <- S.toList_ do - unless (List.null found) do - notice $ red "TRY" <+> pretty dataFile - dataBs <- liftIO $ mmapFileByteString dataFile Nothing - notice "SURVIVED 3" - for_ found $ \o -> do - let pre = BS.take (fromIntegral ncqPrefixLen) (BS.drop (ncqDataOffset o) dataBs) - - when (pre == ncqRefPrefix || pre == ncqTombPrefix) do - let keyBs = BS.take ncqKeyLen (BS.drop (fromIntegral o + ncqSLen) dataBs) - let key = coerce (BS.copy keyBs) - unless (HM.member key state) do - S.yield (key, timeSpecFromFilePrio tfTime) - when ( pre == ncqTombPrefix ) do - atomically $ modifyTVar tombUse (HM.insert key (tfKey,0)) - - next (rest, state <> HM.fromList newEntries) - - use <- readTVarIO tombUse - let useless = [ (f,h) | (h, (f,n)) <- HM.toList use, n == 0 ] - - for_ useless $ \(f,h) -> do - atomically $ modifyTVar profit (+ncqFullTombLen) - lift $ action f h - - notice "SURVIVED 3" - - readTVarIO profit <&> fromIntegral + readTVarIO r ncqReadStateKeys :: forall m . MonadUnliftIO m => NCQStorage2 -> StateFile -> m [FileKey] ncqReadStateKeys me path = liftIO do diff --git a/hbs2-tests/test/TestNCQ.hs b/hbs2-tests/test/TestNCQ.hs index db9eb367..1e0a6c16 100644 --- a/hbs2-tests/test/TestNCQ.hs +++ b/hbs2-tests/test/TestNCQ.hs @@ -969,7 +969,7 @@ testNCQ2Lookup1 syn TestEnv{..} = do Just (CachedEntry{..}) -> do ncqLookupIndex h (cachedMmapedIdx, cachedNway) >>= \case Nothing -> none - Just (o,s) -> atomically (putTMVar answ (Just (N2.InFossil tfKey cachedMmapedData o s))) >> next + Just (NCQIdxEntry o s) -> atomically (putTMVar answ (Just (N2.InFossil tfKey cachedMmapedData o s))) >> next Nothing -> do @@ -979,7 +979,7 @@ testNCQ2Lookup1 syn TestEnv{..} = do Just CachedEntry{..} -> do ncqLookupIndex h (cachedMmapedIdx, cachedNway) >>= \case Nothing -> none - Just (o,s) -> atomically (putTMVar answ (Just (N2.InFossil tfKey cachedMmapedData o s))) >> next + Just (NCQIdxEntry o s) -> atomically (putTMVar answ (Just (N2.InFossil tfKey cachedMmapedData o s))) >> next atomically (putTMVar answ Nothing) >> next @@ -1663,34 +1663,7 @@ main = do notice $ "should be deleted" <+> pretty (HS.size deleted) <+> "/" <+> pretty tnum - useVersion sto $ const do - tfs <- N2.ncqListTrackedFiles sto <&> filter (isNotPending . view _2) . V.toList - - t0 <- getTimeCoarse - for_ tfs $ \(fk,_,_) -> void $ runMaybeT do - - let idxf = N2.ncqGetFileName sto $ toFileName (IndexFile fk) - - (idxBs, nway) <- liftIO $ nwayHashMMapReadOnly idxf - >>= orThrowUser "can't mmap index" - - stat' <- S.toList_ $ nwayHashScanAll nway idxBs $ \_ k v -> do - unless (k == ncqEmptyKey) do - let (o,s) = decodeEntry v - when ( s == ncqSLen + ncqKeyLen + ncqPrefixLen ) do - let hk = coerce @_ @HashRef k - S.yield (fk, 1) - - let stat = HM.fromListWith (+) stat' - for_ (HM.toList stat) $ \(k, num) -> do - notice $ pretty k <+> pretty num - - t1 <- getTimeCoarse - - let dt = realToFrac (toNanoSecs (t1 - t0)) * 1e-9 & sec3 - - notice $ "scan time" <+> pretty dt - + ncqCompactStep sto entry $ bindMatch "test:ncq2:del1" $ nil_ $ \syn -> do