From 76579675b66418b3e70b3bdefacc0898ea471d7d Mon Sep 17 00:00:00 2001 From: Dmitry Zuikov Date: Sat, 21 Jan 2023 14:41:26 +0300 Subject: [PATCH] wip --- hbs2-core/hbs2-core.cabal | 7 +- hbs2-core/lib/HBS2/Actors/Peer.hs | 19 ----- hbs2-core/lib/HBS2/Net/Proto/BlockInfo.hs | 1 + hbs2-core/lib/HBS2/Net/Proto/Types.hs | 53 ------------- hbs2-tests/test/Peer2Main.hs | 92 ++++++++++++++++++++++- 5 files changed, 94 insertions(+), 78 deletions(-) diff --git a/hbs2-core/hbs2-core.cabal b/hbs2-core/hbs2-core.cabal index baf979d3..e55763bb 100644 --- a/hbs2-core/hbs2-core.cabal +++ b/hbs2-core/hbs2-core.cabal @@ -66,7 +66,7 @@ library exposed-modules: HBS2.Actors , HBS2.Actors.ChunkWriter - , HBS2.Actors.Peer + -- , HBS2.Actors.Peer , HBS2.Clock , HBS2.Data.Types , HBS2.Data.Types.Refs @@ -78,9 +78,10 @@ library , HBS2.Net.PeerLocator , HBS2.Net.PeerLocator.Static , HBS2.Net.Proto - , HBS2.Net.Proto.Types - , HBS2.Net.Proto.BlockInfo , HBS2.Net.Proto.BlockChunks + , HBS2.Net.Proto.BlockInfo + , HBS2.Net.Proto.Sessions + , HBS2.Net.Proto.Types , HBS2.Prelude , HBS2.Prelude.Plated , HBS2.Storage diff --git a/hbs2-core/lib/HBS2/Actors/Peer.hs b/hbs2-core/lib/HBS2/Actors/Peer.hs index 8a1fe9f4..8f809003 100644 --- a/hbs2-core/lib/HBS2/Actors/Peer.hs +++ b/hbs2-core/lib/HBS2/Actors/Peer.hs @@ -31,25 +31,6 @@ import Data.Maybe import Codec.Serialise hiding (encode,decode) -data SKey = forall a . (Unkey a, Eq a, Hashable a) => SKey (Proxy a) Dynamic - -class Typeable a => Unkey a where - unfuck :: Proxy a -> Dynamic -> Maybe a - -instance Typeable a => Unkey a where - unfuck _ = fromDynamic @a - -newSKey :: forall a . (Eq a, Typeable a, Unkey a, Hashable a) => a -> SKey -newSKey s = SKey (Proxy @a) (toDyn s) - - -instance Hashable SKey where - hashWithSalt s (SKey p d) = hashWithSalt s (unfuck p d) - - -instance Eq SKey where - (==) (SKey p1 a) (SKey p2 b) = unfuck p1 a == unfuck p1 b - data AnyMessage e = AnyMessage Integer (Encoded e) deriving stock (Generic) diff --git a/hbs2-core/lib/HBS2/Net/Proto/BlockInfo.hs b/hbs2-core/lib/HBS2/Net/Proto/BlockInfo.hs index 09d081d8..585c0585 100644 --- a/hbs2-core/lib/HBS2/Net/Proto/BlockInfo.hs +++ b/hbs2-core/lib/HBS2/Net/Proto/BlockInfo.hs @@ -2,6 +2,7 @@ module HBS2.Net.Proto.BlockInfo where import HBS2.Prelude.Plated import HBS2.Net.Proto +import HBS2.Net.Proto.Sessions import HBS2.Hash import Codec.Serialise () diff --git a/hbs2-core/lib/HBS2/Net/Proto/Types.hs b/hbs2-core/lib/HBS2/Net/Proto/Types.hs index 4039e874..33503afc 100644 --- a/hbs2-core/lib/HBS2/Net/Proto/Types.hs +++ b/hbs2-core/lib/HBS2/Net/Proto/Types.hs @@ -11,7 +11,6 @@ import GHC.TypeLits import Data.Proxy import Data.Hashable import Control.Monad.IO.Class -import Data.Typeable import System.Random qualified as Random import Data.Digest.Murmur32 @@ -41,58 +40,6 @@ class Request e p (m :: Type -> Type) | p -> e where request :: Peer e -> p -> m () --- we probably can not separate sessions --- by sub-protocol types without --- really crazy types. --- --- And if we really need this, it may be done --- by injecting a protocol type into 'e' or --- introducing a common ADT for all session types --- for common 'e' i.e. 'engine' or 'transport' --- --- So it is that it is. - -data family SessionKey e p :: Type -type family SessionData e p :: Type - - -class ( Monad m - , HasProtocol e p - , Eq (SessionKey e p) - , Hashable (SessionKey e p) - , Typeable (SessionData e p) - -- , Typeable e - -- , Typeable p - ) => Sessions e p m | p -> e where - - - - -- | Session fetch function. - -- | It will insert a new session, if default value is Just something. - - find :: SessionKey e p -- ^ session key - -> (SessionData e p -> a) -- ^ modification function, i.e. lens - -> m (Maybe a) - - -- | Session fetch function. - -- | It will insert a new session, if default value is Just something. - - fetch :: Bool -- ^ do add new session if not exists - -> SessionData e p -- ^ default value in case it's not found - -> SessionKey e p -- ^ session key - -> (SessionData e p -> a ) -- ^ modification function, i.e. lens - -> m a - - -- | Session update function - -- | If will create a new session if it does not exist. - -- | A modified value (or default) value will we saved. - - update :: SessionData e p -- ^ default value in case it's not found - -> SessionKey e p -- ^ session key - -> (SessionData e p -> SessionData e p) -- ^ modification function, i.e. lens - -> m () - - expire :: SessionKey e p -> m () class (KnownNat (ProtocolId p), HasPeer e) => HasProtocol e p | p -> e where type family ProtocolId p = (id :: Nat) | id -> p diff --git a/hbs2-tests/test/Peer2Main.hs b/hbs2-tests/test/Peer2Main.hs index 8c7aadab..a1fffcff 100644 --- a/hbs2-tests/test/Peer2Main.hs +++ b/hbs2-tests/test/Peer2Main.hs @@ -12,6 +12,7 @@ import HBS2.Hash import HBS2.Net.Messaging import HBS2.Net.Messaging.Fake import HBS2.Net.Proto +import HBS2.Net.Proto.Sessions import HBS2.Net.Proto.BlockChunks import HBS2.Net.Proto.BlockInfo import HBS2.Prelude.Plated @@ -26,10 +27,14 @@ import Control.Concurrent.Async import Control.Monad.Reader import Data.ByteString.Lazy (ByteString) import Data.ByteString.Lazy.Char8 qualified as B8 +import Data.Cache (Cache) +import Data.Cache qualified as Cache import Data.Default -import Data.Foldable +import Data.Dynamic +import Data.Foldable hiding (find) import Data.Map (Map) import Data.Map qualified as Map +import Data.Maybe import Data.Word import GHC.TypeLits import Lens.Micro.Platform @@ -79,7 +84,7 @@ instance HasProtocol Fake (BlockChunks Fake) where encode = serialise -type instance SessionData Fake (BlockSize Fake) = BlockSizeSession Fake +type instance SessionData e (BlockSize e) = BlockSizeSession e type instance SessionData Fake (BlockChunks Fake) = BlockDownload newtype instance SessionKey Fake (BlockChunks Fake) = @@ -160,6 +165,7 @@ data PeerEnv e = , _envFab :: Fabriq e , _envStorage :: AnyStorage , _envDeferred :: Pipeline IO () + , _envSessions :: Cache SKey Dynamic } newtype PeerM e m a = PeerM { fromPeerM :: ReaderT (PeerEnv e) m a } @@ -206,9 +212,56 @@ instance Monad m => HasFabriq e (PeerM e m) where instance Monad m => HasStorage (PeerM e m) where getStorage = asks (view envStorage) + +instance ( MonadIO m + , HasProtocol e p + , Eq (SessionKey e p) + , Typeable (SessionKey e p) + , Typeable (SessionData e p) + , Hashable (SessionKey e p) + ) => Sessions e p (PeerM e m) where + + + find k f = do + se <- asks (view envSessions) + let sk = newSKey @(SessionKey e p) k + r <- liftIO $ Cache.lookup se sk + case fromDynamic @(SessionData e p) <$> r of + Just v -> pure $ f <$> v + Nothing -> pure Nothing + + fetch upd de k fn = do + se <- asks (view envSessions) + let sk = newSKey @(SessionKey e p) k + let ddef = toDyn de + + r <- liftIO $ Cache.lookup se sk + + case r of + Just v -> pure $ fn $ fromMaybe de (fromDynamic @(SessionData e p) v ) + Nothing -> do + when upd $ liftIO $ Cache.insert se sk ddef + pure (fn de) + + update de k f = do + se <- asks (view envSessions) + val <- fetch @e @p True de k id + liftIO $ Cache.insert se (newSKey @(SessionKey e p) k) (toDyn (f val)) + + expire k = do + se <- asks (view envSessions) + liftIO $ Cache.delete se (newSKey @(SessionKey e p) k) + + + + + runPeerM :: MonadIO m => AnyStorage -> Fabriq e -> Peer e -> PeerM e m a -> m () runPeerM s bus p f = do + env <- PeerEnv p bus s <$> newPipeline defProtoPipelineSize + <*> liftIO (Cache.newCache (Just defCookieTimeout)) + let de = view envDeferred env as <- liftIO $ async $ runPipeline de void $ runReaderT (fromPeerM f) env @@ -277,6 +330,25 @@ instance ( HasProtocol e p sendTo fab (To who) (From self) bs + +instance ( MonadIO m + , HasProtocol e p + , Sessions e p m + , Eq (SessionKey e p) + , Typeable (SessionKey e p) + , Typeable (SessionData e p) + , Hashable (SessionKey e p) + ) => Sessions e p (ResponseM e m) where + + find k f = lift (find k f) + + fetch i d k f = lift (fetch i d k f) + + update d k f = lift (update d k f) + + expire k = lift (expire k) + + runTestPeer :: Peer Fake -> (SimpleStorage HbSync -> IO ()) -> IO () @@ -304,7 +376,21 @@ runTestPeer p zu = do mapM_ cancel [sw,cw] +handleBlockInfo :: forall e m . ( Monad m + , Sessions e (BlockSize e) m + , Default (SessionData e (BlockSize e)) + , Ord (Peer e) + ) + => (Peer e, Hash HbSync, Maybe Integer) + -> m () + +handleBlockInfo (p, h, sz') = do + maybe1 sz' (pure ()) $ \sz -> do + let bsz = fromIntegral sz + update @e def (BlockSizeKey h) (over bsBlockSizes (Map.insert p bsz)) + -- FIXME: turn back on event notification + -- lift $ runEngineM env $ emitBlockSizeEvent ev h (p, h, Just sz) -- TODO: fix this crazy shit main :: IO () main = do @@ -340,7 +426,7 @@ main = do let blk = hasBlock s runPeerM (AnyStorage s) fake p0 $ do runProto @Fake - [ makeResponse (blockSizeProto blk dontHandle) + [ makeResponse (blockSizeProto blk handleBlockInfo) -- , makeResponse (blockChunksProto undefined) ]