From 3d223d61d3ebd51bae743be46b412391629ff3a6 Mon Sep 17 00:00:00 2001 From: voidlizard Date: Wed, 13 Nov 2024 17:22:37 +0300 Subject: [PATCH] wip --- hbs2-peer/app/BlockDownloadNew.hs | 43 +++++++++++++++++++------------ 1 file changed, 26 insertions(+), 17 deletions(-) diff --git a/hbs2-peer/app/BlockDownloadNew.hs b/hbs2-peer/app/BlockDownloadNew.hs index c24a3511..fc16617d 100644 --- a/hbs2-peer/app/BlockDownloadNew.hs +++ b/hbs2-peer/app/BlockDownloadNew.hs @@ -597,8 +597,14 @@ downloadDispatcher brains env = flip runContT pure do _blkNum <- newTVarIO 0 wip <- newTVarIO ( HPSQ.empty @HashRef @Double @DCB ) + parseQ <- newTQueueIO - void $ ContT $ withAsync $ manageThreads wip pts + + let + onBlockSTM :: HashRef -> STM () + onBlockSTM = writeTQueue parseQ + + void $ ContT $ withAsync $ manageThreads onBlockSTM wip pts sto <- withPeerM env getStorage @@ -611,8 +617,9 @@ downloadDispatcher brains env = flip runContT pure do atomically do already <- readTVar wip <&> HPSQ.member (HashRef h) dcb <- newDcbSTM now + let w = 1.0 -- realToFrac now unless already do - modifyTVar wip (HPSQ.insert (HashRef h) 1.0 dcb) + modifyTVar wip (HPSQ.insert (HashRef h) w dcb) ContT $ withAsync $ forever $ (>> pause @'Seconds 5) do debug "Sweep blocks" @@ -628,13 +635,24 @@ downloadDispatcher brains env = flip runContT pure do writeTVar wip (HPSQ.fromList (catMaybes alive)) + + ContT $ withAsync $ replicateM_ 4 $ forever do + what <- atomically $ readTQueue parseQ + missed <- findMissedBlocks sto what + now <- getTimeCoarse + for_ missed $ \hi -> do + atomically do + dcb <- newDcbSTM now + let w = realToFrac now + modifyTVar wip (HPSQ.insert hi 1.0 dcb) + forever $ (>> pause @'Seconds 10) do sw0 <- readTVarIO wip <&> HPSQ.size debug $ yellow $ "wip0" <+> pretty sw0 where - manageThreads wip pts = forever $ (>> pause @'Seconds 10) $ withPeerM env do + manageThreads onBlock wip pts = forever $ (>> pause @'Seconds 10) $ withPeerM env do debug "MANAGE THREADS" peers <- getKnownPeers @e <&> HS.fromList @@ -642,7 +660,7 @@ downloadDispatcher brains env = flip runContT pure do for_ peers $ \p -> do here <- readTVarIO pts <&> HM.member p unless here do - a <- async (peerThread p wip) + a <- async (peerThread onBlock p wip) atomically $ modifyTVar pts (HM.insert p a) loosers <- atomically do @@ -654,7 +672,7 @@ downloadDispatcher brains env = flip runContT pure do mapM_ cancel (fmap snd loosers) - peerThread p wip = flip runContT pure do + peerThread onBlock p wip = flip runContT pure do btimes <- newTVarIO ( mempty :: [Double] ) @@ -687,15 +705,6 @@ downloadDispatcher brains env = flip runContT pure do parseQ <- newTQueueIO - tparse <- ContT $ withAsync $ forever do - what <- atomically $ readTQueue parseQ - missed <- findMissedBlocks sto what - now <- getTimeCoarse - for_ missed $ \hi -> do - atomically do - dcb <- newDcbSTM now - modifyTVar wip (HPSQ.insert hi 1.0 dcb) - bmt <- ContT $ withAsync $ runBurstMachine bm tstat <- ContT $ withAsync $ forever $ (>> pause @'Seconds 5) do @@ -792,7 +801,7 @@ downloadDispatcher brains env = flip runContT pure do avg <- readTVarIO _avg - when (dtsec > avg * 1.05) do + when (dtsec > avg * 1.10) do burstMachineAddErrors bm 1 atomically $ modifyTVar btimes ( take 100 . (dtsec :) ) @@ -814,11 +823,11 @@ downloadDispatcher brains env = flip runContT pure do else do -- modifyTVar (dcbBusy dcb) pred modifyTVar wip (HPSQ.delete hx) - writeTQueue parseQ hx + onBlock hx bs <- ContT $ withAsync $ forever do pause @'Seconds 10 debug $ yellow "I'm thread" <+> pretty p - void $ waitAnyCatchCancel [bmt,bs,twork,tstat,tsweep,tparse] + void $ waitAnyCatchCancel [bmt,bs,twork,tstat,tsweep]