From 1296e07c4830f0ab2e2864a6fc9faa93972e5935 Mon Sep 17 00:00:00 2001 From: Rob Ede Date: Fri, 24 Dec 2021 17:47:47 +0000 Subject: [PATCH 1/4] relax unpin bounds on payload types (#2545) --- actix-http/CHANGES.md | 9 ++++ actix-http/src/encoding/decoder.rs | 39 ++++++++------- actix-http/src/h1/dispatcher.rs | 7 +-- actix-http/src/h1/payload.rs | 76 +++++++++++++++++------------- actix-http/src/h1/utils.rs | 4 +- actix-http/src/h2/dispatcher.rs | 4 +- actix-http/src/h2/mod.rs | 11 +++++ actix-http/src/lib.rs | 3 +- actix-http/src/payload.rs | 73 +++++++++++++++++----------- actix-http/src/requests/request.rs | 11 +++-- actix-http/src/test.rs | 2 +- actix-http/tests/test_rustls.rs | 33 +++++++++---- actix-multipart/src/server.rs | 2 +- awc/src/client/connection.rs | 4 +- awc/src/client/h1proto.rs | 16 +++++-- awc/src/response.rs | 6 +-- awc/src/sender.rs | 15 +++--- awc/src/test.rs | 5 +- src/dev.rs | 2 +- src/response/builder.rs | 36 ++++++-------- src/service.rs | 4 +- src/test/test_request.rs | 21 +++++---- 22 files changed, 229 insertions(+), 154 deletions(-) diff --git a/actix-http/CHANGES.md b/actix-http/CHANGES.md index 3b45e934f..adc4c35c7 100644 --- a/actix-http/CHANGES.md +++ b/actix-http/CHANGES.md @@ -3,8 +3,17 @@ ## Unreleased - 2021-xx-xx ### Changes - `HeaderMap::get_all` now returns a `std::slice::Iter`. [#2527] +- `Payload` inner fields are now named. [#2545] +- `impl Stream` for `Payload` no longer requires the `Stream` variant be `Unpin`. [#2545] +- `impl Future` for `h1::SendResponse` no longer requires the body type be `Unpin`. [#2545] +- `impl Stream` for `encoding::Decoder` no longer requires the stream type be `Unpin`. [#2545] +- Rename `PayloadStream` to `BoxedPayloadStream`. [#2545] + +### Removed +- `h1::Payload::readany`. [#2545] [#2527]: https://github.com/actix/actix-web/pull/2527 +[#2545]: https://github.com/actix/actix-web/pull/2545 ## 3.0.0-beta.16 - 2021-12-17 diff --git a/actix-http/src/encoding/decoder.rs b/actix-http/src/encoding/decoder.rs index a46e330c9..0f519637a 100644 --- a/actix-http/src/encoding/decoder.rs +++ b/actix-http/src/encoding/decoder.rs @@ -28,11 +28,14 @@ use crate::{ const MAX_CHUNK_SIZE_DECODE_IN_PLACE: usize = 2049; -pub struct Decoder { - decoder: Option, - stream: S, - eof: bool, - fut: Option, ContentDecoder), io::Error>>>, +pin_project_lite::pin_project! { + pub struct Decoder { + decoder: Option, + #[pin] + stream: S, + eof: bool, + fut: Option, ContentDecoder), io::Error>>>, + } } impl Decoder @@ -89,42 +92,44 @@ where impl Stream for Decoder where - S: Stream> + Unpin, + S: Stream>, { type Item = Result; - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut this = self.project(); + loop { - if let Some(ref mut fut) = self.fut { + if let Some(ref mut fut) = this.fut { let (chunk, decoder) = ready!(Pin::new(fut).poll(cx)).map_err(|_| BlockingError)??; - self.decoder = Some(decoder); - self.fut.take(); + *this.decoder = Some(decoder); + this.fut.take(); if let Some(chunk) = chunk { return Poll::Ready(Some(Ok(chunk))); } } - if self.eof { + if *this.eof { return Poll::Ready(None); } - match ready!(Pin::new(&mut self.stream).poll_next(cx)) { + match ready!(this.stream.as_mut().poll_next(cx)) { Some(Err(err)) => return Poll::Ready(Some(Err(err))), Some(Ok(chunk)) => { - if let Some(mut decoder) = self.decoder.take() { + if let Some(mut decoder) = this.decoder.take() { if chunk.len() < MAX_CHUNK_SIZE_DECODE_IN_PLACE { let chunk = decoder.feed_data(chunk)?; - self.decoder = Some(decoder); + *this.decoder = Some(decoder); if let Some(chunk) = chunk { return Poll::Ready(Some(Ok(chunk))); } } else { - self.fut = Some(spawn_blocking(move || { + *this.fut = Some(spawn_blocking(move || { let chunk = decoder.feed_data(chunk)?; Ok((chunk, decoder)) })); @@ -137,9 +142,9 @@ where } None => { - self.eof = true; + *this.eof = true; - return if let Some(mut decoder) = self.decoder.take() { + return if let Some(mut decoder) = this.decoder.take() { match decoder.feed_eof() { Ok(Some(res)) => Poll::Ready(Some(Ok(res))), Ok(None) => Poll::Ready(None), diff --git a/actix-http/src/h1/dispatcher.rs b/actix-http/src/h1/dispatcher.rs index 5c0cb64af..13055f08a 100644 --- a/actix-http/src/h1/dispatcher.rs +++ b/actix-http/src/h1/dispatcher.rs @@ -646,10 +646,11 @@ where Payload is attached to Request and passed to Service::call where the state can be collected and consumed. */ - let (ps, pl) = Payload::create(false); - let (req1, _) = req.replace_payload(crate::Payload::H1(pl)); + let (sender, payload) = Payload::create(false); + let (req1, _) = + req.replace_payload(crate::Payload::H1 { payload }); req = req1; - *this.payload = Some(ps); + *this.payload = Some(sender); } // Request has no payload. diff --git a/actix-http/src/h1/payload.rs b/actix-http/src/h1/payload.rs index f912e0ba3..4d031c15a 100644 --- a/actix-http/src/h1/payload.rs +++ b/actix-http/src/h1/payload.rs @@ -1,9 +1,12 @@ //! Payload stream -use std::cell::RefCell; -use std::collections::VecDeque; -use std::pin::Pin; -use std::rc::{Rc, Weak}; -use std::task::{Context, Poll, Waker}; + +use std::{ + cell::RefCell, + collections::VecDeque, + pin::Pin, + rc::{Rc, Weak}, + task::{Context, Poll, Waker}, +}; use bytes::Bytes; use futures_core::Stream; @@ -22,39 +25,32 @@ pub enum PayloadStatus { /// Buffered stream of bytes chunks /// -/// Payload stores chunks in a vector. First chunk can be received with -/// `.readany()` method. Payload stream is not thread safe. Payload does not -/// notify current task when new data is available. +/// Payload stores chunks in a vector. First chunk can be received with `poll_next`. Payload does +/// not notify current task when new data is available. /// -/// Payload stream can be used as `Response` body stream. +/// Payload can be used as `Response` body stream. #[derive(Debug)] pub struct Payload { inner: Rc>, } impl Payload { - /// Create payload stream. + /// Creates a payload stream. /// - /// This method construct two objects responsible for bytes stream - /// generation. - /// - /// * `PayloadSender` - *Sender* side of the stream - /// - /// * `Payload` - *Receiver* side of the stream + /// This method construct two objects responsible for bytes stream generation: + /// - `PayloadSender` - *Sender* side of the stream + /// - `Payload` - *Receiver* side of the stream pub fn create(eof: bool) -> (PayloadSender, Payload) { let shared = Rc::new(RefCell::new(Inner::new(eof))); ( - PayloadSender { - inner: Rc::downgrade(&shared), - }, + PayloadSender::new(Rc::downgrade(&shared)), Payload { inner: shared }, ) } - /// Create empty payload - #[doc(hidden)] - pub fn empty() -> Payload { + /// Creates an empty payload. + pub(crate) fn empty() -> Payload { Payload { inner: Rc::new(RefCell::new(Inner::new(true))), } @@ -77,14 +73,6 @@ impl Payload { pub fn unread_data(&mut self, data: Bytes) { self.inner.borrow_mut().unread_data(data); } - - #[inline] - pub fn readany( - &mut self, - cx: &mut Context<'_>, - ) -> Poll>> { - self.inner.borrow_mut().readany(cx) - } } impl Stream for Payload { @@ -94,7 +82,7 @@ impl Stream for Payload { self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll>> { - self.inner.borrow_mut().readany(cx) + Pin::new(&mut *self.inner.borrow_mut()).poll_next(cx) } } @@ -104,6 +92,10 @@ pub struct PayloadSender { } impl PayloadSender { + fn new(inner: Weak>) -> Self { + Self { inner } + } + #[inline] pub fn set_error(&mut self, err: PayloadError) { if let Some(shared) = self.inner.upgrade() { @@ -227,7 +219,10 @@ impl Inner { self.len } - fn readany(&mut self, cx: &mut Context<'_>) -> Poll>> { + fn poll_next( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll>> { if let Some(data) = self.items.pop_front() { self.len -= data.len(); self.need_read = self.len < MAX_BUFFER_SIZE; @@ -257,8 +252,18 @@ impl Inner { #[cfg(test)] mod tests { - use super::*; + use std::panic::{RefUnwindSafe, UnwindSafe}; + use actix_utils::future::poll_fn; + use static_assertions::{assert_impl_all, assert_not_impl_any}; + + use super::*; + + assert_impl_all!(Payload: Unpin); + assert_not_impl_any!(Payload: Send, Sync, UnwindSafe, RefUnwindSafe); + + assert_impl_all!(Inner: Unpin, Send, Sync); + assert_not_impl_any!(Inner: UnwindSafe, RefUnwindSafe); #[actix_rt::test] async fn test_unread_data() { @@ -270,7 +275,10 @@ mod tests { assert_eq!( Bytes::from("data"), - poll_fn(|cx| payload.readany(cx)).await.unwrap().unwrap() + poll_fn(|cx| Pin::new(&mut payload).poll_next(cx)) + .await + .unwrap() + .unwrap() ); } } diff --git a/actix-http/src/h1/utils.rs b/actix-http/src/h1/utils.rs index 131c7f1ed..5c11b1dab 100644 --- a/actix-http/src/h1/utils.rs +++ b/actix-http/src/h1/utils.rs @@ -45,7 +45,7 @@ where impl Future for SendResponse where T: AsyncRead + AsyncWrite + Unpin, - B: MessageBody + Unpin, + B: MessageBody, B::Error: Into, { type Output = Result, Error>; @@ -81,7 +81,7 @@ where // body is done when item is None body_done = item.is_none(); if body_done { - let _ = this.body.take(); + this.body.set(None); } let framed = this.framed.as_mut().as_pin_mut().unwrap(); framed diff --git a/actix-http/src/h2/dispatcher.rs b/actix-http/src/h2/dispatcher.rs index 8fbefe6de..7f0f15ee6 100644 --- a/actix-http/src/h2/dispatcher.rs +++ b/actix-http/src/h2/dispatcher.rs @@ -108,8 +108,8 @@ where match Pin::new(&mut this.connection).poll_accept(cx)? { Poll::Ready(Some((req, tx))) => { let (parts, body) = req.into_parts(); - let pl = crate::h2::Payload::new(body); - let pl = Payload::H2(pl); + let payload = crate::h2::Payload::new(body); + let pl = Payload::H2 { payload }; let mut req = Request::with_payload(pl); let head = req.head_mut(); diff --git a/actix-http/src/h2/mod.rs b/actix-http/src/h2/mod.rs index cbcb6d0fc..47d51b420 100644 --- a/actix-http/src/h2/mod.rs +++ b/actix-http/src/h2/mod.rs @@ -98,3 +98,14 @@ where } } } + +#[cfg(test)] +mod tests { + use std::panic::{RefUnwindSafe, UnwindSafe}; + + use static_assertions::assert_impl_all; + + use super::*; + + assert_impl_all!(Payload: Unpin, Send, Sync, UnwindSafe, RefUnwindSafe); +} diff --git a/actix-http/src/lib.rs b/actix-http/src/lib.rs index 2b7bc730b..f2b415790 100644 --- a/actix-http/src/lib.rs +++ b/actix-http/src/lib.rs @@ -58,7 +58,8 @@ pub use self::header::ContentEncoding; pub use self::http_message::HttpMessage; pub use self::message::ConnectionType; pub use self::message::Message; -pub use self::payload::{Payload, PayloadStream}; +#[allow(deprecated)] +pub use self::payload::{BoxedPayloadStream, Payload, PayloadStream}; pub use self::requests::{Request, RequestHead, RequestHeadType}; pub use self::responses::{Response, ResponseBuilder, ResponseHead}; pub use self::service::HttpService; diff --git a/actix-http/src/payload.rs b/actix-http/src/payload.rs index 69840e7c1..c9f338c7d 100644 --- a/actix-http/src/payload.rs +++ b/actix-http/src/payload.rs @@ -1,70 +1,89 @@ use std::{ + mem, pin::Pin, task::{Context, Poll}, }; use bytes::Bytes; use futures_core::Stream; -use h2::RecvStream; use crate::error::PayloadError; -// TODO: rename to boxed payload -/// A boxed payload. -pub type PayloadStream = Pin>>>; +/// A boxed payload stream. +pub type BoxedPayloadStream = Pin>>>; -/// A streaming payload. -pub enum Payload { - None, - H1(crate::h1::Payload), - H2(crate::h2::Payload), - Stream(S), +#[deprecated(since = "4.0.0", note = "Renamed to `BoxedPayloadStream`.")] +pub type PayloadStream = BoxedPayloadStream; + +pin_project_lite::pin_project! { + /// A streaming payload. + #[project = PayloadProj] + pub enum Payload { + None, + H1 { payload: crate::h1::Payload }, + H2 { payload: crate::h2::Payload }, + Stream { #[pin] payload: S }, + } } impl From for Payload { - fn from(v: crate::h1::Payload) -> Self { - Payload::H1(v) + fn from(payload: crate::h1::Payload) -> Self { + Payload::H1 { payload } } } impl From for Payload { - fn from(v: crate::h2::Payload) -> Self { - Payload::H2(v) + fn from(payload: crate::h2::Payload) -> Self { + Payload::H2 { payload } } } -impl From for Payload { - fn from(v: RecvStream) -> Self { - Payload::H2(crate::h2::Payload::new(v)) +impl From for Payload { + fn from(stream: h2::RecvStream) -> Self { + Payload::H2 { + payload: crate::h2::Payload::new(stream), + } } } -impl From for Payload { - fn from(pl: PayloadStream) -> Self { - Payload::Stream(pl) +impl From for Payload { + fn from(payload: BoxedPayloadStream) -> Self { + Payload::Stream { payload } } } impl Payload { /// Takes current payload and replaces it with `None` value pub fn take(&mut self) -> Payload { - std::mem::replace(self, Payload::None) + mem::replace(self, Payload::None) } } impl Stream for Payload where - S: Stream> + Unpin, + S: Stream>, { type Item = Result; #[inline] fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - match self.get_mut() { - Payload::None => Poll::Ready(None), - Payload::H1(ref mut pl) => pl.readany(cx), - Payload::H2(ref mut pl) => Pin::new(pl).poll_next(cx), - Payload::Stream(ref mut pl) => Pin::new(pl).poll_next(cx), + match self.project() { + PayloadProj::None => Poll::Ready(None), + PayloadProj::H1 { payload } => Pin::new(payload).poll_next(cx), + PayloadProj::H2 { payload } => Pin::new(payload).poll_next(cx), + PayloadProj::Stream { payload } => payload.poll_next(cx), } } } + +#[cfg(test)] +mod tests { + use std::panic::{RefUnwindSafe, UnwindSafe}; + + use static_assertions::{assert_impl_all, assert_not_impl_any}; + + use super::*; + + assert_impl_all!(Payload: Unpin); + assert_not_impl_any!(Payload: Send, Sync, UnwindSafe, RefUnwindSafe); +} diff --git a/actix-http/src/requests/request.rs b/actix-http/src/requests/request.rs index 0254a8f11..4eaaba8e1 100644 --- a/actix-http/src/requests/request.rs +++ b/actix-http/src/requests/request.rs @@ -10,11 +10,12 @@ use std::{ use http::{header, Method, Uri, Version}; use crate::{ - header::HeaderMap, Extensions, HttpMessage, Message, Payload, PayloadStream, RequestHead, + header::HeaderMap, BoxedPayloadStream, Extensions, HttpMessage, Message, Payload, + RequestHead, }; /// An HTTP request. -pub struct Request

{ +pub struct Request

{ pub(crate) payload: Payload

, pub(crate) head: Message, pub(crate) conn_data: Option>, @@ -46,7 +47,7 @@ impl

HttpMessage for Request

{ } } -impl From> for Request { +impl From> for Request { fn from(head: Message) -> Self { Request { head, @@ -57,10 +58,10 @@ impl From> for Request { } } -impl Request { +impl Request { /// Create new Request instance #[allow(clippy::new_without_default)] - pub fn new() -> Request { + pub fn new() -> Request { Request { head: Message::new(), payload: Payload::None, diff --git a/actix-http/src/test.rs b/actix-http/src/test.rs index ea80345fe..1f76498ef 100644 --- a/actix-http/src/test.rs +++ b/actix-http/src/test.rs @@ -120,7 +120,7 @@ impl TestRequest { } /// Set request payload. - pub fn set_payload>(&mut self, data: B) -> &mut Self { + pub fn set_payload(&mut self, data: impl Into) -> &mut Self { let mut payload = crate::h1::Payload::empty(); payload.unread_data(data.into()); parts(&mut self.0).payload = Some(payload.into()); diff --git a/actix-http/tests/test_rustls.rs b/actix-http/tests/test_rustls.rs index 42ff0dba1..51fefae72 100644 --- a/actix-http/tests/test_rustls.rs +++ b/actix-http/tests/test_rustls.rs @@ -7,6 +7,7 @@ use std::{ io::{self, BufReader, Write}, net::{SocketAddr, TcpStream as StdTcpStream}, sync::Arc, + task::Poll, }; use actix_http::{ @@ -16,25 +17,37 @@ use actix_http::{ Error, HttpService, Method, Request, Response, StatusCode, Version, }; use actix_http_test::test_server; +use actix_rt::pin; use actix_service::{fn_factory_with_config, fn_service}; use actix_tls::connect::rustls::webpki_roots_cert_store; -use actix_utils::future::{err, ok}; +use actix_utils::future::{err, ok, poll_fn}; use bytes::{Bytes, BytesMut}; use derive_more::{Display, Error}; -use futures_core::Stream; -use futures_util::stream::{once, StreamExt as _}; +use futures_core::{ready, Stream}; +use futures_util::stream::once; use rustls::{Certificate, PrivateKey, ServerConfig as RustlsServerConfig, ServerName}; use rustls_pemfile::{certs, pkcs8_private_keys}; -async fn load_body(mut stream: S) -> Result +async fn load_body(stream: S) -> Result where - S: Stream> + Unpin, + S: Stream>, { - let mut body = BytesMut::new(); - while let Some(item) = stream.next().await { - body.extend_from_slice(&item?) - } - Ok(body) + let mut buf = BytesMut::new(); + + pin!(stream); + + poll_fn(|cx| loop { + let body = stream.as_mut(); + + match ready!(body.poll_next(cx)) { + Some(Ok(bytes)) => buf.extend_from_slice(&*bytes), + None => return Poll::Ready(Ok(())), + Some(Err(err)) => return Poll::Ready(Err(err)), + } + }) + .await?; + + Ok(buf) } fn tls_config() -> RustlsServerConfig { diff --git a/actix-multipart/src/server.rs b/actix-multipart/src/server.rs index 8eabcee10..239f7f905 100644 --- a/actix-multipart/src/server.rs +++ b/actix-multipart/src/server.rs @@ -1233,7 +1233,7 @@ mod tests { // and should not consume the payload match payload { - actix_web::dev::Payload::H1(_) => {} //expected + actix_web::dev::Payload::H1 { .. } => {} //expected _ => unreachable!(), } } diff --git a/awc/src/client/connection.rs b/awc/src/client/connection.rs index 0e1f0bfec..456f119aa 100644 --- a/awc/src/client/connection.rs +++ b/awc/src/client/connection.rs @@ -267,7 +267,9 @@ where Connection::Tls(ConnectionType::H2(conn)) => { h2proto::send_request(conn, head.into(), body).await } - _ => unreachable!("Plain Tcp connection can be used only in Http1 protocol"), + _ => { + unreachable!("Plain TCP connection can be used only with HTTP/1.1 protocol") + } } }) } diff --git a/awc/src/client/h1proto.rs b/awc/src/client/h1proto.rs index 1028a2178..cf716db72 100644 --- a/awc/src/client/h1proto.rs +++ b/awc/src/client/h1proto.rs @@ -13,16 +13,17 @@ use actix_http::{ Payload, RequestHeadType, ResponseHead, StatusCode, }; use actix_utils::future::poll_fn; -use bytes::buf::BufMut; -use bytes::{Bytes, BytesMut}; +use bytes::{buf::BufMut, Bytes, BytesMut}; use futures_core::{ready, Stream}; use futures_util::SinkExt as _; use pin_project_lite::pin_project; use crate::BoxError; -use super::connection::{ConnectionIo, H1Connection}; -use super::error::{ConnectError, SendRequestError}; +use super::{ + connection::{ConnectionIo, H1Connection}, + error::{ConnectError, SendRequestError}, +}; pub(crate) async fn send_request( io: H1Connection, @@ -123,7 +124,12 @@ where Ok((head, Payload::None)) } - _ => Ok((head, Payload::Stream(Box::pin(PlStream::new(framed))))), + _ => Ok(( + head, + Payload::Stream { + payload: Box::pin(PlStream::new(framed)), + }, + )), } } diff --git a/awc/src/response.rs b/awc/src/response.rs index fefebd0a0..78cc339b4 100644 --- a/awc/src/response.rs +++ b/awc/src/response.rs @@ -10,8 +10,8 @@ use std::{ }; use actix_http::{ - error::PayloadError, header, header::HeaderMap, Extensions, HttpMessage, Payload, - PayloadStream, ResponseHead, StatusCode, Version, + error::PayloadError, header, header::HeaderMap, BoxedPayloadStream, Extensions, + HttpMessage, Payload, ResponseHead, StatusCode, Version, }; use actix_rt::time::{sleep, Sleep}; use bytes::{Bytes, BytesMut}; @@ -23,7 +23,7 @@ use crate::cookie::{Cookie, ParseError as CookieParseError}; use crate::error::JsonPayloadError; /// Client Response -pub struct ClientResponse { +pub struct ClientResponse { pub(crate) head: ResponseHead, pub(crate) payload: Payload, pub(crate) timeout: ResponseTimeout, diff --git a/awc/src/sender.rs b/awc/src/sender.rs index f83a70a9b..29c814531 100644 --- a/awc/src/sender.rs +++ b/awc/src/sender.rs @@ -20,7 +20,7 @@ use futures_core::Stream; use serde::Serialize; #[cfg(feature = "__compress")] -use actix_http::{encoding::Decoder, header::ContentEncoding, Payload, PayloadStream}; +use actix_http::{encoding::Decoder, header::ContentEncoding, Payload}; use crate::{ any_body::AnyBody, @@ -91,7 +91,7 @@ impl SendClientRequest { #[cfg(feature = "__compress")] impl Future for SendClientRequest { - type Output = Result>>, SendRequestError>; + type Output = Result>, SendRequestError>; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = self.get_mut(); @@ -108,12 +108,13 @@ impl Future for SendClientRequest { res.into_client_response()._timeout(delay.take()).map_body( |head, payload| { if *response_decompress { - Payload::Stream(Decoder::from_headers(payload, &head.headers)) + Payload::Stream { + payload: Decoder::from_headers(payload, &head.headers), + } } else { - Payload::Stream(Decoder::new( - payload, - ContentEncoding::Identity, - )) + Payload::Stream { + payload: Decoder::new(payload, ContentEncoding::Identity), + } } }, ) diff --git a/awc/src/test.rs b/awc/src/test.rs index 1b41efc93..96ae1f0a1 100644 --- a/awc/src/test.rs +++ b/awc/src/test.rs @@ -65,7 +65,7 @@ impl TestResponse { /// Set response's payload pub fn set_payload>(mut self, data: B) -> Self { - let mut payload = h1::Payload::empty(); + let (_, mut payload) = h1::Payload::create(true); payload.unread_data(data.into()); self.payload = Some(payload.into()); self @@ -90,7 +90,8 @@ impl TestResponse { if let Some(pl) = self.payload { ClientResponse::new(head, pl) } else { - ClientResponse::new(head, h1::Payload::empty().into()) + let (_, payload) = h1::Payload::create(true); + ClientResponse::new(head, payload.into()) } } } diff --git a/src/dev.rs b/src/dev.rs index 23a40f292..6e1970467 100644 --- a/src/dev.rs +++ b/src/dev.rs @@ -14,7 +14,7 @@ pub use crate::types::form::UrlEncoded; pub use crate::types::json::JsonBody; pub use crate::types::readlines::Readlines; -pub use actix_http::{Extensions, Payload, PayloadStream, RequestHead, Response, ResponseHead}; +pub use actix_http::{Extensions, Payload, RequestHead, Response, ResponseHead}; pub use actix_router::{Path, ResourceDef, ResourcePath, Url}; pub use actix_server::{Server, ServerHandle}; pub use actix_service::{ diff --git a/src/response/builder.rs b/src/response/builder.rs index b500ab331..93d8ab567 100644 --- a/src/response/builder.rs +++ b/src/response/builder.rs @@ -429,9 +429,12 @@ mod tests { use actix_http::body; use super::*; - use crate::http::{ - header::{self, HeaderValue, CONTENT_TYPE}, - StatusCode, + use crate::{ + http::{ + header::{self, HeaderValue, CONTENT_TYPE}, + StatusCode, + }, + test::assert_body_eq, }; #[test] @@ -472,32 +475,23 @@ mod tests { #[actix_rt::test] async fn test_json() { - let resp = HttpResponse::Ok().json(vec!["v1", "v2", "v3"]); - let ct = resp.headers().get(CONTENT_TYPE).unwrap(); + let res = HttpResponse::Ok().json(vec!["v1", "v2", "v3"]); + let ct = res.headers().get(CONTENT_TYPE).unwrap(); assert_eq!(ct, HeaderValue::from_static("application/json")); - assert_eq!( - body::to_bytes(resp.into_body()).await.unwrap().as_ref(), - br#"["v1","v2","v3"]"# - ); + assert_body_eq!(res, br#"["v1","v2","v3"]"#); - let resp = HttpResponse::Ok().json(&["v1", "v2", "v3"]); - let ct = resp.headers().get(CONTENT_TYPE).unwrap(); + let res = HttpResponse::Ok().json(&["v1", "v2", "v3"]); + let ct = res.headers().get(CONTENT_TYPE).unwrap(); assert_eq!(ct, HeaderValue::from_static("application/json")); - assert_eq!( - body::to_bytes(resp.into_body()).await.unwrap().as_ref(), - br#"["v1","v2","v3"]"# - ); + assert_body_eq!(res, br#"["v1","v2","v3"]"#); // content type override - let resp = HttpResponse::Ok() + let res = HttpResponse::Ok() .insert_header((CONTENT_TYPE, "text/json")) .json(&vec!["v1", "v2", "v3"]); - let ct = resp.headers().get(CONTENT_TYPE).unwrap(); + let ct = res.headers().get(CONTENT_TYPE).unwrap(); assert_eq!(ct, HeaderValue::from_static("text/json")); - assert_eq!( - body::to_bytes(resp.into_body()).await.unwrap().as_ref(), - br#"["v1","v2","v3"]"# - ); + assert_body_eq!(res, br#"["v1","v2","v3"]"#); } #[actix_rt::test] diff --git a/src/service.rs b/src/service.rs index 9ccf5274d..d5c381fa4 100644 --- a/src/service.rs +++ b/src/service.rs @@ -7,7 +7,7 @@ use std::{ use actix_http::{ body::{BoxBody, EitherBody, MessageBody}, header::HeaderMap, - Extensions, HttpMessage, Method, Payload, PayloadStream, RequestHead, Response, + BoxedPayloadStream, Extensions, HttpMessage, Method, Payload, RequestHead, Response, ResponseHead, StatusCode, Uri, Version, }; use actix_router::{IntoPatterns, Path, Patterns, Resource, ResourceDef, Url}; @@ -293,7 +293,7 @@ impl Resource for ServiceRequest { } impl HttpMessage for ServiceRequest { - type Stream = PayloadStream; + type Stream = BoxedPayloadStream; #[inline] /// Returns Request's headers. diff --git a/src/test/test_request.rs b/src/test/test_request.rs index fd3355ef3..5c4de9084 100644 --- a/src/test/test_request.rs +++ b/src/test/test_request.rs @@ -174,25 +174,28 @@ impl TestRequest { } /// Set request payload. - pub fn set_payload>(mut self, data: B) -> Self { + pub fn set_payload(mut self, data: impl Into) -> Self { self.req.set_payload(data); self } - /// Serialize `data` to a URL encoded form and set it as the request payload. The `Content-Type` - /// header is set to `application/x-www-form-urlencoded`. - pub fn set_form(mut self, data: &T) -> Self { - let bytes = serde_urlencoded::to_string(data) + /// Serialize `data` to a URL encoded form and set it as the request payload. + /// + /// The `Content-Type` header is set to `application/x-www-form-urlencoded`. + pub fn set_form(mut self, data: impl Serialize) -> Self { + let bytes = serde_urlencoded::to_string(&data) .expect("Failed to serialize test data as a urlencoded form"); self.req.set_payload(bytes); self.req.insert_header(ContentType::form_url_encoded()); self } - /// Serialize `data` to JSON and set it as the request payload. The `Content-Type` header is - /// set to `application/json`. - pub fn set_json(mut self, data: &T) -> Self { - let bytes = serde_json::to_string(data).expect("Failed to serialize test data to json"); + /// Serialize `data` to JSON and set it as the request payload. + /// + /// The `Content-Type` header is set to `application/json`. + pub fn set_json(mut self, data: impl Serialize) -> Self { + let bytes = + serde_json::to_string(&data).expect("Failed to serialize test data to json"); self.req.set_payload(bytes); self.req.insert_header(ContentType::json()); self From d2590fd46cbab9cf96b3e6864430f675f4512835 Mon Sep 17 00:00:00 2001 From: Rob Ede Date: Sat, 25 Dec 2021 02:33:37 +0000 Subject: [PATCH 2/4] `ClientRequest::send_body` takes `impl MessageBody` (#2546) --- .github/workflows/ci-master.yml | 66 ++++ .github/workflows/ci.yml | 62 ---- awc/CHANGES.md | 6 + awc/src/any_body.rs | 19 +- awc/src/connect.rs | 2 +- awc/src/frozen.rs | 6 +- awc/src/lib.rs | 5 +- awc/src/middleware/redirect.rs | 4 +- awc/src/request.rs | 74 ++-- awc/src/response.rs | 556 ----------------------------- awc/src/responses/json_body.rs | 192 ++++++++++ awc/src/responses/mod.rs | 49 +++ awc/src/responses/read_body.rs | 61 ++++ awc/src/responses/response.rs | 257 +++++++++++++ awc/src/responses/response_body.rs | 144 ++++++++ awc/src/sender.rs | 22 +- awc/src/ws.rs | 3 +- src/guard.rs | 12 +- 18 files changed, 853 insertions(+), 687 deletions(-) create mode 100644 .github/workflows/ci-master.yml delete mode 100644 awc/src/response.rs create mode 100644 awc/src/responses/json_body.rs create mode 100644 awc/src/responses/mod.rs create mode 100644 awc/src/responses/read_body.rs create mode 100644 awc/src/responses/response.rs create mode 100644 awc/src/responses/response_body.rs diff --git a/.github/workflows/ci-master.yml b/.github/workflows/ci-master.yml new file mode 100644 index 000000000..548ec21b7 --- /dev/null +++ b/.github/workflows/ci-master.yml @@ -0,0 +1,66 @@ +name: CI (master only) + +on: + push: + branches: [master] + +jobs: + ci_feature_powerset_check: + name: Verify Feature Combinations + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + + - name: Install stable + uses: actions-rs/toolchain@v1 + with: + toolchain: stable-x86_64-unknown-linux-gnu + profile: minimal + override: true + + - name: Generate Cargo.lock + uses: actions-rs/cargo@v1 + with: { command: generate-lockfile } + - name: Cache Dependencies + uses: Swatinem/rust-cache@v1.2.0 + + - name: Install cargo-hack + uses: actions-rs/cargo@v1 + with: + command: install + args: cargo-hack + + - name: check feature combinations + uses: actions-rs/cargo@v1 + with: { command: ci-check-all-feature-powerset } + + - name: check feature combinations + uses: actions-rs/cargo@v1 + with: { command: ci-check-all-feature-powerset-linux } + + coverage: + name: coverage + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + + - name: Install stable + uses: actions-rs/toolchain@v1 + with: + toolchain: stable-x86_64-unknown-linux-gnu + profile: minimal + override: true + + - name: Generate Cargo.lock + uses: actions-rs/cargo@v1 + with: { command: generate-lockfile } + - name: Cache Dependencies + uses: Swatinem/rust-cache@v1.2.0 + + - name: Generate coverage file + run: | + cargo install cargo-tarpaulin --vers "^0.13" + cargo tarpaulin --workspace --features=rustls,openssl --out Xml --verbose + - name: Upload to Codecov + uses: codecov/codecov-action@v1 + with: { file: cobertura.xml } diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index d9b98a7b8..fe464bf27 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -96,68 +96,6 @@ jobs: cargo install cargo-cache --version 0.6.3 --no-default-features --features ci-autoclean cargo-cache - ci_feature_powerset_check: - name: Verify Feature Combinations - runs-on: ubuntu-latest - steps: - - uses: actions/checkout@v2 - - - name: Install stable - uses: actions-rs/toolchain@v1 - with: - toolchain: stable-x86_64-unknown-linux-gnu - profile: minimal - override: true - - - name: Generate Cargo.lock - uses: actions-rs/cargo@v1 - with: { command: generate-lockfile } - - name: Cache Dependencies - uses: Swatinem/rust-cache@v1.2.0 - - - name: Install cargo-hack - uses: actions-rs/cargo@v1 - with: - command: install - args: cargo-hack - - - name: check feature combinations - uses: actions-rs/cargo@v1 - with: { command: ci-check-all-feature-powerset } - - - name: check feature combinations - uses: actions-rs/cargo@v1 - with: { command: ci-check-all-feature-powerset-linux } - - coverage: - name: coverage - runs-on: ubuntu-latest - steps: - - uses: actions/checkout@v2 - - - name: Install stable - uses: actions-rs/toolchain@v1 - with: - toolchain: stable-x86_64-unknown-linux-gnu - profile: minimal - override: true - - - name: Generate Cargo.lock - uses: actions-rs/cargo@v1 - with: { command: generate-lockfile } - - name: Cache Dependencies - uses: Swatinem/rust-cache@v1.2.0 - - - name: Generate coverage file - if: github.ref == 'refs/heads/master' - run: | - cargo install cargo-tarpaulin --vers "^0.13" - cargo tarpaulin --workspace --features=rustls,openssl --out Xml --verbose - - name: Upload to Codecov - if: github.ref == 'refs/heads/master' - uses: codecov/codecov-action@v1 - with: { file: cobertura.xml } - rustdoc: name: doc tests runs-on: ubuntu-latest diff --git a/awc/CHANGES.md b/awc/CHANGES.md index b5144b7a2..e1a059481 100644 --- a/awc/CHANGES.md +++ b/awc/CHANGES.md @@ -3,8 +3,14 @@ ## Unreleased - 2021-xx-xx - Rename `Connector::{ssl => openssl}`. [#2503] - Improve `Client` instantiation efficiency when using `openssl` by only building connectors once. [#2503] +- `ClientRequest::send_body` now takes an `impl MessageBody`. [#2546] +- Rename `MessageBody => ResponseBody` to avoid conflicts with `MessageBody` trait. [#2546] +- `impl Future` for `ResponseBody` no longer requires the body type be `Unpin`. [#2546] +- `impl Future` for `JsonBody` no longer requires the body type be `Unpin`. [#2546] +- `impl Stream` for `ClientResponse` no longer requires the body type be `Unpin`. [#2546] [#2503]: https://github.com/actix/actix-web/pull/2503 +[#2546]: https://github.com/actix/actix-web/pull/2546 ## 3.0.0-beta.14 - 2021-12-17 diff --git a/awc/src/any_body.rs b/awc/src/any_body.rs index 2ffeb5074..437216313 100644 --- a/awc/src/any_body.rs +++ b/awc/src/any_body.rs @@ -77,10 +77,27 @@ impl AnyBody where B: MessageBody + 'static, { + /// Converts a [`MessageBody`] type into the best possible representation. + /// + /// Checks size for `None` and tries to convert to `Bytes`. Otherwise, uses the `Body` variant. + pub fn from_message_body(body: B) -> Self + where + B: MessageBody, + { + if matches!(body.size(), BodySize::None) { + return Self::None; + } + + match body.try_into_bytes() { + Ok(body) => Self::Bytes { body }, + Err(body) => Self::new(body), + } + } + pub fn into_boxed(self) -> AnyBody { match self { Self::None => AnyBody::None, - Self::Bytes { body: bytes } => AnyBody::Bytes { body: bytes }, + Self::Bytes { body } => AnyBody::Bytes { body }, Self::Body { body } => AnyBody::new_boxed(body), } } diff --git a/awc/src/connect.rs b/awc/src/connect.rs index 19870b069..f93014a67 100644 --- a/awc/src/connect.rs +++ b/awc/src/connect.rs @@ -16,7 +16,7 @@ use crate::{ client::{ Connect as ClientConnect, ConnectError, Connection, ConnectionIo, SendRequestError, }, - response::ClientResponse, + ClientResponse, }; pub type BoxConnectorService = Rc< diff --git a/awc/src/frozen.rs b/awc/src/frozen.rs index cd93a1d60..b98d8d5e1 100644 --- a/awc/src/frozen.rs +++ b/awc/src/frozen.rs @@ -5,13 +5,13 @@ use futures_core::Stream; use serde::Serialize; use actix_http::{ + body::MessageBody, error::HttpError, header::{HeaderMap, HeaderName, TryIntoHeaderValue}, Method, RequestHead, Uri, }; use crate::{ - any_body::AnyBody, sender::{RequestSender, SendClientRequest}, BoxError, ClientConfig, }; @@ -46,7 +46,7 @@ impl FrozenClientRequest { /// Send a body. pub fn send_body(&self, body: B) -> SendClientRequest where - B: Into, + B: MessageBody + 'static, { RequestSender::Rc(self.head.clone(), None).send_body( self.addr, @@ -159,7 +159,7 @@ impl FrozenSendBuilder { /// Complete request construction and send a body. pub fn send_body(self, body: B) -> SendClientRequest where - B: Into, + B: MessageBody + 'static, { if let Some(e) = self.err { return e.into(); diff --git a/awc/src/lib.rs b/awc/src/lib.rs index 00c559406..cef8e03dc 100644 --- a/awc/src/lib.rs +++ b/awc/src/lib.rs @@ -113,7 +113,7 @@ pub mod error; mod frozen; pub mod middleware; mod request; -mod response; +mod responses; mod sender; pub mod test; pub mod ws; @@ -128,7 +128,8 @@ pub use self::client::Connector; pub use self::connect::{BoxConnectorService, BoxedSocket, ConnectRequest, ConnectResponse}; pub use self::frozen::{FrozenClientRequest, FrozenSendBuilder}; pub use self::request::ClientRequest; -pub use self::response::{ClientResponse, JsonBody, MessageBody}; +#[allow(deprecated)] +pub use self::responses::{ClientResponse, JsonBody, MessageBody, ResponseBody}; pub use self::sender::SendClientRequest; use std::{convert::TryFrom, rc::Rc, time::Duration}; diff --git a/awc/src/middleware/redirect.rs b/awc/src/middleware/redirect.rs index 704d2d79d..0ee969eee 100644 --- a/awc/src/middleware/redirect.rs +++ b/awc/src/middleware/redirect.rs @@ -190,9 +190,7 @@ where let body_new = if is_redirect { // try to reuse body match body { - Some(ref bytes) => AnyBody::Bytes { - body: bytes.clone(), - }, + Some(ref bytes) => AnyBody::from(bytes.clone()), // TODO: should this be AnyBody::Empty or AnyBody::None. _ => AnyBody::empty(), } diff --git a/awc/src/request.rs b/awc/src/request.rs index 9e37b2755..3eb76e3f6 100644 --- a/awc/src/request.rs +++ b/awc/src/request.rs @@ -5,13 +5,13 @@ use futures_core::Stream; use serde::Serialize; use actix_http::{ + body::MessageBody, error::HttpError, header::{self, HeaderMap, HeaderValue, TryIntoHeaderPair}, ConnectionType, Method, RequestHead, Uri, Version, }; use crate::{ - any_body::AnyBody, error::{FreezeRequestError, InvalidUrl}, frozen::FrozenClientRequest, sender::{PrepForSendingError, RequestSender, SendClientRequest}, @@ -26,20 +26,20 @@ use crate::cookie::{Cookie, CookieJar}; /// This type can be used to construct an instance of `ClientRequest` through a /// builder-like pattern. /// -/// ``` -/// #[actix_rt::main] -/// async fn main() { -/// let response = awc::Client::new() -/// .get("http://www.rust-lang.org") // <- Create request builder -/// .insert_header(("User-Agent", "Actix-web")) -/// .send() // <- Send HTTP request -/// .await; +/// ```no_run +/// # #[actix_rt::main] +/// # async fn main() { +/// let response = awc::Client::new() +/// .get("http://www.rust-lang.org") // <- Create request builder +/// .insert_header(("User-Agent", "Actix-web")) +/// .send() // <- Send HTTP request +/// .await; /// -/// response.and_then(|response| { // <- server HTTP response -/// println!("Response: {:?}", response); -/// Ok(()) -/// }); -/// } +/// response.and_then(|response| { // <- server HTTP response +/// println!("Response: {:?}", response); +/// Ok(()) +/// }); +/// # } /// ``` pub struct ClientRequest { pub(crate) head: RequestHead, @@ -174,17 +174,13 @@ impl ClientRequest { /// Append a header, keeping any that were set with an equivalent field name. /// - /// ``` - /// # #[actix_rt::main] - /// # async fn main() { - /// # use awc::Client; - /// use awc::http::header::CONTENT_TYPE; + /// ```no_run + /// use awc::{http::header, Client}; /// /// Client::new() /// .get("http://www.rust-lang.org") /// .insert_header(("X-TEST", "value")) - /// .insert_header((CONTENT_TYPE, mime::APPLICATION_JSON)); - /// # } + /// .insert_header((header::CONTENT_TYPE, mime::APPLICATION_JSON)); /// ``` pub fn append_header(mut self, header: impl TryIntoHeaderPair) -> Self { match header.try_into_pair() { @@ -252,23 +248,25 @@ impl ClientRequest { /// Set a cookie /// - /// ``` - /// #[actix_rt::main] - /// async fn main() { - /// let resp = awc::Client::new().get("https://www.rust-lang.org") - /// .cookie( - /// awc::cookie::Cookie::build("name", "value") - /// .domain("www.rust-lang.org") - /// .path("/") - /// .secure(true) - /// .http_only(true) - /// .finish(), - /// ) - /// .send() - /// .await; + /// ```no_run + /// use awc::{cookie, Client}; /// - /// println!("Response: {:?}", resp); - /// } + /// # #[actix_rt::main] + /// # async fn main() { + /// let resp = Client::new().get("https://www.rust-lang.org") + /// .cookie( + /// awc::cookie::Cookie::build("name", "value") + /// .domain("www.rust-lang.org") + /// .path("/") + /// .secure(true) + /// .http_only(true) + /// .finish(), + /// ) + /// .send() + /// .await; + /// + /// println!("Response: {:?}", resp); + /// # } /// ``` #[cfg(feature = "cookies")] pub fn cookie(mut self, cookie: Cookie<'_>) -> Self { @@ -340,7 +338,7 @@ impl ClientRequest { /// Complete request construction and send body. pub fn send_body(self, body: B) -> SendClientRequest where - B: Into, + B: MessageBody + 'static, { let slf = match self.prep_for_sending() { Ok(slf) => slf, diff --git a/awc/src/response.rs b/awc/src/response.rs deleted file mode 100644 index 78cc339b4..000000000 --- a/awc/src/response.rs +++ /dev/null @@ -1,556 +0,0 @@ -use std::{ - cell::{Ref, RefMut}, - fmt, - future::Future, - io, - marker::PhantomData, - pin::Pin, - task::{Context, Poll}, - time::{Duration, Instant}, -}; - -use actix_http::{ - error::PayloadError, header, header::HeaderMap, BoxedPayloadStream, Extensions, - HttpMessage, Payload, ResponseHead, StatusCode, Version, -}; -use actix_rt::time::{sleep, Sleep}; -use bytes::{Bytes, BytesMut}; -use futures_core::{ready, Stream}; -use serde::de::DeserializeOwned; - -#[cfg(feature = "cookies")] -use crate::cookie::{Cookie, ParseError as CookieParseError}; -use crate::error::JsonPayloadError; - -/// Client Response -pub struct ClientResponse { - pub(crate) head: ResponseHead, - pub(crate) payload: Payload, - pub(crate) timeout: ResponseTimeout, -} - -/// helper enum with reusable sleep passed from `SendClientResponse`. -/// See `ClientResponse::_timeout` for reason. -pub(crate) enum ResponseTimeout { - Disabled(Option>>), - Enabled(Pin>), -} - -impl Default for ResponseTimeout { - fn default() -> Self { - Self::Disabled(None) - } -} - -impl ResponseTimeout { - fn poll_timeout(&mut self, cx: &mut Context<'_>) -> Result<(), PayloadError> { - match *self { - Self::Enabled(ref mut timeout) => { - if timeout.as_mut().poll(cx).is_ready() { - Err(PayloadError::Io(io::Error::new( - io::ErrorKind::TimedOut, - "Response Payload IO timed out", - ))) - } else { - Ok(()) - } - } - Self::Disabled(_) => Ok(()), - } - } -} - -impl HttpMessage for ClientResponse { - type Stream = S; - - fn headers(&self) -> &HeaderMap { - &self.head.headers - } - - fn take_payload(&mut self) -> Payload { - std::mem::replace(&mut self.payload, Payload::None) - } - - fn extensions(&self) -> Ref<'_, Extensions> { - self.head.extensions() - } - - fn extensions_mut(&self) -> RefMut<'_, Extensions> { - self.head.extensions_mut() - } -} - -impl ClientResponse { - /// Create new Request instance - pub(crate) fn new(head: ResponseHead, payload: Payload) -> Self { - ClientResponse { - head, - payload, - timeout: ResponseTimeout::default(), - } - } - - #[inline] - pub(crate) fn head(&self) -> &ResponseHead { - &self.head - } - - /// Read the Request Version. - #[inline] - pub fn version(&self) -> Version { - self.head().version - } - - /// Get the status from the server. - #[inline] - pub fn status(&self) -> StatusCode { - self.head().status - } - - #[inline] - /// Returns request's headers. - pub fn headers(&self) -> &HeaderMap { - &self.head().headers - } - - /// Set a body and return previous body value - pub fn map_body(mut self, f: F) -> ClientResponse - where - F: FnOnce(&mut ResponseHead, Payload) -> Payload, - { - let payload = f(&mut self.head, self.payload); - - ClientResponse { - payload, - head: self.head, - timeout: self.timeout, - } - } - - /// Set a timeout duration for [`ClientResponse`](self::ClientResponse). - /// - /// This duration covers the duration of processing the response body stream - /// and would end it as timeout error when deadline met. - /// - /// Disabled by default. - pub fn timeout(self, dur: Duration) -> Self { - let timeout = match self.timeout { - ResponseTimeout::Disabled(Some(mut timeout)) - | ResponseTimeout::Enabled(mut timeout) => match Instant::now().checked_add(dur) { - Some(deadline) => { - timeout.as_mut().reset(deadline.into()); - ResponseTimeout::Enabled(timeout) - } - None => ResponseTimeout::Enabled(Box::pin(sleep(dur))), - }, - _ => ResponseTimeout::Enabled(Box::pin(sleep(dur))), - }; - - Self { - payload: self.payload, - head: self.head, - timeout, - } - } - - /// This method does not enable timeout. It's used to pass the boxed `Sleep` from - /// `SendClientRequest` and reuse it's heap allocation together with it's slot in - /// timer wheel. - pub(crate) fn _timeout(mut self, timeout: Option>>) -> Self { - self.timeout = ResponseTimeout::Disabled(timeout); - self - } - - /// Load request cookies. - #[cfg(feature = "cookies")] - pub fn cookies(&self) -> Result>>, CookieParseError> { - struct Cookies(Vec>); - - if self.extensions().get::().is_none() { - let mut cookies = Vec::new(); - for hdr in self.headers().get_all(&header::SET_COOKIE) { - let s = std::str::from_utf8(hdr.as_bytes()).map_err(CookieParseError::from)?; - cookies.push(Cookie::parse_encoded(s)?.into_owned()); - } - self.extensions_mut().insert(Cookies(cookies)); - } - Ok(Ref::map(self.extensions(), |ext| { - &ext.get::().unwrap().0 - })) - } - - /// Return request cookie. - #[cfg(feature = "cookies")] - pub fn cookie(&self, name: &str) -> Option> { - if let Ok(cookies) = self.cookies() { - for cookie in cookies.iter() { - if cookie.name() == name { - return Some(cookie.to_owned()); - } - } - } - None - } -} - -impl ClientResponse -where - S: Stream>, -{ - /// Loads HTTP response's body. - pub fn body(&mut self) -> MessageBody { - MessageBody::new(self) - } - - /// Loads and parse `application/json` encoded body. - /// Return `JsonBody` future. It resolves to a `T` value. - /// - /// Returns error: - /// - /// * content type is not `application/json` - /// * content length is greater than 256k - pub fn json(&mut self) -> JsonBody { - JsonBody::new(self) - } -} - -impl Stream for ClientResponse -where - S: Stream> + Unpin, -{ - type Item = Result; - - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let this = self.get_mut(); - this.timeout.poll_timeout(cx)?; - - Pin::new(&mut this.payload).poll_next(cx) - } -} - -impl fmt::Debug for ClientResponse { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - writeln!(f, "\nClientResponse {:?} {}", self.version(), self.status(),)?; - writeln!(f, " headers:")?; - for (key, val) in self.headers().iter() { - writeln!(f, " {:?}: {:?}", key, val)?; - } - Ok(()) - } -} - -const DEFAULT_BODY_LIMIT: usize = 2 * 1024 * 1024; - -/// Future that resolves to a complete HTTP message body. -pub struct MessageBody { - length: Option, - timeout: ResponseTimeout, - body: Result, Option>, -} - -impl MessageBody -where - S: Stream>, -{ - /// Create `MessageBody` for request. - pub fn new(res: &mut ClientResponse) -> MessageBody { - let length = match res.headers().get(&header::CONTENT_LENGTH) { - Some(value) => { - let len = value.to_str().ok().and_then(|s| s.parse::().ok()); - - match len { - None => return Self::err(PayloadError::UnknownLength), - len => len, - } - } - None => None, - }; - - MessageBody { - length, - timeout: std::mem::take(&mut res.timeout), - body: Ok(ReadBody::new(res.take_payload(), DEFAULT_BODY_LIMIT)), - } - } - - /// Change max size of payload. By default max size is 2048kB - pub fn limit(mut self, limit: usize) -> Self { - if let Ok(ref mut body) = self.body { - body.limit = limit; - } - self - } - - fn err(e: PayloadError) -> Self { - MessageBody { - length: None, - timeout: ResponseTimeout::default(), - body: Err(Some(e)), - } - } -} - -impl Future for MessageBody -where - S: Stream> + Unpin, -{ - type Output = Result; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.get_mut(); - - match this.body { - Err(ref mut err) => Poll::Ready(Err(err.take().unwrap())), - Ok(ref mut body) => { - if let Some(len) = this.length.take() { - if len > body.limit { - return Poll::Ready(Err(PayloadError::Overflow)); - } - } - - this.timeout.poll_timeout(cx)?; - - Pin::new(body).poll(cx) - } - } - } -} - -/// Response's payload json parser, it resolves to a deserialized `T` value. -/// -/// Returns error: -/// -/// * content type is not `application/json` -/// * content length is greater than 64k -pub struct JsonBody { - length: Option, - err: Option, - timeout: ResponseTimeout, - fut: Option>, - _phantom: PhantomData, -} - -impl JsonBody -where - S: Stream>, - U: DeserializeOwned, -{ - /// Create `JsonBody` for request. - pub fn new(res: &mut ClientResponse) -> Self { - // check content-type - let json = if let Ok(Some(mime)) = res.mime_type() { - mime.subtype() == mime::JSON || mime.suffix() == Some(mime::JSON) - } else { - false - }; - if !json { - return JsonBody { - length: None, - fut: None, - timeout: ResponseTimeout::default(), - err: Some(JsonPayloadError::ContentType), - _phantom: PhantomData, - }; - } - - let mut len = None; - - if let Some(l) = res.headers().get(&header::CONTENT_LENGTH) { - if let Ok(s) = l.to_str() { - if let Ok(l) = s.parse::() { - len = Some(l) - } - } - } - - JsonBody { - length: len, - err: None, - timeout: std::mem::take(&mut res.timeout), - fut: Some(ReadBody::new(res.take_payload(), 65536)), - _phantom: PhantomData, - } - } - - /// Change max size of payload. By default max size is 64kB - pub fn limit(mut self, limit: usize) -> Self { - if let Some(ref mut fut) = self.fut { - fut.limit = limit; - } - self - } -} - -impl Unpin for JsonBody -where - T: Stream> + Unpin, - U: DeserializeOwned, -{ -} - -impl Future for JsonBody -where - T: Stream> + Unpin, - U: DeserializeOwned, -{ - type Output = Result; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - if let Some(err) = self.err.take() { - return Poll::Ready(Err(err)); - } - - if let Some(len) = self.length.take() { - if len > self.fut.as_ref().unwrap().limit { - return Poll::Ready(Err(JsonPayloadError::Payload(PayloadError::Overflow))); - } - } - - self.timeout - .poll_timeout(cx) - .map_err(JsonPayloadError::Payload)?; - - let body = ready!(Pin::new(&mut self.get_mut().fut.as_mut().unwrap()).poll(cx))?; - Poll::Ready(serde_json::from_slice::(&body).map_err(JsonPayloadError::from)) - } -} - -struct ReadBody { - stream: Payload, - buf: BytesMut, - limit: usize, -} - -impl ReadBody { - fn new(stream: Payload, limit: usize) -> Self { - Self { - stream, - buf: BytesMut::new(), - limit, - } - } -} - -impl Future for ReadBody -where - S: Stream> + Unpin, -{ - type Output = Result; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.get_mut(); - - while let Some(chunk) = ready!(Pin::new(&mut this.stream).poll_next(cx)?) { - if (this.buf.len() + chunk.len()) > this.limit { - return Poll::Ready(Err(PayloadError::Overflow)); - } - this.buf.extend_from_slice(&chunk); - } - - Poll::Ready(Ok(this.buf.split().freeze())) - } -} - -#[cfg(test)] -mod tests { - use super::*; - use serde::{Deserialize, Serialize}; - - use crate::{http::header, test::TestResponse}; - - #[actix_rt::test] - async fn test_body() { - let mut req = TestResponse::with_header((header::CONTENT_LENGTH, "xxxx")).finish(); - match req.body().await.err().unwrap() { - PayloadError::UnknownLength => {} - _ => unreachable!("error"), - } - - let mut req = TestResponse::with_header((header::CONTENT_LENGTH, "10000000")).finish(); - match req.body().await.err().unwrap() { - PayloadError::Overflow => {} - _ => unreachable!("error"), - } - - let mut req = TestResponse::default() - .set_payload(Bytes::from_static(b"test")) - .finish(); - assert_eq!(req.body().await.ok().unwrap(), Bytes::from_static(b"test")); - - let mut req = TestResponse::default() - .set_payload(Bytes::from_static(b"11111111111111")) - .finish(); - match req.body().limit(5).await.err().unwrap() { - PayloadError::Overflow => {} - _ => unreachable!("error"), - } - } - - #[derive(Serialize, Deserialize, PartialEq, Debug)] - struct MyObject { - name: String, - } - - fn json_eq(err: JsonPayloadError, other: JsonPayloadError) -> bool { - match err { - JsonPayloadError::Payload(PayloadError::Overflow) => { - matches!(other, JsonPayloadError::Payload(PayloadError::Overflow)) - } - JsonPayloadError::ContentType => matches!(other, JsonPayloadError::ContentType), - _ => false, - } - } - - #[actix_rt::test] - async fn test_json_body() { - let mut req = TestResponse::default().finish(); - let json = JsonBody::<_, MyObject>::new(&mut req).await; - assert!(json_eq(json.err().unwrap(), JsonPayloadError::ContentType)); - - let mut req = TestResponse::default() - .insert_header(( - header::CONTENT_TYPE, - header::HeaderValue::from_static("application/text"), - )) - .finish(); - let json = JsonBody::<_, MyObject>::new(&mut req).await; - assert!(json_eq(json.err().unwrap(), JsonPayloadError::ContentType)); - - let mut req = TestResponse::default() - .insert_header(( - header::CONTENT_TYPE, - header::HeaderValue::from_static("application/json"), - )) - .insert_header(( - header::CONTENT_LENGTH, - header::HeaderValue::from_static("10000"), - )) - .finish(); - - let json = JsonBody::<_, MyObject>::new(&mut req).limit(100).await; - assert!(json_eq( - json.err().unwrap(), - JsonPayloadError::Payload(PayloadError::Overflow) - )); - - let mut req = TestResponse::default() - .insert_header(( - header::CONTENT_TYPE, - header::HeaderValue::from_static("application/json"), - )) - .insert_header(( - header::CONTENT_LENGTH, - header::HeaderValue::from_static("16"), - )) - .set_payload(Bytes::from_static(b"{\"name\": \"test\"}")) - .finish(); - - let json = JsonBody::<_, MyObject>::new(&mut req).await; - assert_eq!( - json.ok().unwrap(), - MyObject { - name: "test".to_owned() - } - ); - } -} diff --git a/awc/src/responses/json_body.rs b/awc/src/responses/json_body.rs new file mode 100644 index 000000000..3912324b6 --- /dev/null +++ b/awc/src/responses/json_body.rs @@ -0,0 +1,192 @@ +use std::{ + future::Future, + marker::PhantomData, + mem, + pin::Pin, + task::{Context, Poll}, +}; + +use actix_http::{error::PayloadError, header, HttpMessage}; +use bytes::Bytes; +use futures_core::{ready, Stream}; +use pin_project_lite::pin_project; +use serde::de::DeserializeOwned; + +use super::{read_body::ReadBody, ResponseTimeout, DEFAULT_BODY_LIMIT}; +use crate::{error::JsonPayloadError, ClientResponse}; + +pin_project! { + /// A `Future` that reads a body stream, parses JSON, resolving to a deserialized `T`. + /// + /// # Errors + /// `Future` implementation returns error if: + /// - content type is not `application/json`; + /// - content length is greater than [limit](JsonBody::limit) (default: 2 MiB). + pub struct JsonBody { + #[pin] + body: Option>, + length: Option, + timeout: ResponseTimeout, + err: Option, + _phantom: PhantomData, + } +} + +impl JsonBody +where + S: Stream>, + T: DeserializeOwned, +{ + /// Creates a JSON body stream reader from a response by taking its payload. + pub fn new(res: &mut ClientResponse) -> Self { + // check content-type + let json = if let Ok(Some(mime)) = res.mime_type() { + mime.subtype() == mime::JSON || mime.suffix() == Some(mime::JSON) + } else { + false + }; + + if !json { + return JsonBody { + length: None, + body: None, + timeout: ResponseTimeout::default(), + err: Some(JsonPayloadError::ContentType), + _phantom: PhantomData, + }; + } + + let length = res + .headers() + .get(&header::CONTENT_LENGTH) + .and_then(|len_hdr| len_hdr.to_str().ok()) + .and_then(|len_str| len_str.parse::().ok()); + + JsonBody { + body: Some(ReadBody::new(res.take_payload(), DEFAULT_BODY_LIMIT)), + length, + timeout: mem::take(&mut res.timeout), + err: None, + _phantom: PhantomData, + } + } + + /// Change max size of payload. Default limit is 2 MiB. + pub fn limit(mut self, limit: usize) -> Self { + if let Some(ref mut fut) = self.body { + fut.limit = limit; + } + + self + } +} + +impl Future for JsonBody +where + S: Stream>, + T: DeserializeOwned, +{ + type Output = Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + + if let Some(err) = this.err.take() { + return Poll::Ready(Err(err)); + } + + if let Some(len) = this.length.take() { + let body = Option::as_ref(&this.body).unwrap(); + if len > body.limit { + return Poll::Ready(Err(JsonPayloadError::Payload(PayloadError::Overflow))); + } + } + + this.timeout + .poll_timeout(cx) + .map_err(JsonPayloadError::Payload)?; + + let body = ready!(this.body.as_pin_mut().unwrap().poll(cx))?; + Poll::Ready(serde_json::from_slice::(&body).map_err(JsonPayloadError::from)) + } +} + +#[cfg(test)] +mod tests { + use actix_http::BoxedPayloadStream; + use serde::{Deserialize, Serialize}; + use static_assertions::assert_impl_all; + + use super::*; + use crate::{http::header, test::TestResponse}; + + assert_impl_all!(JsonBody: Unpin); + + #[derive(Serialize, Deserialize, PartialEq, Debug)] + struct MyObject { + name: String, + } + + fn json_eq(err: JsonPayloadError, other: JsonPayloadError) -> bool { + match err { + JsonPayloadError::Payload(PayloadError::Overflow) => { + matches!(other, JsonPayloadError::Payload(PayloadError::Overflow)) + } + JsonPayloadError::ContentType => matches!(other, JsonPayloadError::ContentType), + _ => false, + } + } + + #[actix_rt::test] + async fn read_json_body() { + let mut req = TestResponse::default().finish(); + let json = JsonBody::<_, MyObject>::new(&mut req).await; + assert!(json_eq(json.err().unwrap(), JsonPayloadError::ContentType)); + + let mut req = TestResponse::default() + .insert_header(( + header::CONTENT_TYPE, + header::HeaderValue::from_static("application/text"), + )) + .finish(); + let json = JsonBody::<_, MyObject>::new(&mut req).await; + assert!(json_eq(json.err().unwrap(), JsonPayloadError::ContentType)); + + let mut req = TestResponse::default() + .insert_header(( + header::CONTENT_TYPE, + header::HeaderValue::from_static("application/json"), + )) + .insert_header(( + header::CONTENT_LENGTH, + header::HeaderValue::from_static("10000"), + )) + .finish(); + + let json = JsonBody::<_, MyObject>::new(&mut req).limit(100).await; + assert!(json_eq( + json.err().unwrap(), + JsonPayloadError::Payload(PayloadError::Overflow) + )); + + let mut req = TestResponse::default() + .insert_header(( + header::CONTENT_TYPE, + header::HeaderValue::from_static("application/json"), + )) + .insert_header(( + header::CONTENT_LENGTH, + header::HeaderValue::from_static("16"), + )) + .set_payload(Bytes::from_static(b"{\"name\": \"test\"}")) + .finish(); + + let json = JsonBody::<_, MyObject>::new(&mut req).await; + assert_eq!( + json.ok().unwrap(), + MyObject { + name: "test".to_owned() + } + ); + } +} diff --git a/awc/src/responses/mod.rs b/awc/src/responses/mod.rs new file mode 100644 index 000000000..588ce014c --- /dev/null +++ b/awc/src/responses/mod.rs @@ -0,0 +1,49 @@ +use std::{future::Future, io, pin::Pin, task::Context}; + +use actix_http::error::PayloadError; +use actix_rt::time::Sleep; + +mod json_body; +mod read_body; +mod response; +mod response_body; + +pub use self::json_body::JsonBody; +pub use self::response::ClientResponse; +#[allow(deprecated)] +pub use self::response_body::{MessageBody, ResponseBody}; + +/// Default body size limit: 2 MiB +const DEFAULT_BODY_LIMIT: usize = 2 * 1024 * 1024; + +/// Helper enum with reusable sleep passed from `SendClientResponse`. +/// +/// See [`ClientResponse::_timeout`] for reason. +pub(crate) enum ResponseTimeout { + Disabled(Option>>), + Enabled(Pin>), +} + +impl Default for ResponseTimeout { + fn default() -> Self { + Self::Disabled(None) + } +} + +impl ResponseTimeout { + fn poll_timeout(&mut self, cx: &mut Context<'_>) -> Result<(), PayloadError> { + match *self { + Self::Enabled(ref mut timeout) => { + if timeout.as_mut().poll(cx).is_ready() { + Err(PayloadError::Io(io::Error::new( + io::ErrorKind::TimedOut, + "Response Payload IO timed out", + ))) + } else { + Ok(()) + } + } + Self::Disabled(_) => Ok(()), + } + } +} diff --git a/awc/src/responses/read_body.rs b/awc/src/responses/read_body.rs new file mode 100644 index 000000000..a32bbb984 --- /dev/null +++ b/awc/src/responses/read_body.rs @@ -0,0 +1,61 @@ +use std::{ + future::Future, + pin::Pin, + task::{Context, Poll}, +}; + +use actix_http::{error::PayloadError, Payload}; +use bytes::{Bytes, BytesMut}; +use futures_core::{ready, Stream}; +use pin_project_lite::pin_project; + +pin_project! { + pub(crate) struct ReadBody { + #[pin] + pub(crate) stream: Payload, + pub(crate) buf: BytesMut, + pub(crate) limit: usize, + } +} + +impl ReadBody { + pub(crate) fn new(stream: Payload, limit: usize) -> Self { + Self { + stream, + buf: BytesMut::new(), + limit, + } + } +} + +impl Future for ReadBody +where + S: Stream>, +{ + type Output = Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut this = self.project(); + + while let Some(chunk) = ready!(this.stream.as_mut().poll_next(cx)?) { + if (this.buf.len() + chunk.len()) > *this.limit { + return Poll::Ready(Err(PayloadError::Overflow)); + } + + this.buf.extend_from_slice(&chunk); + } + + Poll::Ready(Ok(this.buf.split().freeze())) + } +} + +#[cfg(test)] +mod tests { + use static_assertions::assert_impl_all; + + use super::*; + use crate::any_body::AnyBody; + + assert_impl_all!(ReadBody<()>: Unpin); + assert_impl_all!(ReadBody: Unpin); +} diff --git a/awc/src/responses/response.rs b/awc/src/responses/response.rs new file mode 100644 index 000000000..6385aea19 --- /dev/null +++ b/awc/src/responses/response.rs @@ -0,0 +1,257 @@ +use std::{ + cell::{Ref, RefMut}, + fmt, mem, + pin::Pin, + task::{Context, Poll}, + time::{Duration, Instant}, +}; + +use actix_http::{ + error::PayloadError, header, header::HeaderMap, BoxedPayloadStream, Extensions, + HttpMessage, Payload, ResponseHead, StatusCode, Version, +}; +use actix_rt::time::{sleep, Sleep}; +use bytes::Bytes; +use futures_core::Stream; +use pin_project_lite::pin_project; +use serde::de::DeserializeOwned; + +#[cfg(feature = "cookies")] +use crate::cookie::{Cookie, ParseError as CookieParseError}; + +use super::{JsonBody, ResponseBody, ResponseTimeout}; + +pin_project! { + /// Client Response + pub struct ClientResponse { + pub(crate) head: ResponseHead, + #[pin] + pub(crate) payload: Payload, + pub(crate) timeout: ResponseTimeout, + } +} + +impl ClientResponse { + /// Create new Request instance + pub(crate) fn new(head: ResponseHead, payload: Payload) -> Self { + ClientResponse { + head, + payload, + timeout: ResponseTimeout::default(), + } + } + + #[inline] + pub(crate) fn head(&self) -> &ResponseHead { + &self.head + } + + /// Read the Request Version. + #[inline] + pub fn version(&self) -> Version { + self.head().version + } + + /// Get the status from the server. + #[inline] + pub fn status(&self) -> StatusCode { + self.head().status + } + + #[inline] + /// Returns request's headers. + pub fn headers(&self) -> &HeaderMap { + &self.head().headers + } + + /// Set a body and return previous body value + pub fn map_body(mut self, f: F) -> ClientResponse + where + F: FnOnce(&mut ResponseHead, Payload) -> Payload, + { + let payload = f(&mut self.head, self.payload); + + ClientResponse { + payload, + head: self.head, + timeout: self.timeout, + } + } + + /// Set a timeout duration for [`ClientResponse`](self::ClientResponse). + /// + /// This duration covers the duration of processing the response body stream + /// and would end it as timeout error when deadline met. + /// + /// Disabled by default. + pub fn timeout(self, dur: Duration) -> Self { + let timeout = match self.timeout { + ResponseTimeout::Disabled(Some(mut timeout)) + | ResponseTimeout::Enabled(mut timeout) => match Instant::now().checked_add(dur) { + Some(deadline) => { + timeout.as_mut().reset(deadline.into()); + ResponseTimeout::Enabled(timeout) + } + None => ResponseTimeout::Enabled(Box::pin(sleep(dur))), + }, + _ => ResponseTimeout::Enabled(Box::pin(sleep(dur))), + }; + + Self { + payload: self.payload, + head: self.head, + timeout, + } + } + + /// This method does not enable timeout. It's used to pass the boxed `Sleep` from + /// `SendClientRequest` and reuse it's heap allocation together with it's slot in + /// timer wheel. + pub(crate) fn _timeout(mut self, timeout: Option>>) -> Self { + self.timeout = ResponseTimeout::Disabled(timeout); + self + } + + /// Load request cookies. + #[cfg(feature = "cookies")] + pub fn cookies(&self) -> Result>>, CookieParseError> { + struct Cookies(Vec>); + + if self.extensions().get::().is_none() { + let mut cookies = Vec::new(); + for hdr in self.headers().get_all(&header::SET_COOKIE) { + let s = std::str::from_utf8(hdr.as_bytes()).map_err(CookieParseError::from)?; + cookies.push(Cookie::parse_encoded(s)?.into_owned()); + } + self.extensions_mut().insert(Cookies(cookies)); + } + Ok(Ref::map(self.extensions(), |ext| { + &ext.get::().unwrap().0 + })) + } + + /// Return request cookie. + #[cfg(feature = "cookies")] + pub fn cookie(&self, name: &str) -> Option> { + if let Ok(cookies) = self.cookies() { + for cookie in cookies.iter() { + if cookie.name() == name { + return Some(cookie.to_owned()); + } + } + } + None + } +} + +impl ClientResponse +where + S: Stream>, +{ + /// Returns a [`Future`] that consumes the body stream and resolves to [`Bytes`]. + /// + /// # Errors + /// `Future` implementation returns error if: + /// - content type is not `application/json` + /// - content length is greater than [limit](JsonBody::limit) (default: 2 MiB) + /// + /// # Examples + /// ```no_run + /// # use awc::Client; + /// # use bytes::Bytes; + /// # #[actix_rt::main] + /// # async fn async_ctx() -> Result<(), Box> { + /// let client = Client::default(); + /// let mut res = client.get("https://httpbin.org/robots.txt").send().await?; + /// let body: Bytes = res.body().await?; + /// # Ok(()) + /// # } + /// ``` + /// + /// [`Future`]: std::future::Future + pub fn body(&mut self) -> ResponseBody { + ResponseBody::new(self) + } + + /// Returns a [`Future`] consumes the body stream, parses JSON, and resolves to a deserialized + /// `T` value. + /// + /// # Errors + /// Future returns error if: + /// - content type is not `application/json`; + /// - content length is greater than [limit](JsonBody::limit) (default: 2 MiB). + /// + /// # Examples + /// ```no_run + /// # use awc::Client; + /// # #[actix_rt::main] + /// # async fn async_ctx() -> Result<(), Box> { + /// let client = Client::default(); + /// let mut res = client.get("https://httpbin.org/json").send().await?; + /// let val = res.json::().await?; + /// assert!(val.is_object()); + /// # Ok(()) + /// # } + /// ``` + /// + /// [`Future`]: std::future::Future + pub fn json(&mut self) -> JsonBody { + JsonBody::new(self) + } +} + +impl fmt::Debug for ClientResponse { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + writeln!(f, "\nClientResponse {:?} {}", self.version(), self.status(),)?; + writeln!(f, " headers:")?; + for (key, val) in self.headers().iter() { + writeln!(f, " {:?}: {:?}", key, val)?; + } + Ok(()) + } +} + +impl HttpMessage for ClientResponse { + type Stream = S; + + fn headers(&self) -> &HeaderMap { + &self.head.headers + } + + fn take_payload(&mut self) -> Payload { + mem::replace(&mut self.payload, Payload::None) + } + + fn extensions(&self) -> Ref<'_, Extensions> { + self.head.extensions() + } + + fn extensions_mut(&self) -> RefMut<'_, Extensions> { + self.head.extensions_mut() + } +} + +impl Stream for ClientResponse +where + S: Stream> + Unpin, +{ + type Item = Result; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.project(); + this.timeout.poll_timeout(cx)?; + this.payload.poll_next(cx) + } +} + +#[cfg(test)] +mod tests { + use static_assertions::assert_impl_all; + + use super::*; + use crate::any_body::AnyBody; + + assert_impl_all!(ClientResponse: Unpin); + assert_impl_all!(ClientResponse<()>: Unpin); + assert_impl_all!(ClientResponse: Unpin); +} diff --git a/awc/src/responses/response_body.rs b/awc/src/responses/response_body.rs new file mode 100644 index 000000000..8d9d1274a --- /dev/null +++ b/awc/src/responses/response_body.rs @@ -0,0 +1,144 @@ +use std::{ + future::Future, + mem, + pin::Pin, + task::{Context, Poll}, +}; + +use actix_http::{error::PayloadError, header, HttpMessage}; +use bytes::Bytes; +use futures_core::Stream; +use pin_project_lite::pin_project; + +use super::{read_body::ReadBody, ResponseTimeout, DEFAULT_BODY_LIMIT}; +use crate::ClientResponse; + +pin_project! { + /// A `Future` that reads a body stream, resolving as [`Bytes`]. + /// + /// # Errors + /// `Future` implementation returns error if: + /// - content type is not `application/json`; + /// - content length is greater than [limit](JsonBody::limit) (default: 2 MiB). + pub struct ResponseBody { + #[pin] + body: Option>, + length: Option, + timeout: ResponseTimeout, + err: Option, + } +} + +#[deprecated(since = "3.0.0", note = "Renamed to `ResponseBody`.")] +pub type MessageBody = ResponseBody; + +impl ResponseBody +where + S: Stream>, +{ + /// Creates a body stream reader from a response by taking its payload. + pub fn new(res: &mut ClientResponse) -> ResponseBody { + let length = match res.headers().get(&header::CONTENT_LENGTH) { + Some(value) => { + let len = value.to_str().ok().and_then(|s| s.parse::().ok()); + + match len { + None => return Self::err(PayloadError::UnknownLength), + len => len, + } + } + None => None, + }; + + ResponseBody { + body: Some(ReadBody::new(res.take_payload(), DEFAULT_BODY_LIMIT)), + length, + timeout: mem::take(&mut res.timeout), + err: None, + } + } + + /// Change max size limit of payload. + /// + /// The default limit is 2 MiB. + pub fn limit(mut self, limit: usize) -> Self { + if let Some(ref mut body) = self.body { + body.limit = limit; + } + + self + } + + fn err(err: PayloadError) -> Self { + ResponseBody { + body: None, + length: None, + timeout: ResponseTimeout::default(), + err: Some(err), + } + } +} + +impl Future for ResponseBody +where + S: Stream>, +{ + type Output = Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + + if let Some(err) = this.err.take() { + return Poll::Ready(Err(err)); + } + + if let Some(len) = this.length.take() { + let body = Option::as_ref(&this.body).unwrap(); + if len > body.limit { + return Poll::Ready(Err(PayloadError::Overflow)); + } + } + + this.timeout.poll_timeout(cx)?; + + this.body.as_pin_mut().unwrap().poll(cx) + } +} + +#[cfg(test)] +mod tests { + use static_assertions::assert_impl_all; + + use super::*; + use crate::{http::header, test::TestResponse}; + + assert_impl_all!(ResponseBody<()>: Unpin); + + #[actix_rt::test] + async fn read_body() { + let mut req = TestResponse::with_header((header::CONTENT_LENGTH, "xxxx")).finish(); + match req.body().await.err().unwrap() { + PayloadError::UnknownLength => {} + _ => unreachable!("error"), + } + + let mut req = TestResponse::with_header((header::CONTENT_LENGTH, "10000000")).finish(); + match req.body().await.err().unwrap() { + PayloadError::Overflow => {} + _ => unreachable!("error"), + } + + let mut req = TestResponse::default() + .set_payload(Bytes::from_static(b"test")) + .finish(); + assert_eq!(req.body().await.ok().unwrap(), Bytes::from_static(b"test")); + + let mut req = TestResponse::default() + .set_payload(Bytes::from_static(b"11111111111111")) + .finish(); + match req.body().limit(5).await.err().unwrap() { + PayloadError::Overflow => {} + _ => unreachable!("error"), + } + } +} diff --git a/awc/src/sender.rs b/awc/src/sender.rs index 29c814531..71d705d38 100644 --- a/awc/src/sender.rs +++ b/awc/src/sender.rs @@ -8,7 +8,7 @@ use std::{ }; use actix_http::{ - body::BodyStream, + body::{BodyStream, MessageBody}, error::HttpError, header::{self, HeaderMap, HeaderName, TryIntoHeaderValue}, RequestHead, RequestHeadType, @@ -189,15 +189,17 @@ impl RequestSender { body: B, ) -> SendClientRequest where - B: Into, + B: MessageBody + 'static, { let req = match self { - RequestSender::Owned(head) => { - ConnectRequest::Client(RequestHeadType::Owned(head), body.into(), addr) - } + RequestSender::Owned(head) => ConnectRequest::Client( + RequestHeadType::Owned(head), + AnyBody::from_message_body(body).into_boxed(), + addr, + ), RequestSender::Rc(head, extra_headers) => ConnectRequest::Client( RequestHeadType::Rc(head, extra_headers), - body.into(), + AnyBody::from_message_body(body).into_boxed(), addr, ), }; @@ -229,9 +231,7 @@ impl RequestSender { response_decompress, timeout, config, - AnyBody::Bytes { - body: Bytes::from(body), - }, + AnyBody::from_message_body(body.into_bytes()), ) } @@ -260,9 +260,7 @@ impl RequestSender { response_decompress, timeout, config, - AnyBody::Bytes { - body: Bytes::from(body), - }, + AnyBody::from_message_body(body.into_bytes()), ) } diff --git a/awc/src/ws.rs b/awc/src/ws.rs index 06d54aadb..c63e22969 100644 --- a/awc/src/ws.rs +++ b/awc/src/ws.rs @@ -42,8 +42,7 @@ use crate::{ header::{self, HeaderName, HeaderValue, TryIntoHeaderValue, AUTHORIZATION}, ConnectionType, Method, StatusCode, Uri, Version, }, - response::ClientResponse, - ClientConfig, + ClientConfig, ClientResponse, }; #[cfg(feature = "cookies")] diff --git a/src/guard.rs b/src/guard.rs index a5770df89..d5c585c1b 100644 --- a/src/guard.rs +++ b/src/guard.rs @@ -270,13 +270,11 @@ impl Guard for HeaderGuard { /// ``` /// use actix_web::{web, guard::Host, App, HttpResponse}; /// -/// fn main() { -/// App::new().service( -/// web::resource("/index.html") -/// .guard(Host("www.rust-lang.org")) -/// .to(|| HttpResponse::MethodNotAllowed()) -/// ); -/// } +/// App::new().service( +/// web::resource("/index.html") +/// .guard(Host("www.rust-lang.org")) +/// .to(|| HttpResponse::MethodNotAllowed()) +/// ); /// ``` pub fn Host>(host: H) -> HostGuard { HostGuard(host.as_ref().to_string(), None) From 3756dfc2cea2049c393e2944cffeb84075a982e4 Mon Sep 17 00:00:00 2001 From: Rob Ede Date: Sat, 25 Dec 2021 02:23:22 +0000 Subject: [PATCH 3/4] move client to own module --- awc/src/builder.rs | 6 +- awc/src/client/mod.rs | 188 ++++++++++++++++++++++++++++++++++++++++-- awc/src/frozen.rs | 3 +- awc/src/lib.rs | 183 +--------------------------------------- awc/src/request.rs | 22 ++--- awc/src/sender.rs | 3 +- awc/src/ws.rs | 5 +- 7 files changed, 203 insertions(+), 207 deletions(-) diff --git a/awc/src/builder.rs b/awc/src/builder.rs index 30f203bb8..16a4e9cb5 100644 --- a/awc/src/builder.rs +++ b/awc/src/builder.rs @@ -9,11 +9,13 @@ use actix_rt::net::{ActixStream, TcpStream}; use actix_service::{boxed, Service}; use crate::{ - client::{ConnectInfo, Connector, ConnectorService, TcpConnectError, TcpConnection}, + client::{ + ClientConfig, ConnectInfo, Connector, ConnectorService, TcpConnectError, TcpConnection, + }, connect::DefaultConnector, error::SendRequestError, middleware::{NestTransform, Redirect, Transform}, - Client, ClientConfig, ConnectRequest, ConnectResponse, + Client, ConnectRequest, ConnectResponse, }; /// An HTTP Client builder diff --git a/awc/src/client/mod.rs b/awc/src/client/mod.rs index 0d5c899bc..d5854d83e 100644 --- a/awc/src/client/mod.rs +++ b/awc/src/client/mod.rs @@ -1,6 +1,15 @@ //! HTTP client. -use http::Uri; +use std::{convert::TryFrom, rc::Rc, time::Duration}; + +use actix_http::{error::HttpError, header::HeaderMap, Method, RequestHead, Uri}; +use actix_rt::net::TcpStream; +use actix_service::Service; +pub use actix_tls::connect::{ + ConnectError as TcpConnectError, ConnectInfo, Connection as TcpConnection, +}; + +use crate::{ws, BoxConnectorService, ClientBuilder, ClientRequest}; mod config; mod connection; @@ -10,10 +19,6 @@ mod h1proto; mod h2proto; mod pool; -pub use actix_tls::connect::{ - ConnectError as TcpConnectError, ConnectInfo, Connection as TcpConnection, -}; - pub use self::connection::{Connection, ConnectionIo}; pub use self::connector::{Connector, ConnectorService}; pub use self::error::{ConnectError, FreezeRequestError, InvalidUrl, SendRequestError}; @@ -23,3 +28,176 @@ pub struct Connect { pub uri: Uri, pub addr: Option, } + +/// An asynchronous HTTP and WebSocket client. +/// +/// You should take care to create, at most, one `Client` per thread. Otherwise, expect higher CPU +/// and memory usage. +/// +/// # Examples +/// ``` +/// use awc::Client; +/// +/// #[actix_rt::main] +/// async fn main() { +/// let mut client = Client::default(); +/// +/// let res = client.get("http://www.rust-lang.org") +/// .insert_header(("User-Agent", "my-app/1.2")) +/// .send() +/// .await; +/// +/// println!("Response: {:?}", res); +/// } +/// ``` +#[derive(Clone)] +pub struct Client(pub(crate) ClientConfig); + +#[derive(Clone)] +pub(crate) struct ClientConfig { + pub(crate) connector: BoxConnectorService, + pub(crate) default_headers: Rc, + pub(crate) timeout: Option, +} + +impl Default for Client { + fn default() -> Self { + ClientBuilder::new().finish() + } +} + +impl Client { + /// Create new client instance with default settings. + pub fn new() -> Client { + Client::default() + } + + /// Create `Client` builder. + /// This function is equivalent of `ClientBuilder::new()`. + pub fn builder() -> ClientBuilder< + impl Service< + ConnectInfo, + Response = TcpConnection, + Error = TcpConnectError, + > + Clone, + > { + ClientBuilder::new() + } + + /// Construct HTTP request. + pub fn request(&self, method: Method, url: U) -> ClientRequest + where + Uri: TryFrom, + >::Error: Into, + { + let mut req = ClientRequest::new(method, url, self.0.clone()); + + for header in self.0.default_headers.iter() { + // header map is empty + // TODO: probably append instead + req = req.insert_header_if_none(header); + } + req + } + + /// Create `ClientRequest` from `RequestHead` + /// + /// It is useful for proxy requests. This implementation + /// copies all headers and the method. + pub fn request_from(&self, url: U, head: &RequestHead) -> ClientRequest + where + Uri: TryFrom, + >::Error: Into, + { + let mut req = self.request(head.method.clone(), url); + for header in head.headers.iter() { + req = req.insert_header_if_none(header); + } + req + } + + /// Construct HTTP *GET* request. + pub fn get(&self, url: U) -> ClientRequest + where + Uri: TryFrom, + >::Error: Into, + { + self.request(Method::GET, url) + } + + /// Construct HTTP *HEAD* request. + pub fn head(&self, url: U) -> ClientRequest + where + Uri: TryFrom, + >::Error: Into, + { + self.request(Method::HEAD, url) + } + + /// Construct HTTP *PUT* request. + pub fn put(&self, url: U) -> ClientRequest + where + Uri: TryFrom, + >::Error: Into, + { + self.request(Method::PUT, url) + } + + /// Construct HTTP *POST* request. + pub fn post(&self, url: U) -> ClientRequest + where + Uri: TryFrom, + >::Error: Into, + { + self.request(Method::POST, url) + } + + /// Construct HTTP *PATCH* request. + pub fn patch(&self, url: U) -> ClientRequest + where + Uri: TryFrom, + >::Error: Into, + { + self.request(Method::PATCH, url) + } + + /// Construct HTTP *DELETE* request. + pub fn delete(&self, url: U) -> ClientRequest + where + Uri: TryFrom, + >::Error: Into, + { + self.request(Method::DELETE, url) + } + + /// Construct HTTP *OPTIONS* request. + pub fn options(&self, url: U) -> ClientRequest + where + Uri: TryFrom, + >::Error: Into, + { + self.request(Method::OPTIONS, url) + } + + /// Initialize a WebSocket connection. + /// Returns a WebSocket connection builder. + pub fn ws(&self, url: U) -> ws::WebsocketsRequest + where + Uri: TryFrom, + >::Error: Into, + { + let mut req = ws::WebsocketsRequest::new(url, self.0.clone()); + for (key, value) in self.0.default_headers.iter() { + req.head.headers.insert(key.clone(), value.clone()); + } + req + } + + /// Get default HeaderMap of Client. + /// + /// Returns Some(&mut HeaderMap) when Client object is unique + /// (No other clone of client exists at the same time). + pub fn headers(&mut self) -> Option<&mut HeaderMap> { + Rc::get_mut(&mut self.0.default_headers) + } +} diff --git a/awc/src/frozen.rs b/awc/src/frozen.rs index b98d8d5e1..14ecf9f32 100644 --- a/awc/src/frozen.rs +++ b/awc/src/frozen.rs @@ -12,8 +12,9 @@ use actix_http::{ }; use crate::{ + client::ClientConfig, sender::{RequestSender, SendClientRequest}, - BoxError, ClientConfig, + BoxError, }; /// `FrozenClientRequest` struct represents cloneable client request. diff --git a/awc/src/lib.rs b/awc/src/lib.rs index cef8e03dc..348d9312b 100644 --- a/awc/src/lib.rs +++ b/awc/src/lib.rs @@ -124,7 +124,7 @@ pub use actix_http as http; pub use cookie; pub use self::builder::ClientBuilder; -pub use self::client::Connector; +pub use self::client::{Client, Connector}; pub use self::connect::{BoxConnectorService, BoxedSocket, ConnectRequest, ConnectResponse}; pub use self::frozen::{FrozenClientRequest, FrozenSendBuilder}; pub use self::request::ClientRequest; @@ -132,185 +132,4 @@ pub use self::request::ClientRequest; pub use self::responses::{ClientResponse, JsonBody, MessageBody, ResponseBody}; pub use self::sender::SendClientRequest; -use std::{convert::TryFrom, rc::Rc, time::Duration}; - -use actix_http::{error::HttpError, header::HeaderMap, Method, RequestHead, Uri}; -use actix_rt::net::TcpStream; -use actix_service::Service; - -use self::client::{ConnectInfo, TcpConnectError, TcpConnection}; - pub(crate) type BoxError = Box; - -/// An asynchronous HTTP and WebSocket client. -/// -/// You should take care to create, at most, one `Client` per thread. Otherwise, expect higher CPU -/// and memory usage. -/// -/// # Examples -/// ``` -/// use awc::Client; -/// -/// #[actix_rt::main] -/// async fn main() { -/// let mut client = Client::default(); -/// -/// let res = client.get("http://www.rust-lang.org") -/// .insert_header(("User-Agent", "my-app/1.2")) -/// .send() -/// .await; -/// -/// println!("Response: {:?}", res); -/// } -/// ``` -#[derive(Clone)] -pub struct Client(ClientConfig); - -#[derive(Clone)] -pub(crate) struct ClientConfig { - pub(crate) connector: BoxConnectorService, - pub(crate) default_headers: Rc, - pub(crate) timeout: Option, -} - -impl Default for Client { - fn default() -> Self { - ClientBuilder::new().finish() - } -} - -impl Client { - /// Create new client instance with default settings. - pub fn new() -> Client { - Client::default() - } - - /// Create `Client` builder. - /// This function is equivalent of `ClientBuilder::new()`. - pub fn builder() -> ClientBuilder< - impl Service< - ConnectInfo, - Response = TcpConnection, - Error = TcpConnectError, - > + Clone, - > { - ClientBuilder::new() - } - - /// Construct HTTP request. - pub fn request(&self, method: Method, url: U) -> ClientRequest - where - Uri: TryFrom, - >::Error: Into, - { - let mut req = ClientRequest::new(method, url, self.0.clone()); - - for header in self.0.default_headers.iter() { - // header map is empty - // TODO: probably append instead - req = req.insert_header_if_none(header); - } - req - } - - /// Create `ClientRequest` from `RequestHead` - /// - /// It is useful for proxy requests. This implementation - /// copies all headers and the method. - pub fn request_from(&self, url: U, head: &RequestHead) -> ClientRequest - where - Uri: TryFrom, - >::Error: Into, - { - let mut req = self.request(head.method.clone(), url); - for header in head.headers.iter() { - req = req.insert_header_if_none(header); - } - req - } - - /// Construct HTTP *GET* request. - pub fn get(&self, url: U) -> ClientRequest - where - Uri: TryFrom, - >::Error: Into, - { - self.request(Method::GET, url) - } - - /// Construct HTTP *HEAD* request. - pub fn head(&self, url: U) -> ClientRequest - where - Uri: TryFrom, - >::Error: Into, - { - self.request(Method::HEAD, url) - } - - /// Construct HTTP *PUT* request. - pub fn put(&self, url: U) -> ClientRequest - where - Uri: TryFrom, - >::Error: Into, - { - self.request(Method::PUT, url) - } - - /// Construct HTTP *POST* request. - pub fn post(&self, url: U) -> ClientRequest - where - Uri: TryFrom, - >::Error: Into, - { - self.request(Method::POST, url) - } - - /// Construct HTTP *PATCH* request. - pub fn patch(&self, url: U) -> ClientRequest - where - Uri: TryFrom, - >::Error: Into, - { - self.request(Method::PATCH, url) - } - - /// Construct HTTP *DELETE* request. - pub fn delete(&self, url: U) -> ClientRequest - where - Uri: TryFrom, - >::Error: Into, - { - self.request(Method::DELETE, url) - } - - /// Construct HTTP *OPTIONS* request. - pub fn options(&self, url: U) -> ClientRequest - where - Uri: TryFrom, - >::Error: Into, - { - self.request(Method::OPTIONS, url) - } - - /// Initialize a WebSocket connection. - /// Returns a WebSocket connection builder. - pub fn ws(&self, url: U) -> ws::WebsocketsRequest - where - Uri: TryFrom, - >::Error: Into, - { - let mut req = ws::WebsocketsRequest::new(url, self.0.clone()); - for (key, value) in self.0.default_headers.iter() { - req.head.headers.insert(key.clone(), value.clone()); - } - req - } - - /// Get default HeaderMap of Client. - /// - /// Returns Some(&mut HeaderMap) when Client object is unique - /// (No other clone of client exists at the same time). - pub fn headers(&mut self) -> Option<&mut HeaderMap> { - Rc::get_mut(&mut self.0.default_headers) - } -} diff --git a/awc/src/request.rs b/awc/src/request.rs index 3eb76e3f6..8824dd08a 100644 --- a/awc/src/request.rs +++ b/awc/src/request.rs @@ -12,10 +12,11 @@ use actix_http::{ }; use crate::{ + client::ClientConfig, error::{FreezeRequestError, InvalidUrl}, frozen::FrozenClientRequest, sender::{PrepForSendingError, RequestSender, SendClientRequest}, - BoxError, ClientConfig, + BoxError, }; #[cfg(feature = "cookies")] @@ -249,23 +250,16 @@ impl ClientRequest { /// Set a cookie /// /// ```no_run - /// use awc::{cookie, Client}; + /// use awc::{cookie::Cookie, Client}; /// /// # #[actix_rt::main] /// # async fn main() { - /// let resp = Client::new().get("https://www.rust-lang.org") - /// .cookie( - /// awc::cookie::Cookie::build("name", "value") - /// .domain("www.rust-lang.org") - /// .path("/") - /// .secure(true) - /// .http_only(true) - /// .finish(), - /// ) - /// .send() - /// .await; + /// let res = Client::new().get("https://httpbin.org/cookies") + /// .cookie(Cookie::new("name", "value")) + /// .send() + /// .await; /// - /// println!("Response: {:?}", resp); + /// println!("Response: {:?}", res); /// # } /// ``` #[cfg(feature = "cookies")] diff --git a/awc/src/sender.rs b/awc/src/sender.rs index 71d705d38..edf41163d 100644 --- a/awc/src/sender.rs +++ b/awc/src/sender.rs @@ -24,8 +24,9 @@ use actix_http::{encoding::Decoder, header::ContentEncoding, Payload}; use crate::{ any_body::AnyBody, + client::ClientConfig, error::{FreezeRequestError, InvalidUrl, SendRequestError}, - BoxError, ClientConfig, ClientResponse, ConnectRequest, ConnectResponse, + BoxError, ClientResponse, ConnectRequest, ConnectResponse, }; #[derive(Debug, From)] diff --git a/awc/src/ws.rs b/awc/src/ws.rs index c63e22969..96f8cf893 100644 --- a/awc/src/ws.rs +++ b/awc/src/ws.rs @@ -31,18 +31,19 @@ use std::{convert::TryFrom, fmt, net::SocketAddr, str}; use actix_codec::Framed; use actix_http::{ws, Payload, RequestHead}; use actix_rt::time::timeout; -use actix_service::Service; +use actix_service::Service as _; pub use actix_http::ws::{CloseCode, CloseReason, Codec, Frame, Message}; use crate::{ + client::ClientConfig, connect::{BoxedSocket, ConnectRequest}, error::{HttpError, InvalidUrl, SendRequestError, WsClientError}, http::{ header::{self, HeaderName, HeaderValue, TryIntoHeaderValue, AUTHORIZATION}, ConnectionType, Method, StatusCode, Uri, Version, }, - ClientConfig, ClientResponse, + ClientResponse, }; #[cfg(feature = "cookies")] From 01cbfc57244bc7c0528d158dbc61492ed65a64a3 Mon Sep 17 00:00:00 2001 From: Rob Ede Date: Sat, 25 Dec 2021 02:28:23 +0000 Subject: [PATCH 4/4] reduce -http re-exports in awc --- awc/src/error.rs | 1 + awc/src/lib.rs | 17 +++++++++++++---- src/http/mod.rs | 1 + 3 files changed, 15 insertions(+), 4 deletions(-) diff --git a/awc/src/error.rs b/awc/src/error.rs index c1d855053..aa9dc4d99 100644 --- a/awc/src/error.rs +++ b/awc/src/error.rs @@ -1,5 +1,6 @@ //! HTTP client errors +// TODO: figure out how best to expose http::Error vs actix_http::Error pub use actix_http::{ error::{HttpError, PayloadError}, header::HeaderValue, diff --git a/awc/src/lib.rs b/awc/src/lib.rs index 348d9312b..970ca2d92 100644 --- a/awc/src/lib.rs +++ b/awc/src/lib.rs @@ -105,6 +105,11 @@ #![doc(html_logo_url = "https://actix.rs/img/logo.png")] #![doc(html_favicon_url = "https://actix.rs/favicon.ico")] +pub use actix_http::body; + +#[cfg(feature = "cookies")] +pub use cookie; + mod any_body; mod builder; mod client; @@ -118,10 +123,14 @@ mod sender; pub mod test; pub mod ws; -// TODO: hmmmmmm -pub use actix_http as http; -#[cfg(feature = "cookies")] -pub use cookie; +pub mod http { + //! Various HTTP related types. + + // TODO: figure out how best to expose http::Error vs actix_http::Error + pub use actix_http::{ + header, uri, ConnectionType, Error, Method, StatusCode, Uri, Version, + }; +} pub use self::builder::ClientBuilder; pub use self::client::{Client, Connector}; diff --git a/src/http/mod.rs b/src/http/mod.rs index bbd94a60f..2581532cd 100644 --- a/src/http/mod.rs +++ b/src/http/mod.rs @@ -2,4 +2,5 @@ pub mod header; +// TODO: figure out how best to expose http::Error vs actix_http::Error pub use actix_http::{uri, ConnectionType, Error, Method, StatusCode, Uri, Version};