{-# Language MultiWayIf #-} module HBS2.Storage.NCQ3.Internal.Run where import HBS2.Storage.NCQ.Types hiding (FileKey) import HBS2.Storage.NCQ3.Internal.Prelude import HBS2.Storage.NCQ3.Internal.Types import HBS2.Storage.NCQ3.Internal.Files import HBS2.Storage.NCQ3.Internal.Memtable import HBS2.Storage.NCQ3.Internal.Index import HBS2.Storage.NCQ3.Internal.State import HBS2.Storage.NCQ3.Internal.Sweep import HBS2.Storage.NCQ3.Internal.MMapCache import HBS2.Storage.NCQ3.Internal.Fossil import Control.Monad.Trans.Cont import Control.Monad.Trans.Maybe import Data.HashSet qualified as HS import Data.Vector qualified as V import Data.Sequence qualified as Seq import Data.Fixed import System.Posix.IO as PosixBase import System.Posix.Types as Posix import System.Posix.Unistd import System.Posix.IO.ByteString as Posix import Control.Concurrent.STM qualified as STM import System.FileLock as FL ncqStorageStop :: forall m . MonadUnliftIO m => NCQStorage -> m () ncqStorageStop NCQStorage{..} = do atomically $ writeTVar ncqStopReq True ncqStorageRun :: forall m . MonadUnliftIO m => NCQStorage -> m () ncqStorageRun ncq@NCQStorage{..} = withSem ncqRunSem $ flip runContT pure do ContT $ bracket setAlive (const unsetAlive) ContT $ bracket none $ const $ liftIO do readTVarIO ncqFileLock >>= mapM_ FL.unlockFile ContT $ bracket none $ const $ liftIO do debug "storage done" closeQ <- liftIO newTQueueIO closer <- spawnActivity $ liftIO $ fix \loop -> do what <- atomically do tryReadTQueue closeQ >>= \case Just e -> pure $ Just e Nothing -> do stop <- readTVar ncqStopReq if not stop then STM.retry else pure Nothing maybe1 what none $ \(fk :: FileKey) -> do ncqIndexFile ncq Nothing (DataFile fk) atomically $ modifyTVar ncqCurrentFossils (HS.delete fk) loop let shLast = V.length ncqWriteOps - 1 spawnActivity $ pooledForConcurrentlyN_ (V.length ncqWriteOps) [0..shLast] $ \i -> do let q = ncqWriteOps ! i forever (liftIO $ join $ atomically (readTQueue q)) replicateM_ ncqReadThreads $ spawnActivity $ forever $ flip runContT pure $ callCC \exit -> do (h, answ) <- atomically $ readTQueue ncqReadReq let answer l = atomically (putTMVar answ l) -- debug $ "REQ" <+> pretty h atomically (ncqLookupEntrySTM ncq h) >>= \case Nothing -> none Just (_, EntryHere bs) -> answer (Just (InMemory bs)) >> exit () Just (_, EntryThere loc) -> answer (Just $ InFossil loc) >> exit () ContT $ ncqWithState ncq NCQState{..} <- readTVarIO ncqState for_ ncqStateIndex $ \(_, fk) -> do CachedIndex bs nw <- lift $ ncqGetCachedIndex ncq fk lift (ncqLookupIndex h (bs, nw)) >>= \case Just (IndexEntry fk o s) -> answer (Just (InFossil (FileLocation fk o s))) >> exit () Nothing -> none -- debug $ "NOT FOUND SHIT" <+> pretty h answer Nothing >> exit () -- spawnActivity measureWPS spawnActivity (ncqStateUpdateLoop ncq) spawnActivity $ forever do pause @'Seconds 30 ema <- readTVarIO ncqWriteEMA debug $ "EMA" <+> pretty (realToFrac @_ @(Fixed E3) ema) spawnActivity $ postponed 30 $ forever do lsInit <- ncqLiveKeys ncq <&> HS.size void $ race (pause @'Seconds 30) do flip fix lsInit $ \next ls0 -> do (lsA,lsB) <- atomically do ema <- readTVar ncqWriteEMA ls1 <- ncqLiveKeysSTM ncq <&> HS.size if ls1 /= ls0 && ema < ncqIdleThrsh then pure (ls0,ls1) else STM.retry debug $ "do sweep" <+> pretty lsA <+> pretty lsB ncqSweepObsoleteStates ncq ncqSweepFiles ncq next lsB spawnActivity $ postponed 20 $ compactLoop 10 30 do ncqIndexCompactStep ncq spawnActivity $ postponed 20 $ compactLoop 10 60 do ncqFossilMergeStep ncq flip fix RunNew $ \loop -> \case RunFin -> do debug "exit storage" atomically $ pollSTM closer >>= maybe STM.retry (const none) RunNew -> do alive <- readTVarIO ncqAlive empty <- readTVarIO ncqWriteQ <&> Seq.null if not alive && empty then loop RunFin else do (fk, fhx) <- openNewDataFile loop $ RunWrite (fk, fhx, 0, 0) RunSync (fk, fh, w, total, continue) -> do (stop,sync) <- atomically do (,) <$> readTVar ncqStopReq <*> readTVar ncqSyncReq -- <*> readTVar ncqWriteEMA let needClose = total >= ncqMinLog || stop rest <- if not (sync || needClose || w > ncqFsync) then pure w else do ss <- appendTailSection fh liftIO (fileSynchronise fh) -- ss <- liftIO (PFS.getFdStatus fh) <&> fromIntegral . PFS.fileSize -- atomically $ ncqDeferredWriteOpSTM ncq do ncqStateUpdate ncq do ncqStateAddFact (P (PData (DataFile fk) ss)) atomically do writeTVar ncqSyncReq False modifyTVar ncqSyncNo succ pure 0 if | needClose && continue -> do liftIO $ closeFd fh atomically $ writeTQueue closeQ fk loop RunNew | not continue -> loop RunFin | otherwise -> loop $ RunWrite (fk, fh, rest, total) RunWrite (fk, fh, w, total') -> do let timeoutMicro = 10_000_000 chunk <- liftIO $ timeout timeoutMicro $ atomically do stop <- readTVar ncqStopReq sy <- readTVar ncqSyncReq chunk <- stateTVar ncqWriteQ (Seq.splitAt ncqWriteBlock) if | Seq.null chunk && stop -> pure $ Left () | Seq.null chunk && not (stop || sy) -> STM.retry | otherwise -> pure $ Right chunk case chunk of Nothing -> do liftIO $ join $ readTVarIO ncqOnRunWriteIdle if w == 0 then do loop $ RunWrite (fk,fh,w,total') else do atomically $ writeTVar ncqSyncReq True loop $ RunSync (fk, fh, w, total', True) -- exit () Just (Left{}) -> loop $ RunSync (fk, fh, w, total', False) -- exit () Just (Right chu) -> do ws <- for chu $ \h -> do atomically (ncqLookupEntrySTM ncq h) >>= \case Just (NCQEntry w, EntryHere bs) -> do off <- fromIntegral <$> liftIO (fdSeek fh RelativeSeek 0) n <- lift (appendSection fh bs) atomically (writeTVar w (EntryThere (FileLocation fk off (fromIntegral n)))) pure n _ -> pure 0 let written = sum ws loop $ RunSync (fk, fh, w + written, total' + written, True) mapM_ wait [closer] where setAlive = atomically $ writeTVar ncqAlive True unsetAlive = atomically $ writeTVar ncqAlive False openNewDataFile :: forall mx . MonadIO mx => mx (FileKey, Fd) openNewDataFile = do fk <- ncqGetNewFileKey ncq DataFile atomically $ modifyTVar ncqCurrentFossils (HS.insert fk) ncqStateUpdate ncq (ncqStateAddDataFile fk) let fname = ncqGetFileName ncq (DataFile fk) -- touch fname let flags = defaultFileFlags { exclusive = False, creat = Just 0o666 } (fk,) <$> liftIO (PosixBase.openFd fname Posix.ReadWrite flags) spawnActivity m = do a <- ContT $ withAsync m link a pure a measureWPS :: m () measureWPS = void $ flip fix Nothing \loop -> \case Nothing -> do w <- readTVarIO ncqWrites t <- getTimeCoarse pause @'Seconds step >> loop (Just (w,t)) Just (w0,t0) -> do w1 <- readTVarIO ncqWrites t1 <- getTimeCoarse let dt = max 1e-9 (realToFrac @_ @Double (t1 - t0)) / 1e9 dw = fromIntegral (w1 - w0) atomically $ modifyTVar ncqWriteEMA \ema -> alpha * (dw/dt) + 0.9 * ema pause @'Seconds step >> loop (Just (w1,t1)) where alpha = 0.1 step = 1.00 postponed n m = liftIO (pause @'Seconds n) >> m compactLoop :: Timeout 'Seconds -> Timeout 'Seconds -> m Bool -> m () compactLoop t1 t2 what = forever $ void $ runMaybeT do ema <- readTVarIO ncqWriteEMA when (ema > ncqIdleThrsh) $ pause @'Seconds t1 >> mzero compacted <- lift what when compacted mzero k0 <- readTVarIO ncqStateKey void $ lift $ race (pause @'Seconds t2) do flip fix k0 $ \waitState k1 -> do pause @'Seconds 60 k2 <- readTVarIO ncqStateKey when (k2 == k1) $ waitState k2 data RunSt = RunNew | RunWrite (FileKey, Fd, Int, Int) | RunSync (FileKey, Fd, Int, Int, Bool) | RunFin