mirror of https://github.com/fafhrd91/actix-web
Merge branch 'master' into tokio0.3
This commit is contained in:
commit
3239dab8ea
|
@ -132,8 +132,8 @@ where
|
|||
B: MessageBody,
|
||||
{
|
||||
None,
|
||||
ExpectCall(Pin<Box<X::Future>>),
|
||||
ServiceCall(Pin<Box<S::Future>>),
|
||||
ExpectCall(#[pin] X::Future),
|
||||
ServiceCall(#[pin] S::Future),
|
||||
SendPayload(#[pin] ResponseBody<B>),
|
||||
}
|
||||
|
||||
|
@ -349,7 +349,7 @@ where
|
|||
self: Pin<&mut Self>,
|
||||
message: Response<()>,
|
||||
body: ResponseBody<B>,
|
||||
) -> Result<State<S, B, X>, DispatchError> {
|
||||
) -> Result<(), DispatchError> {
|
||||
let mut this = self.project();
|
||||
this.codec
|
||||
.encode(Message::Item((message, body.size())), &mut this.write_buf)
|
||||
|
@ -362,9 +362,10 @@ where
|
|||
|
||||
this.flags.set(Flags::KEEPALIVE, this.codec.keepalive());
|
||||
match body.size() {
|
||||
BodySize::None | BodySize::Empty => Ok(State::None),
|
||||
_ => Ok(State::SendPayload(body)),
|
||||
}
|
||||
BodySize::None | BodySize::Empty => this.state.set(State::None),
|
||||
_ => this.state.set(State::SendPayload(body)),
|
||||
};
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn send_continue(self: Pin<&mut Self>) {
|
||||
|
@ -379,49 +380,52 @@ where
|
|||
) -> Result<PollResponse, DispatchError> {
|
||||
loop {
|
||||
let mut this = self.as_mut().project();
|
||||
let state = match this.state.project() {
|
||||
// state is not changed on Poll::Pending.
|
||||
// other variant and conditions always trigger a state change(or an error).
|
||||
let state_change = match this.state.project() {
|
||||
StateProj::None => match this.messages.pop_front() {
|
||||
Some(DispatcherMessage::Item(req)) => {
|
||||
Some(self.as_mut().handle_request(req, cx)?)
|
||||
self.as_mut().handle_request(req, cx)?;
|
||||
true
|
||||
}
|
||||
Some(DispatcherMessage::Error(res)) => Some(
|
||||
Some(DispatcherMessage::Error(res)) => {
|
||||
self.as_mut()
|
||||
.send_response(res, ResponseBody::Other(Body::Empty))?,
|
||||
),
|
||||
.send_response(res, ResponseBody::Other(Body::Empty))?;
|
||||
true
|
||||
}
|
||||
Some(DispatcherMessage::Upgrade(req)) => {
|
||||
return Ok(PollResponse::Upgrade(req));
|
||||
}
|
||||
None => None,
|
||||
None => false,
|
||||
},
|
||||
StateProj::ExpectCall(fut) => match fut.as_mut().poll(cx) {
|
||||
StateProj::ExpectCall(fut) => match fut.poll(cx) {
|
||||
Poll::Ready(Ok(req)) => {
|
||||
self.as_mut().send_continue();
|
||||
this = self.as_mut().project();
|
||||
this.state
|
||||
.set(State::ServiceCall(Box::pin(this.service.call(req))));
|
||||
this.state.set(State::ServiceCall(this.service.call(req)));
|
||||
continue;
|
||||
}
|
||||
Poll::Ready(Err(e)) => {
|
||||
let res: Response = e.into().into();
|
||||
let (res, body) = res.replace_body(());
|
||||
Some(self.as_mut().send_response(res, body.into_body())?)
|
||||
self.as_mut().send_response(res, body.into_body())?;
|
||||
true
|
||||
}
|
||||
Poll::Pending => None,
|
||||
Poll::Pending => false,
|
||||
},
|
||||
StateProj::ServiceCall(fut) => match fut.as_mut().poll(cx) {
|
||||
StateProj::ServiceCall(fut) => match fut.poll(cx) {
|
||||
Poll::Ready(Ok(res)) => {
|
||||
let (res, body) = res.into().replace_body(());
|
||||
let state = self.as_mut().send_response(res, body)?;
|
||||
this = self.as_mut().project();
|
||||
this.state.set(state);
|
||||
self.as_mut().send_response(res, body)?;
|
||||
continue;
|
||||
}
|
||||
Poll::Ready(Err(e)) => {
|
||||
let res: Response = e.into().into();
|
||||
let (res, body) = res.replace_body(());
|
||||
Some(self.as_mut().send_response(res, body.into_body())?)
|
||||
self.as_mut().send_response(res, body.into_body())?;
|
||||
true
|
||||
}
|
||||
Poll::Pending => None,
|
||||
Poll::Pending => false,
|
||||
},
|
||||
StateProj::SendPayload(mut stream) => {
|
||||
loop {
|
||||
|
@ -456,11 +460,8 @@ where
|
|||
}
|
||||
};
|
||||
|
||||
this = self.as_mut().project();
|
||||
|
||||
// set new state
|
||||
if let Some(state) = state {
|
||||
this.state.set(state);
|
||||
// state is changed and continue when the state is not Empty
|
||||
if state_change {
|
||||
if !self.state.is_empty() {
|
||||
continue;
|
||||
}
|
||||
|
@ -485,16 +486,36 @@ where
|
|||
mut self: Pin<&mut Self>,
|
||||
req: Request,
|
||||
cx: &mut Context<'_>,
|
||||
) -> Result<State<S, B, X>, DispatchError> {
|
||||
) -> Result<(), DispatchError> {
|
||||
// Handle `EXPECT: 100-Continue` header
|
||||
let req = if req.head().expect() {
|
||||
let mut task = Box::pin(self.as_mut().project().expect.call(req));
|
||||
match task.as_mut().poll(cx) {
|
||||
if req.head().expect() {
|
||||
// set dispatcher state so the future is pinned.
|
||||
let task = self.as_mut().project().expect.call(req);
|
||||
self.as_mut().project().state.set(State::ExpectCall(task));
|
||||
} else {
|
||||
// the same as above.
|
||||
let task = self.as_mut().project().service.call(req);
|
||||
self.as_mut().project().state.set(State::ServiceCall(task));
|
||||
};
|
||||
|
||||
// eagerly poll the future for once(or twice if expect is resolved immediately).
|
||||
loop {
|
||||
match self.as_mut().project().state.project() {
|
||||
StateProj::ExpectCall(fut) => {
|
||||
match fut.poll(cx) {
|
||||
// expect is resolved. continue loop and poll the service call branch.
|
||||
Poll::Ready(Ok(req)) => {
|
||||
self.as_mut().send_continue();
|
||||
req
|
||||
let task = self.as_mut().project().service.call(req);
|
||||
self.as_mut().project().state.set(State::ServiceCall(task));
|
||||
continue;
|
||||
}
|
||||
Poll::Pending => return Ok(State::ExpectCall(task)),
|
||||
// future is pending. return Ok(()) to notify that a new state is
|
||||
// set and the outer loop should be continue.
|
||||
Poll::Pending => return Ok(()),
|
||||
// future is error. send response and return a result. On success
|
||||
// to notify the dispatcher a new state is set and the outer loop
|
||||
// should be continue.
|
||||
Poll::Ready(Err(e)) => {
|
||||
let e = e.into();
|
||||
let res: Response = e.into();
|
||||
|
@ -502,23 +523,31 @@ where
|
|||
return self.send_response(res, body.into_body());
|
||||
}
|
||||
}
|
||||
} else {
|
||||
req
|
||||
};
|
||||
|
||||
// Call service
|
||||
let mut task = Box::pin(self.as_mut().project().service.call(req));
|
||||
match task.as_mut().poll(cx) {
|
||||
}
|
||||
StateProj::ServiceCall(fut) => {
|
||||
// return no matter the service call future's result.
|
||||
return match fut.poll(cx) {
|
||||
// future is resolved. send response and return a result. On success
|
||||
// to notify the dispatcher a new state is set and the outer loop
|
||||
// should be continue.
|
||||
Poll::Ready(Ok(res)) => {
|
||||
let (res, body) = res.into().replace_body(());
|
||||
self.send_response(res, body)
|
||||
}
|
||||
Poll::Pending => Ok(State::ServiceCall(task)),
|
||||
// see the comment on ExpectCall state branch's Pending.
|
||||
Poll::Pending => Ok(()),
|
||||
// see the comment on ExpectCall state branch's Ready(Err(e)).
|
||||
Poll::Ready(Err(e)) => {
|
||||
let res: Response = e.into().into();
|
||||
let (res, body) = res.replace_body(());
|
||||
self.send_response(res, body.into_body())
|
||||
}
|
||||
};
|
||||
}
|
||||
_ => unreachable!(
|
||||
"State must be set to ServiceCall or ExceptCall in handle_request"
|
||||
),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -568,9 +597,8 @@ where
|
|||
|
||||
// handle request early
|
||||
if this.state.is_empty() {
|
||||
let state = self.as_mut().handle_request(req, cx)?;
|
||||
self.as_mut().handle_request(req, cx)?;
|
||||
this = self.as_mut().project();
|
||||
this.state.set(state);
|
||||
} else {
|
||||
this.messages.push_back(DispatcherMessage::Item(req));
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue