Merge remote-tracking branch 'origin/master' into header-from-request

This commit is contained in:
Rob Ede 2021-04-01 15:25:46 +01:00
commit a6964eed9b
No known key found for this signature in database
GPG Key ID: 97C636207D3EF933
15 changed files with 212 additions and 284 deletions

3
.cargo/config.toml Normal file
View File

@ -0,0 +1,3 @@
[alias]
chk = "hack check --workspace --tests --examples"
lint = "hack --clean-per-run clippy --workspace --tests --examples"

View File

@ -3,6 +3,7 @@
## Unreleased - 2021-xx-xx
### Added
* `Header` extractor for extracting common HTTP headers in handlers. [#2094]
* Added `TestServer::client_headers` method. [#2097]
### Fixed
* Double ampersand in Logger format is escaped correctly. [#2067]
@ -16,8 +17,9 @@
[871ca5e4](https://github.com/actix/actix-web/commit/871ca5e4ae2bdc22d1ea02701c2992fa8d04aed7)
[#2067]: https://github.com/actix/actix-web/pull/2067
[#2094]: https://github.com/actix/actix-web/pull/2094
[#2093]: https://github.com/actix/actix-web/pull/2093
[#2094]: https://github.com/actix/actix-web/pull/2094
[#2097]: https://github.com/actix/actix-web/pull/2097
## 4.0.0-beta.4 - 2021-03-09

View File

@ -1,7 +1,8 @@
# Changes
## Unreleased - 2021-xx-xx
### Added
* Added `TestServer::client_headers` method. [#2097]
## 3.0.0-beta.3 - 2021-03-09
* No notable changes.

View File

@ -13,7 +13,9 @@ use std::{net, thread, time};
use actix_codec::{AsyncRead, AsyncWrite, Framed};
use actix_rt::{net::TcpStream, System};
use actix_server::{Server, ServiceFactory};
use awc::{error::PayloadError, ws, Client, ClientRequest, ClientResponse, Connector};
use awc::{
error::PayloadError, http::HeaderMap, ws, Client, ClientRequest, ClientResponse, Connector,
};
use bytes::Bytes;
use futures_core::stream::Stream;
use http::Method;
@ -258,6 +260,14 @@ impl TestServer {
self.ws_at("/").await
}
/// Get default HeaderMap of Client.
///
/// Returns Some(&mut HeaderMap) when Client object is unique
/// (No other clone of client exists at the same time).
pub fn client_headers(&mut self) -> Option<&mut HeaderMap> {
self.client.headers()
}
/// Stop HTTP server
fn stop(&mut self) {
self.system.stop();

View File

@ -63,11 +63,9 @@ where
X: ServiceFactory<Request, Config = (), Response = Request>,
X::Error: Into<Error>,
X::InitError: fmt::Debug,
<X::Service as Service<Request>>::Future: 'static,
U: ServiceFactory<(Request, Framed<T, Codec>), Config = (), Response = ()>,
U::Error: fmt::Display,
U::InitError: fmt::Debug,
<U::Service as Service<(Request, Framed<T, Codec>)>>::Future: 'static,
{
/// Set server keep-alive setting.
///
@ -127,7 +125,6 @@ where
X1: ServiceFactory<Request, Config = (), Response = Request>,
X1::Error: Into<Error>,
X1::InitError: fmt::Debug,
<X1::Service as Service<Request>>::Future: 'static,
{
HttpServiceBuilder {
keep_alive: self.keep_alive,
@ -152,7 +149,6 @@ where
U1: ServiceFactory<(Request, Framed<T, Codec>), Config = (), Response = ()>,
U1::Error: fmt::Display,
U1::InitError: fmt::Debug,
<U1::Service as Service<(Request, Framed<T, Codec>)>>::Future: 'static,
{
HttpServiceBuilder {
keep_alive: self.keep_alive,
@ -211,7 +207,6 @@ where
S::Error: Into<Error> + 'static,
S::InitError: fmt::Debug,
S::Response: Into<Response<B>> + 'static,
<S::Service as Service<Request>>::Future: 'static,
{
let cfg = ServiceConfig::new(
self.keep_alive,
@ -233,7 +228,6 @@ where
S::Error: Into<Error> + 'static,
S::InitError: fmt::Debug,
S::Response: Into<Response<B>> + 'static,
<S::Service as Service<Request>>::Future: 'static,
{
let cfg = ServiceConfig::new(
self.keep_alive,

View File

@ -7,7 +7,7 @@ use std::{
use actix_codec::Framed;
use bytes::buf::BufMut;
use bytes::{Bytes, BytesMut};
use futures_core::Stream;
use futures_core::{ready, Stream};
use futures_util::{future::poll_fn, SinkExt as _};
use crate::error::PayloadError;
@ -17,7 +17,7 @@ use crate::http::{
StatusCode,
};
use crate::message::{RequestHeadType, ResponseHead};
use crate::payload::{Payload, PayloadStream};
use crate::payload::Payload;
use super::connection::{ConnectionIo, H1Connection};
use super::error::{ConnectError, SendRequestError};
@ -122,10 +122,7 @@ where
Ok((head, Payload::None))
}
_ => {
let pl: PayloadStream = Box::pin(PlStream::new(framed));
Ok((head, pl.into()))
}
_ => Ok((head, Payload::Stream(Box::pin(PlStream::new(framed))))),
}
}
@ -194,21 +191,16 @@ where
}
#[pin_project::pin_project]
pub(crate) struct PlStream<Io: ConnectionIo>
where
Io: ConnectionIo,
{
pub(crate) struct PlStream<Io: ConnectionIo> {
#[pin]
framed: Option<Framed<H1Connection<Io>, h1::ClientPayloadCodec>>,
framed: Framed<H1Connection<Io>, h1::ClientPayloadCodec>,
}
impl<Io: ConnectionIo> PlStream<Io> {
fn new(framed: Framed<H1Connection<Io>, h1::ClientCodec>) -> Self {
let framed = framed.into_map_codec(|codec| codec.into_payload_codec());
PlStream {
framed: Some(framed),
}
PlStream { framed }
}
}
@ -219,20 +211,16 @@ impl<Io: ConnectionIo> Stream for PlStream<Io> {
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
let mut framed = self.project().framed.as_pin_mut().unwrap();
let mut this = self.project();
match framed.as_mut().next_item(cx)? {
Poll::Pending => Poll::Pending,
Poll::Ready(Some(chunk)) => {
if let Some(chunk) = chunk {
Poll::Ready(Some(Ok(chunk)))
} else {
let keep_alive = framed.codec_ref().keepalive();
framed.io_mut().on_release(keep_alive);
match ready!(this.framed.as_mut().next_item(cx)?) {
Some(Some(chunk)) => Poll::Ready(Some(Ok(chunk))),
Some(None) => {
let keep_alive = this.framed.codec_ref().keepalive();
this.framed.io_mut().on_release(keep_alive);
Poll::Ready(None)
}
}
Poll::Ready(None) => Poll::Ready(None),
None => Poll::Ready(None),
}
}
}

View File

@ -6,7 +6,7 @@ use std::{fmt, net};
use actix_codec::{AsyncRead, AsyncWrite, Framed};
use actix_rt::net::TcpStream;
use actix_service::{pipeline_factory, IntoServiceFactory, Service, ServiceFactory};
use futures_core::{future::LocalBoxFuture, ready};
use futures_core::future::LocalBoxFuture;
use futures_util::future::ready;
use crate::body::MessageBody;
@ -14,7 +14,7 @@ use crate::config::ServiceConfig;
use crate::error::{DispatchError, Error};
use crate::request::Request;
use crate::response::Response;
use crate::service::HttpFlow;
use crate::service::HttpServiceHandler;
use crate::{ConnectCallback, OnConnectData};
use super::codec::Codec;
@ -315,47 +315,10 @@ where
}
/// `Service` implementation for HTTP/1 transport
pub struct H1ServiceHandler<T, S, B, X, U>
where
S: Service<Request>,
X: Service<Request>,
U: Service<(Request, Framed<T, Codec>)>,
{
flow: Rc<HttpFlow<S, X, U>>,
on_connect_ext: Option<Rc<ConnectCallback<T>>>,
cfg: ServiceConfig,
_phantom: PhantomData<B>,
}
impl<T, S, B, X, U> H1ServiceHandler<T, S, B, X, U>
where
S: Service<Request>,
S::Error: Into<Error>,
S::Response: Into<Response<B>>,
B: MessageBody,
X: Service<Request, Response = Request>,
X::Error: Into<Error>,
U: Service<(Request, Framed<T, Codec>), Response = ()>,
U::Error: fmt::Display,
{
fn new(
cfg: ServiceConfig,
service: S,
expect: X,
upgrade: Option<U>,
on_connect_ext: Option<Rc<ConnectCallback<T>>>,
) -> H1ServiceHandler<T, S, B, X, U> {
H1ServiceHandler {
flow: HttpFlow::new(service, expect, upgrade),
cfg,
on_connect_ext,
_phantom: PhantomData,
}
}
}
pub type H1ServiceHandler<T, S, B, X, U> = HttpServiceHandler<T, S, B, X, U>;
impl<T, S, B, X, U> Service<(T, Option<net::SocketAddr>)>
for H1ServiceHandler<T, S, B, X, U>
for HttpServiceHandler<T, S, B, X, U>
where
T: AsyncRead + AsyncWrite + Unpin,
S: Service<Request>,
@ -372,27 +335,10 @@ where
type Future = Dispatcher<T, S, B, X, U>;
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
ready!(self.flow.expect.poll_ready(cx)).map_err(|e| {
let e = e.into();
log::error!("Http expect service readiness error: {:?}", e);
self._poll_ready(cx).map_err(|e| {
log::error!("HTTP/1 service readiness error: {:?}", e);
DispatchError::Service(e)
})?;
if let Some(ref upg) = self.flow.upgrade {
ready!(upg.poll_ready(cx)).map_err(|e| {
let e = e.into();
log::error!("Http upgrade service readiness error: {:?}", e);
DispatchError::Service(e)
})?;
};
ready!(self.flow.service.poll_ready(cx)).map_err(|e| {
let e = e.into();
log::error!("Http service readiness error: {:?}", e);
DispatchError::Service(e)
})?;
Poll::Ready(Ok(()))
})
}
fn call(&self, (io, addr): (T, Option<net::SocketAddr>)) -> Self::Future {

View File

@ -1,18 +1,20 @@
use std::fmt::{self, Display};
use std::io::Write;
use std::str::FromStr;
use std::time::{SystemTime, UNIX_EPOCH};
use std::{
fmt,
io::Write,
str::FromStr,
time::{SystemTime, UNIX_EPOCH},
};
use bytes::buf::BufMut;
use bytes::BytesMut;
use http::header::{HeaderValue, InvalidHeaderValue};
use time::{offset, OffsetDateTime, PrimitiveDateTime};
use time::{OffsetDateTime, PrimitiveDateTime, UtcOffset};
use crate::error::ParseError;
use crate::header::IntoHeaderValue;
use crate::time_parser;
/// A timestamp with HTTP formatting and parsing
/// A timestamp with HTTP formatting and parsing.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct HttpDate(OffsetDateTime);
@ -27,18 +29,12 @@ impl FromStr for HttpDate {
}
}
impl Display for HttpDate {
impl fmt::Display for HttpDate {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt::Display::fmt(&self.0.format("%a, %d %b %Y %H:%M:%S GMT"), f)
}
}
impl From<OffsetDateTime> for HttpDate {
fn from(dt: OffsetDateTime) -> HttpDate {
HttpDate(dt)
}
}
impl From<SystemTime> for HttpDate {
fn from(sys: SystemTime) -> HttpDate {
HttpDate(PrimitiveDateTime::from(sys).assume_utc())
@ -54,7 +50,7 @@ impl IntoHeaderValue for HttpDate {
wrt,
"{}",
self.0
.to_offset(offset!(UTC))
.to_offset(UtcOffset::UTC)
.format("%a, %d %b %Y %H:%M:%S GMT")
)
.unwrap();

View File

@ -12,7 +12,7 @@ use actix_codec::{AsyncRead, AsyncWrite, Framed};
use actix_rt::net::TcpStream;
use actix_service::{pipeline_factory, IntoServiceFactory, Service, ServiceFactory};
use bytes::Bytes;
use futures_core::ready;
use futures_core::{future::LocalBoxFuture, ready};
use h2::server::{handshake, Handshake};
use pin_project::pin_project;
@ -107,7 +107,6 @@ where
X1: ServiceFactory<Request, Config = (), Response = Request>,
X1::Error: Into<Error>,
X1::InitError: fmt::Debug,
<X1::Service as Service<Request>>::Future: 'static,
{
HttpService {
expect,
@ -128,7 +127,6 @@ where
U1: ServiceFactory<(Request, Framed<T, h1::Codec>), Config = (), Response = ()>,
U1::Error: fmt::Display,
U1::InitError: fmt::Debug,
<U1::Service as Service<(Request, Framed<T, h1::Codec>)>>::Future: 'static,
{
HttpService {
upgrade,
@ -150,23 +148,24 @@ where
impl<S, B, X, U> HttpService<TcpStream, S, B, X, U>
where
S: ServiceFactory<Request, Config = ()>,
S::Future: 'static,
S::Error: Into<Error> + 'static,
S::InitError: fmt::Debug,
S::Response: Into<Response<B>> + 'static,
<S::Service as Service<Request>>::Future: 'static,
B: MessageBody + 'static,
X: ServiceFactory<Request, Config = (), Response = Request>,
X::Future: 'static,
X::Error: Into<Error>,
X::InitError: fmt::Debug,
<X::Service as Service<Request>>::Future: 'static,
U: ServiceFactory<
(Request, Framed<TcpStream, h1::Codec>),
Config = (),
Response = (),
>,
U::Future: 'static,
U::Error: fmt::Display + Into<Error>,
U::InitError: fmt::Debug,
<U::Service as Service<(Request, Framed<TcpStream, h1::Codec>)>>::Future: 'static,
{
/// Create simple tcp stream service
pub fn tcp(
@ -196,23 +195,24 @@ mod openssl {
impl<S, B, X, U> HttpService<TlsStream<TcpStream>, S, B, X, U>
where
S: ServiceFactory<Request, Config = ()>,
S::Future: 'static,
S::Error: Into<Error> + 'static,
S::InitError: fmt::Debug,
S::Response: Into<Response<B>> + 'static,
<S::Service as Service<Request>>::Future: 'static,
B: MessageBody + 'static,
X: ServiceFactory<Request, Config = (), Response = Request>,
X::Future: 'static,
X::Error: Into<Error>,
X::InitError: fmt::Debug,
<X::Service as Service<Request>>::Future: 'static,
U: ServiceFactory<
(Request, Framed<TlsStream<TcpStream>, h1::Codec>),
Config = (),
Response = (),
>,
U::Future: 'static,
U::Error: fmt::Display + Into<Error>,
U::InitError: fmt::Debug,
<U::Service as Service<(Request, Framed<TlsStream<TcpStream>, h1::Codec>)>>::Future: 'static,
{
/// Create openssl based service
pub fn openssl(
@ -261,23 +261,24 @@ mod rustls {
impl<S, B, X, U> HttpService<TlsStream<TcpStream>, S, B, X, U>
where
S: ServiceFactory<Request, Config = ()>,
S::Future: 'static,
S::Error: Into<Error> + 'static,
S::InitError: fmt::Debug,
S::Response: Into<Response<B>> + 'static,
<S::Service as Service<Request>>::Future: 'static,
B: MessageBody + 'static,
X: ServiceFactory<Request, Config = (), Response = Request>,
X::Future: 'static,
X::Error: Into<Error>,
X::InitError: fmt::Debug,
<X::Service as Service<Request>>::Future: 'static,
U: ServiceFactory<
(Request, Framed<TlsStream<TcpStream>, h1::Codec>),
Config = (),
Response = (),
>,
U::Future: 'static,
U::Error: fmt::Display + Into<Error>,
U::InitError: fmt::Debug,
<U::Service as Service<(Request, Framed<TlsStream<TcpStream>, h1::Codec>)>>::Future: 'static,
{
/// Create openssl based service
pub fn rustls(
@ -319,137 +320,117 @@ mod rustls {
impl<T, S, B, X, U> ServiceFactory<(T, Protocol, Option<net::SocketAddr>)>
for HttpService<T, S, B, X, U>
where
T: AsyncRead + AsyncWrite + Unpin,
T: AsyncRead + AsyncWrite + Unpin + 'static,
S: ServiceFactory<Request, Config = ()>,
S::Future: 'static,
S::Error: Into<Error> + 'static,
S::InitError: fmt::Debug,
S::Response: Into<Response<B>> + 'static,
<S::Service as Service<Request>>::Future: 'static,
B: MessageBody + 'static,
X: ServiceFactory<Request, Config = (), Response = Request>,
X::Future: 'static,
X::Error: Into<Error>,
X::InitError: fmt::Debug,
<X::Service as Service<Request>>::Future: 'static,
U: ServiceFactory<(Request, Framed<T, h1::Codec>), Config = (), Response = ()>,
U::Future: 'static,
U::Error: fmt::Display + Into<Error>,
U::InitError: fmt::Debug,
<U::Service as Service<(Request, Framed<T, h1::Codec>)>>::Future: 'static,
{
type Response = ();
type Error = DispatchError;
type Config = ();
type Service = HttpServiceHandler<T, S::Service, B, X::Service, U::Service>;
type InitError = ();
type Future = HttpServiceResponse<T, S, B, X, U>;
type Future = LocalBoxFuture<'static, Result<Self::Service, Self::InitError>>;
fn new_service(&self, _: ()) -> Self::Future {
HttpServiceResponse {
fut: self.srv.new_service(()),
fut_ex: Some(self.expect.new_service(())),
fut_upg: self.upgrade.as_ref().map(|f| f.new_service(())),
expect: None,
upgrade: None,
on_connect_ext: self.on_connect_ext.clone(),
cfg: self.cfg.clone(),
_phantom: PhantomData,
let service = self.srv.new_service(());
let expect = self.expect.new_service(());
let upgrade = self.upgrade.as_ref().map(|s| s.new_service(()));
let on_connect_ext = self.on_connect_ext.clone();
let cfg = self.cfg.clone();
Box::pin(async move {
let expect = expect
.await
.map_err(|e| log::error!("Init http expect service error: {:?}", e))?;
let upgrade = match upgrade {
Some(upgrade) => {
let upgrade = upgrade.await.map_err(|e| {
log::error!("Init http upgrade service error: {:?}", e)
})?;
Some(upgrade)
}
}
}
None => None,
};
#[doc(hidden)]
#[pin_project]
pub struct HttpServiceResponse<T, S, B, X, U>
where
S: ServiceFactory<Request>,
X: ServiceFactory<Request>,
U: ServiceFactory<(Request, Framed<T, h1::Codec>)>,
{
#[pin]
fut: S::Future,
#[pin]
fut_ex: Option<X::Future>,
#[pin]
fut_upg: Option<U::Future>,
expect: Option<X::Service>,
upgrade: Option<U::Service>,
on_connect_ext: Option<Rc<ConnectCallback<T>>>,
cfg: ServiceConfig,
_phantom: PhantomData<B>,
}
let service = service
.await
.map_err(|e| log::error!("Init http service error: {:?}", e))?;
impl<T, S, B, X, U> Future for HttpServiceResponse<T, S, B, X, U>
where
T: AsyncRead + AsyncWrite + Unpin,
S: ServiceFactory<Request>,
S::Error: Into<Error> + 'static,
S::InitError: fmt::Debug,
S::Response: Into<Response<B>> + 'static,
<S::Service as Service<Request>>::Future: 'static,
B: MessageBody + 'static,
X: ServiceFactory<Request, Response = Request>,
X::Error: Into<Error>,
X::InitError: fmt::Debug,
<X::Service as Service<Request>>::Future: 'static,
U: ServiceFactory<(Request, Framed<T, h1::Codec>), Response = ()>,
U::Error: fmt::Display,
U::InitError: fmt::Debug,
<U::Service as Service<(Request, Framed<T, h1::Codec>)>>::Future: 'static,
{
type Output =
Result<HttpServiceHandler<T, S::Service, B, X::Service, U::Service>, ()>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.as_mut().project();
if let Some(fut) = this.fut_ex.as_pin_mut() {
let expect = ready!(fut
.poll(cx)
.map_err(|e| log::error!("Init http service error: {:?}", e)))?;
this = self.as_mut().project();
*this.expect = Some(expect);
this.fut_ex.set(None);
}
if let Some(fut) = this.fut_upg.as_pin_mut() {
let upgrade = ready!(fut
.poll(cx)
.map_err(|e| log::error!("Init http service error: {:?}", e)))?;
this = self.as_mut().project();
*this.upgrade = Some(upgrade);
this.fut_upg.set(None);
}
let result = ready!(this
.fut
.poll(cx)
.map_err(|e| log::error!("Init http service error: {:?}", e)));
Poll::Ready(result.map(|service| {
let this = self.as_mut().project();
HttpServiceHandler::new(
this.cfg.clone(),
Ok(HttpServiceHandler::new(
cfg,
service,
this.expect.take().unwrap(),
this.upgrade.take(),
this.on_connect_ext.clone(),
)
}))
expect,
upgrade,
on_connect_ext,
))
})
}
}
/// `Service` implementation for HTTP transport
/// `Service` implementation for HTTP/1 and HTTP/2 transport
pub struct HttpServiceHandler<T, S, B, X, U>
where
S: Service<Request>,
X: Service<Request>,
U: Service<(Request, Framed<T, h1::Codec>)>,
{
flow: Rc<HttpFlow<S, X, U>>,
cfg: ServiceConfig,
on_connect_ext: Option<Rc<ConnectCallback<T>>>,
pub(super) flow: Rc<HttpFlow<S, X, U>>,
pub(super) cfg: ServiceConfig,
pub(super) on_connect_ext: Option<Rc<ConnectCallback<T>>>,
_phantom: PhantomData<B>,
}
impl<T, S, B, X, U> HttpServiceHandler<T, S, B, X, U>
where
S: Service<Request>,
S::Error: Into<Error>,
X: Service<Request>,
X::Error: Into<Error>,
U: Service<(Request, Framed<T, h1::Codec>)>,
U::Error: Into<Error>,
{
pub(super) fn new(
cfg: ServiceConfig,
service: S,
expect: X,
upgrade: Option<U>,
on_connect_ext: Option<Rc<ConnectCallback<T>>>,
) -> HttpServiceHandler<T, S, B, X, U> {
HttpServiceHandler {
cfg,
on_connect_ext,
flow: HttpFlow::new(service, expect, upgrade),
_phantom: PhantomData,
}
}
pub(super) fn _poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
ready!(self.flow.expect.poll_ready(cx).map_err(Into::into))?;
ready!(self.flow.service.poll_ready(cx).map_err(Into::into))?;
if let Some(ref upg) = self.flow.upgrade {
ready!(upg.poll_ready(cx).map_err(Into::into))?;
};
Poll::Ready(Ok(()))
}
}
/// A collection of services that describe an HTTP request flow.
pub(super) struct HttpFlow<S, X, U> {
pub(super) service: S,
@ -467,34 +448,6 @@ impl<S, X, U> HttpFlow<S, X, U> {
}
}
impl<T, S, B, X, U> HttpServiceHandler<T, S, B, X, U>
where
S: Service<Request>,
S::Error: Into<Error> + 'static,
S::Future: 'static,
S::Response: Into<Response<B>> + 'static,
B: MessageBody + 'static,
X: Service<Request, Response = Request>,
X::Error: Into<Error>,
U: Service<(Request, Framed<T, h1::Codec>), Response = ()>,
U::Error: fmt::Display,
{
fn new(
cfg: ServiceConfig,
service: S,
expect: X,
upgrade: Option<U>,
on_connect_ext: Option<Rc<ConnectCallback<T>>>,
) -> HttpServiceHandler<T, S, B, X, U> {
HttpServiceHandler {
cfg,
on_connect_ext,
flow: HttpFlow::new(service, expect, upgrade),
_phantom: PhantomData,
}
}
}
impl<T, S, B, X, U> Service<(T, Protocol, Option<net::SocketAddr>)>
for HttpServiceHandler<T, S, B, X, U>
where
@ -514,47 +467,10 @@ where
type Future = HttpServiceHandlerResponse<T, S, B, X, U>;
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let ready = self
.flow
.expect
.poll_ready(cx)
.map_err(|e| {
let e = e.into();
log::error!("Http service readiness error: {:?}", e);
self._poll_ready(cx).map_err(|e| {
log::error!("HTTP service readiness error: {:?}", e);
DispatchError::Service(e)
})?
.is_ready();
let ready = self
.flow
.service
.poll_ready(cx)
.map_err(|e| {
let e = e.into();
log::error!("Http service readiness error: {:?}", e);
DispatchError::Service(e)
})?
.is_ready()
&& ready;
let ready = if let Some(ref upg) = self.flow.upgrade {
upg.poll_ready(cx)
.map_err(|e| {
let e = e.into();
log::error!("Http service readiness error: {:?}", e);
DispatchError::Service(e)
})?
.is_ready()
&& ready
} else {
ready
};
if ready {
Poll::Ready(Ok(()))
} else {
Poll::Pending
}
})
}
fn call(

View File

@ -29,5 +29,6 @@ tokio = { version = "1", features = ["sync"] }
[dev-dependencies]
actix-rt = "2.2"
awc = { version = "3.0.0-beta.3", default-features = false }
env_logger = "0.8"
futures-util = { version = "0.3.7", default-features = false }

View File

@ -1,5 +1,8 @@
use actix::prelude::*;
use actix_web::{test, web, App, HttpRequest};
use actix_web::{
http::{header, StatusCode},
test, web, App, HttpRequest, HttpResponse,
};
use actix_web_actors::*;
use bytes::Bytes;
use futures_util::{SinkExt, StreamExt};
@ -56,3 +59,51 @@ async fn test_simple() {
let item = framed.next().await.unwrap().unwrap();
assert_eq!(item, ws::Frame::Close(Some(ws::CloseCode::Normal.into())));
}
#[actix_rt::test]
async fn test_with_credentials() {
let mut srv = test::start(|| {
App::new().service(web::resource("/").to(
|req: HttpRequest, stream: web::Payload| async move {
if req.headers().contains_key("Authorization") {
ws::start(Ws, &req, stream)
} else {
Ok(HttpResponse::new(StatusCode::UNAUTHORIZED))
}
},
))
});
// client service without credentials
match srv.ws().await {
Ok(_) => panic!("WebSocket client without credentials should panic"),
Err(awc::error::WsClientError::InvalidResponseStatus(status)) => {
assert_eq!(status, StatusCode::UNAUTHORIZED)
}
Err(e) => panic!("Invalid error from WebSocket client: {}", e),
}
let headers = srv.client_headers().unwrap();
headers.insert(
header::AUTHORIZATION,
header::HeaderValue::from_static("Bearer Something"),
);
// client service with credentials
let client = srv.ws();
let mut framed = client.await.unwrap();
framed.send(ws::Message::Text("text".into())).await.unwrap();
let item = framed.next().await.unwrap().unwrap();
assert_eq!(item, ws::Frame::Text(Bytes::from_static(b"text")));
framed
.send(ws::Message::Close(Some(ws::CloseCode::Normal.into())))
.await
.unwrap();
let item = framed.next().await.unwrap().unwrap();
assert_eq!(item, ws::Frame::Close(Some(ws::CloseCode::Normal.into())));
}

View File

@ -1,11 +1,15 @@
# Changes
## Unreleased - 2021-xx-xx
### Added
* Add `Client::headers` to get default mut reference of `HeaderMap` of client object. [#2114]
### Changed
* `ConnectorService` type is renamed to `BoxConnectorService`. [#2081]
* Fix http/https encoding when enabling `compress` feature. [#2116]
[#2081]: https://github.com/actix/actix-web/pull/2081
[#2114]: https://github.com/actix/actix-web/pull/2114
[#2116]: https://github.com/actix/actix-web/pull/2116

View File

@ -286,4 +286,12 @@ impl Client {
}
req
}
/// Get default HeaderMap of Client.
///
/// Returns Some(&mut HeaderMap) when Client object is unique
/// (No other clone of client exists at the same time).
pub fn headers(&mut self) -> Option<&mut HeaderMap> {
Rc::get_mut(&mut self.0.headers)
}
}

View File

@ -7,7 +7,7 @@ digraph {
}
"actix-codec" -> { "actix-rt" "actix-service" "local-channel" "tokio" }
"actix-utils" -> { "actix-rt" "actix-service" "local-waker" }
"actix-utils" -> { "local-waker" }
"actix-tracing" -> { "actix-service" }
"actix-tls" -> { "actix-service" "actix-codec" "actix-utils" "actix-rt" }
"actix-server" -> { "actix-service" "actix-rt" "actix-codec" "actix-utils" "tokio" }

View File

@ -970,6 +970,14 @@ impl TestServer {
self.ws_at("/").await
}
/// Get default HeaderMap of Client.
///
/// Returns Some(&mut HeaderMap) when Client object is unique
/// (No other clone of client exists at the same time).
pub fn client_headers(&mut self) -> Option<&mut HeaderMap> {
self.client.headers()
}
/// Gracefully stop HTTP server
pub async fn stop(self) {
self.server.stop(true).await;