diff --git a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ.hs b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ.hs index 299e50b4..3c158f1a 100644 --- a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ.hs +++ b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ.hs @@ -169,6 +169,7 @@ data NCQStorage = , ncqFsyncNum :: TVar Int , ncqFlushNow :: TVar [TQueue ()] , ncqMergeReq :: TVar Int + , ncqCompactReq :: TVar Int , ncqOpenDone :: TMVar Bool , ncqStopped :: TVar Bool } @@ -417,9 +418,10 @@ ncqStorageRun ncq@NCQStorage{..} = flip runContT pure do writer <- makeWriter indexQ indexer <- makeIndexer writer indexQ merge <- makeMerge + compact <- makeCompact flagWatcher <- makeFlagWatcher - mapM_ waitCatch [writer,indexer,merge] + mapM_ waitCatch [writer,indexer,merge,compact] -- mapM_ waitCatch [writer,indexer,refsWriter] -- ,indexer,refsWriter] mapM_ cancel [reader,flagWatcher] @@ -440,13 +442,15 @@ ncqStorageRun ncq@NCQStorage{..} = flip runContT pure do makeFlagWatcher = do let flags = ncqGetFileName ncq ".flags" - let needIndexFlag = flags "index:now" - let needMergeFlag = flags "merge:now" + let needIndexFlag = flags "index:now" + let needMergeFlag = flags "merge:now" + let needCompactFlag = flags "compact:now" ContT $ withAsync $ fix \again -> do pause @'Seconds 1 needIndex <- doesPathExist needIndexFlag needMerge <- doesPathExist needMergeFlag + needCompact <- doesPathExist needCompactFlag when needIndex do rm needIndexFlag @@ -456,6 +460,11 @@ ncqStorageRun ncq@NCQStorage{..} = flip runContT pure do rm needMergeFlag ncqStorageMerge ncq + when needCompact do + rm needCompactFlag + ncqStorageCompact ncq + + again makeReader = do @@ -485,6 +494,32 @@ ncqStorageRun ncq@NCQStorage{..} = flip runContT pure do link reader pure reader + makeCompact = do + + me <- ContT $ withAsync $ untilStopped do + + req <- atomically do + stop <- readTVar ncqStopped + req <- readTVar ncqCompactReq + + if | stop -> pure 0 + | req > 0 -> pure req + | otherwise -> STM.retry + + when (req > 0) do + atomically $ writeTVar ncqCompactReq 0 + debug $ "STARTED COMPACT" <+> pretty req + + try @_ @SomeException (ncqCompact ncq) >>= \case + Right{} -> none + Left e -> do + err ("COMPACT ERROR:" <+> viaShow e) + pause @'Seconds 5 + + + link me + pure me + makeMerge = do me <- ContT $ withAsync $ untilStopped do micropause @'Seconds 10 @@ -1217,6 +1252,7 @@ ncqStorageInit_ check path = do ncqCurrentFd <- newTVarIO Nothing ncqIndexed <- newTVarIO mempty ncqMergeReq <- newTVarIO 0 + ncqCompactReq <- newTVarIO 0 ncqFsyncNum <- newTVarIO 0 ncqLock <- newTVarIO ncqLock_ @@ -1605,6 +1641,9 @@ ncqLinearScanForCompact ncq@NCQStorage{..} action = do readTVarIO bodyCount +ncqStorageCompact :: MonadUnliftIO m => NCQStorage -> m () +ncqStorageCompact NCQStorage{..} = do + atomically $ modifyTVar ncqCompactReq succ ncqCompact :: MonadUnliftIO m => NCQStorage -> m () ncqCompact ncq@NCQStorage{..} = do @@ -1622,20 +1661,17 @@ ncqCompact ncq@NCQStorage{..} = do let fDataNameA = ncqGetDataFileName ncq fk let fIndexNameA = ncqGetIndexFileName ncq fk - flip runContT pure $ callCC \exit -> do - + flip runContT pure do mfile <- ncqGetNewCompactName ncq ContT $ bracket none $ const do rm mfile - liftIO $ withBinaryFileAtomic mfile WriteMode $ \fwh -> do - - writeFiltered ncq fDataNameA fwh $ \_ _ k v -> do - pure $ not $ HS.member k es - liftIO do + withBinaryFileAtomic mfile WriteMode $ \fwh -> do + writeFiltered ncq fDataNameA fwh $ \_ _ k v -> do + pure $ not $ HS.member k es result <- fileSize mfile @@ -1660,3 +1696,6 @@ ncqCompact ncq@NCQStorage{..} = do mapM_ rm [fDataNameA, fIndexNameA] + debug $ "compact done" <+> pretty (HM.size state0) + +