ResponseM monad

This commit is contained in:
Dmitry Zuikov 2023-01-18 07:16:59 +03:00
parent 6021c70aef
commit 6e67d1d0ae
3 changed files with 57 additions and 31 deletions

View File

@ -18,6 +18,7 @@ import Data.Map qualified as Map
import GHC.TypeLits import GHC.TypeLits
import Control.Monad.Trans.Maybe import Control.Monad.Trans.Maybe
import Control.Concurrent.Async import Control.Concurrent.Async
import Data.Kind
import Codec.Serialise hiding (encode,decode) import Codec.Serialise hiding (encode,decode)
@ -26,17 +27,6 @@ data AnyMessage e = AnyMessage Integer (Encoded e)
instance Serialise (Encoded e) => Serialise (AnyMessage e) instance Serialise (Encoded e) => Serialise (AnyMessage e)
newtype EngineM e m a = EngineM { fromEngine :: ReaderT (EngineEnv e) m a }
deriving newtype ( Functor
, Applicative
, Monad
, MonadIO
, MonadReader (EngineEnv e)
, MonadTrans
)
-- instance MonadTrans (EngineM (EngineEnv e)) where
-- lift = lift
data EngineEnv e = forall bus . ( Messaging bus e ByteString data EngineEnv e = forall bus . ( Messaging bus e ByteString
, Serialise (Encoded e) , Serialise (Encoded e)
@ -48,8 +38,36 @@ data EngineEnv e = forall bus . ( Messaging bus e ByteString
, defer :: Pipeline IO () , defer :: Pipeline IO ()
} }
newtype EngineM e m a = EngineM { fromEngine :: ReaderT (EngineEnv e) m a }
deriving newtype ( Functor
, Applicative
, Monad
, MonadIO
, MonadReader (EngineEnv e)
, MonadTrans
)
data ResponseEnv e =
ResponseEnv
{ _engine :: EngineEnv e
, _respPeer :: Peer e
}
newtype ResponseM e m a = ResponseM { fromResponse :: ReaderT (ResponseEnv e) m a }
deriving newtype ( Functor
, Applicative
, Monad
, MonadIO
, MonadReader (ResponseEnv e)
, MonadTrans
)
makeLenses 'EngineEnv makeLenses 'EngineEnv
makeLenses 'ResponseEnv
data AnyProtocol e m = forall p . ( HasProtocol e p data AnyProtocol e m = forall p . ( HasProtocol e p
, Response e p m , Response e p m
) => ) =>
@ -76,6 +94,8 @@ makeResponse h = AnyProtocol { myProtoId = natVal (Proxy @(ProtocolId p))
runEngineM :: EngineEnv e -> EngineM e m a -> m a runEngineM :: EngineEnv e -> EngineM e m a -> m a
runEngineM e f = runReaderT (fromEngine f) e runEngineM e f = runReaderT (fromEngine f) e
runResponseM :: Monad m => EngineEnv e -> Peer e -> ResponseM e m a -> EngineM e m a
runResponseM eng p f = lift $ runReaderT (fromResponse f) (ResponseEnv eng p)
instance (MonadIO m, HasProtocol e p) => Request e p (EngineM e m) where instance (MonadIO m, HasProtocol e p) => Request e p (EngineM e m) where
request p msg = do request p msg = do
@ -85,24 +105,25 @@ instance (MonadIO m, HasProtocol e p) => Request e p (EngineM e m) where
let bs = serialise (AnyMessage @e proto (encode msg)) let bs = serialise (AnyMessage @e proto (encode msg))
liftIO $ sendTo b (To p) (From s) bs liftIO $ sendTo b (To p) (From s) bs
instance (HasProtocol e p) => Response e p (EngineM e IO) where instance (HasProtocol e p, Serialise (Encoded e)) => Response e p (ResponseM e IO) where
thatPeer _ = asks (view respPeer)
deferred _ m = do deferred _ m = do
e@(EngineEnv { defer = d }) <- ask e@(EngineEnv { defer = d }) <- asks (view engine)
addJob d (runEngineM e m) p <- asks (view respPeer)
addJob d (runEngineM e (runResponseM e p m))
response resp = do response resp = do
env <- ask env <- ask
let p = env ^. respPeer
let s = env ^. (engine . self)
let proto = protoId @e @p (Proxy @p) let proto = protoId @e @p (Proxy @p)
case env of
(EngineEnv { _peer = Just p
, _self = s
, bus = b
} ) -> do
let bs = serialise (AnyMessage @e proto (encode resp)) let bs = serialise (AnyMessage @e proto (encode resp))
liftIO $ sendTo b (To p) (From s) bs
_ -> pure ()
-- TODO: wrap somehow
case env ^. engine of
EngineEnv { bus = b } -> liftIO $ sendTo b (To p) (From s) bs
newEnv :: forall e bus m . ( Monad m newEnv :: forall e bus m . ( Monad m
, MonadIO m , MonadIO m
@ -120,7 +141,7 @@ newEnv p pipe = do
runPeer :: forall e m a . ( MonadIO m runPeer :: forall e m a . ( MonadIO m
) )
=> EngineEnv e => EngineEnv e
-> [AnyProtocol e (EngineM e m)] -> [AnyProtocol e (ResponseM e m)]
-> m a -> m a
runPeer env@(EngineEnv {bus = pipe, defer = d}) hh = do runPeer env@(EngineEnv {bus = pipe, defer = d}) hh = do
@ -135,8 +156,6 @@ runPeer env@(EngineEnv {bus = pipe, defer = d}) hh = do
runEngineM env $ do runEngineM env $ do
-- void $ liftIO $ runPipeline d
forever $ do forever $ do
messages <- receive pipe (To me) messages <- receive pipe (To me)
@ -155,5 +174,5 @@ runPeer env@(EngineEnv {bus = pipe, defer = d}) hh = do
Just (AnyProtocol { protoDecode = decoder Just (AnyProtocol { protoDecode = decoder
, handle = h , handle = h
}) -> maybe (pure ()) h (decoder msg) }) -> maybe (pure ()) (runResponseM env pip . h) (decoder msg)

View File

@ -20,6 +20,7 @@ class (Hashable (Peer e), Eq (Peer e)) => HasPeer e where
class (MonadIO m, HasProtocol e p) => Response e p m | p -> e where class (MonadIO m, HasProtocol e p) => Response e p m | p -> e where
response :: p -> m () response :: p -> m ()
deferred :: Proxy p -> m () -> m () deferred :: Proxy p -> m () -> m ()
thatPeer :: Proxy p -> m (Peer e)
class Request e p (m :: Type -> Type) | p -> e where class Request e p (m :: Type -> Type) | p -> e where
request :: Peer e -> p -> m () request :: Peer e -> p -> m ()

View File

@ -89,12 +89,12 @@ blockSizeProto getBlockSize evHasBlock =
-- deferred (Proxy @(BlockSize e)) $ do -- deferred (Proxy @(BlockSize e)) $ do
NoBlock h -> do NoBlock h -> do
evHasBlock ( undefined, h, Nothing ) that <- thatPeer (Proxy @(BlockSize e))
debug $ "NoBlock" <+> pretty h evHasBlock ( that, h, Nothing )
BlockSize h sz -> do BlockSize h sz -> do
evHasBlock ( undefined, h, Just sz ) that <- thatPeer (Proxy @(BlockSize e))
debug $ "BlockSize" <+> pretty h <+> pretty sz evHasBlock ( that, h, Just sz )
main :: IO () main :: IO ()
main = do main = do
@ -136,8 +136,11 @@ runFakePeer env = do
simpleStorageStop storage simpleStorageStop storage
let handleBlockInfo (p, h, sz) = do
debug $ pretty p <+> "has block" <+> pretty h <+> pretty sz
runPeer env runPeer env
[ makeResponse (blockSizeProto (hasBlock storage) dontHandle) [ makeResponse (blockSizeProto (hasBlock storage) handleBlockInfo)
] ]
cancel w cancel w
@ -164,6 +167,9 @@ test1 = do
request p1 (GetBlockSize @Fake (fromString "81JeD7LNR6Q7RYfyWBxfjJn1RsWzvegkUXae6FUNgrMZ")) request p1 (GetBlockSize @Fake (fromString "81JeD7LNR6Q7RYfyWBxfjJn1RsWzvegkUXae6FUNgrMZ"))
request p1 (GetBlockSize @Fake (fromString "5KP4vM6RuEX6RA1ywthBMqZV5UJDLANC17UrF6zuWdRt")) request p1 (GetBlockSize @Fake (fromString "5KP4vM6RuEX6RA1ywthBMqZV5UJDLANC17UrF6zuWdRt"))
request p0 (GetBlockSize @Fake (fromString "81JeD7LNR6Q7RYfyWBxfjJn1RsWzvegkUXae6FUNgrMZ"))
request p0 (GetBlockSize @Fake (fromString "5KP4vM6RuEX6RA1ywthBMqZV5UJDLANC17UrF6zuWdRt"))
pause ( 0.5 :: Timeout 'Seconds) pause ( 0.5 :: Timeout 'Seconds)
mapM_ cancel peerz mapM_ cancel peerz