hbs2/hbs2-core/lib/HBS2/Net/Messaging/TCP.hs

532 lines
16 KiB
Haskell
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

{-# Language TemplateHaskell #-}
module HBS2.Net.Messaging.TCP
( MessagingTCP
, runMessagingTCP
, newMessagingTCP
, tcpSOCKS5
, tcpOwnPeer
, tcpPeerConn
, tcpCookie
, tcpOnClientStarted
, messagingTCPSetProbe
) where
import HBS2.Clock
import HBS2.Net.IP.Addr
import HBS2.Net.Messaging
import HBS2.Prelude.Plated
import HBS2.Net.Messaging.Stream
import HBS2.System.Logger.Simple
import HBS2.Misc.PrettyStuff
import Control.Concurrent.STM (flushTQueue)
import Control.Monad
import Data.Bits
import Data.ByteString.Lazy (ByteString)
import Data.ByteString.Lazy qualified as LBS
import Data.Function
import Data.HashMap.Strict (HashMap)
import Data.HashMap.Strict qualified as HashMap
import Data.List qualified as L
import Data.Maybe
import Data.Word
import Lens.Micro.Platform
import Network.ByteOrder hiding (ByteString)
import Network.Simple.TCP
import Network.Socket hiding (listen,connect)
import System.Random hiding (next)
import Control.Monad.Trans.Cont
import Control.Exception
import UnliftIO (MonadUnliftIO(..))
import UnliftIO.Async
import UnliftIO.STM
import UnliftIO.Exception qualified as U
import Streaming.Prelude qualified as S
{-HLINT ignore "Functor law"-}
-- FIXME: control-recv-capacity-to-avoid-leaks
-- | TCP Messaging environment
data MessagingTCP =
MessagingTCP
{ _tcpSOCKS5 :: Maybe (PeerAddr L4Proto)
, _tcpOwnPeer :: Peer L4Proto
, _tcpCookie :: Word32
, _tcpConnPeer :: TVar (HashMap Word64 (Peer L4Proto))
, _tcpPeerConn :: TVar (HashMap (Peer L4Proto) Word64)
, _tcpConnUsed :: TVar (HashMap Word64 Int)
, _tcpConnQ :: TVar (HashMap Word64 (TQueue (Peer L4Proto, ByteString)))
, _tcpPeerPx :: TVar (HashMap Word32 (Peer L4Proto))
, _tcpPeerXp :: TVar (HashMap (Peer L4Proto) Word32)
, _tcpRecv :: TQueue (Peer L4Proto, ByteString)
, _tcpDefer :: TVar (HashMap (Peer L4Proto) [(TimeSpec, ByteString)])
, _tcpDeferEv :: TQueue ()
, _tcpSpawned :: TVar Int
, _tcpFired :: TVar Int
, _tcpConnWip :: TVar (HashMap (Peer L4Proto) TimeSpec)
, _tcpProbe :: TVar AnyProbe
, _tcpOnClientStarted :: PeerAddr L4Proto -> Word64 -> IO () -- ^ Cient TCP connection succeed
}
makeLenses 'MessagingTCP
messagingTCPSetProbe :: MonadIO m => MessagingTCP -> AnyProbe -> m ()
messagingTCPSetProbe MessagingTCP{..} p = atomically $ writeTVar _tcpProbe p
newMessagingTCP :: ( MonadIO m
, FromSockAddr 'TCP (Peer L4Proto)
)
=> PeerAddr L4Proto
-> m MessagingTCP
newMessagingTCP pa = liftIO do
MessagingTCP Nothing
<$> fromPeerAddr pa
<*> randomIO
<*> newTVarIO mempty
<*> newTVarIO mempty
<*> newTVarIO mempty
<*> newTVarIO mempty
<*> newTVarIO mempty
<*> newTVarIO mempty
<*> newTQueueIO
<*> newTVarIO mempty
<*> newTQueueIO
<*> newTVarIO 0
<*> newTVarIO 0
<*> newTVarIO mempty
<*> newTVarIO (AnyProbe ())
<*> pure (\_ _ -> none) -- do nothing by default
instance Messaging MessagingTCP L4Proto ByteString where
sendTo bus (To p) (From _f) msg = liftIO do
let _own = view tcpOwnPeer bus
co' <- atomically $ readTVar (view tcpPeerConn bus) <&> HashMap.lookup p
-- debug $ "sendTo" <+> brackets (pretty own)
-- <+> pretty p
-- <+> braces (pretty co')
-- <+> pretty (LBS.length msg)
maybe1 co' defer $ \co -> do
-- trace $ "writing to" <+> pretty co
q' <- atomically $ readTVar (view tcpConnQ bus) <&> HashMap.lookup co
maybe1 q' (warn $ "no queue for" <+> pretty co) $ \q -> do
atomically $ writeTQueue q (p, msg)
where
defer = do
warn $ "defer" <+> pretty p
t <- getTimeCoarse
atomically $ modifyTVar (view tcpDefer bus) (HashMap.insertWith (<>) p [(t, msg)])
atomically $ writeTQueue (view tcpDeferEv bus) ()
receive bus _ = liftIO do
let q = view tcpRecv bus
ms <- atomically do
r <- readTQueue q
rs <- flushTQueue q
pure (r:rs)
forM ms $ \(p, msg) -> pure (From p, msg)
connectionId :: Word32 -> Word32 -> Word64
connectionId a b = (fromIntegral hi `shiftL` 32) .|. fromIntegral low
where
low = min a b
hi = max a b
data ConnType = Server | Client
deriving (Eq,Ord,Show,Generic)
sendCookie :: MonadIO m
=> MessagingTCP
-> Socket
-> m ()
sendCookie env so = do
let coo = view tcpCookie env & bytestring32
send so coo
recvCookie :: MonadIO m
=> MessagingTCP
-> Socket
-> m Word32
recvCookie _ so = liftIO do
scoo <- readFromSocket so 4 <&> LBS.toStrict
pure $ word32 scoo
handshake :: MonadIO m
=> ConnType
-> MessagingTCP
-> Socket
-> m Word32
handshake Server env so = do
cookie <- recvCookie env so
sendCookie env so
pure cookie
handshake Client env so = do
sendCookie env so
recvCookie env so
spawnConnection :: forall m . MonadIO m
=> ConnType
-> MessagingTCP
-> Socket
-> SockAddr
-> m ()
spawnConnection tp env@MessagingTCP{..} so sa = liftIO do
flip runContT pure $ do
let myCookie = view tcpCookie env
let own = view tcpOwnPeer env
let newP = fromSockAddr @'TCP sa
theirCookie <- handshake tp env so
let connId = connectionId myCookie theirCookie
when (tp == Client && theirCookie /= myCookie) do
pa <- toPeerAddr newP
liftIO $ view tcpOnClientStarted env pa connId -- notify if we opened client tcp connection
traceCmd own
( "spawnConnection "
<+> viaShow tp
<+> pretty myCookie
<+> pretty connId )
newP
debug $ "handshake" <+> viaShow tp
<+> brackets (pretty (view tcpOwnPeer env))
<+> pretty sa
<+> pretty theirCookie
<+> pretty connId
used <- atomically $ do
modifyTVar (view tcpConnUsed env) (HashMap.insertWith (+) connId 1)
readTVar (view tcpConnUsed env) <&> HashMap.findWithDefault 0 connId
void $ ContT $ bracket (pure connId) cleanupConn
debug $ "USED:" <+> viaShow tp <+> pretty own <+> pretty used
when ( used <= 2 ) do
atomically $ modifyTVar (view tcpPeerConn env) (HashMap.insert newP connId)
when (used == 1) do
atomically $ modifyTVar _tcpSpawned succ
q <- getWriteQueue connId
updatePeer connId newP
debug $ "NEW PEER" <+> brackets (pretty own)
<+> pretty connId
<+> pretty newP
<+> parens ("used:" <+> pretty used)
rd <- ContT $ withAsync $ fix \next -> do
spx <- readFromSocket so 4 <&> LBS.toStrict
ssize <- readFromSocket so 4 <&> LBS.toStrict --- УУУ, фреейминг
let px = word32 spx -- & fromIntegral
let size = word32 ssize & fromIntegral
bs <- readFromSocket so size
memReqId newP px
pxes <- readTVarIO (view tcpPeerPx env)
let orig = fromMaybe (fromSockAddr @'TCP sa) (HashMap.lookup px pxes)
-- debug $ "RECEIVED" <+> pretty orig <+> pretty (LBS.length bs)
atomically $ writeTQueue (view tcpRecv env) (orig, bs)
next
wr <- ContT $ withAsync $ fix \next -> do
(rcpt, bs) <- atomically $ readTQueue q
pq <- makeReqId rcpt
let qids = bytestring32 pq
let size = bytestring32 (fromIntegral $ LBS.length bs)
let frame = LBS.fromStrict qids
<> LBS.fromStrict size -- req-size
<> bs -- payload
sendLazy so frame --(LBS.toStrict frame)
next
ContT $ bracket none $ \_ -> mapM cancel [rd,wr]
ContT $ bracket (pure connId) cleanupConn
ContT $ bracket none (const $ atomically $ modifyTVar _tcpSpawned pred)
void $ waitAnyCatchCancel [rd,wr]
-- gracefulClose so 1000
debug $ "spawnConnection exit" <+> pretty sa
where
memReqId newP px =
atomically $ modifyTVar (view tcpPeerXp env) (HashMap.insert newP px)
makeReqId rcpt = do
let pxes = view tcpPeerPx env
let xpes = view tcpPeerXp env
nq <- randomIO
atomically $ do
px <- readTVar xpes <&> HashMap.lookup rcpt
case px of
Just qq -> pure qq
Nothing -> do
modifyTVar pxes (HashMap.insert nq rcpt)
modifyTVar xpes (HashMap.insert rcpt nq)
pure nq
updatePeer connId newP = atomically $ do
modifyTVar (view tcpPeerConn env) (HashMap.insert newP connId)
modifyTVar (view tcpConnPeer env) (HashMap.insert connId newP)
getWriteQueue connId = atomically $ do
readTVar (view tcpConnQ env) >>= \x -> do
case HashMap.lookup connId x of
Just qq -> pure qq
Nothing -> do
newQ <- newTQueue
modifyTVar (view tcpConnQ env) (HashMap.insert connId newQ)
pure newQ
cleanupConn connId = atomically do
modifyTVar (view tcpConnUsed env) (HashMap.alter del connId)
used <- readTVar (view tcpConnUsed env) <&> HashMap.findWithDefault 0 connId
when (used == 0) do
p <- stateTVar (view tcpConnPeer env)
$ \x -> (HashMap.lookup connId x, HashMap.delete connId x)
maybe1 p none $ \pp ->
modifyTVar (view tcpPeerConn env) (HashMap.delete pp)
modifyTVar (view tcpConnQ env) (HashMap.delete connId)
where
del = \case
Nothing -> Nothing
Just n | n <= 1 -> Nothing
| otherwise -> Just (pred n)
connectPeerTCP :: MonadIO m
=> MessagingTCP
-> Peer L4Proto
-> m ()
connectPeerTCP env peer = liftIO do
pa <- toPeerAddr peer
let (L4Address _ (IPAddrPort (i,p))) = pa
here <- readTVarIO (view tcpPeerConn env) <&> HashMap.member peer
unless here do
case view tcpSOCKS5 env of
Nothing -> do
connect (show i) (show p) $ \(sock, remoteAddr) -> do
spawnConnection Client env sock remoteAddr
shutdown sock ShutdownBoth
Just socks5 -> do
let (L4Address _ (IPAddrPort (socks,socksp))) = socks5
connectSOCKS5 (show socks) (show socksp) (show i) (show p) $ \(sock, socksAddr, _) -> do
let (PeerL4{..}) = peer
debug $ "CONNECTED VIA SOCKS5" <+> pretty socksAddr <+> pretty pa
spawnConnection Client env sock _sockAddr
shutdown sock ShutdownBoth
-- FIXME: link-all-asyncs
fireTCP :: forall e m . (e ~ L4Proto, Pretty (PeerAddr e), IsPeerAddr e m, MonadUnliftIO m) => MessagingTCP -> Peer e -> m () -> m ()
fireTCP MessagingTCP{..} pip what = do
void $ do
pa <- toPeerAddr @e pip
now <- getTimeCoarse
fire <- atomically do
here <- readTVar _tcpConnWip <&> HashMap.member pip
unless here do
modifyTVar _tcpConnWip (HashMap.insert pip now)
pure (not here)
when fire do
debug $ "Fire TCP" <+> pretty pa
atomically (modifyTVar _tcpFired succ)
void $ async do
r <- U.try @_ @SomeException what
atomically (modifyTVar _tcpFired pred)
either U.throwIO dontHandle r
runMessagingTCP :: forall m . MonadIO m => MessagingTCP -> m ()
runMessagingTCP env@MessagingTCP{..} = liftIO do
void $ flip runContT pure do
own <- toPeerAddr $ view tcpOwnPeer env
let (L4Address _ (IPAddrPort (i,p))) = own
let defs = view tcpDefer env
mon <- ContT $ withAsync $ forever do
pause @'Seconds 30
now <- getTimeCoarse
-- FIXME: time-hardcode-again
let expire = filter (\e -> (realToFrac (toNanoSecs (now - fst e)) / (1e9 :: Double)) < 30)
atomically $ modifyTVar defs
$ HashMap.mapMaybe
$ \es -> let rs = expire es
in case rs of
[] -> Nothing
xs -> Just xs
con <- ContT $ withAsync $ forever do
let ev = view tcpDeferEv env
-- FIXME: wait-period-hardcode
void $ race (pause @'Seconds 0.25) (atomically $ readTQueue ev >> flushTQueue ev)
dePips <- readTVarIO defs <&> HashMap.keys
forM_ dePips $ \pip -> do
msgs <- readTVarIO defs <&> HashMap.findWithDefault mempty pip
unless (L.null msgs) do
trace $ "DEFERRED FOR" <+> pretty pip <+> pretty (length msgs)
let len = length msgs
when (len > 10) do
-- FIXME: deferred-message-hardcoded
atomically $ modifyTVar defs (HashMap.adjust (L.drop (len - 10)) pip)
co' <- atomically $ readTVar (view tcpPeerConn env) <&> HashMap.lookup pip
when (isNothing co') do
debug $ red "No session for" <+> pretty pip
pure ()
maybe1 co' (void $ fireTCP env pip (connectPeerTCP env pip)) $ \co -> do
q' <- atomically $ readTVar (view tcpConnQ env) <&> HashMap.lookup co
maybe1 q' none $ \q -> do
atomically do
mss <- readTVar defs <&> HashMap.findWithDefault mempty pip
modifyTVar defs $ HashMap.delete pip
forM_ mss $ \m -> writeTQueue q (pip, snd m)
pure ()
stat <- ContT $ withAsync $ forever do
pause @'Seconds 120
ps <- readTVarIO $ view tcpConnPeer env
let peers = HashMap.toList ps
forM_ peers $ \(c,pip) -> do
used <- readTVarIO (view tcpConnUsed env) <&> HashMap.findWithDefault 0 c
trace $ "peer" <+> brackets (pretty own)
<+> pretty pip
<+> pretty c
<+> parens ("used:" <+> pretty used)
cleanup <- ContT $ withAsync $ forever do
pause @Seconds 60
now <- getTimeCoarse
connWip <- readTVarIO _tcpConnWip <&> HashMap.toList
let connAlive = [ (k,v) | (k,v) <- connWip, not (expired (TimeoutSec 60) (now - v)) ]
atomically $ writeTVar _tcpConnWip (HashMap.fromList connAlive)
mapM_ link [mon,con,stat,cleanup]
probes <- ContT $ withAsync $ forever do
pause @'Seconds 10
probe <- readTVarIO _tcpProbe
acceptReport probe =<< S.toList_ do
S.yield =<< atomically (readTVar _tcpConnPeer <&> ("tcpConnPeer",) . fromIntegral . HashMap.size)
S.yield =<< atomically (readTVar _tcpPeerConn <&> ("tcpPeerConn",) . fromIntegral . HashMap.size)
S.yield =<< atomically (readTVar _tcpConnUsed <&> ("tcpConnUsed",) . fromIntegral . HashMap.size)
S.yield =<< atomically (readTVar _tcpConnQ <&> ("tcpConnQ",) . fromIntegral . HashMap.size)
S.yield =<< atomically (readTVar _tcpPeerPx <&> ("tcpPeerPx",) . fromIntegral . HashMap.size)
S.yield =<< atomically (readTVar _tcpPeerXp <&> ("tcpPeerXp",) . fromIntegral . HashMap.size)
S.yield =<< atomically (readTVar _tcpDefer <&> ("tcpPeerDefer",) . fromIntegral . HashMap.size)
S.yield =<< atomically (readTVar _tcpSpawned <&> ("tcpSpawned",) . fromIntegral)
S.yield =<< atomically (readTVar _tcpFired <&> ("tcpFired",) . fromIntegral)
S.yield =<< atomically (readTVar _tcpConnWip <&> ("tcpConnWip",) . fromIntegral . HashMap.size)
ContT $ bracket (pure ()) $ \_ -> mapM_ cancel [mon,con,stat,probes,cleanup]
liftIO $ listen (Host (show i)) (show p) $ \(sock, sa) -> do
withFdSocket sock setCloseOnExecIfNeeded
debug $ "Listening on" <+> pretty sa
forever do
void $ acceptFork sock $ \(so, remote) -> do
withFdSocket so setCloseOnExecIfNeeded
trace $ "GOT INCOMING CONNECTION FROM"
<+> brackets (pretty own)
<+> brackets (pretty sa)
<+> pretty remote
void $ try @SomeException $ do
spawnConnection Server env so remote
-- gracefulClose so 1000
-- TODO: probably-cleanup-peer
-- TODO: periodically-drop-inactive-connections
debug $ "CLOSING CONNECTION" <+> pretty remote
shutdown so ShutdownBoth
close so -- ) -- `U.finally` mapM_ cancel [mon,con,stat]
traceCmd :: forall a ann b m . ( Pretty a
, Pretty b
, MonadIO m
)
=> a -> Doc ann -> b -> m ()
traceCmd p1 s p2 = do
trace $ brackets (pretty p1)
<+> s
<+> parens (pretty p2)