From 778e172b9d0759f147b38a3bd6692f05739b0a65 Mon Sep 17 00:00:00 2001 From: voidlizard Date: Tue, 3 Dec 2024 07:55:22 +0300 Subject: [PATCH] wip3 --- hbs2-git3/app/Main.hs | 494 +++++++++++++++++++++++------------------- 1 file changed, 268 insertions(+), 226 deletions(-) diff --git a/hbs2-git3/app/Main.hs b/hbs2-git3/app/Main.hs index 54c0ede3..fca3c089 100644 --- a/hbs2-git3/app/Main.hs +++ b/hbs2-git3/app/Main.hs @@ -1,4 +1,5 @@ {-# OPTIONS_GHC -fno-warn-orphans #-} +{-# Language FunctionalDependencies #-} {-# Language ViewPatterns #-} {-# Language PatternSynonyms #-} {-# Language RecordWildCards #-} @@ -67,6 +68,7 @@ import Text.InterpolatedString.Perl6 (qc) import Data.HashSet qualified as HS import Data.HashSet (HashSet(..)) import Data.HashMap.Strict qualified as HM +import Data.HashMap.Strict (HashMap(..)) import Data.Word import Streaming.Prelude qualified as S @@ -97,6 +99,9 @@ type HBS2GitPerks m = (MonadUnliftIO m) quit :: MonadUnliftIO m => m () quit = liftIO Q.exitSuccess +class Cached cache k v | cache -> k, cache -> v where + cached :: forall m . MonadIO m => cache -> k -> m v -> m v + data GitException = CompressionError String | DecompressionError String @@ -361,6 +366,10 @@ gitReadCommitParents bs = do | ListVal [ StringLike "parent", StringLike hash ] <- what ] & catMaybes +gitObjectExists :: (MonadIO m, Pretty what) => what -> m Bool +gitObjectExists what = do + gitRunCommand [qc|git cat-file -e {pretty what}|] <&> isRight + data UState = UHead ByteString @@ -420,8 +429,10 @@ enumGitPackObjectsFromLBS lbs action = do data ExportState = ExportGetCommit + | ExportProcessCommit GitHash ByteString | ExportCheck | ExportStart + | ExportExit data WState = WStart @@ -440,6 +451,260 @@ data WInput = data EWState = EWAcc Int [GitTreeEntry] Int [(GitHash, GitObjectType,Maybe GitTreeEntry, ByteString)] +newtype CacheTVH k v = CacheTVH (TVar (HashMap k v)) + +instance Hashable k => Cached (CacheTVH k v) k v where + cached (CacheTVH t) k a = do + what <- readTVarIO t <&> HM.lookup k + case what of + Just x -> pure x + Nothing -> do + r <- a + atomically $ modifyTVar t (HM.insert k r) + pure r + +export :: ( HBS2GitPerks m + , MonadUnliftIO m + , MonadReader Git3Env m + , HasStorage m + , HasStateDB m + ) + => GitHash -> m () +export r = connectedDo $ flip runContT pure do + debug $ green "export" <+> pretty r + + q <- newTVarIO ( HPSQ.empty @GitHash @Double @() ) + done <- newTVarIO ( mempty :: HashSet GitHash ) + + atomically $ modifyTVar q (HPSQ.insert r 1.0 ()) + + sto <- lift getStorage + + reader <- ContT $ withGitCat + + missed <- CacheTVH <$> newTVarIO mempty + + ContT $ bracket none $ const do + hClose $ getStdin reader + + lift $ flip fix ExportGetCommit $ \next -> \case + + ExportStart -> do + here <- withState $ selectCBlock r <&> isJust + if here then next ExportCheck else next ExportGetCommit + + ExportGetCommit -> do + + co' <- atomically $ stateTVar q $ HPSQ.alterMin \case + Nothing -> (Nothing, Nothing) + Just (k,p,v) -> (Just (k,p), Nothing) + + case co' of + Nothing -> do + debug $ red "go ExportCheck" + next ExportCheck + + Just (co,prio) -> do + debug $ "Process commit" <+> pretty co <+> pretty prio + debug $ "check-pack-for" <+> pretty prio <+> pretty co + + isDone <- readTVarIO done <&> HS.member co + + let already = isDone + + if already + then do + next ExportGetCommit + else do + (t,bs) <- liftIO (gitReadObjectMaybe reader co) + >>= orThrow (GitReadError (show $ pretty co)) + + parents <- gitReadCommitParents bs + + n <- for (zip [1..] parents) $ \(i,gh) -> do + -- exists <- cached missed gh (gitObjectExists gh) + exists <- liftIO $ cached missed gh (isJust <$> gitReadObjectMaybe reader gh) + here <- withState $ selectCBlock gh <&> isJust + + unless exists do + debug $ red "missed!" <+> pretty gh + -- atomically $ modifyTVar done (HS.insert gh) + + atomically do + pdone <- readTVar done <&> HS.member gh + if pdone || here || not exists then do -- for shallow commits? + pure 0 + else do + modifyTVar q (HPSQ.insert gh (prio-i) ()) + pure 1 + + if sum n == 0 then do + next $ ExportProcessCommit co bs + else do + -- error "FUCK!" + debug $ yellow "put commit back" <+> pretty co + atomically $ modifyTVar q (HPSQ.insert co prio ()) + next ExportGetCommit + + ExportProcessCommit co bs -> do + debug $ "write pack for" <+> pretty co + + hhead <- gitRevParse co + >>= orThrow (OtherGitError $ show $ "can't parse" <+> pretty co) + + + parents <- gitReadObjectThrow Commit hhead + >>= gitReadCommitParents + + skip <- if not (excludeParents ()) then do + pure mempty + else do + skip' <- S.toList_ $ for parents $ \p -> do + -- exists <- liftIO $ cached missed p (gitObjectExists p) + exists <- liftIO $ cached missed p (isJust <$> gitReadObjectMaybe reader p) + when exists do + gitReadTree p <&> fmap gitEntryHash >>= S.each + + pure $ HS.fromList skip' + + r <- gitReadTree hhead + <&> L.filter (\GitTreeEntry{..} -> not (HS.member gitEntryHash skip)) + -- <&> L.filter (\GitTreeEntry{..} -> gitEntryType /= Tree) + <&> sortGitTreeEntries + + let blkMax = 1048576 + + out <- newTQueueIO + + flip fix (EWAcc 1 r 0 [(co,Commit,Nothing,bs)]) $ \go -> \case + + EWAcc _ [] _ [] -> none + + EWAcc i [] l acc -> do + writePack sto l acc >>= atomically . writeTQueue out + + EWAcc i (r@GitTreeEntry{..}:rs) l acc | gitEntrySize >= Just (fromIntegral blkMax) -> do + writeLargeBlob sto reader r >>= atomically . writeTQueue out + go (EWAcc (succ i) rs l acc) + + EWAcc i rs l acc | l >= blkMax -> do + writePack sto l acc >>= atomically . writeTQueue out + go (EWAcc (succ i) rs 0 mempty) + + EWAcc i (e@GitTreeEntry{..}:rs) l acc -> do + + lbs <- gitReadObjectMaybe reader gitEntryHash + >>= orThrow (GitReadError (show $ pretty gitEntryHash)) + <&> snd + + go (EWAcc i rs (l + fromIntegral (LBS.length lbs)) ((gitEntryHash,gitEntryType, Just e, lbs) : acc)) + + packs <- atomically $ STM.flushTQueue out + + phashes <- catMaybes <$> withState (for parents selectCBlock) + + let v = "hbs2-git 3.0 zstd" + let pps = vcat $ mconcat $ for phashes $ \p -> ["p" <+> pretty p] + let meta = LBS8.pack $ show $ pretty v <> line <> pps + + hmeta <- putBlock sto meta >>= orThrow StorageError <&> HashRef + + let cblock = hmeta : phashes <> packs + let pt = toPTree (MaxSize 1024) (MaxNum 1024) cblock + + root <- makeMerkle 0 pt $ \(_,_,s) -> do + void $ putBlock sto s + + withState $ transactional do + for_ packs $ \href -> do + insertGitPack co href + insertCBlock co (HashRef root) + + notice $ "cblock" <+> pretty root + + atomically do + modifyTVar done (HS.insert co) + modifyTVar q (HPSQ.delete co) + + next ExportGetCommit + + ExportCheck -> do + debug $ "ExportCheck dummy" <+> pretty r + c <- withState $ selectCBlock r >>= orThrowUser "export failed" + liftIO $ hPrint stdout (pretty c) + next ExportExit + + ExportExit -> finish + + where + finish = none + + writeLargeBlob sto reader GitTreeEntry{..} = liftIO do + size <- gitEntrySize & orThrow (GitReadError (show $ "expected blob" <+> pretty gitEntryHash)) + debug $ yellow "write large object" <+> pretty gitEntryHash + let p = Builder.byteString [qc|{pretty $ Short gitEntryType} {pretty size} {pretty gitEntryHash} {gitEntryName}|] + <> Builder.byteString "\n" + & LBS.toStrict . Builder.toLazyByteString + + -- TODO: check-if-work-on-large-files + pieces <- S.toList_ do + + stream <- lift $ ZstdS.compress maxCLevel + + (t,lbs) <- gitReadObjectMaybe reader gitEntryHash + >>= orThrow (GitReadError (show $ pretty gitEntryHash)) + + let chunks = p : LBS.toChunks lbs + + flip fix (chunks, stream) $ \go r -> + case r of + (c, Produce chunk continue) -> do + S.yield chunk + w <- lift continue + go (c,w) + + ([], Consume consume) -> do + x <- lift $ consume mempty + go ([],x) + + (s:ss, Consume consume) -> do + x <- lift $ consume s + go (ss,x) + + (_,Done bs) -> do + S.yield bs + + (_,Error s1 s2) -> do + throwIO (CompressionError (s1 <> " " <> s2)) + + -- TODO: check-if-work-on-large-files + createTreeWithMetadata sto mzero mempty (LBS.fromChunks pieces) + >>= orThrowPassIO + + writePack sto l racc = do + -- write + -- pack + -- merkle + let acc = reverse racc + debug $ green "write pack of objects" <+> pretty l <+> pretty (length acc) + + parts <- for acc $ \(h,t,e,lbs) -> liftIO do + let ename = [qc|{fromMaybe mempty $ gitEntryName <$> e}|] :: ByteString + + -- notice $ "pack" <+> pretty h <+> pretty t + let p = Builder.byteString [qc|{pretty $ Short t} {pretty (LBS.length lbs)} {pretty h} {ename}|] + <> Builder.byteString "\n" + <> Builder.lazyByteString lbs + <> Builder.byteString "\n" + pure p + + let packed = Zstd.compress maxCLevel (LBS.toStrict $ Builder.toLazyByteString $ mconcat parts) + + createTreeWithMetadata sto mzero mempty (LBS.fromStrict packed) + >>= orThrowPassIO + + + theDict :: forall m . ( HBS2GitPerks m , HasClientAPI PeerAPI UNIX m , HasStorage m @@ -540,237 +805,13 @@ theDict = do putStrLn $ show $ pretty t <+> pretty h <+> pretty s pure True - entry $ bindMatch "test:git:tree:export" $ nil_ $ \syn -> do - - mkdir "export" - + entry $ bindMatch "test:git:tree:export" $ nil_ $ \syn -> lift do r <- case syn of [] -> gitRevParseThrow "HEAD" [ StringLike co ] -> gitRevParseThrow co _ -> throwIO (BadFormException @C nil) - debug $ "process commit" <+> pretty r - - q <- newTVarIO ( HPSQ.empty @GitHash @Double @() ) - done <- newTVarIO ( mempty :: HashSet GitHash ) - - atomically $ modifyTVar q (HPSQ.insert r 1.0 ()) - - lift $ connectedDo do - - sto <- getStorage - - flip runContT pure do - - reader <- ContT $ withGitCat - - ContT $ bracket none $ const do - hClose $ getStdin reader - - flip fix ExportGetCommit $ \next -> \case - - ExportStart -> do - here <- lift $ withState $ selectCBlock r <&> isJust - if here then next ExportCheck else next ExportGetCommit - - ExportGetCommit -> do - - co' <- atomically $ stateTVar q $ HPSQ.alterMin \case - Nothing -> (Nothing, Nothing) - Just (k,p,v) -> (Just (k,p), Nothing) - - case co' of - Nothing -> next ExportCheck - - Just (co,prio) -> do - debug $ "Process commit" <+> pretty co - debug $ "check-pack-for" <+> pretty prio <+> pretty co - - isDone <- readTVarIO done <&> HS.member co - - let already = isDone - - if already - then next ExportGetCommit - else do - (t,bs) <- liftIO (gitReadObjectMaybe reader co) - >>= orThrow (GitReadError (show $ pretty co)) - - parents <- gitReadCommitParents bs - - n <- for (zip [1..] parents) $ \(i,gh) -> do - here <- lift $ withState $ selectCBlock gh <&> isJust - -- here <- pure False -- lift $ withState $ selectCBlock gh <&> isJust - atomically do - pdone <- readTVar done <&> HS.member gh - if pdone || here then do - pure 0 - else do - modifyTVar q (HPSQ.insert gh (prio-i) ()) - pure 1 - - if sum n == 0 then lift do - debug $ "write pack for" <+> pretty co - - let dir = "export" - - mkdir dir - - hhead <- gitRevParse co - >>= orThrow (OtherGitError $ show $ "can't parse" <+> pretty co) - - parents <- gitReadObjectThrow Commit hhead >>= gitReadCommitParents - - skip <- if not (excludeParents ()) then do - pure mempty - else do - skip' <- S.toList_ $ for parents $ \p -> do - gitReadTree p <&> fmap gitEntryHash >>= S.each - pure $ HS.fromList skip' - - - r <- gitReadTree hhead - <&> L.filter (\GitTreeEntry{..} -> not (HS.member gitEntryHash skip)) - -- <&> L.filter (\GitTreeEntry{..} -> gitEntryType /= Tree) - <&> sortGitTreeEntries - - let blkMax = 1048576 - - out <- newTQueueIO - - let writeLargeBlob n GitTreeEntry{..} = do - size <- gitEntrySize & orThrow (GitReadError (show $ "expected blob" <+> pretty gitEntryHash)) - debug $ yellow "write large object" <+> pretty gitEntryHash - let p = Builder.byteString [qc|{pretty $ Short gitEntryType} {pretty size} {pretty gitEntryHash} {gitEntryName}|] - <> Builder.byteString "\n" - & LBS.toStrict . Builder.toLazyByteString - - liftIO do - - -- TODO: check-if-work-on-large-files - pieces <- S.toList_ do - - stream <- lift $ ZstdS.compress maxCLevel - - let fn = dir show (pretty co) <> "." <> show n <> ".big" <> ".pack" - - (t,lbs) <- gitReadObjectMaybe reader gitEntryHash - >>= orThrow (GitReadError (show $ pretty gitEntryHash)) - - let chunks = p : LBS.toChunks lbs - - flip fix (chunks, stream) $ \go r -> - case r of - (c, Produce chunk continue) -> do - S.yield chunk - w <- lift continue - go (c,w) - - ([], Consume consume) -> do - x <- lift $ consume mempty - go ([],x) - - (s:ss, Consume consume) -> do - x <- lift $ consume s - go (ss,x) - - (_,Done bs) -> do - S.yield bs - debug "done!" - - (_,Error s1 s2) -> do - throwIO (CompressionError (s1 <> " " <> s2)) - - -- TODO: check-if-work-on-large-files - href <- createTreeWithMetadata sto mzero mempty (LBS.fromChunks pieces) - >>= orThrowPassIO - - atomically $ writeTQueue out href - - let writePack i l racc = do - -- write - -- pack - -- merkle - let fn = dir show (pretty co) <> "." <> show (length racc) <> "." <> show i <> ".pack" - let acc = reverse racc - debug $ green "write pack of objects" <+> pretty l <+> pretty (length acc) - - parts <- for acc $ \(h,t,e,lbs) -> liftIO do - let ename = [qc|{fromMaybe mempty $ gitEntryName <$> e}|] :: ByteString - - -- notice $ "pack" <+> pretty h <+> pretty t - let p = Builder.byteString [qc|{pretty $ Short t} {pretty (LBS.length lbs)} {pretty h} {ename}|] - <> Builder.byteString "\n" - <> Builder.lazyByteString lbs - <> Builder.byteString "\n" - pure p - - let packed = Zstd.compress maxCLevel (LBS.toStrict $ Builder.toLazyByteString $ mconcat parts) - - href <- createTreeWithMetadata sto mzero mempty (LBS.fromStrict packed) - >>= orThrowPassIO - - atomically $ writeTQueue out href - - flip fix (EWAcc 1 r 0 [(co,Commit,Nothing,bs)]) $ \go -> \case - - EWAcc _ [] _ [] -> none - - EWAcc i [] l acc -> do - writePack i l acc - - EWAcc i (r@GitTreeEntry{..}:rs) l acc | gitEntrySize >= Just (fromIntegral blkMax) -> do - writeLargeBlob i r - go (EWAcc (succ i) rs l acc) - - EWAcc i rs l acc | l >= blkMax -> do - writePack i l acc - go (EWAcc (succ i) rs 0 mempty) - - EWAcc i (e@GitTreeEntry{..}:rs) l acc -> do - - lbs <- gitReadObjectMaybe reader gitEntryHash - >>= orThrow (GitReadError (show $ pretty gitEntryHash)) - <&> snd - - go (EWAcc i rs (l + fromIntegral (LBS.length lbs)) ((gitEntryHash,gitEntryType, Just e, lbs) : acc)) - - packs <- atomically $ STM.flushTQueue out - - phashes <- withState $ for parents \p -> do - selectCBlock p - >>= orThrowUser ("pack export failed" <+> pretty p) - - let v = "hbs2-git 3.0 zstd" - let pps = vcat $ mconcat $ for phashes $ \p -> ["p" <+> pretty p] - let meta = LBS8.pack $ show $ pretty v <> line <> pps - - hmeta <- putBlock sto meta >>= orThrow StorageError <&> HashRef - - let cblock = hmeta : phashes <> packs - let pt = toPTree (MaxSize 1024) (MaxNum 1024) cblock - - root <- makeMerkle 0 pt $ \(_,_,s) -> do - void $ putBlock sto s - - withState $ transactional do - for_ packs $ \href -> do - insertGitPack co href - insertCBlock co (HashRef root) - - notice $ "cblock" <+> pretty root - - atomically $ modifyTVar done (HS.insert co) - else do - atomically $ modifyTVar q (HPSQ.insert co prio ()) - - next ExportGetCommit - - ExportCheck -> do - debug $ "ExportCheck dummy" <+> pretty r - c <- lift $ withState $ selectCBlock r >>= orThrowUser "export failed" - liftIO $ hPrint stdout (pretty c) - + export r -- debugPrefix :: LoggerEntry -> LoggerEntry debugPrefix = toStderr . logPrefix "[debug] " @@ -812,4 +853,5 @@ main = flip runContT pure do conf <- readLocalConf let dict = theDict recover $ run dict (conf <> cli) + `finally` silence