diff --git a/hbs2-core/lib/HBS2/Actors/Peer.hs b/hbs2-core/lib/HBS2/Actors/Peer.hs index 6efe8dcd..670762a3 100644 --- a/hbs2-core/lib/HBS2/Actors/Peer.hs +++ b/hbs2-core/lib/HBS2/Actors/Peer.hs @@ -28,6 +28,7 @@ import Control.Monad import Control.Monad.Trans.Maybe import Control.Concurrent.Async import Control.Monad.Reader +import Control.Monad.Writer.CPS qualified as CPS import Data.ByteString.Lazy (ByteString) import Data.Cache (Cache) import Data.Cache qualified as Cache @@ -41,6 +42,8 @@ import Data.HashMap.Strict qualified as HashMap import Control.Concurrent.STM.TVar import Control.Concurrent.STM import Control.Monad.IO.Unlift +import Data.List qualified as L +import Data.Monoid qualified as Monoid import Codec.Serialise (serialise, deserialiseOrFail) @@ -127,6 +130,7 @@ data PeerEnv e = , _envSweepers :: TVar (HashMap SKey [PeerM e IO ()]) , _envReqMsgLimit :: Cache (Peer e, Integer, Encoded e) () , _envReqProtoLimit :: Cache (Peer e, Integer) () + , _envProbe :: TVar AnyProbe } newtype PeerM e m a = PeerM { fromPeerM :: ReaderT (PeerEnv e) m a } @@ -388,8 +392,38 @@ newPeerEnv pl s bus p = do _envSweepers <- liftIO (newTVarIO mempty) _envReqMsgLimit <- liftIO (Cache.newCache (Just defRequestLimit)) _envReqProtoLimit <- liftIO (Cache.newCache (Just defRequestLimit)) + _envProbe <- liftIO (newTVarIO (AnyProbe ())) pure PeerEnv {..} +peerEnvSetProbe :: (MonadIO m) => PeerEnv e -> AnyProbe -> m () +peerEnvSetProbe PeerEnv {..} p = liftIO $ atomically $ writeTVar _envProbe p + +peerEnvCollectProbes :: (MonadIO m) => PeerEnv e -> m () +peerEnvCollectProbes PeerEnv {..} = do + probe <- liftIO $ readTVarIO _envProbe + acceptReport probe =<< CPS.execWriterT do + -- _envDeferred :: Pipeline IO () + + item "sessions" =<< (liftIO . Cache.size) _envSessions + + events <- liftReadTVar _envEvents + item "events-keys" $ HashMap.size events + item "events-values-all" $ calcValuesLengthTotal events + + item "expire-times" =<< (liftIO . Cache.size) _envExpireTimes + + sweepers <- liftReadTVar _envSweepers + item "sweepers-keys" $ HashMap.size sweepers + item "sweepers-values-all" $ calcValuesLengthTotal sweepers + + item "req-msg-limit" =<< (liftIO . Cache.size) _envReqMsgLimit + item "req-proto-limit" =<< (liftIO . Cache.size) _envReqProtoLimit + + where + calcValuesLengthTotal = Monoid.getSum . foldMap (Monoid.Sum . L.length) + liftReadTVar = liftIO . atomically . readTVar + item k v = CPS.tell [(k, fromIntegral v)] + runPeerM :: forall e m . ( MonadIO m , HasPeer e , Ord (Peer e) diff --git a/hbs2-peer/app/PeerMain.hs b/hbs2-peer/app/PeerMain.hs index 1f1b4d79..ecd6bb0c 100644 --- a/hbs2-peer/app/PeerMain.hs +++ b/hbs2-peer/app/PeerMain.hs @@ -888,6 +888,13 @@ runPeer opts = Exception.handle (\e -> myException e (view peerSignSk pc) penv <- newPeerEnv pl (AnyStorage s) (Fabriq byPass) peerSelf + do + probe <- newSimpleProbe "PeerEnv_Main" + addProbe probe + peerEnvSetProbe penv probe + probesPenv <- liftIO $ async $ forever do + pause @'Seconds 10 + peerEnvCollectProbes penv proxyThread <- async $ runDispatchProxy proxy @@ -1212,6 +1219,13 @@ runPeer opts = Exception.handle (\e -> myException e lift $ runResponseM me $ refChanNotifyProto @e True refChanAdapter (R.Notify @e puk box) menv <- newPeerEnv pl (AnyStorage s) (Fabriq mcast) (getOwnPeer mcast) + do + probe <- newSimpleProbe "PeerEnv_Announce" + addProbe probe + peerEnvSetProbe menv probe + probesMenv <- liftIO $ async $ forever do + pause @'Seconds 10 + peerEnvCollectProbes menv ann <- liftIO $ async $ runPeerM menv $ do @@ -1294,8 +1308,10 @@ runPeer opts = Exception.handle (\e -> myException e void $ waitAnyCancel $ w <> [ loop , m1 , rpcProto + , probesMenv , ann , messMcast + , probesPenv , proxyThread , brainsThread , messWatchDog