diff --git a/hbs2-peer/app/BlockDownloadNew.hs b/hbs2-peer/app/BlockDownloadNew.hs index 984adc5a..6a65d93e 100644 --- a/hbs2-peer/app/BlockDownloadNew.hs +++ b/hbs2-peer/app/BlockDownloadNew.hs @@ -155,6 +155,7 @@ queryBlockSizeFromPeer cache e h peer = do data BurstMachine = BurstMachine { _buTimeout :: Double + , _buStart :: Double , _buBurstMax :: Int , _buStepUp :: Double , _buStepDown :: Double @@ -166,6 +167,12 @@ burstMachineAddErrors :: MonadUnliftIO m => BurstMachine -> Int -> m () burstMachineAddErrors BurstMachine{..} n = atomically $ modifyTVar _buErrors (+n) +burstMachineReset :: MonadUnliftIO m => BurstMachine -> m () +burstMachineReset BurstMachine{..} = do + atomically do + writeTVar _buCurrent _buStart + writeTVar _buErrors 0 + newBurstMachine :: MonadUnliftIO m => Double -- ^ timeout -> Int -- ^ max burst @@ -175,7 +182,7 @@ newBurstMachine :: MonadUnliftIO m -> m BurstMachine newBurstMachine t0 buMax buStart up' down' = do - BurstMachine t0 buMax up down + BurstMachine t0 bu0 buMax up down <$> newTVarIO bu0 <*> newTVarIO 0 @@ -187,6 +194,7 @@ newBurstMachine t0 buMax buStart up' down' = do getCurrentBurst :: MonadUnliftIO m => BurstMachine -> m Int getCurrentBurst BurstMachine{..} = readTVarIO _buCurrent <&> round + runBurstMachine :: MonadUnliftIO m => BurstMachine -> m () @@ -652,10 +660,20 @@ downloadDispatcher brains env = flip runContT pure do loosers <- atomically do xs <- readTVar pts <&> HM.toList - -- FIXME: filter-stopped-tasks - let (alive,dead) = L.partition ( \(x,_) -> HM.member x peers ) xs - writeTVar pts (HM.fromList alive) - pure dead + + writeTVar pts mempty + + loo <- newTQueue + + for_ xs $ \e@(h,v@(a,_)) -> do + running <- isNothing <$> pollSTM a + let here = HM.member h peers + if running && here then do + modifyTVar pts (HM.insert h v) + else + writeTQueue loo e + + flushTQueue loo for_ loosers $ \(p, (a, nonce)) -> do cancel a @@ -668,6 +686,7 @@ downloadDispatcher brains env = flip runContT pure do _errors <- newTVarIO 0 _avg <- newTVarIO 600 + _blknum <- newTVarIO 0 _sizeCache <- newTVarIO ( mempty :: HashMap HashRef (Maybe Integer) ) @@ -681,10 +700,21 @@ downloadDispatcher brains env = flip runContT pure do debug $ yellow "Started thread for" <+> pretty p - -- TODO: sweep size cache! + ContT $ withAsync $ forever do - ContT $ withAsync $ forever $ (>> pause @'Seconds 60) do + b0 <- readTVarIO _blknum + pause @'Seconds 60 atomically $ writeTVar _errors 0 + b1 <- readTVarIO _blknum + + when (b0 == b1) do + debug $ blue "Reset burst machine" <+> pretty p + burstMachineReset bm + + s <- readTVarIO wip <&> HM.size + + when (s == 0) do + atomically $ writeTVar _sizeCache mempty tsweep <- ContT $ withAsync do let hashes = readTVarIO _sizeCache <&> fmap (,60) . HM.keys @@ -737,7 +767,7 @@ downloadDispatcher brains env = flip runContT pure do modifyTVar r (HPSQ.insert h eps dcb) s <- readTVar r <&> HPSQ.size - if s >= 8 then pure () else loop xs + if s >= 4 then pure () else loop xs w <- readTVar r <&> HPSQ.findMin case w of @@ -812,6 +842,7 @@ downloadDispatcher brains env = flip runContT pure do atomically do modifyTVar btimes ( take 100 . (dtsec :) ) writeTVar (dcbDownloaded dcb) True + modifyTVar _blknum succ onBlock hx go (PReleaseBlock hx dcb True)