diff --git a/.github/workflows/bench.yml b/.github/workflows/bench.yml deleted file mode 100644 index 7c76e171..00000000 --- a/.github/workflows/bench.yml +++ /dev/null @@ -1,29 +0,0 @@ -name: Benchmark (Linux) - -on: - pull_request: - types: [opened, synchronize, reopened] - push: - branches: - - master - - '1.0' - -jobs: - check_benchmark: - runs-on: ubuntu-latest - - steps: - - uses: actions/checkout@v2 - - - name: Install Rust - uses: actions-rs/toolchain@v1 - with: - toolchain: nightly - profile: minimal - override: true - - - name: Check benchmark - uses: actions-rs/cargo@v1 - with: - command: bench - args: --package=actix-service diff --git a/Cargo.toml b/Cargo.toml index 4f51cef3..82d5f832 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -31,4 +31,6 @@ actix-router = { path = "router" } bytestring = { path = "string" } #FIXME: remove override -http = { git = "https://github.com/fakeshadow/http.git" } +#http = { git = "https://github.com/fakeshadow/http.git" } +trust-dns-proto = { git = "https://github.com/messense/trust-dns.git", branch = "tokio-1" } +trust-dns-resolver = { git = "https://github.com/messense/trust-dns.git", branch = "tokio-1" } \ No newline at end of file diff --git a/actix-codec/Cargo.toml b/actix-codec/Cargo.toml index bbd94315..e901efd5 100644 --- a/actix-codec/Cargo.toml +++ b/actix-codec/Cargo.toml @@ -21,6 +21,6 @@ bytes = "1" futures-core = { version = "0.3.7", default-features = false } futures-sink = { version = "0.3.7", default-features = false } log = "0.4" -pin-project = "1.0.0" +pin-project-lite = "0.2" tokio = "1" tokio-util = { version = "0.6", features = ["codec", "io"] } diff --git a/actix-codec/src/framed.rs b/actix-codec/src/framed.rs index 3c6447a9..cf2297dc 100644 --- a/actix-codec/src/framed.rs +++ b/actix-codec/src/framed.rs @@ -5,7 +5,6 @@ use std::{fmt, io}; use bytes::{Buf, BytesMut}; use futures_core::{ready, Stream}; use futures_sink::Sink; -use pin_project::pin_project; use crate::{AsyncRead, AsyncWrite, Decoder, Encoder}; @@ -21,22 +20,23 @@ bitflags::bitflags! { } } -/// A unified `Stream` and `Sink` interface to an underlying I/O object, using -/// the `Encoder` and `Decoder` traits to encode and decode frames. -/// -/// Raw I/O objects work with byte sequences, but higher-level code usually -/// wants to batch these into meaningful chunks, called "frames". This -/// method layers framing on top of an I/O object, by using the `Encoder`/`Decoder` -/// traits to handle encoding and decoding of message frames. Note that -/// the incoming and outgoing frame types may be distinct. -#[pin_project] -pub struct Framed { - #[pin] - io: T, - codec: U, - flags: Flags, - read_buf: BytesMut, - write_buf: BytesMut, +pin_project_lite::pin_project! { + /// A unified `Stream` and `Sink` interface to an underlying I/O object, using + /// the `Encoder` and `Decoder` traits to encode and decode frames. + /// + /// Raw I/O objects work with byte sequences, but higher-level code usually + /// wants to batch these into meaningful chunks, called "frames". This + /// method layers framing on top of an I/O object, by using the `Encoder`/`Decoder` + /// traits to handle encoding and decoding of message frames. Note that + /// the incoming and outgoing frame types may be distinct. + pub struct Framed { + #[pin] + io: T, + codec: U, + flags: Flags, + read_buf: BytesMut, + write_buf: BytesMut, + } } impl Framed diff --git a/actix-connect/Cargo.toml b/actix-connect/Cargo.toml index 4cea0f31..cdec658f 100644 --- a/actix-connect/Cargo.toml +++ b/actix-connect/Cargo.toml @@ -33,7 +33,7 @@ uri = ["http"] [dependencies] actix-service = "1.0.6" actix-codec = "0.3.0" -actix-utils = "3.0.0" +actix-utils = "2.0.0" actix-rt = "1.1.1" derive_more = "0.99.2" @@ -43,8 +43,8 @@ futures-util = { version = "0.3.7", default-features = false } http = { version = "0.2.2", optional = true } log = "0.4" # FIXME: Use release version -trust-dns-proto = { git = "https://github.com/bluejekyll/trust-dns", branch = "main" } -trust-dns-resolver = { git = "https://github.com/bluejekyll/trust-dns", branch = "main" } +trust-dns-proto = "0.20.0-alpha.3" +trust-dns-resolver = "0.20.0-alpha.3" # openssl open-ssl = { package = "openssl", version = "0.10", optional = true } diff --git a/actix-connect/src/connector.rs b/actix-connect/src/connector.rs index 19e1f4c6..c5a3450e 100644 --- a/actix-connect/src/connector.rs +++ b/actix-connect/src/connector.rs @@ -74,9 +74,7 @@ impl Service> for TcpConnector { type Error = ConnectError; type Future = TcpConnectorResponse; - fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll> { - Poll::Ready(Ok(())) - } + actix_service::always_ready!(); fn call(&mut self, req: Connect) -> Self::Future { let port = req.port(); diff --git a/actix-connect/src/resolve.rs b/actix-connect/src/resolve.rs index b498af1d..16b3b37d 100644 --- a/actix-connect/src/resolve.rs +++ b/actix-connect/src/resolve.rs @@ -106,9 +106,7 @@ impl Service> for Resolver { type Error = ConnectError; type Future = ResolverServiceFuture; - fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll> { - Poll::Ready(Ok(())) - } + actix_service::always_ready!(); fn call(&mut self, mut req: Connect) -> Self::Future { if req.addr.is_some() { diff --git a/actix-connect/src/service.rs b/actix-connect/src/service.rs index 87b13dc5..a7960da9 100644 --- a/actix-connect/src/service.rs +++ b/actix-connect/src/service.rs @@ -94,9 +94,7 @@ impl Service> for ConnectService { type Error = ConnectError; type Future = ConnectServiceResponse; - fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll> { - Poll::Ready(Ok(())) - } + actix_service::always_ready!(); fn call(&mut self, req: Connect) -> Self::Future { ConnectServiceResponse { @@ -163,9 +161,7 @@ impl Service> for TcpConnectService { type Error = ConnectError; type Future = TcpConnectServiceResponse; - fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll> { - Poll::Ready(Ok(())) - } + actix_service::always_ready!(); fn call(&mut self, req: Connect) -> Self::Future { TcpConnectServiceResponse { diff --git a/actix-connect/src/ssl/openssl.rs b/actix-connect/src/ssl/openssl.rs index 3435b094..64ebde09 100644 --- a/actix-connect/src/ssl/openssl.rs +++ b/actix-connect/src/ssl/openssl.rs @@ -1,5 +1,4 @@ use std::future::Future; -use std::marker::PhantomData; use std::pin::Pin; use std::task::{Context, Poll}; use std::{fmt, io}; @@ -19,90 +18,71 @@ use crate::{ }; /// OpenSSL connector factory -pub struct OpensslConnector { +pub struct OpensslConnector { connector: SslConnector, - _t: PhantomData<(T, U)>, } -impl OpensslConnector { +impl OpensslConnector { pub fn new(connector: SslConnector) -> Self { - OpensslConnector { - connector, - _t: PhantomData, - } + OpensslConnector { connector } } } -impl OpensslConnector -where - T: Address + 'static, - U: AsyncRead + AsyncWrite + Unpin + fmt::Debug + 'static, -{ - pub fn service(connector: SslConnector) -> OpensslConnectorService { - OpensslConnectorService { - connector, - _t: PhantomData, - } +impl OpensslConnector { + pub fn service(connector: SslConnector) -> OpensslConnectorService { + OpensslConnectorService { connector } } } -impl Clone for OpensslConnector { +impl Clone for OpensslConnector { fn clone(&self) -> Self { Self { connector: self.connector.clone(), - _t: PhantomData, } } } -impl ServiceFactory for OpensslConnector +impl ServiceFactory> for OpensslConnector where T: Address + 'static, U: AsyncRead + AsyncWrite + Unpin + fmt::Debug + 'static, { - type Request = Connection; type Response = Connection>; type Error = io::Error; type Config = (); - type Service = OpensslConnectorService; + type Service = OpensslConnectorService; type InitError = (); type Future = Ready>; fn new_service(&self, _: ()) -> Self::Future { ready(Ok(OpensslConnectorService { connector: self.connector.clone(), - _t: PhantomData, })) } } -pub struct OpensslConnectorService { +pub struct OpensslConnectorService { connector: SslConnector, - _t: PhantomData<(T, U)>, } -impl Clone for OpensslConnectorService { +impl Clone for OpensslConnectorService { fn clone(&self) -> Self { Self { connector: self.connector.clone(), - _t: PhantomData, } } } -impl Service for OpensslConnectorService +impl Service> for OpensslConnectorService where T: Address + 'static, U: AsyncRead + AsyncWrite + Unpin + fmt::Debug + 'static, { - type Request = Connection; type Response = Connection>; type Error = io::Error; type Future = OpensslConnectorServiceFuture; - fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll> { - Poll::Ready(Ok(())) - } + actix_service::always_ready!(); fn call(&mut self, stream: Connection) -> Self::Future { match self.ssl_stream(stream) { @@ -112,18 +92,18 @@ where } } -impl OpensslConnectorService -where - T: Address + 'static, - U: AsyncRead + AsyncWrite + Unpin + fmt::Debug + 'static, -{ +impl OpensslConnectorService { // construct SslStream with connector. // At this point SslStream does not perform any I/O. // handshake would happen later in OpensslConnectorServiceFuture - fn ssl_stream( + fn ssl_stream( &self, stream: Connection, - ) -> Result<(SslStream, Connection), SslError> { + ) -> Result<(SslStream, Connection), SslError> + where + T: Address + 'static, + U: AsyncRead + AsyncWrite + Unpin + fmt::Debug + 'static, + { trace!("SSL Handshake start for: {:?}", stream.host()); let (stream, connection) = stream.replace(()); let host = connection.host().to_string(); @@ -176,7 +156,7 @@ where pub struct OpensslConnectServiceFactory { tcp: ConnectServiceFactory, - openssl: OpensslConnector, + openssl: OpensslConnector, } impl OpensslConnectServiceFactory { @@ -202,7 +182,6 @@ impl OpensslConnectServiceFactory { tcp: self.tcp.service(), openssl: OpensslConnectorService { connector: self.openssl.connector.clone(), - _t: PhantomData, }, } } @@ -217,8 +196,7 @@ impl Clone for OpensslConnectServiceFactory { } } -impl ServiceFactory for OpensslConnectServiceFactory { - type Request = Connect; +impl ServiceFactory> for OpensslConnectServiceFactory { type Response = SslStream; type Error = ConnectError; type Config = (); @@ -234,18 +212,15 @@ impl ServiceFactory for OpensslConnectServiceFactory { #[derive(Clone)] pub struct OpensslConnectService { tcp: ConnectService, - openssl: OpensslConnectorService, + openssl: OpensslConnectorService, } -impl Service for OpensslConnectService { - type Request = Connect; +impl Service> for OpensslConnectService { type Response = SslStream; type Error = ConnectError; type Future = OpensslConnectServiceResponse; - fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll> { - Poll::Ready(Ok(())) - } + actix_service::always_ready!(); fn call(&mut self, req: Connect) -> Self::Future { OpensslConnectServiceResponse { @@ -257,9 +232,9 @@ impl Service for OpensslConnectService { } pub struct OpensslConnectServiceResponse { - fut1: Option< as Service>::Future>, - fut2: Option< as Service>::Future>, - openssl: OpensslConnectorService, + fut1: Option< as Service>>::Future>, + fut2: Option<>>::Future>, + openssl: OpensslConnectorService, } impl Future for OpensslConnectServiceResponse { @@ -267,7 +242,7 @@ impl Future for OpensslConnectServiceResponse { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { if let Some(ref mut fut) = self.fut1 { - match futures_util::ready!(Pin::new(fut).poll(cx)) { + match ready!(Pin::new(fut).poll(cx)) { Ok(res) => { let _ = self.fut1.take(); self.fut2 = Some(self.openssl.call(res)); @@ -277,7 +252,7 @@ impl Future for OpensslConnectServiceResponse { } if let Some(ref mut fut) = self.fut2 { - match futures_util::ready!(Pin::new(fut).poll(cx)) { + match ready!(Pin::new(fut).poll(cx)) { Ok(connect) => Poll::Ready(Ok(connect.into_parts().0)), Err(e) => Poll::Ready(Err(ConnectError::Io(io::Error::new( io::ErrorKind::Other, diff --git a/actix-connect/src/ssl/rustls.rs b/actix-connect/src/ssl/rustls.rs index 09fd329c..d45b84ef 100644 --- a/actix-connect/src/ssl/rustls.rs +++ b/actix-connect/src/ssl/rustls.rs @@ -1,6 +1,5 @@ use std::fmt; use std::future::Future; -use std::marker::PhantomData; use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; @@ -18,88 +17,70 @@ use webpki::DNSNameRef; use crate::{Address, Connection}; /// Rustls connector factory -pub struct RustlsConnector { +pub struct RustlsConnector { connector: Arc, - _t: PhantomData<(T, U)>, } -impl RustlsConnector { +impl RustlsConnector { pub fn new(connector: Arc) -> Self { - RustlsConnector { - connector, - _t: PhantomData, - } + RustlsConnector { connector } } } -impl RustlsConnector -where - T: Address, - U: AsyncRead + AsyncWrite + Unpin + fmt::Debug, -{ - pub fn service(connector: Arc) -> RustlsConnectorService { - RustlsConnectorService { - connector, - _t: PhantomData, - } +impl RustlsConnector { + pub fn service(connector: Arc) -> RustlsConnectorService { + RustlsConnectorService { connector } } } -impl Clone for RustlsConnector { +impl Clone for RustlsConnector { fn clone(&self) -> Self { Self { connector: self.connector.clone(), - _t: PhantomData, } } } -impl ServiceFactory for RustlsConnector +impl ServiceFactory> for RustlsConnector where U: AsyncRead + AsyncWrite + Unpin + fmt::Debug, { - type Request = Connection; type Response = Connection>; type Error = std::io::Error; type Config = (); - type Service = RustlsConnectorService; + type Service = RustlsConnectorService; type InitError = (); type Future = Ready>; fn new_service(&self, _: ()) -> Self::Future { ready(Ok(RustlsConnectorService { connector: self.connector.clone(), - _t: PhantomData, })) } } -pub struct RustlsConnectorService { +pub struct RustlsConnectorService { connector: Arc, - _t: PhantomData<(T, U)>, } -impl Clone for RustlsConnectorService { +impl Clone for RustlsConnectorService { fn clone(&self) -> Self { Self { connector: self.connector.clone(), - _t: PhantomData, } } } -impl Service for RustlsConnectorService +impl Service> for RustlsConnectorService where + T: Address, U: AsyncRead + AsyncWrite + Unpin + fmt::Debug, { - type Request = Connection; type Response = Connection>; type Error = std::io::Error; type Future = ConnectAsyncExt; - fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll> { - Poll::Ready(Ok(())) - } + actix_service::always_ready!(); fn call(&mut self, stream: Connection) -> Self::Future { trace!("SSL Handshake start for: {:?}", stream.host()); diff --git a/actix-rt/CHANGES.md b/actix-rt/CHANGES.md index 3290a664..81b66d74 100644 --- a/actix-rt/CHANGES.md +++ b/actix-rt/CHANGES.md @@ -10,6 +10,7 @@ * Update `tokio` dependency to `1` * Rename `time` module `delay_for` to `sleep`, `delay_until` to `sleep_until`, `Delay` to `Sleep` to keep inline with tokio. * Remove `'static` lifetime requirement for `Runtime::block_on` and `SystemRunner::block_on`. + These methods would accept a &Self when calling. Remove `'static` lifetime requirement for `System::run` and `Builder::run`. `Arbiter::spawn` would panic when `System` is not in scope. [#207] diff --git a/actix-server/Cargo.toml b/actix-server/Cargo.toml index 2bb86ef4..162dcc63 100644 --- a/actix-server/Cargo.toml +++ b/actix-server/Cargo.toml @@ -23,19 +23,19 @@ default = [] actix-service = "1.0.6" actix-rt = "1.1.1" actix-codec = "0.3.0" -actix-utils = "3.0.0" +actix-utils = "2.0.0" concurrent-queue = "1.2.2" -futures-channel = { version = "0.3.7", default-features = false } -futures-util = { version = "0.3.7", default-features = false } +futures-core = { version = "0.3.7", default-features = false } log = "0.4" mio = { version = "0.7.3", features = [ "os-poll", "tcp", "uds"] } num_cpus = "1.13" slab = "0.4" +tokio = { version = "1", features = ["sync"] } [dev-dependencies] actix-testing = "1.0.0" bytes = "1" env_logger = "0.7" futures-util = { version = "0.3.7", default-features = false, features = ["sink"] } -tokio = { version = "1", features = ["full"] } +tokio = { version = "1", features = ["io-util"] } diff --git a/actix-server/src/accept.rs b/actix-server/src/accept.rs index 4ac92465..1ced51e4 100644 --- a/actix-server/src/accept.rs +++ b/actix-server/src/accept.rs @@ -40,7 +40,7 @@ pub(crate) struct AcceptLoop { impl AcceptLoop { pub fn new(srv: Server) -> Self { let poll = Poll::new().unwrap_or_else(|e| panic!("Can not create `mio::Poll`: {}", e)); - let waker = WakerQueue::with_capacity(poll.registry(), 128) + let waker = WakerQueue::new(poll.registry()) .unwrap_or_else(|e| panic!("Can not create `mio::Waker`: {}", e)); Self { diff --git a/actix-server/src/builder.rs b/actix-server/src/builder.rs index 673584f3..51dd0eda 100644 --- a/actix-server/src/builder.rs +++ b/actix-server/src/builder.rs @@ -7,11 +7,9 @@ use std::{io, mem}; use actix_rt::net::TcpStream; use actix_rt::time::{sleep_until, Instant}; use actix_rt::{spawn, System}; -use futures_channel::mpsc::{unbounded, UnboundedReceiver}; -use futures_channel::oneshot; -use futures_util::future::join_all; -use futures_util::stream::Stream; use log::{error, info}; +use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver}; +use tokio::sync::oneshot; use crate::accept::AcceptLoop; use crate::config::{ConfiguredService, ServiceConfig}; @@ -22,7 +20,7 @@ use crate::socket::{MioListener, StdSocketAddr, StdTcpListener, ToSocketAddrs}; use crate::socket::{MioTcpListener, MioTcpSocket}; use crate::waker_queue::{WakerInterest, WakerQueue}; use crate::worker::{self, Worker, WorkerAvailability, WorkerHandle}; -use crate::Token; +use crate::{join_all, Token}; /// Server builder pub struct ServerBuilder { @@ -50,7 +48,7 @@ impl Default for ServerBuilder { impl ServerBuilder { /// Create new Server builder instance pub fn new() -> ServerBuilder { - let (tx, rx) = unbounded(); + let (tx, rx) = unbounded_channel(); let server = Server::new(tx); ServerBuilder { @@ -366,7 +364,8 @@ impl ServerBuilder { let iter = self .handles .iter() - .map(move |worker| worker.1.stop(graceful)); + .map(move |worker| worker.1.stop(graceful)) + .collect(); let fut = join_all(iter); @@ -439,7 +438,7 @@ impl Future for ServerBuilder { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { loop { - match Pin::new(&mut self.cmd).poll_next(cx) { + match Pin::new(&mut self.cmd).poll_recv(cx) { Poll::Ready(Some(it)) => self.as_mut().get_mut().handle_cmd(it), _ => return Poll::Pending, } diff --git a/actix-server/src/config.rs b/actix-server/src/config.rs index ec536011..20270a2f 100644 --- a/actix-server/src/config.rs +++ b/actix-server/src/config.rs @@ -8,14 +8,13 @@ use actix_service::{ ServiceFactory as BaseServiceFactory, }; use actix_utils::counter::CounterGuard; -use futures_util::future::ready; +use futures_core::future::LocalBoxFuture; use log::error; use crate::builder::bind_addr; use crate::service::{BoxedServerService, InternalServiceFactory, StreamService}; use crate::socket::{MioStream, MioTcpListener, StdSocketAddr, StdTcpListener, ToSocketAddrs}; -use crate::LocalBoxFuture; -use crate::Token; +use crate::{ready, Token}; pub struct ServiceConfig { pub(crate) services: Vec<(String, MioTcpListener)>, diff --git a/actix-server/src/lib.rs b/actix-server/src/lib.rs index 1bccfa29..79b5d57b 100644 --- a/actix-server/src/lib.rs +++ b/actix-server/src/lib.rs @@ -22,6 +22,10 @@ pub use self::service::ServiceFactory; #[doc(hidden)] pub use self::socket::FromStream; +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; + /// Socket ID token #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)] pub(crate) struct Token(usize); @@ -44,10 +48,80 @@ impl Token { } } -pub(crate) type LocalBoxFuture<'a, T> = - std::pin::Pin + 'a>>; - /// Start server building process pub fn new() -> ServerBuilder { ServerBuilder::default() } + +// temporary Ready type for std::future::{ready, Ready}; Can be removed when MSRV surpass 1.48 +#[doc(hidden)] +pub struct Ready(Option); + +pub(crate) fn ready(t: T) -> Ready { + Ready(Some(t)) +} + +impl Unpin for Ready {} + +impl Future for Ready { + type Output = T; + + fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll { + Poll::Ready(self.get_mut().0.take().unwrap()) + } +} + +// a poor man's join future. joined future is only used when starting/stopping the server. +// pin_project and pinned futures are overkill for this task. +pub(crate) struct JoinAll { + fut: Vec>, +} + +pub(crate) fn join_all(fut: Vec + 'static>) -> JoinAll { + let fut = fut + .into_iter() + .map(|f| JoinFuture::Future(Box::pin(f))) + .collect(); + + JoinAll { fut } +} + +enum JoinFuture { + Future(Pin>>), + Result(Option), +} + +impl Unpin for JoinAll {} + +impl Future for JoinAll { + type Output = Vec; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut ready = true; + + let this = self.get_mut(); + for fut in this.fut.iter_mut() { + if let JoinFuture::Future(f) = fut { + match f.as_mut().poll(cx) { + Poll::Ready(t) => { + *fut = JoinFuture::Result(Some(t)); + } + Poll::Pending => ready = false, + } + } + } + + if ready { + let mut res = Vec::new(); + for fut in this.fut.iter_mut() { + if let JoinFuture::Result(f) = fut { + res.push(f.take().unwrap()); + } + } + + Poll::Ready(res) + } else { + Poll::Pending + } + } +} diff --git a/actix-server/src/server.rs b/actix-server/src/server.rs index 69f16628..6b0d0aea 100644 --- a/actix-server/src/server.rs +++ b/actix-server/src/server.rs @@ -3,8 +3,8 @@ use std::io; use std::pin::Pin; use std::task::{Context, Poll}; -use futures_channel::mpsc::UnboundedSender; -use futures_channel::oneshot; +use tokio::sync::mpsc::UnboundedSender; +use tokio::sync::oneshot; use crate::builder::ServerBuilder; use crate::signals::Signal; @@ -41,11 +41,11 @@ impl Server { } pub(crate) fn signal(&self, sig: Signal) { - let _ = self.0.unbounded_send(ServerCommand::Signal(sig)); + let _ = self.0.send(ServerCommand::Signal(sig)); } pub(crate) fn worker_faulted(&self, idx: usize) { - let _ = self.0.unbounded_send(ServerCommand::WorkerFaulted(idx)); + let _ = self.0.send(ServerCommand::WorkerFaulted(idx)); } /// Pause accepting incoming connections @@ -54,7 +54,7 @@ impl Server { /// All opened connection remains active. pub fn pause(&self) -> impl Future { let (tx, rx) = oneshot::channel(); - let _ = self.0.unbounded_send(ServerCommand::Pause(tx)); + let _ = self.0.send(ServerCommand::Pause(tx)); async { let _ = rx.await; } @@ -63,7 +63,7 @@ impl Server { /// Resume accepting incoming connections pub fn resume(&self) -> impl Future { let (tx, rx) = oneshot::channel(); - let _ = self.0.unbounded_send(ServerCommand::Resume(tx)); + let _ = self.0.send(ServerCommand::Resume(tx)); async { let _ = rx.await; } @@ -74,7 +74,7 @@ impl Server { /// If server starts with `spawn()` method, then spawned thread get terminated. pub fn stop(&self, graceful: bool) -> impl Future { let (tx, rx) = oneshot::channel(); - let _ = self.0.unbounded_send(ServerCommand::Stop { + let _ = self.0.send(ServerCommand::Stop { graceful, completion: Some(tx), }); @@ -98,7 +98,7 @@ impl Future for Server { if this.1.is_none() { let (tx, rx) = oneshot::channel(); - if this.0.unbounded_send(ServerCommand::Notify(tx)).is_err() { + if this.0.send(ServerCommand::Notify(tx)).is_err() { return Poll::Ready(Ok(())); } this.1 = Some(rx); diff --git a/actix-server/src/service.rs b/actix-server/src/service.rs index 3f871188..04b7dce8 100644 --- a/actix-server/src/service.rs +++ b/actix-server/src/service.rs @@ -2,15 +2,13 @@ use std::marker::PhantomData; use std::net::SocketAddr; use std::task::{Context, Poll}; -use actix_rt::spawn; use actix_service::{Service, ServiceFactory as BaseServiceFactory}; use actix_utils::counter::CounterGuard; -use futures_util::future::{ready, Ready}; +use futures_core::future::LocalBoxFuture; use log::error; use crate::socket::{FromStream, MioStream}; -use crate::LocalBoxFuture; -use crate::Token; +use crate::{ready, Ready, Token}; pub trait ServiceFactory: Send + Clone + 'static { type Factory: BaseServiceFactory; diff --git a/actix-server/src/signals.rs b/actix-server/src/signals.rs index e914dd2b..ea1de47e 100644 --- a/actix-server/src/signals.rs +++ b/actix-server/src/signals.rs @@ -2,8 +2,9 @@ use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; +use futures_core::future::LocalBoxFuture; + use crate::server::Server; -use crate::LocalBoxFuture; /// Different types of process signals #[allow(dead_code)] diff --git a/actix-server/src/waker_queue.rs b/actix-server/src/waker_queue.rs index 105ef246..056979f7 100644 --- a/actix-server/src/waker_queue.rs +++ b/actix-server/src/waker_queue.rs @@ -32,9 +32,9 @@ impl WakerQueue { /// /// A fixed `WAKER_TOKEN` is used to identify the wake interest and the `Poll` needs to match /// event's token for it to properly handle `WakerInterest`. - pub(crate) fn with_capacity(registry: &Registry, cap: usize) -> std::io::Result { + pub(crate) fn new(registry: &Registry) -> std::io::Result { let waker = Waker::new(registry, WAKER_TOKEN)?; - let queue = ConcurrentQueue::bounded(cap); + let queue = ConcurrentQueue::unbounded(); Ok(Self(Arc::new((waker, queue)))) } @@ -43,10 +43,9 @@ impl WakerQueue { pub(crate) fn wake(&self, interest: WakerInterest) { let (waker, queue) = self.deref(); - // FIXME: should we handle error here? queue .push(interest) - .unwrap_or_else(|e| panic!("WakerQueue overflow: {}", e)); + .unwrap_or_else(|e| panic!("WakerQueue closed: {}", e)); waker .wake() diff --git a/actix-server/src/worker.rs b/actix-server/src/worker.rs index a235d076..91e98fc2 100644 --- a/actix-server/src/worker.rs +++ b/actix-server/src/worker.rs @@ -8,18 +8,15 @@ use std::time::Duration; use actix_rt::time::{sleep_until, Instant, Sleep}; use actix_rt::{spawn, Arbiter}; use actix_utils::counter::Counter; -use futures_channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender}; -use futures_channel::oneshot; -use futures_util::future::join_all; -use futures_util::stream::Stream; -use futures_util::TryFutureExt; +use futures_core::future::LocalBoxFuture; use log::{error, info, trace}; +use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; +use tokio::sync::oneshot; use crate::service::{BoxedServerService, InternalServiceFactory}; use crate::socket::{MioStream, SocketAddr}; use crate::waker_queue::{WakerInterest, WakerQueue}; -use crate::LocalBoxFuture; -use crate::Token; +use crate::{join_all, Token}; pub(crate) struct WorkerCommand(Conn); @@ -84,9 +81,7 @@ impl WorkerHandle { } pub fn send(&self, msg: Conn) -> Result<(), Conn> { - self.tx1 - .unbounded_send(WorkerCommand(msg)) - .map_err(|msg| msg.into_inner().0) + self.tx1.send(WorkerCommand(msg)).map_err(|msg| msg.0 .0) } pub fn available(&self) -> bool { @@ -95,7 +90,7 @@ impl WorkerHandle { pub fn stop(&self, graceful: bool) -> oneshot::Receiver { let (result, rx) = oneshot::channel(); - let _ = self.tx2.unbounded_send(StopCommand { graceful, result }); + let _ = self.tx2.send(StopCommand { graceful, result }); rx } } @@ -172,8 +167,8 @@ impl Worker { availability: WorkerAvailability, shutdown_timeout: Duration, ) -> WorkerHandle { - let (tx1, rx) = unbounded(); - let (tx2, rx2) = unbounded(); + let (tx1, rx) = unbounded_channel(); + let (tx2, rx2) = unbounded_channel(); let avail = availability.clone(); // every worker runs in it's own arbiter. @@ -195,9 +190,12 @@ impl Worker { .iter() .enumerate() .map(|(idx, factory)| { - factory.create().map_ok(move |r| { - r.into_iter().map(|(t, s)| (idx, t, s)).collect::>() - }) + let fut = factory.create(); + async move { + fut.await.map(|r| { + r.into_iter().map(|(t, s)| (idx, t, s)).collect::>() + }) + } }) .collect::>(); @@ -312,7 +310,7 @@ impl Future for Worker { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { // `StopWorker` message handler if let Poll::Ready(Some(StopCommand { graceful, result })) = - Pin::new(&mut self.rx2).poll_next(cx) + Pin::new(&mut self.rx2).poll_recv(cx) { self.availability.set(false); let num = num_connections(); @@ -432,7 +430,7 @@ impl Future for Worker { } } - match Pin::new(&mut self.rx).poll_next(cx) { + match Pin::new(&mut self.rx).poll_recv(cx) { // handle incoming io stream Poll::Ready(Some(WorkerCommand(msg))) => { let guard = self.conns.get(); diff --git a/actix-server/tests/test_server.rs b/actix-server/tests/test_server.rs index 0a1b525b..1f567934 100644 --- a/actix-server/tests/test_server.rs +++ b/actix-server/tests/test_server.rs @@ -49,14 +49,14 @@ fn test_listen() { let h = thread::spawn(move || { let sys = actix_rt::System::new("test"); let lst = net::TcpListener::bind(addr).unwrap(); - sys.block_on(lazy(|_| { + sys.block_on(async { Server::build() .disable_signals() .workers(1) .listen("test", lst, move || fn_service(|_| ok::<_, ()>(()))) .unwrap() .start() - })); + }); let _ = tx.send(actix_rt::System::current()); let _ = sys.run(); }); diff --git a/actix-service/CHANGES.md b/actix-service/CHANGES.md index 971741e8..82c5adb3 100644 --- a/actix-service/CHANGES.md +++ b/actix-service/CHANGES.md @@ -3,10 +3,16 @@ ## Unreleased - 2020-xx-xx * `Service`, other traits, and many type signatures now take the the request type as a type parameter instead of an associated type. [#232] -* Upgrade `pin-project` to `1.0`. - +* Add `always_ready!` and `forward_ready!` macros. [#233] +* Crate is now `no_std`. [#233] +* Migrate pin projections to `pin-project-lite`. [#233] +* Remove `AndThenApplyFn` and Pipeline `and_then_apply_fn`. Use the + `.and_then(apply_fn(...))` construction. [#233] +* Move non-vital methods to `ServiceExt` and `ServiceFactoryExt` extension traits. [#235] [#232]: https://github.com/actix/actix-net/pull/232 +[#233]: https://github.com/actix/actix-net/pull/233 +[#235]: https://github.com/actix/actix-net/pull/235 ## 1.0.6 - 2020-08-09 diff --git a/actix-service/Cargo.toml b/actix-service/Cargo.toml index 1505873b..c08bb169 100644 --- a/actix-service/Cargo.toml +++ b/actix-service/Cargo.toml @@ -17,17 +17,9 @@ name = "actix_service" path = "src/lib.rs" [dependencies] -futures-util = "0.3.1" -pin-project = "1.0.0" +futures-core = { version = "0.3.7", default-features = false } +pin-project-lite = "0.2" [dev-dependencies] actix-rt = "1.0.0" -criterion = "0.3" - -[[bench]] -name = "unsafecell_vs_refcell" -harness = false - -[[bench]] -name = "and_then" -harness = false +futures-util = { version = "0.3.7", default-features = false } diff --git a/actix-service/benches/and_then.rs b/actix-service/benches/and_then.rs deleted file mode 100644 index 6eb88a1e..00000000 --- a/actix-service/benches/and_then.rs +++ /dev/null @@ -1,334 +0,0 @@ -use actix_service::boxed::BoxFuture; -use actix_service::IntoService; -use actix_service::Service; -/// Benchmark various implementations of and_then -use criterion::{criterion_main, Criterion}; -use futures_util::future::join_all; -use futures_util::future::TryFutureExt; -use std::future::Future; -use std::pin::Pin; -use std::rc::Rc; -use std::task::{Context, Poll}; -use std::{ - cell::{RefCell, UnsafeCell}, - marker::PhantomData, -}; - -/* - * Test services A,B for AndThen service implementations - */ - -async fn svc1(_: ()) -> Result { - Ok(1) -} - -async fn svc2(req: usize) -> Result { - Ok(req + 1) -} - -/* - * AndThenUC - original AndThen service based on UnsafeCell - * Cut down version of actix_service::AndThenService based on actix-service::Cell - */ - -struct AndThenUC(Rc>, PhantomData); - -impl AndThenUC { - fn new(a: A, b: B) -> Self - where - A: Service, - B: Service, - { - Self(Rc::new(UnsafeCell::new((a, b))), PhantomData) - } -} - -impl Clone for AndThenUC { - fn clone(&self) -> Self { - Self(self.0.clone(), PhantomData) - } -} - -impl Service for AndThenUC -where - A: Service, - B: Service, -{ - type Response = B::Response; - type Error = A::Error; - type Future = AndThenServiceResponse; - - fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll> { - Poll::Ready(Ok(())) - } - - fn call(&mut self, req: Req) -> Self::Future { - let fut = unsafe { &mut *(*self.0).get() }.0.call(req); - AndThenServiceResponse { - state: State::A(fut, Some(self.0.clone())), - _phantom: PhantomData, - } - } -} - -#[pin_project::pin_project] -pub(crate) struct AndThenServiceResponse -where - A: Service, - B: Service, -{ - #[pin] - state: State, - _phantom: PhantomData, -} - -#[pin_project::pin_project(project = StateProj)] -enum State -where - A: Service, - B: Service, -{ - A(#[pin] A::Future, Option>>), - B(#[pin] B::Future), - Empty(PhantomData), -} - -impl Future for AndThenServiceResponse -where - A: Service, - B: Service, -{ - type Output = Result; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let mut this = self.as_mut().project(); - - match this.state.as_mut().project() { - StateProj::A(fut, b) => match fut.poll(cx)? { - Poll::Ready(res) => { - let b = b.take().unwrap(); - this.state.set(State::Empty(PhantomData)); // drop fut A - let fut = unsafe { &mut (*b.get()).1 }.call(res); - this.state.set(State::B(fut)); - self.poll(cx) - } - Poll::Pending => Poll::Pending, - }, - StateProj::B(fut) => fut.poll(cx).map(|r| { - this.state.set(State::Empty(PhantomData)); - r - }), - StateProj::Empty(_) => { - panic!("future must not be polled after it returned `Poll::Ready`") - } - } - } -} - -/* - * AndThenRC - AndThen service based on RefCell - */ - -struct AndThenRC(Rc>, PhantomData); - -impl AndThenRC { - fn new(a: A, b: B) -> Self - where - A: Service, - B: Service, - { - Self(Rc::new(RefCell::new((a, b))), PhantomData) - } -} - -impl Clone for AndThenRC { - fn clone(&self) -> Self { - Self(self.0.clone(), PhantomData) - } -} - -impl Service for AndThenRC -where - A: Service, - B: Service, -{ - type Response = B::Response; - type Error = A::Error; - type Future = AndThenServiceResponseRC; - - fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll> { - Poll::Ready(Ok(())) - } - - fn call(&mut self, req: Req) -> Self::Future { - let fut = self.0.borrow_mut().0.call(req); - AndThenServiceResponseRC { - state: StateRC::A(fut, Some(self.0.clone())), - } - } -} - -#[pin_project::pin_project] -pub(crate) struct AndThenServiceResponseRC -where - A: Service, - B: Service, -{ - #[pin] - state: StateRC, -} - -#[pin_project::pin_project(project = StateRCProj)] -enum StateRC -where - A: Service, - B: Service, -{ - A(#[pin] A::Future, Option>>), - B(#[pin] B::Future), - Empty(PhantomData), -} - -impl Future for AndThenServiceResponseRC -where - A: Service, - B: Service, -{ - type Output = Result; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let mut this = self.as_mut().project(); - - match this.state.as_mut().project() { - StateRCProj::A(fut, b) => match fut.poll(cx)? { - Poll::Ready(res) => { - let b = b.take().unwrap(); - this.state.set(StateRC::Empty(PhantomData)); // drop fut A - let fut = b.borrow_mut().1.call(res); - this.state.set(StateRC::B(fut)); - self.poll(cx) - } - Poll::Pending => Poll::Pending, - }, - StateRCProj::B(fut) => fut.poll(cx).map(|r| { - this.state.set(StateRC::Empty(PhantomData)); - r - }), - StateRCProj::Empty(_) => { - panic!("future must not be polled after it returned `Poll::Ready`") - } - } - } -} - -/* - * AndThenRCFuture - AndThen service based on RefCell - * and standard futures::future::and_then combinator in a Box - */ - -struct AndThenRCFuture(Rc>, PhantomData); - -impl AndThenRCFuture { - fn new(a: A, b: B) -> Self - where - A: Service, - B: Service, - { - Self(Rc::new(RefCell::new((a, b))), PhantomData) - } -} - -impl Clone for AndThenRCFuture { - fn clone(&self) -> Self { - Self(self.0.clone(), PhantomData) - } -} - -impl Service for AndThenRCFuture -where - A: Service + 'static, - A::Future: 'static, - B: Service + 'static, - B::Future: 'static, -{ - type Response = B::Response; - type Error = A::Error; - type Future = BoxFuture; - - fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll> { - Poll::Ready(Ok(())) - } - - fn call(&mut self, req: Req) -> Self::Future { - let fut = self.0.borrow_mut().0.call(req); - let core = self.0.clone(); - let fut2 = move |res| (*core).borrow_mut().1.call(res); - Box::pin(fut.and_then(fut2)) - } -} - -/// Criterion Benchmark for async Service -/// Should be used from within criterion group: -/// ```rust,ignore -/// let mut criterion: ::criterion::Criterion<_> = -/// ::criterion::Criterion::default().configure_from_args(); -/// bench_async_service(&mut criterion, ok_service(), "async_service_direct"); -/// ``` -/// -/// Usable for benching Service wrappers: -/// Using minimum service code implementation we first measure -/// time to run minimum service, then measure time with wrapper. -/// -/// Sample output -/// async_service_direct time: [1.0908 us 1.1656 us 1.2613 us] -pub fn bench_async_service(c: &mut Criterion, srv: S, name: &str) -where - S: Service<(), Response = usize, Error = ()> + Clone + 'static, -{ - let rt = actix_rt::System::new("test"); - - // start benchmark loops - c.bench_function(name, move |b| { - b.iter_custom(|iters| { - let mut srvs: Vec<_> = (1..iters).map(|_| srv.clone()).collect(); - // exclude request generation, it appears it takes significant time vs call (3us vs 1us) - let start = std::time::Instant::now(); - // benchmark body - rt.block_on(async move { join_all(srvs.iter_mut().map(|srv| srv.call(()))).await }); - // check that at least first request succeeded - start.elapsed() - }) - }); -} - -pub fn service_benches() { - let mut criterion: ::criterion::Criterion<_> = - ::criterion::Criterion::default().configure_from_args(); - bench_async_service( - &mut criterion, - AndThenUC::new(svc1.into_service(), svc2.into_service()), - "AndThen with UnsafeCell", - ); - bench_async_service( - &mut criterion, - AndThenRC::new(svc1.into_service(), svc2.into_service()), - "AndThen with RefCell", - ); - bench_async_service( - &mut criterion, - AndThenUC::new(svc1.into_service(), svc2.into_service()), - "AndThen with UnsafeCell", - ); - bench_async_service( - &mut criterion, - AndThenRC::new(svc1.into_service(), svc2.into_service()), - "AndThen with RefCell", - ); - bench_async_service( - &mut criterion, - AndThenRCFuture::new(svc1.into_service(), svc2.into_service()), - "AndThen with RefCell via future::and_then", - ); -} - -criterion_main!(service_benches); diff --git a/actix-service/benches/unsafecell_vs_refcell.rs b/actix-service/benches/unsafecell_vs_refcell.rs deleted file mode 100644 index e7006065..00000000 --- a/actix-service/benches/unsafecell_vs_refcell.rs +++ /dev/null @@ -1,110 +0,0 @@ -use actix_service::Service; -use criterion::{criterion_main, Criterion}; -use futures_util::future::join_all; -use futures_util::future::{ok, Ready}; -use std::cell::{RefCell, UnsafeCell}; -use std::rc::Rc; -use std::task::{Context, Poll}; - -struct SrvUC(Rc>); - -impl Default for SrvUC { - fn default() -> Self { - Self(Rc::new(UnsafeCell::new(0))) - } -} - -impl Clone for SrvUC { - fn clone(&self) -> Self { - Self(self.0.clone()) - } -} - -impl Service<()> for SrvUC { - type Response = usize; - type Error = (); - type Future = Ready>; - - fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll> { - Poll::Ready(Ok(())) - } - - fn call(&mut self, _: ()) -> Self::Future { - unsafe { *(*self.0).get() = *(*self.0).get() + 1 }; - ok(unsafe { *self.0.get() }) - } -} - -struct SrvRC(Rc>); - -impl Default for SrvRC { - fn default() -> Self { - Self(Rc::new(RefCell::new(0))) - } -} - -impl Clone for SrvRC { - fn clone(&self) -> Self { - Self(self.0.clone()) - } -} - -impl Service<()> for SrvRC { - type Response = usize; - type Error = (); - type Future = Ready>; - - fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll> { - Poll::Ready(Ok(())) - } - - fn call(&mut self, _: ()) -> Self::Future { - let prev = *self.0.borrow(); - *(*self.0).borrow_mut() = prev + 1; - ok(*self.0.borrow()) - } -} - -/// Criterion Benchmark for async Service -/// Should be used from within criterion group: -/// ```rust,ignore -/// let mut criterion: ::criterion::Criterion<_> = -/// ::criterion::Criterion::default().configure_from_args(); -/// bench_async_service(&mut criterion, ok_service(), "async_service_direct"); -/// ``` -/// -/// Usable for benching Service wrappers: -/// Using minimum service code implementation we first measure -/// time to run minimum service, then measure time with wrapper. -/// -/// Sample output -/// async_service_direct time: [1.0908 us 1.1656 us 1.2613 us] -pub fn bench_async_service(c: &mut Criterion, srv: S, name: &str) -where - S: Service<(), Response = usize, Error = ()> + Clone + 'static, -{ - let rt = actix_rt::System::new("test"); - - // start benchmark loops - c.bench_function(name, move |b| { - b.iter_custom(|iters| { - let mut srvs: Vec<_> = (1..iters).map(|_| srv.clone()).collect(); - // exclude request generation, it appears it takes significant time vs call (3us vs 1us) - let start = std::time::Instant::now(); - // benchmark body - rt.block_on(async { join_all(srvs.iter_mut().map(|srv| srv.call(()))).await }); - // check that at least first request succeeded - start.elapsed() - }) - }); -} - -pub fn service_benches() { - let mut criterion: ::criterion::Criterion<_> = - ::criterion::Criterion::default().configure_from_args(); - bench_async_service(&mut criterion, SrvUC::default(), "Service with UnsafeCell"); - bench_async_service(&mut criterion, SrvRC::default(), "Service with RefCell"); - bench_async_service(&mut criterion, SrvUC::default(), "Service with UnsafeCell"); - bench_async_service(&mut criterion, SrvRC::default(), "Service with RefCell"); -} -criterion_main!(service_benches); diff --git a/actix-service/src/and_then.rs b/actix-service/src/and_then.rs index 04caf79d..fd24cb56 100644 --- a/actix-service/src/and_then.rs +++ b/actix-service/src/and_then.rs @@ -1,8 +1,13 @@ -use std::future::Future; -use std::pin::Pin; -use std::rc::Rc; -use std::task::{Context, Poll}; -use std::{cell::RefCell, marker::PhantomData}; +use alloc::rc::Rc; +use core::{ + cell::RefCell, + future::Future, + marker::PhantomData, + pin::Pin, + task::{Context, Poll}, +}; + +use pin_project_lite::pin_project; use super::{Service, ServiceFactory}; @@ -50,30 +55,43 @@ where fn call(&mut self, req: Req) -> Self::Future { AndThenServiceResponse { - state: State::A(self.0.borrow_mut().0.call(req), Some(self.0.clone())), + state: State::A { + fut: self.0.borrow_mut().0.call(req), + b: Some(self.0.clone()), + }, } } } -#[pin_project::pin_project] -pub(crate) struct AndThenServiceResponse -where - A: Service, - B: Service, -{ - #[pin] - state: State, +pin_project! { + pub(crate) struct AndThenServiceResponse + where + A: Service, + B: Service, + { + #[pin] + state: State, + } } -#[pin_project::pin_project(project = StateProj)] -enum State -where - A: Service, - B: Service, -{ - A(#[pin] A::Future, Option>>), - B(#[pin] B::Future), - Empty, +pin_project! { + #[project = StateProj] + enum State + where + A: Service, + B: Service, + { + A { + #[pin] + fut: A::Future, + b: Option>>, + }, + B { + #[pin] + fut: B::Future, + }, + Empty, + } } impl Future for AndThenServiceResponse @@ -87,17 +105,17 @@ where let mut this = self.as_mut().project(); match this.state.as_mut().project() { - StateProj::A(fut, b) => match fut.poll(cx)? { + StateProj::A { fut, b } => match fut.poll(cx)? { Poll::Ready(res) => { let b = b.take().unwrap(); this.state.set(State::Empty); // drop fut A let fut = b.borrow_mut().1.call(res); - this.state.set(State::B(fut)); + this.state.set(State::B { fut }); self.poll(cx) } Poll::Pending => Poll::Pending, }, - StateProj::B(fut) => fut.poll(cx).map(|r| { + StateProj::B { fut } => fut.poll(cx).map(|r| { this.state.set(State::Empty); r }), @@ -191,19 +209,20 @@ where } } -#[pin_project::pin_project] -pub(crate) struct AndThenServiceFactoryResponse -where - A: ServiceFactory, - B: ServiceFactory, -{ - #[pin] - fut_a: A::Future, - #[pin] - fut_b: B::Future, +pin_project! { + pub(crate) struct AndThenServiceFactoryResponse + where + A: ServiceFactory, + B: ServiceFactory, + { + #[pin] + fut_a: A::Future, + #[pin] + fut_b: B::Future, - a: Option, - b: Option, + a: Option, + b: Option, + } } impl AndThenServiceFactoryResponse @@ -254,13 +273,17 @@ where #[cfg(test)] mod tests { - use std::cell::Cell; - use std::rc::Rc; - use std::task::{Context, Poll}; + use alloc::rc::Rc; + use core::{ + cell::Cell, + task::{Context, Poll}, + }; - use futures_util::future::{lazy, ok, ready, Ready}; + use futures_util::future::lazy; - use crate::{fn_factory, pipeline, pipeline_factory, Service, ServiceFactory}; + use crate::{ + fn_factory, ok, pipeline, pipeline_factory, ready, Ready, Service, ServiceFactory, + }; struct Srv1(Rc>); diff --git a/actix-service/src/and_then_apply_fn.rs b/actix-service/src/and_then_apply_fn.rs deleted file mode 100644 index c7bd098c..00000000 --- a/actix-service/src/and_then_apply_fn.rs +++ /dev/null @@ -1,334 +0,0 @@ -use std::cell::RefCell; -use std::future::Future; -use std::marker::PhantomData; -use std::pin::Pin; -use std::rc::Rc; -use std::task::{Context, Poll}; - -use crate::{Service, ServiceFactory}; - -/// `Apply` service combinator -pub(crate) struct AndThenApplyFn -where - S1: Service, - S2: Service, - F: FnMut(S1::Response, &mut S2) -> Fut, - Fut: Future>, - Err: From + From, -{ - svc: Rc>, - _phantom: PhantomData<(Fut, Req, In, Res, Err)>, -} - -impl AndThenApplyFn -where - S1: Service, - S2: Service, - F: FnMut(S1::Response, &mut S2) -> Fut, - Fut: Future>, - Err: From + From, -{ - /// Create new `Apply` combinator - pub(crate) fn new(a: S1, b: S2, wrap_fn: F) -> Self { - Self { - svc: Rc::new(RefCell::new((a, b, wrap_fn))), - _phantom: PhantomData, - } - } -} - -impl Clone - for AndThenApplyFn -where - S1: Service, - S2: Service, - F: FnMut(S1::Response, &mut S2) -> Fut, - Fut: Future>, - Err: From + From, -{ - fn clone(&self) -> Self { - AndThenApplyFn { - svc: self.svc.clone(), - _phantom: PhantomData, - } - } -} - -impl Service - for AndThenApplyFn -where - S1: Service, - S2: Service, - F: FnMut(S1::Response, &mut S2) -> Fut, - Fut: Future>, - Err: From + From, -{ - type Response = Res; - type Error = Err; - type Future = AndThenApplyFnFuture; - - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { - let mut inner = self.svc.borrow_mut(); - let not_ready = inner.0.poll_ready(cx)?.is_pending(); - if inner.1.poll_ready(cx)?.is_pending() || not_ready { - Poll::Pending - } else { - Poll::Ready(Ok(())) - } - } - - fn call(&mut self, req: Req) -> Self::Future { - let fut = self.svc.borrow_mut().0.call(req); - AndThenApplyFnFuture { - state: State::A(fut, Some(self.svc.clone())), - } - } -} - -#[pin_project::pin_project] -pub(crate) struct AndThenApplyFnFuture -where - S1: Service, - S2: Service, - F: FnMut(S1::Response, &mut S2) -> Fut, - Fut: Future>, - Err: From + From, -{ - #[pin] - state: State, -} - -#[pin_project::pin_project(project = StateProj)] -enum State -where - S1: Service, - S2: Service, - F: FnMut(S1::Response, &mut S2) -> Fut, - Fut: Future>, - Err: From + From, -{ - A(#[pin] S1::Future, Option>>), - B(#[pin] Fut), - Empty(PhantomData), -} - -impl Future - for AndThenApplyFnFuture -where - S1: Service, - S2: Service, - F: FnMut(S1::Response, &mut S2) -> Fut, - Fut: Future>, - Err: From + From, -{ - type Output = Result; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let mut this = self.as_mut().project(); - - match this.state.as_mut().project() { - StateProj::A(fut, b) => match fut.poll(cx)? { - Poll::Ready(res) => { - let b = Option::take(b).unwrap(); - this.state.set(State::Empty(PhantomData)); - let (_, b, f) = &mut *b.borrow_mut(); - let fut = f(res, b); - this.state.set(State::B(fut)); - self.poll(cx) - } - Poll::Pending => Poll::Pending, - }, - StateProj::B(fut) => fut.poll(cx).map(|r| { - this.state.set(State::Empty(PhantomData)); - r - }), - StateProj::Empty(_) => { - panic!("future must not be polled after it returned `Poll::Ready`") - } - } - } -} - -/// `AndThenApplyFn` service factory -pub(crate) struct AndThenApplyFnFactory { - srv: Rc<(SF1, SF2, F)>, - _phantom: PhantomData<(Fut, Req, In, Res, Err)>, -} - -impl - AndThenApplyFnFactory -where - SF1: ServiceFactory, - SF2: ServiceFactory, - F: FnMut(SF1::Response, &mut SF2::Service) -> Fut + Clone, - Fut: Future>, - Err: From + From, -{ - /// Create new `ApplyNewService` new service instance - pub(crate) fn new(a: SF1, b: SF2, wrap_fn: F) -> Self { - Self { - srv: Rc::new((a, b, wrap_fn)), - _phantom: PhantomData, - } - } -} - -impl Clone - for AndThenApplyFnFactory -{ - fn clone(&self) -> Self { - Self { - srv: self.srv.clone(), - _phantom: PhantomData, - } - } -} - -impl ServiceFactory - for AndThenApplyFnFactory -where - SF1: ServiceFactory, - SF1::Config: Clone, - SF2: ServiceFactory, - F: FnMut(SF1::Response, &mut SF2::Service) -> Fut + Clone, - Fut: Future>, - Err: From + From, -{ - type Response = Res; - type Error = Err; - type Service = AndThenApplyFn; - type Config = SF1::Config; - type InitError = SF1::InitError; - type Future = AndThenApplyFnFactoryResponse; - - fn new_service(&self, cfg: SF1::Config) -> Self::Future { - let srv = &*self.srv; - AndThenApplyFnFactoryResponse { - s1: None, - s2: None, - wrap_fn: srv.2.clone(), - fut_s1: srv.0.new_service(cfg.clone()), - fut_s2: srv.1.new_service(cfg), - _phantom: PhantomData, - } - } -} - -#[pin_project::pin_project] -pub(crate) struct AndThenApplyFnFactoryResponse -where - SF1: ServiceFactory, - SF2: ServiceFactory, - F: FnMut(SF1::Response, &mut SF2::Service) -> Fut + Clone, - Fut: Future>, - Err: From, - Err: From, -{ - #[pin] - fut_s1: SF1::Future, - #[pin] - fut_s2: SF2::Future, - wrap_fn: F, - s1: Option, - s2: Option, - _phantom: PhantomData, -} - -impl Future - for AndThenApplyFnFactoryResponse -where - SF1: ServiceFactory, - SF2: ServiceFactory, - F: FnMut(SF1::Response, &mut SF2::Service) -> Fut + Clone, - Fut: Future>, - Err: From + From, -{ - type Output = Result< - AndThenApplyFn, - SF1::InitError, - >; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.project(); - - if this.s1.is_none() { - if let Poll::Ready(service) = this.fut_s1.poll(cx)? { - *this.s1 = Some(service); - } - } - - if this.s2.is_none() { - if let Poll::Ready(service) = this.fut_s2.poll(cx)? { - *this.s2 = Some(service); - } - } - - if this.s1.is_some() && this.s2.is_some() { - Poll::Ready(Ok(AndThenApplyFn { - svc: Rc::new(RefCell::new(( - Option::take(this.s1).unwrap(), - Option::take(this.s2).unwrap(), - this.wrap_fn.clone(), - ))), - _phantom: PhantomData, - })) - } else { - Poll::Pending - } - } -} - -#[cfg(test)] -mod tests { - use super::*; - - use futures_util::future::{lazy, ok, Ready, TryFutureExt}; - - use crate::{fn_service, pipeline, pipeline_factory, Service, ServiceFactory}; - - #[derive(Clone)] - struct Srv; - - impl Service for Srv { - type Response = (); - type Error = (); - type Future = Ready>; - - fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll> { - Poll::Ready(Ok(())) - } - - fn call(&mut self, req: u8) -> Self::Future { - let _ = req; - ok(()) - } - } - - #[actix_rt::test] - async fn test_service() { - let mut srv = pipeline(ok).and_then_apply_fn(Srv, |req: &'static str, s| { - s.call(1).map_ok(move |res| (req, res)) - }); - let res = lazy(|cx| srv.poll_ready(cx)).await; - assert!(res.is_ready()); - - let res = srv.call("srv").await; - assert!(res.is_ok()); - assert_eq!(res.unwrap(), ("srv", ())); - } - - #[actix_rt::test] - async fn test_service_factory() { - let new_srv = pipeline_factory(|| ok::<_, ()>(fn_service(ok))).and_then_apply_fn( - || ok(Srv), - |req: &'static str, s| s.call(1).map_ok(move |res| (req, res)), - ); - let mut srv = new_srv.new_service(()).await.unwrap(); - let res = lazy(|cx| srv.poll_ready(cx)).await; - assert!(res.is_ready()); - - let res = srv.call("srv").await; - assert!(res.is_ok()); - assert_eq!(res.unwrap(), ("srv", ())); - } -} diff --git a/actix-service/src/apply.rs b/actix-service/src/apply.rs index 27a09684..9b0c4025 100644 --- a/actix-service/src/apply.rs +++ b/actix-service/src/apply.rs @@ -1,11 +1,12 @@ -use std::{ +use core::{ future::Future, marker::PhantomData, pin::Pin, task::{Context, Poll}, }; -use futures_util::ready; +use futures_core::ready; +use pin_project_lite::pin_project; use super::{IntoService, IntoServiceFactory, Service, ServiceFactory}; @@ -94,9 +95,7 @@ where type Error = Err; type Future = Fut; - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { - Poll::Ready(ready!(self.service.poll_ready(cx))) - } + crate::forward_ready!(service); fn call(&mut self, req: Req) -> Self::Future { (self.wrap_fn)(req, &mut self.service) @@ -162,17 +161,18 @@ where } } -#[pin_project::pin_project] -pub struct ApplyServiceFactoryResponse -where - SF: ServiceFactory, - F: FnMut(Req, &mut SF::Service) -> Fut, - Fut: Future>, -{ - #[pin] - fut: SF::Future, - wrap_fn: Option, - _phantom: PhantomData<(Req, Res)>, +pin_project! { + pub struct ApplyServiceFactoryResponse + where + SF: ServiceFactory, + F: FnMut(Req, &mut SF::Service) -> Fut, + Fut: Future>, + { + #[pin] + fut: SF::Future, + wrap_fn: Option, + _phantom: PhantomData<(Req, Res)>, + } } impl ApplyServiceFactoryResponse @@ -203,18 +203,18 @@ where let this = self.project(); let svc = ready!(this.fut.poll(cx))?; - Poll::Ready(Ok(Apply::new(svc, Option::take(this.wrap_fn).unwrap()))) + Poll::Ready(Ok(Apply::new(svc, this.wrap_fn.take().unwrap()))) } } #[cfg(test)] mod tests { - use std::task::{Context, Poll}; + use core::task::Poll; - use futures_util::future::{lazy, ok, Ready}; + use futures_util::future::lazy; use super::*; - use crate::{pipeline, pipeline_factory, Service, ServiceFactory}; + use crate::{ok, pipeline, pipeline_factory, Ready, Service, ServiceFactory}; #[derive(Clone)] struct Srv; @@ -224,9 +224,7 @@ mod tests { type Error = (); type Future = Ready>; - fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll> { - Poll::Ready(Ok(())) - } + crate::always_ready!(); fn call(&mut self, _: ()) -> Self::Future { ok(()) diff --git a/actix-service/src/apply_cfg.rs b/actix-service/src/apply_cfg.rs index da24e87d..3e111231 100644 --- a/actix-service/src/apply_cfg.rs +++ b/actix-service/src/apply_cfg.rs @@ -1,9 +1,13 @@ -use std::cell::RefCell; -use std::future::Future; -use std::marker::PhantomData; -use std::pin::Pin; -use std::rc::Rc; -use std::task::{Context, Poll}; +use alloc::rc::Rc; +use core::{ + cell::RefCell, + future::Future, + marker::PhantomData, + pin::Pin, + task::{Context, Poll}, +}; + +use pin_project_lite::pin_project; use crate::{Service, ServiceFactory}; @@ -156,37 +160,42 @@ where ApplyConfigServiceFactoryResponse { cfg: Some(cfg), store: self.srv.clone(), - state: State::A(self.srv.borrow().0.new_service(())), + state: State::A { + fut: self.srv.borrow().0.new_service(()), + }, } } } -#[pin_project::pin_project] -struct ApplyConfigServiceFactoryResponse -where - SF: ServiceFactory, - SF::InitError: From, - F: FnMut(Cfg, &mut SF::Service) -> Fut, - Fut: Future>, - S: Service, -{ - cfg: Option, - store: Rc>, - #[pin] - state: State, +pin_project! { + struct ApplyConfigServiceFactoryResponse + where + SF: ServiceFactory, + SF::InitError: From, + F: FnMut(Cfg, &mut SF::Service) -> Fut, + Fut: Future>, + S: Service, + { + cfg: Option, + store: Rc>, + #[pin] + state: State, + } } -#[pin_project::pin_project(project = StateProj)] -enum State -where - SF: ServiceFactory, - SF::InitError: From, - Fut: Future>, - S: Service, -{ - A(#[pin] SF::Future), - B(SF::Service), - C(#[pin] Fut), +pin_project! { + #[project = StateProj] + enum State + where + SF: ServiceFactory, + SF::InitError: From, + Fut: Future>, + S: Service, + { + A { #[pin] fut: SF::Future }, + B { svc: SF::Service }, + C { #[pin] fut: Fut }, + } } impl Future @@ -204,25 +213,25 @@ where let mut this = self.as_mut().project(); match this.state.as_mut().project() { - StateProj::A(fut) => match fut.poll(cx)? { + StateProj::A { fut } => match fut.poll(cx)? { Poll::Pending => Poll::Pending, - Poll::Ready(srv) => { - this.state.set(State::B(srv)); + Poll::Ready(svc) => { + this.state.set(State::B { svc }); self.poll(cx) } }, - StateProj::B(srv) => match srv.poll_ready(cx)? { + StateProj::B { svc } => match svc.poll_ready(cx)? { Poll::Ready(_) => { { let (_, f) = &mut *this.store.borrow_mut(); - let fut = f(this.cfg.take().unwrap(), srv); - this.state.set(State::C(fut)); + let fut = f(this.cfg.take().unwrap(), svc); + this.state.set(State::C { fut }); } self.poll(cx) } Poll::Pending => Poll::Pending, }, - StateProj::C(fut) => fut.poll(cx), + StateProj::C { fut } => fut.poll(cx), } } } diff --git a/actix-service/src/boxed.rs b/actix-service/src/boxed.rs index 35a10dac..5c4557df 100644 --- a/actix-service/src/boxed.rs +++ b/actix-service/src/boxed.rs @@ -1,8 +1,10 @@ -use std::pin::Pin; -use std::task::{Context, Poll}; -use std::{future::Future, marker::PhantomData}; - -use futures_util::future::FutureExt; +use alloc::boxed::Box; +use core::{ + future::Future, + marker::PhantomData, + pin::Pin, + task::{Context, Poll}, +}; use crate::{Service, ServiceFactory}; @@ -28,7 +30,7 @@ where { BoxServiceFactory(Box::new(FactoryWrapper { factory, - _t: std::marker::PhantomData, + _t: PhantomData, })) } @@ -75,12 +77,9 @@ where } } -struct FactoryWrapper -where - SF: ServiceFactory, -{ +struct FactoryWrapper { factory: SF, - _t: PhantomData<(C, Req)>, + _t: PhantomData<(Req, Cfg)>, } impl ServiceFactory for FactoryWrapper @@ -102,11 +101,11 @@ where type Future = BoxFuture>; fn new_service(&self, cfg: Cfg) -> Self::Future { - Box::pin( - self.factory - .new_service(cfg) - .map(|res| res.map(ServiceWrapper::boxed)), - ) + let fut = self.factory.new_service(cfg); + Box::pin(async { + let res = fut.await; + res.map(ServiceWrapper::boxed) + }) } } diff --git a/actix-service/src/ext.rs b/actix-service/src/ext.rs new file mode 100644 index 00000000..e778d11e --- /dev/null +++ b/actix-service/src/ext.rs @@ -0,0 +1,70 @@ +use crate::{dev, Service, ServiceFactory}; + +pub trait ServiceExt: Service { + /// Map this service's output to a different type, returning a new service + /// of the resulting type. + /// + /// This function is similar to the `Option::map` or `Iterator::map` where + /// it will change the type of the underlying service. + /// + /// Note that this function consumes the receiving service and returns a + /// wrapped version of it, similar to the existing `map` methods in the + /// standard library. + fn map(self, f: F) -> dev::Map + where + Self: Sized, + F: FnMut(Self::Response) -> R, + { + dev::Map::new(self, f) + } + + /// Map this service's error to a different error, returning a new service. + /// + /// This function is similar to the `Result::map_err` where it will change + /// the error type of the underlying service. For example, this can be useful to + /// ensure that services have the same error type. + /// + /// Note that this function consumes the receiving service and returns a + /// wrapped version of it. + fn map_err(self, f: F) -> dev::MapErr + where + Self: Sized, + F: Fn(Self::Error) -> E, + { + dev::MapErr::new(self, f) + } +} + +impl ServiceExt for S where S: Service {} + +pub trait ServiceFactoryExt: ServiceFactory { + /// Map this service's output to a different type, returning a new service + /// of the resulting type. + fn map(self, f: F) -> crate::map::MapServiceFactory + where + Self: Sized, + F: FnMut(Self::Response) -> R + Clone, + { + crate::map::MapServiceFactory::new(self, f) + } + + /// Map this service's error to a different error, returning a new service. + fn map_err(self, f: F) -> crate::map_err::MapErrServiceFactory + where + Self: Sized, + F: Fn(Self::Error) -> E + Clone, + { + crate::map_err::MapErrServiceFactory::new(self, f) + } + + /// Map this factory's init error to a different error, returning a new service. + fn map_init_err(self, f: F) -> crate::map_init_err::MapInitErr + where + Self: Sized, + F: Fn(Self::InitError) -> E + Clone, + { + crate::map_init_err::MapInitErr::new(self, f) + } +} + +impl ServiceFactoryExt for S where S: ServiceFactory {} diff --git a/actix-service/src/fn_service.rs b/actix-service/src/fn_service.rs index 7d15304d..9f7d1eb7 100644 --- a/actix-service/src/fn_service.rs +++ b/actix-service/src/fn_service.rs @@ -1,10 +1,6 @@ -use std::future::Future; -use std::marker::PhantomData; -use std::task::{Context, Poll}; +use core::{future::Future, marker::PhantomData, task::Poll}; -use futures_util::future::{ok, Ready}; - -use crate::{IntoService, IntoServiceFactory, Service, ServiceFactory}; +use crate::{ok, IntoService, IntoServiceFactory, Ready, Service, ServiceFactory}; /// Create `ServiceFactory` for function that can act as a `Service` pub fn fn_service( @@ -143,9 +139,7 @@ where type Error = Err; type Future = Fut; - fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll> { - Poll::Ready(Ok(())) - } + crate::always_ready!(); fn call(&mut self, req: Req) -> Self::Future { (self.f)(req) @@ -200,9 +194,7 @@ where type Error = Err; type Future = Fut; - fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll> { - Poll::Ready(Ok(())) - } + crate::always_ready!(); fn call(&mut self, req: Req) -> Self::Future { (self.f)(req) @@ -361,12 +353,12 @@ where #[cfg(test)] mod tests { - use std::task::Poll; + use core::task::Poll; - use futures_util::future::{lazy, ok}; + use futures_util::future::lazy; use super::*; - use crate::{Service, ServiceFactory}; + use crate::{ok, Service, ServiceFactory}; #[actix_rt::test] async fn test_fn_service() { diff --git a/actix-service/src/lib.rs b/actix-service/src/lib.rs index 2dfa0dd7..7bf979e5 100644 --- a/actix-service/src/lib.rs +++ b/actix-service/src/lib.rs @@ -1,38 +1,47 @@ //! See [`Service`] docs for information on this crate's foundational trait. +#![no_std] #![deny(rust_2018_idioms, nonstandard_style)] #![allow(clippy::type_complexity)] #![doc(html_logo_url = "https://actix.rs/img/logo.png")] #![doc(html_favicon_url = "https://actix.rs/favicon.ico")] -use std::cell::RefCell; -use std::future::Future; -use std::rc::Rc; -use std::sync::Arc; -use std::task::{self, Context, Poll}; +extern crate alloc; + +use alloc::{boxed::Box, rc::Rc, sync::Arc}; +use core::{ + cell::RefCell, + future::Future, + task::{self, Context, Poll}, +}; mod and_then; -mod and_then_apply_fn; mod apply; mod apply_cfg; pub mod boxed; +mod ext; mod fn_service; mod map; mod map_config; mod map_err; mod map_init_err; mod pipeline; +mod ready; mod then; mod transform; mod transform_err; pub use self::apply::{apply_fn, apply_fn_factory}; pub use self::apply_cfg::{apply_cfg, apply_cfg_factory}; +pub use self::ext::{ServiceExt, ServiceFactoryExt}; pub use self::fn_service::{fn_factory, fn_factory_with_config, fn_service}; pub use self::map_config::{map_config, unit_config}; pub use self::pipeline::{pipeline, pipeline_factory, Pipeline, PipelineFactory}; pub use self::transform::{apply, Transform}; +#[allow(unused_imports)] +use self::ready::{err, ok, ready, Ready}; + /// An asynchronous operation from `Request` to a `Response`. /// /// The `Service` trait models a request/response interaction, receiving requests and returning @@ -107,39 +116,6 @@ pub trait Service { /// Calling `call` without calling `poll_ready` is permitted. The /// implementation must be resilient to this fact. fn call(&mut self, req: Req) -> Self::Future; - - /// Map this service's output to a different type, returning a new service - /// of the resulting type. - /// - /// This function is similar to the `Option::map` or `Iterator::map` where - /// it will change the type of the underlying service. - /// - /// Note that this function consumes the receiving service and returns a - /// wrapped version of it, similar to the existing `map` methods in the - /// standard library. - fn map(self, f: F) -> crate::dev::Map - where - Self: Sized, - F: FnMut(Self::Response) -> R, - { - crate::dev::Map::new(self, f) - } - - /// Map this service's error to a different error, returning a new service. - /// - /// This function is similar to the `Result::map_err` where it will change - /// the error type of the underlying service. For example, this can be useful to - /// ensure that services have the same error type. - /// - /// Note that this function consumes the receiving service and returns a - /// wrapped version of it. - fn map_err(self, f: F) -> crate::dev::MapErr - where - Self: Sized, - F: Fn(Self::Error) -> E, - { - crate::dev::MapErr::new(self, f) - } } /// Factory for creating `Service`s. @@ -172,34 +148,6 @@ pub trait ServiceFactory { /// Create and return a new service asynchronously. fn new_service(&self, cfg: Self::Config) -> Self::Future; - - /// Map this service's output to a different type, returning a new service - /// of the resulting type. - fn map(self, f: F) -> crate::map::MapServiceFactory - where - Self: Sized, - F: FnMut(Self::Response) -> R + Clone, - { - crate::map::MapServiceFactory::new(self, f) - } - - /// Map this service's error to a different error, returning a new service. - fn map_err(self, f: F) -> crate::map_err::MapErrServiceFactory - where - Self: Sized, - F: Fn(Self::Error) -> E + Clone, - { - crate::map_err::MapErrServiceFactory::new(self, f) - } - - /// Map this factory's init error to a different error, returning a new service. - fn map_init_err(self, f: F) -> crate::map_init_err::MapInitErr - where - Self: Sized, - F: Fn(Self::InitError) -> E + Clone, - { - crate::map_init_err::MapInitErr::new(self, f) - } } impl<'a, S, Req> Service for &'a mut S @@ -359,3 +307,27 @@ pub mod dev { pub use crate::transform::ApplyTransform; pub use crate::transform_err::TransformMapInitErr; } + +#[macro_export] +macro_rules! always_ready { + () => { + fn poll_ready( + &mut self, + _: &mut ::core::task::Context<'_>, + ) -> ::core::task::Poll> { + Poll::Ready(Ok(())) + } + }; +} + +#[macro_export] +macro_rules! forward_ready { + ($field:ident) => { + fn poll_ready( + &mut self, + cx: &mut ::core::task::Context<'_>, + ) -> ::core::task::Poll> { + self.$field.poll_ready(cx) + } + }; +} diff --git a/actix-service/src/map.rs b/actix-service/src/map.rs index 04ef8c5f..0599a1d8 100644 --- a/actix-service/src/map.rs +++ b/actix-service/src/map.rs @@ -1,7 +1,11 @@ -use std::future::Future; -use std::marker::PhantomData; -use std::pin::Pin; -use std::task::{Context, Poll}; +use core::{ + future::Future, + marker::PhantomData, + pin::Pin, + task::{Context, Poll}, +}; + +use pin_project_lite::pin_project; use super::{Service, ServiceFactory}; @@ -52,24 +56,23 @@ where type Error = A::Error; type Future = MapFuture; - fn poll_ready(&mut self, ctx: &mut Context<'_>) -> Poll> { - self.service.poll_ready(ctx) - } + crate::forward_ready!(service); fn call(&mut self, req: Req) -> Self::Future { MapFuture::new(self.service.call(req), self.f.clone()) } } -#[pin_project::pin_project] -pub struct MapFuture -where - A: Service, - F: FnMut(A::Response) -> Res, -{ - f: F, - #[pin] - fut: A::Future, +pin_project! { + pub struct MapFuture + where + A: Service, + F: FnMut(A::Response) -> Res, + { + f: F, + #[pin] + fut: A::Future, + } } impl MapFuture @@ -154,15 +157,16 @@ where } } -#[pin_project::pin_project] -pub struct MapServiceFuture -where - A: ServiceFactory, - F: FnMut(A::Response) -> Res, -{ - #[pin] - fut: A::Future, - f: Option, +pin_project! { + pub struct MapServiceFuture + where + A: ServiceFactory, + F: FnMut(A::Response) -> Res, + { + #[pin] + fut: A::Future, + f: Option, + } } impl MapServiceFuture @@ -195,10 +199,12 @@ where #[cfg(test)] mod tests { - use futures_util::future::{lazy, ok, Ready}; + use futures_util::future::lazy; use super::*; - use crate::{IntoServiceFactory, Service, ServiceFactory}; + use crate::{ + ok, IntoServiceFactory, Ready, Service, ServiceExt, ServiceFactory, ServiceFactoryExt, + }; struct Srv; @@ -207,9 +213,7 @@ mod tests { type Error = (); type Future = Ready>; - fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll> { - Poll::Ready(Ok(())) - } + crate::always_ready!(); fn call(&mut self, _: ()) -> Self::Future { ok(()) diff --git a/actix-service/src/map_config.rs b/actix-service/src/map_config.rs index 82b1789b..d6d6f6b2 100644 --- a/actix-service/src/map_config.rs +++ b/actix-service/src/map_config.rs @@ -1,4 +1,4 @@ -use std::marker::PhantomData; +use core::marker::PhantomData; use super::{IntoServiceFactory, ServiceFactory}; diff --git a/actix-service/src/map_err.rs b/actix-service/src/map_err.rs index ae7442cc..944056c2 100644 --- a/actix-service/src/map_err.rs +++ b/actix-service/src/map_err.rs @@ -1,7 +1,11 @@ -use std::future::Future; -use std::marker::PhantomData; -use std::pin::Pin; -use std::task::{Context, Poll}; +use core::{ + future::Future, + marker::PhantomData, + pin::Pin, + task::{Context, Poll}, +}; + +use pin_project_lite::pin_project; use super::{Service, ServiceFactory}; @@ -62,15 +66,16 @@ where } } -#[pin_project::pin_project] -pub struct MapErrFuture -where - A: Service, - F: Fn(A::Error) -> E, -{ - f: F, - #[pin] - fut: A::Future, +pin_project! { + pub struct MapErrFuture + where + A: Service, + F: Fn(A::Error) -> E, + { + f: F, + #[pin] + fut: A::Future, + } } impl MapErrFuture @@ -157,15 +162,16 @@ where } } -#[pin_project::pin_project] -pub struct MapErrServiceFuture -where - A: ServiceFactory, - F: Fn(A::Error) -> E, -{ - #[pin] - fut: A::Future, - f: F, +pin_project! { + pub struct MapErrServiceFuture + where + A: ServiceFactory, + F: Fn(A::Error) -> E, + { + #[pin] + fut: A::Future, + f: F, + } } impl MapErrServiceFuture @@ -197,10 +203,13 @@ where #[cfg(test)] mod tests { - use futures_util::future::{err, lazy, ok, Ready}; + use futures_util::future::lazy; use super::*; - use crate::{IntoServiceFactory, Service, ServiceFactory}; + use crate::{ + err, ok, IntoServiceFactory, Ready, Service, ServiceExt, ServiceFactory, + ServiceFactoryExt, + }; struct Srv; diff --git a/actix-service/src/map_init_err.rs b/actix-service/src/map_init_err.rs index 518daaf6..9fc383aa 100644 --- a/actix-service/src/map_init_err.rs +++ b/actix-service/src/map_init_err.rs @@ -1,7 +1,11 @@ -use std::future::Future; -use std::marker::PhantomData; -use std::pin::Pin; -use std::task::{Context, Poll}; +use core::{ + future::Future, + marker::PhantomData, + pin::Pin, + task::{Context, Poll}, +}; + +use pin_project_lite::pin_project; use super::ServiceFactory; @@ -59,15 +63,16 @@ where } } -#[pin_project::pin_project] -pub struct MapInitErrFuture -where - A: ServiceFactory, - F: Fn(A::InitError) -> E, -{ - f: F, - #[pin] - fut: A::Future, +pin_project! { + pub struct MapInitErrFuture + where + A: ServiceFactory, + F: Fn(A::InitError) -> E, + { + f: F, + #[pin] + fut: A::Future, + } } impl MapInitErrFuture diff --git a/actix-service/src/pipeline.rs b/actix-service/src/pipeline.rs index cba7ce78..580d7b4c 100644 --- a/actix-service/src/pipeline.rs +++ b/actix-service/src/pipeline.rs @@ -1,8 +1,9 @@ -use std::task::{Context, Poll}; -use std::{future::Future, marker::PhantomData}; +use core::{ + marker::PhantomData, + task::{Context, Poll}, +}; use crate::and_then::{AndThenService, AndThenServiceFactory}; -use crate::and_then_apply_fn::{AndThenApplyFn, AndThenApplyFnFactory}; use crate::map::{Map, MapServiceFactory}; use crate::map_err::{MapErr, MapErrServiceFactory}; use crate::map_init_err::MapInitErr; @@ -67,28 +68,6 @@ where } } - /// Apply function to specified service and use it as a next service in chain. - /// - /// Short version of `pipeline_factory(...).and_then(apply_fn(...))` - pub fn and_then_apply_fn( - self, - service: I, - wrap_fn: F, - ) -> Pipeline + Clone, Req> - where - Self: Sized, - I: IntoService, - S1: Service, - F: FnMut(S::Response, &mut S1) -> Fut, - Fut: Future>, - Err: From + From, - { - Pipeline { - service: AndThenApplyFn::new(self.service, service.into_service(), wrap_fn), - _phantom: PhantomData, - } - } - /// Chain on a computation for when a call to the service finished, /// passing the result of the call to the next service `U`. /// @@ -219,39 +198,6 @@ where } } - /// Apply function to specified service and use it as a next service in chain. - /// - /// Short version of `pipeline_factory(...).and_then(apply_fn_factory(...))` - pub fn and_then_apply_fn( - self, - factory: I, - wrap_fn: F, - ) -> PipelineFactory< - impl ServiceFactory< - Req, - Response = Res, - Error = Err, - Config = SF::Config, - InitError = SF::InitError, - Service = impl Service + Clone, - > + Clone, - Req, - > - where - Self: Sized, - SF::Config: Clone, - I: IntoServiceFactory, - SF1: ServiceFactory, - F: FnMut(SF::Response, &mut SF1::Service) -> Fut + Clone, - Fut: Future>, - Err: From + From, - { - PipelineFactory { - factory: AndThenApplyFnFactory::new(self.factory, factory.into_factory(), wrap_fn), - _phantom: PhantomData, - } - } - /// Create `NewService` to chain on a computation for when a call to the /// service finished, passing the result of the call to the next /// service `U`. diff --git a/actix-service/src/ready.rs b/actix-service/src/ready.rs new file mode 100644 index 00000000..8b0c2ea7 --- /dev/null +++ b/actix-service/src/ready.rs @@ -0,0 +1,54 @@ +//! When MSRV is 1.48, replace with `core::future::Ready` and `core::future::ready()`. + +use core::{ + future::Future, + pin::Pin, + task::{Context, Poll}, +}; + +/// Future for the [`ready`](ready()) function. +#[derive(Debug, Clone)] +#[must_use = "futures do nothing unless you `.await` or poll them"] +pub struct Ready { + val: Option, +} + +impl Ready { + /// Unwraps the value from this immediately ready future. + #[inline] + pub fn into_inner(mut self) -> T { + self.val.take().unwrap() + } +} + +impl Unpin for Ready {} + +impl Future for Ready { + type Output = T; + + #[inline] + fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll { + let val = self.val.take().expect("Ready can not be polled twice."); + Poll::Ready(val) + } +} + +/// Creates a future that is immediately ready with a value. +#[allow(dead_code)] +pub(crate) fn ready(val: T) -> Ready { + Ready { val: Some(val) } +} + +/// Create a future that is immediately ready with a success value. +#[allow(dead_code)] +pub(crate) fn ok(val: T) -> Ready> { + Ready { val: Some(Ok(val)) } +} + +/// Create a future that is immediately ready with an error value. +#[allow(dead_code)] +pub(crate) fn err(err: E) -> Ready> { + Ready { + val: Some(Err(err)), + } +} diff --git a/actix-service/src/then.rs b/actix-service/src/then.rs index 021e5484..060ca9c7 100644 --- a/actix-service/src/then.rs +++ b/actix-service/src/then.rs @@ -1,8 +1,13 @@ -use std::future::Future; -use std::pin::Pin; -use std::rc::Rc; -use std::task::{Context, Poll}; -use std::{cell::RefCell, marker::PhantomData}; +use alloc::rc::Rc; +use core::{ + cell::RefCell, + future::Future, + marker::PhantomData, + pin::Pin, + task::{Context, Poll}, +}; + +use pin_project_lite::pin_project; use super::{Service, ServiceFactory}; @@ -50,30 +55,36 @@ where fn call(&mut self, req: Req) -> Self::Future { ThenServiceResponse { - state: State::A(self.0.borrow_mut().0.call(req), Some(self.0.clone())), + state: State::A { + fut: self.0.borrow_mut().0.call(req), + b: Some(self.0.clone()), + }, } } } -#[pin_project::pin_project] -pub(crate) struct ThenServiceResponse -where - A: Service, - B: Service>, -{ - #[pin] - state: State, +pin_project! { + pub(crate) struct ThenServiceResponse + where + A: Service, + B: Service>, + { + #[pin] + state: State, + } } -#[pin_project::pin_project(project = StateProj)] -enum State -where - A: Service, - B: Service>, -{ - A(#[pin] A::Future, Option>>), - B(#[pin] B::Future), - Empty, +pin_project! { + #[project = StateProj] + enum State + where + A: Service, + B: Service>, + { + A { #[pin] fut: A::Future, b: Option>> }, + B { #[pin] fut: B::Future }, + Empty, + } } impl Future for ThenServiceResponse @@ -87,17 +98,17 @@ where let mut this = self.as_mut().project(); match this.state.as_mut().project() { - StateProj::A(fut, b) => match fut.poll(cx) { + StateProj::A { fut, b } => match fut.poll(cx) { Poll::Ready(res) => { let b = b.take().unwrap(); this.state.set(State::Empty); // drop fut A let fut = b.borrow_mut().1.call(res); - this.state.set(State::B(fut)); + this.state.set(State::B { fut }); self.poll(cx) } Poll::Pending => Poll::Pending, }, - StateProj::B(fut) => fut.poll(cx).map(|r| { + StateProj::B { fut } => fut.poll(cx).map(|r| { this.state.set(State::Empty); r }), @@ -159,23 +170,24 @@ impl Clone for ThenServiceFactory { } } -#[pin_project::pin_project] -pub(crate) struct ThenServiceFactoryResponse -where - A: ServiceFactory, - B: ServiceFactory< - Result, - Config = A::Config, - Error = A::Error, - InitError = A::InitError, - >, -{ - #[pin] - fut_b: B::Future, - #[pin] - fut_a: A::Future, - a: Option, - b: Option, +pin_project! { + pub(crate) struct ThenServiceFactoryResponse + where + A: ServiceFactory, + B: ServiceFactory< + Result, + Config = A::Config, + Error = A::Error, + InitError = A::InitError, + >, + { + #[pin] + fut_b: B::Future, + #[pin] + fut_a: A::Future, + a: Option, + b: Option, + } } impl ThenServiceFactoryResponse @@ -236,13 +248,15 @@ where #[cfg(test)] mod tests { - use std::cell::Cell; - use std::rc::Rc; - use std::task::{Context, Poll}; + use alloc::rc::Rc; + use core::{ + cell::Cell, + task::{Context, Poll}, + }; - use futures_util::future::{err, lazy, ok, ready, Ready}; + use futures_util::future::lazy; - use crate::{pipeline, pipeline_factory, Service, ServiceFactory}; + use crate::{err, ok, pipeline, pipeline_factory, ready, Ready, Service, ServiceFactory}; #[derive(Clone)] struct Srv1(Rc>); diff --git a/actix-service/src/transform.rs b/actix-service/src/transform.rs index d4d49417..76e4547a 100644 --- a/actix-service/src/transform.rs +++ b/actix-service/src/transform.rs @@ -1,8 +1,12 @@ -use std::pin::Pin; -use std::rc::Rc; -use std::sync::Arc; -use std::task::{Context, Poll}; -use std::{future::Future, marker::PhantomData}; +use alloc::{rc::Rc, sync::Arc}; +use core::{ + future::Future, + marker::PhantomData, + pin::Pin, + task::{Context, Poll}, +}; + +use pin_project_lite::pin_project; use crate::transform_err::TransformMapInitErr; use crate::{IntoServiceFactory, Service, ServiceFactory}; @@ -185,30 +189,35 @@ where fn new_service(&self, cfg: S::Config) -> Self::Future { ApplyTransformFuture { store: self.0.clone(), - state: ApplyTransformFutureState::A(self.0.as_ref().1.new_service(cfg)), + state: ApplyTransformFutureState::A { + fut: self.0.as_ref().1.new_service(cfg), + }, } } } -#[pin_project::pin_project] -pub struct ApplyTransformFuture -where - S: ServiceFactory, - T: Transform, -{ - store: Rc<(T, S)>, - #[pin] - state: ApplyTransformFutureState, +pin_project! { + pub struct ApplyTransformFuture + where + S: ServiceFactory, + T: Transform, + { + store: Rc<(T, S)>, + #[pin] + state: ApplyTransformFutureState, + } } -#[pin_project::pin_project(project = ApplyTransformFutureStateProj)] -pub enum ApplyTransformFutureState -where - S: ServiceFactory, - T: Transform, -{ - A(#[pin] S::Future), - B(#[pin] T::Future), +pin_project! { + #[project = ApplyTransformFutureStateProj] + pub enum ApplyTransformFutureState + where + S: ServiceFactory, + T: Transform, + { + A { #[pin] fut: S::Future }, + B { #[pin] fut: T::Future }, + } } impl Future for ApplyTransformFuture @@ -222,15 +231,15 @@ where let mut this = self.as_mut().project(); match this.state.as_mut().project() { - ApplyTransformFutureStateProj::A(fut) => match fut.poll(cx)? { + ApplyTransformFutureStateProj::A { fut } => match fut.poll(cx)? { Poll::Ready(srv) => { let fut = this.store.0.new_transform(srv); - this.state.set(ApplyTransformFutureState::B(fut)); + this.state.set(ApplyTransformFutureState::B { fut }); self.poll(cx) } Poll::Pending => Poll::Pending, }, - ApplyTransformFutureStateProj::B(fut) => fut.poll(cx), + ApplyTransformFutureStateProj::B { fut } => fut.poll(cx), } } } diff --git a/actix-service/src/transform_err.rs b/actix-service/src/transform_err.rs index 1d1b9576..cbf5fe3b 100644 --- a/actix-service/src/transform_err.rs +++ b/actix-service/src/transform_err.rs @@ -1,7 +1,11 @@ -use std::future::Future; -use std::marker::PhantomData; -use std::pin::Pin; -use std::task::{Context, Poll}; +use core::{ + future::Future, + marker::PhantomData, + pin::Pin, + task::{Context, Poll}, +}; + +use pin_project_lite::pin_project; use super::Transform; @@ -63,15 +67,16 @@ where } } -#[pin_project::pin_project] -pub struct TransformMapInitErrFuture -where +pin_project! { + pub struct TransformMapInitErrFuture + where T: Transform, F: Fn(T::InitError) -> E, -{ - #[pin] - fut: T::Future, - f: F, + { + #[pin] + fut: T::Future, + f: F, + } } impl Future for TransformMapInitErrFuture diff --git a/actix-tls/Cargo.toml b/actix-tls/Cargo.toml index 6b183be1..09e1ba08 100644 --- a/actix-tls/Cargo.toml +++ b/actix-tls/Cargo.toml @@ -37,7 +37,7 @@ nativetls = ["native-tls", "tokio-native-tls"] [dependencies] actix-service = "1.0.0" actix-codec = "0.3.0" -actix-utils = "3.0.0" +actix-utils = "2.0.0" futures-util = { version = "0.3.7", default-features = false } diff --git a/actix-tls/src/nativetls.rs b/actix-tls/src/nativetls.rs index e551bb54..0710d22d 100644 --- a/actix-tls/src/nativetls.rs +++ b/actix-tls/src/nativetls.rs @@ -1,5 +1,4 @@ use std::future::Future; -use std::marker::PhantomData; use std::pin::Pin; use std::task::{Context, Poll}; @@ -16,45 +15,36 @@ use crate::MAX_CONN_COUNTER; /// Accept TLS connections via `native-tls` package. /// /// `nativetls` feature enables this `Acceptor` type. -pub struct Acceptor { +pub struct Acceptor { acceptor: TlsAcceptor, - io: PhantomData, } -impl Acceptor -where - T: AsyncRead + AsyncWrite + Unpin, -{ +impl Acceptor { /// Create `native-tls` based `Acceptor` service factory. #[inline] pub fn new(acceptor: TlsAcceptor) -> Self { - Acceptor { - acceptor, - io: PhantomData, - } + Acceptor { acceptor } } } -impl Clone for Acceptor { +impl Clone for Acceptor { #[inline] fn clone(&self) -> Self { Self { acceptor: self.acceptor.clone(), - io: PhantomData, } } } -impl ServiceFactory for Acceptor +impl ServiceFactory for Acceptor where - T: AsyncRead + AsyncWrite + Unpin + 'static, + Req: AsyncRead + AsyncWrite + Unpin + 'static, { - type Request = T; - type Response = TlsStream; + type Response = TlsStream; type Error = Error; type Config = (); - type Service = NativeTlsAcceptorService; + type Service = NativeTlsAcceptorService; type InitError = (); type Future = Ready>; @@ -63,23 +53,20 @@ where ready(Ok(NativeTlsAcceptorService { acceptor: self.acceptor.clone(), conns: conns.clone(), - io: PhantomData, })) }) } } -pub struct NativeTlsAcceptorService { +pub struct NativeTlsAcceptorService { acceptor: TlsAcceptor, - io: PhantomData, conns: Counter, } -impl Clone for NativeTlsAcceptorService { +impl Clone for NativeTlsAcceptorService { fn clone(&self) -> Self { Self { acceptor: self.acceptor.clone(), - io: PhantomData, conns: self.conns.clone(), } } @@ -87,14 +74,13 @@ impl Clone for NativeTlsAcceptorService { type LocalBoxFuture<'a, T> = Pin + 'a>>; -impl Service for NativeTlsAcceptorService +impl Service for NativeTlsAcceptorService where - T: AsyncRead + AsyncWrite + Unpin + 'static, + Req: AsyncRead + AsyncWrite + Unpin + 'static, { - type Request = T; - type Response = TlsStream; + type Response = TlsStream; type Error = Error; - type Future = LocalBoxFuture<'static, Result, Error>>; + type Future = LocalBoxFuture<'static, Result, Error>>; fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { if self.conns.available(cx) { @@ -104,7 +90,7 @@ where } } - fn call(&mut self, req: Self::Request) -> Self::Future { + fn call(&mut self, req: Req) -> Self::Future { let guard = self.conns.get(); let this = self.clone(); Box::pin(async move { diff --git a/actix-tls/src/openssl.rs b/actix-tls/src/openssl.rs index 03ee8702..31072c96 100644 --- a/actix-tls/src/openssl.rs +++ b/actix-tls/src/openssl.rs @@ -1,5 +1,4 @@ use std::future::Future; -use std::marker::PhantomData; use std::pin::Pin; use std::task::{Context, Poll}; @@ -17,38 +16,32 @@ use crate::MAX_CONN_COUNTER; /// Accept TLS connections via `openssl` package. /// /// `openssl` feature enables this `Acceptor` type. -pub struct Acceptor { +pub struct Acceptor { acceptor: SslAcceptor, - io: PhantomData, } -impl Acceptor { +impl Acceptor { /// Create OpenSSL based `Acceptor` service factory. #[inline] pub fn new(acceptor: SslAcceptor) -> Self { - Acceptor { - acceptor, - io: PhantomData, - } + Acceptor { acceptor } } } -impl Clone for Acceptor { +impl Clone for Acceptor { #[inline] fn clone(&self) -> Self { Self { acceptor: self.acceptor.clone(), - io: PhantomData, } } } -impl ServiceFactory for Acceptor { - type Request = T; +impl ServiceFactory for Acceptor { type Response = SslStream; type Error = Error; type Config = (); - type Service = AcceptorService; + type Service = AcceptorService; type InitError = (); type Future = Ready>; @@ -57,23 +50,20 @@ impl ServiceFactory for Acceptor ready(Ok(AcceptorService { acceptor: self.acceptor.clone(), conns: conns.clone(), - io: PhantomData, })) }) } } -pub struct AcceptorService { +pub struct AcceptorService { acceptor: SslAcceptor, conns: Counter, - io: PhantomData, } -impl Service for AcceptorService { - type Request = T; - type Response = SslStream; +impl Service for AcceptorService { + type Response = SslStream; type Error = Error; - type Future = AcceptorServiceResponse; + type Future = AcceptorServiceResponse; fn poll_ready(&mut self, ctx: &mut Context<'_>) -> Poll> { if self.conns.available(ctx) { @@ -83,7 +73,7 @@ impl Service for AcceptorService } } - fn call(&mut self, req: Self::Request) -> Self::Future { + fn call(&mut self, req: Req) -> Self::Future { match self.ssl_stream(req) { Ok(stream) => { let guard = self.conns.get(); @@ -94,27 +84,33 @@ impl Service for AcceptorService } } -impl AcceptorService { +impl AcceptorService { // construct a new SslStream. // At this point the SslStream does not perform any IO. // The handshake would happen later in AcceptorServiceResponse - fn ssl_stream(&self, stream: T) -> Result, Error> { + fn ssl_stream( + &self, + stream: Req, + ) -> Result, Error> { let ssl = Ssl::new(self.acceptor.context())?; let stream = SslStream::new(ssl, stream)?; Ok(stream) } } -pub enum AcceptorServiceResponse +pub enum AcceptorServiceResponse where - T: AsyncRead + AsyncWrite, + Req: AsyncRead + AsyncWrite, { - Accept(Option>, Option), + Accept(Option>, Option), Error(Option), } -impl Future for AcceptorServiceResponse { - type Output = Result, Error>; +impl Future for AcceptorServiceResponse +where + Req: AsyncRead + AsyncWrite + Unpin, +{ + type Output = Result, Error>; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { match self.get_mut() { diff --git a/actix-tls/src/rustls.rs b/actix-tls/src/rustls.rs index 90411c27..ca68dc6b 100644 --- a/actix-tls/src/rustls.rs +++ b/actix-tls/src/rustls.rs @@ -1,6 +1,5 @@ use std::future::Future; use std::io; -use std::marker::PhantomData; use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; @@ -20,39 +19,35 @@ use crate::MAX_CONN_COUNTER; /// Accept TLS connections via `rustls` package. /// /// `rustls` feature enables this `Acceptor` type. -pub struct Acceptor { +pub struct Acceptor { config: Arc, - io: PhantomData, } -impl Acceptor { +impl Acceptor { /// Create Rustls based `Acceptor` service factory. #[inline] pub fn new(config: ServerConfig) -> Self { Acceptor { config: Arc::new(config), - io: PhantomData, } } } -impl Clone for Acceptor { +impl Clone for Acceptor { #[inline] fn clone(&self) -> Self { Self { config: self.config.clone(), - io: PhantomData, } } } -impl ServiceFactory for Acceptor { - type Request = T; - type Response = TlsStream; +impl ServiceFactory for Acceptor { + type Response = TlsStream; type Error = io::Error; type Config = (); - type Service = AcceptorService; + type Service = AcceptorService; type InitError = (); type Future = Ready>; @@ -61,24 +56,21 @@ impl ServiceFactory for Acceptor { ready(Ok(AcceptorService { acceptor: self.config.clone().into(), conns: conns.clone(), - io: PhantomData, })) }) } } /// Rustls based `Acceptor` service -pub struct AcceptorService { +pub struct AcceptorService { acceptor: TlsAcceptor, - io: PhantomData, conns: Counter, } -impl Service for AcceptorService { - type Request = T; - type Response = TlsStream; +impl Service for AcceptorService { + type Response = TlsStream; type Error = io::Error; - type Future = AcceptorServiceFut; + type Future = AcceptorServiceFut; fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { if self.conns.available(cx) { @@ -88,7 +80,7 @@ impl Service for AcceptorService { } } - fn call(&mut self, req: Self::Request) -> Self::Future { + fn call(&mut self, req: Req) -> Self::Future { AcceptorServiceFut { _guard: self.conns.get(), fut: self.acceptor.accept(req), @@ -96,16 +88,16 @@ impl Service for AcceptorService { } } -pub struct AcceptorServiceFut +pub struct AcceptorServiceFut where - T: AsyncRead + AsyncWrite + Unpin, + Req: AsyncRead + AsyncWrite + Unpin, { - fut: Accept, + fut: Accept, _guard: CounterGuard, } -impl Future for AcceptorServiceFut { - type Output = Result, io::Error>; +impl Future for AcceptorServiceFut { + type Output = Result, io::Error>; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = self.get_mut(); diff --git a/actix-tracing/src/lib.rs b/actix-tracing/src/lib.rs index 36aa21f2..6d37d9b3 100644 --- a/actix-tracing/src/lib.rs +++ b/actix-tracing/src/lib.rs @@ -4,8 +4,7 @@ #![doc(html_logo_url = "https://actix.rs/img/logo.png")] #![doc(html_favicon_url = "https://actix.rs/favicon.ico")] -use std::marker::PhantomData; -use std::task::{Context, Poll}; +use core::marker::PhantomData; use actix_service::{ apply, dev::ApplyTransform, IntoServiceFactory, Service, ServiceFactory, Transform, @@ -36,9 +35,7 @@ where type Error = S::Error; type Future = Either>; - fn poll_ready(&mut self, ctx: &mut Context<'_>) -> Poll> { - self.inner.poll_ready(ctx) - } + actix_service::forward_ready!(inner); fn call(&mut self, req: Req) -> Self::Future { let span = (self.make_span)(&req); diff --git a/actix-utils/CHANGES.md b/actix-utils/CHANGES.md index 5debbaf5..b112d8b1 100644 --- a/actix-utils/CHANGES.md +++ b/actix-utils/CHANGES.md @@ -1,15 +1,11 @@ # Changes ## Unreleased - 2020-xx-xx -<<<<<<< HEAD -* Upgrade `pin-project` to `1.0`. * Update `bytes` dependency to `1`. -======= * Use `pin-project-lite` to replace `pin-project`. [#229] * Remove `condition`,`either`,`inflight`,`keepalive`,`oneshot`,`order`,`stream` and `time` mods. [#229] [#229]: https://github.com/actix/actix-net/pull/229 ->>>>>>> upstream/master ## 2.0.0 - 2020-08-23 * No changes from beta 1. diff --git a/actix-utils/src/timeout.rs b/actix-utils/src/timeout.rs index 2a27ed5c..a3cb9b4d 100644 --- a/actix-utils/src/timeout.rs +++ b/actix-utils/src/timeout.rs @@ -200,7 +200,7 @@ where #[cfg(test)] mod tests { - use std::task::{Context, Poll}; + use std::task::Poll; use std::time::Duration; use super::*; @@ -214,9 +214,7 @@ mod tests { type Error = (); type Future = LocalBoxFuture<'static, Result<(), ()>>; - fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll> { - Poll::Ready(Ok(())) - } + actix_service::always_ready!(); fn call(&mut self, _: ()) -> Self::Future { actix_rt::time::sleep(self.0)