This commit is contained in:
voidlizard 2024-12-24 11:44:23 +03:00
parent 7f344a7f72
commit 0c50d1cc98
3 changed files with 105 additions and 83 deletions

View File

@ -31,6 +31,9 @@ import HBS2.Peer.RPC.Client.StorageClient
import HBS2.CLI.Run.Internal.Merkle (getTreeContents) import HBS2.CLI.Run.Internal.Merkle (getTreeContents)
-- move to a sepatate library
import HBS2.Data.Log.Structured
import HBS2.Git.Local import HBS2.Git.Local
import HBS2.Git.Local.CLI import HBS2.Git.Local.CLI
@ -1899,14 +1902,14 @@ theDict = do
fname <- headMay [ x | StringLike x <- argz] & orThrowUser "log file not set" fname <- headMay [ x | StringLike x <- argz] & orThrowUser "log file not set"
file <- liftIO $ mmapFileByteString fname Nothing file <- liftIO $ mmapFileByteString fname Nothing
void $ runConsumeBS file $ readLogFileLBS () $ \h s lbs -> do void $ runConsumeBS file $ readLogFileLBS () $ \h s lbs -> do
debug $ "object" <+> pretty h <+> pretty s liftIO $ print $ "object" <+> pretty h <+> pretty s
entry $ bindMatch "test:git:read-log-lbs" $ nil_ $ \syn -> lift do entry $ bindMatch "test:git:read-log-lbs" $ nil_ $ \syn -> lift do
let (_, argz) = splitOpts [] syn let (_, argz) = splitOpts [] syn
fname <- headMay [ x | StringLike x <- argz] & orThrowUser "log file not set" fname <- headMay [ x | StringLike x <- argz] & orThrowUser "log file not set"
theLog <- liftIO $ LBS.readFile fname theLog <- liftIO $ LBS.readFile fname
void $ runConsumeLBS theLog $ readLogFileLBS () $ \h s lbs -> do void $ runConsumeLBS theLog $ readLogFileLBS () $ \h s lbs -> do
debug $ "object" <+> pretty h liftIO $ print $ "object" <+> pretty h <+> pretty s
entry $ bindMatch "test:git:log:index:naive:dump" $ nil_ $ \syn -> lift do entry $ bindMatch "test:git:log:index:naive:dump" $ nil_ $ \syn -> lift do
let (_, argz) = splitOpts [] syn let (_, argz) = splitOpts [] syn
@ -2129,99 +2132,41 @@ theDict = do
pure (not already) -- && not alsoInIdx) pure (not already) -- && not alsoInIdx)
flip runContT pure do
tnum' <- getNumCapabilities liftIO $ flip runContT pure do
let tnum = if total < 100 then 0 else max 0 (floor (logBase 2 (realToFrac total)) - 1) sourceQ <- newTBQueueIO 1000
liftIO $ print $ red "TNUM" <+> pretty tnum <+> pretty total theReader <- ContT $ withGitCat
queues <- replicateM (tnum+1) newTQueueIO <&> Vector.fromList seed <- randomIO @Word16
logFile <- ContT $ withBinaryFile (show $ "export-" <> pretty seed <> ".log") AppendMode
feeder <- ContT $ withAsync do l <- lift $ async $ writeSections (atomically (readTBQueue sourceQ)) \output -> do
let balanced = zip (cycle [0..tnum]) r liftIO $ LBS.hPutStr logFile output
for_ balanced $ \(i,c) -> atomically $ writeTQueue (queues ! i) (Just c)
atomically $ for_ queues (`writeTQueue` Nothing)
workers <- liftIO $ for [0..tnum] $ \i -> async $ flip runContT pure do link l
theReader <- ContT $ withGitCat for_ r $ \commit -> do
void $ ContT $ bracket none (const $ stopProcess theReader) hashes <- gitReadTreeObjectsOnly commit
<&> (commit:)
>>= filterM notWrittenYet
liftIO do for_ hashes $ \gh -> do
fix \loop -> flip runContT pure do (_t,lbs) <- gitReadObjectMaybe theReader gh
suff <- liftIO $ randomIO @Word32 >>= orThrow (GitReadError (show $ pretty gh))
ofile <- ContT $ withBinaryFile (show $ pretty "export-" <> pretty suff <>".log") AppendMode
fix \loop2 -> do
atomically (readTQueue (queues ! i)) >>= \case
Nothing -> none
Just commit -> do
debug $ "write commit and shit" <+> pretty commit
hashes <- gitReadTreeObjectsOnly commit let section = [ Builder.byteString (coerce gh)
<&> (commit:) , Builder.lazyByteString lbs
>>= filterM notWrittenYet ] & Builder.toLazyByteString . mconcat
for_ hashes $ \gh -> do atomically do
modifyTVar _already (HS.insert gh)
writeTBQueue sourceQ (Just section)
(_t,lbs) <- gitReadObjectMaybe theReader gh atomically $ writeTBQueue sourceQ Nothing
>>= orThrow (GitReadError (show $ pretty gh))
let kbs = coerce @_ @BS.ByteString gh wait l
let keySize = BS.length kbs
let objectSize = LBS.length lbs & fromIntegral
let entrySize = fromIntegral $ keySize + objectSize
let entry = mconcat [ Builder.word32BE entrySize
, Builder.byteString kbs
, Builder.lazyByteString lbs
]
atomically $ modifyTVar _already (HS.insert gh)
liftIO $ LBS.hPutStr ofile ( Builder.toLazyByteString entry )
loop2
mapM_ wait (feeder:workers)
-- let chunks = chunksOf (total `div` tnum) r
-- liftIO $ forConcurrently_ chunks $ \chunk -> flip runContT pure do
-- suff <- liftIO $ randomIO @Word32
-- theReader <- ContT $ withGitCat
-- ofile <- ContT $ withBinaryFile (show $ pretty "export-" <> pretty suff <>".log") AppendMode
-- void $ ContT $ bracket none (const $ stopProcess theReader)
-- for_ chunk $ \commit -> do
-- hashes <- gitReadTreeObjectsOnly commit
-- <&> (commit:)
-- >>= filterM notWrittenYet
-- for_ hashes $ \gh -> do
-- (_t,lbs) <- gitReadObjectMaybe theReader gh
-- >>= orThrow (GitReadError (show $ pretty gh))
-- let kbs = coerce @_ @BS.ByteString gh
-- let keySize = BS.length kbs
-- -- debug $ pretty gh <+> pretty keySize
-- let objectSize = LBS.length lbs & fromIntegral
-- let entrySize = fromIntegral $ keySize + objectSize
-- let entry = LBS.toStrict $ Builder.toLazyByteString $ Builder.word32BE entrySize
-- liftIO do
-- atomically $ modifyTVar _already (HS.insert gh)
-- debug $ "entry size" <+> pretty (BS.length entry) <+> pretty gh <+> pretty entrySize
-- BS.hPutStr ofile entry
-- BS.hPutStr ofile kbs
-- LBS.hPutStr ofile lbs
linearSearchLBS hash lbs = do linearSearchLBS hash lbs = do

