mirror of https://github.com/voidlizard/hbs2
TCP probe
This commit is contained in:
parent
acf89cd749
commit
0f7adb9b24
|
@ -8,6 +8,7 @@ module HBS2.Net.Messaging.TCP
|
|||
, tcpPeerConn
|
||||
, tcpCookie
|
||||
, tcpOnClientStarted
|
||||
, messagingTCPSetProbe
|
||||
) where
|
||||
|
||||
import HBS2.Clock
|
||||
|
@ -43,7 +44,7 @@ import UnliftIO.Async
|
|||
import UnliftIO.STM
|
||||
import UnliftIO.Exception qualified as U
|
||||
|
||||
|
||||
{-HLINT ignore "Functor law"-}
|
||||
|
||||
-- FIXME: control-recv-capacity-to-avoid-leaks
|
||||
|
||||
|
@ -62,11 +63,15 @@ data MessagingTCP =
|
|||
, _tcpRecv :: TQueue (Peer L4Proto, ByteString)
|
||||
, _tcpDefer :: TVar (HashMap (Peer L4Proto) [(TimeSpec, ByteString)])
|
||||
, _tcpDeferEv :: TQueue ()
|
||||
, _tcpProbe :: TVar AnyProbe
|
||||
, _tcpOnClientStarted :: PeerAddr L4Proto -> Word64 -> IO () -- ^ Cient TCP connection succeed
|
||||
}
|
||||
|
||||
makeLenses 'MessagingTCP
|
||||
|
||||
messagingTCPSetProbe :: MonadIO m => MessagingTCP -> AnyProbe -> m ()
|
||||
messagingTCPSetProbe MessagingTCP{..} p = atomically $ writeTVar _tcpProbe p
|
||||
|
||||
newMessagingTCP :: ( MonadIO m
|
||||
, FromSockAddr 'TCP (Peer L4Proto)
|
||||
)
|
||||
|
@ -86,6 +91,7 @@ newMessagingTCP pa = liftIO do
|
|||
<*> newTQueueIO
|
||||
<*> newTVarIO mempty
|
||||
<*> newTQueueIO
|
||||
<*> newTVarIO (AnyProbe ())
|
||||
<*> pure (\_ _ -> none) -- do nothing by default
|
||||
|
||||
instance Messaging MessagingTCP L4Proto ByteString where
|
||||
|
@ -358,75 +364,93 @@ connectPeerTCP env peer = liftIO do
|
|||
-- FIXME: link-all-asyncs
|
||||
|
||||
runMessagingTCP :: forall m . MonadIO m => MessagingTCP -> m ()
|
||||
runMessagingTCP env = liftIO do
|
||||
runMessagingTCP env@MessagingTCP{..} = liftIO do
|
||||
|
||||
own <- toPeerAddr $ view tcpOwnPeer env
|
||||
let (L4Address _ (IPAddrPort (i,p))) = own
|
||||
void $ flip runContT pure do
|
||||
|
||||
let defs = view tcpDefer env
|
||||
own <- toPeerAddr $ view tcpOwnPeer env
|
||||
let (L4Address _ (IPAddrPort (i,p))) = own
|
||||
|
||||
mon <- async $ forever do
|
||||
pause @'Seconds 30
|
||||
now <- getTimeCoarse
|
||||
let defs = view tcpDefer env
|
||||
|
||||
-- FIXME: time-hardcode-again
|
||||
let expire = filter (\e -> (realToFrac (toNanoSecs (now - fst e)) / (1e9 :: Double)) < 30)
|
||||
atomically $ modifyTVar defs
|
||||
$ HashMap.mapMaybe
|
||||
$ \es -> let rs = expire es
|
||||
in case rs of
|
||||
[] -> Nothing
|
||||
xs -> Just xs
|
||||
mon <- ContT $ withAsync $ forever do
|
||||
pause @'Seconds 30
|
||||
now <- getTimeCoarse
|
||||
|
||||
con <- async $ forever do
|
||||
-- FIXME: time-hardcode-again
|
||||
let expire = filter (\e -> (realToFrac (toNanoSecs (now - fst e)) / (1e9 :: Double)) < 30)
|
||||
atomically $ modifyTVar defs
|
||||
$ HashMap.mapMaybe
|
||||
$ \es -> let rs = expire es
|
||||
in case rs of
|
||||
[] -> Nothing
|
||||
xs -> Just xs
|
||||
|
||||
let ev = view tcpDeferEv env
|
||||
con <- ContT $ withAsync $ forever do
|
||||
|
||||
-- FIXME: wait-period-hardcode
|
||||
void $ race (pause @'Seconds 0.25) (atomically $ readTQueue ev >> flushTQueue ev)
|
||||
let ev = view tcpDeferEv env
|
||||
|
||||
dePips <- readTVarIO defs <&> HashMap.keys
|
||||
-- FIXME: wait-period-hardcode
|
||||
void $ race (pause @'Seconds 0.25) (atomically $ readTQueue ev >> flushTQueue ev)
|
||||
|
||||
dePips <- readTVarIO defs <&> HashMap.keys
|
||||
|
||||
|
||||
forM_ dePips $ \pip -> do
|
||||
msgs <- readTVarIO defs <&> HashMap.findWithDefault mempty pip
|
||||
forM_ dePips $ \pip -> do
|
||||
msgs <- readTVarIO defs <&> HashMap.findWithDefault mempty pip
|
||||
|
||||
unless (L.null msgs) do
|
||||
trace $ "DEFERRED FOR" <+> pretty pip <+> pretty (length msgs)
|
||||
unless (L.null msgs) do
|
||||
trace $ "DEFERRED FOR" <+> pretty pip <+> pretty (length msgs)
|
||||
|
||||
let len = length msgs
|
||||
let len = length msgs
|
||||
|
||||
when (len > 10) do
|
||||
-- FIXME: deferred-message-hardcoded
|
||||
atomically $ modifyTVar defs (HashMap.adjust (L.drop (len - 10)) pip)
|
||||
when (len > 10) do
|
||||
-- FIXME: deferred-message-hardcoded
|
||||
atomically $ modifyTVar defs (HashMap.adjust (L.drop (len - 10)) pip)
|
||||
|
||||
co' <- atomically $ readTVar (view tcpPeerConn env) <&> HashMap.lookup pip
|
||||
co' <- atomically $ readTVar (view tcpPeerConn env) <&> HashMap.lookup pip
|
||||
|
||||
maybe1 co' (void $ async (connectPeerTCP env pip)) $ \co -> do
|
||||
q' <- atomically $ readTVar (view tcpConnQ env) <&> HashMap.lookup co
|
||||
maybe1 q' none $ \q -> do
|
||||
atomically do
|
||||
mss <- readTVar defs <&> HashMap.findWithDefault mempty pip
|
||||
modifyTVar defs $ HashMap.delete pip
|
||||
forM_ mss $ \m -> writeTQueue q (pip, snd m)
|
||||
maybe1 co' (void $ async (connectPeerTCP env pip)) $ \co -> do
|
||||
q' <- atomically $ readTVar (view tcpConnQ env) <&> HashMap.lookup co
|
||||
maybe1 q' none $ \q -> do
|
||||
atomically do
|
||||
mss <- readTVar defs <&> HashMap.findWithDefault mempty pip
|
||||
modifyTVar defs $ HashMap.delete pip
|
||||
forM_ mss $ \m -> writeTQueue q (pip, snd m)
|
||||
|
||||
pure ()
|
||||
pure ()
|
||||
|
||||
stat <- async $ forever do
|
||||
pause @'Seconds 120
|
||||
ps <- readTVarIO $ view tcpConnPeer env
|
||||
let peers = HashMap.toList ps
|
||||
forM_ peers $ \(c,pip) -> do
|
||||
used <- readTVarIO (view tcpConnUsed env) <&> HashMap.findWithDefault 0 c
|
||||
trace $ "peer" <+> brackets (pretty own)
|
||||
<+> pretty pip
|
||||
<+> pretty c
|
||||
<+> parens ("used:" <+> pretty used)
|
||||
stat <- ContT $ withAsync $ forever do
|
||||
pause @'Seconds 120
|
||||
ps <- readTVarIO $ view tcpConnPeer env
|
||||
let peers = HashMap.toList ps
|
||||
forM_ peers $ \(c,pip) -> do
|
||||
used <- readTVarIO (view tcpConnUsed env) <&> HashMap.findWithDefault 0 c
|
||||
trace $ "peer" <+> brackets (pretty own)
|
||||
<+> pretty pip
|
||||
<+> pretty c
|
||||
<+> parens ("used:" <+> pretty used)
|
||||
|
||||
mapM_ link [mon,con,stat]
|
||||
mapM_ link [mon,con,stat]
|
||||
|
||||
liftIO (
|
||||
listen (Host (show i)) (show p) $ \(sock, sa) -> do
|
||||
probes <- ContT $ withAsync $ forever do
|
||||
pause @'Seconds 10
|
||||
probe <- readTVarIO _tcpProbe
|
||||
values <- atomically do
|
||||
a <- readTVar _tcpConnPeer <&> HashMap.size <&> (L.singleton . ("tcpConnPeer",))
|
||||
b <- readTVar _tcpPeerConn <&> HashMap.size <&> (L.singleton . ("tcpPeerConn",))
|
||||
c <- readTVar _tcpConnUsed <&> HashMap.size <&> (L.singleton . ("tcpConnUsed",))
|
||||
d <- readTVar _tcpConnQ <&> HashMap.size <&> (L.singleton . ("tcpConnQ",))
|
||||
e <- readTVar _tcpPeerPx <&> HashMap.size <&> (L.singleton . ("tcpPeerPx",))
|
||||
f <- readTVar _tcpPeerXp <&> HashMap.size <&> (L.singleton . ("tcpPeerXp",))
|
||||
g <- readTVar _tcpDefer <&> HashMap.size <&> (L.singleton . ("tcpPeerDefer",))
|
||||
pure $ mconcat [a, b, c, d, e, f, g]
|
||||
acceptReport probe (fmap (over _2 fromIntegral) values)
|
||||
|
||||
|
||||
ContT $ bracket (pure ()) $ \_ -> mapM_ cancel [mon,con,stat,probes]
|
||||
|
||||
liftIO $ listen (Host (show i)) (show p) $ \(sock, sa) -> do
|
||||
withFdSocket sock setCloseOnExecIfNeeded
|
||||
debug $ "Listening on" <+> pretty sa
|
||||
|
||||
|
@ -449,7 +473,7 @@ runMessagingTCP env = liftIO do
|
|||
|
||||
debug $ "CLOSING CONNECTION" <+> pretty remote
|
||||
shutdown so ShutdownBoth
|
||||
close so ) `U.finally` mapM_ cancel [mon,con,stat]
|
||||
close so -- ) -- `U.finally` mapM_ cancel [mon,con,stat]
|
||||
|
||||
|
||||
traceCmd :: forall a ann b m . ( Pretty a
|
||||
|
|
|
@ -857,6 +857,11 @@ runPeer opts = respawnOnError opts $ runResourceT do
|
|||
<&> set tcpOnClientStarted (onClientTCPConnected brains)
|
||||
<&> set tcpSOCKS5 socks5
|
||||
|
||||
lift do
|
||||
tcpProbe <- newSimpleProbe "MessagingTCP"
|
||||
addProbe tcpProbe
|
||||
messagingTCPSetProbe tcpEnv tcpProbe
|
||||
|
||||
void $ liftIO ( async do
|
||||
runMessagingTCP tcpEnv
|
||||
`U.withException` \(e :: SomeException) -> do
|
||||
|
@ -909,6 +914,10 @@ runPeer opts = respawnOnError opts $ runResourceT do
|
|||
|
||||
rce <- refChanWorkerEnv conf penv denv refChanNotifySource
|
||||
|
||||
rcwProbe <- newSimpleProbe "RefChanWorker"
|
||||
addProbe rcwProbe
|
||||
refChanWorkerEnvSetProbe rce rcwProbe
|
||||
|
||||
let refChanAdapter =
|
||||
RefChanAdapter
|
||||
{ refChanOnHead = refChanOnHeadFn rce
|
||||
|
|
|
@ -12,6 +12,7 @@ module RefChan (
|
|||
, runRefChanRelyWorker
|
||||
, refChanWorkerEnv
|
||||
, refChanNotifyOnUpdated
|
||||
, refChanWorkerEnvSetProbe
|
||||
) where
|
||||
|
||||
import HBS2.Prelude.Plated
|
||||
|
@ -105,10 +106,18 @@ data RefChanWorkerEnv e =
|
|||
, _refChanWorkerNotifiersInbox :: TQueue (RefChanNotify e) -- ^ to rely messages from clients to gossip
|
||||
, _refChanWorkerNotifiersDone :: Cache (Hash HbSync) ()
|
||||
, _refChanWorkerLocalRelyDone :: Cache (Peer UNIX, Hash HbSync) ()
|
||||
, _refChanWorkerProbe :: TVar AnyProbe
|
||||
}
|
||||
|
||||
makeLenses 'RefChanWorkerEnv
|
||||
|
||||
refChanWorkerEnvSetProbe :: forall m e . (MonadIO m, ForRefChans e)
|
||||
=> RefChanWorkerEnv e
|
||||
-> AnyProbe
|
||||
-> m ()
|
||||
refChanWorkerEnvSetProbe RefChanWorkerEnv{..} probe = do
|
||||
liftIO $ atomically $ writeTVar _refChanWorkerProbe probe
|
||||
|
||||
refChanWorkerEnv :: forall m e . (MonadIO m, ForRefChans e)
|
||||
=> PeerConfig
|
||||
-> PeerEnv e
|
||||
|
@ -127,6 +136,8 @@ refChanWorkerEnv conf pe de nsource =
|
|||
<*> newTQueueIO
|
||||
<*> Cache.newCache (Just defRequestLimit)
|
||||
<*> Cache.newCache (Just defRequestLimit)
|
||||
<*> newTVarIO (AnyProbe ())
|
||||
|
||||
|
||||
refChanOnHeadFn :: forall e m . (ForRefChans e, MonadIO m) => RefChanWorkerEnv e -> RefChanId e -> RefChanHeadBlockTran e -> m ()
|
||||
refChanOnHeadFn env chan tran = do
|
||||
|
@ -595,6 +606,15 @@ refChanWorker env@RefChanWorkerEnv{..} brains = do
|
|||
|
||||
bullshit <- ContT $ withAsync $ forever do
|
||||
pause @'Seconds 10
|
||||
probe <- readTVarIO _refChanWorkerProbe
|
||||
values <- atomically do
|
||||
refChanWorkerEnvDownloadSize <- readTVar _refChanWorkerEnvDownload <&> HashMap.size
|
||||
refChanWorkerNotifiersSize <- readTVar _refChanWorkerNotifiers <&> HashMap.size
|
||||
pure [ ("refChanWorkerEnvDownloadSize", fromIntegral refChanWorkerEnvDownloadSize)
|
||||
, ("refChanWorkerNotifiersSize", fromIntegral refChanWorkerNotifiersSize)
|
||||
]
|
||||
|
||||
acceptReport probe values
|
||||
debug "I'm refchan worker"
|
||||
|
||||
waitAnyCatchCancel [hw,downloads,polls,wtrans,merge,cleanup1,bullshit]
|
||||
|
|
Loading…
Reference in New Issue