From 8ab74bec71289be5ce3e377b926d5ac8d11d1ab6 Mon Sep 17 00:00:00 2001 From: voidlizard Date: Mon, 26 May 2025 18:08:17 +0300 Subject: [PATCH] maybe fixed TCP loop on error --- hbs2-core/lib/HBS2/Net/Messaging/TCP.hs | 99 +++++++++++++------------ hbs2-peer/app/PeerMain.hs | 4 +- 2 files changed, 55 insertions(+), 48 deletions(-) diff --git a/hbs2-core/lib/HBS2/Net/Messaging/TCP.hs b/hbs2-core/lib/HBS2/Net/Messaging/TCP.hs index 3499eb03..70608b4a 100644 --- a/hbs2-core/lib/HBS2/Net/Messaging/TCP.hs +++ b/hbs2-core/lib/HBS2/Net/Messaging/TCP.hs @@ -66,7 +66,7 @@ data MessagingTCP = , _tcpPeerCookie :: TVar (HashMap Word32 Int) , _tcpPeerToCookie :: TVar (HashMap (Peer L4Proto) Word32) , _tcpPeerSocket :: TVar (HashMap (Peer L4Proto) Socket) - , _tcpConnDemand :: TQueue (Peer L4Proto) + , _tcpConnDemand :: TVar (HashPSQ (Peer L4Proto) TimeSpec ()) , _tcpReceived :: TBQueue (Peer L4Proto, ByteString) , _tcpSent :: TVar (HashPSQ (Peer L4Proto) TimeSpec (TBQueue ByteString)) , _tcpClientThreadNum :: TVar Int @@ -108,7 +108,7 @@ newMessagingTCP pa = liftIO do <*> newTVarIO mempty <*> newTVarIO mempty <*> newTVarIO mempty - <*> newTQueueIO + <*> newTVarIO HPSQ.empty <*> newTBQueueIO (10 * outMessageQLen) <*> newTVarIO HPSQ.empty <*> newTVarIO 0 @@ -129,7 +129,7 @@ instance Messaging MessagingTCP L4Proto ByteString where case q' of Nothing -> do - writeTQueue _tcpConnDemand p + modifyTVar _tcpConnDemand (HPSQ.insert p now ()) q <- newTBQueue outMessageQLen modifyTVar _tcpSent (HPSQ.insert p now q) pure q @@ -224,51 +224,55 @@ tcpPeerKick MessagingTCP{..} p = do runMessagingTCP :: forall m . MonadIO m => MessagingTCP -> m () runMessagingTCP env@MessagingTCP{..} = liftIO do - void $ flip runContT pure do + fix \again -> do - forever do + void $ flip runContT pure do - p1 <- ContT $ withAsync runClient - p2 <- ContT $ withAsync runServer + p1 <- ContT $ withAsync runClient + p2 <- ContT $ withAsync runServer - probes <- ContT $ withAsync $ forever do + probes <- ContT $ withAsync $ forever do + pause @'Seconds 10 + p <- readTVarIO _tcpProbe + acceptReport p =<< S.toList_ do + S.yield =<< ( readTVarIO _tcpClientThreads <&> ("tcpClientThreads",) . fromIntegral . HM.size ) + S.yield =<< ( readTVarIO _tcpServerThreadsCount <&> ("tcpServerThreadsCount",) . fromIntegral ) + S.yield =<< ( readTVarIO _tcpPeerConn <&> ("tcpPeerConn",) . fromIntegral . HM.size) + + coo <- readTVarIO _tcpPeerCookie -- <&> ("tcpPeerCookie",) + let cooNn = sum [ 1 | (_,v) <- HM.toList coo, v >= 1 ] + + S.yield ("tcpPeerCookie", fromIntegral $ HM.size coo) + S.yield ("tcpPeerCookieUsed", cooNn) + + S.yield =<< ( readTVarIO _tcpSent <&> ("tcpSent",) . fromIntegral . HPSQ.size) + + sweepCookies <- ContT $ withAsync $ forever do + pause @'Seconds 300 + atomically do + pips <- readTVar _tcpPeerConn + modifyTVar _tcpPeerToCookie (HM.filterWithKey (\k _ -> HM.member k pips)) + alive <- readTVar _tcpPeerToCookie <&> HS.fromList . HM.elems + modifyTVar _tcpPeerCookie (HM.filterWithKey (\k _ -> HS.member k alive)) + + sweep <- ContT $ withAsync $ forever do + pause @'Seconds 300 + now <- getTimeCoarse + atomically do + w <- readTVar _tcpSent <&> HPSQ.toList + let live = [ x | x@(_,t,_) <- w, realToFrac (now - t) / 1e9 < 300 ] + writeTVar _tcpSent (HPSQ.fromList live) + -- atomically do + -- pips <- readTVar _tcpPeerConn + -- modifyTVar _tcpSent (HM.filterWithKey (\k _ -> HM.member k pips)) + -- modifyTVar _tcpPeerSocket (HM.filterWithKey (\k _ -> HM.member k pips)) + -- modifyTVar _tcpPeerCookie (HM.filter (>=1)) + + (_,e) <- waitAnyCatchCancel [p1,p2,probes,sweep,sweepCookies] + + err $ "TCP server is down because of" <+> viaShow e pause @'Seconds 10 - p <- readTVarIO _tcpProbe - acceptReport p =<< S.toList_ do - S.yield =<< ( readTVarIO _tcpClientThreads <&> ("tcpClientThreads",) . fromIntegral . HM.size ) - S.yield =<< ( readTVarIO _tcpServerThreadsCount <&> ("tcpServerThreadsCount",) . fromIntegral ) - S.yield =<< ( readTVarIO _tcpPeerConn <&> ("tcpPeerConn",) . fromIntegral . HM.size) - - coo <- readTVarIO _tcpPeerCookie -- <&> ("tcpPeerCookie",) - let cooNn = sum [ 1 | (_,v) <- HM.toList coo, v >= 1 ] - - S.yield ("tcpPeerCookie", fromIntegral $ HM.size coo) - S.yield ("tcpPeerCookieUsed", cooNn) - - S.yield =<< ( readTVarIO _tcpSent <&> ("tcpSent",) . fromIntegral . HPSQ.size) - - sweepCookies <- ContT $ withAsync $ forever do - pause @'Seconds 300 - atomically do - pips <- readTVar _tcpPeerConn - modifyTVar _tcpPeerToCookie (HM.filterWithKey (\k _ -> HM.member k pips)) - alive <- readTVar _tcpPeerToCookie <&> HS.fromList . HM.elems - modifyTVar _tcpPeerCookie (HM.filterWithKey (\k _ -> HS.member k alive)) - - sweep <- ContT $ withAsync $ forever do - pause @'Seconds 300 - now <- getTimeCoarse - atomically do - w <- readTVar _tcpSent <&> HPSQ.toList - let live = [ x | x@(_,t,_) <- w, realToFrac (now - t) / 1e9 < 300 ] - writeTVar _tcpSent (HPSQ.fromList live) - -- atomically do - -- pips <- readTVar _tcpPeerConn - -- modifyTVar _tcpSent (HM.filterWithKey (\k _ -> HM.member k pips)) - -- modifyTVar _tcpPeerSocket (HM.filterWithKey (\k _ -> HM.member k pips)) - -- modifyTVar _tcpPeerCookie (HM.filter (>=1)) - - waitAnyCatchCancel [p1,p2,probes,sweep,sweepCookies] + lift again where @@ -302,7 +306,7 @@ runMessagingTCP env@MessagingTCP{..} = liftIO do bs <- readFromSocket so size atomically $ writeTBQueueDropSTM outMessageQLen queue (peer, bs) - runServer = do + runServer = flip runContT pure do own <- toPeerAddr $ view tcpOwnPeer env let (L4Address _ (IPAddrPort (i,p))) = own @@ -404,7 +408,10 @@ runMessagingTCP env@MessagingTCP{..} = liftIO do forever $ void $ runMaybeT do -- client sockets - who <- atomically $ readTQueue _tcpConnDemand + who <- atomically do + readTVar _tcpConnDemand <&> HPSQ.minView >>= \case + Nothing -> STM.retry + Just (p,_,_,rest) -> writeTVar _tcpConnDemand rest >> pure p already <- readTVarIO _tcpPeerConn <&> HM.member who diff --git a/hbs2-peer/app/PeerMain.hs b/hbs2-peer/app/PeerMain.hs index 7e452d26..67ed11b2 100644 --- a/hbs2-peer/app/PeerMain.hs +++ b/hbs2-peer/app/PeerMain.hs @@ -1,4 +1,3 @@ - {-# Language TemplateHaskell #-} {-# Language AllowAmbiguousTypes #-} {-# Language UndecidableInstances #-} @@ -930,12 +929,13 @@ runPeer opts = respawnOnError opts $ flip runContT pure do addProbe tcpProbe messagingTCPSetProbe tcpEnv tcpProbe - void $ liftIO ( async do + void $ liftIO ( asyncLinked do runMessagingTCP tcpEnv `U.withException` \(e :: SomeException) -> do err (viaShow e) err "!!! TCP messaging stopped" liftIO $ atomically $ modifyTVar msgAlive pred + throwIO e ) let tcpaddr = view tcpOwnPeer tcpEnv liftIO $ atomically $ modifyTVar msgAlive succ