refactor timers

This commit is contained in:
Rob Ede 2022-01-29 04:52:48 +00:00
parent e9aabbfd4f
commit 6dd9f2b558
No known key found for this signature in database
GPG Key ID: 97C636207D3EF933
2 changed files with 94 additions and 92 deletions

View File

@ -14,7 +14,6 @@ use actix_service::Service;
use bitflags::bitflags; use bitflags::bitflags;
use bytes::{Buf, BytesMut}; use bytes::{Buf, BytesMut};
use futures_core::ready; use futures_core::ready;
use log::error;
use pin_project_lite::pin_project; use pin_project_lite::pin_project;
use crate::{ use crate::{
@ -188,7 +187,7 @@ impl TimerState {
fn set(&mut self, timer: Sleep, line: u32) { fn set(&mut self, timer: Sleep, line: u32) {
if !self.is_enabled() { if !self.is_enabled() {
warn!("setting disabled timer from line {}", line); log::warn!("setting disabled timer from line {}", line);
} }
*self = Self::Active { *self = Self::Active {
@ -196,13 +195,32 @@ impl TimerState {
}; };
} }
fn reset(&mut self, deadline: Instant, line: u32) {
if !self.is_enabled() {
log::warn!("resetting disabled timer from line {}", line);
}
match self {
TimerState::Active { timer } => {
let _ = timer.as_mut().reset(deadline);
}
_ => self.set(sleep_until(deadline), line),
}
}
fn set_and_init(&mut self, cx: &mut Context<'_>, timer: Sleep, line: u32) {
self.set(timer, line);
self.init(cx);
}
fn clear(&mut self, line: u32) { fn clear(&mut self, line: u32) {
if !self.is_enabled() { if !self.is_enabled() {
warn!("trying to clear a disabled timer from line {}", line); log::warn!("trying to clear a disabled timer from line {}", line);
} }
if matches!(self, Self::Inactive) { if matches!(self, Self::Inactive) {
warn!("trying to clear an inactive timer from line {}", line); log::warn!("trying to clear an inactive timer from line {}", line);
} }
*self = Self::Inactive; *self = Self::Inactive;
@ -412,9 +430,13 @@ where
return Poll::Ready(Err(io::Error::new(io::ErrorKind::WriteZero, ""))); return Poll::Ready(Err(io::Error::new(io::ErrorKind::WriteZero, "")));
} }
Poll::Ready(n) => written += n, Poll::Ready(n) => {
log::trace!(" written {} bytes", n);
written += n
}
Poll::Pending => { Poll::Pending => {
log::trace!(" pending");
write_buf.advance(written); write_buf.advance(written);
return Poll::Pending; return Poll::Pending;
} }
@ -642,16 +664,20 @@ where
Poll::Ready(None) => { Poll::Ready(None) => {
this.codec.encode(Message::Chunk(None), this.write_buf)?; this.codec.encode(Message::Chunk(None), this.write_buf)?;
// payload stream finished // payload stream finished
// set state to None and handle next message // set state to None and handle next message
this.state.set(State::None); this.state.set(State::None);
this.flags.insert(Flags::FINISHED);
continue 'res; continue 'res;
} }
Poll::Ready(Some(Err(err))) => { Poll::Ready(Some(Err(err))) => {
this.flags.insert(Flags::FINISHED);
return Err(DispatchError::Body( return Err(DispatchError::Body(
Error::new_body().with_cause(err).into(), Error::new_body().with_cause(err).into(),
)) ));
} }
Poll::Pending => return Ok(PollResponse::DoNothing), Poll::Pending => return Ok(PollResponse::DoNothing),
@ -851,7 +877,7 @@ where
if let Some(ref mut payload) = this.payload { if let Some(ref mut payload) = this.payload {
payload.feed_data(chunk); payload.feed_data(chunk);
} else { } else {
error!("Internal server error: unexpected payload chunk"); log::error!("Internal server error: unexpected payload chunk");
this.flags.insert(Flags::READ_DISCONNECT); this.flags.insert(Flags::READ_DISCONNECT);
this.messages.push_back(DispatcherMessage::Error( this.messages.push_back(DispatcherMessage::Error(
Response::internal_server_error().drop_body(), Response::internal_server_error().drop_body(),
@ -865,7 +891,7 @@ where
if let Some(mut payload) = this.payload.take() { if let Some(mut payload) = this.payload.take() {
payload.feed_eof(); payload.feed_eof();
} else { } else {
error!("Internal server error: unexpected eof"); log::error!("Internal server error: unexpected eof");
this.flags.insert(Flags::READ_DISCONNECT); this.flags.insert(Flags::READ_DISCONNECT);
this.messages.push_back(DispatcherMessage::Error( this.messages.push_back(DispatcherMessage::Error(
Response::internal_server_error().drop_body(), Response::internal_server_error().drop_body(),
@ -940,26 +966,22 @@ where
) -> Result<(), DispatchError> { ) -> Result<(), DispatchError> {
let this = self.as_mut().project(); let this = self.as_mut().project();
match this.head_timer { if let TimerState::Active { timer } = this.head_timer {
TimerState::Active { timer } => { if timer.as_mut().poll(cx).is_ready() {
if timer.as_mut().poll(cx).is_ready() { // timeout on first request (slow request) return 408
// timeout on first request (slow request) return 408
log::trace!( log::trace!(
"timed out on slow request; \ "timed out on slow request; \
replying with 408 and closing connection" replying with 408 and closing connection"
); );
let _ = self.as_mut().send_error_response( let _ = self.as_mut().send_error_response(
Response::with_body(StatusCode::REQUEST_TIMEOUT, ()), Response::with_body(StatusCode::REQUEST_TIMEOUT, ()),
BoxBody::new(()), BoxBody::new(()),
); );
self.project().flags.insert(Flags::SHUTDOWN); self.project().flags.insert(Flags::SHUTDOWN);
}
} }
TimerState::Inactive => {}
TimerState::Disabled => {}
}; };
Ok(()) Ok(())
@ -970,39 +992,36 @@ where
cx: &mut Context<'_>, cx: &mut Context<'_>,
) -> Result<(), DispatchError> { ) -> Result<(), DispatchError> {
let this = self.as_mut().project(); let this = self.as_mut().project();
match this.ka_timer { if let TimerState::Active { timer } = this.ka_timer {
TimerState::Active { timer } => { debug_assert!(
debug_assert!( this.flags.contains(Flags::KEEP_ALIVE),
this.flags.contains(Flags::KEEP_ALIVE), "keep-alive flag should be set when timer is active",
"keep-alive flag should be set when timer is active", );
); debug_assert!(
debug_assert!( this.state.is_none(),
this.state.is_none(), "dispatcher should not be in keep-alive phase if state is not none",
"dispatcher should not be in keep-alive phase if state is not none", );
); debug_assert!(
debug_assert!( this.write_buf.is_empty(),
this.write_buf.is_empty(), "dispatcher should not be in keep-alive phase if write_buf is not empty",
"dispatcher should not be in keep-alive phase if write_buf is not empty", );
);
// keep-alive timer has timed out // keep-alive timer has timed out
if timer.as_mut().poll(cx).is_ready() { if timer.as_mut().poll(cx).is_ready() {
// no tasks at hand // no tasks at hand
log::trace!("timer timed out; closing connection"); log::trace!("timer timed out; closing connection");
this.flags.insert(Flags::SHUTDOWN); this.flags.insert(Flags::SHUTDOWN);
if let Some(deadline) = this.config.client_disconnect_deadline() { if let Some(deadline) = this.config.client_disconnect_deadline() {
// start shutdown timeout if enabled // start shutdown timeout if enabled
log::trace!("starting disconnect timer"); log::trace!("starting disconnect timer");
this.shutdown_timer.set(sleep_until(deadline), line!()); this.shutdown_timer
} else { .set_and_init(cx, sleep_until(deadline), line!());
// no shutdown timeout, drop socket } else {
this.flags.insert(Flags::WRITE_DISCONNECT); // no shutdown timeout, drop socket
} this.flags.insert(Flags::WRITE_DISCONNECT);
} }
} }
TimerState::Disabled => {}
TimerState::Inactive => {}
} }
Ok(()) Ok(())
@ -1013,31 +1032,16 @@ where
cx: &mut Context<'_>, cx: &mut Context<'_>,
) -> Result<(), DispatchError> { ) -> Result<(), DispatchError> {
let this = self.as_mut().project(); let this = self.as_mut().project();
match this.shutdown_timer { if let TimerState::Active { timer } = this.shutdown_timer {
TimerState::Disabled => {} debug_assert!(
TimerState::Inactive => {} this.flags.contains(Flags::SHUTDOWN),
TimerState::Active { timer } => { "shutdown flag should be set when timer is active",
debug_assert!( );
this.flags.contains(Flags::SHUTDOWN),
"shutdown flag should be set when timer is active",
);
// timed-out during shutdown; drop connection // timed-out during shutdown; drop connection
if timer.as_mut().poll(cx).is_ready() { if timer.as_mut().poll(cx).is_ready() {
log::trace!("timed-out during shutdown"); log::trace!("timed-out during shutdown");
return Err(DispatchError::DisconnectTimeout); return Err(DispatchError::DisconnectTimeout);
}
// if this.flags.contains(Flags::SHUTDOWN) {
// log::trace!("start shutdown timer");
// if let Some(deadline) = this.config.client_disconnect_deadline() {
// // write client disconnect time out and poll again to
// // go into Some(timer) branch
// this.timer.set(Some(sleep_until(deadline)));
// return self.poll_timer(cx);
// }
// }
} }
} }
@ -1228,11 +1232,11 @@ where
if let Some(deadline) = inner.config.client_request_deadline() { if let Some(deadline) = inner.config.client_request_deadline() {
log::trace!("start head timer"); log::trace!("start head timer");
inner inner.as_mut().project().head_timer.set_and_init(
.as_mut() cx,
.project() sleep_until(deadline),
.head_timer line!(),
.set(sleep_until(deadline), line!()); );
} }
} }
@ -1258,13 +1262,11 @@ where
if inner.flags.contains(Flags::KEEP_ALIVE) { if inner.flags.contains(Flags::KEEP_ALIVE) {
if let Some(deadline) = inner.config.keep_alive_timer() { if let Some(deadline) = inner.config.keep_alive_timer() {
log::trace!("setting keep-alive timer"); log::trace!("setting keep-alive timer");
inner inner.as_mut().project().ka_timer.set_and_init(
.as_mut() cx,
.project() deadline,
.ka_timer line!(),
.set(deadline, line!()); );
inner.as_mut().project().ka_timer.init(cx);
} }
} }
@ -1355,7 +1357,7 @@ where
} }
DispatcherStateProj::Upgrade { fut: upgrade } => upgrade.poll(cx).map_err(|err| { DispatcherStateProj::Upgrade { fut: upgrade } => upgrade.poll(cx).map_err(|err| {
error!("Upgrade handler error: {}", err); log::error!("Upgrade handler error: {}", err);
DispatchError::Upgrade DispatchError::Upgrade
}), }),
} }

View File

@ -197,7 +197,9 @@ async fn test_chunked_payload() {
} }
#[actix_rt::test] #[actix_rt::test]
async fn test_slow_request_close() { async fn slow_request_close() {
let _ = env_logger::try_init();
let mut srv = test_server(|| { let mut srv = test_server(|| {
HttpService::build() HttpService::build()
.client_timeout(200) .client_timeout(200)
@ -309,8 +311,6 @@ async fn test_http1_keepalive() {
#[actix_rt::test] #[actix_rt::test]
async fn test_http1_keepalive_timeout() { async fn test_http1_keepalive_timeout() {
let _ = env_logger::try_init();
let mut srv = test_server(|| { let mut srv = test_server(|| {
HttpService::build() HttpService::build()
.keep_alive(1) .keep_alive(1)