diff --git a/hbs2-core/lib/HBS2/Net/Proto/Sessions.hs b/hbs2-core/lib/HBS2/Net/Proto/Sessions.hs new file mode 100644 index 00000000..681cdcc9 --- /dev/null +++ b/hbs2-core/lib/HBS2/Net/Proto/Sessions.hs @@ -0,0 +1,81 @@ +{-# Language FunctionalDependencies #-} +module HBS2.Net.Proto.Sessions where + +import HBS2.Net.Proto.Types + +import Data.Typeable +import Data.Dynamic +import Data.Hashable +import Data.Kind + +data SKey = forall a . (Unkey a, Eq a, Hashable a) => SKey (Proxy a) Dynamic + +class Typeable a => Unkey a where + unKey :: Proxy a -> Dynamic -> Maybe a + +instance Typeable a => Unkey a where + unKey _ = fromDynamic @a + +newSKey :: forall a . (Eq a, Typeable a, Unkey a, Hashable a) => a -> SKey +newSKey s = SKey (Proxy @a) (toDyn s) + + +instance Hashable SKey where + hashWithSalt s (SKey p d) = hashWithSalt s (unKey p d) + + +instance Eq SKey where + (==) (SKey p1 a) (SKey p2 b) = unKey p1 a == unKey p1 b + + +-- we probably can not separate sessions +-- by sub-protocol types without +-- really crazy types. +-- +-- And if we really need this, it may be done +-- by injecting a protocol type into 'e' or +-- introducing a common ADT for all session types +-- for common 'e' i.e. 'engine' or 'transport' +-- +-- So it is that it is. + +data family SessionKey e p :: Type +type family SessionData e p :: Type + + +class ( Monad m + , HasProtocol e p + , Eq (SessionKey e p) + , Hashable (SessionKey e p) + , Typeable (SessionData e p) + ) => Sessions e p m | p -> e where + + + + -- | Session fetch function. + -- | It will insert a new session, if default value is Just something. + + find :: SessionKey e p -- ^ session key + -> (SessionData e p -> a) -- ^ modification function, i.e. lens + -> m (Maybe a) + + -- | Session fetch function. + -- | It will insert a new session, if default value is Just something. + + fetch :: Bool -- ^ do add new session if not exists + -> SessionData e p -- ^ default value in case it's not found + -> SessionKey e p -- ^ session key + -> (SessionData e p -> a ) -- ^ modification function, i.e. lens + -> m a + + -- | Session update function + -- | If will create a new session if it does not exist. + -- | A modified value (or default) value will we saved. + + update :: SessionData e p -- ^ default value in case it's not found + -> SessionKey e p -- ^ session key + -> (SessionData e p -> SessionData e p) -- ^ modification function, i.e. lens + -> m () + + expire :: SessionKey e p -> m () + diff --git a/hbs2-tests/test/Peer2Main.hs b/hbs2-tests/test/Peer2Main.hs index a1fffcff..879d56eb 100644 --- a/hbs2-tests/test/Peer2Main.hs +++ b/hbs2-tests/test/Peer2Main.hs @@ -122,7 +122,7 @@ instance (IsKey HbSync, Key HbSync ~ Hash HbSync) => Storage AnyStorage HbSync B class HasStorage m where getStorage :: m AnyStorage -data Fabriq e = forall bus . Messaging bus e ByteString => Fabriq bus +data Fabriq e = forall bus . (Serialise (Encoded e), Messaging bus e ByteString) => Fabriq bus class HasFabriq e m where getFabriq :: m (Fabriq e) @@ -253,8 +253,17 @@ instance ( MonadIO m liftIO $ Cache.delete se (newSKey @(SessionKey e p) k) - - +instance ( MonadIO m + , HasProtocol e p + , HasFabriq e (PeerM e m) + , Serialise (Encoded e) + ) => Request e p (PeerM e m) where + request p msg = do + let proto = protoId @e @p (Proxy @p) + pipe <- getFabriq @e + me <- ownPeer @e + let bs = serialise (AnyMessage @e proto (encode msg)) + sendTo pipe (To p) (From me) bs runPeerM :: MonadIO m => AnyStorage -> Fabriq e -> Peer e -> PeerM e m a -> m () runPeerM s bus p f = do @@ -268,6 +277,9 @@ runPeerM s bus p f = do void $ liftIO $ stopPipeline de liftIO $ cancel as +withPeerM :: MonadIO m => PeerEnv e -> PeerM e m a -> m () +withPeerM env action = void $ runReaderT (fromPeerM action) env + runProto :: forall e m . ( MonadIO m , HasOwnPeer e m , HasFabriq e m @@ -376,10 +388,11 @@ runTestPeer p zu = do mapM_ cancel [sw,cw] -handleBlockInfo :: forall e m . ( Monad m +handleBlockInfo :: forall e m . ( MonadIO m , Sessions e (BlockSize e) m , Default (SessionData e (BlockSize e)) , Ord (Peer e) + , Pretty (Peer e) ) => (Peer e, Hash HbSync, Maybe Integer) @@ -389,9 +402,28 @@ handleBlockInfo (p, h, sz') = do maybe1 sz' (pure ()) $ \sz -> do let bsz = fromIntegral sz update @e def (BlockSizeKey h) (over bsBlockSizes (Map.insert p bsz)) + liftIO $ debug $ "got block:" <+> pretty (p, h, sz) -- FIXME: turn back on event notification -- lift $ runEngineM env $ emitBlockSizeEvent ev h (p, h, Just sz) -- TODO: fix this crazy shit + +blockDownloadLoop :: forall e . ( HasProtocol e (BlockSize e) + , Request e (BlockSize e) (PeerM e IO) + , Num (Peer e) + ) => PeerM e IO () +blockDownloadLoop = do + + -- w <- subscribe ??? + + request 1 (GetBlockSize @e "5KP4vM6RuEX6RA1ywthBMqZV5UJDLANC17UrF6zuWdRt") + request 1 (GetBlockSize @e "81JeD7LNR6Q7RYfyWBxfjJn1RsWzvegkUXae6FUNgrMZ") + + fix \next -> do + liftIO $ print "piu!" + + pause ( 0.85 :: Timeout 'Seconds ) + next + main :: IO () main = do hSetBuffering stderr LineBuffering @@ -425,11 +457,17 @@ main = do our <- async $ runTestPeer p0 $ \s -> do let blk = hasBlock s runPeerM (AnyStorage s) fake p0 $ do + env <- ask + + as <- liftIO $ async $ withPeerM env blockDownloadLoop + runProto @Fake [ makeResponse (blockSizeProto blk handleBlockInfo) -- , makeResponse (blockChunksProto undefined) ] + liftIO $ cancel as + pause ( 5 :: Timeout 'Seconds) mapM_ cancel (our:others)