This commit is contained in:
Rob Ede 2021-04-13 11:12:40 +01:00
parent d29f208cba
commit fa9691a2f8
No known key found for this signature in database
GPG Key ID: 97C636207D3EF933
14 changed files with 270 additions and 150 deletions

View File

@ -1,6 +1,6 @@
use std::{env, io};
use actix_http::{http::StatusCode, Error, HttpService, Request, Response};
use actix_http::{http::StatusCode, HttpService, Request, Response};
use actix_server::Server;
use bytes::BytesMut;
use futures_util::StreamExt as _;
@ -24,14 +24,14 @@ async fn main() -> io::Result<()> {
}
info!("request body: {:?}", body);
Ok::<_, Error>(
Response::builder(StatusCode::OK)
.insert_header((
"x-head",
HeaderValue::from_static("dummy value!"),
))
.body(body),
)
Response::builder(StatusCode::OK)
.insert_header((
"x-head",
HeaderValue::from_static("dummy value!"),
))
.take()
.body(body)
})
.tcp()
})?

View File

@ -17,9 +17,10 @@ async fn handle_request(mut req: Request) -> Result<Response<Body>, Error> {
}
info!("request body: {:?}", body);
Ok(Response::builder(StatusCode::OK)
Response::builder(StatusCode::OK)
.insert_header(("x-head", HeaderValue::from_static("dummy value!")))
.body(body))
.take()
.body(body)
}
#[actix_rt::main]

View File

@ -23,7 +23,7 @@ async fn main() -> io::Result<()> {
"x-head",
HeaderValue::from_static("dummy value!"),
));
future::ok::<_, ()>(res.body("Hello world!"))
future::ready(res.body("Hello world!"))
})
.tcp()
})?

View File

