mirror of https://github.com/voidlizard/hbs2
somehow works
This commit is contained in:
parent
a3a954bed0
commit
6762c7497d
|
@ -1203,7 +1203,7 @@ runPeer opts = respawnOnError opts $ do
|
||||||
peerThread "pexLoop" (pexLoop @e brains tcp)
|
peerThread "pexLoop" (pexLoop @e brains tcp)
|
||||||
|
|
||||||
-- FIXME: new-download-loop
|
-- FIXME: new-download-loop
|
||||||
peerThread "blockDownloadLoop" (blockDownloadLoop denv)
|
-- peerThread "blockDownloadLoop" (blockDownloadLoop denv)
|
||||||
|
|
||||||
peerThread "blockDownloadQ" (downloadQueue conf (SomeBrains brains) denv)
|
peerThread "blockDownloadQ" (downloadQueue conf (SomeBrains brains) denv)
|
||||||
|
|
||||||
|
|
|
@ -55,6 +55,8 @@ import PeerInfo
|
||||||
import Control.Monad.Trans.Maybe
|
import Control.Monad.Trans.Maybe
|
||||||
import Control.Monad.Trans.Cont
|
import Control.Monad.Trans.Cont
|
||||||
import Control.Concurrent.STM (flushTQueue,retry)
|
import Control.Concurrent.STM (flushTQueue,retry)
|
||||||
|
import Data.Map qualified as Map
|
||||||
|
import Data.Map (Map)
|
||||||
import Data.HashSet (HashSet)
|
import Data.HashSet (HashSet)
|
||||||
import Data.HashSet qualified as HS
|
import Data.HashSet qualified as HS
|
||||||
import Data.IntMap qualified as IntMap
|
import Data.IntMap qualified as IntMap
|
||||||
|
@ -169,6 +171,105 @@ queryBlockSizeFromPeer cache e h peer = do
|
||||||
Right x -> pure (Right x)
|
Right x -> pure (Right x)
|
||||||
|
|
||||||
|
|
||||||
|
data BurstMachine =
|
||||||
|
BurstMachine
|
||||||
|
{ _buTimeout :: Double
|
||||||
|
, _buBurstMax :: Int
|
||||||
|
, _buStepUp :: Double
|
||||||
|
, _buStepDown :: Double
|
||||||
|
, _buCurrent :: TVar Double
|
||||||
|
, _buErrors :: TVar Int
|
||||||
|
}
|
||||||
|
|
||||||
|
burstMachineAddErrors :: MonadUnliftIO m => BurstMachine -> Int -> m ()
|
||||||
|
burstMachineAddErrors BurstMachine{..} n =
|
||||||
|
atomically $ modifyTVar _buErrors (+n)
|
||||||
|
|
||||||
|
newBurstMachine :: MonadUnliftIO m
|
||||||
|
=> Double -- ^ timeout
|
||||||
|
-> Int -- ^ max burst
|
||||||
|
-> Maybe Int -- ^ start burst
|
||||||
|
-> Double -- ^ step up
|
||||||
|
-> Double -- ^ step down
|
||||||
|
-> m BurstMachine
|
||||||
|
|
||||||
|
newBurstMachine t0 buMax buStart up' down' = do
|
||||||
|
BurstMachine t0 buMax up down
|
||||||
|
<$> newTVarIO bu0
|
||||||
|
<*> newTVarIO 0
|
||||||
|
|
||||||
|
where
|
||||||
|
bu0 = realToFrac $ fromMaybe (max 2 (buMax `div` 2)) buStart
|
||||||
|
down = min 0.85 down'
|
||||||
|
up = min 0.5 up'
|
||||||
|
|
||||||
|
getCurrentBurst :: MonadUnliftIO m => BurstMachine -> m Int
|
||||||
|
getCurrentBurst BurstMachine{..} = readTVarIO _buCurrent <&> round
|
||||||
|
|
||||||
|
runBurstMachine :: MonadUnliftIO m
|
||||||
|
=> BurstMachine
|
||||||
|
-> m ()
|
||||||
|
|
||||||
|
runBurstMachine BurstMachine{..} = do
|
||||||
|
|
||||||
|
pause @'Seconds ( 0.5 * 2.71 )
|
||||||
|
|
||||||
|
bu0 <- readTVarIO _buCurrent <&> realToFrac
|
||||||
|
let buMax = realToFrac _buBurstMax
|
||||||
|
let down = _buStepDown
|
||||||
|
let up = _buStepUp
|
||||||
|
|
||||||
|
_dEdT <- newTVarIO 0.00
|
||||||
|
|
||||||
|
_rates <- newTVarIO (mempty :: Map Double Double)
|
||||||
|
|
||||||
|
_buMaxReal <- newTVarIO buMax
|
||||||
|
|
||||||
|
flip runContT pure do
|
||||||
|
|
||||||
|
void $ ContT $ withAsync do
|
||||||
|
forever do
|
||||||
|
pause @'Seconds (realToFrac _buTimeout * 10)
|
||||||
|
|
||||||
|
atomically do
|
||||||
|
e <- headDef bu0 . Map.elems <$> readTVar _rates
|
||||||
|
writeTVar _rates mempty
|
||||||
|
modifyTVar _buMaxReal (max e)
|
||||||
|
-- writeTVar _buCurrent e
|
||||||
|
|
||||||
|
void $ ContT $ withAsync do
|
||||||
|
forever do
|
||||||
|
pause @'Seconds 600
|
||||||
|
atomically $ writeTVar _buMaxReal buMax
|
||||||
|
|
||||||
|
forever do
|
||||||
|
|
||||||
|
e1 <- readTVarIO _buErrors
|
||||||
|
|
||||||
|
let dt = realToFrac _buTimeout
|
||||||
|
|
||||||
|
pause @'Seconds dt
|
||||||
|
|
||||||
|
atomically do
|
||||||
|
|
||||||
|
e2 <- readTVar _buErrors
|
||||||
|
current <- readTVar _buCurrent
|
||||||
|
buMaxReal <- readTVar _buMaxReal
|
||||||
|
|
||||||
|
let new = if e2 > e1 then
|
||||||
|
max 2.0 (current * (1.0 - down))
|
||||||
|
else
|
||||||
|
min buMaxReal (current * (1.0 + up))
|
||||||
|
|
||||||
|
writeTVar _buErrors 0
|
||||||
|
writeTVar _buCurrent new
|
||||||
|
|
||||||
|
let dedt = realToFrac (e2 - e1) / realToFrac dt
|
||||||
|
|
||||||
|
writeTVar _dEdT (realToFrac dedt)
|
||||||
|
|
||||||
|
modifyTVar _rates ( Map.insertWith max dedt current )
|
||||||
|
|
||||||
data S =
|
data S =
|
||||||
SInit
|
SInit
|
||||||
| SFetchQ
|
| SFetchQ
|
||||||
|
@ -198,6 +299,8 @@ downloadFromPeerRec t bu0 cache env h0 peer = do
|
||||||
q <- newTQueueIO
|
q <- newTQueueIO
|
||||||
qq <- newTQueueIO
|
qq <- newTQueueIO
|
||||||
|
|
||||||
|
bm <- newBurstMachine 3.00 256 (Just bu0) 0.05 0.10
|
||||||
|
|
||||||
flip runContT pure do
|
flip runContT pure do
|
||||||
|
|
||||||
ContT $ withAsync $ forever do
|
ContT $ withAsync $ forever do
|
||||||
|
@ -208,6 +311,8 @@ downloadFromPeerRec t bu0 cache env h0 peer = do
|
||||||
void $ queryBlockSizeFromPeer cache env h peer
|
void $ queryBlockSizeFromPeer cache env h peer
|
||||||
pause @'Seconds 1.5
|
pause @'Seconds 1.5
|
||||||
|
|
||||||
|
ContT $ withAsync $ runBurstMachine bm
|
||||||
|
|
||||||
flip fix SInit $ \next -> \case
|
flip fix SInit $ \next -> \case
|
||||||
|
|
||||||
SInit -> do
|
SInit -> do
|
||||||
|
@ -226,6 +331,7 @@ downloadFromPeerRec t bu0 cache env h0 peer = do
|
||||||
pe <- isEmptyTQueue p
|
pe <- isEmptyTQueue p
|
||||||
qe <- isEmptyTQueue q
|
qe <- isEmptyTQueue q
|
||||||
when (qe && not pe) retry
|
when (qe && not pe) retry
|
||||||
|
-- when (not pe) retry
|
||||||
pure qe
|
pure qe
|
||||||
|
|
||||||
if done then
|
if done then
|
||||||
|
@ -239,13 +345,15 @@ downloadFromPeerRec t bu0 cache env h0 peer = do
|
||||||
Just bs -> next (SFetchPost h bs)
|
Just bs -> next (SFetchPost h bs)
|
||||||
Nothing -> none
|
Nothing -> none
|
||||||
|
|
||||||
w <- lift $ downloadFromPeer t bu0 cache env (coerce h) peer
|
bu <- lift $ getCurrentBurst bm
|
||||||
|
w <- lift $ downloadFromPeer t bu cache env (coerce h) peer
|
||||||
|
|
||||||
case w of
|
case w of
|
||||||
Right bs -> do
|
Right bs -> do
|
||||||
next (SFetchPost h bs)
|
next (SFetchPost h bs)
|
||||||
|
|
||||||
Left e -> do
|
Left e -> do
|
||||||
|
lift $ burstMachineAddErrors bm 1
|
||||||
err $ "DOWNLOAD ERROR" <+> viaShow e
|
err $ "DOWNLOAD ERROR" <+> viaShow e
|
||||||
next SFetchQ
|
next SFetchQ
|
||||||
|
|
||||||
|
@ -335,7 +443,7 @@ downloadFromPeer t bu cache env h peer = liftIO $ withPeerM env do
|
||||||
|
|
||||||
let watchdog = fix \next -> do
|
let watchdog = fix \next -> do
|
||||||
s1 <- readTVarIO _sBlockChunks2 <&> IntMap.size
|
s1 <- readTVarIO _sBlockChunks2 <&> IntMap.size
|
||||||
pause @'MilliSeconds ( max (realToFrac chunkN * rtt) 2000 )
|
pause @'MilliSeconds ( 1000 )
|
||||||
s2 <- readTVarIO _sBlockChunks2 <&> IntMap.size
|
s2 <- readTVarIO _sBlockChunks2 <&> IntMap.size
|
||||||
when (s1 /= s2) next
|
when (s1 /= s2) next
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue