diff --git a/hbs2-core/lib/HBS2/Net/Messaging/TCP.hs b/hbs2-core/lib/HBS2/Net/Messaging/TCP.hs index 27db86f6..706fc363 100644 --- a/hbs2-core/lib/HBS2/Net/Messaging/TCP.hs +++ b/hbs2-core/lib/HBS2/Net/Messaging/TCP.hs @@ -43,6 +43,7 @@ import Control.Exception import UnliftIO.Async import UnliftIO.STM import UnliftIO.Exception qualified as U +import Streaming.Prelude qualified as S {-HLINT ignore "Functor law"-} @@ -63,6 +64,7 @@ data MessagingTCP = , _tcpRecv :: TQueue (Peer L4Proto, ByteString) , _tcpDefer :: TVar (HashMap (Peer L4Proto) [(TimeSpec, ByteString)]) , _tcpDeferEv :: TQueue () + , _tcpSpawned :: TVar Int , _tcpProbe :: TVar AnyProbe , _tcpOnClientStarted :: PeerAddr L4Proto -> Word64 -> IO () -- ^ Cient TCP connection succeed } @@ -91,6 +93,7 @@ newMessagingTCP pa = liftIO do <*> newTQueueIO <*> newTVarIO mempty <*> newTQueueIO + <*> newTVarIO 0 <*> newTVarIO (AnyProbe ()) <*> pure (\_ _ -> none) -- do nothing by default @@ -182,7 +185,7 @@ spawnConnection :: forall m . MonadIO m -> SockAddr -> m () -spawnConnection tp env so sa = liftIO do +spawnConnection tp env@MessagingTCP{..} so sa = liftIO do flip runContT pure $ do @@ -224,6 +227,9 @@ spawnConnection tp env so sa = liftIO do atomically $ modifyTVar (view tcpPeerConn env) (HashMap.insert newP connId) when (used == 1) do + + atomically $ modifyTVar _tcpSpawned succ + q <- getWriteQueue connId updatePeer connId newP @@ -270,6 +276,7 @@ spawnConnection tp env so sa = liftIO do ContT $ bracket none $ \_ -> mapM cancel [rd,wr] ContT $ bracket (pure connId) cleanupConn + ContT $ bracket none (const $ atomically $ modifyTVar _tcpSpawned pred) void $ waitAnyCatchCancel [rd,wr] @@ -437,17 +444,15 @@ runMessagingTCP env@MessagingTCP{..} = liftIO do probes <- ContT $ withAsync $ forever do pause @'Seconds 10 probe <- readTVarIO _tcpProbe - values <- atomically do - a <- readTVar _tcpConnPeer <&> HashMap.size <&> (L.singleton . ("tcpConnPeer",)) - b <- readTVar _tcpPeerConn <&> HashMap.size <&> (L.singleton . ("tcpPeerConn",)) - c <- readTVar _tcpConnUsed <&> HashMap.size <&> (L.singleton . ("tcpConnUsed",)) - d <- readTVar _tcpConnQ <&> HashMap.size <&> (L.singleton . ("tcpConnQ",)) - e <- readTVar _tcpPeerPx <&> HashMap.size <&> (L.singleton . ("tcpPeerPx",)) - f <- readTVar _tcpPeerXp <&> HashMap.size <&> (L.singleton . ("tcpPeerXp",)) - g <- readTVar _tcpDefer <&> HashMap.size <&> (L.singleton . ("tcpPeerDefer",)) - pure $ mconcat [a, b, c, d, e, f, g] - acceptReport probe (fmap (over _2 fromIntegral) values) - + acceptReport probe =<< S.toList_ do + S.yield =<< atomically (readTVar _tcpConnPeer <&> ("tcpConnPeer",) . fromIntegral . HashMap.size) + S.yield =<< atomically (readTVar _tcpPeerConn <&> ("tcpPeerConn",) . fromIntegral . HashMap.size) + S.yield =<< atomically (readTVar _tcpConnUsed <&> ("tcpConnUsed",) . fromIntegral . HashMap.size) + S.yield =<< atomically (readTVar _tcpConnQ <&> ("tcpConnQ",) . fromIntegral . HashMap.size) + S.yield =<< atomically (readTVar _tcpPeerPx <&> ("tcpPeerPx",) . fromIntegral . HashMap.size) + S.yield =<< atomically (readTVar _tcpPeerXp <&> ("tcpPeerXp",) . fromIntegral . HashMap.size) + S.yield =<< atomically (readTVar _tcpDefer <&> ("tcpPeerDefer",) . fromIntegral . HashMap.size) + S.yield =<< atomically (readTVar _tcpSpawned <&> ("tcpSpawned",) . fromIntegral) ContT $ bracket (pure ()) $ \_ -> mapM_ cancel [mon,con,stat,probes]