From 0c50d1cc984cac83a42d85625016616d46f0254a Mon Sep 17 00:00:00 2001 From: voidlizard Date: Tue, 24 Dec 2024 11:44:23 +0300 Subject: [PATCH] wip --- hbs2-git3/app/Main.hs | 111 ++++++---------------- hbs2-git3/hbs2-git3.cabal | 3 + hbs2-git3/lib/HBS2/Data/Log/Structured.hs | 74 +++++++++++++++ 3 files changed, 105 insertions(+), 83 deletions(-) create mode 100644 hbs2-git3/lib/HBS2/Data/Log/Structured.hs diff --git a/hbs2-git3/app/Main.hs b/hbs2-git3/app/Main.hs index 565c36ac..88098a81 100644 --- a/hbs2-git3/app/Main.hs +++ b/hbs2-git3/app/Main.hs @@ -31,6 +31,9 @@ import HBS2.Peer.RPC.Client.StorageClient import HBS2.CLI.Run.Internal.Merkle (getTreeContents) +-- move to a sepatate library +import HBS2.Data.Log.Structured + import HBS2.Git.Local import HBS2.Git.Local.CLI @@ -1899,14 +1902,14 @@ theDict = do fname <- headMay [ x | StringLike x <- argz] & orThrowUser "log file not set" file <- liftIO $ mmapFileByteString fname Nothing void $ runConsumeBS file $ readLogFileLBS () $ \h s lbs -> do - debug $ "object" <+> pretty h <+> pretty s + liftIO $ print $ "object" <+> pretty h <+> pretty s entry $ bindMatch "test:git:read-log-lbs" $ nil_ $ \syn -> lift do let (_, argz) = splitOpts [] syn fname <- headMay [ x | StringLike x <- argz] & orThrowUser "log file not set" theLog <- liftIO $ LBS.readFile fname void $ runConsumeLBS theLog $ readLogFileLBS () $ \h s lbs -> do - debug $ "object" <+> pretty h + liftIO $ print $ "object" <+> pretty h <+> pretty s entry $ bindMatch "test:git:log:index:naive:dump" $ nil_ $ \syn -> lift do let (_, argz) = splitOpts [] syn @@ -2129,99 +2132,41 @@ theDict = do pure (not already) -- && not alsoInIdx) - flip runContT pure do - tnum' <- getNumCapabilities + liftIO $ flip runContT pure do - let tnum = if total < 100 then 0 else max 0 (floor (logBase 2 (realToFrac total)) - 1) + sourceQ <- newTBQueueIO 1000 - liftIO $ print $ red "TNUM" <+> pretty tnum <+> pretty total + theReader <- ContT $ withGitCat - queues <- replicateM (tnum+1) 
newTQueueIO <&> Vector.fromList + seed <- randomIO @Word16 + logFile <- ContT $ withBinaryFile (show $ "export-" <> pretty seed <> ".log") AppendMode - feeder <- ContT $ withAsync do - let balanced = zip (cycle [0..tnum]) r - for_ balanced $ \(i,c) -> atomically $ writeTQueue (queues ! i) (Just c) - atomically $ for_ queues (`writeTQueue` Nothing) + l <- lift $ async $ writeSections (atomically (readTBQueue sourceQ)) \output -> do + liftIO $ LBS.hPutStr logFile output - workers <- liftIO $ for [0..tnum] $ \i -> async $ flip runContT pure do + link l - theReader <- ContT $ withGitCat - void $ ContT $ bracket none (const $ stopProcess theReader) + for_ r $ \commit -> do + hashes <- gitReadTreeObjectsOnly commit + <&> (commit:) + >>= filterM notWrittenYet - liftIO do - fix \loop -> flip runContT pure do - suff <- liftIO $ randomIO @Word32 - ofile <- ContT $ withBinaryFile (show $ pretty "export-" <> pretty suff <>".log") AppendMode - fix \loop2 -> do - atomically (readTQueue (queues ! i)) >>= \case - Nothing -> none - Just commit -> do - debug $ "write commit and shit" <+> pretty commit + for_ hashes $ \gh -> do + (_t,lbs) <- gitReadObjectMaybe theReader gh + >>= orThrow (GitReadError (show $ pretty gh)) - hashes <- gitReadTreeObjectsOnly commit - <&> (commit:) - >>= filterM notWrittenYet + let section = [ Builder.byteString (coerce gh) + , Builder.lazyByteString lbs + ] & Builder.toLazyByteString . 
mconcat - for_ hashes $ \gh -> do + atomically do + modifyTVar _already (HS.insert gh) + writeTBQueue sourceQ (Just section) - (_t,lbs) <- gitReadObjectMaybe theReader gh - >>= orThrow (GitReadError (show $ pretty gh)) + atomically $ writeTBQueue sourceQ Nothing - let kbs = coerce @_ @BS.ByteString gh - let keySize = BS.length kbs - - let objectSize = LBS.length lbs & fromIntegral - let entrySize = fromIntegral $ keySize + objectSize - - let entry = mconcat [ Builder.word32BE entrySize - , Builder.byteString kbs - , Builder.lazyByteString lbs - ] - - atomically $ modifyTVar _already (HS.insert gh) - liftIO $ LBS.hPutStr ofile ( Builder.toLazyByteString entry ) - - loop2 - - mapM_ wait (feeder:workers) - - - -- let chunks = chunksOf (total `div` tnum) r - - -- liftIO $ forConcurrently_ chunks $ \chunk -> flip runContT pure do - - -- suff <- liftIO $ randomIO @Word32 - -- theReader <- ContT $ withGitCat - -- ofile <- ContT $ withBinaryFile (show $ pretty "export-" <> pretty suff <>".log") AppendMode - - -- void $ ContT $ bracket none (const $ stopProcess theReader) - -- for_ chunk $ \commit -> do - - -- hashes <- gitReadTreeObjectsOnly commit - -- <&> (commit:) - -- >>= filterM notWrittenYet - - -- for_ hashes $ \gh -> do - - -- (_t,lbs) <- gitReadObjectMaybe theReader gh - -- >>= orThrow (GitReadError (show $ pretty gh)) - - -- let kbs = coerce @_ @BS.ByteString gh - -- let keySize = BS.length kbs - - -- -- debug $ pretty gh <+> pretty keySize - - -- let objectSize = LBS.length lbs & fromIntegral - -- let entrySize = fromIntegral $ keySize + objectSize - -- let entry = LBS.toStrict $ Builder.toLazyByteString $ Builder.word32BE entrySize - - -- liftIO do - -- atomically $ modifyTVar _already (HS.insert gh) - -- debug $ "entry size" <+> pretty (BS.length entry) <+> pretty gh <+> pretty entrySize - -- BS.hPutStr ofile entry - -- BS.hPutStr ofile kbs - -- LBS.hPutStr ofile lbs + wait l linearSearchLBS hash lbs = do diff --git a/hbs2-git3/hbs2-git3.cabal 
b/hbs2-git3/hbs2-git3.cabal index ac4cb07c..e8a26432 100644 --- a/hbs2-git3/hbs2-git3.cabal +++ b/hbs2-git3/hbs2-git3.cabal @@ -108,6 +108,7 @@ common shared-properties , unix , uuid , vector-algorithms + , zstd library @@ -121,6 +122,8 @@ library HBS2.Git3.State.Direct HBS2.Git3.Config.Local + HBS2.Data.Log.Structured + build-depends: base , base16-bytestring , binary
diff --git a/hbs2-git3/lib/HBS2/Data/Log/Structured.hs b/hbs2-git3/lib/HBS2/Data/Log/Structured.hs
new file mode 100644
index 00000000..d4e91beb
--- /dev/null
+++ b/hbs2-git3/lib/HBS2/Data/Log/Structured.hs
@@ -0,0 +1,111 @@
+-- |
+-- Module      : HBS2.Data.Log.Structured
+--
+-- Helpers for writing a stream of length-prefixed sections and for
+-- driving a streaming zstd 'Result' from a pull-style source.
+module HBS2.Data.Log.Structured where
+
+import HBS2.Prelude.Plated
+
+import Data.ByteString.Lazy (ByteString)
+import Data.ByteString.Lazy qualified as LBS
+import Data.ByteString qualified as BS
+import Data.ByteString.Builder qualified as B
+import Network.ByteOrder hiding (ByteString)
+
+import Codec.Compression.Zstd.Streaming qualified as Zstd
+import Codec.Compression.Zstd qualified as Zstd
+import Codec.Compression.Zstd.Streaming (Result(..))
+
+import Control.Exception
+
+-- import UnliftIO
+
+-- | Raised by 'writeSection' when a payload does not fit the 32-bit length
+-- prefix; carries the payload length in bytes.
+newtype SectionWriteError =
+  SectionTooLarge Integer
+  deriving stock (Show)
+
+instance Exception SectionWriteError
+
+-- | Write one section: the payload length encoded with 'bytestring32',
+-- immediately followed by the payload, passed to @output@ in a single call.
+--
+-- The prefix is 32 bits wide, so a payload longer than @0xFFFFFFFF@ bytes
+-- cannot be described by it. Such a payload is rejected with
+-- 'SectionTooLarge' instead of being written with a wrapped-around length.
+writeSection :: forall m . Monad m
+             => ByteString
+             -> ( ByteString -> m () )
+             -> m ()
+
+writeSection bs output
+  | payloadSize > 0xFFFFFFFF = throw (SectionTooLarge (toInteger payloadSize))
+  | otherwise = output (B.toLazyByteString section)
+  where
+    payloadSize = LBS.length bs
+    sizePrefix  = bytestring32 (fromIntegral payloadSize)
+    section     = B.byteString sizePrefix <> B.lazyByteString bs
+
+
+-- | Pull payloads from @source@ until it yields 'Nothing', writing each one
+-- to @sink@ with 'writeSection'.
+writeSections :: forall m . Monad m
+              => m (Maybe ByteString)
+              -> ( ByteString -> m () )
+              -> m ()
+
+writeSections source sink = fix \next -> do
+    source >>= maybe none (\bs -> writeSection bs sink >> next)
+
+
+-- | Thrown by 'writeCompressedStreamZstd' when the stream reports 'Error'.
+data CompressedStreamError =
+  CompressedStreamWriteError
+  deriving stock (Typeable,Show)
+
+instance Exception CompressedStreamError
+
+-- | Drive a streaming zstd 'Result' until it reports 'Done'.
+--
+-- Input is pulled from @source@ one lazy 'ByteString' at a time and fed to
+-- the stream chunk by chunk; every block the stream emits ('Produce' and the
+-- final 'Done') is handed to @sink@. Once @source@ yields 'Nothing' the
+-- stream is fed an empty chunk (presumably the end-of-input signal; see the
+-- zstd package documentation).
+--
+-- Throws 'CompressedStreamWriteError' when the stream reports 'Error'.
+-- NOTE(review): both fields of 'Error' are dropped, so the cause is lost.
+writeCompressedStreamZstd :: forall m . MonadIO m
+                          => Result
+                          -> m (Maybe ByteString)
+                          -> ( ByteString -> m () )
+                          -> m ()
+writeCompressedStreamZstd stream source sink = do
+
+  flip fix (mempty,stream) $ \next -> \case
+    (_, Done s) -> sink (LBS.fromStrict s)
+
+    (_,Error _ _)  -> liftIO (throwIO CompressedStreamWriteError)
+
+    -- keep the not-yet-consumed chunks while draining produced output
+    (pending, Produce s continue) -> do
+      sink (LBS.fromStrict s)
+      c <- liftIO continue
+      next (pending, c)
+
+    -- nothing buffered: ask the source for the next piece
+    ([], w@(Consume consume)) -> do
+      source >>= \case
+        Just piece -> do
+          next (LBS.toChunks piece, w)
+
+        Nothing -> do
+          c <- liftIO (consume mempty)
+          next ([], c)
+
+    (x:xs, Consume consume) -> do
+      c <- liftIO (consume x)
+      next (xs, c)
+