From 8e98d9168c8db1201e4fe8d4f04c15dac4febeda Mon Sep 17 00:00:00 2001 From: fakeshadow <24548779@qq.com> Date: Thu, 15 Apr 2021 10:49:43 -0700 Subject: [PATCH 1/4] add test for restart worker thread (#328) --- actix-server/src/worker.rs | 3 + actix-server/tests/test_server.rs | 170 +++++++++++++++++++++++++++--- 2 files changed, 157 insertions(+), 16 deletions(-) diff --git a/actix-server/src/worker.rs b/actix-server/src/worker.rs index 801467f8..65951345 100644 --- a/actix-server/src/worker.rs +++ b/actix-server/src/worker.rs @@ -125,6 +125,8 @@ impl WorkerAvailability { /// /// Worker accepts Socket objects via unbounded channel and starts stream processing. pub(crate) struct ServerWorker { + // UnboundedReceiver should always be the first field. + // It must be dropped as soon as ServerWorker dropping. rx: UnboundedReceiver, rx2: UnboundedReceiver, services: Box<[WorkerService]>, @@ -370,6 +372,7 @@ impl Default for WorkerState { impl Drop for ServerWorker { fn drop(&mut self) { + // Stop the Arbiter ServerWorker runs on on drop. Arbiter::current().stop(); } } diff --git a/actix-server/tests/test_server.rs b/actix-server/tests/test_server.rs index 40b07e1c..3af072bb 100644 --- a/actix-server/tests/test_server.rs +++ b/actix-server/tests/test_server.rs @@ -1,7 +1,8 @@ use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{mpsc, Arc}; -use std::{net, thread, time}; +use std::{net, thread, time::Duration}; +use actix_rt::{net::TcpStream, time::sleep}; use actix_server::Server; use actix_service::fn_service; use actix_utils::future::ok; @@ -37,7 +38,7 @@ fn test_bind() { }); let (_, sys) = rx.recv().unwrap(); - thread::sleep(time::Duration::from_millis(500)); + thread::sleep(Duration::from_millis(500)); assert!(net::TcpStream::connect(addr).is_ok()); sys.stop(); let _ = h.join(); @@ -64,7 +65,7 @@ fn test_listen() { }); let sys = rx.recv().unwrap(); - thread::sleep(time::Duration::from_millis(500)); + thread::sleep(Duration::from_millis(500)); assert!(net::TcpStream::connect(addr).is_ok()); sys.stop(); let _ = h.join(); @@ -73,11 +74,11 @@ fn test_listen() { #[test] #[cfg(unix)] fn test_start() { + use std::io::Read; + use actix_codec::{BytesCodec, Framed}; - use actix_rt::net::TcpStream; use bytes::Bytes; use futures_util::sink::SinkExt; - use std::io::Read; let addr = unused_addr(); let (tx, rx) = mpsc::channel(); @@ -112,16 +113,16 @@ fn test_start() { // pause let _ = srv.pause(); - thread::sleep(time::Duration::from_millis(200)); + thread::sleep(Duration::from_millis(200)); let mut conn = net::TcpStream::connect(addr).unwrap(); - conn.set_read_timeout(Some(time::Duration::from_millis(100))) + conn.set_read_timeout(Some(Duration::from_millis(100))) .unwrap(); let res = conn.read_exact(&mut buf); assert!(res.is_err()); // resume let _ = srv.resume(); - thread::sleep(time::Duration::from_millis(100)); + thread::sleep(Duration::from_millis(100)); assert!(net::TcpStream::connect(addr).is_ok()); assert!(net::TcpStream::connect(addr).is_ok()); assert!(net::TcpStream::connect(addr).is_ok()); @@ -133,10 +134,10 @@ fn test_start() { // stop let _ = srv.stop(false); - thread::sleep(time::Duration::from_millis(100)); + thread::sleep(Duration::from_millis(100)); assert!(net::TcpStream::connect(addr).is_err()); - thread::sleep(time::Duration::from_millis(100)); + thread::sleep(Duration::from_millis(100)); sys.stop(); let _ = h.join(); } @@ -182,7 +183,7 @@ fn test_configure() { let _ = sys.run(); }); let (_, sys) = rx.recv().unwrap(); - thread::sleep(time::Duration::from_millis(500)); + thread::sleep(Duration::from_millis(500)); assert!(net::TcpStream::connect(addr1).is_ok()); assert!(net::TcpStream::connect(addr2).is_ok()); @@ -200,7 +201,6 @@ async fn test_max_concurrent_connections() { // The limit test on the other hand is only for concurrent tcp stream limiting a work // thread accept. - use actix_rt::net::TcpStream; use tokio::io::AsyncWriteExt; let addr = unused_addr(); @@ -226,7 +226,7 @@ async fn test_max_concurrent_connections() { let counter = counter.clone(); async move { counter.fetch_add(1, Ordering::SeqCst); - actix_rt::time::sleep(time::Duration::from_secs(20)).await; + sleep(Duration::from_secs(20)).await; counter.fetch_sub(1, Ordering::SeqCst); Ok::<(), ()>(()) } @@ -249,7 +249,7 @@ async fn test_max_concurrent_connections() { conns.push(conn); } - actix_rt::time::sleep(time::Duration::from_secs(5)).await; + sleep(Duration::from_secs(5)).await; // counter would remain at 3 even with 12 successful connection. // and 9 of them remain in backlog. @@ -268,9 +268,7 @@ async fn test_max_concurrent_connections() { #[actix_rt::test] async fn test_service_restart() { use std::task::{Context, Poll}; - use std::time::Duration; - use actix_rt::{net::TcpStream, time::sleep}; use actix_service::{fn_factory, Service}; use futures_core::future::LocalBoxFuture; use tokio::io::AsyncWriteExt; @@ -438,3 +436,143 @@ async fn test_service_restart() { let _ = server.stop(false); let _ = h.join().unwrap(); } + +#[actix_rt::test] +async fn worker_restart() { + use actix_service::{Service, ServiceFactory}; + use futures_core::future::LocalBoxFuture; + use tokio::io::{AsyncReadExt, AsyncWriteExt}; + + struct TestServiceFactory(Arc); + + impl ServiceFactory for TestServiceFactory { + type Response = (); + type Error = (); + type Config = (); + type Service = TestService; + type InitError = (); + type Future = LocalBoxFuture<'static, Result>; + + fn new_service(&self, _: Self::Config) -> Self::Future { + let counter = self.0.fetch_add(1, Ordering::Relaxed); + + Box::pin(async move { Ok(TestService(counter)) }) + } + } + + struct TestService(usize); + + impl Service for TestService { + type Response = (); + type Error = (); + type Future = LocalBoxFuture<'static, Result>; + + actix_service::always_ready!(); + + fn call(&self, stream: TcpStream) -> Self::Future { + let counter = self.0; + + let mut stream = stream.into_std().unwrap(); + use std::io::Write; + let str = counter.to_string(); + let buf = str.as_bytes(); + + let mut written = 0; + + while written < buf.len() { + if let Ok(n) = stream.write(&buf[written..]) { + written += n; + } + } + stream.flush().unwrap(); + stream.shutdown(net::Shutdown::Write).unwrap(); + + // force worker 2 to restart service once. + if counter == 2 { + panic!("panic on purpose") + } else { + Box::pin(async { Ok(()) }) + } + } + } + + let addr = unused_addr(); + let (tx, rx) = mpsc::channel(); + + let counter = Arc::new(AtomicUsize::new(1)); + let h = thread::spawn(move || { + let counter = counter.clone(); + actix_rt::System::new().block_on(async { + let server = Server::build() + .disable_signals() + .bind("addr", addr, move || TestServiceFactory(counter.clone())) + .unwrap() + .workers(2) + .run(); + + let _ = tx.send((server.clone(), actix_rt::System::current())); + server.await + }) + }); + + let (server, sys) = rx.recv().unwrap(); + + sleep(Duration::from_secs(3)).await; + + let mut buf = [0; 8]; + + // worker 1 would not restart and return it's id consistently. + let mut stream = TcpStream::connect(addr).await.unwrap(); + let n = stream.read(&mut buf).await.unwrap(); + let id = String::from_utf8_lossy(&buf[0..n]); + assert_eq!("1", id); + stream.shutdown().await.unwrap(); + + // worker 2 dead after return response. + let mut stream = TcpStream::connect(addr).await.unwrap(); + let n = stream.read(&mut buf).await.unwrap(); + let id = String::from_utf8_lossy(&buf[0..n]); + assert_eq!("2", id); + stream.shutdown().await.unwrap(); + + // request to worker 1 + let mut stream = TcpStream::connect(addr).await.unwrap(); + let n = stream.read(&mut buf).await.unwrap(); + let id = String::from_utf8_lossy(&buf[0..n]); + assert_eq!("1", id); + stream.shutdown().await.unwrap(); + + // TODO: Remove sleep if it can pass CI. + sleep(Duration::from_secs(3)).await; + + // worker 2 restarting and work goes to worker 1. + let mut stream = TcpStream::connect(addr).await.unwrap(); + let n = stream.read(&mut buf).await.unwrap(); + let id = String::from_utf8_lossy(&buf[0..n]); + assert_eq!("1", id); + stream.shutdown().await.unwrap(); + + // TODO: Remove sleep if it can pass CI. + sleep(Duration::from_secs(3)).await; + + // worker 2 restarted but worker 1 was still the next to accept connection. + let mut stream = TcpStream::connect(addr).await.unwrap(); + let n = stream.read(&mut buf).await.unwrap(); + let id = String::from_utf8_lossy(&buf[0..n]); + assert_eq!("1", id); + stream.shutdown().await.unwrap(); + + // TODO: Remove sleep if it can pass CI. + sleep(Duration::from_secs(3)).await; + + // worker 2 accept connection again but it's id is 3. + let mut stream = TcpStream::connect(addr).await.unwrap(); + let n = stream.read(&mut buf).await.unwrap(); + let id = String::from_utf8_lossy(&buf[0..n]); + assert_eq!("3", id); + stream.shutdown().await.unwrap(); + + sys.stop(); + let _ = server.stop(false); + let _ = h.join().unwrap(); +} From ef206f40fb95ccb1d1d5f34a8f0e2950c0b8ff4e Mon Sep 17 00:00:00 2001 From: Rob Ede Date: Thu, 15 Apr 2021 20:13:27 +0100 Subject: [PATCH 2/4] update ignored service docs to new traits --- actix-service/src/lib.rs | 18 +++++--- actix-service/src/transform.rs | 75 ++++++++++++++++++++++++++-------- 2 files changed, 72 insertions(+), 21 deletions(-) diff --git a/actix-service/src/lib.rs b/actix-service/src/lib.rs index cc82bfa6..3db2bcc5 100644 --- a/actix-service/src/lib.rs +++ b/actix-service/src/lib.rs @@ -53,8 +53,14 @@ use self::ready::{err, ok, ready, Ready}; /// async fn(Request) -> Result /// ``` /// -/// The `Service` trait just generalizes this form where each parameter is described as an -/// associated type on the trait. Services can also have mutable state that influence computation. +/// The `Service` trait just generalizes this form. Requests are defined as a generic type parameter +/// and responses and other details are defined as associated types on the trait impl. Notice that +/// this design means that services can receive many request types and converge them to a single +/// response type. +/// +/// Services can also have mutable state that influence computation by using a `Cell`, `RefCell` +/// or `Mutex`. Services intentionally do not take `&mut self` to reduce over-head in the +/// common cases. /// /// `Service` provides a symmetric and uniform API; the same abstractions can be used to represent /// both clients and servers. Services describe only _transformation_ operations which encourage @@ -64,8 +70,7 @@ use self::ready::{err, ok, ready, Ready}; /// ```ignore /// struct MyService; /// -/// impl Service for MyService { -/// type Request = u8; +/// impl Service for MyService { /// type Response = u64; /// type Error = MyError; /// type Future = Pin>>>; @@ -81,6 +86,9 @@ use self::ready::{err, ok, ready, Ready}; /// /// ```ignore /// async fn my_service(req: u8) -> Result; +/// +/// let svc = fn_service(my_service) +/// svc.call(123) /// ``` pub trait Service { /// Responses given by the service. @@ -144,7 +152,7 @@ pub trait ServiceFactory { /// Errors potentially raised while building a service. type InitError; - /// The future of the `Service` instance. + /// The future of the `Service` instance.g type Future: Future>; /// Create and return a new service asynchronously. diff --git a/actix-service/src/transform.rs b/actix-service/src/transform.rs index b0abe72b..8fdff66f 100644 --- a/actix-service/src/transform.rs +++ b/actix-service/src/transform.rs @@ -27,7 +27,7 @@ where /// Transform(middleware) wraps inner service and runs during inbound and/or outbound processing in /// the request/response lifecycle. It may modify request and/or response. /// -/// For example, timeout transform: +/// For example, a timeout service wrapper: /// /// ```ignore /// pub struct Timeout { @@ -35,11 +35,7 @@ where /// timeout: Duration, /// } /// -/// impl Service for Timeout -/// where -/// S: Service, -/// { -/// type Request = S::Request; +/// impl, Req> Service for Timeout { /// type Response = S::Response; /// type Error = TimeoutError; /// type Future = TimeoutServiceResponse; @@ -55,26 +51,22 @@ where /// } /// ``` /// -/// Timeout service in above example is decoupled from underlying service implementation and could -/// be applied to any service. +/// This wrapper service is decoupled from the underlying service implementation and could be +/// applied to any service. /// -/// The `Transform` trait defines the interface of a Service factory. `Transform` is often +/// The `Transform` trait defines the interface of a service wrapper. `Transform` is often /// implemented for middleware, defining how to construct a middleware Service. A Service that is /// constructed by the factory takes the Service that follows it during execution as a parameter, /// assuming ownership of the next Service. /// -/// Factory for `Timeout` middleware from the above example could look like this: +/// A transform for the `Timeout` middleware could look like this: /// /// ```ignore /// pub struct TimeoutTransform { /// timeout: Duration, /// } /// -/// impl Transform for TimeoutTransform -/// where -/// S: Service, -/// { -/// type Request = S::Request; +/// impl, Req> Transform for TimeoutTransform { /// type Response = S::Response; /// type Error = TimeoutError; /// type InitError = S::Error; @@ -82,7 +74,7 @@ where /// type Future = Ready>; /// /// fn new_transform(&self, service: S) -> Self::Future { -/// ready(Ok(TimeoutService { +/// ready(Ok(Timeout { /// service, /// timeout: self.timeout, /// })) @@ -227,3 +219,54 @@ where } } } + +#[cfg(test)] +mod tests { + use core::{ + future::{ready, Ready}, + time::Duration, + }; + + use super::*; + use crate::Service; + + // pseudo-doctest for Transform trait + pub struct TimeoutTransform { + timeout: Duration, + } + + // pseudo-doctest for Transform trait + impl, Req> Transform for TimeoutTransform { + type Response = S::Response; + type Error = S::Error; + type InitError = S::Error; + type Transform = Timeout; + type Future = Ready>; + + fn new_transform(&self, service: S) -> Self::Future { + ready(Ok(Timeout { + service, + _timeout: self.timeout, + })) + } + } + + // pseudo-doctest for Transform trait + pub struct Timeout { + service: S, + _timeout: Duration, + } + + // pseudo-doctest for Transform trait + impl, Req> Service for Timeout { + type Response = S::Response; + type Error = S::Error; + type Future = S::Future; + + crate::forward_ready!(service); + + fn call(&self, req: Req) -> Self::Future { + self.service.call(req) + } + } +} From 4e6d88d1438c05d0ed90cce214a6970a1ed3e2c5 Mon Sep 17 00:00:00 2001 From: Rob Ede Date: Thu, 15 Apr 2021 20:43:02 +0100 Subject: [PATCH 3/4] improve boxed service docs --- actix-service/Cargo.toml | 2 ++ actix-service/src/boxed.rs | 34 ++++++++++++++++-------------- actix-service/src/lib.rs | 38 +++++++++++++++++----------------- actix-service/src/transform.rs | 7 +++---- 4 files changed, 43 insertions(+), 38 deletions(-) diff --git a/actix-service/Cargo.toml b/actix-service/Cargo.toml index 84a0c172..1c82f703 100644 --- a/actix-service/Cargo.toml +++ b/actix-service/Cargo.toml @@ -22,8 +22,10 @@ path = "src/lib.rs" [dependencies] futures-core = { version = "0.3.7", default-features = false } +paste = "1" pin-project-lite = "0.2" [dev-dependencies] actix-rt = "2.0.0" +actix-utils = "3.0.0-beta.4" futures-util = { version = "0.3.7", default-features = false } diff --git a/actix-service/src/boxed.rs b/actix-service/src/boxed.rs index a872ca9f..3141c5e4 100644 --- a/actix-service/src/boxed.rs +++ b/actix-service/src/boxed.rs @@ -3,26 +3,30 @@ use alloc::{boxed::Box, rc::Rc}; use core::{future::Future, pin::Pin}; +use paste::paste; + use crate::{Service, ServiceFactory}; -/// A boxed future without a Send bound or lifetime parameters. +/// A boxed future with no send bound or lifetime parameters. pub type BoxFuture = Pin>>; macro_rules! service_object { ($name: ident, $type: tt, $fn_name: ident) => { - /// Type alias for service trait object. - pub type $name = $type< - dyn Service>>, - >; + paste! { + #[doc = "Type alias for service trait object using `" $type "`."] + pub type $name = $type< + dyn Service>>, + >; - /// Create service trait object. - pub fn $fn_name(service: S) -> $name - where - S: Service + 'static, - Req: 'static, - S::Future: 'static, - { - $type::new(ServiceWrapper::new(service)) + #[doc = "Wraps service as a trait object using [`" $name "`]."] + pub fn $fn_name(service: S) -> $name + where + S: Service + 'static, + Req: 'static, + S::Future: 'static, + { + $type::new(ServiceWrapper::new(service)) + } } }; } @@ -56,10 +60,10 @@ where } } -/// Wrapper for a service factory trait object that will produce a boxed trait object service. +/// Wrapper for a service factory that will map it's services to boxed trait object services. pub struct BoxServiceFactory(Inner); -/// Create service factory trait object. +/// Wraps a service factory that returns service trait objects. pub fn factory( factory: SF, ) -> BoxServiceFactory diff --git a/actix-service/src/lib.rs b/actix-service/src/lib.rs index 3db2bcc5..e83323a3 100644 --- a/actix-service/src/lib.rs +++ b/actix-service/src/lib.rs @@ -73,7 +73,7 @@ use self::ready::{err, ok, ready, Ready}; /// impl Service for MyService { /// type Response = u64; /// type Error = MyError; -/// type Future = Pin>>>; +/// type Future = Pin>>>; /// /// fn poll_ready(&self, cx: &mut Context<'_>) -> Poll> { ... } /// @@ -82,7 +82,7 @@ use self::ready::{err, ok, ready, Ready}; /// ``` /// /// Sometimes it is not necessary to implement the Service trait. For example, the above service -/// could be rewritten as a simple function and passed to [fn_service](fn_service()). +/// could be rewritten as a simple function and passed to [`fn_service`](fn_service()). /// /// ```ignore /// async fn my_service(req: u8) -> Result; @@ -102,13 +102,12 @@ pub trait Service { /// Returns `Ready` when the service is able to process requests. /// - /// If the service is at capacity, then `Pending` is returned and the task - /// is notified when the service becomes ready again. This function is - /// expected to be called while on a task. + /// If the service is at capacity, then `Pending` is returned and the task is notified when the + /// service becomes ready again. This function is expected to be called while on a task. /// - /// This is a **best effort** implementation. False positives are permitted. - /// It is permitted for the service to return `Ready` from a `poll_ready` - /// call and the next invocation of `call` results in an error. + /// This is a best effort implementation. False positives are permitted. It is permitted for + /// the service to return `Ready` from a `poll_ready` call and the next invocation of `call` + /// results in an error. /// /// # Notes /// 1. `poll_ready` might be called on a different task to `call`. @@ -117,25 +116,26 @@ pub trait Service { /// Process the request and return the response asynchronously. /// - /// This function is expected to be callable off task. As such, - /// implementations should take care to not call `poll_ready`. If the - /// service is at capacity and the request is unable to be handled, the - /// returned `Future` should resolve to an error. + /// This function is expected to be callable off-task. As such, implementations of `call` should + /// take care to not call `poll_ready`. If the service is at capacity and the request is unable + /// to be handled, the returned `Future` should resolve to an error. /// - /// Calling `call` without calling `poll_ready` is permitted. The - /// implementation must be resilient to this fact. + /// Invoking `call` without first invoking `poll_ready` is permitted. Implementations must be + /// resilient to this fact. fn call(&self, req: Req) -> Self::Future; } /// Factory for creating `Service`s. /// -/// Acts as a service factory. This is useful for cases where new `Service`s -/// must be produced. One case is a TCP server listener. The listener -/// accepts new TCP streams, obtains a new `Service` using the -/// `ServiceFactory` trait, and uses the new `Service` to process inbound -/// requests on that new TCP stream. +/// This is useful for cases where new `Service`s must be produced. One case is a TCP +/// server listener: a listener accepts new connections, constructs a new `Service` for each using +/// the `ServiceFactory` trait, and uses the new `Service` to process inbound requests on that new +/// connection. /// /// `Config` is a service factory configuration type. +/// +/// Simple factories may be able to use [`fn_factory`] or [`fn_factory_with_config`] to +/// reduce boilerplate. pub trait ServiceFactory { /// Responses given by the created services. type Response; diff --git a/actix-service/src/transform.rs b/actix-service/src/transform.rs index 8fdff66f..b561a1e5 100644 --- a/actix-service/src/transform.rs +++ b/actix-service/src/transform.rs @@ -222,10 +222,9 @@ where #[cfg(test)] mod tests { - use core::{ - future::{ready, Ready}, - time::Duration, - }; + use core::time::Duration; + + use actix_utils::future::{ready, Ready}; use super::*; use crate::Service; From 7a82288066ea434349d9850eef63e4fae17a56d0 Mon Sep 17 00:00:00 2001 From: Rob Ede Date: Thu, 15 Apr 2021 21:58:18 +0100 Subject: [PATCH 4/4] docs tweak --- actix-service/src/lib.rs | 3 ++- actix-service/src/macros.rs | 8 ++++---- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/actix-service/src/lib.rs b/actix-service/src/lib.rs index e83323a3..3ae22679 100644 --- a/actix-service/src/lib.rs +++ b/actix-service/src/lib.rs @@ -2,6 +2,7 @@ #![no_std] #![deny(rust_2018_idioms, nonstandard_style)] +#![warn(missing_docs)] #![allow(clippy::type_complexity)] #![doc(html_logo_url = "https://actix.rs/img/logo.png")] #![doc(html_favicon_url = "https://actix.rs/favicon.ico")] @@ -59,7 +60,7 @@ use self::ready::{err, ok, ready, Ready}; /// response type. /// /// Services can also have mutable state that influence computation by using a `Cell`, `RefCell` -/// or `Mutex`. Services intentionally do not take `&mut self` to reduce over-head in the +/// or `Mutex`. Services intentionally do not take `&mut self` to reduce overhead in the /// common cases. /// /// `Service` provides a symmetric and uniform API; the same abstractions can be used to represent diff --git a/actix-service/src/macros.rs b/actix-service/src/macros.rs index d2ae9dbf..e23b2960 100644 --- a/actix-service/src/macros.rs +++ b/actix-service/src/macros.rs @@ -1,6 +1,6 @@ -/// A boilerplate implementation of [`Service::poll_ready`] that always signals readiness. +/// An implementation of [`poll_ready`]() that always signals readiness. /// -/// [`Service::poll_ready`]: crate::Service::poll_ready +/// [`poll_ready`]: crate::Service::poll_ready /// /// # Examples /// ```no_run @@ -34,12 +34,12 @@ macro_rules! always_ready { }; } -/// A boilerplate implementation of [`Service::poll_ready`] that forwards readiness checks to a +/// An implementation of [`poll_ready`] that forwards readiness checks to a /// named struct field. /// /// Tuple structs are not supported. /// -/// [`Service::poll_ready`]: crate::Service::poll_ready +/// [`poll_ready`]: crate::Service::poll_ready /// /// # Examples /// ```no_run