diff --git a/actix-http/CHANGES.md b/actix-http/CHANGES.md index 52980fb5d..11171c67d 100644 --- a/actix-http/CHANGES.md +++ b/actix-http/CHANGES.md @@ -19,6 +19,7 @@ * `client::error::ConnectError` Resolver variant contains `Box` type [#1905] * `client::ConnectorConfig` default timeout changed to 5 seconds. [#1905] * Simplify `BlockingError` type to a struct. It's only triggered with blocking thread pool is dead. [#1957] +* `body::ResponseBody` enum use named field. ### Removed * `ResponseBuilder::set`; use `ResponseBuilder::insert_header`. [#1869] diff --git a/actix-http/Cargo.toml b/actix-http/Cargo.toml index 22a54f569..7f023b734 100644 --- a/actix-http/Cargo.toml +++ b/actix-http/Cargo.toml @@ -69,7 +69,7 @@ language-tags = "0.2" log = "0.4" mime = "0.3" percent-encoding = "2.1" -pin-project = "1.0.0" +pin-project-lite = "0.2" rand = "0.8" regex = "1.3" serde = "1.0" diff --git a/actix-http/src/body.rs b/actix-http/src/body.rs index d5d1989d4..2b7d0bcfb 100644 --- a/actix-http/src/body.rs +++ b/actix-http/src/body.rs @@ -4,7 +4,7 @@ use std::{fmt, mem}; use bytes::{Bytes, BytesMut}; use futures_core::{ready, Stream}; -use pin_project::pin_project; +use pin_project_lite::pin_project; use crate::error::Error; @@ -63,31 +63,33 @@ impl MessageBody for Box { } } -#[pin_project(project = ResponseBodyProj)] -pub enum ResponseBody { - Body(#[pin] B), - Other(Body), +pin_project! { + #[project = ResponseBodyProj] + pub enum ResponseBody { + Body { #[pin] body: B }, + Other { body: Body } + } } impl ResponseBody { pub fn into_body(self) -> ResponseBody { match self { - ResponseBody::Body(b) => ResponseBody::Other(b), - ResponseBody::Other(b) => ResponseBody::Other(b), + ResponseBody::Body { body } => ResponseBody::Other { body }, + ResponseBody::Other { body } => ResponseBody::Other { body }, } } } impl ResponseBody { pub fn take_body(&mut self) -> ResponseBody { - std::mem::replace(self, ResponseBody::Other(Body::None)) + std::mem::replace(self, ResponseBody::Other { body: Body::None }) } } impl ResponseBody { pub fn as_ref(&self) -> Option<&B> { - if let ResponseBody::Body(ref b) = self { - Some(b) + if let ResponseBody::Body { body } = self { + Some(body) } else { None } @@ -97,8 +99,8 @@ impl ResponseBody { impl MessageBody for ResponseBody { fn size(&self) -> BodySize { match self { - ResponseBody::Body(ref body) => body.size(), - ResponseBody::Other(ref body) => body.size(), + ResponseBody::Body { body } => body.size(), + ResponseBody::Other { body } => body.size(), } } @@ -107,8 +109,8 @@ impl MessageBody for ResponseBody { cx: &mut Context<'_>, ) -> Poll>> { match self.project() { - ResponseBodyProj::Body(body) => body.poll_next(cx), - ResponseBodyProj::Other(body) => Pin::new(body).poll_next(cx), + ResponseBodyProj::Body { body } => body.poll_next(cx), + ResponseBodyProj::Other { body } => Pin::new(body).poll_next(cx), } } } @@ -121,8 +123,8 @@ impl Stream for ResponseBody { cx: &mut Context<'_>, ) -> Poll> { match self.project() { - ResponseBodyProj::Body(body) => body.poll_next(cx), - ResponseBodyProj::Other(body) => Pin::new(body).poll_next(cx), + ResponseBodyProj::Body { body } => body.poll_next(cx), + ResponseBodyProj::Other { body } => Pin::new(body).poll_next(cx), } } } @@ -468,8 +470,8 @@ mod tests { impl ResponseBody { pub(crate) fn get_ref(&self) -> &[u8] { match *self { - ResponseBody::Body(ref b) => b.get_ref(), - ResponseBody::Other(ref b) => b.get_ref(), + ResponseBody::Body { ref body } => body.get_ref(), + ResponseBody::Other { ref body } => body.get_ref(), } } } diff --git a/actix-http/src/client/connection.rs b/actix-http/src/client/connection.rs index 4c6a6dcb8..6dcedcff3 100644 --- a/actix-http/src/client/connection.rs +++ b/actix-http/src/client/connection.rs @@ -10,7 +10,7 @@ use bytes::Bytes; use futures_core::future::LocalBoxFuture; use futures_util::future::{err, Either, FutureExt, Ready}; use h2::client::SendRequest; -use pin_project::pin_project; +use pin_project_lite::pin_project; use crate::body::MessageBody; use crate::h1::ClientCodec; @@ -245,25 +245,30 @@ where EitherConnection::A(con) => con .open_tunnel(head) .map(|res| { - res.map(|(head, framed)| (head, framed.into_map_io(EitherIo::A))) + res.map(|(head, framed)| { + (head, framed.into_map_io(|a| EitherIo::A { a })) + }) }) .boxed_local(), EitherConnection::B(con) => con .open_tunnel(head) .map(|res| { - res.map(|(head, framed)| (head, framed.into_map_io(EitherIo::B))) + res.map(|(head, framed)| { + (head, framed.into_map_io(|b| EitherIo::B { b })) + }) }) .boxed_local(), } } } -#[pin_project(project = EitherIoProj)] -pub enum EitherIo { - A(#[pin] A), - B(#[pin] B), +pin_project! { + #[project = EitherIoProj] + pub enum EitherIo { + A { #[pin] a: A }, + B { #[pin] b: B} + } } - impl AsyncRead for EitherIo where A: AsyncRead, @@ -275,8 +280,8 @@ where buf: &mut ReadBuf<'_>, ) -> Poll> { match self.project() { - EitherIoProj::A(val) => val.poll_read(cx, buf), - EitherIoProj::B(val) => val.poll_read(cx, buf), + EitherIoProj::A { a } => a.poll_read(cx, buf), + EitherIoProj::B { b } => b.poll_read(cx, buf), } } } @@ -292,15 +297,15 @@ where buf: &[u8], ) -> Poll> { match self.project() { - EitherIoProj::A(val) => val.poll_write(cx, buf), - EitherIoProj::B(val) => val.poll_write(cx, buf), + EitherIoProj::A { a } => a.poll_write(cx, buf), + EitherIoProj::B { b } => b.poll_write(cx, buf), } } fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { match self.project() { - EitherIoProj::A(val) => val.poll_flush(cx), - EitherIoProj::B(val) => val.poll_flush(cx), + EitherIoProj::A { a } => a.poll_flush(cx), + EitherIoProj::B { b } => b.poll_flush(cx), } } @@ -309,8 +314,8 @@ where cx: &mut Context<'_>, ) -> Poll> { match self.project() { - EitherIoProj::A(val) => val.poll_shutdown(cx), - EitherIoProj::B(val) => val.poll_shutdown(cx), + EitherIoProj::A { a } => a.poll_shutdown(cx), + EitherIoProj::B { b } => b.poll_shutdown(cx), } } } diff --git a/actix-http/src/client/connector.rs b/actix-http/src/client/connector.rs index 425ee0f70..13fbba3c0 100644 --- a/actix-http/src/client/connector.rs +++ b/actix-http/src/client/connector.rs @@ -416,6 +416,7 @@ mod connect_impl { use futures_core::ready; use futures_util::future::Either; + use pin_project_lite::pin_project; use super::*; use crate::client::connection::EitherConnection; @@ -478,15 +479,20 @@ mod connect_impl { } } - #[pin_project::pin_project] - pub(crate) struct InnerConnectorResponseA - where - Io1: AsyncRead + AsyncWrite + Unpin + 'static, - T: Service + 'static, - { - #[pin] - fut: as Service>::Future, - _phantom: PhantomData, + pin_project! { + pub(crate) struct InnerConnectorResponseA + where + Io1: AsyncRead, + Io1: AsyncWrite, + Io1: Unpin, + Io1: 'static, + T: Service, + T: 'static + { + #[pin] + fut: as Service>::Future, + _phantom: PhantomData + } } impl Future for InnerConnectorResponseA @@ -505,15 +511,20 @@ mod connect_impl { } } - #[pin_project::pin_project] - pub(crate) struct InnerConnectorResponseB - where - Io2: AsyncRead + AsyncWrite + Unpin + 'static, - T: Service + 'static, - { - #[pin] - fut: as Service>::Future, - _phantom: PhantomData, + pin_project! { + pub(crate) struct InnerConnectorResponseB + where + Io2: AsyncRead, + Io2: AsyncWrite, + Io2: Unpin, + Io2: 'static, + T: Service, + T: 'static + { + #[pin] + fut: as Service>::Future, + _phantom: PhantomData + } } impl Future for InnerConnectorResponseB diff --git a/actix-http/src/client/h1proto.rs b/actix-http/src/client/h1proto.rs index 24f4207e8..3d9e6e8ae 100644 --- a/actix-http/src/client/h1proto.rs +++ b/actix-http/src/client/h1proto.rs @@ -9,6 +9,7 @@ use bytes::{Bytes, BytesMut}; use futures_core::Stream; use futures_util::future::poll_fn; use futures_util::{pin_mut, SinkExt, StreamExt}; +use pin_project_lite::pin_project; use crate::error::PayloadError; use crate::h1; @@ -237,10 +238,11 @@ impl AsyncWrite for H1Connection } } -#[pin_project::pin_project] -pub(crate) struct PlStream { - #[pin] - framed: Option>, +pin_project! { + pub(crate) struct PlStream { + #[pin] + framed: Option>, + } } impl PlStream { diff --git a/actix-http/src/client/pool.rs b/actix-http/src/client/pool.rs index 867ba5c0c..f68a6ed39 100644 --- a/actix-http/src/client/pool.rs +++ b/actix-http/src/client/pool.rs @@ -11,14 +11,12 @@ use actix_rt::time::{sleep, Sleep}; use actix_service::Service; use actix_utils::task::LocalWaker; use ahash::AHashMap; -use bytes::Bytes; use futures_channel::oneshot; use futures_core::future::LocalBoxFuture; use futures_util::future::{poll_fn, FutureExt}; -use h2::client::{Connection, SendRequest}; use http::uri::Authority; use indexmap::IndexSet; -use pin_project::pin_project; +use pin_project_lite::pin_project; use slab::Slab; use super::config::ConnectorConfig; @@ -386,11 +384,12 @@ where } } -#[pin_project::pin_project] -struct CloseConnection { - io: T, - #[pin] - timeout: Sleep, +pin_project! { + struct CloseConnection { + io: T, + #[pin] + timeout: Sleep + } } impl CloseConnection @@ -424,7 +423,6 @@ where } } -#[pin_project] struct ConnectorPoolSupport where Io: AsyncRead + AsyncWrite + Unpin + 'static, @@ -442,9 +440,9 @@ where type Output = (); fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.project(); + let this = self.get_mut(); - if Rc::strong_count(this.inner) == 1 { + if Rc::strong_count(&this.inner) == 1 { // If we are last copy of Inner it means the ConnectionPool is already gone // and we are safe to exit. return Poll::Ready(()); @@ -498,55 +496,75 @@ where } } -#[pin_project::pin_project(PinnedDrop)] -struct OpenWaitingConnection +struct OpenWaitingConnection where Io: AsyncRead + AsyncWrite + Unpin + 'static, { - #[pin] - fut: F, - key: Key, - h2: Option< - LocalBoxFuture< - 'static, - Result<(SendRequest, Connection), h2::Error>, - >, - >, - rx: Option, ConnectError>>>, inner: Option>>>, - config: ConnectorConfig, } -impl OpenWaitingConnection +impl OpenWaitingConnection where - F: Future> + 'static, Io: AsyncRead + AsyncWrite + Unpin + 'static, { - fn spawn( + fn spawn( key: Key, rx: oneshot::Sender, ConnectError>>, inner: Rc>>, fut: F, config: ConnectorConfig, - ) { - actix_rt::spawn(OpenWaitingConnection { - key, - fut, - h2: None, - rx: Some(rx), - inner: Some(inner), - config, + ) where + F: Future> + 'static, + { + // OpenWaitingConnection would guard the spawn task and release + // permission/wake up support future when spawn task is canceled/generated error. + let mut guard = OpenWaitingConnection { inner: Some(inner) }; + + actix_rt::spawn(async move { + let (io, proto) = match fut.await { + Ok((io, proto)) => (io, proto), + Err(e) => { + let _ = Option::take(&mut guard.inner); + let _ = rx.send(Err(e)); + return; + } + }; + match proto { + Protocol::Http1 => { + let inner = Option::take(&mut guard.inner); + let _ = rx.send(Ok(IoConnection::new( + ConnectionType::H1(io), + Instant::now(), + Some(Acquired(key, inner)), + ))); + } + _ => match handshake(io, &config).await { + Ok((sender, connection)) => { + let inner = Option::take(&mut guard.inner); + let _ = rx.send(Ok(IoConnection::new( + ConnectionType::H2(H2Connection::new(sender, connection)), + Instant::now(), + Some(Acquired(key, inner)), + ))); + } + Err(err) => { + let _ = Option::take(&mut guard.inner); + let _ = rx.send(Err(ConnectError::H2(err))); + } + }, + } }); } } -#[pin_project::pinned_drop] -impl PinnedDrop for OpenWaitingConnection +impl Drop for OpenWaitingConnection where Io: AsyncRead + AsyncWrite + Unpin + 'static, { - fn drop(self: Pin<&mut Self>) { - if let Some(inner) = self.project().inner.take() { + fn drop(&mut self) { + // if inner is some it means OpenWaitingConnection did not finish + // it's task. release permission and try to wake up support future. + if let Some(inner) = self.inner.take() { let mut inner = inner.as_ref().borrow_mut(); inner.release(); inner.check_availability(); @@ -554,65 +572,6 @@ where } } -impl Future for OpenWaitingConnection -where - F: Future>, - Io: AsyncRead + AsyncWrite + Unpin, -{ - type Output = (); - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.as_mut().project(); - - if let Some(ref mut h2) = this.h2 { - return match Pin::new(h2).poll(cx) { - Poll::Ready(Ok((sender, connection))) => { - let rx = this.rx.take().unwrap(); - let _ = rx.send(Ok(IoConnection::new( - ConnectionType::H2(H2Connection::new(sender, connection)), - Instant::now(), - Some(Acquired(this.key.clone(), this.inner.take())), - ))); - Poll::Ready(()) - } - Poll::Pending => Poll::Pending, - Poll::Ready(Err(err)) => { - let _ = this.inner.take(); - if let Some(rx) = this.rx.take() { - let _ = rx.send(Err(ConnectError::H2(err))); - } - Poll::Ready(()) - } - }; - } - - match this.fut.poll(cx) { - Poll::Ready(Err(err)) => { - let _ = this.inner.take(); - if let Some(rx) = this.rx.take() { - let _ = rx.send(Err(err)); - } - Poll::Ready(()) - } - Poll::Ready(Ok((io, proto))) => { - if proto == Protocol::Http1 { - let rx = this.rx.take().unwrap(); - let _ = rx.send(Ok(IoConnection::new( - ConnectionType::H1(io), - Instant::now(), - Some(Acquired(this.key.clone(), this.inner.take())), - ))); - Poll::Ready(()) - } else { - *this.h2 = Some(handshake(io, this.config).boxed_local()); - self.poll(cx) - } - } - Poll::Pending => Poll::Pending, - } - } -} - pub(crate) struct Acquired(Key, Option>>>); impl Acquired diff --git a/actix-http/src/encoding/encoder.rs b/actix-http/src/encoding/encoder.rs index 1d4a8e933..6998dae24 100644 --- a/actix-http/src/encoding/encoder.rs +++ b/actix-http/src/encoding/encoder.rs @@ -9,7 +9,7 @@ use brotli2::write::BrotliEncoder; use bytes::Bytes; use flate2::write::{GzEncoder, ZlibEncoder}; use futures_core::ready; -use pin_project::pin_project; +use pin_project_lite::pin_project; use crate::body::{Body, BodySize, MessageBody, ResponseBody}; use crate::http::header::{ContentEncoding, CONTENT_ENCODING}; @@ -21,13 +21,14 @@ use crate::error::BlockingError; const INPLACE: usize = 1024; -#[pin_project] -pub struct Encoder { - eof: bool, - #[pin] - body: EncoderBody, - encoder: Option, - fut: Option>>, +pin_project! { + pub struct Encoder { + eof: bool, + #[pin] + body: EncoderBody, + encoder: Option, + fut: Option>>, + } } impl Encoder { @@ -43,19 +44,21 @@ impl Encoder { || encoding == ContentEncoding::Auto); let body = match body { - ResponseBody::Other(b) => match b { - Body::None => return ResponseBody::Other(Body::None), - Body::Empty => return ResponseBody::Other(Body::Empty), - Body::Bytes(buf) => { + ResponseBody::Other { body } => match body { + Body::None => return ResponseBody::Other { body: Body::None }, + Body::Empty => return ResponseBody::Other { body: Body::Empty }, + Body::Bytes(bytes) => { if can_encode { - EncoderBody::Bytes(buf) + EncoderBody::Bytes { bytes } } else { - return ResponseBody::Other(Body::Bytes(buf)); + return ResponseBody::Other { + body: Body::Bytes(bytes), + }; } } - Body::Message(stream) => EncoderBody::BoxedStream(stream), + Body::Message(stream) => EncoderBody::BoxedStream { stream }, }, - ResponseBody::Body(stream) => EncoderBody::Stream(stream), + ResponseBody::Body { body } => EncoderBody::Stream { stream: body }, }; if can_encode { @@ -63,36 +66,42 @@ impl Encoder { if let Some(enc) = ContentEncoder::encoder(encoding) { update_head(encoding, head); head.no_chunking(false); - return ResponseBody::Body(Encoder { - body, - eof: false, - fut: None, - encoder: Some(enc), - }); + return ResponseBody::Body { + body: Encoder { + body, + eof: false, + fut: None, + encoder: Some(enc), + }, + }; } } - ResponseBody::Body(Encoder { - body, - eof: false, - fut: None, - encoder: None, - }) + ResponseBody::Body { + body: Encoder { + body, + eof: false, + fut: None, + encoder: None, + }, + } } } -#[pin_project(project = EncoderBodyProj)] -enum EncoderBody { - Bytes(Bytes), - Stream(#[pin] B), - BoxedStream(Box), +pin_project! { + #[project = EncoderBodyProj] + enum EncoderBody { + Bytes { bytes: Bytes }, + Stream { #[pin] stream: B }, + BoxedStream { stream: Box } + } } impl MessageBody for EncoderBody { fn size(&self) -> BodySize { match self { - EncoderBody::Bytes(ref b) => b.size(), - EncoderBody::Stream(ref b) => b.size(), - EncoderBody::BoxedStream(ref b) => b.size(), + EncoderBody::Bytes { ref bytes } => bytes.size(), + EncoderBody::Stream { ref stream } => stream.size(), + EncoderBody::BoxedStream { ref stream } => stream.size(), } } @@ -101,16 +110,16 @@ impl MessageBody for EncoderBody { cx: &mut Context<'_>, ) -> Poll>> { match self.project() { - EncoderBodyProj::Bytes(b) => { - if b.is_empty() { + EncoderBodyProj::Bytes { bytes } => { + if bytes.is_empty() { Poll::Ready(None) } else { - Poll::Ready(Some(Ok(std::mem::take(b)))) + Poll::Ready(Some(Ok(std::mem::take(bytes)))) } } - EncoderBodyProj::Stream(b) => b.poll_next(cx), - EncoderBodyProj::BoxedStream(ref mut b) => { - Pin::new(b.as_mut()).poll_next(cx) + EncoderBodyProj::Stream { stream } => stream.poll_next(cx), + EncoderBodyProj::BoxedStream { stream } => { + Pin::new(stream.as_mut()).poll_next(cx) } } } diff --git a/actix-http/src/h1/dispatcher.rs b/actix-http/src/h1/dispatcher.rs index f8fc95921..2463cf001 100644 --- a/actix-http/src/h1/dispatcher.rs +++ b/actix-http/src/h1/dispatcher.rs @@ -14,7 +14,7 @@ use actix_service::Service; use bitflags::bitflags; use bytes::{Buf, BytesMut}; use log::{error, trace}; -use pin_project::pin_project; +use pin_project_lite::pin_project; use crate::body::{Body, BodySize, MessageBody, ResponseBody}; use crate::config::ServiceConfig; @@ -45,70 +45,92 @@ bitflags! { } } -#[pin_project::pin_project] -/// Dispatcher for HTTP/1.1 protocol -pub struct Dispatcher -where - S: Service, - S::Error: Into, - B: MessageBody, - X: Service, - X::Error: Into, - U: Service<(Request, Framed), Response = ()>, - U::Error: fmt::Display, -{ - #[pin] - inner: DispatcherState, - - #[cfg(test)] - poll_count: u64, +#[cfg(test)] +pin_project! { + /// Dispatcher for HTTP/1.1 protocol + pub struct Dispatcher + where + S: Service, + S::Error: Into, + B: MessageBody, + X: Service, + X::Error: Into, + U: Service<(Request, Framed), Response = ()>, + U::Error: fmt::Display + { + #[pin] + inner: DispatcherState, + poll_count: u64 + } } -#[pin_project(project = DispatcherStateProj)] -enum DispatcherState -where - S: Service, - S::Error: Into, - B: MessageBody, - X: Service, - X::Error: Into, - U: Service<(Request, Framed), Response = ()>, - U::Error: fmt::Display, -{ - Normal(#[pin] InnerDispatcher), - Upgrade(#[pin] U::Future), +#[cfg(not(test))] +pin_project! { + /// Dispatcher for HTTP/1.1 protocol + pub struct Dispatcher + where + S: Service, + S::Error: Into, + B: MessageBody, + X: Service, + X::Error: Into, + U: Service<(Request, Framed), Response = ()>, + U::Error: fmt::Display + { + #[pin] + inner: DispatcherState + } } -#[pin_project(project = InnerDispatcherProj)] -struct InnerDispatcher -where - S: Service, - S::Error: Into, - B: MessageBody, - X: Service, - X::Error: Into, - U: Service<(Request, Framed), Response = ()>, - U::Error: fmt::Display, -{ - flow: Rc>, - on_connect_data: OnConnectData, - flags: Flags, - peer_addr: Option, - error: Option, +pin_project! { + #[project = DispatcherStateProj] + enum DispatcherState + where + S: Service, + S::Error: Into, + B: MessageBody, + X: Service, + X::Error: Into, + U: Service<(Request, Framed), Response = ()>, + U::Error: fmt::Display + { + Normal { #[pin] inner: InnerDispatcher}, + Upgrade { #[pin] upgrade: U::Future } + } +} - #[pin] - state: State, - payload: Option, - messages: VecDeque, +pin_project! { + #[project = InnerDispatcherProj] + struct InnerDispatcher + where + S: Service, + S::Error: Into, + B: MessageBody, + X: Service, + X::Error: Into, + U: Service<(Request, Framed), Response = ()>, + U::Error: fmt::Display + { + flow: Rc>, + on_connect_data: OnConnectData, + flags: Flags, + peer_addr: Option, + error: Option, - ka_expire: Instant, - #[pin] - ka_timer: Option, + #[pin] + state: State, + payload: Option, + messages: VecDeque, - io: Option, - read_buf: BytesMut, - write_buf: BytesMut, - codec: Codec, + ka_expire: Instant, + #[pin] + ka_timer: Option, + + io: Option, + read_buf: BytesMut, + write_buf: BytesMut, + codec: Codec + } } enum DispatcherMessage { @@ -117,17 +139,19 @@ enum DispatcherMessage { Error(Response<()>), } -#[pin_project(project = StateProj)] -enum State -where - S: Service, - X: Service, - B: MessageBody, -{ - None, - ExpectCall(#[pin] X::Future), - ServiceCall(#[pin] S::Future), - SendPayload(#[pin] ResponseBody), +pin_project! { + #[project = StateProj] + enum State + where + S: Service, + X: Service, + B: MessageBody + { + None, + ExpectCall { #[pin] fut: X::Future }, + ServiceCall { #[pin] fut: S::Future }, + SendPayload { #[pin] body: ResponseBody } + } } impl State @@ -141,7 +165,7 @@ where } fn is_call(&self) -> bool { - matches!(self, State::ServiceCall(_)) + matches!(self, State::ServiceCall { .. }) } } enum PollResponse { @@ -220,22 +244,24 @@ where }; Dispatcher { - inner: DispatcherState::Normal(InnerDispatcher { - write_buf: BytesMut::with_capacity(HW_BUFFER_SIZE), - payload: None, - state: State::None, - error: None, - messages: VecDeque::new(), - io: Some(io), - codec, - read_buf, - flow: services, - on_connect_data, - flags, - peer_addr, - ka_expire, - ka_timer, - }), + inner: DispatcherState::Normal { + inner: InnerDispatcher { + write_buf: BytesMut::with_capacity(HW_BUFFER_SIZE), + payload: None, + state: State::None, + error: None, + messages: VecDeque::new(), + io: Some(io), + codec, + read_buf, + flow: services, + on_connect_data, + flags, + peer_addr, + ka_expire, + ka_timer, + }, + }, #[cfg(test)] poll_count: 0, @@ -337,7 +363,7 @@ where this.flags.set(Flags::KEEPALIVE, this.codec.keepalive()); match body.size() { BodySize::None | BodySize::Empty => this.state.set(State::None), - _ => this.state.set(State::SendPayload(body)), + _ => this.state.set(State::SendPayload { body }), }; Ok(()) } @@ -363,8 +389,10 @@ where true } Some(DispatcherMessage::Error(res)) => { - self.as_mut() - .send_response(res, ResponseBody::Other(Body::Empty))?; + self.as_mut().send_response( + res, + ResponseBody::Other { body: Body::Empty }, + )?; true } Some(DispatcherMessage::Upgrade(req)) => { @@ -372,12 +400,12 @@ where } None => false, }, - StateProj::ExpectCall(fut) => match fut.poll(cx) { + StateProj::ExpectCall { fut } => match fut.poll(cx) { Poll::Ready(Ok(req)) => { self.as_mut().send_continue(); this = self.as_mut().project(); let fut = this.flow.service.call(req); - this.state.set(State::ServiceCall(fut)); + this.state.set(State::ServiceCall { fut }); continue; } Poll::Ready(Err(e)) => { @@ -388,7 +416,7 @@ where } Poll::Pending => false, }, - StateProj::ServiceCall(fut) => match fut.poll(cx) { + StateProj::ServiceCall { fut } => match fut.poll(cx) { Poll::Ready(Ok(res)) => { let (res, body) = res.into().replace_body(()); self.as_mut().send_response(res, body)?; @@ -402,10 +430,10 @@ where } Poll::Pending => false, }, - StateProj::SendPayload(mut stream) => { + StateProj::SendPayload { mut body } => { loop { if this.write_buf.len() < super::payload::MAX_BUFFER_SIZE { - match stream.as_mut().poll_next(cx) { + match body.as_mut().poll_next(cx) { Poll::Ready(Some(Ok(item))) => { this.codec.encode( Message::Chunk(Some(item)), @@ -466,26 +494,26 @@ where if req.head().expect() { // set dispatcher state so the future is pinned. let mut this = self.as_mut().project(); - let task = this.flow.expect.call(req); - this.state.set(State::ExpectCall(task)); + let fut = this.flow.expect.call(req); + this.state.set(State::ExpectCall { fut }); } else { // the same as above. let mut this = self.as_mut().project(); - let task = this.flow.service.call(req); - this.state.set(State::ServiceCall(task)); + let fut = this.flow.service.call(req); + this.state.set(State::ServiceCall { fut }); }; // eagerly poll the future for once(or twice if expect is resolved immediately). loop { match self.as_mut().project().state.project() { - StateProj::ExpectCall(fut) => { + StateProj::ExpectCall { fut } => { match fut.poll(cx) { // expect is resolved. continue loop and poll the service call branch. Poll::Ready(Ok(req)) => { self.as_mut().send_continue(); let mut this = self.as_mut().project(); - let task = this.flow.service.call(req); - this.state.set(State::ServiceCall(task)); + let fut = this.flow.service.call(req); + this.state.set(State::ServiceCall { fut }); continue; } // future is pending. return Ok(()) to notify that a new state is @@ -502,7 +530,7 @@ where } } } - StateProj::ServiceCall(fut) => { + StateProj::ServiceCall { fut } => { // return no matter the service call future's result. return match fut.poll(cx) { // future is resolved. send response and return a result. On success @@ -707,7 +735,7 @@ where trace!("Slow request timeout"); let _ = self.as_mut().send_response( Response::RequestTimeout().finish().drop_body(), - ResponseBody::Other(Body::Empty), + ResponseBody::Other { body: Body::Empty }, ); this = self.project(); } else { @@ -832,7 +860,7 @@ where } match this.inner.project() { - DispatcherStateProj::Normal(mut inner) => { + DispatcherStateProj::Normal { mut inner } => { inner.as_mut().poll_keepalive(cx)?; if inner.flags.contains(Flags::SHUTDOWN) { @@ -876,7 +904,7 @@ where self.as_mut() .project() .inner - .set(DispatcherState::Upgrade(upgrade)); + .set(DispatcherState::Upgrade { upgrade }); return self.poll(cx); } }; @@ -928,7 +956,7 @@ where } } } - DispatcherStateProj::Upgrade(fut) => fut.poll(cx).map_err(|e| { + DispatcherStateProj::Upgrade { upgrade } => upgrade.poll(cx).map_err(|e| { error!("Upgrade handler error: {}", e); DispatchError::Upgrade }), @@ -1017,7 +1045,7 @@ mod tests { Poll::Ready(res) => assert!(res.is_err()), } - if let DispatcherStateProj::Normal(inner) = h1.project().inner.project() { + if let DispatcherStateProj::Normal { inner } = h1.project().inner.project() { assert!(inner.flags.contains(Flags::READ_DISCONNECT)); assert_eq!( &inner.project().io.take().unwrap().write_buf[..26], @@ -1052,7 +1080,7 @@ mod tests { futures_util::pin_mut!(h1); - assert!(matches!(&h1.inner, DispatcherState::Normal(_))); + assert!(matches!(&h1.inner, DispatcherState::Normal { .. })); match h1.as_mut().poll(cx) { Poll::Pending => panic!("first poll should not be pending"), @@ -1062,7 +1090,7 @@ mod tests { // polls: initial => shutdown assert_eq!(h1.poll_count, 2); - if let DispatcherStateProj::Normal(inner) = h1.project().inner.project() { + if let DispatcherStateProj::Normal { inner } = h1.project().inner.project() { let res = &mut inner.project().io.take().unwrap().write_buf[..]; stabilize_date_header(res); @@ -1106,7 +1134,7 @@ mod tests { futures_util::pin_mut!(h1); - assert!(matches!(&h1.inner, DispatcherState::Normal(_))); + assert!(matches!(&h1.inner, DispatcherState::Normal { .. })); match h1.as_mut().poll(cx) { Poll::Pending => panic!("first poll should not be pending"), @@ -1116,7 +1144,7 @@ mod tests { // polls: initial => shutdown assert_eq!(h1.poll_count, 1); - if let DispatcherStateProj::Normal(inner) = h1.project().inner.project() { + if let DispatcherStateProj::Normal { inner } = h1.project().inner.project() { let res = &mut inner.project().io.take().unwrap().write_buf[..]; stabilize_date_header(res); @@ -1166,13 +1194,13 @@ mod tests { futures_util::pin_mut!(h1); assert!(h1.as_mut().poll(cx).is_pending()); - assert!(matches!(&h1.inner, DispatcherState::Normal(_))); + assert!(matches!(&h1.inner, DispatcherState::Normal { .. })); // polls: manual assert_eq!(h1.poll_count, 1); eprintln!("poll count: {}", h1.poll_count); - if let DispatcherState::Normal(ref inner) = h1.inner { + if let DispatcherState::Normal { ref inner } = h1.inner { let io = inner.io.as_ref().unwrap(); let res = &io.write_buf()[..]; assert_eq!( @@ -1187,7 +1215,7 @@ mod tests { // polls: manual manual shutdown assert_eq!(h1.poll_count, 3); - if let DispatcherState::Normal(ref inner) = h1.inner { + if let DispatcherState::Normal { ref inner } = h1.inner { let io = inner.io.as_ref().unwrap(); let mut res = (&io.write_buf()[..]).to_owned(); stabilize_date_header(&mut res); @@ -1238,12 +1266,12 @@ mod tests { futures_util::pin_mut!(h1); assert!(h1.as_mut().poll(cx).is_ready()); - assert!(matches!(&h1.inner, DispatcherState::Normal(_))); + assert!(matches!(&h1.inner, DispatcherState::Normal { .. })); // polls: manual shutdown assert_eq!(h1.poll_count, 2); - if let DispatcherState::Normal(ref inner) = h1.inner { + if let DispatcherState::Normal { ref inner } = h1.inner { let io = inner.io.as_ref().unwrap(); let mut res = (&io.write_buf()[..]).to_owned(); stabilize_date_header(&mut res); @@ -1299,7 +1327,7 @@ mod tests { futures_util::pin_mut!(h1); assert!(h1.as_mut().poll(cx).is_ready()); - assert!(matches!(&h1.inner, DispatcherState::Upgrade(_))); + assert!(matches!(&h1.inner, DispatcherState::Upgrade { .. })); // polls: manual shutdown assert_eq!(h1.poll_count, 2); diff --git a/actix-http/src/h1/service.rs b/actix-http/src/h1/service.rs index b79453ebd..07befeaab 100644 --- a/actix-http/src/h1/service.rs +++ b/actix-http/src/h1/service.rs @@ -10,6 +10,7 @@ use actix_rt::net::TcpStream; use actix_service::{pipeline_factory, IntoServiceFactory, Service, ServiceFactory}; use futures_core::ready; use futures_util::future::ready; +use pin_project_lite::pin_project; use crate::body::MessageBody; use crate::config::ServiceConfig; @@ -275,31 +276,32 @@ where } } -#[doc(hidden)] -#[pin_project::pin_project] -pub struct H1ServiceResponse -where - S: ServiceFactory, - S::Error: Into, - S::InitError: fmt::Debug, - X: ServiceFactory, - X::Error: Into, - X::InitError: fmt::Debug, - U: ServiceFactory<(Request, Framed), Response = ()>, - U::Error: fmt::Display, - U::InitError: fmt::Debug, -{ - #[pin] - fut: S::Future, - #[pin] - fut_ex: Option, - #[pin] - fut_upg: Option, - expect: Option, - upgrade: Option, - on_connect_ext: Option>>, - cfg: Option, - _phantom: PhantomData, +pin_project! { + #[doc(hidden)] + pub struct H1ServiceResponse + where + S: ServiceFactory, + S::Error: Into, + S::InitError: fmt::Debug, + X: ServiceFactory, + X::Error: Into, + X::InitError: fmt::Debug, + U: ServiceFactory<(Request, Framed), Response = ()>, + U::Error: fmt::Display, + U::InitError: fmt::Debug + { + #[pin] + fut: S::Future, + #[pin] + fut_ex: Option, + #[pin] + fut_upg: Option, + expect: Option, + upgrade: Option, + on_connect_ext: Option>>, + cfg: Option, + _phantom: PhantomData + } } impl Future for H1ServiceResponse diff --git a/actix-http/src/h1/utils.rs b/actix-http/src/h1/utils.rs index 9e9c57137..1c47c116e 100644 --- a/actix-http/src/h1/utils.rs +++ b/actix-http/src/h1/utils.rs @@ -3,20 +3,22 @@ use std::pin::Pin; use std::task::{Context, Poll}; use actix_codec::{AsyncRead, AsyncWrite, Framed}; +use pin_project_lite::pin_project; use crate::body::{BodySize, MessageBody, ResponseBody}; use crate::error::Error; use crate::h1::{Codec, Message}; use crate::response::Response; -/// Send HTTP/1 response -#[pin_project::pin_project] -pub struct SendResponse { - res: Option, BodySize)>>, - #[pin] - body: Option>, - #[pin] - framed: Option>, +pin_project! { + /// Send HTTP/1 response + pub struct SendResponse { + res: Option, BodySize)>>, + #[pin] + body: Option>, + #[pin] + framed: Option> + } } impl SendResponse diff --git a/actix-http/src/h2/dispatcher.rs b/actix-http/src/h2/dispatcher.rs index 5ccd2a9d1..0d1b2f67a 100644 --- a/actix-http/src/h2/dispatcher.rs +++ b/actix-http/src/h2/dispatcher.rs @@ -15,6 +15,7 @@ use h2::server::{Connection, SendResponse}; use h2::SendStream; use http::header::{HeaderValue, CONNECTION, CONTENT_LENGTH, DATE, TRANSFER_ENCODING}; use log::{error, trace}; +use pin_project_lite::pin_project; use crate::body::{BodySize, MessageBody, ResponseBody}; use crate::config::ServiceConfig; @@ -28,22 +29,25 @@ use crate::OnConnectData; const CHUNK_SIZE: usize = 16_384; -/// Dispatcher for HTTP/2 protocol. -#[pin_project::pin_project] -pub struct Dispatcher -where - T: AsyncRead + AsyncWrite + Unpin, - S: Service, - B: MessageBody, -{ - flow: Rc>, - connection: Connection, - on_connect_data: OnConnectData, - config: ServiceConfig, - peer_addr: Option, - ka_expire: Instant, - ka_timer: Option, - _phantom: PhantomData, +pin_project! { + /// Dispatcher for HTTP/2 protocol. + pub struct Dispatcher + where + T: AsyncRead, + T: AsyncWrite, + T: Unpin, + S: Service, + B: MessageBody + { + flow: Rc>, + connection: Connection, + on_connect_data: OnConnectData, + config: ServiceConfig, + peer_addr: Option, + ka_expire: Instant, + ka_timer: Option, + _phantom: PhantomData + } } impl Dispatcher @@ -136,10 +140,10 @@ where this.on_connect_data.merge_into(&mut req); let svc = ServiceResponse:: { - state: ServiceResponseState::ServiceCall( - this.flow.service.call(req), - Some(res), - ), + state: ServiceResponseState::ServiceCall { + fut: this.flow.service.call(req), + sender: Some(res), + }, config: this.config.clone(), buffer: None, _phantom: PhantomData, @@ -152,19 +156,30 @@ where } } -#[pin_project::pin_project] -struct ServiceResponse { - #[pin] - state: ServiceResponseState, - config: ServiceConfig, - buffer: Option, - _phantom: PhantomData<(I, E)>, +pin_project! { + struct ServiceResponse { + #[pin] + state: ServiceResponseState, + config: ServiceConfig, + buffer: Option, + _phantom: PhantomData<(I, E)>, + } } -#[pin_project::pin_project(project = ServiceResponseStateProj)] -enum ServiceResponseState { - ServiceCall(#[pin] F, Option>), - SendPayload(SendStream, #[pin] ResponseBody), +pin_project! { + #[project = ServiceResponseStateProj] + enum ServiceResponseState { + ServiceCall { + #[pin] + fut: F, + sender: Option> + }, + SendPayload { + stream: SendStream, + #[pin] + body: ResponseBody + }, + } } impl ServiceResponse @@ -250,12 +265,12 @@ where let mut this = self.as_mut().project(); match this.state.project() { - ServiceResponseStateProj::ServiceCall(call, send) => { - match ready!(call.poll(cx)) { + ServiceResponseStateProj::ServiceCall { fut, sender } => { + match ready!(fut.poll(cx)) { Ok(res) => { let (res, body) = res.into().replace_body(()); - let mut send = send.take().unwrap(); + let mut send = sender.take().unwrap(); let mut size = body.size(); let h2_res = self.as_mut().prepare_response(res.head(), &mut size); @@ -273,7 +288,7 @@ where Poll::Ready(()) } else { this.state - .set(ServiceResponseState::SendPayload(stream, body)); + .set(ServiceResponseState::SendPayload { stream, body }); self.poll(cx) } } @@ -282,7 +297,7 @@ where let res: Response = e.into().into(); let (res, body) = res.replace_body(()); - let mut send = send.take().unwrap(); + let mut send = sender.take().unwrap(); let mut size = body.size(); let h2_res = self.as_mut().prepare_response(res.head(), &mut size); @@ -299,72 +314,65 @@ where if size.is_eof() { Poll::Ready(()) } else { - this.state.set(ServiceResponseState::SendPayload( + this.state.set(ServiceResponseState::SendPayload { stream, - body.into_body(), - )); + body: body.into_body(), + }); self.poll(cx) } } } } - ServiceResponseStateProj::SendPayload(ref mut stream, ref mut body) => { + ServiceResponseStateProj::SendPayload { stream, mut body } => loop { loop { - loop { - match this.buffer { - Some(ref mut buffer) => { - match ready!(stream.poll_capacity(cx)) { - None => return Poll::Ready(()), + match this.buffer { + Some(ref mut buffer) => match ready!(stream.poll_capacity(cx)) { + None => return Poll::Ready(()), - Some(Ok(cap)) => { - let len = buffer.len(); - let bytes = buffer.split_to(cmp::min(cap, len)); + Some(Ok(cap)) => { + let len = buffer.len(); + let bytes = buffer.split_to(cmp::min(cap, len)); - if let Err(e) = stream.send_data(bytes, false) { - warn!("{:?}", e); - return Poll::Ready(()); - } else if !buffer.is_empty() { - let cap = cmp::min(buffer.len(), CHUNK_SIZE); - stream.reserve_capacity(cap); - } else { - this.buffer.take(); - } - } - - Some(Err(e)) => { - warn!("{:?}", e); - return Poll::Ready(()); - } + if let Err(e) = stream.send_data(bytes, false) { + warn!("{:?}", e); + return Poll::Ready(()); + } else if !buffer.is_empty() { + let cap = cmp::min(buffer.len(), CHUNK_SIZE); + stream.reserve_capacity(cap); + } else { + this.buffer.take(); } } - None => match ready!(body.as_mut().poll_next(cx)) { - None => { - if let Err(e) = stream.send_data(Bytes::new(), true) - { - warn!("{:?}", e); - } - return Poll::Ready(()); - } + Some(Err(e)) => { + warn!("{:?}", e); + return Poll::Ready(()); + } + }, - Some(Ok(chunk)) => { - stream.reserve_capacity(cmp::min( - chunk.len(), - CHUNK_SIZE, - )); - *this.buffer = Some(chunk); + None => match ready!(body.as_mut().poll_next(cx)) { + None => { + if let Err(e) = stream.send_data(Bytes::new(), true) { + warn!("{:?}", e); } + return Poll::Ready(()); + } - Some(Err(e)) => { - error!("Response payload stream error: {:?}", e); - return Poll::Ready(()); - } - }, - } + Some(Ok(chunk)) => { + stream + .reserve_capacity(cmp::min(chunk.len(), CHUNK_SIZE)); + *this.buffer = Some(chunk); + } + + Some(Err(e)) => { + error!("Response payload stream error: {:?}", e); + return Poll::Ready(()); + } + }, } } - } + }, } } } diff --git a/actix-http/src/h2/service.rs b/actix-http/src/h2/service.rs index 36c76b17c..a8874b0db 100644 --- a/actix-http/src/h2/service.rs +++ b/actix-http/src/h2/service.rs @@ -15,6 +15,7 @@ use futures_core::ready; use futures_util::future::ok; use h2::server::{self, Handshake}; use log::error; +use pin_project_lite::pin_project; use crate::body::MessageBody; use crate::config::ServiceConfig; @@ -205,17 +206,18 @@ where } } -#[doc(hidden)] -#[pin_project::pin_project] -pub struct H2ServiceResponse -where - S: ServiceFactory, -{ - #[pin] - fut: S::Future, - cfg: Option, - on_connect_ext: Option>>, - _phantom: PhantomData, +pin_project! { + #[doc(hidden)] + pub struct H2ServiceResponse + where + S: ServiceFactory, + { + #[pin] + fut: S::Future, + cfg: Option, + on_connect_ext: Option>>, + _phantom: PhantomData + } } impl Future for H2ServiceResponse diff --git a/actix-http/src/response.rs b/actix-http/src/response.rs index 110514e05..007a3ff6d 100644 --- a/actix-http/src/response.rs +++ b/actix-http/src/response.rs @@ -49,7 +49,7 @@ impl Response { pub fn new(status: StatusCode) -> Response { Response { head: BoxedResponseHead::new(status), - body: ResponseBody::Body(Body::Empty), + body: ResponseBody::Body { body: Body::Empty }, error: None, } } @@ -67,14 +67,14 @@ impl Response { /// Convert response to response with body pub fn into_body(self) -> Response { - let b = match self.body { - ResponseBody::Body(b) => b, - ResponseBody::Other(b) => b, + let body = match self.body { + ResponseBody::Body { body } => body, + ResponseBody::Other { body } => body, }; Response { head: self.head, error: self.error, - body: ResponseBody::Other(b), + body: ResponseBody::Other { body }, } } } @@ -85,7 +85,7 @@ impl Response { pub fn with_body(status: StatusCode, body: B) -> Response { Response { head: BoxedResponseHead::new(status), - body: ResponseBody::Body(body), + body: ResponseBody::Body { body }, error: None, } } @@ -210,7 +210,7 @@ impl Response { pub fn set_body(self, body: B2) -> Response { Response { head: self.head, - body: ResponseBody::Body(body), + body: ResponseBody::Body { body }, error: None, } } @@ -220,7 +220,7 @@ impl Response { ( Response { head: self.head, - body: ResponseBody::Body(()), + body: ResponseBody::Body { body: () }, error: self.error, }, self.body, @@ -231,7 +231,7 @@ impl Response { pub fn drop_body(self) -> Response<()> { Response { head: self.head, - body: ResponseBody::Body(()), + body: ResponseBody::Body { body: () }, error: None, } } @@ -241,7 +241,7 @@ impl Response { ( Response { head: self.head, - body: ResponseBody::Body(body), + body: ResponseBody::Body { body }, error: self.error, }, self.body, @@ -635,7 +635,7 @@ impl ResponseBuilder { Response { head: response, - body: ResponseBody::Body(body), + body: ResponseBody::Body { body }, error: None, } } diff --git a/actix-http/src/service.rs b/actix-http/src/service.rs index f0121db97..2a6cf2d7c 100644 --- a/actix-http/src/service.rs +++ b/actix-http/src/service.rs @@ -9,7 +9,7 @@ use actix_service::{pipeline_factory, IntoServiceFactory, Service, ServiceFactor use bytes::Bytes; use futures_core::{ready, Future}; use h2::server::{self, Handshake}; -use pin_project::pin_project; +use pin_project_lite::pin_project; use crate::body::MessageBody; use crate::builder::HttpServiceBuilder; @@ -351,25 +351,26 @@ where } } -#[doc(hidden)] -#[pin_project] -pub struct HttpServiceResponse -where - S: ServiceFactory, - X: ServiceFactory, - U: ServiceFactory<(Request, Framed)>, -{ - #[pin] - fut: S::Future, - #[pin] - fut_ex: Option, - #[pin] - fut_upg: Option, - expect: Option, - upgrade: Option, - on_connect_ext: Option>>, - cfg: ServiceConfig, - _phantom: PhantomData, +pin_project! { + #[doc(hidden)] + pub struct HttpServiceResponse + where + S: ServiceFactory, + X: ServiceFactory, + U: ServiceFactory<(Request, Framed)> + { + #[pin] + fut: S::Future, + #[pin] + fut_ex: Option, + #[pin] + fut_upg: Option, + expect: Option, + upgrade: Option, + on_connect_ext: Option>>, + cfg: ServiceConfig, + _phantom: PhantomData + } } impl Future for HttpServiceResponse @@ -561,23 +562,27 @@ where match proto { Protocol::Http2 => HttpServiceHandlerResponse { - state: State::H2Handshake(Some(( - server::handshake(io), - self.cfg.clone(), - self.flow.clone(), - on_connect_data, - peer_addr, - ))), + state: State::H2Handshake { + hds: Some(( + server::handshake(io), + self.cfg.clone(), + self.flow.clone(), + on_connect_data, + peer_addr, + )), + }, }, Protocol::Http1 => HttpServiceHandlerResponse { - state: State::H1(h1::Dispatcher::new( - io, - self.cfg.clone(), - self.flow.clone(), - on_connect_data, - peer_addr, - )), + state: State::H1 { + dsp: h1::Dispatcher::new( + io, + self.cfg.clone(), + self.flow.clone(), + on_connect_data, + peer_addr, + ), + }, }, proto => unimplemented!("Unsupported HTTP version: {:?}.", proto), @@ -585,48 +590,58 @@ where } } -#[pin_project(project = StateProj)] -enum State -where - S: Service, - S::Future: 'static, - S::Error: Into, - T: AsyncRead + AsyncWrite + Unpin, - B: MessageBody, - X: Service, - X::Error: Into, - U: Service<(Request, Framed), Response = ()>, - U::Error: fmt::Display, -{ - H1(#[pin] h1::Dispatcher), - H2(#[pin] Dispatcher), - H2Handshake( - Option<( - Handshake, - ServiceConfig, - Rc>, - OnConnectData, - Option, - )>, - ), +pin_project! { + #[project = StateProj] + enum State + where + S: Service, + S::Future: 'static, + S::Error: Into, + T: AsyncRead, + T: AsyncWrite, + T: Unpin, + B: MessageBody, + X: Service, + X::Error: Into, + U: Service<(Request, Framed), Response = ()>, + U::Error: fmt::Display + { + H1 { #[pin] dsp: h1::Dispatcher }, + H2 { #[pin] dsp: Dispatcher }, + H2Handshake { + hds: Option<( + Handshake, + ServiceConfig, + Rc>, + OnConnectData, + Option, + )> + } + } } -#[pin_project] -pub struct HttpServiceHandlerResponse -where - T: AsyncRead + AsyncWrite + Unpin, - S: Service, - S::Error: Into + 'static, - S::Future: 'static, - S::Response: Into> + 'static, - B: MessageBody + 'static, - X: Service, - X::Error: Into, - U: Service<(Request, Framed), Response = ()>, - U::Error: fmt::Display, -{ - #[pin] - state: State, +pin_project! { + pub struct HttpServiceHandlerResponse + where + T: AsyncRead, + T: AsyncWrite, + T: Unpin, + S: Service, + S::Error: Into, + S::Error: 'static, + S::Future: 'static, + S::Response: Into>, + S::Response: 'static, + B: MessageBody, + B: 'static, + X: Service, + X::Error: Into, + U: Service<(Request, Framed), Response = ()>, + U::Error: fmt::Display + { + #[pin] + state: State + } } impl Future for HttpServiceHandlerResponse @@ -646,21 +661,23 @@ where fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { match self.as_mut().project().state.project() { - StateProj::H1(disp) => disp.poll(cx), - StateProj::H2(disp) => disp.poll(cx), - StateProj::H2Handshake(data) => { - match ready!(Pin::new(&mut data.as_mut().unwrap().0).poll(cx)) { + StateProj::H1 { dsp } => dsp.poll(cx), + StateProj::H2 { dsp } => dsp.poll(cx), + StateProj::H2Handshake { hds } => { + match ready!(Pin::new(&mut hds.as_mut().unwrap().0).poll(cx)) { Ok(conn) => { let (_, cfg, srv, on_connect_data, peer_addr) = - data.take().unwrap(); - self.as_mut().project().state.set(State::H2(Dispatcher::new( - srv, - conn, - on_connect_data, - cfg, - None, - peer_addr, - ))); + hds.take().unwrap(); + self.as_mut().project().state.set(State::H2 { + dsp: Dispatcher::new( + srv, + conn, + on_connect_data, + cfg, + None, + peer_addr, + ), + }); self.poll(cx) } Err(err) => { diff --git a/actix-http/src/ws/dispatcher.rs b/actix-http/src/ws/dispatcher.rs index 7be7cf637..f6e6c7b0a 100644 --- a/actix-http/src/ws/dispatcher.rs +++ b/actix-http/src/ws/dispatcher.rs @@ -5,17 +5,21 @@ use std::task::{Context, Poll}; use actix_codec::{AsyncRead, AsyncWrite, Framed}; use actix_service::{IntoService, Service}; use actix_utils::dispatcher::{Dispatcher as InnerDispatcher, DispatcherError}; +use pin_project_lite::pin_project; use super::{Codec, Frame, Message}; -#[pin_project::pin_project] -pub struct Dispatcher -where - S: Service + 'static, - T: AsyncRead + AsyncWrite, -{ - #[pin] - inner: InnerDispatcher, +pin_project! { + pub struct Dispatcher + where + S: Service, + S: 'static, + T: AsyncRead, + T: AsyncWrite, + { + #[pin] + inner: InnerDispatcher + } } impl Dispatcher diff --git a/src/middleware/compat.rs b/src/middleware/compat.rs index 2df535280..d0ff0eed4 100644 --- a/src/middleware/compat.rs +++ b/src/middleware/compat.rs @@ -117,7 +117,9 @@ pub trait MapServiceResponseBody { impl MapServiceResponseBody for ServiceResponse { fn map_body(self) -> ServiceResponse { - self.map_body(|_, body| ResponseBody::Other(Body::from_message(body))) + self.map_body(|_, body| ResponseBody::Other { + body: Body::from_message(body), + }) } } diff --git a/src/middleware/logger.rs b/src/middleware/logger.rs index 969aa9be5..30a48eb09 100644 --- a/src/middleware/logger.rs +++ b/src/middleware/logger.rs @@ -289,13 +289,13 @@ where let time = *this.time; let format = this.format.take(); - Poll::Ready(Ok(res.map_body(move |_, body| { - ResponseBody::Body(StreamLog { + Poll::Ready(Ok(res.map_body(move |_, body| ResponseBody::Body { + body: StreamLog { body, time, format, size: 0, - }) + }, }))) } } diff --git a/src/responder.rs b/src/responder.rs index dcad45e0f..92456f14a 100644 --- a/src/responder.rs +++ b/src/responder.rs @@ -277,7 +277,9 @@ pub(crate) mod tests { let resp = srv.call(req).await.unwrap(); assert_eq!(resp.status(), StatusCode::OK); match resp.response().body() { - ResponseBody::Body(Body::Bytes(ref b)) => { + ResponseBody::Body { + body: Body::Bytes(ref b), + } => { let bytes = b.clone(); assert_eq!(bytes, Bytes::from_static(b"some")); } @@ -292,21 +294,21 @@ pub(crate) mod tests { impl BodyTest for ResponseBody { fn bin_ref(&self) -> &[u8] { - match self { - ResponseBody::Body(ref b) => match b { + match *self { + ResponseBody::Body { ref body } => match body { Body::Bytes(ref bin) => &bin, _ => panic!(), }, - ResponseBody::Other(ref b) => match b { + ResponseBody::Other { ref body } => match body { Body::Bytes(ref bin) => &bin, _ => panic!(), }, } } fn body(&self) -> &Body { - match self { - ResponseBody::Body(ref b) => b, - ResponseBody::Other(ref b) => b, + match *self { + ResponseBody::Body { ref body } => body, + ResponseBody::Other { ref body } => body, } } } diff --git a/src/scope.rs b/src/scope.rs index d17acd843..28ce17ae3 100644 --- a/src/scope.rs +++ b/src/scope.rs @@ -748,7 +748,9 @@ mod tests { assert_eq!(resp.status(), StatusCode::OK); match resp.response().body() { - ResponseBody::Body(Body::Bytes(ref b)) => { + ResponseBody::Body { + body: Body::Bytes(ref b), + } => { let bytes = b.clone(); assert_eq!(bytes, Bytes::from_static(b"project: project1")); } @@ -849,7 +851,9 @@ mod tests { assert_eq!(resp.status(), StatusCode::CREATED); match resp.response().body() { - ResponseBody::Body(Body::Bytes(ref b)) => { + ResponseBody::Body { + body: Body::Bytes(ref b), + } => { let bytes = b.clone(); assert_eq!(bytes, Bytes::from_static(b"project: project_1")); } @@ -877,7 +881,9 @@ mod tests { assert_eq!(resp.status(), StatusCode::CREATED); match resp.response().body() { - ResponseBody::Body(Body::Bytes(ref b)) => { + ResponseBody::Body { + body: Body::Bytes(ref b), + } => { let bytes = b.clone(); assert_eq!(bytes, Bytes::from_static(b"project: test - 1")); }