From e2ca7d3a2f2840e69eafef6ebecfc420ba2d745e Mon Sep 17 00:00:00 2001 From: Dmitry Zuikov Date: Sat, 25 Feb 2023 13:17:51 +0300 Subject: [PATCH] fixed (?) fetch speed degradation --- hbs2-core/lib/HBS2/Defaults.hs | 3 ++ hbs2-peer/app/BlockDownload.hs | 52 +++++++++++++++++++++++----------- 2 files changed, 38 insertions(+), 17 deletions(-) diff --git a/hbs2-core/lib/HBS2/Defaults.hs b/hbs2-core/lib/HBS2/Defaults.hs index 5f8e1fa3..bb9cbbea 100644 --- a/hbs2-core/lib/HBS2/Defaults.hs +++ b/hbs2-core/lib/HBS2/Defaults.hs @@ -50,6 +50,9 @@ defCookieTimeout = toTimeSpec defCookieTimeoutSec defRequestLimit :: TimeSpec defRequestLimit = toTimeSpec defRequestLimitSec +defBlockSizeCacheTime :: TimeSpec +defBlockSizeCacheTime = toTimeSpec ( 20 :: Timeout 'Seconds ) + defRequestLimitSec :: Timeout 'Seconds defRequestLimitSec = 60 diff --git a/hbs2-peer/app/BlockDownload.hs b/hbs2-peer/app/BlockDownload.hs index c0330850..7ec7062b 100644 --- a/hbs2-peer/app/BlockDownload.hs +++ b/hbs2-peer/app/BlockDownload.hs @@ -312,7 +312,7 @@ updatePeerInfo onError pinfo = do let eps = floor (dE / dT) - let win = min 10 $ defBurstMax - defBurst - 2 + let win = min 10 $ 4 * (defBurstMax - defBurst) when (down - downLast > 0 || onError) do @@ -323,7 +323,7 @@ updatePeerInfo onError pinfo = do else do let buM = headMay $ drop 2 $ IntSet.toDescList buSet writeTVar (view peerBurstMax pinfo) buM - let buN = headDef defBurst $ drop 8 $ IntSet.toDescList buSet + let buN = headDef defBurst $ drop 4 $ IntSet.toDescList buSet pure (buN, trimDown win $ IntSet.insert buN buSet) @@ -441,7 +441,7 @@ blockDownloadLoop env0 = do void $ liftIO $ async $ forever $ withPeerM e do - pause @'Seconds 2 + pause @'Seconds 1.5 pee <- knownPeers @e pl npi <- newPeerInfo @@ -511,11 +511,13 @@ peerDownloadLoop :: forall e m . ( MyPeer e , Request e (BlockInfo e) m , EventListener e (BlockInfo e) m , DownloadFromPeerStuff e m + , HasPeerLocator e m , m ~ PeerM e IO ) => Peer e -> BlockDownloadM e m () peerDownloadLoop peer = do bannedBlocks <- liftIO $ Cache.newCache (Just defBlockBanTime) + sizeCache <- liftIO $ Cache.newCache (Just defBlockSizeCacheTime) seenBlocks <- liftIO $ newTVarIO mempty pe <- lift ask @@ -555,21 +557,27 @@ peerDownloadLoop peer = do Just x -> Just (succ x) Nothing -> Just 1 - banned <- liftIO $ Cache.lookup bannedBlocks h <&> isJust if banned then do + pl <- getPeerLocator @e + ps <- knownPeers @e pl <&> length let seenTotal = view bsTimes st - let wa = min defBlockBanTimeSec (realToFrac (ceiling $ Prelude.logBase 10 (realToFrac (50 * seenTotal)))) - void $ liftIO $ async $ withAllStuff (pause wa >> addDownload h) - trace $ "block" <+> pretty h <+> "seen" <+> pretty seenTotal <+> "times" <+> parens (pretty wa) + + if seenTotal < ps*100 then do + addDownload h + else do + let wa = min defBlockBanTimeSec (realToFrac (ceiling $ Prelude.logBase 10 (realToFrac (2 * seenTotal)))) + void $ liftIO $ async $ withAllStuff (pause wa >> addDownload h) + -- trace $ "block" <+> pretty h <+> "seen" <+> pretty seenTotal <+> "times" <+> parens (pretty wa) + else do liftIO $ atomically $ modifyTVar seenBlocks (HashMap.alter alterSeen h) seenTimes <- liftIO $ readTVarIO seenBlocks <&> fromMaybe 0 . HashMap.lookup h - when ( seenTimes > 1 ) do + when ( seenTimes > 5 ) do trace $ "ban block" <+> pretty h <+> "for a while" <+> parens (pretty seenTimes) liftIO $ atomically $ modifyTVar seenBlocks (HashMap.delete h) liftIO $ Cache.insert bannedBlocks h () @@ -577,17 +585,27 @@ peerDownloadLoop peer = do setBlockState h (set bsState Downloading st) r1 <- liftIO $ race ( pause defBlockInfoTimeout ) $ withPeerM e do - blksq <- liftIO newTQueueIO - subscribe @e (BlockSizeEventKey h) $ \case - (BlockSizeEvent (_,_,s)) -> do - liftIO $ atomically $ writeTQueue blksq (Just s) - (NoBlockEvent p) -> do - trace $ "NoBlockEvent" <+> pretty p <+> pretty h - liftIO $ atomically $ writeTQueue blksq Nothing + -- blksq <- liftIO newTQueueIO - request peer (GetBlockSize @e h) + cachedSize' <- liftIO $ Cache.lookup sizeCache h - liftIO $ atomically $ readTQueue blksq + case cachedSize' of + Just sz -> pure (Just sz) + Nothing -> do + subscribe @e (BlockSizeEventKey h) $ \case + (BlockSizeEvent (_,_,s)) -> do + -- liftIO $ atomically $ writeTQueue blksq (Just s) + liftIO $ Cache.insert sizeCache h s + + (NoBlockEvent p) -> do + pure () + -- trace $ "NoBlockEvent" <+> pretty p <+> pretty h + -- liftIO $ atomically $ writeTQueue blksq Nothing + + request peer (GetBlockSize @e h) + pure Nothing + + -- liftIO $ atomically $ readTQueue blksq case r1 of Left{} -> do