fucked on high speeds / multiple peers

This commit is contained in:
voidlizard 2024-11-11 19:38:40 +03:00
parent fe88522200
commit e84693b247
1 changed files with 43 additions and 22 deletions

View File

@ -498,8 +498,8 @@ downloadFromPeer t bu cache env h peer = liftIO $ withPeerM env do
t1 <- getTimeCoarse t1 <- getTimeCoarse
atomically do atomically do
wx0 <- readTVar _wx -- wx0 <- readTVar _wx
let wx1 = realToFrac (t1 - t0) * 100 / 1e6 -- millis let wx1 = 2 * realToFrac (t1 - t0) * 100 / 1e6 -- millis
writeTVar _wx wx1 writeTVar _wx wx1
case r of case r of
@ -599,7 +599,7 @@ downloadDispatcher brains env = flip runContT pure do
debug $ green "New download request" <+> pretty h debug $ green "New download request" <+> pretty h
atomically do atomically do
modifyTVar sizeRq (HPSQ.insert (HashRef h) now ()) modifyTVar sizeRq (HPSQ.insert (HashRef h) now ())
modifyTVar wip (HM.insert (HashRef h) 10) modifyTVar wip (HM.insert (HashRef h) 30)
let onBlockSize p h s = do let onBlockSize p h s = do
@ -654,8 +654,9 @@ downloadDispatcher brains env = flip runContT pure do
BlockFetchError -> do BlockFetchError -> do
now <- getTimeCoarse now <- getTimeCoarse
debug $ red "BLOCK DOWNLOAD FAIL" <+> pretty h <+> pretty p debug $ red "BLOCK DOWNLOAD FAIL" <+> pretty h <+> pretty p
atomically $ modifyTVar sizeRq (HPSQ.insert h now ()) atomically do
atomically $ modifyTVar choosen (HS.delete h) modifyTVar sizeRq (HPSQ.insert h now ())
modifyTVar choosen (HS.delete h)
ContT $ withAsync $ forever do ContT $ withAsync $ forever do
let blkz = readTVarIO choosen <&> fmap (,30) . HS.toList let blkz = readTVarIO choosen <&> fmap (,30) . HS.toList
@ -672,27 +673,37 @@ downloadDispatcher brains env = flip runContT pure do
ContT $ withAsync $ forever do ContT $ withAsync $ forever do
let blkz = readTVarIO wip <&> fmap (,10) . HM.keys let blkz = readTVarIO wip <&> fmap (,10) . HM.keys
polling (Polling 1 1) blkz $ \h -> do polling (Polling 1 1) blkz $ \h -> do
debug $ "CLEANUP BLOCK" <+> pretty h debug $ "CLEANUP BLOCK" <+> pretty h
here <- hasBlock sto (coerce h) <&> isJust here <- hasBlock sto (coerce h) <&> isJust
when here $ do when here $ do
liftIO $ deleteBlockFromWip h liftIO $ deleteBlockFromWip h
ContT $ withAsync $ forever do
pause @'Seconds 10
w <- readTVarIO downWip <&> HPSQ.size
when (w == 0) do
atomically $ writeTVar choosen mempty
ContT $ withAsync $ forever $ (>> pause @'Seconds 300) do
atomically $ writeTVar fuckup mempty
ContT $ withAsync $ forever $ (>> pause @'Seconds 10) do ContT $ withAsync $ forever $ (>> pause @'Seconds 10) do
atomically do atomically do
w <- readTVar wip w <- readTVar wip
cs <- readTVar sizeCache <&> HM.toList cs <- readTVar sizeCache <&> HM.toList
writeTVar sizeCache (HM.fromList $ [ x | x@((_,hi),_) <- cs, HM.member hi w ]) writeTVar sizeCache (HM.fromList $ [ x | x@((_,hi),_) <- cs, HM.member hi w ])
-- ContT $ withAsync do ContT $ withAsync do
-- let blkz = readTVarIO wip <&> HM.toList let blkz = readTVarIO wip <&> HM.toList
-- polling (Polling 1 1) blkz $ \h -> do polling (Polling 1 1) blkz $ \h -> do
-- debug $ "POLL BLOCK" <+> pretty h debug $ "POLL BLOCK" <+> pretty h
-- atomically $ modifyTVar wip (HM.adjust (*1.10) h) atomically $ modifyTVar wip (HM.adjust (*1.10) h)
-- here <- hasBlock sto (coerce h) <&> isJust here <- hasBlock sto (coerce h) <&> isJust
-- now <- getTimeCoarse now <- getTimeCoarse
-- unless here $ atomically do unless here $ atomically do
-- modifyTVar sizeRq (HPSQ.insert h now ()) modifyTVar sizeRq (HPSQ.insert h now ())
-- modifyTVar choosen (HS.delete h)
ContT $ withAsync $ forever do ContT $ withAsync $ forever do
(h,bs) <- atomically $ readTQueue parseQ (h,bs) <- atomically $ readTQueue parseQ
@ -737,12 +748,14 @@ downloadDispatcher brains env = flip runContT pure do
dw <- readTVar downWip dw <- readTVar downWip
let total = [ x | x@((p,_),_,_) <- L.take 16 (HPSQ.toList dw), HM.member p peers ] let total = [ x | x@((p,_),_,_) <- L.take 10 (HPSQ.toList dw), HM.member p peers ]
when (L.null total) retry
let queue = total let queue = total
let qq = [ (h, HS.singleton p) let qq = [ (h, HS.singleton p)
| ((p,h),_,_) <- queue, not (HS.member h already), HM.member p peers | ((p,h),_,_) <- queue, not (HS.member h already)
] & fmap (over _2 (V.fromList . HS.toList)) . HM.toList . HM.fromListWith (<>) ] & fmap (over _2 (V.fromList . HS.toList)) . HM.toList . HM.fromListWith (<>)
when (L.null qq) retry when (L.null qq) retry
@ -788,7 +801,6 @@ downloadDispatcher brains env = flip runContT pure do
where where
manageThreads stat pts = forever $ (>> pause @'Seconds 10) $ withPeerM env do manageThreads stat pts = forever $ (>> pause @'Seconds 10) $ withPeerM env do
debug "MANAGE THREADS" debug "MANAGE THREADS"
@ -813,12 +825,13 @@ downloadDispatcher brains env = flip runContT pure do
peerThread stat p work = flip runContT pure do peerThread stat p work = flip runContT pure do
btimes <- newTVarIO ( mempty :: [Double] ) btimes <- newTVarIO ( mempty :: [Double] )
_avg <- newTVarIO 600
sto <- withPeerM env getStorage sto <- withPeerM env getStorage
_sizeCache <- newTVarIO ( mempty :: HashMap HashRef (Maybe Integer) ) _sizeCache <- newTVarIO ( mempty :: HashMap HashRef (Maybe Integer) )
bm <- liftIO $ newBurstMachine 0.5 256 (Just 50) 0.05 0.10 bm <- liftIO $ newBurstMachine 0.35 256 (Just 50) 0.01 0.15
void $ ContT $ bracket none $ const do void $ ContT $ bracket none $ const do
debug $ "Cancelling thread for" <+> pretty p debug $ "Cancelling thread for" <+> pretty p
@ -831,7 +844,9 @@ downloadDispatcher brains env = flip runContT pure do
tss <- readTVarIO btimes tss <- readTVarIO btimes
unless (L.null tss) do unless (L.null tss) do
let avg = sum tss / realToFrac (L.length tss) let avg = sum tss / realToFrac (L.length tss)
atomically $ modifyTVar stat (HM.insert p avg) atomically do
modifyTVar stat (HM.insert p avg)
writeTVar _avg avg
twork <- ContT $ withAsync $ forever do twork <- ContT $ withAsync $ forever do
w <- atomically $ readTQueue work w <- atomically $ readTQueue work
@ -856,16 +871,22 @@ downloadDispatcher brains env = flip runContT pure do
bu <- lift $ getCurrentBurst bm bu <- lift $ getCurrentBurst bm
t0 <- getTimeCoarse t0 <- getTimeCoarse
r <- lift $ downloadFromPeer (TimeoutSec 10) bu (KnownSize s) env (coerce h) p r <- lift $ downloadFromPeer (TimeoutSec 60) bu (KnownSize s) env (coerce h) p
t1 <- getTimeCoarse t1 <- getTimeCoarse
case r of case r of
Right bs -> do Right bs -> do
let dtsec = realToFrac (toNanoSeconds (TimeoutTS (t1 - t0))) / 1e9 let dtsec = realToFrac (toNanoSeconds (TimeoutTS (t1 - t0))) / 1e9
avg <- readTVarIO _avg
when (dtsec > avg) do
burstMachineAddErrors bm 1
atomically $ modifyTVar btimes ( take 100 . (dtsec :) ) atomically $ modifyTVar btimes ( take 100 . (dtsec :) )
liftIO $ answ (BlockFetched bs) liftIO $ answ (BlockFetched bs)
Left{} | i >= 5 -> liftIO $ answ BlockFetchError Left{} | i >= 2 -> liftIO $ answ BlockFetchError
| otherwise -> next (succ i) | otherwise -> next (succ i)
bs <- ContT $ withAsync $ forever do bs <- ContT $ withAsync $ forever do