Added RequestHeaderWrapper

This commit is contained in:
Dmitry Pypin 2019-09-05 10:48:06 -07:00
parent abac24fea2
commit 7f20d58525
9 changed files with 537 additions and 241 deletions

View File

@ -9,7 +9,7 @@ use h2::client::SendRequest;
use crate::body::MessageBody;
use crate::h1::ClientCodec;
use crate::message::{RequestHead, ResponseHead};
use crate::message::{RequestHead, RequestHeadWrapper, ResponseHead};
use crate::payload::Payload;
use crate::header::HeaderMap;
@ -30,6 +30,13 @@ pub trait Connection {
/// Send request and body
fn send_request<B: MessageBody + 'static>(
self,
head: RequestHead,
body: B,
) -> Self::Future;
/// Send request, extra headers and body
fn send_request_extra<B: MessageBody + 'static>(
self,
head: Rc<RequestHead>,
extra_headers: Option<HeaderMap>,
@ -43,6 +50,11 @@ pub trait Connection {
/// Send request, returns Response and Framed
fn open_tunnel(self,
head: RequestHead,
) -> Self::TunnelFuture;
/// Send request and extra headers, returns Response and Framed
fn open_tunnel_extra(self,
head: Rc<RequestHead>,
extra_headers: Option<HeaderMap>,
) -> Self::TunnelFuture;
@ -112,6 +124,29 @@ where
}
fn send_request<B: MessageBody + 'static>(
mut self,
head: RequestHead,
body: B,
) -> Self::Future {
match self.io.take().unwrap() {
ConnectionType::H1(io) => Box::new(h1proto::send_request(
io,
RequestHeadWrapper::Owned(head),
body,
self.created,
self.pool,
)),
ConnectionType::H2(io) => Box::new(h2proto::send_request(
io,
RequestHeadWrapper::Owned(head),
body,
self.created,
self.pool,
)),
}
}
fn send_request_extra<B: MessageBody + 'static>(
mut self,
head: Rc<RequestHead>,
extra_headers: Option<HeaderMap>,
@ -120,16 +155,14 @@ where
match self.io.take().unwrap() {
ConnectionType::H1(io) => Box::new(h1proto::send_request(
io,
head,
extra_headers,
RequestHeadWrapper::Rc(head, extra_headers),
body,
self.created,
self.pool,
)),
ConnectionType::H2(io) => Box::new(h2proto::send_request(
io,
head,
extra_headers,
RequestHeadWrapper::Rc(head, extra_headers),
body,
self.created,
self.pool,
@ -148,10 +181,28 @@ where
>;
/// Send request, returns Response and Framed
fn open_tunnel(mut self, head: Rc<RequestHead>, extra_headers: Option<HeaderMap>) -> Self::TunnelFuture {
fn open_tunnel(mut self, head: RequestHead) -> Self::TunnelFuture {
match self.io.take().unwrap() {
ConnectionType::H1(io) => {
Either::A(Box::new(h1proto::open_tunnel(io, head, extra_headers)))
Either::A(Box::new(h1proto::open_tunnel(io, RequestHeadWrapper::Owned(head))))
}
ConnectionType::H2(io) => {
if let Some(mut pool) = self.pool.take() {
pool.release(IoConnection::new(
ConnectionType::H2(io),
self.created,
None,
));
}
Either::B(err(SendRequestError::TunnelNotSupported))
}
}
}
fn open_tunnel_extra(mut self, head: Rc<RequestHead>, extra_headers: Option<HeaderMap>) -> Self::TunnelFuture {
match self.io.take().unwrap() {
ConnectionType::H1(io) => {
Either::A(Box::new(h1proto::open_tunnel(io, RequestHeadWrapper::Rc(head, extra_headers))))
}
ConnectionType::H2(io) => {
if let Some(mut pool) = self.pool.take() {
@ -190,14 +241,25 @@ where
}
fn send_request<RB: MessageBody + 'static>(
self,
head: RequestHead,
body: RB,
) -> Self::Future {
match self {
EitherConnection::A(con) => con.send_request(head, body),
EitherConnection::B(con) => con.send_request(head, body),
}
}
fn send_request_extra<RB: MessageBody + 'static>(
self,
head: Rc<RequestHead>,
extra_headers: Option<HeaderMap>,
body: RB,
) -> Self::Future {
match self {
EitherConnection::A(con) => con.send_request(head, extra_headers, body),
EitherConnection::B(con) => con.send_request(head, extra_headers, body),
EitherConnection::A(con) => con.send_request_extra(head, extra_headers, body),
EitherConnection::B(con) => con.send_request_extra(head, extra_headers, body),
}
}
@ -209,14 +271,27 @@ where
>;
/// Send request, returns Response and Framed
fn open_tunnel(self, head: Rc<RequestHead>, extra_headers: Option<HeaderMap>) -> Self::TunnelFuture {
fn open_tunnel(self, head: RequestHead) -> Self::TunnelFuture {
match self {
EitherConnection::A(con) => Box::new(
con.open_tunnel(head, extra_headers)
con.open_tunnel(head)
.map(|(head, framed)| (head, framed.map_io(EitherIo::A))),
),
EitherConnection::B(con) => Box::new(
con.open_tunnel(head, extra_headers)
con.open_tunnel(head)
.map(|(head, framed)| (head, framed.map_io(EitherIo::B))),
),
}
}
fn open_tunnel_extra(self, head: Rc<RequestHead>, extra_headers: Option<HeaderMap>) -> Self::TunnelFuture {
match self {
EitherConnection::A(con) => Box::new(
con.open_tunnel_extra(head, extra_headers)
.map(|(head, framed)| (head, framed.map_io(EitherIo::A))),
),
EitherConnection::B(con) => Box::new(
con.open_tunnel_extra(head, extra_headers)
.map(|(head, framed)| (head, framed.map_io(EitherIo::B))),
),
}

View File

@ -1,6 +1,5 @@
use std::io::Write;
use std::{io, time};
use std::rc::Rc;
use actix_codec::{AsyncRead, AsyncWrite, Framed};
use bytes::{BufMut, Bytes, BytesMut};
@ -10,7 +9,7 @@ use futures::{Async, Future, Poll, Sink, Stream};
use crate::error::PayloadError;
use crate::h1;
use crate::http::header::{IntoHeaderValue, HOST};
use crate::message::{RequestHead, ResponseHead};
use crate::message::{RequestHeadWrapper, ResponseHead};
use crate::payload::{Payload, PayloadStream};
use crate::header::HeaderMap;
@ -21,8 +20,7 @@ use crate::body::{BodySize, MessageBody};
pub(crate) fn send_request<T, B>(
io: T,
head: Rc<RequestHead>,
extra_headers: Option<HeaderMap>,
head_wrapper: RequestHeadWrapper,
body: B,
created: time::Instant,
pool: Option<Acquired<T>>,
@ -32,33 +30,41 @@ where
B: MessageBody,
{
// set request host header
let extra_headers = if !head.headers.contains_key(HOST) && !extra_headers.iter().any(|h| h.contains_key(HOST)) {
if let Some(host) = head.uri.host() {
let head_wrapper = if !head_wrapper.as_ref().headers.contains_key(HOST) && !head_wrapper.extra_headers().iter().any(|h| h.contains_key(HOST)) {
if let Some(host) = head_wrapper.as_ref().uri.host() {
let mut wrt = BytesMut::with_capacity(host.len() + 5).writer();
let _ = match head.uri.port_u16() {
let _ = match head_wrapper.as_ref().uri.port_u16() {
None | Some(80) | Some(443) => write!(wrt, "{}", host),
Some(port) => write!(wrt, "{}:{}", host, port),
};
match wrt.get_mut().take().freeze().try_into() {
Ok(value) => {
match head_wrapper {
RequestHeadWrapper::Owned(mut head) => {
head.headers.insert(HOST, value);
RequestHeadWrapper::Owned(head)
},
RequestHeadWrapper::Rc(head, extra_headers) => {
let mut headers = extra_headers.unwrap_or(HeaderMap::new());
headers.insert(HOST, value);
Some(headers)
RequestHeadWrapper::Rc(head, Some(headers))
},
}
}
Err(e) => {
log::error!("Can not set HOST header {}", e);
extra_headers
head_wrapper
}
}
}
else {
extra_headers
head_wrapper
}
}
else {
extra_headers
head_wrapper
};
let io = H1Connection {
@ -69,9 +75,9 @@ where
let len = body.size();
// create Framed and send reqest
// create Framed and send request
Framed::new(io, h1::ClientCodec::default())
.send((head, extra_headers, len).into())
.send((head_wrapper, len).into())
.from_err()
// send request body
.and_then(move |framed| match body.size() {
@ -107,15 +113,14 @@ where
pub(crate) fn open_tunnel<T>(
io: T,
head: Rc<RequestHead>,
extra_headers: Option<HeaderMap>,
head_wrapper: RequestHeadWrapper,
) -> impl Future<Item = (ResponseHead, Framed<T, h1::ClientCodec>), Error = SendRequestError>
where
T: AsyncRead + AsyncWrite + 'static,
{
// create Framed and send reqest
// create Framed and send request
Framed::new(io, h1::ClientCodec::default())
.send((head, extra_headers, BodySize::None).into())
.send((head_wrapper, BodySize::None).into())
.from_err()
// read response
.and_then(|framed| {

View File

@ -1,5 +1,4 @@
use std::time;
use std::rc::Rc;
use actix_codec::{AsyncRead, AsyncWrite};
use bytes::Bytes;
@ -10,7 +9,7 @@ use http::header::{HeaderValue, CONNECTION, CONTENT_LENGTH, TRANSFER_ENCODING};
use http::{request::Request, HttpTryFrom, Method, Version};
use crate::body::{BodySize, MessageBody};
use crate::message::{RequestHead, ResponseHead};
use crate::message::{RequestHeadWrapper, ResponseHead};
use crate::payload::Payload;
use crate::header::HeaderMap;
@ -20,8 +19,7 @@ use super::pool::Acquired;
pub(crate) fn send_request<T, B>(
io: SendRequest<Bytes>,
head: Rc<RequestHead>,
extra_headers: Option<HeaderMap>,
head_wrapper: RequestHeadWrapper,
body: B,
created: time::Instant,
pool: Option<Acquired<T>>,
@ -30,8 +28,8 @@ where
T: AsyncRead + AsyncWrite + 'static,
B: MessageBody,
{
trace!("Sending client request: {:?} {:?}", head, body.size());
let head_req = head.method == Method::HEAD;
trace!("Sending client request: {:?} {:?}", head_wrapper, body.size());
let head_req = head_wrapper.as_ref().method == Method::HEAD;
let length = body.size();
let eof = match length {
BodySize::None | BodySize::Empty | BodySize::Sized(0) => true,
@ -42,8 +40,8 @@ where
.map_err(SendRequestError::from)
.and_then(move |mut io| {
let mut req = Request::new(());
*req.uri_mut() = head.uri.clone();
*req.method_mut() = head.method.clone();
*req.uri_mut() = head_wrapper.as_ref().uri.clone();
*req.method_mut() = head_wrapper.as_ref().method.clone();
*req.version_mut() = Version::HTTP_2;
let mut skip_len = true;
@ -69,9 +67,14 @@ where
),
};
// merging headers from head and extra headers. HeaderMap::new() does not allocate.
let extra_headers = extra_headers.unwrap_or(HeaderMap::new());
let headers = head.headers.iter()
// Extracting extra headers from RequestHeadWrapper. HeaderMap::new() does not allocate.
let (head_wrapper, extra_headers) = match head_wrapper {
RequestHeadWrapper::Owned(head) => (RequestHeadWrapper::Owned(head), HeaderMap::new()),
RequestHeadWrapper::Rc(head, extra_headers) => (RequestHeadWrapper::Rc(head, None), extra_headers.unwrap_or(HeaderMap::new())),
};
// merging headers from head and extra headers.
let headers = head_wrapper.as_ref().headers.iter()
.filter(|(name, _)| {
!extra_headers.contains_key(*name)
})

View File

@ -17,7 +17,7 @@ use crate::body::BodySize;
use crate::config::ServiceConfig;
use crate::error::{ParseError, PayloadError};
use crate::helpers;
use crate::message::{ConnectionType, Head, MessagePool, RequestHead, ResponseHead};
use crate::message::{ConnectionType, Head, MessagePool, RequestHead, RequestHeadWrapper, ResponseHead};
use crate::header::HeaderMap;
bitflags! {
@ -50,7 +50,7 @@ struct ClientCodecInner {
// encoder part
flags: Flags,
headers_size: u32,
encoder: encoder::MessageEncoder<(Rc<RequestHead>, Option<HeaderMap>)>,
encoder: encoder::MessageEncoder<RequestHeadWrapper>,
}
impl Default for ClientCodec {
@ -185,7 +185,7 @@ impl Decoder for ClientPayloadCodec {
}
impl Encoder for ClientCodec {
type Item = Message<(Rc<RequestHead>, Option<HeaderMap>, BodySize)>;
type Item = Message<(RequestHeadWrapper, BodySize)>;
type Error = io::Error;
fn encode(
@ -194,13 +194,13 @@ impl Encoder for ClientCodec {
dst: &mut BytesMut,
) -> Result<(), Self::Error> {
match item {
Message::Item((head, extra_headers, length)) => {
Message::Item((mut head_wrapper, length)) => {
let inner = &mut self.inner;
inner.version = head.version;
inner.flags.set(Flags::HEAD, head.method == Method::HEAD);
inner.version = head_wrapper.as_ref().version;
inner.flags.set(Flags::HEAD, head_wrapper.as_ref().method == Method::HEAD);
// connection status
inner.ctype = match head.connection_type() {
inner.ctype = match head_wrapper.as_ref().connection_type() {
ConnectionType::KeepAlive => {
if inner.flags.contains(Flags::KEEPALIVE_ENABLED) {
ConnectionType::KeepAlive
@ -214,7 +214,7 @@ impl Encoder for ClientCodec {
inner.encoder.encode(
dst,
&mut (head, extra_headers),
&mut head_wrapper,
false,
false,
inner.version,

View File

@ -16,7 +16,7 @@ use crate::http::header::{
HeaderValue, ACCEPT_ENCODING, CONNECTION, CONTENT_LENGTH, DATE, TRANSFER_ENCODING,
};
use crate::http::{HeaderMap, Method, StatusCode, Version};
use crate::message::{ConnectionType, Head, RequestHead, ResponseHead};
use crate::message::{ConnectionType, Head, RequestHead, ResponseHead, RequestHeadWrapper};
use crate::request::Request;
use crate::response::Response;
@ -263,29 +263,29 @@ impl MessageType for Response<()> {
}
}
impl MessageType for (Rc<RequestHead>, Option<HeaderMap>) {
impl MessageType for RequestHeadWrapper {
fn status(&self) -> Option<StatusCode> {
None
}
fn chunked(&self) -> bool {
self.0.chunked()
self.as_ref().chunked()
}
fn camel_case(&self) -> bool {
RequestHead::camel_case_headers(&self.0)
self.as_ref().camel_case_headers()
}
fn headers(&self) -> &HeaderMap {
&self.0.headers
self.as_ref().headers()
}
fn extra_headers(&self) -> Option<&HeaderMap> {
self.1.as_ref()
self.extra_headers()
}
fn encode_status(&mut self, dst: &mut BytesMut) -> io::Result<()> {
let head = &self.0;
let head = self.as_ref();
dst.reserve(256 + head.headers.len() * AVERAGE_HEADER_SIZE);
write!(
Writer(dst),
@ -538,9 +538,9 @@ mod tests {
head.headers
.insert(CONTENT_TYPE, HeaderValue::from_static("plain/text"));
let mut head_with_extra = (Rc::new(head), None);
let mut head_wrapper = RequestHeadWrapper::Owned(head);
let _ = head_with_extra.encode_headers(
let _ = head_wrapper.encode_headers(
&mut bytes,
Version::HTTP_11,
BodySize::Empty,
@ -552,7 +552,7 @@ mod tests {
Bytes::from_static(b"\r\nContent-Length: 0\r\nConnection: close\r\nDate: date\r\nContent-Type: plain/text\r\n\r\n")
);
let _ = head_with_extra.encode_headers(
let _ = head_wrapper.encode_headers(
&mut bytes,
Version::HTTP_11,
BodySize::Stream,
@ -564,7 +564,7 @@ mod tests {
Bytes::from_static(b"\r\nTransfer-Encoding: chunked\r\nDate: date\r\nContent-Type: plain/text\r\n\r\n")
);
let _ = head_with_extra.encode_headers(
let _ = head_wrapper.encode_headers(
&mut bytes,
Version::HTTP_11,
BodySize::Sized64(100),
@ -584,9 +584,9 @@ mod tests {
head.headers
.append(CONTENT_TYPE, HeaderValue::from_static("xml"));
let mut head_with_extra = (Rc::new(head), None);
let mut head_wrapper = RequestHeadWrapper::Owned(head);
let _ = head_with_extra.encode_headers(
let _ = head_wrapper.encode_headers(
&mut bytes,
Version::HTTP_11,
BodySize::Stream,
@ -610,9 +610,9 @@ mod tests {
extra_headers.insert(AUTHORIZATION,HeaderValue::from_static("another authorization"));
extra_headers.insert(DATE, HeaderValue::from_static("date"));
let mut head_with_extra = (Rc::new(head), Some(extra_headers));
let mut head_wrapper = RequestHeadWrapper::Rc(Rc::new(head), Some(extra_headers));
let _ = head_with_extra.encode_headers(
let _ = head_wrapper.encode_headers(
&mut bytes,
Version::HTTP_11,
BodySize::Empty,

View File

@ -181,6 +181,30 @@ impl RequestHead {
}
}
#[derive(Debug)]
pub enum RequestHeadWrapper {
Owned(RequestHead),
Rc(Rc<RequestHead>, Option<HeaderMap>),
}
impl RequestHeadWrapper {
pub fn extra_headers(&self) -> Option<&HeaderMap> {
match self {
RequestHeadWrapper::Owned(_) => None,
RequestHeadWrapper::Rc(_, headers) => headers.as_ref(),
}
}
}
impl AsRef<RequestHead> for RequestHeadWrapper {
fn as_ref(&self) -> &RequestHead {
match self {
RequestHeadWrapper::Owned(head) => &head,
RequestHeadWrapper::Rc(head, _) => head.as_ref(),
}
}
}
#[derive(Debug)]
pub struct ResponseHead {
pub version: Version,

View File

@ -18,6 +18,13 @@ pub(crate) struct ConnectorWrapper<T>(pub T);
pub(crate) trait Connect {
fn send_request(
&mut self,
head: RequestHead,
body: Body,
addr: Option<net::SocketAddr>,
) -> Box<dyn Future<Item = ClientResponse, Error = SendRequestError>>;
fn send_request_extra(
&mut self,
head: Rc<RequestHead>,
extra_headers: Option<HeaderMap>,
@ -27,6 +34,18 @@ pub(crate) trait Connect {
/// Send request, returns Response and Framed
fn open_tunnel(
&mut self,
head: RequestHead,
addr: Option<net::SocketAddr>,
) -> Box<
dyn Future<
Item = (ResponseHead, Framed<BoxedSocket, ClientCodec>),
Error = SendRequestError,
>,
>;
/// Send request and extra headers, returns Response and Framed
fn open_tunnel_extra(
&mut self,
head: Rc<RequestHead>,
extra_headers: Option<HeaderMap>,
@ -49,6 +68,26 @@ where
T::Future: 'static,
{
fn send_request(
&mut self,
head: RequestHead,
body: Body,
addr: Option<net::SocketAddr>,
) -> Box<dyn Future<Item = ClientResponse, Error = SendRequestError>> {
Box::new(
self.0
// connect to the host
.call(ClientConnect {
uri: head.uri.clone(),
addr,
})
.from_err()
// send request
.and_then(move |connection| connection.send_request(head, body))
.map(|(head, payload)| ClientResponse::new(head, payload)),
)
}
fn send_request_extra(
&mut self,
head: Rc<RequestHead>,
extra_headers: Option<HeaderMap>,
@ -64,12 +103,39 @@ where
})
.from_err()
// send request
.and_then(move |connection| connection.send_request(head, extra_headers, body))
.and_then(move |connection| connection.send_request_extra(head, extra_headers, body))
.map(|(head, payload)| ClientResponse::new(head, payload)),
)
}
fn open_tunnel(
&mut self,
head: RequestHead,
addr: Option<net::SocketAddr>,
) -> Box<
dyn Future<
Item = (ResponseHead, Framed<BoxedSocket, ClientCodec>),
Error = SendRequestError,
>,
> {
Box::new(
self.0
// connect to the host
.call(ClientConnect {
uri: head.uri.clone(),
addr,
})
.from_err()
// send request
.and_then(move |connection| connection.open_tunnel(head))
.map(|(head, framed)| {
let framed = framed.map_io(|io| BoxedSocket(Box::new(Socket(io))));
(head, framed)
}),
)
}
fn open_tunnel_extra(
&mut self,
head: Rc<RequestHead>,
extra_headers: Option<HeaderMap>,
@ -89,7 +155,7 @@ where
})
.from_err()
// send request
.and_then(move |connection| connection.open_tunnel(head, extra_headers))
.and_then(move |connection| connection.open_tunnel_extra(head, extra_headers))
.map(|(head, framed)| {
let framed = framed.map_io(|io| BoxedSocket(Box::new(Socket(io))));
(head, framed)

View File

@ -11,6 +11,7 @@ use percent_encoding::percent_encode;
use serde::Serialize;
use serde_json;
use tokio_timer::Timeout;
use derive_more::From;
use actix_http::body::{Body, BodyStream};
use actix_http::cookie::{Cookie, CookieJar, USERINFO};
@ -375,7 +376,112 @@ impl ClientRequest {
}
}
pub fn freeze(mut self) -> Result<FrozenClientRequest, FreezeRequestError> {
pub fn freeze(self) -> Result<FrozenClientRequest, FreezeRequestError> {
let slf = match self.prep_for_sending() {
Ok(slf) => slf,
Err(e) => return Err(e.into()),
};
let request = FrozenClientRequest {
head: Rc::new(slf.head),
addr: slf.addr,
response_decompress: slf.response_decompress,
timeout: slf.timeout,
config: slf.config,
};
Ok(request)
}
/// Complete request construction and send body.
pub fn send_body<B>(
self,
body: B,
) -> impl Future<
Item = ClientResponse<impl Stream<Item = Bytes, Error = PayloadError>>,
Error = SendRequestError,
>
where
B: Into<Body>,
{
let slf = match self.prep_for_sending() {
Ok(slf) => slf,
Err(e) => return Either::A(err(e.into())),
};
Either::B(RequestSender::Owned(slf.head).send_body(slf.addr, slf.response_decompress, slf.timeout, slf.config.as_ref(), body))
}
/// Set a JSON body and generate `ClientRequest`
pub fn send_json<T: Serialize>(
self,
value: &T,
) -> impl Future<
Item = ClientResponse<impl Stream<Item = Bytes, Error = PayloadError>>,
Error = SendRequestError,
> {
let slf = match self.prep_for_sending() {
Ok(slf) => slf,
Err(e) => return Either::A(err(e.into())),
};
Either::B(RequestSender::Owned(slf.head).send_json(slf.addr, slf.response_decompress, slf.timeout, slf.config.as_ref(), value))
}
/// Set a urlencoded body and generate `ClientRequest`
///
/// `ClientRequestBuilder` can not be used after this call.
pub fn send_form<T: Serialize>(
self,
value: &T,
) -> impl Future<
Item = ClientResponse<impl Stream<Item = Bytes, Error = PayloadError>>,
Error = SendRequestError,
> {
let slf = match self.prep_for_sending() {
Ok(slf) => slf,
Err(e) => return Either::A(err(e.into())),
};
Either::B(RequestSender::Owned(slf.head).send_form(slf.addr, slf.response_decompress, slf.timeout, slf.config.as_ref(), value))
}
/// Set an streaming body and generate `ClientRequest`.
pub fn send_stream<S, E>(
self,
stream: S,
) -> impl Future<
Item = ClientResponse<impl Stream<Item = Bytes, Error = PayloadError>>,
Error = SendRequestError,
>
where
S: Stream<Item = Bytes, Error = E> + 'static,
E: Into<Error> + 'static,
{
let slf = match self.prep_for_sending() {
Ok(slf) => slf,
Err(e) => return Either::A(err(e.into())),
};
Either::B(RequestSender::Owned(slf.head).send_stream(slf.addr, slf.response_decompress, slf.timeout, slf.config.as_ref(), stream))
}
/// Set an empty body and generate `ClientRequest`.
pub fn send(
self,
) -> impl Future<
Item = ClientResponse<impl Stream<Item = Bytes, Error = PayloadError>>,
Error = SendRequestError,
> {
let slf = match self.prep_for_sending() {
Ok(slf) => slf,
Err(e) => return Either::A(err(e.into())),
};
Either::B(RequestSender::Owned(slf.head).send(slf.addr, slf.response_decompress, slf.timeout, slf.config.as_ref()))
}
fn prep_for_sending(mut self) -> Result<Self, PrepForSendingError> {
if let Some(e) = self.err {
return Err(e.into());
}
@ -438,112 +544,7 @@ impl ClientRequest {
}
}
let request = FrozenClientRequest {
head: Rc::new(slf.head),
addr: slf.addr,
response_decompress: slf.response_decompress,
timeout: slf.timeout,
config: slf.config,
};
Ok(request)
}
/// Complete request construction and send body.
pub fn send_body<B>(
self,
body: B,
) -> impl Future<
Item = ClientResponse<impl Stream<Item = Bytes, Error = PayloadError>>,
Error = SendRequestError,
>
where
B: Into<Body>,
{
let frozen_request = match self.freeze() {
Ok(r) => r,
Err(e) => return Either::A(err(e.into())),
};
Either::B(frozen_request.send_body(None, body))
}
/// Set a JSON body and generate `ClientRequest`
pub fn send_json<T: Serialize>(
self,
value: &T,
) -> impl Future<
Item = ClientResponse<impl Stream<Item = Bytes, Error = PayloadError>>,
Error = SendRequestError,
> {
// set content-type
let slf = self.set_header_if_none(header::CONTENT_TYPE, "application/json");
let frozen_request = match slf.freeze() {
Ok(r) => r,
Err(e) => return Either::A(err(e.into())),
};
Either::B(frozen_request.send_json(None, value))
}
/// Set a urlencoded body and generate `ClientRequest`
///
/// `ClientRequestBuilder` can not be used after this call.
pub fn send_form<T: Serialize>(
self,
value: &T,
) -> impl Future<
Item = ClientResponse<impl Stream<Item = Bytes, Error = PayloadError>>,
Error = SendRequestError,
> {
// set content-type
let slf = self.set_header_if_none(
header::CONTENT_TYPE,
"application/x-www-form-urlencoded",
);
let frozen_request = match slf.freeze() {
Ok(r) => r,
Err(e) => return Either::A(err(e.into())),
};
Either::B(frozen_request.send_form(None, value))
}
/// Set an streaming body and generate `ClientRequest`.
pub fn send_stream<S, E>(
self,
stream: S,
) -> impl Future<
Item = ClientResponse<impl Stream<Item = Bytes, Error = PayloadError>>,
Error = SendRequestError,
>
where
S: Stream<Item = Bytes, Error = E> + 'static,
E: Into<Error> + 'static,
{
let frozen_request = match self.freeze() {
Ok(r) => r,
Err(e) => return Either::A(err(e.into())),
};
Either::B(frozen_request.send_stream(None, stream))
}
/// Set an empty body and generate `ClientRequest`.
pub fn send(
self,
) -> impl Future<
Item = ClientResponse<impl Stream<Item = Bytes, Error = PayloadError>>,
Error = SendRequestError,
> {
let frozen_request = match self.freeze() {
Ok(r) => r,
Err(e) => return Either::A(err(e.into())),
};
Either::B(frozen_request.send(None))
Ok(slf)
}
}
@ -599,81 +600,36 @@ impl FrozenClientRequest {
where
B: Into<Body>,
{
let config = self.config.as_ref();
let response_decompress = self.response_decompress;
let fut = config
.connector
.borrow_mut()
.send_request(self.head.clone(), extra_headers, body.into(), self.addr)
.map(move |res| {
res.map_body(|head, payload| {
if response_decompress {
Payload::Stream(Decoder::from_headers(payload, &head.headers))
} else {
Payload::Stream(Decoder::new(payload, ContentEncoding::Identity))
}
})
});
// set request timeout
if let Some(timeout) = self.timeout.or_else(|| config.timeout.clone()) {
Either::A(Timeout::new(fut, timeout).map_err(|e| {
if let Some(e) = e.into_inner() {
e
} else {
SendRequestError::Timeout
}
}))
} else {
Either::B(fut)
}
RequestSender::Rc(self.head.clone(), extra_headers)
.send_body(self.addr, self.response_decompress, self.timeout, self.config.as_ref(), body)
}
/// Send a JSON body with optional extra headers.
/// Extra headers will override corresponding existing headers in a frozen request.
pub fn send_json<T: Serialize>(
&self,
mut extra_headers: Option<HeaderMap>,
extra_headers: Option<HeaderMap>,
value: &T,
) -> impl Future<
Item = ClientResponse<impl Stream<Item = Bytes, Error = PayloadError>>,
Error = SendRequestError,
> {
let body = match serde_json::to_string(value) {
Ok(body) => body,
Err(e) => return Either::A(err(Error::from(e).into())),
};
// set content-type
if let Err(e) = self.set_extra_header_if_none(&mut extra_headers, header::CONTENT_TYPE, "application/json") {
return Either::A(err(e.into()));
}
Either::B(self.send_body(extra_headers, Body::Bytes(Bytes::from(body))))
RequestSender::Rc(self.head.clone(), extra_headers)
.send_json(self.addr, self.response_decompress, self.timeout, self.config.as_ref(), value)
}
/// Send a urlencoded body with optional extra headers.
/// Extra headers will override corresponding existing headers in a frozen request.
pub fn send_form<T: Serialize>(
&self,
mut extra_headers: Option<HeaderMap>,
extra_headers: Option<HeaderMap>,
value: &T,
) -> impl Future<
Item = ClientResponse<impl Stream<Item = Bytes, Error = PayloadError>>,
Error = SendRequestError,
> {
let body = match serde_urlencoded::to_string(value) {
Ok(body) => body,
Err(e) => return Either::A(err(Error::from(e).into())),
};
// set content-type
if let Err(e) = self.set_extra_header_if_none(&mut extra_headers, header::CONTENT_TYPE, "application/x-www-form-urlencoded") {
return Either::A(err(e.into()));
}
Either::B(self.send_body(extra_headers, Body::Bytes(Bytes::from(body))))
RequestSender::Rc(self.head.clone(), extra_headers)
.send_form(self.addr, self.response_decompress, self.timeout, self.config.as_ref(), value)
}
/// Send a streaming body with optional extra headers.
@ -690,7 +646,8 @@ impl FrozenClientRequest {
S: Stream<Item = Bytes, Error = E> + 'static,
E: Into<Error> + 'static,
{
self.send_body(extra_headers, Body::from_message(BodyStream::new(stream)))
RequestSender::Rc(self.head.clone(), extra_headers)
.send_stream(self.addr, self.response_decompress, self.timeout, self.config.as_ref(), stream)
}
/// Send an empty body with optional extra headers.
@ -702,16 +659,180 @@ impl FrozenClientRequest {
Item = ClientResponse<impl Stream<Item = Bytes, Error = PayloadError>>,
Error = SendRequestError,
> {
self.send_body(extra_headers, Body::Empty)
RequestSender::Rc(self.head.clone(), extra_headers)
.send(self.addr, self.response_decompress, self.timeout, self.config.as_ref())
}
}
fn set_extra_header_if_none<V>(&self, extra_headers: &mut Option<HeaderMap>, key: HeaderName, value: V) -> Result<(), HttpError>
#[derive(Debug, From)]
enum PrepForSendingError {
Url(InvalidUrl),
Http(HttpError),
}
impl Into<FreezeRequestError> for PrepForSendingError {
fn into(self) -> FreezeRequestError {
match self {
PrepForSendingError::Url(e) => FreezeRequestError::Url(e),
PrepForSendingError::Http(e) => FreezeRequestError::Http(e),
}
}
}
impl Into<SendRequestError> for PrepForSendingError {
fn into(self) -> SendRequestError {
match self {
PrepForSendingError::Url(e) => SendRequestError::Url(e),
PrepForSendingError::Http(e) => SendRequestError::Http(e),
}
}
}
enum RequestSender {
Owned(RequestHead),
Rc(Rc<RequestHead>, Option<HeaderMap>),
}
impl RequestSender {
pub fn send_body<B>(
self,
addr: Option<net::SocketAddr>,
response_decompress: bool,
timeout: Option<Duration>,
config: &ClientConfig,
body: B,
) -> impl Future<
Item = ClientResponse<impl Stream<Item = Bytes, Error = PayloadError>>,
Error = SendRequestError,
>
where
B: Into<Body>,
{
let mut connector = config.connector.borrow_mut();
let fut = match self {
RequestSender::Owned(head) => connector.send_request(head, body.into(), addr),
RequestSender::Rc(head, extra_headers) => connector.send_request_extra(head, extra_headers, body.into(), addr),
};
let fut = fut
.map(move |res| {
res.map_body(|head, payload| {
if response_decompress {
Payload::Stream(Decoder::from_headers(payload, &head.headers))
} else {
Payload::Stream(Decoder::new(payload, ContentEncoding::Identity))
}
})
});
// set request timeout
if let Some(timeout) = timeout.or_else(|| config.timeout.clone()) {
Either::A(Timeout::new(fut, timeout).map_err(|e| {
if let Some(e) = e.into_inner() {
e
} else {
SendRequestError::Timeout
}
}))
} else {
Either::B(fut)
}
}
pub fn send_json<T: Serialize>(
mut self,
addr: Option<net::SocketAddr>,
response_decompress: bool,
timeout: Option<Duration>,
config: &ClientConfig,
value: &T,
) -> impl Future<
Item = ClientResponse<impl Stream<Item = Bytes, Error = PayloadError>>,
Error = SendRequestError,
> {
let body = match serde_json::to_string(value) {
Ok(body) => body,
Err(e) => return Either::A(err(Error::from(e).into())),
};
if let Err(e) = self.set_header_if_none(header::CONTENT_TYPE, "application/json") {
return Either::A(err(e.into()));
}
Either::B(self.send_body(addr, response_decompress, timeout, config, Body::Bytes(Bytes::from(body))))
}
pub fn send_form<T: Serialize>(
mut self,
addr: Option<net::SocketAddr>,
response_decompress: bool,
timeout: Option<Duration>,
config: &ClientConfig,
value: &T,
) -> impl Future<
Item = ClientResponse<impl Stream<Item = Bytes, Error = PayloadError>>,
Error = SendRequestError,
> {
let body = match serde_urlencoded::to_string(value) {
Ok(body) => body,
Err(e) => return Either::A(err(Error::from(e).into())),
};
// set content-type
if let Err(e) = self.set_header_if_none(header::CONTENT_TYPE, "application/x-www-form-urlencoded") {
return Either::A(err(e.into()));
}
Either::B(self.send_body(addr, response_decompress, timeout, config, Body::Bytes(Bytes::from(body))))
}
pub fn send_stream<S, E>(
self,
addr: Option<net::SocketAddr>,
response_decompress: bool,
timeout: Option<Duration>,
config: &ClientConfig,
stream: S,
) -> impl Future<
Item = ClientResponse<impl Stream<Item = Bytes, Error = PayloadError>>,
Error = SendRequestError,
>
where
S: Stream<Item = Bytes, Error = E> + 'static,
E: Into<Error> + 'static,
{
self.send_body(addr, response_decompress, timeout, config, Body::from_message(BodyStream::new(stream)))
}
pub fn send(
self,
addr: Option<net::SocketAddr>,
response_decompress: bool,
timeout: Option<Duration>,
config: &ClientConfig,
) -> impl Future<
Item = ClientResponse<impl Stream<Item = Bytes, Error = PayloadError>>,
Error = SendRequestError,
> {
self.send_body(addr, response_decompress, timeout, config, Body::Empty)
}
fn set_header_if_none<V>(&mut self, key: HeaderName, value: V) -> Result<(), HttpError>
where
V: IntoHeaderValue,
{
if !self.head.headers.contains_key(&key)
&& !extra_headers.iter().any(|h| h.contains_key(&key)) {
match self {
RequestSender::Owned(head) => {
if !head.headers.contains_key(&key) {
match value.try_into() {
Ok(value) => head.headers.insert(key, value),
Err(e) => return Err(e.into()),
}
}
},
RequestSender::Rc(head, extra_headers) => {
if !head.headers.contains_key(&key) && !extra_headers.iter().any(|h| h.contains_key(&key)) {
match value.try_into(){
Ok(v) => {
let h = extra_headers.get_or_insert(HeaderMap::new());
@ -720,6 +841,8 @@ impl FrozenClientRequest {
Err(e) => return Err(e.into()),
};
}
}
}
Ok(())
}

View File

@ -287,7 +287,7 @@ impl WebsocketsRequest {
.config
.connector
.borrow_mut()
.open_tunnel(Rc::new(head), None, self.addr)
.open_tunnel(head, self.addr)
.from_err()
.and_then(move |(head, framed)| {
// verify response