From d8a9606162779cb7460f5cb149b8e66aeaf877bd Mon Sep 17 00:00:00 2001
From: Nikolay Kim <fafhrd91@gmail.com>
Date: Wed, 4 Apr 2018 16:39:01 -0700
Subject: [PATCH] add connection limits to pool

---
 .travis.yml             |   2 +-
 src/client/connector.rs | 697 +++++++++++++++++++++++++++++++---------
 src/client/pipeline.rs  |   8 +
 3 files changed, 549 insertions(+), 158 deletions(-)

diff --git a/.travis.yml b/.travis.yml
index 8810ee72c..f27a445ad 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -87,7 +87,7 @@ after_success:
     fi
 
   - |
-    if [[ "$TRAVIS_OS_NAME" == "linux" && "$TRAVIS_RUST_VERSION" == "beta" ]]; then
+    if [[ "$TRAVIS_OS_NAME" == "linux" && "$TRAVIS_RUST_VERSION" == "nightly" ]]; then
       bash <(curl https://raw.githubusercontent.com/xd009642/tarpaulin/master/travis-install.sh)
       USE_SKEPTIC=1 cargo tarpaulin --out Xml
       bash <(curl -s https://codecov.io/bash)
diff --git a/src/client/connector.rs b/src/client/connector.rs
index 8f2828935..4f14a9e27 100644
--- a/src/client/connector.rs
+++ b/src/client/connector.rs
@@ -1,4 +1,4 @@
-use std::{fmt, io, time};
+use std::{fmt, mem, io, time};
 use std::cell::RefCell;
 use std::rc::Rc;
 use std::net::Shutdown;
@@ -6,28 +6,26 @@ use std::time::{Duration, Instant};
 use std::collections::{HashMap, VecDeque};
 
 use actix::{fut, Actor, ActorFuture, Context, AsyncContext,
-            Handler, Message, ActorResponse, Supervised};
+            Handler, Message, ActorResponse, Supervised, ContextFutureSpawner};
 use actix::registry::ArbiterService;
 use actix::fut::WrapFuture;
 use actix::actors::{Connector, ConnectorError, Connect as ResolveConnect};
 
 use http::{Uri, HttpTryFrom, Error as HttpError};
-use futures::{Async, Poll};
+use futures::{Async, Future, Poll};
+use futures::task::{Task, current as current_task};
+use futures::unsync::oneshot;
 use tokio_io::{AsyncRead, AsyncWrite};
 
 #[cfg(feature="alpn")]
 use openssl::ssl::{SslMethod, SslConnector, Error as OpensslError};
 #[cfg(feature="alpn")]
 use tokio_openssl::SslConnectorExt;
-#[cfg(feature="alpn")]
-use futures::Future;
 
 #[cfg(all(feature="tls", not(feature="alpn")))]
 use native_tls::{TlsConnector, Error as TlsError};
 #[cfg(all(feature="tls", not(feature="alpn")))]
 use tokio_tls::TlsConnectorExt;
-#[cfg(all(feature="tls", not(feature="alpn")))]
-use futures::Future;
 
 use {HAS_OPENSSL, HAS_TLS};
 use server::IoStream;
@@ -102,19 +100,37 @@ impl From<ConnectorError> for ClientConnectorError {
     }
 }
 
+struct Waiter {
+    tx: oneshot::Sender<Result<Connection, ClientConnectorError>>,
+    conn_timeout: Duration,
+}
+
+/// `ClientConnector` type is responsible for transport layer of a client connection
+/// of a client connection.
 pub struct ClientConnector {
     #[cfg(all(feature="alpn"))]
     connector: SslConnector,
     #[cfg(all(feature="tls", not(feature="alpn")))]
     connector: TlsConnector,
+
     pool: Rc<Pool>,
+    conn_lifetime: Duration,
+    conn_keep_alive: Duration,
+    limit: usize,
+    limit_per_host: usize,
+    acquired: usize,
+    acquired_per_host: HashMap<Key, usize>,
+    available: HashMap<Key, VecDeque<Conn>>,
+    to_close: Vec<Connection>,
+    waiters: HashMap<Key, VecDeque<Waiter>>,
 }
 
 impl Actor for ClientConnector {
     type Context = Context<ClientConnector>;
 
     fn started(&mut self, ctx: &mut Self::Context) {
-        self.collect(ctx);
+        self.collect_periodic(ctx);
+        ctx.spawn(Maintenance);
     }
 }
 
