This commit is contained in:
voidlizard 2024-11-03 13:13:59 +03:00
parent 2c6d260895
commit 4fb9c948be
1 changed files with 24 additions and 9 deletions

View File

@ -61,6 +61,7 @@ data MessagingTCP =
, _tcpCookie :: Word32
, _tcpPeerConn :: TVar (HashMap (Peer L4Proto) Word64)
, _tcpPeerCookie :: TVar (HashMap Word32 Int)
, _tcpPeerToCookie :: TVar (HashMap (Peer L4Proto) Word32)
, _tcpPeerSocket :: TVar (HashMap (Peer L4Proto) Socket)
, _tcpConnDemand :: TQueue (Peer L4Proto)
, _tcpReceived :: TBQueue (Peer L4Proto, ByteString)
@ -103,6 +104,7 @@ newMessagingTCP pa = liftIO do
<*> newTVarIO mempty
<*> newTVarIO mempty
<*> newTVarIO mempty
<*> newTVarIO mempty
<*> newTQueueIO
<*> newTBQueueIO outMessageQLen
<*> newTVarIO mempty
@ -239,6 +241,14 @@ runMessagingTCP env@MessagingTCP{..} = liftIO do
S.yield =<< ( readTVarIO _tcpSent <&> ("tcpSent",) . fromIntegral . HM.size)
sweepCookies <- ContT $ withAsync $ forever do
pause @'Seconds 10
atomically do
pips <- readTVar _tcpPeerConn
modifyTVar _tcpPeerToCookie (HM.filterWithKey (\k _ -> HM.member k pips))
alive <- readTVar _tcpPeerToCookie <&> HS.fromList . HM.elems
modifyTVar _tcpPeerCookie (HM.filterWithKey (\k _ -> HS.member k alive))
sweep <- ContT $ withAsync $ forever do
pause @'Seconds 60
atomically do
@ -247,7 +257,7 @@ runMessagingTCP env@MessagingTCP{..} = liftIO do
modifyTVar _tcpPeerSocket (HM.filterWithKey (\k _ -> HM.member k pips))
modifyTVar _tcpPeerCookie (HM.filter (>=1))
waitAnyCatchCancel [p1,p2,probes,sweep]
waitAnyCatchCancel [p1,p2,probes,sweep,sweepCookies]
where
@ -265,11 +275,12 @@ runMessagingTCP env@MessagingTCP{..} = liftIO do
n -> Just (pred n)
-- useCookie :: forall m . (?env :: MessagingTCP, MonadIO m) => Word32 -> m Bool
useCookie cookie = atomically do
useCookie peer cookie = atomically do
let MessagingTCP{..} = ?env
n <- readTVar _tcpPeerCookie <&> HM.member cookie
unless n do
modifyTVar _tcpPeerCookie (HM.insertWith (+) cookie 1)
modifyTVar _tcpPeerToCookie (HM.insert peer cookie)
pure n
-- FIXME: timeout-hardcode
@ -299,18 +310,18 @@ runMessagingTCP env@MessagingTCP{..} = liftIO do
let ?env = env
let newP = fromSockAddr @'TCP remote :: Peer L4Proto
cookie <- handshake Server env so
when (cookie == myCookie) $ exit ()
here <- useCookie cookie
here <- useCookie newP cookie
when here $ do
debug $ "SERVER : ALREADY CONNECTED" <+> pretty cookie <+> viaShow so
debug $ "SERVER : ALREADY CONNECTED" <+> pretty cookie <+> viaShow remote
exit ()
let newP = fromSockAddr @'TCP remote :: Peer L4Proto
atomically $ modifyTVar _tcpServerThreadsCount succ
newOutQ <- do
@ -341,8 +352,11 @@ runMessagingTCP env@MessagingTCP{..} = liftIO do
void $ ContT $ bracket none $ const do
debug $ "SHUTDOWN SOCKET AND SHIT" <+> pretty remote
atomically $ modifyTVar _tcpServerThreadsCount pred
atomically $ modifyTVar _tcpPeerSocket (HM.delete newP)
atomically do
modifyTVar _tcpServerThreadsCount pred
modifyTVar _tcpPeerSocket (HM.delete newP)
modifyTVar _tcpPeerToCookie (HM.delete newP)
shutdown so ShutdownBoth
cancel rd
cancel wr
@ -404,7 +418,7 @@ runMessagingTCP env@MessagingTCP{..} = liftIO do
debug $ "same peer, exit" <+> pretty remoteAddr
exit ()
here <- useCookie cookie
here <- useCookie who cookie
-- TODO: handshake notification
liftIO $ _tcpOnClientStarted whoAddr connId
@ -443,6 +457,7 @@ runMessagingTCP env@MessagingTCP{..} = liftIO do
atomically do
modifyTVar _tcpPeerConn (HM.delete who)
modifyTVar _tcpPeerCookie (HM.update killCookie cookie)
modifyTVar _tcpPeerToCookie (HM.delete who)
modifyTVar _tcpPeerSocket (HM.delete who)
void $ ContT $ bracket none (const $ cancel wr)