diff --git a/cabal.project b/cabal.project index 4ae153ca..c029f303 100644 --- a/cabal.project +++ b/cabal.project @@ -1,2 +1,6 @@ -packages: **/*.cabal +packages: + hbs2/hbs2.cabal + hbs2-core/hbs2-core.cabal + hbs2-storage-simple/hbs2-storage-simple.cabal + diff --git a/hbs2-core/lib/HBS2/Actors/OldPeer.hs b/hbs2-core/lib/HBS2/Actors/OldPeer.hs new file mode 100644 index 00000000..8f809003 --- /dev/null +++ b/hbs2-core/lib/HBS2/Actors/OldPeer.hs @@ -0,0 +1,252 @@ +{-# Language TemplateHaskell #-} +{-# Language UndecidableInstances #-} +module HBS2.Actors.Peer where + +import HBS2.Prelude +import HBS2.Prelude.Plated +import HBS2.Net.Proto +import HBS2.Net.Messaging +import HBS2.Clock +import HBS2.Actors +import HBS2.Defaults + + +import Control.Concurrent.Async +import Control.Monad.Reader +import Control.Monad.Trans.Maybe +import Data.ByteString.Lazy ( ByteString ) +import Data.Digest.Murmur32 +import Data.Foldable hiding (find) +import Data.Hashable +import Data.Kind +import Data.Map qualified as Map +import GHC.TypeLits +import Lens.Micro.Platform +import System.Random qualified as Random +import Data.Cache (Cache) +import Data.Cache qualified as Cache +import Data.Dynamic +import Data.Maybe + +import Codec.Serialise hiding (encode,decode) + + + +data AnyMessage e = AnyMessage Integer (Encoded e) + deriving stock (Generic) + +instance Serialise (Encoded e) => Serialise (AnyMessage e) + + +data EngineEnv e = forall bus . ( Messaging bus e ByteString + , Serialise (Encoded e) + ) => + EngineEnv + { _peer :: Maybe (Peer e) + , _self :: Peer e + , _sessions :: Cache SKey Dynamic + , bus :: bus + , defer :: Pipeline IO () + } + + +newtype EngineM e m a = EngineM { fromEngine :: ReaderT (EngineEnv e) m a } + deriving newtype ( Functor + , Applicative + , Monad + , MonadIO + , MonadReader (EngineEnv e) + , MonadTrans + ) + +data ResponseEnv e = + ResponseEnv + { _engine :: EngineEnv e + , _respPeer :: Peer e + } + +newtype ResponseM e m a = ResponseM { fromResponse :: ReaderT (ResponseEnv e) m a } + deriving newtype ( Functor + , Applicative + , Monad + , MonadIO + , MonadReader (ResponseEnv e) + , MonadTrans + ) + + +makeLenses 'EngineEnv + +makeLenses 'ResponseEnv + +data AnyProtocol e m = forall p . ( HasProtocol e p + , Response e p m + ) => + AnyProtocol + { myProtoId :: Integer + , protoDecode :: Encoded e -> Maybe p + , protoEncode :: p -> Encoded e + , handle :: p -> m () + } + +makeResponse :: forall e p m . ( MonadIO m + , Response e p m + , HasProtocol e p + ) + => (p -> m ()) -> AnyProtocol e m + +makeResponse h = AnyProtocol { myProtoId = natVal (Proxy @(ProtocolId p)) + , protoDecode = decode + , protoEncode = encode + , handle = h + } + + +runEngineM :: EngineEnv e -> EngineM e m a -> m a +runEngineM e f = runReaderT (fromEngine f) e + +runResponseM :: Monad m => EngineEnv e -> Peer e -> ResponseM e m a -> EngineM e m a +runResponseM eng p f = lift $ runReaderT (fromResponse f) (ResponseEnv eng p) + +instance (MonadIO m, HasProtocol e p) => Request e p (EngineM e m) where + request p msg = do + let proto = protoId @e @p (Proxy @p) + ask >>= \case + EngineEnv { _self = s, bus = b} -> do + let bs = serialise (AnyMessage @e proto (encode msg)) + liftIO $ sendTo b (To p) (From s) bs + + + +instance ( MonadIO m + , HasProtocol e p + , Eq (SessionKey e p) + , Typeable (SessionKey e p) + , Typeable (SessionData e p) + , Hashable (SessionKey e p) + ) => Sessions e p (ResponseM e m) where + + find k f = flip runEngineM (find k f) =<< asks (view engine) + + fetch i d k f = flip runEngineM (fetch i d k f) =<< asks (view engine) + + update d k f = flip runEngineM (update d k f) =<< asks (view engine) + + expire k = flip runEngineM (expire k) =<< asks (view engine) + + +instance ( MonadIO m + , HasProtocol e p + , Eq (SessionKey e p) + , Typeable (SessionKey e p) + , Typeable (SessionData e p) + , Hashable (SessionKey e p) + ) => Sessions e p (EngineM e m) where + + + find k f = do + se <- asks (view sessions) + let sk = newSKey @(SessionKey e p) k + r <- liftIO $ Cache.lookup se sk + case fromDynamic @(SessionData e p) <$> r of + Just v -> pure $ f <$> v + Nothing -> pure Nothing + + fetch upd def k fn = do + se <- asks (view sessions) + let sk = newSKey @(SessionKey e p) k + let ddef = toDyn def + + r <- liftIO $ Cache.lookup se sk + + case r of + Just v -> pure $ fn $ fromMaybe def (fromDynamic @(SessionData e p) v ) + Nothing -> do + when upd $ liftIO $ Cache.insert se sk ddef + pure (fn def) + + update def k f = do + se <- asks (view sessions) + val <- fetch @e @p True def k id + liftIO $ Cache.insert se (newSKey @(SessionKey e p) k) (toDyn (f val)) + + expire k = do + se <- asks (view sessions) + liftIO $ Cache.delete se (newSKey @(SessionKey e p) k) + +instance (HasProtocol e p, Serialise (Encoded e)) => Response e p (ResponseM e IO) where + + thatPeer _ = asks (view respPeer) + + deferred _ m = do + e@(EngineEnv { defer = d }) <- asks (view engine) + p <- asks (view respPeer) + addJob d (runEngineM e (runResponseM e p m)) + + response resp = do + env <- ask + let p = env ^. respPeer + let s = env ^. (engine . self) + let proto = protoId @e @p (Proxy @p) + let bs = serialise (AnyMessage @e proto (encode resp)) + + -- TODO: wrap somehow + case env ^. engine of + EngineEnv { bus = b } -> liftIO $ sendTo b (To p) (From s) bs + +newEnv :: forall e bus m . ( Monad m + , MonadIO m + , Messaging bus e ByteString + , Serialise (Encoded e) + ) + => Peer e + -> bus + -> m (EngineEnv e) + +newEnv p pipe = do + de <- liftIO $ newPipeline defProtoPipelineSize + se <- liftIO $ Cache.newCache (Just defCookieTimeout) -- FIXME: some more clever for timeout, i.e. typeclass + pure $ EngineEnv Nothing p se pipe de + +runPeer :: forall e m a . ( MonadIO m + ) + => EngineEnv e + -> [AnyProtocol e (ResponseM e m)] + -> m a + +runPeer env@(EngineEnv {bus = pipe, defer = d}) hh = do + + let me = env ^. self + + let resp = [ (pid, a) | a@AnyProtocol { myProtoId = pid } <- hh ] + + let disp = Map.fromList resp + + void $ liftIO $ async $ runPipeline d + + runEngineM env $ do + + forever $ do + messages <- receive pipe (To me) + + for_ messages $ \(From pip, bs) -> do + + case deserialiseOrFail @(AnyMessage e) bs of + + Left _-> pure () -- liftIO $ print "failed to deserialise" + + Right (AnyMessage n msg) -> do + + local (set peer (Just pip)) do + + ee <- ask + + case Map.lookup n disp of + Nothing -> pure () + + Just (AnyProtocol { protoDecode = decoder + , handle = h + }) -> maybe (pure ()) (runResponseM ee pip . h) (decoder msg) + + + diff --git a/hbs2-core/lib/HBS2/Actors/Peer.hs b/hbs2-core/lib/HBS2/Actors/Peer.hs index 8f809003..e220dc54 100644 --- a/hbs2-core/lib/HBS2/Actors/Peer.hs +++ b/hbs2-core/lib/HBS2/Actors/Peer.hs @@ -1,252 +1,3 @@ -{-# Language TemplateHaskell #-} -{-# Language UndecidableInstances #-} module HBS2.Actors.Peer where -import HBS2.Prelude -import HBS2.Prelude.Plated -import HBS2.Net.Proto -import HBS2.Net.Messaging -import HBS2.Clock -import HBS2.Actors -import HBS2.Defaults - - -import Control.Concurrent.Async -import Control.Monad.Reader -import Control.Monad.Trans.Maybe -import Data.ByteString.Lazy ( ByteString ) -import Data.Digest.Murmur32 -import Data.Foldable hiding (find) -import Data.Hashable -import Data.Kind -import Data.Map qualified as Map -import GHC.TypeLits -import Lens.Micro.Platform -import System.Random qualified as Random -import Data.Cache (Cache) -import Data.Cache qualified as Cache -import Data.Dynamic -import Data.Maybe - -import Codec.Serialise hiding (encode,decode) - - - -data AnyMessage e = AnyMessage Integer (Encoded e) - deriving stock (Generic) - -instance Serialise (Encoded e) => Serialise (AnyMessage e) - - -data EngineEnv e = forall bus . ( Messaging bus e ByteString - , Serialise (Encoded e) - ) => - EngineEnv - { _peer :: Maybe (Peer e) - , _self :: Peer e - , _sessions :: Cache SKey Dynamic - , bus :: bus - , defer :: Pipeline IO () - } - - -newtype EngineM e m a = EngineM { fromEngine :: ReaderT (EngineEnv e) m a } - deriving newtype ( Functor - , Applicative - , Monad - , MonadIO - , MonadReader (EngineEnv e) - , MonadTrans - ) - -data ResponseEnv e = - ResponseEnv - { _engine :: EngineEnv e - , _respPeer :: Peer e - } - -newtype ResponseM e m a = ResponseM { fromResponse :: ReaderT (ResponseEnv e) m a } - deriving newtype ( Functor - , Applicative - , Monad - , MonadIO - , MonadReader (ResponseEnv e) - , MonadTrans - ) - - -makeLenses 'EngineEnv - -makeLenses 'ResponseEnv - -data AnyProtocol e m = forall p . ( HasProtocol e p - , Response e p m - ) => - AnyProtocol - { myProtoId :: Integer - , protoDecode :: Encoded e -> Maybe p - , protoEncode :: p -> Encoded e - , handle :: p -> m () - } - -makeResponse :: forall e p m . ( MonadIO m - , Response e p m - , HasProtocol e p - ) - => (p -> m ()) -> AnyProtocol e m - -makeResponse h = AnyProtocol { myProtoId = natVal (Proxy @(ProtocolId p)) - , protoDecode = decode - , protoEncode = encode - , handle = h - } - - -runEngineM :: EngineEnv e -> EngineM e m a -> m a -runEngineM e f = runReaderT (fromEngine f) e - -runResponseM :: Monad m => EngineEnv e -> Peer e -> ResponseM e m a -> EngineM e m a -runResponseM eng p f = lift $ runReaderT (fromResponse f) (ResponseEnv eng p) - -instance (MonadIO m, HasProtocol e p) => Request e p (EngineM e m) where - request p msg = do - let proto = protoId @e @p (Proxy @p) - ask >>= \case - EngineEnv { _self = s, bus = b} -> do - let bs = serialise (AnyMessage @e proto (encode msg)) - liftIO $ sendTo b (To p) (From s) bs - - - -instance ( MonadIO m - , HasProtocol e p - , Eq (SessionKey e p) - , Typeable (SessionKey e p) - , Typeable (SessionData e p) - , Hashable (SessionKey e p) - ) => Sessions e p (ResponseM e m) where - - find k f = flip runEngineM (find k f) =<< asks (view engine) - - fetch i d k f = flip runEngineM (fetch i d k f) =<< asks (view engine) - - update d k f = flip runEngineM (update d k f) =<< asks (view engine) - - expire k = flip runEngineM (expire k) =<< asks (view engine) - - -instance ( MonadIO m - , HasProtocol e p - , Eq (SessionKey e p) - , Typeable (SessionKey e p) - , Typeable (SessionData e p) - , Hashable (SessionKey e p) - ) => Sessions e p (EngineM e m) where - - - find k f = do - se <- asks (view sessions) - let sk = newSKey @(SessionKey e p) k - r <- liftIO $ Cache.lookup se sk - case fromDynamic @(SessionData e p) <$> r of - Just v -> pure $ f <$> v - Nothing -> pure Nothing - - fetch upd def k fn = do - se <- asks (view sessions) - let sk = newSKey @(SessionKey e p) k - let ddef = toDyn def - - r <- liftIO $ Cache.lookup se sk - - case r of - Just v -> pure $ fn $ fromMaybe def (fromDynamic @(SessionData e p) v ) - Nothing -> do - when upd $ liftIO $ Cache.insert se sk ddef - pure (fn def) - - update def k f = do - se <- asks (view sessions) - val <- fetch @e @p True def k id - liftIO $ Cache.insert se (newSKey @(SessionKey e p) k) (toDyn (f val)) - - expire k = do - se <- asks (view sessions) - liftIO $ Cache.delete se (newSKey @(SessionKey e p) k) - -instance (HasProtocol e p, Serialise (Encoded e)) => Response e p (ResponseM e IO) where - - thatPeer _ = asks (view respPeer) - - deferred _ m = do - e@(EngineEnv { defer = d }) <- asks (view engine) - p <- asks (view respPeer) - addJob d (runEngineM e (runResponseM e p m)) - - response resp = do - env <- ask - let p = env ^. respPeer - let s = env ^. (engine . self) - let proto = protoId @e @p (Proxy @p) - let bs = serialise (AnyMessage @e proto (encode resp)) - - -- TODO: wrap somehow - case env ^. engine of - EngineEnv { bus = b } -> liftIO $ sendTo b (To p) (From s) bs - -newEnv :: forall e bus m . ( Monad m - , MonadIO m - , Messaging bus e ByteString - , Serialise (Encoded e) - ) - => Peer e - -> bus - -> m (EngineEnv e) - -newEnv p pipe = do - de <- liftIO $ newPipeline defProtoPipelineSize - se <- liftIO $ Cache.newCache (Just defCookieTimeout) -- FIXME: some more clever for timeout, i.e. typeclass - pure $ EngineEnv Nothing p se pipe de - -runPeer :: forall e m a . ( MonadIO m - ) - => EngineEnv e - -> [AnyProtocol e (ResponseM e m)] - -> m a - -runPeer env@(EngineEnv {bus = pipe, defer = d}) hh = do - - let me = env ^. self - - let resp = [ (pid, a) | a@AnyProtocol { myProtoId = pid } <- hh ] - - let disp = Map.fromList resp - - void $ liftIO $ async $ runPipeline d - - runEngineM env $ do - - forever $ do - messages <- receive pipe (To me) - - for_ messages $ \(From pip, bs) -> do - - case deserialiseOrFail @(AnyMessage e) bs of - - Left _-> pure () -- liftIO $ print "failed to deserialise" - - Right (AnyMessage n msg) -> do - - local (set peer (Just pip)) do - - ee <- ask - - case Map.lookup n disp of - Nothing -> pure () - - Just (AnyProtocol { protoDecode = decoder - , handle = h - }) -> maybe (pure ()) (runResponseM ee pip . h) (decoder msg) - -