From fbf8bd27fbf1454cfd212bf3a6962270b2046028 Mon Sep 17 00:00:00 2001 From: Dmitry Zuikov Date: Sat, 4 Feb 2023 13:14:19 +0300 Subject: [PATCH] rpc ping right way --- hbs2-core/lib/HBS2/Actors/Peer.hs | 3 +- hbs2-core/lib/HBS2/Net/Proto/Peer.hs | 21 ++++++++++-- hbs2-peer/app/BlockDownload.hs | 26 +++------------ hbs2-peer/app/PeerMain.hs | 49 ++++++++++++++++++---------- hbs2-peer/app/RPC.hs | 5 ++- 5 files changed, 60 insertions(+), 44 deletions(-) diff --git a/hbs2-core/lib/HBS2/Actors/Peer.hs b/hbs2-core/lib/HBS2/Actors/Peer.hs index fbd2c305..1ed45e6b 100644 --- a/hbs2-core/lib/HBS2/Actors/Peer.hs +++ b/hbs2-core/lib/HBS2/Actors/Peer.hs @@ -247,8 +247,7 @@ instance ( MonadIO m sendTo pipe (To p) (From me) (AnyMessage @(Encoded e) @e proto (encode msg)) -instance ( HasProtocol e p - , Typeable (EventHandler e p (PeerM e IO)) +instance ( Typeable (EventHandler e p (PeerM e IO)) , Typeable (EventKey e p) , Typeable (Event e p) , Hashable (EventKey e p) diff --git a/hbs2-core/lib/HBS2/Net/Proto/Peer.hs b/hbs2-core/lib/HBS2/Net/Proto/Peer.hs index 8f15cf4b..b990d919 100644 --- a/hbs2-core/lib/HBS2/Net/Proto/Peer.hs +++ b/hbs2-core/lib/HBS2/Net/Proto/Peer.hs @@ -77,6 +77,7 @@ peerHandShakeProto :: forall e m . ( MonadIO m , Pretty (Peer e) , HasCredentials e m , EventEmitter e (PeerHandshake e) m + , EventEmitter e (ConcretePeer e) m ) => PeerHandshake e -> m () @@ -115,14 +116,27 @@ peerHandShakeProto = update (KnownPeer d) (KnownPeerKey pip) id - emit KnownPeerEventKey (KnownPeerEvent pip d) + emit AnyKnownPeerEventKey (KnownPeerEvent pip d) + emit (ConcretePeerKey pip) (ConcretePeerData pip d) where proto = Proxy @(PeerHandshake e) +data ConcretePeer e = ConcretePeer + +newtype instance EventKey e (ConcretePeer e) = + ConcretePeerKey (Peer e) + deriving stock (Generic) + +deriving stock instance (Eq (Peer e)) => Eq (EventKey e (ConcretePeer e)) +instance (Hashable (Peer e)) => Hashable (EventKey e (ConcretePeer e)) + +data instance Event e (ConcretePeer e) = + ConcretePeerData (Peer e) (PeerData e) + deriving stock (Typeable) data instance EventKey e (PeerHandshake e) = - KnownPeerEventKey + AnyKnownPeerEventKey deriving stock (Typeable, Eq,Generic) data instance Event e (PeerHandshake e) = @@ -140,6 +154,9 @@ instance EventType ( Event e ( PeerHandshake e) ) where instance Expires (EventKey e (PeerHandshake e)) where expiresIn _ = Nothing +instance Expires (EventKey e (ConcretePeer e)) where + expiresIn _ = Just 10 + instance Hashable (Peer e) => Hashable (EventKey e (PeerHandshake e)) deriving instance Eq (Peer e) => Eq (SessionKey e (KnownPeer e)) diff --git a/hbs2-peer/app/BlockDownload.hs b/hbs2-peer/app/BlockDownload.hs index 02560827..f102b22c 100644 --- a/hbs2-peer/app/BlockDownload.hs +++ b/hbs2-peer/app/BlockDownload.hs @@ -226,13 +226,7 @@ processBlock h = do if here then do debug $ "block" <+> pretty blk <+> "is already here" processBlock blk -- NOTE: хуже не стало - -- FIXME: processBlock h - -- может быть, в этом причина того, - -- что мы периодически не докачиваем? - -- - -- может быть, нужно рекурсировать, что бы - -- посмотреть, что это за блок и что у нас - -- из него есть? + -- FIXME: fugure out if it's really required pure () -- we don't need to recurse, cause walkMerkle is recursing for us @@ -401,13 +395,12 @@ updatePeerInfo pinfo = do let bu1 = if down - downLast > 0 then max 1 $ min defBurstMax - $ ceiling $ if eps == 0 then - realToFrac bu * 1.05 -- FIXME: to defaults + ceiling $ realToFrac bu * 1.05 -- FIXME: to defaults else - realToFrac bu * 0.65 + floor $ realToFrac bu * 0.65 else - max defBurst $ ceiling (realToFrac bu * 0.65) + max defBurst $ floor (realToFrac bu * 0.65) writeTVar (view peerErrorsLast pinfo) errs writeTVar (view peerLastWatched pinfo) t1 @@ -528,17 +521,6 @@ blockDownloadLoop env0 = do p <- knownPeers @e pl >>= liftIO . shuffleM - -- FIXME: нам не повезло с пиром => сидим ждём defBlockWaitMax и скачивание - -- простаивает. - -- - -- Нужно: сначала запросить всех у кого есть блок. - -- Потом выбрать победителей и попытаться скачать - -- у них, запомнив размер в кэше. - -- - -- Когда находим блоки -- то сразу же асинхронно запрашиваем - -- размеры, что бы по приходу сюда они уже были - - -- debug $ "known peers" <+> pretty p -- debug $ "peers/blocks" <+> pretty peers diff --git a/hbs2-peer/app/PeerMain.hs b/hbs2-peer/app/PeerMain.hs index 046ce81b..1da30d2a 100644 --- a/hbs2-peer/app/PeerMain.hs +++ b/hbs2-peer/app/PeerMain.hs @@ -63,7 +63,7 @@ defLocalMulticast = "239.192.152.145:10153" data RPCCommand = POKE | ANNOUNCE (Hash HbSync) - | PING (PeerAddr UDP) + | PING (PeerAddr UDP) (Maybe (Peer UDP)) | CHECK PeerNonce (PeerAddr UDP) (Hash HbSync) | FETCH (Hash HbSync) @@ -155,7 +155,7 @@ runCLI = join . customExecParser (prefs showHelpOnError) $ pPing = do rpc <- pRpcCommon h <- strArgument ( metavar "ADDR" ) - pure $ runRpcCommand rpc (PING h) + pure $ runRpcCommand rpc (PING h Nothing) myException :: SomeException -> IO () myException e = die ( show e ) >> exitFailure @@ -291,7 +291,7 @@ runPeer opts = Exception.handle myException $ do known <- find (KnownPeerKey pip) id <&> isJust unless known $ sendPing pip - subscribe @UDP KnownPeerEventKey $ \(KnownPeerEvent p d) -> do + subscribe @UDP AnyKnownPeerEventKey $ \(KnownPeerEvent p d) -> do addPeers pl [p] debug $ "Got authorized peer!" <+> pretty p <+> pretty (AsBase58 (view peerSignKey d)) @@ -314,9 +314,15 @@ runPeer opts = Exception.handle myException $ do case cmd of POKE -> debug "on poke: alive and kicking!" - PING s -> do - debug $ "ping" <+> pretty s - pip <- fromPeerAddr @UDP s + PING pa r -> do + debug $ "ping" <+> pretty pa + pip <- fromPeerAddr @UDP pa + subscribe (ConcretePeerKey pip) $ \(ConcretePeerData{}) -> do + + maybe1 r (pure ()) $ \rpcPeer -> do + pinged <- toPeerAddr pip + request rpcPeer (RPCPong @UDP pinged) + sendPing pip ANNOUNCE h -> do @@ -372,7 +378,8 @@ runPeer opts = Exception.handle myException $ do liftIO $ atomically $ writeTQueue rpcQ (ANNOUNCE h) let pingAction pa = do - liftIO $ atomically $ writeTQueue rpcQ (PING pa) + that <- thatPeer (Proxy @(RPC UDP)) + liftIO $ atomically $ writeTQueue rpcQ (PING pa (Just that)) let fetchAction h = do debug $ "fetchAction" <+> pretty h @@ -383,6 +390,7 @@ runPeer opts = Exception.handle myException $ do dontHandle annAction pingAction + dontHandle fetchAction rpc <- async $ runRPC udp1 do @@ -427,7 +435,7 @@ emitToPeer :: ( MonadIO m emitToPeer env k e = liftIO $ withPeerM env (emit k e) withRPC :: String -> RPC UDP -> IO () -withRPC saddr cmd = withSimpleLogger do +withRPC saddr cmd = do as <- parseAddr (fromString saddr) <&> fmap (PeerUDP . addrAddress) let rpc' = headMay $ L.sortBy (compare `on` addrPriority) as @@ -438,11 +446,13 @@ withRPC saddr cmd = withSimpleLogger do mrpc <- async $ runMessagingUDP udp1 + pingQ <- newTQueueIO + prpc <- async $ runRPC udp1 do env <- ask proto <- liftIO $ async $ continueWithRPC env $ do runProto @UDP - [ makeResponse (rpcHandler adapter) + [ makeResponse (rpcHandler (adapter pingQ)) ] request rpc cmd @@ -450,10 +460,14 @@ withRPC saddr cmd = withSimpleLogger do case cmd of RPCAnnounce{} -> pause @'Seconds 0.1 >> liftIO exitSuccess - RPCPing{} -> pause @'Seconds 0.1 >> liftIO exitSuccess - RPCFetch{} -> pause @'Seconds 0.1 >> liftIO exitSuccess + RPCPing{} -> do + void $ liftIO $ void $ race (pause @'Seconds 5 >> exitFailure) do + pa <- liftIO $ atomically $ readTQueue pingQ + notice $ "pong from" <+> pretty pa + exitSuccess + _ -> pure () void $ liftIO $ waitAnyCatchCancel [proto] @@ -461,16 +475,17 @@ withRPC saddr cmd = withSimpleLogger do void $ waitAnyCatchCancel [mrpc, prpc] where - adapter = RpcAdapter dontHandle - (const $ notice "alive-and-kicking" >> liftIO exitSuccess) - (const $ liftIO exitSuccess) - (const $ debug "wat?") - dontHandle + adapter q = RpcAdapter dontHandle + (const $ notice "alive-and-kicking" >> liftIO exitSuccess) + (const $ liftIO exitSuccess) + (const $ notice "ping?") + (liftIO . atomically . writeTQueue q) + dontHandle runRpcCommand :: String -> RPCCommand -> IO () runRpcCommand saddr = \case POKE -> withRPC saddr (RPCPoke @UDP) - PING s -> withRPC saddr (RPCPing s) + PING s _ -> withRPC saddr (RPCPing s) ANNOUNCE h -> withRPC saddr (RPCAnnounce @UDP h) FETCH h -> withRPC saddr (RPCFetch @UDP h) diff --git a/hbs2-peer/app/RPC.hs b/hbs2-peer/app/RPC.hs index 44a87d00..133f6b0e 100644 --- a/hbs2-peer/app/RPC.hs +++ b/hbs2-peer/app/RPC.hs @@ -16,6 +16,7 @@ import Lens.Micro.Platform data RPC e = RPCPoke | RPCPing (PeerAddr e) + | RPCPong (PeerAddr e) | RPCPokeAnswer | RPCAnnounce (Hash HbSync) | RPCFetch (Hash HbSync) @@ -45,6 +46,7 @@ data RpcAdapter e m = , rpcOnPokeAnswer :: RPC e -> m () , rpcOnAnnounce :: Hash HbSync -> m () , rpcOnPing :: PeerAddr e -> m () + , rpcOnPong :: PeerAddr e -> m () , rpcOnFetch :: Hash HbSync -> m () } @@ -86,6 +88,7 @@ rpcHandler adapter = \case p@RPCPoke{} -> rpcOnPoke adapter p >> response (RPCPokeAnswer @e) p@RPCPokeAnswer{} -> rpcOnPokeAnswer adapter p (RPCAnnounce h) -> rpcOnAnnounce adapter h - (RPCPing pa) -> rpcOnPing adapter pa + (RPCPing pa) -> rpcOnPing adapter pa + (RPCPong pa) -> rpcOnPong adapter pa (RPCFetch h) -> rpcOnFetch adapter h