This commit is contained in:
Dmitry Zuikov 2023-01-17 14:32:51 +03:00
parent d5ce8000a1
commit dc44776950
5 changed files with 54 additions and 21 deletions

View File

@ -16,6 +16,7 @@ import Control.Concurrent.STM.TVar qualified as TVar
import Control.Monad import Control.Monad
import Data.Function import Data.Function
import Data.Functor import Data.Functor
import Data.Kind
data Pipeline m a = data Pipeline m a =
Pipeline Pipeline
@ -23,7 +24,7 @@ data Pipeline m a =
, toQueue :: TBMQueue ( m a ) , toQueue :: TBMQueue ( m a )
} }
newPipeline :: forall a m . MonadIO m => Int -> m (Pipeline m a) newPipeline :: forall a (m :: Type -> Type) . MonadIO m => Int -> m (Pipeline m a)
newPipeline size = do newPipeline size = do
tv <- liftIO $ TVar.newTVarIO False tv <- liftIO $ TVar.newTVarIO False
liftIO $ TBMQ.newTBMQueueIO size <&> Pipeline tv liftIO $ TBMQ.newTBMQueueIO size <&> Pipeline tv
@ -47,7 +48,7 @@ stopPipeline pip = liftIO $ do
else do else do
pause ( 0.01 :: Timeout 'Seconds) >> next pause ( 0.01 :: Timeout 'Seconds) >> next
addJob :: forall a m . MonadIO m => Pipeline m a -> m a -> m () addJob :: forall a m m1 . (MonadIO m, MonadIO m1) => Pipeline m a -> m a -> m1 ()
addJob pip act = liftIO $ do addJob pip act = liftIO $ do
doWrite <- atomically $ TVar.readTVar ( stopAdding pip ) doWrite <- atomically $ TVar.readTVar ( stopAdding pip )
unless doWrite $ do unless doWrite $ do

View File

@ -11,3 +11,7 @@ defBlockSize = 256 * 1024
defStorePath :: IsString a => a defStorePath :: IsString a => a
defStorePath = "hbs2" defStorePath = "hbs2"
defPipelineSize :: Int
defPipelineSize = 100

View File

