diff --git a/hbs2-core/lib/HBS2/Actors/Peer.hs b/hbs2-core/lib/HBS2/Actors/Peer.hs index 0fdb7d99..134c14d6 100644 --- a/hbs2-core/lib/HBS2/Actors/Peer.hs +++ b/hbs2-core/lib/HBS2/Actors/Peer.hs @@ -1,5 +1,6 @@ {-# Language TemplateHaskell #-} {-# Language UndecidableInstances #-} +{-# Language AllowAmbiguousTypes #-} module HBS2.Actors.Peer where import HBS2.Prelude.Plated @@ -10,7 +11,9 @@ import HBS2.Net.Proto import HBS2.Net.Messaging import HBS2.Net.Proto.Sessions import HBS2.Defaults +import HBS2.Events +import Control.Monad.Trans.Maybe import Codec.Serialise hiding (encode,decode) import Control.Concurrent.Async import Control.Monad.Reader @@ -84,6 +87,7 @@ data PeerEnv e = , _envStorage :: AnyStorage , _envDeferred :: Pipeline IO () , _envSessions :: Cache SKey Dynamic + , _envEvents :: Cache SKey Dynamic } newtype PeerM e m a = PeerM { fromPeerM :: ReaderT (PeerEnv e) m a } @@ -183,11 +187,45 @@ instance ( MonadIO m let bs = serialise (AnyMessage @e proto (encode msg)) sendTo pipe (To p) (From me) bs + + +instance ( HasProtocol e p + , Typeable (EventHandler e p (PeerM e IO)) + , Typeable (EventKey e p) + , Typeable (Event e p) + , Hashable (EventKey e p) + , Eq (EventKey e p) + ) => EventListener e p (PeerM e IO) where + + subscribe k h = do + ev <- asks (view envEvents) + liftIO $ Cache.insert ev (newSKey @(EventKey e p) k) (toDyn h) + pure () + +instance ( HasProtocol e p + , Typeable (EventKey e p) + , Typeable (Event e p) + , Hashable (EventKey e p) + , Eq (EventKey e p) + , Typeable (EventHandler e p (PeerM e IO)) + ) => EventEmitter e p (PeerM e IO) where + + emit k d = do + se <- asks (view envEvents) + let sk = newSKey @(EventKey e p) k + + void $ runMaybeT $ do + r <- MaybeT $ liftIO $ Cache.lookup se sk + ev <- MaybeT $ pure $ fromDynamic @(EventHandler e p (PeerM e IO)) r + lift $ ev d + + runPeerM :: MonadIO m => AnyStorage -> Fabriq e -> Peer e -> PeerM e m a -> m () runPeerM s bus p f = do env <- PeerEnv p bus s <$> newPipeline defProtoPipelineSize <*> liftIO (Cache.newCache (Just defCookieTimeout)) + <*> liftIO (Cache.newCache (Just defCookieTimeout)) let de = view envDeferred env as <- liftIO $ async $ runPipeline de @@ -211,8 +249,6 @@ runProto hh = do me <- ownPeer @e @m pipe <- getFabriq - -- defer <- newPipeline @(ResponseM e m ()) @m defProtoPipelineSize - let resp = [ (pid, a) | a@AnyProtocol { myProtoId = pid } <- hh ] let disp = Map.fromList resp @@ -278,3 +314,14 @@ instance ( MonadIO m expire k = lift (expire k) + +instance ( MonadIO m + , Hashable (EventKey e p) + , EventEmitter e p m + ) => EventEmitter e p (ResponseM e m) where + + emit k d = lift $ emit k d + + + + diff --git a/hbs2-core/lib/HBS2/Events.hs b/hbs2-core/lib/HBS2/Events.hs index b1ad8ca6..05a49946 100644 --- a/hbs2-core/lib/HBS2/Events.hs +++ b/hbs2-core/lib/HBS2/Events.hs @@ -3,7 +3,6 @@ module HBS2.Events where import Data.Kind - -- General Events class. -- -- It's may be way too general. @@ -28,11 +27,16 @@ import Data.Kind -- I suspect that 'e' has a global meaning and -- represent an 'interpreter'. -class Monad m => HasEvents e a m | a -> e where +data family EventKey e a :: Type +data family Event e a :: Type - data family EventKey e a :: Type - type family Event e a :: Type +type EventHandler e a m = Event e a -> m () + +class Monad m => EventListener e a m | a -> e where + subscribe :: EventKey e a -> EventHandler e a m -> m () + +class Monad m => EventEmitter e a m | a -> e where + emit :: EventKey e a -> Event e a -> m () - subscribe :: EventKey e a -> Event e a -> m () diff --git a/hbs2-core/lib/HBS2/Net/Proto/BlockInfo.hs b/hbs2-core/lib/HBS2/Net/Proto/BlockInfo.hs index 585c0585..50eb615b 100644 --- a/hbs2-core/lib/HBS2/Net/Proto/BlockInfo.hs +++ b/hbs2-core/lib/HBS2/Net/Proto/BlockInfo.hs @@ -3,6 +3,7 @@ module HBS2.Net.Proto.BlockInfo where import HBS2.Prelude.Plated import HBS2.Net.Proto import HBS2.Net.Proto.Sessions +import HBS2.Events import HBS2.Hash import Codec.Serialise () @@ -47,3 +48,13 @@ newtype instance SessionKey e (BlockSize e) = deriving stock (Typeable,Eq,Show) deriving newtype (Hashable,IsString) + +newtype instance EventKey e (BlockSize e) = + BlockSizeEventKey () + deriving stock (Typeable, Eq) + deriving newtype (Hashable) + +data instance Event e (BlockSize e) = + BlockSizeEvent + deriving stock (Typeable) + diff --git a/hbs2-tests/test/Peer2Main.hs b/hbs2-tests/test/Peer2Main.hs index c86db5e5..243e3d5e 100644 --- a/hbs2-tests/test/Peer2Main.hs +++ b/hbs2-tests/test/Peer2Main.hs @@ -131,6 +131,7 @@ handleBlockInfo :: forall e m . ( MonadIO m , Default (SessionData e (BlockSize e)) , Ord (Peer e) , Pretty (Peer e) + , EventEmitter e (BlockSize e) m ) => (Peer e, Hash HbSync, Maybe Integer) @@ -141,28 +142,20 @@ handleBlockInfo (p, h, sz') = do let bsz = fromIntegral sz update @e def (BlockSizeKey h) (over bsBlockSizes (Map.insert p bsz)) liftIO $ debug $ "got block:" <+> pretty (p, h, sz) + emit @e (BlockSizeEventKey ()) BlockSizeEvent -- FIXME: turn back on event notification -- lift $ runEngineM env $ emitBlockSizeEvent ev h (p, h, Just sz) -- TODO: fix this crazy shit -instance HasEvents Fake (BlockSize Fake) (PeerM Fake IO) where - data instance EventKey Fake (BlockSize Fake) = BlockSizeEvent () - type instance Event Fake (BlockSize Fake) = () - - subscribe = undefined - blockDownloadLoop :: forall e . ( HasProtocol e (BlockSize e) , Request e (BlockSize e) (PeerM e IO) - , HasEvents e (BlockSize e) (PeerM e IO) + , EventListener e (BlockSize e) (PeerM e IO) , Num (Peer e) ) => PeerM e IO () blockDownloadLoop = do - -- w <- subscribe ??? - -- - - -- subscribe @(GetBlockSize e) $ \(p,h,i) -> do - -- debug "WE GOT BLOCK!" + w <- subscribe @e @(BlockSize e) (BlockSizeEventKey ()) $ \_ -> do + debug "can't believe this shit works" request 1 (GetBlockSize @e "5KP4vM6RuEX6RA1ywthBMqZV5UJDLANC17UrF6zuWdRt") request 1 (GetBlockSize @e "81JeD7LNR6Q7RYfyWBxfjJn1RsWzvegkUXae6FUNgrMZ")