@@ -127,22 +143,38 @@ impl Default for ClientConnector {
         #[cfg(all(feature="alpn"))]
         {
             let builder = SslConnector::builder(SslMethod::tls()).unwrap();
-            ClientConnector {
-                connector: builder.build(),
-                pool: Rc::new(Pool::new()),
-            }
+            ClientConnector::with_connector(builder.build())
         }
         #[cfg(all(feature="tls", not(feature="alpn")))]
         {
             let builder = TlsConnector::builder().unwrap();
             ClientConnector {
-                connector: builder.build().unwrap(),
                 pool: Rc::new(Pool::new()),
+                connector: builder.build().unwrap(),
+                conn_lifetime: Duration::from_secs(15),
+                conn_keep_alive: Duration::from_secs(75),
+                limit: 100,
+                limit_per_host: 0,
+                acquired: 0,
+                acquired_per_host: HashMap::new(),
+                available: HashMap::new(),
+                to_close: Vec::new(),
+                waiters: HashMap::new(),
             }
         }
 
         #[cfg(not(any(feature="alpn", feature="tls")))]
-        ClientConnector {pool: Rc::new(Pool::new())}
+        ClientConnector {pool: Rc::new(Pool::new()),
+                         conn_lifetime: Duration::from_secs(15),
+                         conn_keep_alive: Duration::from_secs(75),
+                         limit: 100,
+                         limit_per_host: 0,
+                         acquired: 0,
+                         acquired_per_host: HashMap::new(),
+                         available: HashMap::new(),
+                         to_close: Vec::new(),
+                         waiters: HashMap::new(),
+        }
     }
 }
 
