diff --git a/hbs2-core/lib/HBS2/Net/Messaging/TCP.hs b/hbs2-core/lib/HBS2/Net/Messaging/TCP.hs index 706fc363..f6df46ea 100644 --- a/hbs2-core/lib/HBS2/Net/Messaging/TCP.hs +++ b/hbs2-core/lib/HBS2/Net/Messaging/TCP.hs @@ -40,6 +40,7 @@ import System.Random hiding (next) import Control.Monad.Trans.Cont import Control.Exception +import UnliftIO (MonadUnliftIO(..)) import UnliftIO.Async import UnliftIO.STM import UnliftIO.Exception qualified as U @@ -65,6 +66,8 @@ data MessagingTCP = , _tcpDefer :: TVar (HashMap (Peer L4Proto) [(TimeSpec, ByteString)]) , _tcpDeferEv :: TQueue () , _tcpSpawned :: TVar Int + , _tcpFired :: TVar Int + , _tcpConnWip :: TVar (HashMap (Peer L4Proto) TimeSpec) , _tcpProbe :: TVar AnyProbe , _tcpOnClientStarted :: PeerAddr L4Proto -> Word64 -> IO () -- ^ Cient TCP connection succeed } @@ -94,6 +97,8 @@ newMessagingTCP pa = liftIO do <*> newTVarIO mempty <*> newTQueueIO <*> newTVarIO 0 + <*> newTVarIO 0 + <*> newTVarIO mempty <*> newTVarIO (AnyProbe ()) <*> pure (\_ _ -> none) -- do nothing by default @@ -371,6 +376,21 @@ connectPeerTCP env peer = liftIO do -- FIXME: link-all-asyncs +fireTCP :: forall e m . (e ~ L4Proto, Pretty (PeerAddr e), IsPeerAddr e m, MonadUnliftIO m) => MessagingTCP -> Peer e -> m () -> m () +fireTCP MessagingTCP{..} pip what = do + void $ do + pa <- toPeerAddr @e pip + now <- getTimeCoarse + fire <- atomically do + here <- readTVar _tcpConnWip <&> HashMap.member pip + unless here do + modifyTVar _tcpConnWip (HashMap.insert pip now) + pure (not here) + when fire do + debug $ "Fire TCP" <+> pretty pa + atomically (modifyTVar _tcpFired succ) + void $ async (what >> atomically (modifyTVar _tcpFired pred)) + runMessagingTCP :: forall m . MonadIO m => MessagingTCP -> m () runMessagingTCP env@MessagingTCP{..} = liftIO do @@ -418,7 +438,7 @@ runMessagingTCP env@MessagingTCP{..} = liftIO do co' <- atomically $ readTVar (view tcpPeerConn env) <&> HashMap.lookup pip - maybe1 co' (void $ async (connectPeerTCP env pip)) $ \co -> do + maybe1 co' (void $ fireTCP env pip (connectPeerTCP env pip)) $ \co -> do q' <- atomically $ readTVar (view tcpConnQ env) <&> HashMap.lookup co maybe1 q' none $ \q -> do atomically do @@ -439,7 +459,14 @@ runMessagingTCP env@MessagingTCP{..} = liftIO do <+> pretty c <+> parens ("used:" <+> pretty used) - mapM_ link [mon,con,stat] + cleanup <- ContT $ withAsync $ forever do + pause @Seconds 60 + now <- getTimeCoarse + connWip <- readTVarIO _tcpConnWip <&> HashMap.toList + let connAlive = [ (k,v) | (k,v) <- connWip, not (expired (TimeoutSec 60) (now - v)) ] + atomically $ writeTVar _tcpConnWip (HashMap.fromList connAlive) + + mapM_ link [mon,con,stat,cleanup] probes <- ContT $ withAsync $ forever do pause @'Seconds 10 @@ -453,8 +480,10 @@ runMessagingTCP env@MessagingTCP{..} = liftIO do S.yield =<< atomically (readTVar _tcpPeerXp <&> ("tcpPeerXp",) . fromIntegral . HashMap.size) S.yield =<< atomically (readTVar _tcpDefer <&> ("tcpPeerDefer",) . fromIntegral . HashMap.size) S.yield =<< atomically (readTVar _tcpSpawned <&> ("tcpSpawned",) . fromIntegral) + S.yield =<< atomically (readTVar _tcpFired <&> ("tcpFired",) . fromIntegral) + S.yield =<< atomically (readTVar _tcpConnWip <&> ("tcpConnWip",) . fromIntegral . HashMap.size) - ContT $ bracket (pure ()) $ \_ -> mapM_ cancel [mon,con,stat,probes] + ContT $ bracket (pure ()) $ \_ -> mapM_ cancel [mon,con,stat,probes,cleanup] liftIO $ listen (Host (show i)) (show p) $ \(sock, sa) -> do withFdSocket sock setCloseOnExecIfNeeded