merge master

This commit is contained in:
fakeshadow 2021-03-30 17:32:09 +08:00
commit 44e7a86c73
5 changed files with 109 additions and 236 deletions

3
.cargo/config.toml Normal file
View File

@ -0,0 +1,3 @@
[alias]
chk = "hack check --workspace --tests --examples"
lint = "hack --clean-per-run clippy --workspace --tests --examples"

View File

@ -6,7 +6,7 @@ use std::{fmt, net};
use actix_codec::Framed;
use actix_rt::net::{ActixStream, TcpStream};
use actix_service::{pipeline_factory, IntoServiceFactory, Service, ServiceFactory};
use futures_core::{future::LocalBoxFuture, ready};
use futures_core::future::LocalBoxFuture;
use futures_util::future::ready;
use crate::body::MessageBody;
@ -14,7 +14,7 @@ use crate::config::ServiceConfig;
use crate::error::{DispatchError, Error};
use crate::request::Request;
use crate::response::Response;
use crate::service::HttpFlow;
use crate::service::HttpServiceHandler;
use crate::{ConnectCallback, OnConnectData};
use super::codec::Codec;
@ -315,47 +315,10 @@ where
}
/// `Service` implementation for HTTP/1 transport
pub struct H1ServiceHandler<T, S, B, X, U>
where
S: Service<Request>,
X: Service<Request>,
U: Service<(Request, Framed<T, Codec>)>,
{
flow: Rc<HttpFlow<S, X, U>>,
on_connect_ext: Option<Rc<ConnectCallback<T>>>,
cfg: ServiceConfig,
_phantom: PhantomData<B>,
}
impl<T, S, B, X, U> H1ServiceHandler<T, S, B, X, U>
where
S: Service<Request>,
S::Error: Into<Error>,
S::Response: Into<Response<B>>,
B: MessageBody,
X: Service<Request, Response = Request>,
X::Error: Into<Error>,
U: Service<(Request, Framed<T, Codec>), Response = ()>,
U::Error: fmt::Display,
{
fn new(
cfg: ServiceConfig,
service: S,
expect: X,
upgrade: Option<U>,
on_connect_ext: Option<Rc<ConnectCallback<T>>>,
) -> H1ServiceHandler<T, S, B, X, U> {
H1ServiceHandler {
flow: HttpFlow::new(service, expect, upgrade),
cfg,
on_connect_ext,
_phantom: PhantomData,
}
}
}
pub type H1ServiceHandler<T, S, B, X, U> = HttpServiceHandler<T, S, B, X, U>;
impl<T, S, B, X, U> Service<(T, Option<net::SocketAddr>)>
for H1ServiceHandler<T, S, B, X, U>
for HttpServiceHandler<T, S, B, X, U>
where
T: ActixStream,
S: Service<Request>,
@ -372,27 +335,10 @@ where
type Future = Dispatcher<T, S, B, X, U>;
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
ready!(self.flow.expect.poll_ready(cx)).map_err(|e| {
let e = e.into();
log::error!("Http expect service readiness error: {:?}", e);
self._poll_ready(cx).map_err(|e| {
log::error!("HTTP/1 service readiness error: {:?}", e);
DispatchError::Service(e)
})?;
if let Some(ref upg) = self.flow.upgrade {
ready!(upg.poll_ready(cx)).map_err(|e| {
let e = e.into();
log::error!("Http upgrade service readiness error: {:?}", e);
DispatchError::Service(e)
})?;
};
ready!(self.flow.service.poll_ready(cx)).map_err(|e| {
let e = e.into();
log::error!("Http service readiness error: {:?}", e);
DispatchError::Service(e)
})?;
Poll::Ready(Ok(()))
})
}
fn call(&self, (io, addr): (T, Option<net::SocketAddr>)) -> Self::Future {

View File

@ -1,18 +1,20 @@
use std::fmt::{self, Display};
use std::io::Write;
use std::str::FromStr;
use std::time::{SystemTime, UNIX_EPOCH};
use std::{
fmt,
io::Write,
str::FromStr,
time::{SystemTime, UNIX_EPOCH},
};
use bytes::buf::BufMut;
use bytes::BytesMut;
use http::header::{HeaderValue, InvalidHeaderValue};
use time::{offset, OffsetDateTime, PrimitiveDateTime};
use time::{OffsetDateTime, PrimitiveDateTime, UtcOffset};
use crate::error::ParseError;
use crate::header::IntoHeaderValue;
use crate::time_parser;
/// A timestamp with HTTP formatting and parsing
/// A timestamp with HTTP formatting and parsing.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct HttpDate(OffsetDateTime);
@ -27,18 +29,12 @@ impl FromStr for HttpDate {
}
}
impl Display for HttpDate {
impl fmt::Display for HttpDate {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt::Display::fmt(&self.0.format("%a, %d %b %Y %H:%M:%S GMT"), f)
}
}
impl From<OffsetDateTime> for HttpDate {
fn from(dt: OffsetDateTime) -> HttpDate {
HttpDate(dt)
}
}
impl From<SystemTime> for HttpDate {
fn from(sys: SystemTime) -> HttpDate {
HttpDate(PrimitiveDateTime::from(sys).assume_utc())
@ -54,7 +50,7 @@ impl IntoHeaderValue for HttpDate {
wrt,
"{}",
self.0
.to_offset(offset!(UTC))
.to_offset(UtcOffset::UTC)
.format("%a, %d %b %Y %H:%M:%S GMT")
)
.unwrap();

View File

@ -12,7 +12,7 @@ use actix_codec::Framed;
use actix_rt::net::{ActixStream, TcpStream};
use actix_service::{pipeline_factory, IntoServiceFactory, Service, ServiceFactory};
use bytes::Bytes;
use futures_core::ready;
use futures_core::{future::LocalBoxFuture, ready};
use h2::server::{handshake, Handshake};
use pin_project::pin_project;
@ -148,12 +148,14 @@ where
impl<S, B, X, U> HttpService<TcpStream, S, B, X, U>
where
S: ServiceFactory<Request, Config = ()>,
S::Future: 'static,
S::Error: Into<Error> + 'static,
S::InitError: fmt::Debug,
S::Response: Into<Response<B>> + 'static,
<S::Service as Service<Request>>::Future: 'static,
B: MessageBody + 'static,
X: ServiceFactory<Request, Config = (), Response = Request>,
X::Future: 'static,
X::Error: Into<Error>,
X::InitError: fmt::Debug,
U: ServiceFactory<
@ -161,6 +163,7 @@ where
Config = (),
Response = (),
>,
U::Future: 'static,
U::Error: fmt::Display + Into<Error>,
U::InitError: fmt::Debug,
{
@ -192,12 +195,14 @@ mod openssl {
impl<S, B, X, U> HttpService<TlsStream<TcpStream>, S, B, X, U>
where
S: ServiceFactory<Request, Config = ()>,
S::Future: 'static,
S::Error: Into<Error> + 'static,
S::InitError: fmt::Debug,
S::Response: Into<Response<B>> + 'static,
<S::Service as Service<Request>>::Future: 'static,
B: MessageBody + 'static,
X: ServiceFactory<Request, Config = (), Response = Request>,
X::Future: 'static,
X::Error: Into<Error>,
X::InitError: fmt::Debug,
U: ServiceFactory<
@ -205,6 +210,7 @@ mod openssl {
Config = (),
Response = (),
>,
U::Future: 'static,
U::Error: fmt::Display + Into<Error>,
U::InitError: fmt::Debug,
{
@ -255,12 +261,14 @@ mod rustls {
impl<S, B, X, U> HttpService<TlsStream<TcpStream>, S, B, X, U>
where
S: ServiceFactory<Request, Config = ()>,
S::Future: 'static,
S::Error: Into<Error> + 'static,
S::InitError: fmt::Debug,
S::Response: Into<Response<B>> + 'static,
<S::Service as Service<Request>>::Future: 'static,
B: MessageBody + 'static,
X: ServiceFactory<Request, Config = (), Response = Request>,
X::Future: 'static,
X::Error: Into<Error>,
X::InitError: fmt::Debug,
U: ServiceFactory<
@ -268,6 +276,7 @@ mod rustls {
Config = (),
Response = (),
>,
U::Future: 'static,
U::Error: fmt::Display + Into<Error>,
U::InitError: fmt::Debug,
{
@ -311,17 +320,20 @@ mod rustls {
impl<T, S, B, X, U> ServiceFactory<(T, Protocol, Option<net::SocketAddr>)>
for HttpService<T, S, B, X, U>
where
T: ActixStream,
T: ActixStream + 'static,
S: ServiceFactory<Request, Config = ()>,
S::Future: 'static,
S::Error: Into<Error> + 'static,
S::InitError: fmt::Debug,
S::Response: Into<Response<B>> + 'static,
<S::Service as Service<Request>>::Future: 'static,
B: MessageBody + 'static,
X: ServiceFactory<Request, Config = (), Response = Request>,
X::Future: 'static,
X::Error: Into<Error>,
X::InitError: fmt::Debug,
U: ServiceFactory<(Request, Framed<T, h1::Codec>), Config = (), Response = ()>,
U::Future: 'static,
U::Error: fmt::Display + Into<Error>,
U::InitError: fmt::Debug,
{
@ -330,114 +342,95 @@ where
type Config = ();
type Service = HttpServiceHandler<T, S::Service, B, X::Service, U::Service>;
type InitError = ();
type Future = HttpServiceResponse<T, S, B, X, U>;
type Future = LocalBoxFuture<'static, Result<Self::Service, Self::InitError>>;
fn new_service(&self, _: ()) -> Self::Future {
HttpServiceResponse {
fut: self.srv.new_service(()),
fut_ex: Some(self.expect.new_service(())),
fut_upg: self.upgrade.as_ref().map(|f| f.new_service(())),
expect: None,
upgrade: None,
on_connect_ext: self.on_connect_ext.clone(),
cfg: self.cfg.clone(),
_phantom: PhantomData,
}
}
let service = self.srv.new_service(());
let expect = self.expect.new_service(());
let upgrade = self.upgrade.as_ref().map(|s| s.new_service(()));
let on_connect_ext = self.on_connect_ext.clone();
let cfg = self.cfg.clone();
Box::pin(async move {
let expect = expect
.await
.map_err(|e| log::error!("Init http expect service error: {:?}", e))?;
let upgrade = match upgrade {
Some(upgrade) => {
let upgrade = upgrade.await.map_err(|e| {
log::error!("Init http upgrade service error: {:?}", e)
})?;
Some(upgrade)
}
None => None,
};
#[doc(hidden)]
#[pin_project]
pub struct HttpServiceResponse<T, S, B, X, U>
where
S: ServiceFactory<Request>,
X: ServiceFactory<Request>,
U: ServiceFactory<(Request, Framed<T, h1::Codec>)>,
{
#[pin]
fut: S::Future,
#[pin]
fut_ex: Option<X::Future>,
#[pin]
fut_upg: Option<U::Future>,
expect: Option<X::Service>,
upgrade: Option<U::Service>,
on_connect_ext: Option<Rc<ConnectCallback<T>>>,
cfg: ServiceConfig,
_phantom: PhantomData<B>,
}
let service = service
.await
.map_err(|e| log::error!("Init http service error: {:?}", e))?;
impl<T, S, B, X, U> Future for HttpServiceResponse<T, S, B, X, U>
where
T: ActixStream,
S: ServiceFactory<Request>,
S::Error: Into<Error> + 'static,
S::InitError: fmt::Debug,
S::Response: Into<Response<B>> + 'static,
<S::Service as Service<Request>>::Future: 'static,
B: MessageBody + 'static,
X: ServiceFactory<Request, Response = Request>,
X::Error: Into<Error>,
X::InitError: fmt::Debug,
U: ServiceFactory<(Request, Framed<T, h1::Codec>), Response = ()>,
U::Error: fmt::Display,
U::InitError: fmt::Debug,
{
type Output =
Result<HttpServiceHandler<T, S::Service, B, X::Service, U::Service>, ()>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.as_mut().project();
if let Some(fut) = this.fut_ex.as_pin_mut() {
let expect = ready!(fut
.poll(cx)
.map_err(|e| log::error!("Init http service error: {:?}", e)))?;
this = self.as_mut().project();
*this.expect = Some(expect);
this.fut_ex.set(None);
}
if let Some(fut) = this.fut_upg.as_pin_mut() {
let upgrade = ready!(fut
.poll(cx)
.map_err(|e| log::error!("Init http service error: {:?}", e)))?;
this = self.as_mut().project();
*this.upgrade = Some(upgrade);
this.fut_upg.set(None);
}
let result = ready!(this
.fut
.poll(cx)
.map_err(|e| log::error!("Init http service error: {:?}", e)));
Poll::Ready(result.map(|service| {
let this = self.as_mut().project();
HttpServiceHandler::new(
this.cfg.clone(),
Ok(HttpServiceHandler::new(
cfg,
service,
this.expect.take().unwrap(),
this.upgrade.take(),
this.on_connect_ext.clone(),
)
}))
expect,
upgrade,
on_connect_ext,
))
})
}
}
/// `Service` implementation for HTTP transport
/// `Service` implementation for HTTP/1 and HTTP/2 transport
pub struct HttpServiceHandler<T, S, B, X, U>
where
S: Service<Request>,
X: Service<Request>,
U: Service<(Request, Framed<T, h1::Codec>)>,
{
flow: Rc<HttpFlow<S, X, U>>,
cfg: ServiceConfig,
on_connect_ext: Option<Rc<ConnectCallback<T>>>,
pub(super) flow: Rc<HttpFlow<S, X, U>>,
pub(super) cfg: ServiceConfig,
pub(super) on_connect_ext: Option<Rc<ConnectCallback<T>>>,
_phantom: PhantomData<B>,
}
impl<T, S, B, X, U> HttpServiceHandler<T, S, B, X, U>
where
S: Service<Request>,
S::Error: Into<Error>,
X: Service<Request>,
X::Error: Into<Error>,
U: Service<(Request, Framed<T, h1::Codec>)>,
U::Error: Into<Error>,
{
pub(super) fn new(
cfg: ServiceConfig,
service: S,
expect: X,
upgrade: Option<U>,
on_connect_ext: Option<Rc<ConnectCallback<T>>>,
) -> HttpServiceHandler<T, S, B, X, U> {
HttpServiceHandler {
cfg,
on_connect_ext,
flow: HttpFlow::new(service, expect, upgrade),
_phantom: PhantomData,
}
}
pub(super) fn _poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
ready!(self.flow.expect.poll_ready(cx).map_err(Into::into))?;
ready!(self.flow.service.poll_ready(cx).map_err(Into::into))?;
if let Some(ref upg) = self.flow.upgrade {
ready!(upg.poll_ready(cx).map_err(Into::into))?;
};
Poll::Ready(Ok(()))
}
}
/// A collection of services that describe an HTTP request flow.
pub(super) struct HttpFlow<S, X, U> {
pub(super) service: S,
@ -455,34 +448,6 @@ impl<S, X, U> HttpFlow<S, X, U> {
}
}
impl<T, S, B, X, U> HttpServiceHandler<T, S, B, X, U>
where
S: Service<Request>,
S::Error: Into<Error> + 'static,
S::Future: 'static,
S::Response: Into<Response<B>> + 'static,
B: MessageBody + 'static,
X: Service<Request, Response = Request>,
X::Error: Into<Error>,
U: Service<(Request, Framed<T, h1::Codec>), Response = ()>,
U::Error: fmt::Display,
{
fn new(
cfg: ServiceConfig,
service: S,
expect: X,
upgrade: Option<U>,
on_connect_ext: Option<Rc<ConnectCallback<T>>>,
) -> HttpServiceHandler<T, S, B, X, U> {
HttpServiceHandler {
cfg,
on_connect_ext,
flow: HttpFlow::new(service, expect, upgrade),
_phantom: PhantomData,
}
}
}
impl<T, S, B, X, U> Service<(T, Protocol, Option<net::SocketAddr>)>
for HttpServiceHandler<T, S, B, X, U>
where
@ -502,47 +467,10 @@ where
type Future = HttpServiceHandlerResponse<T, S, B, X, U>;
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let ready = self
.flow
.expect
.poll_ready(cx)
.map_err(|e| {
let e = e.into();
log::error!("Http service readiness error: {:?}", e);
self._poll_ready(cx).map_err(|e| {
log::error!("HTTP service readiness error: {:?}", e);
DispatchError::Service(e)
})?
.is_ready();
let ready = self
.flow
.service
.poll_ready(cx)
.map_err(|e| {
let e = e.into();
log::error!("Http service readiness error: {:?}", e);
DispatchError::Service(e)
})?
.is_ready()
&& ready;
let ready = if let Some(ref upg) = self.flow.upgrade {
upg.poll_ready(cx)
.map_err(|e| {
let e = e.into();
log::error!("Http service readiness error: {:?}", e);
DispatchError::Service(e)
})?
.is_ready()
&& ready
} else {
ready
};
if ready {
Poll::Ready(Ok(()))
} else {
Poll::Pending
}
})
}
fn call(

View File

@ -7,7 +7,7 @@ digraph {
}
"actix-codec" -> { "actix-rt" "actix-service" "local-channel" "tokio" }
"actix-utils" -> { "actix-rt" "actix-service" "local-waker" }
"actix-utils" -> { "local-waker" }
"actix-tracing" -> { "actix-service" }
"actix-tls" -> { "actix-service" "actix-codec" "actix-utils" "actix-rt" }
"actix-server" -> { "actix-service" "actix-rt" "actix-codec" "actix-utils" "tokio" }