This commit is contained in:
voidlizard 2024-11-03 10:04:57 +03:00
parent e64f3e9c41
commit a360dfb7ec
1 changed files with 69 additions and 29 deletions

View File

@ -13,6 +13,7 @@ module HBS2.Net.Messaging.TCP
) where
import HBS2.Clock
import HBS2.OrDie
import HBS2.Net.IP.Addr
import HBS2.Net.Messaging
import HBS2.Prelude.Plated
@ -64,6 +65,7 @@ data MessagingTCP =
, _tcpSent :: TVar (HashMap (Peer L4Proto) (TBQueue ByteString))
, _tcpClientThreadNum :: TVar Int
, _tcpClientThreads :: TVar (HashMap Int (Async ()))
, _tcpServerThreadsCount :: TVar Int
, _tcpProbe :: TVar AnyProbe
, _tcpOnClientStarted :: PeerAddr L4Proto -> Word64 -> IO () -- ^ Cient TCP connection succeed
}
@ -103,6 +105,7 @@ newMessagingTCP pa = liftIO do
<*> newTVarIO mempty
<*> newTVarIO 0
<*> newTVarIO mempty
<*> newTVarIO 0
<*> newTVarIO (AnyProbe ())
<*> pure (\_ _ -> none) -- do nothing by default
@ -193,18 +196,12 @@ writeTBQueueDropSTM inQLen newInQ bs = do
more (pred j)
killCookie :: Int -> Maybe Int
killCookie = \case
1 -> Nothing
n -> Just (pred n)
useCookie :: (?env :: MessagingTCP, MonadIO m) => Word32 -> m Bool
useCookie cookie = atomically do
let MessagingTCP{..} = ?env
n <- readTVar _tcpPeerCookie <&> HM.member cookie
unless n do
modifyTVar _tcpPeerCookie (HM.insertWith (+) cookie 1)
pure n
data TCPMessagingError =
TCPPeerReadTimeout
deriving stock (Show,Typeable)
instance Exception TCPMessagingError
runMessagingTCP :: forall m . MonadIO m => MessagingTCP -> m ()
runMessagingTCP env@MessagingTCP{..} = liftIO do
@ -221,15 +218,51 @@ runMessagingTCP env@MessagingTCP{..} = liftIO do
p <- readTVarIO _tcpProbe
acceptReport p =<< S.toList_ do
S.yield =<< ( readTVarIO _tcpClientThreads <&> ("tcpClientThreads",) . fromIntegral . HM.size )
S.yield =<< ( readTVarIO _tcpServerThreadsCount <&> ("tcpServerThreadsCount",) . fromIntegral )
S.yield =<< ( readTVarIO _tcpPeerConn <&> ("tcpPeerConn",) . fromIntegral . HM.size)
S.yield =<< ( readTVarIO _tcpPeerCookie <&> ("tcpPeerCookie",) . fromIntegral . HM.size)
coo <- readTVarIO _tcpPeerCookie -- <&> ("tcpPeerCookie",)
let cooNn = sum [ 1 | (_,v) <- HM.toList coo, v >= 1 ]
S.yield ("tcpPeerCookie", fromIntegral $ HM.size coo)
S.yield ("tcpPeerCookieUsed", cooNn)
S.yield =<< ( readTVarIO _tcpSent <&> ("tcpSent",) . fromIntegral . HM.size)
waitAnyCatchCancel [p1,p2,probes]
sweep <- ContT $ withAsync $ forever do
pause @'Seconds 60
atomically do
pips <- readTVar _tcpPeerConn
modifyTVar _tcpSent (HM.filterWithKey (\k _ -> HM.member k pips))
modifyTVar _tcpPeerCookie (HM.filter (>=1))
waitAnyCatchCancel [p1,p2,probes,sweep]
where
readFrames so peer queue = forever $ do
withTCPTimeout timeout action = liftIO do
r <- race (pause timeout) action
case r of
Right{} -> pure ()
Left{} -> do
debug "tcp connection timeout!"
throwIO TCPPeerReadTimeout
killCookie :: Int -> Maybe Int
killCookie = \case
n | n <= 1 -> Nothing
n -> Just (pred n)
-- useCookie :: forall m . (?env :: MessagingTCP, MonadIO m) => Word32 -> m Bool
useCookie cookie = atomically do
let MessagingTCP{..} = ?env
n <- readTVar _tcpPeerCookie <&> HM.member cookie
unless n do
modifyTVar _tcpPeerCookie (HM.insertWith (+) cookie 1)
pure n
-- FIXME: timeout-hardcode
readFrames so peer queue = forever $ withTCPTimeout (TimeoutSec 67) do
void $ readFromSocket so 4 <&> LBS.toStrict
ssize <- readFromSocket so 4 <&> LBS.toStrict
let size = word32 ssize & fromIntegral
@ -251,8 +284,7 @@ runMessagingTCP env@MessagingTCP{..} = liftIO do
void $ acceptFork sock $ \(so, remote) -> void $ flip runContT pure $ callCC \exit -> do
liftIO $ withFdSocket so setCloseOnExecIfNeeded
debug $ "!!! GOT INCOMING CONNECTION FROM !!!"
<+> brackets (pretty own)
<+> brackets (pretty sa)
<+> brackets (pretty remote)
let ?env = env
@ -268,6 +300,8 @@ runMessagingTCP env@MessagingTCP{..} = liftIO do
let newP = fromSockAddr @'TCP remote :: Peer L4Proto
atomically $ modifyTVar _tcpServerThreadsCount succ
newOutQ <- do
atomically do
mbQ <- readTVar _tcpSent <&> HM.lookup newP
@ -295,6 +329,7 @@ runMessagingTCP env@MessagingTCP{..} = liftIO do
void $ ContT $ bracket none $ const do
debug $ "SHUTDOWN SOCKET AND SHIT" <+> pretty remote
atomically $ modifyTVar _tcpServerThreadsCount pred
shutdown so ShutdownBoth
cancel rd
cancel wr
@ -334,6 +369,8 @@ runMessagingTCP env@MessagingTCP{..} = liftIO do
who <- atomically $ readTQueue _tcpConnDemand
whoAddr <- toPeerAddr who
debug $ "DEMAND:" <+> pretty who
already <- atomically $ readTVar _tcpPeerConn <&> HM.member who
when already do
@ -365,7 +402,9 @@ runMessagingTCP env@MessagingTCP{..} = liftIO do
debug $ "CLIENT: ALREADY CONNECTED" <+> pretty cookie <+> pretty ip <+> pretty port
exit ()
atomically $ modifyTVar _tcpPeerCookie (HM.insertWith (+) cookie 1)
atomically do
modifyTVar _tcpPeerCookie (HM.insertWith (+) cookie 1)
modifyTVar _tcpPeerConn (HM.insert who connId)
wr <- ContT $ withAsync $ forever do
bss <- atomically do
@ -390,6 +429,7 @@ runMessagingTCP env@MessagingTCP{..} = liftIO do
void $ ContT $ bracket none (const $ cancel wr)
void $ ContT $ bracket none $ const $ do
debug "!!! TCP: BRACKET FIRED IN CLIENT !!!"
atomically do
modifyTVar _tcpPeerConn (HM.delete who)
modifyTVar _tcpPeerCookie (HM.update killCookie cookie)