Merge branch 'master' into style/h1_poll_response_loop

This commit is contained in:
fakeshadow 2021-03-18 10:02:52 -07:00 committed by GitHub
commit 754c848de6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
33 changed files with 231 additions and 292 deletions

View File

@ -26,6 +26,21 @@ jobs:
steps: steps:
- uses: actions/checkout@v2 - uses: actions/checkout@v2
# install OpenSSL on Windows
- name: Set vcpkg root
if: matrix.target.triple == 'x86_64-pc-windows-msvc'
run: echo "VCPKG_ROOT=$env:VCPKG_INSTALLATION_ROOT" | Out-File -FilePath $env:GITHUB_ENV -Append
- name: Install OpenSSL
if: matrix.target.triple == 'x86_64-pc-windows-msvc'
run: vcpkg install openssl:x64-windows
- name: Install ${{ matrix.version }}
uses: actions-rs/toolchain@v1
with:
toolchain: ${{ matrix.version }}-${{ matrix.target.triple }}
profile: minimal
override: true
- name: Install ${{ matrix.version }} - name: Install ${{ matrix.version }}
uses: actions-rs/toolchain@v1 uses: actions-rs/toolchain@v1
with: with:

View File

@ -111,12 +111,6 @@ tls-openssl = { package = "openssl", version = "0.10.9", optional = true }
tls-rustls = { package = "rustls", version = "0.19.0", optional = true } tls-rustls = { package = "rustls", version = "0.19.0", optional = true }
url = "2.1" url = "2.1"
[target.'cfg(windows)'.dependencies.tls-openssl]
version = "0.10.9"
package = "openssl"
features = ["vendored"]
optional = true
[dev-dependencies] [dev-dependencies]
brotli2 = "0.3.2" brotli2 = "0.3.2"
criterion = "0.3" criterion = "0.3"

View File

@ -50,12 +50,6 @@ serde_urlencoded = "0.7"
time = { version = "0.2.23", default-features = false, features = ["std"] } time = { version = "0.2.23", default-features = false, features = ["std"] }
tls-openssl = { version = "0.10.9", package = "openssl", optional = true } tls-openssl = { version = "0.10.9", package = "openssl", optional = true }
[target.'cfg(windows)'.dependencies.tls-openssl]
version = "0.10.9"
package = "openssl"
features = ["vendored"]
optional = true
[dev-dependencies] [dev-dependencies]
actix-web = { version = "4.0.0-beta.4", default-features = false, features = ["cookies"] } actix-web = { version = "4.0.0-beta.4", default-features = false, features = ["cookies"] }
actix-http = "3.0.0-beta.4" actix-http = "3.0.0-beta.4"

View File

@ -98,11 +98,6 @@ serde_derive = "1.0"
tls-openssl = { version = "0.10", package = "openssl" } tls-openssl = { version = "0.10", package = "openssl" }
tls-rustls = { version = "0.19", package = "rustls" } tls-rustls = { version = "0.19", package = "rustls" }
[target.'cfg(windows)'.dev-dependencies.tls-openssl]
version = "0.10.9"
package = "openssl"
features = ["vendored"]
[[example]] [[example]]
name = "ws" name = "ws"
required-features = ["rustls"] required-features = ["rustls"]

View File

@ -3,7 +3,7 @@ use std::{env, io};
use actix_http::{Error, HttpService, Request, Response}; use actix_http::{Error, HttpService, Request, Response};
use actix_server::Server; use actix_server::Server;
use bytes::BytesMut; use bytes::BytesMut;
use futures_util::StreamExt; use futures_util::StreamExt as _;
use http::header::HeaderValue; use http::header::HeaderValue;
use log::info; use log::info;

View File

