diff --git a/hbs2-peer/app/BlockDownloadNew.hs b/hbs2-peer/app/BlockDownloadNew.hs index 67fc06b3..3503ac2a 100644 --- a/hbs2-peer/app/BlockDownloadNew.hs +++ b/hbs2-peer/app/BlockDownloadNew.hs @@ -47,6 +47,7 @@ import Data.Vector ((!?)) import Data.ByteString qualified as BS import Data.List qualified as L import Data.Coerce +import Data.Kind import Numeric import UnliftIO import Control.Concurrent.STM.TSem (TSem) @@ -154,6 +155,12 @@ queryBlockSizeFromPeer cache e h peer = do Right x -> pure (Right x) +class Monad m => IsBurstMachine a m where + getCurrentBurst :: a -> m Int + burstMachineAddErrors :: a -> Int -> m () + burstMachineReset :: a -> m () + runBurstMachine :: a -> m () + data BurstMachine = BurstMachine { _buTimeout :: Double @@ -165,15 +172,15 @@ data BurstMachine = , _buErrors :: TVar Int } -burstMachineAddErrors :: MonadUnliftIO m => BurstMachine -> Int -> m () -burstMachineAddErrors BurstMachine{..} n = - atomically $ modifyTVar _buErrors (+n) +data ConstBurstMachine = ConstBurstMachine Int -burstMachineReset :: MonadUnliftIO m => BurstMachine -> m () -burstMachineReset BurstMachine{..} = do - atomically do - writeTVar _buCurrent _buStart - writeTVar _buErrors 0 +data AnyBurstMachine (m :: Type -> Type) = forall a . IsBurstMachine a m => AnyBurstMachine a + +instance MonadIO m => IsBurstMachine (AnyBurstMachine m) m where + getCurrentBurst (AnyBurstMachine a) = getCurrentBurst a + burstMachineAddErrors (AnyBurstMachine a) = burstMachineAddErrors a + burstMachineReset (AnyBurstMachine a) = burstMachineReset a + runBurstMachine (AnyBurstMachine a) = runBurstMachine a newBurstMachine :: MonadUnliftIO m => Double -- ^ timeout @@ -193,92 +200,101 @@ newBurstMachine t0 buMax buStart up' down' = do down = min 0.85 down' up = min 0.5 up' -getCurrentBurst :: MonadUnliftIO m => BurstMachine -> m Int -getCurrentBurst BurstMachine{..} = readTVarIO _buCurrent <&> round +instance MonadUnliftIO m => IsBurstMachine BurstMachine m where + getCurrentBurst BurstMachine{..} = readTVarIO _buCurrent <&> round + burstMachineAddErrors BurstMachine{..} n = + atomically $ modifyTVar _buErrors (+n) -runBurstMachine :: MonadUnliftIO m - => BurstMachine - -> m () - -runBurstMachine BurstMachine{..} = do - - - bu0 <- readTVarIO _buCurrent <&> realToFrac - let buMax = realToFrac _buBurstMax - let down = _buStepDown - let up = _buStepUp - - _dEdT <- newTVarIO 0.00 - - _rates <- newTVarIO (mempty :: Map Double Double) - - _buMaxReal <- newTVarIO buMax - - pause @'Seconds (realToFrac _buTimeout) - - flip runContT pure do - - void $ ContT $ withAsync do - forever do - pause @'Seconds (realToFrac _buTimeout * 10) - - atomically do - e <- headDef bu0 . Map.elems <$> readTVar _rates - nrates <- readTVar _rates <&> take 100 . Map.toList - writeTVar _rates (Map.fromList nrates) - modifyTVar _buMaxReal (max e) - - void $ ContT $ withAsync do - forever do - pause @'Seconds 600 - atomically $ writeTVar _buMaxReal buMax - - - void $ ContT $ withAsync do - forever do - pause @'Seconds (realToFrac _buTimeout * 2.0) - ddt <- readTVarIO _dEdT - - when (ddt <= 0) do - atomically do - buMaxReal <- readTVar _buMaxReal - current <- readTVar _buCurrent - let new = min buMaxReal (current * (1.0 + up)) - writeTVar _buCurrent new - - flip fix 0 $ \next e1 -> do - - let dt = realToFrac _buTimeout - - eNew <- atomically do - - e2 <- readTVar _buErrors - current <- readTVar _buCurrent - - new <- if e2 > e1 then do - let d = max 2.0 (current * (1.0 - down)) - nrates <- readTVar _rates <&> drop 1 . Map.toList - let newFucked = maybe d snd (headMay nrates) - writeTVar _rates (Map.fromList nrates) - pure newFucked - - else - pure current -- $ min buMaxReal (current * (1.0 + up)) - + burstMachineReset BurstMachine{..} = do + atomically do + writeTVar _buCurrent _buStart writeTVar _buErrors 0 - writeTVar _buCurrent new - let dedt = realToFrac (e2 - e1) / realToFrac dt + runBurstMachine BurstMachine{..} = do - writeTVar _dEdT (realToFrac dedt) - modifyTVar _rates ( Map.insertWith max dedt current ) + bu0 <- readTVarIO _buCurrent <&> realToFrac + let buMax = realToFrac _buBurstMax + let down = _buStepDown + let up = _buStepUp - pure e2 + _dEdT <- newTVarIO 0.00 - pause @'Seconds dt - next eNew + _rates <- newTVarIO (mempty :: Map Double Double) + + _buMaxReal <- newTVarIO buMax + + pause @'Seconds (realToFrac _buTimeout) + + flip runContT pure do + + void $ ContT $ withAsync do + forever do + pause @'Seconds (realToFrac _buTimeout * 10) + + atomically do + e <- headDef bu0 . Map.elems <$> readTVar _rates + nrates <- readTVar _rates <&> take 100 . Map.toList + writeTVar _rates (Map.fromList nrates) + modifyTVar _buMaxReal (max e) + + void $ ContT $ withAsync do + forever do + pause @'Seconds 600 + atomically $ writeTVar _buMaxReal buMax + + + void $ ContT $ withAsync do + forever do + pause @'Seconds (realToFrac _buTimeout * 2.0) + ddt <- readTVarIO _dEdT + + when (ddt <= 0) do + atomically do + buMaxReal <- readTVar _buMaxReal + current <- readTVar _buCurrent + let new = min buMaxReal (current * (1.0 + up)) + writeTVar _buCurrent new + + flip fix 0 $ \next e1 -> do + + let dt = realToFrac _buTimeout + + eNew <- atomically do + + e2 <- readTVar _buErrors + current <- readTVar _buCurrent + + new <- if e2 > e1 then do + let d = max 2.0 (current * (1.0 - down)) + nrates <- readTVar _rates <&> drop 1 . Map.toList + let newFucked = maybe d snd (headMay nrates) + writeTVar _rates (Map.fromList nrates) + pure newFucked + + else + pure current -- $ min buMaxReal (current * (1.0 + up)) + + writeTVar _buErrors 0 + writeTVar _buCurrent new + + let dedt = realToFrac (e2 - e1) / realToFrac dt + + writeTVar _dEdT (realToFrac dedt) + + modifyTVar _rates ( Map.insertWith max dedt current ) + + pure e2 + + pause @'Seconds dt + next eNew + +instance MonadIO m => IsBurstMachine ConstBurstMachine m where + getCurrentBurst (ConstBurstMachine n) = pure n + burstMachineAddErrors _ _ = none + burstMachineReset _ = none + runBurstMachine _ = forever $ pause @'Seconds 300 data S = SInit @@ -689,8 +705,8 @@ downloadDispatcher brains env = flip runContT pure do bm <- liftIO do case _sockType p of - TCP -> newBurstMachine 20 256 (Just 256) 0.25 0.45 - UDP -> newBurstMachine 3 256 (Just 50) 0.10 0.25 + TCP -> AnyBurstMachine @IO <$> newBurstMachine 30 256 (Just 256) 0.20 0.10 + UDP -> AnyBurstMachine @IO <$> newBurstMachine 5 256 (Just 100) 0.15 0.35 void $ ContT $ bracket none $ const do debug $ "Cancelling thread for" <+> pretty p @@ -706,7 +722,7 @@ downloadDispatcher brains env = flip runContT pure do when (b0 == b1) do debug $ blue "Reset burst machine" <+> pretty p - burstMachineReset bm + liftIO $ burstMachineReset bm s <- readTVarIO wip <&> HM.size @@ -721,7 +737,7 @@ downloadDispatcher brains env = flip runContT pure do unless here do modifyTVar _sizeCache (HM.delete h) - bmt <- ContT $ withAsync $ runBurstMachine bm + bmt <- ContT $ withAsync $ liftIO $ runBurstMachine bm tstat <- ContT $ withAsync $ forever $ (>> pause @'Seconds 5) do tss <- readTVarIO btimes @@ -734,7 +750,7 @@ downloadDispatcher brains env = flip runContT pure do pinfo' <- lift $ find @e (PeerInfoKey p) id PeerInfo{..} <- ContT $ maybe1 pinfo' none - bu <- lift $ getCurrentBurst bm + bu <- liftIO $ getCurrentBurst bm atomically do erno <- readTVar _errors down <- readTVar _blknum @@ -827,10 +843,10 @@ downloadDispatcher brains env = flip runContT pure do PFetchBlock hx dcb size -> do - bu <- lift $ getCurrentBurst bm + bu <- liftIO $ getCurrentBurst bm t0 <- getTimeCoarse - r <- lift $ downloadFromPeer bu (KnownSize size) env (coerce hx) p + r <- downloadFromPeer bu (KnownSize size) env (coerce hx) p t1 <- getTimeCoarse case r of @@ -841,7 +857,7 @@ downloadDispatcher brains env = flip runContT pure do avg <- readTVarIO _avg when (dtsec > avg * 1.15) do - burstMachineAddErrors bm 1 + liftIO $ burstMachineAddErrors bm 1 atomically do modifyTVar btimes ( take 100 . (dtsec :) ) @@ -852,7 +868,7 @@ downloadDispatcher brains env = flip runContT pure do go (PReleaseBlock hx dcb True) Left e -> do - burstMachineAddErrors bm 1 + liftIO $ burstMachineAddErrors bm 1 atomically $ modifyTVar _errors succ debug $ red "BLOCK DOWNLOAD FUCKED" <+> pretty p <+> pretty hx <+> viaShow e