-- Mirror of https://github.com/voidlizard/hbs2 (Haskell source, 471 lines, 15 KiB)
{-# Language MultiWayIf #-}
|
||
module HBS2.Storage.NCQ3.Internal.Run where
|
||
|
||
import HBS2.Storage.NCQ.Types hiding (FileKey)
|
||
import HBS2.Storage.NCQ3.Internal.Prelude
|
||
import HBS2.Storage.NCQ3.Internal.Types
|
||
import HBS2.Storage.NCQ3.Internal.Files
|
||
import HBS2.Storage.NCQ3.Internal.Memtable
|
||
import HBS2.Storage.NCQ3.Internal.Index
|
||
import HBS2.Storage.NCQ3.Internal.State
|
||
import HBS2.Storage.NCQ3.Internal.Sweep
|
||
import HBS2.Storage.NCQ3.Internal.MMapCache
|
||
import HBS2.Storage.NCQ3.Internal.Fossil
|
||
import HBS2.Storage.NCQ3.Internal.Flags
|
||
import HBS2.Storage.NCQ3.Internal.Fsync
|
||
|
||
import Control.Concurrent.STM qualified as STM
|
||
import Control.Monad.Trans.Cont
|
||
import Control.Monad.Trans.Maybe
|
||
import Data.Either
|
||
import Data.Fixed
|
||
import Data.HashSet qualified as HS
|
||
import Data.HashMap.Strict qualified as HM
|
||
import Data.List qualified as List
|
||
import Data.Sequence qualified as Seq
|
||
import Data.Set qualified as Set
|
||
import Data.Vector qualified as V
|
||
import System.FileLock as FL
|
||
import System.Posix.Files qualified as PFS
|
||
import System.Posix.IO as PosixBase
|
||
import System.Posix.IO.ByteString as Posix
|
||
import System.Posix.Types as Posix
|
||
import System.Posix.Unistd
|
||
|
||
{- HLINT ignore "Eta reduce" -}
|
||
|
||
-- | Request storage shutdown.
--
-- Only raises the stop-request flag ('ncqStopReq'); nothing here waits for
-- the run loop to observe it.
ncqStorageStop :: forall m . MonadUnliftIO m => NCQStorage -> m ()
ncqStorageStop NCQStorage{ncqStopReq = stopFlag} =
  atomically (writeTVar stopFlag True)
|
||
|
||
|
||
-- | Remove leftover temporary files from the storage work directory.
--
-- Every entry returned by 'dirFiles' for the work directory whose name ends
-- with @.part@, @.cq$@ or @.merge@ is handed to 'rm'.
ncqRemoveGarbage :: forall m. MonadIO m
                 => NCQStorage
                 -> m ()
ncqRemoveGarbage me = do
  found <- dirFiles (ncqGetWorkDir me)
  mapM_ rm (filter isGarbage found)
  where
    -- A file is garbage when its name carries one of the temporary suffixes.
    isGarbage name = any (`List.isSuffixOf` name) [".part", ".cq$", ".merge"]
|
||
|
||
-- | Pick a usable persisted state, install it, and bring the data files it
-- mentions into an indexable condition.
--
-- Steps, in order:
--
-- 1. List the files whose name starts with @s-@ and sort them in descending
--    order of the second component of each listing entry.
-- 2. Walk that list and take the first state that can be read
--    ('readStateMay') and passes 'checkState'.  States that fail are
--    remembered as rejected.  If none qualifies, 'ncqState0' is used.
-- 3. Merge the chosen state into 'ncqState' with '<>'.
-- 4. For every @P (PData file size)@ fact of the chosen state, run
--    'ncqFileFastCheck'; index the file when it passes, otherwise try to
--    recover and truncate it, then check again (at most three passes).
-- 5. Remove the rejected state files and all state files older than the
--    chosen one.
ncqTryLoadState :: forall m. MonadUnliftIO m
                => NCQStorage
                -> m ()
ncqTryLoadState me@NCQStorage{..} = do

  debug "ncqTryLoadState"

  stateFiles <- ncqListFilesBy me ( List.isPrefixOf "s-" )
                  <&> List.sortOn ( Down . snd )

  -- Accumulator: (rejected state keys, chosen state, candidates not examined).
  -- The rejected list holds only states that failed to load or validate; the
  -- untouched tail is returned separately so that every obsolete state file
  -- is scheduled for removal exactly once below.
  r <- flip fix ([], ncqState0, stateFiles) $ \next -> \case
    (rejected, s0, []) -> pure (rejected, s0, [])
    (rejected, s0, (_, s) : ss) ->
      readStateMay me s >>= \case
        Nothing -> next (s : rejected, s0, ss)
        Just ns -> do
          ok <- checkState ns
          debug $ "state status" <+> pretty s <+> pretty ok
          if ok
            then pure (rejected, ns, ss)
            else next (s : rejected, s0, ss)

  let (bad, new@NCQState{..}, rest) = r

  atomically $ modifyTVar ncqState (<> new)

  for_ [ (d,s) | P (PData d s) <- Set.toList ncqStateFacts ] $ \(dataFile,s) -> do

    let path = ncqGetFileName me dataFile

    -- NOTE(review): the size is sampled once, before the retry loop, so after
    -- a trim the "skip indexing" check and the log messages still use the
    -- pre-trim size -- confirm this is intended.
    realSize <- fileSize path

    flip fix 0 $ \again i -> do

      good <- try @_ @NCQFsckException (ncqFileFastCheck path)

      let corrupted = isLeft good

      if | not corrupted && realSize <= typicalFileTailRecordLen ->
             warn $ "skip indexing" <+> pretty realSize <+> pretty (takeFileName path)

         | not corrupted -> do
             debug $ "indexing" <+> pretty dataFile
             void $ ncqIndexFile me Nothing dataFile

         | otherwise -> do
             o <- ncqFileTryRecover path
             warn $ "ncqFileTryRecover" <+> pretty path <+> pretty o <+> parens (pretty realSize)

             -- First pass: keep the larger of the size recorded in the state
             -- and the value reported by the recovery.  Later passes fall
             -- back to the recorded size.
             let best = if i < 1 then max s o else s

             warn $ red "trim" <+> pretty s <+> pretty best
                      <+> red (pretty (fromIntegral best - realSize))
                      <+> pretty (takeFileName path)

             liftIO $ PFS.setFileSize path (fromIntegral best)

             -- Re-check after trimming; passes run with i = 0, 1, 2.
             if i <= 1 then again (succ i) else none

  -- Drop rejected states and every state older than the chosen one.
  for_ (bad <> fmap snd rest) $ \f -> do
    let old = ncqGetFileName me (StateFile f)
    rm old

  where

    -- TODO: created-but-not-indexed-file?

    -- A state is acceptable when every data file and every index file it
    -- references exists on disk.  Evaluation stops at the first missing file.
    checkState NCQState{..} = flip runContT pure $ callCC \exit -> do

      for_ ncqStateFiles $ \fk -> do

        let dataFile = ncqGetFileName me (DataFile fk)
        here <- doesFileExist dataFile

        unless here $ exit False

        -- Disabled integrity check, kept for reference:
        --
        -- lift (try @_ @SomeException (ncqFileFastCheck dataFile)) >>= \case
        --   Right () -> none
        --   Left e -> do
        --     warn (viaShow e)
        --     let known = HM.lookup fk facts
        --     fs <- fileSize dataFile
        --     warn $ "file is incomplete (or damaged)"
        --               <+> pretty dataFile
        --               <+> "actual:" <+> pretty fs
        --               <+> "known:" <+> pretty known
        --     let ok = isJust known && Just (fromIntegral fs) >= known
        --     unless ok $ exit False

      for_ ncqStateIndex $ \(_,fk) -> do

        let idxFile = ncqGetFileName me (IndexFile fk)
        here <- doesFileExist idxFile

        unless here do
          err $ red "missed index in state" <+> pretty idxFile
          exit False

      pure True
|
||
|
||
|
||
-- | Main entry point of the storage: acquires the storage lock, restores the
-- persisted state, starts the background activities and then drives the
-- writer state machine ('RunSt') until a stop is requested.
--
-- The whole body runs under @withSem ncqRunSem@ and inside 'ContT', so every
-- @ContT $ bracket ...@ / 'spawnActivity' below stays in effect until the
-- function returns.
--
-- Sequence:
--
-- 1. Try to take an exclusive lock on the @.lock@ file; the result goes
--    through @orThrow NCQStorageCurrentAlreadyOpen@ (presumably thrown when
--    the lock is not obtained -- 'orThrow' is defined elsewhere).
-- 2. Register cleanups: reset 'ncqAlive', unlock the lock file, dump the
--    state ('ncqStateDump').
-- 3. 'ncqRemoveGarbage', then 'ncqTryLoadState'.
-- 4. Spawn: the indexer, the write-op workers, the read-request workers,
--    'measureWPS', 'ncqStateUpdateLoop', the EMA logger, the sweeper, and the
--    merge / compact loops.
-- 5. Run the writer loop, then wait for the indexer.
ncqStorageRun :: forall m . MonadUnliftIO m
              => NCQStorage
              -> m ()
ncqStorageRun ncq@NCQStorage{..} = withSem ncqRunSem $ flip runContT pure do

  debug "ncqStorageRun"

  liftIO (FL.tryLockFile (ncqGetFileName ncq ".lock") Exclusive)
    >>= orThrow NCQStorageCurrentAlreadyOpen
    >>= atomically . writeTVar ncqFileLock . Just

  ContT $ bracket setAlive (const unsetAlive)

  -- Release the file lock (if one was stored) when the run finishes.
  ContT $ bracket none $ const $ liftIO do
    readTVarIO ncqFileLock >>= mapM_ FL.unlockFile

  -- Persist the state when the run finishes.
  ContT $ bracket none $ const $ liftIO do
    void $ ncqStateDump ncq
    debug "storage done"

  ncqRemoveGarbage ncq

  liftIO (ncqTryLoadState ncq)

  indexQ <- liftIO newTQueueIO

  -- Indexer: takes closed data files from 'indexQ' and indexes them.  It
  -- blocks while the queue is empty and terminates once the queue is empty
  -- and a stop has been requested.
  indexer <- spawnActivity $ liftIO $ fix \loop -> do
    what <- atomically do
      tryReadTQueue indexQ >>= \case
        Just e  -> pure $ Just e
        Nothing -> do
          stop <- readTVar ncqStopReq
          if not stop then STM.retry else pure Nothing

    maybe1 what none $ \(fk :: FileKey) -> do
      ncqIndexFile ncq Nothing (DataFile fk)
      dropReplaces fk
      atomically $ modifyTVar ncqCurrentFossils (HS.delete fk)
      loop

  -- One worker per write-op queue; each worker runs the queued actions
  -- forever.
  let shLast = V.length ncqWriteOps - 1
  spawnActivity $ pooledForConcurrentlyN_ (V.length ncqWriteOps) [0..shLast] $ \i -> do
    let q = ncqWriteOps ! i
    forever (liftIO $ join $ atomically (readTQueue q))

  -- Read workers: answer each request from the in-memory lookup first and
  -- fall back to scanning the indexes listed in the current state.
  replicateM_ ncqReadThreads $ spawnActivity $ forever $ flip runContT pure $ callCC \exit -> do

    (h, answ) <- atomically $ readTQueue ncqReadReq
    let answer l = atomically (putTMVar answ l)

    -- debug $ "REQ" <+> pretty h

    atomically (ncqLookupEntrySTM ncq h) >>= \case
      Nothing                  -> none
      Just (_, EntryHere bs)   -> answer (Just (InMemory bs)) >> exit ()
      Just (_, EntryThere loc) -> answer (Just $ InFossil loc) >> exit ()

    ContT $ ncqWithState ncq

    -- debug $ "REQ IN STATE" <+> pretty h

    NCQState{..} <- readTVarIO ncqState

    for_ ncqStateIndex $ \(_, idxKey) -> do
      CachedIndex bs nw <- lift $ ncqGetCachedIndex ncq idxKey
      lift (ncqLookupIndex h (bs, nw)) >>= \case
        Just (IndexEntry fk o s) -> answer (Just (InFossil (FileLocation fk o s))) >> exit ()
        Nothing -> none

    -- Not found anywhere.
    answer Nothing >> exit ()

  spawnActivity measureWPS

  spawnActivity (ncqStateUpdateLoop ncq)

  -- Log the write-rate EMA every 30 seconds.
  spawnActivity $ forever do
    pause @'Seconds 30
    ema <- readTVarIO ncqWriteEMA
    debug $ "EMA" <+> pretty (realToFrac @_ @(Fixed E3) ema)

  -- Sweeper: runs the sweep steps, then sleeps until either 'ncqSweepTime'
  -- elapses or the sweep flag is raised.
  spawnActivity $ postponed ncqPostponeService $ forever do
    ncqRemoveEmptyFossils ncq
    ncqSweepObsoleteStates ncq
    ncqSweepFiles ncq
    void $ race (pause @'Seconds ncqSweepTime) do
      atomically (ncqWaitFlagSTM ncqSweepReq)

  spawnActivity $ postponed ncqPostponeService
    $ compactLoop ncqMergeReq ncqMergeTimeA ncqMergeTimeB $ withSem ncqServiceSem do
        ncqFossilMergeStep ncq

  spawnActivity $ postponed ncqPostponeService
    $ compactLoop ncqCompactReq ncqCompactTimeA ncqCompactTimeB $ withSem ncqServiceSem do
        ncqIndexCompactStep ncq

  -- Writer state machine.
  flip fix RunNew $ \loop s -> do
    -- debug $ viaShow s
    case s of

      -- Final state: close the descriptor (if any) and wait for the indexer.
      RunFin mfh -> do
        liftIO $ for_ mfh closeFd
        rest <- readTVarIO ncqWriteQ <&> Seq.length
        debug $ "exit storage" <+> pretty rest
        atomically $ pollSTM indexer >>= maybe STM.retry (const none)

      -- Open a fresh data file, unless the storage is no longer alive and
      -- nothing is left to write.
      RunNew -> do
        alive <- readTVarIO ncqAlive
        empty <- readTVarIO ncqWriteQ <&> Seq.null
        if not alive && empty
          then loop (RunFin Nothing)
          else do
            (fk, fhx) <- openNewDataFile
            loop $ RunWrite (fk, fhx, 0, 0)

      -- Decide whether to sync and/or close the current file.
      --   w        : bytes written since the last sync
      --   total    : bytes written to this file so far
      --   continue : False when the loop must finish after this step
      RunSync (fk, fh, w, total, continue) -> do

        (stop,sync) <- atomically do
          (,) <$> readTVar ncqStopReq
              <*> readTVar ncqSyncReq
              -- <*> readTVar ncqWriteEMA

        let needClose = total >= ncqMinLog || stop

        rest <-
          if not (sync || needClose || w > ncqFsync)
            then pure w
            else do
              ss <- appendTailSection fh
              liftIO (fileSynchronisePortable fh)
              flushReplaces fk

              ncqStateUpdate ncq do
                ncqStateAddFact (P (PData (DataFile fk) ss))

              -- ss <- liftIO (PFS.getFdStatus fh) <&> fromIntegral . PFS.fileSize

              -- atomically $ ncqDeferredWriteOpSTM ncq do

              atomically do
                writeTVar ncqSyncReq False
                -- strict update: the counter is bumped on every sync
                STM.modifyTVar' ncqSyncNo succ

              pure 0

        if | needClose && continue -> do
               liftIO $ closeFd fh
               flushReplaces fk
               debug $ "closeFd" <+> viaShow fh
               atomically $ writeTQueue indexQ fk
               loop RunNew

           | not continue -> loop (RunFin (Just fh))

           | otherwise -> loop $ RunWrite (fk, fh, rest, total)

      -- Take pending entries from the write queue and append them to the
      -- current data file.
      RunWrite (fk, fh, w, total') -> do

        let timeoutMicro = 10_000_000

        -- Nothing       : timed out waiting for work
        -- Just (Left _) : stop requested and the queue is empty
        -- Just (Right c): entries to write (may be empty when a sync was
        --                 requested)
        chunk <- liftIO $ timeout timeoutMicro $ atomically do
          stop <- readTVar ncqStopReq
          sy   <- readTVar ncqSyncReq

          -- Normally take a single entry; on stop, drain the whole queue.
          taken <-
            if not stop
              then stateTVar ncqWriteQ (Seq.splitAt 1)
              else do
                r <- readTVar ncqWriteQ
                writeTVar ncqWriteQ mempty
                pure r

          if | Seq.null taken && stop              -> pure $ Left ()
             | Seq.null taken && not (stop || sy)  -> STM.retry
             | otherwise                           -> pure $ Right taken

        stop <- readTVarIO ncqStopReq

        case chunk of
          Nothing -> do
            liftIO $ join $ readTVarIO ncqOnRunWriteIdle
            stopNow <- readTVarIO ncqStopReq
            if w == 0 && not stopNow
              then loop $ RunWrite (fk, fh, w, total')
              else do
                atomically $ writeTVar ncqSyncReq True
                loop $ RunSync (fk, fh, w, total', not stopNow) -- exit ()

          Just (Left{}) -> loop $ RunSync (fk, fh, w, total', False) -- exit ()

          Just (Right chu) -> do
            ws <- for chu $ \h -> do
              atomically (ncqLookupEntrySTM ncq h) >>= \case
                Just (NCQEntry tv, EntryHere bs) -> do
                  off <- fromIntegral <$> liftIO (fdSeek fh RelativeSeek 0)
                  n <- lift (appendSection fh bs)

                  -- Deferred switch of the entry from its in-memory form to
                  -- its on-disk location; applied by 'flushReplaces' and only
                  -- if the entry still holds the bytes that were written.
                  let op = readTVar tv >>= \case
                        EntryHere bs1 | bs1 == bs ->
                          writeTVar tv (EntryThere (FileLocation fk off (fromIntegral n)))
                        _ -> none

                  -- strict update: one insert per written entry
                  atomically $ STM.modifyTVar' ncqReplQueue (HM.insertWith (<>) fk [op])
                  pure n

                _ -> pure 0

            let written = sum ws
            loop $ RunSync (fk, fh, w + written, total' + written, not stop)

  mapM_ wait [indexer]

  where
    setAlive   = atomically $ writeTVar ncqAlive True
    unsetAlive = atomically $ writeTVar ncqAlive False

    -- Forget the pending replace operations of a file without running them.
    dropReplaces :: forall m1 . MonadIO m1 => FileKey -> m1 ()
    dropReplaces fk = atomically do
      modifyTVar ncqReplQueue (HM.delete fk)

    -- In one transaction: drop the cached data of the file, remove its
    -- pending replace operations from the queue and run them.
    flushReplaces :: forall m1 . MonadIO m1 => FileKey -> m1 ()
    flushReplaces fk = do
      atomically do
        ncqDelCachedDataSTM ncq fk
        ops <- readTVar ncqReplQueue <&> fromMaybe mempty . HM.lookup fk
        modifyTVar ncqReplQueue (HM.delete fk)
        sequence_ ops

    -- Allocate a new data-file key, register it in 'ncqCurrentFossils',
    -- record a zero-size fact for it and open the file read/write, creating
    -- it when missing.
    openNewDataFile :: forall mx . MonadIO mx => mx (FileKey, Fd)
    openNewDataFile = do
      fk <- ncqGetNewFileKey ncq DataFile

      atomically $ modifyTVar ncqCurrentFossils (HS.insert fk)

      ncqStateUpdate ncq do
        ncqStateAddFact (P (PData (DataFile fk) 0))
        -- FIXME: asap-remove-this
        --  This is wrong: because of it the file hangs around in "current"
        --  and gets in the way of merging, although it is not a real file
        --  yet (not indexed).  Why was it put here at all?
        -- ncqStateAddDataFile fk

      let fname = ncqGetFileName ncq (DataFile fk)
      -- touch fname
      let flags = defaultFileFlags { exclusive = False, creat = Just 0o666 }
      (fk,) <$> liftIO (PosixBase.openFd fname Posix.ReadWrite flags)

    -- Start an action with 'withAsync' for the rest of the enclosing 'ContT'
    -- scope and 'link' it to the current thread.
    spawnActivity m = do
      a <- ContT $ withAsync m
      link a
      pure a

    -- Once per 'step', fold the observed write rate (change of 'ncqWrites'
    -- divided by elapsed time) into 'ncqWriteEMA' with weight 'alpha'.
    measureWPS :: m ()
    measureWPS = void $ flip fix Nothing \loop -> \case
      Nothing -> do
        w <- readTVarIO ncqWrites
        t <- getTimeCoarse
        pause @'Seconds step >> loop (Just (w,t))

      Just (w0,t0) -> do
        w1 <- readTVarIO ncqWrites
        t1 <- getTimeCoarse
        let dt = max 1e-9 (realToFrac @_ @Double (t1 - t0)) / 1e9
            dw = fromIntegral (w1 - w0)
        -- strict update: written every step, read far less often
        atomically $ STM.modifyTVar' ncqWriteEMA \ema -> alpha * (dw/dt) + (1 - alpha) * ema
        pause @'Seconds step >> loop (Just (w1,t1))

      where
        alpha = 0.1
        step  = 1.00

    -- Run an action after an initial delay of @n@ seconds.
    postponed n m = liftIO (pause @'Seconds n) >> m

    -- Repeatedly run a compaction step.
    --
    --  * When the write EMA is above 'ncqIdleThrsh' and the request flag is
    --    not set, sleep @t1@ and start over.
    --  * Otherwise clear the flag and run the step.  If it reports work done,
    --    raise the sweep flag and start over immediately.
    --  * If nothing was done, wait (bounded by @t2@) before the next round.
    compactLoop :: TVar Bool
                -> Timeout 'Seconds
                -> Timeout 'Seconds
                -> m Bool
                -> m ()
    compactLoop flag t1 t2 what = forever $ void $ runMaybeT do
      ema   <- readTVarIO ncqWriteEMA
      fired <- ncqGetFlag flag

      when (ema > ncqIdleThrsh && not fired) $ pause @'Seconds t1 >> mzero

      ncqClearFlag flag
      compacted <- lift what

      when compacted do
        ncqSetFlag ncqSweepReq
        mzero

      k0 <- readTVarIO ncqStateKey
      void $ lift $ race (pause @'Seconds t2) do
        flip fix k0 $ \waitState k1 -> do
          pause @'Seconds t2
          k2 <- readTVarIO ncqStateKey
          when (k2 == k1) $ waitState k2
|
||
|
||
|
||
-- | States of the writer loop inside 'ncqStorageRun'.
data RunSt
  = RunNew
    -- ^ No data file is open; the loop either opens a new one or finishes.
  | RunWrite (FileKey, Fd, Int, Int)
    -- ^ Appending entries: file key, descriptor, bytes written since the
    --   last sync, bytes written to this file in total.
  | RunSync (FileKey, Fd, Int, Int, Bool)
    -- ^ Sync/close decision point: same fields as 'RunWrite' plus a flag
    --   that is 'False' when the loop must finish after this step.
  | RunFin (Maybe Fd)
    -- ^ Terminal state; the descriptor, when present, is closed.
  deriving stock Show
|
||
|
||
|