diff --git a/Cargo.toml b/Cargo.toml index 152282207..91af80143 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -69,7 +69,7 @@ __compress = [] actix-codec = "0.4.0" actix-macros = "0.2.3" actix-rt = "2.2" -actix-server = "2.0.0-beta.3" +actix-server = "2.0.0-beta.8" actix-service = "2.0.0" actix-utils = "3.0.0" actix-tls = { version = "3.0.0-beta.7", default-features = false, optional = true } @@ -117,6 +117,7 @@ rcgen = "0.8" rustls-pemfile = "0.2" tls-openssl = { package = "openssl", version = "0.10.9" } tls-rustls = { package = "rustls", version = "0.20.0" } +tokio = { version = "1.2", features = ["time"] } zstd = "0.7" [profile.dev] @@ -140,6 +141,8 @@ actix-web-actors = { path = "actix-web-actors" } actix-web-codegen = { path = "actix-web-codegen" } awc = { path = "awc" } +actix-server = { path = "../actix-net/actix-server" } + [[test]] name = "test_server" required-features = ["compress-brotli", "compress-gzip", "compress-zstd", "cookies"] diff --git a/actix-http-test/Cargo.toml b/actix-http-test/Cargo.toml index d3fc8a47f..17832b7a0 100644 --- a/actix-http-test/Cargo.toml +++ b/actix-http-test/Cargo.toml @@ -34,13 +34,13 @@ actix-codec = "0.4.0" actix-tls = "3.0.0-beta.7" actix-utils = "3.0.0" actix-rt = "2.2" -actix-server = "2.0.0-beta.3" +actix-server = "2.0.0-beta.8" awc = { version = "3.0.0-beta.9", default-features = false } base64 = "0.13" bytes = "1" futures-core = { version = "0.3.7", default-features = false } -http = "0.2.2" +http = "0.2.5" log = "0.4" socket2 = "0.4" serde = "1.0" diff --git a/actix-http-test/src/lib.rs b/actix-http-test/src/lib.rs index c7b083b5e..85d2d578b 100644 --- a/actix-http-test/src/lib.rs +++ b/actix-http-test/src/lib.rs @@ -73,8 +73,8 @@ pub async fn test_server_with_addr>( .disable_signals(); sys.block_on(async { - srv.run(); tx.send((System::current(), local_addr)).unwrap(); + srv.run().await.unwrap(); }); sys.run() diff --git a/actix-http/Cargo.toml b/actix-http/Cargo.toml index 7d39e000a..61484baab 100644 --- a/actix-http/Cargo.toml +++ b/actix-http/Cargo.toml @@ -57,7 +57,7 @@ encoding_rs = "0.8" futures-core = { version = "0.3.7", default-features = false, features = ["alloc"] } futures-util = { version = "0.3.7", default-features = false, features = ["alloc", "sink"] } h2 = "0.3.1" -http = "0.2.2" +http = "0.2.5" httparse = "1.5.1" httpdate = "1.0.1" itoa = "0.4" @@ -82,7 +82,7 @@ flate2 = { version = "1.0.13", optional = true } zstd = { version = "0.7", optional = true } [dev-dependencies] -actix-server = "2.0.0-beta.3" +actix-server = "2.0.0-beta.8" actix-http-test = { version = "3.0.0-beta.5", features = ["openssl"] } actix-tls = { version = "3.0.0-beta.7", features = ["openssl"] } async-stream = "0.3" @@ -95,6 +95,7 @@ serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" tls-openssl = { package = "openssl", version = "0.10.9" } tls-rustls = { package = "rustls", version = "0.20.0" } +tokio = { version = "1.2", features = ["sync", "rt-multi-thread", "macros"] } [[example]] name = "ws" diff --git a/actix-http/tests/test_h2_ping_pong.rs b/actix-http/tests/test_h2_ping_pong.rs index 5e03785a1..d4ff2fbe8 100644 --- a/actix-http/tests/test_h2_ping_pong.rs +++ b/actix-http/tests/test_h2_ping_pong.rs @@ -13,7 +13,7 @@ async fn h2_ping_pong() -> io::Result<()> { let join = std::thread::spawn(move || { actix_rt::System::new().block_on(async move { - let handle = Server::build() + let srv = Server::build() .disable_signals() .workers(1) .listen("h2_ping_pong", lst, || { @@ -24,9 +24,9 @@ async fn h2_ping_pong() -> io::Result<()> { })? .run(); - tx.send(handle.clone()).unwrap(); + tx.send(srv.handle().clone()).unwrap(); - handle.await + srv.await }) }); diff --git a/actix-http/tests/test_openssl.rs b/actix-http/tests/test_openssl.rs index a58d0cc70..0eaaabcc7 100644 --- a/actix-http/tests/test_openssl.rs +++ b/actix-http/tests/test_openssl.rs @@ -8,7 +8,7 @@ use actix_http::{ body::{AnyBody, Body, SizedStream}, error::PayloadError, http::{ - header::{self, HeaderName, HeaderValue}, + header::{self, HeaderValue}, Method, StatusCode, Version, }, Error, HttpMessage, HttpService, Request, Response, @@ -143,38 +143,25 @@ async fn test_h2_content_length() { }) .await; - let header = HeaderName::from_static("content-length"); - let value = HeaderValue::from_static("0"); + static VALUE: HeaderValue = HeaderValue::from_static("0"); { - for &i in &[0] { - let req = srv - .request(Method::HEAD, srv.surl(&format!("/{}", i))) - .send(); - let _response = req.await.expect_err("should timeout on recv 1xx frame"); - // assert_eq!(response.headers().get(&header), None); + let req = srv.request(Method::HEAD, srv.surl("/0")).send(); + req.await.expect_err("should timeout on recv 1xx frame"); - let req = srv - .request(Method::GET, srv.surl(&format!("/{}", i))) - .send(); - let _response = req.await.expect_err("should timeout on recv 1xx frame"); - // assert_eq!(response.headers().get(&header), None); - } + let req = srv.request(Method::GET, srv.surl("/0")).send(); + req.await.expect_err("should timeout on recv 1xx frame"); - for &i in &[1] { - let req = srv - .request(Method::GET, srv.surl(&format!("/{}", i))) - .send(); - let response = req.await.unwrap(); - assert_eq!(response.headers().get(&header), None); - } + let req = srv.request(Method::GET, srv.surl("/1")).send(); + let response = req.await.unwrap(); + assert!(response.headers().get("content-length").is_none()); for &i in &[2, 3] { let req = srv .request(Method::GET, srv.surl(&format!("/{}", i))) .send(); let response = req.await.unwrap(); - assert_eq!(response.headers().get(&header), Some(&value)); + assert_eq!(response.headers().get("content-length"), Some(&VALUE)); } } } diff --git a/actix-http/tests/test_rustls.rs b/actix-http/tests/test_rustls.rs index 924ef49ad..a9f6e99f8 100644 --- a/actix-http/tests/test_rustls.rs +++ b/actix-http/tests/test_rustls.rs @@ -26,10 +26,7 @@ use bytes::{Bytes, BytesMut}; use derive_more::{Display, Error}; use futures_core::Stream; use futures_util::stream::{once, StreamExt as _}; -use rustls::{ - Certificate, OwnedTrustAnchor, PrivateKey, RootCertStore, - ServerConfig as RustlsServerConfig, ServerName, -}; +use rustls::{Certificate, PrivateKey, ServerConfig as RustlsServerConfig, ServerName}; use rustls_pemfile::{certs, pkcs8_private_keys}; async fn load_body(mut stream: S) -> Result diff --git a/actix-router/Cargo.toml b/actix-router/Cargo.toml index e32f0edd6..b95bca505 100644 --- a/actix-router/Cargo.toml +++ b/actix-router/Cargo.toml @@ -30,7 +30,7 @@ serde = "1" [dev-dependencies] criterion = { version = "0.3", features = ["html_reports"] } firestorm = { version = "0.4", features = ["enable_system_time"] } -http = "0.2.3" +http = "0.2.5" serde = { version = "1", features = ["derive"] } [[bench]] diff --git a/actix-test/Cargo.toml b/actix-test/Cargo.toml index 002e7662e..e375ec10c 100644 --- a/actix-test/Cargo.toml +++ b/actix-test/Cargo.toml @@ -45,3 +45,4 @@ serde_json = "1" serde_urlencoded = "0.7" tls-openssl = { package = "openssl", version = "0.10.9", optional = true } tls-rustls = { package = "rustls", version = "0.20.0", optional = true } +tokio = { version = "1.2", features = ["sync"] } diff --git a/actix-test/src/lib.rs b/actix-test/src/lib.rs index 23a7eeba1..e3b2a39e8 100644 --- a/actix-test/src/lib.rs +++ b/actix-test/src/lib.rs @@ -31,7 +31,7 @@ extern crate tls_openssl as openssl; #[cfg(feature = "rustls")] extern crate tls_rustls as rustls; -use std::{error::Error as StdError, fmt, net, sync::mpsc, thread, time}; +use std::{error::Error as StdError, fmt, net, thread, time::Duration}; use actix_codec::{AsyncRead, AsyncWrite, Framed}; pub use actix_http::test::TestBuffer; @@ -41,8 +41,9 @@ use actix_http::{ }; use actix_service::{map_config, IntoServiceFactory, ServiceFactory, ServiceFactoryExt as _}; use actix_web::{ - dev::{AppConfig, MessageBody, Server, Service}, - rt, web, Error, + dev::{AppConfig, MessageBody, Server, ServerHandle, Service}, + rt::{self, System}, + web, Error, }; use awc::{error::PayloadError, Client, ClientRequest, ClientResponse, Connector}; use futures_core::Stream; @@ -52,6 +53,7 @@ pub use actix_web::test::{ call_service, default_service, init_service, load_stream, ok_service, read_body, read_body_json, read_response, read_response_json, TestRequest, }; +use tokio::sync::mpsc; /// Start default [`TestServer`]. /// @@ -128,7 +130,11 @@ where B: MessageBody + 'static, B::Error: Into>, { - let (tx, rx) = mpsc::channel(); + // for sending handles and server info back from the spawned thread + let (started_tx, started_rx) = std::sync::mpsc::channel(); + + // for signaling the shutdown of spawned server and system + let (thread_stop_tx, thread_stop_rx) = mpsc::channel(1); let tls = match cfg.stream { StreamType::Tcp => false, @@ -138,7 +144,7 @@ where StreamType::Rustls(_) => true, }; - // run server in separate thread + // run server in separate orphaned thread thread::spawn(move || { let sys = rt::System::new(); let tcp = net::TcpListener::bind("127.0.0.1:0").unwrap(); @@ -146,7 +152,7 @@ where let factory = factory.clone(); let srv_cfg = cfg.clone(); let timeout = cfg.client_timeout; - let builder = Server::build().workers(1).disable_signals(); + let builder = Server::build().workers(1).disable_signals().system_exit(); let srv = match srv_cfg.stream { StreamType::Tcp => match srv_cfg.tp { @@ -275,17 +281,25 @@ where }), }, } - .unwrap(); + .expect("test server could not be created"); - sys.block_on(async { - let srv = srv.run(); - tx.send((rt::System::current(), srv, local_addr)).unwrap(); - }); + let srv = srv.run(); + started_tx + .send((System::current(), srv.handle(), local_addr)) + .unwrap(); - sys.run() + // + sys.block_on(srv).unwrap(); + + // start system event loop + sys.run().unwrap(); + + // notify TestServer that server and system have shut down + // all thread managed resources should be dropped at this point + let _ = thread_stop_tx.send(()); }); - let (system, server, addr) = rx.recv().unwrap(); + let (system, server, addr) = started_rx.recv().unwrap(); let client = { let connector = { @@ -299,15 +313,15 @@ where .set_alpn_protos(b"\x02h2\x08http/1.1") .map_err(|e| log::error!("Can not set alpn protocol: {:?}", e)); Connector::new() - .conn_lifetime(time::Duration::from_secs(0)) - .timeout(time::Duration::from_millis(30000)) + .conn_lifetime(Duration::from_secs(0)) + .timeout(Duration::from_millis(30000)) .ssl(builder.build()) } #[cfg(not(feature = "openssl"))] { Connector::new() - .conn_lifetime(time::Duration::from_secs(0)) - .timeout(time::Duration::from_millis(30000)) + .conn_lifetime(Duration::from_secs(0)) + .timeout(Duration::from_millis(30000)) } }; @@ -315,11 +329,12 @@ where }; TestServer { - addr, + server, + thread_stop_rx, client, system, + addr, tls, - server, } } @@ -405,11 +420,12 @@ impl TestServerConfig { /// /// See [`start`] for usage example. pub struct TestServer { - addr: net::SocketAddr, + server: ServerHandle, + thread_stop_rx: mpsc::Receiver<()>, client: awc::Client, system: rt::System, + addr: net::SocketAddr, tls: bool, - server: Server, } impl TestServer { @@ -505,15 +521,30 @@ impl TestServer { } /// Gracefully stop HTTP server. - pub async fn stop(self) { + /// + /// Waits for spawned `Server` and `System` to shutdown gracefully. + pub async fn stop(mut self) { + // signal server to stop self.server.stop(true).await; + + // also signal system to stop + // though this is handled by `ServerBuilder::exit_system` too self.system.stop(); - rt::time::sleep(time::Duration::from_millis(100)).await; + + // wait for thread to be stopped but don't care about result + let _ = self.thread_stop_rx.recv().await; } } impl Drop for TestServer { fn drop(&mut self) { - self.system.stop() + // calls in this Drop impl should be enough to shut down the server, system, and thread + // without needing to await anything + + // signal server to stop + let _ = self.server.stop(true); + + // signal system to stop + self.system.stop(); } } diff --git a/awc/Cargo.toml b/awc/Cargo.toml index 6eeb9ce51..b9e47f509 100644 --- a/awc/Cargo.toml +++ b/awc/Cargo.toml @@ -68,7 +68,7 @@ derive_more = "0.99.5" futures-core = { version = "0.3.7", default-features = false } futures-util = { version = "0.3.7", default-features = false } h2 = "0.3" -http = "0.2" +http = "0.2.5" itoa = "0.4" log =" 0.4" mime = "0.3" @@ -92,7 +92,7 @@ actix-web = { version = "4.0.0-beta.10", features = ["openssl"] } actix-http = { version = "3.0.0-beta.11", features = ["openssl"] } actix-http-test = { version = "3.0.0-beta.5", features = ["openssl"] } actix-utils = "3.0.0" -actix-server = "2.0.0-beta.3" +actix-server = "2.0.0-beta.8" actix-tls = { version = "3.0.0-beta.7", features = ["openssl", "rustls"] } actix-test = { version = "0.1.0-beta.5", features = ["openssl", "rustls"] } diff --git a/awc/src/client/connection.rs b/awc/src/client/connection.rs index 97b96fc0a..6bbc9ad07 100644 --- a/awc/src/client/connection.rs +++ b/awc/src/client/connection.rs @@ -173,6 +173,7 @@ impl H2ConnectionInner { /// Cancel spawned connection task on drop. impl Drop for H2ConnectionInner { fn drop(&mut self) { + // TODO: this can end up sending extraneous requests; see if there is a better way to handle if self .sender .send_request(http::Request::new(()), true) @@ -183,8 +184,8 @@ impl Drop for H2ConnectionInner { } } +/// Unified connection type cover HTTP/1 Plain/TLS and HTTP/2 protocols. #[allow(dead_code)] -/// Unified connection type cover Http1 Plain/Tls and Http2 protocols pub enum Connection> where A: ConnectionIo, diff --git a/awc/tests/test_rustls_client.rs b/awc/tests/test_rustls_client.rs index c075a6090..355fcb6fb 100644 --- a/awc/tests/test_rustls_client.rs +++ b/awc/tests/test_rustls_client.rs @@ -19,8 +19,7 @@ use actix_utils::future::ok; use actix_web::{dev::AppConfig, http::Version, web, App, HttpResponse}; use rustls::{ client::{ServerCertVerified, ServerCertVerifier}, - Certificate, ClientConfig, OwnedTrustAnchor, PrivateKey, RootCertStore, ServerConfig, - ServerName, + Certificate, ClientConfig, PrivateKey, ServerConfig, ServerName, }; use rustls_pemfile::{certs, pkcs8_private_keys}; diff --git a/examples/on_connect.rs b/examples/on_connect.rs index 24ac86c6b..9709835e6 100644 --- a/examples/on_connect.rs +++ b/examples/on_connect.rs @@ -8,6 +8,7 @@ use std::{any::Any, io, net::SocketAddr}; use actix_web::{dev::Extensions, rt::net::TcpStream, web, App, HttpServer}; +#[allow(dead_code)] #[derive(Debug, Clone)] struct ConnectionInfo { bind: SocketAddr, diff --git a/src/dev.rs b/src/dev.rs index be3af86a8..4fac207a7 100644 --- a/src/dev.rs +++ b/src/dev.rs @@ -20,7 +20,7 @@ pub use actix_http::body::{AnyBody, Body, BodySize, MessageBody, ResponseBody, S pub use actix_http::encoding::Decoder as Decompress; pub use actix_http::{Extensions, Payload, PayloadStream, RequestHead, Response, ResponseHead}; pub use actix_router::{Path, ResourceDef, ResourcePath, Url}; -pub use actix_server::Server; +pub use actix_server::{Server, ServerHandle}; pub use actix_service::{ always_ready, fn_factory, fn_service, forward_ready, Service, ServiceFactory, Transform, }; diff --git a/src/server.rs b/src/server.rs index f15183f85..0f3d7c59a 100644 --- a/src/server.rs +++ b/src/server.rs @@ -159,7 +159,7 @@ where /// /// By default max connections is set to a 25k. pub fn max_connections(mut self, num: usize) -> Self { - self.builder = self.builder.maxconn(num); + self.builder = self.builder.max_concurrent_connections(num); self } @@ -233,7 +233,7 @@ where self } - /// Stop actix system. + /// Stop Actix `System` after server shutdown. pub fn system_exit(mut self) -> Self { self.builder = self.builder.system_exit(); self diff --git a/tests/test_httpserver.rs b/tests/test_httpserver.rs index 881c6ce94..887b51d41 100644 --- a/tests/test_httpserver.rs +++ b/tests/test_httpserver.rs @@ -14,57 +14,45 @@ async fn test_start() { let (tx, rx) = mpsc::channel(); thread::spawn(move || { - let sys = actix_rt::System::new(); + actix_rt::System::new() + .block_on(async { + let srv = HttpServer::new(|| { + App::new().service( + web::resource("/").route(web::to(|| HttpResponse::Ok().body("test"))), + ) + }) + .workers(1) + .backlog(1) + .max_connections(10) + .max_connection_rate(10) + .keep_alive(10) + .client_timeout(5000) + .client_shutdown(0) + .server_hostname("localhost") + .system_exit() + .disable_signals() + .bind(format!("{}", addr)) + .unwrap() + .run(); - sys.block_on(async { - let srv = HttpServer::new(|| { - App::new().service( - web::resource("/").route(web::to(|| HttpResponse::Ok().body("test"))), - ) + tx.send(srv.handle()).unwrap(); + + srv.await }) - .workers(1) - .backlog(1) - .max_connections(10) - .max_connection_rate(10) - .keep_alive(10) - .client_timeout(5000) - .client_shutdown(0) - .server_hostname("localhost") - .system_exit() - .disable_signals() - .bind(format!("{}", addr)) - .unwrap() - .run(); - - let _ = tx.send((srv, actix_rt::System::current())); - }); - - let _ = sys.run(); + .unwrap(); }); - let (srv, sys) = rx.recv().unwrap(); - #[cfg(feature = "client")] - { - use actix_http::client; + let srv = rx.recv().unwrap(); - let client = awc::Client::builder() - .connector( - client::Connector::new() - .timeout(Duration::from_millis(100)) - .finish(), - ) - .finish(); + let client = awc::Client::builder() + .connector(awc::Connector::new().timeout(Duration::from_millis(100))) + .finish(); - let host = format!("http://{}", addr); - let response = client.get(host.clone()).send().await.unwrap(); - assert!(response.status().is_success()); - } + let host = format!("http://{}", addr); + let response = client.get(host.clone()).send().await.unwrap(); + assert!(response.status().is_success()); - // stop - let _ = srv.stop(false); - - thread::sleep(Duration::from_millis(100)); - let _ = sys.stop(); + srv.stop(false).await; } #[cfg(feature = "openssl")] @@ -92,37 +80,38 @@ fn ssl_acceptor() -> openssl::ssl::SslAcceptorBuilder { #[cfg(feature = "openssl")] async fn test_start_ssl() { use actix_web::HttpRequest; + use openssl::ssl::{SslConnector, SslMethod, SslVerifyMode}; let addr = actix_test::unused_addr(); let (tx, rx) = mpsc::channel(); thread::spawn(move || { - let sys = actix_rt::System::new(); - let builder = ssl_acceptor(); + actix_rt::System::new() + .block_on(async { + let builder = ssl_acceptor(); - let srv = HttpServer::new(|| { - App::new().service(web::resource("/").route(web::to(|req: HttpRequest| { - assert!(req.app_config().secure()); - HttpResponse::Ok().body("test") - }))) - }) - .workers(1) - .shutdown_timeout(1) - .system_exit() - .disable_signals() - .bind_openssl(format!("{}", addr), builder) - .unwrap(); + let srv = HttpServer::new(|| { + App::new().service(web::resource("/").route(web::to(|req: HttpRequest| { + assert!(req.app_config().secure()); + HttpResponse::Ok().body("test") + }))) + }) + .workers(1) + .shutdown_timeout(1) + .system_exit() + .disable_signals() + .bind_openssl(format!("{}", addr), builder) + .unwrap(); - sys.block_on(async { - let srv = srv.run(); - let _ = tx.send((srv, actix_rt::System::current())); - }); + let srv = srv.run(); + tx.send(srv.handle()).unwrap(); - let _ = sys.run(); + srv.await + }) + .unwrap() }); - let (srv, sys) = rx.recv().unwrap(); + let srv = rx.recv().unwrap(); - use openssl::ssl::{SslConnector, SslMethod, SslVerifyMode}; let mut builder = SslConnector::builder(SslMethod::tls()).unwrap(); builder.set_verify(SslVerifyMode::NONE); let _ = builder @@ -141,9 +130,5 @@ async fn test_start_ssl() { let response = client.get(host.clone()).send().await.unwrap(); assert!(response.status().is_success()); - // stop - let _ = srv.stop(false); - - thread::sleep(Duration::from_millis(100)); - let _ = sys.stop(); + srv.stop(false).await; } diff --git a/tests/test_server.rs b/tests/test_server.rs index ff6f5ae5e..b2a59e84c 100644 --- a/tests/test_server.rs +++ b/tests/test_server.rs @@ -1030,6 +1030,8 @@ async fn test_normalize() { let response = srv.get("/one/").send().await.unwrap(); assert!(response.status().is_success()); + + srv.stop().await } // allow deprecated App::data