fixed some issues

This commit is contained in:
voidlizard 2025-03-18 18:20:18 +03:00
parent 3843066d83
commit 2564a05bce
5 changed files with 74 additions and 42 deletions

View File

@ -174,6 +174,7 @@ library
, network-multicast
, network-simple
, network-byte-order
, psqueues
, prettyprinter
, prettyprinter-ansi-terminal
, mwc-random

View File

@ -29,6 +29,8 @@ import Data.ByteString.Lazy (ByteString)
import Data.ByteString.Lazy qualified as LBS
import Data.HashMap.Strict (HashMap)
import Data.HashMap.Strict qualified as HM
import Data.HashPSQ (HashPSQ)
import Data.HashPSQ qualified as HPSQ
import Data.HashSet qualified as HS
import Data.Maybe
import Data.Word
@ -66,7 +68,7 @@ data MessagingTCP =
, _tcpPeerSocket :: TVar (HashMap (Peer L4Proto) Socket)
, _tcpConnDemand :: TQueue (Peer L4Proto)
, _tcpReceived :: TBQueue (Peer L4Proto, ByteString)
, _tcpSent :: TVar (HashMap (Peer L4Proto) (TBQueue ByteString))
, _tcpSent :: TVar (HashPSQ (Peer L4Proto) TimeSpec (TBQueue ByteString))
, _tcpClientThreadNum :: TVar Int
, _tcpClientThreads :: TVar (HashMap Int (Async ()))
, _tcpServerThreadsCount :: TVar Int
@ -108,7 +110,7 @@ newMessagingTCP pa = liftIO do
<*> newTVarIO mempty
<*> newTQueueIO
<*> newTBQueueIO (10 * outMessageQLen)
<*> newTVarIO mempty
<*> newTVarIO HPSQ.empty
<*> newTVarIO 0
<*> newTVarIO mempty
<*> newTVarIO 0
@ -121,19 +123,22 @@ instance Messaging MessagingTCP L4Proto ByteString where
-- let _own = tcpOwnPeer
-- debug $ "!!!! FUCKING SEND TO" <+> pretty p
now <- getTimeCoarse
queue <- atomically do
q' <- readTVar _tcpSent <&> HM.lookup p
q' <- readTVar _tcpSent <&> HPSQ.lookup p
case q' of
Nothing -> do
writeTQueue _tcpConnDemand p
q <- newTBQueue outMessageQLen
modifyTVar _tcpSent (HM.insert p q)
modifyTVar _tcpSent (HPSQ.insert p now q)
pure q
Just q -> pure q
Just (_,q) -> pure q
atomically $ writeTBQueueDropSTM 10 queue msg
atomically $ stateTVar _tcpSent (HPSQ.alter (\x -> ((), fmap (set _1 now) x)) p)
-- atomically $ insert
-- debug $ "!!!! FUCKING SEND TO" <+> pretty p <+> "DONE"
@ -240,18 +245,23 @@ runMessagingTCP env@MessagingTCP{..} = liftIO do
S.yield ("tcpPeerCookie", fromIntegral $ HM.size coo)
S.yield ("tcpPeerCookieUsed", cooNn)
S.yield =<< ( readTVarIO _tcpSent <&> ("tcpSent",) . fromIntegral . HM.size)
S.yield =<< ( readTVarIO _tcpSent <&> ("tcpSent",) . fromIntegral . HPSQ.size)
sweepCookies <- ContT $ withAsync $ forever do
pause @'Seconds 120
-- atomically do
-- pips <- readTVar _tcpPeerConn
-- modifyTVar _tcpPeerToCookie (HM.filterWithKey (\k _ -> HM.member k pips))
-- alive <- readTVar _tcpPeerToCookie <&> HS.fromList . HM.elems
-- modifyTVar _tcpPeerCookie (HM.filterWithKey (\k _ -> HS.member k alive))
pause @'Seconds 300
atomically do
pips <- readTVar _tcpPeerConn
modifyTVar _tcpPeerToCookie (HM.filterWithKey (\k _ -> HM.member k pips))
alive <- readTVar _tcpPeerToCookie <&> HS.fromList . HM.elems
modifyTVar _tcpPeerCookie (HM.filterWithKey (\k _ -> HS.member k alive))
sweep <- ContT $ withAsync $ forever do
pause @'Seconds 120
pause @'Seconds 300
now <- getTimeCoarse
atomically do
w <- readTVar _tcpSent <&> HPSQ.toList
let live = [ x | x@(_,t,_) <- w, realToFrac (now - t) / 1e9 < 300 ]
writeTVar _tcpSent (HPSQ.fromList live)
-- atomically do
-- pips <- readTVar _tcpPeerConn
-- modifyTVar _tcpSent (HM.filterWithKey (\k _ -> HM.member k pips))
@ -325,16 +335,20 @@ runMessagingTCP env@MessagingTCP{..} = liftIO do
atomically $ modifyTVar _tcpServerThreadsCount succ
newOutQ <- do
atomically do
mbQ <- readTVar _tcpSent <&> HM.lookup newP
maybe (newTBQueue outMessageQLen) pure mbQ
now <- getTimeCoarse
newOutQ <- atomically do
q <- readTVar _tcpSent <&> HPSQ.lookup newP >>= \case
Just (_,w) -> pure w
Nothing -> do
nq <- newTBQueue outMessageQLen
modifyTVar _tcpSent (HPSQ.insert newP now nq)
pure nq
atomically do
modifyTVar _tcpSent (HM.insert newP newOutQ)
modifyTVar _tcpPeerConn (HM.insert newP (connectionId myCookie cookie))
modifyTVar _tcpPeerSocket (HM.insert newP so)
pure q
wr <- ContT $ withAsync $ forever do
bs <- atomically $ readTBQueue newOutQ
@ -363,12 +377,11 @@ runMessagingTCP env@MessagingTCP{..} = liftIO do
cancel wr
atomically do
modifyTVar _tcpSent (HM.delete newP)
modifyTVar _tcpSent (HPSQ.delete newP)
modifyTVar _tcpPeerCookie (HM.update killCookie cookie)
void $ waitAnyCatchCancel [rd,wr]
runClient = flip runContT pure do
let myCookie = view tcpCookie env
@ -433,7 +446,7 @@ runMessagingTCP env@MessagingTCP{..} = liftIO do
wr <- ContT $ withAsync $ forever do
bss <- atomically do
q' <- readTVar _tcpSent <&> HM.lookup who
q' <- readTVar _tcpSent <&> fmap (view _2) . HPSQ.lookup who
maybe1 q' mempty $ \q -> do
s <- readTBQueue q
sx <- flushTBQueue q
@ -458,6 +471,7 @@ runMessagingTCP env@MessagingTCP{..} = liftIO do
modifyTVar _tcpPeerCookie (HM.update killCookie cookie)
modifyTVar _tcpPeerToCookie (HM.delete who)
modifyTVar _tcpPeerSocket (HM.delete who)
modifyTVar _tcpSent (HPSQ.delete who)
void $ ContT $ bracket none (const $ cancel wr)

