From 24228b89d0d26eef960c64a8a518b60b46c6a453 Mon Sep 17 00:00:00 2001 From: fakeshadow <24548779@qq.com> Date: Sat, 6 Feb 2021 10:17:12 -0800 Subject: [PATCH] merge master --- CHANGES.md | 2 + actix-files/src/chunked.rs | 106 +++++++++++++++++------------ actix-files/src/named.rs | 16 +---- actix-http/CHANGES.md | 4 +- actix-http/src/encoding/decoder.rs | 11 +-- actix-http/src/encoding/encoder.rs | 13 +--- actix-http/src/error.rs | 27 +++----- src/test.rs | 2 +- src/web.rs | 13 ++-- 9 files changed, 89 insertions(+), 105 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 562d0f6d3..2416d4d08 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -18,6 +18,7 @@ ### Fixed * Multiple calls `App::data` with the same type now keeps the latest call's data. [#1906] +* `web::block` accept any closure that has an output bound to `Send` and `'static`. [#1957] ### Fixed * Multiple calls `App::data` with the same type now keeps the latest call's data. [#1906] @@ -36,6 +37,7 @@ [#1869]: https://github.com/actix/actix-web/pull/1869 [#1905]: https://github.com/actix/actix-web/pull/1905 [#1906]: https://github.com/actix/actix-web/pull/1906 +[#1957]: https://github.com/actix/actix-web/pull/1957 ## 4.0.0-beta.1 - 2021-01-07 diff --git a/actix-files/src/chunked.rs b/actix-files/src/chunked.rs index 5b7b17dc4..2a62b1d26 100644 --- a/actix-files/src/chunked.rs +++ b/actix-files/src/chunked.rs @@ -8,7 +8,7 @@ use std::{ }; use actix_web::{ - error::{Error, ErrorInternalServerError}, + error::{BlockingError, Error}, rt::task::{spawn_blocking, JoinHandle}, }; use bytes::Bytes; @@ -18,11 +18,26 @@ use futures_core::{ready, Stream}; /// A helper created from a `std::fs::File` which reads the file /// chunk-by-chunk on a `ThreadPool`. pub struct ChunkedReadFile { - pub(crate) size: u64, - pub(crate) offset: u64, - pub(crate) file: Option, - pub(crate) fut: Option>>, - pub(crate) counter: u64, + size: u64, + offset: u64, + state: ChunkedReadFileState, + counter: u64, +} + +enum ChunkedReadFileState { + File(Option), + Future(JoinHandle>), +} + +impl ChunkedReadFile { + pub(crate) fn new(size: u64, offset: u64, file: File) -> Self { + Self { + size, + offset, + state: ChunkedReadFileState::File(Some(file)), + counter: 0, + } + } } impl fmt::Debug for ChunkedReadFile { @@ -38,49 +53,52 @@ impl Stream for ChunkedReadFile { mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll> { - if let Some(ref mut fut) = self.fut { - let res = match ready!(Pin::new(fut).poll(cx)) { - Ok(Ok((file, bytes))) => { - self.fut.take(); - self.file = Some(file); + let this = self.as_mut().get_mut(); + match this.state { + ChunkedReadFileState::File(ref mut file) => { + let size = this.size; + let offset = this.offset; + let counter = this.counter; - self.offset += bytes.len() as u64; - self.counter += bytes.len() as u64; + if size == counter { + Poll::Ready(None) + } else { + let mut file = file + .take() + .expect("ChunkedReadFile polled after completion"); - Ok(bytes) + let fut = spawn_blocking(move || { + let max_bytes = + cmp::min(size.saturating_sub(counter), 65_536) as usize; + + let mut buf = Vec::with_capacity(max_bytes); + file.seek(io::SeekFrom::Start(offset))?; + + let n_bytes = file + .by_ref() + .take(max_bytes as u64) + .read_to_end(&mut buf)?; + + if n_bytes == 0 { + return Err(io::ErrorKind::UnexpectedEof.into()); + } + + Ok((file, Bytes::from(buf))) + }); + this.state = ChunkedReadFileState::Future(fut); + self.poll_next(cx) } - Ok(Err(e)) => Err(e.into()), - Err(_) => Err(ErrorInternalServerError("Unexpected error")), - }; - return Poll::Ready(Some(res)); - } + } + ChunkedReadFileState::Future(ref mut fut) => { + let (file, bytes) = + ready!(Pin::new(fut).poll(cx)).map_err(|_| BlockingError)??; + this.state = ChunkedReadFileState::File(Some(file)); - let size = self.size; - let offset = self.offset; - let counter = self.counter; + this.offset += bytes.len() as u64; + this.counter += bytes.len() as u64; - if size == counter { - Poll::Ready(None) - } else { - let mut file = self.file.take().expect("Use after completion"); - - self.fut = Some(spawn_blocking(move || { - let max_bytes = cmp::min(size.saturating_sub(counter), 65_536) as usize; - - let mut buf = Vec::with_capacity(max_bytes); - file.seek(io::SeekFrom::Start(offset))?; - - let n_bytes = - file.by_ref().take(max_bytes as u64).read_to_end(&mut buf)?; - - if n_bytes == 0 { - return Err(io::ErrorKind::UnexpectedEof.into()); - } - - Ok((file, Bytes::from(buf))) - })); - - self.poll_next(cx) + Poll::Ready(Some(Ok(bytes))) + } } } } diff --git a/actix-files/src/named.rs b/actix-files/src/named.rs index 8cd2a23f9..6fa3f7c6c 100644 --- a/actix-files/src/named.rs +++ b/actix-files/src/named.rs @@ -298,13 +298,7 @@ impl NamedFile { res.encoding(current_encoding); } - let reader = ChunkedReadFile { - size: self.md.len(), - offset: 0, - file: Some(self.file), - fut: None, - counter: 0, - }; + let reader = ChunkedReadFile::new(self.md.len(), 0, self.file); return res.streaming(reader); } @@ -426,13 +420,7 @@ impl NamedFile { return resp.status(StatusCode::NOT_MODIFIED).finish(); } - let reader = ChunkedReadFile { - offset, - size: length, - file: Some(self.file), - fut: None, - counter: 0, - }; + let reader = ChunkedReadFile::new(length, offset, self.file); if offset != 0 || length != self.md.len() { resp.status(StatusCode::PARTIAL_CONTENT); diff --git a/actix-http/CHANGES.md b/actix-http/CHANGES.md index 461929c82..52980fb5d 100644 --- a/actix-http/CHANGES.md +++ b/actix-http/CHANGES.md @@ -18,6 +18,7 @@ reference. [#1903] * `client::error::ConnectError` Resolver variant contains `Box` type [#1905] * `client::ConnectorConfig` default timeout changed to 5 seconds. [#1905] +* Simplify `BlockingError` type to a struct. It's only triggered with blocking thread pool is dead. [#1957] ### Removed * `ResponseBuilder::set`; use `ResponseBuilder::insert_header`. [#1869] @@ -30,8 +31,9 @@ [#1894]: https://github.com/actix/actix-web/pull/1894 [#1903]: https://github.com/actix/actix-web/pull/1903 [#1904]: https://github.com/actix/actix-web/pull/1904 -[#1905]: https://github.com/actix/actix-web/pull/190 +[#1905]: https://github.com/actix/actix-web/pull/1905 [#1912]: https://github.com/actix/actix-web/pull/1912 +[#1957]: https://github.com/actix/actix-web/pull/1957 ## 3.0.0-beta.1 - 2021-01-07 diff --git a/actix-http/src/encoding/decoder.rs b/actix-http/src/encoding/decoder.rs index b26609911..2cf2f6e03 100644 --- a/actix-http/src/encoding/decoder.rs +++ b/actix-http/src/encoding/decoder.rs @@ -79,15 +79,8 @@ where ) -> Poll> { loop { if let Some(ref mut fut) = self.fut { - let (chunk, decoder) = match ready!(Pin::new(fut).poll(cx)) { - Ok(Ok(item)) => item, - Ok(Err(e)) => { - return Poll::Ready(Some(Err(BlockingError::Error(e).into()))) - } - Err(_) => { - return Poll::Ready(Some(Err(BlockingError::Canceled.into()))) - } - }; + let (chunk, decoder) = + ready!(Pin::new(fut).poll(cx)).map_err(|_| BlockingError)??; self.decoder = Some(decoder); self.fut.take(); if let Some(chunk) = chunk { diff --git a/actix-http/src/encoding/encoder.rs b/actix-http/src/encoding/encoder.rs index 28c757076..1d4a8e933 100644 --- a/actix-http/src/encoding/encoder.rs +++ b/actix-http/src/encoding/encoder.rs @@ -136,17 +136,8 @@ impl MessageBody for Encoder { } if let Some(ref mut fut) = this.fut { - let mut encoder = match ready!(Pin::new(fut).poll(cx)) { - Ok(Ok(item)) => item, - Ok(Err(e)) => { - return Poll::Ready(Some(Err(BlockingError::Error(e).into()))) - } - Err(_) => { - return Poll::Ready(Some(Err( - BlockingError::::Canceled.into(), - ))) - } - }; + let mut encoder = + ready!(Pin::new(fut).poll(cx)).map_err(|_| BlockingError)??; let chunk = encoder.take(); *this.encoder = Some(encoder); this.fut.take(); diff --git a/actix-http/src/error.rs b/actix-http/src/error.rs index 9ff154240..28697cbbf 100644 --- a/actix-http/src/error.rs +++ b/actix-http/src/error.rs @@ -297,17 +297,13 @@ impl From for ParseError { /// A set of errors that can occur running blocking tasks in thread pool. #[derive(Debug, Display)] -pub enum BlockingError { - #[display(fmt = "{:?}", _0)] - Error(E), - #[display(fmt = "Thread pool is gone")] - Canceled, -} +#[display(fmt = "Blocking thread pool is gone")] +pub struct BlockingError; -impl std::error::Error for BlockingError {} +impl std::error::Error for BlockingError {} /// `InternalServerError` for `BlockingError` -impl ResponseError for BlockingError {} +impl ResponseError for BlockingError {} #[derive(Display, Debug)] /// A set of errors that can occur during payload parsing @@ -372,15 +368,12 @@ impl From for PayloadError { } } -impl From> for PayloadError { - fn from(err: BlockingError) -> Self { - match err { - BlockingError::Error(e) => PayloadError::Io(e), - BlockingError::Canceled => PayloadError::Io(io::Error::new( - io::ErrorKind::Other, - "Operation is canceled", - )), - } +impl From for PayloadError { + fn from(_: BlockingError) -> Self { + PayloadError::Io(io::Error::new( + io::ErrorKind::Other, + "Operation is canceled", + )) } } diff --git a/src/test.rs b/src/test.rs index d51017b1e..3acb520ff 100644 --- a/src/test.rs +++ b/src/test.rs @@ -1213,7 +1213,7 @@ mod tests { match res { Ok(value) => Ok(HttpResponse::Ok() .content_type("text/plain") - .body(format!("Async with block value: {}", value))), + .body(format!("Async with block value: {:?}", value))), Err(_) => panic!("Unexpected"), } } diff --git a/src/web.rs b/src/web.rs index 88071f551..3b4475b63 100644 --- a/src/web.rs +++ b/src/web.rs @@ -274,14 +274,11 @@ pub fn service(path: T) -> WebService { /// Execute blocking function on a thread pool, returns future that resolves /// to result of the function execution. -pub async fn block(f: F) -> Result> +pub fn block(f: F) -> impl Future> where - F: FnOnce() -> Result + Send + 'static, - I: Send + 'static, - E: Send + std::fmt::Debug + 'static, + F: FnOnce() -> R + Send + 'static, + R: Send + 'static, { - match actix_rt::task::spawn_blocking(f).await { - Ok(res) => res.map_err(BlockingError::Error), - Err(_) => Err(BlockingError::Canceled), - } + let fut = actix_rt::task::spawn_blocking(f); + async { fut.await.map_err(|_| BlockingError) } }