From dc44776950fae6576d0ef09927454ff12f6a6efc Mon Sep 17 00:00:00 2001 From: Dmitry Zuikov Date: Tue, 17 Jan 2023 14:32:51 +0300 Subject: [PATCH] wip --- hbs2-core/lib/HBS2/Actors.hs | 5 +-- hbs2-core/lib/HBS2/Defaults.hs | 4 +++ hbs2-core/lib/HBS2/Net/Peer.hs | 48 +++++++++++++++++++-------- hbs2-core/lib/HBS2/Net/Proto/Types.hs | 4 ++- hbs2-tests/test/Main.hs | 14 +++++--- 5 files changed, 54 insertions(+), 21 deletions(-) diff --git a/hbs2-core/lib/HBS2/Actors.hs b/hbs2-core/lib/HBS2/Actors.hs index 56b836b0..e27ffdc1 100644 --- a/hbs2-core/lib/HBS2/Actors.hs +++ b/hbs2-core/lib/HBS2/Actors.hs @@ -16,6 +16,7 @@ import Control.Concurrent.STM.TVar qualified as TVar import Control.Monad import Data.Function import Data.Functor +import Data.Kind data Pipeline m a = Pipeline @@ -23,7 +24,7 @@ data Pipeline m a = , toQueue :: TBMQueue ( m a ) } -newPipeline :: forall a m . MonadIO m => Int -> m (Pipeline m a) +newPipeline :: forall a (m :: Type -> Type) . MonadIO m => Int -> m (Pipeline m a) newPipeline size = do tv <- liftIO $ TVar.newTVarIO False liftIO $ TBMQ.newTBMQueueIO size <&> Pipeline tv @@ -47,7 +48,7 @@ stopPipeline pip = liftIO $ do else do pause ( 0.01 :: Timeout 'Seconds) >> next -addJob :: forall a m . MonadIO m => Pipeline m a -> m a -> m () +addJob :: forall a m m1 . (MonadIO m, MonadIO m1) => Pipeline m a -> m a -> m1 () addJob pip act = liftIO $ do doWrite <- atomically $ TVar.readTVar ( stopAdding pip ) unless doWrite $ do diff --git a/hbs2-core/lib/HBS2/Defaults.hs b/hbs2-core/lib/HBS2/Defaults.hs index 39609578..69bad7d5 100644 --- a/hbs2-core/lib/HBS2/Defaults.hs +++ b/hbs2-core/lib/HBS2/Defaults.hs @@ -11,3 +11,7 @@ defBlockSize = 256 * 1024 defStorePath :: IsString a => a defStorePath = "hbs2" +defPipelineSize :: Int +defPipelineSize = 100 + + diff --git a/hbs2-core/lib/HBS2/Net/Peer.hs b/hbs2-core/lib/HBS2/Net/Peer.hs index b7609dab..ba9ecf15 100644 --- a/hbs2-core/lib/HBS2/Net/Peer.hs +++ b/hbs2-core/lib/HBS2/Net/Peer.hs @@ -7,6 +7,8 @@ import HBS2.Prelude.Plated import HBS2.Net.Proto import HBS2.Net.Messaging import HBS2.Clock +import HBS2.Actors +import HBS2.Defaults import Lens.Micro.Platform import Data.ByteString.Lazy ( ByteString ) @@ -15,6 +17,7 @@ import Control.Monad.Reader import Data.Map qualified as Map import GHC.TypeLits import Control.Monad.Trans.Maybe +import Control.Concurrent.Async import Codec.Serialise hiding (encode,decode) @@ -23,13 +26,25 @@ data AnyMessage e = AnyMessage Integer (Encoded e) instance Serialise (Encoded e) => Serialise (AnyMessage e) -data EngineEnv e = forall bus . ( Messaging bus e ByteString - , Serialise (Encoded e) - ) => +newtype EngineM e m a = EngineM { fromEngine :: ReaderT (EngineEnv e) m a } + deriving ( Functor + , Applicative + , Monad + , MonadIO + , MonadReader (EngineEnv e) + , MonadTrans + ) + + +data EngineEnv e = forall bus m . ( Messaging bus e ByteString + , Serialise (Encoded e) + , MonadIO m + ) => EngineEnv { _peer :: Maybe (Peer e) , _self :: Peer e , bus :: bus + , defer :: Pipeline m () } makeLenses 'EngineEnv @@ -57,14 +72,6 @@ makeResponse h = AnyProtocol { myProtoId = natVal (Proxy @(ProtocolId p)) } -newtype EngineM e m a = EngineM { fromEngine :: ReaderT (EngineEnv e) m a } - deriving ( Functor - , Applicative - , Monad - , MonadIO - , MonadReader (EngineEnv e) - ) - runEngineM :: EngineEnv e -> EngineM e m a -> m a runEngineM e f = runReaderT (fromEngine f) e @@ -78,6 +85,13 @@ instance (MonadIO m, HasProtocol e p) => Request e p (EngineM e m) where liftIO $ sendTo b (To p) (From s) bs instance (MonadIO m, HasProtocol e p) => Response e p (EngineM e m) where + + deferred _ m = do + e@(EngineEnv { defer = d }) <- ask + lift $ runEngineM e m + pure () + -- lift $ addJob d (lift $ runEngineM e m) + response resp = do env <- ask let proto = protoId @e @p (Proxy @p) @@ -92,6 +106,7 @@ instance (MonadIO m, HasProtocol e p) => Response e p (EngineM e m) where newEnv :: forall e bus m . ( Monad m + , MonadIO m , Messaging bus e ByteString , Serialise (Encoded e) ) @@ -99,8 +114,9 @@ newEnv :: forall e bus m . ( Monad m -> bus -> m (EngineEnv e) -newEnv p pipe = pure $ EngineEnv Nothing p pipe - +newEnv p pipe = do + de <- liftIO $ newPipeline defPipelineSize + pure $ EngineEnv Nothing p pipe de runPeer :: forall e m a . ( MonadIO m ) @@ -108,7 +124,7 @@ runPeer :: forall e m a . ( MonadIO m -> [AnyProtocol e (EngineM e m)] -> m a -runPeer env@(EngineEnv {bus = pipe}) hh = do +runPeer env@(EngineEnv {bus = pipe, defer = d}) hh = do let me = env ^. self @@ -116,8 +132,12 @@ runPeer env@(EngineEnv {bus = pipe}) hh = do let disp = Map.fromList resp + -- let q = liftIO $ runPipeline d + runEngineM env $ do + -- void $ liftIO $ runPipeline d + forever $ do messages <- receive pipe (To me) diff --git a/hbs2-core/lib/HBS2/Net/Proto/Types.hs b/hbs2-core/lib/HBS2/Net/Proto/Types.hs index adc4325b..3ee44ed4 100644 --- a/hbs2-core/lib/HBS2/Net/Proto/Types.hs +++ b/hbs2-core/lib/HBS2/Net/Proto/Types.hs @@ -8,6 +8,7 @@ import Data.Kind import GHC.TypeLits import Data.Proxy import Data.Hashable +import Control.Monad.IO.Class -- e -> Transport (like, UDP or TChan) -- p -> L4 Protocol (like Ping/Pong) @@ -16,8 +17,9 @@ class (Hashable (Peer e), Eq (Peer e)) => HasPeer e where data family (Peer e) :: Type -class Response e p (m :: Type -> Type) | p -> e where +class MonadIO m => Response e p (m :: Type -> Type) | p -> e where response :: p -> m () + deferred :: Proxy p -> m () -> m () class Request e p (m :: Type -> Type) | p -> e where request :: Peer e -> p -> m () diff --git a/hbs2-tests/test/Main.hs b/hbs2-tests/test/Main.hs index cb99d690..7ccf377a 100644 --- a/hbs2-tests/test/Main.hs +++ b/hbs2-tests/test/Main.hs @@ -9,6 +9,7 @@ import HBS2.Net.Messaging.Fake import HBS2.Net.Peer import HBS2.Storage.Simple import HBS2.Storage.Simple.Extra +import HBS2.Actors -- import Test.Tasty hiding (Timeout) -- import Test.Tasty.HUnit hiding (Timeout) @@ -74,10 +75,13 @@ blockSizeHandler s = -- TODO: STORAGE: seek for block -- TODO: defer answer (?) - hasBlock s h >>= \case - Just size -> response (BlockSize @e h size) - Nothing -> response (NoBlock @e h) + -- TODO: does it really work? + deferred (Proxy @(BlockSize e))$ do + hasBlock s h >>= \case + Just size -> response (BlockSize @e h size) + Nothing -> response (NoBlock @e h) + -- deferred (Proxy @(BlockSize e)) $ do NoBlock h -> debug $ "NoBlock" <+> pretty h BlockSize h sz -> debug $ "BlockSize" <+> pretty h <+> pretty sz @@ -154,7 +158,9 @@ test1 = do mapM_ wait peerz - void $ waitAnyCatchCancel peerz + (_, e) <- waitAnyCatchCancel peerz + + debug (pretty $ show e) debug "we're done"