Merge branch 'master' into fix/non_graceful_shutdown

This commit is contained in:
fakeshadow 2021-04-15 15:53:50 -07:00 committed by GitHub
commit 64ed57d698
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 273 additions and 75 deletions

View File

@ -125,6 +125,8 @@ impl WorkerAvailability {
/// ///
/// Worker accepts Socket objects via unbounded channel and starts stream processing. /// Worker accepts Socket objects via unbounded channel and starts stream processing.
pub(crate) struct ServerWorker { pub(crate) struct ServerWorker {
// UnboundedReceiver<Conn> should always be the first field.
// It must be dropped as soon as ServerWorker dropping.
rx: UnboundedReceiver<Conn>, rx: UnboundedReceiver<Conn>,
rx2: UnboundedReceiver<Stop>, rx2: UnboundedReceiver<Stop>,
services: Box<[WorkerService]>, services: Box<[WorkerService]>,
@ -370,6 +372,7 @@ impl Default for WorkerState {
impl Drop for ServerWorker { impl Drop for ServerWorker {
fn drop(&mut self) { fn drop(&mut self) {
// Stop the Arbiter ServerWorker runs on on drop.
Arbiter::current().stop(); Arbiter::current().stop();
} }
} }

View File

@ -1,7 +1,8 @@
use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{mpsc, Arc}; use std::sync::{mpsc, Arc};
use std::{net, thread, time}; use std::{net, thread, time::Duration};
use actix_rt::{net::TcpStream, time::sleep};
use actix_server::Server; use actix_server::Server;
use actix_service::fn_service; use actix_service::fn_service;
use actix_utils::future::ok; use actix_utils::future::ok;
@ -37,7 +38,7 @@ fn test_bind() {
}); });
let (_, sys) = rx.recv().unwrap(); let (_, sys) = rx.recv().unwrap();
thread::sleep(time::Duration::from_millis(500)); thread::sleep(Duration::from_millis(500));
assert!(net::TcpStream::connect(addr).is_ok()); assert!(net::TcpStream::connect(addr).is_ok());
sys.stop(); sys.stop();
let _ = h.join(); let _ = h.join();
@ -64,7 +65,7 @@ fn test_listen() {
}); });
let sys = rx.recv().unwrap(); let sys = rx.recv().unwrap();
thread::sleep(time::Duration::from_millis(500)); thread::sleep(Duration::from_millis(500));
assert!(net::TcpStream::connect(addr).is_ok()); assert!(net::TcpStream::connect(addr).is_ok());
sys.stop(); sys.stop();
let _ = h.join(); let _ = h.join();
@ -73,11 +74,11 @@ fn test_listen() {
#[test] #[test]
#[cfg(unix)] #[cfg(unix)]
fn test_start() { fn test_start() {
use std::io::Read;
use actix_codec::{BytesCodec, Framed}; use actix_codec::{BytesCodec, Framed};
use actix_rt::net::TcpStream;
use bytes::Bytes; use bytes::Bytes;
use futures_util::sink::SinkExt; use futures_util::sink::SinkExt;
use std::io::Read;
let addr = unused_addr(); let addr = unused_addr();
let (tx, rx) = mpsc::channel(); let (tx, rx) = mpsc::channel();
@ -112,16 +113,16 @@ fn test_start() {
// pause // pause
let _ = srv.pause(); let _ = srv.pause();
thread::sleep(time::Duration::from_millis(200)); thread::sleep(Duration::from_millis(200));
let mut conn = net::TcpStream::connect(addr).unwrap(); let mut conn = net::TcpStream::connect(addr).unwrap();
conn.set_read_timeout(Some(time::Duration::from_millis(100))) conn.set_read_timeout(Some(Duration::from_millis(100)))
.unwrap(); .unwrap();
let res = conn.read_exact(&mut buf); let res = conn.read_exact(&mut buf);
assert!(res.is_err()); assert!(res.is_err());
// resume // resume
let _ = srv.resume(); let _ = srv.resume();
thread::sleep(time::Duration::from_millis(100)); thread::sleep(Duration::from_millis(100));
assert!(net::TcpStream::connect(addr).is_ok()); assert!(net::TcpStream::connect(addr).is_ok());
assert!(net::TcpStream::connect(addr).is_ok()); assert!(net::TcpStream::connect(addr).is_ok());
assert!(net::TcpStream::connect(addr).is_ok()); assert!(net::TcpStream::connect(addr).is_ok());
@ -133,10 +134,10 @@ fn test_start() {
// stop // stop
let _ = srv.stop(false); let _ = srv.stop(false);
thread::sleep(time::Duration::from_millis(100)); thread::sleep(Duration::from_millis(100));
assert!(net::TcpStream::connect(addr).is_err()); assert!(net::TcpStream::connect(addr).is_err());
thread::sleep(time::Duration::from_millis(100)); thread::sleep(Duration::from_millis(100));
sys.stop(); sys.stop();
let _ = h.join(); let _ = h.join();
} }
@ -182,7 +183,7 @@ fn test_configure() {
let _ = sys.run(); let _ = sys.run();
}); });
let (_, sys) = rx.recv().unwrap(); let (_, sys) = rx.recv().unwrap();
thread::sleep(time::Duration::from_millis(500)); thread::sleep(Duration::from_millis(500));
assert!(net::TcpStream::connect(addr1).is_ok()); assert!(net::TcpStream::connect(addr1).is_ok());
assert!(net::TcpStream::connect(addr2).is_ok()); assert!(net::TcpStream::connect(addr2).is_ok());
@ -200,7 +201,6 @@ async fn test_max_concurrent_connections() {
// The limit test on the other hand is only for concurrent tcp stream limiting a work // The limit test on the other hand is only for concurrent tcp stream limiting a work
// thread accept. // thread accept.
use actix_rt::net::TcpStream;
use tokio::io::AsyncWriteExt; use tokio::io::AsyncWriteExt;
let addr = unused_addr(); let addr = unused_addr();
@ -226,7 +226,7 @@ async fn test_max_concurrent_connections() {
let counter = counter.clone(); let counter = counter.clone();
async move { async move {
counter.fetch_add(1, Ordering::SeqCst); counter.fetch_add(1, Ordering::SeqCst);
actix_rt::time::sleep(time::Duration::from_secs(20)).await; sleep(Duration::from_secs(20)).await;
counter.fetch_sub(1, Ordering::SeqCst); counter.fetch_sub(1, Ordering::SeqCst);
Ok::<(), ()>(()) Ok::<(), ()>(())
} }
@ -249,7 +249,7 @@ async fn test_max_concurrent_connections() {
conns.push(conn); conns.push(conn);
} }
actix_rt::time::sleep(time::Duration::from_secs(5)).await; sleep(Duration::from_secs(5)).await;
// counter would remain at 3 even with 12 successful connection. // counter would remain at 3 even with 12 successful connection.
// and 9 of them remain in backlog. // and 9 of them remain in backlog.
@ -268,9 +268,7 @@ async fn test_max_concurrent_connections() {
#[actix_rt::test] #[actix_rt::test]
async fn test_service_restart() { async fn test_service_restart() {
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use std::time::Duration;
use actix_rt::{net::TcpStream, time::sleep};
use actix_service::{fn_factory, Service}; use actix_service::{fn_factory, Service};
use futures_core::future::LocalBoxFuture; use futures_core::future::LocalBoxFuture;
use tokio::io::AsyncWriteExt; use tokio::io::AsyncWriteExt;
@ -438,3 +436,143 @@ async fn test_service_restart() {
let _ = server.stop(false); let _ = server.stop(false);
let _ = h.join().unwrap(); let _ = h.join().unwrap();
} }
#[actix_rt::test]
async fn worker_restart() {
use actix_service::{Service, ServiceFactory};
use futures_core::future::LocalBoxFuture;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
struct TestServiceFactory(Arc<AtomicUsize>);
impl ServiceFactory<TcpStream> for TestServiceFactory {
type Response = ();
type Error = ();
type Config = ();
type Service = TestService;
type InitError = ();
type Future = LocalBoxFuture<'static, Result<Self::Service, Self::InitError>>;
fn new_service(&self, _: Self::Config) -> Self::Future {
let counter = self.0.fetch_add(1, Ordering::Relaxed);
Box::pin(async move { Ok(TestService(counter)) })
}
}
struct TestService(usize);
impl Service<TcpStream> for TestService {
type Response = ();
type Error = ();
type Future = LocalBoxFuture<'static, Result<Self::Response, Self::Error>>;
actix_service::always_ready!();
fn call(&self, stream: TcpStream) -> Self::Future {
let counter = self.0;
let mut stream = stream.into_std().unwrap();
use std::io::Write;
let str = counter.to_string();
let buf = str.as_bytes();
let mut written = 0;
while written < buf.len() {
if let Ok(n) = stream.write(&buf[written..]) {
written += n;
}
}
stream.flush().unwrap();
stream.shutdown(net::Shutdown::Write).unwrap();
// force worker 2 to restart service once.
if counter == 2 {
panic!("panic on purpose")
} else {
Box::pin(async { Ok(()) })
}
}
}
let addr = unused_addr();
let (tx, rx) = mpsc::channel();
let counter = Arc::new(AtomicUsize::new(1));
let h = thread::spawn(move || {
let counter = counter.clone();
actix_rt::System::new().block_on(async {
let server = Server::build()
.disable_signals()
.bind("addr", addr, move || TestServiceFactory(counter.clone()))
.unwrap()
.workers(2)
.run();
let _ = tx.send((server.clone(), actix_rt::System::current()));
server.await
})
});
let (server, sys) = rx.recv().unwrap();
sleep(Duration::from_secs(3)).await;
let mut buf = [0; 8];
// worker 1 would not restart and return it's id consistently.
let mut stream = TcpStream::connect(addr).await.unwrap();
let n = stream.read(&mut buf).await.unwrap();
let id = String::from_utf8_lossy(&buf[0..n]);
assert_eq!("1", id);
stream.shutdown().await.unwrap();
// worker 2 dead after return response.
let mut stream = TcpStream::connect(addr).await.unwrap();
let n = stream.read(&mut buf).await.unwrap();
let id = String::from_utf8_lossy(&buf[0..n]);
assert_eq!("2", id);
stream.shutdown().await.unwrap();
// request to worker 1
let mut stream = TcpStream::connect(addr).await.unwrap();
let n = stream.read(&mut buf).await.unwrap();
let id = String::from_utf8_lossy(&buf[0..n]);
assert_eq!("1", id);
stream.shutdown().await.unwrap();
// TODO: Remove sleep if it can pass CI.
sleep(Duration::from_secs(3)).await;
// worker 2 restarting and work goes to worker 1.
let mut stream = TcpStream::connect(addr).await.unwrap();
let n = stream.read(&mut buf).await.unwrap();
let id = String::from_utf8_lossy(&buf[0..n]);
assert_eq!("1", id);
stream.shutdown().await.unwrap();
// TODO: Remove sleep if it can pass CI.
sleep(Duration::from_secs(3)).await;
// worker 2 restarted but worker 1 was still the next to accept connection.
let mut stream = TcpStream::connect(addr).await.unwrap();
let n = stream.read(&mut buf).await.unwrap();
let id = String::from_utf8_lossy(&buf[0..n]);
assert_eq!("1", id);
stream.shutdown().await.unwrap();
// TODO: Remove sleep if it can pass CI.
sleep(Duration::from_secs(3)).await;
// worker 2 accept connection again but it's id is 3.
let mut stream = TcpStream::connect(addr).await.unwrap();
let n = stream.read(&mut buf).await.unwrap();
let id = String::from_utf8_lossy(&buf[0..n]);
assert_eq!("3", id);
stream.shutdown().await.unwrap();
sys.stop();
let _ = server.stop(false);
let _ = h.join().unwrap();
}

View File

@ -22,8 +22,10 @@ path = "src/lib.rs"
[dependencies] [dependencies]
futures-core = { version = "0.3.7", default-features = false } futures-core = { version = "0.3.7", default-features = false }
paste = "1"
pin-project-lite = "0.2" pin-project-lite = "0.2"
[dev-dependencies] [dev-dependencies]
actix-rt = "2.0.0" actix-rt = "2.0.0"
actix-utils = "3.0.0-beta.4"
futures-util = { version = "0.3.7", default-features = false } futures-util = { version = "0.3.7", default-features = false }

View File

@ -3,19 +3,22 @@
use alloc::{boxed::Box, rc::Rc}; use alloc::{boxed::Box, rc::Rc};
use core::{future::Future, pin::Pin}; use core::{future::Future, pin::Pin};
use paste::paste;
use crate::{Service, ServiceFactory}; use crate::{Service, ServiceFactory};
/// A boxed future without a Send bound or lifetime parameters. /// A boxed future with no send bound or lifetime parameters.
pub type BoxFuture<T> = Pin<Box<dyn Future<Output = T>>>; pub type BoxFuture<T> = Pin<Box<dyn Future<Output = T>>>;
macro_rules! service_object { macro_rules! service_object {
($name: ident, $type: tt, $fn_name: ident) => { ($name: ident, $type: tt, $fn_name: ident) => {
/// Type alias for service trait object. paste! {
#[doc = "Type alias for service trait object using `" $type "`."]
pub type $name<Req, Res, Err> = $type< pub type $name<Req, Res, Err> = $type<
dyn Service<Req, Response = Res, Error = Err, Future = BoxFuture<Result<Res, Err>>>, dyn Service<Req, Response = Res, Error = Err, Future = BoxFuture<Result<Res, Err>>>,
>; >;
/// Create service trait object. #[doc = "Wraps service as a trait object using [`" $name "`]."]
pub fn $fn_name<S, Req>(service: S) -> $name<Req, S::Response, S::Error> pub fn $fn_name<S, Req>(service: S) -> $name<Req, S::Response, S::Error>
where where
S: Service<Req> + 'static, S: Service<Req> + 'static,
@ -24,6 +27,7 @@ macro_rules! service_object {
{ {
$type::new(ServiceWrapper::new(service)) $type::new(ServiceWrapper::new(service))
} }
}
}; };
} }
@ -56,10 +60,10 @@ where
} }
} }
/// Wrapper for a service factory trait object that will produce a boxed trait object service. /// Wrapper for a service factory that will map it's services to boxed trait object services.
pub struct BoxServiceFactory<Cfg, Req, Res, Err, InitErr>(Inner<Cfg, Req, Res, Err, InitErr>); pub struct BoxServiceFactory<Cfg, Req, Res, Err, InitErr>(Inner<Cfg, Req, Res, Err, InitErr>);
/// Create service factory trait object. /// Wraps a service factory that returns service trait objects.
pub fn factory<SF, Req>( pub fn factory<SF, Req>(
factory: SF, factory: SF,
) -> BoxServiceFactory<SF::Config, Req, SF::Response, SF::Error, SF::InitError> ) -> BoxServiceFactory<SF::Config, Req, SF::Response, SF::Error, SF::InitError>

