better dispatcher docs

This commit is contained in:
Rob Ede 2022-01-25 15:01:05 +00:00
parent ab226de475
commit 09cb93b7f0
No known key found for this signature in database
GPG Key ID: 97C636207D3EF933
6 changed files with 124 additions and 73 deletions

View File

@ -45,8 +45,10 @@ impl From<Option<usize>> for KeepAlive {
} }
/// HTTP service configuration. /// HTTP service configuration.
#[derive(Debug, Clone)]
pub struct ServiceConfig(Rc<Inner>); pub struct ServiceConfig(Rc<Inner>);
#[derive(Debug)]
struct Inner { struct Inner {
keep_alive: Option<Duration>, keep_alive: Option<Duration>,
client_request_timeout: u64, client_request_timeout: u64,
@ -57,12 +59,6 @@ struct Inner {
date_service: DateService, date_service: DateService,
} }
impl Clone for ServiceConfig {
fn clone(&self) -> Self {
ServiceConfig(self.0.clone())
}
}
impl Default for ServiceConfig { impl Default for ServiceConfig {
fn default() -> Self { fn default() -> Self {
Self::new(KeepAlive::Timeout(5), 0, 0, false, None) Self::new(KeepAlive::Timeout(5), 0, 0, false, None)
@ -143,7 +139,6 @@ impl ServiceConfig {
/// Returns `None` if this `ServiceConfig was` constructed with `client_request_timeout: 0`. /// Returns `None` if this `ServiceConfig was` constructed with `client_request_timeout: 0`.
/// ///
/// [client request deadline]: Self::client_deadline /// [client request deadline]: Self::client_deadline
#[inline]
pub fn client_request_timer(&self) -> Option<Sleep> { pub fn client_request_timer(&self) -> Option<Sleep> {
self.client_request_deadline().map(sleep_until) self.client_request_deadline().map(sleep_until)
} }
@ -168,18 +163,15 @@ impl ServiceConfig {
/// Creates a timer that resolves at the [keep-alive deadline]. /// Creates a timer that resolves at the [keep-alive deadline].
/// ///
/// [keep-alive deadline]: Self::keep_alive_deadline /// [keep-alive deadline]: Self::keep_alive_deadline
#[inline]
pub fn keep_alive_timer(&self) -> Option<Sleep> { pub fn keep_alive_timer(&self) -> Option<Sleep> {
self.keep_alive_deadline().map(sleep_until) self.keep_alive_deadline().map(sleep_until)
} }
#[inline]
pub(crate) fn now(&self) -> Instant { pub(crate) fn now(&self) -> Instant {
self.0.date_service.now() self.0.date_service.now()
} }
#[doc(hidden)] pub(crate) fn set_date(&self, dst: &mut BytesMut, camel_case: bool) {
pub fn set_date(&self, dst: &mut BytesMut, camel_case: bool) {
let mut buf: [u8; 39] = [0; 39]; let mut buf: [u8; 39] = [0; 39];
buf[..6].copy_from_slice(if camel_case { b"Date: " } else { b"date: " }); buf[..6].copy_from_slice(if camel_case { b"Date: " } else { b"date: " });
@ -236,6 +228,12 @@ struct DateService {
handle: JoinHandle<()>, handle: JoinHandle<()>,
} }
impl fmt::Debug for DateService {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("DateService").finish_non_exhaustive()
}
}
impl Drop for DateService { impl Drop for DateService {
fn drop(&mut self) { fn drop(&mut self) {
// stop the timer update async task on drop. // stop the timer update async task on drop.

View File

@ -42,7 +42,7 @@ impl Default for Codec {
impl fmt::Debug for Codec { impl fmt::Debug for Codec {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "h1::Codec({:?})", self.flags) f.debug_struct("h1::Codec").finish_non_exhaustive()
} }
} }

View File

@ -38,10 +38,19 @@ const MAX_PIPELINED_MESSAGES: usize = 16;
bitflags! { bitflags! {
pub struct Flags: u8 { pub struct Flags: u8 {
/// Set when .... started?
const STARTED = 0b0000_0001; const STARTED = 0b0000_0001;
/// Set if keep-alive is active.
const KEEPALIVE = 0b0000_0010; const KEEPALIVE = 0b0000_0010;
/// Set if in shutdown procedure.
const SHUTDOWN = 0b0000_0100; const SHUTDOWN = 0b0000_0100;
/// Set if read-half is disconnected.
const READ_DISCONNECT = 0b0000_1000; const READ_DISCONNECT = 0b0000_1000;
/// Set if write-half is disconnected.
const WRITE_DISCONNECT = 0b0001_0000; const WRITE_DISCONNECT = 0b0001_0000;
} }
} }
@ -142,6 +151,9 @@ pin_project! {
payload: Option<PayloadSender>, payload: Option<PayloadSender>,
messages: VecDeque<DispatcherMessage>, messages: VecDeque<DispatcherMessage>,
// Initialized as initial KA deadline or current time.
// Updated when messages are read from stream and after timer is used for
// first-request timeout.
ka_deadline: Instant, ka_deadline: Instant,
#[pin] #[pin]
ka_timer: Option<Sleep>, ka_timer: Option<Sleep>,
@ -226,8 +238,8 @@ where
}; };
// keep-alive timer // keep-alive timer
let (ka_expire, ka_timer) = match config.keep_alive_timer() { let (ka_deadline, ka_timer) = match config.keep_alive_timer() {
Some(delay) => (delay.deadline(), Some(delay)), Some(timer) => (timer.deadline(), Some(timer)),
None => (config.now(), None), None => (config.now(), None),
}; };
@ -244,7 +256,7 @@ where
payload: None, payload: None,
messages: VecDeque::new(), messages: VecDeque::new(),
ka_deadline: ka_expire, ka_deadline,
ka_timer, ka_timer,
io: Some(io), io: Some(io),
@ -286,11 +298,13 @@ where
} }
} }
// if checked is set to true, delay disconnect until all tasks have finished. /// If checked is set to true, delay disconnect until all tasks have finished.
fn client_disconnected(self: Pin<&mut Self>) { fn client_disconnected(self: Pin<&mut Self>) {
let this = self.project(); let this = self.project();
this.flags this.flags
.insert(Flags::READ_DISCONNECT | Flags::WRITE_DISCONNECT); .insert(Flags::READ_DISCONNECT | Flags::WRITE_DISCONNECT);
if let Some(mut payload) = this.payload.take() { if let Some(mut payload) = this.payload.take() {
payload.set_error(PayloadError::Incomplete(None)); payload.set_error(PayloadError::Incomplete(None));
} }
@ -308,7 +322,9 @@ where
Poll::Ready(0) => { Poll::Ready(0) => {
return Poll::Ready(Err(io::Error::new(io::ErrorKind::WriteZero, ""))) return Poll::Ready(Err(io::Error::new(io::ErrorKind::WriteZero, "")))
} }
Poll::Ready(n) => written += n, Poll::Ready(n) => written += n,
Poll::Pending => { Poll::Pending => {
write_buf.advance(written); write_buf.advance(written);
return Poll::Pending; return Poll::Pending;
@ -316,10 +332,10 @@ where
} }
} }
// everything has written to io. clear buffer. // everything has written to I/O; clear buffer
write_buf.clear(); write_buf.clear();
// flush the io and check if get blocked. // flush the I/O and check if get blocked
io.poll_flush(cx) io.poll_flush(cx)
} }
@ -364,11 +380,12 @@ where
body: BoxBody, body: BoxBody,
) -> Result<(), DispatchError> { ) -> Result<(), DispatchError> {
let size = self.as_mut().send_response_inner(message, &body)?; let size = self.as_mut().send_response_inner(message, &body)?;
let state = match size {
self.project().state.set(match size {
BodySize::None | BodySize::Sized(0) => State::None, BodySize::None | BodySize::Sized(0) => State::None,
_ => State::SendErrorPayload { body }, _ => State::SendErrorPayload { body },
}; });
self.project().state.set(state);
Ok(()) Ok(())
} }
@ -536,44 +553,53 @@ where
req: Request, req: Request,
cx: &mut Context<'_>, cx: &mut Context<'_>,
) -> Result<(), DispatchError> { ) -> Result<(), DispatchError> {
// Handle `EXPECT: 100-Continue` header // initialize dispatcher state
{
let mut this = self.as_mut().project(); let mut this = self.as_mut().project();
// Handle `EXPECT: 100-Continue` header
if req.head().expect() { if req.head().expect() {
// set dispatcher state so the future is pinned. // set dispatcher state to call expect handler
let fut = this.flow.expect.call(req); let fut = this.flow.expect.call(req);
this.state.set(State::ExpectCall { fut }); this.state.set(State::ExpectCall { fut });
} else { } else {
// the same as above. // set dispatcher state to call service handler
let fut = this.flow.service.call(req); let fut = this.flow.service.call(req);
this.state.set(State::ServiceCall { fut }); this.state.set(State::ServiceCall { fut });
}; };
};
// eagerly poll the future for once(or twice if expect is resolved immediately). // eagerly poll the future once (or twice if expect is resolved immediately).
loop { loop {
match self.as_mut().project().state.project() { match self.as_mut().project().state.project() {
StateProj::ExpectCall { fut } => { StateProj::ExpectCall { fut } => {
match fut.poll(cx) { match fut.poll(cx) {
// expect is resolved. continue loop and poll the service call branch. // expect is resolved; continue loop and poll the service call branch.
Poll::Ready(Ok(req)) => { Poll::Ready(Ok(req)) => {
self.as_mut().send_continue(); self.as_mut().send_continue();
let mut this = self.as_mut().project(); let mut this = self.as_mut().project();
let fut = this.flow.service.call(req); let fut = this.flow.service.call(req);
this.state.set(State::ServiceCall { fut }); this.state.set(State::ServiceCall { fut });
continue; continue;
} }
// future is pending. return Ok(()) to notify that a new state is
// set and the outer loop should be continue. // future is error; send response and return a result
Poll::Pending => return Ok(()), // on success to notify the dispatcher a new state is set and the outer loop
// future is error. send response and return a result. On success // should be continued
// to notify the dispatcher a new state is set and the outer loop
// should be continue.
Poll::Ready(Err(err)) => { Poll::Ready(Err(err)) => {
let res: Response<BoxBody> = err.into(); let res: Response<BoxBody> = err.into();
let (res, body) = res.replace_body(()); let (res, body) = res.replace_body(());
return self.send_error_response(res, body); return self.send_error_response(res, body);
} }
// future is pending; return Ok(()) to notify that a new state is
// set and the outer loop should be continue.
Poll::Pending => return Ok(()),
} }
} }
StateProj::ServiceCall { fut } => { StateProj::ServiceCall { fut } => {
// return no matter the service call future's result. // return no matter the service call future's result.
return match fut.poll(cx) { return match fut.poll(cx) {
@ -584,9 +610,11 @@ where
let (res, body) = res.into().replace_body(()); let (res, body) = res.into().replace_body(());
self.send_response(res, body) self.send_response(res, body)
} }
// see the comment on ExpectCall state branch's Pending.
// see the comment on ExpectCall state branch's Pending
Poll::Pending => Ok(()), Poll::Pending => Ok(()),
// see the comment on ExpectCall state branch's Ready(Err(err)).
// see the comment on ExpectCall state branch's Ready(Err(err))
Poll::Ready(Err(err)) => { Poll::Ready(Err(err)) => {
let res: Response<BoxBody> = err.into(); let res: Response<BoxBody> = err.into();
let (res, body) = res.replace_body(()); let (res, body) = res.replace_body(());
@ -594,6 +622,7 @@ where
} }
}; };
} }
_ => { _ => {
unreachable!( unreachable!(
"State must be set to ServiceCall or ExceptCall in handle_request" "State must be set to ServiceCall or ExceptCall in handle_request"
@ -608,13 +637,17 @@ where
mut self: Pin<&mut Self>, mut self: Pin<&mut Self>,
cx: &mut Context<'_>, cx: &mut Context<'_>,
) -> Result<bool, DispatchError> { ) -> Result<bool, DispatchError> {
let pipeline_queue_full = self.messages.len() >= MAX_PIPELINED_MESSAGES;
let can_not_read = !self.can_read(cx);
// limit amount of non-processed requests // limit amount of non-processed requests
if self.messages.len() >= MAX_PIPELINED_MESSAGES || !self.can_read(cx) { if pipeline_queue_full || can_not_read {
return Ok(false); return Ok(false);
} }
let mut updated = false; let mut updated = false;
let mut this = self.as_mut().project(); let mut this = self.as_mut().project();
loop { loop {
match this.codec.decode(this.read_buf) { match this.codec.decode(this.read_buf) {
Ok(Some(msg)) => { Ok(Some(msg)) => {
@ -628,33 +661,28 @@ where
req.conn_data = this.conn_data.as_ref().map(Rc::clone); req.conn_data = this.conn_data.as_ref().map(Rc::clone);
match this.codec.message_type() { match this.codec.message_type() {
// Request is upgradable. add upgrade message and break. // request has no payload
// everything remain in read buffer would be handed to MessageType::None => {}
// Request is upgradable. Add upgrade message and break.
// Everything remaining in read buffer will be handed to
// upgraded Request. // upgraded Request.
MessageType::Stream if this.flow.upgrade.is_some() => { MessageType::Stream if this.flow.upgrade.is_some() => {
this.messages.push_back(DispatcherMessage::Upgrade(req)); this.messages.push_back(DispatcherMessage::Upgrade(req));
break; break;
} }
// Request is not upgradable. // request is not upgradable
MessageType::Payload | MessageType::Stream => { MessageType::Payload | MessageType::Stream => {
/* // PayloadSender and Payload are smart pointers share the
PayloadSender and Payload are smart pointers share the // same state. PayloadSender is attached to dispatcher and used
same state. // to sink new chunked request data to state. Payload is
PayloadSender is attached to dispatcher and used to sink // attached to Request and passed to Service::call where the
new chunked request data to state. // state can be collected and consumed.
Payload is attached to Request and passed to Service::call
where the state can be collected and consumed.
*/
let (sender, payload) = Payload::create(false); let (sender, payload) = Payload::create(false);
let (req1, _) = *req.payload() = crate::Payload::H1 { payload };
req.replace_payload(crate::Payload::H1 { payload });
req = req1;
*this.payload = Some(sender); *this.payload = Some(sender);
} }
// Request has no payload.
MessageType::None => {}
} }
// handle request early when no future in InnerDispatcher state. // handle request early when no future in InnerDispatcher state.
@ -665,6 +693,7 @@ where
this.messages.push_back(DispatcherMessage::Item(req)); this.messages.push_back(DispatcherMessage::Item(req));
} }
} }
Message::Chunk(Some(chunk)) => { Message::Chunk(Some(chunk)) => {
if let Some(ref mut payload) = this.payload { if let Some(ref mut payload) = this.payload {
payload.feed_data(chunk); payload.feed_data(chunk);
@ -678,6 +707,7 @@ where
break; break;
} }
} }
Message::Chunk(None) => { Message::Chunk(None) => {
if let Some(mut payload) = this.payload.take() { if let Some(mut payload) = this.payload.take() {
payload.feed_eof(); payload.feed_eof();
@ -693,29 +723,36 @@ where
} }
} }
} }
// decode is partial and buffer is not full yet. // decode is partial and buffer is not full yet.
// break and wait for more read. // break and wait for more read.
Ok(None) => break, Ok(None) => break,
Err(ParseError::Io(err)) => { Err(ParseError::Io(err)) => {
self.as_mut().client_disconnected(); self.as_mut().client_disconnected();
this = self.as_mut().project(); this = self.as_mut().project();
*this.error = Some(DispatchError::Io(err)); *this.error = Some(DispatchError::Io(err));
break; break;
} }
Err(ParseError::TooLarge) => { Err(ParseError::TooLarge) => {
if let Some(mut payload) = this.payload.take() { if let Some(mut payload) = this.payload.take() {
payload.set_error(PayloadError::Overflow); payload.set_error(PayloadError::Overflow);
} }
// Requests overflow buffer size should be responded with 431
// request heads that overflow buffer size return a 431 error
this.messages this.messages
.push_back(DispatcherMessage::Error(Response::with_body( .push_back(DispatcherMessage::Error(Response::with_body(
StatusCode::REQUEST_HEADER_FIELDS_TOO_LARGE, StatusCode::REQUEST_HEADER_FIELDS_TOO_LARGE,
(), (),
))); )));
this.flags.insert(Flags::READ_DISCONNECT); this.flags.insert(Flags::READ_DISCONNECT);
*this.error = Some(ParseError::TooLarge.into()); *this.error = Some(ParseError::TooLarge.into());
break; break;
} }
Err(err) => { Err(err) => {
if let Some(mut payload) = this.payload.take() { if let Some(mut payload) = this.payload.take() {
payload.set_error(PayloadError::EncodingCorrupted); payload.set_error(PayloadError::EncodingCorrupted);
@ -725,6 +762,7 @@ where
this.messages.push_back(DispatcherMessage::Error( this.messages.push_back(DispatcherMessage::Error(
Response::bad_request().drop_body(), Response::bad_request().drop_body(),
)); ));
this.flags.insert(Flags::READ_DISCONNECT); this.flags.insert(Flags::READ_DISCONNECT);
*this.error = Some(err.into()); *this.error = Some(err.into());
break; break;
@ -737,6 +775,7 @@ where
*this.ka_deadline = expire; *this.ka_deadline = expire;
} }
} }
Ok(updated) Ok(updated)
} }
@ -755,24 +794,27 @@ where
if this.flags.contains(Flags::SHUTDOWN) { if this.flags.contains(Flags::SHUTDOWN) {
if let Some(deadline) = this.codec.config().client_disconnect_deadline() { if let Some(deadline) = this.codec.config().client_disconnect_deadline() {
// write client disconnect time out and poll again to // write client disconnect time out and poll again to
// go into Some<Pin<&mut Sleep>> branch // go into Some(timer) branch
this.ka_timer.set(Some(sleep_until(deadline))); this.ka_timer.set(Some(sleep_until(deadline)));
return self.poll_keepalive(cx); return self.poll_keepalive(cx);
} }
} }
} }
Some(mut timer) => { Some(mut timer) => {
// only operate when keep-alive timer is resolved. // only operate when keep-alive timer is resolved
if timer.as_mut().poll(cx).is_ready() { if timer.as_mut().poll(cx).is_ready() {
// got timeout during shutdown, drop connection // timed-out during shutdown; drop connection
if this.flags.contains(Flags::SHUTDOWN) { if this.flags.contains(Flags::SHUTDOWN) {
return Err(DispatchError::DisconnectTimeout); return Err(DispatchError::DisconnectTimeout);
// exceed deadline. check for any outstanding tasks }
} else if timer.deadline() >= *this.ka_deadline {
// exceeded deadline; check for any outstanding tasks
if timer.deadline() >= *this.ka_deadline {
if this.state.is_empty() && this.write_buf.is_empty() { if this.state.is_empty() && this.write_buf.is_empty() {
// have no task at hand // have no task at hand
if this.flags.contains(Flags::STARTED) { if this.flags.contains(Flags::STARTED) {
trace!("Keep-alive timeout, close connection"); trace!("keep-alive timed out; closing connection");
this.flags.insert(Flags::SHUTDOWN); this.flags.insert(Flags::SHUTDOWN);
// start shutdown timeout // start shutdown timeout
@ -787,11 +829,17 @@ where
} }
} else { } else {
// timeout on first request (slow request) return 408 // timeout on first request (slow request) return 408
trace!("Slow request timeout");
trace!(
"timed out on slow request; \
replying with 408 and closing connection"
);
let _ = self.as_mut().send_error_response( let _ = self.as_mut().send_error_response(
Response::with_body(StatusCode::REQUEST_TIMEOUT, ()), Response::with_body(StatusCode::REQUEST_TIMEOUT, ()),
BoxBody::new(()), BoxBody::new(()),
); );
this = self.project(); this = self.project();
this.flags.insert(Flags::STARTED | Flags::SHUTDOWN); this.flags.insert(Flags::STARTED | Flags::SHUTDOWN);
} }
@ -810,15 +858,16 @@ where
} }
} }
} }
Ok(()) Ok(())
} }
/// Returns true when io stream can be disconnected after write to it. /// Returns true when I/O stream can be disconnected after write to it.
/// ///
/// It covers these conditions: /// It covers these conditions:
/// - `std::io::ErrorKind::ConnectionReset` after partial read. /// - `std::io::ErrorKind::ConnectionReset` after partial read.
/// - all data read done. /// - all data read done.
#[inline(always)] #[inline(always)] // TODO: bench this inline
fn read_available( fn read_available(
self: Pin<&mut Self>, self: Pin<&mut Self>,
cx: &mut Context<'_>, cx: &mut Context<'_>,
@ -1004,7 +1053,7 @@ where
let is_empty = inner.state.is_empty(); let is_empty = inner.state.is_empty();
let inner_p = inner.as_mut().project(); let inner_p = inner.as_mut().project();
// read half is closed and we do not processing any responses // read half is closed; we do not process any responses
if inner_p.flags.contains(Flags::READ_DISCONNECT) && is_empty { if inner_p.flags.contains(Flags::READ_DISCONNECT) && is_empty {
inner_p.flags.insert(Flags::SHUTDOWN); inner_p.flags.insert(Flags::SHUTDOWN);
} }

View File

@ -86,7 +86,7 @@ where
let this = self.get_mut(); let this = self.get_mut();
match Pin::new(&mut this.handshake).poll(cx)? { match Pin::new(&mut this.handshake).poll(cx)? {
// return the timer on success handshake. It can be re-used for h2 ping-pong. // return the timer on success handshake; its slot can be re-used for h2 ping-pong
Poll::Ready(conn) => Poll::Ready(Ok((conn, this.timer.take()))), Poll::Ready(conn) => Poll::Ready(Ok((conn, this.timer.take()))),
Poll::Pending => match this.timer.as_mut() { Poll::Pending => match this.timer.as_mut() {
Some(timer) => { Some(timer) => {

View File

@ -130,8 +130,8 @@ impl RequestHead {
} }
} }
/// Request contains `EXPECT` header.
#[inline] #[inline]
/// Request contains `EXPECT` header
pub fn expect(&self) -> bool { pub fn expect(&self) -> bool {
self.flags.contains(Flags::EXPECT) self.flags.contains(Flags::EXPECT)
} }

View File

@ -210,7 +210,11 @@ async fn test_slow_request() {
let _ = stream.write_all(b"GET /test/tests/test HTTP/1.1\r\n"); let _ = stream.write_all(b"GET /test/tests/test HTTP/1.1\r\n");
let mut data = String::new(); let mut data = String::new();
let _ = stream.read_to_string(&mut data); let _ = stream.read_to_string(&mut data);
assert!(data.starts_with("HTTP/1.1 408 Request Timeout")); assert!(
data.starts_with("HTTP/1.1 408 Request Timeout"),
"response was not 408: {}",
data
);
srv.stop().await; srv.stop().await;
} }