mirror of https://github.com/fafhrd91/actix-web
Compare commits
6 Commits
d5512e9739
...
63a73fb980
Author | SHA1 | Date |
---|---|---|
|
63a73fb980 | |
|
5041cd1c65 | |
|
d3c46537b3 | |
|
9b68186946 | |
|
a816f617d1 | |
|
d3f8f7c854 |
|
@ -2,6 +2,8 @@
|
|||
|
||||
## Unreleased
|
||||
|
||||
- Properly wake Payload receivers when feeding errors or EOF
|
||||
|
||||
## 3.11.1
|
||||
|
||||
- Prevent more hangs after client disconnects.
|
||||
|
|
|
@ -156,7 +156,7 @@ serde_json = "1.0"
|
|||
static_assertions = "1"
|
||||
tls-openssl = { package = "openssl", version = "0.10.55" }
|
||||
tls-rustls_023 = { package = "rustls", version = "0.23" }
|
||||
tokio = { version = "1.38.2", features = ["net", "rt", "macros"] }
|
||||
tokio = { version = "1.38.2", features = ["net", "rt", "macros", "sync"] }
|
||||
|
||||
[lints]
|
||||
workspace = true
|
||||
|
|
|
@ -304,6 +304,14 @@ where
|
|||
U: Service<(Request, Framed<T, Codec>), Response = ()>,
|
||||
U::Error: fmt::Display,
|
||||
{
|
||||
fn finalize_payload_if_present(mut self: Pin<&mut Self>, reason: &str) {
|
||||
let this = self.as_mut().project();
|
||||
if let Some(mut payload) = this.payload.take() {
|
||||
trace!("Finalizing payload early: {reason}");
|
||||
payload.feed_eof();
|
||||
}
|
||||
}
|
||||
|
||||
fn can_read(&self, cx: &mut Context<'_>) -> bool {
|
||||
if self.flags.contains(Flags::READ_DISCONNECT) {
|
||||
false
|
||||
|
@ -686,6 +694,11 @@ where
|
|||
|
||||
// limit amount of non-processed requests
|
||||
if pipeline_queue_full || can_not_read {
|
||||
// since we're here, it's possible the client has been sent a response before we've been able to read the body
|
||||
// in this case, we should eof the payload to prevent the next request from reading invalid data
|
||||
// this can occur with certain load balancers that pipeline requests
|
||||
self.as_mut()
|
||||
.finalize_payload_if_present("pipeline queue full or cannot read");
|
||||
return Ok(false);
|
||||
}
|
||||
|
||||
|
|
|
@ -200,11 +200,13 @@ impl Inner {
|
|||
#[inline]
|
||||
fn set_error(&mut self, err: PayloadError) {
|
||||
self.err = Some(err);
|
||||
self.wake();
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn feed_eof(&mut self) {
|
||||
self.eof = true;
|
||||
self.wake();
|
||||
}
|
||||
|
||||
#[inline]
|
||||
|
@ -253,8 +255,13 @@ impl Inner {
|
|||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::{task::Poll, time::Duration};
|
||||
|
||||
use actix_rt::time::timeout;
|
||||
use actix_utils::future::poll_fn;
|
||||
use futures_util::{FutureExt, StreamExt};
|
||||
use static_assertions::{assert_impl_all, assert_not_impl_any};
|
||||
use tokio::sync::oneshot;
|
||||
|
||||
use super::*;
|
||||
|
||||
|
@ -263,6 +270,67 @@ mod tests {
|
|||
|
||||
assert_impl_all!(Inner: Unpin, Send, Sync);
|
||||
|
||||
const WAKE_TIMEOUT: Duration = Duration::from_secs(2);
|
||||
|
||||
fn prepare_waking_test(
|
||||
mut payload: Payload,
|
||||
expected: Option<Result<(), ()>>,
|
||||
) -> (oneshot::Receiver<()>, actix_rt::task::JoinHandle<()>) {
|
||||
let (tx, rx) = oneshot::channel();
|
||||
|
||||
let handle = actix_rt::spawn(async move {
|
||||
// Make sure to poll once to set the waker
|
||||
poll_fn(|cx| {
|
||||
assert!(payload.poll_next_unpin(cx).is_pending());
|
||||
Poll::Ready(())
|
||||
})
|
||||
.await;
|
||||
tx.send(()).unwrap();
|
||||
|
||||
// actix-rt is single-threaded, so this won't race with `rx.await`
|
||||
let mut pend_once = false;
|
||||
poll_fn(|_| {
|
||||
if pend_once {
|
||||
Poll::Ready(())
|
||||
} else {
|
||||
// Return pending without storing wakers, we already did on the previous
|
||||
// `poll_fn`, now this task will only continue if the `sender` wakes us
|
||||
pend_once = true;
|
||||
Poll::Pending
|
||||
}
|
||||
})
|
||||
.await;
|
||||
|
||||
let got = payload.next().now_or_never().unwrap();
|
||||
match expected {
|
||||
Some(Ok(_)) => assert!(got.unwrap().is_ok()),
|
||||
Some(Err(_)) => assert!(got.unwrap().is_err()),
|
||||
None => assert!(got.is_none()),
|
||||
}
|
||||
});
|
||||
(rx, handle)
|
||||
}
|
||||
|
||||
#[actix_rt::test]
|
||||
async fn wake_on_error() {
|
||||
let (mut sender, payload) = Payload::create(false);
|
||||
let (rx, handle) = prepare_waking_test(payload, Some(Err(())));
|
||||
|
||||
rx.await.unwrap();
|
||||
sender.set_error(PayloadError::Incomplete(None));
|
||||
timeout(WAKE_TIMEOUT, handle).await.unwrap().unwrap();
|
||||
}
|
||||
|
||||
#[actix_rt::test]
|
||||
async fn wake_on_eof() {
|
||||
let (mut sender, payload) = Payload::create(false);
|
||||
let (rx, handle) = prepare_waking_test(payload, None);
|
||||
|
||||
rx.await.unwrap();
|
||||
sender.feed_eof();
|
||||
timeout(WAKE_TIMEOUT, handle).await.unwrap().unwrap();
|
||||
}
|
||||
|
||||
#[actix_rt::test]
|
||||
async fn test_unread_data() {
|
||||
let (_, mut payload) = Payload::create(false);
|
||||
|
|
|
@ -24,7 +24,7 @@ allowed_external_types = [
|
|||
actix = { version = ">=0.12, <0.14", default-features = false }
|
||||
actix-codec = "0.5"
|
||||
actix-http = "3"
|
||||
actix-web = { version = "4", default-features = false }
|
||||
actix-web = { version = "4", default-features = false, features = ["ws"] }
|
||||
|
||||
bytes = "1"
|
||||
bytestring = "1"
|
||||
|
|
|
@ -4,6 +4,7 @@
|
|||
|
||||
- `actix_web::response::builder::HttpResponseBuilder::streaming()` now sets `Content-Type` to `application/octet-stream` if `Content-Type` does not exist.
|
||||
- `actix_web::response::builder::HttpResponseBuilder::streaming()` now calls `actix_web::response::builder::HttpResponseBuilder::no_chunking()` if `Content-Length` is set by user.
|
||||
- Add `ws` crate feature (on-by-default) which forwards to `actix-http` and guards some of its `ResponseError` impls.
|
||||
|
||||
## 4.11.0
|
||||
|
||||
|
|
|
@ -67,6 +67,7 @@ default = [
|
|||
"http2",
|
||||
"unicode",
|
||||
"compat",
|
||||
"ws",
|
||||
]
|
||||
|
||||
# Brotli algorithm content-encoding support
|
||||
|
@ -85,9 +86,12 @@ cookies = ["dep:cookie"]
|
|||
# Secure & signed cookies
|
||||
secure-cookies = ["cookies", "cookie/secure"]
|
||||
|
||||
# HTTP/2 support (including h2c).
|
||||
# HTTP/2 support (including h2c)
|
||||
http2 = ["actix-http/http2"]
|
||||
|
||||
# WebSocket support
|
||||
ws = ["actix-http/ws"]
|
||||
|
||||
# TLS via OpenSSL
|
||||
openssl = ["__tls", "http2", "actix-http/openssl", "actix-tls/accept", "actix-tls/openssl"]
|
||||
|
||||
|
@ -131,7 +135,7 @@ actix-service = "2"
|
|||
actix-tls = { version = "3.4", default-features = false, optional = true }
|
||||
actix-utils = "3"
|
||||
|
||||
actix-http = { version = "3.11", features = ["ws"] }
|
||||
actix-http = "3.11"
|
||||
actix-router = { version = "0.5.3", default-features = false, features = ["http"] }
|
||||
actix-web-codegen = { version = "4.3", optional = true, default-features = false }
|
||||
|
||||
|
|
|
@ -7,7 +7,6 @@ use std::{
|
|||
io::{self, Write as _},
|
||||
};
|
||||
|
||||
use actix_http::Response;
|
||||
use bytes::BytesMut;
|
||||
|
||||
use crate::{
|
||||
|
@ -126,20 +125,24 @@ impl ResponseError for actix_http::error::PayloadError {
|
|||
}
|
||||
}
|
||||
|
||||
impl ResponseError for actix_http::ws::ProtocolError {}
|
||||
|
||||
impl ResponseError for actix_http::error::ContentTypeError {
|
||||
fn status_code(&self) -> StatusCode {
|
||||
StatusCode::BAD_REQUEST
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "ws")]
|
||||
impl ResponseError for actix_http::ws::HandshakeError {
|
||||
fn error_response(&self) -> HttpResponse<BoxBody> {
|
||||
Response::from(self).map_into_boxed_body().into()
|
||||
actix_http::Response::from(self)
|
||||
.map_into_boxed_body()
|
||||
.into()
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "ws")]
|
||||
impl ResponseError for actix_http::ws::ProtocolError {}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
|
Loading…
Reference in New Issue