From bccd7c76715eac46f92191c5fbd4d6391af1aeae Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Tue, 17 Jul 2018 01:57:57 +0600 Subject: [PATCH 1/2] add wait queue size stat to client connector --- src/client/connector.rs | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/src/client/connector.rs b/src/client/connector.rs index 1ff0efe51..604af0b86 100644 --- a/src/client/connector.rs +++ b/src/client/connector.rs @@ -34,6 +34,8 @@ use {HAS_OPENSSL, HAS_TLS}; pub struct ClientConnectorStats { /// Number of waited-on connections pub waits: usize, + /// Size of the wait queue + pub wait_queue: usize, /// Number of reused connections pub reused: usize, /// Number of opened connections @@ -494,8 +496,13 @@ impl ClientConnector { ctx.run_later(Duration::from_secs(1), |act, ctx| act.collect_periodic(ctx)); // send stats - let stats = mem::replace(&mut self.stats, ClientConnectorStats::default()); + let mut stats = mem::replace(&mut self.stats, ClientConnectorStats::default()); if let Some(ref mut subscr) = self.subscriber { + if let Some(ref waiters) = self.waiters { + for w in waiters.values() { + stats.wait_queue += w.len(); + } + } let _ = subscr.do_send(stats); } } From 1af5aa3a3ea4d897ca598b0528e1c70ddfc46cff Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Tue, 17 Jul 2018 02:30:21 +0600 Subject: [PATCH 2/2] calculate client request timeout --- src/client/pipeline.rs | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/src/client/pipeline.rs b/src/client/pipeline.rs index 2192d474c..c3f3bf4cd 100644 --- a/src/client/pipeline.rs +++ b/src/client/pipeline.rs @@ -71,7 +71,7 @@ pub struct SendRequest { conn: Option>, conn_timeout: Duration, wait_timeout: Duration, - timeout: Option, + timeout: Option, } impl SendRequest { @@ -115,7 +115,7 @@ impl SendRequest { /// Request timeout is the total time before a response must be received. /// Default value is 5 seconds. pub fn timeout(mut self, timeout: Duration) -> Self { - self.timeout = Some(Delay::new(Instant::now() + timeout)); + self.timeout = Some(timeout); self } @@ -185,9 +185,10 @@ impl Future for SendRequest { _ => IoBody::Done, }; - let timeout = self.timeout.take().unwrap_or_else(|| { - Delay::new(Instant::now() + Duration::from_secs(5)) - }); + let timeout = self + .timeout + .take() + .unwrap_or_else(|| Duration::from_secs(5)); let pl = Box::new(Pipeline { body, @@ -201,7 +202,7 @@ impl Future for SendRequest { decompress: None, should_decompress: self.req.response_decompress(), write_state: RunningState::Running, - timeout: Some(timeout), + timeout: Some(Delay::new(Instant::now() + timeout)), close: self.req.method() == &Method::HEAD, }); self.state = State::Send(pl);