fix actix-http-test test server stop behavior

This commit is contained in:
Rob Ede 2021-11-15 03:28:18 +00:00
parent e96281cfc3
commit a4133bd377
No known key found for this signature in database
GPG Key ID: 97C636207D3EF933
6 changed files with 201 additions and 56 deletions

View File

@ -1,6 +1,7 @@
# Changes
## Unreleased - 2021-xx-xx
* `TestServer::stop` is now async and will wait for the server and system to shutdown. [#2442]
* Update `actix-server` to `2.0.0-beta.9`. [#2442]
* Minimum supported Rust version (MSRV) is now 1.52.

View File

@ -48,6 +48,7 @@ serde_json = "1.0"
slab = "0.4"
serde_urlencoded = "0.7"
tls-openssl = { version = "0.10.9", package = "openssl", optional = true }
tokio = { version = "1.2", features = ["sync"] }
[dev-dependencies]
actix-web = { version = "4.0.0-beta.10", default-features = false, features = ["cookies"] }

View File

@ -7,7 +7,7 @@
#[cfg(feature = "openssl")]
extern crate tls_openssl as openssl;
use std::{net, sync::mpsc, thread, time::Duration};
use std::{net, thread, time::Duration};
use actix_codec::{AsyncRead, AsyncWrite, Framed};
use actix_rt::{net::TcpStream, System};
@ -19,6 +19,7 @@ use bytes::Bytes;
use futures_core::stream::Stream;
use http::Method;
use socket2::{Domain, Protocol, Socket, Type};
use tokio::sync::mpsc;
/// Start test server
///
@ -55,12 +56,13 @@ pub async fn test_server<F: ServiceFactory<TcpStream>>(factory: F) -> TestServer
test_server_with_addr(tcp, factory).await
}
/// Start [`test server`](test_server()) on a concrete Address
/// Start [`test server`](test_server()) on an existing address binding.
pub async fn test_server_with_addr<F: ServiceFactory<TcpStream>>(
tcp: net::TcpListener,
factory: F,
) -> TestServer {
let (tx, rx) = mpsc::channel();
let (started_tx, started_rx) = std::sync::mpsc::channel();
let (thread_stop_tx, thread_stop_rx) = mpsc::channel(1);
// run server in separate thread
thread::spawn(move || {
@ -68,59 +70,73 @@ pub async fn test_server_with_addr<F: ServiceFactory<TcpStream>>(
let local_addr = tcp.local_addr().unwrap();
let srv = Server::build()
.listen("test", tcp, factory)?
.workers(1)
.disable_signals();
.disable_signals()
.listen("test", tcp, factory)
.expect("test server could not be created");
sys.block_on(async {
tx.send((System::current(), local_addr)).unwrap();
srv.run().await.unwrap();
});
let srv = srv.run();
started_tx
.send((System::current(), srv.handle(), local_addr))
.unwrap();
sys.run()
// drive server loop
sys.block_on(srv).unwrap();
// start system event loop
sys.run().unwrap();
// notify TestServer that server and system have shut down
// all thread managed resources should be dropped at this point
let _ = thread_stop_tx.send(());
});
let (system, addr) = rx.recv().unwrap();
let (system, server, addr) = started_rx.recv().unwrap();
let client = {
#[cfg(feature = "openssl")]
let connector = {
#[cfg(feature = "openssl")]
{
use openssl::ssl::{SslConnector, SslMethod, SslVerifyMode};
use openssl::ssl::{SslConnector, SslMethod, SslVerifyMode};
let mut builder = SslConnector::builder(SslMethod::tls()).unwrap();
builder.set_verify(SslVerifyMode::NONE);
let _ = builder
.set_alpn_protos(b"\x02h2\x08http/1.1")
.map_err(|e| log::error!("Can not set alpn protocol: {:?}", e));
Connector::new()
.conn_lifetime(Duration::from_secs(0))
.timeout(Duration::from_millis(30000))
.ssl(builder.build())
}
#[cfg(not(feature = "openssl"))]
{
Connector::new()
.conn_lifetime(Duration::from_secs(0))
.timeout(Duration::from_millis(30000))
}
let mut builder = SslConnector::builder(SslMethod::tls()).unwrap();
builder.set_verify(SslVerifyMode::NONE);
let _ = builder
.set_alpn_protos(b"\x02h2\x08http/1.1")
.map_err(|e| log::error!("Can not set alpn protocol: {:?}", e));
Connector::new()
.conn_lifetime(Duration::from_secs(0))
.timeout(Duration::from_millis(30000))
.ssl(builder.build())
};
#[cfg(not(feature = "openssl"))]
let connector = {
Connector::new()
.conn_lifetime(Duration::from_secs(0))
.timeout(Duration::from_millis(30000))
};
Client::builder().connector(connector).finish()
};
TestServer {
addr,
server,
client,
system,
addr,
thread_stop_rx,
}
}
/// Test server controller
pub struct TestServer {
server: actix_server::ServerHandle,
client: awc::Client,
system: actix_rt::System,
addr: net::SocketAddr,
client: Client,
system: System,
thread_stop_rx: mpsc::Receiver<()>,
}
impl TestServer {
@ -257,15 +273,32 @@ impl TestServer {
self.client.headers()
}
/// Stop HTTP server
fn stop(&mut self) {
/// Gracefully stop HTTP server.
///
/// Waits for spawned `Server` and `System` to shutdown gracefully.
pub async fn stop(&mut self) {
// signal server to stop
self.server.stop(true).await;
// also signal system to stop
// though this is handled by `ServerBuilder::exit_system` too
self.system.stop();
// wait for thread to be stopped but don't care about result
let _ = self.thread_stop_rx.recv().await;
}
}
impl Drop for TestServer {
fn drop(&mut self) {
self.stop()
// calls in this Drop impl should be enough to shut down the server, system, and thread
// without needing to await anything
// signal server to stop
let _ = self.server.stop(true);
// signal system to stop
self.system.stop();
}
}

View File

@ -24,7 +24,7 @@ use regex::Regex;
#[actix_rt::test]
async fn test_h1() {
let srv = test_server(|| {
let mut srv = test_server(|| {
HttpService::build()
.keep_alive(KeepAlive::Disabled)
.client_timeout(1000)
@ -39,11 +39,13 @@ async fn test_h1() {
let response = srv.get("/").send().await.unwrap();
assert!(response.status().is_success());
srv.stop().await;
}
#[actix_rt::test]
async fn test_h1_2() {
let srv = test_server(|| {
let mut srv = test_server(|| {
HttpService::build()
.keep_alive(KeepAlive::Disabled)
.client_timeout(1000)
@ -59,6 +61,8 @@ async fn test_h1_2() {
let response = srv.get("/").send().await.unwrap();
assert!(response.status().is_success());
srv.stop().await;
}
#[derive(Debug, Display, Error)]
@ -73,7 +77,7 @@ impl From<ExpectFailed> for Response<AnyBody> {
#[actix_rt::test]
async fn test_expect_continue() {
let srv = test_server(|| {
let mut srv = test_server(|| {
HttpService::build()
.expect(fn_service(|req: Request| {
if req.head().uri.query() == Some("yes=") {
@ -98,11 +102,13 @@ async fn test_expect_continue() {
let mut data = String::new();
let _ = stream.read_to_string(&mut data);
assert!(data.starts_with("HTTP/1.1 100 Continue\r\n\r\nHTTP/1.1 200 OK\r\n"));
srv.stop().await;
}
#[actix_rt::test]
async fn test_expect_continue_h1() {
let srv = test_server(|| {
let mut srv = test_server(|| {
HttpService::build()
.expect(fn_service(|req: Request| {
sleep(Duration::from_millis(20)).then(move |_| {
@ -129,6 +135,8 @@ async fn test_expect_continue_h1() {
let mut data = String::new();
let _ = stream.read_to_string(&mut data);
assert!(data.starts_with("HTTP/1.1 100 Continue\r\n\r\nHTTP/1.1 200 OK\r\n"));
srv.stop().await;
}
#[actix_rt::test]
@ -136,7 +144,7 @@ async fn test_chunked_payload() {
let chunk_sizes = vec![32768, 32, 32768];
let total_size: usize = chunk_sizes.iter().sum();
let srv = test_server(|| {
let mut srv = test_server(|| {
HttpService::build()
.h1(fn_service(|mut request: Request| {
request
@ -188,11 +196,13 @@ async fn test_chunked_payload() {
};
assert_eq!(returned_size, total_size);
srv.stop().await;
}
#[actix_rt::test]
async fn test_slow_request() {
let srv = test_server(|| {
let mut srv = test_server(|| {
HttpService::build()
.client_timeout(100)
.finish(|_| ok::<_, Infallible>(Response::ok()))
@ -205,11 +215,13 @@ async fn test_slow_request() {
let mut data = String::new();
let _ = stream.read_to_string(&mut data);
assert!(data.starts_with("HTTP/1.1 408 Request Timeout"));
srv.stop().await;
}
#[actix_rt::test]
async fn test_http1_malformed_request() {
let srv = test_server(|| {
let mut srv = test_server(|| {
HttpService::build()
.h1(|_| ok::<_, Infallible>(Response::ok()))
.tcp()
@ -221,11 +233,13 @@ async fn test_http1_malformed_request() {
let mut data = String::new();
let _ = stream.read_to_string(&mut data);
assert!(data.starts_with("HTTP/1.1 400 Bad Request"));
srv.stop().await;
}
#[actix_rt::test]
async fn test_http1_keepalive() {
let srv = test_server(|| {
let mut srv = test_server(|| {
HttpService::build()
.h1(|_| ok::<_, Infallible>(Response::ok()))
.tcp()
@ -242,11 +256,13 @@ async fn test_http1_keepalive() {
let mut data = vec![0; 1024];
let _ = stream.read(&mut data);
assert_eq!(&data[..17], b"HTTP/1.1 200 OK\r\n");
srv.stop().await;
}
#[actix_rt::test]
async fn test_http1_keepalive_timeout() {
let srv = test_server(|| {
let mut srv = test_server(|| {
HttpService::build()
.keep_alive(1)
.h1(|_| ok::<_, Infallible>(Response::ok()))
@ -264,11 +280,13 @@ async fn test_http1_keepalive_timeout() {
let mut data = vec![0; 1024];
let res = stream.read(&mut data).unwrap();
assert_eq!(res, 0);
srv.stop().await;
}
#[actix_rt::test]
async fn test_http1_keepalive_close() {
let srv = test_server(|| {
let mut srv = test_server(|| {
HttpService::build()
.h1(|_| ok::<_, Infallible>(Response::ok()))
.tcp()
@ -285,11 +303,13 @@ async fn test_http1_keepalive_close() {
let mut data = vec![0; 1024];
let res = stream.read(&mut data).unwrap();
assert_eq!(res, 0);
srv.stop().await;
}
#[actix_rt::test]
async fn test_http10_keepalive_default_close() {
let srv = test_server(|| {
let mut srv = test_server(|| {
HttpService::build()
.h1(|_| ok::<_, Infallible>(Response::ok()))
.tcp()
@ -305,11 +325,13 @@ async fn test_http10_keepalive_default_close() {
let mut data = vec![0; 1024];
let res = stream.read(&mut data).unwrap();
assert_eq!(res, 0);
srv.stop().await;
}
#[actix_rt::test]
async fn test_http10_keepalive() {
let srv = test_server(|| {
let mut srv = test_server(|| {
HttpService::build()
.h1(|_| ok::<_, Infallible>(Response::ok()))
.tcp()
@ -332,11 +354,13 @@ async fn test_http10_keepalive() {
let mut data = vec![0; 1024];
let res = stream.read(&mut data).unwrap();
assert_eq!(res, 0);
srv.stop().await;
}
#[actix_rt::test]
async fn test_http1_keepalive_disabled() {
let srv = test_server(|| {
let mut srv = test_server(|| {
HttpService::build()
.keep_alive(KeepAlive::Disabled)
.h1(|_| ok::<_, Infallible>(Response::ok()))
@ -353,6 +377,8 @@ async fn test_http1_keepalive_disabled() {
let mut data = vec![0; 1024];
let res = stream.read(&mut data).unwrap();
assert_eq!(res, 0);
srv.stop().await;
}
#[actix_rt::test]
@ -362,7 +388,7 @@ async fn test_content_length() {
StatusCode,
};
let srv = test_server(|| {
let mut srv = test_server(|| {
HttpService::build()
.h1(|req: Request| {
let indx: usize = req.uri().path()[1..].parse().unwrap();
@ -400,6 +426,8 @@ async fn test_content_length() {
assert_eq!(response.headers().get(&header), Some(&value));
}
}
srv.stop().await;
}
#[actix_rt::test]
@ -439,6 +467,8 @@ async fn test_h1_headers() {
// read response
let bytes = srv.load_body(response).await.unwrap();
assert_eq!(bytes, Bytes::from(data2));
srv.stop().await;
}
const STR: &str = "Hello World Hello World Hello World Hello World Hello World \
@ -478,6 +508,8 @@ async fn test_h1_body() {
// read response
let bytes = srv.load_body(response).await.unwrap();
assert_eq!(bytes, Bytes::from_static(STR.as_ref()));
srv.stop().await;
}
#[actix_rt::test]
@ -503,6 +535,8 @@ async fn test_h1_head_empty() {
// read response
let bytes = srv.load_body(response).await.unwrap();
assert!(bytes.is_empty());
srv.stop().await;
}
#[actix_rt::test]
@ -528,11 +562,13 @@ async fn test_h1_head_binary() {
// read response
let bytes = srv.load_body(response).await.unwrap();
assert!(bytes.is_empty());
srv.stop().await;
}
#[actix_rt::test]
async fn test_h1_head_binary2() {
let srv = test_server(|| {
let mut srv = test_server(|| {
HttpService::build()
.h1(|_| ok::<_, Infallible>(Response::ok().set_body(STR)))
.tcp()
@ -549,6 +585,8 @@ async fn test_h1_head_binary2() {
.unwrap();
assert_eq!(format!("{}", STR.len()), len.to_str().unwrap());
}
srv.stop().await;
}
#[actix_rt::test]
@ -571,6 +609,8 @@ async fn test_h1_body_length() {
// read response
let bytes = srv.load_body(response).await.unwrap();
assert_eq!(bytes, Bytes::from_static(STR.as_ref()));
srv.stop().await;
}
#[actix_rt::test]
@ -606,6 +646,8 @@ async fn test_h1_body_chunked_explicit() {
// decode
assert_eq!(bytes, Bytes::from_static(STR.as_ref()));
srv.stop().await;
}
#[actix_rt::test]
@ -635,6 +677,8 @@ async fn test_h1_body_chunked_implicit() {
// read response
let bytes = srv.load_body(response).await.unwrap();
assert_eq!(bytes, Bytes::from_static(STR.as_ref()));
srv.stop().await;
}
#[actix_rt::test]
@ -662,6 +706,8 @@ async fn test_h1_response_http_error_handling() {
bytes,
Bytes::from_static(b"error processing HTTP: failed to parse header value")
);
srv.stop().await;
}
#[derive(Debug, Display, Error)]
@ -689,11 +735,13 @@ async fn test_h1_service_error() {
// read response
let bytes = srv.load_body(response).await.unwrap();
assert_eq!(bytes, Bytes::from_static(b"error"));
srv.stop().await;
}
#[actix_rt::test]
async fn test_h1_on_connect() {
let srv = test_server(|| {
let mut srv = test_server(|| {
HttpService::build()
.on_connect_ext(|_, data| {
data.insert(20isize);
@ -708,4 +756,6 @@ async fn test_h1_on_connect() {
let response = srv.get("/").send().await.unwrap();
assert!(response.status().is_success());
srv.stop().await;
}

View File

@ -288,7 +288,7 @@ where
.send((System::current(), srv.handle(), local_addr))
.unwrap();
//
// drive server loop
sys.block_on(srv).unwrap();
// start system event loop

View File

@ -127,6 +127,8 @@ async fn test_body() {
// read response
let bytes = response.body().await.unwrap();
assert_eq!(bytes, Bytes::from_static(STR.as_ref()));
srv.stop().await;
}
#[actix_rt::test]
@ -154,6 +156,8 @@ async fn test_body_gzip() {
let mut dec = Vec::new();
e.read_to_end(&mut dec).unwrap();
assert_eq!(Bytes::from(dec), Bytes::from_static(STR.as_ref()));
srv.stop().await;
}
#[actix_rt::test]
@ -181,6 +185,8 @@ async fn test_body_gzip2() {
let mut dec = Vec::new();
e.read_to_end(&mut dec).unwrap();
assert_eq!(Bytes::from(dec), Bytes::from_static(STR.as_ref()));
srv.stop().await;
}
#[actix_rt::test]
@ -241,6 +247,8 @@ async fn test_body_encoding_override() {
e.write_all(bytes.as_ref()).unwrap();
let dec = e.finish().unwrap();
assert_eq!(Bytes::from(dec), Bytes::from_static(STR.as_ref()));
srv.stop().await;
}
#[actix_rt::test]
@ -275,6 +283,8 @@ async fn test_body_gzip_large() {
let mut dec = Vec::new();
e.read_to_end(&mut dec).unwrap();
assert_eq!(Bytes::from(dec), Bytes::from(data));
srv.stop().await;
}
#[actix_rt::test]
@ -314,6 +324,8 @@ async fn test_body_gzip_large_random() {
e.read_to_end(&mut dec).unwrap();
assert_eq!(dec.len(), data.len());
assert_eq!(Bytes::from(dec), Bytes::from(data));
srv.stop().await;
}
#[actix_rt::test]
@ -348,6 +360,8 @@ async fn test_body_chunked_implicit() {
let mut dec = Vec::new();
e.read_to_end(&mut dec).unwrap();
assert_eq!(Bytes::from(dec), Bytes::from_static(STR.as_ref()));
srv.stop().await;
}
#[actix_rt::test]
@ -380,6 +394,8 @@ async fn test_body_br_streaming() {
let dec = e.finish().unwrap();
println!("T: {:?}", Bytes::copy_from_slice(&dec));
assert_eq!(Bytes::from(dec), Bytes::from_static(STR.as_ref()));
srv.stop().await;
}
#[actix_rt::test]
@ -401,6 +417,8 @@ async fn test_head_binary() {
// read response
let bytes = response.body().await.unwrap();
assert!(bytes.is_empty());
srv.stop().await;
}
#[actix_rt::test]
@ -420,6 +438,8 @@ async fn test_no_chunking() {
// read response
let bytes = response.body().await.unwrap();
assert_eq!(bytes, Bytes::from_static(STR.as_ref()));
srv.stop().await;
}
#[actix_rt::test]
@ -447,6 +467,8 @@ async fn test_body_deflate() {
e.write_all(bytes.as_ref()).unwrap();
let dec = e.finish().unwrap();
assert_eq!(Bytes::from(dec), Bytes::from_static(STR.as_ref()));
srv.stop().await;
}
#[actix_rt::test]
@ -475,6 +497,8 @@ async fn test_body_brotli() {
e.write_all(bytes.as_ref()).unwrap();
let dec = e.finish().unwrap();
assert_eq!(Bytes::from(dec), Bytes::from_static(STR.as_ref()));
srv.stop().await;
}
#[actix_rt::test]
@ -503,6 +527,8 @@ async fn test_body_zstd() {
let mut dec = Vec::new();
e.read_to_end(&mut dec).unwrap();
assert_eq!(Bytes::from(dec), Bytes::from_static(STR.as_ref()));
srv.stop().await;
}
#[actix_rt::test]
@ -534,6 +560,8 @@ async fn test_body_zstd_streaming() {
let mut dec = Vec::new();
e.read_to_end(&mut dec).unwrap();
assert_eq!(Bytes::from(dec), Bytes::from_static(STR.as_ref()));
srv.stop().await;
}
#[actix_rt::test]
@ -559,6 +587,8 @@ async fn test_zstd_encoding() {
// read response
let bytes = response.body().await.unwrap();
assert_eq!(bytes, Bytes::from_static(STR.as_ref()));
srv.stop().await;
}
#[actix_rt::test]
@ -594,6 +624,8 @@ async fn test_zstd_encoding_large() {
// read response
let bytes = response.body().limit(320_000).await.unwrap();
assert_eq!(bytes, Bytes::from(data));
srv.stop().await;
}
#[actix_rt::test]
@ -619,6 +651,8 @@ async fn test_encoding() {
// read response
let bytes = response.body().await.unwrap();
assert_eq!(bytes, Bytes::from_static(STR.as_ref()));
srv.stop().await;
}
#[actix_rt::test]
@ -644,6 +678,8 @@ async fn test_gzip_encoding() {
// read response
let bytes = response.body().await.unwrap();
assert_eq!(bytes, Bytes::from_static(STR.as_ref()));
srv.stop().await;
}
#[actix_rt::test]
@ -670,6 +706,8 @@ async fn test_gzip_encoding_large() {
// read response
let bytes = response.body().await.unwrap();
assert_eq!(bytes, Bytes::from(data));
srv.stop().await;
}
#[actix_rt::test]
@ -702,6 +740,8 @@ async fn test_reading_gzip_encoding_large_random() {
let bytes = response.body().await.unwrap();
assert_eq!(bytes.len(), data.len());
assert_eq!(bytes, Bytes::from(data));
srv.stop().await;
}
#[actix_rt::test]
@ -727,6 +767,8 @@ async fn test_reading_deflate_encoding() {
// read response
let bytes = response.body().await.unwrap();
assert_eq!(bytes, Bytes::from_static(STR.as_ref()));
srv.stop().await;
}
#[actix_rt::test]
@ -753,6 +795,8 @@ async fn test_reading_deflate_encoding_large() {
// read response
let bytes = response.body().await.unwrap();
assert_eq!(bytes, Bytes::from(data));
srv.stop().await;
}
#[actix_rt::test]
@ -785,6 +829,8 @@ async fn test_reading_deflate_encoding_large_random() {
let bytes = response.body().await.unwrap();
assert_eq!(bytes.len(), data.len());
assert_eq!(bytes, Bytes::from(data));
srv.stop().await;
}
#[actix_rt::test]
@ -810,6 +856,8 @@ async fn test_brotli_encoding() {
// read response
let bytes = response.body().await.unwrap();
assert_eq!(bytes, Bytes::from_static(STR.as_ref()));
srv.stop().await;
}
#[actix_rt::test]
@ -845,6 +893,8 @@ async fn test_brotli_encoding_large() {
// read response
let bytes = response.body().limit(320_000).await.unwrap();
assert_eq!(bytes, Bytes::from(data));
srv.stop().await;
}
#[cfg(feature = "openssl")]
@ -861,9 +911,9 @@ async fn test_brotli_encoding_large_openssl() {
});
// body
let mut e = BrotliEncoder::new(Vec::new(), 3);
e.write_all(data.as_ref()).unwrap();
let enc = e.finish().unwrap();
let mut enc = BrotliEncoder::new(Vec::new(), 3);
enc.write_all(data.as_ref()).unwrap();
let enc = enc.finish().unwrap();
// client request
let mut response = srv
@ -877,6 +927,8 @@ async fn test_brotli_encoding_large_openssl() {
// read response
let bytes = response.body().await.unwrap();
assert_eq!(bytes, Bytes::from(data));
srv.stop().await;
}
#[cfg(feature = "rustls")]
@ -944,6 +996,8 @@ mod plus_rustls {
let bytes = response.body().await.unwrap();
assert_eq!(bytes.len(), data.len());
assert_eq!(bytes, Bytes::from(data));
srv.stop().await;
}
}
@ -998,6 +1052,8 @@ async fn test_server_cookies() {
assert_eq!(cookies[0], second_cookie);
assert_eq!(cookies[1], first_cookie);
}
srv.stop().await;
}
#[actix_rt::test]
@ -1018,6 +1074,8 @@ async fn test_slow_request() {
let mut data = String::new();
let _ = stream.read_to_string(&mut data);
assert!(data.starts_with("HTTP/1.1 408 Request Timeout"));
srv.stop().await;
}
#[actix_rt::test]
@ -1101,4 +1159,6 @@ async fn test_accept_encoding_no_match() {
.unwrap();
assert_eq!(response.status().as_u16(), 406);
srv.stop().await;
}