From fa127b75a40384f48c399a7fc4ff57d102e150a8 Mon Sep 17 00:00:00 2001 From: fakeshadow <24548779@qq.com> Date: Wed, 6 Jan 2021 21:08:46 +0800 Subject: [PATCH] fix doc comment. use spawn_blocking in actix-files --- actix-files/src/chunked.rs | 48 ++++++++++++++++---------------------- actix-files/src/lib.rs | 11 +-------- actix-http/src/error.rs | 2 +- src/web.rs | 3 --- 4 files changed, 22 insertions(+), 42 deletions(-) diff --git a/actix-files/src/chunked.rs b/actix-files/src/chunked.rs index 580b06787..5b7b17dc4 100644 --- a/actix-files/src/chunked.rs +++ b/actix-files/src/chunked.rs @@ -8,17 +8,11 @@ use std::{ }; use actix_web::{ - error::{BlockingError, Error}, - web, + error::{Error, ErrorInternalServerError}, + rt::task::{spawn_blocking, JoinHandle}, }; use bytes::Bytes; use futures_core::{ready, Stream}; -use futures_util::future::{FutureExt, LocalBoxFuture}; - -use crate::handle_error; - -type ChunkedBoxFuture = - LocalBoxFuture<'static, Result<(File, Bytes), BlockingError>>; #[doc(hidden)] /// A helper created from a `std::fs::File` which reads the file @@ -27,7 +21,7 @@ pub struct ChunkedReadFile { pub(crate) size: u64, pub(crate) offset: u64, pub(crate) file: Option, - pub(crate) fut: Option, + pub(crate) fut: Option>>, pub(crate) counter: u64, } @@ -45,18 +39,20 @@ impl Stream for ChunkedReadFile { cx: &mut Context<'_>, ) -> Poll> { if let Some(ref mut fut) = self.fut { - return match ready!(Pin::new(fut).poll(cx)) { - Ok((file, bytes)) => { + let res = match ready!(Pin::new(fut).poll(cx)) { + Ok(Ok((file, bytes))) => { self.fut.take(); self.file = Some(file); self.offset += bytes.len() as u64; self.counter += bytes.len() as u64; - Poll::Ready(Some(Ok(bytes))) + Ok(bytes) } - Err(e) => Poll::Ready(Some(Err(handle_error(e)))), + Ok(Err(e)) => Err(e.into()), + Err(_) => Err(ErrorInternalServerError("Unexpected error")), }; + return Poll::Ready(Some(res)); } let size = self.size; @@ -68,25 +64,21 @@ impl Stream for ChunkedReadFile { } else { let mut file = self.file.take().expect("Use after completion"); - self.fut = Some( - web::block(move || { - let max_bytes = - cmp::min(size.saturating_sub(counter), 65_536) as usize; + self.fut = Some(spawn_blocking(move || { + let max_bytes = cmp::min(size.saturating_sub(counter), 65_536) as usize; - let mut buf = Vec::with_capacity(max_bytes); - file.seek(io::SeekFrom::Start(offset))?; + let mut buf = Vec::with_capacity(max_bytes); + file.seek(io::SeekFrom::Start(offset))?; - let n_bytes = - file.by_ref().take(max_bytes as u64).read_to_end(&mut buf)?; + let n_bytes = + file.by_ref().take(max_bytes as u64).read_to_end(&mut buf)?; - if n_bytes == 0 { - return Err(io::ErrorKind::UnexpectedEof.into()); - } + if n_bytes == 0 { + return Err(io::ErrorKind::UnexpectedEof.into()); + } - Ok((file, Bytes::from(buf))) - }) - .boxed_local(), - ); + Ok((file, Bytes::from(buf))) + })); self.poll_next(cx) } diff --git a/actix-files/src/lib.rs b/actix-files/src/lib.rs index 662fba0a3..b7225fbc0 100644 --- a/actix-files/src/lib.rs +++ b/actix-files/src/lib.rs @@ -14,12 +14,10 @@ #![deny(rust_2018_idioms)] #![warn(missing_docs, missing_debug_implementations)] -use std::io; - use actix_service::boxed::{BoxService, BoxServiceFactory}; use actix_web::{ dev::{ServiceRequest, ServiceResponse}, - error::{BlockingError, Error, ErrorInternalServerError}, + error::Error, http::header::DispositionType, }; use mime_guess::from_ext; @@ -56,13 +54,6 @@ pub fn file_extension_to_mime(ext: &str) -> mime::Mime { from_ext(ext).first_or_octet_stream() } -pub(crate) fn handle_error(err: BlockingError) -> Error { - match err { - BlockingError::Error(err) => err.into(), - BlockingError::Canceled => ErrorInternalServerError("Unexpected error"), - } -} - type MimeOverride = dyn Fn(&mime::Name<'_>) -> DispositionType; #[cfg(test)] diff --git a/actix-http/src/error.rs b/actix-http/src/error.rs index df099790c..a99c5f3fa 100644 --- a/actix-http/src/error.rs +++ b/actix-http/src/error.rs @@ -300,8 +300,8 @@ impl From for ParseError { } } -#[derive(Debug, Display)] /// A set of errors that can occur running blocking tasks in thread pool. +#[derive(Debug, Display)] pub enum BlockingError { #[display(fmt = "{:?}", _0)] Error(E), diff --git a/src/web.rs b/src/web.rs index 2c0a9e26a..88071f551 100644 --- a/src/web.rs +++ b/src/web.rs @@ -280,9 +280,6 @@ where I: Send + 'static, E: Send + std::fmt::Debug + 'static, { - // map error to BlockingError. this is for not breaking the existing api. - // preferably this should just return actix_rt::task::JoinHandle for a more - // flexible control of when the task is awaited and/or the abort(if needed). match actix_rt::task::spawn_blocking(f).await { Ok(res) => res.map_err(BlockingError::Error), Err(_) => Err(BlockingError::Canceled),