fix stuck connection when handler doesn't read payload

- drops connection if dispatcher's state indicated there are outstanding bytes
- otherwise clean dispatcher ready for next pipelined or keep-alive request
This commit is contained in:
Rob Ede 2022-02-02 20:07:38 +00:00
parent 075df88a07
commit 104b10c9a6
No known key found for this signature in database
GPG Key ID: 97C636207D3EF933
5 changed files with 74 additions and 4 deletions

View File

@ -1,6 +1,14 @@
# Changes # Changes
## Unreleased - 2021-xx-xx ## Unreleased - 2021-xx-xx
### Changed
- `error::DispatcherError` enum is now marked `#[non_exhaustive]`. [#????]
### Fixed
- Issue where handlers that took payload but then dropped without reading it to EOF it would cause keep-alive connections to become stuck. [#????]
[#2611]: https://github.com/actix/actix-web/pull/2611
## 3.0.0-rc.1 - 2022-01-31 ## 3.0.0-rc.1 - 2022-01-31

View File

@ -340,6 +340,7 @@ impl From<PayloadError> for Error {
/// A set of errors that can occur during dispatching HTTP requests. /// A set of errors that can occur during dispatching HTTP requests.
#[derive(Debug, Display, From)] #[derive(Debug, Display, From)]
#[non_exhaustive]
pub enum DispatchError { pub enum DispatchError {
/// Service error. /// Service error.
#[display(fmt = "Service Error")] #[display(fmt = "Service Error")]
@ -373,6 +374,10 @@ pub enum DispatchError {
#[display(fmt = "Connection shutdown timeout")] #[display(fmt = "Connection shutdown timeout")]
DisconnectTimeout, DisconnectTimeout,
/// Handler dropped payload before reading EOF.
#[display(fmt = "Handler dropped payload before reading EOF")]
HandlerDroppedPayload,
/// Internal error. /// Internal error.
#[display(fmt = "Internal error")] #[display(fmt = "Internal error")]
InternalError, InternalError,

View File

@ -125,11 +125,13 @@ impl Decoder for Codec {
self.flags.set(Flags::HEAD, head.method == Method::HEAD); self.flags.set(Flags::HEAD, head.method == Method::HEAD);
self.version = head.version; self.version = head.version;
self.conn_type = head.connection_type(); self.conn_type = head.connection_type();
if self.conn_type == ConnectionType::KeepAlive if self.conn_type == ConnectionType::KeepAlive
&& !self.flags.contains(Flags::KEEP_ALIVE_ENABLED) && !self.flags.contains(Flags::KEEP_ALIVE_ENABLED)
{ {
self.conn_type = ConnectionType::Close self.conn_type = ConnectionType::Close
} }
match payload { match payload {
PayloadType::None => self.payload = None, PayloadType::None => self.payload = None,
PayloadType::Payload(pl) => self.payload = Some(pl), PayloadType::Payload(pl) => self.payload = Some(pl),

View File

@ -209,15 +209,16 @@ impl MessageType for Request {
let (len, method, uri, ver, h_len) = { let (len, method, uri, ver, h_len) = {
// SAFETY: // SAFETY:
// Create an uninitialized array of `MaybeUninit`. The `assume_init` is // Create an uninitialized array of `MaybeUninit`. The `assume_init` is safe because the
// safe because the type we are claiming to have initialized here is a // type we are claiming to have initialized here is a bunch of `MaybeUninit`s, which
// bunch of `MaybeUninit`s, which do not require initialization. // do not require initialization.
let mut parsed = unsafe { let mut parsed = unsafe {
MaybeUninit::<[MaybeUninit<httparse::Header<'_>>; MAX_HEADERS]>::uninit() MaybeUninit::<[MaybeUninit<httparse::Header<'_>>; MAX_HEADERS]>::uninit()
.assume_init() .assume_init()
}; };
let mut req = httparse::Request::new(&mut []); let mut req = httparse::Request::new(&mut []);
match req.parse_with_uninit_headers(src, &mut parsed)? { match req.parse_with_uninit_headers(src, &mut parsed)? {
httparse::Status::Complete(len) => { httparse::Status::Complete(len) => {
let method = Method::from_bytes(req.method.unwrap().as_bytes()) let method = Method::from_bytes(req.method.unwrap().as_bytes())
@ -232,6 +233,7 @@ impl MessageType for Request {
(len, method, uri, version, req.headers.len()) (len, method, uri, version, req.headers.len())
} }
httparse::Status::Partial => { httparse::Status::Partial => {
return if src.len() >= MAX_BUFFER_SIZE { return if src.len() >= MAX_BUFFER_SIZE {
trace!("MAX_BUFFER_SIZE unprocessed data reached, closing"); trace!("MAX_BUFFER_SIZE unprocessed data reached, closing");

View File

@ -152,6 +152,7 @@ pin_project! {
#[pin] #[pin]
state: State<S, B, X>, state: State<S, B, X>,
// when Some(_) dispatcher is in state of receiving request payload
payload: Option<PayloadSender>, payload: Option<PayloadSender>,
messages: VecDeque<DispatcherMessage>, messages: VecDeque<DispatcherMessage>,
@ -686,12 +687,64 @@ where
let can_not_read = !self.can_read(cx); let can_not_read = !self.can_read(cx);
// limit amount of non-processed requests // limit amount of non-processed requests
if pipeline_queue_full || can_not_read { if pipeline_queue_full {
return Ok(false); return Ok(false);
} }
let mut this = self.as_mut().project(); let mut this = self.as_mut().project();
if can_not_read {
log::debug!("cannot read request payload");
// if we cannot read request payload...
if let Some(sender) = &this.payload {
// ...maybe handler does not want to read any more payload...
if let PayloadStatus::Dropped = sender.need_read(cx) {
log::warn!("handler dropped payload early");
// ...in which case poll request payload a few times
loop {
match this.codec.decode(this.read_buf)? {
Some(msg) => {
match msg {
// payload decoded did not yield EOF yet
Message::Chunk(Some(_)) => {
// if non-clean connection, next loop iter will detect empty
// read buffer and close connection
}
// connection is in clean state for next request
Message::Chunk(None) => {
// reset dispatcher state
let _ = this.payload.take();
this.state.set(State::None);
// break out of payload decode loop
break;
}
// Either whole payload is read and loop is broken or more data
// was expected in which case connection is closed. In both
// situations dispatcher cannot get here.
Message::Item(_) => {
unreachable!("dispatcher is in payload receive state")
}
}
}
// not enough info to decide if connection is going to be clean or not
None => {
log::error!(
"handler did not read whole payload; closing connection"
);
return Err(DispatchError::HandlerDroppedPayload);
}
}
}
}
}
}
let mut updated = false; let mut updated = false;
loop { loop {