dynamic burst management. makes downloading better

commit 7781e1df1dd258fabfb5d6dd2f9748f4c4d5985a
Author: Dmitry Zuikov <dzuikov@gmail.com>
Date:   Sat Feb 4 11:14:55 2023 +0300

    burst tune

commit 4e837691a5bd7fcda1d0bc8b89c5a377aa18ae30
Author: Dmitry Zuikov <dzuikov@gmail.com>
Date:   Sat Feb 4 11:06:09 2023 +0300

    dynamic-burst-management-wip

commit 575ba7bb6a8952ad79021420c488ffd4ef84f413
Author: Dmitry Zuikov <dzuikov@gmail.com>
Date:   Sat Feb 4 11:01:14 2023 +0300

    dynamic-burst-management-wip

commit 5517df31d790c130e65cf6eb0049dabdd3d627da
Author: Dmitry Zuikov <dzuikov@gmail.com>
Date:   Sat Feb 4 10:18:53 2023 +0300

    dynamic-burst-management-wip

commit 4c5c0b580d9a8c81e4133cc61a15d42b737e2b3c
Author: Dmitry Zuikov <dzuikov@gmail.com>
Date:   Sat Feb 4 09:29:45 2023 +0300

    wip

commit 0132b55be40939e321dc9adc8f4f6176c98c46dc
Author: Dmitry Zuikov <dzuikov@gmail.com>
Date:   Sat Feb 4 09:14:47 2023 +0300

    wip

commit 0bb90781e857076bb20a2e34da8f7edaa63f7815
Author: Dmitry Zuikov <dzuikov@gmail.com>
Date:   Fri Feb 3 14:50:49 2023 +0300

    wip

commit 67befe2af5f7c4139eee5f018a26b78e95aa6cb3
Author: Dmitry Zuikov <dzuikov@gmail.com>
Date:   Fri Feb 3 13:51:57 2023 +0300

    wip

commit d6d5769e032245a130dd4508a2780654b046dceb
Author: Dmitry Zuikov <dzuikov@gmail.com>
Date:   Fri Feb 3 12:48:49 2023 +0300

    wip

commit c78bb51f10c14bea483bfa235aa464d6b857263d
Author: Dmitry Zuikov <dzuikov@gmail.com>
Date:   Fri Feb 3 12:46:03 2023 +0300

    wip
This commit is contained in:
Dmitry Zuikov 2023-02-04 11:15:56 +03:00
parent 7fd8e9f153
commit b9d2adac3d
8 changed files with 334 additions and 123 deletions

12
README.md Normal file
View File

@ -0,0 +1,12 @@
## How to launch a peer
Example:
```
hbs2-peer run -p .peers/1 -k .peers/1/key -l addr:port -r rpcaddr:rpcport
```

View File

@ -13,7 +13,10 @@ defMessageQueueSize :: Integral a => a
defMessageQueueSize = 65536
defBurst :: Integral a => a
defBurst = 16
defBurst = 4
defBurstMax :: Integral a => a
defBurstMax = 256
-- defChunkSize :: Integer
defChunkSize :: Integral a => a
@ -47,6 +50,9 @@ defCookieTimeoutSec = 1200
defCookieTimeout :: TimeSpec
defCookieTimeout = toTimeSpec defCookieTimeoutSec
defBlockWipTimeout :: TimeSpec
defBlockWipTimeout = toTimeSpec defCookieTimeoutSec
defBlockInfoTimeout :: Timeout 'Seconds
defBlockInfoTimeout = 2

View File

@ -44,7 +44,6 @@ blockSizeProto getBlockSize evHasBlock =
emit @e (BlockSizeEventKey h) (BlockSizeEvent (that, h, sz))
evHasBlock ( that, h, Just sz )
newtype instance SessionKey e (BlockInfo e) =
BlockSizeKey (Hash HbSync)
deriving stock (Typeable,Eq,Show)

View File

