implement io-uring for actix-files

This commit is contained in:
fakeshadow 2021-10-14 05:09:30 +08:00
parent 99985fc4ec
commit bf724545f3
7 changed files with 342 additions and 159 deletions

View File

@ -67,12 +67,15 @@ rustls = ["actix-http/rustls", "actix-tls/accept", "actix-tls/rustls"]
# Don't rely on these whatsoever. They may disappear at anytime. # Don't rely on these whatsoever. They may disappear at anytime.
__compress = [] __compress = []
# io-uring feature only avaiable for linux OS.
io-uring = ["actix-rt/io-uring", "actix-server/io-uring"]
[dependencies] [dependencies]
actix-codec = "0.4.0" actix-codec = "0.4.0"
actix-macros = "0.2.1" actix-macros = "0.2.1"
actix-router = "0.5.0-beta.2" actix-router = "0.5.0-beta.2"
actix-rt = "2.2" actix-rt = "2.3"
actix-server = "2.0.0-beta.3" actix-server = "2.0.0-beta.6"
actix-service = "2.0.0" actix-service = "2.0.0"
actix-utils = "3.0.0" actix-utils = "3.0.0"
actix-tls = { version = "3.0.0-beta.5", default-features = false, optional = true } actix-tls = { version = "3.0.0-beta.5", default-features = false, optional = true }

View File

@ -14,6 +14,9 @@ edition = "2018"
name = "actix_files" name = "actix_files"
path = "src/lib.rs" path = "src/lib.rs"
[features]
io-uring = ["actix-web/io-uring", "tokio", "tokio-uring"]
[dependencies] [dependencies]
actix-web = { version = "4.0.0-beta.9", default-features = false } actix-web = { version = "4.0.0-beta.9", default-features = false }
actix-http = "3.0.0-beta.10" actix-http = "3.0.0-beta.10"
@ -31,6 +34,9 @@ mime = "0.3"
mime_guess = "2.0.1" mime_guess = "2.0.1"
percent-encoding = "2.1" percent-encoding = "2.1"
tokio = { version = "1", optional = true }
tokio-uring = { version = "0.1", optional = true }
[dev-dependencies] [dev-dependencies]
actix-rt = "2.2" actix-rt = "2.2"
actix-web = "4.0.0-beta.9" actix-web = "4.0.0-beta.9"

View File

