This commit is contained in:
voidlizard 2024-11-14 08:14:14 +03:00
parent 3653eff6b0
commit 0bf4071340
1 changed files with 39 additions and 8 deletions

View File

@ -155,6 +155,7 @@ queryBlockSizeFromPeer cache e h peer = do
data BurstMachine =
BurstMachine
{ _buTimeout :: Double
, _buStart :: Double
, _buBurstMax :: Int
, _buStepUp :: Double
, _buStepDown :: Double
@ -166,6 +167,12 @@ burstMachineAddErrors :: MonadUnliftIO m => BurstMachine -> Int -> m ()
burstMachineAddErrors BurstMachine{..} n =
atomically $ modifyTVar _buErrors (+n)
burstMachineReset :: MonadUnliftIO m => BurstMachine -> m ()
burstMachineReset BurstMachine{..} = do
atomically do
writeTVar _buCurrent _buStart
writeTVar _buErrors 0
newBurstMachine :: MonadUnliftIO m
=> Double -- ^ timeout
-> Int -- ^ max burst
@ -175,7 +182,7 @@ newBurstMachine :: MonadUnliftIO m
-> m BurstMachine
newBurstMachine t0 buMax buStart up' down' = do
BurstMachine t0 buMax up down
BurstMachine t0 bu0 buMax up down
<$> newTVarIO bu0
<*> newTVarIO 0
@ -187,6 +194,7 @@ newBurstMachine t0 buMax buStart up' down' = do
getCurrentBurst :: MonadUnliftIO m => BurstMachine -> m Int
getCurrentBurst BurstMachine{..} = readTVarIO _buCurrent <&> round
runBurstMachine :: MonadUnliftIO m
=> BurstMachine
-> m ()
@ -652,10 +660,20 @@ downloadDispatcher brains env = flip runContT pure do
loosers <- atomically do
xs <- readTVar pts <&> HM.toList
-- FIXME: filter-stopped-tasks
let (alive,dead) = L.partition ( \(x,_) -> HM.member x peers ) xs
writeTVar pts (HM.fromList alive)
pure dead
writeTVar pts mempty
loo <- newTQueue
for_ xs $ \e@(h,v@(a,_)) -> do
running <- isNothing <$> pollSTM a
let here = HM.member h peers
if running && here then do
modifyTVar pts (HM.insert h v)
else
writeTQueue loo e
flushTQueue loo
for_ loosers $ \(p, (a, nonce)) -> do
cancel a
@ -668,6 +686,7 @@ downloadDispatcher brains env = flip runContT pure do
_errors <- newTVarIO 0
_avg <- newTVarIO 600
_blknum <- newTVarIO 0
_sizeCache <- newTVarIO ( mempty :: HashMap HashRef (Maybe Integer) )
@ -681,10 +700,21 @@ downloadDispatcher brains env = flip runContT pure do
debug $ yellow "Started thread for" <+> pretty p
-- TODO: sweep size cache!
ContT $ withAsync $ forever do
ContT $ withAsync $ forever $ (>> pause @'Seconds 60) do
b0 <- readTVarIO _blknum
pause @'Seconds 60
atomically $ writeTVar _errors 0
b1 <- readTVarIO _blknum
when (b0 == b1) do
debug $ blue "Reset burst machine" <+> pretty p
burstMachineReset bm
s <- readTVarIO wip <&> HM.size
when (s == 0) do
atomically $ writeTVar _sizeCache mempty
tsweep <- ContT $ withAsync do
let hashes = readTVarIO _sizeCache <&> fmap (,60) . HM.keys
@ -737,7 +767,7 @@ downloadDispatcher brains env = flip runContT pure do
modifyTVar r (HPSQ.insert h eps dcb)
s <- readTVar r <&> HPSQ.size
if s >= 8 then pure () else loop xs
if s >= 4 then pure () else loop xs
w <- readTVar r <&> HPSQ.findMin
case w of
@ -812,6 +842,7 @@ downloadDispatcher brains env = flip runContT pure do
atomically do
modifyTVar btimes ( take 100 . (dtsec :) )
writeTVar (dcbDownloaded dcb) True
modifyTVar _blknum succ
onBlock hx
go (PReleaseBlock hx dcb True)