Merge branch 'master' into feat/awc_response_timeout

This commit is contained in:
fakeshadow 2021-02-07 13:23:21 -08:00 committed by GitHub
commit 9c3328f338
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 69 additions and 189 deletions

View File

@ -1,4 +1,4 @@
name: Benchmark (Linux) name: Benchmark
on: on:
pull_request: pull_request:

View File

@ -1,32 +1,39 @@
name: Lint
on: on:
pull_request: pull_request:
types: [opened, synchronize, reopened] types: [opened, synchronize, reopened]
name: Clippy and rustfmt Check
jobs: jobs:
clippy_check: fmt:
runs-on: ubuntu-latest runs-on: ubuntu-latest
steps: steps:
- uses: actions/checkout@v2 - uses: actions/checkout@v2
- uses: actions-rs/toolchain@v1 - name: Install Rust
uses: actions-rs/toolchain@v1
with: with:
toolchain: stable toolchain: stable
components: rustfmt components: rustfmt
override: true
- name: Check with rustfmt - name: Check with rustfmt
uses: actions-rs/cargo@v1 uses: actions-rs/cargo@v1
with: with:
command: fmt command: fmt
args: --all -- --check args: --all -- --check
- uses: actions-rs/toolchain@v1 clippy:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: Install Rust
uses: actions-rs/toolchain@v1
with: with:
toolchain: nightly toolchain: stable
components: clippy components: clippy
override: true override: true
- name: Check with Clippy - name: Check with Clippy
uses: actions-rs/clippy-check@v1 uses: actions-rs/clippy-check@v1
with: with:
token: ${{ secrets.GITHUB_TOKEN }} token: ${{ secrets.GITHUB_TOKEN }}
args: --all-features --all --tests args: --workspace --tests --all-features

View File

@ -24,7 +24,7 @@ jobs:
uses: actions-rs/cargo@v1 uses: actions-rs/cargo@v1
with: with:
command: doc command: doc
args: --no-deps --workspace --all-features args: --workspace --all-features --no-deps
- name: Tweak HTML - name: Tweak HTML
run: echo "<meta http-equiv=refresh content=0;url=os_balloon/index.html>" > target/doc/index.html run: echo "<meta http-equiv=refresh content=0;url=os_balloon/index.html>" > target/doc/index.html

View File

@ -223,15 +223,3 @@ impl Encoder<Message<(RequestHeadType, BodySize)>> for ClientCodec {
Ok(()) Ok(())
} }
} }
pub struct Writer<'a>(pub &'a mut BytesMut);
impl<'a> io::Write for Writer<'a> {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.0.extend_from_slice(buf);
Ok(buf.len())
}
fn flush(&mut self) -> io::Result<()> {
Ok(())
}
}

View File

