From d40b6748bc84a98483657a9422493bc16db760aa Mon Sep 17 00:00:00 2001 From: fakeshadow <24548779@qq.com> Date: Fri, 22 Oct 2021 07:22:58 +0800 Subject: [PATCH 1/7] remove dead dep (#2420) --- actix-http/Cargo.toml | 1 - 1 file changed, 1 deletion(-) diff --git a/actix-http/Cargo.toml b/actix-http/Cargo.toml index 3abf537fa..a8fc4255f 100644 --- a/actix-http/Cargo.toml +++ b/actix-http/Cargo.toml @@ -67,7 +67,6 @@ httpdate = "1.0.1" itoa = "0.4" language-tags = "0.3" local-channel = "0.1" -once_cell = "1.5" log = "0.4" mime = "0.3" percent-encoding = "2.1" From d13854505feae2fc1c2435563e209b991bdb4399 Mon Sep 17 00:00:00 2001 From: fakeshadow <24548779@qq.com> Date: Tue, 26 Oct 2021 07:37:40 +0800 Subject: [PATCH 2/7] move actix_http::client module to awc (#2425) --- actix-http/CHANGES.md | 5 + actix-http/Cargo.toml | 7 +- actix-http/src/lib.rs | 1 - actix-test/Cargo.toml | 4 +- awc/Cargo.toml | 19 +- awc/src/builder.rs | 6 +- {actix-http => awc}/src/client/config.rs | 3 +- {actix-http => awc}/src/client/connection.rs | 50 ++--- {actix-http => awc}/src/client/connector.rs | 181 +++++++++---------- {actix-http => awc}/src/client/error.rs | 7 +- {actix-http => awc}/src/client/h1proto.rs | 35 ++-- {actix-http => awc}/src/client/h2proto.rs | 15 +- {actix-http => awc}/src/client/mod.rs | 1 - {actix-http => awc}/src/client/pool.rs | 46 ++--- awc/src/connect.rs | 10 +- awc/src/error.rs | 14 +- awc/src/lib.rs | 32 ++-- awc/src/middleware/redirect.rs | 2 +- 18 files changed, 197 insertions(+), 241 deletions(-) rename {actix-http => awc}/src/client/config.rs (96%) rename {actix-http => awc}/src/client/connection.rs (89%) rename {actix-http => awc}/src/client/connector.rs (88%) rename {actix-http => awc}/src/client/error.rs (98%) rename {actix-http => awc}/src/client/h1proto.rs (92%) rename {actix-http => awc}/src/client/h2proto.rs (96%) rename {actix-http => awc}/src/client/mod.rs (95%) rename {actix-http => awc}/src/client/pool.rs (95%) diff --git a/actix-http/CHANGES.md b/actix-http/CHANGES.md index 3273847c5..81595c92d 100644 --- a/actix-http/CHANGES.md +++ b/actix-http/CHANGES.md @@ -1,6 +1,11 @@ # Changes ## Unreleased - 2021-xx-xx +### Removed +* `client` module. [#2425] +* `trust-dns` feature. [#2425] + +[#2425]: https://github.com/actix/actix-web/pull/2425 ## 3.0.0-beta.11 - 2021-10-20 diff --git a/actix-http/Cargo.toml b/actix-http/Cargo.toml index a8fc4255f..3d45cc8ce 100644 --- a/actix-http/Cargo.toml +++ b/actix-http/Cargo.toml @@ -37,9 +37,6 @@ compress-brotli = ["brotli2", "__compress"] compress-gzip = ["flate2", "__compress"] compress-zstd = ["zstd", "__compress"] -# trust-dns as client dns resolver -trust-dns = ["trust-dns-resolver"] - # Internal (PRIVATE!) features used to aid testing and cheking feature status. # Don't rely on these whatsoever. They may disappear at anytime. __compress = [] @@ -49,7 +46,7 @@ actix-service = "2.0.0" actix-codec = "0.4.0" actix-utils = "3.0.0" actix-rt = "2.2" -actix-tls = { version = "3.0.0-beta.7", features = ["accept", "connect"] } +actix-tls = { version = "3.0.0-beta.7", features = ["accept"] } ahash = "0.7" base64 = "0.13" @@ -82,8 +79,6 @@ brotli2 = { version="0.3.2", optional = true } flate2 = { version = "1.0.13", optional = true } zstd = { version = "0.7", optional = true } -trust-dns-resolver = { version = "0.20.0", optional = true } - [dev-dependencies] actix-server = "2.0.0-beta.3" actix-http-test = { version = "3.0.0-beta.5", features = ["openssl"] } diff --git a/actix-http/src/lib.rs b/actix-http/src/lib.rs index 42ce4ffe4..bfb6b8c55 100644 --- a/actix-http/src/lib.rs +++ b/actix-http/src/lib.rs @@ -29,7 +29,6 @@ extern crate log; pub mod body; mod builder; -pub mod client; mod config; #[cfg(feature = "__compress")] diff --git a/actix-test/Cargo.toml b/actix-test/Cargo.toml index ede72f219..002e7662e 100644 --- a/actix-test/Cargo.toml +++ b/actix-test/Cargo.toml @@ -22,10 +22,10 @@ edition = "2018" default = [] # rustls -rustls = ["tls-rustls", "actix-http/rustls"] +rustls = ["tls-rustls", "actix-http/rustls", "awc/rustls"] # openssl -openssl = ["tls-openssl", "actix-http/openssl"] +openssl = ["tls-openssl", "actix-http/openssl", "awc/openssl"] [dependencies] actix-codec = "0.4.0" diff --git a/awc/Cargo.toml b/awc/Cargo.toml index 5a8235336..6eeb9ce51 100644 --- a/awc/Cargo.toml +++ b/awc/Cargo.toml @@ -30,10 +30,10 @@ features = ["openssl", "rustls", "compress-brotli", "compress-gzip", "compress-z default = ["compress-brotli", "compress-gzip", "compress-zstd", "cookies"] # openssl -openssl = ["tls-openssl", "actix-http/openssl"] +openssl = ["tls-openssl", "actix-tls/openssl"] # rustls -rustls = ["tls-rustls", "actix-http/rustls"] +rustls = ["tls-rustls", "actix-tls/rustls"] # Brotli algorithm content-encoding support compress-brotli = ["actix-http/compress-brotli", "__compress"] @@ -46,7 +46,7 @@ compress-zstd = ["actix-http/compress-zstd", "__compress"] cookies = ["cookie"] # trust-dns as dns resolver -trust-dns = ["actix-http/trust-dns"] +trust-dns = ["trust-dns-resolver"] # Internal (PRIVATE!) features used to aid testing and cheking feature status. # Don't rely on these whatsoever. They may disappear at anytime. @@ -57,13 +57,18 @@ actix-codec = "0.4.0" actix-service = "2.0.0" actix-http = "3.0.0-beta.11" actix-rt = { version = "2.1", default-features = false } +actix-tls = { version = "3.0.0-beta.7", features = ["connect"] } +actix-utils = "3.0.0" +ahash = "0.7" base64 = "0.13" bytes = "1" cfg-if = "1" -cookie = { version = "0.15", features = ["percent-encode"], optional = true } derive_more = "0.99.5" futures-core = { version = "0.3.7", default-features = false } +futures-util = { version = "0.3.7", default-features = false } +h2 = "0.3" +http = "0.2" itoa = "0.4" log =" 0.4" mime = "0.3" @@ -73,9 +78,15 @@ rand = "0.8" serde = "1.0" serde_json = "1.0" serde_urlencoded = "0.7" +tokio = { version = "1", features = ["sync"] } + +cookie = { version = "0.15", features = ["percent-encode"], optional = true } + tls-openssl = { package = "openssl", version = "0.10.9", optional = true } tls-rustls = { package = "rustls", version = "0.20.0", optional = true, features = ["dangerous_configuration"] } +trust-dns-resolver = { version = "0.20.0", optional = true } + [dev-dependencies] actix-web = { version = "4.0.0-beta.10", features = ["openssl"] } actix-http = { version = "3.0.0-beta.11", features = ["openssl"] } diff --git a/awc/src/builder.rs b/awc/src/builder.rs index c594b4836..11ececa70 100644 --- a/awc/src/builder.rs +++ b/awc/src/builder.rs @@ -4,13 +4,11 @@ use std::net::IpAddr; use std::rc::Rc; use std::time::Duration; -use actix_http::{ - client::{Connector, ConnectorService, TcpConnect, TcpConnectError, TcpConnection}, - http::{self, header, Error as HttpError, HeaderMap, HeaderName, Uri}, -}; +use actix_http::http::{self, header, Error as HttpError, HeaderMap, HeaderName, Uri}; use actix_rt::net::{ActixStream, TcpStream}; use actix_service::{boxed, Service}; +use crate::client::{Connector, ConnectorService, TcpConnect, TcpConnectError, TcpConnection}; use crate::connect::DefaultConnector; use crate::error::SendRequestError; use crate::middleware::{NestTransform, Redirect, Transform}; diff --git a/actix-http/src/client/config.rs b/awc/src/client/config.rs similarity index 96% rename from actix-http/src/client/config.rs rename to awc/src/client/config.rs index 1c0405cbc..530c1e03b 100644 --- a/actix-http/src/client/config.rs +++ b/awc/src/client/config.rs @@ -1,5 +1,4 @@ -use std::net::IpAddr; -use std::time::Duration; +use std::{net::IpAddr, time::Duration}; const DEFAULT_H2_CONN_WINDOW: u32 = 1024 * 1024 * 2; // 2MB const DEFAULT_H2_STREAM_WINDOW: u32 = 1024 * 1024; // 1MB diff --git a/actix-http/src/client/connection.rs b/awc/src/client/connection.rs similarity index 89% rename from actix-http/src/client/connection.rs rename to awc/src/client/connection.rs index a30f651ca..97b96fc0a 100644 --- a/actix-http/src/client/connection.rs +++ b/awc/src/client/connection.rs @@ -12,10 +12,9 @@ use bytes::Bytes; use futures_core::future::LocalBoxFuture; use h2::client::SendRequest; -use crate::h1::ClientCodec; -use crate::message::{RequestHeadType, ResponseHead}; -use crate::payload::Payload; -use crate::{body::MessageBody, Error}; +use actix_http::{ + body::MessageBody, h1::ClientCodec, Error, Payload, RequestHeadType, ResponseHead, +}; use super::error::SendRequestError; use super::pool::Acquired; @@ -219,11 +218,7 @@ impl ConnectionType { } } - pub(super) fn from_h1( - io: Io, - created: time::Instant, - acquired: Acquired, - ) -> Self { + pub(super) fn from_h1(io: Io, created: time::Instant, acquired: Acquired) -> Self { Self::H1(H1Connection { io: Some(io), created, @@ -271,9 +266,7 @@ where Connection::Tls(ConnectionType::H2(conn)) => { h2proto::send_request(conn, head.into(), body).await } - _ => unreachable!( - "Plain Tcp connection can be used only in Http1 protocol" - ), + _ => unreachable!("Plain Tcp connection can be used only in Http1 protocol"), } }) } @@ -301,9 +294,7 @@ where Err(SendRequestError::TunnelNotSupported) } Connection::Tcp(ConnectionType::H2(_)) => { - unreachable!( - "Plain Tcp connection can be used only in Http1 protocol" - ) + unreachable!("Plain Tcp connection can be used only in Http1 protocol") } } }) @@ -321,12 +312,8 @@ where buf: &mut ReadBuf<'_>, ) -> Poll> { match self.get_mut() { - Connection::Tcp(ConnectionType::H1(conn)) => { - Pin::new(conn).poll_read(cx, buf) - } - Connection::Tls(ConnectionType::H1(conn)) => { - Pin::new(conn).poll_read(cx, buf) - } + Connection::Tcp(ConnectionType::H1(conn)) => Pin::new(conn).poll_read(cx, buf), + Connection::Tls(ConnectionType::H1(conn)) => Pin::new(conn).poll_read(cx, buf), _ => unreachable!("H2Connection can not impl AsyncRead trait"), } } @@ -345,12 +332,8 @@ where buf: &[u8], ) -> Poll> { match self.get_mut() { - Connection::Tcp(ConnectionType::H1(conn)) => { - Pin::new(conn).poll_write(cx, buf) - } - Connection::Tls(ConnectionType::H1(conn)) => { - Pin::new(conn).poll_write(cx, buf) - } + Connection::Tcp(ConnectionType::H1(conn)) => Pin::new(conn).poll_write(cx, buf), + Connection::Tls(ConnectionType::H1(conn)) => Pin::new(conn).poll_write(cx, buf), _ => unreachable!(H2_UNREACHABLE_WRITE), } } @@ -363,17 +346,10 @@ where } } - fn poll_shutdown( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { + fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { match self.get_mut() { - Connection::Tcp(ConnectionType::H1(conn)) => { - Pin::new(conn).poll_shutdown(cx) - } - Connection::Tls(ConnectionType::H1(conn)) => { - Pin::new(conn).poll_shutdown(cx) - } + Connection::Tcp(ConnectionType::H1(conn)) => Pin::new(conn).poll_shutdown(cx), + Connection::Tls(ConnectionType::H1(conn)) => Pin::new(conn).poll_shutdown(cx), _ => unreachable!(H2_UNREACHABLE_WRITE), } } diff --git a/actix-http/src/client/connector.rs b/awc/src/client/connector.rs similarity index 88% rename from actix-http/src/client/connector.rs rename to awc/src/client/connector.rs index 4314511a3..8a162c4f8 100644 --- a/actix-http/src/client/connector.rs +++ b/awc/src/client/connector.rs @@ -8,6 +8,7 @@ use std::{ time::Duration, }; +use actix_http::Protocol; use actix_rt::{ net::{ActixStream, TcpStream}, time::{sleep, Sleep}, @@ -19,14 +20,13 @@ use actix_tls::connect::{ }; use futures_core::{future::LocalBoxFuture, ready}; use http::Uri; -use pin_project::pin_project; +use pin_project_lite::pin_project; use super::config::ConnectorConfig; use super::connection::{Connection, ConnectionIo}; use super::error::ConnectError; use super::pool::ConnectionPool; use super::Connect; -use super::Protocol; enum SslConnector { #[allow(dead_code)] @@ -99,9 +99,7 @@ impl Connector<()> { /// Build TLS connector with openssl, based on supplied ALPN protocols #[cfg(all(feature = "openssl", not(feature = "rustls")))] fn build_ssl(protocols: Vec>) -> SslConnector { - use actix_tls::connect::tls::openssl::{ - SslConnector as OpensslConnector, SslMethod, - }; + use actix_tls::connect::tls::openssl::{SslConnector as OpensslConnector, SslMethod}; use bytes::{BufMut, BytesMut}; let mut alpn = BytesMut::with_capacity(20); @@ -112,7 +110,7 @@ impl Connector<()> { let mut ssl = OpensslConnector::builder(SslMethod::tls()).unwrap(); if let Err(err) = ssl.set_alpn_protos(&alpn) { - error!("Can not set ALPN protocol: {:?}", err); + log::error!("Can not set ALPN protocol: {:?}", err); } SslConnector::Openssl(ssl.build()) @@ -148,11 +146,8 @@ where // This remap is to hide ActixStream's trait methods. They are not meant to be called // from user code. Io: ActixStream + fmt::Debug + 'static, - S: Service< - TcpConnect, - Response = TcpConnection, - Error = TcpConnectError, - > + Clone + S: Service, Response = TcpConnection, Error = TcpConnectError> + + Clone + 'static, { /// Tcp connection timeout, i.e. max time to connect to remote host including dns name @@ -171,10 +166,7 @@ where #[cfg(feature = "openssl")] /// Use custom `SslConnector` instance. - pub fn ssl( - mut self, - connector: actix_tls::connect::ssl::openssl::SslConnector, - ) -> Self { + pub fn ssl(mut self, connector: actix_tls::connect::ssl::openssl::SslConnector) -> Self { self.ssl = SslConnector::Openssl(connector); self } @@ -328,10 +320,11 @@ where impl IntoConnectionIo for TcpConnection> { fn into_connection_io(self) -> (Box, Protocol) { let sock = self.into_parts().0; - let h2 = - sock.get_ref().1.alpn_protocol().map_or(false, |protos| { - protos.windows(2).any(|w| w == H2) - }); + let h2 = sock + .get_ref() + .1 + .alpn_protocol() + .map_or(false, |protos| protos.windows(2).any(|w| w == H2)); if h2 { (Box::new(sock), Protocol::Http2) } else { @@ -357,8 +350,8 @@ where let tcp_pool = ConnectionPool::new(tcp_service, tcp_config); let tls_config = self.config; - let tls_pool = tls_service - .map(move |tls_service| ConnectionPool::new(tls_service, tls_config)); + let tls_pool = + tls_service.map(move |tls_service| ConnectionPool::new(tls_service, tls_config)); ConnectorServicePriv { tcp_pool, tls_pool } } @@ -389,10 +382,12 @@ where } } -#[pin_project] -pub struct TcpConnectorFuture { - #[pin] - fut: Fut, +pin_project! { + #[project = TcpConnectorFutureProj] + pub struct TcpConnectorFuture { + #[pin] + fut: Fut, + } } impl Future for TcpConnectorFuture @@ -451,23 +446,25 @@ where } } -#[pin_project(project = TlsConnectorProj)] -#[allow(clippy::large_enum_variant)] -enum TlsConnectorFuture { - TcpConnect { - #[pin] - fut: Fut1, - tls_service: Option, - timeout: Duration, - }, - TlsConnect { - #[pin] - fut: Fut2, - #[pin] - timeout: Sleep, - }, -} +pin_project! { + #[project = TlsConnectorProj] + #[allow(clippy::large_enum_variant)] + enum TlsConnectorFuture { + TcpConnect { + #[pin] + fut: Fut1, + tls_service: Option, + timeout: Duration, + }, + TlsConnect { + #[pin] + fut: Fut2, + #[pin] + timeout: Sleep, + }, + } +} /// helper trait for generic over different TlsStream types between tls crates. trait IntoConnectionIo { fn into_connection_io(self) -> (Box, Protocol); @@ -475,12 +472,7 @@ trait IntoConnectionIo { impl Future for TlsConnectorFuture where - S: Service< - TcpConnection, - Response = Res, - Error = std::io::Error, - Future = Fut2, - >, + S: Service, Response = Res, Error = std::io::Error, Future = Fut2>, S::Response: IntoConnectionIo, Fut1: Future, ConnectError>>, Fut2: Future>, @@ -522,11 +514,7 @@ pub struct TcpConnectorInnerService { } impl TcpConnectorInnerService { - fn new( - service: S, - timeout: Duration, - local_address: Option, - ) -> Self { + fn new(service: S, timeout: Duration, local_address: Option) -> Self { Self { service, timeout, @@ -537,11 +525,8 @@ impl TcpConnectorInnerService { impl Service for TcpConnectorInnerService where - S: Service< - TcpConnect, - Response = TcpConnection, - Error = TcpConnectError, - > + Clone + S: Service, Response = TcpConnection, Error = TcpConnectError> + + Clone + 'static, { type Response = S::Response; @@ -564,12 +549,14 @@ where } } -#[pin_project] -pub struct TcpConnectorInnerFuture { - #[pin] - fut: Fut, - #[pin] - timeout: Sleep, +pin_project! { + #[project = TcpConnectorInnerFutureProj] + pub struct TcpConnectorInnerFuture { + #[pin] + fut: Fut, + #[pin] + timeout: Sleep, + } } impl Future for TcpConnectorInnerFuture @@ -618,12 +605,8 @@ where impl Service for ConnectorServicePriv where - S1: Service - + Clone - + 'static, - S2: Service - + Clone - + 'static, + S1: Service + Clone + 'static, + S2: Service + Clone + 'static, Io1: ConnectionIo, Io2: ConnectionIo, { @@ -643,38 +626,46 @@ where match req.uri.scheme_str() { Some("https") | Some("wss") => match self.tls_pool { None => ConnectorServiceFuture::SslIsNotSupported, - Some(ref pool) => ConnectorServiceFuture::Tls(pool.call(req)), + Some(ref pool) => ConnectorServiceFuture::Tls { + fut: pool.call(req), + }, + }, + _ => ConnectorServiceFuture::Tcp { + fut: self.tcp_pool.call(req), }, - _ => ConnectorServiceFuture::Tcp(self.tcp_pool.call(req)), } } } -#[pin_project(project = ConnectorServiceProj)] -pub enum ConnectorServiceFuture -where - S1: Service - + Clone - + 'static, - S2: Service - + Clone - + 'static, - Io1: ConnectionIo, - Io2: ConnectionIo, -{ - Tcp(#[pin] as Service>::Future), - Tls(#[pin] as Service>::Future), - SslIsNotSupported, +pin_project! { + #[project = ConnectorServiceFutureProj] + pub enum ConnectorServiceFuture + where + S1: Service, + S1: Clone, + S1: 'static, + S2: Service, + S2: Clone, + S2: 'static, + Io1: ConnectionIo, + Io2: ConnectionIo, + { + Tcp { + #[pin] + fut: as Service>::Future + }, + Tls { + #[pin] + fut: as Service>::Future + }, + SslIsNotSupported + } } impl Future for ConnectorServiceFuture where - S1: Service - + Clone - + 'static, - S2: Service - + Clone - + 'static, + S1: Service + Clone + 'static, + S2: Service + Clone + 'static, Io1: ConnectionIo, Io2: ConnectionIo, { @@ -682,9 +673,9 @@ where fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { match self.project() { - ConnectorServiceProj::Tcp(fut) => fut.poll(cx).map_ok(Connection::Tcp), - ConnectorServiceProj::Tls(fut) => fut.poll(cx).map_ok(Connection::Tls), - ConnectorServiceProj::SslIsNotSupported => { + ConnectorServiceFutureProj::Tcp { fut } => fut.poll(cx).map_ok(Connection::Tcp), + ConnectorServiceFutureProj::Tls { fut } => fut.poll(cx).map_ok(Connection::Tls), + ConnectorServiceFutureProj::SslIsNotSupported => { Poll::Ready(Err(ConnectError::SslIsNotSupported)) } } diff --git a/actix-http/src/client/error.rs b/awc/src/client/error.rs similarity index 98% rename from actix-http/src/client/error.rs rename to awc/src/client/error.rs index 34833503b..0f3b1fdea 100644 --- a/actix-http/src/client/error.rs +++ b/awc/src/client/error.rs @@ -2,12 +2,13 @@ use std::{error::Error as StdError, fmt, io}; use derive_more::{Display, From}; +use actix_http::{ + error::{Error, ParseError}, + http::Error as HttpError, +}; #[cfg(feature = "openssl")] use actix_tls::accept::openssl::SslError; -use crate::error::{Error, ParseError}; -use crate::http::Error as HttpError; - /// A set of errors that can occur while connecting to an HTTP host #[derive(Debug, Display, From)] #[non_exhaustive] diff --git a/actix-http/src/client/h1proto.rs b/awc/src/client/h1proto.rs similarity index 92% rename from actix-http/src/client/h1proto.rs rename to awc/src/client/h1proto.rs index 65a30748c..3c2bb7cc1 100644 --- a/actix-http/src/client/h1proto.rs +++ b/awc/src/client/h1proto.rs @@ -5,24 +5,25 @@ use std::{ }; use actix_codec::Framed; +use actix_http::{ + body::{BodySize, MessageBody}, + error::PayloadError, + h1, + http::{ + header::{HeaderMap, IntoHeaderValue, EXPECT, HOST}, + StatusCode, + }, + Error, Payload, RequestHeadType, ResponseHead, +}; use actix_utils::future::poll_fn; use bytes::buf::BufMut; use bytes::{Bytes, BytesMut}; use futures_core::{ready, Stream}; use futures_util::SinkExt as _; - -use crate::h1; -use crate::http::{ - header::{HeaderMap, IntoHeaderValue, EXPECT, HOST}, - StatusCode, -}; -use crate::message::{RequestHeadType, ResponseHead}; -use crate::payload::Payload; -use crate::{error::PayloadError, Error}; +use pin_project_lite::pin_project; use super::connection::{ConnectionIo, H1Connection}; use super::error::{ConnectError, SendRequestError}; -use crate::body::{BodySize, MessageBody}; pub(crate) async fn send_request( io: H1Connection, @@ -194,10 +195,11 @@ where Ok(()) } -#[pin_project::pin_project] -pub(crate) struct PlStream { - #[pin] - framed: Framed, h1::ClientPayloadCodec>, +pin_project! { + pub(crate) struct PlStream { + #[pin] + framed: Framed, h1::ClientPayloadCodec>, + } } impl PlStream { @@ -211,10 +213,7 @@ impl PlStream { impl Stream for PlStream { type Item = Result; - fn poll_next( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let mut this = self.project(); match ready!(this.framed.as_mut().next_item(cx)?) { diff --git a/actix-http/src/client/h2proto.rs b/awc/src/client/h2proto.rs similarity index 96% rename from actix-http/src/client/h2proto.rs rename to awc/src/client/h2proto.rs index b9d5f96bd..feb2dbd06 100644 --- a/actix-http/src/client/h2proto.rs +++ b/awc/src/client/h2proto.rs @@ -8,13 +8,12 @@ use h2::{ }; use http::header::{HeaderValue, CONNECTION, CONTENT_LENGTH, TRANSFER_ENCODING}; use http::{request::Request, Method, Version}; +use log::trace; -use crate::{ +use actix_http::{ body::{BodySize, MessageBody}, header::HeaderMap, - message::{RequestHeadType, ResponseHead}, - payload::Payload, - Error, + Error, Payload, RequestHeadType, ResponseHead, }; use super::{ @@ -131,10 +130,7 @@ where Ok((head, payload)) } -async fn send_body( - body: B, - mut send: SendStream, -) -> Result<(), SendRequestError> +async fn send_body(body: B, mut send: SendStream) -> Result<(), SendRequestError> where B: MessageBody, B::Error: Into, @@ -184,8 +180,7 @@ where pub(crate) fn handshake( io: Io, config: &ConnectorConfig, -) -> impl Future, Connection), h2::Error>> -{ +) -> impl Future, Connection), h2::Error>> { let mut builder = Builder::new(); builder .initial_window_size(config.stream_window_size) diff --git a/actix-http/src/client/mod.rs b/awc/src/client/mod.rs similarity index 95% rename from actix-http/src/client/mod.rs rename to awc/src/client/mod.rs index 41d5fef2a..3abbf50a5 100644 --- a/actix-http/src/client/mod.rs +++ b/awc/src/client/mod.rs @@ -17,7 +17,6 @@ pub use actix_tls::connect::{ pub use self::connection::{Connection, ConnectionIo}; pub use self::connector::{Connector, ConnectorService}; pub use self::error::{ConnectError, FreezeRequestError, InvalidUrl, SendRequestError}; -pub use crate::Protocol; #[derive(Clone)] pub struct Connect { diff --git a/actix-http/src/client/pool.rs b/awc/src/client/pool.rs similarity index 95% rename from actix-http/src/client/pool.rs rename to awc/src/client/pool.rs index 88188038f..229c6324a 100644 --- a/actix-http/src/client/pool.rs +++ b/awc/src/client/pool.rs @@ -14,22 +14,20 @@ use std::{ }; use actix_codec::{AsyncRead, AsyncWrite, ReadBuf}; +use actix_http::Protocol; use actix_rt::time::{sleep, Sleep}; use actix_service::Service; use ahash::AHashMap; use futures_core::future::LocalBoxFuture; use http::uri::Authority; -use pin_project::pin_project; +use pin_project_lite::pin_project; use tokio::sync::{OwnedSemaphorePermit, Semaphore}; use super::config::ConnectorConfig; -use super::connection::{ - ConnectionInnerType, ConnectionIo, ConnectionType, H2ConnectionInner, -}; +use super::connection::{ConnectionInnerType, ConnectionIo, ConnectionType, H2ConnectionInner}; use super::error::ConnectError; use super::h2proto::handshake; use super::Connect; -use super::Protocol; #[derive(Hash, Eq, PartialEq, Clone, Debug)] pub struct Key { @@ -152,9 +150,7 @@ where impl Service for ConnectionPool where - S: Service - + Clone - + 'static, + S: Service + Clone + 'static, Io: ConnectionIo, { type Response = ConnectionType; @@ -195,8 +191,8 @@ where let config = &inner.config; let idle_dur = now - c.used; let age = now - c.created; - let conn_ineligible = idle_dur > config.conn_keep_alive - || age > config.conn_lifetime; + let conn_ineligible = + idle_dur > config.conn_keep_alive || age > config.conn_lifetime; if conn_ineligible { // drop connections that are too old @@ -231,9 +227,7 @@ where // match the connection and spawn new one if did not get anything. match conn { - Some(conn) => { - Ok(ConnectionType::from_pool(conn.conn, conn.created, acquired)) - } + Some(conn) => Ok(ConnectionType::from_pool(conn.conn, conn.created, acquired)), None => { let (io, proto) = connector.call(req).await?; @@ -284,9 +278,7 @@ where let mut read_buf = ReadBuf::new(&mut buf); let state = match Pin::new(&mut this.io).poll_read(cx, &mut read_buf) { - Poll::Ready(Ok(())) if !read_buf.filled().is_empty() => { - ConnectionState::Tainted - } + Poll::Ready(Ok(())) if !read_buf.filled().is_empty() => ConnectionState::Tainted, Poll::Pending => ConnectionState::Live, _ => ConnectionState::Skip, @@ -302,11 +294,13 @@ struct PooledConnection { created: Instant, } -#[pin_project] -struct CloseConnection { - io: Io, - #[pin] - timeout: Sleep, +pin_project! { + #[project = CloseConnectionProj] + struct CloseConnection { + io: Io, + #[pin] + timeout: Sleep, + } } impl CloseConnection @@ -413,17 +407,11 @@ mod test { unimplemented!() } - fn poll_flush( - self: Pin<&mut Self>, - _: &mut Context<'_>, - ) -> Poll> { + fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { unimplemented!() } - fn poll_shutdown( - self: Pin<&mut Self>, - _: &mut Context<'_>, - ) -> Poll> { + fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { Poll::Ready(Ok(())) } } diff --git a/awc/src/connect.rs b/awc/src/connect.rs index 6a9fc4630..f27a8c368 100644 --- a/awc/src/connect.rs +++ b/awc/src/connect.rs @@ -8,16 +8,14 @@ use std::{ use actix_codec::Framed; use actix_http::{ - body::Body, - client::{ - Connect as ClientConnect, ConnectError, Connection, ConnectionIo, SendRequestError, - }, - h1::ClientCodec, - Payload, RequestHead, RequestHeadType, ResponseHead, + body::Body, h1::ClientCodec, Payload, RequestHead, RequestHeadType, ResponseHead, }; use actix_service::Service; use futures_core::{future::LocalBoxFuture, ready}; +use crate::client::{ + Connect as ClientConnect, ConnectError, Connection, ConnectionIo, SendRequestError, +}; use crate::response::ClientResponse; pub type BoxConnectorService = Rc< diff --git a/awc/src/error.rs b/awc/src/error.rs index c83c5ebbf..d415efe95 100644 --- a/awc/src/error.rs +++ b/awc/src/error.rs @@ -1,15 +1,15 @@ //! HTTP client errors -pub use actix_http::client::{ConnectError, FreezeRequestError, InvalidUrl, SendRequestError}; -pub use actix_http::error::PayloadError; -pub use actix_http::http::Error as HttpError; -pub use actix_http::ws::HandshakeError as WsHandshakeError; -pub use actix_http::ws::ProtocolError as WsProtocolError; +pub use actix_http::{ + error::PayloadError, + http::{header::HeaderValue, Error as HttpError, StatusCode}, + ws::{HandshakeError as WsHandshakeError, ProtocolError as WsProtocolError}, +}; +use derive_more::{Display, From}; use serde_json::error::Error as JsonError; -use actix_http::http::{header::HeaderValue, StatusCode}; -use derive_more::{Display, From}; +pub use crate::client::{ConnectError, FreezeRequestError, InvalidUrl, SendRequestError}; /// Websocket client error #[derive(Debug, Display, From)] diff --git a/awc/src/lib.rs b/awc/src/lib.rs index c0290ddcf..05f97aa3d 100644 --- a/awc/src/lib.rs +++ b/awc/src/lib.rs @@ -104,22 +104,8 @@ #![doc(html_logo_url = "https://actix.rs/img/logo.png")] #![doc(html_favicon_url = "https://actix.rs/favicon.ico")] -use std::{convert::TryFrom, rc::Rc, time::Duration}; - -#[cfg(feature = "cookies")] -pub use cookie; - -pub use actix_http::{client::Connector, http}; - -use actix_http::{ - client::{TcpConnect, TcpConnectError, TcpConnection}, - http::{Error as HttpError, HeaderMap, Method, Uri}, - RequestHead, -}; -use actix_rt::net::TcpStream; -use actix_service::Service; - mod builder; +mod client; mod connect; pub mod error; mod frozen; @@ -130,13 +116,29 @@ mod sender; pub mod test; pub mod ws; +pub use actix_http::http; +#[cfg(feature = "cookies")] +pub use cookie; + pub use self::builder::ClientBuilder; +pub use self::client::Connector; pub use self::connect::{BoxConnectorService, BoxedSocket, ConnectRequest, ConnectResponse}; pub use self::frozen::{FrozenClientRequest, FrozenSendBuilder}; pub use self::request::ClientRequest; pub use self::response::{ClientResponse, JsonBody, MessageBody}; pub use self::sender::SendClientRequest; +use std::{convert::TryFrom, rc::Rc, time::Duration}; + +use actix_http::{ + http::{Error as HttpError, HeaderMap, Method, Uri}, + RequestHead, +}; +use actix_rt::net::TcpStream; +use actix_service::Service; + +use self::client::{TcpConnect, TcpConnectError, TcpConnection}; + /// An asynchronous HTTP and WebSocket client. /// /// You should take care to create, at most, one `Client` per thread. Otherwise, expect higher CPU diff --git a/awc/src/middleware/redirect.rs b/awc/src/middleware/redirect.rs index a8c14d549..8a79a6596 100644 --- a/awc/src/middleware/redirect.rs +++ b/awc/src/middleware/redirect.rs @@ -9,7 +9,6 @@ use std::{ use actix_http::{ body::Body, - client::{InvalidUrl, SendRequestError}, http::{header, Method, StatusCode, Uri}, RequestHead, RequestHeadType, }; @@ -19,6 +18,7 @@ use futures_core::ready; use super::Transform; +use crate::client::{InvalidUrl, SendRequestError}; use crate::connect::{ConnectRequest, ConnectResponse}; use crate::ClientResponse; From 855e260fdb0f983537c463fd8d64c3eeb38ab546 Mon Sep 17 00:00:00 2001 From: Luca Palmieri Date: Tue, 26 Oct 2021 09:24:38 +0100 Subject: [PATCH 3/7] Add `html_utf8` content type. (#2423) --- CHANGES.md | 2 ++ src/http/header/content_type.rs | 19 ++++++++++--------- 2 files changed, 12 insertions(+), 9 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 9d7b3180d..3be41d468 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,6 +1,8 @@ # Changes ## Unreleased - 2021-xx-xx +### Changed +* `ContentType::html` now returns `Content-Type: text/html; charset=utf-8` instead of `Content-Type: text/html`. ## 4.0.0-beta.10 - 2021-10-20 diff --git a/src/http/header/content_type.rs b/src/http/header/content_type.rs index e1c419c22..230460003 100644 --- a/src/http/header/content_type.rs +++ b/src/http/header/content_type.rs @@ -60,52 +60,53 @@ crate::http::header::common_header! { } impl ContentType { - /// A constructor to easily create a `Content-Type: application/json` + /// A constructor to easily create a `Content-Type: application/json` /// header. #[inline] pub fn json() -> ContentType { ContentType(mime::APPLICATION_JSON) } - /// A constructor to easily create a `Content-Type: text/plain; + /// A constructor to easily create a `Content-Type: text/plain; /// charset=utf-8` header. #[inline] pub fn plaintext() -> ContentType { ContentType(mime::TEXT_PLAIN_UTF_8) } - /// A constructor to easily create a `Content-Type: text/html` header. + /// A constructor to easily create a `Content-Type: text/html; charset=utf-8` + /// header. #[inline] pub fn html() -> ContentType { - ContentType(mime::TEXT_HTML) + ContentType(mime::TEXT_HTML_UTF_8) } - /// A constructor to easily create a `Content-Type: text/xml` header. + /// A constructor to easily create a `Content-Type: text/xml` header. #[inline] pub fn xml() -> ContentType { ContentType(mime::TEXT_XML) } - /// A constructor to easily create a `Content-Type: + /// A constructor to easily create a `Content-Type: /// application/www-form-url-encoded` header. #[inline] pub fn form_url_encoded() -> ContentType { ContentType(mime::APPLICATION_WWW_FORM_URLENCODED) } - /// A constructor to easily create a `Content-Type: image/jpeg` header. + /// A constructor to easily create a `Content-Type: image/jpeg` header. #[inline] pub fn jpeg() -> ContentType { ContentType(mime::IMAGE_JPEG) } - /// A constructor to easily create a `Content-Type: image/png` header. + /// A constructor to easily create a `Content-Type: image/png` header. #[inline] pub fn png() -> ContentType { ContentType(mime::IMAGE_PNG) } - /// A constructor to easily create a `Content-Type: + /// A constructor to easily create a `Content-Type: /// application/octet-stream` header. #[inline] pub fn octet_stream() -> ContentType { From be9530eb729035cf4225753a2d6f64a12e0e6140 Mon Sep 17 00:00:00 2001 From: fakeshadow <24548779@qq.com> Date: Tue, 26 Oct 2021 20:16:48 +0800 Subject: [PATCH 4/7] avoid building actix-tls with no-default-features (#2426) --- actix-http/Cargo.toml | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/actix-http/Cargo.toml b/actix-http/Cargo.toml index 3d45cc8ce..7d39e000a 100644 --- a/actix-http/Cargo.toml +++ b/actix-http/Cargo.toml @@ -27,10 +27,10 @@ path = "src/lib.rs" default = [] # openssl -openssl = ["actix-tls/openssl"] +openssl = ["actix-tls/accept", "actix-tls/openssl"] # rustls support -rustls = ["actix-tls/rustls"] +rustls = ["actix-tls/accept", "actix-tls/rustls"] # enable compression support compress-brotli = ["brotli2", "__compress"] @@ -46,7 +46,6 @@ actix-service = "2.0.0" actix-codec = "0.4.0" actix-utils = "3.0.0" actix-rt = "2.2" -actix-tls = { version = "3.0.0-beta.7", features = ["accept"] } ahash = "0.7" base64 = "0.13" @@ -74,6 +73,9 @@ sha-1 = "0.9" smallvec = "1.6.1" tokio = { version = "1.2", features = ["sync"] } +# tls +actix-tls = { version = "3.0.0-beta.7", default-features = false, optional = true } + # compression brotli2 = { version="0.3.2", optional = true } flate2 = { version = "1.0.13", optional = true } From ec6d284a8e393d8c8f5bf60eece49abbab62658f Mon Sep 17 00:00:00 2001 From: Ali MJ Al-Nasrawy Date: Sun, 31 Oct 2021 16:19:21 +0300 Subject: [PATCH 5/7] improve "data no configured" message (#2429) --- src/data.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/data.rs b/src/data.rs index 7e01d3462..d27ad196b 100644 --- a/src/data.rs +++ b/src/data.rs @@ -137,7 +137,7 @@ impl FromRequest for Data { type_name::(), ); err(ErrorInternalServerError( - "App data is not configured, to configure use App::data()", + "App data is not configured, to configure construct it with web::Data::new() and pass it to App::app_data()", )) } } From 6ec2d7b90966bc55b72ee4c90129074742d087bc Mon Sep 17 00:00:00 2001 From: fakeshadow <24548779@qq.com> Date: Thu, 4 Nov 2021 23:15:23 +0800 Subject: [PATCH 6/7] add keep alive to h2 through ping pong (#2433) --- actix-http/src/h2/dispatcher.rs | 147 ++++++++++++++++++-------- actix-http/tests/test_h2_ping_pong.rs | 77 ++++++++++++++ 2 files changed, 180 insertions(+), 44 deletions(-) create mode 100644 actix-http/tests/test_h2_ping_pong.rs diff --git a/actix-http/src/h2/dispatcher.rs b/actix-http/src/h2/dispatcher.rs index ea149b1e0..7326dfff1 100644 --- a/actix-http/src/h2/dispatcher.rs +++ b/actix-http/src/h2/dispatcher.rs @@ -10,11 +10,15 @@ use std::{ }; use actix_codec::{AsyncRead, AsyncWrite}; +use actix_rt::time::Sleep; use actix_service::Service; use actix_utils::future::poll_fn; use bytes::{Bytes, BytesMut}; use futures_core::ready; -use h2::server::{Connection, SendResponse}; +use h2::{ + server::{Connection, SendResponse}, + Ping, PingPong, +}; use http::header::{HeaderValue, CONNECTION, CONTENT_LENGTH, DATE, TRANSFER_ENCODING}; use log::{error, trace}; use pin_project_lite::pin_project; @@ -36,29 +40,46 @@ pin_project! { on_connect_data: OnConnectData, config: ServiceConfig, peer_addr: Option, - _phantom: PhantomData, + ping_pong: Option, + _phantom: PhantomData } } -impl Dispatcher { +impl Dispatcher +where + T: AsyncRead + AsyncWrite + Unpin, +{ pub(crate) fn new( flow: Rc>, - connection: Connection, + mut connection: Connection, on_connect_data: OnConnectData, config: ServiceConfig, peer_addr: Option, ) -> Self { + let ping_pong = config.keep_alive_timer().map(|timer| H2PingPong { + timer: Box::pin(timer), + on_flight: false, + ping_pong: connection.ping_pong().unwrap(), + }); + Self { flow, config, peer_addr, connection, on_connect_data, + ping_pong, _phantom: PhantomData, } } } +struct H2PingPong { + timer: Pin>, + on_flight: bool, + ping_pong: PingPong, +} + impl Future for Dispatcher where T: AsyncRead + AsyncWrite + Unpin, @@ -77,54 +98,92 @@ where fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = self.get_mut(); - while let Some((req, tx)) = - ready!(Pin::new(&mut this.connection).poll_accept(cx)?) - { - let (parts, body) = req.into_parts(); - let pl = crate::h2::Payload::new(body); - let pl = Payload::::H2(pl); - let mut req = Request::with_payload(pl); + loop { + match Pin::new(&mut this.connection).poll_accept(cx)? { + Poll::Ready(Some((req, tx))) => { + let (parts, body) = req.into_parts(); + let pl = crate::h2::Payload::new(body); + let pl = Payload::::H2(pl); + let mut req = Request::with_payload(pl); - let head = req.head_mut(); - head.uri = parts.uri; - head.method = parts.method; - head.version = parts.version; - head.headers = parts.headers.into(); - head.peer_addr = this.peer_addr; + let head = req.head_mut(); + head.uri = parts.uri; + head.method = parts.method; + head.version = parts.version; + head.headers = parts.headers.into(); + head.peer_addr = this.peer_addr; - // merge on_connect_ext data into request extensions - this.on_connect_data.merge_into(&mut req); + // merge on_connect_ext data into request extensions + this.on_connect_data.merge_into(&mut req); - let fut = this.flow.service.call(req); - let config = this.config.clone(); + let fut = this.flow.service.call(req); + let config = this.config.clone(); - // multiplex request handling with spawn task - actix_rt::spawn(async move { - // resolve service call and send response. - let res = match fut.await { - Ok(res) => handle_response(res.into(), tx, config).await, - Err(err) => { - let res: Response = err.into(); - handle_response(res, tx, config).await - } - }; + // multiplex request handling with spawn task + actix_rt::spawn(async move { + // resolve service call and send response. + let res = match fut.await { + Ok(res) => handle_response(res.into(), tx, config).await, + Err(err) => { + let res: Response = err.into(); + handle_response(res, tx, config).await + } + }; - // log error. - if let Err(err) = res { - match err { - DispatchError::SendResponse(err) => { - trace!("Error sending HTTP/2 response: {:?}", err) + // log error. + if let Err(err) = res { + match err { + DispatchError::SendResponse(err) => { + trace!("Error sending HTTP/2 response: {:?}", err) + } + DispatchError::SendData(err) => warn!("{:?}", err), + DispatchError::ResponseBody(err) => { + error!("Response payload stream error: {:?}", err) + } + } } - DispatchError::SendData(err) => warn!("{:?}", err), - DispatchError::ResponseBody(err) => { - error!("Response payload stream error: {:?}", err) - } - } + }); } - }); - } + Poll::Ready(None) => return Poll::Ready(Ok(())), + Poll::Pending => match this.ping_pong.as_mut() { + Some(ping_pong) => loop { + if ping_pong.on_flight { + // When have on flight ping pong. poll pong and and keep alive timer. + // on success pong received update keep alive timer to determine the next timing of + // ping pong. + match ping_pong.ping_pong.poll_pong(cx)? { + Poll::Ready(_) => { + ping_pong.on_flight = false; - Poll::Ready(Ok(())) + let dead_line = + this.config.keep_alive_expire().unwrap(); + ping_pong.timer.as_mut().reset(dead_line); + } + Poll::Pending => { + return ping_pong + .timer + .as_mut() + .poll(cx) + .map(|_| Ok(())) + } + } + } else { + // When there is no on flight ping pong. keep alive timer is used to wait for next + // timing of ping pong. Therefore at this point it serves as an interval instead. + ready!(ping_pong.timer.as_mut().poll(cx)); + + ping_pong.ping_pong.send_ping(Ping::opaque())?; + + let dead_line = this.config.keep_alive_expire().unwrap(); + ping_pong.timer.as_mut().reset(dead_line); + + ping_pong.on_flight = true; + } + }, + None => return Poll::Pending, + }, + } + } } } diff --git a/actix-http/tests/test_h2_ping_pong.rs b/actix-http/tests/test_h2_ping_pong.rs new file mode 100644 index 000000000..5e03785a1 --- /dev/null +++ b/actix-http/tests/test_h2_ping_pong.rs @@ -0,0 +1,77 @@ +use std::io; + +use actix_http::{error::Error, HttpService, Response}; +use actix_server::Server; + +#[actix_rt::test] +async fn h2_ping_pong() -> io::Result<()> { + let (tx, rx) = std::sync::mpsc::sync_channel(1); + + let lst = std::net::TcpListener::bind("127.0.0.1:0")?; + + let addr = lst.local_addr().unwrap(); + + let join = std::thread::spawn(move || { + actix_rt::System::new().block_on(async move { + let handle = Server::build() + .disable_signals() + .workers(1) + .listen("h2_ping_pong", lst, || { + HttpService::build() + .keep_alive(3) + .h2(|_| async { Ok::<_, Error>(Response::ok()) }) + .tcp() + })? + .run(); + + tx.send(handle.clone()).unwrap(); + + handle.await + }) + }); + + let handle = rx.recv().unwrap(); + + let (sync_tx, rx) = std::sync::mpsc::sync_channel(1); + + // use a separate thread for h2 client so it can be blocked. + std::thread::spawn(move || { + tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap() + .block_on(async move { + let stream = tokio::net::TcpStream::connect(addr).await.unwrap(); + + let (mut tx, conn) = h2::client::handshake(stream).await.unwrap(); + + tokio::spawn(async move { conn.await.unwrap() }); + + let (res, _) = tx.send_request(::http::Request::new(()), true).unwrap(); + let res = res.await.unwrap(); + + assert_eq!(res.status().as_u16(), 200); + + sync_tx.send(()).unwrap(); + + // intentionally block the client thread so it can not answer ping pong. + std::thread::sleep(std::time::Duration::from_secs(1000)); + }) + }); + + rx.recv().unwrap(); + + let now = std::time::Instant::now(); + + // stop server gracefully. this step would take up to 30 seconds. + handle.stop(true).await; + + // join server thread. only when connection are all gone this step would finish. + join.join().unwrap()?; + + // check the time used for join server thread so it's known that the server shutdown + // is from keep alive and not server graceful shutdown timeout. + assert!(now.elapsed() < std::time::Duration::from_secs(30)); + + Ok(()) +} From 5e554dca35844c241e813663da09e20575d586d3 Mon Sep 17 00:00:00 2001 From: fakeshadow <24548779@qq.com> Date: Thu, 4 Nov 2021 23:57:55 +0800 Subject: [PATCH 7/7] fix awc clippy warning (#2431) --- awc/src/client/pool.rs | 3 ++- awc/tests/test_client.rs | 14 ++++++-------- 2 files changed, 8 insertions(+), 9 deletions(-) diff --git a/awc/src/client/pool.rs b/awc/src/client/pool.rs index 229c6324a..9d130412b 100644 --- a/awc/src/client/pool.rs +++ b/awc/src/client/pool.rs @@ -19,6 +19,7 @@ use actix_rt::time::{sleep, Sleep}; use actix_service::Service; use ahash::AHashMap; use futures_core::future::LocalBoxFuture; +use futures_util::FutureExt; use http::uri::Authority; use pin_project_lite::pin_project; use tokio::sync::{OwnedSemaphorePermit, Semaphore}; @@ -201,7 +202,7 @@ where // check if the connection is still usable if let ConnectionInnerType::H1(ref mut io) = c.conn { let check = ConnectionCheckFuture { io }; - match check.await { + match check.now_or_never().expect("ConnectionCheckFuture must never yield with Poll::Pending.") { ConnectionState::Tainted => { inner.close(c.conn); continue; diff --git a/awc/tests/test_client.rs b/awc/tests/test_client.rs index 615789fb3..a0af0cab6 100644 --- a/awc/tests/test_client.rs +++ b/awc/tests/test_client.rs @@ -795,17 +795,15 @@ async fn client_unread_response() { let lst = std::net::TcpListener::bind(addr).unwrap(); std::thread::spawn(move || { - for stream in lst.incoming() { - let mut stream = stream.unwrap(); - let mut b = [0; 1000]; - let _ = stream.read(&mut b).unwrap(); - let _ = stream.write_all( - b"HTTP/1.1 200 OK\r\n\ + let (mut stream, _) = lst.accept().unwrap(); + let mut b = [0; 1000]; + let _ = stream.read(&mut b).unwrap(); + let _ = stream.write_all( + b"HTTP/1.1 200 OK\r\n\ connection: close\r\n\ \r\n\ welcome!", - ); - } + ); }); // client request