Fix client leak

Each request was creating a new ConnectorPoolSupport future which never
finished. That future also referenced the pool’s inner so even when the
entire pool dropped we leaked the internals. This fixes both.
This commit is contained in:
Patrick Tescher 2020-06-23 17:28:49 -07:00 committed by Patrick Tescher
parent 056803d534
commit 3fe57daa71
No known key found for this signature in database
GPG Key ID: 868BFE173E60138E
4 changed files with 81 additions and 72 deletions

View File

@ -7,6 +7,7 @@
* Migrate cookie handling to `cookie` crate.
* Update `sha-1` to 0.9
* MSRV is now 1.41.1
* Fix client leak [#1580]
## [2.0.0-alpha.4] - 2020-05-21

View File

@ -2,7 +2,7 @@ use std::cell::RefCell;
use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::rc::{Rc, Weak};
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
@ -53,16 +53,25 @@ where
+ 'static,
{
pub(crate) fn new(connector: T, config: ConnectorConfig) -> Self {
let connector_rc = Rc::new(RefCell::new(connector));
let inner_rc = Rc::new(RefCell::new(Inner {
config,
acquired: 0,
waiters: Slab::new(),
waiters_queue: IndexSet::new(),
available: FxHashMap::default(),
waker: LocalWaker::new(),
}));
// start support future
actix_rt::spawn(ConnectorPoolSupport {
connector: connector_rc.clone(),
inner: Rc::downgrade(&inner_rc),
});
ConnectionPool(
Rc::new(RefCell::new(connector)),
Rc::new(RefCell::new(Inner {
config,
acquired: 0,
waiters: Slab::new(),
waiters_queue: IndexSet::new(),
available: FxHashMap::default(),
waker: LocalWaker::new(),
})),
connector_rc,
inner_rc,
)
}
}
@ -92,12 +101,6 @@ where
}
fn call(&mut self, req: Connect) -> Self::Future {
// start support future
actix_rt::spawn(ConnectorPoolSupport {
connector: self.0.clone(),
inner: self.1.clone(),
});
let mut connector = self.0.clone();
let inner = self.1.clone();
@ -421,7 +424,7 @@ where
Io: AsyncRead + AsyncWrite + Unpin + 'static,
{
connector: T,
inner: Rc<RefCell<Inner<Io>>>,
inner: Weak<RefCell<Inner<Io>>>,
}
impl<T, Io> Future for ConnectorPoolSupport<T, Io>
@ -435,51 +438,55 @@ where
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
let mut inner = this.inner.as_ref().borrow_mut();
inner.waker.register(cx.waker());
if let Some(this_inner) = this.inner.upgrade() {
let mut inner = this_inner.as_ref().borrow_mut();
inner.waker.register(cx.waker());
// check waiters
loop {
let (key, token) = {
if let Some((key, token)) = inner.waiters_queue.get_index(0) {
(key.clone(), *token)
} else {
break;
// check waiters
loop {
let (key, token) = {
if let Some((key, token)) = inner.waiters_queue.get_index(0) {
(key.clone(), *token)
} else {
break;
}
};
if inner.waiters.get(token).unwrap().is_none() {
continue;
}
};
if inner.waiters.get(token).unwrap().is_none() {
continue;
}
match inner.acquire(&key, cx) {
Acquire::NotAvailable => break,
Acquire::Acquired(io, created) => {
let tx = inner.waiters.get_mut(token).unwrap().take().unwrap().1;
if let Err(conn) = tx.send(Ok(IoConnection::new(
io,
created,
Some(Acquired(key.clone(), Some(this.inner.clone()))),
))) {
let (io, created) = conn.unwrap().into_inner();
inner.release_conn(&key, io, created);
match inner.acquire(&key, cx) {
Acquire::NotAvailable => break,
Acquire::Acquired(io, created) => {
let tx = inner.waiters.get_mut(token).unwrap().take().unwrap().1;
if let Err(conn) = tx.send(Ok(IoConnection::new(
io,
created,
Some(Acquired(key.clone(), Some(this_inner.clone()))),
))) {
let (io, created) = conn.unwrap().into_inner();
inner.release_conn(&key, io, created);
}
}
Acquire::Available => {
let (connect, tx) =
inner.waiters.get_mut(token).unwrap().take().unwrap();
OpenWaitingConnection::spawn(
key.clone(),
tx,
this_inner.clone(),
this.connector.call(connect),
inner.config.clone(),
);
}
}
Acquire::Available => {
let (connect, tx) =
inner.waiters.get_mut(token).unwrap().take().unwrap();
OpenWaitingConnection::spawn(
key.clone(),
tx,
this.inner.clone(),
this.connector.call(connect),
inner.config.clone(),
);
}
let _ = inner.waiters_queue.swap_remove_index(0);
}
let _ = inner.waiters_queue.swap_remove_index(0);
}
Poll::Pending
Poll::Pending
} else {
Poll::Ready(())
}
}
}

View File

@ -586,16 +586,16 @@ mod tests {
use super::*;
use crate::Client;
#[test]
fn test_debug() {
#[actix_rt::test]
async fn test_debug() {
let request = Client::new().get("/").header("x-test", "111");
let repr = format!("{:?}", request);
assert!(repr.contains("ClientRequest"));
assert!(repr.contains("x-test"));
}
#[test]
fn test_basics() {
#[actix_rt::test]
async fn test_basics() {
let mut req = Client::new()
.put("/")
.version(Version::HTTP_2)
@ -621,8 +621,8 @@ mod tests {
let _ = req.send_body("");
}
#[test]
fn test_client_header() {
#[actix_rt::test]
async fn test_client_header() {
let req = Client::build()
.header(header::CONTENT_TYPE, "111")
.finish()
@ -639,8 +639,8 @@ mod tests {
);
}
#[test]
fn test_client_header_override() {
#[actix_rt::test]
async fn test_client_header_override() {
let req = Client::build()
.header(header::CONTENT_TYPE, "111")
.finish()
@ -658,8 +658,8 @@ mod tests {
);
}
#[test]
fn client_basic_auth() {
#[actix_rt::test]
async fn client_basic_auth() {
let req = Client::new()
.get("/")
.basic_auth("username", Some("password"));
@ -685,8 +685,8 @@ mod tests {
);
}
#[test]
fn client_bearer_auth() {
#[actix_rt::test]
async fn client_bearer_auth() {
let req = Client::new().get("/").bearer_auth("someS3cr3tAutht0k3n");
assert_eq!(
req.head
@ -699,8 +699,8 @@ mod tests {
);
}
#[test]
fn client_query() {
#[actix_rt::test]
async fn client_query() {
let req = Client::new()
.get("/")
.query(&[("key1", "val1"), ("key2", "val2")])

View File

@ -27,15 +27,16 @@ const STR: &str = "Hello World Hello World Hello World Hello World Hello World \
// benchmark sending all requests at the same time
fn bench_async_burst(c: &mut Criterion) {
// We are using System here, since Runtime requires preinitialized tokio
// Maybe add to actix_rt docs
let mut rt = actix_rt::System::new("test");
let srv = test::start(|| {
App::new()
.service(web::resource("/").route(web::to(|| HttpResponse::Ok().body(STR))))
});
// We are using System here, since Runtime requires preinitialized tokio
// Maybe add to actix_rt docs
let url = srv.url("/");
let mut rt = actix_rt::System::new("test");
c.bench_function("get_body_async_burst", move |b| {
b.iter_custom(|iters| {