This commit is contained in:
Dmitry Zuykov 2025-05-13 07:13:22 +03:00
parent efe2a2cda9
commit bb9df9c25b
2 changed files with 62 additions and 67 deletions

View File

@ -192,11 +192,6 @@ ncqGetErrorLogName :: NCQStorage -> FilePath
ncqGetErrorLogName ncq = do ncqGetErrorLogName ncq = do
ncqGetFileName ncq "errors.log" ncqGetFileName ncq "errors.log"
ncqGetDeletedFileName :: NCQStorage -> FilePath
ncqGetDeletedFileName ncq = do
ncqGetFileName ncq "deleted.data"
ncqEmptyDataHash :: HashRef ncqEmptyDataHash :: HashRef
ncqEmptyDataHash = HashRef $ hashObject @HbSync (mempty :: ByteString) ncqEmptyDataHash = HashRef $ hashObject @HbSync (mempty :: ByteString)
@ -323,16 +318,20 @@ ncqStorageRun ncq@NCQStorage{..} = flip runContT pure do
cap <- getNumCapabilities cap <- getNumCapabilities
reader <- ContT $ withAsync $ untilStopped do reader <- ContT $ withAsync $ untilStopped do
reqs <- atomically do debug "I'm READER THREAD"
xs <- stateTVar ncqCurrentReadReq (Seq.splitAt cap)
when (List.null xs) STM.retry
pure xs
for_ reqs $ \(fd,off,l,answ) -> liftIO do reqs <- atomically do
atomically $ modifyTVar ncqCurrentUsage (IntMap.adjust pred (fromIntegral fd)) xs <- stateTVar ncqCurrentReadReq (Seq.splitAt cap)
fdSeek fd AbsoluteSeek (fromIntegral $ 4 + 32 + off) when (List.null xs) STM.retry
bs <- Posix.fdRead fd (fromIntegral l) pure xs
atomically $ putTMVar answ bs
for_ reqs $ \(fd,off,l,answ) -> liftIO do
debug $ "READER: PROCEED REQUEST" <+> viaShow fd <+> pretty off
atomically $ modifyTVar ncqCurrentUsage (IntMap.adjust pred (fromIntegral fd))
fdSeek fd AbsoluteSeek (fromIntegral $ 4 + 32 + off)
bs <- Posix.fdRead fd (fromIntegral l)
atomically $ putTMVar answ bs
link reader link reader
pure reader pure reader
@ -714,18 +713,20 @@ ncqStorageScanDataFile ncq fp' action = do
ncqStorageGet :: MonadUnliftIO m => NCQStorage -> HashRef -> m (Maybe LBS.ByteString) ncqStorageGet :: MonadUnliftIO m => NCQStorage -> HashRef -> m (Maybe LBS.ByteString)
ncqStorageGet ncq@NCQStorage{..} h = do ncqStorageGet ncq@NCQStorage{..} h = do
mloc <- ncqLocate ncq h location <- ncqLocate ncq h
ncqCheckDeleted h mloc \case ncqCheckDeleted h location \case
InWriteQueue lbs -> InWriteQueue lbs ->
pure $ Just lbs pure $ Just lbs
InCurrent (o,l) -> atomically do InCurrent (o,l) -> do
a <- newEmptyTMVar r <- atomically do
fd <- readTVar ncqCurrentHandleR a <- newEmptyTMVar
modifyTVar ncqCurrentUsage (IntMap.insertWith (+) (fromIntegral fd) 1) fd <- readTVar ncqCurrentHandleR
modifyTVar ncqCurrentReadReq (|> (fd, o, l, a)) modifyTVar ncqCurrentUsage (IntMap.insertWith (+) (fromIntegral fd) 1)
Just . LBS.fromStrict <$> takeTMVar a modifyTVar ncqCurrentReadReq (|> (fd, o, l, a))
pure a
atomically (takeTMVar r) <&> Just . LBS.fromStrict
InFossil ce (o,l) -> do InFossil ce (o,l) -> do
now <- getTimeCoarse now <- getTimeCoarse
@ -801,8 +802,6 @@ ncqLoadSomeIndexes ncq@NCQStorage{..} keys = do
_ -> pure () _ -> pure ()
ncqLoadIndexes :: MonadIO m => NCQStorage -> m () ncqLoadIndexes :: MonadIO m => NCQStorage -> m ()
ncqLoadIndexes ncq@NCQStorage{..} = do ncqLoadIndexes ncq@NCQStorage{..} = do
debug "WIP: ncqStorageLoadIndexes" debug "WIP: ncqStorageLoadIndexes"
@ -923,7 +922,6 @@ ncqStorageInit_ check path = do
ncqStopped <- newTVarIO False ncqStopped <- newTVarIO False
ncqTrackedFiles <- newTVarIO HPSQ.empty ncqTrackedFiles <- newTVarIO HPSQ.empty
ncqCachedEntries <- newTVarIO 0 ncqCachedEntries <- newTVarIO 0
ncqSeqNo <- newTVarIO 1
let currentName = ncqGetCurrentName_ path ncqGen let currentName = ncqGetCurrentName_ path ncqGen
@ -967,8 +965,19 @@ ncqStorageInit_ check path = do
let ncq = NCQStorage{..} let ncq = NCQStorage{..}
touch (ncqGetRefsDataFileName ncq) touch (ncqGetRefsDataFileName ncq)
touch (ncqGetDeletedFileName ncq)
pure ncq pure ncq
withNCQ :: forall m a . MonadUnliftIO m
=> (NCQStorage -> NCQStorage)
-> FilePath
-> (NCQStorage -> m a)
-> m a
withNCQ setopts p action = flip runContT pure do
ncq <- lift (ncqStorageOpen p) <&> setopts
writer <- ContT $ withAsync (ncqStorageRun ncq)
link writer
e <- lift (action ncq)
lift (ncqStorageStop ncq)
pure e

