bypass messaging probes

This commit is contained in:
voidlizard 2024-11-01 08:13:25 +03:00
parent befc867208
commit 389c842ad9
4 changed files with 27 additions and 28 deletions

View File

@ -2,7 +2,7 @@
{-# Language RecordWildCards #-} {-# Language RecordWildCards #-}
module HBS2.Net.Messaging.Encrypted.ByPass module HBS2.Net.Messaging.Encrypted.ByPass
( ForByPass ( ForByPass
, ByPass , ByPass(..)
, ByPassOpts(..) , ByPassOpts(..)
, ByPassStat(..) , ByPassStat(..)
, byPassDef , byPassDef

View File

@ -108,29 +108,11 @@ newMessagingUDP reuse saddr =
udpWorker :: MessagingUDP -> TVar Socket -> IO () udpWorker :: MessagingUDP -> TVar Socket -> IO ()
udpWorker env tso = do udpWorker env tso = do
so <- readTVarIO tso so <- readTVarIO tso
forever $ do
rcvLoop <- async $ forever $ do
-- so <- readTVarIO tso
-- pause ( 10 :: Timeout 'Seconds )
(msg, from) <- recvFrom so defMaxDatagram (msg, from) <- recvFrom so defMaxDatagram
-- liftIO $ print $ "recv:" <+> pretty (BS.length msg)
-- FIXME: ASAP-check-addr-type
liftIO $ atomically $ Q0.writeTQueue (sink env) (From (PeerL4 UDP from), LBS.fromStrict msg) liftIO $ atomically $ Q0.writeTQueue (sink env) (From (PeerL4 UDP from), LBS.fromStrict msg)
sndLoop <- async $ forever $ do
pause ( 10 :: Timeout 'Seconds )
-- (To whom, msg) <- atomically $ Q0.readTQueue (inbox env)
-- print "YAY!"
-- sendAllTo so (LBS.toStrict msg) (view sockAddr whom)
-- (msg, from) <- recvFrom so defMaxDatagram
-- liftIO $ print $ "recv:" <+> pretty (BS.length msg)
-- atomically $ Q.writeTBQueue (sink env) (From (PeerUDP from), LBS.fromStrict msg)
void $ waitAnyCatchCancel [rcvLoop,sndLoop]
-- FIXME: stopping -- FIXME: stopping
runMessagingUDP :: MonadIO m => MessagingUDP -> m () runMessagingUDP :: MonadIO m => MessagingUDP -> m ()

View File

@ -1,9 +1,11 @@
{-# Language RecordWildCards #-}
module ByPassWorker where module ByPassWorker where
import HBS2.Prelude import HBS2.Prelude
import HBS2.Clock import HBS2.Clock
import HBS2.Actors.Peer import HBS2.Actors.Peer
import HBS2.Net.Messaging.Encrypted.ByPass import HBS2.Net.Messaging.Encrypted.ByPass
import HBS2.Misc.PrettyStuff
import HBS2.Peer.Proto.Peer import HBS2.Peer.Proto.Peer
import HBS2.Peer.Proto.PeerExchange import HBS2.Peer.Proto.PeerExchange
@ -12,6 +14,7 @@ import HBS2.Net.Proto.Types
import PeerTypes import PeerTypes
import Data.HashMap.Strict qualified as HM
import Control.Monad import Control.Monad
import UnliftIO import UnliftIO
import Control.Monad.Trans.Cont import Control.Monad.Trans.Cont
@ -24,24 +27,34 @@ byPassWorker :: ( ForByPass e
, Expires (SessionKey e (KnownPeer e)) , Expires (SessionKey e (KnownPeer e))
) )
=> ByPass e w => ByPass e w
-> PeerEnv e
-> m () -> m ()
byPassWorker bp penv = do byPassWorker bp@ByPass{..} = do
info $ green "byPassWorker started"
flip runContT pure do flip runContT pure do
void $ ContT $ withAsync $ forever do void $ ContT $ withAsync $ forever do
pause @'Seconds 60
stats <- getStat bp stats <- getStat bp
info $ "ByPass stats" info $ "ByPass stats"
<> line <> line
<> indent 2 (pretty stats) <> indent 2 (pretty stats)
<> line <> line
pause @'Seconds 60
void $ ContT $ withAsync $ forever do
pause @'Seconds 10
p <- readTVarIO probe
acceptReport p =<< do
h <- readTVarIO heySent <&> ("heysSent",) . fromIntegral . HM.size
n <- readTVarIO noncesByPeer <&> ("noncesByPeer",) . fromIntegral . HM.size
f <- readTVarIO flowKeys <&> ("flowKeys",) . fromIntegral . HM.size
pure [h,n,f]
forever do forever do
pips <- getKnownPeers pips <- lift getKnownPeers
cleanupByPassMessaging bp pips cleanupByPassMessaging bp pips
pause @'Seconds 600 pause @'Seconds 600

View File

@ -895,6 +895,8 @@ runPeer opts = respawnOnError opts $ runResourceT do
-- через TQueue. Нужно его удалить повсеместно -- через TQueue. Нужно его удалить повсеместно
-- Или сделать некий AnyAddr/DefaultAddr -- Или сделать некий AnyAddr/DefaultAddr
peerSelf <- fromPeerAddr "0.0.0.0:7351" peerSelf <- fromPeerAddr "0.0.0.0:7351"
byPassProbe <- newSimpleProbe "Messaging.Encrypted.ByPass"
addProbe byPassProbe
byPass <- newByPassMessaging @L4Proto byPass <- newByPassMessaging @L4Proto
byPassDef byPassDef
proxy proxy
@ -902,6 +904,8 @@ runPeer opts = respawnOnError opts $ runResourceT do
(view peerSignPk pc) (view peerSignPk pc)
(view peerSignSk pc) (view peerSignSk pc)
byPassMessagingSetProbe byPass byPassProbe
penv <- newPeerEnv pl (AnyStorage s) (Fabriq byPass) peerSelf penv <- newPeerEnv pl (AnyStorage s) (Fabriq byPass) peerSelf
do do
probe <- newSimpleProbe "PeerEnv_Main" probe <- newSimpleProbe "PeerEnv_Main"
@ -1138,7 +1142,7 @@ runPeer opts = respawnOnError opts $ runResourceT do
debug "sending local peer announce" debug "sending local peer announce"
request localMulticast (PeerAnnounce @e pnonce) request localMulticast (PeerAnnounce @e pnonce)
peerThread "byPassWorker" (byPassWorker byPass penv) peerThread "byPassWorker" (byPassWorker byPass)
peerThread "httpWorker" (httpWorker conf peerMeta denv) peerThread "httpWorker" (httpWorker conf peerMeta denv)