From 09cb93b7f01014b660758ff2d909e8c15e7ff994 Mon Sep 17 00:00:00 2001 From: Rob Ede Date: Tue, 25 Jan 2022 15:01:05 +0000 Subject: [PATCH] better dispatcher docs --- actix-http/src/config.rs | 20 ++-- actix-http/src/h1/codec.rs | 2 +- actix-http/src/h1/dispatcher.rs | 165 +++++++++++++++++++++----------- actix-http/src/h2/mod.rs | 2 +- actix-http/src/requests/head.rs | 2 +- actix-http/tests/test_server.rs | 6 +- 6 files changed, 124 insertions(+), 73 deletions(-) diff --git a/actix-http/src/config.rs b/actix-http/src/config.rs index c7c749296..fdc85e81c 100644 --- a/actix-http/src/config.rs +++ b/actix-http/src/config.rs @@ -45,8 +45,10 @@ impl From> for KeepAlive { } /// HTTP service configuration. +#[derive(Debug, Clone)] pub struct ServiceConfig(Rc); +#[derive(Debug)] struct Inner { keep_alive: Option, client_request_timeout: u64, @@ -57,12 +59,6 @@ struct Inner { date_service: DateService, } -impl Clone for ServiceConfig { - fn clone(&self) -> Self { - ServiceConfig(self.0.clone()) - } -} - impl Default for ServiceConfig { fn default() -> Self { Self::new(KeepAlive::Timeout(5), 0, 0, false, None) @@ -143,7 +139,6 @@ impl ServiceConfig { /// Returns `None` if this `ServiceConfig was` constructed with `client_request_timeout: 0`. /// /// [client request deadline]: Self::client_deadline - #[inline] pub fn client_request_timer(&self) -> Option { self.client_request_deadline().map(sleep_until) } @@ -168,18 +163,15 @@ impl ServiceConfig { /// Creates a timer that resolves at the [keep-alive deadline]. /// /// [keep-alive deadline]: Self::keep_alive_deadline - #[inline] pub fn keep_alive_timer(&self) -> Option { self.keep_alive_deadline().map(sleep_until) } - #[inline] pub(crate) fn now(&self) -> Instant { self.0.date_service.now() } - #[doc(hidden)] - pub fn set_date(&self, dst: &mut BytesMut, camel_case: bool) { + pub(crate) fn set_date(&self, dst: &mut BytesMut, camel_case: bool) { let mut buf: [u8; 39] = [0; 39]; buf[..6].copy_from_slice(if camel_case { b"Date: " } else { b"date: " }); @@ -236,6 +228,12 @@ struct DateService { handle: JoinHandle<()>, } +impl fmt::Debug for DateService { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("DateService").finish_non_exhaustive() + } +} + impl Drop for DateService { fn drop(&mut self) { // stop the timer update async task on drop. diff --git a/actix-http/src/h1/codec.rs b/actix-http/src/h1/codec.rs index 9a8907579..a630110f1 100644 --- a/actix-http/src/h1/codec.rs +++ b/actix-http/src/h1/codec.rs @@ -42,7 +42,7 @@ impl Default for Codec { impl fmt::Debug for Codec { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "h1::Codec({:?})", self.flags) + f.debug_struct("h1::Codec").finish_non_exhaustive() } } diff --git a/actix-http/src/h1/dispatcher.rs b/actix-http/src/h1/dispatcher.rs index b32c19621..4f5a16a1a 100644 --- a/actix-http/src/h1/dispatcher.rs +++ b/actix-http/src/h1/dispatcher.rs @@ -38,10 +38,19 @@ const MAX_PIPELINED_MESSAGES: usize = 16; bitflags! { pub struct Flags: u8 { + /// Set when .... started? const STARTED = 0b0000_0001; + + /// Set if keep-alive is active. const KEEPALIVE = 0b0000_0010; + + /// Set if in shutdown procedure. const SHUTDOWN = 0b0000_0100; + + /// Set if read-half is disconnected. const READ_DISCONNECT = 0b0000_1000; + + /// Set if write-half is disconnected. const WRITE_DISCONNECT = 0b0001_0000; } } @@ -142,6 +151,9 @@ pin_project! { payload: Option, messages: VecDeque, + // Initialized as initial KA deadline or current time. + // Updated when messages are read from stream and after timer is used for + // first-request timeout. ka_deadline: Instant, #[pin] ka_timer: Option, @@ -226,8 +238,8 @@ where }; // keep-alive timer - let (ka_expire, ka_timer) = match config.keep_alive_timer() { - Some(delay) => (delay.deadline(), Some(delay)), + let (ka_deadline, ka_timer) = match config.keep_alive_timer() { + Some(timer) => (timer.deadline(), Some(timer)), None => (config.now(), None), }; @@ -244,7 +256,7 @@ where payload: None, messages: VecDeque::new(), - ka_deadline: ka_expire, + ka_deadline, ka_timer, io: Some(io), @@ -286,11 +298,13 @@ where } } - // if checked is set to true, delay disconnect until all tasks have finished. + /// If checked is set to true, delay disconnect until all tasks have finished. fn client_disconnected(self: Pin<&mut Self>) { let this = self.project(); + this.flags .insert(Flags::READ_DISCONNECT | Flags::WRITE_DISCONNECT); + if let Some(mut payload) = this.payload.take() { payload.set_error(PayloadError::Incomplete(None)); } @@ -308,7 +322,9 @@ where Poll::Ready(0) => { return Poll::Ready(Err(io::Error::new(io::ErrorKind::WriteZero, ""))) } + Poll::Ready(n) => written += n, + Poll::Pending => { write_buf.advance(written); return Poll::Pending; @@ -316,10 +332,10 @@ where } } - // everything has written to io. clear buffer. + // everything has written to I/O; clear buffer write_buf.clear(); - // flush the io and check if get blocked. + // flush the I/O and check if get blocked io.poll_flush(cx) } @@ -364,11 +380,12 @@ where body: BoxBody, ) -> Result<(), DispatchError> { let size = self.as_mut().send_response_inner(message, &body)?; - let state = match size { + + self.project().state.set(match size { BodySize::None | BodySize::Sized(0) => State::None, _ => State::SendErrorPayload { body }, - }; - self.project().state.set(state); + }); + Ok(()) } @@ -536,44 +553,53 @@ where req: Request, cx: &mut Context<'_>, ) -> Result<(), DispatchError> { - // Handle `EXPECT: 100-Continue` header - let mut this = self.as_mut().project(); - if req.head().expect() { - // set dispatcher state so the future is pinned. - let fut = this.flow.expect.call(req); - this.state.set(State::ExpectCall { fut }); - } else { - // the same as above. - let fut = this.flow.service.call(req); - this.state.set(State::ServiceCall { fut }); + // initialize dispatcher state + { + let mut this = self.as_mut().project(); + + // Handle `EXPECT: 100-Continue` header + if req.head().expect() { + // set dispatcher state to call expect handler + let fut = this.flow.expect.call(req); + this.state.set(State::ExpectCall { fut }); + } else { + // set dispatcher state to call service handler + let fut = this.flow.service.call(req); + this.state.set(State::ServiceCall { fut }); + }; }; - // eagerly poll the future for once(or twice if expect is resolved immediately). + // eagerly poll the future once (or twice if expect is resolved immediately). loop { match self.as_mut().project().state.project() { StateProj::ExpectCall { fut } => { match fut.poll(cx) { - // expect is resolved. continue loop and poll the service call branch. + // expect is resolved; continue loop and poll the service call branch. Poll::Ready(Ok(req)) => { self.as_mut().send_continue(); + let mut this = self.as_mut().project(); let fut = this.flow.service.call(req); this.state.set(State::ServiceCall { fut }); + continue; } - // future is pending. return Ok(()) to notify that a new state is - // set and the outer loop should be continue. - Poll::Pending => return Ok(()), - // future is error. send response and return a result. On success - // to notify the dispatcher a new state is set and the outer loop - // should be continue. + + // future is error; send response and return a result + // on success to notify the dispatcher a new state is set and the outer loop + // should be continued Poll::Ready(Err(err)) => { let res: Response = err.into(); let (res, body) = res.replace_body(()); return self.send_error_response(res, body); } + + // future is pending; return Ok(()) to notify that a new state is + // set and the outer loop should be continue. + Poll::Pending => return Ok(()), } } + StateProj::ServiceCall { fut } => { // return no matter the service call future's result. return match fut.poll(cx) { @@ -584,9 +610,11 @@ where let (res, body) = res.into().replace_body(()); self.send_response(res, body) } - // see the comment on ExpectCall state branch's Pending. + + // see the comment on ExpectCall state branch's Pending Poll::Pending => Ok(()), - // see the comment on ExpectCall state branch's Ready(Err(err)). + + // see the comment on ExpectCall state branch's Ready(Err(err)) Poll::Ready(Err(err)) => { let res: Response = err.into(); let (res, body) = res.replace_body(()); @@ -594,6 +622,7 @@ where } }; } + _ => { unreachable!( "State must be set to ServiceCall or ExceptCall in handle_request" @@ -608,13 +637,17 @@ where mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Result { + let pipeline_queue_full = self.messages.len() >= MAX_PIPELINED_MESSAGES; + let can_not_read = !self.can_read(cx); + // limit amount of non-processed requests - if self.messages.len() >= MAX_PIPELINED_MESSAGES || !self.can_read(cx) { + if pipeline_queue_full || can_not_read { return Ok(false); } let mut updated = false; let mut this = self.as_mut().project(); + loop { match this.codec.decode(this.read_buf) { Ok(Some(msg)) => { @@ -628,33 +661,28 @@ where req.conn_data = this.conn_data.as_ref().map(Rc::clone); match this.codec.message_type() { - // Request is upgradable. add upgrade message and break. - // everything remain in read buffer would be handed to + // request has no payload + MessageType::None => {} + + // Request is upgradable. Add upgrade message and break. + // Everything remaining in read buffer will be handed to // upgraded Request. MessageType::Stream if this.flow.upgrade.is_some() => { this.messages.push_back(DispatcherMessage::Upgrade(req)); break; } - // Request is not upgradable. + // request is not upgradable MessageType::Payload | MessageType::Stream => { - /* - PayloadSender and Payload are smart pointers share the - same state. - PayloadSender is attached to dispatcher and used to sink - new chunked request data to state. - Payload is attached to Request and passed to Service::call - where the state can be collected and consumed. - */ + // PayloadSender and Payload are smart pointers share the + // same state. PayloadSender is attached to dispatcher and used + // to sink new chunked request data to state. Payload is + // attached to Request and passed to Service::call where the + // state can be collected and consumed. let (sender, payload) = Payload::create(false); - let (req1, _) = - req.replace_payload(crate::Payload::H1 { payload }); - req = req1; + *req.payload() = crate::Payload::H1 { payload }; *this.payload = Some(sender); } - - // Request has no payload. - MessageType::None => {} } // handle request early when no future in InnerDispatcher state. @@ -665,6 +693,7 @@ where this.messages.push_back(DispatcherMessage::Item(req)); } } + Message::Chunk(Some(chunk)) => { if let Some(ref mut payload) = this.payload { payload.feed_data(chunk); @@ -678,6 +707,7 @@ where break; } } + Message::Chunk(None) => { if let Some(mut payload) = this.payload.take() { payload.feed_eof(); @@ -693,29 +723,36 @@ where } } } + // decode is partial and buffer is not full yet. // break and wait for more read. Ok(None) => break, + Err(ParseError::Io(err)) => { self.as_mut().client_disconnected(); this = self.as_mut().project(); *this.error = Some(DispatchError::Io(err)); break; } + Err(ParseError::TooLarge) => { if let Some(mut payload) = this.payload.take() { payload.set_error(PayloadError::Overflow); } - // Requests overflow buffer size should be responded with 431 + + // request heads that overflow buffer size return a 431 error this.messages .push_back(DispatcherMessage::Error(Response::with_body( StatusCode::REQUEST_HEADER_FIELDS_TOO_LARGE, (), ))); + this.flags.insert(Flags::READ_DISCONNECT); *this.error = Some(ParseError::TooLarge.into()); + break; } + Err(err) => { if let Some(mut payload) = this.payload.take() { payload.set_error(PayloadError::EncodingCorrupted); @@ -725,6 +762,7 @@ where this.messages.push_back(DispatcherMessage::Error( Response::bad_request().drop_body(), )); + this.flags.insert(Flags::READ_DISCONNECT); *this.error = Some(err.into()); break; @@ -737,6 +775,7 @@ where *this.ka_deadline = expire; } } + Ok(updated) } @@ -755,24 +794,27 @@ where if this.flags.contains(Flags::SHUTDOWN) { if let Some(deadline) = this.codec.config().client_disconnect_deadline() { // write client disconnect time out and poll again to - // go into Some> branch + // go into Some(timer) branch this.ka_timer.set(Some(sleep_until(deadline))); return self.poll_keepalive(cx); } } } + Some(mut timer) => { - // only operate when keep-alive timer is resolved. + // only operate when keep-alive timer is resolved if timer.as_mut().poll(cx).is_ready() { - // got timeout during shutdown, drop connection + // timed-out during shutdown; drop connection if this.flags.contains(Flags::SHUTDOWN) { return Err(DispatchError::DisconnectTimeout); - // exceed deadline. check for any outstanding tasks - } else if timer.deadline() >= *this.ka_deadline { + } + + // exceeded deadline; check for any outstanding tasks + if timer.deadline() >= *this.ka_deadline { if this.state.is_empty() && this.write_buf.is_empty() { // have no task at hand if this.flags.contains(Flags::STARTED) { - trace!("Keep-alive timeout, close connection"); + trace!("keep-alive timed out; closing connection"); this.flags.insert(Flags::SHUTDOWN); // start shutdown timeout @@ -787,11 +829,17 @@ where } } else { // timeout on first request (slow request) return 408 - trace!("Slow request timeout"); + + trace!( + "timed out on slow request; \ + replying with 408 and closing connection" + ); + let _ = self.as_mut().send_error_response( Response::with_body(StatusCode::REQUEST_TIMEOUT, ()), BoxBody::new(()), ); + this = self.project(); this.flags.insert(Flags::STARTED | Flags::SHUTDOWN); } @@ -810,15 +858,16 @@ where } } } + Ok(()) } - /// Returns true when io stream can be disconnected after write to it. + /// Returns true when I/O stream can be disconnected after write to it. /// /// It covers these conditions: /// - `std::io::ErrorKind::ConnectionReset` after partial read. /// - all data read done. - #[inline(always)] + #[inline(always)] // TODO: bench this inline fn read_available( self: Pin<&mut Self>, cx: &mut Context<'_>, @@ -1004,7 +1053,7 @@ where let is_empty = inner.state.is_empty(); let inner_p = inner.as_mut().project(); - // read half is closed and we do not processing any responses + // read half is closed; we do not process any responses if inner_p.flags.contains(Flags::READ_DISCONNECT) && is_empty { inner_p.flags.insert(Flags::SHUTDOWN); } diff --git a/actix-http/src/h2/mod.rs b/actix-http/src/h2/mod.rs index 71ded8d54..7703d914a 100644 --- a/actix-http/src/h2/mod.rs +++ b/actix-http/src/h2/mod.rs @@ -86,7 +86,7 @@ where let this = self.get_mut(); match Pin::new(&mut this.handshake).poll(cx)? { - // return the timer on success handshake. It can be re-used for h2 ping-pong. + // return the timer on success handshake; its slot can be re-used for h2 ping-pong Poll::Ready(conn) => Poll::Ready(Ok((conn, this.timer.take()))), Poll::Pending => match this.timer.as_mut() { Some(timer) => { diff --git a/actix-http/src/requests/head.rs b/actix-http/src/requests/head.rs index 06fd0429e..4558801f3 100644 --- a/actix-http/src/requests/head.rs +++ b/actix-http/src/requests/head.rs @@ -130,8 +130,8 @@ impl RequestHead { } } + /// Request contains `EXPECT` header. #[inline] - /// Request contains `EXPECT` header pub fn expect(&self) -> bool { self.flags.contains(Flags::EXPECT) } diff --git a/actix-http/tests/test_server.rs b/actix-http/tests/test_server.rs index 1bb574fd6..c74978f35 100644 --- a/actix-http/tests/test_server.rs +++ b/actix-http/tests/test_server.rs @@ -210,7 +210,11 @@ async fn test_slow_request() { let _ = stream.write_all(b"GET /test/tests/test HTTP/1.1\r\n"); let mut data = String::new(); let _ = stream.read_to_string(&mut data); - assert!(data.starts_with("HTTP/1.1 408 Request Timeout")); + assert!( + data.starts_with("HTTP/1.1 408 Request Timeout"), + "response was not 408: {}", + data + ); srv.stop().await; }