{-# Language MultiWayIf #-} {-# Language RecordWildCards #-} {-# Language PatternSynonyms #-} module HBS2.Storage.NCQ2 ( module HBS2.Storage.NCQ2 , module HBS2.Storage.NCQ.Types ) where import HBS2.Prelude.Plated import HBS2.Hash import HBS2.OrDie import HBS2.Data.Types.Refs import HBS2.Base58 import HBS2.Net.Auth.Credentials import HBS2.Storage import HBS2.Misc.PrettyStuff import HBS2.System.Logger.Simple.ANSI import HBS2.Data.Log.Structured.NCQ import HBS2.Data.Log.Structured.SD import HBS2.Storage.NCQ.Types import Data.Config.Suckless.System import Data.Config.Suckless.Script hiding (void) import Codec.Compression.Zstd qualified as Zstd import Codec.Compression.Zstd.Lazy as ZstdL import Codec.Compression.Zstd.Streaming qualified as ZstdS import Codec.Compression.Zstd.Streaming (Result(..)) import Numeric (showHex) import Control.Applicative import Data.ByteString.Builder import Network.ByteOrder qualified as N import Data.Bit.ThreadSafe qualified as BV import Data.HashMap.Strict (HashMap) import Control.Monad.Except import Control.Monad.Trans.Cont import Control.Monad.Trans.Maybe import Data.Time.Clock.POSIX import Data.Ord (Down(..),comparing) import Control.Concurrent.STM qualified as STM import Control.Concurrent.STM.TSem import Data.Hashable (hash) import Data.HashPSQ qualified as HPSQ import Data.HashPSQ (HashPSQ) import Data.IntMap qualified as IntMap import Data.IntMap (IntMap) import Data.IntSet qualified as IntSet import Data.IntSet (IntSet) import Data.Sequence qualified as Seq import Data.Sequence (Seq(..), (|>),(<|)) import Data.List qualified as List import Data.ByteString.Lazy qualified as LBS import Data.ByteString.Lazy.Char8 qualified as LBS8 import Data.ByteString (ByteString) import Data.ByteString qualified as BS import Data.ByteString.Char8 qualified as BS8 import Data.Char (isDigit) import Data.Fixed import Data.Coerce import Data.Word import Data.Either import Data.Maybe import Data.Text qualified as Text import Data.Text.IO qualified as Text import Data.Int import Data.Vector qualified as V import Data.Vector (Vector, (!)) import Lens.Micro.Platform import Data.HashSet (HashSet) import Data.HashSet qualified as HS import Data.HashMap.Strict qualified as HM import System.FilePath.Posix import System.Posix.Fcntl import System.Posix.Files qualified as Posix import System.Posix.IO as PosixBase import System.Posix.Types as Posix import System.Posix.IO.ByteString as Posix import System.Posix.Unistd import System.Posix.Files ( getFileStatus , modificationTimeHiRes , setFileTimesHiRes , getFdStatus , FileStatus(..) , setFileMode ) import System.Posix.Files qualified as PFS import System.IO.Error (catchIOError) import System.IO.MMap as MMap import System.IO.Temp (emptyTempFile) import System.Mem -- import Foreign.Ptr -- import Foreign di import qualified Data.ByteString.Internal as BSI import Streaming.Prelude qualified as S import UnliftIO import UnliftIO.Concurrent(getNumCapabilities) import UnliftIO.IO.File -- FIXME: ASAP-USE-FILE-LOCK import System.FileLock as FL type FOff = Word64 data NCQEntry = NCQEntry { ncqEntryData :: ByteString , ncqDumped :: TVar (Maybe FileKey) } type Shard = TVar (HashMap HashRef NCQEntry) type NCQOffset = Word64 type NCQSize = Word32 type StateVersion = Word64 data StateOP = D FileKey | F TimeSpec FileKey | P FileKey deriving (Eq,Ord,Show) data NCQFlag = NCQMergeNow | NCQCompactNow deriving (Eq,Ord,Generic) data Location = InFossil FileKey !ByteString !NCQOffset !NCQSize | InMemory ByteString instance Pretty Location where pretty = \case InFossil k _ o s -> parens $ "in-fossil" <+> pretty k <+> pretty o <+> pretty s InMemory _ -> "in-memory" data TrackedFile = TrackedFile { tfTime :: FilePrio , tfKey :: FileKey , tfCached :: TVar (Maybe CachedEntry) } type TrackedFiles = Vector TrackedFile data NCQStorage2 = NCQStorage2 { ncqRoot :: FilePath , ncqGen :: Int , ncqSalt :: HashRef , ncqPostponeMerge :: Timeout 'Seconds , ncqPostponeSweep :: Timeout 'Seconds , ncqLuckyNum :: Int , ncqFsync :: Int , ncqWriteQLen :: Int , ncqWriteBlock :: Int , ncqMinLog :: Int , ncqMaxLog :: Int , ncqMaxCached :: Int , ncqIdleThrsh :: Double , ncqMemTable :: Vector Shard , ncqWriteQ :: TVar (Seq HashRef) , ncqWriteOps :: Vector (TQueue (IO ())) , ncqStorageTasks :: TVar Int , ncqStorageStopReq :: TVar Bool , ncqStorageSyncReq :: TVar Bool , ncqMergeReq :: TVar Bool , ncqMergeSem :: TSem , ncqSyncNo :: TVar Int , ncqCurrentFiles :: TVar (HashSet FileKey) , ncqTrackedFiles :: TVar TrackedFiles , ncqStateVersion :: TVar StateVersion , ncqStateUsage :: TVar (IntMap (Int, HashSet FileKey)) , ncqStateName :: TVar (Maybe StateFile) , ncqStateSem :: TSem , ncqCachedEntries :: TVar Int , ncqWrites :: TVar Int , ncqWriteEMA :: TVar Double -- for writes-per-seconds , ncqJobQ :: TQueue (IO ()) , ncqMiscSem :: TSem , ncqSweepSem :: TSem , ncqMergeTasks :: TVar Int , ncqOnRunWriteIdle :: TVar (IO ()) , ncqReadSem :: TSem } megabytes :: forall a . Integral a => a megabytes = 1024 ^ 2 gigabytes :: forall a . Integral a => a gigabytes = 1024 ^ 3 ncqStorageOpen2 :: MonadIO m => FilePath -> (NCQStorage2 -> NCQStorage2)-> m NCQStorage2 ncqStorageOpen2 fp upd = do let ncqRoot = fp let ncqGen = 0 let ncqFsync = 8 * megabytes let ncqWriteQLen = 1024 * 4 let ncqMinLog = 512 * megabytes let ncqMaxLog = 16 * gigabytes -- ??? let ncqWriteBlock = max 128 $ ncqWriteQLen `div` 2 let ncqMaxCached = 128 let ncqIdleThrsh = 50.00 let ncqPostponeMerge = 600.00 let ncqPostponeSweep = 2 * ncqPostponeMerge let ncqLuckyNum = 2 let shardNum = ncqLuckyNum * 2 let wopNum = ncqLuckyNum cap <- getNumCapabilities <&> fromIntegral ncqWriteQ <- newTVarIO mempty ncqMemTable <- V.fromList <$> replicateM shardNum (newTVarIO mempty) ncqStorageStopReq <- newTVarIO False ncqStorageSyncReq <- newTVarIO False ncqMergeReq <- newTVarIO False ncqMergeSem <- atomically (newTSem 1) ncqSyncNo <- newTVarIO 0 ncqCurrentFiles <- newTVarIO mempty ncqTrackedFiles <- newTVarIO V.empty ncqStateVersion <- newTVarIO 0 ncqStateUsage <- newTVarIO mempty ncqStateName <- newTVarIO Nothing ncqStateSem <- atomically $ newTSem 1 ncqCachedEntries <- newTVarIO 0 ncqStorageTasks <- newTVarIO 0 ncqWrites <- newTVarIO 0 ncqWriteEMA <- newTVarIO 0.00 ncqJobQ <- newTQueueIO ncqMiscSem <- atomically (newTSem 1) ncqSweepSem <- atomically (newTSem 1) ncqMergeTasks <- newTVarIO 0 ncqOnRunWriteIdle <- newTVarIO none ncqReadSem <- atomically $ newTSem 1 ncqWriteOps <- replicateM wopNum newTQueueIO <&> V.fromList let ncqSalt = "EstEFasxrCFqsGDxcY4haFcha9e4ZHRzsPbGUmDfdxLk" let ncq = NCQStorage2{..} & upd mkdir (ncqGetWorkDir ncq) liftIO $ withSem ncqMergeSem do ncqRepair ncq ncqPreloadIndexes ncq ncqSweepStates ncq pure ncq ncqWithStorage :: MonadUnliftIO m => FilePath -> ( NCQStorage2 -> m a ) -> m a ncqWithStorage fp action = flip runContT pure do sto <- lift (ncqStorageOpen2 fp id) w <- ContT $ withAsync (ncqStorageRun2 sto) link w r <- lift (action sto) lift (ncqStorageStop2 sto) wait w pure r ncqGetFileName :: NCQStorage2 -> FilePath -> FilePath ncqGetFileName ncq fp = ncqGetWorkDir ncq takeFileName fp ncqGetWorkDir :: NCQStorage2 -> FilePath ncqGetWorkDir NCQStorage2{..} = ncqRoot show ncqGen ncqGetLockFileName :: NCQStorage2 -> FilePath ncqGetLockFileName ncq = ncqGetFileName ncq ".lock" ncqNewUniqFileName :: MonadIO m => NCQStorage2 -> FilePath -> FilePath -> m FilePath ncqNewUniqFileName me@NCQStorage2{..} pref suff = liftIO $ withSem ncqMiscSem do flip fix 0 $ \next i -> do t <- round @_ @Integer . (* 1e9) <$> getPOSIXTime let v = show $ pretty (showHex t "") <> "-" <> pretty (showHex i "") let n = ncqGetFileName me (pref <> v <> suff) doesFileExist n >>= \case False -> pure n True -> next (succ i) ncqGetNewFossilName :: MonadIO m => NCQStorage2 -> m FilePath ncqGetNewFossilName me = ncqNewUniqFileName me "fossil-" ".data" ncqGetNewStateName :: MonadIO m => NCQStorage2 -> m FilePath ncqGetNewStateName me = ncqNewUniqFileName me "state-" "" ncqGetNewCompactName :: MonadIO m => NCQStorage2 -> m FilePath ncqGetNewCompactName me = ncqNewUniqFileName me "compact-" ".data" ncqStorageStop2 :: MonadUnliftIO m => NCQStorage2 -> m () ncqStorageStop2 NCQStorage2{..} = do atomically $ writeTVar ncqStorageStopReq True ncqStorageSync2 :: MonadUnliftIO m => NCQStorage2 -> m () ncqStorageSync2 NCQStorage2{..} = do atomically $ writeTVar ncqStorageSyncReq True ncqShardIdx :: NCQStorage2 -> HashRef -> Int ncqShardIdx NCQStorage2{..} h = fromIntegral (BS.head (coerce h)) `mod` V.length ncqMemTable {-# INLINE ncqShardIdx #-} ncqGetShard :: NCQStorage2 -> HashRef -> Shard ncqGetShard ncq@NCQStorage2{..} h = ncqMemTable ! ncqShardIdx ncq h {-# INLINE ncqGetShard #-} ncqLookupEntrySTM :: NCQStorage2 -> HashRef -> STM (Maybe NCQEntry) ncqLookupEntrySTM ncq h = readTVar (ncqGetShard ncq h) <&> HM.lookup h ncqAlterEntrySTM :: NCQStorage2 -> HashRef -> (Maybe NCQEntry -> Maybe NCQEntry) -> STM () ncqAlterEntrySTM ncq h alterFn = do let shard = ncqGetShard ncq h modifyTVar shard (HM.alter alterFn h) ncqPutBS :: MonadUnliftIO m => NCQStorage2 -> Maybe NCQSectionType -> Maybe HashRef -> ByteString -> m HashRef ncqPutBS ncq@NCQStorage2{..} mtp mhref bs' = do waiter <- newEmptyTMVarIO let work = do let h = fromMaybe (HashRef (hashObject @HbSync bs')) mhref let bs = ncqMakeSectionBS mtp h bs' let shard = ncqGetShard ncq h zero <- newTVarIO Nothing atomically do stop <- readTVar ncqStorageStopReq filled <- readTVar ncqWriteQ <&> Seq.length when (not stop && filled > ncqWriteQLen) STM.retry upd <- stateTVar shard $ flip HM.alterF h \case Nothing -> (True, Just (NCQEntry bs zero)) Just e | ncqEntryData e /= bs -> (True, Just (NCQEntry bs zero)) | otherwise -> (False, Just e) when upd do modifyTVar' ncqWriteQ (|> h) putTMVar waiter h atomically do nw <- readTVar ncqWrites <&> (`mod` V.length ncqWriteOps) modifyTVar' ncqWrites succ writeTQueue (ncqWriteOps ! nw) work atomically $ takeTMVar waiter ncqEntryUnwrap :: NCQStorage2 -> ByteString -> (ByteString, Either ByteString (NCQSectionType, ByteString)) ncqEntryUnwrap _ source = do let (k,v) = BS.splitAt ncqKeyLen (BS.drop 4 source) case ncqIsMeta v of Just meta -> (k, Right (meta, BS.drop ncqPrefixLen v)) Nothing -> (k, Left v) {-# INLINE ncqEntryUnwrap #-} ncqIsTomb :: NCQStorage2 -> Location -> Bool ncqIsTomb me loc = case ncqEntryUnwrap me (ncqGetEntryBS me loc) of (_, Right (T, _)) -> True _ -> False ncqDelEntry :: MonadUnliftIO m => NCQStorage2 -> HashRef -> m () ncqDelEntry me href = do -- всегда пишем tomb и надеемся на лучшее -- merge/compact разберутся -- однако не пишем, если записи еще нет ncqLocate2 me href >>= \case Just loc | not (ncqIsTomb me loc) -> do void $ ncqPutBS me (Just T) (Just href) "" _ -> none ncqLookupEntry :: MonadUnliftIO m => NCQStorage2 -> HashRef -> m (Maybe NCQEntry) ncqLookupEntry sto hash = atomically (ncqLookupEntrySTM sto hash) ncqGetEntryBS :: NCQStorage2 -> Location -> ByteString ncqGetEntryBS _ = \case InMemory bs -> bs InFossil _ mmap off size -> do BS.take (fromIntegral size) $ BS.drop (fromIntegral off) mmap ncqEntrySize :: forall a . Integral a => Location -> a ncqEntrySize = \case InFossil _ _ _ size -> fromIntegral size InMemory bs -> fromIntegral (BS.length bs) useVersion :: forall m a . MonadUnliftIO m => NCQStorage2 -> (() -> m a) -> m a useVersion ncq m = bracket succV predV m where succV = atomically (ncqStateUseSTM ncq) predV = const $ atomically (ncqStateUnuseSTM ncq) ncqListTrackedFilesSTM :: NCQStorage2 -> STM (Vector (FileKey, Maybe CachedEntry, TVar (Maybe CachedEntry))) ncqListTrackedFilesSTM NCQStorage2{..} = do fs <- readTVar ncqTrackedFiles for fs $ \TrackedFile{..} -> (tfKey,,) <$> readTVar tfCached <*> pure tfCached ncqListTrackedFiles :: MonadUnliftIO m => NCQStorage2 -> m (Vector (FileKey, Maybe CachedEntry, TVar (Maybe CachedEntry))) ncqListTrackedFiles = atomically . ncqListTrackedFilesSTM ncqPreloadIndexes :: MonadUnliftIO m => NCQStorage2 -> m () ncqPreloadIndexes me@NCQStorage2{..} = useVersion me $ const do fs <- readTVarIO ncqTrackedFiles <&> take ncqMaxCached . V.toList flip fix (fs, ncqMaxCached) $ \next (files,lim) -> do case files of (t@TrackedFile{..}:rest) | lim > 0 -> do readTVarIO tfCached >>= \case Nothing -> do void $ ncqLoadTrackedFile me t next (rest, pred lim) _ -> next (rest, lim) _ -> none ncqLoadTrackedFile :: MonadUnliftIO m => NCQStorage2 -> TrackedFile -> m (Maybe CachedEntry) ncqLoadTrackedFile ncq@NCQStorage2{..} TrackedFile{..} = runMaybeT do let indexFile = ncqGetFileName ncq (toFileName (IndexFile tfKey)) let dataFile = ncqGetFileName ncq (toFileName (DataFile tfKey)) idxHere <- liftIO $ doesFileExist indexFile unless idxHere do liftIO $ err $ red "missed index" <+> "in loadIndex" <+> pretty tfKey mzero (idxBs, idxNway) <- MaybeT $ liftIO (nwayHashMMapReadOnly indexFile) datBs <- liftIO $ mmapFileByteString dataFile Nothing tnow <- liftIO $ newTVarIO =<< getTimeCoarse let ce = CachedEntry idxBs datBs idxNway tnow atomically do writeTVar tfCached (Just ce) modifyTVar ncqCachedEntries (+1) evictIfNeededSTM ncq (Just 1) pure ce data Seek a = SeekStop !a | SeekNext !a --- ncqSeekInFossils :: forall a f m . (MonadUnliftIO m, Monoid (f a)) => NCQStorage2 -> HashRef -> (Location -> m (Seek (f a))) -> m (f a) ncqSeekInFossils ncq@NCQStorage2{..} href action = withSem ncqReadSem $ useVersion ncq $ const do tracked <- readTVarIO ncqTrackedFiles let l = V.length tracked let go :: Int -> Int -> f a -> m (f a) go i a r | i >= l = pure r | a > 1 = do let TrackedFile{..} = tracked ! i err $ "unable to load fossil" <+> pretty tfKey go (i+1) 0 r | otherwise = do let TrackedFile{..} = tracked ! i readTVarIO tfCached >>= \case Just PendingEntry{} -> go (i+1) 0 r Nothing -> do void $ ncqLoadTrackedFile ncq TrackedFile{..} go i (a+1) r Just CachedEntry{..} -> do liftIO (ncqLookupIndex href (cachedMmapedIdx, cachedNway)) >>= \case Nothing -> go (i+1) 0 r Just (offset, size) -> do now <- getTimeCoarse atomically $ writeTVar cachedTs now action (InFossil tfKey cachedMmapedData offset size) >>= \case SeekStop e -> pure (r <> e) SeekNext e -> go (i+1) 0 (r <> e) go 0 0 mempty ncqLookupIndex :: MonadUnliftIO m => HashRef -> (ByteString, NWayHash) -> m (Maybe ( NCQOffset, NCQSize )) ncqLookupIndex hx (mmaped, nway) = do fmap decodeEntry <$> nwayHashLookup nway mmaped (coerce hx) where {-# INLINE decodeEntry #-} decodeEntry entryBs = do let (p,r) = BS.splitAt 8 entryBs let off = fromIntegral (N.word64 p) let size = fromIntegral (N.word32 (BS.take 4 r)) ( off, size ) {-# INLINE ncqLookupIndex #-} ncqLocate2 :: MonadUnliftIO m => NCQStorage2 -> HashRef -> m (Maybe Location) ncqLocate2 ncq href = do inMem <- ncqLookupEntry ncq href <&> fmap (InMemory . ncqEntryData) inFo <- listToMaybe <$> ncqSeekInFossils ncq href \loc -> pure (SeekStop [loc]) pure $ inMem <|> inFo data RunSt = RunNew | RunWrite (FileKey, Fd, Int, Int) | RunSync (FileKey, Fd, Int, Int, Bool) | RunFin ncqStorageRun2 :: forall m . MonadUnliftIO m => NCQStorage2 -> m () ncqStorageRun2 ncq@NCQStorage2{..} = flip runContT pure do closeQ <- newTQueueIO closer <- spawnActivity $ liftIO $ fix \loop -> do what <- atomically do stop <- readTVar ncqStorageStopReq tryReadTQueue closeQ >>= \case Just e -> pure $ Just e Nothing | not stop -> STM.retry | otherwise -> pure Nothing maybe1 what none $ \(fk, fh) -> do debug $ red "CLOSE FILE" <+> pretty fk closeFd fh debug $ yellow "indexing" <+> pretty fk idx <- ncqRunTaskNoMatterWhat ncq (ncqIndexFile ncq (DataFile fk)) ncqRunTaskNoMatterWhat ncq $ ncqStateUpdate ncq [F 0 fk] debug $ "REMOVE ALL SHIT" <+> pretty idx nwayHashMMapReadOnly idx >>= \case Nothing -> err $ "can't open index" <+> pretty idx Just (bs,nway) -> do nwayHashScanAll nway bs $ \_ k _ -> do unless (k == emptyKey) $ atomically $ void $ runMaybeT do NCQEntry _ tfk <- MaybeT $ ncqLookupEntrySTM ncq (coerce k) fk' <- MaybeT $ readTVar tfk guard (fk == fk') -- remove only own stuff lift $ ncqAlterEntrySTM ncq (coerce k) (const Nothing) ncqPreloadIndexes ncq atomically (modifyTVar' ncqCurrentFiles (HS.delete fk)) loop spawnActivity $ forever (liftIO $ join $ atomically (readTQueue ncqJobQ)) let shLast = V.length ncqWriteOps - 1 spawnActivity $ pooledForConcurrentlyN_ (V.length ncqWriteOps) [0..shLast] $ \i -> do let q = ncqWriteOps ! i forever (liftIO $ join $ atomically (readTQueue q)) spawnActivity measureWPS -- FIXME: bigger-period spawnActivity $ postponed ncqPostponeSweep $ forever $ (>> pause @'Seconds 120) $ do ema <- readTVarIO ncqWriteEMA n <- ncqListStateFiles ncq <&> List.length when (ema < ncqIdleThrsh * 1.5 && n > 0) $ withSem ncqMergeSem do debug $ yellow "run sweep routine" ncqSweepStates ncq ncqSweepFossils ncq spawnActivity $ postponed ncqPostponeMerge $ fix \again -> (>> again) do ema <- readTVarIO ncqWriteEMA mergeReq <- atomically $ stateTVar ncqMergeReq (,False) debug $ green "MERGE ATTEMPT" <+> pretty ema <+> "~" <+> pretty ncqIdleThrsh let notPending x = List.length [ k | (k,e,_) <- V.toList x, isNotPending e ] if ema > ncqIdleThrsh && not mergeReq then do pause @'Seconds 10 else do mq <- newEmptyTMVarIO spawnJob $ do merged <- ncqMergeStep ncq atomically $ putTMVar mq merged -- TODO: detect-dead-merge void $ race (pause @'Seconds 300) (atomically $ readTMVar mq) >>= \case Left{} -> warn $ yellow "MERGE FUCKING STALLED" Right True -> none Right False -> do debug "merge: all done, wait..." n0 <- ncqListTrackedFiles ncq <&> notPending -- FIXME: bigger-timeout void $ race (pause @'Seconds 60) do atomically do n <- ncqListTrackedFilesSTM ncq <&> notPending when (n == n0) STM.retry ContT $ bracket none $ const $ liftIO do fhh <- atomically (STM.flushTQueue closeQ) for_ fhh ( closeFd . snd ) flip fix RunNew $ \loop -> \case RunFin -> do debug "wait finalizing" atomically $ pollSTM closer >>= maybe STM.retry (const none) debug "exit storage" RunNew -> do stop <- readTVarIO ncqStorageStopReq mt <- readTVarIO ncqWriteQ <&> Seq.null if stop && mt then do loop RunFin else do (fk,fhx) <- openNewDataFile liftIO (ncqStateUpdate ncq [P fk]) debug $ "openNewDataFile" <+> pretty fk loop $ RunWrite (fk,fhx,0,0) RunSync (fk, fh, w, total, continue) -> do stop <- readTVarIO ncqStorageStopReq sync <- readTVarIO ncqStorageSyncReq let needClose = total >= ncqMinLog || stop rest <- if not (sync || needClose || w > ncqFsync) then pure w else do appendTailSection fh >> liftIO (fileSynchronise fh) -- FIXME: slow! -- to make it appear in state, but to ignore until index is done atomically do writeTVar ncqStorageSyncReq False modifyTVar' ncqSyncNo succ pure 0 if | needClose && continue -> do atomically $ writeTQueue closeQ (fk, fh) loop RunNew | not continue -> loop RunFin | otherwise -> loop $ RunWrite (fk, fh, rest, total) RunWrite (fk, fh, w, total') -> do let timeoutMicro = 10_000_000 chunk <- liftIO $ timeout timeoutMicro $ atomically do stop <- readTVar ncqStorageStopReq sy <- readTVar ncqStorageSyncReq chunk <- stateTVar ncqWriteQ (Seq.splitAt ncqWriteBlock) if | Seq.null chunk && stop -> pure $ Left () | Seq.null chunk && not (stop || sy) -> STM.retry | otherwise -> pure $ Right chunk case chunk of Nothing -> do liftIO $ join $ readTVarIO ncqOnRunWriteIdle if w == 0 then do loop $ RunWrite (fk,fh,w,total') else do atomically $ writeTVar ncqStorageSyncReq True loop $ RunSync (fk, fh, w, total', True) -- exit () Just (Left{}) -> loop $ RunSync (fk, fh, w, total', False) -- exit () Just (Right chu) -> do ws <- for chu $ \h -> do atomically (ncqLookupEntrySTM ncq h) >>= \case Just (NCQEntry bs w) -> do atomically (writeTVar w (Just fk)) lift (appendSection fh bs) _ -> pure 0 let written = sum ws loop $ RunSync (fk, fh, w + written, total' + written, True) where emptyKey = BS.replicate ncqKeyLen 0 openNewDataFile :: forall mx . MonadIO mx => mx (FileKey, Fd) openNewDataFile = do fname <- ncqGetNewFossilName ncq atomically $ modifyTVar' ncqCurrentFiles (HS.insert (fromString fname)) touch fname let flags = defaultFileFlags { exclusive = False, creat = Just 0o666 } (fromString fname,) <$> liftIO (PosixBase.openFd fname Posix.ReadWrite flags) spawnJob = ncqSpawnJob ncq postponed n m = liftIO (pause @'Seconds n) >> m spawnActivity m = do a <- ContT $ withAsync m link a pure a measureWPS = void $ flip fix Nothing \loop -> \case Nothing -> do w <- readTVarIO ncqWrites t <- getTimeCoarse pause @'Seconds step >> loop (Just (w,t)) Just (w0,t0) -> do w1 <- readTVarIO ncqWrites t1 <- getTimeCoarse let dt = max 1e-9 (realToFrac @_ @Double (t1 - t0)) / 1e9 dw = fromIntegral (w1 - w0) atomically $ modifyTVar' ncqWriteEMA \ema -> alpha * (dw/dt) + 0.9 * ema pause @'Seconds step >> loop (Just (w1,t1)) where alpha = 0.1 step = 1.00 ncqSpawnJob :: forall m . MonadIO m => NCQStorage2 -> IO () -> m () ncqSpawnJob NCQStorage2{..} m = atomically $ writeTQueue ncqJobQ m ncqFileFastCheck :: MonadUnliftIO m => FilePath -> m () ncqFileFastCheck fp = do -- debug $ "ncqFileFastCheck" <+> pretty fp mmaped <- liftIO $ mmapFileByteString fp Nothing let size = BS.length mmaped let s = BS.drop (size - 8) mmaped & N.word64 unless ( BS.length mmaped == fromIntegral s ) do throwIO $ NCQFsckIssueExt (FsckInvalidFileSize (fromIntegral s)) ncqStorageScanDataFile :: MonadIO m => NCQStorage2 -> FilePath -> ( Integer -> Integer -> HashRef -> ByteString -> m () ) -> m () ncqStorageScanDataFile ncq fp' action = do let fp = ncqGetFileName ncq fp' mmaped <- liftIO (mmapFileByteString fp Nothing) flip runContT pure $ callCC \exit -> do flip fix (0,mmaped) $ \next (o,bs) -> do when (BS.length bs < ncqSLen) $ exit () let w = BS.take ncqSLen bs & N.word32 & fromIntegral when (BS.length bs < ncqSLen + w) $ exit () let kv = BS.drop ncqSLen bs let k = BS.take ncqKeyLen kv & coerce @_ @HashRef let v = BS.take (ncqFullDataLen (NCQFullRecordLen w)) $ BS.drop ncqKeyLen kv lift (action o (fromIntegral w) k v) next (ncqSLen + o + fromIntegral w, BS.drop (w+ncqSLen) bs) ncqIndexFile :: MonadUnliftIO m => NCQStorage2 -> DataFile FileKey -> m FilePath ncqIndexFile n@NCQStorage2{} fk = do let fp = toFileName fk & ncqGetFileName n let dest = toFileName (IndexFile (coerce @_ @FileKey fk)) & ncqGetFileName n debug $ "INDEX" <+> pretty fp <+> pretty dest items <- S.toList_ do ncqStorageScanDataFile n fp $ \o w k s -> case ncqIsMeta s of Just M -> none _ -> do -- we need size in order to return block size faster -- w/o search in fossil let rs = (w + ncqSLen) & fromIntegral @_ @Word32 & N.bytestring32 let os = fromIntegral @_ @Word64 o & N.bytestring64 let record = os <> rs S.yield (coerce k, record) let (dir,name) = splitFileName fp let idxTemp = (dropExtension name <> "-") `addExtension` ".cq$" result <- nwayWriteBatch (nwayAllocDef 1.10 32 8 12) dir idxTemp items mv result dest ncqStateUpdate n [F 0 (coerce fk)] pure dest ncqAddTrackedFiles :: MonadIO m => NCQStorage2 -> [DataFile FileKey] -> m () ncqAddTrackedFiles ncq@NCQStorage2{..} files = flip runContT pure do valid <- for files \fkey -> callCC \skip -> do let fname = ncqGetFileName ncq (toFileName fkey) let idxName = ncqGetFileName ncq (toFileName (IndexFile (coerce @_ @FileKey fkey))) idxHere <- doesFileExist idxName unless idxHere do err $ "Index does not exist" <+> pretty (takeFileName idxName) skip Nothing stat <- liftIO $ PFS.getFileStatus fname let ts = posixToTimeSpec $ PFS.modificationTimeHiRes stat let fk = fromString (takeFileName fname) pure $ Just (ts, fk) atomically $ ncqAddTrackedFilesSTM ncq (catMaybes valid) ncqAddTrackedFilesSTM :: NCQStorage2 -> [(TimeSpec, FileKey)] -> STM () ncqAddTrackedFilesSTM NCQStorage2{..} newFiles = do a <- readTVar ncqTrackedFiles <&> V.toList let already = HS.fromList (map tfKey a) b <- for newFiles \(t, f) -> if f `HS.member` already then pure Nothing else do tv <- newTVar Nothing pure . Just $ TrackedFile (FilePrio (Down t)) f tv let new = V.fromList $ List.sortOn tfTime (a <> catMaybes b) writeTVar ncqTrackedFiles new {-# INLINE ncqAddTrackedFilesSTM #-} evictIfNeededSTM :: NCQStorage2 -> Maybe Int -> STM () evictIfNeededSTM me@NCQStorage2{..} howMany = do cur <- readTVar ncqCachedEntries let need = fromMaybe cur howMany excess = max 0 (cur + need - ncqMaxCached) when (excess > 0) do files <- ncqListTrackedFilesSTM me oldest <- forM (V.toList files) \case (k, Just (CachedEntry{..}) , t) -> do ts <- readTVar cachedTs pure (Just (ts, k, t)) _ -> pure Nothing let victims = oldest & catMaybes & List.sortOn (view _1) & List.take excess for_ victims $ \(_,_,t) -> do writeTVar t Nothing modifyTVar ncqCachedEntries (subtract 1) {- HLINT ignore "Functor law" -} ncqListDirFossils :: MonadIO m => NCQStorage2 -> m [FilePath] ncqListDirFossils ncq = do let wd = ncqGetWorkDir ncq dirFiles wd >>= mapM (pure . takeFileName) <&> List.filter (\f -> List.isPrefixOf "fossil-" f && List.isSuffixOf ".data" f) <&> HS.toList . HS.fromList ncqListStateFiles :: forall m . MonadIO m => NCQStorage2 -> m [(TimeSpec, StateFile)] ncqListStateFiles ncq = do let wd = ncqGetWorkDir ncq dirFiles wd >>= mapM (pure . takeBaseName) <&> List.filter (List.isPrefixOf "state-") >>= mapM timespecOf <&> fmap (over _2 fromString) . rights <&> List.sortOn Down where timespecOf x = liftIO @m $ try @_ @IOException do (,x) . posixToTimeSpec . modificationTimeHiRes <$> getFileStatus (ncqGetFileName ncq x) ncqRepair :: MonadIO m => NCQStorage2 -> m () ncqRepair me@NCQStorage2{..} = do states <- ncqListStateFiles me <&> fmap snd fossils <- flip fix states $ \next -> \case [] -> do debug $ yellow "no valid state found; start from scratch" ncqListDirFossils me <&> fmap (DataFile . fromString) (s:ss) -> tryLoadState s >>= \case Just files -> do debug $ yellow "used state" <+> pretty s atomically $ writeTVar ncqStateName (Just s) pure files Nothing -> do warn $ red "inconsistent state" <+> pretty s rm (ncqGetFileName me $ toFileName s) next ss ncqAddTrackedFiles me fossils void $ liftIO (ncqStateUpdate me mempty) where readState path = ncqReadStateKeys me path <&> fmap DataFile tryLoadState (fk :: StateFile) = liftIO do debug $ "tryLoadState" <+> pretty fk state <- readState fk let checkFile fo = flip fix 0 $ \next i -> do let dataFile = ncqGetFileName me (toFileName fo) let indexFile = ncqGetFileName me (toFileName (IndexFile (coerce @_ @FileKey fo))) here <- doesFileExist dataFile if not here then do rm indexFile pure False else do try @_ @SomeException (ncqFileFastCheck dataFile) >>= \case Left e -> do err (viaShow e) here <- doesFileExist dataFile when here do let broken = dropExtension dataFile `addExtension` ".broken" mv dataFile broken rm indexFile warn $ red "renamed" <+> pretty dataFile <+> pretty broken pure False Right{} | i > 1 -> pure False Right{} -> do exists <- doesFileExist indexFile if exists then do pure True else do debug $ "indexing" <+> pretty (toFileName fo) r <- ncqIndexFile me fo debug $ "indexed" <+> pretty r next (succ i) results <- forM state checkFile pure $ if and results then Just state else Nothing ncqRefHash :: NCQStorage2 -> HashRef -> HashRef ncqRefHash NCQStorage2 {..} h = HashRef (hashObject (coerce @_ @ByteString h <> coerce ncqSalt)) ncqRunTaskNoMatterWhat :: MonadUnliftIO m => NCQStorage2 -> m a -> m a ncqRunTaskNoMatterWhat NCQStorage2{..} task = do atomically (modifyTVar ncqStorageTasks succ) task `finally` atomically (modifyTVar ncqStorageTasks pred) ncqRunTask :: MonadUnliftIO m => NCQStorage2 -> a -> m a -> m a ncqRunTask ncq@NCQStorage2{..} def task = readTVarIO ncqStorageStopReq >>= \case True -> pure def False -> ncqRunTaskNoMatterWhat ncq task ncqWaitTasks :: MonadUnliftIO m => NCQStorage2 -> m () ncqWaitTasks NCQStorage2{..} = atomically do tno <- readTVar ncqStorageTasks when (tno > 0) STM.retry ncqStateUseSTM :: NCQStorage2 -> STM () ncqStateUseSTM NCQStorage2{..} = do k <- readTVar ncqStateVersion <&> fromIntegral modifyTVar' ncqStateUsage (IntMap.update (Just . over _1 succ) k) ncqStateUnuseSTM :: NCQStorage2 -> STM () ncqStateUnuseSTM NCQStorage2{..} = do k <- readTVar ncqStateVersion <&> fromIntegral -- TODO: remove when n <= 0 modifyTVar' ncqStateUsage (IntMap.update (Just . over _1 pred) k) ncqStateUpdate :: MonadUnliftIO m => NCQStorage2 -> [StateOP] -> m Bool ncqStateUpdate me@NCQStorage2{..} ops' = withSem ncqStateSem $ flip runContT pure $ callCC \exit -> do debug $ "ncqStateUpdate" <+> viaShow ops' t1 <- FilePrio . Down <$> liftIO getTimeCoarse ops <- checkWithDisk $ \name -> do err $ "ncqStateUpdate invariant fail" <+> pretty name exit False changed <- atomically do current' <- readTVar ncqTrackedFiles <&> V.toList memStateVersionSTM (HS.fromList (fmap tfKey current')) let current = HM.fromList [ (tfKey, e) | e@TrackedFile{..} <- current' ] wtf <- flip fix (current, ops) $ \next (s, o) -> case o of [] -> pure s (D fk : rest) -> next (HM.delete fk s, rest) (P fk : rest) | HM.member fk current -> next (s, rest) | otherwise -> do e <- TrackedFile t1 fk <$> newTVar (Just PendingEntry) next (HM.insert fk e s, rest) (F t fk : rest) -> do case HM.lookup fk s of Nothing -> do new <- TrackedFile (FilePrio (Down t)) fk <$> newTVar Nothing next (HM.insert fk new s, rest) Just TrackedFile{..} -> do pe <- readTVar tfCached if isNotPending pe then next (s, rest) else do writeTVar tfCached Nothing next (s, rest) writeTVar ncqTrackedFiles (V.fromList $ List.sortOn tfTime (HM.elems wtf)) pure (HM.keysSet current /= HM.keysSet wtf) -- let fks = HS.fromList [ fk | F _ fk <- ops ] -- tra <- lift $ ncqListTrackedFiles me <&> filter (not . isNotPending . view _2) . V.toList -- let tra2 = [ k | (k,_,_) <- tra, HS.member k fks ] -- unless (List.null tra2) do -- err $ red "FUCKED" <+> pretty tra2 when changed $ liftIO do name <- ncqDumpCurrentState me atomically $ writeTVar ncqStateName (Just name) debug $ green "switched state" <+> pretty name -- a1 <- V.toList <$> lift (ncqListTrackedFiles me) -- let fsz = HS.fromList [ fk | F _ fk <- ops ] -- let p1 = [ fk | (fk, Just PendingEntry{}, _) <- a1, HS.member fk fsz ] -- unless (List.null p1) do -- error $ show $ "PIZDA!" <+> pretty p1 <> line <> viaShow ops' pure changed where memStateVersionSTM currentKeys = do k0 <- readTVar ncqStateVersion <&> fromIntegral let doAlter = \case Nothing -> Just (0, currentKeys) Just (u,f) -> Just (u,f) modifyTVar' ncqStateUsage (IntMap.alter doAlter k0) checkWithDisk onFail = for ops' $ \case -- f@(F _ fk) -> do let datFile = ncqGetFileName me (toFileName $ DataFile fk) e2 <- doesFileExist datFile unless e2 (onFail datFile) ts <- liftIO (getFileStatus datFile) <&> posixToTimeSpec . PFS.modificationTimeHiRes pure (F ts fk) d -> pure d ncqDumpCurrentState :: MonadUnliftIO m => NCQStorage2 -> m StateFile ncqDumpCurrentState me@NCQStorage2{..} = do files <- ncqListTrackedFiles me name <- ncqGetNewStateName me writeBinaryFileDurableAtomic name (BS8.unlines [coerce k | (k,_,_) <- V.toList files]) pure $ fromString name ncqMergeFull :: forall m . MonadUnliftIO m => NCQStorage2 -> m () ncqMergeFull me = fix \next -> ncqMergeStep me >>= \case False -> none True -> next -- FIXME: sometime-causes-no-such-file-or-directory ncqMergeStep :: MonadUnliftIO m => NCQStorage2 -> m Bool ncqMergeStep ncq@NCQStorage2{..} = do withSem ncqMergeSem $ ncqRunTask ncq False do debug "ncqMergeStep" tracked <- ncqListTrackedFiles ncq files <- for tracked $ \(f,e,_) -> do let fn = ncqGetFileName ncq (toFileName $ DataFile f) let idx = ncqGetFileName ncq (toFileName $ IndexFile f) dataHere <- doesFileExist fn sz <- case e of Just PendingEntry -> pure (-100) _ | dataHere -> liftIO (fileSize fn) | otherwise -> pure (-3) idxHere <- doesFileExist idx pure (f, sz, idxHere) -- debug $ red "MERGE FILES" <+> viaShow files let bothOk (_, sz1, here1) (_, sz2, here2) = here1 && here2 && sz1 > 0 && sz2 > 0 && (sz1 + sz2) < fromIntegral ncqMaxLog found <- flip fix (V.toList files, Nothing, Nothing) $ \next -> \case ([], _, r) -> pure r (a:b:rest, Nothing, _) | bothOk a b -> do next (b:rest, Just (view _2 a + view _2 b), Just (a,b)) (a:b:rest, Just s, _ ) | bothOk a b && (view _2 a + view _2 b) < s -> do next (b:rest, Just (view _2 a + view _2 b), Just (a,b)) (_:rest, s, r) -> do next (rest, s, r) case found of Just (a,b) -> mergeStep a b >> pure True _ -> do debug "merge: not found shit" pure False where ncqGetNewMergeName :: MonadIO m => NCQStorage2 -> m FilePath ncqGetNewMergeName n@NCQStorage2{} = do let (p,tpl) = splitFileName (ncqGetFileName n "merge-.data") liftIO $ emptyTempFile p tpl mergeStep (a,_,_) (b,_,_) = do debug $ "merge" <+> pretty a <+> pretty b let fDataNameA = ncqGetFileName ncq $ toFileName (DataFile a) let fIndexNameA = ncqGetFileName ncq $ toFileName (IndexFile a) let fDataNameB = ncqGetFileName ncq $ toFileName (DataFile b) doesFileExist fDataNameA `orFail` ("not exist" <+> pretty fDataNameA) doesFileExist fDataNameB `orFail` ("not exist" <+> pretty fDataNameB) doesFileExist fIndexNameA `orFail` ("not exist" <+> pretty fIndexNameA) flip runContT pure $ callCC \exit -> do mfile <- ncqGetNewMergeName ncq ContT $ bracket none $ const do rm mfile liftIO $ withBinaryFileAtomic mfile WriteMode $ \fwh -> do debug $ "merge: okay, good to go" <+> pretty (takeFileName mfile) (mmIdx, nway) <- nwayHashMMapReadOnly fIndexNameA >>= orThrow (NCQMergeInvariantFailed (show $ "can't mmap" <+> pretty fIndexNameA)) debug $ "SCAN FILE A" <+> pretty fDataNameA writeFiltered ncq fDataNameA fwh $ \_ _ _ v -> do let meta = Just M == ncqIsMeta v pure $ not meta debug $ "SCAN FILE B" <+> pretty fDataNameA writeFiltered ncq fDataNameB fwh $ \_ _ k v -> do let meta = Just M == ncqIsMeta v foundInA <- liftIO (nwayHashLookup nway mmIdx (coerce k)) <&> isJust let skip = foundInA || meta pure $ not skip appendTailSection =<< handleToFd fwh liftIO do result <- fileSize mfile idx <- if result == 0 then pure Nothing else do fossil <- ncqGetNewFossilName ncq mv mfile fossil statA <- getFileStatus fDataNameA let ts = modificationTimeHiRes statA setFileTimesHiRes fossil ts ts let fk = DataFile (fromString fossil) void $ ncqIndexFile ncq fk pure $ Just (ts,fk) for_ idx $ \(ts,DataFile fk) -> do void $ ncqStateUpdate ncq [D a, D b, F (posixToTimeSpec ts) fk] orFail what e = do r <- what unless r (throwIO (NCQMergeInvariantFailed (show e))) -- ncqCompact :: MonadUnliftIO m => NCQStorage2 -> m () -- ncqCompact ncq@NCQStorage2{..} = withSem ncqMergeSem do -- q <- newTVarIO ( mempty :: HashMap FileKey (HashSet HashRef) ) -- ncqLinearScanForCompact ncq $ \fk h -> atomically do -- modifyTVar q (HM.insertWith (<>) fk (HS.singleton h)) -- state0 <- readTVarIO q -- for_ (HM.toList state0) $ \(fk, es) -> do -- trace $ "TO DELETE" <+> pretty fk <+> pretty (HS.size es) -- let fDataNameA = ncqGetFileName ncq (toFileName $ DataFile fk) -- flip runContT pure do -- mfile <- ncqGetNewCompactName ncq -- ContT $ bracket none $ const $ rm mfile -- liftIO do -- withBinaryFileAtomic mfile WriteMode $ \fwh -> do -- writeFiltered ncq fDataNameA fwh $ \_ _ k v -> do -- pure $ not $ HS.member k es -- appendTailSection =<< handleToFd fwh -- result <- fileSize mfile -- if result == 0 then do -- atomically $ modifyTVar ncqTrackedFiles (HPSQ.delete fk) -- else do -- fossil <- ncqGetNewFossilName ncq -- mv mfile fossil -- statA <- getFileStatus fDataNameA -- let ts = modificationTimeHiRes statA -- setFileTimesHiRes fossil ts ts -- void $ ncqIndexFile ncq (DataFile (fromString fossil)) -- void $ ncqStateUpdate ncq [F (posixToTimeSpec ts) (fromString fossil)] -- debug $ "compact done" <+> pretty (HM.size state0) -- NOTE: incremental -- now it may became incremental if we'll -- limit amount of tombs per one pass -- then remove all dead entries, -- then call again to remove tombs. etc -- as for now, seems it should work up to 10TB -- of storage ncqLinearScanForCompact :: MonadUnliftIO m => NCQStorage2 -> ( FileKey -> HashRef -> m () ) -> m Int ncqLinearScanForCompact ncq@NCQStorage2{..} action = flip runContT pure do ContT $ useVersion ncq tracked <- readTVarIO ncqTrackedFiles <&> V.toList let state0 = mempty :: HashMap HashRef TimeSpec profit <- newTVarIO 0 tombUse <- newTVarIO (mempty :: HashMap HashRef (FileKey, Int)) -- TODO: explicit-unmap-files flip fix (tracked, state0) $ \next -> \case ([], s) -> none ((TrackedFile{..}):rest, state) -> do e <- readTVarIO tfCached let cqFile = ncqGetFileName ncq (toFileName (IndexFile tfKey)) let dataFile = ncqGetFileName ncq (toFileName (DataFile tfKey)) c <- doesFileExist cqFile d <- doesFileExist dataFile let pending = not (isNotPending e) if not c || not d || pending then next (rest, state) else do (mmaped,meta@NWayHash{..}) <- liftIO $ nwayHashMMapReadOnly cqFile >>= orThrow (NWayHashInvalidMetaData cqFile) notice $ "SCAN" <+> pretty cqFile let emptyKey = BS.replicate nwayKeySize 0 found <- S.toList_ do nwayHashScanAll meta mmaped $ \o k entryBs -> do unless (k == emptyKey) do let off = N.word64 (BS.take 8 entryBs) let sz = N.word32 (BS.take 4 (BS.drop 8 entryBs)) -- debug $ "SCAN SHIT" <+> pretty tfKey <+> pretty off <+> pretty sz -- fast-n-dirty-check-for-deleted when (sz <= ncqSLen + ncqKeyLen + ncqPrefixLen ) do debug $ red "FOUND EMPTY RECORD" <+> pretty sz S.yield off let kk = coerce k case HM.lookup kk state of Just ts | ts > timeSpecFromFilePrio tfTime -> do notice $ pretty kk <+> pretty (sz + ncqSLen) atomically do modifyTVar profit ( + (sz + ncqSLen) ) modifyTVar tombUse (HM.adjust (over _2 succ) kk) lift $ lift $ action (fromString dataFile) kk _ -> none notice "SURVIVED 2" newEntries <- S.toList_ do unless (List.null found) do notice $ red "TRY" <+> pretty dataFile dataBs <- liftIO $ mmapFileByteString dataFile Nothing notice "SURVIVED 3" for_ found $ \o -> do let pre = BS.take (fromIntegral ncqPrefixLen) (BS.drop (ncqDataOffset o) dataBs) when (pre == ncqRefPrefix || pre == ncqTombPrefix) do let keyBs = BS.take ncqKeyLen (BS.drop (fromIntegral o + ncqSLen) dataBs) let key = coerce (BS.copy keyBs) unless (HM.member key state) do S.yield (key, timeSpecFromFilePrio tfTime) when ( pre == ncqTombPrefix ) do atomically $ modifyTVar tombUse (HM.insert key (tfKey,0)) next (rest, state <> HM.fromList newEntries) use <- readTVarIO tombUse let useless = [ (f,h) | (h, (f,n)) <- HM.toList use, n == 0 ] for_ useless $ \(f,h) -> do atomically $ modifyTVar profit (+ncqFullTombLen) lift $ action f h notice "SURVIVED 3" readTVarIO profit <&> fromIntegral ncqReadStateKeys :: forall m . MonadUnliftIO m => NCQStorage2 -> StateFile -> m [FileKey] ncqReadStateKeys me path = liftIO do keys <- BS8.readFile (ncqGetFileName me (toFileName path)) <&> filter (not . BS8.null) . BS8.lines pure $ fmap (coerce @_ @FileKey) keys ncqSweepFossils :: forall m . MonadUnliftIO m => NCQStorage2 -> m () ncqSweepFossils me@NCQStorage2{..} = withSem ncqSweepSem do debug $ yellow "sweep fossils" -- better be safe than sorry current <- readTVarIO ncqCurrentFiles sfs <- ncqListStateFiles me debug $ "STATE FILES" <+> vcat (fmap pretty sfs) mentioned <- mapM (safeRead . ncqReadStateKeys @m me) (fmap snd sfs) <&> HS.fromList . mconcat kicked' <- ncqListDirFossils me <&> fmap fromString (kicked, used) <- atomically do active <- ncqListTrackedFilesSTM me <&> HS.fromList . fmap (view _1) . V.toList used' <- readTVar ncqStateUsage <&> IntMap.elems let used = current <> active <> mentioned <> HS.unions [ keys | (n, keys) <- used', n > 0 ] let k = filter (\x -> not (HS.member x used)) kicked' pure (k,HS.fromList $ HS.toList used) debug $ "KICK" <+> vcat (fmap pretty kicked) debug $ "LIVE SET" <+> vcat (fmap pretty (HS.toList used)) for_ kicked $ \fo -> do debug $ "sweep fossil file" <+> pretty fo rm (ncqGetFileName me (toFileName (IndexFile fo))) rm (ncqGetFileName me (toFileName (DataFile fo))) where safeRead m = try @_ @IOException m >>= \case Right x -> pure x Left e -> err ("ncqSweepFossils" <+> viaShow e) >> pure mempty ncqSweepStates :: MonadUnliftIO m => NCQStorage2 -> m () ncqSweepStates me@NCQStorage2{..} = withSem ncqSweepSem $ flip runContT pure do debug $ yellow "remove unused states" current' <- readTVarIO ncqStateName current <- ContT $ for_ current' debug $ yellow "CURRENT STATE NAME" <+> pretty current states <- ncqListStateFiles me <&> fmap snd flip fix (Left states) $ \next -> \case Left [] -> none Right [] -> none Left (x:xs) | x == current -> next (Right xs) | otherwise -> next (Left xs) Right (x:xs) -> do debug $ "Remove obsolete state" <+> pretty x rm (ncqGetFileName me (toFileName x)) next (Right xs) ncqSetOnRunWriteIdle :: MonadUnliftIO m => NCQStorage2 -> IO () -> m () ncqSetOnRunWriteIdle NCQStorage2{..} io = atomically (writeTVar ncqOnRunWriteIdle io) writeFiltered :: forall m . MonadIO m => NCQStorage2 -> FilePath -> Handle -> ( Integer -> Integer -> HashRef -> ByteString -> m Bool) -> m () writeFiltered ncq fn out filt = do ncqStorageScanDataFile ncq fn $ \o s k v -> do skip <- filt o s k v <&> not -- when skip do -- debug $ pretty k <+> pretty "skipped" unless skip $ liftIO do BS.hPut out (LBS.toStrict (makeEntryLBS k v)) where makeEntryLBS h bs = do let b = byteString (coerce @_ @ByteString h) <> byteString bs let wbs = toLazyByteString b let len = LBS.length wbs let ws = byteString (N.bytestring32 (fromIntegral len)) toLazyByteString (ws <> b) zeroSyncEntry :: ByteString zeroSyncEntry = ncqMakeSectionBS (Just B) zeroHash zeroPayload where zeroPayload = N.bytestring64 0 zeroHash = HashRef (hashObject zeroPayload) {-# INLINE zeroSyncEntry #-} zeroSyncEntrySize :: Word64 zeroSyncEntrySize = fromIntegral (BS.length zeroSyncEntry) {-# INLINE zeroSyncEntrySize #-} -- 1. It's M-record -- 2. It's last w64be == fileSize -- 3. It's hash == hash (bytestring64be fileSize) -- 4. recovery-strategy: start-to-end, end-to-start fileTailRecord :: Integral a => a -> ByteString fileTailRecord w = do -- on open: last w64be == fileSize let paylo = N.bytestring64 (fromIntegral w + zeroSyncEntrySize) let h = hashObject @HbSync paylo & coerce ncqMakeSectionBS (Just M) h paylo {-# INLINE fileTailRecord #-} appendSection :: forall m . MonadUnliftIO m => Fd -> ByteString -> m Int -- (FOff, Int) appendSection fh sect = do -- off <- liftIO $ fdSeek fh SeekFromEnd 0 -- pure (fromIntegral off, fromIntegral len) liftIO (Posix.fdWrite fh sect) <&> fromIntegral {-# INLINE appendSection #-} appendTailSection :: MonadIO m => Fd -> m () appendTailSection fh = liftIO do s <- Posix.fileSize <$> Posix.getFdStatus fh void (appendSection fh (fileTailRecord s)) {-# INLINE appendTailSection #-} withSem :: forall a m . MonadUnliftIO m => TSem -> m a -> m a withSem sem m = bracket enter leave (const m) where enter = atomically (waitTSem sem) leave = const $ atomically (signalTSem sem) isNotPending :: Maybe CachedEntry -> Bool isNotPending = \case Just (PendingEntry {}) -> False _ -> True