diff --git a/hbs2-core/hbs2-core.cabal b/hbs2-core/hbs2-core.cabal index 9f69a8a9..44d557fd 100644 --- a/hbs2-core/hbs2-core.cabal +++ b/hbs2-core/hbs2-core.cabal @@ -109,6 +109,7 @@ library , stm , stm-chans , text + , transformers , uniplate hs-source-dirs: lib diff --git a/hbs2-core/lib/HBS2/Net/Peer.hs b/hbs2-core/lib/HBS2/Net/Peer.hs index 4db585c3..b7609dab 100644 --- a/hbs2-core/lib/HBS2/Net/Peer.hs +++ b/hbs2-core/lib/HBS2/Net/Peer.hs @@ -3,25 +3,33 @@ module HBS2.Net.Peer where import HBS2.Prelude +import HBS2.Prelude.Plated import HBS2.Net.Proto import HBS2.Net.Messaging +import HBS2.Clock import Lens.Micro.Platform import Data.ByteString.Lazy ( ByteString ) import Data.Foldable import Control.Monad.Reader import Data.Map qualified as Map -import Data.Proxy import GHC.TypeLits -import Codec.Serialise qualified as S +import Control.Monad.Trans.Maybe + +import Codec.Serialise hiding (encode,decode) data AnyMessage e = AnyMessage Integer (Encoded e) + deriving stock (Generic) -data EngineEnv e = forall bus . (Messaging bus e (AnyMessage e)) => +instance Serialise (Encoded e) => Serialise (AnyMessage e) + +data EngineEnv e = forall bus . ( Messaging bus e ByteString + , Serialise (Encoded e) + ) => EngineEnv { _peer :: Maybe (Peer e) , _self :: Peer e - , bus :: bus + , bus :: bus } makeLenses 'EngineEnv @@ -66,7 +74,8 @@ instance (MonadIO m, HasProtocol e p) => Request e p (EngineM e m) where let proto = protoId @e @p (Proxy @p) ask >>= \case EngineEnv { _self = s, bus = b} -> do - liftIO $ sendTo b (To p) (From s) (AnyMessage proto (encode msg)) + let bs = serialise (AnyMessage @e proto (encode msg)) + liftIO $ sendTo b (To p) (From s) bs instance (MonadIO m, HasProtocol e p) => Response e p (EngineM e m) where response resp = do @@ -77,12 +86,14 @@ instance (MonadIO m, HasProtocol e p) => Response e p (EngineM e m) where , _self = s , bus = b } ) -> do - liftIO $ sendTo b (To p) (From s) (AnyMessage proto (encode resp)) + let bs = serialise (AnyMessage @e proto (encode resp)) + liftIO $ sendTo b (To p) (From s) bs _ -> pure () newEnv :: forall e bus m . ( Monad m - , Messaging bus e (AnyMessage e) + , Messaging bus e ByteString + , Serialise (Encoded e) ) => Peer e -> bus @@ -91,7 +102,12 @@ newEnv :: forall e bus m . ( Monad m newEnv p pipe = pure $ EngineEnv Nothing p pipe -runPeer :: MonadIO m => EngineEnv e -> [AnyProtocol e (EngineM e m)] -> m a +runPeer :: forall e m a . ( MonadIO m + ) + => EngineEnv e + -> [AnyProtocol e (EngineM e m)] + -> m a + runPeer env@(EngineEnv {bus = pipe}) hh = do let me = env ^. self @@ -105,12 +121,20 @@ runPeer env@(EngineEnv {bus = pipe}) hh = do forever $ do messages <- receive pipe (To me) - for_ messages $ \(From pip, AnyMessage n msg) -> local (set peer (Just pip)) do + for_ messages $ \(From pip, bs) -> do - case Map.lookup n disp of - Nothing -> pure () + case deserialiseOrFail @(AnyMessage e) bs of - Just (AnyProtocol { protoDecode = decoder - , handle = h - }) -> maybe (pure ()) h (decoder msg) + Left _-> pure () -- liftIO $ print "failed to deserialise" + + Right (AnyMessage n msg) -> do + + local (set peer (Just pip)) do + + case Map.lookup n disp of + Nothing -> pure () + + Just (AnyProtocol { protoDecode = decoder + , handle = h + }) -> maybe (pure ()) h (decoder msg) diff --git a/hbs2-core/test/TestUniqProtoId.hs b/hbs2-core/test/TestUniqProtoId.hs index f400d356..a26ac3cc 100644 --- a/hbs2-core/test/TestUniqProtoId.hs +++ b/hbs2-core/test/TestUniqProtoId.hs @@ -34,7 +34,6 @@ instance HasProtocol Fake (PeekPoke Fake) where decode = readMay encode = show - pingPongHandler :: forall e m . ( MonadIO m , Response e (PingPong e) m , HasProtocol e (PingPong e) @@ -90,6 +89,8 @@ testUniqProtoId = do , makeResponse peekPokeHandler ] - void $ waitAnyCatchCancel [pip1, pip2] + (_, e) <- waitAnyCatchCancel [pip1, pip2] + + print e