diff --git a/hbs2-core/lib/HBS2/Actors/Peer.hs b/hbs2-core/lib/HBS2/Actors/Peer.hs index aed50da7..ad63f929 100644 --- a/hbs2-core/lib/HBS2/Actors/Peer.hs +++ b/hbs2-core/lib/HBS2/Actors/Peer.hs @@ -130,6 +130,8 @@ data PeerEnv e = , _envSweepers :: TVar (HashMap SKey [PeerM e IO ()]) , _envReqMsgLimit :: Cache (Peer e, Integer, Encoded e) () , _envReqProtoLimit :: Cache (Peer e, Integer) () + , _envSentCounter :: TVar Int + , _envRecvCounter :: TVar Int , _envProbe :: TVar AnyProbe } @@ -265,6 +267,7 @@ instance ( MonadIO m , Show (Peer e) ) => Request e msg m where request peer_e msg = do + let proto = protoId @e @msg (Proxy @msg) pip <- getFabriq @e me <- ownPeer @e @@ -392,6 +395,8 @@ newPeerEnv pl s bus p = do _envSweepers <- liftIO (newTVarIO mempty) _envReqMsgLimit <- liftIO (Cache.newCache (Just defRequestLimit)) _envReqProtoLimit <- liftIO (Cache.newCache (Just defRequestLimit)) + _envSentCounter <- liftIO (newTVarIO 0) + _envRecvCounter <- liftIO (newTVarIO 0) _envProbe <- liftIO (newTVarIO (AnyProbe ())) pure PeerEnv {..} @@ -423,6 +428,9 @@ peerEnvCollectProbes PeerEnv {..} = do item "req-msg-limit" =<< (liftIO . Cache.size) _envReqMsgLimit item "req-proto-limit" =<< (liftIO . Cache.size) _envReqProtoLimit + item "data-sent" =<< (liftIO . readTVarIO) _envSentCounter + item "data-recv" =<< (liftIO . readTVarIO) _envRecvCounter + where calcValuesLengthTotal = Monoid.getSum . foldMap (Monoid.Sum . L.length) liftReadTVar = liftIO . readTVarIO @@ -526,7 +534,9 @@ instance ( HasProtocol e p who <- thatPeer @p self <- lift $ ownPeer @e fab <- lift $ getFabriq @e - sendTo fab (To who) (From self) (AnyMessage @(Encoded e) @e proto (encode msg)) + let raw = encode msg + -- atomically $ modifyTVar + sendTo fab (To who) (From self) (AnyMessage @(Encoded e) @e proto raw) instance ( MonadIO m -- , HasProtocol e p diff --git a/hbs2-core/lib/HBS2/Actors/Peer/Types.hs b/hbs2-core/lib/HBS2/Actors/Peer/Types.hs index 15c98f3c..db51ee8a 100644 --- a/hbs2-core/lib/HBS2/Actors/Peer/Types.hs +++ b/hbs2-core/lib/HBS2/Actors/Peer/Types.hs @@ -19,7 +19,16 @@ instance {-# OVERLAPPABLE #-} -- instance HasConf m => HasConf (ResponseM e m) +data PeerCounters = + PeerStats + { peerDataSent :: Int + , peerDataRecv :: Int + } +class HasPeerCounters m where + getPeerCounters :: m PeerCounters + setPeerCounters :: PeerCounters -> m () + updatePeerCountes :: (PeerCounters -> PeerCounters) -> m () class (Monad m, HasProtocol e p) => HasGossip e p m where gossip :: p -> m () diff --git a/hbs2-core/lib/HBS2/Net/Messaging/Encrypted/ByPass.hs b/hbs2-core/lib/HBS2/Net/Messaging/Encrypted/ByPass.hs index 282ff62a..d245546e 100644 --- a/hbs2-core/lib/HBS2/Net/Messaging/Encrypted/ByPass.hs +++ b/hbs2-core/lib/HBS2/Net/Messaging/Encrypted/ByPass.hs @@ -90,6 +90,8 @@ data ByPassStat = , statDecryptFails :: Int , statSent :: Int , statReceived :: Int + , statSentBytes :: Int + , statRecvBytes :: Int , statFlowNum :: Int , statPeers :: Int , statAuthFail :: Int @@ -119,6 +121,8 @@ data ByPass e them = , decryptFails :: TVar Int , sentNum :: TVar Int , recvNum :: TVar Int + , sentBytes :: TVar Int + , recvBytes :: TVar Int , authFail :: TVar Int , maxPkt :: TVar Int , probe :: TVar AnyProbe @@ -161,6 +165,8 @@ getStat bus = liftIO $ <*> readTVarIO (decryptFails bus) <*> readTVarIO (sentNum bus) <*> readTVarIO (recvNum bus) + <*> readTVarIO (sentBytes bus) + <*> readTVarIO (recvBytes bus) <*> (readTVarIO (flowKeys bus) <&> HashMap.size) <*> (readTVarIO (noncesByPeer bus) <&> HashMap.size) <*> readTVarIO (authFail bus) @@ -242,6 +248,8 @@ newByPassMessaging o w self ps sk = do <*> newTVarIO 0 <*> newTVarIO 0 <*> newTVarIO 0 + <*> newTVarIO 0 + <*> newTVarIO 0 <*> newTVarIO (AnyProbe ()) instance (ForByPass e, Messaging w e ByteString) @@ -251,7 +259,9 @@ instance (ForByPass e, Messaging w e ByteString) mkey <- lookupEncKey bus whom - atomically $ modifyTVar (sentNum bus) succ + atomically do + modifyTVar (sentNum bus) succ + modifyTVar (sentBytes bus) (+ (fromIntegral $ LBS.length m)) case mkey of Just fck -> do @@ -286,6 +296,7 @@ instance (ForByPass e, Messaging w e ByteString) atomically do modifyTVar (recvNum bus) succ + modifyTVar (recvBytes bus) (+ (fromIntegral $ LBS.length mess)) modifyTVar (maxPkt bus) (max (fromIntegral $ LBS.length mess)) hshake <- processHey who mess @@ -524,6 +535,8 @@ instance Pretty ByPassStat where , prettyField "decryptFails" statDecryptFails , prettyField "sent" statSent , prettyField "received" statReceived + , prettyField "sentBytes" statSentBytes + , prettyField "recvBytes" statRecvBytes , prettyField "flowNum" statFlowNum , prettyField "peers" statPeers , prettyField "authFail" statAuthFail diff --git a/hbs2-peer/app/Monkeys.hs b/hbs2-peer/app/Monkeys.hs new file mode 100644 index 00000000..fb482a01 --- /dev/null +++ b/hbs2-peer/app/Monkeys.hs @@ -0,0 +1,43 @@ +module Monkeys where + +import HBS2.Prelude +import HBS2.Net.Messaging.Encrypted.ByPass +import HBS2.Misc.PrettyStuff + +import PeerTypes +import RPC2 + +import System.Mem +import Control.Monad.Trans.Cont +import UnliftIO + + +runMonkeys :: MonadUnliftIO m => RPC2Context -> m () +runMonkeys RPC2Context{..} = flip runContT pure do + + pause @'Seconds 20 + + void $ ContT $ withAsync idleMonkey + forever do + pause @'Seconds 300 + + where + + idleSleep = 120 + + idleMonkey = do + flip fix 0 $ \next bytes0 -> do + ByPassStat{..} <- liftIO rpcByPassInfo + let bytes = statSentBytes + statReceived + + when ( bytes - bytes0 < 10 * 1024 * idleSleep ) do + notice $ "Idle monkey says:" <+> green "IDLE" + onIdle + + pause @'Seconds (realToFrac idleSleep) + next bytes + + onIdle = do + liftIO performMajorGC + + diff --git a/hbs2-peer/app/PeerMain.hs b/hbs2-peer/app/PeerMain.hs index 75587ddc..762755fd 100644 --- a/hbs2-peer/app/PeerMain.hs +++ b/hbs2-peer/app/PeerMain.hs @@ -60,6 +60,7 @@ import HttpWorker import DispatchProxy import PeerMeta import Watchdogs +import Monkeys import CLI.Common import CLI.RefChan import CLI.LWWRef @@ -741,7 +742,7 @@ respawnOnError opts act = | Just UserInterrupt <- Exception.fromException e = notice "Interrupted by user" | otherwise = - myException e >> performGC >> respawn opts + myException e >> performMajorGC >> respawn opts runPeer :: forall e s . ( e ~ L4Proto , FromStringMaybe (PeerAddr e) @@ -1234,6 +1235,8 @@ runPeer opts = respawnOnError opts $ do peerThread "rpcWatchDog" (runRpcWatchDog myself rpc) + -- peerThread "monkeys" (runMonkeys rpc) + liftIO $ withPeerM penv do runProto @e [ makeResponse (blockSizeProto blk onNoBlock) @@ -1355,6 +1358,8 @@ runPeer opts = respawnOnError opts $ do ] void $ waitAnyCancel (w1 : w2 : wws ) + monkeys <- async $ runMonkeys rpcctx + void $ waitAnyCancel $ w <> [ loop , m1 , rpcProto @@ -1364,6 +1369,7 @@ runPeer opts = respawnOnError opts $ do , proxyThread , brainsThread , messWatchDog + , monkeys ] liftIO $ simpleStorageStop s diff --git a/hbs2-peer/app/RPC2/PerformGC.hs b/hbs2-peer/app/RPC2/PerformGC.hs index 1d8a4d08..e6f38560 100644 --- a/hbs2-peer/app/RPC2/PerformGC.hs +++ b/hbs2-peer/app/RPC2/PerformGC.hs @@ -18,8 +18,6 @@ instance ( MonadIO m => HandleMethod m RpcPerformGC where handleMethod _ = do - debug $ "rpc.performGC" - liftIO performGC - pure () - + debug $ "rpc.performMajorGC" + liftIO performMajorGC diff --git a/hbs2-peer/hbs2-peer.cabal b/hbs2-peer/hbs2-peer.cabal index 58c7a65a..04b6948e 100644 --- a/hbs2-peer/hbs2-peer.cabal +++ b/hbs2-peer/hbs2-peer.cabal @@ -290,6 +290,7 @@ executable hbs2-peer , Watchdogs , Brains , DispatchProxy + , Monkeys , CLI.Common , CLI.RefChan , CLI.LWWRef