From 6e39900d6b375df83bba5201069829532b7a3527 Mon Sep 17 00:00:00 2001 From: voidlizard Date: Wed, 25 Dec 2024 07:08:30 +0300 Subject: [PATCH] wip2 --- hbs2-git3/app/Main.hs | 245 ++++------------------ hbs2-git3/hbs2-git3.cabal | 2 +- hbs2-git3/lib/HBS2/Data/Log/Structured.hs | 87 ++++++++ 3 files changed, 127 insertions(+), 207 deletions(-) diff --git a/hbs2-git3/app/Main.hs b/hbs2-git3/app/Main.hs index 658a52e4..e262b5e2 100644 --- a/hbs2-git3/app/Main.hs +++ b/hbs2-git3/app/Main.hs @@ -706,34 +706,6 @@ instance (Ord k, Hashable k) => Cached (CacheFixedHPSQ k v) k v where pure v -data RCC = - RCC [GitHash] (HashMap GitHash (HashSet GitHash)) - -readCommitChain :: ( HBS2GitPerks m - , MonadUnliftIO m - , MonadReader Git3Env m - , HasStorage m - , HasStateDB m - ) - => Maybe GitRef - -> GitHash - -> ( GitHash -> m () ) - -> m (HashMap GitHash (HashSet GitHash)) - -readCommitChain _ h0 action = flip runContT pure $ callCC \_ -> do - theReader <- ContT $ withGitCat - void $ ContT $ bracket (pure theReader) stopProcess - flip fix (RCC [h0] mempty) $ \next -> \case - RCC [] seen -> pure seen - RCC ( h : hs ) seen | HM.member h seen -> next ( RCC hs seen ) - RCC ( h : hs ) seen -> do - lift (action h) - co <- gitReadObjectMaybe theReader h >>= orThrowUser ("object not found" <+> pretty h) - parents <- gitReadCommitParents (Just h) (snd co) - debug $ "processed commit" <+> pretty h - next $ RCC ( parents <> hs ) (HM.insertWith (<>) h (HS.fromList parents) seen) - - data HCC = HCC { hccHeight :: Int , hccRest :: [GitHash] @@ -746,23 +718,36 @@ readCommitChainHPSQ :: ( HBS2GitPerks m , HasStorage m , HasStateDB m ) - => Maybe GitRef + => (GitHash -> m Bool) + -> Maybe GitRef -> GitHash -> (GitHash -> m ()) -> m (HashPSQ GitHash Int (HashSet GitHash)) -readCommitChainHPSQ _ h0 action = flip runContT pure $ callCC \_ -> do +readCommitChainHPSQ filt _ h0 action = flip runContT pure $ callCC \_ -> do theReader <- ContT $ withGitCat void $ ContT $ bracket (pure theReader) stopProcess flip fix (HCC 0 [h0] HPSQ.empty) $ \next -> \case + HCC _ [] result -> pure result + HCC n ( h : hs ) result | HPSQ.member h result -> next ( HCC n hs result ) + HCC n ( h : hs ) result -> do - co <- gitReadObjectMaybe theReader h - >>= orThrow(GitReadError $ show $ pretty "object not found" <+> pretty h) - parents <- gitReadCommitParents (Just h) (snd co) - lift $ action h - next $ HCC (n-1) ( parents <> hs ) (snd $ HPSQ.alter (addParents () n parents) h result ) + + done <- not <$> lift ( filt h ) + + if done then next (HCC n hs result) else do + + co <- gitReadObjectMaybe theReader h + >>= orThrow(GitReadError $ show $ pretty "object not found" <+> pretty h) + + parents <- gitReadCommitParents (Just h) (snd co) + + lift $ action h + next $ HCC (n-1) ( parents <> hs ) (snd $ HPSQ.alter (addParents () n parents) h result ) + + where addParents :: a -> Int @@ -1238,86 +1223,6 @@ indexCBlockCommits :: forall m . ( MonadIO m indexCBlockCommits cb = do pure () -class ReadLogOpts a where - -data ReadLogError = SomeReadLogError - deriving stock (Typeable, Show) - -instance Exception ReadLogError - -instance ReadLogOpts () - -type NumBytes = Int - -class Monad m => BytesReader m where - noBytesLeft :: m Bool - readBytes :: NumBytes -> m ByteString - - readBytesMaybe :: NumBytes -> m (Maybe ByteString) - readBytesMaybe n = do - bs <- readBytes n - if LBS.length bs == fromIntegral n then pure (Just bs) else pure Nothing - -newtype ConsumeLBS m a = ConsumeLBS { fromConsumeLBS :: StateT ByteString m a } - deriving newtype ( Applicative - , Functor - , Monad - , MonadState ByteString - , MonadIO - , MonadTrans - ) - -readChunkThrow :: MonadIO m => Int -> ConsumeLBS m ByteString -readChunkThrow n = do - lbs <- get - let (this, that) = LBS.splitAt (fromIntegral n) lbs - if LBS.length this /= fromIntegral n then - throwIO SomeReadLogError - else do - put $! that - pure this - -readChunkSimple :: Monad m => Int -> ConsumeLBS m ByteString -readChunkSimple n = do - lbs <- get - let (this, that) = LBS.splitAt (fromIntegral n) lbs - put $! that - pure this - -reminds :: Monad m => ConsumeLBS m Int -reminds = gets (fromIntegral . LBS.length) - -consumed :: Monad m => ConsumeLBS m Bool -consumed = gets LBS.null - -runConsumeLBS :: Monad m => ByteString -> ConsumeLBS m a -> m a -runConsumeLBS s m = evalStateT (fromConsumeLBS m) s - -newtype ConsumeBS m a = ConsumeBS { fromConsumeBS :: StateT BS.ByteString m a } - deriving newtype ( Applicative - , Functor - , Monad - , MonadState BS.ByteString - , MonadIO - , MonadTrans - ) - - -instance Monad m => BytesReader (ConsumeLBS m) where - readBytes n = readChunkSimple n - noBytesLeft = consumed - -instance Monad m => BytesReader (ConsumeBS m) where - noBytesLeft = gets BS.null - readBytes n = do - s <- get - let (a,b) = BS.splitAt n s - put $! b - pure (LBS.fromStrict a) - -runConsumeBS :: Monad m => BS.ByteString -> ConsumeBS m a -> m a -runConsumeBS s m = evalStateT (fromConsumeBS m) s - readLogFileLBS :: forall opts m . ( MonadIO m, ReadLogOpts opts, BytesReader m ) => opts -> ( GitHash -> Int -> ByteString -> m () ) @@ -1719,78 +1624,6 @@ theDict = do export (w <|> re <|> hd) r - entry $ bindMatch "test:git:read-commit-chain-full" $ nil_ $ \syn -> lift do - let (opts, argz) = splitOpts [("--threads",1)] syn - - - xead <- headDef "HEAD" [ x | StringLike x <- argz ] & gitRevParseThrow - let tnum = headDef 1 [fromIntegral x | ListVal [StringLike "--threads", LitIntVal x] <- opts] - - co <- readCommitChain Nothing xead dontHandle - let total = HM.size co - _n <- newTVarIO 0 - - t0 <- getTimeCoarse - - flip runContT pure do - - _fake <- newTVarIO 0 - - void $ ContT $ withAsync $ flip fix (0,t0) $ \next (x,tt0) -> do - pause @'Seconds 1 - t1 <- getTimeCoarse - n <- readTVarIO _n - let dt = 1e-9 * realToFrac (t1 - tt0) - - -- when (dt <= 0) $ next (n,t1) - - let dn = realToFrac (n-x) - let v = realToFrac $ dn / dt - let est = if v > 0 then realToFrac (total - n) / v :: Fixed E2 else 0 - sz <- readTVarIO _fake - - debug $ "read" - <+> pretty n <+> "/" <+> pretty total - <+> "elapsed" <+> pretty (ceiling $ realToFrac (t1-t0) * 1e-9 ) - <+> "dn" <+> pretty dn - <+> "sz" <+> pretty (realToFrac sz / (1024*1024) :: Fixed E2) - <+> pretty (realToFrac @(Fixed E2) v) <+> "per sec" - <+> "est." <+> pretty est - - next (n,t1) - - _already <- newTVarIO ( mempty :: HashSet GitHash ) - - let chunks = chunksOf (total `div` tnum) (HM.keys co) - - liftIO $ pooledForConcurrentlyN_ tnum chunks $ \chunk -> flip runContT pure do - theReader <- ContT $ withGitCat - void $ ContT $ bracket none (const $ stopProcess theReader) - for_ chunk $ \commit -> do - - hashes <- gitReadTreeObjectsOnly commit - >>= filterM ( \x -> readTVarIO _already <&> not . HS.member x) - --- hashes <- gitReadTree commit --- <&> fmap gitEntryHash --- >>= filterM ( \x -> readTVarIO _already <&> not . HS.member x) - - atomically $ modifyTVar _n succ - - for_ hashes $ \gh -> do - - (_t,lbs) <- gitReadObjectMaybe theReader gh - >>= orThrow (GitReadError (show $ pretty gh)) - - let l = sum $ fmap BS.length (LBS.toChunks lbs) - - atomically do - modifyTVar' _fake (+ l) - modifyTVar' _already (HS.insert gh) - - n <- readTVarIO _n - liftIO $ print $ pretty "read objects" <+> pretty n <+> "of" <+> pretty total - entry $ bindMatch "test:git:read-commit-chain" $ nil_ $ \syn -> do (mpath, hss) <- case syn of [ HashLike s ] -> pure (Nothing, s) @@ -1801,14 +1634,14 @@ theDict = do liftIO $ mapM_ setCurrentDirectory mpath -- let hss = headDef "HEAD" [ x | StringLike x <- snd (splitOpts [] syn) ] h <- gitRevParseThrow hss - r <- lift $ readCommitChain Nothing h dontHandle - liftIO $ print ( HM.size r ) + r <- lift $ readCommitChainHPSQ (const $ pure True) Nothing h dontHandle + liftIO $ print ( HPSQ.size r ) entry $ bindMatch "test:git:read-commit-chain-dfs" $ nil_ $ \syn -> lift do let (_, argz) = splitOpts [] syn let hd = headDef "HEAD" [ x | StringLike x <- argz] h <- gitRevParseThrow hd - r <- readCommitChainHPSQ Nothing h (\c -> debug $ "commit" <+> pretty c) + r <- readCommitChainHPSQ (const $ pure True) Nothing h (\c -> debug $ "commit" <+> pretty c) <&> HPSQ.toList <&> sortBy (comparing (view _2)) for_ r $ \(c,_,_) -> do @@ -2118,35 +1951,34 @@ theDict = do entry $ bindMatch "test:git:export-commit-dfs" $ nil_ $ \syn -> lift do let (opts, argz) = splitOpts [("--index",1)] syn - let hd = headDef "HEAD" [ x | StringLike x <- argz] - h <- gitRevParseThrow hd - hpsq <- readCommitChainHPSQ Nothing h (\c -> debug $ "commit" <+> pretty c) let useIndex = headMay [ f | ListVal [StringLike "--index", StringLike f] <- opts ] + let hd = headDef "HEAD" [ x | StringLike x <- argz] + h <- gitRevParseThrow hd + mmaped <- runMaybeT do fname <- toMPlus useIndex liftIO $ mmapFileByteString fname Nothing - let r = HPSQ.toList hpsq - & sortBy (comparing (view _2)) - & fmap (view _1) - - let total = HPSQ.size hpsq - _already <- newTVarIO mempty let notWrittenYet :: forall m . MonadIO m => GitHash -> m Bool notWrittenYet x = do already <- readTVarIO _already <&> HS.member x - -- alsoInIdx <- maybe1 mmaped (pure False) $ \m -> do - -- let found = binarySearch m 24 (coerce x) & isJust - -- -- error $ show $ "MOTHERFUCKER!" <+> pretty x <+> pretty found - -- pure found + alsoInIdx <- maybe1 mmaped (pure False) $ \m -> do + found <- binarySearchBS 24 (BS.take 20 . BS.drop 4) (coerce x) m + pure (isJust found) + pure (not already && not alsoInIdx) - pure (not already) -- && not alsoInIdx) + hpsq <- readCommitChainHPSQ notWrittenYet Nothing h (\c -> debug $ "commit" <+> pretty c) + let r = HPSQ.toList hpsq + & sortBy (comparing (view _2)) + & fmap (view _1) + + let total = HPSQ.size hpsq liftIO $ flip runContT pure do @@ -2184,15 +2016,16 @@ theDict = do >>= filterM notWrittenYet for_ hashes $ \gh -> do + atomically $ modifyTVar _already (HS.insert gh) (_t,lbs) <- gitReadObjectMaybe theReader gh >>= orThrow (GitReadError (show $ pretty gh)) let e = [ Builder.byteString (coerce gh) + , Builder.char8 (headDef 'B' $ show $ pretty $ Short _t) , Builder.lazyByteString lbs ] & Builder.toLazyByteString . mconcat atomically do - modifyTVar _already (HS.insert gh) writeTBQueue sourceQ (Just e) ContT $ withAsync $ forever do @@ -2234,7 +2067,7 @@ linearSearchLBS hash lbs = do pure $ listToMaybe found -binarySearchBS :: MonadIO m +binarySearchBS :: Monad m => Int -- ^ record size -> ( BS.ByteString -> BS.ByteString ) -- ^ key extractor -> BS.ByteString -- ^ key diff --git a/hbs2-git3/hbs2-git3.cabal b/hbs2-git3/hbs2-git3.cabal index e8a26432..2156bba4 100644 --- a/hbs2-git3/hbs2-git3.cabal +++ b/hbs2-git3/hbs2-git3.cabal @@ -19,7 +19,7 @@ common shared-properties -threaded -rtsopts -O2 - "-with-rtsopts=-N4 -A64m -AL256m -I0" + "-with-rtsopts=-N4 -A64m -AL256m -I0 -M4G" default-language: GHC2021 diff --git a/hbs2-git3/lib/HBS2/Data/Log/Structured.hs b/hbs2-git3/lib/HBS2/Data/Log/Structured.hs index 917c7199..6edb36b3 100644 --- a/hbs2-git3/lib/HBS2/Data/Log/Structured.hs +++ b/hbs2-git3/lib/HBS2/Data/Log/Structured.hs @@ -8,6 +8,7 @@ import Data.ByteString.Lazy qualified as LBS import Data.ByteString qualified as BS import Data.Maybe import Network.ByteOrder hiding (ByteString) +import Control.Monad.State import Codec.Compression.Zstd qualified as Zstd import Codec.Compression.Zstd.Streaming qualified as Zstd @@ -17,6 +18,92 @@ import Control.Exception -- import UnliftIO +class ReadLogOpts a where + +data ReadLogError = SomeReadLogError + deriving stock (Typeable, Show) + +instance Exception ReadLogError + +instance ReadLogOpts () + +type NumBytes = Int + +class Monad m => BytesReader m where + noBytesLeft :: m Bool + readBytes :: NumBytes -> m ByteString + + readBytesMaybe :: NumBytes -> m (Maybe ByteString) + readBytesMaybe n = do + bs <- readBytes n + if LBS.length bs == fromIntegral n then pure (Just bs) else pure Nothing + +newtype ConsumeLBS m a = ConsumeLBS { fromConsumeLBS :: StateT ByteString m a } + deriving newtype ( Applicative + , Functor + , Monad + , MonadState ByteString + , MonadIO + , MonadTrans + ) + +readChunkThrow :: MonadIO m => Int -> ConsumeLBS m ByteString +readChunkThrow n = do + lbs <- get + let (this, that) = LBS.splitAt (fromIntegral n) lbs + if LBS.length this /= fromIntegral n then + liftIO $ throwIO SomeReadLogError + else do + put $! that + pure this + +readChunkSimple :: Monad m => Int -> ConsumeLBS m ByteString +readChunkSimple n = do + lbs <- get + let (this, that) = LBS.splitAt (fromIntegral n) lbs + put $! that + pure this + +reminds :: Monad m => ConsumeLBS m Int +reminds = gets (fromIntegral . LBS.length) + +consumed :: Monad m => ConsumeLBS m Bool +consumed = gets LBS.null + +runConsumeLBS :: Monad m => ByteString -> ConsumeLBS m a -> m a +runConsumeLBS s m = evalStateT (fromConsumeLBS m) s + +newtype ConsumeBS m a = ConsumeBS { fromConsumeBS :: StateT BS.ByteString m a } + deriving newtype ( Applicative + , Functor + , Monad + , MonadState BS.ByteString + , MonadIO + , MonadTrans + ) + + +instance Monad m => BytesReader (ConsumeLBS m) where + readBytes n = readChunkSimple n + noBytesLeft = consumed + +instance Monad m => BytesReader (ConsumeBS m) where + noBytesLeft = gets BS.null + readBytes n = do + s <- get + let (a,b) = BS.splitAt n s + put $! b + pure (LBS.fromStrict a) + +runConsumeBS :: Monad m => BS.ByteString -> ConsumeBS m a -> m a +runConsumeBS s m = evalStateT (fromConsumeBS m) s + + + + + + + writeSection :: forall m . Monad m => ByteString -> ( ByteString -> m () )