From e53273d3b6c22383746120a4427c1a9317a667e7 Mon Sep 17 00:00:00 2001 From: voidlizard Date: Sat, 9 Nov 2024 10:29:07 +0300 Subject: [PATCH] wip --- hbs2-peer/app/BlockDownloadNew.hs | 33 +++++++++++++++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/hbs2-peer/app/BlockDownloadNew.hs b/hbs2-peer/app/BlockDownloadNew.hs index 4bc3b216..7dec176a 100644 --- a/hbs2-peer/app/BlockDownloadNew.hs +++ b/hbs2-peer/app/BlockDownloadNew.hs @@ -554,10 +554,17 @@ downloadDispatcher brains env = flip runContT pure do rq <- newTQueueIO let seenLimit = 1000 + seen <- newTVarIO ( HPSQ.empty :: HashPSQ HashRef TimeSpec () ) blkQ <- newTVarIO ( HPSQ.empty :: HashPSQ HashRef Int NominalDiffTime ) + let sizeCacheLimit = 10000 + + sizeCache <- newTVarIO ( HPSQ.empty :: HashPSQ (HashRef,Peer e) TimeSpec (Maybe Integer) ) + + sizeQ <- newTVarIO ( HPSQ.empty :: HashPSQ HashRef NominalDiffTime () ) + sto <- withPeerM env getStorage work <- newTQueueIO @@ -571,6 +578,7 @@ downloadDispatcher brains env = flip runContT pure do ContT $ withAsync $ forever do pause @'Seconds 600 + debug $ "CLEANUP SEEN" atomically do fix \next -> do @@ -579,6 +587,14 @@ downloadDispatcher brains env = flip runContT pure do modifyTVar seen HPSQ.deleteMin next + debug $ "CLEANUP SIZES" + atomically do + fix \next -> do + n <- readTVar sizeCache <&> HPSQ.size + when (n > sizeCacheLimit) do + modifyTVar sizeCache HPSQ.deleteMin + next + liftIO $ withPeerM env do subscribe @e DownloadReqKey $ \(DownloadReqData h) -> do now <- getTimeCoarse @@ -600,6 +616,23 @@ downloadDispatcher brains env = flip runContT pure do Nothing -> ((), Nothing) Just (p,v) -> ((), Just (succ p, v * 1.10 )) + ContT $ withAsync $ + polling (Polling 1 1) missChk $ \h -> do + pips <- readTVarIO pts <&> HM.keys + forConcurrently_ pips $ \p -> do + now <- getTimeCoarse + here <- readTVarIO sizeCache <&> HPSQ.member (h,p) + hereB <- hasBlock sto (coerce h) <&> isJust + when (not here && not hereB) do + size <- queryBlockSizeFromPeer brains env (coerce h) p + case size of + Left{} -> pure () + Right w -> do + atomically $ modifyTVar sizeCache ( HPSQ.insert (h,p) now w ) + debug $ green "GOT SIZE" <+> pretty w <+> pretty h <+> pretty p + + atomically $ modifyTVar blkQ ( snd . HPSQ.alter shiftPrio h ) + ContT $ withAsync $ forever do polling (Polling 1 1) missChk $ \h -> do debug $ blue "CHECK MISSED BLOCKS" <+> pretty h