From 69595439f6a651d6055e035c2d998a589fda20d0 Mon Sep 17 00:00:00 2001 From: Rob Ede Date: Tue, 15 Sep 2020 23:42:08 +0100 Subject: [PATCH] move chunked structures --- actix-files/src/chunked.rs | 81 +++++++++++++++++++++++++ actix-files/src/lib.rs | 119 ++++++++++--------------------------- 2 files changed, 111 insertions(+), 89 deletions(-) create mode 100644 actix-files/src/chunked.rs diff --git a/actix-files/src/chunked.rs b/actix-files/src/chunked.rs new file mode 100644 index 000000000..1e9757e05 --- /dev/null +++ b/actix-files/src/chunked.rs @@ -0,0 +1,81 @@ +use std::{ + cmp, + fs::File, + future::Future, + io::{self, Read, Seek}, + pin::Pin, + task::{Context, Poll}, +}; + +use actix_web::{ + error::{BlockingError, Error}, + web, +}; +use bytes::Bytes; +use futures_core::Stream; +use futures_util::future::{FutureExt, LocalBoxFuture}; + +use crate::handle_error; + +type ChunkedBoxFuture = + LocalBoxFuture<'static, Result<(File, Bytes), BlockingError>>; + +#[doc(hidden)] +/// A helper created from a `std::fs::File` which reads the file +/// chunk-by-chunk on a `ThreadPool`. +pub struct ChunkedReadFile { + pub(crate) size: u64, + pub(crate) offset: u64, + pub(crate) file: Option, + pub(crate) fut: Option, + pub(crate) counter: u64, +} + +impl Stream for ChunkedReadFile { + type Item = Result; + + fn poll_next( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + if let Some(ref mut fut) = self.fut { + return match Pin::new(fut).poll(cx) { + Poll::Ready(Ok((file, bytes))) => { + self.fut.take(); + self.file = Some(file); + self.offset += bytes.len() as u64; + self.counter += bytes.len() as u64; + Poll::Ready(Some(Ok(bytes))) + } + Poll::Ready(Err(e)) => Poll::Ready(Some(Err(handle_error(e)))), + Poll::Pending => Poll::Pending, + }; + } + + let size = self.size; + let offset = self.offset; + let counter = self.counter; + + if size == counter { + Poll::Ready(None) + } else { + let mut file = self.file.take().expect("Use after completion"); + self.fut = Some( + web::block(move || { + let max_bytes: usize; + max_bytes = cmp::min(size.saturating_sub(counter), 65_536) as usize; + let mut buf = Vec::with_capacity(max_bytes); + file.seek(io::SeekFrom::Start(offset))?; + let nbytes = + file.by_ref().take(max_bytes as u64).read_to_end(&mut buf)?; + if nbytes == 0 { + return Err(io::ErrorKind::UnexpectedEof.into()); + } + Ok((file, Bytes::from(buf))) + }) + .boxed_local(), + ); + self.poll_next(cx) + } + } +} diff --git a/actix-files/src/lib.rs b/actix-files/src/lib.rs index a062a2396..b8b347a16 100644 --- a/actix-files/src/lib.rs +++ b/actix-files/src/lib.rs @@ -3,37 +3,39 @@ #![deny(rust_2018_idioms)] #![allow(clippy::borrow_interior_mutable_const)] -use std::cell::RefCell; -use std::fs::File; -use std::future::Future; -use std::io::{Read, Seek}; -use std::path::PathBuf; -use std::pin::Pin; -use std::rc::Rc; -use std::task::{Context, Poll}; -use std::{cmp, io}; - -use actix_service::boxed::{self, BoxService, BoxServiceFactory}; -use actix_service::{IntoServiceFactory, Service, ServiceFactory}; -use actix_web::dev::{ - AppService, HttpServiceFactory, Payload, ResourceDef, ServiceRequest, - ServiceResponse, +use std::{ + cell::RefCell, + io, + path::PathBuf, + rc::Rc, + task::{Context, Poll}, +}; + +use actix_service::{ + boxed::{self, BoxService, BoxServiceFactory}, + IntoServiceFactory, Service, ServiceFactory, +}; +use actix_web::{ + dev::{ + AppService, HttpServiceFactory, Payload, ResourceDef, ServiceRequest, + ServiceResponse, + }, + error::{BlockingError, Error, ErrorInternalServerError}, + guard::Guard, + http::header::{self, DispositionType}, + http::Method, + FromRequest, HttpRequest, HttpResponse, }; -use actix_web::error::{BlockingError, Error, ErrorInternalServerError}; -use actix_web::guard::Guard; -use actix_web::http::header::{self, DispositionType}; -use actix_web::http::Method; -use actix_web::{web, FromRequest, HttpRequest, HttpResponse}; -use bytes::Bytes; -use futures_core::Stream; use futures_util::future::{ok, ready, Either, FutureExt, LocalBoxFuture, Ready}; use mime_guess::from_ext; +mod chunked; mod directory; mod error; mod named; mod range; +pub use crate::chunked::ChunkedReadFile; pub use crate::directory::Directory; pub use crate::named::NamedFile; pub use crate::range::HttpRange; @@ -52,73 +54,12 @@ pub fn file_extension_to_mime(ext: &str) -> mime::Mime { from_ext(ext).first_or_octet_stream() } -fn handle_error(err: BlockingError) -> Error { +pub(crate) fn handle_error(err: BlockingError) -> Error { match err { BlockingError::Error(err) => err.into(), BlockingError::Canceled => ErrorInternalServerError("Unexpected error"), } } -#[doc(hidden)] -/// A helper created from a `std::fs::File` which reads the file -/// chunk-by-chunk on a `ThreadPool`. -pub struct ChunkedReadFile { - size: u64, - offset: u64, - file: Option, - #[allow(clippy::type_complexity)] - fut: - Option>>>, - counter: u64, -} - -impl Stream for ChunkedReadFile { - type Item = Result; - - fn poll_next( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { - if let Some(ref mut fut) = self.fut { - return match Pin::new(fut).poll(cx) { - Poll::Ready(Ok((file, bytes))) => { - self.fut.take(); - self.file = Some(file); - self.offset += bytes.len() as u64; - self.counter += bytes.len() as u64; - Poll::Ready(Some(Ok(bytes))) - } - Poll::Ready(Err(e)) => Poll::Ready(Some(Err(handle_error(e)))), - Poll::Pending => Poll::Pending, - }; - } - - let size = self.size; - let offset = self.offset; - let counter = self.counter; - - if size == counter { - Poll::Ready(None) - } else { - let mut file = self.file.take().expect("Use after completion"); - self.fut = Some( - web::block(move || { - let max_bytes: usize; - max_bytes = cmp::min(size.saturating_sub(counter), 65_536) as usize; - let mut buf = Vec::with_capacity(max_bytes); - file.seek(io::SeekFrom::Start(offset))?; - let nbytes = - file.by_ref().take(max_bytes as u64).read_to_end(&mut buf)?; - if nbytes == 0 { - return Err(io::ErrorKind::UnexpectedEof.into()); - } - Ok((file, Bytes::from(buf))) - }) - .boxed_local(), - ); - self.poll_next(cx) - } - } -} type MimeOverride = dyn Fn(&mime::Name<'_>) -> DispositionType; @@ -536,7 +477,7 @@ impl FromRequest for PathBufWrp { #[cfg(test)] mod tests { - use std::fs; + use std::fs::{self, File}; use std::iter::FromIterator; use std::ops::Add; use std::time::{Duration, SystemTime}; @@ -549,7 +490,7 @@ mod tests { use actix_web::http::{Method, StatusCode}; use actix_web::middleware::Compress; use actix_web::test::{self, TestRequest}; - use actix_web::{App, Responder}; + use actix_web::{web, App, Responder}; #[actix_rt::test] async fn test_file_extension_to_mime() { @@ -906,7 +847,7 @@ mod tests { // Check file contents let bytes = response.body().await.unwrap(); - let data = Bytes::from(fs::read("tests/test.binary").unwrap()); + let data = web::Bytes::from(fs::read("tests/test.binary").unwrap()); assert_eq!(bytes, data); } @@ -939,7 +880,7 @@ mod tests { assert_eq!(response.status(), StatusCode::OK); let bytes = test::read_body(response).await; - let data = Bytes::from(fs::read("tests/test space.binary").unwrap()); + let data = web::Bytes::from(fs::read("tests/test space.binary").unwrap()); assert_eq!(bytes, data); } @@ -1117,7 +1058,7 @@ mod tests { let resp = test::call_service(&mut st, req).await; assert_eq!(resp.status(), StatusCode::OK); let bytes = test::read_body(resp).await; - assert_eq!(bytes, Bytes::from_static(b"default content")); + assert_eq!(bytes, web::Bytes::from_static(b"default content")); } // #[actix_rt::test]