new HBS2.Net.Proto and shit

This commit is contained in:
Dmitry Zuikov 2023-01-17 08:16:48 +03:00
parent a9f0141f87
commit c3f72d9727
5 changed files with 70 additions and 253 deletions

View File

@ -147,6 +147,7 @@ test-suite test
, safe
, serialise
, stm
, streaming
, tasty
, tasty-hunit
, transformers

View File

@ -3,6 +3,7 @@ module Main where
import TestFakeMessaging
import TestActors
import TestBlockInfoActor
import TestUniqProtoId
import Test.Tasty
import Test.Tasty.HUnit
@ -15,6 +16,7 @@ main =
testCase "testFakeMessaging1" testFakeMessaging1
, testCase "testActorsBasic" testActorsBasic
, testCase "testBlockInfoActor" testBlockInfoActor
, testCase "testUniqProtoId" testUniqProtoId
]

View File

@ -1,223 +0,0 @@
{-# Language FunctionalDependencies #-}
module TestAbstractDispatch where
import HBS2.Prelude
import HBS2.Net.Proto
import HBS2.Net.Messaging
import HBS2.Clock
import Control.Monad
import Control.Concurrent.Async
import Data.Cache (Cache)
import Data.Cache qualified as Cache
import Data.Foldable
import Data.Function
import Data.Functor
import Data.Hashable
import Data.Kind
import Data.Proxy
import Data.Word
import Data.Dynamic
import Prettyprinter
import System.Random qualified as Random
-- import GHC.TypeLits
-- import Data.Maybe
import Debug.Trace
import FakeMessaging
-- newtype Cookie = Cookie Word32
-- deriving stock (Eq,Ord)
-- deriving newtype Hashable
data family SessionType p :: Type
data family Cookie p :: Type
class Monad m => CookieGenerator p m where
genCookie :: Hashable s => s -> m (Cookie p)
class Monad m => HasTimeout msg m where
timeoutFor :: Proxy msg -> m (Timeout 'Seconds)
class HasCookie p msg | msg -> p where
getCookie :: msg -> Maybe (Cookie p)
data DefAnswer p = forall msg . (IsEncoded p msg) => DefAnswer msg
class HasDefAnswer p a | p -> a where
defAnswer :: a -> DefAnswer p
-- still okay
type family Encoded p :: Type
class IsEncoded p msg | msg -> p where
encode :: msg -> Encoded p
decode :: Encoded p -> Maybe msg
-- still okay
data MessageWithCookie p = MessageWithCookie (Cookie p) (Encoded p)
-- ЧТО МЫ ХОТИМ:
-- СООБЩЕНИЯ РАЗНЫХ ТИПОВ. ОБРАБАТЫВАТЬ НЕЗАВИСИМО, В РАЗНЫХ ОБРАБОТЧИКАХ
--
-- ПОЧЕМУ МЫ ЭТОГО ХОТИМ:
--
-- ЧТО БЫ НЕ ДЕЛАТЬ ОДИН СЛОЖНЫЙ ОБРАБОТЧИК С БОЛЬШИМ СТЕЙТОМ
-- ТАКОЕ УЖЕ ДЕЛАЛИ, ТАМ ХУЙ НОГУ СЛОМИТ ПОТОМ ОТЛАЖИВАТЬ
-- НАДО СДЕЛАТЬ, ЧТО БЫ МОЖНО БЫЛО ОТЛАДИТЬ ПО КУСКАМ
-- ТЕМ БОЛЕЕ, ЧТО ОНИ ДРУГ ОТ ДРУГА НЕ ЗАВИСЯТ
--
data Handler p m = forall msg . IsEncoded p msg =>
Handler ( (From p, Cookie p) -> msg -> m () )
-- for convenience
handler :: forall msg m p . (IsEncoded p (msg p))
=> ((From p, Cookie p) -> msg p -> m ())
-> Handler p m
handler = Handler
data Fabrique p = forall bus . Messaging bus p (MessageWithCookie p)
=> Fabrique bus
data Dispatcher p m =
Dispatcher
{ self :: Peer p
, handlers :: Cache (Cookie p) (Handler p m) --- FIXME: class + maybe cookie
, fabriq :: Fabrique p
}
newDispatcher :: (MonadIO m, (Messaging bus p (MessageWithCookie p)))
=> Peer p
-> bus
-> m (Dispatcher p m)
newDispatcher me bus = do
let fab = Fabrique bus
cache <- liftIO $ Cache.newCache Nothing
pure $ Dispatcher me cache fab
sendRequest :: forall p msg m. ( MonadIO m
, IsEncoded p msg
, Messaging (Fabrique p) p (MessageWithCookie p)
, Hashable (Cookie p)
, CookieGenerator p m
, HasTimeout msg m
)
=> Dispatcher p m
-> Peer p
-> Maybe (Cookie p)
-> msg
-> Handler p m
-> m ()
sendRequest d p mbCoo msg answ = do
-- liftIO $ print "sending request!"
cookie <- maybe (genCookie p) pure mbCoo
timeout <- timeoutFor (Proxy @msg) <&> Just . toTimeSpec
liftIO $ Cache.insert' (handlers d) Nothing cookie answ
sendTo (fabriq d) (To p) (From (self d)) (MessageWithCookie cookie (encode msg))
dispatcher :: forall p m . ( MonadIO m, Hashable (Cookie p), Pretty (Cookie p), Pretty (Peer p)
, Messaging (Fabrique p) p (MessageWithCookie p)
, HasDefAnswer p (Cookie p)
)
=> Dispatcher p m
-> m ()
dispatcher d = fix \next -> do
-- FIXME: if receive is non-blocking we'll get a busy loop
-- FIXME: if receive is blocking - we'll block here forever
received <- receive (fabriq d) (To (self d))
for_ received $ \(who@(From peer), MessageWithCookie coo bs) -> do
-- поискали в мапе по куке
found <- liftIO $ Cache.lookup (handlers d) coo
case found of
Nothing -> do
case defAnswer @p coo of
DefAnswer msg -> do
sendTo (fabriq d) (To peer) (From (self d)) (MessageWithCookie coo (encode msg))
Just (Handler dispatch) -> maybe (pure ()) (dispatch (who,coo)) (decode bs)
-- ^^^^^^^^^^^^^^ CAN NOT DECODE CASE
next
data PingPong p = Ping
| Pong
deriving stock (Typeable)
data instance SessionType Fake =
PingPongSession
deriving stock (Eq,Ord,Enum)
newtype instance Cookie Fake = CookieFake Word32
deriving stock (Eq)
deriving newtype (Hashable,Num,Pretty)
instance CookieGenerator Fake IO where
genCookie s = do
i <- Random.randomIO :: IO Int
pure $ fromIntegral $ hash (i + hash s)
type instance Encoded Fake = Dynamic
instance Typeable (PingPong p) => IsEncoded Fake (PingPong p) where
encode = toDyn
decode = fromDynamic
instance Messaging (Fabrique Fake) Fake (MessageWithCookie Fake) where
sendTo (Fabrique bus) = sendTo bus
receive (Fabrique bus) = receive bus
instance HasTimeout (PingPong Fake) IO where
timeoutFor _ = pure 1
instance HasDefAnswer Fake (Cookie Fake) where
defAnswer _ = DefAnswer (Pong @Fake)
testAbstractDispatch :: IO ()
testAbstractDispatch = do
let peers = [1..2] :: [Peer Fake]
bus <- newFakeP2P @Fake @(MessageWithCookie Fake) True
threads <- forM peers $ \p -> do
disp <- newDispatcher p bus
dispThread <- async (dispatcher disp)
for_ [ px | px <- peers, px /= p ] $ \pip -> do
-- liftIO $ print "sending ping"
sendRequest disp pip Nothing (Ping @Fake) $ handler @PingPong $ \(From who, coo) ->
\case
Ping -> pure () -- we do not expect ping here
Pong -> do
liftIO $ print $ "got pong 2" <+> pretty who
<+> "->"
<+> pretty (self disp)
<+> brackets (pretty coo)
pure dispThread
-- -- pure dispThread
pause ( 1 :: Timeout 'Seconds)
mapM_ cancel threads
void $ waitAnyCatchCancel threads

View File

@ -8,13 +8,12 @@ import HBS2.Net.PeerLocator
import HBS2.Net.PeerLocator.Static
import FakeMessaging
import HasProtocol
import Test.Tasty.HUnit
import Test.QuickCheck
import Data.Word
import Data.Hashable (Hashable)
import Data.ByteString (ByteString)
import Data.ByteString qualified as B
import Control.Concurrent.Async

View File

@ -4,29 +4,40 @@ module TestUniqProtoId where
import HBS2.Prelude
import HBS2.Prelude.Plated
import HBS2.Clock
import HasProtocol
import FakeMessaging
import Test.Tasty.HUnit
import Data.ByteString.Lazy (ByteString)
import Control.Concurrent.Async
import Codec.Serialise hiding (encode,decode)
import System.IO
import Control.Concurrent.STM.TQueue qualified as Q
-- import Control.Concurrent.STM.TQueue ()
import Control.Concurrent.STM
import Prettyprinter hiding (pipe)
debug :: (MonadIO m) => Doc ann -> m ()
debug p = liftIO $ hPrint stderr p
dump :: MonadIO m => TQueue a -> a -> m ()
dump q x = liftIO $ atomically $ Q.writeTQueue q x
data PingPong e = Ping Int
| Pong Int
deriving stock (Generic,Show,Read)
deriving stock (Eq,Generic,Show,Read)
data PeekPoke e = Peek Int
| Poke Int
| Nop
deriving stock (Generic,Show,Read)
deriving stock (Eq,Generic,Show,Read)
instance Serialise (PingPong e)
@ -49,33 +60,43 @@ pingPongHandler :: forall e m . ( MonadIO m
, Response e (PingPong e) m
, HasProtocol e (PingPong e)
)
=> PingPong e
=> TQueue (PingPong e)
-> PingPong e
-> m ()
pingPongHandler =
pingPongHandler q =
\case
Ping c -> debug ("effect: PING" <+> pretty c) >> response (Pong @e c)
Pong c -> debug ( "effect: PONG" <+> pretty c) >> response (Ping @e (succ c))
Ping c -> dump q (Ping c) >> response (Pong @e c)
Pong c | c < 100 -> dump q (Pong c) >> response (Ping @e (succ c))
| otherwise -> dump q (Pong c)
peekPokeHandler :: forall e m . ( MonadIO m
, Response e (PeekPoke e) m
, HasProtocol e (PeekPoke e)
)
=> PeekPoke e
=> TQueue (PeekPoke e)
-> PeekPoke e
-> m ()
peekPokeHandler =
peekPokeHandler q =
\case
Peek c -> debug ("effect: Peek" <+> pretty c) >> response (Poke @e c)
Poke c -> debug ("effect: Poke" <+> pretty c) >> response (Nop @e)
Nop -> debug "effect: Nop" >> response (Peek @e 1)
Peek c -> dump q (Peek c) >> response (Poke @e (succ c))
Poke c -> dump q (Poke c) >> response (Nop @e)
Nop -> dump q Nop
testUniqProtoId :: IO ()
testUniqProtoId = do
hSetBuffering stderr LineBuffering
qpg0 <- Q.newTQueueIO :: IO (TQueue (PingPong Fake))
qpp0 <- Q.newTQueueIO :: IO (TQueue (PeekPoke Fake))
qpg1 <- Q.newTQueueIO :: IO (TQueue (PingPong Fake))
qpp1 <- Q.newTQueueIO :: IO (TQueue (PeekPoke Fake))
fake <- newFakeP2P True
let peer0 = FakePeer 0
@ -84,6 +105,8 @@ testUniqProtoId = do
env0 <- newEnv peer0 fake
env1 <- newEnv peer1 fake
race (pause (0.25 :: Timeout 'Seconds)) $ do
runEngineM env0 $ do
request peer1 (Ping @Fake 0)
@ -92,18 +115,33 @@ testUniqProtoId = do
pip1 <- async $
runPeer env0
[ makeResponse pingPongHandler
, makeResponse peekPokeHandler
[ makeResponse (pingPongHandler qpg0)
, makeResponse (peekPokeHandler qpp0)
]
pip2 <- async $
runPeer env1
[ makeResponse pingPongHandler
, makeResponse peekPokeHandler
[ makeResponse (pingPongHandler qpg1)
, makeResponse (peekPokeHandler qpp1)
]
(_, e) <- waitAnyCatchCancel [pip1, pip2]
pause (0.10 :: Timeout 'Seconds)
print e
debug "stopping threads"
mapM_ cancel [pip1, pip2]
void $ waitAnyCatchCancel [pip1, pip2]
ping0 <- atomically $ Q.flushTQueue qpg0
ping1 <- atomically $ Q.flushTQueue qpg1
p0 <- atomically $ Q.flushTQueue qpp0
p1 <- atomically $ Q.flushTQueue qpp1
assertEqual "ping0" ping0 [ Pong i | i <- [0..100] ]
assertEqual "ping1" ping1 [ Ping i | i <- [0..100] ]
assertEqual "p0" p0 [ Peek 0, Nop ]
assertEqual "p1" p1 [ Poke 1 ]
debug "we're done"