wip, ncqLinearScanForCompact

This commit is contained in:
voidlizard 2025-05-29 09:54:25 +03:00
parent 09528cbf9a
commit a5cd25a34a
6 changed files with 216 additions and 9 deletions

View File

@ -18,6 +18,7 @@ import HBS2.Storage
import HBS2.Storage.Operations.Class
import HBS2.Storage.Operations.ByteString
import HBS2.Storage.Operations.Missed
import HBS2.Storage.Operations.Delete
import HBS2.Net.Auth.Schema()
@ -161,13 +162,7 @@ It's just an easy way to create a such thing, you may browse it by hbs2 cat -H
entry $ bindMatch "hbs2:tree:delete" $ nil_ \case
[HashLike href] -> do
sto <- getStorage
what <- S.toList_ $ deepScan ScanDeep (const none) (coerce href) (getBlock sto) $ \ha -> do
S.yield ha
for_ (reverse what) $ \ha -> do
display_ $ "deleting" <+> pretty ha
delBlock sto ha
deleteMerkleTree sto href
_ -> throwIO (BadFormException @c nil)

View File

@ -127,6 +127,7 @@ library
, HBS2.Storage.Operations.Class
, HBS2.Storage.Operations.ByteString
, HBS2.Storage.Operations.Missed
, HBS2.Storage.Operations.Delete
, HBS2.System.Logger.Simple
, HBS2.System.Logger.Simple.ANSI
, HBS2.System.Logger.Simple.Class

View File

@ -0,0 +1,27 @@
module HBS2.Storage.Operations.Delete where
import HBS2.Prelude.Plated
import HBS2.Data.Detect
import HBS2.Data.Types.Refs
import HBS2.Hash
import HBS2.Merkle
import HBS2.Storage
import HBS2.System.Logger.Simple
import Streaming.Prelude qualified as S
import Streaming.Prelude (Stream, Of(..))
import Control.Monad.Trans.Maybe
import Control.Monad
import Data.Coerce
import Data.Maybe
deleteMerkleTree :: MonadIO m => AnyStorage -> HashRef -> m ()
deleteMerkleTree sto root = do
what <- S.toList_ $ deepScan ScanDeep (const none) (coerce root) (getBlock sto) $ \ha -> do
S.yield ha
for_ (reverse what) $ \ha -> do
delBlock sto ha

View File

@ -25,6 +25,8 @@ runMonkeys RPC2Context{..} = flip runContT pure do
idleSleep = 120
-- FIXME: does-not-work-well
-- IDLE detection is weak
idleMonkey = do
flip fix 0 $ \next bytes0 -> do
ByPassStat{..} <- liftIO rpcByPassInfo

View File

