fixed-messaging-and-basic-probes

This commit is contained in:
voidlizard 2024-10-25 16:26:36 +03:00
parent bf7d590886
commit e98207f5b9
10 changed files with 151 additions and 26 deletions

View File

@ -16,15 +16,14 @@ import HBS2.Net.Messaging.Stream
import HBS2.System.Logger.Simple
import Control.Monad
import Control.Monad.Fix
import Control.Monad.Reader hiding (reader)
import Data.ByteString.Lazy (ByteString)
import Data.ByteString.Lazy qualified as LBS
import Data.Hashable
import Data.HashMap.Strict qualified as HashMap
import Data.HashMap.Strict qualified as HM
import Data.HashMap.Strict (HashMap)
import Data.Maybe
import Data.List qualified as List
import Network.ByteOrder hiding (ByteString)
import Network.Socket
import Network.Socket.ByteString hiding (sendTo)
@ -36,6 +35,7 @@ import Lens.Micro.Platform
import Control.Monad.Trans.Cont
import UnliftIO
import Control.Concurrent.STM (retry)
import Streaming.Prelude qualified as S
data UNIX = UNIX
deriving (Eq,Ord,Show,Generic)
@ -80,6 +80,7 @@ data MessagingUnix =
, msgUnixRetryTime :: Timeout 'Seconds
, msgUnixSelf :: Peer UNIX
, msgUnixOpts :: Set MessagingUnixOpts
, msgAnyProbe :: TVar AnyProbe
, msgUnixSendTo :: TVar (HashMap (Peer UNIX) (TQueue ByteString))
, msgUnixRecv :: TQueue (From UNIX, ByteString)
, msgUnixLast :: TVar TimeSpec
@ -111,7 +112,8 @@ newMessagingUnixOpts opts server tsec path = do
tsec
(PeerUNIX path)
(Set.fromList opts)
<$> liftIO (newTVarIO mempty)
<$> newTVarIO (AnyProbe ())
<*> liftIO (newTVarIO mempty)
<*> liftIO newTQueueIO
<*> liftIO (newTVarIO now)
<*> liftIO (newTVarIO 0)
@ -124,6 +126,15 @@ data UnixMessagingStopped = UnixMessagingStopped deriving (Show,Typeable)
instance Exception UnixMessagingStopped
setProbe :: MonadIO m => MessagingUnix -> AnyProbe -> m ()
setProbe MessagingUnix{..} p = atomically $ writeTVar msgAnyProbe p
myAcceptReport :: MonadUnliftIO m => MessagingUnix -> [(Text,Integer)] -> m ()
myAcceptReport MessagingUnix{..} values = do
p <- readTVarIO msgAnyProbe
debug "myAcceptReport"
acceptReport p values
runMessagingUnix :: MonadUnliftIO m => MessagingUnix -> m ()
runMessagingUnix env = do
@ -166,9 +177,20 @@ runMessagingUnix env = do
liftIO $ listen sock 1024
void $ ContT $ withAsync do
pause @'Seconds 5
pause @'Seconds 10
readTVarIO forked >>= filterM (fmap isNothing . poll)
>>= atomically . writeTVar forked
n1 <- readTVarIO forked <&> List.length
myAcceptReport env [("forked", fromIntegral n1)]
let reportStuff = forever do
pause @'Seconds 10
what <- S.toList_ do
n1 <- atomically $ readTVar (msgUnixSendTo env) <&> fromIntegral . HM.size
S.yield ("msgUnixSendTo", n1)
myAcceptReport env what
void $ ContT $ bracket (async reportStuff) cancel
forever do
(so, _sa) <- liftIO $ accept sock
@ -189,7 +211,7 @@ runMessagingUnix env = do
let writer = liftIO $ async do
-- FIXME: check!
mq <- atomically $ readTVar (msgUnixSendTo env) <&> HashMap.lookup that
mq <- atomically $ readTVar (msgUnixSendTo env) <&> HM.lookup that
for_ mq $ \q -> do
forever do
@ -292,9 +314,9 @@ runMessagingUnix env = do
-- Мы клиент. Шлём кому? **ЕМУ**, на том конце трубы.
-- У нас один контрагент, имя сокета (файла) == адрес пира.
-- Как в TCP порт сервиса (а отвечает тот с другого порта)
mq <- atomically $ readTVar (msgUnixSendTo env) <&> HashMap.lookup who
mq <- atomically $ readTVar (msgUnixSendTo env) <&> HM.lookup who
maybe1 mq (err "unix: no queue!") $ \q -> do
maybe1 mq (err "MessagingUnix. no queue") $ \q -> do
-- если WD установлен, то просыпаемся, скажем, wd/2 и
-- шлём пустую строку серверу
-- withWD do
@ -358,20 +380,20 @@ runMessagingUnix env = do
dropQueuesFor :: MonadIO m => Peer UNIX -> m ()
dropQueuesFor who = liftIO do
atomically do
modifyTVar (msgUnixSendTo env) (HashMap.delete who)
modifyTVar (msgUnixSendTo env) (HM.delete who)
-- modifyTVar (msgUnixRecvFrom env) (HashMap.delete who)
createQueues :: MonadIO m => MessagingUnix -> Peer UNIX -> m (Peer UNIX)
createQueues env who = liftIO do
atomically $ do
sHere <- readTVar (msgUnixSendTo env) <&> HashMap.member who
sHere <- readTVar (msgUnixSendTo env) <&> HM.member who
if sHere then do
pure False
else do
sendToQ <- newTQueue
modifyTVar (msgUnixSendTo env) (HashMap.insert who sendToQ)
modifyTVar (msgUnixSendTo env) (HM.insert who sendToQ)
pure True
pure who
@ -384,7 +406,7 @@ instance Messaging MessagingUnix UNIX ByteString where
-- FIXME: handle-no-queue-for-rcpt-situation-1
mq <- atomically $ readTVar (msgUnixSendTo bus) <&> HashMap.lookup who
mq <- atomically $ readTVar (msgUnixSendTo bus) <&> HM.lookup who
maybe1 mq none $ \q -> do
atomically $ writeTQueue q msg

