From a2955197a30c03de5bddd3a698a224c70094e9c1 Mon Sep 17 00:00:00 2001 From: voidlizard Date: Sat, 2 Nov 2024 13:04:21 +0300 Subject: [PATCH] tcp rewritten --- cabal.project | 2 +- hbs2-core/lib/HBS2/Net/Messaging/TCP.hs | 566 +++++++++--------------- 2 files changed, 210 insertions(+), 358 deletions(-) diff --git a/cabal.project b/cabal.project index d182df83..b147e7fb 100644 --- a/cabal.project +++ b/cabal.project @@ -10,6 +10,6 @@ constraints: , http-client >=0.7.16 && <0.8 -- executable-static: True --- profiling: True +profiling: True --library-profiling: False diff --git a/hbs2-core/lib/HBS2/Net/Messaging/TCP.hs b/hbs2-core/lib/HBS2/Net/Messaging/TCP.hs index 422e6c8b..d42f8d73 100644 --- a/hbs2-core/lib/HBS2/Net/Messaging/TCP.hs +++ b/hbs2-core/lib/HBS2/Net/Messaging/TCP.hs @@ -21,14 +21,14 @@ import HBS2.Net.Messaging.Stream import HBS2.System.Logger.Simple import HBS2.Misc.PrettyStuff -import Control.Concurrent.STM (flushTQueue) +import Control.Concurrent.STM (flushTQueue,retry) import Control.Monad.Trans.Maybe import Data.Bits import Data.ByteString.Lazy (ByteString) import Data.ByteString.Lazy qualified as LBS import Data.Function import Data.HashMap.Strict (HashMap) -import Data.HashMap.Strict qualified as HashMap +import Data.HashMap.Strict qualified as HM import Data.List qualified as L import Data.Maybe import Data.Word @@ -50,24 +50,20 @@ import Streaming.Prelude qualified as S -- FIXME: control-recv-capacity-to-avoid-leaks +outMessageQLen :: Natural +outMessageQLen = 256 + -- | TCP Messaging environment data MessagingTCP = MessagingTCP { _tcpSOCKS5 :: Maybe (PeerAddr L4Proto) , _tcpOwnPeer :: Peer L4Proto , _tcpCookie :: Word32 - , _tcpConnPeer :: TVar (HashMap Word64 (Peer L4Proto)) , _tcpPeerConn :: TVar (HashMap (Peer L4Proto) Word64) - , _tcpConnUsed :: TVar (HashMap Word64 Int) - , _tcpConnQ :: TVar (HashMap Word64 (TQueue (Peer L4Proto, ByteString))) - , _tcpPeerPx :: TVar (HashMap Word32 (Peer L4Proto)) - , _tcpPeerXp :: TVar (HashMap (Peer L4Proto) Word32) - , _tcpRecv :: TQueue (Peer L4Proto, ByteString) - , _tcpDefer :: TVar (HashMap (Peer L4Proto) [(TimeSpec, ByteString)]) - , _tcpDeferEv :: TQueue () - , _tcpSpawned :: TVar Int - , _tcpFired :: TVar Int - , _tcpConnWip :: TVar (HashMap (Peer L4Proto) TimeSpec) + , _tcpPeerCookie :: TVar (HashMap Word32 Int) + , _tcpConnDemand :: TQueue (Peer L4Proto) + , _tcpReceived :: TBQueue (Peer L4Proto, ByteString) + , _tcpSent :: TVar (HashMap (Peer L4Proto) (TBQueue ByteString)) , _tcpProbe :: TVar AnyProbe , _tcpOnClientStarted :: PeerAddr L4Proto -> Word64 -> IO () -- ^ Cient TCP connection succeed } @@ -89,55 +85,39 @@ newMessagingTCP pa = liftIO do <*> randomIO <*> newTVarIO mempty <*> newTVarIO mempty - <*> newTVarIO mempty - <*> newTVarIO mempty - <*> newTVarIO mempty - <*> newTVarIO mempty <*> newTQueueIO - <*> newTVarIO mempty - <*> newTQueueIO - <*> newTVarIO 0 - <*> newTVarIO 0 + <*> newTBQueueIO outMessageQLen <*> newTVarIO mempty <*> newTVarIO (AnyProbe ()) <*> pure (\_ _ -> none) -- do nothing by default instance Messaging MessagingTCP L4Proto ByteString where - sendTo bus (To p) (From _f) msg = liftIO do - let _own = view tcpOwnPeer bus + sendTo MessagingTCP{..} (To p) (From _f) msg = liftIO do + -- let _own = tcpOwnPeer + -- debug $ "!!!! FUCKING SEND TO" <+> pretty p - co' <- atomically $ readTVar (view tcpPeerConn bus) <&> HashMap.lookup p + queue <- atomically do + q' <- readTVar _tcpSent <&> HM.lookup p - -- debug $ "sendTo" <+> brackets (pretty own) - -- <+> pretty p - -- <+> braces (pretty co') - -- <+> pretty (LBS.length msg) + case q' of + Nothing -> do + writeTQueue _tcpConnDemand p + q <- newTBQueue outMessageQLen + modifyTVar _tcpSent (HM.insert p q) + pure q - maybe1 co' defer $ \co -> do - -- trace $ "writing to" <+> pretty co - q' <- atomically $ readTVar (view tcpConnQ bus) <&> HashMap.lookup co - maybe1 q' (warn $ "no queue for" <+> pretty co) $ \q -> do - atomically $ writeTQueue q (p, msg) + Just q -> pure q - where - defer = do - warn $ "defer" <+> pretty p - t <- getTimeCoarse - atomically $ modifyTVar (view tcpDefer bus) (HashMap.insertWith (<>) p [(t, msg)]) - atomically $ writeTQueue (view tcpDeferEv bus) () - - receive bus _ = liftIO do - let q = view tcpRecv bus - - ms <- atomically do - r <- readTQueue q - rs <- flushTQueue q - pure (r:rs) - - forM ms $ \(p, msg) -> pure (From p, msg) + atomically $ writeTBQueueDropSTM 10 queue msg + -- debug $ "!!!! FUCKING SEND TO" <+> pretty p <+> "DONE" + receive MessagingTCP{..} _ = liftIO do + atomically do + s0 <- readTBQueue _tcpReceived + sx <- flushTBQueue _tcpReceived + pure $ fmap (over _1 From) ( s0 : sx ) connectionId :: Word32 -> Word32 -> Word64 connectionId a b = (fromIntegral hi `shiftL` 32) .|. fromIntegral low @@ -145,7 +125,6 @@ connectionId a b = (fromIntegral hi `shiftL` 32) .|. fromIntegral low low = min a b hi = max a b - data ConnType = Server | Client deriving (Eq,Ord,Show,Generic) @@ -183,352 +162,225 @@ handshake Client env so = do sendCookie env so recvCookie env so -spawnConnection :: forall m . MonadIO m - => ConnType - -> MessagingTCP - -> Socket - -> SockAddr - -> m () - -spawnConnection tp env@MessagingTCP{..} so sa = liftIO do - - flip runContT pure $ do - - let myCookie = view tcpCookie env - let own = view tcpOwnPeer env - let newP = fromSockAddr @'TCP sa - - theirCookie <- handshake tp env so - - let connId = connectionId myCookie theirCookie - - when (tp == Client && theirCookie /= myCookie) do - pa <- toPeerAddr newP - liftIO $ view tcpOnClientStarted env pa connId -- notify if we opened client tcp connection - - traceCmd own - ( "spawnConnection " - <+> viaShow tp - <+> pretty myCookie - <+> pretty connId ) - newP - - debug $ "handshake" <+> viaShow tp - <+> brackets (pretty (view tcpOwnPeer env)) - <+> pretty sa - <+> pretty theirCookie - <+> pretty connId - - used <- atomically $ do - modifyTVar (view tcpConnUsed env) (HashMap.insertWith (+) connId 1) - readTVar (view tcpConnUsed env) <&> HashMap.findWithDefault 0 connId +writeTBQueueDropSTM :: Integral n + => n + -> TBQueue a + -> a + -> STM () +writeTBQueueDropSTM inQLen newInQ bs = do + flip fix inQLen $ \more j -> do + when (j > 0) do + full <- isFullTBQueue newInQ + if not full then do + writeTBQueue newInQ bs + else do + void $ tryReadTBQueue newInQ + more (pred j) - void $ ContT $ bracket (pure connId) cleanupConn - - debug $ "USED:" <+> viaShow tp <+> pretty own <+> pretty used - - -- when ( used <= 2 ) do - atomically $ modifyTVar (view tcpPeerConn env) (HashMap.insert newP connId) - - when (used == 1) do - - atomically $ modifyTVar _tcpSpawned succ - - q <- getWriteQueue connId - updatePeer connId newP - - debug $ "NEW PEER" <+> brackets (pretty own) - <+> pretty connId - <+> pretty newP - <+> parens ("used:" <+> pretty used) - - rd <- ContT $ withAsync $ fix \next -> do - - spx <- readFromSocket so 4 <&> LBS.toStrict - ssize <- readFromSocket so 4 <&> LBS.toStrict --- УУУ, фреейминг - let px = word32 spx -- & fromIntegral - let size = word32 ssize & fromIntegral - - - bs <- readFromSocket so size - - memReqId newP px - - pxes <- readTVarIO (view tcpPeerPx env) - - let orig = fromMaybe (fromSockAddr @'TCP sa) (HashMap.lookup px pxes) - - -- debug $ "RECEIVED" <+> pretty orig <+> pretty (LBS.length bs) - - atomically $ writeTQueue (view tcpRecv env) (orig, bs) - - next - - wr <- ContT $ withAsync $ fix \next -> do - (rcpt, bs) <- atomically $ readTQueue q - - pq <- makeReqId rcpt - let qids = bytestring32 pq - let size = bytestring32 (fromIntegral $ LBS.length bs) - - let frame = LBS.fromStrict qids - <> LBS.fromStrict size -- req-size - <> bs -- payload - - sendLazy so frame --(LBS.toStrict frame) - next - - ContT $ bracket none $ \_ -> mapM cancel [rd,wr] - ContT $ bracket (pure connId) cleanupConn - ContT $ bracket none (const $ atomically $ modifyTVar _tcpSpawned pred) - - void $ waitAnyCatchCancel [rd,wr] - - -- gracefulClose so 1000 - debug $ "spawnConnection exit" <+> pretty sa - - where - - memReqId newP px = - atomically $ modifyTVar (view tcpPeerXp env) (HashMap.insert newP px) - - makeReqId rcpt = do - let pxes = view tcpPeerPx env - let xpes = view tcpPeerXp env - - nq <- randomIO - atomically $ do - px <- readTVar xpes <&> HashMap.lookup rcpt - case px of - Just qq -> pure qq - Nothing -> do - modifyTVar pxes (HashMap.insert nq rcpt) - modifyTVar xpes (HashMap.insert rcpt nq) - pure nq - - updatePeer connId newP = atomically $ do - modifyTVar (view tcpPeerConn env) (HashMap.insert newP connId) - modifyTVar (view tcpConnPeer env) (HashMap.insert connId newP) - - getWriteQueue connId = atomically $ do - readTVar (view tcpConnQ env) >>= \x -> do - case HashMap.lookup connId x of - Just qq -> pure qq - Nothing -> do - newQ <- newTQueue - modifyTVar (view tcpConnQ env) (HashMap.insert connId newQ) - pure newQ - - cleanupConn connId = atomically do - modifyTVar (view tcpConnUsed env) (HashMap.alter del connId) - used <- readTVar (view tcpConnUsed env) <&> HashMap.findWithDefault 0 connId - when (used == 0) do - p <- stateTVar (view tcpConnPeer env) - $ \x -> (HashMap.lookup connId x, HashMap.delete connId x) - - maybe1 p none $ \pp -> - modifyTVar (view tcpPeerConn env) (HashMap.delete pp) - - modifyTVar (view tcpConnQ env) (HashMap.delete connId) - - where - del = \case - Nothing -> Nothing - Just n | n <= 1 -> Nothing - | otherwise -> Just (pred n) - - -connectPeerTCP :: MonadIO m - => MessagingTCP - -> Peer L4Proto - -> m () - -connectPeerTCP env peer = liftIO do - pa <- toPeerAddr peer - let (L4Address _ (IPAddrPort (i,p))) = pa - - - here <- readTVarIO (view tcpPeerConn env) <&> HashMap.member peer - - unless here do - - case view tcpSOCKS5 env of - Nothing -> do - - connect (show i) (show p) $ \(sock, remoteAddr) -> do - spawnConnection Client env sock remoteAddr - shutdown sock ShutdownBoth - - Just socks5 -> do - - let (L4Address _ (IPAddrPort (socks,socksp))) = socks5 - - connectSOCKS5 (show socks) (show socksp) (show i) (show p) $ \(sock, socksAddr, _) -> do - - let (PeerL4{..}) = peer - - debug $ "CONNECTED VIA SOCKS5" <+> pretty socksAddr <+> pretty pa - - spawnConnection Client env sock _sockAddr - - shutdown sock ShutdownBoth - --- FIXME: link-all-asyncs - -fireTCP :: forall e m . (e ~ L4Proto, Pretty (PeerAddr e), IsPeerAddr e m, MonadUnliftIO m) => MessagingTCP -> Peer e -> m () -> m () -fireTCP MessagingTCP{..} pip what = do - void $ do - pa <- toPeerAddr @e pip - now <- getTimeCoarse - fire <- atomically do - here <- readTVar _tcpConnWip <&> HashMap.member pip - unless here do - modifyTVar _tcpConnWip (HashMap.insert pip now) - pure (not here) - when fire do - debug $ "Fire TCP" <+> pretty pa - atomically (modifyTVar _tcpFired succ) - void $ async do - r <- U.try @_ @SomeException what - atomically (modifyTVar _tcpFired pred) - either U.throwIO dontHandle r +killCookie :: Int -> Maybe Int +killCookie = \case + 1 -> Nothing + n -> Just (pred n) runMessagingTCP :: forall m . MonadIO m => MessagingTCP -> m () runMessagingTCP env@MessagingTCP{..} = liftIO do void $ flip runContT pure do - own <- toPeerAddr $ view tcpOwnPeer env - let (L4Address _ (IPAddrPort (i,p))) = own + p1 <- ContT $ withAsync runClient + p2 <- ContT $ withAsync runServer - let defs = view tcpDefer env + waitAnyCatchCancel [p1,p2] + -- waitAnyCatchCancel [p2] + -- waitAnyCatchCancel [p1] - mon <- ContT $ withAsync $ forever do - pause @'Seconds 30 - now <- getTimeCoarse + where - -- FIXME: time-hardcode-again - let expire = filter (\e -> (realToFrac (toNanoSecs (now - fst e)) / (1e9 :: Double)) < 30) - atomically $ modifyTVar defs - $ HashMap.mapMaybe - $ \es -> let rs = expire es - in case rs of - [] -> Nothing - xs -> Just xs + runServer :: forall m . MonadIO m => m () + runServer = do - con <- ContT $ withAsync $ forever do + own <- toPeerAddr $ view tcpOwnPeer env + let (L4Address _ (IPAddrPort (i,p))) = own + let myCookie = view tcpCookie env - let ev = view tcpDeferEv env + -- server + liftIO $ listen (Host (show i)) (show p) $ \(sock, sa) -> do + withFdSocket sock setCloseOnExecIfNeeded + debug $ "Listening on" <+> pretty sa - -- FIXME: wait-period-hardcode - void $ race (pause @'Seconds 0.25) (atomically $ readTQueue ev >> flushTQueue ev) + forever do + void $ acceptFork sock $ \(so, remote) -> void $ flip runContT pure $ callCC \exit -> do + liftIO $ withFdSocket so setCloseOnExecIfNeeded + debug $ "!!! GOT INCOMING CONNECTION FROM !!!" + <+> brackets (pretty own) + <+> brackets (pretty sa) - dePips <- readTVarIO defs <&> HashMap.keys + cookie <- handshake Server env so + when (cookie == myCookie) $ exit () - forM_ dePips $ \pip -> void $ runMaybeT do + here <- atomically do + n <- readTVar _tcpPeerCookie <&> HM.member cookie - -- FIXME: make-sure-it-is-correct - already <- readTVarIO _tcpPeerXp <&> HashMap.member pip + unless n do + modifyTVar _tcpPeerCookie (HM.insertWith (+) cookie 1) - guard (not already) + pure n - msgs <- readTVarIO defs <&> HashMap.findWithDefault mempty pip + when here $ do + debug $ "SERVER : ALREADY CONNECTED" <+> pretty cookie <+> viaShow so + exit () - unless (L.null msgs) do - trace $ "DEFERRED FOR" <+> pretty pip <+> pretty (length msgs) + let newP = fromSockAddr @'TCP remote :: Peer L4Proto - let len = length msgs + -- FIXME: queue-size-hardcode + let inQLen = outMessageQLen - when (len > 10) do - -- FIXME: deferred-message-hardcoded - atomically $ modifyTVar defs (HashMap.adjust (L.drop (len - 10)) pip) + newInQ <- liftIO $ newTBQueueIO inQLen - co' <- atomically $ readTVar (view tcpPeerConn env) <&> HashMap.lookup pip + newOutQ <- do + atomically do + mbQ <- readTVar _tcpSent <&> HM.lookup newP + maybe (newTBQueue outMessageQLen) pure mbQ - when (isNothing co') do - trace $ red "No session for" <+> pretty pip - - lift $ maybe1 co' (void $ fireTCP env pip (connectPeerTCP env pip)) $ \co -> do - q' <- atomically $ readTVar (view tcpConnQ env) <&> HashMap.lookup co - maybe1 q' none $ \q -> do atomically do - mss <- readTVar defs <&> HashMap.findWithDefault mempty pip - modifyTVar defs $ HashMap.delete pip - forM_ mss $ \m -> writeTQueue q (pip, snd m) + modifyTVar _tcpSent (HM.insert newP newOutQ) + modifyTVar _tcpPeerConn (HM.insert newP (connectionId myCookie cookie)) - stat <- ContT $ withAsync $ forever do - pause @'Seconds 120 - ps <- readTVarIO $ view tcpConnPeer env - let peers = HashMap.toList ps - forM_ peers $ \(c,pip) -> do - used <- readTVarIO (view tcpConnUsed env) <&> HashMap.findWithDefault 0 c - trace $ "peer" <+> brackets (pretty own) - <+> pretty pip - <+> pretty c - <+> parens ("used:" <+> pretty used) + wr <- ContT $ withAsync $ forever do + bs <- atomically $ readTBQueue newOutQ - cleanup <- ContT $ withAsync $ forever do - pause @Seconds 60 - now <- getTimeCoarse - connWip <- readTVarIO _tcpConnWip <&> HashMap.toList - let connAlive = [ (k,v) | (k,v) <- connWip, not (expired (TimeoutSec 60) (now - v)) ] - atomically $ writeTVar _tcpConnWip (HashMap.fromList connAlive) + -- FIXME: check-this! + let pq = myCookie -- randomIO + let qids = bytestring32 pq + let size = bytestring32 (fromIntegral $ LBS.length bs) - mapM_ link [mon,con,stat,cleanup] + let frame = LBS.fromStrict qids + <> LBS.fromStrict size -- req-size + <> bs -- payload - probes <- ContT $ withAsync $ forever do - pause @'Seconds 10 - probe <- readTVarIO _tcpProbe - acceptReport probe =<< S.toList_ do - S.yield =<< atomically (readTVar _tcpConnPeer <&> ("tcpConnPeer",) . fromIntegral . HashMap.size) - S.yield =<< atomically (readTVar _tcpPeerConn <&> ("tcpPeerConn",) . fromIntegral . HashMap.size) - S.yield =<< atomically (readTVar _tcpConnUsed <&> ("tcpConnUsed",) . fromIntegral . HashMap.size) - S.yield =<< atomically (readTVar _tcpConnQ <&> ("tcpConnQ",) . fromIntegral . HashMap.size) - S.yield =<< atomically (readTVar _tcpPeerPx <&> ("tcpPeerPx",) . fromIntegral . HashMap.size) - S.yield =<< atomically (readTVar _tcpPeerXp <&> ("tcpPeerXp",) . fromIntegral . HashMap.size) - S.yield =<< atomically (readTVar _tcpDefer <&> ("tcpPeerDefer",) . fromIntegral . HashMap.size) - S.yield =<< atomically (readTVar _tcpSpawned <&> ("tcpSpawned",) . fromIntegral) - S.yield =<< atomically (readTVar _tcpFired <&> ("tcpFired",) . fromIntegral) - S.yield =<< atomically (readTVar _tcpConnWip <&> ("tcpConnWip",) . fromIntegral . HashMap.size) + sendLazy so frame --(LBS.toStrict frame) - ContT $ bracket (pure ()) $ \_ -> mapM_ cancel [mon,con,stat,probes,cleanup] + rd <- ContT $ withAsync $ forever do - liftIO $ listen (Host (show i)) (show p) $ \(sock, sa) -> do - withFdSocket sock setCloseOnExecIfNeeded - debug $ "Listening on" <+> pretty sa + spx <- readFromSocket so 4 <&> LBS.toStrict + ssize <- readFromSocket so 4 <&> LBS.toStrict --- УУУ, фреейминг + let px = word32 spx -- & fromIntegral + let size = word32 ssize & fromIntegral - forever do - void $ acceptFork sock $ \(so, remote) -> do - withFdSocket so setCloseOnExecIfNeeded - trace $ "GOT INCOMING CONNECTION FROM" - <+> brackets (pretty own) - <+> brackets (pretty sa) - <+> pretty remote + bs <- readFromSocket so size - void $ try @SomeException $ do + -- debug $ "READ SHIT FROM SOCKET!" <+> pretty remote - spawnConnection Server env so remote + atomically $ writeTBQueueDropSTM outMessageQLen _tcpReceived (newP, bs) - -- gracefulClose so 1000 + void $ ContT $ bracket none $ const do + debug $ "SHUTDOWN SOCKET AND SHIT" <+> pretty remote + cancel rd + cancel wr + shutdown so ShutdownBoth - -- TODO: probably-cleanup-peer - -- TODO: periodically-drop-inactive-connections + atomically do + modifyTVar _tcpSent (HM.delete newP) + modifyTVar _tcpPeerCookie $ \m -> do + HM.update killCookie cookie m - debug $ "CLOSING CONNECTION" <+> pretty remote - shutdown so ShutdownBoth - close so -- ) -- `U.finally` mapM_ cancel [mon,con,stat] + void $ waitAnyCatchCancel [rd,wr] -traceCmd :: forall a ann b m . ( Pretty a - , Pretty b - , MonadIO m - ) - => a -> Doc ann -> b -> m () + runClient :: forall m . MonadIO m => m () + runClient = do + + own <- toPeerAddr $ view tcpOwnPeer env + let (L4Address _ (IPAddrPort (i,p))) = own + let myCookie = view tcpCookie env + + pause @'Seconds 30 + + forever $ void $ runMaybeT do + -- client sockets + + -- смотрим к кому надо + who <- atomically $ readTQueue _tcpConnDemand + whoAddr <- toPeerAddr who + + already <- atomically $ readTVar _tcpPeerConn <&> HM.member who + + when already do + debug "SHIT? BUSYLOOP?" + mzero + + -- FIXME: !!! + liftIO $ asyncLinked do + let (L4Address _ (IPAddrPort (ip,port))) = whoAddr + connect (show ip) (show port) $ \(so, remoteAddr) -> do + flip runContT pure $ callCC \exit -> do + + debug $ "OPEN CLIENT CONNECTION" <+> pretty ip <+> pretty port <+> pretty remoteAddr + cookie <- handshake Client env so + let connId = connectionId cookie myCookie + + when (cookie == myCookie) $ exit () + + here <- atomically do + n <- readTVar _tcpPeerCookie <&> HM.member cookie + + unless n do + modifyTVar _tcpPeerCookie (HM.insertWith (+) cookie 1) + + modifyTVar _tcpPeerConn (HM.insert who connId) + + pure n + + -- TODO: handshake notification + liftIO $ _tcpOnClientStarted whoAddr connId + + when here do + debug $ "CLIENT: ALREADY CONNECTED" <+> pretty cookie <+> pretty ip <+> pretty port + exit () + + atomically $ modifyTVar _tcpPeerCookie (HM.insertWith (+) cookie 1) + + wr <- ContT $ withAsync $ forever do + bss <- atomically do + q' <- readTVar _tcpSent <&> HM.lookup who + maybe1 q' mempty $ \q -> do + s <- readTBQueue q + sx <- flushTBQueue q + pure (s:sx) + + for_ bss $ \bs -> do + -- FIXME: check-this! + let pq = myCookie -- randomIO + let qids = bytestring32 pq + let size = bytestring32 (fromIntegral $ LBS.length bs) + + let frame = LBS.fromStrict qids + <> LBS.fromStrict size -- req-size + <> bs -- payload + + sendLazy so frame --(LBS.toStrict frame) + + void $ ContT $ bracket none (const $ cancel wr) + + void $ ContT $ bracket none $ const $ do + atomically do + modifyTVar _tcpPeerConn (HM.delete who) + modifyTVar _tcpPeerCookie $ \m -> do + HM.update killCookie cookie m + + forever do + + spx <- readFromSocket so 4 <&> LBS.toStrict + ssize <- readFromSocket so 4 <&> LBS.toStrict --- УУУ, фреейминг + let px = word32 spx -- & fromIntegral + let size = word32 ssize & fromIntegral + + bs <- readFromSocket so size + + -- debug $ "READ SHIT FROM CLIENT SOCKET!" <+> pretty remoteAddr + + atomically $ writeTBQueueDropSTM 10 _tcpReceived (who, bs) -traceCmd p1 s p2 = do - trace $ brackets (pretty p1) - <+> s - <+> parens (pretty p2)