@ -337,7 +337,7 @@ impl MessageType for RequestHeadType {
let head = self.as_ref(); let head = self.as_ref();
dst.reserve(256 + head.headers.len() * AVERAGE_HEADER_SIZE); dst.reserve(256 + head.headers.len() * AVERAGE_HEADER_SIZE);
write!( write!(
Writer(dst), helpers::Writer(dst),
"{} {} {}", "{} {} {}",
head.method, head.method,
head.uri.path_and_query().map(|u| u.as_str()).unwrap_or("/"), head.uri.path_and_query().map(|u| u.as_str()).unwrap_or("/"),
@ -470,7 +470,7 @@ impl TransferEncoding {
*eof = true; *eof = true;
buf.extend_from_slice(b"0\r\n\r\n"); buf.extend_from_slice(b"0\r\n\r\n");
} else { } else {
writeln!(Writer(buf), "{:X}\r", msg.len()) writeln!(helpers::Writer(buf), "{:X}\r", msg.len())
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
buf.reserve(msg.len() + 2); buf.reserve(msg.len() + 2);
@ -520,18 +520,6 @@ impl TransferEncoding {
} }
} }
struct Writer<'a>(pub &'a mut BytesMut);
impl<'a> io::Write for Writer<'a> {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.0.extend_from_slice(buf);
Ok(buf.len())
}
fn flush(&mut self) -> io::Result<()> {
Ok(())
}
}
/// # Safety /// # Safety
/// Callers must ensure that the given length matches given value length. /// Callers must ensure that the given length matches given value length.
unsafe fn write_data(value: &[u8], buf: *mut u8, len: usize) { unsafe fn write_data(value: &[u8], buf: *mut u8, len: usize) {

View File

@ -343,6 +343,8 @@ impl ResponseHead {
} }
pub struct Message<T: Head> { pub struct Message<T: Head> {
// Rc here should not be cloned by anyone.
// It's used to reuse allocation of T and no shared ownership is allowed.
head: Rc<T>, head: Rc<T>,
} }
@ -353,14 +355,6 @@ impl<T: Head> Message<T> {
} }
} }
impl<T: Head> Clone for Message<T> {
fn clone(&self) -> Self {
Message {
head: self.head.clone(),
}
}
}
impl<T: Head> std::ops::Deref for Message<T> { impl<T: Head> std::ops::Deref for Message<T> {
type Target = T; type Target = T;
@ -377,9 +371,7 @@ impl<T: Head> std::ops::DerefMut for Message<T> {
impl<T: Head> Drop for Message<T> { impl<T: Head> Drop for Message<T> {
fn drop(&mut self) { fn drop(&mut self) {
if Rc::strong_count(&self.head) == 1 { T::with_pool(|p| p.release(self.head.clone()))
T::with_pool(|p| p.release(self.head.clone()))
}
} }
} }

View File

@ -1,71 +1,39 @@
use std::future::Future; use std::{
use std::pin::Pin; fmt, io, net,
use std::rc::Rc; pin::Pin,
use std::task::{Context, Poll}; task::{Context, Poll},
use std::{fmt, io, net}; };
use actix_codec::{AsyncRead, AsyncWrite, Framed, ReadBuf}; use actix_codec::{AsyncRead, AsyncWrite, Framed, ReadBuf};
use actix_http::body::Body; use actix_http::{
use actix_http::client::{ body::Body,
Connect as ClientConnect, ConnectError, Connection, SendRequestError, client::{Connect as ClientConnect, ConnectError, Connection, SendRequestError},
h1::ClientCodec,
RequestHead, RequestHeadType, ResponseHead,
}; };
use actix_http::h1::ClientCodec;
use actix_http::http::HeaderMap;
use actix_http::{RequestHead, RequestHeadType, ResponseHead};
use actix_service::Service; use actix_service::Service;
use futures_core::future::LocalBoxFuture;
use crate::response::ClientResponse; use crate::response::ClientResponse;
pub(crate) struct ConnectorWrapper<T>(pub T); pub(crate) struct ConnectorWrapper<T>(pub T);
type TunnelResponse = (ResponseHead, Framed<BoxedSocket, ClientCodec>);
pub(crate) trait Connect { pub(crate) trait Connect {
fn send_request( fn send_request(
&self, &self,
head: RequestHead, head: RequestHeadType,
body: Body, body: Body,
addr: Option<net::SocketAddr>, addr: Option<net::SocketAddr>,
) -> Pin<Box<dyn Future<Output = Result<ClientResponse, SendRequestError>>>>; ) -> LocalBoxFuture<'static, Result<ClientResponse, SendRequestError>>;
fn send_request_extra(
&self,
head: Rc<RequestHead>,
extra_headers: Option<HeaderMap>,
body: Body,
addr: Option<net::SocketAddr>,
) -> Pin<Box<dyn Future<Output = Result<ClientResponse, SendRequestError>>>>;
/// Send request, returns Response and Framed /// Send request, returns Response and Framed
fn open_tunnel( fn open_tunnel(
&self, &self,
head: RequestHead, head: RequestHead,
addr: Option<net::SocketAddr>, addr: Option<net::SocketAddr>,
) -> Pin< ) -> LocalBoxFuture<'static, Result<TunnelResponse, SendRequestError>>;
Box<
dyn Future<
Output = Result<
(ResponseHead, Framed<BoxedSocket, ClientCodec>),
SendRequestError,
>,
>,
>,
>;
/// Send request and extra headers, returns Response and Framed
fn open_tunnel_extra(
&self,
head: Rc<RequestHead>,
extra_headers: Option<HeaderMap>,
addr: Option<net::SocketAddr>,
) -> Pin<
Box<
dyn Future<
Output = Result<
(ResponseHead, Framed<BoxedSocket, ClientCodec>),
SendRequestError,
>,
>,
>,
>;
} }
impl<T> Connect for ConnectorWrapper<T> impl<T> Connect for ConnectorWrapper<T>
@ -79,13 +47,13 @@ where
{ {
fn send_request( fn send_request(
&self, &self,
head: RequestHead, head: RequestHeadType,
body: Body, body: Body,
addr: Option<net::SocketAddr>, addr: Option<net::SocketAddr>,
) -> Pin<Box<dyn Future<Output = Result<ClientResponse, SendRequestError>>>> { ) -> LocalBoxFuture<'static, Result<ClientResponse, SendRequestError>> {
// connect to the host // connect to the host
let fut = self.0.call(ClientConnect { let fut = self.0.call(ClientConnect {
uri: head.uri.clone(), uri: head.as_ref().uri.clone(),
addr, addr,
}); });
@ -93,33 +61,7 @@ where
let connection = fut.await?; let connection = fut.await?;
// send request // send request
connection let (head, payload) = connection.send_request(head, body).await?;
.send_request(RequestHeadType::from(head), body)
.await
.map(|(head, payload)| ClientResponse::new(head, payload))
})
}
fn send_request_extra(
&self,
head: Rc<RequestHead>,
extra_headers: Option<HeaderMap>,
body: Body,
addr: Option<net::SocketAddr>,
) -> Pin<Box<dyn Future<Output = Result<ClientResponse, SendRequestError>>>> {
// connect to the host
let fut = self.0.call(ClientConnect {
uri: head.uri.clone(),
addr,
});
Box::pin(async move {
let connection = fut.await?;
// send request
let (head, payload) = connection
.send_request(RequestHeadType::Rc(head, extra_headers), body)
.await?;
Ok(ClientResponse::new(head, payload)) Ok(ClientResponse::new(head, payload))
}) })
@ -129,16 +71,7 @@ where
&self, &self,
head: RequestHead, head: RequestHead,
addr: Option<net::SocketAddr>, addr: Option<net::SocketAddr>,
) -> Pin< ) -> LocalBoxFuture<'static, Result<TunnelResponse, SendRequestError>> {
Box<
dyn Future<
Output = Result<
(ResponseHead, Framed<BoxedSocket, ClientCodec>),
SendRequestError,
>,
>,
>,
> {
// connect to the host // connect to the host
let fut = self.0.call(ClientConnect { let fut = self.0.call(ClientConnect {
uri: head.uri.clone(), uri: head.uri.clone(),
@ -156,40 +89,6 @@ where
Ok((head, framed)) Ok((head, framed))
}) })
} }
fn open_tunnel_extra(
&self,
head: Rc<RequestHead>,
extra_headers: Option<HeaderMap>,
addr: Option<net::SocketAddr>,
) -> Pin<
Box<
dyn Future<
Output = Result<
(ResponseHead, Framed<BoxedSocket, ClientCodec>),
SendRequestError,
>,
>,
>,
> {
// connect to the host
let fut = self.0.call(ClientConnect {
uri: head.uri.clone(),
addr,
});
Box::pin(async move {
let connection = fut.await?;
// send request
let (head, framed) = connection
.open_tunnel(RequestHeadType::Rc(head, extra_headers))
.await?;
let framed = framed.into_map_io(|io| BoxedSocket(Box::new(Socket(io))));
Ok((head, framed))
})
}
} }
trait AsyncSocket { trait AsyncSocket {

View File

@ -1,18 +1,24 @@
use std::future::Future; use std::{
use std::net; future::Future,
use std::pin::Pin; net,
use std::rc::Rc; pin::Pin,
use std::task::{Context, Poll}; rc::Rc,
use std::time::Duration; task::{Context, Poll},
time::Duration,
};
use actix_http::body::{Body, BodyStream}; use actix_http::{
use actix_http::http::header::{self, IntoHeaderValue}; body::{Body, BodyStream},
use actix_http::http::{Error as HttpError, HeaderMap, HeaderName}; http::{
use actix_http::{Error, RequestHead}; header::{self, HeaderMap, HeaderName, IntoHeaderValue},
Error as HttpError,
},
Error, RequestHead, RequestHeadType,
};
use actix_rt::time::{sleep, Sleep}; use actix_rt::time::{sleep, Sleep};
use bytes::Bytes; use bytes::Bytes;
use derive_more::From; use derive_more::From;
use futures_core::{ready, Stream}; use futures_core::Stream;
use serde::Serialize; use serde::Serialize;
#[cfg(feature = "compress")] #[cfg(feature = "compress")]
@ -181,12 +187,16 @@ impl RequestSender {
B: Into<Body>, B: Into<Body>,
{ {
let fut = match self { let fut = match self {
RequestSender::Owned(head) => { RequestSender::Owned(head) => config.connector.send_request(
config.connector.send_request(head, body.into(), addr) RequestHeadType::Owned(head),
} body.into(),
RequestSender::Rc(head, extra_headers) => config addr,
.connector ),
.send_request_extra(head, extra_headers, body.into(), addr), RequestSender::Rc(head, extra_headers) => config.connector.send_request(
RequestHeadType::Rc(head, extra_headers),
body.into(),
addr,
),
}; };
SendClientRequest::new(fut, response_decompress, timeout.or(config.timeout)) SendClientRequest::new(fut, response_decompress, timeout.or(config.timeout))

View File

@ -106,7 +106,6 @@ mod tests {
HttpResponse, HttpResponse,
}; };
#[allow(clippy::unnecessary_wraps)]
fn render_500<B>(mut res: ServiceResponse<B>) -> Result<ErrorHandlerResponse<B>> { fn render_500<B>(mut res: ServiceResponse<B>) -> Result<ErrorHandlerResponse<B>> {
res.response_mut() res.response_mut()
.headers_mut() .headers_mut()

View File

@ -182,7 +182,6 @@ mod tests {
use crate::test::{self, TestRequest}; use crate::test::{self, TestRequest};
use crate::HttpResponse; use crate::HttpResponse;
#[allow(clippy::unnecessary_wraps)]
fn render_500<B>(mut res: ServiceResponse<B>) -> Result<ErrorHandlerResponse<B>> { fn render_500<B>(mut res: ServiceResponse<B>) -> Result<ErrorHandlerResponse<B>> {
res.response_mut() res.response_mut()
.headers_mut() .headers_mut()
@ -207,7 +206,6 @@ mod tests {
assert_eq!(resp.headers().get(CONTENT_TYPE).unwrap(), "0001"); assert_eq!(resp.headers().get(CONTENT_TYPE).unwrap(), "0001");
} }
#[allow(clippy::unnecessary_wraps)]
fn render_500_async<B: 'static>( fn render_500_async<B: 'static>(
mut res: ServiceResponse<B>, mut res: ServiceResponse<B>,
) -> Result<ErrorHandlerResponse<B>> { ) -> Result<ErrorHandlerResponse<B>> {

View File

@ -72,7 +72,6 @@ async fn test_start() {
let _ = sys.stop(); let _ = sys.stop();
} }
#[allow(clippy::unnecessary_wraps)]
#[cfg(feature = "openssl")] #[cfg(feature = "openssl")]
fn ssl_acceptor() -> std::io::Result<SslAcceptorBuilder> { fn ssl_acceptor() -> std::io::Result<SslAcceptorBuilder> {
use openssl::ssl::{SslAcceptor, SslFiletype, SslMethod}; use openssl::ssl::{SslAcceptor, SslFiletype, SslMethod};