From 90b9204e58fe7124513d1f30822ff0f32773cf8d Mon Sep 17 00:00:00 2001 From: Dmitry Zuykov Date: Fri, 16 May 2025 05:52:43 +0300 Subject: [PATCH] wip, ncq fixed races --- hbs2-storage-ncq/lib/HBS2/Storage/NCQ.hs | 184 +++++--- hbs2-tests/hbs2-tests.cabal | 21 - hbs2-tests/test/TestCQ.hs | 556 ----------------------- 3 files changed, 111 insertions(+), 650 deletions(-) delete mode 100644 hbs2-tests/test/TestCQ.hs diff --git a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ.hs b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ.hs index 29d61526..d4c68180 100644 --- a/hbs2-storage-ncq/lib/HBS2/Storage/NCQ.hs +++ b/hbs2-storage-ncq/lib/HBS2/Storage/NCQ.hs @@ -34,6 +34,8 @@ import Data.HashPSQ qualified as HPSQ import Data.HashPSQ (HashPSQ) import Data.IntMap qualified as IntMap import Data.IntMap (IntMap) +import Data.IntSet qualified as IntSet +import Data.IntSet (IntSet) import Data.Sequence as Seq import Data.List qualified as List import Data.ByteString.Lazy qualified as LBS @@ -88,6 +90,8 @@ data NCQStorageException = NCQStorageAlreadyExist String | NCQStorageSeedMissed | NCQStorageTimeout + | NCQStorageCurrentAlreadyOpen + | NCQStorageCantOpenCurrent deriving stock (Show,Typeable) instance Exception NCQStorageException @@ -124,6 +128,10 @@ data WQItem = , wqData :: Maybe LBS.ByteString } +newtype RFd = RFd { unRfd :: Fd } + +newtype WFd = WFd { unWfd :: Fd } + data NCQStorage = NCQStorage { ncqRoot :: FilePath @@ -134,14 +142,14 @@ data NCQStorage = , ncqMaxCached :: Int , ncqSalt :: HashRef , ncqWriteQueue :: TVar (HashPSQ HashRef TimeSpec WQItem) - , ncqWaitIndex :: TVar (HashPSQ HashRef TimeSpec (Word64,Word64)) + , ncqStaged :: TVar (IntMap (HashPSQ HashRef TimeSpec (Word64,Word64))) + , ncqIndexed :: TVar IntSet , ncqIndexNow :: TVar Int , ncqTrackedFiles :: TVar (HashPSQ FileKey FilePrio (Maybe CachedEntry)) , ncqCachedEntries :: TVar Int , ncqNotWritten :: TVar Word64 , ncqLastWritten :: TVar TimeSpec - , ncqCurrentHandleW :: TVar Fd - , ncqCurrentHandleR :: TVar Fd + , ncqCurrentFd :: TVar (Maybe (RFd,WFd)) , ncqCurrentUsage :: TVar (IntMap Int) , ncqCurrentReadReq :: TVar (Seq (Fd, Word64, Word64, TMVar ByteString)) , ncqFlushNow :: TVar [TQueue ()] @@ -152,14 +160,14 @@ data NCQStorage = data Location = InWriteQueue WQItem - | InCurrent (Word64, Word64) + | InCurrent (Fd,Word64, Word64) | InFossil CachedEntry (Word64, Word64) instance Pretty Location where pretty = \case - InWriteQueue{} -> "write-queue" - InCurrent (o,l) -> pretty $ mkForm @C "current" [mkInt o, mkInt l] - InFossil _ (o,l) -> pretty $ mkForm @C "fossil " [mkInt o, mkInt l] + InWriteQueue{} -> "write-queue" + InCurrent (fd,o,l) -> pretty $ mkForm @C "current" [mkInt fd, mkInt o, mkInt l] + InFossil _ (o,l) -> pretty $ mkForm @C "fossil " [mkInt o, mkInt l] type IsHCQKey h = ( Eq (Key h) , Hashable (Key h) @@ -311,8 +319,7 @@ ncqStorageRun ncq@NCQStorage{..} = flip runContT pure do indexQ <- newTQueueIO ContT $ bracket none $ const $ liftIO do - -- writeJournal syncData - readTVarIO ncqCurrentHandleW >>= closeFd + ncqFinalize ncq debug "RUNNING STORAGE!" @@ -407,10 +414,11 @@ ncqStorageRun ncq@NCQStorage{..} = flip runContT pure do (key, _) <- ncqIndexFile ncq fn <&> over _2 HS.fromList ncqAddTrackedFilesIO ncq [key] + ncqLoadSomeIndexes ncq [fromString key] + atomically do modifyTVar ncqCurrentUsage (IntMap.adjust pred (fromIntegral fd)) - - ncqLoadSomeIndexes ncq [fromString key] + modifyTVar ncqIndexed (IntSet.insert (fromIntegral fd)) down <- atomically do writerDown <- pollSTM w <&> isJust @@ -422,12 +430,10 @@ ncqStorageRun ncq@NCQStorage{..} = flip runContT pure do link indexer pure indexer - writeJournal indexQ syncData = liftIO do + writeJournal indexQ syncData = ncqWithCurrent ncq $ \(RFd fdr, WFd fh) -> liftIO do trace $ "writeJournal" <+> pretty syncData - fh <- readTVarIO ncqCurrentHandleW - fdSeek fh SeekFromEnd 0 initQ <- readTVarIO ncqWriteQueue @@ -462,11 +468,12 @@ ncqStorageRun ncq@NCQStorage{..} = flip runContT pure do fileSynchronise fh size <- fdSeek fh SeekFromEnd 0 + writeBinaryFileDurable (ncqGetCurrentSizeName ncq) (N.bytestring64 (fromIntegral size)) now1 <- getTimeCoarse atomically do q0 <- readTVar ncqWriteQueue - w0 <- readTVar ncqWaitIndex + w0 <- readTVar ncqStaged <&> fromMaybe HPSQ.empty . IntMap.lookup (fromIntegral fdr) b0 <- readTVar ncqNotWritten wbytes <- newTVar 0 @@ -479,24 +486,20 @@ ncqStorageRun ncq@NCQStorage{..} = flip runContT pure do next (HPSQ.delete h q, HPSQ.insert h now1 (o,l) w,xs) writeTVar ncqWriteQueue rq - writeTVar ncqWaitIndex rw + modifyTVar ncqStaged (IntMap.insert (fromIntegral fdr) rw) bw <- readTVar wbytes writeTVar ncqNotWritten (max 0 (b0 - bw)) - writeBinaryFileDurable (ncqGetCurrentSizeName ncq) (N.bytestring64 (fromIntegral size)) - indexNow <- readTVarIO ncqIndexNow when (fromIntegral size >= ncqMinLog || indexNow > 0) do - fsize <- readTVarIO ncqCurrentHandleR - >>= getFdStatus - <&> PFS.fileSize + fsize <- getFdStatus fdr <&> PFS.fileSize unless (fsize == 0) do (n,u) <- atomically do - r <- readTVar ncqCurrentHandleR <&> fromIntegral + let r = fromIntegral fdr u <- readTVar ncqCurrentUsage <&> fromMaybe 0 . IntMap.lookup r pure (fromIntegral @_ @Word32 r, u) @@ -510,37 +513,42 @@ ncqStorageRun ncq@NCQStorage{..} = flip runContT pure do atomically do writeTVar ncqIndexNow 0 - r <- readTVar ncqCurrentHandleR -- NOTE: extra-use -- добавляем лишний 1 для индексации. -- исходный файл закрываем, только когда проиндексировано. -- то есть должны отнять 1 после индексации. - modifyTVar ncqCurrentUsage (IntMap.insertWith (+) (fromIntegral r) 1) - writeTQueue indexQ (r, fossilized) + modifyTVar ncqCurrentUsage (IntMap.insertWith (+) (fromIntegral fdr) 1) + writeTQueue indexQ (fdr, fossilized) - let flags = defaultFileFlags { exclusive = True } - - touch current + closeFd fh writeBinaryFileDurable (ncqGetCurrentSizeName ncq) (N.bytestring64 0) - - liftIO (PosixBase.openFd current Posix.ReadWrite flags) - >>= atomically . writeTVar ncqCurrentHandleW - - liftIO (PosixBase.openFd current Posix.ReadWrite flags) - >>= atomically . writeTVar ncqCurrentHandleR + ncqOpenCurrent ncq debug $ "TRUNCATED, moved to" <+> pretty fossilized - toClose <- atomically do - w <- readTVar ncqCurrentUsage <&> IntMap.toList - let (alive,dead) = List.partition( (>0) . snd) w - writeTVar ncqCurrentUsage (IntMap.fromList alive) - pure dead - for_ toClose $ \(f,_) -> do - when (f > 0) do - debug $ "CLOSE FD" <+> pretty f - Posix.closeFd (fromIntegral f) + toClose <- atomically do + usage <- readTVar ncqCurrentUsage + staged <- readTVar ncqStaged + indexed <- readTVar ncqIndexed + + let (alive, dead) = List.partition (\(_, u) -> u > 0) (IntMap.toList usage) + + let closable = do + (f, _) <- dead + guard (IntSet.member f indexed) + guard (maybe True HPSQ.null (IntMap.lookup f staged)) + pure f + + writeTVar ncqCurrentUsage (IntMap.fromList alive) + writeTVar ncqIndexed (indexed `IntSet.difference` IntSet.fromList closable) + writeTVar ncqStaged (foldr IntMap.delete staged closable) + + pure closable + + for_ toClose $ \f -> do + debug $ "CLOSE FD" <+> pretty f + closeFd (fromIntegral f) -- ncqStoragePut_ :: MonadUnliftIO m @@ -614,7 +622,7 @@ ncqTombPrefix = "T;;\x00" ncqLocatedSize :: Location -> Integer ncqLocatedSize = \case InWriteQueue WQItem{..} -> fromIntegral $ maybe 0 LBS.length wqData - InCurrent (_,s) -> fromIntegral s + InCurrent (_,_,s) -> fromIntegral s InFossil _ (_,s) -> fromIntegral s evictIfNeededSTM :: NCQStorage -> Maybe Int -> STM () @@ -646,18 +654,23 @@ evictIfNeededSTM NCQStorage{..} howMany = do ncqLocate :: MonadIO m => NCQStorage -> HashRef -> m (Maybe Location) ncqLocate ncq@NCQStorage{..} h = flip runContT pure $ callCC \exit -> do - l1 <- atomically do - inQ <- readTVar ncqWriteQueue - <&> (fmap snd . HPSQ.lookup h) - <&> \case - Just wq -> Just (InWriteQueue wq) - _ -> Nothing + inQ <- atomically $ readTVar ncqWriteQueue + <&> (fmap snd . HPSQ.lookup h) + <&> \case + Just wq -> Just (InWriteQueue wq) + _ -> Nothing - inC <- readTVar ncqWaitIndex <&> (fmap snd . HPSQ.lookup h) <&> fmap InCurrent - pure (inQ <|> inC) + for_ inQ $ exit . Just - for_ l1 $ exit . Just + inC <- atomically $ do + s <- readTVar ncqStaged <&> IntMap.toList + let found = lastMay $ catMaybes [ (fd,) <$> HPSQ.lookup h hpsq | (fd, hpsq) <- s ] + case found of + Just (f, (_,(off,size))) -> pure (Just (InCurrent (fromIntegral f,off,size))) + Nothing -> pure Nothing + + for_ inC $ exit . Just now <- getTimeCoarse tracked <- readTVarIO ncqTrackedFiles @@ -738,14 +751,13 @@ ncqStorageGet ncq h = runMaybeT do lift (ncqStorageGet_ ncq location) >>= toMPlus ncqStorageGet_ :: MonadUnliftIO m => NCQStorage -> Location -> m (Maybe LBS.ByteString) -ncqStorageGet_ NCQStorage{..} = \case +ncqStorageGet_ ncq@NCQStorage{..} = \case InWriteQueue WQItem{ wqData = Just lbs } -> do pure $ Just lbs - InCurrent (o,l) -> do + InCurrent (fd,o,l) -> do r <- atomically do a <- newEmptyTMVar - fd <- readTVar ncqCurrentHandleR modifyTVar ncqCurrentUsage (IntMap.insertWith (+) (fromIntegral fd) 1) modifyTVar ncqCurrentReadReq (|> (fd, o, l, a)) pure a @@ -798,8 +810,8 @@ ncqStorageDel ncq@NCQStorage{..} h = flip runContT pure $ callCC \exit -> do ncqLocate ncq h >>= atomically . \case Just (InFossil _ _) -> writeTombstone (WQItem False Nothing) - Just (InCurrent _) -> do - modifyTVar ncqWaitIndex (HPSQ.delete h) + Just (InCurrent (fd,_,_)) -> do + modifyTVar ncqStaged (IntMap.adjust (HPSQ.delete h) (fromIntegral fd)) writeTombstone (WQItem False Nothing) Just (InWriteQueue _) -> writeTombstone (WQItem True Nothing) @@ -877,7 +889,7 @@ ncqStorageOpen fp' = do where - readCurrent ncq@NCQStorage{..} = do + readCurrent ncq@NCQStorage{..} = ncqWithCurrent ncq \(RFd fd, _) -> do let fn = ncqGetCurrentName ncq -- liftIO $ print $ pretty "FILE" <+> pretty fn bs0 <- liftIO $ mmapFileByteString fn Nothing @@ -902,11 +914,33 @@ ncqStorageOpen fp' = do next (o+w+4, BS.drop (w+4) bs) - atomically $ writeTVar ncqWaitIndex (HPSQ.fromList items) + atomically $ modifyTVar ncqStaged (IntMap.insert (fromIntegral fd) (HPSQ.fromList items)) ncqStorageInit :: MonadUnliftIO m => FilePath -> m NCQStorage ncqStorageInit = ncqStorageInit_ True +ncqOpenCurrent :: MonadUnliftIO m => NCQStorage -> m () +ncqOpenCurrent ncq@NCQStorage{..} = do + let fp = ncqGetCurrentName ncq + touch fp + let flags = defaultFileFlags { exclusive = True } + fdw <- liftIO (PosixBase.openFd fp Posix.ReadWrite flags) <&> WFd + fdr <- liftIO (PosixBase.openFd fp Posix.ReadOnly flags) <&> RFd + atomically $ writeTVar ncqCurrentFd (Just (fdr, fdw)) + +ncqWithCurrent :: MonadUnliftIO m => NCQStorage -> ((RFd, WFd) -> m a) -> m a +ncqWithCurrent ncq@NCQStorage{..} action = do + flip fix 2 $ \next i -> do + readTVarIO ncqCurrentFd >>= \case + + Just a -> action a + + Nothing | i >= 0 -> do + ncqOpenCurrent ncq + next (pred i) + + Nothing -> do + throwIO NCQStorageCantOpenCurrent ncqStorageInit_ :: MonadUnliftIO m => Bool -> FilePath -> m NCQStorage ncqStorageInit_ check path = do @@ -956,7 +990,7 @@ ncqStorageInit_ check path = do ncqNotWritten <- newTVarIO 0 ncqLastWritten <- getTimeCoarse >>= newTVarIO - ncqWaitIndex <- newTVarIO HPSQ.empty + ncqStaged <- newTVarIO mempty ncqFlushNow <- newTVarIO mempty ncqOpenDone <- newEmptyTMVarIO @@ -966,6 +1000,8 @@ ncqStorageInit_ check path = do ncqTrackedFiles <- newTVarIO HPSQ.empty ncqCachedEntries <- newTVarIO 0 ncqIndexNow <- newTVarIO 0 + ncqCurrentFd <- newTVarIO Nothing + ncqIndexed <- newTVarIO mempty let currentName = ncqGetCurrentName_ path ncqGen @@ -974,8 +1010,6 @@ ncqStorageInit_ check path = do hereCurrent <- doesPathExist currentName when hereCurrent $ liftIO do - let ncqCurrentHandleW = undefined - let ncqCurrentHandleR = undefined let ncq0 = NCQStorage{..} lastSz <- try @_ @IOException (BS.readFile currentSize) @@ -994,19 +1028,10 @@ ncqStorageInit_ check path = do ncqWriteError ncq0 msg mv currentName fossilized - touch currentName - - let flags = defaultFileFlags { exclusive = True } - - ncqCurrentHandleW <- liftIO (PosixBase.openFd currentName Posix.ReadWrite flags) - >>= newTVarIO - - ncqCurrentHandleR <- liftIO (PosixBase.openFd currentName Posix.ReadOnly flags) - >>= newTVarIO - debug $ "currentFileName" <+> pretty (ncqGetCurrentName_ path ncqGen) let ncq = NCQStorage{..} + ncqOpenCurrent ncq pure ncq @@ -1016,6 +1041,19 @@ ncqStorageFlush = ncqStorageSync ncqIndexRightNow :: MonadUnliftIO m => NCQStorage -> m () ncqIndexRightNow NCQStorage{..} = atomically $ modifyTVar ncqIndexNow succ +ncqFinalize :: MonadUnliftIO m => NCQStorage -> m () +ncqFinalize NCQStorage{..} = do + + liftIO $ readTVarIO ncqStaged <&> IntMap.keys >>= mapM_ (closeFd . fromIntegral) + atomically (writeTVar ncqStaged mempty) + + readTVarIO ncqCurrentFd >>= \case + Just (RFd _, WFd wfd) -> do + liftIO (closeFd wfd) + atomically (writeTVar ncqCurrentFd Nothing) + + _ -> none + withNCQ :: forall m a . MonadUnliftIO m => (NCQStorage -> NCQStorage) -> FilePath diff --git a/hbs2-tests/hbs2-tests.cabal b/hbs2-tests/hbs2-tests.cabal index 618a3f5d..f603c264 100644 --- a/hbs2-tests/hbs2-tests.cabal +++ b/hbs2-tests/hbs2-tests.cabal @@ -1179,27 +1179,6 @@ executable test-scripts , zstd -executable test-cq-storage - import: shared-properties - import: common-deps - default-language: Haskell2010 - ghc-options: - hs-source-dirs: test - main-is: TestCQ.hs - build-depends: - base, hbs2-core, hbs2-log-structured, hbs2-storage-ncq - , network - , string-conversions - , db-pipe - , suckless-conf - , network-byte-order - , text - , time - , mmap - , zstd - , unix - - executable tcq import: shared-properties import: common-deps diff --git a/hbs2-tests/test/TestCQ.hs b/hbs2-tests/test/TestCQ.hs deleted file mode 100644 index 90f16880..00000000 --- a/hbs2-tests/test/TestCQ.hs +++ /dev/null @@ -1,556 +0,0 @@ -{-# Language AllowAmbiguousTypes #-} -{-# Language UndecidableInstances #-} -{-# Language MultiWayIf #-} -{-# Language RecordWildCards #-} -module Main where - -import HBS2.Prelude.Plated -import HBS2.OrDie -import HBS2.Hash -import HBS2.Data.Types.Refs -import HBS2.Clock -import HBS2.Merkle - -import HBS2.Storage -import HBS2.Storage.Simple -import HBS2.Storage.Operations.ByteString - -import HBS2.System.Logger.Simple.ANSI - -import HBS2.Storage.NCQ -import HBS2.Data.Log.Structured.NCQ - - -import Data.Config.Suckless.Syntax -import Data.Config.Suckless.Script -import Data.Config.Suckless.System - -import DBPipe.SQLite hiding (field) - -import Data.Bits -import Data.ByteString (ByteString) -import Data.ByteString qualified as BS -import Data.ByteString.Lazy qualified as LBS -import Data.ByteString.Char8 qualified as BS8 -import Data.ByteString.Builder -import Data.Maybe -import Data.Word -import Data.List qualified as List -import Data.Vector qualified as V -import Data.Vector ((!)) -import Control.Monad.Trans.Cont -import Control.Monad.Trans.Maybe -import Network.ByteOrder qualified as N -import Data.Coerce -import Data.HashPSQ qualified as HPSQ -import Data.HashSet qualified as HS -import Data.HashMap.Strict (HashMap) -import Data.HashMap.Strict qualified as HM -import Data.IntMap qualified as IntMap -import Data.IntMap (IntMap) -import Data.Fixed -import System.Environment -import System.Directory -import System.Posix.Fcntl -import System.Posix.IO -import System.IO.MMap -import System.IO qualified as IO -import System.Exit (exitSuccess, exitFailure) -import System.Random -import Safe -import Lens.Micro.Platform -import Control.Concurrent.STM qualified as STM - -import UnliftIO - -import Text.InterpolatedString.Perl6 (qc) - -import Streaming.Prelude qualified as S -import System.TimeIt - -import System.IO.Unsafe (unsafePerformIO) - -setupLogger :: MonadIO m => m () -setupLogger = do - setLogging @DEBUG $ toStderr . logPrefix "[debug] " - setLogging @ERROR $ toStderr . logPrefix "[error] " - setLogging @WARN $ toStderr . logPrefix "[warn] " - setLogging @NOTICE $ toStdout . logPrefix "" - -flushLoggers :: MonadIO m => m () -flushLoggers = do - silence - -silence :: MonadIO m => m () -silence = do - setLoggingOff @DEBUG - setLoggingOff @ERROR - setLoggingOff @WARN - setLoggingOff @NOTICE - setLoggingOff @TRACE - - - -main :: IO () -main = do - - let dict = makeDict @C do - - entry $ bindMatch "--help" $ nil_ \case - HelpEntryBound what -> helpEntry what - [StringLike s] -> helpList False (Just s) - _ -> helpList False Nothing - - internalEntries - - entry $ bindMatch "run" $ \case - [ StringLike what ] -> do - liftIO (readFile what) - <&> parseTop - >>= either (error.show) pure - >>= evalTop - - e -> throwIO $ BadFormException @C (mkList e) - - entry $ bindMatch "test:sqlite" $ nil_ $ \case - [StringLike fn] -> liftIO do - hashes <- readFile fn <&> mapMaybe (fromStringMay @HashRef) . lines - - let dbname = "jopakita.db" - rm dbname - newDb <- newDBPipeEnv dbPipeOptsDef dbname - - withDB newDb do - ddl [qc|CREATE TABLE kv (k BLOB PRIMARY KEY, v int)|] - - timeItNamed "sqlite -- test insert" do - withDB newDb $ transactional do - for_ hashes $ \h -> do - let k = coerce @_ @ByteString h - insert [qc|insert into kv (k,v) values(?,?)|] (k,0) - - replicateM_ 5 do - withDB newDb do - timeItNamed "sqlite -- select test" do - -- fn <- newTVarIO 0 - -- fns <- newTVarIO 0 - q <- newTQueueIO - for_ hashes $ \h -> do - let k = coerce @_ @ByteString h - - founds <- select [qc|select k,v from kv where k = ?|] (Only k) - - for_ founds $ \(s :: ByteString,n :: Int) -> do - atomically $ writeTQueue q (s,n) - - found <- atomically (STM.flushTQueue q) <&> List.length - liftIO $ IO.hPrint stderr $ "FOUND" <+> pretty found - - e -> throwIO $ BadFormException @C (mkList e) - - entry $ bindMatch "test:hashmap" $ nil_ $ \case - [StringLike fn] -> liftIO do - hashes <- readFile fn <&> mapMaybe (fromStringMay @HashRef) . lines - let hma = HM.fromList [(h,()) | h <- hashes ] - - replicateM_ 5 do - timeItNamed (show $ "HashMap lookup test" <+> pretty (List.length hashes)) do - q <- newTQueueIO - for_ hashes $ \h -> do - when (HM.member h hma) do - atomically $ writeTQueue q h - - n <- atomically ( STM.flushTQueue q) <&> List.length - liftIO $ print $ "FOUND" <+> pretty n - - e -> throwIO $ BadFormException @C (mkList e) - - - entry $ bindMatch "test:nway:scan" $ nil_ $ \case - [ StringLike fn ]-> liftIO do - (mmaped,meta@NWayHash{..}) <- nwayHashMMapReadOnly fn >>= orThrow (NWayHashInvalidMetaData fn) - let emptyKey = BS.replicate nwayKeySize 0 - nwayHashScanAll meta mmaped $ \o k v -> do - unless (k == emptyKey) do - liftIO $ print $ "scan:found" <+> fill 44 (pretty (coerce @_ @HashRef k)) <+> pretty o - - e -> throwIO $ BadFormException @C (mkList e) - - - entry $ bindMatch "test:nway:lookup" $ nil_ $ \case - - [ StringLike fn ] -> liftIO do - - hashes <- getContents <&> mapMaybe (fromStringMay @HashRef) . lines - - (mmaped, nw) <- nwayHashMMapReadOnly fn >>= orThrow (NWayHashInvalidMetaData fn) - - replicateM_ 5 do - timeItNamed (show $ "lookup:nway" <+> pretty (List.length hashes)) do - rQ <- newTQueueIO - - for_ hashes $ \h -> do - r <- nwayHashLookup nw mmaped (coerce @_ @ByteString h) - when (isJust r) do - atomically $ writeTQueue rQ (h,r) - - found <- atomically $ STM.flushTQueue rQ - liftIO $ print $ "FOUND" <+> pretty (List.length found) - - e -> throwIO $ BadFormException @C (mkList e) - - entry $ bindMatch "test:nway:stats" $ \case - [StringLike fn] -> liftIO do - - mt_ <- newTVarIO 0 - total_ <- newTVarIO 0 - - (mmaped,meta@NWayHash{..}) <- nwayHashMMapReadOnly fn >>= orThrow (NWayHashInvalidMetaData fn) - - let emptyKey = BS.replicate nwayKeySize 0 - nwayHashScanAll meta mmaped $ \o k v -> do - atomically do - modifyTVar total_ succ - when (k == emptyKey) do - modifyTVar mt_ succ - - mt <- readTVarIO mt_ - total <- readTVarIO total_ - let used = total - mt - - let ratio = realToFrac @_ @(Fixed E3) (realToFrac used / realToFrac total) - - let stats = mkForm @C "stats" [ mkForm "empty" [mkInt mt] - , mkForm "used" [mkInt used] - , mkForm "total" [mkInt total] - , mkForm "ratio" [mkDouble ratio] - ] - - pure $ mkList [mkForm "metadata" [mkSyntax meta], stats] - - e -> throwIO $ BadFormException @C (mkList e) - - entry $ bindMatch "test:nway:metadata" $ \case - [StringLike fn] -> liftIO do - (_, nw) <- nwayHashMMapReadOnly fn >>= orThrowUser "can't mmape file" - pure $ mkSyntax nw - - e -> throwIO $ BadFormException @C (mkList e) - - entry $ bindMatch "test:nway:write" $ nil_ $ \case - [StringLike fn] -> liftIO do - hashes <- getContents <&> mapMaybe (fromStringMay @HashRef) . lines - let items = [ (coerce @_ @ByteString x, N.bytestring64 0) | x <- hashes ] - nwayWriteBatch (nwayAllocDef 1.10 32 8 8) "." fn items - - e -> throwIO $ BadFormException @C (mkList e) - - entry $ bindMatch "test:ncq:index" $ \case - [ StringLike p, StringLike fsrc ]-> lift $ flip runContT pure do - - ncq <- lift $ ncqStorageOpen p - writer <- ContT $ withAsync $ ncqStorageRun ncq - link writer - - (fres,_) <- lift $ ncqIndexFile ncq fsrc - - pure $ mkSym fres - - e -> throwIO $ BadFormException @C (mkList e) - - entry $ bindMatch "test:ncq:raw:list-tracked-files" $ nil_ \case - [StringLike fn] -> lift $ withNCQ id fn $ \ncq -> do - ncqListTrackedFiles ncq >>= mapM_ display_ - - e -> throwIO $ BadFormException @C (mkList e) - - entry $ bindMatch "test:ncq:raw:get:stdout" $ nil_ \case - - [StringLike fn, HashLike h] -> lift $ withNCQ id fn $ \ncq -> do - w <- ncqStorageGet ncq h - maybe1 w exitFailure LBS.putStr - - e -> throwIO $ BadFormException @C (mkList e) - - entry $ bindMatch "test:ncq:has" $ \case - - [StringLike fn, HashLike h] -> liftIO $ flip runContT pure do - - ncq <- lift $ ncqStorageOpen fn - writer <- ContT $ withAsync $ ncqStorageRun ncq - link writer - - lift do - ncqStorageHasBlock ncq h >>= \case - Nothing -> pure nil - Just x -> pure $ mkInt x - - e -> throwIO $ BadFormException @C (mkList e) - - entry $ bindMatch "test:ncq:raw:up" $ nil_ $ \case - - [StringLike fn] -> liftIO $ flip runContT pure do - - ncq@NCQStorage{..} <- lift $ ncqStorageOpen fn - - writer <- ContT $ withAsync $ ncqStorageRun ncq - link writer - - trf <- readTVarIO ncqTrackedFiles <&> HPSQ.keys - - for_ trf $ \tf -> do - notice $ "tracked" <+> pretty tf - - e -> throwIO $ BadFormException @C (mkList e) - - entry $ bindMatch "test:ncq:raw" $ \case - [StringLike fn] -> liftIO $ flip runContT pure do - - debug "SHIT" - - ncq <- lift $ ncqStorageOpen fn - - writer <- ContT $ withAsync $ ncqStorageRun ncq - link writer - - h <- lift $ ncqStoragePut ncq "JOPAKITA!" - h2 <- lift $ ncqStoragePut ncq "PECHENTRESKI!" - - liftIO $ ncqStorageStop ncq - wait writer - - pure $ mkList [mkSym (show $ pretty h), mkSym (show $ pretty h2)] - - e -> throwIO $ BadFormException @C (mkList e) - - entry $ bindMatch "test:ncq:raw:list" $ nil_ \case - [StringLike p, StringLike f] -> liftIO $ flip runContT pure do - - ncq <- lift $ ncqStorageOpen p - - writer <- ContT $ withAsync $ ncqStorageRun ncq - link writer - - lift $ ncqStorageScanDataFile ncq f $ \o _ k v -> do - liftIO $ print $ pretty k -- <+> pretty o <+> pretty (BS.length v) - - e -> throwIO $ BadFormException @C (mkList e) - - entry $ bindMatch "test:ncq:raw:find-some" $ nil_ \case - [StringLike fn] -> liftIO $ flip runContT pure do - hashes <- liftIO $ getContents <&> mapMaybe (fromStringMay @HashRef) . lines - - ncq <- lift $ ncqStorageOpen fn - - writer <- ContT $ withAsync $ ncqStorageRun ncq - link writer - - liftIO $ for_ hashes $ \h -> runMaybeT do - what <- liftIO (ncqStorageHasBlock ncq h) >>= toMPlus - -- let h1 = hashObject @HbSync what - -- liftIO $ print $ "block" <+> pretty h <+> pretty h1 <+> pretty (LBS.length what) - liftIO $ print $ "block" <+> pretty h <+> pretty what -- (LBS.length what) - - e -> throwIO $ BadFormException @C (mkList e) - - entry $ bindMatch "test:ncq:raw:dump-some" $ nil_ \case - [StringLike fn] -> liftIO $ flip runContT pure do - hashes <- liftIO $ getContents <&> mapMaybe (fromStringMay @HashRef) . lines - - xdg <- liftIO $ getXdgDirectory XdgData "hbs2" <&> fromString @StoragePrefix - - s <- simpleStorageInit @HbSync (Just xdg) - - w <- ContT $ withAsync $ simpleStorageWorker s - link w - - let sto = AnyStorage s - - rm fn - dump <- openFile fn WriteMode - - for_ hashes $ \h -> runMaybeT do - blk <- getBlock sto (coerce h) >>= toMPlus - debug $ "read" <+> pretty (LBS.length blk) - none - -- liftIO $ LBS.hPut dump blk - - hClose dump - - e -> throwIO $ BadFormException @C (mkList e) - - entry $ bindMatch "test:ncq:raw:locate:one" $ nil_ \case - [StringLike fn, HashLike h] -> lift $ withNCQ id fn $ \ncq -> do - ncqLocate ncq h >>= \case - Nothing -> print $ pretty "not-found" <+> pretty h - Just l -> print $ pretty "found" <+> pretty h <+> pretty l - - e -> throwIO $ BadFormException @C (mkList e) - - entry $ bindMatch "test:ncq:raw:put:stdin" $ \case - [StringLike fn] -> lift $ withNCQ id fn $ \ncq -> do - what <- liftIO BS.getContents - href <- liftIO $ ncqStoragePut ncq (LBS.fromStrict what) - pure $ maybe nil (mkSym . show . pretty) href - - e -> throwIO $ BadFormException @C (mkList e) - - entry $ bindMatch "test:ncq:raw:get" $ nil_ \case - [StringLike fn, HashLike href] -> lift $ withNCQ id fn $ \ncq -> do - mbs <- ncqStorageGet ncq href - maybe1 mbs exitFailure LBS.putStr - - e -> throwIO $ BadFormException @C (mkList e) - - entry $ bindMatch "test:ncq:raw:merkle:write" $ nil_ \case - [StringLike fn, StringLike what] -> liftIO $ flip runContT pure do - - ncq <- lift $ ncqStorageOpen fn - - writer <- ContT $ withAsync $ ncqStorageRun ncq - link writer - - ContT $ bracket none $ const do - none - - lbs <- liftIO $ LBS.readFile what - - ta <- getTimeCoarse - - (t1,hashes) <- timeItT $ liftIO do - chu <- S.toList_ (readChunkedBS lbs (256*1024)) - forConcurrently chu $ \chunk -> do - ncqStoragePut ncq chunk >>= orThrowUser "can't save" - - tb <- getTimeCoarse - - notice $ "stored in" <+> pretty t1 - <+> pretty (realToFrac @_ @(Fixed E6) (realToFrac (toMicroSeconds (TimeoutTS (tb - ta))) / 1e6)) - - -- FIXME: handle-hardcode - let pt = toPTree (MaxSize 1024) (MaxNum 256) hashes -- FIXME: settings - - m <- makeMerkle 0 pt $ \(_,_,bss) -> liftIO do - void $ ncqStoragePut ncq bss >>= orThrowUser "can't save" - - liftIO $ print $ pretty m - - debug "stopping" - liftIO $ ncqStorageStop ncq - debug "stopping done" - - wait writer - - e -> throwIO $ BadFormException @C (mkList e) - - entry $ bindMatch "test:ncq:one-ref" $ nil_ $ \case - [StringLike fn] -> liftIO $ flip runContT pure do - - ncq <- lift $ ncqStorageOpen fn - - writer <- ContT $ withAsync $ ncqStorageRun ncq - link writer - - ContT $ bracket none $ const do - none - - none - - e -> throwIO $ BadFormException @C (mkList e) - - entry $ bindMatch "test:ncq:raw:write-some" $ nil_ \case - [StringLike fn] -> liftIO $ flip runContT pure do - - hashes <- liftIO $ getContents <&> mapMaybe (fromStringMay @HashRef) . lines - - xdg <- liftIO $ getXdgDirectory XdgData "hbs2" <&> fromString @StoragePrefix - - s <- simpleStorageInit @HbSync (Just xdg) - - w <- ContT $ withAsync $ simpleStorageWorker s - link w - - let sto = AnyStorage s - - ncq <- lift $ ncqStorageOpen fn - - writer <- ContT $ withAsync $ ncqStorageRun ncq - link writer - - ContT $ bracket none $ const do - none - - for_ hashes $ \h -> runMaybeT do - already <- liftIO (ncqStorageHasBlock ncq h <&> isJust) - guard (not already) - -- debug $ "write" <+> pretty h - blk <- getBlock sto (coerce h) >>= toMPlus - liftIO do - let l = LBS.length blk - -- print $ pretty h <+> pretty l - ncqStoragePut ncq blk - - warn "about to stop storage!" - liftIO $ ncqStorageStop ncq - - wait writer - - e -> throwIO $ BadFormException @C (mkList e) - - entry $ bindMatch "test:ncq:raw:del-some" $ nil_ \case - [StringLike fn] -> liftIO $ flip runContT pure do - - hashes <- liftIO $ getContents <&> mapMaybe (fromStringMay @HashRef) . lines - - ncq <- lift $ ncqStorageOpen fn - - writer <- ContT $ withAsync $ ncqStorageRun ncq - link writer - - ContT $ bracket none $ const do - none - - debug $ "TO DELETE" <+> pretty (length hashes) - - for_ hashes $ \h -> runMaybeT do - liftIO do - -- print $ "delete" <+> pretty h - ncqStorageDel ncq h - - liftIO $ ncqStorageStop ncq - - wait writer - - e -> throwIO $ BadFormException @C (mkList e) - - - entry $ bindMatch "test:ncq:index:now" $ nil_ \case - [StringLike p] -> lift do - withNCQ id p $ \ncq -> do - display_ $ "test:ncq:index:now" <+> pretty p - ncqIndexRightNow ncq - pause @'Seconds 10 - display_ $ "test:ncq:index:now" <+> pretty p <+> "done" - - e -> throwIO $ BadFormException @C (mkList e) - - entry $ bindMatch "test:ncq:run" $ nil_ \case - [StringLike p] -> lift do - withNCQ id p $ \_ -> do - display_ $ "hello from ncq" <+> pretty p - - e -> throwIO $ BadFormException @C (mkList e) - - - setupLogger - - argz <- liftIO getArgs - - forms <- parseTop (unlines $ unwords <$> splitForms argz) - & either (error.show) pure - - tvd <- newTVarIO dict - - (runEval tvd forms >>= eatNil display) - `finally` flushLoggers -