mirror of https://github.com/voidlizard/hbs2
some kludges? in order file gc to work faster
This commit is contained in:
parent
e36cb783c4
commit
91a2563134
|
@ -278,3 +278,4 @@ instance (e ~ L4Proto, MonadUnliftIO m, HasRpcContext PeerAPI RPC2Context m) =>
|
||||||
|
|
||||||
pure $ mkList r
|
pure $ mkList r
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -68,6 +68,7 @@ ncqStorageOpen fp upd = do
|
||||||
ncqSweepReq <- newTVarIO False
|
ncqSweepReq <- newTVarIO False
|
||||||
ncqMergeReq <- newTVarIO False
|
ncqMergeReq <- newTVarIO False
|
||||||
ncqCompactReq <- newTVarIO False
|
ncqCompactReq <- newTVarIO False
|
||||||
|
ncqStateDumpReq <- newTVarIO False
|
||||||
ncqOnRunWriteIdle <- newTVarIO none
|
ncqOnRunWriteIdle <- newTVarIO none
|
||||||
ncqSyncNo <- newTVarIO 0
|
ncqSyncNo <- newTVarIO 0
|
||||||
ncqState <- newTVarIO mempty
|
ncqState <- newTVarIO mempty
|
||||||
|
|
|
@ -257,6 +257,9 @@ fileTailRecord w = do
|
||||||
ncqMakeSectionBS (Just M) h paylo
|
ncqMakeSectionBS (Just M) h paylo
|
||||||
{-# INLINE fileTailRecord #-}
|
{-# INLINE fileTailRecord #-}
|
||||||
|
|
||||||
|
typicalFileTailRecordLen :: Integral a => a
|
||||||
|
typicalFileTailRecordLen = fromIntegral (BS.length (fileTailRecord @Integer 0))
|
||||||
|
|
||||||
appendSection :: forall m . MonadUnliftIO m
|
appendSection :: forall m . MonadUnliftIO m
|
||||||
=> Fd
|
=> Fd
|
||||||
-> ByteString
|
-> ByteString
|
||||||
|
|
|
@ -5,6 +5,7 @@ import HBS2.Storage.NCQ3.Internal.Types
|
||||||
import HBS2.Storage.NCQ3.Internal.State
|
import HBS2.Storage.NCQ3.Internal.State
|
||||||
import HBS2.Storage.NCQ3.Internal.Memtable
|
import HBS2.Storage.NCQ3.Internal.Memtable
|
||||||
import HBS2.Storage.NCQ3.Internal.Files
|
import HBS2.Storage.NCQ3.Internal.Files
|
||||||
|
import HBS2.Storage.NCQ3.Internal.Flags
|
||||||
|
|
||||||
import System.Posix.Files qualified as PFS
|
import System.Posix.Files qualified as PFS
|
||||||
import Streaming.Prelude qualified as S
|
import Streaming.Prelude qualified as S
|
||||||
|
@ -158,7 +159,7 @@ ncqIndexCompactStep :: MonadUnliftIO m
|
||||||
-> m Bool
|
-> m Bool
|
||||||
ncqIndexCompactStep me@NCQStorage{..} = flip runContT pure $ callCC \exit -> do
|
ncqIndexCompactStep me@NCQStorage{..} = flip runContT pure $ callCC \exit -> do
|
||||||
|
|
||||||
debug "ncqIndexCompactStep"
|
debug $ red "ncqIndexCompactStep"
|
||||||
|
|
||||||
idx <- readTVarIO ncqState
|
idx <- readTVarIO ncqState
|
||||||
<&> fmap (IndexFile . snd) . ncqStateIndex
|
<&> fmap (IndexFile . snd) . ncqStateIndex
|
||||||
|
@ -210,6 +211,10 @@ ncqIndexCompactStep me@NCQStorage{..} = flip runContT pure $ callCC \exit -> do
|
||||||
ncqStateDelIndexFile (coerce a)
|
ncqStateDelIndexFile (coerce a)
|
||||||
ncqStateDelIndexFile (coerce b)
|
ncqStateDelIndexFile (coerce b)
|
||||||
|
|
||||||
|
-- FIXME: crutch
|
||||||
|
-- костыль!
|
||||||
|
ncqSetFlag ncqStateDumpReq
|
||||||
|
|
||||||
pure True
|
pure True
|
||||||
|
|
||||||
ncqStorageScanDataFile :: MonadIO m
|
ncqStorageScanDataFile :: MonadIO m
|
||||||
|
|
|
@ -93,21 +93,27 @@ ncqTryLoadState me@NCQStorage{..} = do
|
||||||
|
|
||||||
let corrupted = isLeft good
|
let corrupted = isLeft good
|
||||||
|
|
||||||
if not corrupted then do
|
if | not corrupted && realSize <= typicalFileTailRecordLen -> do
|
||||||
debug $ yellow "indexing" <+> pretty dataFile
|
|
||||||
ncqIndexFile me Nothing dataFile
|
|
||||||
else do
|
|
||||||
|
|
||||||
o <- ncqFileTryRecover path
|
warn $ "skip indexing" <+> pretty realSize <+> pretty (takeFileName path)
|
||||||
warn $ "ncqFileTryRecover" <+> pretty path <+> pretty o <+> parens (pretty realSize)
|
|
||||||
|
|
||||||
let best = if i < 1 then max s o else s
|
| not corrupted -> do
|
||||||
|
|
||||||
warn $ red "trim" <+> pretty s <+> pretty best <+> red (pretty (fromIntegral best - realSize)) <+> pretty (takeFileName path)
|
debug $ "indexing" <+> pretty dataFile
|
||||||
|
void $ ncqIndexFile me Nothing dataFile
|
||||||
|
|
||||||
liftIO $ PFS.setFileSize path (fromIntegral best)
|
| otherwise -> do
|
||||||
|
|
||||||
if i <= 1 then again (succ i) else pure Nothing
|
o <- ncqFileTryRecover path
|
||||||
|
warn $ "ncqFileTryRecover" <+> pretty path <+> pretty o <+> parens (pretty realSize)
|
||||||
|
|
||||||
|
let best = if i < 1 then max s o else s
|
||||||
|
|
||||||
|
warn $ red "trim" <+> pretty s <+> pretty best <+> red (pretty (fromIntegral best - realSize)) <+> pretty (takeFileName path)
|
||||||
|
|
||||||
|
liftIO $ PFS.setFileSize path (fromIntegral best)
|
||||||
|
|
||||||
|
if i <= 1 then again (succ i) else none
|
||||||
|
|
||||||
|
|
||||||
for_ (bad <> fmap snd rest) $ \f -> do
|
for_ (bad <> fmap snd rest) $ \f -> do
|
||||||
|
@ -235,6 +241,7 @@ ncqStorageRun ncq@NCQStorage{..} = withSem ncqRunSem $ flip runContT pure do
|
||||||
debug $ "EMA" <+> pretty (realToFrac @_ @(Fixed E3) ema)
|
debug $ "EMA" <+> pretty (realToFrac @_ @(Fixed E3) ema)
|
||||||
|
|
||||||
spawnActivity $ postponed ncqPostponeService $ forever do
|
spawnActivity $ postponed ncqPostponeService $ forever do
|
||||||
|
ncqRemoveEmptyFossils ncq
|
||||||
ncqSweepObsoleteStates ncq
|
ncqSweepObsoleteStates ncq
|
||||||
ncqSweepFiles ncq
|
ncqSweepFiles ncq
|
||||||
void $ race (pause @'Seconds ncqSweepTime) do
|
void $ race (pause @'Seconds ncqSweepTime) do
|
||||||
|
|
|
@ -4,6 +4,7 @@ module HBS2.Storage.NCQ3.Internal.State where
|
||||||
import HBS2.Storage.NCQ3.Internal.Prelude
|
import HBS2.Storage.NCQ3.Internal.Prelude
|
||||||
import HBS2.Storage.NCQ3.Internal.Types
|
import HBS2.Storage.NCQ3.Internal.Types
|
||||||
import HBS2.Storage.NCQ3.Internal.Files
|
import HBS2.Storage.NCQ3.Internal.Files
|
||||||
|
import HBS2.Storage.NCQ3.Internal.Flags
|
||||||
import HBS2.Storage.NCQ3.Internal.MMapCache
|
import HBS2.Storage.NCQ3.Internal.MMapCache
|
||||||
|
|
||||||
import Data.Config.Suckless.Script
|
import Data.Config.Suckless.Script
|
||||||
|
@ -34,8 +35,15 @@ ncqStateDump ncq@NCQStorage{..} = do
|
||||||
state <- readTVarIO ncqState
|
state <- readTVarIO ncqState
|
||||||
key <- ncqGetNewFileKey ncq StateFile
|
key <- ncqGetNewFileKey ncq StateFile
|
||||||
let snkFile = ncqGetFileName ncq (StateFile key)
|
let snkFile = ncqGetFileName ncq (StateFile key)
|
||||||
|
|
||||||
liftIO $ withBinaryFileDurableAtomic snkFile WriteMode $ \fh -> do
|
liftIO $ withBinaryFileDurableAtomic snkFile WriteMode $ \fh -> do
|
||||||
IO.hPrint fh (pretty state)
|
IO.hPrint fh (pretty state)
|
||||||
|
|
||||||
|
atomically do
|
||||||
|
writeTVar ncqStateKey key
|
||||||
|
ncqClearFlagSTM ncqStateDumpReq
|
||||||
|
|
||||||
|
debug $ yellow "ncqStateDump" <+> pretty key <+> pretty (toFileName (StateFile key))
|
||||||
pure key
|
pure key
|
||||||
|
|
||||||
ncqStateUpdateLoop :: MonadIO m
|
ncqStateUpdateLoop :: MonadIO m
|
||||||
|
@ -46,18 +54,19 @@ ncqStateUpdateLoop ncq@NCQStorage{..} = do
|
||||||
|
|
||||||
debug $ red "ncqStateUpdateLoop"
|
debug $ red "ncqStateUpdateLoop"
|
||||||
|
|
||||||
|
|
||||||
sInit <- readTVarIO ncqState
|
sInit <- readTVarIO ncqState
|
||||||
|
|
||||||
flip fix sInit $ \next s0 -> do
|
flip fix sInit $ \next s0 -> do
|
||||||
state <- atomically do
|
state <- atomically do
|
||||||
s1 <- readTVar ncqState
|
s1 <- readTVar ncqState
|
||||||
stop <- readTVar ncqStopReq
|
stop <- readTVar ncqStopReq
|
||||||
if s1 == s0 && not stop then STM.retry else pure s1
|
dump <- readTVar ncqStateDumpReq
|
||||||
|
if s1 == s0 && not stop && not dump then STM.retry else pure s1
|
||||||
|
|
||||||
key <- ncqStateDump ncq
|
key <- ncqStateDump ncq
|
||||||
|
|
||||||
done <- atomically do
|
done <- atomically do
|
||||||
writeTVar ncqStateKey key
|
|
||||||
modifyTVar ncqWrites succ
|
modifyTVar ncqWrites succ
|
||||||
readTVar ncqStopReq
|
readTVar ncqStopReq
|
||||||
|
|
||||||
|
@ -71,7 +80,12 @@ ncqStateUpdate :: MonadIO m
|
||||||
|
|
||||||
ncqStateUpdate ncq@NCQStorage{..} action = do
|
ncqStateUpdate ncq@NCQStorage{..} action = do
|
||||||
atomically do
|
atomically do
|
||||||
|
s0 <- readTVar ncqState
|
||||||
void $ runReaderT (fromStateOp action) ncq
|
void $ runReaderT (fromStateOp action) ncq
|
||||||
|
s1 <- readTVar ncqState
|
||||||
|
when (s0 /= s1) do
|
||||||
|
modifyTVar ncqState (over #ncqStateVersion succ)
|
||||||
|
ncqSetFlagSTM ncqStateDumpReq
|
||||||
|
|
||||||
ncqStateAddDataFile :: FileKey -> StateOP ()
|
ncqStateAddDataFile :: FileKey -> StateOP ()
|
||||||
ncqStateAddDataFile fk = do
|
ncqStateAddDataFile fk = do
|
||||||
|
|
|
@ -4,6 +4,7 @@ module HBS2.Storage.NCQ3.Internal.Sweep where
|
||||||
import HBS2.Storage.NCQ3.Internal.Prelude
|
import HBS2.Storage.NCQ3.Internal.Prelude
|
||||||
import HBS2.Storage.NCQ3.Internal.Types
|
import HBS2.Storage.NCQ3.Internal.Types
|
||||||
import HBS2.Storage.NCQ3.Internal.Files
|
import HBS2.Storage.NCQ3.Internal.Files
|
||||||
|
import HBS2.Storage.NCQ3.Internal.Fossil
|
||||||
import HBS2.Storage.NCQ3.Internal.State
|
import HBS2.Storage.NCQ3.Internal.State
|
||||||
|
|
||||||
import Control.Monad.Trans.Cont
|
import Control.Monad.Trans.Cont
|
||||||
|
@ -13,6 +14,7 @@ import Data.HashMap.Strict qualified as HM
|
||||||
import Data.HashSet qualified as HS
|
import Data.HashSet qualified as HS
|
||||||
import Data.List qualified as List
|
import Data.List qualified as List
|
||||||
import System.Posix.Files qualified as PFS
|
import System.Posix.Files qualified as PFS
|
||||||
|
import Streaming.Prelude qualified as S
|
||||||
|
|
||||||
ncqLiveKeysSTM :: NCQStorage -> STM (HashSet FileKey)
|
ncqLiveKeysSTM :: NCQStorage -> STM (HashSet FileKey)
|
||||||
ncqLiveKeysSTM NCQStorage{..} = do
|
ncqLiveKeysSTM NCQStorage{..} = do
|
||||||
|
@ -28,6 +30,34 @@ ncqLiveKeys ncq = atomically $ ncqLiveKeysSTM ncq
|
||||||
|
|
||||||
{- HLINT ignore "Functor law"-}
|
{- HLINT ignore "Functor law"-}
|
||||||
|
|
||||||
|
ncqRemoveEmptyFossils :: forall m . MonadUnliftIO m => NCQStorage -> m ()
|
||||||
|
ncqRemoveEmptyFossils me@NCQStorage{..} = flip runContT pure $ callCC \exit -> do
|
||||||
|
|
||||||
|
s@NCQState{..} <- readTVarIO ncqState
|
||||||
|
debug $ red "CURRENT STATE" <+> pretty ncqStateVersion <> line <> pretty s
|
||||||
|
|
||||||
|
check <- atomically do
|
||||||
|
NCQState{..} <- readTVar ncqState
|
||||||
|
current <- readTVar ncqCurrentFossils
|
||||||
|
let fks = HS.fromList [ coerce fk | P (PData fk _) <- universeBi ncqStateFacts ]
|
||||||
|
pure $ HS.toList (fks `HS.difference` current)
|
||||||
|
|
||||||
|
current <- readTVarIO ncqCurrentFossils
|
||||||
|
debug $ "ncqRemoveEmptyFossils" <+> pretty (HS.toList current) <+> pretty check
|
||||||
|
|
||||||
|
loosers <- S.toList_ $ for_ check $ \fk -> do
|
||||||
|
let path = ncqGetFileName me (toFileName (DataFile fk))
|
||||||
|
s <- fileSize path
|
||||||
|
when (s <= typicalFileTailRecordLen) $ S.yield fk
|
||||||
|
|
||||||
|
debug $ "ncqRemoveEmptyFossils" <+> pretty loosers
|
||||||
|
|
||||||
|
when (List.null loosers) $ exit ()
|
||||||
|
|
||||||
|
ncqStateUpdate me $ for_ loosers $ \fk -> do
|
||||||
|
ncqStateDelDataFile fk
|
||||||
|
ncqStateDelFact (P (PData (DataFile fk) 0))
|
||||||
|
|
||||||
ncqSweepFiles :: forall m . MonadUnliftIO m => NCQStorage -> m ()
|
ncqSweepFiles :: forall m . MonadUnliftIO m => NCQStorage -> m ()
|
||||||
ncqSweepFiles me@NCQStorage{..} = do
|
ncqSweepFiles me@NCQStorage{..} = do
|
||||||
|
|
||||||
|
|
|
@ -120,6 +120,7 @@ data NCQStorage =
|
||||||
, ncqSyncReq :: TVar Bool
|
, ncqSyncReq :: TVar Bool
|
||||||
, ncqSweepReq :: TVar Bool
|
, ncqSweepReq :: TVar Bool
|
||||||
, ncqMergeReq :: TVar Bool
|
, ncqMergeReq :: TVar Bool
|
||||||
|
, ncqStateDumpReq :: TVar Bool
|
||||||
, ncqCompactReq :: TVar Bool
|
, ncqCompactReq :: TVar Bool
|
||||||
, ncqOnRunWriteIdle :: TVar (IO ())
|
, ncqOnRunWriteIdle :: TVar (IO ())
|
||||||
, ncqSyncNo :: TVar Int
|
, ncqSyncNo :: TVar Int
|
||||||
|
|
Loading…
Reference in New Issue