mirror of https://github.com/voidlizard/hbs2
bypass messaging probes
This commit is contained in:
parent
c972edcc4a
commit
03b638a571
|
@ -2,7 +2,7 @@
|
|||
{-# Language RecordWildCards #-}
|
||||
module HBS2.Net.Messaging.Encrypted.ByPass
|
||||
( ForByPass
|
||||
, ByPass
|
||||
, ByPass(..)
|
||||
, ByPassOpts(..)
|
||||
, ByPassStat(..)
|
||||
, byPassDef
|
||||
|
|
|
@ -108,29 +108,11 @@ newMessagingUDP reuse saddr =
|
|||
|
||||
udpWorker :: MessagingUDP -> TVar Socket -> IO ()
|
||||
udpWorker env tso = do
|
||||
|
||||
so <- readTVarIO tso
|
||||
|
||||
rcvLoop <- async $ forever $ do
|
||||
-- so <- readTVarIO tso
|
||||
-- pause ( 10 :: Timeout 'Seconds )
|
||||
forever $ do
|
||||
(msg, from) <- recvFrom so defMaxDatagram
|
||||
-- liftIO $ print $ "recv:" <+> pretty (BS.length msg)
|
||||
-- FIXME: ASAP-check-addr-type
|
||||
liftIO $ atomically $ Q0.writeTQueue (sink env) (From (PeerL4 UDP from), LBS.fromStrict msg)
|
||||
|
||||
sndLoop <- async $ forever $ do
|
||||
pause ( 10 :: Timeout 'Seconds )
|
||||
-- (To whom, msg) <- atomically $ Q0.readTQueue (inbox env)
|
||||
-- print "YAY!"
|
||||
-- sendAllTo so (LBS.toStrict msg) (view sockAddr whom)
|
||||
|
||||
-- (msg, from) <- recvFrom so defMaxDatagram
|
||||
-- liftIO $ print $ "recv:" <+> pretty (BS.length msg)
|
||||
-- atomically $ Q.writeTBQueue (sink env) (From (PeerUDP from), LBS.fromStrict msg)
|
||||
|
||||
void $ waitAnyCatchCancel [rcvLoop,sndLoop]
|
||||
|
||||
-- FIXME: stopping
|
||||
|
||||
runMessagingUDP :: MonadIO m => MessagingUDP -> m ()
|
||||
|
|
|
@ -1,9 +1,11 @@
|
|||
{-# Language RecordWildCards #-}
|
||||
module ByPassWorker where
|
||||
|
||||
import HBS2.Prelude
|
||||
import HBS2.Clock
|
||||
import HBS2.Actors.Peer
|
||||
import HBS2.Net.Messaging.Encrypted.ByPass
|
||||
import HBS2.Misc.PrettyStuff
|
||||
|
||||
import HBS2.Peer.Proto.Peer
|
||||
import HBS2.Peer.Proto.PeerExchange
|
||||
|
@ -12,6 +14,7 @@ import HBS2.Net.Proto.Types
|
|||
|
||||
import PeerTypes
|
||||
|
||||
import Data.HashMap.Strict qualified as HM
|
||||
import Control.Monad
|
||||
import UnliftIO
|
||||
import Control.Monad.Trans.Cont
|
||||
|
@ -24,26 +27,36 @@ byPassWorker :: ( ForByPass e
|
|||
, Expires (SessionKey e (KnownPeer e))
|
||||
)
|
||||
=> ByPass e w
|
||||
-> PeerEnv e
|
||||
-> m ()
|
||||
|
||||
byPassWorker bp penv = do
|
||||
byPassWorker bp@ByPass{..} = do
|
||||
|
||||
info $ green "byPassWorker started"
|
||||
|
||||
flip runContT pure do
|
||||
|
||||
void $ ContT $ withAsync $ forever do
|
||||
pause @'Seconds 60
|
||||
stats <- getStat bp
|
||||
info $ "ByPass stats"
|
||||
<> line
|
||||
<> indent 2 (pretty stats)
|
||||
<> line
|
||||
|
||||
pause @'Seconds 60
|
||||
|
||||
forever do
|
||||
pips <- getKnownPeers
|
||||
cleanupByPassMessaging bp pips
|
||||
pause @'Seconds 600
|
||||
void $ ContT $ withAsync $ forever do
|
||||
pause @'Seconds 10
|
||||
p <- readTVarIO probe
|
||||
acceptReport p =<< do
|
||||
h <- readTVarIO heySent <&> ("heysSent",) . fromIntegral . HM.size
|
||||
n <- readTVarIO noncesByPeer <&> ("noncesByPeer",) . fromIntegral . HM.size
|
||||
f <- readTVarIO flowKeys <&> ("flowKeys",) . fromIntegral . HM.size
|
||||
pure [h,n,f]
|
||||
|
||||
forever do
|
||||
pips <- lift getKnownPeers
|
||||
cleanupByPassMessaging bp pips
|
||||
pause @'Seconds 600
|
||||
|
||||
|
||||
|
||||
|
|
|
@ -895,6 +895,8 @@ runPeer opts = respawnOnError opts $ runResourceT do
|
|||
-- через TQueue. Нужно его удалить повсеместно
|
||||
-- Или сделать некий AnyAddr/DefaultAddr
|
||||
peerSelf <- fromPeerAddr "0.0.0.0:7351"
|
||||
byPassProbe <- newSimpleProbe "Messaging.Encrypted.ByPass"
|
||||
addProbe byPassProbe
|
||||
byPass <- newByPassMessaging @L4Proto
|
||||
byPassDef
|
||||
proxy
|
||||
|
@ -902,6 +904,8 @@ runPeer opts = respawnOnError opts $ runResourceT do
|
|||
(view peerSignPk pc)
|
||||
(view peerSignSk pc)
|
||||
|
||||
byPassMessagingSetProbe byPass byPassProbe
|
||||
|
||||
penv <- newPeerEnv pl (AnyStorage s) (Fabriq byPass) peerSelf
|
||||
do
|
||||
probe <- newSimpleProbe "PeerEnv_Main"
|
||||
|
@ -1138,7 +1142,7 @@ runPeer opts = respawnOnError opts $ runResourceT do
|
|||
debug "sending local peer announce"
|
||||
request localMulticast (PeerAnnounce @e pnonce)
|
||||
|
||||
peerThread "byPassWorker" (byPassWorker byPass penv)
|
||||
peerThread "byPassWorker" (byPassWorker byPass)
|
||||
|
||||
peerThread "httpWorker" (httpWorker conf peerMeta denv)
|
||||
|
||||
|
|
Loading…
Reference in New Issue