wip, sweep routines

This commit is contained in:
voidlizard 2025-07-29 18:04:08 +03:00
parent 29d5025e19
commit a5dbfe5e0b
9 changed files with 252 additions and 10 deletions

View File

@ -65,6 +65,7 @@ library
HBS2.Storage.NCQ3.Internal.Types
HBS2.Storage.NCQ3.Internal.Prelude
HBS2.Storage.NCQ3.Internal.State
HBS2.Storage.NCQ3.Internal.Sweep
HBS2.Storage.NCQ3.Internal.Run
HBS2.Storage.NCQ3.Internal.Memtable
HBS2.Storage.NCQ3.Internal.Index

View File

@ -35,6 +35,8 @@ import System.Posix.Files ( getFileStatus
)
import System.Posix.Files qualified as PFS
import System.IO.MMap as MMap
import Control.Concurrent.STM qualified as STM
import Control.Concurrent.STM.TSem
ncqStorageOpen3 :: MonadIO m => FilePath -> (NCQStorage3 -> NCQStorage3) -> m NCQStorage3
ncqStorageOpen3 fp upd = do
@ -71,6 +73,8 @@ ncqStorageOpen3 fp upd = do
ncqOnRunWriteIdle <- newTVarIO none
ncqSyncNo <- newTVarIO 0
ncqState <- newTVarIO mempty
ncqStateKey <- newTVarIO mempty
ncqServiceSem <- atomically $ newTSem 1
let ncq = NCQStorage3{..} & upd
@ -118,8 +122,8 @@ ncqPutBS ncq@NCQStorage3{..} mtp mhref bs' = ncqOperation ncq (pure $ fromMaybe
putTMVar waiter h
atomically do
nw <- readTVar ncqWrites <&> (`mod` V.length ncqWriteOps)
modifyTVar ncqWrites succ
nw <- readTVar ncqWrites <&> (`mod` V.length ncqWriteOps)
writeTQueue (ncqWriteOps ! nw) work
atomically $ takeTMVar waiter
@ -180,9 +184,8 @@ ncqTryLoadState me@NCQStorage3{..} = do
ncqIndexFile me dataFile
for_ (bad <> drop 3 (fmap snd rest)) $ \f -> do
for_ (bad <> fmap snd rest) $ \f -> do
let old = ncqGetFileName me (StateFile f)
-- debug $ "rm old state" <+> pretty old
rm old
where

View File

@ -39,5 +39,29 @@ ncqListFilesBy me@NCQStorage3{..} filt = do
pure $ List.sortOn ( Down . fst ) r
-- | Find the pair of neighbouring list elements whose files have the
-- smallest combined size on disk.
--
-- Only adjacent elements (in list order) are considered as a pair.
--
-- Returns 'Nothing' when the list has fewer than two elements; no file
-- is inspected in that case. Otherwise returns the combined size and
-- the two elements of the winning pair. When several pairs share the
-- minimal size, the one closest to the head of the list is returned.
--
-- Sizes are read with 'PFS.getFileStatus', so whatever exception that
-- call raises (e.g. for a missing file) propagates to the caller.
ncqFindMinPairOf :: forall fa m . (ToFileName fa, MonadUnliftIO m)
                 => NCQStorage3
                 -> [fa]
                 -> m (Maybe (NCQFileSize, fa, fa))
ncqFindMinPairOf sto lst = case lst of
  (_ : _ : _) -> do
    -- Stat every file exactly once: each interior element takes part
    -- in two pairs, so measuring per pair would stat it twice.
    sized <- mapM (\x -> (,) x <$> fsize (ncqGetFileName sto x)) lst
    pure $ List.foldl' pick Nothing (zip sized (drop 1 sized))
  _ -> pure Nothing
  where
    fsize s = liftIO (PFS.getFileStatus s) <&> PFS.fileSize

    -- Keep the current best unless the new pair is strictly smaller.
    pick best ((a, sa), (b, sb)) =
      let size = fromIntegral (sa + sb)
      in case best of
           Just (size0, _, _) | size0 <= size -> best
           _                                  -> Just (size, a, b)

View File

