diff --git a/hbs2-core/lib/HBS2/Net/Messaging/Encrypted/ByPass.hs b/hbs2-core/lib/HBS2/Net/Messaging/Encrypted/ByPass.hs index 931ff70b..282ff62a 100644 --- a/hbs2-core/lib/HBS2/Net/Messaging/Encrypted/ByPass.hs +++ b/hbs2-core/lib/HBS2/Net/Messaging/Encrypted/ByPass.hs @@ -2,7 +2,7 @@ {-# Language RecordWildCards #-} module HBS2.Net.Messaging.Encrypted.ByPass ( ForByPass - , ByPass + , ByPass(..) , ByPassOpts(..) , ByPassStat(..) , byPassDef diff --git a/hbs2-core/lib/HBS2/Net/Messaging/UDP.hs b/hbs2-core/lib/HBS2/Net/Messaging/UDP.hs index 0b7cd883..1cfb5107 100644 --- a/hbs2-core/lib/HBS2/Net/Messaging/UDP.hs +++ b/hbs2-core/lib/HBS2/Net/Messaging/UDP.hs @@ -108,29 +108,11 @@ newMessagingUDP reuse saddr = udpWorker :: MessagingUDP -> TVar Socket -> IO () udpWorker env tso = do - so <- readTVarIO tso - - rcvLoop <- async $ forever $ do - -- so <- readTVarIO tso - -- pause ( 10 :: Timeout 'Seconds ) + forever $ do (msg, from) <- recvFrom so defMaxDatagram - -- liftIO $ print $ "recv:" <+> pretty (BS.length msg) - -- FIXME: ASAP-check-addr-type liftIO $ atomically $ Q0.writeTQueue (sink env) (From (PeerL4 UDP from), LBS.fromStrict msg) - sndLoop <- async $ forever $ do - pause ( 10 :: Timeout 'Seconds ) - -- (To whom, msg) <- atomically $ Q0.readTQueue (inbox env) - -- print "YAY!" - -- sendAllTo so (LBS.toStrict msg) (view sockAddr whom) - - -- (msg, from) <- recvFrom so defMaxDatagram - -- liftIO $ print $ "recv:" <+> pretty (BS.length msg) - -- atomically $ Q.writeTBQueue (sink env) (From (PeerUDP from), LBS.fromStrict msg) - - void $ waitAnyCatchCancel [rcvLoop,sndLoop] - -- FIXME: stopping runMessagingUDP :: MonadIO m => MessagingUDP -> m () diff --git a/hbs2-peer/app/ByPassWorker.hs b/hbs2-peer/app/ByPassWorker.hs index fb0c4b56..fb39d879 100644 --- a/hbs2-peer/app/ByPassWorker.hs +++ b/hbs2-peer/app/ByPassWorker.hs @@ -1,9 +1,11 @@ +{-# Language RecordWildCards #-} module ByPassWorker where import HBS2.Prelude import HBS2.Clock import HBS2.Actors.Peer import HBS2.Net.Messaging.Encrypted.ByPass +import HBS2.Misc.PrettyStuff import HBS2.Peer.Proto.Peer import HBS2.Peer.Proto.PeerExchange @@ -12,6 +14,7 @@ import HBS2.Net.Proto.Types import PeerTypes +import Data.HashMap.Strict qualified as HM import Control.Monad import UnliftIO import Control.Monad.Trans.Cont @@ -24,26 +27,36 @@ byPassWorker :: ( ForByPass e , Expires (SessionKey e (KnownPeer e)) ) => ByPass e w - -> PeerEnv e -> m () -byPassWorker bp penv = do +byPassWorker bp@ByPass{..} = do + + info $ green "byPassWorker started" flip runContT pure do void $ ContT $ withAsync $ forever do + pause @'Seconds 60 stats <- getStat bp info $ "ByPass stats" <> line <> indent 2 (pretty stats) <> line - pause @'Seconds 60 - forever do - pips <- getKnownPeers - cleanupByPassMessaging bp pips - pause @'Seconds 600 + void $ ContT $ withAsync $ forever do + pause @'Seconds 10 + p <- readTVarIO probe + acceptReport p =<< do + h <- readTVarIO heySent <&> ("heysSent",) . fromIntegral . HM.size + n <- readTVarIO noncesByPeer <&> ("noncesByPeer",) . fromIntegral . HM.size + f <- readTVarIO flowKeys <&> ("flowKeys",) . fromIntegral . HM.size + pure [h,n,f] + + forever do + pips <- lift getKnownPeers + cleanupByPassMessaging bp pips + pause @'Seconds 600 diff --git a/hbs2-peer/app/PeerMain.hs b/hbs2-peer/app/PeerMain.hs index d1e3caab..9b337638 100644 --- a/hbs2-peer/app/PeerMain.hs +++ b/hbs2-peer/app/PeerMain.hs @@ -895,6 +895,8 @@ runPeer opts = respawnOnError opts $ runResourceT do -- через TQueue. Нужно его удалить повсеместно -- Или сделать некий AnyAddr/DefaultAddr peerSelf <- fromPeerAddr "0.0.0.0:7351" + byPassProbe <- newSimpleProbe "Messaging.Encrypted.ByPass" + addProbe byPassProbe byPass <- newByPassMessaging @L4Proto byPassDef proxy @@ -902,6 +904,8 @@ runPeer opts = respawnOnError opts $ runResourceT do (view peerSignPk pc) (view peerSignSk pc) + byPassMessagingSetProbe byPass byPassProbe + penv <- newPeerEnv pl (AnyStorage s) (Fabriq byPass) peerSelf do probe <- newSimpleProbe "PeerEnv_Main" @@ -1138,7 +1142,7 @@ runPeer opts = respawnOnError opts $ runResourceT do debug "sending local peer announce" request localMulticast (PeerAnnounce @e pnonce) - peerThread "byPassWorker" (byPassWorker byPass penv) + peerThread "byPassWorker" (byPassWorker byPass) peerThread "httpWorker" (httpWorker conf peerMeta denv)