mirror of https://github.com/voidlizard/hbs2
wip, refactored
This commit is contained in:
parent
35b91e5a79
commit
e64f3e9c41
|
@ -1,4 +1,5 @@
|
|||
{-# Language TemplateHaskell #-}
|
||||
{-# LANGUAGE ImplicitParams #-}
|
||||
module HBS2.Net.Messaging.TCP
|
||||
( MessagingTCP
|
||||
, runMessagingTCP
|
||||
|
@ -19,17 +20,14 @@ import HBS2.Prelude.Plated
|
|||
import HBS2.Net.Messaging.Stream
|
||||
|
||||
import HBS2.System.Logger.Simple
|
||||
import HBS2.Misc.PrettyStuff
|
||||
|
||||
import Control.Concurrent.STM (flushTQueue,retry)
|
||||
import Control.Monad.Trans.Maybe
|
||||
import Data.Bits
|
||||
import Data.ByteString.Lazy (ByteString)
|
||||
import Data.ByteString.Lazy qualified as LBS
|
||||
import Data.Function
|
||||
import Data.HashMap.Strict (HashMap)
|
||||
import Data.HashMap.Strict qualified as HM
|
||||
import Data.List qualified as L
|
||||
import Data.HashSet qualified as HS
|
||||
import Data.Maybe
|
||||
import Data.Word
|
||||
import Lens.Micro.Platform
|
||||
|
@ -64,12 +62,27 @@ data MessagingTCP =
|
|||
, _tcpConnDemand :: TQueue (Peer L4Proto)
|
||||
, _tcpReceived :: TBQueue (Peer L4Proto, ByteString)
|
||||
, _tcpSent :: TVar (HashMap (Peer L4Proto) (TBQueue ByteString))
|
||||
, _tcpClientThreadNum :: TVar Int
|
||||
, _tcpClientThreads :: TVar (HashMap Int (Async ()))
|
||||
, _tcpProbe :: TVar AnyProbe
|
||||
, _tcpOnClientStarted :: PeerAddr L4Proto -> Word64 -> IO () -- ^ Cient TCP connection succeed
|
||||
}
|
||||
|
||||
makeLenses 'MessagingTCP
|
||||
|
||||
|
||||
newClientThread :: forall m . MonadUnliftIO m => MessagingTCP -> m () -> m Int
|
||||
newClientThread MessagingTCP{..} a = do
|
||||
as <- async a
|
||||
atomically do
|
||||
n <- stateTVar _tcpClientThreadNum ( \x -> (x, succ x))
|
||||
modifyTVar _tcpClientThreads (HM.insert n as)
|
||||
pure n
|
||||
|
||||
delClientThread :: MonadIO m => MessagingTCP -> Int -> m ()
|
||||
delClientThread MessagingTCP{..} threadId = atomically $
|
||||
modifyTVar' _tcpClientThreads (HM.delete threadId)
|
||||
|
||||
messagingTCPSetProbe :: MonadIO m => MessagingTCP -> AnyProbe -> m ()
|
||||
messagingTCPSetProbe MessagingTCP{..} p = atomically $ writeTVar _tcpProbe p
|
||||
|
||||
|
@ -88,6 +101,8 @@ newMessagingTCP pa = liftIO do
|
|||
<*> newTQueueIO
|
||||
<*> newTBQueueIO outMessageQLen
|
||||
<*> newTVarIO mempty
|
||||
<*> newTVarIO 0
|
||||
<*> newTVarIO mempty
|
||||
<*> newTVarIO (AnyProbe ())
|
||||
<*> pure (\_ _ -> none) -- do nothing by default
|
||||
|
||||
|
@ -183,6 +198,14 @@ killCookie = \case
|
|||
1 -> Nothing
|
||||
n -> Just (pred n)
|
||||
|
||||
useCookie :: (?env :: MessagingTCP, MonadIO m) => Word32 -> m Bool
|
||||
useCookie cookie = atomically do
|
||||
let MessagingTCP{..} = ?env
|
||||
n <- readTVar _tcpPeerCookie <&> HM.member cookie
|
||||
unless n do
|
||||
modifyTVar _tcpPeerCookie (HM.insertWith (+) cookie 1)
|
||||
pure n
|
||||
|
||||
runMessagingTCP :: forall m . MonadIO m => MessagingTCP -> m ()
|
||||
runMessagingTCP env@MessagingTCP{..} = liftIO do
|
||||
|
||||
|
@ -193,13 +216,26 @@ runMessagingTCP env@MessagingTCP{..} = liftIO do
|
|||
p1 <- ContT $ withAsync runClient
|
||||
p2 <- ContT $ withAsync runServer
|
||||
|
||||
waitAnyCatchCancel [p1,p2]
|
||||
-- waitAnyCatchCancel [p2]
|
||||
-- waitAnyCatchCancel [p1]
|
||||
probes <- ContT $ withAsync $ forever do
|
||||
pause @'Seconds 10
|
||||
p <- readTVarIO _tcpProbe
|
||||
acceptReport p =<< S.toList_ do
|
||||
S.yield =<< ( readTVarIO _tcpClientThreads <&> ("tcpClientThreads",) . fromIntegral . HM.size )
|
||||
S.yield =<< ( readTVarIO _tcpPeerConn <&> ("tcpPeerConn",) . fromIntegral . HM.size)
|
||||
S.yield =<< ( readTVarIO _tcpPeerCookie <&> ("tcpPeerCookie",) . fromIntegral . HM.size)
|
||||
S.yield =<< ( readTVarIO _tcpSent <&> ("tcpSent",) . fromIntegral . HM.size)
|
||||
|
||||
waitAnyCatchCancel [p1,p2,probes]
|
||||
|
||||
where
|
||||
|
||||
runServer :: forall m . MonadIO m => m ()
|
||||
readFrames so peer queue = forever $ do
|
||||
void $ readFromSocket so 4 <&> LBS.toStrict
|
||||
ssize <- readFromSocket so 4 <&> LBS.toStrict
|
||||
let size = word32 ssize & fromIntegral
|
||||
bs <- readFromSocket so size
|
||||
atomically $ writeTBQueueDropSTM outMessageQLen queue (peer, bs)
|
||||
|
||||
runServer = do
|
||||
|
||||
own <- toPeerAddr $ view tcpOwnPeer env
|
||||
|
@ -218,17 +254,13 @@ runMessagingTCP env@MessagingTCP{..} = liftIO do
|
|||
<+> brackets (pretty own)
|
||||
<+> brackets (pretty sa)
|
||||
|
||||
let ?env = env
|
||||
|
||||
cookie <- handshake Server env so
|
||||
|
||||
when (cookie == myCookie) $ exit ()
|
||||
|
||||
here <- atomically do
|
||||
n <- readTVar _tcpPeerCookie <&> HM.member cookie
|
||||
|
||||
unless n do
|
||||
modifyTVar _tcpPeerCookie (HM.insertWith (+) cookie 1)
|
||||
|
||||
pure n
|
||||
here <- useCookie cookie
|
||||
|
||||
when here $ do
|
||||
debug $ "SERVER : ALREADY CONNECTED" <+> pretty cookie <+> viaShow so
|
||||
|
@ -236,11 +268,6 @@ runMessagingTCP env@MessagingTCP{..} = liftIO do
|
|||
|
||||
let newP = fromSockAddr @'TCP remote :: Peer L4Proto
|
||||
|
||||
-- FIXME: queue-size-hardcode
|
||||
let inQLen = outMessageQLen
|
||||
|
||||
newInQ <- liftIO $ newTBQueueIO inQLen
|
||||
|
||||
newOutQ <- do
|
||||
atomically do
|
||||
mbQ <- readTVar _tcpSent <&> HM.lookup newP
|
||||
|
@ -264,18 +291,7 @@ runMessagingTCP env@MessagingTCP{..} = liftIO do
|
|||
|
||||
sendLazy so frame --(LBS.toStrict frame)
|
||||
|
||||
rd <- ContT $ withAsync $ forever do
|
||||
|
||||
spx <- readFromSocket so 4 <&> LBS.toStrict
|
||||
ssize <- readFromSocket so 4 <&> LBS.toStrict --- УУУ, фреейминг
|
||||
let px = word32 spx -- & fromIntegral
|
||||
let size = word32 ssize & fromIntegral
|
||||
|
||||
bs <- readFromSocket so size
|
||||
|
||||
-- debug $ "READ SHIT FROM SOCKET!" <+> pretty remote
|
||||
|
||||
atomically $ writeTBQueueDropSTM outMessageQLen _tcpReceived (newP, bs)
|
||||
rd <- ContT $ withAsync $ readFrames so newP _tcpReceived
|
||||
|
||||
void $ ContT $ bracket none $ const do
|
||||
debug $ "SHUTDOWN SOCKET AND SHIT" <+> pretty remote
|
||||
|
@ -285,21 +301,32 @@ runMessagingTCP env@MessagingTCP{..} = liftIO do
|
|||
|
||||
atomically do
|
||||
modifyTVar _tcpSent (HM.delete newP)
|
||||
modifyTVar _tcpPeerCookie $ \m -> do
|
||||
HM.update killCookie cookie m
|
||||
modifyTVar _tcpPeerCookie (HM.update killCookie cookie)
|
||||
|
||||
void $ waitAnyCatchCancel [rd,wr]
|
||||
|
||||
|
||||
runClient :: forall m . MonadIO m => m ()
|
||||
runClient = do
|
||||
runClient = flip runContT pure do
|
||||
|
||||
own <- toPeerAddr $ view tcpOwnPeer env
|
||||
let (L4Address _ (IPAddrPort (i,p))) = own
|
||||
let (L4Address _ (IPAddrPort (i,_))) = own
|
||||
let myCookie = view tcpCookie env
|
||||
|
||||
pause @'Seconds 10
|
||||
|
||||
void $ ContT $ bracket none $ const $ do
|
||||
what <- atomically $ stateTVar _tcpClientThreads (\x -> (HM.elems x, mempty))
|
||||
mapM_ cancel what
|
||||
|
||||
void $ ContT $ withAsync $ forever do
|
||||
pause @Seconds 60
|
||||
|
||||
done <- readTVarIO _tcpClientThreads <&> HM.toList
|
||||
>>= filterM ( \(_,x) -> isJust <$> poll x )
|
||||
<&> HS.fromList . fmap fst
|
||||
|
||||
atomically $ modifyTVar _tcpClientThreads (HM.filterWithKey (\k _ -> not (HS.member k done)))
|
||||
|
||||
forever $ void $ runMaybeT do
|
||||
-- client sockets
|
||||
|
||||
|
@ -313,10 +340,12 @@ runMessagingTCP env@MessagingTCP{..} = liftIO do
|
|||
debug "SHIT? BUSYLOOP?"
|
||||
mzero
|
||||
|
||||
-- FIXME: !!!
|
||||
liftIO $ async do
|
||||
liftIO $ newClientThread env $ do
|
||||
let (L4Address _ (IPAddrPort (ip,port))) = whoAddr
|
||||
connect (show ip) (show port) $ \(so, remoteAddr) -> do
|
||||
|
||||
let ?env = env
|
||||
|
||||
flip runContT pure $ callCC \exit -> do
|
||||
|
||||
debug $ "OPEN CLIENT CONNECTION" <+> pretty ip <+> pretty port <+> pretty remoteAddr
|
||||
|
@ -327,15 +356,7 @@ runMessagingTCP env@MessagingTCP{..} = liftIO do
|
|||
debug $ "same peer, exit" <+> pretty remoteAddr
|
||||
exit ()
|
||||
|
||||
here <- atomically do
|
||||
n <- readTVar _tcpPeerCookie <&> HM.member cookie
|
||||
|
||||
unless n do
|
||||
modifyTVar _tcpPeerCookie (HM.insertWith (+) cookie 1)
|
||||
|
||||
modifyTVar _tcpPeerConn (HM.insert who connId)
|
||||
|
||||
pure n
|
||||
here <- useCookie cookie
|
||||
|
||||
-- TODO: handshake notification
|
||||
liftIO $ _tcpOnClientStarted whoAddr connId
|
||||
|
@ -371,20 +392,7 @@ runMessagingTCP env@MessagingTCP{..} = liftIO do
|
|||
void $ ContT $ bracket none $ const $ do
|
||||
atomically do
|
||||
modifyTVar _tcpPeerConn (HM.delete who)
|
||||
modifyTVar _tcpPeerCookie $ \m -> do
|
||||
HM.update killCookie cookie m
|
||||
|
||||
forever do
|
||||
|
||||
spx <- readFromSocket so 4 <&> LBS.toStrict
|
||||
ssize <- readFromSocket so 4 <&> LBS.toStrict --- УУУ, фреейминг
|
||||
let px = word32 spx -- & fromIntegral
|
||||
let size = word32 ssize & fromIntegral
|
||||
|
||||
bs <- readFromSocket so size
|
||||
|
||||
-- debug $ "READ SHIT FROM CLIENT SOCKET!" <+> pretty remoteAddr
|
||||
|
||||
atomically $ writeTBQueueDropSTM 10 _tcpReceived (who, bs)
|
||||
modifyTVar _tcpPeerCookie (HM.update killCookie cookie)
|
||||
|
||||
readFrames so who _tcpReceived
|
||||
|
||||
|
|
Loading…
Reference in New Issue