@ -7,6 +7,8 @@ import HBS2.Prelude.Plated
import HBS2.Net.Proto import HBS2.Net.Proto
import HBS2.Net.Messaging import HBS2.Net.Messaging
import HBS2.Clock import HBS2.Clock
import HBS2.Actors
import HBS2.Defaults
import Lens.Micro.Platform import Lens.Micro.Platform
import Data.ByteString.Lazy ( ByteString ) import Data.ByteString.Lazy ( ByteString )
@ -15,6 +17,7 @@ import Control.Monad.Reader
import Data.Map qualified as Map import Data.Map qualified as Map
import GHC.TypeLits import GHC.TypeLits
import Control.Monad.Trans.Maybe import Control.Monad.Trans.Maybe
import Control.Concurrent.Async
import Codec.Serialise hiding (encode,decode) import Codec.Serialise hiding (encode,decode)
@ -23,13 +26,25 @@ data AnyMessage e = AnyMessage Integer (Encoded e)
instance Serialise (Encoded e) => Serialise (AnyMessage e) instance Serialise (Encoded e) => Serialise (AnyMessage e)
data EngineEnv e = forall bus . ( Messaging bus e ByteString newtype EngineM e m a = EngineM { fromEngine :: ReaderT (EngineEnv e) m a }
, Serialise (Encoded e) deriving ( Functor
) => , Applicative
, Monad
, MonadIO
, MonadReader (EngineEnv e)
, MonadTrans
)
data EngineEnv e = forall bus m . ( Messaging bus e ByteString
, Serialise (Encoded e)
, MonadIO m
) =>
EngineEnv EngineEnv
{ _peer :: Maybe (Peer e) { _peer :: Maybe (Peer e)
, _self :: Peer e , _self :: Peer e
, bus :: bus , bus :: bus
, defer :: Pipeline m ()
} }
makeLenses 'EngineEnv makeLenses 'EngineEnv
@ -57,14 +72,6 @@ makeResponse h = AnyProtocol { myProtoId = natVal (Proxy @(ProtocolId p))
} }
newtype EngineM e m a = EngineM { fromEngine :: ReaderT (EngineEnv e) m a }
deriving ( Functor
, Applicative
, Monad
, MonadIO
, MonadReader (EngineEnv e)
)
runEngineM :: EngineEnv e -> EngineM e m a -> m a runEngineM :: EngineEnv e -> EngineM e m a -> m a
runEngineM e f = runReaderT (fromEngine f) e runEngineM e f = runReaderT (fromEngine f) e
@ -78,6 +85,13 @@ instance (MonadIO m, HasProtocol e p) => Request e p (EngineM e m) where
liftIO $ sendTo b (To p) (From s) bs liftIO $ sendTo b (To p) (From s) bs
instance (MonadIO m, HasProtocol e p) => Response e p (EngineM e m) where instance (MonadIO m, HasProtocol e p) => Response e p (EngineM e m) where
deferred _ m = do
e@(EngineEnv { defer = d }) <- ask
lift $ runEngineM e m
pure ()
-- lift $ addJob d (lift $ runEngineM e m)
response resp = do response resp = do
env <- ask env <- ask
let proto = protoId @e @p (Proxy @p) let proto = protoId @e @p (Proxy @p)
@ -92,6 +106,7 @@ instance (MonadIO m, HasProtocol e p) => Response e p (EngineM e m) where
newEnv :: forall e bus m . ( Monad m newEnv :: forall e bus m . ( Monad m
, MonadIO m
, Messaging bus e ByteString , Messaging bus e ByteString
, Serialise (Encoded e) , Serialise (Encoded e)
) )
@ -99,8 +114,9 @@ newEnv :: forall e bus m . ( Monad m
-> bus -> bus
-> m (EngineEnv e) -> m (EngineEnv e)
newEnv p pipe = pure $ EngineEnv Nothing p pipe newEnv p pipe = do
de <- liftIO $ newPipeline defPipelineSize
pure $ EngineEnv Nothing p pipe de
runPeer :: forall e m a . ( MonadIO m runPeer :: forall e m a . ( MonadIO m
) )
@ -108,7 +124,7 @@ runPeer :: forall e m a . ( MonadIO m
-> [AnyProtocol e (EngineM e m)] -> [AnyProtocol e (EngineM e m)]
-> m a -> m a
runPeer env@(EngineEnv {bus = pipe}) hh = do runPeer env@(EngineEnv {bus = pipe, defer = d}) hh = do
let me = env ^. self let me = env ^. self
@ -116,8 +132,12 @@ runPeer env@(EngineEnv {bus = pipe}) hh = do
let disp = Map.fromList resp let disp = Map.fromList resp
-- let q = liftIO $ runPipeline d
runEngineM env $ do runEngineM env $ do
-- void $ liftIO $ runPipeline d
forever $ do forever $ do
messages <- receive pipe (To me) messages <- receive pipe (To me)

View File

@ -8,6 +8,7 @@ import Data.Kind
import GHC.TypeLits import GHC.TypeLits
import Data.Proxy import Data.Proxy
import Data.Hashable import Data.Hashable
import Control.Monad.IO.Class
-- e -> Transport (like, UDP or TChan) -- e -> Transport (like, UDP or TChan)
-- p -> L4 Protocol (like Ping/Pong) -- p -> L4 Protocol (like Ping/Pong)
@ -16,8 +17,9 @@ class (Hashable (Peer e), Eq (Peer e)) => HasPeer e where
data family (Peer e) :: Type data family (Peer e) :: Type
class Response e p (m :: Type -> Type) | p -> e where class MonadIO m => Response e p (m :: Type -> Type) | p -> e where
response :: p -> m () response :: p -> m ()
deferred :: Proxy p -> m () -> m ()
class Request e p (m :: Type -> Type) | p -> e where class Request e p (m :: Type -> Type) | p -> e where
request :: Peer e -> p -> m () request :: Peer e -> p -> m ()

View File

@ -9,6 +9,7 @@ import HBS2.Net.Messaging.Fake
import HBS2.Net.Peer import HBS2.Net.Peer
import HBS2.Storage.Simple import HBS2.Storage.Simple
import HBS2.Storage.Simple.Extra import HBS2.Storage.Simple.Extra
import HBS2.Actors
-- import Test.Tasty hiding (Timeout) -- import Test.Tasty hiding (Timeout)
-- import Test.Tasty.HUnit hiding (Timeout) -- import Test.Tasty.HUnit hiding (Timeout)
@ -74,10 +75,13 @@ blockSizeHandler s =
-- TODO: STORAGE: seek for block -- TODO: STORAGE: seek for block
-- TODO: defer answer (?) -- TODO: defer answer (?)
hasBlock s h >>= \case -- TODO: does it really work?
Just size -> response (BlockSize @e h size) deferred (Proxy @(BlockSize e))$ do
Nothing -> response (NoBlock @e h) hasBlock s h >>= \case
Just size -> response (BlockSize @e h size)
Nothing -> response (NoBlock @e h)
-- deferred (Proxy @(BlockSize e)) $ do
NoBlock h -> debug $ "NoBlock" <+> pretty h NoBlock h -> debug $ "NoBlock" <+> pretty h
BlockSize h sz -> debug $ "BlockSize" <+> pretty h <+> pretty sz BlockSize h sz -> debug $ "BlockSize" <+> pretty h <+> pretty sz
@ -154,7 +158,9 @@ test1 = do
mapM_ wait peerz mapM_ wait peerz
void $ waitAnyCatchCancel peerz (_, e) <- waitAnyCatchCancel peerz
debug (pretty $ show e)
debug "we're done" debug "we're done"