From 6822bf2f580a504afcd36bcaa3a95d12e628bc3b Mon Sep 17 00:00:00 2001 From: fakeshadow <24548779@qq.com> Date: Fri, 26 Mar 2021 09:15:04 -0700 Subject: [PATCH 1/9] Refactor actix_http::h1::service (#2117) --- actix-http/src/h1/service.rs | 205 ++++++++++++----------------------- 1 file changed, 70 insertions(+), 135 deletions(-) diff --git a/actix-http/src/h1/service.rs b/actix-http/src/h1/service.rs index 51303886b..4fe79736b 100644 --- a/actix-http/src/h1/service.rs +++ b/actix-http/src/h1/service.rs @@ -1,6 +1,4 @@ -use std::future::Future; use std::marker::PhantomData; -use std::pin::Pin; use std::rc::Rc; use std::task::{Context, Poll}; use std::{fmt, net}; @@ -8,7 +6,7 @@ use std::{fmt, net}; use actix_codec::{AsyncRead, AsyncWrite, Framed}; use actix_rt::net::TcpStream; use actix_service::{pipeline_factory, IntoServiceFactory, Service, ServiceFactory}; -use futures_core::ready; +use futures_core::{future::LocalBoxFuture, ready}; use futures_util::future::ready; use crate::body::MessageBody; @@ -60,14 +58,17 @@ where impl H1Service where S: ServiceFactory, + S::Future: 'static, S::Error: Into, S::InitError: fmt::Debug, S::Response: Into>, B: MessageBody, X: ServiceFactory, + X::Future: 'static, X::Error: Into, X::InitError: fmt::Debug, U: ServiceFactory<(Request, Framed), Config = (), Response = ()>, + U::Future: 'static, U::Error: fmt::Display + Into, U::InitError: fmt::Debug, { @@ -94,17 +95,21 @@ mod openssl { use super::*; use actix_service::ServiceFactoryExt; - use actix_tls::accept::openssl::{Acceptor, SslAcceptor, SslError, TlsStream}; - use actix_tls::accept::TlsError; + use actix_tls::accept::{ + openssl::{Acceptor, SslAcceptor, SslError, TlsStream}, + TlsError, + }; impl H1Service, S, B, X, U> where S: ServiceFactory, + S::Future: 'static, S::Error: Into, S::InitError: fmt::Debug, S::Response: Into>, B: MessageBody, X: ServiceFactory, + X::Future: 'static, X::Error: Into, X::InitError: fmt::Debug, U: ServiceFactory< @@ -112,6 +117,7 @@ mod openssl { Config = (), Response = (), >, + U::Future: 'static, U::Error: fmt::Display + Into, U::InitError: fmt::Debug, { @@ -143,19 +149,25 @@ mod openssl { #[cfg(feature = "rustls")] mod rustls { use super::*; + + use std::io; + use actix_service::ServiceFactoryExt; - use actix_tls::accept::rustls::{Acceptor, ServerConfig, TlsStream}; - use actix_tls::accept::TlsError; - use std::{fmt, io}; + use actix_tls::accept::{ + rustls::{Acceptor, ServerConfig, TlsStream}, + TlsError, + }; impl H1Service, S, B, X, U> where S: ServiceFactory, + S::Future: 'static, S::Error: Into, S::InitError: fmt::Debug, S::Response: Into>, B: MessageBody, X: ServiceFactory, + X::Future: 'static, X::Error: Into, X::InitError: fmt::Debug, U: ServiceFactory< @@ -163,6 +175,7 @@ mod rustls { Config = (), Response = (), >, + U::Future: 'static, U::Error: fmt::Display + Into, U::InitError: fmt::Debug, { @@ -241,16 +254,19 @@ where impl ServiceFactory<(T, Option)> for H1Service where - T: AsyncRead + AsyncWrite + Unpin, + T: AsyncRead + AsyncWrite + Unpin + 'static, S: ServiceFactory, + S::Future: 'static, S::Error: Into, S::Response: Into>, S::InitError: fmt::Debug, B: MessageBody, X: ServiceFactory, + X::Future: 'static, X::Error: Into, X::InitError: fmt::Debug, U: ServiceFactory<(Request, Framed), Config = (), Response = ()>, + U::Future: 'static, U::Error: fmt::Display + Into, U::InitError: fmt::Debug, { @@ -259,103 +275,42 @@ where type Config = (); type Service = H1ServiceHandler; type InitError = (); - type Future = H1ServiceResponse; + type Future = LocalBoxFuture<'static, Result>; fn new_service(&self, _: ()) -> Self::Future { - H1ServiceResponse { - fut: self.srv.new_service(()), - fut_ex: Some(self.expect.new_service(())), - fut_upg: self.upgrade.as_ref().map(|f| f.new_service(())), - expect: None, - upgrade: None, - on_connect_ext: self.on_connect_ext.clone(), - cfg: Some(self.cfg.clone()), - _phantom: PhantomData, - } - } -} + let service = self.srv.new_service(()); + let expect = self.expect.new_service(()); + let upgrade = self.upgrade.as_ref().map(|s| s.new_service(())); + let on_connect_ext = self.on_connect_ext.clone(); + let cfg = self.cfg.clone(); -#[doc(hidden)] -#[pin_project::pin_project] -pub struct H1ServiceResponse -where - S: ServiceFactory, - S::Error: Into, - S::InitError: fmt::Debug, - X: ServiceFactory, - X::Error: Into, - X::InitError: fmt::Debug, - U: ServiceFactory<(Request, Framed), Response = ()>, - U::Error: fmt::Display, - U::InitError: fmt::Debug, -{ - #[pin] - fut: S::Future, - #[pin] - fut_ex: Option, - #[pin] - fut_upg: Option, - expect: Option, - upgrade: Option, - on_connect_ext: Option>>, - cfg: Option, - _phantom: PhantomData, -} + Box::pin(async move { + let expect = expect + .await + .map_err(|e| log::error!("Init http expect service error: {:?}", e))?; -impl Future for H1ServiceResponse -where - T: AsyncRead + AsyncWrite + Unpin, - S: ServiceFactory, - S::Error: Into, - S::Response: Into>, - S::InitError: fmt::Debug, - B: MessageBody, - X: ServiceFactory, - X::Error: Into, - X::InitError: fmt::Debug, - U: ServiceFactory<(Request, Framed), Response = ()>, - U::Error: fmt::Display, - U::InitError: fmt::Debug, -{ - type Output = Result, ()>; + let upgrade = match upgrade { + Some(upgrade) => { + let upgrade = upgrade.await.map_err(|e| { + log::error!("Init http upgrade service error: {:?}", e) + })?; + Some(upgrade) + } + None => None, + }; - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let mut this = self.as_mut().project(); + let service = service + .await + .map_err(|e| log::error!("Init http service error: {:?}", e))?; - if let Some(fut) = this.fut_ex.as_pin_mut() { - let expect = ready!(fut - .poll(cx) - .map_err(|e| log::error!("Init http service error: {:?}", e)))?; - this = self.as_mut().project(); - *this.expect = Some(expect); - this.fut_ex.set(None); - } - - if let Some(fut) = this.fut_upg.as_pin_mut() { - let upgrade = ready!(fut - .poll(cx) - .map_err(|e| log::error!("Init http service error: {:?}", e)))?; - this = self.as_mut().project(); - *this.upgrade = Some(upgrade); - this.fut_upg.set(None); - } - - let result = ready!(this - .fut - .poll(cx) - .map_err(|e| log::error!("Init http service error: {:?}", e))); - - Poll::Ready(result.map(|service| { - let this = self.as_mut().project(); - - H1ServiceHandler::new( - this.cfg.take().unwrap(), + Ok(H1ServiceHandler::new( + cfg, service, - this.expect.take().unwrap(), - this.upgrade.take(), - this.on_connect_ext.clone(), - ) - })) + expect, + upgrade, + on_connect_ext, + )) + }) } } @@ -417,47 +372,27 @@ where type Future = Dispatcher; fn poll_ready(&self, cx: &mut Context<'_>) -> Poll> { - let ready = self - .flow - .expect - .poll_ready(cx) - .map_err(|e| { - let e = e.into(); - log::error!("Http service readiness error: {:?}", e); - DispatchError::Service(e) - })? - .is_ready(); + ready!(self.flow.expect.poll_ready(cx)).map_err(|e| { + let e = e.into(); + log::error!("Http expect service readiness error: {:?}", e); + DispatchError::Service(e) + })?; - let ready = self - .flow - .service - .poll_ready(cx) - .map_err(|e| { + if let Some(ref upg) = self.flow.upgrade { + ready!(upg.poll_ready(cx)).map_err(|e| { let e = e.into(); - log::error!("Http service readiness error: {:?}", e); + log::error!("Http upgrade service readiness error: {:?}", e); DispatchError::Service(e) - })? - .is_ready() - && ready; - - let ready = if let Some(ref upg) = self.flow.upgrade { - upg.poll_ready(cx) - .map_err(|e| { - let e = e.into(); - log::error!("Http service readiness error: {:?}", e); - DispatchError::Service(e) - })? - .is_ready() - && ready - } else { - ready + })?; }; - if ready { - Poll::Ready(Ok(())) - } else { - Poll::Pending - } + ready!(self.flow.service.poll_ready(cx)).map_err(|e| { + let e = e.into(); + log::error!("Http service readiness error: {:?}", e); + DispatchError::Service(e) + })?; + + Poll::Ready(Ok(())) } fn call(&self, (io, addr): (T, Option)) -> Self::Future { From 60f9cfbb2a0fc77ba6dc918f199ee3a602c64549 Mon Sep 17 00:00:00 2001 From: fakeshadow <24548779@qq.com> Date: Fri, 26 Mar 2021 11:24:51 -0700 Subject: [PATCH 2/9] Refactor actix_http::h2::service module. Reduce loc. (#2118) --- actix-http/src/h2/service.rs | 88 ++++++++++++------------------------ 1 file changed, 30 insertions(+), 58 deletions(-) diff --git a/actix-http/src/h2/service.rs b/actix-http/src/h2/service.rs index c64139564..db0b580b3 100644 --- a/actix-http/src/h2/service.rs +++ b/actix-http/src/h2/service.rs @@ -11,8 +11,8 @@ use actix_service::{ ServiceFactory, }; use bytes::Bytes; -use futures_core::ready; -use futures_util::future::ok; +use futures_core::{future::LocalBoxFuture, ready}; +use futures_util::future::ready; use h2::server::{handshake, Handshake}; use log::error; @@ -65,6 +65,7 @@ where impl H2Service where S: ServiceFactory, + S::Future: 'static, S::Error: Into + 'static, S::Response: Into> + 'static, >::Future: 'static, @@ -80,11 +81,11 @@ where Error = DispatchError, InitError = S::InitError, > { - pipeline_factory(fn_factory(|| async { - Ok::<_, S::InitError>(fn_service(|io: TcpStream| { + pipeline_factory(fn_factory(|| { + ready(Ok::<_, S::InitError>(fn_service(|io: TcpStream| { let peer_addr = io.peer_addr().ok(); - ok::<_, DispatchError>((io, peer_addr)) - })) + ready(Ok::<_, DispatchError>((io, peer_addr))) + }))) })) .and_then(self) } @@ -101,6 +102,7 @@ mod openssl { impl H2Service, S, B> where S: ServiceFactory, + S::Future: 'static, S::Error: Into + 'static, S::Response: Into> + 'static, >::Future: 'static, @@ -123,10 +125,12 @@ mod openssl { .map_init_err(|_| panic!()), ) .and_then(fn_factory(|| { - ok::<_, S::InitError>(fn_service(|io: TlsStream| { - let peer_addr = io.get_ref().peer_addr().ok(); - ok((io, peer_addr)) - })) + ready(Ok::<_, S::InitError>(fn_service( + |io: TlsStream| { + let peer_addr = io.get_ref().peer_addr().ok(); + ready(Ok((io, peer_addr))) + }, + ))) })) .and_then(self.map_err(TlsError::Service)) } @@ -144,6 +148,7 @@ mod rustls { impl H2Service, S, B> where S: ServiceFactory, + S::Future: 'static, S::Error: Into + 'static, S::Response: Into> + 'static, >::Future: 'static, @@ -169,10 +174,12 @@ mod rustls { .map_init_err(|_| panic!()), ) .and_then(fn_factory(|| { - ok::<_, S::InitError>(fn_service(|io: TlsStream| { - let peer_addr = io.get_ref().0.peer_addr().ok(); - ok((io, peer_addr)) - })) + ready(Ok::<_, S::InitError>(fn_service( + |io: TlsStream| { + let peer_addr = io.get_ref().0.peer_addr().ok(); + ready(Ok((io, peer_addr))) + }, + ))) })) .and_then(self.map_err(TlsError::Service)) } @@ -181,8 +188,9 @@ mod rustls { impl ServiceFactory<(T, Option)> for H2Service where - T: AsyncRead + AsyncWrite + Unpin, + T: AsyncRead + AsyncWrite + Unpin + 'static, S: ServiceFactory, + S::Future: 'static, S::Error: Into + 'static, S::Response: Into> + 'static, >::Future: 'static, @@ -193,52 +201,16 @@ where type Config = (); type Service = H2ServiceHandler; type InitError = S::InitError; - type Future = H2ServiceResponse; + type Future = LocalBoxFuture<'static, Result>; fn new_service(&self, _: ()) -> Self::Future { - H2ServiceResponse { - fut: self.srv.new_service(()), - cfg: Some(self.cfg.clone()), - on_connect_ext: self.on_connect_ext.clone(), - _phantom: PhantomData, - } - } -} + let service = self.srv.new_service(()); + let cfg = self.cfg.clone(); + let on_connect_ext = self.on_connect_ext.clone(); -#[doc(hidden)] -#[pin_project::pin_project] -pub struct H2ServiceResponse -where - S: ServiceFactory, -{ - #[pin] - fut: S::Future, - cfg: Option, - on_connect_ext: Option>>, - _phantom: PhantomData, -} - -impl Future for H2ServiceResponse -where - T: AsyncRead + AsyncWrite + Unpin, - S: ServiceFactory, - S::Error: Into + 'static, - S::Response: Into> + 'static, - >::Future: 'static, - B: MessageBody + 'static, -{ - type Output = Result, S::InitError>; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.as_mut().project(); - - this.fut.poll(cx).map_ok(|service| { - let this = self.as_mut().project(); - H2ServiceHandler::new( - this.cfg.take().unwrap(), - this.on_connect_ext.clone(), - service, - ) + Box::pin(async move { + let service = service.await?; + Ok(H2ServiceHandler::new(cfg, on_connect_ext, service)) }) } } From f954a30c341fba28ecfca598b0b29f4af3cb4386 Mon Sep 17 00:00:00 2001 From: Daniel Egger Date: Mon, 29 Mar 2021 11:18:05 +0200 Subject: [PATCH 3/9] Fix typo in CHANGES.md (#2124) --- actix-http/CHANGES.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/actix-http/CHANGES.md b/actix-http/CHANGES.md index c4e0aec89..2c71031ab 100644 --- a/actix-http/CHANGES.md +++ b/actix-http/CHANGES.md @@ -6,7 +6,7 @@ * `client::ConnectorService` as `client::Connector::finish` method's return type [#2081] * `client::ConnectionIo` trait alias [#2081] -### Chaged +### Changed * `client::Connector` type now only have one generic type for `actix_service::Service`. [#2063] [#2063]: https://github.com/actix/actix-web/pull/2063 From e8ce73b49674895624801265c2a8525bfbf6bc62 Mon Sep 17 00:00:00 2001 From: Rob Ede Date: Mon, 29 Mar 2021 11:52:59 +0100 Subject: [PATCH 4/9] update dep docs --- actix-http/tests/test_ws.rs | 2 +- docs/graphs/net-only.dot | 24 +++++++++++------------- 2 files changed, 12 insertions(+), 14 deletions(-) diff --git a/actix-http/tests/test_ws.rs b/actix-http/tests/test_ws.rs index 3b90b4e54..51238215a 100644 --- a/actix-http/tests/test_ws.rs +++ b/actix-http/tests/test_ws.rs @@ -3,6 +3,7 @@ use std::future::Future; use std::marker::PhantomData; use std::pin::Pin; use std::sync::{Arc, Mutex}; +use std::task::{Context, Poll}; use actix_codec::{AsyncRead, AsyncWrite, Framed}; use actix_http::{body, h1, ws, Error, HttpService, Request, Response}; @@ -11,7 +12,6 @@ use actix_service::{fn_factory, Service}; use actix_utils::dispatcher::Dispatcher; use bytes::Bytes; use futures_util::future; -use futures_util::task::{Context, Poll}; use futures_util::{SinkExt as _, StreamExt as _}; struct WsService(Arc, Cell)>>); diff --git a/docs/graphs/net-only.dot b/docs/graphs/net-only.dot index 9488f3fe7..babd612a6 100644 --- a/docs/graphs/net-only.dot +++ b/docs/graphs/net-only.dot @@ -1,21 +1,19 @@ digraph { subgraph cluster_net { label="actix/actix-net"; - "actix-codec" - "actix-macros" - "actix-rt" - "actix-server" - "actix-service" - "actix-threadpool" - "actix-tls" - "actix-tracing" - "actix-utils" - "actix-router" + "actix-codec" "actix-macros" "actix-rt" "actix-server" "actix-service" + "actix-tls" "actix-tracing" "actix-utils" "actix-router" + "local-channel" "local-waker" } - "actix-utils" -> { "actix-service" "actix-rt" "actix-codec" } + "actix-codec" -> { "actix-rt" "actix-service" "local-channel" "tokio" } + "actix-utils" -> { "actix-rt" "actix-service" "local-waker" } "actix-tracing" -> { "actix-service" } "actix-tls" -> { "actix-service" "actix-codec" "actix-utils" "actix-rt" } - "actix-server" -> { "actix-service" "actix-rt" "actix-codec" "actix-utils" } - "actix-rt" -> { "actix-macros" "actix-threadpool" } + "actix-server" -> { "actix-service" "actix-rt" "actix-codec" "actix-utils" "tokio" } + "actix-rt" -> { "actix-macros" "tokio" } + + "local-channel" -> { "local-waker" } + + "tokio" [fontcolor = darkgreen] } From 980ecc5f07bc70d1e179f8189c4c30e2ecfdc632 Mon Sep 17 00:00:00 2001 From: Rob Ede Date: Mon, 29 Mar 2021 13:01:37 +0100 Subject: [PATCH 5/9] fix openssl windows ci --- .github/workflows/ci.yml | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index dc0ff0c19..3aac6efa8 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -23,6 +23,9 @@ jobs: name: ${{ matrix.target.name }} / ${{ matrix.version }} runs-on: ${{ matrix.target.os }} + env: + VCPKGRS_DYNAMIC: 1 + steps: - uses: actions/checkout@v2 @@ -65,7 +68,13 @@ jobs: uses: actions-rs/cargo@v1 with: command: hack - args: --clean-per-run check --workspace --no-default-features --tests + args: check --workspace --no-default-features + + - name: check minimal + tests + uses: actions-rs/cargo@v1 + with: + command: hack + args: check --workspace --no-default-features --tests --examples - name: check full uses: actions-rs/cargo@v1 From 222acfd070aeccd08d856e5d45090eb38b67ec4a Mon Sep 17 00:00:00 2001 From: fakeshadow <24548779@qq.com> Date: Mon, 29 Mar 2021 05:45:48 -0700 Subject: [PATCH 6/9] Fix build for next actix-tls-beta release (#2122) --- Cargo.toml | 4 ++-- actix-files/Cargo.toml | 2 +- actix-http-test/Cargo.toml | 4 ++-- actix-http/Cargo.toml | 6 +++--- actix-http/src/client/connector.rs | 23 ++++++++++++++--------- actix-multipart/Cargo.toml | 2 +- actix-web-actors/Cargo.toml | 2 +- actix-web-codegen/Cargo.toml | 2 +- awc/Cargo.toml | 2 +- awc/src/builder.rs | 7 +++---- 10 files changed, 29 insertions(+), 25 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index f3a6271ee..7dd7635cd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -80,11 +80,11 @@ required-features = ["rustls"] actix-codec = "0.4.0-beta.1" actix-macros = "0.2.0" actix-router = "0.2.7" -actix-rt = "2.1" +actix-rt = "2.2" actix-server = "2.0.0-beta.3" actix-service = "2.0.0-beta.4" actix-utils = "3.0.0-beta.2" -actix-tls = { version = "3.0.0-beta.4", default-features = false, optional = true } +actix-tls = { version = "3.0.0-beta.5", default-features = false, optional = true } actix-web-codegen = "0.5.0-beta.2" actix-http = "3.0.0-beta.4" diff --git a/actix-files/Cargo.toml b/actix-files/Cargo.toml index 49cd6966c..472bd0362 100644 --- a/actix-files/Cargo.toml +++ b/actix-files/Cargo.toml @@ -33,5 +33,5 @@ mime_guess = "2.0.1" percent-encoding = "2.1" [dev-dependencies] -actix-rt = "2.1" +actix-rt = "2.2" actix-web = "4.0.0-beta.4" diff --git a/actix-http-test/Cargo.toml b/actix-http-test/Cargo.toml index a7efc5310..0e7d57fc3 100644 --- a/actix-http-test/Cargo.toml +++ b/actix-http-test/Cargo.toml @@ -31,9 +31,9 @@ openssl = ["tls-openssl", "awc/openssl"] [dependencies] actix-service = "2.0.0-beta.4" actix-codec = "0.4.0-beta.1" -actix-tls = "3.0.0-beta.4" +actix-tls = "3.0.0-beta.5" actix-utils = "3.0.0-beta.2" -actix-rt = "2.1" +actix-rt = "2.2" actix-server = "2.0.0-beta.3" awc = { version = "3.0.0-beta.3", default-features = false } diff --git a/actix-http/Cargo.toml b/actix-http/Cargo.toml index c24878404..679e8c992 100644 --- a/actix-http/Cargo.toml +++ b/actix-http/Cargo.toml @@ -47,8 +47,8 @@ trust-dns = ["trust-dns-resolver"] actix-service = "2.0.0-beta.4" actix-codec = "0.4.0-beta.1" actix-utils = "3.0.0-beta.2" -actix-rt = "2.1" -actix-tls = "3.0.0-beta.4" +actix-rt = "2.2" +actix-tls = "3.0.0-beta.5" ahash = "0.7" base64 = "0.13" @@ -89,7 +89,7 @@ trust-dns-resolver = { version = "0.20.0", optional = true } [dev-dependencies] actix-server = "2.0.0-beta.3" actix-http-test = { version = "3.0.0-beta.3", features = ["openssl"] } -actix-tls = { version = "3.0.0-beta.4", features = ["openssl"] } +actix-tls = { version = "3.0.0-beta.5", features = ["openssl"] } criterion = "0.3" env_logger = "0.8" rcgen = "0.8" diff --git a/actix-http/src/client/connector.rs b/actix-http/src/client/connector.rs index 6996677d2..508fe748b 100644 --- a/actix-http/src/client/connector.rs +++ b/actix-http/src/client/connector.rs @@ -9,7 +9,7 @@ use std::{ }; use actix_rt::{ - net::TcpStream, + net::{ActixStream, TcpStream}, time::{sleep, Sleep}, }; use actix_service::Service; @@ -119,7 +119,7 @@ impl Connector { /// Use custom connector. pub fn connector(self, connector: S1) -> Connector where - Io1: ConnectionIo + fmt::Debug, + Io1: ActixStream + fmt::Debug + 'static, S1: Service< TcpConnect, Response = TcpConnection, @@ -136,7 +136,14 @@ impl Connector { impl Connector where - Io: ConnectionIo + fmt::Debug, + // Note: + // Input Io type is bound to ActixStream trait but internally in client module they + // are bound to ConnectionIo trait alias. And latter is the trait exposed to public + // in the form of Box type. + // + // This remap is to hide ActixStream's trait methods. They are not meant to be called + // from user code. + Io: ActixStream + fmt::Debug + 'static, S: Service< TcpConnect, Response = TcpConnection, @@ -407,16 +414,14 @@ struct TlsConnectorService { timeout: Duration, } -impl Service for TlsConnectorService +impl Service for TlsConnectorService where S: Service, Error = ConnectError> + Clone + 'static, - St: Service, Response = Res, Error = std::io::Error> - + Clone - + 'static, + St: Service, Error = std::io::Error> + Clone + 'static, Io: ConnectionIo, - Res: IntoConnectionIo, + St::Response: IntoConnectionIo, { type Response = (Box, Protocol); type Error = ConnectError; @@ -471,10 +476,10 @@ where Error = std::io::Error, Future = Fut2, >, + S::Response: IntoConnectionIo, Fut1: Future, ConnectError>>, Fut2: Future>, Io: ConnectionIo, - Res: IntoConnectionIo, { type Output = Result<(Box, Protocol), ConnectError>; diff --git a/actix-multipart/Cargo.toml b/actix-multipart/Cargo.toml index 9a3ea7bb5..607e90849 100644 --- a/actix-multipart/Cargo.toml +++ b/actix-multipart/Cargo.toml @@ -28,7 +28,7 @@ mime = "0.3" twoway = "0.2" [dev-dependencies] -actix-rt = "2.1" +actix-rt = "2.2" actix-http = "3.0.0-beta.4" tokio = { version = "1", features = ["sync"] } tokio-stream = "0.1" diff --git a/actix-web-actors/Cargo.toml b/actix-web-actors/Cargo.toml index 77663540c..1f734582d 100644 --- a/actix-web-actors/Cargo.toml +++ b/actix-web-actors/Cargo.toml @@ -28,6 +28,6 @@ pin-project = "1.0.0" tokio = { version = "1", features = ["sync"] } [dev-dependencies] -actix-rt = "2.1" +actix-rt = "2.2" env_logger = "0.8" futures-util = { version = "0.3.7", default-features = false } diff --git a/actix-web-codegen/Cargo.toml b/actix-web-codegen/Cargo.toml index d8a189565..fdfb9f6ba 100644 --- a/actix-web-codegen/Cargo.toml +++ b/actix-web-codegen/Cargo.toml @@ -19,7 +19,7 @@ syn = { version = "1", features = ["full", "parsing"] } proc-macro2 = "1" [dev-dependencies] -actix-rt = "2.1" +actix-rt = "2.2" actix-web = "4.0.0-beta.4" futures-util = { version = "0.3.7", default-features = false } trybuild = "1" diff --git a/awc/Cargo.toml b/awc/Cargo.toml index b555ebb22..cc6841606 100644 --- a/awc/Cargo.toml +++ b/awc/Cargo.toml @@ -71,7 +71,7 @@ actix-http = { version = "3.0.0-beta.4", features = ["openssl"] } actix-http-test = { version = "3.0.0-beta.3", features = ["openssl"] } actix-utils = "3.0.0-beta.1" actix-server = "2.0.0-beta.3" -actix-tls = { version = "3.0.0-beta.4", features = ["openssl", "rustls"] } +actix-tls = { version = "3.0.0-beta.5", features = ["openssl", "rustls"] } brotli2 = "0.3.2" env_logger = "0.8" diff --git a/awc/src/builder.rs b/awc/src/builder.rs index 925d9ae2a..c594b4836 100644 --- a/awc/src/builder.rs +++ b/awc/src/builder.rs @@ -4,12 +4,11 @@ use std::net::IpAddr; use std::rc::Rc; use std::time::Duration; -use actix_codec::{AsyncRead, AsyncWrite}; use actix_http::{ client::{Connector, ConnectorService, TcpConnect, TcpConnectError, TcpConnection}, http::{self, header, Error as HttpError, HeaderMap, HeaderName, Uri}, }; -use actix_rt::net::TcpStream; +use actix_rt::net::{ActixStream, TcpStream}; use actix_service::{boxed, Service}; use crate::connect::DefaultConnector; @@ -64,7 +63,7 @@ where S: Service, Response = TcpConnection, Error = TcpConnectError> + Clone + 'static, - Io: AsyncRead + AsyncWrite + Unpin + fmt::Debug + 'static, + Io: ActixStream + fmt::Debug + 'static, { /// Use custom connector service. pub fn connector(self, connector: Connector) -> ClientBuilder @@ -75,7 +74,7 @@ where Error = TcpConnectError, > + Clone + 'static, - Io1: AsyncRead + AsyncWrite + Unpin + fmt::Debug + 'static, + Io1: ActixStream + fmt::Debug + 'static, { ClientBuilder { middleware: self.middleware, From 1281a748d03aa63fb7662c638d725d91b29a44c7 Mon Sep 17 00:00:00 2001 From: fakeshadow <24548779@qq.com> Date: Mon, 29 Mar 2021 19:06:16 -0700 Subject: [PATCH 7/9] merge H1ServiceHandler requests into HttpServiceHandler (#2126) --- actix-http/src/h1/service.rs | 68 +-------- actix-http/src/service.rs | 260 ++++++++++++----------------------- 2 files changed, 95 insertions(+), 233 deletions(-) diff --git a/actix-http/src/h1/service.rs b/actix-http/src/h1/service.rs index 4fe79736b..f915bfa47 100644 --- a/actix-http/src/h1/service.rs +++ b/actix-http/src/h1/service.rs @@ -6,7 +6,7 @@ use std::{fmt, net}; use actix_codec::{AsyncRead, AsyncWrite, Framed}; use actix_rt::net::TcpStream; use actix_service::{pipeline_factory, IntoServiceFactory, Service, ServiceFactory}; -use futures_core::{future::LocalBoxFuture, ready}; +use futures_core::future::LocalBoxFuture; use futures_util::future::ready; use crate::body::MessageBody; @@ -14,7 +14,7 @@ use crate::config::ServiceConfig; use crate::error::{DispatchError, Error}; use crate::request::Request; use crate::response::Response; -use crate::service::HttpFlow; +use crate::service::HttpServiceHandler; use crate::{ConnectCallback, OnConnectData}; use super::codec::Codec; @@ -315,47 +315,10 @@ where } /// `Service` implementation for HTTP/1 transport -pub struct H1ServiceHandler -where - S: Service, - X: Service, - U: Service<(Request, Framed)>, -{ - flow: Rc>, - on_connect_ext: Option>>, - cfg: ServiceConfig, - _phantom: PhantomData, -} - -impl H1ServiceHandler -where - S: Service, - S::Error: Into, - S::Response: Into>, - B: MessageBody, - X: Service, - X::Error: Into, - U: Service<(Request, Framed), Response = ()>, - U::Error: fmt::Display, -{ - fn new( - cfg: ServiceConfig, - service: S, - expect: X, - upgrade: Option, - on_connect_ext: Option>>, - ) -> H1ServiceHandler { - H1ServiceHandler { - flow: HttpFlow::new(service, expect, upgrade), - cfg, - on_connect_ext, - _phantom: PhantomData, - } - } -} +pub type H1ServiceHandler = HttpServiceHandler; impl Service<(T, Option)> - for H1ServiceHandler + for HttpServiceHandler where T: AsyncRead + AsyncWrite + Unpin, S: Service, @@ -372,27 +335,10 @@ where type Future = Dispatcher; fn poll_ready(&self, cx: &mut Context<'_>) -> Poll> { - ready!(self.flow.expect.poll_ready(cx)).map_err(|e| { - let e = e.into(); - log::error!("Http expect service readiness error: {:?}", e); + self._poll_ready(cx).map_err(|e| { + log::error!("HTTP/1 service readiness error: {:?}", e); DispatchError::Service(e) - })?; - - if let Some(ref upg) = self.flow.upgrade { - ready!(upg.poll_ready(cx)).map_err(|e| { - let e = e.into(); - log::error!("Http upgrade service readiness error: {:?}", e); - DispatchError::Service(e) - })?; - }; - - ready!(self.flow.service.poll_ready(cx)).map_err(|e| { - let e = e.into(); - log::error!("Http service readiness error: {:?}", e); - DispatchError::Service(e) - })?; - - Poll::Ready(Ok(())) + }) } fn call(&self, (io, addr): (T, Option)) -> Self::Future { diff --git a/actix-http/src/service.rs b/actix-http/src/service.rs index 1a06cec3d..fd97fb5ef 100644 --- a/actix-http/src/service.rs +++ b/actix-http/src/service.rs @@ -12,7 +12,7 @@ use actix_codec::{AsyncRead, AsyncWrite, Framed}; use actix_rt::net::TcpStream; use actix_service::{pipeline_factory, IntoServiceFactory, Service, ServiceFactory}; use bytes::Bytes; -use futures_core::ready; +use futures_core::{future::LocalBoxFuture, ready}; use h2::server::{handshake, Handshake}; use pin_project::pin_project; @@ -107,7 +107,6 @@ where X1: ServiceFactory, X1::Error: Into, X1::InitError: fmt::Debug, - >::Future: 'static, { HttpService { expect, @@ -128,7 +127,6 @@ where U1: ServiceFactory<(Request, Framed), Config = (), Response = ()>, U1::Error: fmt::Display, U1::InitError: fmt::Debug, - )>>::Future: 'static, { HttpService { upgrade, @@ -150,23 +148,24 @@ where impl HttpService where S: ServiceFactory, + S::Future: 'static, S::Error: Into + 'static, S::InitError: fmt::Debug, S::Response: Into> + 'static, >::Future: 'static, B: MessageBody + 'static, X: ServiceFactory, + X::Future: 'static, X::Error: Into, X::InitError: fmt::Debug, - >::Future: 'static, U: ServiceFactory< (Request, Framed), Config = (), Response = (), >, + U::Future: 'static, U::Error: fmt::Display + Into, U::InitError: fmt::Debug, - )>>::Future: 'static, { /// Create simple tcp stream service pub fn tcp( @@ -196,23 +195,24 @@ mod openssl { impl HttpService, S, B, X, U> where S: ServiceFactory, + S::Future: 'static, S::Error: Into + 'static, S::InitError: fmt::Debug, S::Response: Into> + 'static, >::Future: 'static, B: MessageBody + 'static, X: ServiceFactory, + X::Future: 'static, X::Error: Into, X::InitError: fmt::Debug, - >::Future: 'static, U: ServiceFactory< (Request, Framed, h1::Codec>), Config = (), Response = (), >, + U::Future: 'static, U::Error: fmt::Display + Into, U::InitError: fmt::Debug, - , h1::Codec>)>>::Future: 'static, { /// Create openssl based service pub fn openssl( @@ -261,23 +261,24 @@ mod rustls { impl HttpService, S, B, X, U> where S: ServiceFactory, + S::Future: 'static, S::Error: Into + 'static, S::InitError: fmt::Debug, S::Response: Into> + 'static, >::Future: 'static, B: MessageBody + 'static, X: ServiceFactory, + X::Future: 'static, X::Error: Into, X::InitError: fmt::Debug, - >::Future: 'static, U: ServiceFactory< (Request, Framed, h1::Codec>), Config = (), Response = (), >, + U::Future: 'static, U::Error: fmt::Display + Into, U::InitError: fmt::Debug, - , h1::Codec>)>>::Future: 'static, { /// Create openssl based service pub fn rustls( @@ -319,137 +320,117 @@ mod rustls { impl ServiceFactory<(T, Protocol, Option)> for HttpService where - T: AsyncRead + AsyncWrite + Unpin, + T: AsyncRead + AsyncWrite + Unpin + 'static, S: ServiceFactory, + S::Future: 'static, S::Error: Into + 'static, S::InitError: fmt::Debug, S::Response: Into> + 'static, >::Future: 'static, B: MessageBody + 'static, X: ServiceFactory, + X::Future: 'static, X::Error: Into, X::InitError: fmt::Debug, - >::Future: 'static, U: ServiceFactory<(Request, Framed), Config = (), Response = ()>, + U::Future: 'static, U::Error: fmt::Display + Into, U::InitError: fmt::Debug, - )>>::Future: 'static, { type Response = (); type Error = DispatchError; type Config = (); type Service = HttpServiceHandler; type InitError = (); - type Future = HttpServiceResponse; + type Future = LocalBoxFuture<'static, Result>; fn new_service(&self, _: ()) -> Self::Future { - HttpServiceResponse { - fut: self.srv.new_service(()), - fut_ex: Some(self.expect.new_service(())), - fut_upg: self.upgrade.as_ref().map(|f| f.new_service(())), - expect: None, - upgrade: None, - on_connect_ext: self.on_connect_ext.clone(), - cfg: self.cfg.clone(), - _phantom: PhantomData, - } - } -} + let service = self.srv.new_service(()); + let expect = self.expect.new_service(()); + let upgrade = self.upgrade.as_ref().map(|s| s.new_service(())); + let on_connect_ext = self.on_connect_ext.clone(); + let cfg = self.cfg.clone(); -#[doc(hidden)] -#[pin_project] -pub struct HttpServiceResponse -where - S: ServiceFactory, - X: ServiceFactory, - U: ServiceFactory<(Request, Framed)>, -{ - #[pin] - fut: S::Future, - #[pin] - fut_ex: Option, - #[pin] - fut_upg: Option, - expect: Option, - upgrade: Option, - on_connect_ext: Option>>, - cfg: ServiceConfig, - _phantom: PhantomData, -} + Box::pin(async move { + let expect = expect + .await + .map_err(|e| log::error!("Init http expect service error: {:?}", e))?; -impl Future for HttpServiceResponse -where - T: AsyncRead + AsyncWrite + Unpin, - S: ServiceFactory, - S::Error: Into + 'static, - S::InitError: fmt::Debug, - S::Response: Into> + 'static, - >::Future: 'static, - B: MessageBody + 'static, - X: ServiceFactory, - X::Error: Into, - X::InitError: fmt::Debug, - >::Future: 'static, - U: ServiceFactory<(Request, Framed), Response = ()>, - U::Error: fmt::Display, - U::InitError: fmt::Debug, - )>>::Future: 'static, -{ - type Output = - Result, ()>; + let upgrade = match upgrade { + Some(upgrade) => { + let upgrade = upgrade.await.map_err(|e| { + log::error!("Init http upgrade service error: {:?}", e) + })?; + Some(upgrade) + } + None => None, + }; - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let mut this = self.as_mut().project(); + let service = service + .await + .map_err(|e| log::error!("Init http service error: {:?}", e))?; - if let Some(fut) = this.fut_ex.as_pin_mut() { - let expect = ready!(fut - .poll(cx) - .map_err(|e| log::error!("Init http service error: {:?}", e)))?; - this = self.as_mut().project(); - *this.expect = Some(expect); - this.fut_ex.set(None); - } - - if let Some(fut) = this.fut_upg.as_pin_mut() { - let upgrade = ready!(fut - .poll(cx) - .map_err(|e| log::error!("Init http service error: {:?}", e)))?; - this = self.as_mut().project(); - *this.upgrade = Some(upgrade); - this.fut_upg.set(None); - } - - let result = ready!(this - .fut - .poll(cx) - .map_err(|e| log::error!("Init http service error: {:?}", e))); - - Poll::Ready(result.map(|service| { - let this = self.as_mut().project(); - HttpServiceHandler::new( - this.cfg.clone(), + Ok(HttpServiceHandler::new( + cfg, service, - this.expect.take().unwrap(), - this.upgrade.take(), - this.on_connect_ext.clone(), - ) - })) + expect, + upgrade, + on_connect_ext, + )) + }) } } -/// `Service` implementation for HTTP transport +/// `Service` implementation for HTTP/1 and HTTP/2 transport pub struct HttpServiceHandler where S: Service, X: Service, U: Service<(Request, Framed)>, { - flow: Rc>, - cfg: ServiceConfig, - on_connect_ext: Option>>, + pub(super) flow: Rc>, + pub(super) cfg: ServiceConfig, + pub(super) on_connect_ext: Option>>, _phantom: PhantomData, } +impl HttpServiceHandler +where + S: Service, + S::Error: Into, + X: Service, + X::Error: Into, + U: Service<(Request, Framed)>, + U::Error: Into, +{ + pub(super) fn new( + cfg: ServiceConfig, + service: S, + expect: X, + upgrade: Option, + on_connect_ext: Option>>, + ) -> HttpServiceHandler { + HttpServiceHandler { + cfg, + on_connect_ext, + flow: HttpFlow::new(service, expect, upgrade), + _phantom: PhantomData, + } + } + + pub(super) fn _poll_ready(&self, cx: &mut Context<'_>) -> Poll> { + ready!(self.flow.expect.poll_ready(cx).map_err(Into::into))?; + + ready!(self.flow.service.poll_ready(cx).map_err(Into::into))?; + + if let Some(ref upg) = self.flow.upgrade { + ready!(upg.poll_ready(cx).map_err(Into::into))?; + }; + + Poll::Ready(Ok(())) + } +} + /// A collection of services that describe an HTTP request flow. pub(super) struct HttpFlow { pub(super) service: S, @@ -467,34 +448,6 @@ impl HttpFlow { } } -impl HttpServiceHandler -where - S: Service, - S::Error: Into + 'static, - S::Future: 'static, - S::Response: Into> + 'static, - B: MessageBody + 'static, - X: Service, - X::Error: Into, - U: Service<(Request, Framed), Response = ()>, - U::Error: fmt::Display, -{ - fn new( - cfg: ServiceConfig, - service: S, - expect: X, - upgrade: Option, - on_connect_ext: Option>>, - ) -> HttpServiceHandler { - HttpServiceHandler { - cfg, - on_connect_ext, - flow: HttpFlow::new(service, expect, upgrade), - _phantom: PhantomData, - } - } -} - impl Service<(T, Protocol, Option)> for HttpServiceHandler where @@ -514,47 +467,10 @@ where type Future = HttpServiceHandlerResponse; fn poll_ready(&self, cx: &mut Context<'_>) -> Poll> { - let ready = self - .flow - .expect - .poll_ready(cx) - .map_err(|e| { - let e = e.into(); - log::error!("Http service readiness error: {:?}", e); - DispatchError::Service(e) - })? - .is_ready(); - - let ready = self - .flow - .service - .poll_ready(cx) - .map_err(|e| { - let e = e.into(); - log::error!("Http service readiness error: {:?}", e); - DispatchError::Service(e) - })? - .is_ready() - && ready; - - let ready = if let Some(ref upg) = self.flow.upgrade { - upg.poll_ready(cx) - .map_err(|e| { - let e = e.into(); - log::error!("Http service readiness error: {:?}", e); - DispatchError::Service(e) - })? - .is_ready() - && ready - } else { - ready - }; - - if ready { - Poll::Ready(Ok(())) - } else { - Poll::Pending - } + self._poll_ready(cx).map_err(|e| { + log::error!("HTTP service readiness error: {:?}", e); + DispatchError::Service(e) + }) } fn call( From f66774e30b239546991fb6c0d54c8885171d1b8f Mon Sep 17 00:00:00 2001 From: Rob Ede Date: Tue, 30 Mar 2021 03:32:22 +0100 Subject: [PATCH 8/9] remove `From` impl from HttpDate fully removes time crate from public api of -http --- .cargo/config.toml | 3 +++ actix-http/src/header/shared/httpdate.rs | 24 ++++++++++-------------- docs/graphs/net-only.dot | 2 +- 3 files changed, 14 insertions(+), 15 deletions(-) create mode 100644 .cargo/config.toml diff --git a/.cargo/config.toml b/.cargo/config.toml new file mode 100644 index 000000000..9d0d9da8c --- /dev/null +++ b/.cargo/config.toml @@ -0,0 +1,3 @@ +[alias] +chk = "hack check --workspace --tests --examples" +lint = "hack --clean-per-run clippy --workspace --tests --examples" diff --git a/actix-http/src/header/shared/httpdate.rs b/actix-http/src/header/shared/httpdate.rs index 72a225589..18278a6d8 100644 --- a/actix-http/src/header/shared/httpdate.rs +++ b/actix-http/src/header/shared/httpdate.rs @@ -1,18 +1,20 @@ -use std::fmt::{self, Display}; -use std::io::Write; -use std::str::FromStr; -use std::time::{SystemTime, UNIX_EPOCH}; +use std::{ + fmt, + io::Write, + str::FromStr, + time::{SystemTime, UNIX_EPOCH}, +}; use bytes::buf::BufMut; use bytes::BytesMut; use http::header::{HeaderValue, InvalidHeaderValue}; -use time::{offset, OffsetDateTime, PrimitiveDateTime}; +use time::{OffsetDateTime, PrimitiveDateTime, UtcOffset}; use crate::error::ParseError; use crate::header::IntoHeaderValue; use crate::time_parser; -/// A timestamp with HTTP formatting and parsing +/// A timestamp with HTTP formatting and parsing. #[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)] pub struct HttpDate(OffsetDateTime); @@ -27,18 +29,12 @@ impl FromStr for HttpDate { } } -impl Display for HttpDate { +impl fmt::Display for HttpDate { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { fmt::Display::fmt(&self.0.format("%a, %d %b %Y %H:%M:%S GMT"), f) } } -impl From for HttpDate { - fn from(dt: OffsetDateTime) -> HttpDate { - HttpDate(dt) - } -} - impl From for HttpDate { fn from(sys: SystemTime) -> HttpDate { HttpDate(PrimitiveDateTime::from(sys).assume_utc()) @@ -54,7 +50,7 @@ impl IntoHeaderValue for HttpDate { wrt, "{}", self.0 - .to_offset(offset!(UTC)) + .to_offset(UtcOffset::UTC) .format("%a, %d %b %Y %H:%M:%S GMT") ) .unwrap(); diff --git a/docs/graphs/net-only.dot b/docs/graphs/net-only.dot index babd612a6..84227cdb0 100644 --- a/docs/graphs/net-only.dot +++ b/docs/graphs/net-only.dot @@ -7,7 +7,7 @@ digraph { } "actix-codec" -> { "actix-rt" "actix-service" "local-channel" "tokio" } - "actix-utils" -> { "actix-rt" "actix-service" "local-waker" } + "actix-utils" -> { "local-waker" } "actix-tracing" -> { "actix-service" } "actix-tls" -> { "actix-service" "actix-codec" "actix-utils" "actix-rt" } "actix-server" -> { "actix-service" "actix-rt" "actix-codec" "actix-utils" "tokio" } From c49fe79207aadd5ef7e2a8db5daadf19fab2366a Mon Sep 17 00:00:00 2001 From: fakeshadow <24548779@qq.com> Date: Tue, 30 Mar 2021 07:46:09 -0700 Subject: [PATCH 9/9] Simplify lifetime annotation in HttpServiceBuilder. Simplify PlStream (#2129) --- actix-http/src/builder.rs | 6 ----- actix-http/src/client/h1proto.rs | 40 +++++++++++--------------------- 2 files changed, 14 insertions(+), 32 deletions(-) diff --git a/actix-http/src/builder.rs b/actix-http/src/builder.rs index fa430c4fe..623bfdda2 100644 --- a/actix-http/src/builder.rs +++ b/actix-http/src/builder.rs @@ -63,11 +63,9 @@ where X: ServiceFactory, X::Error: Into, X::InitError: fmt::Debug, - >::Future: 'static, U: ServiceFactory<(Request, Framed), Config = (), Response = ()>, U::Error: fmt::Display, U::InitError: fmt::Debug, - )>>::Future: 'static, { /// Set server keep-alive setting. /// @@ -127,7 +125,6 @@ where X1: ServiceFactory, X1::Error: Into, X1::InitError: fmt::Debug, - >::Future: 'static, { HttpServiceBuilder { keep_alive: self.keep_alive, @@ -152,7 +149,6 @@ where U1: ServiceFactory<(Request, Framed), Config = (), Response = ()>, U1::Error: fmt::Display, U1::InitError: fmt::Debug, - )>>::Future: 'static, { HttpServiceBuilder { keep_alive: self.keep_alive, @@ -211,7 +207,6 @@ where S::Error: Into + 'static, S::InitError: fmt::Debug, S::Response: Into> + 'static, - >::Future: 'static, { let cfg = ServiceConfig::new( self.keep_alive, @@ -233,7 +228,6 @@ where S::Error: Into + 'static, S::InitError: fmt::Debug, S::Response: Into> + 'static, - >::Future: 'static, { let cfg = ServiceConfig::new( self.keep_alive, diff --git a/actix-http/src/client/h1proto.rs b/actix-http/src/client/h1proto.rs index 01a6e1edf..8fb08b0ce 100644 --- a/actix-http/src/client/h1proto.rs +++ b/actix-http/src/client/h1proto.rs @@ -7,7 +7,7 @@ use std::{ use actix_codec::Framed; use bytes::buf::BufMut; use bytes::{Bytes, BytesMut}; -use futures_core::Stream; +use futures_core::{ready, Stream}; use futures_util::{future::poll_fn, SinkExt as _}; use crate::error::PayloadError; @@ -17,7 +17,7 @@ use crate::http::{ StatusCode, }; use crate::message::{RequestHeadType, ResponseHead}; -use crate::payload::{Payload, PayloadStream}; +use crate::payload::Payload; use super::connection::{ConnectionIo, H1Connection}; use super::error::{ConnectError, SendRequestError}; @@ -122,10 +122,7 @@ where Ok((head, Payload::None)) } - _ => { - let pl: PayloadStream = Box::pin(PlStream::new(framed)); - Ok((head, pl.into())) - } + _ => Ok((head, Payload::Stream(Box::pin(PlStream::new(framed))))), } } @@ -194,21 +191,16 @@ where } #[pin_project::pin_project] -pub(crate) struct PlStream -where - Io: ConnectionIo, -{ +pub(crate) struct PlStream { #[pin] - framed: Option, h1::ClientPayloadCodec>>, + framed: Framed, h1::ClientPayloadCodec>, } impl PlStream { fn new(framed: Framed, h1::ClientCodec>) -> Self { let framed = framed.into_map_codec(|codec| codec.into_payload_codec()); - PlStream { - framed: Some(framed), - } + PlStream { framed } } } @@ -219,20 +211,16 @@ impl Stream for PlStream { self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll> { - let mut framed = self.project().framed.as_pin_mut().unwrap(); + let mut this = self.project(); - match framed.as_mut().next_item(cx)? { - Poll::Pending => Poll::Pending, - Poll::Ready(Some(chunk)) => { - if let Some(chunk) = chunk { - Poll::Ready(Some(Ok(chunk))) - } else { - let keep_alive = framed.codec_ref().keepalive(); - framed.io_mut().on_release(keep_alive); - Poll::Ready(None) - } + match ready!(this.framed.as_mut().next_item(cx)?) { + Some(Some(chunk)) => Poll::Ready(Some(Ok(chunk))), + Some(None) => { + let keep_alive = this.framed.codec_ref().keepalive(); + this.framed.io_mut().on_release(keep_alive); + Poll::Ready(None) } - Poll::Ready(None) => Poll::Ready(None), + None => Poll::Ready(None), } } }