@ -13,6 +13,8 @@ import Control.Monad.Trans.Cont
import Control.Monad.Trans.Maybe
import Data.ByteString qualified as BS
import System.IO.MMap
import System.IO.Temp as Temp
import Streaming.Prelude qualified as S
data IndexEntry = IndexEntry {-# UNPACK #-} !FileKey !Word64 !Word32
@ -32,6 +34,12 @@ unpackIndexEntry entryBs = do
-- | A key made of 32 zero bytes.
-- 'ncqIndexCompactStep' skips scanned index entries whose key equals
-- this value.
emptyKey :: ByteString
emptyKey = BS.replicate 32 0
-- | Allocation parameters handed to 'nwayWriteBatch' by 'ncqIndexFile'
-- when an index is built for a data file.
-- NOTE(review): the meaning of the individual 'nwayAllocDef' arguments
-- is not visible from this module — confirm against its definition.
ncqIndexAlloc :: NWayHashAlloc
ncqIndexAlloc = nwayAllocDef 1.10 32 8 16
-- | Allocation parameters handed to 'nwayWriteBatch' by
-- 'ncqIndexCompactStep' when a merged index is written. Differs from
-- 'ncqIndexAlloc' only in the first argument (0.8 instead of 1.10).
-- NOTE(review): the meaning of that argument is defined by
-- 'nwayAllocDef' and is not visible here — confirm before tuning.
ncqIndexAllocForMerge :: NWayHashAlloc
ncqIndexAllocForMerge = nwayAllocDef 0.8 32 8 16
ncqLookupIndex :: MonadUnliftIO m
=> HashRef
-> (ByteString, NWayHash)
@ -67,7 +75,7 @@ ncqIndexFile n fk = runMaybeT do
let (dir,name) = splitFileName fp
let idxTemp = (dropExtension name <> "-") `addExtension` ".cq$"
result <- lift $ nwayWriteBatch (nwayAllocDef 1.10 32 8 16) dir idxTemp items
result <- lift $ nwayWriteBatch ncqIndexAlloc dir idxTemp items
mv result dest
@ -96,6 +104,59 @@ ncqIndexFile n fk = runMaybeT do
pure dest
{-HLINT ignore "Functor law"-}
-- | Run a single index compaction step.
--
-- The pair of index files to merge is chosen by 'ncqFindMinPairOf'
-- over the index files listed in the current state. Both files are
-- mapped read-only; the merged index receives every non-empty-key
-- entry of the first file plus those entries of the second file whose
-- key is not found in the first one. The merged file is stamped with
-- the modification time of the first file, moved under a freshly
-- allocated index file key, and the state is updated so that the new
-- index replaces the two merged ones.
--
-- Returns 'False' when there is no pair to merge or when one of the
-- index files cannot be mapped (the latter is also logged); returns
-- 'True' after a successful merge.
ncqIndexCompactStep :: MonadUnliftIO m
                    => NCQStorage3
                    -> m Bool
ncqIndexCompactStep me@NCQStorage3{..} = flip runContT pure $ callCC \exit -> do

  indexFiles <- fmap (IndexFile . snd) . ncqStateIndex <$> readTVarIO ncqState

  candidate <- lift $ ncqFindMinPairOf me indexFiles

  -- Nothing to merge: the whole step yields False.
  (_, a, b) <- ContT $ maybe1 candidate (pure False)

  let fileA = ncqGetFileName me a
      fileB = ncqGetFileName me b

  -- Map an index file read-only, or log the problem and leave the
  -- step with False.
  let mapIndex fn =
        lift (nwayHashMMapReadOnly fn)
          >>= maybe (err ("missed file" <+> pretty fn) >> exit False) pure

  (bsA, nwA) <- mapIndex fileA
  (bsB, nwB) <- mapIndex fileB

  merged <- S.toList_ do
    -- Every used slot of the first file goes in unconditionally.
    nwayHashScanAll nwA bsA $ \_ k v ->
      unless (k == emptyKey) $ S.yield (k,v)

    -- From the second file take only keys the first one does not have.
    nwayHashScanAll nwB bsB $ \_ k v -> unless (k == emptyKey) do
      known <- liftIO (nwayHashLookup nwA bsA k)
      unless (isJust known) $ S.yield (k,v)

  let dir = ncqGetWorkDir me

  stamp <- liftIO (PFS.modificationTimeHiRes <$> PFS.getFileStatus fileA)

  tmp <- lift $ nwayWriteBatch ncqIndexAllocForMerge dir "merged-.cq$" merged

  -- The merged index inherits the timestamp of the first source file.
  liftIO $ PFS.setFileTimesHiRes tmp stamp stamp

  newKey <- ncqGetNewFileKey me IndexFile

  mv tmp (ncqGetFileName me (IndexFile newKey))

  ncqStateUpdate me do
    ncqStateDelIndexFile (coerce a)
    ncqStateDelIndexFile (coerce b)
    ncqStateAddIndexFile stamp newKey

  pure True
ncqStorageScanDataFile :: MonadIO m
=> NCQStorage3
-> FilePath

View File

@ -8,6 +8,7 @@ import HBS2.Storage.NCQ3.Internal.Files
import HBS2.Storage.NCQ3.Internal.Memtable
import HBS2.Storage.NCQ3.Internal.Index
import HBS2.Storage.NCQ3.Internal.State
import HBS2.Storage.NCQ3.Internal.Sweep
import HBS2.Storage.NCQ3.Internal.MMapCache
@ -89,6 +90,12 @@ ncqStorageRun3 ncq@NCQStorage3{..} = flip runContT pure do
spawnActivity measureWPS
spawnActivity $ forever do
withSem ncqServiceSem (ncqSweepObsoleteStates ncq)
pause @'Seconds 10
spawnActivity (ncqSweepLoop ncq)
flip fix RunNew $ \loop -> \case
RunFin -> do
debug "exit storage"

View File

@ -41,9 +41,11 @@ ncqStateUpdate ncq@NCQStorage3{..} action = do
readTVar ncqState
unless (s1 == s0) do
snkFile <- ncqGetNewFileKey ncq StateFile <&> ncqGetFileName ncq . StateFile
key <- ncqGetNewFileKey ncq StateFile
let snkFile = ncqGetFileName ncq (StateFile key)
liftIO $ withBinaryFileDurableAtomic snkFile WriteMode $ \fh -> do
IO.hPrint fh (pretty s1)
atomically $ writeTVar ncqStateKey (Just key)
ncqStateAddDataFile :: FileKey -> StateOP ()
ncqStateAddDataFile fk = do
@ -71,6 +73,12 @@ ncqStateAddIndexFile ts fk = do
NCQStorage3{..} <- ask
StateOP $ lift $ modifyTVar' ncqState (sortIndexes . over #ncqStateIndex ((Down ts, fk) :))
-- | State operation that drops from the state's index list every
-- entry whose file key equals the given one.
ncqStateDelIndexFile :: FileKey -> StateOP ()
ncqStateDelIndexFile fk = do
  NCQStorage3{..} <- ask
  StateOP (lift (modifyTVar' ncqState (over #ncqStateIndex (filter keep))))
  where
    -- An entry survives when its key differs from the one being removed.
    keep = (/= fk) . snd
-- | Re-sort the state's index list in ascending order of the first
-- component of each entry. 'ncqStateAddIndexFile' stores that
-- component as @Down ts@, so ascending order puts the largest
-- timestamps first.
sortIndexes :: NCQState -> NCQState
sortIndexes st = over #ncqStateIndex (List.sortBy byFirst) st
  where
    byFirst x y = compare (fst x) (fst y)

View File

@ -0,0 +1,64 @@
module HBS2.Storage.NCQ3.Internal.Sweep where
import HBS2.Storage.NCQ3.Internal.Prelude
import HBS2.Storage.NCQ3.Internal.Types
import HBS2.Storage.NCQ3.Internal.Files
import HBS2.Storage.NCQ3.Internal.State
import HBS2.Storage.NCQ3.Internal.Index
import Data.Generics.Uniplate.Operations
import Data.Generics.Uniplate.Data()
import Data.List qualified as List
import Data.HashSet qualified as HS
import System.Posix.Files qualified as PFS
import Control.Monad.Trans.Maybe
-- | States of the state machine driven by 'ncqSweepLoop'.
data SweepSt
  = SweepWaitIdle
    -- ^ Pause, then go on to check the write EMA.
  | SweepCheckEMA SweepSt
    -- ^ Compare the write EMA with the idle threshold; the carried
    -- state is entered when the EMA is below the threshold.
  | SweepSomething
    -- ^ Placeholder for the actual sweep work.
-- | Endless background loop of the sweeper.
--
-- Starting from 'SweepWaitIdle', the loop pauses for 10 seconds, then
-- reads the write EMA; when it is below @ncqIdleThrsh@ the loop enters
-- the state carried by 'SweepCheckEMA' (currently 'SweepSomething'),
-- otherwise it goes back to waiting. 'SweepSomething' only logs,
-- pauses for 10 seconds and returns to 'SweepWaitIdle'.
-- The function never returns normally.
ncqSweepLoop :: MonadUnliftIO m => NCQStorage3 -> m ()
ncqSweepLoop NCQStorage3{..} = go SweepWaitIdle
  where
    go = \case
      SweepWaitIdle -> do
        debug "SweepWaitIdle"
        pause @'Seconds 10
        go (SweepCheckEMA SweepSomething)

      SweepCheckEMA onIdle -> do
        ema <- readTVarIO ncqWriteEMA
        debug $ "SweepCheckEMA" <+> pretty ema
        -- Proceed to the requested state only when writes are idle.
        go $ if ema < ncqIdleThrsh then onIdle else SweepWaitIdle

      SweepSomething -> do
        debug $ "SweepSomething"
        pause @'Seconds 10
        go SweepWaitIdle
-- | Remove state files that are older than the current one.
--
-- Does nothing when no current state key is recorded in
-- @ncqStateKey@. Otherwise the modification time of the current state
-- file is taken as a reference, and every file listed under the
-- @"s-"@ prefix that has a different key and a strictly smaller
-- timestamp is removed. Any synchronous exception raised during the
-- sweep is caught and reported through 'err'.
ncqSweepObsoleteStates :: forall m . MonadUnliftIO m => NCQStorage3 -> m ()
ncqSweepObsoleteStates me@NCQStorage3{..} = void $ runMaybeT do
  debug $ "ncqSweepObsoleteStates"

  current <- readTVarIO ncqStateKey >>= toMPlus

  outcome <- liftIO $ try @_ @SomeException do
    let stateFile = ncqGetFileName me . StateFile

    stamp      <- PFS.modificationTimeHiRes <$> PFS.getFileStatus (stateFile current)
    candidates <- ncqListFilesBy me (List.isPrefixOf "s-")

    -- Never touch the current state file or anything not older than it.
    let stale = [ f | (t,f) <- candidates, f /= current, t < stamp ]

    for_ stale $ \f -> do
      debug $ yellow "TO REMOVE" <+> pretty (toFileName (StateFile f))
      rm (stateFile f)

  lift $ either (\e -> err ("SweepStates failed" <+> viaShow e)) (const none) outcome

View File

@ -4,12 +4,12 @@ module HBS2.Storage.NCQ3.Internal.Types where
import HBS2.Storage.NCQ3.Internal.Prelude
import Data.Generics.Product
import Numeric (readHex)
import Data.Data
import Data.Set qualified as Set
import Data.HashSet qualified as HS
import Text.Printf
-- import Lens.Micro.Platform
import Control.Concurrent.STM.TSem (TSem,waitTSem,signalTSem)
data CachedData = CachedData !ByteString
@ -24,11 +24,15 @@ type StateVersion = Word64
newtype FileKey = FileKey Word32
deriving newtype (Eq,Ord,Show,Num,Enum,Real,Integral,Pretty,Hashable)
deriving stock (Data,Generic)
deriving stock instance Eq (DataFile FileKey)
deriving stock instance Ord (DataFile FileKey)
deriving stock instance Eq (IndexFile FileKey)
deriving stock instance Ord (IndexFile FileKey)
deriving stock instance Data (IndexFile FileKey)
deriving stock instance Data (DataFile FileKey)
deriving stock instance Data (StateFile FileKey)
data NCQEntry =
NCQEntry
@ -37,6 +41,7 @@ data NCQEntry =
}
type NCQOffset = Word64
type NCQFileSize = NCQOffset
type NCQSize = Word32
data Location =
@ -47,9 +52,10 @@ data Location =
data Fact =
FI (DataFile FileKey) (IndexFile FileKey) -- file X has index Y
| P PData -- pending, not indexed
deriving stock (Eq,Ord)
deriving stock (Eq,Ord,Data)
data PData = PData (DataFile FileKey) Word64
deriving stock (Data)
instance Ord PData where
compare (PData a _) (PData b _) = compare a b
@ -65,7 +71,7 @@ data NCQState =
, ncqStateVersion :: StateVersion
, ncqStateFacts :: Set Fact
}
deriving stock (Eq,Generic)
deriving stock (Eq,Generic,Data)
data NCQStorage3 =
NCQStorage3
@ -86,6 +92,7 @@ data NCQStorage3 =
, ncqMMapCachedData :: TVar (HashPSQ FileKey CachePrio CachedData)
, ncqMemTable :: Vector Shard
, ncqState :: TVar NCQState
, ncqStateKey :: TVar (Maybe FileKey)
, ncqWrites :: TVar Int
, ncqWriteEMA :: TVar Double -- for writes-per-seconds
, ncqWriteQ :: TVar (Seq HashRef)
@ -96,10 +103,10 @@ data NCQStorage3 =
, ncqSyncReq :: TVar Bool
, ncqOnRunWriteIdle :: TVar (IO ())
, ncqSyncNo :: TVar Int
, ncqServiceSem :: TSem
}
instance Monoid FileKey where
mempty = FileKey 0
@ -140,6 +147,12 @@ instance Pretty Location where
-- | File name of a fossil data file for the given key: @f-@, the key
-- as at least eight zero-padded lowercase hex digits, then @.data@.
ncqMakeFossilName :: FileKey -> FilePath
ncqMakeFossilName (FileKey w) = printf "f-%08x.data" w
-- | Run an action while holding one unit of the semaphore.
-- The unit is taken with 'waitTSem' before the action starts and is
-- given back with 'signalTSem' by 'bracket' once the action is over.
withSem :: forall a m . MonadUnliftIO m => TSem -> m a -> m a
withSem sem action = bracket acquire release (\_ -> action)
  where
    acquire   = atomically (waitTSem sem)
    release _ = atomically (signalTSem sem)