View File

@ -234,22 +234,17 @@ peerPingLoop (PeerConfig syn) penv = do
forever do
-- FIXME: defaults
r <- liftIO $ race (pause @'Seconds pingTime)
(atomically $ readTQueue wake)
sas' <- liftIO $ atomically $ STM.flushTQueue wake <&> mconcat
let sas = case r of
Left{} -> sas'
Right sa -> sa <> sas'
debug "peerPingLoop"
pips <- knownPeers @e pl <&> (<> sas) <&> List.nub
let pips = do
sas <- liftIO (atomically $ STM.flushTQueue wake <&> mconcat)
rest <- knownPeers @e pl
pure (fmap (,realToFrac pingTime) (List.nub $ sas <> rest))
for_ pips $ \p -> do
-- trace $ "SEND PING TO" <+> pretty p
lift $ sendPing @e p
-- trace $ "SENT PING TO" <+> pretty p
polling (Polling 2.5 2) pips $ \p -> do
liftIO $ withPeerM penv do
debug $ "SEND PING TO" <+> pretty p
try @_ @SomeException (sendPing @e p) >>= \case
Left e -> err (viaShow e)
Right{} -> none

View File

@ -252,6 +252,7 @@ runCLI = do
<*> hsubparser (
command "init" (info pInit (progDesc "creates default config"))
<> command "run" (info pRun (progDesc "run peer"))
<> command "start" (info pRunStart (progDesc "run peer"))
<> command "poke" (info pPoke (progDesc "poke peer by rpc"))
<> command "die" (info pDie (progDesc "die cmd"))
<> command "announce" (info pAnnounce (progDesc "announce block"))
@ -306,7 +307,26 @@ runCLI = do
pPubKeySign = maybeReader (fromStringMay @(PubKey 'Sign 'HBS2Basic))
pRun = do
pRun = pure do
self <- getExecutablePath
args' <- getArgs
print (self, args')
let args = "start" : L.dropWhile (=="run") args'
flip runContT pure $ fix \next -> do
pid <- ContT $ withAsync do
liftIO (executeFile self False args Nothing)
void $ waitCatch pid
liftIO $ putStrLn "hbs2-peer run stopped/terminated; respawning"
pause @'Seconds 3
next
pRunStart = do
runPeer <$> common
pDie = do

View File

@ -1,6 +1,8 @@
{-# Language UndecidableInstances #-}
module RPC2.Die where
import HBS2.Prelude.Plated
import HBS2.Peer.RPC.Internal.Types
import HBS2.Clock
import HBS2.Net.Proto.Service
@ -8,16 +10,16 @@ import HBS2.System.Logger.Simple
import HBS2.Peer.RPC.API.Peer
import Control.Concurrent
import System.Exit qualified as Exit
import Control.Concurrent.Async
instance (MonadIO m) => HandleMethod m RpcDie where
instance (MonadIO m, HasRpcContext PeerAPI RPC2Context m) => HandleMethod m RpcDie where
handleMethod _ = do
RPC2Context{..} <- getRpcContext @PeerAPI
debug $ "rpc.die: exiting"
void $ liftIO $ do
pause @'Seconds 0.5 >> Exit.exitSuccess
killThread rpcSelf