From a91a25e647889551f6b40a58d3310d6ba96a548e Mon Sep 17 00:00:00 2001 From: voidlizard Date: Tue, 5 Nov 2024 10:58:15 +0300 Subject: [PATCH] fucked --- hbs2-peer/app/RPC2.hs | 214 ++++++++++++++++++++++++------------------ 1 file changed, 125 insertions(+), 89 deletions(-) diff --git a/hbs2-peer/app/RPC2.hs b/hbs2-peer/app/RPC2.hs index a0389c4c..832633f1 100644 --- a/hbs2-peer/app/RPC2.hs +++ b/hbs2-peer/app/RPC2.hs @@ -178,99 +178,135 @@ downloadFromPeerRec :: forall e t cache m . ( e ~ L4Proto downloadFromPeerRec t cache env h0 peer = do - w <- newTVarIO (mempty :: HashSet (Hash HbSync) ) - q <- newTQueueIO - p <- newTQueueIO - - timeouts <- newTVarIO 0 - sto <- withPeerM env getStorage - let addBlocks hx = atomically do - for_ hx $ \h -> do - writeTQueue q h - -- here <- readTVar w <&> HS.member h - -- unless here do - -- modifyTVar w (HS.insert h) - - flip runContT pure do - - atomically $ writeTQueue q h0 - - callCC \exit1 -> do - - void $ ContT $ withAsync $ forever do - atomically do - (hx,bs) <- readTQueue p - let refs = extractBlockRefs hx bs - mapM (writeTQueue q) refs - - fix \next -> do - - h <- atomically do - h1 <- tryReadTQueue q - e <- isEmptyTQueue p - - case h1 of - Just x -> pure (Just x) - Nothing | e -> pure Nothing - | otherwise -> retry - - r <- case h of - Nothing -> exit1 okay - Just b -> do - debug $ "BLOCK TO DOWNLOAD" <+> pretty b - missed <- findMissedBlocks sto (HashRef b) - addBlocks (fmap coerce missed) - blk <- getBlock sto b - case blk of - Just bs -> pure (Right (b,bs)) - Nothing -> do - debug $ "GO DOWNLOAD" <+> pretty b - w <- lift (downloadFromPeer (TimeoutSec 20) cache env b peer) - - when (isLeft w) do - addBlocks [b] - - pure $ fmap (b,) w - - case r of - Left (PeerRequestTimeout{}) -> do - debug "DOWNLOAD STUCK!" - checkTimeout timeouts next (exit1 (Left $ DownloadStuckError (HashRef h0) peer)) - - Left (DownloadStuckError he pe) -> do - checkTimeout timeouts next (exit1 (Left $ DownloadStuckError he pe)) - - Left e -> exit1 (Left e) - - Right (hx,bs) -> do - resetTimeouts timeouts - let refs = extractBlockRefs hx bs - - for_ refs $ \z -> do - debug $ "PARSED REF" <+> pretty z - - atomically $ mapM (writeTQueue q) refs - pause @'Seconds 0.01 - -- atomically $ writeTQueue p (hx,bs) - next - - pure okay - - where - - resetTimeouts timeouts = atomically $ writeTVar timeouts 0 - - checkTimeout timeouts n e = do - tn <- atomically do - modifyTVar timeouts succ - readTVar timeouts - - if tn < 10 then n else e + q <- newTQueueIO - okay = Right () + flip runContT pure $ callCC \exit -> do + + fix \next -> do + + mt <- atomically $ isEmptyTQueue q + + when mt do + missed <- findMissedBlocks sto (HashRef h0) + mapM_ (atomically . writeTQueue q) missed + + mt <- atomically $ isEmptyTQueue q + + when mt $ exit $ Right () + + h <- atomically $ readTQueue q + + w <- lift $ downloadFromPeer (TimeoutSec 5) cache env (coerce h) peer + + case w of + Right bs -> do + h <- enqueueBlock sto bs + pause @'Seconds 0.25 + next + + Left e -> do + err $ "DOWNLOAD ERROR" <+> pretty h + -- pause @'Seconds 0.25 + next + + pure $ Right () + + -- w <- newTVarIO (mempty :: HashSet (Hash HbSync) ) + -- q <- newTQueueIO + -- p <- newTQueueIO + + -- timeouts <- newTVarIO 0 + + -- sto <- withPeerM env getStorage + + -- let addBlocks hx = atomically do + -- for_ hx $ \h -> do + -- writeTQueue q h + -- -- here <- readTVar w <&> HS.member h + -- -- unless here do + -- -- modifyTVar w (HS.insert h) + + -- flip runContT pure do + + -- atomically $ writeTQueue q h0 + + -- callCC \exit1 -> do + + -- void $ ContT $ withAsync $ forever do + -- atomically do + -- (hx,bs) <- readTQueue p + -- let refs = extractBlockRefs hx bs + -- mapM (writeTQueue q) refs + + -- fix \next -> do + + -- h <- atomically do + -- h1 <- tryReadTQueue q + -- e <- isEmptyTQueue p + + -- case h1 of + -- Just x -> pure (Just x) + -- Nothing | e -> pure Nothing + -- | otherwise -> retry + + -- r <- case h of + -- Nothing -> exit1 okay + -- Just b -> do + -- debug $ "BLOCK TO DOWNLOAD" <+> pretty b + -- missed <- findMissedBlocks sto (HashRef b) + -- addBlocks (fmap coerce missed) + -- blk <- getBlock sto b + -- case blk of + -- Just bs -> pure (Right (b,bs)) + -- Nothing -> do + -- debug $ "GO DOWNLOAD" <+> pretty b + -- w <- lift (downloadFromPeer (TimeoutSec 20) cache env b peer) + + -- when (isLeft w) do + -- addBlocks [b] + + -- pure $ fmap (b,) w + + -- case r of + -- Left (PeerRequestTimeout{}) -> do + -- debug "DOWNLOAD STUCK!" + -- checkTimeout timeouts next (exit1 (Left $ DownloadStuckError (HashRef h0) peer)) + + -- Left (DownloadStuckError he pe) -> do + -- checkTimeout timeouts next (exit1 (Left $ DownloadStuckError he pe)) + + -- Left e -> exit1 (Left e) + + -- Right (hx,bs) -> do + -- resetTimeouts timeouts + -- let refs = extractBlockRefs hx bs + + -- for_ refs $ \z -> do + -- debug $ "PARSED REF" <+> pretty z + + -- atomically $ mapM (writeTQueue q) refs + -- pause @'Seconds 0.01 + -- -- atomically $ writeTQueue p (hx,bs) + -- next + + -- pure okay + + -- where + + -- resetTimeouts timeouts = atomically $ writeTVar timeouts 0 + + -- checkTimeout timeouts n e = do + -- tn <- atomically do + -- modifyTVar timeouts succ + -- readTVar timeouts + + -- if tn < 10 then n else e + + + -- okay = Right () downloadFromPeer :: forall e t cache m . ( e ~ L4Proto