mirror of https://github.com/voidlizard/hbs2
wip, refactored
This commit is contained in:
parent
a8a58be27e
commit
b8abb8bddd
|
@ -1,4 +1,5 @@
|
||||||
{-# Language TemplateHaskell #-}
|
{-# Language TemplateHaskell #-}
|
||||||
|
{-# LANGUAGE ImplicitParams #-}
|
||||||
module HBS2.Net.Messaging.TCP
|
module HBS2.Net.Messaging.TCP
|
||||||
( MessagingTCP
|
( MessagingTCP
|
||||||
, runMessagingTCP
|
, runMessagingTCP
|
||||||
|
@ -19,17 +20,14 @@ import HBS2.Prelude.Plated
|
||||||
import HBS2.Net.Messaging.Stream
|
import HBS2.Net.Messaging.Stream
|
||||||
|
|
||||||
import HBS2.System.Logger.Simple
|
import HBS2.System.Logger.Simple
|
||||||
import HBS2.Misc.PrettyStuff
|
|
||||||
|
|
||||||
import Control.Concurrent.STM (flushTQueue,retry)
|
|
||||||
import Control.Monad.Trans.Maybe
|
import Control.Monad.Trans.Maybe
|
||||||
import Data.Bits
|
import Data.Bits
|
||||||
import Data.ByteString.Lazy (ByteString)
|
import Data.ByteString.Lazy (ByteString)
|
||||||
import Data.ByteString.Lazy qualified as LBS
|
import Data.ByteString.Lazy qualified as LBS
|
||||||
import Data.Function
|
|
||||||
import Data.HashMap.Strict (HashMap)
|
import Data.HashMap.Strict (HashMap)
|
||||||
import Data.HashMap.Strict qualified as HM
|
import Data.HashMap.Strict qualified as HM
|
||||||
import Data.List qualified as L
|
import Data.HashSet qualified as HS
|
||||||
import Data.Maybe
|
import Data.Maybe
|
||||||
import Data.Word
|
import Data.Word
|
||||||
import Lens.Micro.Platform
|
import Lens.Micro.Platform
|
||||||
|
@ -64,12 +62,27 @@ data MessagingTCP =
|
||||||
, _tcpConnDemand :: TQueue (Peer L4Proto)
|
, _tcpConnDemand :: TQueue (Peer L4Proto)
|
||||||
, _tcpReceived :: TBQueue (Peer L4Proto, ByteString)
|
, _tcpReceived :: TBQueue (Peer L4Proto, ByteString)
|
||||||
, _tcpSent :: TVar (HashMap (Peer L4Proto) (TBQueue ByteString))
|
, _tcpSent :: TVar (HashMap (Peer L4Proto) (TBQueue ByteString))
|
||||||
|
, _tcpClientThreadNum :: TVar Int
|
||||||
|
, _tcpClientThreads :: TVar (HashMap Int (Async ()))
|
||||||
, _tcpProbe :: TVar AnyProbe
|
, _tcpProbe :: TVar AnyProbe
|
||||||
, _tcpOnClientStarted :: PeerAddr L4Proto -> Word64 -> IO () -- ^ Cient TCP connection succeed
|
, _tcpOnClientStarted :: PeerAddr L4Proto -> Word64 -> IO () -- ^ Cient TCP connection succeed
|
||||||
}
|
}
|
||||||
|
|
||||||
makeLenses 'MessagingTCP
|
makeLenses 'MessagingTCP
|
||||||
|
|
||||||
|
|
||||||
|
newClientThread :: forall m . MonadUnliftIO m => MessagingTCP -> m () -> m Int
|
||||||
|
newClientThread MessagingTCP{..} a = do
|
||||||
|
as <- async a
|
||||||
|
atomically do
|
||||||
|
n <- stateTVar _tcpClientThreadNum ( \x -> (x, succ x))
|
||||||
|
modifyTVar _tcpClientThreads (HM.insert n as)
|
||||||
|
pure n
|
||||||
|
|
||||||
|
delClientThread :: MonadIO m => MessagingTCP -> Int -> m ()
|
||||||
|
delClientThread MessagingTCP{..} threadId = atomically $
|
||||||
|
modifyTVar' _tcpClientThreads (HM.delete threadId)
|
||||||
|
|
||||||
messagingTCPSetProbe :: MonadIO m => MessagingTCP -> AnyProbe -> m ()
|
messagingTCPSetProbe :: MonadIO m => MessagingTCP -> AnyProbe -> m ()
|
||||||
messagingTCPSetProbe MessagingTCP{..} p = atomically $ writeTVar _tcpProbe p
|
messagingTCPSetProbe MessagingTCP{..} p = atomically $ writeTVar _tcpProbe p
|
||||||
|
|
||||||
|
@ -88,6 +101,8 @@ newMessagingTCP pa = liftIO do
|
||||||
<*> newTQueueIO
|
<*> newTQueueIO
|
||||||
<*> newTBQueueIO outMessageQLen
|
<*> newTBQueueIO outMessageQLen
|
||||||
<*> newTVarIO mempty
|
<*> newTVarIO mempty
|
||||||
|
<*> newTVarIO 0
|
||||||
|
<*> newTVarIO mempty
|
||||||
<*> newTVarIO (AnyProbe ())
|
<*> newTVarIO (AnyProbe ())
|
||||||
<*> pure (\_ _ -> none) -- do nothing by default
|
<*> pure (\_ _ -> none) -- do nothing by default
|
||||||
|
|
||||||
|
@ -183,6 +198,14 @@ killCookie = \case
|
||||||
1 -> Nothing
|
1 -> Nothing
|
||||||
n -> Just (pred n)
|
n -> Just (pred n)
|
||||||
|
|
||||||
|
useCookie :: (?env :: MessagingTCP, MonadIO m) => Word32 -> m Bool
|
||||||
|
useCookie cookie = atomically do
|
||||||
|
let MessagingTCP{..} = ?env
|
||||||
|
n <- readTVar _tcpPeerCookie <&> HM.member cookie
|
||||||
|
unless n do
|
||||||
|
modifyTVar _tcpPeerCookie (HM.insertWith (+) cookie 1)
|
||||||
|
pure n
|
||||||
|
|
||||||
runMessagingTCP :: forall m . MonadIO m => MessagingTCP -> m ()
|
runMessagingTCP :: forall m . MonadIO m => MessagingTCP -> m ()
|
||||||
runMessagingTCP env@MessagingTCP{..} = liftIO do
|
runMessagingTCP env@MessagingTCP{..} = liftIO do
|
||||||
|
|
||||||
|
@ -193,13 +216,26 @@ runMessagingTCP env@MessagingTCP{..} = liftIO do
|
||||||
p1 <- ContT $ withAsync runClient
|
p1 <- ContT $ withAsync runClient
|
||||||
p2 <- ContT $ withAsync runServer
|
p2 <- ContT $ withAsync runServer
|
||||||
|
|
||||||
waitAnyCatchCancel [p1,p2]
|
probes <- ContT $ withAsync $ forever do
|
||||||
-- waitAnyCatchCancel [p2]
|
pause @'Seconds 10
|
||||||
-- waitAnyCatchCancel [p1]
|
p <- readTVarIO _tcpProbe
|
||||||
|
acceptReport p =<< S.toList_ do
|
||||||
|
S.yield =<< ( readTVarIO _tcpClientThreads <&> ("tcpClientThreads",) . fromIntegral . HM.size )
|
||||||
|
S.yield =<< ( readTVarIO _tcpPeerConn <&> ("tcpPeerConn",) . fromIntegral . HM.size)
|
||||||
|
S.yield =<< ( readTVarIO _tcpPeerCookie <&> ("tcpPeerCookie",) . fromIntegral . HM.size)
|
||||||
|
S.yield =<< ( readTVarIO _tcpSent <&> ("tcpSent",) . fromIntegral . HM.size)
|
||||||
|
|
||||||
|
waitAnyCatchCancel [p1,p2,probes]
|
||||||
|
|
||||||
where
|
where
|
||||||
|
|
||||||
runServer :: forall m . MonadIO m => m ()
|
readFrames so peer queue = forever $ do
|
||||||
|
void $ readFromSocket so 4 <&> LBS.toStrict
|
||||||
|
ssize <- readFromSocket so 4 <&> LBS.toStrict
|
||||||
|
let size = word32 ssize & fromIntegral
|
||||||
|
bs <- readFromSocket so size
|
||||||
|
atomically $ writeTBQueueDropSTM outMessageQLen queue (peer, bs)
|
||||||
|
|
||||||
runServer = do
|
runServer = do
|
||||||
|
|
||||||
own <- toPeerAddr $ view tcpOwnPeer env
|
own <- toPeerAddr $ view tcpOwnPeer env
|
||||||
|
@ -218,17 +254,13 @@ runMessagingTCP env@MessagingTCP{..} = liftIO do
|
||||||
<+> brackets (pretty own)
|
<+> brackets (pretty own)
|
||||||
<+> brackets (pretty sa)
|
<+> brackets (pretty sa)
|
||||||
|
|
||||||
|
let ?env = env
|
||||||
|
|
||||||
cookie <- handshake Server env so
|
cookie <- handshake Server env so
|
||||||
|
|
||||||
when (cookie == myCookie) $ exit ()
|
when (cookie == myCookie) $ exit ()
|
||||||
|
|
||||||
here <- atomically do
|
here <- useCookie cookie
|
||||||
n <- readTVar _tcpPeerCookie <&> HM.member cookie
|
|
||||||
|
|
||||||
unless n do
|
|
||||||
modifyTVar _tcpPeerCookie (HM.insertWith (+) cookie 1)
|
|
||||||
|
|
||||||
pure n
|
|
||||||
|
|
||||||
when here $ do
|
when here $ do
|
||||||
debug $ "SERVER : ALREADY CONNECTED" <+> pretty cookie <+> viaShow so
|
debug $ "SERVER : ALREADY CONNECTED" <+> pretty cookie <+> viaShow so
|
||||||
|
@ -236,11 +268,6 @@ runMessagingTCP env@MessagingTCP{..} = liftIO do
|
||||||
|
|
||||||
let newP = fromSockAddr @'TCP remote :: Peer L4Proto
|
let newP = fromSockAddr @'TCP remote :: Peer L4Proto
|
||||||
|
|
||||||
-- FIXME: queue-size-hardcode
|
|
||||||
let inQLen = outMessageQLen
|
|
||||||
|
|
||||||
newInQ <- liftIO $ newTBQueueIO inQLen
|
|
||||||
|
|
||||||
newOutQ <- do
|
newOutQ <- do
|
||||||
atomically do
|
atomically do
|
||||||
mbQ <- readTVar _tcpSent <&> HM.lookup newP
|
mbQ <- readTVar _tcpSent <&> HM.lookup newP
|
||||||
|
@ -264,18 +291,7 @@ runMessagingTCP env@MessagingTCP{..} = liftIO do
|
||||||
|
|
||||||
sendLazy so frame --(LBS.toStrict frame)
|
sendLazy so frame --(LBS.toStrict frame)
|
||||||
|
|
||||||
rd <- ContT $ withAsync $ forever do
|
rd <- ContT $ withAsync $ readFrames so newP _tcpReceived
|
||||||
|
|
||||||
spx <- readFromSocket so 4 <&> LBS.toStrict
|
|
||||||
ssize <- readFromSocket so 4 <&> LBS.toStrict --- УУУ, фреейминг
|
|
||||||
let px = word32 spx -- & fromIntegral
|
|
||||||
let size = word32 ssize & fromIntegral
|
|
||||||
|
|
||||||
bs <- readFromSocket so size
|
|
||||||
|
|
||||||
-- debug $ "READ SHIT FROM SOCKET!" <+> pretty remote
|
|
||||||
|
|
||||||
atomically $ writeTBQueueDropSTM outMessageQLen _tcpReceived (newP, bs)
|
|
||||||
|
|
||||||
void $ ContT $ bracket none $ const do
|
void $ ContT $ bracket none $ const do
|
||||||
debug $ "SHUTDOWN SOCKET AND SHIT" <+> pretty remote
|
debug $ "SHUTDOWN SOCKET AND SHIT" <+> pretty remote
|
||||||
|
@ -285,21 +301,32 @@ runMessagingTCP env@MessagingTCP{..} = liftIO do
|
||||||
|
|
||||||
atomically do
|
atomically do
|
||||||
modifyTVar _tcpSent (HM.delete newP)
|
modifyTVar _tcpSent (HM.delete newP)
|
||||||
modifyTVar _tcpPeerCookie $ \m -> do
|
modifyTVar _tcpPeerCookie (HM.update killCookie cookie)
|
||||||
HM.update killCookie cookie m
|
|
||||||
|
|
||||||
void $ waitAnyCatchCancel [rd,wr]
|
void $ waitAnyCatchCancel [rd,wr]
|
||||||
|
|
||||||
|
|
||||||
runClient :: forall m . MonadIO m => m ()
|
runClient = flip runContT pure do
|
||||||
runClient = do
|
|
||||||
|
|
||||||
own <- toPeerAddr $ view tcpOwnPeer env
|
own <- toPeerAddr $ view tcpOwnPeer env
|
||||||
let (L4Address _ (IPAddrPort (i,p))) = own
|
let (L4Address _ (IPAddrPort (i,_))) = own
|
||||||
let myCookie = view tcpCookie env
|
let myCookie = view tcpCookie env
|
||||||
|
|
||||||
pause @'Seconds 10
|
pause @'Seconds 10
|
||||||
|
|
||||||
|
void $ ContT $ bracket none $ const $ do
|
||||||
|
what <- atomically $ stateTVar _tcpClientThreads (\x -> (HM.elems x, mempty))
|
||||||
|
mapM_ cancel what
|
||||||
|
|
||||||
|
void $ ContT $ withAsync $ forever do
|
||||||
|
pause @Seconds 60
|
||||||
|
|
||||||
|
done <- readTVarIO _tcpClientThreads <&> HM.toList
|
||||||
|
>>= filterM ( \(_,x) -> isJust <$> poll x )
|
||||||
|
<&> HS.fromList . fmap fst
|
||||||
|
|
||||||
|
atomically $ modifyTVar _tcpClientThreads (HM.filterWithKey (\k _ -> not (HS.member k done)))
|
||||||
|
|
||||||
forever $ void $ runMaybeT do
|
forever $ void $ runMaybeT do
|
||||||
-- client sockets
|
-- client sockets
|
||||||
|
|
||||||
|
@ -313,10 +340,12 @@ runMessagingTCP env@MessagingTCP{..} = liftIO do
|
||||||
debug "SHIT? BUSYLOOP?"
|
debug "SHIT? BUSYLOOP?"
|
||||||
mzero
|
mzero
|
||||||
|
|
||||||
-- FIXME: !!!
|
liftIO $ newClientThread env $ do
|
||||||
liftIO $ async do
|
|
||||||
let (L4Address _ (IPAddrPort (ip,port))) = whoAddr
|
let (L4Address _ (IPAddrPort (ip,port))) = whoAddr
|
||||||
connect (show ip) (show port) $ \(so, remoteAddr) -> do
|
connect (show ip) (show port) $ \(so, remoteAddr) -> do
|
||||||
|
|
||||||
|
let ?env = env
|
||||||
|
|
||||||
flip runContT pure $ callCC \exit -> do
|
flip runContT pure $ callCC \exit -> do
|
||||||
|
|
||||||
debug $ "OPEN CLIENT CONNECTION" <+> pretty ip <+> pretty port <+> pretty remoteAddr
|
debug $ "OPEN CLIENT CONNECTION" <+> pretty ip <+> pretty port <+> pretty remoteAddr
|
||||||
|
@ -327,15 +356,7 @@ runMessagingTCP env@MessagingTCP{..} = liftIO do
|
||||||
debug $ "same peer, exit" <+> pretty remoteAddr
|
debug $ "same peer, exit" <+> pretty remoteAddr
|
||||||
exit ()
|
exit ()
|
||||||
|
|
||||||
here <- atomically do
|
here <- useCookie cookie
|
||||||
n <- readTVar _tcpPeerCookie <&> HM.member cookie
|
|
||||||
|
|
||||||
unless n do
|
|
||||||
modifyTVar _tcpPeerCookie (HM.insertWith (+) cookie 1)
|
|
||||||
|
|
||||||
modifyTVar _tcpPeerConn (HM.insert who connId)
|
|
||||||
|
|
||||||
pure n
|
|
||||||
|
|
||||||
-- TODO: handshake notification
|
-- TODO: handshake notification
|
||||||
liftIO $ _tcpOnClientStarted whoAddr connId
|
liftIO $ _tcpOnClientStarted whoAddr connId
|
||||||
|
@ -371,20 +392,7 @@ runMessagingTCP env@MessagingTCP{..} = liftIO do
|
||||||
void $ ContT $ bracket none $ const $ do
|
void $ ContT $ bracket none $ const $ do
|
||||||
atomically do
|
atomically do
|
||||||
modifyTVar _tcpPeerConn (HM.delete who)
|
modifyTVar _tcpPeerConn (HM.delete who)
|
||||||
modifyTVar _tcpPeerCookie $ \m -> do
|
modifyTVar _tcpPeerCookie (HM.update killCookie cookie)
|
||||||
HM.update killCookie cookie m
|
|
||||||
|
|
||||||
forever do
|
|
||||||
|
|
||||||
spx <- readFromSocket so 4 <&> LBS.toStrict
|
|
||||||
ssize <- readFromSocket so 4 <&> LBS.toStrict --- УУУ, фреейминг
|
|
||||||
let px = word32 spx -- & fromIntegral
|
|
||||||
let size = word32 ssize & fromIntegral
|
|
||||||
|
|
||||||
bs <- readFromSocket so size
|
|
||||||
|
|
||||||
-- debug $ "READ SHIT FROM CLIENT SOCKET!" <+> pretty remoteAddr
|
|
||||||
|
|
||||||
atomically $ writeTBQueueDropSTM 10 _tcpReceived (who, bs)
|
|
||||||
|
|
||||||
|
readFrames so who _tcpReceived
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue