From 66e3dafaa7ad1990a51c1d3133ae68ba50d6057f Mon Sep 17 00:00:00 2001 From: fakeshadow <24548779@qq.com> Date: Sat, 6 Feb 2021 10:41:42 -0800 Subject: [PATCH] revert dispatcher change --- actix-http/src/h1/decoder.rs | 27 +- actix-http/src/h1/dispatcher.rs | 562 +++++++++++++++----------------- 2 files changed, 270 insertions(+), 319 deletions(-) diff --git a/actix-http/src/h1/decoder.rs b/actix-http/src/h1/decoder.rs index 9da958563..85379b084 100644 --- a/actix-http/src/h1/decoder.rs +++ b/actix-http/src/h1/decoder.rs @@ -14,7 +14,7 @@ use crate::header::HeaderMap; use crate::message::{ConnectionType, ResponseHead}; use crate::request::Request; -pub(crate) const MAX_BUFFER_SIZE: usize = 131_072; +const MAX_BUFFER_SIZE: usize = 131_072; const MAX_HEADERS: usize = 96; /// Incoming message decoder @@ -203,15 +203,7 @@ impl MessageType for Request { (len, method, uri, version, req.headers.len()) } - httparse::Status::Partial => { - return if src.len() >= MAX_BUFFER_SIZE { - trace!("MAX_BUFFER_SIZE unprocessed data reached, closing"); - Err(ParseError::TooLarge) - } else { - // Return None to notify more read are needed for parsing request - Ok(None) - }; - } + httparse::Status::Partial => return Ok(None), } }; @@ -230,6 +222,9 @@ impl MessageType for Request { PayloadLength::None => { if method == Method::CONNECT { PayloadType::Stream(PayloadDecoder::eof()) + } else if src.len() >= MAX_BUFFER_SIZE { + trace!("MAX_BUFFER_SIZE unprocessed data reached, closing"); + return Err(ParseError::TooLarge); } else { PayloadType::None } @@ -278,14 +273,7 @@ impl MessageType for ResponseHead { (len, version, status, res.headers.len()) } - httparse::Status::Partial => { - return if src.len() >= MAX_BUFFER_SIZE { - error!("MAX_BUFFER_SIZE unprocessed data reached, closing"); - Err(ParseError::TooLarge) - } else { - Ok(None) - } - } + httparse::Status::Partial => return Ok(None), } }; @@ -301,6 +289,9 @@ impl MessageType for ResponseHead { } else if status == StatusCode::SWITCHING_PROTOCOLS { // switching protocol or connect PayloadType::Stream(PayloadDecoder::eof()) + } else if src.len() >= MAX_BUFFER_SIZE { + error!("MAX_BUFFER_SIZE unprocessed data reached, closing"); + return Err(ParseError::TooLarge); } else { // for HTTP/1.0 read to eof and close connection if msg.version == Version::HTTP_10 { diff --git a/actix-http/src/h1/dispatcher.rs b/actix-http/src/h1/dispatcher.rs index e2ab0a347..90a5f0e06 100644 --- a/actix-http/src/h1/dispatcher.rs +++ b/actix-http/src/h1/dispatcher.rs @@ -45,7 +45,7 @@ bitflags! { } } -#[pin_project] +#[pin_project::pin_project] /// Dispatcher for HTTP/1.1 protocol pub struct Dispatcher where @@ -139,14 +139,27 @@ where fn is_empty(&self) -> bool { matches!(self, State::None) } -} + fn is_call(&self) -> bool { + matches!(self, State::ServiceCall(_)) + } +} enum PollResponse { Upgrade(Request), DoNothing, DrainWriteBuf, } +impl PartialEq for PollResponse { + fn eq(&self, other: &PollResponse) -> bool { + match self { + PollResponse::DrainWriteBuf => matches!(other, PollResponse::DrainWriteBuf), + PollResponse::DoNothing => matches!(other, PollResponse::DoNothing), + _ => false, + } + } +} + impl Dispatcher where T: AsyncRead + AsyncWrite + Unpin, @@ -163,7 +176,7 @@ where pub(crate) fn new( stream: T, config: ServiceConfig, - flow: Rc>, + services: Rc>, on_connect_data: OnConnectData, peer_addr: Option, ) -> Self { @@ -173,7 +186,7 @@ where config, BytesMut::with_capacity(HW_BUFFER_SIZE), None, - flow, + services, on_connect_data, peer_addr, ) @@ -186,7 +199,7 @@ where config: ServiceConfig, read_buf: BytesMut, timeout: Option, - flow: Rc>, + services: Rc>, on_connect_data: OnConnectData, peer_addr: Option, ) -> Self { @@ -216,7 +229,7 @@ where io: Some(io), codec, read_buf, - flow, + flow: services, on_connect_data, flags, peer_addr, @@ -256,14 +269,13 @@ where } // if checked is set to true, delay disconnect until all tasks have finished. - fn client_disconnected(self: Pin<&mut Self>, err: impl Into) { + fn client_disconnected(self: Pin<&mut Self>) { let this = self.project(); this.flags .insert(Flags::READ_DISCONNECT | Flags::WRITE_DISCONNECT); if let Some(mut payload) = this.payload.take() { payload.set_error(PayloadError::Incomplete(None)); } - *this.error = Some(err.into()); } /// Flush stream @@ -312,10 +324,9 @@ where message: Response<()>, body: ResponseBody, ) -> Result<(), DispatchError> { - let size = body.size(); let mut this = self.project(); this.codec - .encode(Message::Item((message, size)), &mut this.write_buf) + .encode(Message::Item((message, body.size())), &mut this.write_buf) .map_err(|err| { if let Some(mut payload) = this.payload.take() { payload.set_error(PayloadError::Incomplete(None)); @@ -324,76 +335,74 @@ where })?; this.flags.set(Flags::KEEPALIVE, this.codec.keepalive()); - match size { + match body.size() { BodySize::None | BodySize::Empty => this.state.set(State::None), _ => this.state.set(State::SendPayload(body)), }; Ok(()) } + fn send_continue(self: Pin<&mut Self>) { + self.project() + .write_buf + .extend_from_slice(b"HTTP/1.1 100 Continue\r\n\r\n"); + } + fn poll_response( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Result { loop { let mut this = self.as_mut().project(); - match this.state.as_mut().project() { - // no future is in InnerDispatcher state. pop next message. + // state is not changed on Poll::Pending. + // other variant and conditions always trigger a state change(or an error). + let state_change = match this.state.project() { StateProj::None => match this.messages.pop_front() { - // handle request message. Some(DispatcherMessage::Item(req)) => { - // Handle `EXPECT: 100-Continue` header - if req.head().expect() { - // set InnerDispatcher state and continue loop to poll it. - let task = this.flow.expect.call(req); - this.state.set(State::ExpectCall(task)); - } else { - // the same as expect call. - let task = this.flow.service.call(req); - this.state.set(State::ServiceCall(task)); - }; + self.as_mut().handle_request(req, cx)?; + true } - // handle error message. Some(DispatcherMessage::Error(res)) => { - // send_response would update InnerDispatcher state to SendPayload or - // None(If response body is empty). - // continue loop to poll it. self.as_mut() .send_response(res, ResponseBody::Other(Body::Empty))?; + true } - // return with upgrade request and poll it exclusively. Some(DispatcherMessage::Upgrade(req)) => { return Ok(PollResponse::Upgrade(req)); } - // all messages are dealt with. - None => return Ok(PollResponse::DoNothing), + None => false, }, - StateProj::ServiceCall(fut) => match fut.poll(cx) { - // service call resolved. send response. - Poll::Ready(Ok(res)) => { - let (res, body) = res.into().replace_body(()); - self.as_mut().send_response(res, body)?; + StateProj::ExpectCall(fut) => match fut.poll(cx) { + Poll::Ready(Ok(req)) => { + self.as_mut().send_continue(); + this = self.as_mut().project(); + let fut = this.flow.service.call(req); + this.state.set(State::ServiceCall(fut)); + continue; } - // send service call error as response Poll::Ready(Err(e)) => { let res: Response = e.into().into(); let (res, body) = res.replace_body(()); self.as_mut().send_response(res, body.into_body())?; + true } - // service call pending and could be waiting for more chunk messages. - // (pipeline message limit and/or payload can_read limit) - Poll::Pending => { - // no new message is decoded and no new payload is feed. - // nothing to do except waiting for new incoming data from client. - if !self.as_mut().poll_request(cx)? { - return Ok(PollResponse::DoNothing); - } - // otherwise keep loop. + Poll::Pending => false, + }, + StateProj::ServiceCall(fut) => match fut.poll(cx) { + Poll::Ready(Ok(res)) => { + let (res, body) = res.into().replace_body(()); + self.as_mut().send_response(res, body)?; + continue; } + Poll::Ready(Err(e)) => { + let res: Response = e.into().into(); + let (res, body) = res.replace_body(()); + self.as_mut().send_response(res, body.into_body())?; + true + } + Poll::Pending => false, }, StateProj::SendPayload(mut stream) => { - // keep populate writer buffer until buffer size limit hit, - // get blocked or finished. loop { if this.write_buf.len() < HW_BUFFER_SIZE { match stream.as_mut().poll_next(cx) { @@ -402,60 +411,50 @@ where Message::Chunk(Some(item)), &mut this.write_buf, )?; + continue; } Poll::Ready(None) => { this.codec.encode( Message::Chunk(None), &mut this.write_buf, )?; - // payload stream finished. - // break and goes out of scope of borrowed stream. - break; + this = self.as_mut().project(); + this.state.set(State::None); } - Poll::Ready(Some(Err(e))) => { - return Err(DispatchError::Service(e)) + Poll::Ready(Some(Err(_))) => { + return Err(DispatchError::Unknown) } - // Payload Stream Pending should only be given when the caller - // promise to wake it up properly. - // - // TODO: Think if it's an good idea to mix in a self wake up. - // It would turn dispatcher into a busy polling style of stream - // handling. (Or another timer as source of scheduled wake up) - // As There is no way to know when or how the caller would wake - // up the stream so a self wake up is an overhead that would - // result in a double polling(or an extra timer) Poll::Pending => return Ok(PollResponse::DoNothing), } } else { - // buffer is beyond max size. - // return and write the whole buffer to io stream. return Ok(PollResponse::DrainWriteBuf); } + break; } - // break from Poll::Ready(None) on stream finished. - // this is for re borrow InnerDispatcher state and set it to None. - this.state.set(State::None); + continue; } - StateProj::ExpectCall(fut) => match fut.poll(cx) { - // expect resolved. write continue to buffer and set InnerDispatcher state - // to service call. - Poll::Ready(Ok(req)) => { - this.write_buf - .extend_from_slice(b"HTTP/1.1 100 Continue\r\n\r\n"); - let fut = this.flow.service.call(req); - this.state.set(State::ServiceCall(fut)); + }; + + // state is changed and continue when the state is not Empty + if state_change { + if !self.state.is_empty() { + continue; + } + } else { + // if read-backpressure is enabled and we consumed some data. + // we may read more data and retry + if self.state.is_call() { + if self.as_mut().poll_request(cx)? { + continue; } - // send expect error as response - Poll::Ready(Err(e)) => { - let res: Response = e.into().into(); - let (res, body) = res.replace_body(()); - self.as_mut().send_response(res, body.into_body())?; - } - // expect must be solved before progress can be made. - Poll::Pending => return Ok(PollResponse::DoNothing), - }, + } else if !self.messages.is_empty() { + continue; + } } + break; } + + Ok(PollResponse::DoNothing) } fn handle_request( @@ -463,28 +462,52 @@ where req: Request, cx: &mut Context<'_>, ) -> Result<(), DispatchError> { - let mut this = self.as_mut().project(); - // Handle `EXPECT: 100-Continue` header if req.head().expect() { - // set InnerDispatcher state so the future is pinned. + // set dispatcher state so the future is pinned. + let mut this = self.as_mut().project(); let task = this.flow.expect.call(req); this.state.set(State::ExpectCall(task)); } else { // the same as above. + let mut this = self.as_mut().project(); let task = this.flow.service.call(req); this.state.set(State::ServiceCall(task)); }; // eagerly poll the future for once(or twice if expect is resolved immediately). loop { - match this.state.as_mut().project() { + match self.as_mut().project().state.project() { + StateProj::ExpectCall(fut) => { + match fut.poll(cx) { + // expect is resolved. continue loop and poll the service call branch. + Poll::Ready(Ok(req)) => { + self.as_mut().send_continue(); + let mut this = self.as_mut().project(); + let task = this.flow.service.call(req); + this.state.set(State::ServiceCall(task)); + continue; + } + // future is pending. return Ok(()) to notify that a new state is + // set and the outer loop should be continue. + Poll::Pending => return Ok(()), + // future is error. send response and return a result. On success + // to notify the dispatcher a new state is set and the outer loop + // should be continue. + Poll::Ready(Err(e)) => { + let e = e.into(); + let res: Response = e.into(); + let (res, body) = res.replace_body(()); + return self.send_response(res, body.into_body()); + } + } + } StateProj::ServiceCall(fut) => { // return no matter the service call future's result. return match fut.poll(cx) { // future is resolved. send response and return a result. On success - // to notify the dispatcher a new InnerDispatcher state is set and the - // outer loop should be continue. + // to notify the dispatcher a new state is set and the outer loop + // should be continue. Poll::Ready(Ok(res)) => { let (res, body) = res.into().replace_body(()); self.send_response(res, body) @@ -499,28 +522,6 @@ where } }; } - StateProj::ExpectCall(fut) => { - match fut.poll(cx) { - // expect is resolved. continue loop and poll the service call branch. - Poll::Ready(Ok(req)) => { - this.write_buf - .extend_from_slice(b"HTTP/1.1 100 Continue\r\n\r\n"); - let task = this.flow.service.call(req); - this.state.as_mut().set(State::ServiceCall(task)); - } - // future is pending. return Ok(()) to notify that a new InnerDispatcher - // state is set and the outer loop should be continue. - Poll::Pending => return Ok(()), - // future is error. send response and return a result. On success - // to notify the dispatcher a new InnerDispatcher state is set and - // the outer loop should be continue. - Poll::Ready(Err(e)) => { - let res: Response = e.into().into(); - let (res, body) = res.replace_body(()); - return self.send_response(res, body.into_body()); - } - } - } _ => unreachable!( "State must be set to ServiceCall or ExceptCall in handle_request" ), @@ -547,39 +548,26 @@ where this.flags.insert(Flags::STARTED); match msg { - // handle new request. Message::Item(mut req) => { + let pl = this.codec.message_type(); req.head_mut().peer_addr = *this.peer_addr; // merge on_connect_ext data into request extensions this.on_connect_data.merge_into(&mut req); - match this.codec.message_type() { - // break when upgrade received. - // existing buffer and io stream would be handled by framed - // after upgrade success. - MessageType::Stream if this.flow.upgrade.is_some() => { - this.messages - .push_back(DispatcherMessage::Upgrade(req)); - break; - } - // construct request and payload. - MessageType::Payload | MessageType::Stream => { - // PayloadSender and Payload are smart pointers share the same - // state. Payload is pass to Request and handed to service - // for extracting state. PayloadSender is held by dispatcher - // to push new data/error to state. - let (ps, pl) = Payload::create(false); - let (req1, _) = - req.replace_payload(crate::Payload::H1(pl)); - req = req1; - *this.payload = Some(ps); - } - // Ignore empty payload. - MessageType::None => {} + if pl == MessageType::Stream && this.flow.upgrade.is_some() { + this.messages.push_back(DispatcherMessage::Upgrade(req)); + break; + } + if pl == MessageType::Payload || pl == MessageType::Stream { + let (ps, pl) = Payload::create(false); + let (req1, _) = + req.replace_payload(crate::Payload::H1(pl)); + req = req1; + *this.payload = Some(ps); } - // handle request early if no future lives in InnerDispatcher state. + // handle request early if this.state.is_empty() { self.as_mut().handle_request(req, cx)?; this = self.as_mut().project(); @@ -587,60 +575,54 @@ where this.messages.push_back(DispatcherMessage::Item(req)); } } - Message::Chunk(Some(chunk)) => match this.payload { - Some(ref mut payload) => payload.feed_data(chunk), - None => { + Message::Chunk(Some(chunk)) => { + if let Some(ref mut payload) = this.payload { + payload.feed_data(chunk); + } else { error!( "Internal server error: unexpected payload chunk" ); - self.as_mut().response_error( + this.flags.insert(Flags::READ_DISCONNECT); + this.messages.push_back(DispatcherMessage::Error( Response::InternalServerError().finish().drop_body(), - DispatchError::InternalError, - ); - this = self.project(); + )); + *this.error = Some(DispatchError::InternalError); break; } - }, - Message::Chunk(None) => match this.payload.take() { - Some(mut payload) => payload.feed_eof(), - None => { + } + Message::Chunk(None) => { + if let Some(mut payload) = this.payload.take() { + payload.feed_eof(); + } else { error!("Internal server error: unexpected eof"); - self.as_mut().response_error( + this.flags.insert(Flags::READ_DISCONNECT); + this.messages.push_back(DispatcherMessage::Error( Response::InternalServerError().finish().drop_body(), - DispatchError::InternalError, - ); - this = self.project(); + )); + *this.error = Some(DispatchError::InternalError); break; } - }, + } } } Ok(None) => break, Err(ParseError::Io(e)) => { - self.as_mut().client_disconnected(e); + self.as_mut().client_disconnected(); this = self.as_mut().project(); - break; - } - // big size requests overflow should be responded with 413 - Err(ParseError::TooLarge) => { - if let Some(mut payload) = this.payload.take() { - payload.set_error(PayloadError::EncodingCorrupted); - } - self.as_mut().response_error( - Response::PayloadTooLarge().finish().drop_body(), - ParseError::TooLarge, - ); - this = self.project(); + *this.error = Some(DispatchError::Io(e)); break; } Err(e) => { if let Some(mut payload) = this.payload.take() { payload.set_error(PayloadError::EncodingCorrupted); } + // Malformed requests should be responded with 400 - self.as_mut() - .response_error(Response::BadRequest().finish().drop_body(), e); - this = self.project(); + this.messages.push_back(DispatcherMessage::Error( + Response::BadRequest().finish().drop_body(), + )); + this.flags.insert(Flags::READ_DISCONNECT); + *this.error = Some(e.into()); break; } } @@ -651,7 +633,6 @@ where *this.ka_expire = expire; } } - Ok(updated) } @@ -739,94 +720,6 @@ where } Ok(()) } - - /// Returns true when io stream can be disconnected after write to it. - /// - /// It covers these conditions: - /// - /// - `Flags::READ_DISCONNECT` flag active. - /// - `std::io::ErrorKind::ConnectionReset` after partial read. - /// - all data read done. - #[inline(always)] - fn read_available( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Result { - let this = self.project(); - - if this.flags.contains(Flags::READ_DISCONNECT) { - return Ok(true); - }; - - let mut io = Pin::new(this.io.as_mut().unwrap()); - - let mut read_some = false; - - loop { - // grow buffer if necessary. - let remaining = this.read_buf.capacity() - this.read_buf.len(); - if remaining < LW_BUFFER_SIZE { - this.read_buf.reserve(HW_BUFFER_SIZE - remaining); - } - - match actix_codec::poll_read_buf(io.as_mut(), cx, this.read_buf) { - Poll::Pending => return Ok(false), - Poll::Ready(Ok(n)) => { - if n == 0 { - return Ok(true); - } else { - // Return early when read buf exceed decoder's max buffer size. - if this.read_buf.len() >= super::decoder::MAX_BUFFER_SIZE { - // at this point it's not known io is still scheduled to - // be waked up. so force wake up dispatcher just in case. - // TODO: figure out the overhead. - cx.waker().wake_by_ref(); - return Ok(false); - } - - read_some = true; - } - } - Poll::Ready(Err(err)) => { - return if err.kind() == io::ErrorKind::WouldBlock { - Ok(false) - } else if err.kind() == io::ErrorKind::ConnectionReset && read_some { - Ok(true) - } else { - Err(DispatchError::Io(err)) - } - } - } - } - } - - /// call upgrade service with request. - fn upgrade(self: Pin<&mut Self>, req: Request) -> U::Future { - let this = self.project(); - let mut parts = FramedParts::with_read_buf( - this.io.take().unwrap(), - mem::take(this.codec), - mem::take(this.read_buf), - ); - parts.write_buf = mem::take(this.write_buf); - let framed = Framed::from_parts(parts); - this.flow.upgrade.as_ref().unwrap().call((req, framed)) - } - - /// response error handler. - fn response_error( - self: Pin<&mut Self>, - res: Response<()>, - err: impl Into, - ) { - let this = self.project(); - // set flag to read disconnect so no new data is read. - this.flags.insert(Flags::READ_DISCONNECT); - // add response to message and send back to client. - this.messages.push_back(DispatcherMessage::Error(res)); - // attach error for resolve dispatcher future with error. - *this.error = Some(err.into()); - } } impl Future for Dispatcher @@ -860,10 +753,9 @@ where if inner.flags.contains(Flags::WRITE_DISCONNECT) { Poll::Ready(Ok(())) } else { - // flush buffer. + // flush buffer inner.as_mut().poll_flush(cx)?; if !inner.write_buf.is_empty() { - // still have unfinished data. wait. Poll::Pending } else { Pin::new(inner.project().io.as_mut().unwrap()) @@ -872,46 +764,60 @@ where } } } else { - // read from io stream and fill read buffer. - let should_disconnect = inner.as_mut().read_available(cx)?; + // read socket into a buf + let should_disconnect = + if !inner.flags.contains(Flags::READ_DISCONNECT) { + let mut inner_p = inner.as_mut().project(); + read_available( + cx, + inner_p.io.as_mut().unwrap(), + &mut inner_p.read_buf, + )? + } else { + None + }; inner.as_mut().poll_request(cx)?; - - // io stream should to be closed. - if should_disconnect { - let inner = inner.as_mut().project(); - inner.flags.insert(Flags::READ_DISCONNECT); - if let Some(mut payload) = inner.payload.take() { + if let Some(true) = should_disconnect { + let inner_p = inner.as_mut().project(); + inner_p.flags.insert(Flags::READ_DISCONNECT); + if let Some(mut payload) = inner_p.payload.take() { payload.feed_eof(); } }; loop { - // grow buffer if necessary. - { - let inner = inner.as_mut().project(); - let remaining = - inner.write_buf.capacity() - inner.write_buf.len(); - if remaining < LW_BUFFER_SIZE { - inner.write_buf.reserve(HW_BUFFER_SIZE - remaining); - } + let inner_p = inner.as_mut().project(); + let remaining = + inner_p.write_buf.capacity() - inner_p.write_buf.len(); + if remaining < LW_BUFFER_SIZE { + inner_p.write_buf.reserve(HW_BUFFER_SIZE - remaining); } + let result = inner.as_mut().poll_response(cx)?; + let drain = result == PollResponse::DrainWriteBuf; - // poll_response and populate write buffer. - // drain indicate if write buffer should be emptied before next run. - let drain = match inner.as_mut().poll_response(cx)? { - PollResponse::DrainWriteBuf => true, - PollResponse::DoNothing => false, - // upgrade request and goes Upgrade variant of DispatcherState. - PollResponse::Upgrade(req) => { - let upgrade = inner.upgrade(req); - self.as_mut() - .project() - .inner - .set(DispatcherState::Upgrade(upgrade)); - return self.poll(cx); - } - }; + // switch to upgrade handler + if let PollResponse::Upgrade(req) = result { + let inner_p = inner.as_mut().project(); + let mut parts = FramedParts::with_read_buf( + inner_p.io.take().unwrap(), + mem::take(inner_p.codec), + mem::take(inner_p.read_buf), + ); + parts.write_buf = mem::take(inner_p.write_buf); + let framed = Framed::from_parts(parts); + let upgrade = inner_p + .flow + .upgrade + .as_ref() + .unwrap() + .call((req, framed)); + self.as_mut() + .project() + .inner + .set(DispatcherState::Upgrade(upgrade)); + return self.poll(cx); + } // we didn't get WouldBlock from write operation, // so data get written to kernel completely (macOS) @@ -929,29 +835,28 @@ where return Poll::Ready(Ok(())); } - // check if still have unsolved future in InnerDispatcher state. let is_empty = inner.state.is_empty(); - let inner = inner.as_mut().project(); + let inner_p = inner.as_mut().project(); // read half is closed and we do not processing any responses - if inner.flags.contains(Flags::READ_DISCONNECT) && is_empty { - inner.flags.insert(Flags::SHUTDOWN); + if inner_p.flags.contains(Flags::READ_DISCONNECT) && is_empty { + inner_p.flags.insert(Flags::SHUTDOWN); } // keep-alive and stream errors - if is_empty && inner.write_buf.is_empty() { - if let Some(err) = inner.error.take() { + if is_empty && inner_p.write_buf.is_empty() { + if let Some(err) = inner_p.error.take() { Poll::Ready(Err(err)) } // disconnect if keep-alive is not enabled - else if inner.flags.contains(Flags::STARTED) - && !inner.flags.intersects(Flags::KEEPALIVE) + else if inner_p.flags.contains(Flags::STARTED) + && !inner_p.flags.intersects(Flags::KEEPALIVE) { - inner.flags.insert(Flags::SHUTDOWN); + inner_p.flags.insert(Flags::SHUTDOWN); self.poll(cx) } // disconnect if shutdown - else if inner.flags.contains(Flags::SHUTDOWN) { + else if inner_p.flags.contains(Flags::SHUTDOWN) { self.poll(cx) } else { Poll::Pending @@ -969,6 +874,61 @@ where } } +/// Returns either: +/// - `Ok(Some(true))` - data was read and done reading all data. +/// - `Ok(Some(false))` - data was read but there should be more to read. +/// - `Ok(None)` - no data was read but there should be more to read later. +/// - Unhandled Errors +fn read_available( + cx: &mut Context<'_>, + io: &mut T, + buf: &mut BytesMut, +) -> Result, io::Error> +where + T: AsyncRead + Unpin, +{ + let mut read_some = false; + + loop { + // If buf is full return but do not disconnect since + // there is more reading to be done + if buf.len() >= HW_BUFFER_SIZE { + return Ok(Some(false)); + } + + let remaining = buf.capacity() - buf.len(); + if remaining < LW_BUFFER_SIZE { + buf.reserve(HW_BUFFER_SIZE - remaining); + } + + match actix_codec::poll_read_buf(Pin::new(io), cx, buf) { + Poll::Pending => { + return if read_some { Ok(Some(false)) } else { Ok(None) }; + } + Poll::Ready(Ok(n)) => { + if n == 0 { + return Ok(Some(true)); + } else { + read_some = true; + } + } + Poll::Ready(Err(err)) => { + return if err.kind() == io::ErrorKind::WouldBlock { + if read_some { + Ok(Some(false)) + } else { + Ok(None) + } + } else if err.kind() == io::ErrorKind::ConnectionReset && read_some { + Ok(Some(true)) + } else { + Err(err) + } + } + } + } +} + #[cfg(test)] mod tests { use std::str;