From b8abb8bddd6202aba02ee54e2532074054798bac Mon Sep 17 00:00:00 2001 From: voidlizard Date: Sun, 3 Nov 2024 08:20:14 +0300 Subject: [PATCH] wip, refactored --- hbs2-core/lib/HBS2/Net/Messaging/TCP.hs | 134 +++++++++++++----------- 1 file changed, 71 insertions(+), 63 deletions(-) diff --git a/hbs2-core/lib/HBS2/Net/Messaging/TCP.hs b/hbs2-core/lib/HBS2/Net/Messaging/TCP.hs index 39c64ee7..ad48192b 100644 --- a/hbs2-core/lib/HBS2/Net/Messaging/TCP.hs +++ b/hbs2-core/lib/HBS2/Net/Messaging/TCP.hs @@ -1,4 +1,5 @@ {-# Language TemplateHaskell #-} +{-# LANGUAGE ImplicitParams #-} module HBS2.Net.Messaging.TCP ( MessagingTCP , runMessagingTCP @@ -19,17 +20,14 @@ import HBS2.Prelude.Plated import HBS2.Net.Messaging.Stream import HBS2.System.Logger.Simple -import HBS2.Misc.PrettyStuff -import Control.Concurrent.STM (flushTQueue,retry) import Control.Monad.Trans.Maybe import Data.Bits import Data.ByteString.Lazy (ByteString) import Data.ByteString.Lazy qualified as LBS -import Data.Function import Data.HashMap.Strict (HashMap) import Data.HashMap.Strict qualified as HM -import Data.List qualified as L +import Data.HashSet qualified as HS import Data.Maybe import Data.Word import Lens.Micro.Platform @@ -64,12 +62,27 @@ data MessagingTCP = , _tcpConnDemand :: TQueue (Peer L4Proto) , _tcpReceived :: TBQueue (Peer L4Proto, ByteString) , _tcpSent :: TVar (HashMap (Peer L4Proto) (TBQueue ByteString)) + , _tcpClientThreadNum :: TVar Int + , _tcpClientThreads :: TVar (HashMap Int (Async ())) , _tcpProbe :: TVar AnyProbe , _tcpOnClientStarted :: PeerAddr L4Proto -> Word64 -> IO () -- ^ Cient TCP connection succeed } makeLenses 'MessagingTCP + +newClientThread :: forall m . MonadUnliftIO m => MessagingTCP -> m () -> m Int +newClientThread MessagingTCP{..} a = do + as <- async a + atomically do + n <- stateTVar _tcpClientThreadNum ( \x -> (x, succ x)) + modifyTVar _tcpClientThreads (HM.insert n as) + pure n + +delClientThread :: MonadIO m => MessagingTCP -> Int -> m () +delClientThread MessagingTCP{..} threadId = atomically $ + modifyTVar' _tcpClientThreads (HM.delete threadId) + messagingTCPSetProbe :: MonadIO m => MessagingTCP -> AnyProbe -> m () messagingTCPSetProbe MessagingTCP{..} p = atomically $ writeTVar _tcpProbe p @@ -88,6 +101,8 @@ newMessagingTCP pa = liftIO do <*> newTQueueIO <*> newTBQueueIO outMessageQLen <*> newTVarIO mempty + <*> newTVarIO 0 + <*> newTVarIO mempty <*> newTVarIO (AnyProbe ()) <*> pure (\_ _ -> none) -- do nothing by default @@ -183,6 +198,14 @@ killCookie = \case 1 -> Nothing n -> Just (pred n) +useCookie :: (?env :: MessagingTCP, MonadIO m) => Word32 -> m Bool +useCookie cookie = atomically do + let MessagingTCP{..} = ?env + n <- readTVar _tcpPeerCookie <&> HM.member cookie + unless n do + modifyTVar _tcpPeerCookie (HM.insertWith (+) cookie 1) + pure n + runMessagingTCP :: forall m . MonadIO m => MessagingTCP -> m () runMessagingTCP env@MessagingTCP{..} = liftIO do @@ -193,13 +216,26 @@ runMessagingTCP env@MessagingTCP{..} = liftIO do p1 <- ContT $ withAsync runClient p2 <- ContT $ withAsync runServer - waitAnyCatchCancel [p1,p2] - -- waitAnyCatchCancel [p2] - -- waitAnyCatchCancel [p1] + probes <- ContT $ withAsync $ forever do + pause @'Seconds 10 + p <- readTVarIO _tcpProbe + acceptReport p =<< S.toList_ do + S.yield =<< ( readTVarIO _tcpClientThreads <&> ("tcpClientThreads",) . fromIntegral . HM.size ) + S.yield =<< ( readTVarIO _tcpPeerConn <&> ("tcpPeerConn",) . fromIntegral . HM.size) + S.yield =<< ( readTVarIO _tcpPeerCookie <&> ("tcpPeerCookie",) . fromIntegral . HM.size) + S.yield =<< ( readTVarIO _tcpSent <&> ("tcpSent",) . fromIntegral . HM.size) + + waitAnyCatchCancel [p1,p2,probes] where - runServer :: forall m . MonadIO m => m () + readFrames so peer queue = forever $ do + void $ readFromSocket so 4 <&> LBS.toStrict + ssize <- readFromSocket so 4 <&> LBS.toStrict + let size = word32 ssize & fromIntegral + bs <- readFromSocket so size + atomically $ writeTBQueueDropSTM outMessageQLen queue (peer, bs) + runServer = do own <- toPeerAddr $ view tcpOwnPeer env @@ -218,17 +254,13 @@ runMessagingTCP env@MessagingTCP{..} = liftIO do <+> brackets (pretty own) <+> brackets (pretty sa) + let ?env = env + cookie <- handshake Server env so when (cookie == myCookie) $ exit () - here <- atomically do - n <- readTVar _tcpPeerCookie <&> HM.member cookie - - unless n do - modifyTVar _tcpPeerCookie (HM.insertWith (+) cookie 1) - - pure n + here <- useCookie cookie when here $ do debug $ "SERVER : ALREADY CONNECTED" <+> pretty cookie <+> viaShow so @@ -236,11 +268,6 @@ runMessagingTCP env@MessagingTCP{..} = liftIO do let newP = fromSockAddr @'TCP remote :: Peer L4Proto - -- FIXME: queue-size-hardcode - let inQLen = outMessageQLen - - newInQ <- liftIO $ newTBQueueIO inQLen - newOutQ <- do atomically do mbQ <- readTVar _tcpSent <&> HM.lookup newP @@ -264,18 +291,7 @@ runMessagingTCP env@MessagingTCP{..} = liftIO do sendLazy so frame --(LBS.toStrict frame) - rd <- ContT $ withAsync $ forever do - - spx <- readFromSocket so 4 <&> LBS.toStrict - ssize <- readFromSocket so 4 <&> LBS.toStrict --- УУУ, фреейминг - let px = word32 spx -- & fromIntegral - let size = word32 ssize & fromIntegral - - bs <- readFromSocket so size - - -- debug $ "READ SHIT FROM SOCKET!" <+> pretty remote - - atomically $ writeTBQueueDropSTM outMessageQLen _tcpReceived (newP, bs) + rd <- ContT $ withAsync $ readFrames so newP _tcpReceived void $ ContT $ bracket none $ const do debug $ "SHUTDOWN SOCKET AND SHIT" <+> pretty remote @@ -285,21 +301,32 @@ runMessagingTCP env@MessagingTCP{..} = liftIO do atomically do modifyTVar _tcpSent (HM.delete newP) - modifyTVar _tcpPeerCookie $ \m -> do - HM.update killCookie cookie m + modifyTVar _tcpPeerCookie (HM.update killCookie cookie) void $ waitAnyCatchCancel [rd,wr] - runClient :: forall m . MonadIO m => m () - runClient = do + runClient = flip runContT pure do own <- toPeerAddr $ view tcpOwnPeer env - let (L4Address _ (IPAddrPort (i,p))) = own + let (L4Address _ (IPAddrPort (i,_))) = own let myCookie = view tcpCookie env pause @'Seconds 10 + void $ ContT $ bracket none $ const $ do + what <- atomically $ stateTVar _tcpClientThreads (\x -> (HM.elems x, mempty)) + mapM_ cancel what + + void $ ContT $ withAsync $ forever do + pause @Seconds 60 + + done <- readTVarIO _tcpClientThreads <&> HM.toList + >>= filterM ( \(_,x) -> isJust <$> poll x ) + <&> HS.fromList . fmap fst + + atomically $ modifyTVar _tcpClientThreads (HM.filterWithKey (\k _ -> not (HS.member k done))) + forever $ void $ runMaybeT do -- client sockets @@ -313,10 +340,12 @@ runMessagingTCP env@MessagingTCP{..} = liftIO do debug "SHIT? BUSYLOOP?" mzero - -- FIXME: !!! - liftIO $ async do + liftIO $ newClientThread env $ do let (L4Address _ (IPAddrPort (ip,port))) = whoAddr connect (show ip) (show port) $ \(so, remoteAddr) -> do + + let ?env = env + flip runContT pure $ callCC \exit -> do debug $ "OPEN CLIENT CONNECTION" <+> pretty ip <+> pretty port <+> pretty remoteAddr @@ -327,15 +356,7 @@ runMessagingTCP env@MessagingTCP{..} = liftIO do debug $ "same peer, exit" <+> pretty remoteAddr exit () - here <- atomically do - n <- readTVar _tcpPeerCookie <&> HM.member cookie - - unless n do - modifyTVar _tcpPeerCookie (HM.insertWith (+) cookie 1) - - modifyTVar _tcpPeerConn (HM.insert who connId) - - pure n + here <- useCookie cookie -- TODO: handshake notification liftIO $ _tcpOnClientStarted whoAddr connId @@ -371,20 +392,7 @@ runMessagingTCP env@MessagingTCP{..} = liftIO do void $ ContT $ bracket none $ const $ do atomically do modifyTVar _tcpPeerConn (HM.delete who) - modifyTVar _tcpPeerCookie $ \m -> do - HM.update killCookie cookie m - - forever do - - spx <- readFromSocket so 4 <&> LBS.toStrict - ssize <- readFromSocket so 4 <&> LBS.toStrict --- УУУ, фреейминг - let px = word32 spx -- & fromIntegral - let size = word32 ssize & fromIntegral - - bs <- readFromSocket so size - - -- debug $ "READ SHIT FROM CLIENT SOCKET!" <+> pretty remoteAddr - - atomically $ writeTBQueueDropSTM 10 _tcpReceived (who, bs) + modifyTVar _tcpPeerCookie (HM.update killCookie cookie) + readFrames so who _tcpReceived