@ -122,6 +122,10 @@ newtype FilePrio = FilePrio (Down TimeSpec)
mkFilePrio :: TimeSpec -> FilePrio
mkFilePrio = FilePrio . Down
timeSpecFromFilePrio :: FilePrio -> TimeSpec
timeSpecFromFilePrio (FilePrio what) = getDown what
{-# INLINE timeSpecFromFilePrio #-}
data CachedEntry =
CachedEntry { cachedMmapedIdx :: ByteString
, cachedMmapedData :: ByteString
@ -1050,7 +1054,9 @@ ncqFixIndexes ncq@NCQStorage{..} = do
ncqAddTrackedFilesIO ncq [newKey]
ncqStorageOpen :: MonadUnliftIO m => FilePath -> m NCQStorage
ncqStorageOpen :: MonadUnliftIO m
=> FilePath
-> m NCQStorage
ncqStorageOpen fp' = do
flip fix 0 $ \next i -> do
fp <- liftIO $ makeAbsolute fp'
@ -1506,3 +1512,80 @@ posixToTimeSpec pt =
in TimeSpec (fromIntegral s) ns
-- NOTE: incremental
-- now it may became incremental if we'll
-- limit amount of tombs per one pass
-- then remove all dead entries,
-- then call again to remove tombs. etc
ncqLinearScanForCompact :: MonadUnliftIO m
=> NCQStorage
-> ( FileKey -> HashRef -> m () )
-> m Int
ncqLinearScanForCompact ncq@NCQStorage{..} action = do
tracked <- readTVarIO ncqTrackedFiles <&> HPSQ.toList
let state0 = mempty :: HashMap HashRef TimeSpec
bodyCount <- newTVarIO 0
tombUse <- newTVarIO (mempty :: HashMap HashRef (FileKey, Int))
flip fix (tracked, state0) $ \next -> \case
([], s) -> none
((fk,p,_):rest, state) -> do
let cqFile = ncqGetIndexFileName ncq fk
let dataFile = ncqGetDataFileName ncq fk
(mmaped,meta@NWayHash{..}) <- nwayHashMMapReadOnly cqFile
>>= orThrow (NWayHashInvalidMetaData cqFile)
let emptyKey = BS.replicate nwayKeySize 0
found <- S.toList_ do
nwayHashScanAll meta mmaped $ \o k entryBs -> do
unless (k == emptyKey) do
let off = N.word64 (BS.take 8 entryBs)
let sz = N.word32 (BS.take 4 (BS.drop 8 entryBs))
when (sz == ncqPrefixLen || sz == ncqPrefixLen + 32) do
S.yield off
let kk = coerce k
case HM.lookup kk state of
Just ts | ts > timeSpecFromFilePrio p -> do
atomically do
modifyTVar bodyCount succ
modifyTVar tombUse (HM.adjust (over _2 succ) kk)
lift $ action (fromString dataFile) kk
_ -> none
newEntries <- S.toList_ do
unless (List.null found) do
dataBs <- liftIO $ mmapFileByteString dataFile Nothing
for_ found $ \o -> do
let pre = BS.take (fromIntegral ncqPrefixLen) (BS.drop (ncqDataOffset o) dataBs)
when (pre == ncqRefPrefix || pre == ncqTombPrefix) do
let keyBs = BS.take ncqKeyLen (BS.drop (fromIntegral o + ncqSLen) dataBs)
let key = coerce (BS.copy keyBs)
unless (HM.member key state) do
S.yield (key, timeSpecFromFilePrio p)
when ( pre == ncqTombPrefix ) do
atomically $ modifyTVar tombUse (HM.insert key (fk,0))
next (rest, state <> HM.fromList newEntries)
use <- readTVarIO tombUse
let useless = [ (f,h) | (h, (f,n)) <- HM.toList use, n == 0 ]
for_ useless $ \(f,h) -> do
atomically $ modifyTVar bodyCount succ
action f h
readTVarIO bodyCount

View File

@ -16,6 +16,7 @@ import HBS2.Merkle
import HBS2.Storage
import HBS2.Storage.Simple
import HBS2.Storage.Operations.ByteString
import HBS2.Storage.Operations.Delete
import HBS2.Net.Auth.Credentials
import HBS2.Peer.Proto.RefLog
import HBS2.Peer.Proto.LWWRef
@ -24,13 +25,14 @@ import HBS2.Data.Types.SignedBox
import HBS2.System.Logger.Simple.ANSI
import HBS2.Storage.NCQ
import HBS2.Data.Log.Structured.SD
import HBS2.Data.Log.Structured.NCQ
import HBS2.CLI.Run.Internal.Merkle
import Data.Config.Suckless.Syntax
import Data.Config.Suckless.Script as SC
import Data.Config.Suckless.System
import Data.Config.Suckless.System as SF
import Data.Config.Suckless.Script.File as SF
import DBPipe.SQLite hiding (field)
@ -70,6 +72,7 @@ import System.Random
import Safe
import Lens.Micro.Platform
import Control.Concurrent.STM qualified as STM
import Data.Hashable
import UnliftIO
@ -486,6 +489,20 @@ pragma synchronous=normal;
pure $ mkSym (show $ pretty m)
entry $ bindMatch "ncq:merkle:del" $ nil_ \syn -> do
(sto,root) <- case syn of
[ isOpaqueOf @TCQ -> Just tcq, HashLike root ] -> lift do
ncq <- AnyStorage <$> getNCQ tcq
pure (ncq, root)
e -> throwIO $ BadFormException @C (mkList e)
lift do
deleteMerkleTree sto root
entry $ bindMatch "ncq:merkle:read:stdout" $ nil_ \syn -> do
(tcq,h) <- case syn of
[ isOpaqueOf @TCQ -> Just tcq, HashLike f ] -> lift do
@ -501,6 +518,88 @@ pragma synchronous=normal;
LBS.putStr lbs
entry $ bindMatch "ncq:sd:scan:test" $ \case
[StringLike fn] -> liftIO do
isDir <- SF.doesDirectoryExist fn
files <- if not isDir then
pure [fn]
else do
S.toList_ do
glob ["**/*.data"] [] fn $ \f -> do
S.yield f
pure True
ttombs <- newTVarIO 0
rrefs <- newTVarIO 0
for_ files $ \fp -> do
mmaped <- liftIO $ mmapFileByteString fp Nothing
runConsumeBS mmaped do
readSections $ \size bs -> do
let ssz = LBS.length bs
let (_, rest1) = LBS.splitAt 32 bs
let (prefix, _) = LBS.splitAt ncqPrefixLen rest1 & over _1 LBS.toStrict
if | prefix == ncqTombPrefix -> do
atomically $ modifyTVar ttombs succ
| prefix == ncqRefPrefix -> do
atomically $ modifyTVar rrefs succ
| otherwise -> none
r <- readTVarIO rrefs
t <- readTVarIO ttombs
pure $ mkList [mkInt t, mkInt r]
e -> throwIO $ BadFormException @C (mkList e)
entry $ bindMatch "ncq:scan:for-compact" $ nil_ \syn -> do
ncq@NCQStorage{..} <- case syn of
[ isOpaqueOf @TCQ -> Just tcq ] -> lift $ getNCQ tcq
e -> throwIO $ BadFormException @C (mkList e)
ncqLinearScanForCompact ncq $ \fk h -> do
notice $ "TO DELETE" <+> pretty fk <+> pretty h
entry $ bindMatch "ncq:nway:scan:cq:test" $ \case
[StringLike fn] -> liftIO do
isDir <- SF.doesDirectoryExist fn
files <- if not isDir then
pure [fn]
else do
S.toList_ do
glob ["**/*.cq"] [] fn $ \f -> do
S.yield f
pure True
counters <- newTVarIO mempty
for_ files $ \f -> do
(mmaped,meta@NWayHash{..}) <- nwayHashMMapReadOnly f >>= orThrow (NWayHashInvalidMetaData f)
let emptyKey = BS.replicate nwayKeySize 0
nwayHashScanAll meta mmaped $ \o k v -> do
unless (k == emptyKey) do
atomically do
let k1 = hash k `mod` (2 ^ 17)
modifyTVar counters (IntMap.insertWith (+) k1 1)
r <- readTVarIO counters <&> IntMap.size
pure $ mkInt r
e -> throwIO $ BadFormException @C (mkList e)
entry $ bindMatch "ncq:nway:stats" $ \case
[StringLike fn] -> liftIO do