attempt to extend pipeline for rpc

This commit is contained in:
Dmitry Zuikov 2024-03-16 08:54:38 +03:00
parent e702f3609f
commit c6b90878c3
9 changed files with 171 additions and 76 deletions

View File

@ -10,9 +10,9 @@ BIN_DIR := ./bin
BINS := \ BINS := \
hbs2 \ hbs2 \
hbs2-peer \ hbs2-peer \
hbs2-reposync \
hbs2-keyman \ hbs2-keyman \
hbs2-git-reposync \ hbs2-git-reposync \
hbs2-git-subscribe \
git-remote-hbs2 \ git-remote-hbs2 \
git-hbs2 \ git-hbs2 \
git-remote-hbs21 \ git-remote-hbs21 \

View File

@ -405,7 +405,7 @@ runPeerM :: forall e m . ( MonadIO m
runPeerM env f = do runPeerM env f = do
let de = view envDeferred env let de = view envDeferred env
as <- liftIO $ replicateM 8 $ async $ runPipeline de as <- liftIO $ replicateM 32 $ async $ runPipeline de
sw <- liftIO $ async $ forever $ withPeerM env $ do sw <- liftIO $ async $ forever $ withPeerM env $ do
pause defSweepTimeout pause defSweepTimeout

View File

@ -1163,7 +1163,7 @@ runPeer opts = Exception.handle (\e -> myException e
envrl <- newNotifyEnvServer @(RefLogEvents L4Proto) refLogNotifySource envrl <- newNotifyEnvServer @(RefLogEvents L4Proto) refLogNotifySource
w1 <- asyncLinked $ runNotifyWorkerServer env w1 <- asyncLinked $ runNotifyWorkerServer env
w2 <- asyncLinked $ runNotifyWorkerServer envrl w2 <- asyncLinked $ runNotifyWorkerServer envrl
runProto @UNIX wws <- replicateM 1 $ async $ runProto @UNIX
[ makeResponse (makeServer @PeerAPI) [ makeResponse (makeServer @PeerAPI)
, makeResponse (makeServer @RefLogAPI) , makeResponse (makeServer @RefLogAPI)
, makeResponse (makeServer @RefChanAPI) , makeResponse (makeServer @RefChanAPI)
@ -1172,7 +1172,7 @@ runPeer opts = Exception.handle (\e -> myException e
, makeResponse (makeNotifyServer @(RefChanEvents L4Proto) env) , makeResponse (makeNotifyServer @(RefChanEvents L4Proto) env)
, makeResponse (makeNotifyServer @(RefLogEvents L4Proto) envrl) , makeResponse (makeNotifyServer @(RefLogEvents L4Proto) envrl)
] ]
mapM_ wait [w1,w2] mapM_ wait (w1 : w2 : wws )
void $ waitAnyCancel $ w <> [ loop void $ waitAnyCancel $ w <> [ loop
, m1 , m1

View File

@ -0,0 +1,111 @@
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE UndecidableInstances #-}
module Main where
import HBS2.Git.Client.Prelude hiding (info)
import HBS2.Git.Client.App hiding (_progress, _storage, _peerAPI, _lwwAPI, _refLogAPI)
import HBS2.Git.Client.Progress
import HBS2.Git.Client.Import
import HBS2.Git.Client.RefLog
import HBS2.Peer.CLI.Detect
import Options.Applicative
import Data.Semigroup ((<>))
main :: IO ()
main = do
let parser = subscribe
<$> optional (strOption
( long "socket"
<> short 's'
<> metavar "SOCKET"
<> help "Socket file path"))
<*> argument pLww (metavar "LWWREF")
join $ execParser (info (parser <**> helper)
( fullDesc
<> progDesc "Parse command line arguments"
<> header "Command line arguments parsing example"))
where
pLww :: ReadM (LWWRefKey HBS2Basic)
pLww = maybeReader fromStringMay
data MyStuff =
MyStuff
{ _peerAPI :: ServiceCaller PeerAPI UNIX
, _lwwAPI :: ServiceCaller LWWRefAPI UNIX
, _refLogAPI :: ServiceCaller RefLogAPI UNIX
, _storage :: AnyStorage
, _progress :: AnyProgress
}
newtype MyApp m a = MyApp { fromMyApp :: ReaderT MyStuff m a }
deriving newtype ( Functor
, Applicative
, Monad
, MonadIO
, MonadUnliftIO
, MonadThrow
, MonadReader MyStuff
)
instance Monad m => HasProgressIndicator (MyApp m) where
getProgressIndicator = asks _progress
instance Monad m => HasStorage (MyApp m) where
getStorage = asks _storage
instance Monad m => HasAPI PeerAPI UNIX (MyApp m) where
getAPI = asks _peerAPI
instance Monad m => HasAPI LWWRefAPI UNIX (MyApp m) where
getAPI = asks _lwwAPI
instance Monad m => HasAPI RefLogAPI UNIX (MyApp m) where
getAPI = asks _refLogAPI
subscribe :: forall m . MonadUnliftIO m => Maybe String -> LWWRefKey HBS2Basic -> m ()
subscribe soname' ref = do
soname <- maybe1 soname' detectRPC (pure.Just) `orDie` "can't locate rpc"
flip runContT pure do
client <- lift $ race (pause @'Seconds 1) (newMessagingUnix False 1.0 soname)
>>= orThrowUser ("can't connect to" <+> pretty soname)
q <- lift newProgressQ
let ip = AnyProgress q
void $ ContT $ withAsync $ runMessagingUnix client
void $ ContT $ withAsync $ drawProgress q
peerAPI <- makeServiceCaller @PeerAPI (fromString soname)
refLogAPI <- makeServiceCaller @RefLogAPI (fromString soname)
storageAPI <- makeServiceCaller @StorageAPI (fromString soname)
lwwAPI <- makeServiceCaller @LWWRefAPI (fromString soname)
let sto = AnyStorage (StorageClient storageAPI)
let endpoints = [ Endpoint @UNIX peerAPI
, Endpoint @UNIX refLogAPI
, Endpoint @UNIX lwwAPI
, Endpoint @UNIX storageAPI
]
void $ ContT $ withAsync $ liftIO $ runReaderT (runServiceClientMulti endpoints) client
let app = MyStuff peerAPI lwwAPI refLogAPI sto ip
lift $ flip runReaderT app $ fromMyApp do
merelySubscribeRepo ref
onProgress ip ImportAllDone
hFlush stdout
hFlush stderr
pure ()

View File

@ -96,14 +96,6 @@ pImport = do
git <- Git.findGitDir >>= orThrowUser "not a git dir" git <- Git.findGitDir >>= orThrowUser "not a git dir"
importRepoWait puk importRepoWait puk
pSubscribe :: GitPerks m => Parser (GitCLI m ())
pSubscribe = do
lww <- argument pLwwKey (metavar "LWWREF")
pure do
merelySubscribeRepo lww >>= liftIO . \case
Just x -> print $ "subscribed" <+> pretty x
Nothing -> exitFailure
pTools :: GitPerks m => Parser (GitCLI m ()) pTools :: GitPerks m => Parser (GitCLI m ())
pTools = hsubparser ( command "dump-pack" (info pDumpPack (progDesc "dump hbs2 git pack")) pTools = hsubparser ( command "dump-pack" (info pDumpPack (progDesc "dump hbs2 git pack"))
<> command "show-ref" (info pShowRef (progDesc "show current references")) <> command "show-ref" (info pShowRef (progDesc "show current references"))

View File

@ -45,12 +45,37 @@ newtype GitCLI m a = GitCLI { fromGitCLI :: ReaderT GitEnv m a }
, Monad , Monad
, MonadIO , MonadIO
, MonadUnliftIO , MonadUnliftIO
, MonadTrans
, MonadReader GitEnv , MonadReader GitEnv
, MonadThrow , MonadThrow
) )
type GitPerks m = ( MonadUnliftIO m, MonadThrow m ) -- type GitPerks m = ( MonadUnliftIO m, MonadThrow m )
type GitPerks m = ( MonadUnliftIO m )
instance Monad m => HasProgressIndicator (GitCLI m) where
getProgressIndicator = asks _progress
instance Monad m => HasStorage (GitCLI m) where
getStorage = asks _storage
instance Monad m => HasAPI PeerAPI UNIX (GitCLI m) where
getAPI = asks _peerAPI
instance Monad m => HasAPI LWWRefAPI UNIX (GitCLI m) where
getAPI = asks _lwwRefAPI
instance Monad m => HasAPI RefLogAPI UNIX (GitCLI m) where
getAPI = asks _refLogAPI
instance MonadReader GitEnv m => HasAPI RefLogAPI UNIX (ExceptT e m) where
getAPI = asks _refLogAPI
instance MonadReader GitEnv m => HasAPI LWWRefAPI UNIX (ExceptT e m) where
getAPI = asks _lwwRefAPI
instance MonadReader GitEnv m => HasAPI PeerAPI UNIX (ExceptT e m) where
getAPI = asks _peerAPI
newGitEnv :: GitPerks m newGitEnv :: GitPerks m
=> AnyProgress => AnyProgress

View File

@ -49,19 +49,5 @@ data GitEnv =
, _keyringCache :: TVar (HashMap HashRef [KeyringEntry HBS2Basic]) , _keyringCache :: TVar (HashMap HashRef [KeyringEntry HBS2Basic])
} }
instance (Monad m, MonadReader GitEnv m) => HasProgressIndicator m where
getProgressIndicator = asks _progress
instance MonadReader GitEnv m => HasStorage m where
getStorage = asks _storage
instance MonadReader GitEnv m => HasAPI PeerAPI UNIX m where
getAPI = asks _peerAPI
instance MonadReader GitEnv m => HasAPI LWWRefAPI UNIX m where
getAPI = asks _lwwRefAPI
instance MonadReader GitEnv m => HasAPI RefLogAPI UNIX m where
getAPI = asks _refLogAPI
makeLenses 'GitEnv makeLenses 'GitEnv

View File

@ -1,10 +1,7 @@
module HBS2.Git.Client.Import where module HBS2.Git.Client.Import where
import HBS2.Git.Client.Prelude hiding (info) import HBS2.Git.Client.Prelude hiding (info)
import HBS2.Git.Client.App.Types import HBS2.Git.Client.App.Types
import HBS2.Git.Client.Config
import HBS2.Git.Client.State import HBS2.Git.Client.State
import HBS2.Git.Client.RefLog import HBS2.Git.Client.RefLog
import HBS2.Git.Client.Progress import HBS2.Git.Client.Progress
@ -21,8 +18,6 @@ import Text.InterpolatedString.Perl6 (qc)
import Streaming.Prelude qualified as S import Streaming.Prelude qualified as S
import System.IO (hPrint) import System.IO (hPrint)
import Data.Maybe import Data.Maybe
import System.Environment
import System.Exit
data ImportRefLogNotFound = ImportRefLogNotFound data ImportRefLogNotFound = ImportRefLogNotFound
deriving stock (Typeable,Show) deriving stock (Typeable,Show)
@ -62,25 +57,26 @@ data IState =
-- class -- class
merelySubscribeRepo :: ( GitPerks m merelySubscribeRepo :: forall e s m . ( GitPerks m
, HasStorage m , HasStorage m
, HasProgressIndicator m , HasProgressIndicator m
, HasAPI PeerAPI UNIX m , HasAPI PeerAPI UNIX m
, HasAPI LWWRefAPI UNIX m , HasAPI LWWRefAPI UNIX m
, HasAPI RefLogAPI UNIX m , HasAPI RefLogAPI UNIX m
, e ~ L4Proto
, s ~ Encryption e
) )
=> LWWRefKey HBS2Basic => LWWRefKey HBS2Basic
-> m (Maybe HashRef) -> m (Maybe (PubKey 'Sign s))
merelySubscribeRepo lwwKey = do merelySubscribeRepo lwwKey = do
ip <- getProgressIndicator ip <- getProgressIndicator
sto <- getStorage sto <- getStorage
subscribeLWWRef lwwKey subscribeLWWRef lwwKey
fetchLWWRef lwwKey fetchLWWRef lwwKey
flip fix (IWaitLWWBlock 10) $ \next -> \case r <- flip fix (IWaitLWWBlock 10) $ \next -> \case
IWaitLWWBlock w | w <= 0 -> do IWaitLWWBlock w | w <= 0 -> do
throwIO ImportRefLogNotFound throwIO ImportRefLogNotFound
@ -99,30 +95,12 @@ merelySubscribeRepo lwwKey = do
void $ try @_ @SomeException (getRefLogMerkle lwwRefLogPubKey) void $ try @_ @SomeException (getRefLogMerkle lwwRefLogPubKey)
subscribeRefLog lwwRefLogPubKey subscribeRefLog lwwRefLogPubKey
pause @'Seconds 0.25 pause @'Seconds 0.25
getRefLogMerkle lwwRefLogPubKey pure $ Just lwwRefLogPubKey
next (IWaitRefLog 10 lwwRefLogPubKey)
IWaitRefLog w _ | w <= 0 -> do
throwIO ImportRefLogNotFound
IWaitRefLog w puk -> do
onProgress ip (ImportRefLogStart puk)
try @_ @SomeException (getRefLogMerkle puk) >>= \case
Left _ -> do
onProgress ip (ImportRefLogDone puk Nothing)
pause @'Seconds 2
next (IWaitRefLog (pred w) puk)
Right Nothing -> do
onProgress ip (ImportRefLogDone puk Nothing)
pause @'Seconds 2
next (IWaitRefLog (pred w) puk)
Right (Just h) -> do
pure (Just h)
_ -> pure Nothing _ -> pure Nothing
onProgress ip ImportAllDone
pure r
importRepoWait :: ( GitPerks m importRepoWait :: ( GitPerks m
, MonadReader GitEnv m , MonadReader GitEnv m

View File

@ -19,22 +19,25 @@ instance Exception RefLogRequestTimeout
instance Exception RefLogRequestError instance Exception RefLogRequestError
subscribeRefLog :: (GitPerks m, HasAPI PeerAPI UNIX m) => RefLogId -> m () doSomeRandomShit :: HasAPI PeerAPI UNIX m => m ()
doSomeRandomShit = error "FUCK"
subscribeRefLog :: forall m .(GitPerks m, HasAPI PeerAPI UNIX m) => RefLogId -> m ()
subscribeRefLog puk = do subscribeRefLog puk = do
api <- getAPI @PeerAPI @UNIX api <- getAPI @PeerAPI @UNIX
void $ callService @RpcPollAdd api (puk, "reflog", 13) void $ callService @RpcPollAdd api (puk, "reflog", 13)
subscribeLWWRef :: (GitPerks m, HasAPI PeerAPI UNIX m) => LWWRefKey HBS2Basic -> m () subscribeLWWRef :: forall m . (GitPerks m, HasAPI PeerAPI UNIX m) => LWWRefKey HBS2Basic -> m ()
subscribeLWWRef puk = do subscribeLWWRef puk = do
api <- getAPI @PeerAPI @UNIX api <- getAPI @PeerAPI @UNIX
void $ callService @RpcPollAdd api (fromLwwRefKey puk, "lwwref", 17) void $ callService @RpcPollAdd api (fromLwwRefKey puk, "lwwref", 17)
fetchLWWRef :: (GitPerks m, HasAPI LWWRefAPI UNIX m) => LWWRefKey HBS2Basic -> m () fetchLWWRef :: forall m . (GitPerks m, HasAPI LWWRefAPI UNIX m) => LWWRefKey HBS2Basic -> m ()
fetchLWWRef key = do fetchLWWRef key = do
api <- getAPI @LWWRefAPI @UNIX api <- getAPI @LWWRefAPI @UNIX
void $ race (pause @'Seconds 1) (callService @RpcLWWRefFetch api key) void $ race (pause @'Seconds 1) (callService @RpcLWWRefFetch api key)
getRefLogMerkle :: (GitPerks m, HasAPI RefLogAPI UNIX m) => RefLogId -> m (Maybe HashRef) getRefLogMerkle :: forall m . (GitPerks m, HasAPI RefLogAPI UNIX m) => RefLogId -> m (Maybe HashRef)
getRefLogMerkle puk = do getRefLogMerkle puk = do
api <- getAPI @RefLogAPI @UNIX api <- getAPI @RefLogAPI @UNIX