From a360dfb7ecec6f5203b78cfc36215fe063df0b1b Mon Sep 17 00:00:00 2001 From: voidlizard Date: Sun, 3 Nov 2024 10:04:57 +0300 Subject: [PATCH] wip, TCP --- hbs2-core/lib/HBS2/Net/Messaging/TCP.hs | 98 +++++++++++++++++-------- 1 file changed, 69 insertions(+), 29 deletions(-) diff --git a/hbs2-core/lib/HBS2/Net/Messaging/TCP.hs b/hbs2-core/lib/HBS2/Net/Messaging/TCP.hs index ad48192b..6a2bf6f8 100644 --- a/hbs2-core/lib/HBS2/Net/Messaging/TCP.hs +++ b/hbs2-core/lib/HBS2/Net/Messaging/TCP.hs @@ -13,6 +13,7 @@ module HBS2.Net.Messaging.TCP ) where import HBS2.Clock +import HBS2.OrDie import HBS2.Net.IP.Addr import HBS2.Net.Messaging import HBS2.Prelude.Plated @@ -54,18 +55,19 @@ outMessageQLen = 256 -- | TCP Messaging environment data MessagingTCP = MessagingTCP - { _tcpSOCKS5 :: Maybe (PeerAddr L4Proto) - , _tcpOwnPeer :: Peer L4Proto - , _tcpCookie :: Word32 - , _tcpPeerConn :: TVar (HashMap (Peer L4Proto) Word64) - , _tcpPeerCookie :: TVar (HashMap Word32 Int) - , _tcpConnDemand :: TQueue (Peer L4Proto) - , _tcpReceived :: TBQueue (Peer L4Proto, ByteString) - , _tcpSent :: TVar (HashMap (Peer L4Proto) (TBQueue ByteString)) - , _tcpClientThreadNum :: TVar Int - , _tcpClientThreads :: TVar (HashMap Int (Async ())) - , _tcpProbe :: TVar AnyProbe - , _tcpOnClientStarted :: PeerAddr L4Proto -> Word64 -> IO () -- ^ Cient TCP connection succeed + { _tcpSOCKS5 :: Maybe (PeerAddr L4Proto) + , _tcpOwnPeer :: Peer L4Proto + , _tcpCookie :: Word32 + , _tcpPeerConn :: TVar (HashMap (Peer L4Proto) Word64) + , _tcpPeerCookie :: TVar (HashMap Word32 Int) + , _tcpConnDemand :: TQueue (Peer L4Proto) + , _tcpReceived :: TBQueue (Peer L4Proto, ByteString) + , _tcpSent :: TVar (HashMap (Peer L4Proto) (TBQueue ByteString)) + , _tcpClientThreadNum :: TVar Int + , _tcpClientThreads :: TVar (HashMap Int (Async ())) + , _tcpServerThreadsCount :: TVar Int + , _tcpProbe :: TVar AnyProbe + , _tcpOnClientStarted :: PeerAddr L4Proto -> Word64 -> IO () -- ^ Cient TCP connection succeed } makeLenses 'MessagingTCP @@ -103,6 +105,7 @@ newMessagingTCP pa = liftIO do <*> newTVarIO mempty <*> newTVarIO 0 <*> newTVarIO mempty + <*> newTVarIO 0 <*> newTVarIO (AnyProbe ()) <*> pure (\_ _ -> none) -- do nothing by default @@ -193,18 +196,12 @@ writeTBQueueDropSTM inQLen newInQ bs = do more (pred j) -killCookie :: Int -> Maybe Int -killCookie = \case - 1 -> Nothing - n -> Just (pred n) -useCookie :: (?env :: MessagingTCP, MonadIO m) => Word32 -> m Bool -useCookie cookie = atomically do - let MessagingTCP{..} = ?env - n <- readTVar _tcpPeerCookie <&> HM.member cookie - unless n do - modifyTVar _tcpPeerCookie (HM.insertWith (+) cookie 1) - pure n +data TCPMessagingError = + TCPPeerReadTimeout + deriving stock (Show,Typeable) + +instance Exception TCPMessagingError runMessagingTCP :: forall m . MonadIO m => MessagingTCP -> m () runMessagingTCP env@MessagingTCP{..} = liftIO do @@ -221,15 +218,51 @@ runMessagingTCP env@MessagingTCP{..} = liftIO do p <- readTVarIO _tcpProbe acceptReport p =<< S.toList_ do S.yield =<< ( readTVarIO _tcpClientThreads <&> ("tcpClientThreads",) . fromIntegral . HM.size ) + S.yield =<< ( readTVarIO _tcpServerThreadsCount <&> ("tcpServerThreadsCount",) . fromIntegral ) S.yield =<< ( readTVarIO _tcpPeerConn <&> ("tcpPeerConn",) . fromIntegral . HM.size) - S.yield =<< ( readTVarIO _tcpPeerCookie <&> ("tcpPeerCookie",) . fromIntegral . HM.size) + + coo <- readTVarIO _tcpPeerCookie -- <&> ("tcpPeerCookie",) + let cooNn = sum [ 1 | (_,v) <- HM.toList coo, v >= 1 ] + + S.yield ("tcpPeerCookie", fromIntegral $ HM.size coo) + S.yield ("tcpPeerCookieUsed", cooNn) + S.yield =<< ( readTVarIO _tcpSent <&> ("tcpSent",) . fromIntegral . HM.size) - waitAnyCatchCancel [p1,p2,probes] + sweep <- ContT $ withAsync $ forever do + pause @'Seconds 60 + atomically do + pips <- readTVar _tcpPeerConn + modifyTVar _tcpSent (HM.filterWithKey (\k _ -> HM.member k pips)) + modifyTVar _tcpPeerCookie (HM.filter (>=1)) + + waitAnyCatchCancel [p1,p2,probes,sweep] where - readFrames so peer queue = forever $ do + withTCPTimeout timeout action = liftIO do + r <- race (pause timeout) action + case r of + Right{} -> pure () + Left{} -> do + debug "tcp connection timeout!" + throwIO TCPPeerReadTimeout + + killCookie :: Int -> Maybe Int + killCookie = \case + n | n <= 1 -> Nothing + n -> Just (pred n) + + -- useCookie :: forall m . (?env :: MessagingTCP, MonadIO m) => Word32 -> m Bool + useCookie cookie = atomically do + let MessagingTCP{..} = ?env + n <- readTVar _tcpPeerCookie <&> HM.member cookie + unless n do + modifyTVar _tcpPeerCookie (HM.insertWith (+) cookie 1) + pure n + + -- FIXME: timeout-hardcode + readFrames so peer queue = forever $ withTCPTimeout (TimeoutSec 67) do void $ readFromSocket so 4 <&> LBS.toStrict ssize <- readFromSocket so 4 <&> LBS.toStrict let size = word32 ssize & fromIntegral @@ -251,8 +284,7 @@ runMessagingTCP env@MessagingTCP{..} = liftIO do void $ acceptFork sock $ \(so, remote) -> void $ flip runContT pure $ callCC \exit -> do liftIO $ withFdSocket so setCloseOnExecIfNeeded debug $ "!!! GOT INCOMING CONNECTION FROM !!!" - <+> brackets (pretty own) - <+> brackets (pretty sa) + <+> brackets (pretty remote) let ?env = env @@ -268,6 +300,8 @@ runMessagingTCP env@MessagingTCP{..} = liftIO do let newP = fromSockAddr @'TCP remote :: Peer L4Proto + atomically $ modifyTVar _tcpServerThreadsCount succ + newOutQ <- do atomically do mbQ <- readTVar _tcpSent <&> HM.lookup newP @@ -295,6 +329,7 @@ runMessagingTCP env@MessagingTCP{..} = liftIO do void $ ContT $ bracket none $ const do debug $ "SHUTDOWN SOCKET AND SHIT" <+> pretty remote + atomically $ modifyTVar _tcpServerThreadsCount pred shutdown so ShutdownBoth cancel rd cancel wr @@ -334,6 +369,8 @@ runMessagingTCP env@MessagingTCP{..} = liftIO do who <- atomically $ readTQueue _tcpConnDemand whoAddr <- toPeerAddr who + debug $ "DEMAND:" <+> pretty who + already <- atomically $ readTVar _tcpPeerConn <&> HM.member who when already do @@ -365,7 +402,9 @@ runMessagingTCP env@MessagingTCP{..} = liftIO do debug $ "CLIENT: ALREADY CONNECTED" <+> pretty cookie <+> pretty ip <+> pretty port exit () - atomically $ modifyTVar _tcpPeerCookie (HM.insertWith (+) cookie 1) + atomically do + modifyTVar _tcpPeerCookie (HM.insertWith (+) cookie 1) + modifyTVar _tcpPeerConn (HM.insert who connId) wr <- ContT $ withAsync $ forever do bss <- atomically do @@ -390,6 +429,7 @@ runMessagingTCP env@MessagingTCP{..} = liftIO do void $ ContT $ bracket none (const $ cancel wr) void $ ContT $ bracket none $ const $ do + debug "!!! TCP: BRACKET FIRED IN CLIENT !!!" atomically do modifyTVar _tcpPeerConn (HM.delete who) modifyTVar _tcpPeerCookie (HM.update killCookie cookie)