From 15b8dba2c78959c8a4322bd979999f716a2db3cd Mon Sep 17 00:00:00 2001 From: fakeshadow <24548779@qq.com> Date: Wed, 3 Feb 2021 04:32:17 -0800 Subject: [PATCH] clean up h1 dispatcher. add comments --- Cargo.toml | 7 +- actix-files/Cargo.toml | 2 +- actix-http-test/Cargo.toml | 2 +- actix-http/Cargo.toml | 3 +- actix-http/src/h1/decoder.rs | 2 +- actix-http/src/h1/dispatcher.rs | 323 +++++++++++++++------------- actix-http/src/header/into_value.rs | 2 +- actix-multipart/Cargo.toml | 2 +- actix-web-actors/Cargo.toml | 2 +- actix-web-codegen/Cargo.toml | 2 +- awc/Cargo.toml | 2 +- 11 files changed, 182 insertions(+), 167 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 754335baa..2ad92690c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -75,8 +75,8 @@ required-features = ["rustls"] [dependencies] actix-codec = "0.4.0-beta.1" actix-macros = "0.2.0" -actix-router = "0.2.4" -actix-rt = "2.0.0-beta.2" +actix-router = "0.2.6" +actix-rt = "2" actix-server = "2.0.0-beta.2" actix-service = "2.0.0-beta.3" actix-utils = "3.0.0-beta.1" @@ -137,8 +137,7 @@ actix-server = { git = "https://github.com/actix/actix-net.git" } actix-tls = { git = "https://github.com/actix/actix-net.git" } actix-utils = { git = "https://github.com/actix/actix-net.git" } actix-router = { git = "https://github.com/actix/actix-net.git" } -actix-macros = { git = "https://github.com/actix/actix-net" } -actix = { git = "https://github.com/actix/actix", branch = "feat/actix-rt-2" } +actix = { git = "https://github.com/actix/actix.git", branch = "feat/actix-rt-2" } [[bench]] name = "server" diff --git a/actix-files/Cargo.toml b/actix-files/Cargo.toml index bde2cb717..9889ea813 100644 --- a/actix-files/Cargo.toml +++ b/actix-files/Cargo.toml @@ -31,5 +31,5 @@ percent-encoding = "2.1" v_htmlescape = "0.12" [dev-dependencies] -actix-rt = "2.0.0-beta.2" +actix-rt = "2" actix-web = "4.0.0-beta.1" diff --git a/actix-http-test/Cargo.toml b/actix-http-test/Cargo.toml index 772b60f76..b678c4fcc 100644 --- a/actix-http-test/Cargo.toml +++ b/actix-http-test/Cargo.toml @@ -33,7 +33,7 @@ actix-service = "2.0.0-beta.3" actix-codec = "0.4.0-beta.1" actix-tls = "3.0.0-beta.2" actix-utils = "3.0.0-beta.1" -actix-rt = "2.0.0-beta.2" +actix-rt = "2" actix-server = "2.0.0-beta.2" awc = "3.0.0-beta.1" diff --git a/actix-http/Cargo.toml b/actix-http/Cargo.toml index cbf5dcf2c..d6b9843d7 100644 --- a/actix-http/Cargo.toml +++ b/actix-http/Cargo.toml @@ -43,7 +43,7 @@ actors = ["actix"] actix-service = "2.0.0-beta.3" actix-codec = "0.4.0-beta.1" actix-utils = "3.0.0-beta.1" -actix-rt = "2.0.0-beta.3" +actix-rt = "2" actix-tls = "3.0.0-beta.2" actix = { version = "0.11.0-beta.1", optional = true } @@ -70,6 +70,7 @@ log = "0.4" mime = "0.3" percent-encoding = "2.1" pin-project = "1.0.0" +pin-project-lite = "0.2" rand = "0.8" regex = "1.3" serde = "1.0" diff --git a/actix-http/src/h1/decoder.rs b/actix-http/src/h1/decoder.rs index 85379b084..7c412d8fd 100644 --- a/actix-http/src/h1/decoder.rs +++ b/actix-http/src/h1/decoder.rs @@ -14,7 +14,7 @@ use crate::header::HeaderMap; use crate::message::{ConnectionType, ResponseHead}; use crate::request::Request; -const MAX_BUFFER_SIZE: usize = 131_072; +pub(crate) const MAX_BUFFER_SIZE: usize = 131_072; const MAX_HEADERS: usize = 96; /// Incoming message decoder diff --git a/actix-http/src/h1/dispatcher.rs b/actix-http/src/h1/dispatcher.rs index bcab82ac0..41123625d 100644 --- a/actix-http/src/h1/dispatcher.rs +++ b/actix-http/src/h1/dispatcher.rs @@ -45,7 +45,7 @@ bitflags! { } } -#[pin_project::pin_project] +#[pin_project] /// Dispatcher for HTTP/1.1 protocol pub struct Dispatcher where @@ -139,11 +139,8 @@ where fn is_empty(&self) -> bool { matches!(self, State::None) } - - fn is_call(&self) -> bool { - matches!(self, State::ServiceCall(_)) - } } + enum PollResponse { Upgrade(Request), DoNothing, @@ -345,55 +342,81 @@ where ) -> Result { loop { let mut this = self.as_mut().project(); - // state is not changed on Poll::Pending. - // other variant and conditions always trigger a state change(or an error). - let state_change = match this.state.project() { + match this.state.as_mut().project() { + // no future is in InnerDispatcher state. pop next message. StateProj::None => match this.messages.pop_front() { + // handle request message. Some(DispatcherMessage::Item(req)) => { - self.as_mut().handle_request(req, cx)?; - true + // Handle `EXPECT: 100-Continue` header + if req.head().expect() { + // set InnerDispatcher state and continue loop to poll it. + let task = this.flow.expect.call(req); + this.state.set(State::ExpectCall(task)); + } else { + // the same as expect call. + let task = this.flow.service.call(req); + this.state.set(State::ServiceCall(task)); + }; } + // handle error message. Some(DispatcherMessage::Error(res)) => { + // send_response would update InnerDispatcher state to SendPayload or + // None(If response body is empty). + // continue loop to poll it. self.as_mut() .send_response(res, ResponseBody::Other(Body::Empty))?; - true } + // return with upgrade request and poll it exclusively. Some(DispatcherMessage::Upgrade(req)) => { return Ok(PollResponse::Upgrade(req)); } - None => false, + // all messages are dealt with. + None => return Ok(PollResponse::DoNothing), }, StateProj::ExpectCall(fut) => match fut.poll(cx) { + // expect resolved. write continue to buffer and set InnerDispatcher state + // to service call. Poll::Ready(Ok(req)) => { self.as_mut().send_continue(); this = self.as_mut().project(); let fut = this.flow.service.call(req); this.state.set(State::ServiceCall(fut)); - continue; } + // send expect error as response Poll::Ready(Err(e)) => { let res: Response = e.into().into(); let (res, body) = res.replace_body(()); self.as_mut().send_response(res, body.into_body())?; - true } - Poll::Pending => false, + // expect must be solved before progress can be made. + Poll::Pending => return Ok(PollResponse::DoNothing), }, StateProj::ServiceCall(fut) => match fut.poll(cx) { + // service call resolved. send response. Poll::Ready(Ok(res)) => { let (res, body) = res.into().replace_body(()); self.as_mut().send_response(res, body)?; - continue; } + // send service call error as response Poll::Ready(Err(e)) => { let res: Response = e.into().into(); let (res, body) = res.replace_body(()); self.as_mut().send_response(res, body.into_body())?; - true } - Poll::Pending => false, + // service call pending and could be waiting for more chunk messages. + // (pipeline message limit and/or payload can_read limit) + Poll::Pending => { + // no new message is decoded and no new payload is feed. + // nothing to do except waiting for new incoming data from client. + if !self.as_mut().poll_request(cx)? { + return Ok(PollResponse::DoNothing); + } + // otherwise keep loop. + } }, StateProj::SendPayload(mut stream) => { + // keep populate writer buffer until buffer size limit hit, + // get blocked or finished. loop { if this.write_buf.len() < HW_BUFFER_SIZE { match stream.as_mut().poll_next(cx) { @@ -402,50 +425,42 @@ where Message::Chunk(Some(item)), &mut this.write_buf, )?; - continue; } Poll::Ready(None) => { this.codec.encode( Message::Chunk(None), &mut this.write_buf, )?; - this = self.as_mut().project(); - this.state.set(State::None); + // payload stream finished. + // break and goes out of scope of borrowed stream. + break; } - Poll::Ready(Some(Err(_))) => { - return Err(DispatchError::Unknown) + Poll::Ready(Some(Err(e))) => { + return Err(DispatchError::Service(e)) } + // Payload Stream Pending should only be given when the caller + // promise to wake it up properly. + // + // TODO: Think if it's an good idea to mix in a self wake up. + // It would turn dispatcher into a busy polling style of stream + // handling. (Or another timer as source of scheduled wake up) + // As There is no way to know when or how the caller would wake + // up the stream so a self wake up is an overhead that would + // result in a double polling(or an extra timer) Poll::Pending => return Ok(PollResponse::DoNothing), } } else { + // buffer is beyond max size. + // return and write the whole buffer to io stream. return Ok(PollResponse::DrainWriteBuf); } - break; } - continue; - } - }; - - // state is changed and continue when the state is not Empty - if state_change { - if !self.state.is_empty() { - continue; - } - } else { - // if read-backpressure is enabled and we consumed some data. - // we may read more data and retry - if self.state.is_call() { - if self.as_mut().poll_request(cx)? { - continue; - } - } else if !self.messages.is_empty() { - continue; + // break from Poll::Ready(None) on stream finished. + // this is for re borrow InnerDispatcher state and set it to None. + this.state.set(State::None); } } - break; } - - Ok(PollResponse::DoNothing) } fn handle_request( @@ -457,7 +472,7 @@ where // Handle `EXPECT: 100-Continue` header if req.head().expect() { - // set dispatcher state so the future is pinned. + // set InnerDispatcher state so the future is pinned. let task = this.flow.expect.call(req); this.state.set(State::ExpectCall(task)); } else { @@ -468,26 +483,22 @@ where // eagerly poll the future for once(or twice if expect is resolved immediately). loop { - match this.state.project() { + match this.state.as_mut().project() { StateProj::ExpectCall(fut) => { match fut.poll(cx) { // expect is resolved. continue loop and poll the service call branch. Poll::Ready(Ok(req)) => { - self.as_mut().send_continue(); - this = self.as_mut().project(); let task = this.flow.service.call(req); - this.state.set(State::ServiceCall(task)); - continue; + this.state.as_mut().set(State::ServiceCall(task)); } - // future is pending. return Ok(()) to notify that a new state is - // set and the outer loop should be continue. + // future is pending. return Ok(()) to notify that a new InnerDispatcher + // state is set and the outer loop should be continue. Poll::Pending => return Ok(()), // future is error. send response and return a result. On success - // to notify the dispatcher a new state is set and the outer loop - // should be continue. + // to notify the dispatcher a new InnerDispatcher state is set and + // the outer loop should be continue. Poll::Ready(Err(e)) => { - let e = e.into(); - let res: Response = e.into(); + let res: Response = e.into().into(); let (res, body) = res.replace_body(()); return self.send_response(res, body.into_body()); } @@ -497,8 +508,8 @@ where // return no matter the service call future's result. return match fut.poll(cx) { // future is resolved. send response and return a result. On success - // to notify the dispatcher a new state is set and the outer loop - // should be continue. + // to notify the dispatcher a new InnerDispatcher state is set and the + // outer loop should be continue. Poll::Ready(Ok(res)) => { let (res, body) = res.into().replace_body(()); self.send_response(res, body) @@ -540,25 +551,28 @@ where match msg { Message::Item(mut req) => { - let pl = this.codec.message_type(); req.head_mut().peer_addr = *this.peer_addr; // merge on_connect_ext data into request extensions this.on_connect_data.merge_into(&mut req); - if pl == MessageType::Stream && this.flow.upgrade.is_some() { - this.messages.push_back(DispatcherMessage::Upgrade(req)); - break; - } - if pl == MessageType::Payload || pl == MessageType::Stream { - let (ps, pl) = Payload::create(false); - let (req1, _) = - req.replace_payload(crate::Payload::H1(pl)); - req = req1; - *this.payload = Some(ps); + match this.codec.message_type() { + MessageType::Stream if this.flow.upgrade.is_some() => { + this.messages + .push_back(DispatcherMessage::Upgrade(req)); + break; + } + MessageType::Payload | MessageType::Stream => { + let (ps, pl) = Payload::create(false); + let (req1, _) = + req.replace_payload(crate::Payload::H1(pl)); + req = req1; + *this.payload = Some(ps); + } + MessageType::None => {} } - // handle request early + // handle request early if no future lives in InnerDispatcher state. if this.state.is_empty() { self.as_mut().handle_request(req, cx)?; this = self.as_mut().project(); @@ -624,6 +638,7 @@ where *this.ka_expire = expire; } } + Ok(updated) } @@ -711,6 +726,75 @@ where } Ok(()) } + + /// Returns true when io stream can be disconnected after write to it. + /// + /// It covers these conditions: + /// + /// - `Flags::READ_DISCONNECT` flag active. + /// - `std::io::ErrorKind::ConnectionReset` after partial read. + /// - all data read done. + #[inline(always)] + fn read_available( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Result { + let this = self.project(); + + if this.flags.contains(Flags::READ_DISCONNECT) { + return Ok(true); + }; + + let buf = this.read_buf; + let mut io = Pin::new(this.io.as_mut().unwrap()); + + let mut read_some = false; + + loop { + // grow buffer if necessary. + let remaining = buf.capacity() - buf.len(); + if remaining < LW_BUFFER_SIZE { + buf.reserve(HW_BUFFER_SIZE - remaining); + } + + match actix_codec::poll_read_buf(io.as_mut(), cx, buf) { + Poll::Pending => return Ok(false), + Poll::Ready(Ok(n)) => { + if n == 0 { + return Ok(true); + } else { + // Return early when read buf exceed decoder's max buffer size. + if buf.len() >= super::decoder::MAX_BUFFER_SIZE { + return Ok(false); + } + read_some = true; + } + } + Poll::Ready(Err(err)) => { + return if err.kind() == io::ErrorKind::WouldBlock { + Ok(false) + } else if err.kind() == io::ErrorKind::ConnectionReset && read_some { + Ok(true) + } else { + Err(DispatchError::Io(err)) + } + } + } + } + } + + /// call upgrade service with request. + fn upgrade(self: Pin<&mut Self>, req: Request) -> U::Future { + let this = self.project(); + let mut parts = FramedParts::with_read_buf( + this.io.take().unwrap(), + mem::take(this.codec), + mem::take(this.read_buf), + ); + parts.write_buf = mem::take(this.write_buf); + let framed = Framed::from_parts(parts); + this.flow.upgrade.as_ref().unwrap().call((req, framed)) + } } impl Future for Dispatcher @@ -744,9 +828,10 @@ where if inner.flags.contains(Flags::WRITE_DISCONNECT) { Poll::Ready(Ok(())) } else { - // flush buffer + // flush buffer. inner.as_mut().poll_flush(cx)?; if !inner.write_buf.is_empty() { + // still have unfinished data. wait. Poll::Pending } else { Pin::new(inner.project().io.as_mut().unwrap()) @@ -755,21 +840,14 @@ where } } } else { - // read socket into a buf - let should_disconnect = - if !inner.flags.contains(Flags::READ_DISCONNECT) { - let mut inner_p = inner.as_mut().project(); - read_available( - cx, - inner_p.io.as_mut().unwrap(), - &mut inner_p.read_buf, - )? - } else { - None - }; + // read from io stream and fill read buffer. + let should_disconnect = inner.as_mut().read_available(cx)?; + // parse read buffer into http requests and payloads. inner.as_mut().poll_request(cx)?; - if let Some(true) = should_disconnect { + + // io stream should to be closed. + if should_disconnect { let inner_p = inner.as_mut().project(); inner_p.flags.insert(Flags::READ_DISCONNECT); if let Some(mut payload) = inner_p.payload.take() { @@ -778,6 +856,7 @@ where }; loop { + // grow buffer if necessary. let inner_p = inner.as_mut().project(); let remaining = inner_p.write_buf.capacity() - inner_p.write_buf.len(); @@ -785,24 +864,14 @@ where inner_p.write_buf.reserve(HW_BUFFER_SIZE - remaining); } + // poll_response and populate write buffer. + // drain indicate if write buffer should be emptied before next run. let drain = match inner.as_mut().poll_response(cx)? { PollResponse::DrainWriteBuf => true, PollResponse::DoNothing => false, + // upgrade request and goes Upgrade variant of DispatcherState. PollResponse::Upgrade(req) => { - let inner_p = inner.as_mut().project(); - let mut parts = FramedParts::with_read_buf( - inner_p.io.take().unwrap(), - mem::take(inner_p.codec), - mem::take(inner_p.read_buf), - ); - parts.write_buf = mem::take(inner_p.write_buf); - let framed = Framed::from_parts(parts); - let upgrade = inner_p - .flow - .upgrade - .as_ref() - .unwrap() - .call((req, framed)); + let upgrade = inner.upgrade(req); self.as_mut() .project() .inner @@ -827,6 +896,7 @@ where return Poll::Ready(Ok(())); } + // check if still have unsolved future in InnerDispatcher state. let is_empty = inner.state.is_empty(); let inner_p = inner.as_mut().project(); @@ -866,61 +936,6 @@ where } } -/// Returns either: -/// - `Ok(Some(true))` - data was read and done reading all data. -/// - `Ok(Some(false))` - data was read but there should be more to read. -/// - `Ok(None)` - no data was read but there should be more to read later. -/// - Unhandled Errors -fn read_available( - cx: &mut Context<'_>, - io: &mut T, - buf: &mut BytesMut, -) -> Result, io::Error> -where - T: AsyncRead + Unpin, -{ - let mut read_some = false; - - loop { - // If buf is full return but do not disconnect since - // there is more reading to be done - if buf.len() >= HW_BUFFER_SIZE { - return Ok(Some(false)); - } - - let remaining = buf.capacity() - buf.len(); - if remaining < LW_BUFFER_SIZE { - buf.reserve(HW_BUFFER_SIZE - remaining); - } - - match actix_codec::poll_read_buf(Pin::new(io), cx, buf) { - Poll::Pending => { - return if read_some { Ok(Some(false)) } else { Ok(None) }; - } - Poll::Ready(Ok(n)) => { - if n == 0 { - return Ok(Some(true)); - } else { - read_some = true; - } - } - Poll::Ready(Err(err)) => { - return if err.kind() == io::ErrorKind::WouldBlock { - if read_some { - Ok(Some(false)) - } else { - Ok(None) - } - } else if err.kind() == io::ErrorKind::ConnectionReset && read_some { - Ok(Some(true)) - } else { - Err(err) - } - } - } - } -} - #[cfg(test)] mod tests { use std::str; diff --git a/actix-http/src/header/into_value.rs b/actix-http/src/header/into_value.rs index 4b1e6cbbd..4ba58e726 100644 --- a/actix-http/src/header/into_value.rs +++ b/actix-http/src/header/into_value.rs @@ -126,6 +126,6 @@ impl IntoHeaderValue for Mime { #[inline] fn try_into_value(self) -> Result { - HeaderValue::try_from(format!("{}", self)) + HeaderValue::from_str(self.as_ref()) } } diff --git a/actix-multipart/Cargo.toml b/actix-multipart/Cargo.toml index 44a7e8d16..8eb63065c 100644 --- a/actix-multipart/Cargo.toml +++ b/actix-multipart/Cargo.toml @@ -28,5 +28,5 @@ mime = "0.3" twoway = "0.2" [dev-dependencies] -actix-rt = "2.0.0-beta.2" +actix-rt = "2" actix-http = "3.0.0-beta.1" diff --git a/actix-web-actors/Cargo.toml b/actix-web-actors/Cargo.toml index 0f90edb07..dd742d282 100644 --- a/actix-web-actors/Cargo.toml +++ b/actix-web-actors/Cargo.toml @@ -28,6 +28,6 @@ pin-project = "1.0.0" tokio = { version = "1", features = ["sync"] } [dev-dependencies] -actix-rt = "2.0.0-beta.2" +actix-rt = "2" env_logger = "0.7" futures-util = { version = "0.3.7", default-features = false } diff --git a/actix-web-codegen/Cargo.toml b/actix-web-codegen/Cargo.toml index 00875cf1b..04bd10421 100644 --- a/actix-web-codegen/Cargo.toml +++ b/actix-web-codegen/Cargo.toml @@ -19,7 +19,7 @@ syn = { version = "1", features = ["full", "parsing"] } proc-macro2 = "1" [dev-dependencies] -actix-rt = "2.0.0-beta.2" +actix-rt = "2" actix-web = "4.0.0-beta.1" futures-util = { version = "0.3.7", default-features = false } trybuild = "1" diff --git a/awc/Cargo.toml b/awc/Cargo.toml index dded78a20..8500e9702 100644 --- a/awc/Cargo.toml +++ b/awc/Cargo.toml @@ -40,7 +40,7 @@ compress = ["actix-http/compress"] actix-codec = "0.4.0-beta.1" actix-service = "2.0.0-beta.3" actix-http = "3.0.0-beta.1" -actix-rt = "2.0.0-beta.3" +actix-rt = "2" base64 = "0.13" bytes = "1"