This commit is contained in:
voidlizard 2024-12-24 18:51:11 +03:00
parent 83bcba17ae
commit 3773c7857b
2 changed files with 102 additions and 47 deletions

View File

@ -2090,7 +2090,7 @@ theDict = do
for_ fnames $ \f -> do
theLog <- liftIO $ LBS.readFile f
void $ runConsumeLBS theLog $ readLogFileLBS () $ \h s lbs -> do
void $ runConsumeLBS (ZstdL.decompress theLog) $ readLogFileLBS () $ \h s lbs -> do
lift $ S.yield (coerce @_ @BS.ByteString h)
debug $ "object" <+> pretty h
@ -2102,6 +2102,20 @@ theDict = do
BS.hPutStr fh entrySize
BS.hPutStr fh ghs
-- "test:sqlite" command: takes a single string argument (the database file),
-- reads every hash from the githash table, looks each one up again by value
-- and prints the hashes for which that lookup returns at least one row.
-- Any other argument shape is rejected with BadFormException.
entry $ bindMatch "test:sqlite" $ nil_ $ \case
  [ StringLike fn ] -> lift do
    db <- newDBPipeEnv dbPipeOptsDef fn
    withDB db do
      rows <- select_ @_ @(Only Text) [qc|select hash from githash|]
      for_ rows $ \(Only h) -> do
        -- point query for the very hash we have just read from the table
        hits <- select @(Only Int) [qc|select 1 from githash where hash = ?|] (Only h)
        unless (L.null hits) do
          liftIO $ print $ pretty h
  _ -> throwIO (BadFormException @C nil)
entry $ bindMatch "test:git:export-commit-dfs" $ nil_ $ \syn -> lift do
let (opts, argz) = splitOpts [("--index",1)] syn
let hd = headDef "HEAD" [ x | StringLike x <- argz]
@ -2136,44 +2150,69 @@ theDict = do
liftIO $ flip runContT pure do
sourceQ <- newTBQueueIO 1000
tn <- getNumCapabilities
theReader <- ContT $ withGitCat
sourceQ <- newTBQueueIO (fromIntegral tn * 100)
seed <- randomIO @Word16
logFile <- ContT $ withBinaryFile (show $ "export-" <> pretty seed <> ".log") AppendMode
l <- lift $ async $ do
stream <- ZstdS.compress maxCLevel
q <- newTQueueIO
writeSections (atomically (readTBQueue sourceQ)) (atomically . writeTQueue q . Just)
atomically $ writeTQueue q Nothing
writeCompressedStreamZstd stream (atomically $ readTQueue q) $ \shit -> do
liftIO $ LBS.hPutStr logFile shit
zstd <- ZstdS.compress maxCLevel
flip fix zstd \jerk sn -> do
atomically (readTBQueue sourceQ) >>= \case
Nothing -> writeCompressedChunkZstd (LBS.hPutStr logFile) sn Nothing
Just s -> do
lbs <- S.toList_ (writeSection s $ S.yield) <&> mconcat
writeCompressedChunkZstd (LBS.hPutStr logFile) sn (Just lbs) >>= jerk
link l
for_ r $ \commit -> do
hashes <- gitReadTreeObjectsOnly commit
<&> (commit:)
>>= filterM notWrittenYet
-- Split the work list r into per-worker chunks. The chunk size is clamped to
-- at least 1: when total is smaller than tn (the capability count read above)
-- the integer division yields 0, and a zero chunk size is not usable
-- (Data.List.Split's chunksOf yields an infinite list of empty chunks for it,
-- assuming that is the chunksOf in scope -- TODO confirm), which would make
-- the worker-spawning loop over commitz never finish.
let commitz = chunksOf (max 1 (total `div` tn)) r
for_ hashes $ \gh -> do
(_t,lbs) <- gitReadObjectMaybe theReader gh
>>= orThrow (GitReadError (show $ pretty gh))
progress_ <- newTVarIO 0
let section = [ Builder.byteString (coerce gh)
, Builder.lazyByteString lbs
] & Builder.toLazyByteString . mconcat
workers <- lift $ forM (zip [0..] commitz) $ \(i,chunk) -> async $ flip runContT pure do
theReader <- ContT withGitCat
atomically do
modifyTVar _already (HS.insert gh)
writeTBQueue sourceQ (Just section)
for_ chunk \commit -> do
atomically $ modifyTVar progress_ succ
hashes <- gitReadTreeObjectsOnly commit
<&> (commit:)
>>= filterM notWrittenYet
for_ hashes $ \gh -> do
(_t,lbs) <- gitReadObjectMaybe theReader gh
>>= orThrow (GitReadError (show $ pretty gh))
let e = [ Builder.byteString (coerce gh)
, Builder.lazyByteString lbs
] & Builder.toLazyByteString . mconcat
atomically do
modifyTVar _already (HS.insert gh)
writeTBQueue sourceQ (Just e)
ContT $ withAsync $ forever do
pause @'Seconds 1
p <- readTVarIO progress_
let pp = fromIntegral p / (fromIntegral total :: Double) * 100
& realToFrac @_ @(Fixed E2)
liftIO $ IO.hPutStr stderr $ show $ " \r" <> pretty pp <> "%"
pure ()
mapM_ link workers
mapM_ wait workers
atomically $ writeTBQueue sourceQ Nothing
wait l
linearSearchLBS hash lbs = do
found <- S.toList_ $ runConsumeLBS lbs $ flip fix 0 \go n -> do

