so far so good

This commit is contained in:
Dmitry Zuikov 2023-01-15 13:09:42 +03:00
parent 28731b0a4f
commit c3891dbd11
4 changed files with 72 additions and 26 deletions

View File

@ -12,8 +12,7 @@ newtype To a = To (Peer a)
class IsPeer addr => Messaging bus addr msg | bus -> addr, bus -> msg where
sendTo :: MonadIO m => bus -> To addr -> From addr -> msg -> m ()
receive :: MonadIO m => bus -> To addr -> m [msg]
receive :: MonadIO m => bus -> To addr -> m [(From addr, msg)]
-- data AnyMessaging p m = forall bus . Messaging bus (Peer p)

View File

@ -21,7 +21,7 @@ data FakeP2P peer msg =
FakeP2P
{
blocking :: Bool
, fakeP2p :: Cache (Peer peer) (TChan msg)
, fakeP2p :: Cache (Peer peer) (TChan (From peer,msg))
}
newFakeP2P :: Bool -> IO (FakeP2P peer msg)
@ -30,17 +30,12 @@ newFakeP2P block = FakeP2P block <$> Cache.newCache Nothing
instance ( (IsPeer peer, Hashable (Peer peer) )
) => Messaging (FakeP2P peer msg) peer msg where
sendTo bus (To whom) _ msg = liftIO do
sendTo bus (To whom) who msg = liftIO do
chan <- Cache.fetchWithCache (fakeP2p bus) whom $ const newTChanIO
atomically $ Chan.writeTChan chan msg
atomically $ Chan.writeTChan chan (who, msg)
-- NOTE: non-blocking version!
receive bus (To me) = liftIO do
Cache.fetchWithCache (fakeP2p bus)
me
(const newTChanIO)
>>= readChan
readChan =<< Cache.fetchWithCache (fakeP2p bus) me (const newTChanIO)
where
readChan | blocking bus = atomically . (List.singleton <$>) . Chan.readTChan

View File

@ -6,14 +6,17 @@ import HBS2.Net.Proto
import HBS2.Net.Messaging
import HBS2.Clock
import Data.Foldable
import Data.Functor
import Data.Function
import Control.Concurrent.Async
import Data.Cache (Cache)
import Data.Cache qualified as Cache
import Data.Kind
import Data.Foldable
import Data.Function
import Data.Functor
import Data.Hashable
import Data.Kind
import Data.Proxy
import Data.Word
import Prettyprinter
import FakeMessaging
@ -50,9 +53,16 @@ data MessageWithCookie p = MessageWithCookie (Cookie p) (Encoded p)
--
data Handler p m = forall msg . IsEncoded p msg =>
Handler ( msg -> m () )
Handler ( (From p, Cookie p) -> msg -> m () )
data Fabrique p = forall bus . Messaging bus (Peer p) (MessageWithCookie p)
-- for convenience
handler :: forall msg m p . (IsEncoded p (msg p))
=> ((From p, Cookie p) -> msg p -> m ())
-> Handler p m
handler = Handler
data Fabrique p = forall bus . Messaging bus p (MessageWithCookie p)
=> Fabrique bus
data Dispatcher p m =
@ -62,7 +72,7 @@ data Dispatcher p m =
, fabriq :: Fabrique p -- СЮДОЙ ПИХАТЬ СООБЩЕНИЯ
}
newDispatcher :: (MonadIO m, (Messaging bus (Peer p) (MessageWithCookie p)))
newDispatcher :: (MonadIO m, (Messaging bus p (MessageWithCookie p)))
=> Peer p
-> bus
-> m (Dispatcher p m)
@ -81,12 +91,13 @@ sendRequest :: forall p msg m. ( MonadIO m
)
=> Dispatcher p m
-> Peer p
-> Maybe (Cookie p)
-> msg
-> Handler p m
-> m ()
sendRequest d p msg answ = do
cookie <- genCookie p
sendRequest d p mbCoo msg answ = do
cookie <- maybe (genCookie p) pure mbCoo
timeout <- timeoutFor (Proxy @msg) <&> Just . toTimeSpec
liftIO $ Cache.insert' (handlers d) timeout cookie answ
sendTo (fabriq d) (To p) (From (self d)) (MessageWithCookie cookie (encode msg))
@ -103,7 +114,7 @@ dispatcher d = fix \next -> do
-- FIXME: if receive is blocking - we'll block here forever
received <- receive (fabriq d) (To (self d))
for_ received $ \(MessageWithCookie coo bs) -> do
for_ received $ \(who, MessageWithCookie coo bs) -> do
-- поискали в мапе по куке
found <- liftIO $ Cache.lookup (handlers d) coo
@ -111,13 +122,55 @@ dispatcher d = fix \next -> do
case found of
Nothing -> pure () -- NO HANDLER FOUND FOR COOKIE CASE
-- декодировали сообщение и пихнули в обработчик
Just (Handler dispatch) -> maybe (pure ()) dispatch (decode bs)
Just (Handler dispatch) -> maybe (pure ()) (dispatch (who,coo)) (decode bs)
-- ^^^^^^^^^ CAN NOT DECODE CASE
next
data PingPong p = Ping
| Pong
newtype instance Cookie Fake = CookieFake Word32
deriving stock (Eq)
deriving newtype (Hashable,Num,Pretty)
instance CookieGenerator Fake IO where
genCookie _ = pure 0
instance IsEncoded Fake (PingPong p) where
data instance Encoded Fake = PingPong p
encode = undefined
decode = undefined
instance Messaging (Fabrique Fake) Fake (MessageWithCookie Fake) where
sendTo = undefined
receive = undefined
instance HasTimeout (PingPong Fake) IO where
timeoutFor _ = pure 0.1
testAbstractDispatch :: IO ()
testAbstractDispatch = do
pure ()
let peers = [1..2] :: [Peer Fake]
bus <- newFakeP2P @Fake @(MessageWithCookie Fake) True
for_ peers $ \p -> do
disp <- newDispatcher p bus
dispThread <- async (dispatcher disp)
for_ [ px | px <- peers, px /= p ] $ \pip -> do
sendRequest disp pip Nothing (Ping @Fake) $ handler $ \(From who, coo) ->
\case
Pong -> liftIO $ print $ "got pong" <+> brackets (pretty coo)
Ping -> do
liftIO $ print $ "got ping" <+> pretty who <+> brackets (pretty coo)
sendRequest disp who (Just coo) (Pong @Fake) $ handler @PingPong (\_ _ -> pure () )
cancel dispThread

View File

@ -1,6 +1,5 @@
module TestFakeMessaging where
import HBS2.Net.Proto
import HBS2.Net.Messaging
import HBS2.Net.Messaging.Fake
@ -8,7 +7,7 @@ import Test.Tasty.HUnit
import Control.Monad
import Data.Tuple
import Data.Hashable
import Data.Functor
import System.Random
import Data.IORef
import Data.Word
@ -34,7 +33,7 @@ testFakeMessaging1 = do
pure ( to, Set.singleton m )
received <- forM peers $ \me -> do
msg <- replicateM 10 $ receive bus (To me)
msg <- replicateM 10 $ receive bus (To me) <&> fmap snd
pure ( me, Set.fromList (mconcat msg) )
let s1 = Map.fromListWith (<>) (mconcat sent)