@@ -192,12 +224,200 @@ impl ClientConnector {
     /// }
     /// ```
     pub fn with_connector(connector: SslConnector) -> ClientConnector {
-        ClientConnector { connector, pool: Rc::new(Pool::new()) }
+        ClientConnector {
+            connector,
+            pool: Rc::new(Pool::new()),
+            conn_lifetime: Duration::from_secs(15),
+            conn_keep_alive: Duration::from_secs(75),
+            limit: 100,
+            limit_per_host: 0,
+            acquired: 0,
+            acquired_per_host: HashMap::new(),
+            available: HashMap::new(),
+            to_close: Vec::new(),
+            waiters: HashMap::new(),
+        }
     }
 
-    fn collect(&mut self, ctx: &mut Context<Self>) {
-        self.pool.collect();
-        ctx.run_later(Duration::from_secs(1), |act, ctx| act.collect(ctx));
+    /// Set total number of simultaneous connections.
+    ///
+    /// If limit is 0, the connector has no limit.
+    /// The default limit size is 100.
+    pub fn limit(mut self, limit: usize) -> Self {
+        self.limit = limit;
+        self
+    }
+
+    /// Set total number of simultaneous connections to the same endpoint.
+    ///
+    /// Endpoints are the same if they are have equal (host, port, ssl) triplet.
+    /// If limit is 0, the connector has no limit. The default limit size is 0.
+    pub fn limit_per_host(mut self, limit: usize) -> Self {
+        self.limit_per_host = limit;
+        self
+    }
+
+    /// Set keep-alive period for opened connection.
+    ///
+    /// Keep-alive period is period between connection usage.
+    /// if deley between connection usage exceeds this period
+    /// connection closes.
+    pub fn conn_keep_alive(mut self, dur: Duration) -> Self {
+        self.conn_keep_alive = dur;
+        self
+    }
+
+    /// Set max lifetime period for connection.
+    ///
+    /// Connection lifetime is max lifetime of any opened connection
+    /// until it get closed regardless of keep-alive period.
+    pub fn conn_lifetime(mut self, dur: Duration) -> Self {
+        self.conn_lifetime = dur;
+        self
+    }
+
+    fn acquire(&mut self, key: &Key) -> Acquire {
+        // check limits
+        if self.limit > 0 {
+            if self.acquired >= self.limit {
+                return Acquire::NotAvailable
+            }
+            if self.limit_per_host > 0 {
+                if let Some(per_host) = self.acquired_per_host.get(key) {
+                    if self.limit_per_host >= *per_host {
+                        return Acquire::NotAvailable
+                    }
+                }
+            }
+        }
+        else if self.limit_per_host > 0 {
+            if let Some(per_host) = self.acquired_per_host.get(key) {
+                if self.limit_per_host >= *per_host {
+                    return Acquire::NotAvailable
+                }
+            }
+        }
+
+        self.reserve(key);
+
+        // check if open connection is available
+        // cleanup stale connections at the same time
+        if let Some(ref mut connections) = self.available.get_mut(key) {
+            let now = Instant::now();
+            while let Some(conn) = connections.pop_back() {
+                // check if it still usable
+                if (now - conn.0) > self.conn_keep_alive
+                    || (now - conn.1.ts) > self.conn_lifetime
+                {
+                    self.to_close.push(conn.1);
+                } else {
+                    let mut conn = conn.1;
+                    let mut buf = [0; 2];
+                    match conn.stream().read(&mut buf) {
+                        Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => (),
+                        Ok(n) if n > 0 => {
+                            self.to_close.push(conn);
+                            continue
+                        },
+                        Ok(_) | Err(_) => continue,
+                    }
+                    return Acquire::Acquired(conn)
+                }
+            }
+        }
+        Acquire::Available
+    }
+
+    fn reserve(&mut self, key: &Key) {
+        self.acquired += 1;
+        let per_host =
+            if let Some(per_host) = self.acquired_per_host.get(key) {
+                *per_host
+            } else {
+                0
+            };
+        self.acquired_per_host.insert(key.clone(), per_host + 1);
+    }
+
+    fn release_key(&mut self, key: &Key) {
+        self.acquired -= 1;
+        let per_host =
+            if let Some(per_host) = self.acquired_per_host.get(key) {
+                *per_host
+            } else {
+                return
+            };
+        if per_host > 1 {
+            self.acquired_per_host.insert(key.clone(), per_host - 1);
+        } else {
+            self.acquired_per_host.remove(key);
+        }
+    }
+
+    fn collect(&mut self, periodic: bool) {
+        let now = Instant::now();
+
+        // collect half acquire keys
+        if let Some(keys) = self.pool.collect_keys() {
+            for key in keys {
+                self.release_key(&key);
+            }
+        }
+
+        // collect connections for close
+        if let Some(to_close) = self.pool.collect_close() {
+            for conn in to_close {
+                self.release_key(&conn.key);
+                self.to_close.push(conn);
+            }
+        }
+
+        // connection connections
+        if let Some(to_release) = self.pool.collect_release() {
+            for conn in to_release {
+                self.release_key(&conn.key);
+
+                // check connection lifetime and the return to available pool
+                if (now - conn.ts) < self.conn_lifetime {
+                    self.available.entry(conn.key.clone())
+                        .or_insert_with(VecDeque::new)
+                        .push_back(Conn(Instant::now(), conn));
+                }
+            }
+        }
+
+        // check keep-alive
+        for conns in self.available.values_mut() {
+            while !conns.is_empty() {
+                if (now > conns[0].0) && (now - conns[0].0) > self.conn_keep_alive
+                    || (now - conns[0].1.ts) > self.conn_lifetime
+                {
+                    let conn = conns.pop_front().unwrap().1;
+                    self.to_close.push(conn);
+                } else {
+                    break
+                }
+            }
+        }
+
+        // check connections for shutdown
+        if periodic {
+            let mut idx = 0;
+            while idx < self.to_close.len() {
+                match AsyncWrite::shutdown(&mut self.to_close[idx]) {
+                    Ok(Async::NotReady) => idx += 1,
+                    _ => {
+                        self.to_close.swap_remove(idx);
+                    },
+                }
+            }
+        }
+    }
+
+    fn collect_periodic(&mut self, ctx: &mut Context<Self>) {
+        self.collect(true);
+        // re-schedule next collect period
+        ctx.run_later(Duration::from_secs(1), |act, ctx| act.collect_periodic(ctx));
     }
 }
 
@@ -205,6 +425,8 @@ impl Handler<Connect> for ClientConnector {
     type Result = ActorResponse<ClientConnector, Connection, ClientConnectorError>;
 
     fn handle(&mut self, msg: Connect, _: &mut Self::Context) -> Self::Result {
+        self.collect(false);
+
         let uri = &msg.uri;
         let conn_timeout = msg.conn_timeout;
 
@@ -227,76 +449,244 @@ impl Handler<Connect> for ClientConnector {
             return ActorResponse::reply(Err(ClientConnectorError::SslIsNotSupported))
         }
 
+        // check if pool has task reference
+        if self.pool.task.borrow().is_none() {
+            *self.pool.task.borrow_mut() = Some(current_task());
+        }
+        
         let host = uri.host().unwrap().to_owned();
         let port = uri.port().unwrap_or_else(|| proto.port());
         let key = Key {host, port, ssl: proto.is_secure()};
 
+        // acquire connection
         let pool = if proto.is_http() {
-            if let Some(mut conn) = self.pool.query(&key) {
-                conn.pool = Some(self.pool.clone());
-                return ActorResponse::async(fut::ok(conn))
-            } else {
-                Some(Rc::clone(&self.pool))
+            match self.acquire(&key) {
+                Acquire::Acquired(mut conn) => {
+                    // use existing connection
+                    conn.pool = Some(AcquiredConn(key, Some(Rc::clone(&self.pool))));
+                    return ActorResponse::async(fut::ok(conn))
+                },
+                Acquire::NotAvailable => {
+                    // connection is not available, wait
+                    let (tx, rx) = oneshot::channel();
+                    let waiter = Waiter{ tx, conn_timeout };
+                    self.waiters.entry(key.clone()).or_insert_with(VecDeque::new)
+                        .push_back(waiter);
+                    return ActorResponse::async(
+                        rx.map_err(|_| ClientConnectorError::Disconnected)
+                            .into_actor(self)
+                            .and_then(|res, _, _| match res {
+                                Ok(conn) => fut::ok(conn),
+                                Err(err) => fut::err(err),
+                            }));
+                }
+                Acquire::Available => {
+                    Some(Rc::clone(&self.pool))
+                },
             }
         } else {
             None
         };
+        let conn = AcquiredConn(key, pool);
 
-        ActorResponse::async(
-            Connector::from_registry()
-                .send(ResolveConnect::host_and_port(&key.host, port)
-                      .timeout(conn_timeout))
-                .into_actor(self)
-                .map_err(|_, _, _| ClientConnectorError::Disconnected)
-                .and_then(move |res, _act, _| {
-                    #[cfg(feature="alpn")]
-                    match res {
-                        Err(err) => fut::Either::B(fut::err(err.into())),
-                        Ok(stream) => {
-                            if proto.is_secure() {
-                                fut::Either::A(
-                                    _act.connector.connect_async(&key.host, stream)
-                                        .map_err(ClientConnectorError::SslError)
-                                        .map(|stream| Connection::new(
-                                            key, pool, Box::new(stream)))
-                                        .into_actor(_act))
-                            } else {
-                                fut::Either::B(fut::ok(
-                                    Connection::new(key, pool, Box::new(stream))))
-                            }
+{
+    ActorResponse::async(
+        Connector::from_registry()
+            .send(ResolveConnect::host_and_port(&conn.0.host, port)
+                  .timeout(conn_timeout))
+            .into_actor(self)
+            .map_err(|_, _, _| ClientConnectorError::Disconnected)
+            .and_then(move |res, _act, _| {
+                #[cfg(feature="alpn")]
+                match res {
+                    Err(err) => fut::Either::B(fut::err(err.into())),
+                    Ok(stream) => {
+                        if proto.is_secure() {
+                            fut::Either::A(
+                                _act.connector.connect_async(&conn.0.host, stream)
+                                    .map_err(ClientConnectorError::SslError)
+                                    .map(|stream| Connection::new(
+                                        conn.0.clone(), Some(conn), Box::new(stream)))
+                                    .into_actor(_act))
+                        } else {
+                            fut::Either::B(fut::ok(
+                                Connection::new(
+                                    conn.0.clone(), Some(conn), Box::new(stream))))
                         }
                     }
+                }
 
-                    #[cfg(all(feature="tls", not(feature="alpn")))]
-                    match res {
-                        Err(err) => fut::Either::B(fut::err(err.into())),
-                        Ok(stream) => {
-                            if proto.is_secure() {
-                                fut::Either::A(
-                                    _act.connector.connect_async(&key.host, stream)
-                                        .map_err(ClientConnectorError::SslError)
-                                        .map(|stream| Connection::new(
-                                            key, pool, Box::new(stream)))
-                                        .into_actor(_act))
-                            } else {
-                                fut::Either::B(fut::ok(
-                                    Connection::new(key, pool, Box::new(stream))))
-                            }
+                #[cfg(all(feature="tls", not(feature="alpn")))]
+                match res {
+                    Err(err) => fut::Either::B(fut::err(err.into())),
+                    Ok(stream) => {
+                        if proto.is_secure() {
+                            fut::Either::A(
+                                _act.connector.connect_async(&conn.0.host, stream)
+                                    .map_err(ClientConnectorError::SslError)
+                                    .map(|stream| Connection::new(
+                                        conn.0.clone(), Some(conn), Box::new(stream)))
+                                    .into_actor(_act))
+                        } else {
+                            fut::Either::B(fut::ok(
+                                Connection::new(
+                                    conn.0.clone(), Some(conn), Box::new(stream))))
                         }
                     }
+                }
 
-                    #[cfg(not(any(feature="alpn", feature="tls")))]
-                    match res {
-                        Err(err) => fut::err(err.into()),
-                        Ok(stream) => {
-                            if proto.is_secure() {
-                                fut::err(ClientConnectorError::SslIsNotSupported)
-                            } else {
-                                fut::ok(Connection::new(key, pool, Box::new(stream)))
-                            }
+                #[cfg(not(any(feature="alpn", feature="tls")))]
+                match res {
+                    Err(err) => fut::err(err.into()),
+                    Ok(stream) => {
+                        if proto.is_secure() {
+                            fut::err(ClientConnectorError::SslIsNotSupported)
+                        } else {
+                            fut::ok(Connection::new(
+                                conn.0.clone(), Some(conn), Box::new(stream)))
                         }
                     }
-                }))
+                }
+            }))
+}
+    }
+}
+
+struct Maintenance;
+
+impl fut::ActorFuture for Maintenance
+{
+    type Item = ();
+    type Error = ();
+    type Actor = ClientConnector;
+
+    fn poll(&mut self, act: &mut ClientConnector, ctx: &mut Context<ClientConnector>)
+            -> Poll<Self::Item, Self::Error>
+    {
+        // collecto connections
+        act.collect(false);
+
+        // check waiters
+        let tmp: &mut ClientConnector = unsafe{mem::transmute(act as &mut _)};
+
+        for (key, waiters) in &mut tmp.waiters {
+            while let Some(waiter) = waiters.pop_front() {
+                if waiter.tx.is_canceled() { continue }
+
+                match act.acquire(key) {
+                    Acquire::Acquired(mut conn) => {
+                        // use existing connection
+                        conn.pool = Some(
+                            AcquiredConn(key.clone(), Some(Rc::clone(&act.pool))));
+                        let _ = waiter.tx.send(Ok(conn));
+                    },
+                    Acquire::NotAvailable => {
+                        waiters.push_front(waiter);
+                        break
+                    }
+                    Acquire::Available =>
+       {
+           let conn = AcquiredConn(key.clone(), Some(Rc::clone(&act.pool)));
+
+           fut::WrapFuture::<ClientConnector>::actfuture(
+               Connector::from_registry()
+                   .send(ResolveConnect::host_and_port(&conn.0.host, conn.0.port)
+                         .timeout(waiter.conn_timeout)))
+               .map_err(|_, _, _| ())
+               .and_then(move |res, _act, _| {
+                   #[cfg(feature="alpn")]
+                   match res {
+                       Err(err) => {
+                           let _ = waiter.tx.send(Err(err.into()));
+                           fut::Either::B(fut::err(()))
+                       },
+                       Ok(stream) => {
+                           if conn.0.ssl {
+                               fut::Either::A(
+                                   _act.connector.connect_async(&key.host, stream)
+                                       .then(move |res| {
+                                           match res {
+                                               Err(e) => {
+                                                   let _ = waiter.tx.send(Err(
+                                                       ClientConnectorError::SslError(e)));
+                                               },
+                                               Ok(stream) => {
+                                                   let _ = waiter.tx.send(Ok(
+                                                       Connection::new(
+                                                           conn.0.clone(),
+                                                           Some(conn), Box::new(stream))));
+                                               }
+                                           }
+                                           Ok(())
+                                       })
+                                       .actfuture())
+                           } else {
+                               let _ = waiter.tx.send(Ok(Connection::new(
+                                   conn.0.clone(), Some(conn), Box::new(stream))));
+                               fut::Either::B(fut::ok(()))
+                           }
+                       }
+                   }
+
+                   #[cfg(all(feature="tls", not(feature="alpn")))]
+                   match res {
+                       Err(err) => {
+                           let _ = waiter.tx.send(Err(err.into()));
+                           fut::Either::B(fut::err(()))
+                       },
+                       Ok(stream) => {
+                           if conn.0.ssl {
+                               fut::Either::A(
+                                   _act.connector.connect_async(&conn.0.host, stream)
+                                       .then(|res| {
+                                           match res {
+                                               Err(e) => {
+                                                   let _ = waiter.tx.send(Err(
+                                                       ClientConnectorError::SslError(e)));
+                                               },
+                                               Ok(stream) => {
+                                                   let _ = waiter.tx.send(Ok(
+                                                       Connection::new(
+                                                           conn.0.clone(), Some(conn),
+                                                           Box::new(stream))));
+                                               }
+                                           }
+                                           Ok(())
+                                       })
+                                       .into_actor(_act))
+                           } else {
+                               let _ = waiter.tx.send(Ok(Connection::new(
+                                   conn.0.clone(), Some(conn), Box::new(stream))));
+                               fut::Either::B(fut::ok(()))
+                           }
+                       }
+                   }
+
+                   #[cfg(not(any(feature="alpn", feature="tls")))]
+                   match res {
+                       Err(err) => {
+                           let _ = waiter.tx.send(Err(err.into()));
+                           fut::err(())
+                       },
+                       Ok(stream) => {
+                           if conn.0.ssl {
+                               let _ = waiter.tx.send(
+                                   Err(ClientConnectorError::SslIsNotSupported));
+                           } else {
+                               let _ = waiter.tx.send(Ok(Connection::new(
+                                   conn.0.clone(), Some(conn), Box::new(stream))));
+                           };
+                           fut::ok(())
+                       },
+                   }
+               })
+               .spawn(ctx);
+       }
+                }
+            }
+        }
+
+        Ok(Async::NotReady)
     }
 }
 
@@ -357,104 +747,94 @@ impl Key {
 #[derive(Debug)]
 struct Conn(Instant, Connection);
 
+enum Acquire {
+    Acquired(Connection),
+    Available,
+    NotAvailable,
+}
+
+struct AcquiredConn(Key, Option<Rc<Pool>>);
+
+impl AcquiredConn {
+    fn close(&mut self, conn: Connection) {
+        if let Some(pool) = self.1.take() {
+            pool.close(conn);
+        }
+    }
+    fn release(&mut self, conn: Connection) {
+        if let Some(pool) = self.1.take() {
+            pool.release(conn);
+        }
+    }
+}
+
+impl Drop for AcquiredConn {
+    fn drop(&mut self) {
+        if let Some(pool) = self.1.take() {
+            pool.release_key(self.0.clone());
+        }
+    }
+}
+
 pub struct Pool {
-    max_size: usize,
-    keep_alive: Duration,
-    max_lifetime: Duration,
-    pool: RefCell<HashMap<Key, VecDeque<Conn>>>,
+    keys: RefCell<Vec<Key>>,
     to_close: RefCell<Vec<Connection>>,
+    to_release: RefCell<Vec<Connection>>,
+    task: RefCell<Option<Task>>,
 }
 
 impl Pool {
     fn new() -> Pool {
         Pool {
-            max_size: 128,
-            keep_alive: Duration::from_secs(15),
-            max_lifetime: Duration::from_secs(75),
-            pool: RefCell::new(HashMap::new()),
+            keys: RefCell::new(Vec::new()),
             to_close: RefCell::new(Vec::new()),
+            to_release: RefCell::new(Vec::new()),
+            task: RefCell::new(None),
         }
     }
 
-    fn collect(&self) {
-        let mut pool = self.pool.borrow_mut();
-        let mut to_close = self.to_close.borrow_mut();
-
-        // check keep-alive
-        let now = Instant::now();
-        for conns in pool.values_mut() {
-            while !conns.is_empty() {
-                if (now - conns[0].0) > self.keep_alive
-                    || (now - conns[0].1.ts) > self.max_lifetime
-                {
-                    let conn = conns.pop_front().unwrap().1;
-                    to_close.push(conn);
-                } else {
-                    break
-                }
-            }
-        }
-
-        // check connections for shutdown
-        let mut idx = 0;
-        while idx < to_close.len() {
-            match AsyncWrite::shutdown(&mut to_close[idx]) {
-                Ok(Async::NotReady) => idx += 1,
-                _ => {
-                    to_close.swap_remove(idx);
-                },
-            }
+    fn collect_keys(&self) -> Option<Vec<Key>> {
+        if self.keys.borrow().is_empty() {
+            None
+        } else {
+            Some(mem::replace(&mut *self.keys.borrow_mut(), Vec::new()))
         }
     }
 
-    fn query(&self, key: &Key) -> Option<Connection> {
-        let mut pool = self.pool.borrow_mut();
-        let mut to_close = self.to_close.borrow_mut();
-
-        if let Some(ref mut connections) = pool.get_mut(key) {
-            let now = Instant::now();
-            while let Some(conn) = connections.pop_back() {
-                // check if it still usable
-                if (now - conn.0) > self.keep_alive
-                    || (now - conn.1.ts) > self.max_lifetime
-                {
-                    to_close.push(conn.1);
-                } else {
-                    let mut conn = conn.1;
-                    let mut buf = [0; 2];
-                    match conn.stream().read(&mut buf) {
-                        Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => (),
-                        Ok(n) if n > 0 => {
-                            to_close.push(conn);
-                            continue
-                        },
-                        Ok(_) | Err(_) => continue,
-                    }
-                    return Some(conn)
-                }
-            }
+    fn collect_close(&self) -> Option<Vec<Connection>> {
+        if self.to_close.borrow().is_empty() {
+            None
+        } else {
+            Some(mem::replace(&mut *self.to_close.borrow_mut(), Vec::new()))
+        }
+    }
+
+    fn collect_release(&self) -> Option<Vec<Connection>> {
+        if self.to_release.borrow().is_empty() {
+            None
+        } else {
+            Some(mem::replace(&mut *self.to_release.borrow_mut(), Vec::new()))
+        }
+    }
+
+    fn close(&self, conn: Connection) {
+        self.to_close.borrow_mut().push(conn);
+        if let Some(ref task) = *self.task.borrow() {
+            task.notify()
         }
-        None
     }
 
     fn release(&self, conn: Connection) {
-        if (Instant::now() - conn.ts) < self.max_lifetime {
-            let mut pool = self.pool.borrow_mut();
-            if !pool.contains_key(&conn.key) {
-                let key = conn.key.clone();
-                let mut vec = VecDeque::new();
-                vec.push_back(Conn(Instant::now(), conn));
-                pool.insert(key, vec);
-            } else {
-                let vec = pool.get_mut(&conn.key).unwrap();
-                vec.push_back(Conn(Instant::now(), conn));
-                if vec.len() > self.max_size {
-                    let conn = vec.pop_front().unwrap();
-                    self.to_close.borrow_mut().push(conn.1);
-                }
-            }
-        } else {
-            self.to_close.borrow_mut().push(conn);
+        self.to_release.borrow_mut().push(conn);
+        if let Some(ref task) = *self.task.borrow() {
+            task.notify()
+        }
+    }
+
+    fn release_key(&self, key: Key) {
+        self.keys.borrow_mut().push(key);
+        if let Some(ref task) = *self.task.borrow() {
+            task.notify()
         }
     }
 }
@@ -463,7 +843,7 @@ impl Pool {
 pub struct Connection {
     key: Key,
     stream: Box<IoStream>,
-    pool: Option<Rc<Pool>>,
+    pool: Option<AcquiredConn>,
     ts: Instant,
 }
 
@@ -474,11 +854,8 @@ impl fmt::Debug for Connection {
 }
 
 impl Connection {
-    fn new(key: Key, pool: Option<Rc<Pool>>, stream: Box<IoStream>) -> Self {
-        Connection {
-            key, pool, stream,
-            ts: Instant::now(),
-        }
+    fn new(key: Key, pool: Option<AcquiredConn>, stream: Box<IoStream>) -> Self {
+        Connection {key, stream, pool, ts: Instant::now()}
     }
 
     pub fn stream(&mut self) -> &mut IoStream {
@@ -489,8 +866,14 @@ impl Connection {
         Connection::new(Key::empty(), None, Box::new(io))
     }
 
+    pub fn close(mut self) {
+        if let Some(mut pool) = self.pool.take() {
+            pool.close(self)
+        }
+    }
+
     pub fn release(mut self) {
-        if let Some(pool) = self.pool.take() {
+        if let Some(mut pool) = self.pool.take() {
             pool.release(self)
         }
     }
diff --git a/src/client/pipeline.rs b/src/client/pipeline.rs
index 19ccf8927..15e7ef472 100644
--- a/src/client/pipeline.rs
+++ b/src/client/pipeline.rs
@@ -458,3 +458,11 @@ impl Pipeline {
         }
     }
 }
+
+impl Drop for Pipeline {
+    fn drop(&mut self) {
+        if let Some(conn) = self.conn.take() {
+            conn.close()
+        }
+    }
+}