This commit is contained in:
Kevin R 2022-06-10 17:09:59 -07:00 committed by GitHub
commit e3b24060cd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 98 additions and 14 deletions

View File

@ -1,6 +1,8 @@
# Changes # Changes
## Unreleased - 2021-xx-xx ## Unreleased - 2022-xx-xx
### Fixed
- Consume bytes from read buffer when `Payload` is dropped early. [#2764]
## 3.0.4 - 2022-03-09 ## 3.0.4 - 2022-03-09

View File

@ -22,7 +22,7 @@ use crate::{
config::ServiceConfig, config::ServiceConfig,
error::{DispatchError, ParseError, PayloadError}, error::{DispatchError, ParseError, PayloadError},
service::HttpFlow, service::HttpFlow,
ConnectionType, Error, Extensions, OnConnectData, Request, Response, StatusCode, Error, Extensions, OnConnectData, Request, Response, StatusCode,
}; };
use super::{ use super::{
@ -724,7 +724,7 @@ where
this.state.set(State::None); this.state.set(State::None);
// break out of payload decode loop // break out of payload decode loop
break; return Ok(true);
} }
// Either whole payload is read and loop is broken or more data // Either whole payload is read and loop is broken or more data
@ -736,18 +736,18 @@ where
} }
} }
// not enough info to decide if connection is going to be clean or not // no bytes in the read buffer, but there are still bytes to be read
// according to the content-length header. The client has stopped
// sending data early. Reset the state, set disconnection flag,
// and stop reading.
None => { None => {
error!( debug!("client stopped sending data; disconnecting");
"handler did not read whole payload and dispatcher could not \ // reset dispatcher state
drain read buf; return 500 and close connection"
);
this.flags.insert(Flags::SHUTDOWN); this.flags.insert(Flags::SHUTDOWN);
let mut res = Response::internal_server_error().drop_body(); let _ = this.payload.take();
res.head_mut().set_connection_type(ConnectionType::Close); this.state.set(State::None);
this.messages.push_back(DispatcherMessage::Error(res));
*this.error = Some(DispatchError::HandlerDroppedPayload); // break out of payload decode loop
return Ok(true); return Ok(true);
} }
} }
@ -1010,8 +1010,22 @@ where
self: Pin<&mut Self>, self: Pin<&mut Self>,
cx: &mut Context<'_>, cx: &mut Context<'_>,
) -> Result<bool, DispatchError> { ) -> Result<bool, DispatchError> {
let this = self.project(); let mut this = self.project();
Self::read_available_projected(&mut this, cx)
}
/// Returns true when I/O stream can be disconnected after write to it.
/// Meant to be called when there is already access to a projected
/// `InnerDispatcher` available.
///
/// It covers these conditions:
/// - `std::io::ErrorKind::ConnectionReset` after partial read;
/// - all data read done.
#[inline(always)] // TODO: bench this inline
fn read_available_projected(
this: &mut InnerDispatcherProj<'_, T, S, B, X, U>,
cx: &mut Context<'_>,
) -> Result<bool, DispatchError> {
if this.flags.contains(Flags::READ_DISCONNECT) { if this.flags.contains(Flags::READ_DISCONNECT) {
return Ok(false); return Ok(false);
}; };

View File

@ -783,6 +783,74 @@ async fn upgrade_handling() {
.await; .await;
} }
#[actix_rt::test]
async fn handler_drop_large_payload() {
let _ = env_logger::try_init();
const CONTENT_LENGTH: usize = 256 * 1024;
let content = str::from_utf8(&[b'x'; CONTENT_LENGTH]).unwrap();
let buf = TestBuffer::new(http_msg(format!(
r"
POST /drop-payload HTTP/1.1
Content-Length: {}
{}",
CONTENT_LENGTH, content
)));
let services = HttpFlow::new(
drop_payload_service(),
ExpectHandler,
None::<UpgradeHandler>,
);
let h1 = Dispatcher::new(
buf.clone(),
services,
ServiceConfig::default(),
None,
OnConnectData::default(),
);
pin!(h1);
lazy(|cx| {
assert!(h1.as_mut().poll(cx).is_pending());
assert!(h1.as_mut().poll(cx).is_pending());
// polls: manual
assert_eq!(h1.poll_count, 2);
let mut res = BytesMut::from(buf.take_write_buf().as_ref());
stabilize_date_header(&mut res);
let res = &res[..];
let exp = http_msg(
r"
HTTP/1.1 200 OK
content-length: 15
date: Thu, 01 Jan 1970 12:34:56 UTC
payload dropped
",
);
assert_eq!(
res,
exp,
"\nexpected response not in write buffer:\n\
response: {:?}\n\
expected: {:?}",
String::from_utf8_lossy(res),
String::from_utf8_lossy(&exp)
);
if let DispatcherStateProj::Normal { inner } = h1.as_mut().project().inner.project() {
assert!(inner.state.is_none());
}
})
.await;
}
#[actix_rt::test] #[actix_rt::test]
async fn handler_drop_payload() { async fn handler_drop_payload() {
let _ = env_logger::try_init(); let _ = env_logger::try_init();