View File

@ -2,6 +2,7 @@
#![no_std] #![no_std]
#![deny(rust_2018_idioms, nonstandard_style)] #![deny(rust_2018_idioms, nonstandard_style)]
#![warn(missing_docs)]
#![allow(clippy::type_complexity)] #![allow(clippy::type_complexity)]
#![doc(html_logo_url = "https://actix.rs/img/logo.png")] #![doc(html_logo_url = "https://actix.rs/img/logo.png")]
#![doc(html_favicon_url = "https://actix.rs/favicon.ico")] #![doc(html_favicon_url = "https://actix.rs/favicon.ico")]
@ -53,8 +54,14 @@ use self::ready::{err, ok, ready, Ready};
/// async fn(Request) -> Result<Response, Err> /// async fn(Request) -> Result<Response, Err>
/// ``` /// ```
/// ///
/// The `Service` trait just generalizes this form where each parameter is described as an /// The `Service` trait just generalizes this form. Requests are defined as a generic type parameter
/// associated type on the trait. Services can also have mutable state that influence computation. /// and responses and other details are defined as associated types on the trait impl. Notice that
/// this design means that services can receive many request types and converge them to a single
/// response type.
///
/// Services can also have mutable state that influence computation by using a `Cell`, `RefCell`
/// or `Mutex`. Services intentionally do not take `&mut self` to reduce overhead in the
/// common cases.
/// ///
/// `Service` provides a symmetric and uniform API; the same abstractions can be used to represent /// `Service` provides a symmetric and uniform API; the same abstractions can be used to represent
/// both clients and servers. Services describe only _transformation_ operations which encourage /// both clients and servers. Services describe only _transformation_ operations which encourage
@ -64,11 +71,10 @@ use self::ready::{err, ok, ready, Ready};
/// ```ignore /// ```ignore
/// struct MyService; /// struct MyService;
/// ///
/// impl Service for MyService { /// impl Service<u8> for MyService {
/// type Request = u8;
/// type Response = u64; /// type Response = u64;
/// type Error = MyError; /// type Error = MyError;
/// type Future = Pin<Box<Future<Output=Result<Self::Response, Self::Error>>>>; /// type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>>>>;
/// ///
/// fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { ... } /// fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { ... }
/// ///
@ -77,10 +83,13 @@ use self::ready::{err, ok, ready, Ready};
/// ``` /// ```
/// ///
/// Sometimes it is not necessary to implement the Service trait. For example, the above service /// Sometimes it is not necessary to implement the Service trait. For example, the above service
/// could be rewritten as a simple function and passed to [fn_service](fn_service()). /// could be rewritten as a simple function and passed to [`fn_service`](fn_service()).
/// ///
/// ```ignore /// ```ignore
/// async fn my_service(req: u8) -> Result<u64, MyError>; /// async fn my_service(req: u8) -> Result<u64, MyError>;
///
/// let svc = fn_service(my_service)
/// svc.call(123)
/// ``` /// ```
pub trait Service<Req> { pub trait Service<Req> {
/// Responses given by the service. /// Responses given by the service.
@ -94,13 +103,12 @@ pub trait Service<Req> {
/// Returns `Ready` when the service is able to process requests. /// Returns `Ready` when the service is able to process requests.
/// ///
/// If the service is at capacity, then `Pending` is returned and the task /// If the service is at capacity, then `Pending` is returned and the task is notified when the
/// is notified when the service becomes ready again. This function is /// service becomes ready again. This function is expected to be called while on a task.
/// expected to be called while on a task.
/// ///
/// This is a **best effort** implementation. False positives are permitted. /// This is a best effort implementation. False positives are permitted. It is permitted for
/// It is permitted for the service to return `Ready` from a `poll_ready` /// the service to return `Ready` from a `poll_ready` call and the next invocation of `call`
/// call and the next invocation of `call` results in an error. /// results in an error.
/// ///
/// # Notes /// # Notes
/// 1. `poll_ready` might be called on a different task to `call`. /// 1. `poll_ready` might be called on a different task to `call`.
@ -109,25 +117,26 @@ pub trait Service<Req> {
/// Process the request and return the response asynchronously. /// Process the request and return the response asynchronously.
/// ///
/// This function is expected to be callable off task. As such, /// This function is expected to be callable off-task. As such, implementations of `call` should
/// implementations should take care to not call `poll_ready`. If the /// take care to not call `poll_ready`. If the service is at capacity and the request is unable
/// service is at capacity and the request is unable to be handled, the /// to be handled, the returned `Future` should resolve to an error.
/// returned `Future` should resolve to an error.
/// ///
/// Calling `call` without calling `poll_ready` is permitted. The /// Invoking `call` without first invoking `poll_ready` is permitted. Implementations must be
/// implementation must be resilient to this fact. /// resilient to this fact.
fn call(&self, req: Req) -> Self::Future; fn call(&self, req: Req) -> Self::Future;
} }
/// Factory for creating `Service`s. /// Factory for creating `Service`s.
/// ///
/// Acts as a service factory. This is useful for cases where new `Service`s /// This is useful for cases where new `Service`s must be produced. One case is a TCP
/// must be produced. One case is a TCP server listener. The listener /// server listener: a listener accepts new connections, constructs a new `Service` for each using
/// accepts new TCP streams, obtains a new `Service` using the /// the `ServiceFactory` trait, and uses the new `Service` to process inbound requests on that new
/// `ServiceFactory` trait, and uses the new `Service` to process inbound /// connection.
/// requests on that new TCP stream.
/// ///
/// `Config` is a service factory configuration type. /// `Config` is a service factory configuration type.
///
/// Simple factories may be able to use [`fn_factory`] or [`fn_factory_with_config`] to
/// reduce boilerplate.
pub trait ServiceFactory<Req> { pub trait ServiceFactory<Req> {
/// Responses given by the created services. /// Responses given by the created services.
type Response; type Response;
@ -144,7 +153,7 @@ pub trait ServiceFactory<Req> {
/// Errors potentially raised while building a service. /// Errors potentially raised while building a service.
type InitError; type InitError;
/// The future of the `Service` instance. /// The future of the `Service` instance.g
type Future: Future<Output = Result<Self::Service, Self::InitError>>; type Future: Future<Output = Result<Self::Service, Self::InitError>>;
/// Create and return a new service asynchronously. /// Create and return a new service asynchronously.

View File

@ -1,6 +1,6 @@
/// A boilerplate implementation of [`Service::poll_ready`] that always signals readiness. /// An implementation of [`poll_ready`]() that always signals readiness.
/// ///
/// [`Service::poll_ready`]: crate::Service::poll_ready /// [`poll_ready`]: crate::Service::poll_ready
/// ///
/// # Examples /// # Examples
/// ```no_run /// ```no_run
@ -34,12 +34,12 @@ macro_rules! always_ready {
}; };
} }
/// A boilerplate implementation of [`Service::poll_ready`] that forwards readiness checks to a /// An implementation of [`poll_ready`] that forwards readiness checks to a
/// named struct field. /// named struct field.
/// ///
/// Tuple structs are not supported. /// Tuple structs are not supported.
/// ///
/// [`Service::poll_ready`]: crate::Service::poll_ready /// [`poll_ready`]: crate::Service::poll_ready
/// ///
/// # Examples /// # Examples
/// ```no_run /// ```no_run

View File

@ -27,7 +27,7 @@ where
/// Transform(middleware) wraps inner service and runs during inbound and/or outbound processing in /// Transform(middleware) wraps inner service and runs during inbound and/or outbound processing in
/// the request/response lifecycle. It may modify request and/or response. /// the request/response lifecycle. It may modify request and/or response.
/// ///
/// For example, timeout transform: /// For example, a timeout service wrapper:
/// ///
/// ```ignore /// ```ignore
/// pub struct Timeout<S> { /// pub struct Timeout<S> {
@ -35,11 +35,7 @@ where
/// timeout: Duration, /// timeout: Duration,
/// } /// }
/// ///
/// impl<S> Service for Timeout<S> /// impl<S: Service<Req>, Req> Service<Req> for Timeout<S> {
/// where
/// S: Service,
/// {
/// type Request = S::Request;
/// type Response = S::Response; /// type Response = S::Response;
/// type Error = TimeoutError<S::Error>; /// type Error = TimeoutError<S::Error>;
/// type Future = TimeoutServiceResponse<S>; /// type Future = TimeoutServiceResponse<S>;
@ -55,26 +51,22 @@ where
/// } /// }
/// ``` /// ```
/// ///
/// Timeout service in above example is decoupled from underlying service implementation and could /// This wrapper service is decoupled from the underlying service implementation and could be
/// be applied to any service. /// applied to any service.
/// ///
/// The `Transform` trait defines the interface of a Service factory. `Transform` is often /// The `Transform` trait defines the interface of a service wrapper. `Transform` is often
/// implemented for middleware, defining how to construct a middleware Service. A Service that is /// implemented for middleware, defining how to construct a middleware Service. A Service that is
/// constructed by the factory takes the Service that follows it during execution as a parameter, /// constructed by the factory takes the Service that follows it during execution as a parameter,
/// assuming ownership of the next Service. /// assuming ownership of the next Service.
/// ///
/// Factory for `Timeout` middleware from the above example could look like this: /// A transform for the `Timeout` middleware could look like this:
/// ///
/// ```ignore /// ```ignore
/// pub struct TimeoutTransform { /// pub struct TimeoutTransform {
/// timeout: Duration, /// timeout: Duration,
/// } /// }
/// ///
/// impl<S> Transform<S> for TimeoutTransform /// impl<S: Service<Req>, Req> Transform<S, Req> for TimeoutTransform {
/// where
/// S: Service,
/// {
/// type Request = S::Request;
/// type Response = S::Response; /// type Response = S::Response;
/// type Error = TimeoutError<S::Error>; /// type Error = TimeoutError<S::Error>;
/// type InitError = S::Error; /// type InitError = S::Error;
@ -82,7 +74,7 @@ where
/// type Future = Ready<Result<Self::Transform, Self::InitError>>; /// type Future = Ready<Result<Self::Transform, Self::InitError>>;
/// ///
/// fn new_transform(&self, service: S) -> Self::Future { /// fn new_transform(&self, service: S) -> Self::Future {
/// ready(Ok(TimeoutService { /// ready(Ok(Timeout {
/// service, /// service,
/// timeout: self.timeout, /// timeout: self.timeout,
/// })) /// }))
@ -227,3 +219,53 @@ where
} }
} }
} }
#[cfg(test)]
mod tests {
use core::time::Duration;
use actix_utils::future::{ready, Ready};
use super::*;
use crate::Service;
// pseudo-doctest for Transform trait
pub struct TimeoutTransform {
timeout: Duration,
}
// pseudo-doctest for Transform trait
impl<S: Service<Req>, Req> Transform<S, Req> for TimeoutTransform {
type Response = S::Response;
type Error = S::Error;
type InitError = S::Error;
type Transform = Timeout<S>;
type Future = Ready<Result<Self::Transform, Self::InitError>>;
fn new_transform(&self, service: S) -> Self::Future {
ready(Ok(Timeout {
service,
_timeout: self.timeout,
}))
}
}
// pseudo-doctest for Transform trait
pub struct Timeout<S> {
service: S,
_timeout: Duration,
}
// pseudo-doctest for Transform trait
impl<S: Service<Req>, Req> Service<Req> for Timeout<S> {
type Response = S::Response;
type Error = S::Error;
type Future = S::Future;
crate::forward_ready!(service);
fn call(&self, req: Req) -> Self::Future {
self.service.call(req)
}
}
}