mirror of https://github.com/fafhrd91/actix-web
fix(http,web): correctly drop `web::Data` on graceful shutdown
This commit is contained in:
parent
1aa74f4234
commit
b602f602e7
|
|
@ -3,9 +3,11 @@
|
||||||
## Unreleased
|
## Unreleased
|
||||||
|
|
||||||
- When configured, gracefully close HTTP/1 connections after early responses to unread request bodies. [#3967]
|
- When configured, gracefully close HTTP/1 connections after early responses to unread request bodies. [#3967]
|
||||||
|
- Wake HTTP/1 payload receivers with an incomplete-payload error when the sender is dropped before EOF. [#3100]
|
||||||
- Update `foldhash` dependency to `0.2`.
|
- Update `foldhash` dependency to `0.2`.
|
||||||
|
|
||||||
[#3967]: https://github.com/actix/actix-web/issues/3967
|
[#3967]: https://github.com/actix/actix-web/issues/3967
|
||||||
|
[#3100]: https://github.com/actix/actix-web/issues/3100
|
||||||
|
|
||||||
## 3.12.1
|
## 3.12.1
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -140,11 +140,20 @@ impl PayloadSender {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl Drop for PayloadSender {
|
||||||
|
fn drop(&mut self) {
|
||||||
|
if let Some(shared) = self.inner.upgrade() {
|
||||||
|
shared.borrow_mut().close_sender();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
struct Inner {
|
struct Inner {
|
||||||
len: usize,
|
len: usize,
|
||||||
eof: bool,
|
eof: bool,
|
||||||
err: Option<PayloadError>,
|
err: Option<PayloadError>,
|
||||||
|
sender_closed: bool,
|
||||||
need_read: bool,
|
need_read: bool,
|
||||||
items: VecDeque<Bytes>,
|
items: VecDeque<Bytes>,
|
||||||
task: Option<Waker>,
|
task: Option<Waker>,
|
||||||
|
|
@ -157,6 +166,7 @@ impl Inner {
|
||||||
eof,
|
eof,
|
||||||
len: 0,
|
len: 0,
|
||||||
err: None,
|
err: None,
|
||||||
|
sender_closed: eof,
|
||||||
items: VecDeque::new(),
|
items: VecDeque::new(),
|
||||||
need_read: true,
|
need_read: true,
|
||||||
task: None,
|
task: None,
|
||||||
|
|
@ -200,12 +210,21 @@ impl Inner {
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
fn set_error(&mut self, err: PayloadError) {
|
fn set_error(&mut self, err: PayloadError) {
|
||||||
|
self.sender_closed = true;
|
||||||
self.err = Some(err);
|
self.err = Some(err);
|
||||||
self.wake();
|
self.wake();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn close_sender(&mut self) {
|
||||||
|
if !self.sender_closed {
|
||||||
|
self.sender_closed = true;
|
||||||
|
self.set_error(PayloadError::Incomplete(None));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
fn feed_eof(&mut self) {
|
fn feed_eof(&mut self) {
|
||||||
|
self.sender_closed = true;
|
||||||
self.eof = true;
|
self.eof = true;
|
||||||
self.wake();
|
self.wake();
|
||||||
}
|
}
|
||||||
|
|
@ -332,6 +351,16 @@ mod tests {
|
||||||
timeout(WAKE_TIMEOUT, handle).await.unwrap().unwrap();
|
timeout(WAKE_TIMEOUT, handle).await.unwrap().unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[actix_rt::test]
|
||||||
|
async fn wake_on_sender_drop() {
|
||||||
|
let (sender, payload) = Payload::create(false);
|
||||||
|
let (rx, handle) = prepare_waking_test(payload, Some(Err(())));
|
||||||
|
|
||||||
|
rx.await.unwrap();
|
||||||
|
drop(sender);
|
||||||
|
timeout(WAKE_TIMEOUT, handle).await.unwrap().unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
#[actix_rt::test]
|
#[actix_rt::test]
|
||||||
async fn test_unread_data() {
|
async fn test_unread_data() {
|
||||||
let (_, mut payload) = Payload::create(false);
|
let (_, mut payload) = Payload::create(false);
|
||||||
|
|
|
||||||
|
|
@ -1013,7 +1013,7 @@ mod tests {
|
||||||
#[actix_rt::test]
|
#[actix_rt::test]
|
||||||
async fn test_multipart_payload_consumption() {
|
async fn test_multipart_payload_consumption() {
|
||||||
// with sample payload and HttpRequest with no headers
|
// with sample payload and HttpRequest with no headers
|
||||||
let (_, inner_payload) = h1::Payload::create(false);
|
let (_sender, inner_payload) = h1::Payload::create(false);
|
||||||
let mut payload = actix_web::dev::Payload::from(inner_payload);
|
let mut payload = actix_web::dev::Payload::from(inner_payload);
|
||||||
let req = TestRequest::default().to_http_request();
|
let req = TestRequest::default().to_http_request();
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -231,7 +231,7 @@ mod tests {
|
||||||
|
|
||||||
#[actix_rt::test]
|
#[actix_rt::test]
|
||||||
async fn basic() {
|
async fn basic() {
|
||||||
let (_, payload) = h1::Payload::create(false);
|
let (_sender, payload) = h1::Payload::create(false);
|
||||||
let mut payload = PayloadBuffer::new_with_limit(payload, DEFAULT_BUFFER_LIMIT);
|
let mut payload = PayloadBuffer::new_with_limit(payload, DEFAULT_BUFFER_LIMIT);
|
||||||
|
|
||||||
assert_eq!(payload.buf.len(), 0);
|
assert_eq!(payload.buf.len(), 0);
|
||||||
|
|
|
||||||
|
|
@ -7,6 +7,7 @@
|
||||||
- Panic when calling `Route::to()` or `Route::service()` after `Route::wrap()` to prevent silently dropping route middleware. [#3944]
|
- Panic when calling `Route::to()` or `Route::service()` after `Route::wrap()` to prevent silently dropping route middleware. [#3944]
|
||||||
- Fix `HttpRequest::{match_pattern,match_name}` reporting path-only matches when route guards disambiguate overlapping resources. [#3346]
|
- Fix `HttpRequest::{match_pattern,match_name}` reporting path-only matches when route guards disambiguate overlapping resources. [#3346]
|
||||||
- Fix `Readlines` handling of lines split across payload chunks so combined line limits are enforced and complete lines are yielded.
|
- Fix `Readlines` handling of lines split across payload chunks so combined line limits are enforced and complete lines are yielded.
|
||||||
|
- Fix app data being retained after graceful shutdown with in-flight slow request bodies. [#3100]
|
||||||
- Update `foldhash` dependency to `0.2`.
|
- Update `foldhash` dependency to `0.2`.
|
||||||
- Update `rand` dependency to `0.10`.
|
- Update `rand` dependency to `0.10`.
|
||||||
- Add `HttpServer::h1_write_buffer_size()`.
|
- Add `HttpServer::h1_write_buffer_size()`.
|
||||||
|
|
@ -14,6 +15,7 @@
|
||||||
[#3944]: https://github.com/actix/actix-web/pull/3944
|
[#3944]: https://github.com/actix/actix-web/pull/3944
|
||||||
[#3346]: https://github.com/actix/actix-web/issues/3346
|
[#3346]: https://github.com/actix/actix-web/issues/3346
|
||||||
[#3542]: https://github.com/actix/actix-web/issues/3542
|
[#3542]: https://github.com/actix/actix-web/issues/3542
|
||||||
|
[#3100]: https://github.com/actix/actix-web/issues/3100
|
||||||
|
|
||||||
## 4.13.0
|
## 4.13.0
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -255,7 +255,7 @@ where
|
||||||
T: Service<ServiceRequest, Response = ServiceResponse<B>, Error = Error>,
|
T: Service<ServiceRequest, Response = ServiceResponse<B>, Error = Error>,
|
||||||
{
|
{
|
||||||
fn drop(&mut self) {
|
fn drop(&mut self) {
|
||||||
self.app_state.pool().clear();
|
self.app_state.pool().disable();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -1,5 +1,5 @@
|
||||||
use std::{
|
use std::{
|
||||||
cell::{Ref, RefCell, RefMut},
|
cell::{Cell, Ref, RefCell, RefMut},
|
||||||
collections::HashMap,
|
collections::HashMap,
|
||||||
fmt,
|
fmt,
|
||||||
hash::{BuildHasher, Hash},
|
hash::{BuildHasher, Hash},
|
||||||
|
|
@ -669,6 +669,7 @@ impl fmt::Debug for HttpRequest {
|
||||||
/// The pool's default capacity is 128 items.
|
/// The pool's default capacity is 128 items.
|
||||||
pub(crate) struct HttpRequestPool {
|
pub(crate) struct HttpRequestPool {
|
||||||
inner: RefCell<Vec<Rc<HttpRequestInner>>>,
|
inner: RefCell<Vec<Rc<HttpRequestInner>>>,
|
||||||
|
enabled: Cell<bool>,
|
||||||
cap: usize,
|
cap: usize,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -682,6 +683,7 @@ impl HttpRequestPool {
|
||||||
pub(crate) fn with_capacity(cap: usize) -> Self {
|
pub(crate) fn with_capacity(cap: usize) -> Self {
|
||||||
HttpRequestPool {
|
HttpRequestPool {
|
||||||
inner: RefCell::new(Vec::with_capacity(cap)),
|
inner: RefCell::new(Vec::with_capacity(cap)),
|
||||||
|
enabled: Cell::new(true),
|
||||||
cap,
|
cap,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -698,7 +700,7 @@ impl HttpRequestPool {
|
||||||
/// Check if the pool still has capacity for request storage.
|
/// Check if the pool still has capacity for request storage.
|
||||||
#[inline]
|
#[inline]
|
||||||
pub(crate) fn is_available(&self) -> bool {
|
pub(crate) fn is_available(&self) -> bool {
|
||||||
self.inner.borrow_mut().len() < self.cap
|
self.enabled.get() && self.inner.borrow().len() < self.cap
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Push a request to pool.
|
/// Push a request to pool.
|
||||||
|
|
@ -707,15 +709,16 @@ impl HttpRequestPool {
|
||||||
self.inner.borrow_mut().push(req);
|
self.inner.borrow_mut().push(req);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Clears all allocated HttpRequest objects.
|
/// Prevents future requests from being returned to the pool and clears existing entries.
|
||||||
pub(crate) fn clear(&self) {
|
pub(crate) fn disable(&self) {
|
||||||
self.inner.borrow_mut().clear()
|
self.enabled.set(false);
|
||||||
|
self.inner.borrow_mut().clear();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use std::collections::HashMap;
|
use std::{collections::HashMap, sync::Arc};
|
||||||
|
|
||||||
use bytes::Bytes;
|
use bytes::Bytes;
|
||||||
|
|
||||||
|
|
@ -993,6 +996,41 @@ mod tests {
|
||||||
assert_eq!(resp.headers().get("pool_cap").unwrap(), "128");
|
assert_eq!(resp.headers().get("pool_cap").unwrap(), "128");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[actix_rt::test]
|
||||||
|
async fn test_request_dropped_after_service_does_not_reenter_pool() {
|
||||||
|
struct State {
|
||||||
|
_data: Arc<String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
let (weak_data, app_data) = {
|
||||||
|
let data = Arc::new("data".to_owned());
|
||||||
|
(Arc::downgrade(&data), web::Data::new(State { _data: data }))
|
||||||
|
};
|
||||||
|
|
||||||
|
let held_req = Rc::new(RefCell::new(None));
|
||||||
|
|
||||||
|
{
|
||||||
|
let held_req = Rc::clone(&held_req);
|
||||||
|
let srv = init_service(App::new().app_data(app_data).service(web::resource("/").to(
|
||||||
|
move |req: HttpRequest| {
|
||||||
|
*held_req.borrow_mut() = Some(req.clone());
|
||||||
|
HttpResponse::Ok()
|
||||||
|
},
|
||||||
|
)))
|
||||||
|
.await;
|
||||||
|
|
||||||
|
let resp = call_service(&srv, TestRequest::default().to_request()).await;
|
||||||
|
assert_eq!(resp.status(), StatusCode::OK);
|
||||||
|
|
||||||
|
drop(resp);
|
||||||
|
drop(srv);
|
||||||
|
}
|
||||||
|
|
||||||
|
assert!(weak_data.upgrade().is_some());
|
||||||
|
drop(held_req.borrow_mut().take());
|
||||||
|
assert!(weak_data.upgrade().is_none());
|
||||||
|
}
|
||||||
|
|
||||||
#[actix_rt::test]
|
#[actix_rt::test]
|
||||||
async fn test_data() {
|
async fn test_data() {
|
||||||
let srv = init_service(App::new().app_data(10usize).service(web::resource("/").to(
|
let srv = init_service(App::new().app_data(10usize).service(web::resource("/").to(
|
||||||
|
|
|
||||||
|
|
@ -1,9 +1,16 @@
|
||||||
#[cfg(feature = "openssl")]
|
#[cfg(feature = "openssl")]
|
||||||
extern crate tls_openssl as openssl;
|
extern crate tls_openssl as openssl;
|
||||||
|
|
||||||
use std::{sync::mpsc, thread, time::Duration};
|
use std::{
|
||||||
|
convert::Infallible,
|
||||||
|
sync::{mpsc, Arc},
|
||||||
|
thread,
|
||||||
|
time::Duration,
|
||||||
|
};
|
||||||
|
|
||||||
use actix_web::{web, App, HttpRequest, HttpResponse, HttpServer};
|
use actix_web::{rt::time::sleep, web, App, HttpRequest, HttpResponse, HttpServer};
|
||||||
|
use bytes::Bytes;
|
||||||
|
use futures_util::stream;
|
||||||
|
|
||||||
#[actix_rt::test]
|
#[actix_rt::test]
|
||||||
async fn test_start() {
|
async fn test_start() {
|
||||||
|
|
@ -74,6 +81,74 @@ async fn test_start() {
|
||||||
srv.stop(false).await;
|
srv.stop(false).await;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[actix_rt::test]
|
||||||
|
async fn test_app_data_dropped_after_graceful_shutdown_with_slow_request() {
|
||||||
|
struct State {
|
||||||
|
_data: Arc<String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn echo(_body: web::Json<String>) -> HttpResponse {
|
||||||
|
HttpResponse::Ok().finish()
|
||||||
|
}
|
||||||
|
|
||||||
|
let (weak_data, app_data) = {
|
||||||
|
let data = Arc::new("data".to_owned());
|
||||||
|
(Arc::downgrade(&data), web::Data::new(State { _data: data }))
|
||||||
|
};
|
||||||
|
|
||||||
|
let server = HttpServer::new(move || {
|
||||||
|
App::new()
|
||||||
|
.app_data(app_data.clone())
|
||||||
|
.service(web::resource("/echo").route(web::post().to(echo)))
|
||||||
|
})
|
||||||
|
.workers(1)
|
||||||
|
.shutdown_timeout(1)
|
||||||
|
.bind(("127.0.0.1", 0))
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
let addr = server.addrs()[0];
|
||||||
|
let server = server.run();
|
||||||
|
let server_handle = server.handle();
|
||||||
|
|
||||||
|
let send_request = async move {
|
||||||
|
sleep(Duration::from_millis(100)).await;
|
||||||
|
|
||||||
|
let slow_body = stream::unfold(0, |idx| async move {
|
||||||
|
if idx < 8 {
|
||||||
|
sleep(Duration::from_millis(200)).await;
|
||||||
|
Some((Ok::<_, Infallible>(Bytes::from_static(b" ")), idx + 1))
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
let client = awc::Client::default();
|
||||||
|
let _ = client
|
||||||
|
.post(format!("http://{addr}/echo"))
|
||||||
|
.insert_header(("content-type", "application/json"))
|
||||||
|
.send_stream(slow_body)
|
||||||
|
.await;
|
||||||
|
};
|
||||||
|
|
||||||
|
let graceful_stop = async move {
|
||||||
|
sleep(Duration::from_millis(300)).await;
|
||||||
|
server_handle.stop(true).await;
|
||||||
|
};
|
||||||
|
|
||||||
|
let (server_res, (), ()) = tokio::join!(server, send_request, graceful_stop);
|
||||||
|
server_res.unwrap();
|
||||||
|
|
||||||
|
for _ in 0..20 {
|
||||||
|
sleep(Duration::from_millis(100)).await;
|
||||||
|
|
||||||
|
if weak_data.upgrade().is_none() {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
panic!("app data still referenced after graceful shutdown");
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg(feature = "openssl")]
|
#[cfg(feature = "openssl")]
|
||||||
fn ssl_acceptor() -> openssl::ssl::SslAcceptorBuilder {
|
fn ssl_acceptor() -> openssl::ssl::SslAcceptorBuilder {
|
||||||
use openssl::{
|
use openssl::{
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue