mirror of https://github.com/voidlizard/hbs2
better block postpone algorithm
This commit is contained in:
parent
393657ef7c
commit
82b50a193a
|
@ -53,6 +53,12 @@ defRequestLimit = toTimeSpec defRequestLimitSec
|
||||||
defRequestLimitSec :: Timeout 'Seconds
|
defRequestLimitSec :: Timeout 'Seconds
|
||||||
defRequestLimitSec = 60
|
defRequestLimitSec = 60
|
||||||
|
|
||||||
|
defBlockBanTime :: TimeSpec
|
||||||
|
defBlockBanTime = toTimeSpec defBlockBanTimeSec
|
||||||
|
|
||||||
|
defBlockBanTimeSec :: Timeout 'Seconds
|
||||||
|
defBlockBanTimeSec = 30 :: Timeout 'Seconds
|
||||||
|
|
||||||
defBlockWipTimeout :: TimeSpec
|
defBlockWipTimeout :: TimeSpec
|
||||||
defBlockWipTimeout = toTimeSpec defCookieTimeoutSec
|
defBlockWipTimeout = toTimeSpec defCookieTimeoutSec
|
||||||
|
|
||||||
|
|
|
@ -22,7 +22,6 @@ import HBS2.System.Logger.Simple
|
||||||
|
|
||||||
import PeerTypes
|
import PeerTypes
|
||||||
import PeerInfo
|
import PeerInfo
|
||||||
import PokePostponed
|
|
||||||
|
|
||||||
import Control.Concurrent.Async
|
import Control.Concurrent.Async
|
||||||
import Control.Concurrent.STM
|
import Control.Concurrent.STM
|
||||||
|
@ -478,8 +477,6 @@ blockDownloadLoop env0 = do
|
||||||
pinfo <- fetch True npi (PeerInfoKey p) id
|
pinfo <- fetch True npi (PeerInfoKey p) id
|
||||||
updatePeerInfo False pinfo
|
updatePeerInfo False pinfo
|
||||||
|
|
||||||
void $ liftIO $ async $ withPeerM e $ withDownload env0 (pokePostponed e)
|
|
||||||
|
|
||||||
-- TODO: peer info loop
|
-- TODO: peer info loop
|
||||||
void $ liftIO $ async $ forever $ withPeerM e $ do
|
void $ liftIO $ async $ forever $ withPeerM e $ do
|
||||||
pause @'Seconds 10
|
pause @'Seconds 10
|
||||||
|
@ -543,75 +540,105 @@ peerDownloadLoop :: forall e m . ( MyPeer e
|
||||||
, DownloadFromPeerStuff e m
|
, DownloadFromPeerStuff e m
|
||||||
, m ~ PeerM e IO
|
, m ~ PeerM e IO
|
||||||
) => Peer e -> BlockDownloadM e m ()
|
) => Peer e -> BlockDownloadM e m ()
|
||||||
peerDownloadLoop peer = forever do
|
peerDownloadLoop peer = do
|
||||||
|
|
||||||
sto <- lift getStorage
|
bannedBlocks <- liftIO $ Cache.newCache (Just defBlockBanTime)
|
||||||
|
seenBlocks <- liftIO $ newTVarIO mempty
|
||||||
|
|
||||||
auth <- lift $ find (KnownPeerKey peer) id <&> isJust
|
pe <- lift ask
|
||||||
pinfo' <- lift $ find (PeerInfoKey peer) id -- (view peerDownloadFail)
|
e <- ask
|
||||||
|
|
||||||
maybe1 pinfo' none $ \pinfo -> do
|
let withAllStuff m = withPeerM pe $ withDownload e m
|
||||||
|
|
||||||
let downFail = view peerDownloadFail pinfo
|
forever do
|
||||||
let downBlk = view peerDownloadedBlk pinfo
|
|
||||||
failNum <- liftIO $ readTVarIO downFail
|
|
||||||
|
|
||||||
-- FIXME: failNum-to-defaults
|
sto <- lift getStorage
|
||||||
let notFailed = failNum < defDownloadFails
|
|
||||||
|
|
||||||
-- FIXME: better-avoiding-busyloop
|
auth <- lift $ find (KnownPeerKey peer) id <&> isJust
|
||||||
-- unless notFailed do
|
pinfo' <- lift $ find (PeerInfoKey peer) id -- (view peerDownloadFail)
|
||||||
-- pause @'Seconds 1
|
|
||||||
|
|
||||||
when (failNum > 5) do
|
maybe1 pinfo' none $ \pinfo -> do
|
||||||
pause @'Seconds defBlockWaitMax
|
|
||||||
|
|
||||||
when auth do
|
let downFail = view peerDownloadFail pinfo
|
||||||
|
let downBlk = view peerDownloadedBlk pinfo
|
||||||
|
failNum <- liftIO $ readTVarIO downFail
|
||||||
|
|
||||||
withBlockForDownload $ \h -> do
|
-- FIXME: failNum-to-defaults
|
||||||
e <- lift ask
|
let notFailed = failNum < defDownloadFails
|
||||||
ee <- ask
|
|
||||||
|
-- FIXME: better-avoiding-busyloop
|
||||||
|
-- unless notFailed do
|
||||||
|
-- pause @'Seconds 1
|
||||||
|
|
||||||
|
when (failNum > 5) do
|
||||||
|
pause @'Seconds defBlockWaitMax
|
||||||
|
|
||||||
|
when auth do
|
||||||
|
|
||||||
|
withBlockForDownload $ \h -> do
|
||||||
|
e <- lift ask
|
||||||
|
ee <- ask
|
||||||
|
|
||||||
|
st <- getBlockState h
|
||||||
|
|
||||||
|
let alterSeen = \case
|
||||||
|
Just x -> Just (succ x)
|
||||||
|
Nothing -> Just 1
|
||||||
|
|
||||||
|
|
||||||
st <- getBlockState h
|
banned <- liftIO $ Cache.lookup bannedBlocks h <&> isJust
|
||||||
setBlockState h (set bsState Downloading st)
|
|
||||||
|
|
||||||
r1 <- liftIO $ race ( pause defBlockInfoTimeout ) $ withPeerM e do
|
if banned then do
|
||||||
blksq <- liftIO newTQueueIO
|
let seenTotal = view bsTimes st
|
||||||
subscribe @e (BlockSizeEventKey h) $ \(BlockSizeEvent (_,_,s)) -> do
|
let wa = min defBlockBanTimeSec (realToFrac (ceiling $ Prelude.logBase 10 (realToFrac (50 * seenTotal))))
|
||||||
liftIO $ atomically $ writeTQueue blksq s
|
void $ liftIO $ async $ withAllStuff (pause wa >> addDownload h)
|
||||||
|
debug $ "block" <+> pretty h <+> "seen" <+> pretty seenTotal <+> "times" <+> parens (pretty wa)
|
||||||
|
else do
|
||||||
|
|
||||||
request peer (GetBlockSize @e h)
|
liftIO $ atomically $ modifyTVar seenBlocks (HashMap.alter alterSeen h)
|
||||||
|
|
||||||
liftIO $ atomically $ readTQueue blksq
|
seenTimes <- liftIO $ readTVarIO seenBlocks <&> fromMaybe 0 . HashMap.lookup h
|
||||||
|
|
||||||
case r1 of
|
when ( seenTimes > 1 ) do
|
||||||
Left{} -> do
|
debug $ "ban block" <+> pretty h <+> "for a while" <+> parens (pretty seenTimes)
|
||||||
liftIO $ atomically $ modifyTVar downFail succ
|
liftIO $ atomically $ modifyTVar seenBlocks (HashMap.delete h)
|
||||||
addDownload h
|
liftIO $ Cache.insert bannedBlocks h ()
|
||||||
|
|
||||||
Right size -> do
|
setBlockState h (set bsState Downloading st)
|
||||||
r2 <- liftIO $ race ( pause defBlockWaitMax )
|
|
||||||
$ withPeerM e
|
|
||||||
$ withDownload ee
|
|
||||||
$ downloadFromWithPeer peer size h
|
|
||||||
|
|
||||||
case r2 of
|
r1 <- liftIO $ race ( pause defBlockInfoTimeout ) $ withPeerM e do
|
||||||
|
blksq <- liftIO newTQueueIO
|
||||||
|
subscribe @e (BlockSizeEventKey h) $ \(BlockSizeEvent (_,_,s)) -> do
|
||||||
|
liftIO $ atomically $ writeTQueue blksq s
|
||||||
|
|
||||||
|
request peer (GetBlockSize @e h)
|
||||||
|
|
||||||
|
liftIO $ atomically $ readTQueue blksq
|
||||||
|
|
||||||
|
case r1 of
|
||||||
Left{} -> do
|
Left{} -> do
|
||||||
liftIO $ atomically $ modifyTVar downFail succ
|
liftIO $ atomically $ modifyTVar downFail succ
|
||||||
addDownload h
|
addDownload h
|
||||||
|
|
||||||
-- Right Nothing -> do
|
Right size -> do
|
||||||
-- liftIO $ atomically $ modifyTVar downFail succ
|
r2 <- liftIO $ race ( pause defBlockWaitMax )
|
||||||
-- addDownload h
|
$ withPeerM e
|
||||||
|
$ withDownload ee
|
||||||
|
$ downloadFromWithPeer peer size h
|
||||||
|
|
||||||
Right{} -> do
|
case r2 of
|
||||||
processBlock h
|
Left{} -> do
|
||||||
liftIO $ atomically do
|
liftIO $ atomically $ modifyTVar downFail succ
|
||||||
writeTVar downFail 0
|
addDownload h
|
||||||
modifyTVar downBlk succ
|
-- FIXME: remove-block-seen-times-hardcode
|
||||||
|
|
||||||
pure ()
|
Right{} -> do
|
||||||
|
processBlock h
|
||||||
|
liftIO $ atomically do
|
||||||
|
writeTVar downFail 0
|
||||||
|
modifyTVar downBlk succ
|
||||||
|
|
||||||
|
pure ()
|
||||||
|
|
||||||
-- NOTE: this is an adapter for a ResponseM monad
|
-- NOTE: this is an adapter for a ResponseM monad
|
||||||
-- because response is working in ResponseM monad (ha!)
|
-- because response is working in ResponseM monad (ha!)
|
||||||
|
|
|
@ -128,6 +128,8 @@ peerPingLoop = do
|
||||||
|
|
||||||
forever do
|
forever do
|
||||||
|
|
||||||
|
pause @'Seconds 1
|
||||||
|
|
||||||
-- FIXME: defaults
|
-- FIXME: defaults
|
||||||
r <- liftIO $ race (pause @'Seconds 60)
|
r <- liftIO $ race (pause @'Seconds 60)
|
||||||
(atomically $ readTQueue wake)
|
(atomically $ readTQueue wake)
|
||||||
|
@ -154,7 +156,7 @@ peerPingLoop = do
|
||||||
fnum <- liftIO $ readTVarIO pfails
|
fnum <- liftIO $ readTVarIO pfails
|
||||||
fdown <- liftIO $ readTVarIO pdownfails
|
fdown <- liftIO $ readTVarIO pdownfails
|
||||||
|
|
||||||
when (fnum > 4) do -- FIXME: hardcode!
|
when (fnum > 2) do -- FIXME: hardcode!
|
||||||
warn $ "removing peer" <+> pretty p <+> "for not responding to our pings"
|
warn $ "removing peer" <+> pretty p <+> "for not responding to our pings"
|
||||||
delPeers pl [p]
|
delPeers pl [p]
|
||||||
expire (PeerInfoKey p)
|
expire (PeerInfoKey p)
|
||||||
|
|
|
@ -435,6 +435,10 @@ runPeer opts = Exception.handle myException $ do
|
||||||
let pd = Map.fromList $ catMaybes pd'
|
let pd = Map.fromList $ catMaybes pd'
|
||||||
|
|
||||||
case Map.lookup thatNonce pd of
|
case Map.lookup thatNonce pd of
|
||||||
|
|
||||||
|
-- TODO: prefer-local-peer-with-same-nonce-over-remote-peer
|
||||||
|
-- remove remote peer
|
||||||
|
-- add local peer
|
||||||
Just p0 | p0 /= p -> debug "Same peer, different address"
|
Just p0 | p0 /= p -> debug "Same peer, different address"
|
||||||
_ -> do
|
_ -> do
|
||||||
|
|
||||||
|
|
|
@ -1,85 +0,0 @@
|
||||||
module PokePostponed where
|
|
||||||
|
|
||||||
import HBS2.Prelude.Plated
|
|
||||||
import HBS2.Clock
|
|
||||||
import HBS2.Actors.Peer
|
|
||||||
import HBS2.Net.Proto.Peer
|
|
||||||
import HBS2.Events
|
|
||||||
|
|
||||||
import HBS2.System.Logger.Simple
|
|
||||||
|
|
||||||
import PeerTypes
|
|
||||||
|
|
||||||
import Data.Foldable (for_)
|
|
||||||
import Control.Concurrent.Async
|
|
||||||
import Control.Concurrent.STM
|
|
||||||
import Control.Monad.Reader
|
|
||||||
import Data.Cache qualified as Cache
|
|
||||||
import Lens.Micro.Platform
|
|
||||||
import Numeric ( showGFloat )
|
|
||||||
import Prettyprinter
|
|
||||||
|
|
||||||
pokePostponed :: forall e m . ( MonadIO m
|
|
||||||
, EventListener e (PeerHandshake e) m
|
|
||||||
, MyPeer e
|
|
||||||
)
|
|
||||||
=> PeerEnv e
|
|
||||||
-> BlockDownloadM e m ()
|
|
||||||
|
|
||||||
pokePostponed penv = do
|
|
||||||
|
|
||||||
env <- ask
|
|
||||||
|
|
||||||
waitQ <- liftIO $ newTBQueueIO 1
|
|
||||||
|
|
||||||
busy <- liftIO $ newTVarIO False
|
|
||||||
|
|
||||||
cache <- asks (view blockPostponed)
|
|
||||||
|
|
||||||
lift $ subscribe @e AnyKnownPeerEventKey $ \(KnownPeerEvent{}) -> do
|
|
||||||
cant <- liftIO $ readTVarIO busy
|
|
||||||
unless cant $ do
|
|
||||||
debug "AnyKnownPeerEventKey"
|
|
||||||
mt <- liftIO $ atomically $ isEmptyTBQueue waitQ
|
|
||||||
when mt do
|
|
||||||
liftIO $ atomically $ writeTBQueue waitQ ()
|
|
||||||
|
|
||||||
forever do
|
|
||||||
-- FIXME: to defaults!
|
|
||||||
r <- liftIO $ race ( pause @'Seconds 60 ) ( atomically $ readTBQueue waitQ )
|
|
||||||
|
|
||||||
void $ liftIO $ atomically $ flushTBQueue waitQ
|
|
||||||
|
|
||||||
liftIO $ atomically $ writeTVar busy True
|
|
||||||
|
|
||||||
void $ liftIO $ async $ do
|
|
||||||
pause @'Seconds 30
|
|
||||||
atomically $ writeTVar busy False
|
|
||||||
|
|
||||||
let allBack = either (const False) (const True) r
|
|
||||||
|
|
||||||
blks <- liftIO $ Cache.toList cache
|
|
||||||
|
|
||||||
w <- calcWaitTime
|
|
||||||
|
|
||||||
debug $ "tossPostponed" <+> pretty (showGFloat (Just 2) w "")
|
|
||||||
<+> pretty (length blks)
|
|
||||||
|
|
||||||
for_ blks $ \case
|
|
||||||
(k,_,Nothing) | not allBack -> pure ()
|
|
||||||
| otherwise -> pushBack cache k
|
|
||||||
(k,_,Just{}) -> pushBack cache k
|
|
||||||
|
|
||||||
where
|
|
||||||
pushBack cache k = do
|
|
||||||
w <- calcWaitTime
|
|
||||||
liftIO $ Cache.delete cache k
|
|
||||||
st <- getBlockState k
|
|
||||||
t0 <- liftIO $ getTime MonotonicCoarse
|
|
||||||
setBlockState k ( set bsStart t0
|
|
||||||
. set bsState Initial
|
|
||||||
. set bsWipTo w
|
|
||||||
$ st
|
|
||||||
)
|
|
||||||
debug $ "returning block to downloads" <+> pretty k
|
|
||||||
addDownload k
|
|
|
@ -107,7 +107,6 @@ executable hbs2-peer
|
||||||
other-modules: BlockDownload
|
other-modules: BlockDownload
|
||||||
, Bootstrap
|
, Bootstrap
|
||||||
, PeerInfo
|
, PeerInfo
|
||||||
, PokePostponed
|
|
||||||
, RPC
|
, RPC
|
||||||
, PeerTypes
|
, PeerTypes
|
||||||
, PeerConfig
|
, PeerConfig
|
||||||
|
|
Loading…
Reference in New Issue