optional messagings (TCP, UDP) + watchdog

This commit is contained in:
Dmitry Zuikov 2023-10-26 06:18:27 +03:00
parent 27fd382cc3
commit 0a55f1a732
3 changed files with 69 additions and 25 deletions

View File

@ -143,6 +143,8 @@ runMessagingUDP udpMess = liftIO $ do
bind so addr bind so addr
w <- async $ udpWorker udpMess (sock udpMess) w <- async $ udpWorker udpMess (sock udpMess)
link w
waitCatch w >>= either throwIO (const $ pure ()) waitCatch w >>= either throwIO (const $ pure ())
instance Messaging MessagingUDP L4Proto ByteString where instance Messaging MessagingUDP L4Proto ByteString where

View File

@ -90,6 +90,14 @@ pexLoop brains tcpEnv = do
pl <- getPeerLocator @e pl <- getPeerLocator @e
-- TODO: investigate-tcp-session-cleanup
-- Есть подозрения, что TCP сессии не чистятся
-- надлежащим образом. Требуется расследовать.
-- NOTE: tcpPexInfo
-- Этот кусок говорит Brains о том,
-- какие TCP сессии есть в наличии.
-- Убирать пока нельзя
tcpPexInfo <- liftIO $ async $ forever do tcpPexInfo <- liftIO $ async $ forever do
-- FIXME: fix-hardcode -- FIXME: fix-hardcode
pause @'Seconds 20 pause @'Seconds 20

View File

