From 4598a7c0ccd594deb483c743a031eb3d4acad72e Mon Sep 17 00:00:00 2001 From: Yuki Okushi Date: Tue, 25 May 2021 00:09:38 +0900 Subject: [PATCH 1/7] Only run UI tests on MSRV (#2232) --- actix-web-codegen/tests/trybuild.rs | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/actix-web-codegen/tests/trybuild.rs b/actix-web-codegen/tests/trybuild.rs index afbe7b728..12e848cf3 100644 --- a/actix-web-codegen/tests/trybuild.rs +++ b/actix-web-codegen/tests/trybuild.rs @@ -1,3 +1,4 @@ +#[rustversion::stable(1.46)] // MSRV #[test] fn compile_macros() { let t = trybuild::TestCases::new(); @@ -12,11 +13,3 @@ fn compile_macros() { t.pass("tests/trybuild/docstring-ok.rs"); } - -// #[rustversion::not(nightly)] -// fn skip_on_nightly(t: &trybuild::TestCases) { -// -// } - -// #[rustversion::nightly] -// fn skip_on_nightly(_t: &trybuild::TestCases) {} From bb7d33c9d41acef3b8d57dafc167c4077875374c Mon Sep 17 00:00:00 2001 From: fakeshadow <24548779@qq.com> Date: Tue, 25 May 2021 10:21:20 +0800 Subject: [PATCH 2/7] refactor h2 dispatcher to async/await.reduce duplicate code (#2211) --- actix-http/src/h2/dispatcher.rs | 520 ++++++++++++-------------------- 1 file changed, 195 insertions(+), 325 deletions(-) diff --git a/actix-http/src/h2/dispatcher.rs b/actix-http/src/h2/dispatcher.rs index 5be172aaf..baff20e51 100644 --- a/actix-http/src/h2/dispatcher.rs +++ b/actix-http/src/h2/dispatcher.rs @@ -1,20 +1,26 @@ -use std::task::{Context, Poll}; -use std::{cmp, future::Future, marker::PhantomData, net, pin::Pin, rc::Rc}; +use std::{ + cmp, + future::Future, + marker::PhantomData, + net, + pin::Pin, + rc::Rc, + task::{Context, Poll}, +}; use actix_codec::{AsyncRead, AsyncWrite}; use actix_service::Service; +use actix_utils::future::poll_fn; use bytes::{Bytes, BytesMut}; use futures_core::ready; -use h2::{ - server::{Connection, SendResponse}, - SendStream, -}; +use h2::server::{Connection, SendResponse}; use http::header::{HeaderValue, CONNECTION, CONTENT_LENGTH, DATE, TRANSFER_ENCODING}; use log::{error, trace}; +use pin_project_lite::pin_project; -use crate::body::{Body, BodySize, MessageBody}; +use crate::body::{BodySize, MessageBody}; use crate::config::ServiceConfig; -use crate::error::{DispatchError, Error}; +use crate::error::Error; use crate::message::ResponseHead; use crate::payload::Payload; use crate::request::Request; @@ -24,30 +30,19 @@ use crate::OnConnectData; const CHUNK_SIZE: usize = 16_384; -/// Dispatcher for HTTP/2 protocol. -#[pin_project::pin_project] -pub struct Dispatcher -where - T: AsyncRead + AsyncWrite + Unpin, - S: Service, - B: MessageBody, -{ - flow: Rc>, - connection: Connection, - on_connect_data: OnConnectData, - config: ServiceConfig, - peer_addr: Option, - _phantom: PhantomData, +pin_project! { + /// Dispatcher for HTTP/2 protocol. + pub struct Dispatcher { + flow: Rc>, + connection: Connection, + on_connect_data: OnConnectData, + config: ServiceConfig, + peer_addr: Option, + _phantom: PhantomData, + } } -impl Dispatcher -where - T: AsyncRead + AsyncWrite + Unpin, - S: Service, - S::Error: Into, - S::Response: Into>, - B: MessageBody, -{ +impl Dispatcher { pub(crate) fn new( flow: Rc>, connection: Connection, @@ -55,7 +50,7 @@ where config: ServiceConfig, peer_addr: Option, ) -> Self { - Dispatcher { + Self { flow, config, peer_addr, @@ -71,331 +66,206 @@ where T: AsyncRead + AsyncWrite + Unpin, S: Service, - S::Error: Into + 'static, + S::Error: Into, S::Future: 'static, - S::Response: Into> + 'static, + S::Response: Into>, - B: MessageBody + 'static, + B: MessageBody, B::Error: Into, { - type Output = Result<(), DispatchError>; + type Output = Result<(), crate::error::DispatchError>; #[inline] fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = self.get_mut(); - loop { - match ready!(Pin::new(&mut this.connection).poll_accept(cx)) { - None => return Poll::Ready(Ok(())), + while let Some((req, tx)) = + ready!(Pin::new(&mut this.connection).poll_accept(cx)?) + { + let (parts, body) = req.into_parts(); + let pl = crate::h2::Payload::new(body); + let pl = Payload::::H2(pl); + let mut req = Request::with_payload(pl); - Some(Err(err)) => return Poll::Ready(Err(err.into())), + let head = req.head_mut(); + head.uri = parts.uri; + head.method = parts.method; + head.version = parts.version; + head.headers = parts.headers.into(); + head.peer_addr = this.peer_addr; - Some(Ok((req, res))) => { - let (parts, body) = req.into_parts(); - let pl = crate::h2::Payload::new(body); - let pl = Payload::::H2(pl); - let mut req = Request::with_payload(pl); + // merge on_connect_ext data into request extensions + this.on_connect_data.merge_into(&mut req); - let head = req.head_mut(); - head.uri = parts.uri; - head.method = parts.method; - head.version = parts.version; - head.headers = parts.headers.into(); - head.peer_addr = this.peer_addr; + let fut = this.flow.service.call(req); + let config = this.config.clone(); - // merge on_connect_ext data into request extensions - this.on_connect_data.merge_into(&mut req); + // multiplex request handling with spawn task + actix_rt::spawn(async move { + // resolve service call and send response. + let res = match fut.await { + Ok(res) => handle_response(res.into(), tx, config).await, + Err(err) => { + let res = Response::from_error(err.into()); + handle_response(res, tx, config).await + } + }; - let svc = ServiceResponse { - state: ServiceResponseState::ServiceCall( - this.flow.service.call(req), - Some(res), - ), - config: this.config.clone(), - buffer: None, - _phantom: PhantomData, - }; + // log error. + if let Err(err) = res { + match err { + DispatchError::SendResponse(err) => { + trace!("Error sending HTTP/2 response: {:?}", err) + } + DispatchError::SendData(err) => warn!("{:?}", err), + DispatchError::ResponseBody(err) => { + error!("Response payload stream error: {:?}", err) + } + } + } + }); + } - actix_rt::spawn(svc); + Poll::Ready(Ok(())) + } +} + +enum DispatchError { + SendResponse(h2::Error), + SendData(h2::Error), + ResponseBody(Error), +} + +async fn handle_response( + res: Response, + mut tx: SendResponse, + config: ServiceConfig, +) -> Result<(), DispatchError> +where + B: MessageBody, + B::Error: Into, +{ + let (res, body) = res.replace_body(()); + + // prepare response. + let mut size = body.size(); + let res = prepare_response(config, res.head(), &mut size); + let eof = size.is_eof(); + + // send response head and return on eof. + let mut stream = tx + .send_response(res, eof) + .map_err(DispatchError::SendResponse)?; + + if eof { + return Ok(()); + } + + // poll response body and send chunks to client. + actix_rt::pin!(body); + + while let Some(res) = poll_fn(|cx| body.as_mut().poll_next(cx)).await { + let mut chunk = res.map_err(|err| DispatchError::ResponseBody(err.into()))?; + + 'send: loop { + // reserve enough space and wait for stream ready. + stream.reserve_capacity(cmp::min(chunk.len(), CHUNK_SIZE)); + + match poll_fn(|cx| stream.poll_capacity(cx)).await { + // No capacity left. drop body and return. + None => return Ok(()), + Some(res) => { + // Split chuck to writeable size and send to client. + let cap = res.map_err(DispatchError::SendData)?; + + let len = chunk.len(); + let bytes = chunk.split_to(cmp::min(cap, len)); + + stream + .send_data(bytes, false) + .map_err(DispatchError::SendData)?; + + // Current chuck completely sent. break send loop and poll next one. + if chunk.is_empty() { + break 'send; + } } } } } + + // response body streaming finished. send end of stream and return. + stream + .send_data(Bytes::new(), true) + .map_err(DispatchError::SendData)?; + + Ok(()) } -#[pin_project::pin_project] -struct ServiceResponse { - #[pin] - state: ServiceResponseState, +fn prepare_response( config: ServiceConfig, - buffer: Option, - _phantom: PhantomData<(I, E)>, -} + head: &ResponseHead, + size: &mut BodySize, +) -> http::Response<()> { + let mut has_date = false; + let mut skip_len = size != &BodySize::Stream; -#[pin_project::pin_project(project = ServiceResponseStateProj)] -enum ServiceResponseState { - ServiceCall(#[pin] F, Option>), - SendPayload(SendStream, #[pin] B), - SendErrorPayload(SendStream, #[pin] Body), -} + let mut res = http::Response::new(()); + *res.status_mut() = head.status; + *res.version_mut() = http::Version::HTTP_2; -impl ServiceResponse -where - F: Future>, - E: Into, - I: Into>, + // Content length + match head.status { + http::StatusCode::NO_CONTENT + | http::StatusCode::CONTINUE + | http::StatusCode::PROCESSING => *size = BodySize::None, + http::StatusCode::SWITCHING_PROTOCOLS => { + skip_len = true; + *size = BodySize::Stream; + } + _ => {} + } - B: MessageBody, - B::Error: Into, -{ - fn prepare_response( - &self, - head: &ResponseHead, - size: &mut BodySize, - ) -> http::Response<()> { - let mut has_date = false; - let mut skip_len = size != &BodySize::Stream; + let _ = match size { + BodySize::None | BodySize::Stream => None, + BodySize::Empty => res + .headers_mut() + .insert(CONTENT_LENGTH, HeaderValue::from_static("0")), + BodySize::Sized(len) => { + let mut buf = itoa::Buffer::new(); - let mut res = http::Response::new(()); - *res.status_mut() = head.status; - *res.version_mut() = http::Version::HTTP_2; + res.headers_mut().insert( + CONTENT_LENGTH, + HeaderValue::from_str(buf.format(*len)).unwrap(), + ) + } + }; - // Content length - match head.status { - http::StatusCode::NO_CONTENT - | http::StatusCode::CONTINUE - | http::StatusCode::PROCESSING => *size = BodySize::None, - http::StatusCode::SWITCHING_PROTOCOLS => { - skip_len = true; - *size = BodySize::Stream; - } + // copy headers + for (key, value) in head.headers.iter() { + match *key { + // TODO: consider skipping other headers according to: + // https://tools.ietf.org/html/rfc7540#section-8.1.2.2 + // omit HTTP/1.x only headers + CONNECTION | TRANSFER_ENCODING => continue, + CONTENT_LENGTH if skip_len => continue, + DATE => has_date = true, _ => {} } - let _ = match size { - BodySize::None | BodySize::Stream => None, - BodySize::Empty => res - .headers_mut() - .insert(CONTENT_LENGTH, HeaderValue::from_static("0")), - BodySize::Sized(len) => { - let mut buf = itoa::Buffer::new(); - - res.headers_mut().insert( - CONTENT_LENGTH, - HeaderValue::from_str(buf.format(*len)).unwrap(), - ) - } - }; - - // copy headers - for (key, value) in head.headers.iter() { - match *key { - // TODO: consider skipping other headers according to: - // https://tools.ietf.org/html/rfc7540#section-8.1.2.2 - // omit HTTP/1.x only headers - CONNECTION | TRANSFER_ENCODING => continue, - CONTENT_LENGTH if skip_len => continue, - DATE => has_date = true, - _ => {} - } - - res.headers_mut().append(key, value.clone()); - } - - // set date header - if !has_date { - let mut bytes = BytesMut::with_capacity(29); - self.config.set_date_header(&mut bytes); - res.headers_mut().insert( - DATE, - // SAFETY: serialized date-times are known ASCII strings - unsafe { HeaderValue::from_maybe_shared_unchecked(bytes.freeze()) }, - ); - } - - res + res.headers_mut().append(key, value.clone()); } -} -impl Future for ServiceResponse -where - F: Future>, - E: Into, - I: Into>, - - B: MessageBody, - B::Error: Into, -{ - type Output = (); - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let mut this = self.as_mut().project(); - - match this.state.project() { - ServiceResponseStateProj::ServiceCall(call, send) => { - match ready!(call.poll(cx)) { - Ok(res) => { - let (res, body) = res.into().replace_body(()); - - let mut send = send.take().unwrap(); - let mut size = body.size(); - let h2_res = - self.as_mut().prepare_response(res.head(), &mut size); - this = self.as_mut().project(); - - let stream = match send.send_response(h2_res, size.is_eof()) { - Err(e) => { - trace!("Error sending HTTP/2 response: {:?}", e); - return Poll::Ready(()); - } - Ok(stream) => stream, - }; - - if size.is_eof() { - Poll::Ready(()) - } else { - this.state - .set(ServiceResponseState::SendPayload(stream, body)); - self.poll(cx) - } - } - - Err(err) => { - let res = Response::from_error(err.into()); - let (res, body) = res.replace_body(()); - - let mut send = send.take().unwrap(); - let mut size = body.size(); - let h2_res = - self.as_mut().prepare_response(res.head(), &mut size); - this = self.as_mut().project(); - - let stream = match send.send_response(h2_res, size.is_eof()) { - Err(e) => { - trace!("Error sending HTTP/2 response: {:?}", e); - return Poll::Ready(()); - } - Ok(stream) => stream, - }; - - if size.is_eof() { - Poll::Ready(()) - } else { - this.state.set(ServiceResponseState::SendErrorPayload( - stream, body, - )); - self.poll(cx) - } - } - } - } - - ServiceResponseStateProj::SendPayload(ref mut stream, ref mut body) => { - loop { - match this.buffer { - Some(ref mut buffer) => match ready!(stream.poll_capacity(cx)) { - None => return Poll::Ready(()), - - Some(Ok(cap)) => { - let len = buffer.len(); - let bytes = buffer.split_to(cmp::min(cap, len)); - - if let Err(e) = stream.send_data(bytes, false) { - warn!("{:?}", e); - return Poll::Ready(()); - } else if !buffer.is_empty() { - let cap = cmp::min(buffer.len(), CHUNK_SIZE); - stream.reserve_capacity(cap); - } else { - this.buffer.take(); - } - } - - Some(Err(e)) => { - warn!("{:?}", e); - return Poll::Ready(()); - } - }, - - None => match ready!(body.as_mut().poll_next(cx)) { - None => { - if let Err(e) = stream.send_data(Bytes::new(), true) { - warn!("{:?}", e); - } - return Poll::Ready(()); - } - - Some(Ok(chunk)) => { - stream - .reserve_capacity(cmp::min(chunk.len(), CHUNK_SIZE)); - *this.buffer = Some(chunk); - } - - Some(Err(err)) => { - error!( - "Response payload stream error: {:?}", - err.into() - ); - - return Poll::Ready(()); - } - }, - } - } - } - - ServiceResponseStateProj::SendErrorPayload(ref mut stream, ref mut body) => { - // TODO: de-dupe impl with SendPayload - - loop { - match this.buffer { - Some(ref mut buffer) => match ready!(stream.poll_capacity(cx)) { - None => return Poll::Ready(()), - - Some(Ok(cap)) => { - let len = buffer.len(); - let bytes = buffer.split_to(cmp::min(cap, len)); - - if let Err(e) = stream.send_data(bytes, false) { - warn!("{:?}", e); - return Poll::Ready(()); - } else if !buffer.is_empty() { - let cap = cmp::min(buffer.len(), CHUNK_SIZE); - stream.reserve_capacity(cap); - } else { - this.buffer.take(); - } - } - - Some(Err(e)) => { - warn!("{:?}", e); - return Poll::Ready(()); - } - }, - - None => match ready!(body.as_mut().poll_next(cx)) { - None => { - if let Err(e) = stream.send_data(Bytes::new(), true) { - warn!("{:?}", e); - } - return Poll::Ready(()); - } - - Some(Ok(chunk)) => { - stream - .reserve_capacity(cmp::min(chunk.len(), CHUNK_SIZE)); - *this.buffer = Some(chunk); - } - - Some(Err(err)) => { - error!("Response payload stream error: {:?}", err); - - return Poll::Ready(()); - } - }, - } - } - } - } + // set date header + if !has_date { + let mut bytes = BytesMut::with_capacity(29); + config.set_date_header(&mut bytes); + res.headers_mut().insert( + DATE, + // SAFETY: serialized date-times are known ASCII strings + unsafe { HeaderValue::from_maybe_shared_unchecked(bytes.freeze()) }, + ); } + + res } From 3847429d00e9b256dc1d1fc2cfea414e367c3410 Mon Sep 17 00:00:00 2001 From: fakeshadow <24548779@qq.com> Date: Wed, 26 May 2021 12:41:48 +0800 Subject: [PATCH 3/7] Response::from_error take impl Into (#2214) --- actix-http/src/h1/dispatcher.rs | 8 ++++---- actix-http/src/response.rs | 3 ++- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/actix-http/src/h1/dispatcher.rs b/actix-http/src/h1/dispatcher.rs index 574f0b2a9..c81d0b3bc 100644 --- a/actix-http/src/h1/dispatcher.rs +++ b/actix-http/src/h1/dispatcher.rs @@ -399,7 +399,7 @@ where // send service call error as response Poll::Ready(Err(err)) => { - let res = Response::from_error(err.into()); + let res = Response::from_error(err); let (res, body) = res.replace_body(()); self.as_mut().send_error_response(res, body)?; } @@ -496,7 +496,7 @@ where // send expect error as response Poll::Ready(Err(err)) => { - let res = Response::from_error(err.into()); + let res = Response::from_error(err); let (res, body) = res.replace_body(()); self.as_mut().send_error_response(res, body)?; } @@ -546,7 +546,7 @@ where // to notify the dispatcher a new state is set and the outer loop // should be continue. Poll::Ready(Err(err)) => { - let res = Response::from_error(err.into()); + let res = Response::from_error(err); let (res, body) = res.replace_body(()); return self.send_error_response(res, body); } @@ -566,7 +566,7 @@ where Poll::Pending => Ok(()), // see the comment on ExpectCall state branch's Ready(Err(err)). Poll::Ready(Err(err)) => { - let res = Response::from_error(err.into()); + let res = Response::from_error(err); let (res, body) = res.replace_body(()); self.send_error_response(res, body) } diff --git a/actix-http/src/response.rs b/actix-http/src/response.rs index 419f6b88e..bcfa65732 100644 --- a/actix-http/src/response.rs +++ b/actix-http/src/response.rs @@ -69,7 +69,8 @@ impl Response { /// Constructs a new response from an error. #[inline] - pub fn from_error(error: Error) -> Response { + pub fn from_error(error: impl Into) -> Response { + let error = error.into(); let resp = error.as_response_error().error_response(); if resp.head.status == StatusCode::INTERNAL_SERVER_ERROR { debug!("Internal Server Error: {:?}", error); From e5b713b04a5bb79d5857eebc56489ac761e5eb59 Mon Sep 17 00:00:00 2001 From: Ali MJ Al-Nasrawy Date: Wed, 26 May 2021 12:42:29 +0300 Subject: [PATCH 4/7] files: Fix `redirect_to_slash_directory()` when used with `show_files_listing()` (#2225) --- actix-files/CHANGES.md | 2 ++ actix-files/src/lib.rs | 15 ++++++++++++++- actix-files/src/service.rs | 23 +++++++++++++---------- 3 files changed, 29 insertions(+), 11 deletions(-) diff --git a/actix-files/CHANGES.md b/actix-files/CHANGES.md index 0586a2fd3..12b70c135 100644 --- a/actix-files/CHANGES.md +++ b/actix-files/CHANGES.md @@ -3,9 +3,11 @@ ## Unreleased - 2021-xx-xx * `NamedFile` now implements `ServiceFactory` and `HttpServiceFactory` making it much more useful in routing. For example, it can be used directly as a default service. [#2135] * For symbolic links, `Content-Disposition` header no longer shows the filename of the original file. [#2156] +* `Files::redirect_to_slash_directory()` now works as expected when used with `Files::show_files_listing()`. [#2225] [#2135]: https://github.com/actix/actix-web/pull/2135 [#2156]: https://github.com/actix/actix-web/pull/2156 +[#2225]: https://github.com/actix/actix-web/pull/2225 ## 0.6.0-beta.4 - 2021-04-02 diff --git a/actix-files/src/lib.rs b/actix-files/src/lib.rs index 0384ff2b0..aa5960b5b 100644 --- a/actix-files/src/lib.rs +++ b/actix-files/src/lib.rs @@ -632,7 +632,7 @@ mod tests { #[actix_rt::test] async fn test_redirect_to_slash_directory() { - // should not redirect if no index + // should not redirect if no index and files listing is disabled let srv = test::init_service( App::new().service(Files::new("/", ".").redirect_to_slash_directory()), ) @@ -654,6 +654,19 @@ mod tests { let resp = test::call_service(&srv, req).await; assert_eq!(resp.status(), StatusCode::FOUND); + // should redirect if files listing is enabled + let srv = test::init_service( + App::new().service( + Files::new("/", ".") + .show_files_listing() + .redirect_to_slash_directory(), + ), + ) + .await; + let req = TestRequest::with_uri("/tests").to_request(); + let resp = test::call_service(&srv, req).await; + assert_eq!(resp.status(), StatusCode::FOUND); + // should not redirect if the path is wrong let req = TestRequest::with_uri("/not_existing").to_request(); let resp = test::call_service(&srv, req).await; diff --git a/actix-files/src/service.rs b/actix-files/src/service.rs index 31e1434bd..831115dc6 100644 --- a/actix-files/src/service.rs +++ b/actix-files/src/service.rs @@ -89,17 +89,20 @@ impl Service for FilesService { } if path.is_dir() { + if self.redirect_to_slash + && !req.path().ends_with('/') + && (self.index.is_some() || self.show_index) + { + let redirect_to = format!("{}/", req.path()); + + return Box::pin(ok(req.into_response( + HttpResponse::Found() + .insert_header((header::LOCATION, redirect_to)) + .finish(), + ))); + } + if let Some(ref redir_index) = self.index { - if self.redirect_to_slash && !req.path().ends_with('/') { - let redirect_to = format!("{}/", req.path()); - - return Box::pin(ok(req.into_response( - HttpResponse::Found() - .insert_header((header::LOCATION, redirect_to)) - .finish(), - ))); - } - let path = path.join(redir_index); match NamedFile::open(path) { From 136dac135245daec70b6b68a116dca9132b3680f Mon Sep 17 00:00:00 2001 From: James Wright Date: Thu, 3 Jun 2021 03:28:09 +0100 Subject: [PATCH 5/7] Additional test coverage and tidyup (middleware::normalize) (#2243) --- src/middleware/normalize.rs | 216 ++++++++++++++++++++++++------------ 1 file changed, 148 insertions(+), 68 deletions(-) diff --git a/src/middleware/normalize.rs b/src/middleware/normalize.rs index ec6c2a344..cbed78714 100644 --- a/src/middleware/normalize.rs +++ b/src/middleware/normalize.rs @@ -189,6 +189,7 @@ mod tests { use super::*; use crate::{ dev::ServiceRequest, + guard::fn_guard, test::{call_service, init_service, TestRequest}, web, App, HttpResponse, }; @@ -199,37 +200,34 @@ mod tests { App::new() .wrap(NormalizePath::default()) .service(web::resource("/").to(HttpResponse::Ok)) - .service(web::resource("/v1/something").to(HttpResponse::Ok)), + .service(web::resource("/v1/something").to(HttpResponse::Ok)) + .service( + web::resource("/v2/something") + .guard(fn_guard(|req| req.uri.query() == Some("query=test"))) + .to(HttpResponse::Ok), + ), ) .await; - let req = TestRequest::with_uri("/").to_request(); - let res = call_service(&app, req).await; - assert!(res.status().is_success()); + let test_uris = vec![ + "/", + "/?query=test", + "///", + "/v1//something", + "/v1//something////", + "//v1/something", + "//v1//////something", + "/v2//something?query=test", + "/v2//something////?query=test", + "//v2/something?query=test", + "//v2//////something?query=test", + ]; - let req = TestRequest::with_uri("/?query=test").to_request(); - let res = call_service(&app, req).await; - assert!(res.status().is_success()); - - let req = TestRequest::with_uri("///").to_request(); - let res = call_service(&app, req).await; - assert!(res.status().is_success()); - - let req = TestRequest::with_uri("/v1//something////").to_request(); - let res = call_service(&app, req).await; - assert!(res.status().is_success()); - - let req2 = TestRequest::with_uri("//v1/something").to_request(); - let res2 = call_service(&app, req2).await; - assert!(res2.status().is_success()); - - let req3 = TestRequest::with_uri("//v1//////something").to_request(); - let res3 = call_service(&app, req3).await; - assert!(res3.status().is_success()); - - let req4 = TestRequest::with_uri("/v1//something").to_request(); - let res4 = call_service(&app, req4).await; - assert!(res4.status().is_success()); + for uri in test_uris { + let req = TestRequest::with_uri(uri).to_request(); + let res = call_service(&app, req).await; + assert!(res.status().is_success(), "Failed uri: {}", uri); + } } #[actix_rt::test] @@ -238,38 +236,114 @@ mod tests { App::new() .wrap(NormalizePath(TrailingSlash::Trim)) .service(web::resource("/").to(HttpResponse::Ok)) - .service(web::resource("/v1/something").to(HttpResponse::Ok)), + .service(web::resource("/v1/something").to(HttpResponse::Ok)) + .service( + web::resource("/v2/something") + .guard(fn_guard(|req| req.uri.query() == Some("query=test"))) + .to(HttpResponse::Ok), + ), ) .await; - // root paths should still work - let req = TestRequest::with_uri("/").to_request(); - let res = call_service(&app, req).await; - assert!(res.status().is_success()); + let test_uris = vec![ + "/", + "///", + "/v1/something", + "/v1/something/", + "/v1/something////", + "//v1//something", + "//v1//something//", + "/v2/something?query=test", + "/v2/something/?query=test", + "/v2/something////?query=test", + "//v2//something?query=test", + "//v2//something//?query=test", + ]; - let req = TestRequest::with_uri("/?query=test").to_request(); - let res = call_service(&app, req).await; - assert!(res.status().is_success()); + for uri in test_uris { + let req = TestRequest::with_uri(uri).to_request(); + let res = call_service(&app, req).await; + assert!(res.status().is_success(), "Failed uri: {}", uri); + } + } - let req = TestRequest::with_uri("///").to_request(); - let res = call_service(&app, req).await; - assert!(res.status().is_success()); + #[actix_rt::test] + async fn trim_root_trailing_slashes_with_query() { + let app = init_service( + App::new().wrap(NormalizePath(TrailingSlash::Trim)).service( + web::resource("/") + .guard(fn_guard(|req| req.uri.query() == Some("query=test"))) + .to(HttpResponse::Ok), + ), + ) + .await; - let req = TestRequest::with_uri("/v1/something////").to_request(); - let res = call_service(&app, req).await; - assert!(res.status().is_success()); + let test_uris = vec!["/?query=test", "//?query=test", "///?query=test"]; - let req2 = TestRequest::with_uri("/v1/something/").to_request(); - let res2 = call_service(&app, req2).await; - assert!(res2.status().is_success()); + for uri in test_uris { + let req = TestRequest::with_uri(uri).to_request(); + let res = call_service(&app, req).await; + assert!(res.status().is_success(), "Failed uri: {}", uri); + } + } - let req3 = TestRequest::with_uri("//v1//something//").to_request(); - let res3 = call_service(&app, req3).await; - assert!(res3.status().is_success()); + #[actix_rt::test] + async fn ensure_trailing_slash() { + let app = init_service( + App::new() + .wrap(NormalizePath(TrailingSlash::Always)) + .service(web::resource("/").to(HttpResponse::Ok)) + .service(web::resource("/v1/something/").to(HttpResponse::Ok)) + .service( + web::resource("/v2/something/") + .guard(fn_guard(|req| req.uri.query() == Some("query=test"))) + .to(HttpResponse::Ok), + ), + ) + .await; - let req4 = TestRequest::with_uri("//v1//something").to_request(); - let res4 = call_service(&app, req4).await; - assert!(res4.status().is_success()); + let test_uris = vec![ + "/", + "///", + "/v1/something", + "/v1/something/", + "/v1/something////", + "//v1//something", + "//v1//something//", + "/v2/something?query=test", + "/v2/something/?query=test", + "/v2/something////?query=test", + "//v2//something?query=test", + "//v2//something//?query=test", + ]; + + for uri in test_uris { + let req = TestRequest::with_uri(uri).to_request(); + let res = call_service(&app, req).await; + assert!(res.status().is_success(), "Failed uri: {}", uri); + } + } + + #[actix_rt::test] + async fn ensure_root_trailing_slash_with_query() { + let app = init_service( + App::new() + .wrap(NormalizePath(TrailingSlash::Always)) + .service( + web::resource("/") + .guard(fn_guard(|req| req.uri.query() == Some("query=test"))) + .to(HttpResponse::Ok), + ), + ) + .await; + + let test_uris = vec!["/?query=test", "//?query=test", "///?query=test"]; + + for uri in test_uris { + let req = TestRequest::with_uri(uri).to_request(); + let res = call_service(&app, req).await; + assert!(res.status().is_success(), "Failed uri: {}", uri); + } } #[actix_rt::test] @@ -279,7 +353,12 @@ mod tests { .wrap(NormalizePath(TrailingSlash::MergeOnly)) .service(web::resource("/").to(HttpResponse::Ok)) .service(web::resource("/v1/something").to(HttpResponse::Ok)) - .service(web::resource("/v1/").to(HttpResponse::Ok)), + .service(web::resource("/v1/").to(HttpResponse::Ok)) + .service( + web::resource("/v2/something") + .guard(fn_guard(|req| req.uri.query() == Some("query=test"))) + .to(HttpResponse::Ok), + ), ) .await; @@ -295,12 +374,16 @@ mod tests { ("/v1////", true), ("//v1//", true), ("///v1", false), + ("/v2/something?query=test", true), + ("/v2/something/?query=test", false), + ("/v2/something//?query=test", false), + ("//v2//something?query=test", true), ]; - for (path, success) in tests { - let req = TestRequest::with_uri(path).to_request(); + for (uri, success) in tests { + let req = TestRequest::with_uri(uri).to_request(); let res = call_service(&app, req).await; - assert_eq!(res.status().is_success(), success); + assert_eq!(res.status().is_success(), success, "Failed uri: {}", uri); } } @@ -316,21 +399,18 @@ mod tests { .await .unwrap(); - let req = TestRequest::with_uri("/v1//something////").to_srv_request(); - let res = normalize.call(req).await.unwrap(); - assert!(res.status().is_success()); + let test_uris = vec![ + "/v1//something////", + "///v1/something", + "//v1///something", + "/v1//something", + ]; - let req2 = TestRequest::with_uri("///v1/something").to_srv_request(); - let res2 = normalize.call(req2).await.unwrap(); - assert!(res2.status().is_success()); - - let req3 = TestRequest::with_uri("//v1///something").to_srv_request(); - let res3 = normalize.call(req3).await.unwrap(); - assert!(res3.status().is_success()); - - let req4 = TestRequest::with_uri("/v1//something").to_srv_request(); - let res4 = normalize.call(req4).await.unwrap(); - assert!(res4.status().is_success()); + for uri in test_uris { + let req = TestRequest::with_uri(uri).to_srv_request(); + let res = normalize.call(req).await.unwrap(); + assert!(res.status().is_success(), "Failed uri: {}", uri); + } } #[actix_rt::test] From 34792934168897d0559cadb31720ad27a5114a34 Mon Sep 17 00:00:00 2001 From: Arthur Le Moigne Date: Thu, 3 Jun 2021 22:32:52 +0200 Subject: [PATCH 6/7] Add zstd ContentEncoding support (#2244) Co-authored-by: Igor Aleksanov Co-authored-by: Rob Ede --- Cargo.toml | 1 + actix-http/CHANGES.md | 3 + actix-http/Cargo.toml | 3 +- actix-http/src/encoding/decoder.rs | 36 ++++++ actix-http/src/encoding/encoder.rs | 20 +++ .../src/header/shared/content_encoding.rs | 7 + tests/test_server.rs | 120 ++++++++++++++++++ 7 files changed, 189 insertions(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 5aa302333..6893067d5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -101,6 +101,7 @@ brotli2 = "0.3.2" criterion = "0.3" env_logger = "0.8" flate2 = "1.0.13" +zstd = "0.7" rand = "0.8" rcgen = "0.8" serde_derive = "1.0" diff --git a/actix-http/CHANGES.md b/actix-http/CHANGES.md index ec7d8ee3b..fbf9f8e99 100644 --- a/actix-http/CHANGES.md +++ b/actix-http/CHANGES.md @@ -9,6 +9,7 @@ * Re-export `ContentEncoding` and `ConnectionType` at the crate root. [#2171] * `Response::into_body` that consumes response and returns body type. [#2201] * `impl Default` for `Response`. [#2201] +* Add zstd support for `ContentEncoding`. [#2244] ### Changed * The `MessageBody` trait now has an associated `Error` type. [#2183] @@ -35,6 +36,8 @@ [#2201]: https://github.com/actix/actix-web/pull/2201 [#2205]: https://github.com/actix/actix-web/pull/2205 [#2215]: https://github.com/actix/actix-web/pull/2215 +[#2244]: https://github.com/actix/actix-web/pull/2244 + ## 3.0.0-beta.6 - 2021-04-17 diff --git a/actix-http/Cargo.toml b/actix-http/Cargo.toml index 1f7df39a6..809be868d 100644 --- a/actix-http/Cargo.toml +++ b/actix-http/Cargo.toml @@ -32,7 +32,7 @@ openssl = ["actix-tls/openssl"] rustls = ["actix-tls/rustls"] # enable compression support -compress = ["flate2", "brotli2"] +compress = ["flate2", "brotli2", "zstd"] # trust-dns as client dns resolver trust-dns = ["trust-dns-resolver"] @@ -76,6 +76,7 @@ tokio = { version = "1.2", features = ["sync"] } # compression brotli2 = { version="0.3.2", optional = true } flate2 = { version = "1.0.13", optional = true } +zstd = { version = "0.7", optional = true } trust-dns-resolver = { version = "0.20.0", optional = true } diff --git a/actix-http/src/encoding/decoder.rs b/actix-http/src/encoding/decoder.rs index f0abae865..58981e82e 100644 --- a/actix-http/src/encoding/decoder.rs +++ b/actix-http/src/encoding/decoder.rs @@ -12,6 +12,7 @@ use brotli2::write::BrotliDecoder; use bytes::Bytes; use flate2::write::{GzDecoder, ZlibDecoder}; use futures_core::{ready, Stream}; +use zstd::stream::write::Decoder as ZstdDecoder; use crate::{ encoding::Writer, @@ -45,6 +46,12 @@ where ContentEncoding::Gzip => Some(ContentDecoder::Gzip(Box::new( GzDecoder::new(Writer::new()), ))), + ContentEncoding::Zstd => Some(ContentDecoder::Zstd(Box::new( + ZstdDecoder::new(Writer::new()).expect( + "Failed to create zstd decoder. This is a bug. \ + Please report it to the actix-web repository.", + ), + ))), _ => None, }; @@ -144,6 +151,9 @@ enum ContentDecoder { Deflate(Box>), Gzip(Box>), Br(Box>), + // We need explicit 'static lifetime here because ZstdDecoder need lifetime + // argument, and we use `spawn_blocking` in `Decoder::poll_next` that require `FnOnce() -> R + Send + 'static` + Zstd(Box>), } impl ContentDecoder { @@ -186,6 +196,18 @@ impl ContentDecoder { } Err(e) => Err(e), }, + + ContentDecoder::Zstd(ref mut decoder) => match decoder.flush() { + Ok(_) => { + let b = decoder.get_mut().take(); + if !b.is_empty() { + Ok(Some(b)) + } else { + Ok(None) + } + } + Err(e) => Err(e), + }, } } @@ -232,6 +254,20 @@ impl ContentDecoder { } Err(e) => Err(e), }, + + ContentDecoder::Zstd(ref mut decoder) => match decoder.write_all(&data) { + Ok(_) => { + decoder.flush()?; + + let b = decoder.get_mut().take(); + if !b.is_empty() { + Ok(Some(b)) + } else { + Ok(None) + } + } + Err(e) => Err(e), + }, } } } diff --git a/actix-http/src/encoding/encoder.rs b/actix-http/src/encoding/encoder.rs index b8bc8b68d..6adde9be2 100644 --- a/actix-http/src/encoding/encoder.rs +++ b/actix-http/src/encoding/encoder.rs @@ -15,6 +15,7 @@ use derive_more::Display; use flate2::write::{GzEncoder, ZlibEncoder}; use futures_core::ready; use pin_project::pin_project; +use zstd::stream::write::Encoder as ZstdEncoder; use crate::{ body::{Body, BodySize, BoxAnyBody, MessageBody, ResponseBody}, @@ -237,6 +238,9 @@ enum ContentEncoder { Deflate(ZlibEncoder), Gzip(GzEncoder), Br(BrotliEncoder), + // We need explicit 'static lifetime here because ZstdEncoder need lifetime + // argument, and we use `spawn_blocking` in `Encoder::poll_next` that require `FnOnce() -> R + Send + 'static` + Zstd(ZstdEncoder<'static, Writer>), } impl ContentEncoder { @@ -253,6 +257,10 @@ impl ContentEncoder { ContentEncoding::Br => { Some(ContentEncoder::Br(BrotliEncoder::new(Writer::new(), 3))) } + ContentEncoding::Zstd => { + let encoder = ZstdEncoder::new(Writer::new(), 3).ok()?; + Some(ContentEncoder::Zstd(encoder)) + } _ => None, } } @@ -263,6 +271,7 @@ impl ContentEncoder { ContentEncoder::Br(ref mut encoder) => encoder.get_mut().take(), ContentEncoder::Deflate(ref mut encoder) => encoder.get_mut().take(), ContentEncoder::Gzip(ref mut encoder) => encoder.get_mut().take(), + ContentEncoder::Zstd(ref mut encoder) => encoder.get_mut().take(), } } @@ -280,6 +289,10 @@ impl ContentEncoder { Ok(writer) => Ok(writer.buf.freeze()), Err(err) => Err(err), }, + ContentEncoder::Zstd(encoder) => match encoder.finish() { + Ok(writer) => Ok(writer.buf.freeze()), + Err(err) => Err(err), + }, } } @@ -306,6 +319,13 @@ impl ContentEncoder { Err(err) } }, + ContentEncoder::Zstd(ref mut encoder) => match encoder.write_all(data) { + Ok(_) => Ok(()), + Err(err) => { + trace!("Error decoding ztsd encoding: {}", err); + Err(err) + } + }, } } } diff --git a/actix-http/src/header/shared/content_encoding.rs b/actix-http/src/header/shared/content_encoding.rs index b93d66101..b9c1d2795 100644 --- a/actix-http/src/header/shared/content_encoding.rs +++ b/actix-http/src/header/shared/content_encoding.rs @@ -23,6 +23,9 @@ pub enum ContentEncoding { /// Gzip algorithm. Gzip, + // Zstd algorithm. + Zstd, + /// Indicates the identity function (i.e. no compression, nor modification). Identity, } @@ -41,6 +44,7 @@ impl ContentEncoding { ContentEncoding::Br => "br", ContentEncoding::Gzip => "gzip", ContentEncoding::Deflate => "deflate", + ContentEncoding::Zstd => "zstd", ContentEncoding::Identity | ContentEncoding::Auto => "identity", } } @@ -53,6 +57,7 @@ impl ContentEncoding { ContentEncoding::Gzip => 1.0, ContentEncoding::Deflate => 0.9, ContentEncoding::Identity | ContentEncoding::Auto => 0.1, + ContentEncoding::Zstd => 0.0, } } } @@ -81,6 +86,8 @@ impl From<&str> for ContentEncoding { ContentEncoding::Gzip } else if val.eq_ignore_ascii_case("deflate") { ContentEncoding::Deflate + } else if val.eq_ignore_ascii_case("zstd") { + ContentEncoding::Zstd } else { ContentEncoding::default() } diff --git a/tests/test_server.rs b/tests/test_server.rs index c341aa0ce..520eb5ce2 100644 --- a/tests/test_server.rs +++ b/tests/test_server.rs @@ -29,6 +29,7 @@ use openssl::{ x509::X509, }; use rand::{distributions::Alphanumeric, Rng}; +use zstd::stream::{read::Decoder as ZstdDecoder, write::Encoder as ZstdEncoder}; use actix_web::dev::BodyEncoding; use actix_web::middleware::{Compress, NormalizePath, TrailingSlash}; @@ -476,6 +477,125 @@ async fn test_body_brotli() { assert_eq!(Bytes::from(dec), Bytes::from_static(STR.as_ref())); } +#[actix_rt::test] +async fn test_body_zstd() { + let srv = actix_test::start_with(actix_test::config().h1(), || { + App::new() + .wrap(Compress::new(ContentEncoding::Zstd)) + .service(web::resource("/").route(web::to(move || HttpResponse::Ok().body(STR)))) + }); + + // client request + let mut response = srv + .get("/") + .append_header((ACCEPT_ENCODING, "zstd")) + .no_decompress() + .send() + .await + .unwrap(); + assert!(response.status().is_success()); + + // read response + let bytes = response.body().await.unwrap(); + + // decode + let mut e = ZstdDecoder::new(&bytes[..]).unwrap(); + let mut dec = Vec::new(); + e.read_to_end(&mut dec).unwrap(); + assert_eq!(Bytes::from(dec), Bytes::from_static(STR.as_ref())); +} + +#[actix_rt::test] +async fn test_body_zstd_streaming() { + let srv = actix_test::start_with(actix_test::config().h1(), || { + App::new() + .wrap(Compress::new(ContentEncoding::Zstd)) + .service(web::resource("/").route(web::to(move || { + HttpResponse::Ok() + .streaming(TestBody::new(Bytes::from_static(STR.as_ref()), 24)) + }))) + }); + + // client request + let mut response = srv + .get("/") + .append_header((ACCEPT_ENCODING, "zstd")) + .no_decompress() + .send() + .await + .unwrap(); + assert!(response.status().is_success()); + + // read response + let bytes = response.body().await.unwrap(); + + // decode + let mut e = ZstdDecoder::new(&bytes[..]).unwrap(); + let mut dec = Vec::new(); + e.read_to_end(&mut dec).unwrap(); + assert_eq!(Bytes::from(dec), Bytes::from_static(STR.as_ref())); +} + +#[actix_rt::test] +async fn test_zstd_encoding() { + let srv = actix_test::start_with(actix_test::config().h1(), || { + App::new().service( + web::resource("/").route(web::to(move |body: Bytes| HttpResponse::Ok().body(body))), + ) + }); + + let mut e = ZstdEncoder::new(Vec::new(), 5).unwrap(); + e.write_all(STR.as_ref()).unwrap(); + let enc = e.finish().unwrap(); + + // client request + let request = srv + .post("/") + .append_header((CONTENT_ENCODING, "zstd")) + .send_body(enc.clone()); + let mut response = request.await.unwrap(); + assert!(response.status().is_success()); + + // read response + let bytes = response.body().await.unwrap(); + assert_eq!(bytes, Bytes::from_static(STR.as_ref())); +} + +#[actix_rt::test] +async fn test_zstd_encoding_large() { + let data = rand::thread_rng() + .sample_iter(&Alphanumeric) + .take(320_000) + .map(char::from) + .collect::(); + + let srv = actix_test::start_with(actix_test::config().h1(), || { + App::new().service( + web::resource("/") + .app_data(web::PayloadConfig::new(320_000)) + .route(web::to(move |body: Bytes| { + HttpResponse::Ok().streaming(TestBody::new(body, 10240)) + })), + ) + }); + + let mut e = ZstdEncoder::new(Vec::new(), 5).unwrap(); + e.write_all(data.as_ref()).unwrap(); + let enc = e.finish().unwrap(); + + // client request + let request = srv + .post("/") + .append_header((CONTENT_ENCODING, "zstd")) + .send_body(enc.clone()); + let mut response = request.await.unwrap(); + assert!(response.status().is_success()); + + // read response + let bytes = response.body().limit(320_000).await.unwrap(); + assert_eq!(bytes, Bytes::from(data)); +} + #[actix_rt::test] async fn test_encoding() { let srv = actix_test::start_with(actix_test::config().h1(), || { From 0bb035cfa79816d4ead678fc23e2839b3120c7e1 Mon Sep 17 00:00:00 2001 From: Yerkebulan Tulibergenov Date: Thu, 3 Jun 2021 18:54:40 -0700 Subject: [PATCH 7/7] Add information about Actix discord server (#2247) --- src/lib.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/lib.rs b/src/lib.rs index 96e6ecbf8..0d488adf8 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -26,6 +26,7 @@ //! //! * [Website & User Guide](https://actix.rs/) //! * [Examples Repository](https://github.com/actix/examples) +//! * [Community Chat on Discord](https://discord.gg/NWpN5mmg3x) //! * [Community Chat on Gitter](https://gitter.im/actix/actix-web) //! //! To get started navigating the API docs, you may consider looking at the following pages first: