diff --git a/CHANGES.md b/CHANGES.md
index 0a5f50e56..db582dc57 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -5,7 +5,9 @@
### Changed
-* Use `sha-1` crate instead of unmaintained `sha1` crate
+* Use `sha-1` crate instead of unmaintained `sha1` crate
+
+* Skip empty chunks when returning response from a `Stream` #1308
* Update the `time` dependency to 0.2.3
diff --git a/actix-http/src/body.rs b/actix-http/src/body.rs
index 850f97ee4..881764439 100644
--- a/actix-http/src/body.rs
+++ b/actix-http/src/body.rs
@@ -5,6 +5,7 @@ use std::{fmt, mem};
use bytes::{Bytes, BytesMut};
use futures_core::Stream;
+use futures_util::ready;
use pin_project::{pin_project, project};
use crate::error::Error;
@@ -389,12 +390,19 @@ where
BodySize::Stream
}
+ /// Attempts to pull out the next value of the underlying [`Stream`].
+ ///
+ /// Empty values are skipped to prevent [`BodyStream`]'s transmission being
+ /// ended on a zero-length chunk, but rather proceed until the underlying
+ /// [`Stream`] ends.
fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll>> {
- unsafe { Pin::new_unchecked(self) }
- .project()
- .stream
- .poll_next(cx)
- .map(|res| res.map(|res| res.map_err(std::convert::Into::into)))
+ let mut stream = unsafe { Pin::new_unchecked(self) }.project().stream;
+ loop {
+ return Poll::Ready(match ready!(stream.as_mut().poll_next(cx)) {
+ Some(Ok(ref bytes)) if bytes.is_empty() => continue,
+ opt => opt.map(|res| res.map_err(Into::into)),
+ });
+ }
}
}
@@ -424,17 +432,26 @@ where
BodySize::Sized64(self.size)
}
+ /// Attempts to pull out the next value of the underlying [`Stream`].
+ ///
+ /// Empty values are skipped to prevent [`SizedStream`]'s transmission being
+ /// ended on a zero-length chunk, but rather proceed until the underlying
+ /// [`Stream`] ends.
fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll >> {
- unsafe { Pin::new_unchecked(self) }
- .project()
- .stream
- .poll_next(cx)
+ let mut stream = unsafe { Pin::new_unchecked(self) }.project().stream;
+ loop {
+ return Poll::Ready(match ready!(stream.as_mut().poll_next(cx)) {
+ Some(Ok(ref bytes)) if bytes.is_empty() => continue,
+ val => val,
+ });
+ }
}
}
#[cfg(test)]
mod tests {
use super::*;
+ use futures::stream;
use futures_util::future::poll_fn;
impl Body {
@@ -589,4 +606,45 @@ mod tests {
BodySize::Sized(25)
);
}
+
+ mod body_stream {
+ use super::*;
+
+ #[actix_rt::test]
+ async fn skips_empty_chunks() {
+ let mut body = BodyStream::new(stream::iter(
+ ["1", "", "2"]
+ .iter()
+ .map(|&v| Ok(Bytes::from(v)) as Result),
+ ));
+ assert_eq!(
+ poll_fn(|cx| body.poll_next(cx)).await.unwrap().ok(),
+ Some(Bytes::from("1")),
+ );
+ assert_eq!(
+ poll_fn(|cx| body.poll_next(cx)).await.unwrap().ok(),
+ Some(Bytes::from("2")),
+ );
+ }
+ }
+
+ mod sized_stream {
+ use super::*;
+
+ #[actix_rt::test]
+ async fn skips_empty_chunks() {
+ let mut body = SizedStream::new(
+ 2,
+ stream::iter(["1", "", "2"].iter().map(|&v| Ok(Bytes::from(v)))),
+ );
+ assert_eq!(
+ poll_fn(|cx| body.poll_next(cx)).await.unwrap().ok(),
+ Some(Bytes::from("1")),
+ );
+ assert_eq!(
+ poll_fn(|cx| body.poll_next(cx)).await.unwrap().ok(),
+ Some(Bytes::from("2")),
+ );
+ }
+ }
}
diff --git a/actix-http/src/client/pool.rs b/actix-http/src/client/pool.rs
index acf76559a..8c94423ac 100644
--- a/actix-http/src/client/pool.rs
+++ b/actix-http/src/client/pool.rs
@@ -488,10 +488,12 @@ where
}
}
+#[pin_project::pin_project(PinnedDrop)]
struct OpenWaitingConnection
where
Io: AsyncRead + AsyncWrite + Unpin + 'static,
{
+ #[pin]
fut: F,
key: Key,
h2: Option<
@@ -525,12 +527,13 @@ where
}
}
-impl Drop for OpenWaitingConnection
+#[pin_project::pinned_drop]
+impl PinnedDrop for OpenWaitingConnection
where
Io: AsyncRead + AsyncWrite + Unpin + 'static,
{
- fn drop(&mut self) {
- if let Some(inner) = self.inner.take() {
+ fn drop(self: Pin<&mut Self>) {
+ if let Some(inner) = self.project().inner.take() {
let mut inner = inner.as_ref().borrow_mut();
inner.release();
inner.check_availibility();
@@ -545,8 +548,8 @@ where
{
type Output = ();
- fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll {
- let this = unsafe { self.get_unchecked_mut() };
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll {
+ let this = self.as_mut().project();
if let Some(ref mut h2) = this.h2 {
return match Pin::new(h2).poll(cx) {
@@ -571,7 +574,7 @@ where
};
}
- match unsafe { Pin::new_unchecked(&mut this.fut) }.poll(cx) {
+ match this.fut.poll(cx) {
Poll::Ready(Err(err)) => {
let _ = this.inner.take();
if let Some(rx) = this.rx.take() {
@@ -589,8 +592,8 @@ where
)));
Poll::Ready(())
} else {
- this.h2 = Some(handshake(io).boxed_local());
- unsafe { Pin::new_unchecked(this) }.poll(cx)
+ *this.h2 = Some(handshake(io).boxed_local());
+ self.poll(cx)
}
}
Poll::Pending => Poll::Pending,
diff --git a/actix-http/src/h2/dispatcher.rs b/actix-http/src/h2/dispatcher.rs
index a4ec15fab..8b17e9479 100644
--- a/actix-http/src/h2/dispatcher.rs
+++ b/actix-http/src/h2/dispatcher.rs
@@ -158,14 +158,16 @@ where
#[pin_project::pin_project]
struct ServiceResponse {
+ #[pin]
state: ServiceResponseState,
config: ServiceConfig,
buffer: Option,
_t: PhantomData<(I, E)>,
}
+#[pin_project::pin_project]
enum ServiceResponseState {
- ServiceCall(F, Option>),
+ ServiceCall(#[pin] F, Option>),
SendPayload(SendStream, ResponseBody),
}
@@ -247,12 +249,14 @@ where
{
type Output = ();
+ #[pin_project::project]
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll {
let mut this = self.as_mut().project();
- match this.state {
- ServiceResponseState::ServiceCall(ref mut call, ref mut send) => {
- match unsafe { Pin::new_unchecked(call) }.poll(cx) {
+ #[project]
+ match this.state.project() {
+ ServiceResponseState::ServiceCall(call, send) => {
+ match call.poll(cx) {
Poll::Ready(Ok(res)) => {
let (res, body) = res.into().replace_body(());
@@ -273,8 +277,7 @@ where
if size.is_eof() {
Poll::Ready(())
} else {
- *this.state =
- ServiceResponseState::SendPayload(stream, body);
+ this.state.set(ServiceResponseState::SendPayload(stream, body));
self.poll(cx)
}
}
@@ -300,10 +303,10 @@ where
if size.is_eof() {
Poll::Ready(())
} else {
- *this.state = ServiceResponseState::SendPayload(
+ this.state.set(ServiceResponseState::SendPayload(
stream,
body.into_body(),
- );
+ ));
self.poll(cx)
}
}
diff --git a/src/extract.rs b/src/extract.rs
index c189bbf97..5289bd7db 100644
--- a/src/extract.rs
+++ b/src/extract.rs
@@ -193,57 +193,83 @@ impl FromRequest for () {
macro_rules! tuple_from_req ({$fut_type:ident, $(($n:tt, $T:ident)),+} => {
- /// FromRequest implementation for tuple
- #[doc(hidden)]
- #[allow(unused_parens)]
- impl<$($T: FromRequest + 'static),+> FromRequest for ($($T,)+)
- {
- type Error = Error;
- type Future = $fut_type<$($T),+>;
- type Config = ($($T::Config),+);
+ // This module is a trick to get around the inability of
+ // `macro_rules!` macros to make new idents. We want to make
+ // a new `FutWrapper` struct for each distinct invocation of
+ // this macro. Ideally, we would name it something like
+ // `FutWrapper_$fut_type`, but this can't be done in a macro_rules
+ // macro.
+ //
+ // Instead, we put everything in a module named `$fut_type`, thus allowing
+ // us to use the name `FutWrapper` without worrying about conflicts.
+ // This macro only exists to generate trait impls for tuples - these
+ // are inherently global, so users don't have to care about this
+ // weird trick.
+ #[allow(non_snake_case)]
+ mod $fut_type {
- fn from_request(req: &HttpRequest, payload: &mut Payload) -> Self::Future {
- $fut_type {
- items: <($(Option<$T>,)+)>::default(),
- futs: ($($T::from_request(req, payload),)+),
+ // Bring everything into scope, so we don't need
+ // redundant imports
+ use super::*;
+
+ /// A helper struct to allow us to pin-project through
+ /// to individual fields
+ #[pin_project::pin_project]
+ struct FutWrapper<$($T: FromRequest),+>($(#[pin] $T::Future),+);
+
+ /// FromRequest implementation for tuple
+ #[doc(hidden)]
+ #[allow(unused_parens)]
+ impl<$($T: FromRequest + 'static),+> FromRequest for ($($T,)+)
+ {
+ type Error = Error;
+ type Future = $fut_type<$($T),+>;
+ type Config = ($($T::Config),+);
+
+ fn from_request(req: &HttpRequest, payload: &mut Payload) -> Self::Future {
+ $fut_type {
+ items: <($(Option<$T>,)+)>::default(),
+ futs: FutWrapper($($T::from_request(req, payload),)+),
+ }
}
}
- }
- #[doc(hidden)]
- #[pin_project::pin_project]
- pub struct $fut_type<$($T: FromRequest),+> {
- items: ($(Option<$T>,)+),
- futs: ($($T::Future,)+),
- }
+ #[doc(hidden)]
+ #[pin_project::pin_project]
+ pub struct $fut_type<$($T: FromRequest),+> {
+ items: ($(Option<$T>,)+),
+ #[pin]
+ futs: FutWrapper<$($T,)+>,
+ }
- impl<$($T: FromRequest),+> Future for $fut_type<$($T),+>
- {
- type Output = Result<($($T,)+), Error>;
+ impl<$($T: FromRequest),+> Future for $fut_type<$($T),+>
+ {
+ type Output = Result<($($T,)+), Error>;
- fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll {
- let this = self.project();
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll {
+ let mut this = self.project();
- let mut ready = true;
- $(
- if this.items.$n.is_none() {
- match unsafe { Pin::new_unchecked(&mut this.futs.$n) }.poll(cx) {
- Poll::Ready(Ok(item)) => {
- this.items.$n = Some(item);
+ let mut ready = true;
+ $(
+ if this.items.$n.is_none() {
+ match this.futs.as_mut().project().$n.poll(cx) {
+ Poll::Ready(Ok(item)) => {
+ this.items.$n = Some(item);
+ }
+ Poll::Pending => ready = false,
+ Poll::Ready(Err(e)) => return Poll::Ready(Err(e.into())),
}
- Poll::Pending => ready = false,
- Poll::Ready(Err(e)) => return Poll::Ready(Err(e.into())),
}
- }
- )+
+ )+
- if ready {
- Poll::Ready(Ok(
- ($(this.items.$n.take().unwrap(),)+)
- ))
- } else {
- Poll::Pending
- }
+ if ready {
+ Poll::Ready(Ok(
+ ($(this.items.$n.take().unwrap(),)+)
+ ))
+ } else {
+ Poll::Pending
+ }
+ }
}
}
});