diff --git a/hbs2-core/lib/HBS2/Net/Messaging.hs b/hbs2-core/lib/HBS2/Net/Messaging.hs index 9d6ccc3f..bdcb5161 100644 --- a/hbs2-core/lib/HBS2/Net/Messaging.hs +++ b/hbs2-core/lib/HBS2/Net/Messaging.hs @@ -12,8 +12,7 @@ newtype To a = To (Peer a) class IsPeer addr => Messaging bus addr msg | bus -> addr, bus -> msg where sendTo :: MonadIO m => bus -> To addr -> From addr -> msg -> m () - receive :: MonadIO m => bus -> To addr -> m [msg] - + receive :: MonadIO m => bus -> To addr -> m [(From addr, msg)] -- data AnyMessaging p m = forall bus . Messaging bus (Peer p) diff --git a/hbs2-core/lib/HBS2/Net/Messaging/Fake.hs b/hbs2-core/lib/HBS2/Net/Messaging/Fake.hs index 6c94f4dc..130c95a8 100644 --- a/hbs2-core/lib/HBS2/Net/Messaging/Fake.hs +++ b/hbs2-core/lib/HBS2/Net/Messaging/Fake.hs @@ -21,7 +21,7 @@ data FakeP2P peer msg = FakeP2P { blocking :: Bool - , fakeP2p :: Cache (Peer peer) (TChan msg) + , fakeP2p :: Cache (Peer peer) (TChan (From peer,msg)) } newFakeP2P :: Bool -> IO (FakeP2P peer msg) @@ -30,17 +30,12 @@ newFakeP2P block = FakeP2P block <$> Cache.newCache Nothing instance ( (IsPeer peer, Hashable (Peer peer) ) ) => Messaging (FakeP2P peer msg) peer msg where - sendTo bus (To whom) _ msg = liftIO do + sendTo bus (To whom) who msg = liftIO do chan <- Cache.fetchWithCache (fakeP2p bus) whom $ const newTChanIO - atomically $ Chan.writeTChan chan msg + atomically $ Chan.writeTChan chan (who, msg) - -- NOTE: non-blocking version! receive bus (To me) = liftIO do - Cache.fetchWithCache (fakeP2p bus) - me - (const newTChanIO) - - >>= readChan + readChan =<< Cache.fetchWithCache (fakeP2p bus) me (const newTChanIO) where readChan | blocking bus = atomically . (List.singleton <$>) . Chan.readTChan diff --git a/hbs2-core/test/TestAbstractDispatch.hs b/hbs2-core/test/TestAbstractDispatch.hs index c5e30335..38f5e1c6 100644 --- a/hbs2-core/test/TestAbstractDispatch.hs +++ b/hbs2-core/test/TestAbstractDispatch.hs @@ -6,14 +6,17 @@ import HBS2.Net.Proto import HBS2.Net.Messaging import HBS2.Clock -import Data.Foldable -import Data.Functor -import Data.Function +import Control.Concurrent.Async import Data.Cache (Cache) import Data.Cache qualified as Cache -import Data.Kind +import Data.Foldable +import Data.Function +import Data.Functor import Data.Hashable +import Data.Kind import Data.Proxy +import Data.Word +import Prettyprinter import FakeMessaging @@ -50,9 +53,16 @@ data MessageWithCookie p = MessageWithCookie (Cookie p) (Encoded p) -- data Handler p m = forall msg . IsEncoded p msg => - Handler ( msg -> m () ) + Handler ( (From p, Cookie p) -> msg -> m () ) -data Fabrique p = forall bus . Messaging bus (Peer p) (MessageWithCookie p) +-- for convenience +handler :: forall msg m p . (IsEncoded p (msg p)) + => ((From p, Cookie p) -> msg p -> m ()) + -> Handler p m + +handler = Handler + +data Fabrique p = forall bus . Messaging bus p (MessageWithCookie p) => Fabrique bus data Dispatcher p m = @@ -62,7 +72,7 @@ data Dispatcher p m = , fabriq :: Fabrique p -- СЮДОЙ ПИХАТЬ СООБЩЕНИЯ } -newDispatcher :: (MonadIO m, (Messaging bus (Peer p) (MessageWithCookie p))) +newDispatcher :: (MonadIO m, (Messaging bus p (MessageWithCookie p))) => Peer p -> bus -> m (Dispatcher p m) @@ -81,12 +91,13 @@ sendRequest :: forall p msg m. ( MonadIO m ) => Dispatcher p m -> Peer p + -> Maybe (Cookie p) -> msg -> Handler p m -> m () -sendRequest d p msg answ = do - cookie <- genCookie p +sendRequest d p mbCoo msg answ = do + cookie <- maybe (genCookie p) pure mbCoo timeout <- timeoutFor (Proxy @msg) <&> Just . toTimeSpec liftIO $ Cache.insert' (handlers d) timeout cookie answ sendTo (fabriq d) (To p) (From (self d)) (MessageWithCookie cookie (encode msg)) @@ -103,7 +114,7 @@ dispatcher d = fix \next -> do -- FIXME: if receive is blocking - we'll block here forever received <- receive (fabriq d) (To (self d)) - for_ received $ \(MessageWithCookie coo bs) -> do + for_ received $ \(who, MessageWithCookie coo bs) -> do -- поискали в мапе по куке found <- liftIO $ Cache.lookup (handlers d) coo @@ -111,13 +122,55 @@ dispatcher d = fix \next -> do case found of Nothing -> pure () -- NO HANDLER FOUND FOR COOKIE CASE -- декодировали сообщение и пихнули в обработчик - Just (Handler dispatch) -> maybe (pure ()) dispatch (decode bs) + Just (Handler dispatch) -> maybe (pure ()) (dispatch (who,coo)) (decode bs) -- ^^^^^^^^^ CAN NOT DECODE CASE next +data PingPong p = Ping + | Pong + + +newtype instance Cookie Fake = CookieFake Word32 + deriving stock (Eq) + deriving newtype (Hashable,Num,Pretty) + + +instance CookieGenerator Fake IO where + genCookie _ = pure 0 + +instance IsEncoded Fake (PingPong p) where + data instance Encoded Fake = PingPong p + encode = undefined + decode = undefined + +instance Messaging (Fabrique Fake) Fake (MessageWithCookie Fake) where + sendTo = undefined + receive = undefined + +instance HasTimeout (PingPong Fake) IO where + timeoutFor _ = pure 0.1 + testAbstractDispatch :: IO () testAbstractDispatch = do - pure () + + let peers = [1..2] :: [Peer Fake] + + bus <- newFakeP2P @Fake @(MessageWithCookie Fake) True + + for_ peers $ \p -> do + disp <- newDispatcher p bus + dispThread <- async (dispatcher disp) + + for_ [ px | px <- peers, px /= p ] $ \pip -> do + + sendRequest disp pip Nothing (Ping @Fake) $ handler $ \(From who, coo) -> + \case + Pong -> liftIO $ print $ "got pong" <+> brackets (pretty coo) + Ping -> do + liftIO $ print $ "got ping" <+> pretty who <+> brackets (pretty coo) + sendRequest disp who (Just coo) (Pong @Fake) $ handler @PingPong (\_ _ -> pure () ) + + cancel dispThread diff --git a/hbs2-core/test/TestFakeMessaging.hs b/hbs2-core/test/TestFakeMessaging.hs index 8f6b3076..81b2149b 100644 --- a/hbs2-core/test/TestFakeMessaging.hs +++ b/hbs2-core/test/TestFakeMessaging.hs @@ -1,6 +1,5 @@ module TestFakeMessaging where -import HBS2.Net.Proto import HBS2.Net.Messaging import HBS2.Net.Messaging.Fake @@ -8,7 +7,7 @@ import Test.Tasty.HUnit import Control.Monad import Data.Tuple -import Data.Hashable +import Data.Functor import System.Random import Data.IORef import Data.Word @@ -34,7 +33,7 @@ testFakeMessaging1 = do pure ( to, Set.singleton m ) received <- forM peers $ \me -> do - msg <- replicateM 10 $ receive bus (To me) + msg <- replicateM 10 $ receive bus (To me) <&> fmap snd pure ( me, Set.fromList (mconcat msg) ) let s1 = Map.fromListWith (<>) (mconcat sent)