diff --git a/actix-http/src/h1/dispatcher.rs b/actix-http/src/h1/dispatcher.rs index 13004ba2d..2581504cd 100644 --- a/actix-http/src/h1/dispatcher.rs +++ b/actix-http/src/h1/dispatcher.rs @@ -14,7 +14,6 @@ use actix_service::Service; use bitflags::bitflags; use bytes::{Buf, BytesMut}; use futures_core::ready; -use log::error; use pin_project_lite::pin_project; use crate::{ @@ -188,7 +187,7 @@ impl TimerState { fn set(&mut self, timer: Sleep, line: u32) { if !self.is_enabled() { - warn!("setting disabled timer from line {}", line); + log::warn!("setting disabled timer from line {}", line); } *self = Self::Active { @@ -196,13 +195,32 @@ impl TimerState { }; } + fn reset(&mut self, deadline: Instant, line: u32) { + if !self.is_enabled() { + log::warn!("resetting disabled timer from line {}", line); + } + + match self { + TimerState::Active { timer } => { + let _ = timer.as_mut().reset(deadline); + } + + _ => self.set(sleep_until(deadline), line), + } + } + + fn set_and_init(&mut self, cx: &mut Context<'_>, timer: Sleep, line: u32) { + self.set(timer, line); + self.init(cx); + } + fn clear(&mut self, line: u32) { if !self.is_enabled() { - warn!("trying to clear a disabled timer from line {}", line); + log::warn!("trying to clear a disabled timer from line {}", line); } if matches!(self, Self::Inactive) { - warn!("trying to clear an inactive timer from line {}", line); + log::warn!("trying to clear an inactive timer from line {}", line); } *self = Self::Inactive; @@ -412,9 +430,13 @@ where return Poll::Ready(Err(io::Error::new(io::ErrorKind::WriteZero, ""))); } - Poll::Ready(n) => written += n, + Poll::Ready(n) => { + log::trace!(" written {} bytes", n); + written += n + } Poll::Pending => { + log::trace!(" pending"); write_buf.advance(written); return Poll::Pending; } @@ -642,16 +664,20 @@ where Poll::Ready(None) => { this.codec.encode(Message::Chunk(None), this.write_buf)?; + // payload stream finished // set state to None and handle next message this.state.set(State::None); + this.flags.insert(Flags::FINISHED); + continue 'res; } Poll::Ready(Some(Err(err))) => { + this.flags.insert(Flags::FINISHED); return Err(DispatchError::Body( Error::new_body().with_cause(err).into(), - )) + )); } Poll::Pending => return Ok(PollResponse::DoNothing), @@ -851,7 +877,7 @@ where if let Some(ref mut payload) = this.payload { payload.feed_data(chunk); } else { - error!("Internal server error: unexpected payload chunk"); + log::error!("Internal server error: unexpected payload chunk"); this.flags.insert(Flags::READ_DISCONNECT); this.messages.push_back(DispatcherMessage::Error( Response::internal_server_error().drop_body(), @@ -865,7 +891,7 @@ where if let Some(mut payload) = this.payload.take() { payload.feed_eof(); } else { - error!("Internal server error: unexpected eof"); + log::error!("Internal server error: unexpected eof"); this.flags.insert(Flags::READ_DISCONNECT); this.messages.push_back(DispatcherMessage::Error( Response::internal_server_error().drop_body(), @@ -940,26 +966,22 @@ where ) -> Result<(), DispatchError> { let this = self.as_mut().project(); - match this.head_timer { - TimerState::Active { timer } => { - if timer.as_mut().poll(cx).is_ready() { - // timeout on first request (slow request) return 408 + if let TimerState::Active { timer } = this.head_timer { + if timer.as_mut().poll(cx).is_ready() { + // timeout on first request (slow request) return 408 - log::trace!( - "timed out on slow request; \ + log::trace!( + "timed out on slow request; \ replying with 408 and closing connection" - ); + ); - let _ = self.as_mut().send_error_response( - Response::with_body(StatusCode::REQUEST_TIMEOUT, ()), - BoxBody::new(()), - ); + let _ = self.as_mut().send_error_response( + Response::with_body(StatusCode::REQUEST_TIMEOUT, ()), + BoxBody::new(()), + ); - self.project().flags.insert(Flags::SHUTDOWN); - } + self.project().flags.insert(Flags::SHUTDOWN); } - TimerState::Inactive => {} - TimerState::Disabled => {} }; Ok(()) @@ -970,39 +992,36 @@ where cx: &mut Context<'_>, ) -> Result<(), DispatchError> { let this = self.as_mut().project(); - match this.ka_timer { - TimerState::Active { timer } => { - debug_assert!( - this.flags.contains(Flags::KEEP_ALIVE), - "keep-alive flag should be set when timer is active", - ); - debug_assert!( - this.state.is_none(), - "dispatcher should not be in keep-alive phase if state is not none", - ); - debug_assert!( - this.write_buf.is_empty(), - "dispatcher should not be in keep-alive phase if write_buf is not empty", - ); + if let TimerState::Active { timer } = this.ka_timer { + debug_assert!( + this.flags.contains(Flags::KEEP_ALIVE), + "keep-alive flag should be set when timer is active", + ); + debug_assert!( + this.state.is_none(), + "dispatcher should not be in keep-alive phase if state is not none", + ); + debug_assert!( + this.write_buf.is_empty(), + "dispatcher should not be in keep-alive phase if write_buf is not empty", + ); - // keep-alive timer has timed out - if timer.as_mut().poll(cx).is_ready() { - // no tasks at hand - log::trace!("timer timed out; closing connection"); - this.flags.insert(Flags::SHUTDOWN); + // keep-alive timer has timed out + if timer.as_mut().poll(cx).is_ready() { + // no tasks at hand + log::trace!("timer timed out; closing connection"); + this.flags.insert(Flags::SHUTDOWN); - if let Some(deadline) = this.config.client_disconnect_deadline() { - // start shutdown timeout if enabled - log::trace!("starting disconnect timer"); - this.shutdown_timer.set(sleep_until(deadline), line!()); - } else { - // no shutdown timeout, drop socket - this.flags.insert(Flags::WRITE_DISCONNECT); - } + if let Some(deadline) = this.config.client_disconnect_deadline() { + // start shutdown timeout if enabled + log::trace!("starting disconnect timer"); + this.shutdown_timer + .set_and_init(cx, sleep_until(deadline), line!()); + } else { + // no shutdown timeout, drop socket + this.flags.insert(Flags::WRITE_DISCONNECT); } } - TimerState::Disabled => {} - TimerState::Inactive => {} } Ok(()) @@ -1013,31 +1032,16 @@ where cx: &mut Context<'_>, ) -> Result<(), DispatchError> { let this = self.as_mut().project(); - match this.shutdown_timer { - TimerState::Disabled => {} - TimerState::Inactive => {} - TimerState::Active { timer } => { - debug_assert!( - this.flags.contains(Flags::SHUTDOWN), - "shutdown flag should be set when timer is active", - ); + if let TimerState::Active { timer } = this.shutdown_timer { + debug_assert!( + this.flags.contains(Flags::SHUTDOWN), + "shutdown flag should be set when timer is active", + ); - // timed-out during shutdown; drop connection - if timer.as_mut().poll(cx).is_ready() { - log::trace!("timed-out during shutdown"); - return Err(DispatchError::DisconnectTimeout); - } - - // if this.flags.contains(Flags::SHUTDOWN) { - // log::trace!("start shutdown timer"); - - // if let Some(deadline) = this.config.client_disconnect_deadline() { - // // write client disconnect time out and poll again to - // // go into Some(timer) branch - // this.timer.set(Some(sleep_until(deadline))); - // return self.poll_timer(cx); - // } - // } + // timed-out during shutdown; drop connection + if timer.as_mut().poll(cx).is_ready() { + log::trace!("timed-out during shutdown"); + return Err(DispatchError::DisconnectTimeout); } } @@ -1228,11 +1232,11 @@ where if let Some(deadline) = inner.config.client_request_deadline() { log::trace!("start head timer"); - inner - .as_mut() - .project() - .head_timer - .set(sleep_until(deadline), line!()); + inner.as_mut().project().head_timer.set_and_init( + cx, + sleep_until(deadline), + line!(), + ); } } @@ -1258,13 +1262,11 @@ where if inner.flags.contains(Flags::KEEP_ALIVE) { if let Some(deadline) = inner.config.keep_alive_timer() { log::trace!("setting keep-alive timer"); - inner - .as_mut() - .project() - .ka_timer - .set(deadline, line!()); - - inner.as_mut().project().ka_timer.init(cx); + inner.as_mut().project().ka_timer.set_and_init( + cx, + deadline, + line!(), + ); } } @@ -1355,7 +1357,7 @@ where } DispatcherStateProj::Upgrade { fut: upgrade } => upgrade.poll(cx).map_err(|err| { - error!("Upgrade handler error: {}", err); + log::error!("Upgrade handler error: {}", err); DispatchError::Upgrade }), } diff --git a/actix-http/tests/test_server.rs b/actix-http/tests/test_server.rs index 4ea96419e..56ae74c96 100644 --- a/actix-http/tests/test_server.rs +++ b/actix-http/tests/test_server.rs @@ -197,7 +197,9 @@ async fn test_chunked_payload() { } #[actix_rt::test] -async fn test_slow_request_close() { +async fn slow_request_close() { + let _ = env_logger::try_init(); + let mut srv = test_server(|| { HttpService::build() .client_timeout(200) @@ -309,8 +311,6 @@ async fn test_http1_keepalive() { #[actix_rt::test] async fn test_http1_keepalive_timeout() { - let _ = env_logger::try_init(); - let mut srv = test_server(|| { HttpService::build() .keep_alive(1)