From ec6d284a8e393d8c8f5bf60eece49abbab62658f Mon Sep 17 00:00:00 2001 From: Ali MJ Al-Nasrawy Date: Sun, 31 Oct 2021 16:19:21 +0300 Subject: [PATCH 1/3] improve "data no configured" message (#2429) --- src/data.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/data.rs b/src/data.rs index 7e01d3462..d27ad196b 100644 --- a/src/data.rs +++ b/src/data.rs @@ -137,7 +137,7 @@ impl FromRequest for Data { type_name::(), ); err(ErrorInternalServerError( - "App data is not configured, to configure use App::data()", + "App data is not configured, to configure construct it with web::Data::new() and pass it to App::app_data()", )) } } From 6ec2d7b90966bc55b72ee4c90129074742d087bc Mon Sep 17 00:00:00 2001 From: fakeshadow <24548779@qq.com> Date: Thu, 4 Nov 2021 23:15:23 +0800 Subject: [PATCH 2/3] add keep alive to h2 through ping pong (#2433) --- actix-http/src/h2/dispatcher.rs | 147 ++++++++++++++++++-------- actix-http/tests/test_h2_ping_pong.rs | 77 ++++++++++++++ 2 files changed, 180 insertions(+), 44 deletions(-) create mode 100644 actix-http/tests/test_h2_ping_pong.rs diff --git a/actix-http/src/h2/dispatcher.rs b/actix-http/src/h2/dispatcher.rs index ea149b1e0..7326dfff1 100644 --- a/actix-http/src/h2/dispatcher.rs +++ b/actix-http/src/h2/dispatcher.rs @@ -10,11 +10,15 @@ use std::{ }; use actix_codec::{AsyncRead, AsyncWrite}; +use actix_rt::time::Sleep; use actix_service::Service; use actix_utils::future::poll_fn; use bytes::{Bytes, BytesMut}; use futures_core::ready; -use h2::server::{Connection, SendResponse}; +use h2::{ + server::{Connection, SendResponse}, + Ping, PingPong, +}; use http::header::{HeaderValue, CONNECTION, CONTENT_LENGTH, DATE, TRANSFER_ENCODING}; use log::{error, trace}; use pin_project_lite::pin_project; @@ -36,29 +40,46 @@ pin_project! { on_connect_data: OnConnectData, config: ServiceConfig, peer_addr: Option, - _phantom: PhantomData, + ping_pong: Option, + _phantom: PhantomData } } -impl Dispatcher { +impl Dispatcher +where + T: AsyncRead + AsyncWrite + Unpin, +{ pub(crate) fn new( flow: Rc>, - connection: Connection, + mut connection: Connection, on_connect_data: OnConnectData, config: ServiceConfig, peer_addr: Option, ) -> Self { + let ping_pong = config.keep_alive_timer().map(|timer| H2PingPong { + timer: Box::pin(timer), + on_flight: false, + ping_pong: connection.ping_pong().unwrap(), + }); + Self { flow, config, peer_addr, connection, on_connect_data, + ping_pong, _phantom: PhantomData, } } } +struct H2PingPong { + timer: Pin>, + on_flight: bool, + ping_pong: PingPong, +} + impl Future for Dispatcher where T: AsyncRead + AsyncWrite + Unpin, @@ -77,54 +98,92 @@ where fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = self.get_mut(); - while let Some((req, tx)) = - ready!(Pin::new(&mut this.connection).poll_accept(cx)?) - { - let (parts, body) = req.into_parts(); - let pl = crate::h2::Payload::new(body); - let pl = Payload::::H2(pl); - let mut req = Request::with_payload(pl); + loop { + match Pin::new(&mut this.connection).poll_accept(cx)? { + Poll::Ready(Some((req, tx))) => { + let (parts, body) = req.into_parts(); + let pl = crate::h2::Payload::new(body); + let pl = Payload::::H2(pl); + let mut req = Request::with_payload(pl); - let head = req.head_mut(); - head.uri = parts.uri; - head.method = parts.method; - head.version = parts.version; - head.headers = parts.headers.into(); - head.peer_addr = this.peer_addr; + let head = req.head_mut(); + head.uri = parts.uri; + head.method = parts.method; + head.version = parts.version; + head.headers = parts.headers.into(); + head.peer_addr = this.peer_addr; - // merge on_connect_ext data into request extensions - this.on_connect_data.merge_into(&mut req); + // merge on_connect_ext data into request extensions + this.on_connect_data.merge_into(&mut req); - let fut = this.flow.service.call(req); - let config = this.config.clone(); + let fut = this.flow.service.call(req); + let config = this.config.clone(); - // multiplex request handling with spawn task - actix_rt::spawn(async move { - // resolve service call and send response. - let res = match fut.await { - Ok(res) => handle_response(res.into(), tx, config).await, - Err(err) => { - let res: Response = err.into(); - handle_response(res, tx, config).await - } - }; + // multiplex request handling with spawn task + actix_rt::spawn(async move { + // resolve service call and send response. + let res = match fut.await { + Ok(res) => handle_response(res.into(), tx, config).await, + Err(err) => { + let res: Response = err.into(); + handle_response(res, tx, config).await + } + }; - // log error. - if let Err(err) = res { - match err { - DispatchError::SendResponse(err) => { - trace!("Error sending HTTP/2 response: {:?}", err) + // log error. + if let Err(err) = res { + match err { + DispatchError::SendResponse(err) => { + trace!("Error sending HTTP/2 response: {:?}", err) + } + DispatchError::SendData(err) => warn!("{:?}", err), + DispatchError::ResponseBody(err) => { + error!("Response payload stream error: {:?}", err) + } + } } - DispatchError::SendData(err) => warn!("{:?}", err), - DispatchError::ResponseBody(err) => { - error!("Response payload stream error: {:?}", err) - } - } + }); } - }); - } + Poll::Ready(None) => return Poll::Ready(Ok(())), + Poll::Pending => match this.ping_pong.as_mut() { + Some(ping_pong) => loop { + if ping_pong.on_flight { + // When have on flight ping pong. poll pong and and keep alive timer. + // on success pong received update keep alive timer to determine the next timing of + // ping pong. + match ping_pong.ping_pong.poll_pong(cx)? { + Poll::Ready(_) => { + ping_pong.on_flight = false; - Poll::Ready(Ok(())) + let dead_line = + this.config.keep_alive_expire().unwrap(); + ping_pong.timer.as_mut().reset(dead_line); + } + Poll::Pending => { + return ping_pong + .timer + .as_mut() + .poll(cx) + .map(|_| Ok(())) + } + } + } else { + // When there is no on flight ping pong. keep alive timer is used to wait for next + // timing of ping pong. Therefore at this point it serves as an interval instead. + ready!(ping_pong.timer.as_mut().poll(cx)); + + ping_pong.ping_pong.send_ping(Ping::opaque())?; + + let dead_line = this.config.keep_alive_expire().unwrap(); + ping_pong.timer.as_mut().reset(dead_line); + + ping_pong.on_flight = true; + } + }, + None => return Poll::Pending, + }, + } + } } } diff --git a/actix-http/tests/test_h2_ping_pong.rs b/actix-http/tests/test_h2_ping_pong.rs new file mode 100644 index 000000000..5e03785a1 --- /dev/null +++ b/actix-http/tests/test_h2_ping_pong.rs @@ -0,0 +1,77 @@ +use std::io; + +use actix_http::{error::Error, HttpService, Response}; +use actix_server::Server; + +#[actix_rt::test] +async fn h2_ping_pong() -> io::Result<()> { + let (tx, rx) = std::sync::mpsc::sync_channel(1); + + let lst = std::net::TcpListener::bind("127.0.0.1:0")?; + + let addr = lst.local_addr().unwrap(); + + let join = std::thread::spawn(move || { + actix_rt::System::new().block_on(async move { + let handle = Server::build() + .disable_signals() + .workers(1) + .listen("h2_ping_pong", lst, || { + HttpService::build() + .keep_alive(3) + .h2(|_| async { Ok::<_, Error>(Response::ok()) }) + .tcp() + })? + .run(); + + tx.send(handle.clone()).unwrap(); + + handle.await + }) + }); + + let handle = rx.recv().unwrap(); + + let (sync_tx, rx) = std::sync::mpsc::sync_channel(1); + + // use a separate thread for h2 client so it can be blocked. + std::thread::spawn(move || { + tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap() + .block_on(async move { + let stream = tokio::net::TcpStream::connect(addr).await.unwrap(); + + let (mut tx, conn) = h2::client::handshake(stream).await.unwrap(); + + tokio::spawn(async move { conn.await.unwrap() }); + + let (res, _) = tx.send_request(::http::Request::new(()), true).unwrap(); + let res = res.await.unwrap(); + + assert_eq!(res.status().as_u16(), 200); + + sync_tx.send(()).unwrap(); + + // intentionally block the client thread so it can not answer ping pong. + std::thread::sleep(std::time::Duration::from_secs(1000)); + }) + }); + + rx.recv().unwrap(); + + let now = std::time::Instant::now(); + + // stop server gracefully. this step would take up to 30 seconds. + handle.stop(true).await; + + // join server thread. only when connection are all gone this step would finish. + join.join().unwrap()?; + + // check the time used for join server thread so it's known that the server shutdown + // is from keep alive and not server graceful shutdown timeout. + assert!(now.elapsed() < std::time::Duration::from_secs(30)); + + Ok(()) +} From 5e554dca35844c241e813663da09e20575d586d3 Mon Sep 17 00:00:00 2001 From: fakeshadow <24548779@qq.com> Date: Thu, 4 Nov 2021 23:57:55 +0800 Subject: [PATCH 3/3] fix awc clippy warning (#2431) --- awc/src/client/pool.rs | 3 ++- awc/tests/test_client.rs | 14 ++++++-------- 2 files changed, 8 insertions(+), 9 deletions(-) diff --git a/awc/src/client/pool.rs b/awc/src/client/pool.rs index 229c6324a..9d130412b 100644 --- a/awc/src/client/pool.rs +++ b/awc/src/client/pool.rs @@ -19,6 +19,7 @@ use actix_rt::time::{sleep, Sleep}; use actix_service::Service; use ahash::AHashMap; use futures_core::future::LocalBoxFuture; +use futures_util::FutureExt; use http::uri::Authority; use pin_project_lite::pin_project; use tokio::sync::{OwnedSemaphorePermit, Semaphore}; @@ -201,7 +202,7 @@ where // check if the connection is still usable if let ConnectionInnerType::H1(ref mut io) = c.conn { let check = ConnectionCheckFuture { io }; - match check.await { + match check.now_or_never().expect("ConnectionCheckFuture must never yield with Poll::Pending.") { ConnectionState::Tainted => { inner.close(c.conn); continue; diff --git a/awc/tests/test_client.rs b/awc/tests/test_client.rs index 615789fb3..a0af0cab6 100644 --- a/awc/tests/test_client.rs +++ b/awc/tests/test_client.rs @@ -795,17 +795,15 @@ async fn client_unread_response() { let lst = std::net::TcpListener::bind(addr).unwrap(); std::thread::spawn(move || { - for stream in lst.incoming() { - let mut stream = stream.unwrap(); - let mut b = [0; 1000]; - let _ = stream.read(&mut b).unwrap(); - let _ = stream.write_all( - b"HTTP/1.1 200 OK\r\n\ + let (mut stream, _) = lst.accept().unwrap(); + let mut b = [0; 1000]; + let _ = stream.read(&mut b).unwrap(); + let _ = stream.write_all( + b"HTTP/1.1 200 OK\r\n\ connection: close\r\n\ \r\n\ welcome!", - ); - } + ); }); // client request