block fetch progress notification

This commit is contained in:
Dmitry Zuikov 2024-01-24 07:11:28 +03:00
parent c0482fb1eb
commit ed6a484f50
4 changed files with 211 additions and 3 deletions

View File

@ -35,6 +35,7 @@ import HBS2.Net.Proto.Service
import HBS2.Net.Proto.Notify (NotifyProto)
import HBS2.OrDie
import HBS2.Storage.Simple
import HBS2.Storage.Operations.Missed
import HBS2.Data.Detect
import HBS2.System.Logger.Simple hiding (info)
@ -71,6 +72,7 @@ import HBS2.Peer.RPC.API.Peer
import HBS2.Peer.RPC.API.RefLog
import HBS2.Peer.RPC.API.RefChan
import HBS2.Peer.Notify
import HBS2.Peer.RPC.Client.StorageClient
import RPC2(RPC2Context(..))
@ -85,6 +87,7 @@ import Crypto.Saltine (sodiumInit)
import Data.ByteString.Lazy qualified as LBS
import Data.ByteString qualified as BS
import Data.Cache qualified as Cache
import Data.Fixed
import Data.List qualified as L
import Data.Map (Map)
import Data.Map qualified as Map
@ -104,6 +107,7 @@ import System.IO
import System.Mem
import System.Metrics
import System.Posix.Process
import Control.Monad.Trans.Cont
import UnliftIO.Exception qualified as U
-- import UnliftIO.STM
@ -112,6 +116,8 @@ import UnliftIO.Async
import Control.Monad.Trans.Resource
import Streaming.Prelude qualified as S
import Graphics.Vty qualified as Vty
data GoAgainException = GoAgainException
deriving (Eq,Ord,Show,Typeable)
@ -303,9 +309,58 @@ runCLI = do
pFetch = do
rpc <- pRpcCommon
pro <- optional (switch (short 'p' <> long "progress" <> help "display progress"))
<&> fromMaybe False
h <- strArgument ( metavar "HASH" )
pure $ withMyRPC @PeerAPI rpc $ \caller -> do
void $ callService @RpcFetch caller h
pure $ flip runContT pure do
client <- ContT $ withRPCMessaging rpc
self <- runReaderT (ownPeer @UNIX) client
-- refChanAPI <- makeServiceCaller @RefChanAPI self
peerAPI <- makeServiceCaller @PeerAPI self
storageAPI <- makeServiceCaller @StorageAPI self
let sto = AnyStorage (StorageClient storageAPI)
let endpoints = [ Endpoint @UNIX peerAPI
, Endpoint @UNIX storageAPI
]
void $ ContT $ bracket (async $ runReaderT (runServiceClientMulti endpoints) client) cancel
t0 <- getTimeCoarse
void $ callService @RpcFetch peerAPI h
liftIO do
when pro $ flip runContT pure do
cfg <- liftIO $ Vty.standardIOConfig
vty <- ContT $ bracket (Vty.mkVty cfg) Vty.shutdown
fix \next -> do
miss <- findMissedBlocks sto h
let l = length miss
t1 <- getTimeCoarse
let elapsed = toNanoSeconds (TimeoutTS (t1 - t0))
& realToFrac @_ @Double
& (/1e9)
& realToFrac @_ @(Fixed E3)
& showFixed True
let msg = show $
"fetch tree:" <+> pretty h <> line
<> "blocks left:" <+> pretty l <> line
<> "time elapsed:" <+> pretty elapsed
let pic = Vty.picForImage $ Vty.string Vty.defAttr msg
liftIO $ Vty.update vty pic
unless (l == 0) do
pause @'Seconds 2
next
pPing = do
rpc <- pRpcCommon

View File

@ -198,7 +198,7 @@ executable hbs2-peer
, CLI.RefChan
-- other-extensions:
build-depends: base, hbs2-peer, hbs2-keyman
build-depends: base, hbs2-peer, hbs2-keyman, vty
hs-source-dirs: app

View File

@ -946,3 +946,40 @@ executable test-playground
, unordered-containers
, resourcet
executable test-repo-export
import: shared-properties
default-language: Haskell2010
-- other-extensions:
hs-source-dirs: repo-export
main-is: RepoExportMain.hs
build-depends:
base, hbs2-core, hbs2-peer, hbs2-git
, async
, bytestring
, cache
, containers
, directory
, exceptions
, hashable
, microlens-platform
, mtl
, prettyprinter
, random
, safe
, serialise
, stm
, streaming
, transformers
, uniplate
, vector
, simple-logger
, string-conversions
, filepath
, temporary
, unliftio
, unordered-containers

