mirror of https://github.com/fafhrd91/actix-web
Read remaining bytes when cleaning dropped payload #2764
This commit is contained in:
parent
dce57a79c9
commit
b381052dad
|
@ -706,6 +706,9 @@ where
|
|||
debug!("handler dropped payload early; attempt to clean connection");
|
||||
// ...in which case poll request payload a few times
|
||||
loop {
|
||||
if this.read_buf.is_empty() {
|
||||
Self::read_available_projected(&mut this, cx)?;
|
||||
}
|
||||
match this.codec.decode(this.read_buf)? {
|
||||
Some(msg) => {
|
||||
match msg {
|
||||
|
@ -1010,8 +1013,22 @@ where
|
|||
self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
) -> Result<bool, DispatchError> {
|
||||
let this = self.project();
|
||||
let mut this = self.project();
|
||||
Self::read_available_projected(&mut this, cx)
|
||||
}
|
||||
|
||||
/// Returns true when I/O stream can be disconnected after write to it.
|
||||
/// Meant to be called when there is already access to a projected
|
||||
/// `InnerDispatcher` available.
|
||||
///
|
||||
/// It covers these conditions:
|
||||
/// - `std::io::ErrorKind::ConnectionReset` after partial read;
|
||||
/// - all data read done.
|
||||
#[inline(always)] // TODO: bench this inline
|
||||
fn read_available_projected(
|
||||
this: &mut InnerDispatcherProj<'_, T, S, B, X, U>,
|
||||
cx: &mut Context<'_>,
|
||||
) -> Result<bool, DispatchError> {
|
||||
if this.flags.contains(Flags::READ_DISCONNECT) {
|
||||
return Ok(false);
|
||||
};
|
||||
|
|
|
@ -783,6 +783,74 @@ async fn upgrade_handling() {
|
|||
.await;
|
||||
}
|
||||
|
||||
#[actix_rt::test]
|
||||
async fn handler_drop_large_payload() {
|
||||
let _ = env_logger::try_init();
|
||||
|
||||
const CONTENT_LENGTH: usize = 256 * 1024;
|
||||
let content = str::from_utf8(&[b'x'; CONTENT_LENGTH]).unwrap();
|
||||
let buf = TestBuffer::new(http_msg(format!(
|
||||
r"
|
||||
POST /drop-payload HTTP/1.1
|
||||
Content-Length: {}
|
||||
|
||||
{}",
|
||||
CONTENT_LENGTH, content
|
||||
)));
|
||||
|
||||
let services = HttpFlow::new(
|
||||
drop_payload_service(),
|
||||
ExpectHandler,
|
||||
None::<UpgradeHandler>,
|
||||
);
|
||||
|
||||
let h1 = Dispatcher::new(
|
||||
buf.clone(),
|
||||
services,
|
||||
ServiceConfig::default(),
|
||||
None,
|
||||
OnConnectData::default(),
|
||||
);
|
||||
pin!(h1);
|
||||
|
||||
lazy(|cx| {
|
||||
assert!(h1.as_mut().poll(cx).is_pending());
|
||||
assert!(h1.as_mut().poll(cx).is_pending());
|
||||
|
||||
// polls: manual
|
||||
assert_eq!(h1.poll_count, 2);
|
||||
|
||||
let mut res = BytesMut::from(buf.take_write_buf().as_ref());
|
||||
stabilize_date_header(&mut res);
|
||||
let res = &res[..];
|
||||
|
||||
let exp = http_msg(
|
||||
r"
|
||||
HTTP/1.1 200 OK
|
||||
content-length: 15
|
||||
date: Thu, 01 Jan 1970 12:34:56 UTC
|
||||
|
||||
payload dropped
|
||||
",
|
||||
);
|
||||
|
||||
assert_eq!(
|
||||
res,
|
||||
exp,
|
||||
"\nexpected response not in write buffer:\n\
|
||||
response: {:?}\n\
|
||||
expected: {:?}",
|
||||
String::from_utf8_lossy(res),
|
||||
String::from_utf8_lossy(&exp)
|
||||
);
|
||||
|
||||
if let DispatcherStateProj::Normal { inner } = h1.as_mut().project().inner.project() {
|
||||
assert!(inner.state.is_none());
|
||||
}
|
||||
})
|
||||
.await;
|
||||
}
|
||||
|
||||
#[actix_rt::test]
|
||||
async fn handler_drop_payload() {
|
||||
let _ = env_logger::try_init();
|
||||
|
|
Loading…
Reference in New Issue