From 4941c5442ceb100fd148445901ab512bc5a37b6a Mon Sep 17 00:00:00 2001 From: voidlizard Date: Tue, 3 Dec 2024 11:16:22 +0300 Subject: [PATCH] wip7 --- cabal.project | 2 +- hbs2-git3/app/Main.hs | 61 ++++++++++++++++++++++++++++++++++++------- 2 files changed, 52 insertions(+), 11 deletions(-) diff --git a/cabal.project b/cabal.project index d182df83..b147e7fb 100644 --- a/cabal.project +++ b/cabal.project @@ -10,6 +10,6 @@ constraints: , http-client >=0.7.16 && <0.8 -- executable-static: True --- profiling: True +profiling: True --library-profiling: False diff --git a/hbs2-git3/app/Main.hs b/hbs2-git3/app/Main.hs index b1175609..c366ba69 100644 --- a/hbs2-git3/app/Main.hs +++ b/hbs2-git3/app/Main.hs @@ -66,6 +66,7 @@ import Data.ByteString qualified as BS import Data.ByteString.Lazy (ByteString) import Data.ByteString.Builder as Builder import Text.InterpolatedString.Perl6 (qc) +import Data.Set qualified as Set import Data.HashSet qualified as HS import Data.HashSet (HashSet(..)) import Data.HashMap.Strict qualified as HM @@ -101,8 +102,9 @@ quit :: MonadUnliftIO m => m () quit = liftIO Q.exitSuccess class Cached cache k v | cache -> k, cache -> v where - cached :: forall m . MonadIO m => cache -> k -> m v -> m v - uncache :: forall m . MonadIO m => cache -> k -> m () + isCached :: forall m . MonadIO m => cache -> k -> m Bool + cached :: forall m . MonadIO m => cache -> k -> m v -> m v + uncache :: forall m . MonadIO m => cache -> k -> m () data GitException = CompressionError String @@ -456,6 +458,7 @@ data EWState = newtype CacheTVH k v = CacheTVH (TVar (HashMap k v)) instance Hashable k => Cached (CacheTVH k v) k v where + isCached (CacheTVH t) k = readTVarIO t <&> HM.member k uncache (CacheTVH t) k = atomically (modifyTVar t (HM.delete k)) cached (CacheTVH t) k a = do what <- readTVarIO t <&> HM.lookup k @@ -476,6 +479,9 @@ newCacheFixedHPSQ :: MonadIO m => Int -> m (CacheFixedHPSQ k v) newCacheFixedHPSQ l = CacheFixedHPSQ l <$> newTVarIO HPSQ.empty instance (Ord k, Hashable k) => Cached (CacheFixedHPSQ k v) k v where + + isCached CacheFixedHPSQ{..} k = readTVarIO _theCache <&> HPSQ.member k + uncache CacheFixedHPSQ{..} k = atomically $ modifyTVar _theCache (HPSQ.delete k) cached CacheFixedHPSQ{..} k a = do @@ -514,14 +520,38 @@ export r = connectedDo $ flip runContT pure do sto <- lift getStorage - reader <- ContT $ withGitCat + reader <- ContT $ withGitCat + reader2 <- ContT $ withGitCat + + let commitCacheSize = 2000 missed <- CacheTVH <$> newTVarIO mempty - commits <- newCacheFixedHPSQ 1000 + commits <- newCacheFixedHPSQ commitCacheSize + + deferred <- newTQueueIO ContT $ bracket none $ const do hClose $ getStdin reader + ContT $ bracket none $ const do + hClose $ getStdin reader2 + + ContT $ withAsync $ replicateM_ 2 $ forever do + join $ atomically (readTQueue deferred) + + -- let noCBlock x = do + -- here <- withState $ selectCBlock x + -- pure (isNothing here) + + -- pre <- gitRunCommand [qc|git rev-list {pretty r}|] + -- <&> fromRight mempty + -- <&> LBS8.lines + -- <&> mapMaybe ( fromStringMay @GitHash . LBS8.unpack ) + -- <&> take (10 * commitCacheSize) + -- >>= filterM (lift . noCBlock) + -- <&> take commitCacheSize + -- >>= \xs -> lift (mapM_ (\x -> cached commits x (gitReadObjectMaybe reader x)) xs) + lift $ flip fix ExportGetCommit $ \next -> \case ExportStart -> do @@ -587,7 +617,6 @@ export r = connectedDo $ flip runContT pure do hhead <- gitRevParse co >>= orThrow (OtherGitError $ show $ "can't parse" <+> pretty co) - parents <- gitReadObjectThrow Commit hhead >>= gitReadCommitParents @@ -608,19 +637,24 @@ export r = connectedDo $ flip runContT pure do let blkMax = 1048576 - packs <- S.toList_ $ flip fix (EWAcc 1 r 0 [(co,Commit,Nothing,bs)]) $ \go -> \case + -- wtf <- ContT $ withAsync do + -- pure () + + out <- newTQueueIO + + flip fix (EWAcc 1 r 0 [(co,Commit,Nothing,bs)]) $ \go -> \case EWAcc _ [] _ [] -> none EWAcc i [] l acc -> do - lift (writePack sto l acc) >>= S.yield + writePack sto l acc >>= atomically . writeTQueue out EWAcc i (r@GitTreeEntry{..}:rs) l acc | gitEntrySize >= Just (fromIntegral blkMax) -> do - writeLargeBlob sto reader r >>= S.yield + atomically $ writeTQueue deferred $ writeLargeBlob sto reader2 r >>= atomically . writeTQueue out go (EWAcc (succ i) rs l acc) EWAcc i rs l acc | l >= blkMax -> do - lift (writePack sto l acc) >>= S.yield + atomically $ writeTQueue deferred $ writePack sto l acc >>= atomically . writeTQueue out go (EWAcc (succ i) rs 0 mempty) EWAcc i (e@GitTreeEntry{..}:rs) l acc -> do @@ -631,6 +665,11 @@ export r = connectedDo $ flip runContT pure do go (EWAcc i rs (l + fromIntegral (LBS.length lbs)) ((gitEntryHash,gitEntryType, Just e, lbs) : acc)) + packs <- atomically do + allDone <- isEmptyTQueue deferred + unless allDone STM.retry + STM.flushTQueue out + phashes <- catMaybes <$> withState (for parents selectCBlock) let v = "hbs2-git 3.0 zstd" @@ -639,7 +678,7 @@ export r = connectedDo $ flip runContT pure do hmeta <- putBlock sto meta >>= orThrow StorageError <&> HashRef - let cblock = hmeta : phashes <> packs + let cblock = hmeta : uniqAndOrdered phashes <> uniqAndOrdered packs let pt = toPTree (MaxSize 1024) (MaxNum 1024) cblock root <- makeMerkle 0 pt $ \(_,_,s) -> do @@ -669,6 +708,8 @@ export r = connectedDo $ flip runContT pure do where finish = none + uniqAndOrdered = Set.toList . Set.fromList + writeLargeBlob sto reader GitTreeEntry{..} = liftIO do size <- gitEntrySize & orThrow (GitReadError (show $ "expected blob" <+> pretty gitEntryHash)) debug $ yellow "write large object" <+> pretty gitEntryHash