@ -1,19 +1,30 @@
use std::{ use std::{
cmp, fmt, cmp, fmt, io,
fs::File,
future::Future,
io::{self, Read, Seek},
pin::Pin, pin::Pin,
task::{Context, Poll}, task::{Context, Poll},
}; };
use actix_web::{ use actix_web::error::Error;
error::{BlockingError, Error},
rt::task::{spawn_blocking, JoinHandle},
};
use bytes::Bytes; use bytes::Bytes;
use futures_core::{ready, Stream}; use futures_core::{ready, Stream};
use super::named::File;
#[cfg(not(feature = "io-uring"))]
use {
actix_web::{
error::BlockingError,
rt::task::{spawn_blocking, JoinHandle},
},
std::{
future::Future,
io::{Read, Seek},
},
};
#[cfg(feature = "io-uring")]
use futures_core::future::LocalBoxFuture;
#[doc(hidden)] #[doc(hidden)]
/// A helper created from a `std::fs::File` which reads the file /// A helper created from a `std::fs::File` which reads the file
/// chunk-by-chunk on a `ThreadPool`. /// chunk-by-chunk on a `ThreadPool`.
@ -26,7 +37,10 @@ pub struct ChunkedReadFile {
enum ChunkedReadFileState { enum ChunkedReadFileState {
File(Option<File>), File(Option<File>),
#[cfg(not(feature = "io-uring"))]
Future(JoinHandle<Result<(File, Bytes), io::Error>>), Future(JoinHandle<Result<(File, Bytes), io::Error>>),
#[cfg(feature = "io-uring")]
Future(LocalBoxFuture<'static, Result<(File, Bytes), io::Error>>),
} }
impl ChunkedReadFile { impl ChunkedReadFile {
@ -60,32 +74,72 @@ impl Stream for ChunkedReadFile {
if size == counter { if size == counter {
Poll::Ready(None) Poll::Ready(None)
} else { } else {
let mut file = file let max_bytes = cmp::min(size.saturating_sub(counter), 65_536) as usize;
.take()
.expect("ChunkedReadFile polled after completion");
let fut = spawn_blocking(move || { let fut = {
let max_bytes = cmp::min(size.saturating_sub(counter), 65_536) as usize; #[cfg(not(feature = "io-uring"))]
{
let mut file = file
.take()
.expect("ChunkedReadFile polled after completion");
let mut buf = Vec::with_capacity(max_bytes); spawn_blocking(move || {
file.seek(io::SeekFrom::Start(offset))?; let mut buf = Vec::with_capacity(max_bytes);
let n_bytes = file.seek(io::SeekFrom::Start(offset))?;
file.by_ref().take(max_bytes as u64).read_to_end(&mut buf)?;
if n_bytes == 0 { let n_bytes = file
return Err(io::ErrorKind::UnexpectedEof.into()); .by_ref()
.take(max_bytes as u64)
.read_to_end(&mut buf)?;
if n_bytes == 0 {
return Err(io::ErrorKind::UnexpectedEof.into());
}
Ok((file, Bytes::from(buf)))
})
} }
#[cfg(feature = "io-uring")]
{
let file = file
.take()
.expect("ChunkedReadFile polled after completion");
Box::pin(async move {
let buf = Vec::with_capacity(max_bytes);
let (res, mut buf) = file.read_at(buf, offset).await;
let n_bytes = res?;
if n_bytes == 0 {
return Err(io::ErrorKind::UnexpectedEof.into());
}
let _ = buf.split_off(n_bytes);
Ok((file, Bytes::from(buf)))
})
}
};
Ok((file, Bytes::from(buf)))
});
this.state = ChunkedReadFileState::Future(fut); this.state = ChunkedReadFileState::Future(fut);
self.poll_next(cx) self.poll_next(cx)
} }
} }
ChunkedReadFileState::Future(ref mut fut) => { ChunkedReadFileState::Future(ref mut fut) => {
let (file, bytes) = let (file, bytes) = {
ready!(Pin::new(fut).poll(cx)).map_err(|_| BlockingError)??; #[cfg(not(feature = "io-uring"))]
{
ready!(Pin::new(fut).poll(cx)).map_err(|_| BlockingError)??
}
#[cfg(feature = "io-uring")]
{
ready!(fut.as_mut().poll(cx))?
}
};
this.state = ChunkedReadFileState::File(Some(file)); this.state = ChunkedReadFileState::File(Some(file));
this.offset += bytes.len() as u64; this.offset += bytes.len() as u64;

View File

@ -6,7 +6,6 @@ use std::{
}; };
use actix_service::{boxed, IntoServiceFactory, ServiceFactory, ServiceFactoryExt}; use actix_service::{boxed, IntoServiceFactory, ServiceFactory, ServiceFactoryExt};
use actix_utils::future::ok;
use actix_web::{ use actix_web::{
dev::{ dev::{
AppService, HttpServiceFactory, RequestHead, ResourceDef, ServiceRequest, AppService, HttpServiceFactory, RequestHead, ResourceDef, ServiceRequest,
@ -20,8 +19,9 @@ use actix_web::{
use futures_core::future::LocalBoxFuture; use futures_core::future::LocalBoxFuture;
use crate::{ use crate::{
directory_listing, named, Directory, DirectoryRenderer, FilesService, HttpNewService, directory_listing, named,
MimeOverride, PathFilter, service::{FilesService, FilesServiceInner},
Directory, DirectoryRenderer, HttpNewService, MimeOverride, PathFilter,
}; };
/// Static files handling service. /// Static files handling service.
@ -353,7 +353,7 @@ impl ServiceFactory<ServiceRequest> for Files {
type Future = LocalBoxFuture<'static, Result<Self::Service, Self::InitError>>; type Future = LocalBoxFuture<'static, Result<Self::Service, Self::InitError>>;
fn new_service(&self, _: ()) -> Self::Future { fn new_service(&self, _: ()) -> Self::Future {
let mut srv = FilesService { let mut inner = FilesServiceInner {
directory: self.directory.clone(), directory: self.directory.clone(),
index: self.index.clone(), index: self.index.clone(),
show_index: self.show_index, show_index: self.show_index,
@ -372,14 +372,14 @@ impl ServiceFactory<ServiceRequest> for Files {
Box::pin(async { Box::pin(async {
match fut.await { match fut.await {
Ok(default) => { Ok(default) => {
srv.default = Some(default); inner.default = Some(default);
Ok(srv) Ok(FilesService(Rc::new(inner)))
} }
Err(_) => Err(()), Err(_) => Err(()),
} }
}) })
} else { } else {
Box::pin(ok(srv)) Box::pin(async move { Ok(FilesService(Rc::new(inner))) })
} }
} }
} }

View File

@ -213,7 +213,8 @@ mod tests {
#[actix_rt::test] #[actix_rt::test]
async fn test_named_file_non_ascii_file_name() { async fn test_named_file_non_ascii_file_name() {
let mut file = let mut file =
NamedFile::from_file(File::open("Cargo.toml").unwrap(), "貨物.toml").unwrap(); NamedFile::from_file(crate::named::File::open("Cargo.toml").unwrap(), "貨物.toml")
.unwrap();
{ {
file.file(); file.file();
let _f: &File = &file; let _f: &File = &file;

View File

@ -1,11 +1,16 @@
use std::{
fmt,
fs::Metadata,
io,
ops::{Deref, DerefMut},
path::{Path, PathBuf},
time::{SystemTime, UNIX_EPOCH},
};
use actix_service::{Service, ServiceFactory}; use actix_service::{Service, ServiceFactory};
use actix_utils::future::{ok, ready, Ready}; use actix_utils::future::{ok, Ready};
use actix_web::dev::{AppService, HttpServiceFactory, ResourceDef}; use actix_web::dev::{AppService, HttpServiceFactory, ResourceDef};
use std::fs::{File, Metadata}; use futures_core::future::LocalBoxFuture;
use std::io;
use std::ops::{Deref, DerefMut};
use std::path::{Path, PathBuf};
use std::time::{SystemTime, UNIX_EPOCH};
#[cfg(unix)] #[cfg(unix)]
use std::os::unix::fs::MetadataExt; use std::os::unix::fs::MetadataExt;
@ -65,7 +70,6 @@ impl Default for Flags {
/// NamedFile::open("./static/index.html") /// NamedFile::open("./static/index.html")
/// } /// }
/// ``` /// ```
#[derive(Debug)]
pub struct NamedFile { pub struct NamedFile {
path: PathBuf, path: PathBuf,
file: File, file: File,
@ -78,6 +82,37 @@ pub struct NamedFile {
pub(crate) encoding: Option<ContentEncoding>, pub(crate) encoding: Option<ContentEncoding>,
} }
impl fmt::Debug for NamedFile {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("NamedFile")
.field("path", &self.path)
.field(
"file",
#[cfg(feature = "io-uring")]
{
&"File"
},
#[cfg(not(feature = "io-uring"))]
{
&self.file
},
)
.field("modified", &self.modified)
.field("md", &self.md)
.field("flags", &self.flags)
.field("status_code", &self.status_code)
.field("content_type", &self.content_type)
.field("content_disposition", &self.content_disposition)
.field("encoding", &self.encoding)
.finish()
}
}
#[cfg(not(feature = "io-uring"))]
pub(crate) use std::fs::File;
#[cfg(feature = "io-uring")]
pub(crate) use tokio_uring::fs::File;
impl NamedFile { impl NamedFile {
/// Creates an instance from a previously opened file. /// Creates an instance from a previously opened file.
/// ///
@ -147,7 +182,26 @@ impl NamedFile {
(ct, cd) (ct, cd)
}; };
let md = file.metadata()?; let md = {
#[cfg(not(feature = "io-uring"))]
{
file.metadata()?
}
#[cfg(feature = "io-uring")]
{
use std::os::unix::prelude::{AsRawFd, FromRawFd};
let fd = file.as_raw_fd();
// SAFETY: fd is borrowed and lives longer than the unsafe block.
unsafe {
let fs = std::fs::File::from_raw_fd(fd);
fs.metadata()?
}
}
};
let modified = md.modified().ok(); let modified = md.modified().ok();
let encoding = None; let encoding = None;
@ -174,7 +228,43 @@ impl NamedFile {
/// let file = NamedFile::open("foo.txt"); /// let file = NamedFile::open("foo.txt");
/// ``` /// ```
pub fn open<P: AsRef<Path>>(path: P) -> io::Result<NamedFile> { pub fn open<P: AsRef<Path>>(path: P) -> io::Result<NamedFile> {
Self::from_file(File::open(&path)?, path) #[cfg(not(feature = "io-uring"))]
{
let file = File::open(&path)?;
Self::from_file(file, path)
}
#[cfg(feature = "io-uring")]
{
tokio::runtime::Handle::current().block_on(Self::open_async(path))
}
}
/// Attempts to open a file asynchronously in read-only mode.
///
/// # Examples
///
/// ```
/// use actix_files::NamedFile;
///
/// # async fn open() {
/// let file = NamedFile::open_async("foo.txt").await.unwrap();
/// # }
/// ```
pub async fn open_async<P: AsRef<Path>>(path: P) -> io::Result<NamedFile> {
let file = {
#[cfg(not(feature = "io-uring"))]
{
File::open(&path)?
}
#[cfg(feature = "io-uring")]
{
File::open(&path).await?
}
};
Self::from_file(file, path)
} }
/// Returns reference to the underlying `File` object. /// Returns reference to the underlying `File` object.
@ -456,20 +546,6 @@ impl NamedFile {
} }
} }
impl Deref for NamedFile {
type Target = File;
fn deref(&self) -> &File {
&self.file
}
}
impl DerefMut for NamedFile {
fn deref_mut(&mut self) -> &mut File {
&mut self.file
}
}
/// Returns true if `req` has no `If-Match` header or one which matches `etag`. /// Returns true if `req` has no `If-Match` header or one which matches `etag`.
fn any_match(etag: Option<&header::EntityTag>, req: &HttpRequest) -> bool { fn any_match(etag: Option<&header::EntityTag>, req: &HttpRequest) -> bool {
match req.get_header::<header::IfMatch>() { match req.get_header::<header::IfMatch>() {
@ -510,6 +586,20 @@ fn none_match(etag: Option<&header::EntityTag>, req: &HttpRequest) -> bool {
} }
} }
impl Deref for NamedFile {
type Target = File;
fn deref(&self) -> &File {
&self.file
}
}
impl DerefMut for NamedFile {
fn deref_mut(&mut self) -> &mut File {
&mut self.file
}
}
impl Responder for NamedFile { impl Responder for NamedFile {
fn respond_to(self, req: &HttpRequest) -> HttpResponse { fn respond_to(self, req: &HttpRequest) -> HttpResponse {
self.into_response(req) self.into_response(req)
@ -540,18 +630,19 @@ pub struct NamedFileService {
impl Service<ServiceRequest> for NamedFileService { impl Service<ServiceRequest> for NamedFileService {
type Response = ServiceResponse; type Response = ServiceResponse;
type Error = Error; type Error = Error;
type Future = Ready<Result<Self::Response, Self::Error>>; type Future = LocalBoxFuture<'static, Result<Self::Response, Self::Error>>;
actix_service::always_ready!(); actix_service::always_ready!();
fn call(&self, req: ServiceRequest) -> Self::Future { fn call(&self, req: ServiceRequest) -> Self::Future {
let (req, _) = req.into_parts(); let (req, _) = req.into_parts();
ready(
NamedFile::open(&self.path) let path = self.path.clone();
.map_err(|e| e.into()) Box::pin(async move {
.map(|f| f.into_response(&req)) let file = NamedFile::open_async(path).await?;
.map(|res| ServiceResponse::new(req, res)), let res = file.into_response(&req);
) Ok(ServiceResponse::new(req, res))
})
} }
} }

View File

@ -1,7 +1,6 @@
use std::{fmt, io, path::PathBuf, rc::Rc}; use std::{fmt, io, ops::Deref, path::PathBuf, rc::Rc};
use actix_service::Service; use actix_service::Service;
use actix_utils::future::ok;
use actix_web::{ use actix_web::{
dev::{ServiceRequest, ServiceResponse}, dev::{ServiceRequest, ServiceResponse},
error::Error, error::Error,
@ -17,7 +16,18 @@ use crate::{
}; };
/// Assembled file serving service. /// Assembled file serving service.
pub struct FilesService { #[derive(Clone)]
pub struct FilesService(pub Rc<FilesServiceInner>);
impl Deref for FilesService {
type Target = FilesServiceInner;
fn deref(&self) -> &Self::Target {
&*self.0
}
}
pub struct FilesServiceInner {
pub(crate) directory: PathBuf, pub(crate) directory: PathBuf,
pub(crate) index: Option<String>, pub(crate) index: Option<String>,
pub(crate) show_index: bool, pub(crate) show_index: bool,
@ -31,20 +41,42 @@ pub struct FilesService {
pub(crate) hidden_files: bool, pub(crate) hidden_files: bool,
} }
impl fmt::Debug for FilesServiceInner {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str("FilesServiceInner")
}
}
impl FilesService { impl FilesService {
fn handle_err( async fn handle_err(
&self, &self,
err: io::Error, err: io::Error,
req: ServiceRequest, req: ServiceRequest,
) -> LocalBoxFuture<'static, Result<ServiceResponse, Error>> { ) -> Result<ServiceResponse, Error> {
log::debug!("error handling {}: {}", req.path(), err); log::debug!("error handling {}: {}", req.path(), err);
if let Some(ref default) = self.default { if let Some(ref default) = self.default {
Box::pin(default.call(req)) default.call(req).await
} else { } else {
Box::pin(ok(req.error_response(err))) Ok(req.error_response(err))
} }
} }
fn serve_named_file(
&self,
req: ServiceRequest,
mut named_file: NamedFile,
) -> ServiceResponse {
if let Some(ref mime_override) = self.mime_override {
let new_disposition = mime_override(&named_file.content_type.type_());
named_file.content_disposition.disposition = new_disposition;
}
named_file.flags = self.file_flags;
let (req, _) = req.into_parts();
let res = named_file.into_response(&req);
ServiceResponse::new(req, res)
}
} }
impl fmt::Debug for FilesService { impl fmt::Debug for FilesService {
@ -69,103 +101,99 @@ impl Service<ServiceRequest> for FilesService {
matches!(*req.method(), Method::HEAD | Method::GET) matches!(*req.method(), Method::HEAD | Method::GET)
}; };
if !is_method_valid { let this = self.clone();
return Box::pin(ok(req.into_response(
actix_web::HttpResponse::MethodNotAllowed()
.insert_header(header::ContentType(mime::TEXT_PLAIN_UTF_8))
.body("Request did not meet this resource's requirements."),
)));
}
let real_path = Box::pin(async move {
match PathBufWrap::parse_path(req.match_info().path(), self.hidden_files) { if !is_method_valid {
Ok(item) => item, return Ok(req.into_response(
Err(e) => return Box::pin(ok(req.error_response(e))), actix_web::HttpResponse::MethodNotAllowed()
}; .insert_header(header::ContentType(mime::TEXT_PLAIN_UTF_8))
.body("Request did not meet this resource's requirements."),
));
}
if let Some(filter) = &self.path_filter { let real_path =
if !filter(real_path.as_ref(), req.head()) { match PathBufWrap::parse_path(req.match_info().path(), this.hidden_files) {
if let Some(ref default) = self.default { Ok(item) => item,
return Box::pin(default.call(req)); Err(e) => return Ok(req.error_response(e)),
} else { };
return Box::pin(ok(
req.into_response(actix_web::HttpResponse::NotFound().finish()) if let Some(filter) = &this.path_filter {
if !filter(real_path.as_ref(), req.head()) {
if let Some(ref default) = this.default {
return default.call(req).await;
} else {
return Ok(
req.into_response(actix_web::HttpResponse::NotFound().finish())
);
}
}
}
// full file path
let path = this.directory.join(&real_path);
if let Err(err) = path.canonicalize() {
return this.handle_err(err, req).await;
}
if path.is_dir() {
if this.redirect_to_slash
&& !req.path().ends_with('/')
&& (this.index.is_some() || this.show_index)
{
let redirect_to = format!("{}/", req.path());
return Ok(req.into_response(
HttpResponse::Found()
.insert_header((header::LOCATION, redirect_to))
.finish(),
)); ));
} }
}
}
// full file path let show_index = |req: ServiceRequest| {
let path = self.directory.join(&real_path); let dir = Directory::new(this.directory.clone(), path.clone());
if let Err(err) = path.canonicalize() {
return Box::pin(self.handle_err(err, req));
}
if path.is_dir() {
if self.redirect_to_slash
&& !req.path().ends_with('/')
&& (self.index.is_some() || self.show_index)
{
let redirect_to = format!("{}/", req.path());
return Box::pin(ok(req.into_response(
HttpResponse::Found()
.insert_header((header::LOCATION, redirect_to))
.finish(),
)));
}
let serve_named_file = |req: ServiceRequest, mut named_file: NamedFile| {
if let Some(ref mime_override) = self.mime_override {
let new_disposition = mime_override(&named_file.content_type.type_());
named_file.content_disposition.disposition = new_disposition;
}
named_file.flags = self.file_flags;
let (req, _) = req.into_parts();
let res = named_file.into_response(&req);
Box::pin(ok(ServiceResponse::new(req, res)))
};
let show_index = |req: ServiceRequest| {
let dir = Directory::new(self.directory.clone(), path.clone());
let (req, _) = req.into_parts();
let x = (self.renderer)(&dir, &req);
Box::pin(match x {
Ok(resp) => ok(resp),
Err(err) => ok(ServiceResponse::from_err(err, req)),
})
};
match self.index {
Some(ref index) => match NamedFile::open(path.join(index)) {
Ok(named_file) => serve_named_file(req, named_file),
Err(_) if self.show_index => show_index(req),
Err(err) => self.handle_err(err, req),
},
None if self.show_index => show_index(req),
_ => Box::pin(ok(ServiceResponse::from_err(
FilesError::IsDirectory,
req.into_parts().0,
))),
}
} else {
match NamedFile::open(path) {
Ok(mut named_file) => {
if let Some(ref mime_override) = self.mime_override {
let new_disposition = mime_override(&named_file.content_type.type_());
named_file.content_disposition.disposition = new_disposition;
}
named_file.flags = self.file_flags;
let (req, _) = req.into_parts(); let (req, _) = req.into_parts();
let res = named_file.into_response(&req); let x = (this.renderer)(&dir, &req);
Box::pin(ok(ServiceResponse::new(req, res)))
match x {
Ok(resp) => Ok(resp),
Err(err) => Ok(ServiceResponse::from_err(err, req)),
}
};
match this.index {
Some(ref index) => {
let path = path.join(index);
match NamedFile::open_async(path).await {
Ok(named_file) => Ok(this.serve_named_file(req, named_file)),
Err(_) if this.show_index => show_index(req),
Err(err) => this.handle_err(err, req).await,
}
}
None if this.show_index => show_index(req),
_ => Ok(ServiceResponse::from_err(
FilesError::IsDirectory,
req.into_parts().0,
)),
}
} else {
match NamedFile::open_async(path).await {
Ok(mut named_file) => {
if let Some(ref mime_override) = this.mime_override {
let new_disposition =
mime_override(&named_file.content_type.type_());
named_file.content_disposition.disposition = new_disposition;
}
named_file.flags = this.file_flags;
let (req, _) = req.into_parts();
let res = named_file.into_response(&req);
Ok(ServiceResponse::new(req, res))
}
Err(err) => this.handle_err(err, req).await,
} }
Err(err) => self.handle_err(err, req),
} }
} })
} }
} }