This commit is contained in:
voidlizard 2024-11-05 09:19:25 +03:00
parent cb5512d114
commit 71dd5d65b9
2 changed files with 35 additions and 5 deletions

View File

@ -1202,7 +1202,7 @@ runPeer opts = respawnOnError opts $ do
peerThread "pexLoop" (pexLoop @e brains tcp) peerThread "pexLoop" (pexLoop @e brains tcp)
peerThread "blockDownloadLoop" (blockDownloadLoop denv) -- peerThread "blockDownloadLoop" (blockDownloadLoop denv)
peerThread "blockDownloadQ" (downloadQueue conf (SomeBrains brains) denv) peerThread "blockDownloadQ" (downloadQueue conf (SomeBrains brains) denv)

View File

@ -32,6 +32,7 @@ import HBS2.Peer.Proto.BlockInfo
import HBS2.Peer.Proto.BlockChunks import HBS2.Peer.Proto.BlockChunks
import HBS2.Peer.Brains import HBS2.Peer.Brains
import HBS2.Storage import HBS2.Storage
import HBS2.Storage.Operations.Missed
import HBS2.Clock import HBS2.Clock
import HBS2.Net.Auth.Schema import HBS2.Net.Auth.Schema
@ -54,6 +55,8 @@ import PeerInfo
import Control.Monad.Trans.Maybe import Control.Monad.Trans.Maybe
import Control.Monad.Trans.Cont import Control.Monad.Trans.Cont
import Control.Concurrent.STM (flushTQueue,retry) import Control.Concurrent.STM (flushTQueue,retry)
import Data.HashSet (HashSet)
import Data.HashSet qualified as HS
import Data.IntMap qualified as IntMap import Data.IntMap qualified as IntMap
import Data.IntMap (IntMap) import Data.IntMap (IntMap)
import Data.Text qualified as Text import Data.Text qualified as Text
@ -175,6 +178,7 @@ downloadFromPeerRec :: forall e t cache m . ( e ~ L4Proto
downloadFromPeerRec t cache env h0 peer = do downloadFromPeerRec t cache env h0 peer = do
w <- newTVarIO (mempty :: HashSet (Hash HbSync) )
q <- newTQueueIO q <- newTQueueIO
p <- newTQueueIO p <- newTQueueIO
@ -182,7 +186,17 @@ downloadFromPeerRec t cache env h0 peer = do
sto <- withPeerM env getStorage sto <- withPeerM env getStorage
let addBlocks hx = atomically do
for_ hx $ \h -> do
writeTQueue q h
-- here <- readTVar w <&> HS.member h
-- unless here do
-- modifyTVar w (HS.insert h)
flip runContT pure do flip runContT pure do
atomically $ writeTQueue q h0
callCC \exit1 -> do callCC \exit1 -> do
void $ ContT $ withAsync $ forever do void $ ContT $ withAsync $ forever do
@ -205,15 +219,24 @@ downloadFromPeerRec t cache env h0 peer = do
r <- case h of r <- case h of
Nothing -> exit1 okay Nothing -> exit1 okay
Just b -> do Just b -> do
debug $ "BLOCK TO DOWNLOAD" <+> pretty b
missed <- findMissedBlocks sto (HashRef b)
addBlocks (fmap coerce missed)
blk <- getBlock sto b blk <- getBlock sto b
case blk of case blk of
Just bs -> pure (Right (b,bs)) Just bs -> pure (Right (b,bs))
Nothing -> do Nothing -> do
w <- lift (downloadFromPeer t cache env b peer) debug $ "GO DOWNLOAD" <+> pretty b
w <- lift (downloadFromPeer (TimeoutSec 20) cache env b peer)
when (isLeft w) do
addBlocks [b]
pure $ fmap (b,) w pure $ fmap (b,) w
case r of case r of
Left (PeerRequestTimeout{}) -> do Left (PeerRequestTimeout{}) -> do
debug "DOWNLOAD STUCK!"
checkTimeout timeouts next (exit1 (Left $ DownloadStuckError (HashRef h0) peer)) checkTimeout timeouts next (exit1 (Left $ DownloadStuckError (HashRef h0) peer))
Left (DownloadStuckError he pe) -> do Left (DownloadStuckError he pe) -> do
@ -223,7 +246,14 @@ downloadFromPeerRec t cache env h0 peer = do
Right (hx,bs) -> do Right (hx,bs) -> do
resetTimeouts timeouts resetTimeouts timeouts
atomically $ writeTQueue p (hx,bs) let refs = extractBlockRefs hx bs
for_ refs $ \z -> do
debug $ "PARSED REF" <+> pretty z
atomically $ mapM (writeTQueue q) refs
pause @'Seconds 0.01
-- atomically $ writeTQueue p (hx,bs)
next next
pure okay pure okay
@ -237,7 +267,7 @@ downloadFromPeerRec t cache env h0 peer = do
modifyTVar timeouts succ modifyTVar timeouts succ
readTVar timeouts readTVar timeouts
if tn < 3 then n else e if tn < 10 then n else e
okay = Right () okay = Right ()
@ -289,7 +319,7 @@ downloadFromPeer t cache env h peer = liftIO $ withPeerM env do
worker <- ContT $ withAsync do worker <- ContT $ withAsync do
atomically do atomically do
wtf <- readTVar _sBlockChunks2 wtf <- readTVar _sBlockChunks2
unless (IntMap.size wtf == total) retry unless (IntMap.size wtf >= total) retry
pure wtf pure wtf
waiter <- ContT $ withAsync $ fix \next -> do waiter <- ContT $ withAsync $ fix \next -> do