From 1c5a90984f3042c758ccaac8998e3d341b0ede1f Mon Sep 17 00:00:00 2001 From: Dmitry Zuikov Date: Fri, 30 Jun 2023 13:43:53 +0300 Subject: [PATCH] fixing tcp pex --- hbs2-core/lib/HBS2/Net/IP/Addr.hs | 9 +- hbs2-core/lib/HBS2/Net/Messaging/TCP.hs | 37 ++-- hbs2-core/lib/HBS2/Net/Proto/Definition.hs | 4 +- hbs2-core/lib/HBS2/Net/Proto/PeerExchange.hs | 8 +- hbs2-core/lib/HBS2/Net/Proto/PeerMeta.hs | 5 + hbs2-core/lib/HBS2/Net/Proto/Types.hs | 4 + hbs2-peer/app/Brains.hs | 222 ++++++++++++++++--- hbs2-peer/app/PeerInfo.hs | 53 ++++- hbs2-peer/app/PeerMain.hs | 19 +- hbs2-peer/app/PeerMeta.hs | 19 +- 10 files changed, 316 insertions(+), 64 deletions(-) diff --git a/hbs2-core/lib/HBS2/Net/IP/Addr.hs b/hbs2-core/lib/HBS2/Net/IP/Addr.hs index 2108dddf..082b71a7 100644 --- a/hbs2-core/lib/HBS2/Net/IP/Addr.hs +++ b/hbs2-core/lib/HBS2/Net/IP/Addr.hs @@ -51,12 +51,15 @@ instance Hashable (IPAddrPort e) instance Serialise (IPAddrPort e) +instance Pretty IP where + pretty ip = case ip of + i4@(IPv4{}) -> pretty (show i4) + i6@(IPv6{}) -> brackets $ pretty (show i6) + instance Pretty (IPAddrPort e) where pretty (IPAddrPort (ip,p)) = pretty (show pip) <> colon <> pretty p where - pip = case ip of - i4@(IPv4{}) -> pretty (show i4) - i6@(IPv6{}) -> brackets $ pretty (show i6) + pip = pretty ip instance IsString (IPAddrPort e) where fromString s = IPAddrPort (read h, fromIntegral p) diff --git a/hbs2-core/lib/HBS2/Net/Messaging/TCP.hs b/hbs2-core/lib/HBS2/Net/Messaging/TCP.hs index 560ed82d..dd4c6e81 100644 --- a/hbs2-core/lib/HBS2/Net/Messaging/TCP.hs +++ b/hbs2-core/lib/HBS2/Net/Messaging/TCP.hs @@ -6,6 +6,7 @@ module HBS2.Net.Messaging.TCP , tcpOwnPeer , tcpPeerConn , tcpCookie + , tcpOnClientStarted ) where import HBS2.Clock @@ -16,8 +17,7 @@ import HBS2.Prelude.Plated import HBS2.System.Logger.Simple --- import Control.Concurrent.Async -import Control.Concurrent.STM (flushTQueue,stateTVar) +import Control.Concurrent.STM (flushTQueue) import Control.Exception (try,Exception,SomeException,throwIO) import Control.Monad import Data.Bits @@ -51,19 +51,22 @@ instance Exception SocketClosedException -- FIXME: control-recv-capacity-to-avoid-leaks + +-- | TCP Messaging environment data MessagingTCP = MessagingTCP - { _tcpOwnPeer :: Peer L4Proto - , _tcpCookie :: Word32 - , _tcpConnPeer :: TVar (HashMap Word64 (Peer L4Proto)) - , _tcpPeerConn :: TVar (HashMap (Peer L4Proto) Word64) - , _tcpConnUsed :: TVar (HashMap Word64 Int) - , _tcpConnQ :: TVar (HashMap Word64 (TQueue (Peer L4Proto, ByteString))) - , _tcpPeerPx :: TVar (HashMap Word32 (Peer L4Proto)) - , _tcpPeerXp :: TVar (HashMap (Peer L4Proto) Word32) - , _tcpRecv :: TQueue (Peer L4Proto, ByteString) - , _tcpDefer :: TVar (HashMap (Peer L4Proto) [(TimeSpec, ByteString)]) - , _tcpDeferEv :: TQueue () + { _tcpOwnPeer :: Peer L4Proto + , _tcpCookie :: Word32 + , _tcpConnPeer :: TVar (HashMap Word64 (Peer L4Proto)) + , _tcpPeerConn :: TVar (HashMap (Peer L4Proto) Word64) + , _tcpConnUsed :: TVar (HashMap Word64 Int) + , _tcpConnQ :: TVar (HashMap Word64 (TQueue (Peer L4Proto, ByteString))) + , _tcpPeerPx :: TVar (HashMap Word32 (Peer L4Proto)) + , _tcpPeerXp :: TVar (HashMap (Peer L4Proto) Word32) + , _tcpRecv :: TQueue (Peer L4Proto, ByteString) + , _tcpDefer :: TVar (HashMap (Peer L4Proto) [(TimeSpec, ByteString)]) + , _tcpDeferEv :: TQueue () + , _tcpOnClientStarted :: PeerAddr L4Proto -> Word64 -> IO () -- ^ Cient TCP connection succeed } makeLenses 'MessagingTCP @@ -86,6 +89,7 @@ newMessagingTCP pa = liftIO do <*> newTQueueIO <*> newTVarIO mempty <*> newTQueueIO + <*> pure (\_ _ -> none) -- do nothing by default instance Messaging MessagingTCP L4Proto ByteString where @@ -209,6 +213,10 @@ spawnConnection tp env so sa = liftIO do let connId = connectionId myCookie theirCookie + when (tp == Client && theirCookie /= myCookie) do + pa <- toPeerAddr newP + liftIO $ view tcpOnClientStarted env pa connId -- notify if we opened client tcp connection + traceCmd own ( "spawnConnection " <+> viaShow tp @@ -345,11 +353,8 @@ connectPeerTCP env peer = liftIO do connect (show i) (show p) $ \(sock, remoteAddr) -> do spawnConnection Client env sock remoteAddr - -- FIXME: tcp-pex. Где-то здесь добавить этих пиров в пекс ? - -- REVIEW: так что в итоге? где-то здесь? shutdown sock ShutdownBoth - -- FIXME: link-all-asyncs runMessagingTCP :: forall m . MonadIO m => MessagingTCP -> m () diff --git a/hbs2-core/lib/HBS2/Net/Proto/Definition.hs b/hbs2-core/lib/HBS2/Net/Proto/Definition.hs index e84fa2cd..230a649f 100644 --- a/hbs2-core/lib/HBS2/Net/Proto/Definition.hs +++ b/hbs2-core/lib/HBS2/Net/Proto/Definition.hs @@ -83,7 +83,7 @@ instance HasProtocol L4Proto (PeerHandshake L4Proto) where decode = either (const Nothing) Just . deserialiseOrFail encode = serialise - requestPeriodLim = ReqLimPerProto 2 + requestPeriodLim = ReqLimPerProto 0.5 instance HasProtocol L4Proto (PeerAnnounce L4Proto) where type instance ProtocolId (PeerAnnounce L4Proto) = 5 @@ -118,7 +118,7 @@ instance HasProtocol L4Proto (PeerMetaProto L4Proto) where encode = serialise -- FIXME: real-period - requestPeriodLim = ReqLimPerMessage 1 + requestPeriodLim = ReqLimPerMessage 0.25 instance Expires (SessionKey L4Proto (BlockInfo L4Proto)) where expiresIn _ = Just defCookieTimeoutSec diff --git a/hbs2-core/lib/HBS2/Net/Proto/PeerExchange.hs b/hbs2-core/lib/HBS2/Net/Proto/PeerExchange.hs index 60d3a5fd..98635d4a 100644 --- a/hbs2-core/lib/HBS2/Net/Proto/PeerExchange.hs +++ b/hbs2-core/lib/HBS2/Net/Proto/PeerExchange.hs @@ -62,9 +62,11 @@ peerExchangeProto :: forall e m . ( MonadIO m , Pretty (Peer e) , e ~ L4Proto ) - => PeerExchange e -> m () + => ( [Peer e] -> m [Peer e] ) + -> PeerExchange e + -> m () -peerExchangeProto msg = do +peerExchangeProto pexFilt msg = do case msg of PeerExchangeGet n -> peerExchangeGet PEX1 n PeerExchangeGet2 n -> peerExchangeGet PEX2 n @@ -104,7 +106,7 @@ peerExchangeProto msg = do debug $ "PeerExchangeGet" <+> "from" <+> pretty that pl <- getPeerLocator @e - pips <- knownPeers @e pl + pips <- knownPeers @e pl >>= pexFilt case pex of PEX1 -> do diff --git a/hbs2-core/lib/HBS2/Net/Proto/PeerMeta.hs b/hbs2-core/lib/HBS2/Net/Proto/PeerMeta.hs index 520e18d7..e0dd44a5 100644 --- a/hbs2-core/lib/HBS2/Net/Proto/PeerMeta.hs +++ b/hbs2-core/lib/HBS2/Net/Proto/PeerMeta.hs @@ -9,6 +9,8 @@ import HBS2.Net.Proto.Peer import HBS2.Net.Proto.Sessions import HBS2.Prelude.Plated +import HBS2.System.Logger.Simple + import Codec.Serialise import Control.Monad import Data.ByteString ( ByteString ) @@ -30,6 +32,7 @@ peerMetaProto :: forall e m . ( MonadIO m , HasDeferred e (PeerMetaProto e) m , EventEmitter e (PeerMetaProto e) m , Sessions e (KnownPeer e) m + , Pretty (Peer e) ) => AnnMetaData -> PeerMetaProto e @@ -41,11 +44,13 @@ peerMetaProto peerMeta = p <- thatPeer (Proxy @(PeerMetaProto e)) auth <- find (KnownPeerKey p) id <&> isJust when auth do + debug $ "PEER META: ANSWERING" <+> pretty p <+> viaShow peerMeta deferred (Proxy @(PeerMetaProto e)) do response (ThePeerMeta @e peerMeta) ThePeerMeta meta -> do that <- thatPeer (Proxy @(PeerMetaProto e)) + debug $ "GOT PEER META FROM" <+> pretty that <+> viaShow meta emit @e (PeerMetaEventKey that) (PeerMetaEvent meta) newtype instance EventKey e (PeerMetaProto e) = diff --git a/hbs2-core/lib/HBS2/Net/Proto/Types.hs b/hbs2-core/lib/HBS2/Net/Proto/Types.hs index d0a08c85..83a10916 100644 --- a/hbs2-core/lib/HBS2/Net/Proto/Types.hs +++ b/hbs2-core/lib/HBS2/Net/Proto/Types.hs @@ -43,6 +43,10 @@ instance Show L4Proto where show UDP = "udp" show TCP = "tcp" +instance Pretty L4Proto where + pretty UDP = "udp" + pretty TCP = "tcp" + -- type family Encryption e :: Type class Monad m => GenCookie e m where diff --git a/hbs2-peer/app/Brains.hs b/hbs2-peer/app/Brains.hs index a41645b9..9f880c28 100644 --- a/hbs2-peer/app/Brains.hs +++ b/hbs2-peer/app/Brains.hs @@ -7,6 +7,7 @@ import HBS2.Prelude.Plated import HBS2.Clock import HBS2.Net.Proto import HBS2.Hash +import HBS2.Net.IP.Addr import HBS2.System.Logger.Simple @@ -32,9 +33,27 @@ import Data.Either import System.Directory import System.FilePath +data PeerBrainsDb + +instance HasCfgKey PeerBrainsDb (Maybe String) where + key = "brains" + class HasBrains e a where + onClientTCPConnected :: MonadIO m => a -> PeerAddr e -> Word64 -> m () + onClientTCPConnected _ _ = const none + + getClientTCP :: MonadIO m => a -> m [(PeerAddr e,Word64)] + getClientTCP = const $ pure mempty + + setActiveTCPSessions :: MonadIO m => a -> [(PeerAddr e, Word64)] -> m () + setActiveTCPSessions _ _ = none + + listTCPPexCandidates :: MonadIO m => a -> m [PeerAddr e] + listTCPPexCandidates _ = pure mempty + onKnownPeers :: MonadIO m => a -> [Peer e] -> m () + onKnownPeers _ _ = none onBlockSize :: ( MonadIO m , IsPeerAddr e m @@ -44,6 +63,7 @@ class HasBrains e a where -> Hash HbSync -> Integer -> m () + onBlockSize _ _ _ _ = none onBlockDownloadAttempt :: ( MonadIO m , IsPeerAddr e m @@ -53,27 +73,36 @@ class HasBrains e a where -> Hash HbSync -> m () + onBlockDownloadAttempt _ _ _ = none + onBlockDownloaded :: MonadIO m => a -> Peer e -> Hash HbSync -> m () + onBlockDownloaded _ _ _ = none + onBlockPostponed :: MonadIO m => a -> Hash HbSync -> m () + onBlockPostponed _ _ = none + claimBlockCameFrom :: MonadIO m => a -> Hash HbSync -> Hash HbSync -> m () + claimBlockCameFrom _ _ _ = none + shouldPostponeBlock :: MonadIO m => a -> Hash HbSync -> m Bool + shouldPostponeBlock _ _ = pure False shouldDownloadBlock :: MonadIO m @@ -81,11 +110,13 @@ class HasBrains e a where -> Peer e -> Hash HbSync -> m Bool + shouldDownloadBlock _ _ _ = pure False advisePeersForBlock :: (MonadIO m, FromStringMaybe (PeerAddr e)) => a -> Hash HbSync -> m [PeerAddr e] + advisePeersForBlock _ _ = pure mempty blockSize :: forall m . MonadIO m => a @@ -109,35 +140,18 @@ class HasBrains e a where setReflogProcessed _ _ = pure () + type NoBrains = () instance Pretty (Peer e) => HasBrains e NoBrains where - onKnownPeers _ ps = pure () - - onBlockSize _ _ _ _ = do - pure () - - onBlockDownloadAttempt _ p h = do - pure () - - onBlockDownloaded _ p h = do - pure () - - onBlockPostponed _ h = do - pure () - - claimBlockCameFrom _ _ _ = do pure () - - shouldPostponeBlock _ _ = pure False - - shouldDownloadBlock _ _ _ = pure True - - advisePeersForBlock _ _ = pure mempty - data SomeBrains e = forall a . HasBrains e a => SomeBrains a instance HasBrains e (SomeBrains e) where + onClientTCPConnected (SomeBrains a) = onClientTCPConnected @e a + getClientTCP (SomeBrains a) = getClientTCP @e a + setActiveTCPSessions (SomeBrains a) = setActiveTCPSessions @e a + listTCPPexCandidates (SomeBrains a) = listTCPPexCandidates @e a onKnownPeers (SomeBrains a) = onKnownPeers a onBlockSize (SomeBrains a) = onBlockSize a onBlockDownloadAttempt (SomeBrains a) = onBlockDownloadAttempt a @@ -172,12 +186,36 @@ cleanupPostponed b h = do let flt (_,h1) _ = h1 /= h liftIO $ atomically $ modifyTVar po $ HashMap.filterWithKey flt -instance (Hashable (Peer e), Pretty (Peer e)) => HasBrains e (BasicBrains e) where +instance ( Hashable (Peer e) + , Pretty (Peer e), Pretty (PeerAddr e) + , e ~ L4Proto + ) => HasBrains e (BasicBrains e) where + + onClientTCPConnected br pa@(L4Address proto _) ssid = do + debug $ "BRAINS: onClientTCPConnected" <+> pretty proto <+> pretty pa <+> pretty ssid + updateOP br $ insertClientTCP br pa ssid + commitNow br True + + getClientTCP br = liftIO (selectClientTCP br) + + setActiveTCPSessions br ssids = do + trace $ "BRAINS: setActiveTCPSessions" <+> pretty ssids + updateOP br $ updateTCPSessions br ssids + commitNow br True + + listTCPPexCandidates = liftIO . selectTCPPexCandidates onKnownPeers br ps = do - -- trace "BRAINS: onKnownPeers" + trace $ "BRAINS: onKnownPeers" <+> pretty ps let tv = view brainsPeers br liftIO $ atomically $ writeTVar tv ps + updateOP br $ do + transactional br $ do + deleteKnownPeers br + forM_ ps $ \pip -> do + pa <- toPeerAddr pip + insertKnownPeer br pa + commitNow br True onBlockSize b p h size = do updateOP b $ insertSize b p h size @@ -282,6 +320,35 @@ insertSize br p h s = do |] (show $ pretty h, show $ pretty p, s, s) +insertClientTCP :: forall e . (Pretty (Peer e), e ~ L4Proto) + => BasicBrains e + -> PeerAddr e + -> Word64 + -> IO () + +-- | only stores TCP address +insertClientTCP br pa@(L4Address TCP (IPAddrPort (h,p))) ssid = do + let conn = view brainsDb br + void $ liftIO $ execute conn [qc| + insert into tcpclient (peer,ssid,ip,port) values (?,?,?,?) + on conflict (peer) do update set ssid = excluded.ssid + |] (show $ pretty pa, ssid, show (pretty h), p) + +insertClientTCP _ _ _ = pure () + +selectClientTCP :: BasicBrains L4Proto -> IO [(PeerAddr L4Proto, Word64)] +selectClientTCP br = do + let conn = view brainsDb br + rows <- liftIO $ query_ @(String, Word64) conn [qc| + select peer,ssid from tcpclient limit 200 + |] + + pas <- forM rows $ \(speer,ssid) -> do + pure $ (,) <$> fromStringMay speer + <*> pure ssid + + pure $ catMaybes pas + insertReflogProcessed :: BasicBrains e -> Hash HbSync -> IO () @@ -366,6 +433,77 @@ insertPeer br blk peer = do |] (show $ pretty blk, show $ pretty peer) +insertKnownPeer :: forall e . e ~ L4Proto + => BasicBrains e + -> PeerAddr e + -> IO () + +insertKnownPeer br peer@(L4Address _ (IPAddrPort (i,a))) = do + let conn = view brainsDb br + void $ liftIO $ execute conn [qc| + INSERT INTO knownpeer (peer,ip,port) + VALUES (?,?,?) + ON CONFLICT (peer) + DO NOTHING + |] (show $ pretty peer, show (pretty i), a) + + +deleteKnownPeers :: forall e . e ~ L4Proto + => BasicBrains e + -> IO () + +deleteKnownPeers br = do + let conn = view brainsDb br + void $ liftIO $ execute_ conn [qc| + DELETE FROM knownpeer; + |] + +selectKnownPeers :: forall e . e ~ L4Proto + => BasicBrains e + -> IO [PeerAddr e] -- ^ list of peers + +selectKnownPeers br = do + let conn = view brainsDb br + liftIO $ query_ conn [qc|SELECT peer FROM knownpeer|] + <&> fmap (fromStringMay . fromOnly) + <&> catMaybes + + +selectTCPPexCandidates :: forall e . e ~ L4Proto + => BasicBrains e + -> IO [PeerAddr e] -- ^ list of peers + +selectTCPPexCandidates br = do + let conn = view brainsDb br + liftIO $ query_ conn + [qc| SELECT distinct(cl.peer) + FROM tcpclient cl JOIN knownpeer p on p.ip = cl.ip + |] <&> fmap (fromStringMay . fromOnly) + <&> catMaybes + +updateTCPSessions :: forall e . e ~ L4Proto + => BasicBrains e + -> [(PeerAddr e, Word64)] + -> IO () + +updateTCPSessions br ssids = do + let conn = view brainsDb br + let sss = fmap (over _1 (show . pretty) . ip) ssids + transactional br $ do + void $ liftIO $ execute_ conn [qc|DELETE FROM tcpsession|] + void $ liftIO $ executeMany conn [qc| + INSERT INTO tcpsession (peer, ssid, ip, port) + VALUES (?, ?, ?, ?) + ON CONFLICT (ssid) + DO UPDATE SET + peer = excluded.peer, + ip = excluded.ip, + port = excluded.port + |] sss + + where + ip (a@(L4Address _ (IPAddrPort (i,p))), s) = (a,s,show $ pretty i,p) + newtype DBData a = DBData { fromDBData :: a } instance FromField (DBData (Hash HbSync)) where @@ -477,7 +615,13 @@ newBasicBrains cfg = liftIO do let stateDb = sdir "brains.db" - conn <- open ":memory:" + let brains = fromMaybe ":memory:" $ cfgValue @PeerBrainsDb cfg + + unless ( brains == ":memory:" ) do + here <- doesFileExist brains + when here $ do removeFile brains + + conn <- open brains execute_ conn [qc|ATTACH DATABASE '{stateDb}' as statedb|] @@ -518,6 +662,34 @@ newBasicBrains cfg = liftIO do , primary key (block,peer)) |] + execute_ conn [qc| + create table if not exists tcpclient + ( peer text not null + , ssid unsigned big int not null + , ip text not null + , port int not null + , primary key (peer) ) + |] + + execute_ conn [qc| + create table if not exists knownpeer + ( peer text not null + , ip text not null + , port int not null + , primary key (peer) + ) + |] + + execute_ conn [qc| + create table if not exists tcpsession + ( ssid unsigned bin int not null + , peer text not null + , ip text not null + , port int not null + , primary key (ssid) + ) + |] + BasicBrains <$> newTVarIO mempty <*> newTVarIO mempty <*> Cache.newCache (Just (toTimeSpec (30 :: Timeout 'Seconds))) diff --git a/hbs2-peer/app/PeerInfo.hs b/hbs2-peer/app/PeerInfo.hs index 738db87e..0743060b 100644 --- a/hbs2-peer/app/PeerInfo.hs +++ b/hbs2-peer/app/PeerInfo.hs @@ -13,8 +13,11 @@ import HBS2.Net.Proto.Types import HBS2.Prelude.Plated import HBS2.System.Logger.Simple +import HBS2.Net.Messaging.TCP + import PeerConfig import PeerTypes +import Brains import Control.Concurrent.Async import Control.Concurrent.STM @@ -26,6 +29,7 @@ import Data.Maybe import Lens.Micro.Platform import Numeric (showGFloat) import System.Random.Shuffle +import Data.HashMap.Strict qualified as HashMap data PeerPingIntervalKey @@ -66,21 +70,52 @@ insertRTT x rttList = do else x:init xs ) -pexLoop :: forall e m . ( HasPeerLocator e m - , HasPeer e - , Sessions e (KnownPeer e) m - , HasNonces (PeerExchange e) m - , Request e (PeerExchange e) m - , Sessions e (PeerExchange e) m - , MonadIO m - ) => m () +pexLoop :: forall e brains m . ( HasPeerLocator e m + , HasPeer e + , HasBrains e brains + , Sessions e (KnownPeer e) m + , HasNonces (PeerExchange e) m + , Request e (PeerExchange e) m + , Sessions e (PeerExchange e) m + , MonadIO m + , e ~ L4Proto + ) => brains -> Maybe MessagingTCP -> m () -pexLoop = do +pexLoop brains tcpEnv = do pause @'Seconds 5 pl <- getPeerLocator @e + tcpPexInfo <- liftIO $ async $ forever do + -- FIXME: fix-hardcode + pause @'Seconds 20 + + pips <- knownPeers @e pl + onKnownPeers brains pips + + conns <- maybe1 (view tcpPeerConn <$> tcpEnv) (pure mempty) $ \tconn -> do + liftIO $ readTVarIO tconn <&> HashMap.toList + + ssids <- forM conns $ \(p,coo) -> do + debug $ "ACTUAL TCP SESSIONS" <+> pretty p <+> pretty coo + pa <- toPeerAddr p + pure (pa, coo) + + setActiveTCPSessions @e brains ssids + + tcp <- getClientTCP @e brains + + forM_ tcp $ \(pa, ssid) -> do + debug $ "TCP PEX CANDIDATE" <+> pretty pa <+> pretty ssid + + pex <- listTCPPexCandidates @e brains + + forM_ pex $ \pa -> do + debug $ "BRAINS: TCP PEX CANDIDATE" <+> pretty pa + + liftIO $ mapM_ link [tcpPexInfo] + forever do pips <- knownPeers @e pl diff --git a/hbs2-peer/app/PeerMain.hs b/hbs2-peer/app/PeerMain.hs index 19b34cb0..142c37d9 100644 --- a/hbs2-peer/app/PeerMain.hs +++ b/hbs2-peer/app/PeerMain.hs @@ -70,6 +70,7 @@ import Data.Set (Set) import Data.Text.Encoding qualified as TE import Data.Text qualified as Text import Data.Text (Text) +import Data.HashSet qualified as HashSet import GHC.Stats import GHC.TypeLits import Lens.Micro.Platform @@ -559,7 +560,7 @@ runPeer opts = U.handle (\e -> myException e trace $ "TCP addr:" <+> pretty tcpListen <+> pretty addr' tcp <- maybe1 addr' (pure Nothing) $ \addr -> do - tcpEnv <- newMessagingTCP addr + tcpEnv <- newMessagingTCP addr <&> set tcpOnClientStarted (onClientTCPConnected brains) -- FIXME: handle-tcp-thread-somehow void $ async $ runMessagingTCP tcpEnv pure $ Just tcpEnv @@ -576,6 +577,16 @@ runPeer opts = U.handle (\e -> myException e pause @'Seconds 600 liftIO $ Cache.purgeExpired nbcache + let pexFilt pips = do + tcpex <- listTCPPexCandidates @e brains <&> HashSet.fromList + fset <- forM pips $ \p -> do + toPeerAddr p >>= \case + (L4Address UDP _) -> pure $ Just p + pa@(L4Address TCP _) | HashSet.member pa tcpex -> pure $ Just p + _ -> pure Nothing + + pure (catMaybes fset) + let onNoBlock (p, h) = do already <- liftIO $ Cache.lookup nbcache (p,h) <&> isJust unless already do @@ -619,6 +630,7 @@ runPeer opts = U.handle (\e -> myException e def <- newPeerInfo tv <- lift $ fetch True def (PeerInfoKey p) (view peerRTTBuffer) insertRTT rttNew tv + let hshakeAdapter = PeerHandshakeAdapter addNewRtt env <- ask @@ -664,6 +676,7 @@ runPeer opts = U.handle (\e -> myException e unless here do debug $ "Got authorized peer!" <+> pretty p <+> pretty (AsBase58 (view peerSignKey d)) + request @e p (GetPeerMeta @e) -- FIXME: check if we've got a reference to ourselves @@ -764,7 +777,7 @@ runPeer opts = U.handle (\e -> myException e peerThread "bootstrapDnsLoop" (bootstrapDnsLoop @e conf) - peerThread "pexLoop" (pexLoop @e) + peerThread "pexLoop" (pexLoop @e brains tcp) peerThread "blockDownloadLoop" (blockDownloadLoop denv) @@ -881,7 +894,7 @@ runPeer opts = U.handle (\e -> myException e , makeResponse (blockChunksProto adapter) , makeResponse blockAnnounceProto , makeResponse (withCredentials @e pc . peerHandShakeProto hshakeAdapter) - , makeResponse peerExchangeProto + , makeResponse (peerExchangeProto pexFilt) , makeResponse (refLogUpdateProto reflogAdapter) , makeResponse (refLogRequestProto reflogReqAdapter) , makeResponse (peerMetaProto (mkPeerMeta conf)) diff --git a/hbs2-peer/app/PeerMeta.hs b/hbs2-peer/app/PeerMeta.hs index ee3a1988..6ed2d17e 100644 --- a/hbs2-peer/app/PeerMeta.hs +++ b/hbs2-peer/app/PeerMeta.hs @@ -53,8 +53,9 @@ fillPeerMeta mtcp probePeriod = do debug "I'm fillPeerMeta" pl <- getPeerLocator @e + pause @'Seconds 10 -- wait 'till everything calm down + forever $ (>> pause probePeriod) $ do - pause @'Seconds 5 -- wait 'till everything calm down ps <- knownPeers @e pl debug $ "fillPeerMeta peers:" <+> pretty ps @@ -73,6 +74,7 @@ fillPeerMeta mtcp probePeriod = do subscribe @e (PeerMetaEventKey p) $ \case PeerMetaEvent meta -> do liftIO $ atomically $ writeTQueue q (Just meta) + request p (GetPeerMeta @e) r <- liftIO $ race ( pause defGetPeerMetaTimeout ) @@ -85,7 +87,11 @@ fillPeerMeta mtcp probePeriod = do Left _ -> liftIO $ atomically $ writeTVar (_peerHttpApiAddress pinfo) $ if attemptn < 3 then (Left (attemptn + 1)) else (Right Nothing) + Right (Just meta) -> (void . runMaybeT) do + + debug $ "*** GOT GOOD META *** " <+> pretty p <+> viaShow meta + peerMeta <- case meta of NoMetaData -> (MaybeT . pure) Nothing ShortMetadata t -> do @@ -93,11 +99,18 @@ fillPeerMeta mtcp probePeriod = do AnnHashRef h -> (MaybeT . pure) Nothing liftIO $ atomically $ writeTVar (_peerMeta pinfo) (Just peerMeta) + debug $ "*** GOT VERY GOOD META *** " <+> pretty p <+> viaShow peerMeta + -- 3) пробить, что есть tcp forM_ (lookupDecode "listen-tcp" (unPeerMeta peerMeta)) \listenTCPPort -> lift do peerTCPAddrPort <- replacePort p listenTCPPort - p <- fromPeerAddr (L4Address TCP peerTCPAddrPort) - sendPing p + candidate <- fromPeerAddr (L4Address TCP peerTCPAddrPort) + + debug $ "** SENDING PING BASE ON META ** " <+> pretty candidate + + sendPing candidate + -- если пинг на этот адрес уйдет, то пир сам добавится + -- в knownPeers, делать ничего не надо forM_ mtcp \(tcp :: MessagingTCP) -> do -- 4) выяснить, можно ли к нему открыть соединение на этот порт