From fec029b574a3155e5d818bf805bb45dc71c27b61 Mon Sep 17 00:00:00 2001 From: fakeshadow <24548779@qq.com> Date: Fri, 18 Dec 2020 18:46:13 +0800 Subject: [PATCH] use boolean flag for dispatcher state update --- actix-http/src/h1/dispatcher.rs | 78 ++++++++++++++++----------------- 1 file changed, 38 insertions(+), 40 deletions(-) diff --git a/actix-http/src/h1/dispatcher.rs b/actix-http/src/h1/dispatcher.rs index 5a2855b3c..fe1b8da22 100644 --- a/actix-http/src/h1/dispatcher.rs +++ b/actix-http/src/h1/dispatcher.rs @@ -338,7 +338,7 @@ where self: Pin<&mut Self>, message: Response<()>, body: ResponseBody, - ) -> Result>, DispatchError> { + ) -> Result<(), DispatchError> { let mut this = self.project(); this.codec .encode(Message::Item((message, body.size())), &mut this.write_buf) @@ -351,9 +351,10 @@ where this.flags.set(Flags::KEEPALIVE, this.codec.keepalive()); match body.size() { - BodySize::None | BodySize::Empty => Ok(Some(State::None)), - _ => Ok(Some(State::SendPayload(body))), - } + BodySize::None | BodySize::Empty => this.state.set(State::None), + _ => this.state.set(State::SendPayload(body)), + }; + Ok(()) } fn send_continue(self: Pin<&mut Self>) { @@ -368,18 +369,23 @@ where ) -> Result { loop { let mut this = self.as_mut().project(); - let state = match this.state.project() { + // state is not changed on Poll::Pending. + // other variant and conditions always trigger a state change(or an error). + let state_change = match this.state.project() { StateProj::None => match this.messages.pop_front() { Some(DispatcherMessage::Item(req)) => { - self.as_mut().handle_request(req, cx)? + self.as_mut().handle_request(req, cx)?; + true + } + Some(DispatcherMessage::Error(res)) => { + self.as_mut() + .send_response(res, ResponseBody::Other(Body::Empty))?; + true } - Some(DispatcherMessage::Error(res)) => self - .as_mut() - .send_response(res, ResponseBody::Other(Body::Empty))?, Some(DispatcherMessage::Upgrade(req)) => { return Ok(PollResponse::Upgrade(req)); } - None => None, + None => false, }, StateProj::ExpectCall(fut) => match fut.poll(cx) { Poll::Ready(Ok(req)) => { @@ -391,25 +397,24 @@ where Poll::Ready(Err(e)) => { let res: Response = e.into().into(); let (res, body) = res.replace_body(()); - self.as_mut().send_response(res, body.into_body())? + self.as_mut().send_response(res, body.into_body())?; + true } - Poll::Pending => None, + Poll::Pending => false, }, StateProj::ServiceCall(fut) => match fut.poll(cx) { Poll::Ready(Ok(res)) => { let (res, body) = res.into().replace_body(()); - // send_response does not return a None variant of State. - let state = self.as_mut().send_response(res, body)?.unwrap(); - this = self.as_mut().project(); - this.state.set(state); + self.as_mut().send_response(res, body)?; continue; } Poll::Ready(Err(e)) => { let res: Response = e.into().into(); let (res, body) = res.replace_body(()); - self.as_mut().send_response(res, body.into_body())? + self.as_mut().send_response(res, body.into_body())?; + true } - Poll::Pending => None, + Poll::Pending => false, }, StateProj::SendPayload(mut stream) => { loop { @@ -444,14 +449,9 @@ where } }; - this = self.as_mut().project(); - - // set new state - if let Some(state) = state { - this.state.set(state); - if !self.state.is_empty() { - continue; - } + // state is changed and continue when the state is not Empty + if state_change && !self.state.is_empty() { + continue; } else { // if read-backpressure is enabled and we consumed some data. // we may read more data and retry @@ -473,7 +473,7 @@ where mut self: Pin<&mut Self>, req: Request, cx: &mut Context<'_>, - ) -> Result>, DispatchError> { + ) -> Result<(), DispatchError> { // Handle `EXPECT: 100-Continue` header if req.head().expect() { // set dispatcher state so the future is pinned. @@ -497,11 +497,12 @@ where self.as_mut().project().state.set(State::ServiceCall(task)); continue; } - // future is pending. return None to notify that we already set - // the state and it should not be updated again. - Poll::Pending => return Ok(None), - // future is error. send response and return a state on success to notify - // the dispatcher state should be updated. + // future is pending. return Ok(()) to notify that a new state is + // set and the outer loop should be continue. + Poll::Pending => return Ok(()), + // future is error. send response and return a result. On success + // to notify the dispatcher a new state is set and the outer loop + // should be continue. Poll::Ready(Err(e)) => { let e = e.into(); let res: Response = e.into(); @@ -513,14 +514,15 @@ where StateProj::ServiceCall(fut) => { // return no matter the service call future's result. return match fut.poll(cx) { - // future is resolved. send response and return a state on success - // to notify the dispatcher state should be updated. + // future is resolved. send response and return a result. On success + // to notify the dispatcher a new state is set and the outer loop + // should be continue. Poll::Ready(Ok(res)) => { let (res, body) = res.into().replace_body(()); self.send_response(res, body) } // see the comment on ExpectCall state branch's Pending. - Poll::Pending => Ok(None), + Poll::Pending => Ok(()), // see the comment on ExpectCall state branch's Ready(Err(e)). Poll::Ready(Err(e)) => { let res: Response = e.into().into(); @@ -582,12 +584,8 @@ where // handle request early if this.state.is_empty() { - // State can be set here - let state = self.as_mut().handle_request(req, cx)?; + self.as_mut().handle_request(req, cx)?; this = self.as_mut().project(); - if let Some(state) = state { - this.state.set(state); - } } else { this.messages.push_back(DispatcherMessage::Item(req)); }