diff --git a/hbs2-core/hbs2-core.cabal b/hbs2-core/hbs2-core.cabal index 93df1de7..8344ec6b 100644 --- a/hbs2-core/hbs2-core.cabal +++ b/hbs2-core/hbs2-core.cabal @@ -174,6 +174,7 @@ library , network-multicast , network-simple , network-byte-order + , psqueues , prettyprinter , prettyprinter-ansi-terminal , mwc-random diff --git a/hbs2-core/lib/HBS2/Net/Messaging/TCP.hs b/hbs2-core/lib/HBS2/Net/Messaging/TCP.hs index 13f75be2..2e848ed7 100644 --- a/hbs2-core/lib/HBS2/Net/Messaging/TCP.hs +++ b/hbs2-core/lib/HBS2/Net/Messaging/TCP.hs @@ -29,6 +29,8 @@ import Data.ByteString.Lazy (ByteString) import Data.ByteString.Lazy qualified as LBS import Data.HashMap.Strict (HashMap) import Data.HashMap.Strict qualified as HM +import Data.HashPSQ (HashPSQ) +import Data.HashPSQ qualified as HPSQ import Data.HashSet qualified as HS import Data.Maybe import Data.Word @@ -66,7 +68,7 @@ data MessagingTCP = , _tcpPeerSocket :: TVar (HashMap (Peer L4Proto) Socket) , _tcpConnDemand :: TQueue (Peer L4Proto) , _tcpReceived :: TBQueue (Peer L4Proto, ByteString) - , _tcpSent :: TVar (HashMap (Peer L4Proto) (TBQueue ByteString)) + , _tcpSent :: TVar (HashPSQ (Peer L4Proto) TimeSpec (TBQueue ByteString)) , _tcpClientThreadNum :: TVar Int , _tcpClientThreads :: TVar (HashMap Int (Async ())) , _tcpServerThreadsCount :: TVar Int @@ -108,7 +110,7 @@ newMessagingTCP pa = liftIO do <*> newTVarIO mempty <*> newTQueueIO <*> newTBQueueIO (10 * outMessageQLen) - <*> newTVarIO mempty + <*> newTVarIO HPSQ.empty <*> newTVarIO 0 <*> newTVarIO mempty <*> newTVarIO 0 @@ -121,19 +123,22 @@ instance Messaging MessagingTCP L4Proto ByteString where -- let _own = tcpOwnPeer -- debug $ "!!!! FUCKING SEND TO" <+> pretty p + now <- getTimeCoarse queue <- atomically do - q' <- readTVar _tcpSent <&> HM.lookup p + q' <- readTVar _tcpSent <&> HPSQ.lookup p case q' of Nothing -> do writeTQueue _tcpConnDemand p q <- newTBQueue outMessageQLen - modifyTVar _tcpSent (HM.insert p q) + modifyTVar _tcpSent (HPSQ.insert p now q) pure q - Just q -> pure q + Just (_,q) -> pure q atomically $ writeTBQueueDropSTM 10 queue msg + atomically $ stateTVar _tcpSent (HPSQ.alter (\x -> ((), fmap (set _1 now) x)) p) + -- atomically $ insert -- debug $ "!!!! FUCKING SEND TO" <+> pretty p <+> "DONE" @@ -240,10 +245,10 @@ runMessagingTCP env@MessagingTCP{..} = liftIO do S.yield ("tcpPeerCookie", fromIntegral $ HM.size coo) S.yield ("tcpPeerCookieUsed", cooNn) - S.yield =<< ( readTVarIO _tcpSent <&> ("tcpSent",) . fromIntegral . HM.size) + S.yield =<< ( readTVarIO _tcpSent <&> ("tcpSent",) . fromIntegral . HPSQ.size) sweepCookies <- ContT $ withAsync $ forever do - pause @'Seconds 120 + pause @'Seconds 300 -- atomically do -- pips <- readTVar _tcpPeerConn -- modifyTVar _tcpPeerToCookie (HM.filterWithKey (\k _ -> HM.member k pips)) @@ -251,7 +256,12 @@ runMessagingTCP env@MessagingTCP{..} = liftIO do -- modifyTVar _tcpPeerCookie (HM.filterWithKey (\k _ -> HS.member k alive)) sweep <- ContT $ withAsync $ forever do - pause @'Seconds 120 + pause @'Seconds 300 + now <- getTimeCoarse + atomically do + w <- readTVar _tcpSent <&> HPSQ.toList + let live = [ x | x@(_,t,_) <- w, realToFrac (now - t) / 1e9 > 1200 ] + writeTVar _tcpSent (HPSQ.fromList live) -- atomically do -- pips <- readTVar _tcpPeerConn -- modifyTVar _tcpSent (HM.filterWithKey (\k _ -> HM.member k pips)) @@ -325,16 +335,20 @@ runMessagingTCP env@MessagingTCP{..} = liftIO do atomically $ modifyTVar _tcpServerThreadsCount succ - newOutQ <- do - atomically do - mbQ <- readTVar _tcpSent <&> HM.lookup newP - maybe (newTBQueue outMessageQLen) pure mbQ + now <- getTimeCoarse + newOutQ <- atomically do + q <- readTVar _tcpSent <&> HPSQ.lookup newP >>= \case + Just (_,w) -> pure w + Nothing -> do + nq <- newTBQueue outMessageQLen + modifyTVar _tcpSent (HPSQ.insert newP now nq) + pure nq - atomically do - modifyTVar _tcpSent (HM.insert newP newOutQ) modifyTVar _tcpPeerConn (HM.insert newP (connectionId myCookie cookie)) modifyTVar _tcpPeerSocket (HM.insert newP so) + pure q + wr <- ContT $ withAsync $ forever do bs <- atomically $ readTBQueue newOutQ @@ -363,12 +377,11 @@ runMessagingTCP env@MessagingTCP{..} = liftIO do cancel wr atomically do - modifyTVar _tcpSent (HM.delete newP) + modifyTVar _tcpSent (HPSQ.delete newP) modifyTVar _tcpPeerCookie (HM.update killCookie cookie) void $ waitAnyCatchCancel [rd,wr] - runClient = flip runContT pure do let myCookie = view tcpCookie env @@ -433,7 +446,7 @@ runMessagingTCP env@MessagingTCP{..} = liftIO do wr <- ContT $ withAsync $ forever do bss <- atomically do - q' <- readTVar _tcpSent <&> HM.lookup who + q' <- readTVar _tcpSent <&> fmap (view _2) . HPSQ.lookup who maybe1 q' mempty $ \q -> do s <- readTBQueue q sx <- flushTBQueue q @@ -458,6 +471,7 @@ runMessagingTCP env@MessagingTCP{..} = liftIO do modifyTVar _tcpPeerCookie (HM.update killCookie cookie) modifyTVar _tcpPeerToCookie (HM.delete who) modifyTVar _tcpPeerSocket (HM.delete who) + modifyTVar _tcpSent (HPSQ.delete who) void $ ContT $ bracket none (const $ cancel wr)