diff --git a/hbs2-core/hbs2-core.cabal b/hbs2-core/hbs2-core.cabal index 44d557fd..9e6594ae 100644 --- a/hbs2-core/hbs2-core.cabal +++ b/hbs2-core/hbs2-core.cabal @@ -147,6 +147,7 @@ test-suite test , safe , serialise , stm + , streaming , tasty , tasty-hunit , transformers diff --git a/hbs2-core/test/Main.hs b/hbs2-core/test/Main.hs index 626f601f..5186bac5 100644 --- a/hbs2-core/test/Main.hs +++ b/hbs2-core/test/Main.hs @@ -3,6 +3,7 @@ module Main where import TestFakeMessaging import TestActors import TestBlockInfoActor +import TestUniqProtoId import Test.Tasty import Test.Tasty.HUnit @@ -15,6 +16,7 @@ main = testCase "testFakeMessaging1" testFakeMessaging1 , testCase "testActorsBasic" testActorsBasic , testCase "testBlockInfoActor" testBlockInfoActor + , testCase "testUniqProtoId" testUniqProtoId ] diff --git a/hbs2-core/test/TestAbstractDispatch.hs b/hbs2-core/test/TestAbstractDispatch.hs deleted file mode 100644 index 24092965..00000000 --- a/hbs2-core/test/TestAbstractDispatch.hs +++ /dev/null @@ -1,223 +0,0 @@ -{-# Language FunctionalDependencies #-} -module TestAbstractDispatch where - -import HBS2.Prelude -import HBS2.Net.Proto -import HBS2.Net.Messaging -import HBS2.Clock - -import Control.Monad -import Control.Concurrent.Async -import Data.Cache (Cache) -import Data.Cache qualified as Cache -import Data.Foldable -import Data.Function -import Data.Functor -import Data.Hashable -import Data.Kind -import Data.Proxy -import Data.Word -import Data.Dynamic -import Prettyprinter -import System.Random qualified as Random --- import GHC.TypeLits --- import Data.Maybe - -import Debug.Trace - -import FakeMessaging - --- newtype Cookie = Cookie Word32 --- deriving stock (Eq,Ord) --- deriving newtype Hashable - -data family SessionType p :: Type - -data family Cookie p :: Type - -class Monad m => CookieGenerator p m where - genCookie :: Hashable s => s -> m (Cookie p) - -class Monad m => HasTimeout msg m where - timeoutFor :: Proxy msg -> m (Timeout 'Seconds) - -class HasCookie p msg | msg -> p where - getCookie :: msg -> Maybe (Cookie p) - -data DefAnswer p = forall msg . (IsEncoded p msg) => DefAnswer msg - -class HasDefAnswer p a | p -> a where - defAnswer :: a -> DefAnswer p - - --- still okay - -type family Encoded p :: Type - -class IsEncoded p msg | msg -> p where - encode :: msg -> Encoded p - decode :: Encoded p -> Maybe msg - --- still okay -data MessageWithCookie p = MessageWithCookie (Cookie p) (Encoded p) - --- ЧТО МЫ ХОТИМ: --- СООБЩЕНИЯ РАЗНЫХ ТИПОВ. ОБРАБАТЫВАТЬ НЕЗАВИСИМО, В РАЗНЫХ ОБРАБОТЧИКАХ --- --- ПОЧЕМУ МЫ ЭТОГО ХОТИМ: --- --- ЧТО БЫ НЕ ДЕЛАТЬ ОДИН СЛОЖНЫЙ ОБРАБОТЧИК С БОЛЬШИМ СТЕЙТОМ --- ТАКОЕ УЖЕ ДЕЛАЛИ, ТАМ ХУЙ НОГУ СЛОМИТ ПОТОМ ОТЛАЖИВАТЬ --- НАДО СДЕЛАТЬ, ЧТО БЫ МОЖНО БЫЛО ОТЛАДИТЬ ПО КУСКАМ --- ТЕМ БОЛЕЕ, ЧТО ОНИ ДРУГ ОТ ДРУГА НЕ ЗАВИСЯТ --- - -data Handler p m = forall msg . IsEncoded p msg => - Handler ( (From p, Cookie p) -> msg -> m () ) - --- for convenience -handler :: forall msg m p . (IsEncoded p (msg p)) - => ((From p, Cookie p) -> msg p -> m ()) - -> Handler p m - -handler = Handler - -data Fabrique p = forall bus . Messaging bus p (MessageWithCookie p) - => Fabrique bus - -data Dispatcher p m = - Dispatcher - { self :: Peer p - , handlers :: Cache (Cookie p) (Handler p m) --- FIXME: class + maybe cookie - , fabriq :: Fabrique p - } - -newDispatcher :: (MonadIO m, (Messaging bus p (MessageWithCookie p))) - => Peer p - -> bus - -> m (Dispatcher p m) - -newDispatcher me bus = do - let fab = Fabrique bus - cache <- liftIO $ Cache.newCache Nothing - pure $ Dispatcher me cache fab - -sendRequest :: forall p msg m. ( MonadIO m - , IsEncoded p msg - , Messaging (Fabrique p) p (MessageWithCookie p) - , Hashable (Cookie p) - , CookieGenerator p m - , HasTimeout msg m - ) - => Dispatcher p m - -> Peer p - -> Maybe (Cookie p) - -> msg - -> Handler p m - -> m () - -sendRequest d p mbCoo msg answ = do - -- liftIO $ print "sending request!" - cookie <- maybe (genCookie p) pure mbCoo - timeout <- timeoutFor (Proxy @msg) <&> Just . toTimeSpec - liftIO $ Cache.insert' (handlers d) Nothing cookie answ - sendTo (fabriq d) (To p) (From (self d)) (MessageWithCookie cookie (encode msg)) - -dispatcher :: forall p m . ( MonadIO m, Hashable (Cookie p), Pretty (Cookie p), Pretty (Peer p) - , Messaging (Fabrique p) p (MessageWithCookie p) - , HasDefAnswer p (Cookie p) - ) - => Dispatcher p m - -> m () - -dispatcher d = fix \next -> do - - -- FIXME: if receive is non-blocking we'll get a busy loop - -- FIXME: if receive is blocking - we'll block here forever - received <- receive (fabriq d) (To (self d)) - - for_ received $ \(who@(From peer), MessageWithCookie coo bs) -> do - - -- поискали в мапе по куке - found <- liftIO $ Cache.lookup (handlers d) coo - - case found of - Nothing -> do - case defAnswer @p coo of - DefAnswer msg -> do - sendTo (fabriq d) (To peer) (From (self d)) (MessageWithCookie coo (encode msg)) - - Just (Handler dispatch) -> maybe (pure ()) (dispatch (who,coo)) (decode bs) - -- ^^^^^^^^^^^^^^ CAN NOT DECODE CASE - - next - - -data PingPong p = Ping - | Pong - deriving stock (Typeable) - -data instance SessionType Fake = - PingPongSession - deriving stock (Eq,Ord,Enum) - -newtype instance Cookie Fake = CookieFake Word32 - deriving stock (Eq) - deriving newtype (Hashable,Num,Pretty) - - -instance CookieGenerator Fake IO where - genCookie s = do - i <- Random.randomIO :: IO Int - pure $ fromIntegral $ hash (i + hash s) - -type instance Encoded Fake = Dynamic - -instance Typeable (PingPong p) => IsEncoded Fake (PingPong p) where - encode = toDyn - decode = fromDynamic - -instance Messaging (Fabrique Fake) Fake (MessageWithCookie Fake) where - sendTo (Fabrique bus) = sendTo bus - receive (Fabrique bus) = receive bus - -instance HasTimeout (PingPong Fake) IO where - timeoutFor _ = pure 1 - -instance HasDefAnswer Fake (Cookie Fake) where - defAnswer _ = DefAnswer (Pong @Fake) - -testAbstractDispatch :: IO () -testAbstractDispatch = do - - let peers = [1..2] :: [Peer Fake] - - bus <- newFakeP2P @Fake @(MessageWithCookie Fake) True - - threads <- forM peers $ \p -> do - disp <- newDispatcher p bus - dispThread <- async (dispatcher disp) - - for_ [ px | px <- peers, px /= p ] $ \pip -> do - - -- liftIO $ print "sending ping" - sendRequest disp pip Nothing (Ping @Fake) $ handler @PingPong $ \(From who, coo) -> - \case - Ping -> pure () -- we do not expect ping here - - Pong -> do - - liftIO $ print $ "got pong 2" <+> pretty who - <+> "->" - <+> pretty (self disp) - <+> brackets (pretty coo) - - pure dispThread - - -- -- pure dispThread - pause ( 1 :: Timeout 'Seconds) - - mapM_ cancel threads - - void $ waitAnyCatchCancel threads - diff --git a/hbs2-core/test/TestBlockInfoActor.hs b/hbs2-core/test/TestBlockInfoActor.hs index 321ef3d6..43246e94 100644 --- a/hbs2-core/test/TestBlockInfoActor.hs +++ b/hbs2-core/test/TestBlockInfoActor.hs @@ -8,13 +8,12 @@ import HBS2.Net.PeerLocator import HBS2.Net.PeerLocator.Static import FakeMessaging +import HasProtocol import Test.Tasty.HUnit import Test.QuickCheck import Data.Word -import Data.Hashable (Hashable) -import Data.ByteString (ByteString) import Data.ByteString qualified as B import Control.Concurrent.Async diff --git a/hbs2-core/test/TestUniqProtoId.hs b/hbs2-core/test/TestUniqProtoId.hs index b97e890e..14ec25f7 100644 --- a/hbs2-core/test/TestUniqProtoId.hs +++ b/hbs2-core/test/TestUniqProtoId.hs @@ -4,29 +4,40 @@ module TestUniqProtoId where import HBS2.Prelude import HBS2.Prelude.Plated +import HBS2.Clock import HasProtocol import FakeMessaging +import Test.Tasty.HUnit + import Data.ByteString.Lazy (ByteString) import Control.Concurrent.Async import Codec.Serialise hiding (encode,decode) import System.IO +import Control.Concurrent.STM.TQueue qualified as Q +-- import Control.Concurrent.STM.TQueue () +import Control.Concurrent.STM + import Prettyprinter hiding (pipe) debug :: (MonadIO m) => Doc ann -> m () debug p = liftIO $ hPrint stderr p + +dump :: MonadIO m => TQueue a -> a -> m () +dump q x = liftIO $ atomically $ Q.writeTQueue q x + data PingPong e = Ping Int | Pong Int - deriving stock (Generic,Show,Read) + deriving stock (Eq,Generic,Show,Read) data PeekPoke e = Peek Int | Poke Int | Nop - deriving stock (Generic,Show,Read) + deriving stock (Eq,Generic,Show,Read) instance Serialise (PingPong e) @@ -49,33 +60,43 @@ pingPongHandler :: forall e m . ( MonadIO m , Response e (PingPong e) m , HasProtocol e (PingPong e) ) - => PingPong e + => TQueue (PingPong e) + -> PingPong e -> m () -pingPongHandler = +pingPongHandler q = \case - Ping c -> debug ("effect: PING" <+> pretty c) >> response (Pong @e c) - Pong c -> debug ( "effect: PONG" <+> pretty c) >> response (Ping @e (succ c)) + + Ping c -> dump q (Ping c) >> response (Pong @e c) + + Pong c | c < 100 -> dump q (Pong c) >> response (Ping @e (succ c)) + | otherwise -> dump q (Pong c) peekPokeHandler :: forall e m . ( MonadIO m , Response e (PeekPoke e) m , HasProtocol e (PeekPoke e) ) - => PeekPoke e + => TQueue (PeekPoke e) + -> PeekPoke e -> m () -peekPokeHandler = +peekPokeHandler q = \case - Peek c -> debug ("effect: Peek" <+> pretty c) >> response (Poke @e c) - Poke c -> debug ("effect: Poke" <+> pretty c) >> response (Nop @e) - Nop -> debug "effect: Nop" >> response (Peek @e 1) - + Peek c -> dump q (Peek c) >> response (Poke @e (succ c)) + Poke c -> dump q (Poke c) >> response (Nop @e) + Nop -> dump q Nop testUniqProtoId :: IO () testUniqProtoId = do hSetBuffering stderr LineBuffering + qpg0 <- Q.newTQueueIO :: IO (TQueue (PingPong Fake)) + qpp0 <- Q.newTQueueIO :: IO (TQueue (PeekPoke Fake)) + + qpg1 <- Q.newTQueueIO :: IO (TQueue (PingPong Fake)) + qpp1 <- Q.newTQueueIO :: IO (TQueue (PeekPoke Fake)) + fake <- newFakeP2P True let peer0 = FakePeer 0 @@ -84,26 +105,43 @@ testUniqProtoId = do env0 <- newEnv peer0 fake env1 <- newEnv peer1 fake - runEngineM env0 $ do - request peer1 (Ping @Fake 0) + race (pause (0.25 :: Timeout 'Seconds)) $ do - runEngineM env1 $ do - request peer0 (Peek @Fake 0) + runEngineM env0 $ do + request peer1 (Ping @Fake 0) - pip1 <- async $ - runPeer env0 - [ makeResponse pingPongHandler - , makeResponse peekPokeHandler - ] + runEngineM env1 $ do + request peer0 (Peek @Fake 0) - pip2 <- async $ - runPeer env1 - [ makeResponse pingPongHandler - , makeResponse peekPokeHandler - ] + pip1 <- async $ + runPeer env0 + [ makeResponse (pingPongHandler qpg0) + , makeResponse (peekPokeHandler qpp0) + ] - (_, e) <- waitAnyCatchCancel [pip1, pip2] + pip2 <- async $ + runPeer env1 + [ makeResponse (pingPongHandler qpg1) + , makeResponse (peekPokeHandler qpp1) + ] - print e + pause (0.10 :: Timeout 'Seconds) + debug "stopping threads" + + mapM_ cancel [pip1, pip2] + + void $ waitAnyCatchCancel [pip1, pip2] + + ping0 <- atomically $ Q.flushTQueue qpg0 + ping1 <- atomically $ Q.flushTQueue qpg1 + p0 <- atomically $ Q.flushTQueue qpp0 + p1 <- atomically $ Q.flushTQueue qpp1 + + assertEqual "ping0" ping0 [ Pong i | i <- [0..100] ] + assertEqual "ping1" ping1 [ Ping i | i <- [0..100] ] + assertEqual "p0" p0 [ Peek 0, Nop ] + assertEqual "p1" p1 [ Poke 1 ] + + debug "we're done"