@ -4,7 +4,7 @@ use actix_http::http::HeaderValue;
use actix_http::{Error, HttpService, Request, Response}; use actix_http::{Error, HttpService, Request, Response};
use actix_server::Server; use actix_server::Server;
use bytes::BytesMut; use bytes::BytesMut;
use futures_util::StreamExt; use futures_util::StreamExt as _;
use log::info; use log::info;
async fn handle_request(mut req: Request) -> Result<Response, Error> { async fn handle_request(mut req: Request) -> Result<Response, Error> {

View File

@ -19,6 +19,10 @@ use super::error::SendRequestError;
use super::pool::Acquired; use super::pool::Acquired;
use super::{h1proto, h2proto}; use super::{h1proto, h2proto};
pub trait ConnectionIo: AsyncRead + AsyncWrite + Unpin + 'static {}
impl<T: AsyncRead + AsyncWrite + Unpin + 'static> ConnectionIo for T {}
pub(crate) enum ConnectionType<Io> { pub(crate) enum ConnectionType<Io> {
H1(Io), H1(Io),
H2(H2Connection), H2(H2Connection),
@ -94,14 +98,6 @@ pub trait Connection {
>; >;
} }
pub(crate) trait ConnectionLifetime: AsyncRead + AsyncWrite + 'static {
/// Close connection
fn close(self: Pin<&mut Self>);
/// Release connection to the connection pool
fn release(self: Pin<&mut Self>);
}
#[doc(hidden)] #[doc(hidden)]
/// HTTP client connection /// HTTP client connection
pub struct IoConnection<T> pub struct IoConnection<T>
@ -110,7 +106,7 @@ where
{ {
io: Option<ConnectionType<T>>, io: Option<ConnectionType<T>>,
created: time::Instant, created: time::Instant,
pool: Option<Acquired<T>>, pool: Acquired<T>,
} }
impl<T> fmt::Debug for IoConnection<T> impl<T> fmt::Debug for IoConnection<T>
@ -130,7 +126,7 @@ impl<T: AsyncRead + AsyncWrite + Unpin> IoConnection<T> {
pub(crate) fn new( pub(crate) fn new(
io: ConnectionType<T>, io: ConnectionType<T>,
created: time::Instant, created: time::Instant,
pool: Option<Acquired<T>>, pool: Acquired<T>,
) -> Self { ) -> Self {
IoConnection { IoConnection {
pool, pool,
@ -139,13 +135,9 @@ impl<T: AsyncRead + AsyncWrite + Unpin> IoConnection<T> {
} }
} }
pub(crate) fn into_inner(self) -> (ConnectionType<T>, time::Instant) {
(self.io.unwrap(), self.created)
}
#[cfg(test)] #[cfg(test)]
pub(crate) fn into_parts(self) -> (ConnectionType<T>, time::Instant, Acquired<T>) { pub(crate) fn into_parts(self) -> (ConnectionType<T>, time::Instant, Acquired<T>) {
(self.io.unwrap(), self.created, self.pool.unwrap()) (self.io.unwrap(), self.created, self.pool)
} }
async fn send_request<B: MessageBody + 'static, H: Into<RequestHeadType>>( async fn send_request<B: MessageBody + 'static, H: Into<RequestHeadType>>(
@ -173,13 +165,7 @@ impl<T: AsyncRead + AsyncWrite + Unpin> IoConnection<T> {
match self.io.take().unwrap() { match self.io.take().unwrap() {
ConnectionType::H1(io) => h1proto::open_tunnel(io, head.into()).await, ConnectionType::H1(io) => h1proto::open_tunnel(io, head.into()).await,
ConnectionType::H2(io) => { ConnectionType::H2(io) => {
if let Some(mut pool) = self.pool.take() { self.pool.release(ConnectionType::H2(io), self.created);
pool.release(IoConnection::new(
ConnectionType::H2(io),
self.created,
None,
));
}
Err(SendRequestError::TunnelNotSupported) Err(SendRequestError::TunnelNotSupported)
} }
} }

View File

@ -18,7 +18,7 @@ use futures_core::ready;
use http::Uri; use http::Uri;
use super::config::ConnectorConfig; use super::config::ConnectorConfig;
use super::connection::{Connection, EitherIoConnection}; use super::connection::{Connection, ConnectionIo, EitherIoConnection};
use super::error::ConnectError; use super::error::ConnectError;
use super::pool::ConnectionPool; use super::pool::ConnectionPool;
use super::Connect; use super::Connect;
@ -61,9 +61,6 @@ pub struct Connector<T> {
ssl: SslConnector, ssl: SslConnector,
} }
pub trait Io: AsyncRead + AsyncWrite + Unpin {}
impl<T: AsyncRead + AsyncWrite + Unpin> Io for T {}
impl Connector<()> { impl Connector<()> {
#[allow(clippy::new_ret_no_self, clippy::let_unit_value)] #[allow(clippy::new_ret_no_self, clippy::let_unit_value)]
pub fn new() -> Connector< pub fn new() -> Connector<
@ -281,16 +278,16 @@ where
pub type DummyService = Box< pub type DummyService = Box<
dyn Service< dyn Service<
Connect, Connect,
Response = (Box<dyn Io>, Protocol), Response = (Box<dyn ConnectionIo>, Protocol),
Error = ConnectError, Error = ConnectError,
Future = futures_core::future::LocalBoxFuture< Future = futures_core::future::LocalBoxFuture<
'static, 'static,
Result<(Box<dyn Io>, Protocol), ConnectError>, Result<(Box<dyn ConnectionIo>, Protocol), ConnectError>,
>, >,
>, >,
>; >;
InnerConnector::<_, DummyService, _, Box<dyn Io>> { InnerConnector::<_, DummyService, _, Box<dyn ConnectionIo>> {
tcp_pool: ConnectionPool::new( tcp_pool: ConnectionPool::new(
tcp_service, tcp_service,
self.config.no_disconnect_timeout(), self.config.no_disconnect_timeout(),
@ -334,9 +331,12 @@ where
.map(|protos| protos.windows(2).any(|w| w == H2)) .map(|protos| protos.windows(2).any(|w| w == H2))
.unwrap_or(false); .unwrap_or(false);
if h2 { if h2 {
(Box::new(sock) as Box<dyn Io>, Protocol::Http2) (
Box::new(sock) as Box<dyn ConnectionIo>,
Protocol::Http2,
)
} else { } else {
(Box::new(sock) as Box<dyn Io>, Protocol::Http1) (Box::new(sock) as _, Protocol::Http1)
} }
}) })
.map_err(ConnectError::from), .map_err(ConnectError::from),
@ -354,9 +354,9 @@ where
.map(|protos| protos.windows(2).any(|w| w == H2)) .map(|protos| protos.windows(2).any(|w| w == H2))
.unwrap_or(false); .unwrap_or(false);
if h2 { if h2 {
(Box::new(sock) as Box<dyn Io>, Protocol::Http2) (Box::new(sock) as _, Protocol::Http2)
} else { } else {
(Box::new(sock) as Box<dyn Io>, Protocol::Http1) (Box::new(sock) as _, Protocol::Http1)
} }
}), }),
), ),

View File

@ -7,7 +7,7 @@ use actix_codec::{AsyncRead, AsyncWrite, Framed, ReadBuf};
use bytes::buf::BufMut; use bytes::buf::BufMut;
use bytes::{Bytes, BytesMut}; use bytes::{Bytes, BytesMut};
use futures_core::Stream; use futures_core::Stream;
use futures_util::{future::poll_fn, SinkExt, StreamExt}; use futures_util::{future::poll_fn, SinkExt as _};
use crate::error::PayloadError; use crate::error::PayloadError;
use crate::h1; use crate::h1;
@ -19,7 +19,7 @@ use crate::http::{
use crate::message::{RequestHeadType, ResponseHead}; use crate::message::{RequestHeadType, ResponseHead};
use crate::payload::{Payload, PayloadStream}; use crate::payload::{Payload, PayloadStream};
use super::connection::{ConnectionLifetime, ConnectionType, IoConnection}; use super::connection::ConnectionType;
use super::error::{ConnectError, SendRequestError}; use super::error::{ConnectError, SendRequestError};
use super::pool::Acquired; use super::pool::Acquired;
use crate::body::{BodySize, MessageBody}; use crate::body::{BodySize, MessageBody};
@ -29,7 +29,7 @@ pub(crate) async fn send_request<T, B>(
mut head: RequestHeadType, mut head: RequestHeadType,
body: B, body: B,
created: time::Instant, created: time::Instant,
pool: Option<Acquired<T>>, acquired: Acquired<T>,
) -> Result<(ResponseHead, Payload), SendRequestError> ) -> Result<(ResponseHead, Payload), SendRequestError>
where where
T: AsyncRead + AsyncWrite + Unpin + 'static, T: AsyncRead + AsyncWrite + Unpin + 'static,
@ -42,9 +42,9 @@ where
if let Some(host) = head.as_ref().uri.host() { if let Some(host) = head.as_ref().uri.host() {
let mut wrt = BytesMut::with_capacity(host.len() + 5).writer(); let mut wrt = BytesMut::with_capacity(host.len() + 5).writer();
let _ = match head.as_ref().uri.port_u16() { match head.as_ref().uri.port_u16() {
None | Some(80) | Some(443) => write!(wrt, "{}", host), None | Some(80) | Some(443) => write!(wrt, "{}", host)?,
Some(port) => write!(wrt, "{}:{}", host, port), Some(port) => write!(wrt, "{}:{}", host, port)?,
}; };
match wrt.get_mut().split().freeze().try_into_value() { match wrt.get_mut().split().freeze().try_into_value() {
@ -64,7 +64,7 @@ where
let io = H1Connection { let io = H1Connection {
created, created,
pool, acquired,
io: Some(io), io: Some(io),
}; };
@ -77,10 +77,8 @@ where
let is_expect = if head.as_ref().headers.contains_key(EXPECT) { let is_expect = if head.as_ref().headers.contains_key(EXPECT) {
match body.size() { match body.size() {
BodySize::None | BodySize::Empty | BodySize::Sized(0) => { BodySize::None | BodySize::Empty | BodySize::Sized(0) => {
let pin_framed = Pin::new(&mut framed); let keep_alive = framed.codec_ref().keepalive();
framed.io_mut().on_release(keep_alive);
let force_close = !pin_framed.codec_ref().keepalive();
release_connection(pin_framed, force_close);
// TODO: use a new variant or a new type better describing error violate // TODO: use a new variant or a new type better describing error violate
// `Requirements for clients` session of above RFC // `Requirements for clients` session of above RFC
@ -128,8 +126,9 @@ where
match pin_framed.codec_ref().message_type() { match pin_framed.codec_ref().message_type() {
h1::MessageType::None => { h1::MessageType::None => {
let force_close = !pin_framed.codec_ref().keepalive(); let keep_alive = pin_framed.codec_ref().keepalive();
release_connection(pin_framed, force_close); pin_framed.io_mut().on_release(keep_alive);
Ok((head, Payload::None)) Ok((head, Payload::None))
} }
_ => { _ => {
@ -151,12 +150,11 @@ where
framed.send((head, BodySize::None).into()).await?; framed.send((head, BodySize::None).into()).await?;
// read response // read response
if let (Some(result), framed) = framed.into_future().await { let head = poll_fn(|cx| Pin::new(&mut framed).poll_next(cx))
let head = result.map_err(SendRequestError::from)?; .await
Ok((head, framed)) .ok_or(ConnectError::Disconnected)??;
} else {
Err(SendRequestError::from(ConnectError::Disconnected)) Ok((head, framed))
}
} }
/// send request body to the peer /// send request body to the peer
@ -165,7 +163,7 @@ pub(crate) async fn send_body<T, B>(
mut framed: Pin<&mut Framed<T, h1::ClientCodec>>, mut framed: Pin<&mut Framed<T, h1::ClientCodec>>,
) -> Result<(), SendRequestError> ) -> Result<(), SendRequestError>
where where
T: ConnectionLifetime + Unpin, T: AsyncRead + AsyncWrite + Unpin + 'static,
B: MessageBody, B: MessageBody,
{ {
actix_rt::pin!(body); actix_rt::pin!(body);
@ -200,7 +198,7 @@ where
} }
} }
SinkExt::flush(Pin::into_inner(framed)).await?; framed.get_mut().flush().await?;
Ok(()) Ok(())
} }
@ -208,41 +206,37 @@ where
/// HTTP client connection /// HTTP client connection
pub struct H1Connection<T> pub struct H1Connection<T>
where where
T: AsyncWrite + Unpin + 'static, T: AsyncRead + AsyncWrite + Unpin + 'static,
{ {
/// T should be `Unpin` /// T should be `Unpin`
io: Option<T>, io: Option<T>,
created: time::Instant, created: time::Instant,
pool: Option<Acquired<T>>, acquired: Acquired<T>,
} }
impl<T> ConnectionLifetime for H1Connection<T> impl<T> H1Connection<T>
where where
T: AsyncRead + AsyncWrite + Unpin + 'static, T: AsyncRead + AsyncWrite + Unpin + 'static,
{ {
fn on_release(&mut self, keep_alive: bool) {
if keep_alive {
self.release();
} else {
self.close();
}
}
/// Close connection /// Close connection
fn close(mut self: Pin<&mut Self>) { fn close(&mut self) {
if let Some(mut pool) = self.pool.take() { if let Some(io) = self.io.take() {
if let Some(io) = self.io.take() { self.acquired.close(ConnectionType::H1(io));
pool.close(IoConnection::new(
ConnectionType::H1(io),
self.created,
None,
));
}
} }
} }
/// Release this connection to the connection pool /// Release this connection to the connection pool
fn release(mut self: Pin<&mut Self>) { fn release(&mut self) {
if let Some(mut pool) = self.pool.take() { if let Some(io) = self.io.take() {
if let Some(io) = self.io.take() { self.acquired.release(ConnectionType::H1(io), self.created);
pool.release(IoConnection::new(
ConnectionType::H1(io),
self.created,
None,
));
}
} }
} }
} }
@ -282,13 +276,19 @@ impl<T: AsyncRead + AsyncWrite + Unpin + 'static> AsyncWrite for H1Connection<T>
} }
#[pin_project::pin_project] #[pin_project::pin_project]
pub(crate) struct PlStream<Io> { pub(crate) struct PlStream<Io>
where
Io: AsyncRead + AsyncWrite + Unpin + 'static,
{
#[pin] #[pin]
framed: Option<Framed<Io, h1::ClientPayloadCodec>>, framed: Option<Framed<H1Connection<Io>, h1::ClientPayloadCodec>>,
} }
impl<Io: ConnectionLifetime> PlStream<Io> { impl<Io> PlStream<Io>
fn new(framed: Framed<Io, h1::ClientCodec>) -> Self { where
Io: AsyncRead + AsyncWrite + Unpin + 'static,
{
fn new(framed: Framed<H1Connection<Io>, h1::ClientCodec>) -> Self {
let framed = framed.into_map_codec(|codec| codec.into_payload_codec()); let framed = framed.into_map_codec(|codec| codec.into_payload_codec());
PlStream { PlStream {
@ -297,24 +297,26 @@ impl<Io: ConnectionLifetime> PlStream<Io> {
} }
} }
impl<Io: ConnectionLifetime> Stream for PlStream<Io> { impl<Io> Stream for PlStream<Io>
where
Io: AsyncRead + AsyncWrite + Unpin + 'static,
{
type Item = Result<Bytes, PayloadError>; type Item = Result<Bytes, PayloadError>;
fn poll_next( fn poll_next(
self: Pin<&mut Self>, self: Pin<&mut Self>,
cx: &mut Context<'_>, cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> { ) -> Poll<Option<Self::Item>> {
let mut this = self.project(); let mut framed = self.project().framed.as_pin_mut().unwrap();
match this.framed.as_mut().as_pin_mut().unwrap().next_item(cx)? { match framed.as_mut().next_item(cx)? {
Poll::Pending => Poll::Pending, Poll::Pending => Poll::Pending,
Poll::Ready(Some(chunk)) => { Poll::Ready(Some(chunk)) => {
if let Some(chunk) = chunk { if let Some(chunk) = chunk {
Poll::Ready(Some(Ok(chunk))) Poll::Ready(Some(Ok(chunk)))
} else { } else {
let framed = this.framed.as_mut().as_pin_mut().unwrap(); let keep_alive = framed.codec_ref().keepalive();
let force_close = !framed.codec_ref().keepalive(); framed.io_mut().on_release(keep_alive);
release_connection(framed, force_close);
Poll::Ready(None) Poll::Ready(None)
} }
} }
@ -322,14 +324,3 @@ impl<Io: ConnectionLifetime> Stream for PlStream<Io> {
} }
} }
} }
fn release_connection<T, U>(framed: Pin<&mut Framed<T, U>>, force_close: bool)
where
T: ConnectionLifetime,
{
if !force_close && framed.is_read_buf_empty() && framed.is_write_buf_empty() {
framed.io_pin().release()
} else {
framed.io_pin().close()
}
}

View File

@ -17,7 +17,7 @@ use crate::message::{RequestHeadType, ResponseHead};
use crate::payload::Payload; use crate::payload::Payload;
use super::config::ConnectorConfig; use super::config::ConnectorConfig;
use super::connection::{ConnectionType, IoConnection}; use super::connection::ConnectionType;
use super::error::SendRequestError; use super::error::SendRequestError;
use super::pool::Acquired; use super::pool::Acquired;
use crate::client::connection::H2Connection; use crate::client::connection::H2Connection;
@ -27,7 +27,7 @@ pub(crate) async fn send_request<T, B>(
head: RequestHeadType, head: RequestHeadType,
body: B, body: B,
created: time::Instant, created: time::Instant,
pool: Option<Acquired<T>>, acquired: Acquired<T>,
) -> Result<(ResponseHead, Payload), SendRequestError> ) -> Result<(ResponseHead, Payload), SendRequestError>
where where
T: AsyncRead + AsyncWrite + Unpin + 'static, T: AsyncRead + AsyncWrite + Unpin + 'static,
@ -103,13 +103,13 @@ where
let res = poll_fn(|cx| io.poll_ready(cx)).await; let res = poll_fn(|cx| io.poll_ready(cx)).await;
if let Err(e) = res { if let Err(e) = res {
release(io, pool, created, e.is_io()); release(io, acquired, created, e.is_io());
return Err(SendRequestError::from(e)); return Err(SendRequestError::from(e));
} }
let resp = match io.send_request(req, eof) { let resp = match io.send_request(req, eof) {
Ok((fut, send)) => { Ok((fut, send)) => {
release(io, pool, created, false); release(io, acquired, created, false);
if !eof { if !eof {
send_body(body, send).await?; send_body(body, send).await?;
@ -117,7 +117,7 @@ where
fut.await.map_err(SendRequestError::from)? fut.await.map_err(SendRequestError::from)?
} }
Err(e) => { Err(e) => {
release(io, pool, created, e.is_io()); release(io, acquired, created, e.is_io());
return Err(e.into()); return Err(e.into());
} }
}; };
@ -181,16 +181,14 @@ async fn send_body<B: MessageBody>(
/// release SendRequest object /// release SendRequest object
fn release<T: AsyncRead + AsyncWrite + Unpin + 'static>( fn release<T: AsyncRead + AsyncWrite + Unpin + 'static>(
io: H2Connection, io: H2Connection,
pool: Option<Acquired<T>>, acquired: Acquired<T>,
created: time::Instant, created: time::Instant,
close: bool, close: bool,
) { ) {
if let Some(mut pool) = pool { if close {
if close { acquired.close(ConnectionType::H2(io));
pool.close(IoConnection::new(ConnectionType::H2(io), created, None)); } else {
} else { acquired.release(ConnectionType::H2(io), created);
pool.release(IoConnection::new(ConnectionType::H2(io), created, None));
}
} }
} }

View File

@ -14,7 +14,7 @@ pub use actix_tls::connect::{
Connect as TcpConnect, ConnectError as TcpConnectError, Connection as TcpConnection, Connect as TcpConnect, ConnectError as TcpConnectError, Connection as TcpConnection,
}; };
pub use self::connection::Connection; pub use self::connection::{Connection, ConnectionIo};
pub use self::connector::Connector; pub use self::connector::Connector;
pub use self::error::{ConnectError, FreezeRequestError, InvalidUrl, SendRequestError}; pub use self::error::{ConnectError, FreezeRequestError, InvalidUrl, SendRequestError};
pub use crate::Protocol; pub use crate::Protocol;

View File

@ -217,7 +217,7 @@ where
// construct acquired. It's used to put Io type back to pool/ close the Io type. // construct acquired. It's used to put Io type back to pool/ close the Io type.
// permit is carried with the whole lifecycle of Acquired. // permit is carried with the whole lifecycle of Acquired.
let acquired = Some(Acquired { key, inner, permit }); let acquired = Acquired { key, inner, permit };
// match the connection and spawn new one if did not get anything. // match the connection and spawn new one if did not get anything.
match conn { match conn {
@ -235,7 +235,7 @@ where
acquired, acquired,
)) ))
} else { } else {
let config = &acquired.as_ref().unwrap().inner.config; let config = &acquired.inner.config;
let (sender, connection) = handshake(io, config).await?; let (sender, connection) = handshake(io, config).await?;
Ok(IoConnection::new( Ok(IoConnection::new(
ConnectionType::H2(H2Connection::new(sender, connection)), ConnectionType::H2(H2Connection::new(sender, connection)),
@ -346,14 +346,12 @@ where
Io: AsyncRead + AsyncWrite + Unpin + 'static, Io: AsyncRead + AsyncWrite + Unpin + 'static,
{ {
/// Close the IO. /// Close the IO.
pub(crate) fn close(&mut self, conn: IoConnection<Io>) { pub(crate) fn close(&self, conn: ConnectionType<Io>) {
let (conn, _) = conn.into_inner();
self.inner.close(conn); self.inner.close(conn);
} }
/// Release IO back into pool. /// Release IO back into pool.
pub(crate) fn release(&mut self, conn: IoConnection<Io>) { pub(crate) fn release(&self, conn: ConnectionType<Io>, created: Instant) {
let (io, created) = conn.into_inner();
let Acquired { key, inner, .. } = self; let Acquired { key, inner, .. } = self;
inner inner
@ -362,12 +360,12 @@ where
.entry(key.clone()) .entry(key.clone())
.or_insert_with(VecDeque::new) .or_insert_with(VecDeque::new)
.push_back(PooledConnection { .push_back(PooledConnection {
conn: io, conn,
created, created,
used: Instant::now(), used: Instant::now(),
}); });
let _ = &mut self.permit; let _ = &self.permit;
} }
} }
@ -447,8 +445,8 @@ mod test {
where where
T: AsyncRead + AsyncWrite + Unpin + 'static, T: AsyncRead + AsyncWrite + Unpin + 'static,
{ {
let (conn, created, mut acquired) = conn.into_parts(); let (conn, created, acquired) = conn.into_parts();
acquired.release(IoConnection::new(conn, created, None)); acquired.release(conn, created);
} }
#[actix_rt::test] #[actix_rt::test]

View File

@ -662,7 +662,7 @@ where
// got timeout during shutdown, drop connection // got timeout during shutdown, drop connection
if this.flags.contains(Flags::SHUTDOWN) { if this.flags.contains(Flags::SHUTDOWN) {
return Err(DispatchError::DisconnectTimeout); return Err(DispatchError::DisconnectTimeout);
// exceed deadline. check for any outstanding tasks // exceed deadline. check for any outstanding tasks
} else if timer.deadline() >= *this.ka_expire { } else if timer.deadline() >= *this.ka_expire {
// have no task at hand. // have no task at hand.
if this.state.is_empty() && this.write_buf.is_empty() { if this.state.is_empty() && this.write_buf.is_empty() {
@ -695,15 +695,15 @@ where
this.flags.insert(Flags::STARTED | Flags::SHUTDOWN); this.flags.insert(Flags::STARTED | Flags::SHUTDOWN);
this.state.set(State::None); this.state.set(State::None);
} }
// still have unfinished task. try to reset and register keep-alive. // still have unfinished task. try to reset and register keep-alive.
} else if let Some(deadline) = } else if let Some(deadline) =
this.codec.config().keep_alive_expire() this.codec.config().keep_alive_expire()
{ {
timer.as_mut().reset(deadline); timer.as_mut().reset(deadline);
let _ = timer.poll(cx); let _ = timer.poll(cx);
} }
// timer resolved but still have not met the keep-alive expire deadline. // timer resolved but still have not met the keep-alive expire deadline.
// reset and register for later wakeup. // reset and register for later wakeup.
} else { } else {
timer.as_mut().reset(*this.ka_expire); timer.as_mut().reset(*this.ka_expire);
let _ = timer.poll(cx); let _ = timer.poll(cx);
@ -953,14 +953,15 @@ mod tests {
use std::str; use std::str;
use actix_service::fn_service; use actix_service::fn_service;
use futures_util::future::{lazy, ready}; use futures_util::future::{lazy, ready, Ready};
use super::*; use super::*;
use crate::test::TestBuffer;
use crate::{error::Error, KeepAlive};
use crate::{ use crate::{
error::Error,
h1::{ExpectHandler, UpgradeHandler}, h1::{ExpectHandler, UpgradeHandler},
test::TestSeqBuffer, http::Method,
test::{TestBuffer, TestSeqBuffer},
HttpMessage, KeepAlive,
}; };
fn find_slice(haystack: &[u8], needle: &[u8], from: usize) -> Option<usize> { fn find_slice(haystack: &[u8], needle: &[u8], from: usize) -> Option<usize> {
@ -1284,14 +1285,30 @@ mod tests {
#[actix_rt::test] #[actix_rt::test]
async fn test_upgrade() { async fn test_upgrade() {
struct TestUpgrade;
impl<T> Service<(Request, Framed<T, Codec>)> for TestUpgrade {
type Response = ();
type Error = Error;
type Future = Ready<Result<Self::Response, Self::Error>>;
actix_service::always_ready!();
fn call(&self, (req, _framed): (Request, Framed<T, Codec>)) -> Self::Future {
assert_eq!(req.method(), Method::GET);
assert!(req.upgrade());
assert_eq!(req.headers().get("upgrade").unwrap(), "websocket");
ready(Ok(()))
}
}
lazy(|cx| { lazy(|cx| {
let mut buf = TestSeqBuffer::empty(); let mut buf = TestSeqBuffer::empty();
let cfg = ServiceConfig::new(KeepAlive::Disabled, 0, 0, false, None); let cfg = ServiceConfig::new(KeepAlive::Disabled, 0, 0, false, None);
let services = let services = HttpFlow::new(ok_service(), ExpectHandler, Some(TestUpgrade));
HttpFlow::new(ok_service(), ExpectHandler, Some(UpgradeHandler));
let h1 = Dispatcher::<_, _, _, _, UpgradeHandler>::new( let h1 = Dispatcher::<_, _, _, _, TestUpgrade>::new(
buf.clone(), buf.clone(),
cfg, cfg,
services, services,

View File

@ -1,5 +1,3 @@
use std::task::Poll;
use actix_service::{Service, ServiceFactory}; use actix_service::{Service, ServiceFactory};
use futures_util::future::{ready, Ready}; use futures_util::future::{ready, Ready};

View File

@ -1,8 +1,6 @@
use std::task::Poll;
use actix_codec::Framed; use actix_codec::Framed;
use actix_service::{Service, ServiceFactory}; use actix_service::{Service, ServiceFactory};
use futures_util::future::{ready, Ready}; use futures_core::future::LocalBoxFuture;
use crate::error::Error; use crate::error::Error;
use crate::h1::Codec; use crate::h1::Codec;
@ -16,7 +14,7 @@ impl<T> ServiceFactory<(Request, Framed<T, Codec>)> for UpgradeHandler {
type Config = (); type Config = ();
type Service = UpgradeHandler; type Service = UpgradeHandler;
type InitError = Error; type InitError = Error;
type Future = Ready<Result<Self::Service, Self::InitError>>; type Future = LocalBoxFuture<'static, Result<Self::Service, Self::InitError>>;
fn new_service(&self, _: ()) -> Self::Future { fn new_service(&self, _: ()) -> Self::Future {
unimplemented!() unimplemented!()
@ -26,11 +24,11 @@ impl<T> ServiceFactory<(Request, Framed<T, Codec>)> for UpgradeHandler {
impl<T> Service<(Request, Framed<T, Codec>)> for UpgradeHandler { impl<T> Service<(Request, Framed<T, Codec>)> for UpgradeHandler {
type Response = (); type Response = ();
type Error = Error; type Error = Error;
type Future = Ready<Result<Self::Response, Self::Error>>; type Future = LocalBoxFuture<'static, Result<Self::Response, Self::Error>>;
actix_service::always_ready!(); actix_service::always_ready!();
fn call(&self, _: (Request, Framed<T, Codec>)) -> Self::Future { fn call(&self, _: (Request, Framed<T, Codec>)) -> Self::Future {
ready(Ok(())) unimplemented!()
} }
} }

View File

@ -5,8 +5,10 @@ use actix_codec::{AsyncRead, AsyncWrite};
use actix_service::Service; use actix_service::Service;
use bytes::{Bytes, BytesMut}; use bytes::{Bytes, BytesMut};
use futures_core::ready; use futures_core::ready;
use h2::server::{Connection, SendResponse}; use h2::{
use h2::SendStream; server::{Connection, SendResponse},
SendStream,
};
use http::header::{HeaderValue, CONNECTION, CONTENT_LENGTH, DATE, TRANSFER_ENCODING}; use http::header::{HeaderValue, CONNECTION, CONTENT_LENGTH, DATE, TRANSFER_ENCODING};
use log::{error, trace}; use log::{error, trace};

View File

@ -13,7 +13,7 @@ use actix_service::{
use bytes::Bytes; use bytes::Bytes;
use futures_core::ready; use futures_core::ready;
use futures_util::future::ok; use futures_util::future::ok;
use h2::server::{self, Handshake}; use h2::server::{handshake, Handshake};
use log::error; use log::error;
use crate::body::MessageBody; use crate::body::MessageBody;
@ -307,7 +307,7 @@ where
Some(self.cfg.clone()), Some(self.cfg.clone()),
addr, addr,
on_connect_data, on_connect_data,
server::handshake(io), handshake(io),
), ),
} }
} }

View File

@ -1,15 +1,15 @@
use std::io; use std::io;
use bytes::{BufMut, BytesMut}; use bytes::BufMut;
use http::Version; use http::Version;
const DIGITS_START: u8 = b'0'; const DIGITS_START: u8 = b'0';
pub(crate) fn write_status_line(version: Version, n: u16, bytes: &mut BytesMut) { pub(crate) fn write_status_line<B: BufMut>(version: Version, n: u16, buf: &mut B) {
match version { match version {
Version::HTTP_11 => bytes.put_slice(b"HTTP/1.1 "), Version::HTTP_11 => buf.put_slice(b"HTTP/1.1 "),
Version::HTTP_10 => bytes.put_slice(b"HTTP/1.0 "), Version::HTTP_10 => buf.put_slice(b"HTTP/1.0 "),
Version::HTTP_09 => bytes.put_slice(b"HTTP/0.9 "), Version::HTTP_09 => buf.put_slice(b"HTTP/0.9 "),
_ => { _ => {
// other HTTP version handlers do not use this method // other HTTP version handlers do not use this method
} }
@ -19,33 +19,36 @@ pub(crate) fn write_status_line(version: Version, n: u16, bytes: &mut BytesMut)
let d10 = ((n / 10) % 10) as u8; let d10 = ((n / 10) % 10) as u8;
let d1 = (n % 10) as u8; let d1 = (n % 10) as u8;
bytes.put_u8(DIGITS_START + d100); buf.put_u8(DIGITS_START + d100);
bytes.put_u8(DIGITS_START + d10); buf.put_u8(DIGITS_START + d10);
bytes.put_u8(DIGITS_START + d1); buf.put_u8(DIGITS_START + d1);
// trailing space before reason // trailing space before reason
bytes.put_u8(b' '); buf.put_u8(b' ');
} }
/// NOTE: bytes object has to contain enough space /// NOTE: bytes object has to contain enough space
pub fn write_content_length(n: u64, bytes: &mut BytesMut) { pub fn write_content_length<B: BufMut>(n: u64, buf: &mut B) {
if n == 0 { if n == 0 {
bytes.put_slice(b"\r\ncontent-length: 0\r\n"); buf.put_slice(b"\r\ncontent-length: 0\r\n");
return; return;
} }
let mut buf = itoa::Buffer::new(); let mut buffer = itoa::Buffer::new();
bytes.put_slice(b"\r\ncontent-length: "); buf.put_slice(b"\r\ncontent-length: ");
bytes.put_slice(buf.format(n).as_bytes()); buf.put_slice(buffer.format(n).as_bytes());
bytes.put_slice(b"\r\n"); buf.put_slice(b"\r\n");
} }
pub(crate) struct Writer<'a>(pub &'a mut BytesMut); pub(crate) struct Writer<'a, B>(pub &'a mut B);
impl<'a> io::Write for Writer<'a> { impl<'a, B> io::Write for Writer<'a, B>
where
B: BufMut,
{
fn write(&mut self, buf: &[u8]) -> io::Result<usize> { fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.0.extend_from_slice(buf); self.0.put_slice(buf);
Ok(buf.len()) Ok(buf.len())
} }
@ -58,6 +61,8 @@ impl<'a> io::Write for Writer<'a> {
mod tests { mod tests {
use std::str::from_utf8; use std::str::from_utf8;
use bytes::BytesMut;
use super::*; use super::*;
#[test] #[test]

View File

@ -1,14 +1,19 @@
use std::marker::PhantomData; use std::{
use std::pin::Pin; fmt,
use std::task::{Context, Poll}; future::Future,
use std::{fmt, net, rc::Rc}; marker::PhantomData,
net,
pin::Pin,
rc::Rc,
task::{Context, Poll},
};
use actix_codec::{AsyncRead, AsyncWrite, Framed}; use actix_codec::{AsyncRead, AsyncWrite, Framed};
use actix_rt::net::TcpStream; use actix_rt::net::TcpStream;
use actix_service::{pipeline_factory, IntoServiceFactory, Service, ServiceFactory}; use actix_service::{pipeline_factory, IntoServiceFactory, Service, ServiceFactory};
use bytes::Bytes; use bytes::Bytes;
use futures_core::{ready, Future}; use futures_core::ready;
use h2::server::{self, Handshake}; use h2::server::{handshake, Handshake};
use pin_project::pin_project; use pin_project::pin_project;
use crate::body::MessageBody; use crate::body::MessageBody;
@ -562,7 +567,7 @@ where
match proto { match proto {
Protocol::Http2 => HttpServiceHandlerResponse { Protocol::Http2 => HttpServiceHandlerResponse {
state: State::H2Handshake(Some(( state: State::H2Handshake(Some((
server::handshake(io), handshake(io),
self.cfg.clone(), self.cfg.clone(),
self.flow.clone(), self.flow.clone(),
on_connect_data, on_connect_data,

View File

@ -1,7 +1,7 @@
use time::{Date, OffsetDateTime, PrimitiveDateTime}; use time::{Date, OffsetDateTime, PrimitiveDateTime};
/// Attempt to parse a `time` string as one of either RFC 1123, RFC 850, or asctime. /// Attempt to parse a `time` string as one of either RFC 1123, RFC 850, or asctime.
pub fn parse_http_date(time: &str) -> Option<PrimitiveDateTime> { pub(crate) fn parse_http_date(time: &str) -> Option<PrimitiveDateTime> {
try_parse_rfc_1123(time) try_parse_rfc_1123(time)
.or_else(|| try_parse_rfc_850(time)) .or_else(|| try_parse_rfc_850(time))
.or_else(|| try_parse_asctime(time)) .or_else(|| try_parse_asctime(time))

View File

@ -6,7 +6,7 @@ use actix_service::ServiceFactoryExt;
use bytes::Bytes; use bytes::Bytes;
use futures_util::{ use futures_util::{
future::{self, ok}, future::{self, ok},
StreamExt, StreamExt as _,
}; };
const STR: &str = "Hello World Hello World Hello World Hello World Hello World \ const STR: &str = "Hello World Hello World Hello World Hello World Hello World \

View File

@ -12,8 +12,11 @@ use actix_http::{body, Error, HttpService, Request, Response};
use actix_http_test::test_server; use actix_http_test::test_server;
use actix_service::{fn_service, ServiceFactoryExt}; use actix_service::{fn_service, ServiceFactoryExt};
use bytes::{Bytes, BytesMut}; use bytes::{Bytes, BytesMut};
use futures_util::future::{err, ok, ready}; use futures_core::Stream;
use futures_util::stream::{once, Stream, StreamExt}; use futures_util::{
future::{err, ok, ready},
stream::{once, StreamExt as _},
};
use openssl::{ use openssl::{
pkey::PKey, pkey::PKey,
ssl::{SslAcceptor, SslMethod}, ssl::{SslAcceptor, SslMethod},

View File

@ -10,8 +10,9 @@ use actix_http_test::test_server;
use actix_service::{fn_factory_with_config, fn_service}; use actix_service::{fn_factory_with_config, fn_service};
use bytes::{Bytes, BytesMut}; use bytes::{Bytes, BytesMut};
use futures_core::Stream;
use futures_util::future::{self, err, ok}; use futures_util::future::{self, err, ok};
use futures_util::stream::{once, Stream, StreamExt}; use futures_util::stream::{once, StreamExt as _};
use rustls::{ use rustls::{
internal::pemfile::{certs, pkcs8_private_keys}, internal::pemfile::{certs, pkcs8_private_keys},
NoClientAuth, ServerConfig as RustlsServerConfig, NoClientAuth, ServerConfig as RustlsServerConfig,

View File

@ -7,7 +7,7 @@ use actix_rt::time::sleep;
use actix_service::fn_service; use actix_service::fn_service;
use bytes::Bytes; use bytes::Bytes;
use futures_util::future::{self, err, ok, ready, FutureExt}; use futures_util::future::{self, err, ok, ready, FutureExt};
use futures_util::stream::{once, StreamExt}; use futures_util::stream::{once, StreamExt as _};
use regex::Regex; use regex::Regex;
use actix_http::HttpMessage; use actix_http::HttpMessage;

View File

@ -12,7 +12,7 @@ use actix_utils::dispatcher::Dispatcher;
use bytes::Bytes; use bytes::Bytes;
use futures_util::future; use futures_util::future;
use futures_util::task::{Context, Poll}; use futures_util::task::{Context, Poll};
use futures_util::{SinkExt, StreamExt}; use futures_util::{SinkExt as _, StreamExt as _};
struct WsService<T>(Arc<Mutex<(PhantomData<T>, Cell<bool>)>>); struct WsService<T>(Arc<Mutex<(PhantomData<T>, Cell<bool>)>>);

View File

@ -66,12 +66,6 @@ serde_urlencoded = "0.7"
tls-openssl = { version = "0.10.9", package = "openssl", optional = true } tls-openssl = { version = "0.10.9", package = "openssl", optional = true }
tls-rustls = { version = "0.19.0", package = "rustls", optional = true, features = ["dangerous_configuration"] } tls-rustls = { version = "0.19.0", package = "rustls", optional = true, features = ["dangerous_configuration"] }
[target.'cfg(windows)'.dependencies.tls-openssl]
version = "0.10.9"
package = "openssl"
features = ["vendored"]
optional = true
[dev-dependencies] [dev-dependencies]
actix-web = { version = "4.0.0-beta.4", features = ["openssl"] } actix-web = { version = "4.0.0-beta.4", features = ["openssl"] }
actix-http = { version = "3.0.0-beta.4", features = ["openssl"] } actix-http = { version = "3.0.0-beta.4", features = ["openssl"] }

View File

@ -27,7 +27,7 @@ fn main() {
let res = client let res = client
.get("http://www.rust-lang.org") // <- Create request builder .get("http://www.rust-lang.org") // <- Create request builder
.header("User-Agent", "Actix-web") .insert_header(("User-Agent", "Actix-web"))
.send() // <- Send http request .send() // <- Send http request
.await; .await;

View File

@ -1,15 +1,16 @@
use std::{ use std::{
fmt,
future::Future, future::Future,
io, net, net,
pin::Pin, pin::Pin,
task::{Context, Poll}, task::{Context, Poll},
}; };
use actix_codec::{AsyncRead, AsyncWrite, Framed, ReadBuf}; use actix_codec::Framed;
use actix_http::{ use actix_http::{
body::Body, body::Body,
client::{Connect as ClientConnect, ConnectError, Connection, SendRequestError}, client::{
Connect as ClientConnect, ConnectError, Connection, ConnectionIo, SendRequestError,
},
h1::ClientCodec, h1::ClientCodec,
Payload, RequestHead, RequestHeadType, ResponseHead, Payload, RequestHead, RequestHeadType, ResponseHead,
}; };
@ -123,7 +124,7 @@ impl<Fut, C, Io> Future for ConnectRequestFuture<Fut, Io>
where where
Fut: Future<Output = Result<C, ConnectError>>, Fut: Future<Output = Result<C, ConnectError>>,
C: Connection<Io = Io>, C: Connection<Io = Io>,
Io: AsyncRead + AsyncWrite + Unpin + 'static, Io: ConnectionIo,
{ {
type Output = Result<ConnectResponse, SendRequestError>; type Output = Result<ConnectResponse, SendRequestError>;
@ -138,14 +139,14 @@ where
let fut = ConnectRequestFuture::Client { let fut = ConnectRequestFuture::Client {
fut: connection.send_request(head, body), fut: connection.send_request(head, body),
}; };
self.as_mut().set(fut); self.set(fut);
} }
ConnectRequest::Tunnel(head, ..) => { ConnectRequest::Tunnel(head, ..) => {
// send request // send request
let fut = ConnectRequestFuture::Tunnel { let fut = ConnectRequestFuture::Tunnel {
fut: connection.open_tunnel(RequestHeadType::from(head)), fut: connection.open_tunnel(RequestHeadType::from(head)),
}; };
self.as_mut().set(fut); self.set(fut);
} }
} }
self.poll(cx) self.poll(cx)
@ -158,65 +159,11 @@ where
} }
ConnectRequestProj::Tunnel { fut } => { ConnectRequestProj::Tunnel { fut } => {
let (head, framed) = ready!(fut.as_mut().poll(cx))?; let (head, framed) = ready!(fut.as_mut().poll(cx))?;
let framed = framed.into_map_io(|io| BoxedSocket(Box::new(Socket(io)))); let framed = framed.into_map_io(|io| Box::new(io) as _);
Poll::Ready(Ok(ConnectResponse::Tunnel(head, framed))) Poll::Ready(Ok(ConnectResponse::Tunnel(head, framed)))
} }
} }
} }
} }
trait AsyncSocket { pub type BoxedSocket = Box<dyn ConnectionIo>;
fn as_read(&self) -> &(dyn AsyncRead + Unpin);
fn as_read_mut(&mut self) -> &mut (dyn AsyncRead + Unpin);
fn as_write(&mut self) -> &mut (dyn AsyncWrite + Unpin);
}
struct Socket<T: AsyncRead + AsyncWrite + Unpin>(T);
impl<T: AsyncRead + AsyncWrite + Unpin> AsyncSocket for Socket<T> {
fn as_read(&self) -> &(dyn AsyncRead + Unpin) {
&self.0
}
fn as_read_mut(&mut self) -> &mut (dyn AsyncRead + Unpin) {
&mut self.0
}
fn as_write(&mut self) -> &mut (dyn AsyncWrite + Unpin) {
&mut self.0
}
}
pub struct BoxedSocket(Box<dyn AsyncSocket>);
impl fmt::Debug for BoxedSocket {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "BoxedSocket")
}
}
impl AsyncRead for BoxedSocket {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
Pin::new(self.get_mut().0.as_read_mut()).poll_read(cx, buf)
}
}
impl AsyncWrite for BoxedSocket {
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
Pin::new(self.get_mut().0.as_write()).poll_write(cx, buf)
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Pin::new(self.get_mut().0.as_write()).poll_flush(cx)
}
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Pin::new(self.get_mut().0.as_write()).poll_shutdown(cx)
}
}

View File

@ -1,6 +1,5 @@
use std::cell::RefCell; use std::cell::RefCell;
use std::rc::Rc; use std::rc::Rc;
use std::task::Poll;
use actix_http::{Extensions, Request, Response}; use actix_http::{Extensions, Request, Response};
use actix_router::{Path, ResourceDef, Router, Url}; use actix_router::{Path, ResourceDef, Router, Url};

View File

@ -2,7 +2,6 @@ use std::cell::RefCell;
use std::fmt; use std::fmt;
use std::future::Future; use std::future::Future;
use std::rc::Rc; use std::rc::Rc;
use std::task::Poll;
use actix_http::{Error, Extensions, Response}; use actix_http::{Error, Extensions, Response};
use actix_router::IntoPattern; use actix_router::IntoPattern;

View File

@ -2,7 +2,6 @@ use std::cell::RefCell;
use std::fmt; use std::fmt;
use std::future::Future; use std::future::Future;
use std::rc::Rc; use std::rc::Rc;
use std::task::Poll;
use actix_http::Extensions; use actix_http::Extensions;
use actix_router::{ResourceDef, Router}; use actix_router::{ResourceDef, Router};

View File

@ -12,13 +12,6 @@ use actix_http::{
use actix_server::{Server, ServerBuilder}; use actix_server::{Server, ServerBuilder};
use actix_service::{map_config, IntoServiceFactory, Service, ServiceFactory}; use actix_service::{map_config, IntoServiceFactory, Service, ServiceFactory};
#[cfg(unix)]
use actix_http::Protocol;
#[cfg(unix)]
use actix_service::pipeline_factory;
#[cfg(unix)]
use futures_util::future::ok;
#[cfg(feature = "openssl")] #[cfg(feature = "openssl")]
use actix_tls::accept::openssl::{AlpnError, SslAcceptor, SslAcceptorBuilder}; use actix_tls::accept::openssl::{AlpnError, SslAcceptor, SslAcceptorBuilder};
#[cfg(feature = "rustls")] #[cfg(feature = "rustls")]
@ -489,7 +482,9 @@ where
#[cfg(unix)] #[cfg(unix)]
/// Start listening for unix domain (UDS) connections on existing listener. /// Start listening for unix domain (UDS) connections on existing listener.
pub fn listen_uds(mut self, lst: std::os::unix::net::UnixListener) -> io::Result<Self> { pub fn listen_uds(mut self, lst: std::os::unix::net::UnixListener) -> io::Result<Self> {
use actix_http::Protocol;
use actix_rt::net::UnixStream; use actix_rt::net::UnixStream;
use actix_service::pipeline_factory;
let cfg = self.config.clone(); let cfg = self.config.clone();
let factory = self.factory.clone(); let factory = self.factory.clone();
@ -511,19 +506,22 @@ where
c.host.clone().unwrap_or_else(|| format!("{}", socket_addr)), c.host.clone().unwrap_or_else(|| format!("{}", socket_addr)),
); );
pipeline_factory(|io: UnixStream| ok((io, Protocol::Http1, None))).and_then({ pipeline_factory(|io: UnixStream| async { Ok((io, Protocol::Http1, None)) })
let svc = HttpService::build() .and_then({
.keep_alive(c.keep_alive) let svc = HttpService::build()
.client_timeout(c.client_timeout); .keep_alive(c.keep_alive)
.client_timeout(c.client_timeout);
let svc = if let Some(handler) = on_connect_fn.clone() { let svc = if let Some(handler) = on_connect_fn.clone() {
svc.on_connect_ext(move |io: &_, ext: _| (&*handler)(io as &dyn Any, ext)) svc.on_connect_ext(move |io: &_, ext: _| {
} else { (&*handler)(io as &dyn Any, ext)
svc })
}; } else {
svc
};
svc.finish(map_config(factory(), move |_| config.clone())) svc.finish(map_config(factory(), move |_| config.clone()))
}) })
})?; })?;
Ok(self) Ok(self)
} }
@ -534,7 +532,9 @@ where
where where
A: AsRef<std::path::Path>, A: AsRef<std::path::Path>,
{ {
use actix_http::Protocol;
use actix_rt::net::UnixStream; use actix_rt::net::UnixStream;
use actix_service::pipeline_factory;
let cfg = self.config.clone(); let cfg = self.config.clone();
let factory = self.factory.clone(); let factory = self.factory.clone();
@ -555,12 +555,13 @@ where
socket_addr, socket_addr,
c.host.clone().unwrap_or_else(|| format!("{}", socket_addr)), c.host.clone().unwrap_or_else(|| format!("{}", socket_addr)),
); );
pipeline_factory(|io: UnixStream| ok((io, Protocol::Http1, None))).and_then( pipeline_factory(|io: UnixStream| async { Ok((io, Protocol::Http1, None)) })
HttpService::build() .and_then(
.keep_alive(c.keep_alive) HttpService::build()
.client_timeout(c.client_timeout) .keep_alive(c.keep_alive)
.finish(map_config(factory(), move |_| config.clone())), .client_timeout(c.client_timeout)
) .finish(map_config(factory(), move |_| config.clone())),
)
}, },
)?; )?;
Ok(self) Ok(self)

View File

@ -20,7 +20,7 @@ use crate::{dev::Payload, error::PathError, FromRequest, HttpRequest};
/// // extract path info from "/{name}/{count}/index.html" into tuple /// // extract path info from "/{name}/{count}/index.html" into tuple
/// // {name} - deserialize a String /// // {name} - deserialize a String
/// // {count} - deserialize a u32 /// // {count} - deserialize a u32
/// #[get("/")] /// #[get("/{name}/{count}/index.html")]
/// async fn index(path: web::Path<(String, u32)>) -> String { /// async fn index(path: web::Path<(String, u32)>) -> String {
/// let (name, count) = path.into_inner(); /// let (name, count) = path.into_inner();
/// format!("Welcome {}! {}", name, count) /// format!("Welcome {}! {}", name, count)
@ -40,7 +40,7 @@ use crate::{dev::Payload, error::PathError, FromRequest, HttpRequest};
/// } /// }
/// ///
/// // extract `Info` from a path using serde /// // extract `Info` from a path using serde
/// #[get("/")] /// #[get("/{name}")]
/// async fn index(info: web::Path<Info>) -> String { /// async fn index(info: web::Path<Info>) -> String {
/// format!("Welcome {}!", info.name) /// format!("Welcome {}!", info.name)
/// } /// }