This commit is contained in:
Dmitry Zuikov 2023-01-16 21:52:18 +03:00
parent edfcaabd0e
commit b0e4152d98
2 changed files with 81 additions and 98 deletions

View File

@ -6,17 +6,20 @@ import Data.Kind
import Data.Proxy import Data.Proxy
import GHC.TypeLits import GHC.TypeLits
class HasPeer p where -- e -> Transport (like, UDP or TChan)
data family (Peer p) :: Type -- p -> L4 Protocol (like Ping/Pong)
class (KnownNat (ProtocolId a), HasPeer p) => HasProtocol p a | a -> p where class HasPeer e where
type family ProtocolId a = (id :: Nat) | id -> a data family (Peer e) :: Type
type family Encoded p :: Type
protoId :: forall . KnownNat (ProtocolId a) => Proxy a -> Integer class (KnownNat (ProtocolId p), HasPeer e) => HasProtocol e p | p -> e where
protoId _ = natVal (Proxy @(ProtocolId a)) type family ProtocolId p = (id :: Nat) | id -> p
type family Encoded e :: Type
decode :: Encoded p -> Maybe a protoId :: forall . KnownNat (ProtocolId p) => Proxy p -> Integer
encode :: a -> Encoded p protoId _ = natVal (Proxy @(ProtocolId p))
decode :: Encoded e -> Maybe p
encode :: p -> Encoded e

View File