View File

@ -55,11 +55,14 @@ import System.Posix.Fcntl
import System.Posix.IO import System.Posix.IO
import System.IO.MMap import System.IO.MMap
import System.IO qualified as IO import System.IO qualified as IO
import System.Exit (exitSuccess, exitFailure)
import System.Random import System.Random
import Safe import Safe
import Lens.Micro.Platform import Lens.Micro.Platform
import Control.Concurrent.STM qualified as STM import Control.Concurrent.STM qualified as STM
import UnliftIO import UnliftIO
import Text.InterpolatedString.Perl6 (qc) import Text.InterpolatedString.Perl6 (qc)
import Streaming.Prelude qualified as S import Streaming.Prelude qualified as S
@ -247,20 +250,11 @@ main = do
e -> throwIO $ BadFormException @C (mkList e) e -> throwIO $ BadFormException @C (mkList e)
entry $ bindMatch "test:ncq:raw:get" $ \case entry $ bindMatch "test:ncq:raw:get:stdout" $ nil_ \case
[StringLike fn, HashLike h] -> liftIO $ flip runContT pure do [StringLike fn, HashLike h] -> lift $ withNCQ id fn $ \ncq -> do
w <- ncqStorageGet ncq h
ncq <- lift $ ncqStorageOpen fn maybe1 w exitFailure LBS.putStr
writer <- ContT $ withAsync $ ncqStorageRun ncq
link writer
lift do
ncqStorageGet ncq h >>= \case
Nothing -> pure nil
Just bs -> do
-- debug $ "GET" <+> pretty (LBS.length bs) <+> pretty (hashObject @HbSync bs)
mkOpaque bs
e -> throwIO $ BadFormException @C (mkList e) e -> throwIO $ BadFormException @C (mkList e)
@ -371,43 +365,28 @@ main = do
e -> throwIO $ BadFormException @C (mkList e) e -> throwIO $ BadFormException @C (mkList e)
entry $ bindMatch "test:ncq:raw:locate" $ nil_ \case entry $ bindMatch "test:ncq:raw:locate:one" $ nil_ \case
[StringLike fn] -> liftIO $ flip runContT pure do [StringLike fn, HashLike h] -> lift $ withNCQ id fn $ \ncq -> do
hashes <- liftIO $ getContents <&> mapMaybe (fromStringMay @HashRef) . lines ncqLocate ncq h >>= \case
Nothing -> print $ pretty "not-found" <+> pretty h
ncq <- lift $ ncqStorageOpen fn Just l -> print $ pretty "found" <+> pretty h <+> pretty l
writer <- ContT $ withAsync $ ncqStorageRun ncq
link writer
timeItNamed (show $ "lookup" <+> pretty (List.length hashes)) do
for_ hashes $ \h -> liftIO do
ncqLocate ncq h >>= \case
Nothing -> print $ pretty "not-found" <+> pretty h
Just l -> print $ pretty "found" <+> pretty h <+> pretty l
e -> throwIO $ BadFormException @C (mkList e) e -> throwIO $ BadFormException @C (mkList e)
entry $ bindMatch "test:ncq:raw:put" $ \case entry $ bindMatch "test:ncq:raw:put:stdin" $ \case
[StringLike fn] -> liftIO $ flip runContT pure do [StringLike fn] -> lift $ withNCQ id fn $ \ncq -> do
what <- liftIO BS.getContents what <- liftIO BS.getContents
ncq <- lift $ ncqStorageOpen fn
writer <- ContT $ withAsync $ ncqStorageRun ncq
link writer
href <- liftIO $ ncqStoragePut ncq (LBS.fromStrict what) href <- liftIO $ ncqStoragePut ncq (LBS.fromStrict what)
liftIO $ ncqStorageStop ncq
wait writer
pure $ maybe nil (mkSym . show . pretty) href pure $ maybe nil (mkSym . show . pretty) href
e -> throwIO $ BadFormException @C (mkList e) e -> throwIO $ BadFormException @C (mkList e)
entry $ bindMatch "test:ncq:raw:get" $ nil_ \case
[StringLike fn, HashLike href] -> lift $ withNCQ id fn $ \ncq -> do
mbs <- ncqStorageGet ncq href
maybe1 mbs exitFailure LBS.putStr
e -> throwIO $ BadFormException @C (mkList e)
entry $ bindMatch "test:ncq:raw:merkle:write" $ nil_ \case entry $ bindMatch "test:ncq:raw:merkle:write" $ nil_ \case
[StringLike fn, StringLike what] -> liftIO $ flip runContT pure do [StringLike fn, StringLike what] -> liftIO $ flip runContT pure do
@ -530,6 +509,13 @@ main = do
e -> throwIO $ BadFormException @C (mkList e) e -> throwIO $ BadFormException @C (mkList e)
entry $ bindMatch "test:ncq:run" $ nil_ \case
[StringLike p] -> lift do
withNCQ id p $ \_ -> do
display_ $ "hello from ncq" <+> pretty p
e -> throwIO $ BadFormException @C (mkList e)
setupLogger setupLogger