diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 7d0520d52..dc0ff0c19 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -26,6 +26,21 @@ jobs: steps: - uses: actions/checkout@v2 + # install OpenSSL on Windows + - name: Set vcpkg root + if: matrix.target.triple == 'x86_64-pc-windows-msvc' + run: echo "VCPKG_ROOT=$env:VCPKG_INSTALLATION_ROOT" | Out-File -FilePath $env:GITHUB_ENV -Append + - name: Install OpenSSL + if: matrix.target.triple == 'x86_64-pc-windows-msvc' + run: vcpkg install openssl:x64-windows + + - name: Install ${{ matrix.version }} + uses: actions-rs/toolchain@v1 + with: + toolchain: ${{ matrix.version }}-${{ matrix.target.triple }} + profile: minimal + override: true + - name: Install ${{ matrix.version }} uses: actions-rs/toolchain@v1 with: diff --git a/Cargo.toml b/Cargo.toml index 8477c8ede..0877edf9c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -111,12 +111,6 @@ tls-openssl = { package = "openssl", version = "0.10.9", optional = true } tls-rustls = { package = "rustls", version = "0.19.0", optional = true } url = "2.1" -[target.'cfg(windows)'.dependencies.tls-openssl] -version = "0.10.9" -package = "openssl" -features = ["vendored"] -optional = true - [dev-dependencies] brotli2 = "0.3.2" criterion = "0.3" diff --git a/actix-http-test/Cargo.toml b/actix-http-test/Cargo.toml index 69b2a3335..c1e61556b 100644 --- a/actix-http-test/Cargo.toml +++ b/actix-http-test/Cargo.toml @@ -50,12 +50,6 @@ serde_urlencoded = "0.7" time = { version = "0.2.23", default-features = false, features = ["std"] } tls-openssl = { version = "0.10.9", package = "openssl", optional = true } -[target.'cfg(windows)'.dependencies.tls-openssl] -version = "0.10.9" -package = "openssl" -features = ["vendored"] -optional = true - [dev-dependencies] actix-web = { version = "4.0.0-beta.4", default-features = false, features = ["cookies"] } actix-http = "3.0.0-beta.4" diff --git a/actix-http/Cargo.toml b/actix-http/Cargo.toml index edcc7efd5..e1aebb76b 100644 --- a/actix-http/Cargo.toml +++ b/actix-http/Cargo.toml @@ -98,11 +98,6 @@ serde_derive = "1.0" tls-openssl = { version = "0.10", package = "openssl" } tls-rustls = { version = "0.19", package = "rustls" } -[target.'cfg(windows)'.dev-dependencies.tls-openssl] -version = "0.10.9" -package = "openssl" -features = ["vendored"] - [[example]] name = "ws" required-features = ["rustls"] diff --git a/actix-http/examples/echo.rs b/actix-http/examples/echo.rs index 90d768cbe..176ac5c2b 100644 --- a/actix-http/examples/echo.rs +++ b/actix-http/examples/echo.rs @@ -3,7 +3,7 @@ use std::{env, io}; use actix_http::{Error, HttpService, Request, Response}; use actix_server::Server; use bytes::BytesMut; -use futures_util::StreamExt; +use futures_util::StreamExt as _; use http::header::HeaderValue; use log::info; diff --git a/actix-http/examples/echo2.rs b/actix-http/examples/echo2.rs index bc932ce8f..408a40114 100644 --- a/actix-http/examples/echo2.rs +++ b/actix-http/examples/echo2.rs @@ -4,7 +4,7 @@ use actix_http::http::HeaderValue; use actix_http::{Error, HttpService, Request, Response}; use actix_server::Server; use bytes::BytesMut; -use futures_util::StreamExt; +use futures_util::StreamExt as _; use log::info; async fn handle_request(mut req: Request) -> Result { diff --git a/actix-http/src/client/connection.rs b/actix-http/src/client/connection.rs index 97ecd0515..291148b5b 100644 --- a/actix-http/src/client/connection.rs +++ b/actix-http/src/client/connection.rs @@ -19,6 +19,10 @@ use super::error::SendRequestError; use super::pool::Acquired; use super::{h1proto, h2proto}; +pub trait ConnectionIo: AsyncRead + AsyncWrite + Unpin + 'static {} + +impl ConnectionIo for T {} + pub(crate) enum ConnectionType { H1(Io), H2(H2Connection), @@ -94,14 +98,6 @@ pub trait Connection { >; } -pub(crate) trait ConnectionLifetime: AsyncRead + AsyncWrite + 'static { - /// Close connection - fn close(self: Pin<&mut Self>); - - /// Release connection to the connection pool - fn release(self: Pin<&mut Self>); -} - #[doc(hidden)] /// HTTP client connection pub struct IoConnection @@ -110,7 +106,7 @@ where { io: Option>, created: time::Instant, - pool: Option>, + pool: Acquired, } impl fmt::Debug for IoConnection @@ -130,7 +126,7 @@ impl IoConnection { pub(crate) fn new( io: ConnectionType, created: time::Instant, - pool: Option>, + pool: Acquired, ) -> Self { IoConnection { pool, @@ -139,13 +135,9 @@ impl IoConnection { } } - pub(crate) fn into_inner(self) -> (ConnectionType, time::Instant) { - (self.io.unwrap(), self.created) - } - #[cfg(test)] pub(crate) fn into_parts(self) -> (ConnectionType, time::Instant, Acquired) { - (self.io.unwrap(), self.created, self.pool.unwrap()) + (self.io.unwrap(), self.created, self.pool) } async fn send_request>( @@ -173,13 +165,7 @@ impl IoConnection { match self.io.take().unwrap() { ConnectionType::H1(io) => h1proto::open_tunnel(io, head.into()).await, ConnectionType::H2(io) => { - if let Some(mut pool) = self.pool.take() { - pool.release(IoConnection::new( - ConnectionType::H2(io), - self.created, - None, - )); - } + self.pool.release(ConnectionType::H2(io), self.created); Err(SendRequestError::TunnelNotSupported) } } diff --git a/actix-http/src/client/connector.rs b/actix-http/src/client/connector.rs index ebdbbb6eb..0c9159b87 100644 --- a/actix-http/src/client/connector.rs +++ b/actix-http/src/client/connector.rs @@ -18,7 +18,7 @@ use futures_core::ready; use http::Uri; use super::config::ConnectorConfig; -use super::connection::{Connection, EitherIoConnection}; +use super::connection::{Connection, ConnectionIo, EitherIoConnection}; use super::error::ConnectError; use super::pool::ConnectionPool; use super::Connect; @@ -61,9 +61,6 @@ pub struct Connector { ssl: SslConnector, } -pub trait Io: AsyncRead + AsyncWrite + Unpin {} -impl Io for T {} - impl Connector<()> { #[allow(clippy::new_ret_no_self, clippy::let_unit_value)] pub fn new() -> Connector< @@ -281,16 +278,16 @@ where pub type DummyService = Box< dyn Service< Connect, - Response = (Box, Protocol), + Response = (Box, Protocol), Error = ConnectError, Future = futures_core::future::LocalBoxFuture< 'static, - Result<(Box, Protocol), ConnectError>, + Result<(Box, Protocol), ConnectError>, >, >, >; - InnerConnector::<_, DummyService, _, Box> { + InnerConnector::<_, DummyService, _, Box> { tcp_pool: ConnectionPool::new( tcp_service, self.config.no_disconnect_timeout(), @@ -334,9 +331,12 @@ where .map(|protos| protos.windows(2).any(|w| w == H2)) .unwrap_or(false); if h2 { - (Box::new(sock) as Box, Protocol::Http2) + ( + Box::new(sock) as Box, + Protocol::Http2, + ) } else { - (Box::new(sock) as Box, Protocol::Http1) + (Box::new(sock) as _, Protocol::Http1) } }) .map_err(ConnectError::from), @@ -354,9 +354,9 @@ where .map(|protos| protos.windows(2).any(|w| w == H2)) .unwrap_or(false); if h2 { - (Box::new(sock) as Box, Protocol::Http2) + (Box::new(sock) as _, Protocol::Http2) } else { - (Box::new(sock) as Box, Protocol::Http1) + (Box::new(sock) as _, Protocol::Http1) } }), ), diff --git a/actix-http/src/client/h1proto.rs b/actix-http/src/client/h1proto.rs index d2db18cec..75a3c1f3d 100644 --- a/actix-http/src/client/h1proto.rs +++ b/actix-http/src/client/h1proto.rs @@ -7,7 +7,7 @@ use actix_codec::{AsyncRead, AsyncWrite, Framed, ReadBuf}; use bytes::buf::BufMut; use bytes::{Bytes, BytesMut}; use futures_core::Stream; -use futures_util::{future::poll_fn, SinkExt, StreamExt}; +use futures_util::{future::poll_fn, SinkExt as _}; use crate::error::PayloadError; use crate::h1; @@ -19,7 +19,7 @@ use crate::http::{ use crate::message::{RequestHeadType, ResponseHead}; use crate::payload::{Payload, PayloadStream}; -use super::connection::{ConnectionLifetime, ConnectionType, IoConnection}; +use super::connection::ConnectionType; use super::error::{ConnectError, SendRequestError}; use super::pool::Acquired; use crate::body::{BodySize, MessageBody}; @@ -29,7 +29,7 @@ pub(crate) async fn send_request( mut head: RequestHeadType, body: B, created: time::Instant, - pool: Option>, + acquired: Acquired, ) -> Result<(ResponseHead, Payload), SendRequestError> where T: AsyncRead + AsyncWrite + Unpin + 'static, @@ -42,9 +42,9 @@ where if let Some(host) = head.as_ref().uri.host() { let mut wrt = BytesMut::with_capacity(host.len() + 5).writer(); - let _ = match head.as_ref().uri.port_u16() { - None | Some(80) | Some(443) => write!(wrt, "{}", host), - Some(port) => write!(wrt, "{}:{}", host, port), + match head.as_ref().uri.port_u16() { + None | Some(80) | Some(443) => write!(wrt, "{}", host)?, + Some(port) => write!(wrt, "{}:{}", host, port)?, }; match wrt.get_mut().split().freeze().try_into_value() { @@ -64,7 +64,7 @@ where let io = H1Connection { created, - pool, + acquired, io: Some(io), }; @@ -77,10 +77,8 @@ where let is_expect = if head.as_ref().headers.contains_key(EXPECT) { match body.size() { BodySize::None | BodySize::Empty | BodySize::Sized(0) => { - let pin_framed = Pin::new(&mut framed); - - let force_close = !pin_framed.codec_ref().keepalive(); - release_connection(pin_framed, force_close); + let keep_alive = framed.codec_ref().keepalive(); + framed.io_mut().on_release(keep_alive); // TODO: use a new variant or a new type better describing error violate // `Requirements for clients` session of above RFC @@ -128,8 +126,9 @@ where match pin_framed.codec_ref().message_type() { h1::MessageType::None => { - let force_close = !pin_framed.codec_ref().keepalive(); - release_connection(pin_framed, force_close); + let keep_alive = pin_framed.codec_ref().keepalive(); + pin_framed.io_mut().on_release(keep_alive); + Ok((head, Payload::None)) } _ => { @@ -151,12 +150,11 @@ where framed.send((head, BodySize::None).into()).await?; // read response - if let (Some(result), framed) = framed.into_future().await { - let head = result.map_err(SendRequestError::from)?; - Ok((head, framed)) - } else { - Err(SendRequestError::from(ConnectError::Disconnected)) - } + let head = poll_fn(|cx| Pin::new(&mut framed).poll_next(cx)) + .await + .ok_or(ConnectError::Disconnected)??; + + Ok((head, framed)) } /// send request body to the peer @@ -165,7 +163,7 @@ pub(crate) async fn send_body( mut framed: Pin<&mut Framed>, ) -> Result<(), SendRequestError> where - T: ConnectionLifetime + Unpin, + T: AsyncRead + AsyncWrite + Unpin + 'static, B: MessageBody, { actix_rt::pin!(body); @@ -200,7 +198,7 @@ where } } - SinkExt::flush(Pin::into_inner(framed)).await?; + framed.get_mut().flush().await?; Ok(()) } @@ -208,41 +206,37 @@ where /// HTTP client connection pub struct H1Connection where - T: AsyncWrite + Unpin + 'static, + T: AsyncRead + AsyncWrite + Unpin + 'static, { /// T should be `Unpin` io: Option, created: time::Instant, - pool: Option>, + acquired: Acquired, } -impl ConnectionLifetime for H1Connection +impl H1Connection where T: AsyncRead + AsyncWrite + Unpin + 'static, { + fn on_release(&mut self, keep_alive: bool) { + if keep_alive { + self.release(); + } else { + self.close(); + } + } + /// Close connection - fn close(mut self: Pin<&mut Self>) { - if let Some(mut pool) = self.pool.take() { - if let Some(io) = self.io.take() { - pool.close(IoConnection::new( - ConnectionType::H1(io), - self.created, - None, - )); - } + fn close(&mut self) { + if let Some(io) = self.io.take() { + self.acquired.close(ConnectionType::H1(io)); } } /// Release this connection to the connection pool - fn release(mut self: Pin<&mut Self>) { - if let Some(mut pool) = self.pool.take() { - if let Some(io) = self.io.take() { - pool.release(IoConnection::new( - ConnectionType::H1(io), - self.created, - None, - )); - } + fn release(&mut self) { + if let Some(io) = self.io.take() { + self.acquired.release(ConnectionType::H1(io), self.created); } } } @@ -282,13 +276,19 @@ impl AsyncWrite for H1Connection } #[pin_project::pin_project] -pub(crate) struct PlStream { +pub(crate) struct PlStream +where + Io: AsyncRead + AsyncWrite + Unpin + 'static, +{ #[pin] - framed: Option>, + framed: Option, h1::ClientPayloadCodec>>, } -impl PlStream { - fn new(framed: Framed) -> Self { +impl PlStream +where + Io: AsyncRead + AsyncWrite + Unpin + 'static, +{ + fn new(framed: Framed, h1::ClientCodec>) -> Self { let framed = framed.into_map_codec(|codec| codec.into_payload_codec()); PlStream { @@ -297,24 +297,26 @@ impl PlStream { } } -impl Stream for PlStream { +impl Stream for PlStream +where + Io: AsyncRead + AsyncWrite + Unpin + 'static, +{ type Item = Result; fn poll_next( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll> { - let mut this = self.project(); + let mut framed = self.project().framed.as_pin_mut().unwrap(); - match this.framed.as_mut().as_pin_mut().unwrap().next_item(cx)? { + match framed.as_mut().next_item(cx)? { Poll::Pending => Poll::Pending, Poll::Ready(Some(chunk)) => { if let Some(chunk) = chunk { Poll::Ready(Some(Ok(chunk))) } else { - let framed = this.framed.as_mut().as_pin_mut().unwrap(); - let force_close = !framed.codec_ref().keepalive(); - release_connection(framed, force_close); + let keep_alive = framed.codec_ref().keepalive(); + framed.io_mut().on_release(keep_alive); Poll::Ready(None) } } @@ -322,14 +324,3 @@ impl Stream for PlStream { } } } - -fn release_connection(framed: Pin<&mut Framed>, force_close: bool) -where - T: ConnectionLifetime, -{ - if !force_close && framed.is_read_buf_empty() && framed.is_write_buf_empty() { - framed.io_pin().release() - } else { - framed.io_pin().close() - } -} diff --git a/actix-http/src/client/h2proto.rs b/actix-http/src/client/h2proto.rs index 7292972de..82e81c7ff 100644 --- a/actix-http/src/client/h2proto.rs +++ b/actix-http/src/client/h2proto.rs @@ -17,7 +17,7 @@ use crate::message::{RequestHeadType, ResponseHead}; use crate::payload::Payload; use super::config::ConnectorConfig; -use super::connection::{ConnectionType, IoConnection}; +use super::connection::ConnectionType; use super::error::SendRequestError; use super::pool::Acquired; use crate::client::connection::H2Connection; @@ -27,7 +27,7 @@ pub(crate) async fn send_request( head: RequestHeadType, body: B, created: time::Instant, - pool: Option>, + acquired: Acquired, ) -> Result<(ResponseHead, Payload), SendRequestError> where T: AsyncRead + AsyncWrite + Unpin + 'static, @@ -103,13 +103,13 @@ where let res = poll_fn(|cx| io.poll_ready(cx)).await; if let Err(e) = res { - release(io, pool, created, e.is_io()); + release(io, acquired, created, e.is_io()); return Err(SendRequestError::from(e)); } let resp = match io.send_request(req, eof) { Ok((fut, send)) => { - release(io, pool, created, false); + release(io, acquired, created, false); if !eof { send_body(body, send).await?; @@ -117,7 +117,7 @@ where fut.await.map_err(SendRequestError::from)? } Err(e) => { - release(io, pool, created, e.is_io()); + release(io, acquired, created, e.is_io()); return Err(e.into()); } }; @@ -181,16 +181,14 @@ async fn send_body( /// release SendRequest object fn release( io: H2Connection, - pool: Option>, + acquired: Acquired, created: time::Instant, close: bool, ) { - if let Some(mut pool) = pool { - if close { - pool.close(IoConnection::new(ConnectionType::H2(io), created, None)); - } else { - pool.release(IoConnection::new(ConnectionType::H2(io), created, None)); - } + if close { + acquired.close(ConnectionType::H2(io)); + } else { + acquired.release(ConnectionType::H2(io), created); } } diff --git a/actix-http/src/client/mod.rs b/actix-http/src/client/mod.rs index c6f998c2a..ce6aa3bc1 100644 --- a/actix-http/src/client/mod.rs +++ b/actix-http/src/client/mod.rs @@ -14,7 +14,7 @@ pub use actix_tls::connect::{ Connect as TcpConnect, ConnectError as TcpConnectError, Connection as TcpConnection, }; -pub use self::connection::Connection; +pub use self::connection::{Connection, ConnectionIo}; pub use self::connector::Connector; pub use self::error::{ConnectError, FreezeRequestError, InvalidUrl, SendRequestError}; pub use crate::Protocol; diff --git a/actix-http/src/client/pool.rs b/actix-http/src/client/pool.rs index b5b4a30d3..79adcf3e2 100644 --- a/actix-http/src/client/pool.rs +++ b/actix-http/src/client/pool.rs @@ -217,7 +217,7 @@ where // construct acquired. It's used to put Io type back to pool/ close the Io type. // permit is carried with the whole lifecycle of Acquired. - let acquired = Some(Acquired { key, inner, permit }); + let acquired = Acquired { key, inner, permit }; // match the connection and spawn new one if did not get anything. match conn { @@ -235,7 +235,7 @@ where acquired, )) } else { - let config = &acquired.as_ref().unwrap().inner.config; + let config = &acquired.inner.config; let (sender, connection) = handshake(io, config).await?; Ok(IoConnection::new( ConnectionType::H2(H2Connection::new(sender, connection)), @@ -346,14 +346,12 @@ where Io: AsyncRead + AsyncWrite + Unpin + 'static, { /// Close the IO. - pub(crate) fn close(&mut self, conn: IoConnection) { - let (conn, _) = conn.into_inner(); + pub(crate) fn close(&self, conn: ConnectionType) { self.inner.close(conn); } /// Release IO back into pool. - pub(crate) fn release(&mut self, conn: IoConnection) { - let (io, created) = conn.into_inner(); + pub(crate) fn release(&self, conn: ConnectionType, created: Instant) { let Acquired { key, inner, .. } = self; inner @@ -362,12 +360,12 @@ where .entry(key.clone()) .or_insert_with(VecDeque::new) .push_back(PooledConnection { - conn: io, + conn, created, used: Instant::now(), }); - let _ = &mut self.permit; + let _ = &self.permit; } } @@ -447,8 +445,8 @@ mod test { where T: AsyncRead + AsyncWrite + Unpin + 'static, { - let (conn, created, mut acquired) = conn.into_parts(); - acquired.release(IoConnection::new(conn, created, None)); + let (conn, created, acquired) = conn.into_parts(); + acquired.release(conn, created); } #[actix_rt::test] diff --git a/actix-http/src/h1/dispatcher.rs b/actix-http/src/h1/dispatcher.rs index 5f64b168a..11ace7ca7 100644 --- a/actix-http/src/h1/dispatcher.rs +++ b/actix-http/src/h1/dispatcher.rs @@ -662,7 +662,7 @@ where // got timeout during shutdown, drop connection if this.flags.contains(Flags::SHUTDOWN) { return Err(DispatchError::DisconnectTimeout); - // exceed deadline. check for any outstanding tasks + // exceed deadline. check for any outstanding tasks } else if timer.deadline() >= *this.ka_expire { // have no task at hand. if this.state.is_empty() && this.write_buf.is_empty() { @@ -695,15 +695,15 @@ where this.flags.insert(Flags::STARTED | Flags::SHUTDOWN); this.state.set(State::None); } - // still have unfinished task. try to reset and register keep-alive. + // still have unfinished task. try to reset and register keep-alive. } else if let Some(deadline) = this.codec.config().keep_alive_expire() { timer.as_mut().reset(deadline); let _ = timer.poll(cx); } - // timer resolved but still have not met the keep-alive expire deadline. - // reset and register for later wakeup. + // timer resolved but still have not met the keep-alive expire deadline. + // reset and register for later wakeup. } else { timer.as_mut().reset(*this.ka_expire); let _ = timer.poll(cx); @@ -953,14 +953,15 @@ mod tests { use std::str; use actix_service::fn_service; - use futures_util::future::{lazy, ready}; + use futures_util::future::{lazy, ready, Ready}; use super::*; - use crate::test::TestBuffer; - use crate::{error::Error, KeepAlive}; use crate::{ + error::Error, h1::{ExpectHandler, UpgradeHandler}, - test::TestSeqBuffer, + http::Method, + test::{TestBuffer, TestSeqBuffer}, + HttpMessage, KeepAlive, }; fn find_slice(haystack: &[u8], needle: &[u8], from: usize) -> Option { @@ -1284,14 +1285,30 @@ mod tests { #[actix_rt::test] async fn test_upgrade() { + struct TestUpgrade; + + impl Service<(Request, Framed)> for TestUpgrade { + type Response = (); + type Error = Error; + type Future = Ready>; + + actix_service::always_ready!(); + + fn call(&self, (req, _framed): (Request, Framed)) -> Self::Future { + assert_eq!(req.method(), Method::GET); + assert!(req.upgrade()); + assert_eq!(req.headers().get("upgrade").unwrap(), "websocket"); + ready(Ok(())) + } + } + lazy(|cx| { let mut buf = TestSeqBuffer::empty(); let cfg = ServiceConfig::new(KeepAlive::Disabled, 0, 0, false, None); - let services = - HttpFlow::new(ok_service(), ExpectHandler, Some(UpgradeHandler)); + let services = HttpFlow::new(ok_service(), ExpectHandler, Some(TestUpgrade)); - let h1 = Dispatcher::<_, _, _, _, UpgradeHandler>::new( + let h1 = Dispatcher::<_, _, _, _, TestUpgrade>::new( buf.clone(), cfg, services, diff --git a/actix-http/src/h1/expect.rs b/actix-http/src/h1/expect.rs index 65856edf6..5015069bb 100644 --- a/actix-http/src/h1/expect.rs +++ b/actix-http/src/h1/expect.rs @@ -1,5 +1,3 @@ -use std::task::Poll; - use actix_service::{Service, ServiceFactory}; use futures_util::future::{ready, Ready}; diff --git a/actix-http/src/h1/upgrade.rs b/actix-http/src/h1/upgrade.rs index 5e24d84e3..e57ea8ae9 100644 --- a/actix-http/src/h1/upgrade.rs +++ b/actix-http/src/h1/upgrade.rs @@ -1,8 +1,6 @@ -use std::task::Poll; - use actix_codec::Framed; use actix_service::{Service, ServiceFactory}; -use futures_util::future::{ready, Ready}; +use futures_core::future::LocalBoxFuture; use crate::error::Error; use crate::h1::Codec; @@ -16,7 +14,7 @@ impl ServiceFactory<(Request, Framed)> for UpgradeHandler { type Config = (); type Service = UpgradeHandler; type InitError = Error; - type Future = Ready>; + type Future = LocalBoxFuture<'static, Result>; fn new_service(&self, _: ()) -> Self::Future { unimplemented!() @@ -26,11 +24,11 @@ impl ServiceFactory<(Request, Framed)> for UpgradeHandler { impl Service<(Request, Framed)> for UpgradeHandler { type Response = (); type Error = Error; - type Future = Ready>; + type Future = LocalBoxFuture<'static, Result>; actix_service::always_ready!(); fn call(&self, _: (Request, Framed)) -> Self::Future { - ready(Ok(())) + unimplemented!() } } diff --git a/actix-http/src/h2/dispatcher.rs b/actix-http/src/h2/dispatcher.rs index 958c761d5..6e6cd5a2f 100644 --- a/actix-http/src/h2/dispatcher.rs +++ b/actix-http/src/h2/dispatcher.rs @@ -5,8 +5,10 @@ use actix_codec::{AsyncRead, AsyncWrite}; use actix_service::Service; use bytes::{Bytes, BytesMut}; use futures_core::ready; -use h2::server::{Connection, SendResponse}; -use h2::SendStream; +use h2::{ + server::{Connection, SendResponse}, + SendStream, +}; use http::header::{HeaderValue, CONNECTION, CONTENT_LENGTH, DATE, TRANSFER_ENCODING}; use log::{error, trace}; diff --git a/actix-http/src/h2/service.rs b/actix-http/src/h2/service.rs index 1dc290e49..c64139564 100644 --- a/actix-http/src/h2/service.rs +++ b/actix-http/src/h2/service.rs @@ -13,7 +13,7 @@ use actix_service::{ use bytes::Bytes; use futures_core::ready; use futures_util::future::ok; -use h2::server::{self, Handshake}; +use h2::server::{handshake, Handshake}; use log::error; use crate::body::MessageBody; @@ -307,7 +307,7 @@ where Some(self.cfg.clone()), addr, on_connect_data, - server::handshake(io), + handshake(io), ), } } diff --git a/actix-http/src/helpers.rs b/actix-http/src/helpers.rs index 13195f7db..74188717d 100644 --- a/actix-http/src/helpers.rs +++ b/actix-http/src/helpers.rs @@ -1,15 +1,15 @@ use std::io; -use bytes::{BufMut, BytesMut}; +use bytes::BufMut; use http::Version; const DIGITS_START: u8 = b'0'; -pub(crate) fn write_status_line(version: Version, n: u16, bytes: &mut BytesMut) { +pub(crate) fn write_status_line(version: Version, n: u16, buf: &mut B) { match version { - Version::HTTP_11 => bytes.put_slice(b"HTTP/1.1 "), - Version::HTTP_10 => bytes.put_slice(b"HTTP/1.0 "), - Version::HTTP_09 => bytes.put_slice(b"HTTP/0.9 "), + Version::HTTP_11 => buf.put_slice(b"HTTP/1.1 "), + Version::HTTP_10 => buf.put_slice(b"HTTP/1.0 "), + Version::HTTP_09 => buf.put_slice(b"HTTP/0.9 "), _ => { // other HTTP version handlers do not use this method } @@ -19,33 +19,36 @@ pub(crate) fn write_status_line(version: Version, n: u16, bytes: &mut BytesMut) let d10 = ((n / 10) % 10) as u8; let d1 = (n % 10) as u8; - bytes.put_u8(DIGITS_START + d100); - bytes.put_u8(DIGITS_START + d10); - bytes.put_u8(DIGITS_START + d1); + buf.put_u8(DIGITS_START + d100); + buf.put_u8(DIGITS_START + d10); + buf.put_u8(DIGITS_START + d1); // trailing space before reason - bytes.put_u8(b' '); + buf.put_u8(b' '); } /// NOTE: bytes object has to contain enough space -pub fn write_content_length(n: u64, bytes: &mut BytesMut) { +pub fn write_content_length(n: u64, buf: &mut B) { if n == 0 { - bytes.put_slice(b"\r\ncontent-length: 0\r\n"); + buf.put_slice(b"\r\ncontent-length: 0\r\n"); return; } - let mut buf = itoa::Buffer::new(); + let mut buffer = itoa::Buffer::new(); - bytes.put_slice(b"\r\ncontent-length: "); - bytes.put_slice(buf.format(n).as_bytes()); - bytes.put_slice(b"\r\n"); + buf.put_slice(b"\r\ncontent-length: "); + buf.put_slice(buffer.format(n).as_bytes()); + buf.put_slice(b"\r\n"); } -pub(crate) struct Writer<'a>(pub &'a mut BytesMut); +pub(crate) struct Writer<'a, B>(pub &'a mut B); -impl<'a> io::Write for Writer<'a> { +impl<'a, B> io::Write for Writer<'a, B> +where + B: BufMut, +{ fn write(&mut self, buf: &[u8]) -> io::Result { - self.0.extend_from_slice(buf); + self.0.put_slice(buf); Ok(buf.len()) } @@ -58,6 +61,8 @@ impl<'a> io::Write for Writer<'a> { mod tests { use std::str::from_utf8; + use bytes::BytesMut; + use super::*; #[test] diff --git a/actix-http/src/service.rs b/actix-http/src/service.rs index 89f3e3bb1..1a06cec3d 100644 --- a/actix-http/src/service.rs +++ b/actix-http/src/service.rs @@ -1,14 +1,19 @@ -use std::marker::PhantomData; -use std::pin::Pin; -use std::task::{Context, Poll}; -use std::{fmt, net, rc::Rc}; +use std::{ + fmt, + future::Future, + marker::PhantomData, + net, + pin::Pin, + rc::Rc, + task::{Context, Poll}, +}; use actix_codec::{AsyncRead, AsyncWrite, Framed}; use actix_rt::net::TcpStream; use actix_service::{pipeline_factory, IntoServiceFactory, Service, ServiceFactory}; use bytes::Bytes; -use futures_core::{ready, Future}; -use h2::server::{self, Handshake}; +use futures_core::ready; +use h2::server::{handshake, Handshake}; use pin_project::pin_project; use crate::body::MessageBody; @@ -562,7 +567,7 @@ where match proto { Protocol::Http2 => HttpServiceHandlerResponse { state: State::H2Handshake(Some(( - server::handshake(io), + handshake(io), self.cfg.clone(), self.flow.clone(), on_connect_data, diff --git a/actix-http/src/time_parser.rs b/actix-http/src/time_parser.rs index 46bf73037..fd82fd42e 100644 --- a/actix-http/src/time_parser.rs +++ b/actix-http/src/time_parser.rs @@ -1,7 +1,7 @@ use time::{Date, OffsetDateTime, PrimitiveDateTime}; /// Attempt to parse a `time` string as one of either RFC 1123, RFC 850, or asctime. -pub fn parse_http_date(time: &str) -> Option { +pub(crate) fn parse_http_date(time: &str) -> Option { try_parse_rfc_1123(time) .or_else(|| try_parse_rfc_850(time)) .or_else(|| try_parse_asctime(time)) diff --git a/actix-http/tests/test_client.rs b/actix-http/tests/test_client.rs index a50f2404d..758e39745 100644 --- a/actix-http/tests/test_client.rs +++ b/actix-http/tests/test_client.rs @@ -6,7 +6,7 @@ use actix_service::ServiceFactoryExt; use bytes::Bytes; use futures_util::{ future::{self, ok}, - StreamExt, + StreamExt as _, }; const STR: &str = "Hello World Hello World Hello World Hello World Hello World \ diff --git a/actix-http/tests/test_openssl.rs b/actix-http/tests/test_openssl.rs index d5ec645a4..49a68a60d 100644 --- a/actix-http/tests/test_openssl.rs +++ b/actix-http/tests/test_openssl.rs @@ -12,8 +12,11 @@ use actix_http::{body, Error, HttpService, Request, Response}; use actix_http_test::test_server; use actix_service::{fn_service, ServiceFactoryExt}; use bytes::{Bytes, BytesMut}; -use futures_util::future::{err, ok, ready}; -use futures_util::stream::{once, Stream, StreamExt}; +use futures_core::Stream; +use futures_util::{ + future::{err, ok, ready}, + stream::{once, StreamExt as _}, +}; use openssl::{ pkey::PKey, ssl::{SslAcceptor, SslMethod}, diff --git a/actix-http/tests/test_rustls.rs b/actix-http/tests/test_rustls.rs index 81edb5c18..7a3cb1473 100644 --- a/actix-http/tests/test_rustls.rs +++ b/actix-http/tests/test_rustls.rs @@ -10,8 +10,9 @@ use actix_http_test::test_server; use actix_service::{fn_factory_with_config, fn_service}; use bytes::{Bytes, BytesMut}; +use futures_core::Stream; use futures_util::future::{self, err, ok}; -use futures_util::stream::{once, Stream, StreamExt}; +use futures_util::stream::{once, StreamExt as _}; use rustls::{ internal::pemfile::{certs, pkcs8_private_keys}, NoClientAuth, ServerConfig as RustlsServerConfig, diff --git a/actix-http/tests/test_server.rs b/actix-http/tests/test_server.rs index a4c1f92b5..6d145400c 100644 --- a/actix-http/tests/test_server.rs +++ b/actix-http/tests/test_server.rs @@ -7,7 +7,7 @@ use actix_rt::time::sleep; use actix_service::fn_service; use bytes::Bytes; use futures_util::future::{self, err, ok, ready, FutureExt}; -use futures_util::stream::{once, StreamExt}; +use futures_util::stream::{once, StreamExt as _}; use regex::Regex; use actix_http::HttpMessage; diff --git a/actix-http/tests/test_ws.rs b/actix-http/tests/test_ws.rs index 7ed9b0df1..3b90b4e54 100644 --- a/actix-http/tests/test_ws.rs +++ b/actix-http/tests/test_ws.rs @@ -12,7 +12,7 @@ use actix_utils::dispatcher::Dispatcher; use bytes::Bytes; use futures_util::future; use futures_util::task::{Context, Poll}; -use futures_util::{SinkExt, StreamExt}; +use futures_util::{SinkExt as _, StreamExt as _}; struct WsService(Arc, Cell)>>); diff --git a/awc/Cargo.toml b/awc/Cargo.toml index 981b93a52..ec2e03a96 100644 --- a/awc/Cargo.toml +++ b/awc/Cargo.toml @@ -66,12 +66,6 @@ serde_urlencoded = "0.7" tls-openssl = { version = "0.10.9", package = "openssl", optional = true } tls-rustls = { version = "0.19.0", package = "rustls", optional = true, features = ["dangerous_configuration"] } -[target.'cfg(windows)'.dependencies.tls-openssl] -version = "0.10.9" -package = "openssl" -features = ["vendored"] -optional = true - [dev-dependencies] actix-web = { version = "4.0.0-beta.4", features = ["openssl"] } actix-http = { version = "3.0.0-beta.4", features = ["openssl"] } diff --git a/awc/README.md b/awc/README.md index ec3984d60..2e7dad320 100644 --- a/awc/README.md +++ b/awc/README.md @@ -27,7 +27,7 @@ fn main() { let res = client .get("http://www.rust-lang.org") // <- Create request builder - .header("User-Agent", "Actix-web") + .insert_header(("User-Agent", "Actix-web")) .send() // <- Send http request .await; diff --git a/awc/src/connect.rs b/awc/src/connect.rs index a4abbc46b..d3bc01ed6 100644 --- a/awc/src/connect.rs +++ b/awc/src/connect.rs @@ -1,15 +1,16 @@ use std::{ - fmt, future::Future, - io, net, + net, pin::Pin, task::{Context, Poll}, }; -use actix_codec::{AsyncRead, AsyncWrite, Framed, ReadBuf}; +use actix_codec::Framed; use actix_http::{ body::Body, - client::{Connect as ClientConnect, ConnectError, Connection, SendRequestError}, + client::{ + Connect as ClientConnect, ConnectError, Connection, ConnectionIo, SendRequestError, + }, h1::ClientCodec, Payload, RequestHead, RequestHeadType, ResponseHead, }; @@ -123,7 +124,7 @@ impl Future for ConnectRequestFuture where Fut: Future>, C: Connection, - Io: AsyncRead + AsyncWrite + Unpin + 'static, + Io: ConnectionIo, { type Output = Result; @@ -138,14 +139,14 @@ where let fut = ConnectRequestFuture::Client { fut: connection.send_request(head, body), }; - self.as_mut().set(fut); + self.set(fut); } ConnectRequest::Tunnel(head, ..) => { // send request let fut = ConnectRequestFuture::Tunnel { fut: connection.open_tunnel(RequestHeadType::from(head)), }; - self.as_mut().set(fut); + self.set(fut); } } self.poll(cx) @@ -158,65 +159,11 @@ where } ConnectRequestProj::Tunnel { fut } => { let (head, framed) = ready!(fut.as_mut().poll(cx))?; - let framed = framed.into_map_io(|io| BoxedSocket(Box::new(Socket(io)))); + let framed = framed.into_map_io(|io| Box::new(io) as _); Poll::Ready(Ok(ConnectResponse::Tunnel(head, framed))) } } } } -trait AsyncSocket { - fn as_read(&self) -> &(dyn AsyncRead + Unpin); - fn as_read_mut(&mut self) -> &mut (dyn AsyncRead + Unpin); - fn as_write(&mut self) -> &mut (dyn AsyncWrite + Unpin); -} - -struct Socket(T); - -impl AsyncSocket for Socket { - fn as_read(&self) -> &(dyn AsyncRead + Unpin) { - &self.0 - } - fn as_read_mut(&mut self) -> &mut (dyn AsyncRead + Unpin) { - &mut self.0 - } - fn as_write(&mut self) -> &mut (dyn AsyncWrite + Unpin) { - &mut self.0 - } -} - -pub struct BoxedSocket(Box); - -impl fmt::Debug for BoxedSocket { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "BoxedSocket") - } -} - -impl AsyncRead for BoxedSocket { - fn poll_read( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &mut ReadBuf<'_>, - ) -> Poll> { - Pin::new(self.get_mut().0.as_read_mut()).poll_read(cx, buf) - } -} - -impl AsyncWrite for BoxedSocket { - fn poll_write( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &[u8], - ) -> Poll> { - Pin::new(self.get_mut().0.as_write()).poll_write(cx, buf) - } - - fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(self.get_mut().0.as_write()).poll_flush(cx) - } - - fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(self.get_mut().0.as_write()).poll_shutdown(cx) - } -} +pub type BoxedSocket = Box; diff --git a/src/app_service.rs b/src/app_service.rs index 9b4ae3354..be4ccf22f 100644 --- a/src/app_service.rs +++ b/src/app_service.rs @@ -1,6 +1,5 @@ use std::cell::RefCell; use std::rc::Rc; -use std::task::Poll; use actix_http::{Extensions, Request, Response}; use actix_router::{Path, ResourceDef, Router, Url}; diff --git a/src/resource.rs b/src/resource.rs index 944beeefa..1a5619de6 100644 --- a/src/resource.rs +++ b/src/resource.rs @@ -2,7 +2,6 @@ use std::cell::RefCell; use std::fmt; use std::future::Future; use std::rc::Rc; -use std::task::Poll; use actix_http::{Error, Extensions, Response}; use actix_router::IntoPattern; diff --git a/src/scope.rs b/src/scope.rs index dd02501b0..e96f5b9e8 100644 --- a/src/scope.rs +++ b/src/scope.rs @@ -2,7 +2,6 @@ use std::cell::RefCell; use std::fmt; use std::future::Future; use std::rc::Rc; -use std::task::Poll; use actix_http::Extensions; use actix_router::{ResourceDef, Router}; diff --git a/src/server.rs b/src/server.rs index d69d6570d..99355593a 100644 --- a/src/server.rs +++ b/src/server.rs @@ -12,13 +12,6 @@ use actix_http::{ use actix_server::{Server, ServerBuilder}; use actix_service::{map_config, IntoServiceFactory, Service, ServiceFactory}; -#[cfg(unix)] -use actix_http::Protocol; -#[cfg(unix)] -use actix_service::pipeline_factory; -#[cfg(unix)] -use futures_util::future::ok; - #[cfg(feature = "openssl")] use actix_tls::accept::openssl::{AlpnError, SslAcceptor, SslAcceptorBuilder}; #[cfg(feature = "rustls")] @@ -489,7 +482,9 @@ where #[cfg(unix)] /// Start listening for unix domain (UDS) connections on existing listener. pub fn listen_uds(mut self, lst: std::os::unix::net::UnixListener) -> io::Result { + use actix_http::Protocol; use actix_rt::net::UnixStream; + use actix_service::pipeline_factory; let cfg = self.config.clone(); let factory = self.factory.clone(); @@ -511,19 +506,22 @@ where c.host.clone().unwrap_or_else(|| format!("{}", socket_addr)), ); - pipeline_factory(|io: UnixStream| ok((io, Protocol::Http1, None))).and_then({ - let svc = HttpService::build() - .keep_alive(c.keep_alive) - .client_timeout(c.client_timeout); + pipeline_factory(|io: UnixStream| async { Ok((io, Protocol::Http1, None)) }) + .and_then({ + let svc = HttpService::build() + .keep_alive(c.keep_alive) + .client_timeout(c.client_timeout); - let svc = if let Some(handler) = on_connect_fn.clone() { - svc.on_connect_ext(move |io: &_, ext: _| (&*handler)(io as &dyn Any, ext)) - } else { - svc - }; + let svc = if let Some(handler) = on_connect_fn.clone() { + svc.on_connect_ext(move |io: &_, ext: _| { + (&*handler)(io as &dyn Any, ext) + }) + } else { + svc + }; - svc.finish(map_config(factory(), move |_| config.clone())) - }) + svc.finish(map_config(factory(), move |_| config.clone())) + }) })?; Ok(self) } @@ -534,7 +532,9 @@ where where A: AsRef, { + use actix_http::Protocol; use actix_rt::net::UnixStream; + use actix_service::pipeline_factory; let cfg = self.config.clone(); let factory = self.factory.clone(); @@ -555,12 +555,13 @@ where socket_addr, c.host.clone().unwrap_or_else(|| format!("{}", socket_addr)), ); - pipeline_factory(|io: UnixStream| ok((io, Protocol::Http1, None))).and_then( - HttpService::build() - .keep_alive(c.keep_alive) - .client_timeout(c.client_timeout) - .finish(map_config(factory(), move |_| config.clone())), - ) + pipeline_factory(|io: UnixStream| async { Ok((io, Protocol::Http1, None)) }) + .and_then( + HttpService::build() + .keep_alive(c.keep_alive) + .client_timeout(c.client_timeout) + .finish(map_config(factory(), move |_| config.clone())), + ) }, )?; Ok(self) diff --git a/src/types/path.rs b/src/types/path.rs index 4ab124d53..294df6cf2 100644 --- a/src/types/path.rs +++ b/src/types/path.rs @@ -20,7 +20,7 @@ use crate::{dev::Payload, error::PathError, FromRequest, HttpRequest}; /// // extract path info from "/{name}/{count}/index.html" into tuple /// // {name} - deserialize a String /// // {count} - deserialize a u32 -/// #[get("/")] +/// #[get("/{name}/{count}/index.html")] /// async fn index(path: web::Path<(String, u32)>) -> String { /// let (name, count) = path.into_inner(); /// format!("Welcome {}! {}", name, count) @@ -40,7 +40,7 @@ use crate::{dev::Payload, error::PathError, FromRequest, HttpRequest}; /// } /// /// // extract `Info` from a path using serde -/// #[get("/")] +/// #[get("/{name}")] /// async fn index(info: web::Path) -> String { /// format!("Welcome {}!", info.name) /// }