From 0e1959dfe588cf719c4b8ed8ce3cdb73e67a1763 Mon Sep 17 00:00:00 2001 From: Dmitry Zuikov Date: Sat, 15 Jul 2023 06:56:59 +0300 Subject: [PATCH] wip, tryin to fetch refchan head --- hbs2-core/lib/HBS2/Actors/Peer.hs | 4 ++++ hbs2-core/lib/HBS2/Actors/Peer/Types.hs | 6 +++++ hbs2-core/lib/HBS2/Net/Proto/RefChan.hs | 30 ++++++++++++++++++++----- hbs2-core/lib/HBS2/Net/Proto/Types.hs | 4 ++++ hbs2-peer/app/Brains.hs | 7 +++++- hbs2-peer/app/CLI/RefChan.hs | 10 ++++++++- hbs2-peer/app/PeerMain.hs | 7 ++++++ hbs2-peer/app/RPC.hs | 12 ++++++++++ 8 files changed, 73 insertions(+), 7 deletions(-) diff --git a/hbs2-core/lib/HBS2/Actors/Peer.hs b/hbs2-core/lib/HBS2/Actors/Peer.hs index 9d348941..eec945b2 100644 --- a/hbs2-core/lib/HBS2/Actors/Peer.hs +++ b/hbs2-core/lib/HBS2/Actors/Peer.hs @@ -513,3 +513,7 @@ instance (Monad m, HasPeerNonce e m) => HasPeerNonce e (ResponseM e m) where instance (Monad m, HasPeerLocator e m) => HasPeerLocator e (ResponseM e m) where getPeerLocator = lift getPeerLocator +instance (Monad m, HasStorage m) => HasStorage (ResponseM e m) where + getStorage = lift getStorage + + diff --git a/hbs2-core/lib/HBS2/Actors/Peer/Types.hs b/hbs2-core/lib/HBS2/Actors/Peer/Types.hs index ba067fb4..1d959a5d 100644 --- a/hbs2-core/lib/HBS2/Actors/Peer/Types.hs +++ b/hbs2-core/lib/HBS2/Actors/Peer/Types.hs @@ -3,6 +3,8 @@ module HBS2.Actors.Peer.Types where import HBS2.Storage import HBS2.Hash +import Control.Monad.Trans.Class +import Control.Monad.Trans.Maybe import Data.ByteString.Lazy (ByteString) @@ -26,3 +28,7 @@ class HasStorage m where getStorage :: m AnyStorage +instance (Monad m, HasStorage m) => HasStorage (MaybeT m) where + getStorage = lift getStorage + + diff --git a/hbs2-core/lib/HBS2/Net/Proto/RefChan.hs b/hbs2-core/lib/HBS2/Net/Proto/RefChan.hs index 783d2695..c187a7ed 100644 --- a/hbs2-core/lib/HBS2/Net/Proto/RefChan.hs +++ b/hbs2-core/lib/HBS2/Net/Proto/RefChan.hs @@ -11,8 +11,11 @@ import HBS2.Net.Auth.Credentials import HBS2.Base58 -- import HBS2.Events import HBS2.Net.Proto.Peer +import HBS2.Net.Proto.BlockAnnounce import HBS2.Net.Proto.Sessions import HBS2.Data.Types.Refs +import HBS2.Actors.Peer.Types +import HBS2.Storage import Data.Config.Suckless @@ -111,11 +114,16 @@ data RefChanHeadAdapter e m = refChanHeadProto :: forall e s m . ( MonadIO m , Request e (RefChanHead e) m + , Request e (BlockAnnounce e) m , Response e (RefChanHead e) m + , HasPeerNonce e m + , HasDeferred e (RefChanHead e) m , IsPeerAddr e m , Pretty (Peer e) , Sessions e (KnownPeer e) m + , HasStorage m , Signatures s + , IsRefPubKey s , Pretty (AsBase58 (PubKey 'Sign s)) , s ~ Encryption e ) @@ -126,10 +134,12 @@ refChanHeadProto :: forall e s m . ( MonadIO m refChanHeadProto self adapter msg = do -- авторизовать пира - peer <- thatPeer (Proxy @(RefChanHead e)) + peer <- thatPeer proto auth <- find (KnownPeerKey peer) id <&> isJust + no <- peerNonce @e + void $ runMaybeT do guard (auth || self) @@ -141,10 +151,20 @@ refChanHeadProto self adapter msg = do -- FIXME: check-chan-is-listened lift $ refChanHeadOnHead adapter chan pkt - RefChanGetHead _ -> do - -- прочитать ссылку - -- послать хэш головы - pure () + RefChanGetHead chan -> deferred proto do + trace $ "RefChanGetHead" <+> pretty self <+> pretty (AsBase58 chan) + + sto <- getStorage + ref <- MaybeT $ liftIO $ getRef sto (RefChanHeadKey @s chan) + sz <- MaybeT $ liftIO $ hasBlock sto ref + + let annInfo = BlockAnnounceInfo 0 NoBlockInfoMeta sz ref + let announce = BlockAnnounce @e no annInfo + lift $ request peer announce + lift $ request peer (RefChanHead @e chan (RefChanHeadBlockTran (HashRef ref))) + + where + proto = Proxy @(RefChanHead e) makeSignedBox :: forall e p . (Serialise p, ForRefChans e, Signatures (Encryption e)) => PubKey 'Sign (Encryption e) diff --git a/hbs2-core/lib/HBS2/Net/Proto/Types.hs b/hbs2-core/lib/HBS2/Net/Proto/Types.hs index 83a10916..aa4d622f 100644 --- a/hbs2-core/lib/HBS2/Net/Proto/Types.hs +++ b/hbs2-core/lib/HBS2/Net/Proto/Types.hs @@ -23,6 +23,7 @@ import Network.Socket import System.Random qualified as Random import Codec.Serialise import Data.Maybe +import Control.Monad.Trans.Maybe -- e -> Transport (like, UDP or TChan) -- p -> L4 Protocol (like Ping/Pong) @@ -92,6 +93,9 @@ class (Monad m, HasProtocol e p) => HasThatPeer e p (m :: Type -> Type) where class (MonadIO m, HasProtocol e p) => HasDeferred e p m | p -> e where deferred :: Proxy p -> m () -> m () +-- TODO: actually-no-idea-if-it-works +instance (HasDeferred e p m, Monad m) => HasDeferred e p (MaybeT m) where + deferred p a = lift $ deferred p (void $ runMaybeT a) class ( MonadIO m , HasProtocol e p diff --git a/hbs2-peer/app/Brains.hs b/hbs2-peer/app/Brains.hs index d28dbb65..e265e9c8 100644 --- a/hbs2-peer/app/Brains.hs +++ b/hbs2-peer/app/Brains.hs @@ -6,6 +6,7 @@ module Brains where import HBS2.Prelude.Plated import HBS2.Clock import HBS2.Data.Types.Refs +import HBS2.Net.Proto.RefChan(ForRefChans) import HBS2.Net.Proto import HBS2.Hash import HBS2.Base58 @@ -201,6 +202,7 @@ cleanupPostponed b h = do instance ( Hashable (Peer e) , Pretty (Peer e), Pretty (PeerAddr e) , e ~ L4Proto + , ForRefChans e ) => HasBrains e (BasicBrains e) where onClientTCPConnected br pa@(L4Address proto _) ssid = do @@ -736,7 +738,10 @@ newBasicBrains cfg = liftIO do <*> newTQueueIO <*> newTQueueIO -runBasicBrains :: forall e m . ( e ~ L4Proto, MonadUnliftIO m ) +runBasicBrains :: forall e m . ( e ~ L4Proto + , MonadUnliftIO m + , ForRefChans e + ) => PeerConfig -> BasicBrains e -> m () diff --git a/hbs2-peer/app/CLI/RefChan.hs b/hbs2-peer/app/CLI/RefChan.hs index cd85dc84..cdc5d084 100644 --- a/hbs2-peer/app/CLI/RefChan.hs +++ b/hbs2-peer/app/CLI/RefChan.hs @@ -26,6 +26,7 @@ pRefChanHead :: Parser (IO ()) pRefChanHead = hsubparser ( command "gen" (info pRefChanHeadGen (progDesc "generate head blob")) <> command "dump" (info pRefChanHeadDump (progDesc "dump head blob")) <> command "post" (info pRefChanHeadPost (progDesc "post head transaction")) + <> command "fetch" (info pRefChanHeadFetch (progDesc "fetch head from neighbours")) <> command "get" (info pRefChanHeadGet (progDesc "get head value")) ) @@ -66,7 +67,6 @@ pRpcCommon = do RPCOpt <$> optional confOpt <*> optional rpcOpt - pRefChanHeadPost :: Parser (IO ()) pRefChanHeadPost = do opts <- pRpcCommon @@ -75,6 +75,14 @@ pRefChanHeadPost = do href <- pure (fromStringMay ref) `orDie` "HEAD-BLOCK-TREE-HASH" runRpcCommand opts (REFCHANHEADSEND href) +pRefChanHeadFetch :: Parser (IO ()) +pRefChanHeadFetch = do + opts <- pRpcCommon + ref <- strArgument (metavar "REFCHAH-HEAD-REF") + pure $ do + href <- pure (fromStringMay ref) `orDie` "invalid REFCHAN-HEAD-REF" + runRpcCommand opts (REFCHANHEADFETCH href) + pRefChanHeadGet :: Parser (IO ()) pRefChanHeadGet = do diff --git a/hbs2-peer/app/PeerMain.hs b/hbs2-peer/app/PeerMain.hs index e1f1fd35..46065663 100644 --- a/hbs2-peer/app/PeerMain.hs +++ b/hbs2-peer/app/PeerMain.hs @@ -890,6 +890,7 @@ runPeer opts = U.handle (\e -> myException e , makeResponse (refLogUpdateProto reflogAdapter) , makeResponse (refLogRequestProto reflogReqAdapter) , makeResponse (peerMetaProto (mkPeerMeta conf)) + , makeResponse (refChanHeadProto False refChanHeadAdapter) ] void $ liftIO $ waitAnyCancel workers @@ -1001,6 +1002,11 @@ runPeer opts = U.handle (\e -> myException e h <- liftIO $ getRef sto (RefChanHeadKey @(Encryption e) puk) request who (RPCRefChanHeadGetAnsw @e h) + let refChanHeadFetchAction puk = do + trace "reChanFetchAction" + void $ liftIO $ async $ withPeerM penv $ do + broadCastMessage (RefChanGetHead @e puk) + let arpc = RpcAdapter pokeAction dieAction dontHandle @@ -1019,6 +1025,7 @@ runPeer opts = U.handle (\e -> myException e refChanHeadSendAction -- rpcOnRefChanHeadSend refChanHeadGetAction dontHandle + refChanHeadFetchAction rpc <- async $ runRPC udp1 do runProto @e diff --git a/hbs2-peer/app/RPC.hs b/hbs2-peer/app/RPC.hs index 19579ac8..c4c84c22 100644 --- a/hbs2-peer/app/RPC.hs +++ b/hbs2-peer/app/RPC.hs @@ -63,6 +63,7 @@ data RPCCommand = | REFLOGGET (PubKey 'Sign (Encryption L4Proto)) | REFCHANHEADSEND (Hash HbSync) | REFCHANHEADGET (PubKey 'Sign (Encryption L4Proto)) + | REFCHANHEADFETCH (PubKey 'Sign (Encryption L4Proto)) data RPC e = RPCDie @@ -83,6 +84,7 @@ data RPC e = | RPCRefChanHeadSend (Hash HbSync) | RPCRefChanHeadGet (PubKey 'Sign (Encryption e)) | RPCRefChanHeadGetAnsw (Maybe (Hash HbSync)) + | RPCRefChanHeadFetch (PubKey 'Sign (Encryption e)) deriving stock (Generic) @@ -123,6 +125,7 @@ data RpcAdapter e m = , rpcOnRefChanHeadSend :: Hash HbSync -> m () , rpcOnRefChanHeadGet :: PubKey 'Sign (Encryption e) -> m () , rpcOnRefChanHeadGetAnsw :: Maybe (Hash HbSync) -> m () + , rpcOnRefChanHeadFetch :: PubKey 'Sign (Encryption e) -> m () } newtype RpcM m a = RpcM { fromRpcM :: ReaderT RPCEnv m a } @@ -181,6 +184,7 @@ rpcHandler adapter = \case (RPCRefChanHeadSend s) -> rpcOnRefChanHeadSend adapter s (RPCRefChanHeadGet s) -> rpcOnRefChanHeadGet adapter s (RPCRefChanHeadGetAnsw s) -> rpcOnRefChanHeadGetAnsw adapter s + (RPCRefChanHeadFetch s) -> rpcOnRefChanHeadFetch adapter s data RPCOpt = RPCOpt @@ -205,6 +209,7 @@ runRpcCommand opt = \case REFLOGGET k -> withRPC opt (RPCRefLogGet k) REFCHANHEADSEND h -> withRPC opt (RPCRefChanHeadSend h) REFCHANHEADGET s -> withRPC opt (RPCRefChanHeadGet s) + REFCHANHEADFETCH s -> withRPC opt (RPCRefChanHeadFetch s) _ -> pure () @@ -266,6 +271,9 @@ withRPC o cmd = rpcClientMain o $ runResourceT do (liftIO . putMVar rchanheadMVar) -- rpcOnRefChanHeadGetAnsw + dontHandle -- rpcOnRefChanHeadFetch + + prpc <- async $ runRPC udp1 do env <- ask proto <- liftIO $ async $ continueWithRPC env $ do @@ -341,6 +349,10 @@ withRPC o cmd = rpcClientMain o $ runResourceT do _ -> exitFailure + RPCRefChanHeadFetch {} -> liftIO do + pause @'Seconds 0.25 + exitSuccess + _ -> pure () void $ liftIO $ waitAnyCancel [proto]