From 891f85754761572906b5b0b6db3056a93a1103f3 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Sat, 20 Apr 2019 11:18:04 -0700 Subject: [PATCH 01/14] update changes --- CHANGES.md | 2 ++ src/middleware/mod.rs | 3 ++- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/CHANGES.md b/CHANGES.md index 573d287c1..ea4cdc5dc 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -9,6 +9,8 @@ * Add `.peer_addr()` #744 +* Add `NormalizePath` middleware + ### Changed * Rename `RouterConfig` to `ServiceConfig` diff --git a/src/middleware/mod.rs b/src/middleware/mod.rs index 3df926251..5266f7c1a 100644 --- a/src/middleware/mod.rs +++ b/src/middleware/mod.rs @@ -6,10 +6,11 @@ pub mod cors; mod defaultheaders; pub mod errhandlers; mod logger; -pub mod normalize; +mod normalize; pub use self::defaultheaders::DefaultHeaders; pub use self::logger::Logger; +pub use self::normalize::NormalizePath; #[cfg(feature = "secure-cookies")] pub mod identity; From 7e480ab2f77659e0694f23447b2ecb7f08cea994 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Sat, 20 Apr 2019 21:16:51 -0700 Subject: [PATCH 02/14] beta.1 release --- CHANGES.md | 2 +- Cargo.toml | 4 ++-- actix-files/CHANGES.md | 4 ++++ actix-files/Cargo.toml | 6 +++--- actix-session/CHANGES.md | 3 +++ actix-session/Cargo.toml | 6 +++--- actix-web-codegen/CHANGES.md | 4 ++++ actix-web-codegen/Cargo.toml | 4 ++-- 8 files changed, 22 insertions(+), 11 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index ea4cdc5dc..eed851a76 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,6 +1,6 @@ # Changes -## [1.0.0-beta.1] - 2019-04-xx +## [1.0.0-beta.1] - 2019-04-20 ### Added diff --git a/Cargo.toml b/Cargo.toml index f49884c85..c4e01e9fb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -70,7 +70,7 @@ actix-service = "0.3.6" actix-utils = "0.3.4" actix-router = "0.1.2" actix-rt = "0.2.2" -actix-web-codegen = "0.1.0-alpha.6" +actix-web-codegen = "0.1.0-beta.1" actix-http = { version = "0.1.1", features=["fail"] } actix-server = "0.4.3" actix-server-config = "0.1.1" @@ -100,7 +100,7 @@ rustls = { version = "^0.15", optional = true } [dev-dependencies] actix-http = { version = "0.1.1", features=["ssl", "brotli", "flate2-zlib"] } actix-http-test = { version = "0.1.0", features=["ssl"] } -actix-files = { version = "0.1.0-alpha.6" } +actix-files = { version = "0.1.0-beta.1" } rand = "0.6" env_logger = "0.6" serde_derive = "1.0" diff --git a/actix-files/CHANGES.md b/actix-files/CHANGES.md index ae22fca19..05a5e5805 100644 --- a/actix-files/CHANGES.md +++ b/actix-files/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [0.1.0-beta.1] - 2019-04-20 + +* Update actix-web to beta.1 + ## [0.1.0-alpha.6] - 2019-04-14 * Update actix-web to alpha6 diff --git a/actix-files/Cargo.toml b/actix-files/Cargo.toml index fa9d67393..8d242a724 100644 --- a/actix-files/Cargo.toml +++ b/actix-files/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "actix-files" -version = "0.1.0-alpha.6" +version = "0.1.0-betsa.1" authors = ["Nikolay Kim "] description = "Static files support for actix web." readme = "README.md" @@ -18,7 +18,7 @@ name = "actix_files" path = "src/lib.rs" [dependencies] -actix-web = "1.0.0-alpha.6" +actix-web = "1.0.0-beta.1" actix-service = "0.3.4" bitflags = "1" bytes = "0.4" @@ -31,4 +31,4 @@ percent-encoding = "1.0" v_htmlescape = "0.4" [dev-dependencies] -actix-web = { version = "1.0.0-alpha.6", features=["ssl"] } +actix-web = { version = "1.0.0-beta.1", features=["ssl"] } diff --git a/actix-session/CHANGES.md b/actix-session/CHANGES.md index dfa3033c9..a60d1e668 100644 --- a/actix-session/CHANGES.md +++ b/actix-session/CHANGES.md @@ -1,5 +1,8 @@ # Changes +## [0.1.0-beta.1] - 2019-04-20 + +* Update actix-web to beta.1 * `CookieSession::max_age()` accepts value in seconds diff --git a/actix-session/Cargo.toml b/actix-session/Cargo.toml index e21bb732f..83f9807f7 100644 --- a/actix-session/Cargo.toml +++ b/actix-session/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "actix-session" -version = "0.1.0-alpha.6" +version = "0.1.0-beta.1" authors = ["Nikolay Kim "] description = "Session for actix web framework." readme = "README.md" @@ -24,12 +24,12 @@ default = ["cookie-session"] cookie-session = ["actix-web/secure-cookies"] [dependencies] -actix-web = "1.0.0-alpha.6" +actix-web = "1.0.0-beta.1" actix-service = "0.3.4" bytes = "0.4" derive_more = "0.14" futures = "0.1.25" -hashbrown = "0.2.0" +hashbrown = "0.2.2" serde = "1.0" serde_json = "1.0" time = "0.1.42" diff --git a/actix-web-codegen/CHANGES.md b/actix-web-codegen/CHANGES.md index 2ce5bd7db..3173ff1f0 100644 --- a/actix-web-codegen/CHANGES.md +++ b/actix-web-codegen/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [0.1.0-beta.1] - 2019-04-20 + +* Gen code for actix-web 1.0.0-beta.1 + ## [0.1.0-alpha.6] - 2019-04-14 * Gen code for actix-web 1.0.0-alpha.6 diff --git a/actix-web-codegen/Cargo.toml b/actix-web-codegen/Cargo.toml index 13928f67a..4108d879a 100644 --- a/actix-web-codegen/Cargo.toml +++ b/actix-web-codegen/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "actix-web-codegen" -version = "0.1.0-alpha.6" +version = "0.1.0-beta.1" description = "Actix web proc macros" readme = "README.md" authors = ["Nikolay Kim "] @@ -17,6 +17,6 @@ syn = { version = "0.15", features = ["full", "parsing"] } [dev-dependencies] actix-web = { version = "1.0.0-alpha.6" } -actix-http = { version = "0.1.0", features=["ssl"] } +actix-http = { version = "0.1.1", features=["ssl"] } actix-http-test = { version = "0.1.0", features=["ssl"] } futures = { version = "0.1" } From f0789aad055065f9e556a0ca146a301dc3d1b3bb Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Sun, 21 Apr 2019 09:03:46 -0700 Subject: [PATCH 03/14] update dep versions --- actix-files/Cargo.toml | 2 +- actix-web-actors/Cargo.toml | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/actix-files/Cargo.toml b/actix-files/Cargo.toml index 8d242a724..5e37fc090 100644 --- a/actix-files/Cargo.toml +++ b/actix-files/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "actix-files" -version = "0.1.0-betsa.1" +version = "0.1.0-beta.1" authors = ["Nikolay Kim "] description = "Static files support for actix web." readme = "README.md" diff --git a/actix-web-actors/Cargo.toml b/actix-web-actors/Cargo.toml index 3e67c2313..22fdf613d 100644 --- a/actix-web-actors/Cargo.toml +++ b/actix-web-actors/Cargo.toml @@ -19,8 +19,8 @@ path = "src/lib.rs" [dependencies] actix = "0.8.0" -actix-web = "1.0.0-alpha.5" -actix-http = "0.1.0" +actix-web = "1.0.0-beta.1" +actix-http = "0.1.1" actix-codec = "0.1.2" bytes = "0.4" futures = "0.1.25" From 895e409d57178666aeb0faff97458052ab6d68ca Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Sun, 21 Apr 2019 15:41:01 -0700 Subject: [PATCH 04/14] Optimize multipart handling #634, #769 --- actix-multipart/CHANGES.md | 4 +- actix-multipart/Cargo.toml | 8 +- actix-multipart/src/server.rs | 140 ++++++++++++++++++++-------------- 3 files changed, 90 insertions(+), 62 deletions(-) diff --git a/actix-multipart/CHANGES.md b/actix-multipart/CHANGES.md index fec3e50f8..9f8fa052a 100644 --- a/actix-multipart/CHANGES.md +++ b/actix-multipart/CHANGES.md @@ -1,7 +1,9 @@ # Changes -## [0.1.0-alpha.1] - 2019-04-xx +## [0.1.0-beta.1] - 2019-04-21 * Do not support nested multipart * Split multipart support to separate crate + +* Optimize multipart handling #634, #769 \ No newline at end of file diff --git a/actix-multipart/Cargo.toml b/actix-multipart/Cargo.toml index 58ab45230..8e1714e7d 100644 --- a/actix-multipart/Cargo.toml +++ b/actix-multipart/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "actix-multipart" -version = "0.1.0-alpha.1" +version = "0.1.0-beta.1" authors = ["Nikolay Kim "] description = "Multipart support for actix web framework." readme = "README.md" @@ -18,8 +18,8 @@ name = "actix_multipart" path = "src/lib.rs" [dependencies] -actix-web = "1.0.0-alpha.6" -actix-service = "0.3.4" +actix-web = "1.0.0-beta.1" +actix-service = "0.3.6" bytes = "0.4" derive_more = "0.14" httparse = "1.3" @@ -31,4 +31,4 @@ twoway = "0.2" [dev-dependencies] actix-rt = "0.2.2" -actix-http = "0.1.0" \ No newline at end of file +actix-http = "0.1.1" \ No newline at end of file diff --git a/actix-multipart/src/server.rs b/actix-multipart/src/server.rs index c651fae56..59ed55994 100644 --- a/actix-multipart/src/server.rs +++ b/actix-multipart/src/server.rs @@ -168,7 +168,7 @@ impl InnerMultipart { match payload.readline() { None => { if payload.eof { - Err(MultipartError::Incomplete) + Ok(Some(true)) } else { Ok(None) } @@ -201,8 +201,7 @@ impl InnerMultipart { match payload.readline() { Some(chunk) => { if chunk.is_empty() { - //ValueError("Could not find starting boundary %r" - //% (self._boundary)) + return Err(MultipartError::Boundary); } if chunk.len() < boundary.len() { continue; @@ -505,47 +504,73 @@ impl InnerField { payload: &mut PayloadBuffer, boundary: &str, ) -> Poll, MultipartError> { - match payload.read_until(b"\r") { - None => { - if payload.eof { - Err(MultipartError::Incomplete) + let mut pos = 0; + + let len = payload.buf.len(); + if len == 0 { + return Ok(Async::NotReady); + } + + // check boundary + if len > 4 && payload.buf[0] == b'\r' { + let b_len = if &payload.buf[..2] == b"\r\n" && &payload.buf[2..4] == b"--" { + Some(4) + } else if &payload.buf[1..3] == b"--" { + Some(3) + } else { + None + }; + + if let Some(b_len) = b_len { + let b_size = boundary.len() + b_len; + if len < b_size { + return Ok(Async::NotReady); } else { - Ok(Async::NotReady) + if &payload.buf[b_len..b_size] == boundary.as_bytes() { + // found boundary + payload.buf.split_to(b_size); + return Ok(Async::Ready(None)); + } else { + pos = b_size; + } } } - Some(mut chunk) => { - if chunk.len() == 1 { - payload.unprocessed(chunk); - match payload.read_exact(boundary.len() + 4) { - None => { - if payload.eof { - Err(MultipartError::Incomplete) - } else { - Ok(Async::NotReady) - } - } - Some(mut chunk) => { - if &chunk[..2] == b"\r\n" - && &chunk[2..4] == b"--" - && &chunk[4..] == boundary.as_bytes() - { - payload.unprocessed(chunk); - Ok(Async::Ready(None)) - } else { - // \r might be part of data stream - let ch = chunk.split_to(1); - payload.unprocessed(chunk); - Ok(Async::Ready(Some(ch))) - } - } + } + + loop { + return if let Some(idx) = twoway::find_bytes(&payload.buf[pos..], b"\r") { + let cur = pos + idx; + + // check if we have enough data for boundary detection + if cur + 4 > len { + if cur > 0 { + Ok(Async::Ready(Some(payload.buf.split_to(cur).freeze()))) + } else { + Ok(Async::NotReady) } } else { - let to = chunk.len() - 1; - let ch = chunk.split_to(to); - payload.unprocessed(chunk); - Ok(Async::Ready(Some(ch))) + // check boundary + if (&payload.buf[cur..cur + 2] == b"\r\n" + && &payload.buf[cur + 2..cur + 4] == b"--") + || (&payload.buf[cur..cur + 1] == b"\r" + && &payload.buf[cur + 1..cur + 3] == b"--") + { + if cur != 0 { + // return buffer + Ok(Async::Ready(Some(payload.buf.split_to(cur).freeze()))) + } else { + pos = cur + 1; + continue; + } + } else { + // not boundary + pos = cur + 1; + continue; + } } - } + } else { + return Ok(Async::Ready(Some(payload.buf.take().freeze()))); + }; } } @@ -555,26 +580,27 @@ impl InnerField { } let result = if let Some(payload) = self.payload.as_ref().unwrap().get_mut(s) { - let res = if let Some(ref mut len) = self.length { - InnerField::read_len(payload, len)? - } else { - InnerField::read_stream(payload, &self.boundary)? - }; + if !self.eof { + let res = if let Some(ref mut len) = self.length { + InnerField::read_len(payload, len)? + } else { + InnerField::read_stream(payload, &self.boundary)? + }; - match res { - Async::NotReady => Async::NotReady, - Async::Ready(Some(bytes)) => Async::Ready(Some(bytes)), - Async::Ready(None) => { - self.eof = true; - match payload.readline() { - None => Async::Ready(None), - Some(line) => { - if line.as_ref() != b"\r\n" { - log::warn!("multipart field did not read all the data or it is malformed"); - } - Async::Ready(None) - } + match res { + Async::NotReady => return Ok(Async::NotReady), + Async::Ready(Some(bytes)) => return Ok(Async::Ready(Some(bytes))), + Async::Ready(None) => self.eof = true, + } + } + + match payload.readline() { + None => Async::Ready(None), + Some(line) => { + if line.as_ref() != b"\r\n" { + log::warn!("multipart field did not read all the data or it is malformed"); } + Async::Ready(None) } } } else { @@ -704,7 +730,7 @@ impl PayloadBuffer { } /// Read exact number of bytes - #[inline] + #[cfg(test)] fn read_exact(&mut self, size: usize) -> Option { if size <= self.buf.len() { Some(self.buf.split_to(size).freeze()) From d00c9bb8446ee82a9fbad884c202cf9505c6ccc5 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Sun, 21 Apr 2019 16:14:09 -0700 Subject: [PATCH 05/14] do not consume boundary --- actix-multipart/src/server.rs | 73 ++++++++++++++++++++++++++++++++++- 1 file changed, 72 insertions(+), 1 deletion(-) diff --git a/actix-multipart/src/server.rs b/actix-multipart/src/server.rs index 59ed55994..82b0b5ace 100644 --- a/actix-multipart/src/server.rs +++ b/actix-multipart/src/server.rs @@ -528,7 +528,6 @@ impl InnerField { } else { if &payload.buf[b_len..b_size] == boundary.as_bytes() { // found boundary - payload.buf.split_to(b_size); return Ok(Async::Ready(None)); } else { pos = b_size; @@ -901,6 +900,78 @@ mod tests { }); } + #[test] + fn test_stream() { + run_on(|| { + let (sender, payload) = create_stream(); + + let bytes = Bytes::from( + "testasdadsad\r\n\ + --abbc761f78ff4d7cb7573b5a23f96ef0\r\n\ + Content-Disposition: form-data; name=\"file\"; filename=\"fn.txt\"\r\n\ + Content-Type: text/plain; charset=utf-8\r\n\r\n\ + test\r\n\ + --abbc761f78ff4d7cb7573b5a23f96ef0\r\n\ + Content-Type: text/plain; charset=utf-8\r\n\r\n\ + data\r\n\ + --abbc761f78ff4d7cb7573b5a23f96ef0--\r\n", + ); + sender.unbounded_send(Ok(bytes)).unwrap(); + + let mut headers = HeaderMap::new(); + headers.insert( + header::CONTENT_TYPE, + header::HeaderValue::from_static( + "multipart/mixed; boundary=\"abbc761f78ff4d7cb7573b5a23f96ef0\"", + ), + ); + + let mut multipart = Multipart::new(&headers, payload); + match multipart.poll().unwrap() { + Async::Ready(Some(mut field)) => { + let cd = field.content_disposition().unwrap(); + assert_eq!(cd.disposition, DispositionType::FormData); + assert_eq!(cd.parameters[0], DispositionParam::Name("file".into())); + + assert_eq!(field.content_type().type_(), mime::TEXT); + assert_eq!(field.content_type().subtype(), mime::PLAIN); + + match field.poll().unwrap() { + Async::Ready(Some(chunk)) => assert_eq!(chunk, "test"), + _ => unreachable!(), + } + match field.poll().unwrap() { + Async::Ready(None) => (), + _ => unreachable!(), + } + } + _ => unreachable!(), + } + + match multipart.poll().unwrap() { + Async::Ready(Some(mut field)) => { + assert_eq!(field.content_type().type_(), mime::TEXT); + assert_eq!(field.content_type().subtype(), mime::PLAIN); + + match field.poll() { + Ok(Async::Ready(Some(chunk))) => assert_eq!(chunk, "data"), + _ => unreachable!(), + } + match field.poll() { + Ok(Async::Ready(None)) => (), + _ => unreachable!(), + } + } + _ => unreachable!(), + } + + match multipart.poll().unwrap() { + Async::Ready(None) => (), + _ => unreachable!(), + } + }); + } + #[test] fn test_basic() { run_on(|| { From 48bee5508789ad92a78162b8e1691c1ecd3de616 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Mon, 22 Apr 2019 14:22:08 -0700 Subject: [PATCH 06/14] .to_async() handler can return Responder type #792 --- CHANGES.md | 9 +++++++++ src/handler.rs | 50 ++++++++++++++++++++++++++++++++++++------------- src/resource.rs | 2 +- src/route.rs | 37 ++++++++++++++++++++++++++++-------- src/test.rs | 40 +++++++++++++++++++++++++++++++++++++++ src/web.rs | 4 ++-- 6 files changed, 118 insertions(+), 24 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index eed851a76..6f0a2d422 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,5 +1,14 @@ # Changes +### Added + +* Add helper functions for reading response body `test::read_body()` + +### Changed + +* `.to_async()` handler can return `Responder` type #792 + + ## [1.0.0-beta.1] - 2019-04-20 ### Added diff --git a/src/handler.rs b/src/handler.rs index f328cd25d..850c0c92c 100644 --- a/src/handler.rs +++ b/src/handler.rs @@ -124,7 +124,7 @@ where pub trait AsyncFactory: Clone + 'static where R: IntoFuture, - R::Item: Into, + R::Item: Responder, R::Error: Into, { fn call(&self, param: T) -> R; @@ -134,7 +134,7 @@ impl AsyncFactory<(), R> for F where F: Fn() -> R + Clone + 'static, R: IntoFuture, - R::Item: Into, + R::Item: Responder, R::Error: Into, { fn call(&self, _: ()) -> R { @@ -147,7 +147,7 @@ pub struct AsyncHandler where F: AsyncFactory, R: IntoFuture, - R::Item: Into, + R::Item: Responder, R::Error: Into, { hnd: F, @@ -158,7 +158,7 @@ impl AsyncHandler where F: AsyncFactory, R: IntoFuture, - R::Item: Into, + R::Item: Responder, R::Error: Into, { pub fn new(hnd: F) -> Self { @@ -173,7 +173,7 @@ impl Clone for AsyncHandler where F: AsyncFactory, R: IntoFuture, - R::Item: Into, + R::Item: Responder, R::Error: Into, { fn clone(&self) -> Self { @@ -188,7 +188,7 @@ impl Service for AsyncHandler where F: AsyncFactory, R: IntoFuture, - R::Item: Into, + R::Item: Responder, R::Error: Into, { type Request = (T, HttpRequest); @@ -203,32 +203,56 @@ where fn call(&mut self, (param, req): (T, HttpRequest)) -> Self::Future { AsyncHandlerServiceResponse { fut: self.hnd.call(param).into_future(), + fut2: None, req: Some(req), } } } #[doc(hidden)] -pub struct AsyncHandlerServiceResponse { +pub struct AsyncHandlerServiceResponse +where + T: Future, + T::Item: Responder, +{ fut: T, + fut2: Option<<::Future as IntoFuture>::Future>, req: Option, } impl Future for AsyncHandlerServiceResponse where T: Future, - T::Item: Into, + T::Item: Responder, T::Error: Into, { type Item = ServiceResponse; type Error = Void; fn poll(&mut self) -> Poll { + if let Some(ref mut fut) = self.fut2 { + return match fut.poll() { + Ok(Async::Ready(res)) => Ok(Async::Ready(ServiceResponse::new( + self.req.take().unwrap(), + res, + ))), + Ok(Async::NotReady) => Ok(Async::NotReady), + Err(e) => { + let res: Response = e.into().into(); + Ok(Async::Ready(ServiceResponse::new( + self.req.take().unwrap(), + res, + ))) + } + }; + } + match self.fut.poll() { - Ok(Async::Ready(res)) => Ok(Async::Ready(ServiceResponse::new( - self.req.take().unwrap(), - res.into(), - ))), + Ok(Async::Ready(res)) => { + self.fut2 = + Some(res.respond_to(self.req.as_ref().unwrap()).into_future()); + return self.poll(); + } Ok(Async::NotReady) => Ok(Async::NotReady), Err(e) => { let res: Response = e.into().into(); @@ -357,7 +381,7 @@ macro_rules! factory_tuple ({ $(($n:tt, $T:ident)),+} => { impl AsyncFactory<($($T,)+), Res> for Func where Func: Fn($($T,)+) -> Res + Clone + 'static, Res: IntoFuture, - Res::Item: Into, + Res::Item: Responder, Res::Error: Into, { fn call(&self, param: ($($T,)+)) -> Res { diff --git a/src/resource.rs b/src/resource.rs index 1f1e6e157..03c614a9d 100644 --- a/src/resource.rs +++ b/src/resource.rs @@ -217,7 +217,7 @@ where F: AsyncFactory, I: FromRequest + 'static, R: IntoFuture + 'static, - R::Item: Into, + R::Item: Responder, R::Error: Into, { self.routes.push(Route::new().to_async(handler)); diff --git a/src/route.rs b/src/route.rs index 7b9f36a64..8c97d7720 100644 --- a/src/route.rs +++ b/src/route.rs @@ -1,7 +1,7 @@ use std::cell::RefCell; use std::rc::Rc; -use actix_http::{http::Method, Error, Extensions, Response}; +use actix_http::{http::Method, Error, Extensions}; use actix_service::{NewService, Service}; use futures::future::{ok, Either, FutureResult}; use futures::{Async, Future, IntoFuture, Poll}; @@ -278,7 +278,7 @@ impl Route { F: AsyncFactory, T: FromRequest + 'static, R: IntoFuture + 'static, - R::Item: Into, + R::Item: Responder, R::Error: Into, { self.service = Box::new(RouteNewService::new(Extract::new( @@ -418,18 +418,25 @@ where mod tests { use std::time::Duration; + use bytes::Bytes; use futures::Future; + use serde_derive::Serialize; use tokio_timer::sleep; use crate::http::{Method, StatusCode}; - use crate::test::{call_service, init_service, TestRequest}; + use crate::test::{call_service, init_service, read_body, TestRequest}; use crate::{error, web, App, HttpResponse}; + #[derive(Serialize, PartialEq, Debug)] + struct MyObject { + name: String, + } + #[test] fn test_route() { - let mut srv = - init_service( - App::new().service( + let mut srv = init_service( + App::new() + .service( web::resource("/test") .route(web::get().to(|| HttpResponse::Ok())) .route(web::put().to(|| { @@ -444,8 +451,15 @@ mod tests { Err::(error::ErrorBadRequest("err")) }) })), - ), - ); + ) + .service(web::resource("/json").route(web::get().to_async(|| { + sleep(Duration::from_millis(25)).then(|_| { + Ok::<_, crate::Error>(web::Json(MyObject { + name: "test".to_string(), + })) + }) + }))), + ); let req = TestRequest::with_uri("/test") .method(Method::GET) @@ -476,5 +490,12 @@ mod tests { .to_request(); let resp = call_service(&mut srv, req); assert_eq!(resp.status(), StatusCode::METHOD_NOT_ALLOWED); + + let req = TestRequest::with_uri("/json").to_request(); + let resp = call_service(&mut srv, req); + assert_eq!(resp.status(), StatusCode::OK); + + let body = read_body(resp); + assert_eq!(body, Bytes::from_static(b"{\"name\":\"test\"}")); } } diff --git a/src/test.rs b/src/test.rs index d932adfd3..1f3a24271 100644 --- a/src/test.rs +++ b/src/test.rs @@ -193,6 +193,46 @@ where .unwrap_or_else(|_| panic!("read_response failed at block_on unwrap")) } +/// Helper function that returns a response body of a ServiceResponse. +/// This function blocks the current thread until futures complete. +/// +/// ```rust +/// use actix_web::{test, web, App, HttpResponse, http::header}; +/// use bytes::Bytes; +/// +/// #[test] +/// fn test_index() { +/// let mut app = test::init_service( +/// App::new().service( +/// web::resource("/index.html") +/// .route(web::post().to( +/// || HttpResponse::Ok().body("welcome!"))))); +/// +/// let req = test::TestRequest::post() +/// .uri("/index.html") +/// .header(header::CONTENT_TYPE, "application/json") +/// .to_request(); +/// +/// let resp = call_service(&mut srv, req); +/// let result = test::read_body(resp); +/// assert_eq!(result, Bytes::from_static(b"welcome!")); +/// } +/// ``` +pub fn read_body(mut res: ServiceResponse) -> Bytes +where + B: MessageBody, +{ + block_on(run_on(move || { + res.take_body() + .fold(BytesMut::new(), move |mut body, chunk| { + body.extend_from_slice(&chunk); + Ok::<_, Error>(body) + }) + .map(|body: BytesMut| body.freeze()) + })) + .unwrap_or_else(|_| panic!("read_response failed at block_on unwrap")) +} + /// Helper function that returns a deserialized response body of a TestRequest /// This function blocks the current thread until futures complete. /// diff --git a/src/web.rs b/src/web.rs index 079dec516..73314449c 100644 --- a/src/web.rs +++ b/src/web.rs @@ -1,5 +1,5 @@ //! Essentials helper functions and types for application registration. -use actix_http::{http::Method, Response}; +use actix_http::http::Method; use futures::{Future, IntoFuture}; pub use actix_http::Response as HttpResponse; @@ -268,7 +268,7 @@ where F: AsyncFactory, I: FromRequest + 'static, R: IntoFuture + 'static, - R::Item: Into, + R::Item: Responder, R::Error: Into, { Route::new().to_async(handler) From 3532602299f855ba1b9de291db9a8128fb72cb0f Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Mon, 22 Apr 2019 21:22:17 -0700 Subject: [PATCH 07/14] Added support for remainder match (i.e /path/{tail}*) --- CHANGES.md | 3 +++ Cargo.toml | 2 +- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/CHANGES.md b/CHANGES.md index 6f0a2d422..f4fdd6af7 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -4,6 +4,9 @@ * Add helper functions for reading response body `test::read_body()` +* Added support for `remainder match` (i.e "/path/{tail}*") + + ### Changed * `.to_async()` handler can return `Responder` type #792 diff --git a/Cargo.toml b/Cargo.toml index c4e01e9fb..74fe78b4f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -68,7 +68,7 @@ rust-tls = ["rustls", "actix-server/rust-tls"] actix-codec = "0.1.2" actix-service = "0.3.6" actix-utils = "0.3.4" -actix-router = "0.1.2" +actix-router = "0.1.3" actix-rt = "0.2.2" actix-web-codegen = "0.1.0-beta.1" actix-http = { version = "0.1.1", features=["fail"] } From 5d531989e70d6279ea97e7dead7a1feae3240abc Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Tue, 23 Apr 2019 09:42:19 -0700 Subject: [PATCH 08/14] Fix BorrowMutError panic in client connector #793 --- actix-http/CHANGES.md | 7 +++++++ actix-http/src/client/pool.rs | 28 ++++++++++++++-------------- 2 files changed, 21 insertions(+), 14 deletions(-) diff --git a/actix-http/CHANGES.md b/actix-http/CHANGES.md index fc33ff411..2edcceeb0 100644 --- a/actix-http/CHANGES.md +++ b/actix-http/CHANGES.md @@ -1,5 +1,12 @@ # Changes +## [0.1.2] - 2019-04-23 + +### Fixed + +* Fix BorrowMutError panic in client connector #793 + + ## [0.1.1] - 2019-04-19 ### Changes diff --git a/actix-http/src/client/pool.rs b/actix-http/src/client/pool.rs index 7b138dbaa..1164205ea 100644 --- a/actix-http/src/client/pool.rs +++ b/actix-http/src/client/pool.rs @@ -113,31 +113,31 @@ where match self.1.as_ref().borrow_mut().acquire(&key) { Acquire::Acquired(io, created) => { // use existing connection - Either::A(ok(IoConnection::new( + return Either::A(ok(IoConnection::new( io, created, Some(Acquired(key, Some(self.1.clone()))), - ))) - } - Acquire::NotAvailable => { - // connection is not available, wait - let (rx, token) = self.1.as_ref().borrow_mut().wait_for(req); - Either::B(Either::A(WaitForConnection { - rx, - key, - token, - inner: Some(self.1.clone()), - })) + ))); } Acquire::Available => { // open new connection - Either::B(Either::B(OpenConnection::new( + return Either::B(Either::B(OpenConnection::new( key, self.1.clone(), self.0.call(req), - ))) + ))); } + _ => (), } + + // connection is not available, wait + let (rx, token) = self.1.as_ref().borrow_mut().wait_for(req); + Either::B(Either::A(WaitForConnection { + rx, + key, + token, + inner: Some(self.1.clone()), + })) } } From 5f6a1a82492d6fbc1915a4b0f98c7bbfb828e008 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Tue, 23 Apr 2019 09:45:39 -0700 Subject: [PATCH 09/14] update version --- actix-http/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/actix-http/Cargo.toml b/actix-http/Cargo.toml index ac15667f7..17e99bca7 100644 --- a/actix-http/Cargo.toml +++ b/actix-http/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "actix-http" -version = "0.1.1" +version = "0.1.2" authors = ["Nikolay Kim "] description = "Actix http primitives" readme = "README.md" From d2b0afd859e61f48f1057f37d09661afa1be1c36 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Tue, 23 Apr 2019 14:57:03 -0700 Subject: [PATCH 10/14] Fix http client pool and wait queue management --- Cargo.toml | 4 +- actix-http/CHANGES.md | 9 ++ actix-http/src/client/connector.rs | 44 +++++-- actix-http/src/client/h1proto.rs | 2 +- actix-http/src/client/pool.rs | 205 +++++++++++++++++++++++++---- awc/Cargo.toml | 6 +- awc/tests/test_client.rs | 201 ++++++++++++++++++++++++++-- test-server/src/lib.rs | 11 ++ 8 files changed, 430 insertions(+), 52 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 74fe78b4f..a886b5fcc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -71,7 +71,7 @@ actix-utils = "0.3.4" actix-router = "0.1.3" actix-rt = "0.2.2" actix-web-codegen = "0.1.0-beta.1" -actix-http = { version = "0.1.1", features=["fail"] } +actix-http = { version = "0.1.2", features=["fail"] } actix-server = "0.4.3" actix-server-config = "0.1.1" actix-threadpool = "0.1.0" @@ -98,7 +98,7 @@ openssl = { version="0.10", optional = true } rustls = { version = "^0.15", optional = true } [dev-dependencies] -actix-http = { version = "0.1.1", features=["ssl", "brotli", "flate2-zlib"] } +actix-http = { version = "0.1.2", features=["ssl", "brotli", "flate2-zlib"] } actix-http-test = { version = "0.1.0", features=["ssl"] } actix-files = { version = "0.1.0-beta.1" } rand = "0.6" diff --git a/actix-http/CHANGES.md b/actix-http/CHANGES.md index 2edcceeb0..37d0eec65 100644 --- a/actix-http/CHANGES.md +++ b/actix-http/CHANGES.md @@ -1,5 +1,14 @@ # Changes +## [0.1.3] - 2019-04-23 + +### Fixed + +* Fix http client pool management + +* Fix http client wait queue management #794 + + ## [0.1.2] - 2019-04-23 ### Fixed diff --git a/actix-http/src/client/connector.rs b/actix-http/src/client/connector.rs index 14df2eee8..0241e8472 100644 --- a/actix-http/src/client/connector.rs +++ b/actix-http/src/client/connector.rs @@ -114,7 +114,8 @@ where Request = TcpConnect, Response = TcpConnection, Error = actix_connect::ConnectError, - > + Clone, + > + Clone + + 'static, { /// Connection timeout, i.e. max time to connect to remote host including dns name resolution. /// Set to 1 second by default. @@ -284,7 +285,9 @@ mod connect_impl { pub(crate) struct InnerConnector where Io: AsyncRead + AsyncWrite + 'static, - T: Service, + T: Service + + Clone + + 'static, { pub(crate) tcp_pool: ConnectionPool, } @@ -293,7 +296,8 @@ mod connect_impl { where Io: AsyncRead + AsyncWrite + 'static, T: Service - + Clone, + + Clone + + 'static, { fn clone(&self) -> Self { InnerConnector { @@ -305,7 +309,9 @@ mod connect_impl { impl Service for InnerConnector where Io: AsyncRead + AsyncWrite + 'static, - T: Service, + T: Service + + Clone + + 'static, { type Request = Connect; type Response = IoConnection; @@ -356,9 +362,11 @@ mod connect_impl { Io1: AsyncRead + AsyncWrite + 'static, Io2: AsyncRead + AsyncWrite + 'static, T1: Service - + Clone, + + Clone + + 'static, T2: Service - + Clone, + + Clone + + 'static, { fn clone(&self) -> Self { InnerConnector { @@ -372,8 +380,12 @@ mod connect_impl { where Io1: AsyncRead + AsyncWrite + 'static, Io2: AsyncRead + AsyncWrite + 'static, - T1: Service, - T2: Service, + T1: Service + + Clone + + 'static, + T2: Service + + Clone + + 'static, { type Request = Connect; type Response = EitherConnection; @@ -409,7 +421,9 @@ mod connect_impl { pub(crate) struct InnerConnectorResponseA where Io1: AsyncRead + AsyncWrite + 'static, - T: Service, + T: Service + + Clone + + 'static, { fut: as Service>::Future, _t: PhantomData, @@ -417,7 +431,9 @@ mod connect_impl { impl Future for InnerConnectorResponseA where - T: Service, + T: Service + + Clone + + 'static, Io1: AsyncRead + AsyncWrite + 'static, Io2: AsyncRead + AsyncWrite + 'static, { @@ -435,7 +451,9 @@ mod connect_impl { pub(crate) struct InnerConnectorResponseB where Io2: AsyncRead + AsyncWrite + 'static, - T: Service, + T: Service + + Clone + + 'static, { fut: as Service>::Future, _t: PhantomData, @@ -443,7 +461,9 @@ mod connect_impl { impl Future for InnerConnectorResponseB where - T: Service, + T: Service + + Clone + + 'static, Io1: AsyncRead + AsyncWrite + 'static, Io2: AsyncRead + AsyncWrite + 'static, { diff --git a/actix-http/src/client/h1proto.rs b/actix-http/src/client/h1proto.rs index becc07528..97ed3bbc7 100644 --- a/actix-http/src/client/h1proto.rs +++ b/actix-http/src/client/h1proto.rs @@ -274,7 +274,7 @@ impl Stream for PlStream { Ok(Async::Ready(Some(chunk))) } else { let framed = self.framed.take().unwrap(); - let force_close = framed.get_codec().keepalive(); + let force_close = !framed.get_codec().keepalive(); release_connection(framed, force_close); Ok(Async::Ready(None)) } diff --git a/actix-http/src/client/pool.rs b/actix-http/src/client/pool.rs index 1164205ea..8dedf72f5 100644 --- a/actix-http/src/client/pool.rs +++ b/actix-http/src/client/pool.rs @@ -49,7 +49,9 @@ pub(crate) struct ConnectionPool( impl ConnectionPool where Io: AsyncRead + AsyncWrite + 'static, - T: Service, + T: Service + + Clone + + 'static, { pub(crate) fn new( connector: T, @@ -69,7 +71,7 @@ where waiters: Slab::new(), waiters_queue: IndexSet::new(), available: HashMap::new(), - task: AtomicTask::new(), + task: None, })), ) } @@ -88,7 +90,9 @@ where impl Service for ConnectionPool where Io: AsyncRead + AsyncWrite + 'static, - T: Service, + T: Service + + Clone + + 'static, { type Request = Connect; type Response = IoConnection; @@ -131,7 +135,17 @@ where } // connection is not available, wait - let (rx, token) = self.1.as_ref().borrow_mut().wait_for(req); + let (rx, token, support) = self.1.as_ref().borrow_mut().wait_for(req); + + // start support future + if !support { + self.1.as_ref().borrow_mut().task = Some(AtomicTask::new()); + tokio_current_thread::spawn(ConnectorPoolSupport { + connector: self.0.clone(), + inner: self.1.clone(), + }) + } + Either::B(Either::A(WaitForConnection { rx, key, @@ -245,7 +259,7 @@ where Ok(Async::Ready(IoConnection::new( ConnectionType::H2(snd), Instant::now(), - Some(Acquired(self.key.clone(), self.inner.clone())), + Some(Acquired(self.key.clone(), self.inner.take())), ))) } Ok(Async::NotReady) => Ok(Async::NotReady), @@ -256,12 +270,11 @@ where match self.fut.poll() { Err(err) => Err(err), Ok(Async::Ready((io, proto))) => { - let _ = self.inner.take(); if proto == Protocol::Http1 { Ok(Async::Ready(IoConnection::new( ConnectionType::H1(io), Instant::now(), - Some(Acquired(self.key.clone(), self.inner.clone())), + Some(Acquired(self.key.clone(), self.inner.take())), ))) } else { self.h2 = Some(handshake(io)); @@ -279,7 +292,6 @@ enum Acquire { NotAvailable, } -// #[derive(Debug)] struct AvailableConnection { io: ConnectionType, used: Instant, @@ -298,7 +310,7 @@ pub(crate) struct Inner { oneshot::Sender, ConnectError>>, )>, waiters_queue: IndexSet<(Key, usize)>, - task: AtomicTask, + task: Option, } impl Inner { @@ -314,18 +326,6 @@ impl Inner { self.waiters.remove(token); self.waiters_queue.remove(&(key.clone(), token)); } - - fn release_conn(&mut self, key: &Key, io: ConnectionType, created: Instant) { - self.acquired -= 1; - self.available - .entry(key.clone()) - .or_insert_with(VecDeque::new) - .push_back(AvailableConnection { - io, - created, - used: Instant::now(), - }); - } } impl Inner @@ -339,6 +339,7 @@ where ) -> ( oneshot::Receiver, ConnectError>>, usize, + bool, ) { let (tx, rx) = oneshot::channel(); @@ -346,8 +347,9 @@ where let entry = self.waiters.vacant_entry(); let token = entry.key(); entry.insert((connect, tx)); - assert!(!self.waiters_queue.insert((key, token))); - (rx, token) + assert!(self.waiters_queue.insert((key, token))); + + (rx, token, self.task.is_some()) } fn acquire(&mut self, key: &Key) -> Acquire { @@ -400,6 +402,19 @@ where Acquire::Available } + fn release_conn(&mut self, key: &Key, io: ConnectionType, created: Instant) { + self.acquired -= 1; + self.available + .entry(key.clone()) + .or_insert_with(VecDeque::new) + .push_back(AvailableConnection { + io, + created, + used: Instant::now(), + }); + self.check_availibility(); + } + fn release_close(&mut self, io: ConnectionType) { self.acquired -= 1; if let Some(timeout) = self.disconnect_timeout { @@ -407,11 +422,12 @@ where tokio_current_thread::spawn(CloseConnection::new(io, timeout)) } } + self.check_availibility(); } fn check_availibility(&self) { if !self.waiters_queue.is_empty() && self.acquired < self.limit { - self.task.notify() + self.task.as_ref().map(|t| t.notify()); } } } @@ -451,6 +467,147 @@ where } } +struct ConnectorPoolSupport +where + Io: AsyncRead + AsyncWrite + 'static, +{ + connector: T, + inner: Rc>>, +} + +impl Future for ConnectorPoolSupport +where + Io: AsyncRead + AsyncWrite + 'static, + T: Service, + T::Future: 'static, +{ + type Item = (); + type Error = (); + + fn poll(&mut self) -> Poll { + let mut inner = self.inner.as_ref().borrow_mut(); + inner.task.as_ref().unwrap().register(); + + // check waiters + loop { + let (key, token) = { + if let Some((key, token)) = inner.waiters_queue.get_index(0) { + (key.clone(), *token) + } else { + break; + } + }; + match inner.acquire(&key) { + Acquire::NotAvailable => break, + Acquire::Acquired(io, created) => { + let (_, tx) = inner.waiters.remove(token); + if let Err(conn) = tx.send(Ok(IoConnection::new( + io, + created, + Some(Acquired(key.clone(), Some(self.inner.clone()))), + ))) { + let (io, created) = conn.unwrap().into_inner(); + inner.release_conn(&key, io, created); + } + } + Acquire::Available => { + let (connect, tx) = inner.waiters.remove(token); + OpenWaitingConnection::spawn( + key.clone(), + tx, + self.inner.clone(), + self.connector.call(connect), + ); + } + } + let _ = inner.waiters_queue.swap_remove_index(0); + } + + Ok(Async::NotReady) + } +} + +struct OpenWaitingConnection +where + Io: AsyncRead + AsyncWrite + 'static, +{ + fut: F, + key: Key, + h2: Option>, + rx: Option, ConnectError>>>, + inner: Option>>>, +} + +impl OpenWaitingConnection +where + F: Future + 'static, + Io: AsyncRead + AsyncWrite + 'static, +{ + fn spawn( + key: Key, + rx: oneshot::Sender, ConnectError>>, + inner: Rc>>, + fut: F, + ) { + tokio_current_thread::spawn(OpenWaitingConnection { + key, + fut, + h2: None, + rx: Some(rx), + inner: Some(inner), + }) + } +} + +impl Drop for OpenWaitingConnection +where + Io: AsyncRead + AsyncWrite + 'static, +{ + fn drop(&mut self) { + if let Some(inner) = self.inner.take() { + let mut inner = inner.as_ref().borrow_mut(); + inner.release(); + inner.check_availibility(); + } + } +} + +impl Future for OpenWaitingConnection +where + F: Future, + Io: AsyncRead + AsyncWrite, +{ + type Item = (); + type Error = (); + + fn poll(&mut self) -> Poll { + match self.fut.poll() { + Err(err) => { + let _ = self.inner.take(); + if let Some(rx) = self.rx.take() { + let _ = rx.send(Err(err)); + } + Err(()) + } + Ok(Async::Ready((io, proto))) => { + if proto == Protocol::Http1 { + let rx = self.rx.take().unwrap(); + let _ = rx.send(Ok(IoConnection::new( + ConnectionType::H1(io), + Instant::now(), + Some(Acquired(self.key.clone(), self.inner.take())), + ))); + Ok(Async::Ready(())) + } else { + self.h2 = Some(handshake(io)); + self.poll() + } + } + Ok(Async::NotReady) => Ok(Async::NotReady), + } + } +} + pub(crate) struct Acquired(Key, Option>>>); impl Acquired diff --git a/awc/Cargo.toml b/awc/Cargo.toml index a254d69c6..e6018f44f 100644 --- a/awc/Cargo.toml +++ b/awc/Cargo.toml @@ -38,7 +38,7 @@ flate2-rust = ["actix-http/flate2-rust"] [dependencies] actix-codec = "0.1.2" actix-service = "0.3.6" -actix-http = "0.1.1" +actix-http = "0.1.2" base64 = "0.10.1" bytes = "0.4" derive_more = "0.14" @@ -55,8 +55,8 @@ openssl = { version="0.10", optional = true } [dev-dependencies] actix-rt = "0.2.2" -actix-web = { version = "1.0.0-alpha.6", features=["ssl"] } -actix-http = { version = "0.1.1", features=["ssl"] } +actix-web = { version = "1.0.0-beta.1", features=["ssl"] } +actix-http = { version = "0.1.2", features=["ssl"] } actix-http-test = { version = "0.1.0", features=["ssl"] } actix-utils = "0.3.4" actix-server = { version = "0.4.3", features=["ssl"] } diff --git a/awc/tests/test_client.rs b/awc/tests/test_client.rs index afccdff86..d1139fdc5 100644 --- a/awc/tests/test_client.rs +++ b/awc/tests/test_client.rs @@ -1,5 +1,7 @@ use std::collections::HashMap; use std::io::{Read, Write}; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Arc; use std::time::Duration; use brotli2::write::BrotliEncoder; @@ -7,11 +9,12 @@ use bytes::Bytes; use flate2::read::GzDecoder; use flate2::write::GzEncoder; use flate2::Compression; -use futures::future::Future; +use futures::Future; use rand::Rng; use actix_http::HttpService; use actix_http_test::TestServer; +use actix_service::{fn_service, NewService}; use actix_web::http::Cookie; use actix_web::middleware::{BodyEncoding, Compress}; use actix_web::{http::header, web, App, Error, HttpMessage, HttpRequest, HttpResponse}; @@ -144,17 +147,195 @@ fn test_timeout_override() { } #[test] -fn test_connection_close() { - let mut srv = TestServer::new(|| { - HttpService::new( - App::new().service(web::resource("/").to(|| HttpResponse::Ok())), - ) +fn test_connection_reuse() { + let num = Arc::new(AtomicUsize::new(0)); + let num2 = num.clone(); + + let mut srv = TestServer::new(move || { + let num2 = num2.clone(); + fn_service(move |io| { + num2.fetch_add(1, Ordering::Relaxed); + Ok(io) + }) + .and_then(HttpService::new( + App::new().service(web::resource("/").route(web::to(|| HttpResponse::Ok()))), + )) }); - let res = srv - .block_on(awc::Client::new().get(srv.url("/")).force_close().send()) - .unwrap(); - assert!(res.status().is_success()); + let client = awc::Client::default(); + + // req 1 + let request = client.get(srv.url("/")).send(); + let response = srv.block_on(request).unwrap(); + assert!(response.status().is_success()); + + // req 2 + let req = client.post(srv.url("/")); + let response = srv.block_on_fn(move || req.send()).unwrap(); + assert!(response.status().is_success()); + + // one connection + assert_eq!(num.load(Ordering::Relaxed), 1); +} + +#[test] +fn test_connection_force_close() { + let num = Arc::new(AtomicUsize::new(0)); + let num2 = num.clone(); + + let mut srv = TestServer::new(move || { + let num2 = num2.clone(); + fn_service(move |io| { + num2.fetch_add(1, Ordering::Relaxed); + Ok(io) + }) + .and_then(HttpService::new( + App::new().service(web::resource("/").route(web::to(|| HttpResponse::Ok()))), + )) + }); + + let client = awc::Client::default(); + + // req 1 + let request = client.get(srv.url("/")).force_close().send(); + let response = srv.block_on(request).unwrap(); + assert!(response.status().is_success()); + + // req 2 + let req = client.post(srv.url("/")).force_close(); + let response = srv.block_on_fn(move || req.send()).unwrap(); + assert!(response.status().is_success()); + + // two connection + assert_eq!(num.load(Ordering::Relaxed), 2); +} + +#[test] +fn test_connection_server_close() { + let num = Arc::new(AtomicUsize::new(0)); + let num2 = num.clone(); + + let mut srv = TestServer::new(move || { + let num2 = num2.clone(); + fn_service(move |io| { + num2.fetch_add(1, Ordering::Relaxed); + Ok(io) + }) + .and_then(HttpService::new( + App::new().service( + web::resource("/") + .route(web::to(|| HttpResponse::Ok().force_close().finish())), + ), + )) + }); + + let client = awc::Client::default(); + + // req 1 + let request = client.get(srv.url("/")).send(); + let response = srv.block_on(request).unwrap(); + assert!(response.status().is_success()); + + // req 2 + let req = client.post(srv.url("/")); + let response = srv.block_on_fn(move || req.send()).unwrap(); + assert!(response.status().is_success()); + + // two connection + assert_eq!(num.load(Ordering::Relaxed), 2); +} + +#[test] +fn test_connection_wait_queue() { + let num = Arc::new(AtomicUsize::new(0)); + let num2 = num.clone(); + + let mut srv = TestServer::new(move || { + let num2 = num2.clone(); + fn_service(move |io| { + num2.fetch_add(1, Ordering::Relaxed); + Ok(io) + }) + .and_then(HttpService::new(App::new().service( + web::resource("/").route(web::to(|| HttpResponse::Ok().body(STR))), + ))) + }); + + let client = awc::Client::build() + .connector(awc::Connector::new().limit(1).finish()) + .finish(); + + // req 1 + let request = client.get(srv.url("/")).send(); + let mut response = srv.block_on(request).unwrap(); + assert!(response.status().is_success()); + + // req 2 + let req2 = client.post(srv.url("/")); + let req2_fut = srv.execute(move || { + let mut fut = req2.send(); + assert!(fut.poll().unwrap().is_not_ready()); + fut + }); + + // read response 1 + let bytes = srv.block_on(response.body()).unwrap(); + assert_eq!(bytes, Bytes::from_static(STR.as_ref())); + + // req 2 + let response = srv.block_on(req2_fut).unwrap(); + assert!(response.status().is_success()); + + // two connection + assert_eq!(num.load(Ordering::Relaxed), 1); +} + +#[test] +fn test_connection_wait_queue_force_close() { + let num = Arc::new(AtomicUsize::new(0)); + let num2 = num.clone(); + + let mut srv = TestServer::new(move || { + let num2 = num2.clone(); + fn_service(move |io| { + num2.fetch_add(1, Ordering::Relaxed); + Ok(io) + }) + .and_then(HttpService::new( + App::new().service( + web::resource("/") + .route(web::to(|| HttpResponse::Ok().force_close().body(STR))), + ), + )) + }); + + let client = awc::Client::build() + .connector(awc::Connector::new().limit(1).finish()) + .finish(); + + // req 1 + let request = client.get(srv.url("/")).send(); + let mut response = srv.block_on(request).unwrap(); + assert!(response.status().is_success()); + + // req 2 + let req2 = client.post(srv.url("/")); + let req2_fut = srv.execute(move || { + let mut fut = req2.send(); + assert!(fut.poll().unwrap().is_not_ready()); + fut + }); + + // read response 1 + let bytes = srv.block_on(response.body()).unwrap(); + assert_eq!(bytes, Bytes::from_static(STR.as_ref())); + + // req 2 + let response = srv.block_on(req2_fut).unwrap(); + assert!(response.status().is_success()); + + // two connection + assert_eq!(num.load(Ordering::Relaxed), 2); } #[test] diff --git a/test-server/src/lib.rs b/test-server/src/lib.rs index 37abe1292..42d07549d 100644 --- a/test-server/src/lib.rs +++ b/test-server/src/lib.rs @@ -124,6 +124,7 @@ impl TestServer { |e| log::error!("Can not set alpn protocol: {:?}", e), ); Connector::new() + .conn_lifetime(time::Duration::from_secs(0)) .timeout(time::Duration::from_millis(500)) .ssl(builder.build()) .finish() @@ -131,6 +132,7 @@ impl TestServer { #[cfg(not(feature = "ssl"))] { Connector::new() + .conn_lifetime(time::Duration::from_secs(0)) .timeout(time::Duration::from_millis(500)) .finish() } @@ -163,6 +165,15 @@ impl TestServerRuntime { self.rt.block_on(fut) } + /// Execute future on current core + pub fn block_on_fn(&mut self, f: F) -> Result + where + F: FnOnce() -> R, + R: Future, + { + self.rt.block_on(lazy(|| f())) + } + /// Execute function on current core pub fn execute(&mut self, fut: F) -> R where From 9702b2d88edec888ec88bdfa4736bc0c99971860 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Tue, 23 Apr 2019 15:06:30 -0700 Subject: [PATCH 11/14] add client h2 reuse test --- actix-http/Cargo.toml | 2 +- awc/tests/test_client.rs | 81 +++++++++++++++++++++++++++++++++++++++- 2 files changed, 81 insertions(+), 2 deletions(-) diff --git a/actix-http/Cargo.toml b/actix-http/Cargo.toml index 17e99bca7..438754b3f 100644 --- a/actix-http/Cargo.toml +++ b/actix-http/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "actix-http" -version = "0.1.2" +version = "0.1.3" authors = ["Nikolay Kim "] description = "Actix http primitives" readme = "README.md" diff --git a/awc/tests/test_client.rs b/awc/tests/test_client.rs index d1139fdc5..7e2dc6ba4 100644 --- a/awc/tests/test_client.rs +++ b/awc/tests/test_client.rs @@ -12,10 +12,11 @@ use flate2::Compression; use futures::Future; use rand::Rng; +use actix_codec::{AsyncRead, AsyncWrite}; use actix_http::HttpService; use actix_http_test::TestServer; use actix_service::{fn_service, NewService}; -use actix_web::http::Cookie; +use actix_web::http::{Cookie, Version}; use actix_web::middleware::{BodyEncoding, Compress}; use actix_web::{http::header, web, App, Error, HttpMessage, HttpRequest, HttpResponse}; use awc::error::SendRequestError; @@ -42,6 +43,30 @@ const STR: &str = "Hello World Hello World Hello World Hello World Hello World \ Hello World Hello World Hello World Hello World Hello World \ Hello World Hello World Hello World Hello World Hello World"; +#[cfg(feature = "ssl")] +fn ssl_acceptor( +) -> std::io::Result> { + use openssl::ssl::{SslAcceptor, SslFiletype, SslMethod}; + // load ssl keys + let mut builder = SslAcceptor::mozilla_intermediate(SslMethod::tls()).unwrap(); + builder + .set_private_key_file("../tests/key.pem", SslFiletype::PEM) + .unwrap(); + builder + .set_certificate_chain_file("../tests/cert.pem") + .unwrap(); + builder.set_alpn_select_callback(|_, protos| { + const H2: &[u8] = b"\x02h2"; + if protos.windows(3).any(|window| window == H2) { + Ok(b"h2") + } else { + Err(openssl::ssl::AlpnError::NOACK) + } + }); + builder.set_alpn_protos(b"\x02h2")?; + Ok(actix_server::ssl::OpensslAcceptor::new(builder.build())) +} + #[test] fn test_simple() { let mut srv = @@ -178,6 +203,60 @@ fn test_connection_reuse() { assert_eq!(num.load(Ordering::Relaxed), 1); } +#[cfg(feature = "ssl")] +#[test] +fn test_connection_reuse_h2() { + let openssl = ssl_acceptor().unwrap(); + let num = Arc::new(AtomicUsize::new(0)); + let num2 = num.clone(); + + let mut srv = TestServer::new(move || { + let num2 = num2.clone(); + fn_service(move |io| { + num2.fetch_add(1, Ordering::Relaxed); + Ok(io) + }) + .and_then( + openssl + .clone() + .map_err(|e| println!("Openssl error: {}", e)), + ) + .and_then( + HttpService::build() + .h2(App::new() + .service(web::resource("/").route(web::to(|| HttpResponse::Ok())))) + .map_err(|_| ()), + ) + }); + + // disable ssl verification + use openssl::ssl::{SslConnector, SslMethod, SslVerifyMode}; + + let mut builder = SslConnector::builder(SslMethod::tls()).unwrap(); + builder.set_verify(SslVerifyMode::NONE); + let _ = builder + .set_alpn_protos(b"\x02h2\x08http/1.1") + .map_err(|e| log::error!("Can not set alpn protocol: {:?}", e)); + + let client = awc::Client::build() + .connector(awc::Connector::new().ssl(builder.build()).finish()) + .finish(); + + // req 1 + let request = client.get(srv.surl("/")).send(); + let response = srv.block_on(request).unwrap(); + assert!(response.status().is_success()); + + // req 2 + let req = client.post(srv.surl("/")); + let response = srv.block_on_fn(move || req.send()).unwrap(); + assert!(response.status().is_success()); + assert_eq!(response.version(), Version::HTTP_2); + + // one connection + assert_eq!(num.load(Ordering::Relaxed), 1); +} + #[test] fn test_connection_force_close() { let num = Arc::new(AtomicUsize::new(0)); From 898ef570803c983aaf92e2d3a0ecf7e5d278cbb6 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Tue, 23 Apr 2019 21:21:49 -0700 Subject: [PATCH 12/14] Fix async web::Data factory handling --- CHANGES.md | 5 +++++ src/app.rs | 27 +++++++++++++++++++++++++-- src/app_service.rs | 2 +- src/config.rs | 41 ++++++++++++++++++++++++++++++++++++++--- 4 files changed, 69 insertions(+), 6 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index f4fdd6af7..565362263 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -12,6 +12,11 @@ * `.to_async()` handler can return `Responder` type #792 +### Fixed + +* Fix async web::Data factory handling + + ## [1.0.0-beta.1] - 2019-04-20 ### Added diff --git a/src/app.rs b/src/app.rs index c0bbc8b2b..bb6d2aef8 100644 --- a/src/app.rs +++ b/src/app.rs @@ -431,13 +431,14 @@ where #[cfg(test)] mod tests { use actix_service::Service; + use bytes::Bytes; use futures::{Future, IntoFuture}; use super::*; use crate::http::{header, HeaderValue, Method, StatusCode}; use crate::service::{ServiceRequest, ServiceResponse}; - use crate::test::{block_on, call_service, init_service, TestRequest}; - use crate::{web, Error, HttpResponse}; + use crate::test::{block_on, call_service, init_service, read_body, TestRequest}; + use crate::{web, Error, HttpRequest, HttpResponse}; #[test] fn test_default_resource() { @@ -598,4 +599,26 @@ mod tests { HeaderValue::from_static("0001") ); } + + #[test] + fn test_external_resource() { + let mut srv = init_service( + App::new() + .external_resource("youtube", "https://youtube.com/watch/{video_id}") + .route( + "/test", + web::get().to(|req: HttpRequest| { + HttpResponse::Ok().body(format!( + "{}", + req.url_for("youtube", &["12345"]).unwrap() + )) + }), + ), + ); + let req = TestRequest::with_uri("/test").to_request(); + let resp = call_service(&mut srv, req); + assert_eq!(resp.status(), StatusCode::OK); + let body = read_body(resp); + assert_eq!(body, Bytes::from_static(b"https://youtube.com/watch/12345")); + } } diff --git a/src/app_service.rs b/src/app_service.rs index 63bf84e76..7229a2301 100644 --- a/src/app_service.rs +++ b/src/app_service.rs @@ -162,7 +162,7 @@ where } } - if self.endpoint.is_some() { + if self.endpoint.is_some() && self.data.is_empty() { Ok(Async::Ready(AppInitService { service: self.endpoint.take().unwrap(), rmap: self.rmap.clone(), diff --git a/src/config.rs b/src/config.rs index a8caba4d2..4c4bfa220 100644 --- a/src/config.rs +++ b/src/config.rs @@ -253,11 +253,14 @@ impl ServiceConfig { #[cfg(test)] mod tests { use actix_service::Service; + use bytes::Bytes; + use futures::Future; + use tokio_timer::sleep; use super::*; use crate::http::{Method, StatusCode}; - use crate::test::{block_on, call_service, init_service, TestRequest}; - use crate::{web, App, HttpResponse}; + use crate::test::{block_on, call_service, init_service, read_body, TestRequest}; + use crate::{web, App, HttpRequest, HttpResponse}; #[test] fn test_data() { @@ -277,7 +280,12 @@ mod tests { #[test] fn test_data_factory() { let cfg = |cfg: &mut ServiceConfig| { - cfg.data_factory(|| Ok::<_, ()>(10usize)); + cfg.data_factory(|| { + sleep(std::time::Duration::from_millis(50)).then(|_| { + println!("READY"); + Ok::<_, ()>(10usize) + }) + }); }; let mut srv = @@ -301,6 +309,33 @@ mod tests { assert_eq!(resp.status(), StatusCode::INTERNAL_SERVER_ERROR); } + #[test] + fn test_external_resource() { + let mut srv = init_service( + App::new() + .configure(|cfg| { + cfg.external_resource( + "youtube", + "https://youtube.com/watch/{video_id}", + ); + }) + .route( + "/test", + web::get().to(|req: HttpRequest| { + HttpResponse::Ok().body(format!( + "{}", + req.url_for("youtube", &["12345"]).unwrap() + )) + }), + ), + ); + let req = TestRequest::with_uri("/test").to_request(); + let resp = call_service(&mut srv, req); + assert_eq!(resp.status(), StatusCode::OK); + let body = read_body(resp); + assert_eq!(body, Bytes::from_static(b"https://youtube.com/watch/12345")); + } + #[test] fn test_service() { let mut srv = init_service(App::new().configure(|cfg| { From 42644dac3f2827ac290d9f1e64591c35dbce395f Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Wed, 24 Apr 2019 07:31:33 -0700 Subject: [PATCH 13/14] prepare actix-http-test release --- Cargo.toml | 2 +- test-server/CHANGES.md | 5 +++++ test-server/Cargo.toml | 8 ++++---- 3 files changed, 10 insertions(+), 5 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index a886b5fcc..f4720a1c9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -99,7 +99,7 @@ rustls = { version = "^0.15", optional = true } [dev-dependencies] actix-http = { version = "0.1.2", features=["ssl", "brotli", "flate2-zlib"] } -actix-http-test = { version = "0.1.0", features=["ssl"] } +actix-http-test = { version = "0.1.1", features=["ssl"] } actix-files = { version = "0.1.0-beta.1" } rand = "0.6" env_logger = "0.6" diff --git a/test-server/CHANGES.md b/test-server/CHANGES.md index cec01fde6..700b3aa1f 100644 --- a/test-server/CHANGES.md +++ b/test-server/CHANGES.md @@ -1,5 +1,10 @@ # Changes +## [0.1.1] - 2019-04-24 + +* Always make new connection for http client + + ## [0.1.0] - 2019-04-16 * No changes diff --git a/test-server/Cargo.toml b/test-server/Cargo.toml index 657dd2615..906c9d389 100644 --- a/test-server/Cargo.toml +++ b/test-server/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "actix-http-test" -version = "0.1.0" +version = "0.1.1" authors = ["Nikolay Kim "] description = "Actix http test server" readme = "README.md" @@ -35,7 +35,7 @@ actix-rt = "0.2.2" actix-service = "0.3.6" actix-server = "0.4.3" actix-utils = "0.3.5" -awc = "0.1.0" +awc = "0.1.1" base64 = "0.10" bytes = "0.4" @@ -55,5 +55,5 @@ tokio-timer = "0.2" openssl = { version="0.10", optional = true } [dev-dependencies] -actix-web = "1.0.0-alpha.5" -actix-http = "0.1.0" +actix-web = "1.0.0-beta.1" +actix-http = "0.1.2" From 679d1cd51360f62fe5f0084893591b6003671091 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Wed, 24 Apr 2019 10:25:46 -0700 Subject: [PATCH 14/14] allow to override responder's status code and headers --- CHANGES.md | 2 + src/responder.rs | 193 ++++++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 193 insertions(+), 2 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 565362263..81be2a344 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -2,6 +2,8 @@ ### Added +* Extend `Responder` trait, allow to override status code and headers. + * Add helper functions for reading response body `test::read_body()` * Added support for `remainder match` (i.e "/path/{tail}*") diff --git a/src/responder.rs b/src/responder.rs index 103009e75..f7f2a8b3a 100644 --- a/src/responder.rs +++ b/src/responder.rs @@ -1,8 +1,12 @@ use actix_http::error::InternalError; -use actix_http::{http::StatusCode, Error, Response, ResponseBuilder}; +use actix_http::http::{ + header::IntoHeaderValue, Error as HttpError, HeaderMap, HeaderName, HttpTryFrom, + StatusCode, +}; +use actix_http::{Error, Response, ResponseBuilder}; use bytes::{Bytes, BytesMut}; use futures::future::{err, ok, Either as EitherFuture, FutureResult}; -use futures::{Future, IntoFuture, Poll}; +use futures::{try_ready, Async, Future, IntoFuture, Poll}; use crate::request::HttpRequest; @@ -18,6 +22,51 @@ pub trait Responder { /// Convert itself to `AsyncResult` or `Error`. fn respond_to(self, req: &HttpRequest) -> Self::Future; + + /// Override a status code for a responder. + /// + /// ```rust + /// use actix_web::{HttpRequest, Responder, http::StatusCode}; + /// + /// fn index(req: HttpRequest) -> impl Responder { + /// "Welcome!".with_status(StatusCode::OK) + /// } + /// # fn main() {} + /// ``` + fn with_status(self, status: StatusCode) -> CustomResponder + where + Self: Sized, + { + CustomResponder::new(self).with_status(status) + } + + /// Add extra header to the responder's response. + /// + /// ```rust + /// use actix_web::{web, HttpRequest, Responder}; + /// use serde::Serialize; + /// + /// #[derive(Serialize)] + /// struct MyObj { + /// name: String, + /// } + /// + /// fn index(req: HttpRequest) -> impl Responder { + /// web::Json( + /// MyObj{name: "Name".to_string()} + /// ) + /// .with_header("x-version", "1.2.3") + /// } + /// # fn main() {} + /// ``` + fn with_header(self, key: K, value: V) -> CustomResponder + where + Self: Sized, + HeaderName: HttpTryFrom, + V: IntoHeaderValue, + { + CustomResponder::new(self).with_header(key, value) + } } impl Responder for Response { @@ -154,6 +203,117 @@ impl Responder for BytesMut { } } +/// Allows to override status code and headers for a responder. +pub struct CustomResponder { + responder: T, + status: Option, + headers: Option, + error: Option, +} + +impl CustomResponder { + fn new(responder: T) -> Self { + CustomResponder { + responder, + status: None, + headers: None, + error: None, + } + } + + /// Override a status code for the responder's response. + /// + /// ```rust + /// use actix_web::{HttpRequest, Responder, http::StatusCode}; + /// + /// fn index(req: HttpRequest) -> impl Responder { + /// "Welcome!".with_status(StatusCode::OK) + /// } + /// # fn main() {} + /// ``` + pub fn with_status(mut self, status: StatusCode) -> Self { + self.status = Some(status); + self + } + + /// Add extra header to the responder's response. + /// + /// ```rust + /// use actix_web::{web, HttpRequest, Responder}; + /// use serde::Serialize; + /// + /// #[derive(Serialize)] + /// struct MyObj { + /// name: String, + /// } + /// + /// fn index(req: HttpRequest) -> impl Responder { + /// web::Json( + /// MyObj{name: "Name".to_string()} + /// ) + /// .with_header("x-version", "1.2.3") + /// } + /// # fn main() {} + /// ``` + pub fn with_header(mut self, key: K, value: V) -> Self + where + HeaderName: HttpTryFrom, + V: IntoHeaderValue, + { + if self.headers.is_none() { + self.headers = Some(HeaderMap::new()); + } + + match HeaderName::try_from(key) { + Ok(key) => match value.try_into() { + Ok(value) => { + self.headers.as_mut().unwrap().append(key, value); + } + Err(e) => self.error = Some(e.into()), + }, + Err(e) => self.error = Some(e.into()), + }; + self + } +} + +impl Responder for CustomResponder { + type Error = T::Error; + type Future = CustomResponderFut; + + fn respond_to(self, req: &HttpRequest) -> Self::Future { + CustomResponderFut { + fut: self.responder.respond_to(req).into_future(), + status: self.status, + headers: self.headers, + } + } +} + +pub struct CustomResponderFut { + fut: ::Future, + status: Option, + headers: Option, +} + +impl Future for CustomResponderFut { + type Item = Response; + type Error = T::Error; + + fn poll(&mut self) -> Poll { + let mut res = try_ready!(self.fut.poll()); + if let Some(status) = self.status { + *res.status_mut() = status; + } + if let Some(ref headers) = self.headers { + for (k, v) in headers { + res.headers_mut().insert(k.clone(), v.clone()); + } + } + Ok(Async::Ready(res)) + } +} + /// Combines two different responder types into a single type /// /// ```rust @@ -435,4 +595,33 @@ pub(crate) mod tests { ); assert!(res.is_err()); } + + #[test] + fn test_custom_responder() { + let req = TestRequest::default().to_http_request(); + let res = block_on( + "test" + .to_string() + .with_status(StatusCode::BAD_REQUEST) + .respond_to(&req), + ) + .unwrap(); + assert_eq!(res.status(), StatusCode::BAD_REQUEST); + assert_eq!(res.body().bin_ref(), b"test"); + + let res = block_on( + "test" + .to_string() + .with_header("content-type", "json") + .respond_to(&req), + ) + .unwrap(); + + assert_eq!(res.status(), StatusCode::OK); + assert_eq!(res.body().bin_ref(), b"test"); + assert_eq!( + res.headers().get(CONTENT_TYPE).unwrap(), + HeaderValue::from_static("json") + ); + } }