mirror of https://github.com/voidlizard/hbs2
better
This commit is contained in:
parent
1d2aa791d2
commit
7322424f5e
|
@ -468,12 +468,14 @@ downloadFromPeer bu cache env h peer = liftIO $ withPeerM env do
|
||||||
|
|
||||||
let bursts = calcBursts bu chunkNums
|
let bursts = calcBursts bu chunkNums
|
||||||
|
|
||||||
|
_wx <- newTVarIO 1000
|
||||||
|
|
||||||
callCC $ \exit2 -> do
|
callCC $ \exit2 -> do
|
||||||
|
|
||||||
_wx <- newTVarIO 10000 -- waity
|
|
||||||
|
|
||||||
for_ bursts $ \(i,chunkN) -> do
|
for_ bursts $ \(i,chunkN) -> do
|
||||||
|
|
||||||
|
let parts = [i .. i + chunkN-1]
|
||||||
|
|
||||||
wx <- readTVarIO _wx
|
wx <- readTVarIO _wx
|
||||||
|
|
||||||
flip fix 0 \again n -> do
|
flip fix 0 \again n -> do
|
||||||
|
@ -482,7 +484,8 @@ downloadFromPeer bu cache env h peer = liftIO $ withPeerM env do
|
||||||
|
|
||||||
lift $ request peer req
|
lift $ request peer req
|
||||||
|
|
||||||
_num <- newTVarIO 0
|
_num <- atomically do
|
||||||
|
readTVar _sBlockChunks2 <&> IntMap.size >>= newTVar
|
||||||
|
|
||||||
let w0 = 2.0 :: Timeout 'MilliSeconds
|
let w0 = 2.0 :: Timeout 'MilliSeconds
|
||||||
|
|
||||||
|
@ -495,11 +498,11 @@ downloadFromPeer bu cache env h peer = liftIO $ withPeerM env do
|
||||||
|
|
||||||
t0 <- getTimeCoarse
|
t0 <- getTimeCoarse
|
||||||
|
|
||||||
r <- liftIO $ pause w0 >> race watchdog do
|
r <- liftIO $ race watchdog do
|
||||||
atomically do
|
atomically do
|
||||||
pieces <- readTVar _sBlockChunks2
|
pieces <- readTVar _sBlockChunks2
|
||||||
writeTVar _num ( IntMap.size pieces )
|
writeTVar _num ( IntMap.size pieces )
|
||||||
let done = and [ IntMap.member j pieces | j <- [i .. i + chunkN-1] ]
|
let done = and [ IntMap.member j pieces | j <- parts ]
|
||||||
unless done retry -- $ pause @'MilliSeconds ( 0.25 * rtt ) >> next
|
unless done retry -- $ pause @'MilliSeconds ( 0.25 * rtt ) >> next
|
||||||
|
|
||||||
t1 <- getTimeCoarse
|
t1 <- getTimeCoarse
|
||||||
|
@ -507,15 +510,22 @@ downloadFromPeer bu cache env h peer = liftIO $ withPeerM env do
|
||||||
atomically do
|
atomically do
|
||||||
when (isRight r) do
|
when (isRight r) do
|
||||||
let nano = toNanoSeconds $ TimeoutTS (t1 - t0)
|
let nano = toNanoSeconds $ TimeoutTS (t1 - t0)
|
||||||
let wx1 = max 1000 (100 * realToFrac nano / 1e6) -- millis
|
let wx1 = 100 * realToFrac nano / 1e6 -- millis
|
||||||
writeTVar _wx wx1
|
writeTVar _wx wx1
|
||||||
|
|
||||||
case r of
|
case r of
|
||||||
Left{} -> do
|
Left{} -> do
|
||||||
if n < 2 then do
|
if n < 2 then do
|
||||||
w <- readTVarIO _wx
|
w <- readTVarIO _wx
|
||||||
debug $ red "Retry" <+> pretty w <+> pretty i <+> pretty chunkN <+> pretty h <+> pretty peer
|
pieces <- readTVarIO _sBlockChunks2
|
||||||
again (succ n)
|
let missed = IntMap.difference pieces (IntMap.fromList [ (j,()) | j <- parts ] )
|
||||||
|
|
||||||
|
debug $ red "Retry" <+> pretty w
|
||||||
|
<+> pretty (length missed)
|
||||||
|
<+> pretty h
|
||||||
|
<+> pretty peer
|
||||||
|
|
||||||
|
if L.null missed then none else again (succ n)
|
||||||
else do
|
else do
|
||||||
exit2 (Left $ DownloadStuckError (HashRef h) peer)
|
exit2 (Left $ DownloadStuckError (HashRef h) peer)
|
||||||
|
|
||||||
|
@ -679,7 +689,7 @@ downloadDispatcher brains env = flip runContT pure do
|
||||||
pure (j,ssem)
|
pure (j,ssem)
|
||||||
|
|
||||||
unless here do
|
unless here do
|
||||||
a <- async (peerThread sem i onBlock p wip)
|
a <- async (peerThread pts sem onBlock p wip)
|
||||||
atomically $ modifyTVar pts (HM.insert p (a,nonce))
|
atomically $ modifyTVar pts (HM.insert p (a,nonce))
|
||||||
|
|
||||||
loosers <- atomically do
|
loosers <- atomically do
|
||||||
|
@ -704,7 +714,7 @@ downloadDispatcher brains env = flip runContT pure do
|
||||||
atomically do
|
atomically do
|
||||||
modifyTVar _psem (HM.delete nonce)
|
modifyTVar _psem (HM.delete nonce)
|
||||||
|
|
||||||
peerThread sem me onBlock p wip = flip runContT pure do
|
peerThread pts sem onBlock p wip = flip runContT pure do
|
||||||
|
|
||||||
btimes <- newTVarIO ( mempty :: [Double] )
|
btimes <- newTVarIO ( mempty :: [Double] )
|
||||||
|
|
||||||
|
@ -779,6 +789,7 @@ downloadDispatcher brains env = flip runContT pure do
|
||||||
|
|
||||||
what <- atomically do
|
what <- atomically do
|
||||||
e <- readTVar _errors
|
e <- readTVar _errors
|
||||||
|
peerNum <- readTVar pts <&> HM.size
|
||||||
|
|
||||||
if e > 5 then
|
if e > 5 then
|
||||||
pure Nothing
|
pure Nothing
|
||||||
|
@ -786,7 +797,7 @@ downloadDispatcher brains env = flip runContT pure do
|
||||||
-- TSem.waitTSem sem
|
-- TSem.waitTSem sem
|
||||||
|
|
||||||
wpsize <- readTVar wip <&> HM.size
|
wpsize <- readTVar wip <&> HM.size
|
||||||
let trsh = if wpsize < 10 then 3 else 0
|
let trsh = if wpsize < peerNum then max 3 (peerNum `div` 2) else 0
|
||||||
|
|
||||||
blocks <- readTVar wip
|
blocks <- readTVar wip
|
||||||
|
|
||||||
|
@ -805,6 +816,7 @@ downloadDispatcher brains env = flip runContT pure do
|
||||||
if busy > trsh || down || absent then
|
if busy > trsh || down || absent then
|
||||||
retry
|
retry
|
||||||
else do
|
else do
|
||||||
|
modifyTVar dcbBusy succ
|
||||||
pure (Just (h,dcb))
|
pure (Just (h,dcb))
|
||||||
|
|
||||||
case what of
|
case what of
|
||||||
|
@ -856,6 +868,9 @@ downloadDispatcher brains env = flip runContT pure do
|
||||||
|
|
||||||
bu <- liftIO $ getCurrentBurst bm
|
bu <- liftIO $ getCurrentBurst bm
|
||||||
|
|
||||||
|
withPeerM env $ do
|
||||||
|
request p (GetBlockSize @e (coerce hx))
|
||||||
|
|
||||||
t0 <- getTimeCoarse
|
t0 <- getTimeCoarse
|
||||||
r <- downloadFromPeer bu (KnownSize size) env (coerce hx) p
|
r <- downloadFromPeer bu (KnownSize size) env (coerce hx) p
|
||||||
t1 <- getTimeCoarse
|
t1 <- getTimeCoarse
|
||||||
|
|
Loading…
Reference in New Issue