This commit is contained in:
voidlizard 2024-11-06 11:47:56 +03:00
parent ecf5add30d
commit 73d9f7ec2c
3 changed files with 64 additions and 27 deletions

View File

@ -34,6 +34,7 @@ module HBS2.Prelude
, newSimpleProbe , newSimpleProbe
, whenTrue, whenFalse , whenTrue, whenFalse
, dontHandle , dontHandle
, median
) where ) where
import HBS2.Clock import HBS2.Clock
@ -58,6 +59,7 @@ import Data.Functor
import Data.Char qualified as Char import Data.Char qualified as Char
import Data.Text qualified as Text import Data.Text qualified as Text
import Data.Text (Text) import Data.Text (Text)
import Data.List qualified as List
import Data.Hashable import Data.Hashable
import Data.HashMap.Strict(HashMap) import Data.HashMap.Strict(HashMap)
import Data.HashMap.Strict qualified as HM import Data.HashMap.Strict qualified as HM
@ -256,3 +258,14 @@ instance Probe SimpleProbe where
dontHandle :: Applicative f => a -> f () dontHandle :: Applicative f => a -> f ()
dontHandle = const $ pure () dontHandle = const $ pure ()
-- | Compute the median of a list
median :: (Ord a, Integral a) => [a] -> Maybe a
median [] = Nothing
median xs = Just
if odd n
then sorted !! half
else ((sorted !! (half - 1)) + (sorted !! half)) `div` 2
where n = length xs
sorted = List.sort xs
half = n `div` 2

View File

@ -45,17 +45,6 @@ instance HasCfgKey PeerPingIntervalKey (Maybe Integer) where
key = "ping-interval" key = "ping-interval"
-- | Compute the median of a list
median :: (Ord a, Integral a) => [a] -> Maybe a
median [] = Nothing
median xs = Just
if odd n
then sorted !! half
else ((sorted !! (half - 1)) + (sorted !! half)) `div` 2
where n = length xs
sorted = List.sort xs
half = n `div` 2
-- | Get the median RTT for a given peer. -- | Get the median RTT for a given peer.
medianPeerRTT :: MonadIO m => PeerInfo e -> m (Maybe Integer) medianPeerRTT :: MonadIO m => PeerInfo e -> m (Maybe Integer)
medianPeerRTT pinfo = do medianPeerRTT pinfo = do

View File

