mirror of https://github.com/voidlizard/hbs2
ncqCompact
This commit is contained in:
parent
25ceea216e
commit
52fc45d30c
|
@ -169,6 +169,7 @@ data NCQStorage =
|
||||||
, ncqFsyncNum :: TVar Int
|
, ncqFsyncNum :: TVar Int
|
||||||
, ncqFlushNow :: TVar [TQueue ()]
|
, ncqFlushNow :: TVar [TQueue ()]
|
||||||
, ncqMergeReq :: TVar Int
|
, ncqMergeReq :: TVar Int
|
||||||
|
, ncqCompactReq :: TVar Int
|
||||||
, ncqOpenDone :: TMVar Bool
|
, ncqOpenDone :: TMVar Bool
|
||||||
, ncqStopped :: TVar Bool
|
, ncqStopped :: TVar Bool
|
||||||
}
|
}
|
||||||
|
@ -417,9 +418,10 @@ ncqStorageRun ncq@NCQStorage{..} = flip runContT pure do
|
||||||
writer <- makeWriter indexQ
|
writer <- makeWriter indexQ
|
||||||
indexer <- makeIndexer writer indexQ
|
indexer <- makeIndexer writer indexQ
|
||||||
merge <- makeMerge
|
merge <- makeMerge
|
||||||
|
compact <- makeCompact
|
||||||
flagWatcher <- makeFlagWatcher
|
flagWatcher <- makeFlagWatcher
|
||||||
|
|
||||||
mapM_ waitCatch [writer,indexer,merge]
|
mapM_ waitCatch [writer,indexer,merge,compact]
|
||||||
-- mapM_ waitCatch [writer,indexer,refsWriter] -- ,indexer,refsWriter]
|
-- mapM_ waitCatch [writer,indexer,refsWriter] -- ,indexer,refsWriter]
|
||||||
mapM_ cancel [reader,flagWatcher]
|
mapM_ cancel [reader,flagWatcher]
|
||||||
|
|
||||||
|
@ -440,13 +442,15 @@ ncqStorageRun ncq@NCQStorage{..} = flip runContT pure do
|
||||||
|
|
||||||
makeFlagWatcher = do
|
makeFlagWatcher = do
|
||||||
let flags = ncqGetFileName ncq ".flags"
|
let flags = ncqGetFileName ncq ".flags"
|
||||||
let needIndexFlag = flags </> "index:now"
|
let needIndexFlag = flags </> "index:now"
|
||||||
let needMergeFlag = flags </> "merge:now"
|
let needMergeFlag = flags </> "merge:now"
|
||||||
|
let needCompactFlag = flags </> "compact:now"
|
||||||
|
|
||||||
ContT $ withAsync $ fix \again -> do
|
ContT $ withAsync $ fix \again -> do
|
||||||
pause @'Seconds 1
|
pause @'Seconds 1
|
||||||
needIndex <- doesPathExist needIndexFlag
|
needIndex <- doesPathExist needIndexFlag
|
||||||
needMerge <- doesPathExist needMergeFlag
|
needMerge <- doesPathExist needMergeFlag
|
||||||
|
needCompact <- doesPathExist needCompactFlag
|
||||||
|
|
||||||
when needIndex do
|
when needIndex do
|
||||||
rm needIndexFlag
|
rm needIndexFlag
|
||||||
|
@ -456,6 +460,11 @@ ncqStorageRun ncq@NCQStorage{..} = flip runContT pure do
|
||||||
rm needMergeFlag
|
rm needMergeFlag
|
||||||
ncqStorageMerge ncq
|
ncqStorageMerge ncq
|
||||||
|
|
||||||
|
when needCompact do
|
||||||
|
rm needCompactFlag
|
||||||
|
ncqStorageCompact ncq
|
||||||
|
|
||||||
|
|
||||||
again
|
again
|
||||||
|
|
||||||
makeReader = do
|
makeReader = do
|
||||||
|
@ -485,6 +494,32 @@ ncqStorageRun ncq@NCQStorage{..} = flip runContT pure do
|
||||||
link reader
|
link reader
|
||||||
pure reader
|
pure reader
|
||||||
|
|
||||||
|
makeCompact = do
|
||||||
|
|
||||||
|
me <- ContT $ withAsync $ untilStopped do
|
||||||
|
|
||||||
|
req <- atomically do
|
||||||
|
stop <- readTVar ncqStopped
|
||||||
|
req <- readTVar ncqCompactReq
|
||||||
|
|
||||||
|
if | stop -> pure 0
|
||||||
|
| req > 0 -> pure req
|
||||||
|
| otherwise -> STM.retry
|
||||||
|
|
||||||
|
when (req > 0) do
|
||||||
|
atomically $ writeTVar ncqCompactReq 0
|
||||||
|
debug $ "STARTED COMPACT" <+> pretty req
|
||||||
|
|
||||||
|
try @_ @SomeException (ncqCompact ncq) >>= \case
|
||||||
|
Right{} -> none
|
||||||
|
Left e -> do
|
||||||
|
err ("COMPACT ERROR:" <+> viaShow e)
|
||||||
|
pause @'Seconds 5
|
||||||
|
|
||||||
|
|
||||||
|
link me
|
||||||
|
pure me
|
||||||
|
|
||||||
makeMerge = do
|
makeMerge = do
|
||||||
me <- ContT $ withAsync $ untilStopped do
|
me <- ContT $ withAsync $ untilStopped do
|
||||||
micropause @'Seconds 10
|
micropause @'Seconds 10
|
||||||
|
@ -1217,6 +1252,7 @@ ncqStorageInit_ check path = do
|
||||||
ncqCurrentFd <- newTVarIO Nothing
|
ncqCurrentFd <- newTVarIO Nothing
|
||||||
ncqIndexed <- newTVarIO mempty
|
ncqIndexed <- newTVarIO mempty
|
||||||
ncqMergeReq <- newTVarIO 0
|
ncqMergeReq <- newTVarIO 0
|
||||||
|
ncqCompactReq <- newTVarIO 0
|
||||||
ncqFsyncNum <- newTVarIO 0
|
ncqFsyncNum <- newTVarIO 0
|
||||||
ncqLock <- newTVarIO ncqLock_
|
ncqLock <- newTVarIO ncqLock_
|
||||||
|
|
||||||
|
@ -1605,6 +1641,9 @@ ncqLinearScanForCompact ncq@NCQStorage{..} action = do
|
||||||
|
|
||||||
readTVarIO bodyCount
|
readTVarIO bodyCount
|
||||||
|
|
||||||
|
ncqStorageCompact :: MonadUnliftIO m => NCQStorage -> m ()
|
||||||
|
ncqStorageCompact NCQStorage{..} = do
|
||||||
|
atomically $ modifyTVar ncqCompactReq succ
|
||||||
|
|
||||||
ncqCompact :: MonadUnliftIO m => NCQStorage -> m ()
|
ncqCompact :: MonadUnliftIO m => NCQStorage -> m ()
|
||||||
ncqCompact ncq@NCQStorage{..} = do
|
ncqCompact ncq@NCQStorage{..} = do
|
||||||
|
@ -1622,20 +1661,17 @@ ncqCompact ncq@NCQStorage{..} = do
|
||||||
let fDataNameA = ncqGetDataFileName ncq fk
|
let fDataNameA = ncqGetDataFileName ncq fk
|
||||||
let fIndexNameA = ncqGetIndexFileName ncq fk
|
let fIndexNameA = ncqGetIndexFileName ncq fk
|
||||||
|
|
||||||
flip runContT pure $ callCC \exit -> do
|
flip runContT pure do
|
||||||
|
|
||||||
|
|
||||||
mfile <- ncqGetNewCompactName ncq
|
mfile <- ncqGetNewCompactName ncq
|
||||||
|
|
||||||
ContT $ bracket none $ const do
|
ContT $ bracket none $ const do
|
||||||
rm mfile
|
rm mfile
|
||||||
|
|
||||||
liftIO $ withBinaryFileAtomic mfile WriteMode $ \fwh -> do
|
|
||||||
|
|
||||||
writeFiltered ncq fDataNameA fwh $ \_ _ k v -> do
|
|
||||||
pure $ not $ HS.member k es
|
|
||||||
|
|
||||||
liftIO do
|
liftIO do
|
||||||
|
withBinaryFileAtomic mfile WriteMode $ \fwh -> do
|
||||||
|
writeFiltered ncq fDataNameA fwh $ \_ _ k v -> do
|
||||||
|
pure $ not $ HS.member k es
|
||||||
|
|
||||||
result <- fileSize mfile
|
result <- fileSize mfile
|
||||||
|
|
||||||
|
@ -1660,3 +1696,6 @@ ncqCompact ncq@NCQStorage{..} = do
|
||||||
|
|
||||||
mapM_ rm [fDataNameA, fIndexNameA]
|
mapM_ rm [fDataNameA, fIndexNameA]
|
||||||
|
|
||||||
|
debug $ "compact done" <+> pretty (HM.size state0)
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue