hbs2/hbs2-storage-ncq/lib/HBS2/Storage/NCQ3/Internal/Run.hs

284 lines
8.7 KiB
Haskell

{-# Language MultiWayIf #-}
module HBS2.Storage.NCQ3.Internal.Run where
import HBS2.Storage.NCQ.Types hiding (FileKey)
import HBS2.Storage.NCQ3.Internal.Prelude
import HBS2.Storage.NCQ3.Internal.Types
import HBS2.Storage.NCQ3.Internal.Files
import HBS2.Storage.NCQ3.Internal.Memtable
import HBS2.Storage.NCQ3.Internal.Index
import HBS2.Storage.NCQ3.Internal.State
import HBS2.Storage.NCQ3.Internal.Sweep
import HBS2.Storage.NCQ3.Internal.MMapCache
import HBS2.Storage.NCQ3.Internal.Fossil
import Control.Monad.Trans.Cont
import Control.Monad.Trans.Maybe
import Network.ByteOrder qualified as N
import Data.HashSet qualified as HS
import Data.HashPSQ qualified as PSQ
import Data.Vector qualified as V
import Data.HashMap.Strict qualified as HM
import Data.ByteString qualified as BS
import Data.Sequence qualified as Seq
import Data.Fixed
import System.FilePath.Posix
import System.Posix.Files qualified as Posix
import System.Posix.IO as PosixBase
import System.Posix.Types as Posix
import System.Posix.Unistd
import System.Posix.IO.ByteString as Posix
import System.Posix.Files ( getFileStatus
, modificationTimeHiRes
, setFileTimesHiRes
, getFdStatus
, FileStatus(..)
, setFileMode
)
import System.Posix.Files qualified as PFS
import System.IO.MMap as MMap
import Control.Concurrent.STM qualified as STM
import System.FileLock as FL
ncqStorageStop :: forall m . MonadUnliftIO m => NCQStorage -> m ()
ncqStorageStop NCQStorage{..} = do
atomically $ writeTVar ncqStopReq True
ncqStorageRun :: forall m . MonadUnliftIO m
=> NCQStorage
-> m ()
ncqStorageRun ncq@NCQStorage{..} = flip runContT pure do
ContT $ bracket setAlive (const unsetAlive)
ContT $ bracket none $ const $ liftIO do
readTVarIO ncqFileLock >>= mapM_ FL.unlockFile
closeQ <- liftIO newTQueueIO
closer <- spawnActivity $ liftIO $ fix \loop -> do
what <- atomically do
tryReadTQueue closeQ >>= \case
Just e -> pure $ Just e
Nothing -> do
stop <- readTVar ncqStopReq
if not stop then STM.retry else pure Nothing
maybe1 what none $ \(fk :: FileKey, fh) -> do
closeFd fh >> ncqIndexFile ncq (DataFile fk) >> loop
let shLast = V.length ncqWriteOps - 1
spawnActivity $ pooledForConcurrentlyN_ (V.length ncqWriteOps) [0..shLast] $ \i -> do
let q = ncqWriteOps ! i
forever (liftIO $ join $ atomically (readTQueue q))
replicateM_ 2 $ spawnActivity $ forever $ flip runContT pure $ callCC \exit -> do
(h, answ) <- atomically $ readTQueue ncqReadReq
let answer l = atomically (putTMVar answ l)
-- debug $ "REQ" <+> pretty h
atomically (ncqLookupEntrySTM ncq h) >>= \case
Nothing -> none
Just e -> answer (Just (InMemory (ncqEntryData e))) >> exit ()
ContT $ ncqWithState ncq
NCQState{..} <- readTVarIO ncqState
for_ ncqStateIndex $ \(_, fk) -> do
CachedIndex bs nw <- lift $ ncqGetCachedIndex ncq fk
lift (ncqLookupIndex h (bs, nw)) >>= \case
Just (IndexEntry fk o s) -> answer (Just (InFossil fk o s)) >> exit ()
Nothing -> none
-- debug $ "NOT FOUND SHIT" <+> pretty h
answer Nothing >> exit ()
spawnActivity measureWPS
-- spawnActivity (ncqStateUpdateLoop ncq)
spawnActivity $ forever do
pause @'Seconds 30
ema <- readTVarIO ncqWriteEMA
debug $ "EMA" <+> pretty (realToFrac @_ @(Fixed E3) ema)
spawnActivity $ postponed 10 $ forever do
lsInit <- ncqLiveKeys ncq <&> HS.size
void $ race (pause @'Seconds 60) do
flip fix lsInit $ \next ls0 -> do
(lsA,lsB) <- atomically do
ema <- readTVar ncqWriteEMA
ls1 <- ncqLiveKeysSTM ncq <&> HS.size
if ls1 /= ls0 && ema < ncqIdleThrsh then
pure (ls0,ls1)
else
STM.retry
debug $ "do sweep" <+> pretty lsA <+> pretty lsB
ncqSweepObsoleteStates ncq
ncqSweepFiles ncq
next lsB
spawnActivity $ postponed 10 $ compactLoop 10 300 do
ncqIndexCompactStep ncq
spawnActivity $ postponed 15 $ compactLoop 10 600 do
ncqFossilMergeStep ncq
flip fix RunNew $ \loop -> \case
RunFin -> do
debug "exit storage"
atomically $ pollSTM closer >>= maybe STM.retry (const none)
RunNew -> do
alive <- readTVarIO ncqAlive
empty <- readTVarIO ncqWriteQ <&> Seq.null
if not alive && empty
then loop RunFin
else do
(fk, fhx) <- openNewDataFile
loop $ RunWrite (fk, fhx, 0, 0)
RunSync (fk, fh, w, total, continue) -> do
(stop,sync) <- atomically do
(,) <$> readTVar ncqStopReq
<*> readTVar ncqSyncReq
-- <*> readTVar ncqWriteEMA
let needClose = total >= ncqMinLog || stop
rest <- if not (sync || needClose || w > ncqFsync) then
pure w
else do
ss <- appendTailSection fh
liftIO (fileSynchronise fh)
-- ss <- liftIO (PFS.getFdStatus fh) <&> fromIntegral . PFS.fileSize
ncqStateUpdate ncq do
ncqStateAddFact (P (PData (DataFile fk) ss))
atomically do
writeTVar ncqSyncReq False
modifyTVar ncqSyncNo succ
pure 0
if | needClose && continue -> do
atomically $ writeTQueue closeQ (fk, fh)
loop RunNew
| not continue -> loop RunFin
| otherwise -> loop $ RunWrite (fk, fh, rest, total)
RunWrite (fk, fh, w, total') -> do
let timeoutMicro = 10_000_000
chunk <- liftIO $ timeout timeoutMicro $ atomically do
stop <- readTVar ncqStopReq
sy <- readTVar ncqSyncReq
chunk <- stateTVar ncqWriteQ (Seq.splitAt ncqWriteBlock)
if | Seq.null chunk && stop -> pure $ Left ()
| Seq.null chunk && not (stop || sy) -> STM.retry
| otherwise -> pure $ Right chunk
case chunk of
Nothing -> do
liftIO $ join $ readTVarIO ncqOnRunWriteIdle
if w == 0 then do
loop $ RunWrite (fk,fh,w,total')
else do
atomically $ writeTVar ncqSyncReq True
loop $ RunSync (fk, fh, w, total', True) -- exit ()
Just (Left{}) -> loop $ RunSync (fk, fh, w, total', False) -- exit ()
Just (Right chu) -> do
ws <- for chu $ \h -> do
atomically (ncqLookupEntrySTM ncq h) >>= \case
Just (NCQEntry bs w) -> do
atomically (writeTVar w (Just fk))
lift (appendSection fh bs)
_ -> pure 0
let written = sum ws
loop $ RunSync (fk, fh, w + written, total' + written, True)
mapM_ wait [closer]
where
setAlive = atomically $ writeTVar ncqAlive True
unsetAlive = atomically $ writeTVar ncqAlive False
openNewDataFile :: forall mx . MonadIO mx => mx (FileKey, Fd)
openNewDataFile = do
fk <- ncqGetNewFileKey ncq DataFile
let fname = ncqGetFileName ncq (DataFile fk)
touch fname
let flags = defaultFileFlags { exclusive = False, creat = Just 0o666 }
(fk,) <$> liftIO (PosixBase.openFd fname Posix.ReadWrite flags)
spawnActivity m = do
a <- ContT $ withAsync m
link a
pure a
measureWPS = void $ flip fix Nothing \loop -> \case
Nothing -> do
w <- readTVarIO ncqWrites
t <- getTimeCoarse
pause @'Seconds step >> loop (Just (w,t))
Just (w0,t0) -> do
w1 <- readTVarIO ncqWrites
t1 <- getTimeCoarse
let dt = max 1e-9 (realToFrac @_ @Double (t1 - t0)) / 1e9
dw = fromIntegral (w1 - w0)
atomically $ modifyTVar ncqWriteEMA \ema -> alpha * (dw/dt) + 0.9 * ema
pause @'Seconds step >> loop (Just (w1,t1))
where
alpha = 0.1
step = 1.00
postponed n m = liftIO (pause @'Seconds n) >> m
compactLoop :: Timeout 'Seconds -> Timeout 'Seconds -> m Bool -> m ()
compactLoop t1 t2 what = forever $ void $ runMaybeT do
ema <- readTVarIO ncqWriteEMA
when (ema > ncqIdleThrsh) $ pause @'Seconds t1 >> mzero
compacted <- lift what
when compacted mzero
k0 <- readTVarIO ncqStateKey
void $ lift $ race (pause @'Seconds t2) do
flip fix k0 $ \waitState k1 -> do
pause @'Seconds 60
k2 <- readTVarIO ncqStateKey
when (k2 == k1) $ waitState k2
data RunSt =
RunNew
| RunWrite (FileKey, Fd, Int, Int)
| RunSync (FileKey, Fd, Int, Int, Bool)
| RunFin