add drop payload tests

This commit is contained in:
Rob Ede 2022-02-03 05:28:11 +00:00
parent a7671c30c2
commit 0b3fa41234
No known key found for this signature in database
GPG Key ID: 97C636207D3EF933
2 changed files with 234 additions and 26 deletions

View File

@ -21,7 +21,7 @@ use crate::{
config::ServiceConfig,
error::{DispatchError, ParseError, PayloadError},
service::HttpFlow,
Error, Extensions, OnConnectData, Request, Response, StatusCode,
ConnectionType, Error, Extensions, OnConnectData, Request, Response, StatusCode,
};
use super::{
@ -151,7 +151,7 @@ pin_project! {
error: Option<DispatchError>,
#[pin]
state: State<S, B, X>,
pub(super) state: State<S, B, X>,
// when Some(_) dispatcher is in state of receiving request payload
payload: Option<PayloadSender>,
messages: VecDeque<DispatcherMessage>,
@ -175,7 +175,7 @@ enum DispatcherMessage {
pin_project! {
#[project = StateProj]
enum State<S, B, X>
pub(super) enum State<S, B, X>
where
S: Service<Request>,
X: Service<Request, Response = Request>,
@ -195,7 +195,7 @@ where
X: Service<Request, Response = Request>,
B: MessageBody,
{
fn is_none(&self) -> bool {
pub(super) fn is_none(&self) -> bool {
matches!(self, State::None)
}
}
@ -700,7 +700,7 @@ where
if let Some(sender) = &this.payload {
// ...maybe handler does not want to read any more payload...
if let PayloadStatus::Dropped = sender.need_read(cx) {
log::warn!("handler dropped payload early");
log::debug!("handler dropped payload early; attempt to clean connection");
// ...in which case poll request payload a few times
loop {
match this.codec.decode(this.read_buf)? {
@ -714,6 +714,8 @@ where
// connection is in clean state for next request
Message::Chunk(None) => {
log::debug!("connection successfully cleaned");
// reset dispatcher state
let _ = this.payload.take();
this.state.set(State::None);
@ -734,10 +736,16 @@ where
// not enough info to decide if connection is going to be clean or not
None => {
log::error!(
"handler did not read whole payload; closing connection"
"handler did not read whole payload and dispatcher could not \
drain read buf; return 500 and close connection"
);
return Err(DispatchError::HandlerDroppedPayload);
this.flags.insert(Flags::SHUTDOWN);
let mut res = Response::internal_server_error().drop_body();
res.head_mut().set_connection_type(ConnectionType::Close);
this.messages.push_back(DispatcherMessage::Error(res));
*this.error = Some(DispatchError::HandlerDroppedPayload);
return Ok(true);
}
}
}

View File

@ -1,6 +1,6 @@
use std::{future::Future, str, task::Poll, time::Duration};
use actix_rt::time::sleep;
use actix_rt::{pin, time::sleep};
use actix_service::fn_service;
use actix_utils::future::{ready, Ready};
use bytes::Bytes;
@ -53,6 +53,14 @@ fn echo_path_service(
})
}
fn drop_payload_service(
) -> impl Service<Request, Response = Response<&'static str>, Error = Error> {
fn_service(|mut req: Request| async move {
let _ = req.take_payload();
Ok::<_, Error>(Response::with_body(StatusCode::OK, "payload dropped"))
})
}
fn echo_payload_service() -> impl Service<Request, Response = Response<Bytes>, Error = Error> {
fn_service(|mut req: Request| {
Box::pin(async move {
@ -89,7 +97,7 @@ async fn late_request() {
None,
OnConnectData::default(),
);
actix_rt::pin!(h1);
pin!(h1);
lazy(|cx| {
assert!(matches!(&h1.inner, DispatcherState::Normal { .. }));
@ -156,7 +164,7 @@ async fn oneshot_connection() {
None,
OnConnectData::default(),
);
actix_rt::pin!(h1);
pin!(h1);
lazy(|cx| {
assert!(matches!(&h1.inner, DispatcherState::Normal { .. }));
@ -173,13 +181,16 @@ async fn oneshot_connection() {
stabilize_date_header(&mut res);
let res = &res[..];
let exp = b"\
HTTP/1.1 200 OK\r\n\
content-length: 5\r\n\
connection: close\r\n\
date: Thu, 01 Jan 1970 12:34:56 UTC\r\n\r\n\
/abcd\
";
let exp = http_msg(
r"
HTTP/1.1 200 OK
content-length: 5
connection: close
date: Thu, 01 Jan 1970 12:34:56 UTC
/abcd
",
);
assert_eq!(
res,
@ -188,7 +199,7 @@ async fn oneshot_connection() {
response: {:?}\n\
expected: {:?}",
String::from_utf8_lossy(res),
String::from_utf8_lossy(exp)
String::from_utf8_lossy(&exp)
);
})
.await;
@ -214,7 +225,7 @@ async fn keep_alive_timeout() {
None,
OnConnectData::default(),
);
actix_rt::pin!(h1);
pin!(h1);
lazy(|cx| {
assert!(matches!(&h1.inner, DispatcherState::Normal { .. }));
@ -293,7 +304,7 @@ async fn keep_alive_follow_up_req() {
None,
OnConnectData::default(),
);
actix_rt::pin!(h1);
pin!(h1);
lazy(|cx| {
assert!(matches!(&h1.inner, DispatcherState::Normal { .. }));
@ -413,7 +424,7 @@ async fn req_parse_err() {
OnConnectData::default(),
);
actix_rt::pin!(h1);
pin!(h1);
match h1.as_mut().poll(cx) {
Poll::Pending => panic!(),
@ -459,7 +470,7 @@ async fn pipelining_ok_then_ok() {
OnConnectData::default(),
);
actix_rt::pin!(h1);
pin!(h1);
assert!(matches!(&h1.inner, DispatcherState::Normal { .. }));
@ -529,7 +540,7 @@ async fn pipelining_ok_then_bad() {
OnConnectData::default(),
);
actix_rt::pin!(h1);
pin!(h1);
assert!(matches!(&h1.inner, DispatcherState::Normal { .. }));
@ -601,7 +612,7 @@ async fn expect_handling() {
",
);
actix_rt::pin!(h1);
pin!(h1);
assert!(h1.as_mut().poll(cx).is_pending());
assert!(matches!(&h1.inner, DispatcherState::Normal { .. }));
@ -678,7 +689,7 @@ async fn expect_eager() {
",
);
actix_rt::pin!(h1);
pin!(h1);
assert!(h1.as_mut().poll(cx).is_ready());
assert!(matches!(&h1.inner, DispatcherState::Normal { .. }));
@ -761,7 +772,7 @@ async fn upgrade_handling() {
",
);
actix_rt::pin!(h1);
pin!(h1);
assert!(h1.as_mut().poll(cx).is_ready());
assert!(matches!(&h1.inner, DispatcherState::Upgrade { .. }));
@ -771,3 +782,192 @@ async fn upgrade_handling() {
})
.await;
}
#[actix_rt::test]
async fn handler_drop_payload() {
let _ = env_logger::try_init();
let mut buf = TestBuffer::new(http_msg(
r"
POST /drop-payload HTTP/1.1
Content-Length: 3
abc
",
));
let services = HttpFlow::new(
drop_payload_service(),
ExpectHandler,
None::<UpgradeHandler>,
);
let h1 = Dispatcher::new(
buf.clone(),
services,
ServiceConfig::default(),
None,
OnConnectData::default(),
);
pin!(h1);
lazy(|cx| {
assert!(h1.as_mut().poll(cx).is_pending());
// polls: manual
assert_eq!(h1.poll_count, 1);
let mut res = BytesMut::from(buf.take_write_buf().as_ref());
stabilize_date_header(&mut res);
let res = &res[..];
let exp = http_msg(
r"
HTTP/1.1 200 OK
content-length: 15
date: Thu, 01 Jan 1970 12:34:56 UTC
payload dropped
",
);
assert_eq!(
res,
exp,
"\nexpected response not in write buffer:\n\
response: {:?}\n\
expected: {:?}",
String::from_utf8_lossy(res),
String::from_utf8_lossy(&exp)
);
if let DispatcherStateProj::Normal { inner } = h1.as_mut().project().inner.project() {
assert!(inner.state.is_none());
}
})
.await;
lazy(|cx| {
// add message that claims to have payload longer than provided
buf.extend_read_buf(http_msg(
r"
POST /drop-payload HTTP/1.1
Content-Length: 200
abc
",
));
assert!(h1.as_mut().poll(cx).is_pending());
// polls: manual => manual
assert_eq!(h1.poll_count, 2);
let mut res = BytesMut::from(buf.take_write_buf().as_ref());
stabilize_date_header(&mut res);
let res = &res[..];
// expect response immediately even though request side has not finished reading payload
let exp = http_msg(
r"
HTTP/1.1 200 OK
content-length: 15
date: Thu, 01 Jan 1970 12:34:56 UTC
payload dropped
",
);
assert_eq!(
res,
exp,
"\nexpected response not in write buffer:\n\
response: {:?}\n\
expected: {:?}",
String::from_utf8_lossy(res),
String::from_utf8_lossy(&exp)
);
})
.await;
lazy(|cx| {
assert!(h1.as_mut().poll(cx).is_ready());
// polls: manual => manual => manual
assert_eq!(h1.poll_count, 3);
let mut res = BytesMut::from(buf.take_write_buf().as_ref());
stabilize_date_header(&mut res);
let res = &res[..];
// expect that unrequested error response is sent back since connection could not be cleaned
let exp = http_msg(
r"
HTTP/1.1 500 Internal Server Error
content-length: 0
connection: close
date: Thu, 01 Jan 1970 12:34:56 UTC
",
);
assert_eq!(
res,
exp,
"\nexpected response not in write buffer:\n\
response: {:?}\n\
expected: {:?}",
String::from_utf8_lossy(res),
String::from_utf8_lossy(&exp)
);
})
.await;
}
fn http_msg(msg: impl AsRef<str>) -> BytesMut {
let mut msg = msg
.as_ref()
.trim()
.split('\n')
.into_iter()
.map(|line| [line.trim_start(), "\r"].concat())
.collect::<Vec<_>>()
.join("\n");
// remove trailing \r
msg.pop();
if !msg.is_empty() && !msg.contains("\r\n\r\n") {
msg.push_str("\r\n\r\n");
}
BytesMut::from(msg.as_bytes())
}
#[test]
fn http_msg_creates_msg() {
assert_eq!(http_msg(r""), "");
assert_eq!(
http_msg(
r"
POST / HTTP/1.1
Content-Length: 3
abc
"
),
"POST / HTTP/1.1\r\nContent-Length: 3\r\n\r\nabc"
);
assert_eq!(
http_msg(
r"
GET / HTTP/1.1
Content-Length: 3
"
),
"GET / HTTP/1.1\r\nContent-Length: 3\r\n\r\n"
);
}