tcp rewritten

This commit is contained in:
voidlizard 2024-11-02 13:04:21 +03:00
parent b2ce060650
commit 6c51498064
2 changed files with 210 additions and 358 deletions

View File

@ -10,6 +10,6 @@ constraints:
, http-client >=0.7.16 && <0.8
-- executable-static: True
-- profiling: True
profiling: True
--library-profiling: False

View File

@ -21,14 +21,14 @@ import HBS2.Net.Messaging.Stream
import HBS2.System.Logger.Simple
import HBS2.Misc.PrettyStuff
import Control.Concurrent.STM (flushTQueue)
import Control.Concurrent.STM (flushTQueue,retry)
import Control.Monad.Trans.Maybe
import Data.Bits
import Data.ByteString.Lazy (ByteString)
import Data.ByteString.Lazy qualified as LBS
import Data.Function
import Data.HashMap.Strict (HashMap)
import Data.HashMap.Strict qualified as HashMap
import Data.HashMap.Strict qualified as HM
import Data.List qualified as L
import Data.Maybe
import Data.Word
@ -50,24 +50,20 @@ import Streaming.Prelude qualified as S
-- FIXME: control-recv-capacity-to-avoid-leaks
outMessageQLen :: Natural
outMessageQLen = 256
-- | TCP Messaging environment
data MessagingTCP =
MessagingTCP
{ _tcpSOCKS5 :: Maybe (PeerAddr L4Proto)
, _tcpOwnPeer :: Peer L4Proto
, _tcpCookie :: Word32
, _tcpConnPeer :: TVar (HashMap Word64 (Peer L4Proto))
, _tcpPeerConn :: TVar (HashMap (Peer L4Proto) Word64)
, _tcpConnUsed :: TVar (HashMap Word64 Int)
, _tcpConnQ :: TVar (HashMap Word64 (TQueue (Peer L4Proto, ByteString)))
, _tcpPeerPx :: TVar (HashMap Word32 (Peer L4Proto))
, _tcpPeerXp :: TVar (HashMap (Peer L4Proto) Word32)
, _tcpRecv :: TQueue (Peer L4Proto, ByteString)
, _tcpDefer :: TVar (HashMap (Peer L4Proto) [(TimeSpec, ByteString)])
, _tcpDeferEv :: TQueue ()
, _tcpSpawned :: TVar Int
, _tcpFired :: TVar Int
, _tcpConnWip :: TVar (HashMap (Peer L4Proto) TimeSpec)
, _tcpPeerCookie :: TVar (HashMap Word32 Int)
, _tcpConnDemand :: TQueue (Peer L4Proto)
, _tcpReceived :: TBQueue (Peer L4Proto, ByteString)
, _tcpSent :: TVar (HashMap (Peer L4Proto) (TBQueue ByteString))
, _tcpProbe :: TVar AnyProbe
, _tcpOnClientStarted :: PeerAddr L4Proto -> Word64 -> IO () -- ^ Cient TCP connection succeed
}
@ -89,55 +85,39 @@ newMessagingTCP pa = liftIO do
<*> randomIO
<*> newTVarIO mempty
<*> newTVarIO mempty
<*> newTVarIO mempty
<*> newTVarIO mempty
<*> newTVarIO mempty
<*> newTVarIO mempty
<*> newTQueueIO
<*> newTVarIO mempty
<*> newTQueueIO
<*> newTVarIO 0
<*> newTVarIO 0
<*> newTBQueueIO outMessageQLen
<*> newTVarIO mempty
<*> newTVarIO (AnyProbe ())
<*> pure (\_ _ -> none) -- do nothing by default
instance Messaging MessagingTCP L4Proto ByteString where
sendTo bus (To p) (From _f) msg = liftIO do
let _own = view tcpOwnPeer bus
sendTo MessagingTCP{..} (To p) (From _f) msg = liftIO do
-- let _own = tcpOwnPeer
-- debug $ "!!!! FUCKING SEND TO" <+> pretty p
co' <- atomically $ readTVar (view tcpPeerConn bus) <&> HashMap.lookup p
queue <- atomically do
q' <- readTVar _tcpSent <&> HM.lookup p
-- debug $ "sendTo" <+> brackets (pretty own)
-- <+> pretty p
-- <+> braces (pretty co')
-- <+> pretty (LBS.length msg)
case q' of
Nothing -> do
writeTQueue _tcpConnDemand p
q <- newTBQueue outMessageQLen
modifyTVar _tcpSent (HM.insert p q)
pure q
maybe1 co' defer $ \co -> do
-- trace $ "writing to" <+> pretty co
q' <- atomically $ readTVar (view tcpConnQ bus) <&> HashMap.lookup co
maybe1 q' (warn $ "no queue for" <+> pretty co) $ \q -> do
atomically $ writeTQueue q (p, msg)
Just q -> pure q
where
defer = do
warn $ "defer" <+> pretty p
t <- getTimeCoarse
atomically $ modifyTVar (view tcpDefer bus) (HashMap.insertWith (<>) p [(t, msg)])
atomically $ writeTQueue (view tcpDeferEv bus) ()
receive bus _ = liftIO do
let q = view tcpRecv bus
ms <- atomically do
r <- readTQueue q
rs <- flushTQueue q
pure (r:rs)
forM ms $ \(p, msg) -> pure (From p, msg)
atomically $ writeTBQueueDropSTM 10 queue msg
-- debug $ "!!!! FUCKING SEND TO" <+> pretty p <+> "DONE"
receive MessagingTCP{..} _ = liftIO do
atomically do
s0 <- readTBQueue _tcpReceived
sx <- flushTBQueue _tcpReceived
pure $ fmap (over _1 From) ( s0 : sx )
connectionId :: Word32 -> Word32 -> Word64
connectionId a b = (fromIntegral hi `shiftL` 32) .|. fromIntegral low
@ -145,7 +125,6 @@ connectionId a b = (fromIntegral hi `shiftL` 32) .|. fromIntegral low
low = min a b
hi = max a b
data ConnType = Server | Client
deriving (Eq,Ord,Show,Generic)
@ -183,352 +162,225 @@ handshake Client env so = do
sendCookie env so
recvCookie env so
spawnConnection :: forall m . MonadIO m
=> ConnType
-> MessagingTCP
-> Socket
-> SockAddr
-> m ()
spawnConnection tp env@MessagingTCP{..} so sa = liftIO do
flip runContT pure $ do
let myCookie = view tcpCookie env
let own = view tcpOwnPeer env
let newP = fromSockAddr @'TCP sa
theirCookie <- handshake tp env so
let connId = connectionId myCookie theirCookie
when (tp == Client && theirCookie /= myCookie) do
pa <- toPeerAddr newP
liftIO $ view tcpOnClientStarted env pa connId -- notify if we opened client tcp connection
traceCmd own
( "spawnConnection "
<+> viaShow tp
<+> pretty myCookie
<+> pretty connId )
newP
debug $ "handshake" <+> viaShow tp
<+> brackets (pretty (view tcpOwnPeer env))
<+> pretty sa
<+> pretty theirCookie
<+> pretty connId
used <- atomically $ do
modifyTVar (view tcpConnUsed env) (HashMap.insertWith (+) connId 1)
readTVar (view tcpConnUsed env) <&> HashMap.findWithDefault 0 connId
writeTBQueueDropSTM :: Integral n
=> n
-> TBQueue a
-> a
-> STM ()
writeTBQueueDropSTM inQLen newInQ bs = do
flip fix inQLen $ \more j -> do
when (j > 0) do
full <- isFullTBQueue newInQ
if not full then do
writeTBQueue newInQ bs
else do
void $ tryReadTBQueue newInQ
more (pred j)
void $ ContT $ bracket (pure connId) cleanupConn
debug $ "USED:" <+> viaShow tp <+> pretty own <+> pretty used
-- when ( used <= 2 ) do
atomically $ modifyTVar (view tcpPeerConn env) (HashMap.insert newP connId)
when (used == 1) do
atomically $ modifyTVar _tcpSpawned succ
q <- getWriteQueue connId
updatePeer connId newP
debug $ "NEW PEER" <+> brackets (pretty own)
<+> pretty connId
<+> pretty newP
<+> parens ("used:" <+> pretty used)
rd <- ContT $ withAsync $ fix \next -> do
spx <- readFromSocket so 4 <&> LBS.toStrict
ssize <- readFromSocket so 4 <&> LBS.toStrict --- УУУ, фреейминг
let px = word32 spx -- & fromIntegral
let size = word32 ssize & fromIntegral
bs <- readFromSocket so size
memReqId newP px
pxes <- readTVarIO (view tcpPeerPx env)
let orig = fromMaybe (fromSockAddr @'TCP sa) (HashMap.lookup px pxes)
-- debug $ "RECEIVED" <+> pretty orig <+> pretty (LBS.length bs)
atomically $ writeTQueue (view tcpRecv env) (orig, bs)
next
wr <- ContT $ withAsync $ fix \next -> do
(rcpt, bs) <- atomically $ readTQueue q
pq <- makeReqId rcpt
let qids = bytestring32 pq
let size = bytestring32 (fromIntegral $ LBS.length bs)
let frame = LBS.fromStrict qids
<> LBS.fromStrict size -- req-size
<> bs -- payload
sendLazy so frame --(LBS.toStrict frame)
next
ContT $ bracket none $ \_ -> mapM cancel [rd,wr]
ContT $ bracket (pure connId) cleanupConn
ContT $ bracket none (const $ atomically $ modifyTVar _tcpSpawned pred)
void $ waitAnyCatchCancel [rd,wr]
-- gracefulClose so 1000
debug $ "spawnConnection exit" <+> pretty sa
where
memReqId newP px =
atomically $ modifyTVar (view tcpPeerXp env) (HashMap.insert newP px)
makeReqId rcpt = do
let pxes = view tcpPeerPx env
let xpes = view tcpPeerXp env
nq <- randomIO
atomically $ do
px <- readTVar xpes <&> HashMap.lookup rcpt
case px of
Just qq -> pure qq
Nothing -> do
modifyTVar pxes (HashMap.insert nq rcpt)
modifyTVar xpes (HashMap.insert rcpt nq)
pure nq
updatePeer connId newP = atomically $ do
modifyTVar (view tcpPeerConn env) (HashMap.insert newP connId)
modifyTVar (view tcpConnPeer env) (HashMap.insert connId newP)
getWriteQueue connId = atomically $ do
readTVar (view tcpConnQ env) >>= \x -> do
case HashMap.lookup connId x of
Just qq -> pure qq
Nothing -> do
newQ <- newTQueue
modifyTVar (view tcpConnQ env) (HashMap.insert connId newQ)
pure newQ
cleanupConn connId = atomically do
modifyTVar (view tcpConnUsed env) (HashMap.alter del connId)
used <- readTVar (view tcpConnUsed env) <&> HashMap.findWithDefault 0 connId
when (used == 0) do
p <- stateTVar (view tcpConnPeer env)
$ \x -> (HashMap.lookup connId x, HashMap.delete connId x)
maybe1 p none $ \pp ->
modifyTVar (view tcpPeerConn env) (HashMap.delete pp)
modifyTVar (view tcpConnQ env) (HashMap.delete connId)
where
del = \case
Nothing -> Nothing
Just n | n <= 1 -> Nothing
| otherwise -> Just (pred n)
connectPeerTCP :: MonadIO m
=> MessagingTCP
-> Peer L4Proto
-> m ()
connectPeerTCP env peer = liftIO do
pa <- toPeerAddr peer
let (L4Address _ (IPAddrPort (i,p))) = pa
here <- readTVarIO (view tcpPeerConn env) <&> HashMap.member peer
unless here do
case view tcpSOCKS5 env of
Nothing -> do
connect (show i) (show p) $ \(sock, remoteAddr) -> do
spawnConnection Client env sock remoteAddr
shutdown sock ShutdownBoth
Just socks5 -> do
let (L4Address _ (IPAddrPort (socks,socksp))) = socks5
connectSOCKS5 (show socks) (show socksp) (show i) (show p) $ \(sock, socksAddr, _) -> do
let (PeerL4{..}) = peer
debug $ "CONNECTED VIA SOCKS5" <+> pretty socksAddr <+> pretty pa
spawnConnection Client env sock _sockAddr
shutdown sock ShutdownBoth
-- FIXME: link-all-asyncs
fireTCP :: forall e m . (e ~ L4Proto, Pretty (PeerAddr e), IsPeerAddr e m, MonadUnliftIO m) => MessagingTCP -> Peer e -> m () -> m ()
fireTCP MessagingTCP{..} pip what = do
void $ do
pa <- toPeerAddr @e pip
now <- getTimeCoarse
fire <- atomically do
here <- readTVar _tcpConnWip <&> HashMap.member pip
unless here do
modifyTVar _tcpConnWip (HashMap.insert pip now)
pure (not here)
when fire do
debug $ "Fire TCP" <+> pretty pa
atomically (modifyTVar _tcpFired succ)
void $ async do
r <- U.try @_ @SomeException what
atomically (modifyTVar _tcpFired pred)
either U.throwIO dontHandle r
killCookie :: Int -> Maybe Int
killCookie = \case
1 -> Nothing
n -> Just (pred n)
runMessagingTCP :: forall m . MonadIO m => MessagingTCP -> m ()
runMessagingTCP env@MessagingTCP{..} = liftIO do
void $ flip runContT pure do
own <- toPeerAddr $ view tcpOwnPeer env
let (L4Address _ (IPAddrPort (i,p))) = own
p1 <- ContT $ withAsync runClient
p2 <- ContT $ withAsync runServer
let defs = view tcpDefer env
waitAnyCatchCancel [p1,p2]
-- waitAnyCatchCancel [p2]
-- waitAnyCatchCancel [p1]
mon <- ContT $ withAsync $ forever do
pause @'Seconds 30
now <- getTimeCoarse
where
-- FIXME: time-hardcode-again
let expire = filter (\e -> (realToFrac (toNanoSecs (now - fst e)) / (1e9 :: Double)) < 30)
atomically $ modifyTVar defs
$ HashMap.mapMaybe
$ \es -> let rs = expire es
in case rs of
[] -> Nothing
xs -> Just xs
runServer :: forall m . MonadIO m => m ()
runServer = do
con <- ContT $ withAsync $ forever do
own <- toPeerAddr $ view tcpOwnPeer env
let (L4Address _ (IPAddrPort (i,p))) = own
let myCookie = view tcpCookie env
let ev = view tcpDeferEv env
-- server
liftIO $ listen (Host (show i)) (show p) $ \(sock, sa) -> do
withFdSocket sock setCloseOnExecIfNeeded
debug $ "Listening on" <+> pretty sa
-- FIXME: wait-period-hardcode
void $ race (pause @'Seconds 0.25) (atomically $ readTQueue ev >> flushTQueue ev)
forever do
void $ acceptFork sock $ \(so, remote) -> void $ flip runContT pure $ callCC \exit -> do
liftIO $ withFdSocket so setCloseOnExecIfNeeded
debug $ "!!! GOT INCOMING CONNECTION FROM !!!"
<+> brackets (pretty own)
<+> brackets (pretty sa)
dePips <- readTVarIO defs <&> HashMap.keys
cookie <- handshake Server env so
when (cookie == myCookie) $ exit ()
forM_ dePips $ \pip -> void $ runMaybeT do
here <- atomically do
n <- readTVar _tcpPeerCookie <&> HM.member cookie
-- FIXME: make-sure-it-is-correct
already <- readTVarIO _tcpPeerXp <&> HashMap.member pip
unless n do
modifyTVar _tcpPeerCookie (HM.insertWith (+) cookie 1)
guard (not already)
pure n
msgs <- readTVarIO defs <&> HashMap.findWithDefault mempty pip
when here $ do
debug $ "SERVER : ALREADY CONNECTED" <+> pretty cookie <+> viaShow so
exit ()
unless (L.null msgs) do
trace $ "DEFERRED FOR" <+> pretty pip <+> pretty (length msgs)
let newP = fromSockAddr @'TCP remote :: Peer L4Proto
let len = length msgs
-- FIXME: queue-size-hardcode
let inQLen = outMessageQLen
when (len > 10) do
-- FIXME: deferred-message-hardcoded
atomically $ modifyTVar defs (HashMap.adjust (L.drop (len - 10)) pip)
newInQ <- liftIO $ newTBQueueIO inQLen
co' <- atomically $ readTVar (view tcpPeerConn env) <&> HashMap.lookup pip
newOutQ <- do
atomically do
mbQ <- readTVar _tcpSent <&> HM.lookup newP
maybe (newTBQueue outMessageQLen) pure mbQ
when (isNothing co') do
trace $ red "No session for" <+> pretty pip
lift $ maybe1 co' (void $ fireTCP env pip (connectPeerTCP env pip)) $ \co -> do
q' <- atomically $ readTVar (view tcpConnQ env) <&> HashMap.lookup co
maybe1 q' none $ \q -> do
atomically do
mss <- readTVar defs <&> HashMap.findWithDefault mempty pip
modifyTVar defs $ HashMap.delete pip
forM_ mss $ \m -> writeTQueue q (pip, snd m)
modifyTVar _tcpSent (HM.insert newP newOutQ)
modifyTVar _tcpPeerConn (HM.insert newP (connectionId myCookie cookie))
stat <- ContT $ withAsync $ forever do
pause @'Seconds 120
ps <- readTVarIO $ view tcpConnPeer env
let peers = HashMap.toList ps
forM_ peers $ \(c,pip) -> do
used <- readTVarIO (view tcpConnUsed env) <&> HashMap.findWithDefault 0 c
trace $ "peer" <+> brackets (pretty own)
<+> pretty pip
<+> pretty c
<+> parens ("used:" <+> pretty used)
wr <- ContT $ withAsync $ forever do
bs <- atomically $ readTBQueue newOutQ
cleanup <- ContT $ withAsync $ forever do
pause @Seconds 60
now <- getTimeCoarse
connWip <- readTVarIO _tcpConnWip <&> HashMap.toList
let connAlive = [ (k,v) | (k,v) <- connWip, not (expired (TimeoutSec 60) (now - v)) ]
atomically $ writeTVar _tcpConnWip (HashMap.fromList connAlive)
-- FIXME: check-this!
let pq = myCookie -- randomIO
let qids = bytestring32 pq
let size = bytestring32 (fromIntegral $ LBS.length bs)
mapM_ link [mon,con,stat,cleanup]
let frame = LBS.fromStrict qids
<> LBS.fromStrict size -- req-size
<> bs -- payload
probes <- ContT $ withAsync $ forever do
pause @'Seconds 10
probe <- readTVarIO _tcpProbe
acceptReport probe =<< S.toList_ do
S.yield =<< atomically (readTVar _tcpConnPeer <&> ("tcpConnPeer",) . fromIntegral . HashMap.size)
S.yield =<< atomically (readTVar _tcpPeerConn <&> ("tcpPeerConn",) . fromIntegral . HashMap.size)
S.yield =<< atomically (readTVar _tcpConnUsed <&> ("tcpConnUsed",) . fromIntegral . HashMap.size)
S.yield =<< atomically (readTVar _tcpConnQ <&> ("tcpConnQ",) . fromIntegral . HashMap.size)
S.yield =<< atomically (readTVar _tcpPeerPx <&> ("tcpPeerPx",) . fromIntegral . HashMap.size)
S.yield =<< atomically (readTVar _tcpPeerXp <&> ("tcpPeerXp",) . fromIntegral . HashMap.size)
S.yield =<< atomically (readTVar _tcpDefer <&> ("tcpPeerDefer",) . fromIntegral . HashMap.size)
S.yield =<< atomically (readTVar _tcpSpawned <&> ("tcpSpawned",) . fromIntegral)
S.yield =<< atomically (readTVar _tcpFired <&> ("tcpFired",) . fromIntegral)
S.yield =<< atomically (readTVar _tcpConnWip <&> ("tcpConnWip",) . fromIntegral . HashMap.size)
sendLazy so frame --(LBS.toStrict frame)
ContT $ bracket (pure ()) $ \_ -> mapM_ cancel [mon,con,stat,probes,cleanup]
rd <- ContT $ withAsync $ forever do
liftIO $ listen (Host (show i)) (show p) $ \(sock, sa) -> do
withFdSocket sock setCloseOnExecIfNeeded
debug $ "Listening on" <+> pretty sa
spx <- readFromSocket so 4 <&> LBS.toStrict
ssize <- readFromSocket so 4 <&> LBS.toStrict --- УУУ, фреейминг
let px = word32 spx -- & fromIntegral
let size = word32 ssize & fromIntegral
forever do
void $ acceptFork sock $ \(so, remote) -> do
withFdSocket so setCloseOnExecIfNeeded
trace $ "GOT INCOMING CONNECTION FROM"
<+> brackets (pretty own)
<+> brackets (pretty sa)
<+> pretty remote
bs <- readFromSocket so size
void $ try @SomeException $ do
-- debug $ "READ SHIT FROM SOCKET!" <+> pretty remote
spawnConnection Server env so remote
atomically $ writeTBQueueDropSTM outMessageQLen _tcpReceived (newP, bs)
-- gracefulClose so 1000
void $ ContT $ bracket none $ const do
debug $ "SHUTDOWN SOCKET AND SHIT" <+> pretty remote
cancel rd
cancel wr
shutdown so ShutdownBoth
-- TODO: probably-cleanup-peer
-- TODO: periodically-drop-inactive-connections
atomically do
modifyTVar _tcpSent (HM.delete newP)
modifyTVar _tcpPeerCookie $ \m -> do
HM.update killCookie cookie m
debug $ "CLOSING CONNECTION" <+> pretty remote
shutdown so ShutdownBoth
close so -- ) -- `U.finally` mapM_ cancel [mon,con,stat]
void $ waitAnyCatchCancel [rd,wr]
traceCmd :: forall a ann b m . ( Pretty a
, Pretty b
, MonadIO m
)
=> a -> Doc ann -> b -> m ()
runClient :: forall m . MonadIO m => m ()
runClient = do
own <- toPeerAddr $ view tcpOwnPeer env
let (L4Address _ (IPAddrPort (i,p))) = own
let myCookie = view tcpCookie env
pause @'Seconds 30
forever $ void $ runMaybeT do
-- client sockets
-- смотрим к кому надо
who <- atomically $ readTQueue _tcpConnDemand
whoAddr <- toPeerAddr who
already <- atomically $ readTVar _tcpPeerConn <&> HM.member who
when already do
debug "SHIT? BUSYLOOP?"
mzero
-- FIXME: !!!
liftIO $ asyncLinked do
let (L4Address _ (IPAddrPort (ip,port))) = whoAddr
connect (show ip) (show port) $ \(so, remoteAddr) -> do
flip runContT pure $ callCC \exit -> do
debug $ "OPEN CLIENT CONNECTION" <+> pretty ip <+> pretty port <+> pretty remoteAddr
cookie <- handshake Client env so
let connId = connectionId cookie myCookie
when (cookie == myCookie) $ exit ()
here <- atomically do
n <- readTVar _tcpPeerCookie <&> HM.member cookie
unless n do
modifyTVar _tcpPeerCookie (HM.insertWith (+) cookie 1)
modifyTVar _tcpPeerConn (HM.insert who connId)
pure n
-- TODO: handshake notification
liftIO $ _tcpOnClientStarted whoAddr connId
when here do
debug $ "CLIENT: ALREADY CONNECTED" <+> pretty cookie <+> pretty ip <+> pretty port
exit ()
atomically $ modifyTVar _tcpPeerCookie (HM.insertWith (+) cookie 1)
wr <- ContT $ withAsync $ forever do
bss <- atomically do
q' <- readTVar _tcpSent <&> HM.lookup who
maybe1 q' mempty $ \q -> do
s <- readTBQueue q
sx <- flushTBQueue q
pure (s:sx)
for_ bss $ \bs -> do
-- FIXME: check-this!
let pq = myCookie -- randomIO
let qids = bytestring32 pq
let size = bytestring32 (fromIntegral $ LBS.length bs)
let frame = LBS.fromStrict qids
<> LBS.fromStrict size -- req-size
<> bs -- payload
sendLazy so frame --(LBS.toStrict frame)
void $ ContT $ bracket none (const $ cancel wr)
void $ ContT $ bracket none $ const $ do
atomically do
modifyTVar _tcpPeerConn (HM.delete who)
modifyTVar _tcpPeerCookie $ \m -> do
HM.update killCookie cookie m
forever do
spx <- readFromSocket so 4 <&> LBS.toStrict
ssize <- readFromSocket so 4 <&> LBS.toStrict --- УУУ, фреейминг
let px = word32 spx -- & fromIntegral
let size = word32 ssize & fromIntegral
bs <- readFromSocket so size
-- debug $ "READ SHIT FROM CLIENT SOCKET!" <+> pretty remoteAddr
atomically $ writeTBQueueDropSTM 10 _tcpReceived (who, bs)
traceCmd p1 s p2 = do
trace $ brackets (pretty p1)
<+> s
<+> parens (pretty p2)