This commit is contained in:
voidlizard 2024-10-30 12:08:17 +03:00
parent 55ca183f8b
commit 5452a22045
4 changed files with 44 additions and 2 deletions

View File

@ -7,6 +7,7 @@ module HBS2.Net.Messaging.Encrypted.ByPass
, ByPassStat(..) , ByPassStat(..)
, byPassDef , byPassDef
, newByPassMessaging , newByPassMessaging
, byPassMessagingSetProbe
, cleanupByPassMessaging , cleanupByPassMessaging
, getStat , getStat
) where ) where
@ -120,6 +121,7 @@ data ByPass e them =
, recvNum :: TVar Int , recvNum :: TVar Int
, authFail :: TVar Int , authFail :: TVar Int
, maxPkt :: TVar Int , maxPkt :: TVar Int
, probe :: TVar AnyProbe
} }
type ForByPass e = ( Hashable (Peer e) type ForByPass e = ( Hashable (Peer e)
@ -207,6 +209,13 @@ byPassDef =
, byPassTimeRange = Nothing , byPassTimeRange = Nothing
} }
byPassMessagingSetProbe :: forall e w m . ( ForByPass e, MonadIO m, Messaging w e ByteString)
=> ByPass e w
-> AnyProbe
-> m ()
byPassMessagingSetProbe ByPass{..} p = atomically $ writeTVar probe p
newByPassMessaging :: forall e w m . ( ForByPass e newByPassMessaging :: forall e w m . ( ForByPass e
, MonadIO m , MonadIO m
, Messaging w e ByteString , Messaging w e ByteString
@ -233,6 +242,7 @@ newByPassMessaging o w self ps sk = do
<*> newTVarIO 0 <*> newTVarIO 0
<*> newTVarIO 0 <*> newTVarIO 0
<*> newTVarIO 0 <*> newTVarIO 0
<*> newTVarIO (AnyProbe ())
instance (ForByPass e, Messaging w e ByteString) instance (ForByPass e, Messaging w e ByteString)
=> Messaging (ByPass e w) e ByteString where => Messaging (ByPass e w) e ByteString where

View File

@ -5,6 +5,7 @@
{-# Language PatternSynonyms #-} {-# Language PatternSynonyms #-}
module MailboxProtoWorker ( mailboxProtoWorker module MailboxProtoWorker ( mailboxProtoWorker
, createMailboxProtoWorker , createMailboxProtoWorker
, mailboxProtoWorkerSetProbe
, MailboxProtoWorker , MailboxProtoWorker
, IsMailboxProtoAdapter , IsMailboxProtoAdapter
, MailboxProtoException(..) , MailboxProtoException(..)
@ -137,6 +138,7 @@ data MailboxProtoWorker (s :: CryptoScheme) e =
, inMessageQueueDropped :: TVar Int , inMessageQueueDropped :: TVar Int
, inMessageDeclined :: TVar Int , inMessageDeclined :: TVar Int
, mailboxDB :: TVar (Maybe DBPipeEnv) , mailboxDB :: TVar (Maybe DBPipeEnv)
, probe :: TVar AnyProbe
} }
okay :: Monad m => good -> m (Either bad good) okay :: Monad m => good -> m (Either bad good)
@ -533,6 +535,17 @@ getMailboxType_ d r = do
<&> fmap (fromStringMay @MailboxType . fromOnly) <&> fmap (fromStringMay @MailboxType . fromOnly)
<&> headMay . catMaybes <&> headMay . catMaybes
mailboxProtoWorkerSetProbe :: forall s e m . ( MonadIO m
, s ~ Encryption e
, ForMailbox s
)
=> MailboxProtoWorker s e
-> AnyProbe
-> m ()
mailboxProtoWorkerSetProbe MailboxProtoWorker{..} p
= atomically $ writeTVar probe p
createMailboxProtoWorker :: forall s e m . ( MonadIO m createMailboxProtoWorker :: forall s e m . ( MonadIO m
, s ~ Encryption e , s ~ Encryption e
, ForMailbox s , ForMailbox s
@ -556,6 +569,7 @@ createMailboxProtoWorker pc pe de sto = do
<*> newTVarIO 0 <*> newTVarIO 0
<*> newTVarIO 0 <*> newTVarIO 0
<*> newTVarIO Nothing <*> newTVarIO Nothing
<*> newTVarIO (AnyProbe ())
mailboxProtoWorker :: forall e s m . ( MonadIO m mailboxProtoWorker :: forall e s m . ( MonadIO m
, MonadUnliftIO m , MonadUnliftIO m
@ -600,6 +614,20 @@ mailboxProtoWorker readConf me@MailboxProtoWorker{..} = do
forever do forever do
pause @'Seconds 10 pause @'Seconds 10
pro <- readTVarIO probe
values <- atomically do
mpwFetchQSize <- readTVar mpwFetchQ <&> HS.size
inMessageMergeQueueSize <- readTVar inMessageMergeQueue <&> HM.size
inPolicyDownloadQSize <- readTVar inPolicyDownloadQ <&> HM.size
inMailboxDownloadQSize <- readTVar inMailboxDownloadQ <&> HM.size
pure $ [ ("mpwFetchQ", fromIntegral mpwFetchQSize)
, ("inMessageMergeQueue", fromIntegral inMessageMergeQueueSize)
, ("inPolicyDownloadQ", fromIntegral inPolicyDownloadQSize)
, ("inMailboxDownloadQ", fromIntegral inMailboxDownloadQSize)
]
acceptReport pro values
debug $ "I'm" <+> yellow "mailboxProtoWorker" debug $ "I'm" <+> yellow "mailboxProtoWorker"
void $ waitAnyCancel [bs,dpipe,inq,mergeQ,pDownQ,sDownQ,mCheckQ,mFetchQ] void $ waitAnyCancel [bs,dpipe,inq,mergeQ,pDownQ,sDownQ,mCheckQ,mFetchQ]

View File

@ -932,6 +932,10 @@ runPeer opts = Exception.handle (\e -> myException e
mailboxWorker <- createMailboxProtoWorker pc penv denv (AnyStorage s) mailboxWorker <- createMailboxProtoWorker pc penv denv (AnyStorage s)
p <- newSimpleProbe "MailboxProtoWorker"
mailboxProtoWorkerSetProbe mailboxWorker p
addProbe p
let onNoBlock (p, h) = do let onNoBlock (p, h) = do
already <- liftIO $ Cache.lookup nbcache (p,h) <&> isJust already <- liftIO $ Cache.lookup nbcache (p,h) <&> isJust
unless already do unless already do

View File

@ -92,8 +92,8 @@ common shared-properties
-- -fno-warn-unused-binds -- -fno-warn-unused-binds
-threaded -threaded
-rtsopts -rtsopts
"-with-rtsopts=-N -A64m -AL256m -I0 -T" -- "-with-rtsopts=-N -A64m -AL256m -I0 -T"
"-with-rtsopts=-N -A4m -AL16m -I0 -T"
default-language: Haskell2010 default-language: Haskell2010