From ec9e7230cc001c06036ac91269ea71c06d377f9e Mon Sep 17 00:00:00 2001 From: voidlizard Date: Thu, 7 Nov 2024 13:53:43 +0300 Subject: [PATCH] wip --- hbs2-core/lib/HBS2/Clock.hs | 2 +- hbs2-peer/app/BlockDownloadNew.hs | 147 ++++++++++++++++++++++++++++-- hbs2-peer/app/PeerMain.hs | 2 +- 3 files changed, 139 insertions(+), 12 deletions(-) diff --git a/hbs2-core/lib/HBS2/Clock.hs b/hbs2-core/lib/HBS2/Clock.hs index 4b4a18d1..d59daa41 100644 --- a/hbs2-core/lib/HBS2/Clock.hs +++ b/hbs2-core/lib/HBS2/Clock.hs @@ -3,7 +3,7 @@ module HBS2.Clock ( module HBS2.Clock , module System.Clock - , POSIXTime, getPOSIXTime + , POSIXTime, getPOSIXTime, NominalDiffTime )where import Data.Functor diff --git a/hbs2-peer/app/BlockDownloadNew.hs b/hbs2-peer/app/BlockDownloadNew.hs index 4fc4ba37..836241ec 100644 --- a/hbs2-peer/app/BlockDownloadNew.hs +++ b/hbs2-peer/app/BlockDownloadNew.hs @@ -4,6 +4,7 @@ module BlockDownloadNew where import HBS2.Prelude.Plated +import HBS2.Clock import HBS2.OrDie import HBS2.Data.Detect import HBS2.Hash @@ -521,34 +522,160 @@ downloadFromPeer t bu cache env h peer = liftIO $ withPeerM env do +data S1 = + S1Init + | S1QuerySize (Hash HbSync) + | S1CheckMissed (Hash HbSync) + downloadDispatcher :: forall e m . ( e ~ L4Proto , MonadUnliftIO m ) - => PeerEnv e + => SomeBrains e + -> PeerEnv e -> m () -downloadDispatcher env = flip runContT pure do +downloadDispatcher brains env = flip runContT pure do - pts <- newTVarIO ( mempty :: HashMap (Peer e) (Async ()) ) + pts <- newTVarIO ( mempty :: HashMap (Peer e) (Async ()) ) + down <- newTVarIO ( mempty :: HashMap (Peer e) (HashMap (Hash HbSync) Integer) ) + use <- newTVarIO ( mempty :: HashMap (Hash HbSync) Integer ) + + rq <- newTQueueIO + + sto <- withPeerM env getStorage + + work <- newTQueueIO ContT $ bracket none $ const do readTVarIO pts >>= mapM_ cancel atomically $ writeTVar pts mempty + ContT $ withAsync $ forever do + join $ atomically (readTQueue work) + liftIO $ withPeerM env do subscribe @e DownloadReqKey $ \(DownloadReqData h) -> do debug $ green "Download request" <+> pretty h + let w = do + -- here <- hasBlock sto h <&> isJust + -- let hs = if not here then [h] else mempty + -- missed <- findMissedBlocks sto (HashRef h) + -- for_ ( hs <> fmap coerce missed ) $ \hx -> do + atomically $ writeTQueue rq h + -- pause @'Seconds 0.01 - pause @'Seconds 1 + atomically $ writeTQueue work w - ContT $ withAsync $ withPeerM env $ forever do - pips <- getKnownPeers @e - for_ pips $ \p -> do - debug $ "downloadDispatcher: knownPeer" <+> yellow (pretty p) - - pause @'Seconds 5 + ContT $ withAsync (manageThreads pts down use) + ContT $ withAsync (sizeQueryLoop pts rq down) forever do pause @'Seconds 10 debug $ yellow $ "I'm download dispatcher" + u <- readTVarIO use <&> length . HM.keys + d <- readTVarIO down <&> HS.size . HS.unions . fmap HM.keysSet . HM.elems + debug $ yellow $ "wip:" <+> pretty d + + where + + manageThreads pts down use = withPeerM env $ forever do + pips <- getKnownPeers @e <&> HS.fromList + + for_ pips $ \p -> do + here <- readTVarIO pts <&> HM.member p + unless here do + a <- async $ peerDownloadLoop env p down use + atomically $ modifyTVar pts (HM.insert p a) + + debug $ "downloadDispatcher: knownPeer" <+> yellow (pretty p) + + dead <- atomically do + total <- readTVar pts <&> HM.toList + + what <- for total $ \(p,a) -> do + let pipExist = HS.member p pips + stillAlive <- pollSTM a <&> isNothing + + if pipExist && stillAlive then do + pure $ Right (p,a) + else + pure $ Left (p,a) + + writeTVar pts (HM.fromList (rights what)) + pure $ lefts what + + for_ dead $ \(p,a) -> do + cancel a + debug $ red "terminating peer loop" <+> pretty p + + pause @'Seconds 5 + + sizeQueryLoop pts rq down = flip runContT pure do + sto <- withPeerM env getStorage + wip <- newTVarIO ( mempty :: HashMap (Hash HbSync) NominalDiffTime ) + + void $ ContT $ withAsync $ replicateM_ 4 do + flip fix S1Init $ \next -> \case + S1Init -> do + + w <- atomically $ readTQueue rq + here <- hasBlock sto w <&> isJust + + if not here then + next (S1QuerySize w) + else + next (S1CheckMissed w) + + S1QuerySize h -> do + debug $ "QUERY SIZE1" <+> pretty h + atomically $ modifyTVar wip (HM.insert h 10) + next S1Init + + S1CheckMissed h -> do + -- debug $ "CHECK MISSED!" <+> pretty h + next S1Init + + void $ ContT $ withAsync $ forever do + pause @'Seconds 10 + + let blkz = readTVarIO wip <&> HM.toList + + polling (Polling 1 1) blkz $ \h -> liftIO do + + pips <- readTVarIO pts <&> HM.keys + + forConcurrently_ pips $ \p -> do + + debug $ "QUERY SIZE" <+> pretty h <+> pretty p + r <- queryBlockSizeFromPeer brains env h p + + case r of + Right (Just size) -> do + atomically do + modifyTVar wip (HM.delete h) + modifyTVar down (HM.insertWith (<>) p (HM.singleton h size)) + + Right Nothing -> do + atomically $ modifyTVar wip (HM.adjust ((*1.10).(+60)) h) + + Left{} -> do + atomically $ modifyTVar wip (HM.adjust (*1.05) h) + forever do + pause @'Seconds 10 + + peerDownloadLoop env p down use = forever do + debug $ "Peer download loop" <+> green (pretty p) + hh <- atomically do + u <- readTVar use + q <- readTVar down <&> HM.toList . fromMaybe mempty . HM.lookup p + let blk = headMay [ (k,v) | (k, v) <- q, fromMaybe 0 (HM.lookup k u) == 0 ] + case blk of + Just (h,size) -> do + modifyTVar use (HM.insertWith (+) h 1) + pure (h,size) + + Nothing -> retry + + debug $ red "START TO DOWNLOAD" <+> pretty hh <+> "FROM" <+> pretty p + diff --git a/hbs2-peer/app/PeerMain.hs b/hbs2-peer/app/PeerMain.hs index 1043cbff..9657aeea 100644 --- a/hbs2-peer/app/PeerMain.hs +++ b/hbs2-peer/app/PeerMain.hs @@ -1197,7 +1197,7 @@ runPeer opts = respawnOnError opts $ do peerThread "pexLoop" (pexLoop @e brains tcp) -- FIXME: new-download-loop - peerThread "downloadDispatcher" (downloadDispatcher env) + peerThread "downloadDispatcher" (downloadDispatcher (SomeBrains brains) env) peerThread "fillPeerMeta" (fillPeerMeta tcp tcpProbeWait)