diff --git a/hbs2-peer/app/RPC2.hs b/hbs2-peer/app/RPC2.hs index 5f87c2b9..80d1ff61 100644 --- a/hbs2-peer/app/RPC2.hs +++ b/hbs2-peer/app/RPC2.hs @@ -190,7 +190,7 @@ downloadFromPeer t cache env h peer = liftIO $ withPeerM env do . set sBlockSize (fromIntegral size) $ down - (what, rs) <- liftIO $ withPeerM env do + eblk <- liftIO $ withPeerM env do update @e new key id debug $ "FUCKIN WAIT FOR CHUNKS!" <+> pretty h @@ -199,30 +199,48 @@ downloadFromPeer t cache env h peer = liftIO $ withPeerM env do let total = L.length $ calcChunks size (fromIntegral chunkSize) - blk <- atomically do - wtf <- readTVar _sBlockChunks2 - unless (IntMap.size wtf == total) retry - pure wtf + flip runContT pure do - atomically $ flushTQueue chuQ + worker <- ContT $ withAsync do + atomically do + wtf <- readTVar _sBlockChunks2 + unless (IntMap.size wtf == total) retry + pure wtf + + waiter <- ContT $ withAsync $ fix \next -> do + r <- race (pause t) (atomically $ readTQueue chuQ) + case r of + Left{} -> cancel worker + Right{} -> next + + result <- waitCatch worker + + void $ ContT $ bracket none $ const do + cancel waiter + atomically $ flushTQueue chuQ + expire @e key + + pure result + + callCC \exit2 -> do + + blk <- case eblk of + Right x -> pure x + Left{} -> exit2 (Left (PeerRequestTimeout peer)) let rs = LBS.concat $ IntMap.elems blk ha <- enqueueBlock sto rs - expire @e key + case ha of + Nothing -> pure $ Left StorageError - pure (ha, rs) + Just h1 | h1 == h -> do + pure $ Right rs - case what of - Nothing -> pure $ Left StorageError - - Just h1 | h1 == h -> do - pure $ Right rs - - Just h1 -> do - delBlock sto h1 - pure $ Left (PeerBlockHashMismatch peer) + Just h1 -> do + delBlock sto h1 + pure $ Left (PeerBlockHashMismatch peer) where