maybe fixed TCP loop on error

This commit is contained in:
voidlizard 2025-05-26 18:08:17 +03:00
parent d03273fa3e
commit 8ab74bec71
2 changed files with 55 additions and 48 deletions

View File

@ -66,7 +66,7 @@ data MessagingTCP =
, _tcpPeerCookie :: TVar (HashMap Word32 Int) , _tcpPeerCookie :: TVar (HashMap Word32 Int)
, _tcpPeerToCookie :: TVar (HashMap (Peer L4Proto) Word32) , _tcpPeerToCookie :: TVar (HashMap (Peer L4Proto) Word32)
, _tcpPeerSocket :: TVar (HashMap (Peer L4Proto) Socket) , _tcpPeerSocket :: TVar (HashMap (Peer L4Proto) Socket)
, _tcpConnDemand :: TQueue (Peer L4Proto) , _tcpConnDemand :: TVar (HashPSQ (Peer L4Proto) TimeSpec ())
, _tcpReceived :: TBQueue (Peer L4Proto, ByteString) , _tcpReceived :: TBQueue (Peer L4Proto, ByteString)
, _tcpSent :: TVar (HashPSQ (Peer L4Proto) TimeSpec (TBQueue ByteString)) , _tcpSent :: TVar (HashPSQ (Peer L4Proto) TimeSpec (TBQueue ByteString))
, _tcpClientThreadNum :: TVar Int , _tcpClientThreadNum :: TVar Int
@ -108,7 +108,7 @@ newMessagingTCP pa = liftIO do
<*> newTVarIO mempty <*> newTVarIO mempty
<*> newTVarIO mempty <*> newTVarIO mempty
<*> newTVarIO mempty <*> newTVarIO mempty
<*> newTQueueIO <*> newTVarIO HPSQ.empty
<*> newTBQueueIO (10 * outMessageQLen) <*> newTBQueueIO (10 * outMessageQLen)
<*> newTVarIO HPSQ.empty <*> newTVarIO HPSQ.empty
<*> newTVarIO 0 <*> newTVarIO 0
@ -129,7 +129,7 @@ instance Messaging MessagingTCP L4Proto ByteString where
case q' of case q' of
Nothing -> do Nothing -> do
writeTQueue _tcpConnDemand p modifyTVar _tcpConnDemand (HPSQ.insert p now ())
q <- newTBQueue outMessageQLen q <- newTBQueue outMessageQLen
modifyTVar _tcpSent (HPSQ.insert p now q) modifyTVar _tcpSent (HPSQ.insert p now q)
pure q pure q
@ -224,51 +224,55 @@ tcpPeerKick MessagingTCP{..} p = do
runMessagingTCP :: forall m . MonadIO m => MessagingTCP -> m () runMessagingTCP :: forall m . MonadIO m => MessagingTCP -> m ()
runMessagingTCP env@MessagingTCP{..} = liftIO do runMessagingTCP env@MessagingTCP{..} = liftIO do
void $ flip runContT pure do fix \again -> do
forever do void $ flip runContT pure do
p1 <- ContT $ withAsync runClient p1 <- ContT $ withAsync runClient
p2 <- ContT $ withAsync runServer p2 <- ContT $ withAsync runServer
probes <- ContT $ withAsync $ forever do probes <- ContT $ withAsync $ forever do
pause @'Seconds 10
p <- readTVarIO _tcpProbe
acceptReport p =<< S.toList_ do
S.yield =<< ( readTVarIO _tcpClientThreads <&> ("tcpClientThreads",) . fromIntegral . HM.size )
S.yield =<< ( readTVarIO _tcpServerThreadsCount <&> ("tcpServerThreadsCount",) . fromIntegral )
S.yield =<< ( readTVarIO _tcpPeerConn <&> ("tcpPeerConn",) . fromIntegral . HM.size)
coo <- readTVarIO _tcpPeerCookie -- <&> ("tcpPeerCookie",)
let cooNn = sum [ 1 | (_,v) <- HM.toList coo, v >= 1 ]
S.yield ("tcpPeerCookie", fromIntegral $ HM.size coo)
S.yield ("tcpPeerCookieUsed", cooNn)
S.yield =<< ( readTVarIO _tcpSent <&> ("tcpSent",) . fromIntegral . HPSQ.size)
sweepCookies <- ContT $ withAsync $ forever do
pause @'Seconds 300
atomically do
pips <- readTVar _tcpPeerConn
modifyTVar _tcpPeerToCookie (HM.filterWithKey (\k _ -> HM.member k pips))
alive <- readTVar _tcpPeerToCookie <&> HS.fromList . HM.elems
modifyTVar _tcpPeerCookie (HM.filterWithKey (\k _ -> HS.member k alive))
sweep <- ContT $ withAsync $ forever do
pause @'Seconds 300
now <- getTimeCoarse
atomically do
w <- readTVar _tcpSent <&> HPSQ.toList
let live = [ x | x@(_,t,_) <- w, realToFrac (now - t) / 1e9 < 300 ]
writeTVar _tcpSent (HPSQ.fromList live)
-- atomically do
-- pips <- readTVar _tcpPeerConn
-- modifyTVar _tcpSent (HM.filterWithKey (\k _ -> HM.member k pips))
-- modifyTVar _tcpPeerSocket (HM.filterWithKey (\k _ -> HM.member k pips))
-- modifyTVar _tcpPeerCookie (HM.filter (>=1))
(_,e) <- waitAnyCatchCancel [p1,p2,probes,sweep,sweepCookies]
err $ "TCP server is down because of" <+> viaShow e
pause @'Seconds 10 pause @'Seconds 10
p <- readTVarIO _tcpProbe lift again
acceptReport p =<< S.toList_ do
S.yield =<< ( readTVarIO _tcpClientThreads <&> ("tcpClientThreads",) . fromIntegral . HM.size )
S.yield =<< ( readTVarIO _tcpServerThreadsCount <&> ("tcpServerThreadsCount",) . fromIntegral )
S.yield =<< ( readTVarIO _tcpPeerConn <&> ("tcpPeerConn",) . fromIntegral . HM.size)
coo <- readTVarIO _tcpPeerCookie -- <&> ("tcpPeerCookie",)
let cooNn = sum [ 1 | (_,v) <- HM.toList coo, v >= 1 ]
S.yield ("tcpPeerCookie", fromIntegral $ HM.size coo)
S.yield ("tcpPeerCookieUsed", cooNn)
S.yield =<< ( readTVarIO _tcpSent <&> ("tcpSent",) . fromIntegral . HPSQ.size)
sweepCookies <- ContT $ withAsync $ forever do
pause @'Seconds 300
atomically do
pips <- readTVar _tcpPeerConn
modifyTVar _tcpPeerToCookie (HM.filterWithKey (\k _ -> HM.member k pips))
alive <- readTVar _tcpPeerToCookie <&> HS.fromList . HM.elems
modifyTVar _tcpPeerCookie (HM.filterWithKey (\k _ -> HS.member k alive))
sweep <- ContT $ withAsync $ forever do
pause @'Seconds 300
now <- getTimeCoarse
atomically do
w <- readTVar _tcpSent <&> HPSQ.toList
let live = [ x | x@(_,t,_) <- w, realToFrac (now - t) / 1e9 < 300 ]
writeTVar _tcpSent (HPSQ.fromList live)
-- atomically do
-- pips <- readTVar _tcpPeerConn
-- modifyTVar _tcpSent (HM.filterWithKey (\k _ -> HM.member k pips))
-- modifyTVar _tcpPeerSocket (HM.filterWithKey (\k _ -> HM.member k pips))
-- modifyTVar _tcpPeerCookie (HM.filter (>=1))
waitAnyCatchCancel [p1,p2,probes,sweep,sweepCookies]
where where
@ -302,7 +306,7 @@ runMessagingTCP env@MessagingTCP{..} = liftIO do
bs <- readFromSocket so size bs <- readFromSocket so size
atomically $ writeTBQueueDropSTM outMessageQLen queue (peer, bs) atomically $ writeTBQueueDropSTM outMessageQLen queue (peer, bs)
runServer = do runServer = flip runContT pure do
own <- toPeerAddr $ view tcpOwnPeer env own <- toPeerAddr $ view tcpOwnPeer env
let (L4Address _ (IPAddrPort (i,p))) = own let (L4Address _ (IPAddrPort (i,p))) = own
@ -404,7 +408,10 @@ runMessagingTCP env@MessagingTCP{..} = liftIO do
forever $ void $ runMaybeT do forever $ void $ runMaybeT do
-- client sockets -- client sockets
who <- atomically $ readTQueue _tcpConnDemand who <- atomically do
readTVar _tcpConnDemand <&> HPSQ.minView >>= \case
Nothing -> STM.retry
Just (p,_,_,rest) -> writeTVar _tcpConnDemand rest >> pure p
already <- readTVarIO _tcpPeerConn <&> HM.member who already <- readTVarIO _tcpPeerConn <&> HM.member who

