mirror of https://github.com/voidlizard/hbs2
TCP connection leak
This commit is contained in:
parent
43eb9abb7e
commit
fa74de1fdb
|
@ -40,6 +40,7 @@ import System.Random hiding (next)
|
||||||
import Control.Monad.Trans.Cont
|
import Control.Monad.Trans.Cont
|
||||||
import Control.Exception
|
import Control.Exception
|
||||||
|
|
||||||
|
import UnliftIO (MonadUnliftIO(..))
|
||||||
import UnliftIO.Async
|
import UnliftIO.Async
|
||||||
import UnliftIO.STM
|
import UnliftIO.STM
|
||||||
import UnliftIO.Exception qualified as U
|
import UnliftIO.Exception qualified as U
|
||||||
|
@ -65,6 +66,8 @@ data MessagingTCP =
|
||||||
, _tcpDefer :: TVar (HashMap (Peer L4Proto) [(TimeSpec, ByteString)])
|
, _tcpDefer :: TVar (HashMap (Peer L4Proto) [(TimeSpec, ByteString)])
|
||||||
, _tcpDeferEv :: TQueue ()
|
, _tcpDeferEv :: TQueue ()
|
||||||
, _tcpSpawned :: TVar Int
|
, _tcpSpawned :: TVar Int
|
||||||
|
, _tcpFired :: TVar Int
|
||||||
|
, _tcpConnWip :: TVar (HashMap (Peer L4Proto) TimeSpec)
|
||||||
, _tcpProbe :: TVar AnyProbe
|
, _tcpProbe :: TVar AnyProbe
|
||||||
, _tcpOnClientStarted :: PeerAddr L4Proto -> Word64 -> IO () -- ^ Cient TCP connection succeed
|
, _tcpOnClientStarted :: PeerAddr L4Proto -> Word64 -> IO () -- ^ Cient TCP connection succeed
|
||||||
}
|
}
|
||||||
|
@ -94,6 +97,8 @@ newMessagingTCP pa = liftIO do
|
||||||
<*> newTVarIO mempty
|
<*> newTVarIO mempty
|
||||||
<*> newTQueueIO
|
<*> newTQueueIO
|
||||||
<*> newTVarIO 0
|
<*> newTVarIO 0
|
||||||
|
<*> newTVarIO 0
|
||||||
|
<*> newTVarIO mempty
|
||||||
<*> newTVarIO (AnyProbe ())
|
<*> newTVarIO (AnyProbe ())
|
||||||
<*> pure (\_ _ -> none) -- do nothing by default
|
<*> pure (\_ _ -> none) -- do nothing by default
|
||||||
|
|
||||||
|
@ -371,6 +376,21 @@ connectPeerTCP env peer = liftIO do
|
||||||
|
|
||||||
-- FIXME: link-all-asyncs
|
-- FIXME: link-all-asyncs
|
||||||
|
|
||||||
|
fireTCP :: forall e m . (e ~ L4Proto, Pretty (PeerAddr e), IsPeerAddr e m, MonadUnliftIO m) => MessagingTCP -> Peer e -> m () -> m ()
|
||||||
|
fireTCP MessagingTCP{..} pip what = do
|
||||||
|
void $ do
|
||||||
|
pa <- toPeerAddr @e pip
|
||||||
|
now <- getTimeCoarse
|
||||||
|
fire <- atomically do
|
||||||
|
here <- readTVar _tcpConnWip <&> HashMap.member pip
|
||||||
|
unless here do
|
||||||
|
modifyTVar _tcpConnWip (HashMap.insert pip now)
|
||||||
|
pure (not here)
|
||||||
|
when fire do
|
||||||
|
debug $ "Fire TCP" <+> pretty pa
|
||||||
|
atomically (modifyTVar _tcpFired succ)
|
||||||
|
void $ async (what >> atomically (modifyTVar _tcpFired pred))
|
||||||
|
|
||||||
runMessagingTCP :: forall m . MonadIO m => MessagingTCP -> m ()
|
runMessagingTCP :: forall m . MonadIO m => MessagingTCP -> m ()
|
||||||
runMessagingTCP env@MessagingTCP{..} = liftIO do
|
runMessagingTCP env@MessagingTCP{..} = liftIO do
|
||||||
|
|
||||||
|
@ -418,7 +438,7 @@ runMessagingTCP env@MessagingTCP{..} = liftIO do
|
||||||
|
|
||||||
co' <- atomically $ readTVar (view tcpPeerConn env) <&> HashMap.lookup pip
|
co' <- atomically $ readTVar (view tcpPeerConn env) <&> HashMap.lookup pip
|
||||||
|
|
||||||
maybe1 co' (void $ async (connectPeerTCP env pip)) $ \co -> do
|
maybe1 co' (void $ fireTCP env pip (connectPeerTCP env pip)) $ \co -> do
|
||||||
q' <- atomically $ readTVar (view tcpConnQ env) <&> HashMap.lookup co
|
q' <- atomically $ readTVar (view tcpConnQ env) <&> HashMap.lookup co
|
||||||
maybe1 q' none $ \q -> do
|
maybe1 q' none $ \q -> do
|
||||||
atomically do
|
atomically do
|
||||||
|
@ -439,7 +459,14 @@ runMessagingTCP env@MessagingTCP{..} = liftIO do
|
||||||
<+> pretty c
|
<+> pretty c
|
||||||
<+> parens ("used:" <+> pretty used)
|
<+> parens ("used:" <+> pretty used)
|
||||||
|
|
||||||
mapM_ link [mon,con,stat]
|
cleanup <- ContT $ withAsync $ forever do
|
||||||
|
pause @Seconds 60
|
||||||
|
now <- getTimeCoarse
|
||||||
|
connWip <- readTVarIO _tcpConnWip <&> HashMap.toList
|
||||||
|
let connAlive = [ (k,v) | (k,v) <- connWip, not (expired (TimeoutSec 60) (now - v)) ]
|
||||||
|
atomically $ writeTVar _tcpConnWip (HashMap.fromList connAlive)
|
||||||
|
|
||||||
|
mapM_ link [mon,con,stat,cleanup]
|
||||||
|
|
||||||
probes <- ContT $ withAsync $ forever do
|
probes <- ContT $ withAsync $ forever do
|
||||||
pause @'Seconds 10
|
pause @'Seconds 10
|
||||||
|
@ -453,8 +480,10 @@ runMessagingTCP env@MessagingTCP{..} = liftIO do
|
||||||
S.yield =<< atomically (readTVar _tcpPeerXp <&> ("tcpPeerXp",) . fromIntegral . HashMap.size)
|
S.yield =<< atomically (readTVar _tcpPeerXp <&> ("tcpPeerXp",) . fromIntegral . HashMap.size)
|
||||||
S.yield =<< atomically (readTVar _tcpDefer <&> ("tcpPeerDefer",) . fromIntegral . HashMap.size)
|
S.yield =<< atomically (readTVar _tcpDefer <&> ("tcpPeerDefer",) . fromIntegral . HashMap.size)
|
||||||
S.yield =<< atomically (readTVar _tcpSpawned <&> ("tcpSpawned",) . fromIntegral)
|
S.yield =<< atomically (readTVar _tcpSpawned <&> ("tcpSpawned",) . fromIntegral)
|
||||||
|
S.yield =<< atomically (readTVar _tcpFired <&> ("tcpFired",) . fromIntegral)
|
||||||
|
S.yield =<< atomically (readTVar _tcpConnWip <&> ("tcpConnWip",) . fromIntegral . HashMap.size)
|
||||||
|
|
||||||
ContT $ bracket (pure ()) $ \_ -> mapM_ cancel [mon,con,stat,probes]
|
ContT $ bracket (pure ()) $ \_ -> mapM_ cancel [mon,con,stat,probes,cleanup]
|
||||||
|
|
||||||
liftIO $ listen (Host (show i)) (show p) $ \(sock, sa) -> do
|
liftIO $ listen (Host (show i)) (show p) $ \(sock, sa) -> do
|
||||||
withFdSocket sock setCloseOnExecIfNeeded
|
withFdSocket sock setCloseOnExecIfNeeded
|
||||||
|
|
Loading…
Reference in New Issue