fix doc comment. use spawn_blocking in actix-files

This commit is contained in:
fakeshadow 2021-01-06 21:08:46 +08:00
parent 848a6ef64f
commit fa127b75a4
4 changed files with 22 additions and 42 deletions

View File

@ -8,17 +8,11 @@ use std::{
}; };
use actix_web::{ use actix_web::{
error::{BlockingError, Error}, error::{Error, ErrorInternalServerError},
web, rt::task::{spawn_blocking, JoinHandle},
}; };
use bytes::Bytes; use bytes::Bytes;
use futures_core::{ready, Stream}; use futures_core::{ready, Stream};
use futures_util::future::{FutureExt, LocalBoxFuture};
use crate::handle_error;
type ChunkedBoxFuture =
LocalBoxFuture<'static, Result<(File, Bytes), BlockingError<io::Error>>>;
#[doc(hidden)] #[doc(hidden)]
/// A helper created from a `std::fs::File` which reads the file /// A helper created from a `std::fs::File` which reads the file
@ -27,7 +21,7 @@ pub struct ChunkedReadFile {
pub(crate) size: u64, pub(crate) size: u64,
pub(crate) offset: u64, pub(crate) offset: u64,
pub(crate) file: Option<File>, pub(crate) file: Option<File>,
pub(crate) fut: Option<ChunkedBoxFuture>, pub(crate) fut: Option<JoinHandle<Result<(File, Bytes), io::Error>>>,
pub(crate) counter: u64, pub(crate) counter: u64,
} }
@ -45,18 +39,20 @@ impl Stream for ChunkedReadFile {
cx: &mut Context<'_>, cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> { ) -> Poll<Option<Self::Item>> {
if let Some(ref mut fut) = self.fut { if let Some(ref mut fut) = self.fut {
return match ready!(Pin::new(fut).poll(cx)) { let res = match ready!(Pin::new(fut).poll(cx)) {
Ok((file, bytes)) => { Ok(Ok((file, bytes))) => {
self.fut.take(); self.fut.take();
self.file = Some(file); self.file = Some(file);
self.offset += bytes.len() as u64; self.offset += bytes.len() as u64;
self.counter += bytes.len() as u64; self.counter += bytes.len() as u64;
Poll::Ready(Some(Ok(bytes))) Ok(bytes)
} }
Err(e) => Poll::Ready(Some(Err(handle_error(e)))), Ok(Err(e)) => Err(e.into()),
Err(_) => Err(ErrorInternalServerError("Unexpected error")),
}; };
return Poll::Ready(Some(res));
} }
let size = self.size; let size = self.size;
@ -68,25 +64,21 @@ impl Stream for ChunkedReadFile {
} else { } else {
let mut file = self.file.take().expect("Use after completion"); let mut file = self.file.take().expect("Use after completion");
self.fut = Some( self.fut = Some(spawn_blocking(move || {
web::block(move || { let max_bytes = cmp::min(size.saturating_sub(counter), 65_536) as usize;
let max_bytes =
cmp::min(size.saturating_sub(counter), 65_536) as usize;
let mut buf = Vec::with_capacity(max_bytes); let mut buf = Vec::with_capacity(max_bytes);
file.seek(io::SeekFrom::Start(offset))?; file.seek(io::SeekFrom::Start(offset))?;
let n_bytes = let n_bytes =
file.by_ref().take(max_bytes as u64).read_to_end(&mut buf)?; file.by_ref().take(max_bytes as u64).read_to_end(&mut buf)?;
if n_bytes == 0 { if n_bytes == 0 {
return Err(io::ErrorKind::UnexpectedEof.into()); return Err(io::ErrorKind::UnexpectedEof.into());
} }
Ok((file, Bytes::from(buf))) Ok((file, Bytes::from(buf)))
}) }));
.boxed_local(),
);
self.poll_next(cx) self.poll_next(cx)
} }

View File

@ -14,12 +14,10 @@
#![deny(rust_2018_idioms)] #![deny(rust_2018_idioms)]
#![warn(missing_docs, missing_debug_implementations)] #![warn(missing_docs, missing_debug_implementations)]
use std::io;
use actix_service::boxed::{BoxService, BoxServiceFactory}; use actix_service::boxed::{BoxService, BoxServiceFactory};
use actix_web::{ use actix_web::{
dev::{ServiceRequest, ServiceResponse}, dev::{ServiceRequest, ServiceResponse},
error::{BlockingError, Error, ErrorInternalServerError}, error::Error,
http::header::DispositionType, http::header::DispositionType,
}; };
use mime_guess::from_ext; use mime_guess::from_ext;
@ -56,13 +54,6 @@ pub fn file_extension_to_mime(ext: &str) -> mime::Mime {
from_ext(ext).first_or_octet_stream() from_ext(ext).first_or_octet_stream()
} }
pub(crate) fn handle_error(err: BlockingError<io::Error>) -> Error {
match err {
BlockingError::Error(err) => err.into(),
BlockingError::Canceled => ErrorInternalServerError("Unexpected error"),
}
}
type MimeOverride = dyn Fn(&mime::Name<'_>) -> DispositionType; type MimeOverride = dyn Fn(&mime::Name<'_>) -> DispositionType;
#[cfg(test)] #[cfg(test)]

View File

@ -300,8 +300,8 @@ impl From<httparse::Error> for ParseError {
} }
} }
#[derive(Debug, Display)]
/// A set of errors that can occur running blocking tasks in thread pool. /// A set of errors that can occur running blocking tasks in thread pool.
#[derive(Debug, Display)]
pub enum BlockingError<E: fmt::Debug> { pub enum BlockingError<E: fmt::Debug> {
#[display(fmt = "{:?}", _0)] #[display(fmt = "{:?}", _0)]
Error(E), Error(E),

View File

@ -280,9 +280,6 @@ where
I: Send + 'static, I: Send + 'static,
E: Send + std::fmt::Debug + 'static, E: Send + std::fmt::Debug + 'static,
{ {
// map error to BlockingError. this is for not breaking the existing api.
// preferably this should just return actix_rt::task::JoinHandle for a more
// flexible control of when the task is awaited and/or the abort(if needed).
match actix_rt::task::spawn_blocking(f).await { match actix_rt::task::spawn_blocking(f).await {
Ok(res) => res.map_err(BlockingError::Error), Ok(res) => res.map_err(BlockingError::Error),
Err(_) => Err(BlockingError::Canceled), Err(_) => Err(BlockingError::Canceled),