View File

@ -0,0 +1,116 @@
module Main where
import HBS2.Prelude.Plated
import HBS2.Data.Types.Refs
import HBS2.Merkle
import HBS2.Clock
import HBS2.OrDie
import HBS2Git.Config
import HBS2.Git.Local.CLI
import HBS2Git.App (detectRPC)
import HBS2.Peer.RPC.Client.Unix
import HBS2.Peer.RPC.API.Peer
import HBS2.Peer.RPC.API.Storage
import HBS2.Peer.RPC.API.RefLog
import HBS2.Peer.RPC.Client.StorageClient
import HBS2.System.Logger.Simple
import HBS2.Storage.Operations.ByteString
import HBS2.Storage
import HBS2.Git.Types
import UnliftIO
import Control.Monad.Reader
import Control.Monad.Catch
import Control.Monad.Trans.Cont
import Control.Monad.Trans.Maybe
import Data.ByteString.Lazy qualified as LBS
import Data.ByteString.Lazy (ByteString)
import Codec.Serialise
import Data.Maybe
import Data.HashSet qualified as HS
data RPCEndpoints =
RPCEndpoints
{ rpcPeer :: ServiceCaller PeerAPI UNIX
, rpcStorage :: ServiceCaller StorageAPI UNIX
, rpcRefLog :: ServiceCaller RefLogAPI UNIX
}
runWithRPC :: forall m . (MonadUnliftIO m, MonadThrow m) => (RPCEndpoints -> m ()) -> m ()
runWithRPC action = do
let soname' = Just "/tmp/hbs2-rpc.socket"
soname <- race ( pause @'Seconds 1) (maybe (detectRPC True) pure soname') `orDie` "hbs2-peer rpc timeout!"
client <- race ( pause @'Seconds 1) (newMessagingUnix False 1.0 soname) `orDie` "hbs2-peer rpc timeout!"
rpc <- RPCEndpoints <$> makeServiceCaller (fromString soname)
<*> makeServiceCaller (fromString soname)
<*> makeServiceCaller (fromString soname)
messaging <- async $ runMessagingUnix client
link messaging
let endpoints = [ Endpoint @UNIX (rpcPeer rpc)
, Endpoint @UNIX (rpcStorage rpc)
, Endpoint @UNIX (rpcRefLog rpc)
]
c1 <- async $ liftIO $ runReaderT (runServiceClientMulti endpoints) client
link c1
test <- race ( pause @'Seconds 1) (callService @RpcPoke (rpcPeer rpc) ()) `orDie` "hbs2-peer rpc timeout!"
void $ pure test `orDie` "hbs2-peer rpc error!"
debug $ "hbs2-peer RPC ok" <+> pretty soname
action rpc
cancel messaging
void $ waitAnyCatchCancel [messaging, c1]
main :: IO ()
main = do
dir <- findGitDir "." >>= orThrowUser "not a git dir"
flip runContT pure do
o <- gitListAllObjects
ep <- ContT runWithRPC
let sto = StorageClient (rpcStorage ep)
cat <- startGitCatFile
-- h <- gitGetHash "HEAD" >>= orThrowUser "wtf1"
-- rvl <- gitRevList Nothing h
liftIO do
allShit' <- for o $ \r@(o,h) -> runMaybeT do
GitObject t lbs <- toMPlus =<< gitReadFromCatFileBatch cat h
liftIO $ print $ pretty (t, h)
ght <- writeAsMerkle sto lbs
tt <- getBlock sto ght
>>= toMPlus
>>= orThrowUser "FUCK" . (deserialiseOrFail @(MTree [HashRef]))
let txt = fromString (show $ pretty t)
let ann = MTreeAnn (ShortMetadata txt) NullEncryption tt
putBlock sto (serialise ann) >>= toMPlus
let pt = HS.fromList (HashRef <$> catMaybes allShit')
& HS.toList
& toPTree (MaxSize 256) (MaxNum 256)
ht <- makeMerkle 0 pt $ \(_,_,bss) -> do
void $ putBlock sto bss
print $ pretty (HashRef ht)