From a751be73cc8b9043bc6814ce5f7a61a3930c9e4f Mon Sep 17 00:00:00 2001 From: fakeshadow <24548779@qq.com> Date: Sun, 17 Oct 2021 01:38:47 +0800 Subject: [PATCH] reduce heap allocation when chunked streaming. --- actix-files/Cargo.toml | 1 + actix-files/src/chunked.rs | 258 +++++++++++++++++++++++-------------- actix-files/src/named.rs | 5 +- src/response/builder.rs | 2 +- 4 files changed, 162 insertions(+), 104 deletions(-) diff --git a/actix-files/Cargo.toml b/actix-files/Cargo.toml index b8f6ddd47..fc9c4c203 100644 --- a/actix-files/Cargo.toml +++ b/actix-files/Cargo.toml @@ -33,6 +33,7 @@ log = "0.4" mime = "0.3" mime_guess = "2.0.1" percent-encoding = "2.1" +pin-project-lite = "0.2.7" tokio = { version = "1", optional = true } tokio-uring = { version = "0.1", optional = true } diff --git a/actix-files/src/chunked.rs b/actix-files/src/chunked.rs index abaebe4b5..4d80303d3 100644 --- a/actix-files/src/chunked.rs +++ b/actix-files/src/chunked.rs @@ -1,5 +1,7 @@ use std::{ - cmp, fmt, io, + cmp, fmt, + future::Future, + io, pin::Pin, task::{Context, Poll}, }; @@ -7,6 +9,7 @@ use std::{ use actix_web::error::Error; use bytes::Bytes; use futures_core::{ready, Stream}; +use pin_project_lite::pin_project; use super::named::File; @@ -14,136 +17,191 @@ use super::named::File; use { actix_web::{ error::BlockingError, - rt::task::{spawn_blocking, JoinHandle}, - }, - std::{ - future::Future, - io::{Read, Seek}, + rt::task::{spawn_blocking, JoinError}, }, + std::io::{Read, Seek}, }; -#[cfg(feature = "io-uring")] -use futures_core::future::LocalBoxFuture; - -#[doc(hidden)] -/// A helper created from a `std::fs::File` which reads the file -/// chunk-by-chunk on a `ThreadPool`. -pub struct ChunkedReadFile { - size: u64, - offset: u64, - state: ChunkedReadFileState, - counter: u64, +pin_project! { + #[doc(hidden)] + /// A helper created from a `std::fs::File` which reads the file + /// chunk-by-chunk on a `ThreadPool`. + pub struct ChunkedReadFile { + size: u64, + offset: u64, + #[pin] + state: ChunkedReadFileState, + counter: u64, + callback: F, + } } -enum ChunkedReadFileState { - File(Option), - #[cfg(not(feature = "io-uring"))] - Future(JoinHandle>), - #[cfg(feature = "io-uring")] - Future(LocalBoxFuture<'static, Result<(File, Bytes), io::Error>>), -} - -impl ChunkedReadFile { - pub(crate) fn new(size: u64, offset: u64, file: File) -> Self { - Self { - size, - offset, - state: ChunkedReadFileState::File(Some(file)), - counter: 0, +pin_project! { + #[project = ChunkedReadFileStateProj] + #[project_replace = ChunkedReadFileStateProjReplace] + enum ChunkedReadFileState { + File { + file: Option, + }, + Future { + #[pin] + fut: Fut } } } -impl fmt::Debug for ChunkedReadFile { +impl fmt::Debug for ChunkedReadFile { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.write_str("ChunkedReadFile") } } -impl Stream for ChunkedReadFile { +pub(crate) fn new_chunked_read( + size: u64, + offset: u64, + file: File, +) -> impl Stream> { + ChunkedReadFile { + size, + offset, + state: ChunkedReadFileState::File { file: Some(file) }, + counter: 0, + callback: chunked_read_file_callback, + } +} + +#[cfg(not(feature = "io-uring"))] +fn chunked_read_file_callback( + mut file: File, + offset: u64, + max_bytes: usize, +) -> impl Future, JoinError>> { + spawn_blocking(move || { + let mut buf = Vec::with_capacity(max_bytes); + + file.seek(io::SeekFrom::Start(offset))?; + + let n_bytes = file.by_ref().take(max_bytes as u64).read_to_end(&mut buf)?; + + if n_bytes == 0 { + return Err(io::ErrorKind::UnexpectedEof.into()); + } + + Ok((file, Bytes::from(buf))) + }) +} + +#[cfg(feature = "io-uring")] +fn chunked_read_file_callback( + file: File, + offset: u64, + max_bytes: usize, +) -> impl Future> { + async move { + let buf = Vec::with_capacity(max_bytes); + + let (res, mut buf) = file.read_at(buf, offset).await; + let n_bytes = res?; + + if n_bytes == 0 { + return Err(io::ErrorKind::UnexpectedEof.into()); + } + + let _ = buf.split_off(n_bytes); + + Ok((file, Bytes::from(buf))) + } +} + +#[cfg(feature = "io-uring")] +impl Stream for ChunkedReadFile +where + F: Fn(File, u64, usize) -> Fut, + Fut: Future>, +{ type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let this = self.as_mut().get_mut(); - match this.state { - ChunkedReadFileState::File(ref mut file) => { - let size = this.size; - let offset = this.offset; - let counter = this.counter; + let mut this = self.as_mut().project(); + match this.state.as_mut().project() { + ChunkedReadFileStateProj::File { file } => { + let size = *this.size; + let offset = *this.offset; + let counter = *this.counter; if size == counter { Poll::Ready(None) } else { let max_bytes = cmp::min(size.saturating_sub(counter), 65_536) as usize; - let fut = { - #[cfg(not(feature = "io-uring"))] - { - let mut file = file - .take() - .expect("ChunkedReadFile polled after completion"); + let file = file + .take() + .expect("ChunkedReadFile polled after completion"); - spawn_blocking(move || { - let mut buf = Vec::with_capacity(max_bytes); + let fut = (this.callback)(file, offset, max_bytes); - file.seek(io::SeekFrom::Start(offset))?; - - let n_bytes = file - .by_ref() - .take(max_bytes as u64) - .read_to_end(&mut buf)?; - - if n_bytes == 0 { - return Err(io::ErrorKind::UnexpectedEof.into()); - } - - Ok((file, Bytes::from(buf))) - }) - } - #[cfg(feature = "io-uring")] - { - let file = file - .take() - .expect("ChunkedReadFile polled after completion"); - Box::pin(async move { - let buf = Vec::with_capacity(max_bytes); - - let (res, mut buf) = file.read_at(buf, offset).await; - let n_bytes = res?; - - if n_bytes == 0 { - return Err(io::ErrorKind::UnexpectedEof.into()); - } - - let _ = buf.split_off(n_bytes); - - Ok((file, Bytes::from(buf))) - }) - } - }; - - this.state = ChunkedReadFileState::Future(fut); + this.state + .project_replace(ChunkedReadFileState::Future { fut }); self.poll_next(cx) } } - ChunkedReadFileState::Future(ref mut fut) => { - let (file, bytes) = { - #[cfg(not(feature = "io-uring"))] - { - ready!(Pin::new(fut).poll(cx)).map_err(|_| BlockingError)?? - } + ChunkedReadFileStateProj::Future { fut } => { + let (file, bytes) = ready!(fut.poll(cx))?; - #[cfg(feature = "io-uring")] - { - ready!(fut.as_mut().poll(cx))? - } - }; + this.state + .project_replace(ChunkedReadFileState::File { file: Some(file) }); - this.state = ChunkedReadFileState::File(Some(file)); - - this.offset += bytes.len() as u64; - this.counter += bytes.len() as u64; + *this.offset += bytes.len() as u64; + *this.counter += bytes.len() as u64; + + Poll::Ready(Some(Ok(bytes))) + } + } + } +} + +#[cfg(not(feature = "io-uring"))] +impl Stream for ChunkedReadFile +where + F: Fn(File, u64, usize) -> Fut, + Fut: Future, JoinError>>, +{ + type Item = Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut this = self.as_mut().project(); + match this.state.as_mut().project() { + ChunkedReadFileStateProj::File { file } => { + let size = *this.size; + let offset = *this.offset; + let counter = *this.counter; + + if size == counter { + Poll::Ready(None) + } else { + let max_bytes = cmp::min(size.saturating_sub(counter), 65_536) as usize; + + let file = file + .take() + .expect("ChunkedReadFile polled after completion"); + + let fut = (this.callback)(file, offset, max_bytes); + + this.state + .project_replace(ChunkedReadFileState::Future { fut }); + + self.poll_next(cx) + } + } + ChunkedReadFileStateProj::Future { fut } => { + let (file, bytes) = ready!(fut.poll(cx)).map_err(|_| BlockingError)??; + + this.state + .project_replace(ChunkedReadFileState::File { file: Some(file) }); + + *this.offset += bytes.len() as u64; + *this.counter += bytes.len() as u64; Poll::Ready(Some(Ok(bytes))) } diff --git a/actix-files/src/named.rs b/actix-files/src/named.rs index f2d8674c0..5395d173c 100644 --- a/actix-files/src/named.rs +++ b/actix-files/src/named.rs @@ -28,7 +28,6 @@ use actix_web::{ use bitflags::bitflags; use mime_guess::from_path; -use crate::ChunkedReadFile; use crate::{encoding::equiv_utf8_text, range::HttpRange}; bitflags! { @@ -424,7 +423,7 @@ impl NamedFile { res.encoding(current_encoding); } - let reader = ChunkedReadFile::new(self.md.len(), 0, self.file); + let reader = super::chunked::new_chunked_read(self.md.len(), 0, self.file); return res.streaming(reader); } @@ -538,7 +537,7 @@ impl NamedFile { return resp.status(StatusCode::NOT_MODIFIED).finish(); } - let reader = ChunkedReadFile::new(length, offset, self.file); + let reader = super::chunked::new_chunked_read(length, offset, self.file); if offset != 0 || length != self.md.len() { resp.status(StatusCode::PARTIAL_CONTENT); diff --git a/src/response/builder.rs b/src/response/builder.rs index 56d30d9d0..b22af9c46 100644 --- a/src/response/builder.rs +++ b/src/response/builder.rs @@ -354,7 +354,7 @@ impl HttpResponseBuilder { #[inline] pub fn streaming(&mut self, stream: S) -> HttpResponse where - S: Stream> + Unpin + 'static, + S: Stream> + 'static, E: Into> + 'static, { self.body(AnyBody::from_message(BodyStream::new(stream)))