diff --git a/hbs2-peer/app/BlockDownloadNew.hs b/hbs2-peer/app/BlockDownloadNew.hs index cb4883a9..100386e1 100644 --- a/hbs2-peer/app/BlockDownloadNew.hs +++ b/hbs2-peer/app/BlockDownloadNew.hs @@ -498,8 +498,8 @@ downloadFromPeer t bu cache env h peer = liftIO $ withPeerM env do t1 <- getTimeCoarse atomically do - wx0 <- readTVar _wx - let wx1 = realToFrac (t1 - t0) * 100 / 1e6 -- millis + -- wx0 <- readTVar _wx + let wx1 = 2 * realToFrac (t1 - t0) * 100 / 1e6 -- millis writeTVar _wx wx1 case r of @@ -599,7 +599,7 @@ downloadDispatcher brains env = flip runContT pure do debug $ green "New download request" <+> pretty h atomically do modifyTVar sizeRq (HPSQ.insert (HashRef h) now ()) - modifyTVar wip (HM.insert (HashRef h) 10) + modifyTVar wip (HM.insert (HashRef h) 30) let onBlockSize p h s = do @@ -654,8 +654,9 @@ downloadDispatcher brains env = flip runContT pure do BlockFetchError -> do now <- getTimeCoarse debug $ red "BLOCK DOWNLOAD FAIL" <+> pretty h <+> pretty p - atomically $ modifyTVar sizeRq (HPSQ.insert h now ()) - atomically $ modifyTVar choosen (HS.delete h) + atomically do + modifyTVar sizeRq (HPSQ.insert h now ()) + modifyTVar choosen (HS.delete h) ContT $ withAsync $ forever do let blkz = readTVarIO choosen <&> fmap (,30) . HS.toList @@ -672,27 +673,37 @@ downloadDispatcher brains env = flip runContT pure do ContT $ withAsync $ forever do let blkz = readTVarIO wip <&> fmap (,10) . HM.keys polling (Polling 1 1) blkz $ \h -> do + debug $ "CLEANUP BLOCK" <+> pretty h here <- hasBlock sto (coerce h) <&> isJust + when here $ do liftIO $ deleteBlockFromWip h + ContT $ withAsync $ forever do + pause @'Seconds 10 + w <- readTVarIO downWip <&> HPSQ.size + when (w == 0) do + atomically $ writeTVar choosen mempty + + ContT $ withAsync $ forever $ (>> pause @'Seconds 300) do + atomically $ writeTVar fuckup mempty + ContT $ withAsync $ forever $ (>> pause @'Seconds 10) do atomically do w <- readTVar wip cs <- readTVar sizeCache <&> HM.toList writeTVar sizeCache (HM.fromList $ [ x | x@((_,hi),_) <- cs, HM.member hi w ]) --- ContT $ withAsync do --- let blkz = readTVarIO wip <&> HM.toList --- polling (Polling 1 1) blkz $ \h -> do --- debug $ "POLL BLOCK" <+> pretty h --- atomically $ modifyTVar wip (HM.adjust (*1.10) h) --- here <- hasBlock sto (coerce h) <&> isJust --- now <- getTimeCoarse --- unless here $ atomically do --- modifyTVar sizeRq (HPSQ.insert h now ()) --- modifyTVar choosen (HS.delete h) + ContT $ withAsync do + let blkz = readTVarIO wip <&> HM.toList + polling (Polling 1 1) blkz $ \h -> do + debug $ "POLL BLOCK" <+> pretty h + atomically $ modifyTVar wip (HM.adjust (*1.10) h) + here <- hasBlock sto (coerce h) <&> isJust + now <- getTimeCoarse + unless here $ atomically do + modifyTVar sizeRq (HPSQ.insert h now ()) ContT $ withAsync $ forever do (h,bs) <- atomically $ readTQueue parseQ @@ -737,12 +748,14 @@ downloadDispatcher brains env = flip runContT pure do dw <- readTVar downWip - let total = [ x | x@((p,_),_,_) <- L.take 16 (HPSQ.toList dw), HM.member p peers ] + let total = [ x | x@((p,_),_,_) <- L.take 10 (HPSQ.toList dw), HM.member p peers ] + + when (L.null total) retry let queue = total let qq = [ (h, HS.singleton p) - | ((p,h),_,_) <- queue, not (HS.member h already), HM.member p peers + | ((p,h),_,_) <- queue, not (HS.member h already) ] & fmap (over _2 (V.fromList . HS.toList)) . HM.toList . HM.fromListWith (<>) when (L.null qq) retry @@ -788,7 +801,6 @@ downloadDispatcher brains env = flip runContT pure do where - manageThreads stat pts = forever $ (>> pause @'Seconds 10) $ withPeerM env do debug "MANAGE THREADS" @@ -813,12 +825,13 @@ downloadDispatcher brains env = flip runContT pure do peerThread stat p work = flip runContT pure do btimes <- newTVarIO ( mempty :: [Double] ) + _avg <- newTVarIO 600 sto <- withPeerM env getStorage _sizeCache <- newTVarIO ( mempty :: HashMap HashRef (Maybe Integer) ) - bm <- liftIO $ newBurstMachine 0.5 256 (Just 50) 0.05 0.10 + bm <- liftIO $ newBurstMachine 0.35 256 (Just 50) 0.01 0.15 void $ ContT $ bracket none $ const do debug $ "Cancelling thread for" <+> pretty p @@ -831,7 +844,9 @@ downloadDispatcher brains env = flip runContT pure do tss <- readTVarIO btimes unless (L.null tss) do let avg = sum tss / realToFrac (L.length tss) - atomically $ modifyTVar stat (HM.insert p avg) + atomically do + modifyTVar stat (HM.insert p avg) + writeTVar _avg avg twork <- ContT $ withAsync $ forever do w <- atomically $ readTQueue work @@ -856,16 +871,22 @@ downloadDispatcher brains env = flip runContT pure do bu <- lift $ getCurrentBurst bm t0 <- getTimeCoarse - r <- lift $ downloadFromPeer (TimeoutSec 10) bu (KnownSize s) env (coerce h) p + r <- lift $ downloadFromPeer (TimeoutSec 60) bu (KnownSize s) env (coerce h) p t1 <- getTimeCoarse case r of Right bs -> do let dtsec = realToFrac (toNanoSeconds (TimeoutTS (t1 - t0))) / 1e9 + + avg <- readTVarIO _avg + + when (dtsec > avg) do + burstMachineAddErrors bm 1 + atomically $ modifyTVar btimes ( take 100 . (dtsec :) ) liftIO $ answ (BlockFetched bs) - Left{} | i >= 5 -> liftIO $ answ BlockFetchError + Left{} | i >= 2 -> liftIO $ answ BlockFetchError | otherwise -> next (succ i) bs <- ContT $ withAsync $ forever do