multiple PEX fixes

This commit is contained in:
Dmitry Zuikov 2023-10-21 08:58:31 +03:00
parent 66ce6a659d
commit 0f69757813
9 changed files with 171 additions and 37 deletions

View File

@ -383,17 +383,18 @@ newPeerEnv :: forall e m . ( MonadIO m
, Hashable (PubKey 'Sign (Encryption e)) , Hashable (PubKey 'Sign (Encryption e))
, Hashable PeerNonce , Hashable PeerNonce
) )
=> AnyStorage => AnyPeerLocator e
-> AnyStorage
-> Fabriq e -> Fabriq e
-> Peer e -> Peer e
-> m (PeerEnv e) -> m (PeerEnv e)
newPeerEnv s bus p = do newPeerEnv pl s bus p = do
let _envSelf = p let _envSelf = p
_envPeerNonce <- newNonce @() _envPeerNonce <- newNonce @()
let _envFab = bus let _envFab = bus
let _envStorage = s let _envStorage = s
_envPeerLocator <- AnyPeerLocator <$> newStaticPeerLocator @e mempty let _envPeerLocator = pl
_envDeferred <- newPipeline defProtoPipelineSize _envDeferred <- newPipeline defProtoPipelineSize
_envSessions <- liftIO (Cache.newCache (Just defCookieTimeout)) _envSessions <- liftIO (Cache.newCache (Just defCookieTimeout))
_envEvents <- liftIO (newTVarIO mempty) _envEvents <- liftIO (newTVarIO mempty)

View File

@ -13,16 +13,20 @@ class PeerLocator e l where
bestPeers :: forall m . (HasPeer e, MonadIO m) => l -> m [Peer e] bestPeers :: forall m . (HasPeer e, MonadIO m) => l -> m [Peer e]
addExcluded :: forall m . (HasPeer e, MonadIO m) => l -> [Peer e] -> m () addExcluded :: forall m . (HasPeer e, MonadIO m) => l -> [Peer e] -> m ()
knownPeersForPEX :: forall m . (HasPeer e, MonadIO m) => l -> m [Peer e]
knownPeersForPEX = knownPeers
data AnyPeerLocator e = forall a . PeerLocator e a => AnyPeerLocator a data AnyPeerLocator e = forall a . PeerLocator e a => AnyPeerLocator a
instance HasPeer e => PeerLocator e (AnyPeerLocator e) where instance HasPeer e => PeerLocator e (AnyPeerLocator e) where
knownPeers (AnyPeerLocator l) = knownPeers l knownPeers (AnyPeerLocator l) = knownPeers l
knownPeersForPEX (AnyPeerLocator l) = knownPeersForPEX l
addPeers (AnyPeerLocator l) = addPeers l addPeers (AnyPeerLocator l) = addPeers l
delPeers (AnyPeerLocator l) = delPeers l delPeers (AnyPeerLocator l) = delPeers l
addExcluded (AnyPeerLocator l) = addExcluded l addExcluded (AnyPeerLocator l) = addExcluded l
-- FIXME: a better algorithm of choice -- FIXME: a better algorithm of choice
bestPeers (AnyPeerLocator l) = liftIO $ knownPeers l >>= shuffleM bestPeers (AnyPeerLocator l) = knownPeers l >>= liftIO . shuffleM
class Monad m => HasPeerLocator e m where class Monad m => HasPeerLocator e m where
getPeerLocator :: m (AnyPeerLocator e) getPeerLocator :: m (AnyPeerLocator e)

View File

