JsonBody does not require Unpin

This commit is contained in:
Rob Ede 2021-12-25 00:09:32 +00:00
parent 745e96a185
commit ee02ed6269
No known key found for this signature in database
GPG Key ID: 97C636207D3EF933
1 changed files with 44 additions and 38 deletions

View File

@ -16,6 +16,7 @@ use actix_http::{
use actix_rt::time::{sleep, Sleep};
use bytes::{Bytes, BytesMut};
use futures_core::{ready, Stream};
use pin_project_lite::pin_project;
use serde::de::DeserializeOwned;
#[cfg(feature = "cookies")]
@ -316,27 +317,30 @@ where
}
}
/// Response's payload json parser, it resolves to a deserialized `T` value.
///
/// Returns error:
///
/// * content type is not `application/json`
/// * content length is greater than 64k
pub struct JsonBody<S, U> {
length: Option<usize>,
err: Option<JsonPayloadError>,
timeout: ResponseTimeout,
fut: Option<ReadBody<S>>,
_phantom: PhantomData<U>,
pin_project! {
/// Response's payload json parser, it resolves to a deserialized `T` value.
///
/// # Errors
/// `Future` implementation returns error if:
/// * content type is not `application/json`
/// * content length is greater than 64k
pub struct JsonBody<B, T> {
#[pin]
fut: Option<ReadBody<B>>,
length: Option<usize>,
timeout: ResponseTimeout,
err: Option<JsonPayloadError>,
_phantom: PhantomData<T>,
}
}
impl<S, U> JsonBody<S, U>
impl<B, T> JsonBody<B, T>
where
S: Stream<Item = Result<Bytes, PayloadError>>,
U: DeserializeOwned,
B: Stream<Item = Result<Bytes, PayloadError>>,
T: DeserializeOwned,
{
/// Create `JsonBody` for request.
pub fn new(res: &mut ClientResponse<S>) -> Self {
pub fn new(res: &mut ClientResponse<B>) -> Self {
// check content-type
let json = if let Ok(Some(mime)) = res.mime_type() {
mime.subtype() == mime::JSON || mime.suffix() == Some(mime::JSON)
@ -364,10 +368,10 @@ where
}
JsonBody {
length: len,
err: None,
timeout: std::mem::take(&mut res.timeout),
fut: Some(ReadBody::new(res.take_payload(), 65536)),
length: len,
timeout: std::mem::take(&mut res.timeout),
err: None,
_phantom: PhantomData,
}
}
@ -381,41 +385,37 @@ where
}
}
impl<T, U> Unpin for JsonBody<T, U>
impl<B, T> Future for JsonBody<B, T>
where
T: Stream<Item = Result<Bytes, PayloadError>> + Unpin,
U: DeserializeOwned,
B: Stream<Item = Result<Bytes, PayloadError>>,
T: DeserializeOwned,
{
}
type Output = Result<T, JsonPayloadError>;
impl<T, U> Future for JsonBody<T, U>
where
T: Stream<Item = Result<Bytes, PayloadError>> + Unpin,
U: DeserializeOwned,
{
type Output = Result<U, JsonPayloadError>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if let Some(err) = self.err.take() {
if let Some(err) = this.err.take() {
return Poll::Ready(Err(err));
}
if let Some(len) = self.length.take() {
if len > self.fut.as_ref().unwrap().limit {
if let Some(len) = this.length.take() {
let body = Option::as_ref(&this.fut).unwrap();
if len > body.limit {
return Poll::Ready(Err(JsonPayloadError::Payload(PayloadError::Overflow)));
}
}
self.timeout
this.timeout
.poll_timeout(cx)
.map_err(JsonPayloadError::Payload)?;
let body = ready!(Pin::new(&mut self.get_mut().fut.as_mut().unwrap()).poll(cx))?;
Poll::Ready(serde_json::from_slice::<U>(&body).map_err(JsonPayloadError::from))
let body = ready!(this.fut.as_pin_mut().unwrap().poll(cx))?;
Poll::Ready(serde_json::from_slice::<T>(&body).map_err(JsonPayloadError::from))
}
}
pin_project_lite::pin_project! {
pin_project! {
struct ReadBody<S> {
#[pin]
stream: Payload<S>,
@ -447,6 +447,7 @@ where
if (this.buf.len() + chunk.len()) > *this.limit {
return Poll::Ready(Err(PayloadError::Overflow));
}
this.buf.extend_from_slice(&chunk);
}
@ -460,7 +461,12 @@ mod tests {
use static_assertions::assert_impl_all;
use super::*;
use crate::{http::header, test::TestResponse};
use crate::{any_body::AnyBody, http::header, test::TestResponse};
assert_impl_all!(ReadBody<()>: Unpin);
assert_impl_all!(ReadBody<AnyBody>: Unpin);
assert_impl_all!(JsonBody<BoxedPayloadStream, String>: Unpin);
assert_impl_all!(ClientResponse: Unpin);