View File

@ -9,7 +9,6 @@ module HBS2.Prelude
, module Numeric.Natural
, module HBS2.Clock
, MonadIO(..), MonadPlus(..)
, void, guard, when, unless
, maybe1
, eitherToMaybe
, asyncLinked
@ -27,6 +26,12 @@ module HBS2.Prelude
, (&), (<&>), for_, for
, HasErrorStatus(..), ErrorStatus(..), SomeError(..), WithSomeError(..), mayE, someE
, ByFirst(..)
, Probe(..)
, ProbeSnapshot(..)
, ToProbeSnapshot(..)
, ProbeSnapshotElement(..)
, AnyProbe(..)
, newSimpleProbe
, whenTrue, whenFalse
) where
@ -51,14 +56,19 @@ import Data.Function
import Data.Functor
import Data.Char qualified as Char
import Data.Text qualified as Text
import Data.Text (Text)
import Data.Hashable
import Data.HashMap.Strict(HashMap)
import Data.HashMap.Strict qualified as HM
import Data.Set qualified as Set
import Prettyprinter
import Data.Word
import GHC.Generics
import Control.Monad.Except
import Numeric.Natural
import Streaming.Prelude qualified as S
import UnliftIO
import Codec.Serialise
none :: forall m . Monad m => m ()
none = pure ()
@ -174,11 +184,69 @@ instance Eq a => Eq (ByFirst a b) where
instance Hashable a => Hashable (ByFirst a b) where
hashWithSalt s (ByFirst a _) = hashWithSalt s a
class ToProbeSnapshot a => Probe a where
acceptReport :: forall m . MonadIO m => a -> [(Text, Integer)] -> m ()
-- asyncLinked :: forall m . MonadUnliftIO m =>
data ProbeSnapshotElement =
ProbeSnapshotElement Text Integer
deriving stock (Eq,Ord,Show,Generic)
instance Serialise ProbeSnapshotElement
instance Pretty ProbeSnapshotElement where
pretty (ProbeSnapshotElement x y) = pretty x <+> pretty y
class ProbeSnapshot a where
probeSnapshot :: MonadIO m => a -> m [ProbeSnapshotElement]
class ToProbeSnapshot a where
toSnapshotElements :: forall m . MonadIO m => a -> m [ProbeSnapshotElement]
data SimpleProbe =
SimpleProbe
{ spName :: Text
, spTimestamp :: TVar Word64
, spProbeValues :: TVar (HashMap Text Integer)
}
instance ToProbeSnapshot SimpleProbe where
toSnapshotElements SimpleProbe{..} = do
vs <- readTVarIO spProbeValues <&> HM.toList
pure [ ProbeSnapshotElement (spName <> "." <> n) i | (n,i) <- vs ]
instance ProbeSnapshot [AnyProbe] where
probeSnapshot spx = do
what <- S.toList_ do
for_ spx $ \s -> do
toSnapshotElements s >>= S.each
pure $ Set.toList $ Set.fromList what
newSimpleProbe :: forall m . MonadIO m => Text -> m AnyProbe
newSimpleProbe name = do
s <- SimpleProbe name
<$> (liftIO getPOSIXTime >>= newTVarIO . round)
<*> newTVarIO mempty
pure $ AnyProbe s
instance ToProbeSnapshot () where
toSnapshotElements _ = pure mempty
instance Probe () where
acceptReport _ _ = pure ()
data AnyProbe = forall a . Probe a => AnyProbe a
instance Probe AnyProbe where
acceptReport (AnyProbe p) = acceptReport p
instance ToProbeSnapshot AnyProbe where
toSnapshotElements (AnyProbe p) = toSnapshotElements p
instance Probe SimpleProbe where
acceptReport SimpleProbe{..} values = do
t <- liftIO getPOSIXTime <&> round
atomically do
writeTVar spTimestamp t
modifyTVar spProbeValues (<> HM.fromList values)

