mirror of https://github.com/voidlizard/hbs2
attemtps to minimize memory footprint
This commit is contained in:
parent
52dff69f74
commit
96ac593e71
|
@ -130,6 +130,8 @@ data PeerEnv e =
|
||||||
, _envSweepers :: TVar (HashMap SKey [PeerM e IO ()])
|
, _envSweepers :: TVar (HashMap SKey [PeerM e IO ()])
|
||||||
, _envReqMsgLimit :: Cache (Peer e, Integer, Encoded e) ()
|
, _envReqMsgLimit :: Cache (Peer e, Integer, Encoded e) ()
|
||||||
, _envReqProtoLimit :: Cache (Peer e, Integer) ()
|
, _envReqProtoLimit :: Cache (Peer e, Integer) ()
|
||||||
|
, _envSentCounter :: TVar Int
|
||||||
|
, _envRecvCounter :: TVar Int
|
||||||
, _envProbe :: TVar AnyProbe
|
, _envProbe :: TVar AnyProbe
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -265,6 +267,7 @@ instance ( MonadIO m
|
||||||
, Show (Peer e)
|
, Show (Peer e)
|
||||||
) => Request e msg m where
|
) => Request e msg m where
|
||||||
request peer_e msg = do
|
request peer_e msg = do
|
||||||
|
|
||||||
let proto = protoId @e @msg (Proxy @msg)
|
let proto = protoId @e @msg (Proxy @msg)
|
||||||
pip <- getFabriq @e
|
pip <- getFabriq @e
|
||||||
me <- ownPeer @e
|
me <- ownPeer @e
|
||||||
|
@ -392,6 +395,8 @@ newPeerEnv pl s bus p = do
|
||||||
_envSweepers <- liftIO (newTVarIO mempty)
|
_envSweepers <- liftIO (newTVarIO mempty)
|
||||||
_envReqMsgLimit <- liftIO (Cache.newCache (Just defRequestLimit))
|
_envReqMsgLimit <- liftIO (Cache.newCache (Just defRequestLimit))
|
||||||
_envReqProtoLimit <- liftIO (Cache.newCache (Just defRequestLimit))
|
_envReqProtoLimit <- liftIO (Cache.newCache (Just defRequestLimit))
|
||||||
|
_envSentCounter <- liftIO (newTVarIO 0)
|
||||||
|
_envRecvCounter <- liftIO (newTVarIO 0)
|
||||||
_envProbe <- liftIO (newTVarIO (AnyProbe ()))
|
_envProbe <- liftIO (newTVarIO (AnyProbe ()))
|
||||||
pure PeerEnv {..}
|
pure PeerEnv {..}
|
||||||
|
|
||||||
|
@ -423,6 +428,9 @@ peerEnvCollectProbes PeerEnv {..} = do
|
||||||
item "req-msg-limit" =<< (liftIO . Cache.size) _envReqMsgLimit
|
item "req-msg-limit" =<< (liftIO . Cache.size) _envReqMsgLimit
|
||||||
item "req-proto-limit" =<< (liftIO . Cache.size) _envReqProtoLimit
|
item "req-proto-limit" =<< (liftIO . Cache.size) _envReqProtoLimit
|
||||||
|
|
||||||
|
item "data-sent" =<< (liftIO . readTVarIO) _envSentCounter
|
||||||
|
item "data-recv" =<< (liftIO . readTVarIO) _envRecvCounter
|
||||||
|
|
||||||
where
|
where
|
||||||
calcValuesLengthTotal = Monoid.getSum . foldMap (Monoid.Sum . L.length)
|
calcValuesLengthTotal = Monoid.getSum . foldMap (Monoid.Sum . L.length)
|
||||||
liftReadTVar = liftIO . readTVarIO
|
liftReadTVar = liftIO . readTVarIO
|
||||||
|
@ -526,7 +534,9 @@ instance ( HasProtocol e p
|
||||||
who <- thatPeer @p
|
who <- thatPeer @p
|
||||||
self <- lift $ ownPeer @e
|
self <- lift $ ownPeer @e
|
||||||
fab <- lift $ getFabriq @e
|
fab <- lift $ getFabriq @e
|
||||||
sendTo fab (To who) (From self) (AnyMessage @(Encoded e) @e proto (encode msg))
|
let raw = encode msg
|
||||||
|
-- atomically $ modifyTVar
|
||||||
|
sendTo fab (To who) (From self) (AnyMessage @(Encoded e) @e proto raw)
|
||||||
|
|
||||||
instance ( MonadIO m
|
instance ( MonadIO m
|
||||||
-- , HasProtocol e p
|
-- , HasProtocol e p
|
||||||
|
|
|
@ -19,7 +19,16 @@ instance {-# OVERLAPPABLE #-}
|
||||||
|
|
||||||
-- instance HasConf m => HasConf (ResponseM e m)
|
-- instance HasConf m => HasConf (ResponseM e m)
|
||||||
|
|
||||||
|
data PeerCounters =
|
||||||
|
PeerStats
|
||||||
|
{ peerDataSent :: Int
|
||||||
|
, peerDataRecv :: Int
|
||||||
|
}
|
||||||
|
|
||||||
|
class HasPeerCounters m where
|
||||||
|
getPeerCounters :: m PeerCounters
|
||||||
|
setPeerCounters :: PeerCounters -> m ()
|
||||||
|
updatePeerCountes :: (PeerCounters -> PeerCounters) -> m ()
|
||||||
|
|
||||||
class (Monad m, HasProtocol e p) => HasGossip e p m where
|
class (Monad m, HasProtocol e p) => HasGossip e p m where
|
||||||
gossip :: p -> m ()
|
gossip :: p -> m ()
|
||||||
|
|
|
@ -90,6 +90,8 @@ data ByPassStat =
|
||||||
, statDecryptFails :: Int
|
, statDecryptFails :: Int
|
||||||
, statSent :: Int
|
, statSent :: Int
|
||||||
, statReceived :: Int
|
, statReceived :: Int
|
||||||
|
, statSentBytes :: Int
|
||||||
|
, statRecvBytes :: Int
|
||||||
, statFlowNum :: Int
|
, statFlowNum :: Int
|
||||||
, statPeers :: Int
|
, statPeers :: Int
|
||||||
, statAuthFail :: Int
|
, statAuthFail :: Int
|
||||||
|
@ -119,6 +121,8 @@ data ByPass e them =
|
||||||
, decryptFails :: TVar Int
|
, decryptFails :: TVar Int
|
||||||
, sentNum :: TVar Int
|
, sentNum :: TVar Int
|
||||||
, recvNum :: TVar Int
|
, recvNum :: TVar Int
|
||||||
|
, sentBytes :: TVar Int
|
||||||
|
, recvBytes :: TVar Int
|
||||||
, authFail :: TVar Int
|
, authFail :: TVar Int
|
||||||
, maxPkt :: TVar Int
|
, maxPkt :: TVar Int
|
||||||
, probe :: TVar AnyProbe
|
, probe :: TVar AnyProbe
|
||||||
|
@ -161,6 +165,8 @@ getStat bus = liftIO $
|
||||||
<*> readTVarIO (decryptFails bus)
|
<*> readTVarIO (decryptFails bus)
|
||||||
<*> readTVarIO (sentNum bus)
|
<*> readTVarIO (sentNum bus)
|
||||||
<*> readTVarIO (recvNum bus)
|
<*> readTVarIO (recvNum bus)
|
||||||
|
<*> readTVarIO (sentBytes bus)
|
||||||
|
<*> readTVarIO (recvBytes bus)
|
||||||
<*> (readTVarIO (flowKeys bus) <&> HashMap.size)
|
<*> (readTVarIO (flowKeys bus) <&> HashMap.size)
|
||||||
<*> (readTVarIO (noncesByPeer bus) <&> HashMap.size)
|
<*> (readTVarIO (noncesByPeer bus) <&> HashMap.size)
|
||||||
<*> readTVarIO (authFail bus)
|
<*> readTVarIO (authFail bus)
|
||||||
|
@ -242,6 +248,8 @@ newByPassMessaging o w self ps sk = do
|
||||||
<*> newTVarIO 0
|
<*> newTVarIO 0
|
||||||
<*> newTVarIO 0
|
<*> newTVarIO 0
|
||||||
<*> newTVarIO 0
|
<*> newTVarIO 0
|
||||||
|
<*> newTVarIO 0
|
||||||
|
<*> newTVarIO 0
|
||||||
<*> newTVarIO (AnyProbe ())
|
<*> newTVarIO (AnyProbe ())
|
||||||
|
|
||||||
instance (ForByPass e, Messaging w e ByteString)
|
instance (ForByPass e, Messaging w e ByteString)
|
||||||
|
@ -251,7 +259,9 @@ instance (ForByPass e, Messaging w e ByteString)
|
||||||
|
|
||||||
mkey <- lookupEncKey bus whom
|
mkey <- lookupEncKey bus whom
|
||||||
|
|
||||||
atomically $ modifyTVar (sentNum bus) succ
|
atomically do
|
||||||
|
modifyTVar (sentNum bus) succ
|
||||||
|
modifyTVar (sentBytes bus) (+ (fromIntegral $ LBS.length m))
|
||||||
|
|
||||||
case mkey of
|
case mkey of
|
||||||
Just fck -> do
|
Just fck -> do
|
||||||
|
@ -286,6 +296,7 @@ instance (ForByPass e, Messaging w e ByteString)
|
||||||
|
|
||||||
atomically do
|
atomically do
|
||||||
modifyTVar (recvNum bus) succ
|
modifyTVar (recvNum bus) succ
|
||||||
|
modifyTVar (recvBytes bus) (+ (fromIntegral $ LBS.length mess))
|
||||||
modifyTVar (maxPkt bus) (max (fromIntegral $ LBS.length mess))
|
modifyTVar (maxPkt bus) (max (fromIntegral $ LBS.length mess))
|
||||||
|
|
||||||
hshake <- processHey who mess
|
hshake <- processHey who mess
|
||||||
|
@ -524,6 +535,8 @@ instance Pretty ByPassStat where
|
||||||
, prettyField "decryptFails" statDecryptFails
|
, prettyField "decryptFails" statDecryptFails
|
||||||
, prettyField "sent" statSent
|
, prettyField "sent" statSent
|
||||||
, prettyField "received" statReceived
|
, prettyField "received" statReceived
|
||||||
|
, prettyField "sentBytes" statSentBytes
|
||||||
|
, prettyField "recvBytes" statRecvBytes
|
||||||
, prettyField "flowNum" statFlowNum
|
, prettyField "flowNum" statFlowNum
|
||||||
, prettyField "peers" statPeers
|
, prettyField "peers" statPeers
|
||||||
, prettyField "authFail" statAuthFail
|
, prettyField "authFail" statAuthFail
|
||||||
|
|
|
@ -0,0 +1,43 @@
|
||||||
|
module Monkeys where
|
||||||
|
|
||||||
|
import HBS2.Prelude
|
||||||
|
import HBS2.Net.Messaging.Encrypted.ByPass
|
||||||
|
import HBS2.Misc.PrettyStuff
|
||||||
|
|
||||||
|
import PeerTypes
|
||||||
|
import RPC2
|
||||||
|
|
||||||
|
import System.Mem
|
||||||
|
import Control.Monad.Trans.Cont
|
||||||
|
import UnliftIO
|
||||||
|
|
||||||
|
|
||||||
|
runMonkeys :: MonadUnliftIO m => RPC2Context -> m ()
|
||||||
|
runMonkeys RPC2Context{..} = flip runContT pure do
|
||||||
|
|
||||||
|
pause @'Seconds 20
|
||||||
|
|
||||||
|
void $ ContT $ withAsync idleMonkey
|
||||||
|
forever do
|
||||||
|
pause @'Seconds 300
|
||||||
|
|
||||||
|
where
|
||||||
|
|
||||||
|
idleSleep = 120
|
||||||
|
|
||||||
|
idleMonkey = do
|
||||||
|
flip fix 0 $ \next bytes0 -> do
|
||||||
|
ByPassStat{..} <- liftIO rpcByPassInfo
|
||||||
|
let bytes = statSentBytes + statReceived
|
||||||
|
|
||||||
|
when ( bytes - bytes0 < 10 * 1024 * idleSleep ) do
|
||||||
|
notice $ "Idle monkey says:" <+> green "IDLE"
|
||||||
|
onIdle
|
||||||
|
|
||||||
|
pause @'Seconds (realToFrac idleSleep)
|
||||||
|
next bytes
|
||||||
|
|
||||||
|
onIdle = do
|
||||||
|
liftIO performMajorGC
|
||||||
|
|
||||||
|
|
|
@ -60,6 +60,7 @@ import HttpWorker
|
||||||
import DispatchProxy
|
import DispatchProxy
|
||||||
import PeerMeta
|
import PeerMeta
|
||||||
import Watchdogs
|
import Watchdogs
|
||||||
|
import Monkeys
|
||||||
import CLI.Common
|
import CLI.Common
|
||||||
import CLI.RefChan
|
import CLI.RefChan
|
||||||
import CLI.LWWRef
|
import CLI.LWWRef
|
||||||
|
@ -741,7 +742,7 @@ respawnOnError opts act =
|
||||||
| Just UserInterrupt <- Exception.fromException e =
|
| Just UserInterrupt <- Exception.fromException e =
|
||||||
notice "Interrupted by user"
|
notice "Interrupted by user"
|
||||||
| otherwise =
|
| otherwise =
|
||||||
myException e >> performGC >> respawn opts
|
myException e >> performMajorGC >> respawn opts
|
||||||
|
|
||||||
runPeer :: forall e s . ( e ~ L4Proto
|
runPeer :: forall e s . ( e ~ L4Proto
|
||||||
, FromStringMaybe (PeerAddr e)
|
, FromStringMaybe (PeerAddr e)
|
||||||
|
@ -1234,6 +1235,8 @@ runPeer opts = respawnOnError opts $ do
|
||||||
|
|
||||||
peerThread "rpcWatchDog" (runRpcWatchDog myself rpc)
|
peerThread "rpcWatchDog" (runRpcWatchDog myself rpc)
|
||||||
|
|
||||||
|
-- peerThread "monkeys" (runMonkeys rpc)
|
||||||
|
|
||||||
liftIO $ withPeerM penv do
|
liftIO $ withPeerM penv do
|
||||||
runProto @e
|
runProto @e
|
||||||
[ makeResponse (blockSizeProto blk onNoBlock)
|
[ makeResponse (blockSizeProto blk onNoBlock)
|
||||||
|
@ -1355,6 +1358,8 @@ runPeer opts = respawnOnError opts $ do
|
||||||
]
|
]
|
||||||
void $ waitAnyCancel (w1 : w2 : wws )
|
void $ waitAnyCancel (w1 : w2 : wws )
|
||||||
|
|
||||||
|
monkeys <- async $ runMonkeys rpcctx
|
||||||
|
|
||||||
void $ waitAnyCancel $ w <> [ loop
|
void $ waitAnyCancel $ w <> [ loop
|
||||||
, m1
|
, m1
|
||||||
, rpcProto
|
, rpcProto
|
||||||
|
@ -1364,6 +1369,7 @@ runPeer opts = respawnOnError opts $ do
|
||||||
, proxyThread
|
, proxyThread
|
||||||
, brainsThread
|
, brainsThread
|
||||||
, messWatchDog
|
, messWatchDog
|
||||||
|
, monkeys
|
||||||
]
|
]
|
||||||
|
|
||||||
liftIO $ simpleStorageStop s
|
liftIO $ simpleStorageStop s
|
||||||
|
|
|
@ -18,8 +18,6 @@ instance ( MonadIO m
|
||||||
=> HandleMethod m RpcPerformGC where
|
=> HandleMethod m RpcPerformGC where
|
||||||
|
|
||||||
handleMethod _ = do
|
handleMethod _ = do
|
||||||
debug $ "rpc.performGC"
|
debug $ "rpc.performMajorGC"
|
||||||
liftIO performGC
|
liftIO performMajorGC
|
||||||
pure ()
|
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -290,6 +290,7 @@ executable hbs2-peer
|
||||||
, Watchdogs
|
, Watchdogs
|
||||||
, Brains
|
, Brains
|
||||||
, DispatchProxy
|
, DispatchProxy
|
||||||
|
, Monkeys
|
||||||
, CLI.Common
|
, CLI.Common
|
||||||
, CLI.RefChan
|
, CLI.RefChan
|
||||||
, CLI.LWWRef
|
, CLI.LWWRef
|
||||||
|
|
Loading…
Reference in New Issue