@ -36,12 +36,12 @@ async fn main() -> io::Result<()> {
async fn handler(req: Request) -> Result<Response<BodyStream<Heartbeat>>, Error> {
log::info!("handshaking");
let mut res = ws::handshake(req.head())?;
let res = ws::handshake(req.head())?;
// handshake will always fail under HTTP/2
log::info!("responding");
Ok(res.message_body(BodyStream::new(Heartbeat::new(ws::Codec::new()))))
res.streaming(Heartbeat::new(ws::Codec::new()))
}
struct Heartbeat {

View File

@ -1,5 +1,12 @@
//! Traits and structures to aid consuming and writing HTTP payloads.
use std::task::Poll;
use actix_rt::pin;
use actix_utils::future::poll_fn;
use bytes::{Bytes, BytesMut};
use futures_core::ready;
#[allow(clippy::module_inception)]
mod body;
mod body_stream;
@ -15,6 +22,31 @@ pub use self::response_body::ResponseBody;
pub use self::size::BodySize;
pub use self::sized_stream::SizedStream;
pub async fn to_bytes(body: impl MessageBody) -> Result<Bytes, crate::Error> {
let cap = match body.size() {
BodySize::None | BodySize::Empty | BodySize::Sized(0) => return Ok(Bytes::new()),
BodySize::Sized(size) => size as usize,
BodySize::Stream => 32_768,
};
let mut buf = BytesMut::with_capacity(cap);
pin!(body);
poll_fn(|cx| loop {
let body = body.as_mut();
match ready!(body.poll_next(cx)) {
Some(Ok(bytes)) => buf.extend(bytes),
None => return Poll::Ready(Ok(())),
Some(Err(err)) => return Poll::Ready(Err(err)),
}
})
.await?;
Ok(buf.freeze())
}
#[cfg(test)]
mod tests {
use std::pin::Pin;
@ -44,6 +76,42 @@ mod tests {
}
}
impl ResponseBody<Bytes> {
pub(crate) fn get_ref(&self) -> &[u8] {
match *self {
ResponseBody::Body(ref b) => b.as_ref(),
ResponseBody::Other(ref b) => b.get_ref(),
}
}
}
impl ResponseBody<String> {
pub(crate) fn get_ref(&self) -> &[u8] {
match *self {
ResponseBody::Body(ref b) => b.as_ref(),
ResponseBody::Other(ref b) => b.get_ref(),
}
}
}
impl ResponseBody<&str> {
pub(crate) fn get_ref(&self) -> &[u8] {
match *self {
ResponseBody::Body(ref b) => b.as_ref(),
ResponseBody::Other(ref b) => b.get_ref(),
}
}
}
impl ResponseBody<&[u8]> {
pub(crate) fn get_ref(&self) -> &[u8] {
match *self {
ResponseBody::Body(ref b) => b.as_ref(),
ResponseBody::Other(ref b) => b.get_ref(),
}
}
}
#[actix_rt::test]
async fn test_static_str() {
assert_eq!(Body::from("").size(), BodySize::Sized(0));

View File

@ -135,8 +135,14 @@ impl From<Response<Body>> for Error {
/// Convert ResponseBuilder to a Error
impl From<ResponseBuilder> for Error {
fn from(mut res: ResponseBuilder) -> Error {
InternalError::from_response("", res.finish()).into()
fn from(res: ResponseBuilder) -> Error {
match res.finish() {
Ok(res) => InternalError::from_response("", res).into(),
Err(err) => {
let res = err.as_response_error().error_response();
InternalError::from_response(err, res).into()
}
}
}
}

View File

@ -64,6 +64,12 @@ impl Response<()> {
pub fn not_found() -> Response<()> {
Response::new(StatusCode::NOT_FOUND)
}
/// Creates a new response with status 500 Internal Server Error.
#[inline]
pub fn internal_server_error() -> Response<()> {
Response::new(StatusCode::INTERNAL_SERVER_ERROR)
}
}
impl Response<Body> {
@ -204,7 +210,7 @@ impl<B> Response<B> {
}
}
/// Set a body and return previous body value
/// Set a body and return previous body value.
pub(crate) fn replace_body<B2>(self, body: B2) -> (Response<B2>, ResponseBody<B>) {
(
Response {
@ -216,7 +222,9 @@ impl<B> Response<B> {
)
}
/// Set a body and return previous body value
/// Consumes the response returning a new response with the body mapped by a function.
///
/// Given function also has mutable access to response head.
pub fn map_body<F, B2>(mut self, f: F) -> Response<B2>
where
F: FnOnce(&mut ResponseHead, ResponseBody<B>) -> ResponseBody<B2>,
@ -271,38 +279,44 @@ impl<I: Into<Response<Body>>, E: Into<Error>> From<Result<I, E>> for Response<Bo
fn from(res: Result<I, E>) -> Self {
match res {
Ok(val) => val.into(),
Err(err) => err.into().into(),
Err(err) => Response::from_error(err.into()),
}
}
}
impl From<ResponseBuilder> for Response<Body> {
fn from(mut builder: ResponseBuilder) -> Self {
builder.finish()
fn from(builder: ResponseBuilder) -> Self {
builder.finish().into()
}
}
impl From<&'static str> for Response<Body> {
impl From<&'static str> for Response<&'static str> {
fn from(val: &'static str) -> Self {
Response::builder(StatusCode::OK)
.content_type(mime::TEXT_PLAIN_UTF_8)
.take()
.body(val)
.unwrap()
}
}
impl From<&'static [u8]> for Response<Body> {
impl From<&'static [u8]> for Response<&'static [u8]> {
fn from(val: &'static [u8]) -> Self {
Response::builder(StatusCode::OK)
.content_type(mime::APPLICATION_OCTET_STREAM)
.take()
.body(val)
.unwrap()
}
}
impl From<String> for Response<Body> {
impl From<String> for Response<String> {
fn from(val: String) -> Self {
Response::builder(StatusCode::OK)
.content_type(mime::TEXT_PLAIN_UTF_8)
.take()
.body(val)
.unwrap()
}
}
@ -310,23 +324,29 @@ impl<'a> From<&'a String> for Response<Body> {
fn from(val: &'a String) -> Self {
Response::builder(StatusCode::OK)
.content_type(mime::TEXT_PLAIN_UTF_8)
.body(val)
.take()
.body(Body::from_slice(val.as_bytes()))
.unwrap()
}
}
impl From<Bytes> for Response<Body> {
impl From<Bytes> for Response<Bytes> {
fn from(val: Bytes) -> Self {
Response::builder(StatusCode::OK)
.content_type(mime::APPLICATION_OCTET_STREAM)
.take()
.body(val)
.unwrap()
}
}
impl From<BytesMut> for Response<Body> {
impl From<BytesMut> for Response<Bytes> {
fn from(val: BytesMut) -> Self {
Response::builder(StatusCode::OK)
.content_type(mime::APPLICATION_OCTET_STREAM)
.body(val)
.take()
.body(val.freeze())
.unwrap()
}
}
@ -348,7 +368,7 @@ mod tests {
#[test]
fn test_into_response() {
let resp: Response<Body> = "test".into();
let resp: Response<_> = "test".into();
assert_eq!(resp.status(), StatusCode::OK);
assert_eq!(
resp.headers().get(CONTENT_TYPE).unwrap(),
@ -357,7 +377,7 @@ mod tests {
assert_eq!(resp.status(), StatusCode::OK);
assert_eq!(resp.body().get_ref(), b"test");
let resp: Response<Body> = b"test".as_ref().into();
let resp: Response<_> = b"test".as_ref().into();
assert_eq!(resp.status(), StatusCode::OK);
assert_eq!(
resp.headers().get(CONTENT_TYPE).unwrap(),
@ -366,7 +386,7 @@ mod tests {
assert_eq!(resp.status(), StatusCode::OK);
assert_eq!(resp.body().get_ref(), b"test");
let resp: Response<Body> = "test".to_owned().into();
let resp: Response<_> = "test".to_owned().into();
assert_eq!(resp.status(), StatusCode::OK);
assert_eq!(
resp.headers().get(CONTENT_TYPE).unwrap(),
@ -385,7 +405,7 @@ mod tests {
assert_eq!(resp.body().get_ref(), b"test");
let b = Bytes::from_static(b"test");
let resp: Response<Body> = b.into();
let resp: Response<_> = b.into();
assert_eq!(resp.status(), StatusCode::OK);
assert_eq!(
resp.headers().get(CONTENT_TYPE).unwrap(),
@ -394,8 +414,7 @@ mod tests {
assert_eq!(resp.status(), StatusCode::OK);
assert_eq!(resp.body().get_ref(), b"test");
let b = Bytes::from_static(b"test");
let resp: Response<Body> = b.into();
let resp: Response<_> = Bytes::from_static(b"test").into();
assert_eq!(resp.status(), StatusCode::OK);
assert_eq!(
resp.headers().get(CONTENT_TYPE).unwrap(),
@ -405,7 +424,7 @@ mod tests {
assert_eq!(resp.body().get_ref(), b"test");
let b = BytesMut::from("test");
let resp: Response<Body> = b.into();
let resp: Response<_> = b.into();
assert_eq!(resp.status(), StatusCode::OK);
assert_eq!(
resp.headers().get(CONTENT_TYPE).unwrap(),

View File

@ -2,6 +2,7 @@ use std::{
cell::{Ref, RefMut},
fmt,
future::Future,
mem,
pin::Pin,
str,
task::{Context, Poll},
@ -24,8 +25,7 @@ use crate::{
///
/// This type can be used to construct an instance of `Response` through a builder-like pattern.
pub struct ResponseBuilder {
head: Option<BoxedResponseHead>,
err: Option<HttpError>,
inner: Result<BoxedResponseHead, HttpError>,
}
impl ResponseBuilder {
@ -33,14 +33,13 @@ impl ResponseBuilder {
/// Create response builder
pub fn new(status: StatusCode) -> Self {
ResponseBuilder {
head: Some(BoxedResponseHead::new(status)),
err: None,
inner: Ok(BoxedResponseHead::new(status)),
}
}
/// Set HTTP status code of this response.
#[inline]
pub fn status(&mut self, status: StatusCode) -> &mut Self {
pub fn status(mut self, status: StatusCode) -> Self {
if let Some(parts) = self.inner() {
parts.status = status;
}
@ -58,7 +57,7 @@ impl ResponseBuilder {
/// .insert_header(("X-TEST", "value"))
/// .finish();
/// ```
pub fn insert_header<H>(&mut self, header: H) -> &mut Self
pub fn insert_header<H>(mut self, header: H) -> Self
where
H: IntoHeaderPair,
{
@ -67,7 +66,7 @@ impl ResponseBuilder {
Ok((key, value)) => {
parts.headers.insert(key, value);
}
Err(e) => self.err = Some(e.into()),
Err(err) => self.inner = Err(err.into()),
};
}
@ -86,14 +85,14 @@ impl ResponseBuilder {
/// .append_header(("X-TEST", "value2"))
/// .finish();
/// ```
pub fn append_header<H>(&mut self, header: H) -> &mut Self
pub fn append_header<H>(mut self, header: H) -> Self
where
H: IntoHeaderPair,
{
if let Some(parts) = self.inner() {
match header.try_into_header_pair() {
Ok((key, value)) => parts.headers.append(key, value),
Err(e) => self.err = Some(e.into()),
Err(err) => self.inner = Err(err.into()),
};
}
@ -102,7 +101,7 @@ impl ResponseBuilder {
/// Set the custom reason for the response.
#[inline]
pub fn reason(&mut self, reason: &'static str) -> &mut Self {
pub fn reason(mut self, reason: &'static str) -> Self {
if let Some(parts) = self.inner() {
parts.reason = Some(reason);
}
@ -111,7 +110,7 @@ impl ResponseBuilder {
/// Set connection type to KeepAlive
#[inline]
pub fn keep_alive(&mut self) -> &mut Self {
pub fn keep_alive(self) -> Self {
if let Some(parts) = self.inner() {
parts.set_connection_type(ConnectionType::KeepAlive);
}
@ -120,7 +119,7 @@ impl ResponseBuilder {
/// Set connection type to Upgrade
#[inline]
pub fn upgrade<V>(&mut self, value: V) -> &mut Self
pub fn upgrade<V>(mut self, value: V) -> Self
where
V: IntoHeaderValue,
{
@ -132,12 +131,12 @@ impl ResponseBuilder {
self.insert_header((header::UPGRADE, value));
}
self
res
}
/// Force close connection, even if it is marked as keep-alive
#[inline]
pub fn force_close(&mut self) -> &mut Self {
pub fn force_close(mut self) -> Self {
if let Some(parts) = self.inner() {
parts.set_connection_type(ConnectionType::Close);
}
@ -146,28 +145,29 @@ impl ResponseBuilder {
/// Disable chunked transfer encoding for HTTP/1.1 streaming responses.
#[inline]
pub fn no_chunking(&mut self, len: u64) -> &mut Self {
pub fn no_chunking(self, len: u64) -> Self {
let mut buf = itoa::Buffer::new();
self.insert_header((header::CONTENT_LENGTH, buf.format(len)));
let mut res = self.insert_header((header::CONTENT_LENGTH, buf.format(len)));
if let Some(parts) = self.inner() {
parts.no_chunking(true);
if let Some(head) = res.inner() {
head.no_chunking(true);
}
self
res
}
/// Set response content type.
#[inline]
pub fn content_type<V>(&mut self, value: V) -> &mut Self
pub fn content_type<V>(mut self, value: V) -> Self
where
V: IntoHeaderValue,
{
if let Some(parts) = self.inner() {
if let Some(head) = self.inner() {
match value.try_into_value() {
Ok(value) => {
parts.headers.insert(header::CONTENT_TYPE, value);
head.headers.insert(header::CONTENT_TYPE, value);
}
Err(e) => self.err = Some(e.into()),
Err(err) => self.inner = Err(err.into()),
};
}
self
@ -176,77 +176,74 @@ impl ResponseBuilder {
/// Responses extensions
#[inline]
pub fn extensions(&self) -> Ref<'_, Extensions> {
let head = self.head.as_ref().expect("cannot reuse response builder");
let head = self.inner.as_ref().expect("cannot reuse response builder");
head.extensions.borrow()
}
/// Mutable reference to a the response's extensions
#[inline]
pub fn extensions_mut(&mut self) -> RefMut<'_, Extensions> {
let head = self.head.as_ref().expect("cannot reuse response builder");
let head = self.inner.as_ref().expect("cannot reuse response builder");
head.extensions.borrow_mut()
}
/// Set a body and generate `Response`.
/// Creates an owned response builder, leaving a default-ish builder in it's place.
///
/// `ResponseBuilder` can not be used after this call.
#[inline]
pub fn body<B: Into<Body>>(&mut self, body: B) -> Response<Body> {
self.message_body(body.into())
/// Useful under the assumption the original builder will be dropped immediately.
///
/// If the builder contains an error, it will be passed to the new, owned builder.
pub fn take(&mut self) -> ResponseBuilder {
let res = BoxedResponseHead::new(StatusCode::INTERNAL_SERVER_ERROR);
let inner = mem::replace(&mut self.inner, Ok(res));
ResponseBuilder { inner }
}
/// Set a body and generate `Response`.
///
/// `ResponseBuilder` can not be used after this call.
pub fn message_body<B>(&mut self, body: B) -> Response<B> {
if let Some(e) = self.err.take() {
return Response::from(Error::from(e)).into_body();
}
let response = self.head.take().expect("cannot reuse response builder");
Response {
head: response,
body: ResponseBody::Body(body),
error: None,
fn with_body<B>(self, body: B) -> Result<Response<B>, Error> {
match self.inner {
Ok(head) => Ok(Response {
head,
body: ResponseBody::Body(body),
error: None,
}),
Err(err) => Err(Error::from(err)),
}
}
/// Set a streaming body and generate `Response`.
///
/// `ResponseBuilder` can not be used after this call.
/// Consume builder and generate response with given body.
#[inline]
pub fn streaming<S, E>(&mut self, stream: S) -> Response<Body>
pub fn body<B>(self, body: B) -> Result<Response<B>, Error> {
self.with_body(body.into())
}
/// Consume builder and generate response with given stream as body.
#[inline]
pub fn streaming<S, E>(self, stream: S) -> Result<Response<BodyStream<S>>, Error>
where
S: Stream<Item = Result<Bytes, E>> + Unpin + 'static,
S: Stream<Item = Result<Bytes, E>> + 'static,
E: Into<Error> + 'static,
{
self.body(Body::from_message(BodyStream::new(stream)))
self.body(BodyStream::new(stream))
}
/// Set an empty body and generate `Response`
///
/// `ResponseBuilder` can not be used after this call.
/// Consume builder and generate response with empty body.
#[inline]
pub fn finish(&mut self) -> Response<Body> {
pub fn finish(self) -> Result<Response<Body>, Error> {
self.body(Body::Empty)
}
/// This method construct new `ResponseBuilder`
pub fn take(&mut self) -> ResponseBuilder {
ResponseBuilder {
head: self.head.take(),
err: self.err.take(),
}
/// Consume builder and generate response with empty body, converting errors into responses.
#[inline]
pub fn complete(self) -> Response<Body> {
self.body(Body::Empty).into()
}
/// Access to contained response when there is no error.
fn inner(&mut self) -> Option<&mut ResponseHead> {
if self.err.is_some() {
return None;
}
self.head.as_mut().map(|r| &mut **r)
self.inner.as_mut().ok().map(|head| &mut **head)
}
}
@ -254,8 +251,7 @@ impl ResponseBuilder {
impl<B> From<Response<B>> for ResponseBuilder {
fn from(res: Response<B>) -> ResponseBuilder {
ResponseBuilder {
head: Some(res.head),
err: None,
inner: Ok(res.head),
}
}
}
@ -273,10 +269,7 @@ impl<'a> From<&'a ResponseHead> for ResponseBuilder {
msg.no_chunking(!head.chunked());
ResponseBuilder {
head: Some(msg),
err: None,
}
ResponseBuilder { inner: Ok(msg) }
}
}
@ -284,13 +277,13 @@ impl Future for ResponseBuilder {
type Output = Result<Response<Body>, Error>;
fn poll(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Self::Output> {
Poll::Ready(Ok(self.finish()))
Poll::Ready(self.take().finish())
}
}
impl fmt::Debug for ResponseBuilder {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let head = self.head.as_ref().unwrap();
let head = self.inner.as_ref().unwrap();
let res = writeln!(
f,
@ -317,7 +310,7 @@ mod tests {
fn test_basic_builder() {
let resp = Response::builder(StatusCode::OK)
.insert_header(("X-TEST", "value"))
.finish();
.complete();
assert_eq!(resp.status(), StatusCode::OK);
}
@ -325,7 +318,7 @@ mod tests {
fn test_upgrade() {
let resp = Response::builder(StatusCode::OK)
.upgrade("websocket")
.finish();
.complete();
assert!(resp.upgrade());
assert_eq!(
resp.headers().get(header::UPGRADE).unwrap(),
@ -335,7 +328,10 @@ mod tests {
#[test]
fn test_force_close() {
let resp = Response::builder(StatusCode::OK).force_close().finish();
let resp = Response::builder(StatusCode::OK)
.force_close()
.finish()
.unwrap();
assert!(!resp.keep_alive())
}
@ -343,13 +339,14 @@ mod tests {
fn test_content_type() {
let resp = Response::builder(StatusCode::OK)
.content_type("text/plain")
.body(Body::Empty);
.body(Body::Empty)
.unwrap();
assert_eq!(resp.headers().get(CONTENT_TYPE).unwrap(), "text/plain")
}
#[test]
fn test_into_builder() {
let mut resp: Response<Body> = "test".into();
let mut resp: Response<_> = "test".into();
assert_eq!(resp.status(), StatusCode::OK);
resp.headers_mut().insert(
@ -358,7 +355,7 @@ mod tests {
);
let mut builder: ResponseBuilder = resp.into();
let resp = builder.status(StatusCode::BAD_REQUEST).finish();
let resp = builder.status(StatusCode::BAD_REQUEST).finish().unwrap();
assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
let cookie = resp.headers().get_all("Cookie").next().unwrap();
@ -367,9 +364,11 @@ mod tests {
#[test]
fn response_builder_header_insert_kv() {
let mut res = Response::builder(StatusCode::OK);
res.insert_header(("Content-Type", "application/octet-stream"));
let res = res.finish();
let res = Response::builder(StatusCode::OK)
.insert_header(("Content-Type", "application/octet-stream"))
.take()
.finish()
.unwrap();
assert_eq!(
res.headers().get("Content-Type"),
@ -379,9 +378,11 @@ mod tests {
#[test]
fn response_builder_header_insert_typed() {
let mut res = Response::builder(StatusCode::OK);
res.insert_header((header::CONTENT_TYPE, mime::APPLICATION_OCTET_STREAM));
let res = res.finish();
let res = Response::builder(StatusCode::OK)
.insert_header((header::CONTENT_TYPE, mime::APPLICATION_OCTET_STREAM))
.take()
.finish()
.unwrap();
assert_eq!(
res.headers().get("Content-Type"),
@ -391,10 +392,12 @@ mod tests {
#[test]
fn response_builder_header_append_kv() {
let mut res = Response::builder(StatusCode::OK);
res.append_header(("Content-Type", "application/octet-stream"));
res.append_header(("Content-Type", "application/json"));
let res = res.finish();
let mut res = Response::builder(StatusCode::OK)
.append_header(("Content-Type", "application/octet-stream"))
.append_header(("Content-Type", "application/json"))
.take()
.finish()
.unwrap();
let headers: Vec<_> = res.headers().get_all("Content-Type").cloned().collect();
assert_eq!(headers.len(), 2);
@ -407,7 +410,7 @@ mod tests {
let mut res = Response::builder(StatusCode::OK);
res.append_header((header::CONTENT_TYPE, mime::APPLICATION_OCTET_STREAM));
res.append_header((header::CONTENT_TYPE, mime::APPLICATION_JSON));
let res = res.finish();
let res = res.finish().unwrap();
let headers: Vec<_> = res.headers().get_all("Content-Type").cloned().collect();
assert_eq!(headers.len(), 2);

View File

@ -103,38 +103,50 @@ impl ResponseError for HandshakeError {
match self {
HandshakeError::GetMethodRequired => {
Response::builder(StatusCode::METHOD_NOT_ALLOWED)
.insert_header((header::ALLOW, "GET"))
.finish()
.insert_header((header::ALLOW, "GET"))
.take()
.body(Body::Empty)
.unwrap()
}
HandshakeError::NoWebsocketUpgrade => {
Response::builder(StatusCode::BAD_REQUEST)
.reason("No WebSocket Upgrade header found")
.finish()
.reason("No WebSocket Upgrade header found")
.take()
.body(Body::Empty)
.unwrap()
}
HandshakeError::NoConnectionUpgrade => {
Response::builder(StatusCode::BAD_REQUEST)
.reason("No Connection upgrade")
.finish()
.reason("No Connection upgrade")
.take()
.body(Body::Empty)
.unwrap()
}
HandshakeError::NoVersionHeader => {
Response::builder(StatusCode::BAD_REQUEST)
.reason("WebSocket version header is required")
.finish()
.reason("WebSocket version header is required")
.take()
.body(Body::Empty)
.unwrap()
}
HandshakeError::UnsupportedVersion => {
Response::builder(StatusCode::BAD_REQUEST)
.reason("Unsupported WebSocket version")
.finish()
.reason("Unsupported WebSocket version")
.take()
.body(Body::Empty)
.unwrap()
}
HandshakeError::BadWebsocketKey => {
Response::builder(StatusCode::BAD_REQUEST)
.reason("Handshake error")
.finish()
.take()
.body(Body::Empty)
.unwrap()
}
}
}
@ -326,7 +338,7 @@ mod tests {
.finish();
assert_eq!(
StatusCode::SWITCHING_PROTOCOLS,
handshake_response(req.head()).finish().status()
handshake_response(req.head()).finish().unwrap().status()
);
}

View File

@ -205,7 +205,7 @@ async fn test_h2_headers() {
TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST ",
));
}
ok::<_, ()>(builder.body(data.clone()))
ready(builder.body(data.clone()))
})
.openssl(tls_config())
.map_err(|_| ())
@ -331,8 +331,9 @@ async fn test_h2_body_length() {
HttpService::build()
.h2(|_| {
let body = once(ok(Bytes::from_static(STR.as_ref())));
ok::<_, ()>(
ready(
Response::builder(StatusCode::OK)
.take()
.body(SizedStream::new(STR.len() as u64, body)),
)
})
@ -355,9 +356,10 @@ async fn test_h2_body_chunked_explicit() {
HttpService::build()
.h2(|_| {
let body = once(ok::<_, Error>(Bytes::from_static(STR.as_ref())));
ok::<_, ()>(
ready(
Response::builder(StatusCode::OK)
.insert_header((header::TRANSFER_ENCODING, "chunked"))
.take()
.streaming(body),
)
})
@ -383,9 +385,10 @@ async fn test_h2_response_http_error_handling() {
HttpService::build()
.h2(fn_service(|_| {
let broken_header = Bytes::from_static(b"\0\0\0");
ok::<_, ()>(
ready(
Response::builder(StatusCode::OK)
.insert_header((header::CONTENT_TYPE, broken_header))
.take()
.body(STR),
)
}))

View File

@ -13,7 +13,7 @@ use actix_http::{
};
use actix_http_test::test_server;
use actix_service::{fn_factory_with_config, fn_service};
use actix_utils::future::{err, ok};
use actix_utils::future::{err, ok, ready};
use bytes::{Bytes, BytesMut};
use futures_core::Stream;
@ -218,9 +218,9 @@ async fn test_h2_headers() {
TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST ",
));
}
ok::<_, ()>(config.body(data.clone()))
ready(config.body(data.clone()))
})
.rustls(tls_config())
.rustls(tls_config())
}).await;
let response = srv.sget("/").send().await.unwrap();
@ -370,9 +370,10 @@ async fn test_h2_body_chunked_explicit() {
HttpService::build()
.h2(|_| {
let body = once(ok::<_, Error>(Bytes::from_static(STR.as_ref())));
ok::<_, ()>(
ready(
Response::builder(StatusCode::OK)
.insert_header((header::TRANSFER_ENCODING, "chunked"))
.take()
.streaming(body),
)
})
@ -398,9 +399,10 @@ async fn test_h2_response_http_error_handling() {
.h2(fn_factory_with_config(|_: ()| {
ok::<_, ()>(fn_service(|_| {
let broken_header = Bytes::from_static(b"\0\0\0");
ok::<_, ()>(
ready(
Response::builder(StatusCode::OK)
.insert_header((http::header::CONTENT_TYPE, broken_header))
.take()
.body(STR),
)
}))

View File

@ -416,7 +416,7 @@ async fn test_h1_headers() {
TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST ",
));
}
ok::<_, ()>(builder.body(data.clone()))
ok::<_, ()>(builder.body(data.clone()).unwrap())
}).tcp()
}).await;
@ -566,9 +566,10 @@ async fn test_h1_body_chunked_explicit() {
HttpService::build()
.h1(|_| {
let body = once(ok::<_, Error>(Bytes::from_static(STR.as_ref())));
ok::<_, ()>(
ready(
Response::builder(StatusCode::OK)
.insert_header((header::TRANSFER_ENCODING, "chunked"))
.take()
.streaming(body),
)
})
@ -601,7 +602,7 @@ async fn test_h1_body_chunked_implicit() {
HttpService::build()
.h1(|_| {
let body = once(ok::<_, Error>(Bytes::from_static(STR.as_ref())));
ok::<_, ()>(Response::builder(StatusCode::OK).streaming(body))
ready(Response::builder(StatusCode::OK).streaming(body))
})
.tcp()
})
@ -630,9 +631,10 @@ async fn test_h1_response_http_error_handling() {
HttpService::build()
.h1(fn_service(|_| {
let broken_header = Bytes::from_static(b"\0\0\0");
ok::<_, ()>(
ready(
Response::builder(StatusCode::OK)
.insert_header((http::header::CONTENT_TYPE, broken_header))
.take()
.body(STR),
)
}))

View File

@ -52,7 +52,11 @@ where
fn call(&self, (req, mut framed): (Request, Framed<T, h1::Codec>)) -> Self::Future {
let fut = async move {
let res = ws::handshake(req.head()).unwrap().message_body(());
let res = ws::handshake(req.head())
.unwrap()
.finish()
.unwrap()
.set_body(());
framed
.send((res, body::BodySize::None).into())

View File

@ -82,8 +82,8 @@ impl Responder for HttpResponseBuilder {
impl Responder for actix_http::ResponseBuilder {
#[inline]
fn respond_to(mut self, _: &HttpRequest) -> HttpResponse {
HttpResponse::from(self.finish())
fn respond_to(self, _: &HttpRequest) -> HttpResponse {
HttpResponse::from(self.complete())
}
}