From 881b60d6ea8faf49fc9207792e7cebab4e6cf0e5 Mon Sep 17 00:00:00 2001 From: Dmitry Zuikov Date: Sat, 21 Jan 2023 16:27:39 +0300 Subject: [PATCH] wip --- cabal.project | 6 +- hbs2-core/hbs2-core.cabal | 2 +- hbs2-core/lib/HBS2/Actors/OldPeer.hs | 2 +- hbs2-core/lib/HBS2/Actors/Peer.hs | 277 ++++++++++++++++++ hbs2-core/lib/HBS2/Storage.hs | 2 + .../lib/HBS2/Storage/Simple/Extra.hs | 2 +- hbs2-tests/test/Peer2Main.hs | 256 +--------------- 7 files changed, 284 insertions(+), 263 deletions(-) diff --git a/cabal.project b/cabal.project index c029f303..4ae153ca 100644 --- a/cabal.project +++ b/cabal.project @@ -1,6 +1,2 @@ -packages: - hbs2/hbs2.cabal - hbs2-core/hbs2-core.cabal - hbs2-storage-simple/hbs2-storage-simple.cabal - +packages: **/*.cabal diff --git a/hbs2-core/hbs2-core.cabal b/hbs2-core/hbs2-core.cabal index e55763bb..64fb9160 100644 --- a/hbs2-core/hbs2-core.cabal +++ b/hbs2-core/hbs2-core.cabal @@ -66,7 +66,7 @@ library exposed-modules: HBS2.Actors , HBS2.Actors.ChunkWriter - -- , HBS2.Actors.Peer + , HBS2.Actors.Peer , HBS2.Clock , HBS2.Data.Types , HBS2.Data.Types.Refs diff --git a/hbs2-core/lib/HBS2/Actors/OldPeer.hs b/hbs2-core/lib/HBS2/Actors/OldPeer.hs index 8f809003..f04c28d5 100644 --- a/hbs2-core/lib/HBS2/Actors/OldPeer.hs +++ b/hbs2-core/lib/HBS2/Actors/OldPeer.hs @@ -1,6 +1,6 @@ {-# Language TemplateHaskell #-} {-# Language UndecidableInstances #-} -module HBS2.Actors.Peer where +module HBS2.Actors.OldPeer where import HBS2.Prelude import HBS2.Prelude.Plated diff --git a/hbs2-core/lib/HBS2/Actors/Peer.hs b/hbs2-core/lib/HBS2/Actors/Peer.hs index e220dc54..0fdb7d99 100644 --- a/hbs2-core/lib/HBS2/Actors/Peer.hs +++ b/hbs2-core/lib/HBS2/Actors/Peer.hs @@ -1,3 +1,280 @@ +{-# Language TemplateHaskell #-} +{-# Language UndecidableInstances #-} module HBS2.Actors.Peer where +import HBS2.Prelude.Plated +import HBS2.Hash +import HBS2.Actors +import HBS2.Storage +import HBS2.Net.Proto +import HBS2.Net.Messaging +import HBS2.Net.Proto.Sessions +import HBS2.Defaults + +import Codec.Serialise hiding (encode,decode) +import Control.Concurrent.Async +import Control.Monad.Reader +import Data.ByteString.Lazy (ByteString) +import Data.Cache (Cache) +import Data.Cache qualified as Cache +import Data.Dynamic +import Data.Foldable hiding (find) +import Data.Map qualified as Map +import Data.Maybe +import GHC.TypeLits +import Lens.Micro.Platform + +data AnyStorage = forall zu . Storage zu HbSync ByteString IO => AnyStorage zu + +instance (IsKey HbSync, Key HbSync ~ Hash HbSync) => Storage AnyStorage HbSync ByteString IO where + + putBlock (AnyStorage s) = putBlock s + enqueueBlock (AnyStorage s) = enqueueBlock s + getBlock (AnyStorage s) = getBlock s + getChunk (AnyStorage s) = getChunk s + hasBlock (AnyStorage s) = hasBlock s + +class Monad m => HasOwnPeer e m where + ownPeer :: m (Peer e) + +class HasStorage m where + getStorage :: m AnyStorage + +data Fabriq e = forall bus . (Serialise (Encoded e), Messaging bus e ByteString) => Fabriq bus + +class HasFabriq e m where + getFabriq :: m (Fabriq e) + +instance HasPeer e => Messaging (Fabriq e) e ByteString where + sendTo (Fabriq bus) = sendTo bus + receive (Fabriq bus) = receive bus + +data AnyMessage e = AnyMessage Integer (Encoded e) + deriving stock (Generic) + +instance Serialise (Encoded e) => Serialise (AnyMessage e) + + +data AnyProtocol e m = forall p . ( HasProtocol e p + , Response e p m + ) => + AnyProtocol + { myProtoId :: Integer + , protoDecode :: Encoded e -> Maybe p + , protoEncode :: p -> Encoded e + , handle :: p -> m () + } + +makeResponse :: forall e p m . ( MonadIO m + , Response e p m + , HasProtocol e p + ) + => (p -> m ()) -> AnyProtocol e m + +makeResponse h = AnyProtocol { myProtoId = natVal (Proxy @(ProtocolId p)) + , protoDecode = decode + , protoEncode = encode + , handle = h + } + +data PeerEnv e = + PeerEnv + { _envSelf :: Peer e + , _envFab :: Fabriq e + , _envStorage :: AnyStorage + , _envDeferred :: Pipeline IO () + , _envSessions :: Cache SKey Dynamic + } + +newtype PeerM e m a = PeerM { fromPeerM :: ReaderT (PeerEnv e) m a } + deriving newtype ( Functor + , Applicative + , Monad + , MonadReader (PeerEnv e) + , MonadIO + ) + + +newtype ResponseM e m a = ResponseM { fromResponse :: ReaderT (ResponseEnv e) m a } + deriving newtype ( Functor + , Applicative + , Monad + , MonadReader (ResponseEnv e) + , MonadIO + , MonadTrans + ) + +newtype ResponseEnv e = + ResponseEnv + { _answTo :: Peer e + } + +makeLenses 'PeerEnv + +makeLenses 'ResponseEnv + + +runResponseM :: forall e m . (Monad m) + => Peer e + -> ResponseM e m () + -> m () + +runResponseM peer f = runReaderT (fromResponse f) (ResponseEnv peer) + +instance Monad m => HasOwnPeer e (PeerM e m) where + ownPeer = asks (view envSelf) + +instance Monad m => HasFabriq e (PeerM e m) where + getFabriq = asks (view envFab) + +instance Monad m => HasStorage (PeerM e m) where + getStorage = asks (view envStorage) + + +instance ( MonadIO m + , HasProtocol e p + , Eq (SessionKey e p) + , Typeable (SessionKey e p) + , Typeable (SessionData e p) + , Hashable (SessionKey e p) + ) => Sessions e p (PeerM e m) where + + + find k f = do + se <- asks (view envSessions) + let sk = newSKey @(SessionKey e p) k + r <- liftIO $ Cache.lookup se sk + case fromDynamic @(SessionData e p) <$> r of + Just v -> pure $ f <$> v + Nothing -> pure Nothing + + fetch upd de k fn = do + se <- asks (view envSessions) + let sk = newSKey @(SessionKey e p) k + let ddef = toDyn de + + r <- liftIO $ Cache.lookup se sk + + case r of + Just v -> pure $ fn $ fromMaybe de (fromDynamic @(SessionData e p) v ) + Nothing -> do + when upd $ liftIO $ Cache.insert se sk ddef + pure (fn de) + + update de k f = do + se <- asks (view envSessions) + val <- fetch @e @p True de k id + liftIO $ Cache.insert se (newSKey @(SessionKey e p) k) (toDyn (f val)) + + expire k = do + se <- asks (view envSessions) + liftIO $ Cache.delete se (newSKey @(SessionKey e p) k) + + +instance ( MonadIO m + , HasProtocol e p + , HasFabriq e (PeerM e m) + , Serialise (Encoded e) + ) => Request e p (PeerM e m) where + request p msg = do + let proto = protoId @e @p (Proxy @p) + pipe <- getFabriq @e + me <- ownPeer @e + let bs = serialise (AnyMessage @e proto (encode msg)) + sendTo pipe (To p) (From me) bs + +runPeerM :: MonadIO m => AnyStorage -> Fabriq e -> Peer e -> PeerM e m a -> m () +runPeerM s bus p f = do + + env <- PeerEnv p bus s <$> newPipeline defProtoPipelineSize + <*> liftIO (Cache.newCache (Just defCookieTimeout)) + + let de = view envDeferred env + as <- liftIO $ async $ runPipeline de + void $ runReaderT (fromPeerM f) env + void $ liftIO $ stopPipeline de + liftIO $ cancel as + +withPeerM :: MonadIO m => PeerEnv e -> PeerM e m a -> m () +withPeerM env action = void $ runReaderT (fromPeerM action) env + +runProto :: forall e m . ( MonadIO m + , HasOwnPeer e m + , HasFabriq e m + , HasPeer e + , Serialise (Encoded e) + ) + => [AnyProtocol e (ResponseM e m)] + -> m () + +runProto hh = do + me <- ownPeer @e @m + pipe <- getFabriq + + -- defer <- newPipeline @(ResponseM e m ()) @m defProtoPipelineSize + + let resp = [ (pid, a) | a@AnyProtocol { myProtoId = pid } <- hh ] + + let disp = Map.fromList resp + + forever $ do + + messages <- receive pipe (To me) + + for_ messages $ \(From pip, bs) -> do + + case deserialiseOrFail @(AnyMessage e) bs of + + Left _-> pure () + + Right (AnyMessage n msg) -> do + + case Map.lookup n disp of + Nothing -> pure () + + Just (AnyProtocol { protoDecode = decoder + , handle = h + }) -> maybe (pure ()) (runResponseM pip . h) (decoder msg) + +instance ( HasProtocol e p + , Serialise (Encoded e) + , MonadTrans (ResponseM e) + , HasStorage (PeerM e IO) + ) => Response e p (ResponseM e (PeerM e IO)) where + + thatPeer _ = asks (view answTo) + + deferred _ action = do + who <- asks (view answTo) + fab <- lift $ getFabriq @e + pip <- lift $ asks (view envDeferred) + ss <- lift getStorage + liftIO $ addJob pip $ runPeerM ss fab who (runResponseM who action) + + response msg = do + let proto = protoId @e @p (Proxy @p) + who <- asks (view answTo) + self <- lift $ ownPeer @e + fab <- lift $ getFabriq @e + let bs = serialise (AnyMessage @e proto (encode msg)) + sendTo fab (To who) (From self) bs + + + +instance ( MonadIO m + , HasProtocol e p + , Sessions e p m + , Eq (SessionKey e p) + , Typeable (SessionKey e p) + , Typeable (SessionData e p) + , Hashable (SessionKey e p) + ) => Sessions e p (ResponseM e m) where + + find k f = lift (find k f) + + fetch i d k f = lift (fetch i d k f) + + update d k f = lift (update d k f) + + expire k = lift (expire k) diff --git a/hbs2-core/lib/HBS2/Storage.hs b/hbs2-core/lib/HBS2/Storage.hs index 0d33132a..5f05f3a6 100644 --- a/hbs2-core/lib/HBS2/Storage.hs +++ b/hbs2-core/lib/HBS2/Storage.hs @@ -58,3 +58,5 @@ calcChunks s1 s2 = fmap (over _1 fromIntegral . over _2 fromIntegral) chu where chu = fmap (,s2) (takeWhile ( a pieces = 8192 class SimpleStorageExtra a where - putAsMerkle :: forall h . (IsKey h, Hash h ~ Key h, Hashed h ByteString) => SimpleStorage h -> a -> IO MerkleHash + putAsMerkle :: forall h . (IsKey h, Hash h ~ Key h, Hashed h ByteString,Block ByteString~ByteString) => SimpleStorage h -> a -> IO MerkleHash readChunked :: MonadIO m => Handle -> Int -> S.Stream (S.Of ByteString) m () readChunked handle size = fuu diff --git a/hbs2-tests/test/Peer2Main.hs b/hbs2-tests/test/Peer2Main.hs index e2bf33b7..c96fed50 100644 --- a/hbs2-tests/test/Peer2Main.hs +++ b/hbs2-tests/test/Peer2Main.hs @@ -6,6 +6,7 @@ module Main where import HBS2.Actors import HBS2.Actors.ChunkWriter +import HBS2.Actors.Peer import HBS2.Clock import HBS2.Defaults import HBS2.Hash @@ -105,261 +106,6 @@ instance Ord (Peer e) => Default (BlockSizeSession e) where deriving stock instance Show (BlockSizeSession Fake) -class Monad m => HasOwnPeer e m where - ownPeer :: m (Peer e) - - -data AnyStorage = forall zu . Storage zu HbSync ByteString IO => AnyStorage zu - -instance (IsKey HbSync, Key HbSync ~ Hash HbSync) => Storage AnyStorage HbSync ByteString IO where - - putBlock (AnyStorage s) = putBlock s - enqueueBlock (AnyStorage s) = enqueueBlock s - getBlock (AnyStorage s) = getBlock s - getChunk (AnyStorage s) = getChunk s - hasBlock (AnyStorage s) = hasBlock s - -class HasStorage m where - getStorage :: m AnyStorage - -data Fabriq e = forall bus . (Serialise (Encoded e), Messaging bus e ByteString) => Fabriq bus - -class HasFabriq e m where - getFabriq :: m (Fabriq e) - -instance HasPeer e => Messaging (Fabriq e) e ByteString where - sendTo (Fabriq bus) = sendTo bus - receive (Fabriq bus) = receive bus - -data AnyMessage e = AnyMessage Integer (Encoded e) - deriving stock (Generic) - -instance Serialise (Encoded e) => Serialise (AnyMessage e) - - -data AnyProtocol e m = forall p . ( HasProtocol e p - , Response e p m - ) => - AnyProtocol - { myProtoId :: Integer - , protoDecode :: Encoded e -> Maybe p - , protoEncode :: p -> Encoded e - , handle :: p -> m () - } - -makeResponse :: forall e p m . ( MonadIO m - , Response e p m - , HasProtocol e p - ) - => (p -> m ()) -> AnyProtocol e m - -makeResponse h = AnyProtocol { myProtoId = natVal (Proxy @(ProtocolId p)) - , protoDecode = decode - , protoEncode = encode - , handle = h - } - -data PeerEnv e = - PeerEnv - { _envSelf :: Peer e - , _envFab :: Fabriq e - , _envStorage :: AnyStorage - , _envDeferred :: Pipeline IO () - , _envSessions :: Cache SKey Dynamic - } - -newtype PeerM e m a = PeerM { fromPeerM :: ReaderT (PeerEnv e) m a } - deriving newtype ( Functor - , Applicative - , Monad - , MonadReader (PeerEnv e) - , MonadIO - ) - - -newtype ResponseM e m a = ResponseM { fromResponse :: ReaderT (ResponseEnv e) m a } - deriving newtype ( Functor - , Applicative - , Monad - , MonadReader (ResponseEnv e) - , MonadIO - , MonadTrans - ) - -newtype ResponseEnv e = - ResponseEnv - { _answTo :: Peer e - } - -makeLenses 'PeerEnv - -makeLenses 'ResponseEnv - - -runResponseM :: forall e m . (Monad m) - => Peer e - -> ResponseM e m () - -> m () - -runResponseM peer f = runReaderT (fromResponse f) (ResponseEnv peer) - -instance Monad m => HasOwnPeer e (PeerM e m) where - ownPeer = asks (view envSelf) - -instance Monad m => HasFabriq e (PeerM e m) where - getFabriq = asks (view envFab) - -instance Monad m => HasStorage (PeerM e m) where - getStorage = asks (view envStorage) - - -instance ( MonadIO m - , HasProtocol e p - , Eq (SessionKey e p) - , Typeable (SessionKey e p) - , Typeable (SessionData e p) - , Hashable (SessionKey e p) - ) => Sessions e p (PeerM e m) where - - - find k f = do - se <- asks (view envSessions) - let sk = newSKey @(SessionKey e p) k - r <- liftIO $ Cache.lookup se sk - case fromDynamic @(SessionData e p) <$> r of - Just v -> pure $ f <$> v - Nothing -> pure Nothing - - fetch upd de k fn = do - se <- asks (view envSessions) - let sk = newSKey @(SessionKey e p) k - let ddef = toDyn de - - r <- liftIO $ Cache.lookup se sk - - case r of - Just v -> pure $ fn $ fromMaybe de (fromDynamic @(SessionData e p) v ) - Nothing -> do - when upd $ liftIO $ Cache.insert se sk ddef - pure (fn de) - - update de k f = do - se <- asks (view envSessions) - val <- fetch @e @p True de k id - liftIO $ Cache.insert se (newSKey @(SessionKey e p) k) (toDyn (f val)) - - expire k = do - se <- asks (view envSessions) - liftIO $ Cache.delete se (newSKey @(SessionKey e p) k) - - -instance ( MonadIO m - , HasProtocol e p - , HasFabriq e (PeerM e m) - , Serialise (Encoded e) - ) => Request e p (PeerM e m) where - request p msg = do - let proto = protoId @e @p (Proxy @p) - pipe <- getFabriq @e - me <- ownPeer @e - let bs = serialise (AnyMessage @e proto (encode msg)) - sendTo pipe (To p) (From me) bs - -runPeerM :: MonadIO m => AnyStorage -> Fabriq e -> Peer e -> PeerM e m a -> m () -runPeerM s bus p f = do - - env <- PeerEnv p bus s <$> newPipeline defProtoPipelineSize - <*> liftIO (Cache.newCache (Just defCookieTimeout)) - - let de = view envDeferred env - as <- liftIO $ async $ runPipeline de - void $ runReaderT (fromPeerM f) env - void $ liftIO $ stopPipeline de - liftIO $ cancel as - -withPeerM :: MonadIO m => PeerEnv e -> PeerM e m a -> m () -withPeerM env action = void $ runReaderT (fromPeerM action) env - -runProto :: forall e m . ( MonadIO m - , HasOwnPeer e m - , HasFabriq e m - , HasPeer e - , Serialise (Encoded e) - ) - => [AnyProtocol e (ResponseM e m)] - -> m () - -runProto hh = do - me <- ownPeer @e @m - pipe <- getFabriq - - -- defer <- newPipeline @(ResponseM e m ()) @m defProtoPipelineSize - - let resp = [ (pid, a) | a@AnyProtocol { myProtoId = pid } <- hh ] - - let disp = Map.fromList resp - - forever $ do - - messages <- receive pipe (To me) - - for_ messages $ \(From pip, bs) -> do - - case deserialiseOrFail @(AnyMessage e) bs of - - Left _-> pure () - - Right (AnyMessage n msg) -> do - - case Map.lookup n disp of - Nothing -> pure () - - Just (AnyProtocol { protoDecode = decoder - , handle = h - }) -> maybe (pure ()) (runResponseM pip . h) (decoder msg) - -instance ( HasProtocol e p - , Serialise (Encoded e) - , MonadTrans (ResponseM e) - , HasStorage (PeerM e IO) - ) => Response e p (ResponseM e (PeerM e IO)) where - - thatPeer _ = asks (view answTo) - - deferred _ action = do - who <- asks (view answTo) - fab <- lift $ getFabriq @e - pip <- lift $ asks (view envDeferred) - ss <- lift getStorage - liftIO $ addJob pip $ runPeerM ss fab who (runResponseM who action) - - response msg = do - let proto = protoId @e @p (Proxy @p) - who <- asks (view answTo) - self <- lift $ ownPeer @e - fab <- lift $ getFabriq @e - let bs = serialise (AnyMessage @e proto (encode msg)) - sendTo fab (To who) (From self) bs - - - -instance ( MonadIO m - , HasProtocol e p - , Sessions e p m - , Eq (SessionKey e p) - , Typeable (SessionKey e p) - , Typeable (SessionData e p) - , Hashable (SessionKey e p) - ) => Sessions e p (ResponseM e m) where - - find k f = lift (find k f) - - fetch i d k f = lift (fetch i d k f) - - update d k f = lift (update d k f) - - expire k = lift (expire k) - runTestPeer :: Peer Fake -> (SimpleStorage HbSync -> IO ())