peermain probes

This commit is contained in:
Snail 2024-10-30 22:42:02 +04:00 committed by voidlizard
parent 45a98d0d2e
commit 7560c2f3f3
2 changed files with 50 additions and 0 deletions

View File

@ -28,6 +28,7 @@ import Control.Monad
import Control.Monad.Trans.Maybe
import Control.Concurrent.Async
import Control.Monad.Reader
import Control.Monad.Writer.CPS qualified as CPS
import Data.ByteString.Lazy (ByteString)
import Data.Cache (Cache)
import Data.Cache qualified as Cache
@ -41,6 +42,8 @@ import Data.HashMap.Strict qualified as HashMap
import Control.Concurrent.STM.TVar
import Control.Concurrent.STM
import Control.Monad.IO.Unlift
import Data.List qualified as L
import Data.Monoid qualified as Monoid
import Codec.Serialise (serialise, deserialiseOrFail)
@ -127,6 +130,7 @@ data PeerEnv e =
, _envSweepers :: TVar (HashMap SKey [PeerM e IO ()])
, _envReqMsgLimit :: Cache (Peer e, Integer, Encoded e) ()
, _envReqProtoLimit :: Cache (Peer e, Integer) ()
, _envProbe :: TVar AnyProbe
}
newtype PeerM e m a = PeerM { fromPeerM :: ReaderT (PeerEnv e) m a }
@ -388,8 +392,38 @@ newPeerEnv pl s bus p = do
_envSweepers <- liftIO (newTVarIO mempty)
_envReqMsgLimit <- liftIO (Cache.newCache (Just defRequestLimit))
_envReqProtoLimit <- liftIO (Cache.newCache (Just defRequestLimit))
_envProbe <- liftIO (newTVarIO (AnyProbe ()))
pure PeerEnv {..}
peerEnvSetProbe :: (MonadIO m) => PeerEnv e -> AnyProbe -> m ()
peerEnvSetProbe PeerEnv {..} p = liftIO $ atomically $ writeTVar _envProbe p
peerEnvCollectProbes :: (MonadIO m) => PeerEnv e -> m ()
peerEnvCollectProbes PeerEnv {..} = do
probe <- liftIO $ readTVarIO _envProbe
acceptReport probe =<< CPS.execWriterT do
-- _envDeferred :: Pipeline IO ()
item "sessions" =<< (liftIO . Cache.size) _envSessions
events <- liftReadTVar _envEvents
item "events-keys" $ HashMap.size events
item "events-values-all" $ calcValuesLengthTotal events
item "expire-times" =<< (liftIO . Cache.size) _envExpireTimes
sweepers <- liftReadTVar _envSweepers
item "sweepers-keys" $ HashMap.size sweepers
item "sweepers-values-all" $ calcValuesLengthTotal sweepers
item "req-msg-limit" =<< (liftIO . Cache.size) _envReqMsgLimit
item "req-proto-limit" =<< (liftIO . Cache.size) _envReqProtoLimit
where
calcValuesLengthTotal = Monoid.getSum . foldMap (Monoid.Sum . L.length)
liftReadTVar = liftIO . atomically . readTVar
item k v = CPS.tell [(k, fromIntegral v)]
runPeerM :: forall e m . ( MonadIO m
, HasPeer e
, Ord (Peer e)

View File

@ -888,6 +888,13 @@ runPeer opts = Exception.handle (\e -> myException e
(view peerSignSk pc)
penv <- newPeerEnv pl (AnyStorage s) (Fabriq byPass) peerSelf
do
probe <- newSimpleProbe "PeerEnv_Main"
addProbe probe
peerEnvSetProbe penv probe
probesPenv <- liftIO $ async $ forever do
pause @'Seconds 10
peerEnvCollectProbes penv
proxyThread <- async $ runDispatchProxy proxy
@ -1212,6 +1219,13 @@ runPeer opts = Exception.handle (\e -> myException e
lift $ runResponseM me $ refChanNotifyProto @e True refChanAdapter (R.Notify @e puk box)
menv <- newPeerEnv pl (AnyStorage s) (Fabriq mcast) (getOwnPeer mcast)
do
probe <- newSimpleProbe "PeerEnv_Announce"
addProbe probe
peerEnvSetProbe menv probe
probesMenv <- liftIO $ async $ forever do
pause @'Seconds 10
peerEnvCollectProbes menv
ann <- liftIO $ async $ runPeerM menv $ do
@ -1294,8 +1308,10 @@ runPeer opts = Exception.handle (\e -> myException e
void $ waitAnyCancel $ w <> [ loop
, m1
, rpcProto
, probesMenv
, ann
, messMcast
, probesPenv
, proxyThread
, brainsThread
, messWatchDog