diff --git a/Cargo.toml b/Cargo.toml index dc7e9af3f..7273f2901 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -67,12 +67,15 @@ rustls = ["actix-http/rustls", "actix-tls/accept", "actix-tls/rustls"] # Don't rely on these whatsoever. They may disappear at anytime. __compress = [] +# io-uring feature only avaiable for linux OS. +io-uring = ["actix-rt/io-uring", "actix-server/io-uring"] + [dependencies] actix-codec = "0.4.0" actix-macros = "0.2.1" actix-router = "0.5.0-beta.2" -actix-rt = "2.2" -actix-server = "2.0.0-beta.3" +actix-rt = "2.3" +actix-server = "2.0.0-beta.6" actix-service = "2.0.0" actix-utils = "3.0.0" actix-tls = { version = "3.0.0-beta.5", default-features = false, optional = true } diff --git a/actix-files/Cargo.toml b/actix-files/Cargo.toml index eccf49a77..b8f6ddd47 100644 --- a/actix-files/Cargo.toml +++ b/actix-files/Cargo.toml @@ -14,6 +14,9 @@ edition = "2018" name = "actix_files" path = "src/lib.rs" +[features] +io-uring = ["actix-web/io-uring", "tokio", "tokio-uring"] + [dependencies] actix-web = { version = "4.0.0-beta.9", default-features = false } actix-http = "3.0.0-beta.10" @@ -31,6 +34,9 @@ mime = "0.3" mime_guess = "2.0.1" percent-encoding = "2.1" +tokio = { version = "1", optional = true } +tokio-uring = { version = "0.1", optional = true } + [dev-dependencies] actix-rt = "2.2" actix-web = "4.0.0-beta.9" diff --git a/actix-files/src/chunked.rs b/actix-files/src/chunked.rs index f639848c9..abaebe4b5 100644 --- a/actix-files/src/chunked.rs +++ b/actix-files/src/chunked.rs @@ -1,19 +1,30 @@ use std::{ - cmp, fmt, - fs::File, - future::Future, - io::{self, Read, Seek}, + cmp, fmt, io, pin::Pin, task::{Context, Poll}, }; -use actix_web::{ - error::{BlockingError, Error}, - rt::task::{spawn_blocking, JoinHandle}, -}; +use actix_web::error::Error; use bytes::Bytes; use futures_core::{ready, Stream}; +use super::named::File; + +#[cfg(not(feature = "io-uring"))] +use { + actix_web::{ + error::BlockingError, + rt::task::{spawn_blocking, JoinHandle}, + }, + std::{ + future::Future, + io::{Read, Seek}, + }, +}; + +#[cfg(feature = "io-uring")] +use futures_core::future::LocalBoxFuture; + #[doc(hidden)] /// A helper created from a `std::fs::File` which reads the file /// chunk-by-chunk on a `ThreadPool`. @@ -26,7 +37,10 @@ pub struct ChunkedReadFile { enum ChunkedReadFileState { File(Option), + #[cfg(not(feature = "io-uring"))] Future(JoinHandle>), + #[cfg(feature = "io-uring")] + Future(LocalBoxFuture<'static, Result<(File, Bytes), io::Error>>), } impl ChunkedReadFile { @@ -60,32 +74,72 @@ impl Stream for ChunkedReadFile { if size == counter { Poll::Ready(None) } else { - let mut file = file - .take() - .expect("ChunkedReadFile polled after completion"); + let max_bytes = cmp::min(size.saturating_sub(counter), 65_536) as usize; - let fut = spawn_blocking(move || { - let max_bytes = cmp::min(size.saturating_sub(counter), 65_536) as usize; + let fut = { + #[cfg(not(feature = "io-uring"))] + { + let mut file = file + .take() + .expect("ChunkedReadFile polled after completion"); - let mut buf = Vec::with_capacity(max_bytes); - file.seek(io::SeekFrom::Start(offset))?; + spawn_blocking(move || { + let mut buf = Vec::with_capacity(max_bytes); - let n_bytes = - file.by_ref().take(max_bytes as u64).read_to_end(&mut buf)?; + file.seek(io::SeekFrom::Start(offset))?; - if n_bytes == 0 { - return Err(io::ErrorKind::UnexpectedEof.into()); + let n_bytes = file + .by_ref() + .take(max_bytes as u64) + .read_to_end(&mut buf)?; + + if n_bytes == 0 { + return Err(io::ErrorKind::UnexpectedEof.into()); + } + + Ok((file, Bytes::from(buf))) + }) } + #[cfg(feature = "io-uring")] + { + let file = file + .take() + .expect("ChunkedReadFile polled after completion"); + Box::pin(async move { + let buf = Vec::with_capacity(max_bytes); + + let (res, mut buf) = file.read_at(buf, offset).await; + let n_bytes = res?; + + if n_bytes == 0 { + return Err(io::ErrorKind::UnexpectedEof.into()); + } + + let _ = buf.split_off(n_bytes); + + Ok((file, Bytes::from(buf))) + }) + } + }; - Ok((file, Bytes::from(buf))) - }); this.state = ChunkedReadFileState::Future(fut); + self.poll_next(cx) } } ChunkedReadFileState::Future(ref mut fut) => { - let (file, bytes) = - ready!(Pin::new(fut).poll(cx)).map_err(|_| BlockingError)??; + let (file, bytes) = { + #[cfg(not(feature = "io-uring"))] + { + ready!(Pin::new(fut).poll(cx)).map_err(|_| BlockingError)?? + } + + #[cfg(feature = "io-uring")] + { + ready!(fut.as_mut().poll(cx))? + } + }; + this.state = ChunkedReadFileState::File(Some(file)); this.offset += bytes.len() as u64; diff --git a/actix-files/src/files.rs b/actix-files/src/files.rs index 68879822a..edf047504 100644 --- a/actix-files/src/files.rs +++ b/actix-files/src/files.rs @@ -6,7 +6,6 @@ use std::{ }; use actix_service::{boxed, IntoServiceFactory, ServiceFactory, ServiceFactoryExt}; -use actix_utils::future::ok; use actix_web::{ dev::{ AppService, HttpServiceFactory, RequestHead, ResourceDef, ServiceRequest, @@ -20,8 +19,9 @@ use actix_web::{ use futures_core::future::LocalBoxFuture; use crate::{ - directory_listing, named, Directory, DirectoryRenderer, FilesService, HttpNewService, - MimeOverride, PathFilter, + directory_listing, named, + service::{FilesService, FilesServiceInner}, + Directory, DirectoryRenderer, HttpNewService, MimeOverride, PathFilter, }; /// Static files handling service. @@ -353,7 +353,7 @@ impl ServiceFactory for Files { type Future = LocalBoxFuture<'static, Result>; fn new_service(&self, _: ()) -> Self::Future { - let mut srv = FilesService { + let mut inner = FilesServiceInner { directory: self.directory.clone(), index: self.index.clone(), show_index: self.show_index, @@ -372,14 +372,14 @@ impl ServiceFactory for Files { Box::pin(async { match fut.await { Ok(default) => { - srv.default = Some(default); - Ok(srv) + inner.default = Some(default); + Ok(FilesService(Rc::new(inner))) } Err(_) => Err(()), } }) } else { - Box::pin(ok(srv)) + Box::pin(async move { Ok(FilesService(Rc::new(inner))) }) } } } diff --git a/actix-files/src/lib.rs b/actix-files/src/lib.rs index 1eb091aaf..847d5fbe5 100644 --- a/actix-files/src/lib.rs +++ b/actix-files/src/lib.rs @@ -213,7 +213,8 @@ mod tests { #[actix_rt::test] async fn test_named_file_non_ascii_file_name() { let mut file = - NamedFile::from_file(File::open("Cargo.toml").unwrap(), "貨物.toml").unwrap(); + NamedFile::from_file(crate::named::File::open("Cargo.toml").unwrap(), "貨物.toml") + .unwrap(); { file.file(); let _f: &File = &file; diff --git a/actix-files/src/named.rs b/actix-files/src/named.rs index 241e78cf0..108af1c08 100644 --- a/actix-files/src/named.rs +++ b/actix-files/src/named.rs @@ -1,11 +1,16 @@ +use std::{ + fmt, + fs::Metadata, + io, + ops::{Deref, DerefMut}, + path::{Path, PathBuf}, + time::{SystemTime, UNIX_EPOCH}, +}; + use actix_service::{Service, ServiceFactory}; -use actix_utils::future::{ok, ready, Ready}; +use actix_utils::future::{ok, Ready}; use actix_web::dev::{AppService, HttpServiceFactory, ResourceDef}; -use std::fs::{File, Metadata}; -use std::io; -use std::ops::{Deref, DerefMut}; -use std::path::{Path, PathBuf}; -use std::time::{SystemTime, UNIX_EPOCH}; +use futures_core::future::LocalBoxFuture; #[cfg(unix)] use std::os::unix::fs::MetadataExt; @@ -65,7 +70,6 @@ impl Default for Flags { /// NamedFile::open("./static/index.html") /// } /// ``` -#[derive(Debug)] pub struct NamedFile { path: PathBuf, file: File, @@ -78,6 +82,37 @@ pub struct NamedFile { pub(crate) encoding: Option, } +impl fmt::Debug for NamedFile { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("NamedFile") + .field("path", &self.path) + .field( + "file", + #[cfg(feature = "io-uring")] + { + &"File" + }, + #[cfg(not(feature = "io-uring"))] + { + &self.file + }, + ) + .field("modified", &self.modified) + .field("md", &self.md) + .field("flags", &self.flags) + .field("status_code", &self.status_code) + .field("content_type", &self.content_type) + .field("content_disposition", &self.content_disposition) + .field("encoding", &self.encoding) + .finish() + } +} + +#[cfg(not(feature = "io-uring"))] +pub(crate) use std::fs::File; +#[cfg(feature = "io-uring")] +pub(crate) use tokio_uring::fs::File; + impl NamedFile { /// Creates an instance from a previously opened file. /// @@ -147,7 +182,26 @@ impl NamedFile { (ct, cd) }; - let md = file.metadata()?; + let md = { + #[cfg(not(feature = "io-uring"))] + { + file.metadata()? + } + + #[cfg(feature = "io-uring")] + { + use std::os::unix::prelude::{AsRawFd, FromRawFd}; + + let fd = file.as_raw_fd(); + + // SAFETY: fd is borrowed and lives longer than the unsafe block. + unsafe { + let fs = std::fs::File::from_raw_fd(fd); + fs.metadata()? + } + } + }; + let modified = md.modified().ok(); let encoding = None; @@ -174,7 +228,43 @@ impl NamedFile { /// let file = NamedFile::open("foo.txt"); /// ``` pub fn open>(path: P) -> io::Result { - Self::from_file(File::open(&path)?, path) + #[cfg(not(feature = "io-uring"))] + { + let file = File::open(&path)?; + Self::from_file(file, path) + } + + #[cfg(feature = "io-uring")] + { + tokio::runtime::Handle::current().block_on(Self::open_async(path)) + } + } + + /// Attempts to open a file asynchronously in read-only mode. + /// + /// # Examples + /// + /// ``` + /// use actix_files::NamedFile; + /// + /// # async fn open() { + /// let file = NamedFile::open_async("foo.txt").await.unwrap(); + /// # } + /// ``` + pub async fn open_async>(path: P) -> io::Result { + let file = { + #[cfg(not(feature = "io-uring"))] + { + File::open(&path)? + } + + #[cfg(feature = "io-uring")] + { + File::open(&path).await? + } + }; + + Self::from_file(file, path) } /// Returns reference to the underlying `File` object. @@ -456,20 +546,6 @@ impl NamedFile { } } -impl Deref for NamedFile { - type Target = File; - - fn deref(&self) -> &File { - &self.file - } -} - -impl DerefMut for NamedFile { - fn deref_mut(&mut self) -> &mut File { - &mut self.file - } -} - /// Returns true if `req` has no `If-Match` header or one which matches `etag`. fn any_match(etag: Option<&header::EntityTag>, req: &HttpRequest) -> bool { match req.get_header::() { @@ -510,6 +586,20 @@ fn none_match(etag: Option<&header::EntityTag>, req: &HttpRequest) -> bool { } } +impl Deref for NamedFile { + type Target = File; + + fn deref(&self) -> &File { + &self.file + } +} + +impl DerefMut for NamedFile { + fn deref_mut(&mut self) -> &mut File { + &mut self.file + } +} + impl Responder for NamedFile { fn respond_to(self, req: &HttpRequest) -> HttpResponse { self.into_response(req) @@ -540,18 +630,19 @@ pub struct NamedFileService { impl Service for NamedFileService { type Response = ServiceResponse; type Error = Error; - type Future = Ready>; + type Future = LocalBoxFuture<'static, Result>; actix_service::always_ready!(); fn call(&self, req: ServiceRequest) -> Self::Future { let (req, _) = req.into_parts(); - ready( - NamedFile::open(&self.path) - .map_err(|e| e.into()) - .map(|f| f.into_response(&req)) - .map(|res| ServiceResponse::new(req, res)), - ) + + let path = self.path.clone(); + Box::pin(async move { + let file = NamedFile::open_async(path).await?; + let res = file.into_response(&req); + Ok(ServiceResponse::new(req, res)) + }) } } diff --git a/actix-files/src/service.rs b/actix-files/src/service.rs index 09122c63e..d4a80fdad 100644 --- a/actix-files/src/service.rs +++ b/actix-files/src/service.rs @@ -1,7 +1,6 @@ -use std::{fmt, io, path::PathBuf, rc::Rc}; +use std::{fmt, io, ops::Deref, path::PathBuf, rc::Rc}; use actix_service::Service; -use actix_utils::future::ok; use actix_web::{ dev::{ServiceRequest, ServiceResponse}, error::Error, @@ -17,7 +16,18 @@ use crate::{ }; /// Assembled file serving service. -pub struct FilesService { +#[derive(Clone)] +pub struct FilesService(pub Rc); + +impl Deref for FilesService { + type Target = FilesServiceInner; + + fn deref(&self) -> &Self::Target { + &*self.0 + } +} + +pub struct FilesServiceInner { pub(crate) directory: PathBuf, pub(crate) index: Option, pub(crate) show_index: bool, @@ -31,20 +41,42 @@ pub struct FilesService { pub(crate) hidden_files: bool, } +impl fmt::Debug for FilesServiceInner { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str("FilesServiceInner") + } +} + impl FilesService { - fn handle_err( + async fn handle_err( &self, err: io::Error, req: ServiceRequest, - ) -> LocalBoxFuture<'static, Result> { + ) -> Result { log::debug!("error handling {}: {}", req.path(), err); if let Some(ref default) = self.default { - Box::pin(default.call(req)) + default.call(req).await } else { - Box::pin(ok(req.error_response(err))) + Ok(req.error_response(err)) } } + + fn serve_named_file( + &self, + req: ServiceRequest, + mut named_file: NamedFile, + ) -> ServiceResponse { + if let Some(ref mime_override) = self.mime_override { + let new_disposition = mime_override(&named_file.content_type.type_()); + named_file.content_disposition.disposition = new_disposition; + } + named_file.flags = self.file_flags; + + let (req, _) = req.into_parts(); + let res = named_file.into_response(&req); + ServiceResponse::new(req, res) + } } impl fmt::Debug for FilesService { @@ -69,103 +101,99 @@ impl Service for FilesService { matches!(*req.method(), Method::HEAD | Method::GET) }; - if !is_method_valid { - return Box::pin(ok(req.into_response( - actix_web::HttpResponse::MethodNotAllowed() - .insert_header(header::ContentType(mime::TEXT_PLAIN_UTF_8)) - .body("Request did not meet this resource's requirements."), - ))); - } + let this = self.clone(); - let real_path = - match PathBufWrap::parse_path(req.match_info().path(), self.hidden_files) { - Ok(item) => item, - Err(e) => return Box::pin(ok(req.error_response(e))), - }; + Box::pin(async move { + if !is_method_valid { + return Ok(req.into_response( + actix_web::HttpResponse::MethodNotAllowed() + .insert_header(header::ContentType(mime::TEXT_PLAIN_UTF_8)) + .body("Request did not meet this resource's requirements."), + )); + } - if let Some(filter) = &self.path_filter { - if !filter(real_path.as_ref(), req.head()) { - if let Some(ref default) = self.default { - return Box::pin(default.call(req)); - } else { - return Box::pin(ok( - req.into_response(actix_web::HttpResponse::NotFound().finish()) + let real_path = + match PathBufWrap::parse_path(req.match_info().path(), this.hidden_files) { + Ok(item) => item, + Err(e) => return Ok(req.error_response(e)), + }; + + if let Some(filter) = &this.path_filter { + if !filter(real_path.as_ref(), req.head()) { + if let Some(ref default) = this.default { + return default.call(req).await; + } else { + return Ok( + req.into_response(actix_web::HttpResponse::NotFound().finish()) + ); + } + } + } + + // full file path + let path = this.directory.join(&real_path); + if let Err(err) = path.canonicalize() { + return this.handle_err(err, req).await; + } + + if path.is_dir() { + if this.redirect_to_slash + && !req.path().ends_with('/') + && (this.index.is_some() || this.show_index) + { + let redirect_to = format!("{}/", req.path()); + + return Ok(req.into_response( + HttpResponse::Found() + .insert_header((header::LOCATION, redirect_to)) + .finish(), )); } - } - } - // full file path - let path = self.directory.join(&real_path); - if let Err(err) = path.canonicalize() { - return Box::pin(self.handle_err(err, req)); - } - - if path.is_dir() { - if self.redirect_to_slash - && !req.path().ends_with('/') - && (self.index.is_some() || self.show_index) - { - let redirect_to = format!("{}/", req.path()); - - return Box::pin(ok(req.into_response( - HttpResponse::Found() - .insert_header((header::LOCATION, redirect_to)) - .finish(), - ))); - } - - let serve_named_file = |req: ServiceRequest, mut named_file: NamedFile| { - if let Some(ref mime_override) = self.mime_override { - let new_disposition = mime_override(&named_file.content_type.type_()); - named_file.content_disposition.disposition = new_disposition; - } - named_file.flags = self.file_flags; - - let (req, _) = req.into_parts(); - let res = named_file.into_response(&req); - Box::pin(ok(ServiceResponse::new(req, res))) - }; - - let show_index = |req: ServiceRequest| { - let dir = Directory::new(self.directory.clone(), path.clone()); - - let (req, _) = req.into_parts(); - let x = (self.renderer)(&dir, &req); - - Box::pin(match x { - Ok(resp) => ok(resp), - Err(err) => ok(ServiceResponse::from_err(err, req)), - }) - }; - - match self.index { - Some(ref index) => match NamedFile::open(path.join(index)) { - Ok(named_file) => serve_named_file(req, named_file), - Err(_) if self.show_index => show_index(req), - Err(err) => self.handle_err(err, req), - }, - None if self.show_index => show_index(req), - _ => Box::pin(ok(ServiceResponse::from_err( - FilesError::IsDirectory, - req.into_parts().0, - ))), - } - } else { - match NamedFile::open(path) { - Ok(mut named_file) => { - if let Some(ref mime_override) = self.mime_override { - let new_disposition = mime_override(&named_file.content_type.type_()); - named_file.content_disposition.disposition = new_disposition; - } - named_file.flags = self.file_flags; + let show_index = |req: ServiceRequest| { + let dir = Directory::new(this.directory.clone(), path.clone()); let (req, _) = req.into_parts(); - let res = named_file.into_response(&req); - Box::pin(ok(ServiceResponse::new(req, res))) + let x = (this.renderer)(&dir, &req); + + match x { + Ok(resp) => Ok(resp), + Err(err) => Ok(ServiceResponse::from_err(err, req)), + } + }; + + match this.index { + Some(ref index) => { + let path = path.join(index); + match NamedFile::open_async(path).await { + Ok(named_file) => Ok(this.serve_named_file(req, named_file)), + Err(_) if this.show_index => show_index(req), + Err(err) => this.handle_err(err, req).await, + } + } + None if this.show_index => show_index(req), + _ => Ok(ServiceResponse::from_err( + FilesError::IsDirectory, + req.into_parts().0, + )), + } + } else { + match NamedFile::open_async(path).await { + Ok(mut named_file) => { + if let Some(ref mime_override) = this.mime_override { + let new_disposition = + mime_override(&named_file.content_type.type_()); + named_file.content_disposition.disposition = new_disposition; + } + named_file.flags = this.file_flags; + + let (req, _) = req.into_parts(); + let res = named_file.into_response(&req); + Ok(ServiceResponse::new(req, res)) + } + Err(err) => this.handle_err(err, req).await, } - Err(err) => self.handle_err(err, req), } - } + }) } }