diff --git a/actix-files/src/service.rs b/actix-files/src/service.rs index dc51ada18..31e1434bd 100644 --- a/actix-files/src/service.rs +++ b/actix-files/src/service.rs @@ -96,8 +96,7 @@ impl Service for FilesService { return Box::pin(ok(req.into_response( HttpResponse::Found() .insert_header((header::LOCATION, redirect_to)) - .body("") - .into_body(), + .finish(), ))); } diff --git a/actix-http/src/h1/dispatcher.rs b/actix-http/src/h1/dispatcher.rs index 7ab89ba87..4f66a4a3b 100644 --- a/actix-http/src/h1/dispatcher.rs +++ b/actix-http/src/h1/dispatcher.rs @@ -17,7 +17,7 @@ use futures_core::ready; use log::{error, trace}; use pin_project::pin_project; -use crate::body::{Body, BodySize, MessageBody, ResponseBody}; +use crate::body::{Body, BodySize, MessageBody}; use crate::config::ServiceConfig; use crate::error::{DispatchError, Error}; use crate::error::{ParseError, PayloadError}; @@ -141,7 +141,8 @@ where None, ExpectCall(#[pin] X::Future), ServiceCall(#[pin] S::Future), - SendPayload(#[pin] ResponseBody), + SendPayload(#[pin] B), + SendErrorPayload(#[pin] Body), } impl State @@ -298,7 +299,7 @@ where fn send_response( self: Pin<&mut Self>, message: Response<()>, - body: ResponseBody, + body: B, ) -> Result<(), DispatchError> { let size = body.size(); let mut this = self.project(); @@ -319,6 +320,32 @@ where Ok(()) } + fn send_response_any_body( + self: Pin<&mut Self>, + message: Response<()>, + body: Body, + ) -> Result<(), DispatchError> { + // TODO: de-dupe impl with send_response + + let size = body.size(); + let mut this = self.project(); + this.codec + .encode(Message::Item((message, size)), &mut this.write_buf) + .map_err(|err| { + if let Some(mut payload) = this.payload.take() { + payload.set_error(PayloadError::Incomplete(None)); + } + DispatchError::Io(err) + })?; + + this.flags.set(Flags::KEEPALIVE, this.codec.keepalive()); + match size { + BodySize::None | BodySize::Empty => this.state.set(State::None), + _ => this.state.set(State::SendErrorPayload(body)), + }; + Ok(()) + } + fn send_continue(self: Pin<&mut Self>) { self.project() .write_buf @@ -353,8 +380,7 @@ where // send_response would update InnerDispatcher state to SendPayload or // None(If response body is empty). // continue loop to poll it. - self.as_mut() - .send_response(res, ResponseBody::Other(Body::Empty))?; + self.as_mut().send_response_any_body(res, Body::Empty)?; } // return with upgrade request and poll it exclusively. @@ -376,7 +402,7 @@ where Poll::Ready(Err(err)) => { let res = Response::from_error(err.into()); let (res, body) = res.replace_body(()); - self.as_mut().send_response(res, body.into_body())?; + self.as_mut().send_response_any_body(res, body)?; } // service call pending and could be waiting for more chunk messages. @@ -392,6 +418,41 @@ where }, StateProj::SendPayload(mut stream) => { + // keep populate writer buffer until buffer size limit hit, + // get blocked or finished. + while this.write_buf.len() < super::payload::MAX_BUFFER_SIZE { + match stream.as_mut().poll_next(cx) { + Poll::Ready(Some(Ok(item))) => { + this.codec.encode( + Message::Chunk(Some(item)), + &mut this.write_buf, + )?; + } + + Poll::Ready(None) => { + this.codec + .encode(Message::Chunk(None), &mut this.write_buf)?; + // payload stream finished. + // set state to None and handle next message + this.state.set(State::None); + continue 'res; + } + + Poll::Ready(Some(Err(err))) => { + return Err(DispatchError::Service(err.into())) + } + + Poll::Pending => return Ok(PollResponse::DoNothing), + } + } + // buffer is beyond max size. + // return and try to write the whole buffer to io stream. + return Ok(PollResponse::DrainWriteBuf); + } + + StateProj::SendErrorPayload(mut stream) => { + // TODO: de-dupe impl with SendPayload + // keep populate writer buffer until buffer size limit hit, // get blocked or finished. while this.write_buf.len() < super::payload::MAX_BUFFER_SIZE { @@ -433,12 +494,14 @@ where let fut = this.flow.service.call(req); this.state.set(State::ServiceCall(fut)); } + // send expect error as response Poll::Ready(Err(err)) => { let res = Response::from_error(err.into()); let (res, body) = res.replace_body(()); - self.as_mut().send_response(res, body.into_body())?; + self.as_mut().send_response_any_body(res, body)?; } + // expect must be solved before progress can be made. Poll::Pending => return Ok(PollResponse::DoNothing), }, @@ -486,7 +549,7 @@ where Poll::Ready(Err(err)) => { let res = Response::from_error(err.into()); let (res, body) = res.replace_body(()); - return self.send_response(res, body.into_body()); + return self.send_response_any_body(res, body); } } } @@ -506,7 +569,7 @@ where Poll::Ready(Err(err)) => { let res = Response::from_error(err.into()); let (res, body) = res.replace_body(()); - self.send_response(res, body.into_body()) + self.send_response_any_body(res, body) } }; } @@ -706,10 +769,10 @@ where } else { // timeout on first request (slow request) return 408 trace!("Slow request timeout"); - let _ = self.as_mut().send_response( + let _ = self.as_mut().send_response_any_body( Response::new(StatusCode::REQUEST_TIMEOUT) .drop_body(), - ResponseBody::Other(Body::Empty), + Body::Empty, ); this = self.project(); this.flags.insert(Flags::STARTED | Flags::SHUTDOWN); diff --git a/actix-http/src/h1/utils.rs b/actix-http/src/h1/utils.rs index 73f01e913..90e44daa4 100644 --- a/actix-http/src/h1/utils.rs +++ b/actix-http/src/h1/utils.rs @@ -4,7 +4,7 @@ use std::task::{Context, Poll}; use actix_codec::{AsyncRead, AsyncWrite, Framed}; -use crate::body::{BodySize, MessageBody, ResponseBody}; +use crate::body::{BodySize, MessageBody}; use crate::error::Error; use crate::h1::{Codec, Message}; use crate::response::Response; @@ -14,7 +14,7 @@ use crate::response::Response; pub struct SendResponse { res: Option, BodySize)>>, #[pin] - body: Option>, + body: Option, #[pin] framed: Option>, } @@ -62,7 +62,18 @@ where .unwrap() .is_write_buf_full() { - match this.body.as_mut().as_pin_mut().unwrap().poll_next(cx)? { + let next = + // TODO: MSRV 1.51: poll_map_err + match this.body.as_mut().as_pin_mut().unwrap().poll_next(cx) { + Poll::Ready(Some(Ok(item))) => Poll::Ready(Some(item)), + Poll::Ready(Some(Err(err))) => { + return Poll::Ready(Err(err.into())) + } + Poll::Ready(None) => Poll::Ready(None), + Poll::Pending => Poll::Pending, + }; + + match next { Poll::Ready(item) => { // body is done when item is None body_done = item.is_none(); diff --git a/actix-http/src/h2/dispatcher.rs b/actix-http/src/h2/dispatcher.rs index 07636470b..4a17f4f1a 100644 --- a/actix-http/src/h2/dispatcher.rs +++ b/actix-http/src/h2/dispatcher.rs @@ -12,7 +12,7 @@ use h2::{ use http::header::{HeaderValue, CONNECTION, CONTENT_LENGTH, DATE, TRANSFER_ENCODING}; use log::{error, trace}; -use crate::body::{BodySize, MessageBody, ResponseBody}; +use crate::body::{Body, BodySize, MessageBody}; use crate::config::ServiceConfig; use crate::error::{DispatchError, Error}; use crate::message::ResponseHead; @@ -135,7 +135,8 @@ struct ServiceResponse { #[pin_project::pin_project(project = ServiceResponseStateProj)] enum ServiceResponseState { ServiceCall(#[pin] F, Option>), - SendPayload(SendStream, #[pin] ResponseBody), + SendPayload(SendStream, #[pin] B), + SendErrorPayload(SendStream, #[pin] Body), } impl ServiceResponse @@ -280,9 +281,8 @@ where if size.is_eof() { Poll::Ready(()) } else { - this.state.set(ServiceResponseState::SendPayload( - stream, - body.into_body(), + this.state.set(ServiceResponseState::SendErrorPayload( + stream, body, )); self.poll(cx) } @@ -331,8 +331,65 @@ where *this.buffer = Some(chunk); } + Some(Err(err)) => { + error!( + "Response payload stream error: {:?}", + err.into() + ); + + return Poll::Ready(()); + } + }, + } + } + } + + ServiceResponseStateProj::SendErrorPayload(ref mut stream, ref mut body) => { + // TODO: de-dupe impl with SendPayload + + loop { + match this.buffer { + Some(ref mut buffer) => match ready!(stream.poll_capacity(cx)) { + None => return Poll::Ready(()), + + Some(Ok(cap)) => { + let len = buffer.len(); + let bytes = buffer.split_to(cmp::min(cap, len)); + + if let Err(e) = stream.send_data(bytes, false) { + warn!("{:?}", e); + return Poll::Ready(()); + } else if !buffer.is_empty() { + let cap = cmp::min(buffer.len(), CHUNK_SIZE); + stream.reserve_capacity(cap); + } else { + this.buffer.take(); + } + } + Some(Err(e)) => { - error!("Response payload stream error: {:?}", e); + warn!("{:?}", e); + return Poll::Ready(()); + } + }, + + None => match ready!(body.as_mut().poll_next(cx)) { + None => { + if let Err(e) = stream.send_data(Bytes::new(), true) { + warn!("{:?}", e); + } + return Poll::Ready(()); + } + + Some(Ok(chunk)) => { + stream + .reserve_capacity(cmp::min(chunk.len(), CHUNK_SIZE)); + *this.buffer = Some(chunk); + } + + Some(Err(err)) => { + error!("Response payload stream error: {:?}", err); + return Poll::Ready(()); } }, diff --git a/actix-http/src/message.rs b/actix-http/src/message.rs index 8cb99d43a..3659213ce 100644 --- a/actix-http/src/message.rs +++ b/actix-http/src/message.rs @@ -389,12 +389,6 @@ impl BoxedResponseHead { pub fn new(status: StatusCode) -> Self { RESPONSE_POOL.with(|p| p.get_message(status)) } - - pub(crate) fn take(&mut self) -> Self { - BoxedResponseHead { - head: self.head.take(), - } - } } impl std::ops::Deref for BoxedResponseHead { diff --git a/actix-http/src/response.rs b/actix-http/src/response.rs index da5c7e000..ac49c9d13 100644 --- a/actix-http/src/response.rs +++ b/actix-http/src/response.rs @@ -2,17 +2,13 @@ use std::{ cell::{Ref, RefMut}, - fmt, - future::Future, - pin::Pin, - str, - task::{Context, Poll}, + fmt, str, }; use bytes::{Bytes, BytesMut}; use crate::{ - body::{Body, MessageBody, ResponseBody}, + body::{Body, MessageBody}, error::Error, extensions::Extensions, http::{HeaderMap, StatusCode}, @@ -23,7 +19,7 @@ use crate::{ /// An HTTP response. pub struct Response { pub(crate) head: BoxedResponseHead, - pub(crate) body: ResponseBody, + pub(crate) body: B, pub(crate) error: Option, } @@ -33,7 +29,7 @@ impl Response { pub fn new(status: StatusCode) -> Response { Response { head: BoxedResponseHead::new(status), - body: ResponseBody::Body(Body::Empty), + body: Body::Empty, error: None, } } @@ -84,18 +80,18 @@ impl Response { resp } - /// Convert response to response with body - pub fn into_body(self) -> Response { - let b = match self.body { - ResponseBody::Body(b) => b, - ResponseBody::Other(b) => b, - }; - Response { - head: self.head, - error: self.error, - body: ResponseBody::Other(b), - } - } + // /// Convert response to response with body + // pub fn into_body(self) -> Response { + // let b = match self.body { + // ResponseBody::Body(b) => b, + // ResponseBody::Other(b) => b, + // }; + // Response { + // head: self.head, + // error: self.error, + // body: ResponseBody::Other(b), + // } + // } } impl Response { @@ -104,7 +100,7 @@ impl Response { pub fn with_body(status: StatusCode, body: B) -> Response { Response { head: BoxedResponseHead::new(status), - body: ResponseBody::Body(body), + body, error: None, } } @@ -176,7 +172,7 @@ impl Response { /// Get body of this response #[inline] - pub fn body(&self) -> &ResponseBody { + pub fn body(&self) -> &B { &self.body } @@ -184,17 +180,17 @@ impl Response { pub fn set_body(self, body: B2) -> Response { Response { head: self.head, - body: ResponseBody::Body(body), + body, error: None, } } /// Split response and body - pub fn into_parts(self) -> (Response<()>, ResponseBody) { + pub fn into_parts(self) -> (Response<()>, B) { ( Response { head: self.head, - body: ResponseBody::Body(()), + body: (), error: self.error, }, self.body, @@ -205,17 +201,17 @@ impl Response { pub fn drop_body(self) -> Response<()> { Response { head: self.head, - body: ResponseBody::Body(()), + body: (), error: None, } } /// Set a body and return previous body value - pub(crate) fn replace_body(self, body: B2) -> (Response, ResponseBody) { + pub(crate) fn replace_body(self, body: B2) -> (Response, B) { ( Response { head: self.head, - body: ResponseBody::Body(body), + body, error: self.error, }, self.body, @@ -225,7 +221,7 @@ impl Response { /// Set a body and return previous body value pub fn map_body(mut self, f: F) -> Response where - F: FnOnce(&mut ResponseHead, ResponseBody) -> ResponseBody, + F: FnOnce(&mut ResponseHead, B) -> B2, { let body = f(&mut self.head, self.body); @@ -236,9 +232,14 @@ impl Response { } } + // /// Extract response body + // pub fn take_body(&mut self) -> ResponseBody { + // self.body.take_body() + // } + /// Extract response body - pub fn take_body(&mut self) -> ResponseBody { - self.body.take_body() + pub fn into_body(self) -> B { + self.body } } @@ -264,17 +265,18 @@ where } } -impl Future for Response { - type Output = Result, Error>; +// TODO: document why this is needed +// impl Future for Response { +// type Output = Result, Infallible>; - fn poll(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll { - Poll::Ready(Ok(Response { - head: self.head.take(), - body: self.body.take_body(), - error: self.error.take(), - })) - } -} +// fn poll(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll { +// Poll::Ready(Ok(Response { +// head: self.head.take(), +// body: self.body.take_body(), +// error: self.error.take(), +// })) +// } +// } /// Helper converters impl>, E: Into> From> for Response { diff --git a/actix-http/src/response_builder.rs b/actix-http/src/response_builder.rs index 0105f70cf..5d94f2eb7 100644 --- a/actix-http/src/response_builder.rs +++ b/actix-http/src/response_builder.rs @@ -13,7 +13,7 @@ use bytes::Bytes; use futures_core::Stream; use crate::{ - body::{Body, BodyStream, ResponseBody}, + body::{Body, BodyStream}, error::{Error, HttpError}, header::{self, IntoHeaderPair, IntoHeaderValue}, message::{BoxedResponseHead, ConnectionType, ResponseHead}, @@ -242,15 +242,16 @@ impl ResponseBuilder { /// /// This `ResponseBuilder` will be left in a useless state. pub fn message_body(&mut self, body: B) -> Response { - if let Some(e) = self.err.take() { - return Response::from(Error::from(e)).into_body(); - } + // TODO: put error handling back somehow + // if let Some(e) = self.err.take() { + // return Response::from(Error::from(e)).into_body(); + // } let response = self.head.take().expect("cannot reuse response builder"); Response { head: response, - body: ResponseBody::Body(body), + body, error: None, } } diff --git a/src/middleware/compat.rs b/src/middleware/compat.rs index 3a85591da..4f2f2a504 100644 --- a/src/middleware/compat.rs +++ b/src/middleware/compat.rs @@ -1,12 +1,13 @@ //! For middleware documentation, see [`Compat`]. use std::{ + error::Error as StdError, future::Future, pin::Pin, task::{Context, Poll}, }; -use actix_http::body::{Body, MessageBody, ResponseBody}; +use actix_http::body::{Body, MessageBody}; use actix_service::{Service, Transform}; use futures_core::{future::LocalBoxFuture, ready}; @@ -116,10 +117,10 @@ pub trait MapServiceResponseBody { impl MapServiceResponseBody for ServiceResponse where B: MessageBody + Unpin + 'static, - B::Error: Into, + B::Error: Into>, { fn map_body(self) -> ServiceResponse { - self.map_body(|_, body| ResponseBody::Other(Body::from_message(body))) + self.map_body(|_, body| Body::from_message(body)) } } diff --git a/src/middleware/compress.rs b/src/middleware/compress.rs index 6a56e6de0..f8514c7cc 100644 --- a/src/middleware/compress.rs +++ b/src/middleware/compress.rs @@ -10,7 +10,7 @@ use std::{ }; use actix_http::{ - body::MessageBody, + body::{MessageBody, ResponseBody}, encoding::Encoder, http::header::{ContentEncoding, ACCEPT_ENCODING}, Error, @@ -59,7 +59,7 @@ where B: MessageBody, S: Service, Error = Error>, { - type Response = ServiceResponse>; + type Response = ServiceResponse>>; type Error = Error; type Transform = CompressMiddleware; type InitError = (); @@ -83,7 +83,7 @@ where B: MessageBody, S: Service, Error = Error>, { - type Response = ServiceResponse>; + type Response = ServiceResponse>>; type Error = Error; type Future = CompressResponse; @@ -127,7 +127,7 @@ where B: MessageBody, S: Service, Error = Error>, { - type Output = Result>, Error>; + type Output = Result>>, Error>; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = self.project(); @@ -140,9 +140,9 @@ where *this.encoding }; - Poll::Ready(Ok( - resp.map_body(move |head, body| Encoder::response(enc, head, body)) - )) + Poll::Ready(Ok(resp.map_body(move |head, body| { + Encoder::response(enc, head, ResponseBody::Body(body)) + }))) } Err(e) => Poll::Ready(Err(e)), } diff --git a/src/middleware/logger.rs b/src/middleware/logger.rs index 8a60d6c70..bbb0e3dc4 100644 --- a/src/middleware/logger.rs +++ b/src/middleware/logger.rs @@ -21,7 +21,7 @@ use regex::{Regex, RegexSet}; use time::OffsetDateTime; use crate::{ - dev::{BodySize, MessageBody, ResponseBody}, + dev::{BodySize, MessageBody}, http::{HeaderName, StatusCode}, service::{ServiceRequest, ServiceResponse}, Error, HttpResponse, Result, @@ -289,13 +289,11 @@ where let time = *this.time; let format = this.format.take(); - Poll::Ready(Ok(res.map_body(move |_, body| { - ResponseBody::Body(StreamLog { - body, - time, - format, - size: 0, - }) + Poll::Ready(Ok(res.map_body(move |_, body| StreamLog { + body, + time, + format, + size: 0, }))) } } @@ -305,7 +303,7 @@ use pin_project::{pin_project, pinned_drop}; #[pin_project(PinnedDrop)] pub struct StreamLog { #[pin] - body: ResponseBody, + body: B, format: Option, size: usize, time: OffsetDateTime, @@ -342,12 +340,15 @@ where cx: &mut Context<'_>, ) -> Poll>> { let this = self.project(); - match this.body.poll_next(cx) { - Poll::Ready(Some(Ok(chunk))) => { + + // TODO: MSRV 1.51: poll_map_err + match ready!(this.body.poll_next(cx)) { + Some(Ok(chunk)) => { *this.size += chunk.len(); Poll::Ready(Some(Ok(chunk))) } - val => val, + Some(Err(err)) => Poll::Ready(Some(Err(err.into()))), + None => Poll::Ready(None), } } } diff --git a/src/response/builder.rs b/src/response/builder.rs index 8b3c0f10d..7afffb2cf 100644 --- a/src/response/builder.rs +++ b/src/response/builder.rs @@ -318,9 +318,9 @@ impl HttpResponseBuilder { /// /// `HttpResponseBuilder` can not be used after this call. pub fn message_body(&mut self, body: B) -> HttpResponse { - if let Some(err) = self.err.take() { - return HttpResponse::from_error(Error::from(err)).into_body(); - } + // if let Some(err) = self.err.take() { + // return HttpResponse::from_error(Error::from(err)).into_body(); + // } let res = self .res @@ -336,7 +336,8 @@ impl HttpResponseBuilder { for cookie in jar.delta() { match HeaderValue::from_str(&cookie.to_string()) { Ok(val) => res.headers_mut().append(header::SET_COOKIE, val), - Err(err) => return HttpResponse::from_error(Error::from(err)).into_body(), + Err(err) => res.error = Some(err.into()), + // Err(err) => return HttpResponse::from_error(Error::from(err)).into_body(), }; } } diff --git a/src/response/response.rs b/src/response/response.rs index 6e09a0136..5d129ba67 100644 --- a/src/response/response.rs +++ b/src/response/response.rs @@ -8,7 +8,7 @@ use std::{ }; use actix_http::{ - body::{Body, MessageBody, ResponseBody}, + body::{Body, MessageBody}, http::{header::HeaderMap, StatusCode}, Extensions, Response, ResponseHead, }; @@ -27,7 +27,7 @@ use crate::{error::Error, HttpResponseBuilder}; /// An HTTP Response pub struct HttpResponse { res: Response, - error: Option, + pub(crate) error: Option, } impl HttpResponse { @@ -57,13 +57,13 @@ impl HttpResponse { } } - /// Convert response to response with body - pub fn into_body(self) -> HttpResponse { - HttpResponse { - res: self.res.into_body(), - error: self.error, - } - } + // /// Convert response to response with body + // pub fn into_body(self) -> HttpResponse { + // HttpResponse { + // res: self.res.into_body(), + // error: self.error, + // } + // } } impl HttpResponse { @@ -192,7 +192,7 @@ impl HttpResponse { /// Get body of this response #[inline] - pub fn body(&self) -> &ResponseBody { + pub fn body(&self) -> &B { self.res.body() } @@ -206,7 +206,7 @@ impl HttpResponse { } /// Split response and body - pub fn into_parts(self) -> (HttpResponse<()>, ResponseBody) { + pub fn into_parts(self) -> (HttpResponse<()>, B) { let (head, body) = self.res.into_parts(); ( @@ -229,7 +229,7 @@ impl HttpResponse { /// Set a body and return previous body value pub fn map_body(self, f: F) -> HttpResponse where - F: FnOnce(&mut ResponseHead, ResponseBody) -> ResponseBody, + F: FnOnce(&mut ResponseHead, B) -> B2, { HttpResponse { res: self.res.map_body(f), @@ -237,9 +237,14 @@ impl HttpResponse { } } + // /// Extract response body + // pub fn take_body(&mut self) -> ResponseBody { + // self.res.take_body() + // } + /// Extract response body - pub fn take_body(&mut self) -> ResponseBody { - self.res.take_body() + pub fn into_body(self) -> B { + self.res.into_body() } } @@ -286,9 +291,9 @@ impl Future for HttpResponse { type Output = Result, Error>; fn poll(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll { - if let Some(err) = self.error.take() { - return Poll::Ready(Ok(Response::from_error(err).into_body())); - } + // if let Some(err) = self.error.take() { + // return Poll::Ready(Ok(Response::from_error(err).into_body())); + // } Poll::Ready(Ok(mem::replace( &mut self.res, diff --git a/src/service.rs b/src/service.rs index 0c03f84ad..23b772574 100644 --- a/src/service.rs +++ b/src/service.rs @@ -2,7 +2,7 @@ use std::cell::{Ref, RefMut}; use std::rc::Rc; use std::{fmt, net}; -use actix_http::body::{Body, MessageBody, ResponseBody}; +use actix_http::body::{Body, MessageBody}; use actix_http::http::{HeaderMap, Method, StatusCode, Uri, Version}; use actix_http::{ Error, Extensions, HttpMessage, Payload, PayloadStream, RequestHead, Response, ResponseHead, @@ -110,9 +110,9 @@ impl ServiceRequest { /// Create service response for error #[inline] - pub fn error_response>(self, err: E) -> ServiceResponse { + pub fn error_response>(self, err: E) -> ServiceResponse { let res = HttpResponse::from_error(err.into()); - ServiceResponse::new(self.req, res.into_body()) + ServiceResponse::new(self.req, res) } /// This method returns reference to the request head @@ -335,22 +335,24 @@ pub struct ServiceResponse { response: HttpResponse, } +impl ServiceResponse { + /// Create service response from the error + pub fn from_err>(err: E, request: HttpRequest) -> Self { + let response = HttpResponse::from_error(err.into()); + ServiceResponse { request, response } + } +} + impl ServiceResponse { /// Create service response instance pub fn new(request: HttpRequest, response: HttpResponse) -> Self { ServiceResponse { request, response } } - /// Create service response from the error - pub fn from_err>(err: E, request: HttpRequest) -> Self { - let response = HttpResponse::from_error(err.into()).into_body(); - ServiceResponse { request, response } - } - /// Create service response for error #[inline] - pub fn error_response>(self, err: E) -> Self { - Self::from_err(err, self.request) + pub fn error_response>(self, err: E) -> ServiceResponse { + ServiceResponse::from_err(err, self.request) } /// Create service response @@ -396,23 +398,29 @@ impl ServiceResponse { } /// Execute closure and in case of error convert it to response. - pub fn checked_expr(mut self, f: F) -> Self + pub fn checked_expr(mut self, f: F) -> Result where F: FnOnce(&mut Self) -> Result<(), E>, E: Into, { match f(&mut self) { - Ok(_) => self, + Ok(_) => Ok(self), Err(err) => { - let res = HttpResponse::from_error(err.into()); - ServiceResponse::new(self.request, res.into_body()) + Err(err.into()) + // let res = HttpResponse::from_error(err.into()); + // ServiceResponse::new(self.request, res.into_body()) } } } + // /// Extract response body + // pub fn take_body(&mut self) -> ResponseBody { + // self.response.take_body() + // } + /// Extract response body - pub fn take_body(&mut self) -> ResponseBody { - self.response.take_body() + pub fn into_body(self) -> B { + self.response.into_body() } } @@ -420,7 +428,7 @@ impl ServiceResponse { /// Set a new body pub fn map_body(self, f: F) -> ServiceResponse where - F: FnOnce(&mut ResponseHead, ResponseBody) -> ResponseBody, + F: FnOnce(&mut ResponseHead, B) -> B2, { let response = self.response.map_body(f); diff --git a/src/test.rs b/src/test.rs index 9fe5e9b5d..df72fa279 100644 --- a/src/test.rs +++ b/src/test.rs @@ -10,7 +10,7 @@ use actix_http::{ }; use actix_router::{Path, ResourceDef, Url}; use actix_service::{IntoService, IntoServiceFactory, Service, ServiceFactory}; -use actix_utils::future::ok; +use actix_utils::future::{ok, poll_fn}; use futures_core::Stream; use futures_util::StreamExt as _; use serde::{de::DeserializeOwned, Serialize}; @@ -153,16 +153,17 @@ where B: MessageBody + Unpin, B::Error: Into, { - let mut resp = app + let resp = app .call(req) .await .unwrap_or_else(|e| panic!("read_response failed at application call: {}", e)); - let mut body = resp.take_body(); + let body = resp.into_body(); let mut bytes = BytesMut::new(); - while let Some(item) = body.next().await { - bytes.extend_from_slice(&item.unwrap()); + actix_rt::pin!(body); + while let Some(item) = poll_fn(|cx| body.as_mut().poll_next(cx)).await { + bytes.extend_from_slice(&item.map_err(Into::into).unwrap()); } bytes.freeze() @@ -194,16 +195,19 @@ where /// assert_eq!(result, Bytes::from_static(b"welcome!")); /// } /// ``` -pub async fn read_body(mut res: ServiceResponse) -> Bytes +pub async fn read_body(res: ServiceResponse) -> Bytes where B: MessageBody + Unpin, B::Error: Into, { - let mut body = res.take_body(); + let body = res.into_body(); let mut bytes = BytesMut::new(); - while let Some(item) = body.next().await { - bytes.extend_from_slice(&item.unwrap()); + + actix_rt::pin!(body); + while let Some(item) = poll_fn(|cx| body.as_mut().poll_next(cx)).await { + bytes.extend_from_slice(&item.map_err(Into::into).unwrap()); } + bytes.freeze() }