diff --git a/hbs2-git3/app/Main.hs b/hbs2-git3/app/Main.hs index 0fc17009..4ecdb24e 100644 --- a/hbs2-git3/app/Main.hs +++ b/hbs2-git3/app/Main.hs @@ -18,13 +18,10 @@ import HBS2.Peer.RPC.API.LWWRef import HBS2.Peer.RPC.API.Storage import HBS2.Peer.RPC.Client.StorageClient -import HBS2.CLI.Run.Internal.Merkle (getTreeContents) - -- move to Data.Config.Suckless.Script.Filea sepatate library import HBS2.Data.Log.Structured - -import HBS2.CLI.Run.Internal.Merkle (createTreeWithMetadata) +import HBS2.CLI.Run.Internal.Merkle (getTreeContents) import HBS2.CLI.Run.RefLog (getCredentialsForReflog,mkRefLogUpdateFrom) import HBS2.System.Dir @@ -33,6 +30,8 @@ import HBS2.Git3.Types import HBS2.Git3.Config.Local import HBS2.Git3.Git import HBS2.Git3.Export +import HBS2.Git3.Import +import HBS2.Git3.State.RefLog import Data.Config.Suckless.Script import Data.Config.Suckless.Script.File @@ -929,50 +928,56 @@ theDict = do let cpOnly = or [ True | ListVal [StringLike "--checkpoints"] <- opts ] let sOnly = or [ True | ListVal [StringLike "--segments"] <- opts ] - sto <- getStorage + hxs <- txListAll Nothing - refLogAPI <- getClientAPI @RefLogAPI @UNIX - reflog <- getGitRemoteKey >>= orThrowUser "reflog not set" - - rv <- callRpcWaitMay @RpcRefLogGet (TimeoutSec 1) refLogAPI reflog - >>= orThrowUser "rpc timeout" - >>= orThrowUser "reflog is empty" - <&> coerce - - hxs <- S.toList_ $ walkMerkle @[HashRef] rv (getBlock sto) $ \case - Left{} -> throwIO MissedBlockError - Right hs -> S.each hs - - liftIO $ forM_ hxs $ \h -> do - - decoded <- readTxMay sto h - <&> \case - Just (TxSegment x) | not cpOnly -> + liftIO $ forM_ hxs $ \(h,tx) -> do + let decoded = case tx of + TxSegment x | not cpOnly -> Just ("S" <+> fill 44 (pretty h) <+> fill 44 (pretty x)) - Just (TxCheckpoint n x) | not sOnly -> + TxCheckpoint n x | not sOnly -> Just ("C" <+> fill 44 (pretty h) <+> pretty x <+> fill 8 (pretty n)) _ -> Nothing forM_ decoded print - entry $ bindMatch "test:git:import" $ nil_ $ \syn -> lift $ connectedDo do - + entry $ bindMatch "reflog:import:pack" $ nil_ $ \syn -> lift $ connectedDo do updateReflogIndex - refLogAPI <- getClientAPI @RefLogAPI @UNIX - reflog <- getGitRemoteKey >>= orThrowUser "reflog not set" + packs <- findGitDir + >>= orThrowUser "git directory not found" + <&> ( "objects/pack") - rv <- callRpcWaitMay @RpcRefLogGet (TimeoutSec 1) refLogAPI reflog - >>= orThrowUser "rpc timeout" - >>= orThrowUser "reflog is empty" - <&> coerce @_ @HashRef + state <- getStatePathM - notice $ "test:git:import" <+> pretty (AsBase58 reflog) <+> pretty rv + let imported = state "imported" - sto <- getStorage - none + prev <- runMaybeT do + f <- liftIO (try @_ @IOError (readFile imported)) >>= toMPlus + toMPlus (fromStringMay @HashRef f) + + excl <- maybe1 prev (pure mempty) $ \p -> do + txListAll (Just p) <&> HS.fromList . fmap fst + + rv <- refLogRef + + hxs <- txListAll rv <&> filter (not . flip HS.member excl . fst) + + forConcurrently_ hxs $ \case + (_, TxCheckpoint{}) -> none + (h, TxSegment tree) -> do + s <- writeAsGitPack packs tree + + for_ s $ \file -> do + gitRunCommand [qc|git index-pack {file}|] + >>= orThrowPassIO + + notice $ "imported" <+> pretty h + + for_ rv $ \r -> do + liftIO $ UIO.withBinaryFileAtomic imported WriteMode $ \fh -> do + IO.hPutStr fh (show $ pretty r) exportEntries "reflog:" diff --git a/hbs2-git3/hbs2-git3.cabal b/hbs2-git3/hbs2-git3.cabal index b42dd5cb..7819d4f9 100644 --- a/hbs2-git3/hbs2-git3.cabal +++ b/hbs2-git3/hbs2-git3.cabal @@ -124,8 +124,11 @@ library HBS2.Git3.Types HBS2.Git3.Prelude HBS2.Git3.Export + HBS2.Git3.Import HBS2.Git3.State.Types + HBS2.Git3.State.RefLog HBS2.Git3.State.Index + HBS2.Git3.State.Segment HBS2.Git3.Config.Local HBS2.Git3.Git HBS2.Git3.Git.Pack diff --git a/hbs2-git3/lib/HBS2/Git3/Git/Pack.hs b/hbs2-git3/lib/HBS2/Git3/Git/Pack.hs index f6efe678..ad97cacc 100644 --- a/hbs2-git3/lib/HBS2/Git3/Git/Pack.hs +++ b/hbs2-git3/lib/HBS2/Git3/Git/Pack.hs @@ -84,3 +84,5 @@ decodeObjectSize source = run $ flip fix (source,0,0,0) $ \next (bs, i, tp, num) + + diff --git a/hbs2-git3/lib/HBS2/Git3/Import.hs b/hbs2-git3/lib/HBS2/Git3/Import.hs new file mode 100644 index 00000000..cb4a00b2 --- /dev/null +++ b/hbs2-git3/lib/HBS2/Git3/Import.hs @@ -0,0 +1,103 @@ +{-# Language UndecidableInstances #-} +{-# Language AllowAmbiguousTypes #-} +module HBS2.Git3.Import where + +import HBS2.Git3.Prelude +import HBS2.Git3.State.Index +import HBS2.Git3.Git +import HBS2.Git3.Git.Pack +import HBS2.CLI.Run.Internal.Merkle (getTreeContents) +import HBS2.Git3.State.Segment + +import HBS2.Data.Log.Structured + +import HBS2.System.Dir + +import Codec.Compression.Zlib qualified as Zlib +import Data.ByteString.Lazy.Char8 qualified as LBS8 +import Data.ByteString.Lazy qualified as LBS +import Data.ByteString qualified as BS +import Data.HashSet (HashSet) +import Data.HashSet qualified as HS +import System.IO.Temp as Temp +import UnliftIO.IO.File qualified as UIO +import Network.ByteOrder qualified as N + +data ImportException = + ImportInvalidSegment HashRef + deriving stock (Show,Typeable) + +instance Exception ImportException + +writeAsGitPack :: forall m . (HBS2GitPerks m, HasStorage m) + => FilePath + -> HashRef + -> m (Maybe FilePath) + +writeAsGitPack dir href = do + + sto <- getStorage + + file <- liftIO $ Temp.emptyTempFile dir (show (pretty href) <> ".pack") + + no_ <- newTVarIO 0 + + liftIO $ UIO.withBinaryFileAtomic file ReadWriteMode $ \fh -> flip runContT pure do + + let header = BS.concat [ "PACK", N.bytestring32 2, N.bytestring32 0 ] + + liftIO $ BS.hPutStr fh header + + seen_ <- newTVarIO (mempty :: HashSet GitHash) + + source <- liftIO (runExceptT (getTreeContents sto href)) + >>= orThrow MissedBlockError + + lbs' <- decompressSegmentLBS source + + lbs <- ContT $ maybe1 lbs' none + + runConsumeLBS lbs $ readLogFileLBS () $ \h s obs -> do + seen <- readTVarIO seen_ <&> HS.member h + unless seen do + + let (t, body) = LBS.splitAt 1 obs + + let tp = fromStringMay @(Short GitObjectType) (LBS8.unpack t) + & maybe Blob coerce + + let params = Zlib.defaultCompressParams { Zlib.compressMethod = Zlib.deflateMethod } + + let packed = Zlib.compressWith params body + + let preamble = encodeObjectSize (gitPackTypeOf tp) (fromIntegral $ LBS.length body) + + liftIO do + atomically $ modifyTVar seen_ (HS.insert h) + BS.hPutStr fh preamble + LBS.hPutStr fh packed + + atomically $ modifyTVar no_ succ + + no <- readTVarIO no_ + hSeek fh AbsoluteSeek 8 + liftIO $ BS.hPutStr fh (N.bytestring32 no) + hFlush fh + + sz <- hFileSize fh + hSeek fh AbsoluteSeek 0 + + sha <- liftIO $ LBS.hGetNonBlocking fh (fromIntegral sz) <&> sha1lazy + + hSeek fh SeekFromEnd 0 + + liftIO $ BS.hPutStr fh sha + + no <- readTVarIO no_ + + if no > 0 then do + pure $ Just file + else do + rm file + pure Nothing + diff --git a/hbs2-git3/lib/HBS2/Git3/Prelude.hs b/hbs2-git3/lib/HBS2/Git3/Prelude.hs index 7db9ca6c..fa5cc778 100644 --- a/hbs2-git3/lib/HBS2/Git3/Prelude.hs +++ b/hbs2-git3/lib/HBS2/Git3/Prelude.hs @@ -53,6 +53,11 @@ import System.IO.MMap as Exported import GHC.Natural as Exported import UnliftIO as Exported +data RefLogNotSetException = + RefLogNotSetException + deriving stock (Show,Typeable) + +instance Exception RefLogNotSetException defSegmentSize :: Int defSegmentSize = 50 * 1024 * 1024 @@ -210,3 +215,8 @@ runGit3 :: Git3Perks m => Git3Env -> Git3 m b -> m b runGit3 env action = withGit3Env env action +getStatePathM :: forall m . (HBS2GitPerks m, HasGitRemoteKey m) => m FilePath +getStatePathM = do + k <- getGitRemoteKey >>= orThrow RefLogNotSetException + getStatePath (AsBase58 k) + diff --git a/hbs2-git3/lib/HBS2/Git3/State/Index.hs b/hbs2-git3/lib/HBS2/Git3/State/Index.hs index 2d90f81f..14bb0271 100644 --- a/hbs2-git3/lib/HBS2/Git3/State/Index.hs +++ b/hbs2-git3/lib/HBS2/Git3/State/Index.hs @@ -4,6 +4,8 @@ import HBS2.Git3.Prelude import HBS2.System.Dir import HBS2.CLI.Run.Internal.Merkle (getTreeContents) import HBS2.Git3.State.Types +import HBS2.Git3.State.Segment +import HBS2.Git3.State.RefLog import HBS2.Git3.Git import HBS2.Data.Log.Structured @@ -268,33 +270,6 @@ bloomFilterSize n k p where rnd x = 2 ** realToFrac (ceiling (logBase 2 x)) & round -data GitTx = - TxSegment HashRef - | TxCheckpoint Natural HashRef - -readTxMay :: forall m . ( MonadIO m - ) - => AnyStorage -> HashRef -> m (Maybe GitTx) - -readTxMay sto href = runMaybeT do - - tx <- getBlock sto (coerce href) - >>= toMPlus - - RefLogUpdate{..} <- deserialiseOrFail @(RefLogUpdate L4Proto) tx - & toMPlus - - toMPlus $ - ( deserialiseOrFail (LBS.fromStrict _refLogUpdData) & either (const Nothing) fromAnn ) - <|> - ( deserialiseOrFail (LBS.fromStrict _refLogUpdData) & either (const Nothing) fromSeq ) - - where - fromAnn = \case - AnnotatedHashRef _ h -> Just (TxSegment h) - - fromSeq = \case - (SequentialRef n (AnnotatedHashRef _ h)) -> Just $ TxCheckpoint (fromIntegral n) h updateReflogIndex :: forall m . ( Git3Perks m , MonadReader Git3Env m @@ -344,34 +319,13 @@ updateReflogIndex = do Nothing -> mzero Just (TxCheckpoint{}) -> mzero Just (TxSegment href) -> do + -- FIXME: error logging - chunks <- liftIO (runExceptT (getTreeContents sto href)) + source <- liftIO (runExceptT (getTreeContents sto href)) >>= orThrow MissedBlockError - <&> LBS.toChunks - what <- toMPlus =<< liftIO do - init <- decompress - flip fix (init, chunks, mempty :: LBS.ByteString) $ \next -> \case - - (Consume work, [], o) -> do - r1 <- work "" - next (r1, [], o) - - (Consume work, e:es, o) -> do - r1 <- work e - next (r1, es, o) - - (Produce piece r, e, o) -> do - r1 <- r - next (r1, e, LBS.append o (LBS.fromStrict piece)) - - (ZStdS.Done bs, _, o) -> pure (Just (LBS.append o (LBS.fromStrict bs))) - - (Error _ _, _, _) -> do - debug $ "not a valid segment" <+> pretty h - pure Nothing - - guard (LBS.length what > 0) + what <- decompressSegmentLBS source + >>= toMPlus pieces <- S.toList_ $ do void $ runConsumeLBS what $ readLogFileLBS () $ \o _ lbs -> do diff --git a/hbs2-git3/lib/HBS2/Git3/State/RefLog.hs b/hbs2-git3/lib/HBS2/Git3/State/RefLog.hs new file mode 100644 index 00000000..5009c350 --- /dev/null +++ b/hbs2-git3/lib/HBS2/Git3/State/RefLog.hs @@ -0,0 +1,82 @@ +module HBS2.Git3.State.RefLog where + +import HBS2.Git3.Prelude + +import Control.Applicative +import Data.ByteString.Lazy qualified as LBS +import Data.Maybe +import Streaming.Prelude qualified as S + +data GitTx = + TxSegment HashRef + | TxCheckpoint Natural HashRef + +data RefLogException = + RefLogRPCException + deriving stock (Show, Typeable) + +instance Exception RefLogException + +readTxMay :: forall m . ( MonadIO m + ) + => AnyStorage -> HashRef -> m (Maybe GitTx) + +readTxMay sto href = runMaybeT do + + tx <- getBlock sto (coerce href) + >>= toMPlus + + RefLogUpdate{..} <- deserialiseOrFail @(RefLogUpdate L4Proto) tx + & toMPlus + + toMPlus $ + ( deserialiseOrFail (LBS.fromStrict _refLogUpdData) & either (const Nothing) fromAnn ) + <|> + ( deserialiseOrFail (LBS.fromStrict _refLogUpdData) & either (const Nothing) fromSeq ) + + where + fromAnn = \case + AnnotatedHashRef _ h -> Just (TxSegment h) + + fromSeq = \case + (SequentialRef n (AnnotatedHashRef _ h)) -> Just $ TxCheckpoint (fromIntegral n) h + +refLogRef :: forall m . ( HBS2GitPerks m + , HasStorage m + , HasClientAPI RefLogAPI UNIX m + , HasGitRemoteKey m + ) + => m (Maybe HashRef) + +refLogRef = do + refLogAPI <- getClientAPI @RefLogAPI @UNIX + reflog <- getGitRemoteKey >>= orThrow RefLogNotSetException + + callRpcWaitMay @RpcRefLogGet (TimeoutSec 1) refLogAPI reflog + >>= orThrow RefLogNotSetException + +txListAll :: forall m . ( HBS2GitPerks m + , HasStorage m + , HasClientAPI RefLogAPI UNIX m + , HasGitRemoteKey m + ) + => Maybe HashRef + -> m [(HashRef, GitTx)] + +txListAll mhref = do + sto <- getStorage + + fromMaybe mempty <$> runMaybeT do + + rv <- case mhref of + Just x -> pure x + Nothing -> lift refLogRef >>= toMPlus + + hxs <- S.toList_ $ walkMerkle @[HashRef] (coerce rv) (getBlock sto) $ \case + Left{} -> throwIO MissedBlockError + Right hs -> S.each hs + + S.toList_ $ for_ hxs $ \h -> do + tx <- liftIO (readTxMay sto h) + maybe none (S.yield . (h,)) tx + diff --git a/hbs2-git3/lib/HBS2/Git3/State/Segment.hs b/hbs2-git3/lib/HBS2/Git3/State/Segment.hs new file mode 100644 index 00000000..2be18c63 --- /dev/null +++ b/hbs2-git3/lib/HBS2/Git3/State/Segment.hs @@ -0,0 +1,32 @@ +module HBS2.Git3.State.Segment where + +import HBS2.Git3.Prelude +import Data.ByteString.Lazy ( ByteString ) +import Data.ByteString.Lazy qualified as LBS + +import Codec.Compression.Zstd.Streaming as ZStdS + +decompressSegmentLBS :: MonadIO m => ByteString -> m (Maybe ByteString) +decompressSegmentLBS source = runMaybeT do + let chunks = LBS.toChunks source + toMPlus =<< liftIO do + init <- decompress + flip fix (init, chunks, mempty :: LBS.ByteString) $ \next -> \case + + (Consume work, [], o) -> do + r1 <- work "" + next (r1, [], o) + + (Consume work, e:es, o) -> do + r1 <- work e + next (r1, es, o) + + (Produce piece r, e, o) -> do + r1 <- r + next (r1, e, LBS.append o (LBS.fromStrict piece)) + + (ZStdS.Done bs, _, o) -> pure (Just (LBS.append o (LBS.fromStrict bs))) + + (Error _ _, _, _) -> do + pure Nothing + diff --git a/hbs2-git3/lib/HBS2/Git3/State/Types.hs b/hbs2-git3/lib/HBS2/Git3/State/Types.hs index e238cff3..75708231 100644 --- a/hbs2-git3/lib/HBS2/Git3/State/Types.hs +++ b/hbs2-git3/lib/HBS2/Git3/State/Types.hs @@ -26,3 +26,4 @@ getStatePath p = do d <- getConfigPath pure $ d show (pretty p) +