From e98207f5b93348b5a61e6422438b99d1d8e2e46a Mon Sep 17 00:00:00 2001 From: voidlizard Date: Fri, 25 Oct 2024 16:26:36 +0300 Subject: [PATCH] fixed-messaging-and-basic-probes --- hbs2-core/lib/HBS2/Net/Messaging/Unix.hs | 46 +++++++++--- hbs2-core/lib/HBS2/Prelude.hs | 74 ++++++++++++++++++- hbs2-fixer/app/Main.hs | 2 +- hbs2-peer/app/PeerMain.hs | 20 +++++ hbs2-peer/app/RPC2/Peer.hs | 2 +- hbs2-peer/hbs2-peer.cabal | 1 + hbs2-peer/lib/HBS2/Peer/RPC/API/Peer.hs | 6 ++ hbs2-peer/lib/HBS2/Peer/RPC/Internal/Types.hs | 5 +- hbs2-tests/test/CheckUnixMessaging.hs | 19 +++-- hbs2-tests/test/TestRefChanNotify.hs | 2 +- 10 files changed, 151 insertions(+), 26 deletions(-) diff --git a/hbs2-core/lib/HBS2/Net/Messaging/Unix.hs b/hbs2-core/lib/HBS2/Net/Messaging/Unix.hs index 9a33bfbd..f6fd7f79 100644 --- a/hbs2-core/lib/HBS2/Net/Messaging/Unix.hs +++ b/hbs2-core/lib/HBS2/Net/Messaging/Unix.hs @@ -16,15 +16,14 @@ import HBS2.Net.Messaging.Stream import HBS2.System.Logger.Simple -import Control.Monad -import Control.Monad.Fix import Control.Monad.Reader hiding (reader) import Data.ByteString.Lazy (ByteString) import Data.ByteString.Lazy qualified as LBS import Data.Hashable -import Data.HashMap.Strict qualified as HashMap +import Data.HashMap.Strict qualified as HM import Data.HashMap.Strict (HashMap) import Data.Maybe +import Data.List qualified as List import Network.ByteOrder hiding (ByteString) import Network.Socket import Network.Socket.ByteString hiding (sendTo) @@ -36,6 +35,7 @@ import Lens.Micro.Platform import Control.Monad.Trans.Cont import UnliftIO import Control.Concurrent.STM (retry) +import Streaming.Prelude qualified as S data UNIX = UNIX deriving (Eq,Ord,Show,Generic) @@ -80,6 +80,7 @@ data MessagingUnix = , msgUnixRetryTime :: Timeout 'Seconds , msgUnixSelf :: Peer UNIX , msgUnixOpts :: Set MessagingUnixOpts + , msgAnyProbe :: TVar AnyProbe , msgUnixSendTo :: TVar (HashMap (Peer UNIX) (TQueue ByteString)) , msgUnixRecv :: TQueue (From UNIX, ByteString) , msgUnixLast :: TVar TimeSpec @@ -111,7 +112,8 @@ newMessagingUnixOpts opts server tsec path = do tsec (PeerUNIX path) (Set.fromList opts) - <$> liftIO (newTVarIO mempty) + <$> newTVarIO (AnyProbe ()) + <*> liftIO (newTVarIO mempty) <*> liftIO newTQueueIO <*> liftIO (newTVarIO now) <*> liftIO (newTVarIO 0) @@ -124,6 +126,15 @@ data UnixMessagingStopped = UnixMessagingStopped deriving (Show,Typeable) instance Exception UnixMessagingStopped +setProbe :: MonadIO m => MessagingUnix -> AnyProbe -> m () +setProbe MessagingUnix{..} p = atomically $ writeTVar msgAnyProbe p + +myAcceptReport :: MonadUnliftIO m => MessagingUnix -> [(Text,Integer)] -> m () +myAcceptReport MessagingUnix{..} values = do + p <- readTVarIO msgAnyProbe + debug "myAcceptReport" + acceptReport p values + runMessagingUnix :: MonadUnliftIO m => MessagingUnix -> m () runMessagingUnix env = do @@ -166,9 +177,20 @@ runMessagingUnix env = do liftIO $ listen sock 1024 void $ ContT $ withAsync do - pause @'Seconds 5 + pause @'Seconds 10 readTVarIO forked >>= filterM (fmap isNothing . poll) >>= atomically . writeTVar forked + n1 <- readTVarIO forked <&> List.length + myAcceptReport env [("forked", fromIntegral n1)] + + let reportStuff = forever do + pause @'Seconds 10 + what <- S.toList_ do + n1 <- atomically $ readTVar (msgUnixSendTo env) <&> fromIntegral . HM.size + S.yield ("msgUnixSendTo", n1) + myAcceptReport env what + + void $ ContT $ bracket (async reportStuff) cancel forever do (so, _sa) <- liftIO $ accept sock @@ -189,7 +211,7 @@ runMessagingUnix env = do let writer = liftIO $ async do -- FIXME: check! - mq <- atomically $ readTVar (msgUnixSendTo env) <&> HashMap.lookup that + mq <- atomically $ readTVar (msgUnixSendTo env) <&> HM.lookup that for_ mq $ \q -> do forever do @@ -292,9 +314,9 @@ runMessagingUnix env = do -- Мы клиент. Шлём кому? **ЕМУ**, на том конце трубы. -- У нас один контрагент, имя сокета (файла) == адрес пира. -- Как в TCP порт сервиса (а отвечает тот с другого порта) - mq <- atomically $ readTVar (msgUnixSendTo env) <&> HashMap.lookup who + mq <- atomically $ readTVar (msgUnixSendTo env) <&> HM.lookup who - maybe1 mq (err "unix: no queue!") $ \q -> do + maybe1 mq (err "MessagingUnix. no queue") $ \q -> do -- если WD установлен, то просыпаемся, скажем, wd/2 и -- шлём пустую строку серверу -- withWD do @@ -358,20 +380,20 @@ runMessagingUnix env = do dropQueuesFor :: MonadIO m => Peer UNIX -> m () dropQueuesFor who = liftIO do atomically do - modifyTVar (msgUnixSendTo env) (HashMap.delete who) + modifyTVar (msgUnixSendTo env) (HM.delete who) -- modifyTVar (msgUnixRecvFrom env) (HashMap.delete who) createQueues :: MonadIO m => MessagingUnix -> Peer UNIX -> m (Peer UNIX) createQueues env who = liftIO do atomically $ do - sHere <- readTVar (msgUnixSendTo env) <&> HashMap.member who + sHere <- readTVar (msgUnixSendTo env) <&> HM.member who if sHere then do pure False else do sendToQ <- newTQueue - modifyTVar (msgUnixSendTo env) (HashMap.insert who sendToQ) + modifyTVar (msgUnixSendTo env) (HM.insert who sendToQ) pure True pure who @@ -384,7 +406,7 @@ instance Messaging MessagingUnix UNIX ByteString where -- FIXME: handle-no-queue-for-rcpt-situation-1 - mq <- atomically $ readTVar (msgUnixSendTo bus) <&> HashMap.lookup who + mq <- atomically $ readTVar (msgUnixSendTo bus) <&> HM.lookup who maybe1 mq none $ \q -> do atomically $ writeTQueue q msg diff --git a/hbs2-core/lib/HBS2/Prelude.hs b/hbs2-core/lib/HBS2/Prelude.hs index e70a84bc..a5323992 100644 --- a/hbs2-core/lib/HBS2/Prelude.hs +++ b/hbs2-core/lib/HBS2/Prelude.hs @@ -9,7 +9,6 @@ module HBS2.Prelude , module Numeric.Natural , module HBS2.Clock , MonadIO(..), MonadPlus(..) - , void, guard, when, unless , maybe1 , eitherToMaybe , asyncLinked @@ -27,6 +26,12 @@ module HBS2.Prelude , (&), (<&>), for_, for , HasErrorStatus(..), ErrorStatus(..), SomeError(..), WithSomeError(..), mayE, someE , ByFirst(..) + , Probe(..) + , ProbeSnapshot(..) + , ToProbeSnapshot(..) + , ProbeSnapshotElement(..) + , AnyProbe(..) + , newSimpleProbe , whenTrue, whenFalse ) where @@ -51,14 +56,19 @@ import Data.Function import Data.Functor import Data.Char qualified as Char import Data.Text qualified as Text +import Data.Text (Text) import Data.Hashable +import Data.HashMap.Strict(HashMap) +import Data.HashMap.Strict qualified as HM +import Data.Set qualified as Set import Prettyprinter import Data.Word import GHC.Generics import Control.Monad.Except import Numeric.Natural - +import Streaming.Prelude qualified as S import UnliftIO +import Codec.Serialise none :: forall m . Monad m => m () none = pure () @@ -174,11 +184,69 @@ instance Eq a => Eq (ByFirst a b) where instance Hashable a => Hashable (ByFirst a b) where hashWithSalt s (ByFirst a _) = hashWithSalt s a +class ToProbeSnapshot a => Probe a where + acceptReport :: forall m . MonadIO m => a -> [(Text, Integer)] -> m () --- asyncLinked :: forall m . MonadUnliftIO m => +data ProbeSnapshotElement = + ProbeSnapshotElement Text Integer + deriving stock (Eq,Ord,Show,Generic) + +instance Serialise ProbeSnapshotElement + +instance Pretty ProbeSnapshotElement where + pretty (ProbeSnapshotElement x y) = pretty x <+> pretty y + +class ProbeSnapshot a where + probeSnapshot :: MonadIO m => a -> m [ProbeSnapshotElement] + +class ToProbeSnapshot a where + toSnapshotElements :: forall m . MonadIO m => a -> m [ProbeSnapshotElement] + +data SimpleProbe = + SimpleProbe + { spName :: Text + , spTimestamp :: TVar Word64 + , spProbeValues :: TVar (HashMap Text Integer) + } + +instance ToProbeSnapshot SimpleProbe where + toSnapshotElements SimpleProbe{..} = do + vs <- readTVarIO spProbeValues <&> HM.toList + pure [ ProbeSnapshotElement (spName <> "." <> n) i | (n,i) <- vs ] +instance ProbeSnapshot [AnyProbe] where + probeSnapshot spx = do + what <- S.toList_ do + for_ spx $ \s -> do + toSnapshotElements s >>= S.each + pure $ Set.toList $ Set.fromList what +newSimpleProbe :: forall m . MonadIO m => Text -> m AnyProbe +newSimpleProbe name = do + s <- SimpleProbe name + <$> (liftIO getPOSIXTime >>= newTVarIO . round) + <*> newTVarIO mempty + pure $ AnyProbe s +instance ToProbeSnapshot () where + toSnapshotElements _ = pure mempty +instance Probe () where + acceptReport _ _ = pure () + +data AnyProbe = forall a . Probe a => AnyProbe a + +instance Probe AnyProbe where + acceptReport (AnyProbe p) = acceptReport p + +instance ToProbeSnapshot AnyProbe where + toSnapshotElements (AnyProbe p) = toSnapshotElements p + +instance Probe SimpleProbe where + acceptReport SimpleProbe{..} values = do + t <- liftIO getPOSIXTime <&> round + atomically do + writeTVar spTimestamp t + modifyTVar spProbeValues (<> HM.fromList values) diff --git a/hbs2-fixer/app/Main.hs b/hbs2-fixer/app/Main.hs index e454f681..7a53812c 100644 --- a/hbs2-fixer/app/Main.hs +++ b/hbs2-fixer/app/Main.hs @@ -186,7 +186,7 @@ withApp cfgPath action = do mn <- ContT $ withAsync $ liftIO $ runReaderT (runServiceClientMulti endpoints) client - let o = [MUWatchdog 20,MUDontRetry] + let o = [MUDontRetry] clientN <- newMessagingUnixOpts o False 1.0 soname notif <- ContT $ withAsync (runMessagingUnix clientN) diff --git a/hbs2-peer/app/PeerMain.hs b/hbs2-peer/app/PeerMain.hs index f21fc882..a2061b8d 100644 --- a/hbs2-peer/app/PeerMain.hs +++ b/hbs2-peer/app/PeerMain.hs @@ -254,6 +254,7 @@ runCLI = do <> command "log" (info pLog (progDesc "set logging level")) <> command "bypass" (info pByPass (progDesc "bypass")) <> command "gc" (info pRunGC (progDesc "run RAM garbage collector")) + <> command "probes" (info pRunProbes (progDesc "show probes")) <> command "version" (info pVersion (progDesc "show program version")) ) @@ -590,6 +591,17 @@ runCLI = do void $ runMaybeT do void $ callService @RpcPerformGC caller () + pRunProbes = do + rpc <- pRpcCommon + pure do + withMyRPC @PeerAPI rpc $ \caller -> do + void $ runMaybeT do + + p <- callService @RpcGetProbes caller () + >>= toMPlus + + liftIO $ print $ vcat (fmap pretty p) + refP :: ReadM (PubKey 'Sign 'HBS2Basic) refP = maybeReader fromStringMay @@ -694,6 +706,8 @@ runPeer :: forall e s . ( e ~ L4Proto runPeer opts = respawnOnError opts $ runResourceT do + probes <- liftIO $ newTVarIO (mempty :: [AnyProbe]) + myself <- liftIO myThreadId metrics <- liftIO newStore @@ -715,6 +729,8 @@ runPeer opts = respawnOnError opts $ runResourceT do let tcpProbeWait = runReader (cfgValue @PeerTcpProbeWaitKey) syn & fromInteger @(Timeout 'Seconds) . fromMaybe 300 + let addProbe p = liftIO $ atomically $ modifyTVar probes (p:) + -- let downloadThreadNum = runReader (cfgValue @PeerDownloadThreadKey) syn & fromMaybe 1 let useSocks5 = runReader (cfgValue @PeerTcpSOCKS5) syn @@ -1212,6 +1228,9 @@ runPeer opts = respawnOnError opts $ runResourceT do let rpcSa = getRpcSocketName conf rpcmsg <- newMessagingUnix True 1.0 rpcSa + rpcProbe <- newSimpleProbe "RPC.MessagingUnix" + setProbe rpcmsg rpcProbe + addProbe rpcProbe let rpcctx = RPC2Context { rpcConfig = fromPeerConfig conf , rpcMessaging = rpcmsg @@ -1221,6 +1240,7 @@ runPeer opts = respawnOnError opts $ runResourceT do , rpcStorage = AnyStorage s , rpcBrains = SomeBrains brains , rpcByPassInfo = liftIO (getStat byPass) + , rpcProbes = probes , rpcDoFetch = liftIO . fetchHash penv denv , rpcDoRefChanHeadPost = refChanHeadPostAction , rpcDoRefChanPropose = refChanProposeAction diff --git a/hbs2-peer/app/RPC2/Peer.hs b/hbs2-peer/app/RPC2/Peer.hs index 496d6a6a..e0023f8e 100644 --- a/hbs2-peer/app/RPC2/Peer.hs +++ b/hbs2-peer/app/RPC2/Peer.hs @@ -19,5 +19,5 @@ import RPC2.LogLevel() import RPC2.Poll() import RPC2.Downloads() import RPC2.ByPassStat() - +import RPC2.Probes() diff --git a/hbs2-peer/hbs2-peer.cabal b/hbs2-peer/hbs2-peer.cabal index d23c9983..211d7d9f 100644 --- a/hbs2-peer/hbs2-peer.cabal +++ b/hbs2-peer/hbs2-peer.cabal @@ -258,6 +258,7 @@ executable hbs2-peer , RPC2.Fetch , RPC2.Die , RPC2.ByPassStat + , RPC2.Probes , RPC2.LogLevel , RPC2.Peers , RPC2.PexInfo diff --git a/hbs2-peer/lib/HBS2/Peer/RPC/API/Peer.hs b/hbs2-peer/lib/HBS2/Peer/RPC/API/Peer.hs index e95a40a9..99f3c6ff 100644 --- a/hbs2-peer/lib/HBS2/Peer/RPC/API/Peer.hs +++ b/hbs2-peer/lib/HBS2/Peer/RPC/API/Peer.hs @@ -36,6 +36,8 @@ data RpcByPassInfo data RpcPerformGC +data RpcGetProbes + type PeerAPI = '[ RpcPoke , RpcPing , RpcAnnounce @@ -52,6 +54,7 @@ type PeerAPI = '[ RpcPoke , RpcByPassInfo , RpcPerformGC , RpcPollList2 + , RpcGetProbes ] instance HasProtocol UNIX (ServiceProto PeerAPI UNIX) where @@ -113,6 +116,9 @@ type instance Output RpcByPassInfo = ByPassStat type instance Input RpcPerformGC = () type instance Output RpcPerformGC = () +type instance Input RpcGetProbes = () +type instance Output RpcGetProbes = [ProbeSnapshotElement] + data SetLogging = DebugOn Bool | TraceOn Bool diff --git a/hbs2-peer/lib/HBS2/Peer/RPC/Internal/Types.hs b/hbs2-peer/lib/HBS2/Peer/RPC/Internal/Types.hs index 2aa3dc61..36d8e0ab 100644 --- a/hbs2-peer/lib/HBS2/Peer/RPC/Internal/Types.hs +++ b/hbs2-peer/lib/HBS2/Peer/RPC/Internal/Types.hs @@ -4,9 +4,9 @@ module HBS2.Peer.RPC.Internal.Types , module HBS2.Peer.RPC.Class ) where +import HBS2.Prelude import HBS2.Actors.Peer import HBS2.Net.Auth.Credentials -import HBS2.Net.Proto.Types import HBS2.Storage() import HBS2.Data.Types.Refs (HashRef) import HBS2.Data.Types.SignedBox @@ -17,10 +17,8 @@ import HBS2.Peer.RPC.Class import HBS2.Peer.Brains import Data.Config.Suckless.Syntax -import Data.Config.Suckless.Parse import Data.Kind -import Control.Monad import Control.Monad.Reader import Data.ByteString ( ByteString ) import UnliftIO @@ -36,6 +34,7 @@ data RPC2Context = , rpcStorage :: AnyStorage , rpcBrains :: SomeBrains L4Proto , rpcByPassInfo :: IO ByPassStat + , rpcProbes :: TVar [AnyProbe] , rpcDoFetch :: HashRef -> IO () , rpcDoRefChanHeadPost :: HashRef -> IO () , rpcDoRefChanPropose :: (PubKey 'Sign 'HBS2Basic, SignedBox ByteString 'HBS2Basic) -> IO () diff --git a/hbs2-tests/test/CheckUnixMessaging.hs b/hbs2-tests/test/CheckUnixMessaging.hs index c9acf5b8..909b84c6 100644 --- a/hbs2-tests/test/CheckUnixMessaging.hs +++ b/hbs2-tests/test/CheckUnixMessaging.hs @@ -74,9 +74,10 @@ instance where deferred m = void (async m) -withServer :: (() -> IO r) -> IO r -withServer = runContT do +withServer :: AnyProbe -> (() -> IO r) -> IO r +withServer p = runContT do server <- newMessagingUnixOpts [] True 0.10 soname + setProbe server p (link <=< ContT . withAsync) do runMessagingUnix server (link <=< ContT . withAsync) do @@ -105,15 +106,17 @@ main = do totfuck <- newTVarIO 0 + p <- newSimpleProbe "MessagingUnix" + flip runContT pure do - void $ ContT withServer - -- pause @'Seconds 1 + void $ ContT (withServer p) + pause @'Seconds 1 s <- replicateM 16 $ lift $ async do void $ flip runContT pure do caller <- ContT withClient tsucc <- newTVarIO 0 tfail <- newTVarIO 0 - for_ [1..1000] $ \i -> do + for_ [1..10000] $ \i -> do lift (callRpcWaitMay @EchoH (TimeoutSec 2) caller ((cs . show) i)) >>= \case Just (Right _) -> atomically $ modifyTVar tsucc succ @@ -124,12 +127,18 @@ main = do atomically $ modifyTVar totfuck (+fuck) notice $ "Finished:" <+> "succeed" <+> pretty ok <+> "failed" <+> pretty fuck + pause @'Seconds 3 mapM_ wait s tf <- readTVarIO totfuck notice $ "total errors" <+> pretty tf + -- notice "waiting for metrics" + -- pause @'Seconds 10 + -- s <- probeSnapshot [p] + -- liftIO $ print $ "probes" <> line <> vcat (fmap pretty s) + setLoggingOff @ERROR setLoggingOff @WARN setLoggingOff @NOTICE diff --git a/hbs2-tests/test/TestRefChanNotify.hs b/hbs2-tests/test/TestRefChanNotify.hs index 9458c33b..3a0fa132 100644 --- a/hbs2-tests/test/TestRefChanNotify.hs +++ b/hbs2-tests/test/TestRefChanNotify.hs @@ -117,7 +117,7 @@ work = do -- -- так лучше -- - let o = [MUWatchdog 10] + let o = [] soname <- detectRPC >>= orThrowUser "hbs2-peer not found"