merge master

This commit is contained in:
fakeshadow 2021-04-16 08:09:14 +08:00
commit 8b5dde997c
16 changed files with 355 additions and 112 deletions

View File

@ -5,8 +5,11 @@
* Remove `Future` impl for `ServerBuilder`. [#266]
* Rename `Server` to `ServerHandle`. `ServerHandle` must be explicitly constructed with `Server::handle` API. [#266]
* Add `Server`(new type) that can be `await` for blocking until server stop. [#266]
* Server shutdown would notify all workers to exit regardless if shutdown is graceful.
This would make all worker shutdown immediately in force shutdown case. [#333]
[#266]: https://github.com/actix/actix-net/pull/266
[#333]: https://github.com/actix/actix-net/pull/333
## 2.0.0-beta.4 - 2021-04-01

View File

@ -19,14 +19,14 @@ use std::{
use actix_rt::net::TcpStream;
use actix_server::Server;
use actix_service::pipeline_factory;
use actix_service::{fn_service, ServiceFactoryExt as _};
use bytes::BytesMut;
use log::{error, info};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
#[tokio::main]
async fn main() -> io::Result<()> {
env::set_var("RUST_LOG", "actix=trace,basic=trace");
env::set_var("RUST_LOG", "info");
env_logger::init();
let count = Arc::new(AtomicUsize::new(0));
@ -42,7 +42,7 @@ async fn main() -> io::Result<()> {
let count = Arc::clone(&count);
let num2 = Arc::clone(&count);
pipeline_factory(move |mut stream: TcpStream| {
fn_service(move |mut stream: TcpStream| {
let count = Arc::clone(&count);
async move {

View File

@ -125,6 +125,8 @@ impl WorkerAvailability {
///
/// Worker accepts Socket objects via unbounded channel and starts stream processing.
pub(crate) struct ServerWorker {
// UnboundedReceiver<Conn> should always be the first field.
// It must be dropped as soon as ServerWorker dropping.
rx: UnboundedReceiver<Conn>,
rx2: UnboundedReceiver<Stop>,
services: Box<[WorkerService]>,
@ -422,12 +424,6 @@ impl Default for WorkerState {
}
}
impl Drop for ServerWorker {
fn drop(&mut self) {
Arbiter::current().stop();
}
}
impl Future for ServerWorker {
type Output = ();

View File

@ -1,7 +1,8 @@
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{mpsc, Arc};
use std::{net, thread, time};
use std::{net, thread, time::Duration};
use actix_rt::{net::TcpStream, time::sleep};
use actix_server::Server;
use actix_service::fn_service;
use actix_utils::future::ok;
@ -35,7 +36,7 @@ fn test_bind() {
});
let handle = rx.recv().unwrap();
thread::sleep(time::Duration::from_millis(500));
thread::sleep(Duration::from_millis(500));
assert!(net::TcpStream::connect(addr).is_ok());
let _ = handle.stop(true);
let _ = h.join().unwrap();
@ -67,7 +68,7 @@ fn test_listen() {
let handle = rx.recv().unwrap();
thread::sleep(time::Duration::from_millis(500));
thread::sleep(Duration::from_millis(500));
assert!(net::TcpStream::connect(addr).is_ok());
let _ = handle.stop(true);
let _ = h.join().unwrap();
@ -76,11 +77,11 @@ fn test_listen() {
#[test]
#[cfg(unix)]
fn test_start() {
use std::io::Read;
use actix_codec::{BytesCodec, Framed};
use actix_rt::net::TcpStream;
use bytes::Bytes;
use futures_util::sink::SinkExt;
use std::io::Read;
let addr = unused_addr();
let (tx, rx) = mpsc::channel();
@ -114,16 +115,16 @@ fn test_start() {
// pause
let _ = srv.pause();
thread::sleep(time::Duration::from_millis(200));
thread::sleep(Duration::from_millis(200));
let mut conn = net::TcpStream::connect(addr).unwrap();
conn.set_read_timeout(Some(time::Duration::from_millis(100)))
conn.set_read_timeout(Some(Duration::from_millis(100)))
.unwrap();
let res = conn.read_exact(&mut buf);
assert!(res.is_err());
// resume
let _ = srv.resume();
thread::sleep(time::Duration::from_millis(100));
thread::sleep(Duration::from_millis(100));
assert!(net::TcpStream::connect(addr).is_ok());
assert!(net::TcpStream::connect(addr).is_ok());
assert!(net::TcpStream::connect(addr).is_ok());
@ -135,10 +136,10 @@ fn test_start() {
// stop
let _ = srv.stop(false);
thread::sleep(time::Duration::from_millis(100));
thread::sleep(Duration::from_millis(100));
assert!(net::TcpStream::connect(addr).is_err());
thread::sleep(time::Duration::from_millis(100));
thread::sleep(Duration::from_millis(100));
sys.stop();
let _ = h.join();
}
@ -182,8 +183,9 @@ fn test_configure() {
let _ = server.await;
});
});
let (server, sys) = rx.recv().unwrap();
thread::sleep(time::Duration::from_millis(500));
thread::sleep(Duration::from_millis(500));
assert!(net::TcpStream::connect(addr1).is_ok());
assert!(net::TcpStream::connect(addr2).is_ok());
@ -202,7 +204,6 @@ async fn test_max_concurrent_connections() {
// The limit test on the other hand is only for concurrent tcp stream limiting a work
// thread accept.
use actix_rt::net::TcpStream;
use tokio::io::AsyncWriteExt;
let addr = unused_addr();
@ -228,7 +229,7 @@ async fn test_max_concurrent_connections() {
let counter = counter.clone();
async move {
counter.fetch_add(1, Ordering::SeqCst);
actix_rt::time::sleep(time::Duration::from_secs(20)).await;
sleep(Duration::from_secs(20)).await;
counter.fetch_sub(1, Ordering::SeqCst);
Ok::<(), ()>(())
}
@ -251,7 +252,7 @@ async fn test_max_concurrent_connections() {
conns.push(conn);
}
actix_rt::time::sleep(time::Duration::from_secs(5)).await;
sleep(Duration::from_secs(5)).await;
// counter would remain at 3 even with 12 successful connection.
// and 9 of them remain in backlog.
@ -270,9 +271,7 @@ async fn test_max_concurrent_connections() {
#[actix_rt::test]
async fn test_service_restart() {
use std::task::{Context, Poll};
use std::time::Duration;
use actix_rt::{net::TcpStream, time::sleep};
use actix_service::{fn_factory, Service};
use futures_core::future::LocalBoxFuture;
use tokio::io::AsyncWriteExt;
@ -440,3 +439,143 @@ async fn test_service_restart() {
let _ = server.stop(false);
let _ = h.join().unwrap();
}
#[actix_rt::test]
async fn worker_restart() {
use actix_service::{Service, ServiceFactory};
use futures_core::future::LocalBoxFuture;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
struct TestServiceFactory(Arc<AtomicUsize>);
impl ServiceFactory<TcpStream> for TestServiceFactory {
type Response = ();
type Error = ();
type Config = ();
type Service = TestService;
type InitError = ();
type Future = LocalBoxFuture<'static, Result<Self::Service, Self::InitError>>;
fn new_service(&self, _: Self::Config) -> Self::Future {
let counter = self.0.fetch_add(1, Ordering::Relaxed);
Box::pin(async move { Ok(TestService(counter)) })
}
}
struct TestService(usize);
impl Service<TcpStream> for TestService {
type Response = ();
type Error = ();
type Future = LocalBoxFuture<'static, Result<Self::Response, Self::Error>>;
actix_service::always_ready!();
fn call(&self, stream: TcpStream) -> Self::Future {
let counter = self.0;
let mut stream = stream.into_std().unwrap();
use std::io::Write;
let str = counter.to_string();
let buf = str.as_bytes();
let mut written = 0;
while written < buf.len() {
if let Ok(n) = stream.write(&buf[written..]) {
written += n;
}
}
stream.flush().unwrap();
stream.shutdown(net::Shutdown::Write).unwrap();
// force worker 2 to restart service once.
if counter == 2 {
panic!("panic on purpose")
} else {
Box::pin(async { Ok(()) })
}
}
}
let addr = unused_addr();
let (tx, rx) = mpsc::channel();
let counter = Arc::new(AtomicUsize::new(1));
let h = thread::spawn(move || {
let counter = counter.clone();
actix_rt::System::new().block_on(async {
let server = Server::build()
.disable_signals()
.bind("addr", addr, move || TestServiceFactory(counter.clone()))
.unwrap()
.workers(2)
.run();
let _ = tx.send((server.handle(), actix_rt::System::current()));
server.await
})
});
let (server, sys) = rx.recv().unwrap();
sleep(Duration::from_secs(3)).await;
let mut buf = [0; 8];
// worker 1 would not restart and return it's id consistently.
let mut stream = TcpStream::connect(addr).await.unwrap();
let n = stream.read(&mut buf).await.unwrap();
let id = String::from_utf8_lossy(&buf[0..n]);
assert_eq!("1", id);
stream.shutdown().await.unwrap();
// worker 2 dead after return response.
let mut stream = TcpStream::connect(addr).await.unwrap();
let n = stream.read(&mut buf).await.unwrap();
let id = String::from_utf8_lossy(&buf[0..n]);
assert_eq!("2", id);
stream.shutdown().await.unwrap();
// request to worker 1
let mut stream = TcpStream::connect(addr).await.unwrap();
let n = stream.read(&mut buf).await.unwrap();
let id = String::from_utf8_lossy(&buf[0..n]);
assert_eq!("1", id);
stream.shutdown().await.unwrap();
// TODO: Remove sleep if it can pass CI.
sleep(Duration::from_secs(3)).await;
// worker 2 restarting and work goes to worker 1.
let mut stream = TcpStream::connect(addr).await.unwrap();
let n = stream.read(&mut buf).await.unwrap();
let id = String::from_utf8_lossy(&buf[0..n]);
assert_eq!("1", id);
stream.shutdown().await.unwrap();
// TODO: Remove sleep if it can pass CI.
sleep(Duration::from_secs(3)).await;
// worker 2 restarted but worker 1 was still the next to accept connection.
let mut stream = TcpStream::connect(addr).await.unwrap();
let n = stream.read(&mut buf).await.unwrap();
let id = String::from_utf8_lossy(&buf[0..n]);
assert_eq!("1", id);
stream.shutdown().await.unwrap();
// TODO: Remove sleep if it can pass CI.
sleep(Duration::from_secs(3)).await;
// worker 2 accept connection again but it's id is 3.
let mut stream = TcpStream::connect(addr).await.unwrap();
let n = stream.read(&mut buf).await.unwrap();
let id = String::from_utf8_lossy(&buf[0..n]);
assert_eq!("3", id);
stream.shutdown().await.unwrap();
sys.stop();
let _ = server.stop(false);
let _ = h.join().unwrap();
}

View File

@ -1,6 +1,9 @@
# Changes
## Unreleased - 2021-xx-xx
* Removed pipeline and related structs/functions. [#335]
[#335]: https://github.com/actix/actix-net/pull/335
## 2.0.0-beta.5 - 2021-03-15

View File

@ -22,8 +22,10 @@ path = "src/lib.rs"
[dependencies]
futures-core = { version = "0.3.7", default-features = false }
paste = "1"
pin-project-lite = "0.2"
[dev-dependencies]
actix-rt = "2.0.0"
actix-utils = "3.0.0-beta.4"
futures-util = { version = "0.3.7", default-features = false }

View File

@ -11,11 +11,11 @@ use pin_project_lite::pin_project;
use super::{Service, ServiceFactory};
/// Service for the `and_then` combinator, chaining a computation onto the end
/// of another service which completes successfully.
/// Service for the `and_then` combinator, chaining a computation onto the end of another service
/// which completes successfully.
///
/// This is created by the `Pipeline::and_then` method.
pub(crate) struct AndThenService<A, B, Req>(Rc<(A, B)>, PhantomData<Req>);
pub struct AndThenService<A, B, Req>(Rc<(A, B)>, PhantomData<Req>);
impl<A, B, Req> AndThenService<A, B, Req> {
/// Create new `AndThen` combinator
@ -64,7 +64,7 @@ where
}
pin_project! {
pub(crate) struct AndThenServiceResponse<A, B, Req>
pub struct AndThenServiceResponse<A, B, Req>
where
A: Service<Req>,
B: Service<A::Response, Error = A::Error>,
@ -117,7 +117,7 @@ where
}
/// `.and_then()` service factory combinator
pub(crate) struct AndThenServiceFactory<A, B, Req>
pub struct AndThenServiceFactory<A, B, Req>
where
A: ServiceFactory<Req>,
A::Config: Clone,
@ -200,7 +200,7 @@ where
}
pin_project! {
pub(crate) struct AndThenServiceFactoryResponse<A, B, Req>
pub struct AndThenServiceFactoryResponse<A, B, Req>
where
A: ServiceFactory<Req>,
B: ServiceFactory<A::Response>,
@ -272,7 +272,9 @@ mod tests {
use futures_util::future::lazy;
use crate::{
fn_factory, ok, pipeline, pipeline_factory, ready, Ready, Service, ServiceFactory,
fn_factory, ok,
pipeline::{pipeline, pipeline_factory},
ready, Ready, Service, ServiceFactory,
};
struct Srv1(Rc<Cell<usize>>);

View File

@ -214,7 +214,11 @@ mod tests {
use futures_util::future::lazy;
use super::*;
use crate::{ok, pipeline, pipeline_factory, Ready, Service, ServiceFactory};
use crate::{
ok,
pipeline::{pipeline, pipeline_factory},
Ready, Service, ServiceFactory,
};
#[derive(Clone)]
struct Srv;

View File

@ -3,19 +3,22 @@
use alloc::{boxed::Box, rc::Rc};
use core::{future::Future, pin::Pin};
use paste::paste;
use crate::{Service, ServiceFactory};
/// A boxed future without a Send bound or lifetime parameters.
/// A boxed future with no send bound or lifetime parameters.
pub type BoxFuture<T> = Pin<Box<dyn Future<Output = T>>>;
macro_rules! service_object {
($name: ident, $type: tt, $fn_name: ident) => {
/// Type alias for service trait object.
paste! {
#[doc = "Type alias for service trait object using `" $type "`."]
pub type $name<Req, Res, Err> = $type<
dyn Service<Req, Response = Res, Error = Err, Future = BoxFuture<Result<Res, Err>>>,
>;
/// Create service trait object.
#[doc = "Wraps service as a trait object using [`" $name "`]."]
pub fn $fn_name<S, Req>(service: S) -> $name<Req, S::Response, S::Error>
where
S: Service<Req> + 'static,
@ -24,6 +27,7 @@ macro_rules! service_object {
{
$type::new(ServiceWrapper::new(service))
}
}
};
}
@ -56,10 +60,10 @@ where
}
}
/// Wrapper for a service factory trait object that will produce a boxed trait object service.
/// Wrapper for a service factory that will map it's services to boxed trait object services.
pub struct BoxServiceFactory<Cfg, Req, Res, Err, InitErr>(Inner<Cfg, Req, Res, Err, InitErr>);
/// Create service factory trait object.
/// Wraps a service factory that returns service trait objects.
pub fn factory<SF, Req>(
factory: SF,
) -> BoxServiceFactory<SF::Config, Req, SF::Response, SF::Error, SF::InitError>

View File

@ -1,8 +1,12 @@
use crate::{
map::Map, map_err::MapErr, transform_err::TransformMapInitErr, Service, ServiceFactory,
Transform,
and_then::{AndThenService, AndThenServiceFactory},
map::Map,
map_err::MapErr,
transform_err::TransformMapInitErr,
IntoService, IntoServiceFactory, Service, ServiceFactory, Transform,
};
/// An extension trait for [`Service`]s that provides a variety of convenient adapters.
pub trait ServiceExt<Req>: Service<Req> {
/// Map this service's output to a different type, returning a new service
/// of the resulting type.
@ -36,10 +40,27 @@ pub trait ServiceExt<Req>: Service<Req> {
{
MapErr::new(self, f)
}
/// Call another service after call to this one has resolved successfully.
///
/// This function can be used to chain two services together and ensure that the second service
/// isn't called until call to the fist service have finished. Result of the call to the first
/// service is used as an input parameter for the second service's call.
///
/// Note that this function consumes the receiving service and returns a wrapped version of it.
fn and_then<I, S1>(self, service: I) -> AndThenService<Self, S1, Req>
where
Self: Sized,
I: IntoService<S1, Self::Response>,
S1: Service<Self::Response, Error = Self::Error>,
{
AndThenService::new(self, service.into_service())
}
}
impl<S, Req> ServiceExt<Req> for S where S: Service<Req> {}
/// An extension trait for [`ServiceFactory`]s that provides a variety of convenient adapters.
pub trait ServiceFactoryExt<Req>: ServiceFactory<Req> {
/// Map this service's output to a different type, returning a new service
/// of the resulting type.
@ -68,10 +89,27 @@ pub trait ServiceFactoryExt<Req>: ServiceFactory<Req> {
{
crate::map_init_err::MapInitErr::new(self, f)
}
/// Call another service after call to this one has resolved successfully.
fn and_then<I, SF1>(self, factory: I) -> AndThenServiceFactory<Self, SF1, Req>
where
Self: Sized,
Self::Config: Clone,
I: IntoServiceFactory<SF1, Self::Response>,
SF1: ServiceFactory<
Self::Response,
Config = Self::Config,
Error = Self::Error,
InitError = Self::InitError,
>,
{
AndThenServiceFactory::new(self, factory.into_factory())
}
}
impl<SF, Req> ServiceFactoryExt<Req> for SF where SF: ServiceFactory<Req> {}
/// An extension trait for [`Transform`]s that provides a variety of convenient adapters.
pub trait TransformExt<S, Req>: Transform<S, Req> {
/// Return a new `Transform` whose init error is mapped to to a different type.
fn map_init_err<F, E>(self, f: F) -> TransformMapInitErr<Self, S, Req, F, E>

View File

@ -2,6 +2,7 @@
#![no_std]
#![deny(rust_2018_idioms, nonstandard_style)]
#![warn(missing_docs)]
#![allow(clippy::type_complexity)]
#![doc(html_logo_url = "https://actix.rs/img/logo.png")]
#![doc(html_favicon_url = "https://actix.rs/favicon.ico")]
@ -37,7 +38,6 @@ pub use self::apply_cfg::{apply_cfg, apply_cfg_factory};
pub use self::ext::{ServiceExt, ServiceFactoryExt, TransformExt};
pub use self::fn_service::{fn_factory, fn_factory_with_config, fn_service};
pub use self::map_config::{map_config, unit_config};
pub use self::pipeline::{pipeline, pipeline_factory, Pipeline, PipelineFactory};
pub use self::transform::{apply, ApplyTransform, Transform};
#[allow(unused_imports)]
@ -53,8 +53,14 @@ use self::ready::{err, ok, ready, Ready};
/// async fn(Request) -> Result<Response, Err>
/// ```
///
/// The `Service` trait just generalizes this form where each parameter is described as an
/// associated type on the trait. Services can also have mutable state that influence computation.
/// The `Service` trait just generalizes this form. Requests are defined as a generic type parameter
/// and responses and other details are defined as associated types on the trait impl. Notice that
/// this design means that services can receive many request types and converge them to a single
/// response type.
///
/// Services can also have mutable state that influence computation by using a `Cell`, `RefCell`
/// or `Mutex`. Services intentionally do not take `&mut self` to reduce overhead in the
/// common cases.
///
/// `Service` provides a symmetric and uniform API; the same abstractions can be used to represent
/// both clients and servers. Services describe only _transformation_ operations which encourage
@ -64,11 +70,10 @@ use self::ready::{err, ok, ready, Ready};
/// ```ignore
/// struct MyService;
///
/// impl Service for MyService {
/// type Request = u8;
/// impl Service<u8> for MyService {
/// type Response = u64;
/// type Error = MyError;
/// type Future = Pin<Box<Future<Output=Result<Self::Response, Self::Error>>>>;
/// type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>>>>;
///
/// fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { ... }
///
@ -77,10 +82,13 @@ use self::ready::{err, ok, ready, Ready};
/// ```
///
/// Sometimes it is not necessary to implement the Service trait. For example, the above service
/// could be rewritten as a simple function and passed to [fn_service](fn_service()).
/// could be rewritten as a simple function and passed to [`fn_service`](fn_service()).
///
/// ```ignore
/// async fn my_service(req: u8) -> Result<u64, MyError>;
///
/// let svc = fn_service(my_service)
/// svc.call(123)
/// ```
pub trait Service<Req> {
/// Responses given by the service.
@ -94,13 +102,12 @@ pub trait Service<Req> {
/// Returns `Ready` when the service is able to process requests.
///
/// If the service is at capacity, then `Pending` is returned and the task
/// is notified when the service becomes ready again. This function is
/// expected to be called while on a task.
/// If the service is at capacity, then `Pending` is returned and the task is notified when the
/// service becomes ready again. This function is expected to be called while on a task.
///
/// This is a **best effort** implementation. False positives are permitted.
/// It is permitted for the service to return `Ready` from a `poll_ready`
/// call and the next invocation of `call` results in an error.
/// This is a best effort implementation. False positives are permitted. It is permitted for
/// the service to return `Ready` from a `poll_ready` call and the next invocation of `call`
/// results in an error.
///
/// # Notes
/// 1. `poll_ready` might be called on a different task to `call`.
@ -109,25 +116,26 @@ pub trait Service<Req> {
/// Process the request and return the response asynchronously.
///
/// This function is expected to be callable off task. As such,
/// implementations should take care to not call `poll_ready`. If the
/// service is at capacity and the request is unable to be handled, the
/// returned `Future` should resolve to an error.
/// This function is expected to be callable off-task. As such, implementations of `call` should
/// take care to not call `poll_ready`. If the service is at capacity and the request is unable
/// to be handled, the returned `Future` should resolve to an error.
///
/// Calling `call` without calling `poll_ready` is permitted. The
/// implementation must be resilient to this fact.
/// Invoking `call` without first invoking `poll_ready` is permitted. Implementations must be
/// resilient to this fact.
fn call(&self, req: Req) -> Self::Future;
}
/// Factory for creating `Service`s.
///
/// Acts as a service factory. This is useful for cases where new `Service`s
/// must be produced. One case is a TCP server listener. The listener
/// accepts new TCP streams, obtains a new `Service` using the
/// `ServiceFactory` trait, and uses the new `Service` to process inbound
/// requests on that new TCP stream.
/// This is useful for cases where new `Service`s must be produced. One case is a TCP
/// server listener: a listener accepts new connections, constructs a new `Service` for each using
/// the `ServiceFactory` trait, and uses the new `Service` to process inbound requests on that new
/// connection.
///
/// `Config` is a service factory configuration type.
///
/// Simple factories may be able to use [`fn_factory`] or [`fn_factory_with_config`] to
/// reduce boilerplate.
pub trait ServiceFactory<Req> {
/// Responses given by the created services.
type Response;
@ -144,7 +152,7 @@ pub trait ServiceFactory<Req> {
/// Errors potentially raised while building a service.
type InitError;
/// The future of the `Service` instance.
/// The future of the `Service` instance.g
type Future: Future<Output = Result<Self::Service, Self::InitError>>;
/// Create and return a new service asynchronously.

View File

@ -1,6 +1,6 @@
/// A boilerplate implementation of [`Service::poll_ready`] that always signals readiness.
/// An implementation of [`poll_ready`]() that always signals readiness.
///
/// [`Service::poll_ready`]: crate::Service::poll_ready
/// [`poll_ready`]: crate::Service::poll_ready
///
/// # Examples
/// ```no_run
@ -34,12 +34,12 @@ macro_rules! always_ready {
};
}
/// A boilerplate implementation of [`Service::poll_ready`] that forwards readiness checks to a
/// An implementation of [`poll_ready`] that forwards readiness checks to a
/// named struct field.
///
/// Tuple structs are not supported.
///
/// [`Service::poll_ready`]: crate::Service::poll_ready
/// [`poll_ready`]: crate::Service::poll_ready
///
/// # Examples
/// ```no_run

View File

@ -1,3 +1,6 @@
// TODO: see if pipeline is necessary
#![allow(dead_code)]
use core::{
marker::PhantomData,
task::{Context, Poll},
@ -11,7 +14,7 @@ use crate::then::{ThenService, ThenServiceFactory};
use crate::{IntoService, IntoServiceFactory, Service, ServiceFactory};
/// Construct new pipeline with one service in pipeline chain.
pub fn pipeline<I, S, Req>(service: I) -> Pipeline<S, Req>
pub(crate) fn pipeline<I, S, Req>(service: I) -> Pipeline<S, Req>
where
I: IntoService<S, Req>,
S: Service<Req>,
@ -23,7 +26,7 @@ where
}
/// Construct new pipeline factory with one service factory.
pub fn pipeline_factory<I, SF, Req>(factory: I) -> PipelineFactory<SF, Req>
pub(crate) fn pipeline_factory<I, SF, Req>(factory: I) -> PipelineFactory<SF, Req>
where
I: IntoServiceFactory<SF, Req>,
SF: ServiceFactory<Req>,
@ -35,7 +38,7 @@ where
}
/// Pipeline service - pipeline allows to compose multiple service into one service.
pub struct Pipeline<S, Req> {
pub(crate) struct Pipeline<S, Req> {
service: S,
_phantom: PhantomData<Req>,
}
@ -157,7 +160,7 @@ impl<S: Service<Req>, Req> Service<Req> for Pipeline<S, Req> {
}
/// Pipeline factory
pub struct PipelineFactory<SF, Req> {
pub(crate) struct PipelineFactory<SF, Req> {
factory: SF,
_phantom: PhantomData<Req>,
}

View File

@ -246,7 +246,11 @@ mod tests {
use futures_util::future::lazy;
use crate::{err, ok, pipeline, pipeline_factory, ready, Ready, Service, ServiceFactory};
use crate::{
err, ok,
pipeline::{pipeline, pipeline_factory},
ready, Ready, Service, ServiceFactory,
};
#[derive(Clone)]
struct Srv1(Rc<Cell<usize>>);

View File

@ -21,13 +21,12 @@ where
ApplyTransform::new(t, factory.into_factory())
}
/// The `Transform` trait defines the interface of a service factory that wraps inner service
/// during construction.
/// Defines the interface of a service factory that wraps inner service during construction.
///
/// Transform(middleware) wraps inner service and runs during inbound and/or outbound processing in
/// the request/response lifecycle. It may modify request and/or response.
/// Transformers wrap an inner service and runs during inbound and/or outbound processing in the
/// service lifecycle. It may modify request and/or response.
///
/// For example, timeout transform:
/// For example, a timeout service wrapper:
///
/// ```ignore
/// pub struct Timeout<S> {
@ -35,11 +34,7 @@ where
/// timeout: Duration,
/// }
///
/// impl<S> Service for Timeout<S>
/// where
/// S: Service,
/// {
/// type Request = S::Request;
/// impl<S: Service<Req>, Req> Service<Req> for Timeout<S> {
/// type Response = S::Response;
/// type Error = TimeoutError<S::Error>;
/// type Future = TimeoutServiceResponse<S>;
@ -55,26 +50,22 @@ where
/// }
/// ```
///
/// Timeout service in above example is decoupled from underlying service implementation and could
/// be applied to any service.
/// This wrapper service is decoupled from the underlying service implementation and could be
/// applied to any service.
///
/// The `Transform` trait defines the interface of a Service factory. `Transform` is often
/// The `Transform` trait defines the interface of a service wrapper. `Transform` is often
/// implemented for middleware, defining how to construct a middleware Service. A Service that is
/// constructed by the factory takes the Service that follows it during execution as a parameter,
/// assuming ownership of the next Service.
///
/// Factory for `Timeout` middleware from the above example could look like this:
/// A transform for the `Timeout` middleware could look like this:
///
/// ```ignore
/// pub struct TimeoutTransform {
/// timeout: Duration,
/// }
///
/// impl<S> Transform<S> for TimeoutTransform
/// where
/// S: Service,
/// {
/// type Request = S::Request;
/// impl<S: Service<Req>, Req> Transform<S, Req> for TimeoutTransform {
/// type Response = S::Response;
/// type Error = TimeoutError<S::Error>;
/// type InitError = S::Error;
@ -82,7 +73,7 @@ where
/// type Future = Ready<Result<Self::Transform, Self::InitError>>;
///
/// fn new_transform(&self, service: S) -> Self::Future {
/// ready(Ok(TimeoutService {
/// ready(Ok(Timeout {
/// service,
/// timeout: self.timeout,
/// }))
@ -227,3 +218,53 @@ where
}
}
}
#[cfg(test)]
mod tests {
use core::time::Duration;
use actix_utils::future::{ready, Ready};
use super::*;
use crate::Service;
// pseudo-doctest for Transform trait
pub struct TimeoutTransform {
timeout: Duration,
}
// pseudo-doctest for Transform trait
impl<S: Service<Req>, Req> Transform<S, Req> for TimeoutTransform {
type Response = S::Response;
type Error = S::Error;
type InitError = S::Error;
type Transform = Timeout<S>;
type Future = Ready<Result<Self::Transform, Self::InitError>>;
fn new_transform(&self, service: S) -> Self::Future {
ready(Ok(Timeout {
service,
_timeout: self.timeout,
}))
}
}
// pseudo-doctest for Transform trait
pub struct Timeout<S> {
service: S,
_timeout: Duration,
}
// pseudo-doctest for Transform trait
impl<S: Service<Req>, Req> Service<Req> for Timeout<S> {
type Response = S::Response;
type Error = S::Error;
type Future = S::Future;
crate::forward_ready!(service);
fn call(&self, req: Req) -> Self::Future {
self.service.call(req)
}
}
}

View File

@ -31,7 +31,7 @@ use std::{
use actix_rt::net::TcpStream;
use actix_server::Server;
use actix_service::pipeline_factory;
use actix_service::ServiceFactoryExt as _;
use actix_tls::accept::rustls::{Acceptor as RustlsAcceptor, TlsStream};
use futures_util::future::ok;
use log::info;
@ -39,14 +39,9 @@ use rustls::{
internal::pemfile::certs, internal::pemfile::rsa_private_keys, NoClientAuth, ServerConfig,
};
#[derive(Debug)]
struct ServiceState {
num: Arc<AtomicUsize>,
}
#[actix_rt::main]
async fn main() -> io::Result<()> {
env::set_var("RUST_LOG", "actix=trace,basic=trace");
env::set_var("RUST_LOG", "info");
env_logger::init();
let mut tls_config = ServerConfig::new(NoClientAuth::new());
@ -73,7 +68,8 @@ async fn main() -> io::Result<()> {
let count = Arc::clone(&count);
// Set up TLS service factory
pipeline_factory(tls_acceptor.clone())
tls_acceptor
.clone()
.map_err(|err| println!("Rustls error: {:?}", err))
.and_then(move |stream: TlsStream<TcpStream>| {
let num = count.fetch_add(1, Ordering::Relaxed);