From ed6a484f50a7cf5864f6940b7f54a418ef82ee5c Mon Sep 17 00:00:00 2001 From: Dmitry Zuikov Date: Wed, 24 Jan 2024 07:11:28 +0300 Subject: [PATCH] block fetch progress notification --- hbs2-peer/app/PeerMain.hs | 59 +++++++++++- hbs2-peer/hbs2-peer.cabal | 2 +- hbs2-tests/hbs2-tests.cabal | 37 ++++++++ hbs2-tests/repo-export/RepoExportMain.hs | 116 +++++++++++++++++++++++ 4 files changed, 211 insertions(+), 3 deletions(-) create mode 100644 hbs2-tests/repo-export/RepoExportMain.hs diff --git a/hbs2-peer/app/PeerMain.hs b/hbs2-peer/app/PeerMain.hs index b351f995..671d1a8e 100644 --- a/hbs2-peer/app/PeerMain.hs +++ b/hbs2-peer/app/PeerMain.hs @@ -35,6 +35,7 @@ import HBS2.Net.Proto.Service import HBS2.Net.Proto.Notify (NotifyProto) import HBS2.OrDie import HBS2.Storage.Simple +import HBS2.Storage.Operations.Missed import HBS2.Data.Detect import HBS2.System.Logger.Simple hiding (info) @@ -71,6 +72,7 @@ import HBS2.Peer.RPC.API.Peer import HBS2.Peer.RPC.API.RefLog import HBS2.Peer.RPC.API.RefChan import HBS2.Peer.Notify +import HBS2.Peer.RPC.Client.StorageClient import RPC2(RPC2Context(..)) @@ -85,6 +87,7 @@ import Crypto.Saltine (sodiumInit) import Data.ByteString.Lazy qualified as LBS import Data.ByteString qualified as BS import Data.Cache qualified as Cache +import Data.Fixed import Data.List qualified as L import Data.Map (Map) import Data.Map qualified as Map @@ -104,6 +107,7 @@ import System.IO import System.Mem import System.Metrics import System.Posix.Process +import Control.Monad.Trans.Cont import UnliftIO.Exception qualified as U -- import UnliftIO.STM @@ -112,6 +116,8 @@ import UnliftIO.Async import Control.Monad.Trans.Resource import Streaming.Prelude qualified as S +import Graphics.Vty qualified as Vty + data GoAgainException = GoAgainException deriving (Eq,Ord,Show,Typeable) @@ -303,9 +309,58 @@ runCLI = do pFetch = do rpc <- pRpcCommon + pro <- optional (switch (short 'p' <> long "progress" <> help "display progress")) + <&> fromMaybe False h <- strArgument ( metavar "HASH" ) - pure $ withMyRPC @PeerAPI rpc $ \caller -> do - void $ callService @RpcFetch caller h + + pure $ flip runContT pure do + + client <- ContT $ withRPCMessaging rpc + + self <- runReaderT (ownPeer @UNIX) client + -- refChanAPI <- makeServiceCaller @RefChanAPI self + peerAPI <- makeServiceCaller @PeerAPI self + storageAPI <- makeServiceCaller @StorageAPI self + let sto = AnyStorage (StorageClient storageAPI) + + let endpoints = [ Endpoint @UNIX peerAPI + , Endpoint @UNIX storageAPI + ] + + void $ ContT $ bracket (async $ runReaderT (runServiceClientMulti endpoints) client) cancel + + t0 <- getTimeCoarse + + void $ callService @RpcFetch peerAPI h + + liftIO do + when pro $ flip runContT pure do + cfg <- liftIO $ Vty.standardIOConfig + vty <- ContT $ bracket (Vty.mkVty cfg) Vty.shutdown + + fix \next -> do + miss <- findMissedBlocks sto h + + let l = length miss + t1 <- getTimeCoarse + let elapsed = toNanoSeconds (TimeoutTS (t1 - t0)) + & realToFrac @_ @Double + & (/1e9) + & realToFrac @_ @(Fixed E3) + & showFixed True + + let msg = show $ + "fetch tree:" <+> pretty h <> line + <> "blocks left:" <+> pretty l <> line + <> "time elapsed:" <+> pretty elapsed + + let pic = Vty.picForImage $ Vty.string Vty.defAttr msg + liftIO $ Vty.update vty pic + + unless (l == 0) do + pause @'Seconds 2 + next + pPing = do rpc <- pRpcCommon diff --git a/hbs2-peer/hbs2-peer.cabal b/hbs2-peer/hbs2-peer.cabal index 9522158d..f3b6cfde 100644 --- a/hbs2-peer/hbs2-peer.cabal +++ b/hbs2-peer/hbs2-peer.cabal @@ -198,7 +198,7 @@ executable hbs2-peer , CLI.RefChan -- other-extensions: - build-depends: base, hbs2-peer, hbs2-keyman + build-depends: base, hbs2-peer, hbs2-keyman, vty hs-source-dirs: app diff --git a/hbs2-tests/hbs2-tests.cabal b/hbs2-tests/hbs2-tests.cabal index 4f3c6776..2bf68124 100644 --- a/hbs2-tests/hbs2-tests.cabal +++ b/hbs2-tests/hbs2-tests.cabal @@ -946,3 +946,40 @@ executable test-playground , unordered-containers , resourcet + +executable test-repo-export + import: shared-properties + default-language: Haskell2010 + + -- other-extensions: + + hs-source-dirs: repo-export + main-is: RepoExportMain.hs + build-depends: + base, hbs2-core, hbs2-peer, hbs2-git + , async + , bytestring + , cache + , containers + , directory + , exceptions + , hashable + , microlens-platform + , mtl + , prettyprinter + , random + , safe + , serialise + , stm + , streaming + , transformers + , uniplate + , vector + , simple-logger + , string-conversions + , filepath + , temporary + , unliftio + , unordered-containers + + diff --git a/hbs2-tests/repo-export/RepoExportMain.hs b/hbs2-tests/repo-export/RepoExportMain.hs new file mode 100644 index 00000000..c662deab --- /dev/null +++ b/hbs2-tests/repo-export/RepoExportMain.hs @@ -0,0 +1,116 @@ +module Main where + +import HBS2.Prelude.Plated +import HBS2.Data.Types.Refs +import HBS2.Merkle +import HBS2.Clock +import HBS2.OrDie +import HBS2Git.Config +import HBS2.Git.Local.CLI +import HBS2Git.App (detectRPC) +import HBS2.Peer.RPC.Client.Unix +import HBS2.Peer.RPC.API.Peer +import HBS2.Peer.RPC.API.Storage +import HBS2.Peer.RPC.API.RefLog +import HBS2.Peer.RPC.Client.StorageClient +import HBS2.System.Logger.Simple +import HBS2.Storage.Operations.ByteString +import HBS2.Storage +import HBS2.Git.Types + +import UnliftIO +import Control.Monad.Reader +import Control.Monad.Catch +import Control.Monad.Trans.Cont +import Control.Monad.Trans.Maybe +import Data.ByteString.Lazy qualified as LBS +import Data.ByteString.Lazy (ByteString) +import Codec.Serialise +import Data.Maybe +import Data.HashSet qualified as HS + +data RPCEndpoints = + RPCEndpoints + { rpcPeer :: ServiceCaller PeerAPI UNIX + , rpcStorage :: ServiceCaller StorageAPI UNIX + , rpcRefLog :: ServiceCaller RefLogAPI UNIX + } + +runWithRPC :: forall m . (MonadUnliftIO m, MonadThrow m) => (RPCEndpoints -> m ()) -> m () +runWithRPC action = do + + let soname' = Just "/tmp/hbs2-rpc.socket" + + soname <- race ( pause @'Seconds 1) (maybe (detectRPC True) pure soname') `orDie` "hbs2-peer rpc timeout!" + + client <- race ( pause @'Seconds 1) (newMessagingUnix False 1.0 soname) `orDie` "hbs2-peer rpc timeout!" + + rpc <- RPCEndpoints <$> makeServiceCaller (fromString soname) + <*> makeServiceCaller (fromString soname) + <*> makeServiceCaller (fromString soname) + + messaging <- async $ runMessagingUnix client + link messaging + + let endpoints = [ Endpoint @UNIX (rpcPeer rpc) + , Endpoint @UNIX (rpcStorage rpc) + , Endpoint @UNIX (rpcRefLog rpc) + ] + + c1 <- async $ liftIO $ runReaderT (runServiceClientMulti endpoints) client + link c1 + + test <- race ( pause @'Seconds 1) (callService @RpcPoke (rpcPeer rpc) ()) `orDie` "hbs2-peer rpc timeout!" + + void $ pure test `orDie` "hbs2-peer rpc error!" + + debug $ "hbs2-peer RPC ok" <+> pretty soname + + action rpc + + cancel messaging + + void $ waitAnyCatchCancel [messaging, c1] + + +main :: IO () +main = do + dir <- findGitDir "." >>= orThrowUser "not a git dir" + + flip runContT pure do + + o <- gitListAllObjects + + ep <- ContT runWithRPC + + let sto = StorageClient (rpcStorage ep) + + cat <- startGitCatFile + + -- h <- gitGetHash "HEAD" >>= orThrowUser "wtf1" + -- rvl <- gitRevList Nothing h + + liftIO do + allShit' <- for o $ \r@(o,h) -> runMaybeT do + GitObject t lbs <- toMPlus =<< gitReadFromCatFileBatch cat h + liftIO $ print $ pretty (t, h) + ght <- writeAsMerkle sto lbs + + tt <- getBlock sto ght + >>= toMPlus + >>= orThrowUser "FUCK" . (deserialiseOrFail @(MTree [HashRef])) + + let txt = fromString (show $ pretty t) + let ann = MTreeAnn (ShortMetadata txt) NullEncryption tt + putBlock sto (serialise ann) >>= toMPlus + + let pt = HS.fromList (HashRef <$> catMaybes allShit') + & HS.toList + & toPTree (MaxSize 256) (MaxNum 256) + + ht <- makeMerkle 0 pt $ \(_,_,bss) -> do + void $ putBlock sto bss + + print $ pretty (HashRef ht) + +