View File

@ -108,6 +108,7 @@ common shared-properties
, unix , unix
, uuid , uuid
, vector-algorithms , vector-algorithms
, zstd
library library
@ -121,6 +122,8 @@ library
HBS2.Git3.State.Direct HBS2.Git3.State.Direct
HBS2.Git3.Config.Local HBS2.Git3.Config.Local
HBS2.Data.Log.Structured
build-depends: base build-depends: base
, base16-bytestring , base16-bytestring
, binary , binary

View File

@ -0,0 +1,74 @@
module HBS2.Data.Log.Structured where
import HBS2.Prelude.Plated
import Data.ByteString.Lazy (ByteString)
import Data.ByteString.Lazy qualified as LBS
import Data.ByteString qualified as BS
import Data.ByteString.Builder qualified as B
import Network.ByteOrder hiding (ByteString)
import Codec.Compression.Zstd.Streaming qualified as Zstd
import Codec.Compression.Zstd qualified as Zstd
import Codec.Compression.Zstd.Streaming (Result(..))
import Control.Exception
-- import UnliftIO
writeSection :: forall m . Monad m
=> ByteString
-> ( ByteString -> m () )
-> m ()
writeSection bs output = do
let bssize = bytestring32 (fromIntegral $ LBS.length bs)
let section = B.byteString bssize <> B.lazyByteString bs
output (B.toLazyByteString section)
writeSections :: forall m . Monad m
=> m (Maybe ByteString)
-> ( ByteString -> m () )
-> m ()
writeSections source sink = fix \next -> do
source >>= maybe none (\bs -> writeSection bs sink >> next)
data CompressedStreamError =
CompressedStreamWriteError
deriving stock (Typeable,Show)
instance Exception CompressedStreamError
writeCompressedStreamZstd :: forall m . MonadIO m
=> Result
-> m (Maybe ByteString)
-> ( ByteString -> m () )
-> m ()
writeCompressedStreamZstd stream source sink = do
flip fix (mempty,stream) $ \next -> \case
(_, Done s) -> sink (LBS.fromStrict s)
(_,Error _ _) -> liftIO (throwIO CompressedStreamWriteError)
(some, Produce s continue) -> do
sink (LBS.fromStrict s)
c <- liftIO continue
next (some, c)
([], w@(Consume consume)) -> do
source >>= \case
Just piece -> do
next (LBS.toChunks piece, w)
Nothing -> do
c <- liftIO (consume mempty)
next ([], c)
(x:xs, Consume consume) -> do
c <- liftIO (consume x)
next (xs, c)