diff --git a/actix-server/CHANGES.md b/actix-server/CHANGES.md index d0f904c7..52cf6aee 100644 --- a/actix-server/CHANGES.md +++ b/actix-server/CHANGES.md @@ -5,8 +5,11 @@ * Remove `Future` impl for `ServerBuilder`. [#266] * Rename `Server` to `ServerHandle`. `ServerHandle` must be explicitly constructed with `Server::handle` API. [#266] * Add `Server`(new type) that can be `await` for blocking until server stop. [#266] +* Server shutdown would notify all workers to exit regardless if shutdown is graceful. + This would make all worker shutdown immediately in force shutdown case. [#333] [#266]: https://github.com/actix/actix-net/pull/266 +[#333]: https://github.com/actix/actix-net/pull/333 ## 2.0.0-beta.4 - 2021-04-01 diff --git a/actix-server/examples/tcp-echo.rs b/actix-server/examples/tcp-echo.rs index 7f9bdac8..dd069e58 100644 --- a/actix-server/examples/tcp-echo.rs +++ b/actix-server/examples/tcp-echo.rs @@ -19,14 +19,14 @@ use std::{ use actix_rt::net::TcpStream; use actix_server::Server; -use actix_service::pipeline_factory; +use actix_service::{fn_service, ServiceFactoryExt as _}; use bytes::BytesMut; use log::{error, info}; use tokio::io::{AsyncReadExt, AsyncWriteExt}; #[tokio::main] async fn main() -> io::Result<()> { - env::set_var("RUST_LOG", "actix=trace,basic=trace"); + env::set_var("RUST_LOG", "info"); env_logger::init(); let count = Arc::new(AtomicUsize::new(0)); @@ -42,7 +42,7 @@ async fn main() -> io::Result<()> { let count = Arc::clone(&count); let num2 = Arc::clone(&count); - pipeline_factory(move |mut stream: TcpStream| { + fn_service(move |mut stream: TcpStream| { let count = Arc::clone(&count); async move { diff --git a/actix-server/src/worker.rs b/actix-server/src/worker.rs index 53e11db8..9f9bd48f 100644 --- a/actix-server/src/worker.rs +++ b/actix-server/src/worker.rs @@ -125,6 +125,8 @@ impl WorkerAvailability { /// /// Worker accepts Socket objects via unbounded channel and starts stream processing. pub(crate) struct ServerWorker { + // UnboundedReceiver should always be the first field. + // It must be dropped as soon as ServerWorker dropping. rx: UnboundedReceiver, rx2: UnboundedReceiver, services: Box<[WorkerService]>, @@ -422,12 +424,6 @@ impl Default for WorkerState { } } -impl Drop for ServerWorker { - fn drop(&mut self) { - Arbiter::current().stop(); - } -} - impl Future for ServerWorker { type Output = (); diff --git a/actix-server/tests/test_server.rs b/actix-server/tests/test_server.rs index 3f23da39..ceb6fb06 100644 --- a/actix-server/tests/test_server.rs +++ b/actix-server/tests/test_server.rs @@ -1,7 +1,8 @@ use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{mpsc, Arc}; -use std::{net, thread, time}; +use std::{net, thread, time::Duration}; +use actix_rt::{net::TcpStream, time::sleep}; use actix_server::Server; use actix_service::fn_service; use actix_utils::future::ok; @@ -35,7 +36,7 @@ fn test_bind() { }); let handle = rx.recv().unwrap(); - thread::sleep(time::Duration::from_millis(500)); + thread::sleep(Duration::from_millis(500)); assert!(net::TcpStream::connect(addr).is_ok()); let _ = handle.stop(true); let _ = h.join().unwrap(); @@ -67,7 +68,7 @@ fn test_listen() { let handle = rx.recv().unwrap(); - thread::sleep(time::Duration::from_millis(500)); + thread::sleep(Duration::from_millis(500)); assert!(net::TcpStream::connect(addr).is_ok()); let _ = handle.stop(true); let _ = h.join().unwrap(); @@ -76,11 +77,11 @@ fn test_listen() { #[test] #[cfg(unix)] fn test_start() { + use std::io::Read; + use actix_codec::{BytesCodec, Framed}; - use actix_rt::net::TcpStream; use bytes::Bytes; use futures_util::sink::SinkExt; - use std::io::Read; let addr = unused_addr(); let (tx, rx) = mpsc::channel(); @@ -114,16 +115,16 @@ fn test_start() { // pause let _ = srv.pause(); - thread::sleep(time::Duration::from_millis(200)); + thread::sleep(Duration::from_millis(200)); let mut conn = net::TcpStream::connect(addr).unwrap(); - conn.set_read_timeout(Some(time::Duration::from_millis(100))) + conn.set_read_timeout(Some(Duration::from_millis(100))) .unwrap(); let res = conn.read_exact(&mut buf); assert!(res.is_err()); // resume let _ = srv.resume(); - thread::sleep(time::Duration::from_millis(100)); + thread::sleep(Duration::from_millis(100)); assert!(net::TcpStream::connect(addr).is_ok()); assert!(net::TcpStream::connect(addr).is_ok()); assert!(net::TcpStream::connect(addr).is_ok()); @@ -135,10 +136,10 @@ fn test_start() { // stop let _ = srv.stop(false); - thread::sleep(time::Duration::from_millis(100)); + thread::sleep(Duration::from_millis(100)); assert!(net::TcpStream::connect(addr).is_err()); - thread::sleep(time::Duration::from_millis(100)); + thread::sleep(Duration::from_millis(100)); sys.stop(); let _ = h.join(); } @@ -182,8 +183,9 @@ fn test_configure() { let _ = server.await; }); }); + let (server, sys) = rx.recv().unwrap(); - thread::sleep(time::Duration::from_millis(500)); + thread::sleep(Duration::from_millis(500)); assert!(net::TcpStream::connect(addr1).is_ok()); assert!(net::TcpStream::connect(addr2).is_ok()); @@ -202,7 +204,6 @@ async fn test_max_concurrent_connections() { // The limit test on the other hand is only for concurrent tcp stream limiting a work // thread accept. - use actix_rt::net::TcpStream; use tokio::io::AsyncWriteExt; let addr = unused_addr(); @@ -228,7 +229,7 @@ async fn test_max_concurrent_connections() { let counter = counter.clone(); async move { counter.fetch_add(1, Ordering::SeqCst); - actix_rt::time::sleep(time::Duration::from_secs(20)).await; + sleep(Duration::from_secs(20)).await; counter.fetch_sub(1, Ordering::SeqCst); Ok::<(), ()>(()) } @@ -251,7 +252,7 @@ async fn test_max_concurrent_connections() { conns.push(conn); } - actix_rt::time::sleep(time::Duration::from_secs(5)).await; + sleep(Duration::from_secs(5)).await; // counter would remain at 3 even with 12 successful connection. // and 9 of them remain in backlog. @@ -270,9 +271,7 @@ async fn test_max_concurrent_connections() { #[actix_rt::test] async fn test_service_restart() { use std::task::{Context, Poll}; - use std::time::Duration; - use actix_rt::{net::TcpStream, time::sleep}; use actix_service::{fn_factory, Service}; use futures_core::future::LocalBoxFuture; use tokio::io::AsyncWriteExt; @@ -440,3 +439,143 @@ async fn test_service_restart() { let _ = server.stop(false); let _ = h.join().unwrap(); } + +#[actix_rt::test] +async fn worker_restart() { + use actix_service::{Service, ServiceFactory}; + use futures_core::future::LocalBoxFuture; + use tokio::io::{AsyncReadExt, AsyncWriteExt}; + + struct TestServiceFactory(Arc); + + impl ServiceFactory for TestServiceFactory { + type Response = (); + type Error = (); + type Config = (); + type Service = TestService; + type InitError = (); + type Future = LocalBoxFuture<'static, Result>; + + fn new_service(&self, _: Self::Config) -> Self::Future { + let counter = self.0.fetch_add(1, Ordering::Relaxed); + + Box::pin(async move { Ok(TestService(counter)) }) + } + } + + struct TestService(usize); + + impl Service for TestService { + type Response = (); + type Error = (); + type Future = LocalBoxFuture<'static, Result>; + + actix_service::always_ready!(); + + fn call(&self, stream: TcpStream) -> Self::Future { + let counter = self.0; + + let mut stream = stream.into_std().unwrap(); + use std::io::Write; + let str = counter.to_string(); + let buf = str.as_bytes(); + + let mut written = 0; + + while written < buf.len() { + if let Ok(n) = stream.write(&buf[written..]) { + written += n; + } + } + stream.flush().unwrap(); + stream.shutdown(net::Shutdown::Write).unwrap(); + + // force worker 2 to restart service once. + if counter == 2 { + panic!("panic on purpose") + } else { + Box::pin(async { Ok(()) }) + } + } + } + + let addr = unused_addr(); + let (tx, rx) = mpsc::channel(); + + let counter = Arc::new(AtomicUsize::new(1)); + let h = thread::spawn(move || { + let counter = counter.clone(); + actix_rt::System::new().block_on(async { + let server = Server::build() + .disable_signals() + .bind("addr", addr, move || TestServiceFactory(counter.clone())) + .unwrap() + .workers(2) + .run(); + + let _ = tx.send((server.handle(), actix_rt::System::current())); + server.await + }) + }); + + let (server, sys) = rx.recv().unwrap(); + + sleep(Duration::from_secs(3)).await; + + let mut buf = [0; 8]; + + // worker 1 would not restart and return it's id consistently. + let mut stream = TcpStream::connect(addr).await.unwrap(); + let n = stream.read(&mut buf).await.unwrap(); + let id = String::from_utf8_lossy(&buf[0..n]); + assert_eq!("1", id); + stream.shutdown().await.unwrap(); + + // worker 2 dead after return response. + let mut stream = TcpStream::connect(addr).await.unwrap(); + let n = stream.read(&mut buf).await.unwrap(); + let id = String::from_utf8_lossy(&buf[0..n]); + assert_eq!("2", id); + stream.shutdown().await.unwrap(); + + // request to worker 1 + let mut stream = TcpStream::connect(addr).await.unwrap(); + let n = stream.read(&mut buf).await.unwrap(); + let id = String::from_utf8_lossy(&buf[0..n]); + assert_eq!("1", id); + stream.shutdown().await.unwrap(); + + // TODO: Remove sleep if it can pass CI. + sleep(Duration::from_secs(3)).await; + + // worker 2 restarting and work goes to worker 1. + let mut stream = TcpStream::connect(addr).await.unwrap(); + let n = stream.read(&mut buf).await.unwrap(); + let id = String::from_utf8_lossy(&buf[0..n]); + assert_eq!("1", id); + stream.shutdown().await.unwrap(); + + // TODO: Remove sleep if it can pass CI. + sleep(Duration::from_secs(3)).await; + + // worker 2 restarted but worker 1 was still the next to accept connection. + let mut stream = TcpStream::connect(addr).await.unwrap(); + let n = stream.read(&mut buf).await.unwrap(); + let id = String::from_utf8_lossy(&buf[0..n]); + assert_eq!("1", id); + stream.shutdown().await.unwrap(); + + // TODO: Remove sleep if it can pass CI. + sleep(Duration::from_secs(3)).await; + + // worker 2 accept connection again but it's id is 3. + let mut stream = TcpStream::connect(addr).await.unwrap(); + let n = stream.read(&mut buf).await.unwrap(); + let id = String::from_utf8_lossy(&buf[0..n]); + assert_eq!("3", id); + stream.shutdown().await.unwrap(); + + sys.stop(); + let _ = server.stop(false); + let _ = h.join().unwrap(); +} diff --git a/actix-service/CHANGES.md b/actix-service/CHANGES.md index 51749ecd..c99cc2eb 100644 --- a/actix-service/CHANGES.md +++ b/actix-service/CHANGES.md @@ -1,6 +1,9 @@ # Changes ## Unreleased - 2021-xx-xx +* Removed pipeline and related structs/functions. [#335] + +[#335]: https://github.com/actix/actix-net/pull/335 ## 2.0.0-beta.5 - 2021-03-15 diff --git a/actix-service/Cargo.toml b/actix-service/Cargo.toml index 84a0c172..1c82f703 100644 --- a/actix-service/Cargo.toml +++ b/actix-service/Cargo.toml @@ -22,8 +22,10 @@ path = "src/lib.rs" [dependencies] futures-core = { version = "0.3.7", default-features = false } +paste = "1" pin-project-lite = "0.2" [dev-dependencies] actix-rt = "2.0.0" +actix-utils = "3.0.0-beta.4" futures-util = { version = "0.3.7", default-features = false } diff --git a/actix-service/src/and_then.rs b/actix-service/src/and_then.rs index e3b293ea..38980079 100644 --- a/actix-service/src/and_then.rs +++ b/actix-service/src/and_then.rs @@ -11,11 +11,11 @@ use pin_project_lite::pin_project; use super::{Service, ServiceFactory}; -/// Service for the `and_then` combinator, chaining a computation onto the end -/// of another service which completes successfully. +/// Service for the `and_then` combinator, chaining a computation onto the end of another service +/// which completes successfully. /// /// This is created by the `Pipeline::and_then` method. -pub(crate) struct AndThenService(Rc<(A, B)>, PhantomData); +pub struct AndThenService(Rc<(A, B)>, PhantomData); impl AndThenService { /// Create new `AndThen` combinator @@ -64,7 +64,7 @@ where } pin_project! { - pub(crate) struct AndThenServiceResponse + pub struct AndThenServiceResponse where A: Service, B: Service, @@ -117,7 +117,7 @@ where } /// `.and_then()` service factory combinator -pub(crate) struct AndThenServiceFactory +pub struct AndThenServiceFactory where A: ServiceFactory, A::Config: Clone, @@ -200,7 +200,7 @@ where } pin_project! { - pub(crate) struct AndThenServiceFactoryResponse + pub struct AndThenServiceFactoryResponse where A: ServiceFactory, B: ServiceFactory, @@ -272,7 +272,9 @@ mod tests { use futures_util::future::lazy; use crate::{ - fn_factory, ok, pipeline, pipeline_factory, ready, Ready, Service, ServiceFactory, + fn_factory, ok, + pipeline::{pipeline, pipeline_factory}, + ready, Ready, Service, ServiceFactory, }; struct Srv1(Rc>); diff --git a/actix-service/src/apply.rs b/actix-service/src/apply.rs index 9a7e27d2..2f798fd0 100644 --- a/actix-service/src/apply.rs +++ b/actix-service/src/apply.rs @@ -214,7 +214,11 @@ mod tests { use futures_util::future::lazy; use super::*; - use crate::{ok, pipeline, pipeline_factory, Ready, Service, ServiceFactory}; + use crate::{ + ok, + pipeline::{pipeline, pipeline_factory}, + Ready, Service, ServiceFactory, + }; #[derive(Clone)] struct Srv; diff --git a/actix-service/src/boxed.rs b/actix-service/src/boxed.rs index a872ca9f..3141c5e4 100644 --- a/actix-service/src/boxed.rs +++ b/actix-service/src/boxed.rs @@ -3,26 +3,30 @@ use alloc::{boxed::Box, rc::Rc}; use core::{future::Future, pin::Pin}; +use paste::paste; + use crate::{Service, ServiceFactory}; -/// A boxed future without a Send bound or lifetime parameters. +/// A boxed future with no send bound or lifetime parameters. pub type BoxFuture = Pin>>; macro_rules! service_object { ($name: ident, $type: tt, $fn_name: ident) => { - /// Type alias for service trait object. - pub type $name = $type< - dyn Service>>, - >; + paste! { + #[doc = "Type alias for service trait object using `" $type "`."] + pub type $name = $type< + dyn Service>>, + >; - /// Create service trait object. - pub fn $fn_name(service: S) -> $name - where - S: Service + 'static, - Req: 'static, - S::Future: 'static, - { - $type::new(ServiceWrapper::new(service)) + #[doc = "Wraps service as a trait object using [`" $name "`]."] + pub fn $fn_name(service: S) -> $name + where + S: Service + 'static, + Req: 'static, + S::Future: 'static, + { + $type::new(ServiceWrapper::new(service)) + } } }; } @@ -56,10 +60,10 @@ where } } -/// Wrapper for a service factory trait object that will produce a boxed trait object service. +/// Wrapper for a service factory that will map it's services to boxed trait object services. pub struct BoxServiceFactory(Inner); -/// Create service factory trait object. +/// Wraps a service factory that returns service trait objects. pub fn factory( factory: SF, ) -> BoxServiceFactory diff --git a/actix-service/src/ext.rs b/actix-service/src/ext.rs index d931596b..f5fe6ed1 100644 --- a/actix-service/src/ext.rs +++ b/actix-service/src/ext.rs @@ -1,8 +1,12 @@ use crate::{ - map::Map, map_err::MapErr, transform_err::TransformMapInitErr, Service, ServiceFactory, - Transform, + and_then::{AndThenService, AndThenServiceFactory}, + map::Map, + map_err::MapErr, + transform_err::TransformMapInitErr, + IntoService, IntoServiceFactory, Service, ServiceFactory, Transform, }; +/// An extension trait for [`Service`]s that provides a variety of convenient adapters. pub trait ServiceExt: Service { /// Map this service's output to a different type, returning a new service /// of the resulting type. @@ -36,10 +40,27 @@ pub trait ServiceExt: Service { { MapErr::new(self, f) } + + /// Call another service after call to this one has resolved successfully. + /// + /// This function can be used to chain two services together and ensure that the second service + /// isn't called until call to the fist service have finished. Result of the call to the first + /// service is used as an input parameter for the second service's call. + /// + /// Note that this function consumes the receiving service and returns a wrapped version of it. + fn and_then(self, service: I) -> AndThenService + where + Self: Sized, + I: IntoService, + S1: Service, + { + AndThenService::new(self, service.into_service()) + } } impl ServiceExt for S where S: Service {} +/// An extension trait for [`ServiceFactory`]s that provides a variety of convenient adapters. pub trait ServiceFactoryExt: ServiceFactory { /// Map this service's output to a different type, returning a new service /// of the resulting type. @@ -68,10 +89,27 @@ pub trait ServiceFactoryExt: ServiceFactory { { crate::map_init_err::MapInitErr::new(self, f) } + + /// Call another service after call to this one has resolved successfully. + fn and_then(self, factory: I) -> AndThenServiceFactory + where + Self: Sized, + Self::Config: Clone, + I: IntoServiceFactory, + SF1: ServiceFactory< + Self::Response, + Config = Self::Config, + Error = Self::Error, + InitError = Self::InitError, + >, + { + AndThenServiceFactory::new(self, factory.into_factory()) + } } impl ServiceFactoryExt for SF where SF: ServiceFactory {} +/// An extension trait for [`Transform`]s that provides a variety of convenient adapters. pub trait TransformExt: Transform { /// Return a new `Transform` whose init error is mapped to to a different type. fn map_init_err(self, f: F) -> TransformMapInitErr diff --git a/actix-service/src/lib.rs b/actix-service/src/lib.rs index cc82bfa6..8f839121 100644 --- a/actix-service/src/lib.rs +++ b/actix-service/src/lib.rs @@ -2,6 +2,7 @@ #![no_std] #![deny(rust_2018_idioms, nonstandard_style)] +#![warn(missing_docs)] #![allow(clippy::type_complexity)] #![doc(html_logo_url = "https://actix.rs/img/logo.png")] #![doc(html_favicon_url = "https://actix.rs/favicon.ico")] @@ -37,7 +38,6 @@ pub use self::apply_cfg::{apply_cfg, apply_cfg_factory}; pub use self::ext::{ServiceExt, ServiceFactoryExt, TransformExt}; pub use self::fn_service::{fn_factory, fn_factory_with_config, fn_service}; pub use self::map_config::{map_config, unit_config}; -pub use self::pipeline::{pipeline, pipeline_factory, Pipeline, PipelineFactory}; pub use self::transform::{apply, ApplyTransform, Transform}; #[allow(unused_imports)] @@ -53,8 +53,14 @@ use self::ready::{err, ok, ready, Ready}; /// async fn(Request) -> Result /// ``` /// -/// The `Service` trait just generalizes this form where each parameter is described as an -/// associated type on the trait. Services can also have mutable state that influence computation. +/// The `Service` trait just generalizes this form. Requests are defined as a generic type parameter +/// and responses and other details are defined as associated types on the trait impl. Notice that +/// this design means that services can receive many request types and converge them to a single +/// response type. +/// +/// Services can also have mutable state that influence computation by using a `Cell`, `RefCell` +/// or `Mutex`. Services intentionally do not take `&mut self` to reduce overhead in the +/// common cases. /// /// `Service` provides a symmetric and uniform API; the same abstractions can be used to represent /// both clients and servers. Services describe only _transformation_ operations which encourage @@ -64,11 +70,10 @@ use self::ready::{err, ok, ready, Ready}; /// ```ignore /// struct MyService; /// -/// impl Service for MyService { -/// type Request = u8; +/// impl Service for MyService { /// type Response = u64; /// type Error = MyError; -/// type Future = Pin>>>; +/// type Future = Pin>>>; /// /// fn poll_ready(&self, cx: &mut Context<'_>) -> Poll> { ... } /// @@ -77,10 +82,13 @@ use self::ready::{err, ok, ready, Ready}; /// ``` /// /// Sometimes it is not necessary to implement the Service trait. For example, the above service -/// could be rewritten as a simple function and passed to [fn_service](fn_service()). +/// could be rewritten as a simple function and passed to [`fn_service`](fn_service()). /// /// ```ignore /// async fn my_service(req: u8) -> Result; +/// +/// let svc = fn_service(my_service) +/// svc.call(123) /// ``` pub trait Service { /// Responses given by the service. @@ -94,13 +102,12 @@ pub trait Service { /// Returns `Ready` when the service is able to process requests. /// - /// If the service is at capacity, then `Pending` is returned and the task - /// is notified when the service becomes ready again. This function is - /// expected to be called while on a task. + /// If the service is at capacity, then `Pending` is returned and the task is notified when the + /// service becomes ready again. This function is expected to be called while on a task. /// - /// This is a **best effort** implementation. False positives are permitted. - /// It is permitted for the service to return `Ready` from a `poll_ready` - /// call and the next invocation of `call` results in an error. + /// This is a best effort implementation. False positives are permitted. It is permitted for + /// the service to return `Ready` from a `poll_ready` call and the next invocation of `call` + /// results in an error. /// /// # Notes /// 1. `poll_ready` might be called on a different task to `call`. @@ -109,25 +116,26 @@ pub trait Service { /// Process the request and return the response asynchronously. /// - /// This function is expected to be callable off task. As such, - /// implementations should take care to not call `poll_ready`. If the - /// service is at capacity and the request is unable to be handled, the - /// returned `Future` should resolve to an error. + /// This function is expected to be callable off-task. As such, implementations of `call` should + /// take care to not call `poll_ready`. If the service is at capacity and the request is unable + /// to be handled, the returned `Future` should resolve to an error. /// - /// Calling `call` without calling `poll_ready` is permitted. The - /// implementation must be resilient to this fact. + /// Invoking `call` without first invoking `poll_ready` is permitted. Implementations must be + /// resilient to this fact. fn call(&self, req: Req) -> Self::Future; } /// Factory for creating `Service`s. /// -/// Acts as a service factory. This is useful for cases where new `Service`s -/// must be produced. One case is a TCP server listener. The listener -/// accepts new TCP streams, obtains a new `Service` using the -/// `ServiceFactory` trait, and uses the new `Service` to process inbound -/// requests on that new TCP stream. +/// This is useful for cases where new `Service`s must be produced. One case is a TCP +/// server listener: a listener accepts new connections, constructs a new `Service` for each using +/// the `ServiceFactory` trait, and uses the new `Service` to process inbound requests on that new +/// connection. /// /// `Config` is a service factory configuration type. +/// +/// Simple factories may be able to use [`fn_factory`] or [`fn_factory_with_config`] to +/// reduce boilerplate. pub trait ServiceFactory { /// Responses given by the created services. type Response; @@ -144,7 +152,7 @@ pub trait ServiceFactory { /// Errors potentially raised while building a service. type InitError; - /// The future of the `Service` instance. + /// The future of the `Service` instance.g type Future: Future>; /// Create and return a new service asynchronously. diff --git a/actix-service/src/macros.rs b/actix-service/src/macros.rs index d2ae9dbf..e23b2960 100644 --- a/actix-service/src/macros.rs +++ b/actix-service/src/macros.rs @@ -1,6 +1,6 @@ -/// A boilerplate implementation of [`Service::poll_ready`] that always signals readiness. +/// An implementation of [`poll_ready`]() that always signals readiness. /// -/// [`Service::poll_ready`]: crate::Service::poll_ready +/// [`poll_ready`]: crate::Service::poll_ready /// /// # Examples /// ```no_run @@ -34,12 +34,12 @@ macro_rules! always_ready { }; } -/// A boilerplate implementation of [`Service::poll_ready`] that forwards readiness checks to a +/// An implementation of [`poll_ready`] that forwards readiness checks to a /// named struct field. /// /// Tuple structs are not supported. /// -/// [`Service::poll_ready`]: crate::Service::poll_ready +/// [`poll_ready`]: crate::Service::poll_ready /// /// # Examples /// ```no_run diff --git a/actix-service/src/pipeline.rs b/actix-service/src/pipeline.rs index 0ec43f0d..2c71a74b 100644 --- a/actix-service/src/pipeline.rs +++ b/actix-service/src/pipeline.rs @@ -1,3 +1,6 @@ +// TODO: see if pipeline is necessary +#![allow(dead_code)] + use core::{ marker::PhantomData, task::{Context, Poll}, @@ -11,7 +14,7 @@ use crate::then::{ThenService, ThenServiceFactory}; use crate::{IntoService, IntoServiceFactory, Service, ServiceFactory}; /// Construct new pipeline with one service in pipeline chain. -pub fn pipeline(service: I) -> Pipeline +pub(crate) fn pipeline(service: I) -> Pipeline where I: IntoService, S: Service, @@ -23,7 +26,7 @@ where } /// Construct new pipeline factory with one service factory. -pub fn pipeline_factory(factory: I) -> PipelineFactory +pub(crate) fn pipeline_factory(factory: I) -> PipelineFactory where I: IntoServiceFactory, SF: ServiceFactory, @@ -35,7 +38,7 @@ where } /// Pipeline service - pipeline allows to compose multiple service into one service. -pub struct Pipeline { +pub(crate) struct Pipeline { service: S, _phantom: PhantomData, } @@ -157,7 +160,7 @@ impl, Req> Service for Pipeline { } /// Pipeline factory -pub struct PipelineFactory { +pub(crate) struct PipelineFactory { factory: SF, _phantom: PhantomData, } diff --git a/actix-service/src/then.rs b/actix-service/src/then.rs index c9428824..82b9dc94 100644 --- a/actix-service/src/then.rs +++ b/actix-service/src/then.rs @@ -246,7 +246,11 @@ mod tests { use futures_util::future::lazy; - use crate::{err, ok, pipeline, pipeline_factory, ready, Ready, Service, ServiceFactory}; + use crate::{ + err, ok, + pipeline::{pipeline, pipeline_factory}, + ready, Ready, Service, ServiceFactory, + }; #[derive(Clone)] struct Srv1(Rc>); diff --git a/actix-service/src/transform.rs b/actix-service/src/transform.rs index b0abe72b..00b686f9 100644 --- a/actix-service/src/transform.rs +++ b/actix-service/src/transform.rs @@ -21,13 +21,12 @@ where ApplyTransform::new(t, factory.into_factory()) } -/// The `Transform` trait defines the interface of a service factory that wraps inner service -/// during construction. +/// Defines the interface of a service factory that wraps inner service during construction. /// -/// Transform(middleware) wraps inner service and runs during inbound and/or outbound processing in -/// the request/response lifecycle. It may modify request and/or response. +/// Transformers wrap an inner service and runs during inbound and/or outbound processing in the +/// service lifecycle. It may modify request and/or response. /// -/// For example, timeout transform: +/// For example, a timeout service wrapper: /// /// ```ignore /// pub struct Timeout { @@ -35,11 +34,7 @@ where /// timeout: Duration, /// } /// -/// impl Service for Timeout -/// where -/// S: Service, -/// { -/// type Request = S::Request; +/// impl, Req> Service for Timeout { /// type Response = S::Response; /// type Error = TimeoutError; /// type Future = TimeoutServiceResponse; @@ -55,26 +50,22 @@ where /// } /// ``` /// -/// Timeout service in above example is decoupled from underlying service implementation and could -/// be applied to any service. +/// This wrapper service is decoupled from the underlying service implementation and could be +/// applied to any service. /// -/// The `Transform` trait defines the interface of a Service factory. `Transform` is often +/// The `Transform` trait defines the interface of a service wrapper. `Transform` is often /// implemented for middleware, defining how to construct a middleware Service. A Service that is /// constructed by the factory takes the Service that follows it during execution as a parameter, /// assuming ownership of the next Service. /// -/// Factory for `Timeout` middleware from the above example could look like this: +/// A transform for the `Timeout` middleware could look like this: /// /// ```ignore /// pub struct TimeoutTransform { /// timeout: Duration, /// } /// -/// impl Transform for TimeoutTransform -/// where -/// S: Service, -/// { -/// type Request = S::Request; +/// impl, Req> Transform for TimeoutTransform { /// type Response = S::Response; /// type Error = TimeoutError; /// type InitError = S::Error; @@ -82,7 +73,7 @@ where /// type Future = Ready>; /// /// fn new_transform(&self, service: S) -> Self::Future { -/// ready(Ok(TimeoutService { +/// ready(Ok(Timeout { /// service, /// timeout: self.timeout, /// })) @@ -227,3 +218,53 @@ where } } } + +#[cfg(test)] +mod tests { + use core::time::Duration; + + use actix_utils::future::{ready, Ready}; + + use super::*; + use crate::Service; + + // pseudo-doctest for Transform trait + pub struct TimeoutTransform { + timeout: Duration, + } + + // pseudo-doctest for Transform trait + impl, Req> Transform for TimeoutTransform { + type Response = S::Response; + type Error = S::Error; + type InitError = S::Error; + type Transform = Timeout; + type Future = Ready>; + + fn new_transform(&self, service: S) -> Self::Future { + ready(Ok(Timeout { + service, + _timeout: self.timeout, + })) + } + } + + // pseudo-doctest for Transform trait + pub struct Timeout { + service: S, + _timeout: Duration, + } + + // pseudo-doctest for Transform trait + impl, Req> Service for Timeout { + type Response = S::Response; + type Error = S::Error; + type Future = S::Future; + + crate::forward_ready!(service); + + fn call(&self, req: Req) -> Self::Future { + self.service.call(req) + } + } +} diff --git a/actix-tls/examples/tcp-rustls.rs b/actix-tls/examples/tcp-rustls.rs index d0c20428..687c1f86 100644 --- a/actix-tls/examples/tcp-rustls.rs +++ b/actix-tls/examples/tcp-rustls.rs @@ -31,7 +31,7 @@ use std::{ use actix_rt::net::TcpStream; use actix_server::Server; -use actix_service::pipeline_factory; +use actix_service::ServiceFactoryExt as _; use actix_tls::accept::rustls::{Acceptor as RustlsAcceptor, TlsStream}; use futures_util::future::ok; use log::info; @@ -39,14 +39,9 @@ use rustls::{ internal::pemfile::certs, internal::pemfile::rsa_private_keys, NoClientAuth, ServerConfig, }; -#[derive(Debug)] -struct ServiceState { - num: Arc, -} - #[actix_rt::main] async fn main() -> io::Result<()> { - env::set_var("RUST_LOG", "actix=trace,basic=trace"); + env::set_var("RUST_LOG", "info"); env_logger::init(); let mut tls_config = ServerConfig::new(NoClientAuth::new()); @@ -73,7 +68,8 @@ async fn main() -> io::Result<()> { let count = Arc::clone(&count); // Set up TLS service factory - pipeline_factory(tls_acceptor.clone()) + tls_acceptor + .clone() .map_err(|err| println!("Rustls error: {:?}", err)) .and_then(move |stream: TlsStream| { let num = count.fetch_add(1, Ordering::Relaxed);