diff --git a/cabal.project b/cabal.project index b147e7fb..d182df83 100644 --- a/cabal.project +++ b/cabal.project @@ -10,6 +10,6 @@ constraints: , http-client >=0.7.16 && <0.8 -- executable-static: True -profiling: True +-- profiling: True --library-profiling: False diff --git a/hbs2-peer/app/BlockDownloadNew.hs b/hbs2-peer/app/BlockDownloadNew.hs index e89f8930..16a16a2a 100644 --- a/hbs2-peer/app/BlockDownloadNew.hs +++ b/hbs2-peer/app/BlockDownloadNew.hs @@ -608,12 +608,10 @@ downloadDispatcher :: forall e m . ( e ~ L4Proto -> SomeBrains e -> PeerEnv e -> m () -downloadDispatcher probe brains env = forever $ flip runContT pure do - +downloadDispatcher probe brains env = do debug $ red "downloadDispatcher spawned!" - pts <- newTVarIO ( mempty :: HashMap (Peer e) (Async (), PeerNonce) ) - + sto <- withPeerM env getStorage wip <- newTVarIO ( mempty :: HashMap HashRef DCB ) parseQ <- newTQueueIO @@ -634,13 +632,6 @@ downloadDispatcher probe brains env = forever $ flip runContT pure do atomically $ insertNewDownloadSTM now ha newDownload @e brains ha - void $ ContT $ bracket none $ const do - readTVarIO pts <&> fmap fst . HM.elems >>= mapM_ cancel - - void $ ContT $ withAsync $ manageThreads onBlockSTM wip pts - - sto <- withPeerM env getStorage - liftIO $ withPeerM env do subscribe @e DownloadReqKey $ \(DownloadReqData h) -> do here <- hasBlock sto h <&> isJust @@ -648,74 +639,83 @@ downloadDispatcher probe brains env = forever $ flip runContT pure do debug $ green "New download request" <+> pretty h insertNewDownload (HashRef h) - dupes <- newTVarIO ( mempty :: HashMap HashRef Int ) + forever $ flip runContT pure do - ContT $ withAsync $ forever $ pause @'Seconds 10 >> do - acceptReport probe =<< S.toList_ do - wip <- readTVarIO wip <&> HM.size - pn <- readTVarIO pts <&> HM.size - S.yield ( "wip", fromIntegral wip ) - S.yield ( "peerThreads", fromIntegral pn ) + pts <- newTVarIO ( mempty :: HashMap (Peer e) (Async (), PeerNonce) ) - ContT $ withAsync do - polling (Polling 10 10) (readTVarIO dupes <&> fmap (,60) . HM.keys) $ \h -> do - atomically $ modifyTVar dupes (HM.delete h) + void $ ContT $ bracket none $ const do + readTVarIO pts <&> fmap fst . HM.elems >>= mapM_ cancel - ContT $ withAsync do - pause @'Seconds 10 - forever $ (>> pause @'Seconds 60) $ do - down <- listDownloads @e brains - for down \(h,_) -> do - already <- readTVarIO wip <&> HM.member h - checked <- readTVarIO dupes <&> HM.member h - unless checked do - here <- hasBlock sto (coerce h) <&> isJust - when here do - atomically $ modifyTVar dupes (HM.insertWith (+) h 1) - delDownload @e brains h - unless already do - missed <- findMissedBlocks sto h - for_ missed insertNewDownload + void $ ContT $ withAsync $ manageThreads onBlockSTM wip pts - ContT $ withAsync $ forever $ (>> pause @'Seconds 30) do - debug "Sweep blocks" - atomically do - total <- readTVar wip <&> HM.toList + dupes <- newTVarIO ( mempty :: HashMap HashRef Int ) - alive <- for total $ \e@(h,DCB{..}) -> do - down <- readTVar dcbDownloaded - if down then - pure Nothing - else - pure (Just e) + ContT $ withAsync $ forever $ pause @'Seconds 10 >> do + acceptReport probe =<< S.toList_ do + wip <- readTVarIO wip <&> HM.size + pn <- readTVarIO pts <&> HM.size + S.yield ( "wip", fromIntegral wip ) + S.yield ( "peerThreads", fromIntegral pn ) - writeTVar wip (HM.fromList (catMaybes alive)) + ContT $ withAsync do + polling (Polling 10 10) (readTVarIO dupes <&> fmap (,60) . HM.keys) $ \h -> do + atomically $ modifyTVar dupes (HM.delete h) - ContT $ withAsync $ forever do - what <- atomically $ readTQueue parseQ - missed <- findMissedBlocks sto what - for_ missed insertNewDownload + ContT $ withAsync do + pause @'Seconds 10 + forever $ (>> pause @'Seconds 60) $ do + down <- listDownloads @e brains + for down \(h,_) -> do + already <- readTVarIO wip <&> HM.member h + checked <- readTVarIO dupes <&> HM.member h + unless checked do + here <- hasBlock sto (coerce h) <&> isJust + when here do + atomically $ modifyTVar dupes (HM.insertWith (+) h 1) + delDownload @e brains h + unless already do + missed <- findMissedBlocks sto h + for_ missed insertNewDownload - idle <- ContT $ withAsync $ do - t0 <- getTimeCoarse - flip fix t0 $ \next ti -> do - num <- readTVarIO wip <&> HM.size - t1 <- getTimeCoarse - if num /= 0 then do - pause @Seconds 5 >> next t1 - else do - let idle = expired (TimeoutSec 600) (t1 - ti) - -- debug $ blue "EXPIRED" <+> pretty (idle,t1,ti) - when idle $ throwIO DownloadSweepOnIdle - pause @Seconds 5 - next t0 + ContT $ withAsync $ forever $ (>> pause @'Seconds 30) do + debug "Sweep blocks" + atomically do + total <- readTVar wip <&> HM.toList - ContT $ withAsync $ forever $ (>> pause @'Seconds 10) do - sw0 <- readTVarIO wip <&> HM.size - n <- countDownloads @e brains - debug $ yellow $ "wip" <+> pretty sw0 <+> parens (pretty n) + alive <- for total $ \e@(h,DCB{..}) -> do + down <- readTVar dcbDownloaded + if down then + pure Nothing + else + pure (Just e) - void $ waitCatch idle + writeTVar wip (HM.fromList (catMaybes alive)) + + ContT $ withAsync $ forever do + what <- atomically $ readTQueue parseQ + missed <- findMissedBlocks sto what + for_ missed insertNewDownload + + idle <- ContT $ withAsync $ do + t0 <- getTimeCoarse + flip fix t0 $ \next ti -> do + num <- readTVarIO wip <&> HM.size + t1 <- getTimeCoarse + if num /= 0 then do + pause @Seconds 5 >> next t1 + else do + let idle = expired (TimeoutSec 600) (t1 - ti) + -- debug $ blue "EXPIRED" <+> pretty (idle,t1,ti) + when idle $ throwIO DownloadSweepOnIdle + pause @Seconds 5 + next t0 + + ContT $ withAsync $ forever $ (>> pause @'Seconds 10) do + sw0 <- readTVarIO wip <&> HM.size + n <- countDownloads @e brains + debug $ yellow $ "wip" <+> pretty sw0 <+> parens (pretty n) + + void $ waitCatch idle where