much betta

This commit is contained in:
voidlizard 2024-11-14 13:40:46 +03:00
parent 2265544ef7
commit e57acf80a1
1 changed files with 38 additions and 31 deletions

View File

@ -268,7 +268,7 @@ instance MonadUnliftIO m => IsBurstMachine BurstMachine m where
new <- if e2 > e1 then do
let d = max 2.0 (current * (1.0 - down))
nrates <- readTVar _rates <&> drop 1 . Map.toList
nrates <- readTVar _rates <&> drop 2 . Map.toList
let newFucked = maybe d snd (headMay nrates)
writeTVar _rates (Map.fromList nrates)
pure newFucked
@ -475,40 +475,47 @@ downloadFromPeer bu cache env h peer = liftIO $ withPeerM env do
for_ bursts $ \(i,chunkN) -> do
-- atomically $ flushTQueue chuQ
flip fix 0 \again n -> do
let req = BlockChunks @e coo (BlockGetChunks h chunkSize (fromIntegral i) (fromIntegral chunkN))
wx <- atomically do
void $ flushTQueue chuQ
readTVar _wx
lift $ request peer req
let req = BlockChunks @e coo (BlockGetChunks h chunkSize (fromIntegral i) (fromIntegral chunkN))
t0 <- getTimeCoarse
lift $ request peer req
let watchdog = fix \next -> do
wx <- readTVarIO _wx <&> realToFrac
-- debug $ "WATCHDOG" <+> pretty wx <+> pretty waity
r <- race (pause @'MilliSeconds wx) do
void $ atomically $ readTQueue chuQ
either (const none) (const next) r
t0 <- getTimeCoarse
r <- liftIO $ race watchdog do
atomically do
pieces <- readTVar _sBlockChunks2
let done = and [ IntMap.member j pieces | j <- [i .. i + chunkN-1] ]
unless done retry -- $ pause @'MilliSeconds ( 0.25 * rtt ) >> next
let watchdog = fix \next -> do
r <- race (pause @'MilliSeconds wx) do
void $ atomically $ readTQueue chuQ
either (const none) (const next) r
atomically $ flushTQueue chuQ
r <- liftIO $ race watchdog do
atomically do
pieces <- readTVar _sBlockChunks2
let done = and [ IntMap.member j pieces | j <- [i .. i + chunkN-1] ]
unless done retry -- $ pause @'MilliSeconds ( 0.25 * rtt ) >> next
t1 <- getTimeCoarse
t1 <- getTimeCoarse
atomically do
when (isRight r) do
-- wx0 <- readTVar _wx
let wx1 = 20000 -- min 10000 (2.5 * 100 * realToFrac (t1 - t0) / 1e6) -- millis
writeTVar _wx wx1
atomically do
void $ flushTQueue chuQ
when (isRight r) do
-- wx0 <- readTVar _wx
let nano = toNanoSeconds $ TimeoutTS (t1 - t0)
let wx1 = 5 * realToFrac nano / 1e6 -- millis
writeTVar _wx wx1
case r of
Left{} -> exit2 (Left $ DownloadStuckError (HashRef h) peer)
_ -> pure ()
case r of
Left{} -> do
if n < 2 then do
again (succ n)
else do
exit2 (Left $ DownloadStuckError (HashRef h) peer)
_ -> pure ()
blk <- readTVarIO _sBlockChunks2
@ -705,8 +712,8 @@ downloadDispatcher brains env = flip runContT pure do
bm <- liftIO do
case _sockType p of
TCP -> AnyBurstMachine @IO <$> newBurstMachine 30 256 (Just 256) 0.20 0.10
UDP -> AnyBurstMachine @IO <$> newBurstMachine 10 256 (Just 128) 0.20 0.35
TCP -> AnyBurstMachine @IO <$> pure (ConstBurstMachine 256) -- newBurstMachine 60 256 (Just 256) 0.20 0.10
UDP -> AnyBurstMachine @IO <$> newBurstMachine 10 256 (Just 128) 0.05 0.25
void $ ContT $ bracket none $ const do
debug $ "Cancelling thread for" <+> pretty p
@ -772,7 +779,7 @@ downloadDispatcher brains env = flip runContT pure do
if e > 5 then
pure Nothing
else do
TSem.waitTSem sem
-- TSem.waitTSem sem
wpsize <- readTVar wip <&> HM.size
let trsh = if wpsize < 10 then 3 else 0
@ -856,7 +863,7 @@ downloadDispatcher brains env = flip runContT pure do
avg <- readTVarIO _avg
when (dtsec > avg * 1.15) do
when (dtsec > avg) do
liftIO $ burstMachineAddErrors bm 1
atomically do
@ -876,7 +883,7 @@ downloadDispatcher brains env = flip runContT pure do
PReleaseBlock hx dcb done -> do
atomically do
TSem.signalTSem sem
-- TSem.signalTSem sem
if not done then do
modifyTVar (dcbBusy dcb) pred
else do