ncqState0 :: NCQState
ncqState0 = NCQState{..}
where

View File

@ -30,6 +30,7 @@ import Data.Config.Suckless.System
import NCQTestCommon
import Data.HashSet qualified as HS
import Test.Tasty.HUnit
import Data.ByteString qualified as BS
import Data.Ord
@ -39,6 +40,7 @@ import Control.Concurrent.STM qualified as STM
import Data.List qualified as List
import UnliftIO
{-HLINT ignore "Functor law"-}
ncq3Tests :: forall m . MonadUnliftIO m => MakeDictM C m ()
ncq3Tests = do
@ -141,3 +143,62 @@ ncq3Tests = do
notice $ "found:" <+> pretty (coerce @_ @HashRef k) <+> viaShow e
e -> throwIO $ BadFormException @C (mkList e)
-- test:ncq3:merge
-- Writes a number of random blocks, compacts the indexes step by step
-- and then checks that every written hash can still be located.
entry $ bindMatch "test:ncq3:merge" $ nil_ \e -> do
let (opts,args) = splitOpts [] e
-- Number of blocks: first integer literal argument, 1000 by default.
let num = headDef 1000 [ fromIntegral n | LitIntVal n <- args ]
g <- liftIO MWC.createSystemRandom
runTest $ \TestEnv{..} -> do
ncqWithStorage3 testEnvDir $ \sto@NCQStorage3{..} -> do
notice $ "write" <+> pretty num
-- Hashes of everything written, checked at the end of the test.
hst <- newTVarIO ( mempty :: HashSet HashRef )
replicateM_ num do
-- Block size is drawn from the range 1024 .. 64*1024.
n <- liftIO $ uniformRM (1024, 64*1024) g
bs <- liftIO $ genRandomBS g n
h <- ncqPutBS sto (Just B) Nothing bs
atomically $ modifyTVar hst (HS.insert h)
idx <- readTVarIO ncqState
<&> ncqStateIndex
<&> fmap (IndexFile . snd)
-- Only reported via notice; the pair found here is not used further.
r <- ncqFindMinPairOf sto idx
notice $ pretty r
-- Keep compacting until ncqIndexCompactStep returns False.
fix $ \loop -> do
notice "compacting once"
w <- ncqIndexCompactStep sto
when w loop
nstate <- readTVarIO ncqState
notice $ "new state" <> line <> pretty nstate
hss <- readTVarIO hst
-- Every hash written above must be locatable after the compaction.
for_ hss $ \h -> do
found <- ncqLocate sto h <&> isJust
liftIO $ assertBool (show $ "found" <+> pretty h) found
-- test:ncq3:sweep
-- Writes a number of random blocks and then keeps the storage open
-- for 300 seconds; the entry itself makes no assertions.
-- NOTE(review): presumably the pause lets the background sweep
-- activities run so they can be observed in the log — confirm.
entry $ bindMatch "test:ncq3:sweep" $ nil_ \e -> do
let (opts,args) = splitOpts [] e
-- Number of blocks: first integer literal argument, 1000 by default.
let num = headDef 1000 [ fromIntegral n | LitIntVal n <- args ]
g <- liftIO MWC.createSystemRandom
runTest $ \TestEnv{..} -> do
ncqWithStorage3 testEnvDir $ \sto@NCQStorage3{..} -> do
notice $ "write" <+> pretty num
-- Hashes are collected but not read back in this entry.
hst <- newTVarIO ( mempty :: HashSet HashRef )
replicateM_ num do
-- Block size is drawn from the range 1024 .. 64*1024.
n <- liftIO $ uniformRM (1024, 64*1024) g
bs <- liftIO $ genRandomBS g n
h <- ncqPutBS sto (Just B) Nothing bs
atomically $ modifyTVar hst (HS.insert h)
pause @'Seconds 300