From c6af08d920ac716e3284dcb6105248ad142ef791 Mon Sep 17 00:00:00 2001 From: Snail <> Date: Wed, 30 Oct 2024 22:42:02 +0400 Subject: [PATCH] peermain probes --- hbs2-core/lib/HBS2/Actors/Peer.hs | 34 +++++++++++++++++++++++++++++++ hbs2-peer/app/PeerMain.hs | 16 +++++++++++++++ 2 files changed, 50 insertions(+) diff --git a/hbs2-core/lib/HBS2/Actors/Peer.hs b/hbs2-core/lib/HBS2/Actors/Peer.hs index 6efe8dcd..670762a3 100644 --- a/hbs2-core/lib/HBS2/Actors/Peer.hs +++ b/hbs2-core/lib/HBS2/Actors/Peer.hs @@ -28,6 +28,7 @@ import Control.Monad import Control.Monad.Trans.Maybe import Control.Concurrent.Async import Control.Monad.Reader +import Control.Monad.Writer.CPS qualified as CPS import Data.ByteString.Lazy (ByteString) import Data.Cache (Cache) import Data.Cache qualified as Cache @@ -41,6 +42,8 @@ import Data.HashMap.Strict qualified as HashMap import Control.Concurrent.STM.TVar import Control.Concurrent.STM import Control.Monad.IO.Unlift +import Data.List qualified as L +import Data.Monoid qualified as Monoid import Codec.Serialise (serialise, deserialiseOrFail) @@ -127,6 +130,7 @@ data PeerEnv e = , _envSweepers :: TVar (HashMap SKey [PeerM e IO ()]) , _envReqMsgLimit :: Cache (Peer e, Integer, Encoded e) () , _envReqProtoLimit :: Cache (Peer e, Integer) () + , _envProbe :: TVar AnyProbe } newtype PeerM e m a = PeerM { fromPeerM :: ReaderT (PeerEnv e) m a } @@ -388,8 +392,38 @@ newPeerEnv pl s bus p = do _envSweepers <- liftIO (newTVarIO mempty) _envReqMsgLimit <- liftIO (Cache.newCache (Just defRequestLimit)) _envReqProtoLimit <- liftIO (Cache.newCache (Just defRequestLimit)) + _envProbe <- liftIO (newTVarIO (AnyProbe ())) pure PeerEnv {..} +peerEnvSetProbe :: (MonadIO m) => PeerEnv e -> AnyProbe -> m () +peerEnvSetProbe PeerEnv {..} p = liftIO $ atomically $ writeTVar _envProbe p + +peerEnvCollectProbes :: (MonadIO m) => PeerEnv e -> m () +peerEnvCollectProbes PeerEnv {..} = do + probe <- liftIO $ readTVarIO _envProbe + acceptReport probe =<< CPS.execWriterT do + -- _envDeferred :: Pipeline IO () + + item "sessions" =<< (liftIO . Cache.size) _envSessions + + events <- liftReadTVar _envEvents + item "events-keys" $ HashMap.size events + item "events-values-all" $ calcValuesLengthTotal events + + item "expire-times" =<< (liftIO . Cache.size) _envExpireTimes + + sweepers <- liftReadTVar _envSweepers + item "sweepers-keys" $ HashMap.size sweepers + item "sweepers-values-all" $ calcValuesLengthTotal sweepers + + item "req-msg-limit" =<< (liftIO . Cache.size) _envReqMsgLimit + item "req-proto-limit" =<< (liftIO . Cache.size) _envReqProtoLimit + + where + calcValuesLengthTotal = Monoid.getSum . foldMap (Monoid.Sum . L.length) + liftReadTVar = liftIO . atomically . readTVar + item k v = CPS.tell [(k, fromIntegral v)] + runPeerM :: forall e m . ( MonadIO m , HasPeer e , Ord (Peer e) diff --git a/hbs2-peer/app/PeerMain.hs b/hbs2-peer/app/PeerMain.hs index ffdd90d6..74f07e94 100644 --- a/hbs2-peer/app/PeerMain.hs +++ b/hbs2-peer/app/PeerMain.hs @@ -901,6 +901,13 @@ runPeer opts = respawnOnError opts $ runResourceT do (view peerSignSk pc) penv <- newPeerEnv pl (AnyStorage s) (Fabriq byPass) peerSelf + do + probe <- newSimpleProbe "PeerEnv_Main" + addProbe probe + peerEnvSetProbe penv probe + probesPenv <- liftIO $ async $ forever do + pause @'Seconds 10 + peerEnvCollectProbes penv proxyThread <- async $ runDispatchProxy proxy @@ -1225,6 +1232,13 @@ runPeer opts = respawnOnError opts $ runResourceT do lift $ runResponseM me $ refChanNotifyProto @e True refChanAdapter (R.Notify @e puk box) menv <- newPeerEnv pl (AnyStorage s) (Fabriq mcast) (getOwnPeer mcast) + do + probe <- newSimpleProbe "PeerEnv_Announce" + addProbe probe + peerEnvSetProbe menv probe + probesMenv <- liftIO $ async $ forever do + pause @'Seconds 10 + peerEnvCollectProbes menv ann <- liftIO $ async $ runPeerM menv $ do @@ -1307,8 +1321,10 @@ runPeer opts = respawnOnError opts $ runResourceT do void $ waitAnyCancel $ w <> [ loop , m1 , rpcProto + , probesMenv , ann , messMcast + , probesPenv , proxyThread , brainsThread , messWatchDog