@ -17,17 +17,18 @@ import Data.Map (Map)
import Control.Monad.Reader import Control.Monad.Reader
import Data.ByteString (ByteString) import Data.ByteString (ByteString)
import Data.Foldable import Control.Concurrent.Async
import Data.List qualified as List
import Data.Cache qualified as Cache
import Data.Cache (Cache)
import Control.Concurrent.STM.TChan as Chan
import Control.Concurrent.STM import Control.Concurrent.STM
import Control.Concurrent.STM.TChan as Chan
import Data.Cache (Cache)
import Data.Cache qualified as Cache
import Data.Foldable
import Data.Hashable import Data.Hashable
import Data.List qualified as List
import Data.Maybe import Data.Maybe
import Safe import Safe
import Prettyprinter import Prettyprinter hiding (pipe)
newtype From a = From (Peer a) newtype From a = From (Peer a)
@ -76,16 +77,15 @@ instance ( (HasPeer proto, Hashable (Peer proto))
data AnyProtocol e m = forall p a . ( HasProtocol p a data AnyProtocol e m = forall p . ( HasProtocol e p
, KnownNat (ProtocolId a) , KnownNat (ProtocolId p)
, Response p a m , Response e p m
, e ~ Encoded p ) =>
) =>
AnyProtocol AnyProtocol
{ myProtoId :: Integer { myProtoId :: Integer
, protoDecode :: Encoded p -> Maybe a , protoDecode :: Encoded e -> Maybe p
, protoEncode :: a -> Encoded p , protoEncode :: p -> Encoded e
, handle :: a -> m () , handle :: p -> m ()
} }
@ -95,15 +95,15 @@ class Response e p (m :: Type -> Type) where
class Request e p (m :: Type -> Type) where class Request e p (m :: Type -> Type) where
request :: Peer e -> p -> m () request :: Peer e -> p -> m ()
makeResponse :: forall a p m . ( MonadIO m makeResponse :: forall e p m . ( MonadIO m
, Response a p m , Response e p m
, HasProtocol a p , HasProtocol e p
) )
=> (p -> m ()) -> AnyProtocol (Encoded a) m => (p -> m ()) -> AnyProtocol e m
makeResponse h = AnyProtocol { myProtoId = natVal (Proxy @(ProtocolId p)) makeResponse h = AnyProtocol { myProtoId = natVal (Proxy @(ProtocolId p))
, protoDecode = decode @a , protoDecode = decode
, protoEncode = encode @a , protoEncode = encode
, handle = h , handle = h
} }
@ -141,10 +141,40 @@ instance (MonadIO m, HasProtocol e p) => Response e p (EngineM e m) where
_ -> pure () _ -> pure ()
newEnv :: forall e bus m . (Monad m, Messaging bus e (AnyMessage e)) => Peer e -> bus -> m (EngineEnv e)
newEnv p pipe = pure $ EngineEnv Nothing p pipe
runPeer :: MonadIO m => EngineEnv e -> [AnyProtocol e (EngineM e m)] -> m a
runPeer env@(EngineEnv {self = me, bus = pipe}) hh = do
let resp = [ (pid, a) | a@AnyProtocol { myProtoId = pid } <- hh ]
let disp = Map.fromList resp
runEngineM env $ do
forever $ do
messages <- receive pipe (To me)
for_ messages $ \(From pip, AnyMessage n msg) -> do
local (\e -> e { peer = Just pip } ) $ do
case Map.lookup n disp of
Just (AnyProtocol {protoDecode = decoder, handle = h}) -> maybe (pure ()) h (decoder msg)
Nothing -> pure ()
data PingPong = Ping Int data PingPong = Ping Int
| Pong Int | Pong Int
deriving stock (Show,Read) deriving stock (Show,Read)
data PeekPoke = Peek Int
| Poke Int
| Nop
deriving stock (Show,Read)
data Fake data Fake
@ -159,12 +189,6 @@ instance HasProtocol Fake PingPong where
decode = readMay decode = readMay
encode = show encode = show
data PeekPoke = Peek Int
| Poke Int
| Nop
deriving stock (Show,Read)
instance HasProtocol Fake PeekPoke where instance HasProtocol Fake PeekPoke where
type instance ProtocolId PeekPoke = 2 type instance ProtocolId PeekPoke = 2
type instance Encoded Fake = String type instance Encoded Fake = String
@ -186,79 +210,35 @@ peekPokeHandler =
Nop -> liftIO (print $ pretty "effect: Nop") >> response @a (Peek 1) Nop -> liftIO (print $ pretty "effect: Nop") >> response @a (Peek 1)
runPeer :: forall e p bus . (
HasProtocol e p
, Messaging bus e (AnyMessage e)
, Response e p (EngineM e IO)
)
=> Peer e
-> bus
-> [AnyProtocol (Encoded e) (EngineM e IO)]
-> IO ()
runPeer peer pipe hh = do
resp <- forM hh $ \a@(AnyProtocol { myProtoId = pid }) -> do
pure (pid, a)
let disp = Map.fromList resp :: Map Integer (AnyProtocol (Encoded e) (EngineM e IO))
let env = EngineEnv Nothing peer pipe
runEngineM env $ do
forever $ do
messages <- receive pipe (To peer)
for_ messages $ \(From pip, AnyMessage n msg) -> do
local (\e -> e { peer = Just pip } ) $ do
case Map.lookup n disp of
Just (AnyProtocol {protoDecode = decoder, handle = h}) -> maybe (pure ()) h (decoder msg)
Nothing -> pure ()
testUniqiProtoId :: IO () testUniqiProtoId :: IO ()
testUniqiProtoId = do testUniqiProtoId = do
fake <- newFakeP2P True fake <- newFakeP2P True
-- runPeer @Fake (FakePeer 0) fake let peer0 = FakePeer 0
-- [ makeResponse pingPongHandler let peer1 = FakePeer 1
-- , makeResponse peekPokeHandler
-- ]
-- undefined env0 <- newEnv peer0 fake
env1 <- newEnv peer1 fake
let env = EngineEnv @Fake Nothing (FakePeer 0) fake runEngineM env0 $ do
request peer1 (Ping 0)
let wtf = [ makeResponse pingPongHandler runEngineM env1 $ do
, makeResponse peekPokeHandler request peer0 (Peek 0)
] :: [AnyProtocol (Encoded Fake) (EngineM Fake IO)]
resp <- forM wtf $ \a@(AnyProtocol { myProtoId = pid }) -> do pip1 <- async $
pure (pid, a) runPeer env0
[ makeResponse pingPongHandler
, makeResponse peekPokeHandler
]
let decoders = Map.fromList resp :: Map Integer (AnyProtocol (Encoded Fake) (EngineM Fake IO)) pip2 <- async $
runPeer env1
[ makeResponse pingPongHandler
, makeResponse peekPokeHandler
]
runEngineM env $ do void $ waitAnyCatchCancel [pip1, pip2]
request (FakePeer 0) (Ping 0)
request (FakePeer 0) (Peek 1)
forever $ do
messages <- receive fake (To (FakePeer 0))
for_ messages $ \(From pip, AnyMessage n msg) -> do
local (\e -> e { peer = Just pip } ) $ do
case Map.lookup n decoders of
Just (AnyProtocol {protoDecode = decoder, handle = h}) -> maybe (pure ()) h (decoder msg)
Nothing -> pure ()
pause ( 0.25 :: Timeout 'Seconds)