mirror of https://github.com/fafhrd91/actix-web
Merge branch 'master' into feat/io-uring
This commit is contained in:
commit
3de6d6713a
|
@ -1,6 +1,8 @@
|
||||||
# Changes
|
# Changes
|
||||||
|
|
||||||
## Unreleased - 2021-xx-xx
|
## Unreleased - 2021-xx-xx
|
||||||
|
### Changed
|
||||||
|
* `ContentType::html` now returns `Content-Type: text/html; charset=utf-8` instead of `Content-Type: text/html`.
|
||||||
|
|
||||||
|
|
||||||
## 4.0.0-beta.10 - 2021-10-20
|
## 4.0.0-beta.10 - 2021-10-20
|
||||||
|
|
|
@ -1,6 +1,11 @@
|
||||||
# Changes
|
# Changes
|
||||||
|
|
||||||
## Unreleased - 2021-xx-xx
|
## Unreleased - 2021-xx-xx
|
||||||
|
### Removed
|
||||||
|
* `client` module. [#2425]
|
||||||
|
* `trust-dns` feature. [#2425]
|
||||||
|
|
||||||
|
[#2425]: https://github.com/actix/actix-web/pull/2425
|
||||||
|
|
||||||
|
|
||||||
## 3.0.0-beta.11 - 2021-10-20
|
## 3.0.0-beta.11 - 2021-10-20
|
||||||
|
|
|
@ -27,19 +27,16 @@ path = "src/lib.rs"
|
||||||
default = []
|
default = []
|
||||||
|
|
||||||
# openssl
|
# openssl
|
||||||
openssl = ["actix-tls/openssl"]
|
openssl = ["actix-tls/accept", "actix-tls/openssl"]
|
||||||
|
|
||||||
# rustls support
|
# rustls support
|
||||||
rustls = ["actix-tls/rustls"]
|
rustls = ["actix-tls/accept", "actix-tls/rustls"]
|
||||||
|
|
||||||
# enable compression support
|
# enable compression support
|
||||||
compress-brotli = ["brotli2", "__compress"]
|
compress-brotli = ["brotli2", "__compress"]
|
||||||
compress-gzip = ["flate2", "__compress"]
|
compress-gzip = ["flate2", "__compress"]
|
||||||
compress-zstd = ["zstd", "__compress"]
|
compress-zstd = ["zstd", "__compress"]
|
||||||
|
|
||||||
# trust-dns as client dns resolver
|
|
||||||
trust-dns = ["trust-dns-resolver"]
|
|
||||||
|
|
||||||
# Internal (PRIVATE!) features used to aid testing and cheking feature status.
|
# Internal (PRIVATE!) features used to aid testing and cheking feature status.
|
||||||
# Don't rely on these whatsoever. They may disappear at anytime.
|
# Don't rely on these whatsoever. They may disappear at anytime.
|
||||||
__compress = []
|
__compress = []
|
||||||
|
@ -49,7 +46,6 @@ actix-service = "2.0.0"
|
||||||
actix-codec = "0.4.0"
|
actix-codec = "0.4.0"
|
||||||
actix-utils = "3.0.0"
|
actix-utils = "3.0.0"
|
||||||
actix-rt = "2.2"
|
actix-rt = "2.2"
|
||||||
actix-tls = { version = "3.0.0-beta.7", features = ["accept", "connect"] }
|
|
||||||
|
|
||||||
ahash = "0.7"
|
ahash = "0.7"
|
||||||
base64 = "0.13"
|
base64 = "0.13"
|
||||||
|
@ -67,7 +63,6 @@ httpdate = "1.0.1"
|
||||||
itoa = "0.4"
|
itoa = "0.4"
|
||||||
language-tags = "0.3"
|
language-tags = "0.3"
|
||||||
local-channel = "0.1"
|
local-channel = "0.1"
|
||||||
once_cell = "1.5"
|
|
||||||
log = "0.4"
|
log = "0.4"
|
||||||
mime = "0.3"
|
mime = "0.3"
|
||||||
percent-encoding = "2.1"
|
percent-encoding = "2.1"
|
||||||
|
@ -78,13 +73,14 @@ sha-1 = "0.9"
|
||||||
smallvec = "1.6.1"
|
smallvec = "1.6.1"
|
||||||
tokio = { version = "1.2", features = ["sync"] }
|
tokio = { version = "1.2", features = ["sync"] }
|
||||||
|
|
||||||
|
# tls
|
||||||
|
actix-tls = { version = "3.0.0-beta.7", default-features = false, optional = true }
|
||||||
|
|
||||||
# compression
|
# compression
|
||||||
brotli2 = { version="0.3.2", optional = true }
|
brotli2 = { version="0.3.2", optional = true }
|
||||||
flate2 = { version = "1.0.13", optional = true }
|
flate2 = { version = "1.0.13", optional = true }
|
||||||
zstd = { version = "0.7", optional = true }
|
zstd = { version = "0.7", optional = true }
|
||||||
|
|
||||||
trust-dns-resolver = { version = "0.20.0", optional = true }
|
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
actix-server = "2.0.0-beta.3"
|
actix-server = "2.0.0-beta.3"
|
||||||
actix-http-test = { version = "3.0.0-beta.5", features = ["openssl"] }
|
actix-http-test = { version = "3.0.0-beta.5", features = ["openssl"] }
|
||||||
|
|
|
@ -10,11 +10,15 @@ use std::{
|
||||||
};
|
};
|
||||||
|
|
||||||
use actix_codec::{AsyncRead, AsyncWrite};
|
use actix_codec::{AsyncRead, AsyncWrite};
|
||||||
|
use actix_rt::time::Sleep;
|
||||||
use actix_service::Service;
|
use actix_service::Service;
|
||||||
use actix_utils::future::poll_fn;
|
use actix_utils::future::poll_fn;
|
||||||
use bytes::{Bytes, BytesMut};
|
use bytes::{Bytes, BytesMut};
|
||||||
use futures_core::ready;
|
use futures_core::ready;
|
||||||
use h2::server::{Connection, SendResponse};
|
use h2::{
|
||||||
|
server::{Connection, SendResponse},
|
||||||
|
Ping, PingPong,
|
||||||
|
};
|
||||||
use http::header::{HeaderValue, CONNECTION, CONTENT_LENGTH, DATE, TRANSFER_ENCODING};
|
use http::header::{HeaderValue, CONNECTION, CONTENT_LENGTH, DATE, TRANSFER_ENCODING};
|
||||||
use log::{error, trace};
|
use log::{error, trace};
|
||||||
use pin_project_lite::pin_project;
|
use pin_project_lite::pin_project;
|
||||||
|
@ -36,29 +40,46 @@ pin_project! {
|
||||||
on_connect_data: OnConnectData,
|
on_connect_data: OnConnectData,
|
||||||
config: ServiceConfig,
|
config: ServiceConfig,
|
||||||
peer_addr: Option<net::SocketAddr>,
|
peer_addr: Option<net::SocketAddr>,
|
||||||
_phantom: PhantomData<B>,
|
ping_pong: Option<H2PingPong>,
|
||||||
|
_phantom: PhantomData<B>
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T, S, B, X, U> Dispatcher<T, S, B, X, U> {
|
impl<T, S, B, X, U> Dispatcher<T, S, B, X, U>
|
||||||
|
where
|
||||||
|
T: AsyncRead + AsyncWrite + Unpin,
|
||||||
|
{
|
||||||
pub(crate) fn new(
|
pub(crate) fn new(
|
||||||
flow: Rc<HttpFlow<S, X, U>>,
|
flow: Rc<HttpFlow<S, X, U>>,
|
||||||
connection: Connection<T, Bytes>,
|
mut connection: Connection<T, Bytes>,
|
||||||
on_connect_data: OnConnectData,
|
on_connect_data: OnConnectData,
|
||||||
config: ServiceConfig,
|
config: ServiceConfig,
|
||||||
peer_addr: Option<net::SocketAddr>,
|
peer_addr: Option<net::SocketAddr>,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
|
let ping_pong = config.keep_alive_timer().map(|timer| H2PingPong {
|
||||||
|
timer: Box::pin(timer),
|
||||||
|
on_flight: false,
|
||||||
|
ping_pong: connection.ping_pong().unwrap(),
|
||||||
|
});
|
||||||
|
|
||||||
Self {
|
Self {
|
||||||
flow,
|
flow,
|
||||||
config,
|
config,
|
||||||
peer_addr,
|
peer_addr,
|
||||||
connection,
|
connection,
|
||||||
on_connect_data,
|
on_connect_data,
|
||||||
|
ping_pong,
|
||||||
_phantom: PhantomData,
|
_phantom: PhantomData,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
struct H2PingPong {
|
||||||
|
timer: Pin<Box<Sleep>>,
|
||||||
|
on_flight: bool,
|
||||||
|
ping_pong: PingPong,
|
||||||
|
}
|
||||||
|
|
||||||
impl<T, S, B, X, U> Future for Dispatcher<T, S, B, X, U>
|
impl<T, S, B, X, U> Future for Dispatcher<T, S, B, X, U>
|
||||||
where
|
where
|
||||||
T: AsyncRead + AsyncWrite + Unpin,
|
T: AsyncRead + AsyncWrite + Unpin,
|
||||||
|
@ -77,54 +98,92 @@ where
|
||||||
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||||
let this = self.get_mut();
|
let this = self.get_mut();
|
||||||
|
|
||||||
while let Some((req, tx)) =
|
loop {
|
||||||
ready!(Pin::new(&mut this.connection).poll_accept(cx)?)
|
match Pin::new(&mut this.connection).poll_accept(cx)? {
|
||||||
{
|
Poll::Ready(Some((req, tx))) => {
|
||||||
let (parts, body) = req.into_parts();
|
let (parts, body) = req.into_parts();
|
||||||
let pl = crate::h2::Payload::new(body);
|
let pl = crate::h2::Payload::new(body);
|
||||||
let pl = Payload::<crate::payload::PayloadStream>::H2(pl);
|
let pl = Payload::<crate::payload::PayloadStream>::H2(pl);
|
||||||
let mut req = Request::with_payload(pl);
|
let mut req = Request::with_payload(pl);
|
||||||
|
|
||||||
let head = req.head_mut();
|
let head = req.head_mut();
|
||||||
head.uri = parts.uri;
|
head.uri = parts.uri;
|
||||||
head.method = parts.method;
|
head.method = parts.method;
|
||||||
head.version = parts.version;
|
head.version = parts.version;
|
||||||
head.headers = parts.headers.into();
|
head.headers = parts.headers.into();
|
||||||
head.peer_addr = this.peer_addr;
|
head.peer_addr = this.peer_addr;
|
||||||
|
|
||||||
// merge on_connect_ext data into request extensions
|
// merge on_connect_ext data into request extensions
|
||||||
this.on_connect_data.merge_into(&mut req);
|
this.on_connect_data.merge_into(&mut req);
|
||||||
|
|
||||||
let fut = this.flow.service.call(req);
|
let fut = this.flow.service.call(req);
|
||||||
let config = this.config.clone();
|
let config = this.config.clone();
|
||||||
|
|
||||||
// multiplex request handling with spawn task
|
// multiplex request handling with spawn task
|
||||||
actix_rt::spawn(async move {
|
actix_rt::spawn(async move {
|
||||||
// resolve service call and send response.
|
// resolve service call and send response.
|
||||||
let res = match fut.await {
|
let res = match fut.await {
|
||||||
Ok(res) => handle_response(res.into(), tx, config).await,
|
Ok(res) => handle_response(res.into(), tx, config).await,
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
let res: Response<AnyBody> = err.into();
|
let res: Response<AnyBody> = err.into();
|
||||||
handle_response(res, tx, config).await
|
handle_response(res, tx, config).await
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
// log error.
|
// log error.
|
||||||
if let Err(err) = res {
|
if let Err(err) = res {
|
||||||
match err {
|
match err {
|
||||||
DispatchError::SendResponse(err) => {
|
DispatchError::SendResponse(err) => {
|
||||||
trace!("Error sending HTTP/2 response: {:?}", err)
|
trace!("Error sending HTTP/2 response: {:?}", err)
|
||||||
|
}
|
||||||
|
DispatchError::SendData(err) => warn!("{:?}", err),
|
||||||
|
DispatchError::ResponseBody(err) => {
|
||||||
|
error!("Response payload stream error: {:?}", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
DispatchError::SendData(err) => warn!("{:?}", err),
|
});
|
||||||
DispatchError::ResponseBody(err) => {
|
|
||||||
error!("Response payload stream error: {:?}", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
});
|
Poll::Ready(None) => return Poll::Ready(Ok(())),
|
||||||
}
|
Poll::Pending => match this.ping_pong.as_mut() {
|
||||||
|
Some(ping_pong) => loop {
|
||||||
|
if ping_pong.on_flight {
|
||||||
|
// When have on flight ping pong. poll pong and and keep alive timer.
|
||||||
|
// on success pong received update keep alive timer to determine the next timing of
|
||||||
|
// ping pong.
|
||||||
|
match ping_pong.ping_pong.poll_pong(cx)? {
|
||||||
|
Poll::Ready(_) => {
|
||||||
|
ping_pong.on_flight = false;
|
||||||
|
|
||||||
Poll::Ready(Ok(()))
|
let dead_line =
|
||||||
|
this.config.keep_alive_expire().unwrap();
|
||||||
|
ping_pong.timer.as_mut().reset(dead_line);
|
||||||
|
}
|
||||||
|
Poll::Pending => {
|
||||||
|
return ping_pong
|
||||||
|
.timer
|
||||||
|
.as_mut()
|
||||||
|
.poll(cx)
|
||||||
|
.map(|_| Ok(()))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// When there is no on flight ping pong. keep alive timer is used to wait for next
|
||||||
|
// timing of ping pong. Therefore at this point it serves as an interval instead.
|
||||||
|
ready!(ping_pong.timer.as_mut().poll(cx));
|
||||||
|
|
||||||
|
ping_pong.ping_pong.send_ping(Ping::opaque())?;
|
||||||
|
|
||||||
|
let dead_line = this.config.keep_alive_expire().unwrap();
|
||||||
|
ping_pong.timer.as_mut().reset(dead_line);
|
||||||
|
|
||||||
|
ping_pong.on_flight = true;
|
||||||
|
}
|
||||||
|
},
|
||||||
|
None => return Poll::Pending,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -29,7 +29,6 @@ extern crate log;
|
||||||
|
|
||||||
pub mod body;
|
pub mod body;
|
||||||
mod builder;
|
mod builder;
|
||||||
pub mod client;
|
|
||||||
mod config;
|
mod config;
|
||||||
|
|
||||||
#[cfg(feature = "__compress")]
|
#[cfg(feature = "__compress")]
|
||||||
|
|
|
@ -0,0 +1,77 @@
|
||||||
|
use std::io;
|
||||||
|
|
||||||
|
use actix_http::{error::Error, HttpService, Response};
|
||||||
|
use actix_server::Server;
|
||||||
|
|
||||||
|
#[actix_rt::test]
|
||||||
|
async fn h2_ping_pong() -> io::Result<()> {
|
||||||
|
let (tx, rx) = std::sync::mpsc::sync_channel(1);
|
||||||
|
|
||||||
|
let lst = std::net::TcpListener::bind("127.0.0.1:0")?;
|
||||||
|
|
||||||
|
let addr = lst.local_addr().unwrap();
|
||||||
|
|
||||||
|
let join = std::thread::spawn(move || {
|
||||||
|
actix_rt::System::new().block_on(async move {
|
||||||
|
let handle = Server::build()
|
||||||
|
.disable_signals()
|
||||||
|
.workers(1)
|
||||||
|
.listen("h2_ping_pong", lst, || {
|
||||||
|
HttpService::build()
|
||||||
|
.keep_alive(3)
|
||||||
|
.h2(|_| async { Ok::<_, Error>(Response::ok()) })
|
||||||
|
.tcp()
|
||||||
|
})?
|
||||||
|
.run();
|
||||||
|
|
||||||
|
tx.send(handle.clone()).unwrap();
|
||||||
|
|
||||||
|
handle.await
|
||||||
|
})
|
||||||
|
});
|
||||||
|
|
||||||
|
let handle = rx.recv().unwrap();
|
||||||
|
|
||||||
|
let (sync_tx, rx) = std::sync::mpsc::sync_channel(1);
|
||||||
|
|
||||||
|
// use a separate thread for h2 client so it can be blocked.
|
||||||
|
std::thread::spawn(move || {
|
||||||
|
tokio::runtime::Builder::new_current_thread()
|
||||||
|
.enable_all()
|
||||||
|
.build()
|
||||||
|
.unwrap()
|
||||||
|
.block_on(async move {
|
||||||
|
let stream = tokio::net::TcpStream::connect(addr).await.unwrap();
|
||||||
|
|
||||||
|
let (mut tx, conn) = h2::client::handshake(stream).await.unwrap();
|
||||||
|
|
||||||
|
tokio::spawn(async move { conn.await.unwrap() });
|
||||||
|
|
||||||
|
let (res, _) = tx.send_request(::http::Request::new(()), true).unwrap();
|
||||||
|
let res = res.await.unwrap();
|
||||||
|
|
||||||
|
assert_eq!(res.status().as_u16(), 200);
|
||||||
|
|
||||||
|
sync_tx.send(()).unwrap();
|
||||||
|
|
||||||
|
// intentionally block the client thread so it can not answer ping pong.
|
||||||
|
std::thread::sleep(std::time::Duration::from_secs(1000));
|
||||||
|
})
|
||||||
|
});
|
||||||
|
|
||||||
|
rx.recv().unwrap();
|
||||||
|
|
||||||
|
let now = std::time::Instant::now();
|
||||||
|
|
||||||
|
// stop server gracefully. this step would take up to 30 seconds.
|
||||||
|
handle.stop(true).await;
|
||||||
|
|
||||||
|
// join server thread. only when connection are all gone this step would finish.
|
||||||
|
join.join().unwrap()?;
|
||||||
|
|
||||||
|
// check the time used for join server thread so it's known that the server shutdown
|
||||||
|
// is from keep alive and not server graceful shutdown timeout.
|
||||||
|
assert!(now.elapsed() < std::time::Duration::from_secs(30));
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
|
@ -22,10 +22,10 @@ edition = "2018"
|
||||||
default = []
|
default = []
|
||||||
|
|
||||||
# rustls
|
# rustls
|
||||||
rustls = ["tls-rustls", "actix-http/rustls"]
|
rustls = ["tls-rustls", "actix-http/rustls", "awc/rustls"]
|
||||||
|
|
||||||
# openssl
|
# openssl
|
||||||
openssl = ["tls-openssl", "actix-http/openssl"]
|
openssl = ["tls-openssl", "actix-http/openssl", "awc/openssl"]
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
actix-codec = "0.4.0"
|
actix-codec = "0.4.0"
|
||||||
|
|
|
@ -30,10 +30,10 @@ features = ["openssl", "rustls", "compress-brotli", "compress-gzip", "compress-z
|
||||||
default = ["compress-brotli", "compress-gzip", "compress-zstd", "cookies"]
|
default = ["compress-brotli", "compress-gzip", "compress-zstd", "cookies"]
|
||||||
|
|
||||||
# openssl
|
# openssl
|
||||||
openssl = ["tls-openssl", "actix-http/openssl"]
|
openssl = ["tls-openssl", "actix-tls/openssl"]
|
||||||
|
|
||||||
# rustls
|
# rustls
|
||||||
rustls = ["tls-rustls", "actix-http/rustls"]
|
rustls = ["tls-rustls", "actix-tls/rustls"]
|
||||||
|
|
||||||
# Brotli algorithm content-encoding support
|
# Brotli algorithm content-encoding support
|
||||||
compress-brotli = ["actix-http/compress-brotli", "__compress"]
|
compress-brotli = ["actix-http/compress-brotli", "__compress"]
|
||||||
|
@ -46,7 +46,7 @@ compress-zstd = ["actix-http/compress-zstd", "__compress"]
|
||||||
cookies = ["cookie"]
|
cookies = ["cookie"]
|
||||||
|
|
||||||
# trust-dns as dns resolver
|
# trust-dns as dns resolver
|
||||||
trust-dns = ["actix-http/trust-dns"]
|
trust-dns = ["trust-dns-resolver"]
|
||||||
|
|
||||||
# Internal (PRIVATE!) features used to aid testing and cheking feature status.
|
# Internal (PRIVATE!) features used to aid testing and cheking feature status.
|
||||||
# Don't rely on these whatsoever. They may disappear at anytime.
|
# Don't rely on these whatsoever. They may disappear at anytime.
|
||||||
|
@ -57,13 +57,18 @@ actix-codec = "0.4.0"
|
||||||
actix-service = "2.0.0"
|
actix-service = "2.0.0"
|
||||||
actix-http = "3.0.0-beta.11"
|
actix-http = "3.0.0-beta.11"
|
||||||
actix-rt = { version = "2.1", default-features = false }
|
actix-rt = { version = "2.1", default-features = false }
|
||||||
|
actix-tls = { version = "3.0.0-beta.7", features = ["connect"] }
|
||||||
|
actix-utils = "3.0.0"
|
||||||
|
|
||||||
|
ahash = "0.7"
|
||||||
base64 = "0.13"
|
base64 = "0.13"
|
||||||
bytes = "1"
|
bytes = "1"
|
||||||
cfg-if = "1"
|
cfg-if = "1"
|
||||||
cookie = { version = "0.15", features = ["percent-encode"], optional = true }
|
|
||||||
derive_more = "0.99.5"
|
derive_more = "0.99.5"
|
||||||
futures-core = { version = "0.3.7", default-features = false }
|
futures-core = { version = "0.3.7", default-features = false }
|
||||||
|
futures-util = { version = "0.3.7", default-features = false }
|
||||||
|
h2 = "0.3"
|
||||||
|
http = "0.2"
|
||||||
itoa = "0.4"
|
itoa = "0.4"
|
||||||
log =" 0.4"
|
log =" 0.4"
|
||||||
mime = "0.3"
|
mime = "0.3"
|
||||||
|
@ -73,9 +78,15 @@ rand = "0.8"
|
||||||
serde = "1.0"
|
serde = "1.0"
|
||||||
serde_json = "1.0"
|
serde_json = "1.0"
|
||||||
serde_urlencoded = "0.7"
|
serde_urlencoded = "0.7"
|
||||||
|
tokio = { version = "1", features = ["sync"] }
|
||||||
|
|
||||||
|
cookie = { version = "0.15", features = ["percent-encode"], optional = true }
|
||||||
|
|
||||||
tls-openssl = { package = "openssl", version = "0.10.9", optional = true }
|
tls-openssl = { package = "openssl", version = "0.10.9", optional = true }
|
||||||
tls-rustls = { package = "rustls", version = "0.20.0", optional = true, features = ["dangerous_configuration"] }
|
tls-rustls = { package = "rustls", version = "0.20.0", optional = true, features = ["dangerous_configuration"] }
|
||||||
|
|
||||||
|
trust-dns-resolver = { version = "0.20.0", optional = true }
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
actix-web = { version = "4.0.0-beta.10", features = ["openssl"] }
|
actix-web = { version = "4.0.0-beta.10", features = ["openssl"] }
|
||||||
actix-http = { version = "3.0.0-beta.11", features = ["openssl"] }
|
actix-http = { version = "3.0.0-beta.11", features = ["openssl"] }
|
||||||
|
|
|
@ -4,13 +4,11 @@ use std::net::IpAddr;
|
||||||
use std::rc::Rc;
|
use std::rc::Rc;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
use actix_http::{
|
use actix_http::http::{self, header, Error as HttpError, HeaderMap, HeaderName, Uri};
|
||||||
client::{Connector, ConnectorService, TcpConnect, TcpConnectError, TcpConnection},
|
|
||||||
http::{self, header, Error as HttpError, HeaderMap, HeaderName, Uri},
|
|
||||||
};
|
|
||||||
use actix_rt::net::{ActixStream, TcpStream};
|
use actix_rt::net::{ActixStream, TcpStream};
|
||||||
use actix_service::{boxed, Service};
|
use actix_service::{boxed, Service};
|
||||||
|
|
||||||
|
use crate::client::{Connector, ConnectorService, TcpConnect, TcpConnectError, TcpConnection};
|
||||||
use crate::connect::DefaultConnector;
|
use crate::connect::DefaultConnector;
|
||||||
use crate::error::SendRequestError;
|
use crate::error::SendRequestError;
|
||||||
use crate::middleware::{NestTransform, Redirect, Transform};
|
use crate::middleware::{NestTransform, Redirect, Transform};
|
||||||
|
|
|
@ -1,5 +1,4 @@
|
||||||
use std::net::IpAddr;
|
use std::{net::IpAddr, time::Duration};
|
||||||
use std::time::Duration;
|
|
||||||
|
|
||||||
const DEFAULT_H2_CONN_WINDOW: u32 = 1024 * 1024 * 2; // 2MB
|
const DEFAULT_H2_CONN_WINDOW: u32 = 1024 * 1024 * 2; // 2MB
|
||||||
const DEFAULT_H2_STREAM_WINDOW: u32 = 1024 * 1024; // 1MB
|
const DEFAULT_H2_STREAM_WINDOW: u32 = 1024 * 1024; // 1MB
|
|
@ -12,10 +12,9 @@ use bytes::Bytes;
|
||||||
use futures_core::future::LocalBoxFuture;
|
use futures_core::future::LocalBoxFuture;
|
||||||
use h2::client::SendRequest;
|
use h2::client::SendRequest;
|
||||||
|
|
||||||
use crate::h1::ClientCodec;
|
use actix_http::{
|
||||||
use crate::message::{RequestHeadType, ResponseHead};
|
body::MessageBody, h1::ClientCodec, Error, Payload, RequestHeadType, ResponseHead,
|
||||||
use crate::payload::Payload;
|
};
|
||||||
use crate::{body::MessageBody, Error};
|
|
||||||
|
|
||||||
use super::error::SendRequestError;
|
use super::error::SendRequestError;
|
||||||
use super::pool::Acquired;
|
use super::pool::Acquired;
|
||||||
|
@ -219,11 +218,7 @@ impl<Io: ConnectionIo> ConnectionType<Io> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(super) fn from_h1(
|
pub(super) fn from_h1(io: Io, created: time::Instant, acquired: Acquired<Io>) -> Self {
|
||||||
io: Io,
|
|
||||||
created: time::Instant,
|
|
||||||
acquired: Acquired<Io>,
|
|
||||||
) -> Self {
|
|
||||||
Self::H1(H1Connection {
|
Self::H1(H1Connection {
|
||||||
io: Some(io),
|
io: Some(io),
|
||||||
created,
|
created,
|
||||||
|
@ -271,9 +266,7 @@ where
|
||||||
Connection::Tls(ConnectionType::H2(conn)) => {
|
Connection::Tls(ConnectionType::H2(conn)) => {
|
||||||
h2proto::send_request(conn, head.into(), body).await
|
h2proto::send_request(conn, head.into(), body).await
|
||||||
}
|
}
|
||||||
_ => unreachable!(
|
_ => unreachable!("Plain Tcp connection can be used only in Http1 protocol"),
|
||||||
"Plain Tcp connection can be used only in Http1 protocol"
|
|
||||||
),
|
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
@ -301,9 +294,7 @@ where
|
||||||
Err(SendRequestError::TunnelNotSupported)
|
Err(SendRequestError::TunnelNotSupported)
|
||||||
}
|
}
|
||||||
Connection::Tcp(ConnectionType::H2(_)) => {
|
Connection::Tcp(ConnectionType::H2(_)) => {
|
||||||
unreachable!(
|
unreachable!("Plain Tcp connection can be used only in Http1 protocol")
|
||||||
"Plain Tcp connection can be used only in Http1 protocol"
|
|
||||||
)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
@ -321,12 +312,8 @@ where
|
||||||
buf: &mut ReadBuf<'_>,
|
buf: &mut ReadBuf<'_>,
|
||||||
) -> Poll<io::Result<()>> {
|
) -> Poll<io::Result<()>> {
|
||||||
match self.get_mut() {
|
match self.get_mut() {
|
||||||
Connection::Tcp(ConnectionType::H1(conn)) => {
|
Connection::Tcp(ConnectionType::H1(conn)) => Pin::new(conn).poll_read(cx, buf),
|
||||||
Pin::new(conn).poll_read(cx, buf)
|
Connection::Tls(ConnectionType::H1(conn)) => Pin::new(conn).poll_read(cx, buf),
|
||||||
}
|
|
||||||
Connection::Tls(ConnectionType::H1(conn)) => {
|
|
||||||
Pin::new(conn).poll_read(cx, buf)
|
|
||||||
}
|
|
||||||
_ => unreachable!("H2Connection can not impl AsyncRead trait"),
|
_ => unreachable!("H2Connection can not impl AsyncRead trait"),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -345,12 +332,8 @@ where
|
||||||
buf: &[u8],
|
buf: &[u8],
|
||||||
) -> Poll<io::Result<usize>> {
|
) -> Poll<io::Result<usize>> {
|
||||||
match self.get_mut() {
|
match self.get_mut() {
|
||||||
Connection::Tcp(ConnectionType::H1(conn)) => {
|
Connection::Tcp(ConnectionType::H1(conn)) => Pin::new(conn).poll_write(cx, buf),
|
||||||
Pin::new(conn).poll_write(cx, buf)
|
Connection::Tls(ConnectionType::H1(conn)) => Pin::new(conn).poll_write(cx, buf),
|
||||||
}
|
|
||||||
Connection::Tls(ConnectionType::H1(conn)) => {
|
|
||||||
Pin::new(conn).poll_write(cx, buf)
|
|
||||||
}
|
|
||||||
_ => unreachable!(H2_UNREACHABLE_WRITE),
|
_ => unreachable!(H2_UNREACHABLE_WRITE),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -363,17 +346,10 @@ where
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn poll_shutdown(
|
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
|
||||||
self: Pin<&mut Self>,
|
|
||||||
cx: &mut Context<'_>,
|
|
||||||
) -> Poll<io::Result<()>> {
|
|
||||||
match self.get_mut() {
|
match self.get_mut() {
|
||||||
Connection::Tcp(ConnectionType::H1(conn)) => {
|
Connection::Tcp(ConnectionType::H1(conn)) => Pin::new(conn).poll_shutdown(cx),
|
||||||
Pin::new(conn).poll_shutdown(cx)
|
Connection::Tls(ConnectionType::H1(conn)) => Pin::new(conn).poll_shutdown(cx),
|
||||||
}
|
|
||||||
Connection::Tls(ConnectionType::H1(conn)) => {
|
|
||||||
Pin::new(conn).poll_shutdown(cx)
|
|
||||||
}
|
|
||||||
_ => unreachable!(H2_UNREACHABLE_WRITE),
|
_ => unreachable!(H2_UNREACHABLE_WRITE),
|
||||||
}
|
}
|
||||||
}
|
}
|
|
@ -8,6 +8,7 @@ use std::{
|
||||||
time::Duration,
|
time::Duration,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
use actix_http::Protocol;
|
||||||
use actix_rt::{
|
use actix_rt::{
|
||||||
net::{ActixStream, TcpStream},
|
net::{ActixStream, TcpStream},
|
||||||
time::{sleep, Sleep},
|
time::{sleep, Sleep},
|
||||||
|
@ -19,14 +20,13 @@ use actix_tls::connect::{
|
||||||
};
|
};
|
||||||
use futures_core::{future::LocalBoxFuture, ready};
|
use futures_core::{future::LocalBoxFuture, ready};
|
||||||
use http::Uri;
|
use http::Uri;
|
||||||
use pin_project::pin_project;
|
use pin_project_lite::pin_project;
|
||||||
|
|
||||||
use super::config::ConnectorConfig;
|
use super::config::ConnectorConfig;
|
||||||
use super::connection::{Connection, ConnectionIo};
|
use super::connection::{Connection, ConnectionIo};
|
||||||
use super::error::ConnectError;
|
use super::error::ConnectError;
|
||||||
use super::pool::ConnectionPool;
|
use super::pool::ConnectionPool;
|
||||||
use super::Connect;
|
use super::Connect;
|
||||||
use super::Protocol;
|
|
||||||
|
|
||||||
enum SslConnector {
|
enum SslConnector {
|
||||||
#[allow(dead_code)]
|
#[allow(dead_code)]
|
||||||
|
@ -99,9 +99,7 @@ impl Connector<()> {
|
||||||
/// Build TLS connector with openssl, based on supplied ALPN protocols
|
/// Build TLS connector with openssl, based on supplied ALPN protocols
|
||||||
#[cfg(all(feature = "openssl", not(feature = "rustls")))]
|
#[cfg(all(feature = "openssl", not(feature = "rustls")))]
|
||||||
fn build_ssl(protocols: Vec<Vec<u8>>) -> SslConnector {
|
fn build_ssl(protocols: Vec<Vec<u8>>) -> SslConnector {
|
||||||
use actix_tls::connect::tls::openssl::{
|
use actix_tls::connect::tls::openssl::{SslConnector as OpensslConnector, SslMethod};
|
||||||
SslConnector as OpensslConnector, SslMethod,
|
|
||||||
};
|
|
||||||
use bytes::{BufMut, BytesMut};
|
use bytes::{BufMut, BytesMut};
|
||||||
|
|
||||||
let mut alpn = BytesMut::with_capacity(20);
|
let mut alpn = BytesMut::with_capacity(20);
|
||||||
|
@ -112,7 +110,7 @@ impl Connector<()> {
|
||||||
|
|
||||||
let mut ssl = OpensslConnector::builder(SslMethod::tls()).unwrap();
|
let mut ssl = OpensslConnector::builder(SslMethod::tls()).unwrap();
|
||||||
if let Err(err) = ssl.set_alpn_protos(&alpn) {
|
if let Err(err) = ssl.set_alpn_protos(&alpn) {
|
||||||
error!("Can not set ALPN protocol: {:?}", err);
|
log::error!("Can not set ALPN protocol: {:?}", err);
|
||||||
}
|
}
|
||||||
|
|
||||||
SslConnector::Openssl(ssl.build())
|
SslConnector::Openssl(ssl.build())
|
||||||
|
@ -148,11 +146,8 @@ where
|
||||||
// This remap is to hide ActixStream's trait methods. They are not meant to be called
|
// This remap is to hide ActixStream's trait methods. They are not meant to be called
|
||||||
// from user code.
|
// from user code.
|
||||||
Io: ActixStream + fmt::Debug + 'static,
|
Io: ActixStream + fmt::Debug + 'static,
|
||||||
S: Service<
|
S: Service<TcpConnect<Uri>, Response = TcpConnection<Uri, Io>, Error = TcpConnectError>
|
||||||
TcpConnect<Uri>,
|
+ Clone
|
||||||
Response = TcpConnection<Uri, Io>,
|
|
||||||
Error = TcpConnectError,
|
|
||||||
> + Clone
|
|
||||||
+ 'static,
|
+ 'static,
|
||||||
{
|
{
|
||||||
/// Tcp connection timeout, i.e. max time to connect to remote host including dns name
|
/// Tcp connection timeout, i.e. max time to connect to remote host including dns name
|
||||||
|
@ -171,10 +166,7 @@ where
|
||||||
|
|
||||||
#[cfg(feature = "openssl")]
|
#[cfg(feature = "openssl")]
|
||||||
/// Use custom `SslConnector` instance.
|
/// Use custom `SslConnector` instance.
|
||||||
pub fn ssl(
|
pub fn ssl(mut self, connector: actix_tls::connect::ssl::openssl::SslConnector) -> Self {
|
||||||
mut self,
|
|
||||||
connector: actix_tls::connect::ssl::openssl::SslConnector,
|
|
||||||
) -> Self {
|
|
||||||
self.ssl = SslConnector::Openssl(connector);
|
self.ssl = SslConnector::Openssl(connector);
|
||||||
self
|
self
|
||||||
}
|
}
|
||||||
|
@ -328,10 +320,11 @@ where
|
||||||
impl<Io: ConnectionIo> IntoConnectionIo for TcpConnection<Uri, TlsStream<Io>> {
|
impl<Io: ConnectionIo> IntoConnectionIo for TcpConnection<Uri, TlsStream<Io>> {
|
||||||
fn into_connection_io(self) -> (Box<dyn ConnectionIo>, Protocol) {
|
fn into_connection_io(self) -> (Box<dyn ConnectionIo>, Protocol) {
|
||||||
let sock = self.into_parts().0;
|
let sock = self.into_parts().0;
|
||||||
let h2 =
|
let h2 = sock
|
||||||
sock.get_ref().1.alpn_protocol().map_or(false, |protos| {
|
.get_ref()
|
||||||
protos.windows(2).any(|w| w == H2)
|
.1
|
||||||
});
|
.alpn_protocol()
|
||||||
|
.map_or(false, |protos| protos.windows(2).any(|w| w == H2));
|
||||||
if h2 {
|
if h2 {
|
||||||
(Box::new(sock), Protocol::Http2)
|
(Box::new(sock), Protocol::Http2)
|
||||||
} else {
|
} else {
|
||||||
|
@ -357,8 +350,8 @@ where
|
||||||
let tcp_pool = ConnectionPool::new(tcp_service, tcp_config);
|
let tcp_pool = ConnectionPool::new(tcp_service, tcp_config);
|
||||||
|
|
||||||
let tls_config = self.config;
|
let tls_config = self.config;
|
||||||
let tls_pool = tls_service
|
let tls_pool =
|
||||||
.map(move |tls_service| ConnectionPool::new(tls_service, tls_config));
|
tls_service.map(move |tls_service| ConnectionPool::new(tls_service, tls_config));
|
||||||
|
|
||||||
ConnectorServicePriv { tcp_pool, tls_pool }
|
ConnectorServicePriv { tcp_pool, tls_pool }
|
||||||
}
|
}
|
||||||
|
@ -389,10 +382,12 @@ where
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[pin_project]
|
pin_project! {
|
||||||
pub struct TcpConnectorFuture<Fut> {
|
#[project = TcpConnectorFutureProj]
|
||||||
#[pin]
|
pub struct TcpConnectorFuture<Fut> {
|
||||||
fut: Fut,
|
#[pin]
|
||||||
|
fut: Fut,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<Fut, Io> Future for TcpConnectorFuture<Fut>
|
impl<Fut, Io> Future for TcpConnectorFuture<Fut>
|
||||||
|
@ -451,23 +446,25 @@ where
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[pin_project(project = TlsConnectorProj)]
|
pin_project! {
|
||||||
#[allow(clippy::large_enum_variant)]
|
#[project = TlsConnectorProj]
|
||||||
enum TlsConnectorFuture<S, Fut1, Fut2> {
|
#[allow(clippy::large_enum_variant)]
|
||||||
TcpConnect {
|
enum TlsConnectorFuture<S, Fut1, Fut2> {
|
||||||
#[pin]
|
TcpConnect {
|
||||||
fut: Fut1,
|
#[pin]
|
||||||
tls_service: Option<S>,
|
fut: Fut1,
|
||||||
timeout: Duration,
|
tls_service: Option<S>,
|
||||||
},
|
timeout: Duration,
|
||||||
TlsConnect {
|
},
|
||||||
#[pin]
|
TlsConnect {
|
||||||
fut: Fut2,
|
#[pin]
|
||||||
#[pin]
|
fut: Fut2,
|
||||||
timeout: Sleep,
|
#[pin]
|
||||||
},
|
timeout: Sleep,
|
||||||
}
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
/// helper trait for generic over different TlsStream types between tls crates.
|
/// helper trait for generic over different TlsStream types between tls crates.
|
||||||
trait IntoConnectionIo {
|
trait IntoConnectionIo {
|
||||||
fn into_connection_io(self) -> (Box<dyn ConnectionIo>, Protocol);
|
fn into_connection_io(self) -> (Box<dyn ConnectionIo>, Protocol);
|
||||||
|
@ -475,12 +472,7 @@ trait IntoConnectionIo {
|
||||||
|
|
||||||
impl<S, Io, Fut1, Fut2, Res> Future for TlsConnectorFuture<S, Fut1, Fut2>
|
impl<S, Io, Fut1, Fut2, Res> Future for TlsConnectorFuture<S, Fut1, Fut2>
|
||||||
where
|
where
|
||||||
S: Service<
|
S: Service<TcpConnection<Uri, Io>, Response = Res, Error = std::io::Error, Future = Fut2>,
|
||||||
TcpConnection<Uri, Io>,
|
|
||||||
Response = Res,
|
|
||||||
Error = std::io::Error,
|
|
||||||
Future = Fut2,
|
|
||||||
>,
|
|
||||||
S::Response: IntoConnectionIo,
|
S::Response: IntoConnectionIo,
|
||||||
Fut1: Future<Output = Result<TcpConnection<Uri, Io>, ConnectError>>,
|
Fut1: Future<Output = Result<TcpConnection<Uri, Io>, ConnectError>>,
|
||||||
Fut2: Future<Output = Result<S::Response, S::Error>>,
|
Fut2: Future<Output = Result<S::Response, S::Error>>,
|
||||||
|
@ -522,11 +514,7 @@ pub struct TcpConnectorInnerService<S: Clone> {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<S: Clone> TcpConnectorInnerService<S> {
|
impl<S: Clone> TcpConnectorInnerService<S> {
|
||||||
fn new(
|
fn new(service: S, timeout: Duration, local_address: Option<std::net::IpAddr>) -> Self {
|
||||||
service: S,
|
|
||||||
timeout: Duration,
|
|
||||||
local_address: Option<std::net::IpAddr>,
|
|
||||||
) -> Self {
|
|
||||||
Self {
|
Self {
|
||||||
service,
|
service,
|
||||||
timeout,
|
timeout,
|
||||||
|
@ -537,11 +525,8 @@ impl<S: Clone> TcpConnectorInnerService<S> {
|
||||||
|
|
||||||
impl<S, Io> Service<Connect> for TcpConnectorInnerService<S>
|
impl<S, Io> Service<Connect> for TcpConnectorInnerService<S>
|
||||||
where
|
where
|
||||||
S: Service<
|
S: Service<TcpConnect<Uri>, Response = TcpConnection<Uri, Io>, Error = TcpConnectError>
|
||||||
TcpConnect<Uri>,
|
+ Clone
|
||||||
Response = TcpConnection<Uri, Io>,
|
|
||||||
Error = TcpConnectError,
|
|
||||||
> + Clone
|
|
||||||
+ 'static,
|
+ 'static,
|
||||||
{
|
{
|
||||||
type Response = S::Response;
|
type Response = S::Response;
|
||||||
|
@ -564,12 +549,14 @@ where
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[pin_project]
|
pin_project! {
|
||||||
pub struct TcpConnectorInnerFuture<Fut> {
|
#[project = TcpConnectorInnerFutureProj]
|
||||||
#[pin]
|
pub struct TcpConnectorInnerFuture<Fut> {
|
||||||
fut: Fut,
|
#[pin]
|
||||||
#[pin]
|
fut: Fut,
|
||||||
timeout: Sleep,
|
#[pin]
|
||||||
|
timeout: Sleep,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<Fut, Io> Future for TcpConnectorInnerFuture<Fut>
|
impl<Fut, Io> Future for TcpConnectorInnerFuture<Fut>
|
||||||
|
@ -618,12 +605,8 @@ where
|
||||||
|
|
||||||
impl<S1, S2, Io1, Io2> Service<Connect> for ConnectorServicePriv<S1, S2, Io1, Io2>
|
impl<S1, S2, Io1, Io2> Service<Connect> for ConnectorServicePriv<S1, S2, Io1, Io2>
|
||||||
where
|
where
|
||||||
S1: Service<Connect, Response = (Io1, Protocol), Error = ConnectError>
|
S1: Service<Connect, Response = (Io1, Protocol), Error = ConnectError> + Clone + 'static,
|
||||||
+ Clone
|
S2: Service<Connect, Response = (Io2, Protocol), Error = ConnectError> + Clone + 'static,
|
||||||
+ 'static,
|
|
||||||
S2: Service<Connect, Response = (Io2, Protocol), Error = ConnectError>
|
|
||||||
+ Clone
|
|
||||||
+ 'static,
|
|
||||||
Io1: ConnectionIo,
|
Io1: ConnectionIo,
|
||||||
Io2: ConnectionIo,
|
Io2: ConnectionIo,
|
||||||
{
|
{
|
||||||
|
@ -643,38 +626,46 @@ where
|
||||||
match req.uri.scheme_str() {
|
match req.uri.scheme_str() {
|
||||||
Some("https") | Some("wss") => match self.tls_pool {
|
Some("https") | Some("wss") => match self.tls_pool {
|
||||||
None => ConnectorServiceFuture::SslIsNotSupported,
|
None => ConnectorServiceFuture::SslIsNotSupported,
|
||||||
Some(ref pool) => ConnectorServiceFuture::Tls(pool.call(req)),
|
Some(ref pool) => ConnectorServiceFuture::Tls {
|
||||||
|
fut: pool.call(req),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
_ => ConnectorServiceFuture::Tcp {
|
||||||
|
fut: self.tcp_pool.call(req),
|
||||||
},
|
},
|
||||||
_ => ConnectorServiceFuture::Tcp(self.tcp_pool.call(req)),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[pin_project(project = ConnectorServiceProj)]
|
pin_project! {
|
||||||
pub enum ConnectorServiceFuture<S1, S2, Io1, Io2>
|
#[project = ConnectorServiceFutureProj]
|
||||||
where
|
pub enum ConnectorServiceFuture<S1, S2, Io1, Io2>
|
||||||
S1: Service<Connect, Response = (Io1, Protocol), Error = ConnectError>
|
where
|
||||||
+ Clone
|
S1: Service<Connect, Response = (Io1, Protocol), Error = ConnectError>,
|
||||||
+ 'static,
|
S1: Clone,
|
||||||
S2: Service<Connect, Response = (Io2, Protocol), Error = ConnectError>
|
S1: 'static,
|
||||||
+ Clone
|
S2: Service<Connect, Response = (Io2, Protocol), Error = ConnectError>,
|
||||||
+ 'static,
|
S2: Clone,
|
||||||
Io1: ConnectionIo,
|
S2: 'static,
|
||||||
Io2: ConnectionIo,
|
Io1: ConnectionIo,
|
||||||
{
|
Io2: ConnectionIo,
|
||||||
Tcp(#[pin] <ConnectionPool<S1, Io1> as Service<Connect>>::Future),
|
{
|
||||||
Tls(#[pin] <ConnectionPool<S2, Io2> as Service<Connect>>::Future),
|
Tcp {
|
||||||
SslIsNotSupported,
|
#[pin]
|
||||||
|
fut: <ConnectionPool<S1, Io1> as Service<Connect>>::Future
|
||||||
|
},
|
||||||
|
Tls {
|
||||||
|
#[pin]
|
||||||
|
fut: <ConnectionPool<S2, Io2> as Service<Connect>>::Future
|
||||||
|
},
|
||||||
|
SslIsNotSupported
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<S1, S2, Io1, Io2> Future for ConnectorServiceFuture<S1, S2, Io1, Io2>
|
impl<S1, S2, Io1, Io2> Future for ConnectorServiceFuture<S1, S2, Io1, Io2>
|
||||||
where
|
where
|
||||||
S1: Service<Connect, Response = (Io1, Protocol), Error = ConnectError>
|
S1: Service<Connect, Response = (Io1, Protocol), Error = ConnectError> + Clone + 'static,
|
||||||
+ Clone
|
S2: Service<Connect, Response = (Io2, Protocol), Error = ConnectError> + Clone + 'static,
|
||||||
+ 'static,
|
|
||||||
S2: Service<Connect, Response = (Io2, Protocol), Error = ConnectError>
|
|
||||||
+ Clone
|
|
||||||
+ 'static,
|
|
||||||
Io1: ConnectionIo,
|
Io1: ConnectionIo,
|
||||||
Io2: ConnectionIo,
|
Io2: ConnectionIo,
|
||||||
{
|
{
|
||||||
|
@ -682,9 +673,9 @@ where
|
||||||
|
|
||||||
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||||
match self.project() {
|
match self.project() {
|
||||||
ConnectorServiceProj::Tcp(fut) => fut.poll(cx).map_ok(Connection::Tcp),
|
ConnectorServiceFutureProj::Tcp { fut } => fut.poll(cx).map_ok(Connection::Tcp),
|
||||||
ConnectorServiceProj::Tls(fut) => fut.poll(cx).map_ok(Connection::Tls),
|
ConnectorServiceFutureProj::Tls { fut } => fut.poll(cx).map_ok(Connection::Tls),
|
||||||
ConnectorServiceProj::SslIsNotSupported => {
|
ConnectorServiceFutureProj::SslIsNotSupported => {
|
||||||
Poll::Ready(Err(ConnectError::SslIsNotSupported))
|
Poll::Ready(Err(ConnectError::SslIsNotSupported))
|
||||||
}
|
}
|
||||||
}
|
}
|
|
@ -2,12 +2,13 @@ use std::{error::Error as StdError, fmt, io};
|
||||||
|
|
||||||
use derive_more::{Display, From};
|
use derive_more::{Display, From};
|
||||||
|
|
||||||
|
use actix_http::{
|
||||||
|
error::{Error, ParseError},
|
||||||
|
http::Error as HttpError,
|
||||||
|
};
|
||||||
#[cfg(feature = "openssl")]
|
#[cfg(feature = "openssl")]
|
||||||
use actix_tls::accept::openssl::SslError;
|
use actix_tls::accept::openssl::SslError;
|
||||||
|
|
||||||
use crate::error::{Error, ParseError};
|
|
||||||
use crate::http::Error as HttpError;
|
|
||||||
|
|
||||||
/// A set of errors that can occur while connecting to an HTTP host
|
/// A set of errors that can occur while connecting to an HTTP host
|
||||||
#[derive(Debug, Display, From)]
|
#[derive(Debug, Display, From)]
|
||||||
#[non_exhaustive]
|
#[non_exhaustive]
|
|
@ -5,24 +5,25 @@ use std::{
|
||||||
};
|
};
|
||||||
|
|
||||||
use actix_codec::Framed;
|
use actix_codec::Framed;
|
||||||
|
use actix_http::{
|
||||||
|
body::{BodySize, MessageBody},
|
||||||
|
error::PayloadError,
|
||||||
|
h1,
|
||||||
|
http::{
|
||||||
|
header::{HeaderMap, IntoHeaderValue, EXPECT, HOST},
|
||||||
|
StatusCode,
|
||||||
|
},
|
||||||
|
Error, Payload, RequestHeadType, ResponseHead,
|
||||||
|
};
|
||||||
use actix_utils::future::poll_fn;
|
use actix_utils::future::poll_fn;
|
||||||
use bytes::buf::BufMut;
|
use bytes::buf::BufMut;
|
||||||
use bytes::{Bytes, BytesMut};
|
use bytes::{Bytes, BytesMut};
|
||||||
use futures_core::{ready, Stream};
|
use futures_core::{ready, Stream};
|
||||||
use futures_util::SinkExt as _;
|
use futures_util::SinkExt as _;
|
||||||
|
use pin_project_lite::pin_project;
|
||||||
use crate::h1;
|
|
||||||
use crate::http::{
|
|
||||||
header::{HeaderMap, IntoHeaderValue, EXPECT, HOST},
|
|
||||||
StatusCode,
|
|
||||||
};
|
|
||||||
use crate::message::{RequestHeadType, ResponseHead};
|
|
||||||
use crate::payload::Payload;
|
|
||||||
use crate::{error::PayloadError, Error};
|
|
||||||
|
|
||||||
use super::connection::{ConnectionIo, H1Connection};
|
use super::connection::{ConnectionIo, H1Connection};
|
||||||
use super::error::{ConnectError, SendRequestError};
|
use super::error::{ConnectError, SendRequestError};
|
||||||
use crate::body::{BodySize, MessageBody};
|
|
||||||
|
|
||||||
pub(crate) async fn send_request<Io, B>(
|
pub(crate) async fn send_request<Io, B>(
|
||||||
io: H1Connection<Io>,
|
io: H1Connection<Io>,
|
||||||
|
@ -194,10 +195,11 @@ where
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
#[pin_project::pin_project]
|
pin_project! {
|
||||||
pub(crate) struct PlStream<Io: ConnectionIo> {
|
pub(crate) struct PlStream<Io: ConnectionIo> {
|
||||||
#[pin]
|
#[pin]
|
||||||
framed: Framed<H1Connection<Io>, h1::ClientPayloadCodec>,
|
framed: Framed<H1Connection<Io>, h1::ClientPayloadCodec>,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<Io: ConnectionIo> PlStream<Io> {
|
impl<Io: ConnectionIo> PlStream<Io> {
|
||||||
|
@ -211,10 +213,7 @@ impl<Io: ConnectionIo> PlStream<Io> {
|
||||||
impl<Io: ConnectionIo> Stream for PlStream<Io> {
|
impl<Io: ConnectionIo> Stream for PlStream<Io> {
|
||||||
type Item = Result<Bytes, PayloadError>;
|
type Item = Result<Bytes, PayloadError>;
|
||||||
|
|
||||||
fn poll_next(
|
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||||
self: Pin<&mut Self>,
|
|
||||||
cx: &mut Context<'_>,
|
|
||||||
) -> Poll<Option<Self::Item>> {
|
|
||||||
let mut this = self.project();
|
let mut this = self.project();
|
||||||
|
|
||||||
match ready!(this.framed.as_mut().next_item(cx)?) {
|
match ready!(this.framed.as_mut().next_item(cx)?) {
|
|
@ -8,13 +8,12 @@ use h2::{
|
||||||
};
|
};
|
||||||
use http::header::{HeaderValue, CONNECTION, CONTENT_LENGTH, TRANSFER_ENCODING};
|
use http::header::{HeaderValue, CONNECTION, CONTENT_LENGTH, TRANSFER_ENCODING};
|
||||||
use http::{request::Request, Method, Version};
|
use http::{request::Request, Method, Version};
|
||||||
|
use log::trace;
|
||||||
|
|
||||||
use crate::{
|
use actix_http::{
|
||||||
body::{BodySize, MessageBody},
|
body::{BodySize, MessageBody},
|
||||||
header::HeaderMap,
|
header::HeaderMap,
|
||||||
message::{RequestHeadType, ResponseHead},
|
Error, Payload, RequestHeadType, ResponseHead,
|
||||||
payload::Payload,
|
|
||||||
Error,
|
|
||||||
};
|
};
|
||||||
|
|
||||||
use super::{
|
use super::{
|
||||||
|
@ -131,10 +130,7 @@ where
|
||||||
Ok((head, payload))
|
Ok((head, payload))
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn send_body<B>(
|
async fn send_body<B>(body: B, mut send: SendStream<Bytes>) -> Result<(), SendRequestError>
|
||||||
body: B,
|
|
||||||
mut send: SendStream<Bytes>,
|
|
||||||
) -> Result<(), SendRequestError>
|
|
||||||
where
|
where
|
||||||
B: MessageBody,
|
B: MessageBody,
|
||||||
B::Error: Into<Error>,
|
B::Error: Into<Error>,
|
||||||
|
@ -184,8 +180,7 @@ where
|
||||||
pub(crate) fn handshake<Io: ConnectionIo>(
|
pub(crate) fn handshake<Io: ConnectionIo>(
|
||||||
io: Io,
|
io: Io,
|
||||||
config: &ConnectorConfig,
|
config: &ConnectorConfig,
|
||||||
) -> impl Future<Output = Result<(SendRequest<Bytes>, Connection<Io, Bytes>), h2::Error>>
|
) -> impl Future<Output = Result<(SendRequest<Bytes>, Connection<Io, Bytes>), h2::Error>> {
|
||||||
{
|
|
||||||
let mut builder = Builder::new();
|
let mut builder = Builder::new();
|
||||||
builder
|
builder
|
||||||
.initial_window_size(config.stream_window_size)
|
.initial_window_size(config.stream_window_size)
|
|
@ -17,7 +17,6 @@ pub use actix_tls::connect::{
|
||||||
pub use self::connection::{Connection, ConnectionIo};
|
pub use self::connection::{Connection, ConnectionIo};
|
||||||
pub use self::connector::{Connector, ConnectorService};
|
pub use self::connector::{Connector, ConnectorService};
|
||||||
pub use self::error::{ConnectError, FreezeRequestError, InvalidUrl, SendRequestError};
|
pub use self::error::{ConnectError, FreezeRequestError, InvalidUrl, SendRequestError};
|
||||||
pub use crate::Protocol;
|
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct Connect {
|
pub struct Connect {
|
|
@ -14,22 +14,21 @@ use std::{
|
||||||
};
|
};
|
||||||
|
|
||||||
use actix_codec::{AsyncRead, AsyncWrite, ReadBuf};
|
use actix_codec::{AsyncRead, AsyncWrite, ReadBuf};
|
||||||
|
use actix_http::Protocol;
|
||||||
use actix_rt::time::{sleep, Sleep};
|
use actix_rt::time::{sleep, Sleep};
|
||||||
use actix_service::Service;
|
use actix_service::Service;
|
||||||
use ahash::AHashMap;
|
use ahash::AHashMap;
|
||||||
use futures_core::future::LocalBoxFuture;
|
use futures_core::future::LocalBoxFuture;
|
||||||
|
use futures_util::FutureExt;
|
||||||
use http::uri::Authority;
|
use http::uri::Authority;
|
||||||
use pin_project::pin_project;
|
use pin_project_lite::pin_project;
|
||||||
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
|
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
|
||||||
|
|
||||||
use super::config::ConnectorConfig;
|
use super::config::ConnectorConfig;
|
||||||
use super::connection::{
|
use super::connection::{ConnectionInnerType, ConnectionIo, ConnectionType, H2ConnectionInner};
|
||||||
ConnectionInnerType, ConnectionIo, ConnectionType, H2ConnectionInner,
|
|
||||||
};
|
|
||||||
use super::error::ConnectError;
|
use super::error::ConnectError;
|
||||||
use super::h2proto::handshake;
|
use super::h2proto::handshake;
|
||||||
use super::Connect;
|
use super::Connect;
|
||||||
use super::Protocol;
|
|
||||||
|
|
||||||
#[derive(Hash, Eq, PartialEq, Clone, Debug)]
|
#[derive(Hash, Eq, PartialEq, Clone, Debug)]
|
||||||
pub struct Key {
|
pub struct Key {
|
||||||
|
@ -152,9 +151,7 @@ where
|
||||||
|
|
||||||
impl<S, Io> Service<Connect> for ConnectionPool<S, Io>
|
impl<S, Io> Service<Connect> for ConnectionPool<S, Io>
|
||||||
where
|
where
|
||||||
S: Service<Connect, Response = (Io, Protocol), Error = ConnectError>
|
S: Service<Connect, Response = (Io, Protocol), Error = ConnectError> + Clone + 'static,
|
||||||
+ Clone
|
|
||||||
+ 'static,
|
|
||||||
Io: ConnectionIo,
|
Io: ConnectionIo,
|
||||||
{
|
{
|
||||||
type Response = ConnectionType<Io>;
|
type Response = ConnectionType<Io>;
|
||||||
|
@ -195,8 +192,8 @@ where
|
||||||
let config = &inner.config;
|
let config = &inner.config;
|
||||||
let idle_dur = now - c.used;
|
let idle_dur = now - c.used;
|
||||||
let age = now - c.created;
|
let age = now - c.created;
|
||||||
let conn_ineligible = idle_dur > config.conn_keep_alive
|
let conn_ineligible =
|
||||||
|| age > config.conn_lifetime;
|
idle_dur > config.conn_keep_alive || age > config.conn_lifetime;
|
||||||
|
|
||||||
if conn_ineligible {
|
if conn_ineligible {
|
||||||
// drop connections that are too old
|
// drop connections that are too old
|
||||||
|
@ -205,7 +202,7 @@ where
|
||||||
// check if the connection is still usable
|
// check if the connection is still usable
|
||||||
if let ConnectionInnerType::H1(ref mut io) = c.conn {
|
if let ConnectionInnerType::H1(ref mut io) = c.conn {
|
||||||
let check = ConnectionCheckFuture { io };
|
let check = ConnectionCheckFuture { io };
|
||||||
match check.await {
|
match check.now_or_never().expect("ConnectionCheckFuture must never yield with Poll::Pending.") {
|
||||||
ConnectionState::Tainted => {
|
ConnectionState::Tainted => {
|
||||||
inner.close(c.conn);
|
inner.close(c.conn);
|
||||||
continue;
|
continue;
|
||||||
|
@ -231,9 +228,7 @@ where
|
||||||
|
|
||||||
// match the connection and spawn new one if did not get anything.
|
// match the connection and spawn new one if did not get anything.
|
||||||
match conn {
|
match conn {
|
||||||
Some(conn) => {
|
Some(conn) => Ok(ConnectionType::from_pool(conn.conn, conn.created, acquired)),
|
||||||
Ok(ConnectionType::from_pool(conn.conn, conn.created, acquired))
|
|
||||||
}
|
|
||||||
None => {
|
None => {
|
||||||
let (io, proto) = connector.call(req).await?;
|
let (io, proto) = connector.call(req).await?;
|
||||||
|
|
||||||
|
@ -284,9 +279,7 @@ where
|
||||||
let mut read_buf = ReadBuf::new(&mut buf);
|
let mut read_buf = ReadBuf::new(&mut buf);
|
||||||
|
|
||||||
let state = match Pin::new(&mut this.io).poll_read(cx, &mut read_buf) {
|
let state = match Pin::new(&mut this.io).poll_read(cx, &mut read_buf) {
|
||||||
Poll::Ready(Ok(())) if !read_buf.filled().is_empty() => {
|
Poll::Ready(Ok(())) if !read_buf.filled().is_empty() => ConnectionState::Tainted,
|
||||||
ConnectionState::Tainted
|
|
||||||
}
|
|
||||||
|
|
||||||
Poll::Pending => ConnectionState::Live,
|
Poll::Pending => ConnectionState::Live,
|
||||||
_ => ConnectionState::Skip,
|
_ => ConnectionState::Skip,
|
||||||
|
@ -302,11 +295,13 @@ struct PooledConnection<Io> {
|
||||||
created: Instant,
|
created: Instant,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[pin_project]
|
pin_project! {
|
||||||
struct CloseConnection<Io> {
|
#[project = CloseConnectionProj]
|
||||||
io: Io,
|
struct CloseConnection<Io> {
|
||||||
#[pin]
|
io: Io,
|
||||||
timeout: Sleep,
|
#[pin]
|
||||||
|
timeout: Sleep,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<Io> CloseConnection<Io>
|
impl<Io> CloseConnection<Io>
|
||||||
|
@ -413,17 +408,11 @@ mod test {
|
||||||
unimplemented!()
|
unimplemented!()
|
||||||
}
|
}
|
||||||
|
|
||||||
fn poll_flush(
|
fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
|
||||||
self: Pin<&mut Self>,
|
|
||||||
_: &mut Context<'_>,
|
|
||||||
) -> Poll<io::Result<()>> {
|
|
||||||
unimplemented!()
|
unimplemented!()
|
||||||
}
|
}
|
||||||
|
|
||||||
fn poll_shutdown(
|
fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
|
||||||
self: Pin<&mut Self>,
|
|
||||||
_: &mut Context<'_>,
|
|
||||||
) -> Poll<io::Result<()>> {
|
|
||||||
Poll::Ready(Ok(()))
|
Poll::Ready(Ok(()))
|
||||||
}
|
}
|
||||||
}
|
}
|
|
@ -8,16 +8,14 @@ use std::{
|
||||||
|
|
||||||
use actix_codec::Framed;
|
use actix_codec::Framed;
|
||||||
use actix_http::{
|
use actix_http::{
|
||||||
body::Body,
|
body::Body, h1::ClientCodec, Payload, RequestHead, RequestHeadType, ResponseHead,
|
||||||
client::{
|
|
||||||
Connect as ClientConnect, ConnectError, Connection, ConnectionIo, SendRequestError,
|
|
||||||
},
|
|
||||||
h1::ClientCodec,
|
|
||||||
Payload, RequestHead, RequestHeadType, ResponseHead,
|
|
||||||
};
|
};
|
||||||
use actix_service::Service;
|
use actix_service::Service;
|
||||||
use futures_core::{future::LocalBoxFuture, ready};
|
use futures_core::{future::LocalBoxFuture, ready};
|
||||||
|
|
||||||
|
use crate::client::{
|
||||||
|
Connect as ClientConnect, ConnectError, Connection, ConnectionIo, SendRequestError,
|
||||||
|
};
|
||||||
use crate::response::ClientResponse;
|
use crate::response::ClientResponse;
|
||||||
|
|
||||||
pub type BoxConnectorService = Rc<
|
pub type BoxConnectorService = Rc<
|
||||||
|
|
|
@ -1,15 +1,15 @@
|
||||||
//! HTTP client errors
|
//! HTTP client errors
|
||||||
|
|
||||||
pub use actix_http::client::{ConnectError, FreezeRequestError, InvalidUrl, SendRequestError};
|
pub use actix_http::{
|
||||||
pub use actix_http::error::PayloadError;
|
error::PayloadError,
|
||||||
pub use actix_http::http::Error as HttpError;
|
http::{header::HeaderValue, Error as HttpError, StatusCode},
|
||||||
pub use actix_http::ws::HandshakeError as WsHandshakeError;
|
ws::{HandshakeError as WsHandshakeError, ProtocolError as WsProtocolError},
|
||||||
pub use actix_http::ws::ProtocolError as WsProtocolError;
|
};
|
||||||
|
|
||||||
|
use derive_more::{Display, From};
|
||||||
use serde_json::error::Error as JsonError;
|
use serde_json::error::Error as JsonError;
|
||||||
|
|
||||||
use actix_http::http::{header::HeaderValue, StatusCode};
|
pub use crate::client::{ConnectError, FreezeRequestError, InvalidUrl, SendRequestError};
|
||||||
use derive_more::{Display, From};
|
|
||||||
|
|
||||||
/// Websocket client error
|
/// Websocket client error
|
||||||
#[derive(Debug, Display, From)]
|
#[derive(Debug, Display, From)]
|
||||||
|
|
|
@ -104,22 +104,8 @@
|
||||||
#![doc(html_logo_url = "https://actix.rs/img/logo.png")]
|
#![doc(html_logo_url = "https://actix.rs/img/logo.png")]
|
||||||
#![doc(html_favicon_url = "https://actix.rs/favicon.ico")]
|
#![doc(html_favicon_url = "https://actix.rs/favicon.ico")]
|
||||||
|
|
||||||
use std::{convert::TryFrom, rc::Rc, time::Duration};
|
|
||||||
|
|
||||||
#[cfg(feature = "cookies")]
|
|
||||||
pub use cookie;
|
|
||||||
|
|
||||||
pub use actix_http::{client::Connector, http};
|
|
||||||
|
|
||||||
use actix_http::{
|
|
||||||
client::{TcpConnect, TcpConnectError, TcpConnection},
|
|
||||||
http::{Error as HttpError, HeaderMap, Method, Uri},
|
|
||||||
RequestHead,
|
|
||||||
};
|
|
||||||
use actix_rt::net::TcpStream;
|
|
||||||
use actix_service::Service;
|
|
||||||
|
|
||||||
mod builder;
|
mod builder;
|
||||||
|
mod client;
|
||||||
mod connect;
|
mod connect;
|
||||||
pub mod error;
|
pub mod error;
|
||||||
mod frozen;
|
mod frozen;
|
||||||
|
@ -130,13 +116,29 @@ mod sender;
|
||||||
pub mod test;
|
pub mod test;
|
||||||
pub mod ws;
|
pub mod ws;
|
||||||
|
|
||||||
|
pub use actix_http::http;
|
||||||
|
#[cfg(feature = "cookies")]
|
||||||
|
pub use cookie;
|
||||||
|
|
||||||
pub use self::builder::ClientBuilder;
|
pub use self::builder::ClientBuilder;
|
||||||
|
pub use self::client::Connector;
|
||||||
pub use self::connect::{BoxConnectorService, BoxedSocket, ConnectRequest, ConnectResponse};
|
pub use self::connect::{BoxConnectorService, BoxedSocket, ConnectRequest, ConnectResponse};
|
||||||
pub use self::frozen::{FrozenClientRequest, FrozenSendBuilder};
|
pub use self::frozen::{FrozenClientRequest, FrozenSendBuilder};
|
||||||
pub use self::request::ClientRequest;
|
pub use self::request::ClientRequest;
|
||||||
pub use self::response::{ClientResponse, JsonBody, MessageBody};
|
pub use self::response::{ClientResponse, JsonBody, MessageBody};
|
||||||
pub use self::sender::SendClientRequest;
|
pub use self::sender::SendClientRequest;
|
||||||
|
|
||||||
|
use std::{convert::TryFrom, rc::Rc, time::Duration};
|
||||||
|
|
||||||
|
use actix_http::{
|
||||||
|
http::{Error as HttpError, HeaderMap, Method, Uri},
|
||||||
|
RequestHead,
|
||||||
|
};
|
||||||
|
use actix_rt::net::TcpStream;
|
||||||
|
use actix_service::Service;
|
||||||
|
|
||||||
|
use self::client::{TcpConnect, TcpConnectError, TcpConnection};
|
||||||
|
|
||||||
/// An asynchronous HTTP and WebSocket client.
|
/// An asynchronous HTTP and WebSocket client.
|
||||||
///
|
///
|
||||||
/// You should take care to create, at most, one `Client` per thread. Otherwise, expect higher CPU
|
/// You should take care to create, at most, one `Client` per thread. Otherwise, expect higher CPU
|
||||||
|
|
|
@ -9,7 +9,6 @@ use std::{
|
||||||
|
|
||||||
use actix_http::{
|
use actix_http::{
|
||||||
body::Body,
|
body::Body,
|
||||||
client::{InvalidUrl, SendRequestError},
|
|
||||||
http::{header, Method, StatusCode, Uri},
|
http::{header, Method, StatusCode, Uri},
|
||||||
RequestHead, RequestHeadType,
|
RequestHead, RequestHeadType,
|
||||||
};
|
};
|
||||||
|
@ -19,6 +18,7 @@ use futures_core::ready;
|
||||||
|
|
||||||
use super::Transform;
|
use super::Transform;
|
||||||
|
|
||||||
|
use crate::client::{InvalidUrl, SendRequestError};
|
||||||
use crate::connect::{ConnectRequest, ConnectResponse};
|
use crate::connect::{ConnectRequest, ConnectResponse};
|
||||||
use crate::ClientResponse;
|
use crate::ClientResponse;
|
||||||
|
|
||||||
|
|
|
@ -795,17 +795,15 @@ async fn client_unread_response() {
|
||||||
let lst = std::net::TcpListener::bind(addr).unwrap();
|
let lst = std::net::TcpListener::bind(addr).unwrap();
|
||||||
|
|
||||||
std::thread::spawn(move || {
|
std::thread::spawn(move || {
|
||||||
for stream in lst.incoming() {
|
let (mut stream, _) = lst.accept().unwrap();
|
||||||
let mut stream = stream.unwrap();
|
let mut b = [0; 1000];
|
||||||
let mut b = [0; 1000];
|
let _ = stream.read(&mut b).unwrap();
|
||||||
let _ = stream.read(&mut b).unwrap();
|
let _ = stream.write_all(
|
||||||
let _ = stream.write_all(
|
b"HTTP/1.1 200 OK\r\n\
|
||||||
b"HTTP/1.1 200 OK\r\n\
|
|
||||||
connection: close\r\n\
|
connection: close\r\n\
|
||||||
\r\n\
|
\r\n\
|
||||||
welcome!",
|
welcome!",
|
||||||
);
|
);
|
||||||
}
|
|
||||||
});
|
});
|
||||||
|
|
||||||
// client request
|
// client request
|
||||||
|
|
|
@ -137,7 +137,7 @@ impl<T: ?Sized + 'static> FromRequest for Data<T> {
|
||||||
type_name::<T>(),
|
type_name::<T>(),
|
||||||
);
|
);
|
||||||
err(ErrorInternalServerError(
|
err(ErrorInternalServerError(
|
||||||
"App data is not configured, to configure use App::data()",
|
"App data is not configured, to configure construct it with web::Data::new() and pass it to App::app_data()",
|
||||||
))
|
))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -60,52 +60,53 @@ crate::http::header::common_header! {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ContentType {
|
impl ContentType {
|
||||||
/// A constructor to easily create a `Content-Type: application/json`
|
/// A constructor to easily create a `Content-Type: application/json`
|
||||||
/// header.
|
/// header.
|
||||||
#[inline]
|
#[inline]
|
||||||
pub fn json() -> ContentType {
|
pub fn json() -> ContentType {
|
||||||
ContentType(mime::APPLICATION_JSON)
|
ContentType(mime::APPLICATION_JSON)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// A constructor to easily create a `Content-Type: text/plain;
|
/// A constructor to easily create a `Content-Type: text/plain;
|
||||||
/// charset=utf-8` header.
|
/// charset=utf-8` header.
|
||||||
#[inline]
|
#[inline]
|
||||||
pub fn plaintext() -> ContentType {
|
pub fn plaintext() -> ContentType {
|
||||||
ContentType(mime::TEXT_PLAIN_UTF_8)
|
ContentType(mime::TEXT_PLAIN_UTF_8)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// A constructor to easily create a `Content-Type: text/html` header.
|
/// A constructor to easily create a `Content-Type: text/html; charset=utf-8`
|
||||||
|
/// header.
|
||||||
#[inline]
|
#[inline]
|
||||||
pub fn html() -> ContentType {
|
pub fn html() -> ContentType {
|
||||||
ContentType(mime::TEXT_HTML)
|
ContentType(mime::TEXT_HTML_UTF_8)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// A constructor to easily create a `Content-Type: text/xml` header.
|
/// A constructor to easily create a `Content-Type: text/xml` header.
|
||||||
#[inline]
|
#[inline]
|
||||||
pub fn xml() -> ContentType {
|
pub fn xml() -> ContentType {
|
||||||
ContentType(mime::TEXT_XML)
|
ContentType(mime::TEXT_XML)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// A constructor to easily create a `Content-Type:
|
/// A constructor to easily create a `Content-Type:
|
||||||
/// application/www-form-url-encoded` header.
|
/// application/www-form-url-encoded` header.
|
||||||
#[inline]
|
#[inline]
|
||||||
pub fn form_url_encoded() -> ContentType {
|
pub fn form_url_encoded() -> ContentType {
|
||||||
ContentType(mime::APPLICATION_WWW_FORM_URLENCODED)
|
ContentType(mime::APPLICATION_WWW_FORM_URLENCODED)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// A constructor to easily create a `Content-Type: image/jpeg` header.
|
/// A constructor to easily create a `Content-Type: image/jpeg` header.
|
||||||
#[inline]
|
#[inline]
|
||||||
pub fn jpeg() -> ContentType {
|
pub fn jpeg() -> ContentType {
|
||||||
ContentType(mime::IMAGE_JPEG)
|
ContentType(mime::IMAGE_JPEG)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// A constructor to easily create a `Content-Type: image/png` header.
|
/// A constructor to easily create a `Content-Type: image/png` header.
|
||||||
#[inline]
|
#[inline]
|
||||||
pub fn png() -> ContentType {
|
pub fn png() -> ContentType {
|
||||||
ContentType(mime::IMAGE_PNG)
|
ContentType(mime::IMAGE_PNG)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// A constructor to easily create a `Content-Type:
|
/// A constructor to easily create a `Content-Type:
|
||||||
/// application/octet-stream` header.
|
/// application/octet-stream` header.
|
||||||
#[inline]
|
#[inline]
|
||||||
pub fn octet_stream() -> ContentType {
|
pub fn octet_stream() -> ContentType {
|
||||||
|
|
Loading…
Reference in New Issue