hbs2/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Run.hs

412 lines
12 KiB
Haskell

{-# Language MultiWayIf #-}
module HBS2.Storage.NCQ3.Internal.Run where
import HBS2.Storage.NCQ.Types hiding (FileKey)
import HBS2.Storage.NCQ3.Internal.Prelude
import HBS2.Storage.NCQ3.Internal.Types
import HBS2.Storage.NCQ3.Internal.Files
import HBS2.Storage.NCQ3.Internal.Memtable
import HBS2.Storage.NCQ3.Internal.Index
import HBS2.Storage.NCQ3.Internal.State
import HBS2.Storage.NCQ3.Internal.Sweep
import HBS2.Storage.NCQ3.Internal.MMapCache
import HBS2.Storage.NCQ3.Internal.Fossil
import HBS2.Storage.NCQ3.Internal.Flags
import HBS2.Storage.NCQ3.Internal.Fsync
import Control.Concurrent.STM qualified as STM
import Control.Monad.Trans.Cont
import Control.Monad.Trans.Maybe
import Data.Either
import Data.Fixed
import Data.HashSet qualified as HS
import Data.HashMap.Strict qualified as HM
import Data.List qualified as List
import Data.Sequence qualified as Seq
import Data.Set qualified as Set
import Data.Vector qualified as V
import System.FileLock as FL
import System.Posix.Files qualified as PFS
import System.Posix.IO as PosixBase
import System.Posix.IO.ByteString as Posix
import System.Posix.Types as Posix
import System.Posix.Unistd
{- HLINT ignore "Eta reduce" -}
-- | Request shutdown of a storage by raising its stop-request flag.
--   This only writes the flag; it does not wait for anything to finish.
ncqStorageStop :: forall m . MonadUnliftIO m => NCQStorage -> m ()
ncqStorageStop sto = atomically (writeTVar (ncqStopReq sto) True)
-- | Remove leftover files from the storage work directory.
--   A file is treated as garbage when its name ends in @.part@, @.cq$@
--   or @.merge@; every such file returned by 'dirFiles' is passed to 'rm'.
ncqRemoveGarbage :: forall m. MonadIO m
                 => NCQStorage
                 -> m ()
ncqRemoveGarbage me = do
  entries <- dirFiles (ncqGetWorkDir me)
  mapM_ rm [ f | f <- entries, isGarbage f ]
  where
    -- True when the name carries one of the garbage suffixes.
    isGarbage name = any (`List.isSuffixOf` name) [".part", ".cq$", ".merge"]
-- | Load the storage state from the first usable state file on disk.
--
--   Steps:
--
--   1. List files whose name starts with @s-@ and sort them in descending
--      order of the second tuple component (presumably newest first --
--      confirm against 'ncqListFilesBy').
--   2. Walk the candidates: a candidate that cannot be read, or that fails
--      'checkState', is remembered as bad; the first one that passes wins.
--      If none passes, 'ncqState0' is used.
--   3. Merge the chosen state into 'ncqState'.
--   4. For every @PData@ fact of the chosen state, fast-check the data file
--      and index it; on a failed check try to recover and resize the file,
--      then re-check (a bounded number of attempts).
--   5. Remove every bad state file and every candidate older than the
--      chosen one.
ncqTryLoadState :: forall m. MonadUnliftIO m
                => NCQStorage
                -> m ()
ncqTryLoadState me@NCQStorage{..} = do

  debug "ncqTryLoadState"

  stateFiles <- ncqListFilesBy me ( List.isPrefixOf "s-" )
                  <&> List.sortOn ( Down . snd )

  -- Accumulator: (rejected candidates, chosen state, candidates not yet examined).
  r <- flip fix ([], ncqState0, stateFiles) $ \next -> \case
    (l, s, []) -> pure (l, s, [])
    (l, s0, (_,s):ss) -> do
      readStateMay me s >>= \case
        Nothing -> next (s : l, s0, ss)
        Just ns -> do
          ok <- checkState ns
          debug $ "state status" <+> pretty s <+> pretty ok
          if ok then
            -- Only the rejected candidates go into the first slot; the
            -- remaining (older) ones are returned separately and removed
            -- below, so they must not be listed twice.
            pure (l, ns, ss)
          else
            next (s : l, s0, ss)

  let (bad, new@NCQState{..}, rest) = r

  atomically $ modifyTVar ncqState (<> new)

  for_ [ (d,s) | P (PData d s) <- Set.toList ncqStateFacts ] $ \(dataFile,s) -> do

    let path = ncqGetFileName me dataFile

    realSize <- fileSize path

    -- Attempt counter i: checks run for i = 0, 1, 2; after the third
    -- failed check the file is resized once more and we give up.
    flip fix 0 $ \again i -> do

      good <- try @_ @NCQFsckException (ncqFileFastCheck path)

      let corrupted = isLeft good

      if not corrupted then do
        debug $ yellow "indexing" <+> pretty dataFile
        ncqIndexFile me Nothing dataFile
      else do

        o <- ncqFileTryRecover path

        warn $ "ncqFileTryRecover" <+> pretty path <+> pretty o <+> parens (pretty realSize)

        -- First attempt: keep the larger of the recorded size and the
        -- recovered offset; later attempts fall back to the recorded size.
        let best = if i < 1 then max s o else s

        warn $ red "trim" <+> pretty s <+> pretty best <+> red (pretty (fromIntegral best - realSize)) <+> pretty (takeFileName path)

        liftIO $ PFS.setFileSize path (fromIntegral best)

        if i <= 1 then again (succ i) else pure Nothing

  -- Drop unusable state files and the ones superseded by the chosen state.
  for_ (bad <> fmap snd rest) $ \f -> do
    let old = ncqGetFileName me (StateFile f)
    rm old

  where

    -- TODO: created-but-not-indexed-file?
    -- | A state is accepted only when every data file and every index file
    --   it refers to exists on disk.
    checkState NCQState{..} = flip runContT pure $ callCC \exit -> do

      for_ ncqStateFiles $ \fk -> do

        let dataFile = ncqGetFileName me (DataFile fk)
        here <- doesFileExist dataFile

        unless here $ exit False

        -- lift (try @_ @SomeException (ncqFileFastCheck dataFile)) >>= \case
        --   Right () -> none
        --   Left e -> do
        --     warn (viaShow e)
        --     let known = HM.lookup fk facts
        --     fs <- fileSize dataFile
        --     warn $ "file is incomplete (or damaged)"
        --               <+> pretty dataFile
        --               <+> "actual:" <+> pretty fs
        --               <+> "known:" <+> pretty known
        --     let ok = isJust known && Just (fromIntegral fs) >= known
        --     unless ok $ exit False

      for_ ncqStateIndex $ \(_,fk) -> do

        let idxFile = ncqGetFileName me (IndexFile fk)
        here <- doesFileExist idxFile

        unless here do
          err $ red "missed index in state" <+> pretty idxFile
          exit False

      pure True
-- | Main entry point of a storage instance: runs until a stop is requested
--   (see 'ncqStopReq') and the writer loop reaches 'RunFin'.
--
--   Setup, in order:
--
--   * take the run semaphore and an exclusive lock on the @.lock@ file
--     (throws 'NCQStorageCurrentAlreadyOpen' if the lock is not obtained);
--   * mark the storage alive for the duration of the run;
--   * remove garbage files and load the persisted state;
--   * spawn the helper activities (closer/indexer, write-op workers,
--     read-request workers, write-rate meter, state updater, sweeper,
--     merge/compaction loop). Each is linked, so a failure in any of them
--     propagates to this thread.
--
--   Then the writer state machine ('RunSt') runs in the calling thread:
--   'RunNew' opens a data file, 'RunWrite' drains the write queue into it,
--   'RunSync' decides whether to fsync / rotate / finish, 'RunFin' waits
--   for the closer activity to terminate.
ncqStorageRun :: forall m . MonadUnliftIO m
              => NCQStorage
              -> m ()
ncqStorageRun ncq@NCQStorage{..} = withSem ncqRunSem $ flip runContT pure do

  debug "ncqStorageRun"

  liftIO (FL.tryLockFile (ncqGetFileName ncq ".lock") Exclusive)
    >>= orThrow NCQStorageCurrentAlreadyOpen
    >>= atomically . writeTVar ncqFileLock . Just

  ContT $ bracket setAlive (const unsetAlive)

  -- Release the file lock on the way out.
  ContT $ bracket none $ const $ liftIO do
    readTVarIO ncqFileLock >>= mapM_ FL.unlockFile

  ContT $ bracket none $ const $ liftIO do
    debug "storage done"

  ncqRemoveGarbage ncq

  liftIO (ncqTryLoadState ncq)

  closeQ <- liftIO newTQueueIO

  -- Closer: indexes every data file the writer has finished with and drops
  -- it from the set of current fossils. Exits once the queue is empty and
  -- a stop has been requested.
  closer <- spawnActivity $ liftIO $ fix \loop -> do
    what <- atomically do
      tryReadTQueue closeQ >>= \case
        Just e -> pure $ Just e
        Nothing -> do
          stop <- readTVar ncqStopReq
          if not stop then STM.retry else pure Nothing

    maybe1 what none $ \(fk :: FileKey) -> do
      ncqIndexFile ncq Nothing (DataFile fk)
      atomically $ modifyTVar ncqCurrentFossils (HS.delete fk)
      loop

  -- One worker per write-op queue; each runs the queued actions forever.
  let shLast = V.length ncqWriteOps - 1
  spawnActivity $ pooledForConcurrentlyN_ (V.length ncqWriteOps) [0..shLast] $ \i -> do
    let q = ncqWriteOps ! i
    forever (liftIO $ join $ atomically (readTQueue q))

  -- Read workers: answer a lookup from the in-memory entry first, then from
  -- the cached indexes of the current state, otherwise with Nothing.
  replicateM_ ncqReadThreads $ spawnActivity $ forever $ flip runContT pure $ callCC \exit -> do

    (h, answ) <- atomically $ readTQueue ncqReadReq
    let answer l = atomically (putTMVar answ l)

    -- debug $ "REQ" <+> pretty h

    atomically (ncqLookupEntrySTM ncq h) >>= \case
      Nothing -> none
      Just (_, EntryHere bs) -> answer (Just (InMemory bs)) >> exit ()
      Just (_, EntryThere loc) -> answer (Just $ InFossil loc) >> exit ()

    ContT $ ncqWithState ncq

    NCQState{..} <- readTVarIO ncqState

    for_ ncqStateIndex $ \(_, fk) -> do
      CachedIndex bs nw <- lift $ ncqGetCachedIndex ncq fk
      lift (ncqLookupIndex h (bs, nw)) >>= \case
        Just (IndexEntry fk o s) -> answer (Just (InFossil (FileLocation fk o s))) >> exit ()
        Nothing -> none

    -- debug $ "NOT FOUND SHIT" <+> pretty h
    answer Nothing >> exit ()

  spawnActivity measureWPS

  spawnActivity (ncqStateUpdateLoop ncq)

  -- Periodically log the current write-rate average.
  spawnActivity $ forever do
    pause @'Seconds 30
    ema <- readTVarIO ncqWriteEMA
    debug $ "EMA" <+> pretty (realToFrac @_ @(Fixed E3) ema)

  -- Sweeper: runs after an initial delay, then again after every
  -- ncqSweepTime or as soon as the sweep flag is raised.
  spawnActivity $ postponed ncqPostponeService $ forever do
    ncqSweepObsoleteStates ncq
    ncqSweepFiles ncq
    void $ race (pause @'Seconds ncqSweepTime) do
      atomically (ncqWaitFlagSTM ncqSweepReq)

  spawnActivity $ postponed ncqPostponeService
    $ compactLoop ncqMergeReq ncqMergeTimeA ncqMergeTimeB $ withSem ncqServiceSem do
        a <- ncqFossilMergeStep ncq
        b <- ncqIndexCompactStep ncq
        pure $ a || b

  -- Writer state machine.
  flip fix RunNew $ \loop -> \case

    RunFin -> do
      debug "exit storage"
      -- Block until the closer activity has terminated.
      atomically $ pollSTM closer >>= maybe STM.retry (const none)

    RunNew -> do
      alive <- readTVarIO ncqAlive
      empty <- readTVarIO ncqWriteQ <&> Seq.null
      if not alive && empty
        then loop RunFin
        else do
          (fk, fhx) <- openNewDataFile
          loop $ RunWrite (fk, fhx, 0, 0)

    RunSync (fk, fh, w, total, continue) -> do

      (stop,sync) <- atomically do
        (,) <$> readTVar ncqStopReq
            <*> readTVar ncqSyncReq
            -- <*> readTVar ncqWriteEMA

      let needClose = total >= ncqMinLog || stop

      -- Sync when explicitly requested, when the file is about to be
      -- closed, or when enough has been written since the last sync.
      rest <- if not (sync || needClose || w > ncqFsync) then
                pure w
              else do
                ss <- appendTailSection fh
                liftIO (fileSynchronisePortable fh)

                -- ss <- liftIO (PFS.getFdStatus fh) <&> fromIntegral . PFS.fileSize

                -- atomically $ ncqDeferredWriteOpSTM ncq do
                ncqStateUpdate ncq do
                  ncqStateAddFact (P (PData (DataFile fk) ss))

                atomically do
                  writeTVar ncqSyncReq False
                  modifyTVar ncqSyncNo succ

                pure 0

      if | needClose && continue -> do
             liftIO $ closeFd fh
             atomically $ writeTQueue closeQ fk
             loop RunNew

         | not continue -> do
             -- Nothing writes to this descriptor after this point, so
             -- release it instead of leaking it on shutdown.
             -- NOTE(review): the file is not handed to the closer here;
             -- ncqTryLoadState indexes files recorded as PData facts on
             -- the next start -- confirm that this is the intent.
             liftIO $ closeFd fh
             loop RunFin

         | otherwise -> loop $ RunWrite (fk, fh, rest, total)

    RunWrite (fk, fh, w, total') -> do

      let timeoutMicro = 10_000_000

      -- Take up to ncqWriteBlock queued hashes. An empty queue blocks
      -- unless a stop or a sync has been requested; the wait is bounded
      -- by the timeout.
      chunk <- liftIO $ timeout timeoutMicro $ atomically do
        stop <- readTVar ncqStopReq
        sy <- readTVar ncqSyncReq
        chunk <- stateTVar ncqWriteQ (Seq.splitAt ncqWriteBlock)

        if | Seq.null chunk && stop -> pure $ Left ()
           | Seq.null chunk && not (stop || sy) -> STM.retry
           | otherwise -> pure $ Right chunk

      case chunk of
        -- Timed out with nothing to write: run the idle hook and, if there
        -- is unsynced data, force a sync.
        Nothing -> do
          liftIO $ join $ readTVarIO ncqOnRunWriteIdle
          if w == 0
            then do
              loop $ RunWrite (fk,fh,w,total')
            else do
              atomically $ writeTVar ncqSyncReq True
              loop $ RunSync (fk, fh, w, total', True) -- exit ()

        -- Stop requested and the queue is drained.
        Just (Left{}) -> loop $ RunSync (fk, fh, w, total', False) -- exit ()

        Just (Right chu) -> do
          ws <- for chu $ \h -> do
            atomically (ncqLookupEntrySTM ncq h) >>= \case
              Just (NCQEntry slot, EntryHere bs) -> do
                off <- fromIntegral <$> liftIO (fdSeek fh RelativeSeek 0)
                n <- lift (appendSection fh bs)
                -- The entry now lives in the data file; point readers at it.
                atomically (writeTVar slot (EntryThere (FileLocation fk off (fromIntegral n))))
                pure n

              _ -> pure 0

          let written = sum ws
          loop $ RunSync (fk, fh, w + written, total' + written, True)

  mapM_ wait [closer]

  where
    setAlive = atomically $ writeTVar ncqAlive True
    unsetAlive = atomically $ writeTVar ncqAlive False

    -- | Allocate a new data file key, register it as a current fossil and
    --   in the state, then open (creating if needed) the file read-write.
    openNewDataFile :: forall mx . MonadIO mx => mx (FileKey, Fd)
    openNewDataFile = do
      fk <- ncqGetNewFileKey ncq DataFile
      atomically $ modifyTVar ncqCurrentFossils (HS.insert fk)
      ncqStateUpdate ncq (ncqStateAddDataFile fk)
      let fname = ncqGetFileName ncq (DataFile fk)
      -- touch fname
      let flags = defaultFileFlags { exclusive = False, creat = Just 0o666 }
      (fk,) <$> liftIO (PosixBase.openFd fname Posix.ReadWrite flags)

    -- | Start an async action scoped to the enclosing ContT block and link
    --   it to the current thread.
    spawnActivity m = do
      a <- ContT $ withAsync m
      link a
      pure a

    -- | Sample 'ncqWrites' once per step and fold the observed rate into
    --   'ncqWriteEMA' as an exponential moving average.
    measureWPS :: m ()
    measureWPS = void $ flip fix Nothing \loop -> \case
      Nothing -> do
        w <- readTVarIO ncqWrites
        t <- getTimeCoarse
        pause @'Seconds step >> loop (Just (w,t))

      Just (w0,t0) -> do
        w1 <- readTVarIO ncqWrites
        t1 <- getTimeCoarse
        -- NOTE(review): the clamp is applied before the division by 1e9;
        -- presumably the clock difference is in nanoseconds -- confirm.
        let dt = max 1e-9 (realToFrac @_ @Double (t1 - t0)) / 1e9
            dw = fromIntegral (w1 - w0)
        atomically $ modifyTVar ncqWriteEMA \ema -> alpha * (dw/dt) + (1 - alpha) * ema
        pause @'Seconds step >> loop (Just (w1,t1))

      where
        alpha = 0.1
        step = 1.00

    -- | Run an action after an initial delay.
    postponed n m = liftIO (pause @'Seconds n) >> m

    -- | Repeatedly run a compaction step.
    --
    --   While the write-rate average is above 'ncqIdleThrsh' and the flag
    --   is not raised, just sleep for @t1@. Otherwise clear the flag and
    --   run the step; if it reports work done, go again immediately; if
    --   not, wait for @t2@ or until 'ncqStateKey' changes (polled every
    --   60 seconds), whichever comes first.
    compactLoop :: TVar Bool
                -> Timeout 'Seconds
                -> Timeout 'Seconds
                -> m Bool
                -> m ()
    compactLoop flag t1 t2 what = forever $ void $ runMaybeT do
      ema <- readTVarIO ncqWriteEMA
      fired <- ncqGetFlag flag

      when (ema > ncqIdleThrsh && not fired) $ pause @'Seconds t1 >> mzero

      ncqClearFlag flag

      compacted <- lift what

      when compacted mzero

      k0 <- readTVarIO ncqStateKey
      void $ lift $ race (pause @'Seconds t2) do
        flip fix k0 $ \waitState k1 -> do
          pause @'Seconds 60
          k2 <- readTVarIO ncqStateKey
          when (k2 == k1) $ waitState k2
-- | States of the writer loop inside 'ncqStorageRun'.
data RunSt =
-- Open a fresh data file and switch to RunWrite, or go to RunFin when the
-- storage is not alive and the write queue is empty.
RunNew
-- Drain the write queue into the open file. Fields: file key, file
-- descriptor, amount written since the last sync, amount written to this
-- file in total (both presumably byte counts -- confirm against appendSection).
| RunWrite (FileKey, Fd, Int, Int)
-- Decide whether to sync and/or close the file. Same fields as RunWrite plus
-- a "continue" flag: False means go to RunFin instead of writing further.
| RunSync (FileKey, Fd, Int, Int, Bool)
-- Wait for the closer activity to finish, then leave the loop.
| RunFin