From f0faadad09f2169a95737ac09ae063be9bf631ec Mon Sep 17 00:00:00 2001 From: Rob Ede Date: Sat, 29 Jan 2022 03:28:55 +0000 Subject: [PATCH] rewrite h1 timeout handling --- actix-http/src/h1/codec.rs | 2 + actix-http/src/h1/dispatcher.rs | 438 +++++++++++++++----------- actix-http/src/h1/dispatcher_tests.rs | 25 +- actix-http/src/h1/encoder.rs | 5 +- actix-http/src/h1/mod.rs | 5 +- actix-http/src/responses/head.rs | 25 +- actix-http/src/test.rs | 2 +- actix-http/tests/test_server.rs | 35 +- src/app.rs | 4 +- 9 files changed, 328 insertions(+), 213 deletions(-) diff --git a/actix-http/src/h1/codec.rs b/actix-http/src/h1/codec.rs index a630110f1..719a7c354 100644 --- a/actix-http/src/h1/codec.rs +++ b/actix-http/src/h1/codec.rs @@ -179,9 +179,11 @@ impl Encoder, BodySize)>> for Codec { &self.config, )?; } + Message::Chunk(Some(bytes)) => { self.encoder.encode_chunk(bytes.as_ref(), dst)?; } + Message::Chunk(None) => { self.encoder.encode_eof(dst)?; } diff --git a/actix-http/src/h1/dispatcher.rs b/actix-http/src/h1/dispatcher.rs index 21797555c..32f42c3cd 100644 --- a/actix-http/src/h1/dispatcher.rs +++ b/actix-http/src/h1/dispatcher.rs @@ -38,20 +38,23 @@ const MAX_PIPELINED_MESSAGES: usize = 16; bitflags! { pub struct Flags: u8 { - /// Set when first byte is read from stream. + /// Set when stream is read for first time. const STARTED = 0b0000_0001; - /// Set if connection is in keep-alive state. - const KEEP_ALIVE = 0b0000_0010; + /// Set when full request-response cycle has occurred. + const FINISHED = 0b0000_0010; + + /// Set if connection is in keep-alive (inactive) state. + const KEEP_ALIVE = 0b0000_0100; /// Set if in shutdown procedure. - const SHUTDOWN = 0b0000_0100; + const SHUTDOWN = 0b0000_1000; /// Set if read-half is disconnected. - const READ_DISCONNECT = 0b0000_1000; + const READ_DISCONNECT = 0b0001_0000; /// Set if write-half is disconnected. - const WRITE_DISCONNECT = 0b0001_0000; + const WRITE_DISCONNECT = 0b0010_0000; } } @@ -152,11 +155,6 @@ pin_project! { payload: Option, messages: VecDeque, - // // Initialized as initial KA deadline or current time. - // // Updated when messages are read from stream and after timer is used for - // // first-request timeout. - // ka_deadline: Instant, - head_timer: TimerState, ka_timer: TimerState, shutdown_timer: TimerState, @@ -188,11 +186,27 @@ impl TimerState { matches!(self, Self::Active { .. } | Self::Inactive) } - fn set(&mut self, timer: Sleep) { + fn set(&mut self, timer: Sleep, line: u32) { + if !self.is_enabled() { + warn!("setting disabled timer from line {}", line); + } + *self = Self::Active { timer: Box::pin(timer), }; } + + fn clear(&mut self, line: u32) { + if !self.is_enabled() { + warn!("trying to clear a disabled timer from line {}", line); + } + + if matches!(self, Self::Inactive) { + warn!("trying to clear an inactive timer from line {}", line); + } + + *self = Self::Inactive; + } } impl fmt::Display for TimerState { @@ -410,7 +424,7 @@ where fn send_response_inner( self: Pin<&mut Self>, - message: Response<()>, + res: Response<()>, body: &impl MessageBody, ) -> Result { log::trace!("enter InnerDispatcher::send_response_inner"); @@ -420,7 +434,7 @@ where let size = body.size(); this.codec - .encode(Message::Item((message, size)), this.write_buf) + .encode(Message::Item((res, size)), this.write_buf) .map_err(|err| { if let Some(mut payload) = this.payload.take() { payload.set_error(PayloadError::Incomplete(None)); @@ -429,39 +443,51 @@ where DispatchError::Io(err) })?; - this.flags.set(Flags::KEEP_ALIVE, this.codec.keepalive()); + let conn_keep_alive = this.codec.keepalive(); + this.flags.set(Flags::KEEP_ALIVE, conn_keep_alive); + + if !conn_keep_alive { + log::trace!("clearing keep-alive timer"); + this.ka_timer.clear(line!()); + } Ok(size) } fn send_response( mut self: Pin<&mut Self>, - message: Response<()>, + res: Response<()>, body: B, ) -> Result<(), DispatchError> { log::trace!("enter InnerDispatcher::send_response"); - let size = self.as_mut().send_response_inner(message, &body)?; - let state = match size { - BodySize::None | BodySize::Sized(0) => State::None, + let size = self.as_mut().send_response_inner(res, &body)?; + let mut this = self.project(); + this.state.set(match size { + BodySize::None | BodySize::Sized(0) => { + this.flags.insert(Flags::FINISHED); + State::None + } _ => State::SendPayload { body }, - }; - self.project().state.set(state); + }); Ok(()) } fn send_error_response( mut self: Pin<&mut Self>, - message: Response<()>, + res: Response<()>, body: BoxBody, ) -> Result<(), DispatchError> { log::trace!("enter InnerDispatcher::send_error_response"); - let size = self.as_mut().send_response_inner(message, &body)?; - - self.project().state.set(match size { - BodySize::None | BodySize::Sized(0) => State::None, + let size = self.as_mut().send_response_inner(res, &body)?; + let mut this = self.project(); + this.state.set(match size { + BodySize::None | BodySize::Sized(0) => { + this.flags.insert(Flags::FINISHED); + State::None + } _ => State::SendErrorPayload { body }, }); @@ -480,70 +506,87 @@ where mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Result { - log::trace!("enter InnerDispatcher::poll_response"); - 'res: loop { + log::trace!("enter InnerDispatcher::poll_response loop iteration"); + let mut this = self.as_mut().project(); match this.state.as_mut().project() { - // no future is in InnerDispatcher state. pop next message. + // no future is in InnerDispatcher state; pop next message StateProj::None => match this.messages.pop_front() { - // handle request message. + // handle request message Some(DispatcherMessage::Item(req)) => { // Handle `EXPECT: 100-Continue` header if req.head().expect() { - // set InnerDispatcher state and continue loop to poll it. + log::trace!(" passing request to expect handler"); + // set InnerDispatcher state and continue loop to poll it let fut = this.flow.expect.call(req); this.state.set(State::ExpectCall { fut }); } else { - // the same as expect call. + log::trace!(" passing request to service handler"); + // set InnerDispatcher state and continue loop to poll it let fut = this.flow.service.call(req); this.state.set(State::ServiceCall { fut }); }; } - // handle error message. + // handle error message Some(DispatcherMessage::Error(res)) => { - // send_response would update InnerDispatcher state to SendPayload or - // None(If response body is empty). - // continue loop to poll it. + log::trace!(" handling dispatcher error message"); + // send_response would update InnerDispatcher state to SendPayload or None + // (If response body is empty) + // continue loop to poll it self.as_mut().send_error_response(res, BoxBody::new(()))?; } - // return with upgrade request and poll it exclusively. + // return with upgrade request and poll it exclusively Some(DispatcherMessage::Upgrade(req)) => { + // return upgrade return Ok(PollResponse::Upgrade(req)); } - // all messages are dealt with. - None => return Ok(PollResponse::DoNothing), + // all messages are dealt with + None => { + log::trace!("all messages handled"); + return Ok(PollResponse::DoNothing); + } }, - StateProj::ServiceCall { fut } => match fut.poll(cx) { - // service call resolved. send response. - Poll::Ready(Ok(res)) => { - let (res, body) = res.into().replace_body(()); - self.as_mut().send_response(res, body)?; - } - // send service call error as response - Poll::Ready(Err(err)) => { - let res: Response = err.into(); - let (res, body) = res.replace_body(()); - self.as_mut().send_error_response(res, body)?; - } + StateProj::ServiceCall { fut } => { + log::trace!(" calling request handler service"); - // service call pending and could be waiting for more chunk messages. - // (pipeline message limit and/or payload can_read limit) - Poll::Pending => { - // no new message is decoded and no new payload is feed. - // nothing to do except waiting for new incoming data from client. - if !self.as_mut().poll_request(cx)? { - return Ok(PollResponse::DoNothing); + match fut.poll(cx) { + // service call resolved. send response. + Poll::Ready(Ok(res)) => { + log::trace!(" ok"); + let (res, body) = res.into().replace_body(()); + self.as_mut().send_response(res, body)?; + } + + // send service call error as response + Poll::Ready(Err(err)) => { + log::trace!(" error"); + let res: Response = err.into(); + let (res, body) = res.replace_body(()); + self.as_mut().send_error_response(res, body)?; + } + + // service call pending and could be waiting for more chunk messages + // (pipeline message limit and/or payload can_read limit) + Poll::Pending => { + log::trace!(" pending"); + // no new message is decoded and no new payload is fed + // nothing to do except waiting for new incoming data from client + if !self.as_mut().poll_request(cx)? { + return Ok(PollResponse::DoNothing); + } + // else loop } - // otherwise keep loop. } - }, + } StateProj::SendPayload { mut body } => { + log::trace!("sending payload"); + // keep populate writer buffer until buffer size limit hit, // get blocked or finished. while this.write_buf.len() < super::payload::MAX_BUFFER_SIZE { @@ -555,14 +598,18 @@ where Poll::Ready(None) => { this.codec.encode(Message::Chunk(None), this.write_buf)?; + // payload stream finished. // set state to None and handle next message this.state.set(State::None); + this.flags.insert(Flags::FINISHED); + continue 'res; } Poll::Ready(Some(Err(err))) => { - return Err(DispatchError::Body(err.into())) + this.flags.insert(Flags::FINISHED); + return Err(DispatchError::Body(err.into())); } Poll::Pending => return Ok(PollResponse::DoNothing), @@ -574,6 +621,8 @@ where } StateProj::SendErrorPayload { mut body } => { + log::trace!(" sending error payload"); + // TODO: de-dupe impl with SendPayload // keep populate writer buffer until buffer size limit hit, @@ -587,7 +636,7 @@ where Poll::Ready(None) => { this.codec.encode(Message::Chunk(None), this.write_buf)?; - // payload stream finished. + // payload stream finished // set state to None and handle next message this.state.set(State::None); continue 'res; @@ -602,31 +651,36 @@ where Poll::Pending => return Ok(PollResponse::DoNothing), } } - // buffer is beyond max size. - // return and try to write the whole buffer to io stream. + + // buffer is beyond max size + // return and try to write the whole buffer to stream return Ok(PollResponse::DrainWriteBuf); } - StateProj::ExpectCall { fut } => match fut.poll(cx) { - // expect resolved. write continue to buffer and set InnerDispatcher state - // to service call. - Poll::Ready(Ok(req)) => { - this.write_buf - .extend_from_slice(b"HTTP/1.1 100 Continue\r\n\r\n"); - let fut = this.flow.service.call(req); - this.state.set(State::ServiceCall { fut }); - } + StateProj::ExpectCall { fut } => { + log::trace!(" calling expect service"); - // send expect error as response - Poll::Ready(Err(err)) => { - let res: Response = err.into(); - let (res, body) = res.replace_body(()); - self.as_mut().send_error_response(res, body)?; - } + match fut.poll(cx) { + // expect resolved. write continue to buffer and set InnerDispatcher state + // to service call. + Poll::Ready(Ok(req)) => { + this.write_buf + .extend_from_slice(b"HTTP/1.1 100 Continue\r\n\r\n"); + let fut = this.flow.service.call(req); + this.state.set(State::ServiceCall { fut }); + } - // expect must be solved before progress can be made. - Poll::Pending => return Ok(PollResponse::DoNothing), - }, + // send expect error as response + Poll::Ready(Err(err)) => { + let res: Response = err.into(); + let (res, body) = res.replace_body(()); + self.as_mut().send_error_response(res, body)?; + } + + // expect must be solved before progress can be made. + Poll::Pending => return Ok(PollResponse::DoNothing), + } + } } } } @@ -686,22 +740,22 @@ where StateProj::ServiceCall { fut } => { // return no matter the service call future's result. return match fut.poll(cx) { - // future is resolved. send response and return a result. On success + // Future is resolved. Send response and return a result. On success // to notify the dispatcher a new state is set and the outer loop // should be continue. Poll::Ready(Ok(res)) => { let (res, body) = res.into().replace_body(()); - self.send_response(res, body) + self.as_mut().send_response(res, body) } // see the comment on ExpectCall state branch's Pending Poll::Pending => Ok(()), - // see the comment on ExpectCall state branch's Ready(Err(err)) + // see the comment on ExpectCall state branch's Ready(Err(_)) Poll::Ready(Err(err)) => { let res: Response = err.into(); let (res, body) = res.replace_body(()); - self.send_error_response(res, body) + self.as_mut().send_error_response(res, body) } }; } @@ -730,7 +784,6 @@ where return Ok(false); } - let mut updated = false; let mut this = self.as_mut().project(); loop { @@ -738,11 +791,12 @@ where match this.codec.decode(this.read_buf) { Ok(Some(msg)) => { - updated = true; - // this.flags.insert(Flags::STARTED); + log::trace!("found full frame (head)"); match msg { Message::Item(mut req) => { + this.head_timer.clear(line!()); + req.head_mut().peer_addr = *this.peer_addr; req.conn_data = this.conn_data.as_ref().map(Rc::clone); @@ -811,14 +865,15 @@ where } } - // decode is partial and buffer is not full yet. - // break and wait for more read. + // decode is partial and buffer is not full yet + // break and wait for more read Ok(None) => { log::trace!("found partial frame"); break; } Err(ParseError::Io(err)) => { + log::trace!("io error: {}", &err); self.as_mut().client_disconnected(); this = self.as_mut().project(); *this.error = Some(DispatchError::Io(err)); @@ -826,6 +881,8 @@ where } Err(ParseError::TooLarge) => { + log::trace!("request head is too big"); + if let Some(mut payload) = this.payload.take() { payload.set_error(PayloadError::Overflow); } @@ -844,11 +901,13 @@ where } Err(err) => { + log::trace!("parse error {}", &err); + if let Some(mut payload) = this.payload.take() { payload.set_error(PayloadError::EncodingCorrupted); } - // Malformed requests should be responded with 400 + // malformed requests should be responded with 400 this.messages.push_back(DispatcherMessage::Error( Response::bad_request().drop_body(), )); @@ -860,14 +919,8 @@ where } } - // if updated && this.timer.is_some() { - // if let Some(expire) = this.config.keep_alive_deadline() { - // log::trace!("set keep-alive deadline"); - // *this.ka_deadline = expire; - // } - // } - - Ok(updated) + // TODO: what's this boolean do now? + Ok(false) } fn poll_head_timer( @@ -883,7 +936,7 @@ where log::trace!( "timed out on slow request; \ - replying with 408 and closing connection" + replying with 408 and closing connection" ); let _ = self.as_mut().send_error_response( @@ -907,9 +960,38 @@ where ) -> Result<(), DispatchError> { let this = self.as_mut().project(); match this.ka_timer { + TimerState::Active { timer } => { + debug_assert!( + this.flags.contains(Flags::KEEP_ALIVE), + "keep-alive flag should be set when timer is active", + ); + debug_assert!( + this.state.is_none(), + "dispatcher should not be in keep-alive phase if state is not none", + ); + debug_assert!( + this.write_buf.is_empty(), + "dispatcher should not be in keep-alive phase if write_buf is not empty", + ); + + // keep-alive timer has timed out + if timer.as_mut().poll(cx).is_ready() { + // no tasks at hand + log::trace!("timer timed out; closing connection"); + this.flags.insert(Flags::SHUTDOWN); + + if let Some(deadline) = this.config.client_disconnect_deadline() { + // start shutdown timeout if enabled + log::trace!("starting disconnect timer"); + this.shutdown_timer.set(sleep_until(deadline), line!()); + } else { + // no shutdown timeout, drop socket + this.flags.insert(Flags::WRITE_DISCONNECT); + } + } + } TimerState::Disabled => {} TimerState::Inactive => {} - TimerState::Active { timer } => {} } Ok(()) @@ -926,9 +1008,10 @@ where TimerState::Active { timer } => { debug_assert!( this.flags.contains(Flags::SHUTDOWN), - "shutdown flag should be set when timer is active" + "shutdown flag should be set when timer is active", ); + // timed-out during shutdown; drop connection if timer.as_mut().poll(cx).is_ready() { log::trace!("timed-out during shutdown"); return Err(DispatchError::DisconnectTimeout); @@ -950,7 +1033,7 @@ where Ok(()) } - /// Poll head/keep-alive/disconnect timer. + /// Poll head, keep-alive, and disconnect timer. fn poll_timer(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Result<(), DispatchError> { log::trace!("enter InnerDispatcher::poll_timer"); trace_timer_states(&self.head_timer, &self.ka_timer, &self.shutdown_timer); @@ -959,77 +1042,13 @@ where self.as_mut().poll_ka_timer(cx)?; self.as_mut().poll_shutdown_timer(cx)?; - // Some(mut timer) => { - // let deadline = timer.deadline(); - - // // only operate when timer is resolved - // if timer.as_mut().poll(cx).is_ready() { - // log::trace!("timer reached deadline"); - - // // timed-out during shutdown; drop connection - // if this.flags.contains(Flags::SHUTDOWN) { - // log::trace!("timed-out during shutdown"); - // return Err(DispatchError::DisconnectTimeout); - // } - - // // exceeded deadline; check for any outstanding tasks - // if timer.deadline() >= *this.ka_deadline { - // if this.state.is_none() && this.write_buf.is_empty() { - // // no tasks at hand - // if this.flags.contains(Flags::KEEP_ALIVE) { - // log::trace!("timer timed out; closing connection"); - // this.flags.insert(Flags::SHUTDOWN); - - // // start shutdown timeout - // if let Some(deadline) = this.config.client_disconnect_deadline() - // { - // log::trace!("starting disconnect timer"); - // timer.as_mut().reset(deadline); - // let _ = timer.poll(cx); - // } else { - // // no shutdown timeout, drop socket - // this.flags.insert(Flags::WRITE_DISCONNECT); - // } - // } else { - // // timeout on first request (slow request) return 408 - - // log::trace!( - // "timed out on slow request; \ - // replying with 408 and closing connection" - // ); - - // let _ = self.as_mut().send_error_response( - // Response::with_body(StatusCode::REQUEST_TIMEOUT, ()), - // BoxBody::new(()), - // ); - - // this = self.project(); - // this.flags.insert(Flags::STARTED | Flags::SHUTDOWN); - // } - // } else if let Some(deadline) = this.config.keep_alive_deadline() { - // // still have unfinished tasks; try to reset and register keep-alive - // log::trace!("starting keep-alive timer"); - // timer.as_mut().reset(deadline); - // let _ = timer.poll(cx); - // } - // } else { - // // timer resolved but still have not met the expire deadline - // // reset and register for later wakeup - // log::trace!("reset timer to keep-alive deadline"); - // timer.as_mut().reset(*this.ka_deadline); - // let _ = timer.poll(cx); - // } - // } - // } - // } - Ok(()) } /// Returns true when I/O stream can be disconnected after write to it. /// /// It covers these conditions: - /// - `std::io::ErrorKind::ConnectionReset` after partial read. + /// - `std::io::ErrorKind::ConnectionReset` after partial read; /// - all data read done. #[inline(always)] // TODO: bench this inline fn read_available( @@ -1037,10 +1056,12 @@ where cx: &mut Context<'_>, ) -> Result { log::trace!("enter InnerDispatcher::read_available"); + log::trace!(" reading from a {}", core::any::type_name::()); let this = self.project(); if this.flags.contains(Flags::READ_DISCONNECT) { + log::trace!(" read DC"); return Ok(false); }; @@ -1062,7 +1083,7 @@ where // When read_buf is beyond max buffer size the early return could be successfully // be parsed as a new Request. This case would not generate ParseError::TooLarge and // at this point IO stream is not fully read to Pending and would result in - // dispatcher stuck until timeout (KA) + // dispatcher stuck until timeout (keep-alive). // // Note: // This is a perf choice to reduce branch on ::decode. @@ -1096,18 +1117,31 @@ where match actix_codec::poll_read_buf(io.as_mut(), cx, this.read_buf) { Poll::Ready(Ok(n)) => { + log::trace!(" read {} bytes", n); + if n == 0 { + log::trace!(" signalling should_disconnect"); return Ok(true); } + read_some = true; } - Poll::Pending => return Ok(false), + Poll::Pending => { + log::trace!(" read pending"); + return Ok(false); + } Poll::Ready(Err(err)) => { + log::trace!(" read err: {:?}", &err); + return match err.kind() { + // convert WouldBlock error to the same as Pending return io::ErrorKind::WouldBlock => Ok(false), + + // connection reset after partial read io::ErrorKind::ConnectionReset if read_some => Ok(true), + _ => Err(DispatchError::Io(err)), - } + }; } } } @@ -1147,6 +1181,7 @@ where #[inline] fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + log::trace!(target: "", ""); log::trace!("enter Dispatcher::poll"); let this = self.as_mut().project(); @@ -1162,13 +1197,13 @@ where inner.as_mut().poll_timer(cx)?; - if inner.flags.contains(Flags::SHUTDOWN) { + let poll = if inner.flags.contains(Flags::SHUTDOWN) { if inner.flags.contains(Flags::WRITE_DISCONNECT) { Poll::Ready(Ok(())) } else { // flush buffer and wait on blocked ready!(inner.as_mut().poll_flush(cx))?; - Pin::new(inner.project().io.as_mut().unwrap()) + Pin::new(inner.as_mut().project().io.as_mut().unwrap()) .poll_shutdown(cx) .map_err(DispatchError::from) } @@ -1186,13 +1221,14 @@ where .as_mut() .project() .head_timer - .set(sleep_until(deadline)); + .set(sleep_until(deadline), line!()); } } inner.as_mut().poll_request(cx)?; if should_disconnect { + log::trace!("should_disconnect = true"); // I/O stream should to be closed let inner = inner.as_mut().project(); inner.flags.insert(Flags::READ_DISCONNECT); @@ -1205,9 +1241,25 @@ where // poll response to populate write buffer // drain indicates whether write buffer should be emptied before next run let drain = match inner.as_mut().poll_response(cx)? { - PollResponse::DrainWriteBuf => true, + PollResponse::DrainWriteBuf => { + inner.flags.contains(Flags::KEEP_ALIVE); + true + } - PollResponse::DoNothing => false, + PollResponse::DoNothing => { + if inner.flags.contains(Flags::KEEP_ALIVE) { + if let Some(deadline) = inner.config.keep_alive_timer() { + log::trace!("setting keep-alive timer"); + inner + .as_mut() + .project() + .ka_timer + .set(deadline, line!()); + } + } + + false + } // upgrade request and goes Upgrade variant of DispatcherState. PollResponse::Upgrade(req) => { @@ -1234,15 +1286,16 @@ where // client is gone if inner.flags.contains(Flags::WRITE_DISCONNECT) { + log::trace!("client is gone; disconnecting"); return Poll::Ready(Ok(())); } - let state_is_none = inner.state.is_none(); - let inner_p = inner.as_mut().project(); + let state_is_none = inner_p.state.is_none(); // read half is closed; we do not process any responses if inner_p.flags.contains(Flags::READ_DISCONNECT) && state_is_none { + log::trace!("read half closed; start shutdown"); inner_p.flags.insert(Flags::SHUTDOWN); } @@ -1255,14 +1308,17 @@ where return Poll::Ready(Err(err)); } - // // disconnect if keep-alive is not enabled - // if inner_p.flags.contains(Flags::STARTED) - // && !inner_p.flags.contains(Flags::KEEP_ALIVE) - // { - // log::trace!("shutdown because keep-alive is not enabled"); - // inner_p.flags.insert(Flags::SHUTDOWN); - // return self.poll(cx); - // } + // disconnect if keep-alive is not enabled + if inner_p.flags.contains(Flags::FINISHED) + && !inner_p.flags.contains(Flags::KEEP_ALIVE) + { + log::trace!( + "start shutdown because keep-alive is disabled or opted \ + out for this connection" + ); + inner_p.flags.insert(Flags::SHUTDOWN); + return self.poll(cx); + } // disconnect if shutdown if inner_p.flags.contains(Flags::SHUTDOWN) { @@ -1271,7 +1327,7 @@ where } } - log::trace!("but after all that, PENDING; wait for more data"); + log::trace!("dispatcher going to sleep; wait for next event"); trace_timer_states( inner_p.head_timer, @@ -1280,7 +1336,11 @@ where ); Poll::Pending - } + }; + + log::trace!("current flags: {:?}", &inner.flags); + + poll } DispatcherStateProj::Upgrade { fut: upgrade } => upgrade.poll(cx).map_err(|err| { diff --git a/actix-http/src/h1/dispatcher_tests.rs b/actix-http/src/h1/dispatcher_tests.rs index 379019c6f..46f5b9153 100644 --- a/actix-http/src/h1/dispatcher_tests.rs +++ b/actix-http/src/h1/dispatcher_tests.rs @@ -17,7 +17,7 @@ use crate::{ h1::{Codec, ExpectHandler, UpgradeHandler}, service::HttpFlow, test::{TestBuffer, TestSeqBuffer}, - Error, HttpMessage, KeepAlive, Method, OnConnectData, Request, Response, + Error, HttpMessage, KeepAlive, Method, OnConnectData, Request, Response, StatusCode, }; fn find_slice(haystack: &[u8], needle: &[u8], from: usize) -> Option { @@ -34,7 +34,13 @@ fn stabilize_date_header(payload: &mut [u8]) { } fn ok_service() -> impl Service, Error = Error> { - fn_service(|_req: Request| ready(Ok::<_, Error>(Response::ok()))) + status_service(StatusCode::OK) +} + +fn status_service( + status: StatusCode, +) -> impl Service, Error = Error> { + fn_service(move |_req: Request| ready(Ok::<_, Error>(Response::new(status)))) } fn echo_path_service( @@ -127,7 +133,7 @@ async fn late_request() { } #[actix_rt::test] -async fn test_basic() { +async fn oneshot_connection() { let buf = TestBuffer::new("GET /abcd HTTP/1.1\r\n\r\n"); let cfg = ServiceConfig::new(KeepAlive::Disabled, 100, 0, false, None); @@ -179,7 +185,7 @@ async fn test_basic() { } #[actix_rt::test] -async fn test_keep_alive_timeout() { +async fn keep_alive_timeout() { let buf = TestBuffer::new("GET /abcd HTTP/1.1\r\n\r\n"); let cfg = ServiceConfig::new(KeepAlive::Timeout(1), 100, 0, false, None); @@ -252,7 +258,7 @@ async fn test_keep_alive_timeout() { } #[actix_rt::test] -async fn test_keep_alive_follow_up_req() { +async fn keep_alive_follow_up_req() { let mut buf = TestBuffer::new("GET /abcd HTTP/1.1\r\n\r\n"); let cfg = ServiceConfig::new(KeepAlive::Timeout(2), 100, 0, false, None); @@ -371,7 +377,7 @@ async fn test_keep_alive_follow_up_req() { } #[actix_rt::test] -async fn test_req_parse_err() { +async fn req_parse_err() { lazy(|cx| { let buf = TestBuffer::new("GET /test HTTP/1\r\n\r\n"); @@ -531,7 +537,7 @@ async fn pipelining_ok_then_bad() { } #[actix_rt::test] -async fn test_expect() { +async fn expect_handling() { lazy(|cx| { let mut buf = TestSeqBuffer::empty(); let cfg = ServiceConfig::new(KeepAlive::Disabled, 0, 0, false, None); @@ -562,7 +568,6 @@ async fn test_expect() { // polls: manual assert_eq!(h1.poll_count, 1); - eprintln!("poll count: {}", h1.poll_count); if let DispatcherState::Normal { ref inner } = h1.inner { let io = inner.io.as_ref().unwrap(); @@ -603,7 +608,7 @@ async fn test_expect() { } #[actix_rt::test] -async fn test_eager_expect() { +async fn expect_eager() { lazy(|cx| { let mut buf = TestSeqBuffer::empty(); let cfg = ServiceConfig::new(KeepAlive::Disabled, 0, 0, false, None); @@ -663,7 +668,7 @@ async fn test_eager_expect() { } #[actix_rt::test] -async fn test_upgrade() { +async fn upgrade_handling() { struct TestUpgrade; impl Service<(Request, Framed)> for TestUpgrade { diff --git a/actix-http/src/h1/encoder.rs b/actix-http/src/h1/encoder.rs index 5fcb2f688..091b71b24 100644 --- a/actix-http/src/h1/encoder.rs +++ b/actix-http/src/h1/encoder.rs @@ -318,16 +318,17 @@ impl MessageType for RequestHeadType { } impl MessageEncoder { - /// Encode message + /// Encode chunk. pub fn encode_chunk(&mut self, msg: &[u8], buf: &mut BytesMut) -> io::Result { self.te.encode(msg, buf) } - /// Encode eof + /// Encode EOF. pub fn encode_eof(&mut self, buf: &mut BytesMut) -> io::Result<()> { self.te.encode_eof(buf) } + /// Encode message. pub fn encode( &mut self, dst: &mut BytesMut, diff --git a/actix-http/src/h1/mod.rs b/actix-http/src/h1/mod.rs index 8c569165d..662a1ef98 100644 --- a/actix-http/src/h1/mod.rs +++ b/actix-http/src/h1/mod.rs @@ -28,9 +28,10 @@ pub use self::utils::SendResponse; #[derive(Debug)] /// Codec message pub enum Message { - /// Http message + /// HTTP message. Item(T), - /// Payload chunk + + /// Payload chunk. Chunk(Option), } diff --git a/actix-http/src/responses/head.rs b/actix-http/src/responses/head.rs index f21957349..41d558440 100644 --- a/actix-http/src/responses/head.rs +++ b/actix-http/src/responses/head.rs @@ -208,16 +208,20 @@ mod tests { }; use memchr::memmem; + use tokio::io::{AsyncReadExt, AsyncWriteExt, Interest}; use crate::{ + h1::H1Service, header::{HeaderName, HeaderValue}, - Error, HttpService, Request, Response, + Error, Request, Response, ServiceConfig, }; #[actix_rt::test] async fn camel_case_headers() { + let _ = env_logger::try_init(); + let mut srv = actix_http_test::test_server(|| { - HttpService::new(|req: Request| async move { + H1Service::with_config(ServiceConfig::default(), |req: Request| async move { let mut res = Response::ok(); if req.path().contains("camel") { @@ -228,16 +232,21 @@ mod tests { HeaderName::from_static("foo-bar"), HeaderValue::from_static("baz"), ); + Ok::<_, Error>(res) }) .tcp() }) .await; - let mut stream = net::TcpStream::connect(srv.addr()).unwrap(); - let _ = stream.write_all(b"GET /camel HTTP/1.1\r\nConnection: Close\r\n\r\n"); + let mut stream = tokio::net::TcpStream::connect(srv.addr()).await.unwrap(); + dbg!(stream.ready(Interest::WRITABLE).await.unwrap()); + let _ = stream + .write_all(b"GET /camel HTTP/1.1\r\nConnection: Close\r\n\r\n") + .await + .unwrap(); let mut data = vec![0; 1024]; - let _ = stream.read(&mut data); + let _ = stream.read_to_end(&mut data).await.unwrap(); assert_eq!(&data[..17], b"HTTP/1.1 200 OK\r\n"); assert!(memmem::find(&data, b"Foo-Bar").is_some()); assert!(memmem::find(&data, b"foo-bar").is_none()); @@ -247,9 +256,11 @@ mod tests { assert!(memmem::find(&data, b"content-length").is_none()); let mut stream = net::TcpStream::connect(srv.addr()).unwrap(); - let _ = stream.write_all(b"GET /lower HTTP/1.1\r\nConnection: Close\r\n\r\n"); + let _ = stream + .write_all(b"GET /lower HTTP/1.1\r\nConnection: Close\r\n\r\n") + .unwrap(); let mut data = vec![0; 1024]; - let _ = stream.read(&mut data); + let _ = stream.read_to_end(&mut data).unwrap(); assert_eq!(&data[..17], b"HTTP/1.1 200 OK\r\n"); assert!(memmem::find(&data, b"Foo-Bar").is_none()); assert!(memmem::find(&data, b"foo-bar").is_some()); diff --git a/actix-http/src/test.rs b/actix-http/src/test.rs index 0d4d342ec..6212c19d1 100644 --- a/actix-http/src/test.rs +++ b/actix-http/src/test.rs @@ -242,7 +242,7 @@ impl io::Read for TestBuffer { impl io::Write for TestBuffer { fn write(&mut self, buf: &[u8]) -> io::Result { - RefCell::borrow_mut(&self.write_buf).extend(buf); + self.write_buf.borrow_mut().extend(buf); Ok(buf.len()) } diff --git a/actix-http/tests/test_server.rs b/actix-http/tests/test_server.rs index d51cbcf69..a6b5de470 100644 --- a/actix-http/tests/test_server.rs +++ b/actix-http/tests/test_server.rs @@ -197,7 +197,40 @@ async fn test_chunked_payload() { } #[actix_rt::test] -async fn test_slow_request() { +async fn test_slow_request_close() { + let mut srv = test_server(|| { + HttpService::build() + .client_timeout(200) + .keep_alive(2) + .finish(|_| ok::<_, Infallible>(Response::ok())) + .tcp() + }) + .await; + + let start = Instant::now(); + + let mut stream = net::TcpStream::connect(srv.addr()).unwrap(); + + sleep(Duration::from_secs(1)).await; + + let mut data = Vec::new(); + let _ = stream.read(&mut data).unwrap(); + assert!( + data.is_empty(), + "connection should close without writing a response" + ); + + let end = Instant::now(); + + if end.duration_since(start) > Duration::from_secs(1) { + panic!("request took way too long to time out"); + } + + srv.stop().await; +} + +#[actix_rt::test] +async fn test_slow_request_408() { let mut srv = test_server(|| { HttpService::build() .client_timeout(200) diff --git a/src/app.rs b/src/app.rs index da33ebc4b..c6582766a 100644 --- a/src/app.rs +++ b/src/app.rs @@ -238,7 +238,9 @@ where /// Default service to be used if no matching resource could be found. /// - /// It is possible to use services like `Resource`, `Route`. + /// It is possible to use services like `Route` and [`NamedFile`]. + /// + /// [`NamedFile`]: https://docs.rs/actix-files/latest/actix_files/struct.NamedFile.html /// /// ``` /// use actix_web::{web, App, HttpResponse};