Initial commit

This commit is contained in:
Dmitry Pypin 2019-07-09 11:01:03 -07:00
parent f410f3330f
commit 5ae6fc261e
8 changed files with 88 additions and 44 deletions

View File

@ -1,4 +1,5 @@
use std::{fmt, io, time}; use std::{fmt, io, time};
use std::rc::Rc;
use actix_codec::{AsyncRead, AsyncWrite, Framed}; use actix_codec::{AsyncRead, AsyncWrite, Framed};
use bytes::{Buf, Bytes}; use bytes::{Buf, Bytes};
@ -10,6 +11,7 @@ use crate::body::MessageBody;
use crate::h1::ClientCodec; use crate::h1::ClientCodec;
use crate::message::{RequestHead, ResponseHead}; use crate::message::{RequestHead, ResponseHead};
use crate::payload::Payload; use crate::payload::Payload;
use crate::header::HeaderMap;
use super::error::SendRequestError; use super::error::SendRequestError;
use super::pool::{Acquired, Protocol}; use super::pool::{Acquired, Protocol};
@ -29,7 +31,8 @@ pub trait Connection {
/// Send request and body /// Send request and body
fn send_request<B: MessageBody + 'static>( fn send_request<B: MessageBody + 'static>(
self, self,
head: RequestHead, head: Rc<RequestHead>,
additional_headers: Option<HeaderMap>,
body: B, body: B,
) -> Self::Future; ) -> Self::Future;
@ -39,7 +42,10 @@ pub trait Connection {
>; >;
/// Send request, returns Response and Framed /// Send request, returns Response and Framed
fn open_tunnel(self, head: RequestHead) -> Self::TunnelFuture; fn open_tunnel(self,
head: Rc<RequestHead>,
additional_headers: Option<HeaderMap>,
) -> Self::TunnelFuture;
} }
pub(crate) trait ConnectionLifetime: AsyncRead + AsyncWrite + 'static { pub(crate) trait ConnectionLifetime: AsyncRead + AsyncWrite + 'static {
@ -106,13 +112,15 @@ where
fn send_request<B: MessageBody + 'static>( fn send_request<B: MessageBody + 'static>(
mut self, mut self,
head: RequestHead, head: Rc<RequestHead>,
additional_headers: Option<HeaderMap>,
body: B, body: B,
) -> Self::Future { ) -> Self::Future {
match self.io.take().unwrap() { match self.io.take().unwrap() {
ConnectionType::H1(io) => Box::new(h1proto::send_request( ConnectionType::H1(io) => Box::new(h1proto::send_request(
io, io,
head, head,
additional_headers,
body, body,
self.created, self.created,
self.pool, self.pool,
@ -120,6 +128,7 @@ where
ConnectionType::H2(io) => Box::new(h2proto::send_request( ConnectionType::H2(io) => Box::new(h2proto::send_request(
io, io,
head, head,
additional_headers,
body, body,
self.created, self.created,
self.pool, self.pool,
@ -138,10 +147,10 @@ where
>; >;
/// Send request, returns Response and Framed /// Send request, returns Response and Framed
fn open_tunnel(mut self, head: RequestHead) -> Self::TunnelFuture { fn open_tunnel(mut self, head: Rc<RequestHead>, additional_headers: Option<HeaderMap>) -> Self::TunnelFuture {
match self.io.take().unwrap() { match self.io.take().unwrap() {
ConnectionType::H1(io) => { ConnectionType::H1(io) => {
Either::A(Box::new(h1proto::open_tunnel(io, head))) Either::A(Box::new(h1proto::open_tunnel(io, head, additional_headers)))
} }
ConnectionType::H2(io) => { ConnectionType::H2(io) => {
if let Some(mut pool) = self.pool.take() { if let Some(mut pool) = self.pool.take() {
@ -180,12 +189,13 @@ where
fn send_request<RB: MessageBody + 'static>( fn send_request<RB: MessageBody + 'static>(
self, self,
head: RequestHead, head: Rc<RequestHead>,
additional_headers: Option<HeaderMap>,
body: RB, body: RB,
) -> Self::Future { ) -> Self::Future {
match self { match self {
EitherConnection::A(con) => con.send_request(head, body), EitherConnection::A(con) => con.send_request(head, additional_headers, body),
EitherConnection::B(con) => con.send_request(head, body), EitherConnection::B(con) => con.send_request(head, additional_headers, body),
} }
} }
@ -197,14 +207,14 @@ where
>; >;
/// Send request, returns Response and Framed /// Send request, returns Response and Framed
fn open_tunnel(self, head: RequestHead) -> Self::TunnelFuture { fn open_tunnel(self, head: Rc<RequestHead>, additional_headers: Option<HeaderMap>) -> Self::TunnelFuture {
match self { match self {
EitherConnection::A(con) => Box::new( EitherConnection::A(con) => Box::new(
con.open_tunnel(head) con.open_tunnel(head, additional_headers)
.map(|(head, framed)| (head, framed.map_io(EitherIo::A))), .map(|(head, framed)| (head, framed.map_io(EitherIo::A))),
), ),
EitherConnection::B(con) => Box::new( EitherConnection::B(con) => Box::new(
con.open_tunnel(head) con.open_tunnel(head, additional_headers)
.map(|(head, framed)| (head, framed.map_io(EitherIo::B))), .map(|(head, framed)| (head, framed.map_io(EitherIo::B))),
), ),
} }

View File

@ -1,5 +1,6 @@
use std::io::Write; use std::io::Write;
use std::{io, time}; use std::{io, time};
use std::rc::Rc;
use actix_codec::{AsyncRead, AsyncWrite, Framed}; use actix_codec::{AsyncRead, AsyncWrite, Framed};
use bytes::{BufMut, Bytes, BytesMut}; use bytes::{BufMut, Bytes, BytesMut};
@ -11,6 +12,7 @@ use crate::h1;
use crate::http::header::{IntoHeaderValue, HOST}; use crate::http::header::{IntoHeaderValue, HOST};
use crate::message::{RequestHead, ResponseHead}; use crate::message::{RequestHead, ResponseHead};
use crate::payload::{Payload, PayloadStream}; use crate::payload::{Payload, PayloadStream};
use crate::header::HeaderMap;
use super::connection::{ConnectionLifetime, ConnectionType, IoConnection}; use super::connection::{ConnectionLifetime, ConnectionType, IoConnection};
use super::error::{ConnectError, SendRequestError}; use super::error::{ConnectError, SendRequestError};
@ -19,7 +21,8 @@ use crate::body::{BodySize, MessageBody};
pub(crate) fn send_request<T, B>( pub(crate) fn send_request<T, B>(
io: T, io: T,
mut head: RequestHead, head: Rc<RequestHead>,
additional_headers: Option<HeaderMap>,
body: B, body: B,
created: time::Instant, created: time::Instant,
pool: Option<Acquired<T>>, pool: Option<Acquired<T>>,
@ -29,7 +32,7 @@ where
B: MessageBody, B: MessageBody,
{ {
// set request host header // set request host header
if !head.headers.contains_key(HOST) { let additional_headers = if !head.headers.contains_key(HOST) && !additional_headers.iter().any(|h| h.contains_key(HOST)) {
if let Some(host) = head.uri.host() { if let Some(host) = head.uri.host() {
let mut wrt = BytesMut::with_capacity(host.len() + 5).writer(); let mut wrt = BytesMut::with_capacity(host.len() + 5).writer();
@ -40,14 +43,23 @@ where
match wrt.get_mut().take().freeze().try_into() { match wrt.get_mut().take().freeze().try_into() {
Ok(value) => { Ok(value) => {
head.headers.insert(HOST, value); let mut headers = additional_headers.unwrap_or(HeaderMap::new());
headers.insert(HOST, value);
Some(headers)
} }
Err(e) => { Err(e) => {
log::error!("Can not set HOST header {}", e); log::error!("Can not set HOST header {}", e);
additional_headers
} }
} }
} }
else {
additional_headers
}
} }
else {
additional_headers
};
let io = H1Connection { let io = H1Connection {
created, created,
@ -59,7 +71,7 @@ where
// create Framed and send reqest // create Framed and send reqest
Framed::new(io, h1::ClientCodec::default()) Framed::new(io, h1::ClientCodec::default())
.send((head, len).into()) .send((head, additional_headers, len).into())
.from_err() .from_err()
// send request body // send request body
.and_then(move |framed| match body.size() { .and_then(move |framed| match body.size() {
@ -95,14 +107,15 @@ where
pub(crate) fn open_tunnel<T>( pub(crate) fn open_tunnel<T>(
io: T, io: T,
head: RequestHead, head: Rc<RequestHead>,
additional_headers: Option<HeaderMap>,
) -> impl Future<Item = (ResponseHead, Framed<T, h1::ClientCodec>), Error = SendRequestError> ) -> impl Future<Item = (ResponseHead, Framed<T, h1::ClientCodec>), Error = SendRequestError>
where where
T: AsyncRead + AsyncWrite + 'static, T: AsyncRead + AsyncWrite + 'static,
{ {
// create Framed and send reqest // create Framed and send reqest
Framed::new(io, h1::ClientCodec::default()) Framed::new(io, h1::ClientCodec::default())
.send((head, BodySize::None).into()) .send((head, additional_headers, BodySize::None).into())
.from_err() .from_err()
// read response // read response
.and_then(|framed| { .and_then(|framed| {

View File

@ -1,4 +1,5 @@
use std::time; use std::time;
use std::rc::Rc;
use actix_codec::{AsyncRead, AsyncWrite}; use actix_codec::{AsyncRead, AsyncWrite};
use bytes::Bytes; use bytes::Bytes;
@ -11,6 +12,7 @@ use http::{request::Request, HttpTryFrom, Method, Version};
use crate::body::{BodySize, MessageBody}; use crate::body::{BodySize, MessageBody};
use crate::message::{RequestHead, ResponseHead}; use crate::message::{RequestHead, ResponseHead};
use crate::payload::Payload; use crate::payload::Payload;
use crate::header::HeaderMap;
use super::connection::{ConnectionType, IoConnection}; use super::connection::{ConnectionType, IoConnection};
use super::error::SendRequestError; use super::error::SendRequestError;
@ -18,7 +20,8 @@ use super::pool::Acquired;
pub(crate) fn send_request<T, B>( pub(crate) fn send_request<T, B>(
io: SendRequest<Bytes>, io: SendRequest<Bytes>,
head: RequestHead, head: Rc<RequestHead>,
additional_headers: Option<HeaderMap>,
body: B, body: B,
created: time::Instant, created: time::Instant,
pool: Option<Acquired<T>>, pool: Option<Acquired<T>>,
@ -39,8 +42,8 @@ where
.map_err(SendRequestError::from) .map_err(SendRequestError::from)
.and_then(move |mut io| { .and_then(move |mut io| {
let mut req = Request::new(()); let mut req = Request::new(());
*req.uri_mut() = head.uri; *req.uri_mut() = head.uri.clone();
*req.method_mut() = head.method; *req.method_mut() = head.method.clone();
*req.version_mut() = Version::HTTP_2; *req.version_mut() = Version::HTTP_2;
let mut skip_len = true; let mut skip_len = true;
@ -66,8 +69,16 @@ where
), ),
}; };
// merging headers from head and additional headers. HeaderMap::new() does not allocate.
let additional_headers = additional_headers.unwrap_or(HeaderMap::new());
let headers = head.headers.iter()
.filter(|(name, _)| {
!additional_headers.contains_key(*name)
})
.chain(additional_headers.iter());
// copy headers // copy headers
for (key, value) in head.headers.iter() { for (key, value) in headers {
match *key { match *key {
CONNECTION | TRANSFER_ENCODING => continue, // http2 specific CONNECTION | TRANSFER_ENCODING => continue, // http2 specific
CONTENT_LENGTH if skip_len => continue, CONTENT_LENGTH if skip_len => continue,

View File

@ -1,5 +1,6 @@
#![allow(unused_imports, unused_variables, dead_code)] #![allow(unused_imports, unused_variables, dead_code)]
use std::io::{self, Write}; use std::io::{self, Write};
use std::rc::Rc;
use actix_codec::{Decoder, Encoder}; use actix_codec::{Decoder, Encoder};
use bitflags::bitflags; use bitflags::bitflags;
@ -17,6 +18,7 @@ use crate::config::ServiceConfig;
use crate::error::{ParseError, PayloadError}; use crate::error::{ParseError, PayloadError};
use crate::helpers; use crate::helpers;
use crate::message::{ConnectionType, Head, MessagePool, RequestHead, ResponseHead}; use crate::message::{ConnectionType, Head, MessagePool, RequestHead, ResponseHead};
use crate::header::HeaderMap;
bitflags! { bitflags! {
struct Flags: u8 { struct Flags: u8 {
@ -48,7 +50,7 @@ struct ClientCodecInner {
// encoder part // encoder part
flags: Flags, flags: Flags,
headers_size: u32, headers_size: u32,
encoder: encoder::MessageEncoder<RequestHead>, encoder: encoder::MessageEncoder<(Rc<RequestHead>, Option<HeaderMap>)>,
} }
impl Default for ClientCodec { impl Default for ClientCodec {
@ -183,7 +185,7 @@ impl Decoder for ClientPayloadCodec {
} }
impl Encoder for ClientCodec { impl Encoder for ClientCodec {
type Item = Message<(RequestHead, BodySize)>; type Item = Message<(Rc<RequestHead>, Option<HeaderMap>, BodySize)>;
type Error = io::Error; type Error = io::Error;
fn encode( fn encode(
@ -192,13 +194,13 @@ impl Encoder for ClientCodec {
dst: &mut BytesMut, dst: &mut BytesMut,
) -> Result<(), Self::Error> { ) -> Result<(), Self::Error> {
match item { match item {
Message::Item((mut msg, length)) => { Message::Item((head, additional_headers, length)) => {
let inner = &mut self.inner; let inner = &mut self.inner;
inner.version = msg.version; inner.version = head.version;
inner.flags.set(Flags::HEAD, msg.method == Method::HEAD); inner.flags.set(Flags::HEAD, head.method == Method::HEAD);
// connection status // connection status
inner.ctype = match msg.connection_type() { inner.ctype = match head.connection_type() {
ConnectionType::KeepAlive => { ConnectionType::KeepAlive => {
if inner.flags.contains(Flags::KEEPALIVE_ENABLED) { if inner.flags.contains(Flags::KEEPALIVE_ENABLED) {
ConnectionType::KeepAlive ConnectionType::KeepAlive
@ -212,7 +214,7 @@ impl Encoder for ClientCodec {
inner.encoder.encode( inner.encoder.encode(
dst, dst,
&mut msg, &mut (head, additional_headers),
false, false,
false, false,
inner.version, inner.version,

View File

@ -4,6 +4,7 @@ use std::io::Write;
use std::marker::PhantomData; use std::marker::PhantomData;
use std::str::FromStr; use std::str::FromStr;
use std::{cmp, fmt, io, mem}; use std::{cmp, fmt, io, mem};
use std::rc::Rc;
use bytes::{BufMut, Bytes, BytesMut}; use bytes::{BufMut, Bytes, BytesMut};
@ -247,31 +248,32 @@ impl MessageType for Response<()> {
} }
} }
impl MessageType for RequestHead { impl MessageType for (Rc<RequestHead>, Option<HeaderMap>) {
fn status(&self) -> Option<StatusCode> { fn status(&self) -> Option<StatusCode> {
None None
} }
fn chunked(&self) -> bool { fn chunked(&self) -> bool {
self.chunked() self.0.chunked()
} }
fn camel_case(&self) -> bool { fn camel_case(&self) -> bool {
RequestHead::camel_case_headers(self) RequestHead::camel_case_headers(&self.0)
} }
fn headers(&self) -> &HeaderMap { fn headers(&self) -> &HeaderMap {
&self.headers &self.0.headers
} }
fn encode_status(&mut self, dst: &mut BytesMut) -> io::Result<()> { fn encode_status(&mut self, dst: &mut BytesMut) -> io::Result<()> {
dst.reserve(256 + self.headers.len() * AVERAGE_HEADER_SIZE); let head = &self.0;
dst.reserve(256 + head.headers.len() * AVERAGE_HEADER_SIZE);
write!( write!(
Writer(dst), Writer(dst),
"{} {} {}", "{} {} {}",
self.method, head.method,
self.uri.path_and_query().map(|u| u.as_str()).unwrap_or("/"), head.uri.path_and_query().map(|u| u.as_str()).unwrap_or("/"),
match self.version { match head.version {
Version::HTTP_09 => "HTTP/0.9", Version::HTTP_09 => "HTTP/0.9",
Version::HTTP_10 => "HTTP/1.0", Version::HTTP_10 => "HTTP/1.0",
Version::HTTP_11 => "HTTP/1.1", Version::HTTP_11 => "HTTP/1.1",

View File

@ -1,4 +1,5 @@
use std::{fmt, io, net}; use std::{fmt, io, net};
use std::rc::Rc;
use actix_codec::{AsyncRead, AsyncWrite, Framed}; use actix_codec::{AsyncRead, AsyncWrite, Framed};
use actix_http::body::Body; use actix_http::body::Body;
@ -7,6 +8,7 @@ use actix_http::client::{
}; };
use actix_http::h1::ClientCodec; use actix_http::h1::ClientCodec;
use actix_http::{RequestHead, ResponseHead}; use actix_http::{RequestHead, ResponseHead};
use actix_http::http::HeaderMap;
use actix_service::Service; use actix_service::Service;
use futures::{Future, Poll}; use futures::{Future, Poll};
@ -17,7 +19,8 @@ pub(crate) struct ConnectorWrapper<T>(pub T);
pub(crate) trait Connect { pub(crate) trait Connect {
fn send_request( fn send_request(
&mut self, &mut self,
head: RequestHead, head: Rc<RequestHead>,
additional_headers: Option<HeaderMap>,
body: Body, body: Body,
addr: Option<net::SocketAddr>, addr: Option<net::SocketAddr>,
) -> Box<Future<Item = ClientResponse, Error = SendRequestError>>; ) -> Box<Future<Item = ClientResponse, Error = SendRequestError>>;
@ -25,7 +28,8 @@ pub(crate) trait Connect {
/// Send request, returns Response and Framed /// Send request, returns Response and Framed
fn open_tunnel( fn open_tunnel(
&mut self, &mut self,
head: RequestHead, head: Rc<RequestHead>,
additional_headers: Option<HeaderMap>,
addr: Option<net::SocketAddr>, addr: Option<net::SocketAddr>,
) -> Box< ) -> Box<
Future< Future<
@ -46,7 +50,8 @@ where
{ {
fn send_request( fn send_request(
&mut self, &mut self,
head: RequestHead, head: Rc<RequestHead>,
additional_headers: Option<HeaderMap>,
body: Body, body: Body,
addr: Option<net::SocketAddr>, addr: Option<net::SocketAddr>,
) -> Box<Future<Item = ClientResponse, Error = SendRequestError>> { ) -> Box<Future<Item = ClientResponse, Error = SendRequestError>> {
@ -59,14 +64,15 @@ where
}) })
.from_err() .from_err()
// send request // send request
.and_then(move |connection| connection.send_request(head, body)) .and_then(move |connection| connection.send_request(head, additional_headers, body))
.map(|(head, payload)| ClientResponse::new(head, payload)), .map(|(head, payload)| ClientResponse::new(head, payload)),
) )
} }
fn open_tunnel( fn open_tunnel(
&mut self, &mut self,
head: RequestHead, head: Rc<RequestHead>,
additional_headers: Option<HeaderMap>,
addr: Option<net::SocketAddr>, addr: Option<net::SocketAddr>,
) -> Box< ) -> Box<
Future< Future<
@ -83,7 +89,7 @@ where
}) })
.from_err() .from_err()
// send request // send request
.and_then(move |connection| connection.open_tunnel(head)) .and_then(move |connection| connection.open_tunnel(head, additional_headers))
.map(|(head, framed)| { .map(|(head, framed)| {
let framed = framed.map_io(|io| BoxedSocket(Box::new(Socket(io)))); let framed = framed.map_io(|io| BoxedSocket(Box::new(Socket(io))));
(head, framed) (head, framed)

View File

@ -453,7 +453,7 @@ impl ClientRequest {
let fut = config let fut = config
.connector .connector
.borrow_mut() .borrow_mut()
.send_request(head, body.into(), slf.addr) .send_request(Rc::new(head), None, body.into(), slf.addr)
.map(move |res| { .map(move |res| {
res.map_body(|head, payload| { res.map_body(|head, payload| {
if response_decompress { if response_decompress {

View File

@ -286,7 +286,7 @@ impl WebsocketsRequest {
.config .config
.connector .connector
.borrow_mut() .borrow_mut()
.open_tunnel(head, self.addr) .open_tunnel(Rc::new(head), None, self.addr)
.from_err() .from_err()
.and_then(move |(head, framed)| { .and_then(move |(head, framed)| {
// verify response // verify response