From 266cf0622c18fe569f6af3f0de19f0a226c98bc0 Mon Sep 17 00:00:00 2001 From: fakeshadow <24548779@qq.com> Date: Sun, 7 Feb 2021 14:48:27 -0800 Subject: [PATCH 1/2] reduce branch.remove deadcode for h1 dispatcher (#1962) --- actix-http/src/h1/dispatcher.rs | 202 +++++++++++++++----------------- 1 file changed, 95 insertions(+), 107 deletions(-) diff --git a/actix-http/src/h1/dispatcher.rs b/actix-http/src/h1/dispatcher.rs index e65a1bd56..f8fc95921 100644 --- a/actix-http/src/h1/dispatcher.rs +++ b/actix-http/src/h1/dispatcher.rs @@ -734,6 +734,78 @@ where } Ok(()) } + + /// Returns true when io stream can be disconnected after write to it. + /// + /// It covers these conditions: + /// + /// - `std::io::ErrorKind::ConnectionReset` after partial read. + /// - all data read done. + #[inline(always)] + fn read_available( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Result { + let this = self.project(); + + if this.flags.contains(Flags::READ_DISCONNECT) { + return Ok(false); + }; + + let mut io = Pin::new(this.io.as_mut().unwrap()); + + let mut read_some = false; + + loop { + // grow buffer if necessary. + let remaining = this.read_buf.capacity() - this.read_buf.len(); + if remaining < LW_BUFFER_SIZE { + this.read_buf.reserve(HW_BUFFER_SIZE - remaining); + } + + match actix_codec::poll_read_buf(io.as_mut(), cx, this.read_buf) { + Poll::Pending => return Ok(false), + Poll::Ready(Ok(n)) => { + if n == 0 { + return Ok(true); + } else { + // Return early when read buf exceed decoder's max buffer size. + if this.read_buf.len() >= super::decoder::MAX_BUFFER_SIZE { + // at this point it's not known io is still scheduled to + // be waked up. so force wake up dispatcher just in case. + // TODO: figure out the overhead. + cx.waker().wake_by_ref(); + return Ok(false); + } + + read_some = true; + } + } + Poll::Ready(Err(err)) => { + return if err.kind() == io::ErrorKind::WouldBlock { + Ok(false) + } else if err.kind() == io::ErrorKind::ConnectionReset && read_some { + Ok(true) + } else { + Err(DispatchError::Io(err)) + } + } + } + } + } + + /// call upgrade service with request. + fn upgrade(self: Pin<&mut Self>, req: Request) -> U::Future { + let this = self.project(); + let mut parts = FramedParts::with_read_buf( + this.io.take().unwrap(), + mem::take(this.codec), + mem::take(this.read_buf), + ); + parts.write_buf = mem::take(this.write_buf); + let framed = Framed::from_parts(parts); + this.flow.upgrade.as_ref().unwrap().call((req, framed)) + } } impl Future for Dispatcher @@ -778,60 +850,36 @@ where } } } else { - // read socket into a buf - let should_disconnect = - if !inner.flags.contains(Flags::READ_DISCONNECT) { - let mut inner_p = inner.as_mut().project(); - read_available( - cx, - inner_p.io.as_mut().unwrap(), - &mut inner_p.read_buf, - )? - } else { - None - }; + // read from io stream and fill read buffer. + let should_disconnect = inner.as_mut().read_available(cx)?; inner.as_mut().poll_request(cx)?; - if let Some(true) = should_disconnect { - let inner_p = inner.as_mut().project(); - inner_p.flags.insert(Flags::READ_DISCONNECT); - if let Some(mut payload) = inner_p.payload.take() { + + // io stream should to be closed. + if should_disconnect { + let inner = inner.as_mut().project(); + inner.flags.insert(Flags::READ_DISCONNECT); + if let Some(mut payload) = inner.payload.take() { payload.feed_eof(); } }; loop { - let inner_p = inner.as_mut().project(); - let remaining = - inner_p.write_buf.capacity() - inner_p.write_buf.len(); - if remaining < LW_BUFFER_SIZE { - inner_p.write_buf.reserve(HW_BUFFER_SIZE - remaining); - } - let result = inner.as_mut().poll_response(cx)?; - let drain = result == PollResponse::DrainWriteBuf; - - // switch to upgrade handler - if let PollResponse::Upgrade(req) = result { - let inner_p = inner.as_mut().project(); - let mut parts = FramedParts::with_read_buf( - inner_p.io.take().unwrap(), - mem::take(inner_p.codec), - mem::take(inner_p.read_buf), - ); - parts.write_buf = mem::take(inner_p.write_buf); - let framed = Framed::from_parts(parts); - let upgrade = inner_p - .flow - .upgrade - .as_ref() - .unwrap() - .call((req, framed)); - self.as_mut() - .project() - .inner - .set(DispatcherState::Upgrade(upgrade)); - return self.poll(cx); - } + // poll_response and populate write buffer. + // drain indicate if write buffer should be emptied before next run. + let drain = match inner.as_mut().poll_response(cx)? { + PollResponse::DrainWriteBuf => true, + PollResponse::DoNothing => false, + // upgrade request and goes Upgrade variant of DispatcherState. + PollResponse::Upgrade(req) => { + let upgrade = inner.upgrade(req); + self.as_mut() + .project() + .inner + .set(DispatcherState::Upgrade(upgrade)); + return self.poll(cx); + } + }; // we didn't get WouldBlock from write operation, // so data get written to kernel completely (macOS) @@ -888,66 +936,6 @@ where } } -/// Returns either: -/// - `Ok(Some(true))` - data was read and done reading all data. -/// - `Ok(Some(false))` - data was read but there should be more to read. -/// - `Ok(None)` - no data was read but there should be more to read later. -/// - Unhandled Errors -fn read_available( - cx: &mut Context<'_>, - io: &mut T, - buf: &mut BytesMut, -) -> Result, io::Error> -where - T: AsyncRead + Unpin, -{ - let mut read_some = false; - - loop { - // reserve capacity for buffer - let remaining = buf.capacity() - buf.len(); - if remaining < LW_BUFFER_SIZE { - buf.reserve(HW_BUFFER_SIZE - remaining); - } - - match actix_codec::poll_read_buf(Pin::new(io), cx, buf) { - Poll::Pending => { - return if read_some { Ok(Some(false)) } else { Ok(None) }; - } - Poll::Ready(Ok(n)) => { - if n == 0 { - return Ok(Some(true)); - } else { - // If buf is full return but do not disconnect since - // there is more reading to be done - if buf.len() >= super::decoder::MAX_BUFFER_SIZE { - // at this point it's not known io is still scheduled to - // be waked up. so force wake up dispatcher just in case. - // TODO: figure out the overhead. - cx.waker().wake_by_ref(); - return Ok(Some(false)); - } - - read_some = true; - } - } - Poll::Ready(Err(err)) => { - return if err.kind() == io::ErrorKind::WouldBlock { - if read_some { - Ok(Some(false)) - } else { - Ok(None) - } - } else if err.kind() == io::ErrorKind::ConnectionReset && read_some { - Ok(Some(true)) - } else { - Err(err) - } - } - } - } -} - #[cfg(test)] mod tests { use std::str; From dddb623a11f23c96af45e295ac479d66374e3897 Mon Sep 17 00:00:00 2001 From: fakeshadow <24548779@qq.com> Date: Sun, 7 Feb 2021 15:47:51 -0800 Subject: [PATCH 2/2] add services register for tuple and vec of services (#1933) --- CHANGES.md | 3 ++ src/service.rs | 142 +++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 145 insertions(+) diff --git a/CHANGES.md b/CHANGES.md index e9a788950..9a3b7fe3e 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -4,6 +4,8 @@ ### Added * The method `Either, web::Form>::into_inner()` which returns the inner type for whichever variant was created. Also works for `Either, web::Json>`. [#1894] +* Add `services!` macro for helping register multiple services to `App`. [#1933] +* Enable registering vector of same type of `HttpServiceFactory` to `App` [#1933] ### Changed * Rework `Responder` trait to be sync and returns `Response`/`HttpResponse` directly. @@ -34,6 +36,7 @@ [#1869]: https://github.com/actix/actix-web/pull/1869 [#1905]: https://github.com/actix/actix-web/pull/1905 [#1906]: https://github.com/actix/actix-web/pull/1906 +[#1933]: https://github.com/actix/actix-web/pull/1933 [#1957]: https://github.com/actix/actix-web/pull/1957 diff --git a/src/service.rs b/src/service.rs index 86949dca6..db0ec602a 100644 --- a/src/service.rs +++ b/src/service.rs @@ -22,6 +22,13 @@ pub trait HttpServiceFactory { fn register(self, config: &mut AppService); } +impl HttpServiceFactory for Vec { + fn register(self, config: &mut AppService) { + self.into_iter() + .for_each(|factory| factory.register(config)); + } +} + pub(crate) trait AppServiceFactory { fn register(&mut self, config: &mut AppService); } @@ -532,6 +539,65 @@ where } } +/// Macro helping register different types of services at the sametime. +/// +/// The service type must be implementing [`HttpServiceFactory`](self::HttpServiceFactory) trait. +/// +/// The max number of services can be grouped together is 12. +/// +/// # Examples +/// +/// ``` +/// use actix_web::{services, web, App}; +/// +/// let services = services![ +/// web::resource("/test2").to(|| async { "test2" }), +/// web::scope("/test3").route("/", web::get().to(|| async { "test3" })) +/// ]; +/// +/// let app = App::new().service(services); +/// +/// // services macro just convert multiple services to a tuple. +/// // below would also work without importing the macro. +/// let app = App::new().service(( +/// web::resource("/test2").to(|| async { "test2" }), +/// web::scope("/test3").route("/", web::get().to(|| async { "test3" })) +/// )); +/// ``` +#[macro_export] +macro_rules! services { + ($($x:expr),+ $(,)?) => { + ($($x,)+) + } +} + +/// HttpServiceFactory trait impl for tuples +macro_rules! service_tuple ({ $(($n:tt, $T:ident)),+} => { + impl<$($T: HttpServiceFactory),+> HttpServiceFactory for ($($T,)+) { + fn register(self, config: &mut AppService) { + $(self.$n.register(config);)+ + } + } +}); + +#[rustfmt::skip] +mod m { + use super::*; + + service_tuple!((0, A)); + service_tuple!((0, A), (1, B)); + service_tuple!((0, A), (1, B), (2, C)); + service_tuple!((0, A), (1, B), (2, C), (3, D)); + service_tuple!((0, A), (1, B), (2, C), (3, D), (4, E)); + service_tuple!((0, A), (1, B), (2, C), (3, D), (4, E), (5, F)); + service_tuple!((0, A), (1, B), (2, C), (3, D), (4, E), (5, F), (6, G)); + service_tuple!((0, A), (1, B), (2, C), (3, D), (4, E), (5, F), (6, G), (7, H)); + service_tuple!((0, A), (1, B), (2, C), (3, D), (4, E), (5, F), (6, G), (7, H), (8, I)); + service_tuple!((0, A), (1, B), (2, C), (3, D), (4, E), (5, F), (6, G), (7, H), (8, I), (9, J)); + service_tuple!((0, A), (1, B), (2, C), (3, D), (4, E), (5, F), (6, G), (7, H), (8, I), (9, J), (10, K)); + service_tuple!((0, A), (1, B), (2, C), (3, D), (4, E), (5, F), (6, G), (7, H), (8, I), (9, J), (10, K), (11, L)); +} + #[cfg(test)] mod tests { use super::*; @@ -606,4 +672,80 @@ mod tests { assert!(s.contains("ServiceResponse")); assert!(s.contains("x-test")); } + + #[actix_rt::test] + async fn test_services_macro() { + let scoped = services![ + web::service("/scoped_test1").name("scoped_test1").finish( + |req: ServiceRequest| async { + Ok(req.into_response(HttpResponse::Ok().finish())) + } + ), + web::resource("/scoped_test2").to(|| async { "test2" }), + ]; + + let services = services![ + web::service("/test1") + .name("test") + .finish(|req: ServiceRequest| async { + Ok(req.into_response(HttpResponse::Ok().finish())) + }), + web::resource("/test2").to(|| async { "test2" }), + web::scope("/test3").service(scoped) + ]; + + let srv = init_service(App::new().service(services)).await; + + let req = TestRequest::with_uri("/test1").to_request(); + let resp = srv.call(req).await.unwrap(); + assert_eq!(resp.status(), http::StatusCode::OK); + + let req = TestRequest::with_uri("/test2").to_request(); + let resp = srv.call(req).await.unwrap(); + assert_eq!(resp.status(), http::StatusCode::OK); + + let req = TestRequest::with_uri("/test3/scoped_test1").to_request(); + let resp = srv.call(req).await.unwrap(); + assert_eq!(resp.status(), http::StatusCode::OK); + + let req = TestRequest::with_uri("/test3/scoped_test2").to_request(); + let resp = srv.call(req).await.unwrap(); + assert_eq!(resp.status(), http::StatusCode::OK); + } + + #[actix_rt::test] + async fn test_services_vec() { + let services = vec![ + web::resource("/test1").to(|| async { "test1" }), + web::resource("/test2").to(|| async { "test2" }), + ]; + + let scoped = vec![ + web::resource("/scoped_test1").to(|| async { "test1" }), + web::resource("/scoped_test2").to(|| async { "test2" }), + ]; + + let srv = init_service( + App::new() + .service(services) + .service(web::scope("/test3").service(scoped)), + ) + .await; + + let req = TestRequest::with_uri("/test1").to_request(); + let resp = srv.call(req).await.unwrap(); + assert_eq!(resp.status(), http::StatusCode::OK); + + let req = TestRequest::with_uri("/test2").to_request(); + let resp = srv.call(req).await.unwrap(); + assert_eq!(resp.status(), http::StatusCode::OK); + + let req = TestRequest::with_uri("/test3/scoped_test1").to_request(); + let resp = srv.call(req).await.unwrap(); + assert_eq!(resp.status(), http::StatusCode::OK); + + let req = TestRequest::with_uri("/test3/scoped_test2").to_request(); + let resp = srv.call(req).await.unwrap(); + assert_eq!(resp.status(), http::StatusCode::OK); + } }