{-# Language MultiWayIf #-} {-# Language RecordWildCards #-} module HBS2.Storage.NCQ where import HBS2.Prelude.Plated import HBS2.Hash import HBS2.OrDie import HBS2.Data.Types.Refs import HBS2.Base58 import HBS2.Net.Auth.Credentials import HBS2.Storage import HBS2.Misc.PrettyStuff import HBS2.System.Logger.Simple.ANSI import HBS2.Data.Log.Structured.NCQ import HBS2.Data.Log.Structured.SD import Data.Config.Suckless.System import Data.Config.Suckless.Script hiding (void) import Codec.Compression.Zstd qualified as Zstd import Codec.Compression.Zstd.Lazy as ZstdL import Codec.Compression.Zstd.Streaming qualified as ZstdS import Codec.Compression.Zstd.Streaming (Result(..)) import Control.Applicative import Data.ByteString.Builder import Network.ByteOrder qualified as N import Data.HashMap.Strict (HashMap) import Control.Monad.Except import Control.Monad.Trans.Cont import Control.Monad.Trans.Maybe import Data.Ord (Down(..),comparing) import Control.Concurrent.STM qualified as STM import Data.HashPSQ qualified as HPSQ import Data.HashPSQ (HashPSQ) import Data.IntMap qualified as IntMap import Data.IntMap (IntMap) import Data.IntSet qualified as IntSet import Data.IntSet (IntSet) import Data.Sequence as Seq import Data.List qualified as List import Data.ByteString.Lazy qualified as LBS import Data.ByteString.Lazy.Char8 qualified as LBS8 import Data.ByteString (ByteString) import Data.ByteString qualified as BS import Data.ByteString.Char8 qualified as BS8 import Data.Char (isDigit) import Data.Fixed import Data.Coerce import Data.Word import Data.Either import Data.Maybe import Data.Text qualified as Text import Data.Text.IO qualified as Text import Data.Int import Lens.Micro.Platform import Data.HashSet (HashSet) import Data.HashSet qualified as HS import Data.HashMap.Strict qualified as HM import System.Directory (makeAbsolute) import System.FilePath.Posix import System.Posix.Fcntl import System.Posix.Files qualified as Posix import System.Posix.IO as PosixBase import System.Posix.Types as Posix import System.Posix.IO.ByteString as Posix import System.Posix.Unistd import System.Posix.Files ( getFileStatus , modificationTimeHiRes , setFileTimesHiRes , getFdStatus , FileStatus(..) , setFileMode ) import System.Posix.Files qualified as PFS import System.IO.Error (catchIOError) import System.IO.MMap as MMap import System.IO.Temp (emptyTempFile) -- import Foreign.Ptr -- import Foreign di import qualified Data.ByteString.Internal as BSI import Streaming.Prelude qualified as S import UnliftIO import UnliftIO.Concurrent(getNumCapabilities) import UnliftIO.IO.File import System.FileLock as FL {- HLINT ignore "Functor law" -} type NCQPerks m = MonadIO m data NCQStorageException = NCQStorageAlreadyExist String | NCQStorageSeedMissed | NCQStorageTimeout | NCQStorageCurrentAlreadyOpen | NCQStorageCantOpenCurrent | NCQStorageBrokenCurrent | NCQMergeInvariantFailed String | NCQStorageCantLock FilePath deriving stock (Show,Typeable) instance Exception NCQStorageException newtype FileKey = FileKey ByteString deriving newtype (Eq,Ord,Hashable,Show) instance IsString FileKey where fromString = FileKey . BS8.pack . dropExtension . takeFileName instance Pretty FileKey where pretty (FileKey s) = parens ("file-key" <+> pretty (BS8.unpack s)) newtype FilePrio = FilePrio (Down TimeSpec) deriving newtype (Eq,Ord) deriving stock (Generic,Show) mkFilePrio :: TimeSpec -> FilePrio mkFilePrio = FilePrio . Down data CachedEntry = CachedEntry { cachedMmapedIdx :: ByteString , cachedMmapedData :: ByteString , cachedNway :: NWayHash , cachedTs :: TVar TimeSpec } instance Show CachedEntry where show _ = "CachedEntry{...}" data WQItem = WQItem { wqNew :: Bool , wqData :: Maybe LBS.ByteString } newtype RFd = RFd { unRfd :: Fd } newtype WFd = WFd { unWfd :: Fd } data NCQStorage = NCQStorage { ncqRoot :: FilePath , ncqGen :: Int , ncqSyncSize :: Int , ncqMinLog :: Int , ncqMaxLog :: Int , ncqMaxCached :: Int , ncqSalt :: HashRef , ncqWriteQueue :: TVar (HashPSQ HashRef TimeSpec WQItem) , ncqStaged :: TVar (IntMap (HashPSQ HashRef TimeSpec (Word64,Word64))) , ncqIndexed :: TVar IntSet , ncqIndexNow :: TVar Int , ncqTrackedFiles :: TVar (HashPSQ FileKey FilePrio (Maybe CachedEntry)) , ncqCachedEntries :: TVar Int , ncqNotWritten :: TVar Word64 , ncqLastWritten :: TVar TimeSpec , ncqCurrentFd :: TVar (Maybe (RFd,WFd)) , ncqCurrentUsage :: TVar (IntMap Int) , ncqCurrentReadReq :: TVar (Seq (Fd, Word64, Word64, TMVar ByteString)) , ncqLock :: TVar FL.FileLock , ncqFsyncNum :: TVar Int , ncqFlushNow :: TVar [TQueue ()] , ncqMergeReq :: TVar Int , ncqOpenDone :: TMVar Bool , ncqStopped :: TVar Bool } -- Log structure: -- (SD)* -- S ::= word32be, section prefix -- D ::= HASH PREFIX DATA -- HASH ::= BYTESTRING(32) -- PREFIX ::= BYTESTRING(4) -- DATA ::= BYTESTRING(n) | n == S - LEN(WORD32) - LEN(HASH) - LEN(PREFIX) newtype NCQFullRecordLen a = NCQFullRecordLen a deriving newtype (Num,Enum,Integral,Real,Ord,Eq) -- including prefix ncqFullDataLen :: forall a . Integral a => NCQFullRecordLen a -> a ncqFullDataLen full = fromIntegral full - ncqKeyLen {-# INLINE ncqFullDataLen #-} ncqKeyLen :: forall a . Integral a => a ncqKeyLen = 32 {-# INLINE ncqKeyLen #-} -- 'S' in SD, i.e size, i.e section header ncqSLen:: forall a . Integral a => a ncqSLen = 4 {-# INLINE ncqSLen #-} ncqDataOffset :: forall a b . (Integral a, Integral b) => a -> b ncqDataOffset base = fromIntegral base + ncqSLen + ncqKeyLen {-# INLINE ncqDataOffset #-} instance MonadUnliftIO m => Storage NCQStorage HbSync LBS.ByteString m where putBlock ncq lbs = fmap coerce <$> ncqStoragePutBlock ncq lbs enqueueBlock ncq lbs = fmap coerce <$> ncqStoragePutBlock ncq lbs getBlock ncq h = ncqStorageGetBlock ncq (coerce h) hasBlock ncq = ncqStorageHasBlock ncq . coerce delBlock ncq = ncqStorageDel ncq . coerce updateRef ncq k v = do ncqStorageSetRef ncq (HashRef $ hashObject k) (HashRef v) getRef ncq k = ncqStorageGetRef ncq (HashRef $ hashObject k) <&> fmap coerce delRef ncq k = ncqStorageDelRef ncq (HashRef $ hashObject k) getChunk ncq h off size = runMaybeT do block <- lift (ncqStorageGetBlock ncq (coerce h)) >>= toMPlus let chunk = LBS.take (fromIntegral size) $ LBS.drop (fromIntegral off) block pure chunk data Location = InWriteQueue WQItem | InCurrent (Fd,Word64, Word64) | InFossil CachedEntry (Word64, Word64) instance Pretty Location where pretty = \case InWriteQueue{} -> "write-queue" InCurrent (fd,o,l) -> pretty $ mkForm @C "current" [mkInt fd, mkInt o, mkInt l] InFossil _ (o,l) -> pretty $ mkForm @C "fossil " [mkInt o, mkInt l] type IsHCQKey h = ( Eq (Key h) , Hashable (Key h) , IsKey h , Key h ~ Hash h , ToByteString (AsBase58 (Hash h)) , FromByteString (AsBase58 (Hash h)) ) ncqGetCurrentName_ :: FilePath -> Int -> FilePath ncqGetCurrentName_ root gen = root show (pretty gen) "current.data" ncqGetFileName :: NCQStorage -> FilePath -> FilePath ncqGetFileName NCQStorage{..} f = ncqRoot show (pretty ncqGen) takeFileName f ncqGetCurrentName :: NCQStorage -> FilePath ncqGetCurrentName NCQStorage{..} = ncqGetCurrentName_ ncqRoot ncqGen ncqGetCurrentDir :: NCQStorage -> FilePath ncqGetCurrentDir ncq = takeDirectory (ncqGetCurrentName ncq) ncqGetCurrentSizeName_ :: FilePath -> Int -> FilePath ncqGetCurrentSizeName_ root gen = dropExtension (ncqGetCurrentName_ root gen) <> ".size" ncqGetCurrentSizeName :: NCQStorage -> FilePath ncqGetCurrentSizeName NCQStorage{..} = dropExtension (ncqGetCurrentName_ ncqRoot ncqGen) <> ".size" ncqGetNewFossilName :: MonadIO m => NCQStorage -> m FilePath ncqGetNewFossilName n@NCQStorage{} = do let fn = ncqGetFileName n "fossil-.data" let (p,tpl) = splitFileName fn liftIO $ emptyTempFile p tpl ncqGetNewMergeName :: MonadIO m => NCQStorage -> m FilePath ncqGetNewMergeName n@NCQStorage{} = do let fn = ncqGetFileName n "merge-.data" let (p,tpl) = splitFileName fn liftIO $ emptyTempFile p tpl ncqGetIndexFileName :: NCQStorage -> FileKey -> FilePath ncqGetIndexFileName ncq fk = do ncqGetFileName ncq (addExtension (dropExtension (BS8.unpack (coerce fk))) ".cq") ncqGetDataFileName :: NCQStorage -> FileKey -> FilePath ncqGetDataFileName ncq fk = do ncqGetFileName ncq (addExtension (dropExtension (BS8.unpack (coerce fk))) ".data") ncqGetErrorLogName :: NCQStorage -> FilePath ncqGetErrorLogName ncq = do ncqGetFileName ncq "errors.log" ncqEmptyDataHash :: HashRef ncqEmptyDataHash = HashRef $ hashObject @HbSync (mempty :: ByteString) ncqAddCachedSTM :: TimeSpec -- ^ now -> Int -- ^ limit -> TVar (HashPSQ FileKey TimeSpec a) -- ^ entry -> FileKey -- ^ key -> a -- ^ value -> STM () ncqAddCachedSTM now limit tv k v = do cache <- readTVar tv unless (HPSQ.member k cache) do let dst = if HPSQ.size cache + 1 > limit then maybe cache (view _4) (HPSQ.minView cache) else cache writeTVar tv (HPSQ.insert k now v dst) ncqAddTrackedFilesIO :: MonadIO m => NCQStorage -> [FilePath] -> m () ncqAddTrackedFilesIO ncq fps = do tsFiles <- catMaybes <$> forM fps \fp' -> liftIO $ do catchIOError (do let fp = fromString fp' let dataFile = ncqGetDataFileName ncq fp stat <- getFileStatus dataFile let ts = modificationTimeHiRes stat pure $ Just (fp, posixToTimeSpec ts)) (\e -> do err $ "ncqAddTrackedFilesIO: failed to stat " <+> viaShow e pure Nothing) atomically $ ncqAddTrackedFilesSTM ncq tsFiles ncqAddTrackedFilesSTM :: NCQStorage -> [(FileKey, TimeSpec)] -> STM () ncqAddTrackedFilesSTM NCQStorage{..} keys = do old <- readTVar ncqTrackedFiles let new = flip fix (old, keys) \next -> \case (s, []) -> s (s, (k,ts):xs) -> next (HPSQ.insert k (mkFilePrio ts) Nothing s, xs) writeTVar ncqTrackedFiles new ncqListTrackedFiles :: MonadIO m => NCQStorage -> m [FilePath] ncqListTrackedFiles ncq = do let wd = ncqGetCurrentDir ncq dirFiles wd >>= mapM (pure . takeBaseName) <&> List.filter (List.isPrefixOf "fossil-") ncqReadTrackedFiles :: MonadIO m => NCQStorage -> m () ncqReadTrackedFiles ncq@NCQStorage{} = do files <- ncqListTrackedFiles ncq ncqAddTrackedFilesIO ncq files ncqWriteError :: (MonadIO m) => NCQStorage -> Doc AnsiStyle -> m () ncqWriteError ncq txt = liftIO do p <- getPOSIXTime <&> round @_ @Integer let msg = "error" <+> fill 12 (pretty p) <+> txt err msg let msgTxt = fromString $ show (msg <> line) Text.appendFile (ncqGetErrorLogName ncq) msgTxt ncqIndexFile :: MonadUnliftIO m => NCQStorage -> FilePath -> m FilePath ncqIndexFile n@NCQStorage{} fp' = do let fp = ncqGetFileName n fp' & takeBaseName & (`addExtension` ".cq") & ncqGetFileName n items <- S.toList_ do ncqStorageScanDataFile n fp' $ \o w k v -> do let rs = w - 32 & fromIntegral @_ @Word32 & N.bytestring32 let os = fromIntegral @_ @Word64 o & N.bytestring64 let record = os <> rs -- debug $ "write record" <+> pretty (BS.length record) S.yield (coerce k, record) let (dir,name) = splitFileName fp result <- nwayWriteBatch (nwayAllocDef 1.10 32 8 12) dir name items mv result fp pure fp ncqFsync :: MonadUnliftIO m => NCQStorage -> Fd -> m () ncqFsync NCQStorage{..} fh = liftIO do fileSynchronise fh atomically $ modifyTVar ncqFsyncNum succ ncqStorageStop :: MonadUnliftIO m => NCQStorage -> m () ncqStorageStop ncq@NCQStorage{..} = do debug "ncqStorageStop" ncqStorageSync ncq atomically $ writeTVar ncqStopped True atomically do done <- readTVar ncqWriteQueue <&> HPSQ.null unless done STM.retry debug "ncqStorageStop DONE" ncqStorageRun :: MonadUnliftIO m => NCQStorage -> m () ncqStorageRun ncq@NCQStorage{..} = flip runContT pure do indexQ <- newTQueueIO ContT $ bracket none $ const $ liftIO do ncqFinalize ncq debug "RUNNING STORAGE!" reader <- makeReader writer <- makeWriter indexQ indexer <- makeIndexer writer indexQ merge <- makeMerge mapM_ waitCatch [writer,indexer,merge] -- mapM_ waitCatch [writer,indexer,refsWriter] -- ,indexer,refsWriter] mapM_ cancel [reader] where untilStopped m = fix \loop -> do m >> readTVarIO ncqStopped >>= \case False -> loop _ -> debug "STOPPING THREAD" micropause :: forall a m . (IsTimeout a, MonadUnliftIO m) => Timeout a -> m () micropause p = do void $ race @m (pause p) $ atomically do s <- readTVar ncqStopped unless s STM.retry makeReader = do cap <- getNumCapabilities reader <- ContT $ withAsync $ untilStopped do trace "I'm READER THREAD" reqs <- atomically do xs <- stateTVar ncqCurrentReadReq (Seq.splitAt cap) when (List.null xs) STM.retry pure xs for_ reqs $ \(fd,off,l,answ) -> liftIO do -- FIXME: probe-requests-count trace $ "READER: PROCEED REQUEST" <+> viaShow fd <+> pretty off atomically $ modifyTVar ncqCurrentUsage (IntMap.adjust pred (fromIntegral fd)) fdSeek fd AbsoluteSeek (fromIntegral $ 4 + 32 + off) bs <- Posix.fdRead fd (fromIntegral l) unless (BS.length bs == fromIntegral l) do err $ "READ MISMATCH" <+> pretty l <+> pretty (BS.length bs) atomically $ putTMVar answ bs link reader pure reader makeMerge = do me <- ContT $ withAsync $ untilStopped do micropause @'Seconds 10 req <- readTVarIO ncqMergeReq when (req > 0) do debug $ "STARTED MERGE" <+> pretty req try @_ @SomeException (ncqStorageMergeStep ncq) >>= \case Right{} -> none Left e -> err ("MERGE ERROR:" <+> viaShow e) atomically $ writeTVar ncqMergeReq 0 link me pure me makeWriter indexQ = do let dumpTimeout = TimeoutSec 10 let dumpData = fromIntegral ncqSyncSize let syncData = fromIntegral ncqSyncSize writer <- ContT $ withAsync do myFlushQ <- newTQueueIO atomically $ modifyTVar ncqFlushNow (myFlushQ:) fix \next -> do liftIO $ race (pause dumpTimeout) $ atomically do flush <- isEmptyTQueue myFlushQ <&> not stop <- readTVar ncqStopped bytes <- readTVar ncqNotWritten now <- readTVar ncqIndexNow <&> (>0) if bytes > dumpData || flush || now || stop then none else STM.retry void $ atomically (STM.flushTQueue myFlushQ) liftIO $ writeJournal indexQ syncData done <- atomically $ readTVar ncqWriteQueue <&> HPSQ.null stopped <- readTVarIO ncqStopped if done && stopped then none else next link writer pure writer makeIndexer w indexQ = do indexer <- ContT $ withAsync $ fix \next -> do what' <- race (pause @'Seconds 1) $ atomically do stop <- readTVar ncqStopped q <- tryPeekTQueue indexQ if not ( stop || isJust q) then STM.retry else do STM.flushTQueue indexQ let what = fromRight mempty what' for_ what $ \(fd,fn) -> do debug $ "FUCKING WRITE INDEX" <+> pretty fn key <- ncqIndexFile ncq fn ncqAddTrackedFilesIO ncq [key] ncqLoadSomeIndexes ncq [fromString key] atomically do modifyTVar ncqCurrentUsage (IntMap.adjust pred (fromIntegral fd)) modifyTVar ncqIndexed (IntSet.insert (fromIntegral fd)) down <- atomically do writerDown <- pollSTM w <&> isJust stopped <- readTVar ncqStopped pure (stopped && writerDown) unless down next link indexer pure indexer writeJournal indexQ syncData = ncqWithCurrent ncq $ \(RFd fdr, WFd fh) -> liftIO do trace $ "writeJournal" <+> pretty syncData fdSeek fh SeekFromEnd 0 initQ <- readTVarIO ncqWriteQueue wResult <- flip fix (0,initQ) \next (written,q) -> case HPSQ.minView q of Nothing -> pure mempty Just (h,_,WQItem{..},rest) -> do -- we really have to write tomb prefix here let b = byteString (coerce @_ @ByteString h) <> lazyByteString (fromMaybe (LBS.fromStrict ncqTombPrefix) wqData) let wbs = toLazyByteString b let len = LBS.length wbs let ws = N.bytestring32 (fromIntegral len) let w = ncqSLen + len off <- fdSeek fh SeekFromEnd 0 if isNothing wqData && wqNew then pure () else void do liftIO (Posix.fdWrite fh (ws <> LBS.toStrict wbs)) -- liftIO $ fileSynchronise fh (written',sz) <- if written < syncData then do pure (written + w,0) else do ncqFsync ncq fh fsize <- getFdStatus fh <&> PFS.fileSize pure (0,fromIntegral fsize) -- off <- fdSeek fh SeekFromEnd 0 <&> subtract (fromIntegral w) if sz < ncqMinLog then do ((h, (fromIntegral off, fromIntegral len)) : ) <$> next (written', rest) else do pure [(h, (fromIntegral off, fromIntegral len))] ncqFsync ncq fh size <- fdSeek fh SeekFromEnd 0 writeBinaryFileDurable (ncqGetCurrentSizeName ncq) (N.bytestring64 (fromIntegral size)) now1 <- getTimeCoarse atomically do q0 <- readTVar ncqWriteQueue w0 <- readTVar ncqStaged <&> fromMaybe HPSQ.empty . IntMap.lookup (fromIntegral fdr) b0 <- readTVar ncqNotWritten wbytes <- newTVar 0 (rq,rw) <- flip fix (q0,w0,wResult) \next (q,w,r) -> do case r of [] -> pure (q,w) ((h,(o,l)):xs) -> do modifyTVar wbytes (+l) let recLen = ncqFullDataLen (NCQFullRecordLen l) next (HPSQ.delete h q, HPSQ.insert h now1 (o,recLen) w,xs) writeTVar ncqWriteQueue rq modifyTVar ncqStaged (IntMap.insert (fromIntegral fdr) rw) bw <- readTVar wbytes writeTVar ncqNotWritten (max 0 (b0 - bw)) indexNow <- readTVarIO ncqIndexNow when (fromIntegral size >= ncqMinLog || indexNow > 0) do fsize <- getFdStatus fdr <&> PFS.fileSize unless (fsize == 0) do (n,u) <- atomically do let r = fromIntegral fdr u <- readTVar ncqCurrentUsage <&> fromMaybe 0 . IntMap.lookup r pure (fromIntegral @_ @Word32 r, u) let current = ncqGetCurrentName ncq fossilized <- ncqGetNewFossilName ncq debug $ "NEED TRUNCATE" <+> pretty current <+> viaShow size <+> pretty n <+> pretty u mv current fossilized atomically do -- NOTE: extra-use -- добавляем лишний 1 для индексации. -- исходный файл закрываем, только когда проиндексировано. -- то есть должны отнять 1 после индексации. modifyTVar ncqCurrentUsage (IntMap.insertWith (+) (fromIntegral fdr) 1) writeTQueue indexQ (fdr, fossilized) writeTVar ncqIndexNow 0 closeFd fh writeBinaryFileDurable (ncqGetCurrentSizeName ncq) (N.bytestring64 0) ncqOpenCurrent ncq debug $ "TRUNCATED, moved to" <+> pretty fossilized toClose <- atomically do usage <- readTVar ncqCurrentUsage staged <- readTVar ncqStaged indexed <- readTVar ncqIndexed let (alive, dead) = List.partition (\(_, u) -> u > 0) (IntMap.toList usage) let closable = do (f, _) <- dead guard (IntSet.member f indexed) guard (maybe True HPSQ.null (IntMap.lookup f staged)) pure f writeTVar ncqCurrentUsage (IntMap.fromList alive) writeTVar ncqIndexed (indexed `IntSet.difference` IntSet.fromList closable) writeTVar ncqStaged (foldr IntMap.delete staged closable) pure closable for_ toClose $ \f -> do debug $ "CLOSE FD" <+> pretty f closeFd (fromIntegral f) -- ncqStoragePut_ :: MonadUnliftIO m => Bool -> NCQStorage -> HashRef -> LBS.ByteString -> m (Maybe HashRef) ncqStoragePut_ check ncq@NCQStorage{..} h lbs = flip runContT pure $ callCC \exit -> do when check do lift (ncqLocate ncq h) >>= \case Nothing -> none Just loc -> do what <- lift $ ncqStorageGet_ ncq loc let tomb = maybe True ncqIsTomb what -- continue if no record found || tomb unless tomb $ exit (Just h) now <- getTimeCoarse atomically do let wqi = WQItem True (Just lbs) modifyTVar ncqWriteQueue (HPSQ.insert h now wqi) modifyTVar ncqNotWritten (+ (fromIntegral $ 4 + 32 + LBS.length lbs)) pure (Just h) ncqStoragePutBlock :: MonadUnliftIO m => NCQStorage -> LBS.ByteString -> m (Maybe HashRef) ncqStoragePutBlock ncq lbs = ncqStoragePut_ True ncq h (LBS.fromStrict ncqBlockPrefix <> lbs) where h = HashRef (hashObject lbs) ncqIsTomb :: LBS.ByteString -> Bool ncqIsTomb lbs = do let (pre,_) = LBS.splitAt ncqPrefixLen lbs LBS.isPrefixOf "T" pre {-# INLINE ncqIsTomb #-} data HasBlockError = LocationNotFound | DataNotRead | BlockIsTomb deriving stock (Eq,Show,Typeable) instance Exception HasBlockError ncqStorageHasBlockEither :: MonadUnliftIO m => NCQStorage -> HashRef -> m (Either HasBlockError Integer) ncqStorageHasBlockEither ncq h = runExceptT do location <- ncqLocate ncq h >>= orThrow LocationNotFound let s = ncqLocatedSize location if s > ncqPrefixLen then pure (s - ncqPrefixLen) else do what <- lift (ncqStorageGet_ ncq location) >>= orThrow DataNotRead when (ncqIsTomb what) $ throwIO BlockIsTomb pure 0 ncqStorageHasBlock :: MonadUnliftIO m => NCQStorage -> HashRef -> m (Maybe Integer) ncqStorageHasBlock ncq h = runMaybeT do location <- ncqLocate ncq h >>= toMPlus let s = ncqLocatedSize location if s > ncqPrefixLen then pure (s - ncqPrefixLen) else do what <- lift (ncqStorageGet_ ncq location) >>= toMPlus guard (not $ ncqIsTomb what) pure 0 ncqStorageGetBlock :: MonadUnliftIO m => NCQStorage -> HashRef -> m (Maybe LBS.ByteString) ncqStorageGetBlock ncq h = do ncqStorageGet ncq h >>= \case Just lbs | not (ncqIsTomb lbs) -> pure (Just $ LBS.drop ncqPrefixLen lbs) _ -> pure Nothing data NCQSectionType = B | R | T deriving stock (Eq,Ord,Show) instance Pretty NCQSectionType where pretty = \case B -> "B" T -> "T" R -> "R" ncqPrefixLen :: Integral a => a ncqPrefixLen = 4 {-# INLINE ncqPrefixLen #-} ncqRefPrefix :: ByteString ncqRefPrefix = "R;;\x00" ncqBlockPrefix :: ByteString ncqBlockPrefix = "B;;\x00" ncqTombPrefix :: ByteString ncqTombPrefix = "T;;\x00" ncqLocatedSize :: Location -> Integer ncqLocatedSize = \case InWriteQueue WQItem{..} -> fromIntegral $ maybe 0 LBS.length wqData InCurrent (_,_,s) -> fromIntegral s InFossil _ (_,s) -> fromIntegral s -- ncqFsync :: MonadUnliftIO m => NCQStorage{..} -> FilePath evictIfNeededSTM :: NCQStorage -> Maybe Int -> STM () evictIfNeededSTM NCQStorage{..} howMany = do cur <- readTVar ncqCachedEntries let need = fromMaybe (cur `div` 2) howMany excess = max 0 (cur + need - ncqMaxCached) when (excess > 0) do files <- readTVar ncqTrackedFiles <&> HPSQ.toList oldest <- forM files \case (k, prio, Just ce) -> do ts <- readTVar (cachedTs ce) pure (Just (ts, k, prio)) _ -> pure Nothing let victims = oldest & catMaybes & List.sortOn (\(ts,_,_) -> ts) & List.take excess for_ victims $ \(_,k,prio) -> do modifyTVar ncqTrackedFiles (HPSQ.insert k prio Nothing) modifyTVar ncqCachedEntries (subtract 1) ncqLocate :: MonadIO m => NCQStorage -> HashRef -> m (Maybe Location) ncqLocate ncq@NCQStorage{..} h = flip runContT pure $ callCC \exit -> do inQ <- atomically $ readTVar ncqWriteQueue <&> (fmap snd . HPSQ.lookup h) <&> \case Just wq -> Just (InWriteQueue wq) _ -> Nothing for_ inQ $ exit . Just inC <- atomically $ do s <- readTVar ncqStaged <&> IntMap.toList let found = lastMay $ catMaybes [ (fd,) <$> HPSQ.lookup h hpsq | (fd, hpsq) <- s ] case found of Just (f, (_,(off,size))) -> pure (Just (InCurrent (fromIntegral f,off,size))) Nothing -> pure Nothing for_ inC $ exit . Just now <- getTimeCoarse tracked <- readTVarIO ncqTrackedFiles for_ (HPSQ.toList tracked) $ \(fk, prio, mCached) -> do case mCached of Just ce@CachedEntry{..} -> do lookupEntry h (cachedMmapedIdx, cachedNway) <&> fmap (InFossil ce) >>= \case Just loc -> do atomically $ writeTVar cachedTs now exit (Just loc) Nothing -> pure () Nothing -> void $ runMaybeT do let indexFile = ncqGetIndexFileName ncq fk let dataFile = ncqGetDataFileName ncq fk (idxBs, idxNway) <- liftIO (nwayHashMMapReadOnly indexFile) >>= toMPlus datBs <- liftIO $ mmapFileByteString dataFile Nothing ce <- CachedEntry idxBs datBs idxNway <$> newTVarIO now e <- lookupEntry h (idxBs, idxNway) <&> fmap (InFossil ce) >>= toMPlus liftIO $ atomically do files <- readTVar ncqTrackedFiles case HPSQ.lookup fk files of Just (p, _) -> do modifyTVar ncqTrackedFiles (HPSQ.insert fk p (Just ce)) modifyTVar ncqCachedEntries (+1) evictIfNeededSTM ncq (Just 1) Nothing -> pure () lift (exit (Just e)) pure Nothing where lookupEntry (hx :: HashRef) (mmaped, nway) = runMaybeT do entryBs <- liftIO (nwayHashLookup nway mmaped (coerce hx)) >>= toMPlus pure ( fromIntegral $ N.word64 (BS.take 8 entryBs) , fromIntegral $ N.word32 (BS.take 4 (BS.drop 8 entryBs)) ) ncqStorageScanDataFile :: MonadIO m => NCQStorage -> FilePath -> ( Integer -> Integer -> HashRef -> ByteString -> m () ) -> m () ncqStorageScanDataFile ncq fp' action = do let fp = ncqGetFileName ncq fp' mmaped <- liftIO (mmapFileByteString fp Nothing) flip runContT pure $ callCC \exit -> do flip fix (0,mmaped) $ \next (o,bs) -> do when (BS.length bs < ncqSLen) $ exit () let w = BS.take ncqSLen bs & N.word32 & fromIntegral when (BS.length bs < ncqSLen + w) $ exit () let kv = BS.drop ncqSLen bs let k = BS.take ncqKeyLen kv & coerce @_ @HashRef let v = BS.take (ncqFullDataLen (NCQFullRecordLen w)) $ BS.drop ncqKeyLen kv lift (action o (fromIntegral w) k v) next (ncqSLen + o + fromIntegral w, BS.drop (w+ncqSLen) bs) ncqStorageGet :: MonadUnliftIO m => NCQStorage -> HashRef -> m (Maybe LBS.ByteString) ncqStorageGet ncq h = runMaybeT do location <- ncqLocate ncq h >>= toMPlus lift (ncqStorageGet_ ncq location) >>= toMPlus ncqStorageGet_ :: MonadUnliftIO m => NCQStorage -> Location -> m (Maybe LBS.ByteString) ncqStorageGet_ ncq@NCQStorage{..} = \case InWriteQueue WQItem{ wqData = Just lbs } -> do pure $ Just lbs InCurrent (fd,o,l) -> do r <- atomically do a <- newEmptyTMVar modifyTVar ncqCurrentUsage (IntMap.insertWith (+) (fromIntegral fd) 1) modifyTVar ncqCurrentReadReq (|> (fd, o, l, a)) pure a atomically (takeTMVar r) <&> Just . LBS.fromStrict InFossil ce (o,l) -> do now <- getTimeCoarse atomically $ writeTVar (cachedTs ce) now let chunk = BS.take (fromIntegral l) (BS.drop (ncqDataOffset o) (cachedMmapedData ce)) pure $ Just $ LBS.fromStrict chunk _ -> pure Nothing {-# INLINE ncqStorageGet_ #-} ncqRefHash :: NCQStorage -> HashRef -> HashRef ncqRefHash NCQStorage{..} h = HashRef (hashObject (coerce @_ @ByteString h <> coerce ncqSalt)) ncqStorageGetRef :: MonadUnliftIO m => NCQStorage -> HashRef -> m (Maybe HashRef) ncqStorageGetRef ncq ref = runMaybeT do lbs <- lift (ncqStorageGet ncq h) >>= toMPlus guard (not $ ncqIsTomb lbs) let hbs = LBS.toStrict (LBS.drop ncqPrefixLen lbs) guard (BS.length hbs == ncqKeyLen) pure $ coerce hbs where h = ncqRefHash ncq ref ncqStorageSetRef :: MonadUnliftIO m => NCQStorage -> HashRef -> HashRef -> m () ncqStorageSetRef ncq ref val = do current <- ncqStorageGetRef ncq ref unless (current == Just val) do void $ ncqStoragePut_ False ncq h (LBS.fromStrict $ ncqRefPrefix <> coerce val) where h = ncqRefHash ncq ref ncqStorageDelRef :: MonadUnliftIO m => NCQStorage -> HashRef -> m () ncqStorageDelRef ncq ref = ncqStorageDel ncq h where h = ncqRefHash ncq ref ncqStorageDel :: MonadUnliftIO m => NCQStorage -> HashRef -> m () ncqStorageDel ncq@NCQStorage{..} h = flip runContT pure $ callCC \exit -> do readTVarIO ncqStopped >>= \case True -> exit () _ -> none now <- getTimeCoarse let writeTombstone wq = do let recordPrefixLen = ncqSLen + ncqKeyLen + ncqPrefixLen modifyTVar ncqWriteQueue (HPSQ.insert h now wq) modifyTVar ncqNotWritten (+ recordPrefixLen) ncqLocate ncq h >>= atomically . \case Just (InFossil _ _) -> writeTombstone (WQItem False Nothing) Just (InCurrent (fd,_,_)) -> do modifyTVar ncqStaged (IntMap.adjust (HPSQ.delete h) (fromIntegral fd)) writeTombstone (WQItem False Nothing) Just (InWriteQueue _) -> writeTombstone (WQItem True Nothing) _ -> pure () ncqStorageSync :: MonadUnliftIO m => NCQStorage -> m () ncqStorageSync NCQStorage{..} = do atomically $ readTVar ncqFlushNow >>= mapM_ (`writeTQueue` ()) ncqLoadSomeIndexes :: MonadIO m => NCQStorage -> [FileKey] -> m () ncqLoadSomeIndexes ncq@NCQStorage{..} keys = do now <- getTimeCoarse ncqAddTrackedFilesIO ncq (fmap (BS8.unpack . coerce) keys) loaded <- catMaybes <$> forM keys \key -> runMaybeT do mEntry <- liftIO $ readTVarIO ncqTrackedFiles <&> HPSQ.lookup key guard (maybe True (\(_, m) -> isNothing m) mEntry) let idxFile = ncqGetIndexFileName ncq key let datFile = ncqGetDataFileName ncq key (mmIdx, nway) <- MaybeT $ liftIO $ nwayHashMMapReadOnly idxFile mmData <- liftIO $ mmapFileByteString datFile Nothing tnow <- newTVarIO now pure (key, CachedEntry mmIdx mmData nway tnow) atomically do evictIfNeededSTM ncq (Just (List.length loaded)) for_ loaded \(k, ce) -> do files <- readTVar ncqTrackedFiles case HPSQ.lookup k files of Just (p, Nothing) -> do modifyTVar ncqTrackedFiles (HPSQ.insert k p (Just ce)) modifyTVar ncqCachedEntries (+1) _ -> pure () ncqLoadIndexes :: MonadIO m => NCQStorage -> m () ncqLoadIndexes ncq@NCQStorage{..} = do debug "WIP: ncqStorageLoadIndexes" w <- readTVarIO ncqTrackedFiles <&> List.take (ncqMaxCached `div` 2) . HPSQ.keys ncqLoadSomeIndexes ncq w ncqFixIndexes :: MonadUnliftIO m => NCQStorage -> m () ncqFixIndexes ncq@NCQStorage{..} = do debug "ncqFixIndexes" keys <- readTVarIO ncqTrackedFiles <&> HPSQ.keys for_ keys $ \k -> do let idxName = ncqGetIndexFileName ncq k here <- doesFileExist idxName unless here do warn $ "missed-index" <+> pretty k let dataName = ncqGetDataFileName ncq k newKey <- ncqIndexFile ncq dataName ncqAddTrackedFilesIO ncq [newKey] ncqStorageOpen :: MonadUnliftIO m => FilePath -> m NCQStorage ncqStorageOpen fp' = do flip fix 0 $ \next i -> do fp <- liftIO $ makeAbsolute fp' ncq@NCQStorage{..} <- ncqStorageInit_ False fp ncqReadTrackedFiles ncq ncqFixIndexes ncq ncqLoadIndexes ncq readCurrent ncq `catch` \case NCQStorageBrokenCurrent | i < 2 -> do let fn = ncqGetCurrentName ncq let msg = "broken file" <+> pretty (takeFileName fn) ncqWriteError ncq msg let (p,tpl) = splitFileName (dropExtension fn `addExtension` ".broken") newFn <- liftIO $ emptyTempFile p tpl mv fn newFn rm (ncqGetCurrentSizeName ncq) void $ next (succ i) e -> throwIO e atomically $ putTMVar ncqOpenDone True pure ncq where readCurrent ncq@NCQStorage{..} = ncqWithCurrent ncq \(RFd fd, _) -> do let fn = ncqGetCurrentName ncq -- liftIO $ print $ pretty "FILE" <+> pretty fn bs0 <- liftIO $ mmapFileByteString fn Nothing now <- getTimeCoarse items <- S.toList_ <$> flip runContT pure $ callCC \exit ->do flip fix (0,bs0) $ \next (o,bs) -> do when (BS.length bs < ncqSLen) $ exit () let w = BS.take ncqSLen bs & N.word32 & fromIntegral let p = BS.take w (BS.drop ncqSLen bs) when (BS.length p < w ) do throwIO NCQStorageBrokenCurrent let k = BS.take ncqKeyLen p & coerce . BS.copy let vs = ncqFullDataLen (NCQFullRecordLen w) lift $ S.yield (k,now, (fromIntegral o, fromIntegral vs)) next (o+w+ncqSLen, BS.drop (w+ncqSLen) bs) atomically $ modifyTVar ncqStaged (IntMap.insert (fromIntegral fd) (HPSQ.fromList items)) ncqStorageInit :: MonadUnliftIO m => FilePath -> m NCQStorage ncqStorageInit = ncqStorageInit_ True ncqOpenCurrent :: MonadUnliftIO m => NCQStorage -> m () ncqOpenCurrent ncq@NCQStorage{..} = do let fp = ncqGetCurrentName ncq touch fp let flags = defaultFileFlags { exclusive = True } fdw <- liftIO (PosixBase.openFd fp Posix.ReadWrite flags) <&> WFd fdr <- liftIO (PosixBase.openFd fp Posix.ReadOnly flags) <&> RFd atomically $ writeTVar ncqCurrentFd (Just (fdr, fdw)) ncqWithCurrent :: MonadUnliftIO m => NCQStorage -> ((RFd, WFd) -> m a) -> m a ncqWithCurrent ncq@NCQStorage{..} action = do flip fix 2 $ \next i -> do readTVarIO ncqCurrentFd >>= \case Just a -> action a Nothing | i >= 0 -> do ncqOpenCurrent ncq next (pred i) Nothing -> do throwIO NCQStorageCantOpenCurrent ncqStorageInit_ :: MonadUnliftIO m => Bool -> FilePath -> m NCQStorage ncqStorageInit_ check path = do let ncqGen = 0 let lockName = dropFileName (ncqGetCurrentName_ path ncqGen) ".lock" here <- doesPathExist path when (here && check) $ throwIO (NCQStorageAlreadyExist path) mkdir (path show ncqGen) let seedPath = path ".seed" ncqLock_ <- liftIO do mkdir (takeDirectory lockName) l <- tryLockFile lockName Exclusive >>= orThrow (NCQStorageCantLock lockName) touch lockName pure l unless here do now <- liftIO $ getPOSIXTime <&> round @_ @Int let meta = [ mkForm @C "created" [ mkInt now ] ] let metas = show $ vsep (fmap pretty meta) liftIO $ appendFile (path "metadata") metas cred0 <- newCredentials @HBS2Basic cred <- addKeyPair Nothing cred0 let seed = show $ "# storage seed file" <+> pretty now <> line <> "# NEVER EVER MODIFY OR REMOVE THIS FILE" <> line <> "# or references may be lost and recovery will be prolematic" <> line <> pretty (AsCredFile $ AsBase58 cred) liftIO do Prelude.writeFile seedPath seed PFS.setFileMode seedPath 0o0444 let ncqRoot = path let ncqSyncSize = 64 * (1024 ^ 2) let ncqMinLog = 1024 * (1024 ^ 2) let ncqMaxLog = 4 * (1024 ^ 3) let ncqMaxCached = 128 ncqSalt <- try @_ @IOException (liftIO $ BS.readFile seedPath) >>= orThrow NCQStorageSeedMissed <&> HashRef . hashObject ncqWriteQueue <- newTVarIO HPSQ.empty ncqNotWritten <- newTVarIO 0 ncqLastWritten <- getTimeCoarse >>= newTVarIO ncqStaged <- newTVarIO mempty ncqFlushNow <- newTVarIO mempty ncqOpenDone <- newEmptyTMVarIO ncqCurrentReadReq <- newTVarIO mempty ncqCurrentUsage <- newTVarIO mempty ncqStopped <- newTVarIO False ncqTrackedFiles <- newTVarIO HPSQ.empty ncqCachedEntries <- newTVarIO 0 ncqIndexNow <- newTVarIO 0 ncqCurrentFd <- newTVarIO Nothing ncqIndexed <- newTVarIO mempty ncqMergeReq <- newTVarIO 0 ncqFsyncNum <- newTVarIO 0 ncqLock <- newTVarIO ncqLock_ let currentName = ncqGetCurrentName_ path ncqGen let currentSize = ncqGetCurrentSizeName_ path ncqGen hereCurrent <- doesPathExist currentName when hereCurrent $ liftIO do let ncq0 = NCQStorage{..} lastSz <- try @_ @IOException (BS.readFile currentSize) <&> either (const 0) N.word64 currSz <- try @_ @IOException (fileSize currentName) <&> fromRight 0 <&> fromIntegral if | currSz > lastSz -> do fossilized <- ncqGetNewFossilName ncq0 debug $ "NEW FOSSIL FILE" <+> pretty fossilized let fn = takeFileName fossilized let msg = "wrong-size" <+> pretty lastSz <+> pretty fn ncqWriteError ncq0 msg mv currentName fossilized PFS.setFileSize fossilized (fromIntegral lastSz) rm currentSize | currSz < lastSz -> do err "current log is broken, removing, data loss" ncqWriteError ncq0 $ "current log is broken, removing, data loss" none | otherwise -> none debug $ "currentFileName" <+> pretty (ncqGetCurrentName_ path ncqGen) let ncq = NCQStorage{..} ncqOpenCurrent ncq pure ncq data NCQFsckException = NCQFsckException deriving stock (Show,Typeable) instance Exception NCQFsckException data NCQFsckIssueType = FsckInvalidPrefix | FsckInvalidContent | FsckInvalidFileSize deriving stock (Eq,Ord,Show,Data,Generic) data NCQFsckIssue = NCQFsckIssue FilePath Word64 NCQFsckIssueType deriving stock (Eq,Ord,Show,Data,Generic) ncqFsck :: MonadUnliftIO m => FilePath -> m [NCQFsckIssue] ncqFsck fp = do isFile <- doesFileExist fp if isFile then ncqFsckOne fp else do fs <- dirFiles fp <&> List.filter ((== ".data") . takeExtension) concat <$> mapM ncqFsckOne fs ncqFsckOne :: MonadUnliftIO m => FilePath -> m [NCQFsckIssue] ncqFsckOne fp = do mmaped <- liftIO $ mmapFileByteString fp Nothing toff <- newTVarIO 0 issuesQ <- newTQueueIO let emit :: forall m . MonadIO m => NCQFsckIssue -> m () emit = atomically . writeTQueue issuesQ handle (\(_ :: ReadLogError) -> none) do runConsumeBS mmaped do readSections $ \size bs -> do let ssz = LBS.length bs let (hash, rest1) = LBS.splitAt 32 bs & over _1 (coerce . LBS.toStrict) let (prefix, rest2) = LBS.splitAt ncqPrefixLen rest1 & over _1 LBS.toStrict let (prefixOk,pt) = if | prefix == ncqBlockPrefix -> (True, Just B) | prefix == ncqRefPrefix -> (True, Just R) | prefix == ncqTombPrefix -> (True, Just T) | otherwise -> (False, Nothing) let contentOk = case pt of Just B -> hash == hashObject @HbSync rest2 _ -> True off <- readTVarIO toff unless prefixOk $ emit (NCQFsckIssue fp off FsckInvalidPrefix) unless contentOk $ emit (NCQFsckIssue fp off FsckInvalidContent) liftIO $ atomically $ modifyTVar toff (\x -> x + 4 + fromIntegral (LBS.length bs)) debug $ pretty (takeFileName fp) <+> pretty size <+> pretty ssz <+> brackets (pretty $ maybe "E" show pt) <+> brackets (if contentOk then pretty hash else "invalid hash") lastOff <- readTVarIO toff unless (fromIntegral (BS.length mmaped) == lastOff) do emit (NCQFsckIssue fp lastOff FsckInvalidFileSize) atomically $ STM.flushTQueue issuesQ ncqStorageFlush :: MonadUnliftIO m => NCQStorage -> m () ncqStorageFlush = ncqStorageSync ncqIndexRightNow :: MonadUnliftIO m => NCQStorage -> m () ncqIndexRightNow NCQStorage{..} = atomically $ modifyTVar ncqIndexNow succ ncqFinalize :: MonadUnliftIO m => NCQStorage -> m () ncqFinalize NCQStorage{..} = do liftIO $ readTVarIO ncqStaged <&> IntMap.keys >>= mapM_ (closeFd . fromIntegral) atomically (writeTVar ncqStaged mempty) readTVarIO ncqCurrentFd >>= \case Just (RFd _, WFd wfd) -> do liftIO (closeFd wfd) atomically (writeTVar ncqCurrentFd Nothing) _ -> none liftIO $ unlockFile =<< readTVarIO ncqLock withNCQ :: forall m a . MonadUnliftIO m => (NCQStorage -> NCQStorage) -> FilePath -> (NCQStorage -> m a) -> m a withNCQ setopts p action = flip runContT pure do ncq <- lift (ncqStorageOpen p) <&> setopts writer <- ContT $ withAsync (ncqStorageRun ncq) link writer e <- lift (action ncq) lift (ncqStorageStop ncq) wait writer pure e ncqStorageMerge :: MonadUnliftIO m => NCQStorage -> m () ncqStorageMerge NCQStorage{..} = atomically $ modifyTVar ncqMergeReq succ ncqStorageMergeStep :: MonadUnliftIO m => NCQStorage -> m () ncqStorageMergeStep ncq@NCQStorage{..} = do tracked <- readTVarIO ncqTrackedFiles <&> HPSQ.toList <&> fmap (over _2 (coerce @_ @TimeSpec)) <&> List.sortOn (view _2) <&> List.take 2 for_ tracked $ \(f, t, _) -> do debug $ "FILE TO MERGE" <+> pretty (realToFrac @_ @(Fixed E6) t) <+> pretty f mergeStep (fmap (view _1) tracked) where writeFiltered :: forall m . MonadIO m => FilePath -> Handle -> ( Integer -> Integer -> HashRef -> ByteString -> m Bool) -> m () writeFiltered fn out filt = do ncqStorageScanDataFile ncq fn $ \o s k v -> do skip <- filt o s k v <&> not when skip do debug $ pretty k <+> pretty "skipped" unless skip $ liftIO do BS.hPut out (LBS.toStrict (makeEntryLBS k v)) mergeStep [] = none mergeStep [_] = none mergeStep [b,a] = do warn $ "merge" <+> pretty a <+> pretty b let fDataNameA = ncqGetDataFileName ncq a let fIndexNameA = ncqGetIndexFileName ncq a let fDataNameB = ncqGetDataFileName ncq b let fIndexNameB = ncqGetIndexFileName ncq b warn $ "file A" <+> pretty fDataNameA <+> pretty fIndexNameA warn $ "file B" <+> pretty fDataNameB <+> pretty fIndexNameB doesFileExist fDataNameA `orFail` ("not exist" <+> pretty fDataNameA) doesFileExist fDataNameB `orFail` ("not exist" <+> pretty fDataNameB) doesFileExist fIndexNameA `orFail` ("not exist" <+> pretty fIndexNameA) flip runContT pure $ callCC \exit -> do mfile <- ncqGetNewMergeName ncq ContT $ bracket none $ const do rm mfile liftIO $ withBinaryFileAtomic mfile WriteMode $ \fwh -> do debug $ "merge: okay, good to go" <+> pretty (takeFileName mfile) (mmIdx, nway) <- nwayHashMMapReadOnly fIndexNameA >>= orThrow (NCQMergeInvariantFailed (show $ "can't mmap" <+> pretty fIndexNameA)) debug $ "SCAN FILE A" <+> pretty fDataNameA writeFiltered fDataNameA fwh $ \_ _ _ v -> do pure $ not (ncqIsTomb (LBS.fromStrict v)) debug $ "SCAN FILE B" <+> pretty fDataNameA writeFiltered fDataNameB fwh $ \_ _ k v -> do let tomb = ncqIsTomb (LBS.fromStrict v) foundInA <- liftIO (nwayHashLookup nway mmIdx (coerce k)) <&> isJust let skip = tomb || foundInA pure $ not skip result <- fileSize mfile when (result == 0) $ exit () liftIO do fossil <- ncqGetNewFossilName ncq mv mfile fossil statA <- getFileStatus fDataNameA let ts = modificationTimeHiRes statA setFileTimesHiRes fossil ts ts fname <- ncqIndexFile ncq fossil atomically do let fp = fromString fname modifyTVar ncqTrackedFiles (HPSQ.delete a) modifyTVar ncqTrackedFiles (HPSQ.delete b) ncqAddTrackedFilesSTM ncq [(fp, posixToTimeSpec ts)] mapM_ rm [fDataNameA, fDataNameB, fIndexNameB, fIndexNameA] mergeStep _ = do mergeError "assertion failed: more than 2 files to merge" mergeError d = throwIO (NCQMergeInvariantFailed (show d)) orFail what e = do r <- what unless r (throwIO (NCQMergeInvariantFailed (show e))) makeEntryLBS h bs = do let b = byteString (coerce @_ @ByteString h) <> byteString bs let wbs = toLazyByteString b let len = LBS.length wbs let ws = byteString (N.bytestring32 (fromIntegral len)) toLazyByteString (ws <> b) posixToTimeSpec :: POSIXTime -> TimeSpec posixToTimeSpec pt = let (s, frac) = properFraction pt :: (Integer, POSIXTime) ns = round (frac * 1e9) in TimeSpec (fromIntegral s) ns