re add eager poll for dispatched futures

This commit is contained in:
fakeshadow 2020-12-18 05:57:02 +08:00
parent f4c5f79b56
commit 6c5b0c67f3
1 changed files with 72 additions and 9 deletions

View File

@ -127,6 +127,8 @@ where
ExpectCall(#[pin] X::Future), ExpectCall(#[pin] X::Future),
ServiceCall(#[pin] S::Future), ServiceCall(#[pin] S::Future),
SendPayload(#[pin] ResponseBody<B>), SendPayload(#[pin] ResponseBody<B>),
// A special state hinting the state should not be updated.
NoOp,
} }
impl<S, B, X> State<S, B, X> impl<S, B, X> State<S, B, X>
@ -139,6 +141,10 @@ where
matches!(self, State::None) matches!(self, State::None)
} }
fn is_noop(&self) -> bool {
matches!(self, State::NoOp)
}
fn is_call(&self) -> bool { fn is_call(&self) -> bool {
matches!(self, State::ServiceCall(_)) matches!(self, State::ServiceCall(_))
} }
@ -442,13 +448,19 @@ where
} }
continue; continue;
} }
StateProj::NoOp => {
unreachable!("State::NoOp is only used in handle_request method")
}
}; };
this = self.as_mut().project(); this = self.as_mut().project();
// set new state // set new state
if let Some(state) = state { if let Some(state) = state {
// only set state when it's not noop
if !state.is_noop() {
this.state.set(state); this.state.set(state);
}
if !self.state.is_empty() { if !self.state.is_empty() {
continue; continue;
} }
@ -472,19 +484,68 @@ where
fn handle_request( fn handle_request(
mut self: Pin<&mut Self>, mut self: Pin<&mut Self>,
req: Request, req: Request,
_: &mut Context<'_>, cx: &mut Context<'_>,
) -> Result<State<S, B, X>, DispatchError> { ) -> Result<State<S, B, X>, DispatchError> {
// Handle `EXPECT: 100-Continue` header // Handle `EXPECT: 100-Continue` header
let req = if req.head().expect() { if req.head().expect() {
// set dispatcher state so the future is pinned.
let task = self.as_mut().project().expect.call(req); let task = self.as_mut().project().expect.call(req);
return Ok(State::ExpectCall(task)); self.as_mut().project().state.set(State::ExpectCall(task));
} else { } else {
req // the same as above.
let task = self.as_mut().project().service.call(req);
self.as_mut().project().state.set(State::ServiceCall(task));
}; };
// Call service // eagerly poll the future for once(or twice if expect is resolved immediately).
loop {
match self.as_mut().project().state.project() {
StateProj::ExpectCall(fut) => {
match fut.poll(cx) {
// expect is resolved. continue loop and poll the service call branch.
Poll::Ready(Ok(req)) => {
self.as_mut().send_continue();
let task = self.as_mut().project().service.call(req); let task = self.as_mut().project().service.call(req);
Ok(State::ServiceCall(task)) self.as_mut().project().state.set(State::ServiceCall(task));
continue;
}
// future is pending. return NoOp state to notify that we already set
// the state and it should not be updated again.
Poll::Pending => return Ok(State::NoOp),
// future is error. send response and return a state on success to notify
// the dispatcher state should be updated.
Poll::Ready(Err(e)) => {
let e = e.into();
let res: Response = e.into();
let (res, body) = res.replace_body(());
return self.send_response(res, body.into_body());
}
}
}
StateProj::ServiceCall(fut) => {
// return no matter the service call future's result.
return match fut.poll(cx) {
// future is resolved. send response and return a state on success
// to notify the dispatcher state should be updated.
Poll::Ready(Ok(res)) => {
let (res, body) = res.into().replace_body(());
self.send_response(res, body)
}
// see the comment on ExpectCall state branch's Pending.
Poll::Pending => Ok(State::NoOp),
// see the comment on ExpectCall state branch's Ready(Err(e)).
Poll::Ready(Err(e)) => {
let res: Response = e.into().into();
let (res, body) = res.replace_body(());
self.send_response(res, body.into_body())
}
};
}
_ => unreachable!(
"State must be set to ServiceCall or ExceptCall in handle_request"
),
}
}
} }
/// Process one incoming requests /// Process one incoming requests
@ -535,7 +596,9 @@ where
if this.state.is_empty() { if this.state.is_empty() {
let state = self.as_mut().handle_request(req, cx)?; let state = self.as_mut().handle_request(req, cx)?;
this = self.as_mut().project(); this = self.as_mut().project();
if !state.is_noop() {
this.state.set(state); this.state.set(state);
}
} else { } else {
this.messages.push_back(DispatcherMessage::Item(req)); this.messages.push_back(DispatcherMessage::Item(req));
} }