From 200cf0c07347fdb0e4854b2cc1ca3b8a31a0f647 Mon Sep 17 00:00:00 2001 From: fakeshadow <24548779@qq.com> Date: Sun, 17 Oct 2021 16:07:23 +0800 Subject: [PATCH] add re-used buffer for io-uring feature --- actix-files/src/chunked.rs | 192 ++++++++++++++++++++++++++----------- 1 file changed, 136 insertions(+), 56 deletions(-) diff --git a/actix-files/src/chunked.rs b/actix-files/src/chunked.rs index df396f6a5..e5dba29db 100644 --- a/actix-files/src/chunked.rs +++ b/actix-files/src/chunked.rs @@ -13,15 +13,6 @@ use pin_project_lite::pin_project; use super::named::File; -#[cfg(not(feature = "io-uring"))] -use { - actix_web::{ - error::BlockingError, - rt::task::{spawn_blocking, JoinError}, - }, - std::io::{Read, Seek}, -}; - pin_project! { #[doc(hidden)] /// A helper created from a `std::fs::File` which reads the file @@ -36,6 +27,7 @@ pin_project! { } } +#[cfg(not(feature = "io-uring"))] pin_project! { #[project = ChunkedReadFileStateProj] #[project_replace = ChunkedReadFileStateProjReplace] @@ -50,6 +42,21 @@ pin_project! { } } +#[cfg(feature = "io-uring")] +pin_project! { + #[project = ChunkedReadFileStateProj] + #[project_replace = ChunkedReadFileStateProjReplace] + enum ChunkedReadFileState { + File { + file: Option<(File, BytesMut)>, + }, + Future { + #[pin] + fut: Fut + } + } +} + impl fmt::Debug for ChunkedReadFile { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.write_str("ChunkedReadFile") @@ -64,19 +71,26 @@ pub(crate) fn new_chunked_read( ChunkedReadFile { size, offset, + #[cfg(not(feature = "io-uring"))] state: ChunkedReadFileState::File { file: Some(file) }, + #[cfg(feature = "io-uring")] + state: ChunkedReadFileState::File { + file: Some((file, BytesMut::new())), + }, counter: 0, callback: chunked_read_file_callback, } } #[cfg(not(feature = "io-uring"))] -fn chunked_read_file_callback( +async fn chunked_read_file_callback( mut file: File, offset: u64, max_bytes: usize, -) -> impl Future, JoinError>> { - spawn_blocking(move || { +) -> Result<(File, Bytes), Error> { + use io::{Read, Seek}; + + let res = actix_web::rt::task::spawn_blocking(move || { let mut buf = Vec::with_capacity(max_bytes); file.seek(io::SeekFrom::Start(offset))?; @@ -84,11 +98,15 @@ fn chunked_read_file_callback( let n_bytes = file.by_ref().take(max_bytes as u64).read_to_end(&mut buf)?; if n_bytes == 0 { - return Err(io::ErrorKind::UnexpectedEof.into()); + Err(io::ErrorKind::UnexpectedEof.into()) + } else { + Ok::<_, io::Error>((file, Bytes::from(buf))) } - - Ok((file, Bytes::from(buf))) }) + .await + .map_err(|_| actix_web::error::BlockingError)??; + + Ok(res) } #[cfg(feature = "io-uring")] @@ -96,26 +114,75 @@ async fn chunked_read_file_callback( file: File, offset: u64, max_bytes: usize, -) -> io::Result<(File, Bytes)> { - let buf = Vec::with_capacity(max_bytes); - - let (res, mut buf) = file.read_at(buf, offset).await; + mut bytes_mut: BytesMut, +) -> io::Result<(File, Bytes, BytesMut)> { + bytes_mut.reserve(max_bytes); + let (res, mut bytes_mut) = file.read_at(bytes_mut, offset).await; let n_bytes = res?; if n_bytes == 0 { return Err(io::ErrorKind::UnexpectedEof.into()); } - let _ = buf.split_off(n_bytes); + let bytes = bytes_mut.split_to(n_bytes).freeze(); - Ok((file, Bytes::from(buf))) + Ok((file, bytes, bytes_mut)) } #[cfg(feature = "io-uring")] impl Stream for ChunkedReadFile +where + F: Fn(File, u64, usize, BytesMut) -> Fut, + Fut: Future>, +{ + type Item = Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut this = self.as_mut().project(); + match this.state.as_mut().project() { + ChunkedReadFileStateProj::File { file } => { + let size = *this.size; + let offset = *this.offset; + let counter = *this.counter; + + if size == counter { + Poll::Ready(None) + } else { + let max_bytes = cmp::min(size.saturating_sub(counter), 65_536) as usize; + + let (file, bytes_mut) = file + .take() + .expect("ChunkedReadFile polled after completion"); + + let fut = (this.callback)(file, offset, max_bytes, bytes_mut); + + this.state + .project_replace(ChunkedReadFileState::Future { fut }); + + self.poll_next(cx) + } + } + ChunkedReadFileStateProj::Future { fut } => { + let (file, bytes, bytes_mut) = ready!(fut.poll(cx))?; + + this.state.project_replace(ChunkedReadFileState::File { + file: Some((file, bytes_mut)), + }); + + *this.offset += bytes.len() as u64; + *this.counter += bytes.len() as u64; + + Poll::Ready(Some(Ok(bytes))) + } + } + } +} + +#[cfg(not(feature = "io-uring"))] +impl Stream for ChunkedReadFile where F: Fn(File, u64, usize) -> Fut, - Fut: Future>, + Fut: Future>, { type Item = Result; @@ -159,49 +226,62 @@ where } } -#[cfg(not(feature = "io-uring"))] -impl Stream for ChunkedReadFile -where - F: Fn(File, u64, usize) -> Fut, - Fut: Future, JoinError>>, -{ - type Item = Result; +#[cfg(feature = "io-uring")] +use bytes_mut::BytesMut; - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let mut this = self.as_mut().project(); - match this.state.as_mut().project() { - ChunkedReadFileStateProj::File { file } => { - let size = *this.size; - let offset = *this.offset; - let counter = *this.counter; +// TODO: remove new type and use bytes::BytesMut directly +#[doc(hidden)] +#[cfg(feature = "io-uring")] +mod bytes_mut { + use std::ops::{Deref, DerefMut}; - if size == counter { - Poll::Ready(None) - } else { - let max_bytes = cmp::min(size.saturating_sub(counter), 65_536) as usize; + use tokio_uring::buf::{IoBuf, IoBufMut}; - let file = file - .take() - .expect("ChunkedReadFile polled after completion"); + #[derive(Debug)] + pub struct BytesMut(bytes::BytesMut); - let fut = (this.callback)(file, offset, max_bytes); + impl BytesMut { + pub(super) fn new() -> Self { + Self(bytes::BytesMut::new()) + } + } - this.state - .project_replace(ChunkedReadFileState::Future { fut }); + impl Deref for BytesMut { + type Target = bytes::BytesMut; - self.poll_next(cx) - } - } - ChunkedReadFileStateProj::Future { fut } => { - let (file, bytes) = ready!(fut.poll(cx)).map_err(|_| BlockingError)??; + fn deref(&self) -> &Self::Target { + &self.0 + } + } - this.state - .project_replace(ChunkedReadFileState::File { file: Some(file) }); + impl DerefMut for BytesMut { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } + } - *this.offset += bytes.len() as u64; - *this.counter += bytes.len() as u64; + unsafe impl IoBuf for BytesMut { + fn stable_ptr(&self) -> *const u8 { + self.0.as_ptr() + } - Poll::Ready(Some(Ok(bytes))) + fn bytes_init(&self) -> usize { + self.0.len() + } + + fn bytes_total(&self) -> usize { + self.0.capacity() + } + } + + unsafe impl IoBufMut for BytesMut { + fn stable_mut_ptr(&mut self) -> *mut u8 { + self.0.as_mut_ptr() + } + + unsafe fn set_init(&mut self, init_len: usize) { + if self.len() < init_len { + self.0.set_len(init_len); } } }