# Patch summary (hbs2-git3)
#
# app/Main.hs
#   * Log files read in theDict are passed through ZstdL.decompress before
#     being handed to runConsumeLBS / readLogFileLBS.
#   * New "test:sqlite" entry: opens the database named by its single
#     argument, selects every hash from githash, looks each hash up again
#     and prints the hashes whose lookup returns at least one row.
#   * "test:git:export-commit-dfs": the commit list `r` is split into chunks,
#     one async worker per chunk, each worker with its own withGitCat reader.
#     The source queue bound becomes (capabilities * 100). The writer thread
#     serialises each section with writeSection and appends it to the export
#     log through writeCompressedChunkZstd. A separate async prints the
#     share of processed commits to stderr after each `pause @'Seconds 1`.
#
# lib/HBS2/Data/Log/Structured.hs
#   * New writeCompressedChunkZstd: drives a zstd streaming Result with the
#     strict chunks of one optional lazy ByteString, passes every produced
#     piece to the sink and returns the resulting Result. With Nothing it
#     calls `consume mempty` and runs until Done, sinking the final bytes.
#   * writeCompressedStreamZstd is re-expressed as a loop over
#     writeCompressedChunkZstd.
#
# Review fix applied to this patch:
#   `chunksOf (total `div` tn) r` used a chunk size of 0 whenever there were
#   fewer commits than capabilities. Assuming chunksOf is the one from
#   Data.List.Split (its definition is not part of this patch), a size of 0
#   over a non-empty list produces an endless list of empty chunks, so the
#   forM over `zip [0..] commitz` would keep spawning workers. The size is
#   now clamped with `max 1`.
#
# NOTE(review): line breaks and indentation were reconstructed from a
# whitespace-collapsed copy of this patch. Hunk line counts agree with the
# @@ headers, but leading whitespace of context lines is a best-effort
# guess; apply with a whitespace-tolerant option and confirm the layout.
diff --git a/hbs2-git3/app/Main.hs b/hbs2-git3/app/Main.hs
index 4059588e..658a52e4 100644
--- a/hbs2-git3/app/Main.hs
+++ b/hbs2-git3/app/Main.hs
@@ -2090,7 +2090,7 @@ theDict = do
 
           for_ fnames $ \f -> do
             theLog <- liftIO $ LBS.readFile f
-            void $ runConsumeLBS theLog $ readLogFileLBS () $ \h s lbs -> do
+            void $ runConsumeLBS (ZstdL.decompress theLog) $ readLogFileLBS () $ \h s lbs -> do
               lift $ S.yield (coerce @_ @BS.ByteString h)
               debug $ "object" <+> pretty h
 
@@ -2102,6 +2102,20 @@ theDict = do
                 BS.hPutStr fh entrySize
                 BS.hPutStr fh ghs
 
+
+        entry $ bindMatch "test:sqlite" $ nil_ $ \case
+          [ StringLike fn ] -> lift do
+            db <- newDBPipeEnv dbPipeOptsDef fn
+            withDB db do
+              all <- select_ @_ @(Only Text) [qc|select hash from githash|]
+              for_ all $ \x -> do
+                n <- select @(Only Int) [qc|select 1 from githash where hash = ?|] (Only (fromOnly x))
+                       <&> L.null
+                unless n do
+                  liftIO $ print $ pretty (fromOnly x)
+
+          _ -> throwIO (BadFormException @C nil)
+
         entry $ bindMatch "test:git:export-commit-dfs" $ nil_ $ \syn -> lift do
           let (opts, argz) = splitOpts [("--index",1)] syn
           let hd = headDef "HEAD" [ x | StringLike x <- argz]
@@ -2136,44 +2150,69 @@ theDict = do
 
           liftIO $ flip runContT pure do
 
-            sourceQ <- newTBQueueIO 1000
+            tn <- getNumCapabilities
 
-            theReader <- ContT $ withGitCat
+            sourceQ <- newTBQueueIO (fromIntegral tn * 100)
 
             seed <- randomIO @Word16
             logFile <- ContT $ withBinaryFile (show $ "export-" <> pretty seed <> ".log") AppendMode
 
             l <- lift $ async $ do
-              stream <- ZstdS.compress maxCLevel
-              q <- newTQueueIO
-              writeSections (atomically (readTBQueue sourceQ)) (atomically . writeTQueue q . Just)
-              atomically $ writeTQueue q Nothing
-              writeCompressedStreamZstd stream (atomically $ readTQueue q) $ \shit -> do
-                liftIO $ LBS.hPutStr logFile shit
+              zstd <- ZstdS.compress maxCLevel
+              flip fix zstd \jerk sn -> do
+                atomically (readTBQueue sourceQ) >>= \case
+                  Nothing -> writeCompressedChunkZstd (LBS.hPutStr logFile) sn Nothing
+                  Just s -> do
+                    lbs <- S.toList_ (writeSection s $ S.yield) <&> mconcat
+                    writeCompressedChunkZstd (LBS.hPutStr logFile) sn (Just lbs) >>= jerk
 
             link l
 
-            for_ r $ \commit -> do
-              hashes <- gitReadTreeObjectsOnly commit
-                          <&> (commit:)
-                          >>= filterM notWrittenYet
+            let commitz = chunksOf (max 1 (total `div` tn)) r -- chunk size must stay >= 1 when total < tn
 
-              for_ hashes $ \gh -> do
-                (_t,lbs) <- gitReadObjectMaybe theReader gh
-                               >>= orThrow (GitReadError (show $ pretty gh))
+            progress_ <- newTVarIO 0
 
