From c1717f2defd2391aeb9284016e5feb70025928c5 Mon Sep 17 00:00:00 2001 From: voidlizard Date: Sun, 3 Nov 2024 13:13:59 +0300 Subject: [PATCH] wip, tcp --- hbs2-core/lib/HBS2/Net/Messaging/TCP.hs | 33 ++++++++++++++++++------- 1 file changed, 24 insertions(+), 9 deletions(-) diff --git a/hbs2-core/lib/HBS2/Net/Messaging/TCP.hs b/hbs2-core/lib/HBS2/Net/Messaging/TCP.hs index 4a0d3a0b..e541277c 100644 --- a/hbs2-core/lib/HBS2/Net/Messaging/TCP.hs +++ b/hbs2-core/lib/HBS2/Net/Messaging/TCP.hs @@ -61,6 +61,7 @@ data MessagingTCP = , _tcpCookie :: Word32 , _tcpPeerConn :: TVar (HashMap (Peer L4Proto) Word64) , _tcpPeerCookie :: TVar (HashMap Word32 Int) + , _tcpPeerToCookie :: TVar (HashMap (Peer L4Proto) Word32) , _tcpPeerSocket :: TVar (HashMap (Peer L4Proto) Socket) , _tcpConnDemand :: TQueue (Peer L4Proto) , _tcpReceived :: TBQueue (Peer L4Proto, ByteString) @@ -103,6 +104,7 @@ newMessagingTCP pa = liftIO do <*> newTVarIO mempty <*> newTVarIO mempty <*> newTVarIO mempty + <*> newTVarIO mempty <*> newTQueueIO <*> newTBQueueIO outMessageQLen <*> newTVarIO mempty @@ -239,6 +241,14 @@ runMessagingTCP env@MessagingTCP{..} = liftIO do S.yield =<< ( readTVarIO _tcpSent <&> ("tcpSent",) . fromIntegral . HM.size) + sweepCookies <- ContT $ withAsync $ forever do + pause @'Seconds 10 + atomically do + pips <- readTVar _tcpPeerConn + modifyTVar _tcpPeerToCookie (HM.filterWithKey (\k _ -> HM.member k pips)) + alive <- readTVar _tcpPeerToCookie <&> HS.fromList . HM.elems + modifyTVar _tcpPeerCookie (HM.filterWithKey (\k _ -> HS.member k alive)) + sweep <- ContT $ withAsync $ forever do pause @'Seconds 60 atomically do @@ -247,7 +257,7 @@ runMessagingTCP env@MessagingTCP{..} = liftIO do modifyTVar _tcpPeerSocket (HM.filterWithKey (\k _ -> HM.member k pips)) modifyTVar _tcpPeerCookie (HM.filter (>=1)) - waitAnyCatchCancel [p1,p2,probes,sweep] + waitAnyCatchCancel [p1,p2,probes,sweep,sweepCookies] where @@ -265,11 +275,12 @@ runMessagingTCP env@MessagingTCP{..} = liftIO do n -> Just (pred n) -- useCookie :: forall m . (?env :: MessagingTCP, MonadIO m) => Word32 -> m Bool - useCookie cookie = atomically do + useCookie peer cookie = atomically do let MessagingTCP{..} = ?env n <- readTVar _tcpPeerCookie <&> HM.member cookie unless n do modifyTVar _tcpPeerCookie (HM.insertWith (+) cookie 1) + modifyTVar _tcpPeerToCookie (HM.insert peer cookie) pure n -- FIXME: timeout-hardcode @@ -299,18 +310,18 @@ runMessagingTCP env@MessagingTCP{..} = liftIO do let ?env = env + let newP = fromSockAddr @'TCP remote :: Peer L4Proto + cookie <- handshake Server env so when (cookie == myCookie) $ exit () - here <- useCookie cookie + here <- useCookie newP cookie when here $ do - debug $ "SERVER : ALREADY CONNECTED" <+> pretty cookie <+> viaShow so + debug $ "SERVER : ALREADY CONNECTED" <+> pretty cookie <+> viaShow remote exit () - let newP = fromSockAddr @'TCP remote :: Peer L4Proto - atomically $ modifyTVar _tcpServerThreadsCount succ newOutQ <- do @@ -341,8 +352,11 @@ runMessagingTCP env@MessagingTCP{..} = liftIO do void $ ContT $ bracket none $ const do debug $ "SHUTDOWN SOCKET AND SHIT" <+> pretty remote - atomically $ modifyTVar _tcpServerThreadsCount pred - atomically $ modifyTVar _tcpPeerSocket (HM.delete newP) + atomically do + modifyTVar _tcpServerThreadsCount pred + modifyTVar _tcpPeerSocket (HM.delete newP) + modifyTVar _tcpPeerToCookie (HM.delete newP) + shutdown so ShutdownBoth cancel rd cancel wr @@ -404,7 +418,7 @@ runMessagingTCP env@MessagingTCP{..} = liftIO do debug $ "same peer, exit" <+> pretty remoteAddr exit () - here <- useCookie cookie + here <- useCookie who cookie -- TODO: handshake notification liftIO $ _tcpOnClientStarted whoAddr connId @@ -443,6 +457,7 @@ runMessagingTCP env@MessagingTCP{..} = liftIO do atomically do modifyTVar _tcpPeerConn (HM.delete who) modifyTVar _tcpPeerCookie (HM.update killCookie cookie) + modifyTVar _tcpPeerToCookie (HM.delete who) modifyTVar _tcpPeerSocket (HM.delete who) void $ ContT $ bracket none (const $ cancel wr)