This commit is contained in:
voidlizard 2024-11-14 08:14:14 +03:00
parent fe444bb7f7
commit b767792d77
1 changed files with 39 additions and 8 deletions

View File

@ -155,6 +155,7 @@ queryBlockSizeFromPeer cache e h peer = do
data BurstMachine = data BurstMachine =
BurstMachine BurstMachine
{ _buTimeout :: Double { _buTimeout :: Double
, _buStart :: Double
, _buBurstMax :: Int , _buBurstMax :: Int
, _buStepUp :: Double , _buStepUp :: Double
, _buStepDown :: Double , _buStepDown :: Double
@ -166,6 +167,12 @@ burstMachineAddErrors :: MonadUnliftIO m => BurstMachine -> Int -> m ()
burstMachineAddErrors BurstMachine{..} n = burstMachineAddErrors BurstMachine{..} n =
atomically $ modifyTVar _buErrors (+n) atomically $ modifyTVar _buErrors (+n)
burstMachineReset :: MonadUnliftIO m => BurstMachine -> m ()
burstMachineReset BurstMachine{..} = do
atomically do
writeTVar _buCurrent _buStart
writeTVar _buErrors 0
newBurstMachine :: MonadUnliftIO m newBurstMachine :: MonadUnliftIO m
=> Double -- ^ timeout => Double -- ^ timeout
-> Int -- ^ max burst -> Int -- ^ max burst
@ -175,7 +182,7 @@ newBurstMachine :: MonadUnliftIO m
-> m BurstMachine -> m BurstMachine
newBurstMachine t0 buMax buStart up' down' = do newBurstMachine t0 buMax buStart up' down' = do
BurstMachine t0 buMax up down BurstMachine t0 bu0 buMax up down
<$> newTVarIO bu0 <$> newTVarIO bu0
<*> newTVarIO 0 <*> newTVarIO 0
@ -187,6 +194,7 @@ newBurstMachine t0 buMax buStart up' down' = do
getCurrentBurst :: MonadUnliftIO m => BurstMachine -> m Int getCurrentBurst :: MonadUnliftIO m => BurstMachine -> m Int
getCurrentBurst BurstMachine{..} = readTVarIO _buCurrent <&> round getCurrentBurst BurstMachine{..} = readTVarIO _buCurrent <&> round
runBurstMachine :: MonadUnliftIO m runBurstMachine :: MonadUnliftIO m
=> BurstMachine => BurstMachine
-> m () -> m ()
@ -652,10 +660,20 @@ downloadDispatcher brains env = flip runContT pure do
loosers <- atomically do loosers <- atomically do
xs <- readTVar pts <&> HM.toList xs <- readTVar pts <&> HM.toList
-- FIXME: filter-stopped-tasks
let (alive,dead) = L.partition ( \(x,_) -> HM.member x peers ) xs writeTVar pts mempty
writeTVar pts (HM.fromList alive)
pure dead loo <- newTQueue
for_ xs $ \e@(h,v@(a,_)) -> do
running <- isNothing <$> pollSTM a
let here = HM.member h peers
if running && here then do
modifyTVar pts (HM.insert h v)
else
writeTQueue loo e
flushTQueue loo
for_ loosers $ \(p, (a, nonce)) -> do for_ loosers $ \(p, (a, nonce)) -> do
cancel a cancel a
@ -668,6 +686,7 @@ downloadDispatcher brains env = flip runContT pure do
_errors <- newTVarIO 0 _errors <- newTVarIO 0
_avg <- newTVarIO 600 _avg <- newTVarIO 600
_blknum <- newTVarIO 0
_sizeCache <- newTVarIO ( mempty :: HashMap HashRef (Maybe Integer) ) _sizeCache <- newTVarIO ( mempty :: HashMap HashRef (Maybe Integer) )
@ -681,10 +700,21 @@ downloadDispatcher brains env = flip runContT pure do
debug $ yellow "Started thread for" <+> pretty p debug $ yellow "Started thread for" <+> pretty p
-- TODO: sweep size cache! ContT $ withAsync $ forever do
ContT $ withAsync $ forever $ (>> pause @'Seconds 60) do b0 <- readTVarIO _blknum
pause @'Seconds 60
atomically $ writeTVar _errors 0 atomically $ writeTVar _errors 0
b1 <- readTVarIO _blknum
when (b0 == b1) do
debug $ blue "Reset burst machine" <+> pretty p
burstMachineReset bm
s <- readTVarIO wip <&> HM.size
when (s == 0) do
atomically $ writeTVar _sizeCache mempty
tsweep <- ContT $ withAsync do tsweep <- ContT $ withAsync do
let hashes = readTVarIO _sizeCache <&> fmap (,60) . HM.keys let hashes = readTVarIO _sizeCache <&> fmap (,60) . HM.keys
@ -737,7 +767,7 @@ downloadDispatcher brains env = flip runContT pure do
modifyTVar r (HPSQ.insert h eps dcb) modifyTVar r (HPSQ.insert h eps dcb)
s <- readTVar r <&> HPSQ.size s <- readTVar r <&> HPSQ.size
if s >= 8 then pure () else loop xs if s >= 4 then pure () else loop xs
w <- readTVar r <&> HPSQ.findMin w <- readTVar r <&> HPSQ.findMin
case w of case w of
@ -812,6 +842,7 @@ downloadDispatcher brains env = flip runContT pure do
atomically do atomically do
modifyTVar btimes ( take 100 . (dtsec :) ) modifyTVar btimes ( take 100 . (dtsec :) )
writeTVar (dcbDownloaded dcb) True writeTVar (dcbDownloaded dcb) True
modifyTVar _blknum succ
onBlock hx onBlock hx
go (PReleaseBlock hx dcb True) go (PReleaseBlock hx dcb True)