use boolean flag for dispatcher state update

This commit is contained in:
fakeshadow 2020-12-18 18:46:13 +08:00
parent 110f2177e5
commit fec029b574
1 changed files with 38 additions and 40 deletions

View File

@ -338,7 +338,7 @@ where
self: Pin<&mut Self>, self: Pin<&mut Self>,
message: Response<()>, message: Response<()>,
body: ResponseBody<B>, body: ResponseBody<B>,
) -> Result<Option<State<S, B, X>>, DispatchError> { ) -> Result<(), DispatchError> {
let mut this = self.project(); let mut this = self.project();
this.codec this.codec
.encode(Message::Item((message, body.size())), &mut this.write_buf) .encode(Message::Item((message, body.size())), &mut this.write_buf)
@ -351,9 +351,10 @@ where
this.flags.set(Flags::KEEPALIVE, this.codec.keepalive()); this.flags.set(Flags::KEEPALIVE, this.codec.keepalive());
match body.size() { match body.size() {
BodySize::None | BodySize::Empty => Ok(Some(State::None)), BodySize::None | BodySize::Empty => this.state.set(State::None),
_ => Ok(Some(State::SendPayload(body))), _ => this.state.set(State::SendPayload(body)),
} };
Ok(())
} }
fn send_continue(self: Pin<&mut Self>) { fn send_continue(self: Pin<&mut Self>) {
@ -368,18 +369,23 @@ where
) -> Result<PollResponse, DispatchError> { ) -> Result<PollResponse, DispatchError> {
loop { loop {
let mut this = self.as_mut().project(); let mut this = self.as_mut().project();
let state = match this.state.project() { // state is not changed on Poll::Pending.
// other variant and conditions always trigger a state change(or an error).
let state_change = match this.state.project() {
StateProj::None => match this.messages.pop_front() { StateProj::None => match this.messages.pop_front() {
Some(DispatcherMessage::Item(req)) => { Some(DispatcherMessage::Item(req)) => {
self.as_mut().handle_request(req, cx)? self.as_mut().handle_request(req, cx)?;
true
}
Some(DispatcherMessage::Error(res)) => {
self.as_mut()
.send_response(res, ResponseBody::Other(Body::Empty))?;
true
} }
Some(DispatcherMessage::Error(res)) => self
.as_mut()
.send_response(res, ResponseBody::Other(Body::Empty))?,
Some(DispatcherMessage::Upgrade(req)) => { Some(DispatcherMessage::Upgrade(req)) => {
return Ok(PollResponse::Upgrade(req)); return Ok(PollResponse::Upgrade(req));
} }
None => None, None => false,
}, },
StateProj::ExpectCall(fut) => match fut.poll(cx) { StateProj::ExpectCall(fut) => match fut.poll(cx) {
Poll::Ready(Ok(req)) => { Poll::Ready(Ok(req)) => {
@ -391,25 +397,24 @@ where
Poll::Ready(Err(e)) => { Poll::Ready(Err(e)) => {
let res: Response = e.into().into(); let res: Response = e.into().into();
let (res, body) = res.replace_body(()); let (res, body) = res.replace_body(());
self.as_mut().send_response(res, body.into_body())? self.as_mut().send_response(res, body.into_body())?;
true
} }
Poll::Pending => None, Poll::Pending => false,
}, },
StateProj::ServiceCall(fut) => match fut.poll(cx) { StateProj::ServiceCall(fut) => match fut.poll(cx) {
Poll::Ready(Ok(res)) => { Poll::Ready(Ok(res)) => {
let (res, body) = res.into().replace_body(()); let (res, body) = res.into().replace_body(());
// send_response does not return a None variant of State. self.as_mut().send_response(res, body)?;
let state = self.as_mut().send_response(res, body)?.unwrap();
this = self.as_mut().project();
this.state.set(state);
continue; continue;
} }
Poll::Ready(Err(e)) => { Poll::Ready(Err(e)) => {
let res: Response = e.into().into(); let res: Response = e.into().into();
let (res, body) = res.replace_body(()); let (res, body) = res.replace_body(());
self.as_mut().send_response(res, body.into_body())? self.as_mut().send_response(res, body.into_body())?;
true
} }
Poll::Pending => None, Poll::Pending => false,
}, },
StateProj::SendPayload(mut stream) => { StateProj::SendPayload(mut stream) => {
loop { loop {
@ -444,14 +449,9 @@ where
} }
}; };
this = self.as_mut().project(); // state is changed and continue when the state is not Empty
if state_change && !self.state.is_empty() {
// set new state continue;
if let Some(state) = state {
this.state.set(state);
if !self.state.is_empty() {
continue;
}
} else { } else {
// if read-backpressure is enabled and we consumed some data. // if read-backpressure is enabled and we consumed some data.
// we may read more data and retry // we may read more data and retry
@ -473,7 +473,7 @@ where
mut self: Pin<&mut Self>, mut self: Pin<&mut Self>,
req: Request, req: Request,
cx: &mut Context<'_>, cx: &mut Context<'_>,
) -> Result<Option<State<S, B, X>>, DispatchError> { ) -> Result<(), DispatchError> {
// Handle `EXPECT: 100-Continue` header // Handle `EXPECT: 100-Continue` header
if req.head().expect() { if req.head().expect() {
// set dispatcher state so the future is pinned. // set dispatcher state so the future is pinned.
@ -497,11 +497,12 @@ where
self.as_mut().project().state.set(State::ServiceCall(task)); self.as_mut().project().state.set(State::ServiceCall(task));
continue; continue;
} }
// future is pending. return None to notify that we already set // future is pending. return Ok(()) to notify that a new state is
// the state and it should not be updated again. // set and the outer loop should be continue.
Poll::Pending => return Ok(None), Poll::Pending => return Ok(()),
// future is error. send response and return a state on success to notify // future is error. send response and return a result. On success
// the dispatcher state should be updated. // to notify the dispatcher a new state is set and the outer loop
// should be continue.
Poll::Ready(Err(e)) => { Poll::Ready(Err(e)) => {
let e = e.into(); let e = e.into();
let res: Response = e.into(); let res: Response = e.into();
@ -513,14 +514,15 @@ where
StateProj::ServiceCall(fut) => { StateProj::ServiceCall(fut) => {
// return no matter the service call future's result. // return no matter the service call future's result.
return match fut.poll(cx) { return match fut.poll(cx) {
// future is resolved. send response and return a state on success // future is resolved. send response and return a result. On success
// to notify the dispatcher state should be updated. // to notify the dispatcher a new state is set and the outer loop
// should be continue.
Poll::Ready(Ok(res)) => { Poll::Ready(Ok(res)) => {
let (res, body) = res.into().replace_body(()); let (res, body) = res.into().replace_body(());
self.send_response(res, body) self.send_response(res, body)
} }
// see the comment on ExpectCall state branch's Pending. // see the comment on ExpectCall state branch's Pending.
Poll::Pending => Ok(None), Poll::Pending => Ok(()),
// see the comment on ExpectCall state branch's Ready(Err(e)). // see the comment on ExpectCall state branch's Ready(Err(e)).
Poll::Ready(Err(e)) => { Poll::Ready(Err(e)) => {
let res: Response = e.into().into(); let res: Response = e.into().into();
@ -582,12 +584,8 @@ where
// handle request early // handle request early
if this.state.is_empty() { if this.state.is_empty() {
// State can be set here self.as_mut().handle_request(req, cx)?;
let state = self.as_mut().handle_request(req, cx)?;
this = self.as_mut().project(); this = self.as_mut().project();
if let Some(state) = state {
this.state.set(state);
}
} else { } else {
this.messages.push_back(DispatcherMessage::Item(req)); this.messages.push_back(DispatcherMessage::Item(req));
} }