add re-used buffer for io-uring feature

This commit is contained in:
fakeshadow 2021-10-17 16:07:23 +08:00
parent d521c574da
commit 200cf0c073
1 changed files with 136 additions and 56 deletions

View File

@ -13,15 +13,6 @@ use pin_project_lite::pin_project;
use super::named::File; use super::named::File;
#[cfg(not(feature = "io-uring"))]
use {
actix_web::{
error::BlockingError,
rt::task::{spawn_blocking, JoinError},
},
std::io::{Read, Seek},
};
pin_project! { pin_project! {
#[doc(hidden)] #[doc(hidden)]
/// A helper created from a `std::fs::File` which reads the file /// A helper created from a `std::fs::File` which reads the file
@ -36,6 +27,7 @@ pin_project! {
} }
} }
#[cfg(not(feature = "io-uring"))]
pin_project! { pin_project! {
#[project = ChunkedReadFileStateProj] #[project = ChunkedReadFileStateProj]
#[project_replace = ChunkedReadFileStateProjReplace] #[project_replace = ChunkedReadFileStateProjReplace]
@ -50,6 +42,21 @@ pin_project! {
} }
} }
#[cfg(feature = "io-uring")]
pin_project! {
#[project = ChunkedReadFileStateProj]
#[project_replace = ChunkedReadFileStateProjReplace]
enum ChunkedReadFileState<Fut> {
File {
file: Option<(File, BytesMut)>,
},
Future {
#[pin]
fut: Fut
}
}
}
impl<F, Fut> fmt::Debug for ChunkedReadFile<F, Fut> { impl<F, Fut> fmt::Debug for ChunkedReadFile<F, Fut> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str("ChunkedReadFile") f.write_str("ChunkedReadFile")
@ -64,19 +71,26 @@ pub(crate) fn new_chunked_read(
ChunkedReadFile { ChunkedReadFile {
size, size,
offset, offset,
#[cfg(not(feature = "io-uring"))]
state: ChunkedReadFileState::File { file: Some(file) }, state: ChunkedReadFileState::File { file: Some(file) },
#[cfg(feature = "io-uring")]
state: ChunkedReadFileState::File {
file: Some((file, BytesMut::new())),
},
counter: 0, counter: 0,
callback: chunked_read_file_callback, callback: chunked_read_file_callback,
} }
} }
#[cfg(not(feature = "io-uring"))] #[cfg(not(feature = "io-uring"))]
fn chunked_read_file_callback( async fn chunked_read_file_callback(
mut file: File, mut file: File,
offset: u64, offset: u64,
max_bytes: usize, max_bytes: usize,
) -> impl Future<Output = Result<io::Result<(File, Bytes)>, JoinError>> { ) -> Result<(File, Bytes), Error> {
spawn_blocking(move || { use io::{Read, Seek};
let res = actix_web::rt::task::spawn_blocking(move || {
let mut buf = Vec::with_capacity(max_bytes); let mut buf = Vec::with_capacity(max_bytes);
file.seek(io::SeekFrom::Start(offset))?; file.seek(io::SeekFrom::Start(offset))?;
@ -84,11 +98,15 @@ fn chunked_read_file_callback(
let n_bytes = file.by_ref().take(max_bytes as u64).read_to_end(&mut buf)?; let n_bytes = file.by_ref().take(max_bytes as u64).read_to_end(&mut buf)?;
if n_bytes == 0 { if n_bytes == 0 {
return Err(io::ErrorKind::UnexpectedEof.into()); Err(io::ErrorKind::UnexpectedEof.into())
} else {
Ok::<_, io::Error>((file, Bytes::from(buf)))
} }
Ok((file, Bytes::from(buf)))
}) })
.await
.map_err(|_| actix_web::error::BlockingError)??;
Ok(res)
} }
#[cfg(feature = "io-uring")] #[cfg(feature = "io-uring")]
@ -96,26 +114,75 @@ async fn chunked_read_file_callback(
file: File, file: File,
offset: u64, offset: u64,
max_bytes: usize, max_bytes: usize,
) -> io::Result<(File, Bytes)> { mut bytes_mut: BytesMut,
let buf = Vec::with_capacity(max_bytes); ) -> io::Result<(File, Bytes, BytesMut)> {
bytes_mut.reserve(max_bytes);
let (res, mut buf) = file.read_at(buf, offset).await; let (res, mut bytes_mut) = file.read_at(bytes_mut, offset).await;
let n_bytes = res?; let n_bytes = res?;
if n_bytes == 0 { if n_bytes == 0 {
return Err(io::ErrorKind::UnexpectedEof.into()); return Err(io::ErrorKind::UnexpectedEof.into());
} }
let _ = buf.split_off(n_bytes); let bytes = bytes_mut.split_to(n_bytes).freeze();
Ok((file, Bytes::from(buf))) Ok((file, bytes, bytes_mut))
} }
#[cfg(feature = "io-uring")] #[cfg(feature = "io-uring")]
impl<F, Fut> Stream for ChunkedReadFile<F, Fut> impl<F, Fut> Stream for ChunkedReadFile<F, Fut>
where
F: Fn(File, u64, usize, BytesMut) -> Fut,
Fut: Future<Output = io::Result<(File, Bytes, BytesMut)>>,
{
type Item = Result<Bytes, Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.as_mut().project();
match this.state.as_mut().project() {
ChunkedReadFileStateProj::File { file } => {
let size = *this.size;
let offset = *this.offset;
let counter = *this.counter;
if size == counter {
Poll::Ready(None)
} else {
let max_bytes = cmp::min(size.saturating_sub(counter), 65_536) as usize;
let (file, bytes_mut) = file
.take()
.expect("ChunkedReadFile polled after completion");
let fut = (this.callback)(file, offset, max_bytes, bytes_mut);
this.state
.project_replace(ChunkedReadFileState::Future { fut });
self.poll_next(cx)
}
}
ChunkedReadFileStateProj::Future { fut } => {
let (file, bytes, bytes_mut) = ready!(fut.poll(cx))?;
this.state.project_replace(ChunkedReadFileState::File {
file: Some((file, bytes_mut)),
});
*this.offset += bytes.len() as u64;
*this.counter += bytes.len() as u64;
Poll::Ready(Some(Ok(bytes)))
}
}
}
}
#[cfg(not(feature = "io-uring"))]
impl<F, Fut> Stream for ChunkedReadFile<F, Fut>
where where
F: Fn(File, u64, usize) -> Fut, F: Fn(File, u64, usize) -> Fut,
Fut: Future<Output = io::Result<(File, Bytes)>>, Fut: Future<Output = Result<(File, Bytes), Error>>,
{ {
type Item = Result<Bytes, Error>; type Item = Result<Bytes, Error>;
@ -159,49 +226,62 @@ where
} }
} }
#[cfg(not(feature = "io-uring"))] #[cfg(feature = "io-uring")]
impl<F, Fut> Stream for ChunkedReadFile<F, Fut> use bytes_mut::BytesMut;
where
F: Fn(File, u64, usize) -> Fut,
Fut: Future<Output = Result<io::Result<(File, Bytes)>, JoinError>>,
{
type Item = Result<Bytes, Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { // TODO: remove new type and use bytes::BytesMut directly
let mut this = self.as_mut().project(); #[doc(hidden)]
match this.state.as_mut().project() { #[cfg(feature = "io-uring")]
ChunkedReadFileStateProj::File { file } => { mod bytes_mut {
let size = *this.size; use std::ops::{Deref, DerefMut};
let offset = *this.offset;
let counter = *this.counter;
if size == counter { use tokio_uring::buf::{IoBuf, IoBufMut};
Poll::Ready(None)
} else {
let max_bytes = cmp::min(size.saturating_sub(counter), 65_536) as usize;
let file = file #[derive(Debug)]
.take() pub struct BytesMut(bytes::BytesMut);
.expect("ChunkedReadFile polled after completion");
let fut = (this.callback)(file, offset, max_bytes); impl BytesMut {
pub(super) fn new() -> Self {
this.state Self(bytes::BytesMut::new())
.project_replace(ChunkedReadFileState::Future { fut });
self.poll_next(cx)
} }
} }
ChunkedReadFileStateProj::Future { fut } => {
let (file, bytes) = ready!(fut.poll(cx)).map_err(|_| BlockingError)??;
this.state impl Deref for BytesMut {
.project_replace(ChunkedReadFileState::File { file: Some(file) }); type Target = bytes::BytesMut;
*this.offset += bytes.len() as u64; fn deref(&self) -> &Self::Target {
*this.counter += bytes.len() as u64; &self.0
}
}
Poll::Ready(Some(Ok(bytes))) impl DerefMut for BytesMut {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
unsafe impl IoBuf for BytesMut {
fn stable_ptr(&self) -> *const u8 {
self.0.as_ptr()
}
fn bytes_init(&self) -> usize {
self.0.len()
}
fn bytes_total(&self) -> usize {
self.0.capacity()
}
}
unsafe impl IoBufMut for BytesMut {
fn stable_mut_ptr(&mut self) -> *mut u8 {
self.0.as_mut_ptr()
}
unsafe fn set_init(&mut self, init_len: usize) {
if self.len() < init_len {
self.0.set_len(init_len);
} }
} }
} }