mirror of https://github.com/fafhrd91/actix-web
fix(awc): do not request as chunked if body is empty (#3910)
This commit is contained in:
parent
9856a3b056
commit
41e4863748
|
|
@ -3,6 +3,7 @@
|
||||||
## Unreleased
|
## Unreleased
|
||||||
|
|
||||||
- Minimum supported Rust version (MSRV) is now 1.88.
|
- Minimum supported Rust version (MSRV) is now 1.88.
|
||||||
|
- Fix empty streaming request bodies being sent with chunked transfer encoding.
|
||||||
|
|
||||||
## 3.8.1
|
## 3.8.1
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -34,6 +34,35 @@ where
|
||||||
B: MessageBody,
|
B: MessageBody,
|
||||||
B::Error: Into<BoxError>,
|
B::Error: Into<BoxError>,
|
||||||
{
|
{
|
||||||
|
actix_rt::pin!(body);
|
||||||
|
|
||||||
|
let orig_length = body.size();
|
||||||
|
let mut length = orig_length;
|
||||||
|
let mut first_chunk = None;
|
||||||
|
|
||||||
|
// This avoids sending `Transfer-Encoding: chunked` for requests with an empty body stream.
|
||||||
|
// https://github.com/actix/actix-web/issues/2320
|
||||||
|
if matches!(orig_length, BodySize::Stream) {
|
||||||
|
enum Peek<E> {
|
||||||
|
Pending,
|
||||||
|
Item(Result<Bytes, E>),
|
||||||
|
Eof,
|
||||||
|
}
|
||||||
|
|
||||||
|
match poll_fn(|cx| match body.as_mut().poll_next(cx) {
|
||||||
|
Poll::Pending => Poll::Ready(Peek::Pending),
|
||||||
|
Poll::Ready(Some(res)) => Poll::Ready(Peek::Item(res)),
|
||||||
|
Poll::Ready(None) => Poll::Ready(Peek::Eof),
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
Peek::Pending => {}
|
||||||
|
Peek::Eof => length = BodySize::Sized(0),
|
||||||
|
Peek::Item(Ok(chunk)) => first_chunk = Some(chunk),
|
||||||
|
Peek::Item(Err(err)) => return Err(SendRequestError::Body(err.into())),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// set request host header
|
// set request host header
|
||||||
if !head.as_ref().headers.contains_key(HOST)
|
if !head.as_ref().headers.contains_key(HOST)
|
||||||
&& !head.extra_headers().iter().any(|h| h.contains_key(HOST))
|
&& !head.extra_headers().iter().any(|h| h.contains_key(HOST))
|
||||||
|
|
@ -67,7 +96,7 @@ where
|
||||||
// Check EXPECT header and enable expect handle flag accordingly.
|
// Check EXPECT header and enable expect handle flag accordingly.
|
||||||
// See https://datatracker.ietf.org/doc/html/rfc7231#section-5.1.1
|
// See https://datatracker.ietf.org/doc/html/rfc7231#section-5.1.1
|
||||||
let is_expect = if head.as_ref().headers.contains_key(EXPECT) {
|
let is_expect = if head.as_ref().headers.contains_key(EXPECT) {
|
||||||
match body.size() {
|
match orig_length {
|
||||||
BodySize::None | BodySize::Sized(0) => {
|
BodySize::None | BodySize::Sized(0) => {
|
||||||
let keep_alive = framed.codec_ref().keep_alive();
|
let keep_alive = framed.codec_ref().keep_alive();
|
||||||
framed.io_mut().on_release(keep_alive);
|
framed.io_mut().on_release(keep_alive);
|
||||||
|
|
@ -86,7 +115,7 @@ where
|
||||||
|
|
||||||
// special handle for EXPECT request.
|
// special handle for EXPECT request.
|
||||||
let (do_send, mut res_head) = if is_expect {
|
let (do_send, mut res_head) = if is_expect {
|
||||||
pin_framed.send((head, body.size()).into()).await?;
|
pin_framed.send((head, length).into()).await?;
|
||||||
|
|
||||||
let head = poll_fn(|cx| pin_framed.as_mut().poll_next(cx))
|
let head = poll_fn(|cx| pin_framed.as_mut().poll_next(cx))
|
||||||
.await
|
.await
|
||||||
|
|
@ -96,18 +125,18 @@ where
|
||||||
// and current head would be used as final response head.
|
// and current head would be used as final response head.
|
||||||
(head.status == StatusCode::CONTINUE, Some(head))
|
(head.status == StatusCode::CONTINUE, Some(head))
|
||||||
} else {
|
} else {
|
||||||
pin_framed.feed((head, body.size()).into()).await?;
|
pin_framed.feed((head, length).into()).await?;
|
||||||
|
|
||||||
(true, None)
|
(true, None)
|
||||||
};
|
};
|
||||||
|
|
||||||
if do_send {
|
if do_send {
|
||||||
// send request body
|
// send request body
|
||||||
match body.size() {
|
match length {
|
||||||
BodySize::None | BodySize::Sized(0) => {
|
BodySize::None | BodySize::Sized(0) => {
|
||||||
poll_fn(|cx| pin_framed.as_mut().flush(cx)).await?;
|
poll_fn(|cx| pin_framed.as_mut().flush(cx)).await?;
|
||||||
}
|
}
|
||||||
_ => send_body(body, pin_framed.as_mut()).await?,
|
_ => send_body(body.as_mut(), pin_framed.as_mut(), first_chunk).await?,
|
||||||
};
|
};
|
||||||
|
|
||||||
// read response and init read body
|
// read response and init read body
|
||||||
|
|
@ -157,15 +186,18 @@ where
|
||||||
|
|
||||||
/// send request body to the peer
|
/// send request body to the peer
|
||||||
pub(crate) async fn send_body<Io, B>(
|
pub(crate) async fn send_body<Io, B>(
|
||||||
body: B,
|
mut body: Pin<&mut B>,
|
||||||
mut framed: Pin<&mut Framed<Io, h1::ClientCodec>>,
|
mut framed: Pin<&mut Framed<Io, h1::ClientCodec>>,
|
||||||
|
first_chunk: Option<Bytes>,
|
||||||
) -> Result<(), SendRequestError>
|
) -> Result<(), SendRequestError>
|
||||||
where
|
where
|
||||||
Io: ConnectionIo,
|
Io: ConnectionIo,
|
||||||
B: MessageBody,
|
B: MessageBody,
|
||||||
B::Error: Into<BoxError>,
|
B::Error: Into<BoxError>,
|
||||||
{
|
{
|
||||||
actix_rt::pin!(body);
|
if let Some(chunk) = first_chunk {
|
||||||
|
framed.as_mut().write(h1::Message::Chunk(Some(chunk)))?;
|
||||||
|
}
|
||||||
|
|
||||||
let mut eof = false;
|
let mut eof = false;
|
||||||
while !eof {
|
while !eof {
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,91 @@
|
||||||
|
use std::{convert::Infallible, time::Duration};
|
||||||
|
|
||||||
|
use actix_rt::net::TcpListener;
|
||||||
|
use awc::Client;
|
||||||
|
use bytes::Bytes;
|
||||||
|
use futures_util::stream;
|
||||||
|
use tokio::{
|
||||||
|
io::{AsyncReadExt as _, AsyncWriteExt as _},
|
||||||
|
time::timeout,
|
||||||
|
};
|
||||||
|
|
||||||
|
#[actix_rt::test]
|
||||||
|
async fn empty_body_stream_does_not_use_chunked_encoding() {
|
||||||
|
let listener = TcpListener::bind(("127.0.0.1", 0)).await.unwrap();
|
||||||
|
let addr = listener.local_addr().unwrap();
|
||||||
|
|
||||||
|
// Minimal HTTP/1.1 server that rejects chunked requests.
|
||||||
|
let srv = actix_rt::spawn(async move {
|
||||||
|
let (mut sock, _) = listener.accept().await.unwrap();
|
||||||
|
|
||||||
|
let mut buf = Vec::with_capacity(1024);
|
||||||
|
let mut tmp = [0u8; 1024];
|
||||||
|
|
||||||
|
let header_end = loop {
|
||||||
|
let n = timeout(Duration::from_secs(2), sock.read(&mut tmp))
|
||||||
|
.await
|
||||||
|
.unwrap()
|
||||||
|
.unwrap();
|
||||||
|
if n == 0 {
|
||||||
|
break None;
|
||||||
|
}
|
||||||
|
|
||||||
|
buf.extend_from_slice(&tmp[..n]);
|
||||||
|
|
||||||
|
if let Some(pos) = buf.windows(4).position(|w| w == b"\r\n\r\n") {
|
||||||
|
break Some(pos + 4);
|
||||||
|
}
|
||||||
|
|
||||||
|
if buf.len() > 16 * 1024 {
|
||||||
|
break None;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
.expect("did not receive complete request headers");
|
||||||
|
|
||||||
|
let headers_lower = String::from_utf8_lossy(&buf[..header_end]).to_ascii_lowercase();
|
||||||
|
let has_chunked = headers_lower.contains("\r\ntransfer-encoding: chunked\r\n");
|
||||||
|
|
||||||
|
if has_chunked {
|
||||||
|
// Drain terminating chunk so client doesn't error on write before response is read.
|
||||||
|
let terminator = b"0\r\n\r\n";
|
||||||
|
while !buf[header_end..]
|
||||||
|
.windows(terminator.len())
|
||||||
|
.any(|w| w == terminator)
|
||||||
|
{
|
||||||
|
let n = match timeout(Duration::from_secs(2), sock.read(&mut tmp)).await {
|
||||||
|
Ok(Ok(n)) => n,
|
||||||
|
_ => break,
|
||||||
|
};
|
||||||
|
|
||||||
|
if n == 0 {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
buf.extend_from_slice(&tmp[..n]);
|
||||||
|
|
||||||
|
if buf.len() > 32 * 1024 {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let status = if has_chunked {
|
||||||
|
"400 Bad Request"
|
||||||
|
} else {
|
||||||
|
"200 OK"
|
||||||
|
};
|
||||||
|
let resp = format!("HTTP/1.1 {status}\r\nContent-Length: 0\r\nConnection: close\r\n\r\n");
|
||||||
|
sock.write_all(resp.as_bytes()).await.unwrap();
|
||||||
|
});
|
||||||
|
|
||||||
|
let url = format!("http://{addr}/");
|
||||||
|
let res = Client::default()
|
||||||
|
.get(url)
|
||||||
|
.send_stream(stream::empty::<Result<Bytes, Infallible>>())
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
assert!(res.status().is_success());
|
||||||
|
|
||||||
|
srv.await.unwrap();
|
||||||
|
}
|
||||||
Loading…
Reference in New Issue