diff --git a/hbs2-peer/app/RPC2.hs b/hbs2-peer/app/RPC2.hs index 832633f1..9913f9c9 100644 --- a/hbs2-peer/app/RPC2.hs +++ b/hbs2-peer/app/RPC2.hs @@ -59,6 +59,7 @@ import Data.HashSet (HashSet) import Data.HashSet qualified as HS import Data.IntMap qualified as IntMap import Data.IntMap (IntMap) +import Data.List.Split qualified as Split import Data.Text qualified as Text import Data.Either import Data.Maybe @@ -77,6 +78,7 @@ data DownloadError e = DownloadStuckError HashRef (Peer e) | StorageError | UnknownPeerError (Peer e) + | InternalError Int | PeerMissBlockError HashRef (Peer e) | PeerBlockHashMismatch (Peer e) | PeerRequestTimeout (Peer e) @@ -89,6 +91,7 @@ instance Pretty (Peer e) => Show (DownloadError e) where show (PeerRequestTimeout p) = show $ parens $ "PeerRequestTimeout" <+> pretty p show (PeerBlockHashMismatch p) = show $ parens $ "PeerBlockHashMismatch" <+> pretty p show StorageError = show "StorageError" + show (InternalError n) = show $ parens "InternalError" <+> pretty n instance (Typeable e, Pretty (Peer e)) => Exception (DownloadError e) @@ -170,18 +173,25 @@ downloadFromPeerRec :: forall e t cache m . ( e ~ L4Proto , BlockSizeCache e cache ) => Timeout t + -> Int -> cache -> PeerEnv e -> Hash HbSync -> Peer e -> m (Either (DownloadError e) ()) -downloadFromPeerRec t cache env h0 peer = do +downloadFromPeerRec t bu0 cache env h0 peer = do sto <- withPeerM env getStorage q <- newTQueueIO + done <- newTVarIO (mempty :: HashSet (Hash HbSync)) + atomically $ writeTQueue q h0 + + missed <- findMissedBlocks sto (HashRef h0) + + mapM_ (atomically . writeTQueue q . coerce) missed flip runContT pure $ callCC \exit -> do @@ -189,144 +199,58 @@ downloadFromPeerRec t cache env h0 peer = do mt <- atomically $ isEmptyTQueue q - when mt do - missed <- findMissedBlocks sto (HashRef h0) - mapM_ (atomically . writeTQueue q) missed - - mt <- atomically $ isEmptyTQueue q - when mt $ exit $ Right () h <- atomically $ readTQueue q - w <- lift $ downloadFromPeer (TimeoutSec 5) cache env (coerce h) peer + here1 <- readTVarIO done <&> HS.member h + + here2 <- hasBlock sto (coerce h) <&> isJust + + when (here1 || here2) next + + w <- lift $ downloadFromPeer (TimeoutSec 5) bu0 cache env (coerce h) peer case w of Right bs -> do - h <- enqueueBlock sto bs - pause @'Seconds 0.25 + h' <- enqueueBlock sto bs + h3 <- ContT $ maybe1 h' (pure $ Left StorageError) + let refs = extractBlockRefs h3 bs + atomically $ mapM_ (writeTQueue q . coerce) refs next Left e -> do - err $ "DOWNLOAD ERROR" <+> pretty h + err $ "DOWNLOAD ERROR" <+> viaShow e -- pause @'Seconds 0.25 next + missed <- findMissedBlocks sto (HashRef h0) + + unless (L.null missed) do + mapM_ (atomically . writeTQueue q . coerce) missed + next + pure $ Right () - -- w <- newTVarIO (mempty :: HashSet (Hash HbSync) ) - -- q <- newTQueueIO - -- p <- newTQueueIO - - -- timeouts <- newTVarIO 0 - - -- sto <- withPeerM env getStorage - - -- let addBlocks hx = atomically do - -- for_ hx $ \h -> do - -- writeTQueue q h - -- -- here <- readTVar w <&> HS.member h - -- -- unless here do - -- -- modifyTVar w (HS.insert h) - - -- flip runContT pure do - - -- atomically $ writeTQueue q h0 - - -- callCC \exit1 -> do - - -- void $ ContT $ withAsync $ forever do - -- atomically do - -- (hx,bs) <- readTQueue p - -- let refs = extractBlockRefs hx bs - -- mapM (writeTQueue q) refs - - -- fix \next -> do - - -- h <- atomically do - -- h1 <- tryReadTQueue q - -- e <- isEmptyTQueue p - - -- case h1 of - -- Just x -> pure (Just x) - -- Nothing | e -> pure Nothing - -- | otherwise -> retry - - -- r <- case h of - -- Nothing -> exit1 okay - -- Just b -> do - -- debug $ "BLOCK TO DOWNLOAD" <+> pretty b - -- missed <- findMissedBlocks sto (HashRef b) - -- addBlocks (fmap coerce missed) - -- blk <- getBlock sto b - -- case blk of - -- Just bs -> pure (Right (b,bs)) - -- Nothing -> do - -- debug $ "GO DOWNLOAD" <+> pretty b - -- w <- lift (downloadFromPeer (TimeoutSec 20) cache env b peer) - - -- when (isLeft w) do - -- addBlocks [b] - - -- pure $ fmap (b,) w - - -- case r of - -- Left (PeerRequestTimeout{}) -> do - -- debug "DOWNLOAD STUCK!" - -- checkTimeout timeouts next (exit1 (Left $ DownloadStuckError (HashRef h0) peer)) - - -- Left (DownloadStuckError he pe) -> do - -- checkTimeout timeouts next (exit1 (Left $ DownloadStuckError he pe)) - - -- Left e -> exit1 (Left e) - - -- Right (hx,bs) -> do - -- resetTimeouts timeouts - -- let refs = extractBlockRefs hx bs - - -- for_ refs $ \z -> do - -- debug $ "PARSED REF" <+> pretty z - - -- atomically $ mapM (writeTQueue q) refs - -- pause @'Seconds 0.01 - -- -- atomically $ writeTQueue p (hx,bs) - -- next - - -- pure okay - - -- where - - -- resetTimeouts timeouts = atomically $ writeTVar timeouts 0 - - -- checkTimeout timeouts n e = do - -- tn <- atomically do - -- modifyTVar timeouts succ - -- readTVar timeouts - - -- if tn < 10 then n else e - - - -- okay = Right () - - downloadFromPeer :: forall e t cache m . ( e ~ L4Proto , MonadUnliftIO m , IsTimeout t , BlockSizeCache e cache ) => Timeout t + -> Int -> cache -> PeerEnv e -> Hash HbSync -> Peer e -> m (Either (DownloadError e) ByteString) -downloadFromPeer t cache env h peer = liftIO $ withPeerM env do +downloadFromPeer t bu cache env h peer = liftIO $ withPeerM env do pd@PeerData{..} <- find (KnownPeerKey peer) id >>= orThrow (UnknownPeerError peer) - sto <- liftIO $ withPeerM env $ getStorage + sto <- getStorage let chunkSize = defChunkSize @@ -343,46 +267,39 @@ downloadFromPeer t cache env h peer = liftIO $ withPeerM env do . set sBlockSize (fromIntegral size) $ down - eblk <- liftIO $ withPeerM env do - update @e new key id + lift $ update @e new key id - request peer (BlockChunks @e coo (BlockGetAllChunks h chunkSize)) + let offsets = calcChunks size (fromIntegral chunkSize) :: [(Offset, Size)] - let total = L.length $ calcChunks size (fromIntegral chunkSize) + let chunkNums = [ 0 .. pred (length offsets) ] - flip runContT pure do + let bursts = calcBursts bu chunkNums + + callCC $ \exit2 -> do + + for_ bursts $ \(i,chunkN) -> do + let req = BlockChunks @e coo (BlockGetChunks h chunkSize (fromIntegral i) (fromIntegral chunkN)) + lift $ request peer req + + r <- liftIO $ race (pause t) do - worker <- ContT $ withAsync do atomically do - wtf <- readTVar _sBlockChunks2 - unless (IntMap.size wtf >= total) retry - pure wtf + -- readTQueue chuQ + pieces <- readTVar _sBlockChunks2 + let done = and [ IntMap.member j pieces | j <- [i .. i + chunkN-1] ] + unless done retry - waiter <- ContT $ withAsync $ fix \next -> do - r <- race (pause t) (atomically $ readTQueue chuQ) - case r of - Left{} -> cancel worker - Right{} -> next + atomically $ flushTQueue chuQ - result <- waitCatch worker - - void $ ContT $ bracket none $ const do - cancel waiter - atomically $ flushTQueue chuQ - expire @e key - - pure result - - callCC \exit2 -> do - - blk <- case eblk of - Right x -> pure x - Left{} -> exit2 (Left (PeerRequestTimeout peer)) + either (const $ exit2 (Left $ DownloadStuckError (HashRef h) peer) ) pure r + blk <- readTVarIO _sBlockChunks2 let rs = LBS.concat $ IntMap.elems blk ha <- enqueueBlock sto rs + lift $ expire @e key + case ha of Nothing -> pure $ Left StorageError @@ -442,26 +359,32 @@ instance (e ~ L4Proto, MonadUnliftIO m, HasRpcContext PeerAPI RPC2Context m) => _ -> pure nil + entry $ bindMatch "query-block-from-peer:rec" $ \syn -> do - entry $ bindMatch "query-block-from-peer:rec" \case - [ HashLike blk, StringLike addr ] -> flip runContT pure $ callCC \exit -> do + flip runContT pure $ callCC \exit -> do - peer' <- liftIO $ try @_ @SomeException do - let pa = fromString @(PeerAddr L4Proto) addr - fromPeerAddr pa + (blk,addr,bu0) <- case syn of + [ HashLike blk, StringLike addr ] -> pure (blk, addr, 4) + [ HashLike blk, StringLike addr, LitIntVal x ] -> pure (blk, addr, fromIntegral x) - peer <- either (const $ exit (mkSym "error:invalid-address")) pure peer' + _ -> exit nil - what <- lift $ downloadFromPeerRec defChunkWaitMax rpcBrains rpcPeerEnv (coerce blk) peer + callCC \exit2 -> do - case what of - Left e -> pure $ mkList @C [ mkSym "error" , mkStr (show e) ] - Right{} -> pure $ mkList @C [ mkSym "okay" ] + peer' <- liftIO $ try @_ @SomeException do + let pa = fromString @(PeerAddr L4Proto) addr + fromPeerAddr pa - _ -> pure nil + peer <- either (const $ exit2 (mkSym "error:invalid-address")) pure peer' + + what <- lift $ downloadFromPeerRec defChunkWaitMax bu0 rpcBrains rpcPeerEnv (coerce blk) peer + + case what of + Left e -> pure $ mkList @C [ mkSym "error" , mkStr (show e) ] + Right{} -> pure $ mkList @C [ mkSym "okay" ] entry $ bindMatch "query-block-from-peer" \case - [ HashLike blk, StringLike addr ] -> flip runContT pure $ callCC \exit -> do + ( HashLike blk :StringLike addr : opts) -> flip runContT pure $ callCC \exit -> do peer' <- liftIO $ try @_ @SomeException do let pa = fromString @(PeerAddr L4Proto) addr @@ -469,7 +392,7 @@ instance (e ~ L4Proto, MonadUnliftIO m, HasRpcContext PeerAPI RPC2Context m) => peer <- either (const $ exit (mkSym "error:invalid-address")) pure peer' - what <- lift $ downloadFromPeer defChunkWaitMax rpcBrains rpcPeerEnv (coerce blk) peer + what <- lift $ downloadFromPeer defChunkWaitMax 4 rpcBrains rpcPeerEnv (coerce blk) peer case what of Left e -> pure $ mkList @C [ mkSym "error" , mkStr (show e) ]