@ -12,7 +12,7 @@ module HBS2.System.Logger.Simple
, notice
, info
, setLogging
, asIs
, defLog
, loggerTr
, module HBS2.System.Logger.Simple.Class
) where
@ -39,8 +39,8 @@ data LoggerEntry =
makeLenses 'LoggerEntry
asIs :: a -> a
asIs = id
defLog :: a -> a
defLog = id
{-# OPTIONS_GHC -fno-cse #-}
{-# NOINLINE loggers #-}

View File

@ -21,18 +21,22 @@ import HBS2.System.Logger.Simple
import PeerInfo
import Data.Foldable hiding (find)
import Control.Concurrent.Async
import Control.Concurrent.STM
import Control.Monad.Reader
import Control.Monad.Trans.Maybe
import Data.ByteString.Lazy (ByteString)
import Data.Cache (Cache)
import Data.Cache qualified as Cache
import Data.Foldable hiding (find)
import Data.HashMap.Strict (HashMap)
import Data.HashMap.Strict qualified as HashMap
import Data.IntMap (IntMap)
import Data.IntMap qualified as IntMap
import Data.IntSet qualified as IntSet
import Data.Maybe
import Data.Set qualified as Set
import Data.Set (Set)
import Lens.Micro.Platform
import Prettyprinter
import System.Random.Shuffle
@ -50,7 +54,6 @@ calcBursts bu pieces = go seed
go [x] = [x]
go [] = []
data BlockDownload =
BlockDownload
{ _sBlockHash :: Hash HbSync
@ -74,11 +77,16 @@ newtype instance SessionKey e (BlockChunks e) =
deriving stock (Generic,Typeable)
-- data MyBlkInfo e =
-- MyBlkInfo (Peer e) Integer
-- deriving stock (Eq,Ord)
data DownloadEnv e =
DownloadEnv
{ _downloadQ :: TQueue (Hash HbSync)
, _peerBusy :: TVar (HashMap (Peer e) ())
, _blockPeers :: TVar (HashMap (Hash HbSync) (HashMap (Peer e) Integer) )
, _blockWip :: Cache (Hash HbSync) ()
}
makeLenses 'DownloadEnv
@ -90,6 +98,8 @@ newDownloadEnv :: (MonadIO m, MyPeer e) => m (DownloadEnv e)
newDownloadEnv = liftIO do
DownloadEnv <$> newTQueueIO
<*> newTVarIO mempty
<*> newTVarIO mempty
<*> Cache.newCache (Just defBlockWipTimeout)
newtype BlockDownloadM e m a =
BlockDownloadM { fromBlockDownloadM :: ReaderT (DownloadEnv e) m a }
@ -110,9 +120,16 @@ withDownload e m = runReaderT ( fromBlockDownloadM m ) e
addDownload :: MonadIO m => Hash HbSync -> BlockDownloadM e m ()
addDownload h = do
q <- asks (view downloadQ)
liftIO $ atomically $ writeTQueue q h
-- debug $ "addDownload" <+> pretty h
-- pause ( 0.25 :: Timeout 'Seconds )
wip <- asks (view blockWip)
liftIO do
atomically $ writeTQueue q h
Cache.insert wip h ()
removeFromWip :: MonadIO m => Hash HbSync -> BlockDownloadM e m ()
removeFromWip h = do
wip <- asks (view blockWip)
liftIO $ Cache.delete wip h
withFreePeer :: (MyPeer e, MonadIO m)
=> Peer e
@ -134,11 +151,46 @@ withFreePeer p n m = do
liftIO $ atomically $ modifyTVar busy $ HashMap.delete p
pure r
-- NOTE: dangerous! if called in
-- wrong place/wrong time,
-- if may cause a drastical
-- download speed degradation
dismissPeer :: (MyPeer e, MonadIO m)
=> Peer e
-> BlockDownloadM e m ()
dismissPeer p = do
busy <- asks (view peerBusy)
liftIO $ atomically $ modifyTVar busy $ HashMap.delete p
getBlockForDownload :: MonadIO m => BlockDownloadM e m (Hash HbSync)
getBlockForDownload = do
q <- asks (view downloadQ)
liftIO $ atomically $ readTQueue q
addBlockInfo :: (MonadIO m, MyPeer e)
=> Peer e
-> Hash HbSync
-> Integer
-> BlockDownloadM e m ()
addBlockInfo pip h size = do
-- debug $ "addBlockInfo" <+> pretty h <+> pretty pip <+> pretty size
tv <- asks (view blockPeers)
let mySize = HashMap.singleton pip size
liftIO $ atomically
$ modifyTVar tv (HashMap.insertWith (<>) h mySize)
getPeersForBlock :: (MonadIO m, MyPeer e)
=> Hash HbSync
-> BlockDownloadM e m [(Peer e, Integer)]
getPeersForBlock h = do
tv <- asks (view blockPeers)
liftIO $ readTVarIO tv <&> foldMap HashMap.toList
. maybeToList
. HashMap.lookup h
processBlock :: forall e m . ( MonadIO m
, HasStorage m
, Block ByteString ~ ByteString
@ -152,6 +204,10 @@ processBlock h = do
bt <- liftIO $ getBlock sto h <&> fmap (tryDetect h)
-- FIXME: если блок нашёлся, то удаляем его из wip
when (isJust bt) (removeFromWip h)
case bt of
Nothing -> addDownload h
@ -169,14 +225,28 @@ processBlock h = do
if here then do
debug $ "block" <+> pretty blk <+> "is already here"
processBlock blk -- NOTE: хуже не стало
-- FIXME: processBlock h
-- может быть, в этом причина того,
-- что мы периодически не докачиваем?
--
-- может быть, нужно рекурсировать, что бы
-- посмотреть, что это за блок и что у нас
-- из него есть?
pure () -- we don't need to recurse, cause walkMerkle is recursing for us
else
else do
addDownload blk
Just (Blob{}) -> do
pure ()
-- NOTE: if peer does not have a block, it may
-- cause to an unpleasant timeouts
-- So make sure that this peer really answered to
-- GetBlockSize request
downloadFromWithPeer :: forall e m . ( MyPeer e
, MonadIO m
@ -193,33 +263,15 @@ downloadFromWithPeer :: forall e m . ( MyPeer e
, HasStorage m
)
=> Peer e
-> Integer
-> Hash HbSync
-> BlockDownloadM e m ()
downloadFromWithPeer peer h = do
downloadFromWithPeer peer thisBkSize h = do
npi <- newPeerInfo
pinfo <- lift $ fetch True npi (PeerInfoKey peer) id
waitSize <- liftIO $ newTBQueueIO 1
lift $ do
subscribe @e (BlockSizeEventKey h) $ \(BlockSizeEvent (p1,hx,s)) -> do
when ( p1 == peer ) $ do
liftIO $ atomically $ writeTBQueue waitSize s
request @e peer (GetBlockSize @e h)
esize <- liftIO $ race ( pause defBlockInfoTimeout ) do -- FIXME: block size wait time
atomically $ readTBQueue waitSize
let mbSize = either (const Nothing) Just esize
sto <- lift $ getStorage
case mbSize of
Nothing -> void $ addDownload h
Just thisBkSize -> do
sto <- lift getStorage
coo <- genCookie (peer,h)
let key = DownloadSessionKey (peer, coo)
@ -267,17 +319,21 @@ downloadFromWithPeer peer h = do
$ readTQueue chuQ )
)
when (null catched) $ do
if not (null catched) then do
liftIO $ atomically $ modifyTVar (view peerDownloaded pinfo) (+chunksN)
else do
-- nerfing peer burst size.
-- FIXME: we need a thread that will be reset them again
liftIO $ atomically $ modifyTVar (view peerErrors pinfo) succ
updatePeerInfo pinfo
newBurst <- liftIO $ atomically
$ stateTVar burstSizeT $ \c -> let v = max 1 (c `div` 2)
in (v,v)
newBurst <- liftIO $ readTVarIO burstSizeT
liftIO $ atomically $ modifyTVar (view peerDownloaded pinfo) (+chunksN)
let chuchu = calcBursts newBurst [ i + n | n <- [0 .. chunksN] ]
liftIO $ atomically $ modifyTVar (view peerErrors pinfo) succ
debug $ "new burst: " <+> pretty newBurst
debug $ "missed chunks for request" <+> pretty (i,chunksN)
@ -323,6 +379,43 @@ downloadFromWithPeer peer h = do
instance HasPeerLocator e m => HasPeerLocator e (BlockDownloadM e m) where
getPeerLocator = lift getPeerLocator
updatePeerInfo :: MonadIO m => PeerInfo e -> m ()
updatePeerInfo pinfo = do
t1 <- liftIO $ getTime MonotonicCoarse
void $ liftIO $ atomically $ do
bu <- readTVar (view peerBurst pinfo)
errs <- readTVar (view peerErrors pinfo)
errsLast <- readTVar (view peerErrorsLast pinfo)
t0 <- readTVar (view peerLastWatched pinfo)
down <- readTVar (view peerDownloaded pinfo)
downLast <- readTVar (view peerDownloadedLast pinfo)
let dE = realToFrac $ max 0 (errs - errsLast)
let dT = realToFrac (max 1 (toNanoSecs t1 - toNanoSecs t0)) / 1e9
let eps = floor (dE / dT)
let bu1 = if down - downLast > 0 then
max 1 $ min defBurstMax
$ ceiling
$ if eps == 0 then
realToFrac bu * 1.05 -- FIXME: to defaults
else
realToFrac bu * 0.65
else
max defBurst $ ceiling (realToFrac bu * 0.65)
writeTVar (view peerErrorsLast pinfo) errs
writeTVar (view peerLastWatched pinfo) t1
writeTVar (view peerErrorsPerSec pinfo) eps
writeTVar (view peerBurst pinfo) bu1
writeTVar (view peerDownloadedLast pinfo) down
blockDownloadLoop :: forall e m . ( m ~ PeerM e IO
, MonadIO m
, Request e (BlockInfo e) m
@ -354,6 +447,18 @@ blockDownloadLoop env0 = do
pl <- getPeerLocator @e
void $ liftIO $ async $ forever $ withPeerM e do
pause @'Seconds 0.5
pee <- knownPeers @e pl
npi <- newPeerInfo
for_ pee $ \p -> do
pinfo <- fetch True npi (PeerInfoKey p) id
updatePeerInfo pinfo
-- TODO: peer info loop
void $ liftIO $ async $ forever $ withPeerM e $ do
pause @'Seconds 20
@ -366,16 +471,39 @@ blockDownloadLoop env0 = do
for_ pee $ \p -> do
pinfo <- fetch True npi (PeerInfoKey p) id
burst <- liftIO $ readTVarIO (view peerBurst pinfo)
errors <- liftIO $ readTVarIO (view peerErrorsPerSec pinfo)
debug $ "peer" <+> pretty p <+> "burst: " <+> pretty burst
<+> "errors:" <+> pretty errors
pure ()
void $ liftIO $ async $ forever $ withPeerM e $ withDownload env0 do
pause @'Seconds 5 -- FIXME: put to defaults
-- we need to show download stats
tinfo <- asks (view blockPeers)
binfo <- liftIO $ readTVarIO tinfo
wip <- asks (view blockWip)
liftIO $ Cache.purgeExpired wip
aliveWip <- Set.fromList <$> liftIO (Cache.keys wip)
let alive = HashMap.fromList [ (h,i)
| (h,i) <- HashMap.toList binfo
, Set.member h aliveWip
]
liftIO $ atomically $ writeTVar tinfo alive
debug $ "maintain blocks wip" <+> pretty (Set.size aliveWip)
withDownload env0 do
env <- ask
let again h = do
debug $ "block fucked: " <+> pretty h
withPeerM e $ withDownload env (processBlock h)
-- debug $ "retrying block: " <+> pretty h
withPeerM e $ withDownload env (addDownload h)
mapM_ processBlock blks
@ -387,13 +515,46 @@ blockDownloadLoop env0 = do
unless here do
void $ runMaybeT $ do
p <- MaybeT $ knownPeers @e pl >>= liftIO . shuffleM <&> headMay
peers <- getPeersForBlock h
liftIO $ race ( pause defBlockWaitMax >> again h ) do
withPeerM e $ withDownload env $ do -- NOTE: really crazy shit
withFreePeer p (processBlock h >> pause (0.1 :: Timeout 'Seconds)) do
downloadFromWithPeer p h
when (null peers) $ do
lift do -- in PeerM
subscribe (BlockSizeEventKey h) $ \(BlockSizeEvent (p1,hx,s)) -> do
withDownload env (addBlockInfo p1 hx s)
pips <- knownPeers @e pl
for_ pips $ \pip -> request pip (GetBlockSize @e h)
p <- knownPeers @e pl >>= liftIO . shuffleM
-- FIXME: нам не повезло с пиром => сидим ждём defBlockWaitMax и скачивание
-- простаивает.
--
-- Нужно: сначала запросить всех у кого есть блок.
-- Потом выбрать победителей и попытаться скачать
-- у них, запомнив размер в кэше.
--
-- Когда находим блоки -- то сразу же асинхронно запрашиваем
-- размеры, что бы по приходу сюда они уже были
-- debug $ "known peers" <+> pretty p
-- debug $ "peers/blocks" <+> pretty peers
p0 <- headMay <$> liftIO (shuffleM peers) -- FIXME: random choice to work faster
let withAllShit f = withPeerM e $ withDownload env f
maybe1 p0 (again h) $ \(p1,size) -> do
withFreePeer p1 (again h) $
liftIO do
re <- race ( pause defBlockWaitMax ) $
withAllShit $ downloadFromWithPeer p1 size h
case re of
Left{} -> withAllShit (again h)
Right{} -> withAllShit (processBlock h)
next

View File

@ -11,9 +11,15 @@ import Lens.Micro.Platform
import Control.Concurrent.STM.TVar
newtype PeerInfo e =
data PeerInfo e =
PeerInfo
{ _peerBurst :: TVar Int
, _peerErrors :: TVar Int
, _peerErrorsLast :: TVar Int
, _peerErrorsPerSec :: TVar Int
, _peerLastWatched :: TVar TimeSpec
, _peerDownloaded :: TVar Int
, _peerDownloadedLast :: TVar Int
}
deriving stock (Generic,Typeable)
@ -23,7 +29,12 @@ makeLenses 'PeerInfo
newPeerInfo :: MonadIO m => m (PeerInfo e)
newPeerInfo = liftIO do
PeerInfo <$> newTVarIO defBurst
<*> newTVarIO 0
<*> newTVarIO 0
<*> newTVarIO 0
<*> newTVarIO 0
<*> newTVarIO 0
<*> newTVarIO 0
type instance SessionData e (PeerInfo e) = PeerInfo e

View File

@ -65,6 +65,7 @@ data RPCCommand =
| ANNOUNCE (Hash HbSync)
| PING (PeerAddr UDP)
| CHECK PeerNonce (PeerAddr UDP) (Hash HbSync)
| FETCH (Hash HbSync)
data PeerOpts =
PeerOpts
@ -86,10 +87,10 @@ main = do
sodiumInit
setLogging @DEBUG (set loggerTr ("[debug] " <>))
setLogging @INFO asIs
setLogging @ERROR asIs
setLogging @WARN asIs
setLogging @NOTICE asIs
setLogging @INFO defLog
setLogging @ERROR defLog
setLogging @WARN defLog
setLogging @NOTICE defLog
withSimpleLogger runCLI
@ -106,6 +107,7 @@ runCLI = join . customExecParser (prefs showHelpOnError) $
<> command "poke" (info pPoke (progDesc "poke peer by rpc"))
<> command "announce" (info pAnnounce (progDesc "announce block"))
<> command "ping" (info pPing (progDesc "ping another peer"))
<> command "fetch" (info pFetch (progDesc "fetch block"))
)
common = do
@ -145,6 +147,11 @@ runCLI = join . customExecParser (prefs showHelpOnError) $
h <- strArgument ( metavar "HASH" )
pure $ runRpcCommand rpc (ANNOUNCE h)
pFetch = do
rpc <- pRpcCommon
h <- strArgument ( metavar "HASH" )
pure $ runRpcCommand rpc (FETCH h)
pPing = do
rpc <- pRpcCommon
h <- strArgument ( metavar "ADDR" )
@ -289,6 +296,12 @@ runPeer opts = Exception.handle myException $ do
debug $ "Got authorized peer!" <+> pretty p
<+> pretty (AsBase58 (view peerSignKey d))
void $ liftIO $ async $ withPeerM env do
pause @'Seconds 1
debug "sending first peer announce"
request localMulticast (PeerAnnounce @UDP pnonce)
void $ liftIO $ async $ withPeerM env $ forever $ do
pause defPeerAnnounceTime -- FIXME: setting!
debug "sending local peer announce"
@ -339,6 +352,8 @@ runPeer opts = Exception.handle myException $ do
withDownload denv $ do
processBlock h
_ -> pure ()
me <- liftIO $ async $ withPeerM env $ do
runProto @UDP
@ -359,10 +374,16 @@ runPeer opts = Exception.handle myException $ do
let pingAction pa = do
liftIO $ atomically $ writeTQueue rpcQ (PING pa)
let fetchAction h = do
debug $ "fetchAction" <+> pretty h
liftIO $ withPeerM penv
$ withDownload denv (processBlock h)
let arpc = RpcAdapter pokeAction
dontHandle
annAction
pingAction
fetchAction
rpc <- async $ runRPC udp1 do
runProto @UDP
@ -408,12 +429,6 @@ emitToPeer env k e = liftIO $ withPeerM env (emit k e)
withRPC :: String -> RPC UDP -> IO ()
withRPC saddr cmd = withSimpleLogger do
setLogging @DEBUG asIs
setLogging @INFO asIs
setLogging @ERROR asIs
setLogging @WARN asIs
setLogging @NOTICE asIs
as <- parseAddr (fromString saddr) <&> fmap (PeerUDP . addrAddress)
let rpc' = headMay $ L.sortBy (compare `on` addrPriority) as
@ -437,6 +452,8 @@ withRPC saddr cmd = withSimpleLogger do
RPCPing{} -> pause @'Seconds 0.1 >> liftIO exitSuccess
RPCFetch{} -> pause @'Seconds 0.1 >> liftIO exitSuccess
_ -> pure ()
void $ liftIO $ waitAnyCatchCancel [proto]
@ -445,15 +462,17 @@ withRPC saddr cmd = withSimpleLogger do
where
adapter = RpcAdapter dontHandle
(const $ debug "alive-and-kicking" >> liftIO exitSuccess)
(const $ notice "alive-and-kicking" >> liftIO exitSuccess)
(const $ liftIO exitSuccess)
(const $ debug "wat?")
dontHandle
runRpcCommand :: String -> RPCCommand -> IO ()
runRpcCommand saddr = \case
POKE -> withRPC saddr (RPCPoke @UDP)
PING s -> withRPC saddr (RPCPing s)
ANNOUNCE h -> withRPC saddr (RPCAnnounce @UDP h)
FETCH h -> withRPC saddr (RPCFetch @UDP h)
_ -> pure ()

View File

@ -18,6 +18,7 @@ data RPC e =
| RPCPing (PeerAddr e)
| RPCPokeAnswer
| RPCAnnounce (Hash HbSync)
| RPCFetch (Hash HbSync)
deriving stock (Generic)
@ -44,6 +45,7 @@ data RpcAdapter e m =
, rpcOnPokeAnswer :: RPC e -> m ()
, rpcOnAnnounce :: Hash HbSync -> m ()
, rpcOnPing :: PeerAddr e -> m ()
, rpcOnFetch :: Hash HbSync -> m ()
}
newtype RpcM m a = RpcM { fromRpcM :: ReaderT RPCEnv m a }
@ -85,4 +87,5 @@ rpcHandler adapter = \case
p@RPCPokeAnswer{} -> rpcOnPokeAnswer adapter p
(RPCAnnounce h) -> rpcOnAnnounce adapter h
(RPCPing pa) -> rpcOnPing adapter pa
(RPCFetch h) -> rpcOnFetch adapter h