From 72659b7eb028d55a36d05424d89bf0e4aac7d9f7 Mon Sep 17 00:00:00 2001 From: voidlizard Date: Tue, 5 Nov 2024 13:09:59 +0300 Subject: [PATCH] wip --- hbs2-peer/app/RPC2.hs | 22 +++++++++++++++++++--- 1 file changed, 19 insertions(+), 3 deletions(-) diff --git a/hbs2-peer/app/RPC2.hs b/hbs2-peer/app/RPC2.hs index 9913f9c9..e9737c2c 100644 --- a/hbs2-peer/app/RPC2.hs +++ b/hbs2-peer/app/RPC2.hs @@ -184,6 +184,7 @@ downloadFromPeerRec t bu0 cache env h0 peer = do sto <- withPeerM env getStorage + p <- newTQueueIO q <- newTQueueIO done <- newTVarIO (mempty :: HashSet (Hash HbSync)) @@ -195,9 +196,16 @@ downloadFromPeerRec t bu0 cache env h0 peer = do flip runContT pure $ callCC \exit -> do + ContT $ withAsync $ liftIO $ forever do + atomically (readTQueue p) + fix \next -> do - mt <- atomically $ isEmptyTQueue q + mt <- atomically do + pe <- isEmptyTQueue p + qe <- isEmptyTQueue q + when ( qe && not pe ) retry + pure qe when mt $ exit $ Right () @@ -213,10 +221,18 @@ downloadFromPeerRec t bu0 cache env h0 peer = do case w of Right bs -> do + h' <- enqueueBlock sto bs h3 <- ContT $ maybe1 h' (pure $ Left StorageError) - let refs = extractBlockRefs h3 bs - atomically $ mapM_ (writeTQueue q . coerce) refs + + let + parse :: IO () + parse = do + let refs = extractBlockRefs h3 bs + atomically $ mapM_ (writeTQueue q . coerce) refs + + atomically $ writeTQueue p parse + next Left e -> do