View File

@ -1,4 +1,3 @@
{-# Language TemplateHaskell #-} {-# Language TemplateHaskell #-}
{-# Language AllowAmbiguousTypes #-} {-# Language AllowAmbiguousTypes #-}
{-# Language UndecidableInstances #-} {-# Language UndecidableInstances #-}
@ -930,12 +929,13 @@ runPeer opts = respawnOnError opts $ flip runContT pure do
addProbe tcpProbe addProbe tcpProbe
messagingTCPSetProbe tcpEnv tcpProbe messagingTCPSetProbe tcpEnv tcpProbe
void $ liftIO ( async do void $ liftIO ( asyncLinked do
runMessagingTCP tcpEnv runMessagingTCP tcpEnv
`U.withException` \(e :: SomeException) -> do `U.withException` \(e :: SomeException) -> do
err (viaShow e) err (viaShow e)
err "!!! TCP messaging stopped" err "!!! TCP messaging stopped"
liftIO $ atomically $ modifyTVar msgAlive pred liftIO $ atomically $ modifyTVar msgAlive pred
throwIO e
) )
let tcpaddr = view tcpOwnPeer tcpEnv let tcpaddr = view tcpOwnPeer tcpEnv
liftIO $ atomically $ modifyTVar msgAlive succ liftIO $ atomically $ modifyTVar msgAlive succ