From 0d3036f7be9395f98921862595d17eab6538335d Mon Sep 17 00:00:00 2001 From: voidlizard Date: Tue, 11 Feb 2025 12:26:29 +0300 Subject: [PATCH] hbs2-peer extract multicast worker --- hbs2-peer/app/Multicast.hs | 237 +++++++++++++++++++++++++++++++++++++ hbs2-peer/app/PeerMain.hs | 61 ++-------- hbs2-peer/app/PeerTypes.hs | 14 +++ hbs2-peer/hbs2-peer.cabal | 1 + 4 files changed, 260 insertions(+), 53 deletions(-) create mode 100644 hbs2-peer/app/Multicast.hs diff --git a/hbs2-peer/app/Multicast.hs b/hbs2-peer/app/Multicast.hs new file mode 100644 index 00000000..f8e0cf62 --- /dev/null +++ b/hbs2-peer/app/Multicast.hs @@ -0,0 +1,237 @@ +module Multicast where + +import HBS2.Prelude + +import PeerTypes + +import HBS2.Actors.Peer +import HBS2.Base58 +import HBS2.Merkle +import HBS2.Defaults +import HBS2.System.Dir (takeDirectory,()) +import HBS2.Events +import HBS2.Hash +import HBS2.Data.Types.Refs +import HBS2.Data.Types.SignedBox +import HBS2.Data.Types +import HBS2.Net.Auth.Credentials +import HBS2.Net.Auth.Schema() +import HBS2.Net.IP.Addr +import HBS2.Net.Messaging.UDP +import HBS2.Net.Messaging.TCP +import HBS2.Net.Messaging.Unix +import HBS2.Net.Messaging.Encrypted.ByPass +import HBS2.Net.PeerLocator +import HBS2.Peer.Proto +import HBS2.Peer.Proto.RefChan qualified as R +import HBS2.Peer.Proto.RefChan.Adapter +import HBS2.Net.Proto.Notify +import HBS2.Peer.Proto.Mailbox +import HBS2.OrDie +import HBS2.Storage.Simple +import HBS2.Storage.Operations.Missed +import HBS2.Data.Detect + +import HBS2.KeyMan.Keys.Direct + +import HBS2.Version +import Paths_hbs2_peer qualified as Pkg + +import Brains +import BrainyPeerLocator +import ByPassWorker +import PeerTypes hiding (info) +import BlockDownloadNew +import CheckBlockAnnounce (checkBlockAnnounce) +import CheckPeer (peerBanned) +import PeerInfo +import PeerConfig +import Bootstrap +import CheckMetrics +import RefLog qualified +import RefLog (reflogWorker) +import LWWRef (lwwRefWorker) +import MailboxProtoWorker +import HttpWorker +import DispatchProxy +import PeerMeta +import Watchdogs +import CLI.Common +import CLI.RefChan +import CLI.LWWRef +import CLI.Mailbox +import RefChan +import RefChanNotifyLog +import Fetch (fetchHash) +import Log hiding (info) + +import HBS2.Misc.PrettyStuff +import HBS2.Peer.RPC.Internal.Types() +import HBS2.Peer.RPC.Internal.Storage() + +import HBS2.Peer.RPC.API.Storage +import HBS2.Peer.RPC.API.Peer +import HBS2.Peer.RPC.API.RefLog +import HBS2.Peer.RPC.API.RefChan +import HBS2.Peer.RPC.API.LWWRef +import HBS2.Peer.RPC.API.Mailbox +import HBS2.Peer.Notify +import HBS2.Peer.RPC.Client.StorageClient + +import HBS2.Peer.Proto.LWWRef.Internal + +import RPC2(RPC2Context(..)) + +import Data.Config.Suckless.Script hiding (optional) +import Data.Config.Suckless.Almost.RPC + +import Codec.Serialise as Serialise +import Control.Concurrent (myThreadId) +-- import Control.Concurrent.STM +import Control.Exception as Exception +import Control.Monad.Reader +import Control.Monad.Trans.Maybe +import Control.Monad.Trans.Writer.CPS qualified as W +import Crypto.Saltine (sodiumInit) +import Data.Aeson qualified as Aeson +import Data.ByteString.Lazy qualified as LBS +import Data.ByteString qualified as BS +import Data.Cache qualified as Cache +import Data.Coerce +import Data.Fixed +import Data.List qualified as L +import Data.Map (Map) +import Data.Map qualified as Map +import Data.Maybe +import Data.Either +import Data.Set qualified as Set +import Data.Set (Set) +import Data.Text qualified as Text +import Data.Text.IO qualified as Text +import Data.Time.Clock.POSIX +import Data.Time.Format +import Lens.Micro.Platform as Lens +import Network.Socket +import Options.Applicative +import Prettyprinter.Render.Terminal +import System.Directory +import System.Environment +import System.Exit +import System.IO +import System.Mem +import System.Metrics +import System.Posix.Process +import System.Posix.Signals +import Control.Monad.Trans.Cont + + +import UnliftIO (MonadUnliftIO(..)) +import UnliftIO.Exception qualified as U +import UnliftIO.STM +import UnliftIO.Async +import UnliftIO.Concurrent (getNumCapabilities) + +import Streaming.Prelude qualified as S + + +import Data.Kind + + -- menv <- newPeerEnv pl (AnyStorage s) (Fabriq mcast) (getOwnPeer mcast) + -- do + -- probe <- newSimpleProbe "PeerEnv_Announce" + -- addProbe probe + -- peerEnvSetProbe menv probe + + -- probesMenv <- liftIO $ async $ forever do + -- pause @'Seconds 10 + -- peerEnvCollectProbes menv + + -- ann <- liftIO $ async $ runPeerM menv $ do + + -- self <- ownPeer @e + + -- subscribe @e BlockAnnounceInfoKey $ \(BlockAnnounceEvent p bi no) -> do + -- unless (p == self) do + -- pa <- toPeerAddr p + -- checkBlockAnnounce conf penv no pa (view biHash bi) + + -- subscribe @e PeerAnnounceEventKey $ \pe@(PeerAnnounceEvent{}) -> do + -- -- debug $ "Got peer announce!" <+> pretty pip + -- emitToPeer penv PeerAnnounceEventKey pe + + -- runProto @e + -- [ makeResponse blockAnnounceProto + -- , makeResponse peerAnnounceProto + -- ] + + +multicastWorker :: forall e s m . ( s ~ Encryption e + , e ~ L4Proto + , MonadUnliftIO m + -- , HasStorage m + -- , HasPeerLocator e m + -- , HasPeerNonce L4Proto m + ) + => PeerConfig -> PeerEnv e -> PeerM e m () + +multicastWorker conf penv = recover do + + debug $ red "multicastWorker started" + + sto <- getStorage + pl <- getPeerLocator @e + pnonce <- peerNonce @e + + localMCast_ <- liftIO newEmptyTMVarIO + + flip runContT pure do + + mcast' <- lift (newMessagingUDPMulticast defLocalMulticast) + + -- FIXME: log-on-exit + mcast <- ContT $ maybe1 mcast' none + + messMcast <- ContT $ withAsync $ runMessagingUDP mcast + + menv <- newPeerEnv pl sto (Fabriq mcast) (getOwnPeer mcast) + + ann <- ContT $ withAsync $ do + localMulticast <- atomically $ takeTMVar localMCast_ + forever do + pips <- getKnownPeers @L4Proto + let w = if null pips then 10 else defPeerAnnounceTime + debug $ yellow "Sending local peer announce" + request localMulticast (PeerAnnounce @e pnonce) + pause w + + liftIO $ runPeerM menv $ do + + self <- ownPeer @e + + atomically $ putTMVar localMCast_ self + + subscribe @e BlockAnnounceInfoKey $ \(BlockAnnounceEvent p bi no) -> do + unless (p == self) do + pa <- toPeerAddr p + checkBlockAnnounce (coerce conf) penv no pa (view biHash bi) + + subscribe @e PeerAnnounceEventKey $ \pe@(PeerAnnounceEvent p _) -> do + debug $ green "Got peer announce from" <+> pretty p + emitToPeer penv PeerAnnounceEventKey pe + + runProto @e + [ makeResponse blockAnnounceProto + , makeResponse peerAnnounceProto + ] + + where + recover m = U.catch (withPeerM penv m) \case + ( e :: IOError ) -> do + err $ "Multicast thread error" <+> viaShow e + let t = 10 :: Timeout 'Seconds + warn $ "Wait" <+> pretty t + pause @'Seconds 120 + recover m + + + diff --git a/hbs2-peer/app/PeerMain.hs b/hbs2-peer/app/PeerMain.hs index e9baec45..1711307d 100644 --- a/hbs2-peer/app/PeerMain.hs +++ b/hbs2-peer/app/PeerMain.hs @@ -54,6 +54,7 @@ import CheckMetrics import RefLog qualified import RefLog (reflogWorker) import LWWRef (lwwRefWorker) +import Multicast import MailboxProtoWorker import HttpWorker import DispatchProxy @@ -149,8 +150,6 @@ import Control.Concurrent.Async (ExceptionInLinkedThread(..)) defStorageThreads :: Integral a => a defStorageThreads = 8 -defLocalMulticast :: String -defLocalMulticast = "239.192.152.145:10153" data PeerListenKey data PeerKeyFileKey @@ -840,13 +839,12 @@ runPeer opts = respawnOnError opts $ do `orDie` "assertion: localMulticastPeer not set" - notice $ "multicast:" <+> pretty localMulticast + -- notice $ "multicast:" <+> pretty localMulticast + -- mcast <- newMessagingUDPMulticast defLocalMulticast + -- `orDie` "Can't start multicast listener" - mcast <- newMessagingUDPMulticast defLocalMulticast - `orDie` "Can't start multicast listener" - - messMcast <- async $ runMessagingUDP mcast + -- messMcast <- async $ runMessagingUDP mcast brains <- newBasicBrains @e conf @@ -1188,12 +1186,7 @@ runPeer opts = respawnOnError opts $ do flip runContT pure do - peerThread "local multicast" $ forever $ do - pips <- getKnownPeers @L4Proto - let w = if null pips then 10 else defPeerAnnounceTime - debug "sending local peer announce" - request localMulticast (PeerAnnounce @e pnonce) - pause w + peerThread "multicastWorker" $ multicastWorker conf env peerThread "byPassWorker" (byPassWorker byPass) @@ -1304,33 +1297,6 @@ runPeer opts = respawnOnError opts $ do runMaybeT do lift $ runResponseM me $ refChanNotifyProto @e True refChanAdapter (R.Notify @e puk box) - menv <- newPeerEnv pl (AnyStorage s) (Fabriq mcast) (getOwnPeer mcast) - do - probe <- newSimpleProbe "PeerEnv_Announce" - addProbe probe - peerEnvSetProbe menv probe - - probesMenv <- liftIO $ async $ forever do - pause @'Seconds 10 - peerEnvCollectProbes menv - - ann <- liftIO $ async $ runPeerM menv $ do - - self <- ownPeer @e - - subscribe @e BlockAnnounceInfoKey $ \(BlockAnnounceEvent p bi no) -> do - unless (p == self) do - pa <- toPeerAddr p - checkBlockAnnounce conf penv no pa (view biHash bi) - - subscribe @e PeerAnnounceEventKey $ \pe@(PeerAnnounceEvent{}) -> do - -- debug $ "Got peer announce!" <+> pretty pip - emitToPeer penv PeerAnnounceEventKey pe - - runProto @e - [ makeResponse blockAnnounceProto - , makeResponse peerAnnounceProto - ] let k = view peerSignPk pc @@ -1396,9 +1362,8 @@ runPeer opts = respawnOnError opts $ do void $ waitAnyCancel $ w <> [ loop , m1 , rpcProto - , probesMenv - , ann - , messMcast + -- , probesMenv + -- , ann , probesPenv , proxyThread , brainsThread @@ -1411,14 +1376,4 @@ runPeer opts = respawnOnError opts $ do -- we want to clean up all resources throwIO GoAgainException -emitToPeer :: ( MonadIO m - , EventEmitter e a (PeerM e IO) - ) - => PeerEnv e - -> EventKey e a - -> Event e a - -> m () - -emitToPeer env k e = liftIO $ withPeerM env (emit k e) - diff --git a/hbs2-peer/app/PeerTypes.hs b/hbs2-peer/app/PeerTypes.hs index 3c6570b7..bd5b2e8d 100644 --- a/hbs2-peer/app/PeerTypes.hs +++ b/hbs2-peer/app/PeerTypes.hs @@ -65,6 +65,9 @@ import UnliftIO import Streaming.Prelude qualified as S +defLocalMulticast :: String +defLocalMulticast = "239.192.152.145:10153" + data GoAgainException = GoAgainException deriving (Eq,Ord,Show,Typeable) @@ -447,3 +450,14 @@ mkAdapter = do } +emitToPeer :: ( MonadIO m + , EventEmitter e a (PeerM e IO) + ) + => PeerEnv e + -> EventKey e a + -> Event e a + -> m () + +emitToPeer env k e = liftIO $ withPeerM env (emit k e) + + diff --git a/hbs2-peer/hbs2-peer.cabal b/hbs2-peer/hbs2-peer.cabal index df672439..58c7a65a 100644 --- a/hbs2-peer/hbs2-peer.cabal +++ b/hbs2-peer/hbs2-peer.cabal @@ -284,6 +284,7 @@ executable hbs2-peer , RefChanNotifyLog , LWWRef , MailboxProtoWorker + , Multicast , CheckMetrics , HttpWorker , Watchdogs