This commit is contained in:
voidlizard 2024-11-05 10:58:15 +03:00
parent 71dd5d65b9
commit a91a25e647
1 changed files with 125 additions and 89 deletions

View File

@ -178,99 +178,135 @@ downloadFromPeerRec :: forall e t cache m . ( e ~ L4Proto
downloadFromPeerRec t cache env h0 peer = do
w <- newTVarIO (mempty :: HashSet (Hash HbSync) )
q <- newTQueueIO
p <- newTQueueIO
timeouts <- newTVarIO 0
sto <- withPeerM env getStorage
let addBlocks hx = atomically do
for_ hx $ \h -> do
writeTQueue q h
-- here <- readTVar w <&> HS.member h
-- unless here do
-- modifyTVar w (HS.insert h)
q <- newTQueueIO
flip runContT pure do
atomically $ writeTQueue q h0
callCC \exit1 -> do
void $ ContT $ withAsync $ forever do
atomically do
(hx,bs) <- readTQueue p
let refs = extractBlockRefs hx bs
mapM (writeTQueue q) refs
flip runContT pure $ callCC \exit -> do
fix \next -> do
h <- atomically do
h1 <- tryReadTQueue q
e <- isEmptyTQueue p
mt <- atomically $ isEmptyTQueue q
case h1 of
Just x -> pure (Just x)
Nothing | e -> pure Nothing
| otherwise -> retry
when mt do
missed <- findMissedBlocks sto (HashRef h0)
mapM_ (atomically . writeTQueue q) missed
r <- case h of
Nothing -> exit1 okay
Just b -> do
debug $ "BLOCK TO DOWNLOAD" <+> pretty b
missed <- findMissedBlocks sto (HashRef b)
addBlocks (fmap coerce missed)
blk <- getBlock sto b
case blk of
Just bs -> pure (Right (b,bs))
Nothing -> do
debug $ "GO DOWNLOAD" <+> pretty b
w <- lift (downloadFromPeer (TimeoutSec 20) cache env b peer)
mt <- atomically $ isEmptyTQueue q
when (isLeft w) do
addBlocks [b]
when mt $ exit $ Right ()
pure $ fmap (b,) w
h <- atomically $ readTQueue q
case r of
Left (PeerRequestTimeout{}) -> do
debug "DOWNLOAD STUCK!"
checkTimeout timeouts next (exit1 (Left $ DownloadStuckError (HashRef h0) peer))
w <- lift $ downloadFromPeer (TimeoutSec 5) cache env (coerce h) peer
Left (DownloadStuckError he pe) -> do
checkTimeout timeouts next (exit1 (Left $ DownloadStuckError he pe))
Left e -> exit1 (Left e)
Right (hx,bs) -> do
resetTimeouts timeouts
let refs = extractBlockRefs hx bs
for_ refs $ \z -> do
debug $ "PARSED REF" <+> pretty z
atomically $ mapM (writeTQueue q) refs
pause @'Seconds 0.01
-- atomically $ writeTQueue p (hx,bs)
case w of
Right bs -> do
h <- enqueueBlock sto bs
pause @'Seconds 0.25
next
pure okay
Left e -> do
err $ "DOWNLOAD ERROR" <+> pretty h
-- pause @'Seconds 0.25
next
where
pure $ Right ()
resetTimeouts timeouts = atomically $ writeTVar timeouts 0
-- w <- newTVarIO (mempty :: HashSet (Hash HbSync) )
-- q <- newTQueueIO
-- p <- newTQueueIO
checkTimeout timeouts n e = do
tn <- atomically do
modifyTVar timeouts succ
readTVar timeouts
-- timeouts <- newTVarIO 0
if tn < 10 then n else e
-- sto <- withPeerM env getStorage
-- let addBlocks hx = atomically do
-- for_ hx $ \h -> do
-- writeTQueue q h
-- -- here <- readTVar w <&> HS.member h
-- -- unless here do
-- -- modifyTVar w (HS.insert h)
-- flip runContT pure do
-- atomically $ writeTQueue q h0
-- callCC \exit1 -> do
-- void $ ContT $ withAsync $ forever do
-- atomically do
-- (hx,bs) <- readTQueue p
-- let refs = extractBlockRefs hx bs
-- mapM (writeTQueue q) refs
-- fix \next -> do
-- h <- atomically do
-- h1 <- tryReadTQueue q
-- e <- isEmptyTQueue p
-- case h1 of
-- Just x -> pure (Just x)
-- Nothing | e -> pure Nothing
-- | otherwise -> retry
-- r <- case h of
-- Nothing -> exit1 okay
-- Just b -> do
-- debug $ "BLOCK TO DOWNLOAD" <+> pretty b
-- missed <- findMissedBlocks sto (HashRef b)
-- addBlocks (fmap coerce missed)
-- blk <- getBlock sto b
-- case blk of
-- Just bs -> pure (Right (b,bs))
-- Nothing -> do
-- debug $ "GO DOWNLOAD" <+> pretty b
-- w <- lift (downloadFromPeer (TimeoutSec 20) cache env b peer)
-- when (isLeft w) do
-- addBlocks [b]
-- pure $ fmap (b,) w
-- case r of
-- Left (PeerRequestTimeout{}) -> do
-- debug "DOWNLOAD STUCK!"
-- checkTimeout timeouts next (exit1 (Left $ DownloadStuckError (HashRef h0) peer))
-- Left (DownloadStuckError he pe) -> do
-- checkTimeout timeouts next (exit1 (Left $ DownloadStuckError he pe))
-- Left e -> exit1 (Left e)
-- Right (hx,bs) -> do
-- resetTimeouts timeouts
-- let refs = extractBlockRefs hx bs
-- for_ refs $ \z -> do
-- debug $ "PARSED REF" <+> pretty z
-- atomically $ mapM (writeTQueue q) refs
-- pause @'Seconds 0.01
-- -- atomically $ writeTQueue p (hx,bs)
-- next
-- pure okay
-- where
-- resetTimeouts timeouts = atomically $ writeTVar timeouts 0
-- checkTimeout timeouts n e = do
-- tn <- atomically do
-- modifyTVar timeouts succ
-- readTVar timeouts
-- if tn < 10 then n else e
okay = Right ()
-- okay = Right ()
downloadFromPeer :: forall e t cache m . ( e ~ L4Proto