Merge branch 'master' into feature/test_websocket_with_config

This commit is contained in:
Rob Ede 2021-03-31 14:05:39 +01:00 committed by GitHub
commit e78715f4cd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 258 additions and 497 deletions

3
.cargo/config.toml Normal file
View File

@ -0,0 +1,3 @@
[alias]
chk = "hack check --workspace --tests --examples"
lint = "hack --clean-per-run clippy --workspace --tests --examples"

View File

@ -23,6 +23,9 @@ jobs:
name: ${{ matrix.target.name }} / ${{ matrix.version }}
runs-on: ${{ matrix.target.os }}
env:
VCPKGRS_DYNAMIC: 1
steps:
- uses: actions/checkout@v2
@ -65,7 +68,13 @@ jobs:
uses: actions-rs/cargo@v1
with:
command: hack
args: --clean-per-run check --workspace --no-default-features --tests
args: check --workspace --no-default-features
- name: check minimal + tests
uses: actions-rs/cargo@v1
with:
command: hack
args: check --workspace --no-default-features --tests --examples
- name: check full
uses: actions-rs/cargo@v1

View File

@ -80,11 +80,11 @@ required-features = ["rustls"]
actix-codec = "0.4.0-beta.1"
actix-macros = "0.2.0"
actix-router = "0.2.7"
actix-rt = "2.1"
actix-rt = "2.2"
actix-server = "2.0.0-beta.3"
actix-service = "2.0.0-beta.4"
actix-utils = "3.0.0-beta.2"
actix-tls = { version = "3.0.0-beta.4", default-features = false, optional = true }
actix-tls = { version = "3.0.0-beta.5", default-features = false, optional = true }
actix-web-codegen = "0.5.0-beta.2"
actix-http = "3.0.0-beta.4"

View File

@ -33,5 +33,5 @@ mime_guess = "2.0.1"
percent-encoding = "2.1"
[dev-dependencies]
actix-rt = "2.1"
actix-rt = "2.2"
actix-web = "4.0.0-beta.4"

View File

@ -31,9 +31,9 @@ openssl = ["tls-openssl", "awc/openssl"]
[dependencies]
actix-service = "2.0.0-beta.4"
actix-codec = "0.4.0-beta.1"
actix-tls = "3.0.0-beta.4"
actix-tls = "3.0.0-beta.5"
actix-utils = "3.0.0-beta.2"
actix-rt = "2.1"
actix-rt = "2.2"
actix-server = "2.0.0-beta.3"
awc = { version = "3.0.0-beta.3", default-features = false }

View File