View File

@ -2,14 +2,15 @@ module HBS2.Data.Log.Structured where
import HBS2.Prelude.Plated
import Data.ByteString.Builder qualified as B
import Data.ByteString.Lazy (ByteString)
import Data.ByteString.Lazy qualified as LBS
import Data.ByteString qualified as BS
import Data.ByteString.Builder qualified as B
import Data.Maybe
import Network.ByteOrder hiding (ByteString)
import Codec.Compression.Zstd.Streaming qualified as Zstd
import Codec.Compression.Zstd qualified as Zstd
import Codec.Compression.Zstd.Streaming qualified as Zstd
import Codec.Compression.Zstd.Streaming (Result(..))
import Control.Exception
@ -42,33 +43,48 @@ data CompressedStreamError =
instance Exception CompressedStreamError
-- | Push one piece of input through a zstd streaming compressor and forward
--   every fragment the compressor emits to @sink@.
--
--   * @sink@   -- called with each compressed fragment, wrapped as a lazy 'ByteString'.
--   * @stream@ -- the current compressor state (a streaming 'Result').
--   * @mlbs@   -- @Just@ a piece of input to compress, or @Nothing@ to finish:
--                 in that case the compressor is fed an empty chunk (which the
--                 zstd streaming API is documented to treat as end of input)
--                 and the loop runs until the state is 'Done'.
--
--   Returns the compressor state to hand to the next call.
--
--   Throws 'CompressedStreamWriteError' when the compressor reports 'Error',
--   or reaches 'Done' while input chunks are still pending.
writeCompressedChunkZstd :: forall m . MonadIO m
                         => ( ByteString -> m () )
                         -> Result
                         -> Maybe ByteString
                         -> m Result
writeCompressedChunkZstd sink stream mlbs =
  flip fix (LBS.toChunks lbs, stream) $ \next -> \case
    -- compressor finished and nothing is left to feed: emit the final fragment
    ([], r@(Done s)) -> sink (LBS.fromStrict s) >> pure r

    -- finished early (input still pending) or failed
    (_, Done{})  -> liftIO (throwIO CompressedStreamWriteError)
    (_, Error{}) -> liftIO (throwIO CompressedStreamWriteError)

    -- flush produced output first, keep the pending input untouched
    (w, Produce s continue) -> do
      sink (LBS.fromStrict s)
      c <- liftIO continue
      next (w, c)

    -- no input was supplied at all: feed the empty chunk to finish the stream
    (_, Consume consume) | isNothing mlbs -> do
      r <- liftIO (consume mempty)
      next ([], r)

    -- all chunks of this piece are consumed: hand the state back to the caller
    ([], r@Consume{}) -> pure r

    (x:xs, Consume consume) -> do
      fed <- liftIO (consume x)
      next (xs, fed)
  where
    lbs = fromMaybe mempty mlbs
-- | Run a zstd streaming compressor over a pull-based source.
--
--   * @stream@ -- initial compressor state (a streaming 'Result').
--   * @source@ -- action yielding the next piece of input, or @Nothing@ when
--                 there is no more input.
--   * @sink@   -- called with every compressed fragment that is produced.
--
-- NOTE(review): two loop bodies follow each other below -- a
-- `flip fix (mempty,stream)` state machine and a `flip fix stream` loop that
-- delegates to writeCompressedChunkZstd. This looks like the pre-change and
-- post-change versions of the body shown together by the diff view; confirm
-- which of the two is the live implementation.
writeCompressedStreamZstd :: forall m . MonadIO m
=> Result
-> m (Maybe ByteString)
-> ( ByteString -> m () )
-> m ()
writeCompressedStreamZstd stream source sink = do
-- First body: explicit state machine over (pending input chunks, compressor state).
flip fix (mempty,stream) $ \next -> \case
-- Done: write the final fragment and stop.
(_, Done s) -> sink (LBS.fromStrict s)
(_,Error _ _) -> liftIO (throwIO CompressedStreamWriteError)
-- Produce: write the fragment, then continue with the same pending chunks.
(some, Produce s continue) -> do
sink (LBS.fromStrict s)
c <- liftIO continue
next (some, c)
-- Nothing pending: pull the next piece from the source.
([], w@(Consume consume)) -> do
source >>= \case
Just piece -> do
next (LBS.toChunks piece, w)
-- Source exhausted: feed the compressor an empty chunk.
Nothing -> do
c <- liftIO (consume mempty)
next ([], c)
-- Feed the next pending chunk to the compressor.
(x:xs, Consume consume) -> do
c <- liftIO (consume x)
next (xs, c)
-- Second body: thread the compressor state through writeCompressedChunkZstd,
-- one source piece per iteration, until the source yields Nothing.
flip fix stream $ \next sn -> do
source >>= \case
Nothing -> writeCompressedChunkZstd sink sn Nothing >> none
Just lbs -> writeCompressedChunkZstd sink sn (Just lbs) >>= next