tcp rewritten

This commit is contained in:
voidlizard 2024-11-02 13:04:21 +03:00
parent ef6b7379c3
commit a2955197a3
2 changed files with 210 additions and 358 deletions

View File

@ -10,6 +10,6 @@ constraints:
, http-client >=0.7.16 && <0.8
-- executable-static: True
-- profiling: True
profiling: True
--library-profiling: False

View File

@ -21,14 +21,14 @@ import HBS2.Net.Messaging.Stream
import HBS2.System.Logger.Simple
import HBS2.Misc.PrettyStuff
import Control.Concurrent.STM (flushTQueue)
import Control.Concurrent.STM (flushTQueue,retry)
import Control.Monad.Trans.Maybe
import Data.Bits
import Data.ByteString.Lazy (ByteString)
import Data.ByteString.Lazy qualified as LBS
import Data.Function
import Data.HashMap.Strict (HashMap)
import Data.HashMap.Strict qualified as HashMap
import Data.HashMap.Strict qualified as HM
import Data.List qualified as L
import Data.Maybe
import Data.Word
@ -50,24 +50,20 @@ import Streaming.Prelude qualified as S
-- FIXME: control-recv-capacity-to-avoid-leaks
outMessageQLen :: Natural
outMessageQLen = 256
-- | TCP Messaging environment
data MessagingTCP =
MessagingTCP
{ _tcpSOCKS5 :: Maybe (PeerAddr L4Proto)
, _tcpOwnPeer :: Peer L4Proto
, _tcpCookie :: Word32
, _tcpConnPeer :: TVar (HashMap Word64 (Peer L4Proto))
, _tcpPeerConn :: TVar (HashMap (Peer L4Proto) Word64)
, _tcpConnUsed :: TVar (HashMap Word64 Int)
, _tcpConnQ :: TVar (HashMap Word64 (TQueue (Peer L4Proto, ByteString)))
, _tcpPeerPx :: TVar (HashMap Word32 (Peer L4Proto))
, _tcpPeerXp :: TVar (HashMap (Peer L4Proto) Word32)
, _tcpRecv :: TQueue (Peer L4Proto, ByteString)
, _tcpDefer :: TVar (HashMap (Peer L4Proto) [(TimeSpec, ByteString)])
, _tcpDeferEv :: TQueue ()
, _tcpSpawned :: TVar Int
, _tcpFired :: TVar Int
, _tcpConnWip :: TVar (HashMap (Peer L4Proto) TimeSpec)
, _tcpPeerCookie :: TVar (HashMap Word32 Int)
, _tcpConnDemand :: TQueue (Peer L4Proto)
, _tcpReceived :: TBQueue (Peer L4Proto, ByteString)
, _tcpSent :: TVar (HashMap (Peer L4Proto) (TBQueue ByteString))
, _tcpProbe :: TVar AnyProbe
, _tcpOnClientStarted :: PeerAddr L4Proto -> Word64 -> IO () -- ^ Cient TCP connection succeed
}
@ -89,55 +85,39 @@ newMessagingTCP pa = liftIO do
<*> randomIO
<*> newTVarIO mempty
<*> newTVarIO mempty
<*> newTVarIO mempty
<*> newTVarIO mempty
<*> newTVarIO mempty
<*> newTVarIO mempty
<*> newTQueueIO
<*> newTVarIO mempty
<*> newTQueueIO
<*> newTVarIO 0
<*> newTVarIO 0
<*> newTBQueueIO outMessageQLen
<*> newTVarIO mempty
<*> newTVarIO (AnyProbe ())
<*> pure (\_ _ -> none) -- do nothing by default
instance Messaging MessagingTCP L4Proto ByteString where
sendTo bus (To p) (From _f) msg = liftIO do
let _own = view tcpOwnPeer bus
sendTo MessagingTCP{..} (To p) (From _f) msg = liftIO do
-- let _own = tcpOwnPeer
-- debug $ "!!!! FUCKING SEND TO" <+> pretty p
co' <- atomically $ readTVar (view tcpPeerConn bus) <&> HashMap.lookup p
queue <- atomically do
q' <- readTVar _tcpSent <&> HM.lookup p
-- debug $ "sendTo" <+> brackets (pretty own)
-- <+> pretty p
-- <+> braces (pretty co')
-- <+> pretty (LBS.length msg)
case q' of
Nothing -> do
writeTQueue _tcpConnDemand p
q <- newTBQueue outMessageQLen
modifyTVar _tcpSent (HM.insert p q)
pure q
maybe1 co' defer $ \co -> do
-- trace $ "writing to" <+> pretty co
q' <- atomically $ readTVar (view tcpConnQ bus) <&> HashMap.lookup co
maybe1 q' (warn $ "no queue for" <+> pretty co) $ \q -> do
atomically $ writeTQueue q (p, msg)
Just q -> pure q
where
defer = do
warn $ "defer" <+> pretty p
t <- getTimeCoarse
atomically $ modifyTVar (view tcpDefer bus) (HashMap.insertWith (<>) p [(t, msg)])
atomically $ writeTQueue (view tcpDeferEv bus) ()
receive bus _ = liftIO do
let q = view tcpRecv bus
ms <- atomically do
r <- readTQueue q
rs <- flushTQueue q
pure (r:rs)
forM ms $ \(p, msg) -> pure (From p, msg)
atomically $ writeTBQueueDropSTM 10 queue msg
-- debug $ "!!!! FUCKING SEND TO" <+> pretty p <+> "DONE"
receive MessagingTCP{..} _ = liftIO do
atomically do
s0 <- readTBQueue _tcpReceived
sx <- flushTBQueue _tcpReceived
pure $ fmap (over _1 From) ( s0 : sx )
connectionId :: Word32 -> Word32 -> Word64
connectionId a b = (fromIntegral hi `shiftL` 32) .|. fromIntegral low
@ -145,7 +125,6 @@ connectionId a b = (fromIntegral hi `shiftL` 32) .|. fromIntegral low
low = min a b
hi = max a b
data ConnType = Server | Client
deriving (Eq,Ord,Show,Generic)
@ -183,92 +162,97 @@ handshake Client env so = do
sendCookie env so
recvCookie env so
spawnConnection :: forall m . MonadIO m
=> ConnType
-> MessagingTCP
-> Socket
-> SockAddr
-> m ()
writeTBQueueDropSTM :: Integral n
=> n
-> TBQueue a
-> a
-> STM ()
writeTBQueueDropSTM inQLen newInQ bs = do
flip fix inQLen $ \more j -> do
when (j > 0) do
full <- isFullTBQueue newInQ
if not full then do
writeTBQueue newInQ bs
else do
void $ tryReadTBQueue newInQ
more (pred j)
spawnConnection tp env@MessagingTCP{..} so sa = liftIO do
flip runContT pure $ do
killCookie :: Int -> Maybe Int
killCookie = \case
1 -> Nothing
n -> Just (pred n)
runMessagingTCP :: forall m . MonadIO m => MessagingTCP -> m ()
runMessagingTCP env@MessagingTCP{..} = liftIO do
void $ flip runContT pure do
p1 <- ContT $ withAsync runClient
p2 <- ContT $ withAsync runServer
waitAnyCatchCancel [p1,p2]
-- waitAnyCatchCancel [p2]
-- waitAnyCatchCancel [p1]
where
runServer :: forall m . MonadIO m => m ()
runServer = do
own <- toPeerAddr $ view tcpOwnPeer env
let (L4Address _ (IPAddrPort (i,p))) = own
let myCookie = view tcpCookie env
let own = view tcpOwnPeer env
let newP = fromSockAddr @'TCP sa
theirCookie <- handshake tp env so
-- server
liftIO $ listen (Host (show i)) (show p) $ \(sock, sa) -> do
withFdSocket sock setCloseOnExecIfNeeded
debug $ "Listening on" <+> pretty sa
let connId = connectionId myCookie theirCookie
forever do
void $ acceptFork sock $ \(so, remote) -> void $ flip runContT pure $ callCC \exit -> do
liftIO $ withFdSocket so setCloseOnExecIfNeeded
debug $ "!!! GOT INCOMING CONNECTION FROM !!!"
<+> brackets (pretty own)
<+> brackets (pretty sa)
when (tp == Client && theirCookie /= myCookie) do
pa <- toPeerAddr newP
liftIO $ view tcpOnClientStarted env pa connId -- notify if we opened client tcp connection
cookie <- handshake Server env so
traceCmd own
( "spawnConnection "
<+> viaShow tp
<+> pretty myCookie
<+> pretty connId )
newP
when (cookie == myCookie) $ exit ()
debug $ "handshake" <+> viaShow tp
<+> brackets (pretty (view tcpOwnPeer env))
<+> pretty sa
<+> pretty theirCookie
<+> pretty connId
here <- atomically do
n <- readTVar _tcpPeerCookie <&> HM.member cookie
used <- atomically $ do
modifyTVar (view tcpConnUsed env) (HashMap.insertWith (+) connId 1)
readTVar (view tcpConnUsed env) <&> HashMap.findWithDefault 0 connId
unless n do
modifyTVar _tcpPeerCookie (HM.insertWith (+) cookie 1)
pure n
void $ ContT $ bracket (pure connId) cleanupConn
when here $ do
debug $ "SERVER : ALREADY CONNECTED" <+> pretty cookie <+> viaShow so
exit ()
debug $ "USED:" <+> viaShow tp <+> pretty own <+> pretty used
let newP = fromSockAddr @'TCP remote :: Peer L4Proto
-- when ( used <= 2 ) do
atomically $ modifyTVar (view tcpPeerConn env) (HashMap.insert newP connId)
-- FIXME: queue-size-hardcode
let inQLen = outMessageQLen
when (used == 1) do
newInQ <- liftIO $ newTBQueueIO inQLen
atomically $ modifyTVar _tcpSpawned succ
newOutQ <- do
atomically do
mbQ <- readTVar _tcpSent <&> HM.lookup newP
maybe (newTBQueue outMessageQLen) pure mbQ
q <- getWriteQueue connId
updatePeer connId newP
atomically do
modifyTVar _tcpSent (HM.insert newP newOutQ)
modifyTVar _tcpPeerConn (HM.insert newP (connectionId myCookie cookie))
debug $ "NEW PEER" <+> brackets (pretty own)
<+> pretty connId
<+> pretty newP
<+> parens ("used:" <+> pretty used)
wr <- ContT $ withAsync $ forever do
bs <- atomically $ readTBQueue newOutQ
rd <- ContT $ withAsync $ fix \next -> do
spx <- readFromSocket so 4 <&> LBS.toStrict
ssize <- readFromSocket so 4 <&> LBS.toStrict --- УУУ, фреейминг
let px = word32 spx -- & fromIntegral
let size = word32 ssize & fromIntegral
bs <- readFromSocket so size
memReqId newP px
pxes <- readTVarIO (view tcpPeerPx env)
let orig = fromMaybe (fromSockAddr @'TCP sa) (HashMap.lookup px pxes)
-- debug $ "RECEIVED" <+> pretty orig <+> pretty (LBS.length bs)
atomically $ writeTQueue (view tcpRecv env) (orig, bs)
next
wr <- ContT $ withAsync $ fix \next -> do
(rcpt, bs) <- atomically $ readTQueue q
pq <- makeReqId rcpt
-- FIXME: check-this!
let pq = myCookie -- randomIO
let qids = bytestring32 pq
let size = bytestring32 (fromIntegral $ LBS.length bs)
@ -277,258 +261,126 @@ spawnConnection tp env@MessagingTCP{..} so sa = liftIO do
<> bs -- payload
sendLazy so frame --(LBS.toStrict frame)
next
ContT $ bracket none $ \_ -> mapM cancel [rd,wr]
ContT $ bracket (pure connId) cleanupConn
ContT $ bracket none (const $ atomically $ modifyTVar _tcpSpawned pred)
rd <- ContT $ withAsync $ forever do
spx <- readFromSocket so 4 <&> LBS.toStrict
ssize <- readFromSocket so 4 <&> LBS.toStrict --- УУУ, фреейминг
let px = word32 spx -- & fromIntegral
let size = word32 ssize & fromIntegral
bs <- readFromSocket so size
-- debug $ "READ SHIT FROM SOCKET!" <+> pretty remote
atomically $ writeTBQueueDropSTM outMessageQLen _tcpReceived (newP, bs)
void $ ContT $ bracket none $ const do
debug $ "SHUTDOWN SOCKET AND SHIT" <+> pretty remote
cancel rd
cancel wr
shutdown so ShutdownBoth
atomically do
modifyTVar _tcpSent (HM.delete newP)
modifyTVar _tcpPeerCookie $ \m -> do
HM.update killCookie cookie m
void $ waitAnyCatchCancel [rd,wr]
-- gracefulClose so 1000
debug $ "spawnConnection exit" <+> pretty sa
where
memReqId newP px =
atomically $ modifyTVar (view tcpPeerXp env) (HashMap.insert newP px)
makeReqId rcpt = do
let pxes = view tcpPeerPx env
let xpes = view tcpPeerXp env
nq <- randomIO
atomically $ do
px <- readTVar xpes <&> HashMap.lookup rcpt
case px of
Just qq -> pure qq
Nothing -> do
modifyTVar pxes (HashMap.insert nq rcpt)
modifyTVar xpes (HashMap.insert rcpt nq)
pure nq
updatePeer connId newP = atomically $ do
modifyTVar (view tcpPeerConn env) (HashMap.insert newP connId)
modifyTVar (view tcpConnPeer env) (HashMap.insert connId newP)
getWriteQueue connId = atomically $ do
readTVar (view tcpConnQ env) >>= \x -> do
case HashMap.lookup connId x of
Just qq -> pure qq
Nothing -> do
newQ <- newTQueue
modifyTVar (view tcpConnQ env) (HashMap.insert connId newQ)
pure newQ
cleanupConn connId = atomically do
modifyTVar (view tcpConnUsed env) (HashMap.alter del connId)
used <- readTVar (view tcpConnUsed env) <&> HashMap.findWithDefault 0 connId
when (used == 0) do
p <- stateTVar (view tcpConnPeer env)
$ \x -> (HashMap.lookup connId x, HashMap.delete connId x)
maybe1 p none $ \pp ->
modifyTVar (view tcpPeerConn env) (HashMap.delete pp)
modifyTVar (view tcpConnQ env) (HashMap.delete connId)
where
del = \case
Nothing -> Nothing
Just n | n <= 1 -> Nothing
| otherwise -> Just (pred n)
connectPeerTCP :: MonadIO m
=> MessagingTCP
-> Peer L4Proto
-> m ()
connectPeerTCP env peer = liftIO do
pa <- toPeerAddr peer
let (L4Address _ (IPAddrPort (i,p))) = pa
here <- readTVarIO (view tcpPeerConn env) <&> HashMap.member peer
unless here do
case view tcpSOCKS5 env of
Nothing -> do
connect (show i) (show p) $ \(sock, remoteAddr) -> do
spawnConnection Client env sock remoteAddr
shutdown sock ShutdownBoth
Just socks5 -> do
let (L4Address _ (IPAddrPort (socks,socksp))) = socks5
connectSOCKS5 (show socks) (show socksp) (show i) (show p) $ \(sock, socksAddr, _) -> do
let (PeerL4{..}) = peer
debug $ "CONNECTED VIA SOCKS5" <+> pretty socksAddr <+> pretty pa
spawnConnection Client env sock _sockAddr
shutdown sock ShutdownBoth
-- FIXME: link-all-asyncs
fireTCP :: forall e m . (e ~ L4Proto, Pretty (PeerAddr e), IsPeerAddr e m, MonadUnliftIO m) => MessagingTCP -> Peer e -> m () -> m ()
fireTCP MessagingTCP{..} pip what = do
void $ do
pa <- toPeerAddr @e pip
now <- getTimeCoarse
fire <- atomically do
here <- readTVar _tcpConnWip <&> HashMap.member pip
unless here do
modifyTVar _tcpConnWip (HashMap.insert pip now)
pure (not here)
when fire do
debug $ "Fire TCP" <+> pretty pa
atomically (modifyTVar _tcpFired succ)
void $ async do
r <- U.try @_ @SomeException what
atomically (modifyTVar _tcpFired pred)
either U.throwIO dontHandle r
runMessagingTCP :: forall m . MonadIO m => MessagingTCP -> m ()
runMessagingTCP env@MessagingTCP{..} = liftIO do
void $ flip runContT pure do
runClient :: forall m . MonadIO m => m ()
runClient = do
own <- toPeerAddr $ view tcpOwnPeer env
let (L4Address _ (IPAddrPort (i,p))) = own
let myCookie = view tcpCookie env
let defs = view tcpDefer env
mon <- ContT $ withAsync $ forever do
pause @'Seconds 30
now <- getTimeCoarse
-- FIXME: time-hardcode-again
let expire = filter (\e -> (realToFrac (toNanoSecs (now - fst e)) / (1e9 :: Double)) < 30)
atomically $ modifyTVar defs
$ HashMap.mapMaybe
$ \es -> let rs = expire es
in case rs of
[] -> Nothing
xs -> Just xs
forever $ void $ runMaybeT do
-- client sockets
con <- ContT $ withAsync $ forever do
-- смотрим к кому надо
who <- atomically $ readTQueue _tcpConnDemand
whoAddr <- toPeerAddr who
let ev = view tcpDeferEv env
already <- atomically $ readTVar _tcpPeerConn <&> HM.member who
-- FIXME: wait-period-hardcode
void $ race (pause @'Seconds 0.25) (atomically $ readTQueue ev >> flushTQueue ev)
when already do
debug "SHIT? BUSYLOOP?"
mzero
dePips <- readTVarIO defs <&> HashMap.keys
-- FIXME: !!!
liftIO $ asyncLinked do
let (L4Address _ (IPAddrPort (ip,port))) = whoAddr
connect (show ip) (show port) $ \(so, remoteAddr) -> do
flip runContT pure $ callCC \exit -> do
debug $ "OPEN CLIENT CONNECTION" <+> pretty ip <+> pretty port <+> pretty remoteAddr
cookie <- handshake Client env so
let connId = connectionId cookie myCookie
forM_ dePips $ \pip -> void $ runMaybeT do
when (cookie == myCookie) $ exit ()
-- FIXME: make-sure-it-is-correct
already <- readTVarIO _tcpPeerXp <&> HashMap.member pip
here <- atomically do
n <- readTVar _tcpPeerCookie <&> HM.member cookie
guard (not already)
unless n do
modifyTVar _tcpPeerCookie (HM.insertWith (+) cookie 1)
msgs <- readTVarIO defs <&> HashMap.findWithDefault mempty pip
modifyTVar _tcpPeerConn (HM.insert who connId)
unless (L.null msgs) do
trace $ "DEFERRED FOR" <+> pretty pip <+> pretty (length msgs)
pure n
let len = length msgs
-- TODO: handshake notification
liftIO $ _tcpOnClientStarted whoAddr connId
when (len > 10) do
-- FIXME: deferred-message-hardcoded
atomically $ modifyTVar defs (HashMap.adjust (L.drop (len - 10)) pip)
when here do
debug $ "CLIENT: ALREADY CONNECTED" <+> pretty cookie <+> pretty ip <+> pretty port
exit ()
co' <- atomically $ readTVar (view tcpPeerConn env) <&> HashMap.lookup pip
atomically $ modifyTVar _tcpPeerCookie (HM.insertWith (+) cookie 1)
when (isNothing co') do
trace $ red "No session for" <+> pretty pip
wr <- ContT $ withAsync $ forever do
bss <- atomically do
q' <- readTVar _tcpSent <&> HM.lookup who
maybe1 q' mempty $ \q -> do
s <- readTBQueue q
sx <- flushTBQueue q
pure (s:sx)
lift $ maybe1 co' (void $ fireTCP env pip (connectPeerTCP env pip)) $ \co -> do
q' <- atomically $ readTVar (view tcpConnQ env) <&> HashMap.lookup co
maybe1 q' none $ \q -> do
for_ bss $ \bs -> do
-- FIXME: check-this!
let pq = myCookie -- randomIO
let qids = bytestring32 pq
let size = bytestring32 (fromIntegral $ LBS.length bs)
let frame = LBS.fromStrict qids
<> LBS.fromStrict size -- req-size
<> bs -- payload
sendLazy so frame --(LBS.toStrict frame)
void $ ContT $ bracket none (const $ cancel wr)
void $ ContT $ bracket none $ const $ do
atomically do
mss <- readTVar defs <&> HashMap.findWithDefault mempty pip
modifyTVar defs $ HashMap.delete pip
forM_ mss $ \m -> writeTQueue q (pip, snd m)
stat <- ContT $ withAsync $ forever do
pause @'Seconds 120
ps <- readTVarIO $ view tcpConnPeer env
let peers = HashMap.toList ps
forM_ peers $ \(c,pip) -> do
used <- readTVarIO (view tcpConnUsed env) <&> HashMap.findWithDefault 0 c
trace $ "peer" <+> brackets (pretty own)
<+> pretty pip
<+> pretty c
<+> parens ("used:" <+> pretty used)
cleanup <- ContT $ withAsync $ forever do
pause @Seconds 60
now <- getTimeCoarse
connWip <- readTVarIO _tcpConnWip <&> HashMap.toList
let connAlive = [ (k,v) | (k,v) <- connWip, not (expired (TimeoutSec 60) (now - v)) ]
atomically $ writeTVar _tcpConnWip (HashMap.fromList connAlive)
mapM_ link [mon,con,stat,cleanup]
probes <- ContT $ withAsync $ forever do
pause @'Seconds 10
probe <- readTVarIO _tcpProbe
acceptReport probe =<< S.toList_ do
S.yield =<< atomically (readTVar _tcpConnPeer <&> ("tcpConnPeer",) . fromIntegral . HashMap.size)
S.yield =<< atomically (readTVar _tcpPeerConn <&> ("tcpPeerConn",) . fromIntegral . HashMap.size)
S.yield =<< atomically (readTVar _tcpConnUsed <&> ("tcpConnUsed",) . fromIntegral . HashMap.size)
S.yield =<< atomically (readTVar _tcpConnQ <&> ("tcpConnQ",) . fromIntegral . HashMap.size)
S.yield =<< atomically (readTVar _tcpPeerPx <&> ("tcpPeerPx",) . fromIntegral . HashMap.size)
S.yield =<< atomically (readTVar _tcpPeerXp <&> ("tcpPeerXp",) . fromIntegral . HashMap.size)
S.yield =<< atomically (readTVar _tcpDefer <&> ("tcpPeerDefer",) . fromIntegral . HashMap.size)
S.yield =<< atomically (readTVar _tcpSpawned <&> ("tcpSpawned",) . fromIntegral)
S.yield =<< atomically (readTVar _tcpFired <&> ("tcpFired",) . fromIntegral)
S.yield =<< atomically (readTVar _tcpConnWip <&> ("tcpConnWip",) . fromIntegral . HashMap.size)
ContT $ bracket (pure ()) $ \_ -> mapM_ cancel [mon,con,stat,probes,cleanup]
liftIO $ listen (Host (show i)) (show p) $ \(sock, sa) -> do
withFdSocket sock setCloseOnExecIfNeeded
debug $ "Listening on" <+> pretty sa
modifyTVar _tcpPeerConn (HM.delete who)
modifyTVar _tcpPeerCookie $ \m -> do
HM.update killCookie cookie m
forever do
void $ acceptFork sock $ \(so, remote) -> do
withFdSocket so setCloseOnExecIfNeeded
trace $ "GOT INCOMING CONNECTION FROM"
<+> brackets (pretty own)
<+> brackets (pretty sa)
<+> pretty remote
void $ try @SomeException $ do
spx <- readFromSocket so 4 <&> LBS.toStrict
ssize <- readFromSocket so 4 <&> LBS.toStrict --- УУУ, фреейминг
let px = word32 spx -- & fromIntegral
let size = word32 ssize & fromIntegral
spawnConnection Server env so remote
bs <- readFromSocket so size
-- gracefulClose so 1000
-- debug $ "READ SHIT FROM CLIENT SOCKET!" <+> pretty remoteAddr
-- TODO: probably-cleanup-peer
-- TODO: periodically-drop-inactive-connections
debug $ "CLOSING CONNECTION" <+> pretty remote
shutdown so ShutdownBoth
close so -- ) -- `U.finally` mapM_ cancel [mon,con,stat]
atomically $ writeTBQueueDropSTM 10 _tcpReceived (who, bs)
traceCmd :: forall a ann b m . ( Pretty a
, Pretty b
, MonadIO m
)
=> a -> Doc ann -> b -> m ()
traceCmd p1 s p2 = do
trace $ brackets (pretty p1)
<+> s
<+> parens (pretty p2)