merge master

This commit is contained in:
Ali MJ Al-Nasrawy 2021-12-17 19:52:55 +03:00
commit f2cc57c563
6 changed files with 193 additions and 162 deletions

View File

@ -45,7 +45,7 @@ __compress = []
actix-service = "2.0.0"
actix-codec = "0.4.1"
actix-utils = "3.0.0"
actix-rt = "2.2"
actix-rt = { version = "2.2", default-features = false }
ahash = "0.7"
base64 = "0.13"
@ -66,7 +66,6 @@ local-channel = "0.1"
log = "0.4"
mime = "0.3"
percent-encoding = "2.1"
pin-project = "1.0.0"
pin-project-lite = "0.2"
rand = "0.8"
sha-1 = "0.9"

View File

@ -332,31 +332,28 @@ impl From<PayloadError> for Error {
}
/// A set of errors that can occur during dispatching HTTP requests.
#[derive(Debug, Display, Error, From)]
#[non_exhaustive]
#[derive(Debug, Display, From)]
pub enum DispatchError {
/// Service error
// FIXME: display and error type
/// Service error.
#[display(fmt = "Service Error")]
Service(#[error(not(source))] Response<BoxBody>),
Service(Response<BoxBody>),
/// Body error
// FIXME: display and error type
#[display(fmt = "Body Error")]
Body(#[error(not(source))] Box<dyn StdError>),
/// Body streaming error.
#[display(fmt = "Body error: {}", _0)]
Body(Box<dyn StdError>),
/// Upgrade service error
/// Upgrade service error.
Upgrade,
/// An `io::Error` that occurred while trying to read or write to a network stream.
#[display(fmt = "IO error: {}", _0)]
Io(io::Error),
/// Http request parse error.
#[display(fmt = "Parse error: {}", _0)]
/// Request parse error.
#[display(fmt = "Request parse error: {}", _0)]
Parse(ParseError),
/// Http/2 error
/// HTTP/2 error.
#[display(fmt = "{}", _0)]
H2(h2::Error),
@ -368,21 +365,23 @@ pub enum DispatchError {
#[display(fmt = "Connection shutdown timeout")]
DisconnectTimeout,
/// Payload is not consumed
#[display(fmt = "Task is completed but request's payload is not consumed")]
PayloadIsNotConsumed,
/// Malformed request
#[display(fmt = "Malformed request")]
MalformedRequest,
/// Internal error
/// Internal error.
#[display(fmt = "Internal error")]
InternalError,
}
/// Unknown error
#[display(fmt = "Unknown error")]
Unknown,
impl StdError for DispatchError {
fn source(&self) -> Option<&(dyn StdError + 'static)> {
match self {
// TODO: error source extraction?
DispatchError::Service(_res) => None,
DispatchError::Body(err) => Some(&**err),
DispatchError::Io(err) => Some(err),
DispatchError::Parse(err) => Some(err),
DispatchError::H2(err) => Some(err),
_ => None,
}
}
}
/// A set of error that can occur during parsing content type.

View File

@ -15,7 +15,7 @@ use bitflags::bitflags;
use bytes::{Buf, BytesMut};
use futures_core::ready;
use log::{error, trace};
use pin_project::pin_project;
use pin_project_lite::pin_project;
use crate::{
body::{BodySize, BoxBody, MessageBody},
@ -46,79 +46,111 @@ bitflags! {
}
}
#[pin_project]
/// Dispatcher for HTTP/1.1 protocol
pub struct Dispatcher<T, S, B, X, U>
where
S: Service<Request>,
S::Error: Into<Response<BoxBody>>,
// there's 2 versions of Dispatcher state because of:
// https://github.com/taiki-e/pin-project-lite/issues/3
//
// tl;dr: pin-project-lite doesn't play well with other attribute macros
B: MessageBody,
#[cfg(not(test))]
pin_project! {
/// Dispatcher for HTTP/1.1 protocol
pub struct Dispatcher<T, S, B, X, U>
where
S: Service<Request>,
S::Error: Into<Response<BoxBody>>,
X: Service<Request, Response = Request>,
X::Error: Into<Response<BoxBody>>,
B: MessageBody,
U: Service<(Request, Framed<T, Codec>), Response = ()>,
U::Error: fmt::Display,
{
#[pin]
inner: DispatcherState<T, S, B, X, U>,
X: Service<Request, Response = Request>,
X::Error: Into<Response<BoxBody>>,
#[cfg(test)]
poll_count: u64,
U: Service<(Request, Framed<T, Codec>), Response = ()>,
U::Error: fmt::Display,
{
#[pin]
inner: DispatcherState<T, S, B, X, U>,
}
}
#[pin_project(project = DispatcherStateProj)]
enum DispatcherState<T, S, B, X, U>
where
S: Service<Request>,
S::Error: Into<Response<BoxBody>>,
#[cfg(test)]
pin_project! {
/// Dispatcher for HTTP/1.1 protocol
pub struct Dispatcher<T, S, B, X, U>
where
S: Service<Request>,
S::Error: Into<Response<BoxBody>>,
B: MessageBody,
B: MessageBody,
X: Service<Request, Response = Request>,
X::Error: Into<Response<BoxBody>>,
X: Service<Request, Response = Request>,
X::Error: Into<Response<BoxBody>>,
U: Service<(Request, Framed<T, Codec>), Response = ()>,
U::Error: fmt::Display,
{
Normal(#[pin] InnerDispatcher<T, S, B, X, U>),
Upgrade(#[pin] U::Future),
U: Service<(Request, Framed<T, Codec>), Response = ()>,
U::Error: fmt::Display,
{
#[pin]
inner: DispatcherState<T, S, B, X, U>,
// used in tests
poll_count: u64,
}
}
#[pin_project(project = InnerDispatcherProj)]
struct InnerDispatcher<T, S, B, X, U>
where
S: Service<Request>,
S::Error: Into<Response<BoxBody>>,
pin_project! {
#[project = DispatcherStateProj]
enum DispatcherState<T, S, B, X, U>
where
S: Service<Request>,
S::Error: Into<Response<BoxBody>>,
B: MessageBody,
B: MessageBody,
X: Service<Request, Response = Request>,
X::Error: Into<Response<BoxBody>>,
X: Service<Request, Response = Request>,
X::Error: Into<Response<BoxBody>>,
U: Service<(Request, Framed<T, Codec>), Response = ()>,
U::Error: fmt::Display,
{
flow: Rc<HttpFlow<S, X, U>>,
flags: Flags,
peer_addr: Option<net::SocketAddr>,
conn_data: Option<Rc<Extensions>>,
error: Option<DispatchError>,
U: Service<(Request, Framed<T, Codec>), Response = ()>,
U::Error: fmt::Display,
{
Normal { #[pin] inner: InnerDispatcher<T, S, B, X, U> },
Upgrade { #[pin] fut: U::Future },
}
}
#[pin]
state: State<S, B, X>,
payload: Option<PayloadSender>,
messages: VecDeque<DispatcherMessage>,
pin_project! {
#[project = InnerDispatcherProj]
struct InnerDispatcher<T, S, B, X, U>
where
S: Service<Request>,
S::Error: Into<Response<BoxBody>>,
ka_expire: Instant,
#[pin]
ka_timer: Option<Sleep>,
B: MessageBody,
io: Option<T>,
read_buf: BytesMut,
write_buf: BytesMut,
codec: Codec,
X: Service<Request, Response = Request>,
X::Error: Into<Response<BoxBody>>,
U: Service<(Request, Framed<T, Codec>), Response = ()>,
U::Error: fmt::Display,
{
flow: Rc<HttpFlow<S, X, U>>,
flags: Flags,
peer_addr: Option<net::SocketAddr>,
conn_data: Option<Rc<Extensions>>,
error: Option<DispatchError>,
#[pin]
state: State<S, B, X>,
payload: Option<PayloadSender>,
messages: VecDeque<DispatcherMessage>,
ka_expire: Instant,
#[pin]
ka_timer: Option<Sleep>,
io: Option<T>,
read_buf: BytesMut,
write_buf: BytesMut,
codec: Codec,
}
}
enum DispatcherMessage {
@ -127,19 +159,21 @@ enum DispatcherMessage {
Error(Response<()>),
}
#[pin_project(project = StateProj)]
enum State<S, B, X>
where
S: Service<Request>,
X: Service<Request, Response = Request>,
pin_project! {
#[project = StateProj]
enum State<S, B, X>
where
S: Service<Request>,
X: Service<Request, Response = Request>,
B: MessageBody,
{
None,
ExpectCall(#[pin] X::Future),
ServiceCall(#[pin] S::Future),
SendPayload(#[pin] B),
SendErrorPayload(#[pin] BoxBody),
B: MessageBody,
{
None,
ExpectCall { #[pin] fut: X::Future },
ServiceCall { #[pin] fut: S::Future },
SendPayload { #[pin] body: B },
SendErrorPayload { #[pin] body: BoxBody },
}
}
impl<S, B, X> State<S, B, X>
@ -198,25 +232,27 @@ where
};
Dispatcher {
inner: DispatcherState::Normal(InnerDispatcher {
flow,
flags,
peer_addr,
conn_data: conn_data.0.map(Rc::new),
error: None,
inner: DispatcherState::Normal {
inner: InnerDispatcher {
flow,
flags,
peer_addr,
conn_data: conn_data.0.map(Rc::new),
error: None,
state: State::None,
payload: None,
messages: VecDeque::new(),
state: State::None,
payload: None,
messages: VecDeque::new(),
ka_expire,
ka_timer,
ka_expire,
ka_timer,
io: Some(io),
read_buf: BytesMut::with_capacity(HW_BUFFER_SIZE),
write_buf: BytesMut::with_capacity(HW_BUFFER_SIZE),
codec: Codec::new(config),
}),
io: Some(io),
read_buf: BytesMut::with_capacity(HW_BUFFER_SIZE),
write_buf: BytesMut::with_capacity(HW_BUFFER_SIZE),
codec: Codec::new(config),
},
},
#[cfg(test)]
poll_count: 0,
@ -316,7 +352,7 @@ where
let size = self.as_mut().send_response_inner(message, &body)?;
let state = match size {
BodySize::None | BodySize::Sized(0) => State::None,
_ => State::SendPayload(body),
_ => State::SendPayload { body },
};
self.project().state.set(state);
Ok(())
@ -330,7 +366,7 @@ where
let size = self.as_mut().send_response_inner(message, &body)?;
let state = match size {
BodySize::None | BodySize::Sized(0) => State::None,
_ => State::SendErrorPayload(body),
_ => State::SendErrorPayload { body },
};
self.project().state.set(state);
Ok(())
@ -356,12 +392,12 @@ where
// Handle `EXPECT: 100-Continue` header
if req.head().expect() {
// set InnerDispatcher state and continue loop to poll it.
let task = this.flow.expect.call(req);
this.state.set(State::ExpectCall(task));
let fut = this.flow.expect.call(req);
this.state.set(State::ExpectCall { fut });
} else {
// the same as expect call.
let task = this.flow.service.call(req);
this.state.set(State::ServiceCall(task));
let fut = this.flow.service.call(req);
this.state.set(State::ServiceCall { fut });
};
}
@ -381,7 +417,7 @@ where
// all messages are dealt with.
None => return Ok(PollResponse::DoNothing),
},
StateProj::ServiceCall(fut) => match fut.poll(cx) {
StateProj::ServiceCall { fut } => match fut.poll(cx) {
// service call resolved. send response.
Poll::Ready(Ok(res)) => {
let (res, body) = res.into().replace_body(());
@ -407,11 +443,11 @@ where
}
},
StateProj::SendPayload(mut stream) => {
StateProj::SendPayload { mut body } => {
// keep populate writer buffer until buffer size limit hit,
// get blocked or finished.
while this.write_buf.len() < super::payload::MAX_BUFFER_SIZE {
match stream.as_mut().poll_next(cx) {
match body.as_mut().poll_next(cx) {
Poll::Ready(Some(Ok(item))) => {
this.codec
.encode(Message::Chunk(Some(item)), this.write_buf)?;
@ -437,13 +473,13 @@ where
return Ok(PollResponse::DrainWriteBuf);
}
StateProj::SendErrorPayload(mut stream) => {
StateProj::SendErrorPayload { mut body } => {
// TODO: de-dupe impl with SendPayload
// keep populate writer buffer until buffer size limit hit,
// get blocked or finished.
while this.write_buf.len() < super::payload::MAX_BUFFER_SIZE {
match stream.as_mut().poll_next(cx) {
match body.as_mut().poll_next(cx) {
Poll::Ready(Some(Ok(item))) => {
this.codec
.encode(Message::Chunk(Some(item)), this.write_buf)?;
@ -458,7 +494,7 @@ where
}
Poll::Ready(Some(Err(err))) => {
return Err(DispatchError::Service(
return Err(DispatchError::Body(
Error::new_body().with_cause(err).into(),
))
}
@ -471,14 +507,14 @@ where
return Ok(PollResponse::DrainWriteBuf);
}
StateProj::ExpectCall(fut) => match fut.poll(cx) {
StateProj::ExpectCall { fut } => match fut.poll(cx) {
// expect resolved. write continue to buffer and set InnerDispatcher state
// to service call.
Poll::Ready(Ok(req)) => {
this.write_buf
.extend_from_slice(b"HTTP/1.1 100 Continue\r\n\r\n");
let fut = this.flow.service.call(req);
this.state.set(State::ServiceCall(fut));
this.state.set(State::ServiceCall { fut });
}
// send expect error as response
@ -504,25 +540,25 @@ where
let mut this = self.as_mut().project();
if req.head().expect() {
// set dispatcher state so the future is pinned.
let task = this.flow.expect.call(req);
this.state.set(State::ExpectCall(task));
let fut = this.flow.expect.call(req);
this.state.set(State::ExpectCall { fut });
} else {
// the same as above.
let task = this.flow.service.call(req);
this.state.set(State::ServiceCall(task));
let fut = this.flow.service.call(req);
this.state.set(State::ServiceCall { fut });
};
// eagerly poll the future for once(or twice if expect is resolved immediately).
loop {
match self.as_mut().project().state.project() {
StateProj::ExpectCall(fut) => {
StateProj::ExpectCall { fut } => {
match fut.poll(cx) {
// expect is resolved. continue loop and poll the service call branch.
Poll::Ready(Ok(req)) => {
self.as_mut().send_continue();
let mut this = self.as_mut().project();
let task = this.flow.service.call(req);
this.state.set(State::ServiceCall(task));
let fut = this.flow.service.call(req);
this.state.set(State::ServiceCall { fut });
continue;
}
// future is pending. return Ok(()) to notify that a new state is
@ -538,7 +574,7 @@ where
}
}
}
StateProj::ServiceCall(fut) => {
StateProj::ServiceCall { fut } => {
// return no matter the service call future's result.
return match fut.poll(cx) {
// future is resolved. send response and return a result. On success
@ -903,7 +939,7 @@ where
}
match this.inner.project() {
DispatcherStateProj::Normal(mut inner) => {
DispatcherStateProj::Normal { mut inner } => {
inner.as_mut().poll_keepalive(cx)?;
if inner.flags.contains(Flags::SHUTDOWN) {
@ -943,7 +979,7 @@ where
self.as_mut()
.project()
.inner
.set(DispatcherState::Upgrade(upgrade));
.set(DispatcherState::Upgrade { fut: upgrade });
return self.poll(cx);
}
};
@ -995,8 +1031,8 @@ where
}
}
}
DispatcherStateProj::Upgrade(fut) => fut.poll(cx).map_err(|e| {
error!("Upgrade handler error: {}", e);
DispatcherStateProj::Upgrade { fut: upgrade } => upgrade.poll(cx).map_err(|err| {
error!("Upgrade handler error: {}", err);
DispatchError::Upgrade
}),
}
@ -1090,7 +1126,7 @@ mod tests {
Poll::Ready(res) => assert!(res.is_err()),
}
if let DispatcherStateProj::Normal(inner) = h1.project().inner.project() {
if let DispatcherStateProj::Normal { inner } = h1.project().inner.project() {
assert!(inner.flags.contains(Flags::READ_DISCONNECT));
assert_eq!(
&inner.project().io.take().unwrap().write_buf[..26],
@ -1125,7 +1161,7 @@ mod tests {
actix_rt::pin!(h1);
assert!(matches!(&h1.inner, DispatcherState::Normal(_)));
assert!(matches!(&h1.inner, DispatcherState::Normal { .. }));
match h1.as_mut().poll(cx) {
Poll::Pending => panic!("first poll should not be pending"),
@ -1135,7 +1171,7 @@ mod tests {
// polls: initial => shutdown
assert_eq!(h1.poll_count, 2);
if let DispatcherStateProj::Normal(inner) = h1.project().inner.project() {
if let DispatcherStateProj::Normal { inner } = h1.project().inner.project() {
let res = &mut inner.project().io.take().unwrap().write_buf[..];
stabilize_date_header(res);
@ -1179,7 +1215,7 @@ mod tests {
actix_rt::pin!(h1);
assert!(matches!(&h1.inner, DispatcherState::Normal(_)));
assert!(matches!(&h1.inner, DispatcherState::Normal { .. }));
match h1.as_mut().poll(cx) {
Poll::Pending => panic!("first poll should not be pending"),
@ -1189,7 +1225,7 @@ mod tests {
// polls: initial => shutdown
assert_eq!(h1.poll_count, 1);
if let DispatcherStateProj::Normal(inner) = h1.project().inner.project() {
if let DispatcherStateProj::Normal { inner } = h1.project().inner.project() {
let res = &mut inner.project().io.take().unwrap().write_buf[..];
stabilize_date_header(res);
@ -1239,13 +1275,13 @@ mod tests {
actix_rt::pin!(h1);
assert!(h1.as_mut().poll(cx).is_pending());
assert!(matches!(&h1.inner, DispatcherState::Normal(_)));
assert!(matches!(&h1.inner, DispatcherState::Normal { .. }));
// polls: manual
assert_eq!(h1.poll_count, 1);
eprintln!("poll count: {}", h1.poll_count);
if let DispatcherState::Normal(ref inner) = h1.inner {
if let DispatcherState::Normal { ref inner } = h1.inner {
let io = inner.io.as_ref().unwrap();
let res = &io.write_buf()[..];
assert_eq!(
@ -1260,7 +1296,7 @@ mod tests {
// polls: manual manual shutdown
assert_eq!(h1.poll_count, 3);
if let DispatcherState::Normal(ref inner) = h1.inner {
if let DispatcherState::Normal { ref inner } = h1.inner {
let io = inner.io.as_ref().unwrap();
let mut res = (&io.write_buf()[..]).to_owned();
stabilize_date_header(&mut res);
@ -1311,12 +1347,12 @@ mod tests {
actix_rt::pin!(h1);
assert!(h1.as_mut().poll(cx).is_ready());
assert!(matches!(&h1.inner, DispatcherState::Normal(_)));
assert!(matches!(&h1.inner, DispatcherState::Normal { .. }));
// polls: manual shutdown
assert_eq!(h1.poll_count, 2);
if let DispatcherState::Normal(ref inner) = h1.inner {
if let DispatcherState::Normal { ref inner } = h1.inner {
let io = inner.io.as_ref().unwrap();
let mut res = (&io.write_buf()[..]).to_owned();
stabilize_date_header(&mut res);
@ -1388,7 +1424,7 @@ mod tests {
actix_rt::pin!(h1);
assert!(h1.as_mut().poll(cx).is_ready());
assert!(matches!(&h1.inner, DispatcherState::Upgrade(_)));
assert!(matches!(&h1.inner, DispatcherState::Upgrade { .. }));
// polls: manual shutdown
assert_eq!(h1.poll_count, 2);

View File

@ -356,9 +356,9 @@ where
type Future = Dispatcher<T, S, B, X, U>;
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self._poll_ready(cx).map_err(|e| {
log::error!("HTTP/1 service readiness error: {:?}", e);
DispatchError::Service(e)
self._poll_ready(cx).map_err(|err| {
log::error!("HTTP/1 service readiness error: {:?}", err);
DispatchError::Service(err)
})
}

View File

@ -493,9 +493,9 @@ where
type Future = HttpServiceHandlerResponse<T, S, B, X, U>;
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self._poll_ready(cx).map_err(|e| {
log::error!("HTTP service readiness error: {:?}", e);
DispatchError::Service(e)
self._poll_ready(cx).map_err(|err| {
log::error!("HTTP service readiness error: {:?}", err);
DispatchError::Service(err)
})
}

View File

@ -322,13 +322,10 @@ pin_project! {
}
}
impl<B> MessageBody for StreamLog<B>
where
B: MessageBody,
B::Error: Into<Error>,
{
type Error = Error;
impl<B: MessageBody> MessageBody for StreamLog<B> {
type Error = B::Error;
#[inline]
fn size(&self) -> BodySize {
self.body.size()
}
@ -344,7 +341,7 @@ where
*this.size += chunk.len();
Poll::Ready(Some(Ok(chunk)))
}
Some(Err(err)) => Poll::Ready(Some(Err(err.into()))),
Some(Err(err)) => Poll::Ready(Some(Err(err))),
None => Poll::Ready(None),
}
}