diff --git a/hbs2-core/lib/HBS2/Actors/Peer.hs b/hbs2-core/lib/HBS2/Actors/Peer.hs index a141dd4f..258d1b20 100644 --- a/hbs2-core/lib/HBS2/Actors/Peer.hs +++ b/hbs2-core/lib/HBS2/Actors/Peer.hs @@ -383,17 +383,18 @@ newPeerEnv :: forall e m . ( MonadIO m , Hashable (PubKey 'Sign (Encryption e)) , Hashable PeerNonce ) - => AnyStorage + => AnyPeerLocator e + -> AnyStorage -> Fabriq e -> Peer e -> m (PeerEnv e) -newPeerEnv s bus p = do +newPeerEnv pl s bus p = do let _envSelf = p _envPeerNonce <- newNonce @() let _envFab = bus let _envStorage = s - _envPeerLocator <- AnyPeerLocator <$> newStaticPeerLocator @e mempty + let _envPeerLocator = pl _envDeferred <- newPipeline defProtoPipelineSize _envSessions <- liftIO (Cache.newCache (Just defCookieTimeout)) _envEvents <- liftIO (newTVarIO mempty) diff --git a/hbs2-core/lib/HBS2/Net/PeerLocator.hs b/hbs2-core/lib/HBS2/Net/PeerLocator.hs index ab6182f0..14eb7215 100644 --- a/hbs2-core/lib/HBS2/Net/PeerLocator.hs +++ b/hbs2-core/lib/HBS2/Net/PeerLocator.hs @@ -13,16 +13,20 @@ class PeerLocator e l where bestPeers :: forall m . (HasPeer e, MonadIO m) => l -> m [Peer e] addExcluded :: forall m . (HasPeer e, MonadIO m) => l -> [Peer e] -> m () + knownPeersForPEX :: forall m . (HasPeer e, MonadIO m) => l -> m [Peer e] + knownPeersForPEX = knownPeers + data AnyPeerLocator e = forall a . PeerLocator e a => AnyPeerLocator a instance HasPeer e => PeerLocator e (AnyPeerLocator e) where knownPeers (AnyPeerLocator l) = knownPeers l + knownPeersForPEX (AnyPeerLocator l) = knownPeersForPEX l addPeers (AnyPeerLocator l) = addPeers l delPeers (AnyPeerLocator l) = delPeers l addExcluded (AnyPeerLocator l) = addExcluded l -- FIXME: a better algorithm of choice - bestPeers (AnyPeerLocator l) = liftIO $ knownPeers l >>= shuffleM + bestPeers (AnyPeerLocator l) = knownPeers l >>= liftIO . shuffleM class Monad m => HasPeerLocator e m where getPeerLocator :: m (AnyPeerLocator e) diff --git a/hbs2-core/lib/HBS2/Net/Proto/PeerExchange.hs b/hbs2-core/lib/HBS2/Net/Proto/PeerExchange.hs index 6aa966c9..581432a4 100644 --- a/hbs2-core/lib/HBS2/Net/Proto/PeerExchange.hs +++ b/hbs2-core/lib/HBS2/Net/Proto/PeerExchange.hs @@ -67,11 +67,10 @@ peerExchangeProto :: forall e m . ( MonadIO m , Pretty (Peer e) , e ~ L4Proto ) - => ( [Peer e] -> m [Peer e] ) - -> PeerExchange e + => PeerExchange e -> m () -peerExchangeProto pexFilt msg = do +peerExchangeProto msg = do case msg of PeerExchangeGet n -> peerExchangeGet PEX1 n PeerExchangeGet2 n -> peerExchangeGet PEX2 n @@ -110,9 +109,6 @@ peerExchangeProto pexFilt msg = do debug $ "PeerExchangeGet" <+> "from" <+> pretty that - pl <- getPeerLocator @e - pips <- knownPeers @e pl >>= pexFilt - case pex of PEX1 -> do pa <- take defPexMaxPeers <$> getAllPex1Peers @@ -131,7 +127,7 @@ getAllPex1Peers :: forall e m . => m [IPAddrPort L4Proto] getAllPex1Peers = do pl <- getPeerLocator @e - pips <- knownPeers @e pl + pips <- knownPeersForPEX @e pl -- TODO: tcp-peer-support-in-pex pa' <- forM pips $ \p -> do auth <- find (KnownPeerKey p) id <&> isJust @@ -143,7 +139,6 @@ getAllPex1Peers = do type PexInfoContext e m = ( Sessions e (KnownPeer e) m , HasPeerLocator L4Proto m - -- , Expired e ( ) getAllPex2Peers :: forall e m . @@ -154,12 +149,9 @@ getAllPex2Peers :: forall e m . => m [PeerAddr L4Proto] getAllPex2Peers = do pl <- getPeerLocator @e - pips <- knownPeers @e pl - pa' <- forM pips $ \p -> do - auth <- find (KnownPeerKey p) id - maybe1 auth (pure mempty) ( const $ fmap L.singleton (toPeerAddr p) ) - -- FIXME: asap-random-shuffle-peers - pure $ mconcat pa' + pips <- knownPeersForPEX @e pl + -- FIXME: random-shuffle + forM pips toPeerAddr newtype instance SessionKey e (PeerExchange e) = PeerExchangeKey (Nonce (PeerExchange e)) diff --git a/hbs2-peer/app/Bootstrap.hs b/hbs2-peer/app/Bootstrap.hs index b05ea6c7..9b307f48 100644 --- a/hbs2-peer/app/Bootstrap.hs +++ b/hbs2-peer/app/Bootstrap.hs @@ -8,6 +8,7 @@ import HBS2.Net.Proto.Types import HBS2.Net.Proto.Peer import HBS2.Clock import HBS2.Net.Proto.Sessions +import HBS2.Peer.Brains import PeerConfig import HBS2.System.Logger.Simple @@ -19,6 +20,7 @@ import Data.Foldable import Data.Maybe import Data.Set qualified as Set import Data.Set (Set) +import Data.List qualified as List import Control.Monad.Trans.Maybe @@ -82,8 +84,10 @@ knownPeersPingLoop :: forall e m . ( HasPeer e , Pretty (Peer e) , e ~ L4Proto , MonadIO m) - => PeerConfig -> m () -knownPeersPingLoop (PeerConfig syn) = do + => PeerConfig + -> SomeBrains e + -> m () +knownPeersPingLoop (PeerConfig syn) brains = do -- FIXME: add validation and error handling -- FIXME: tcp-addr-support-2 let parseKnownPeers xs = do @@ -91,8 +95,17 @@ knownPeersPingLoop (PeerConfig syn) = do mapM fromPeerAddr pa let them = runReader (cfgValue @PeerKnownPeer) syn & Set.toList - knownPeers' <- liftIO $ parseKnownPeers them - forever do - forM_ knownPeers' (sendPing @e) - pause @'Minutes 20 + + pex <- listPexInfo @e brains >>= liftIO . mapM fromPeerAddr + + knownPeers' <- liftIO $ parseKnownPeers them + + let pips = List.nub (knownPeers' <> pex) + + forever do + forM_ pips (sendPing @e) + pause @'Minutes 10 + + + diff --git a/hbs2-peer/app/Brains.hs b/hbs2-peer/app/Brains.hs index 55e51551..bc512f7f 100644 --- a/hbs2-peer/app/Brains.hs +++ b/hbs2-peer/app/Brains.hs @@ -107,6 +107,10 @@ instance ( Hashable (Peer e) listDownloads = liftIO . selectDownloads + listPexInfo = liftIO . selectPexInfo + + updatePexInfo b pex = updateOP b $ insertPexInfo b pex + delDownload br what = do liftIO $ Cache.insert (view brainsRemoved br) what () updateOP br (deleteDownload br what) @@ -544,6 +548,7 @@ SAVEPOINT zzz1; DELETE FROM ancestors WHERE strftime('%s','now') - strftime('%s', ts) > 600; DELETE FROM seenby WHERE strftime('%s','now') - strftime('%s', ts) > 600; DELETE FROM blocksize WHERE strftime('%s','now') - strftime('%s', ts) > 300; +DELETE FROM statedb.pexinfo where seen < datetime('now', '-7 days'); RELEASE SAVEPOINT zzz1; @@ -655,6 +660,33 @@ selectDownloads br = do --- +insertPexInfo :: forall e . ( e ~ L4Proto) + => BasicBrains e + -> [PeerAddr e] + -> IO () +insertPexInfo br peers = liftIO do + let conn = view brainsDb br + forM_ peers $ \p -> do + execute conn [qc| + insert into statedb.pexinfo (peer) + values(?) + on conflict (peer) + do update set seen = datetime('now','localtime') + |] (Only (show $ pretty p)) + + +selectPexInfo :: forall e . (e ~ L4Proto) + => BasicBrains e + -> IO [PeerAddr e] +selectPexInfo br = liftIO do + let conn = view brainsDb br + query_ conn [qc| + select peer from statedb.pexinfo where seen >= datetime('now', '-7 days') + order by seen desc + limit 100 + |] <&> fmap (fromStringMay . fromOnly) + <&> catMaybes + -- FIXME: eventually-close-db newBasicBrains :: forall e m . (Hashable (Peer e), MonadIO m) => PeerConfig @@ -693,6 +725,13 @@ newBasicBrains cfg = liftIO do ); |] + execute_ conn [qc| + create table if not exists statedb.pexinfo + ( peer text not null primary key + , seen DATE DEFAULT (datetime('now','localtime')) + ); + |] + execute_ conn [qc| create table if not exists ancestors ( child text not null diff --git a/hbs2-peer/app/BrainyPeerLocator.hs b/hbs2-peer/app/BrainyPeerLocator.hs new file mode 100644 index 00000000..ebcdf649 --- /dev/null +++ b/hbs2-peer/app/BrainyPeerLocator.hs @@ -0,0 +1,81 @@ +{-# Language UndecidableInstances #-} +module BrainyPeerLocator + ( BrainyPeerLocator + , newBrainyPeerLocator + ) where + + +import HBS2.Prelude +import HBS2.Net.Proto +import HBS2.Net.PeerLocator +import HBS2.Peer.Brains + +import Control.Concurrent.STM +import Data.Set (Set) +import Data.Set qualified as Set + + +data BrainyPeerLocator = + BrainyPeerLocator + { brains :: SomeBrains L4Proto + , include :: TVar (Set (Peer L4Proto)) + , exclude :: TVar (Set (Peer L4Proto)) + } + +newBrainyPeerLocator :: forall e m . (Ord (Peer e), HasPeer e, e ~ L4Proto, MonadIO m) + => SomeBrains e + -> [Peer e] + -> m BrainyPeerLocator + +newBrainyPeerLocator brains seeds = do + tv <- liftIO $ newTVarIO (Set.fromList seeds) + tv2 <- liftIO $ newTVarIO mempty + pure $ BrainyPeerLocator brains tv tv2 + +instance (Ord (Peer L4Proto), Pretty (Peer L4Proto)) => PeerLocator L4Proto BrainyPeerLocator where + + knownPeers (BrainyPeerLocator br peers e) = do + + + ps <- liftIO $ readTVarIO peers + + excl <- liftIO $ readTVarIO e + pure $ Set.toList (ps `Set.difference` excl) + + knownPeersForPEX l@(BrainyPeerLocator br _ e) = do + + excl <- liftIO $ readTVarIO e + + pips <- knownPeers @L4Proto l + <&> filter udpOnly + <&> Set.fromList + + tcp <- listTCPPexCandidates @L4Proto br + >>= liftIO . mapM (fromPeerAddr @L4Proto) + <&> Set.fromList + + let what = Set.toList ( (pips <> tcp) `Set.difference` excl) + + addr <- liftIO $ mapM (toPeerAddr @L4Proto) what + + updatePexInfo br addr + + pure what + + where + udpOnly = \case + (PeerL4 UDP _) -> True + _ -> False + + + addPeers (BrainyPeerLocator _ peers te) new = do + excl <- liftIO $ readTVarIO te + liftIO $ atomically $ modifyTVar' peers ((`Set.difference` excl) . (<> Set.fromList new)) + + delPeers (BrainyPeerLocator _ peers _) del = do + liftIO $ atomically $ modifyTVar' peers (`Set.difference` Set.fromList del) + + addExcluded p excl = do + liftIO $ atomically $ modifyTVar' (exclude p) (<> Set.fromList excl) + + bestPeers = knownPeers diff --git a/hbs2-peer/app/PeerMain.hs b/hbs2-peer/app/PeerMain.hs index 3ec7dc03..05374bf0 100644 --- a/hbs2-peer/app/PeerMain.hs +++ b/hbs2-peer/app/PeerMain.hs @@ -42,6 +42,7 @@ import HBS2.Data.Detect import HBS2.System.Logger.Simple hiding (info) import Brains +import BrainyPeerLocator import PeerTypes import BlockDownload import CheckBlockAnnounce (checkBlockAnnounce) @@ -640,6 +641,8 @@ runPeer opts = U.handle (\e -> myException e denv <- newDownloadEnv brains + pl <- AnyPeerLocator <$> newBrainyPeerLocator @e (SomeBrains @e brains) mempty + let addr' = fromStringMay @(PeerAddr L4Proto) tcpListen trace $ "TCP addr:" <+> pretty tcpListen <+> pretty addr' @@ -683,7 +686,7 @@ runPeer opts = U.handle (\e -> myException e peer } - penv <- newPeerEnv (AnyStorage s) (Fabriq proxy) (getOwnPeer mess) + penv <- newPeerEnv pl (AnyStorage s) (Fabriq proxy) (getOwnPeer mess) pure (proxy, penv) proxyThread <- async $ runProxyMessaging proxy @@ -708,15 +711,6 @@ runPeer opts = U.handle (\e -> myException e rcw <- async $ liftIO $ runRefChanRelyWorker rce refChanAdapter - let pexFilt pips = do - tcpex <- listTCPPexCandidates @e brains -- <&> HashSet.fromList - pips2 <- filter onlyUDP <$> mapM toPeerAddr pips - mapM fromPeerAddr (L.nub (pips2 <> tcpex)) - where - onlyUDP = \case - (L4Address UDP _) -> True - _ -> False - let onNoBlock (p, h) = do already <- liftIO $ Cache.lookup nbcache (p,h) <&> isJust unless already do @@ -949,7 +943,7 @@ runPeer opts = U.handle (\e -> myException e peerThread "peerPingLoop" (peerPingLoop @e conf penv) - peerThread "knownPeersPingLoop" (knownPeersPingLoop @e conf) + peerThread "knownPeersPingLoop" (knownPeersPingLoop @e conf (SomeBrains brains)) peerThread "bootstrapDnsLoop" (bootstrapDnsLoop @e conf) @@ -978,7 +972,7 @@ runPeer opts = U.handle (\e -> myException e , makeResponse blockAnnounceProto , makeResponse (withCredentials @e pc . peerHandShakeProto hshakeAdapter penv) , makeResponse (withCredentials @e pc . encryptionHandshakeProto encryptionHshakeAdapter) - , makeResponse (peerExchangeProto pexFilt) + , makeResponse peerExchangeProto , makeResponse refLogUpdateProto , makeResponse (refLogRequestProto reflogReqAdapter) , makeResponse (peerMetaProto peerMeta) @@ -1028,7 +1022,7 @@ runPeer opts = U.handle (\e -> myException e runMaybeT do lift $ runResponseM me $ refChanNotifyProto @e True refChanAdapter (Notify @e puk box) - menv <- newPeerEnv (AnyStorage s) (Fabriq mcast) (getOwnPeer mcast) + menv <- newPeerEnv pl (AnyStorage s) (Fabriq mcast) (getOwnPeer mcast) ann <- liftIO $ async $ runPeerM menv $ do diff --git a/hbs2-peer/hbs2-peer.cabal b/hbs2-peer/hbs2-peer.cabal index 6afa7e3f..d147be7d 100644 --- a/hbs2-peer/hbs2-peer.cabal +++ b/hbs2-peer/hbs2-peer.cabal @@ -154,6 +154,7 @@ executable hbs2-peer other-modules: BlockDownload + , BrainyPeerLocator , DownloadQ , DownloadMon , EncryptionKeys diff --git a/hbs2-peer/lib/HBS2/Peer/Brains.hs b/hbs2-peer/lib/HBS2/Peer/Brains.hs index 713087dc..d5641c16 100644 --- a/hbs2-peer/lib/HBS2/Peer/Brains.hs +++ b/hbs2-peer/lib/HBS2/Peer/Brains.hs @@ -43,6 +43,12 @@ class HasBrains e a where listTCPPexCandidates :: MonadIO m => a -> m [PeerAddr e] listTCPPexCandidates _ = pure mempty + listPexInfo :: MonadIO m => a -> m [PeerAddr e] + listPexInfo _ = pure mempty + + updatePexInfo :: MonadIO m => a -> [PeerAddr e] -> m () + updatePexInfo _ _ = pure () + listDownloads :: MonadIO m => a -> m [(HashRef, Integer)] listDownloads _ = pure mempty @@ -154,6 +160,9 @@ instance HasBrains e (SomeBrains e) where setActiveTCPSessions (SomeBrains a) = setActiveTCPSessions @e a listTCPPexCandidates (SomeBrains a) = listTCPPexCandidates @e a + listPexInfo (SomeBrains a) = listPexInfo @e a + updatePexInfo (SomeBrains a) = updatePexInfo @e a + listDownloads (SomeBrains a) = listDownloads @e a delDownload (SomeBrains a) = delDownload @e a