diff --git a/hbs2-core/lib/HBS2/Defaults.hs b/hbs2-core/lib/HBS2/Defaults.hs index 077f3ab3..5f8e1fa3 100644 --- a/hbs2-core/lib/HBS2/Defaults.hs +++ b/hbs2-core/lib/HBS2/Defaults.hs @@ -53,6 +53,12 @@ defRequestLimit = toTimeSpec defRequestLimitSec defRequestLimitSec :: Timeout 'Seconds defRequestLimitSec = 60 +defBlockBanTime :: TimeSpec +defBlockBanTime = toTimeSpec defBlockBanTimeSec + +defBlockBanTimeSec :: Timeout 'Seconds +defBlockBanTimeSec = 30 :: Timeout 'Seconds + defBlockWipTimeout :: TimeSpec defBlockWipTimeout = toTimeSpec defCookieTimeoutSec diff --git a/hbs2-peer/app/BlockDownload.hs b/hbs2-peer/app/BlockDownload.hs index 8ae8e89c..6327db3b 100644 --- a/hbs2-peer/app/BlockDownload.hs +++ b/hbs2-peer/app/BlockDownload.hs @@ -22,7 +22,6 @@ import HBS2.System.Logger.Simple import PeerTypes import PeerInfo -import PokePostponed import Control.Concurrent.Async import Control.Concurrent.STM @@ -478,8 +477,6 @@ blockDownloadLoop env0 = do pinfo <- fetch True npi (PeerInfoKey p) id updatePeerInfo False pinfo - void $ liftIO $ async $ withPeerM e $ withDownload env0 (pokePostponed e) - -- TODO: peer info loop void $ liftIO $ async $ forever $ withPeerM e $ do pause @'Seconds 10 @@ -543,75 +540,105 @@ peerDownloadLoop :: forall e m . ( MyPeer e , DownloadFromPeerStuff e m , m ~ PeerM e IO ) => Peer e -> BlockDownloadM e m () -peerDownloadLoop peer = forever do +peerDownloadLoop peer = do - sto <- lift getStorage + bannedBlocks <- liftIO $ Cache.newCache (Just defBlockBanTime) + seenBlocks <- liftIO $ newTVarIO mempty - auth <- lift $ find (KnownPeerKey peer) id <&> isJust - pinfo' <- lift $ find (PeerInfoKey peer) id -- (view peerDownloadFail) + pe <- lift ask + e <- ask - maybe1 pinfo' none $ \pinfo -> do + let withAllStuff m = withPeerM pe $ withDownload e m - let downFail = view peerDownloadFail pinfo - let downBlk = view peerDownloadedBlk pinfo - failNum <- liftIO $ readTVarIO downFail + forever do - -- FIXME: failNum-to-defaults - let notFailed = failNum < defDownloadFails + sto <- lift getStorage - -- FIXME: better-avoiding-busyloop - -- unless notFailed do - -- pause @'Seconds 1 + auth <- lift $ find (KnownPeerKey peer) id <&> isJust + pinfo' <- lift $ find (PeerInfoKey peer) id -- (view peerDownloadFail) - when (failNum > 5) do - pause @'Seconds defBlockWaitMax + maybe1 pinfo' none $ \pinfo -> do - when auth do + let downFail = view peerDownloadFail pinfo + let downBlk = view peerDownloadedBlk pinfo + failNum <- liftIO $ readTVarIO downFail - withBlockForDownload $ \h -> do - e <- lift ask - ee <- ask + -- FIXME: failNum-to-defaults + let notFailed = failNum < defDownloadFails + + -- FIXME: better-avoiding-busyloop + -- unless notFailed do + -- pause @'Seconds 1 + + when (failNum > 5) do + pause @'Seconds defBlockWaitMax + + when auth do + + withBlockForDownload $ \h -> do + e <- lift ask + ee <- ask + + st <- getBlockState h + + let alterSeen = \case + Just x -> Just (succ x) + Nothing -> Just 1 - st <- getBlockState h - setBlockState h (set bsState Downloading st) + banned <- liftIO $ Cache.lookup bannedBlocks h <&> isJust - r1 <- liftIO $ race ( pause defBlockInfoTimeout ) $ withPeerM e do - blksq <- liftIO newTQueueIO - subscribe @e (BlockSizeEventKey h) $ \(BlockSizeEvent (_,_,s)) -> do - liftIO $ atomically $ writeTQueue blksq s + if banned then do + let seenTotal = view bsTimes st + let wa = min defBlockBanTimeSec (realToFrac (ceiling $ Prelude.logBase 10 (realToFrac (50 * seenTotal)))) + void $ liftIO $ async $ withAllStuff (pause wa >> addDownload h) + debug $ "block" <+> pretty h <+> "seen" <+> pretty seenTotal <+> "times" <+> parens (pretty wa) + else do - request peer (GetBlockSize @e h) + liftIO $ atomically $ modifyTVar seenBlocks (HashMap.alter alterSeen h) - liftIO $ atomically $ readTQueue blksq + seenTimes <- liftIO $ readTVarIO seenBlocks <&> fromMaybe 0 . HashMap.lookup h - case r1 of - Left{} -> do - liftIO $ atomically $ modifyTVar downFail succ - addDownload h + when ( seenTimes > 1 ) do + debug $ "ban block" <+> pretty h <+> "for a while" <+> parens (pretty seenTimes) + liftIO $ atomically $ modifyTVar seenBlocks (HashMap.delete h) + liftIO $ Cache.insert bannedBlocks h () - Right size -> do - r2 <- liftIO $ race ( pause defBlockWaitMax ) - $ withPeerM e - $ withDownload ee - $ downloadFromWithPeer peer size h + setBlockState h (set bsState Downloading st) - case r2 of + r1 <- liftIO $ race ( pause defBlockInfoTimeout ) $ withPeerM e do + blksq <- liftIO newTQueueIO + subscribe @e (BlockSizeEventKey h) $ \(BlockSizeEvent (_,_,s)) -> do + liftIO $ atomically $ writeTQueue blksq s + + request peer (GetBlockSize @e h) + + liftIO $ atomically $ readTQueue blksq + + case r1 of Left{} -> do liftIO $ atomically $ modifyTVar downFail succ addDownload h --- Right Nothing -> do --- liftIO $ atomically $ modifyTVar downFail succ --- addDownload h + Right size -> do + r2 <- liftIO $ race ( pause defBlockWaitMax ) + $ withPeerM e + $ withDownload ee + $ downloadFromWithPeer peer size h - Right{} -> do - processBlock h - liftIO $ atomically do - writeTVar downFail 0 - modifyTVar downBlk succ + case r2 of + Left{} -> do + liftIO $ atomically $ modifyTVar downFail succ + addDownload h + -- FIXME: remove-block-seen-times-hardcode - pure () + Right{} -> do + processBlock h + liftIO $ atomically do + writeTVar downFail 0 + modifyTVar downBlk succ + + pure () -- NOTE: this is an adapter for a ResponseM monad -- because response is working in ResponseM monad (ha!) diff --git a/hbs2-peer/app/PeerInfo.hs b/hbs2-peer/app/PeerInfo.hs index fb45aa20..dd909db1 100644 --- a/hbs2-peer/app/PeerInfo.hs +++ b/hbs2-peer/app/PeerInfo.hs @@ -128,6 +128,8 @@ peerPingLoop = do forever do + pause @'Seconds 1 + -- FIXME: defaults r <- liftIO $ race (pause @'Seconds 60) (atomically $ readTQueue wake) @@ -154,7 +156,7 @@ peerPingLoop = do fnum <- liftIO $ readTVarIO pfails fdown <- liftIO $ readTVarIO pdownfails - when (fnum > 4) do -- FIXME: hardcode! + when (fnum > 2) do -- FIXME: hardcode! warn $ "removing peer" <+> pretty p <+> "for not responding to our pings" delPeers pl [p] expire (PeerInfoKey p) diff --git a/hbs2-peer/app/PeerMain.hs b/hbs2-peer/app/PeerMain.hs index 445c98dc..a2587e9e 100644 --- a/hbs2-peer/app/PeerMain.hs +++ b/hbs2-peer/app/PeerMain.hs @@ -435,6 +435,10 @@ runPeer opts = Exception.handle myException $ do let pd = Map.fromList $ catMaybes pd' case Map.lookup thatNonce pd of + + -- TODO: prefer-local-peer-with-same-nonce-over-remote-peer + -- remove remote peer + -- add local peer Just p0 | p0 /= p -> debug "Same peer, different address" _ -> do diff --git a/hbs2-peer/app/PokePostponed.hs b/hbs2-peer/app/PokePostponed.hs deleted file mode 100644 index 945229ce..00000000 --- a/hbs2-peer/app/PokePostponed.hs +++ /dev/null @@ -1,85 +0,0 @@ -module PokePostponed where - -import HBS2.Prelude.Plated -import HBS2.Clock -import HBS2.Actors.Peer -import HBS2.Net.Proto.Peer -import HBS2.Events - -import HBS2.System.Logger.Simple - -import PeerTypes - -import Data.Foldable (for_) -import Control.Concurrent.Async -import Control.Concurrent.STM -import Control.Monad.Reader -import Data.Cache qualified as Cache -import Lens.Micro.Platform -import Numeric ( showGFloat ) -import Prettyprinter - -pokePostponed :: forall e m . ( MonadIO m - , EventListener e (PeerHandshake e) m - , MyPeer e - ) - => PeerEnv e - -> BlockDownloadM e m () - -pokePostponed penv = do - - env <- ask - - waitQ <- liftIO $ newTBQueueIO 1 - - busy <- liftIO $ newTVarIO False - - cache <- asks (view blockPostponed) - - lift $ subscribe @e AnyKnownPeerEventKey $ \(KnownPeerEvent{}) -> do - cant <- liftIO $ readTVarIO busy - unless cant $ do - debug "AnyKnownPeerEventKey" - mt <- liftIO $ atomically $ isEmptyTBQueue waitQ - when mt do - liftIO $ atomically $ writeTBQueue waitQ () - - forever do - -- FIXME: to defaults! - r <- liftIO $ race ( pause @'Seconds 60 ) ( atomically $ readTBQueue waitQ ) - - void $ liftIO $ atomically $ flushTBQueue waitQ - - liftIO $ atomically $ writeTVar busy True - - void $ liftIO $ async $ do - pause @'Seconds 30 - atomically $ writeTVar busy False - - let allBack = either (const False) (const True) r - - blks <- liftIO $ Cache.toList cache - - w <- calcWaitTime - - debug $ "tossPostponed" <+> pretty (showGFloat (Just 2) w "") - <+> pretty (length blks) - - for_ blks $ \case - (k,_,Nothing) | not allBack -> pure () - | otherwise -> pushBack cache k - (k,_,Just{}) -> pushBack cache k - - where - pushBack cache k = do - w <- calcWaitTime - liftIO $ Cache.delete cache k - st <- getBlockState k - t0 <- liftIO $ getTime MonotonicCoarse - setBlockState k ( set bsStart t0 - . set bsState Initial - . set bsWipTo w - $ st - ) - debug $ "returning block to downloads" <+> pretty k - addDownload k diff --git a/hbs2-peer/hbs2-peer.cabal b/hbs2-peer/hbs2-peer.cabal index 75f1c6bd..6d1ba6c8 100644 --- a/hbs2-peer/hbs2-peer.cabal +++ b/hbs2-peer/hbs2-peer.cabal @@ -107,7 +107,6 @@ executable hbs2-peer other-modules: BlockDownload , Bootstrap , PeerInfo - , PokePostponed , RPC , PeerTypes , PeerConfig