From 7f344a7f7214f31e76b03f15fbba15e78580143b Mon Sep 17 00:00:00 2001 From: voidlizard Date: Mon, 23 Dec 2024 18:03:43 +0300 Subject: [PATCH] wip --- hbs2-git3/app/Main.hs | 237 ++++++++++++++++++++++++++++++++---------- 1 file changed, 182 insertions(+), 55 deletions(-) diff --git a/hbs2-git3/app/Main.hs b/hbs2-git3/app/Main.hs index 3baebe24..565c36ac 100644 --- a/hbs2-git3/app/Main.hs +++ b/hbs2-git3/app/Main.hs @@ -51,6 +51,7 @@ import Codec.Compression.Zstd qualified as Zstd import Codec.Compression.Zstd.Streaming qualified as ZstdS import Codec.Compression.Zstd.Streaming (Result(..)) import Codec.Compression.Zstd (maxCLevel) +import Codec.Compression.Zstd.Lazy qualified as ZstdL import Codec.Compression.Zlib qualified as Zlib @@ -1288,17 +1289,37 @@ consumed = gets LBS.null runConsumeLBS :: Monad m => ByteString -> ConsumeLBS m a -> m a runConsumeLBS s m = evalStateT (fromConsumeLBS m) s +newtype ConsumeBS m a = ConsumeBS { fromConsumeBS :: StateT BS.ByteString m a } + deriving newtype ( Applicative + , Functor + , Monad + , MonadState BS.ByteString + , MonadIO + , MonadTrans + ) + + instance Monad m => BytesReader (ConsumeLBS m) where readBytes n = readChunkSimple n noBytesLeft = consumed -readLogFileLBS :: forall opts m . ( MonadIO m, ReadLogOpts opts ) +instance Monad m => BytesReader (ConsumeBS m) where + noBytesLeft = gets BS.null + readBytes n = do + s <- get + let (a,b) = BS.splitAt n s + put $! b + pure (LBS.fromStrict a) + +runConsumeBS :: Monad m => BS.ByteString -> ConsumeBS m a -> m a +runConsumeBS s m = evalStateT (fromConsumeBS m) s + +readLogFileLBS :: forall opts m . ( MonadIO m, ReadLogOpts opts, BytesReader m ) => opts - -> ByteString -> ( GitHash -> Int -> ByteString -> m () ) -> m Int -readLogFileLBS _ lbs action = runConsumeLBS lbs $ flip fix 0 \go n -> do +readLogFileLBS _ action = flip fix 0 \go n -> do done <- noBytesLeft if done then pure n else do @@ -1313,7 +1334,7 @@ readLogFileLBS _ lbs action = runConsumeLBS lbs $ flip fix 0 \go n -> do sdata <- readBytesMaybe ( ssize - 20 ) >>= orThrow SomeReadLogError - void $ lift $ action hash (fromIntegral ssize) sdata + void $ action hash (fromIntegral ssize) sdata go (succ n) -- FIXME: move-to-suckless-script @@ -1799,52 +1820,102 @@ theDict = do r <- S.toList_ $ for_ files $ \f -> do lbs <- liftIO $ LBS.readFile f - readLogFileLBS () lbs $ \h _ _ -> do - when (h == what) (S.yield f) + runConsumeLBS lbs $ readLogFileLBS () $ \h _ _ -> do + when (h == what) (lift $ S.yield f) for_ (HS.fromList r) $ \x -> do liftIO $ print x + entry $ bindMatch "test:git:zstd:packed:list" $ nil_ $ \case + [StringLike fn] -> lift do + lbs <- liftIO$ LBS.readFile fn + + runConsumeLBS (ZstdL.decompress lbs) $ readLogFileLBS () $ \h s _ -> do + debug $ "object" <+> pretty h <+> pretty s + + _ -> throwIO (BadFormException @C nil) + + entry $ bindMatch "test:git:zstd:pack" $ nil_ $ \syn -> do + + let (opts,argz) = splitOpts [("-l",1)] syn + + fn <- headMay [ x | StringLike x <- argz ] & orThrowUser "filename not set" + let l = headDef 5 [ fromIntegral l | ListVal [StringLike "-l", LitIntVal l ] <- opts ] + + file <- liftIO $ LBS.readFile fn + let z = ZstdL.compress l file + liftIO $ LBS.writeFile (fn <> ".z") z + entry $ bindMatch "test:git:zstd:train" $ nil_ $ \case - [ StringLike fn ] -> do + [ StringLike fn ] -> lift do file <- liftIO $ mmapFileByteString fn Nothing - pure () + + _total <- newTVarIO 0 + + samples' <- S.toList_ ( runConsumeBS file $ readLogFileLBS () $ \h s lbs -> do + + atomically $ modifyTVar _total succ + lift $ S.yield (LBS.toStrict (LBS.take (256*1024) lbs)) ) + + let samples = [x | (1,x) <- zip (cycle [1..10]) samples' ] + + dictionary <- Zstd.trainFromSamples (256 * 1024) samples + & orThrowUser "can't train dictionary" + + debug $ "dict size" <+> pretty (BS.length $ Zstd.fromDict dictionary) + + flip runContT pure do + + ContT $ withAsync $ forever do + pause @'Seconds 1 + p <- readTVarIO _total + liftIO $ IO.hPutStr stderr (" \r" <> show (pretty p)) + + fh <- ContT $ withBinaryFile (fn <> ".packed") AppendMode + void $ runConsumeBS file $ readLogFileLBS () $ \h s lbs -> do + + let packed = Zstd.compressUsingDict dictionary maxCLevel (LBS.toStrict lbs) + -- let packed = Zstd.compress maxCLevel (LBS.toStrict lbs) + + let kbs = coerce @_ @BS.ByteString h + let keySize = BS.length kbs + + let objectSize = BS.length packed & fromIntegral + let entrySize = fromIntegral $ keySize + objectSize + let entry = LBS.toStrict $ Builder.toLazyByteString $ Builder.word32BE entrySize + + liftIO do + atomically $ modifyTVar _total pred + -- debug $ "entry size" <+> pretty (BS.length entry) <+> pretty h <+> pretty entrySize + BS.hPutStr fh entry + BS.hPutStr fh kbs + BS.hPutStr fh packed + _ -> throwIO (BadFormException @C nil) entry $ bindMatch "test:git:read-log-file" $ nil_ $ \syn -> lift do let (_, argz) = splitOpts [] syn fname <- headMay [ x | StringLike x <- argz] & orThrowUser "log file not set" - flip runContT pure do - h <- ContT $ bracket (openFile fname ReadMode) hClose - - fix \next -> do - eof <- hIsEOF h - unless eof do - size <- liftIO (BS.hGet h 4) <&> fromIntegral . N.word32 - debug $ "size" <+> pretty size - bshash <- liftIO (BS.hGet h 20) <&> GitHash - hSeek h RelativeSeek (size - 20) - -- lbs <- liftIO (LBS.hGet h (size - 20)) - debug $ "object" <+> pretty bshash - next + file <- liftIO $ mmapFileByteString fname Nothing + void $ runConsumeBS file $ readLogFileLBS () $ \h s lbs -> do + debug $ "object" <+> pretty h <+> pretty s entry $ bindMatch "test:git:read-log-lbs" $ nil_ $ \syn -> lift do let (_, argz) = splitOpts [] syn fname <- headMay [ x | StringLike x <- argz] & orThrowUser "log file not set" theLog <- liftIO $ LBS.readFile fname - void $ readLogFileLBS () theLog $ \h s lbs -> do + void $ runConsumeLBS theLog $ readLogFileLBS () $ \h s lbs -> do debug $ "object" <+> pretty h - entry $ bindMatch "test:git:log:index:naive:dump" $ nil_ $ \syn -> lift do let (_, argz) = splitOpts [] syn fname <- headMay [ x | StringLike x <- argz] & orThrowUser "no file" - lbs <- liftIO $ LBS.readFile fname + bs <- liftIO $ mmapFileByteString fname Nothing - runConsumeLBS lbs $ flip fix 0 \go n -> do - done <- consumed + runConsumeBS bs $ flip fix 0 \go n -> do + done <- noBytesLeft if done then pure () else do ssize <- readBytesMaybe 4 @@ -1863,7 +1934,7 @@ theDict = do lbs <- liftIO $ LBS.readFile fn - hashes <- S.toList_ $ runConsumeLBS lbs $ flip fix 0 \go n -> do + hashes <- S.toList_ $ runConsumeLBS lbs $ flip fix 0 \go n -> do done <- consumed if done then pure () else do @@ -2015,8 +2086,8 @@ theDict = do for_ fnames $ \f -> do theLog <- liftIO $ LBS.readFile f - void $ readLogFileLBS () theLog $ \h s lbs -> do - S.yield (coerce @_ @BS.ByteString h) + void $ runConsumeLBS theLog $ readLogFileLBS () $ \h s lbs -> do + lift $ S.yield (coerce @_ @BS.ByteString h) debug $ "object" <+> pretty h let sorted = Set.toList $ Set.fromList all @@ -2047,7 +2118,9 @@ theDict = do _already <- newTVarIO mempty - let notWrittenYet x = do + let + notWrittenYet :: forall m . MonadIO m => GitHash -> m Bool + notWrittenYet x = do already <- readTVarIO _already <&> HS.member x -- alsoInIdx <- maybe1 mmaped (pure False) $ \m -> do -- let found = binarySearch m 24 (coerce x) & isJust @@ -2058,43 +2131,97 @@ theDict = do flip runContT pure do - tnum <- getNumCapabilities + tnum' <- getNumCapabilities - let chunks = chunksOf (total `div` tnum) r + let tnum = if total < 100 then 0 else max 0 (floor (logBase 2 (realToFrac total)) - 1) - liftIO $ forConcurrently_ chunks $ \chunk -> flip runContT pure do + liftIO $ print $ red "TNUM" <+> pretty tnum <+> pretty total + + queues <- replicateM (tnum+1) newTQueueIO <&> Vector.fromList + + feeder <- ContT $ withAsync do + let balanced = zip (cycle [0..tnum]) r + for_ balanced $ \(i,c) -> atomically $ writeTQueue (queues ! i) (Just c) + atomically $ for_ queues (`writeTQueue` Nothing) + + workers <- liftIO $ for [0..tnum] $ \i -> async $ flip runContT pure do - suff <- liftIO $ randomIO @Word32 theReader <- ContT $ withGitCat - ofile <- ContT $ withBinaryFile (show $ pretty "export-" <> pretty suff <>".log") AppendMode - void $ ContT $ bracket none (const $ stopProcess theReader) - for_ chunk $ \commit -> do - hashes <- gitReadTreeObjectsOnly commit - <&> (commit:) - >>= filterM notWrittenYet + liftIO do + fix \loop -> flip runContT pure do + suff <- liftIO $ randomIO @Word32 + ofile <- ContT $ withBinaryFile (show $ pretty "export-" <> pretty suff <>".log") AppendMode + fix \loop2 -> do + atomically (readTQueue (queues ! i)) >>= \case + Nothing -> none + Just commit -> do + debug $ "write commit and shit" <+> pretty commit - for_ hashes $ \gh -> do + hashes <- gitReadTreeObjectsOnly commit + <&> (commit:) + >>= filterM notWrittenYet - (_t,lbs) <- gitReadObjectMaybe theReader gh - >>= orThrow (GitReadError (show $ pretty gh)) + for_ hashes $ \gh -> do - let kbs = coerce @_ @BS.ByteString gh - let keySize = BS.length kbs + (_t,lbs) <- gitReadObjectMaybe theReader gh + >>= orThrow (GitReadError (show $ pretty gh)) - -- debug $ pretty gh <+> pretty keySize + let kbs = coerce @_ @BS.ByteString gh + let keySize = BS.length kbs - let objectSize = LBS.length lbs & fromIntegral - let entrySize = fromIntegral $ keySize + objectSize - let entry = LBS.toStrict $ Builder.toLazyByteString $ Builder.word32BE entrySize + let objectSize = LBS.length lbs & fromIntegral + let entrySize = fromIntegral $ keySize + objectSize - liftIO do - atomically $ modifyTVar _already (HS.insert gh) - debug $ "entry size" <+> pretty (BS.length entry) <+> pretty gh <+> pretty entrySize - BS.hPutStr ofile entry - BS.hPutStr ofile kbs - LBS.hPutStr ofile lbs + let entry = mconcat [ Builder.word32BE entrySize + , Builder.byteString kbs + , Builder.lazyByteString lbs + ] + + atomically $ modifyTVar _already (HS.insert gh) + liftIO $ LBS.hPutStr ofile ( Builder.toLazyByteString entry ) + + loop2 + + mapM_ wait (feeder:workers) + + + -- let chunks = chunksOf (total `div` tnum) r + + -- liftIO $ forConcurrently_ chunks $ \chunk -> flip runContT pure do + + -- suff <- liftIO $ randomIO @Word32 + -- theReader <- ContT $ withGitCat + -- ofile <- ContT $ withBinaryFile (show $ pretty "export-" <> pretty suff <>".log") AppendMode + + -- void $ ContT $ bracket none (const $ stopProcess theReader) + -- for_ chunk $ \commit -> do + + -- hashes <- gitReadTreeObjectsOnly commit + -- <&> (commit:) + -- >>= filterM notWrittenYet + + -- for_ hashes $ \gh -> do + + -- (_t,lbs) <- gitReadObjectMaybe theReader gh + -- >>= orThrow (GitReadError (show $ pretty gh)) + + -- let kbs = coerce @_ @BS.ByteString gh + -- let keySize = BS.length kbs + + -- -- debug $ pretty gh <+> pretty keySize + + -- let objectSize = LBS.length lbs & fromIntegral + -- let entrySize = fromIntegral $ keySize + objectSize + -- let entry = LBS.toStrict $ Builder.toLazyByteString $ Builder.word32BE entrySize + + -- liftIO do + -- atomically $ modifyTVar _already (HS.insert gh) + -- debug $ "entry size" <+> pretty (BS.length entry) <+> pretty gh <+> pretty entrySize + -- BS.hPutStr ofile entry + -- BS.hPutStr ofile kbs + -- LBS.hPutStr ofile lbs linearSearchLBS hash lbs = do