@ -6,7 +6,7 @@
* `client::ConnectorService` as `client::Connector::finish` method's return type [#2081]
* `client::ConnectionIo` trait alias [#2081]
### Chaged
### Changed
* `client::Connector` type now only have one generic type for `actix_service::Service`. [#2063]
[#2063]: https://github.com/actix/actix-web/pull/2063

View File

@ -47,8 +47,8 @@ trust-dns = ["trust-dns-resolver"]
actix-service = "2.0.0-beta.4"
actix-codec = "0.4.0-beta.1"
actix-utils = "3.0.0-beta.2"
actix-rt = "2.1"
actix-tls = "3.0.0-beta.4"
actix-rt = "2.2"
actix-tls = "3.0.0-beta.5"
ahash = "0.7"
base64 = "0.13"
@ -89,7 +89,7 @@ trust-dns-resolver = { version = "0.20.0", optional = true }
[dev-dependencies]
actix-server = "2.0.0-beta.3"
actix-http-test = { version = "3.0.0-beta.3", features = ["openssl"] }
actix-tls = { version = "3.0.0-beta.4", features = ["openssl"] }
actix-tls = { version = "3.0.0-beta.5", features = ["openssl"] }
criterion = "0.3"
env_logger = "0.8"
rcgen = "0.8"

View File

@ -63,11 +63,9 @@ where
X: ServiceFactory<Request, Config = (), Response = Request>,
X::Error: Into<Error>,
X::InitError: fmt::Debug,
<X::Service as Service<Request>>::Future: 'static,
U: ServiceFactory<(Request, Framed<T, Codec>), Config = (), Response = ()>,
U::Error: fmt::Display,
U::InitError: fmt::Debug,
<U::Service as Service<(Request, Framed<T, Codec>)>>::Future: 'static,
{
/// Set server keep-alive setting.
///
@ -127,7 +125,6 @@ where
X1: ServiceFactory<Request, Config = (), Response = Request>,
X1::Error: Into<Error>,
X1::InitError: fmt::Debug,
<X1::Service as Service<Request>>::Future: 'static,
{
HttpServiceBuilder {
keep_alive: self.keep_alive,
@ -152,7 +149,6 @@ where
U1: ServiceFactory<(Request, Framed<T, Codec>), Config = (), Response = ()>,
U1::Error: fmt::Display,
U1::InitError: fmt::Debug,
<U1::Service as Service<(Request, Framed<T, Codec>)>>::Future: 'static,
{
HttpServiceBuilder {
keep_alive: self.keep_alive,
@ -211,7 +207,6 @@ where
S::Error: Into<Error> + 'static,
S::InitError: fmt::Debug,
S::Response: Into<Response<B>> + 'static,
<S::Service as Service<Request>>::Future: 'static,
{
let cfg = ServiceConfig::new(
self.keep_alive,
@ -233,7 +228,6 @@ where
S::Error: Into<Error> + 'static,
S::InitError: fmt::Debug,
S::Response: Into<Response<B>> + 'static,
<S::Service as Service<Request>>::Future: 'static,
{
let cfg = ServiceConfig::new(
self.keep_alive,

View File

@ -9,7 +9,7 @@ use std::{
};
use actix_rt::{
net::TcpStream,
net::{ActixStream, TcpStream},
time::{sleep, Sleep},
};
use actix_service::Service;
@ -119,7 +119,7 @@ impl<S> Connector<S> {
/// Use custom connector.
pub fn connector<S1, Io1>(self, connector: S1) -> Connector<S1>
where
Io1: ConnectionIo + fmt::Debug,
Io1: ActixStream + fmt::Debug + 'static,
S1: Service<
TcpConnect<Uri>,
Response = TcpConnection<Uri, Io1>,
@ -136,7 +136,14 @@ impl<S> Connector<S> {
impl<S, Io> Connector<S>
where
Io: ConnectionIo + fmt::Debug,
// Note:
// Input Io type is bound to ActixStream trait but internally in client module they
// are bound to ConnectionIo trait alias. And latter is the trait exposed to public
// in the form of Box<dyn ConnectionIo> type.
//
// This remap is to hide ActixStream's trait methods. They are not meant to be called
// from user code.
Io: ActixStream + fmt::Debug + 'static,
S: Service<
TcpConnect<Uri>,
Response = TcpConnection<Uri, Io>,
@ -407,16 +414,14 @@ struct TlsConnectorService<S, St> {
timeout: Duration,
}
impl<S, St, Io, Res> Service<Connect> for TlsConnectorService<S, St>
impl<S, St, Io> Service<Connect> for TlsConnectorService<S, St>
where
S: Service<Connect, Response = TcpConnection<Uri, Io>, Error = ConnectError>
+ Clone
+ 'static,
St: Service<TcpConnection<Uri, Io>, Response = Res, Error = std::io::Error>
+ Clone
+ 'static,
St: Service<TcpConnection<Uri, Io>, Error = std::io::Error> + Clone + 'static,
Io: ConnectionIo,
Res: IntoConnectionIo,
St::Response: IntoConnectionIo,
{
type Response = (Box<dyn ConnectionIo>, Protocol);
type Error = ConnectError;
@ -471,10 +476,10 @@ where
Error = std::io::Error,
Future = Fut2,
>,
S::Response: IntoConnectionIo,
Fut1: Future<Output = Result<TcpConnection<Uri, Io>, ConnectError>>,
Fut2: Future<Output = Result<S::Response, S::Error>>,
Io: ConnectionIo,
Res: IntoConnectionIo,
{
type Output = Result<(Box<dyn ConnectionIo>, Protocol), ConnectError>;

View File

@ -7,7 +7,7 @@ use std::{
use actix_codec::Framed;
use bytes::buf::BufMut;
use bytes::{Bytes, BytesMut};
use futures_core::Stream;
use futures_core::{ready, Stream};
use futures_util::{future::poll_fn, SinkExt as _};
use crate::error::PayloadError;
@ -17,7 +17,7 @@ use crate::http::{
StatusCode,
};
use crate::message::{RequestHeadType, ResponseHead};
use crate::payload::{Payload, PayloadStream};
use crate::payload::Payload;
use super::connection::{ConnectionIo, H1Connection};
use super::error::{ConnectError, SendRequestError};
@ -122,10 +122,7 @@ where
Ok((head, Payload::None))
}
_ => {
let pl: PayloadStream = Box::pin(PlStream::new(framed));
Ok((head, pl.into()))
}
_ => Ok((head, Payload::Stream(Box::pin(PlStream::new(framed))))),
}
}
@ -194,21 +191,16 @@ where
}
#[pin_project::pin_project]
pub(crate) struct PlStream<Io: ConnectionIo>
where
Io: ConnectionIo,
{
pub(crate) struct PlStream<Io: ConnectionIo> {
#[pin]
framed: Option<Framed<H1Connection<Io>, h1::ClientPayloadCodec>>,
framed: Framed<H1Connection<Io>, h1::ClientPayloadCodec>,
}
impl<Io: ConnectionIo> PlStream<Io> {
fn new(framed: Framed<H1Connection<Io>, h1::ClientCodec>) -> Self {
let framed = framed.into_map_codec(|codec| codec.into_payload_codec());
PlStream {
framed: Some(framed),
}
PlStream { framed }
}
}
@ -219,20 +211,16 @@ impl<Io: ConnectionIo> Stream for PlStream<Io> {
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
let mut framed = self.project().framed.as_pin_mut().unwrap();
let mut this = self.project();
match framed.as_mut().next_item(cx)? {
Poll::Pending => Poll::Pending,
Poll::Ready(Some(chunk)) => {
if let Some(chunk) = chunk {
Poll::Ready(Some(Ok(chunk)))
} else {
let keep_alive = framed.codec_ref().keepalive();
framed.io_mut().on_release(keep_alive);
Poll::Ready(None)
}
match ready!(this.framed.as_mut().next_item(cx)?) {
Some(Some(chunk)) => Poll::Ready(Some(Ok(chunk))),
Some(None) => {
let keep_alive = this.framed.codec_ref().keepalive();
this.framed.io_mut().on_release(keep_alive);
Poll::Ready(None)
}
Poll::Ready(None) => Poll::Ready(None),
None => Poll::Ready(None),
}
}
}

View File

@ -1,6 +1,4 @@
use std::future::Future;
use std::marker::PhantomData;
use std::pin::Pin;
use std::rc::Rc;
use std::task::{Context, Poll};
use std::{fmt, net};
@ -8,7 +6,7 @@ use std::{fmt, net};
use actix_codec::{AsyncRead, AsyncWrite, Framed};
use actix_rt::net::TcpStream;
use actix_service::{pipeline_factory, IntoServiceFactory, Service, ServiceFactory};
use futures_core::ready;
use futures_core::future::LocalBoxFuture;
use futures_util::future::ready;
use crate::body::MessageBody;
@ -16,7 +14,7 @@ use crate::config::ServiceConfig;
use crate::error::{DispatchError, Error};
use crate::request::Request;
use crate::response::Response;
use crate::service::HttpFlow;
use crate::service::HttpServiceHandler;
use crate::{ConnectCallback, OnConnectData};
use super::codec::Codec;
@ -60,14 +58,17 @@ where
impl<S, B, X, U> H1Service<TcpStream, S, B, X, U>
where
S: ServiceFactory<Request, Config = ()>,
S::Future: 'static,
S::Error: Into<Error>,
S::InitError: fmt::Debug,
S::Response: Into<Response<B>>,
B: MessageBody,
X: ServiceFactory<Request, Config = (), Response = Request>,
X::Future: 'static,
X::Error: Into<Error>,
X::InitError: fmt::Debug,
U: ServiceFactory<(Request, Framed<TcpStream, Codec>), Config = (), Response = ()>,
U::Future: 'static,
U::Error: fmt::Display + Into<Error>,
U::InitError: fmt::Debug,
{
@ -94,17 +95,21 @@ mod openssl {
use super::*;
use actix_service::ServiceFactoryExt;
use actix_tls::accept::openssl::{Acceptor, SslAcceptor, SslError, TlsStream};
use actix_tls::accept::TlsError;
use actix_tls::accept::{
openssl::{Acceptor, SslAcceptor, SslError, TlsStream},
TlsError,
};
impl<S, B, X, U> H1Service<TlsStream<TcpStream>, S, B, X, U>
where
S: ServiceFactory<Request, Config = ()>,
S::Future: 'static,
S::Error: Into<Error>,
S::InitError: fmt::Debug,
S::Response: Into<Response<B>>,
B: MessageBody,
X: ServiceFactory<Request, Config = (), Response = Request>,
X::Future: 'static,
X::Error: Into<Error>,
X::InitError: fmt::Debug,
U: ServiceFactory<
@ -112,6 +117,7 @@ mod openssl {
Config = (),
Response = (),
>,
U::Future: 'static,
U::Error: fmt::Display + Into<Error>,
U::InitError: fmt::Debug,
{
@ -143,19 +149,25 @@ mod openssl {
#[cfg(feature = "rustls")]
mod rustls {
use super::*;
use std::io;
use actix_service::ServiceFactoryExt;
use actix_tls::accept::rustls::{Acceptor, ServerConfig, TlsStream};
use actix_tls::accept::TlsError;
use std::{fmt, io};
use actix_tls::accept::{
rustls::{Acceptor, ServerConfig, TlsStream},
TlsError,
};
impl<S, B, X, U> H1Service<TlsStream<TcpStream>, S, B, X, U>
where
S: ServiceFactory<Request, Config = ()>,
S::Future: 'static,
S::Error: Into<Error>,
S::InitError: fmt::Debug,
S::Response: Into<Response<B>>,
B: MessageBody,
X: ServiceFactory<Request, Config = (), Response = Request>,
X::Future: 'static,
X::Error: Into<Error>,
X::InitError: fmt::Debug,
U: ServiceFactory<
@ -163,6 +175,7 @@ mod rustls {
Config = (),
Response = (),
>,
U::Future: 'static,
U::Error: fmt::Display + Into<Error>,
U::InitError: fmt::Debug,
{
@ -241,16 +254,19 @@ where
impl<T, S, B, X, U> ServiceFactory<(T, Option<net::SocketAddr>)>
for H1Service<T, S, B, X, U>
where
T: AsyncRead + AsyncWrite + Unpin,
T: AsyncRead + AsyncWrite + Unpin + 'static,
S: ServiceFactory<Request, Config = ()>,
S::Future: 'static,
S::Error: Into<Error>,
S::Response: Into<Response<B>>,
S::InitError: fmt::Debug,
B: MessageBody,
X: ServiceFactory<Request, Config = (), Response = Request>,
X::Future: 'static,
X::Error: Into<Error>,
X::InitError: fmt::Debug,
U: ServiceFactory<(Request, Framed<T, Codec>), Config = (), Response = ()>,
U::Future: 'static,
U::Error: fmt::Display + Into<Error>,
U::InitError: fmt::Debug,
{
@ -259,148 +275,50 @@ where
type Config = ();
type Service = H1ServiceHandler<T, S::Service, B, X::Service, U::Service>;
type InitError = ();
type Future = H1ServiceResponse<T, S, B, X, U>;
type Future = LocalBoxFuture<'static, Result<Self::Service, Self::InitError>>;
fn new_service(&self, _: ()) -> Self::Future {
H1ServiceResponse {
fut: self.srv.new_service(()),
fut_ex: Some(self.expect.new_service(())),
fut_upg: self.upgrade.as_ref().map(|f| f.new_service(())),
expect: None,
upgrade: None,
on_connect_ext: self.on_connect_ext.clone(),
cfg: Some(self.cfg.clone()),
_phantom: PhantomData,
}
}
}
let service = self.srv.new_service(());
let expect = self.expect.new_service(());
let upgrade = self.upgrade.as_ref().map(|s| s.new_service(()));
let on_connect_ext = self.on_connect_ext.clone();
let cfg = self.cfg.clone();
#[doc(hidden)]
#[pin_project::pin_project]
pub struct H1ServiceResponse<T, S, B, X, U>
where
S: ServiceFactory<Request>,
S::Error: Into<Error>,
S::InitError: fmt::Debug,
X: ServiceFactory<Request, Response = Request>,
X::Error: Into<Error>,
X::InitError: fmt::Debug,
U: ServiceFactory<(Request, Framed<T, Codec>), Response = ()>,
U::Error: fmt::Display,
U::InitError: fmt::Debug,
{
#[pin]
fut: S::Future,
#[pin]
fut_ex: Option<X::Future>,
#[pin]
fut_upg: Option<U::Future>,
expect: Option<X::Service>,
upgrade: Option<U::Service>,
on_connect_ext: Option<Rc<ConnectCallback<T>>>,
cfg: Option<ServiceConfig>,
_phantom: PhantomData<B>,
}
Box::pin(async move {
let expect = expect
.await
.map_err(|e| log::error!("Init http expect service error: {:?}", e))?;
impl<T, S, B, X, U> Future for H1ServiceResponse<T, S, B, X, U>
where
T: AsyncRead + AsyncWrite + Unpin,
S: ServiceFactory<Request>,
S::Error: Into<Error>,
S::Response: Into<Response<B>>,
S::InitError: fmt::Debug,
B: MessageBody,
X: ServiceFactory<Request, Response = Request>,
X::Error: Into<Error>,
X::InitError: fmt::Debug,
U: ServiceFactory<(Request, Framed<T, Codec>), Response = ()>,
U::Error: fmt::Display,
U::InitError: fmt::Debug,
{
type Output = Result<H1ServiceHandler<T, S::Service, B, X::Service, U::Service>, ()>;
let upgrade = match upgrade {
Some(upgrade) => {
let upgrade = upgrade.await.map_err(|e| {
log::error!("Init http upgrade service error: {:?}", e)
})?;
Some(upgrade)
}
None => None,
};
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.as_mut().project();
let service = service
.await
.map_err(|e| log::error!("Init http service error: {:?}", e))?;
if let Some(fut) = this.fut_ex.as_pin_mut() {
let expect = ready!(fut
.poll(cx)
.map_err(|e| log::error!("Init http service error: {:?}", e)))?;
this = self.as_mut().project();
*this.expect = Some(expect);
this.fut_ex.set(None);
}
if let Some(fut) = this.fut_upg.as_pin_mut() {
let upgrade = ready!(fut
.poll(cx)
.map_err(|e| log::error!("Init http service error: {:?}", e)))?;
this = self.as_mut().project();
*this.upgrade = Some(upgrade);
this.fut_upg.set(None);
}
let result = ready!(this
.fut
.poll(cx)
.map_err(|e| log::error!("Init http service error: {:?}", e)));
Poll::Ready(result.map(|service| {
let this = self.as_mut().project();
H1ServiceHandler::new(
this.cfg.take().unwrap(),
Ok(H1ServiceHandler::new(
cfg,
service,
this.expect.take().unwrap(),
this.upgrade.take(),
this.on_connect_ext.clone(),
)
}))
expect,
upgrade,
on_connect_ext,
))
})
}
}
/// `Service` implementation for HTTP/1 transport
pub struct H1ServiceHandler<T, S, B, X, U>
where
S: Service<Request>,
X: Service<Request>,
U: Service<(Request, Framed<T, Codec>)>,
{
flow: Rc<HttpFlow<S, X, U>>,
on_connect_ext: Option<Rc<ConnectCallback<T>>>,
cfg: ServiceConfig,
_phantom: PhantomData<B>,
}
impl<T, S, B, X, U> H1ServiceHandler<T, S, B, X, U>
where
S: Service<Request>,
S::Error: Into<Error>,
S::Response: Into<Response<B>>,
B: MessageBody,
X: Service<Request, Response = Request>,
X::Error: Into<Error>,
U: Service<(Request, Framed<T, Codec>), Response = ()>,
U::Error: fmt::Display,
{
fn new(
cfg: ServiceConfig,
service: S,
expect: X,
upgrade: Option<U>,
on_connect_ext: Option<Rc<ConnectCallback<T>>>,
) -> H1ServiceHandler<T, S, B, X, U> {
H1ServiceHandler {
flow: HttpFlow::new(service, expect, upgrade),
cfg,
on_connect_ext,
_phantom: PhantomData,
}
}
}
pub type H1ServiceHandler<T, S, B, X, U> = HttpServiceHandler<T, S, B, X, U>;
impl<T, S, B, X, U> Service<(T, Option<net::SocketAddr>)>
for H1ServiceHandler<T, S, B, X, U>
for HttpServiceHandler<T, S, B, X, U>
where
T: AsyncRead + AsyncWrite + Unpin,
S: Service<Request>,
@ -417,47 +335,10 @@ where
type Future = Dispatcher<T, S, B, X, U>;
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let ready = self
.flow
.expect
.poll_ready(cx)
.map_err(|e| {
let e = e.into();
log::error!("Http service readiness error: {:?}", e);
DispatchError::Service(e)
})?
.is_ready();
let ready = self
.flow
.service
.poll_ready(cx)
.map_err(|e| {
let e = e.into();
log::error!("Http service readiness error: {:?}", e);
DispatchError::Service(e)
})?
.is_ready()
&& ready;
let ready = if let Some(ref upg) = self.flow.upgrade {
upg.poll_ready(cx)
.map_err(|e| {
let e = e.into();
log::error!("Http service readiness error: {:?}", e);
DispatchError::Service(e)
})?
.is_ready()
&& ready
} else {
ready
};
if ready {
Poll::Ready(Ok(()))
} else {
Poll::Pending
}
self._poll_ready(cx).map_err(|e| {
log::error!("HTTP/1 service readiness error: {:?}", e);
DispatchError::Service(e)
})
}
fn call(&self, (io, addr): (T, Option<net::SocketAddr>)) -> Self::Future {

View File

@ -11,8 +11,8 @@ use actix_service::{
ServiceFactory,
};
use bytes::Bytes;
use futures_core::ready;
use futures_util::future::ok;
use futures_core::{future::LocalBoxFuture, ready};
use futures_util::future::ready;
use h2::server::{handshake, Handshake};
use log::error;
@ -65,6 +65,7 @@ where
impl<S, B> H2Service<TcpStream, S, B>
where
S: ServiceFactory<Request, Config = ()>,
S::Future: 'static,
S::Error: Into<Error> + 'static,
S::Response: Into<Response<B>> + 'static,
<S::Service as Service<Request>>::Future: 'static,
@ -80,11 +81,11 @@ where
Error = DispatchError,
InitError = S::InitError,
> {
pipeline_factory(fn_factory(|| async {
Ok::<_, S::InitError>(fn_service(|io: TcpStream| {
pipeline_factory(fn_factory(|| {
ready(Ok::<_, S::InitError>(fn_service(|io: TcpStream| {
let peer_addr = io.peer_addr().ok();
ok::<_, DispatchError>((io, peer_addr))
}))
ready(Ok::<_, DispatchError>((io, peer_addr)))
})))
}))
.and_then(self)
}
@ -101,6 +102,7 @@ mod openssl {
impl<S, B> H2Service<TlsStream<TcpStream>, S, B>
where
S: ServiceFactory<Request, Config = ()>,
S::Future: 'static,
S::Error: Into<Error> + 'static,
S::Response: Into<Response<B>> + 'static,
<S::Service as Service<Request>>::Future: 'static,
@ -123,10 +125,12 @@ mod openssl {
.map_init_err(|_| panic!()),
)
.and_then(fn_factory(|| {
ok::<_, S::InitError>(fn_service(|io: TlsStream<TcpStream>| {
let peer_addr = io.get_ref().peer_addr().ok();
ok((io, peer_addr))
}))
ready(Ok::<_, S::InitError>(fn_service(
|io: TlsStream<TcpStream>| {
let peer_addr = io.get_ref().peer_addr().ok();
ready(Ok((io, peer_addr)))
},
)))
}))
.and_then(self.map_err(TlsError::Service))
}
@ -144,6 +148,7 @@ mod rustls {
impl<S, B> H2Service<TlsStream<TcpStream>, S, B>
where
S: ServiceFactory<Request, Config = ()>,
S::Future: 'static,
S::Error: Into<Error> + 'static,
S::Response: Into<Response<B>> + 'static,
<S::Service as Service<Request>>::Future: 'static,
@ -169,10 +174,12 @@ mod rustls {
.map_init_err(|_| panic!()),
)
.and_then(fn_factory(|| {
ok::<_, S::InitError>(fn_service(|io: TlsStream<TcpStream>| {
let peer_addr = io.get_ref().0.peer_addr().ok();
ok((io, peer_addr))
}))
ready(Ok::<_, S::InitError>(fn_service(
|io: TlsStream<TcpStream>| {
let peer_addr = io.get_ref().0.peer_addr().ok();
ready(Ok((io, peer_addr)))
},
)))
}))
.and_then(self.map_err(TlsError::Service))
}
@ -181,8 +188,9 @@ mod rustls {
impl<T, S, B> ServiceFactory<(T, Option<net::SocketAddr>)> for H2Service<T, S, B>
where
T: AsyncRead + AsyncWrite + Unpin,
T: AsyncRead + AsyncWrite + Unpin + 'static,
S: ServiceFactory<Request, Config = ()>,
S::Future: 'static,
S::Error: Into<Error> + 'static,
S::Response: Into<Response<B>> + 'static,
<S::Service as Service<Request>>::Future: 'static,
@ -193,52 +201,16 @@ where
type Config = ();
type Service = H2ServiceHandler<T, S::Service, B>;
type InitError = S::InitError;
type Future = H2ServiceResponse<T, S, B>;
type Future = LocalBoxFuture<'static, Result<Self::Service, Self::InitError>>;
fn new_service(&self, _: ()) -> Self::Future {
H2ServiceResponse {
fut: self.srv.new_service(()),
cfg: Some(self.cfg.clone()),
on_connect_ext: self.on_connect_ext.clone(),
_phantom: PhantomData,
}
}
}
let service = self.srv.new_service(());
let cfg = self.cfg.clone();
let on_connect_ext = self.on_connect_ext.clone();
#[doc(hidden)]
#[pin_project::pin_project]
pub struct H2ServiceResponse<T, S, B>
where
S: ServiceFactory<Request>,
{
#[pin]
fut: S::Future,
cfg: Option<ServiceConfig>,
on_connect_ext: Option<Rc<ConnectCallback<T>>>,
_phantom: PhantomData<B>,
}
impl<T, S, B> Future for H2ServiceResponse<T, S, B>
where
T: AsyncRead + AsyncWrite + Unpin,
S: ServiceFactory<Request, Config = ()>,
S::Error: Into<Error> + 'static,
S::Response: Into<Response<B>> + 'static,
<S::Service as Service<Request>>::Future: 'static,
B: MessageBody + 'static,
{
type Output = Result<H2ServiceHandler<T, S::Service, B>, S::InitError>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.as_mut().project();
this.fut.poll(cx).map_ok(|service| {
let this = self.as_mut().project();
H2ServiceHandler::new(
this.cfg.take().unwrap(),
this.on_connect_ext.clone(),
service,
)
Box::pin(async move {
let service = service.await?;
Ok(H2ServiceHandler::new(cfg, on_connect_ext, service))
})
}
}

View File

@ -1,18 +1,20 @@
use std::fmt::{self, Display};
use std::io::Write;
use std::str::FromStr;
use std::time::{SystemTime, UNIX_EPOCH};
use std::{
fmt,
io::Write,
str::FromStr,
time::{SystemTime, UNIX_EPOCH},
};
use bytes::buf::BufMut;
use bytes::BytesMut;
use http::header::{HeaderValue, InvalidHeaderValue};
use time::{offset, OffsetDateTime, PrimitiveDateTime};
use time::{OffsetDateTime, PrimitiveDateTime, UtcOffset};
use crate::error::ParseError;
use crate::header::IntoHeaderValue;
use crate::time_parser;
/// A timestamp with HTTP formatting and parsing
/// A timestamp with HTTP formatting and parsing.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct HttpDate(OffsetDateTime);
@ -27,18 +29,12 @@ impl FromStr for HttpDate {
}
}
impl Display for HttpDate {
impl fmt::Display for HttpDate {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt::Display::fmt(&self.0.format("%a, %d %b %Y %H:%M:%S GMT"), f)
}
}
impl From<OffsetDateTime> for HttpDate {
fn from(dt: OffsetDateTime) -> HttpDate {
HttpDate(dt)
}
}
impl From<SystemTime> for HttpDate {
fn from(sys: SystemTime) -> HttpDate {
HttpDate(PrimitiveDateTime::from(sys).assume_utc())
@ -54,7 +50,7 @@ impl IntoHeaderValue for HttpDate {
wrt,
"{}",
self.0
.to_offset(offset!(UTC))
.to_offset(UtcOffset::UTC)
.format("%a, %d %b %Y %H:%M:%S GMT")
)
.unwrap();

View File

@ -12,7 +12,7 @@ use actix_codec::{AsyncRead, AsyncWrite, Framed};
use actix_rt::net::TcpStream;
use actix_service::{pipeline_factory, IntoServiceFactory, Service, ServiceFactory};
use bytes::Bytes;
use futures_core::ready;
use futures_core::{future::LocalBoxFuture, ready};
use h2::server::{handshake, Handshake};
use pin_project::pin_project;
@ -107,7 +107,6 @@ where
X1: ServiceFactory<Request, Config = (), Response = Request>,
X1::Error: Into<Error>,
X1::InitError: fmt::Debug,
<X1::Service as Service<Request>>::Future: 'static,
{
HttpService {
expect,
@ -128,7 +127,6 @@ where
U1: ServiceFactory<(Request, Framed<T, h1::Codec>), Config = (), Response = ()>,
U1::Error: fmt::Display,
U1::InitError: fmt::Debug,
<U1::Service as Service<(Request, Framed<T, h1::Codec>)>>::Future: 'static,
{
HttpService {
upgrade,
@ -150,23 +148,24 @@ where
impl<S, B, X, U> HttpService<TcpStream, S, B, X, U>
where
S: ServiceFactory<Request, Config = ()>,
S::Future: 'static,
S::Error: Into<Error> + 'static,
S::InitError: fmt::Debug,
S::Response: Into<Response<B>> + 'static,
<S::Service as Service<Request>>::Future: 'static,
B: MessageBody + 'static,
X: ServiceFactory<Request, Config = (), Response = Request>,
X::Future: 'static,
X::Error: Into<Error>,
X::InitError: fmt::Debug,
<X::Service as Service<Request>>::Future: 'static,
U: ServiceFactory<
(Request, Framed<TcpStream, h1::Codec>),
Config = (),
Response = (),
>,
U::Future: 'static,
U::Error: fmt::Display + Into<Error>,
U::InitError: fmt::Debug,
<U::Service as Service<(Request, Framed<TcpStream, h1::Codec>)>>::Future: 'static,
{
/// Create simple tcp stream service
pub fn tcp(
@ -196,23 +195,24 @@ mod openssl {
impl<S, B, X, U> HttpService<TlsStream<TcpStream>, S, B, X, U>
where
S: ServiceFactory<Request, Config = ()>,
S::Future: 'static,
S::Error: Into<Error> + 'static,
S::InitError: fmt::Debug,
S::Response: Into<Response<B>> + 'static,
<S::Service as Service<Request>>::Future: 'static,
B: MessageBody + 'static,
X: ServiceFactory<Request, Config = (), Response = Request>,
X::Future: 'static,
X::Error: Into<Error>,
X::InitError: fmt::Debug,
<X::Service as Service<Request>>::Future: 'static,
U: ServiceFactory<
(Request, Framed<TlsStream<TcpStream>, h1::Codec>),
Config = (),
Response = (),
>,
U::Future: 'static,
U::Error: fmt::Display + Into<Error>,
U::InitError: fmt::Debug,
<U::Service as Service<(Request, Framed<TlsStream<TcpStream>, h1::Codec>)>>::Future: 'static,
{
/// Create openssl based service
pub fn openssl(
@ -261,23 +261,24 @@ mod rustls {
impl<S, B, X, U> HttpService<TlsStream<TcpStream>, S, B, X, U>
where
S: ServiceFactory<Request, Config = ()>,
S::Future: 'static,
S::Error: Into<Error> + 'static,
S::InitError: fmt::Debug,
S::Response: Into<Response<B>> + 'static,
<S::Service as Service<Request>>::Future: 'static,
B: MessageBody + 'static,
X: ServiceFactory<Request, Config = (), Response = Request>,
X::Future: 'static,
X::Error: Into<Error>,
X::InitError: fmt::Debug,
<X::Service as Service<Request>>::Future: 'static,
U: ServiceFactory<
(Request, Framed<TlsStream<TcpStream>, h1::Codec>),
Config = (),
Response = (),
>,
U::Future: 'static,
U::Error: fmt::Display + Into<Error>,
U::InitError: fmt::Debug,
<U::Service as Service<(Request, Framed<TlsStream<TcpStream>, h1::Codec>)>>::Future: 'static,
{
/// Create openssl based service
pub fn rustls(
@ -319,137 +320,117 @@ mod rustls {
impl<T, S, B, X, U> ServiceFactory<(T, Protocol, Option<net::SocketAddr>)>
for HttpService<T, S, B, X, U>
where
T: AsyncRead + AsyncWrite + Unpin,
T: AsyncRead + AsyncWrite + Unpin + 'static,
S: ServiceFactory<Request, Config = ()>,
S::Future: 'static,
S::Error: Into<Error> + 'static,
S::InitError: fmt::Debug,
S::Response: Into<Response<B>> + 'static,
<S::Service as Service<Request>>::Future: 'static,
B: MessageBody + 'static,
X: ServiceFactory<Request, Config = (), Response = Request>,
X::Future: 'static,
X::Error: Into<Error>,
X::InitError: fmt::Debug,
<X::Service as Service<Request>>::Future: 'static,
U: ServiceFactory<(Request, Framed<T, h1::Codec>), Config = (), Response = ()>,
U::Future: 'static,
U::Error: fmt::Display + Into<Error>,
U::InitError: fmt::Debug,
<U::Service as Service<(Request, Framed<T, h1::Codec>)>>::Future: 'static,
{
type Response = ();
type Error = DispatchError;
type Config = ();
type Service = HttpServiceHandler<T, S::Service, B, X::Service, U::Service>;
type InitError = ();
type Future = HttpServiceResponse<T, S, B, X, U>;
type Future = LocalBoxFuture<'static, Result<Self::Service, Self::InitError>>;
fn new_service(&self, _: ()) -> Self::Future {
HttpServiceResponse {
fut: self.srv.new_service(()),
fut_ex: Some(self.expect.new_service(())),
fut_upg: self.upgrade.as_ref().map(|f| f.new_service(())),
expect: None,
upgrade: None,
on_connect_ext: self.on_connect_ext.clone(),
cfg: self.cfg.clone(),
_phantom: PhantomData,
}
}
}
let service = self.srv.new_service(());
let expect = self.expect.new_service(());
let upgrade = self.upgrade.as_ref().map(|s| s.new_service(()));
let on_connect_ext = self.on_connect_ext.clone();
let cfg = self.cfg.clone();
#[doc(hidden)]
#[pin_project]
pub struct HttpServiceResponse<T, S, B, X, U>
where
S: ServiceFactory<Request>,
X: ServiceFactory<Request>,
U: ServiceFactory<(Request, Framed<T, h1::Codec>)>,
{
#[pin]
fut: S::Future,
#[pin]
fut_ex: Option<X::Future>,
#[pin]
fut_upg: Option<U::Future>,
expect: Option<X::Service>,
upgrade: Option<U::Service>,
on_connect_ext: Option<Rc<ConnectCallback<T>>>,
cfg: ServiceConfig,
_phantom: PhantomData<B>,
}
Box::pin(async move {
let expect = expect
.await
.map_err(|e| log::error!("Init http expect service error: {:?}", e))?;
impl<T, S, B, X, U> Future for HttpServiceResponse<T, S, B, X, U>
where
T: AsyncRead + AsyncWrite + Unpin,
S: ServiceFactory<Request>,
S::Error: Into<Error> + 'static,
S::InitError: fmt::Debug,
S::Response: Into<Response<B>> + 'static,
<S::Service as Service<Request>>::Future: 'static,
B: MessageBody + 'static,
X: ServiceFactory<Request, Response = Request>,
X::Error: Into<Error>,
X::InitError: fmt::Debug,
<X::Service as Service<Request>>::Future: 'static,
U: ServiceFactory<(Request, Framed<T, h1::Codec>), Response = ()>,
U::Error: fmt::Display,
U::InitError: fmt::Debug,
<U::Service as Service<(Request, Framed<T, h1::Codec>)>>::Future: 'static,
{
type Output =
Result<HttpServiceHandler<T, S::Service, B, X::Service, U::Service>, ()>;
let upgrade = match upgrade {
Some(upgrade) => {
let upgrade = upgrade.await.map_err(|e| {
log::error!("Init http upgrade service error: {:?}", e)
})?;
Some(upgrade)
}
None => None,
};
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.as_mut().project();
let service = service
.await
.map_err(|e| log::error!("Init http service error: {:?}", e))?;
if let Some(fut) = this.fut_ex.as_pin_mut() {
let expect = ready!(fut
.poll(cx)
.map_err(|e| log::error!("Init http service error: {:?}", e)))?;
this = self.as_mut().project();
*this.expect = Some(expect);
this.fut_ex.set(None);
}
if let Some(fut) = this.fut_upg.as_pin_mut() {
let upgrade = ready!(fut
.poll(cx)
.map_err(|e| log::error!("Init http service error: {:?}", e)))?;
this = self.as_mut().project();
*this.upgrade = Some(upgrade);
this.fut_upg.set(None);
}
let result = ready!(this
.fut
.poll(cx)
.map_err(|e| log::error!("Init http service error: {:?}", e)));
Poll::Ready(result.map(|service| {
let this = self.as_mut().project();
HttpServiceHandler::new(
this.cfg.clone(),
Ok(HttpServiceHandler::new(
cfg,
service,
this.expect.take().unwrap(),
this.upgrade.take(),
this.on_connect_ext.clone(),
)
}))
expect,
upgrade,
on_connect_ext,
))
})
}
}
/// `Service` implementation for HTTP transport
/// `Service` implementation for HTTP/1 and HTTP/2 transport
pub struct HttpServiceHandler<T, S, B, X, U>
where
S: Service<Request>,
X: Service<Request>,
U: Service<(Request, Framed<T, h1::Codec>)>,
{
flow: Rc<HttpFlow<S, X, U>>,
cfg: ServiceConfig,
on_connect_ext: Option<Rc<ConnectCallback<T>>>,
pub(super) flow: Rc<HttpFlow<S, X, U>>,
pub(super) cfg: ServiceConfig,
pub(super) on_connect_ext: Option<Rc<ConnectCallback<T>>>,
_phantom: PhantomData<B>,
}
impl<T, S, B, X, U> HttpServiceHandler<T, S, B, X, U>
where
S: Service<Request>,
S::Error: Into<Error>,
X: Service<Request>,
X::Error: Into<Error>,
U: Service<(Request, Framed<T, h1::Codec>)>,
U::Error: Into<Error>,
{
pub(super) fn new(
cfg: ServiceConfig,
service: S,
expect: X,
upgrade: Option<U>,
on_connect_ext: Option<Rc<ConnectCallback<T>>>,
) -> HttpServiceHandler<T, S, B, X, U> {
HttpServiceHandler {
cfg,
on_connect_ext,
flow: HttpFlow::new(service, expect, upgrade),
_phantom: PhantomData,
}
}
pub(super) fn _poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
ready!(self.flow.expect.poll_ready(cx).map_err(Into::into))?;
ready!(self.flow.service.poll_ready(cx).map_err(Into::into))?;
if let Some(ref upg) = self.flow.upgrade {
ready!(upg.poll_ready(cx).map_err(Into::into))?;
};
Poll::Ready(Ok(()))
}
}
/// A collection of services that describe an HTTP request flow.
pub(super) struct HttpFlow<S, X, U> {
pub(super) service: S,
@ -467,34 +448,6 @@ impl<S, X, U> HttpFlow<S, X, U> {
}
}
impl<T, S, B, X, U> HttpServiceHandler<T, S, B, X, U>
where
S: Service<Request>,
S::Error: Into<Error> + 'static,
S::Future: 'static,
S::Response: Into<Response<B>> + 'static,
B: MessageBody + 'static,
X: Service<Request, Response = Request>,
X::Error: Into<Error>,
U: Service<(Request, Framed<T, h1::Codec>), Response = ()>,
U::Error: fmt::Display,
{
fn new(
cfg: ServiceConfig,
service: S,
expect: X,
upgrade: Option<U>,
on_connect_ext: Option<Rc<ConnectCallback<T>>>,
) -> HttpServiceHandler<T, S, B, X, U> {
HttpServiceHandler {
cfg,
on_connect_ext,
flow: HttpFlow::new(service, expect, upgrade),
_phantom: PhantomData,
}
}
}
impl<T, S, B, X, U> Service<(T, Protocol, Option<net::SocketAddr>)>
for HttpServiceHandler<T, S, B, X, U>
where
@ -514,47 +467,10 @@ where
type Future = HttpServiceHandlerResponse<T, S, B, X, U>;
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let ready = self
.flow
.expect
.poll_ready(cx)
.map_err(|e| {
let e = e.into();
log::error!("Http service readiness error: {:?}", e);
DispatchError::Service(e)
})?
.is_ready();
let ready = self
.flow
.service
.poll_ready(cx)
.map_err(|e| {
let e = e.into();
log::error!("Http service readiness error: {:?}", e);
DispatchError::Service(e)
})?
.is_ready()
&& ready;
let ready = if let Some(ref upg) = self.flow.upgrade {
upg.poll_ready(cx)
.map_err(|e| {
let e = e.into();
log::error!("Http service readiness error: {:?}", e);
DispatchError::Service(e)
})?
.is_ready()
&& ready
} else {
ready
};
if ready {
Poll::Ready(Ok(()))
} else {
Poll::Pending
}
self._poll_ready(cx).map_err(|e| {
log::error!("HTTP service readiness error: {:?}", e);
DispatchError::Service(e)
})
}
fn call(

View File

@ -3,6 +3,7 @@ use std::future::Future;
use std::marker::PhantomData;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll};
use actix_codec::{AsyncRead, AsyncWrite, Framed};
use actix_http::{body, h1, ws, Error, HttpService, Request, Response};
@ -11,7 +12,6 @@ use actix_service::{fn_factory, Service};
use actix_utils::dispatcher::Dispatcher;
use bytes::Bytes;
use futures_util::future;
use futures_util::task::{Context, Poll};
use futures_util::{SinkExt as _, StreamExt as _};
struct WsService<T>(Arc<Mutex<(PhantomData<T>, Cell<bool>)>>);

View File

@ -28,7 +28,7 @@ mime = "0.3"
twoway = "0.2"
[dev-dependencies]
actix-rt = "2.1"
actix-rt = "2.2"
actix-http = "3.0.0-beta.4"
tokio = { version = "1", features = ["sync"] }
tokio-stream = "0.1"

View File

@ -28,7 +28,7 @@ pin-project = "1.0.0"
tokio = { version = "1", features = ["sync"] }
[dev-dependencies]
actix-rt = "2.1"
actix-rt = "2.2"
awc = { version = "3.0.0-beta.3", default-features = false }
env_logger = "0.8"
futures-util = { version = "0.3.7", default-features = false }

View File

@ -19,7 +19,7 @@ syn = { version = "1", features = ["full", "parsing"] }
proc-macro2 = "1"
[dev-dependencies]
actix-rt = "2.1"
actix-rt = "2.2"
actix-web = "4.0.0-beta.4"
futures-util = { version = "0.3.7", default-features = false }
trybuild = "1"

View File

@ -71,7 +71,7 @@ actix-http = { version = "3.0.0-beta.4", features = ["openssl"] }
actix-http-test = { version = "3.0.0-beta.3", features = ["openssl"] }
actix-utils = "3.0.0-beta.1"
actix-server = "2.0.0-beta.3"
actix-tls = { version = "3.0.0-beta.4", features = ["openssl", "rustls"] }
actix-tls = { version = "3.0.0-beta.5", features = ["openssl", "rustls"] }
brotli2 = "0.3.2"
env_logger = "0.8"

View File

@ -4,12 +4,11 @@ use std::net::IpAddr;
use std::rc::Rc;
use std::time::Duration;
use actix_codec::{AsyncRead, AsyncWrite};
use actix_http::{
client::{Connector, ConnectorService, TcpConnect, TcpConnectError, TcpConnection},
http::{self, header, Error as HttpError, HeaderMap, HeaderName, Uri},
};
use actix_rt::net::TcpStream;
use actix_rt::net::{ActixStream, TcpStream};
use actix_service::{boxed, Service};
use crate::connect::DefaultConnector;
@ -64,7 +63,7 @@ where
S: Service<TcpConnect<Uri>, Response = TcpConnection<Uri, Io>, Error = TcpConnectError>
+ Clone
+ 'static,
Io: AsyncRead + AsyncWrite + Unpin + fmt::Debug + 'static,
Io: ActixStream + fmt::Debug + 'static,
{
/// Use custom connector service.
pub fn connector<S1, Io1>(self, connector: Connector<S1>) -> ClientBuilder<S1, M>
@ -75,7 +74,7 @@ where
Error = TcpConnectError,
> + Clone
+ 'static,
Io1: AsyncRead + AsyncWrite + Unpin + fmt::Debug + 'static,
Io1: ActixStream + fmt::Debug + 'static,
{
ClientBuilder {
middleware: self.middleware,

View File

@ -1,21 +1,19 @@
digraph {
subgraph cluster_net {
label="actix/actix-net";
"actix-codec"
"actix-macros"
"actix-rt"
"actix-server"
"actix-service"
"actix-threadpool"
"actix-tls"
"actix-tracing"
"actix-utils"
"actix-router"
"actix-codec" "actix-macros" "actix-rt" "actix-server" "actix-service"
"actix-tls" "actix-tracing" "actix-utils" "actix-router"
"local-channel" "local-waker"
}
"actix-utils" -> { "actix-service" "actix-rt" "actix-codec" }
"actix-codec" -> { "actix-rt" "actix-service" "local-channel" "tokio" }
"actix-utils" -> { "local-waker" }
"actix-tracing" -> { "actix-service" }
"actix-tls" -> { "actix-service" "actix-codec" "actix-utils" "actix-rt" }
"actix-server" -> { "actix-service" "actix-rt" "actix-codec" "actix-utils" }
"actix-rt" -> { "actix-macros" "actix-threadpool" }
"actix-server" -> { "actix-service" "actix-rt" "actix-codec" "actix-utils" "tokio" }
"actix-rt" -> { "actix-macros" "tokio" }
"local-channel" -> { "local-waker" }
"tokio" [fontcolor = darkgreen]
}