From 08816dfc46e0439878c13f44367b54289ecc544e Mon Sep 17 00:00:00 2001 From: voidlizard Date: Sat, 16 Nov 2024 08:41:33 +0300 Subject: [PATCH] bring back persistent DownloadQ --- hbs2-peer/app/BlockDownloadNew.hs | 60 +++++++++++++++++++++---------- hbs2-peer/app/Brains.hs | 9 +++++ hbs2-peer/lib/HBS2/Peer/Brains.hs | 8 +++++ 3 files changed, 58 insertions(+), 19 deletions(-) diff --git a/hbs2-peer/app/BlockDownloadNew.hs b/hbs2-peer/app/BlockDownloadNew.hs index b86dc705..d7f43d9c 100644 --- a/hbs2-peer/app/BlockDownloadNew.hs +++ b/hbs2-peer/app/BlockDownloadNew.hs @@ -606,6 +606,19 @@ downloadDispatcher brains env = flip runContT pure do onBlockSTM :: HashRef -> STM () onBlockSTM = writeTQueue parseQ + insertNewDownloadSTM :: TimeSpec -> HashRef -> STM () + insertNewDownloadSTM now ha = do + already <- readTVar wip <&> HM.member ha + unless already do + dcb <- newDcbSTM now mzero + modifyTVar wip (HM.insert ha dcb) + + insertNewDownload :: forall m1 . MonadIO m1 => HashRef -> m1 () + insertNewDownload ha = do + now <- getTimeCoarse + atomically $ insertNewDownloadSTM now ha + newDownload @e brains ha + void $ ContT $ withAsync $ manageThreads onBlockSTM wip pts sto <- withPeerM env getStorage @@ -614,14 +627,30 @@ downloadDispatcher brains env = flip runContT pure do subscribe @e DownloadReqKey $ \(DownloadReqData h) -> do here <- hasBlock sto h <&> isJust unless here do - now <- getTimeCoarse debug $ green "New download request" <+> pretty h - atomically do - already <- readTVar wip <&> HM.member (HashRef h) - dcb <- newDcbSTM now mzero - let w = 1.0 -- realToFrac now - unless already do - modifyTVar wip (HM.insert (HashRef h) dcb) + insertNewDownload (HashRef h) + + dupes <- newTVarIO ( mempty :: HashMap HashRef Int ) + + ContT $ withAsync do + polling (Polling 10 10) (readTVarIO dupes <&> fmap (,60) . HM.keys) $ \h -> do + atomically $ modifyTVar dupes (HM.delete h) + + ContT $ withAsync do + pause @'Seconds 10 + forever $ (>> pause @'Seconds 60) $ do + down <- listDownloads @e brains + for down \(h,_) -> do + already <- readTVarIO wip <&> HM.member h + checked <- readTVarIO dupes <&> HM.member h + unless checked do + here <- hasBlock sto (coerce h) <&> isJust + when here do + atomically $ modifyTVar dupes (HM.insertWith (+) h 1) + delDownload @e brains h + unless already do + missed <- findMissedBlocks sto h + for_ missed insertNewDownload ContT $ withAsync $ forever $ (>> pause @'Seconds 30) do debug "Sweep blocks" @@ -637,22 +666,15 @@ downloadDispatcher brains env = flip runContT pure do writeTVar wip (HM.fromList (catMaybes alive)) - ContT $ withAsync $ forever do what <- atomically $ readTQueue parseQ missed <- findMissedBlocks sto what - now <- getTimeCoarse - for_ missed $ \hi -> do - atomically do - dcb <- newDcbSTM now (Just what) - let w = realToFrac now - already <- readTVar wip <&> HM.member hi - unless already do - modifyTVar wip (HM.insert hi dcb) + for_ missed insertNewDownload forever $ (>> pause @'Seconds 10) do sw0 <- readTVarIO wip <&> HM.size - debug $ yellow $ "wip0" <+> pretty sw0 + n <- countDownloads @e brains + debug $ yellow $ "wip" <+> pretty sw0 <+> parens (pretty n) where @@ -729,8 +751,8 @@ downloadDispatcher brains env = flip runContT pure do -- pure (ConstBurstMachine 256) -- newBurstMachine 60 256 (Just 256) 0.20 0.10 bm <- liftIO do case _sockType p of - TCP -> AnyBurstMachine @IO <$> newBurstMachine 3 256 (Just 50) 0.10 0.35 - UDP -> AnyBurstMachine @IO <$> newBurstMachine 3 256 (Just 50) 0.10 0.35 + TCP -> AnyBurstMachine @IO <$> newBurstMachine 5 256 (Just 100) 0.10 0.35 + UDP -> AnyBurstMachine @IO <$> newBurstMachine 5 256 (Just 50) 0.10 0.35 void $ ContT $ bracket none $ const do debug $ "Cancelling thread for" <+> pretty p diff --git a/hbs2-peer/app/Brains.hs b/hbs2-peer/app/Brains.hs index 0d86c793..b8ae041f 100644 --- a/hbs2-peer/app/Brains.hs +++ b/hbs2-peer/app/Brains.hs @@ -122,6 +122,11 @@ instance ( Hashable (Peer e) listDownloads = liftIO . selectDownloads + countDownloads b = do + let conn = view brainsDb b + liftIO $ query_ conn [qc|select count(hash) from download|] + <&> headDef 0 . fmap fromOnly + listPexInfo = liftIO . selectPexInfo updatePexInfo b pex = updateOP b $ insertPexInfo b pex @@ -130,6 +135,10 @@ instance ( Hashable (Peer e) liftIO $ Cache.insert (view brainsRemoved br) what () updateOP br (deleteDownload br what) + newDownload br what = do + -- debug $ "Brains: newDownload" <+> pretty what + updateOP br (insertDownload br mzero what) + onKnownPeers br ps = do trace $ "BRAINS: onKnownPeers" <+> pretty ps let tv = view brainsPeers br diff --git a/hbs2-peer/lib/HBS2/Peer/Brains.hs b/hbs2-peer/lib/HBS2/Peer/Brains.hs index f17cdbcd..0a87e63a 100644 --- a/hbs2-peer/lib/HBS2/Peer/Brains.hs +++ b/hbs2-peer/lib/HBS2/Peer/Brains.hs @@ -58,9 +58,15 @@ class HasBrains e a where updatePexInfo :: MonadIO m => a -> [PeerAddr e] -> m () updatePexInfo _ _ = pure () + countDownloads :: MonadIO m => a -> m Int + countDownloads _ = pure 0 + listDownloads :: MonadIO m => a -> m [(HashRef, Integer)] listDownloads _ = pure mempty + newDownload :: MonadIO m => a -> HashRef -> m () + newDownload _ _ = none + delDownload :: MonadIO m => a -> HashRef -> m () delDownload _ _ = pure () @@ -196,7 +202,9 @@ instance HasBrains e (SomeBrains e) where updatePexInfo (SomeBrains a) = updatePexInfo @e a listDownloads (SomeBrains a) = listDownloads @e a + countDownloads (SomeBrains a) = countDownloads @e a delDownload (SomeBrains a) = delDownload @e a + newDownload (SomeBrains a) = newDownload @e a onKnownPeers (SomeBrains a) = onKnownPeers a onBlockSize (SomeBrains a) = onBlockSize a