peermain probes

This commit is contained in:
Snail 2024-10-30 22:42:02 +04:00 committed by voidlizard
parent 74782d00d2
commit 5e997acc59
2 changed files with 50 additions and 0 deletions

View File

@ -28,6 +28,7 @@ import Control.Monad
import Control.Monad.Trans.Maybe import Control.Monad.Trans.Maybe
import Control.Concurrent.Async import Control.Concurrent.Async
import Control.Monad.Reader import Control.Monad.Reader
import Control.Monad.Writer.CPS qualified as CPS
import Data.ByteString.Lazy (ByteString) import Data.ByteString.Lazy (ByteString)
import Data.Cache (Cache) import Data.Cache (Cache)
import Data.Cache qualified as Cache import Data.Cache qualified as Cache
@ -41,6 +42,8 @@ import Data.HashMap.Strict qualified as HashMap
import Control.Concurrent.STM.TVar import Control.Concurrent.STM.TVar
import Control.Concurrent.STM import Control.Concurrent.STM
import Control.Monad.IO.Unlift import Control.Monad.IO.Unlift
import Data.List qualified as L
import Data.Monoid qualified as Monoid
import Codec.Serialise (serialise, deserialiseOrFail) import Codec.Serialise (serialise, deserialiseOrFail)
@ -127,6 +130,7 @@ data PeerEnv e =
, _envSweepers :: TVar (HashMap SKey [PeerM e IO ()]) , _envSweepers :: TVar (HashMap SKey [PeerM e IO ()])
, _envReqMsgLimit :: Cache (Peer e, Integer, Encoded e) () , _envReqMsgLimit :: Cache (Peer e, Integer, Encoded e) ()
, _envReqProtoLimit :: Cache (Peer e, Integer) () , _envReqProtoLimit :: Cache (Peer e, Integer) ()
, _envProbe :: TVar AnyProbe
} }
newtype PeerM e m a = PeerM { fromPeerM :: ReaderT (PeerEnv e) m a } newtype PeerM e m a = PeerM { fromPeerM :: ReaderT (PeerEnv e) m a }
@ -388,8 +392,38 @@ newPeerEnv pl s bus p = do
_envSweepers <- liftIO (newTVarIO mempty) _envSweepers <- liftIO (newTVarIO mempty)
_envReqMsgLimit <- liftIO (Cache.newCache (Just defRequestLimit)) _envReqMsgLimit <- liftIO (Cache.newCache (Just defRequestLimit))
_envReqProtoLimit <- liftIO (Cache.newCache (Just defRequestLimit)) _envReqProtoLimit <- liftIO (Cache.newCache (Just defRequestLimit))
_envProbe <- liftIO (newTVarIO (AnyProbe ()))
pure PeerEnv {..} pure PeerEnv {..}
peerEnvSetProbe :: (MonadIO m) => PeerEnv e -> AnyProbe -> m ()
peerEnvSetProbe PeerEnv {..} p = liftIO $ atomically $ writeTVar _envProbe p
peerEnvCollectProbes :: (MonadIO m) => PeerEnv e -> m ()
peerEnvCollectProbes PeerEnv {..} = do
probe <- liftIO $ readTVarIO _envProbe
acceptReport probe =<< CPS.execWriterT do
-- _envDeferred :: Pipeline IO ()
item "sessions" =<< (liftIO . Cache.size) _envSessions
events <- liftReadTVar _envEvents
item "events-keys" $ HashMap.size events
item "events-values-all" $ calcValuesLengthTotal events
item "expire-times" =<< (liftIO . Cache.size) _envExpireTimes
sweepers <- liftReadTVar _envSweepers
item "sweepers-keys" $ HashMap.size sweepers
item "sweepers-values-all" $ calcValuesLengthTotal sweepers
item "req-msg-limit" =<< (liftIO . Cache.size) _envReqMsgLimit
item "req-proto-limit" =<< (liftIO . Cache.size) _envReqProtoLimit
where
calcValuesLengthTotal = Monoid.getSum . foldMap (Monoid.Sum . L.length)
liftReadTVar = liftIO . atomically . readTVar
item k v = CPS.tell [(k, fromIntegral v)]
runPeerM :: forall e m . ( MonadIO m runPeerM :: forall e m . ( MonadIO m
, HasPeer e , HasPeer e
, Ord (Peer e) , Ord (Peer e)

View File

@ -901,6 +901,13 @@ runPeer opts = respawnOnError opts $ runResourceT do
(view peerSignSk pc) (view peerSignSk pc)
penv <- newPeerEnv pl (AnyStorage s) (Fabriq byPass) peerSelf penv <- newPeerEnv pl (AnyStorage s) (Fabriq byPass) peerSelf
do
probe <- newSimpleProbe "PeerEnv_Main"
addProbe probe
peerEnvSetProbe penv probe
probesPenv <- liftIO $ async $ forever do
pause @'Seconds 10
peerEnvCollectProbes penv
proxyThread <- async $ runDispatchProxy proxy proxyThread <- async $ runDispatchProxy proxy
@ -1225,6 +1232,13 @@ runPeer opts = respawnOnError opts $ runResourceT do
lift $ runResponseM me $ refChanNotifyProto @e True refChanAdapter (R.Notify @e puk box) lift $ runResponseM me $ refChanNotifyProto @e True refChanAdapter (R.Notify @e puk box)
menv <- newPeerEnv pl (AnyStorage s) (Fabriq mcast) (getOwnPeer mcast) menv <- newPeerEnv pl (AnyStorage s) (Fabriq mcast) (getOwnPeer mcast)
do
probe <- newSimpleProbe "PeerEnv_Announce"
addProbe probe
peerEnvSetProbe menv probe
probesMenv <- liftIO $ async $ forever do
pause @'Seconds 10
peerEnvCollectProbes menv
ann <- liftIO $ async $ runPeerM menv $ do ann <- liftIO $ async $ runPeerM menv $ do
@ -1307,8 +1321,10 @@ runPeer opts = respawnOnError opts $ runResourceT do
void $ waitAnyCancel $ w <> [ loop void $ waitAnyCancel $ w <> [ loop
, m1 , m1
, rpcProto , rpcProto
, probesMenv
, ann , ann
, messMcast , messMcast
, probesPenv
, proxyThread , proxyThread
, brainsThread , brainsThread
, messWatchDog , messWatchDog