This commit is contained in:
Dmitry Zuikov 2023-01-21 15:24:53 +03:00
parent 76579675b6
commit d5ea301ffc
2 changed files with 123 additions and 4 deletions

View File

@ -0,0 +1,81 @@
{-# Language FunctionalDependencies #-}
module HBS2.Net.Proto.Sessions where
import HBS2.Net.Proto.Types
import Data.Typeable
import Data.Dynamic
import Data.Hashable
import Data.Kind
data SKey = forall a . (Unkey a, Eq a, Hashable a) => SKey (Proxy a) Dynamic
class Typeable a => Unkey a where
unKey :: Proxy a -> Dynamic -> Maybe a
instance Typeable a => Unkey a where
unKey _ = fromDynamic @a
newSKey :: forall a . (Eq a, Typeable a, Unkey a, Hashable a) => a -> SKey
newSKey s = SKey (Proxy @a) (toDyn s)
instance Hashable SKey where
hashWithSalt s (SKey p d) = hashWithSalt s (unKey p d)
instance Eq SKey where
(==) (SKey p1 a) (SKey p2 b) = unKey p1 a == unKey p1 b
-- we probably can not separate sessions
-- by sub-protocol types without
-- really crazy types.
--
-- And if we really need this, it may be done
-- by injecting a protocol type into 'e' or
-- introducing a common ADT for all session types
-- for common 'e' i.e. 'engine' or 'transport'
--
-- So it is that it is.
data family SessionKey e p :: Type
type family SessionData e p :: Type
class ( Monad m
, HasProtocol e p
, Eq (SessionKey e p)
, Hashable (SessionKey e p)
, Typeable (SessionData e p)
) => Sessions e p m | p -> e where
-- | Session fetch function.
-- | It will insert a new session, if default value is Just something.
find :: SessionKey e p -- ^ session key
-> (SessionData e p -> a) -- ^ modification function, i.e. lens
-> m (Maybe a)
-- | Session fetch function.
-- | It will insert a new session, if default value is Just something.
fetch :: Bool -- ^ do add new session if not exists
-> SessionData e p -- ^ default value in case it's not found
-> SessionKey e p -- ^ session key
-> (SessionData e p -> a ) -- ^ modification function, i.e. lens
-> m a
-- | Session update function
-- | If will create a new session if it does not exist.
-- | A modified value (or default) value will we saved.
update :: SessionData e p -- ^ default value in case it's not found
-> SessionKey e p -- ^ session key
-> (SessionData e p -> SessionData e p) -- ^ modification function, i.e. lens
-> m ()
expire :: SessionKey e p -> m ()

View File

@ -122,7 +122,7 @@ instance (IsKey HbSync, Key HbSync ~ Hash HbSync) => Storage AnyStorage HbSync B
class HasStorage m where
getStorage :: m AnyStorage
data Fabriq e = forall bus . Messaging bus e ByteString => Fabriq bus
data Fabriq e = forall bus . (Serialise (Encoded e), Messaging bus e ByteString) => Fabriq bus
class HasFabriq e m where
getFabriq :: m (Fabriq e)
@ -253,8 +253,17 @@ instance ( MonadIO m
liftIO $ Cache.delete se (newSKey @(SessionKey e p) k)
instance ( MonadIO m
, HasProtocol e p
, HasFabriq e (PeerM e m)
, Serialise (Encoded e)
) => Request e p (PeerM e m) where
request p msg = do
let proto = protoId @e @p (Proxy @p)
pipe <- getFabriq @e
me <- ownPeer @e
let bs = serialise (AnyMessage @e proto (encode msg))
sendTo pipe (To p) (From me) bs
runPeerM :: MonadIO m => AnyStorage -> Fabriq e -> Peer e -> PeerM e m a -> m ()
runPeerM s bus p f = do
@ -268,6 +277,9 @@ runPeerM s bus p f = do
void $ liftIO $ stopPipeline de
liftIO $ cancel as
withPeerM :: MonadIO m => PeerEnv e -> PeerM e m a -> m ()
withPeerM env action = void $ runReaderT (fromPeerM action) env
runProto :: forall e m . ( MonadIO m
, HasOwnPeer e m
, HasFabriq e m
@ -376,10 +388,11 @@ runTestPeer p zu = do
mapM_ cancel [sw,cw]
handleBlockInfo :: forall e m . ( Monad m
handleBlockInfo :: forall e m . ( MonadIO m
, Sessions e (BlockSize e) m
, Default (SessionData e (BlockSize e))
, Ord (Peer e)
, Pretty (Peer e)
)
=> (Peer e, Hash HbSync, Maybe Integer)
@ -389,9 +402,28 @@ handleBlockInfo (p, h, sz') = do
maybe1 sz' (pure ()) $ \sz -> do
let bsz = fromIntegral sz
update @e def (BlockSizeKey h) (over bsBlockSizes (Map.insert p bsz))
liftIO $ debug $ "got block:" <+> pretty (p, h, sz)
-- FIXME: turn back on event notification
-- lift $ runEngineM env $ emitBlockSizeEvent ev h (p, h, Just sz) -- TODO: fix this crazy shit
blockDownloadLoop :: forall e . ( HasProtocol e (BlockSize e)
, Request e (BlockSize e) (PeerM e IO)
, Num (Peer e)
) => PeerM e IO ()
blockDownloadLoop = do
-- w <- subscribe ???
request 1 (GetBlockSize @e "5KP4vM6RuEX6RA1ywthBMqZV5UJDLANC17UrF6zuWdRt")
request 1 (GetBlockSize @e "81JeD7LNR6Q7RYfyWBxfjJn1RsWzvegkUXae6FUNgrMZ")
fix \next -> do
liftIO $ print "piu!"
pause ( 0.85 :: Timeout 'Seconds )
next
main :: IO ()
main = do
hSetBuffering stderr LineBuffering
@ -425,11 +457,17 @@ main = do
our <- async $ runTestPeer p0 $ \s -> do
let blk = hasBlock s
runPeerM (AnyStorage s) fake p0 $ do
env <- ask
as <- liftIO $ async $ withPeerM env blockDownloadLoop
runProto @Fake
[ makeResponse (blockSizeProto blk handleBlockInfo)
-- , makeResponse (blockChunksProto undefined)
]
liftIO $ cancel as
pause ( 5 :: Timeout 'Seconds)
mapM_ cancel (our:others)