From 1c0952ad95ef4de5347d401de02f03a3c9e27b92 Mon Sep 17 00:00:00 2001 From: voidlizard Date: Mon, 2 Dec 2024 13:16:43 +0300 Subject: [PATCH] wip --- docs/devlog.md | 4 + hbs2-git3/app/Main.hs | 404 +++++++++++++----------- hbs2-git3/hbs2-git3.cabal | 1 + hbs2-git3/lib/HBS2/Git3/State/Direct.hs | 14 +- 4 files changed, 227 insertions(+), 196 deletions(-) diff --git a/docs/devlog.md b/docs/devlog.md index 3ef56170..9f7ed1f0 100644 --- a/docs/devlog.md +++ b/docs/devlog.md @@ -1,3 +1,7 @@ +## 2024-12-02 + +Пробуем новую структуру репозитория + ## 2024-02-24 wtf? diff --git a/hbs2-git3/app/Main.hs b/hbs2-git3/app/Main.hs index 4d9ecf67..54c0ede3 100644 --- a/hbs2-git3/app/Main.hs +++ b/hbs2-git3/app/Main.hs @@ -15,6 +15,7 @@ import HBS2.Data.Detect qualified as Detect import HBS2.Storage import HBS2.Storage.Operations.Class +import HBS2.Storage.Operations.ByteString import HBS2.Peer.CLI.Detect import HBS2.Peer.RPC.Client import HBS2.Peer.RPC.Client.Unix @@ -49,6 +50,10 @@ import DBPipe.SQLite import Codec.Compression.BZip as BZ1 import Codec.Compression.BZip.Internal qualified as BZ -- import Codec.Compression.Zlib.Internal qualified as GZ +import Codec.Compression.Zstd qualified as Zstd +import Codec.Compression.Zstd.Streaming qualified as ZstdS +import Codec.Compression.Zstd.Streaming (Result(..)) +import Codec.Compression.Zstd (maxCLevel) import Data.HashPSQ qualified as HPSQ import Data.Maybe @@ -57,6 +62,7 @@ import Data.ByteString.Lazy.Char8 qualified as LBS8 import Data.ByteString.Lazy qualified as LBS import Data.ByteString qualified as BS import Data.ByteString.Lazy (ByteString) +import Data.ByteString.Builder as Builder import Text.InterpolatedString.Perl6 (qc) import Data.HashSet qualified as HS import Data.HashSet (HashSet(..)) @@ -71,9 +77,11 @@ import Control.Monad.Trans.Cont import Control.Monad.Trans.Maybe import Control.Monad.Reader import Control.Monad.Except +import Control.Concurrent.STM qualified as STM import System.IO (hPrint,hGetLine,IOMode(..)) import System.IO qualified as IO +import Data.Either import Data.Coerce import Data.Kind import Data.List (sortOn) @@ -91,6 +99,7 @@ quit = liftIO Q.exitSuccess data GitException = CompressionError String + | DecompressionError String | InvalidObjectFormat GitObjectType (Maybe GitHash) | InvalidGitPack ByteString | OtherGitError String @@ -352,55 +361,6 @@ gitReadCommitParents bs = do | ListVal [ StringLike "parent", StringLike hash ] <- what ] & catMaybes -gitWriteCommitPackIO :: (GitWritePacksOpts opt, GitObjectReader reader, Pretty what) => opt -> reader -> what -> ( BS.ByteString -> IO () ) -> IO () -gitWriteCommitPackIO opts reader what action = do - hhead <- gitRevParse what >>= orThrow (OtherGitError $ show $ "can't parse" <+> pretty what) - - parents <- gitReadObjectThrow Commit hhead >>= gitReadCommitParents - - skip <- if not (excludeParents opts) then do - pure mempty - else do - skip' <- S.toList_ $ for parents $ \p -> do - gitReadTree p <&> fmap gitEntryHash >>= S.each - pure $ HS.fromList skip' - - - r <- gitReadTree hhead - <&> L.filter (\GitTreeEntry{..} -> not (HS.member gitEntryHash skip)) - <&> sortGitTreeEntries - - flip runContT pure do - - inq <- newTQueueIO - - atomically do - writeTQueue inq (Commit, hhead) - for_ r $ \GitTreeEntry{..} -> do - writeTQueue inq (gitEntryType, gitEntryHash) - - let params = defaultCompressParams - let compressStream = BZ.compressIO params - - lift $ flip fix compressStream $ \go -> \case - BZ.CompressInputRequired next -> do - - inO <- atomically $ tryReadTQueue inq - - case inO of - Nothing -> go =<< next mempty - - Just (t,ha) -> do - (tt,bs) <- gitReadObjectMaybe reader ha >>= orThrow (GitReadError (show $ pretty ha)) - let header = [qc|{pretty (Short tt)} {pretty $ LBS.length bs} {pretty ha}|] - go =<< next (LBS.toStrict (LBS8.intercalate "\n" [header, bs])) - - BZ.CompressOutputAvailable outchunk next -> do - action outchunk - go =<< next - - BZ.CompressStreamEnd -> pure () - data UState = UHead ByteString @@ -409,33 +369,54 @@ pattern PEntryView t s h <- ( unpackPEntry -> Just (t,s,h) ) unpackPEntry :: [ByteString] -> Maybe (GitObjectType, Word32, GitHash) unpackPEntry = \case - ["C", s, h] -> (Commit,,) <$> readMay (LBS8.unpack s) <*> fromStringMay (LBS8.unpack h) - ["B", s, h] -> (Blob,,) <$> readMay (LBS8.unpack s) <*> fromStringMay (LBS8.unpack h) - ["T", s, h] -> (Tree,,) <$> readMay (LBS8.unpack s) <*> fromStringMay (LBS8.unpack h) + ("C" : s : h : _) -> (Commit,,) <$> readMay (LBS8.unpack s) <*> fromStringMay (LBS8.unpack h) + ("B" : s : h : _) -> (Blob,,) <$> readMay (LBS8.unpack s) <*> fromStringMay (LBS8.unpack h) + ("T" : s : h : _) -> (Tree,,) <$> readMay (LBS8.unpack s) <*> fromStringMay (LBS8.unpack h) _ -> Nothing +data ES = + ES [BS.ByteString] Result + enumGitPackObjectsFromLBS :: MonadIO m => ByteString -> ( GitObjectType -> Word32 -> GitHash -> m Bool ) -> m () enumGitPackObjectsFromLBS lbs action = do - let content = BZ.decompress defaultDecompressParams lbs - flip fix (UHead content) $ \next -> \case - UHead "" -> none - UHead bs -> do + let chunks = LBS.toChunks lbs - let (hd,rest) = LBS8.span (/='\n') bs + stream <- liftIO ZstdS.decompress - case LBS8.words hd of - PEntryView t s h -> do - -- liftIO $ print $ pretty h <+> pretty t <+> pretty s - deeper <- action t s h - when deeper do - next $ UHead (LBS8.drop (1 + fromIntegral s) rest) + chunks <- S.toList_ do + + flip fix (ES chunks stream) $ \go -> \case + ES _ (Error s1 s2) -> throwIO (DecompressionError (s1 <> s2)) + + ES [] (Consume work) -> + go . ES [] =<< liftIO (work mempty) + + ES (r:rs) (Consume work) -> do + go . ES rs =<< liftIO (work r) + + ES rs (Produce s continue) -> do + S.yield s + go . ES rs =<< liftIO continue + + ES _ (Done s) -> do + S.yield s + + void $ flip fix (UHead (LBS.fromChunks chunks)) $ \next -> \case + UHead chunk -> do + let s0 = LBS8.dropWhile (=='\n') chunk + unless (LBS.null s0) do + let (hdr,rest) = LBS8.break (=='\n') s0 + (t,s,h) <- unpackPEntry (LBS8.words hdr) & orThrow (InvalidGitPack hdr) + void $ action t s h + let o = LBS.drop 1 rest + let (_, rest2) = LBS.splitAt (fromIntegral s) o + next (UHead rest2) - _ -> throwIO (InvalidGitPack hd) data ExportState = ExportGetCommit @@ -455,6 +436,10 @@ data WInput = WInputSBlock | WInputCBlock HashRef + +data EWState = + EWAcc Int [GitTreeEntry] Int [(GitHash, GitObjectType,Maybe GitTreeEntry, ByteString)] + theDict :: forall m . ( HBS2GitPerks m , HasClientAPI PeerAPI UNIX m , HasStorage m @@ -479,8 +464,10 @@ theDict = do entry $ bindMatch "git:tree:ls" $ nil_ $ const do r <- gitReadTree "HEAD" for_ r $ \GitTreeEntry{..} -> do - liftIO $ print $ pretty gitEntryHash <+> pretty gitEntryType <+> pretty gitEntrySize <+> pretty gitEntryName - + liftIO $ print $ pretty gitEntryHash + <+> pretty gitEntryType + <+> pretty gitEntrySize + <+> pretty gitEntryName entry $ bindMatch "reflog" $ nil_ $ \case [ SignPubKeyLike what ] -> do @@ -489,6 +476,9 @@ theDict = do _ -> throwIO (BadFormException @C nil) + entry $ bindMatch "debug" $ nil_ $ const do + setLogging @DEBUG $ toStderr . logPrefix "[debug] " + entry $ bindMatch "test:state:init" $ nil_ $ \case [ ] -> do lift $ connectedDo do @@ -515,121 +505,40 @@ theDict = do notice $ pretty r - entry $ bindMatch "test:git:tree:pack:write" $ nil_ $ \syn -> flip runContT pure do + entry $ bindMatch "test:git:sblock:list" $ nil_ $ \syn -> lift do + hash <- headMay [ x | HashLike x <- syn ] & orThrowUser "sblock hash not given" + sto <- getStorage - let o = [ WriteFullPack | StringLike "--full" <- syn ] & HS.fromList + liftIO do - (what,to) <- case syn of + hzz <- S.toList_ $ walkMerkle (coerce hash) (getBlock sto) $ \case + Left h -> throwIO MissedBlockError + Right ( hs :: [HashRef] ) -> S.each hs - ( StringLike rev : StringLike fn : _) -> do - -- let co = headDef "HEAD" $ [ GitRef (LBS8.toStrict $ LBS8.pack what) | StringLike what <- syn ] - fh <- ContT $ bracket (liftIO (IO.openFile fn WriteMode)) hClose - pure (rev, fh) + hmeta <- headMay hzz & orThrowUser "empty sblock" - ( StringLike rev : _ ) -> pure ( rev, stdout ) + what <- getBlock sto (coerce hmeta) + >>= orThrow StorageError + <&> LBS8.unpack + <&> parseTop + <&> fromRight mempty - _ -> pure ( "HEAD", stdout ) + _ <- headMay [ () + | ListVal [ StringLike "hbs2-git", _, StringLike "zstd" ] <- what + ] & orThrowUser "invalid sblock metadata" - rd <- ContT withGitCat + let pps = [ ph + | ListVal [ StringLike "p", HashLike ph ] <- what + ] & HS.fromList - liftIO $ gitWriteCommitPackIO o rd what $ \bs -> do - BS.hPut to bs + let rs = filter (\x -> not (HS.member x pps)) (tail hzz) - - entry $ bindMatch "test:git:tree:walk" $ nil_ $ \syn -> do - sref <- case syn of - [ HashLike s ] -> pure s - _ -> throwIO (BadFormException @C nil) - - sto <- lift getStorage - lift $ connectedDo $ flip runContT pure $ do - - _p <- newTVarIO 0 - wq <- newTVarIO ( HPSQ.empty @HashRef @Int @WInput ) - notice $ "sblock" <+> pretty sref - - atomically $ modifyTVar wq (HPSQ.insert sref 0 WInputSBlock) - - flip fix WGetInput \next -> \case - WStart -> do - debug $ "start" <+> pretty sref - next WEnd -- (WReadSBlock sref) - - WReadSBlock h -> do - blk' <- getBlock sto (coerce h) - maybe1 blk' (next WEnd) (next . WCheckSBlock h) - - WCheckSBlock h bs -> do - let what = tryDetect (coerce h) bs - case what of - Merkle mt -> next (WWalkSBlock h mt) - _ -> next WEnd - - WWalkSBlock self x -> case x of - MLeaf ( (c:parents) :: [HashRef]) -> do - - debug $ "walk sblock yay!" <+> pretty self <+> pretty parents - debug $ "sblok content" <+> pretty c - - atomically do - p0 <- stateTVar _p $ \x -> (x, pred x) - for_ (zip [1 ..] parents) $ \(i,p) -> do - modifyTVar _p $ \x -> x - i - modifyTVar wq (HPSQ.insert p (p0-i) WInputSBlock) - - modifyTVar wq (HPSQ.insert c (p0+1) (WInputCBlock self)) - - next WGetInput - - _ -> next WEnd - - WGetInput -> do - n <- readTVarIO wq <&> HPSQ.size - debug $ "get input!" <+> pretty n - - inp <- atomically $ stateTVar wq $ HPSQ.alterMin \case - Nothing -> (Nothing, Nothing) - Just (k,p,v) -> (Just (k,p,v), Nothing) - - case inp of - Just (h, _, WInputSBlock) -> do - debug $ "goto sblock" <+> pretty h - next (WReadSBlock h) - - Just (h, _, WInputCBlock sblk) -> do - debug $ "process cblock" <+> pretty h <+> pretty "from" <+> pretty sblk - - r <- liftIO $ runExceptT (getTreeContents sto h) - - case r of - Left{} -> next WEnd - Right lbs -> do - next $ WProcessCBlock h sblk lbs - - Nothing -> next WEnd - - WProcessCBlock cblk sblk lbs -> do - - r <- S.toList_ do - enumGitPackObjectsFromLBS lbs $ \t s h -> do - S.yield (t,h,s) - pure False - - case r of - [(Commit, h, _)] -> do - debug $ green "BLOCK" <+> pretty cblk <+> pretty h - - lift $ withState $ transactional do - insertGitPack h cblk - insertCBlock h sblk - - next WGetInput - - _ -> next WEnd - - - WEnd -> do - debug "exit" + for_ rs $ \r -> do + what <- runExceptT (getTreeContents sto r) >>= orThrowPassIO + debug $ yellow "reading" <+> pretty r + enumGitPackObjectsFromLBS what $ \t s h -> do + putStrLn $ show $ pretty t <+> pretty h <+> pretty s + pure True entry $ bindMatch "test:git:tree:export" $ nil_ $ \syn -> do @@ -661,7 +570,7 @@ theDict = do flip fix ExportGetCommit $ \next -> \case ExportStart -> do - here <- lift $ withState $ selectGitPack r <&> isJust + here <- lift $ withState $ selectCBlock r <&> isJust if here then next ExportCheck else next ExportGetCommit ExportGetCommit -> do @@ -690,7 +599,8 @@ theDict = do parents <- gitReadCommitParents bs n <- for (zip [1..] parents) $ \(i,gh) -> do - here <- lift $ withState $ selectGitPack gh <&> isJust + here <- lift $ withState $ selectCBlock gh <&> isJust + -- here <- pure False -- lift $ withState $ selectCBlock gh <&> isJust atomically do pdone <- readTVar done <&> HS.member gh if pdone || here then do @@ -702,38 +612,154 @@ theDict = do if sum n == 0 then lift do debug $ "write pack for" <+> pretty co - let fn = "export" show (pretty co) <> ".pack" + let dir = "export" - liftIO $ withFile fn WriteMode $ \to -> do - gitWriteCommitPackIO () reader co $ \pss -> do - BS.hPut to pss + mkdir dir - -- FIXME: support-encryption! - lbs <- liftIO $ LBS.readFile fn + hhead <- gitRevParse co + >>= orThrow (OtherGitError $ show $ "can't parse" <+> pretty co) - href <- createTreeWithMetadata sto mzero mempty lbs - >>= orThrowUser "can't write merkle tree" + parents <- gitReadObjectThrow Commit hhead >>= gitReadCommitParents - debug $ "pack-merkle-tree-hash" <+> pretty href + skip <- if not (excludeParents ()) then do + pure mempty + else do + skip' <- S.toList_ $ for parents $ \p -> do + gitReadTree p <&> fmap gitEntryHash >>= S.each + pure $ HS.fromList skip' - debug $ "make cblock" + + r <- gitReadTree hhead + <&> L.filter (\GitTreeEntry{..} -> not (HS.member gitEntryHash skip)) + -- <&> L.filter (\GitTreeEntry{..} -> gitEntryType /= Tree) + <&> sortGitTreeEntries + + let blkMax = 1048576 + + out <- newTQueueIO + + let writeLargeBlob n GitTreeEntry{..} = do + size <- gitEntrySize & orThrow (GitReadError (show $ "expected blob" <+> pretty gitEntryHash)) + debug $ yellow "write large object" <+> pretty gitEntryHash + let p = Builder.byteString [qc|{pretty $ Short gitEntryType} {pretty size} {pretty gitEntryHash} {gitEntryName}|] + <> Builder.byteString "\n" + & LBS.toStrict . Builder.toLazyByteString + + liftIO do + + -- TODO: check-if-work-on-large-files + pieces <- S.toList_ do + + stream <- lift $ ZstdS.compress maxCLevel + + let fn = dir show (pretty co) <> "." <> show n <> ".big" <> ".pack" + + (t,lbs) <- gitReadObjectMaybe reader gitEntryHash + >>= orThrow (GitReadError (show $ pretty gitEntryHash)) + + let chunks = p : LBS.toChunks lbs + + flip fix (chunks, stream) $ \go r -> + case r of + (c, Produce chunk continue) -> do + S.yield chunk + w <- lift continue + go (c,w) + + ([], Consume consume) -> do + x <- lift $ consume mempty + go ([],x) + + (s:ss, Consume consume) -> do + x <- lift $ consume s + go (ss,x) + + (_,Done bs) -> do + S.yield bs + debug "done!" + + (_,Error s1 s2) -> do + throwIO (CompressionError (s1 <> " " <> s2)) + + -- TODO: check-if-work-on-large-files + href <- createTreeWithMetadata sto mzero mempty (LBS.fromChunks pieces) + >>= orThrowPassIO + + atomically $ writeTQueue out href + + let writePack i l racc = do + -- write + -- pack + -- merkle + let fn = dir show (pretty co) <> "." <> show (length racc) <> "." <> show i <> ".pack" + let acc = reverse racc + debug $ green "write pack of objects" <+> pretty l <+> pretty (length acc) + + parts <- for acc $ \(h,t,e,lbs) -> liftIO do + let ename = [qc|{fromMaybe mempty $ gitEntryName <$> e}|] :: ByteString + + -- notice $ "pack" <+> pretty h <+> pretty t + let p = Builder.byteString [qc|{pretty $ Short t} {pretty (LBS.length lbs)} {pretty h} {ename}|] + <> Builder.byteString "\n" + <> Builder.lazyByteString lbs + <> Builder.byteString "\n" + pure p + + let packed = Zstd.compress maxCLevel (LBS.toStrict $ Builder.toLazyByteString $ mconcat parts) + + href <- createTreeWithMetadata sto mzero mempty (LBS.fromStrict packed) + >>= orThrowPassIO + + atomically $ writeTQueue out href + + flip fix (EWAcc 1 r 0 [(co,Commit,Nothing,bs)]) $ \go -> \case + + EWAcc _ [] _ [] -> none + + EWAcc i [] l acc -> do + writePack i l acc + + EWAcc i (r@GitTreeEntry{..}:rs) l acc | gitEntrySize >= Just (fromIntegral blkMax) -> do + writeLargeBlob i r + go (EWAcc (succ i) rs l acc) + + EWAcc i rs l acc | l >= blkMax -> do + writePack i l acc + go (EWAcc (succ i) rs 0 mempty) + + EWAcc i (e@GitTreeEntry{..}:rs) l acc -> do + + lbs <- gitReadObjectMaybe reader gitEntryHash + >>= orThrow (GitReadError (show $ pretty gitEntryHash)) + <&> snd + + go (EWAcc i rs (l + fromIntegral (LBS.length lbs)) ((gitEntryHash,gitEntryType, Just e, lbs) : acc)) + + packs <- atomically $ STM.flushTQueue out phashes <- withState $ for parents \p -> do selectCBlock p >>= orThrowUser ("pack export failed" <+> pretty p) - debug $ "write cblock" + let v = "hbs2-git 3.0 zstd" + let pps = vcat $ mconcat $ for phashes $ \p -> ["p" <+> pretty p] + let meta = LBS8.pack $ show $ pretty v <> line <> pps - let cblock = href : phashes + hmeta <- putBlock sto meta >>= orThrow StorageError <&> HashRef + + let cblock = hmeta : phashes <> packs let pt = toPTree (MaxSize 1024) (MaxNum 1024) cblock root <- makeMerkle 0 pt $ \(_,_,s) -> do void $ putBlock sto s withState $ transactional do - insertGitPack co href + for_ packs $ \href -> do + insertGitPack co href insertCBlock co (HashRef root) + notice $ "cblock" <+> pretty root + atomically $ modifyTVar done (HS.insert co) else do atomically $ modifyTVar q (HPSQ.insert co prio ()) @@ -751,7 +777,7 @@ debugPrefix = toStderr . logPrefix "[debug] " setupLogger :: MonadIO m => m () setupLogger = do - setLogging @DEBUG $ toStderr . logPrefix "[debug] " + -- setLogging @DEBUG $ toStderr . logPrefix "[debug] " setLogging @ERROR $ toStderr . logPrefix "[error] " setLogging @WARN $ toStderr . logPrefix "[warn] " setLogging @NOTICE $ toStdout . logPrefix "" diff --git a/hbs2-git3/hbs2-git3.cabal b/hbs2-git3/hbs2-git3.cabal index 8fd86eef..82ad4fbd 100644 --- a/hbs2-git3/hbs2-git3.cabal +++ b/hbs2-git3/hbs2-git3.cabal @@ -138,6 +138,7 @@ executable hbs2-git3 , binary , psqueues , vector + , zstd hs-source-dirs: app default-language: GHC2021 diff --git a/hbs2-git3/lib/HBS2/Git3/State/Direct.hs b/hbs2-git3/lib/HBS2/Git3/State/Direct.hs index d1abe826..f09ff5bf 100644 --- a/hbs2-git3/lib/HBS2/Git3/State/Direct.hs +++ b/hbs2-git3/lib/HBS2/Git3/State/Direct.hs @@ -44,8 +44,9 @@ evolveState = do ddl [qc| create table if not exists gitpack - ( kommit text not null primary key + ( kommit text not null , pack text not null + , primary key (kommit,pack) ) |] @@ -81,14 +82,13 @@ insertGitPack :: MonadIO m => GitHash -> HashRef -> DBPipeM m () insertGitPack co pack = do insert [qc| insert into gitpack (kommit,pack) values(?,?) - on conflict (kommit) do update set pack = excluded.pack + on conflict (kommit,pack) do nothing |] (co, pack) -selectGitPack :: MonadIO m => GitHash -> DBPipeM m (Maybe HashRef) -selectGitPack gh = do - select [qc|select pack from gitpack where kommit = ? limit 1|] (Only gh) - <&> listToMaybe . fmap fromOnly - +selectGitPacks :: MonadIO m => GitHash -> DBPipeM m [HashRef] +selectGitPacks gh = do + select [qc|select pack from gitpack where kommit = ? |] (Only gh) + <&> fmap fromOnly insertCBlock :: MonadIO m => GitHash -> HashRef -> DBPipeM m () insertCBlock co cblk = do