use join handle for cancel h2 connection. fix test

This commit is contained in:
fakeshadow 2021-02-06 19:02:43 -08:00
parent 927baba662
commit ced7b3a20e
1 changed files with 29 additions and 83 deletions

View File

@ -1,12 +1,11 @@
use std::future::Future; use std::future::Future;
use std::ops::{Deref, DerefMut}; use std::ops::{Deref, DerefMut};
use std::pin::Pin; use std::pin::Pin;
use std::rc::Rc;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use std::{fmt, io, time}; use std::{fmt, io, time};
use actix_codec::{AsyncRead, AsyncWrite, Framed, ReadBuf}; use actix_codec::{AsyncRead, AsyncWrite, Framed, ReadBuf};
use actix_utils::task::LocalWaker; use actix_rt::task::JoinHandle;
use bytes::Bytes; use bytes::Bytes;
use futures_core::future::LocalBoxFuture; use futures_core::future::LocalBoxFuture;
use futures_util::future::{err, Either, FutureExt, Ready}; use futures_util::future::{err, Either, FutureExt, Ready};
@ -28,11 +27,10 @@ pub(crate) enum ConnectionType<Io> {
} }
// h2 connection has two parts: SendRequest and Connection. // h2 connection has two parts: SendRequest and Connection.
// Connection is spawned as async task on runtime and H2Connection would hold a waker for // Connection is spawned as async task on runtime and H2Connection would hold a handle for
// this task. So it can wake up and quit the task when SendRequest is dropped. // this task. So it can wake up and quit the task when SendRequest is dropped.
// TODO: consider use actix_rt::task::JoinHandle for spawn task abort when v2 release.
pub(crate) struct H2Connection { pub(crate) struct H2Connection {
waker: Rc<LocalWaker>, handle: JoinHandle<()>,
sender: SendRequest<Bytes>, sender: SendRequest<Bytes>,
} }
@ -44,18 +42,18 @@ impl H2Connection {
where where
Io: AsyncRead + AsyncWrite + Unpin + 'static, Io: AsyncRead + AsyncWrite + Unpin + 'static,
{ {
let (connection, waker) = WakeupOnDrop::new(connection); let handle = actix_rt::spawn(async move {
let _ = connection.await;
});
actix_rt::spawn(connection); Self { handle, sender }
Self { waker, sender }
} }
} }
// wake up waker when drop // wake up waker when drop
impl Drop for H2Connection { impl Drop for H2Connection {
fn drop(&mut self) { fn drop(&mut self) {
self.waker.wake(); self.handle.abort();
} }
} }
@ -74,46 +72,6 @@ impl DerefMut for H2Connection {
} }
} }
// wrap a future and notify it to terminate based on the waker state.
#[pin_project::pin_project]
struct WakeupOnDrop<Fut> {
waker: Rc<LocalWaker>,
#[pin]
fut: Fut,
}
impl<Fut> WakeupOnDrop<Fut> {
fn new(fut: Fut) -> (Self, Rc<LocalWaker>) {
let waker = Rc::new(LocalWaker::new());
let this = WakeupOnDrop {
waker: waker.clone(),
fut,
};
(this, waker)
}
}
impl<Fut: Future> Future for WakeupOnDrop<Fut> {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
if Rc::strong_count(&this.waker) == 1 {
return Poll::Ready(());
}
this.waker.register(cx.waker());
if this.fut.poll(cx).is_ready() {
Poll::Ready(())
} else {
Poll::Pending
}
}
}
pub trait Connection { pub trait Connection {
type Io: AsyncRead + AsyncWrite + Unpin; type Io: AsyncRead + AsyncWrite + Unpin;
type Future: Future<Output = Result<(ResponseHead, Payload), SendRequestError>>; type Future: Future<Output = Result<(ResponseHead, Payload), SendRequestError>>;
@ -359,44 +317,32 @@ where
#[cfg(test)] #[cfg(test)]
mod test { mod test {
use std::sync::{ use std::net;
atomic::{AtomicUsize, Ordering},
Arc, use actix_rt::net::TcpStream;
};
use super::*; use super::*;
#[actix_rt::test] #[actix_rt::test]
async fn test_manual_drop() { async fn test_h2_connection_drop() {
let var = Arc::new(AtomicUsize::new(0)); let addr = "127.0.0.1:0".parse::<net::SocketAddr>().unwrap();
let listener = net::TcpListener::bind(addr).unwrap();
let local = listener.local_addr().unwrap();
let var_1 = var.clone(); std::thread::spawn(move || while listener.accept().is_ok() {});
let fut = async move {
actix_rt::time::sleep(std::time::Duration::from_millis(500)).await; let tcp = TcpStream::connect(local).await.unwrap();
var_1.fetch_add(1, Ordering::SeqCst); let (sender, connection) = h2::client::handshake(tcp).await.unwrap();
let conn = H2Connection::new(sender.clone(), connection);
assert!(sender.clone().ready().await.is_ok());
assert!(h2::client::SendRequest::clone(&*conn).ready().await.is_ok());
drop(conn);
match sender.ready().await {
Ok(_) => panic!("connection should be gone and can not be ready"),
Err(e) => assert!(e.is_io()),
}; };
let (fut, handle) = WakeupOnDrop::new(fut);
actix_rt::spawn(fut);
actix_rt::time::sleep(std::time::Duration::from_millis(100)).await;
handle.wake();
drop(handle);
actix_rt::time::sleep(std::time::Duration::from_millis(500)).await;
assert_eq!(var.load(Ordering::SeqCst), 0);
let var_1 = var.clone();
let fut = async move {
actix_rt::time::sleep(std::time::Duration::from_millis(500)).await;
var_1.fetch_add(1, Ordering::SeqCst);
};
let (fut, _handle) = WakeupOnDrop::new(fut);
actix_rt::spawn(fut);
actix_rt::time::sleep(std::time::Duration::from_millis(600)).await;
assert_eq!(var.load(Ordering::SeqCst), 1);
} }
} }