This commit is contained in:
voidlizard 2025-07-31 12:15:26 +03:00
parent ced2239d53
commit 1b003ed124
5 changed files with 31 additions and 18 deletions

View File

@ -45,7 +45,7 @@ ncqStorageOpen3 fp upd = do
let ncqFsync = 16 * megabytes
let ncqWriteQLen = 1024 * 4
-- let ncqMinLog = 512 * megabytes
let ncqMinLog = 1 * gigabytes
let ncqMinLog = 1 * gigabytes
let ncqMaxLog = 32 * gigabytes
let ncqWriteBlock = max 128 $ ncqWriteQLen `div` 2
let ncqMaxCachedIndex = 64
@ -67,6 +67,7 @@ ncqStorageOpen3 fp upd = do
ncqWrites <- newTVarIO 0
ncqWriteEMA <- newTVarIO 0.0
ncqWriteOps <- V.fromList <$> replicateM wopNum newTQueueIO
ncqSyncOps <- newTQueueIO
ncqReadReq <- newTQueueIO
ncqAlive <- newTVarIO False
ncqStopReq <- newTVarIO False

View File

@ -171,10 +171,10 @@ appendSection fh sect = do
liftIO (Posix.fdWrite fh sect) <&> fromIntegral
{-# INLINE appendSection #-}
appendTailSection :: MonadIO m => Fd -> m ()
appendTailSection :: MonadIO m => Fd -> m NCQFileSize
appendTailSection fh = liftIO do
s <- Posix.fileSize <$> Posix.getFdStatus fh
void (appendSection fh (fileTailRecord s))
appendSection fh (fileTailRecord s) <&> (+ fromIntegral s) . fromIntegral
{-# INLINE appendTailSection #-}

View File

@ -59,9 +59,7 @@ ncqStorageRun3 ncq@NCQStorage3{..} = flip runContT pure do
if not stop then STM.retry else pure Nothing
maybe1 what none $ \(fk :: FileKey, fh) -> do
closeFd fh
ncqIndexFile ncq (DataFile fk)
loop
closeFd fh >> ncqIndexFile ncq (DataFile fk) >> loop
let shLast = V.length ncqWriteOps - 1
spawnActivity $ pooledForConcurrentlyN_ (V.length ncqWriteOps) [0..shLast] $ \i -> do
@ -94,6 +92,8 @@ ncqStorageRun3 ncq@NCQStorage3{..} = flip runContT pure do
spawnActivity measureWPS
-- spawnActivity (ncqStateUpdateLoop ncq)
spawnActivity $ postponed 10 $ forever do
ema <- readTVarIO ncqWriteEMA
@ -124,6 +124,7 @@ ncqStorageRun3 ncq@NCQStorage3{..} = flip runContT pure do
spawnActivity $ postponed 15 $ compactLoop 10 600 do
ncqFossilMergeStep ncq
flip fix RunNew $ \loop -> \case
RunFin -> do
debug "exit storage"
@ -141,17 +142,21 @@ ncqStorageRun3 ncq@NCQStorage3{..} = flip runContT pure do
RunSync (fk, fh, w, total, continue) -> do
stop <- readTVarIO ncqStopReq
sync <- readTVarIO ncqSyncReq
(stop,sync) <- atomically do
(,) <$> readTVar ncqStopReq
<*> readTVar ncqSyncReq
-- <*> readTVar ncqWriteEMA
let needClose = total >= ncqMinLog || stop
rest <- if not (sync || needClose || w > ncqFsync) then
pure w
else do
appendTailSection fh >> liftIO (fileSynchronise fh)
ss <- liftIO (PFS.getFdStatus fh) <&> fromIntegral . PFS.fileSize
ss <- appendTailSection fh
liftIO (fileSynchronise fh)
-- ss <- liftIO (PFS.getFdStatus fh) <&> fromIntegral . PFS.fileSize
ncqStateUpdate ncq do
ncqStateAddFact (P (PData (DataFile fk) ss))
@ -207,7 +212,7 @@ ncqStorageRun3 ncq@NCQStorage3{..} = flip runContT pure do
let written = sum ws
loop $ RunSync (fk, fh, w + written, total' + written, True)
wait closer
mapM_ wait [closer]
where
setAlive = atomically $ writeTVar ncqAlive True

View File

@ -96,6 +96,7 @@ data NCQStorage3 =
, ncqWriteEMA :: TVar Double -- for writes-per-seconds
, ncqWriteQ :: TVar (Seq HashRef)
, ncqWriteOps :: Vector (TQueue (IO ()))
, ncqSyncOps :: TQueue (IO ())
, ncqReadReq :: TQueue (HashRef, TMVar (Maybe Location))
, ncqAlive :: TVar Bool
, ncqStopReq :: TVar Bool

View File

@ -395,14 +395,14 @@ testNCQ3Lookup1 syn TestEnv{..} = do
g <- liftIO MWC.createSystemRandom
let (opts, argz) = splitOpts [("-m",0),("-M",0)] syn
let (opts, argz) = splitOpts [("-m",1),("-M",0)] syn
let n = headDef 100000 [ fromIntegral x | LitIntVal x <- argz ]
let nt = max 2 . headDef 1 $ [ fromIntegral x | LitIntVal x <- drop 1 argz ]
let nl = headDef 3 $ [ fromIntegral x | LitIntVal x <- drop 2 argz ]
let r = (64*1024, 256*1024)
let merge = headDef False [ True | ListVal [StringLike "-m"] <- opts ]
let merge = headDef 0 [ step | ListVal [StringLike "-m", LitIntVal step] <- opts ]
let mergeFull = headDef False [ True | ListVal [StringLike "-M"] <- opts ]
notice $ "insert" <+> pretty n <+> "random blocks of size" <+> parens (pretty r) <+> pretty opts
@ -413,6 +413,10 @@ testNCQ3Lookup1 syn TestEnv{..} = do
res <- newTQueueIO
let ntimes n m = flip fix n $ \loop i -> do
r <- m
if r && i > 0 then loop (i - 1) else pure r
ncqWithStorage3 ncqDir $ \sto -> liftIO do
pooledForConcurrentlyN_ 8 sizes $ \size -> do
z <- genRandomBS g size
@ -422,12 +426,14 @@ testNCQ3Lookup1 syn TestEnv{..} = do
hs <- atomically $ STM.flushTQueue thashes
let wrap m = if | mergeFull -> notice "full merge" >> ncqIndexCompactFull sto >> m
| merge ->
| merge > 0 ->
fix \next -> do
notice "run ncqIndexCompactStep"
left <- ncqIndexCompactStep sto
m
if left then next else none
notice $ "run ncqIndexCompactStep" <+> pretty merge
flip fix merge \inner i -> do
left <- ntimes merge (ncqIndexCompactStep sto)
m
if left then next else none
| otherwise -> m
wrap do