diff --git a/actix-http-test/CHANGES.md b/actix-http-test/CHANGES.md index f20ff75e2..03507d587 100644 --- a/actix-http-test/CHANGES.md +++ b/actix-http-test/CHANGES.md @@ -1,6 +1,7 @@ # Changes ## Unreleased - 2021-xx-xx +* `TestServer::stop` is now async and will wait for the server and system to shutdown. [#2442] * Update `actix-server` to `2.0.0-beta.9`. [#2442] * Minimum supported Rust version (MSRV) is now 1.52. diff --git a/actix-http-test/Cargo.toml b/actix-http-test/Cargo.toml index aa0b5ff61..de4dcfc73 100644 --- a/actix-http-test/Cargo.toml +++ b/actix-http-test/Cargo.toml @@ -48,6 +48,7 @@ serde_json = "1.0" slab = "0.4" serde_urlencoded = "0.7" tls-openssl = { version = "0.10.9", package = "openssl", optional = true } +tokio = { version = "1.2", features = ["sync"] } [dev-dependencies] actix-web = { version = "4.0.0-beta.10", default-features = false, features = ["cookies"] } diff --git a/actix-http-test/src/lib.rs b/actix-http-test/src/lib.rs index 85d2d578b..cda98cea5 100644 --- a/actix-http-test/src/lib.rs +++ b/actix-http-test/src/lib.rs @@ -7,7 +7,7 @@ #[cfg(feature = "openssl")] extern crate tls_openssl as openssl; -use std::{net, sync::mpsc, thread, time::Duration}; +use std::{net, thread, time::Duration}; use actix_codec::{AsyncRead, AsyncWrite, Framed}; use actix_rt::{net::TcpStream, System}; @@ -19,6 +19,7 @@ use bytes::Bytes; use futures_core::stream::Stream; use http::Method; use socket2::{Domain, Protocol, Socket, Type}; +use tokio::sync::mpsc; /// Start test server /// @@ -55,12 +56,13 @@ pub async fn test_server>(factory: F) -> TestServer test_server_with_addr(tcp, factory).await } -/// Start [`test server`](test_server()) on a concrete Address +/// Start [`test server`](test_server()) on an existing address binding. pub async fn test_server_with_addr>( tcp: net::TcpListener, factory: F, ) -> TestServer { - let (tx, rx) = mpsc::channel(); + let (started_tx, started_rx) = std::sync::mpsc::channel(); + let (thread_stop_tx, thread_stop_rx) = mpsc::channel(1); // run server in separate thread thread::spawn(move || { @@ -68,59 +70,73 @@ pub async fn test_server_with_addr>( let local_addr = tcp.local_addr().unwrap(); let srv = Server::build() - .listen("test", tcp, factory)? .workers(1) - .disable_signals(); + .disable_signals() + .listen("test", tcp, factory) + .expect("test server could not be created"); - sys.block_on(async { - tx.send((System::current(), local_addr)).unwrap(); - srv.run().await.unwrap(); - }); + let srv = srv.run(); + started_tx + .send((System::current(), srv.handle(), local_addr)) + .unwrap(); - sys.run() + // drive server loop + sys.block_on(srv).unwrap(); + + // start system event loop + sys.run().unwrap(); + + // notify TestServer that server and system have shut down + // all thread managed resources should be dropped at this point + let _ = thread_stop_tx.send(()); }); - let (system, addr) = rx.recv().unwrap(); + let (system, server, addr) = started_rx.recv().unwrap(); let client = { + #[cfg(feature = "openssl")] let connector = { - #[cfg(feature = "openssl")] - { - use openssl::ssl::{SslConnector, SslMethod, SslVerifyMode}; + use openssl::ssl::{SslConnector, SslMethod, SslVerifyMode}; - let mut builder = SslConnector::builder(SslMethod::tls()).unwrap(); - builder.set_verify(SslVerifyMode::NONE); - let _ = builder - .set_alpn_protos(b"\x02h2\x08http/1.1") - .map_err(|e| log::error!("Can not set alpn protocol: {:?}", e)); - Connector::new() - .conn_lifetime(Duration::from_secs(0)) - .timeout(Duration::from_millis(30000)) - .ssl(builder.build()) - } - #[cfg(not(feature = "openssl"))] - { - Connector::new() - .conn_lifetime(Duration::from_secs(0)) - .timeout(Duration::from_millis(30000)) - } + let mut builder = SslConnector::builder(SslMethod::tls()).unwrap(); + + builder.set_verify(SslVerifyMode::NONE); + let _ = builder + .set_alpn_protos(b"\x02h2\x08http/1.1") + .map_err(|e| log::error!("Can not set alpn protocol: {:?}", e)); + + Connector::new() + .conn_lifetime(Duration::from_secs(0)) + .timeout(Duration::from_millis(30000)) + .ssl(builder.build()) + }; + + #[cfg(not(feature = "openssl"))] + let connector = { + Connector::new() + .conn_lifetime(Duration::from_secs(0)) + .timeout(Duration::from_millis(30000)) }; Client::builder().connector(connector).finish() }; TestServer { - addr, + server, client, system, + addr, + thread_stop_rx, } } /// Test server controller pub struct TestServer { + server: actix_server::ServerHandle, + client: awc::Client, + system: actix_rt::System, addr: net::SocketAddr, - client: Client, - system: System, + thread_stop_rx: mpsc::Receiver<()>, } impl TestServer { @@ -257,15 +273,32 @@ impl TestServer { self.client.headers() } - /// Stop HTTP server - fn stop(&mut self) { + /// Gracefully stop HTTP server. + /// + /// Waits for spawned `Server` and `System` to shutdown gracefully. + pub async fn stop(&mut self) { + // signal server to stop + self.server.stop(true).await; + + // also signal system to stop + // though this is handled by `ServerBuilder::exit_system` too self.system.stop(); + + // wait for thread to be stopped but don't care about result + let _ = self.thread_stop_rx.recv().await; } } impl Drop for TestServer { fn drop(&mut self) { - self.stop() + // calls in this Drop impl should be enough to shut down the server, system, and thread + // without needing to await anything + + // signal server to stop + let _ = self.server.stop(true); + + // signal system to stop + self.system.stop(); } } diff --git a/actix-http/tests/test_server.rs b/actix-http/tests/test_server.rs index c04aeae00..ea78ce113 100644 --- a/actix-http/tests/test_server.rs +++ b/actix-http/tests/test_server.rs @@ -24,7 +24,7 @@ use regex::Regex; #[actix_rt::test] async fn test_h1() { - let srv = test_server(|| { + let mut srv = test_server(|| { HttpService::build() .keep_alive(KeepAlive::Disabled) .client_timeout(1000) @@ -39,11 +39,13 @@ async fn test_h1() { let response = srv.get("/").send().await.unwrap(); assert!(response.status().is_success()); + + srv.stop().await; } #[actix_rt::test] async fn test_h1_2() { - let srv = test_server(|| { + let mut srv = test_server(|| { HttpService::build() .keep_alive(KeepAlive::Disabled) .client_timeout(1000) @@ -59,6 +61,8 @@ async fn test_h1_2() { let response = srv.get("/").send().await.unwrap(); assert!(response.status().is_success()); + + srv.stop().await; } #[derive(Debug, Display, Error)] @@ -73,7 +77,7 @@ impl From for Response { #[actix_rt::test] async fn test_expect_continue() { - let srv = test_server(|| { + let mut srv = test_server(|| { HttpService::build() .expect(fn_service(|req: Request| { if req.head().uri.query() == Some("yes=") { @@ -98,11 +102,13 @@ async fn test_expect_continue() { let mut data = String::new(); let _ = stream.read_to_string(&mut data); assert!(data.starts_with("HTTP/1.1 100 Continue\r\n\r\nHTTP/1.1 200 OK\r\n")); + + srv.stop().await; } #[actix_rt::test] async fn test_expect_continue_h1() { - let srv = test_server(|| { + let mut srv = test_server(|| { HttpService::build() .expect(fn_service(|req: Request| { sleep(Duration::from_millis(20)).then(move |_| { @@ -129,6 +135,8 @@ async fn test_expect_continue_h1() { let mut data = String::new(); let _ = stream.read_to_string(&mut data); assert!(data.starts_with("HTTP/1.1 100 Continue\r\n\r\nHTTP/1.1 200 OK\r\n")); + + srv.stop().await; } #[actix_rt::test] @@ -136,7 +144,7 @@ async fn test_chunked_payload() { let chunk_sizes = vec![32768, 32, 32768]; let total_size: usize = chunk_sizes.iter().sum(); - let srv = test_server(|| { + let mut srv = test_server(|| { HttpService::build() .h1(fn_service(|mut request: Request| { request @@ -188,11 +196,13 @@ async fn test_chunked_payload() { }; assert_eq!(returned_size, total_size); + + srv.stop().await; } #[actix_rt::test] async fn test_slow_request() { - let srv = test_server(|| { + let mut srv = test_server(|| { HttpService::build() .client_timeout(100) .finish(|_| ok::<_, Infallible>(Response::ok())) @@ -205,11 +215,13 @@ async fn test_slow_request() { let mut data = String::new(); let _ = stream.read_to_string(&mut data); assert!(data.starts_with("HTTP/1.1 408 Request Timeout")); + + srv.stop().await; } #[actix_rt::test] async fn test_http1_malformed_request() { - let srv = test_server(|| { + let mut srv = test_server(|| { HttpService::build() .h1(|_| ok::<_, Infallible>(Response::ok())) .tcp() @@ -221,11 +233,13 @@ async fn test_http1_malformed_request() { let mut data = String::new(); let _ = stream.read_to_string(&mut data); assert!(data.starts_with("HTTP/1.1 400 Bad Request")); + + srv.stop().await; } #[actix_rt::test] async fn test_http1_keepalive() { - let srv = test_server(|| { + let mut srv = test_server(|| { HttpService::build() .h1(|_| ok::<_, Infallible>(Response::ok())) .tcp() @@ -242,11 +256,13 @@ async fn test_http1_keepalive() { let mut data = vec![0; 1024]; let _ = stream.read(&mut data); assert_eq!(&data[..17], b"HTTP/1.1 200 OK\r\n"); + + srv.stop().await; } #[actix_rt::test] async fn test_http1_keepalive_timeout() { - let srv = test_server(|| { + let mut srv = test_server(|| { HttpService::build() .keep_alive(1) .h1(|_| ok::<_, Infallible>(Response::ok())) @@ -264,11 +280,13 @@ async fn test_http1_keepalive_timeout() { let mut data = vec![0; 1024]; let res = stream.read(&mut data).unwrap(); assert_eq!(res, 0); + + srv.stop().await; } #[actix_rt::test] async fn test_http1_keepalive_close() { - let srv = test_server(|| { + let mut srv = test_server(|| { HttpService::build() .h1(|_| ok::<_, Infallible>(Response::ok())) .tcp() @@ -285,11 +303,13 @@ async fn test_http1_keepalive_close() { let mut data = vec![0; 1024]; let res = stream.read(&mut data).unwrap(); assert_eq!(res, 0); + + srv.stop().await; } #[actix_rt::test] async fn test_http10_keepalive_default_close() { - let srv = test_server(|| { + let mut srv = test_server(|| { HttpService::build() .h1(|_| ok::<_, Infallible>(Response::ok())) .tcp() @@ -305,11 +325,13 @@ async fn test_http10_keepalive_default_close() { let mut data = vec![0; 1024]; let res = stream.read(&mut data).unwrap(); assert_eq!(res, 0); + + srv.stop().await; } #[actix_rt::test] async fn test_http10_keepalive() { - let srv = test_server(|| { + let mut srv = test_server(|| { HttpService::build() .h1(|_| ok::<_, Infallible>(Response::ok())) .tcp() @@ -332,11 +354,13 @@ async fn test_http10_keepalive() { let mut data = vec![0; 1024]; let res = stream.read(&mut data).unwrap(); assert_eq!(res, 0); + + srv.stop().await; } #[actix_rt::test] async fn test_http1_keepalive_disabled() { - let srv = test_server(|| { + let mut srv = test_server(|| { HttpService::build() .keep_alive(KeepAlive::Disabled) .h1(|_| ok::<_, Infallible>(Response::ok())) @@ -353,6 +377,8 @@ async fn test_http1_keepalive_disabled() { let mut data = vec![0; 1024]; let res = stream.read(&mut data).unwrap(); assert_eq!(res, 0); + + srv.stop().await; } #[actix_rt::test] @@ -362,7 +388,7 @@ async fn test_content_length() { StatusCode, }; - let srv = test_server(|| { + let mut srv = test_server(|| { HttpService::build() .h1(|req: Request| { let indx: usize = req.uri().path()[1..].parse().unwrap(); @@ -400,6 +426,8 @@ async fn test_content_length() { assert_eq!(response.headers().get(&header), Some(&value)); } } + + srv.stop().await; } #[actix_rt::test] @@ -439,6 +467,8 @@ async fn test_h1_headers() { // read response let bytes = srv.load_body(response).await.unwrap(); assert_eq!(bytes, Bytes::from(data2)); + + srv.stop().await; } const STR: &str = "Hello World Hello World Hello World Hello World Hello World \ @@ -478,6 +508,8 @@ async fn test_h1_body() { // read response let bytes = srv.load_body(response).await.unwrap(); assert_eq!(bytes, Bytes::from_static(STR.as_ref())); + + srv.stop().await; } #[actix_rt::test] @@ -503,6 +535,8 @@ async fn test_h1_head_empty() { // read response let bytes = srv.load_body(response).await.unwrap(); assert!(bytes.is_empty()); + + srv.stop().await; } #[actix_rt::test] @@ -528,11 +562,13 @@ async fn test_h1_head_binary() { // read response let bytes = srv.load_body(response).await.unwrap(); assert!(bytes.is_empty()); + + srv.stop().await; } #[actix_rt::test] async fn test_h1_head_binary2() { - let srv = test_server(|| { + let mut srv = test_server(|| { HttpService::build() .h1(|_| ok::<_, Infallible>(Response::ok().set_body(STR))) .tcp() @@ -549,6 +585,8 @@ async fn test_h1_head_binary2() { .unwrap(); assert_eq!(format!("{}", STR.len()), len.to_str().unwrap()); } + + srv.stop().await; } #[actix_rt::test] @@ -571,6 +609,8 @@ async fn test_h1_body_length() { // read response let bytes = srv.load_body(response).await.unwrap(); assert_eq!(bytes, Bytes::from_static(STR.as_ref())); + + srv.stop().await; } #[actix_rt::test] @@ -606,6 +646,8 @@ async fn test_h1_body_chunked_explicit() { // decode assert_eq!(bytes, Bytes::from_static(STR.as_ref())); + + srv.stop().await; } #[actix_rt::test] @@ -635,6 +677,8 @@ async fn test_h1_body_chunked_implicit() { // read response let bytes = srv.load_body(response).await.unwrap(); assert_eq!(bytes, Bytes::from_static(STR.as_ref())); + + srv.stop().await; } #[actix_rt::test] @@ -662,6 +706,8 @@ async fn test_h1_response_http_error_handling() { bytes, Bytes::from_static(b"error processing HTTP: failed to parse header value") ); + + srv.stop().await; } #[derive(Debug, Display, Error)] @@ -689,11 +735,13 @@ async fn test_h1_service_error() { // read response let bytes = srv.load_body(response).await.unwrap(); assert_eq!(bytes, Bytes::from_static(b"error")); + + srv.stop().await; } #[actix_rt::test] async fn test_h1_on_connect() { - let srv = test_server(|| { + let mut srv = test_server(|| { HttpService::build() .on_connect_ext(|_, data| { data.insert(20isize); @@ -708,4 +756,6 @@ async fn test_h1_on_connect() { let response = srv.get("/").send().await.unwrap(); assert!(response.status().is_success()); + + srv.stop().await; } diff --git a/actix-test/src/lib.rs b/actix-test/src/lib.rs index e3b2a39e8..cf5738aa0 100644 --- a/actix-test/src/lib.rs +++ b/actix-test/src/lib.rs @@ -288,7 +288,7 @@ where .send((System::current(), srv.handle(), local_addr)) .unwrap(); - // + // drive server loop sys.block_on(srv).unwrap(); // start system event loop diff --git a/tests/test_server.rs b/tests/test_server.rs index b2a59e84c..d21dac8cf 100644 --- a/tests/test_server.rs +++ b/tests/test_server.rs @@ -127,6 +127,8 @@ async fn test_body() { // read response let bytes = response.body().await.unwrap(); assert_eq!(bytes, Bytes::from_static(STR.as_ref())); + + srv.stop().await; } #[actix_rt::test] @@ -154,6 +156,8 @@ async fn test_body_gzip() { let mut dec = Vec::new(); e.read_to_end(&mut dec).unwrap(); assert_eq!(Bytes::from(dec), Bytes::from_static(STR.as_ref())); + + srv.stop().await; } #[actix_rt::test] @@ -181,6 +185,8 @@ async fn test_body_gzip2() { let mut dec = Vec::new(); e.read_to_end(&mut dec).unwrap(); assert_eq!(Bytes::from(dec), Bytes::from_static(STR.as_ref())); + + srv.stop().await; } #[actix_rt::test] @@ -241,6 +247,8 @@ async fn test_body_encoding_override() { e.write_all(bytes.as_ref()).unwrap(); let dec = e.finish().unwrap(); assert_eq!(Bytes::from(dec), Bytes::from_static(STR.as_ref())); + + srv.stop().await; } #[actix_rt::test] @@ -275,6 +283,8 @@ async fn test_body_gzip_large() { let mut dec = Vec::new(); e.read_to_end(&mut dec).unwrap(); assert_eq!(Bytes::from(dec), Bytes::from(data)); + + srv.stop().await; } #[actix_rt::test] @@ -314,6 +324,8 @@ async fn test_body_gzip_large_random() { e.read_to_end(&mut dec).unwrap(); assert_eq!(dec.len(), data.len()); assert_eq!(Bytes::from(dec), Bytes::from(data)); + + srv.stop().await; } #[actix_rt::test] @@ -348,6 +360,8 @@ async fn test_body_chunked_implicit() { let mut dec = Vec::new(); e.read_to_end(&mut dec).unwrap(); assert_eq!(Bytes::from(dec), Bytes::from_static(STR.as_ref())); + + srv.stop().await; } #[actix_rt::test] @@ -380,6 +394,8 @@ async fn test_body_br_streaming() { let dec = e.finish().unwrap(); println!("T: {:?}", Bytes::copy_from_slice(&dec)); assert_eq!(Bytes::from(dec), Bytes::from_static(STR.as_ref())); + + srv.stop().await; } #[actix_rt::test] @@ -401,6 +417,8 @@ async fn test_head_binary() { // read response let bytes = response.body().await.unwrap(); assert!(bytes.is_empty()); + + srv.stop().await; } #[actix_rt::test] @@ -420,6 +438,8 @@ async fn test_no_chunking() { // read response let bytes = response.body().await.unwrap(); assert_eq!(bytes, Bytes::from_static(STR.as_ref())); + + srv.stop().await; } #[actix_rt::test] @@ -447,6 +467,8 @@ async fn test_body_deflate() { e.write_all(bytes.as_ref()).unwrap(); let dec = e.finish().unwrap(); assert_eq!(Bytes::from(dec), Bytes::from_static(STR.as_ref())); + + srv.stop().await; } #[actix_rt::test] @@ -475,6 +497,8 @@ async fn test_body_brotli() { e.write_all(bytes.as_ref()).unwrap(); let dec = e.finish().unwrap(); assert_eq!(Bytes::from(dec), Bytes::from_static(STR.as_ref())); + + srv.stop().await; } #[actix_rt::test] @@ -503,6 +527,8 @@ async fn test_body_zstd() { let mut dec = Vec::new(); e.read_to_end(&mut dec).unwrap(); assert_eq!(Bytes::from(dec), Bytes::from_static(STR.as_ref())); + + srv.stop().await; } #[actix_rt::test] @@ -534,6 +560,8 @@ async fn test_body_zstd_streaming() { let mut dec = Vec::new(); e.read_to_end(&mut dec).unwrap(); assert_eq!(Bytes::from(dec), Bytes::from_static(STR.as_ref())); + + srv.stop().await; } #[actix_rt::test] @@ -559,6 +587,8 @@ async fn test_zstd_encoding() { // read response let bytes = response.body().await.unwrap(); assert_eq!(bytes, Bytes::from_static(STR.as_ref())); + + srv.stop().await; } #[actix_rt::test] @@ -594,6 +624,8 @@ async fn test_zstd_encoding_large() { // read response let bytes = response.body().limit(320_000).await.unwrap(); assert_eq!(bytes, Bytes::from(data)); + + srv.stop().await; } #[actix_rt::test] @@ -619,6 +651,8 @@ async fn test_encoding() { // read response let bytes = response.body().await.unwrap(); assert_eq!(bytes, Bytes::from_static(STR.as_ref())); + + srv.stop().await; } #[actix_rt::test] @@ -644,6 +678,8 @@ async fn test_gzip_encoding() { // read response let bytes = response.body().await.unwrap(); assert_eq!(bytes, Bytes::from_static(STR.as_ref())); + + srv.stop().await; } #[actix_rt::test] @@ -670,6 +706,8 @@ async fn test_gzip_encoding_large() { // read response let bytes = response.body().await.unwrap(); assert_eq!(bytes, Bytes::from(data)); + + srv.stop().await; } #[actix_rt::test] @@ -702,6 +740,8 @@ async fn test_reading_gzip_encoding_large_random() { let bytes = response.body().await.unwrap(); assert_eq!(bytes.len(), data.len()); assert_eq!(bytes, Bytes::from(data)); + + srv.stop().await; } #[actix_rt::test] @@ -727,6 +767,8 @@ async fn test_reading_deflate_encoding() { // read response let bytes = response.body().await.unwrap(); assert_eq!(bytes, Bytes::from_static(STR.as_ref())); + + srv.stop().await; } #[actix_rt::test] @@ -753,6 +795,8 @@ async fn test_reading_deflate_encoding_large() { // read response let bytes = response.body().await.unwrap(); assert_eq!(bytes, Bytes::from(data)); + + srv.stop().await; } #[actix_rt::test] @@ -785,6 +829,8 @@ async fn test_reading_deflate_encoding_large_random() { let bytes = response.body().await.unwrap(); assert_eq!(bytes.len(), data.len()); assert_eq!(bytes, Bytes::from(data)); + + srv.stop().await; } #[actix_rt::test] @@ -810,6 +856,8 @@ async fn test_brotli_encoding() { // read response let bytes = response.body().await.unwrap(); assert_eq!(bytes, Bytes::from_static(STR.as_ref())); + + srv.stop().await; } #[actix_rt::test] @@ -845,6 +893,8 @@ async fn test_brotli_encoding_large() { // read response let bytes = response.body().limit(320_000).await.unwrap(); assert_eq!(bytes, Bytes::from(data)); + + srv.stop().await; } #[cfg(feature = "openssl")] @@ -861,9 +911,9 @@ async fn test_brotli_encoding_large_openssl() { }); // body - let mut e = BrotliEncoder::new(Vec::new(), 3); - e.write_all(data.as_ref()).unwrap(); - let enc = e.finish().unwrap(); + let mut enc = BrotliEncoder::new(Vec::new(), 3); + enc.write_all(data.as_ref()).unwrap(); + let enc = enc.finish().unwrap(); // client request let mut response = srv @@ -877,6 +927,8 @@ async fn test_brotli_encoding_large_openssl() { // read response let bytes = response.body().await.unwrap(); assert_eq!(bytes, Bytes::from(data)); + + srv.stop().await; } #[cfg(feature = "rustls")] @@ -944,6 +996,8 @@ mod plus_rustls { let bytes = response.body().await.unwrap(); assert_eq!(bytes.len(), data.len()); assert_eq!(bytes, Bytes::from(data)); + + srv.stop().await; } } @@ -998,6 +1052,8 @@ async fn test_server_cookies() { assert_eq!(cookies[0], second_cookie); assert_eq!(cookies[1], first_cookie); } + + srv.stop().await; } #[actix_rt::test] @@ -1018,6 +1074,8 @@ async fn test_slow_request() { let mut data = String::new(); let _ = stream.read_to_string(&mut data); assert!(data.starts_with("HTTP/1.1 408 Request Timeout")); + + srv.stop().await; } #[actix_rt::test] @@ -1101,4 +1159,6 @@ async fn test_accept_encoding_no_match() { .unwrap(); assert_eq!(response.status().as_u16(), 406); + + srv.stop().await; }