diff --git a/actix-http/src/client/connection.rs b/actix-http/src/client/connection.rs index a5b22f065..2584c05c5 100644 --- a/actix-http/src/client/connection.rs +++ b/actix-http/src/client/connection.rs @@ -9,7 +9,7 @@ use h2::client::SendRequest; use crate::body::MessageBody; use crate::h1::ClientCodec; -use crate::message::{RequestHead, ResponseHead}; +use crate::message::{RequestHead, RequestHeadWrapper, ResponseHead}; use crate::payload::Payload; use crate::header::HeaderMap; @@ -30,6 +30,13 @@ pub trait Connection { /// Send request and body fn send_request( + self, + head: RequestHead, + body: B, + ) -> Self::Future; + + /// Send request, extra headers and body + fn send_request_extra( self, head: Rc, extra_headers: Option, @@ -43,6 +50,11 @@ pub trait Connection { /// Send request, returns Response and Framed fn open_tunnel(self, + head: RequestHead, + ) -> Self::TunnelFuture; + + /// Send request and extra headers, returns Response and Framed + fn open_tunnel_extra(self, head: Rc, extra_headers: Option, ) -> Self::TunnelFuture; @@ -112,6 +124,29 @@ where } fn send_request( + mut self, + head: RequestHead, + body: B, + ) -> Self::Future { + match self.io.take().unwrap() { + ConnectionType::H1(io) => Box::new(h1proto::send_request( + io, + RequestHeadWrapper::Owned(head), + body, + self.created, + self.pool, + )), + ConnectionType::H2(io) => Box::new(h2proto::send_request( + io, + RequestHeadWrapper::Owned(head), + body, + self.created, + self.pool, + )), + } + } + + fn send_request_extra( mut self, head: Rc, extra_headers: Option, @@ -120,16 +155,14 @@ where match self.io.take().unwrap() { ConnectionType::H1(io) => Box::new(h1proto::send_request( io, - head, - extra_headers, + RequestHeadWrapper::Rc(head, extra_headers), body, self.created, self.pool, )), ConnectionType::H2(io) => Box::new(h2proto::send_request( io, - head, - extra_headers, + RequestHeadWrapper::Rc(head, extra_headers), body, self.created, self.pool, @@ -148,10 +181,28 @@ where >; /// Send request, returns Response and Framed - fn open_tunnel(mut self, head: Rc, extra_headers: Option) -> Self::TunnelFuture { + fn open_tunnel(mut self, head: RequestHead) -> Self::TunnelFuture { match self.io.take().unwrap() { ConnectionType::H1(io) => { - Either::A(Box::new(h1proto::open_tunnel(io, head, extra_headers))) + Either::A(Box::new(h1proto::open_tunnel(io, RequestHeadWrapper::Owned(head)))) + } + ConnectionType::H2(io) => { + if let Some(mut pool) = self.pool.take() { + pool.release(IoConnection::new( + ConnectionType::H2(io), + self.created, + None, + )); + } + Either::B(err(SendRequestError::TunnelNotSupported)) + } + } + } + + fn open_tunnel_extra(mut self, head: Rc, extra_headers: Option) -> Self::TunnelFuture { + match self.io.take().unwrap() { + ConnectionType::H1(io) => { + Either::A(Box::new(h1proto::open_tunnel(io, RequestHeadWrapper::Rc(head, extra_headers)))) } ConnectionType::H2(io) => { if let Some(mut pool) = self.pool.take() { @@ -190,14 +241,25 @@ where } fn send_request( + self, + head: RequestHead, + body: RB, + ) -> Self::Future { + match self { + EitherConnection::A(con) => con.send_request(head, body), + EitherConnection::B(con) => con.send_request(head, body), + } + } + + fn send_request_extra( self, head: Rc, extra_headers: Option, body: RB, ) -> Self::Future { match self { - EitherConnection::A(con) => con.send_request(head, extra_headers, body), - EitherConnection::B(con) => con.send_request(head, extra_headers, body), + EitherConnection::A(con) => con.send_request_extra(head, extra_headers, body), + EitherConnection::B(con) => con.send_request_extra(head, extra_headers, body), } } @@ -209,14 +271,27 @@ where >; /// Send request, returns Response and Framed - fn open_tunnel(self, head: Rc, extra_headers: Option) -> Self::TunnelFuture { + fn open_tunnel(self, head: RequestHead) -> Self::TunnelFuture { match self { EitherConnection::A(con) => Box::new( - con.open_tunnel(head, extra_headers) + con.open_tunnel(head) .map(|(head, framed)| (head, framed.map_io(EitherIo::A))), ), EitherConnection::B(con) => Box::new( - con.open_tunnel(head, extra_headers) + con.open_tunnel(head) + .map(|(head, framed)| (head, framed.map_io(EitherIo::B))), + ), + } + } + + fn open_tunnel_extra(self, head: Rc, extra_headers: Option) -> Self::TunnelFuture { + match self { + EitherConnection::A(con) => Box::new( + con.open_tunnel_extra(head, extra_headers) + .map(|(head, framed)| (head, framed.map_io(EitherIo::A))), + ), + EitherConnection::B(con) => Box::new( + con.open_tunnel_extra(head, extra_headers) .map(|(head, framed)| (head, framed.map_io(EitherIo::B))), ), } diff --git a/actix-http/src/client/h1proto.rs b/actix-http/src/client/h1proto.rs index 4163886c6..5bb6ae43b 100644 --- a/actix-http/src/client/h1proto.rs +++ b/actix-http/src/client/h1proto.rs @@ -1,6 +1,5 @@ use std::io::Write; use std::{io, time}; -use std::rc::Rc; use actix_codec::{AsyncRead, AsyncWrite, Framed}; use bytes::{BufMut, Bytes, BytesMut}; @@ -10,7 +9,7 @@ use futures::{Async, Future, Poll, Sink, Stream}; use crate::error::PayloadError; use crate::h1; use crate::http::header::{IntoHeaderValue, HOST}; -use crate::message::{RequestHead, ResponseHead}; +use crate::message::{RequestHeadWrapper, ResponseHead}; use crate::payload::{Payload, PayloadStream}; use crate::header::HeaderMap; @@ -21,8 +20,7 @@ use crate::body::{BodySize, MessageBody}; pub(crate) fn send_request( io: T, - head: Rc, - extra_headers: Option, + head_wrapper: RequestHeadWrapper, body: B, created: time::Instant, pool: Option>, @@ -32,33 +30,41 @@ where B: MessageBody, { // set request host header - let extra_headers = if !head.headers.contains_key(HOST) && !extra_headers.iter().any(|h| h.contains_key(HOST)) { - if let Some(host) = head.uri.host() { + let head_wrapper = if !head_wrapper.as_ref().headers.contains_key(HOST) && !head_wrapper.extra_headers().iter().any(|h| h.contains_key(HOST)) { + if let Some(host) = head_wrapper.as_ref().uri.host() { let mut wrt = BytesMut::with_capacity(host.len() + 5).writer(); - let _ = match head.uri.port_u16() { + let _ = match head_wrapper.as_ref().uri.port_u16() { None | Some(80) | Some(443) => write!(wrt, "{}", host), Some(port) => write!(wrt, "{}:{}", host, port), }; match wrt.get_mut().take().freeze().try_into() { Ok(value) => { - let mut headers = extra_headers.unwrap_or(HeaderMap::new()); - headers.insert(HOST, value); - Some(headers) + match head_wrapper { + RequestHeadWrapper::Owned(mut head) => { + head.headers.insert(HOST, value); + RequestHeadWrapper::Owned(head) + }, + RequestHeadWrapper::Rc(head, extra_headers) => { + let mut headers = extra_headers.unwrap_or(HeaderMap::new()); + headers.insert(HOST, value); + RequestHeadWrapper::Rc(head, Some(headers)) + }, + } } Err(e) => { log::error!("Can not set HOST header {}", e); - extra_headers + head_wrapper } } } else { - extra_headers + head_wrapper } } else { - extra_headers + head_wrapper }; let io = H1Connection { @@ -69,9 +75,9 @@ where let len = body.size(); - // create Framed and send reqest + // create Framed and send request Framed::new(io, h1::ClientCodec::default()) - .send((head, extra_headers, len).into()) + .send((head_wrapper, len).into()) .from_err() // send request body .and_then(move |framed| match body.size() { @@ -107,15 +113,14 @@ where pub(crate) fn open_tunnel( io: T, - head: Rc, - extra_headers: Option, + head_wrapper: RequestHeadWrapper, ) -> impl Future), Error = SendRequestError> where T: AsyncRead + AsyncWrite + 'static, { - // create Framed and send reqest + // create Framed and send request Framed::new(io, h1::ClientCodec::default()) - .send((head, extra_headers, BodySize::None).into()) + .send((head_wrapper, BodySize::None).into()) .from_err() // read response .and_then(|framed| { diff --git a/actix-http/src/client/h2proto.rs b/actix-http/src/client/h2proto.rs index 844c4cfa2..bd34c6dca 100644 --- a/actix-http/src/client/h2proto.rs +++ b/actix-http/src/client/h2proto.rs @@ -1,5 +1,4 @@ use std::time; -use std::rc::Rc; use actix_codec::{AsyncRead, AsyncWrite}; use bytes::Bytes; @@ -10,7 +9,7 @@ use http::header::{HeaderValue, CONNECTION, CONTENT_LENGTH, TRANSFER_ENCODING}; use http::{request::Request, HttpTryFrom, Method, Version}; use crate::body::{BodySize, MessageBody}; -use crate::message::{RequestHead, ResponseHead}; +use crate::message::{RequestHeadWrapper, ResponseHead}; use crate::payload::Payload; use crate::header::HeaderMap; @@ -20,8 +19,7 @@ use super::pool::Acquired; pub(crate) fn send_request( io: SendRequest, - head: Rc, - extra_headers: Option, + head_wrapper: RequestHeadWrapper, body: B, created: time::Instant, pool: Option>, @@ -30,8 +28,8 @@ where T: AsyncRead + AsyncWrite + 'static, B: MessageBody, { - trace!("Sending client request: {:?} {:?}", head, body.size()); - let head_req = head.method == Method::HEAD; + trace!("Sending client request: {:?} {:?}", head_wrapper, body.size()); + let head_req = head_wrapper.as_ref().method == Method::HEAD; let length = body.size(); let eof = match length { BodySize::None | BodySize::Empty | BodySize::Sized(0) => true, @@ -42,8 +40,8 @@ where .map_err(SendRequestError::from) .and_then(move |mut io| { let mut req = Request::new(()); - *req.uri_mut() = head.uri.clone(); - *req.method_mut() = head.method.clone(); + *req.uri_mut() = head_wrapper.as_ref().uri.clone(); + *req.method_mut() = head_wrapper.as_ref().method.clone(); *req.version_mut() = Version::HTTP_2; let mut skip_len = true; @@ -69,9 +67,14 @@ where ), }; - // merging headers from head and extra headers. HeaderMap::new() does not allocate. - let extra_headers = extra_headers.unwrap_or(HeaderMap::new()); - let headers = head.headers.iter() + // Extracting extra headers from RequestHeadWrapper. HeaderMap::new() does not allocate. + let (head_wrapper, extra_headers) = match head_wrapper { + RequestHeadWrapper::Owned(head) => (RequestHeadWrapper::Owned(head), HeaderMap::new()), + RequestHeadWrapper::Rc(head, extra_headers) => (RequestHeadWrapper::Rc(head, None), extra_headers.unwrap_or(HeaderMap::new())), + }; + + // merging headers from head and extra headers. + let headers = head_wrapper.as_ref().headers.iter() .filter(|(name, _)| { !extra_headers.contains_key(*name) }) diff --git a/actix-http/src/h1/client.rs b/actix-http/src/h1/client.rs index be95b5d96..4fefdf856 100644 --- a/actix-http/src/h1/client.rs +++ b/actix-http/src/h1/client.rs @@ -17,7 +17,7 @@ use crate::body::BodySize; use crate::config::ServiceConfig; use crate::error::{ParseError, PayloadError}; use crate::helpers; -use crate::message::{ConnectionType, Head, MessagePool, RequestHead, ResponseHead}; +use crate::message::{ConnectionType, Head, MessagePool, RequestHead, RequestHeadWrapper, ResponseHead}; use crate::header::HeaderMap; bitflags! { @@ -50,7 +50,7 @@ struct ClientCodecInner { // encoder part flags: Flags, headers_size: u32, - encoder: encoder::MessageEncoder<(Rc, Option)>, + encoder: encoder::MessageEncoder, } impl Default for ClientCodec { @@ -185,7 +185,7 @@ impl Decoder for ClientPayloadCodec { } impl Encoder for ClientCodec { - type Item = Message<(Rc, Option, BodySize)>; + type Item = Message<(RequestHeadWrapper, BodySize)>; type Error = io::Error; fn encode( @@ -194,13 +194,13 @@ impl Encoder for ClientCodec { dst: &mut BytesMut, ) -> Result<(), Self::Error> { match item { - Message::Item((head, extra_headers, length)) => { + Message::Item((mut head_wrapper, length)) => { let inner = &mut self.inner; - inner.version = head.version; - inner.flags.set(Flags::HEAD, head.method == Method::HEAD); + inner.version = head_wrapper.as_ref().version; + inner.flags.set(Flags::HEAD, head_wrapper.as_ref().method == Method::HEAD); // connection status - inner.ctype = match head.connection_type() { + inner.ctype = match head_wrapper.as_ref().connection_type() { ConnectionType::KeepAlive => { if inner.flags.contains(Flags::KEEPALIVE_ENABLED) { ConnectionType::KeepAlive @@ -214,7 +214,7 @@ impl Encoder for ClientCodec { inner.encoder.encode( dst, - &mut (head, extra_headers), + &mut head_wrapper, false, false, inner.version, diff --git a/actix-http/src/h1/encoder.rs b/actix-http/src/h1/encoder.rs index cae0ba88b..6cfc5f92c 100644 --- a/actix-http/src/h1/encoder.rs +++ b/actix-http/src/h1/encoder.rs @@ -16,7 +16,7 @@ use crate::http::header::{ HeaderValue, ACCEPT_ENCODING, CONNECTION, CONTENT_LENGTH, DATE, TRANSFER_ENCODING, }; use crate::http::{HeaderMap, Method, StatusCode, Version}; -use crate::message::{ConnectionType, Head, RequestHead, ResponseHead}; +use crate::message::{ConnectionType, Head, RequestHead, ResponseHead, RequestHeadWrapper}; use crate::request::Request; use crate::response::Response; @@ -263,29 +263,29 @@ impl MessageType for Response<()> { } } -impl MessageType for (Rc, Option) { +impl MessageType for RequestHeadWrapper { fn status(&self) -> Option { None } fn chunked(&self) -> bool { - self.0.chunked() + self.as_ref().chunked() } fn camel_case(&self) -> bool { - RequestHead::camel_case_headers(&self.0) + self.as_ref().camel_case_headers() } fn headers(&self) -> &HeaderMap { - &self.0.headers + self.as_ref().headers() } fn extra_headers(&self) -> Option<&HeaderMap> { - self.1.as_ref() + self.extra_headers() } fn encode_status(&mut self, dst: &mut BytesMut) -> io::Result<()> { - let head = &self.0; + let head = self.as_ref(); dst.reserve(256 + head.headers.len() * AVERAGE_HEADER_SIZE); write!( Writer(dst), @@ -538,9 +538,9 @@ mod tests { head.headers .insert(CONTENT_TYPE, HeaderValue::from_static("plain/text")); - let mut head_with_extra = (Rc::new(head), None); + let mut head_wrapper = RequestHeadWrapper::Owned(head); - let _ = head_with_extra.encode_headers( + let _ = head_wrapper.encode_headers( &mut bytes, Version::HTTP_11, BodySize::Empty, @@ -552,7 +552,7 @@ mod tests { Bytes::from_static(b"\r\nContent-Length: 0\r\nConnection: close\r\nDate: date\r\nContent-Type: plain/text\r\n\r\n") ); - let _ = head_with_extra.encode_headers( + let _ = head_wrapper.encode_headers( &mut bytes, Version::HTTP_11, BodySize::Stream, @@ -564,7 +564,7 @@ mod tests { Bytes::from_static(b"\r\nTransfer-Encoding: chunked\r\nDate: date\r\nContent-Type: plain/text\r\n\r\n") ); - let _ = head_with_extra.encode_headers( + let _ = head_wrapper.encode_headers( &mut bytes, Version::HTTP_11, BodySize::Sized64(100), @@ -584,9 +584,9 @@ mod tests { head.headers .append(CONTENT_TYPE, HeaderValue::from_static("xml")); - let mut head_with_extra = (Rc::new(head), None); + let mut head_wrapper = RequestHeadWrapper::Owned(head); - let _ = head_with_extra.encode_headers( + let _ = head_wrapper.encode_headers( &mut bytes, Version::HTTP_11, BodySize::Stream, @@ -610,9 +610,9 @@ mod tests { extra_headers.insert(AUTHORIZATION,HeaderValue::from_static("another authorization")); extra_headers.insert(DATE, HeaderValue::from_static("date")); - let mut head_with_extra = (Rc::new(head), Some(extra_headers)); + let mut head_wrapper = RequestHeadWrapper::Rc(Rc::new(head), Some(extra_headers)); - let _ = head_with_extra.encode_headers( + let _ = head_wrapper.encode_headers( &mut bytes, Version::HTTP_11, BodySize::Empty, diff --git a/actix-http/src/message.rs b/actix-http/src/message.rs index cf23a401c..bb8c87aef 100644 --- a/actix-http/src/message.rs +++ b/actix-http/src/message.rs @@ -181,6 +181,30 @@ impl RequestHead { } } +#[derive(Debug)] +pub enum RequestHeadWrapper { + Owned(RequestHead), + Rc(Rc, Option), +} + +impl RequestHeadWrapper { + pub fn extra_headers(&self) -> Option<&HeaderMap> { + match self { + RequestHeadWrapper::Owned(_) => None, + RequestHeadWrapper::Rc(_, headers) => headers.as_ref(), + } + } +} + +impl AsRef for RequestHeadWrapper { + fn as_ref(&self) -> &RequestHead { + match self { + RequestHeadWrapper::Owned(head) => &head, + RequestHeadWrapper::Rc(head, _) => head.as_ref(), + } + } +} + #[derive(Debug)] pub struct ResponseHead { pub version: Version, diff --git a/awc/src/connect.rs b/awc/src/connect.rs index 77ee73bd9..63b8a895a 100644 --- a/awc/src/connect.rs +++ b/awc/src/connect.rs @@ -18,6 +18,13 @@ pub(crate) struct ConnectorWrapper(pub T); pub(crate) trait Connect { fn send_request( + &mut self, + head: RequestHead, + body: Body, + addr: Option, + ) -> Box>; + + fn send_request_extra( &mut self, head: Rc, extra_headers: Option, @@ -27,6 +34,18 @@ pub(crate) trait Connect { /// Send request, returns Response and Framed fn open_tunnel( + &mut self, + head: RequestHead, + addr: Option, + ) -> Box< + dyn Future< + Item = (ResponseHead, Framed), + Error = SendRequestError, + >, + >; + + /// Send request and extra headers, returns Response and Framed + fn open_tunnel_extra( &mut self, head: Rc, extra_headers: Option, @@ -49,6 +68,26 @@ where T::Future: 'static, { fn send_request( + &mut self, + head: RequestHead, + body: Body, + addr: Option, + ) -> Box> { + Box::new( + self.0 + // connect to the host + .call(ClientConnect { + uri: head.uri.clone(), + addr, + }) + .from_err() + // send request + .and_then(move |connection| connection.send_request(head, body)) + .map(|(head, payload)| ClientResponse::new(head, payload)), + ) + } + + fn send_request_extra( &mut self, head: Rc, extra_headers: Option, @@ -64,12 +103,39 @@ where }) .from_err() // send request - .and_then(move |connection| connection.send_request(head, extra_headers, body)) + .and_then(move |connection| connection.send_request_extra(head, extra_headers, body)) .map(|(head, payload)| ClientResponse::new(head, payload)), ) } fn open_tunnel( + &mut self, + head: RequestHead, + addr: Option, + ) -> Box< + dyn Future< + Item = (ResponseHead, Framed), + Error = SendRequestError, + >, + > { + Box::new( + self.0 + // connect to the host + .call(ClientConnect { + uri: head.uri.clone(), + addr, + }) + .from_err() + // send request + .and_then(move |connection| connection.open_tunnel(head)) + .map(|(head, framed)| { + let framed = framed.map_io(|io| BoxedSocket(Box::new(Socket(io)))); + (head, framed) + }), + ) + } + + fn open_tunnel_extra( &mut self, head: Rc, extra_headers: Option, @@ -89,7 +155,7 @@ where }) .from_err() // send request - .and_then(move |connection| connection.open_tunnel(head, extra_headers)) + .and_then(move |connection| connection.open_tunnel_extra(head, extra_headers)) .map(|(head, framed)| { let framed = framed.map_io(|io| BoxedSocket(Box::new(Socket(io)))); (head, framed) diff --git a/awc/src/request.rs b/awc/src/request.rs index 0743c3f85..86651200e 100644 --- a/awc/src/request.rs +++ b/awc/src/request.rs @@ -11,6 +11,7 @@ use percent_encoding::percent_encode; use serde::Serialize; use serde_json; use tokio_timer::Timeout; +use derive_more::From; use actix_http::body::{Body, BodyStream}; use actix_http::cookie::{Cookie, CookieJar, USERINFO}; @@ -375,7 +376,112 @@ impl ClientRequest { } } - pub fn freeze(mut self) -> Result { + pub fn freeze(self) -> Result { + let slf = match self.prep_for_sending() { + Ok(slf) => slf, + Err(e) => return Err(e.into()), + }; + + let request = FrozenClientRequest { + head: Rc::new(slf.head), + addr: slf.addr, + response_decompress: slf.response_decompress, + timeout: slf.timeout, + config: slf.config, + }; + + Ok(request) + } + + /// Complete request construction and send body. + pub fn send_body( + self, + body: B, + ) -> impl Future< + Item = ClientResponse>, + Error = SendRequestError, + > + where + B: Into, + { + let slf = match self.prep_for_sending() { + Ok(slf) => slf, + Err(e) => return Either::A(err(e.into())), + }; + + Either::B(RequestSender::Owned(slf.head).send_body(slf.addr, slf.response_decompress, slf.timeout, slf.config.as_ref(), body)) + } + + /// Set a JSON body and generate `ClientRequest` + pub fn send_json( + self, + value: &T, + ) -> impl Future< + Item = ClientResponse>, + Error = SendRequestError, + > { + let slf = match self.prep_for_sending() { + Ok(slf) => slf, + Err(e) => return Either::A(err(e.into())), + }; + + Either::B(RequestSender::Owned(slf.head).send_json(slf.addr, slf.response_decompress, slf.timeout, slf.config.as_ref(), value)) + } + + /// Set a urlencoded body and generate `ClientRequest` + /// + /// `ClientRequestBuilder` can not be used after this call. + pub fn send_form( + self, + value: &T, + ) -> impl Future< + Item = ClientResponse>, + Error = SendRequestError, + > { + let slf = match self.prep_for_sending() { + Ok(slf) => slf, + Err(e) => return Either::A(err(e.into())), + }; + + Either::B(RequestSender::Owned(slf.head).send_form(slf.addr, slf.response_decompress, slf.timeout, slf.config.as_ref(), value)) + } + + /// Set an streaming body and generate `ClientRequest`. + pub fn send_stream( + self, + stream: S, + ) -> impl Future< + Item = ClientResponse>, + Error = SendRequestError, + > + where + S: Stream + 'static, + E: Into + 'static, + { + let slf = match self.prep_for_sending() { + Ok(slf) => slf, + Err(e) => return Either::A(err(e.into())), + }; + + Either::B(RequestSender::Owned(slf.head).send_stream(slf.addr, slf.response_decompress, slf.timeout, slf.config.as_ref(), stream)) + } + + /// Set an empty body and generate `ClientRequest`. + pub fn send( + self, + ) -> impl Future< + Item = ClientResponse>, + Error = SendRequestError, + > { + let slf = match self.prep_for_sending() { + Ok(slf) => slf, + Err(e) => return Either::A(err(e.into())), + }; + + Either::B(RequestSender::Owned(slf.head).send(slf.addr, slf.response_decompress, slf.timeout, slf.config.as_ref())) + } + + fn prep_for_sending(mut self) -> Result { if let Some(e) = self.err { return Err(e.into()); } @@ -438,112 +544,7 @@ impl ClientRequest { } } - let request = FrozenClientRequest { - head: Rc::new(slf.head), - addr: slf.addr, - response_decompress: slf.response_decompress, - timeout: slf.timeout, - config: slf.config, - }; - - Ok(request) - } - - /// Complete request construction and send body. - pub fn send_body( - self, - body: B, - ) -> impl Future< - Item = ClientResponse>, - Error = SendRequestError, - > - where - B: Into, - { - let frozen_request = match self.freeze() { - Ok(r) => r, - Err(e) => return Either::A(err(e.into())), - }; - - Either::B(frozen_request.send_body(None, body)) - } - - /// Set a JSON body and generate `ClientRequest` - pub fn send_json( - self, - value: &T, - ) -> impl Future< - Item = ClientResponse>, - Error = SendRequestError, - > { - // set content-type - let slf = self.set_header_if_none(header::CONTENT_TYPE, "application/json"); - - let frozen_request = match slf.freeze() { - Ok(r) => r, - Err(e) => return Either::A(err(e.into())), - }; - - Either::B(frozen_request.send_json(None, value)) - } - - /// Set a urlencoded body and generate `ClientRequest` - /// - /// `ClientRequestBuilder` can not be used after this call. - pub fn send_form( - self, - value: &T, - ) -> impl Future< - Item = ClientResponse>, - Error = SendRequestError, - > { - // set content-type - let slf = self.set_header_if_none( - header::CONTENT_TYPE, - "application/x-www-form-urlencoded", - ); - - let frozen_request = match slf.freeze() { - Ok(r) => r, - Err(e) => return Either::A(err(e.into())), - }; - - Either::B(frozen_request.send_form(None, value)) - } - - /// Set an streaming body and generate `ClientRequest`. - pub fn send_stream( - self, - stream: S, - ) -> impl Future< - Item = ClientResponse>, - Error = SendRequestError, - > - where - S: Stream + 'static, - E: Into + 'static, - { - let frozen_request = match self.freeze() { - Ok(r) => r, - Err(e) => return Either::A(err(e.into())), - }; - - Either::B(frozen_request.send_stream(None, stream)) - } - - /// Set an empty body and generate `ClientRequest`. - pub fn send( - self, - ) -> impl Future< - Item = ClientResponse>, - Error = SendRequestError, - > { - let frozen_request = match self.freeze() { - Ok(r) => r, - Err(e) => return Either::A(err(e.into())), - }; - - Either::B(frozen_request.send(None)) + Ok(slf) } } @@ -599,81 +600,36 @@ impl FrozenClientRequest { where B: Into, { - let config = self.config.as_ref(); - let response_decompress = self.response_decompress; - - let fut = config - .connector - .borrow_mut() - .send_request(self.head.clone(), extra_headers, body.into(), self.addr) - .map(move |res| { - res.map_body(|head, payload| { - if response_decompress { - Payload::Stream(Decoder::from_headers(payload, &head.headers)) - } else { - Payload::Stream(Decoder::new(payload, ContentEncoding::Identity)) - } - }) - }); - - // set request timeout - if let Some(timeout) = self.timeout.or_else(|| config.timeout.clone()) { - Either::A(Timeout::new(fut, timeout).map_err(|e| { - if let Some(e) = e.into_inner() { - e - } else { - SendRequestError::Timeout - } - })) - } else { - Either::B(fut) - } + RequestSender::Rc(self.head.clone(), extra_headers) + .send_body(self.addr, self.response_decompress, self.timeout, self.config.as_ref(), body) } /// Send a JSON body with optional extra headers. /// Extra headers will override corresponding existing headers in a frozen request. pub fn send_json( &self, - mut extra_headers: Option, + extra_headers: Option, value: &T, ) -> impl Future< Item = ClientResponse>, Error = SendRequestError, > { - let body = match serde_json::to_string(value) { - Ok(body) => body, - Err(e) => return Either::A(err(Error::from(e).into())), - }; - - // set content-type - if let Err(e) = self.set_extra_header_if_none(&mut extra_headers, header::CONTENT_TYPE, "application/json") { - return Either::A(err(e.into())); - } - - Either::B(self.send_body(extra_headers, Body::Bytes(Bytes::from(body)))) + RequestSender::Rc(self.head.clone(), extra_headers) + .send_json(self.addr, self.response_decompress, self.timeout, self.config.as_ref(), value) } /// Send a urlencoded body with optional extra headers. /// Extra headers will override corresponding existing headers in a frozen request. pub fn send_form( &self, - mut extra_headers: Option, + extra_headers: Option, value: &T, ) -> impl Future< Item = ClientResponse>, Error = SendRequestError, > { - let body = match serde_urlencoded::to_string(value) { - Ok(body) => body, - Err(e) => return Either::A(err(Error::from(e).into())), - }; - - // set content-type - if let Err(e) = self.set_extra_header_if_none(&mut extra_headers, header::CONTENT_TYPE, "application/x-www-form-urlencoded") { - return Either::A(err(e.into())); - } - - Either::B(self.send_body(extra_headers, Body::Bytes(Bytes::from(body)))) + RequestSender::Rc(self.head.clone(), extra_headers) + .send_form(self.addr, self.response_decompress, self.timeout, self.config.as_ref(), value) } /// Send a streaming body with optional extra headers. @@ -690,7 +646,8 @@ impl FrozenClientRequest { S: Stream + 'static, E: Into + 'static, { - self.send_body(extra_headers, Body::from_message(BodyStream::new(stream))) + RequestSender::Rc(self.head.clone(), extra_headers) + .send_stream(self.addr, self.response_decompress, self.timeout, self.config.as_ref(), stream) } /// Send an empty body with optional extra headers. @@ -702,23 +659,189 @@ impl FrozenClientRequest { Item = ClientResponse>, Error = SendRequestError, > { - self.send_body(extra_headers, Body::Empty) + RequestSender::Rc(self.head.clone(), extra_headers) + .send(self.addr, self.response_decompress, self.timeout, self.config.as_ref()) + } +} + +#[derive(Debug, From)] +enum PrepForSendingError { + Url(InvalidUrl), + Http(HttpError), +} + +impl Into for PrepForSendingError { + fn into(self) -> FreezeRequestError { + match self { + PrepForSendingError::Url(e) => FreezeRequestError::Url(e), + PrepForSendingError::Http(e) => FreezeRequestError::Http(e), + } + } +} + +impl Into for PrepForSendingError { + fn into(self) -> SendRequestError { + match self { + PrepForSendingError::Url(e) => SendRequestError::Url(e), + PrepForSendingError::Http(e) => SendRequestError::Http(e), + } + } +} + +enum RequestSender { + Owned(RequestHead), + Rc(Rc, Option), +} + +impl RequestSender { + pub fn send_body( + self, + addr: Option, + response_decompress: bool, + timeout: Option, + config: &ClientConfig, + body: B, + ) -> impl Future< + Item = ClientResponse>, + Error = SendRequestError, + > + where + B: Into, + { + let mut connector = config.connector.borrow_mut(); + + let fut = match self { + RequestSender::Owned(head) => connector.send_request(head, body.into(), addr), + RequestSender::Rc(head, extra_headers) => connector.send_request_extra(head, extra_headers, body.into(), addr), + }; + + let fut = fut + .map(move |res| { + res.map_body(|head, payload| { + if response_decompress { + Payload::Stream(Decoder::from_headers(payload, &head.headers)) + } else { + Payload::Stream(Decoder::new(payload, ContentEncoding::Identity)) + } + }) + }); + + // set request timeout + if let Some(timeout) = timeout.or_else(|| config.timeout.clone()) { + Either::A(Timeout::new(fut, timeout).map_err(|e| { + if let Some(e) = e.into_inner() { + e + } else { + SendRequestError::Timeout + } + })) + } else { + Either::B(fut) + } } - fn set_extra_header_if_none(&self, extra_headers: &mut Option, key: HeaderName, value: V) -> Result<(), HttpError> + pub fn send_json( + mut self, + addr: Option, + response_decompress: bool, + timeout: Option, + config: &ClientConfig, + value: &T, + ) -> impl Future< + Item = ClientResponse>, + Error = SendRequestError, + > { + let body = match serde_json::to_string(value) { + Ok(body) => body, + Err(e) => return Either::A(err(Error::from(e).into())), + }; + + if let Err(e) = self.set_header_if_none(header::CONTENT_TYPE, "application/json") { + return Either::A(err(e.into())); + } + + Either::B(self.send_body(addr, response_decompress, timeout, config, Body::Bytes(Bytes::from(body)))) + } + + pub fn send_form( + mut self, + addr: Option, + response_decompress: bool, + timeout: Option, + config: &ClientConfig, + value: &T, + ) -> impl Future< + Item = ClientResponse>, + Error = SendRequestError, + > { + let body = match serde_urlencoded::to_string(value) { + Ok(body) => body, + Err(e) => return Either::A(err(Error::from(e).into())), + }; + + // set content-type + if let Err(e) = self.set_header_if_none(header::CONTENT_TYPE, "application/x-www-form-urlencoded") { + return Either::A(err(e.into())); + } + + Either::B(self.send_body(addr, response_decompress, timeout, config, Body::Bytes(Bytes::from(body)))) + } + + pub fn send_stream( + self, + addr: Option, + response_decompress: bool, + timeout: Option, + config: &ClientConfig, + stream: S, + ) -> impl Future< + Item = ClientResponse>, + Error = SendRequestError, + > + where + S: Stream + 'static, + E: Into + 'static, + { + self.send_body(addr, response_decompress, timeout, config, Body::from_message(BodyStream::new(stream))) + } + + pub fn send( + self, + addr: Option, + response_decompress: bool, + timeout: Option, + config: &ClientConfig, + ) -> impl Future< + Item = ClientResponse>, + Error = SendRequestError, + > { + self.send_body(addr, response_decompress, timeout, config, Body::Empty) + } + + fn set_header_if_none(&mut self, key: HeaderName, value: V) -> Result<(), HttpError> where V: IntoHeaderValue, { - if !self.head.headers.contains_key(&key) - && !extra_headers.iter().any(|h| h.contains_key(&key)) { - - match value.try_into(){ - Ok(v) => { - let h = extra_headers.get_or_insert(HeaderMap::new()); - h.insert(key, v) - }, - Err(e) => return Err(e.into()), - }; + match self { + RequestSender::Owned(head) => { + if !head.headers.contains_key(&key) { + match value.try_into() { + Ok(value) => head.headers.insert(key, value), + Err(e) => return Err(e.into()), + } + } + }, + RequestSender::Rc(head, extra_headers) => { + if !head.headers.contains_key(&key) && !extra_headers.iter().any(|h| h.contains_key(&key)) { + match value.try_into(){ + Ok(v) => { + let h = extra_headers.get_or_insert(HeaderMap::new()); + h.insert(key, v) + }, + Err(e) => return Err(e.into()), + }; + } + } } Ok(()) diff --git a/awc/src/ws.rs b/awc/src/ws.rs index dbfe9e7ab..72c9a38bc 100644 --- a/awc/src/ws.rs +++ b/awc/src/ws.rs @@ -287,7 +287,7 @@ impl WebsocketsRequest { .config .connector .borrow_mut() - .open_tunnel(Rc::new(head), None, self.addr) + .open_tunnel(head, self.addr) .from_err() .and_then(move |(head, framed)| { // verify response