diff --git a/actix-http/src/h1/dispatcher.rs b/actix-http/src/h1/dispatcher.rs index b53e96fa3..25dca5bcf 100644 --- a/actix-http/src/h1/dispatcher.rs +++ b/actix-http/src/h1/dispatcher.rs @@ -21,7 +21,7 @@ use crate::{ config::ServiceConfig, error::{DispatchError, ParseError, PayloadError}, service::HttpFlow, - Error, Extensions, OnConnectData, Request, Response, StatusCode, + ConnectionType, Error, Extensions, OnConnectData, Request, Response, StatusCode, }; use super::{ @@ -151,7 +151,7 @@ pin_project! { error: Option, #[pin] - state: State, + pub(super) state: State, // when Some(_) dispatcher is in state of receiving request payload payload: Option, messages: VecDeque, @@ -175,7 +175,7 @@ enum DispatcherMessage { pin_project! { #[project = StateProj] - enum State + pub(super) enum State where S: Service, X: Service, @@ -195,7 +195,7 @@ where X: Service, B: MessageBody, { - fn is_none(&self) -> bool { + pub(super) fn is_none(&self) -> bool { matches!(self, State::None) } } @@ -700,7 +700,7 @@ where if let Some(sender) = &this.payload { // ...maybe handler does not want to read any more payload... if let PayloadStatus::Dropped = sender.need_read(cx) { - log::warn!("handler dropped payload early"); + log::debug!("handler dropped payload early; attempt to clean connection"); // ...in which case poll request payload a few times loop { match this.codec.decode(this.read_buf)? { @@ -714,6 +714,8 @@ where // connection is in clean state for next request Message::Chunk(None) => { + log::debug!("connection successfully cleaned"); + // reset dispatcher state let _ = this.payload.take(); this.state.set(State::None); @@ -734,10 +736,16 @@ where // not enough info to decide if connection is going to be clean or not None => { log::error!( - "handler did not read whole payload; closing connection" + "handler did not read whole payload and dispatcher could not \ + drain read buf; return 500 and close connection" ); - return Err(DispatchError::HandlerDroppedPayload); + this.flags.insert(Flags::SHUTDOWN); + let mut res = Response::internal_server_error().drop_body(); + res.head_mut().set_connection_type(ConnectionType::Close); + this.messages.push_back(DispatcherMessage::Error(res)); + *this.error = Some(DispatchError::HandlerDroppedPayload); + return Ok(true); } } } diff --git a/actix-http/src/h1/dispatcher_tests.rs b/actix-http/src/h1/dispatcher_tests.rs index 891cce69c..40454d45a 100644 --- a/actix-http/src/h1/dispatcher_tests.rs +++ b/actix-http/src/h1/dispatcher_tests.rs @@ -1,6 +1,6 @@ use std::{future::Future, str, task::Poll, time::Duration}; -use actix_rt::time::sleep; +use actix_rt::{pin, time::sleep}; use actix_service::fn_service; use actix_utils::future::{ready, Ready}; use bytes::Bytes; @@ -53,6 +53,14 @@ fn echo_path_service( }) } +fn drop_payload_service( +) -> impl Service, Error = Error> { + fn_service(|mut req: Request| async move { + let _ = req.take_payload(); + Ok::<_, Error>(Response::with_body(StatusCode::OK, "payload dropped")) + }) +} + fn echo_payload_service() -> impl Service, Error = Error> { fn_service(|mut req: Request| { Box::pin(async move { @@ -89,7 +97,7 @@ async fn late_request() { None, OnConnectData::default(), ); - actix_rt::pin!(h1); + pin!(h1); lazy(|cx| { assert!(matches!(&h1.inner, DispatcherState::Normal { .. })); @@ -156,7 +164,7 @@ async fn oneshot_connection() { None, OnConnectData::default(), ); - actix_rt::pin!(h1); + pin!(h1); lazy(|cx| { assert!(matches!(&h1.inner, DispatcherState::Normal { .. })); @@ -173,13 +181,16 @@ async fn oneshot_connection() { stabilize_date_header(&mut res); let res = &res[..]; - let exp = b"\ - HTTP/1.1 200 OK\r\n\ - content-length: 5\r\n\ - connection: close\r\n\ - date: Thu, 01 Jan 1970 12:34:56 UTC\r\n\r\n\ - /abcd\ - "; + let exp = http_msg( + r" + HTTP/1.1 200 OK + content-length: 5 + connection: close + date: Thu, 01 Jan 1970 12:34:56 UTC + + /abcd + ", + ); assert_eq!( res, @@ -188,7 +199,7 @@ async fn oneshot_connection() { response: {:?}\n\ expected: {:?}", String::from_utf8_lossy(res), - String::from_utf8_lossy(exp) + String::from_utf8_lossy(&exp) ); }) .await; @@ -214,7 +225,7 @@ async fn keep_alive_timeout() { None, OnConnectData::default(), ); - actix_rt::pin!(h1); + pin!(h1); lazy(|cx| { assert!(matches!(&h1.inner, DispatcherState::Normal { .. })); @@ -293,7 +304,7 @@ async fn keep_alive_follow_up_req() { None, OnConnectData::default(), ); - actix_rt::pin!(h1); + pin!(h1); lazy(|cx| { assert!(matches!(&h1.inner, DispatcherState::Normal { .. })); @@ -413,7 +424,7 @@ async fn req_parse_err() { OnConnectData::default(), ); - actix_rt::pin!(h1); + pin!(h1); match h1.as_mut().poll(cx) { Poll::Pending => panic!(), @@ -459,7 +470,7 @@ async fn pipelining_ok_then_ok() { OnConnectData::default(), ); - actix_rt::pin!(h1); + pin!(h1); assert!(matches!(&h1.inner, DispatcherState::Normal { .. })); @@ -529,7 +540,7 @@ async fn pipelining_ok_then_bad() { OnConnectData::default(), ); - actix_rt::pin!(h1); + pin!(h1); assert!(matches!(&h1.inner, DispatcherState::Normal { .. })); @@ -601,7 +612,7 @@ async fn expect_handling() { ", ); - actix_rt::pin!(h1); + pin!(h1); assert!(h1.as_mut().poll(cx).is_pending()); assert!(matches!(&h1.inner, DispatcherState::Normal { .. })); @@ -678,7 +689,7 @@ async fn expect_eager() { ", ); - actix_rt::pin!(h1); + pin!(h1); assert!(h1.as_mut().poll(cx).is_ready()); assert!(matches!(&h1.inner, DispatcherState::Normal { .. })); @@ -761,7 +772,7 @@ async fn upgrade_handling() { ", ); - actix_rt::pin!(h1); + pin!(h1); assert!(h1.as_mut().poll(cx).is_ready()); assert!(matches!(&h1.inner, DispatcherState::Upgrade { .. })); @@ -771,3 +782,192 @@ async fn upgrade_handling() { }) .await; } + +#[actix_rt::test] +async fn handler_drop_payload() { + let _ = env_logger::try_init(); + + let mut buf = TestBuffer::new(http_msg( + r" + POST /drop-payload HTTP/1.1 + Content-Length: 3 + + abc + ", + )); + + let services = HttpFlow::new( + drop_payload_service(), + ExpectHandler, + None::, + ); + + let h1 = Dispatcher::new( + buf.clone(), + services, + ServiceConfig::default(), + None, + OnConnectData::default(), + ); + pin!(h1); + + lazy(|cx| { + assert!(h1.as_mut().poll(cx).is_pending()); + + // polls: manual + assert_eq!(h1.poll_count, 1); + + let mut res = BytesMut::from(buf.take_write_buf().as_ref()); + stabilize_date_header(&mut res); + let res = &res[..]; + + let exp = http_msg( + r" + HTTP/1.1 200 OK + content-length: 15 + date: Thu, 01 Jan 1970 12:34:56 UTC + + payload dropped + ", + ); + + assert_eq!( + res, + exp, + "\nexpected response not in write buffer:\n\ + response: {:?}\n\ + expected: {:?}", + String::from_utf8_lossy(res), + String::from_utf8_lossy(&exp) + ); + + if let DispatcherStateProj::Normal { inner } = h1.as_mut().project().inner.project() { + assert!(inner.state.is_none()); + } + }) + .await; + + lazy(|cx| { + // add message that claims to have payload longer than provided + buf.extend_read_buf(http_msg( + r" + POST /drop-payload HTTP/1.1 + Content-Length: 200 + + abc + ", + )); + + assert!(h1.as_mut().poll(cx).is_pending()); + + // polls: manual => manual + assert_eq!(h1.poll_count, 2); + + let mut res = BytesMut::from(buf.take_write_buf().as_ref()); + stabilize_date_header(&mut res); + let res = &res[..]; + + // expect response immediately even though request side has not finished reading payload + let exp = http_msg( + r" + HTTP/1.1 200 OK + content-length: 15 + date: Thu, 01 Jan 1970 12:34:56 UTC + + payload dropped + ", + ); + + assert_eq!( + res, + exp, + "\nexpected response not in write buffer:\n\ + response: {:?}\n\ + expected: {:?}", + String::from_utf8_lossy(res), + String::from_utf8_lossy(&exp) + ); + }) + .await; + + lazy(|cx| { + assert!(h1.as_mut().poll(cx).is_ready()); + + // polls: manual => manual => manual + assert_eq!(h1.poll_count, 3); + + let mut res = BytesMut::from(buf.take_write_buf().as_ref()); + stabilize_date_header(&mut res); + let res = &res[..]; + + // expect that unrequested error response is sent back since connection could not be cleaned + let exp = http_msg( + r" + HTTP/1.1 500 Internal Server Error + content-length: 0 + connection: close + date: Thu, 01 Jan 1970 12:34:56 UTC + + ", + ); + + assert_eq!( + res, + exp, + "\nexpected response not in write buffer:\n\ + response: {:?}\n\ + expected: {:?}", + String::from_utf8_lossy(res), + String::from_utf8_lossy(&exp) + ); + }) + .await; +} + +fn http_msg(msg: impl AsRef) -> BytesMut { + let mut msg = msg + .as_ref() + .trim() + .split('\n') + .into_iter() + .map(|line| [line.trim_start(), "\r"].concat()) + .collect::>() + .join("\n"); + + // remove trailing \r + msg.pop(); + + if !msg.is_empty() && !msg.contains("\r\n\r\n") { + msg.push_str("\r\n\r\n"); + } + + BytesMut::from(msg.as_bytes()) +} + +#[test] +fn http_msg_creates_msg() { + assert_eq!(http_msg(r""), ""); + + assert_eq!( + http_msg( + r" + POST / HTTP/1.1 + Content-Length: 3 + + abc + " + ), + "POST / HTTP/1.1\r\nContent-Length: 3\r\n\r\nabc" + ); + + assert_eq!( + http_msg( + r" + GET / HTTP/1.1 + Content-Length: 3 + + " + ), + "GET / HTTP/1.1\r\nContent-Length: 3\r\n\r\n" + ); +}