-                let section = [ Builder.byteString (coerce gh)
-                              , Builder.lazyByteString lbs
-                              ] & Builder.toLazyByteString . mconcat
+            workers <- lift $ forM (zip [0..] commitz) $ \(i,chunk) -> async $ flip runContT pure do
+              theReader <- ContT withGitCat
 
-                atomically do
-                  modifyTVar _already (HS.insert gh)
-                  writeTBQueue sourceQ (Just section)
+              for_ chunk \commit -> do
+
+                atomically $ modifyTVar progress_ succ
+
+                hashes <- gitReadTreeObjectsOnly commit
+                            <&> (commit:)
+                            >>= filterM notWrittenYet
+
+                for_ hashes $ \gh -> do
+                  (_t,lbs) <- gitReadObjectMaybe theReader gh
+                                 >>= orThrow (GitReadError (show $ pretty gh))
+
+                  let e = [ Builder.byteString (coerce gh)
+                          , Builder.lazyByteString lbs
+                          ] & Builder.toLazyByteString . mconcat
+
+                  atomically do
+                    modifyTVar _already (HS.insert gh)
+                    writeTBQueue sourceQ (Just e)
+
+            ContT $ withAsync $ forever do
+              pause @'Seconds 1
+              p <- readTVarIO progress_
+
+              let pp = fromIntegral p / (fromIntegral total :: Double) * 100
+                         & realToFrac @_ @(Fixed E2)
+
+              liftIO $ IO.hPutStr stderr $ show $ " \r" <> pretty pp <> "%"
+              pure ()
+
+            mapM_ link workers
+            mapM_ wait workers
 
             atomically $ writeTBQueue sourceQ Nothing
 
             wait l
 
+
 linearSearchLBS hash lbs = do
   found <- S.toList_ $ runConsumeLBS lbs $ flip fix 0 \go n -> do
 
diff --git a/hbs2-git3/lib/HBS2/Data/Log/Structured.hs b/hbs2-git3/lib/HBS2/Data/Log/Structured.hs
index d4e91beb..917c7199 100644
--- a/hbs2-git3/lib/HBS2/Data/Log/Structured.hs
+++ b/hbs2-git3/lib/HBS2/Data/Log/Structured.hs
@@ -2,14 +2,15 @@ module HBS2.Data.Log.Structured where
 
 import HBS2.Prelude.Plated
 
+import Data.ByteString.Builder qualified as B
 import Data.ByteString.Lazy (ByteString)
 import Data.ByteString.Lazy qualified as LBS
 import Data.ByteString qualified as BS
-import Data.ByteString.Builder qualified as B
+import Data.Maybe
 import Network.ByteOrder hiding (ByteString)
 
-import Codec.Compression.Zstd.Streaming qualified as Zstd
 import Codec.Compression.Zstd qualified as Zstd
+import Codec.Compression.Zstd.Streaming qualified as Zstd
 import Codec.Compression.Zstd.Streaming (Result(..))
 
 import Control.Exception
@@ -42,33 +43,48 @@ data CompressedStreamError =
 
 instance Exception CompressedStreamError
 
+writeCompressedChunkZstd :: forall m . MonadIO m
+                         => ( ByteString -> m () )
+                         -> Result
+                         -> Maybe ByteString
+                         -> m Result
+
+writeCompressedChunkZstd sink stream mlbs = do
+  flip fix ( LBS.toChunks lbs, stream) $ \next -> \case
+
+    ([], r@(Done s)) -> sink (LBS.fromStrict s) >> pure r
+
+    (_, Done{}) -> liftIO (throwIO CompressedStreamWriteError)
+
+    (_, Error{})-> liftIO (throwIO CompressedStreamWriteError)
+
+    (w, Produce s continue) -> do
+      sink (LBS.fromStrict s)
+      c <- liftIO continue
+      next (w, c)
+
+    (_, Consume consume) | isNothing mlbs -> do
+      r <- liftIO (consume mempty)
+      next ([], r)
+
+    ([], r@(Consume{})) -> pure r
+
+    (x:xs, r@(Consume consume)) -> do
+      what <- liftIO (consume x)
+      next (xs, what)
+
+  where
+    lbs = fromMaybe mempty mlbs
+
+
 writeCompressedStreamZstd :: forall m . MonadIO m
                           => Result
                           -> m (Maybe ByteString)
                           -> ( ByteString -> m () )
                           -> m ()
 
 writeCompressedStreamZstd stream source sink = do
-
-  flip fix (mempty,stream) $ \next -> \case
-    (_, Done s) -> sink (LBS.fromStrict s)
-
-    (_,Error _ _) -> liftIO (throwIO CompressedStreamWriteError)
-
-    (some, Produce s continue) -> do
-      sink (LBS.fromStrict s)
-      c <- liftIO continue
-      next (some, c)
-
-    ([], w@(Consume consume)) -> do
-      source >>= \case
-        Just piece -> do
-          next (LBS.toChunks piece, w)
-
-        Nothing -> do
-          c <- liftIO (consume mempty)
-          next ([], c)
-
-    (x:xs, Consume consume) -> do
-      c <- liftIO (consume x)
-      next (xs, c)
+  flip fix stream $ \next sn -> do
+    source >>= \case
+      Nothing -> writeCompressedChunkZstd sink sn Nothing >> none
+      Just lbs -> writeCompressedChunkZstd sink sn (Just lbs) >>= next