diff --git a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ2.hs b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ2.hs index 727130da..41e9183c 100644 --- a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ2.hs +++ b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ2.hs @@ -373,14 +373,29 @@ ncqStorageRun2 ncq@NCQStorage2{..} = flip runContT pure do spawnActivity measureWPS - spawnActivity $ forever do - ema <- readTVarIO ncqWriteEMA + spawnActivity $ fix \again -> (>> again) do + ema <- readTVarIO ncqWriteEMA - when (ema < ncqIdleThrsh) do - debug "SPAWN MERGE" - spawnJob $ void (ncqStorageMergeStep ncq) + if ema > ncqIdleThrsh then do + pause @'Seconds 2.5 - pause @'Seconds 10 + else do + mq <- newEmptyTMVarIO + + spawnJob $ do + merged <- ncqStorageMergeStep ncq + atomically $ putTMVar mq merged + + -- TODO: detect-dead-merge + void $ race (pause @'Seconds 300) (atomically $ readTMVar mq) >>= \case + Left{} -> none + Right True -> none + Right False -> do + debug "merge: all done, wait..." + n0 <- readTVarIO ncqTrackedFiles <&> HPSQ.size + atomically do + n <- readTVar ncqTrackedFiles <&> HPSQ.size + when (n == n0) STM.retry ContT $ bracket none $ const $ liftIO do fhh <- atomically (STM.flushTQueue closeQ)