@ -104,7 +104,7 @@ import System.Posix.Process
import UnliftIO.Exception qualified as U import UnliftIO.Exception qualified as U
-- import UnliftIO.STM -- import UnliftIO.STM
import UnliftIO.Async as U import UnliftIO.Async
import Control.Monad.Trans.Resource import Control.Monad.Trans.Resource
import Streaming.Prelude qualified as S import Streaming.Prelude qualified as S
@ -587,7 +587,7 @@ runPeer opts = U.handle (\e -> myException e
& fromInteger @(Timeout 'Seconds) . fromMaybe 300 & fromInteger @(Timeout 'Seconds) . fromMaybe 300
let listenSa = view listenOn opts <|> listenConf <|> Just defListenUDP let listenSa = view listenOn opts <|> listenConf
credFile <- pure (view peerCredFile opts <|> keyConf) `orDie` "credentials not set" credFile <- pure (view peerCredFile opts <|> keyConf) `orDie` "credentials not set"
let pref = view storage opts <|> storConf <|> Just xdg let pref = view storage opts <|> storConf <|> Just xdg
@ -634,12 +634,6 @@ runPeer opts = U.handle (\e -> myException e
notice $ "multicast:" <+> pretty localMulticast notice $ "multicast:" <+> pretty localMulticast
mess <- newMessagingUDP False listenSa
`orDie` "unable listen on the given addr"
udp <- async $ runMessagingUDP mess
let udpAddr = getOwnPeer mess
mcast <- newMessagingUDPMulticast defLocalMulticast mcast <- newMessagingUDPMulticast defLocalMulticast
`orDie` "Can't start RPC listener" `orDie` "Can't start RPC listener"
@ -654,24 +648,62 @@ runPeer opts = U.handle (\e -> myException e
pl <- AnyPeerLocator <$> newBrainyPeerLocator @e (SomeBrains @e brains) mempty pl <- AnyPeerLocator <$> newBrainyPeerLocator @e (SomeBrains @e brains) mempty
let addr' = fromStringMay @(PeerAddr L4Proto) tcpListen -- FIXME: messaging-watchdog
-- Раз уж мы не помираем в случае, если один
-- из месседжингов отвалился --- то нужно
-- сделать watchdog, который будет респавнить
-- всё, если нет ни одного живого месседжинга
trace $ "TCP addr:" <+> pretty tcpListen <+> pretty addr' msgAlive <- liftIO $ newTVarIO 0
tcp <- maybe1 addr' (pure Nothing) $ \addr -> do messWatchDog <- liftIO $ async do
tcpEnv <- newMessagingTCP addr <&> set tcpOnClientStarted (onClientTCPConnected brains) pause @'Seconds 5
-- FIXME: handle-tcp-thread-somehow fix \next -> do
void $ async $ runMessagingTCP tcpEnv alive <- readTVarIO msgAlive
pure $ Just tcpEnv if alive <= 0 then do
err "!!! No live messaging left. Trying to respawn"
pure ()
else do
pause @'Seconds 2
next
let mudp = Just $ Dispatched mess udpPoint <- runMaybeT do
sa <- toMPlus listenSa
env <- toMPlus =<< newMessagingUDP False (Just sa)
let tcpaddr = view tcpOwnPeer <$> tcp void $ liftIO ( async do
runMessagingUDP env
`U.withException` \(e :: SomeException) -> do
err (viaShow e)
err "!!! UDP messaging stopped"
liftIO $ atomically $ modifyTVar msgAlive pred
)
let mtcp = Dispatched <$> tcp let udpAddr = getOwnPeer env
liftIO $ atomically $ modifyTVar msgAlive succ
pure (env, (udpAddr, Dispatched env))
let points = catMaybes [ (udpAddr ,) <$> mudp tcpPoint <- runMaybeT do
, (,) <$> tcpaddr <*> mtcp addr <- toMPlus $ fromStringMay @(PeerAddr L4Proto) tcpListen
tcpEnv <- newMessagingTCP addr <&> set tcpOnClientStarted (onClientTCPConnected brains)
void $ liftIO ( async do
runMessagingTCP tcpEnv
`U.withException` \(e :: SomeException) -> do
err (viaShow e)
err "!!! TCP messaging stopped"
liftIO $ atomically $ modifyTVar msgAlive pred
)
let tcpaddr = view tcpOwnPeer tcpEnv
liftIO $ atomically $ modifyTVar msgAlive succ
pure (tcpEnv, (tcpaddr, Dispatched tcpEnv))
let tcp = fst <$> tcpPoint
let mtcp = view (_2 . _2) <$> tcpPoint
let mudp = view (_2 . _2) <$> udpPoint
let points = catMaybes [ snd <$> udpPoint
, snd <$> tcpPoint
] ]
proxy <- newDispatchProxy points $ \_ pip -> case view sockType pip of proxy <- newDispatchProxy points $ \_ pip -> case view sockType pip of
@ -684,14 +716,15 @@ runPeer opts = U.handle (\e -> myException e
-- Таскается везде со времени, когда Messaging был -- Таскается везде со времени, когда Messaging был
-- через TQueue. Нужно его удалить повсеместно -- через TQueue. Нужно его удалить повсеместно
-- Или сделать некий AnyAddr/DefaultAddr -- Или сделать некий AnyAddr/DefaultAddr
peerSelf <- fromPeerAddr "0.0.0.0:7351"
byPass <- newByPassMessaging @L4Proto byPass <- newByPassMessaging @L4Proto
byPassDef byPassDef
proxy proxy
(getOwnPeer mess) peerSelf
(view peerSignPk pc) (view peerSignPk pc)
(view peerSignSk pc) (view peerSignSk pc)
penv <- newPeerEnv pl (AnyStorage s) (Fabriq byPass) (getOwnPeer mess) penv <- newPeerEnv pl (AnyStorage s) (Fabriq byPass) peerSelf
proxyThread <- async $ runDispatchProxy proxy proxyThread <- async $ runDispatchProxy proxy
@ -1006,7 +1039,8 @@ runPeer opts = U.handle (\e -> myException e
let rpc = getRpcSocketName conf let rpc = getRpcSocketName conf
let pokeAnsw = show $ vcat [ "peer-key:" <+> dquotes (pretty (AsBase58 k)) let pokeAnsw = show $ vcat [ "peer-key:" <+> dquotes (pretty (AsBase58 k))
, "udp:" <+> dquotes (pretty (listenAddr mess)) , "udp:" <+> dquotes (pretty (fst . snd <$> udpPoint))
, "tcp:" <+> dquotes (pretty (fst . snd <$> tcpPoint))
, "local-multicast:" <+> dquotes (pretty localMulticast) , "local-multicast:" <+> dquotes (pretty localMulticast)
, "rpc:" <+> dquotes (pretty rpc) , "rpc:" <+> dquotes (pretty rpc)
, http , http
@ -1039,14 +1073,14 @@ runPeer opts = U.handle (\e -> myException e
, makeResponse (makeServer @StorageAPI) , makeResponse (makeServer @StorageAPI)
] ]
void $ waitAnyCancel $ w <> [ udp void $ waitAnyCancel $ w <> [ loop
, loop
, m1 , m1
, rpcProto , rpcProto
, ann , ann
, messMcast , messMcast
, proxyThread , proxyThread
, brainsThread , brainsThread
, messWatchDog
] ]
liftIO $ simpleStorageStop s liftIO $ simpleStorageStop s