From 8e98d9168c8db1201e4fe8d4f04c15dac4febeda Mon Sep 17 00:00:00 2001 From: fakeshadow <24548779@qq.com> Date: Thu, 15 Apr 2021 10:49:43 -0700 Subject: [PATCH 1/6] add test for restart worker thread (#328) --- actix-server/src/worker.rs | 3 + actix-server/tests/test_server.rs | 170 +++++++++++++++++++++++++++--- 2 files changed, 157 insertions(+), 16 deletions(-) diff --git a/actix-server/src/worker.rs b/actix-server/src/worker.rs index 801467f8..65951345 100644 --- a/actix-server/src/worker.rs +++ b/actix-server/src/worker.rs @@ -125,6 +125,8 @@ impl WorkerAvailability { /// /// Worker accepts Socket objects via unbounded channel and starts stream processing. pub(crate) struct ServerWorker { + // UnboundedReceiver should always be the first field. + // It must be dropped as soon as ServerWorker dropping. rx: UnboundedReceiver, rx2: UnboundedReceiver, services: Box<[WorkerService]>, @@ -370,6 +372,7 @@ impl Default for WorkerState { impl Drop for ServerWorker { fn drop(&mut self) { + // Stop the Arbiter ServerWorker runs on on drop. Arbiter::current().stop(); } } diff --git a/actix-server/tests/test_server.rs b/actix-server/tests/test_server.rs index 40b07e1c..3af072bb 100644 --- a/actix-server/tests/test_server.rs +++ b/actix-server/tests/test_server.rs @@ -1,7 +1,8 @@ use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{mpsc, Arc}; -use std::{net, thread, time}; +use std::{net, thread, time::Duration}; +use actix_rt::{net::TcpStream, time::sleep}; use actix_server::Server; use actix_service::fn_service; use actix_utils::future::ok; @@ -37,7 +38,7 @@ fn test_bind() { }); let (_, sys) = rx.recv().unwrap(); - thread::sleep(time::Duration::from_millis(500)); + thread::sleep(Duration::from_millis(500)); assert!(net::TcpStream::connect(addr).is_ok()); sys.stop(); let _ = h.join(); @@ -64,7 +65,7 @@ fn test_listen() { }); let sys = rx.recv().unwrap(); - thread::sleep(time::Duration::from_millis(500)); + thread::sleep(Duration::from_millis(500)); assert!(net::TcpStream::connect(addr).is_ok()); sys.stop(); let _ = h.join(); @@ -73,11 +74,11 @@ fn test_listen() { #[test] #[cfg(unix)] fn test_start() { + use std::io::Read; + use actix_codec::{BytesCodec, Framed}; - use actix_rt::net::TcpStream; use bytes::Bytes; use futures_util::sink::SinkExt; - use std::io::Read; let addr = unused_addr(); let (tx, rx) = mpsc::channel(); @@ -112,16 +113,16 @@ fn test_start() { // pause let _ = srv.pause(); - thread::sleep(time::Duration::from_millis(200)); + thread::sleep(Duration::from_millis(200)); let mut conn = net::TcpStream::connect(addr).unwrap(); - conn.set_read_timeout(Some(time::Duration::from_millis(100))) + conn.set_read_timeout(Some(Duration::from_millis(100))) .unwrap(); let res = conn.read_exact(&mut buf); assert!(res.is_err()); // resume let _ = srv.resume(); - thread::sleep(time::Duration::from_millis(100)); + thread::sleep(Duration::from_millis(100)); assert!(net::TcpStream::connect(addr).is_ok()); assert!(net::TcpStream::connect(addr).is_ok()); assert!(net::TcpStream::connect(addr).is_ok()); @@ -133,10 +134,10 @@ fn test_start() { // stop let _ = srv.stop(false); - thread::sleep(time::Duration::from_millis(100)); + thread::sleep(Duration::from_millis(100)); assert!(net::TcpStream::connect(addr).is_err()); - thread::sleep(time::Duration::from_millis(100)); + thread::sleep(Duration::from_millis(100)); sys.stop(); let _ = h.join(); } @@ -182,7 +183,7 @@ fn test_configure() { let _ = sys.run(); }); let (_, sys) = rx.recv().unwrap(); - thread::sleep(time::Duration::from_millis(500)); + thread::sleep(Duration::from_millis(500)); assert!(net::TcpStream::connect(addr1).is_ok()); assert!(net::TcpStream::connect(addr2).is_ok()); @@ -200,7 +201,6 @@ async fn test_max_concurrent_connections() { // The limit test on the other hand is only for concurrent tcp stream limiting a work // thread accept. - use actix_rt::net::TcpStream; use tokio::io::AsyncWriteExt; let addr = unused_addr(); @@ -226,7 +226,7 @@ async fn test_max_concurrent_connections() { let counter = counter.clone(); async move { counter.fetch_add(1, Ordering::SeqCst); - actix_rt::time::sleep(time::Duration::from_secs(20)).await; + sleep(Duration::from_secs(20)).await; counter.fetch_sub(1, Ordering::SeqCst); Ok::<(), ()>(()) } @@ -249,7 +249,7 @@ async fn test_max_concurrent_connections() { conns.push(conn); } - actix_rt::time::sleep(time::Duration::from_secs(5)).await; + sleep(Duration::from_secs(5)).await; // counter would remain at 3 even with 12 successful connection. // and 9 of them remain in backlog. @@ -268,9 +268,7 @@ async fn test_max_concurrent_connections() { #[actix_rt::test] async fn test_service_restart() { use std::task::{Context, Poll}; - use std::time::Duration; - use actix_rt::{net::TcpStream, time::sleep}; use actix_service::{fn_factory, Service}; use futures_core::future::LocalBoxFuture; use tokio::io::AsyncWriteExt; @@ -438,3 +436,143 @@ async fn test_service_restart() { let _ = server.stop(false); let _ = h.join().unwrap(); } + +#[actix_rt::test] +async fn worker_restart() { + use actix_service::{Service, ServiceFactory}; + use futures_core::future::LocalBoxFuture; + use tokio::io::{AsyncReadExt, AsyncWriteExt}; + + struct TestServiceFactory(Arc); + + impl ServiceFactory for TestServiceFactory { + type Response = (); + type Error = (); + type Config = (); + type Service = TestService; + type InitError = (); + type Future = LocalBoxFuture<'static, Result>; + + fn new_service(&self, _: Self::Config) -> Self::Future { + let counter = self.0.fetch_add(1, Ordering::Relaxed); + + Box::pin(async move { Ok(TestService(counter)) }) + } + } + + struct TestService(usize); + + impl Service for TestService { + type Response = (); + type Error = (); + type Future = LocalBoxFuture<'static, Result>; + + actix_service::always_ready!(); + + fn call(&self, stream: TcpStream) -> Self::Future { + let counter = self.0; + + let mut stream = stream.into_std().unwrap(); + use std::io::Write; + let str = counter.to_string(); + let buf = str.as_bytes(); + + let mut written = 0; + + while written < buf.len() { + if let Ok(n) = stream.write(&buf[written..]) { + written += n; + } + } + stream.flush().unwrap(); + stream.shutdown(net::Shutdown::Write).unwrap(); + + // force worker 2 to restart service once. + if counter == 2 { + panic!("panic on purpose") + } else { + Box::pin(async { Ok(()) }) + } + } + } + + let addr = unused_addr(); + let (tx, rx) = mpsc::channel(); + + let counter = Arc::new(AtomicUsize::new(1)); + let h = thread::spawn(move || { + let counter = counter.clone(); + actix_rt::System::new().block_on(async { + let server = Server::build() + .disable_signals() + .bind("addr", addr, move || TestServiceFactory(counter.clone())) + .unwrap() + .workers(2) + .run(); + + let _ = tx.send((server.clone(), actix_rt::System::current())); + server.await + }) + }); + + let (server, sys) = rx.recv().unwrap(); + + sleep(Duration::from_secs(3)).await; + + let mut buf = [0; 8]; + + // worker 1 would not restart and return it's id consistently. + let mut stream = TcpStream::connect(addr).await.unwrap(); + let n = stream.read(&mut buf).await.unwrap(); + let id = String::from_utf8_lossy(&buf[0..n]); + assert_eq!("1", id); + stream.shutdown().await.unwrap(); + + // worker 2 dead after return response. + let mut stream = TcpStream::connect(addr).await.unwrap(); + let n = stream.read(&mut buf).await.unwrap(); + let id = String::from_utf8_lossy(&buf[0..n]); + assert_eq!("2", id); + stream.shutdown().await.unwrap(); + + // request to worker 1 + let mut stream = TcpStream::connect(addr).await.unwrap(); + let n = stream.read(&mut buf).await.unwrap(); + let id = String::from_utf8_lossy(&buf[0..n]); + assert_eq!("1", id); + stream.shutdown().await.unwrap(); + + // TODO: Remove sleep if it can pass CI. + sleep(Duration::from_secs(3)).await; + + // worker 2 restarting and work goes to worker 1. + let mut stream = TcpStream::connect(addr).await.unwrap(); + let n = stream.read(&mut buf).await.unwrap(); + let id = String::from_utf8_lossy(&buf[0..n]); + assert_eq!("1", id); + stream.shutdown().await.unwrap(); + + // TODO: Remove sleep if it can pass CI. + sleep(Duration::from_secs(3)).await; + + // worker 2 restarted but worker 1 was still the next to accept connection. + let mut stream = TcpStream::connect(addr).await.unwrap(); + let n = stream.read(&mut buf).await.unwrap(); + let id = String::from_utf8_lossy(&buf[0..n]); + assert_eq!("1", id); + stream.shutdown().await.unwrap(); + + // TODO: Remove sleep if it can pass CI. + sleep(Duration::from_secs(3)).await; + + // worker 2 accept connection again but it's id is 3. + let mut stream = TcpStream::connect(addr).await.unwrap(); + let n = stream.read(&mut buf).await.unwrap(); + let id = String::from_utf8_lossy(&buf[0..n]); + assert_eq!("3", id); + stream.shutdown().await.unwrap(); + + sys.stop(); + let _ = server.stop(false); + let _ = h.join().unwrap(); +} From ef206f40fb95ccb1d1d5f34a8f0e2950c0b8ff4e Mon Sep 17 00:00:00 2001 From: Rob Ede Date: Thu, 15 Apr 2021 20:13:27 +0100 Subject: [PATCH 2/6] update ignored service docs to new traits --- actix-service/src/lib.rs | 18 +++++--- actix-service/src/transform.rs | 75 ++++++++++++++++++++++++++-------- 2 files changed, 72 insertions(+), 21 deletions(-) diff --git a/actix-service/src/lib.rs b/actix-service/src/lib.rs index cc82bfa6..3db2bcc5 100644 --- a/actix-service/src/lib.rs +++ b/actix-service/src/lib.rs @@ -53,8 +53,14 @@ use self::ready::{err, ok, ready, Ready}; /// async fn(Request) -> Result /// ``` /// -/// The `Service` trait just generalizes this form where each parameter is described as an -/// associated type on the trait. Services can also have mutable state that influence computation. +/// The `Service` trait just generalizes this form. Requests are defined as a generic type parameter +/// and responses and other details are defined as associated types on the trait impl. Notice that +/// this design means that services can receive many request types and converge them to a single +/// response type. +/// +/// Services can also have mutable state that influence computation by using a `Cell`, `RefCell` +/// or `Mutex`. Services intentionally do not take `&mut self` to reduce over-head in the +/// common cases. /// /// `Service` provides a symmetric and uniform API; the same abstractions can be used to represent /// both clients and servers. Services describe only _transformation_ operations which encourage @@ -64,8 +70,7 @@ use self::ready::{err, ok, ready, Ready}; /// ```ignore /// struct MyService; /// -/// impl Service for MyService { -/// type Request = u8; +/// impl Service for MyService { /// type Response = u64; /// type Error = MyError; /// type Future = Pin>>>; @@ -81,6 +86,9 @@ use self::ready::{err, ok, ready, Ready}; /// /// ```ignore /// async fn my_service(req: u8) -> Result; +/// +/// let svc = fn_service(my_service) +/// svc.call(123) /// ``` pub trait Service { /// Responses given by the service. @@ -144,7 +152,7 @@ pub trait ServiceFactory { /// Errors potentially raised while building a service. type InitError; - /// The future of the `Service` instance. + /// The future of the `Service` instance.g type Future: Future>; /// Create and return a new service asynchronously. diff --git a/actix-service/src/transform.rs b/actix-service/src/transform.rs index b0abe72b..8fdff66f 100644 --- a/actix-service/src/transform.rs +++ b/actix-service/src/transform.rs @@ -27,7 +27,7 @@ where /// Transform(middleware) wraps inner service and runs during inbound and/or outbound processing in /// the request/response lifecycle. It may modify request and/or response. /// -/// For example, timeout transform: +/// For example, a timeout service wrapper: /// /// ```ignore /// pub struct Timeout { @@ -35,11 +35,7 @@ where /// timeout: Duration, /// } /// -/// impl Service for Timeout -/// where -/// S: Service, -/// { -/// type Request = S::Request; +/// impl, Req> Service for Timeout { /// type Response = S::Response; /// type Error = TimeoutError; /// type Future = TimeoutServiceResponse; @@ -55,26 +51,22 @@ where /// } /// ``` /// -/// Timeout service in above example is decoupled from underlying service implementation and could -/// be applied to any service. +/// This wrapper service is decoupled from the underlying service implementation and could be +/// applied to any service. /// -/// The `Transform` trait defines the interface of a Service factory. `Transform` is often +/// The `Transform` trait defines the interface of a service wrapper. `Transform` is often /// implemented for middleware, defining how to construct a middleware Service. A Service that is /// constructed by the factory takes the Service that follows it during execution as a parameter, /// assuming ownership of the next Service. /// -/// Factory for `Timeout` middleware from the above example could look like this: +/// A transform for the `Timeout` middleware could look like this: /// /// ```ignore /// pub struct TimeoutTransform { /// timeout: Duration, /// } /// -/// impl Transform for TimeoutTransform -/// where -/// S: Service, -/// { -/// type Request = S::Request; +/// impl, Req> Transform for TimeoutTransform { /// type Response = S::Response; /// type Error = TimeoutError; /// type InitError = S::Error; @@ -82,7 +74,7 @@ where /// type Future = Ready>; /// /// fn new_transform(&self, service: S) -> Self::Future { -/// ready(Ok(TimeoutService { +/// ready(Ok(Timeout { /// service, /// timeout: self.timeout, /// })) @@ -227,3 +219,54 @@ where } } } + +#[cfg(test)] +mod tests { + use core::{ + future::{ready, Ready}, + time::Duration, + }; + + use super::*; + use crate::Service; + + // pseudo-doctest for Transform trait + pub struct TimeoutTransform { + timeout: Duration, + } + + // pseudo-doctest for Transform trait + impl, Req> Transform for TimeoutTransform { + type Response = S::Response; + type Error = S::Error; + type InitError = S::Error; + type Transform = Timeout; + type Future = Ready>; + + fn new_transform(&self, service: S) -> Self::Future { + ready(Ok(Timeout { + service, + _timeout: self.timeout, + })) + } + } + + // pseudo-doctest for Transform trait + pub struct Timeout { + service: S, + _timeout: Duration, + } + + // pseudo-doctest for Transform trait + impl, Req> Service for Timeout { + type Response = S::Response; + type Error = S::Error; + type Future = S::Future; + + crate::forward_ready!(service); + + fn call(&self, req: Req) -> Self::Future { + self.service.call(req) + } + } +} From 4e6d88d1438c05d0ed90cce214a6970a1ed3e2c5 Mon Sep 17 00:00:00 2001 From: Rob Ede Date: Thu, 15 Apr 2021 20:43:02 +0100 Subject: [PATCH 3/6] improve boxed service docs --- actix-service/Cargo.toml | 2 ++ actix-service/src/boxed.rs | 34 ++++++++++++++++-------------- actix-service/src/lib.rs | 38 +++++++++++++++++----------------- actix-service/src/transform.rs | 7 +++---- 4 files changed, 43 insertions(+), 38 deletions(-) diff --git a/actix-service/Cargo.toml b/actix-service/Cargo.toml index 84a0c172..1c82f703 100644 --- a/actix-service/Cargo.toml +++ b/actix-service/Cargo.toml @@ -22,8 +22,10 @@ path = "src/lib.rs" [dependencies] futures-core = { version = "0.3.7", default-features = false } +paste = "1" pin-project-lite = "0.2" [dev-dependencies] actix-rt = "2.0.0" +actix-utils = "3.0.0-beta.4" futures-util = { version = "0.3.7", default-features = false } diff --git a/actix-service/src/boxed.rs b/actix-service/src/boxed.rs index a872ca9f..3141c5e4 100644 --- a/actix-service/src/boxed.rs +++ b/actix-service/src/boxed.rs @@ -3,26 +3,30 @@ use alloc::{boxed::Box, rc::Rc}; use core::{future::Future, pin::Pin}; +use paste::paste; + use crate::{Service, ServiceFactory}; -/// A boxed future without a Send bound or lifetime parameters. +/// A boxed future with no send bound or lifetime parameters. pub type BoxFuture = Pin>>; macro_rules! service_object { ($name: ident, $type: tt, $fn_name: ident) => { - /// Type alias for service trait object. - pub type $name = $type< - dyn Service>>, - >; + paste! { + #[doc = "Type alias for service trait object using `" $type "`."] + pub type $name = $type< + dyn Service>>, + >; - /// Create service trait object. - pub fn $fn_name(service: S) -> $name - where - S: Service + 'static, - Req: 'static, - S::Future: 'static, - { - $type::new(ServiceWrapper::new(service)) + #[doc = "Wraps service as a trait object using [`" $name "`]."] + pub fn $fn_name(service: S) -> $name + where + S: Service + 'static, + Req: 'static, + S::Future: 'static, + { + $type::new(ServiceWrapper::new(service)) + } } }; } @@ -56,10 +60,10 @@ where } } -/// Wrapper for a service factory trait object that will produce a boxed trait object service. +/// Wrapper for a service factory that will map it's services to boxed trait object services. pub struct BoxServiceFactory(Inner); -/// Create service factory trait object. +/// Wraps a service factory that returns service trait objects. pub fn factory( factory: SF, ) -> BoxServiceFactory diff --git a/actix-service/src/lib.rs b/actix-service/src/lib.rs index 3db2bcc5..e83323a3 100644 --- a/actix-service/src/lib.rs +++ b/actix-service/src/lib.rs @@ -73,7 +73,7 @@ use self::ready::{err, ok, ready, Ready}; /// impl Service for MyService { /// type Response = u64; /// type Error = MyError; -/// type Future = Pin>>>; +/// type Future = Pin>>>; /// /// fn poll_ready(&self, cx: &mut Context<'_>) -> Poll> { ... } /// @@ -82,7 +82,7 @@ use self::ready::{err, ok, ready, Ready}; /// ``` /// /// Sometimes it is not necessary to implement the Service trait. For example, the above service -/// could be rewritten as a simple function and passed to [fn_service](fn_service()). +/// could be rewritten as a simple function and passed to [`fn_service`](fn_service()). /// /// ```ignore /// async fn my_service(req: u8) -> Result; @@ -102,13 +102,12 @@ pub trait Service { /// Returns `Ready` when the service is able to process requests. /// - /// If the service is at capacity, then `Pending` is returned and the task - /// is notified when the service becomes ready again. This function is - /// expected to be called while on a task. + /// If the service is at capacity, then `Pending` is returned and the task is notified when the + /// service becomes ready again. This function is expected to be called while on a task. /// - /// This is a **best effort** implementation. False positives are permitted. - /// It is permitted for the service to return `Ready` from a `poll_ready` - /// call and the next invocation of `call` results in an error. + /// This is a best effort implementation. False positives are permitted. It is permitted for + /// the service to return `Ready` from a `poll_ready` call and the next invocation of `call` + /// results in an error. /// /// # Notes /// 1. `poll_ready` might be called on a different task to `call`. @@ -117,25 +116,26 @@ pub trait Service { /// Process the request and return the response asynchronously. /// - /// This function is expected to be callable off task. As such, - /// implementations should take care to not call `poll_ready`. If the - /// service is at capacity and the request is unable to be handled, the - /// returned `Future` should resolve to an error. + /// This function is expected to be callable off-task. As such, implementations of `call` should + /// take care to not call `poll_ready`. If the service is at capacity and the request is unable + /// to be handled, the returned `Future` should resolve to an error. /// - /// Calling `call` without calling `poll_ready` is permitted. The - /// implementation must be resilient to this fact. + /// Invoking `call` without first invoking `poll_ready` is permitted. Implementations must be + /// resilient to this fact. fn call(&self, req: Req) -> Self::Future; } /// Factory for creating `Service`s. /// -/// Acts as a service factory. This is useful for cases where new `Service`s -/// must be produced. One case is a TCP server listener. The listener -/// accepts new TCP streams, obtains a new `Service` using the -/// `ServiceFactory` trait, and uses the new `Service` to process inbound -/// requests on that new TCP stream. +/// This is useful for cases where new `Service`s must be produced. One case is a TCP +/// server listener: a listener accepts new connections, constructs a new `Service` for each using +/// the `ServiceFactory` trait, and uses the new `Service` to process inbound requests on that new +/// connection. /// /// `Config` is a service factory configuration type. +/// +/// Simple factories may be able to use [`fn_factory`] or [`fn_factory_with_config`] to +/// reduce boilerplate. pub trait ServiceFactory { /// Responses given by the created services. type Response; diff --git a/actix-service/src/transform.rs b/actix-service/src/transform.rs index 8fdff66f..b561a1e5 100644 --- a/actix-service/src/transform.rs +++ b/actix-service/src/transform.rs @@ -222,10 +222,9 @@ where #[cfg(test)] mod tests { - use core::{ - future::{ready, Ready}, - time::Duration, - }; + use core::time::Duration; + + use actix_utils::future::{ready, Ready}; use super::*; use crate::Service; From 7a82288066ea434349d9850eef63e4fae17a56d0 Mon Sep 17 00:00:00 2001 From: Rob Ede Date: Thu, 15 Apr 2021 21:58:18 +0100 Subject: [PATCH 4/6] docs tweak --- actix-service/src/lib.rs | 3 ++- actix-service/src/macros.rs | 8 ++++---- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/actix-service/src/lib.rs b/actix-service/src/lib.rs index e83323a3..3ae22679 100644 --- a/actix-service/src/lib.rs +++ b/actix-service/src/lib.rs @@ -2,6 +2,7 @@ #![no_std] #![deny(rust_2018_idioms, nonstandard_style)] +#![warn(missing_docs)] #![allow(clippy::type_complexity)] #![doc(html_logo_url = "https://actix.rs/img/logo.png")] #![doc(html_favicon_url = "https://actix.rs/favicon.ico")] @@ -59,7 +60,7 @@ use self::ready::{err, ok, ready, Ready}; /// response type. /// /// Services can also have mutable state that influence computation by using a `Cell`, `RefCell` -/// or `Mutex`. Services intentionally do not take `&mut self` to reduce over-head in the +/// or `Mutex`. Services intentionally do not take `&mut self` to reduce overhead in the /// common cases. /// /// `Service` provides a symmetric and uniform API; the same abstractions can be used to represent diff --git a/actix-service/src/macros.rs b/actix-service/src/macros.rs index d2ae9dbf..e23b2960 100644 --- a/actix-service/src/macros.rs +++ b/actix-service/src/macros.rs @@ -1,6 +1,6 @@ -/// A boilerplate implementation of [`Service::poll_ready`] that always signals readiness. +/// An implementation of [`poll_ready`]() that always signals readiness. /// -/// [`Service::poll_ready`]: crate::Service::poll_ready +/// [`poll_ready`]: crate::Service::poll_ready /// /// # Examples /// ```no_run @@ -34,12 +34,12 @@ macro_rules! always_ready { }; } -/// A boilerplate implementation of [`Service::poll_ready`] that forwards readiness checks to a +/// An implementation of [`poll_ready`] that forwards readiness checks to a /// named struct field. /// /// Tuple structs are not supported. /// -/// [`Service::poll_ready`]: crate::Service::poll_ready +/// [`poll_ready`]: crate::Service::poll_ready /// /// # Examples /// ```no_run From 47fba25d67752a8196c70086abf5f2c0adff1eef Mon Sep 17 00:00:00 2001 From: Rob Ede Date: Fri, 16 Apr 2021 00:00:02 +0100 Subject: [PATCH 5/6] remove pipeline from public api (#335) --- actix-server/examples/tcp-echo.rs | 16 ++++++------ actix-service/CHANGES.md | 3 +++ actix-service/src/and_then.rs | 16 ++++++------ actix-service/src/apply.rs | 6 ++++- actix-service/src/ext.rs | 42 +++++++++++++++++++++++++++++-- actix-service/src/lib.rs | 1 - actix-service/src/pipeline.rs | 11 +++++--- actix-service/src/then.rs | 6 ++++- actix-service/src/transform.rs | 7 +++--- actix-tls/examples/tcp-rustls.rs | 12 +++------ 10 files changed, 85 insertions(+), 35 deletions(-) diff --git a/actix-server/examples/tcp-echo.rs b/actix-server/examples/tcp-echo.rs index 45e473a9..8b038da4 100644 --- a/actix-server/examples/tcp-echo.rs +++ b/actix-server/examples/tcp-echo.rs @@ -9,15 +9,17 @@ //! Start typing. When you press enter the typed line will be echoed back. The server will log //! the length of each line it echos and the total size of data sent when the connection is closed. -use std::sync::{ - atomic::{AtomicUsize, Ordering}, - Arc, +use std::{ + env, io, + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, + }, }; -use std::{env, io}; use actix_rt::net::TcpStream; use actix_server::Server; -use actix_service::pipeline_factory; +use actix_service::{fn_service, ServiceFactoryExt as _}; use bytes::BytesMut; use futures_util::future::ok; use log::{error, info}; @@ -25,7 +27,7 @@ use tokio::io::{AsyncReadExt, AsyncWriteExt}; #[actix_rt::main] async fn main() -> io::Result<()> { - env::set_var("RUST_LOG", "actix=trace,basic=trace"); + env::set_var("RUST_LOG", "info"); env_logger::init(); let count = Arc::new(AtomicUsize::new(0)); @@ -41,7 +43,7 @@ async fn main() -> io::Result<()> { let count = Arc::clone(&count); let num2 = Arc::clone(&count); - pipeline_factory(move |mut stream: TcpStream| { + fn_service(move |mut stream: TcpStream| { let count = Arc::clone(&count); async move { diff --git a/actix-service/CHANGES.md b/actix-service/CHANGES.md index 51749ecd..c99cc2eb 100644 --- a/actix-service/CHANGES.md +++ b/actix-service/CHANGES.md @@ -1,6 +1,9 @@ # Changes ## Unreleased - 2021-xx-xx +* Removed pipeline and related structs/functions. [#335] + +[#335]: https://github.com/actix/actix-net/pull/335 ## 2.0.0-beta.5 - 2021-03-15 diff --git a/actix-service/src/and_then.rs b/actix-service/src/and_then.rs index e3b293ea..38980079 100644 --- a/actix-service/src/and_then.rs +++ b/actix-service/src/and_then.rs @@ -11,11 +11,11 @@ use pin_project_lite::pin_project; use super::{Service, ServiceFactory}; -/// Service for the `and_then` combinator, chaining a computation onto the end -/// of another service which completes successfully. +/// Service for the `and_then` combinator, chaining a computation onto the end of another service +/// which completes successfully. /// /// This is created by the `Pipeline::and_then` method. -pub(crate) struct AndThenService(Rc<(A, B)>, PhantomData); +pub struct AndThenService(Rc<(A, B)>, PhantomData); impl AndThenService { /// Create new `AndThen` combinator @@ -64,7 +64,7 @@ where } pin_project! { - pub(crate) struct AndThenServiceResponse + pub struct AndThenServiceResponse where A: Service, B: Service, @@ -117,7 +117,7 @@ where } /// `.and_then()` service factory combinator -pub(crate) struct AndThenServiceFactory +pub struct AndThenServiceFactory where A: ServiceFactory, A::Config: Clone, @@ -200,7 +200,7 @@ where } pin_project! { - pub(crate) struct AndThenServiceFactoryResponse + pub struct AndThenServiceFactoryResponse where A: ServiceFactory, B: ServiceFactory, @@ -272,7 +272,9 @@ mod tests { use futures_util::future::lazy; use crate::{ - fn_factory, ok, pipeline, pipeline_factory, ready, Ready, Service, ServiceFactory, + fn_factory, ok, + pipeline::{pipeline, pipeline_factory}, + ready, Ready, Service, ServiceFactory, }; struct Srv1(Rc>); diff --git a/actix-service/src/apply.rs b/actix-service/src/apply.rs index 9a7e27d2..2f798fd0 100644 --- a/actix-service/src/apply.rs +++ b/actix-service/src/apply.rs @@ -214,7 +214,11 @@ mod tests { use futures_util::future::lazy; use super::*; - use crate::{ok, pipeline, pipeline_factory, Ready, Service, ServiceFactory}; + use crate::{ + ok, + pipeline::{pipeline, pipeline_factory}, + Ready, Service, ServiceFactory, + }; #[derive(Clone)] struct Srv; diff --git a/actix-service/src/ext.rs b/actix-service/src/ext.rs index d931596b..f5fe6ed1 100644 --- a/actix-service/src/ext.rs +++ b/actix-service/src/ext.rs @@ -1,8 +1,12 @@ use crate::{ - map::Map, map_err::MapErr, transform_err::TransformMapInitErr, Service, ServiceFactory, - Transform, + and_then::{AndThenService, AndThenServiceFactory}, + map::Map, + map_err::MapErr, + transform_err::TransformMapInitErr, + IntoService, IntoServiceFactory, Service, ServiceFactory, Transform, }; +/// An extension trait for [`Service`]s that provides a variety of convenient adapters. pub trait ServiceExt: Service { /// Map this service's output to a different type, returning a new service /// of the resulting type. @@ -36,10 +40,27 @@ pub trait ServiceExt: Service { { MapErr::new(self, f) } + + /// Call another service after call to this one has resolved successfully. + /// + /// This function can be used to chain two services together and ensure that the second service + /// isn't called until call to the fist service have finished. Result of the call to the first + /// service is used as an input parameter for the second service's call. + /// + /// Note that this function consumes the receiving service and returns a wrapped version of it. + fn and_then(self, service: I) -> AndThenService + where + Self: Sized, + I: IntoService, + S1: Service, + { + AndThenService::new(self, service.into_service()) + } } impl ServiceExt for S where S: Service {} +/// An extension trait for [`ServiceFactory`]s that provides a variety of convenient adapters. pub trait ServiceFactoryExt: ServiceFactory { /// Map this service's output to a different type, returning a new service /// of the resulting type. @@ -68,10 +89,27 @@ pub trait ServiceFactoryExt: ServiceFactory { { crate::map_init_err::MapInitErr::new(self, f) } + + /// Call another service after call to this one has resolved successfully. + fn and_then(self, factory: I) -> AndThenServiceFactory + where + Self: Sized, + Self::Config: Clone, + I: IntoServiceFactory, + SF1: ServiceFactory< + Self::Response, + Config = Self::Config, + Error = Self::Error, + InitError = Self::InitError, + >, + { + AndThenServiceFactory::new(self, factory.into_factory()) + } } impl ServiceFactoryExt for SF where SF: ServiceFactory {} +/// An extension trait for [`Transform`]s that provides a variety of convenient adapters. pub trait TransformExt: Transform { /// Return a new `Transform` whose init error is mapped to to a different type. fn map_init_err(self, f: F) -> TransformMapInitErr diff --git a/actix-service/src/lib.rs b/actix-service/src/lib.rs index 3ae22679..8f839121 100644 --- a/actix-service/src/lib.rs +++ b/actix-service/src/lib.rs @@ -38,7 +38,6 @@ pub use self::apply_cfg::{apply_cfg, apply_cfg_factory}; pub use self::ext::{ServiceExt, ServiceFactoryExt, TransformExt}; pub use self::fn_service::{fn_factory, fn_factory_with_config, fn_service}; pub use self::map_config::{map_config, unit_config}; -pub use self::pipeline::{pipeline, pipeline_factory, Pipeline, PipelineFactory}; pub use self::transform::{apply, ApplyTransform, Transform}; #[allow(unused_imports)] diff --git a/actix-service/src/pipeline.rs b/actix-service/src/pipeline.rs index 0ec43f0d..2c71a74b 100644 --- a/actix-service/src/pipeline.rs +++ b/actix-service/src/pipeline.rs @@ -1,3 +1,6 @@ +// TODO: see if pipeline is necessary +#![allow(dead_code)] + use core::{ marker::PhantomData, task::{Context, Poll}, @@ -11,7 +14,7 @@ use crate::then::{ThenService, ThenServiceFactory}; use crate::{IntoService, IntoServiceFactory, Service, ServiceFactory}; /// Construct new pipeline with one service in pipeline chain. -pub fn pipeline(service: I) -> Pipeline +pub(crate) fn pipeline(service: I) -> Pipeline where I: IntoService, S: Service, @@ -23,7 +26,7 @@ where } /// Construct new pipeline factory with one service factory. -pub fn pipeline_factory(factory: I) -> PipelineFactory +pub(crate) fn pipeline_factory(factory: I) -> PipelineFactory where I: IntoServiceFactory, SF: ServiceFactory, @@ -35,7 +38,7 @@ where } /// Pipeline service - pipeline allows to compose multiple service into one service. -pub struct Pipeline { +pub(crate) struct Pipeline { service: S, _phantom: PhantomData, } @@ -157,7 +160,7 @@ impl, Req> Service for Pipeline { } /// Pipeline factory -pub struct PipelineFactory { +pub(crate) struct PipelineFactory { factory: SF, _phantom: PhantomData, } diff --git a/actix-service/src/then.rs b/actix-service/src/then.rs index c9428824..82b9dc94 100644 --- a/actix-service/src/then.rs +++ b/actix-service/src/then.rs @@ -246,7 +246,11 @@ mod tests { use futures_util::future::lazy; - use crate::{err, ok, pipeline, pipeline_factory, ready, Ready, Service, ServiceFactory}; + use crate::{ + err, ok, + pipeline::{pipeline, pipeline_factory}, + ready, Ready, Service, ServiceFactory, + }; #[derive(Clone)] struct Srv1(Rc>); diff --git a/actix-service/src/transform.rs b/actix-service/src/transform.rs index b561a1e5..00b686f9 100644 --- a/actix-service/src/transform.rs +++ b/actix-service/src/transform.rs @@ -21,11 +21,10 @@ where ApplyTransform::new(t, factory.into_factory()) } -/// The `Transform` trait defines the interface of a service factory that wraps inner service -/// during construction. +/// Defines the interface of a service factory that wraps inner service during construction. /// -/// Transform(middleware) wraps inner service and runs during inbound and/or outbound processing in -/// the request/response lifecycle. It may modify request and/or response. +/// Transformers wrap an inner service and runs during inbound and/or outbound processing in the +/// service lifecycle. It may modify request and/or response. /// /// For example, a timeout service wrapper: /// diff --git a/actix-tls/examples/tcp-rustls.rs b/actix-tls/examples/tcp-rustls.rs index d0c20428..687c1f86 100644 --- a/actix-tls/examples/tcp-rustls.rs +++ b/actix-tls/examples/tcp-rustls.rs @@ -31,7 +31,7 @@ use std::{ use actix_rt::net::TcpStream; use actix_server::Server; -use actix_service::pipeline_factory; +use actix_service::ServiceFactoryExt as _; use actix_tls::accept::rustls::{Acceptor as RustlsAcceptor, TlsStream}; use futures_util::future::ok; use log::info; @@ -39,14 +39,9 @@ use rustls::{ internal::pemfile::certs, internal::pemfile::rsa_private_keys, NoClientAuth, ServerConfig, }; -#[derive(Debug)] -struct ServiceState { - num: Arc, -} - #[actix_rt::main] async fn main() -> io::Result<()> { - env::set_var("RUST_LOG", "actix=trace,basic=trace"); + env::set_var("RUST_LOG", "info"); env_logger::init(); let mut tls_config = ServerConfig::new(NoClientAuth::new()); @@ -73,7 +68,8 @@ async fn main() -> io::Result<()> { let count = Arc::clone(&count); // Set up TLS service factory - pipeline_factory(tls_acceptor.clone()) + tls_acceptor + .clone() .map_err(|err| println!("Rustls error: {:?}", err)) .and_then(move |stream: TlsStream| { let num = count.fetch_add(1, Ordering::Relaxed); From aeb81ad3fd19b3e976c4ce8a83527d9192ce29dc Mon Sep 17 00:00:00 2001 From: fakeshadow <24548779@qq.com> Date: Thu, 15 Apr 2021 16:54:15 -0700 Subject: [PATCH 6/6] Fix worker are notified to stop with non_graceful shutdown (#333) --- actix-server/CHANGES.md | 4 ++++ actix-server/src/builder.rs | 46 ++++++++++++------------------------- 2 files changed, 19 insertions(+), 31 deletions(-) diff --git a/actix-server/CHANGES.md b/actix-server/CHANGES.md index 60d7ce37..7ccada0e 100644 --- a/actix-server/CHANGES.md +++ b/actix-server/CHANGES.md @@ -1,6 +1,10 @@ # Changes ## Unreleased - 2021-xx-xx +* Server shutdown would notify all workers to exit regardless if shutdown is graceful. + This would make all worker shutdown immediately in force shutdown case. [#333] + +[#333]: https://github.com/actix/actix-net/pull/333 ## 2.0.0-beta.4 - 2021-04-01 diff --git a/actix-server/src/builder.rs b/actix-server/src/builder.rs index 6019ff16..aa18bb22 100644 --- a/actix-server/src/builder.rs +++ b/actix-server/src/builder.rs @@ -381,45 +381,29 @@ impl ServerBuilder { let notify = std::mem::take(&mut self.notify); // stop workers - if !self.handles.is_empty() && graceful { - let iter = self - .handles - .iter() - .map(move |worker| worker.1.stop(graceful)) - .collect(); + let stop = self + .handles + .iter() + .map(move |worker| worker.1.stop(graceful)) + .collect(); - let fut = join_all(iter); - - rt::spawn(async move { - let _ = fut.await; - if let Some(tx) = completion { - let _ = tx.send(()); - } - for tx in notify { - let _ = tx.send(()); - } - if exit { - rt::spawn(async { - sleep(Duration::from_millis(300)).await; - System::current().stop(); - }); - } - }); - } else { - // we need to stop system if server was spawned - if self.exit { - rt::spawn(async { - sleep(Duration::from_millis(300)).await; - System::current().stop(); - }); + rt::spawn(async move { + if graceful { + let _ = join_all(stop).await; } + if let Some(tx) = completion { let _ = tx.send(()); } for tx in notify { let _ = tx.send(()); } - } + + if exit { + sleep(Duration::from_millis(300)).await; + System::current().stop(); + } + }); } ServerCommand::WorkerFaulted(idx) => { let mut found = false;