diff --git a/actix-http/src/body.rs b/actix-http/src/body.rs index 850f97ee4..387237730 100644 --- a/actix-http/src/body.rs +++ b/actix-http/src/body.rs @@ -5,6 +5,7 @@ use std::{fmt, mem}; use bytes::{Bytes, BytesMut}; use futures_core::Stream; +use futures_util::ready; use pin_project::{pin_project, project}; use crate::error::Error; @@ -389,12 +390,19 @@ where BodySize::Stream } + /// Attempts to pull out the next value of the underlying [`Stream`]. + /// + /// Empty values are skipped to prevent [`BodyStream`]'s transmission being + /// ended on a zero-length chunk, but rather proceed until the underlying + /// [`Stream`] ends. fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll>> { - unsafe { Pin::new_unchecked(self) } - .project() - .stream - .poll_next(cx) - .map(|res| res.map(|res| res.map_err(std::convert::Into::into))) + let mut stream = unsafe { Pin::new_unchecked(self) }.project().stream; + loop { + return Poll::Ready(match ready!(stream.as_mut().poll_next(cx)) { + Some(Ok(ref bytes)) if bytes.is_empty() => continue, + opt => opt.map(|res| res.map_err(Into::into)), + }); + } } } @@ -424,17 +432,27 @@ where BodySize::Sized64(self.size) } + /// Attempts to pull out the next value of the underlying [`Stream`]. + /// + /// Empty values are skipped to prevent [`SizedStream`]'s transmission being + /// ended on a zero-length chunk, but rather proceed until the underlying + /// [`Stream`] ends. fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll>> { - unsafe { Pin::new_unchecked(self) } - .project() - .stream - .poll_next(cx) + let mut stream = unsafe { Pin::new_unchecked(self) }.project().stream; + loop { + return Poll::Ready(match ready!(stream.as_mut().poll_next(cx)) { + Some(Ok(ref bytes)) if bytes.is_empty() => continue, + val => val, + }); + } } } #[cfg(test)] mod tests { use super::*; + use bytes::BufMut; + use futures::stream; use futures_util::future::poll_fn; impl Body { @@ -589,4 +607,45 @@ mod tests { BodySize::Sized(25) ); } + + mod body_stream { + use super::*; + + #[actix_rt::test] + async fn skips_empty_chunks() { + let body = BodyStream::new(stream::iter( + ["1", "", "2"] + .iter() + .map(|&v| Ok(Bytes::from(v)) as Result), + )); + assert_eq!(read_all(body).await.unwrap(), Bytes::from("12")); + } + } + + mod sized_stream { + use super::*; + + #[actix_rt::test] + async fn skips_empty_chunks() { + let body = SizedStream::new( + 2, + stream::iter( + ["1", "", "2"] + .iter() + .map(|&v| Ok(Bytes::from(v))), + ), + ); + assert_eq!(read_all(body).await.unwrap(), Bytes::from("12")); + } + } + + async fn read_all(mut body: B) -> Result { + use futures::StreamExt as _; + + let mut bytes = BytesMut::new(); + while let Some(b) = stream::poll_fn(|cx| body.poll_next(cx)).next().await { + bytes.put(b?); + } + Ok(bytes.freeze()) + } }