From 2564a05bce038401f640bb28bed572dd52c9a5ab Mon Sep 17 00:00:00 2001 From: voidlizard Date: Tue, 18 Mar 2025 18:20:18 +0300 Subject: [PATCH] fixed some issues --- hbs2-core/hbs2-core.cabal | 1 + hbs2-core/lib/HBS2/Net/Messaging/TCP.hs | 58 +++++++++++++++---------- hbs2-peer/app/PeerInfo.hs | 25 +++++------ hbs2-peer/app/PeerMain.hs | 22 +++++++++- hbs2-peer/app/RPC2/Die.hs | 10 +++-- 5 files changed, 74 insertions(+), 42 deletions(-) diff --git a/hbs2-core/hbs2-core.cabal b/hbs2-core/hbs2-core.cabal index 93df1de7..8344ec6b 100644 --- a/hbs2-core/hbs2-core.cabal +++ b/hbs2-core/hbs2-core.cabal @@ -174,6 +174,7 @@ library , network-multicast , network-simple , network-byte-order + , psqueues , prettyprinter , prettyprinter-ansi-terminal , mwc-random diff --git a/hbs2-core/lib/HBS2/Net/Messaging/TCP.hs b/hbs2-core/lib/HBS2/Net/Messaging/TCP.hs index 13f75be2..3499eb03 100644 --- a/hbs2-core/lib/HBS2/Net/Messaging/TCP.hs +++ b/hbs2-core/lib/HBS2/Net/Messaging/TCP.hs @@ -29,6 +29,8 @@ import Data.ByteString.Lazy (ByteString) import Data.ByteString.Lazy qualified as LBS import Data.HashMap.Strict (HashMap) import Data.HashMap.Strict qualified as HM +import Data.HashPSQ (HashPSQ) +import Data.HashPSQ qualified as HPSQ import Data.HashSet qualified as HS import Data.Maybe import Data.Word @@ -66,7 +68,7 @@ data MessagingTCP = , _tcpPeerSocket :: TVar (HashMap (Peer L4Proto) Socket) , _tcpConnDemand :: TQueue (Peer L4Proto) , _tcpReceived :: TBQueue (Peer L4Proto, ByteString) - , _tcpSent :: TVar (HashMap (Peer L4Proto) (TBQueue ByteString)) + , _tcpSent :: TVar (HashPSQ (Peer L4Proto) TimeSpec (TBQueue ByteString)) , _tcpClientThreadNum :: TVar Int , _tcpClientThreads :: TVar (HashMap Int (Async ())) , _tcpServerThreadsCount :: TVar Int @@ -108,7 +110,7 @@ newMessagingTCP pa = liftIO do <*> newTVarIO mempty <*> newTQueueIO <*> newTBQueueIO (10 * outMessageQLen) - <*> newTVarIO mempty + <*> newTVarIO HPSQ.empty <*> newTVarIO 0 <*> newTVarIO mempty <*> newTVarIO 0 @@ -121,19 +123,22 @@ instance Messaging MessagingTCP L4Proto ByteString where -- let _own = tcpOwnPeer -- debug $ "!!!! FUCKING SEND TO" <+> pretty p + now <- getTimeCoarse queue <- atomically do - q' <- readTVar _tcpSent <&> HM.lookup p + q' <- readTVar _tcpSent <&> HPSQ.lookup p case q' of Nothing -> do writeTQueue _tcpConnDemand p q <- newTBQueue outMessageQLen - modifyTVar _tcpSent (HM.insert p q) + modifyTVar _tcpSent (HPSQ.insert p now q) pure q - Just q -> pure q + Just (_,q) -> pure q atomically $ writeTBQueueDropSTM 10 queue msg + atomically $ stateTVar _tcpSent (HPSQ.alter (\x -> ((), fmap (set _1 now) x)) p) + -- atomically $ insert -- debug $ "!!!! FUCKING SEND TO" <+> pretty p <+> "DONE" @@ -240,18 +245,23 @@ runMessagingTCP env@MessagingTCP{..} = liftIO do S.yield ("tcpPeerCookie", fromIntegral $ HM.size coo) S.yield ("tcpPeerCookieUsed", cooNn) - S.yield =<< ( readTVarIO _tcpSent <&> ("tcpSent",) . fromIntegral . HM.size) + S.yield =<< ( readTVarIO _tcpSent <&> ("tcpSent",) . fromIntegral . HPSQ.size) sweepCookies <- ContT $ withAsync $ forever do - pause @'Seconds 120 - -- atomically do - -- pips <- readTVar _tcpPeerConn - -- modifyTVar _tcpPeerToCookie (HM.filterWithKey (\k _ -> HM.member k pips)) - -- alive <- readTVar _tcpPeerToCookie <&> HS.fromList . HM.elems - -- modifyTVar _tcpPeerCookie (HM.filterWithKey (\k _ -> HS.member k alive)) + pause @'Seconds 300 + atomically do + pips <- readTVar _tcpPeerConn + modifyTVar _tcpPeerToCookie (HM.filterWithKey (\k _ -> HM.member k pips)) + alive <- readTVar _tcpPeerToCookie <&> HS.fromList . HM.elems + modifyTVar _tcpPeerCookie (HM.filterWithKey (\k _ -> HS.member k alive)) sweep <- ContT $ withAsync $ forever do - pause @'Seconds 120 + pause @'Seconds 300 + now <- getTimeCoarse + atomically do + w <- readTVar _tcpSent <&> HPSQ.toList + let live = [ x | x@(_,t,_) <- w, realToFrac (now - t) / 1e9 < 300 ] + writeTVar _tcpSent (HPSQ.fromList live) -- atomically do -- pips <- readTVar _tcpPeerConn -- modifyTVar _tcpSent (HM.filterWithKey (\k _ -> HM.member k pips)) @@ -325,16 +335,20 @@ runMessagingTCP env@MessagingTCP{..} = liftIO do atomically $ modifyTVar _tcpServerThreadsCount succ - newOutQ <- do - atomically do - mbQ <- readTVar _tcpSent <&> HM.lookup newP - maybe (newTBQueue outMessageQLen) pure mbQ + now <- getTimeCoarse + newOutQ <- atomically do + q <- readTVar _tcpSent <&> HPSQ.lookup newP >>= \case + Just (_,w) -> pure w + Nothing -> do + nq <- newTBQueue outMessageQLen + modifyTVar _tcpSent (HPSQ.insert newP now nq) + pure nq - atomically do - modifyTVar _tcpSent (HM.insert newP newOutQ) modifyTVar _tcpPeerConn (HM.insert newP (connectionId myCookie cookie)) modifyTVar _tcpPeerSocket (HM.insert newP so) + pure q + wr <- ContT $ withAsync $ forever do bs <- atomically $ readTBQueue newOutQ @@ -363,12 +377,11 @@ runMessagingTCP env@MessagingTCP{..} = liftIO do cancel wr atomically do - modifyTVar _tcpSent (HM.delete newP) + modifyTVar _tcpSent (HPSQ.delete newP) modifyTVar _tcpPeerCookie (HM.update killCookie cookie) void $ waitAnyCatchCancel [rd,wr] - runClient = flip runContT pure do let myCookie = view tcpCookie env @@ -433,7 +446,7 @@ runMessagingTCP env@MessagingTCP{..} = liftIO do wr <- ContT $ withAsync $ forever do bss <- atomically do - q' <- readTVar _tcpSent <&> HM.lookup who + q' <- readTVar _tcpSent <&> fmap (view _2) . HPSQ.lookup who maybe1 q' mempty $ \q -> do s <- readTBQueue q sx <- flushTBQueue q @@ -458,6 +471,7 @@ runMessagingTCP env@MessagingTCP{..} = liftIO do modifyTVar _tcpPeerCookie (HM.update killCookie cookie) modifyTVar _tcpPeerToCookie (HM.delete who) modifyTVar _tcpPeerSocket (HM.delete who) + modifyTVar _tcpSent (HPSQ.delete who) void $ ContT $ bracket none (const $ cancel wr) diff --git a/hbs2-peer/app/PeerInfo.hs b/hbs2-peer/app/PeerInfo.hs index a31cb94e..102c715a 100644 --- a/hbs2-peer/app/PeerInfo.hs +++ b/hbs2-peer/app/PeerInfo.hs @@ -234,22 +234,17 @@ peerPingLoop (PeerConfig syn) penv = do forever do - -- FIXME: defaults - r <- liftIO $ race (pause @'Seconds pingTime) - (atomically $ readTQueue wake) - - sas' <- liftIO $ atomically $ STM.flushTQueue wake <&> mconcat - - let sas = case r of - Left{} -> sas' - Right sa -> sa <> sas' - debug "peerPingLoop" - pips <- knownPeers @e pl <&> (<> sas) <&> List.nub + let pips = do + sas <- liftIO (atomically $ STM.flushTQueue wake <&> mconcat) + rest <- knownPeers @e pl + pure (fmap (,realToFrac pingTime) (List.nub $ sas <> rest)) - for_ pips $ \p -> do - -- trace $ "SEND PING TO" <+> pretty p - lift $ sendPing @e p - -- trace $ "SENT PING TO" <+> pretty p + polling (Polling 2.5 2) pips $ \p -> do + liftIO $ withPeerM penv do + debug $ "SEND PING TO" <+> pretty p + try @_ @SomeException (sendPing @e p) >>= \case + Left e -> err (viaShow e) + Right{} -> none diff --git a/hbs2-peer/app/PeerMain.hs b/hbs2-peer/app/PeerMain.hs index 6bded06a..fc94a230 100644 --- a/hbs2-peer/app/PeerMain.hs +++ b/hbs2-peer/app/PeerMain.hs @@ -252,6 +252,7 @@ runCLI = do <*> hsubparser ( command "init" (info pInit (progDesc "creates default config")) <> command "run" (info pRun (progDesc "run peer")) + <> command "start" (info pRunStart (progDesc "run peer")) <> command "poke" (info pPoke (progDesc "poke peer by rpc")) <> command "die" (info pDie (progDesc "die cmd")) <> command "announce" (info pAnnounce (progDesc "announce block")) @@ -306,7 +307,26 @@ runCLI = do pPubKeySign = maybeReader (fromStringMay @(PubKey 'Sign 'HBS2Basic)) - pRun = do + pRun = pure do + self <- getExecutablePath + args' <- getArgs + + print (self, args') + + let args = "start" : L.dropWhile (=="run") args' + + flip runContT pure $ fix \next -> do + + pid <- ContT $ withAsync do + liftIO (executeFile self False args Nothing) + + void $ waitCatch pid + + liftIO $ putStrLn "hbs2-peer run stopped/terminated; respawning" + pause @'Seconds 3 + next + + pRunStart = do runPeer <$> common pDie = do diff --git a/hbs2-peer/app/RPC2/Die.hs b/hbs2-peer/app/RPC2/Die.hs index 34f3263e..f091820b 100644 --- a/hbs2-peer/app/RPC2/Die.hs +++ b/hbs2-peer/app/RPC2/Die.hs @@ -1,6 +1,8 @@ +{-# Language UndecidableInstances #-} module RPC2.Die where import HBS2.Prelude.Plated +import HBS2.Peer.RPC.Internal.Types import HBS2.Clock import HBS2.Net.Proto.Service @@ -8,16 +10,16 @@ import HBS2.System.Logger.Simple import HBS2.Peer.RPC.API.Peer +import Control.Concurrent import System.Exit qualified as Exit -import Control.Concurrent.Async - -instance (MonadIO m) => HandleMethod m RpcDie where +instance (MonadIO m, HasRpcContext PeerAPI RPC2Context m) => HandleMethod m RpcDie where handleMethod _ = do + RPC2Context{..} <- getRpcContext @PeerAPI debug $ "rpc.die: exiting" void $ liftIO $ do - pause @'Seconds 0.5 >> Exit.exitSuccess + killThread rpcSelf