@ -67,11 +67,10 @@ peerExchangeProto :: forall e m . ( MonadIO m
, Pretty (Peer e) , Pretty (Peer e)
, e ~ L4Proto , e ~ L4Proto
) )
=> ( [Peer e] -> m [Peer e] ) => PeerExchange e
-> PeerExchange e
-> m () -> m ()
peerExchangeProto pexFilt msg = do peerExchangeProto msg = do
case msg of case msg of
PeerExchangeGet n -> peerExchangeGet PEX1 n PeerExchangeGet n -> peerExchangeGet PEX1 n
PeerExchangeGet2 n -> peerExchangeGet PEX2 n PeerExchangeGet2 n -> peerExchangeGet PEX2 n
@ -110,9 +109,6 @@ peerExchangeProto pexFilt msg = do
debug $ "PeerExchangeGet" <+> "from" <+> pretty that debug $ "PeerExchangeGet" <+> "from" <+> pretty that
pl <- getPeerLocator @e
pips <- knownPeers @e pl >>= pexFilt
case pex of case pex of
PEX1 -> do PEX1 -> do
pa <- take defPexMaxPeers <$> getAllPex1Peers pa <- take defPexMaxPeers <$> getAllPex1Peers
@ -131,7 +127,7 @@ getAllPex1Peers :: forall e m .
=> m [IPAddrPort L4Proto] => m [IPAddrPort L4Proto]
getAllPex1Peers = do getAllPex1Peers = do
pl <- getPeerLocator @e pl <- getPeerLocator @e
pips <- knownPeers @e pl pips <- knownPeersForPEX @e pl
-- TODO: tcp-peer-support-in-pex -- TODO: tcp-peer-support-in-pex
pa' <- forM pips $ \p -> do pa' <- forM pips $ \p -> do
auth <- find (KnownPeerKey p) id <&> isJust auth <- find (KnownPeerKey p) id <&> isJust
@ -143,7 +139,6 @@ getAllPex1Peers = do
type PexInfoContext e m = ( Sessions e (KnownPeer e) m type PexInfoContext e m = ( Sessions e (KnownPeer e) m
, HasPeerLocator L4Proto m , HasPeerLocator L4Proto m
-- , Expired e (
) )
getAllPex2Peers :: forall e m . getAllPex2Peers :: forall e m .
@ -154,12 +149,9 @@ getAllPex2Peers :: forall e m .
=> m [PeerAddr L4Proto] => m [PeerAddr L4Proto]
getAllPex2Peers = do getAllPex2Peers = do
pl <- getPeerLocator @e pl <- getPeerLocator @e
pips <- knownPeers @e pl pips <- knownPeersForPEX @e pl
pa' <- forM pips $ \p -> do -- FIXME: random-shuffle
auth <- find (KnownPeerKey p) id forM pips toPeerAddr
maybe1 auth (pure mempty) ( const $ fmap L.singleton (toPeerAddr p) )
-- FIXME: asap-random-shuffle-peers
pure $ mconcat pa'
newtype instance SessionKey e (PeerExchange e) = newtype instance SessionKey e (PeerExchange e) =
PeerExchangeKey (Nonce (PeerExchange e)) PeerExchangeKey (Nonce (PeerExchange e))

View File

@ -8,6 +8,7 @@ import HBS2.Net.Proto.Types
import HBS2.Net.Proto.Peer import HBS2.Net.Proto.Peer
import HBS2.Clock import HBS2.Clock
import HBS2.Net.Proto.Sessions import HBS2.Net.Proto.Sessions
import HBS2.Peer.Brains
import PeerConfig import PeerConfig
import HBS2.System.Logger.Simple import HBS2.System.Logger.Simple
@ -19,6 +20,7 @@ import Data.Foldable
import Data.Maybe import Data.Maybe
import Data.Set qualified as Set import Data.Set qualified as Set
import Data.Set (Set) import Data.Set (Set)
import Data.List qualified as List
import Control.Monad.Trans.Maybe import Control.Monad.Trans.Maybe
@ -82,8 +84,10 @@ knownPeersPingLoop :: forall e m . ( HasPeer e
, Pretty (Peer e) , Pretty (Peer e)
, e ~ L4Proto , e ~ L4Proto
, MonadIO m) , MonadIO m)
=> PeerConfig -> m () => PeerConfig
knownPeersPingLoop (PeerConfig syn) = do -> SomeBrains e
-> m ()
knownPeersPingLoop (PeerConfig syn) brains = do
-- FIXME: add validation and error handling -- FIXME: add validation and error handling
-- FIXME: tcp-addr-support-2 -- FIXME: tcp-addr-support-2
let parseKnownPeers xs = do let parseKnownPeers xs = do
@ -91,8 +95,17 @@ knownPeersPingLoop (PeerConfig syn) = do
mapM fromPeerAddr pa mapM fromPeerAddr pa
let them = runReader (cfgValue @PeerKnownPeer) syn & Set.toList let them = runReader (cfgValue @PeerKnownPeer) syn & Set.toList
knownPeers' <- liftIO $ parseKnownPeers them
forever do pex <- listPexInfo @e brains >>= liftIO . mapM fromPeerAddr
forM_ knownPeers' (sendPing @e)
pause @'Minutes 20 knownPeers' <- liftIO $ parseKnownPeers them
let pips = List.nub (knownPeers' <> pex)
forever do
forM_ pips (sendPing @e)
pause @'Minutes 10

View File

@ -107,6 +107,10 @@ instance ( Hashable (Peer e)
listDownloads = liftIO . selectDownloads listDownloads = liftIO . selectDownloads
listPexInfo = liftIO . selectPexInfo
updatePexInfo b pex = updateOP b $ insertPexInfo b pex
delDownload br what = do delDownload br what = do
liftIO $ Cache.insert (view brainsRemoved br) what () liftIO $ Cache.insert (view brainsRemoved br) what ()
updateOP br (deleteDownload br what) updateOP br (deleteDownload br what)
@ -544,6 +548,7 @@ SAVEPOINT zzz1;
DELETE FROM ancestors WHERE strftime('%s','now') - strftime('%s', ts) > 600; DELETE FROM ancestors WHERE strftime('%s','now') - strftime('%s', ts) > 600;
DELETE FROM seenby WHERE strftime('%s','now') - strftime('%s', ts) > 600; DELETE FROM seenby WHERE strftime('%s','now') - strftime('%s', ts) > 600;
DELETE FROM blocksize WHERE strftime('%s','now') - strftime('%s', ts) > 300; DELETE FROM blocksize WHERE strftime('%s','now') - strftime('%s', ts) > 300;
DELETE FROM statedb.pexinfo where seen < datetime('now', '-7 days');
RELEASE SAVEPOINT zzz1; RELEASE SAVEPOINT zzz1;
@ -655,6 +660,33 @@ selectDownloads br = do
--- ---
insertPexInfo :: forall e . ( e ~ L4Proto)
=> BasicBrains e
-> [PeerAddr e]
-> IO ()
insertPexInfo br peers = liftIO do
let conn = view brainsDb br
forM_ peers $ \p -> do
execute conn [qc|
insert into statedb.pexinfo (peer)
values(?)
on conflict (peer)
do update set seen = datetime('now','localtime')
|] (Only (show $ pretty p))
selectPexInfo :: forall e . (e ~ L4Proto)
=> BasicBrains e
-> IO [PeerAddr e]
selectPexInfo br = liftIO do
let conn = view brainsDb br
query_ conn [qc|
select peer from statedb.pexinfo where seen >= datetime('now', '-7 days')
order by seen desc
limit 100
|] <&> fmap (fromStringMay . fromOnly)
<&> catMaybes
-- FIXME: eventually-close-db -- FIXME: eventually-close-db
newBasicBrains :: forall e m . (Hashable (Peer e), MonadIO m) newBasicBrains :: forall e m . (Hashable (Peer e), MonadIO m)
=> PeerConfig => PeerConfig
@ -693,6 +725,13 @@ newBasicBrains cfg = liftIO do
); );
|] |]
execute_ conn [qc|
create table if not exists statedb.pexinfo
( peer text not null primary key
, seen DATE DEFAULT (datetime('now','localtime'))
);
|]
execute_ conn [qc| execute_ conn [qc|
create table if not exists ancestors create table if not exists ancestors
( child text not null ( child text not null

View File

@ -0,0 +1,81 @@
{-# Language UndecidableInstances #-}
module BrainyPeerLocator
( BrainyPeerLocator
, newBrainyPeerLocator
) where
import HBS2.Prelude
import HBS2.Net.Proto
import HBS2.Net.PeerLocator
import HBS2.Peer.Brains
import Control.Concurrent.STM
import Data.Set (Set)
import Data.Set qualified as Set
data BrainyPeerLocator =
BrainyPeerLocator
{ brains :: SomeBrains L4Proto
, include :: TVar (Set (Peer L4Proto))
, exclude :: TVar (Set (Peer L4Proto))
}
newBrainyPeerLocator :: forall e m . (Ord (Peer e), HasPeer e, e ~ L4Proto, MonadIO m)
=> SomeBrains e
-> [Peer e]
-> m BrainyPeerLocator
newBrainyPeerLocator brains seeds = do
tv <- liftIO $ newTVarIO (Set.fromList seeds)
tv2 <- liftIO $ newTVarIO mempty
pure $ BrainyPeerLocator brains tv tv2
instance (Ord (Peer L4Proto), Pretty (Peer L4Proto)) => PeerLocator L4Proto BrainyPeerLocator where
knownPeers (BrainyPeerLocator br peers e) = do
ps <- liftIO $ readTVarIO peers
excl <- liftIO $ readTVarIO e
pure $ Set.toList (ps `Set.difference` excl)
knownPeersForPEX l@(BrainyPeerLocator br _ e) = do
excl <- liftIO $ readTVarIO e
pips <- knownPeers @L4Proto l
<&> filter udpOnly
<&> Set.fromList
tcp <- listTCPPexCandidates @L4Proto br
>>= liftIO . mapM (fromPeerAddr @L4Proto)
<&> Set.fromList
let what = Set.toList ( (pips <> tcp) `Set.difference` excl)
addr <- liftIO $ mapM (toPeerAddr @L4Proto) what
updatePexInfo br addr
pure what
where
udpOnly = \case
(PeerL4 UDP _) -> True
_ -> False
addPeers (BrainyPeerLocator _ peers te) new = do
excl <- liftIO $ readTVarIO te
liftIO $ atomically $ modifyTVar' peers ((`Set.difference` excl) . (<> Set.fromList new))
delPeers (BrainyPeerLocator _ peers _) del = do
liftIO $ atomically $ modifyTVar' peers (`Set.difference` Set.fromList del)
addExcluded p excl = do
liftIO $ atomically $ modifyTVar' (exclude p) (<> Set.fromList excl)
bestPeers = knownPeers

View File

@ -42,6 +42,7 @@ import HBS2.Data.Detect
import HBS2.System.Logger.Simple hiding (info) import HBS2.System.Logger.Simple hiding (info)
import Brains import Brains
import BrainyPeerLocator
import PeerTypes import PeerTypes
import BlockDownload import BlockDownload
import CheckBlockAnnounce (checkBlockAnnounce) import CheckBlockAnnounce (checkBlockAnnounce)
@ -640,6 +641,8 @@ runPeer opts = U.handle (\e -> myException e
denv <- newDownloadEnv brains denv <- newDownloadEnv brains
pl <- AnyPeerLocator <$> newBrainyPeerLocator @e (SomeBrains @e brains) mempty
let addr' = fromStringMay @(PeerAddr L4Proto) tcpListen let addr' = fromStringMay @(PeerAddr L4Proto) tcpListen
trace $ "TCP addr:" <+> pretty tcpListen <+> pretty addr' trace $ "TCP addr:" <+> pretty tcpListen <+> pretty addr'
@ -683,7 +686,7 @@ runPeer opts = U.handle (\e -> myException e
peer peer
} }
penv <- newPeerEnv (AnyStorage s) (Fabriq proxy) (getOwnPeer mess) penv <- newPeerEnv pl (AnyStorage s) (Fabriq proxy) (getOwnPeer mess)
pure (proxy, penv) pure (proxy, penv)
proxyThread <- async $ runProxyMessaging proxy proxyThread <- async $ runProxyMessaging proxy
@ -708,15 +711,6 @@ runPeer opts = U.handle (\e -> myException e
rcw <- async $ liftIO $ runRefChanRelyWorker rce refChanAdapter rcw <- async $ liftIO $ runRefChanRelyWorker rce refChanAdapter
let pexFilt pips = do
tcpex <- listTCPPexCandidates @e brains -- <&> HashSet.fromList
pips2 <- filter onlyUDP <$> mapM toPeerAddr pips
mapM fromPeerAddr (L.nub (pips2 <> tcpex))
where
onlyUDP = \case
(L4Address UDP _) -> True
_ -> False
let onNoBlock (p, h) = do let onNoBlock (p, h) = do
already <- liftIO $ Cache.lookup nbcache (p,h) <&> isJust already <- liftIO $ Cache.lookup nbcache (p,h) <&> isJust
unless already do unless already do
@ -949,7 +943,7 @@ runPeer opts = U.handle (\e -> myException e
peerThread "peerPingLoop" (peerPingLoop @e conf penv) peerThread "peerPingLoop" (peerPingLoop @e conf penv)
peerThread "knownPeersPingLoop" (knownPeersPingLoop @e conf) peerThread "knownPeersPingLoop" (knownPeersPingLoop @e conf (SomeBrains brains))
peerThread "bootstrapDnsLoop" (bootstrapDnsLoop @e conf) peerThread "bootstrapDnsLoop" (bootstrapDnsLoop @e conf)
@ -978,7 +972,7 @@ runPeer opts = U.handle (\e -> myException e
, makeResponse blockAnnounceProto , makeResponse blockAnnounceProto
, makeResponse (withCredentials @e pc . peerHandShakeProto hshakeAdapter penv) , makeResponse (withCredentials @e pc . peerHandShakeProto hshakeAdapter penv)
, makeResponse (withCredentials @e pc . encryptionHandshakeProto encryptionHshakeAdapter) , makeResponse (withCredentials @e pc . encryptionHandshakeProto encryptionHshakeAdapter)
, makeResponse (peerExchangeProto pexFilt) , makeResponse peerExchangeProto
, makeResponse refLogUpdateProto , makeResponse refLogUpdateProto
, makeResponse (refLogRequestProto reflogReqAdapter) , makeResponse (refLogRequestProto reflogReqAdapter)
, makeResponse (peerMetaProto peerMeta) , makeResponse (peerMetaProto peerMeta)
@ -1028,7 +1022,7 @@ runPeer opts = U.handle (\e -> myException e
runMaybeT do runMaybeT do
lift $ runResponseM me $ refChanNotifyProto @e True refChanAdapter (Notify @e puk box) lift $ runResponseM me $ refChanNotifyProto @e True refChanAdapter (Notify @e puk box)
menv <- newPeerEnv (AnyStorage s) (Fabriq mcast) (getOwnPeer mcast) menv <- newPeerEnv pl (AnyStorage s) (Fabriq mcast) (getOwnPeer mcast)
ann <- liftIO $ async $ runPeerM menv $ do ann <- liftIO $ async $ runPeerM menv $ do

View File

@ -154,6 +154,7 @@ executable hbs2-peer
other-modules: other-modules:
BlockDownload BlockDownload
, BrainyPeerLocator
, DownloadQ , DownloadQ
, DownloadMon , DownloadMon
, EncryptionKeys , EncryptionKeys

View File

@ -43,6 +43,12 @@ class HasBrains e a where
listTCPPexCandidates :: MonadIO m => a -> m [PeerAddr e] listTCPPexCandidates :: MonadIO m => a -> m [PeerAddr e]
listTCPPexCandidates _ = pure mempty listTCPPexCandidates _ = pure mempty
listPexInfo :: MonadIO m => a -> m [PeerAddr e]
listPexInfo _ = pure mempty
updatePexInfo :: MonadIO m => a -> [PeerAddr e] -> m ()
updatePexInfo _ _ = pure ()
listDownloads :: MonadIO m => a -> m [(HashRef, Integer)] listDownloads :: MonadIO m => a -> m [(HashRef, Integer)]
listDownloads _ = pure mempty listDownloads _ = pure mempty
@ -154,6 +160,9 @@ instance HasBrains e (SomeBrains e) where
setActiveTCPSessions (SomeBrains a) = setActiveTCPSessions @e a setActiveTCPSessions (SomeBrains a) = setActiveTCPSessions @e a
listTCPPexCandidates (SomeBrains a) = listTCPPexCandidates @e a listTCPPexCandidates (SomeBrains a) = listTCPPexCandidates @e a
listPexInfo (SomeBrains a) = listPexInfo @e a
updatePexInfo (SomeBrains a) = updatePexInfo @e a
listDownloads (SomeBrains a) = listDownloads @e a listDownloads (SomeBrains a) = listDownloads @e a
delDownload (SomeBrains a) = delDownload @e a delDownload (SomeBrains a) = delDownload @e a