@ -212,7 +212,6 @@ runBurstMachine :: MonadUnliftIO m
runBurstMachine BurstMachine{..} = do runBurstMachine BurstMachine{..} = do
pause @'Seconds ( 0.5 * 2.71 )
bu0 <- readTVarIO _buCurrent <&> realToFrac bu0 <- readTVarIO _buCurrent <&> realToFrac
let buMax = realToFrac _buBurstMax let buMax = realToFrac _buBurstMax
@ -225,6 +224,8 @@ runBurstMachine BurstMachine{..} = do
_buMaxReal <- newTVarIO buMax _buMaxReal <- newTVarIO buMax
pause @'Seconds (realToFrac _buTimeout)
flip runContT pure do flip runContT pure do
void $ ContT $ withAsync do void $ ContT $ withAsync do
@ -234,30 +235,46 @@ runBurstMachine BurstMachine{..} = do
atomically do atomically do
e <- headDef bu0 . Map.elems <$> readTVar _rates e <- headDef bu0 . Map.elems <$> readTVar _rates
writeTVar _rates mempty writeTVar _rates mempty
nrates <- readTVar _rates <&> take 100 . Map.toList
writeTVar _rates (Map.fromList nrates)
modifyTVar _buMaxReal (max e) modifyTVar _buMaxReal (max e)
-- writeTVar _buCurrent e
void $ ContT $ withAsync do void $ ContT $ withAsync do
forever do forever do
pause @'Seconds 600 pause @'Seconds 600
atomically $ writeTVar _buMaxReal buMax atomically $ writeTVar _buMaxReal buMax
void $ ContT $ withAsync do
forever do
pause @'Seconds (realToFrac _buTimeout * 2.0)
ddt <- readTVarIO _dEdT
when (ddt <= 0) do
atomically do
buMaxReal <- readTVar _buMaxReal
current <- readTVar _buCurrent
let new = min buMaxReal (current * (1.0 + up))
writeTVar _buCurrent new
flip fix 0 $ \next e1 -> do flip fix 0 $ \next e1 -> do
let dt = realToFrac _buTimeout let dt = realToFrac _buTimeout
pause @'Seconds dt
eNew <- atomically do eNew <- atomically do
e2 <- readTVar _buErrors e2 <- readTVar _buErrors
current <- readTVar _buCurrent current <- readTVar _buCurrent
buMaxReal <- readTVar _buMaxReal
let new = if e2 > e1 then new <- if e2 > e1 then do
max 2.0 (current * (1.0 - down)) let d = max 2.0 (current * (1.0 - down))
nrates <- readTVar _rates <&> drop 2 . Map.toList
let newFucked = maybe d snd (headMay nrates)
writeTVar _rates (Map.fromList nrates)
pure newFucked
else else
min buMaxReal (current * (1.0 + up)) pure current -- $ min buMaxReal (current * (1.0 + up))
writeTVar _buErrors 0 writeTVar _buErrors 0
writeTVar _buCurrent new writeTVar _buCurrent new
@ -270,6 +287,7 @@ runBurstMachine BurstMachine{..} = do
pure e2 pure e2
pause @'Seconds dt
next eNew next eNew
data S = data S =
@ -300,8 +318,9 @@ downloadFromPeerRec t bu0 cache env h0 peer = do
p <- newTQueueIO p <- newTQueueIO
q <- newTQueueIO q <- newTQueueIO
qq <- newTQueueIO qq <- newTQueueIO
toq <- newTVarIO ( mempty :: [Int] )
bm <- newBurstMachine 3.00 256 (Just bu0) 0.05 0.10 bm <- newBurstMachine 0.5 256 (Just bu0) 0.05 0.10
flip runContT pure do flip runContT pure do
@ -313,6 +332,13 @@ downloadFromPeerRec t bu0 cache env h0 peer = do
void $ queryBlockSizeFromPeer cache env h peer void $ queryBlockSizeFromPeer cache env h peer
pause @'Seconds 1.5 pause @'Seconds 1.5
ContT $ withAsync $ flip fix 10000000 $ \next m0 -> do
txs <- readTVarIO toq <&> L.take 1000
let m1 = fromMaybe m0 $ median txs
when ( m1 > m0 ) $ burstMachineAddErrors bm 1
pause @'Seconds 5
next m1
ContT $ withAsync $ runBurstMachine bm ContT $ withAsync $ runBurstMachine bm
flip fix SInit $ \next -> \case flip fix SInit $ \next -> \case
@ -348,7 +374,12 @@ downloadFromPeerRec t bu0 cache env h0 peer = do
Nothing -> none Nothing -> none
bu <- lift $ getCurrentBurst bm bu <- lift $ getCurrentBurst bm
t0 <- getTimeCoarse
w <- lift $ downloadFromPeer t bu cache env (coerce h) peer w <- lift $ downloadFromPeer t bu cache env (coerce h) peer
t1 <- getTimeCoarse
let dt = toMicroSeconds $ TimeoutTS (t1 - t0)
atomically $ modifyTVar toq ( dt : )
case w of case w of
Right bs -> do Right bs -> do
@ -403,10 +434,10 @@ downloadFromPeer t bu cache env h peer = liftIO $ withPeerM env do
rtt <- liftIO $ medianPeerRTT pinfo rtt <- liftIO $ medianPeerRTT pinfo
<&> fmap ((*1) . realToFrac) <&> fmap ((*1) . realToFrac)
<&> fromMaybe 0 <&> fromMaybe 1000
<&> (/1e6) <&> (/1e6)
let wait = 1000 let waity = 10 * rtt
sto <- getStorage sto <- getStorage
@ -437,17 +468,21 @@ downloadFromPeer t bu cache env h peer = liftIO $ withPeerM env do
for_ bursts $ \(i,chunkN) -> do for_ bursts $ \(i,chunkN) -> do
atomically $ flushTQueue chuQ -- atomically $ flushTQueue chuQ
let req = BlockChunks @e coo (BlockGetChunks h chunkSize (fromIntegral i) (fromIntegral chunkN)) let req = BlockChunks @e coo (BlockGetChunks h chunkSize (fromIntegral i) (fromIntegral chunkN))
lift $ request peer req lift $ request peer req
let watchdog = fix \next -> do let watchdog = fix \next -> do
s1 <- readTVarIO _sBlockChunks2 <&> IntMap.size r <- race (pause @'MilliSeconds waity) do
pause @'MilliSeconds ( 1000 ) void $ atomically $ readTQueue chuQ
s2 <- readTVarIO _sBlockChunks2 <&> IntMap.size either (const none) (const next) r
when (s1 /= s2) next -- next
-- s1 <- readTVarIO _sBlockChunks2 <&> IntMap.size
-- pause @'MilliSeconds 1000
-- s2 <- readTVarIO _sBlockChunks2 <&> IntMap.size
-- when (s1 /= s2) next
r <- liftIO $ race watchdog do r <- liftIO $ race watchdog do