diff --git a/hbs2-peer/app/BlockDownloadNew.hs b/hbs2-peer/app/BlockDownloadNew.hs index 100386e1..49bb4f74 100644 --- a/hbs2-peer/app/BlockDownloadNew.hs +++ b/hbs2-peer/app/BlockDownloadNew.hs @@ -267,7 +267,7 @@ runBurstMachine BurstMachine{..} = do new <- if e2 > e1 then do let d = max 2.0 (current * (1.0 - down)) - nrates <- readTVar _rates <&> drop 3 . Map.toList + nrates <- readTVar _rates <&> drop 5 . Map.toList let newFucked = maybe d snd (headMay nrates) writeTVar _rates (Map.fromList nrates) pure newFucked @@ -319,7 +319,7 @@ downloadFromPeerRec t bu0 cache env h0 peer = do qq <- newTQueueIO toq <- newTVarIO ( mempty :: [Int] ) - bm <- newBurstMachine 0.5 256 (Just bu0) 0.05 0.10 + bm <- newBurstMachine 10 256 (Just bu0) 0.10 0.35 flip runContT pure do @@ -437,7 +437,7 @@ downloadFromPeer t bu cache env h peer = liftIO $ withPeerM env do <&> fromMaybe 1000 <&> (/1e6) - let waity = 10 * rtt + let waity = 20 * rtt sto <- getStorage @@ -483,7 +483,7 @@ downloadFromPeer t bu cache env h peer = liftIO $ withPeerM env do let watchdog = fix \next -> do wx <- readTVarIO _wx <&> realToFrac -- debug $ "WATCHDOG" <+> pretty wx <+> pretty waity - r <- race (pause @'MilliSeconds (min wx waity)) do + r <- race (pause @'MilliSeconds (max wx waity)) do void $ atomically $ readTQueue chuQ either (const none) (const next) r @@ -499,7 +499,7 @@ downloadFromPeer t bu cache env h peer = liftIO $ withPeerM env do atomically do -- wx0 <- readTVar _wx - let wx1 = 2 * realToFrac (t1 - t0) * 100 / 1e6 -- millis + let wx1 = 1.50 * realToFrac (t1 - t0) * 100 / 1e6 -- millis writeTVar _wx wx1 case r of @@ -609,18 +609,19 @@ downloadDispatcher brains env = flip runContT pure do -- let color = if isJust s then green else red -- debug $ color "GOT BLOCK SIZE" <+> pretty h <+> pretty s <+> pretty p - dtt <- randomRIO (-0.01, 0.01) + dtt <- randomRIO (-0.05, 0.05) -- let dtt = 0 here <- hasBlock sto (coerce h) <&> isJust unless here do dt <- readTVarIO stat <&> (*(1+dtt)) . fromMaybe 1.0 . HM.lookup p + let rate = dt atomically do -- blkNum <- stateTVar _blkNum (\x -> (x, succ x)) modifyTVar sizeCache (HM.insert (p,h) s) choo <- readTVar choosen <&> HS.member h maybe1 s none $ \size -> do unless choo do - modifyTVar downWip (HPSQ.insert (p,h) dt size) + modifyTVar downWip (HPSQ.insert (p,h) rate size) parseQ <- newTQueueIO @@ -659,7 +660,7 @@ downloadDispatcher brains env = flip runContT pure do modifyTVar choosen (HS.delete h) ContT $ withAsync $ forever do - let blkz = readTVarIO choosen <&> fmap (,30) . HS.toList + let blkz = readTVarIO choosen <&> fmap (,10) . HS.toList polling (Polling 1 1) blkz $ \h -> do here <- hasBlock sto (coerce h) <&> isJust if here then do @@ -704,6 +705,7 @@ downloadDispatcher brains env = flip runContT pure do now <- getTimeCoarse unless here $ atomically do modifyTVar sizeRq (HPSQ.insert h now ()) + modifyTVar choosen (HS.delete h) ContT $ withAsync $ forever do (h,bs) <- atomically $ readTQueue parseQ @@ -748,7 +750,7 @@ downloadDispatcher brains env = flip runContT pure do dw <- readTVar downWip - let total = [ x | x@((p,_),_,_) <- L.take 10 (HPSQ.toList dw), HM.member p peers ] + let total = [ x | x@((p,_),_,_) <- L.take 32 (HPSQ.toList dw), HM.member p peers ] when (L.null total) retry @@ -831,7 +833,7 @@ downloadDispatcher brains env = flip runContT pure do _sizeCache <- newTVarIO ( mempty :: HashMap HashRef (Maybe Integer) ) - bm <- liftIO $ newBurstMachine 0.35 256 (Just 50) 0.01 0.15 + bm <- liftIO $ newBurstMachine 5 256 (Just 50) 0.05 0.20 void $ ContT $ bracket none $ const do debug $ "Cancelling thread for" <+> pretty p @@ -871,7 +873,7 @@ downloadDispatcher brains env = flip runContT pure do bu <- lift $ getCurrentBurst bm t0 <- getTimeCoarse - r <- lift $ downloadFromPeer (TimeoutSec 60) bu (KnownSize s) env (coerce h) p + r <- lift $ downloadFromPeer (TimeoutSec 10) bu (KnownSize s) env (coerce h) p t1 <- getTimeCoarse case r of @@ -886,7 +888,7 @@ downloadDispatcher brains env = flip runContT pure do atomically $ modifyTVar btimes ( take 100 . (dtsec :) ) liftIO $ answ (BlockFetched bs) - Left{} | i >= 2 -> liftIO $ answ BlockFetchError + Left{} | i >= 5 -> liftIO $ answ BlockFetchError | otherwise -> next (succ i) bs <- ContT $ withAsync $ forever do