{-# Language MultiWayIf #-}
module HBS2.Storage.NCQ3.Internal.Run where

import HBS2.Storage.NCQ.Types hiding (FileKey)
import HBS2.Storage.NCQ3.Internal.Prelude
import HBS2.Storage.NCQ3.Internal.Types
import HBS2.Storage.NCQ3.Internal.Files
import HBS2.Storage.NCQ3.Internal.Memtable
import HBS2.Storage.NCQ3.Internal.Index
import HBS2.Storage.NCQ3.Internal.State
import HBS2.Storage.NCQ3.Internal.Sweep
import HBS2.Storage.NCQ3.Internal.MMapCache
import HBS2.Storage.NCQ3.Internal.Fossil
import HBS2.Storage.NCQ3.Internal.Flags
import HBS2.Storage.NCQ3.Internal.Fsync

import Control.Concurrent.STM qualified as STM
import Control.Monad.Trans.Cont
import Control.Monad.Trans.Maybe
import Data.Either
import Data.Fixed
import Data.HashSet qualified as HS
import Data.HashMap.Strict qualified as HM
import Data.List qualified as List
import Data.Sequence qualified as Seq
import Data.Set qualified as Set
import Data.Vector qualified as V
import System.FileLock as FL
import System.Posix.Files qualified as PFS
import System.Posix.IO as PosixBase
import System.Posix.IO.ByteString as Posix
import System.Posix.Types as Posix
import System.Posix.Unistd

{- HLINT ignore "Eta reduce" -}

-- | Request storage shutdown.
--
-- Only raises the 'ncqStopReq' flag; it does not wait for anything. The
-- loops started by 'ncqStorageRun' poll this flag and wind down on their own.
ncqStorageStop :: forall m . MonadUnliftIO m => NCQStorage -> m ()
ncqStorageStop NCQStorage{..} = do
  atomically $ writeTVar ncqStopReq True

-- | Remove leftover temporary files from the storage work directory.
--
-- Every path reported by 'dirFiles' for the work directory whose name ends
-- in @.part@, @.cq$@ or @.merge@ is passed to 'rm'.
ncqRemoveGarbage :: forall m. MonadIO m
                 => NCQStorage
                 -> m ()
ncqRemoveGarbage me = do
  let wd = ncqGetWorkDir me
  let garb x = List.isSuffixOf ".part" x
            || List.isSuffixOf ".cq$" x
            || List.isSuffixOf ".merge" x

  dirFiles wd <&> filter garb >>= mapM_ rm

-- | Pick a usable state file, merge it into 'ncqState' and repair data files.
--
-- Steps, in order:
--
--   1. List files with the @s-@ prefix, sorted in descending order of the
--      second tuple component.
--   2. Walk that list and take the first state that can be read by
--      'readStateMay' and passes @checkState@. If none qualifies,
--      'ncqState0' is used.
--   3. Merge the chosen state into 'ncqState'.
--   4. For each @PData@ fact of the chosen state, run 'ncqFileFastCheck' on
--      the data file. A file that passes is indexed. A file that fails is
--      truncated and checked again, for at most three checks per file.
--   5. Remove the state files that were rejected and the ones that follow
--      the chosen state in the sorted list.
ncqTryLoadState :: forall m. MonadUnliftIO m
                => NCQStorage
                -> m ()
ncqTryLoadState me@NCQStorage{..} = do

  debug "ncqTryLoadState"

  stateFiles <- ncqListFilesBy me ( List.isPrefixOf "s-" )
                  <&> List.sortOn ( Down . snd )

  -- Result: (rejected state names, chosen state, remaining candidates).
  r <- flip fix ([], ncqState0, stateFiles) $ \next -> \case
    (r, s, []) -> pure (r,s,[])
    (l, s0, (_,s):ss) -> do

      readStateMay me s >>= \case
        Nothing -> next (s : l, s0, ss)
        Just ns -> do
          ok <- checkState ns
          debug $ "state status" <+> pretty s <+> pretty ok
          if ok
            -- The remaining candidates are returned once, as the third
            -- component; they are appended to the removal list below.
            -- (Previously they were also folded into the first component,
            -- so each of them was removed twice.)
            then pure (l, ns, ss)
            else next (s : l, s0, ss)

  let (bad, new@NCQState{..}, rest) = r

  atomically $ modifyTVar ncqState (<> new)

  for_ [ (d,s) | P (PData d s) <- Set.toList ncqStateFacts ] $ \(dataFile,s) -> do

    let path = ncqGetFileName me dataFile

    -- Sampled once, before any truncation; only used in the log messages.
    realSize <- fileSize path

    flip fix 0 $ \again i -> do

      good <- try @_ @NCQFsckException (ncqFileFastCheck path)

      let corrupted = isLeft good

      if not corrupted
        then do
          debug $ yellow "indexing" <+> pretty dataFile
          ncqIndexFile me Nothing dataFile
        else do
          o <- ncqFileTryRecover path
          warn $ "ncqFileTryRecover" <+> pretty path <+> pretty o <+> parens (pretty realSize)

          -- First attempt keeps the larger of the recorded size and the
          -- recovered size; later attempts fall back to the recorded size.
          let best = if i < 1 then max s o else s

          warn $ red "trim" <+> pretty s <+> pretty best <+> red (pretty (fromIntegral best - realSize)) <+> pretty (takeFileName path)

          liftIO $ PFS.setFileSize path (fromIntegral best)

          if i <= 1 then again (succ i) else pure Nothing

  for_ (bad <> fmap snd rest) $ \f -> do
    let old = ncqGetFileName me (StateFile f)
    rm old

  where
    -- TODO: created-but-not-indexed-file?
    --
    -- A state is accepted when every data file and every index file it
    -- refers to exists on disk. The first missing file aborts the check
    -- with False.
    checkState NCQState{..} = flip runContT pure $ callCC \exit -> do
      for_ ncqStateFiles $ \fk -> do
        let dataFile = ncqGetFileName me (DataFile fk)
        here <- doesFileExist dataFile
        unless here $ exit False

        -- Disabled content check, kept for reference:
        --
        -- lift (try @_ @SomeException (ncqFileFastCheck dataFile)) >>= \case
        --   Right () -> none
        --   Left e -> do
        --     warn (viaShow e)
        --     let known = HM.lookup fk facts
        --     fs <- fileSize dataFile
        --     warn $ "file is incomplete (or damaged)"
        --               <+> pretty dataFile
        --               <+> "actual:" <+> pretty fs
        --               <+> "known:" <+> pretty known
        --     let ok = isJust known && Just (fromIntegral fs) >= known
        --     unless ok $ exit False

      for_ ncqStateIndex $ \(_,fk) -> do
        let idxFile = ncqGetFileName me (IndexFile fk)
        here <- doesFileExist idxFile
        unless here do
          err $ red "missed index in state" <+> pretty idxFile
          exit False

      pure True

-- | Run the storage until a stop is requested.
--
-- Runs under 'ncqRunSem'. Takes an exclusive lock on the @.lock@ file and
-- throws 'NCQStorageCurrentAlreadyOpen' when the lock cannot be taken.
--
-- Start-up: mark the storage alive, remove garbage files, load the state.
--
-- Background activities (each one is an 'Async' linked to this thread):
--
--   * an indexer that indexes closed data files taken from a queue;
--   * one worker per queue in 'ncqWriteOps', running the queued actions;
--   * 'ncqReadThreads' workers answering requests from 'ncqReadReq';
--   * the write-rate estimator, the state update loop, an EMA logger,
--     the sweep loop and the merge/compaction loop.
--
-- The calling thread then drives the writer state machine described by
-- 'RunSt' and finally waits for the indexer.
--
-- Clean-up runs in reverse order of registration: the state is dumped,
-- the file lock is released, then 'ncqAlive' is cleared.
ncqStorageRun :: forall m . MonadUnliftIO m
              => NCQStorage
              -> m ()
ncqStorageRun ncq@NCQStorage{..} = withSem ncqRunSem $ flip runContT pure do
  debug "ncqStorageRun"

  liftIO (FL.tryLockFile (ncqGetFileName ncq ".lock") Exclusive)
    >>= orThrow NCQStorageCurrentAlreadyOpen
    >>= atomically . writeTVar ncqFileLock . Just

  ContT $ bracket setAlive (const unsetAlive)

  ContT $ bracket none $ const $ liftIO do
    readTVarIO ncqFileLock >>= mapM_ FL.unlockFile

  ContT $ bracket none $ const $ liftIO do
    void $ ncqStateDump ncq
    debug "storage done"

  ncqRemoveGarbage ncq

  liftIO (ncqTryLoadState ncq)

  indexQ <- liftIO newTQueueIO

  -- Indexer: blocks until a file key arrives; exits once the queue is
  -- empty and a stop has been requested.
  indexer <- spawnActivity $ liftIO $ fix \loop -> do
    what <- atomically do
      tryReadTQueue indexQ >>= \case
        Just e  -> pure $ Just e
        Nothing -> do
          stop <- readTVar ncqStopReq
          if not stop then STM.retry else pure Nothing

    maybe1 what none $ \(fk :: FileKey) -> do
      ncqIndexFile ncq Nothing (DataFile fk)
      dropReplaces fk
      atomically $ modifyTVar ncqCurrentFossils (HS.delete fk)
      loop

  -- One worker per write-op queue; each runs the IO actions it dequeues.
  let shLast = V.length ncqWriteOps - 1
  spawnActivity $ pooledForConcurrentlyN_ (V.length ncqWriteOps) [0..shLast] $ \i -> do
    let q = ncqWriteOps ! i
    forever (liftIO $ join $ atomically (readTQueue q))

  -- Readers: look in memory first, then scan the indexes of the current
  -- state; exactly one answer is put per request.
  replicateM_ ncqReadThreads $ spawnActivity $ forever $ flip runContT pure $ callCC \exit -> do

    (h, answ) <- atomically $ readTQueue ncqReadReq
    let answer l = atomically (putTMVar answ l)

    -- debug $ "REQ" <+> pretty h

    atomically (ncqLookupEntrySTM ncq h) >>= \case
      Nothing -> none
      Just (_, EntryHere bs) -> answer (Just (InMemory bs)) >> exit ()
      Just (_, EntryThere loc) -> answer (Just $ InFossil loc) >> exit ()

    ContT $ ncqWithState ncq

    -- debug $ "REQ IN STATE" <+> pretty h

    NCQState{..} <- readTVarIO ncqState

    for_ ncqStateIndex $ \(_, fk) -> do
      -- debug $ "scan index" <+> pretty fk
      CachedIndex bs nw <- lift $ ncqGetCachedIndex ncq fk
      lift (ncqLookupIndex h (bs, nw)) >>= \case
        Just (IndexEntry fk o s) -> answer (Just (InFossil (FileLocation fk o s))) >> exit ()
        Nothing -> none

    -- debug $ "not found" <+> pretty h
    answer Nothing >> exit ()

  spawnActivity measureWPS

  spawnActivity (ncqStateUpdateLoop ncq)

  spawnActivity $ forever do
    pause @'Seconds 30
    ema <- readTVarIO ncqWriteEMA
    debug $ "EMA" <+> pretty (realToFrac @_ @(Fixed E3) ema)

  spawnActivity $ postponed ncqPostponeService $ forever do
    ncqSweepObsoleteStates ncq
    ncqSweepFiles ncq
    void $ race (pause @'Seconds ncqSweepTime) do
      atomically (ncqWaitFlagSTM ncqSweepReq)

  spawnActivity $ postponed ncqPostponeService
    $ compactLoop ncqMergeReq ncqMergeTimeA ncqMergeTimeB
    $ withSem ncqServiceSem do
        a <- ncqFossilMergeStep ncq
        b <- ncqIndexCompactStep ncq
        pure $ a || b

  -- Writer state machine, see 'RunSt'.
  flip fix RunNew $ \loop s -> do
    -- debug $ viaShow s
    case s of
      RunFin mfh -> do
        liftIO $ for_ mfh closeFd
        rest <- readTVarIO ncqWriteQ <&> Seq.length
        debug $ "exit storage" <+> pretty rest
        -- Block until the indexer has finished.
        atomically $ pollSTM indexer >>= maybe STM.retry (const none)

      RunNew -> do
        alive <- readTVarIO ncqAlive
        queueEmpty <- readTVarIO ncqWriteQ <&> Seq.null
        if not alive && queueEmpty
          then loop (RunFin Nothing)
          else do
            (fk, fhx) <- openNewDataFile
            loop $ RunWrite (fk, fhx, 0, 0)

      RunSync (fk, fh, w, total, continue) -> do

        (stop,sync) <- atomically do
          (,) <$> readTVar ncqStopReq
              <*> readTVar ncqSyncReq

        let needClose = total >= ncqMinLog || stop

        -- 'rest' is the unsynced amount carried into the next state:
        -- unchanged when no sync happens, 0 after a sync.
        rest <- if not (sync || needClose || w > ncqFsync)
          then pure w
          else do
            ss <- appendTailSection fh
            liftIO (fileSynchronisePortable fh)
            flushReplaces fk
            ncqStateUpdate ncq do
              ncqStateAddFact (P (PData (DataFile fk) ss))

            atomically do
              writeTVar ncqSyncReq False
              -- Strict update: a lazy 'modifyTVar' would pile up one
              -- thunk per sync until somebody forces the counter.
              STM.modifyTVar' ncqSyncNo succ

            pure 0

        if | needClose && continue -> do
               liftIO $ closeFd fh
               flushReplaces fk
               debug $ "closeFd" <+> viaShow fh
               atomically $ writeTQueue indexQ fk
               loop RunNew

           | not continue -> loop (RunFin (Just fh))

           | otherwise -> loop $ RunWrite (fk, fh, rest, total)

      RunWrite (fk, fh, w, total') -> do

        let timeoutMicro = 10_000_000

        -- Nothing       : timed out waiting for work
        -- Just (Left _) : stop requested and the queue is empty
        -- Just (Right c): entries to write (possibly empty on a sync request)
        chunk <- liftIO $ timeout timeoutMicro $ atomically do
          stop <- readTVar ncqStopReq
          sy   <- readTVar ncqSyncReq

          -- Normally take one entry; on stop drain the whole queue.
          taken <- if not stop
            then stateTVar ncqWriteQ (Seq.splitAt 1)
            else do
              r <- readTVar ncqWriteQ
              writeTVar ncqWriteQ mempty
              pure r

          if | Seq.null taken && stop -> pure $ Left ()
             | Seq.null taken && not (stop || sy) -> STM.retry
             | otherwise -> pure $ Right taken

        stop <- readTVarIO ncqStopReq

        case chunk of
          Nothing -> do
            liftIO $ join $ readTVarIO ncqOnRunWriteIdle
            -- Re-read after the idle callback has run.
            stopNow <- readTVarIO ncqStopReq
            if w == 0 && not stopNow
              then loop $ RunWrite (fk,fh,w,total')
              else do
                atomically $ writeTVar ncqSyncReq True
                loop $ RunSync (fk, fh, w, total', not stopNow)

          Just (Left{}) -> loop $ RunSync (fk, fh, w, total', False)

          Just (Right chu) -> do
            ws <- for chu $ \h -> do
              atomically (ncqLookupEntrySTM ncq h) >>= \case
                Just (NCQEntry slot, EntryHere bs) -> do
                  off <- fromIntegral <$> liftIO (fdSeek fh RelativeSeek 0)
                  n <- lift (appendSection fh bs)

                  -- Deferred replacement of the in-memory entry by its file
                  -- location; applied by 'flushReplaces', and only if the
                  -- entry still holds the same bytes.
                  let op = do
                        readTVar slot >>= \case
                          EntryHere bs1 | bs1 == bs -> do
                            writeTVar slot (EntryThere (FileLocation fk off (fromIntegral n)))
                          _ -> none

                  atomically $ modifyTVar ncqReplQueue (HM.insertWith (<>) fk [op])
                  pure n

                _ -> pure 0

            let written = sum ws
            loop $ RunSync (fk, fh, w + written, total' + written, not stop)

  mapM_ wait [indexer]

  where
    setAlive = atomically $ writeTVar ncqAlive True
    unsetAlive = atomically $ writeTVar ncqAlive False

    -- Forget the pending replacements of a file without applying them.
    dropReplaces :: forall m1 . MonadIO m1 => FileKey -> m1 ()
    dropReplaces fk = atomically do
      modifyTVar ncqReplQueue (HM.delete fk)

    -- In one transaction: drop the cached data of the file, take its
    -- pending replacements out of the queue and run them.
    flushReplaces :: forall m1 . MonadIO m1 => FileKey -> m1 ()
    flushReplaces fk = do
      atomically do
        ncqDelCachedDataSTM ncq fk
        ops <- readTVar ncqReplQueue <&> fromMaybe mempty . HM.lookup fk
        modifyTVar ncqReplQueue (HM.delete fk)
        sequence_ ops

    -- Allocate a new file key, record it (current fossils, a zero-size
    -- PData fact, the data file list) and open the file for read/write,
    -- creating it when missing.
    openNewDataFile :: forall mx . MonadIO mx => mx (FileKey, Fd)
    openNewDataFile = do
      fk <- ncqGetNewFileKey ncq DataFile
      atomically $ modifyTVar ncqCurrentFossils (HS.insert fk)
      ncqStateUpdate ncq do
        ncqStateAddFact (P (PData (DataFile fk) 0))
        ncqStateAddDataFile fk

      let fname = ncqGetFileName ncq (DataFile fk)
      -- touch fname
      let flags = defaultFileFlags { exclusive = False, creat = Just 0o666 }
      (fk,) <$> liftIO (PosixBase.openFd fname Posix.ReadWrite flags)

    -- Start an action with 'withAsync' and link it to the current thread.
    spawnActivity m = do
      a <- ContT $ withAsync m
      link a
      pure a

    -- Every 'step' seconds sample 'ncqWrites' and fold the observed rate
    -- into 'ncqWriteEMA':  ema' = alpha * rate + (1 - alpha) * ema.
    measureWPS :: m ()
    measureWPS = void $ flip fix Nothing \loop -> \case
        Nothing -> do
          w <- readTVarIO ncqWrites
          t <- getTimeCoarse
          pause @'Seconds step >> loop (Just (w,t))

        Just (w0,t0) -> do
          w1 <- readTVarIO ncqWrites
          t1 <- getTimeCoarse
          -- NOTE(review): the clamp is applied to the raw clock difference,
          -- before the division by 1e9 -- confirm this is the intended floor.
          let dt = max 1e-9 (realToFrac @_ @Double (t1 - t0)) / 1e9
              dw = fromIntegral (w1 - w0)
          -- Strict update so the average is not kept as a chain of thunks.
          atomically $ STM.modifyTVar' ncqWriteEMA \ema -> alpha * (dw/dt) + (1 - alpha) * ema
          pause @'Seconds step >> loop (Just (w1,t1))
      where
        alpha = 0.1
        step  = 1.00

    -- Delay an action by the given number of seconds.
    postponed n m = liftIO (pause @'Seconds n) >> m

    -- Repeatedly run the compaction step 'what'.
    --
    -- A round is skipped (after pausing for t1) while the write EMA is
    -- above 'ncqIdleThrsh' and the flag is not raised. When 'what' reports
    -- that it did some work the next round starts immediately. Otherwise
    -- the loop waits for t2, or until 'ncqStateKey' is seen to have changed
    -- (polled every 60 seconds), whichever comes first.
    compactLoop :: TVar Bool
                -> Timeout 'Seconds
                -> Timeout 'Seconds
                -> m Bool
                -> m ()
    compactLoop flag t1 t2 what = forever $ void $ runMaybeT do
      ema <- readTVarIO ncqWriteEMA
      fired <- ncqGetFlag flag

      when (ema > ncqIdleThrsh && not fired) $ pause @'Seconds t1 >> mzero

      ncqClearFlag flag

      compacted <- lift what

      when compacted mzero

      k0 <- readTVarIO ncqStateKey
      void $ lift $ race (pause @'Seconds t2) do
        flip fix k0 $ \waitState k1 -> do
          pause @'Seconds 60
          k2 <- readTVarIO ncqStateKey
          when (k2 == k1) $ waitState k2

-- | States of the writer loop in 'ncqStorageRun'.
data RunSt
  = -- | Open a fresh data file, or finish when the storage is no longer
    -- alive and the write queue is empty.
    RunNew
  | -- | Take entries from the write queue and append them to the file.
    -- Fields: file key, descriptor, amount written since the last sync,
    -- amount written to this file in total.
    RunWrite (FileKey, Fd, Int, Int)
  | -- | Decide whether to sync and/or close the file. Fields as in
    -- 'RunWrite', plus a flag that is False when the loop must finish.
    RunSync (FileKey, Fd, Int, Int, Bool)
  | -- | Close the descriptor, if any, and wait for the indexer.
    RunFin (Maybe Fd)
  deriving stock Show