remove unneed type bound.

This commit is contained in:
fakeshadow 2021-05-13 14:53:44 +08:00
parent 94fa95d969
commit 49d2e3f02d
2 changed files with 67 additions and 74 deletions

View File

@ -1,5 +1,12 @@
use std::task::{Context, Poll}; use std::{
use std::{cmp, future::Future, marker::PhantomData, net, pin::Pin, rc::Rc}; cmp,
future::Future,
marker::PhantomData,
net,
pin::Pin,
rc::Rc,
task::{Context, Poll},
};
use actix_codec::{AsyncRead, AsyncWrite}; use actix_codec::{AsyncRead, AsyncWrite};
use actix_service::Service; use actix_service::Service;
@ -9,6 +16,7 @@ use futures_core::ready;
use h2::server::{Connection, SendResponse}; use h2::server::{Connection, SendResponse};
use http::header::{HeaderValue, CONNECTION, CONTENT_LENGTH, DATE, TRANSFER_ENCODING}; use http::header::{HeaderValue, CONNECTION, CONTENT_LENGTH, DATE, TRANSFER_ENCODING};
use log::{error, trace}; use log::{error, trace};
use pin_project_lite::pin_project;
use crate::body::{BodySize, MessageBody}; use crate::body::{BodySize, MessageBody};
use crate::config::ServiceConfig; use crate::config::ServiceConfig;
@ -22,30 +30,19 @@ use crate::OnConnectData;
const CHUNK_SIZE: usize = 16_384; const CHUNK_SIZE: usize = 16_384;
/// Dispatcher for HTTP/2 protocol. pin_project! {
#[pin_project::pin_project] /// Dispatcher for HTTP/2 protocol.
pub struct Dispatcher<T, S, B, X, U> pub struct Dispatcher<T, S, B, X, U> {
where flow: Rc<HttpFlow<S, X, U>>,
T: AsyncRead + AsyncWrite + Unpin, connection: Connection<T, Bytes>,
S: Service<Request>, on_connect_data: OnConnectData,
B: MessageBody, config: ServiceConfig,
{ peer_addr: Option<net::SocketAddr>,
flow: Rc<HttpFlow<S, X, U>>, _phantom: PhantomData<B>,
connection: Connection<T, Bytes>, }
on_connect_data: OnConnectData,
config: ServiceConfig,
peer_addr: Option<net::SocketAddr>,
_phantom: PhantomData<B>,
} }
impl<T, S, B, X, U> Dispatcher<T, S, B, X, U> impl<T, S, B, X, U> Dispatcher<T, S, B, X, U> {
where
T: AsyncRead + AsyncWrite + Unpin,
S: Service<Request>,
S::Error: Into<Error>,
S::Response: Into<Response<B>>,
B: MessageBody,
{
pub(crate) fn new( pub(crate) fn new(
flow: Rc<HttpFlow<S, X, U>>, flow: Rc<HttpFlow<S, X, U>>,
connection: Connection<T, Bytes>, connection: Connection<T, Bytes>,
@ -53,7 +50,7 @@ where
config: ServiceConfig, config: ServiceConfig,
peer_addr: Option<net::SocketAddr>, peer_addr: Option<net::SocketAddr>,
) -> Self { ) -> Self {
Dispatcher { Self {
flow, flow,
config, config,
peer_addr, peer_addr,
@ -82,62 +79,58 @@ where
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut(); let this = self.get_mut();
loop { while let Some((req, tx)) =
match ready!(Pin::new(&mut this.connection).poll_accept(cx)) { ready!(Pin::new(&mut this.connection).poll_accept(cx)).transpose()?
None => return Poll::Ready(Ok(())), {
let (parts, body) = req.into_parts();
let pl = crate::h2::Payload::new(body);
let pl = Payload::<crate::payload::PayloadStream>::H2(pl);
let mut req = Request::with_payload(pl);
Some(Err(err)) => return Poll::Ready(Err(err.into())), let head = req.head_mut();
head.uri = parts.uri;
head.method = parts.method;
head.version = parts.version;
head.headers = parts.headers.into();
head.peer_addr = this.peer_addr;
Some(Ok((req, tx))) => { // merge on_connect_ext data into request extensions
let (parts, body) = req.into_parts(); this.on_connect_data.merge_into(&mut req);
let pl = crate::h2::Payload::new(body);
let pl = Payload::<crate::payload::PayloadStream>::H2(pl);
let mut req = Request::with_payload(pl);
let head = req.head_mut(); let fut = this.flow.service.call(req);
head.uri = parts.uri; let config = this.config.clone();
head.method = parts.method;
head.version = parts.version;
head.headers = parts.headers.into();
head.peer_addr = this.peer_addr;
// merge on_connect_ext data into request extensions // multiplex request handling with spawn task
this.on_connect_data.merge_into(&mut req); actix_rt::spawn(async move {
// resolve service call and send response.
let res = match fut.await {
Ok(res) => {
let (res, body) = res.into().replace_body(());
handle_response(res, body, tx, config).await
}
Err(err) => {
let (res, body) =
Response::from_error(err.into()).replace_body(());
handle_response(res, body, tx, config).await
}
};
let fut = this.flow.service.call(req); // log error.
let config = this.config.clone(); if let Err(err) = res {
match err {
// multiplex request handling with spawn task DispatchError::SendResponse(err) => {
actix_rt::spawn(async move { trace!("Error sending HTTP/2 response: {:?}", err)
// resolve service call and send response.
let res = match fut.await {
Ok(res) => {
let (res, body) = res.into().replace_body(());
handle_response(res, body, tx, config).await
}
Err(err) => {
let (res, body) =
Response::from_error(err.into()).replace_body(());
handle_response(res, body, tx, config).await
}
};
// log error.
if let Err(err) = res {
match err {
DispatchError::SendResponse(err) => {
trace!("Error sending HTTP/2 response: {:?}", err)
}
DispatchError::SendData(err) => warn!("{:?}", err),
DispatchError::ResponseBody(err) => {
error!("Response payload stream error: {:?}", err)
}
}
} }
}); DispatchError::SendData(err) => warn!("{:?}", err),
DispatchError::ResponseBody(err) => {
error!("Response payload stream error: {:?}", err)
}
}
} }
} });
} }
Poll::Ready(Ok(()))
} }
} }

View File

@ -84,7 +84,7 @@ impl<B> Response<B> {
pub fn with_body(status: StatusCode, body: B) -> Response<B> { pub fn with_body(status: StatusCode, body: B) -> Response<B> {
Response { Response {
head: BoxedResponseHead::new(status), head: BoxedResponseHead::new(status),
body: body, body,
} }
} }