From 77a0052ffb0df4cde9ac69a481295a623d41613e Mon Sep 17 00:00:00 2001 From: Dmitry Zuykov Date: Tue, 13 May 2025 11:41:41 +0300 Subject: [PATCH] wip --- hbs2-storage-ncq/lib/HBS2/Storage/NCQ.hs | 152 ++++++++++++----------- hbs2-tests/test/TCQ.hs | 18 +++ 2 files changed, 98 insertions(+), 72 deletions(-) diff --git a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ.hs b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ.hs index 0e82cd23..b7d88356 100644 --- a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ.hs +++ b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ.hs @@ -53,7 +53,8 @@ import System.Posix.IO as PosixBase import System.Posix.Types as Posix import System.Posix.IO.ByteString as Posix import System.Posix.Unistd -import System.Posix.Files (getFileStatus, modificationTimeHiRes) +import System.Posix.Files (getFileStatus, modificationTimeHiRes, getFdStatus, FileStatus(..)) +import System.Posix.Files qualified as PFS import System.IO.Error (catchIOError) import System.IO.MMap as MMap import System.IO.Temp (emptyTempFile) @@ -145,7 +146,7 @@ instance Pretty Location where pretty = \case InWriteQueue{} -> "write-queue" InCurrent (o,l) -> pretty $ mkForm @C "current" [mkInt o, mkInt l] - InFossil _ (o,l) -> pretty $ mkForm @C "fossil " [mkList [mkInt o, mkInt l]] + InFossil _ (o,l) -> pretty $ mkForm @C "fossil " [mkInt o, mkInt l] type IsHCQKey h = ( Eq (Key h) , Hashable (Key h) @@ -307,8 +308,8 @@ ncqStorageRun ncq@NCQStorage{..} = flip runContT pure do refsWriter <- makeRefsWriter reader <- makeReader - indexer <- makeIndexer indexQ writer <- makeWriter indexQ + indexer <- makeIndexer writer indexQ mapM_ waitCatch [writer,indexer,refsWriter] -- mapM_ waitCatch [writer,indexer,refsWriter] -- ,indexer,refsWriter] @@ -361,7 +362,8 @@ ncqStorageRun ncq@NCQStorage{..} = flip runContT pure do flush <- isEmptyTQueue myFlushQ <&> not stop <- readTVar ncqStopped bytes <- readTVar ncqNotWritten - if bytes > dumpData || flush || stop then none else STM.retry + now <- readTVar ncqIndexNow <&> (>0) + if bytes > dumpData || flush || now || stop then none else STM.retry void $ atomically (STM.flushTQueue myFlushQ) @@ -410,18 +412,16 @@ ncqStorageRun ncq@NCQStorage{..} = flip runContT pure do pure refsWriter - makeIndexer indexQ = do - indexer <- ContT $ withAsync $ untilStopped do - - debug $ "STARTED INDEXER" + makeIndexer w indexQ = do + indexer <- ContT $ withAsync $ fix \next -> do what' <- race (pause @'Seconds 1) $ atomically do - stop <- readTVar ncqStopped - q <- tryPeekTQueue indexQ - if not (stop || isJust q) then - STM.retry - else do - STM.flushTQueue indexQ + stop <- readTVar ncqStopped + q <- tryPeekTQueue indexQ + if not ( stop || isJust q) then + STM.retry + else do + STM.flushTQueue indexQ let what = fromRight mempty what' @@ -431,17 +431,19 @@ ncqStorageRun ncq@NCQStorage{..} = flip runContT pure do (key, _) <- ncqIndexFile ncq fn <&> over _2 HS.fromList - -- atomically do - -- r <- readTVar ncqWaitIndex <&> HPSQ.toList - -- let new = [(k,p,v) | (k,p,v) <- r, not (k `HS.member` added)] - -- writeTVar ncqWaitIndex (HPSQ.fromList new) - ncqAddTrackedFilesIO ncq [key] atomically do modifyTVar ncqCurrentUsage (IntMap.adjust pred (fromIntegral fd)) ncqLoadSomeIndexes ncq [fromString key] + down <- atomically do + writerDown <- pollSTM w <&> isJust + stopped <- readTVar ncqStopped + pure (stopped && writerDown) + + unless down next + link indexer pure indexer @@ -456,29 +458,29 @@ ncqStorageRun ncq@NCQStorage{..} = flip runContT pure do initQ <- readTVarIO ncqWriteQueue wResult <- flip fix (0,initQ) \next (written,q) -> case HPSQ.minView q of - Nothing -> pure mempty - Just (h,_,WQItem{..},rest) -> do + Nothing -> pure mempty + Just (h,_,WQItem{..},rest) -> do - off <- fdSeek fh SeekFromEnd 0 - let b = byteString (coerce @_ @ByteString h) - <> lazyByteString (fromMaybe mempty wqData) - let wbs = toLazyByteString b - let len = LBS.length wbs - let ws = N.bytestring32 (fromIntegral len) - let w = 4 + len + off <- fdSeek fh SeekFromEnd 0 + let b = byteString (coerce @_ @ByteString h) + <> lazyByteString (fromMaybe mempty wqData) + let wbs = toLazyByteString b + let len = LBS.length wbs + let ws = N.bytestring32 (fromIntegral len) + let w = 4 + len - if isNothing wqData && wqNew then - pure () - else void do - liftIO (Posix.fdWrite fh (ws <> LBS.toStrict wbs)) + if isNothing wqData && wqNew then + pure () + else void do + liftIO (Posix.fdWrite fh (ws <> LBS.toStrict wbs)) - written' <- if written < syncData then do - pure (written + w) - else do - fileSynchronise fh - pure 0 + written' <- if written < syncData then do + pure (written + w) + else do + fileSynchronise fh + pure 0 - ((h, (fromIntegral off, fromIntegral len)) : ) <$> next (written', rest) + ((h, (fromIntegral off, fromIntegral len)) : ) <$> next (written', rest) fileSynchronise fh size <- fdSeek fh SeekFromEnd 0 @@ -509,52 +511,58 @@ ncqStorageRun ncq@NCQStorage{..} = flip runContT pure do when (fromIntegral size >= ncqMinLog || indexNow > 0) do - (n,u) <- atomically do - r <- readTVar ncqCurrentHandleR <&> fromIntegral - u <- readTVar ncqCurrentUsage <&> fromMaybe 0 . IntMap.lookup r - pure (fromIntegral @_ @Word32 r, u) + fsize <- readTVarIO ncqCurrentHandleR + >>= getFdStatus + <&> PFS.fileSize - let current = ncqGetCurrentName ncq + unless (fsize == 0) do - fossilized <- ncqGetNewFossilName ncq + (n,u) <- atomically do + r <- readTVar ncqCurrentHandleR <&> fromIntegral + u <- readTVar ncqCurrentUsage <&> fromMaybe 0 . IntMap.lookup r + pure (fromIntegral @_ @Word32 r, u) - warn $ "NEED TRUNCATE" <+> pretty current <+> viaShow size <+> pretty n <+> pretty u + let current = ncqGetCurrentName ncq - mv current fossilized + fossilized <- ncqGetNewFossilName ncq - atomically do - writeTVar ncqIndexNow 0 - r <- readTVar ncqCurrentHandleR - -- NOTE: extra-use - -- добавляем лишний 1 для индексации. - -- исходный файл закрываем, только когда проиндексировано. - -- то есть должны отнять 1 после индексации. - modifyTVar ncqCurrentUsage (IntMap.insertWith (+) (fromIntegral r) 1) - writeTQueue indexQ (r, fossilized) + debug $ "NEED TRUNCATE" <+> pretty current <+> viaShow size <+> pretty n <+> pretty u - let flags = defaultFileFlags { exclusive = True } + mv current fossilized - touch current - writeBinaryFileDurable (ncqGetCurrentSizeName ncq) (N.bytestring64 0) + atomically do + writeTVar ncqIndexNow 0 + r <- readTVar ncqCurrentHandleR + -- NOTE: extra-use + -- добавляем лишний 1 для индексации. + -- исходный файл закрываем, только когда проиндексировано. + -- то есть должны отнять 1 после индексации. + modifyTVar ncqCurrentUsage (IntMap.insertWith (+) (fromIntegral r) 1) + writeTQueue indexQ (r, fossilized) - liftIO (PosixBase.openFd current Posix.ReadWrite flags) - >>= atomically . writeTVar ncqCurrentHandleW + let flags = defaultFileFlags { exclusive = True } - liftIO (PosixBase.openFd current Posix.ReadWrite flags) - >>= atomically . writeTVar ncqCurrentHandleR + touch current + writeBinaryFileDurable (ncqGetCurrentSizeName ncq) (N.bytestring64 0) - debug $ "TRUNCATED, moved to" <+> pretty fossilized + liftIO (PosixBase.openFd current Posix.ReadWrite flags) + >>= atomically . writeTVar ncqCurrentHandleW - toClose <- atomically do - w <- readTVar ncqCurrentUsage <&> IntMap.toList - let (alive,dead) = List.partition( (>0) . snd) w - writeTVar ncqCurrentUsage (IntMap.fromList alive) - pure dead + liftIO (PosixBase.openFd current Posix.ReadWrite flags) + >>= atomically . writeTVar ncqCurrentHandleR - for_ toClose $ \(f,_) -> do - when (f > 0) do - debug $ "CLOSE FD" <+> pretty f - Posix.closeFd (fromIntegral f) + debug $ "TRUNCATED, moved to" <+> pretty fossilized + + toClose <- atomically do + w <- readTVar ncqCurrentUsage <&> IntMap.toList + let (alive,dead) = List.partition( (>0) . snd) w + writeTVar ncqCurrentUsage (IntMap.fromList alive) + pure dead + + for_ toClose $ \(f,_) -> do + when (f > 0) do + debug $ "CLOSE FD" <+> pretty f + Posix.closeFd (fromIntegral f) ncqStoragePut_ :: MonadUnliftIO m => Bool -> NCQStorage -> LBS.ByteString -> m (Maybe HashRef) ncqStoragePut_ check ncq@NCQStorage{..} lbs = flip runContT pure $ callCC \exit -> do diff --git a/hbs2-tests/test/TCQ.hs b/hbs2-tests/test/TCQ.hs index 70b2ec8b..d0ae01fb 100644 --- a/hbs2-tests/test/TCQ.hs +++ b/hbs2-tests/test/TCQ.hs @@ -182,6 +182,24 @@ main = do e -> throwIO $ BadFormException @C (mkList e) + entry $ bindMatch "ncq:fossilize" $ nil_ \case + [ isOpaqueOf @TCQ -> Just tcq ] -> lift do + ncq <- getNCQ tcq + ncqIndexRightNow ncq + + e -> throwIO $ BadFormException @C (mkList e) + + entry $ bindMatch "ncq:locate" $ \case + [ isOpaqueOf @TCQ -> Just tcq, HashLike hash ] -> lift do + ncq <- getNCQ tcq + ncqLocate ncq hash >>= \case + Just x -> do + parseSyntax (show $ pretty x) & either (error.show) pure + + _ -> pure nil + + e -> throwIO $ BadFormException @C (mkList e) + entry $ bindMatch "ncq:get" $ \case [ isOpaqueOf @TCQ -> Just tcq, HashLike hash ] -> lift do ncq <- getNCQ tcq