This commit is contained in:
voidlizard 2024-11-05 09:19:25 +03:00
parent 3e737feb0c
commit b6c97a254a
2 changed files with 35 additions and 5 deletions

View File

@ -1202,7 +1202,7 @@ runPeer opts = respawnOnError opts $ do
peerThread "pexLoop" (pexLoop @e brains tcp)
peerThread "blockDownloadLoop" (blockDownloadLoop denv)
-- peerThread "blockDownloadLoop" (blockDownloadLoop denv)
peerThread "blockDownloadQ" (downloadQueue conf (SomeBrains brains) denv)

View File

@ -32,6 +32,7 @@ import HBS2.Peer.Proto.BlockInfo
import HBS2.Peer.Proto.BlockChunks
import HBS2.Peer.Brains
import HBS2.Storage
import HBS2.Storage.Operations.Missed
import HBS2.Clock
import HBS2.Net.Auth.Schema
@ -54,6 +55,8 @@ import PeerInfo
import Control.Monad.Trans.Maybe
import Control.Monad.Trans.Cont
import Control.Concurrent.STM (flushTQueue,retry)
import Data.HashSet (HashSet)
import Data.HashSet qualified as HS
import Data.IntMap qualified as IntMap
import Data.IntMap (IntMap)
import Data.Text qualified as Text
@ -175,6 +178,7 @@ downloadFromPeerRec :: forall e t cache m . ( e ~ L4Proto
downloadFromPeerRec t cache env h0 peer = do
w <- newTVarIO (mempty :: HashSet (Hash HbSync) )
q <- newTQueueIO
p <- newTQueueIO
@ -182,7 +186,17 @@ downloadFromPeerRec t cache env h0 peer = do
sto <- withPeerM env getStorage
let addBlocks hx = atomically do
for_ hx $ \h -> do
writeTQueue q h
-- here <- readTVar w <&> HS.member h
-- unless here do
-- modifyTVar w (HS.insert h)
flip runContT pure do
atomically $ writeTQueue q h0
callCC \exit1 -> do
void $ ContT $ withAsync $ forever do
@ -205,15 +219,24 @@ downloadFromPeerRec t cache env h0 peer = do
r <- case h of
Nothing -> exit1 okay
Just b -> do
debug $ "BLOCK TO DOWNLOAD" <+> pretty b
missed <- findMissedBlocks sto (HashRef b)
addBlocks (fmap coerce missed)
blk <- getBlock sto b
case blk of
Just bs -> pure (Right (b,bs))
Nothing -> do
w <- lift (downloadFromPeer t cache env b peer)
debug $ "GO DOWNLOAD" <+> pretty b
w <- lift (downloadFromPeer (TimeoutSec 20) cache env b peer)
when (isLeft w) do
addBlocks [b]
pure $ fmap (b,) w
case r of
Left (PeerRequestTimeout{}) -> do
debug "DOWNLOAD STUCK!"
checkTimeout timeouts next (exit1 (Left $ DownloadStuckError (HashRef h0) peer))
Left (DownloadStuckError he pe) -> do
@ -223,7 +246,14 @@ downloadFromPeerRec t cache env h0 peer = do
Right (hx,bs) -> do
resetTimeouts timeouts
atomically $ writeTQueue p (hx,bs)
let refs = extractBlockRefs hx bs
for_ refs $ \z -> do
debug $ "PARSED REF" <+> pretty z
atomically $ mapM (writeTQueue q) refs
pause @'Seconds 0.01
-- atomically $ writeTQueue p (hx,bs)
next
pure okay
@ -237,7 +267,7 @@ downloadFromPeerRec t cache env h0 peer = do
modifyTVar timeouts succ
readTVar timeouts
if tn < 3 then n else e
if tn < 10 then n else e
okay = Right ()
@ -289,7 +319,7 @@ downloadFromPeer t cache env h peer = liftIO $ withPeerM env do
worker <- ContT $ withAsync do
atomically do
wtf <- readTVar _sBlockChunks2
unless (IntMap.size wtf == total) retry
unless (IntMap.size wtf >= total) retry
pure wtf
waiter <- ContT $ withAsync $ fix \next -> do