From a03dbe2dcfc02ad6c8d3e81d47f9b2defccce2db Mon Sep 17 00:00:00 2001 From: fakeshadow <24548779@qq.com> Date: Thu, 7 Jan 2021 02:43:52 +0800 Subject: [PATCH 01/11] replace cloneable service with httpflow abstraction (#1876) --- actix-http/src/cloneable.rs | 39 ---------- actix-http/src/h1/dispatcher.rs | 128 ++++++++++++++++---------------- actix-http/src/h1/service.rs | 39 ++++------ actix-http/src/h2/dispatcher.rs | 27 +++---- actix-http/src/h2/service.rs | 45 +++++------ actix-http/src/lib.rs | 34 ++++++++- actix-http/src/service.rs | 65 +++++++++------- 7 files changed, 190 insertions(+), 187 deletions(-) delete mode 100644 actix-http/src/cloneable.rs diff --git a/actix-http/src/cloneable.rs b/actix-http/src/cloneable.rs deleted file mode 100644 index 5f0b1ea28..000000000 --- a/actix-http/src/cloneable.rs +++ /dev/null @@ -1,39 +0,0 @@ -use std::cell::RefCell; -use std::rc::Rc; -use std::task::{Context, Poll}; - -use actix_service::Service; - -/// Service that allows to turn non-clone service to a service with `Clone` impl -/// -/// # Panics -/// CloneableService might panic with some creative use of thread local storage. -/// See https://github.com/actix/actix-web/issues/1295 for example -#[doc(hidden)] -pub(crate) struct CloneableService(Rc>); - -impl CloneableService { - pub(crate) fn new(service: T) -> Self { - Self(Rc::new(RefCell::new(service))) - } -} - -impl Clone for CloneableService { - fn clone(&self) -> Self { - Self(self.0.clone()) - } -} - -impl, Req> Service for CloneableService { - type Response = T::Response; - type Error = T::Error; - type Future = T::Future; - - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { - self.0.borrow_mut().poll_ready(cx) - } - - fn call(&mut self, req: Req) -> Self::Future { - self.0.borrow_mut().call(req) - } -} diff --git a/actix-http/src/h1/dispatcher.rs b/actix-http/src/h1/dispatcher.rs index a9510dc1e..60552d102 100644 --- a/actix-http/src/h1/dispatcher.rs +++ b/actix-http/src/h1/dispatcher.rs @@ -1,9 +1,11 @@ use std::{ + cell::RefCell, collections::VecDeque, fmt, future::Future, io, mem, net, pin::Pin, + rc::Rc, task::{Context, Poll}, }; @@ -15,17 +17,14 @@ use bytes::{Buf, BytesMut}; use log::{error, trace}; use pin_project::pin_project; -use crate::cloneable::CloneableService; +use crate::body::{Body, BodySize, MessageBody, ResponseBody}; use crate::config::ServiceConfig; use crate::error::{DispatchError, Error}; use crate::error::{ParseError, PayloadError}; -use crate::httpmessage::HttpMessage; use crate::request::Request; use crate::response::Response; -use crate::{ - body::{Body, BodySize, MessageBody, ResponseBody}, - Extensions, -}; +use crate::service::HttpFlow; +use crate::OnConnectData; use super::codec::Codec; use super::payload::{Payload, PayloadSender, PayloadStatus}; @@ -78,7 +77,7 @@ where U::Error: fmt::Display, { Normal(#[pin] InnerDispatcher), - Upgrade(Pin>), + Upgrade(#[pin] U::Future), } #[pin_project(project = InnerDispatcherProj)] @@ -92,10 +91,8 @@ where U: Service<(Request, Framed), Response = ()>, U::Error: fmt::Display, { - service: CloneableService, - expect: CloneableService, - upgrade: Option>, - on_connect_data: Extensions, + services: Rc>>, + on_connect_data: OnConnectData, flags: Flags, peer_addr: Option, error: Option, @@ -180,10 +177,8 @@ where pub(crate) fn new( stream: T, config: ServiceConfig, - service: CloneableService, - expect: CloneableService, - upgrade: Option>, - on_connect_data: Extensions, + services: Rc>>, + on_connect_data: OnConnectData, peer_addr: Option, ) -> Self { Dispatcher::with_timeout( @@ -192,9 +187,7 @@ where config, BytesMut::with_capacity(HW_BUFFER_SIZE), None, - service, - expect, - upgrade, + services, on_connect_data, peer_addr, ) @@ -207,10 +200,8 @@ where config: ServiceConfig, read_buf: BytesMut, timeout: Option, - service: CloneableService, - expect: CloneableService, - upgrade: Option>, - on_connect_data: Extensions, + services: Rc>>, + on_connect_data: OnConnectData, peer_addr: Option, ) -> Self { let keepalive = config.keep_alive_enabled(); @@ -239,9 +230,7 @@ where io: Some(io), codec, read_buf, - service, - expect, - upgrade, + services, on_connect_data, flags, peer_addr, @@ -395,7 +384,8 @@ where Poll::Ready(Ok(req)) => { self.as_mut().send_continue(); this = self.as_mut().project(); - this.state.set(State::ServiceCall(this.service.call(req))); + let fut = this.services.borrow_mut().service.call(req); + this.state.set(State::ServiceCall(fut)); continue; } Poll::Ready(Err(e)) => { @@ -483,12 +473,14 @@ where // Handle `EXPECT: 100-Continue` header if req.head().expect() { // set dispatcher state so the future is pinned. - let task = self.as_mut().project().expect.call(req); - self.as_mut().project().state.set(State::ExpectCall(task)); + let mut this = self.as_mut().project(); + let task = this.services.borrow_mut().expect.call(req); + this.state.set(State::ExpectCall(task)); } else { // the same as above. - let task = self.as_mut().project().service.call(req); - self.as_mut().project().state.set(State::ServiceCall(task)); + let mut this = self.as_mut().project(); + let task = this.services.borrow_mut().service.call(req); + this.state.set(State::ServiceCall(task)); }; // eagerly poll the future for once(or twice if expect is resolved immediately). @@ -499,8 +491,9 @@ where // expect is resolved. continue loop and poll the service call branch. Poll::Ready(Ok(req)) => { self.as_mut().send_continue(); - let task = self.as_mut().project().service.call(req); - self.as_mut().project().state.set(State::ServiceCall(task)); + let mut this = self.as_mut().project(); + let task = this.services.borrow_mut().service.call(req); + this.state.set(State::ServiceCall(task)); continue; } // future is pending. return Ok(()) to notify that a new state is @@ -568,9 +561,11 @@ where req.head_mut().peer_addr = *this.peer_addr; // merge on_connect_ext data into request extensions - req.extensions_mut().drain_from(this.on_connect_data); + this.on_connect_data.merge_into(&mut req); - if pl == MessageType::Stream && this.upgrade.is_some() { + if pl == MessageType::Stream + && this.services.borrow().upgrade.is_some() + { this.messages.push_back(DispatcherMessage::Upgrade(req)); break; } @@ -834,12 +829,17 @@ where ); parts.write_buf = mem::take(inner_p.write_buf); let framed = Framed::from_parts(parts); - let upgrade = - inner_p.upgrade.take().unwrap().call((req, framed)); + let upgrade = inner_p + .services + .borrow_mut() + .upgrade + .take() + .unwrap() + .call((req, framed)); self.as_mut() .project() .inner - .set(DispatcherState::Upgrade(Box::pin(upgrade))); + .set(DispatcherState::Upgrade(upgrade)); return self.poll(cx); } @@ -890,7 +890,7 @@ where } } } - DispatcherStateProj::Upgrade(fut) => fut.as_mut().poll(cx).map_err(|e| { + DispatcherStateProj::Upgrade(fut) => fut.poll(cx).map_err(|e| { error!("Upgrade handler error: {}", e); DispatchError::Upgrade }), @@ -1028,13 +1028,13 @@ mod tests { lazy(|cx| { let buf = TestBuffer::new("GET /test HTTP/1\r\n\r\n"); + let services = HttpFlow::new(ok_service(), ExpectHandler, None); + let h1 = Dispatcher::<_, _, _, _, UpgradeHandler>::new( buf, ServiceConfig::default(), - CloneableService::new(ok_service()), - CloneableService::new(ExpectHandler), - None, - Extensions::new(), + services, + OnConnectData::default(), None, ); @@ -1068,13 +1068,13 @@ mod tests { let cfg = ServiceConfig::new(KeepAlive::Disabled, 1, 1, false, None); + let services = HttpFlow::new(echo_path_service(), ExpectHandler, None); + let h1 = Dispatcher::<_, _, _, _, UpgradeHandler>::new( buf, cfg, - CloneableService::new(echo_path_service()), - CloneableService::new(ExpectHandler), - None, - Extensions::new(), + services, + OnConnectData::default(), None, ); @@ -1122,13 +1122,13 @@ mod tests { let cfg = ServiceConfig::new(KeepAlive::Disabled, 1, 1, false, None); + let services = HttpFlow::new(echo_path_service(), ExpectHandler, None); + let h1 = Dispatcher::<_, _, _, _, UpgradeHandler>::new( buf, cfg, - CloneableService::new(echo_path_service()), - CloneableService::new(ExpectHandler), - None, - Extensions::new(), + services, + OnConnectData::default(), None, ); @@ -1171,13 +1171,14 @@ mod tests { lazy(|cx| { let mut buf = TestSeqBuffer::empty(); let cfg = ServiceConfig::new(KeepAlive::Disabled, 0, 0, false, None); + + let services = HttpFlow::new(echo_payload_service(), ExpectHandler, None); + let h1 = Dispatcher::<_, _, _, _, UpgradeHandler>::new( buf.clone(), cfg, - CloneableService::new(echo_payload_service()), - CloneableService::new(ExpectHandler), - None, - Extensions::new(), + services, + OnConnectData::default(), None, ); @@ -1242,13 +1243,14 @@ mod tests { lazy(|cx| { let mut buf = TestSeqBuffer::empty(); let cfg = ServiceConfig::new(KeepAlive::Disabled, 0, 0, false, None); + + let services = HttpFlow::new(echo_path_service(), ExpectHandler, None); + let h1 = Dispatcher::<_, _, _, _, UpgradeHandler>::new( buf.clone(), cfg, - CloneableService::new(echo_path_service()), - CloneableService::new(ExpectHandler), - None, - Extensions::new(), + services, + OnConnectData::default(), None, ); @@ -1301,13 +1303,15 @@ mod tests { lazy(|cx| { let mut buf = TestSeqBuffer::empty(); let cfg = ServiceConfig::new(KeepAlive::Disabled, 0, 0, false, None); + + let services = + HttpFlow::new(ok_service(), ExpectHandler, Some(UpgradeHandler)); + let h1 = Dispatcher::<_, _, _, _, UpgradeHandler>::new( buf.clone(), cfg, - CloneableService::new(ok_service()), - CloneableService::new(ExpectHandler), - Some(CloneableService::new(UpgradeHandler)), - Extensions::new(), + services, + OnConnectData::default(), None, ); diff --git a/actix-http/src/h1/service.rs b/actix-http/src/h1/service.rs index 34b7e31a1..19272c133 100644 --- a/actix-http/src/h1/service.rs +++ b/actix-http/src/h1/service.rs @@ -1,3 +1,4 @@ +use std::cell::RefCell; use std::future::Future; use std::marker::PhantomData; use std::pin::Pin; @@ -12,12 +13,12 @@ use futures_core::ready; use futures_util::future::ready; use crate::body::MessageBody; -use crate::cloneable::CloneableService; use crate::config::ServiceConfig; use crate::error::{DispatchError, Error}; use crate::request::Request; use crate::response::Response; -use crate::{ConnectCallback, Extensions}; +use crate::service::HttpFlow; +use crate::{ConnectCallback, OnConnectData}; use super::codec::Codec; use super::dispatcher::Dispatcher; @@ -299,7 +300,7 @@ where upgrade: Option, on_connect_ext: Option>>, cfg: Option, - _phantom: PhantomData<(T, B)>, + _phantom: PhantomData, } impl Future for H1ServiceResponse @@ -366,9 +367,7 @@ where X: Service, U: Service<(Request, Framed)>, { - srv: CloneableService, - expect: CloneableService, - upgrade: Option>, + services: Rc>>, on_connect_ext: Option>>, cfg: ServiceConfig, _phantom: PhantomData, @@ -387,15 +386,13 @@ where { fn new( cfg: ServiceConfig, - srv: S, + service: S, expect: X, upgrade: Option, on_connect_ext: Option>>, ) -> H1ServiceHandler { H1ServiceHandler { - srv: CloneableService::new(srv), - expect: CloneableService::new(expect), - upgrade: upgrade.map(CloneableService::new), + services: HttpFlow::new(service, expect, upgrade), cfg, on_connect_ext, _phantom: PhantomData, @@ -421,7 +418,8 @@ where type Future = Dispatcher; fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { - let ready = self + let mut services = self.services.borrow_mut(); + let ready = services .expect .poll_ready(cx) .map_err(|e| { @@ -431,8 +429,8 @@ where })? .is_ready(); - let ready = self - .srv + let ready = services + .service .poll_ready(cx) .map_err(|e| { let e = e.into(); @@ -442,7 +440,7 @@ where .is_ready() && ready; - let ready = if let Some(ref mut upg) = self.upgrade { + let ready = if let Some(ref mut upg) = services.upgrade { upg.poll_ready(cx) .map_err(|e| { let e = e.into(); @@ -463,19 +461,14 @@ where } fn call(&mut self, (io, addr): (T, Option)) -> Self::Future { - let mut connect_extensions = Extensions::new(); - if let Some(ref handler) = self.on_connect_ext { - // run on_connect_ext callback, populating connect extensions - handler(&io, &mut connect_extensions); - } + let on_connect_data = + OnConnectData::from_io(&io, self.on_connect_ext.as_deref()); Dispatcher::new( io, self.cfg.clone(), - self.srv.clone(), - self.expect.clone(), - self.upgrade.clone(), - connect_extensions, + self.services.clone(), + on_connect_data, addr, ) } diff --git a/actix-http/src/h2/dispatcher.rs b/actix-http/src/h2/dispatcher.rs index b8828edd0..621035869 100644 --- a/actix-http/src/h2/dispatcher.rs +++ b/actix-http/src/h2/dispatcher.rs @@ -1,7 +1,9 @@ +use std::cell::RefCell; use std::future::Future; use std::marker::PhantomData; use std::net; use std::pin::Pin; +use std::rc::Rc; use std::task::{Context, Poll}; use std::{cmp, convert::TryFrom}; @@ -16,29 +18,28 @@ use http::header::{HeaderValue, CONNECTION, CONTENT_LENGTH, DATE, TRANSFER_ENCOD use log::{error, trace}; use crate::body::{BodySize, MessageBody, ResponseBody}; -use crate::cloneable::CloneableService; use crate::config::ServiceConfig; use crate::error::{DispatchError, Error}; -use crate::httpmessage::HttpMessage; use crate::message::ResponseHead; use crate::payload::Payload; use crate::request::Request; use crate::response::Response; -use crate::Extensions; +use crate::service::HttpFlow; +use crate::OnConnectData; const CHUNK_SIZE: usize = 16_384; /// Dispatcher for HTTP/2 protocol. #[pin_project::pin_project] -pub struct Dispatcher +pub struct Dispatcher where T: AsyncRead + AsyncWrite + Unpin, S: Service, B: MessageBody, { - service: CloneableService, + services: Rc>>, connection: Connection, - on_connect_data: Extensions, + on_connect_data: OnConnectData, config: ServiceConfig, peer_addr: Option, ka_expire: Instant, @@ -46,7 +47,7 @@ where _phantom: PhantomData, } -impl Dispatcher +impl Dispatcher where T: AsyncRead + AsyncWrite + Unpin, S: Service, @@ -55,9 +56,9 @@ where B: MessageBody, { pub(crate) fn new( - service: CloneableService, + services: Rc>>, connection: Connection, - on_connect_data: Extensions, + on_connect_data: OnConnectData, config: ServiceConfig, timeout: Option, peer_addr: Option, @@ -79,7 +80,7 @@ where }; Dispatcher { - service, + services, config, peer_addr, connection, @@ -91,7 +92,7 @@ where } } -impl Future for Dispatcher +impl Future for Dispatcher where T: AsyncRead + AsyncWrite + Unpin, S: Service, @@ -133,11 +134,11 @@ where head.peer_addr = this.peer_addr; // merge on_connect_ext data into request extensions - req.extensions_mut().drain_from(&mut this.on_connect_data); + this.on_connect_data.merge_into(&mut req); let svc = ServiceResponse:: { state: ServiceResponseState::ServiceCall( - this.service.call(req), + this.services.borrow_mut().service.call(req), Some(res), ), config: this.config.clone(), diff --git a/actix-http/src/h2/service.rs b/actix-http/src/h2/service.rs index 462f5c2c1..f94aae79e 100644 --- a/actix-http/src/h2/service.rs +++ b/actix-http/src/h2/service.rs @@ -1,3 +1,4 @@ +use std::cell::RefCell; use std::future::Future; use std::marker::PhantomData; use std::pin::Pin; @@ -17,12 +18,12 @@ use h2::server::{self, Handshake}; use log::error; use crate::body::MessageBody; -use crate::cloneable::CloneableService; use crate::config::ServiceConfig; use crate::error::{DispatchError, Error}; use crate::request::Request; use crate::response::Response; -use crate::{ConnectCallback, Extensions}; +use crate::service::HttpFlow; +use crate::{ConnectCallback, OnConnectData}; use super::dispatcher::Dispatcher; @@ -248,7 +249,7 @@ pub struct H2ServiceHandler where S: Service, { - srv: CloneableService, + services: Rc>>, cfg: ServiceConfig, on_connect_ext: Option>>, _phantom: PhantomData, @@ -265,12 +266,12 @@ where fn new( cfg: ServiceConfig, on_connect_ext: Option>>, - srv: S, + service: S, ) -> H2ServiceHandler { H2ServiceHandler { + services: HttpFlow::new(service, (), None), cfg, on_connect_ext, - srv: CloneableService::new(srv), _phantom: PhantomData, } } @@ -290,26 +291,27 @@ where type Future = H2ServiceHandlerResponse; fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { - self.srv.poll_ready(cx).map_err(|e| { - let e = e.into(); - error!("Service readiness error: {:?}", e); - DispatchError::Service(e) - }) + self.services + .borrow_mut() + .service + .poll_ready(cx) + .map_err(|e| { + let e = e.into(); + error!("Service readiness error: {:?}", e); + DispatchError::Service(e) + }) } fn call(&mut self, (io, addr): (T, Option)) -> Self::Future { - let mut connect_extensions = Extensions::new(); - if let Some(ref handler) = self.on_connect_ext { - // run on_connect_ext callback, populating connect extensions - handler(&io, &mut connect_extensions); - } + let on_connect_data = + OnConnectData::from_io(&io, self.on_connect_ext.as_deref()); H2ServiceHandlerResponse { state: State::Handshake( - Some(self.srv.clone()), + Some(self.services.clone()), Some(self.cfg.clone()), addr, - Some(connect_extensions), + on_connect_data, server::handshake(io), ), } @@ -321,12 +323,12 @@ where T: AsyncRead + AsyncWrite + Unpin, S::Future: 'static, { - Incoming(Dispatcher), + Incoming(Dispatcher), Handshake( - Option>, + Option>>>, Option, Option, - Option, + OnConnectData, Handshake, ), } @@ -365,10 +367,11 @@ where ref mut handshake, ) => match ready!(Pin::new(handshake).poll(cx)) { Ok(conn) => { + let on_connect_data = std::mem::take(on_connect_data); self.state = State::Incoming(Dispatcher::new( srv.take().unwrap(), conn, - on_connect_data.take().unwrap(), + on_connect_data, config.take().unwrap(), None, *peer_addr, diff --git a/actix-http/src/lib.rs b/actix-http/src/lib.rs index 94cc50a76..0c58df2ed 100644 --- a/actix-http/src/lib.rs +++ b/actix-http/src/lib.rs @@ -19,7 +19,6 @@ mod macros; pub mod body; mod builder; pub mod client; -mod cloneable; mod config; #[cfg(feature = "compress")] pub mod encoding; @@ -81,3 +80,36 @@ pub enum Protocol { } type ConnectCallback = dyn Fn(&IO, &mut Extensions); + +/// Container for data that extract with ConnectCallback. +pub(crate) struct OnConnectData(Option); + +impl Default for OnConnectData { + fn default() -> Self { + Self(None) + } +} + +impl OnConnectData { + // construct self from io. + pub(crate) fn from_io( + io: &T, + on_connect_ext: Option<&ConnectCallback>, + ) -> Self { + let ext = on_connect_ext.map(|handler| { + let mut extensions = Extensions::new(); + handler(io, &mut extensions); + extensions + }); + + Self(ext) + } + + // merge self to given request's head extension. + #[inline] + pub(crate) fn merge_into(&mut self, req: &mut Request) { + if let Some(ref mut ext) = self.0 { + req.head.extensions.get_mut().drain_from(ext); + } + } +} diff --git a/actix-http/src/service.rs b/actix-http/src/service.rs index af625b1bf..eb16a6e70 100644 --- a/actix-http/src/service.rs +++ b/actix-http/src/service.rs @@ -1,3 +1,4 @@ +use std::cell::RefCell; use std::marker::PhantomData; use std::pin::Pin; use std::task::{Context, Poll}; @@ -14,12 +15,11 @@ use pin_project::pin_project; use crate::body::MessageBody; use crate::builder::HttpServiceBuilder; -use crate::cloneable::CloneableService; use crate::config::{KeepAlive, ServiceConfig}; use crate::error::{DispatchError, Error}; use crate::request::Request; use crate::response::Response; -use crate::{h1, h2::Dispatcher, ConnectCallback, Extensions, Protocol}; +use crate::{h1, h2::Dispatcher, ConnectCallback, OnConnectData, Protocol}; /// A `ServiceFactory` for HTTP/1.1 or HTTP/2 protocol. pub struct HttpService { @@ -371,7 +371,7 @@ where upgrade: Option, on_connect_ext: Option>>, cfg: ServiceConfig, - _phantom: PhantomData<(T, B)>, + _phantom: PhantomData, } impl Future for HttpServiceResponse @@ -441,14 +441,29 @@ where X: Service, U: Service<(Request, Framed)>, { - srv: CloneableService, - expect: CloneableService, - upgrade: Option>, + services: Rc>>, cfg: ServiceConfig, on_connect_ext: Option>>, _phantom: PhantomData, } +// a collection of service for http. +pub(super) struct HttpFlow { + pub(super) service: S, + pub(super) expect: X, + pub(super) upgrade: Option, +} + +impl HttpFlow { + pub(super) fn new(service: S, expect: X, upgrade: Option) -> Rc> { + Rc::new(RefCell::new(Self { + service, + expect, + upgrade, + })) + } +} + impl HttpServiceHandler where S: Service, @@ -463,7 +478,7 @@ where { fn new( cfg: ServiceConfig, - srv: S, + service: S, expect: X, upgrade: Option, on_connect_ext: Option>>, @@ -471,9 +486,7 @@ where HttpServiceHandler { cfg, on_connect_ext, - srv: CloneableService::new(srv), - expect: CloneableService::new(expect), - upgrade: upgrade.map(CloneableService::new), + services: HttpFlow::new(service, expect, upgrade), _phantom: PhantomData, } } @@ -498,7 +511,8 @@ where type Future = HttpServiceHandlerResponse; fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { - let ready = self + let mut services = self.services.borrow_mut(); + let ready = services .expect .poll_ready(cx) .map_err(|e| { @@ -508,8 +522,8 @@ where })? .is_ready(); - let ready = self - .srv + let ready = services + .service .poll_ready(cx) .map_err(|e| { let e = e.into(); @@ -519,7 +533,7 @@ where .is_ready() && ready; - let ready = if let Some(ref mut upg) = self.upgrade { + let ready = if let Some(ref mut upg) = services.upgrade { upg.poll_ready(cx) .map_err(|e| { let e = e.into(); @@ -543,19 +557,16 @@ where &mut self, (io, proto, peer_addr): (T, Protocol, Option), ) -> Self::Future { - let mut connect_extensions = Extensions::new(); - - if let Some(ref handler) = self.on_connect_ext { - handler(&io, &mut connect_extensions); - } + let on_connect_data = + OnConnectData::from_io(&io, self.on_connect_ext.as_deref()); match proto { Protocol::Http2 => HttpServiceHandlerResponse { state: State::H2Handshake(Some(( server::handshake(io), self.cfg.clone(), - self.srv.clone(), - connect_extensions, + self.services.clone(), + on_connect_data, peer_addr, ))), }, @@ -564,10 +575,8 @@ where state: State::H1(h1::Dispatcher::new( io, self.cfg.clone(), - self.srv.clone(), - self.expect.clone(), - self.upgrade.clone(), - connect_extensions, + self.services.clone(), + on_connect_data, peer_addr, )), }, @@ -589,13 +598,13 @@ where U::Error: fmt::Display, { H1(#[pin] h1::Dispatcher), - H2(#[pin] Dispatcher), + H2(#[pin] Dispatcher), H2Handshake( Option<( Handshake, ServiceConfig, - CloneableService, - Extensions, + Rc>>, + OnConnectData, Option, )>, ), From 51e9e1500b18a5ce55f90085f1397ef8f6c3d1cf Mon Sep 17 00:00:00 2001 From: Rob Ede Date: Wed, 6 Jan 2021 18:52:06 +0000 Subject: [PATCH 02/11] add docs to recent additions --- actix-http/src/h1/dispatcher.rs | 16 ++++++++-------- actix-http/src/h1/service.rs | 14 +++++++------- actix-http/src/h2/dispatcher.rs | 6 +++--- actix-http/src/h2/service.rs | 8 ++++---- actix-http/src/lib.rs | 9 ++++++--- actix-http/src/service.rs | 18 +++++++++--------- 6 files changed, 37 insertions(+), 34 deletions(-) diff --git a/actix-http/src/h1/dispatcher.rs b/actix-http/src/h1/dispatcher.rs index 60552d102..a914880ce 100644 --- a/actix-http/src/h1/dispatcher.rs +++ b/actix-http/src/h1/dispatcher.rs @@ -91,7 +91,7 @@ where U: Service<(Request, Framed), Response = ()>, U::Error: fmt::Display, { - services: Rc>>, + flow: Rc>>, on_connect_data: OnConnectData, flags: Flags, peer_addr: Option, @@ -230,7 +230,7 @@ where io: Some(io), codec, read_buf, - services, + flow: services, on_connect_data, flags, peer_addr, @@ -384,7 +384,7 @@ where Poll::Ready(Ok(req)) => { self.as_mut().send_continue(); this = self.as_mut().project(); - let fut = this.services.borrow_mut().service.call(req); + let fut = this.flow.borrow_mut().service.call(req); this.state.set(State::ServiceCall(fut)); continue; } @@ -474,12 +474,12 @@ where if req.head().expect() { // set dispatcher state so the future is pinned. let mut this = self.as_mut().project(); - let task = this.services.borrow_mut().expect.call(req); + let task = this.flow.borrow_mut().expect.call(req); this.state.set(State::ExpectCall(task)); } else { // the same as above. let mut this = self.as_mut().project(); - let task = this.services.borrow_mut().service.call(req); + let task = this.flow.borrow_mut().service.call(req); this.state.set(State::ServiceCall(task)); }; @@ -492,7 +492,7 @@ where Poll::Ready(Ok(req)) => { self.as_mut().send_continue(); let mut this = self.as_mut().project(); - let task = this.services.borrow_mut().service.call(req); + let task = this.flow.borrow_mut().service.call(req); this.state.set(State::ServiceCall(task)); continue; } @@ -564,7 +564,7 @@ where this.on_connect_data.merge_into(&mut req); if pl == MessageType::Stream - && this.services.borrow().upgrade.is_some() + && this.flow.borrow().upgrade.is_some() { this.messages.push_back(DispatcherMessage::Upgrade(req)); break; @@ -830,7 +830,7 @@ where parts.write_buf = mem::take(inner_p.write_buf); let framed = Framed::from_parts(parts); let upgrade = inner_p - .services + .flow .borrow_mut() .upgrade .take() diff --git a/actix-http/src/h1/service.rs b/actix-http/src/h1/service.rs index 19272c133..067c8b647 100644 --- a/actix-http/src/h1/service.rs +++ b/actix-http/src/h1/service.rs @@ -367,7 +367,7 @@ where X: Service, U: Service<(Request, Framed)>, { - services: Rc>>, + flow: Rc>>, on_connect_ext: Option>>, cfg: ServiceConfig, _phantom: PhantomData, @@ -392,7 +392,7 @@ where on_connect_ext: Option>>, ) -> H1ServiceHandler { H1ServiceHandler { - services: HttpFlow::new(service, expect, upgrade), + flow: HttpFlow::new(service, expect, upgrade), cfg, on_connect_ext, _phantom: PhantomData, @@ -418,8 +418,8 @@ where type Future = Dispatcher; fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { - let mut services = self.services.borrow_mut(); - let ready = services + let mut flow = self.flow.borrow_mut(); + let ready = flow .expect .poll_ready(cx) .map_err(|e| { @@ -429,7 +429,7 @@ where })? .is_ready(); - let ready = services + let ready = flow .service .poll_ready(cx) .map_err(|e| { @@ -440,7 +440,7 @@ where .is_ready() && ready; - let ready = if let Some(ref mut upg) = services.upgrade { + let ready = if let Some(ref mut upg) = flow.upgrade { upg.poll_ready(cx) .map_err(|e| { let e = e.into(); @@ -467,7 +467,7 @@ where Dispatcher::new( io, self.cfg.clone(), - self.services.clone(), + self.flow.clone(), on_connect_data, addr, ) diff --git a/actix-http/src/h2/dispatcher.rs b/actix-http/src/h2/dispatcher.rs index 621035869..959c34f13 100644 --- a/actix-http/src/h2/dispatcher.rs +++ b/actix-http/src/h2/dispatcher.rs @@ -37,7 +37,7 @@ where S: Service, B: MessageBody, { - services: Rc>>, + flow: Rc>>, connection: Connection, on_connect_data: OnConnectData, config: ServiceConfig, @@ -80,7 +80,7 @@ where }; Dispatcher { - services, + flow: services, config, peer_addr, connection, @@ -138,7 +138,7 @@ where let svc = ServiceResponse:: { state: ServiceResponseState::ServiceCall( - this.services.borrow_mut().service.call(req), + this.flow.borrow_mut().service.call(req), Some(res), ), config: this.config.clone(), diff --git a/actix-http/src/h2/service.rs b/actix-http/src/h2/service.rs index f94aae79e..95ff3de26 100644 --- a/actix-http/src/h2/service.rs +++ b/actix-http/src/h2/service.rs @@ -249,7 +249,7 @@ pub struct H2ServiceHandler where S: Service, { - services: Rc>>, + flow: Rc>>, cfg: ServiceConfig, on_connect_ext: Option>>, _phantom: PhantomData, @@ -269,7 +269,7 @@ where service: S, ) -> H2ServiceHandler { H2ServiceHandler { - services: HttpFlow::new(service, (), None), + flow: HttpFlow::new(service, (), None), cfg, on_connect_ext, _phantom: PhantomData, @@ -291,7 +291,7 @@ where type Future = H2ServiceHandlerResponse; fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { - self.services + self.flow .borrow_mut() .service .poll_ready(cx) @@ -308,7 +308,7 @@ where H2ServiceHandlerResponse { state: State::Handshake( - Some(self.services.clone()), + Some(self.flow.clone()), Some(self.cfg.clone()), addr, on_connect_data, diff --git a/actix-http/src/lib.rs b/actix-http/src/lib.rs index 0c58df2ed..3879bae81 100644 --- a/actix-http/src/lib.rs +++ b/actix-http/src/lib.rs @@ -72,7 +72,7 @@ pub mod http { pub use crate::message::ConnectionType; } -/// Http protocol +/// HTTP protocol #[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)] pub enum Protocol { Http1, @@ -82,6 +82,9 @@ pub enum Protocol { type ConnectCallback = dyn Fn(&IO, &mut Extensions); /// Container for data that extract with ConnectCallback. +/// +/// # Implementation Details +/// Uses Option to reduce necessary allocations when merging with request extensions. pub(crate) struct OnConnectData(Option); impl Default for OnConnectData { @@ -91,7 +94,7 @@ impl Default for OnConnectData { } impl OnConnectData { - // construct self from io. + /// Construct by calling the on-connect callback with the underlying transport I/O. pub(crate) fn from_io( io: &T, on_connect_ext: Option<&ConnectCallback>, @@ -105,7 +108,7 @@ impl OnConnectData { Self(ext) } - // merge self to given request's head extension. + /// Merge self into given request's extensions. #[inline] pub(crate) fn merge_into(&mut self, req: &mut Request) { if let Some(ref mut ext) = self.0 { diff --git a/actix-http/src/service.rs b/actix-http/src/service.rs index eb16a6e70..e137ab6fa 100644 --- a/actix-http/src/service.rs +++ b/actix-http/src/service.rs @@ -441,13 +441,13 @@ where X: Service, U: Service<(Request, Framed)>, { - services: Rc>>, + flow: Rc>>, cfg: ServiceConfig, on_connect_ext: Option>>, _phantom: PhantomData, } -// a collection of service for http. +/// A collection of services that describe an HTTP request flow. pub(super) struct HttpFlow { pub(super) service: S, pub(super) expect: X, @@ -486,7 +486,7 @@ where HttpServiceHandler { cfg, on_connect_ext, - services: HttpFlow::new(service, expect, upgrade), + flow: HttpFlow::new(service, expect, upgrade), _phantom: PhantomData, } } @@ -511,8 +511,8 @@ where type Future = HttpServiceHandlerResponse; fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { - let mut services = self.services.borrow_mut(); - let ready = services + let mut flow = self.flow.borrow_mut(); + let ready = flow .expect .poll_ready(cx) .map_err(|e| { @@ -522,7 +522,7 @@ where })? .is_ready(); - let ready = services + let ready = flow .service .poll_ready(cx) .map_err(|e| { @@ -533,7 +533,7 @@ where .is_ready() && ready; - let ready = if let Some(ref mut upg) = services.upgrade { + let ready = if let Some(ref mut upg) = flow.upgrade { upg.poll_ready(cx) .map_err(|e| { let e = e.into(); @@ -565,7 +565,7 @@ where state: State::H2Handshake(Some(( server::handshake(io), self.cfg.clone(), - self.services.clone(), + self.flow.clone(), on_connect_data, peer_addr, ))), @@ -575,7 +575,7 @@ where state: State::H1(h1::Dispatcher::new( io, self.cfg.clone(), - self.services.clone(), + self.flow.clone(), on_connect_data, peer_addr, )), From 00ba8d55492284581695d824648590715a8bd386 Mon Sep 17 00:00:00 2001 From: Rob Ede Date: Wed, 6 Jan 2021 18:58:24 +0000 Subject: [PATCH 03/11] add http3 variant to protocol enum --- actix-http/CHANGES.md | 3 +++ actix-http/src/lib.rs | 4 +++- actix-http/src/service.rs | 2 ++ 3 files changed, 8 insertions(+), 1 deletion(-) diff --git a/actix-http/CHANGES.md b/actix-http/CHANGES.md index eadbf6f46..6daed67a0 100644 --- a/actix-http/CHANGES.md +++ b/actix-http/CHANGES.md @@ -1,6 +1,9 @@ # Changes ## Unreleased - 2021-xx-xx +### Added +* Add `Http3` to `Protocol` enum for future compatibility and also mark `#[non_exhaustive]`. + ### Changed * Update `actix-*` dependencies to tokio `1.0` based versions. [#1813] * Bumped `rand` to `0.8`. diff --git a/actix-http/src/lib.rs b/actix-http/src/lib.rs index 3879bae81..e17b7de0a 100644 --- a/actix-http/src/lib.rs +++ b/actix-http/src/lib.rs @@ -72,11 +72,13 @@ pub mod http { pub use crate::message::ConnectionType; } -/// HTTP protocol +/// A major HTTP protocol version. #[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)] +#[non_exhaustive] pub enum Protocol { Http1, Http2, + Http3, } type ConnectCallback = dyn Fn(&IO, &mut Extensions); diff --git a/actix-http/src/service.rs b/actix-http/src/service.rs index e137ab6fa..392a1426f 100644 --- a/actix-http/src/service.rs +++ b/actix-http/src/service.rs @@ -580,6 +580,8 @@ where peer_addr, )), }, + + proto => unimplemented!("Unsupported HTTP version: {:?}.", proto) } } } From 85753130d98c1e766b729f22d1537097a0cf8a72 Mon Sep 17 00:00:00 2001 From: Rob Ede Date: Thu, 7 Jan 2021 00:35:19 +0000 Subject: [PATCH 04/11] fmt --- actix-http/src/h2/service.rs | 14 +++++--------- actix-http/src/service.rs | 2 +- 2 files changed, 6 insertions(+), 10 deletions(-) diff --git a/actix-http/src/h2/service.rs b/actix-http/src/h2/service.rs index 95ff3de26..36f7dc311 100644 --- a/actix-http/src/h2/service.rs +++ b/actix-http/src/h2/service.rs @@ -291,15 +291,11 @@ where type Future = H2ServiceHandlerResponse; fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { - self.flow - .borrow_mut() - .service - .poll_ready(cx) - .map_err(|e| { - let e = e.into(); - error!("Service readiness error: {:?}", e); - DispatchError::Service(e) - }) + self.flow.borrow_mut().service.poll_ready(cx).map_err(|e| { + let e = e.into(); + error!("Service readiness error: {:?}", e); + DispatchError::Service(e) + }) } fn call(&mut self, (io, addr): (T, Option)) -> Self::Future { diff --git a/actix-http/src/service.rs b/actix-http/src/service.rs index 392a1426f..6236e9fbe 100644 --- a/actix-http/src/service.rs +++ b/actix-http/src/service.rs @@ -581,7 +581,7 @@ where )), }, - proto => unimplemented!("Unsupported HTTP version: {:?}.", proto) + proto => unimplemented!("Unsupported HTTP version: {:?}.", proto), } } } From 6d710629afeb7b83725ce56141bded20d7e931e7 Mon Sep 17 00:00:00 2001 From: fakeshadow <24548779@qq.com> Date: Thu, 7 Jan 2021 08:57:34 +0800 Subject: [PATCH 05/11] fix bug where upgrade future is not reset properly (#1880) --- actix-http/src/body.rs | 3 +- actix-http/src/h1/service.rs | 2 +- actix-http/src/service.rs | 80 +++++++++++++----------------------- 3 files changed, 30 insertions(+), 55 deletions(-) diff --git a/actix-http/src/body.rs b/actix-http/src/body.rs index 5d1cf7329..d5d1989d4 100644 --- a/actix-http/src/body.rs +++ b/actix-http/src/body.rs @@ -3,8 +3,7 @@ use std::task::{Context, Poll}; use std::{fmt, mem}; use bytes::{Bytes, BytesMut}; -use futures_core::Stream; -use futures_util::ready; +use futures_core::{ready, Stream}; use pin_project::pin_project; use crate::error::Error; diff --git a/actix-http/src/h1/service.rs b/actix-http/src/h1/service.rs index 067c8b647..aed700eed 100644 --- a/actix-http/src/h1/service.rs +++ b/actix-http/src/h1/service.rs @@ -338,7 +338,7 @@ where .map_err(|e| log::error!("Init http service error: {:?}", e)))?; this = self.as_mut().project(); *this.upgrade = Some(upgrade); - this.fut_ex.set(None); + this.fut_upg.set(None); } let result = ready!(this diff --git a/actix-http/src/service.rs b/actix-http/src/service.rs index 6236e9fbe..357ac4c53 100644 --- a/actix-http/src/service.rs +++ b/actix-http/src/service.rs @@ -9,7 +9,6 @@ use actix_rt::net::TcpStream; use actix_service::{pipeline_factory, IntoServiceFactory, Service, ServiceFactory}; use bytes::Bytes; use futures_core::{ready, Future}; -use futures_util::future::ok; use h2::server::{self, Handshake}; use pin_project::pin_project; @@ -175,9 +174,9 @@ where Error = DispatchError, InitError = (), > { - pipeline_factory(|io: TcpStream| { + pipeline_factory(|io: TcpStream| async { let peer_addr = io.peer_addr().ok(); - ok((io, Protocol::Http1, peer_addr)) + Ok((io, Protocol::Http1, peer_addr)) }) .and_then(self) } @@ -227,7 +226,7 @@ mod openssl { .map_err(TlsError::Tls) .map_init_err(|_| panic!()), ) - .and_then(|io: SslStream| { + .and_then(|io: SslStream| async { let proto = if let Some(protos) = io.ssl().selected_alpn_protocol() { if protos.windows(2).any(|window| window == b"h2") { Protocol::Http2 @@ -238,7 +237,7 @@ mod openssl { Protocol::Http1 }; let peer_addr = io.get_ref().peer_addr().ok(); - ok((io, proto, peer_addr)) + Ok((io, proto, peer_addr)) }) .and_then(self.map_err(TlsError::Service)) } @@ -295,7 +294,7 @@ mod rustls { .map_err(TlsError::Tls) .map_init_err(|_| panic!()), ) - .and_then(|io: TlsStream| { + .and_then(|io: TlsStream| async { let proto = if let Some(protos) = io.get_ref().1.get_alpn_protocol() { if protos.windows(2).any(|window| window == b"h2") { Protocol::Http2 @@ -306,7 +305,7 @@ mod rustls { Protocol::Http1 }; let peer_addr = io.get_ref().0.peer_addr().ok(); - ok((io, proto, peer_addr)) + Ok((io, proto, peer_addr)) }) .and_then(self.map_err(TlsError::Service)) } @@ -413,7 +412,7 @@ where .map_err(|e| log::error!("Init http service error: {:?}", e)))?; this = self.as_mut().project(); *this.upgrade = Some(upgrade); - this.fut_ex.set(None); + this.fut_upg.set(None); } let result = ready!(this @@ -645,53 +644,30 @@ where { type Output = Result<(), DispatchError>; - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - self.project().state.poll(cx) - } -} - -impl State -where - T: AsyncRead + AsyncWrite + Unpin, - S: Service, - S::Error: Into + 'static, - S::Response: Into> + 'static, - B: MessageBody + 'static, - X: Service, - X::Error: Into, - U: Service<(Request, Framed), Response = ()>, - U::Error: fmt::Display, -{ - fn poll( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { - match self.as_mut().project() { + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + match self.as_mut().project().state.project() { StateProj::H1(disp) => disp.poll(cx), StateProj::H2(disp) => disp.poll(cx), - StateProj::H2Handshake(ref mut data) => { - let conn = if let Some(ref mut item) = data { - match Pin::new(&mut item.0).poll(cx) { - Poll::Ready(Ok(conn)) => conn, - Poll::Ready(Err(err)) => { - trace!("H2 handshake error: {}", err); - return Poll::Ready(Err(err.into())); - } - Poll::Pending => return Poll::Pending, + StateProj::H2Handshake(data) => { + match ready!(Pin::new(&mut data.as_mut().unwrap().0).poll(cx)) { + Ok(conn) => { + let (_, cfg, srv, on_connect_data, peer_addr) = + data.take().unwrap(); + self.as_mut().project().state.set(State::H2(Dispatcher::new( + srv, + conn, + on_connect_data, + cfg, + None, + peer_addr, + ))); + self.poll(cx) } - } else { - panic!() - }; - let (_, cfg, srv, on_connect_data, peer_addr) = data.take().unwrap(); - self.set(State::H2(Dispatcher::new( - srv, - conn, - on_connect_data, - cfg, - None, - peer_addr, - ))); - self.poll(cx) + Err(err) => { + trace!("H2 handshake error: {}", err); + Poll::Ready(Err(err.into())) + } + } } } } From dc23559f23f9a14f9ff48d6fa71735e70de8edb6 Mon Sep 17 00:00:00 2001 From: Rob Ede Date: Thu, 7 Jan 2021 01:13:46 +0000 Subject: [PATCH 06/11] address clippy lints --- actix-http/src/ws/proto.rs | 20 ++++++++++++-------- awc/src/sender.rs | 12 ++++++------ src/service.rs | 6 +++--- src/types/either.rs | 6 +++--- 4 files changed, 24 insertions(+), 20 deletions(-) diff --git a/actix-http/src/ws/proto.rs b/actix-http/src/ws/proto.rs index 6fa3debc5..93468d232 100644 --- a/actix-http/src/ws/proto.rs +++ b/actix-http/src/ws/proto.rs @@ -1,7 +1,6 @@ use std::convert::{From, Into}; use std::fmt; -use self::OpCode::*; /// Operation codes as part of RFC6455. #[derive(Debug, Eq, PartialEq, Clone, Copy)] pub enum OpCode { @@ -29,6 +28,7 @@ pub enum OpCode { impl fmt::Display for OpCode { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + use self::OpCode::*; match *self { Continue => write!(f, "CONTINUE"), Text => write!(f, "TEXT"), @@ -41,9 +41,10 @@ impl fmt::Display for OpCode { } } -impl Into for OpCode { - fn into(self) -> u8 { - match self { +impl From for u8 { + fn from(op: OpCode) -> u8 { + use self::OpCode::*; + match op { Continue => 0, Text => 1, Binary => 2, @@ -60,6 +61,7 @@ impl Into for OpCode { impl From for OpCode { fn from(byte: u8) -> OpCode { + use self::OpCode::*; match byte { 0 => Continue, 1 => Text, @@ -72,7 +74,6 @@ impl From for OpCode { } } -use self::CloseCode::*; /// Status code used to indicate why an endpoint is closing the `WebSocket` /// connection. #[derive(Debug, Eq, PartialEq, Clone, Copy)] @@ -138,9 +139,10 @@ pub enum CloseCode { Other(u16), } -impl Into for CloseCode { - fn into(self) -> u16 { - match self { +impl From for u16 { + fn from(code: CloseCode) -> u16 { + use self::CloseCode::*; + match code { Normal => 1000, Away => 1001, Protocol => 1002, @@ -161,6 +163,7 @@ impl Into for CloseCode { impl From for CloseCode { fn from(code: u16) -> CloseCode { + use self::CloseCode::*; match code { 1000 => Normal, 1001 => Away, @@ -185,6 +188,7 @@ impl From for CloseCode { pub struct CloseReason { /// Exit code pub code: CloseCode, + /// Optional description of the exit code pub description: Option, } diff --git a/awc/src/sender.rs b/awc/src/sender.rs index d4d3d9b72..ebf87e23b 100644 --- a/awc/src/sender.rs +++ b/awc/src/sender.rs @@ -33,18 +33,18 @@ pub(crate) enum PrepForSendingError { Http(HttpError), } -impl Into for PrepForSendingError { - fn into(self) -> FreezeRequestError { - match self { +impl From for FreezeRequestError { + fn from(err: PrepForSendingError) -> FreezeRequestError { + match err { PrepForSendingError::Url(e) => FreezeRequestError::Url(e), PrepForSendingError::Http(e) => FreezeRequestError::Http(e), } } } -impl Into for PrepForSendingError { - fn into(self) -> SendRequestError { - match self { +impl From for SendRequestError { + fn from(err: PrepForSendingError) -> SendRequestError { + match err { PrepForSendingError::Url(e) => SendRequestError::Url(e), PrepForSendingError::Http(e) => SendRequestError::Http(e), } diff --git a/src/service.rs b/src/service.rs index e6f71ed06..b88dac465 100644 --- a/src/service.rs +++ b/src/service.rs @@ -416,9 +416,9 @@ impl ServiceResponse { } } -impl Into> for ServiceResponse { - fn into(self) -> Response { - self.response +impl From> for Response { + fn into(res: ServiceResponse) -> Response { + res.response } } diff --git a/src/types/either.rs b/src/types/either.rs index 9f1d81a0b..3d4d6bf05 100644 --- a/src/types/either.rs +++ b/src/types/either.rs @@ -121,13 +121,13 @@ pub enum EitherExtractError { Extract(A, B), } -impl Into for EitherExtractError +impl From> for Error where A: Into, B: Into, { - fn into(self) -> Error { - match self { + fn into(err: EitherExtractError) -> Error { + match err { EitherExtractError::Bytes(err) => err, EitherExtractError::Extract(a_err, _b_err) => a_err.into(), } From d3c476b8c2d7daaa9a221b242bda3070db7f8eb0 Mon Sep 17 00:00:00 2001 From: Rob Ede Date: Thu, 7 Jan 2021 02:41:05 +0000 Subject: [PATCH 07/11] use env_logger builders in examples --- examples/on_connect.rs | 8 ++------ examples/uds.rs | 3 +-- src/service.rs | 2 +- src/types/either.rs | 2 +- 4 files changed, 5 insertions(+), 10 deletions(-) diff --git a/examples/on_connect.rs b/examples/on_connect.rs index bdad7e67e..ba5a18f3f 100644 --- a/examples/on_connect.rs +++ b/examples/on_connect.rs @@ -4,7 +4,7 @@ //! For an example of extracting a client TLS certificate, see: //! -use std::{any::Any, env, io, net::SocketAddr}; +use std::{any::Any, io, net::SocketAddr}; use actix_web::{dev::Extensions, rt::net::TcpStream, web, App, HttpServer}; @@ -36,11 +36,7 @@ fn get_conn_info(connection: &dyn Any, data: &mut Extensions) { #[actix_web::main] async fn main() -> io::Result<()> { - if env::var("RUST_LOG").is_err() { - env::set_var("RUST_LOG", "info"); - } - - env_logger::init(); + env_logger::init_from_env(env_logger::Env::new().default_filter_or("info")); HttpServer::new(|| App::new().default_service(web::to(route_whoami))) .on_connect(get_conn_info) diff --git a/examples/uds.rs b/examples/uds.rs index e34fa5ac9..c0a5d76a6 100644 --- a/examples/uds.rs +++ b/examples/uds.rs @@ -22,8 +22,7 @@ async fn no_params() -> &'static str { #[cfg(unix)] #[actix_web::main] async fn main() -> std::io::Result<()> { - std::env::set_var("RUST_LOG", "actix_server=info,actix_web=info"); - env_logger::init(); + env_logger::init_from_env(env_logger::Env::new().default_filter_or("info")); HttpServer::new(|| { App::new() diff --git a/src/service.rs b/src/service.rs index b88dac465..c6a961efc 100644 --- a/src/service.rs +++ b/src/service.rs @@ -417,7 +417,7 @@ impl ServiceResponse { } impl From> for Response { - fn into(res: ServiceResponse) -> Response { + fn from(res: ServiceResponse) -> Response { res.response } } diff --git a/src/types/either.rs b/src/types/either.rs index 3d4d6bf05..8a046d291 100644 --- a/src/types/either.rs +++ b/src/types/either.rs @@ -126,7 +126,7 @@ where A: Into, B: Into, { - fn into(err: EitherExtractError) -> Error { + fn from(err: EitherExtractError) -> Error { match err { EitherExtractError::Bytes(err) => err, EitherExtractError::Extract(a_err, _b_err) => a_err.into(), From c09186a2c07076a02d53137f31fef59f78fe95d5 Mon Sep 17 00:00:00 2001 From: Rob Ede Date: Thu, 7 Jan 2021 20:02:08 +0000 Subject: [PATCH 08/11] prepare v4 beta releases (#1881) --- CHANGES.md | 3 +++ Cargo.toml | 9 +++++---- actix-files/CHANGES.md | 3 +++ actix-files/Cargo.toml | 6 +++--- actix-http-test/CHANGES.md | 3 +++ actix-http-test/Cargo.toml | 8 ++++---- actix-http/CHANGES.md | 3 +++ actix-http/Cargo.toml | 4 ++-- actix-multipart/CHANGES.md | 12 ++++++++---- actix-multipart/Cargo.toml | 8 ++++---- actix-web-actors/CHANGES.md | 4 ++++ actix-web-actors/Cargo.toml | 6 +++--- actix-web-codegen/Cargo.toml | 2 +- awc/CHANGES.md | 3 +++ awc/Cargo.toml | 12 ++++++------ 15 files changed, 55 insertions(+), 31 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index f0b55801b..25fd10952 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,6 +1,9 @@ # Changes ## Unreleased - 2021-xx-xx + + +## 4.0.0-beta.1 - 2021-01-07 ### Added * `Compat` middleware enabling generic response body/error type of middlewares like `Logger` and `Compress` to be used in `middleware::Condition` and `Resource`, `Scope` services. [#1865] diff --git a/Cargo.toml b/Cargo.toml index 5388de4ed..87183c327 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "actix-web" -version = "3.3.2" +version = "4.0.0-beta.1" authors = ["Nikolay Kim "] description = "Actix Web is a powerful, pragmatic, and extremely fast web framework for Rust" readme = "README.md" @@ -84,8 +84,8 @@ actix-threadpool = "0.3.1" actix-tls = { version = "3.0.0-beta.2", default-features = false, optional = true } actix-web-codegen = "0.4.0" -actix-http = "2.2.0" -awc = { version = "2.0.3", default-features = false } +actix-http = "3.0.0-beta.1" +awc = { version = "3.0.0-beta.1", default-features = false } ahash = "0.6" bytes = "1" @@ -109,7 +109,7 @@ smallvec = "1.6" [dev-dependencies] actix = "0.11.0-beta.1" -actix-http = { version = "2.2.0", features = ["actors"] } +actix-http = { version = "3.0.0-beta.1", features = ["actors"] } rand = "0.8" env_logger = "0.8" serde_derive = "1.0" @@ -126,6 +126,7 @@ codegen-units = 1 actix-web = { path = "." } actix-http = { path = "actix-http" } actix-http-test = { path = "actix-http-test" } +actix-web-actors = { path = "actix-web-actors" } actix-web-codegen = { path = "actix-web-codegen" } actix-multipart = { path = "actix-multipart" } actix-files = { path = "actix-files" } diff --git a/actix-files/CHANGES.md b/actix-files/CHANGES.md index 6dcf4f66f..ff8ccd640 100644 --- a/actix-files/CHANGES.md +++ b/actix-files/CHANGES.md @@ -1,6 +1,9 @@ # Changes ## Unreleased - 2021-xx-xx + + +## 0.6.0-beta.1 - 2021-01-07 * `HttpRange::parse` now has its own error type. * Update `bytes` to `1.0`. [#1813] diff --git a/actix-files/Cargo.toml b/actix-files/Cargo.toml index 17e1a4888..f93450ff8 100644 --- a/actix-files/Cargo.toml +++ b/actix-files/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "actix-files" -version = "0.5.0" +version = "0.6.0-beta.1" authors = ["Nikolay Kim "] description = "Static file serving for Actix Web" readme = "README.md" @@ -17,7 +17,7 @@ name = "actix_files" path = "src/lib.rs" [dependencies] -actix-web = { version = "3.0.0", default-features = false } +actix-web = { version = "4.0.0-beta.1", default-features = false } actix-service = "2.0.0-beta.2" bitflags = "1" bytes = "1" @@ -32,4 +32,4 @@ v_htmlescape = "0.12" [dev-dependencies] actix-rt = "2.0.0-beta.1" -actix-web = "3.0.0" +actix-web = "4.0.0-beta.1" diff --git a/actix-http-test/CHANGES.md b/actix-http-test/CHANGES.md index 6ed6a0603..ed6d87a60 100644 --- a/actix-http-test/CHANGES.md +++ b/actix-http-test/CHANGES.md @@ -1,6 +1,9 @@ # Changes ## Unreleased - 2021-xx-xx + + +## 3.0.0-beta.1 - 2021-01-07 * Update `bytes` to `1.0`. [#1813] [#1813]: https://github.com/actix/actix-web/pull/1813 diff --git a/actix-http-test/Cargo.toml b/actix-http-test/Cargo.toml index 910fbab73..a056b833e 100644 --- a/actix-http-test/Cargo.toml +++ b/actix-http-test/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "actix-http-test" -version = "2.1.0" +version = "3.0.0-beta.1" authors = ["Nikolay Kim "] description = "Various helpers for Actix applications to use during testing" readme = "README.md" @@ -35,7 +35,7 @@ actix-tls = "3.0.0-beta.2" actix-utils = "3.0.0-beta.1" actix-rt = "2.0.0-beta.1" actix-server = "2.0.0-beta.2" -awc = "2.0.0" +awc = "3.0.0-beta.1" base64 = "0.13" bytes = "1" @@ -51,5 +51,5 @@ time = { version = "0.2.7", default-features = false, features = ["std"] } open-ssl = { version = "0.10", package = "openssl", optional = true } [dev-dependencies] -actix-web = "3.0.0" -actix-http = "2.0.0" +actix-web = "4.0.0-beta.1" +actix-http = "3.0.0-beta.1" diff --git a/actix-http/CHANGES.md b/actix-http/CHANGES.md index 6daed67a0..6abd0ba76 100644 --- a/actix-http/CHANGES.md +++ b/actix-http/CHANGES.md @@ -1,6 +1,9 @@ # Changes ## Unreleased - 2021-xx-xx + + +## 3.0.0-beta.1 - 2021-01-07 ### Added * Add `Http3` to `Protocol` enum for future compatibility and also mark `#[non_exhaustive]`. diff --git a/actix-http/Cargo.toml b/actix-http/Cargo.toml index e80800d06..1c8206ef2 100644 --- a/actix-http/Cargo.toml +++ b/actix-http/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "actix-http" -version = "2.2.0" +version = "3.0.0-beta.1" authors = ["Nikolay Kim "] description = "HTTP primitives for the Actix ecosystem" readme = "README.md" @@ -87,7 +87,7 @@ flate2 = { version = "1.0.13", optional = true } [dev-dependencies] actix-server = "2.0.0-beta.2" -actix-http-test = { version = "2.0.0", features = ["openssl"] } +actix-http-test = { version = "3.0.0-beta.1", features = ["openssl"] } actix-tls = { version = "3.0.0-beta.2", features = ["openssl"] } criterion = "0.3" env_logger = "0.7" diff --git a/actix-multipart/CHANGES.md b/actix-multipart/CHANGES.md index 4c6f01d29..e1fe9c4af 100644 --- a/actix-multipart/CHANGES.md +++ b/actix-multipart/CHANGES.md @@ -1,17 +1,21 @@ # Changes ## Unreleased - 2021-xx-xx -* Fix multipart consuming payload before header checks #1513 + + +## 0.4.0-beta.1 - 2021-01-07 +* Fix multipart consuming payload before header checks. [#1513] * Update `bytes` to `1.0`. [#1813] [#1813]: https://github.com/actix/actix-web/pull/1813 +[#1513]: https://github.com/actix/actix-web/pull/1513 -## 3.0.0 - 2020-09-11 -* No significant changes from `3.0.0-beta.2`. +## 0.3.0 - 2020-09-11 +* No significant changes from `0.3.0-beta.2`. -## 3.0.0-beta.2 - 2020-09-10 +## 0.3.0-beta.2 - 2020-09-10 * Update `actix-*` dependencies to latest versions. diff --git a/actix-multipart/Cargo.toml b/actix-multipart/Cargo.toml index ed572a700..d22cf7ef0 100644 --- a/actix-multipart/Cargo.toml +++ b/actix-multipart/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "actix-multipart" -version = "0.3.0" +version = "0.4.0-beta.1" authors = ["Nikolay Kim "] description = "Multipart support for actix web framework." readme = "README.md" @@ -16,11 +16,11 @@ name = "actix_multipart" path = "src/lib.rs" [dependencies] -actix-web = { version = "3.0.0", default-features = false } +actix-web = { version = "4.0.0-beta.1", default-features = false } actix-utils = "3.0.0-beta.1" bytes = "1" -derive_more = "0.99.2" +derive_more = "0.99.5" httparse = "1.3" futures-util = { version = "0.3.7", default-features = false } log = "0.4" @@ -29,4 +29,4 @@ twoway = "0.2" [dev-dependencies] actix-rt = "2.0.0-beta.1" -actix-http = "2.0.0" +actix-http = "3.0.0-beta.1" diff --git a/actix-web-actors/CHANGES.md b/actix-web-actors/CHANGES.md index dab35953a..f75c6805f 100644 --- a/actix-web-actors/CHANGES.md +++ b/actix-web-actors/CHANGES.md @@ -1,6 +1,9 @@ # Changes ## Unreleased - 2021-xx-xx + + +## 4.0.0-beta.1 - 2021-01-07 * Update `pin-project` to `1.0`. * Update `bytes` to `1.0`. [#1813] * `WebsocketContext::text` now takes an `Into`. [#1864] @@ -8,6 +11,7 @@ [#1813]: https://github.com/actix/actix-web/pull/1813 [#1864]: https://github.com/actix/actix-web/pull/1864 + ## 3.0.0 - 2020-09-11 * No significant changes from `3.0.0-beta.2`. diff --git a/actix-web-actors/Cargo.toml b/actix-web-actors/Cargo.toml index dac4060ba..331363543 100644 --- a/actix-web-actors/Cargo.toml +++ b/actix-web-actors/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "actix-web-actors" -version = "3.0.0" +version = "4.0.0-beta.1" authors = ["Nikolay Kim "] description = "Actix actors support for actix web framework." readme = "README.md" @@ -18,8 +18,8 @@ path = "src/lib.rs" [dependencies] actix = "0.11.0-beta.1" actix-codec = "0.4.0-beta.1" -actix-http = "2.0.0" -actix-web = { version = "3.0.0", default-features = false } +actix-http = "3.0.0-beta.1" +actix-web = { version = "4.0.0-beta.1", default-features = false } bytes = "1" bytestring = "1" diff --git a/actix-web-codegen/Cargo.toml b/actix-web-codegen/Cargo.toml index 3fc4ae1be..25e88d9e1 100644 --- a/actix-web-codegen/Cargo.toml +++ b/actix-web-codegen/Cargo.toml @@ -20,7 +20,7 @@ proc-macro2 = "1" [dev-dependencies] actix-rt = "2.0.0-beta.1" -actix-web = "3.0.0" +actix-web = "4.0.0-beta.1" futures-util = { version = "0.3.7", default-features = false } trybuild = "1" rustversion = "1" diff --git a/awc/CHANGES.md b/awc/CHANGES.md index 45a38259c..89b6121f3 100644 --- a/awc/CHANGES.md +++ b/awc/CHANGES.md @@ -1,6 +1,9 @@ # Changes ## Unreleased - 2021-xx-xx + + +## 3.0.0-beta.1 - 2021-01-07 ### Changed * Update `rand` to `0.8` * Update `bytes` to `1.0`. [#1813] diff --git a/awc/Cargo.toml b/awc/Cargo.toml index b80f1ba6b..b92df8247 100644 --- a/awc/Cargo.toml +++ b/awc/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "awc" -version = "2.0.3" +version = "3.0.0-beta.1" authors = ["Nikolay Kim "] description = "Async HTTP and WebSocket client library built on the Actix ecosystem" readme = "README.md" @@ -39,13 +39,13 @@ compress = ["actix-http/compress"] [dependencies] actix-codec = "0.4.0-beta.1" actix-service = "2.0.0-beta.2" -actix-http = "2.2.0" +actix-http = "3.0.0-beta.1" actix-rt = "2.0.0-beta.1" base64 = "0.13" bytes = "1" cfg-if = "1.0" -derive_more = "0.99.2" +derive_more = "0.99.5" futures-core = { version = "0.3.7", default-features = false } log =" 0.4" mime = "0.3" @@ -61,9 +61,9 @@ rust-tls = { version = "0.19.0", package = "rustls", optional = true, features = # TODO: actix is temporary added as dev dep for actix-macro reason. # Can be removed when it does not impact tests. actix = "0.11.0-beta.1" -actix-web = { version = "3.0.0", features = ["openssl"] } -actix-http = { version = "2.0.0", features = ["openssl"] } -actix-http-test = { version = "2.0.0", features = ["openssl"] } +actix-web = { version = "4.0.0-beta.1", features = ["openssl"] } +actix-http = { version = "3.0.0-beta.1", features = ["openssl"] } +actix-http-test = { version = "3.0.0-beta.1", features = ["openssl"] } actix-utils = "3.0.0-beta.1" actix-server = "2.0.0-beta.2" actix-tls = { version = "3.0.0-beta.2", features = ["openssl", "rustls"] } From a4c9aaf337459d77edbf58df36ad4ad75b1d1746 Mon Sep 17 00:00:00 2001 From: fakeshadow <24548779@qq.com> Date: Fri, 8 Jan 2021 04:42:09 +0800 Subject: [PATCH 09/11] fix extra branch in h1 dispatcher timer (#1882) --- actix-http/src/h1/dispatcher.rs | 25 ++++++++++--------------- 1 file changed, 10 insertions(+), 15 deletions(-) diff --git a/actix-http/src/h1/dispatcher.rs b/actix-http/src/h1/dispatcher.rs index a914880ce..8ef96fbef 100644 --- a/actix-http/src/h1/dispatcher.rs +++ b/actix-http/src/h1/dispatcher.rs @@ -687,15 +687,11 @@ where if let Some(deadline) = this.codec.config().client_disconnect_timer() { - if let Some(timer) = this.ka_timer.as_mut().as_pin_mut() + if let Some(mut timer) = + this.ka_timer.as_mut().as_pin_mut() { - timer.reset(deadline); - let _ = this - .ka_timer - .as_mut() - .as_pin_mut() - .unwrap() - .poll(cx); + timer.as_mut().reset(deadline); + let _ = timer.poll(cx); } } else { // no shutdown timeout, drop socket @@ -720,15 +716,14 @@ where } else if let Some(deadline) = this.codec.config().keep_alive_expire() { - if let Some(timer) = this.ka_timer.as_mut().as_pin_mut() { - timer.reset(deadline); - let _ = - this.ka_timer.as_mut().as_pin_mut().unwrap().poll(cx); + if let Some(mut timer) = this.ka_timer.as_mut().as_pin_mut() { + timer.as_mut().reset(deadline); + let _ = timer.poll(cx); } } - } else if let Some(timer) = this.ka_timer.as_mut().as_pin_mut() { - timer.reset(*this.ka_expire); - let _ = this.ka_timer.as_mut().as_pin_mut().unwrap().poll(cx); + } else if let Some(mut timer) = this.ka_timer.as_mut().as_pin_mut() { + timer.as_mut().reset(*this.ka_expire); + let _ = timer.poll(cx); } } Poll::Pending => {} From 188ee44f819670944c0af92730f863e5683e7045 Mon Sep 17 00:00:00 2001 From: fakeshadow <24548779@qq.com> Date: Fri, 8 Jan 2021 05:55:00 +0800 Subject: [PATCH 10/11] remove copyless dependency (#1884) --- actix-http/Cargo.toml | 1 - actix-http/src/message.rs | 7 +++---- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/actix-http/Cargo.toml b/actix-http/Cargo.toml index 1c8206ef2..b64c71a8a 100644 --- a/actix-http/Cargo.toml +++ b/actix-http/Cargo.toml @@ -53,7 +53,6 @@ bitflags = "1.2" bytes = "1" bytestring = "1" cookie = { version = "0.14.1", features = ["percent-encode"] } -copyless = "0.1.4" derive_more = "0.99.5" either = "1.5.3" encoding_rs = "0.8" diff --git a/actix-http/src/message.rs b/actix-http/src/message.rs index 3673017bc..1a5500c31 100644 --- a/actix-http/src/message.rs +++ b/actix-http/src/message.rs @@ -3,7 +3,6 @@ use std::net; use std::rc::Rc; use bitflags::bitflags; -use copyless::BoxHelper; use crate::extensions::Extensions; use crate::header::HeaderMap; @@ -480,17 +479,17 @@ impl BoxedResponsePool { BoxedResponseHead { head: Some(head) } } else { BoxedResponseHead { - head: Some(Box::alloc().init(ResponseHead::new(status))), + head: Some(Box::new(ResponseHead::new(status))), } } } #[inline] /// Release request instance - fn release(&self, msg: Box) { + fn release(&self, mut msg: Box) { let v = &mut self.0.borrow_mut(); if v.len() < 128 { - msg.extensions.borrow_mut().clear(); + msg.extensions.get_mut().clear(); v.push(msg); } } From 2204614134a95e1a904091fcde614d44829c1499 Mon Sep 17 00:00:00 2001 From: Rob Ede Date: Fri, 8 Jan 2021 12:00:58 +0000 Subject: [PATCH 11/11] don't run awc doctests that rely on external public endpoints (#1888) --- awc/src/lib.rs | 10 +++++----- awc/src/ws.rs | 2 +- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/awc/src/lib.rs b/awc/src/lib.rs index d9db7a2cf..aad6ec38b 100644 --- a/awc/src/lib.rs +++ b/awc/src/lib.rs @@ -2,7 +2,7 @@ //! //! ## Making a GET request //! -//! ```rust +//! ```no_run //! # #[actix_rt::main] //! # async fn main() -> Result<(), awc::error::SendRequestError> { //! let mut client = awc::Client::default(); @@ -20,7 +20,7 @@ //! //! ### Raw body contents //! -//! ```rust +//! ```no_run //! # #[actix_rt::main] //! # async fn main() -> Result<(), awc::error::SendRequestError> { //! let mut client = awc::Client::default(); @@ -33,7 +33,7 @@ //! //! ### Forms //! -//! ```rust +//! ```no_run //! # #[actix_rt::main] //! # async fn main() -> Result<(), awc::error::SendRequestError> { //! let params = [("foo", "bar"), ("baz", "quux")]; @@ -48,7 +48,7 @@ //! //! ### JSON //! -//! ```rust +//! ```no_run //! # #[actix_rt::main] //! # async fn main() -> Result<(), awc::error::SendRequestError> { //! let request = serde_json::json!({ @@ -66,7 +66,7 @@ //! //! ## WebSocket support //! -//! ``` +//! ```no_run //! # #[actix_rt::main] //! # async fn main() -> Result<(), Box> { //! use futures_util::{sink::SinkExt, stream::StreamExt}; diff --git a/awc/src/ws.rs b/awc/src/ws.rs index b90d0942b..fda2aefca 100644 --- a/awc/src/ws.rs +++ b/awc/src/ws.rs @@ -4,7 +4,7 @@ //! //! # Example //! -//! ``` +//! ```no_run //! use awc::{Client, ws}; //! use futures_util::{sink::SinkExt, stream::StreamExt}; //!