somehow works

This commit is contained in:
voidlizard 2024-11-06 09:46:33 +03:00
parent c4b919b85d
commit 3b7e94bc40
2 changed files with 111 additions and 3 deletions

View File

@ -1203,7 +1203,7 @@ runPeer opts = respawnOnError opts $ do
peerThread "pexLoop" (pexLoop @e brains tcp) peerThread "pexLoop" (pexLoop @e brains tcp)
-- FIXME: new-download-loop -- FIXME: new-download-loop
peerThread "blockDownloadLoop" (blockDownloadLoop denv) -- peerThread "blockDownloadLoop" (blockDownloadLoop denv)
peerThread "blockDownloadQ" (downloadQueue conf (SomeBrains brains) denv) peerThread "blockDownloadQ" (downloadQueue conf (SomeBrains brains) denv)

View File

@ -55,6 +55,8 @@ import PeerInfo
import Control.Monad.Trans.Maybe import Control.Monad.Trans.Maybe
import Control.Monad.Trans.Cont import Control.Monad.Trans.Cont
import Control.Concurrent.STM (flushTQueue,retry) import Control.Concurrent.STM (flushTQueue,retry)
import Data.Map qualified as Map
import Data.Map (Map)
import Data.HashSet (HashSet) import Data.HashSet (HashSet)
import Data.HashSet qualified as HS import Data.HashSet qualified as HS
import Data.IntMap qualified as IntMap import Data.IntMap qualified as IntMap
@ -169,6 +171,105 @@ queryBlockSizeFromPeer cache e h peer = do
Right x -> pure (Right x) Right x -> pure (Right x)
data BurstMachine =
BurstMachine
{ _buTimeout :: Double
, _buBurstMax :: Int
, _buStepUp :: Double
, _buStepDown :: Double
, _buCurrent :: TVar Double
, _buErrors :: TVar Int
}
burstMachineAddErrors :: MonadUnliftIO m => BurstMachine -> Int -> m ()
burstMachineAddErrors BurstMachine{..} n =
atomically $ modifyTVar _buErrors (+n)
newBurstMachine :: MonadUnliftIO m
=> Double -- ^ timeout
-> Int -- ^ max burst
-> Maybe Int -- ^ start burst
-> Double -- ^ step up
-> Double -- ^ step down
-> m BurstMachine
newBurstMachine t0 buMax buStart up' down' = do
BurstMachine t0 buMax up down
<$> newTVarIO bu0
<*> newTVarIO 0
where
bu0 = realToFrac $ fromMaybe (max 2 (buMax `div` 2)) buStart
down = min 0.85 down'
up = min 0.5 up'
getCurrentBurst :: MonadUnliftIO m => BurstMachine -> m Int
getCurrentBurst BurstMachine{..} = readTVarIO _buCurrent <&> round
runBurstMachine :: MonadUnliftIO m
=> BurstMachine
-> m ()
runBurstMachine BurstMachine{..} = do
pause @'Seconds ( 0.5 * 2.71 )
bu0 <- readTVarIO _buCurrent <&> realToFrac
let buMax = realToFrac _buBurstMax
let down = _buStepDown
let up = _buStepUp
_dEdT <- newTVarIO 0.00
_rates <- newTVarIO (mempty :: Map Double Double)
_buMaxReal <- newTVarIO buMax
flip runContT pure do
void $ ContT $ withAsync do
forever do
pause @'Seconds (realToFrac _buTimeout * 10)
atomically do
e <- headDef bu0 . Map.elems <$> readTVar _rates
writeTVar _rates mempty
modifyTVar _buMaxReal (max e)
-- writeTVar _buCurrent e
void $ ContT $ withAsync do
forever do
pause @'Seconds 600
atomically $ writeTVar _buMaxReal buMax
forever do
e1 <- readTVarIO _buErrors
let dt = realToFrac _buTimeout
pause @'Seconds dt
atomically do
e2 <- readTVar _buErrors
current <- readTVar _buCurrent
buMaxReal <- readTVar _buMaxReal
let new = if e2 > e1 then
max 2.0 (current * (1.0 - down))
else
min buMaxReal (current * (1.0 + up))
writeTVar _buErrors 0
writeTVar _buCurrent new
let dedt = realToFrac (e2 - e1) / realToFrac dt
writeTVar _dEdT (realToFrac dedt)
modifyTVar _rates ( Map.insertWith max dedt current )
data S = data S =
SInit SInit
| SFetchQ | SFetchQ
@ -198,6 +299,8 @@ downloadFromPeerRec t bu0 cache env h0 peer = do
q <- newTQueueIO q <- newTQueueIO
qq <- newTQueueIO qq <- newTQueueIO
bm <- newBurstMachine 3.00 256 (Just bu0) 0.05 0.10
flip runContT pure do flip runContT pure do
ContT $ withAsync $ forever do ContT $ withAsync $ forever do
@ -208,6 +311,8 @@ downloadFromPeerRec t bu0 cache env h0 peer = do
void $ queryBlockSizeFromPeer cache env h peer void $ queryBlockSizeFromPeer cache env h peer
pause @'Seconds 1.5 pause @'Seconds 1.5
ContT $ withAsync $ runBurstMachine bm
flip fix SInit $ \next -> \case flip fix SInit $ \next -> \case
SInit -> do SInit -> do
@ -226,6 +331,7 @@ downloadFromPeerRec t bu0 cache env h0 peer = do
pe <- isEmptyTQueue p pe <- isEmptyTQueue p
qe <- isEmptyTQueue q qe <- isEmptyTQueue q
when (qe && not pe) retry when (qe && not pe) retry
-- when (not pe) retry
pure qe pure qe
if done then if done then
@ -239,13 +345,15 @@ downloadFromPeerRec t bu0 cache env h0 peer = do
Just bs -> next (SFetchPost h bs) Just bs -> next (SFetchPost h bs)
Nothing -> none Nothing -> none
w <- lift $ downloadFromPeer t bu0 cache env (coerce h) peer bu <- lift $ getCurrentBurst bm
w <- lift $ downloadFromPeer t bu cache env (coerce h) peer
case w of case w of
Right bs -> do Right bs -> do
next (SFetchPost h bs) next (SFetchPost h bs)
Left e -> do Left e -> do
lift $ burstMachineAddErrors bm 1
err $ "DOWNLOAD ERROR" <+> viaShow e err $ "DOWNLOAD ERROR" <+> viaShow e
next SFetchQ next SFetchQ
@ -335,7 +443,7 @@ downloadFromPeer t bu cache env h peer = liftIO $ withPeerM env do
let watchdog = fix \next -> do let watchdog = fix \next -> do
s1 <- readTVarIO _sBlockChunks2 <&> IntMap.size s1 <- readTVarIO _sBlockChunks2 <&> IntMap.size
pause @'MilliSeconds ( max (realToFrac chunkN * rtt) 2000 ) pause @'MilliSeconds ( 1000 )
s2 <- readTVarIO _sBlockChunks2 <&> IntMap.size s2 <- readTVarIO _sBlockChunks2 <&> IntMap.size
when (s1 /= s2) next when (s1 /= s2) next