From bb9df9c25b1c6cf9f1f5e3386b85578dea34a9ad Mon Sep 17 00:00:00 2001 From: Dmitry Zuykov Date: Tue, 13 May 2025 07:13:22 +0300 Subject: [PATCH] wip --- hbs2-storage-ncq/lib/HBS2/Storage/NCQ.hs | 63 ++++++++++++---------- hbs2-tests/test/TestCQ.hs | 66 ++++++++++-------------- 2 files changed, 62 insertions(+), 67 deletions(-) diff --git a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ.hs b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ.hs index ef9ab28e..9429300e 100644 --- a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ.hs +++ b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ.hs @@ -192,11 +192,6 @@ ncqGetErrorLogName :: NCQStorage -> FilePath ncqGetErrorLogName ncq = do ncqGetFileName ncq "errors.log" -ncqGetDeletedFileName :: NCQStorage -> FilePath -ncqGetDeletedFileName ncq = do - ncqGetFileName ncq "deleted.data" - - ncqEmptyDataHash :: HashRef ncqEmptyDataHash = HashRef $ hashObject @HbSync (mempty :: ByteString) @@ -323,16 +318,20 @@ ncqStorageRun ncq@NCQStorage{..} = flip runContT pure do cap <- getNumCapabilities reader <- ContT $ withAsync $ untilStopped do - reqs <- atomically do - xs <- stateTVar ncqCurrentReadReq (Seq.splitAt cap) - when (List.null xs) STM.retry - pure xs + debug "I'm READER THREAD" - for_ reqs $ \(fd,off,l,answ) -> liftIO do - atomically $ modifyTVar ncqCurrentUsage (IntMap.adjust pred (fromIntegral fd)) - fdSeek fd AbsoluteSeek (fromIntegral $ 4 + 32 + off) - bs <- Posix.fdRead fd (fromIntegral l) - atomically $ putTMVar answ bs + reqs <- atomically do + xs <- stateTVar ncqCurrentReadReq (Seq.splitAt cap) + when (List.null xs) STM.retry + pure xs + + + for_ reqs $ \(fd,off,l,answ) -> liftIO do + debug $ "READER: PROCEED REQUEST" <+> viaShow fd <+> pretty off + atomically $ modifyTVar ncqCurrentUsage (IntMap.adjust pred (fromIntegral fd)) + fdSeek fd AbsoluteSeek (fromIntegral $ 4 + 32 + off) + bs <- Posix.fdRead fd (fromIntegral l) + atomically $ putTMVar answ bs link reader pure reader @@ -714,18 +713,20 @@ ncqStorageScanDataFile ncq fp' action = do ncqStorageGet :: MonadUnliftIO m => NCQStorage -> HashRef -> m (Maybe LBS.ByteString) ncqStorageGet ncq@NCQStorage{..} h = do - mloc <- ncqLocate ncq h - ncqCheckDeleted h mloc \case - + location <- ncqLocate ncq h + ncqCheckDeleted h location \case InWriteQueue lbs -> pure $ Just lbs - InCurrent (o,l) -> atomically do - a <- newEmptyTMVar - fd <- readTVar ncqCurrentHandleR - modifyTVar ncqCurrentUsage (IntMap.insertWith (+) (fromIntegral fd) 1) - modifyTVar ncqCurrentReadReq (|> (fd, o, l, a)) - Just . LBS.fromStrict <$> takeTMVar a + InCurrent (o,l) -> do + r <- atomically do + a <- newEmptyTMVar + fd <- readTVar ncqCurrentHandleR + modifyTVar ncqCurrentUsage (IntMap.insertWith (+) (fromIntegral fd) 1) + modifyTVar ncqCurrentReadReq (|> (fd, o, l, a)) + pure a + + atomically (takeTMVar r) <&> Just . LBS.fromStrict InFossil ce (o,l) -> do now <- getTimeCoarse @@ -801,8 +802,6 @@ ncqLoadSomeIndexes ncq@NCQStorage{..} keys = do _ -> pure () - - ncqLoadIndexes :: MonadIO m => NCQStorage -> m () ncqLoadIndexes ncq@NCQStorage{..} = do debug "WIP: ncqStorageLoadIndexes" @@ -923,7 +922,6 @@ ncqStorageInit_ check path = do ncqStopped <- newTVarIO False ncqTrackedFiles <- newTVarIO HPSQ.empty ncqCachedEntries <- newTVarIO 0 - ncqSeqNo <- newTVarIO 1 let currentName = ncqGetCurrentName_ path ncqGen @@ -967,8 +965,19 @@ ncqStorageInit_ check path = do let ncq = NCQStorage{..} touch (ncqGetRefsDataFileName ncq) - touch (ncqGetDeletedFileName ncq) pure ncq +withNCQ :: forall m a . MonadUnliftIO m + => (NCQStorage -> NCQStorage) + -> FilePath + -> (NCQStorage -> m a) + -> m a +withNCQ setopts p action = flip runContT pure do + ncq <- lift (ncqStorageOpen p) <&> setopts + writer <- ContT $ withAsync (ncqStorageRun ncq) + link writer + e <- lift (action ncq) + lift (ncqStorageStop ncq) + pure e diff --git a/hbs2-tests/test/TestCQ.hs b/hbs2-tests/test/TestCQ.hs index 6b480a01..9a0007c8 100644 --- a/hbs2-tests/test/TestCQ.hs +++ b/hbs2-tests/test/TestCQ.hs @@ -55,11 +55,14 @@ import System.Posix.Fcntl import System.Posix.IO import System.IO.MMap import System.IO qualified as IO +import System.Exit (exitSuccess, exitFailure) import System.Random import Safe import Lens.Micro.Platform import Control.Concurrent.STM qualified as STM + import UnliftIO + import Text.InterpolatedString.Perl6 (qc) import Streaming.Prelude qualified as S @@ -247,20 +250,11 @@ main = do e -> throwIO $ BadFormException @C (mkList e) - entry $ bindMatch "test:ncq:raw:get" $ \case + entry $ bindMatch "test:ncq:raw:get:stdout" $ nil_ \case - [StringLike fn, HashLike h] -> liftIO $ flip runContT pure do - - ncq <- lift $ ncqStorageOpen fn - writer <- ContT $ withAsync $ ncqStorageRun ncq - link writer - - lift do - ncqStorageGet ncq h >>= \case - Nothing -> pure nil - Just bs -> do - -- debug $ "GET" <+> pretty (LBS.length bs) <+> pretty (hashObject @HbSync bs) - mkOpaque bs + [StringLike fn, HashLike h] -> lift $ withNCQ id fn $ \ncq -> do + w <- ncqStorageGet ncq h + maybe1 w exitFailure LBS.putStr e -> throwIO $ BadFormException @C (mkList e) @@ -371,43 +365,28 @@ main = do e -> throwIO $ BadFormException @C (mkList e) - entry $ bindMatch "test:ncq:raw:locate" $ nil_ \case - [StringLike fn] -> liftIO $ flip runContT pure do - hashes <- liftIO $ getContents <&> mapMaybe (fromStringMay @HashRef) . lines - - ncq <- lift $ ncqStorageOpen fn - - writer <- ContT $ withAsync $ ncqStorageRun ncq - link writer - - timeItNamed (show $ "lookup" <+> pretty (List.length hashes)) do - for_ hashes $ \h -> liftIO do - ncqLocate ncq h >>= \case - Nothing -> print $ pretty "not-found" <+> pretty h - Just l -> print $ pretty "found" <+> pretty h <+> pretty l + entry $ bindMatch "test:ncq:raw:locate:one" $ nil_ \case + [StringLike fn, HashLike h] -> lift $ withNCQ id fn $ \ncq -> do + ncqLocate ncq h >>= \case + Nothing -> print $ pretty "not-found" <+> pretty h + Just l -> print $ pretty "found" <+> pretty h <+> pretty l e -> throwIO $ BadFormException @C (mkList e) - entry $ bindMatch "test:ncq:raw:put" $ \case - [StringLike fn] -> liftIO $ flip runContT pure do - + entry $ bindMatch "test:ncq:raw:put:stdin" $ \case + [StringLike fn] -> lift $ withNCQ id fn $ \ncq -> do what <- liftIO BS.getContents - - ncq <- lift $ ncqStorageOpen fn - - writer <- ContT $ withAsync $ ncqStorageRun ncq - link writer - href <- liftIO $ ncqStoragePut ncq (LBS.fromStrict what) - - liftIO $ ncqStorageStop ncq - wait writer - pure $ maybe nil (mkSym . show . pretty) href e -> throwIO $ BadFormException @C (mkList e) + entry $ bindMatch "test:ncq:raw:get" $ nil_ \case + [StringLike fn, HashLike href] -> lift $ withNCQ id fn $ \ncq -> do + mbs <- ncqStorageGet ncq href + maybe1 mbs exitFailure LBS.putStr + e -> throwIO $ BadFormException @C (mkList e) entry $ bindMatch "test:ncq:raw:merkle:write" $ nil_ \case [StringLike fn, StringLike what] -> liftIO $ flip runContT pure do @@ -530,6 +509,13 @@ main = do e -> throwIO $ BadFormException @C (mkList e) + entry $ bindMatch "test:ncq:run" $ nil_ \case + [StringLike p] -> lift do + withNCQ id p $ \_ -> do + display_ $ "hello from ncq" <+> pretty p + + e -> throwIO $ BadFormException @C (mkList e) + setupLogger