merge master

This commit is contained in:
fakeshadow 2021-02-06 10:17:12 -08:00
parent 44c48a43b7
commit 24228b89d0
9 changed files with 89 additions and 105 deletions

View File

@ -18,6 +18,7 @@
### Fixed ### Fixed
* Multiple calls `App::data` with the same type now keeps the latest call's data. [#1906] * Multiple calls `App::data` with the same type now keeps the latest call's data. [#1906]
* `web::block` accept any closure that has an output bound to `Send` and `'static`. [#1957]
### Fixed ### Fixed
* Multiple calls `App::data` with the same type now keeps the latest call's data. [#1906] * Multiple calls `App::data` with the same type now keeps the latest call's data. [#1906]
@ -36,6 +37,7 @@
[#1869]: https://github.com/actix/actix-web/pull/1869 [#1869]: https://github.com/actix/actix-web/pull/1869
[#1905]: https://github.com/actix/actix-web/pull/1905 [#1905]: https://github.com/actix/actix-web/pull/1905
[#1906]: https://github.com/actix/actix-web/pull/1906 [#1906]: https://github.com/actix/actix-web/pull/1906
[#1957]: https://github.com/actix/actix-web/pull/1957
## 4.0.0-beta.1 - 2021-01-07 ## 4.0.0-beta.1 - 2021-01-07

View File

@ -8,7 +8,7 @@ use std::{
}; };
use actix_web::{ use actix_web::{
error::{Error, ErrorInternalServerError}, error::{BlockingError, Error},
rt::task::{spawn_blocking, JoinHandle}, rt::task::{spawn_blocking, JoinHandle},
}; };
use bytes::Bytes; use bytes::Bytes;
@ -18,11 +18,26 @@ use futures_core::{ready, Stream};
/// A helper created from a `std::fs::File` which reads the file /// A helper created from a `std::fs::File` which reads the file
/// chunk-by-chunk on a `ThreadPool`. /// chunk-by-chunk on a `ThreadPool`.
pub struct ChunkedReadFile { pub struct ChunkedReadFile {
pub(crate) size: u64, size: u64,
pub(crate) offset: u64, offset: u64,
pub(crate) file: Option<File>, state: ChunkedReadFileState,
pub(crate) fut: Option<JoinHandle<Result<(File, Bytes), io::Error>>>, counter: u64,
pub(crate) counter: u64, }
enum ChunkedReadFileState {
File(Option<File>),
Future(JoinHandle<Result<(File, Bytes), io::Error>>),
}
impl ChunkedReadFile {
pub(crate) fn new(size: u64, offset: u64, file: File) -> Self {
Self {
size,
offset,
state: ChunkedReadFileState::File(Some(file)),
counter: 0,
}
}
} }
impl fmt::Debug for ChunkedReadFile { impl fmt::Debug for ChunkedReadFile {
@ -38,49 +53,52 @@ impl Stream for ChunkedReadFile {
mut self: Pin<&mut Self>, mut self: Pin<&mut Self>,
cx: &mut Context<'_>, cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> { ) -> Poll<Option<Self::Item>> {
if let Some(ref mut fut) = self.fut { let this = self.as_mut().get_mut();
let res = match ready!(Pin::new(fut).poll(cx)) { match this.state {
Ok(Ok((file, bytes))) => { ChunkedReadFileState::File(ref mut file) => {
self.fut.take(); let size = this.size;
self.file = Some(file); let offset = this.offset;
let counter = this.counter;
self.offset += bytes.len() as u64; if size == counter {
self.counter += bytes.len() as u64; Poll::Ready(None)
} else {
let mut file = file
.take()
.expect("ChunkedReadFile polled after completion");
Ok(bytes) let fut = spawn_blocking(move || {
let max_bytes =
cmp::min(size.saturating_sub(counter), 65_536) as usize;
let mut buf = Vec::with_capacity(max_bytes);
file.seek(io::SeekFrom::Start(offset))?;
let n_bytes = file
.by_ref()
.take(max_bytes as u64)
.read_to_end(&mut buf)?;
if n_bytes == 0 {
return Err(io::ErrorKind::UnexpectedEof.into());
}
Ok((file, Bytes::from(buf)))
});
this.state = ChunkedReadFileState::Future(fut);
self.poll_next(cx)
} }
Ok(Err(e)) => Err(e.into()), }
Err(_) => Err(ErrorInternalServerError("Unexpected error")), ChunkedReadFileState::Future(ref mut fut) => {
}; let (file, bytes) =
return Poll::Ready(Some(res)); ready!(Pin::new(fut).poll(cx)).map_err(|_| BlockingError)??;
} this.state = ChunkedReadFileState::File(Some(file));
let size = self.size; this.offset += bytes.len() as u64;
let offset = self.offset; this.counter += bytes.len() as u64;
let counter = self.counter;
if size == counter { Poll::Ready(Some(Ok(bytes)))
Poll::Ready(None) }
} else {
let mut file = self.file.take().expect("Use after completion");
self.fut = Some(spawn_blocking(move || {
let max_bytes = cmp::min(size.saturating_sub(counter), 65_536) as usize;
let mut buf = Vec::with_capacity(max_bytes);
file.seek(io::SeekFrom::Start(offset))?;
let n_bytes =
file.by_ref().take(max_bytes as u64).read_to_end(&mut buf)?;
if n_bytes == 0 {
return Err(io::ErrorKind::UnexpectedEof.into());
}
Ok((file, Bytes::from(buf)))
}));
self.poll_next(cx)
} }
} }
} }

View File

@ -298,13 +298,7 @@ impl NamedFile {
res.encoding(current_encoding); res.encoding(current_encoding);
} }
let reader = ChunkedReadFile { let reader = ChunkedReadFile::new(self.md.len(), 0, self.file);
size: self.md.len(),
offset: 0,
file: Some(self.file),
fut: None,
counter: 0,
};
return res.streaming(reader); return res.streaming(reader);
} }
@ -426,13 +420,7 @@ impl NamedFile {
return resp.status(StatusCode::NOT_MODIFIED).finish(); return resp.status(StatusCode::NOT_MODIFIED).finish();
} }
let reader = ChunkedReadFile { let reader = ChunkedReadFile::new(length, offset, self.file);
offset,
size: length,
file: Some(self.file),
fut: None,
counter: 0,
};
if offset != 0 || length != self.md.len() { if offset != 0 || length != self.md.len() {
resp.status(StatusCode::PARTIAL_CONTENT); resp.status(StatusCode::PARTIAL_CONTENT);

View File

@ -18,6 +18,7 @@
reference. [#1903] reference. [#1903]
* `client::error::ConnectError` Resolver variant contains `Box<dyn std::error::Error>` type [#1905] * `client::error::ConnectError` Resolver variant contains `Box<dyn std::error::Error>` type [#1905]
* `client::ConnectorConfig` default timeout changed to 5 seconds. [#1905] * `client::ConnectorConfig` default timeout changed to 5 seconds. [#1905]
* Simplify `BlockingError` type to a struct. It's only triggered with blocking thread pool is dead. [#1957]
### Removed ### Removed
* `ResponseBuilder::set`; use `ResponseBuilder::insert_header`. [#1869] * `ResponseBuilder::set`; use `ResponseBuilder::insert_header`. [#1869]
@ -30,8 +31,9 @@
[#1894]: https://github.com/actix/actix-web/pull/1894 [#1894]: https://github.com/actix/actix-web/pull/1894
[#1903]: https://github.com/actix/actix-web/pull/1903 [#1903]: https://github.com/actix/actix-web/pull/1903
[#1904]: https://github.com/actix/actix-web/pull/1904 [#1904]: https://github.com/actix/actix-web/pull/1904
[#1905]: https://github.com/actix/actix-web/pull/190 [#1905]: https://github.com/actix/actix-web/pull/1905
[#1912]: https://github.com/actix/actix-web/pull/1912 [#1912]: https://github.com/actix/actix-web/pull/1912
[#1957]: https://github.com/actix/actix-web/pull/1957
## 3.0.0-beta.1 - 2021-01-07 ## 3.0.0-beta.1 - 2021-01-07

View File

@ -79,15 +79,8 @@ where
) -> Poll<Option<Self::Item>> { ) -> Poll<Option<Self::Item>> {
loop { loop {
if let Some(ref mut fut) = self.fut { if let Some(ref mut fut) = self.fut {
let (chunk, decoder) = match ready!(Pin::new(fut).poll(cx)) { let (chunk, decoder) =
Ok(Ok(item)) => item, ready!(Pin::new(fut).poll(cx)).map_err(|_| BlockingError)??;
Ok(Err(e)) => {
return Poll::Ready(Some(Err(BlockingError::Error(e).into())))
}
Err(_) => {
return Poll::Ready(Some(Err(BlockingError::Canceled.into())))
}
};
self.decoder = Some(decoder); self.decoder = Some(decoder);
self.fut.take(); self.fut.take();
if let Some(chunk) = chunk { if let Some(chunk) = chunk {

View File

@ -136,17 +136,8 @@ impl<B: MessageBody> MessageBody for Encoder<B> {
} }
if let Some(ref mut fut) = this.fut { if let Some(ref mut fut) = this.fut {
let mut encoder = match ready!(Pin::new(fut).poll(cx)) { let mut encoder =
Ok(Ok(item)) => item, ready!(Pin::new(fut).poll(cx)).map_err(|_| BlockingError)??;
Ok(Err(e)) => {
return Poll::Ready(Some(Err(BlockingError::Error(e).into())))
}
Err(_) => {
return Poll::Ready(Some(Err(
BlockingError::<io::Error>::Canceled.into(),
)))
}
};
let chunk = encoder.take(); let chunk = encoder.take();
*this.encoder = Some(encoder); *this.encoder = Some(encoder);
this.fut.take(); this.fut.take();

View File

@ -297,17 +297,13 @@ impl From<httparse::Error> for ParseError {
/// A set of errors that can occur running blocking tasks in thread pool. /// A set of errors that can occur running blocking tasks in thread pool.
#[derive(Debug, Display)] #[derive(Debug, Display)]
pub enum BlockingError<E: fmt::Debug> { #[display(fmt = "Blocking thread pool is gone")]
#[display(fmt = "{:?}", _0)] pub struct BlockingError;
Error(E),
#[display(fmt = "Thread pool is gone")]
Canceled,
}
impl<E: fmt::Debug> std::error::Error for BlockingError<E> {} impl std::error::Error for BlockingError {}
/// `InternalServerError` for `BlockingError` /// `InternalServerError` for `BlockingError`
impl<E: fmt::Debug> ResponseError for BlockingError<E> {} impl ResponseError for BlockingError {}
#[derive(Display, Debug)] #[derive(Display, Debug)]
/// A set of errors that can occur during payload parsing /// A set of errors that can occur during payload parsing
@ -372,15 +368,12 @@ impl From<io::Error> for PayloadError {
} }
} }
impl From<BlockingError<io::Error>> for PayloadError { impl From<BlockingError> for PayloadError {
fn from(err: BlockingError<io::Error>) -> Self { fn from(_: BlockingError) -> Self {
match err { PayloadError::Io(io::Error::new(
BlockingError::Error(e) => PayloadError::Io(e), io::ErrorKind::Other,
BlockingError::Canceled => PayloadError::Io(io::Error::new( "Operation is canceled",
io::ErrorKind::Other, ))
"Operation is canceled",
)),
}
} }
} }

View File

@ -1213,7 +1213,7 @@ mod tests {
match res { match res {
Ok(value) => Ok(HttpResponse::Ok() Ok(value) => Ok(HttpResponse::Ok()
.content_type("text/plain") .content_type("text/plain")
.body(format!("Async with block value: {}", value))), .body(format!("Async with block value: {:?}", value))),
Err(_) => panic!("Unexpected"), Err(_) => panic!("Unexpected"),
} }
} }

View File

@ -274,14 +274,11 @@ pub fn service<T: IntoPattern>(path: T) -> WebService {
/// Execute blocking function on a thread pool, returns future that resolves /// Execute blocking function on a thread pool, returns future that resolves
/// to result of the function execution. /// to result of the function execution.
pub async fn block<F, I, E>(f: F) -> Result<I, BlockingError<E>> pub fn block<F, R>(f: F) -> impl Future<Output = Result<R, BlockingError>>
where where
F: FnOnce() -> Result<I, E> + Send + 'static, F: FnOnce() -> R + Send + 'static,
I: Send + 'static, R: Send + 'static,
E: Send + std::fmt::Debug + 'static,
{ {
match actix_rt::task::spawn_blocking(f).await { let fut = actix_rt::task::spawn_blocking(f);
Ok(res) => res.map_err(BlockingError::Error), async { fut.await.map_err(|_| BlockingError) }
Err(_) => Err(BlockingError::Canceled),
}
} }