From 45d2fd429928d243d80a6bce246d2542f9e8cde7 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Thu, 12 Sep 2019 10:40:56 +0600 Subject: [PATCH 1/8] export frozen request related types; refactor code layout --- awc/CHANGES.md | 7 + awc/src/frozen.rs | 235 ++++++++++++++++++++ awc/src/lib.rs | 4 + awc/src/request.rs | 535 +++++---------------------------------------- awc/src/sender.rs | 282 ++++++++++++++++++++++++ 5 files changed, 581 insertions(+), 482 deletions(-) create mode 100644 awc/src/frozen.rs create mode 100644 awc/src/sender.rs diff --git a/awc/CHANGES.md b/awc/CHANGES.md index 4a52a9df2..94ad65ffe 100644 --- a/awc/CHANGES.md +++ b/awc/CHANGES.md @@ -1,5 +1,12 @@ # Changes +## [0.2.6] - 2019-09-12 + +### Added + +* Export frozen request related types. + + ## [0.2.5] - 2019-09-11 ### Added diff --git a/awc/src/frozen.rs b/awc/src/frozen.rs new file mode 100644 index 000000000..d9f65d431 --- /dev/null +++ b/awc/src/frozen.rs @@ -0,0 +1,235 @@ +use std::net; +use std::rc::Rc; +use std::time::Duration; + +use bytes::Bytes; +use futures::Stream; +use serde::Serialize; + +use actix_http::body::Body; +use actix_http::http::header::IntoHeaderValue; +use actix_http::http::{ + Error as HttpError, HeaderMap, HeaderName, HttpTryFrom, Method, Uri, +}; +use actix_http::{Error, RequestHead}; + +use crate::sender::{RequestSender, SendClientRequest}; +use crate::ClientConfig; + +/// `FrozenClientRequest` struct represents clonable client request. +/// It could be used to send same request multiple times. +#[derive(Clone)] +pub struct FrozenClientRequest { + pub(crate) head: Rc, + pub(crate) addr: Option, + pub(crate) response_decompress: bool, + pub(crate) timeout: Option, + pub(crate) config: Rc, +} + +impl FrozenClientRequest { + /// Get HTTP URI of request + pub fn get_uri(&self) -> &Uri { + &self.head.uri + } + + /// Get HTTP method of this request + pub fn get_method(&self) -> &Method { + &self.head.method + } + + /// Returns request's headers. + pub fn headers(&self) -> &HeaderMap { + &self.head.headers + } + + /// Send a body. + pub fn send_body(&self, body: B) -> SendClientRequest + where + B: Into, + { + RequestSender::Rc(self.head.clone(), None).send_body( + self.addr, + self.response_decompress, + self.timeout, + self.config.as_ref(), + body, + ) + } + + /// Send a json body. + pub fn send_json(&self, value: &T) -> SendClientRequest { + RequestSender::Rc(self.head.clone(), None).send_json( + self.addr, + self.response_decompress, + self.timeout, + self.config.as_ref(), + value, + ) + } + + /// Send an urlencoded body. + pub fn send_form(&self, value: &T) -> SendClientRequest { + RequestSender::Rc(self.head.clone(), None).send_form( + self.addr, + self.response_decompress, + self.timeout, + self.config.as_ref(), + value, + ) + } + + /// Send a streaming body. + pub fn send_stream(&self, stream: S) -> SendClientRequest + where + S: Stream + 'static, + E: Into + 'static, + { + RequestSender::Rc(self.head.clone(), None).send_stream( + self.addr, + self.response_decompress, + self.timeout, + self.config.as_ref(), + stream, + ) + } + + /// Send an empty body. + pub fn send(&self) -> SendClientRequest { + RequestSender::Rc(self.head.clone(), None).send( + self.addr, + self.response_decompress, + self.timeout, + self.config.as_ref(), + ) + } + + /// Create a `FrozenSendBuilder` with extra headers + pub fn extra_headers(&self, extra_headers: HeaderMap) -> FrozenSendBuilder { + FrozenSendBuilder::new(self.clone(), extra_headers) + } + + /// Create a `FrozenSendBuilder` with an extra header + pub fn extra_header(&self, key: K, value: V) -> FrozenSendBuilder + where + HeaderName: HttpTryFrom, + V: IntoHeaderValue, + { + self.extra_headers(HeaderMap::new()) + .extra_header(key, value) + } +} + +/// Builder that allows to modify extra headers. +pub struct FrozenSendBuilder { + req: FrozenClientRequest, + extra_headers: HeaderMap, + err: Option, +} + +impl FrozenSendBuilder { + pub(crate) fn new(req: FrozenClientRequest, extra_headers: HeaderMap) -> Self { + Self { + req, + extra_headers, + err: None, + } + } + + /// Insert a header, it overrides existing header in `FrozenClientRequest`. + pub fn extra_header(mut self, key: K, value: V) -> Self + where + HeaderName: HttpTryFrom, + V: IntoHeaderValue, + { + match HeaderName::try_from(key) { + Ok(key) => match value.try_into() { + Ok(value) => self.extra_headers.insert(key, value), + Err(e) => self.err = Some(e.into()), + }, + Err(e) => self.err = Some(e.into()), + } + self + } + + /// Complete request construction and send a body. + pub fn send_body(self, body: B) -> SendClientRequest + where + B: Into, + { + if let Some(e) = self.err { + return e.into(); + } + + RequestSender::Rc(self.req.head, Some(self.extra_headers)).send_body( + self.req.addr, + self.req.response_decompress, + self.req.timeout, + self.req.config.as_ref(), + body, + ) + } + + /// Complete request construction and send a json body. + pub fn send_json(self, value: &T) -> SendClientRequest { + if let Some(e) = self.err { + return e.into(); + } + + RequestSender::Rc(self.req.head, Some(self.extra_headers)).send_json( + self.req.addr, + self.req.response_decompress, + self.req.timeout, + self.req.config.as_ref(), + value, + ) + } + + /// Complete request construction and send an urlencoded body. + pub fn send_form(self, value: &T) -> SendClientRequest { + if let Some(e) = self.err { + return e.into(); + } + + RequestSender::Rc(self.req.head, Some(self.extra_headers)).send_form( + self.req.addr, + self.req.response_decompress, + self.req.timeout, + self.req.config.as_ref(), + value, + ) + } + + /// Complete request construction and send a streaming body. + pub fn send_stream(self, stream: S) -> SendClientRequest + where + S: Stream + 'static, + E: Into + 'static, + { + if let Some(e) = self.err { + return e.into(); + } + + RequestSender::Rc(self.req.head, Some(self.extra_headers)).send_stream( + self.req.addr, + self.req.response_decompress, + self.req.timeout, + self.req.config.as_ref(), + stream, + ) + } + + /// Complete request construction and send an empty body. + pub fn send(self) -> SendClientRequest { + if let Some(e) = self.err { + return e.into(); + } + + RequestSender::Rc(self.req.head, Some(self.extra_headers)).send( + self.req.addr, + self.req.response_decompress, + self.req.timeout, + self.req.config.as_ref(), + ) + } +} diff --git a/awc/src/lib.rs b/awc/src/lib.rs index da63bbd93..58c9056b2 100644 --- a/awc/src/lib.rs +++ b/awc/src/lib.rs @@ -33,15 +33,19 @@ use actix_http::RequestHead; mod builder; mod connect; pub mod error; +mod frozen; mod request; mod response; +mod sender; pub mod test; pub mod ws; pub use self::builder::ClientBuilder; pub use self::connect::BoxedSocket; +pub use self::frozen::{FrozenClientRequest, FrozenSendBuilder}; pub use self::request::ClientRequest; pub use self::response::{ClientResponse, JsonBody, MessageBody}; +pub use self::sender::SendClientRequest; use self::connect::{Connect, ConnectorWrapper}; diff --git a/awc/src/request.rs b/awc/src/request.rs index 4dd07c5d8..d597a1638 100644 --- a/awc/src/request.rs +++ b/awc/src/request.rs @@ -1,29 +1,26 @@ use std::fmt::Write as FmtWrite; use std::io::Write; use std::rc::Rc; -use std::time::{Duration, Instant}; +use std::time::Duration; use std::{fmt, net}; use bytes::{BufMut, Bytes, BytesMut}; -use futures::{Async, Future, Poll, Stream, try_ready}; +use futures::Stream; use percent_encoding::percent_encode; use serde::Serialize; -use serde_json; -use tokio_timer::Delay; -use derive_more::From; -use actix_http::body::{Body, BodyStream}; +use actix_http::body::Body; use actix_http::cookie::{Cookie, CookieJar, USERINFO}; -use actix_http::encoding::Decoder; -use actix_http::http::header::{self, ContentEncoding, Header, IntoHeaderValue}; +use actix_http::http::header::{self, Header, IntoHeaderValue}; use actix_http::http::{ uri, ConnectionType, Error as HttpError, HeaderMap, HeaderName, HeaderValue, HttpTryFrom, Method, Uri, Version, }; -use actix_http::{Error, Payload, PayloadStream, RequestHead}; +use actix_http::{Error, RequestHead}; -use crate::error::{InvalidUrl, SendRequestError, FreezeRequestError}; -use crate::response::ClientResponse; +use crate::error::{FreezeRequestError, InvalidUrl}; +use crate::frozen::FrozenClientRequest; +use crate::sender::{PrepForSendingError, RequestSender, SendClientRequest}; use crate::ClientConfig; #[cfg(any(feature = "brotli", feature = "flate2-zlib", feature = "flate2-rust"))] @@ -375,6 +372,8 @@ impl ClientRequest { } } + /// Freeze request builder and construct `FrozenClientRequest`, + /// which could be used for sending same request multiple times. pub fn freeze(self) -> Result { let slf = match self.prep_for_sending() { Ok(slf) => slf, @@ -393,10 +392,7 @@ impl ClientRequest { } /// Complete request construction and send body. - pub fn send_body( - self, - body: B, - ) -> SendBody + pub fn send_body(self, body: B) -> SendClientRequest where B: Into, { @@ -405,47 +401,51 @@ impl ClientRequest { Err(e) => return e.into(), }; - RequestSender::Owned(slf.head) - .send_body(slf.addr, slf.response_decompress, slf.timeout, slf.config.as_ref(), body) + RequestSender::Owned(slf.head).send_body( + slf.addr, + slf.response_decompress, + slf.timeout, + slf.config.as_ref(), + body, + ) } /// Set a JSON body and generate `ClientRequest` - pub fn send_json( - self, - value: &T, - ) -> SendBody - { + pub fn send_json(self, value: &T) -> SendClientRequest { let slf = match self.prep_for_sending() { Ok(slf) => slf, Err(e) => return e.into(), }; - RequestSender::Owned(slf.head) - .send_json(slf.addr, slf.response_decompress, slf.timeout, slf.config.as_ref(), value) + RequestSender::Owned(slf.head).send_json( + slf.addr, + slf.response_decompress, + slf.timeout, + slf.config.as_ref(), + value, + ) } /// Set a urlencoded body and generate `ClientRequest` /// /// `ClientRequestBuilder` can not be used after this call. - pub fn send_form( - self, - value: &T, - ) -> SendBody - { + pub fn send_form(self, value: &T) -> SendClientRequest { let slf = match self.prep_for_sending() { Ok(slf) => slf, Err(e) => return e.into(), }; - RequestSender::Owned(slf.head) - .send_form(slf.addr, slf.response_decompress, slf.timeout, slf.config.as_ref(), value) + RequestSender::Owned(slf.head).send_form( + slf.addr, + slf.response_decompress, + slf.timeout, + slf.config.as_ref(), + value, + ) } /// Set an streaming body and generate `ClientRequest`. - pub fn send_stream( - self, - stream: S, - ) -> SendBody + pub fn send_stream(self, stream: S) -> SendClientRequest where S: Stream + 'static, E: Into + 'static, @@ -455,22 +455,28 @@ impl ClientRequest { Err(e) => return e.into(), }; - RequestSender::Owned(slf.head) - .send_stream(slf.addr, slf.response_decompress, slf.timeout, slf.config.as_ref(), stream) + RequestSender::Owned(slf.head).send_stream( + slf.addr, + slf.response_decompress, + slf.timeout, + slf.config.as_ref(), + stream, + ) } /// Set an empty body and generate `ClientRequest`. - pub fn send( - self, - ) -> SendBody - { + pub fn send(self) -> SendClientRequest { let slf = match self.prep_for_sending() { Ok(slf) => slf, Err(e) => return e.into(), }; - RequestSender::Owned(slf.head) - .send(slf.addr, slf.response_decompress, slf.timeout, slf.config.as_ref()) + RequestSender::Owned(slf.head).send( + slf.addr, + slf.response_decompress, + slf.timeout, + slf.config.as_ref(), + ) } fn prep_for_sending(mut self) -> Result { @@ -528,10 +534,10 @@ impl ClientRequest { slf = slf.set_header_if_none(header::ACCEPT_ENCODING, HTTPS_ENCODING) } else { #[cfg(any(feature = "flate2-zlib", feature = "flate2-rust"))] - { - slf = slf - .set_header_if_none(header::ACCEPT_ENCODING, "gzip, deflate") - } + { + slf = slf + .set_header_if_none(header::ACCEPT_ENCODING, "gzip, deflate") + } }; } } @@ -555,441 +561,6 @@ impl fmt::Debug for ClientRequest { } } -#[derive(Clone)] -pub struct FrozenClientRequest { - pub(crate) head: Rc, - pub(crate) addr: Option, - pub(crate) response_decompress: bool, - pub(crate) timeout: Option, - pub(crate) config: Rc, -} - -impl FrozenClientRequest { - /// Get HTTP URI of request - pub fn get_uri(&self) -> &Uri { - &self.head.uri - } - - /// Get HTTP method of this request - pub fn get_method(&self) -> &Method { - &self.head.method - } - - /// Returns request's headers. - pub fn headers(&self) -> &HeaderMap { - &self.head.headers - } - - /// Send a body. - pub fn send_body( - &self, - body: B, - ) -> SendBody - where - B: Into, - { - RequestSender::Rc(self.head.clone(), None) - .send_body(self.addr, self.response_decompress, self.timeout, self.config.as_ref(), body) - } - - /// Send a json body. - pub fn send_json( - &self, - value: &T, - ) -> SendBody - { - RequestSender::Rc(self.head.clone(), None) - .send_json(self.addr, self.response_decompress, self.timeout, self.config.as_ref(), value) - } - - /// Send an urlencoded body. - pub fn send_form( - &self, - value: &T, - ) -> SendBody - { - RequestSender::Rc(self.head.clone(), None) - .send_form(self.addr, self.response_decompress, self.timeout, self.config.as_ref(), value) - } - - /// Send a streaming body. - pub fn send_stream( - &self, - stream: S, - ) -> SendBody - where - S: Stream + 'static, - E: Into + 'static, - { - RequestSender::Rc(self.head.clone(), None) - .send_stream(self.addr, self.response_decompress, self.timeout, self.config.as_ref(), stream) - } - - /// Send an empty body. - pub fn send( - &self, - ) -> SendBody - { - RequestSender::Rc(self.head.clone(), None) - .send(self.addr, self.response_decompress, self.timeout, self.config.as_ref()) - } - - /// Create a `FrozenSendBuilder` with extra headers - pub fn extra_headers(&self, extra_headers: HeaderMap) -> FrozenSendBuilder { - FrozenSendBuilder::new(self.clone(), extra_headers) - } - - /// Create a `FrozenSendBuilder` with an extra header - pub fn extra_header(&self, key: K, value: V) -> FrozenSendBuilder - where - HeaderName: HttpTryFrom, - V: IntoHeaderValue, - { - self.extra_headers(HeaderMap::new()).extra_header(key, value) - } -} - -pub struct FrozenSendBuilder { - req: FrozenClientRequest, - extra_headers: HeaderMap, - err: Option, -} - -impl FrozenSendBuilder { - pub(crate) fn new(req: FrozenClientRequest, extra_headers: HeaderMap) -> Self { - Self { - req, - extra_headers, - err: None, - } - } - - /// Insert a header, it overrides existing header in `FrozenClientRequest`. - pub fn extra_header(mut self, key: K, value: V) -> Self - where - HeaderName: HttpTryFrom, - V: IntoHeaderValue, - { - match HeaderName::try_from(key) { - Ok(key) => match value.try_into() { - Ok(value) => self.extra_headers.insert(key, value), - Err(e) => self.err = Some(e.into()), - }, - Err(e) => self.err = Some(e.into()), - } - self - } - - /// Complete request construction and send a body. - pub fn send_body( - self, - body: B, - ) -> SendBody - where - B: Into, - { - if let Some(e) = self.err { - return e.into() - } - - RequestSender::Rc(self.req.head, Some(self.extra_headers)) - .send_body(self.req.addr, self.req.response_decompress, self.req.timeout, self.req.config.as_ref(), body) - } - - /// Complete request construction and send a json body. - pub fn send_json( - self, - value: &T, - ) -> SendBody - { - if let Some(e) = self.err { - return e.into() - } - - RequestSender::Rc(self.req.head, Some(self.extra_headers)) - .send_json(self.req.addr, self.req.response_decompress, self.req.timeout, self.req.config.as_ref(), value) - } - - /// Complete request construction and send an urlencoded body. - pub fn send_form( - self, - value: &T, - ) -> SendBody - { - if let Some(e) = self.err { - return e.into() - } - - RequestSender::Rc(self.req.head, Some(self.extra_headers)) - .send_form(self.req.addr, self.req.response_decompress, self.req.timeout, self.req.config.as_ref(), value) - } - - /// Complete request construction and send a streaming body. - pub fn send_stream( - self, - stream: S, - ) -> SendBody - where - S: Stream + 'static, - E: Into + 'static, - { - if let Some(e) = self.err { - return e.into() - } - - RequestSender::Rc(self.req.head, Some(self.extra_headers)) - .send_stream(self.req.addr, self.req.response_decompress, self.req.timeout, self.req.config.as_ref(), stream) - } - - /// Complete request construction and send an empty body. - pub fn send( - self, - ) -> SendBody - { - if let Some(e) = self.err { - return e.into() - } - - RequestSender::Rc(self.req.head, Some(self.extra_headers)) - .send(self.req.addr, self.req.response_decompress, self.req.timeout, self.req.config.as_ref()) - } -} - -#[derive(Debug, From)] -enum PrepForSendingError { - Url(InvalidUrl), - Http(HttpError), -} - -impl Into for PrepForSendingError { - fn into(self) -> FreezeRequestError { - match self { - PrepForSendingError::Url(e) => FreezeRequestError::Url(e), - PrepForSendingError::Http(e) => FreezeRequestError::Http(e), - } - } -} - -impl Into for PrepForSendingError { - fn into(self) -> SendRequestError { - match self { - PrepForSendingError::Url(e) => SendRequestError::Url(e), - PrepForSendingError::Http(e) => SendRequestError::Http(e), - } - } -} - -pub enum SendBody -{ - Fut(Box>, Option, bool), - Err(Option), -} - -impl SendBody -{ - pub fn new( - send: Box>, - response_decompress: bool, - timeout: Option, - ) -> SendBody - { - let delay = timeout.map(|t| Delay::new(Instant::now() + t)); - SendBody::Fut(send, delay, response_decompress) - } -} - -impl Future for SendBody -{ - type Item = ClientResponse>>; - type Error = SendRequestError; - - fn poll(&mut self) -> Poll { - match self { - SendBody::Fut(send, delay, response_decompress) => { - if delay.is_some() { - match delay.poll() { - Ok(Async::NotReady) => (), - _ => return Err(SendRequestError::Timeout), - } - } - - let res = try_ready!(send.poll()) - .map_body(|head, payload| { - if *response_decompress { - Payload::Stream(Decoder::from_headers(payload, &head.headers)) - } else { - Payload::Stream(Decoder::new(payload, ContentEncoding::Identity)) - } - }); - - Ok(Async::Ready(res)) - }, - SendBody::Err(ref mut e) => { - match e.take() { - Some(e) => Err(e.into()), - None => panic!("Attempting to call completed future"), - } - } - } - } -} - - -impl From for SendBody -{ - fn from(e: SendRequestError) -> Self { - SendBody::Err(Some(e)) - } -} - -impl From for SendBody -{ - fn from(e: Error) -> Self { - SendBody::Err(Some(e.into())) - } -} - -impl From for SendBody -{ - fn from(e: HttpError) -> Self { - SendBody::Err(Some(e.into())) - } -} - -impl From for SendBody -{ - fn from(e: PrepForSendingError) -> Self { - SendBody::Err(Some(e.into())) - } -} - -#[derive(Debug)] -enum RequestSender { - Owned(RequestHead), - Rc(Rc, Option), -} - -impl RequestSender { - pub fn send_body( - self, - addr: Option, - response_decompress: bool, - timeout: Option, - config: &ClientConfig, - body: B, - ) -> SendBody - where - B: Into, - { - let mut connector = config.connector.borrow_mut(); - - let fut = match self { - RequestSender::Owned(head) => connector.send_request(head, body.into(), addr), - RequestSender::Rc(head, extra_headers) => connector.send_request_extra(head, extra_headers, body.into(), addr), - }; - - SendBody::new(fut, response_decompress, timeout.or_else(|| config.timeout.clone())) - } - - pub fn send_json( - mut self, - addr: Option, - response_decompress: bool, - timeout: Option, - config: &ClientConfig, - value: &T, - ) -> SendBody - { - let body = match serde_json::to_string(value) { - Ok(body) => body, - Err(e) => return Error::from(e).into(), - }; - - if let Err(e) = self.set_header_if_none(header::CONTENT_TYPE, "application/json") { - return e.into(); - } - - self.send_body(addr, response_decompress, timeout, config, Body::Bytes(Bytes::from(body))) - } - - pub fn send_form( - mut self, - addr: Option, - response_decompress: bool, - timeout: Option, - config: &ClientConfig, - value: &T, - ) -> SendBody - { - let body = match serde_urlencoded::to_string(value) { - Ok(body) => body, - Err(e) => return Error::from(e).into(), - }; - - // set content-type - if let Err(e) = self.set_header_if_none(header::CONTENT_TYPE, "application/x-www-form-urlencoded") { - return e.into(); - } - - self.send_body(addr, response_decompress, timeout, config, Body::Bytes(Bytes::from(body))) - } - - pub fn send_stream( - self, - addr: Option, - response_decompress: bool, - timeout: Option, - config: &ClientConfig, - stream: S, - ) -> SendBody - where - S: Stream + 'static, - E: Into + 'static, - { - self.send_body(addr, response_decompress, timeout, config, Body::from_message(BodyStream::new(stream))) - } - - pub fn send( - self, - addr: Option, - response_decompress: bool, - timeout: Option, - config: &ClientConfig, - ) -> SendBody - { - self.send_body(addr, response_decompress, timeout, config, Body::Empty) - } - - fn set_header_if_none(&mut self, key: HeaderName, value: V) -> Result<(), HttpError> - where - V: IntoHeaderValue, - { - match self { - RequestSender::Owned(head) => { - if !head.headers.contains_key(&key) { - match value.try_into() { - Ok(value) => head.headers.insert(key, value), - Err(e) => return Err(e.into()), - } - } - }, - RequestSender::Rc(head, extra_headers) => { - if !head.headers.contains_key(&key) && !extra_headers.iter().any(|h| h.contains_key(&key)) { - match value.try_into(){ - Ok(v) => { - let h = extra_headers.get_or_insert(HeaderMap::new()); - h.insert(key, v) - }, - Err(e) => return Err(e.into()), - }; - } - } - } - - Ok(()) - } -} - #[cfg(test)] mod tests { use std::time::SystemTime; diff --git a/awc/src/sender.rs b/awc/src/sender.rs new file mode 100644 index 000000000..c8e169cb1 --- /dev/null +++ b/awc/src/sender.rs @@ -0,0 +1,282 @@ +use std::net; +use std::rc::Rc; +use std::time::{Duration, Instant}; + +use bytes::Bytes; +use derive_more::From; +use futures::{try_ready, Async, Future, Poll, Stream}; +use serde::Serialize; +use serde_json; +use tokio_timer::Delay; + +use actix_http::body::{Body, BodyStream}; +use actix_http::encoding::Decoder; +use actix_http::http::header::{self, ContentEncoding, IntoHeaderValue}; +use actix_http::http::{Error as HttpError, HeaderMap, HeaderName}; +use actix_http::{Error, Payload, PayloadStream, RequestHead}; + +use crate::error::{FreezeRequestError, InvalidUrl, SendRequestError}; +use crate::response::ClientResponse; +use crate::ClientConfig; + +#[derive(Debug, From)] +pub(crate) enum PrepForSendingError { + Url(InvalidUrl), + Http(HttpError), +} + +impl Into for PrepForSendingError { + fn into(self) -> FreezeRequestError { + match self { + PrepForSendingError::Url(e) => FreezeRequestError::Url(e), + PrepForSendingError::Http(e) => FreezeRequestError::Http(e), + } + } +} + +impl Into for PrepForSendingError { + fn into(self) -> SendRequestError { + match self { + PrepForSendingError::Url(e) => SendRequestError::Url(e), + PrepForSendingError::Http(e) => SendRequestError::Http(e), + } + } +} + +/// Future that sends request's payload and resolves to a server response. +#[must_use = "futures do nothing unless polled"] +pub enum SendClientRequest { + Fut( + Box>, + Option, + bool, + ), + Err(Option), +} + +impl SendClientRequest { + pub(crate) fn new( + send: Box>, + response_decompress: bool, + timeout: Option, + ) -> SendClientRequest { + let delay = timeout.map(|t| Delay::new(Instant::now() + t)); + SendClientRequest::Fut(send, delay, response_decompress) + } +} + +impl Future for SendClientRequest { + type Item = ClientResponse>>; + type Error = SendRequestError; + + fn poll(&mut self) -> Poll { + match self { + SendClientRequest::Fut(send, delay, response_decompress) => { + if delay.is_some() { + match delay.poll() { + Ok(Async::NotReady) => (), + _ => return Err(SendRequestError::Timeout), + } + } + + let res = try_ready!(send.poll()).map_body(|head, payload| { + if *response_decompress { + Payload::Stream(Decoder::from_headers(payload, &head.headers)) + } else { + Payload::Stream(Decoder::new(payload, ContentEncoding::Identity)) + } + }); + + Ok(Async::Ready(res)) + } + SendClientRequest::Err(ref mut e) => match e.take() { + Some(e) => Err(e.into()), + None => panic!("Attempting to call completed future"), + }, + } + } +} + +impl From for SendClientRequest { + fn from(e: SendRequestError) -> Self { + SendClientRequest::Err(Some(e)) + } +} + +impl From for SendClientRequest { + fn from(e: Error) -> Self { + SendClientRequest::Err(Some(e.into())) + } +} + +impl From for SendClientRequest { + fn from(e: HttpError) -> Self { + SendClientRequest::Err(Some(e.into())) + } +} + +impl From for SendClientRequest { + fn from(e: PrepForSendingError) -> Self { + SendClientRequest::Err(Some(e.into())) + } +} + +#[derive(Debug)] +pub(crate) enum RequestSender { + Owned(RequestHead), + Rc(Rc, Option), +} + +impl RequestSender { + pub(crate) fn send_body( + self, + addr: Option, + response_decompress: bool, + timeout: Option, + config: &ClientConfig, + body: B, + ) -> SendClientRequest + where + B: Into, + { + let mut connector = config.connector.borrow_mut(); + + let fut = match self { + RequestSender::Owned(head) => { + connector.send_request(head, body.into(), addr) + } + RequestSender::Rc(head, extra_headers) => { + connector.send_request_extra(head, extra_headers, body.into(), addr) + } + }; + + SendClientRequest::new( + fut, + response_decompress, + timeout.or_else(|| config.timeout.clone()), + ) + } + + pub(crate) fn send_json( + mut self, + addr: Option, + response_decompress: bool, + timeout: Option, + config: &ClientConfig, + value: &T, + ) -> SendClientRequest { + let body = match serde_json::to_string(value) { + Ok(body) => body, + Err(e) => return Error::from(e).into(), + }; + + if let Err(e) = self.set_header_if_none(header::CONTENT_TYPE, "application/json") + { + return e.into(); + } + + self.send_body( + addr, + response_decompress, + timeout, + config, + Body::Bytes(Bytes::from(body)), + ) + } + + pub(crate) fn send_form( + mut self, + addr: Option, + response_decompress: bool, + timeout: Option, + config: &ClientConfig, + value: &T, + ) -> SendClientRequest { + let body = match serde_urlencoded::to_string(value) { + Ok(body) => body, + Err(e) => return Error::from(e).into(), + }; + + // set content-type + if let Err(e) = self.set_header_if_none( + header::CONTENT_TYPE, + "application/x-www-form-urlencoded", + ) { + return e.into(); + } + + self.send_body( + addr, + response_decompress, + timeout, + config, + Body::Bytes(Bytes::from(body)), + ) + } + + pub(crate) fn send_stream( + self, + addr: Option, + response_decompress: bool, + timeout: Option, + config: &ClientConfig, + stream: S, + ) -> SendClientRequest + where + S: Stream + 'static, + E: Into + 'static, + { + self.send_body( + addr, + response_decompress, + timeout, + config, + Body::from_message(BodyStream::new(stream)), + ) + } + + pub(crate) fn send( + self, + addr: Option, + response_decompress: bool, + timeout: Option, + config: &ClientConfig, + ) -> SendClientRequest { + self.send_body(addr, response_decompress, timeout, config, Body::Empty) + } + + fn set_header_if_none( + &mut self, + key: HeaderName, + value: V, + ) -> Result<(), HttpError> + where + V: IntoHeaderValue, + { + match self { + RequestSender::Owned(head) => { + if !head.headers.contains_key(&key) { + match value.try_into() { + Ok(value) => head.headers.insert(key, value), + Err(e) => return Err(e.into()), + } + } + } + RequestSender::Rc(head, extra_headers) => { + if !head.headers.contains_key(&key) + && !extra_headers.iter().any(|h| h.contains_key(&key)) + { + match value.try_into() { + Ok(v) => { + let h = extra_headers.get_or_insert(HeaderMap::new()); + h.insert(key, v) + } + Err(e) => return Err(e.into()), + }; + } + } + } + + Ok(()) + } +} From 60b7aebd0a6de57cc480e8fdc9a755743654bde1 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Thu, 12 Sep 2019 21:52:46 +0600 Subject: [PATCH 2/8] fmt & clippy --- actix-http/Cargo.toml | 2 +- actix-http/src/client/error.rs | 2 +- actix-http/src/client/h1proto.rs | 28 +++++++++++++--------------- actix-http/src/client/h2proto.rs | 20 +++++++++++++------- actix-http/src/client/mod.rs | 2 +- actix-http/src/client/pool.rs | 2 +- actix-http/src/h1/client.rs | 10 +++++++--- actix-http/src/h1/encoder.rs | 23 +++++++++++++++-------- actix-multipart/src/server.rs | 28 ++++++++++++++++------------ awc/src/connect.rs | 21 +++++++++++++++------ awc/src/error.rs | 4 +++- awc/src/sender.rs | 4 ++-- awc/src/ws.rs | 5 ++++- src/app_service.rs | 2 +- src/middleware/mod.rs | 4 ++-- 15 files changed, 95 insertions(+), 62 deletions(-) diff --git a/actix-http/Cargo.toml b/actix-http/Cargo.toml index 3019b2897..cc7c885e7 100644 --- a/actix-http/Cargo.toml +++ b/actix-http/Cargo.toml @@ -66,7 +66,7 @@ hashbrown = "0.5.0" h2 = "0.1.16" http = "0.1.17" httparse = "1.3" -indexmap = "1.0" +indexmap = "1.2" lazy_static = "1.0" language-tags = "0.2" log = "0.4" diff --git a/actix-http/src/client/error.rs b/actix-http/src/client/error.rs index 40aef2cce..0ac5f30f5 100644 --- a/actix-http/src/client/error.rs +++ b/actix-http/src/client/error.rs @@ -147,4 +147,4 @@ impl From for SendRequestError { FreezeRequestError::Http(e) => e.into(), } } -} \ No newline at end of file +} diff --git a/actix-http/src/client/h1proto.rs b/actix-http/src/client/h1proto.rs index fa920ab92..b078c6a67 100644 --- a/actix-http/src/client/h1proto.rs +++ b/actix-http/src/client/h1proto.rs @@ -8,10 +8,10 @@ use futures::{Async, Future, Poll, Sink, Stream}; use crate::error::PayloadError; use crate::h1; +use crate::header::HeaderMap; use crate::http::header::{IntoHeaderValue, HOST}; use crate::message::{RequestHeadType, ResponseHead}; use crate::payload::{Payload, PayloadStream}; -use crate::header::HeaderMap; use super::connection::{ConnectionLifetime, ConnectionType, IoConnection}; use super::error::{ConnectError, SendRequestError}; @@ -30,7 +30,9 @@ where B: MessageBody, { // set request host header - if !head.as_ref().headers.contains_key(HOST) && !head.extra_headers().iter().any(|h| h.contains_key(HOST)) { + if !head.as_ref().headers.contains_key(HOST) + && !head.extra_headers().iter().any(|h| h.contains_key(HOST)) + { if let Some(host) = head.as_ref().uri.host() { let mut wrt = BytesMut::with_capacity(host.len() + 5).writer(); @@ -40,20 +42,16 @@ where }; match wrt.get_mut().take().freeze().try_into() { - Ok(value) => { - match head { - RequestHeadType::Owned(ref mut head) => { - head.headers.insert(HOST, value) - }, - RequestHeadType::Rc(_, ref mut extra_headers) => { - let headers = extra_headers.get_or_insert(HeaderMap::new()); - headers.insert(HOST, value) - }, + Ok(value) => match head { + RequestHeadType::Owned(ref mut head) => { + head.headers.insert(HOST, value) } - } - Err(e) => { - log::error!("Can not set HOST header {}", e) - } + RequestHeadType::Rc(_, ref mut extra_headers) => { + let headers = extra_headers.get_or_insert(HeaderMap::new()); + headers.insert(HOST, value) + } + }, + Err(e) => log::error!("Can not set HOST header {}", e), } } } diff --git a/actix-http/src/client/h2proto.rs b/actix-http/src/client/h2proto.rs index 2993d89d8..5744a1547 100644 --- a/actix-http/src/client/h2proto.rs +++ b/actix-http/src/client/h2proto.rs @@ -9,9 +9,9 @@ use http::header::{HeaderValue, CONNECTION, CONTENT_LENGTH, TRANSFER_ENCODING}; use http::{request::Request, HttpTryFrom, Method, Version}; use crate::body::{BodySize, MessageBody}; +use crate::header::HeaderMap; use crate::message::{RequestHeadType, ResponseHead}; use crate::payload::Payload; -use crate::header::HeaderMap; use super::connection::{ConnectionType, IoConnection}; use super::error::SendRequestError; @@ -69,15 +69,21 @@ where // Extracting extra headers from RequestHeadType. HeaderMap::new() does not allocate. let (head, extra_headers) = match head { - RequestHeadType::Owned(head) => (RequestHeadType::Owned(head), HeaderMap::new()), - RequestHeadType::Rc(head, extra_headers) => (RequestHeadType::Rc(head, None), extra_headers.unwrap_or(HeaderMap::new())), + RequestHeadType::Owned(head) => { + (RequestHeadType::Owned(head), HeaderMap::new()) + } + RequestHeadType::Rc(head, extra_headers) => ( + RequestHeadType::Rc(head, None), + extra_headers.unwrap_or_else(HeaderMap::new), + ), }; // merging headers from head and extra headers. - let headers = head.as_ref().headers.iter() - .filter(|(name, _)| { - !extra_headers.contains_key(*name) - }) + let headers = head + .as_ref() + .headers + .iter() + .filter(|(name, _)| !extra_headers.contains_key(*name)) .chain(extra_headers.iter()); // copy headers diff --git a/actix-http/src/client/mod.rs b/actix-http/src/client/mod.rs index 04427ce42..a45aebcd5 100644 --- a/actix-http/src/client/mod.rs +++ b/actix-http/src/client/mod.rs @@ -10,7 +10,7 @@ mod pool; pub use self::connection::Connection; pub use self::connector::Connector; -pub use self::error::{ConnectError, InvalidUrl, SendRequestError, FreezeRequestError}; +pub use self::error::{ConnectError, FreezeRequestError, InvalidUrl, SendRequestError}; pub use self::pool::Protocol; #[derive(Clone)] diff --git a/actix-http/src/client/pool.rs b/actix-http/src/client/pool.rs index 24a187392..a3522ff8a 100644 --- a/actix-http/src/client/pool.rs +++ b/actix-http/src/client/pool.rs @@ -326,7 +326,7 @@ impl Inner { fn release_waiter(&mut self, key: &Key, token: usize) { self.waiters.remove(token); - self.waiters_queue.remove(&(key.clone(), token)); + let _ = self.waiters_queue.shift_remove(&(key.clone(), token)); } } diff --git a/actix-http/src/h1/client.rs b/actix-http/src/h1/client.rs index c0bbcc694..bea629c4f 100644 --- a/actix-http/src/h1/client.rs +++ b/actix-http/src/h1/client.rs @@ -16,9 +16,11 @@ use super::{Message, MessageType}; use crate::body::BodySize; use crate::config::ServiceConfig; use crate::error::{ParseError, PayloadError}; -use crate::helpers; -use crate::message::{ConnectionType, Head, MessagePool, RequestHead, RequestHeadType, ResponseHead}; use crate::header::HeaderMap; +use crate::helpers; +use crate::message::{ + ConnectionType, Head, MessagePool, RequestHead, RequestHeadType, ResponseHead, +}; bitflags! { struct Flags: u8 { @@ -197,7 +199,9 @@ impl Encoder for ClientCodec { Message::Item((mut head, length)) => { let inner = &mut self.inner; inner.version = head.as_ref().version; - inner.flags.set(Flags::HEAD, head.as_ref().method == Method::HEAD); + inner + .flags + .set(Flags::HEAD, head.as_ref().method == Method::HEAD); // connection status inner.ctype = match head.as_ref().connection_type() { diff --git a/actix-http/src/h1/encoder.rs b/actix-http/src/h1/encoder.rs index 380dfe328..51ea497e0 100644 --- a/actix-http/src/h1/encoder.rs +++ b/actix-http/src/h1/encoder.rs @@ -2,9 +2,9 @@ use std::fmt::Write as FmtWrite; use std::io::Write; use std::marker::PhantomData; +use std::rc::Rc; use std::str::FromStr; use std::{cmp, fmt, io, mem}; -use std::rc::Rc; use bytes::{BufMut, Bytes, BytesMut}; @@ -16,7 +16,7 @@ use crate::http::header::{ HeaderValue, ACCEPT_ENCODING, CONNECTION, CONTENT_LENGTH, DATE, TRANSFER_ENCODING, }; use crate::http::{HeaderMap, Method, StatusCode, Version}; -use crate::message::{ConnectionType, Head, RequestHead, ResponseHead, RequestHeadType}; +use crate::message::{ConnectionType, Head, RequestHead, RequestHeadType, ResponseHead}; use crate::request::Request; use crate::response::Response; @@ -134,10 +134,11 @@ pub(crate) trait MessageType: Sized { // merging headers from head and extra headers. HeaderMap::new() does not allocate. let empty_headers = HeaderMap::new(); let extra_headers = self.extra_headers().unwrap_or(&empty_headers); - let headers = self.headers().inner.iter() - .filter(|(name, _)| { - !extra_headers.contains_key(*name) - }) + let headers = self + .headers() + .inner + .iter() + .filter(|(name, _)| !extra_headers.contains_key(*name)) .chain(extra_headers.inner.iter()); // write headers @@ -604,10 +605,16 @@ mod tests { let mut bytes = BytesMut::with_capacity(2048); let mut head = RequestHead::default(); - head.headers.insert(AUTHORIZATION, HeaderValue::from_static("some authorization")); + head.headers.insert( + AUTHORIZATION, + HeaderValue::from_static("some authorization"), + ); let mut extra_headers = HeaderMap::new(); - extra_headers.insert(AUTHORIZATION,HeaderValue::from_static("another authorization")); + extra_headers.insert( + AUTHORIZATION, + HeaderValue::from_static("another authorization"), + ); extra_headers.insert(DATE, HeaderValue::from_static("date")); let mut head = RequestHeadType::Rc(Rc::new(head), Some(extra_headers)); diff --git a/actix-multipart/src/server.rs b/actix-multipart/src/server.rs index 3312a580a..a7c787f46 100644 --- a/actix-multipart/src/server.rs +++ b/actix-multipart/src/server.rs @@ -178,13 +178,15 @@ impl InnerMultipart { Some(chunk) => { if chunk.len() < boundary.len() + 4 || &chunk[..2] != b"--" - || &chunk[2..boundary.len() + 2] != boundary.as_bytes() { + || &chunk[2..boundary.len() + 2] != boundary.as_bytes() + { Err(MultipartError::Boundary) } else if &chunk[boundary.len() + 2..] == b"\r\n" { Ok(Some(false)) } else if &chunk[boundary.len() + 2..boundary.len() + 4] == b"--" && (chunk.len() == boundary.len() + 4 - || &chunk[boundary.len() + 4..] == b"\r\n") { + || &chunk[boundary.len() + 4..] == b"\r\n") + { Ok(Some(true)) } else { Err(MultipartError::Boundary) @@ -781,8 +783,10 @@ impl PayloadBuffer { /// Read bytes until new line delimiter or eof pub fn readline_or_eof(&mut self) -> Result, MultipartError> { match self.readline() { - Err(MultipartError::Incomplete) if self.eof => Ok(Some(self.buf.take().freeze())), - line => line + Err(MultipartError::Incomplete) if self.eof => { + Ok(Some(self.buf.take().freeze())) + } + line => line, } } @@ -859,14 +863,14 @@ mod tests { fn create_simple_request_with_header() -> (Bytes, HeaderMap) { let bytes = Bytes::from( "testasdadsad\r\n\ - --abbc761f78ff4d7cb7573b5a23f96ef0\r\n\ - Content-Disposition: form-data; name=\"file\"; filename=\"fn.txt\"\r\n\ - Content-Type: text/plain; charset=utf-8\r\nContent-Length: 4\r\n\r\n\ - test\r\n\ - --abbc761f78ff4d7cb7573b5a23f96ef0\r\n\ - Content-Type: text/plain; charset=utf-8\r\nContent-Length: 4\r\n\r\n\ - data\r\n\ - --abbc761f78ff4d7cb7573b5a23f96ef0--\r\n" + --abbc761f78ff4d7cb7573b5a23f96ef0\r\n\ + Content-Disposition: form-data; name=\"file\"; filename=\"fn.txt\"\r\n\ + Content-Type: text/plain; charset=utf-8\r\nContent-Length: 4\r\n\r\n\ + test\r\n\ + --abbc761f78ff4d7cb7573b5a23f96ef0\r\n\ + Content-Type: text/plain; charset=utf-8\r\nContent-Length: 4\r\n\r\n\ + data\r\n\ + --abbc761f78ff4d7cb7573b5a23f96ef0--\r\n", ); let mut headers = HeaderMap::new(); headers.insert( diff --git a/awc/src/connect.rs b/awc/src/connect.rs index 82fd6a759..97864d300 100644 --- a/awc/src/connect.rs +++ b/awc/src/connect.rs @@ -1,5 +1,5 @@ -use std::{fmt, io, net}; use std::rc::Rc; +use std::{fmt, io, net}; use actix_codec::{AsyncRead, AsyncWrite, Framed}; use actix_http::body::Body; @@ -7,8 +7,8 @@ use actix_http::client::{ Connect as ClientConnect, ConnectError, Connection, SendRequestError, }; use actix_http::h1::ClientCodec; -use actix_http::{RequestHead, RequestHeadType, ResponseHead}; use actix_http::http::HeaderMap; +use actix_http::{RequestHead, RequestHeadType, ResponseHead}; use actix_service::Service; use futures::{Future, Poll}; @@ -82,7 +82,9 @@ where }) .from_err() // send request - .and_then(move |connection| connection.send_request(RequestHeadType::from(head), body)) + .and_then(move |connection| { + connection.send_request(RequestHeadType::from(head), body) + }) .map(|(head, payload)| ClientResponse::new(head, payload)), ) } @@ -103,7 +105,10 @@ where }) .from_err() // send request - .and_then(move |connection| connection.send_request(RequestHeadType::Rc(head, extra_headers), body)) + .and_then(move |connection| { + connection + .send_request(RequestHeadType::Rc(head, extra_headers), body) + }) .map(|(head, payload)| ClientResponse::new(head, payload)), ) } @@ -127,7 +132,9 @@ where }) .from_err() // send request - .and_then(move |connection| connection.open_tunnel(RequestHeadType::from(head))) + .and_then(move |connection| { + connection.open_tunnel(RequestHeadType::from(head)) + }) .map(|(head, framed)| { let framed = framed.map_io(|io| BoxedSocket(Box::new(Socket(io)))); (head, framed) @@ -155,7 +162,9 @@ where }) .from_err() // send request - .and_then(move |connection| connection.open_tunnel(RequestHeadType::Rc(head, extra_headers))) + .and_then(move |connection| { + connection.open_tunnel(RequestHeadType::Rc(head, extra_headers)) + }) .map(|(head, framed)| { let framed = framed.map_io(|io| BoxedSocket(Box::new(Socket(io)))); (head, framed) diff --git a/awc/src/error.rs b/awc/src/error.rs index 4eb929007..eb8d03e2b 100644 --- a/awc/src/error.rs +++ b/awc/src/error.rs @@ -1,5 +1,7 @@ //! Http client errors -pub use actix_http::client::{ConnectError, InvalidUrl, SendRequestError, FreezeRequestError}; +pub use actix_http::client::{ + ConnectError, FreezeRequestError, InvalidUrl, SendRequestError, +}; pub use actix_http::error::PayloadError; pub use actix_http::ws::HandshakeError as WsHandshakeError; pub use actix_http::ws::ProtocolError as WsProtocolError; diff --git a/awc/src/sender.rs b/awc/src/sender.rs index c8e169cb1..95109b92d 100644 --- a/awc/src/sender.rs +++ b/awc/src/sender.rs @@ -90,7 +90,7 @@ impl Future for SendClientRequest { Ok(Async::Ready(res)) } SendClientRequest::Err(ref mut e) => match e.take() { - Some(e) => Err(e.into()), + Some(e) => Err(e), None => panic!("Attempting to call completed future"), }, } @@ -153,7 +153,7 @@ impl RequestSender { SendClientRequest::new( fut, response_decompress, - timeout.or_else(|| config.timeout.clone()), + timeout.or_else(|| config.timeout), ) } diff --git a/awc/src/ws.rs b/awc/src/ws.rs index 67be9e9d8..77cbc7ca4 100644 --- a/awc/src/ws.rs +++ b/awc/src/ws.rs @@ -234,7 +234,10 @@ impl WebsocketsRequest { } if !self.head.headers.contains_key(header::HOST) { - self.head.headers.insert(header::HOST, HeaderValue::from_str(uri.host().unwrap()).unwrap()); + self.head.headers.insert( + header::HOST, + HeaderValue::from_str(uri.host().unwrap()).unwrap(), + ); } // set cookies diff --git a/src/app_service.rs b/src/app_service.rs index 736c35010..513b4aa4b 100644 --- a/src/app_service.rs +++ b/src/app_service.rs @@ -169,7 +169,7 @@ where match self.data_factories_fut[idx].poll()? { Async::Ready(f) => { self.data_factories.push(f); - self.data_factories_fut.remove(idx); + let _ = self.data_factories_fut.remove(idx); } Async::NotReady => idx += 1, } diff --git a/src/middleware/mod.rs b/src/middleware/mod.rs index 311d0ee99..84e0758bf 100644 --- a/src/middleware/mod.rs +++ b/src/middleware/mod.rs @@ -2,13 +2,13 @@ mod compress; pub use self::compress::{BodyEncoding, Compress}; +mod condition; mod defaultheaders; pub mod errhandlers; mod logger; mod normalize; -mod condition; +pub use self::condition::Condition; pub use self::defaultheaders::DefaultHeaders; pub use self::logger::Logger; pub use self::normalize::NormalizePath; -pub use self::condition::Condition; From e35d930ef9d2c7c33a36cd9760f44b0b3bb2d6f5 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Thu, 12 Sep 2019 21:58:08 +0600 Subject: [PATCH 3/8] prepare releases --- actix-multipart/CHANGES.md | 3 ++- actix-multipart/Cargo.toml | 2 +- awc/Cargo.toml | 2 +- 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/actix-multipart/CHANGES.md b/actix-multipart/CHANGES.md index 365dca286..ca61176c7 100644 --- a/actix-multipart/CHANGES.md +++ b/actix-multipart/CHANGES.md @@ -1,5 +1,6 @@ # Changes -## [0.1.4] - 2019-xx-xx + +## [0.1.4] - 2019-09-12 * Multipart handling now parses requests which do not end in CRLF #1038 diff --git a/actix-multipart/Cargo.toml b/actix-multipart/Cargo.toml index b26681e25..2168c259a 100644 --- a/actix-multipart/Cargo.toml +++ b/actix-multipart/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "actix-multipart" -version = "0.1.3" +version = "0.1.4" authors = ["Nikolay Kim "] description = "Multipart support for actix web framework." readme = "README.md" diff --git a/awc/Cargo.toml b/awc/Cargo.toml index 3d15c943e..3a86193c6 100644 --- a/awc/Cargo.toml +++ b/awc/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "awc" -version = "0.2.5" +version = "0.2.6" authors = ["Nikolay Kim "] description = "Actix http client." readme = "README.md" From a32573bb58727059afa470bf5d596b03f6616b7e Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Fri, 13 Sep 2019 11:56:24 +0600 Subject: [PATCH 4/8] Allow to re-construct ServiceRequest from HttpRequest and Payload #1088 --- CHANGES.md | 8 +++---- actix-http/CHANGES.md | 2 ++ src/service.rs | 49 +++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 55 insertions(+), 4 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index f37f8b466..2a2e2e414 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,13 +1,13 @@ # Changes -## not released yet + +## [1.0.8] - 2019-09-xx ### Added -* Add `middleware::Conditon` that conditionally enables another middleware +* Add `middleware::Conditon` that conditionally enables another middleware -### Fixed +* Allow to re-construct `ServiceRequest` from `HttpRequest` and `Payload` -* h2 will use error response #1080 ## [1.0.7] - 2019-08-29 diff --git a/actix-http/CHANGES.md b/actix-http/CHANGES.md index d603cde7b..849839378 100644 --- a/actix-http/CHANGES.md +++ b/actix-http/CHANGES.md @@ -8,6 +8,8 @@ ### Fixed +* h2 will use error response #1080 + * on_connect result isn't added to request extensions for http2 requests #1009 diff --git a/src/service.rs b/src/service.rs index 1d475cf15..8b94dd284 100644 --- a/src/service.rs +++ b/src/service.rs @@ -68,6 +68,34 @@ impl ServiceRequest { (self.0, pl) } + /// Construct request from parts. + /// + /// `ServiceRequest` can be re-constructed only if `req` hasnt been cloned. + pub fn from_parts( + mut req: HttpRequest, + pl: Payload, + ) -> Result { + if Rc::strong_count(&req.0) == 1 && Rc::weak_count(&req.0) == 0 { + Rc::get_mut(&mut req.0).unwrap().payload = pl; + Ok(ServiceRequest(req)) + } else { + Err((req, pl)) + } + } + + /// Construct request from request. + /// + /// `HttpRequest` implements `Clone` trait via `Rc` type. `ServiceRequest` + /// can be re-constructed only if rc's strong pointers count eq 1 and + /// weak pointers count is 0. + pub fn from_request(req: HttpRequest) -> Result { + if Rc::strong_count(&req.0) == 1 && Rc::weak_count(&req.0) == 0 { + Ok(ServiceRequest(req)) + } else { + Err(req) + } + } + /// Create service response #[inline] pub fn into_response>>(self, res: R) -> ServiceResponse { @@ -514,6 +542,27 @@ mod tests { use crate::test::{call_service, init_service, TestRequest}; use crate::{guard, http, web, App, HttpResponse}; + #[test] + fn test_service_request() { + let req = TestRequest::default().to_srv_request(); + let (r, pl) = req.into_parts(); + assert!(ServiceRequest::from_parts(r, pl).is_ok()); + + let req = TestRequest::default().to_srv_request(); + let (r, pl) = req.into_parts(); + let _r2 = r.clone(); + assert!(ServiceRequest::from_parts(r, pl).is_err()); + + let req = TestRequest::default().to_srv_request(); + let (r, _pl) = req.into_parts(); + assert!(ServiceRequest::from_request(r).is_ok()); + + let req = TestRequest::default().to_srv_request(); + let (r, _pl) = req.into_parts(); + let _r2 = r.clone(); + assert!(ServiceRequest::from_request(r).is_err()); + } + #[test] fn test_service() { let mut srv = init_service( From c1f99e0775b986d57244e4ef20faa8982f0dac88 Mon Sep 17 00:00:00 2001 From: Yuki Okushi Date: Mon, 16 Sep 2019 07:52:23 +0900 Subject: [PATCH 5/8] Remove `mem::uninitialized()` (#1090) --- actix-http/src/client/connector.rs | 2 +- actix-http/src/h1/decoder.rs | 13 ++++++++----- actix-http/src/helpers.rs | 2 +- actix-http/src/test.rs | 2 +- 4 files changed, 11 insertions(+), 8 deletions(-) diff --git a/actix-http/src/client/connector.rs b/actix-http/src/client/connector.rs index 98e8618c3..d92441f25 100644 --- a/actix-http/src/client/connector.rs +++ b/actix-http/src/client/connector.rs @@ -212,7 +212,7 @@ where pub fn finish( self, ) -> impl Service - + Clone { + + Clone { #[cfg(not(any(feature = "ssl", feature = "rust-tls")))] { let connector = TimeoutService::new( diff --git a/actix-http/src/h1/decoder.rs b/actix-http/src/h1/decoder.rs index c7ef065a5..ce113a145 100644 --- a/actix-http/src/h1/decoder.rs +++ b/actix-http/src/h1/decoder.rs @@ -1,5 +1,6 @@ +use std::io; use std::marker::PhantomData; -use std::{io, mem}; +use std::mem::MaybeUninit; use actix_codec::Decoder; use bytes::{Bytes, BytesMut}; @@ -186,11 +187,12 @@ impl MessageType for Request { fn decode(src: &mut BytesMut) -> Result, ParseError> { // Unsafe: we read only this data only after httparse parses headers into. // performance bump for pipeline benchmarks. - let mut headers: [HeaderIndex; MAX_HEADERS] = unsafe { mem::uninitialized() }; + let mut headers: [HeaderIndex; MAX_HEADERS] = + unsafe { MaybeUninit::uninit().assume_init() }; let (len, method, uri, ver, h_len) = { let mut parsed: [httparse::Header; MAX_HEADERS] = - unsafe { mem::uninitialized() }; + unsafe { MaybeUninit::uninit().assume_init() }; let mut req = httparse::Request::new(&mut parsed); match req.parse(src)? { @@ -260,11 +262,12 @@ impl MessageType for ResponseHead { fn decode(src: &mut BytesMut) -> Result, ParseError> { // Unsafe: we read only this data only after httparse parses headers into. // performance bump for pipeline benchmarks. - let mut headers: [HeaderIndex; MAX_HEADERS] = unsafe { mem::uninitialized() }; + let mut headers: [HeaderIndex; MAX_HEADERS] = + unsafe { MaybeUninit::uninit().assume_init() }; let (len, ver, status, h_len) = { let mut parsed: [httparse::Header; MAX_HEADERS] = - unsafe { mem::uninitialized() }; + unsafe { MaybeUninit::uninit().assume_init() }; let mut res = httparse::Response::new(&mut parsed); match res.parse(src)? { diff --git a/actix-http/src/helpers.rs b/actix-http/src/helpers.rs index e4583ee37..84403d8fd 100644 --- a/actix-http/src/helpers.rs +++ b/actix-http/src/helpers.rs @@ -115,7 +115,7 @@ pub fn write_content_length(mut n: usize, bytes: &mut BytesMut) { pub(crate) fn convert_usize(mut n: usize, bytes: &mut BytesMut) { let mut curr: isize = 39; - let mut buf: [u8; 41] = unsafe { mem::uninitialized() }; + let mut buf: [u8; 41] = unsafe { mem::MaybeUninit::uninit().assume_init() }; buf[39] = b'\r'; buf[40] = b'\n'; let buf_ptr = buf.as_mut_ptr(); diff --git a/actix-http/src/test.rs b/actix-http/src/test.rs index ce81a54d5..ed5b81a35 100644 --- a/actix-http/src/test.rs +++ b/actix-http/src/test.rs @@ -150,7 +150,7 @@ impl TestRequest { /// Complete request creation and generate `Request` instance pub fn finish(&mut self) -> Request { - let inner = self.0.take().expect("cannot reuse test request builder");; + let inner = self.0.take().expect("cannot reuse test request builder"); let mut req = if let Some(pl) = inner.payload { Request::with_payload(pl) From 7c9f9afc46d2bf1c0a69826b2de41fa4f59f97a9 Mon Sep 17 00:00:00 2001 From: nWacky <38620459+nWacky@users.noreply.github.com> Date: Tue, 17 Sep 2019 03:57:39 +0300 Subject: [PATCH 6/8] Add ability to use `Infallible` as `HttpResponse` error type (#1093) * Add `std::convert::Infallible` implementantion for `ResponseError` * Add from `std::convert::Infallible` to `Error` * Remove `ResponseError` implementantion for `Infallible` * Remove useless docs * Better comment * Update changelog * Update actix_http::changelog --- actix-http/CHANGES.md | 3 +++ actix-http/src/error.rs | 8 ++++++++ 2 files changed, 11 insertions(+) diff --git a/actix-http/CHANGES.md b/actix-http/CHANGES.md index 849839378..6820626f5 100644 --- a/actix-http/CHANGES.md +++ b/actix-http/CHANGES.md @@ -6,6 +6,9 @@ * Add support for sending HTTP requests with `Rc` in addition to sending HTTP requests with `RequestHead` +* Allow to use `std::convert::Infallible` as `actix_http::error::Error` + + ### Fixed * h2 will use error response #1080 diff --git a/actix-http/src/error.rs b/actix-http/src/error.rs index 2c01c86db..90c35e486 100644 --- a/actix-http/src/error.rs +++ b/actix-http/src/error.rs @@ -132,6 +132,14 @@ impl std::error::Error for Error { } } +impl From for Error { + fn from(_: std::convert::Infallible) -> Self { + // `std::convert::Infallible` indicates an error + // that will never happen + unreachable!() + } +} + /// Convert `Error` to a `Response` instance impl From for Response { fn from(err: Error) -> Self { From 32a1c365975acffb25959f5887a6530396788504 Mon Sep 17 00:00:00 2001 From: Jos van den Oever Date: Tue, 17 Sep 2019 02:58:04 +0200 Subject: [PATCH 7/8] Make UrlencodedError::Overflow more informative (#1089) --- CHANGES.md | 1 + src/error.rs | 13 +++++++++---- src/types/form.rs | 16 +++++++++++----- 3 files changed, 21 insertions(+), 9 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 2a2e2e414..95f8b75e1 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -8,6 +8,7 @@ * Allow to re-construct `ServiceRequest` from `HttpRequest` and `Payload` +* Make UrlEncodedError::Overflow more informativve ## [1.0.7] - 2019-08-29 diff --git a/src/error.rs b/src/error.rs index 9f31582ed..a60276a7a 100644 --- a/src/error.rs +++ b/src/error.rs @@ -32,8 +32,12 @@ pub enum UrlencodedError { #[display(fmt = "Can not decode chunked transfer encoding")] Chunked, /// Payload size is bigger than allowed. (default: 256kB) - #[display(fmt = "Urlencoded payload size is bigger than allowed (default: 256kB)")] - Overflow, + #[display( + fmt = "Urlencoded payload size is bigger ({} bytes) than allowed (default: {} bytes)", + size, + limit + )] + Overflow { size: usize, limit: usize }, /// Payload size is now known #[display(fmt = "Payload size is now known")] UnknownLength, @@ -52,7 +56,7 @@ pub enum UrlencodedError { impl ResponseError for UrlencodedError { fn error_response(&self) -> HttpResponse { match *self { - UrlencodedError::Overflow => { + UrlencodedError::Overflow { .. } => { HttpResponse::new(StatusCode::PAYLOAD_TOO_LARGE) } UrlencodedError::UnknownLength => { @@ -164,7 +168,8 @@ mod tests { #[test] fn test_urlencoded_error() { - let resp: HttpResponse = UrlencodedError::Overflow.error_response(); + let resp: HttpResponse = + UrlencodedError::Overflow { size: 0, limit: 0 }.error_response(); assert_eq!(resp.status(), StatusCode::PAYLOAD_TOO_LARGE); let resp: HttpResponse = UrlencodedError::UnknownLength.error_response(); assert_eq!(resp.status(), StatusCode::LENGTH_REQUIRED); diff --git a/src/types/form.rs b/src/types/form.rs index 9ab98b17b..3bc067ab5 100644 --- a/src/types/form.rs +++ b/src/types/form.rs @@ -318,7 +318,7 @@ where let limit = self.limit; if let Some(len) = self.length.take() { if len > limit { - return Err(UrlencodedError::Overflow); + return Err(UrlencodedError::Overflow { size: len, limit }); } } @@ -331,7 +331,10 @@ where .from_err() .fold(BytesMut::with_capacity(8192), move |mut body, chunk| { if (body.len() + chunk.len()) > limit { - Err(UrlencodedError::Overflow) + Err(UrlencodedError::Overflow { + size: body.len() + chunk.len(), + limit, + }) } else { body.extend_from_slice(&chunk); Ok(body) @@ -390,8 +393,8 @@ mod tests { fn eq(err: UrlencodedError, other: UrlencodedError) -> bool { match err { - UrlencodedError::Overflow => match other { - UrlencodedError::Overflow => true, + UrlencodedError::Overflow { .. } => match other { + UrlencodedError::Overflow { .. } => true, _ => false, }, UrlencodedError::UnknownLength => match other { @@ -420,7 +423,10 @@ mod tests { .header(CONTENT_LENGTH, "1000000") .to_http_parts(); let info = block_on(UrlEncoded::::new(&req, &mut pl)); - assert!(eq(info.err().unwrap(), UrlencodedError::Overflow)); + assert!(eq( + info.err().unwrap(), + UrlencodedError::Overflow { size: 0, limit: 0 } + )); let (req, mut pl) = TestRequest::with_header(CONTENT_TYPE, "text/plain") .header(CONTENT_LENGTH, "10") From e4503046de1263148e1b56394144b1828bbfdac0 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Tue, 17 Sep 2019 21:45:06 +0600 Subject: [PATCH 8/8] Do not override current System --- test-server/CHANGES.md | 5 +++ test-server/Cargo.toml | 8 ++--- test-server/src/lib.rs | 81 +++++++++++++++++++++--------------------- 3 files changed, 50 insertions(+), 44 deletions(-) diff --git a/test-server/CHANGES.md b/test-server/CHANGES.md index c3fe5b285..798dbf506 100644 --- a/test-server/CHANGES.md +++ b/test-server/CHANGES.md @@ -1,10 +1,15 @@ # Changes +## [0.2.5] - 2019-0917 ### Changed * Update serde_urlencoded to "0.6.1" +### Fixed + +* Do not override current `System` + ## [0.2.4] - 2019-07-18 diff --git a/test-server/Cargo.toml b/test-server/Cargo.toml index 22809c060..77301b0a9 100644 --- a/test-server/Cargo.toml +++ b/test-server/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "actix-http-test" -version = "0.2.4" +version = "0.2.5" authors = ["Nikolay Kim "] description = "Actix http test server" readme = "README.md" @@ -35,7 +35,7 @@ actix-rt = "0.2.2" actix-service = "0.4.1" actix-server = "0.6.0" actix-utils = "0.4.1" -awc = "0.2.2" +awc = "0.2.6" actix-connect = "0.2.2" base64 = "0.10" @@ -56,5 +56,5 @@ tokio-timer = "0.2" openssl = { version="0.10", optional = true } [dev-dependencies] -actix-web = "1.0.0" -actix-http = "0.2.4" +actix-web = "1.0.7" +actix-http = "0.2.9" diff --git a/test-server/src/lib.rs b/test-server/src/lib.rs index aba53980c..a2366bf48 100644 --- a/test-server/src/lib.rs +++ b/test-server/src/lib.rs @@ -103,8 +103,8 @@ pub struct TestServer; /// Test server controller pub struct TestServerRuntime { addr: net::SocketAddr, - rt: Runtime, client: Client, + system: System, } impl TestServer { @@ -130,45 +130,47 @@ impl TestServer { }); let (system, addr) = rx.recv().unwrap(); - let mut rt = Runtime::new().unwrap(); - let client = rt - .block_on(lazy(move || { - let connector = { - #[cfg(feature = "ssl")] - { - use openssl::ssl::{SslConnector, SslMethod, SslVerifyMode}; + let client = block_on(lazy(move || { + let connector = { + #[cfg(feature = "ssl")] + { + use openssl::ssl::{SslConnector, SslMethod, SslVerifyMode}; - let mut builder = - SslConnector::builder(SslMethod::tls()).unwrap(); - builder.set_verify(SslVerifyMode::NONE); - let _ = builder.set_alpn_protos(b"\x02h2\x08http/1.1").map_err( - |e| log::error!("Can not set alpn protocol: {:?}", e), - ); - Connector::new() - .conn_lifetime(time::Duration::from_secs(0)) - .timeout(time::Duration::from_millis(500)) - .ssl(builder.build()) - .finish() - } - #[cfg(not(feature = "ssl"))] - { - Connector::new() - .conn_lifetime(time::Duration::from_secs(0)) - .timeout(time::Duration::from_millis(500)) - .finish() - } - }; + let mut builder = SslConnector::builder(SslMethod::tls()).unwrap(); + builder.set_verify(SslVerifyMode::NONE); + let _ = builder + .set_alpn_protos(b"\x02h2\x08http/1.1") + .map_err(|e| log::error!("Can not set alpn protocol: {:?}", e)); + Connector::new() + .conn_lifetime(time::Duration::from_secs(0)) + .timeout(time::Duration::from_millis(500)) + .ssl(builder.build()) + .finish() + } + #[cfg(not(feature = "ssl"))] + { + Connector::new() + .conn_lifetime(time::Duration::from_secs(0)) + .timeout(time::Duration::from_millis(500)) + .finish() + } + }; - Ok::(Client::build().connector(connector).finish()) - })) - .unwrap(); - rt.block_on(lazy( + Ok::(Client::build().connector(connector).finish()) + })) + .unwrap(); + + block_on(lazy( || Ok::<_, ()>(actix_connect::start_default_resolver()), )) .unwrap(); - System::set_current(system); - TestServerRuntime { addr, rt, client } + + TestServerRuntime { + addr, + client, + system, + } } /// Get first available unused address @@ -188,7 +190,7 @@ impl TestServerRuntime { where F: Future, { - self.rt.block_on(fut) + block_on(fut) } /// Execute future on current core @@ -197,7 +199,7 @@ impl TestServerRuntime { F: FnOnce() -> R, R: Future, { - self.rt.block_on(lazy(f)) + block_on(lazy(f)) } /// Execute function on current core @@ -205,7 +207,7 @@ impl TestServerRuntime { where F: FnOnce() -> R, { - self.rt.block_on(lazy(|| Ok::<_, ()>(fut()))).unwrap() + block_on(lazy(|| Ok::<_, ()>(fut()))).unwrap() } /// Construct test server url @@ -324,8 +326,7 @@ impl TestServerRuntime { { let url = self.url(path); let connect = self.client.ws(url).connect(); - self.rt - .block_on(lazy(move || connect.map(|(_, framed)| framed))) + block_on(lazy(move || connect.map(|(_, framed)| framed))) } /// Connect to a websocket server @@ -338,7 +339,7 @@ impl TestServerRuntime { /// Stop http server fn stop(&mut self) { - System::current().stop(); + self.system.stop(); } }