diff --git a/hbs2-peer/app/BlockDownloadNew.hs b/hbs2-peer/app/BlockDownloadNew.hs index bf342ff8..e89f8930 100644 --- a/hbs2-peer/app/BlockDownloadNew.hs +++ b/hbs2-peer/app/BlockDownloadNew.hs @@ -52,8 +52,13 @@ import Numeric import UnliftIO import Control.Concurrent.STM.TSem (TSem) import Control.Concurrent.STM.TSem qualified as TSem + import UnliftIO.Concurrent import UnliftIO.STM +import UnliftIO.Exception as U + +import Control.Exception qualified as E + import Lens.Micro.Platform import System.Random import System.Random.Shuffle (shuffleM,shuffle') @@ -582,6 +587,13 @@ data DCB = newDcbSTM :: TimeSpec -> Maybe HashRef -> STM DCB newDcbSTM ts parent = DCB ts parent <$> newTVar 0 <*> newTVar False + +data DownloadSweepOnIdle = + DownloadSweepOnIdle + deriving stock (Show,Typeable) + +instance Exception DownloadSweepOnIdle + data PSt = PChoose | PInit HashRef DCB @@ -596,101 +608,121 @@ downloadDispatcher :: forall e m . ( e ~ L4Proto -> SomeBrains e -> PeerEnv e -> m () -downloadDispatcher probe brains env = flip runContT pure do +downloadDispatcher probe brains env = forever $ flip runContT pure do + debug $ red "downloadDispatcher spawned!" - pts <- newTVarIO ( mempty :: HashMap (Peer e) (Async (), PeerNonce) ) + pts <- newTVarIO ( mempty :: HashMap (Peer e) (Async (), PeerNonce) ) - wip <- newTVarIO ( mempty :: HashMap HashRef DCB ) - parseQ <- newTQueueIO + wip <- newTVarIO ( mempty :: HashMap HashRef DCB ) + parseQ <- newTQueueIO - let - onBlockSTM :: HashRef -> STM () - onBlockSTM = writeTQueue parseQ + let + onBlockSTM :: HashRef -> STM () + onBlockSTM = writeTQueue parseQ - insertNewDownloadSTM :: TimeSpec -> HashRef -> STM () - insertNewDownloadSTM now ha = do - already <- readTVar wip <&> HM.member ha - unless already do - dcb <- newDcbSTM now mzero - modifyTVar wip (HM.insert ha dcb) + insertNewDownloadSTM :: TimeSpec -> HashRef -> STM () + insertNewDownloadSTM now ha = do + already <- readTVar wip <&> HM.member ha + unless already do + dcb <- newDcbSTM now mzero + modifyTVar wip (HM.insert ha dcb) - insertNewDownload :: forall m1 . MonadIO m1 => HashRef -> m1 () - insertNewDownload ha = do - now <- getTimeCoarse - atomically $ insertNewDownloadSTM now ha - newDownload @e brains ha + insertNewDownload :: forall m1 . MonadIO m1 => HashRef -> m1 () + insertNewDownload ha = do + now <- getTimeCoarse + atomically $ insertNewDownloadSTM now ha + newDownload @e brains ha - void $ ContT $ withAsync $ manageThreads onBlockSTM wip pts + void $ ContT $ bracket none $ const do + readTVarIO pts <&> fmap fst . HM.elems >>= mapM_ cancel - sto <- withPeerM env getStorage + void $ ContT $ withAsync $ manageThreads onBlockSTM wip pts - liftIO $ withPeerM env do - subscribe @e DownloadReqKey $ \(DownloadReqData h) -> do - here <- hasBlock sto h <&> isJust - unless here do - debug $ green "New download request" <+> pretty h - insertNewDownload (HashRef h) + sto <- withPeerM env getStorage - dupes <- newTVarIO ( mempty :: HashMap HashRef Int ) + liftIO $ withPeerM env do + subscribe @e DownloadReqKey $ \(DownloadReqData h) -> do + here <- hasBlock sto h <&> isJust + unless here do + debug $ green "New download request" <+> pretty h + insertNewDownload (HashRef h) - ContT $ withAsync $ forever $ pause @'Seconds 10 >> do - acceptReport probe =<< S.toList_ do - wip <- readTVarIO wip <&> HM.size - pn <- readTVarIO pts <&> HM.size - S.yield ( "wip", fromIntegral wip ) - S.yield ( "peerThreads", fromIntegral pn ) + dupes <- newTVarIO ( mempty :: HashMap HashRef Int ) - ContT $ withAsync do - polling (Polling 10 10) (readTVarIO dupes <&> fmap (,60) . HM.keys) $ \h -> do - atomically $ modifyTVar dupes (HM.delete h) + ContT $ withAsync $ forever $ pause @'Seconds 10 >> do + acceptReport probe =<< S.toList_ do + wip <- readTVarIO wip <&> HM.size + pn <- readTVarIO pts <&> HM.size + S.yield ( "wip", fromIntegral wip ) + S.yield ( "peerThreads", fromIntegral pn ) - ContT $ withAsync do - pause @'Seconds 10 - forever $ (>> pause @'Seconds 60) $ do - down <- listDownloads @e brains - for down \(h,_) -> do - already <- readTVarIO wip <&> HM.member h - checked <- readTVarIO dupes <&> HM.member h - unless checked do - here <- hasBlock sto (coerce h) <&> isJust - when here do - atomically $ modifyTVar dupes (HM.insertWith (+) h 1) - delDownload @e brains h - unless already do - missed <- findMissedBlocks sto h - for_ missed insertNewDownload + ContT $ withAsync do + polling (Polling 10 10) (readTVarIO dupes <&> fmap (,60) . HM.keys) $ \h -> do + atomically $ modifyTVar dupes (HM.delete h) - ContT $ withAsync $ forever $ (>> pause @'Seconds 30) do - debug "Sweep blocks" - atomically do - total <- readTVar wip <&> HM.toList + ContT $ withAsync do + pause @'Seconds 10 + forever $ (>> pause @'Seconds 60) $ do + down <- listDownloads @e brains + for down \(h,_) -> do + already <- readTVarIO wip <&> HM.member h + checked <- readTVarIO dupes <&> HM.member h + unless checked do + here <- hasBlock sto (coerce h) <&> isJust + when here do + atomically $ modifyTVar dupes (HM.insertWith (+) h 1) + delDownload @e brains h + unless already do + missed <- findMissedBlocks sto h + for_ missed insertNewDownload - alive <- for total $ \e@(h,DCB{..}) -> do - down <- readTVar dcbDownloaded - if down then - pure Nothing - else - pure (Just e) + ContT $ withAsync $ forever $ (>> pause @'Seconds 30) do + debug "Sweep blocks" + atomically do + total <- readTVar wip <&> HM.toList - writeTVar wip (HM.fromList (catMaybes alive)) + alive <- for total $ \e@(h,DCB{..}) -> do + down <- readTVar dcbDownloaded + if down then + pure Nothing + else + pure (Just e) - ContT $ withAsync $ forever do - what <- atomically $ readTQueue parseQ - missed <- findMissedBlocks sto what - for_ missed insertNewDownload + writeTVar wip (HM.fromList (catMaybes alive)) - forever $ (>> pause @'Seconds 10) do - sw0 <- readTVarIO wip <&> HM.size - n <- countDownloads @e brains - debug $ yellow $ "wip" <+> pretty sw0 <+> parens (pretty n) + ContT $ withAsync $ forever do + what <- atomically $ readTQueue parseQ + missed <- findMissedBlocks sto what + for_ missed insertNewDownload + + idle <- ContT $ withAsync $ do + t0 <- getTimeCoarse + flip fix t0 $ \next ti -> do + num <- readTVarIO wip <&> HM.size + t1 <- getTimeCoarse + if num /= 0 then do + pause @Seconds 5 >> next t1 + else do + let idle = expired (TimeoutSec 600) (t1 - ti) + -- debug $ blue "EXPIRED" <+> pretty (idle,t1,ti) + when idle $ throwIO DownloadSweepOnIdle + pause @Seconds 5 + next t0 + + ContT $ withAsync $ forever $ (>> pause @'Seconds 10) do + sw0 <- readTVarIO wip <&> HM.size + n <- countDownloads @e brains + debug $ yellow $ "wip" <+> pretty sw0 <+> parens (pretty n) + + void $ waitCatch idle where - manageThreads :: (HashRef -> STM ()) - -> TVar (HashMap HashRef DCB) - -> TVar (HashMap (Peer e) (Async (), PeerNonce)) - -> m () + -- manageThreads :: (HashRef -> STM ()) + -- -> TVar (HashMap HashRef DCB) + -- -> TVar (HashMap (Peer e) (Async (), PeerNonce)) + -- -> m () manageThreads onBlock wip pts = do _pnum <- newTVarIO 1 @@ -711,7 +743,6 @@ downloadDispatcher probe brains env = flip runContT pure do maybe1 mpd (pure ()) $ \PeerData{..} -> S.yield (p, _peerOwnNonce) - for_ (HM.toList peers) $ \(p,nonce) -> do here <- readTVarIO pts <&> HM.member p