add test for pool

This commit is contained in:
fakeshadow 2021-02-15 01:28:54 -08:00
parent 8986e99494
commit cd1b0dd37a
2 changed files with 189 additions and 0 deletions

View File

@ -141,6 +141,11 @@ impl<T: AsyncRead + AsyncWrite + Unpin> IoConnection<T> {
pub(crate) fn into_inner(self) -> (ConnectionType<T>, time::Instant) {
(self.io.unwrap(), self.created)
}
#[cfg(test)]
pub(crate) fn into_parts(self) -> (ConnectionType<T>, time::Instant, Acquired<T>) {
(self.io.unwrap(), self.created, self.pool.unwrap())
}
}
impl<T> Connection for IoConnection<T>

View File

@ -377,3 +377,187 @@ where
let _permit = &mut self.permit;
}
}
#[cfg(test)]
mod test {
use super::*;
use std::cell::Cell;
use std::io;
use http::Uri;
use crate::client::connection::IoConnection;
// A stream type always return pending on async read.
// mock a usable tcp stream that ready to be used as client
struct TestStream;
impl AsyncRead for TestStream {
fn poll_read(
self: Pin<&mut Self>,
_: &mut Context<'_>,
_: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
Poll::Pending
}
}
impl AsyncWrite for TestStream {
fn poll_write(
self: Pin<&mut Self>,
_: &mut Context<'_>,
_: &[u8],
) -> Poll<io::Result<usize>> {
unimplemented!()
}
fn poll_flush(
self: Pin<&mut Self>,
_: &mut Context<'_>,
) -> Poll<io::Result<()>> {
unimplemented!()
}
fn poll_shutdown(
self: Pin<&mut Self>,
_: &mut Context<'_>,
) -> Poll<io::Result<()>> {
unimplemented!()
}
}
struct TestPoolConnector {
generated: Rc<Cell<usize>>,
}
impl Service<Connect> for TestPoolConnector {
type Response = (TestStream, Protocol);
type Error = ConnectError;
type Future = LocalBoxFuture<'static, Result<Self::Response, Self::Error>>;
fn poll_ready(&self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
unimplemented!("poll_ready is not used in test")
}
fn call(&self, _: Connect) -> Self::Future {
self.generated.set(self.generated.get() + 1);
Box::pin(async { Ok((TestStream, Protocol::Http1)) })
}
}
#[actix_rt::test]
async fn test_pool_limit() {
let connector = TestPoolConnector {
generated: Rc::new(Cell::new(0)),
};
let mut config = ConnectorConfig::default();
config.limit = 1;
let pool = super::ConnectionPool::new(connector, config);
let req = Connect {
uri: Uri::from_static("http://localhost"),
addr: None,
};
let conn = pool.call(req.clone()).await.unwrap();
let waiting = Rc::new(Cell::new(true));
let waiting_clone = waiting.clone();
actix_rt::spawn(async move {
actix_rt::time::sleep(Duration::from_millis(100)).await;
waiting_clone.set(false);
drop(conn);
});
assert!(waiting.get());
let now = Instant::now();
let conn = pool.call(req.clone()).await.unwrap();
drop(conn);
assert!(!waiting.get());
assert!(now.elapsed() >= Duration::from_millis(100));
}
#[actix_rt::test]
async fn test_pool_keep_alive() {
let generated = Rc::new(Cell::new(0));
let generated_clone = generated.clone();
let connector = TestPoolConnector { generated };
let mut config = ConnectorConfig::default();
config.conn_keep_alive = Duration::from_secs(1);
let pool = super::ConnectionPool::new(connector, config);
let req = Connect {
uri: Uri::from_static("http://localhost"),
addr: None,
};
let conn = pool.call(req.clone()).await.unwrap();
assert_eq!(1, generated_clone.get());
{
let (conn, created, mut acquired) = conn.into_parts();
acquired.release(IoConnection::new(conn, created, None));
}
let conn = pool.call(req.clone()).await.unwrap();
assert_eq!(1, generated_clone.get());
{
let (conn, created, mut acquired) = conn.into_parts();
acquired.release(IoConnection::new(conn, created, None));
}
actix_rt::time::sleep(Duration::from_millis(1500)).await;
actix_rt::task::yield_now().await;
let conn = pool.call(req.clone()).await.unwrap();
assert_eq!(2, generated_clone.get());
drop(conn);
}
#[actix_rt::test]
async fn test_pool_lifetime() {
let generated = Rc::new(Cell::new(0));
let generated_clone = generated.clone();
let connector = TestPoolConnector { generated };
let mut config = ConnectorConfig::default();
config.conn_lifetime = Duration::from_secs(1);
let pool = super::ConnectionPool::new(connector, config);
let req = Connect {
uri: Uri::from_static("http://localhost"),
addr: None,
};
let conn = pool.call(req.clone()).await.unwrap();
assert_eq!(1, generated_clone.get());
{
let (conn, created, mut acquired) = conn.into_parts();
acquired.release(IoConnection::new(conn, created, None));
}
let conn = pool.call(req.clone()).await.unwrap();
assert_eq!(1, generated_clone.get());
{
let (conn, created, mut acquired) = conn.into_parts();
acquired.release(IoConnection::new(conn, created, None));
}
actix_rt::time::sleep(Duration::from_millis(1500)).await;
actix_rt::task::yield_now().await;
let conn = pool.call(req.clone()).await.unwrap();
assert_eq!(2, generated_clone.get());
drop(conn);
}
}