From ced7b3a20ecf6d58c1b63fac5bd6b48804de03ae Mon Sep 17 00:00:00 2001 From: fakeshadow <24548779@qq.com> Date: Sat, 6 Feb 2021 19:02:43 -0800 Subject: [PATCH] use join handle for cancel h2 connection. fix test --- actix-http/src/client/connection.rs | 112 +++++++--------------------- 1 file changed, 29 insertions(+), 83 deletions(-) diff --git a/actix-http/src/client/connection.rs b/actix-http/src/client/connection.rs index 1fadd31ed..4c6a6dcb8 100644 --- a/actix-http/src/client/connection.rs +++ b/actix-http/src/client/connection.rs @@ -1,12 +1,11 @@ use std::future::Future; use std::ops::{Deref, DerefMut}; use std::pin::Pin; -use std::rc::Rc; use std::task::{Context, Poll}; use std::{fmt, io, time}; use actix_codec::{AsyncRead, AsyncWrite, Framed, ReadBuf}; -use actix_utils::task::LocalWaker; +use actix_rt::task::JoinHandle; use bytes::Bytes; use futures_core::future::LocalBoxFuture; use futures_util::future::{err, Either, FutureExt, Ready}; @@ -28,11 +27,10 @@ pub(crate) enum ConnectionType { } // h2 connection has two parts: SendRequest and Connection. -// Connection is spawned as async task on runtime and H2Connection would hold a waker for +// Connection is spawned as async task on runtime and H2Connection would hold a handle for // this task. So it can wake up and quit the task when SendRequest is dropped. -// TODO: consider use actix_rt::task::JoinHandle for spawn task abort when v2 release. pub(crate) struct H2Connection { - waker: Rc, + handle: JoinHandle<()>, sender: SendRequest, } @@ -44,18 +42,18 @@ impl H2Connection { where Io: AsyncRead + AsyncWrite + Unpin + 'static, { - let (connection, waker) = WakeupOnDrop::new(connection); + let handle = actix_rt::spawn(async move { + let _ = connection.await; + }); - actix_rt::spawn(connection); - - Self { waker, sender } + Self { handle, sender } } } // wake up waker when drop impl Drop for H2Connection { fn drop(&mut self) { - self.waker.wake(); + self.handle.abort(); } } @@ -74,46 +72,6 @@ impl DerefMut for H2Connection { } } -// wrap a future and notify it to terminate based on the waker state. -#[pin_project::pin_project] -struct WakeupOnDrop { - waker: Rc, - #[pin] - fut: Fut, -} - -impl WakeupOnDrop { - fn new(fut: Fut) -> (Self, Rc) { - let waker = Rc::new(LocalWaker::new()); - let this = WakeupOnDrop { - waker: waker.clone(), - fut, - }; - - (this, waker) - } -} - -impl Future for WakeupOnDrop { - type Output = (); - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.project(); - - if Rc::strong_count(&this.waker) == 1 { - return Poll::Ready(()); - } - - this.waker.register(cx.waker()); - - if this.fut.poll(cx).is_ready() { - Poll::Ready(()) - } else { - Poll::Pending - } - } -} - pub trait Connection { type Io: AsyncRead + AsyncWrite + Unpin; type Future: Future>; @@ -359,44 +317,32 @@ where #[cfg(test)] mod test { - use std::sync::{ - atomic::{AtomicUsize, Ordering}, - Arc, - }; + use std::net; + + use actix_rt::net::TcpStream; use super::*; #[actix_rt::test] - async fn test_manual_drop() { - let var = Arc::new(AtomicUsize::new(0)); + async fn test_h2_connection_drop() { + let addr = "127.0.0.1:0".parse::().unwrap(); + let listener = net::TcpListener::bind(addr).unwrap(); + let local = listener.local_addr().unwrap(); - let var_1 = var.clone(); - let fut = async move { - actix_rt::time::sleep(std::time::Duration::from_millis(500)).await; - var_1.fetch_add(1, Ordering::SeqCst); + std::thread::spawn(move || while listener.accept().is_ok() {}); + + let tcp = TcpStream::connect(local).await.unwrap(); + let (sender, connection) = h2::client::handshake(tcp).await.unwrap(); + let conn = H2Connection::new(sender.clone(), connection); + + assert!(sender.clone().ready().await.is_ok()); + assert!(h2::client::SendRequest::clone(&*conn).ready().await.is_ok()); + + drop(conn); + + match sender.ready().await { + Ok(_) => panic!("connection should be gone and can not be ready"), + Err(e) => assert!(e.is_io()), }; - let (fut, handle) = WakeupOnDrop::new(fut); - - actix_rt::spawn(fut); - - actix_rt::time::sleep(std::time::Duration::from_millis(100)).await; - - handle.wake(); - drop(handle); - - actix_rt::time::sleep(std::time::Duration::from_millis(500)).await; - assert_eq!(var.load(Ordering::SeqCst), 0); - - let var_1 = var.clone(); - let fut = async move { - actix_rt::time::sleep(std::time::Duration::from_millis(500)).await; - var_1.fetch_add(1, Ordering::SeqCst); - }; - - let (fut, _handle) = WakeupOnDrop::new(fut); - actix_rt::spawn(fut); - - actix_rt::time::sleep(std::time::Duration::from_millis(600)).await; - assert_eq!(var.load(Ordering::SeqCst), 1); } }