From d5abcf331ef6a4aac86fddd04200e4c087beef1a Mon Sep 17 00:00:00 2001 From: voidlizard Date: Wed, 13 Nov 2024 18:53:22 +0300 Subject: [PATCH] more or less works --- hbs2-peer/app/BlockDownloadNew.hs | 94 +++++++++++++++++++------------ 1 file changed, 59 insertions(+), 35 deletions(-) diff --git a/hbs2-peer/app/BlockDownloadNew.hs b/hbs2-peer/app/BlockDownloadNew.hs index 4bbb7c9c..99120d14 100644 --- a/hbs2-peer/app/BlockDownloadNew.hs +++ b/hbs2-peer/app/BlockDownloadNew.hs @@ -569,13 +569,14 @@ data Work = -- | Download control block data DCB = DCB - { dcbStart :: TimeSpec - , dcbBusy :: TVar Int - , dcbDownloaded :: TVar Bool + { dcbStart :: !TimeSpec + , dcbParent :: !(Maybe HashRef) + , dcbBusy :: !(TVar Int) + , dcbDownloaded :: !(TVar Bool) } -newDcbSTM :: TimeSpec -> STM DCB -newDcbSTM ts = DCB ts <$> newTVar 0 <*> newTVar False +newDcbSTM :: TimeSpec -> Maybe HashRef -> STM DCB +newDcbSTM ts parent = DCB ts parent <$> newTVar 0 <*> newTVar False data PSt = PChoose @@ -616,12 +617,12 @@ downloadDispatcher brains env = flip runContT pure do debug $ green "New download request" <+> pretty h atomically do already <- readTVar wip <&> HPSQ.member (HashRef h) - dcb <- newDcbSTM now + dcb <- newDcbSTM now mzero let w = 1.0 -- realToFrac now unless already do modifyTVar wip (HPSQ.insert (HashRef h) w dcb) - ContT $ withAsync $ forever $ (>> pause @'Seconds 5) do + ContT $ withAsync $ forever $ (>> pause @'Seconds 30) do debug "Sweep blocks" atomically do total <- readTVar wip <&> HPSQ.toList @@ -636,13 +637,13 @@ downloadDispatcher brains env = flip runContT pure do writeTVar wip (HPSQ.fromList (catMaybes alive)) - ContT $ withAsync $ replicateM_ 4 $ forever do + ContT $ withAsync $ forever do what <- atomically $ readTQueue parseQ missed <- findMissedBlocks sto what now <- getTimeCoarse for_ missed $ \hi -> do atomically do - dcb <- newDcbSTM now + dcb <- newDcbSTM now (Just what) let w = realToFrac now already <- readTVar wip <&> HPSQ.member hi unless already do @@ -654,27 +655,31 @@ downloadDispatcher brains env = flip runContT pure do where - manageThreads onBlock wip pts = forever $ (>> pause @'Seconds 10) $ withPeerM env do - debug "MANAGE THREADS" + manageThreads onBlock wip pts = do + _pnum <- newTVarIO 1 - peers <- getKnownPeers @e <&> HS.fromList + forever $ (>> pause @'Seconds 10) $ withPeerM env do + debug "MANAGE THREADS" - for_ peers $ \p -> do - here <- readTVarIO pts <&> HM.member p - unless here do - a <- async (peerThread onBlock p wip) - atomically $ modifyTVar pts (HM.insert p a) + peers <- getKnownPeers @e <&> HS.fromList - loosers <- atomically do - xs <- readTVar pts <&> HM.toList - -- FIXME: filter-stopped-tasks - let (alive,dead) = L.partition ( \(x,_) -> HS.member x peers ) xs - writeTVar pts (HM.fromList alive) - pure dead + for_ peers $ \p -> do + here <- readTVarIO pts <&> HM.member p + i <- atomically $ stateTVar _pnum (\i -> (i, succ i)) + unless here do + a <- async (peerThread i onBlock p wip) + atomically $ modifyTVar pts (HM.insert p a) - mapM_ cancel (fmap snd loosers) + loosers <- atomically do + xs <- readTVar pts <&> HM.toList + -- FIXME: filter-stopped-tasks + let (alive,dead) = L.partition ( \(x,_) -> HS.member x peers ) xs + writeTVar pts (HM.fromList alive) + pure dead - peerThread onBlock p wip = flip runContT pure do + mapM_ cancel (fmap snd loosers) + + peerThread me onBlock p wip = flip runContT pure do btimes <- newTVarIO ( mempty :: [Double] ) @@ -685,7 +690,7 @@ downloadDispatcher brains env = flip runContT pure do _sizeCache <- newTVarIO ( mempty :: HashMap HashRef (Maybe Integer) ) - bm <- liftIO $ newBurstMachine 10 256 (Just 50) 0.10 0.25 + bm <- liftIO $ newBurstMachine 5 256 (Just 80) 0.10 0.25 void $ ContT $ bracket none $ const do debug $ "Cancelling thread for" <+> pretty p @@ -721,26 +726,45 @@ downloadDispatcher brains env = flip runContT pure do PChoose -> do what <- atomically do + r <- newTVar ( HPSQ.empty @HashRef @Double @DCB ) blocks <- readTVar wip <&> HPSQ.toList let todo = blocks flip fix todo $ \loop w -> do erno <- readTVar _errors if erno > 10 then - pure Nothing + pure () else do case w of - [] -> retry - (h,_,dcb):xs -> do + [] -> none + + (h,_,dcb@DCB{..}):xs -> do wpsize <- readTVar wip <&> HPSQ.size let trsh = if wpsize < 10 then 3 else 0 - busy <- readTVar (dcbBusy dcb) - down <- readTVar (dcbDownloaded dcb) + busy <- readTVar dcbBusy + down <- readTVar dcbDownloaded absent <- readTVar _sizeCache <&> (== Just Nothing) . HM.lookup h if busy > trsh || down || absent then loop xs else do - modifyTVar (dcbBusy dcb) succ - pure $ Just (h,dcb) + sizeCache <- readTVar _sizeCache + + let eps = case dcbParent of + Nothing -> 1.0 + Just hp -> case HM.lookup hp sizeCache of + Just (Just _) -> 0.5 + Just Nothing -> 1.5 + Nothing -> 1.0 + + modifyTVar r (HPSQ.insert h eps dcb) + s <- readTVar r <&> HPSQ.size + if s >= 8 then pure () else loop xs + + w <- readTVar r <&> HPSQ.findMin + case w of + Nothing -> retry + Just (h,_,d) -> do + modifyTVar (dcbBusy d) succ + pure (Just (h,d)) case what of Just (hx, dcb) -> go (PInit hx dcb) @@ -749,7 +773,7 @@ downloadDispatcher brains env = flip runContT pure do if erno > 50 then do pause @'Seconds 60 else do - pause @'Seconds 1 + pause @'Seconds 0.25 go PChoose PInit hx dcb -> do @@ -802,7 +826,7 @@ downloadDispatcher brains env = flip runContT pure do avg <- readTVarIO _avg - when (dtsec > avg * 1.10) do + when (dtsec > avg * 2) do burstMachineAddErrors bm 1 atomically do