View File

@ -186,7 +186,7 @@ withApp cfgPath action = do
mn <- ContT $ withAsync $ liftIO $ runReaderT (runServiceClientMulti endpoints) client
let o = [MUWatchdog 20,MUDontRetry]
let o = [MUDontRetry]
clientN <- newMessagingUnixOpts o False 1.0 soname
notif <- ContT $ withAsync (runMessagingUnix clientN)

View File

@ -254,6 +254,7 @@ runCLI = do
<> command "log" (info pLog (progDesc "set logging level"))
<> command "bypass" (info pByPass (progDesc "bypass"))
<> command "gc" (info pRunGC (progDesc "run RAM garbage collector"))
<> command "probes" (info pRunProbes (progDesc "show probes"))
<> command "version" (info pVersion (progDesc "show program version"))
)
@ -590,6 +591,17 @@ runCLI = do
void $ runMaybeT do
void $ callService @RpcPerformGC caller ()
pRunProbes = do
rpc <- pRpcCommon
pure do
withMyRPC @PeerAPI rpc $ \caller -> do
void $ runMaybeT do
p <- callService @RpcGetProbes caller ()
>>= toMPlus
liftIO $ print $ vcat (fmap pretty p)
refP :: ReadM (PubKey 'Sign 'HBS2Basic)
refP = maybeReader fromStringMay
@ -694,6 +706,8 @@ runPeer :: forall e s . ( e ~ L4Proto
runPeer opts = respawnOnError opts $ runResourceT do
probes <- liftIO $ newTVarIO (mempty :: [AnyProbe])
myself <- liftIO myThreadId
metrics <- liftIO newStore
@ -715,6 +729,8 @@ runPeer opts = respawnOnError opts $ runResourceT do
let tcpProbeWait = runReader (cfgValue @PeerTcpProbeWaitKey) syn
& fromInteger @(Timeout 'Seconds) . fromMaybe 300
let addProbe p = liftIO $ atomically $ modifyTVar probes (p:)
-- let downloadThreadNum = runReader (cfgValue @PeerDownloadThreadKey) syn & fromMaybe 1
let useSocks5 = runReader (cfgValue @PeerTcpSOCKS5) syn
@ -1212,6 +1228,9 @@ runPeer opts = respawnOnError opts $ runResourceT do
let rpcSa = getRpcSocketName conf
rpcmsg <- newMessagingUnix True 1.0 rpcSa
rpcProbe <- newSimpleProbe "RPC.MessagingUnix"
setProbe rpcmsg rpcProbe
addProbe rpcProbe
let rpcctx = RPC2Context { rpcConfig = fromPeerConfig conf
, rpcMessaging = rpcmsg
@ -1221,6 +1240,7 @@ runPeer opts = respawnOnError opts $ runResourceT do
, rpcStorage = AnyStorage s
, rpcBrains = SomeBrains brains
, rpcByPassInfo = liftIO (getStat byPass)
, rpcProbes = probes
, rpcDoFetch = liftIO . fetchHash penv denv
, rpcDoRefChanHeadPost = refChanHeadPostAction
, rpcDoRefChanPropose = refChanProposeAction

View File

@ -19,5 +19,5 @@ import RPC2.LogLevel()
import RPC2.Poll()
import RPC2.Downloads()
import RPC2.ByPassStat()
import RPC2.Probes()

View File

@ -258,6 +258,7 @@ executable hbs2-peer
, RPC2.Fetch
, RPC2.Die
, RPC2.ByPassStat
, RPC2.Probes
, RPC2.LogLevel
, RPC2.Peers
, RPC2.PexInfo

View File

@ -36,6 +36,8 @@ data RpcByPassInfo
data RpcPerformGC
data RpcGetProbes
type PeerAPI = '[ RpcPoke
, RpcPing
, RpcAnnounce
@ -52,6 +54,7 @@ type PeerAPI = '[ RpcPoke
, RpcByPassInfo
, RpcPerformGC
, RpcPollList2
, RpcGetProbes
]
instance HasProtocol UNIX (ServiceProto PeerAPI UNIX) where
@ -113,6 +116,9 @@ type instance Output RpcByPassInfo = ByPassStat
type instance Input RpcPerformGC = ()
type instance Output RpcPerformGC = ()
type instance Input RpcGetProbes = ()
type instance Output RpcGetProbes = [ProbeSnapshotElement]
data SetLogging =
DebugOn Bool
| TraceOn Bool

View File

@ -4,9 +4,9 @@ module HBS2.Peer.RPC.Internal.Types
, module HBS2.Peer.RPC.Class
) where
import HBS2.Prelude
import HBS2.Actors.Peer
import HBS2.Net.Auth.Credentials
import HBS2.Net.Proto.Types
import HBS2.Storage()
import HBS2.Data.Types.Refs (HashRef)
import HBS2.Data.Types.SignedBox
@ -17,10 +17,8 @@ import HBS2.Peer.RPC.Class
import HBS2.Peer.Brains
import Data.Config.Suckless.Syntax
import Data.Config.Suckless.Parse
import Data.Kind
import Control.Monad
import Control.Monad.Reader
import Data.ByteString ( ByteString )
import UnliftIO
@ -36,6 +34,7 @@ data RPC2Context =
, rpcStorage :: AnyStorage
, rpcBrains :: SomeBrains L4Proto
, rpcByPassInfo :: IO ByPassStat
, rpcProbes :: TVar [AnyProbe]
, rpcDoFetch :: HashRef -> IO ()
, rpcDoRefChanHeadPost :: HashRef -> IO ()
, rpcDoRefChanPropose :: (PubKey 'Sign 'HBS2Basic, SignedBox ByteString 'HBS2Basic) -> IO ()

View File

@ -74,9 +74,10 @@ instance
where
deferred m = void (async m)
withServer :: (() -> IO r) -> IO r
withServer = runContT do
withServer :: AnyProbe -> (() -> IO r) -> IO r
withServer p = runContT do
server <- newMessagingUnixOpts [] True 0.10 soname
setProbe server p
(link <=< ContT . withAsync) do
runMessagingUnix server
(link <=< ContT . withAsync) do
@ -105,15 +106,17 @@ main = do
totfuck <- newTVarIO 0
p <- newSimpleProbe "MessagingUnix"
flip runContT pure do
void $ ContT withServer
-- pause @'Seconds 1
void $ ContT (withServer p)
pause @'Seconds 1
s <- replicateM 16 $ lift $ async do
void $ flip runContT pure do
caller <- ContT withClient
tsucc <- newTVarIO 0
tfail <- newTVarIO 0
for_ [1..1000] $ \i -> do
for_ [1..10000] $ \i -> do
lift (callRpcWaitMay @EchoH (TimeoutSec 2) caller ((cs . show) i))
>>= \case
Just (Right _) -> atomically $ modifyTVar tsucc succ
@ -124,12 +127,18 @@ main = do
atomically $ modifyTVar totfuck (+fuck)
notice $ "Finished:" <+> "succeed" <+> pretty ok <+> "failed" <+> pretty fuck
pause @'Seconds 3
mapM_ wait s
tf <- readTVarIO totfuck
notice $ "total errors" <+> pretty tf
-- notice "waiting for metrics"
-- pause @'Seconds 10
-- s <- probeSnapshot [p]
-- liftIO $ print $ "probes" <> line <> vcat (fmap pretty s)
setLoggingOff @ERROR
setLoggingOff @WARN
setLoggingOff @NOTICE

View File

@ -117,7 +117,7 @@ work = do
--
-- так лучше
--
let o = [MUWatchdog 10]
let o = []
soname <- detectRPC
>>= orThrowUser "hbs2-peer not found"