diff --git a/hbs2-peer/app/PeerMain.hs b/hbs2-peer/app/PeerMain.hs index d091876d..7ba173b3 100644 --- a/hbs2-peer/app/PeerMain.hs +++ b/hbs2-peer/app/PeerMain.hs @@ -1203,7 +1203,7 @@ runPeer opts = respawnOnError opts $ do peerThread "pexLoop" (pexLoop @e brains tcp) -- FIXME: new-download-loop - peerThread "blockDownloadLoop" (blockDownloadLoop denv) + -- peerThread "blockDownloadLoop" (blockDownloadLoop denv) peerThread "blockDownloadQ" (downloadQueue conf (SomeBrains brains) denv) diff --git a/hbs2-peer/app/RPC2.hs b/hbs2-peer/app/RPC2.hs index 5f3a8560..4fe06e8f 100644 --- a/hbs2-peer/app/RPC2.hs +++ b/hbs2-peer/app/RPC2.hs @@ -55,6 +55,8 @@ import PeerInfo import Control.Monad.Trans.Maybe import Control.Monad.Trans.Cont import Control.Concurrent.STM (flushTQueue,retry) +import Data.Map qualified as Map +import Data.Map (Map) import Data.HashSet (HashSet) import Data.HashSet qualified as HS import Data.IntMap qualified as IntMap @@ -169,6 +171,105 @@ queryBlockSizeFromPeer cache e h peer = do Right x -> pure (Right x) +data BurstMachine = + BurstMachine + { _buTimeout :: Double + , _buBurstMax :: Int + , _buStepUp :: Double + , _buStepDown :: Double + , _buCurrent :: TVar Double + , _buErrors :: TVar Int + } + +burstMachineAddErrors :: MonadUnliftIO m => BurstMachine -> Int -> m () +burstMachineAddErrors BurstMachine{..} n = + atomically $ modifyTVar _buErrors (+n) + +newBurstMachine :: MonadUnliftIO m + => Double -- ^ timeout + -> Int -- ^ max burst + -> Maybe Int -- ^ start burst + -> Double -- ^ step up + -> Double -- ^ step down + -> m BurstMachine + +newBurstMachine t0 buMax buStart up' down' = do + BurstMachine t0 buMax up down + <$> newTVarIO bu0 + <*> newTVarIO 0 + + where + bu0 = realToFrac $ fromMaybe (max 2 (buMax `div` 2)) buStart + down = min 0.85 down' + up = min 0.5 up' + +getCurrentBurst :: MonadUnliftIO m => BurstMachine -> m Int +getCurrentBurst BurstMachine{..} = readTVarIO _buCurrent <&> round + +runBurstMachine :: MonadUnliftIO m + => BurstMachine + -> m () + +runBurstMachine BurstMachine{..} = do + + pause @'Seconds ( 0.5 * 2.71 ) + + bu0 <- readTVarIO _buCurrent <&> realToFrac + let buMax = realToFrac _buBurstMax + let down = _buStepDown + let up = _buStepUp + + _dEdT <- newTVarIO 0.00 + + _rates <- newTVarIO (mempty :: Map Double Double) + + _buMaxReal <- newTVarIO buMax + + flip runContT pure do + + void $ ContT $ withAsync do + forever do + pause @'Seconds (realToFrac _buTimeout * 10) + + atomically do + e <- headDef bu0 . Map.elems <$> readTVar _rates + writeTVar _rates mempty + modifyTVar _buMaxReal (max e) + -- writeTVar _buCurrent e + + void $ ContT $ withAsync do + forever do + pause @'Seconds 600 + atomically $ writeTVar _buMaxReal buMax + + forever do + + e1 <- readTVarIO _buErrors + + let dt = realToFrac _buTimeout + + pause @'Seconds dt + + atomically do + + e2 <- readTVar _buErrors + current <- readTVar _buCurrent + buMaxReal <- readTVar _buMaxReal + + let new = if e2 > e1 then + max 2.0 (current * (1.0 - down)) + else + min buMaxReal (current * (1.0 + up)) + + writeTVar _buErrors 0 + writeTVar _buCurrent new + + let dedt = realToFrac (e2 - e1) / realToFrac dt + + writeTVar _dEdT (realToFrac dedt) + + modifyTVar _rates ( Map.insertWith max dedt current ) + data S = SInit | SFetchQ @@ -198,6 +299,8 @@ downloadFromPeerRec t bu0 cache env h0 peer = do q <- newTQueueIO qq <- newTQueueIO + bm <- newBurstMachine 3.00 256 (Just bu0) 0.05 0.10 + flip runContT pure do ContT $ withAsync $ forever do @@ -208,6 +311,8 @@ downloadFromPeerRec t bu0 cache env h0 peer = do void $ queryBlockSizeFromPeer cache env h peer pause @'Seconds 1.5 + ContT $ withAsync $ runBurstMachine bm + flip fix SInit $ \next -> \case SInit -> do @@ -226,6 +331,7 @@ downloadFromPeerRec t bu0 cache env h0 peer = do pe <- isEmptyTQueue p qe <- isEmptyTQueue q when (qe && not pe) retry + -- when (not pe) retry pure qe if done then @@ -239,13 +345,15 @@ downloadFromPeerRec t bu0 cache env h0 peer = do Just bs -> next (SFetchPost h bs) Nothing -> none - w <- lift $ downloadFromPeer t bu0 cache env (coerce h) peer + bu <- lift $ getCurrentBurst bm + w <- lift $ downloadFromPeer t bu cache env (coerce h) peer case w of Right bs -> do next (SFetchPost h bs) Left e -> do + lift $ burstMachineAddErrors bm 1 err $ "DOWNLOAD ERROR" <+> viaShow e next SFetchQ @@ -335,7 +443,7 @@ downloadFromPeer t bu cache env h peer = liftIO $ withPeerM env do let watchdog = fix \next -> do s1 <- readTVarIO _sBlockChunks2 <&> IntMap.size - pause @'MilliSeconds ( max (realToFrac chunkN * rtt) 2000 ) + pause @'MilliSeconds ( 1000 ) s2 <- readTVarIO _sBlockChunks2 <&> IntMap.size when (s1 /= s2) next