From 73d9f7ec2c5c1d1ebd051c31338cf1709cab1896 Mon Sep 17 00:00:00 2001 From: voidlizard Date: Wed, 6 Nov 2024 11:47:56 +0300 Subject: [PATCH] better --- hbs2-core/lib/HBS2/Prelude.hs | 13 +++++++ hbs2-peer/app/PeerInfo.hs | 11 ------ hbs2-peer/app/RPC2.hs | 67 ++++++++++++++++++++++++++--------- 3 files changed, 64 insertions(+), 27 deletions(-) diff --git a/hbs2-core/lib/HBS2/Prelude.hs b/hbs2-core/lib/HBS2/Prelude.hs index d8ba546b..b58ee4b0 100644 --- a/hbs2-core/lib/HBS2/Prelude.hs +++ b/hbs2-core/lib/HBS2/Prelude.hs @@ -34,6 +34,7 @@ module HBS2.Prelude , newSimpleProbe , whenTrue, whenFalse , dontHandle + , median ) where import HBS2.Clock @@ -58,6 +59,7 @@ import Data.Functor import Data.Char qualified as Char import Data.Text qualified as Text import Data.Text (Text) +import Data.List qualified as List import Data.Hashable import Data.HashMap.Strict(HashMap) import Data.HashMap.Strict qualified as HM @@ -256,3 +258,14 @@ instance Probe SimpleProbe where dontHandle :: Applicative f => a -> f () dontHandle = const $ pure () +-- | Compute the median of a list +median :: (Ord a, Integral a) => [a] -> Maybe a +median [] = Nothing +median xs = Just + if odd n + then sorted !! half + else ((sorted !! (half - 1)) + (sorted !! half)) `div` 2 + where n = length xs + sorted = List.sort xs + half = n `div` 2 + diff --git a/hbs2-peer/app/PeerInfo.hs b/hbs2-peer/app/PeerInfo.hs index f2c5940f..8594390f 100644 --- a/hbs2-peer/app/PeerInfo.hs +++ b/hbs2-peer/app/PeerInfo.hs @@ -45,17 +45,6 @@ instance HasCfgKey PeerPingIntervalKey (Maybe Integer) where key = "ping-interval" --- | Compute the median of a list -median :: (Ord a, Integral a) => [a] -> Maybe a -median [] = Nothing -median xs = Just - if odd n - then sorted !! half - else ((sorted !! (half - 1)) + (sorted !! half)) `div` 2 - where n = length xs - sorted = List.sort xs - half = n `div` 2 - -- | Get the median RTT for a given peer. medianPeerRTT :: MonadIO m => PeerInfo e -> m (Maybe Integer) medianPeerRTT pinfo = do diff --git a/hbs2-peer/app/RPC2.hs b/hbs2-peer/app/RPC2.hs index 89c0abe6..0a6c4496 100644 --- a/hbs2-peer/app/RPC2.hs +++ b/hbs2-peer/app/RPC2.hs @@ -212,7 +212,6 @@ runBurstMachine :: MonadUnliftIO m runBurstMachine BurstMachine{..} = do - pause @'Seconds ( 0.5 * 2.71 ) bu0 <- readTVarIO _buCurrent <&> realToFrac let buMax = realToFrac _buBurstMax @@ -225,6 +224,8 @@ runBurstMachine BurstMachine{..} = do _buMaxReal <- newTVarIO buMax + pause @'Seconds (realToFrac _buTimeout) + flip runContT pure do void $ ContT $ withAsync do @@ -234,30 +235,46 @@ runBurstMachine BurstMachine{..} = do atomically do e <- headDef bu0 . Map.elems <$> readTVar _rates writeTVar _rates mempty + nrates <- readTVar _rates <&> take 100 . Map.toList + writeTVar _rates (Map.fromList nrates) modifyTVar _buMaxReal (max e) - -- writeTVar _buCurrent e void $ ContT $ withAsync do forever do pause @'Seconds 600 atomically $ writeTVar _buMaxReal buMax + + void $ ContT $ withAsync do + forever do + pause @'Seconds (realToFrac _buTimeout * 2.0) + ddt <- readTVarIO _dEdT + + when (ddt <= 0) do + atomically do + buMaxReal <- readTVar _buMaxReal + current <- readTVar _buCurrent + let new = min buMaxReal (current * (1.0 + up)) + writeTVar _buCurrent new + flip fix 0 $ \next e1 -> do let dt = realToFrac _buTimeout - pause @'Seconds dt - eNew <- atomically do e2 <- readTVar _buErrors current <- readTVar _buCurrent - buMaxReal <- readTVar _buMaxReal - let new = if e2 > e1 then - max 2.0 (current * (1.0 - down)) + new <- if e2 > e1 then do + let d = max 2.0 (current * (1.0 - down)) + nrates <- readTVar _rates <&> drop 2 . Map.toList + let newFucked = maybe d snd (headMay nrates) + writeTVar _rates (Map.fromList nrates) + pure newFucked + else - min buMaxReal (current * (1.0 + up)) + pure current -- $ min buMaxReal (current * (1.0 + up)) writeTVar _buErrors 0 writeTVar _buCurrent new @@ -270,6 +287,7 @@ runBurstMachine BurstMachine{..} = do pure e2 + pause @'Seconds dt next eNew data S = @@ -300,8 +318,9 @@ downloadFromPeerRec t bu0 cache env h0 peer = do p <- newTQueueIO q <- newTQueueIO qq <- newTQueueIO + toq <- newTVarIO ( mempty :: [Int] ) - bm <- newBurstMachine 3.00 256 (Just bu0) 0.05 0.10 + bm <- newBurstMachine 0.5 256 (Just bu0) 0.05 0.10 flip runContT pure do @@ -313,6 +332,13 @@ downloadFromPeerRec t bu0 cache env h0 peer = do void $ queryBlockSizeFromPeer cache env h peer pause @'Seconds 1.5 + ContT $ withAsync $ flip fix 10000000 $ \next m0 -> do + txs <- readTVarIO toq <&> L.take 1000 + let m1 = fromMaybe m0 $ median txs + when ( m1 > m0 ) $ burstMachineAddErrors bm 1 + pause @'Seconds 5 + next m1 + ContT $ withAsync $ runBurstMachine bm flip fix SInit $ \next -> \case @@ -348,7 +374,12 @@ downloadFromPeerRec t bu0 cache env h0 peer = do Nothing -> none bu <- lift $ getCurrentBurst bm + + t0 <- getTimeCoarse w <- lift $ downloadFromPeer t bu cache env (coerce h) peer + t1 <- getTimeCoarse + let dt = toMicroSeconds $ TimeoutTS (t1 - t0) + atomically $ modifyTVar toq ( dt : ) case w of Right bs -> do @@ -403,10 +434,10 @@ downloadFromPeer t bu cache env h peer = liftIO $ withPeerM env do rtt <- liftIO $ medianPeerRTT pinfo <&> fmap ((*1) . realToFrac) - <&> fromMaybe 0 + <&> fromMaybe 1000 <&> (/1e6) - let wait = 1000 + let waity = 10 * rtt sto <- getStorage @@ -437,17 +468,21 @@ downloadFromPeer t bu cache env h peer = liftIO $ withPeerM env do for_ bursts $ \(i,chunkN) -> do - atomically $ flushTQueue chuQ + -- atomically $ flushTQueue chuQ let req = BlockChunks @e coo (BlockGetChunks h chunkSize (fromIntegral i) (fromIntegral chunkN)) lift $ request peer req let watchdog = fix \next -> do - s1 <- readTVarIO _sBlockChunks2 <&> IntMap.size - pause @'MilliSeconds ( 1000 ) - s2 <- readTVarIO _sBlockChunks2 <&> IntMap.size - when (s1 /= s2) next + r <- race (pause @'MilliSeconds waity) do + void $ atomically $ readTQueue chuQ + either (const none) (const next) r + -- next + -- s1 <- readTVarIO _sBlockChunks2 <&> IntMap.size + -- pause @'MilliSeconds 1000 + -- s2 <- readTVarIO _sBlockChunks2 <&> IntMap.size + -- when (s1 /= s2) next r <- liftIO $ race watchdog do