added TCP spawned parameter to probe

This commit is contained in:
voidlizard 2024-11-01 11:12:22 +03:00
parent 6cca320c34
commit 4c4e773fa5
1 changed files with 17 additions and 12 deletions

View File

@ -43,6 +43,7 @@ import Control.Exception
import UnliftIO.Async import UnliftIO.Async
import UnliftIO.STM import UnliftIO.STM
import UnliftIO.Exception qualified as U import UnliftIO.Exception qualified as U
import Streaming.Prelude qualified as S
{-HLINT ignore "Functor law"-} {-HLINT ignore "Functor law"-}
@ -63,6 +64,7 @@ data MessagingTCP =
, _tcpRecv :: TQueue (Peer L4Proto, ByteString) , _tcpRecv :: TQueue (Peer L4Proto, ByteString)
, _tcpDefer :: TVar (HashMap (Peer L4Proto) [(TimeSpec, ByteString)]) , _tcpDefer :: TVar (HashMap (Peer L4Proto) [(TimeSpec, ByteString)])
, _tcpDeferEv :: TQueue () , _tcpDeferEv :: TQueue ()
, _tcpSpawned :: TVar Int
, _tcpProbe :: TVar AnyProbe , _tcpProbe :: TVar AnyProbe
, _tcpOnClientStarted :: PeerAddr L4Proto -> Word64 -> IO () -- ^ Cient TCP connection succeed , _tcpOnClientStarted :: PeerAddr L4Proto -> Word64 -> IO () -- ^ Cient TCP connection succeed
} }
@ -91,6 +93,7 @@ newMessagingTCP pa = liftIO do
<*> newTQueueIO <*> newTQueueIO
<*> newTVarIO mempty <*> newTVarIO mempty
<*> newTQueueIO <*> newTQueueIO
<*> newTVarIO 0
<*> newTVarIO (AnyProbe ()) <*> newTVarIO (AnyProbe ())
<*> pure (\_ _ -> none) -- do nothing by default <*> pure (\_ _ -> none) -- do nothing by default
@ -182,7 +185,7 @@ spawnConnection :: forall m . MonadIO m
-> SockAddr -> SockAddr
-> m () -> m ()
spawnConnection tp env so sa = liftIO do spawnConnection tp env@MessagingTCP{..} so sa = liftIO do
flip runContT pure $ do flip runContT pure $ do
@ -224,6 +227,9 @@ spawnConnection tp env so sa = liftIO do
atomically $ modifyTVar (view tcpPeerConn env) (HashMap.insert newP connId) atomically $ modifyTVar (view tcpPeerConn env) (HashMap.insert newP connId)
when (used == 1) do when (used == 1) do
atomically $ modifyTVar _tcpSpawned succ
q <- getWriteQueue connId q <- getWriteQueue connId
updatePeer connId newP updatePeer connId newP
@ -270,6 +276,7 @@ spawnConnection tp env so sa = liftIO do
ContT $ bracket none $ \_ -> mapM cancel [rd,wr] ContT $ bracket none $ \_ -> mapM cancel [rd,wr]
ContT $ bracket (pure connId) cleanupConn ContT $ bracket (pure connId) cleanupConn
ContT $ bracket none (const $ atomically $ modifyTVar _tcpSpawned pred)
void $ waitAnyCatchCancel [rd,wr] void $ waitAnyCatchCancel [rd,wr]
@ -437,17 +444,15 @@ runMessagingTCP env@MessagingTCP{..} = liftIO do
probes <- ContT $ withAsync $ forever do probes <- ContT $ withAsync $ forever do
pause @'Seconds 10 pause @'Seconds 10
probe <- readTVarIO _tcpProbe probe <- readTVarIO _tcpProbe
values <- atomically do acceptReport probe =<< S.toList_ do
a <- readTVar _tcpConnPeer <&> HashMap.size <&> (L.singleton . ("tcpConnPeer",)) S.yield =<< atomically (readTVar _tcpConnPeer <&> ("tcpConnPeer",) . fromIntegral . HashMap.size)
b <- readTVar _tcpPeerConn <&> HashMap.size <&> (L.singleton . ("tcpPeerConn",)) S.yield =<< atomically (readTVar _tcpPeerConn <&> ("tcpPeerConn",) . fromIntegral . HashMap.size)
c <- readTVar _tcpConnUsed <&> HashMap.size <&> (L.singleton . ("tcpConnUsed",)) S.yield =<< atomically (readTVar _tcpConnUsed <&> ("tcpConnUsed",) . fromIntegral . HashMap.size)
d <- readTVar _tcpConnQ <&> HashMap.size <&> (L.singleton . ("tcpConnQ",)) S.yield =<< atomically (readTVar _tcpConnQ <&> ("tcpConnQ",) . fromIntegral . HashMap.size)
e <- readTVar _tcpPeerPx <&> HashMap.size <&> (L.singleton . ("tcpPeerPx",)) S.yield =<< atomically (readTVar _tcpPeerPx <&> ("tcpPeerPx",) . fromIntegral . HashMap.size)
f <- readTVar _tcpPeerXp <&> HashMap.size <&> (L.singleton . ("tcpPeerXp",)) S.yield =<< atomically (readTVar _tcpPeerXp <&> ("tcpPeerXp",) . fromIntegral . HashMap.size)
g <- readTVar _tcpDefer <&> HashMap.size <&> (L.singleton . ("tcpPeerDefer",)) S.yield =<< atomically (readTVar _tcpDefer <&> ("tcpPeerDefer",) . fromIntegral . HashMap.size)
pure $ mconcat [a, b, c, d, e, f, g] S.yield =<< atomically (readTVar _tcpSpawned <&> ("tcpSpawned",) . fromIntegral)
acceptReport probe (fmap (over _2 fromIntegral) values)
ContT $ bracket (pure ()) $ \_ -> mapM_ cancel [mon,con,stat,probes] ContT $ bracket (pure ()) $ \_ -> mapM_ cancel [mon,con,stat,probes]