TCP probe

This commit is contained in:
voidlizard 2024-10-30 09:49:41 +03:00
parent 39e790ef32
commit 251b9ce5c3
3 changed files with 105 additions and 52 deletions

View File

@ -8,6 +8,7 @@ module HBS2.Net.Messaging.TCP
, tcpPeerConn
, tcpCookie
, tcpOnClientStarted
, messagingTCPSetProbe
) where
import HBS2.Clock
@ -43,7 +44,7 @@ import UnliftIO.Async
import UnliftIO.STM
import UnliftIO.Exception qualified as U
{-HLINT ignore "Functor law"-}
-- FIXME: control-recv-capacity-to-avoid-leaks
@ -62,11 +63,15 @@ data MessagingTCP =
, _tcpRecv :: TQueue (Peer L4Proto, ByteString)
, _tcpDefer :: TVar (HashMap (Peer L4Proto) [(TimeSpec, ByteString)])
, _tcpDeferEv :: TQueue ()
, _tcpProbe :: TVar AnyProbe
, _tcpOnClientStarted :: PeerAddr L4Proto -> Word64 -> IO () -- ^ Cient TCP connection succeed
}
makeLenses 'MessagingTCP
messagingTCPSetProbe :: MonadIO m => MessagingTCP -> AnyProbe -> m ()
messagingTCPSetProbe MessagingTCP{..} p = atomically $ writeTVar _tcpProbe p
newMessagingTCP :: ( MonadIO m
, FromSockAddr 'TCP (Peer L4Proto)
)
@ -86,6 +91,7 @@ newMessagingTCP pa = liftIO do
<*> newTQueueIO
<*> newTVarIO mempty
<*> newTQueueIO
<*> newTVarIO (AnyProbe ())
<*> pure (\_ _ -> none) -- do nothing by default
instance Messaging MessagingTCP L4Proto ByteString where
@ -358,75 +364,93 @@ connectPeerTCP env peer = liftIO do
-- FIXME: link-all-asyncs
runMessagingTCP :: forall m . MonadIO m => MessagingTCP -> m ()
runMessagingTCP env = liftIO do
runMessagingTCP env@MessagingTCP{..} = liftIO do
own <- toPeerAddr $ view tcpOwnPeer env
let (L4Address _ (IPAddrPort (i,p))) = own
void $ flip runContT pure do
let defs = view tcpDefer env
own <- toPeerAddr $ view tcpOwnPeer env
let (L4Address _ (IPAddrPort (i,p))) = own
mon <- async $ forever do
pause @'Seconds 30
now <- getTimeCoarse
let defs = view tcpDefer env
-- FIXME: time-hardcode-again
let expire = filter (\e -> (realToFrac (toNanoSecs (now - fst e)) / (1e9 :: Double)) < 30)
atomically $ modifyTVar defs
$ HashMap.mapMaybe
$ \es -> let rs = expire es
in case rs of
[] -> Nothing
xs -> Just xs
mon <- ContT $ withAsync $ forever do
pause @'Seconds 30
now <- getTimeCoarse
con <- async $ forever do
-- FIXME: time-hardcode-again
let expire = filter (\e -> (realToFrac (toNanoSecs (now - fst e)) / (1e9 :: Double)) < 30)
atomically $ modifyTVar defs
$ HashMap.mapMaybe
$ \es -> let rs = expire es
in case rs of
[] -> Nothing
xs -> Just xs
let ev = view tcpDeferEv env
con <- ContT $ withAsync $ forever do
-- FIXME: wait-period-hardcode
void $ race (pause @'Seconds 0.25) (atomically $ readTQueue ev >> flushTQueue ev)
let ev = view tcpDeferEv env
dePips <- readTVarIO defs <&> HashMap.keys
-- FIXME: wait-period-hardcode
void $ race (pause @'Seconds 0.25) (atomically $ readTQueue ev >> flushTQueue ev)
dePips <- readTVarIO defs <&> HashMap.keys
forM_ dePips $ \pip -> do
msgs <- readTVarIO defs <&> HashMap.findWithDefault mempty pip
forM_ dePips $ \pip -> do
msgs <- readTVarIO defs <&> HashMap.findWithDefault mempty pip
unless (L.null msgs) do
trace $ "DEFERRED FOR" <+> pretty pip <+> pretty (length msgs)
unless (L.null msgs) do
trace $ "DEFERRED FOR" <+> pretty pip <+> pretty (length msgs)
let len = length msgs
let len = length msgs
when (len > 10) do
-- FIXME: deferred-message-hardcoded
atomically $ modifyTVar defs (HashMap.adjust (L.drop (len - 10)) pip)
when (len > 10) do
-- FIXME: deferred-message-hardcoded
atomically $ modifyTVar defs (HashMap.adjust (L.drop (len - 10)) pip)
co' <- atomically $ readTVar (view tcpPeerConn env) <&> HashMap.lookup pip
co' <- atomically $ readTVar (view tcpPeerConn env) <&> HashMap.lookup pip
maybe1 co' (void $ async (connectPeerTCP env pip)) $ \co -> do
q' <- atomically $ readTVar (view tcpConnQ env) <&> HashMap.lookup co
maybe1 q' none $ \q -> do
atomically do
mss <- readTVar defs <&> HashMap.findWithDefault mempty pip
modifyTVar defs $ HashMap.delete pip
forM_ mss $ \m -> writeTQueue q (pip, snd m)
maybe1 co' (void $ async (connectPeerTCP env pip)) $ \co -> do
q' <- atomically $ readTVar (view tcpConnQ env) <&> HashMap.lookup co
maybe1 q' none $ \q -> do
atomically do
mss <- readTVar defs <&> HashMap.findWithDefault mempty pip
modifyTVar defs $ HashMap.delete pip
forM_ mss $ \m -> writeTQueue q (pip, snd m)
pure ()
pure ()
stat <- async $ forever do
pause @'Seconds 120
ps <- readTVarIO $ view tcpConnPeer env
let peers = HashMap.toList ps
forM_ peers $ \(c,pip) -> do
used <- readTVarIO (view tcpConnUsed env) <&> HashMap.findWithDefault 0 c
trace $ "peer" <+> brackets (pretty own)
<+> pretty pip
<+> pretty c
<+> parens ("used:" <+> pretty used)
stat <- ContT $ withAsync $ forever do
pause @'Seconds 120
ps <- readTVarIO $ view tcpConnPeer env
let peers = HashMap.toList ps
forM_ peers $ \(c,pip) -> do
used <- readTVarIO (view tcpConnUsed env) <&> HashMap.findWithDefault 0 c
trace $ "peer" <+> brackets (pretty own)
<+> pretty pip
<+> pretty c
<+> parens ("used:" <+> pretty used)
mapM_ link [mon,con,stat]
mapM_ link [mon,con,stat]
liftIO (
listen (Host (show i)) (show p) $ \(sock, sa) -> do
probes <- ContT $ withAsync $ forever do
pause @'Seconds 10
probe <- readTVarIO _tcpProbe
values <- atomically do
a <- readTVar _tcpConnPeer <&> HashMap.size <&> (L.singleton . ("tcpConnPeer",))
b <- readTVar _tcpPeerConn <&> HashMap.size <&> (L.singleton . ("tcpPeerConn",))
c <- readTVar _tcpConnUsed <&> HashMap.size <&> (L.singleton . ("tcpConnUsed",))
d <- readTVar _tcpConnQ <&> HashMap.size <&> (L.singleton . ("tcpConnQ",))
e <- readTVar _tcpPeerPx <&> HashMap.size <&> (L.singleton . ("tcpPeerPx",))
f <- readTVar _tcpPeerXp <&> HashMap.size <&> (L.singleton . ("tcpPeerXp",))
g <- readTVar _tcpDefer <&> HashMap.size <&> (L.singleton . ("tcpPeerDefer",))
pure $ mconcat [a, b, c, d, e, f, g]
acceptReport probe (fmap (over _2 fromIntegral) values)
ContT $ bracket (pure ()) $ \_ -> mapM_ cancel [mon,con,stat,probes]
liftIO $ listen (Host (show i)) (show p) $ \(sock, sa) -> do
withFdSocket sock setCloseOnExecIfNeeded
debug $ "Listening on" <+> pretty sa
@ -449,7 +473,7 @@ runMessagingTCP env = liftIO do
debug $ "CLOSING CONNECTION" <+> pretty remote
shutdown so ShutdownBoth
close so ) `U.finally` mapM_ cancel [mon,con,stat]
close so -- ) -- `U.finally` mapM_ cancel [mon,con,stat]
traceCmd :: forall a ann b m . ( Pretty a

View File

@ -857,6 +857,11 @@ runPeer opts = respawnOnError opts $ runResourceT do
<&> set tcpOnClientStarted (onClientTCPConnected brains)
<&> set tcpSOCKS5 socks5
lift do
tcpProbe <- newSimpleProbe "MessagingTCP"
addProbe tcpProbe
messagingTCPSetProbe tcpEnv tcpProbe
void $ liftIO ( async do
runMessagingTCP tcpEnv
`U.withException` \(e :: SomeException) -> do
@ -909,6 +914,10 @@ runPeer opts = respawnOnError opts $ runResourceT do
rce <- refChanWorkerEnv conf penv denv refChanNotifySource
rcwProbe <- newSimpleProbe "RefChanWorker"
addProbe rcwProbe
refChanWorkerEnvSetProbe rce rcwProbe
let refChanAdapter =
RefChanAdapter
{ refChanOnHead = refChanOnHeadFn rce

View File

@ -12,6 +12,7 @@ module RefChan (
, runRefChanRelyWorker
, refChanWorkerEnv
, refChanNotifyOnUpdated
, refChanWorkerEnvSetProbe
) where
import HBS2.Prelude.Plated
@ -105,10 +106,18 @@ data RefChanWorkerEnv e =
, _refChanWorkerNotifiersInbox :: TQueue (RefChanNotify e) -- ^ to rely messages from clients to gossip
, _refChanWorkerNotifiersDone :: Cache (Hash HbSync) ()
, _refChanWorkerLocalRelyDone :: Cache (Peer UNIX, Hash HbSync) ()
, _refChanWorkerProbe :: TVar AnyProbe
}
makeLenses 'RefChanWorkerEnv
refChanWorkerEnvSetProbe :: forall m e . (MonadIO m, ForRefChans e)
=> RefChanWorkerEnv e
-> AnyProbe
-> m ()
refChanWorkerEnvSetProbe RefChanWorkerEnv{..} probe = do
liftIO $ atomically $ writeTVar _refChanWorkerProbe probe
refChanWorkerEnv :: forall m e . (MonadIO m, ForRefChans e)
=> PeerConfig
-> PeerEnv e
@ -127,6 +136,8 @@ refChanWorkerEnv conf pe de nsource =
<*> newTQueueIO
<*> Cache.newCache (Just defRequestLimit)
<*> Cache.newCache (Just defRequestLimit)
<*> newTVarIO (AnyProbe ())
refChanOnHeadFn :: forall e m . (ForRefChans e, MonadIO m) => RefChanWorkerEnv e -> RefChanId e -> RefChanHeadBlockTran e -> m ()
refChanOnHeadFn env chan tran = do
@ -595,6 +606,15 @@ refChanWorker env@RefChanWorkerEnv{..} brains = do
bullshit <- ContT $ withAsync $ forever do
pause @'Seconds 10
probe <- readTVarIO _refChanWorkerProbe
values <- atomically do
refChanWorkerEnvDownloadSize <- readTVar _refChanWorkerEnvDownload <&> HashMap.size
refChanWorkerNotifiersSize <- readTVar _refChanWorkerNotifiers <&> HashMap.size
pure [ ("refChanWorkerEnvDownloadSize", fromIntegral refChanWorkerEnvDownloadSize)
, ("refChanWorkerNotifiersSize", fromIntegral refChanWorkerNotifiersSize)
]
acceptReport probe values
debug "I'm refchan worker"
waitAnyCatchCancel [hw,downloads,polls,wtrans,merge,cleanup1,bullshit]