Added freeze() method to ClientRequest which produces a 'read-only' copy of a request suitable for retrying the send operation

This commit is contained in:
Dmitry Pypin 2019-07-10 16:26:44 -07:00
parent f276db0693
commit 36636b71ea
4 changed files with 235 additions and 69 deletions

View File

@ -128,3 +128,23 @@ impl ResponseError for SendRequestError {
.into() .into()
} }
} }
/// A set of errors that can occur during freezing a request
#[derive(Debug, Display, From)]
pub enum FreezeRequestError {
/// Invalid URL
#[display(fmt = "Invalid URL: {}", _0)]
Url(InvalidUrl),
/// Http error
#[display(fmt = "{}", _0)]
Http(HttpError),
}
impl From<FreezeRequestError> for SendRequestError {
fn from(e: FreezeRequestError) -> Self {
match e {
FreezeRequestError::Url(e) => e.into(),
FreezeRequestError::Http(e) => e.into(),
}
}
}

View File

@ -10,7 +10,7 @@ mod pool;
pub use self::connection::Connection; pub use self::connection::Connection;
pub use self::connector::Connector; pub use self::connector::Connector;
pub use self::error::{ConnectError, InvalidUrl, SendRequestError}; pub use self::error::{ConnectError, InvalidUrl, SendRequestError, FreezeRequestError};
pub use self::pool::Protocol; pub use self::pool::Protocol;
#[derive(Clone)] #[derive(Clone)]

View File

@ -1,5 +1,5 @@
//! Http client errors //! Http client errors
pub use actix_http::client::{ConnectError, InvalidUrl, SendRequestError}; pub use actix_http::client::{ConnectError, InvalidUrl, SendRequestError, FreezeRequestError};
pub use actix_http::error::PayloadError; pub use actix_http::error::PayloadError;
pub use actix_http::ws::HandshakeError as WsHandshakeError; pub use actix_http::ws::HandshakeError as WsHandshakeError;
pub use actix_http::ws::ProtocolError as WsProtocolError; pub use actix_http::ws::ProtocolError as WsProtocolError;

View File

