mirror of https://github.com/voidlizard/hbs2
fucked on high speeds / multiple peers
This commit is contained in:
parent
be2e0f34f3
commit
31da921d62
|
@ -498,8 +498,8 @@ downloadFromPeer t bu cache env h peer = liftIO $ withPeerM env do
|
||||||
t1 <- getTimeCoarse
|
t1 <- getTimeCoarse
|
||||||
|
|
||||||
atomically do
|
atomically do
|
||||||
wx0 <- readTVar _wx
|
-- wx0 <- readTVar _wx
|
||||||
let wx1 = realToFrac (t1 - t0) * 100 / 1e6 -- millis
|
let wx1 = 2 * realToFrac (t1 - t0) * 100 / 1e6 -- millis
|
||||||
writeTVar _wx wx1
|
writeTVar _wx wx1
|
||||||
|
|
||||||
case r of
|
case r of
|
||||||
|
@ -599,7 +599,7 @@ downloadDispatcher brains env = flip runContT pure do
|
||||||
debug $ green "New download request" <+> pretty h
|
debug $ green "New download request" <+> pretty h
|
||||||
atomically do
|
atomically do
|
||||||
modifyTVar sizeRq (HPSQ.insert (HashRef h) now ())
|
modifyTVar sizeRq (HPSQ.insert (HashRef h) now ())
|
||||||
modifyTVar wip (HM.insert (HashRef h) 10)
|
modifyTVar wip (HM.insert (HashRef h) 30)
|
||||||
|
|
||||||
let onBlockSize p h s = do
|
let onBlockSize p h s = do
|
||||||
|
|
||||||
|
@ -654,8 +654,9 @@ downloadDispatcher brains env = flip runContT pure do
|
||||||
BlockFetchError -> do
|
BlockFetchError -> do
|
||||||
now <- getTimeCoarse
|
now <- getTimeCoarse
|
||||||
debug $ red "BLOCK DOWNLOAD FAIL" <+> pretty h <+> pretty p
|
debug $ red "BLOCK DOWNLOAD FAIL" <+> pretty h <+> pretty p
|
||||||
atomically $ modifyTVar sizeRq (HPSQ.insert h now ())
|
atomically do
|
||||||
atomically $ modifyTVar choosen (HS.delete h)
|
modifyTVar sizeRq (HPSQ.insert h now ())
|
||||||
|
modifyTVar choosen (HS.delete h)
|
||||||
|
|
||||||
ContT $ withAsync $ forever do
|
ContT $ withAsync $ forever do
|
||||||
let blkz = readTVarIO choosen <&> fmap (,30) . HS.toList
|
let blkz = readTVarIO choosen <&> fmap (,30) . HS.toList
|
||||||
|
@ -672,27 +673,37 @@ downloadDispatcher brains env = flip runContT pure do
|
||||||
ContT $ withAsync $ forever do
|
ContT $ withAsync $ forever do
|
||||||
let blkz = readTVarIO wip <&> fmap (,10) . HM.keys
|
let blkz = readTVarIO wip <&> fmap (,10) . HM.keys
|
||||||
polling (Polling 1 1) blkz $ \h -> do
|
polling (Polling 1 1) blkz $ \h -> do
|
||||||
|
|
||||||
debug $ "CLEANUP BLOCK" <+> pretty h
|
debug $ "CLEANUP BLOCK" <+> pretty h
|
||||||
here <- hasBlock sto (coerce h) <&> isJust
|
here <- hasBlock sto (coerce h) <&> isJust
|
||||||
|
|
||||||
when here $ do
|
when here $ do
|
||||||
liftIO $ deleteBlockFromWip h
|
liftIO $ deleteBlockFromWip h
|
||||||
|
|
||||||
|
ContT $ withAsync $ forever do
|
||||||
|
pause @'Seconds 10
|
||||||
|
w <- readTVarIO downWip <&> HPSQ.size
|
||||||
|
when (w == 0) do
|
||||||
|
atomically $ writeTVar choosen mempty
|
||||||
|
|
||||||
|
ContT $ withAsync $ forever $ (>> pause @'Seconds 300) do
|
||||||
|
atomically $ writeTVar fuckup mempty
|
||||||
|
|
||||||
ContT $ withAsync $ forever $ (>> pause @'Seconds 10) do
|
ContT $ withAsync $ forever $ (>> pause @'Seconds 10) do
|
||||||
atomically do
|
atomically do
|
||||||
w <- readTVar wip
|
w <- readTVar wip
|
||||||
cs <- readTVar sizeCache <&> HM.toList
|
cs <- readTVar sizeCache <&> HM.toList
|
||||||
writeTVar sizeCache (HM.fromList $ [ x | x@((_,hi),_) <- cs, HM.member hi w ])
|
writeTVar sizeCache (HM.fromList $ [ x | x@((_,hi),_) <- cs, HM.member hi w ])
|
||||||
|
|
||||||
-- ContT $ withAsync do
|
ContT $ withAsync do
|
||||||
-- let blkz = readTVarIO wip <&> HM.toList
|
let blkz = readTVarIO wip <&> HM.toList
|
||||||
-- polling (Polling 1 1) blkz $ \h -> do
|
polling (Polling 1 1) blkz $ \h -> do
|
||||||
-- debug $ "POLL BLOCK" <+> pretty h
|
debug $ "POLL BLOCK" <+> pretty h
|
||||||
-- atomically $ modifyTVar wip (HM.adjust (*1.10) h)
|
atomically $ modifyTVar wip (HM.adjust (*1.10) h)
|
||||||
-- here <- hasBlock sto (coerce h) <&> isJust
|
here <- hasBlock sto (coerce h) <&> isJust
|
||||||
-- now <- getTimeCoarse
|
now <- getTimeCoarse
|
||||||
-- unless here $ atomically do
|
unless here $ atomically do
|
||||||
-- modifyTVar sizeRq (HPSQ.insert h now ())
|
modifyTVar sizeRq (HPSQ.insert h now ())
|
||||||
-- modifyTVar choosen (HS.delete h)
|
|
||||||
|
|
||||||
ContT $ withAsync $ forever do
|
ContT $ withAsync $ forever do
|
||||||
(h,bs) <- atomically $ readTQueue parseQ
|
(h,bs) <- atomically $ readTQueue parseQ
|
||||||
|
@ -737,12 +748,14 @@ downloadDispatcher brains env = flip runContT pure do
|
||||||
|
|
||||||
dw <- readTVar downWip
|
dw <- readTVar downWip
|
||||||
|
|
||||||
let total = [ x | x@((p,_),_,_) <- L.take 16 (HPSQ.toList dw), HM.member p peers ]
|
let total = [ x | x@((p,_),_,_) <- L.take 10 (HPSQ.toList dw), HM.member p peers ]
|
||||||
|
|
||||||
|
when (L.null total) retry
|
||||||
|
|
||||||
let queue = total
|
let queue = total
|
||||||
|
|
||||||
let qq = [ (h, HS.singleton p)
|
let qq = [ (h, HS.singleton p)
|
||||||
| ((p,h),_,_) <- queue, not (HS.member h already), HM.member p peers
|
| ((p,h),_,_) <- queue, not (HS.member h already)
|
||||||
] & fmap (over _2 (V.fromList . HS.toList)) . HM.toList . HM.fromListWith (<>)
|
] & fmap (over _2 (V.fromList . HS.toList)) . HM.toList . HM.fromListWith (<>)
|
||||||
|
|
||||||
when (L.null qq) retry
|
when (L.null qq) retry
|
||||||
|
@ -788,7 +801,6 @@ downloadDispatcher brains env = flip runContT pure do
|
||||||
|
|
||||||
where
|
where
|
||||||
|
|
||||||
|
|
||||||
manageThreads stat pts = forever $ (>> pause @'Seconds 10) $ withPeerM env do
|
manageThreads stat pts = forever $ (>> pause @'Seconds 10) $ withPeerM env do
|
||||||
debug "MANAGE THREADS"
|
debug "MANAGE THREADS"
|
||||||
|
|
||||||
|
@ -813,12 +825,13 @@ downloadDispatcher brains env = flip runContT pure do
|
||||||
peerThread stat p work = flip runContT pure do
|
peerThread stat p work = flip runContT pure do
|
||||||
|
|
||||||
btimes <- newTVarIO ( mempty :: [Double] )
|
btimes <- newTVarIO ( mempty :: [Double] )
|
||||||
|
_avg <- newTVarIO 600
|
||||||
|
|
||||||
sto <- withPeerM env getStorage
|
sto <- withPeerM env getStorage
|
||||||
|
|
||||||
_sizeCache <- newTVarIO ( mempty :: HashMap HashRef (Maybe Integer) )
|
_sizeCache <- newTVarIO ( mempty :: HashMap HashRef (Maybe Integer) )
|
||||||
|
|
||||||
bm <- liftIO $ newBurstMachine 0.5 256 (Just 50) 0.05 0.10
|
bm <- liftIO $ newBurstMachine 0.35 256 (Just 50) 0.01 0.15
|
||||||
|
|
||||||
void $ ContT $ bracket none $ const do
|
void $ ContT $ bracket none $ const do
|
||||||
debug $ "Cancelling thread for" <+> pretty p
|
debug $ "Cancelling thread for" <+> pretty p
|
||||||
|
@ -831,7 +844,9 @@ downloadDispatcher brains env = flip runContT pure do
|
||||||
tss <- readTVarIO btimes
|
tss <- readTVarIO btimes
|
||||||
unless (L.null tss) do
|
unless (L.null tss) do
|
||||||
let avg = sum tss / realToFrac (L.length tss)
|
let avg = sum tss / realToFrac (L.length tss)
|
||||||
atomically $ modifyTVar stat (HM.insert p avg)
|
atomically do
|
||||||
|
modifyTVar stat (HM.insert p avg)
|
||||||
|
writeTVar _avg avg
|
||||||
|
|
||||||
twork <- ContT $ withAsync $ forever do
|
twork <- ContT $ withAsync $ forever do
|
||||||
w <- atomically $ readTQueue work
|
w <- atomically $ readTQueue work
|
||||||
|
@ -856,16 +871,22 @@ downloadDispatcher brains env = flip runContT pure do
|
||||||
bu <- lift $ getCurrentBurst bm
|
bu <- lift $ getCurrentBurst bm
|
||||||
|
|
||||||
t0 <- getTimeCoarse
|
t0 <- getTimeCoarse
|
||||||
r <- lift $ downloadFromPeer (TimeoutSec 10) bu (KnownSize s) env (coerce h) p
|
r <- lift $ downloadFromPeer (TimeoutSec 60) bu (KnownSize s) env (coerce h) p
|
||||||
t1 <- getTimeCoarse
|
t1 <- getTimeCoarse
|
||||||
|
|
||||||
case r of
|
case r of
|
||||||
Right bs -> do
|
Right bs -> do
|
||||||
let dtsec = realToFrac (toNanoSeconds (TimeoutTS (t1 - t0))) / 1e9
|
let dtsec = realToFrac (toNanoSeconds (TimeoutTS (t1 - t0))) / 1e9
|
||||||
|
|
||||||
|
avg <- readTVarIO _avg
|
||||||
|
|
||||||
|
when (dtsec > avg) do
|
||||||
|
burstMachineAddErrors bm 1
|
||||||
|
|
||||||
atomically $ modifyTVar btimes ( take 100 . (dtsec :) )
|
atomically $ modifyTVar btimes ( take 100 . (dtsec :) )
|
||||||
liftIO $ answ (BlockFetched bs)
|
liftIO $ answ (BlockFetched bs)
|
||||||
|
|
||||||
Left{} | i >= 5 -> liftIO $ answ BlockFetchError
|
Left{} | i >= 2 -> liftIO $ answ BlockFetchError
|
||||||
| otherwise -> next (succ i)
|
| otherwise -> next (succ i)
|
||||||
|
|
||||||
bs <- ContT $ withAsync $ forever do
|
bs <- ContT $ withAsync $ forever do
|
||||||
|
|
Loading…
Reference in New Issue