somehow works

This commit is contained in:
voidlizard 2024-11-06 09:46:33 +03:00
parent 9eaca26b45
commit f11c17718f
2 changed files with 111 additions and 3 deletions

View File

@ -1203,7 +1203,7 @@ runPeer opts = respawnOnError opts $ do
peerThread "pexLoop" (pexLoop @e brains tcp)
-- FIXME: new-download-loop
peerThread "blockDownloadLoop" (blockDownloadLoop denv)
-- peerThread "blockDownloadLoop" (blockDownloadLoop denv)
peerThread "blockDownloadQ" (downloadQueue conf (SomeBrains brains) denv)

View File

@ -55,6 +55,8 @@ import PeerInfo
import Control.Monad.Trans.Maybe
import Control.Monad.Trans.Cont
import Control.Concurrent.STM (flushTQueue,retry)
import Data.Map qualified as Map
import Data.Map (Map)
import Data.HashSet (HashSet)
import Data.HashSet qualified as HS
import Data.IntMap qualified as IntMap
@ -169,6 +171,105 @@ queryBlockSizeFromPeer cache e h peer = do
Right x -> pure (Right x)
data BurstMachine =
BurstMachine
{ _buTimeout :: Double
, _buBurstMax :: Int
, _buStepUp :: Double
, _buStepDown :: Double
, _buCurrent :: TVar Double
, _buErrors :: TVar Int
}
burstMachineAddErrors :: MonadUnliftIO m => BurstMachine -> Int -> m ()
burstMachineAddErrors BurstMachine{..} n =
atomically $ modifyTVar _buErrors (+n)
newBurstMachine :: MonadUnliftIO m
=> Double -- ^ timeout
-> Int -- ^ max burst
-> Maybe Int -- ^ start burst
-> Double -- ^ step up
-> Double -- ^ step down
-> m BurstMachine
newBurstMachine t0 buMax buStart up' down' = do
BurstMachine t0 buMax up down
<$> newTVarIO bu0
<*> newTVarIO 0
where
bu0 = realToFrac $ fromMaybe (max 2 (buMax `div` 2)) buStart
down = min 0.85 down'
up = min 0.5 up'
getCurrentBurst :: MonadUnliftIO m => BurstMachine -> m Int
getCurrentBurst BurstMachine{..} = readTVarIO _buCurrent <&> round
runBurstMachine :: MonadUnliftIO m
=> BurstMachine
-> m ()
runBurstMachine BurstMachine{..} = do
pause @'Seconds ( 0.5 * 2.71 )
bu0 <- readTVarIO _buCurrent <&> realToFrac
let buMax = realToFrac _buBurstMax
let down = _buStepDown
let up = _buStepUp
_dEdT <- newTVarIO 0.00
_rates <- newTVarIO (mempty :: Map Double Double)
_buMaxReal <- newTVarIO buMax
flip runContT pure do
void $ ContT $ withAsync do
forever do
pause @'Seconds (realToFrac _buTimeout * 10)
atomically do
e <- headDef bu0 . Map.elems <$> readTVar _rates
writeTVar _rates mempty
modifyTVar _buMaxReal (max e)
-- writeTVar _buCurrent e
void $ ContT $ withAsync do
forever do
pause @'Seconds 600
atomically $ writeTVar _buMaxReal buMax
forever do
e1 <- readTVarIO _buErrors
let dt = realToFrac _buTimeout
pause @'Seconds dt
atomically do
e2 <- readTVar _buErrors
current <- readTVar _buCurrent
buMaxReal <- readTVar _buMaxReal
let new = if e2 > e1 then
max 2.0 (current * (1.0 - down))
else
min buMaxReal (current * (1.0 + up))
writeTVar _buErrors 0
writeTVar _buCurrent new
let dedt = realToFrac (e2 - e1) / realToFrac dt
writeTVar _dEdT (realToFrac dedt)
modifyTVar _rates ( Map.insertWith max dedt current )
data S =
SInit
| SFetchQ
@ -198,6 +299,8 @@ downloadFromPeerRec t bu0 cache env h0 peer = do
q <- newTQueueIO
qq <- newTQueueIO
bm <- newBurstMachine 3.00 256 (Just bu0) 0.05 0.10
flip runContT pure do
ContT $ withAsync $ forever do
@ -208,6 +311,8 @@ downloadFromPeerRec t bu0 cache env h0 peer = do
void $ queryBlockSizeFromPeer cache env h peer
pause @'Seconds 1.5
ContT $ withAsync $ runBurstMachine bm
flip fix SInit $ \next -> \case
SInit -> do
@ -226,6 +331,7 @@ downloadFromPeerRec t bu0 cache env h0 peer = do
pe <- isEmptyTQueue p
qe <- isEmptyTQueue q
when (qe && not pe) retry
-- when (not pe) retry
pure qe
if done then
@ -239,13 +345,15 @@ downloadFromPeerRec t bu0 cache env h0 peer = do
Just bs -> next (SFetchPost h bs)
Nothing -> none
w <- lift $ downloadFromPeer t bu0 cache env (coerce h) peer
bu <- lift $ getCurrentBurst bm
w <- lift $ downloadFromPeer t bu cache env (coerce h) peer
case w of
Right bs -> do
next (SFetchPost h bs)
Left e -> do
lift $ burstMachineAddErrors bm 1
err $ "DOWNLOAD ERROR" <+> viaShow e
next SFetchQ
@ -335,7 +443,7 @@ downloadFromPeer t bu cache env h peer = liftIO $ withPeerM env do
let watchdog = fix \next -> do
s1 <- readTVarIO _sBlockChunks2 <&> IntMap.size
pause @'MilliSeconds ( max (realToFrac chunkN * rtt) 2000 )
pause @'MilliSeconds ( 1000 )
s2 <- readTVarIO _sBlockChunks2 <&> IntMap.size
when (s1 /= s2) next