From b6c97a254ab13d69e1a54ff65a64cd58e7489fcf Mon Sep 17 00:00:00 2001 From: voidlizard Date: Tue, 5 Nov 2024 09:19:25 +0300 Subject: [PATCH] fucked --- hbs2-peer/app/PeerMain.hs | 2 +- hbs2-peer/app/RPC2.hs | 38 ++++++++++++++++++++++++++++++++++---- 2 files changed, 35 insertions(+), 5 deletions(-) diff --git a/hbs2-peer/app/PeerMain.hs b/hbs2-peer/app/PeerMain.hs index fbc9f1ba..6934c0d6 100644 --- a/hbs2-peer/app/PeerMain.hs +++ b/hbs2-peer/app/PeerMain.hs @@ -1202,7 +1202,7 @@ runPeer opts = respawnOnError opts $ do peerThread "pexLoop" (pexLoop @e brains tcp) - peerThread "blockDownloadLoop" (blockDownloadLoop denv) + -- peerThread "blockDownloadLoop" (blockDownloadLoop denv) peerThread "blockDownloadQ" (downloadQueue conf (SomeBrains brains) denv) diff --git a/hbs2-peer/app/RPC2.hs b/hbs2-peer/app/RPC2.hs index 8bfa7e53..a0389c4c 100644 --- a/hbs2-peer/app/RPC2.hs +++ b/hbs2-peer/app/RPC2.hs @@ -32,6 +32,7 @@ import HBS2.Peer.Proto.BlockInfo import HBS2.Peer.Proto.BlockChunks import HBS2.Peer.Brains import HBS2.Storage +import HBS2.Storage.Operations.Missed import HBS2.Clock import HBS2.Net.Auth.Schema @@ -54,6 +55,8 @@ import PeerInfo import Control.Monad.Trans.Maybe import Control.Monad.Trans.Cont import Control.Concurrent.STM (flushTQueue,retry) +import Data.HashSet (HashSet) +import Data.HashSet qualified as HS import Data.IntMap qualified as IntMap import Data.IntMap (IntMap) import Data.Text qualified as Text @@ -175,6 +178,7 @@ downloadFromPeerRec :: forall e t cache m . ( e ~ L4Proto downloadFromPeerRec t cache env h0 peer = do + w <- newTVarIO (mempty :: HashSet (Hash HbSync) ) q <- newTQueueIO p <- newTQueueIO @@ -182,7 +186,17 @@ downloadFromPeerRec t cache env h0 peer = do sto <- withPeerM env getStorage + let addBlocks hx = atomically do + for_ hx $ \h -> do + writeTQueue q h + -- here <- readTVar w <&> HS.member h + -- unless here do + -- modifyTVar w (HS.insert h) + flip runContT pure do + + atomically $ writeTQueue q h0 + callCC \exit1 -> do void $ ContT $ withAsync $ forever do @@ -205,15 +219,24 @@ downloadFromPeerRec t cache env h0 peer = do r <- case h of Nothing -> exit1 okay Just b -> do + debug $ "BLOCK TO DOWNLOAD" <+> pretty b + missed <- findMissedBlocks sto (HashRef b) + addBlocks (fmap coerce missed) blk <- getBlock sto b case blk of Just bs -> pure (Right (b,bs)) Nothing -> do - w <- lift (downloadFromPeer t cache env b peer) + debug $ "GO DOWNLOAD" <+> pretty b + w <- lift (downloadFromPeer (TimeoutSec 20) cache env b peer) + + when (isLeft w) do + addBlocks [b] + pure $ fmap (b,) w case r of Left (PeerRequestTimeout{}) -> do + debug "DOWNLOAD STUCK!" checkTimeout timeouts next (exit1 (Left $ DownloadStuckError (HashRef h0) peer)) Left (DownloadStuckError he pe) -> do @@ -223,7 +246,14 @@ downloadFromPeerRec t cache env h0 peer = do Right (hx,bs) -> do resetTimeouts timeouts - atomically $ writeTQueue p (hx,bs) + let refs = extractBlockRefs hx bs + + for_ refs $ \z -> do + debug $ "PARSED REF" <+> pretty z + + atomically $ mapM (writeTQueue q) refs + pause @'Seconds 0.01 + -- atomically $ writeTQueue p (hx,bs) next pure okay @@ -237,7 +267,7 @@ downloadFromPeerRec t cache env h0 peer = do modifyTVar timeouts succ readTVar timeouts - if tn < 3 then n else e + if tn < 10 then n else e okay = Right () @@ -289,7 +319,7 @@ downloadFromPeer t cache env h peer = liftIO $ withPeerM env do worker <- ContT $ withAsync do atomically do wtf <- readTVar _sBlockChunks2 - unless (IntMap.size wtf == total) retry + unless (IntMap.size wtf >= total) retry pure wtf waiter <- ContT $ withAsync $ fix \next -> do