@ -22,7 +22,7 @@ use actix_http::http::{
}; };
use actix_http::{Error, Payload, RequestHead}; use actix_http::{Error, Payload, RequestHead};
use crate::error::{InvalidUrl, PayloadError, SendRequestError}; use crate::error::{InvalidUrl, PayloadError, SendRequestError, FreezeRequestError};
use crate::response::ClientResponse; use crate::response::ClientResponse;
use crate::ClientConfig; use crate::ClientConfig;
@ -373,34 +373,24 @@ impl ClientRequest {
} }
} }
/// Complete request construction and send body. pub fn freeze(mut self) -> Result<FrozenClientRequest, FreezeRequestError> {
pub fn send_body<B>( if let Some(e) = self.err {
mut self, return Err(e.into());
body: B,
) -> impl Future<
Item = ClientResponse<impl Stream<Item = Bytes, Error = PayloadError>>,
Error = SendRequestError,
>
where
B: Into<Body>,
{
if let Some(e) = self.err.take() {
return Either::A(err(e.into()));
} }
// validate uri // validate uri
let uri = &self.head.uri; let uri = &self.head.uri;
if uri.host().is_none() { if uri.host().is_none() {
return Either::A(err(InvalidUrl::MissingHost.into())); return Err(InvalidUrl::MissingHost.into());
} else if uri.scheme_part().is_none() { } else if uri.scheme_part().is_none() {
return Either::A(err(InvalidUrl::MissingScheme.into())); return Err(InvalidUrl::MissingScheme.into());
} else if let Some(scheme) = uri.scheme_part() { } else if let Some(scheme) = uri.scheme_part() {
match scheme.as_str() { match scheme.as_str() {
"http" | "ws" | "https" | "wss" => (), "http" | "ws" | "https" | "wss" => (),
_ => return Either::A(err(InvalidUrl::UnknownScheme.into())), _ => return Err(InvalidUrl::UnknownScheme.into()),
} }
} else { } else {
return Either::A(err(InvalidUrl::UnknownScheme.into())); return Err(InvalidUrl::UnknownScheme.into());
} }
// set cookies // set cookies
@ -421,9 +411,9 @@ impl ClientRequest {
// enable br only for https // enable br only for https
#[cfg(any( #[cfg(any(
feature = "brotli", feature = "brotli",
feature = "flate2-zlib", feature = "flate2-zlib",
feature = "flate2-rust" feature = "flate2-rust"
))] ))]
{ {
if slf.response_decompress { if slf.response_decompress {
@ -438,44 +428,42 @@ impl ClientRequest {
slf = slf.set_header_if_none(header::ACCEPT_ENCODING, HTTPS_ENCODING) slf = slf.set_header_if_none(header::ACCEPT_ENCODING, HTTPS_ENCODING)
} else { } else {
#[cfg(any(feature = "flate2-zlib", feature = "flate2-rust"))] #[cfg(any(feature = "flate2-zlib", feature = "flate2-rust"))]
{ {
slf = slf slf = slf
.set_header_if_none(header::ACCEPT_ENCODING, "gzip, deflate") .set_header_if_none(header::ACCEPT_ENCODING, "gzip, deflate")
} }
}; };
} }
} }
let head = slf.head; let request = FrozenClientRequest {
let config = slf.config.as_ref(); head: Rc::new(slf.head),
let response_decompress = slf.response_decompress; addr: slf.addr,
response_decompress: slf.response_decompress,
timeout: slf.timeout,
config: slf.config,
};
let fut = config Ok(request)
.connector }
.borrow_mut()
.send_request(Rc::new(head), None, body.into(), slf.addr)
.map(move |res| {
res.map_body(|head, payload| {
if response_decompress {
Payload::Stream(Decoder::from_headers(payload, &head.headers))
} else {
Payload::Stream(Decoder::new(payload, ContentEncoding::Identity))
}
})
});
// set request timeout /// Complete request construction and send body.
if let Some(timeout) = slf.timeout.or_else(|| config.timeout.clone()) { pub fn send_body<B>(
Either::B(Either::A(Timeout::new(fut, timeout).map_err(|e| { self,
if let Some(e) = e.into_inner() { body: B,
e ) -> impl Future<
} else { Item = ClientResponse<impl Stream<Item = Bytes, Error = PayloadError>>,
SendRequestError::Timeout Error = SendRequestError,
} >
}))) where
} else { B: Into<Body>,
Either::B(Either::B(fut)) {
} let frozen_request = match self.freeze() {
Ok(r) => r,
Err(e) => return Either::A(err(e.into())),
};
Either::B(frozen_request.send_body(None, body))
} }
/// Set a JSON body and generate `ClientRequest` /// Set a JSON body and generate `ClientRequest`
@ -486,15 +474,15 @@ impl ClientRequest {
Item = ClientResponse<impl Stream<Item = Bytes, Error = PayloadError>>, Item = ClientResponse<impl Stream<Item = Bytes, Error = PayloadError>>,
Error = SendRequestError, Error = SendRequestError,
> { > {
let body = match serde_json::to_string(value) {
Ok(body) => body,
Err(e) => return Either::A(err(Error::from(e).into())),
};
// set content-type // set content-type
let slf = self.set_header_if_none(header::CONTENT_TYPE, "application/json"); let slf = self.set_header_if_none(header::CONTENT_TYPE, "application/json");
Either::B(slf.send_body(Body::Bytes(Bytes::from(body)))) let frozen_request = match slf.freeze() {
Ok(r) => r,
Err(e) => return Either::A(err(e.into())),
};
Either::B(frozen_request.send_json(None, value))
} }
/// Set a urlencoded body and generate `ClientRequest` /// Set a urlencoded body and generate `ClientRequest`
@ -507,18 +495,18 @@ impl ClientRequest {
Item = ClientResponse<impl Stream<Item = Bytes, Error = PayloadError>>, Item = ClientResponse<impl Stream<Item = Bytes, Error = PayloadError>>,
Error = SendRequestError, Error = SendRequestError,
> { > {
let body = match serde_urlencoded::to_string(value) {
Ok(body) => body,
Err(e) => return Either::A(err(Error::from(e).into())),
};
// set content-type // set content-type
let slf = self.set_header_if_none( let slf = self.set_header_if_none(
header::CONTENT_TYPE, header::CONTENT_TYPE,
"application/x-www-form-urlencoded", "application/x-www-form-urlencoded",
); );
Either::B(slf.send_body(Body::Bytes(Bytes::from(body)))) let frozen_request = match slf.freeze() {
Ok(r) => r,
Err(e) => return Either::A(err(e.into())),
};
Either::B(frozen_request.send_form(None, value))
} }
/// Set an streaming body and generate `ClientRequest`. /// Set an streaming body and generate `ClientRequest`.
@ -533,7 +521,12 @@ impl ClientRequest {
S: Stream<Item = Bytes, Error = E> + 'static, S: Stream<Item = Bytes, Error = E> + 'static,
E: Into<Error> + 'static, E: Into<Error> + 'static,
{ {
self.send_body(Body::from_message(BodyStream::new(stream))) let frozen_request = match self.freeze() {
Ok(r) => r,
Err(e) => return Either::A(err(e.into())),
};
Either::B(frozen_request.send_stream(None, stream))
} }
/// Set an empty body and generate `ClientRequest`. /// Set an empty body and generate `ClientRequest`.
@ -543,7 +536,12 @@ impl ClientRequest {
Item = ClientResponse<impl Stream<Item = Bytes, Error = PayloadError>>, Item = ClientResponse<impl Stream<Item = Bytes, Error = PayloadError>>,
Error = SendRequestError, Error = SendRequestError,
> { > {
self.send_body(Body::Empty) let frozen_request = match self.freeze() {
Ok(r) => r,
Err(e) => return Either::A(err(e.into())),
};
Either::B(frozen_request.send(None))
} }
} }
@ -562,6 +560,154 @@ impl fmt::Debug for ClientRequest {
} }
} }
pub struct FrozenClientRequest {
head: Rc<RequestHead>,
addr: Option<net::SocketAddr>,
response_decompress: bool,
timeout: Option<Duration>,
config: Rc<ClientConfig>,
}
impl FrozenClientRequest {
/// Send body with optional extra headers.
/// Extra headers will override corresponding existing headers in a frozen request.
pub fn send_body<B>(
&self,
extra_headers: Option<HeaderMap>,
body: B,
) -> impl Future<
Item = ClientResponse<impl Stream<Item = Bytes, Error = PayloadError>>,
Error = SendRequestError,
>
where
B: Into<Body>,
{
let config = self.config.as_ref();
let response_decompress = self.response_decompress;
let fut = config
.connector
.borrow_mut()
.send_request(self.head.clone(), extra_headers, body.into(), self.addr)
.map(move |res| {
res.map_body(|head, payload| {
if response_decompress {
Payload::Stream(Decoder::from_headers(payload, &head.headers))
} else {
Payload::Stream(Decoder::new(payload, ContentEncoding::Identity))
}
})
});
// set request timeout
if let Some(timeout) = self.timeout.or_else(|| config.timeout.clone()) {
Either::A(Timeout::new(fut, timeout).map_err(|e| {
if let Some(e) = e.into_inner() {
e
} else {
SendRequestError::Timeout
}
}))
} else {
Either::B(fut)
}
}
/// Send a JSON body with optional extra headers.
/// Extra headers will override corresponding existing headers in a frozen request.
pub fn send_json<T: Serialize>(
&self,
mut extra_headers: Option<HeaderMap>,
value: &T,
) -> impl Future<
Item = ClientResponse<impl Stream<Item = Bytes, Error = PayloadError>>,
Error = SendRequestError,
> {
let body = match serde_json::to_string(value) {
Ok(body) => body,
Err(e) => return Either::A(err(Error::from(e).into())),
};
// set content-type
if let Err(e) = self.set_extra_header_if_none(&mut extra_headers, header::CONTENT_TYPE, "application/json") {
return Either::A(err(e.into()));
}
Either::B(self.send_body(extra_headers, Body::Bytes(Bytes::from(body))))
}
/// Send a urlencoded body with optional extra headers.
/// Extra headers will override corresponding existing headers in a frozen request.
pub fn send_form<T: Serialize>(
&self,
mut extra_headers: Option<HeaderMap>,
value: &T,
) -> impl Future<
Item = ClientResponse<impl Stream<Item = Bytes, Error = PayloadError>>,
Error = SendRequestError,
> {
let body = match serde_urlencoded::to_string(value) {
Ok(body) => body,
Err(e) => return Either::A(err(Error::from(e).into())),
};
// set content-type
if let Err(e) = self.set_extra_header_if_none(&mut extra_headers, header::CONTENT_TYPE, "application/x-www-form-urlencoded") {
return Either::A(err(e.into()));
}
Either::B(self.send_body(extra_headers, Body::Bytes(Bytes::from(body))))
}
/// Send a streaming body with optional extra headers.
/// Extra headers will override corresponding existing headers in a frozen request.
pub fn send_stream<S, E>(
&self,
extra_headers: Option<HeaderMap>,
stream: S,
) -> impl Future<
Item = ClientResponse<impl Stream<Item = Bytes, Error = PayloadError>>,
Error = SendRequestError,
>
where
S: Stream<Item = Bytes, Error = E> + 'static,
E: Into<Error> + 'static,
{
self.send_body(extra_headers, Body::from_message(BodyStream::new(stream)))
}
/// Send an empty body with optional extra headers.
/// Extra headers will override corresponding existing headers in a frozen request.
pub fn send(
&self,
extra_headers: Option<HeaderMap>,
) -> impl Future<
Item = ClientResponse<impl Stream<Item = Bytes, Error = PayloadError>>,
Error = SendRequestError,
> {
self.send_body(extra_headers, Body::Empty)
}
fn set_extra_header_if_none<V>(&self, extra_headers: &mut Option<HeaderMap>, key: HeaderName, value: V) -> Result<(), HttpError>
where
V: IntoHeaderValue,
{
if !self.head.headers.contains_key(&key)
&& !extra_headers.iter().any(|h| h.contains_key(&key)) {
match value.try_into(){
Ok(v) => {
let h = extra_headers.get_or_insert(HeaderMap::new());
h.insert(key, v)
},
Err(e) => return Err(e.into()),
};
}
Ok(())
}
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use std::time::SystemTime; use std::time::SystemTime;