basic events work

This commit is contained in:
Dmitry Zuikov 2023-01-22 07:39:31 +03:00
parent e0efd2ac1d
commit 0f458134d3
4 changed files with 74 additions and 19 deletions

View File

@ -1,5 +1,6 @@
{-# Language TemplateHaskell #-}
{-# Language UndecidableInstances #-}
{-# Language AllowAmbiguousTypes #-}
module HBS2.Actors.Peer where
import HBS2.Prelude.Plated
@ -10,7 +11,9 @@ import HBS2.Net.Proto
import HBS2.Net.Messaging
import HBS2.Net.Proto.Sessions
import HBS2.Defaults
import HBS2.Events
import Control.Monad.Trans.Maybe
import Codec.Serialise hiding (encode,decode)
import Control.Concurrent.Async
import Control.Monad.Reader
@ -84,6 +87,7 @@ data PeerEnv e =
, _envStorage :: AnyStorage
, _envDeferred :: Pipeline IO ()
, _envSessions :: Cache SKey Dynamic
, _envEvents :: Cache SKey Dynamic
}
newtype PeerM e m a = PeerM { fromPeerM :: ReaderT (PeerEnv e) m a }
@ -183,11 +187,45 @@ instance ( MonadIO m
let bs = serialise (AnyMessage @e proto (encode msg))
sendTo pipe (To p) (From me) bs
instance ( HasProtocol e p
, Typeable (EventHandler e p (PeerM e IO))
, Typeable (EventKey e p)
, Typeable (Event e p)
, Hashable (EventKey e p)
, Eq (EventKey e p)
) => EventListener e p (PeerM e IO) where
subscribe k h = do
ev <- asks (view envEvents)
liftIO $ Cache.insert ev (newSKey @(EventKey e p) k) (toDyn h)
pure ()
instance ( HasProtocol e p
, Typeable (EventKey e p)
, Typeable (Event e p)
, Hashable (EventKey e p)
, Eq (EventKey e p)
, Typeable (EventHandler e p (PeerM e IO))
) => EventEmitter e p (PeerM e IO) where
emit k d = do
se <- asks (view envEvents)
let sk = newSKey @(EventKey e p) k
void $ runMaybeT $ do
r <- MaybeT $ liftIO $ Cache.lookup se sk
ev <- MaybeT $ pure $ fromDynamic @(EventHandler e p (PeerM e IO)) r
lift $ ev d
runPeerM :: MonadIO m => AnyStorage -> Fabriq e -> Peer e -> PeerM e m a -> m ()
runPeerM s bus p f = do
env <- PeerEnv p bus s <$> newPipeline defProtoPipelineSize
<*> liftIO (Cache.newCache (Just defCookieTimeout))
<*> liftIO (Cache.newCache (Just defCookieTimeout))
let de = view envDeferred env
as <- liftIO $ async $ runPipeline de
@ -211,8 +249,6 @@ runProto hh = do
me <- ownPeer @e @m
pipe <- getFabriq
-- defer <- newPipeline @(ResponseM e m ()) @m defProtoPipelineSize
let resp = [ (pid, a) | a@AnyProtocol { myProtoId = pid } <- hh ]
let disp = Map.fromList resp
@ -278,3 +314,14 @@ instance ( MonadIO m
expire k = lift (expire k)
instance ( MonadIO m
, Hashable (EventKey e p)
, EventEmitter e p m
) => EventEmitter e p (ResponseM e m) where
emit k d = lift $ emit k d

View File

@ -3,7 +3,6 @@ module HBS2.Events where
import Data.Kind
-- General Events class.
--
-- It's may be way too general.
@ -28,11 +27,16 @@ import Data.Kind
-- I suspect that 'e' has a global meaning and
-- represent an 'interpreter'.
class Monad m => HasEvents e a m | a -> e where
data family EventKey e a :: Type
type family Event e a :: Type
data family Event e a :: Type
type EventHandler e a m = Event e a -> m ()
class Monad m => EventListener e a m | a -> e where
subscribe :: EventKey e a -> EventHandler e a m -> m ()
class Monad m => EventEmitter e a m | a -> e where
emit :: EventKey e a -> Event e a -> m ()
subscribe :: EventKey e a -> Event e a -> m ()

View File

@ -3,6 +3,7 @@ module HBS2.Net.Proto.BlockInfo where
import HBS2.Prelude.Plated
import HBS2.Net.Proto
import HBS2.Net.Proto.Sessions
import HBS2.Events
import HBS2.Hash
import Codec.Serialise ()
@ -47,3 +48,13 @@ newtype instance SessionKey e (BlockSize e) =
deriving stock (Typeable,Eq,Show)
deriving newtype (Hashable,IsString)
newtype instance EventKey e (BlockSize e) =
BlockSizeEventKey ()
deriving stock (Typeable, Eq)
deriving newtype (Hashable)
data instance Event e (BlockSize e) =
BlockSizeEvent
deriving stock (Typeable)

View File

@ -131,6 +131,7 @@ handleBlockInfo :: forall e m . ( MonadIO m
, Default (SessionData e (BlockSize e))
, Ord (Peer e)
, Pretty (Peer e)
, EventEmitter e (BlockSize e) m
)
=> (Peer e, Hash HbSync, Maybe Integer)
@ -141,28 +142,20 @@ handleBlockInfo (p, h, sz') = do
let bsz = fromIntegral sz
update @e def (BlockSizeKey h) (over bsBlockSizes (Map.insert p bsz))
liftIO $ debug $ "got block:" <+> pretty (p, h, sz)
emit @e (BlockSizeEventKey ()) BlockSizeEvent
-- FIXME: turn back on event notification
-- lift $ runEngineM env $ emitBlockSizeEvent ev h (p, h, Just sz) -- TODO: fix this crazy shit
instance HasEvents Fake (BlockSize Fake) (PeerM Fake IO) where
data instance EventKey Fake (BlockSize Fake) = BlockSizeEvent ()
type instance Event Fake (BlockSize Fake) = ()
subscribe = undefined
blockDownloadLoop :: forall e . ( HasProtocol e (BlockSize e)
, Request e (BlockSize e) (PeerM e IO)
, HasEvents e (BlockSize e) (PeerM e IO)
, EventListener e (BlockSize e) (PeerM e IO)
, Num (Peer e)
) => PeerM e IO ()
blockDownloadLoop = do
-- w <- subscribe ???
--
-- subscribe @(GetBlockSize e) $ \(p,h,i) -> do
-- debug "WE GOT BLOCK!"
w <- subscribe @e @(BlockSize e) (BlockSizeEventKey ()) $ \_ -> do
debug "can't believe this shit works"
request 1 (GetBlockSize @e "5KP4vM6RuEX6RA1ywthBMqZV5UJDLANC17UrF6zuWdRt")
request 1 (GetBlockSize @e "81JeD7LNR6Q7RYfyWBxfjJn1RsWzvegkUXae6FUNgrMZ")