This commit is contained in:
voidlizard 2024-11-14 17:18:48 +03:00
parent 2334645944
commit ee02bbab51
1 changed files with 26 additions and 11 deletions

View File

@ -468,12 +468,14 @@ downloadFromPeer bu cache env h peer = liftIO $ withPeerM env do
let bursts = calcBursts bu chunkNums
_wx <- newTVarIO 1000
callCC $ \exit2 -> do
_wx <- newTVarIO 10000 -- waity
for_ bursts $ \(i,chunkN) -> do
let parts = [i .. i + chunkN-1]
wx <- readTVarIO _wx
flip fix 0 \again n -> do
@ -482,7 +484,8 @@ downloadFromPeer bu cache env h peer = liftIO $ withPeerM env do
lift $ request peer req
_num <- newTVarIO 0
_num <- atomically do
readTVar _sBlockChunks2 <&> IntMap.size >>= newTVar
let w0 = 2.0 :: Timeout 'MilliSeconds
@ -495,11 +498,11 @@ downloadFromPeer bu cache env h peer = liftIO $ withPeerM env do
t0 <- getTimeCoarse
r <- liftIO $ pause w0 >> race watchdog do
r <- liftIO $ race watchdog do
atomically do
pieces <- readTVar _sBlockChunks2
writeTVar _num ( IntMap.size pieces )
let done = and [ IntMap.member j pieces | j <- [i .. i + chunkN-1] ]
let done = and [ IntMap.member j pieces | j <- parts ]
unless done retry -- $ pause @'MilliSeconds ( 0.25 * rtt ) >> next
t1 <- getTimeCoarse
@ -507,15 +510,22 @@ downloadFromPeer bu cache env h peer = liftIO $ withPeerM env do
atomically do
when (isRight r) do
let nano = toNanoSeconds $ TimeoutTS (t1 - t0)
let wx1 = max 1000 (100 * realToFrac nano / 1e6) -- millis
let wx1 = 100 * realToFrac nano / 1e6 -- millis
writeTVar _wx wx1
case r of
Left{} -> do
if n < 2 then do
w <- readTVarIO _wx
debug $ red "Retry" <+> pretty w <+> pretty i <+> pretty chunkN <+> pretty h <+> pretty peer
again (succ n)
pieces <- readTVarIO _sBlockChunks2
let missed = IntMap.difference pieces (IntMap.fromList [ (j,()) | j <- parts ] )
debug $ red "Retry" <+> pretty w
<+> pretty (length missed)
<+> pretty h
<+> pretty peer
if L.null missed then none else again (succ n)
else do
exit2 (Left $ DownloadStuckError (HashRef h) peer)
@ -679,7 +689,7 @@ downloadDispatcher brains env = flip runContT pure do
pure (j,ssem)
unless here do
a <- async (peerThread sem i onBlock p wip)
a <- async (peerThread pts sem onBlock p wip)
atomically $ modifyTVar pts (HM.insert p (a,nonce))
loosers <- atomically do
@ -704,7 +714,7 @@ downloadDispatcher brains env = flip runContT pure do
atomically do
modifyTVar _psem (HM.delete nonce)
peerThread sem me onBlock p wip = flip runContT pure do
peerThread pts sem onBlock p wip = flip runContT pure do
btimes <- newTVarIO ( mempty :: [Double] )
@ -779,6 +789,7 @@ downloadDispatcher brains env = flip runContT pure do
what <- atomically do
e <- readTVar _errors
peerNum <- readTVar pts <&> HM.size
if e > 5 then
pure Nothing
@ -786,7 +797,7 @@ downloadDispatcher brains env = flip runContT pure do
-- TSem.waitTSem sem
wpsize <- readTVar wip <&> HM.size
let trsh = if wpsize < 10 then 3 else 0
let trsh = if wpsize < peerNum then max 3 (peerNum `div` 2) else 0
blocks <- readTVar wip
@ -805,6 +816,7 @@ downloadDispatcher brains env = flip runContT pure do
if busy > trsh || down || absent then
retry
else do
modifyTVar dcbBusy succ
pure (Just (h,dcb))
case what of
@ -856,6 +868,9 @@ downloadDispatcher brains env = flip runContT pure do
bu <- liftIO $ getCurrentBurst bm
withPeerM env $ do
request p (GetBlockSize @e (coerce hx))
t0 <- getTimeCoarse
r <- downloadFromPeer bu (KnownSize size) env (coerce hx) p
t1 <- getTimeCoarse