From 104b10c9a6ee2c5e19c240fdba537f8aea387d12 Mon Sep 17 00:00:00 2001 From: Rob Ede Date: Wed, 2 Feb 2022 20:07:38 +0000 Subject: [PATCH] fix stuck connection when handler doesn't read payload - drops connection if dispatcher's state indicated there are outstanding bytes - otherwise clean dispatcher ready for next pipelined or keep-alive request --- actix-http/CHANGES.md | 8 +++++ actix-http/src/error.rs | 5 +++ actix-http/src/h1/codec.rs | 2 ++ actix-http/src/h1/decoder.rs | 8 +++-- actix-http/src/h1/dispatcher.rs | 55 ++++++++++++++++++++++++++++++++- 5 files changed, 74 insertions(+), 4 deletions(-) diff --git a/actix-http/CHANGES.md b/actix-http/CHANGES.md index 7f7af23a8..e407c397b 100644 --- a/actix-http/CHANGES.md +++ b/actix-http/CHANGES.md @@ -1,6 +1,14 @@ # Changes ## Unreleased - 2021-xx-xx +### Changed +- `error::DispatcherError` enum is now marked `#[non_exhaustive]`. [#????] + + +### Fixed +- Issue where handlers that took payload but then dropped without reading it to EOF it would cause keep-alive connections to become stuck. [#????] + +[#2611]: https://github.com/actix/actix-web/pull/2611 ## 3.0.0-rc.1 - 2022-01-31 diff --git a/actix-http/src/error.rs b/actix-http/src/error.rs index 841322861..52b953421 100644 --- a/actix-http/src/error.rs +++ b/actix-http/src/error.rs @@ -340,6 +340,7 @@ impl From for Error { /// A set of errors that can occur during dispatching HTTP requests. #[derive(Debug, Display, From)] +#[non_exhaustive] pub enum DispatchError { /// Service error. #[display(fmt = "Service Error")] @@ -373,6 +374,10 @@ pub enum DispatchError { #[display(fmt = "Connection shutdown timeout")] DisconnectTimeout, + /// Handler dropped payload before reading EOF. + #[display(fmt = "Handler dropped payload before reading EOF")] + HandlerDroppedPayload, + /// Internal error. #[display(fmt = "Internal error")] InternalError, diff --git a/actix-http/src/h1/codec.rs b/actix-http/src/h1/codec.rs index df74bcc42..80afd7455 100644 --- a/actix-http/src/h1/codec.rs +++ b/actix-http/src/h1/codec.rs @@ -125,11 +125,13 @@ impl Decoder for Codec { self.flags.set(Flags::HEAD, head.method == Method::HEAD); self.version = head.version; self.conn_type = head.connection_type(); + if self.conn_type == ConnectionType::KeepAlive && !self.flags.contains(Flags::KEEP_ALIVE_ENABLED) { self.conn_type = ConnectionType::Close } + match payload { PayloadType::None => self.payload = None, PayloadType::Payload(pl) => self.payload = Some(pl), diff --git a/actix-http/src/h1/decoder.rs b/actix-http/src/h1/decoder.rs index fa924f920..17b9b695c 100644 --- a/actix-http/src/h1/decoder.rs +++ b/actix-http/src/h1/decoder.rs @@ -209,15 +209,16 @@ impl MessageType for Request { let (len, method, uri, ver, h_len) = { // SAFETY: - // Create an uninitialized array of `MaybeUninit`. The `assume_init` is - // safe because the type we are claiming to have initialized here is a - // bunch of `MaybeUninit`s, which do not require initialization. + // Create an uninitialized array of `MaybeUninit`. The `assume_init` is safe because the + // type we are claiming to have initialized here is a bunch of `MaybeUninit`s, which + // do not require initialization. let mut parsed = unsafe { MaybeUninit::<[MaybeUninit>; MAX_HEADERS]>::uninit() .assume_init() }; let mut req = httparse::Request::new(&mut []); + match req.parse_with_uninit_headers(src, &mut parsed)? { httparse::Status::Complete(len) => { let method = Method::from_bytes(req.method.unwrap().as_bytes()) @@ -232,6 +233,7 @@ impl MessageType for Request { (len, method, uri, version, req.headers.len()) } + httparse::Status::Partial => { return if src.len() >= MAX_BUFFER_SIZE { trace!("MAX_BUFFER_SIZE unprocessed data reached, closing"); diff --git a/actix-http/src/h1/dispatcher.rs b/actix-http/src/h1/dispatcher.rs index 3f327171d..b53e96fa3 100644 --- a/actix-http/src/h1/dispatcher.rs +++ b/actix-http/src/h1/dispatcher.rs @@ -152,6 +152,7 @@ pin_project! { #[pin] state: State, + // when Some(_) dispatcher is in state of receiving request payload payload: Option, messages: VecDeque, @@ -686,12 +687,64 @@ where let can_not_read = !self.can_read(cx); // limit amount of non-processed requests - if pipeline_queue_full || can_not_read { + if pipeline_queue_full { return Ok(false); } let mut this = self.as_mut().project(); + if can_not_read { + log::debug!("cannot read request payload"); + + // if we cannot read request payload... + if let Some(sender) = &this.payload { + // ...maybe handler does not want to read any more payload... + if let PayloadStatus::Dropped = sender.need_read(cx) { + log::warn!("handler dropped payload early"); + // ...in which case poll request payload a few times + loop { + match this.codec.decode(this.read_buf)? { + Some(msg) => { + match msg { + // payload decoded did not yield EOF yet + Message::Chunk(Some(_)) => { + // if non-clean connection, next loop iter will detect empty + // read buffer and close connection + } + + // connection is in clean state for next request + Message::Chunk(None) => { + // reset dispatcher state + let _ = this.payload.take(); + this.state.set(State::None); + + // break out of payload decode loop + break; + } + + // Either whole payload is read and loop is broken or more data + // was expected in which case connection is closed. In both + // situations dispatcher cannot get here. + Message::Item(_) => { + unreachable!("dispatcher is in payload receive state") + } + } + } + + // not enough info to decide if connection is going to be clean or not + None => { + log::error!( + "handler did not read whole payload; closing connection" + ); + + return Err(DispatchError::HandlerDroppedPayload); + } + } + } + } + } + } + let mut updated = false; loop {