hbs2/hbs2-core/lib/HBS2/Net/Messaging/TCP.hs

480 lines
16 KiB
Haskell

{-# Language TemplateHaskell #-}
{-# LANGUAGE ImplicitParams #-}
module HBS2.Net.Messaging.TCP
( MessagingTCP
, runMessagingTCP
, newMessagingTCP
, tcpSOCKS5
, tcpOwnPeer
, tcpPeerConn
, tcpCookie
, tcpOnClientStarted
, tcpPeerKick
, messagingTCPSetProbe
) where
import HBS2.Clock
import HBS2.OrDie
import HBS2.Net.IP.Addr
import HBS2.Net.Messaging
import HBS2.Prelude.Plated
import HBS2.Net.Messaging.Stream
import HBS2.System.Logger.Simple
import Control.Monad.Trans.Maybe
import Data.Bits
import Data.ByteString.Lazy (ByteString)
import Data.ByteString.Lazy qualified as LBS
import Data.HashMap.Strict (HashMap)
import Data.HashMap.Strict qualified as HM
import Data.HashPSQ (HashPSQ)
import Data.HashPSQ qualified as HPSQ
import Data.HashSet qualified as HS
import Data.Maybe
import Data.Word
import Lens.Micro.Platform
import Network.ByteOrder hiding (ByteString)
import Network.Simple.TCP
import Network.Socket hiding (listen,connect)
import System.Random hiding (next)
import Control.Monad.Trans.Cont
import Control.Exception
import Control.Concurrent.STM qualified as STM
import UnliftIO (MonadUnliftIO(..))
import UnliftIO.Async
import UnliftIO.STM
import UnliftIO.Exception qualified as U
import Streaming.Prelude qualified as S
{-HLINT ignore "Functor law"-}
-- FIXME: control-recv-capacity-to-avoid-leaks
outMessageQLen :: Natural
outMessageQLen = 1024*32
-- | TCP Messaging environment
data MessagingTCP =
MessagingTCP
{ _tcpSOCKS5 :: Maybe (PeerAddr L4Proto)
, _tcpOwnPeer :: Peer L4Proto
, _tcpCookie :: Word32
, _tcpPeerConn :: TVar (HashMap (Peer L4Proto) Word64)
, _tcpPeerCookie :: TVar (HashMap Word32 Int)
, _tcpPeerToCookie :: TVar (HashMap (Peer L4Proto) Word32)
, _tcpPeerSocket :: TVar (HashMap (Peer L4Proto) Socket)
, _tcpConnDemand :: TQueue (Peer L4Proto)
, _tcpReceived :: TBQueue (Peer L4Proto, ByteString)
, _tcpSent :: TVar (HashPSQ (Peer L4Proto) TimeSpec (TBQueue ByteString))
, _tcpClientThreadNum :: TVar Int
, _tcpClientThreads :: TVar (HashMap Int (Async ()))
, _tcpServerThreadsCount :: TVar Int
, _tcpProbe :: TVar AnyProbe
, _tcpOnClientStarted :: PeerAddr L4Proto -> Word64 -> IO () -- ^ Cient TCP connection succeed
}
makeLenses 'MessagingTCP
newClientThread :: forall m . MonadUnliftIO m => MessagingTCP -> m () -> m Int
newClientThread MessagingTCP{..} a = do
as <- async a
atomically do
n <- stateTVar _tcpClientThreadNum ( \x -> (x, succ x))
modifyTVar _tcpClientThreads (HM.insert n as)
pure n
delClientThread :: MonadIO m => MessagingTCP -> Int -> m ()
delClientThread MessagingTCP{..} threadId = atomically $
modifyTVar' _tcpClientThreads (HM.delete threadId)
messagingTCPSetProbe :: MonadIO m => MessagingTCP -> AnyProbe -> m ()
messagingTCPSetProbe MessagingTCP{..} p = atomically $ writeTVar _tcpProbe p
newMessagingTCP :: ( MonadIO m
, FromSockAddr 'TCP (Peer L4Proto)
)
=> PeerAddr L4Proto
-> m MessagingTCP
newMessagingTCP pa = liftIO do
MessagingTCP Nothing
<$> fromPeerAddr pa
<*> randomIO
<*> newTVarIO mempty
<*> newTVarIO mempty
<*> newTVarIO mempty
<*> newTVarIO mempty
<*> newTQueueIO
<*> newTBQueueIO (10 * outMessageQLen)
<*> newTVarIO HPSQ.empty
<*> newTVarIO 0
<*> newTVarIO mempty
<*> newTVarIO 0
<*> newTVarIO (AnyProbe ())
<*> pure (\_ _ -> none) -- do nothing by default
instance Messaging MessagingTCP L4Proto ByteString where
sendTo MessagingTCP{..} (To p) (From _f) msg = liftIO do
-- let _own = tcpOwnPeer
-- debug $ "!!!! FUCKING SEND TO" <+> pretty p
now <- getTimeCoarse
queue <- atomically do
q' <- readTVar _tcpSent <&> HPSQ.lookup p
case q' of
Nothing -> do
writeTQueue _tcpConnDemand p
q <- newTBQueue outMessageQLen
modifyTVar _tcpSent (HPSQ.insert p now q)
pure q
Just (_,q) -> pure q
atomically $ writeTBQueueDropSTM 10 queue msg
atomically $ stateTVar _tcpSent (HPSQ.alter (\x -> ((), fmap (set _1 now) x)) p)
-- atomically $ insert
-- debug $ "!!!! FUCKING SEND TO" <+> pretty p <+> "DONE"
receive MessagingTCP{..} _ = liftIO do
atomically do
s0 <- readTBQueue _tcpReceived
sx <- flushTBQueue _tcpReceived
pure $ fmap (over _1 From) ( s0 : sx )
connectionId :: Word32 -> Word32 -> Word64
connectionId a b = (fromIntegral hi `shiftL` 32) .|. fromIntegral low
where
low = min a b
hi = max a b
data ConnType = Server | Client
deriving (Eq,Ord,Show,Generic)
sendCookie :: MonadIO m
=> MessagingTCP
-> Socket
-> m ()
sendCookie env so = do
let coo = view tcpCookie env & bytestring32
send so coo
recvCookie :: MonadIO m
=> MessagingTCP
-> Socket
-> m Word32
recvCookie _ so = liftIO do
scoo <- readFromSocket so 4 <&> LBS.toStrict
pure $ word32 scoo
handshake :: MonadIO m
=> ConnType
-> MessagingTCP
-> Socket
-> m Word32
handshake Server env so = do
cookie <- recvCookie env so
sendCookie env so
pure cookie
handshake Client env so = do
sendCookie env so
recvCookie env so
writeTBQueueDropSTM :: Integral n
=> n
-> TBQueue a
-> a
-> STM ()
writeTBQueueDropSTM inQLen newInQ bs = do
flip fix inQLen $ \more j -> do
when (j > 0) do
full <- isFullTBQueue newInQ
if not full then do
writeTBQueue newInQ bs
else do
void $ tryReadTBQueue newInQ
more (pred j)
data TCPMessagingError =
TCPPeerReadTimeout
deriving stock (Show,Typeable)
instance Exception TCPMessagingError
tcpPeerKick :: forall m . MonadIO m => MessagingTCP -> Peer L4Proto -> m ()
tcpPeerKick MessagingTCP{..} p = do
whoever <- readTVarIO _tcpPeerSocket <&> HM.lookup p
for_ whoever $ \so -> do
debug $ "tcpPeerKick" <+> pretty p
liftIO $ shutdown so ShutdownBoth
runMessagingTCP :: forall m . MonadIO m => MessagingTCP -> m ()
runMessagingTCP env@MessagingTCP{..} = liftIO do
void $ flip runContT pure do
forever do
p1 <- ContT $ withAsync runClient
p2 <- ContT $ withAsync runServer
probes <- ContT $ withAsync $ forever do
pause @'Seconds 10
p <- readTVarIO _tcpProbe
acceptReport p =<< S.toList_ do
S.yield =<< ( readTVarIO _tcpClientThreads <&> ("tcpClientThreads",) . fromIntegral . HM.size )
S.yield =<< ( readTVarIO _tcpServerThreadsCount <&> ("tcpServerThreadsCount",) . fromIntegral )
S.yield =<< ( readTVarIO _tcpPeerConn <&> ("tcpPeerConn",) . fromIntegral . HM.size)
coo <- readTVarIO _tcpPeerCookie -- <&> ("tcpPeerCookie",)
let cooNn = sum [ 1 | (_,v) <- HM.toList coo, v >= 1 ]
S.yield ("tcpPeerCookie", fromIntegral $ HM.size coo)
S.yield ("tcpPeerCookieUsed", cooNn)
S.yield =<< ( readTVarIO _tcpSent <&> ("tcpSent",) . fromIntegral . HPSQ.size)
sweepCookies <- ContT $ withAsync $ forever do
pause @'Seconds 300
-- atomically do
-- pips <- readTVar _tcpPeerConn
-- modifyTVar _tcpPeerToCookie (HM.filterWithKey (\k _ -> HM.member k pips))
-- alive <- readTVar _tcpPeerToCookie <&> HS.fromList . HM.elems
-- modifyTVar _tcpPeerCookie (HM.filterWithKey (\k _ -> HS.member k alive))
sweep <- ContT $ withAsync $ forever do
pause @'Seconds 300
now <- getTimeCoarse
atomically do
w <- readTVar _tcpSent <&> HPSQ.toList
let live = [ x | x@(_,t,_) <- w, realToFrac (now - t) / 1e9 > 1200 ]
writeTVar _tcpSent (HPSQ.fromList live)
-- atomically do
-- pips <- readTVar _tcpPeerConn
-- modifyTVar _tcpSent (HM.filterWithKey (\k _ -> HM.member k pips))
-- modifyTVar _tcpPeerSocket (HM.filterWithKey (\k _ -> HM.member k pips))
-- modifyTVar _tcpPeerCookie (HM.filter (>=1))
waitAnyCatchCancel [p1,p2,probes,sweep,sweepCookies]
where
withTCPTimeout timeout action = liftIO do
r <- race (pause timeout) action
case r of
Right{} -> pure ()
Left{} -> do
debug "tcp connection timeout!"
throwIO TCPPeerReadTimeout
killCookie :: Int -> Maybe Int
killCookie = \case
n | n <= 1 -> Nothing
n -> Just (pred n)
-- useCookie :: forall m . (?env :: MessagingTCP, MonadIO m) => Word32 -> m Bool
useCookie peer cookie = atomically do
let MessagingTCP{..} = ?env
n <- readTVar _tcpPeerCookie <&> HM.member cookie
unless n do
modifyTVar _tcpPeerCookie (HM.insertWith (+) cookie 1)
modifyTVar _tcpPeerToCookie (HM.insert peer cookie)
pure n
-- FIXME: timeout-hardcode
readFrames so peer queue = forever $ withTCPTimeout (TimeoutSec 67) do
void $ readFromSocket so 4 <&> LBS.toStrict
ssize <- readFromSocket so 4 <&> LBS.toStrict
let size = word32 ssize & fromIntegral
bs <- readFromSocket so size
atomically $ writeTBQueueDropSTM outMessageQLen queue (peer, bs)
runServer = do
own <- toPeerAddr $ view tcpOwnPeer env
let (L4Address _ (IPAddrPort (i,p))) = own
let myCookie = view tcpCookie env
-- server
liftIO $ listen (Host (show i)) (show p) $ \(sock, sa) -> do
withFdSocket sock setCloseOnExecIfNeeded
debug $ "Listening on" <+> pretty sa
forever do
void $ acceptFork sock $ \(so, remote) -> void $ flip runContT pure $ callCC \exit -> do
liftIO $ withFdSocket so setCloseOnExecIfNeeded
debug $ "!!! GOT INCOMING CONNECTION FROM !!!"
<+> brackets (pretty remote)
let ?env = env
let newP = fromSockAddr @'TCP remote :: Peer L4Proto
cookie <- handshake Server env so
when (cookie == myCookie) $ exit ()
here <- useCookie newP cookie
when here $ do
debug $ "SERVER : ALREADY CONNECTED" <+> pretty cookie <+> viaShow remote
exit ()
atomically $ modifyTVar _tcpServerThreadsCount succ
now <- getTimeCoarse
newOutQ <- atomically do
q <- readTVar _tcpSent <&> HPSQ.lookup newP >>= \case
Just (_,w) -> pure w
Nothing -> do
nq <- newTBQueue outMessageQLen
modifyTVar _tcpSent (HPSQ.insert newP now nq)
pure nq
modifyTVar _tcpPeerConn (HM.insert newP (connectionId myCookie cookie))
modifyTVar _tcpPeerSocket (HM.insert newP so)
pure q
wr <- ContT $ withAsync $ forever do
bs <- atomically $ readTBQueue newOutQ
-- FIXME: check-this!
let pq = myCookie -- randomIO
let qids = bytestring32 pq
let size = bytestring32 (fromIntegral $ LBS.length bs)
let frame = LBS.fromStrict qids
<> LBS.fromStrict size -- req-size
<> bs -- payload
sendLazy so frame --(LBS.toStrict frame)
rd <- ContT $ withAsync $ readFrames so newP _tcpReceived
void $ ContT $ bracket none $ const do
debug $ "SHUTDOWN SOCKET AND SHIT" <+> pretty remote
atomically do
modifyTVar _tcpServerThreadsCount pred
modifyTVar _tcpPeerSocket (HM.delete newP)
modifyTVar _tcpPeerToCookie (HM.delete newP)
shutdown so ShutdownBoth
cancel rd
cancel wr
atomically do
modifyTVar _tcpSent (HPSQ.delete newP)
modifyTVar _tcpPeerCookie (HM.update killCookie cookie)
void $ waitAnyCatchCancel [rd,wr]
runClient = flip runContT pure do
let myCookie = view tcpCookie env
pause @'Seconds 3.14
void $ ContT $ bracket none $ const $ do
what <- atomically $ stateTVar _tcpClientThreads (\x -> (HM.elems x, mempty))
mapM_ cancel what
void $ ContT $ withAsync $ forever do
pause @Seconds 60
done <- readTVarIO _tcpClientThreads <&> HM.toList
>>= filterM ( \(_,x) -> isJust <$> poll x )
<&> HS.fromList . fmap fst
atomically $ modifyTVar _tcpClientThreads (HM.filterWithKey (\k _ -> not (HS.member k done)))
forever $ void $ runMaybeT do
-- client sockets
who <- atomically $ readTQueue _tcpConnDemand
already <- readTVarIO _tcpPeerConn <&> HM.member who
when already mzero
debug $ "DEMAND:" <+> pretty who
whoAddr <- toPeerAddr who
liftIO $ newClientThread env $ do
let (L4Address _ (IPAddrPort (ip,port))) = whoAddr
connect (show ip) (show port) $ \(so, remoteAddr) -> do
let ?env = env
flip runContT pure $ callCC \exit -> do
debug $ "OPEN CLIENT CONNECTION" <+> pretty ip <+> pretty port <+> pretty remoteAddr
cookie <- handshake Client env so
let connId = connectionId cookie myCookie
when (cookie == myCookie) $ do
debug $ "same peer, exit" <+> pretty remoteAddr
exit ()
here <- useCookie who cookie
-- TODO: handshake notification
liftIO $ _tcpOnClientStarted whoAddr connId
when here do
debug $ "CLIENT: ALREADY CONNECTED" <+> pretty cookie <+> pretty ip <+> pretty port
exit ()
atomically do
modifyTVar _tcpPeerCookie (HM.insertWith (+) cookie 1)
modifyTVar _tcpPeerConn (HM.insert who connId)
modifyTVar _tcpPeerSocket (HM.insert who so)
wr <- ContT $ withAsync $ forever do
bss <- atomically do
q' <- readTVar _tcpSent <&> fmap (view _2) . HPSQ.lookup who
maybe1 q' mempty $ \q -> do
s <- readTBQueue q
sx <- flushTBQueue q
pure (s:sx)
for_ bss $ \bs -> do
-- FIXME: check-this!
let pq = myCookie -- randomIO
let qids = bytestring32 pq
let size = bytestring32 (fromIntegral $ LBS.length bs)
let frame = LBS.fromStrict qids
<> LBS.fromStrict size -- req-size
<> bs -- payload
sendLazy so frame --(LBS.toStrict frame)
void $ ContT $ bracket none $ const $ do
debug "!!! TCP: BRACKET FIRED IN CLIENT !!!"
atomically do
modifyTVar _tcpPeerConn (HM.delete who)
modifyTVar _tcpPeerCookie (HM.update killCookie cookie)
modifyTVar _tcpPeerToCookie (HM.delete who)
modifyTVar _tcpPeerSocket (HM.delete who)
modifyTVar _tcpSent (HPSQ.delete who)
void $ ContT $ bracket none (const $ cancel wr)
readFrames so who _tcpReceived