From d5d7c6fbb7d2e7bfc4869814d2c0a8696b2ec6bb Mon Sep 17 00:00:00 2001 From: Dmitry Zuikov Date: Sat, 4 Feb 2023 14:11:05 +0300 Subject: [PATCH] added peer ping loop to remove inactive peers --- hbs2-core/lib/HBS2/Net/PeerLocator.hs | 2 + hbs2-core/lib/HBS2/Net/PeerLocator/Static.hs | 2 + hbs2-peer/app/BlockDownload.hs | 5 +- hbs2-peer/app/PeerInfo.hs | 52 ++++++++++++++++++-- hbs2-peer/app/PeerMain.hs | 17 +++++-- 5 files changed, 68 insertions(+), 10 deletions(-) diff --git a/hbs2-core/lib/HBS2/Net/PeerLocator.hs b/hbs2-core/lib/HBS2/Net/PeerLocator.hs index 5073d5a5..9ddd8114 100644 --- a/hbs2-core/lib/HBS2/Net/PeerLocator.hs +++ b/hbs2-core/lib/HBS2/Net/PeerLocator.hs @@ -7,11 +7,13 @@ import HBS2.Net.Proto.Types class PeerLocator e l where knownPeers :: forall m . (HasPeer e, MonadIO m) => l -> m [Peer e] addPeers :: forall m . (HasPeer e, MonadIO m) => l -> [Peer e] -> m () + delPeers :: forall m . (HasPeer e, MonadIO m) => l -> [Peer e] -> m () data AnyPeerLocator e = forall a . PeerLocator e a => AnyPeerLocator a instance HasPeer e => PeerLocator e (AnyPeerLocator e) where knownPeers (AnyPeerLocator l) = knownPeers l addPeers (AnyPeerLocator l) = addPeers l + delPeers (AnyPeerLocator l) = addPeers l diff --git a/hbs2-core/lib/HBS2/Net/PeerLocator/Static.hs b/hbs2-core/lib/HBS2/Net/PeerLocator/Static.hs index 2becc66b..0ffe81d4 100644 --- a/hbs2-core/lib/HBS2/Net/PeerLocator/Static.hs +++ b/hbs2-core/lib/HBS2/Net/PeerLocator/Static.hs @@ -26,5 +26,7 @@ instance Ord (Peer e) => PeerLocator e (StaticPeerLocator e) where addPeers (StaticPeerLocator peers) new = do liftIO $ atomically $ modifyTVar' peers (<> Set.fromList new) + delPeers (StaticPeerLocator peers) del = do + liftIO $ atomically $ modifyTVar' peers (`Set.difference` Set.fromList del) diff --git a/hbs2-peer/app/BlockDownload.hs b/hbs2-peer/app/BlockDownload.hs index f102b22c..644b93c1 100644 --- a/hbs2-peer/app/BlockDownload.hs +++ b/hbs2-peer/app/BlockDownload.hs @@ -314,7 +314,10 @@ downloadFromWithPeer peer thisBkSize h = do ) if not (null catched) then do - liftIO $ atomically $ modifyTVar (view peerDownloaded pinfo) (+chunksN) + liftIO $ atomically do + modifyTVar (view peerDownloaded pinfo) (+chunksN) + writeTVar (view peerPingFailed pinfo) 0 + else do liftIO $ atomically $ modifyTVar (view peerErrors pinfo) succ diff --git a/hbs2-peer/app/PeerInfo.hs b/hbs2-peer/app/PeerInfo.hs index 365419be..ae8a2c65 100644 --- a/hbs2-peer/app/PeerInfo.hs +++ b/hbs2-peer/app/PeerInfo.hs @@ -1,15 +1,24 @@ {-# Language TemplateHaskell #-} +{-# Language AllowAmbiguousTypes #-} module PeerInfo where -import HBS2.Prelude.Plated -import HBS2.Net.Proto.Sessions -import HBS2.Net.Messaging.UDP +import HBS2.Actors.Peer import HBS2.Clock import HBS2.Defaults +import HBS2.Net.Messaging.UDP +import HBS2.Net.PeerLocator +import HBS2.Net.Proto.Peer +import HBS2.Net.Proto.Sessions +import HBS2.Net.Proto.Types +import HBS2.Prelude.Plated +import HBS2.System.Logger.Simple +import Data.Foldable import Lens.Micro.Platform import Control.Concurrent.STM.TVar - +import Control.Concurrent.STM +import Control.Monad +import Prettyprinter data PeerInfo e = PeerInfo @@ -20,6 +29,7 @@ data PeerInfo e = , _peerLastWatched :: TVar TimeSpec , _peerDownloaded :: TVar Int , _peerDownloadedLast :: TVar Int + , _peerPingFailed :: TVar Int } deriving stock (Generic,Typeable) @@ -35,6 +45,7 @@ newPeerInfo = liftIO do <*> newTVarIO 0 <*> newTVarIO 0 <*> newTVarIO 0 + <*> newTVarIO 0 type instance SessionData e (PeerInfo e) = PeerInfo e @@ -48,3 +59,36 @@ instance Expires (SessionKey UDP (PeerInfo UDP)) where expiresIn = const (Just 600) +peerPingLoop :: forall e m . ( HasPeerLocator e m + , HasPeer e + , HasNonces (PeerHandshake e) m + , Nonce (PeerHandshake e) ~ PingNonce + , Request e (PeerHandshake e) m + , Sessions e (PeerHandshake e) m + , Sessions e (PeerInfo e) m + , Pretty (Peer e) + , MonadIO m + ) + => m () +peerPingLoop = forever do + pause @'Minutes 2 -- FIXME: defaults + debug "peerPingLoop" + + pl <- getPeerLocator @e + pips <- knownPeers @e pl + + for_ pips $ \p -> do + npi <- newPeerInfo + pfails <- fetch True npi (PeerInfoKey p) (view peerPingFailed) + liftIO $ atomically $ modifyTVar pfails succ + sendPing @e p + pause @'Seconds 2 -- NOTE: it's okay? + + fnum <- liftIO $ readTVarIO pfails + + when (fnum > 3) do -- FIXME: hardcode! + warn $ "removing peer" <+> pretty p <+> "for not responding to our pings" + delPeers pl [p] + expire (PeerInfoKey p) + + diff --git a/hbs2-peer/app/PeerMain.hs b/hbs2-peer/app/PeerMain.hs index 1da30d2a..f45b9a2c 100644 --- a/hbs2-peer/app/PeerMain.hs +++ b/hbs2-peer/app/PeerMain.hs @@ -28,6 +28,7 @@ import HBS2.System.Logger.Simple qualified as Log import RPC import BlockDownload +import PeerInfo import Data.Maybe import Crypto.Saltine (sodiumInit) @@ -87,10 +88,10 @@ main = do sodiumInit setLogging @DEBUG (set loggerTr ("[debug] " <>)) - setLogging @INFO defLog - setLogging @ERROR defLog - setLogging @WARN defLog - setLogging @NOTICE defLog + setLogging @INFO (set loggerTr ("[info] " <>)) + setLogging @ERROR (set loggerTr ("[error] " <>)) + setLogging @WARN (set loggerTr ("[warn] " <>)) + setLogging @NOTICE (set loggerTr ("[notice] " <>)) withSimpleLogger runCLI @@ -293,10 +294,14 @@ runPeer opts = Exception.handle myException $ do subscribe @UDP AnyKnownPeerEventKey $ \(KnownPeerEvent p d) -> do addPeers pl [p] + + npi <- newPeerInfo + pfails <- fetch True npi (PeerInfoKey p) (view peerPingFailed) + liftIO $ atomically $ writeTVar pfails 0 + debug $ "Got authorized peer!" <+> pretty p <+> pretty (AsBase58 (view peerSignKey d)) - void $ liftIO $ async $ withPeerM env do pause @'Seconds 1 debug "sending first peer announce" @@ -307,6 +312,8 @@ runPeer opts = Exception.handle myException $ do debug "sending local peer announce" request localMulticast (PeerAnnounce @UDP pnonce) + as <- liftIO $ async $ withPeerM env (peerPingLoop @UDP) + as <- liftIO $ async $ withPeerM env (blockDownloadLoop denv) rpc <- liftIO $ async $ withPeerM env $ forever $ do