diff --git a/hbs2-core/lib/HBS2/Net/Messaging/TCP.hs b/hbs2-core/lib/HBS2/Net/Messaging/TCP.hs index 1073721c..ef5a13ae 100644 --- a/hbs2-core/lib/HBS2/Net/Messaging/TCP.hs +++ b/hbs2-core/lib/HBS2/Net/Messaging/TCP.hs @@ -8,6 +8,7 @@ module HBS2.Net.Messaging.TCP , tcpPeerConn , tcpCookie , tcpOnClientStarted + , messagingTCPSetProbe ) where import HBS2.Clock @@ -43,7 +44,7 @@ import UnliftIO.Async import UnliftIO.STM import UnliftIO.Exception qualified as U - +{-HLINT ignore "Functor law"-} -- FIXME: control-recv-capacity-to-avoid-leaks @@ -62,11 +63,15 @@ data MessagingTCP = , _tcpRecv :: TQueue (Peer L4Proto, ByteString) , _tcpDefer :: TVar (HashMap (Peer L4Proto) [(TimeSpec, ByteString)]) , _tcpDeferEv :: TQueue () + , _tcpProbe :: TVar AnyProbe , _tcpOnClientStarted :: PeerAddr L4Proto -> Word64 -> IO () -- ^ Cient TCP connection succeed } makeLenses 'MessagingTCP +messagingTCPSetProbe :: MonadIO m => MessagingTCP -> AnyProbe -> m () +messagingTCPSetProbe MessagingTCP{..} p = atomically $ writeTVar _tcpProbe p + newMessagingTCP :: ( MonadIO m , FromSockAddr 'TCP (Peer L4Proto) ) @@ -86,6 +91,7 @@ newMessagingTCP pa = liftIO do <*> newTQueueIO <*> newTVarIO mempty <*> newTQueueIO + <*> newTVarIO (AnyProbe ()) <*> pure (\_ _ -> none) -- do nothing by default instance Messaging MessagingTCP L4Proto ByteString where @@ -358,75 +364,93 @@ connectPeerTCP env peer = liftIO do -- FIXME: link-all-asyncs runMessagingTCP :: forall m . MonadIO m => MessagingTCP -> m () -runMessagingTCP env = liftIO do +runMessagingTCP env@MessagingTCP{..} = liftIO do - own <- toPeerAddr $ view tcpOwnPeer env - let (L4Address _ (IPAddrPort (i,p))) = own + void $ flip runContT pure do - let defs = view tcpDefer env + own <- toPeerAddr $ view tcpOwnPeer env + let (L4Address _ (IPAddrPort (i,p))) = own - mon <- async $ forever do - pause @'Seconds 30 - now <- getTimeCoarse + let defs = view tcpDefer env - -- FIXME: time-hardcode-again - let expire = filter (\e -> (realToFrac (toNanoSecs (now - fst e)) / (1e9 :: Double)) < 30) - atomically $ modifyTVar defs - $ HashMap.mapMaybe - $ \es -> let rs = expire es - in case rs of - [] -> Nothing - xs -> Just xs + mon <- ContT $ withAsync $ forever do + pause @'Seconds 30 + now <- getTimeCoarse - con <- async $ forever do + -- FIXME: time-hardcode-again + let expire = filter (\e -> (realToFrac (toNanoSecs (now - fst e)) / (1e9 :: Double)) < 30) + atomically $ modifyTVar defs + $ HashMap.mapMaybe + $ \es -> let rs = expire es + in case rs of + [] -> Nothing + xs -> Just xs - let ev = view tcpDeferEv env + con <- ContT $ withAsync $ forever do - -- FIXME: wait-period-hardcode - void $ race (pause @'Seconds 0.25) (atomically $ readTQueue ev >> flushTQueue ev) + let ev = view tcpDeferEv env - dePips <- readTVarIO defs <&> HashMap.keys + -- FIXME: wait-period-hardcode + void $ race (pause @'Seconds 0.25) (atomically $ readTQueue ev >> flushTQueue ev) + + dePips <- readTVarIO defs <&> HashMap.keys - forM_ dePips $ \pip -> do - msgs <- readTVarIO defs <&> HashMap.findWithDefault mempty pip + forM_ dePips $ \pip -> do + msgs <- readTVarIO defs <&> HashMap.findWithDefault mempty pip - unless (L.null msgs) do - trace $ "DEFERRED FOR" <+> pretty pip <+> pretty (length msgs) + unless (L.null msgs) do + trace $ "DEFERRED FOR" <+> pretty pip <+> pretty (length msgs) - let len = length msgs + let len = length msgs - when (len > 10) do - -- FIXME: deferred-message-hardcoded - atomically $ modifyTVar defs (HashMap.adjust (L.drop (len - 10)) pip) + when (len > 10) do + -- FIXME: deferred-message-hardcoded + atomically $ modifyTVar defs (HashMap.adjust (L.drop (len - 10)) pip) - co' <- atomically $ readTVar (view tcpPeerConn env) <&> HashMap.lookup pip + co' <- atomically $ readTVar (view tcpPeerConn env) <&> HashMap.lookup pip - maybe1 co' (void $ async (connectPeerTCP env pip)) $ \co -> do - q' <- atomically $ readTVar (view tcpConnQ env) <&> HashMap.lookup co - maybe1 q' none $ \q -> do - atomically do - mss <- readTVar defs <&> HashMap.findWithDefault mempty pip - modifyTVar defs $ HashMap.delete pip - forM_ mss $ \m -> writeTQueue q (pip, snd m) + maybe1 co' (void $ async (connectPeerTCP env pip)) $ \co -> do + q' <- atomically $ readTVar (view tcpConnQ env) <&> HashMap.lookup co + maybe1 q' none $ \q -> do + atomically do + mss <- readTVar defs <&> HashMap.findWithDefault mempty pip + modifyTVar defs $ HashMap.delete pip + forM_ mss $ \m -> writeTQueue q (pip, snd m) - pure () + pure () - stat <- async $ forever do - pause @'Seconds 120 - ps <- readTVarIO $ view tcpConnPeer env - let peers = HashMap.toList ps - forM_ peers $ \(c,pip) -> do - used <- readTVarIO (view tcpConnUsed env) <&> HashMap.findWithDefault 0 c - trace $ "peer" <+> brackets (pretty own) - <+> pretty pip - <+> pretty c - <+> parens ("used:" <+> pretty used) + stat <- ContT $ withAsync $ forever do + pause @'Seconds 120 + ps <- readTVarIO $ view tcpConnPeer env + let peers = HashMap.toList ps + forM_ peers $ \(c,pip) -> do + used <- readTVarIO (view tcpConnUsed env) <&> HashMap.findWithDefault 0 c + trace $ "peer" <+> brackets (pretty own) + <+> pretty pip + <+> pretty c + <+> parens ("used:" <+> pretty used) - mapM_ link [mon,con,stat] + mapM_ link [mon,con,stat] - liftIO ( - listen (Host (show i)) (show p) $ \(sock, sa) -> do + probes <- ContT $ withAsync $ forever do + pause @'Seconds 10 + probe <- readTVarIO _tcpProbe + values <- atomically do + a <- readTVar _tcpConnPeer <&> HashMap.size <&> (L.singleton . ("tcpConnPeer",)) + b <- readTVar _tcpPeerConn <&> HashMap.size <&> (L.singleton . ("tcpPeerConn",)) + c <- readTVar _tcpConnUsed <&> HashMap.size <&> (L.singleton . ("tcpConnUsed",)) + d <- readTVar _tcpConnQ <&> HashMap.size <&> (L.singleton . ("tcpConnQ",)) + e <- readTVar _tcpPeerPx <&> HashMap.size <&> (L.singleton . ("tcpPeerPx",)) + f <- readTVar _tcpPeerXp <&> HashMap.size <&> (L.singleton . ("tcpPeerXp",)) + g <- readTVar _tcpDefer <&> HashMap.size <&> (L.singleton . ("tcpPeerDefer",)) + pure $ mconcat [a, b, c, d, e, f, g] + acceptReport probe (fmap (over _2 fromIntegral) values) + + + ContT $ bracket (pure ()) $ \_ -> mapM_ cancel [mon,con,stat,probes] + + liftIO $ listen (Host (show i)) (show p) $ \(sock, sa) -> do withFdSocket sock setCloseOnExecIfNeeded debug $ "Listening on" <+> pretty sa @@ -449,7 +473,7 @@ runMessagingTCP env = liftIO do debug $ "CLOSING CONNECTION" <+> pretty remote shutdown so ShutdownBoth - close so ) `U.finally` mapM_ cancel [mon,con,stat] + close so -- ) -- `U.finally` mapM_ cancel [mon,con,stat] traceCmd :: forall a ann b m . ( Pretty a diff --git a/hbs2-peer/app/PeerMain.hs b/hbs2-peer/app/PeerMain.hs index 8cbad2a9..d766f2c0 100644 --- a/hbs2-peer/app/PeerMain.hs +++ b/hbs2-peer/app/PeerMain.hs @@ -844,6 +844,11 @@ runPeer opts = Exception.handle (\e -> myException e <&> set tcpOnClientStarted (onClientTCPConnected brains) <&> set tcpSOCKS5 socks5 + lift do + tcpProbe <- newSimpleProbe "MessagingTCP" + addProbe tcpProbe + messagingTCPSetProbe tcpEnv tcpProbe + void $ liftIO ( async do runMessagingTCP tcpEnv `U.withException` \(e :: SomeException) -> do @@ -896,6 +901,10 @@ runPeer opts = Exception.handle (\e -> myException e rce <- refChanWorkerEnv conf penv denv refChanNotifySource + rcwProbe <- newSimpleProbe "RefChanWorker" + addProbe rcwProbe + refChanWorkerEnvSetProbe rce rcwProbe + let refChanAdapter = RefChanAdapter { refChanOnHead = refChanOnHeadFn rce diff --git a/hbs2-peer/app/RefChan.hs b/hbs2-peer/app/RefChan.hs index d0ecbc5c..0169b1c9 100644 --- a/hbs2-peer/app/RefChan.hs +++ b/hbs2-peer/app/RefChan.hs @@ -12,6 +12,7 @@ module RefChan ( , runRefChanRelyWorker , refChanWorkerEnv , refChanNotifyOnUpdated + , refChanWorkerEnvSetProbe ) where import HBS2.Prelude.Plated @@ -105,10 +106,18 @@ data RefChanWorkerEnv e = , _refChanWorkerNotifiersInbox :: TQueue (RefChanNotify e) -- ^ to rely messages from clients to gossip , _refChanWorkerNotifiersDone :: Cache (Hash HbSync) () , _refChanWorkerLocalRelyDone :: Cache (Peer UNIX, Hash HbSync) () + , _refChanWorkerProbe :: TVar AnyProbe } makeLenses 'RefChanWorkerEnv +refChanWorkerEnvSetProbe :: forall m e . (MonadIO m, ForRefChans e) + => RefChanWorkerEnv e + -> AnyProbe + -> m () +refChanWorkerEnvSetProbe RefChanWorkerEnv{..} probe = do + liftIO $ atomically $ writeTVar _refChanWorkerProbe probe + refChanWorkerEnv :: forall m e . (MonadIO m, ForRefChans e) => PeerConfig -> PeerEnv e @@ -127,6 +136,8 @@ refChanWorkerEnv conf pe de nsource = <*> newTQueueIO <*> Cache.newCache (Just defRequestLimit) <*> Cache.newCache (Just defRequestLimit) + <*> newTVarIO (AnyProbe ()) + refChanOnHeadFn :: forall e m . (ForRefChans e, MonadIO m) => RefChanWorkerEnv e -> RefChanId e -> RefChanHeadBlockTran e -> m () refChanOnHeadFn env chan tran = do @@ -595,6 +606,15 @@ refChanWorker env@RefChanWorkerEnv{..} brains = do bullshit <- ContT $ withAsync $ forever do pause @'Seconds 10 + probe <- readTVarIO _refChanWorkerProbe + values <- atomically do + refChanWorkerEnvDownloadSize <- readTVar _refChanWorkerEnvDownload <&> HashMap.size + refChanWorkerNotifiersSize <- readTVar _refChanWorkerNotifiers <&> HashMap.size + pure [ ("refChanWorkerEnvDownloadSize", fromIntegral refChanWorkerEnvDownloadSize) + , ("refChanWorkerNotifiersSize", fromIntegral refChanWorkerNotifiersSize) + ] + + acceptReport probe values debug "I'm refchan worker" waitAnyCatchCancel [hw,downloads,polls,wtrans,merge,cleanup1,bullshit]