From e5b4b27901b63fe03e0ce32ced391a0ef3aa16b0 Mon Sep 17 00:00:00 2001 From: voidlizard Date: Fri, 11 Jul 2025 12:37:13 +0300 Subject: [PATCH] wip --- hbs2-storage-ncq/lib/HBS2/Storage/NCQ2.hs | 211 +++++++++++++++++++++- hbs2-tests/test/TestNCQ.hs | 13 ++ 2 files changed, 216 insertions(+), 8 deletions(-) diff --git a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ2.hs b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ2.hs index 1485dffb..727130da 100644 --- a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ2.hs +++ b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ2.hs @@ -47,7 +47,7 @@ import Data.IntMap (IntMap) import Data.IntSet qualified as IntSet import Data.IntSet (IntSet) import Data.Sequence qualified as Seq -import Data.Sequence (Seq(..), (|>)) +import Data.Sequence (Seq(..), (|>),(<|)) import Data.List qualified as List import Data.ByteString.Lazy qualified as LBS import Data.ByteString.Lazy.Char8 qualified as LBS8 @@ -110,6 +110,10 @@ type Shard = TVar (HashMap HashRef NCQEntry) type NCQOffset = Word64 type NCQSize = Word32 +data NCQFlag = + NCQMergeNow | NCQCompactNow + deriving (Eq,Ord,Generic) + data Location = InFossil ByteString NCQOffset NCQSize | InMemory ByteString @@ -124,7 +128,8 @@ data NCQStorage2 = , ncqWriteBlock :: Int , ncqMinLog :: Int , ncqMaxCached :: Int - , ncqMemTable :: Vector Shard + , ncqIdleThrsh :: Double + , ncqMemTable :: Vector Shard , ncqWriteSem :: TSem , ncqWriteQ :: TVar (Seq HashRef) , ncqStorageTasks :: TVar Int @@ -133,8 +138,10 @@ data NCQStorage2 = , ncqSyncNo :: TVar Int , ncqTrackedFiles :: TVar (HashPSQ FileKey FilePrio (Maybe CachedEntry)) , ncqCachedEntries :: TVar Int - } deriving (Generic) - + , ncqWrites :: TVar Int + , ncqWriteEMA :: TVar Double -- for writes-per-seconds + , ncqJobQ :: TQueue (IO ()) + } megabytes :: forall a . Integral a => a megabytes = 1024 ^ 2 @@ -148,6 +155,8 @@ ncqStorageOpen2 fp upd = do let ncqMinLog = 256 * megabytes let ncqWriteBlock = 1024 let ncqMaxCached = 128 + let ncqIdleThrsh = 50.00 + cap <- getNumCapabilities <&> fromIntegral ncqWriteQ <- newTVarIO mempty ncqWriteSem <- atomically $ newTSem 16 -- (fromIntegral cap) @@ -158,6 +167,9 @@ ncqStorageOpen2 fp upd = do ncqTrackedFiles <- newTVarIO HPSQ.empty ncqCachedEntries <- newTVarIO 0 ncqStorageTasks <- newTVarIO 0 + ncqWrites <- newTVarIO 0 + ncqWriteEMA <- newTVarIO 0.00 + ncqJobQ <- newTQueueIO let ncqSalt = "EstEFasxrCFqsGDxcY4haFcha9e4ZHRzsPbGUmDfdxLk" @@ -194,6 +206,11 @@ ncqGetNewFossilName :: MonadIO m => NCQStorage2 -> m FilePath ncqGetNewFossilName ncq = do liftIO $ emptyTempFile (ncqGetWorkDir ncq) "fossil-.data" +ncqGetNewCompactName :: MonadIO m => NCQStorage2 -> m FilePath +ncqGetNewCompactName n@NCQStorage2{} = do + let (p,tpl) = splitFileName (ncqGetFileName n "compact-.data") + liftIO $ emptyTempFile p tpl + ncqStorageStop2 :: MonadUnliftIO m => NCQStorage2 -> m () ncqStorageStop2 NCQStorage2{..} = do atomically $ writeTVar ncqStorageStopReq True @@ -225,6 +242,8 @@ ncqPutBS ncq@NCQStorage2{..} mtp mhref bs' = do let bs = ncqMakeSectionBS mtp h bs' atomically do waitTSem ncqWriteSem + + modifyTVar' ncqWrites succ stop <- readTVar ncqStorageStopReq filled <- readTVar ncqWriteQ <&> Seq.length @@ -233,6 +252,7 @@ ncqPutBS ncq@NCQStorage2{..} mtp mhref bs' = do ncqAlterEntrySTM ncq h $ \case Just e -> Just e Nothing -> Just (NCQEntry bs) + modifyTVar' ncqWriteQ (|> h) signalTSem ncqWriteSem @@ -349,8 +369,18 @@ ncqStorageRun2 ncq@NCQStorage2{..} = flip runContT pure do link closer - jobz <- ContT $ withAsync $ forever (atomically (readTQueue jobQ) >>= join) - link jobz + spawnActivity $ forever (liftIO $ join $ atomically (readTQueue ncqJobQ)) + + spawnActivity measureWPS + + spawnActivity $ forever do + ema <- readTVarIO ncqWriteEMA + + when (ema < ncqIdleThrsh) do + debug "SPAWN MERGE" + spawnJob $ void (ncqStorageMergeStep ncq) + + pause @'Seconds 10 ContT $ bracket none $ const $ liftIO do fhh <- atomically (STM.flushTQueue closeQ) @@ -422,8 +452,8 @@ ncqStorageRun2 ncq@NCQStorage2{..} = flip runContT pure do where - emptyKey = BS.replicate ncqKeyLen 0 + emptyKey = BS.replicate ncqKeyLen 0 openNewDataFile :: forall mx . MonadIO mx => mx (FileKey, Fd) openNewDataFile = do @@ -432,6 +462,33 @@ ncqStorageRun2 ncq@NCQStorage2{..} = flip runContT pure do let flags = defaultFileFlags { exclusive = False, creat = Just 0o666 } (fromString fname,) <$> liftIO (PosixBase.openFd fname Posix.ReadWrite flags) + spawnJob :: IO () -> m () + spawnJob m = atomically $ writeTQueue ncqJobQ m + + spawnActivity m = do + a <- ContT $ withAsync m + link a + pure a + + measureWPS = void $ flip fix Nothing \loop -> \case + Nothing -> do + w <- readTVarIO ncqWrites + t <- getTimeCoarse + pause @'Seconds step >> loop (Just (w,t)) + + Just (w0,t0) -> do + w1 <- readTVarIO ncqWrites + t1 <- getTimeCoarse + let dt = max 1e-9 (realToFrac @_ @Double (t1 - t0)) / 1e9 + dw = fromIntegral (w1 - w0) + atomically $ modifyTVar' ncqWriteEMA \ema -> alpha * (dw/dt) + 0.9 * ema + pause @'Seconds step >> loop (Just (w1,t1)) + + where + alpha = 0.1 + step = 1.00 + + ncqFileFastCheck :: MonadUnliftIO m => FilePath -> m () ncqFileFastCheck fp = do mmaped <- liftIO $ mmapFileByteString fp Nothing @@ -749,6 +806,144 @@ ncqStorageMergeStep ncq@NCQStorage2{..} = ncqRunTask ncq False $ flip runContT unless r (throwIO (NCQMergeInvariantFailed (show e))) +ncqCompact :: MonadUnliftIO m => NCQStorage2 -> m () +ncqCompact ncq@NCQStorage2{..} = do + + q <- newTVarIO ( mempty :: HashMap FileKey (HashSet HashRef) ) + + ncqLinearScanForCompact ncq $ \fk h -> atomically do + modifyTVar q (HM.insertWith (<>) fk (HS.singleton h)) + + state0 <- readTVarIO q + + for_ (HM.toList state0) $ \(fk, es) -> do + trace $ "TO DELETE" <+> pretty fk <+> pretty (HS.size es) + + let fDataNameA = ncqGetFileName ncq (toFileName $ DataFile fk) + let fIndexNameA = ncqGetFileName ncq (toFileName (IndexFile fk)) + + flip runContT pure do + + mfile <- ncqGetNewCompactName ncq + + ContT $ bracket none $ const do + rm mfile + + liftIO do + withBinaryFileAtomic mfile WriteMode $ \fwh -> do + writeFiltered ncq fDataNameA fwh $ \_ _ k v -> do + pure $ not $ HS.member k es + appendTailSection =<< handleToFd fwh + + result <- fileSize mfile + + if result == 0 then do + atomically $ modifyTVar ncqTrackedFiles (HPSQ.delete fk) + else do + + fossil <- ncqGetNewFossilName ncq + mv mfile fossil + + statA <- getFileStatus fDataNameA + + let ts = modificationTimeHiRes statA + setFileTimesHiRes fossil ts ts + + fname <- ncqIndexFile ncq (DataFile (fromString fossil)) + + atomically do + let fp = fromString fname + modifyTVar ncqTrackedFiles (HPSQ.delete fk) + ncqAddTrackedFileSTM ncq fp (posixToTimeSpec ts) + + mapM_ rm [fDataNameA, fIndexNameA] + + debug $ "compact done" <+> pretty (HM.size state0) + + +-- NOTE: incremental +-- now it may became incremental if we'll +-- limit amount of tombs per one pass +-- then remove all dead entries, +-- then call again to remove tombs. etc +-- as for now, seems it should work up to 10TB +-- of storage +ncqLinearScanForCompact :: MonadUnliftIO m + => NCQStorage2 + -> ( FileKey -> HashRef -> m () ) + -> m Int +ncqLinearScanForCompact ncq@NCQStorage2{..} action = flip runContT pure do + + + tracked <- readTVarIO ncqTrackedFiles <&> HPSQ.toList + + let state0 = mempty :: HashMap HashRef TimeSpec + + profit <- newTVarIO 0 + tombUse <- newTVarIO (mempty :: HashMap HashRef (FileKey, Int)) + + -- TODO: explicit-unmap-files + + flip fix (tracked, state0) $ \next -> \case + ([], s) -> none + ((fk,p,_):rest, state) -> do + + let cqFile = ncqGetFileName ncq (toFileName (IndexFile fk)) + let dataFile = ncqGetFileName ncq (toFileName (DataFile fk)) + + (mmaped,meta@NWayHash{..}) <- liftIO $ nwayHashMMapReadOnly cqFile + >>= orThrow (NWayHashInvalidMetaData cqFile) + + let emptyKey = BS.replicate nwayKeySize 0 + + found <- S.toList_ do + nwayHashScanAll meta mmaped $ \o k entryBs -> do + unless (k == emptyKey) do + + let off = N.word64 (BS.take 8 entryBs) + let sz = N.word32 (BS.take 4 (BS.drop 8 entryBs)) + + when (sz == ncqPrefixLen || sz == ncqPrefixLen + 32) do + S.yield off + + let kk = coerce k + + case HM.lookup kk state of + Just ts | ts > timeSpecFromFilePrio p -> do + notice $ pretty kk <+> pretty (sz + ncqSLen) + atomically do + modifyTVar profit ( + (sz + ncqSLen) ) + modifyTVar tombUse (HM.adjust (over _2 succ) kk) + lift $ lift $ action (fromString dataFile) kk + + _ -> none + + newEntries <- S.toList_ do + unless (List.null found) do + dataBs <- liftIO $ mmapFileByteString dataFile Nothing + for_ found $ \o -> do + let pre = BS.take (fromIntegral ncqPrefixLen) (BS.drop (ncqDataOffset o) dataBs) + + when (pre == ncqRefPrefix || pre == ncqTombPrefix) do + let keyBs = BS.take ncqKeyLen (BS.drop (fromIntegral o + ncqSLen) dataBs) + let key = coerce (BS.copy keyBs) + unless (HM.member key state) do + S.yield (key, timeSpecFromFilePrio p) + when ( pre == ncqTombPrefix ) do + atomically $ modifyTVar tombUse (HM.insert key (fk,0)) + + next (rest, state <> HM.fromList newEntries) + + use <- readTVarIO tombUse + let useless = [ (f,h) | (h, (f,n)) <- HM.toList use, n == 0 ] + + for_ useless $ \(f,h) -> do + atomically $ modifyTVar profit (+ncqFullTombLen) + lift $ action f h + + readTVarIO profit <&> fromIntegral + + writeFiltered :: forall m . MonadIO m => NCQStorage2 -> FilePath @@ -789,7 +984,7 @@ zeroSyncEntrySize :: Word64 zeroSyncEntrySize = fromIntegral (BS.length zeroSyncEntry) {-# INLINE zeroSyncEntrySize #-} --- 1. It's B-record +-- 1. It's M-record -- 2. It's last w64be == fileSize -- 3. It's hash == hash (bytestring64be fileSize) -- 4. recovery-strategy: start-to-end, end-to-start diff --git a/hbs2-tests/test/TestNCQ.hs b/hbs2-tests/test/TestNCQ.hs index 4e4c8cc6..9d8183ac 100644 --- a/hbs2-tests/test/TestNCQ.hs +++ b/hbs2-tests/test/TestNCQ.hs @@ -1105,6 +1105,19 @@ main = do e -> throwIO $ BadFormException @C (mkList e) + entry $ bindMatch "test:ncq2:ema" $ nil_ $ const do + notice "test:ncq2:ema" + runTest $ \TestEnv{..} -> do + g <- liftIO MWC.createSystemRandom + let dir = testEnvDir "ncq1" + let n = 50000 + ncqWithStorage dir $ \sto -> do + replicateM_ n do + ncqPutBS sto (Just B) Nothing =<< genRandomBS g (256*1024) + + notice $ "written" <+> pretty n + + pause @'Seconds 120 entry $ bindMatch "test:filter:emulate-1" $ nil_ $ \case [ LitIntVal n ] -> runTest $ testFilterEmulate1 (fromIntegral n)