From 819cec6402ae7b39be2d70fa003f59cabfcf9546 Mon Sep 17 00:00:00 2001 From: Dmitry Zuikov Date: Wed, 19 Jul 2023 08:06:46 +0300 Subject: [PATCH] wip, refchanget proto skeleton, no log merge --- hbs2-core/lib/HBS2/Net/Proto/Definition.hs | 14 +++- hbs2-core/lib/HBS2/Net/Proto/RefChan.hs | 85 +++++++++++++++++++--- hbs2-peer/app/CLI/RefChan.hs | 8 ++ hbs2-peer/app/PeerMain.hs | 8 +- 4 files changed, 104 insertions(+), 11 deletions(-) diff --git a/hbs2-core/lib/HBS2/Net/Proto/Definition.hs b/hbs2-core/lib/HBS2/Net/Proto/Definition.hs index 5f920387..7fd02c68 100644 --- a/hbs2-core/lib/HBS2/Net/Proto/Definition.hs +++ b/hbs2-core/lib/HBS2/Net/Proto/Definition.hs @@ -139,7 +139,19 @@ instance HasProtocol L4Proto (RefChanUpdate L4Proto) where -- мы не можем рассылать одинаковые сообщения никогда, -- ну или хотя бы не чаще, чем раз в 10 минут. - requestPeriodLim = ReqLimPerMessage 600 + -- но poll у нас в минутах, и с минимальным периодом 1 минута + requestPeriodLim = ReqLimPerMessage 60 + +instance HasProtocol L4Proto (RefChanRequest L4Proto) where + type instance ProtocolId (RefChanRequest L4Proto) = 11003 + type instance Encoded L4Proto = ByteString + decode = either (const Nothing) Just . deserialiseOrFail + encode = serialise + + -- мы не можем рассылать одинаковые сообщения никогда, + -- ну или хотя бы не чаще, чем раз в 10 минут. + -- но poll у нас в минутах, и с минимальным периодом 1 минута + requestPeriodLim = ReqLimPerMessage 60 instance Expires (SessionKey L4Proto (BlockInfo L4Proto)) where expiresIn _ = Just defCookieTimeoutSec diff --git a/hbs2-core/lib/HBS2/Net/Proto/RefChan.hs b/hbs2-core/lib/HBS2/Net/Proto/RefChan.hs index 5107913b..75a938a7 100644 --- a/hbs2-core/lib/HBS2/Net/Proto/RefChan.hs +++ b/hbs2-core/lib/HBS2/Net/Proto/RefChan.hs @@ -206,11 +206,17 @@ instance Expires (EventKey e (RefChanRound e)) where data RefChanUpdate e = Propose (RefChanId e) (SignedBox (ProposeTran e) e) -- подписано ключом пира | Accept (RefChanId e) (SignedBox (AcceptTran e) e) -- подписано ключом пира - deriving stock (Generic) instance ForRefChans e => Serialise (RefChanUpdate e) +data RefChanRequest e = + RefChanRequest (RefChanId e) + | RefChanResponse (RefChanId e) HashRef + deriving stock (Generic) + +instance ForRefChans e => Serialise (RefChanRequest e) + type instance SessionData e (RefChanHeadBlock e) = RefChanHeadBlock e newtype instance SessionKey e (RefChanHeadBlock e) = @@ -234,10 +240,18 @@ data RefChanAdapter e m = , refChanWriteTran :: HashRef -> m () } -refChanUpdateChan :: RefChanUpdate e -> RefChanId e -refChanUpdateChan = \case - Propose c _ -> c - Accept c _ -> c +class HasRefChanId e p | p -> e where + getRefChanId :: p -> RefChanId e + +instance HasRefChanId e (RefChanUpdate e) where + getRefChanId = \case + Propose c _ -> c + Accept c _ -> c + +instance HasRefChanId e (RefChanRequest e) where + getRefChanId = \case + RefChanRequest c -> c + RefChanResponse c _ -> c refChanHeadProto :: forall e s m . ( MonadIO m , Request e (RefChanHead e) m @@ -348,7 +362,7 @@ refChanUpdateProto self pc adapter msg = do -- "блок". -- так-то и количество proposers можно ограничить - guard =<< lift (refChanSubscribed adapter (refChanUpdateChan msg)) + guard =<< lift (refChanSubscribed adapter (getRefChanId msg)) let h0 = hashObject @HbSync (serialise msg) guard =<< liftIO (hasBlock sto h0 <&> isNothing) @@ -356,9 +370,6 @@ refChanUpdateProto self pc adapter msg = do case msg of Propose chan box -> do - let h0 = hashObject @HbSync (serialise msg) - guard =<< liftIO (hasBlock sto h0 <&> isNothing) - debug $ "RefChanUpdate/Propose" <+> pretty h0 deferred proto do @@ -534,6 +545,62 @@ refChanUpdateProto self pc adapter msg = do -- Пишем в итоговый лог только такие -- propose + accept у которых больше quorum accept -- каждую транзу обрабатываем только один раз +-- + +refChanRequestProto :: forall e s m . ( MonadIO m + , Request e (RefChanRequest e) m + , Response e (RefChanRequest e) m + , HasDeferred e (RefChanRequest e) m + , IsPeerAddr e m + , Pretty (Peer e) + , Sessions e (KnownPeer e) m + , Sessions e (RefChanHeadBlock e) m + , HasStorage m + , Signatures s + , IsRefPubKey s + , Pretty (AsBase58 (PubKey 'Sign s)) + -- , Serialise (Signature s) + , ForRefChans e + , s ~ Encryption e + ) + => Bool + -> RefChanAdapter e m + -> RefChanRequest e + -> m () + +refChanRequestProto self adapter msg = do + + peer <- thatPeer proto + + auth' <- find (KnownPeerKey peer) id + + sto <- getStorage + + void $ runMaybeT do + + guard (self || isJust auth') + + auth <- MaybeT $ pure auth' + + guard =<< lift (refChanSubscribed adapter (getRefChanId @e msg)) + + case msg of + + RefChanRequest chan -> do + rv <- MaybeT $ liftIO $ getRef sto (RefChanLogKey @s chan) + lift $ response @e (RefChanResponse @e chan (HashRef rv)) + + RefChanResponse chan val -> do + hd <- MaybeT $ getActualRefChanHead @e (RefChanHeadKey @s chan) + let ppk = view peerSignKey auth + + guard $ ppk `HashMap.member` view refChanHeadPeers hd + + debug $ "RefChanResponse" <+> pretty peer <+> pretty (AsBase58 chan) <+> pretty val + + where + proto = Proxy @(RefChanRequest e) + getActualRefChanHead :: forall e s m . ( MonadIO m , Sessions e (RefChanHeadBlock e) m diff --git a/hbs2-peer/app/CLI/RefChan.hs b/hbs2-peer/app/CLI/RefChan.hs index 9d86d4c4..83a63b25 100644 --- a/hbs2-peer/app/CLI/RefChan.hs +++ b/hbs2-peer/app/CLI/RefChan.hs @@ -22,6 +22,7 @@ import Data.Maybe pRefChan :: Parser (IO ()) pRefChan = hsubparser ( command "head" (info pRefChanHead (progDesc "head commands" )) <> command "propose" (info pRefChanPropose (progDesc "post propose transaction")) + <> command "fetch" (info pRefChanFetch (progDesc "fetch and sync refchan value")) <> command "get" (info pRefChanGet (progDesc "get refchan value")) ) @@ -127,5 +128,12 @@ pRefChanGet = do puk <- pure (fromStringMay @(RefChanId L4Proto) sref) `orDie` "can't parse refchan/public key" runRpcCommand opts (REFCHANGET puk) +pRefChanFetch :: Parser (IO ()) +pRefChanFetch = do + opts <- pRpcCommon + sref <- strArgument (metavar "REFCHAH-REF") + pure do + puk <- pure (fromStringMay @(RefChanId L4Proto) sref) `orDie` "can't parse refchan/public key" + runRpcCommand opts (REFCHANFETCH puk) diff --git a/hbs2-peer/app/PeerMain.hs b/hbs2-peer/app/PeerMain.hs index bbcb24a2..4c8b0735 100644 --- a/hbs2-peer/app/PeerMain.hs +++ b/hbs2-peer/app/PeerMain.hs @@ -893,6 +893,7 @@ runPeer opts = U.handle (\e -> myException e , makeResponse (peerMetaProto (mkPeerMeta conf)) , makeResponse (refChanHeadProto False refChanAdapter) , makeResponse (refChanUpdateProto False pc refChanAdapter) + , makeResponse (refChanRequestProto False refChanAdapter) ] void $ liftIO $ waitAnyCancel workers @@ -1034,6 +1035,11 @@ runPeer opts = U.handle (\e -> myException e trace $ "refChanGetAction ANSWER IS" <+> pretty h request who (RPCRefChanGetAnsw @e h) + let refChanFetchAction puk = do + trace $ "refChanFetchAction" <+> pretty (AsBase58 puk) + void $ liftIO $ async $ withPeerM penv $ do + gossip (RefChanRequest @e puk) + let arpc = RpcAdapter pokeAction dieAction dontHandle @@ -1054,7 +1060,7 @@ runPeer opts = U.handle (\e -> myException e dontHandle refChanHeadFetchAction - dontHandle -- rpcOnRefChanFetch + refChanFetchAction refChanGetAction dontHandle -- rpcOnRefChanGetAnsw