diff --git a/actix-http/src/client/connection.rs b/actix-http/src/client/connection.rs index c513310b9..0b2c6e1df 100644 --- a/actix-http/src/client/connection.rs +++ b/actix-http/src/client/connection.rs @@ -141,6 +141,11 @@ impl IoConnection { pub(crate) fn into_inner(self) -> (ConnectionType, time::Instant) { (self.io.unwrap(), self.created) } + + #[cfg(test)] + pub(crate) fn into_parts(self) -> (ConnectionType, time::Instant, Acquired) { + (self.io.unwrap(), self.created, self.pool.unwrap()) + } } impl Connection for IoConnection diff --git a/actix-http/src/client/pool.rs b/actix-http/src/client/pool.rs index 01a7535e3..5e1451c6c 100644 --- a/actix-http/src/client/pool.rs +++ b/actix-http/src/client/pool.rs @@ -377,3 +377,187 @@ where let _permit = &mut self.permit; } } + +#[cfg(test)] +mod test { + use super::*; + + use std::cell::Cell; + use std::io; + + use http::Uri; + + use crate::client::connection::IoConnection; + + // A stream type always return pending on async read. + // mock a usable tcp stream that ready to be used as client + struct TestStream; + + impl AsyncRead for TestStream { + fn poll_read( + self: Pin<&mut Self>, + _: &mut Context<'_>, + _: &mut ReadBuf<'_>, + ) -> Poll> { + Poll::Pending + } + } + + impl AsyncWrite for TestStream { + fn poll_write( + self: Pin<&mut Self>, + _: &mut Context<'_>, + _: &[u8], + ) -> Poll> { + unimplemented!() + } + + fn poll_flush( + self: Pin<&mut Self>, + _: &mut Context<'_>, + ) -> Poll> { + unimplemented!() + } + + fn poll_shutdown( + self: Pin<&mut Self>, + _: &mut Context<'_>, + ) -> Poll> { + unimplemented!() + } + } + + struct TestPoolConnector { + generated: Rc>, + } + + impl Service for TestPoolConnector { + type Response = (TestStream, Protocol); + type Error = ConnectError; + type Future = LocalBoxFuture<'static, Result>; + + fn poll_ready(&self, _: &mut Context<'_>) -> Poll> { + unimplemented!("poll_ready is not used in test") + } + + fn call(&self, _: Connect) -> Self::Future { + self.generated.set(self.generated.get() + 1); + Box::pin(async { Ok((TestStream, Protocol::Http1)) }) + } + } + + #[actix_rt::test] + async fn test_pool_limit() { + let connector = TestPoolConnector { + generated: Rc::new(Cell::new(0)), + }; + + let mut config = ConnectorConfig::default(); + config.limit = 1; + + let pool = super::ConnectionPool::new(connector, config); + + let req = Connect { + uri: Uri::from_static("http://localhost"), + addr: None, + }; + + let conn = pool.call(req.clone()).await.unwrap(); + + let waiting = Rc::new(Cell::new(true)); + + let waiting_clone = waiting.clone(); + actix_rt::spawn(async move { + actix_rt::time::sleep(Duration::from_millis(100)).await; + waiting_clone.set(false); + drop(conn); + }); + + assert!(waiting.get()); + + let now = Instant::now(); + let conn = pool.call(req.clone()).await.unwrap(); + + drop(conn); + assert!(!waiting.get()); + assert!(now.elapsed() >= Duration::from_millis(100)); + } + + #[actix_rt::test] + async fn test_pool_keep_alive() { + let generated = Rc::new(Cell::new(0)); + let generated_clone = generated.clone(); + + let connector = TestPoolConnector { generated }; + + let mut config = ConnectorConfig::default(); + config.conn_keep_alive = Duration::from_secs(1); + + let pool = super::ConnectionPool::new(connector, config); + + let req = Connect { + uri: Uri::from_static("http://localhost"), + addr: None, + }; + + let conn = pool.call(req.clone()).await.unwrap(); + assert_eq!(1, generated_clone.get()); + { + let (conn, created, mut acquired) = conn.into_parts(); + acquired.release(IoConnection::new(conn, created, None)); + } + + let conn = pool.call(req.clone()).await.unwrap(); + assert_eq!(1, generated_clone.get()); + { + let (conn, created, mut acquired) = conn.into_parts(); + acquired.release(IoConnection::new(conn, created, None)); + } + + actix_rt::time::sleep(Duration::from_millis(1500)).await; + actix_rt::task::yield_now().await; + + let conn = pool.call(req.clone()).await.unwrap(); + assert_eq!(2, generated_clone.get()); + drop(conn); + } + + #[actix_rt::test] + async fn test_pool_lifetime() { + let generated = Rc::new(Cell::new(0)); + let generated_clone = generated.clone(); + + let connector = TestPoolConnector { generated }; + + let mut config = ConnectorConfig::default(); + config.conn_lifetime = Duration::from_secs(1); + + let pool = super::ConnectionPool::new(connector, config); + + let req = Connect { + uri: Uri::from_static("http://localhost"), + addr: None, + }; + + let conn = pool.call(req.clone()).await.unwrap(); + assert_eq!(1, generated_clone.get()); + { + let (conn, created, mut acquired) = conn.into_parts(); + acquired.release(IoConnection::new(conn, created, None)); + } + + let conn = pool.call(req.clone()).await.unwrap(); + assert_eq!(1, generated_clone.get()); + { + let (conn, created, mut acquired) = conn.into_parts(); + acquired.release(IoConnection::new(conn, created, None)); + } + + actix_rt::time::sleep(Duration::from_millis(1500)).await; + actix_rt::task::yield_now().await; + + let conn = pool.call(req.clone()).await.unwrap(); + assert_eq!(2, generated_clone.get()); + drop(conn); + } +}