reduce heap allocation when chunked streaming.

This commit is contained in:
fakeshadow 2021-10-17 01:38:47 +08:00
parent 9ffabe946f
commit a751be73cc
4 changed files with 162 additions and 104 deletions

View File

@ -33,6 +33,7 @@ log = "0.4"
mime = "0.3" mime = "0.3"
mime_guess = "2.0.1" mime_guess = "2.0.1"
percent-encoding = "2.1" percent-encoding = "2.1"
pin-project-lite = "0.2.7"
tokio = { version = "1", optional = true } tokio = { version = "1", optional = true }
tokio-uring = { version = "0.1", optional = true } tokio-uring = { version = "0.1", optional = true }

View File

@ -1,5 +1,7 @@
use std::{ use std::{
cmp, fmt, io, cmp, fmt,
future::Future,
io,
pin::Pin, pin::Pin,
task::{Context, Poll}, task::{Context, Poll},
}; };
@ -7,6 +9,7 @@ use std::{
use actix_web::error::Error; use actix_web::error::Error;
use bytes::Bytes; use bytes::Bytes;
use futures_core::{ready, Stream}; use futures_core::{ready, Stream};
use pin_project_lite::pin_project;
use super::named::File; use super::named::File;
@ -14,136 +17,191 @@ use super::named::File;
use { use {
actix_web::{ actix_web::{
error::BlockingError, error::BlockingError,
rt::task::{spawn_blocking, JoinHandle}, rt::task::{spawn_blocking, JoinError},
},
std::{
future::Future,
io::{Read, Seek},
}, },
std::io::{Read, Seek},
}; };
#[cfg(feature = "io-uring")] pin_project! {
use futures_core::future::LocalBoxFuture; #[doc(hidden)]
/// A helper created from a `std::fs::File` which reads the file
#[doc(hidden)] /// chunk-by-chunk on a `ThreadPool`.
/// A helper created from a `std::fs::File` which reads the file pub struct ChunkedReadFile<F, Fut> {
/// chunk-by-chunk on a `ThreadPool`. size: u64,
pub struct ChunkedReadFile { offset: u64,
size: u64, #[pin]
offset: u64, state: ChunkedReadFileState<Fut>,
state: ChunkedReadFileState, counter: u64,
counter: u64, callback: F,
}
} }
enum ChunkedReadFileState { pin_project! {
File(Option<File>), #[project = ChunkedReadFileStateProj]
#[cfg(not(feature = "io-uring"))] #[project_replace = ChunkedReadFileStateProjReplace]
Future(JoinHandle<Result<(File, Bytes), io::Error>>), enum ChunkedReadFileState<Fut> {
#[cfg(feature = "io-uring")] File {
Future(LocalBoxFuture<'static, Result<(File, Bytes), io::Error>>), file: Option<File>,
} },
Future {
impl ChunkedReadFile { #[pin]
pub(crate) fn new(size: u64, offset: u64, file: File) -> Self { fut: Fut
Self {
size,
offset,
state: ChunkedReadFileState::File(Some(file)),
counter: 0,
} }
} }
} }
impl fmt::Debug for ChunkedReadFile { impl<F, Fut> fmt::Debug for ChunkedReadFile<F, Fut> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str("ChunkedReadFile") f.write_str("ChunkedReadFile")
} }
} }
impl Stream for ChunkedReadFile { pub(crate) fn new_chunked_read(
size: u64,
offset: u64,
file: File,
) -> impl Stream<Item = Result<Bytes, Error>> {
ChunkedReadFile {
size,
offset,
state: ChunkedReadFileState::File { file: Some(file) },
counter: 0,
callback: chunked_read_file_callback,
}
}
#[cfg(not(feature = "io-uring"))]
fn chunked_read_file_callback(
mut file: File,
offset: u64,
max_bytes: usize,
) -> impl Future<Output = Result<io::Result<(File, Bytes)>, JoinError>> {
spawn_blocking(move || {
let mut buf = Vec::with_capacity(max_bytes);
file.seek(io::SeekFrom::Start(offset))?;
let n_bytes = file.by_ref().take(max_bytes as u64).read_to_end(&mut buf)?;
if n_bytes == 0 {
return Err(io::ErrorKind::UnexpectedEof.into());
}
Ok((file, Bytes::from(buf)))
})
}
#[cfg(feature = "io-uring")]
fn chunked_read_file_callback(
file: File,
offset: u64,
max_bytes: usize,
) -> impl Future<Output = io::Result<(File, Bytes)>> {
async move {
let buf = Vec::with_capacity(max_bytes);
let (res, mut buf) = file.read_at(buf, offset).await;
let n_bytes = res?;
if n_bytes == 0 {
return Err(io::ErrorKind::UnexpectedEof.into());
}
let _ = buf.split_off(n_bytes);
Ok((file, Bytes::from(buf)))
}
}
#[cfg(feature = "io-uring")]
impl<F, Fut> Stream for ChunkedReadFile<F, Fut>
where
F: Fn(File, u64, usize) -> Fut,
Fut: Future<Output = io::Result<(File, Bytes)>>,
{
type Item = Result<Bytes, Error>; type Item = Result<Bytes, Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.as_mut().get_mut(); let mut this = self.as_mut().project();
match this.state { match this.state.as_mut().project() {
ChunkedReadFileState::File(ref mut file) => { ChunkedReadFileStateProj::File { file } => {
let size = this.size; let size = *this.size;
let offset = this.offset; let offset = *this.offset;
let counter = this.counter; let counter = *this.counter;
if size == counter { if size == counter {
Poll::Ready(None) Poll::Ready(None)
} else { } else {
let max_bytes = cmp::min(size.saturating_sub(counter), 65_536) as usize; let max_bytes = cmp::min(size.saturating_sub(counter), 65_536) as usize;
let fut = { let file = file
#[cfg(not(feature = "io-uring"))] .take()
{ .expect("ChunkedReadFile polled after completion");
let mut file = file
.take()
.expect("ChunkedReadFile polled after completion");
spawn_blocking(move || { let fut = (this.callback)(file, offset, max_bytes);
let mut buf = Vec::with_capacity(max_bytes);
file.seek(io::SeekFrom::Start(offset))?; this.state
.project_replace(ChunkedReadFileState::Future { fut });
let n_bytes = file
.by_ref()
.take(max_bytes as u64)
.read_to_end(&mut buf)?;
if n_bytes == 0 {
return Err(io::ErrorKind::UnexpectedEof.into());
}
Ok((file, Bytes::from(buf)))
})
}
#[cfg(feature = "io-uring")]
{
let file = file
.take()
.expect("ChunkedReadFile polled after completion");
Box::pin(async move {
let buf = Vec::with_capacity(max_bytes);
let (res, mut buf) = file.read_at(buf, offset).await;
let n_bytes = res?;
if n_bytes == 0 {
return Err(io::ErrorKind::UnexpectedEof.into());
}
let _ = buf.split_off(n_bytes);
Ok((file, Bytes::from(buf)))
})
}
};
this.state = ChunkedReadFileState::Future(fut);
self.poll_next(cx) self.poll_next(cx)
} }
} }
ChunkedReadFileState::Future(ref mut fut) => { ChunkedReadFileStateProj::Future { fut } => {
let (file, bytes) = { let (file, bytes) = ready!(fut.poll(cx))?;
#[cfg(not(feature = "io-uring"))]
{
ready!(Pin::new(fut).poll(cx)).map_err(|_| BlockingError)??
}
#[cfg(feature = "io-uring")] this.state
{ .project_replace(ChunkedReadFileState::File { file: Some(file) });
ready!(fut.as_mut().poll(cx))?
}
};
this.state = ChunkedReadFileState::File(Some(file)); *this.offset += bytes.len() as u64;
*this.counter += bytes.len() as u64;
this.offset += bytes.len() as u64;
this.counter += bytes.len() as u64; Poll::Ready(Some(Ok(bytes)))
}
}
}
}
#[cfg(not(feature = "io-uring"))]
impl<F, Fut> Stream for ChunkedReadFile<F, Fut>
where
F: Fn(File, u64, usize) -> Fut,
Fut: Future<Output = Result<io::Result<(File, Bytes)>, JoinError>>,
{
type Item = Result<Bytes, Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.as_mut().project();
match this.state.as_mut().project() {
ChunkedReadFileStateProj::File { file } => {
let size = *this.size;
let offset = *this.offset;
let counter = *this.counter;
if size == counter {
Poll::Ready(None)
} else {
let max_bytes = cmp::min(size.saturating_sub(counter), 65_536) as usize;
let file = file
.take()
.expect("ChunkedReadFile polled after completion");
let fut = (this.callback)(file, offset, max_bytes);
this.state
.project_replace(ChunkedReadFileState::Future { fut });
self.poll_next(cx)
}
}
ChunkedReadFileStateProj::Future { fut } => {
let (file, bytes) = ready!(fut.poll(cx)).map_err(|_| BlockingError)??;
this.state
.project_replace(ChunkedReadFileState::File { file: Some(file) });
*this.offset += bytes.len() as u64;
*this.counter += bytes.len() as u64;
Poll::Ready(Some(Ok(bytes))) Poll::Ready(Some(Ok(bytes)))
} }

View File

@ -28,7 +28,6 @@ use actix_web::{
use bitflags::bitflags; use bitflags::bitflags;
use mime_guess::from_path; use mime_guess::from_path;
use crate::ChunkedReadFile;
use crate::{encoding::equiv_utf8_text, range::HttpRange}; use crate::{encoding::equiv_utf8_text, range::HttpRange};
bitflags! { bitflags! {
@ -424,7 +423,7 @@ impl NamedFile {
res.encoding(current_encoding); res.encoding(current_encoding);
} }
let reader = ChunkedReadFile::new(self.md.len(), 0, self.file); let reader = super::chunked::new_chunked_read(self.md.len(), 0, self.file);
return res.streaming(reader); return res.streaming(reader);
} }
@ -538,7 +537,7 @@ impl NamedFile {
return resp.status(StatusCode::NOT_MODIFIED).finish(); return resp.status(StatusCode::NOT_MODIFIED).finish();
} }
let reader = ChunkedReadFile::new(length, offset, self.file); let reader = super::chunked::new_chunked_read(length, offset, self.file);
if offset != 0 || length != self.md.len() { if offset != 0 || length != self.md.len() {
resp.status(StatusCode::PARTIAL_CONTENT); resp.status(StatusCode::PARTIAL_CONTENT);

View File

@ -354,7 +354,7 @@ impl HttpResponseBuilder {
#[inline] #[inline]
pub fn streaming<S, E>(&mut self, stream: S) -> HttpResponse pub fn streaming<S, E>(&mut self, stream: S) -> HttpResponse
where where
S: Stream<Item = Result<Bytes, E>> + Unpin + 'static, S: Stream<Item = Result<Bytes, E>> + 'static,
E: Into<Box<dyn StdError>> + 'static, E: Into<Box<dyn StdError>> + 'static,
{ {
self.body(AnyBody::from_message(BodyStream::new(stream))) self.body(AnyBody::from_message(BodyStream::new(stream)))