From 0a55f1a73256d43bb3ae5d39010d349a973d191e Mon Sep 17 00:00:00 2001 From: Dmitry Zuikov Date: Thu, 26 Oct 2023 06:18:27 +0300 Subject: [PATCH] optional messagings (TCP, UDP) + watchdog --- hbs2-core/lib/HBS2/Net/Messaging/UDP.hs | 2 + hbs2-peer/app/PeerInfo.hs | 8 +++ hbs2-peer/app/PeerMain.hs | 84 +++++++++++++++++-------- 3 files changed, 69 insertions(+), 25 deletions(-) diff --git a/hbs2-core/lib/HBS2/Net/Messaging/UDP.hs b/hbs2-core/lib/HBS2/Net/Messaging/UDP.hs index e47789fb..1cb7f71c 100644 --- a/hbs2-core/lib/HBS2/Net/Messaging/UDP.hs +++ b/hbs2-core/lib/HBS2/Net/Messaging/UDP.hs @@ -143,6 +143,8 @@ runMessagingUDP udpMess = liftIO $ do bind so addr w <- async $ udpWorker udpMess (sock udpMess) + link w + waitCatch w >>= either throwIO (const $ pure ()) instance Messaging MessagingUDP L4Proto ByteString where diff --git a/hbs2-peer/app/PeerInfo.hs b/hbs2-peer/app/PeerInfo.hs index 172b3f27..9e2c7e4a 100644 --- a/hbs2-peer/app/PeerInfo.hs +++ b/hbs2-peer/app/PeerInfo.hs @@ -90,6 +90,14 @@ pexLoop brains tcpEnv = do pl <- getPeerLocator @e + -- TODO: investigate-tcp-session-cleanup + -- Есть подозрения, что TCP сессии не чистятся + -- надлежащим образом. Требуется расследовать. + + -- NOTE: tcpPexInfo + -- Этот кусок говорит Brains о том, + -- какие TCP сессии есть в наличии. + -- Убирать пока нельзя tcpPexInfo <- liftIO $ async $ forever do -- FIXME: fix-hardcode pause @'Seconds 20 diff --git a/hbs2-peer/app/PeerMain.hs b/hbs2-peer/app/PeerMain.hs index 82a52a38..6ae79433 100644 --- a/hbs2-peer/app/PeerMain.hs +++ b/hbs2-peer/app/PeerMain.hs @@ -104,7 +104,7 @@ import System.Posix.Process import UnliftIO.Exception qualified as U -- import UnliftIO.STM -import UnliftIO.Async as U +import UnliftIO.Async import Control.Monad.Trans.Resource import Streaming.Prelude qualified as S @@ -587,7 +587,7 @@ runPeer opts = U.handle (\e -> myException e & fromInteger @(Timeout 'Seconds) . fromMaybe 300 - let listenSa = view listenOn opts <|> listenConf <|> Just defListenUDP + let listenSa = view listenOn opts <|> listenConf credFile <- pure (view peerCredFile opts <|> keyConf) `orDie` "credentials not set" let pref = view storage opts <|> storConf <|> Just xdg @@ -634,12 +634,6 @@ runPeer opts = U.handle (\e -> myException e notice $ "multicast:" <+> pretty localMulticast - mess <- newMessagingUDP False listenSa - `orDie` "unable listen on the given addr" - - udp <- async $ runMessagingUDP mess - - let udpAddr = getOwnPeer mess mcast <- newMessagingUDPMulticast defLocalMulticast `orDie` "Can't start RPC listener" @@ -654,24 +648,62 @@ runPeer opts = U.handle (\e -> myException e pl <- AnyPeerLocator <$> newBrainyPeerLocator @e (SomeBrains @e brains) mempty - let addr' = fromStringMay @(PeerAddr L4Proto) tcpListen + -- FIXME: messaing-watchdog + -- Раз уж мы не помирает в случае, если один + -- из месседжингов отвалился --- то нужно + -- сделать watchdog, который будет респавнить + -- всё, если нет ни одного живого месседжинга - trace $ "TCP addr:" <+> pretty tcpListen <+> pretty addr' + msgAlive <- liftIO $ newTVarIO 0 - tcp <- maybe1 addr' (pure Nothing) $ \addr -> do - tcpEnv <- newMessagingTCP addr <&> set tcpOnClientStarted (onClientTCPConnected brains) - -- FIXME: handle-tcp-thread-somehow - void $ async $ runMessagingTCP tcpEnv - pure $ Just tcpEnv + messWatchDog <- liftIO $ async do + pause @'Seconds 5 + fix \next -> do + alive <- readTVarIO msgAlive + if alive <= 0 then do + err "!!! No live messaging left. Trying to respawn" + pure () + else do + pause @'Seconds 2 + next - let mudp = Just $ Dispatched mess + udpPoint <- runMaybeT do + sa <- toMPlus listenSa + env <- toMPlus =<< newMessagingUDP False (Just sa) - let tcpaddr = view tcpOwnPeer <$> tcp + void $ liftIO ( async do + runMessagingUDP env + `U.withException` \(e :: SomeException) -> do + err (viaShow e) + err "!!! UDP messaging stopped" + liftIO $ atomically $ modifyTVar msgAlive pred + ) - let mtcp = Dispatched <$> tcp + let udpAddr = getOwnPeer env + liftIO $ atomically $ modifyTVar msgAlive succ + pure (env, (udpAddr, Dispatched env)) - let points = catMaybes [ (udpAddr ,) <$> mudp - , (,) <$> tcpaddr <*> mtcp + tcpPoint <- runMaybeT do + addr <- toMPlus $ fromStringMay @(PeerAddr L4Proto) tcpListen + tcpEnv <- newMessagingTCP addr <&> set tcpOnClientStarted (onClientTCPConnected brains) + void $ liftIO ( async do + runMessagingTCP tcpEnv + `U.withException` \(e :: SomeException) -> do + err (viaShow e) + err "!!! TCP messaging stopped" + liftIO $ atomically $ modifyTVar msgAlive pred + ) + let tcpaddr = view tcpOwnPeer tcpEnv + liftIO $ atomically $ modifyTVar msgAlive succ + pure (tcpEnv, (tcpaddr, Dispatched tcpEnv)) + + let tcp = fst <$> tcpPoint + + let mtcp = view (_2 . _2) <$> tcpPoint + let mudp = view (_2 . _2) <$> udpPoint + + let points = catMaybes [ snd <$> udpPoint + , snd <$> tcpPoint ] proxy <- newDispatchProxy points $ \_ pip -> case view sockType pip of @@ -684,14 +716,15 @@ runPeer opts = U.handle (\e -> myException e -- Таскается везде со времени, когда Messaging был -- через TQueue. Нужно его удалить повсеместно -- Или сделать некий AnyAddr/DefaultAddr + peerSelf <- fromPeerAddr "0.0.0.0:7351" byPass <- newByPassMessaging @L4Proto byPassDef proxy - (getOwnPeer mess) + peerSelf (view peerSignPk pc) (view peerSignSk pc) - penv <- newPeerEnv pl (AnyStorage s) (Fabriq byPass) (getOwnPeer mess) + penv <- newPeerEnv pl (AnyStorage s) (Fabriq byPass) peerSelf proxyThread <- async $ runDispatchProxy proxy @@ -1006,7 +1039,8 @@ runPeer opts = U.handle (\e -> myException e let rpc = getRpcSocketName conf let pokeAnsw = show $ vcat [ "peer-key:" <+> dquotes (pretty (AsBase58 k)) - , "udp:" <+> dquotes (pretty (listenAddr mess)) + , "udp:" <+> dquotes (pretty (fst . snd <$> udpPoint)) + , "tcp:" <+> dquotes (pretty (fst . snd <$> tcpPoint)) , "local-multicast:" <+> dquotes (pretty localMulticast) , "rpc:" <+> dquotes (pretty rpc) , http @@ -1039,14 +1073,14 @@ runPeer opts = U.handle (\e -> myException e , makeResponse (makeServer @StorageAPI) ] - void $ waitAnyCancel $ w <> [ udp - , loop + void $ waitAnyCancel $ w <> [ loop , m1 , rpcProto , ann , messMcast , proxyThread , brainsThread + , messWatchDog ] liftIO $ simpleStorageStop s