From 61a44eb544ce012fff6bcb315992a411ddffaec9 Mon Sep 17 00:00:00 2001 From: Dmitry Zuikov Date: Mon, 6 Feb 2023 11:24:07 +0300 Subject: [PATCH] basic PEX has glitches; it's needed to determine when same peer shows up under different address (like in case of NAT) --- docs/devlog.md | 24 ++++ hbs2-core/hbs2-core.cabal | 1 + hbs2-core/lib/HBS2/Actors/Peer.hs | 5 +- hbs2-core/lib/HBS2/Defaults.hs | 3 + hbs2-core/lib/HBS2/Net/PeerLocator.hs | 15 +- hbs2-core/lib/HBS2/Net/PeerLocator/Static.hs | 35 +++-- hbs2-core/lib/HBS2/Net/Proto/Definition.hs | 13 ++ hbs2-core/lib/HBS2/Net/Proto/Peer.hs | 30 ++-- hbs2-core/lib/HBS2/Net/Proto/PeerExchange.hs | 137 ++++++++++++++++++ hbs2-peer/app/BlockDownload.hs | 35 +++-- hbs2-peer/app/PeerInfo.hs | 90 +++++++++--- hbs2-peer/app/PeerMain.hs | 139 +++++++++++-------- nix/peer/flake.lock | 13 +- nix/peer/flake.nix | 2 +- 14 files changed, 420 insertions(+), 122 deletions(-) create mode 100644 hbs2-core/lib/HBS2/Net/Proto/PeerExchange.hs diff --git a/docs/devlog.md b/docs/devlog.md index 85214f42..4970fef7 100644 --- a/docs/devlog.md +++ b/docs/devlog.md @@ -1,6 +1,30 @@ ## 2023-02-06 +Ну а так, базовый PEX заработал + +TODO: Добавлять пиров в KnownPeers + только после того, как они + пинганулись. Т.е пинговать + пиров, если их еще нет. + Не добавлять в KnownPeers до + того, как ответили на пинг. + +TODO: Научиться убирать дубликаты пиров. + Их можно распознать по PeerNonce, + но непонятно, какой из пиров + оставлять. + Иначе это будет реально большая + проблема при скачивании. + + +TODO: Убедиться, что subscribe на перманентное + событие НИКОГДА не вызывается в рекурсии. + Проверить ВСЕ subscribe. + Возможно, вставить проверки в рантайм. + Возможно, ограничить число таких событий + и ругаться в рантайме. + FIXME: При вычислении burst надо каким-то образом находить плато и не лезть выше него. diff --git a/hbs2-core/hbs2-core.cabal b/hbs2-core/hbs2-core.cabal index 5f139390..407ec0f1 100644 --- a/hbs2-core/hbs2-core.cabal +++ b/hbs2-core/hbs2-core.cabal @@ -91,6 +91,7 @@ library , HBS2.Net.Proto.Definition , HBS2.Net.Proto.Peer , HBS2.Net.Proto.PeerAnnounce + , HBS2.Net.Proto.PeerExchange , HBS2.Net.Proto.Sessions , HBS2.Net.Proto.Types , HBS2.OrDie diff --git a/hbs2-core/lib/HBS2/Actors/Peer.hs b/hbs2-core/lib/HBS2/Actors/Peer.hs index fd5382ec..fc4d73a0 100644 --- a/hbs2-core/lib/HBS2/Actors/Peer.hs +++ b/hbs2-core/lib/HBS2/Actors/Peer.hs @@ -57,9 +57,6 @@ data AnyMessage enc e = AnyMessage !Integer !(Encoded e) class Monad m => HasOwnPeer e m where ownPeer :: m (Peer e) -class Monad m => HasPeerLocator e m where - getPeerLocator :: m (AnyPeerLocator e) - class HasStorage m where getStorage :: m AnyStorage @@ -464,4 +461,6 @@ instance (Monad m, HasFabriq e m) => HasFabriq e (ResponseM e m) where instance (Monad m, HasPeerNonce e m) => HasPeerNonce e (ResponseM e m) where peerNonce = lift $ peerNonce @e +instance (Monad m, HasPeerLocator e m) => HasPeerLocator e (ResponseM e m) where + getPeerLocator = lift getPeerLocator diff --git a/hbs2-core/lib/HBS2/Defaults.hs b/hbs2-core/lib/HBS2/Defaults.hs index c8fbefdb..85e3125d 100644 --- a/hbs2-core/lib/HBS2/Defaults.hs +++ b/hbs2-core/lib/HBS2/Defaults.hs @@ -70,5 +70,8 @@ defSweepTimeout = 30 -- FIXME: only for debug! defPeerAnnounceTime :: Timeout 'Seconds defPeerAnnounceTime = 120 +defPexMaxPeers :: Int +defPexMaxPeers = 50 + diff --git a/hbs2-core/lib/HBS2/Net/PeerLocator.hs b/hbs2-core/lib/HBS2/Net/PeerLocator.hs index e3041a84..ab6182f0 100644 --- a/hbs2-core/lib/HBS2/Net/PeerLocator.hs +++ b/hbs2-core/lib/HBS2/Net/PeerLocator.hs @@ -4,10 +4,14 @@ module HBS2.Net.PeerLocator where import HBS2.Prelude import HBS2.Net.Proto.Types +import System.Random.Shuffle (shuffleM) + class PeerLocator e l where - knownPeers :: forall m . (HasPeer e, MonadIO m) => l -> m [Peer e] - addPeers :: forall m . (HasPeer e, MonadIO m) => l -> [Peer e] -> m () - delPeers :: forall m . (HasPeer e, MonadIO m) => l -> [Peer e] -> m () + knownPeers :: forall m . (HasPeer e, MonadIO m) => l -> m [Peer e] + addPeers :: forall m . (HasPeer e, MonadIO m) => l -> [Peer e] -> m () + delPeers :: forall m . (HasPeer e, MonadIO m) => l -> [Peer e] -> m () + bestPeers :: forall m . (HasPeer e, MonadIO m) => l -> m [Peer e] + addExcluded :: forall m . (HasPeer e, MonadIO m) => l -> [Peer e] -> m () data AnyPeerLocator e = forall a . PeerLocator e a => AnyPeerLocator a @@ -15,5 +19,10 @@ instance HasPeer e => PeerLocator e (AnyPeerLocator e) where knownPeers (AnyPeerLocator l) = knownPeers l addPeers (AnyPeerLocator l) = addPeers l delPeers (AnyPeerLocator l) = delPeers l + addExcluded (AnyPeerLocator l) = addExcluded l + -- FIXME: a better algorithm of choice + bestPeers (AnyPeerLocator l) = liftIO $ knownPeers l >>= shuffleM +class Monad m => HasPeerLocator e m where + getPeerLocator :: m (AnyPeerLocator e) diff --git a/hbs2-core/lib/HBS2/Net/PeerLocator/Static.hs b/hbs2-core/lib/HBS2/Net/PeerLocator/Static.hs index 04112a4c..c96939db 100644 --- a/hbs2-core/lib/HBS2/Net/PeerLocator/Static.hs +++ b/hbs2-core/lib/HBS2/Net/PeerLocator/Static.hs @@ -1,4 +1,8 @@ -module HBS2.Net.PeerLocator.Static where +module HBS2.Net.PeerLocator.Static + ( StaticPeerLocator + , newStaticPeerLocator + , PeerLocator() + ) where import HBS2.Prelude import HBS2.Net.Proto @@ -8,27 +12,36 @@ import Control.Concurrent.STM import Data.Set (Set) import Data.Set qualified as Set -import Prettyprinter -newtype StaticPeerLocator e = - StaticPeerLocator (TVar (Set (Peer e))) +data StaticPeerLocator e = + StaticPeerLocator + { include :: TVar (Set (Peer e)) + , exclude :: TVar (Set (Peer e)) + } newStaticPeerLocator :: (Ord (Peer p), HasPeer p, MonadIO m) => [Peer p] -> m (StaticPeerLocator p) newStaticPeerLocator seeds = do - tv <- liftIO $ newTVarIO (Set.fromList seeds) - pure $ StaticPeerLocator tv + tv <- liftIO $ newTVarIO (Set.fromList seeds) + tv2 <- liftIO $ newTVarIO mempty + pure $ StaticPeerLocator tv tv2 instance (Ord (Peer e), Pretty (Peer e)) => PeerLocator e (StaticPeerLocator e) where - knownPeers (StaticPeerLocator peers) = do + knownPeers (StaticPeerLocator peers e) = do ps <- liftIO $ readTVarIO peers - pure $ Set.toList ps + excl <- liftIO $ readTVarIO e + pure $ Set.toList (ps `Set.difference` excl) - addPeers (StaticPeerLocator peers) new = do - liftIO $ atomically $ modifyTVar' peers (<> Set.fromList new) + addPeers (StaticPeerLocator peers te) new = do + excl <- liftIO $ readTVarIO te + liftIO $ atomically $ modifyTVar' peers ((`Set.difference` excl) . (<> Set.fromList new)) - delPeers (StaticPeerLocator peers) del = do + delPeers (StaticPeerLocator peers _) del = do liftIO $ atomically $ modifyTVar' peers (`Set.difference` Set.fromList del) + addExcluded p excl = do + liftIO $ atomically $ modifyTVar' (exclude p) (<> Set.fromList excl) + + bestPeers = knownPeers diff --git a/hbs2-core/lib/HBS2/Net/Proto/Definition.hs b/hbs2-core/lib/HBS2/Net/Proto/Definition.hs index 4feace58..952f8b05 100644 --- a/hbs2-core/lib/HBS2/Net/Proto/Definition.hs +++ b/hbs2-core/lib/HBS2/Net/Proto/Definition.hs @@ -15,6 +15,7 @@ import HBS2.Net.Proto.BlockChunks import HBS2.Net.Proto.BlockInfo import HBS2.Net.Proto.Peer import HBS2.Net.Proto.PeerAnnounce +import HBS2.Net.Proto.PeerExchange import HBS2.Defaults import Data.Functor @@ -65,6 +66,12 @@ instance HasProtocol UDP (PeerAnnounce UDP) where decode = either (const Nothing) Just . deserialiseOrFail encode = serialise +instance HasProtocol UDP (PeerExchange UDP) where + type instance ProtocolId (PeerExchange UDP) = 6 + type instance Encoded UDP = ByteString + decode = either (const Nothing) Just . deserialiseOrFail + encode = serialise + instance Expires (SessionKey UDP (BlockInfo UDP)) where expiresIn _ = Just defCookieTimeoutSec @@ -94,6 +101,12 @@ instance MonadIO m => HasNonces (PeerHandshake UDP) m where n <- liftIO ( Crypto.newNonce <&> Crypto.encode ) pure $ BS.take 32 n +instance MonadIO m => HasNonces (PeerExchange UDP) m where + type instance Nonce (PeerExchange UDP) = BS.ByteString + newNonce = do + n <- liftIO ( Crypto.newNonce <&> Crypto.encode ) + pure $ BS.take 32 n + instance MonadIO m => HasNonces () m where type instance Nonce () = BS.ByteString newNonce = do diff --git a/hbs2-core/lib/HBS2/Net/Proto/Peer.hs b/hbs2-core/lib/HBS2/Net/Proto/Peer.hs index dbe1dcc4..eac90479 100644 --- a/hbs2-core/lib/HBS2/Net/Proto/Peer.hs +++ b/hbs2-core/lib/HBS2/Net/Proto/Peer.hs @@ -10,6 +10,8 @@ import HBS2.Clock import HBS2.Net.Proto.Sessions import HBS2.Prelude.Plated +import HBS2.System.Logger.Simple + import Data.Maybe import Codec.Serialise() import Data.ByteString qualified as BS @@ -31,7 +33,7 @@ makeLenses 'PeerData data PeerHandshake e = PeerPing PingNonce - | PeerPong (Signature e) (PeerData e) + | PeerPong PingNonce (Signature e) (PeerData e) deriving stock (Generic) newtype KnownPeer e = KnownPeer (PeerData e) @@ -41,10 +43,10 @@ newtype instance SessionKey e (KnownPeer e) = KnownPeerKey (Peer e) deriving stock (Generic,Typeable) -type instance SessionData e (KnownPeer e) = KnownPeer e +type instance SessionData e (KnownPeer e) = PeerData e newtype instance SessionKey e (PeerHandshake e) = - PeerHandshakeKey (Peer e) + PeerHandshakeKey (PingNonce, Peer e) deriving stock (Generic, Typeable) type instance SessionData e (PeerHandshake e) = PingNonce @@ -61,7 +63,7 @@ sendPing :: forall e m . ( MonadIO m sendPing pip = do nonce <- newNonce @(PeerHandshake e) - update nonce (PeerHandshakeKey pip) id + update nonce (PeerHandshakeKey (nonce,pip)) id request pip (PeerPing @e nonce) peerHandShakeProto :: forall e m . ( MonadIO m @@ -93,7 +95,7 @@ peerHandShakeProto = own <- peerNonce @e -- TODO: отправить обратно вместе с публичным ключом - response (PeerPong @e sign (PeerData (view peerSignPk creds) own)) + response (PeerPong @e nonce sign (PeerData (view peerSignPk creds) own)) -- TODO: да и пингануть того самим @@ -102,10 +104,10 @@ peerHandShakeProto = unless se $ do sendPing pip - PeerPong sign d -> do + PeerPong nonce0 sign d -> do pip <- thatPeer proto - se' <- find @e (PeerHandshakeKey pip) id + se' <- find @e (PeerHandshakeKey (nonce0,pip)) id maybe1 se' (pure ()) $ \nonce -> do @@ -113,14 +115,16 @@ peerHandShakeProto = let signed = verifySign @e pk sign nonce - expire (PeerHandshakeKey pip) + when signed $ do - -- FIXME: check if peer is blacklisted - -- right here - update (KnownPeer d) (KnownPeerKey pip) id + expire (PeerHandshakeKey (nonce0,pip)) - emit AnyKnownPeerEventKey (KnownPeerEvent pip d) - emit (ConcretePeerKey pip) (ConcretePeerData pip d) + -- FIXME: check if peer is blacklisted + -- right here + update d (KnownPeerKey pip) id + + emit AnyKnownPeerEventKey (KnownPeerEvent pip d) + emit (ConcretePeerKey pip) (ConcretePeerData pip d) where proto = Proxy @(PeerHandshake e) diff --git a/hbs2-core/lib/HBS2/Net/Proto/PeerExchange.hs b/hbs2-core/lib/HBS2/Net/Proto/PeerExchange.hs new file mode 100644 index 00000000..9ec42e1b --- /dev/null +++ b/hbs2-core/lib/HBS2/Net/Proto/PeerExchange.hs @@ -0,0 +1,137 @@ +{-# Language UndecidableInstances #-} +module HBS2.Net.Proto.PeerExchange where + +import HBS2.Prelude.Plated +import HBS2.Net.Proto +import HBS2.Net.Proto.Peer +import HBS2.Net.PeerLocator +import HBS2.Net.Proto.Sessions +import HBS2.Events +import HBS2.Clock +import HBS2.Defaults + +import Data.ByteString qualified as BS +import Data.Traversable +import Data.Functor +import Data.Maybe +import Codec.Serialise +import Data.Hashable +import Type.Reflection + +import HBS2.System.Logger.Simple +import Prettyprinter + +data PeerExchange e = + PeerExchangeGet (Nonce (PeerExchange e)) + | PeerExchangePeers (Nonce (PeerExchange e)) [PeerAddr e] + deriving stock (Generic, Typeable) + +data PeerExchangePeersEv e + + + +sendPeerExchangeGet :: forall e m . ( MonadIO m + , HasNonces (PeerExchange e) m + , Request e (PeerExchange e) m + , Sessions e (PeerExchange e) m + ) + => Peer e -> m () + +sendPeerExchangeGet pip = do + nonce <- newNonce @(PeerExchange e) + update nonce (PeerExchangeKey @e nonce) id + request pip (PeerExchangeGet @e nonce) + +peerExchangeProto :: forall e m . ( MonadIO m + , Response e (PeerExchange e) m + , HasPeerLocator e m + , HasDeferred e (PeerExchange e) m + , HasNonces (PeerExchange e) m + , IsPeerAddr e m + , Sessions e (KnownPeer e) m + , Sessions e (PeerExchange e) m + , EventEmitter e (PeerExchangePeersEv e) m + , Eq (Nonce (PeerExchange e)) + , Pretty (Peer e) + ) + => PeerExchange e -> m () + +peerExchangeProto = + \case + PeerExchangeGet n -> deferred proto do + -- TODO: sort peers by their usefulness + + that <- thatPeer proto + + debug $ "PeerExchangeGet" <+> "from" <+> pretty that + + pl <- getPeerLocator @e + pips <- knownPeers @e pl + + pa' <- forM pips $ \p -> do + auth <- find (KnownPeerKey p) id <&> isJust + if auth then do + a <- toPeerAddr p + pure [a] + else + pure mempty + + let pa = take defPexMaxPeers $ mconcat pa' + + response (PeerExchangePeers @e n pa) + + PeerExchangePeers nonce pips -> do + + pip <- thatPeer proto + + ok <- find (PeerExchangeKey @e nonce) id <&> isJust + + when ok do + sa <- mapM (fromPeerAddr @e) pips + debug $ "got pex" <+> "from" <+> pretty pip <+> pretty sa + expire @e (PeerExchangeKey nonce) + emit @e PeerExchangePeersKey (PeerExchangePeersData sa) + + where + proto = Proxy @(PeerExchange e) + + +newtype instance SessionKey e (PeerExchange e) = + PeerExchangeKey (Nonce (PeerExchange e)) + deriving stock (Generic, Typeable) + +type instance SessionData e (PeerExchange e) = Nonce (PeerExchange e) + +data instance EventKey e (PeerExchangePeersEv e) = + PeerExchangePeersKey + deriving stock (Typeable, Eq,Generic) + +deriving instance Eq (Nonce (PeerExchange e)) => Eq (SessionKey e (PeerExchange e)) +instance Hashable (Nonce (PeerExchange e)) => Hashable (SessionKey e (PeerExchange e)) + +instance Expires (SessionKey e (PeerExchange e)) where + expiresIn _ = Just 10 + + +instance Typeable (PeerExchangePeersEv e) + => Hashable (EventKey e (PeerExchangePeersEv e)) where + hashWithSalt salt _ = hashWithSalt salt (someTypeRep p) + where + p = Proxy @(PeerExchangePeersEv e) + +instance EventType ( Event e ( PeerExchangePeersEv e) ) where + isPersistent = True + +instance Expires (EventKey e (PeerExchangePeersEv e)) where + expiresIn _ = Nothing + +newtype instance Event e (PeerExchangePeersEv e) = + PeerExchangePeersData [Peer e] + deriving stock (Typeable) + +instance ( Serialise (PeerAddr e) + , Serialise (Nonce (PeerExchange e))) + + => Serialise (PeerExchange e) + + diff --git a/hbs2-peer/app/BlockDownload.hs b/hbs2-peer/app/BlockDownload.hs index 34106d70..07d90417 100644 --- a/hbs2-peer/app/BlockDownload.hs +++ b/hbs2-peer/app/BlockDownload.hs @@ -22,15 +22,16 @@ import HBS2.System.Logger.Simple import PeerInfo -import Numeric ( showGFloat ) import Control.Concurrent.Async import Control.Concurrent.STM import Control.Monad.Reader import Control.Monad.Trans.Maybe +import Control.Concurrent.STM.TSem as Sem import Data.ByteString.Lazy (ByteString) import Data.Cache (Cache) import Data.Cache qualified as Cache import Data.Foldable hiding (find) +import Data.Hashable import Data.HashMap.Strict (HashMap) import Data.HashMap.Strict qualified as HashMap import Data.IntMap (IntMap) @@ -40,8 +41,10 @@ import Data.Maybe import Data.Set qualified as Set import Data.Set (Set) import Lens.Micro.Platform +import Numeric ( showGFloat ) import Prettyprinter import System.Random.Shuffle +import Type.Reflection calcBursts :: forall a . Integral a => a -> [a] -> [(a,a)] @@ -648,12 +651,11 @@ blockDownloadLoop env0 = do withDownload env (addBlockInfo p1 hx s) pips <- knownPeers @e pl - for_ pips $ \pip -> request pip (GetBlockSize @e h) + for_ pips $ \pip -> do + auth <- find (KnownPeerKey pip) id <&> isJust - p <- knownPeers @e pl >>= liftIO . shuffleM - - -- debug $ "known peers" <+> pretty p - -- debug $ "peers/blocks" <+> pretty peers + when auth $ request pip (GetBlockSize @e h) -- FIXME: request only known peers + -- move this to peer locator p0 <- headMay <$> liftIO (shuffleM peers) -- FIXME: random choice to work faster @@ -687,15 +689,30 @@ tossPostponed penv = do env <- ask - waitQ <- liftIO newTQueueIO + waitQ <- liftIO $ newTBQueueIO 1 + + busy <- liftIO $ newTVarIO False cache <- asks (view blockPostponed) lift $ subscribe @e AnyKnownPeerEventKey $ \(KnownPeerEvent{}) -> do - liftIO $ atomically $ writeTQueue waitQ () + cant <- liftIO $ readTVarIO busy + unless cant $ do + debug "AnyKnownPeerEventKey" + mt <- liftIO $ atomically $ isEmptyTBQueue waitQ + when mt do + liftIO $ atomically $ writeTBQueue waitQ () forever do - r <- liftIO $ race ( pause @'Seconds 20 ) ( atomically $ readTQueue waitQ ) + r <- liftIO $ race ( pause @'Seconds 20 ) ( atomically $ readTBQueue waitQ ) + + void $ liftIO $ atomically $ flushTBQueue waitQ + + liftIO $ atomically $ writeTVar busy True + + void $ liftIO $ async $ do + pause @'Seconds 5 + atomically $ writeTVar busy False let allBack = either (const False) (const True) r diff --git a/hbs2-peer/app/PeerInfo.hs b/hbs2-peer/app/PeerInfo.hs index 69ea7edb..4308f157 100644 --- a/hbs2-peer/app/PeerInfo.hs +++ b/hbs2-peer/app/PeerInfo.hs @@ -5,19 +5,26 @@ module PeerInfo where import HBS2.Actors.Peer import HBS2.Clock import HBS2.Defaults +import HBS2.Events import HBS2.Net.Messaging.UDP import HBS2.Net.PeerLocator import HBS2.Net.Proto.Peer +import HBS2.Net.Proto.PeerExchange import HBS2.Net.Proto.Sessions import HBS2.Net.Proto.Types import HBS2.Prelude.Plated import HBS2.System.Logger.Simple -import Data.Foldable +import Data.Maybe +import Data.Set qualified as Set +import Data.List qualified as List +import Data.Foldable hiding (find) import Lens.Micro.Platform import Control.Concurrent.STM.TVar import Control.Concurrent.STM import Control.Monad +import Control.Concurrent.Async +import System.Random.Shuffle import Prettyprinter data PeerInfo e = @@ -59,6 +66,35 @@ instance Expires (SessionKey UDP (PeerInfo UDP)) where expiresIn = const (Just 600) +pexLoop :: forall e m . ( HasPeerLocator e m + , HasPeer e + , Sessions e (KnownPeer e) m + , HasNonces (PeerExchange e) m + , Request e (PeerExchange e) m + , Sessions e (PeerExchange e) m + , MonadIO m + ) => m () + +pexLoop = do + + pause @'Seconds 5 + + pl <- getPeerLocator @e + + forever do + + pips <- knownPeers @e pl + + peers' <- forM pips $ \p -> do + au <- find @e (KnownPeerKey p) id + pure $ maybe1 au mempty (const [p]) + + peers <- liftIO (shuffleM (mconcat peers')) <&> take 10 -- FIXME: defaults + + for_ peers sendPeerExchangeGet + + pause @'Seconds 60 -- FIXME: defaults + peerPingLoop :: forall e m . ( HasPeerLocator e m , HasPeer e , HasNonces (PeerHandshake e) m @@ -67,29 +103,47 @@ peerPingLoop :: forall e m . ( HasPeerLocator e m , Sessions e (PeerHandshake e) m , Sessions e (PeerInfo e) m , Sessions e (KnownPeer e) m + , EventListener e (PeerExchangePeersEv e) m , Pretty (Peer e) , MonadIO m ) => m () -peerPingLoop = forever do - pause @'Seconds 120 -- FIXME: defaults - debug "peerPingLoop" +peerPingLoop = do - pl <- getPeerLocator @e - pips <- knownPeers @e pl + wake <- liftIO newTQueueIO - for_ pips $ \p -> do - npi <- newPeerInfo - pfails <- fetch True npi (PeerInfoKey p) (view peerPingFailed) - liftIO $ atomically $ modifyTVar pfails succ - sendPing @e p - pause @'Seconds 2 -- NOTE: it's okay? + subscribe @e PeerExchangePeersKey $ \(PeerExchangePeersData sas) -> do + liftIO $ atomically $ writeTQueue wake sas - fnum <- liftIO $ readTVarIO pfails + forever do - when (fnum > 3) do -- FIXME: hardcode! - warn $ "removing peer" <+> pretty p <+> "for not responding to our pings" - delPeers pl [p] - expire (PeerInfoKey p) - expire (KnownPeerKey p) + -- FIXME: defaults + r <- liftIO $ race (pause @'Seconds 60) + (atomically $ readTQueue wake) + + sas' <- liftIO $ atomically $ flushTQueue wake <&> mconcat + + let sas = case r of + Left{} -> sas' + Right sa -> sa <> sas' + + debug "peerPingLoop" + + pl <- getPeerLocator @e + pips <- knownPeers @e pl <&> (<> sas) <&> List.nub + + for_ pips $ \p -> do + npi <- newPeerInfo + pfails <- fetch True npi (PeerInfoKey p) (view peerPingFailed) + liftIO $ atomically $ modifyTVar pfails succ + sendPing @e p + pause @'Seconds 1 -- NOTE: it's okay? + + fnum <- liftIO $ readTVarIO pfails + + when (fnum > 3) do -- FIXME: hardcode! + warn $ "removing peer" <+> pretty p <+> "for not responding to our pings" + delPeers pl [p] + expire (PeerInfoKey p) + expire (KnownPeerKey p) diff --git a/hbs2-peer/app/PeerMain.hs b/hbs2-peer/app/PeerMain.hs index 08d7dabb..4cb04b33 100644 --- a/hbs2-peer/app/PeerMain.hs +++ b/hbs2-peer/app/PeerMain.hs @@ -18,6 +18,7 @@ import HBS2.Net.Proto import HBS2.Net.Proto.Definition import HBS2.Net.Proto.Peer import HBS2.Net.Proto.PeerAnnounce +import HBS2.Net.Proto.PeerExchange import HBS2.Net.Proto.Sessions import HBS2.OrDie import HBS2.Prelude.Plated @@ -294,91 +295,113 @@ runPeer opts = Exception.handle myException $ do unless known $ sendPing pip subscribe @UDP AnyKnownPeerEventKey $ \(KnownPeerEvent p d) -> do - unless (pnonce == view peerOwnNonce d) $ do - addPeers pl [p] - npi <- newPeerInfo - pfails <- fetch True npi (PeerInfoKey p) (view peerPingFailed) - liftIO $ atomically $ writeTVar pfails 0 + -- FIXME: check if we've got a reference to ourselves + if pnonce == view peerOwnNonce d then do + delPeers pl [p] + addExcluded pl [p] + expire (KnownPeerKey p) - debug $ "Got authorized peer!" <+> pretty p - <+> pretty (AsBase58 (view peerSignKey d)) + else do + + prev <- find (KnownPeerKey p) (view peerOwnNonce) + + case prev of + Just nonce0 | nonce0 /= view peerOwnNonce d -> do + debug "old peer, new address. ignoring" + + _ -> do + addPeers pl [p] + + npi <- newPeerInfo + pfails <- fetch True npi (PeerInfoKey p) (view peerPingFailed) + liftIO $ atomically $ writeTVar pfails 0 + + debug $ "Got authorized peer!" <+> pretty p + <+> pretty (AsBase58 (view peerSignKey d)) void $ liftIO $ async $ withPeerM env do pause @'Seconds 1 debug "sending first peer announce" request localMulticast (PeerAnnounce @UDP pnonce) - void $ liftIO $ async $ withPeerM env $ forever $ do - pause defPeerAnnounceTime -- FIXME: setting! - debug "sending local peer announce" - request localMulticast (PeerAnnounce @UDP pnonce) + let wo = fmap L.singleton - as <- liftIO $ async $ withPeerM env (peerPingLoop @UDP) + workers <- do - as <- liftIO $ async $ withPeerM env (blockDownloadLoop denv) + wo $ liftIO $ async $ withPeerM env $ forever $ do + pause defPeerAnnounceTime -- FIXME: setting! + debug "sending local peer announce" + request localMulticast (PeerAnnounce @UDP pnonce) - rpc <- liftIO $ async $ withPeerM env $ forever $ do - cmd <- liftIO $ atomically $ readTQueue rpcQ - case cmd of - POKE -> debug "on poke: alive and kicking!" + wo $ liftIO $ async $ withPeerM env (peerPingLoop @UDP) - PING pa r -> do - debug $ "ping" <+> pretty pa - pip <- fromPeerAddr @UDP pa - subscribe (ConcretePeerKey pip) $ \(ConcretePeerData{}) -> do + wo $ liftIO $ async $ withPeerM env (pexLoop @UDP) - maybe1 r (pure ()) $ \rpcPeer -> do - pinged <- toPeerAddr pip - request rpcPeer (RPCPong @UDP pinged) + wo $ liftIO $ async $ withPeerM env (blockDownloadLoop denv) - sendPing pip + wo $ liftIO $ async $ withPeerM env $ forever $ do + cmd <- liftIO $ atomically $ readTQueue rpcQ + case cmd of + POKE -> debug "on poke: alive and kicking!" - ANNOUNCE h -> do - debug $ "got announce rpc" <+> pretty h - sto <- getStorage - mbsize <- liftIO $ hasBlock sto h + PING pa r -> do + debug $ "ping" <+> pretty pa + pip <- fromPeerAddr @UDP pa + subscribe (ConcretePeerKey pip) $ \(ConcretePeerData{}) -> do - maybe1 mbsize (pure ()) $ \size -> do - let ann = BlockAnnounceInfo 0 NoBlockInfoMeta size h - no <- peerNonce @UDP - request localMulticast (BlockAnnounce @UDP no ann) + maybe1 r (pure ()) $ \rpcPeer -> do + pinged <- toPeerAddr pip + request rpcPeer (RPCPong @UDP pinged) - CHECK nonce pa h -> do - pip <- fromPeerAddr @UDP pa + sendPing pip - n1 <- peerNonce @UDP + ANNOUNCE h -> do + debug $ "got announce rpc" <+> pretty h + sto <- getStorage + mbsize <- liftIO $ hasBlock sto h - unless (nonce == n1) do + maybe1 mbsize (pure ()) $ \size -> do + let ann = BlockAnnounceInfo 0 NoBlockInfoMeta size h + no <- peerNonce @UDP + request localMulticast (BlockAnnounce @UDP no ann) - peer <- find @UDP (KnownPeerKey pip) id + CHECK nonce pa h -> do + pip <- fromPeerAddr @UDP pa - debug $ "received announce from" - <+> pretty pip - <+> pretty h + n1 <- peerNonce @UDP - case peer of - Nothing -> sendPing @UDP pip - Just{} -> do - debug "announce from a known peer" - debug "preparing to dowload shit" - debug "checking policy, blah-blah-blah. tomorrow" + unless (nonce == n1) do - withDownload denv $ do - processBlock h + peer <- find @UDP (KnownPeerKey pip) id - _ -> pure () + debug $ "received announce from" + <+> pretty pip + <+> pretty h + + case peer of + Nothing -> sendPing @UDP pip + Just{} -> do + debug "announce from a known peer" + debug "preparing to dowload shit" + debug "checking policy, blah-blah-blah. tomorrow" + + withDownload denv $ do + processBlock h + + _ -> pure () - me <- liftIO $ async $ withPeerM env $ do - runProto @UDP - [ makeResponse (blockSizeProto blk dontHandle) - , makeResponse (blockChunksProto adapter) - , makeResponse blockAnnounceProto - , makeResponse (withCredentials pc . peerHandShakeProto) - ] + wo $ liftIO $ async $ withPeerM env $ do + runProto @UDP + [ makeResponse (blockSizeProto blk dontHandle) + , makeResponse (blockChunksProto adapter) + , makeResponse blockAnnounceProto + , makeResponse (withCredentials pc . peerHandShakeProto) + , makeResponse peerExchangeProto + ] - void $ liftIO $ waitAnyCatchCancel [me,as] + void $ liftIO $ waitAnyCatchCancel workers let pokeAction _ = do liftIO $ atomically $ writeTQueue rpcQ POKE diff --git a/nix/peer/flake.lock b/nix/peer/flake.lock index dd973d09..a38703c1 100644 --- a/nix/peer/flake.lock +++ b/nix/peer/flake.lock @@ -112,15 +112,16 @@ "saltine": "saltine" }, "locked": { - "lastModified": 1675599025, - "narHash": "sha256-ZVOkBwFMpUHyhsdBg8ubv/h43N7phoLqR1lYbgZEeH0=", + "lastModified": 1675665762, + "narHash": "sha256-zVfyDqVyPgMXMsj/od1xmOCQz5gkHzi3pTyf83qaqF0=", "owner": "voidlizard", "repo": "hbs2", - "rev": "4ab83f0517c0ab465634a15607d3f0dddeaba3e7", + "rev": "85d34209e10016fc243e37e221dca8217af3f7bb", "type": "github" }, "original": { "owner": "voidlizard", + "ref": "wip", "repo": "hbs2", "type": "github" } @@ -165,11 +166,11 @@ }, "nixpkgs_2": { "locked": { - "lastModified": 1675512093, - "narHash": "sha256-u1CY4feK14B57E6T+0Bhkuoj8dpBxCPrWO+SP87UVP8=", + "lastModified": 1675600654, + "narHash": "sha256-ipsDTkzRq1CAl2g5tYd7ugjVMSKF6KLh9F+5Kso0lT0=", "owner": "nixos", "repo": "nixpkgs", - "rev": "8e8240194eda25b61449f29bb5131e02b28a5486", + "rev": "cff83d5032a21aad4f69bf284e95b5f564f4a54e", "type": "github" }, "original": { diff --git a/nix/peer/flake.nix b/nix/peer/flake.nix index df5c7041..c48b813b 100644 --- a/nix/peer/flake.nix +++ b/nix/peer/flake.nix @@ -5,7 +5,7 @@ inputs = { extra-container.url = "github:erikarvstedt/extra-container"; nixpkgs.url = "github:nixos/nixpkgs/nixos-22.11"; - hbs2.url = "github:voidlizard/hbs2"; + hbs2.url = "github:voidlizard/hbs2/wip"; hbs2.inputs.nixpkgs.follows = "nixpkgs"; };