wip, refactored

This commit is contained in:
voidlizard 2024-11-03 08:20:14 +03:00
parent 133ddd5df1
commit 56777747a0
1 changed files with 71 additions and 63 deletions

View File

@ -1,4 +1,5 @@
{-# Language TemplateHaskell #-} {-# Language TemplateHaskell #-}
{-# LANGUAGE ImplicitParams #-}
module HBS2.Net.Messaging.TCP module HBS2.Net.Messaging.TCP
( MessagingTCP ( MessagingTCP
, runMessagingTCP , runMessagingTCP
@ -19,17 +20,14 @@ import HBS2.Prelude.Plated
import HBS2.Net.Messaging.Stream import HBS2.Net.Messaging.Stream
import HBS2.System.Logger.Simple import HBS2.System.Logger.Simple
import HBS2.Misc.PrettyStuff
import Control.Concurrent.STM (flushTQueue,retry)
import Control.Monad.Trans.Maybe import Control.Monad.Trans.Maybe
import Data.Bits import Data.Bits
import Data.ByteString.Lazy (ByteString) import Data.ByteString.Lazy (ByteString)
import Data.ByteString.Lazy qualified as LBS import Data.ByteString.Lazy qualified as LBS
import Data.Function
import Data.HashMap.Strict (HashMap) import Data.HashMap.Strict (HashMap)
import Data.HashMap.Strict qualified as HM import Data.HashMap.Strict qualified as HM
import Data.List qualified as L import Data.HashSet qualified as HS
import Data.Maybe import Data.Maybe
import Data.Word import Data.Word
import Lens.Micro.Platform import Lens.Micro.Platform
@ -64,12 +62,27 @@ data MessagingTCP =
, _tcpConnDemand :: TQueue (Peer L4Proto) , _tcpConnDemand :: TQueue (Peer L4Proto)
, _tcpReceived :: TBQueue (Peer L4Proto, ByteString) , _tcpReceived :: TBQueue (Peer L4Proto, ByteString)
, _tcpSent :: TVar (HashMap (Peer L4Proto) (TBQueue ByteString)) , _tcpSent :: TVar (HashMap (Peer L4Proto) (TBQueue ByteString))
, _tcpClientThreadNum :: TVar Int
, _tcpClientThreads :: TVar (HashMap Int (Async ()))
, _tcpProbe :: TVar AnyProbe , _tcpProbe :: TVar AnyProbe
, _tcpOnClientStarted :: PeerAddr L4Proto -> Word64 -> IO () -- ^ Cient TCP connection succeed , _tcpOnClientStarted :: PeerAddr L4Proto -> Word64 -> IO () -- ^ Cient TCP connection succeed
} }
makeLenses 'MessagingTCP makeLenses 'MessagingTCP
newClientThread :: forall m . MonadUnliftIO m => MessagingTCP -> m () -> m Int
newClientThread MessagingTCP{..} a = do
as <- async a
atomically do
n <- stateTVar _tcpClientThreadNum ( \x -> (x, succ x))
modifyTVar _tcpClientThreads (HM.insert n as)
pure n
delClientThread :: MonadIO m => MessagingTCP -> Int -> m ()
delClientThread MessagingTCP{..} threadId = atomically $
modifyTVar' _tcpClientThreads (HM.delete threadId)
messagingTCPSetProbe :: MonadIO m => MessagingTCP -> AnyProbe -> m () messagingTCPSetProbe :: MonadIO m => MessagingTCP -> AnyProbe -> m ()
messagingTCPSetProbe MessagingTCP{..} p = atomically $ writeTVar _tcpProbe p messagingTCPSetProbe MessagingTCP{..} p = atomically $ writeTVar _tcpProbe p
@ -88,6 +101,8 @@ newMessagingTCP pa = liftIO do
<*> newTQueueIO <*> newTQueueIO
<*> newTBQueueIO outMessageQLen <*> newTBQueueIO outMessageQLen
<*> newTVarIO mempty <*> newTVarIO mempty
<*> newTVarIO 0
<*> newTVarIO mempty
<*> newTVarIO (AnyProbe ()) <*> newTVarIO (AnyProbe ())
<*> pure (\_ _ -> none) -- do nothing by default <*> pure (\_ _ -> none) -- do nothing by default
@ -183,6 +198,14 @@ killCookie = \case
1 -> Nothing 1 -> Nothing
n -> Just (pred n) n -> Just (pred n)
useCookie :: (?env :: MessagingTCP, MonadIO m) => Word32 -> m Bool
useCookie cookie = atomically do
let MessagingTCP{..} = ?env
n <- readTVar _tcpPeerCookie <&> HM.member cookie
unless n do
modifyTVar _tcpPeerCookie (HM.insertWith (+) cookie 1)
pure n
runMessagingTCP :: forall m . MonadIO m => MessagingTCP -> m () runMessagingTCP :: forall m . MonadIO m => MessagingTCP -> m ()
runMessagingTCP env@MessagingTCP{..} = liftIO do runMessagingTCP env@MessagingTCP{..} = liftIO do
@ -193,13 +216,26 @@ runMessagingTCP env@MessagingTCP{..} = liftIO do
p1 <- ContT $ withAsync runClient p1 <- ContT $ withAsync runClient
p2 <- ContT $ withAsync runServer p2 <- ContT $ withAsync runServer
waitAnyCatchCancel [p1,p2] probes <- ContT $ withAsync $ forever do
-- waitAnyCatchCancel [p2] pause @'Seconds 10
-- waitAnyCatchCancel [p1] p <- readTVarIO _tcpProbe
acceptReport p =<< S.toList_ do
S.yield =<< ( readTVarIO _tcpClientThreads <&> ("tcpClientThreads",) . fromIntegral . HM.size )
S.yield =<< ( readTVarIO _tcpPeerConn <&> ("tcpPeerConn",) . fromIntegral . HM.size)
S.yield =<< ( readTVarIO _tcpPeerCookie <&> ("tcpPeerCookie",) . fromIntegral . HM.size)
S.yield =<< ( readTVarIO _tcpSent <&> ("tcpSent",) . fromIntegral . HM.size)
waitAnyCatchCancel [p1,p2,probes]
where where
runServer :: forall m . MonadIO m => m () readFrames so peer queue = forever $ do
void $ readFromSocket so 4 <&> LBS.toStrict
ssize <- readFromSocket so 4 <&> LBS.toStrict
let size = word32 ssize & fromIntegral
bs <- readFromSocket so size
atomically $ writeTBQueueDropSTM outMessageQLen queue (peer, bs)
runServer = do runServer = do
own <- toPeerAddr $ view tcpOwnPeer env own <- toPeerAddr $ view tcpOwnPeer env
@ -218,17 +254,13 @@ runMessagingTCP env@MessagingTCP{..} = liftIO do
<+> brackets (pretty own) <+> brackets (pretty own)
<+> brackets (pretty sa) <+> brackets (pretty sa)
let ?env = env
cookie <- handshake Server env so cookie <- handshake Server env so
when (cookie == myCookie) $ exit () when (cookie == myCookie) $ exit ()
here <- atomically do here <- useCookie cookie
n <- readTVar _tcpPeerCookie <&> HM.member cookie
unless n do
modifyTVar _tcpPeerCookie (HM.insertWith (+) cookie 1)
pure n
when here $ do when here $ do
debug $ "SERVER : ALREADY CONNECTED" <+> pretty cookie <+> viaShow so debug $ "SERVER : ALREADY CONNECTED" <+> pretty cookie <+> viaShow so
@ -236,11 +268,6 @@ runMessagingTCP env@MessagingTCP{..} = liftIO do
let newP = fromSockAddr @'TCP remote :: Peer L4Proto let newP = fromSockAddr @'TCP remote :: Peer L4Proto
-- FIXME: queue-size-hardcode
let inQLen = outMessageQLen
newInQ <- liftIO $ newTBQueueIO inQLen
newOutQ <- do newOutQ <- do
atomically do atomically do
mbQ <- readTVar _tcpSent <&> HM.lookup newP mbQ <- readTVar _tcpSent <&> HM.lookup newP
@ -264,18 +291,7 @@ runMessagingTCP env@MessagingTCP{..} = liftIO do
sendLazy so frame --(LBS.toStrict frame) sendLazy so frame --(LBS.toStrict frame)
rd <- ContT $ withAsync $ forever do rd <- ContT $ withAsync $ readFrames so newP _tcpReceived
spx <- readFromSocket so 4 <&> LBS.toStrict
ssize <- readFromSocket so 4 <&> LBS.toStrict --- УУУ, фреейминг
let px = word32 spx -- & fromIntegral
let size = word32 ssize & fromIntegral
bs <- readFromSocket so size
-- debug $ "READ SHIT FROM SOCKET!" <+> pretty remote
atomically $ writeTBQueueDropSTM outMessageQLen _tcpReceived (newP, bs)
void $ ContT $ bracket none $ const do void $ ContT $ bracket none $ const do
debug $ "SHUTDOWN SOCKET AND SHIT" <+> pretty remote debug $ "SHUTDOWN SOCKET AND SHIT" <+> pretty remote
@ -285,21 +301,32 @@ runMessagingTCP env@MessagingTCP{..} = liftIO do
atomically do atomically do
modifyTVar _tcpSent (HM.delete newP) modifyTVar _tcpSent (HM.delete newP)
modifyTVar _tcpPeerCookie $ \m -> do modifyTVar _tcpPeerCookie (HM.update killCookie cookie)
HM.update killCookie cookie m
void $ waitAnyCatchCancel [rd,wr] void $ waitAnyCatchCancel [rd,wr]
runClient :: forall m . MonadIO m => m () runClient = flip runContT pure do
runClient = do
own <- toPeerAddr $ view tcpOwnPeer env own <- toPeerAddr $ view tcpOwnPeer env
let (L4Address _ (IPAddrPort (i,p))) = own let (L4Address _ (IPAddrPort (i,_))) = own
let myCookie = view tcpCookie env let myCookie = view tcpCookie env
pause @'Seconds 10 pause @'Seconds 10
void $ ContT $ bracket none $ const $ do
what <- atomically $ stateTVar _tcpClientThreads (\x -> (HM.elems x, mempty))
mapM_ cancel what
void $ ContT $ withAsync $ forever do
pause @Seconds 60
done <- readTVarIO _tcpClientThreads <&> HM.toList
>>= filterM ( \(_,x) -> isJust <$> poll x )
<&> HS.fromList . fmap fst
atomically $ modifyTVar _tcpClientThreads (HM.filterWithKey (\k _ -> not (HS.member k done)))
forever $ void $ runMaybeT do forever $ void $ runMaybeT do
-- client sockets -- client sockets
@ -313,10 +340,12 @@ runMessagingTCP env@MessagingTCP{..} = liftIO do
debug "SHIT? BUSYLOOP?" debug "SHIT? BUSYLOOP?"
mzero mzero
-- FIXME: !!! liftIO $ newClientThread env $ do
liftIO $ async do
let (L4Address _ (IPAddrPort (ip,port))) = whoAddr let (L4Address _ (IPAddrPort (ip,port))) = whoAddr
connect (show ip) (show port) $ \(so, remoteAddr) -> do connect (show ip) (show port) $ \(so, remoteAddr) -> do
let ?env = env
flip runContT pure $ callCC \exit -> do flip runContT pure $ callCC \exit -> do
debug $ "OPEN CLIENT CONNECTION" <+> pretty ip <+> pretty port <+> pretty remoteAddr debug $ "OPEN CLIENT CONNECTION" <+> pretty ip <+> pretty port <+> pretty remoteAddr
@ -327,15 +356,7 @@ runMessagingTCP env@MessagingTCP{..} = liftIO do
debug $ "same peer, exit" <+> pretty remoteAddr debug $ "same peer, exit" <+> pretty remoteAddr
exit () exit ()
here <- atomically do here <- useCookie cookie
n <- readTVar _tcpPeerCookie <&> HM.member cookie
unless n do
modifyTVar _tcpPeerCookie (HM.insertWith (+) cookie 1)
modifyTVar _tcpPeerConn (HM.insert who connId)
pure n
-- TODO: handshake notification -- TODO: handshake notification
liftIO $ _tcpOnClientStarted whoAddr connId liftIO $ _tcpOnClientStarted whoAddr connId
@ -371,20 +392,7 @@ runMessagingTCP env@MessagingTCP{..} = liftIO do
void $ ContT $ bracket none $ const $ do void $ ContT $ bracket none $ const $ do
atomically do atomically do
modifyTVar _tcpPeerConn (HM.delete who) modifyTVar _tcpPeerConn (HM.delete who)
modifyTVar _tcpPeerCookie $ \m -> do modifyTVar _tcpPeerCookie (HM.update killCookie cookie)
HM.update killCookie cookie m
forever do
spx <- readFromSocket so 4 <&> LBS.toStrict
ssize <- readFromSocket so 4 <&> LBS.toStrict --- УУУ, фреейминг
let px = word32 spx -- & fromIntegral
let size = word32 ssize & fromIntegral
bs <- readFromSocket so size
-- debug $ "READ SHIT FROM CLIENT SOCKET!" <+> pretty remoteAddr
atomically $ writeTBQueueDropSTM 10 _tcpReceived (who, bs)
readFrames so who _tcpReceived