diff --git a/actix-http/CHANGES.md b/actix-http/CHANGES.md index 71132c6b2..f9323cc72 100644 --- a/actix-http/CHANGES.md +++ b/actix-http/CHANGES.md @@ -1,6 +1,8 @@ # Changes -## Unreleased - 2021-xx-xx +## Unreleased - 2022-xx-xx +### Fixed +- Consume bytes from read buffer when `Payload` is dropped early. [#2764] ## 3.0.4 - 2022-03-09 diff --git a/actix-http/src/h1/dispatcher.rs b/actix-http/src/h1/dispatcher.rs index dea8a4beb..b0792f611 100644 --- a/actix-http/src/h1/dispatcher.rs +++ b/actix-http/src/h1/dispatcher.rs @@ -22,7 +22,7 @@ use crate::{ config::ServiceConfig, error::{DispatchError, ParseError, PayloadError}, service::HttpFlow, - ConnectionType, Error, Extensions, OnConnectData, Request, Response, StatusCode, + Error, Extensions, OnConnectData, Request, Response, StatusCode, }; use super::{ @@ -724,7 +724,7 @@ where this.state.set(State::None); // break out of payload decode loop - break; + return Ok(true); } // Either whole payload is read and loop is broken or more data @@ -736,18 +736,18 @@ where } } - // not enough info to decide if connection is going to be clean or not + // no bytes in the read buffer, but there are still bytes to be read + // according to the content-length header. The client has stopped + // sending data early. Reset the state, set disconnection flag, + // and stop reading. None => { - error!( - "handler did not read whole payload and dispatcher could not \ - drain read buf; return 500 and close connection" - ); - + debug!("client stopped sending data; disconnecting"); + // reset dispatcher state this.flags.insert(Flags::SHUTDOWN); - let mut res = Response::internal_server_error().drop_body(); - res.head_mut().set_connection_type(ConnectionType::Close); - this.messages.push_back(DispatcherMessage::Error(res)); - *this.error = Some(DispatchError::HandlerDroppedPayload); + let _ = this.payload.take(); + this.state.set(State::None); + + // break out of payload decode loop return Ok(true); } } @@ -1010,8 +1010,22 @@ where self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Result { - let this = self.project(); + let mut this = self.project(); + Self::read_available_projected(&mut this, cx) + } + /// Returns true when I/O stream can be disconnected after write to it. + /// Meant to be called when there is already access to a projected + /// `InnerDispatcher` available. + /// + /// It covers these conditions: + /// - `std::io::ErrorKind::ConnectionReset` after partial read; + /// - all data read done. + #[inline(always)] // TODO: bench this inline + fn read_available_projected( + this: &mut InnerDispatcherProj<'_, T, S, B, X, U>, + cx: &mut Context<'_>, + ) -> Result { if this.flags.contains(Flags::READ_DISCONNECT) { return Ok(false); }; diff --git a/actix-http/src/h1/dispatcher_tests.rs b/actix-http/src/h1/dispatcher_tests.rs index 40454d45a..ddb704093 100644 --- a/actix-http/src/h1/dispatcher_tests.rs +++ b/actix-http/src/h1/dispatcher_tests.rs @@ -783,6 +783,74 @@ async fn upgrade_handling() { .await; } +#[actix_rt::test] +async fn handler_drop_large_payload() { + let _ = env_logger::try_init(); + + const CONTENT_LENGTH: usize = 256 * 1024; + let content = str::from_utf8(&[b'x'; CONTENT_LENGTH]).unwrap(); + let buf = TestBuffer::new(http_msg(format!( + r" + POST /drop-payload HTTP/1.1 + Content-Length: {} + + {}", + CONTENT_LENGTH, content + ))); + + let services = HttpFlow::new( + drop_payload_service(), + ExpectHandler, + None::, + ); + + let h1 = Dispatcher::new( + buf.clone(), + services, + ServiceConfig::default(), + None, + OnConnectData::default(), + ); + pin!(h1); + + lazy(|cx| { + assert!(h1.as_mut().poll(cx).is_pending()); + assert!(h1.as_mut().poll(cx).is_pending()); + + // polls: manual + assert_eq!(h1.poll_count, 2); + + let mut res = BytesMut::from(buf.take_write_buf().as_ref()); + stabilize_date_header(&mut res); + let res = &res[..]; + + let exp = http_msg( + r" + HTTP/1.1 200 OK + content-length: 15 + date: Thu, 01 Jan 1970 12:34:56 UTC + + payload dropped + ", + ); + + assert_eq!( + res, + exp, + "\nexpected response not in write buffer:\n\ + response: {:?}\n\ + expected: {:?}", + String::from_utf8_lossy(res), + String::from_utf8_lossy(&exp) + ); + + if let DispatcherStateProj::Normal { inner } = h1.as_mut().project().inner.project() { + assert!(inner.state.is_none()); + } + }) + .await; +} + #[actix_rt::test] async fn handler_drop_payload() { let _ = env_logger::try_init();