move chunked structures

This commit is contained in:
Rob Ede 2020-09-15 23:42:08 +01:00
parent dc572f7825
commit 69595439f6
No known key found for this signature in database
GPG Key ID: C2A3B36E841A91E6
2 changed files with 111 additions and 89 deletions

View File

@ -0,0 +1,81 @@
use std::{
cmp,
fs::File,
future::Future,
io::{self, Read, Seek},
pin::Pin,
task::{Context, Poll},
};
use actix_web::{
error::{BlockingError, Error},
web,
};
use bytes::Bytes;
use futures_core::Stream;
use futures_util::future::{FutureExt, LocalBoxFuture};
use crate::handle_error;
type ChunkedBoxFuture =
LocalBoxFuture<'static, Result<(File, Bytes), BlockingError<io::Error>>>;
#[doc(hidden)]
/// A helper created from a `std::fs::File` which reads the file
/// chunk-by-chunk on a `ThreadPool`.
pub struct ChunkedReadFile {
pub(crate) size: u64,
pub(crate) offset: u64,
pub(crate) file: Option<File>,
pub(crate) fut: Option<ChunkedBoxFuture>,
pub(crate) counter: u64,
}
impl Stream for ChunkedReadFile {
type Item = Result<Bytes, Error>;
fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
if let Some(ref mut fut) = self.fut {
return match Pin::new(fut).poll(cx) {
Poll::Ready(Ok((file, bytes))) => {
self.fut.take();
self.file = Some(file);
self.offset += bytes.len() as u64;
self.counter += bytes.len() as u64;
Poll::Ready(Some(Ok(bytes)))
}
Poll::Ready(Err(e)) => Poll::Ready(Some(Err(handle_error(e)))),
Poll::Pending => Poll::Pending,
};
}
let size = self.size;
let offset = self.offset;
let counter = self.counter;
if size == counter {
Poll::Ready(None)
} else {
let mut file = self.file.take().expect("Use after completion");
self.fut = Some(
web::block(move || {
let max_bytes: usize;
max_bytes = cmp::min(size.saturating_sub(counter), 65_536) as usize;
let mut buf = Vec::with_capacity(max_bytes);
file.seek(io::SeekFrom::Start(offset))?;
let nbytes =
file.by_ref().take(max_bytes as u64).read_to_end(&mut buf)?;
if nbytes == 0 {
return Err(io::ErrorKind::UnexpectedEof.into());
}
Ok((file, Bytes::from(buf)))
})
.boxed_local(),
);
self.poll_next(cx)
}
}
}

View File

