From c6b90878c3485b98220f7d87ad14e047ab7dcf2a Mon Sep 17 00:00:00 2001 From: Dmitry Zuikov Date: Sat, 16 Mar 2024 08:54:38 +0300 Subject: [PATCH] attempt to extend pipeline for rpc --- Makefile | 18 +-- hbs2-core/lib/HBS2/Actors/Peer.hs | 2 +- hbs2-peer/app/PeerMain.hs | 4 +- hbs21-git/git-hbs2-subscribe/Main.hs | 111 ++++++++++++++++++ hbs21-git/git-hbs21/Main.hs | 10 +- .../HBS2/Git/Client/App/Types.hs | 27 ++++- .../HBS2/Git/Client/App/Types/GitEnv.hs | 14 --- .../HBS2/Git/Client/Import.hs | 50 +++----- .../HBS2/Git/Client/RefLog.hs | 11 +- 9 files changed, 171 insertions(+), 76 deletions(-) create mode 100644 hbs21-git/git-hbs2-subscribe/Main.hs diff --git a/Makefile b/Makefile index eb1f40f6..d9edcf6f 100644 --- a/Makefile +++ b/Makefile @@ -8,15 +8,15 @@ MAKEFLAGS += --no-builtin-rules GHC_VERSION := 9.4.8 BIN_DIR := ./bin BINS := \ - hbs2 \ - hbs2-peer \ - hbs2-reposync \ - hbs2-keyman \ - hbs2-git-reposync \ - git-remote-hbs2 \ - git-hbs2 \ - git-remote-hbs21 \ - git-hbs21 \ + hbs2 \ + hbs2-peer \ + hbs2-keyman \ + hbs2-git-reposync \ + hbs2-git-subscribe \ + git-remote-hbs2 \ + git-hbs2 \ + git-remote-hbs21 \ + git-hbs21 \ ifeq ($(origin .RECIPEPREFIX), undefined) $(error This Make does not support .RECIPEPREFIX. Please use GNU Make 4.0 or later) diff --git a/hbs2-core/lib/HBS2/Actors/Peer.hs b/hbs2-core/lib/HBS2/Actors/Peer.hs index 58e55bc2..2dce5d61 100644 --- a/hbs2-core/lib/HBS2/Actors/Peer.hs +++ b/hbs2-core/lib/HBS2/Actors/Peer.hs @@ -405,7 +405,7 @@ runPeerM :: forall e m . ( MonadIO m runPeerM env f = do let de = view envDeferred env - as <- liftIO $ replicateM 8 $ async $ runPipeline de + as <- liftIO $ replicateM 32 $ async $ runPipeline de sw <- liftIO $ async $ forever $ withPeerM env $ do pause defSweepTimeout diff --git a/hbs2-peer/app/PeerMain.hs b/hbs2-peer/app/PeerMain.hs index 3c321d92..01e8fe5b 100644 --- a/hbs2-peer/app/PeerMain.hs +++ b/hbs2-peer/app/PeerMain.hs @@ -1163,7 +1163,7 @@ runPeer opts = Exception.handle (\e -> myException e envrl <- newNotifyEnvServer @(RefLogEvents L4Proto) refLogNotifySource w1 <- asyncLinked $ runNotifyWorkerServer env w2 <- asyncLinked $ runNotifyWorkerServer envrl - runProto @UNIX + wws <- replicateM 1 $ async $ runProto @UNIX [ makeResponse (makeServer @PeerAPI) , makeResponse (makeServer @RefLogAPI) , makeResponse (makeServer @RefChanAPI) @@ -1172,7 +1172,7 @@ runPeer opts = Exception.handle (\e -> myException e , makeResponse (makeNotifyServer @(RefChanEvents L4Proto) env) , makeResponse (makeNotifyServer @(RefLogEvents L4Proto) envrl) ] - mapM_ wait [w1,w2] + mapM_ wait (w1 : w2 : wws ) void $ waitAnyCancel $ w <> [ loop , m1 diff --git a/hbs21-git/git-hbs2-subscribe/Main.hs b/hbs21-git/git-hbs2-subscribe/Main.hs new file mode 100644 index 00000000..d65fdaa5 --- /dev/null +++ b/hbs21-git/git-hbs2-subscribe/Main.hs @@ -0,0 +1,111 @@ +{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE UndecidableInstances #-} +module Main where + +import HBS2.Git.Client.Prelude hiding (info) +import HBS2.Git.Client.App hiding (_progress, _storage, _peerAPI, _lwwAPI, _refLogAPI) +import HBS2.Git.Client.Progress +import HBS2.Git.Client.Import +import HBS2.Git.Client.RefLog +import HBS2.Peer.CLI.Detect + +import Options.Applicative +import Data.Semigroup ((<>)) + +main :: IO () +main = do + let parser = subscribe + <$> optional (strOption + ( long "socket" + <> short 's' + <> metavar "SOCKET" + <> help "Socket file path")) + <*> argument pLww (metavar "LWWREF") + join $ execParser (info (parser <**> helper) + ( fullDesc + <> progDesc "Parse command line arguments" + <> header "Command line arguments parsing example")) + + + where + pLww :: ReadM (LWWRefKey HBS2Basic) + pLww = maybeReader fromStringMay + + +data MyStuff = + MyStuff + { _peerAPI :: ServiceCaller PeerAPI UNIX + , _lwwAPI :: ServiceCaller LWWRefAPI UNIX + , _refLogAPI :: ServiceCaller RefLogAPI UNIX + , _storage :: AnyStorage + , _progress :: AnyProgress + } + +newtype MyApp m a = MyApp { fromMyApp :: ReaderT MyStuff m a } + deriving newtype ( Functor + , Applicative + , Monad + , MonadIO + , MonadUnliftIO + , MonadThrow + , MonadReader MyStuff + ) + +instance Monad m => HasProgressIndicator (MyApp m) where + getProgressIndicator = asks _progress + +instance Monad m => HasStorage (MyApp m) where + getStorage = asks _storage + +instance Monad m => HasAPI PeerAPI UNIX (MyApp m) where + getAPI = asks _peerAPI + +instance Monad m => HasAPI LWWRefAPI UNIX (MyApp m) where + getAPI = asks _lwwAPI + +instance Monad m => HasAPI RefLogAPI UNIX (MyApp m) where + getAPI = asks _refLogAPI + +subscribe :: forall m . MonadUnliftIO m => Maybe String -> LWWRefKey HBS2Basic -> m () +subscribe soname' ref = do + + soname <- maybe1 soname' detectRPC (pure.Just) `orDie` "can't locate rpc" + + flip runContT pure do + + client <- lift $ race (pause @'Seconds 1) (newMessagingUnix False 1.0 soname) + >>= orThrowUser ("can't connect to" <+> pretty soname) + + q <- lift newProgressQ + let ip = AnyProgress q + + void $ ContT $ withAsync $ runMessagingUnix client + void $ ContT $ withAsync $ drawProgress q + + peerAPI <- makeServiceCaller @PeerAPI (fromString soname) + refLogAPI <- makeServiceCaller @RefLogAPI (fromString soname) + storageAPI <- makeServiceCaller @StorageAPI (fromString soname) + lwwAPI <- makeServiceCaller @LWWRefAPI (fromString soname) + + let sto = AnyStorage (StorageClient storageAPI) + + let endpoints = [ Endpoint @UNIX peerAPI + , Endpoint @UNIX refLogAPI + , Endpoint @UNIX lwwAPI + , Endpoint @UNIX storageAPI + ] + + void $ ContT $ withAsync $ liftIO $ runReaderT (runServiceClientMulti endpoints) client + + let app = MyStuff peerAPI lwwAPI refLogAPI sto ip + + lift $ flip runReaderT app $ fromMyApp do + merelySubscribeRepo ref + + onProgress ip ImportAllDone + + hFlush stdout + hFlush stderr + + pure () + diff --git a/hbs21-git/git-hbs21/Main.hs b/hbs21-git/git-hbs21/Main.hs index 8d0d3eb5..53e67311 100644 --- a/hbs21-git/git-hbs21/Main.hs +++ b/hbs21-git/git-hbs21/Main.hs @@ -36,7 +36,7 @@ globalOptions = do commands :: GitPerks m => Parser (GitCLI m ()) commands = - hsubparser ( command "export" (info pExport (progDesc "export repo to hbs2-git")) + hsubparser ( command "export" (info pExport (progDesc "export repo to hbs2-git")) <> command "import" (info pImport (progDesc "import repo from reflog")) <> command "key" (info pKey (progDesc "key management")) <> command "tools" (info pTools (progDesc "misc tools")) @@ -96,14 +96,6 @@ pImport = do git <- Git.findGitDir >>= orThrowUser "not a git dir" importRepoWait puk -pSubscribe :: GitPerks m => Parser (GitCLI m ()) -pSubscribe = do - lww <- argument pLwwKey (metavar "LWWREF") - pure do - merelySubscribeRepo lww >>= liftIO . \case - Just x -> print $ "subscribed" <+> pretty x - Nothing -> exitFailure - pTools :: GitPerks m => Parser (GitCLI m ()) pTools = hsubparser ( command "dump-pack" (info pDumpPack (progDesc "dump hbs2 git pack")) <> command "show-ref" (info pShowRef (progDesc "show current references")) diff --git a/hbs21-git/hbs2-git-client-lib/HBS2/Git/Client/App/Types.hs b/hbs21-git/hbs2-git-client-lib/HBS2/Git/Client/App/Types.hs index 163ea65e..60dfa627 100644 --- a/hbs21-git/hbs2-git-client-lib/HBS2/Git/Client/App/Types.hs +++ b/hbs21-git/hbs2-git-client-lib/HBS2/Git/Client/App/Types.hs @@ -45,12 +45,37 @@ newtype GitCLI m a = GitCLI { fromGitCLI :: ReaderT GitEnv m a } , Monad , MonadIO , MonadUnliftIO + , MonadTrans , MonadReader GitEnv , MonadThrow ) -type GitPerks m = ( MonadUnliftIO m, MonadThrow m ) +-- type GitPerks m = ( MonadUnliftIO m, MonadThrow m ) +type GitPerks m = ( MonadUnliftIO m ) +instance Monad m => HasProgressIndicator (GitCLI m) where + getProgressIndicator = asks _progress + +instance Monad m => HasStorage (GitCLI m) where + getStorage = asks _storage + +instance Monad m => HasAPI PeerAPI UNIX (GitCLI m) where + getAPI = asks _peerAPI + +instance Monad m => HasAPI LWWRefAPI UNIX (GitCLI m) where + getAPI = asks _lwwRefAPI + +instance Monad m => HasAPI RefLogAPI UNIX (GitCLI m) where + getAPI = asks _refLogAPI + +instance MonadReader GitEnv m => HasAPI RefLogAPI UNIX (ExceptT e m) where + getAPI = asks _refLogAPI + +instance MonadReader GitEnv m => HasAPI LWWRefAPI UNIX (ExceptT e m) where + getAPI = asks _lwwRefAPI + +instance MonadReader GitEnv m => HasAPI PeerAPI UNIX (ExceptT e m) where + getAPI = asks _peerAPI newGitEnv :: GitPerks m => AnyProgress diff --git a/hbs21-git/hbs2-git-client-lib/HBS2/Git/Client/App/Types/GitEnv.hs b/hbs21-git/hbs2-git-client-lib/HBS2/Git/Client/App/Types/GitEnv.hs index d2901cde..e6af7086 100644 --- a/hbs21-git/hbs2-git-client-lib/HBS2/Git/Client/App/Types/GitEnv.hs +++ b/hbs21-git/hbs2-git-client-lib/HBS2/Git/Client/App/Types/GitEnv.hs @@ -49,19 +49,5 @@ data GitEnv = , _keyringCache :: TVar (HashMap HashRef [KeyringEntry HBS2Basic]) } -instance (Monad m, MonadReader GitEnv m) => HasProgressIndicator m where - getProgressIndicator = asks _progress - -instance MonadReader GitEnv m => HasStorage m where - getStorage = asks _storage - -instance MonadReader GitEnv m => HasAPI PeerAPI UNIX m where - getAPI = asks _peerAPI - -instance MonadReader GitEnv m => HasAPI LWWRefAPI UNIX m where - getAPI = asks _lwwRefAPI - -instance MonadReader GitEnv m => HasAPI RefLogAPI UNIX m where - getAPI = asks _refLogAPI makeLenses 'GitEnv diff --git a/hbs21-git/hbs2-git-client-lib/HBS2/Git/Client/Import.hs b/hbs21-git/hbs2-git-client-lib/HBS2/Git/Client/Import.hs index d3f90ede..1d150d23 100644 --- a/hbs21-git/hbs2-git-client-lib/HBS2/Git/Client/Import.hs +++ b/hbs21-git/hbs2-git-client-lib/HBS2/Git/Client/Import.hs @@ -1,10 +1,7 @@ module HBS2.Git.Client.Import where - - import HBS2.Git.Client.Prelude hiding (info) import HBS2.Git.Client.App.Types -import HBS2.Git.Client.Config import HBS2.Git.Client.State import HBS2.Git.Client.RefLog import HBS2.Git.Client.Progress @@ -21,8 +18,6 @@ import Text.InterpolatedString.Perl6 (qc) import Streaming.Prelude qualified as S import System.IO (hPrint) import Data.Maybe -import System.Environment -import System.Exit data ImportRefLogNotFound = ImportRefLogNotFound deriving stock (Typeable,Show) @@ -62,25 +57,26 @@ data IState = -- class -merelySubscribeRepo :: ( GitPerks m - , HasStorage m - , HasProgressIndicator m - , HasAPI PeerAPI UNIX m - , HasAPI LWWRefAPI UNIX m - , HasAPI RefLogAPI UNIX m - ) +merelySubscribeRepo :: forall e s m . ( GitPerks m + , HasStorage m + , HasProgressIndicator m + , HasAPI PeerAPI UNIX m + , HasAPI LWWRefAPI UNIX m + , HasAPI RefLogAPI UNIX m + , e ~ L4Proto + , s ~ Encryption e + ) => LWWRefKey HBS2Basic - -> m (Maybe HashRef) + -> m (Maybe (PubKey 'Sign s)) merelySubscribeRepo lwwKey = do ip <- getProgressIndicator sto <- getStorage subscribeLWWRef lwwKey - fetchLWWRef lwwKey - flip fix (IWaitLWWBlock 10) $ \next -> \case + r <- flip fix (IWaitLWWBlock 10) $ \next -> \case IWaitLWWBlock w | w <= 0 -> do throwIO ImportRefLogNotFound @@ -99,30 +95,12 @@ merelySubscribeRepo lwwKey = do void $ try @_ @SomeException (getRefLogMerkle lwwRefLogPubKey) subscribeRefLog lwwRefLogPubKey pause @'Seconds 0.25 - getRefLogMerkle lwwRefLogPubKey - next (IWaitRefLog 10 lwwRefLogPubKey) - - IWaitRefLog w _ | w <= 0 -> do - throwIO ImportRefLogNotFound - - IWaitRefLog w puk -> do - onProgress ip (ImportRefLogStart puk) - try @_ @SomeException (getRefLogMerkle puk) >>= \case - Left _ -> do - onProgress ip (ImportRefLogDone puk Nothing) - pause @'Seconds 2 - next (IWaitRefLog (pred w) puk) - - Right Nothing -> do - onProgress ip (ImportRefLogDone puk Nothing) - pause @'Seconds 2 - next (IWaitRefLog (pred w) puk) - - Right (Just h) -> do - pure (Just h) + pure $ Just lwwRefLogPubKey _ -> pure Nothing + onProgress ip ImportAllDone + pure r importRepoWait :: ( GitPerks m , MonadReader GitEnv m diff --git a/hbs21-git/hbs2-git-client-lib/HBS2/Git/Client/RefLog.hs b/hbs21-git/hbs2-git-client-lib/HBS2/Git/Client/RefLog.hs index b5be6be9..428882f6 100644 --- a/hbs21-git/hbs2-git-client-lib/HBS2/Git/Client/RefLog.hs +++ b/hbs21-git/hbs2-git-client-lib/HBS2/Git/Client/RefLog.hs @@ -19,22 +19,25 @@ instance Exception RefLogRequestTimeout instance Exception RefLogRequestError -subscribeRefLog :: (GitPerks m, HasAPI PeerAPI UNIX m) => RefLogId -> m () +doSomeRandomShit :: HasAPI PeerAPI UNIX m => m () +doSomeRandomShit = error "FUCK" + +subscribeRefLog :: forall m .(GitPerks m, HasAPI PeerAPI UNIX m) => RefLogId -> m () subscribeRefLog puk = do api <- getAPI @PeerAPI @UNIX void $ callService @RpcPollAdd api (puk, "reflog", 13) -subscribeLWWRef :: (GitPerks m, HasAPI PeerAPI UNIX m) => LWWRefKey HBS2Basic -> m () +subscribeLWWRef :: forall m . (GitPerks m, HasAPI PeerAPI UNIX m) => LWWRefKey HBS2Basic -> m () subscribeLWWRef puk = do api <- getAPI @PeerAPI @UNIX void $ callService @RpcPollAdd api (fromLwwRefKey puk, "lwwref", 17) -fetchLWWRef :: (GitPerks m, HasAPI LWWRefAPI UNIX m) => LWWRefKey HBS2Basic -> m () +fetchLWWRef :: forall m . (GitPerks m, HasAPI LWWRefAPI UNIX m) => LWWRefKey HBS2Basic -> m () fetchLWWRef key = do api <- getAPI @LWWRefAPI @UNIX void $ race (pause @'Seconds 1) (callService @RpcLWWRefFetch api key) -getRefLogMerkle :: (GitPerks m, HasAPI RefLogAPI UNIX m) => RefLogId -> m (Maybe HashRef) +getRefLogMerkle :: forall m . (GitPerks m, HasAPI RefLogAPI UNIX m) => RefLogId -> m (Maybe HashRef) getRefLogMerkle puk = do api <- getAPI @RefLogAPI @UNIX