mirror of https://github.com/voidlizard/hbs2
wip, tcp
This commit is contained in:
parent
68baa0adb1
commit
63bd5c0aa6
|
@ -61,6 +61,7 @@ data MessagingTCP =
|
||||||
, _tcpCookie :: Word32
|
, _tcpCookie :: Word32
|
||||||
, _tcpPeerConn :: TVar (HashMap (Peer L4Proto) Word64)
|
, _tcpPeerConn :: TVar (HashMap (Peer L4Proto) Word64)
|
||||||
, _tcpPeerCookie :: TVar (HashMap Word32 Int)
|
, _tcpPeerCookie :: TVar (HashMap Word32 Int)
|
||||||
|
, _tcpPeerToCookie :: TVar (HashMap (Peer L4Proto) Word32)
|
||||||
, _tcpPeerSocket :: TVar (HashMap (Peer L4Proto) Socket)
|
, _tcpPeerSocket :: TVar (HashMap (Peer L4Proto) Socket)
|
||||||
, _tcpConnDemand :: TQueue (Peer L4Proto)
|
, _tcpConnDemand :: TQueue (Peer L4Proto)
|
||||||
, _tcpReceived :: TBQueue (Peer L4Proto, ByteString)
|
, _tcpReceived :: TBQueue (Peer L4Proto, ByteString)
|
||||||
|
@ -103,6 +104,7 @@ newMessagingTCP pa = liftIO do
|
||||||
<*> newTVarIO mempty
|
<*> newTVarIO mempty
|
||||||
<*> newTVarIO mempty
|
<*> newTVarIO mempty
|
||||||
<*> newTVarIO mempty
|
<*> newTVarIO mempty
|
||||||
|
<*> newTVarIO mempty
|
||||||
<*> newTQueueIO
|
<*> newTQueueIO
|
||||||
<*> newTBQueueIO outMessageQLen
|
<*> newTBQueueIO outMessageQLen
|
||||||
<*> newTVarIO mempty
|
<*> newTVarIO mempty
|
||||||
|
@ -239,6 +241,14 @@ runMessagingTCP env@MessagingTCP{..} = liftIO do
|
||||||
|
|
||||||
S.yield =<< ( readTVarIO _tcpSent <&> ("tcpSent",) . fromIntegral . HM.size)
|
S.yield =<< ( readTVarIO _tcpSent <&> ("tcpSent",) . fromIntegral . HM.size)
|
||||||
|
|
||||||
|
sweepCookies <- ContT $ withAsync $ forever do
|
||||||
|
pause @'Seconds 10
|
||||||
|
atomically do
|
||||||
|
pips <- readTVar _tcpPeerConn
|
||||||
|
modifyTVar _tcpPeerToCookie (HM.filterWithKey (\k _ -> HM.member k pips))
|
||||||
|
alive <- readTVar _tcpPeerToCookie <&> HS.fromList . HM.elems
|
||||||
|
modifyTVar _tcpPeerCookie (HM.filterWithKey (\k _ -> HS.member k alive))
|
||||||
|
|
||||||
sweep <- ContT $ withAsync $ forever do
|
sweep <- ContT $ withAsync $ forever do
|
||||||
pause @'Seconds 60
|
pause @'Seconds 60
|
||||||
atomically do
|
atomically do
|
||||||
|
@ -247,7 +257,7 @@ runMessagingTCP env@MessagingTCP{..} = liftIO do
|
||||||
modifyTVar _tcpPeerSocket (HM.filterWithKey (\k _ -> HM.member k pips))
|
modifyTVar _tcpPeerSocket (HM.filterWithKey (\k _ -> HM.member k pips))
|
||||||
modifyTVar _tcpPeerCookie (HM.filter (>=1))
|
modifyTVar _tcpPeerCookie (HM.filter (>=1))
|
||||||
|
|
||||||
waitAnyCatchCancel [p1,p2,probes,sweep]
|
waitAnyCatchCancel [p1,p2,probes,sweep,sweepCookies]
|
||||||
|
|
||||||
where
|
where
|
||||||
|
|
||||||
|
@ -265,11 +275,12 @@ runMessagingTCP env@MessagingTCP{..} = liftIO do
|
||||||
n -> Just (pred n)
|
n -> Just (pred n)
|
||||||
|
|
||||||
-- useCookie :: forall m . (?env :: MessagingTCP, MonadIO m) => Word32 -> m Bool
|
-- useCookie :: forall m . (?env :: MessagingTCP, MonadIO m) => Word32 -> m Bool
|
||||||
useCookie cookie = atomically do
|
useCookie peer cookie = atomically do
|
||||||
let MessagingTCP{..} = ?env
|
let MessagingTCP{..} = ?env
|
||||||
n <- readTVar _tcpPeerCookie <&> HM.member cookie
|
n <- readTVar _tcpPeerCookie <&> HM.member cookie
|
||||||
unless n do
|
unless n do
|
||||||
modifyTVar _tcpPeerCookie (HM.insertWith (+) cookie 1)
|
modifyTVar _tcpPeerCookie (HM.insertWith (+) cookie 1)
|
||||||
|
modifyTVar _tcpPeerToCookie (HM.insert peer cookie)
|
||||||
pure n
|
pure n
|
||||||
|
|
||||||
-- FIXME: timeout-hardcode
|
-- FIXME: timeout-hardcode
|
||||||
|
@ -299,18 +310,18 @@ runMessagingTCP env@MessagingTCP{..} = liftIO do
|
||||||
|
|
||||||
let ?env = env
|
let ?env = env
|
||||||
|
|
||||||
|
let newP = fromSockAddr @'TCP remote :: Peer L4Proto
|
||||||
|
|
||||||
cookie <- handshake Server env so
|
cookie <- handshake Server env so
|
||||||
|
|
||||||
when (cookie == myCookie) $ exit ()
|
when (cookie == myCookie) $ exit ()
|
||||||
|
|
||||||
here <- useCookie cookie
|
here <- useCookie newP cookie
|
||||||
|
|
||||||
when here $ do
|
when here $ do
|
||||||
debug $ "SERVER : ALREADY CONNECTED" <+> pretty cookie <+> viaShow so
|
debug $ "SERVER : ALREADY CONNECTED" <+> pretty cookie <+> viaShow remote
|
||||||
exit ()
|
exit ()
|
||||||
|
|
||||||
let newP = fromSockAddr @'TCP remote :: Peer L4Proto
|
|
||||||
|
|
||||||
atomically $ modifyTVar _tcpServerThreadsCount succ
|
atomically $ modifyTVar _tcpServerThreadsCount succ
|
||||||
|
|
||||||
newOutQ <- do
|
newOutQ <- do
|
||||||
|
@ -341,8 +352,11 @@ runMessagingTCP env@MessagingTCP{..} = liftIO do
|
||||||
|
|
||||||
void $ ContT $ bracket none $ const do
|
void $ ContT $ bracket none $ const do
|
||||||
debug $ "SHUTDOWN SOCKET AND SHIT" <+> pretty remote
|
debug $ "SHUTDOWN SOCKET AND SHIT" <+> pretty remote
|
||||||
atomically $ modifyTVar _tcpServerThreadsCount pred
|
atomically do
|
||||||
atomically $ modifyTVar _tcpPeerSocket (HM.delete newP)
|
modifyTVar _tcpServerThreadsCount pred
|
||||||
|
modifyTVar _tcpPeerSocket (HM.delete newP)
|
||||||
|
modifyTVar _tcpPeerToCookie (HM.delete newP)
|
||||||
|
|
||||||
shutdown so ShutdownBoth
|
shutdown so ShutdownBoth
|
||||||
cancel rd
|
cancel rd
|
||||||
cancel wr
|
cancel wr
|
||||||
|
@ -404,7 +418,7 @@ runMessagingTCP env@MessagingTCP{..} = liftIO do
|
||||||
debug $ "same peer, exit" <+> pretty remoteAddr
|
debug $ "same peer, exit" <+> pretty remoteAddr
|
||||||
exit ()
|
exit ()
|
||||||
|
|
||||||
here <- useCookie cookie
|
here <- useCookie who cookie
|
||||||
|
|
||||||
-- TODO: handshake notification
|
-- TODO: handshake notification
|
||||||
liftIO $ _tcpOnClientStarted whoAddr connId
|
liftIO $ _tcpOnClientStarted whoAddr connId
|
||||||
|
@ -443,6 +457,7 @@ runMessagingTCP env@MessagingTCP{..} = liftIO do
|
||||||
atomically do
|
atomically do
|
||||||
modifyTVar _tcpPeerConn (HM.delete who)
|
modifyTVar _tcpPeerConn (HM.delete who)
|
||||||
modifyTVar _tcpPeerCookie (HM.update killCookie cookie)
|
modifyTVar _tcpPeerCookie (HM.update killCookie cookie)
|
||||||
|
modifyTVar _tcpPeerToCookie (HM.delete who)
|
||||||
modifyTVar _tcpPeerSocket (HM.delete who)
|
modifyTVar _tcpPeerSocket (HM.delete who)
|
||||||
|
|
||||||
void $ ContT $ bracket none (const $ cancel wr)
|
void $ ContT $ bracket none (const $ cancel wr)
|
||||||
|
|
Loading…
Reference in New Issue