From 6e67d1d0aec69cc49ed3eac35788bbb45525d1c8 Mon Sep 17 00:00:00 2001 From: Dmitry Zuikov Date: Wed, 18 Jan 2023 07:16:59 +0300 Subject: [PATCH] ResponseM monad --- hbs2-core/lib/HBS2/Net/Peer.hs | 71 +++++++++++++++++---------- hbs2-core/lib/HBS2/Net/Proto/Types.hs | 1 + hbs2-tests/test/Main.hs | 16 ++++-- 3 files changed, 57 insertions(+), 31 deletions(-) diff --git a/hbs2-core/lib/HBS2/Net/Peer.hs b/hbs2-core/lib/HBS2/Net/Peer.hs index c6b8d7db..127c64a1 100644 --- a/hbs2-core/lib/HBS2/Net/Peer.hs +++ b/hbs2-core/lib/HBS2/Net/Peer.hs @@ -18,6 +18,7 @@ import Data.Map qualified as Map import GHC.TypeLits import Control.Monad.Trans.Maybe import Control.Concurrent.Async +import Data.Kind import Codec.Serialise hiding (encode,decode) @@ -26,17 +27,6 @@ data AnyMessage e = AnyMessage Integer (Encoded e) instance Serialise (Encoded e) => Serialise (AnyMessage e) -newtype EngineM e m a = EngineM { fromEngine :: ReaderT (EngineEnv e) m a } - deriving newtype ( Functor - , Applicative - , Monad - , MonadIO - , MonadReader (EngineEnv e) - , MonadTrans - ) - --- instance MonadTrans (EngineM (EngineEnv e)) where --- lift = lift data EngineEnv e = forall bus . ( Messaging bus e ByteString , Serialise (Encoded e) @@ -48,8 +38,36 @@ data EngineEnv e = forall bus . ( Messaging bus e ByteString , defer :: Pipeline IO () } + +newtype EngineM e m a = EngineM { fromEngine :: ReaderT (EngineEnv e) m a } + deriving newtype ( Functor + , Applicative + , Monad + , MonadIO + , MonadReader (EngineEnv e) + , MonadTrans + ) + +data ResponseEnv e = + ResponseEnv + { _engine :: EngineEnv e + , _respPeer :: Peer e + } + +newtype ResponseM e m a = ResponseM { fromResponse :: ReaderT (ResponseEnv e) m a } + deriving newtype ( Functor + , Applicative + , Monad + , MonadIO + , MonadReader (ResponseEnv e) + , MonadTrans + ) + + makeLenses 'EngineEnv +makeLenses 'ResponseEnv + data AnyProtocol e m = forall p . ( HasProtocol e p , Response e p m ) => @@ -76,6 +94,8 @@ makeResponse h = AnyProtocol { myProtoId = natVal (Proxy @(ProtocolId p)) runEngineM :: EngineEnv e -> EngineM e m a -> m a runEngineM e f = runReaderT (fromEngine f) e +runResponseM :: Monad m => EngineEnv e -> Peer e -> ResponseM e m a -> EngineM e m a +runResponseM eng p f = lift $ runReaderT (fromResponse f) (ResponseEnv eng p) instance (MonadIO m, HasProtocol e p) => Request e p (EngineM e m) where request p msg = do @@ -85,24 +105,25 @@ instance (MonadIO m, HasProtocol e p) => Request e p (EngineM e m) where let bs = serialise (AnyMessage @e proto (encode msg)) liftIO $ sendTo b (To p) (From s) bs -instance (HasProtocol e p) => Response e p (EngineM e IO) where +instance (HasProtocol e p, Serialise (Encoded e)) => Response e p (ResponseM e IO) where + + thatPeer _ = asks (view respPeer) deferred _ m = do - e@(EngineEnv { defer = d }) <- ask - addJob d (runEngineM e m) + e@(EngineEnv { defer = d }) <- asks (view engine) + p <- asks (view respPeer) + addJob d (runEngineM e (runResponseM e p m)) response resp = do env <- ask + let p = env ^. respPeer + let s = env ^. (engine . self) let proto = protoId @e @p (Proxy @p) - case env of - (EngineEnv { _peer = Just p - , _self = s - , bus = b - } ) -> do - let bs = serialise (AnyMessage @e proto (encode resp)) - liftIO $ sendTo b (To p) (From s) bs - _ -> pure () + let bs = serialise (AnyMessage @e proto (encode resp)) + -- TODO: wrap somehow + case env ^. engine of + EngineEnv { bus = b } -> liftIO $ sendTo b (To p) (From s) bs newEnv :: forall e bus m . ( Monad m , MonadIO m @@ -120,7 +141,7 @@ newEnv p pipe = do runPeer :: forall e m a . ( MonadIO m ) => EngineEnv e - -> [AnyProtocol e (EngineM e m)] + -> [AnyProtocol e (ResponseM e m)] -> m a runPeer env@(EngineEnv {bus = pipe, defer = d}) hh = do @@ -135,8 +156,6 @@ runPeer env@(EngineEnv {bus = pipe, defer = d}) hh = do runEngineM env $ do - -- void $ liftIO $ runPipeline d - forever $ do messages <- receive pipe (To me) @@ -155,5 +174,5 @@ runPeer env@(EngineEnv {bus = pipe, defer = d}) hh = do Just (AnyProtocol { protoDecode = decoder , handle = h - }) -> maybe (pure ()) h (decoder msg) + }) -> maybe (pure ()) (runResponseM env pip . h) (decoder msg) diff --git a/hbs2-core/lib/HBS2/Net/Proto/Types.hs b/hbs2-core/lib/HBS2/Net/Proto/Types.hs index 75f61644..ebd60b87 100644 --- a/hbs2-core/lib/HBS2/Net/Proto/Types.hs +++ b/hbs2-core/lib/HBS2/Net/Proto/Types.hs @@ -20,6 +20,7 @@ class (Hashable (Peer e), Eq (Peer e)) => HasPeer e where class (MonadIO m, HasProtocol e p) => Response e p m | p -> e where response :: p -> m () deferred :: Proxy p -> m () -> m () + thatPeer :: Proxy p -> m (Peer e) class Request e p (m :: Type -> Type) | p -> e where request :: Peer e -> p -> m () diff --git a/hbs2-tests/test/Main.hs b/hbs2-tests/test/Main.hs index 30ed1774..4c2a08fc 100644 --- a/hbs2-tests/test/Main.hs +++ b/hbs2-tests/test/Main.hs @@ -89,12 +89,12 @@ blockSizeProto getBlockSize evHasBlock = -- deferred (Proxy @(BlockSize e)) $ do NoBlock h -> do - evHasBlock ( undefined, h, Nothing ) - debug $ "NoBlock" <+> pretty h + that <- thatPeer (Proxy @(BlockSize e)) + evHasBlock ( that, h, Nothing ) BlockSize h sz -> do - evHasBlock ( undefined, h, Just sz ) - debug $ "BlockSize" <+> pretty h <+> pretty sz + that <- thatPeer (Proxy @(BlockSize e)) + evHasBlock ( that, h, Just sz ) main :: IO () main = do @@ -136,8 +136,11 @@ runFakePeer env = do simpleStorageStop storage + let handleBlockInfo (p, h, sz) = do + debug $ pretty p <+> "has block" <+> pretty h <+> pretty sz + runPeer env - [ makeResponse (blockSizeProto (hasBlock storage) dontHandle) + [ makeResponse (blockSizeProto (hasBlock storage) handleBlockInfo) ] cancel w @@ -164,6 +167,9 @@ test1 = do request p1 (GetBlockSize @Fake (fromString "81JeD7LNR6Q7RYfyWBxfjJn1RsWzvegkUXae6FUNgrMZ")) request p1 (GetBlockSize @Fake (fromString "5KP4vM6RuEX6RA1ywthBMqZV5UJDLANC17UrF6zuWdRt")) + request p0 (GetBlockSize @Fake (fromString "81JeD7LNR6Q7RYfyWBxfjJn1RsWzvegkUXae6FUNgrMZ")) + request p0 (GetBlockSize @Fake (fromString "5KP4vM6RuEX6RA1ywthBMqZV5UJDLANC17UrF6zuWdRt")) + pause ( 0.5 :: Timeout 'Seconds) mapM_ cancel peerz