DDD works

This commit is contained in:
voidlizard 2024-11-14 12:41:34 +03:00
parent 36d35123d5
commit 22bf622259
1 changed files with 111 additions and 95 deletions

View File

@ -47,6 +47,7 @@ import Data.Vector ((!?))
import Data.ByteString qualified as BS
import Data.List qualified as L
import Data.Coerce
import Data.Kind
import Numeric
import UnliftIO
import Control.Concurrent.STM.TSem (TSem)
@ -154,6 +155,12 @@ queryBlockSizeFromPeer cache e h peer = do
Right x -> pure (Right x)
class Monad m => IsBurstMachine a m where
getCurrentBurst :: a -> m Int
burstMachineAddErrors :: a -> Int -> m ()
burstMachineReset :: a -> m ()
runBurstMachine :: a -> m ()
data BurstMachine =
BurstMachine
{ _buTimeout :: Double
@ -165,15 +172,15 @@ data BurstMachine =
, _buErrors :: TVar Int
}
burstMachineAddErrors :: MonadUnliftIO m => BurstMachine -> Int -> m ()
burstMachineAddErrors BurstMachine{..} n =
atomically $ modifyTVar _buErrors (+n)
data ConstBurstMachine = ConstBurstMachine Int
burstMachineReset :: MonadUnliftIO m => BurstMachine -> m ()
burstMachineReset BurstMachine{..} = do
atomically do
writeTVar _buCurrent _buStart
writeTVar _buErrors 0
data AnyBurstMachine (m :: Type -> Type) = forall a . IsBurstMachine a m => AnyBurstMachine a
instance MonadIO m => IsBurstMachine (AnyBurstMachine m) m where
getCurrentBurst (AnyBurstMachine a) = getCurrentBurst a
burstMachineAddErrors (AnyBurstMachine a) = burstMachineAddErrors a
burstMachineReset (AnyBurstMachine a) = burstMachineReset a
runBurstMachine (AnyBurstMachine a) = runBurstMachine a
newBurstMachine :: MonadUnliftIO m
=> Double -- ^ timeout
@ -193,92 +200,101 @@ newBurstMachine t0 buMax buStart up' down' = do
down = min 0.85 down'
up = min 0.5 up'
getCurrentBurst :: MonadUnliftIO m => BurstMachine -> m Int
getCurrentBurst BurstMachine{..} = readTVarIO _buCurrent <&> round
instance MonadUnliftIO m => IsBurstMachine BurstMachine m where
getCurrentBurst BurstMachine{..} = readTVarIO _buCurrent <&> round
burstMachineAddErrors BurstMachine{..} n =
atomically $ modifyTVar _buErrors (+n)
runBurstMachine :: MonadUnliftIO m
=> BurstMachine
-> m ()
runBurstMachine BurstMachine{..} = do
bu0 <- readTVarIO _buCurrent <&> realToFrac
let buMax = realToFrac _buBurstMax
let down = _buStepDown
let up = _buStepUp
_dEdT <- newTVarIO 0.00
_rates <- newTVarIO (mempty :: Map Double Double)
_buMaxReal <- newTVarIO buMax
pause @'Seconds (realToFrac _buTimeout)
flip runContT pure do
void $ ContT $ withAsync do
forever do
pause @'Seconds (realToFrac _buTimeout * 10)
atomically do
e <- headDef bu0 . Map.elems <$> readTVar _rates
nrates <- readTVar _rates <&> take 100 . Map.toList
writeTVar _rates (Map.fromList nrates)
modifyTVar _buMaxReal (max e)
void $ ContT $ withAsync do
forever do
pause @'Seconds 600
atomically $ writeTVar _buMaxReal buMax
void $ ContT $ withAsync do
forever do
pause @'Seconds (realToFrac _buTimeout * 2.0)
ddt <- readTVarIO _dEdT
when (ddt <= 0) do
atomically do
buMaxReal <- readTVar _buMaxReal
current <- readTVar _buCurrent
let new = min buMaxReal (current * (1.0 + up))
writeTVar _buCurrent new
flip fix 0 $ \next e1 -> do
let dt = realToFrac _buTimeout
eNew <- atomically do
e2 <- readTVar _buErrors
current <- readTVar _buCurrent
new <- if e2 > e1 then do
let d = max 2.0 (current * (1.0 - down))
nrates <- readTVar _rates <&> drop 1 . Map.toList
let newFucked = maybe d snd (headMay nrates)
writeTVar _rates (Map.fromList nrates)
pure newFucked
else
pure current -- $ min buMaxReal (current * (1.0 + up))
burstMachineReset BurstMachine{..} = do
atomically do
writeTVar _buCurrent _buStart
writeTVar _buErrors 0
writeTVar _buCurrent new
let dedt = realToFrac (e2 - e1) / realToFrac dt
runBurstMachine BurstMachine{..} = do
writeTVar _dEdT (realToFrac dedt)
modifyTVar _rates ( Map.insertWith max dedt current )
bu0 <- readTVarIO _buCurrent <&> realToFrac
let buMax = realToFrac _buBurstMax
let down = _buStepDown
let up = _buStepUp
pure e2
_dEdT <- newTVarIO 0.00
pause @'Seconds dt
next eNew
_rates <- newTVarIO (mempty :: Map Double Double)
_buMaxReal <- newTVarIO buMax
pause @'Seconds (realToFrac _buTimeout)
flip runContT pure do
void $ ContT $ withAsync do
forever do
pause @'Seconds (realToFrac _buTimeout * 10)
atomically do
e <- headDef bu0 . Map.elems <$> readTVar _rates
nrates <- readTVar _rates <&> take 100 . Map.toList
writeTVar _rates (Map.fromList nrates)
modifyTVar _buMaxReal (max e)
void $ ContT $ withAsync do
forever do
pause @'Seconds 600
atomically $ writeTVar _buMaxReal buMax
void $ ContT $ withAsync do
forever do
pause @'Seconds (realToFrac _buTimeout * 2.0)
ddt <- readTVarIO _dEdT
when (ddt <= 0) do
atomically do
buMaxReal <- readTVar _buMaxReal
current <- readTVar _buCurrent
let new = min buMaxReal (current * (1.0 + up))
writeTVar _buCurrent new
flip fix 0 $ \next e1 -> do
let dt = realToFrac _buTimeout
eNew <- atomically do
e2 <- readTVar _buErrors
current <- readTVar _buCurrent
new <- if e2 > e1 then do
let d = max 2.0 (current * (1.0 - down))
nrates <- readTVar _rates <&> drop 1 . Map.toList
let newFucked = maybe d snd (headMay nrates)
writeTVar _rates (Map.fromList nrates)
pure newFucked
else
pure current -- $ min buMaxReal (current * (1.0 + up))
writeTVar _buErrors 0
writeTVar _buCurrent new
let dedt = realToFrac (e2 - e1) / realToFrac dt
writeTVar _dEdT (realToFrac dedt)
modifyTVar _rates ( Map.insertWith max dedt current )
pure e2
pause @'Seconds dt
next eNew
instance MonadIO m => IsBurstMachine ConstBurstMachine m where
getCurrentBurst (ConstBurstMachine n) = pure n
burstMachineAddErrors _ _ = none
burstMachineReset _ = none
runBurstMachine _ = forever $ pause @'Seconds 300
data S =
SInit
@ -689,8 +705,8 @@ downloadDispatcher brains env = flip runContT pure do
bm <- liftIO do
case _sockType p of
TCP -> newBurstMachine 20 256 (Just 256) 0.25 0.45
UDP -> newBurstMachine 3 256 (Just 50) 0.10 0.25
TCP -> AnyBurstMachine @IO <$> newBurstMachine 30 256 (Just 256) 0.20 0.10
UDP -> AnyBurstMachine @IO <$> newBurstMachine 5 256 (Just 100) 0.15 0.35
void $ ContT $ bracket none $ const do
debug $ "Cancelling thread for" <+> pretty p
@ -706,7 +722,7 @@ downloadDispatcher brains env = flip runContT pure do
when (b0 == b1) do
debug $ blue "Reset burst machine" <+> pretty p
burstMachineReset bm
liftIO $ burstMachineReset bm
s <- readTVarIO wip <&> HM.size
@ -721,7 +737,7 @@ downloadDispatcher brains env = flip runContT pure do
unless here do
modifyTVar _sizeCache (HM.delete h)
bmt <- ContT $ withAsync $ runBurstMachine bm
bmt <- ContT $ withAsync $ liftIO $ runBurstMachine bm
tstat <- ContT $ withAsync $ forever $ (>> pause @'Seconds 5) do
tss <- readTVarIO btimes
@ -734,7 +750,7 @@ downloadDispatcher brains env = flip runContT pure do
pinfo' <- lift $ find @e (PeerInfoKey p) id
PeerInfo{..} <- ContT $ maybe1 pinfo' none
bu <- lift $ getCurrentBurst bm
bu <- liftIO $ getCurrentBurst bm
atomically do
erno <- readTVar _errors
down <- readTVar _blknum
@ -827,10 +843,10 @@ downloadDispatcher brains env = flip runContT pure do
PFetchBlock hx dcb size -> do
bu <- lift $ getCurrentBurst bm
bu <- liftIO $ getCurrentBurst bm
t0 <- getTimeCoarse
r <- lift $ downloadFromPeer bu (KnownSize size) env (coerce hx) p
r <- downloadFromPeer bu (KnownSize size) env (coerce hx) p
t1 <- getTimeCoarse
case r of
@ -841,7 +857,7 @@ downloadDispatcher brains env = flip runContT pure do
avg <- readTVarIO _avg
when (dtsec > avg * 1.15) do
burstMachineAddErrors bm 1
liftIO $ burstMachineAddErrors bm 1
atomically do
modifyTVar btimes ( take 100 . (dtsec :) )
@ -852,7 +868,7 @@ downloadDispatcher brains env = flip runContT pure do
go (PReleaseBlock hx dcb True)
Left e -> do
burstMachineAddErrors bm 1
liftIO $ burstMachineAddErrors bm 1
atomically $ modifyTVar _errors succ
debug $ red "BLOCK DOWNLOAD FUCKED" <+> pretty p <+> pretty hx <+> viaShow e