mirror of https://github.com/fafhrd91/actix-web
properly drop h2 connection
This commit is contained in:
parent
c201c15f8c
commit
08b7fd86ee
|
@ -1,9 +1,12 @@
|
||||||
use std::future::Future;
|
use std::future::Future;
|
||||||
|
use std::ops::{Deref, DerefMut};
|
||||||
use std::pin::Pin;
|
use std::pin::Pin;
|
||||||
|
use std::rc::Rc;
|
||||||
use std::task::{Context, Poll};
|
use std::task::{Context, Poll};
|
||||||
use std::{fmt, io, time};
|
use std::{fmt, io, time};
|
||||||
|
|
||||||
use actix_codec::{AsyncRead, AsyncWrite, Framed, ReadBuf};
|
use actix_codec::{AsyncRead, AsyncWrite, Framed, ReadBuf};
|
||||||
|
use actix_utils::task::LocalWaker;
|
||||||
use bytes::Bytes;
|
use bytes::Bytes;
|
||||||
use futures_util::future::{err, Either, FutureExt, LocalBoxFuture, Ready};
|
use futures_util::future::{err, Either, FutureExt, LocalBoxFuture, Ready};
|
||||||
use h2::client::SendRequest;
|
use h2::client::SendRequest;
|
||||||
|
@ -20,7 +23,94 @@ use super::{h1proto, h2proto};
|
||||||
|
|
||||||
pub(crate) enum ConnectionType<Io> {
|
pub(crate) enum ConnectionType<Io> {
|
||||||
H1(Io),
|
H1(Io),
|
||||||
H2(SendRequest<Bytes>),
|
H2(H2Connection),
|
||||||
|
}
|
||||||
|
|
||||||
|
// h2 connection has two parts: SendRequest and Connection.
|
||||||
|
// Connection is spawned as async task on runtime and H2Connection would hold a waker for
|
||||||
|
// this task. So it can wake up and quit the task when SendRequest is dropped.
|
||||||
|
// TODO: consider use actix_rt::task::JoinHandle for spawn task abort when v2 release.
|
||||||
|
pub(crate) struct H2Connection {
|
||||||
|
waker: Rc<LocalWaker>,
|
||||||
|
sender: SendRequest<Bytes>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl H2Connection {
|
||||||
|
pub(crate) fn new<Io>(
|
||||||
|
sender: SendRequest<Bytes>,
|
||||||
|
connection: h2::client::Connection<Io>,
|
||||||
|
) -> Self
|
||||||
|
where
|
||||||
|
Io: AsyncRead + AsyncWrite + Unpin + 'static,
|
||||||
|
{
|
||||||
|
let (connection, waker) = WakeupOnDrop::new(connection);
|
||||||
|
|
||||||
|
actix_rt::spawn(connection);
|
||||||
|
|
||||||
|
Self { waker, sender }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// wake up waker when drop
|
||||||
|
impl Drop for H2Connection {
|
||||||
|
fn drop(&mut self) {
|
||||||
|
self.waker.wake();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// only expose sender type to public.
|
||||||
|
impl Deref for H2Connection {
|
||||||
|
type Target = SendRequest<Bytes>;
|
||||||
|
|
||||||
|
fn deref(&self) -> &Self::Target {
|
||||||
|
&self.sender
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl DerefMut for H2Connection {
|
||||||
|
fn deref_mut(&mut self) -> &mut Self::Target {
|
||||||
|
&mut self.sender
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// wrap a future and notify it to terminate based on the waker state.
|
||||||
|
#[pin_project::pin_project]
|
||||||
|
struct WakeupOnDrop<Fut> {
|
||||||
|
waker: Rc<LocalWaker>,
|
||||||
|
#[pin]
|
||||||
|
fut: Fut,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<Fut> WakeupOnDrop<Fut> {
|
||||||
|
fn new(fut: Fut) -> (Self, Rc<LocalWaker>) {
|
||||||
|
let waker = Rc::new(LocalWaker::new());
|
||||||
|
let this = WakeupOnDrop {
|
||||||
|
waker: waker.clone(),
|
||||||
|
fut,
|
||||||
|
};
|
||||||
|
|
||||||
|
(this, waker)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<Fut: Future> Future for WakeupOnDrop<Fut> {
|
||||||
|
type Output = ();
|
||||||
|
|
||||||
|
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||||
|
let this = self.project();
|
||||||
|
|
||||||
|
if Rc::strong_count(&this.waker) == 1 {
|
||||||
|
return Poll::Ready(());
|
||||||
|
}
|
||||||
|
|
||||||
|
this.waker.register(cx.waker());
|
||||||
|
|
||||||
|
if this.fut.poll(cx).is_ready() {
|
||||||
|
Poll::Ready(())
|
||||||
|
} else {
|
||||||
|
Poll::Pending
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub trait Connection {
|
pub trait Connection {
|
||||||
|
|
|
@ -22,9 +22,10 @@ use super::config::ConnectorConfig;
|
||||||
use super::connection::{ConnectionType, IoConnection};
|
use super::connection::{ConnectionType, IoConnection};
|
||||||
use super::error::SendRequestError;
|
use super::error::SendRequestError;
|
||||||
use super::pool::Acquired;
|
use super::pool::Acquired;
|
||||||
|
use crate::client::connection::H2Connection;
|
||||||
|
|
||||||
pub(crate) async fn send_request<T, B>(
|
pub(crate) async fn send_request<T, B>(
|
||||||
mut io: SendRequest<Bytes>,
|
mut io: H2Connection,
|
||||||
head: RequestHeadType,
|
head: RequestHeadType,
|
||||||
body: B,
|
body: B,
|
||||||
created: time::Instant,
|
created: time::Instant,
|
||||||
|
@ -173,7 +174,7 @@ async fn send_body<B: MessageBody>(
|
||||||
|
|
||||||
/// release SendRequest object
|
/// release SendRequest object
|
||||||
fn release<T: AsyncRead + AsyncWrite + Unpin + 'static>(
|
fn release<T: AsyncRead + AsyncWrite + Unpin + 'static>(
|
||||||
io: SendRequest<Bytes>,
|
io: H2Connection,
|
||||||
pool: Option<Acquired<T>>,
|
pool: Option<Acquired<T>>,
|
||||||
created: time::Instant,
|
created: time::Instant,
|
||||||
close: bool,
|
close: bool,
|
||||||
|
|
|
@ -25,6 +25,7 @@ use super::connection::{ConnectionType, IoConnection};
|
||||||
use super::error::ConnectError;
|
use super::error::ConnectError;
|
||||||
use super::h2proto::handshake;
|
use super::h2proto::handshake;
|
||||||
use super::Connect;
|
use super::Connect;
|
||||||
|
use crate::client::connection::H2Connection;
|
||||||
|
|
||||||
#[derive(Clone, Copy, PartialEq)]
|
#[derive(Clone, Copy, PartialEq)]
|
||||||
/// Protocol version
|
/// Protocol version
|
||||||
|
@ -138,10 +139,9 @@ where
|
||||||
Some(guard.consume()),
|
Some(guard.consume()),
|
||||||
))
|
))
|
||||||
} else {
|
} else {
|
||||||
let (snd, connection) = handshake(io, &config).await?;
|
let (sender, connection) = handshake(io, &config).await?;
|
||||||
actix_rt::spawn(connection.map(|_| ()));
|
|
||||||
Ok(IoConnection::new(
|
Ok(IoConnection::new(
|
||||||
ConnectionType::H2(snd),
|
ConnectionType::H2(H2Connection::new(sender, connection)),
|
||||||
Instant::now(),
|
Instant::now(),
|
||||||
Some(guard.consume()),
|
Some(guard.consume()),
|
||||||
))
|
))
|
||||||
|
@ -565,11 +565,10 @@ where
|
||||||
|
|
||||||
if let Some(ref mut h2) = this.h2 {
|
if let Some(ref mut h2) = this.h2 {
|
||||||
return match Pin::new(h2).poll(cx) {
|
return match Pin::new(h2).poll(cx) {
|
||||||
Poll::Ready(Ok((snd, connection))) => {
|
Poll::Ready(Ok((sender, connection))) => {
|
||||||
actix_rt::spawn(connection.map(|_| ()));
|
|
||||||
let rx = this.rx.take().unwrap();
|
let rx = this.rx.take().unwrap();
|
||||||
let _ = rx.send(Ok(IoConnection::new(
|
let _ = rx.send(Ok(IoConnection::new(
|
||||||
ConnectionType::H2(snd),
|
ConnectionType::H2(H2Connection::new(sender, connection)),
|
||||||
Instant::now(),
|
Instant::now(),
|
||||||
Some(Acquired(this.key.clone(), this.inner.take())),
|
Some(Acquired(this.key.clone(), this.inner.take())),
|
||||||
)));
|
)));
|
||||||
|
|
Loading…
Reference in New Issue