@ -3,37 +3,39 @@
#![deny(rust_2018_idioms)] #![deny(rust_2018_idioms)]
#![allow(clippy::borrow_interior_mutable_const)] #![allow(clippy::borrow_interior_mutable_const)]
use std::cell::RefCell; use std::{
use std::fs::File; cell::RefCell,
use std::future::Future; io,
use std::io::{Read, Seek}; path::PathBuf,
use std::path::PathBuf; rc::Rc,
use std::pin::Pin; task::{Context, Poll},
use std::rc::Rc; };
use std::task::{Context, Poll};
use std::{cmp, io}; use actix_service::{
boxed::{self, BoxService, BoxServiceFactory},
use actix_service::boxed::{self, BoxService, BoxServiceFactory}; IntoServiceFactory, Service, ServiceFactory,
use actix_service::{IntoServiceFactory, Service, ServiceFactory}; };
use actix_web::dev::{ use actix_web::{
AppService, HttpServiceFactory, Payload, ResourceDef, ServiceRequest, dev::{
ServiceResponse, AppService, HttpServiceFactory, Payload, ResourceDef, ServiceRequest,
ServiceResponse,
},
error::{BlockingError, Error, ErrorInternalServerError},
guard::Guard,
http::header::{self, DispositionType},
http::Method,
FromRequest, HttpRequest, HttpResponse,
}; };
use actix_web::error::{BlockingError, Error, ErrorInternalServerError};
use actix_web::guard::Guard;
use actix_web::http::header::{self, DispositionType};
use actix_web::http::Method;
use actix_web::{web, FromRequest, HttpRequest, HttpResponse};
use bytes::Bytes;
use futures_core::Stream;
use futures_util::future::{ok, ready, Either, FutureExt, LocalBoxFuture, Ready}; use futures_util::future::{ok, ready, Either, FutureExt, LocalBoxFuture, Ready};
use mime_guess::from_ext; use mime_guess::from_ext;
mod chunked;
mod directory; mod directory;
mod error; mod error;
mod named; mod named;
mod range; mod range;
pub use crate::chunked::ChunkedReadFile;
pub use crate::directory::Directory; pub use crate::directory::Directory;
pub use crate::named::NamedFile; pub use crate::named::NamedFile;
pub use crate::range::HttpRange; pub use crate::range::HttpRange;
@ -52,73 +54,12 @@ pub fn file_extension_to_mime(ext: &str) -> mime::Mime {
from_ext(ext).first_or_octet_stream() from_ext(ext).first_or_octet_stream()
} }
fn handle_error(err: BlockingError<io::Error>) -> Error { pub(crate) fn handle_error(err: BlockingError<io::Error>) -> Error {
match err { match err {
BlockingError::Error(err) => err.into(), BlockingError::Error(err) => err.into(),
BlockingError::Canceled => ErrorInternalServerError("Unexpected error"), BlockingError::Canceled => ErrorInternalServerError("Unexpected error"),
} }
} }
#[doc(hidden)]
/// A helper created from a `std::fs::File` which reads the file
/// chunk-by-chunk on a `ThreadPool`.
pub struct ChunkedReadFile {
size: u64,
offset: u64,
file: Option<File>,
#[allow(clippy::type_complexity)]
fut:
Option<LocalBoxFuture<'static, Result<(File, Bytes), BlockingError<io::Error>>>>,
counter: u64,
}
impl Stream for ChunkedReadFile {
type Item = Result<Bytes, Error>;
fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
if let Some(ref mut fut) = self.fut {
return match Pin::new(fut).poll(cx) {
Poll::Ready(Ok((file, bytes))) => {
self.fut.take();
self.file = Some(file);
self.offset += bytes.len() as u64;
self.counter += bytes.len() as u64;
Poll::Ready(Some(Ok(bytes)))
}
Poll::Ready(Err(e)) => Poll::Ready(Some(Err(handle_error(e)))),
Poll::Pending => Poll::Pending,
};
}
let size = self.size;
let offset = self.offset;
let counter = self.counter;
if size == counter {
Poll::Ready(None)
} else {
let mut file = self.file.take().expect("Use after completion");
self.fut = Some(
web::block(move || {
let max_bytes: usize;
max_bytes = cmp::min(size.saturating_sub(counter), 65_536) as usize;
let mut buf = Vec::with_capacity(max_bytes);
file.seek(io::SeekFrom::Start(offset))?;
let nbytes =
file.by_ref().take(max_bytes as u64).read_to_end(&mut buf)?;
if nbytes == 0 {
return Err(io::ErrorKind::UnexpectedEof.into());
}
Ok((file, Bytes::from(buf)))
})
.boxed_local(),
);
self.poll_next(cx)
}
}
}
type MimeOverride = dyn Fn(&mime::Name<'_>) -> DispositionType; type MimeOverride = dyn Fn(&mime::Name<'_>) -> DispositionType;
@ -536,7 +477,7 @@ impl FromRequest for PathBufWrp {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use std::fs; use std::fs::{self, File};
use std::iter::FromIterator; use std::iter::FromIterator;
use std::ops::Add; use std::ops::Add;
use std::time::{Duration, SystemTime}; use std::time::{Duration, SystemTime};
@ -549,7 +490,7 @@ mod tests {
use actix_web::http::{Method, StatusCode}; use actix_web::http::{Method, StatusCode};
use actix_web::middleware::Compress; use actix_web::middleware::Compress;
use actix_web::test::{self, TestRequest}; use actix_web::test::{self, TestRequest};
use actix_web::{App, Responder}; use actix_web::{web, App, Responder};
#[actix_rt::test] #[actix_rt::test]
async fn test_file_extension_to_mime() { async fn test_file_extension_to_mime() {
@ -906,7 +847,7 @@ mod tests {
// Check file contents // Check file contents
let bytes = response.body().await.unwrap(); let bytes = response.body().await.unwrap();
let data = Bytes::from(fs::read("tests/test.binary").unwrap()); let data = web::Bytes::from(fs::read("tests/test.binary").unwrap());
assert_eq!(bytes, data); assert_eq!(bytes, data);
} }
@ -939,7 +880,7 @@ mod tests {
assert_eq!(response.status(), StatusCode::OK); assert_eq!(response.status(), StatusCode::OK);
let bytes = test::read_body(response).await; let bytes = test::read_body(response).await;
let data = Bytes::from(fs::read("tests/test space.binary").unwrap()); let data = web::Bytes::from(fs::read("tests/test space.binary").unwrap());
assert_eq!(bytes, data); assert_eq!(bytes, data);
} }
@ -1117,7 +1058,7 @@ mod tests {
let resp = test::call_service(&mut st, req).await; let resp = test::call_service(&mut st, req).await;
assert_eq!(resp.status(), StatusCode::OK); assert_eq!(resp.status(), StatusCode::OK);
let bytes = test::read_body(resp).await; let bytes = test::read_body(resp).await;
assert_eq!(bytes, Bytes::from_static(b"default content")); assert_eq!(bytes, web::Bytes::from_static(b"default content"));
} }
// #[actix_rt::test] // #[actix_rt::test]