diff --git a/awc/src/responses/mod.rs b/awc/src/responses/mod.rs index 8a409b0de..588ce014c 100644 --- a/awc/src/responses/mod.rs +++ b/awc/src/responses/mod.rs @@ -1,42 +1,24 @@ -use std::{ - cell::{Ref, RefMut}, - fmt, - future::Future, - io, mem, - pin::Pin, - task::{Context, Poll}, - time::{Duration, Instant}, -}; +use std::{future::Future, io, pin::Pin, task::Context}; -use actix_http::{ - error::PayloadError, header, header::HeaderMap, BoxedPayloadStream, Extensions, - HttpMessage, Payload, ResponseHead, StatusCode, Version, -}; -use actix_rt::time::{sleep, Sleep}; -use bytes::Bytes; -use futures_core::Stream; -use serde::de::DeserializeOwned; - -#[cfg(feature = "cookies")] -use crate::cookie::{Cookie, ParseError as CookieParseError}; +use actix_http::error::PayloadError; +use actix_rt::time::Sleep; mod json_body; mod read_body; +mod response; mod response_body; pub use self::json_body::JsonBody; +pub use self::response::ClientResponse; #[allow(deprecated)] pub use self::response_body::{MessageBody, ResponseBody}; -/// Client Response -pub struct ClientResponse { - pub(crate) head: ResponseHead, - pub(crate) payload: Payload, - pub(crate) timeout: ResponseTimeout, -} +/// Default body size limit: 2 MiB +const DEFAULT_BODY_LIMIT: usize = 2 * 1024 * 1024; -/// helper enum with reusable sleep passed from `SendClientResponse`. -/// See `ClientResponse::_timeout` for reason. +/// Helper enum with reusable sleep passed from `SendClientResponse`. +/// +/// See [`ClientResponse::_timeout`] for reason. pub(crate) enum ResponseTimeout { Disabled(Option>>), Enabled(Pin>), @@ -65,199 +47,3 @@ impl ResponseTimeout { } } } - -impl HttpMessage for ClientResponse { - type Stream = S; - - fn headers(&self) -> &HeaderMap { - &self.head.headers - } - - fn take_payload(&mut self) -> Payload { - mem::replace(&mut self.payload, Payload::None) - } - - fn extensions(&self) -> Ref<'_, Extensions> { - self.head.extensions() - } - - fn extensions_mut(&self) -> RefMut<'_, Extensions> { - self.head.extensions_mut() - } -} - -impl ClientResponse { - /// Create new Request instance - pub(crate) fn new(head: ResponseHead, payload: Payload) -> Self { - ClientResponse { - head, - payload, - timeout: ResponseTimeout::default(), - } - } - - #[inline] - pub(crate) fn head(&self) -> &ResponseHead { - &self.head - } - - /// Read the Request Version. - #[inline] - pub fn version(&self) -> Version { - self.head().version - } - - /// Get the status from the server. - #[inline] - pub fn status(&self) -> StatusCode { - self.head().status - } - - #[inline] - /// Returns request's headers. - pub fn headers(&self) -> &HeaderMap { - &self.head().headers - } - - /// Set a body and return previous body value - pub fn map_body(mut self, f: F) -> ClientResponse - where - F: FnOnce(&mut ResponseHead, Payload) -> Payload, - { - let payload = f(&mut self.head, self.payload); - - ClientResponse { - payload, - head: self.head, - timeout: self.timeout, - } - } - - /// Set a timeout duration for [`ClientResponse`](self::ClientResponse). - /// - /// This duration covers the duration of processing the response body stream - /// and would end it as timeout error when deadline met. - /// - /// Disabled by default. - pub fn timeout(self, dur: Duration) -> Self { - let timeout = match self.timeout { - ResponseTimeout::Disabled(Some(mut timeout)) - | ResponseTimeout::Enabled(mut timeout) => match Instant::now().checked_add(dur) { - Some(deadline) => { - timeout.as_mut().reset(deadline.into()); - ResponseTimeout::Enabled(timeout) - } - None => ResponseTimeout::Enabled(Box::pin(sleep(dur))), - }, - _ => ResponseTimeout::Enabled(Box::pin(sleep(dur))), - }; - - Self { - payload: self.payload, - head: self.head, - timeout, - } - } - - /// This method does not enable timeout. It's used to pass the boxed `Sleep` from - /// `SendClientRequest` and reuse it's heap allocation together with it's slot in - /// timer wheel. - pub(crate) fn _timeout(mut self, timeout: Option>>) -> Self { - self.timeout = ResponseTimeout::Disabled(timeout); - self - } - - /// Load request cookies. - #[cfg(feature = "cookies")] - pub fn cookies(&self) -> Result>>, CookieParseError> { - struct Cookies(Vec>); - - if self.extensions().get::().is_none() { - let mut cookies = Vec::new(); - for hdr in self.headers().get_all(&header::SET_COOKIE) { - let s = std::str::from_utf8(hdr.as_bytes()).map_err(CookieParseError::from)?; - cookies.push(Cookie::parse_encoded(s)?.into_owned()); - } - self.extensions_mut().insert(Cookies(cookies)); - } - Ok(Ref::map(self.extensions(), |ext| { - &ext.get::().unwrap().0 - })) - } - - /// Return request cookie. - #[cfg(feature = "cookies")] - pub fn cookie(&self, name: &str) -> Option> { - if let Ok(cookies) = self.cookies() { - for cookie in cookies.iter() { - if cookie.name() == name { - return Some(cookie.to_owned()); - } - } - } - None - } -} - -impl ClientResponse -where - S: Stream>, -{ - /// Loads HTTP response's body. - /// Consumes body stream and parses JSON, resolving to a deserialized `T` value. - /// - /// # Errors - /// `Future` implementation returns error if: - /// - content type is not `application/json` - /// - content length is greater than [limit](JsonBody::limit) (default: 2 MiB) - pub fn body(&mut self) -> ResponseBody { - ResponseBody::new(self) - } - - /// Consumes body stream and parses JSON, resolving to a deserialized `T` value. - /// - /// # Errors - /// Future returns error if: - /// - content type is not `application/json`; - /// - content length is greater than [limit](JsonBody::limit) (default: 2 MiB). - pub fn json(&mut self) -> JsonBody { - JsonBody::new(self) - } -} - -impl Stream for ClientResponse -where - S: Stream> + Unpin, -{ - type Item = Result; - - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let this = self.get_mut(); - this.timeout.poll_timeout(cx)?; - - Pin::new(&mut this.payload).poll_next(cx) - } -} - -impl fmt::Debug for ClientResponse { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - writeln!(f, "\nClientResponse {:?} {}", self.version(), self.status(),)?; - writeln!(f, " headers:")?; - for (key, val) in self.headers().iter() { - writeln!(f, " {:?}: {:?}", key, val)?; - } - Ok(()) - } -} - -/// Default body size limit: 2 MiB -const DEFAULT_BODY_LIMIT: usize = 2 * 1024 * 1024; - -#[cfg(test)] -mod tests { - use static_assertions::assert_impl_all; - - use super::*; - - assert_impl_all!(ClientResponse: Unpin); -} diff --git a/awc/src/responses/response.rs b/awc/src/responses/response.rs new file mode 100644 index 000000000..470984341 --- /dev/null +++ b/awc/src/responses/response.rs @@ -0,0 +1,221 @@ +use std::{ + cell::{Ref, RefMut}, + fmt, mem, + pin::Pin, + task::{Context, Poll}, + time::{Duration, Instant}, +}; + +use actix_http::{ + error::PayloadError, header, header::HeaderMap, BoxedPayloadStream, Extensions, + HttpMessage, Payload, ResponseHead, StatusCode, Version, +}; +use actix_rt::time::{sleep, Sleep}; +use bytes::Bytes; +use futures_core::Stream; +use serde::de::DeserializeOwned; + +#[cfg(feature = "cookies")] +use crate::cookie::{Cookie, ParseError as CookieParseError}; + +use super::{JsonBody, ResponseBody, ResponseTimeout}; + +/// Client Response +pub struct ClientResponse { + pub(crate) head: ResponseHead, + pub(crate) payload: Payload, + pub(crate) timeout: ResponseTimeout, +} + +impl ClientResponse { + /// Create new Request instance + pub(crate) fn new(head: ResponseHead, payload: Payload) -> Self { + ClientResponse { + head, + payload, + timeout: ResponseTimeout::default(), + } + } + + #[inline] + pub(crate) fn head(&self) -> &ResponseHead { + &self.head + } + + /// Read the Request Version. + #[inline] + pub fn version(&self) -> Version { + self.head().version + } + + /// Get the status from the server. + #[inline] + pub fn status(&self) -> StatusCode { + self.head().status + } + + #[inline] + /// Returns request's headers. + pub fn headers(&self) -> &HeaderMap { + &self.head().headers + } + + /// Set a body and return previous body value + pub fn map_body(mut self, f: F) -> ClientResponse + where + F: FnOnce(&mut ResponseHead, Payload) -> Payload, + { + let payload = f(&mut self.head, self.payload); + + ClientResponse { + payload, + head: self.head, + timeout: self.timeout, + } + } + + /// Set a timeout duration for [`ClientResponse`](self::ClientResponse). + /// + /// This duration covers the duration of processing the response body stream + /// and would end it as timeout error when deadline met. + /// + /// Disabled by default. + pub fn timeout(self, dur: Duration) -> Self { + let timeout = match self.timeout { + ResponseTimeout::Disabled(Some(mut timeout)) + | ResponseTimeout::Enabled(mut timeout) => match Instant::now().checked_add(dur) { + Some(deadline) => { + timeout.as_mut().reset(deadline.into()); + ResponseTimeout::Enabled(timeout) + } + None => ResponseTimeout::Enabled(Box::pin(sleep(dur))), + }, + _ => ResponseTimeout::Enabled(Box::pin(sleep(dur))), + }; + + Self { + payload: self.payload, + head: self.head, + timeout, + } + } + + /// This method does not enable timeout. It's used to pass the boxed `Sleep` from + /// `SendClientRequest` and reuse it's heap allocation together with it's slot in + /// timer wheel. + pub(crate) fn _timeout(mut self, timeout: Option>>) -> Self { + self.timeout = ResponseTimeout::Disabled(timeout); + self + } + + /// Load request cookies. + #[cfg(feature = "cookies")] + pub fn cookies(&self) -> Result>>, CookieParseError> { + struct Cookies(Vec>); + + if self.extensions().get::().is_none() { + let mut cookies = Vec::new(); + for hdr in self.headers().get_all(&header::SET_COOKIE) { + let s = std::str::from_utf8(hdr.as_bytes()).map_err(CookieParseError::from)?; + cookies.push(Cookie::parse_encoded(s)?.into_owned()); + } + self.extensions_mut().insert(Cookies(cookies)); + } + Ok(Ref::map(self.extensions(), |ext| { + &ext.get::().unwrap().0 + })) + } + + /// Return request cookie. + #[cfg(feature = "cookies")] + pub fn cookie(&self, name: &str) -> Option> { + if let Ok(cookies) = self.cookies() { + for cookie in cookies.iter() { + if cookie.name() == name { + return Some(cookie.to_owned()); + } + } + } + None + } +} + +impl ClientResponse +where + S: Stream>, +{ + /// Loads HTTP response's body. + /// Consumes body stream and parses JSON, resolving to a deserialized `T` value. + /// + /// # Errors + /// `Future` implementation returns error if: + /// - content type is not `application/json` + /// - content length is greater than [limit](JsonBody::limit) (default: 2 MiB) + pub fn body(&mut self) -> ResponseBody { + ResponseBody::new(self) + } + + /// Consumes body stream and parses JSON, resolving to a deserialized `T` value. + /// + /// # Errors + /// Future returns error if: + /// - content type is not `application/json`; + /// - content length is greater than [limit](JsonBody::limit) (default: 2 MiB). + pub fn json(&mut self) -> JsonBody { + JsonBody::new(self) + } +} + +impl fmt::Debug for ClientResponse { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + writeln!(f, "\nClientResponse {:?} {}", self.version(), self.status(),)?; + writeln!(f, " headers:")?; + for (key, val) in self.headers().iter() { + writeln!(f, " {:?}: {:?}", key, val)?; + } + Ok(()) + } +} + +impl HttpMessage for ClientResponse { + type Stream = S; + + fn headers(&self) -> &HeaderMap { + &self.head.headers + } + + fn take_payload(&mut self) -> Payload { + mem::replace(&mut self.payload, Payload::None) + } + + fn extensions(&self) -> Ref<'_, Extensions> { + self.head.extensions() + } + + fn extensions_mut(&self) -> RefMut<'_, Extensions> { + self.head.extensions_mut() + } +} + +impl Stream for ClientResponse +where + S: Stream> + Unpin, +{ + type Item = Result; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.get_mut(); + this.timeout.poll_timeout(cx)?; + + Pin::new(&mut this.payload).poll_next(cx) + } +} + +#[cfg(test)] +mod tests { + use static_assertions::assert_impl_all; + + use super::*; + + assert_impl_all!(ClientResponse: Unpin); +}