diff --git a/CHANGES.md b/CHANGES.md index b42635b86..ab9caa7bd 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -3,13 +3,17 @@ ## [2.0.NEXT] - 2020-01-xx +### Added + +* Add helper function for creating routes with `TRACE` method guard `web::trace()` + ### Changed * Use `sha-1` crate instead of unmaintained `sha1` crate * Skip empty chunks when returning response from a `Stream` #1308 -* Update the `time` dependency to 0.2.5 +* Update the `time` dependency to 0.2.7 ## [2.0.0] - 2019-12-25 diff --git a/Cargo.toml b/Cargo.toml index a6783a6db..125008870 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -33,8 +33,8 @@ members = [ "actix-cors", "actix-files", "actix-framed", - "actix-session", - "actix-identity", +# "actix-session", +# "actix-identity", "actix-multipart", "actix-web-actors", "actix-web-codegen", @@ -71,7 +71,7 @@ actix-threadpool = "0.3.1" actix-tls = "1.0.0" actix-web-codegen = "0.2.0" -actix-http = "1.0.1" +actix-http = "2.0.0-alpha.1" awc = { version = "1.0.1", default-features = false } bytes = "0.5.3" @@ -87,13 +87,13 @@ regex = "1.3" serde = { version = "1.0", features=["derive"] } serde_json = "1.0" serde_urlencoded = "0.6.1" -time = { version = "0.2.5", default-features = false, features = ["std"] } +time = { version = "0.2.7", default-features = false, features = ["std"] } url = "2.1" open-ssl = { version="0.10", package = "openssl", optional = true } rust-tls = { version = "0.16.0", package = "rustls", optional = true } [dev-dependencies] -actix = "0.9.0" +actix = "0.10.0-alpha.1" rand = "0.7" env_logger = "0.6" serde_derive = "1.0" diff --git a/MIGRATION.md b/MIGRATION.md index aef382a21..86721e0eb 100644 --- a/MIGRATION.md +++ b/MIGRATION.md @@ -3,6 +3,8 @@ * Setting a cookie's SameSite property, explicitly, to `SameSite::None` will now result in `SameSite=None` being sent with the response Set-Cookie header. To create a cookie without a SameSite attribute, remove any calls setting same_site. +* actix-http support for Actors messages was moved to actix-http crate and is enabled + with feature `actors` ## 2.0.0 diff --git a/actix-files/Cargo.toml b/actix-files/Cargo.toml index 104eb3dfa..269ea5371 100644 --- a/actix-files/Cargo.toml +++ b/actix-files/Cargo.toml @@ -19,7 +19,7 @@ path = "src/lib.rs" [dependencies] actix-web = { version = "2.0.0-rc", default-features = false } -actix-http = "1.0.1" +actix-http = "2.0.0-alpha.1" actix-service = "1.0.1" bitflags = "1" bytes = "0.5.3" diff --git a/actix-framed/Cargo.toml b/actix-framed/Cargo.toml index 7e322e1d4..dc974e402 100644 --- a/actix-framed/Cargo.toml +++ b/actix-framed/Cargo.toml @@ -23,7 +23,7 @@ actix-codec = "0.2.0" actix-service = "1.0.1" actix-router = "0.2.1" actix-rt = "1.0.0" -actix-http = "1.0.1" +actix-http = "2.0.0-alpha.1" bytes = "0.5.3" futures = "0.3.1" diff --git a/actix-http/CHANGES.md b/actix-http/CHANGES.md index 511ef4f1c..229fcbbae 100644 --- a/actix-http/CHANGES.md +++ b/actix-http/CHANGES.md @@ -1,10 +1,16 @@ # Changes -# [Unreleased] +## [2.0.0-alpha.1] - 2020-02-27 ### Changed -* Update the `time` dependency to 0.2.5 +* Update the `time` dependency to 0.2.7. + +* Moved actors messages support from actix crate, enabled with feature `actors`. + +* Breaking change: trait MessageBody requires Unpin and accepting Pin<&mut Self> instead of &mut self in the poll_next(). + +* MessageBody is not implemented for &'static [u8] anymore. ### Fixed diff --git a/actix-http/Cargo.toml b/actix-http/Cargo.toml index cd813e49f..356e68405 100644 --- a/actix-http/Cargo.toml +++ b/actix-http/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "actix-http" -version = "1.0.1" +version = "2.0.0-alpha.1" authors = ["Nikolay Kim "] description = "Actix http primitives" readme = "README.md" @@ -15,7 +15,7 @@ license = "MIT/Apache-2.0" edition = "2018" [package.metadata.docs.rs] -features = ["openssl", "rustls", "failure", "compress", "secure-cookies"] +features = ["openssl", "rustls", "failure", "compress", "secure-cookies","actors"] [lib] name = "actix_http" @@ -39,14 +39,18 @@ failure = ["fail-ure"] # support for secure cookies secure-cookies = ["ring"] +# support for actix Actor messages +actors = ["actix"] + [dependencies] -actix-service = "1.0.1" +actix-service = "1.0.5" actix-codec = "0.2.0" -actix-connect = "1.0.1" -actix-utils = "1.0.3" +actix-connect = "1.0.2" +actix-utils = "1.0.6" actix-rt = "1.0.0" actix-threadpool = "0.3.1" actix-tls = { version = "1.0.0", optional = true } +actix = { version = "0.10.0-alpha.1", optional = true } base64 = "0.11" bitflags = "1.2" @@ -76,7 +80,7 @@ serde_json = "1.0" sha-1 = "0.8" slab = "0.4" serde_urlencoded = "0.6.1" -time = { version = "0.2.5", default-features = false, features = ["std"] } +time = { version = "0.2.7", default-features = false, features = ["std"] } # for secure cookie ring = { version = "0.16.9", optional = true } @@ -89,12 +93,17 @@ flate2 = { version = "1.0.13", optional = true } fail-ure = { version = "0.1.5", package="failure", optional = true } [dev-dependencies] -actix-server = "1.0.0" -actix-connect = { version = "1.0.0", features=["openssl"] } +actix-server = "1.0.1" +actix-connect = { version = "1.0.2", features=["openssl"] } actix-http-test = { version = "1.0.0", features=["openssl"] } actix-tls = { version = "1.0.0", features=["openssl"] } +criterion = "0.3" futures = "0.3.1" -env_logger = "0.6" +env_logger = "0.7" serde_derive = "1.0" open-ssl = { version="0.10", package = "openssl" } rust-tls = { version="0.16", package = "rustls" } + +[[bench]] +name = "content-length" +harness = false diff --git a/actix-http/benches/content-length.rs b/actix-http/benches/content-length.rs new file mode 100644 index 000000000..b001b3931 --- /dev/null +++ b/actix-http/benches/content-length.rs @@ -0,0 +1,267 @@ +use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion}; + +use bytes::BytesMut; + +// benchmark sending all requests at the same time +fn bench_write_content_length(c: &mut Criterion) { + let mut group = c.benchmark_group("write_content_length"); + + let sizes = [ + 0, 1, 11, 83, 101, 653, 1001, 6323, 10001, 56329, 100001, 123456, 98724245, + 4294967202, + ]; + + for i in sizes.iter() { + group.bench_with_input(BenchmarkId::new("Original (unsafe)", i), i, |b, &i| { + b.iter(|| { + let mut b = BytesMut::with_capacity(35); + _original::write_content_length(i, &mut b) + }) + }); + + group.bench_with_input(BenchmarkId::new("New (safe)", i), i, |b, &i| { + b.iter(|| { + let mut b = BytesMut::with_capacity(35); + _new::write_content_length(i, &mut b) + }) + }); + } + + group.finish(); +} + +criterion_group!(benches, bench_write_content_length); +criterion_main!(benches); + +mod _new { + use bytes::{BufMut, BytesMut}; + + const DIGITS_START: u8 = b'0'; + + /// NOTE: bytes object has to contain enough space + pub fn write_content_length(n: usize, bytes: &mut BytesMut) { + if n == 0 { + bytes.put_slice(b"\r\ncontent-length: 0\r\n"); + return; + } + + bytes.put_slice(b"\r\ncontent-length: "); + + if n < 10 { + bytes.put_u8(DIGITS_START + (n as u8)); + } else if n < 100 { + let n = n as u8; + + let d10 = n / 10; + let d1 = n % 10; + + bytes.put_u8(DIGITS_START + d10); + bytes.put_u8(DIGITS_START + d1); + } else if n < 1000 { + let n = n as u16; + + let d100 = (n / 100) as u8; + let d10 = ((n / 10) % 10) as u8; + let d1 = (n % 10) as u8; + + bytes.put_u8(DIGITS_START + d100); + bytes.put_u8(DIGITS_START + d10); + bytes.put_u8(DIGITS_START + d1); + } else if n < 10_000 { + let n = n as u16; + + let d1000 = (n / 1000) as u8; + let d100 = ((n / 100) % 10) as u8; + let d10 = ((n / 10) % 10) as u8; + let d1 = (n % 10) as u8; + + bytes.put_u8(DIGITS_START + d1000); + bytes.put_u8(DIGITS_START + d100); + bytes.put_u8(DIGITS_START + d10); + bytes.put_u8(DIGITS_START + d1); + } else if n < 100_000 { + let n = n as u32; + + let d10000 = (n / 10000) as u8; + let d1000 = ((n / 1000) % 10) as u8; + let d100 = ((n / 100) % 10) as u8; + let d10 = ((n / 10) % 10) as u8; + let d1 = (n % 10) as u8; + + bytes.put_u8(DIGITS_START + d10000); + bytes.put_u8(DIGITS_START + d1000); + bytes.put_u8(DIGITS_START + d100); + bytes.put_u8(DIGITS_START + d10); + bytes.put_u8(DIGITS_START + d1); + } else if n < 1_000_000 { + let n = n as u32; + + let d100000 = (n / 100000) as u8; + let d10000 = ((n / 10000) % 10) as u8; + let d1000 = ((n / 1000) % 10) as u8; + let d100 = ((n / 100) % 10) as u8; + let d10 = ((n / 10) % 10) as u8; + let d1 = (n % 10) as u8; + + bytes.put_u8(DIGITS_START + d100000); + bytes.put_u8(DIGITS_START + d10000); + bytes.put_u8(DIGITS_START + d1000); + bytes.put_u8(DIGITS_START + d100); + bytes.put_u8(DIGITS_START + d10); + bytes.put_u8(DIGITS_START + d1); + } else { + write_usize(n, bytes); + } + + bytes.put_slice(b"\r\n"); + } + + fn write_usize(n: usize, bytes: &mut BytesMut) { + let mut n = n; + + // 20 chars is max length of a usize (2^64) + // digits will be added to the buffer from lsd to msd + let mut buf = BytesMut::with_capacity(20); + + while n > 9 { + // "pop" the least-significant digit + let lsd = (n % 10) as u8; + + // remove the lsd from n + n = n / 10; + + buf.put_u8(DIGITS_START + lsd); + } + + // put msd to result buffer + bytes.put_u8(DIGITS_START + (n as u8)); + + // put, in reverse (msd to lsd), remaining digits to buffer + for i in (0..buf.len()).rev() { + bytes.put_u8(buf[i]); + } + } +} + +mod _original { + use std::{mem, ptr, slice}; + + use bytes::{BufMut, BytesMut}; + + const DEC_DIGITS_LUT: &[u8] = b"0001020304050607080910111213141516171819\ + 2021222324252627282930313233343536373839\ + 4041424344454647484950515253545556575859\ + 6061626364656667686970717273747576777879\ + 8081828384858687888990919293949596979899"; + + /// NOTE: bytes object has to contain enough space + pub fn write_content_length(mut n: usize, bytes: &mut BytesMut) { + if n < 10 { + let mut buf: [u8; 21] = [ + b'\r', b'\n', b'c', b'o', b'n', b't', b'e', b'n', b't', b'-', b'l', + b'e', b'n', b'g', b't', b'h', b':', b' ', b'0', b'\r', b'\n', + ]; + buf[18] = (n as u8) + b'0'; + bytes.put_slice(&buf); + } else if n < 100 { + let mut buf: [u8; 22] = [ + b'\r', b'\n', b'c', b'o', b'n', b't', b'e', b'n', b't', b'-', b'l', + b'e', b'n', b'g', b't', b'h', b':', b' ', b'0', b'0', b'\r', b'\n', + ]; + let d1 = n << 1; + unsafe { + ptr::copy_nonoverlapping( + DEC_DIGITS_LUT.as_ptr().add(d1), + buf.as_mut_ptr().offset(18), + 2, + ); + } + bytes.put_slice(&buf); + } else if n < 1000 { + let mut buf: [u8; 23] = [ + b'\r', b'\n', b'c', b'o', b'n', b't', b'e', b'n', b't', b'-', b'l', + b'e', b'n', b'g', b't', b'h', b':', b' ', b'0', b'0', b'0', b'\r', + b'\n', + ]; + // decode 2 more chars, if > 2 chars + let d1 = (n % 100) << 1; + n /= 100; + unsafe { + ptr::copy_nonoverlapping( + DEC_DIGITS_LUT.as_ptr().add(d1), + buf.as_mut_ptr().offset(19), + 2, + ) + }; + + // decode last 1 + buf[18] = (n as u8) + b'0'; + + bytes.put_slice(&buf); + } else { + bytes.put_slice(b"\r\ncontent-length: "); + convert_usize(n, bytes); + } + } + + pub(crate) fn convert_usize(mut n: usize, bytes: &mut BytesMut) { + let mut curr: isize = 39; + let mut buf: [u8; 41] = unsafe { mem::MaybeUninit::uninit().assume_init() }; + buf[39] = b'\r'; + buf[40] = b'\n'; + let buf_ptr = buf.as_mut_ptr(); + let lut_ptr = DEC_DIGITS_LUT.as_ptr(); + + // eagerly decode 4 characters at a time + while n >= 10_000 { + let rem = (n % 10_000) as isize; + n /= 10_000; + + let d1 = (rem / 100) << 1; + let d2 = (rem % 100) << 1; + curr -= 4; + unsafe { + ptr::copy_nonoverlapping(lut_ptr.offset(d1), buf_ptr.offset(curr), 2); + ptr::copy_nonoverlapping( + lut_ptr.offset(d2), + buf_ptr.offset(curr + 2), + 2, + ); + } + } + + // if we reach here numbers are <= 9999, so at most 4 chars long + let mut n = n as isize; // possibly reduce 64bit math + + // decode 2 more chars, if > 2 chars + if n >= 100 { + let d1 = (n % 100) << 1; + n /= 100; + curr -= 2; + unsafe { + ptr::copy_nonoverlapping(lut_ptr.offset(d1), buf_ptr.offset(curr), 2); + } + } + + // decode last 1 or 2 chars + if n < 10 { + curr -= 1; + unsafe { + *buf_ptr.offset(curr) = (n as u8) + b'0'; + } + } else { + let d1 = n << 1; + curr -= 2; + unsafe { + ptr::copy_nonoverlapping(lut_ptr.offset(d1), buf_ptr.offset(curr), 2); + } + } + + unsafe { + bytes.extend_from_slice(slice::from_raw_parts( + buf_ptr.offset(curr), + 41 - curr as usize, + )); + } + } +} diff --git a/actix-http/examples/echo.rs b/actix-http/examples/echo.rs index 3d57a472a..b2b88a7ea 100644 --- a/actix-http/examples/echo.rs +++ b/actix-http/examples/echo.rs @@ -17,23 +17,18 @@ async fn main() -> io::Result<()> { HttpService::build() .client_timeout(1000) .client_disconnect(1000) - .finish(|mut req: Request| { - async move { - let mut body = BytesMut::new(); - while let Some(item) = req.payload().next().await { - body.extend_from_slice(&item?); - } - - info!("request body: {:?}", body); - Ok::<_, Error>( - Response::Ok() - .header( - "x-head", - HeaderValue::from_static("dummy value!"), - ) - .body(body), - ) + .finish(|mut req: Request| async move { + let mut body = BytesMut::new(); + while let Some(item) = req.payload().next().await { + body.extend_from_slice(&item?); } + + info!("request body: {:?}", body); + Ok::<_, Error>( + Response::Ok() + .header("x-head", HeaderValue::from_static("dummy value!")) + .body(body), + ) }) .tcp() })? diff --git a/actix-http/src/body.rs b/actix-http/src/body.rs index e2bcce359..c581db604 100644 --- a/actix-http/src/body.rs +++ b/actix-http/src/body.rs @@ -36,33 +36,46 @@ impl BodySize { pub trait MessageBody { fn size(&self) -> BodySize; - fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll>>; + fn poll_next( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll>>; + + downcast_get_type_id!(); } +downcast!(MessageBody); + impl MessageBody for () { fn size(&self) -> BodySize { BodySize::Empty } - fn poll_next(&mut self, _: &mut Context<'_>) -> Poll>> { + fn poll_next( + self: Pin<&mut Self>, + _: &mut Context<'_>, + ) -> Poll>> { Poll::Ready(None) } } -impl MessageBody for Box { +impl MessageBody for Box { fn size(&self) -> BodySize { self.as_ref().size() } - fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll>> { - self.as_mut().poll_next(cx) + fn poll_next( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll>> { + Pin::new(self.get_mut().as_mut()).poll_next(cx) } } #[pin_project] pub enum ResponseBody { - Body(B), - Other(Body), + Body(#[pin] B), + Other(#[pin] Body), } impl ResponseBody { @@ -98,10 +111,15 @@ impl MessageBody for ResponseBody { } } - fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll>> { - match self { - ResponseBody::Body(ref mut body) => body.poll_next(cx), - ResponseBody::Other(ref mut body) => body.poll_next(cx), + #[project] + fn poll_next( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll>> { + #[project] + match self.project() { + ResponseBody::Body(body) => body.poll_next(cx), + ResponseBody::Other(body) => body.poll_next(cx), } } } @@ -116,12 +134,13 @@ impl Stream for ResponseBody { ) -> Poll> { #[project] match self.project() { - ResponseBody::Body(ref mut body) => body.poll_next(cx), - ResponseBody::Other(ref mut body) => body.poll_next(cx), + ResponseBody::Body(body) => body.poll_next(cx), + ResponseBody::Other(body) => body.poll_next(cx), } } } +#[pin_project] /// Represents various types of http message body. pub enum Body { /// Empty response. `Content-Length` header is not set. @@ -131,7 +150,7 @@ pub enum Body { /// Specific response body. Bytes(Bytes), /// Generic message body. - Message(Box), + Message(Box), } impl Body { @@ -141,7 +160,7 @@ impl Body { } /// Create body from generic message body. - pub fn from_message(body: B) -> Body { + pub fn from_message(body: B) -> Body { Body::Message(Box::new(body)) } } @@ -156,8 +175,13 @@ impl MessageBody for Body { } } - fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll>> { - match self { + #[project] + fn poll_next( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll>> { + #[project] + match self.project() { Body::None => Poll::Ready(None), Body::Empty => Poll::Ready(None), Body::Bytes(ref mut bin) => { @@ -168,7 +192,7 @@ impl MessageBody for Body { Poll::Ready(Some(Ok(mem::replace(bin, Bytes::new())))) } } - Body::Message(ref mut body) => body.poll_next(cx), + Body::Message(ref mut body) => Pin::new(body.as_mut()).poll_next(cx), } } } @@ -254,7 +278,7 @@ impl From for Body { impl From> for Body where - S: Stream> + 'static, + S: Stream> + Unpin + 'static, { fn from(s: SizedStream) -> Body { Body::from_message(s) @@ -263,7 +287,7 @@ where impl From> for Body where - S: Stream> + 'static, + S: Stream> + Unpin + 'static, E: Into + 'static, { fn from(s: BodyStream) -> Body { @@ -276,11 +300,14 @@ impl MessageBody for Bytes { BodySize::Sized(self.len()) } - fn poll_next(&mut self, _: &mut Context<'_>) -> Poll>> { + fn poll_next( + self: Pin<&mut Self>, + _: &mut Context<'_>, + ) -> Poll>> { if self.is_empty() { Poll::Ready(None) } else { - Poll::Ready(Some(Ok(mem::replace(self, Bytes::new())))) + Poll::Ready(Some(Ok(mem::replace(self.get_mut(), Bytes::new())))) } } } @@ -290,11 +317,16 @@ impl MessageBody for BytesMut { BodySize::Sized(self.len()) } - fn poll_next(&mut self, _: &mut Context<'_>) -> Poll>> { + fn poll_next( + self: Pin<&mut Self>, + _: &mut Context<'_>, + ) -> Poll>> { if self.is_empty() { Poll::Ready(None) } else { - Poll::Ready(Some(Ok(mem::replace(self, BytesMut::new()).freeze()))) + Poll::Ready(Some(Ok( + mem::replace(self.get_mut(), BytesMut::new()).freeze() + ))) } } } @@ -304,41 +336,36 @@ impl MessageBody for &'static str { BodySize::Sized(self.len()) } - fn poll_next(&mut self, _: &mut Context<'_>) -> Poll>> { + fn poll_next( + self: Pin<&mut Self>, + _: &mut Context<'_>, + ) -> Poll>> { if self.is_empty() { Poll::Ready(None) } else { Poll::Ready(Some(Ok(Bytes::from_static( - mem::replace(self, "").as_ref(), + mem::replace(self.get_mut(), "").as_ref(), )))) } } } -impl MessageBody for &'static [u8] { - fn size(&self) -> BodySize { - BodySize::Sized(self.len()) - } - - fn poll_next(&mut self, _: &mut Context<'_>) -> Poll>> { - if self.is_empty() { - Poll::Ready(None) - } else { - Poll::Ready(Some(Ok(Bytes::from_static(mem::replace(self, b""))))) - } - } -} - impl MessageBody for Vec { fn size(&self) -> BodySize { BodySize::Sized(self.len()) } - fn poll_next(&mut self, _: &mut Context<'_>) -> Poll>> { + fn poll_next( + self: Pin<&mut Self>, + _: &mut Context<'_>, + ) -> Poll>> { if self.is_empty() { Poll::Ready(None) } else { - Poll::Ready(Some(Ok(Bytes::from(mem::replace(self, Vec::new()))))) + Poll::Ready(Some(Ok(Bytes::from(mem::replace( + self.get_mut(), + Vec::new(), + ))))) } } } @@ -348,12 +375,15 @@ impl MessageBody for String { BodySize::Sized(self.len()) } - fn poll_next(&mut self, _: &mut Context<'_>) -> Poll>> { + fn poll_next( + self: Pin<&mut Self>, + _: &mut Context<'_>, + ) -> Poll>> { if self.is_empty() { Poll::Ready(None) } else { Poll::Ready(Some(Ok(Bytes::from( - mem::replace(self, String::new()).into_bytes(), + mem::replace(self.get_mut(), String::new()).into_bytes(), )))) } } @@ -361,19 +391,21 @@ impl MessageBody for String { /// Type represent streaming body. /// Response does not contain `content-length` header and appropriate transfer encoding is used. -pub struct BodyStream { - stream: Pin>, +#[pin_project] +pub struct BodyStream { + #[pin] + stream: S, _t: PhantomData, } impl BodyStream where - S: Stream>, + S: Stream> + Unpin, E: Into, { pub fn new(stream: S) -> Self { BodyStream { - stream: Box::pin(stream), + stream, _t: PhantomData, } } @@ -381,7 +413,7 @@ where impl MessageBody for BodyStream where - S: Stream>, + S: Stream> + Unpin, E: Into, { fn size(&self) -> BodySize { @@ -393,10 +425,14 @@ where /// Empty values are skipped to prevent [`BodyStream`]'s transmission being /// ended on a zero-length chunk, but rather proceed until the underlying /// [`Stream`] ends. - fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll>> { - let mut stream = self.stream.as_mut(); + fn poll_next( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll>> { + let mut stream = self.project().stream; loop { - return Poll::Ready(match ready!(stream.as_mut().poll_next(cx)) { + let stream = stream.as_mut(); + return Poll::Ready(match ready!(stream.poll_next(cx)) { Some(Ok(ref bytes)) if bytes.is_empty() => continue, opt => opt.map(|res| res.map_err(Into::into)), }); @@ -406,23 +442,25 @@ where /// Type represent streaming body. This body implementation should be used /// if total size of stream is known. Data get sent as is without using transfer encoding. -pub struct SizedStream { +#[pin_project] +pub struct SizedStream { size: u64, - stream: Pin>, + #[pin] + stream: S, } impl SizedStream where - S: Stream>, + S: Stream> + Unpin, { pub fn new(size: u64, stream: S) -> Self { - SizedStream { size, stream: Box::pin(stream) } + SizedStream { size, stream } } } impl MessageBody for SizedStream where - S: Stream>, + S: Stream> + Unpin, { fn size(&self) -> BodySize { BodySize::Sized64(self.size) @@ -433,10 +471,14 @@ where /// Empty values are skipped to prevent [`SizedStream`]'s transmission being /// ended on a zero-length chunk, but rather proceed until the underlying /// [`Stream`] ends. - fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll>> { - let mut stream = self.stream.as_mut(); + fn poll_next( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll>> { + let mut stream: Pin<&mut S> = self.project().stream; loop { - return Poll::Ready(match ready!(stream.as_mut().poll_next(cx)) { + let stream = stream.as_mut(); + return Poll::Ready(match ready!(stream.poll_next(cx)) { Some(Ok(ref bytes)) if bytes.is_empty() => continue, val => val, }); @@ -449,6 +491,7 @@ mod tests { use super::*; use futures::stream; use futures_util::future::poll_fn; + use futures_util::pin_mut; impl Body { pub(crate) fn get_ref(&self) -> &[u8] { @@ -476,7 +519,10 @@ mod tests { assert_eq!("test".size(), BodySize::Sized(4)); assert_eq!( - poll_fn(|cx| "test".poll_next(cx)).await.unwrap().ok(), + poll_fn(|cx| Pin::new(&mut "test").poll_next(cx)) + .await + .unwrap() + .ok(), Some(Bytes::from("test")) ); } @@ -490,13 +536,12 @@ mod tests { BodySize::Sized(4) ); assert_eq!(Body::from_slice(b"test".as_ref()).get_ref(), b"test"); + let sb = Bytes::from(&b"test"[..]); + pin_mut!(sb); - assert_eq!((&b"test"[..]).size(), BodySize::Sized(4)); + assert_eq!(sb.size(), BodySize::Sized(4)); assert_eq!( - poll_fn(|cx| (&b"test"[..]).poll_next(cx)) - .await - .unwrap() - .ok(), + poll_fn(|cx| sb.as_mut().poll_next(cx)).await.unwrap().ok(), Some(Bytes::from("test")) ); } @@ -505,10 +550,12 @@ mod tests { async fn test_vec() { assert_eq!(Body::from(Vec::from("test")).size(), BodySize::Sized(4)); assert_eq!(Body::from(Vec::from("test")).get_ref(), b"test"); + let test_vec = Vec::from("test"); + pin_mut!(test_vec); - assert_eq!(Vec::from("test").size(), BodySize::Sized(4)); + assert_eq!(test_vec.size(), BodySize::Sized(4)); assert_eq!( - poll_fn(|cx| Vec::from("test").poll_next(cx)) + poll_fn(|cx| test_vec.as_mut().poll_next(cx)) .await .unwrap() .ok(), @@ -518,41 +565,44 @@ mod tests { #[actix_rt::test] async fn test_bytes() { - let mut b = Bytes::from("test"); + let b = Bytes::from("test"); assert_eq!(Body::from(b.clone()).size(), BodySize::Sized(4)); assert_eq!(Body::from(b.clone()).get_ref(), b"test"); + pin_mut!(b); assert_eq!(b.size(), BodySize::Sized(4)); assert_eq!( - poll_fn(|cx| b.poll_next(cx)).await.unwrap().ok(), + poll_fn(|cx| b.as_mut().poll_next(cx)).await.unwrap().ok(), Some(Bytes::from("test")) ); } #[actix_rt::test] async fn test_bytes_mut() { - let mut b = BytesMut::from("test"); + let b = BytesMut::from("test"); assert_eq!(Body::from(b.clone()).size(), BodySize::Sized(4)); assert_eq!(Body::from(b.clone()).get_ref(), b"test"); + pin_mut!(b); assert_eq!(b.size(), BodySize::Sized(4)); assert_eq!( - poll_fn(|cx| b.poll_next(cx)).await.unwrap().ok(), + poll_fn(|cx| b.as_mut().poll_next(cx)).await.unwrap().ok(), Some(Bytes::from("test")) ); } #[actix_rt::test] async fn test_string() { - let mut b = "test".to_owned(); + let b = "test".to_owned(); assert_eq!(Body::from(b.clone()).size(), BodySize::Sized(4)); assert_eq!(Body::from(b.clone()).get_ref(), b"test"); assert_eq!(Body::from(&b).size(), BodySize::Sized(4)); assert_eq!(Body::from(&b).get_ref(), b"test"); + pin_mut!(b); assert_eq!(b.size(), BodySize::Sized(4)); assert_eq!( - poll_fn(|cx| b.poll_next(cx)).await.unwrap().ok(), + poll_fn(|cx| b.as_mut().poll_next(cx)).await.unwrap().ok(), Some(Bytes::from("test")) ); } @@ -560,14 +610,17 @@ mod tests { #[actix_rt::test] async fn test_unit() { assert_eq!(().size(), BodySize::Empty); - assert!(poll_fn(|cx| ().poll_next(cx)).await.is_none()); + assert!(poll_fn(|cx| Pin::new(&mut ()).poll_next(cx)) + .await + .is_none()); } #[actix_rt::test] async fn test_box() { - let mut val = Box::new(()); + let val = Box::new(()); + pin_mut!(val); assert_eq!(val.size(), BodySize::Empty); - assert!(poll_fn(|cx| val.poll_next(cx)).await.is_none()); + assert!(poll_fn(|cx| val.as_mut().poll_next(cx)).await.is_none()); } #[actix_rt::test] @@ -605,23 +658,54 @@ mod tests { mod body_stream { use super::*; + //use futures::task::noop_waker; + //use futures::stream::once; #[actix_rt::test] async fn skips_empty_chunks() { - let mut body = BodyStream::new(stream::iter( + let body = BodyStream::new(stream::iter( ["1", "", "2"] .iter() .map(|&v| Ok(Bytes::from(v)) as Result), )); + pin_mut!(body); + assert_eq!( - poll_fn(|cx| body.poll_next(cx)).await.unwrap().ok(), + poll_fn(|cx| body.as_mut().poll_next(cx)) + .await + .unwrap() + .ok(), Some(Bytes::from("1")), ); assert_eq!( - poll_fn(|cx| body.poll_next(cx)).await.unwrap().ok(), + poll_fn(|cx| body.as_mut().poll_next(cx)) + .await + .unwrap() + .ok(), Some(Bytes::from("2")), ); } + + /* Now it does not compile as it should + #[actix_rt::test] + async fn move_pinned_pointer() { + let (sender, receiver) = futures::channel::oneshot::channel(); + let mut body_stream = Ok(BodyStream::new(once(async { + let x = Box::new(0i32); + let y = &x; + receiver.await.unwrap(); + let _z = **y; + Ok::<_, ()>(Bytes::new()) + }))); + + let waker = noop_waker(); + let mut context = Context::from_waker(&waker); + pin_mut!(body_stream); + + let _ = body_stream.as_mut().unwrap().poll_next(&mut context); + sender.send(()).unwrap(); + let _ = std::mem::replace(&mut body_stream, Err([0; 32])).unwrap().poll_next(&mut context); + }*/ } mod sized_stream { @@ -629,18 +713,39 @@ mod tests { #[actix_rt::test] async fn skips_empty_chunks() { - let mut body = SizedStream::new( + let body = SizedStream::new( 2, stream::iter(["1", "", "2"].iter().map(|&v| Ok(Bytes::from(v)))), ); + pin_mut!(body); assert_eq!( - poll_fn(|cx| body.poll_next(cx)).await.unwrap().ok(), + poll_fn(|cx| body.as_mut().poll_next(cx)) + .await + .unwrap() + .ok(), Some(Bytes::from("1")), ); assert_eq!( - poll_fn(|cx| body.poll_next(cx)).await.unwrap().ok(), + poll_fn(|cx| body.as_mut().poll_next(cx)) + .await + .unwrap() + .ok(), Some(Bytes::from("2")), ); } } + + #[actix_rt::test] + async fn test_body_casting() { + let mut body = String::from("hello cast"); + let resp_body: &mut dyn MessageBody = &mut body; + let body = resp_body.downcast_ref::().unwrap(); + assert_eq!(body, "hello cast"); + let body = &mut resp_body.downcast_mut::().unwrap(); + body.push_str("!"); + let body = resp_body.downcast_ref::().unwrap(); + assert_eq!(body, "hello cast!"); + let not_body = resp_body.downcast_ref::<()>(); + assert!(not_body.is_none()); + } } diff --git a/actix-http/src/client/h1proto.rs b/actix-http/src/client/h1proto.rs index a0a20edf6..51e853b3d 100644 --- a/actix-http/src/client/h1proto.rs +++ b/actix-http/src/client/h1proto.rs @@ -8,7 +8,7 @@ use bytes::buf::BufMutExt; use bytes::{Bytes, BytesMut}; use futures_core::Stream; use futures_util::future::poll_fn; -use futures_util::{SinkExt, StreamExt}; +use futures_util::{pin_mut, SinkExt, StreamExt}; use crate::error::PayloadError; use crate::h1; @@ -120,7 +120,7 @@ where /// send request body to the peer pub(crate) async fn send_body( - mut body: B, + body: B, framed: &mut Framed, ) -> Result<(), SendRequestError> where @@ -128,9 +128,10 @@ where B: MessageBody, { let mut eof = false; + pin_mut!(body); while !eof { while !eof && !framed.is_write_buf_full() { - match poll_fn(|cx| body.poll_next(cx)).await { + match poll_fn(|cx| body.as_mut().poll_next(cx)).await { Some(result) => { framed.write(h1::Message::Chunk(Some(result?)))?; } diff --git a/actix-http/src/client/h2proto.rs b/actix-http/src/client/h2proto.rs index eabf54e97..69d20752a 100644 --- a/actix-http/src/client/h2proto.rs +++ b/actix-http/src/client/h2proto.rs @@ -4,6 +4,7 @@ use std::time; use actix_codec::{AsyncRead, AsyncWrite}; use bytes::Bytes; use futures_util::future::poll_fn; +use futures_util::pin_mut; use h2::{client::SendRequest, SendStream}; use http::header::{HeaderValue, CONNECTION, CONTENT_LENGTH, TRANSFER_ENCODING}; use http::{request::Request, Method, Version}; @@ -123,13 +124,14 @@ where } async fn send_body( - mut body: B, + body: B, mut send: SendStream, ) -> Result<(), SendRequestError> { let mut buf = None; + pin_mut!(body); loop { if buf.is_none() { - match poll_fn(|cx| body.poll_next(cx)).await { + match poll_fn(|cx| body.as_mut().poll_next(cx)).await { Some(Ok(b)) => { send.reserve_capacity(b.len()); buf = Some(b); diff --git a/actix-http/src/client/pool.rs b/actix-http/src/client/pool.rs index 8c94423ac..38a51b558 100644 --- a/actix-http/src/client/pool.rs +++ b/actix-http/src/client/pool.rs @@ -16,6 +16,7 @@ use fxhash::FxHashMap; use h2::client::{handshake, Connection, SendRequest}; use http::uri::Authority; use indexmap::IndexSet; +use pin_project::pin_project; use slab::Slab; use super::connection::{ConnectionType, IoConnection}; @@ -422,6 +423,7 @@ where } } +#[pin_project] struct ConnectorPoolSupport where Io: AsyncRead + AsyncWrite + Unpin + 'static, @@ -439,7 +441,7 @@ where type Output = (); fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = unsafe { self.get_unchecked_mut() }; + let this = self.project(); let mut inner = this.inner.as_ref().borrow_mut(); inner.waker.register(cx.waker()); diff --git a/actix-http/src/config.rs b/actix-http/src/config.rs index a38a80e76..899046231 100644 --- a/actix-http/src/config.rs +++ b/actix-http/src/config.rs @@ -211,7 +211,12 @@ impl Date { } fn update(&mut self) { self.pos = 0; - write!(self, "{}", OffsetDateTime::now().format("%a, %d %b %Y %H:%M:%S GMT")).unwrap(); + write!( + self, + "{}", + OffsetDateTime::now().format("%a, %d %b %Y %H:%M:%S GMT") + ) + .unwrap(); } } @@ -282,7 +287,6 @@ impl DateService { mod tests { use super::*; - // Test modifying the date from within the closure // passed to `set_date` #[test] @@ -290,9 +294,7 @@ mod tests { let service = DateService::new(); // Make sure that `check_date` doesn't try to spawn a task service.0.update(); - service.set_date(|_| { - service.0.reset() - }); + service.set_date(|_| service.0.reset()); } #[test] diff --git a/actix-http/src/cookie/builder.rs b/actix-http/src/cookie/builder.rs index c3820abf0..80e7ee71f 100644 --- a/actix-http/src/cookie/builder.rs +++ b/actix-http/src/cookie/builder.rs @@ -109,7 +109,8 @@ impl CookieBuilder { pub fn max_age_time(mut self, value: Duration) -> CookieBuilder { // Truncate any nanoseconds from the Duration, as they aren't represented within `Max-Age` // and would cause two otherwise identical `Cookie` instances to not be equivalent to one another. - self.cookie.set_max_age(Duration::seconds(value.whole_seconds())); + self.cookie + .set_max_age(Duration::seconds(value.whole_seconds())); self } diff --git a/actix-http/src/cookie/jar.rs b/actix-http/src/cookie/jar.rs index 64922897b..dd4ec477e 100644 --- a/actix-http/src/cookie/jar.rs +++ b/actix-http/src/cookie/jar.rs @@ -533,8 +533,8 @@ mod test { #[test] #[cfg(feature = "secure-cookies")] fn delta() { - use time::Duration; use std::collections::HashMap; + use time::Duration; let mut c = CookieJar::new(); diff --git a/actix-http/src/cookie/mod.rs b/actix-http/src/cookie/mod.rs index 09120e19f..7f74abc95 100644 --- a/actix-http/src/cookie/mod.rs +++ b/actix-http/src/cookie/mod.rs @@ -47,7 +47,7 @@ //! ``` #![doc(html_root_url = "https://docs.rs/cookie/0.11")] -#![deny(missing_docs)] +#![warn(missing_docs)] mod builder; mod delta; @@ -990,7 +990,7 @@ impl<'a, 'b> PartialEq> for Cookie<'a> { #[cfg(test)] mod tests { use super::{Cookie, SameSite}; - use time::{offset, PrimitiveDateTime}; + use time::PrimitiveDateTime; #[test] fn format() { @@ -1015,7 +1015,9 @@ mod tests { assert_eq!(&cookie.to_string(), "foo=bar; Domain=www.rust-lang.org"); let time_str = "Wed, 21 Oct 2015 07:28:00 GMT"; - let expires = PrimitiveDateTime::parse(time_str, "%a, %d %b %Y %H:%M:%S").unwrap().using_offset(offset!(UTC)); + let expires = PrimitiveDateTime::parse(time_str, "%a, %d %b %Y %H:%M:%S") + .unwrap() + .assume_utc(); let cookie = Cookie::build("foo", "bar").expires(expires).finish(); assert_eq!( &cookie.to_string(), diff --git a/actix-http/src/cookie/parse.rs b/actix-http/src/cookie/parse.rs index 28eb4f8b6..ce261c758 100644 --- a/actix-http/src/cookie/parse.rs +++ b/actix-http/src/cookie/parse.rs @@ -6,7 +6,7 @@ use std::fmt; use std::str::Utf8Error; use percent_encoding::percent_decode; -use time::{Duration, offset}; +use time::Duration; use super::{Cookie, CookieStr, SameSite}; @@ -188,7 +188,7 @@ fn parse_inner<'c>(s: &str, decode: bool) -> Result, ParseError> { .or_else(|| time::parse(v, "%a, %d-%b-%Y %H:%M:%S").ok()); if let Some(time) = tm { - cookie.expires = Some(time.using_offset(offset!(UTC))) + cookie.expires = Some(time.assume_utc()) } } _ => { @@ -216,7 +216,7 @@ where #[cfg(test)] mod tests { use super::{Cookie, SameSite}; - use time::{offset, Duration, PrimitiveDateTime}; + use time::{Duration, PrimitiveDateTime}; macro_rules! assert_eq_parse { ($string:expr, $expected:expr) => { @@ -376,7 +376,9 @@ mod tests { ); let time_str = "Wed, 21 Oct 2015 07:28:00 GMT"; - let expires = PrimitiveDateTime::parse(time_str, "%a, %d %b %Y %H:%M:%S").unwrap().using_offset(offset!(UTC)); + let expires = PrimitiveDateTime::parse(time_str, "%a, %d %b %Y %H:%M:%S") + .unwrap() + .assume_utc(); expected.set_expires(expires); assert_eq_parse!( " foo=bar ;HttpOnly; Secure; Max-Age=4; Path=/foo; \ @@ -385,7 +387,9 @@ mod tests { ); unexpected.set_domain("foo.com"); - let bad_expires = PrimitiveDateTime::parse(time_str, "%a, %d %b %Y %H:%S:%M").unwrap().using_offset(offset!(UTC)); + let bad_expires = PrimitiveDateTime::parse(time_str, "%a, %d %b %Y %H:%S:%M") + .unwrap() + .assume_utc(); expected.set_expires(bad_expires); assert_ne_parse!( " foo=bar ;HttpOnly; Secure; Max-Age=4; Path=/foo; \ @@ -414,8 +418,15 @@ mod tests { #[test] fn do_not_panic_on_large_max_ages() { let max_duration = Duration::max_value(); - let expected = Cookie::build("foo", "bar").max_age_time(max_duration).finish(); - let overflow_duration = max_duration.checked_add(Duration::nanoseconds(1)).unwrap_or(max_duration); - assert_eq_parse!(format!(" foo=bar; Max-Age={:?}", overflow_duration.whole_seconds()), expected); + let expected = Cookie::build("foo", "bar") + .max_age_time(max_duration) + .finish(); + let overflow_duration = max_duration + .checked_add(Duration::nanoseconds(1)) + .unwrap_or(max_duration); + assert_eq_parse!( + format!(" foo=bar; Max-Age={:?}", overflow_duration.whole_seconds()), + expected + ); } } diff --git a/actix-http/src/encoding/encoder.rs b/actix-http/src/encoding/encoder.rs index ca04845ab..72bb7d603 100644 --- a/actix-http/src/encoding/encoder.rs +++ b/actix-http/src/encoding/encoder.rs @@ -9,6 +9,7 @@ use brotli2::write::BrotliEncoder; use bytes::Bytes; use flate2::write::{GzEncoder, ZlibEncoder}; use futures_core::ready; +use pin_project::{pin_project, project}; use crate::body::{Body, BodySize, MessageBody, ResponseBody}; use crate::http::header::{ContentEncoding, CONTENT_ENCODING}; @@ -19,8 +20,10 @@ use super::Writer; const INPLACE: usize = 1024; +#[pin_project] pub struct Encoder { eof: bool, + #[pin] body: EncoderBody, encoder: Option, fut: Option>, @@ -76,67 +79,88 @@ impl Encoder { } } +#[pin_project] enum EncoderBody { Bytes(Bytes), - Stream(B), - BoxedStream(Box), + Stream(#[pin] B), + BoxedStream(Box), +} + +impl MessageBody for EncoderBody { + fn size(&self) -> BodySize { + match self { + EncoderBody::Bytes(ref b) => b.size(), + EncoderBody::Stream(ref b) => b.size(), + EncoderBody::BoxedStream(ref b) => b.size(), + } + } + + #[project] + fn poll_next( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll>> { + #[project] + match self.project() { + EncoderBody::Bytes(b) => { + if b.is_empty() { + Poll::Ready(None) + } else { + Poll::Ready(Some(Ok(std::mem::replace(b, Bytes::new())))) + } + } + EncoderBody::Stream(b) => b.poll_next(cx), + EncoderBody::BoxedStream(ref mut b) => Pin::new(b.as_mut()).poll_next(cx), + } + } } impl MessageBody for Encoder { fn size(&self) -> BodySize { if self.encoder.is_none() { - match self.body { - EncoderBody::Bytes(ref b) => b.size(), - EncoderBody::Stream(ref b) => b.size(), - EncoderBody::BoxedStream(ref b) => b.size(), - } + self.body.size() } else { BodySize::Stream } } - fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll>> { + fn poll_next( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll>> { + let mut this = self.project(); loop { - if self.eof { + if *this.eof { return Poll::Ready(None); } - if let Some(ref mut fut) = self.fut { + if let Some(ref mut fut) = this.fut { let mut encoder = match ready!(Pin::new(fut).poll(cx)) { Ok(item) => item, Err(e) => return Poll::Ready(Some(Err(e.into()))), }; let chunk = encoder.take(); - self.encoder = Some(encoder); - self.fut.take(); + *this.encoder = Some(encoder); + this.fut.take(); if !chunk.is_empty() { return Poll::Ready(Some(Ok(chunk))); } } - let result = match self.body { - EncoderBody::Bytes(ref mut b) => { - if b.is_empty() { - Poll::Ready(None) - } else { - Poll::Ready(Some(Ok(std::mem::replace(b, Bytes::new())))) - } - } - EncoderBody::Stream(ref mut b) => b.poll_next(cx), - EncoderBody::BoxedStream(ref mut b) => b.poll_next(cx), - }; + let result = this.body.as_mut().poll_next(cx); + match result { Poll::Ready(Some(Ok(chunk))) => { - if let Some(mut encoder) = self.encoder.take() { + if let Some(mut encoder) = this.encoder.take() { if chunk.len() < INPLACE { encoder.write(&chunk)?; let chunk = encoder.take(); - self.encoder = Some(encoder); + *this.encoder = Some(encoder); if !chunk.is_empty() { return Poll::Ready(Some(Ok(chunk))); } } else { - self.fut = Some(run(move || { + *this.fut = Some(run(move || { encoder.write(&chunk)?; Ok(encoder) })); @@ -146,12 +170,12 @@ impl MessageBody for Encoder { } } Poll::Ready(None) => { - if let Some(encoder) = self.encoder.take() { + if let Some(encoder) = this.encoder.take() { let chunk = encoder.finish()?; if chunk.is_empty() { return Poll::Ready(None); } else { - self.eof = true; + *this.eof = true; return Poll::Ready(Some(Ok(chunk))); } } else { diff --git a/actix-http/src/error.rs b/actix-http/src/error.rs index b6637075c..0850e18ff 100644 --- a/actix-http/src/error.rs +++ b/actix-http/src/error.rs @@ -1,5 +1,4 @@ //! Error and Result module -use std::any::TypeId; use std::cell::RefCell; use std::io::Write; use std::str::Utf8Error; @@ -15,7 +14,6 @@ use derive_more::{Display, From}; pub use futures_channel::oneshot::Canceled; use http::uri::InvalidUri; use http::{header, Error as HttpError, StatusCode}; -use httparse; use serde::de::value::Error as DeError; use serde_json::error::Error as JsonError; use serde_urlencoded::ser::Error as FormError; @@ -60,12 +58,6 @@ impl Error { } } -/// A struct with a private constructor, for use with -/// `__private_get_type_id__`. Its single field is private, -/// ensuring that it can only be constructed from this module -#[doc(hidden)] -pub struct PrivateHelper(()); - /// Error that can be converted to `Response` pub trait ResponseError: fmt::Debug + fmt::Display { /// Response's status code @@ -89,43 +81,10 @@ pub trait ResponseError: fmt::Debug + fmt::Display { resp.set_body(Body::from(buf)) } - /// A helper method to get the type ID of the type - /// this trait is implemented on. - /// This method is unsafe to *implement*, since `downcast_ref` relies - /// on the returned `TypeId` to perform a cast. - /// - /// Unfortunately, Rust has no notion of a trait method that is - /// unsafe to implement (marking it as `unsafe` makes it unsafe - /// to *call*). As a workaround, we require this method - /// to return a private type along with the `TypeId`. This - /// private type (`PrivateHelper`) has a private constructor, - /// making it impossible for safe code to construct outside of - /// this module. This ensures that safe code cannot violate - /// type-safety by implementing this method. - #[doc(hidden)] - fn __private_get_type_id__(&self) -> (TypeId, PrivateHelper) - where - Self: 'static, - { - (TypeId::of::(), PrivateHelper(())) - } + downcast_get_type_id!(); } -impl dyn ResponseError + 'static { - /// Downcasts a response error to a specific type. - pub fn downcast_ref(&self) -> Option<&T> { - if self.__private_get_type_id__().0 == TypeId::of::() { - // Safety: external crates cannot override the default - // implementation of `__private_get_type_id__`, since - // it requires returning a private type. We can therefore - // rely on the returned `TypeId`, which ensures that this - // case is correct. - unsafe { Some(&*(self as *const dyn ResponseError as *const T)) } - } else { - None - } - } -} +downcast!(ResponseError); impl fmt::Display for Error { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { @@ -991,6 +950,16 @@ where /// Compatibility for `failure::Error` impl ResponseError for fail_ure::Error {} +#[cfg(feature = "actors")] +/// `InternalServerError` for `actix::MailboxError` +/// This is supported on feature=`actors` only +impl ResponseError for actix::MailboxError {} + +#[cfg(feature = "actors")] +/// `InternalServerError` for `actix::ResolverError` +/// This is supported on feature=`actors` only +impl ResponseError for actix::actors::resolver::ResolverError {} + #[cfg(test)] mod tests { use super::*; diff --git a/actix-http/src/h1/decoder.rs b/actix-http/src/h1/decoder.rs index e113fd52d..d3ccd8e5a 100644 --- a/actix-http/src/h1/decoder.rs +++ b/actix-http/src/h1/decoder.rs @@ -8,7 +8,6 @@ use actix_codec::Decoder; use bytes::{Buf, Bytes, BytesMut}; use http::header::{HeaderName, HeaderValue}; use http::{header, Method, StatusCode, Uri, Version}; -use httparse; use log::{debug, error, trace}; use crate::error::ParseError; diff --git a/actix-http/src/h1/dispatcher.rs b/actix-http/src/h1/dispatcher.rs index 6f4c09915..6276653d3 100644 --- a/actix-http/src/h1/dispatcher.rs +++ b/actix-http/src/h1/dispatcher.rs @@ -1,3 +1,6 @@ +// Because MSRV is 1.39.0. +#![allow(clippy::mem_replace_with_default)] + use std::collections::VecDeque; use std::future::Future; use std::pin::Pin; @@ -10,6 +13,7 @@ use actix_service::Service; use bitflags::bitflags; use bytes::{Buf, BytesMut}; use log::{error, trace}; +use pin_project::pin_project; use crate::body::{Body, BodySize, MessageBody, ResponseBody}; use crate::cloneable::CloneableService; @@ -41,6 +45,7 @@ bitflags! { } } +#[pin_project::pin_project] /// Dispatcher for HTTP/1.1 protocol pub struct Dispatcher where @@ -52,9 +57,11 @@ where U: Service), Response = ()>, U::Error: fmt::Display, { + #[pin] inner: DispatcherState, } +#[pin_project] enum DispatcherState where S: Service, @@ -65,11 +72,11 @@ where U: Service), Response = ()>, U::Error: fmt::Display, { - Normal(InnerDispatcher), - Upgrade(U::Future), - None, + Normal(#[pin] InnerDispatcher), + Upgrade(#[pin] U::Future), } +#[pin_project] struct InnerDispatcher where S: Service, @@ -88,6 +95,7 @@ where peer_addr: Option, error: Option, + #[pin] state: State, payload: Option, messages: VecDeque, @@ -95,7 +103,7 @@ where ka_expire: Instant, ka_timer: Option, - io: T, + io: Option, read_buf: BytesMut, write_buf: BytesMut, codec: Codec, @@ -107,6 +115,7 @@ enum DispatcherMessage { Error(Response<()>), } +#[pin_project] enum State where S: Service, @@ -114,9 +123,9 @@ where B: MessageBody, { None, - ExpectCall(X::Future), - ServiceCall(S::Future), - SendPayload(ResponseBody), + ExpectCall(#[pin] X::Future), + ServiceCall(#[pin] S::Future), + SendPayload(#[pin] ResponseBody), } impl State @@ -141,7 +150,6 @@ where } } } - enum PollResponse { Upgrade(Request), DoNothing, @@ -236,7 +244,7 @@ where state: State::None, error: None, messages: VecDeque::new(), - io, + io: Some(io), codec, read_buf, service, @@ -278,10 +286,11 @@ where } // if checked is set to true, delay disconnect until all tasks have finished. - fn client_disconnected(&mut self) { - self.flags + fn client_disconnected(self: Pin<&mut Self>) { + let this = self.project(); + this.flags .insert(Flags::READ_DISCONNECT | Flags::WRITE_DISCONNECT); - if let Some(mut payload) = self.payload.take() { + if let Some(mut payload) = this.payload.take() { payload.set_error(PayloadError::Incomplete(None)); } } @@ -290,17 +299,22 @@ where /// /// true - got whouldblock /// false - didnt get whouldblock - fn poll_flush(&mut self, cx: &mut Context<'_>) -> Result { + #[pin_project::project] + fn poll_flush( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Result { if self.write_buf.is_empty() { return Ok(false); } let len = self.write_buf.len(); let mut written = 0; + #[project] + let InnerDispatcher { io, write_buf, .. } = self.project(); + let mut io = Pin::new(io.as_mut().unwrap()); while written < len { - match unsafe { Pin::new_unchecked(&mut self.io) } - .poll_write(cx, &self.write_buf[written..]) - { + match io.as_mut().poll_write(cx, &write_buf[written..]) { Poll::Ready(Ok(0)) => { return Err(DispatchError::Io(io::Error::new( io::ErrorKind::WriteZero, @@ -312,112 +326,118 @@ where } Poll::Pending => { if written > 0 { - self.write_buf.advance(written); + write_buf.advance(written); } return Ok(true); } Poll::Ready(Err(err)) => return Err(DispatchError::Io(err)), } } - if written == self.write_buf.len() { - unsafe { self.write_buf.set_len(0) } + if written == write_buf.len() { + unsafe { write_buf.set_len(0) } } else { - self.write_buf.advance(written); + write_buf.advance(written); } Ok(false) } fn send_response( - &mut self, + self: Pin<&mut Self>, message: Response<()>, body: ResponseBody, ) -> Result, DispatchError> { - self.codec - .encode(Message::Item((message, body.size())), &mut self.write_buf) + let mut this = self.project(); + this.codec + .encode(Message::Item((message, body.size())), &mut this.write_buf) .map_err(|err| { - if let Some(mut payload) = self.payload.take() { + if let Some(mut payload) = this.payload.take() { payload.set_error(PayloadError::Incomplete(None)); } DispatchError::Io(err) })?; - self.flags.set(Flags::KEEPALIVE, self.codec.keepalive()); + this.flags.set(Flags::KEEPALIVE, this.codec.keepalive()); match body.size() { BodySize::None | BodySize::Empty => Ok(State::None), _ => Ok(State::SendPayload(body)), } } - fn send_continue(&mut self) { - self.write_buf + fn send_continue(self: Pin<&mut Self>) { + self.project() + .write_buf .extend_from_slice(b"HTTP/1.1 100 Continue\r\n\r\n"); } + #[pin_project::project] fn poll_response( - &mut self, + mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Result { loop { - let state = match self.state { - State::None => match self.messages.pop_front() { + let mut this = self.as_mut().project(); + #[project] + let state = match this.state.project() { + State::None => match this.messages.pop_front() { Some(DispatcherMessage::Item(req)) => { - Some(self.handle_request(req, cx)?) - } - Some(DispatcherMessage::Error(res)) => { - Some(self.send_response(res, ResponseBody::Other(Body::Empty))?) + Some(self.as_mut().handle_request(req, cx)?) } + Some(DispatcherMessage::Error(res)) => Some( + self.as_mut() + .send_response(res, ResponseBody::Other(Body::Empty))?, + ), Some(DispatcherMessage::Upgrade(req)) => { return Ok(PollResponse::Upgrade(req)); } None => None, }, - State::ExpectCall(ref mut fut) => { - match unsafe { Pin::new_unchecked(fut) }.poll(cx) { - Poll::Ready(Ok(req)) => { - self.send_continue(); - self.state = State::ServiceCall(self.service.call(req)); - continue; - } - Poll::Ready(Err(e)) => { - let res: Response = e.into().into(); - let (res, body) = res.replace_body(()); - Some(self.send_response(res, body.into_body())?) - } - Poll::Pending => None, + State::ExpectCall(fut) => match fut.poll(cx) { + Poll::Ready(Ok(req)) => { + self.as_mut().send_continue(); + this = self.as_mut().project(); + this.state.set(State::ServiceCall(this.service.call(req))); + continue; } - } - State::ServiceCall(ref mut fut) => { - match unsafe { Pin::new_unchecked(fut) }.poll(cx) { - Poll::Ready(Ok(res)) => { - let (res, body) = res.into().replace_body(()); - self.state = self.send_response(res, body)?; - continue; - } - Poll::Ready(Err(e)) => { - let res: Response = e.into().into(); - let (res, body) = res.replace_body(()); - Some(self.send_response(res, body.into_body())?) - } - Poll::Pending => None, + Poll::Ready(Err(e)) => { + let res: Response = e.into().into(); + let (res, body) = res.replace_body(()); + Some(self.as_mut().send_response(res, body.into_body())?) } - } - State::SendPayload(ref mut stream) => { + Poll::Pending => None, + }, + State::ServiceCall(fut) => match fut.poll(cx) { + Poll::Ready(Ok(res)) => { + let (res, body) = res.into().replace_body(()); + let state = self.as_mut().send_response(res, body)?; + this = self.as_mut().project(); + this.state.set(state); + continue; + } + Poll::Ready(Err(e)) => { + let res: Response = e.into().into(); + let (res, body) = res.replace_body(()); + Some(self.as_mut().send_response(res, body.into_body())?) + } + Poll::Pending => None, + }, + State::SendPayload(mut stream) => { loop { - if self.write_buf.len() < HW_BUFFER_SIZE { - match stream.poll_next(cx) { + if this.write_buf.len() < HW_BUFFER_SIZE { + match stream.as_mut().poll_next(cx) { Poll::Ready(Some(Ok(item))) => { - self.codec.encode( + this.codec.encode( Message::Chunk(Some(item)), - &mut self.write_buf, + &mut this.write_buf, )?; continue; } Poll::Ready(None) => { - self.codec.encode( + this.codec.encode( Message::Chunk(None), - &mut self.write_buf, + &mut this.write_buf, )?; - self.state = State::None; + this = self.as_mut().project(); + this.state.set(State::None); } Poll::Ready(Some(Err(_))) => { return Err(DispatchError::Unknown) @@ -433,9 +453,11 @@ where } }; + this = self.as_mut().project(); + // set new state if let Some(state) = state { - self.state = state; + this.state.set(state); if !self.state.is_empty() { continue; } @@ -443,7 +465,7 @@ where // if read-backpressure is enabled and we consumed some data. // we may read more data and retry if self.state.is_call() { - if self.poll_request(cx)? { + if self.as_mut().poll_request(cx)? { continue; } } else if !self.messages.is_empty() { @@ -457,16 +479,16 @@ where } fn handle_request( - &mut self, + mut self: Pin<&mut Self>, req: Request, cx: &mut Context<'_>, ) -> Result, DispatchError> { // Handle `EXPECT: 100-Continue` header let req = if req.head().expect() { - let mut task = self.expect.call(req); + let mut task = self.as_mut().project().expect.call(req); match unsafe { Pin::new_unchecked(&mut task) }.poll(cx) { Poll::Ready(Ok(req)) => { - self.send_continue(); + self.as_mut().send_continue(); req } Poll::Pending => return Ok(State::ExpectCall(task)), @@ -482,7 +504,7 @@ where }; // Call service - let mut task = self.service.call(req); + let mut task = self.as_mut().project().service.call(req); match unsafe { Pin::new_unchecked(&mut task) }.poll(cx) { Poll::Ready(Ok(res)) => { let (res, body) = res.into().replace_body(()); @@ -499,7 +521,7 @@ where /// Process one incoming requests pub(self) fn poll_request( - &mut self, + mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Result { // limit a mount of non processed requests @@ -508,24 +530,25 @@ where } let mut updated = false; + let mut this = self.as_mut().project(); loop { - match self.codec.decode(&mut self.read_buf) { + match this.codec.decode(&mut this.read_buf) { Ok(Some(msg)) => { updated = true; - self.flags.insert(Flags::STARTED); + this.flags.insert(Flags::STARTED); match msg { Message::Item(mut req) => { - let pl = self.codec.message_type(); - req.head_mut().peer_addr = self.peer_addr; + let pl = this.codec.message_type(); + req.head_mut().peer_addr = *this.peer_addr; // set on_connect data - if let Some(ref on_connect) = self.on_connect { + if let Some(ref on_connect) = this.on_connect { on_connect.set(&mut req.extensions_mut()); } - if pl == MessageType::Stream && self.upgrade.is_some() { - self.messages.push_back(DispatcherMessage::Upgrade(req)); + if pl == MessageType::Stream && this.upgrade.is_some() { + this.messages.push_back(DispatcherMessage::Upgrade(req)); break; } if pl == MessageType::Payload || pl == MessageType::Stream { @@ -533,41 +556,43 @@ where let (req1, _) = req.replace_payload(crate::Payload::H1(pl)); req = req1; - self.payload = Some(ps); + *this.payload = Some(ps); } // handle request early - if self.state.is_empty() { - self.state = self.handle_request(req, cx)?; + if this.state.is_empty() { + let state = self.as_mut().handle_request(req, cx)?; + this = self.as_mut().project(); + this.state.set(state); } else { - self.messages.push_back(DispatcherMessage::Item(req)); + this.messages.push_back(DispatcherMessage::Item(req)); } } Message::Chunk(Some(chunk)) => { - if let Some(ref mut payload) = self.payload { + if let Some(ref mut payload) = this.payload { payload.feed_data(chunk); } else { error!( "Internal server error: unexpected payload chunk" ); - self.flags.insert(Flags::READ_DISCONNECT); - self.messages.push_back(DispatcherMessage::Error( + this.flags.insert(Flags::READ_DISCONNECT); + this.messages.push_back(DispatcherMessage::Error( Response::InternalServerError().finish().drop_body(), )); - self.error = Some(DispatchError::InternalError); + *this.error = Some(DispatchError::InternalError); break; } } Message::Chunk(None) => { - if let Some(mut payload) = self.payload.take() { + if let Some(mut payload) = this.payload.take() { payload.feed_eof(); } else { error!("Internal server error: unexpected eof"); - self.flags.insert(Flags::READ_DISCONNECT); - self.messages.push_back(DispatcherMessage::Error( + this.flags.insert(Flags::READ_DISCONNECT); + this.messages.push_back(DispatcherMessage::Error( Response::InternalServerError().finish().drop_body(), )); - self.error = Some(DispatchError::InternalError); + *this.error = Some(DispatchError::InternalError); break; } } @@ -575,44 +600,49 @@ where } Ok(None) => break, Err(ParseError::Io(e)) => { - self.client_disconnected(); - self.error = Some(DispatchError::Io(e)); + self.as_mut().client_disconnected(); + this = self.as_mut().project(); + *this.error = Some(DispatchError::Io(e)); break; } Err(e) => { - if let Some(mut payload) = self.payload.take() { + if let Some(mut payload) = this.payload.take() { payload.set_error(PayloadError::EncodingCorrupted); } // Malformed requests should be responded with 400 - self.messages.push_back(DispatcherMessage::Error( + this.messages.push_back(DispatcherMessage::Error( Response::BadRequest().finish().drop_body(), )); - self.flags.insert(Flags::READ_DISCONNECT); - self.error = Some(e.into()); + this.flags.insert(Flags::READ_DISCONNECT); + *this.error = Some(e.into()); break; } } } - if updated && self.ka_timer.is_some() { - if let Some(expire) = self.codec.config().keep_alive_expire() { - self.ka_expire = expire; + if updated && this.ka_timer.is_some() { + if let Some(expire) = this.codec.config().keep_alive_expire() { + *this.ka_expire = expire; } } Ok(updated) } /// keep-alive timer - fn poll_keepalive(&mut self, cx: &mut Context<'_>) -> Result<(), DispatchError> { - if self.ka_timer.is_none() { + fn poll_keepalive( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Result<(), DispatchError> { + let mut this = self.as_mut().project(); + if this.ka_timer.is_none() { // shutdown timeout - if self.flags.contains(Flags::SHUTDOWN) { - if let Some(interval) = self.codec.config().client_disconnect_timer() { - self.ka_timer = Some(delay_until(interval)); + if this.flags.contains(Flags::SHUTDOWN) { + if let Some(interval) = this.codec.config().client_disconnect_timer() { + *this.ka_timer = Some(delay_until(interval)); } else { - self.flags.insert(Flags::READ_DISCONNECT); - if let Some(mut payload) = self.payload.take() { + this.flags.insert(Flags::READ_DISCONNECT); + if let Some(mut payload) = this.payload.take() { payload.set_error(PayloadError::Incomplete(None)); } return Ok(()); @@ -622,55 +652,56 @@ where } } - match Pin::new(&mut self.ka_timer.as_mut().unwrap()).poll(cx) { + match Pin::new(&mut this.ka_timer.as_mut().unwrap()).poll(cx) { Poll::Ready(()) => { // if we get timeout during shutdown, drop connection - if self.flags.contains(Flags::SHUTDOWN) { + if this.flags.contains(Flags::SHUTDOWN) { return Err(DispatchError::DisconnectTimeout); - } else if self.ka_timer.as_mut().unwrap().deadline() >= self.ka_expire { + } else if this.ka_timer.as_mut().unwrap().deadline() >= *this.ka_expire { // check for any outstanding tasks - if self.state.is_empty() && self.write_buf.is_empty() { - if self.flags.contains(Flags::STARTED) { + if this.state.is_empty() && this.write_buf.is_empty() { + if this.flags.contains(Flags::STARTED) { trace!("Keep-alive timeout, close connection"); - self.flags.insert(Flags::SHUTDOWN); + this.flags.insert(Flags::SHUTDOWN); // start shutdown timer if let Some(deadline) = - self.codec.config().client_disconnect_timer() + this.codec.config().client_disconnect_timer() { - if let Some(mut timer) = self.ka_timer.as_mut() { + if let Some(mut timer) = this.ka_timer.as_mut() { timer.reset(deadline); let _ = Pin::new(&mut timer).poll(cx); } } else { // no shutdown timeout, drop socket - self.flags.insert(Flags::WRITE_DISCONNECT); + this.flags.insert(Flags::WRITE_DISCONNECT); return Ok(()); } } else { // timeout on first request (slow request) return 408 - if !self.flags.contains(Flags::STARTED) { + if !this.flags.contains(Flags::STARTED) { trace!("Slow request timeout"); - let _ = self.send_response( + let _ = self.as_mut().send_response( Response::RequestTimeout().finish().drop_body(), ResponseBody::Other(Body::Empty), ); + this = self.as_mut().project(); } else { trace!("Keep-alive connection timeout"); } - self.flags.insert(Flags::STARTED | Flags::SHUTDOWN); - self.state = State::None; + this.flags.insert(Flags::STARTED | Flags::SHUTDOWN); + this.state.set(State::None); } } else if let Some(deadline) = - self.codec.config().keep_alive_expire() + this.codec.config().keep_alive_expire() { - if let Some(mut timer) = self.ka_timer.as_mut() { + if let Some(mut timer) = this.ka_timer.as_mut() { timer.reset(deadline); let _ = Pin::new(&mut timer).poll(cx); } } - } else if let Some(mut timer) = self.ka_timer.as_mut() { - timer.reset(self.ka_expire); + } else if let Some(mut timer) = this.ka_timer.as_mut() { + timer.reset(*this.ka_expire); let _ = Pin::new(&mut timer).poll(cx); } } @@ -681,20 +712,6 @@ where } } -impl Unpin for Dispatcher -where - T: AsyncRead + AsyncWrite + Unpin, - S: Service, - S::Error: Into, - S::Response: Into>, - B: MessageBody, - X: Service, - X::Error: Into, - U: Service), Response = ()>, - U::Error: fmt::Display, -{ -} - impl Future for Dispatcher where T: AsyncRead + AsyncWrite + Unpin, @@ -709,22 +726,29 @@ where { type Output = Result<(), DispatchError>; + #[pin_project::project] #[inline] fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - match self.as_mut().inner { - DispatcherState::Normal(ref mut inner) => { - inner.poll_keepalive(cx)?; + let this = self.as_mut().project(); + #[project] + match this.inner.project() { + DispatcherState::Normal(mut inner) => { + inner.as_mut().poll_keepalive(cx)?; if inner.flags.contains(Flags::SHUTDOWN) { if inner.flags.contains(Flags::WRITE_DISCONNECT) { Poll::Ready(Ok(())) } else { // flush buffer - inner.poll_flush(cx)?; - if !inner.write_buf.is_empty() { + inner.as_mut().poll_flush(cx)?; + if !inner.write_buf.is_empty() || inner.io.is_none() { Poll::Pending } else { - match Pin::new(&mut inner.io).poll_shutdown(cx) { + match Pin::new(inner.project().io) + .as_pin_mut() + .unwrap() + .poll_shutdown(cx) + { Poll::Ready(res) => { Poll::Ready(res.map_err(DispatchError::from)) } @@ -736,53 +760,61 @@ where // read socket into a buf let should_disconnect = if !inner.flags.contains(Flags::READ_DISCONNECT) { - read_available(cx, &mut inner.io, &mut inner.read_buf)? + let mut inner_p = inner.as_mut().project(); + read_available( + cx, + inner_p.io.as_mut().unwrap(), + &mut inner_p.read_buf, + )? } else { None }; - inner.poll_request(cx)?; + inner.as_mut().poll_request(cx)?; if let Some(true) = should_disconnect { - inner.flags.insert(Flags::READ_DISCONNECT); - if let Some(mut payload) = inner.payload.take() { + let inner_p = inner.as_mut().project(); + inner_p.flags.insert(Flags::READ_DISCONNECT); + if let Some(mut payload) = inner_p.payload.take() { payload.feed_eof(); } }; loop { + let inner_p = inner.as_mut().project(); let remaining = - inner.write_buf.capacity() - inner.write_buf.len(); + inner_p.write_buf.capacity() - inner_p.write_buf.len(); if remaining < LW_BUFFER_SIZE { - inner.write_buf.reserve(HW_BUFFER_SIZE - remaining); + inner_p.write_buf.reserve(HW_BUFFER_SIZE - remaining); } - let result = inner.poll_response(cx)?; + let result = inner.as_mut().poll_response(cx)?; let drain = result == PollResponse::DrainWriteBuf; // switch to upgrade handler if let PollResponse::Upgrade(req) = result { - if let DispatcherState::Normal(inner) = - std::mem::replace(&mut self.inner, DispatcherState::None) - { - let mut parts = FramedParts::with_read_buf( - inner.io, - inner.codec, - inner.read_buf, - ); - parts.write_buf = inner.write_buf; - let framed = Framed::from_parts(parts); - self.inner = DispatcherState::Upgrade( - inner.upgrade.unwrap().call((req, framed)), - ); - return self.poll(cx); - } else { - panic!() - } + let inner_p = inner.as_mut().project(); + let mut parts = FramedParts::with_read_buf( + inner_p.io.take().unwrap(), + std::mem::replace(inner_p.codec, Codec::default()), + std::mem::replace(inner_p.read_buf, BytesMut::default()), + ); + parts.write_buf = std::mem::replace( + inner_p.write_buf, + BytesMut::default(), + ); + let framed = Framed::from_parts(parts); + let upgrade = + inner_p.upgrade.take().unwrap().call((req, framed)); + self.as_mut() + .project() + .inner + .set(DispatcherState::Upgrade(upgrade)); + return self.poll(cx); } // we didnt get WouldBlock from write operation, // so data get written to kernel completely (OSX) // and we have to write again otherwise response can get stuck - if inner.poll_flush(cx)? || !drain { + if inner.as_mut().poll_flush(cx)? || !drain { break; } } @@ -794,25 +826,26 @@ where let is_empty = inner.state.is_empty(); + let inner_p = inner.as_mut().project(); // read half is closed and we do not processing any responses - if inner.flags.contains(Flags::READ_DISCONNECT) && is_empty { - inner.flags.insert(Flags::SHUTDOWN); + if inner_p.flags.contains(Flags::READ_DISCONNECT) && is_empty { + inner_p.flags.insert(Flags::SHUTDOWN); } // keep-alive and stream errors - if is_empty && inner.write_buf.is_empty() { - if let Some(err) = inner.error.take() { + if is_empty && inner_p.write_buf.is_empty() { + if let Some(err) = inner_p.error.take() { Poll::Ready(Err(err)) } // disconnect if keep-alive is not enabled - else if inner.flags.contains(Flags::STARTED) - && !inner.flags.intersects(Flags::KEEPALIVE) + else if inner_p.flags.contains(Flags::STARTED) + && !inner_p.flags.intersects(Flags::KEEPALIVE) { - inner.flags.insert(Flags::SHUTDOWN); + inner_p.flags.insert(Flags::SHUTDOWN); self.poll(cx) } // disconnect if shutdown - else if inner.flags.contains(Flags::SHUTDOWN) { + else if inner_p.flags.contains(Flags::SHUTDOWN) { self.poll(cx) } else { Poll::Pending @@ -822,13 +855,10 @@ where } } } - DispatcherState::Upgrade(ref mut fut) => { - unsafe { Pin::new_unchecked(fut) }.poll(cx).map_err(|e| { - error!("Upgrade handler error: {}", e); - DispatchError::Upgrade - }) - } - DispatcherState::None => panic!(), + DispatcherState::Upgrade(fut) => fut.poll(cx).map_err(|e| { + error!("Upgrade handler error: {}", e); + DispatchError::Upgrade + }), } } } @@ -918,9 +948,12 @@ mod tests { Poll::Ready(res) => assert!(res.is_err()), } - if let DispatcherState::Normal(ref inner) = h1.inner { + if let DispatcherState::Normal(ref mut inner) = h1.inner { assert!(inner.flags.contains(Flags::READ_DISCONNECT)); - assert_eq!(&inner.io.write_buf[..26], b"HTTP/1.1 400 Bad Request\r\n"); + assert_eq!( + &inner.io.take().unwrap().write_buf[..26], + b"HTTP/1.1 400 Bad Request\r\n" + ); } }) .await; diff --git a/actix-http/src/h1/utils.rs b/actix-http/src/h1/utils.rs index 9ba4aa053..c44925c7a 100644 --- a/actix-http/src/h1/utils.rs +++ b/actix-http/src/h1/utils.rs @@ -13,6 +13,7 @@ use crate::response::Response; #[pin_project::pin_project] pub struct SendResponse { res: Option, BodySize)>>, + #[pin] body: Option>, framed: Option>, } @@ -35,24 +36,27 @@ where impl Future for SendResponse where T: AsyncRead + AsyncWrite, - B: MessageBody, + B: MessageBody + Unpin, { type Output = Result, Error>; + // TODO: rethink if we need loops in polls fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.get_mut(); + let mut this = self.project(); + let mut body_done = this.body.is_none(); loop { - let mut body_ready = this.body.is_some(); + let mut body_ready = !body_done; let framed = this.framed.as_mut().unwrap(); // send body - if this.res.is_none() && this.body.is_some() { - while body_ready && this.body.is_some() && !framed.is_write_buf_full() { - match this.body.as_mut().unwrap().poll_next(cx)? { + if this.res.is_none() && body_ready { + while body_ready && !body_done && !framed.is_write_buf_full() { + match this.body.as_mut().as_pin_mut().unwrap().poll_next(cx)? { Poll::Ready(item) => { - // body is done - if item.is_none() { + // body is done when item is None + body_done = item.is_none(); + if body_done { let _ = this.body.take(); } framed.write(Message::Chunk(item))?; @@ -82,7 +86,7 @@ where continue; } - if this.body.is_some() { + if !body_done { if body_ready { continue; } else { diff --git a/actix-http/src/h2/dispatcher.rs b/actix-http/src/h2/dispatcher.rs index 8b17e9479..b07764a03 100644 --- a/actix-http/src/h2/dispatcher.rs +++ b/actix-http/src/h2/dispatcher.rs @@ -168,7 +168,7 @@ struct ServiceResponse { #[pin_project::pin_project] enum ServiceResponseState { ServiceCall(#[pin] F, Option>), - SendPayload(SendStream, ResponseBody), + SendPayload(SendStream, #[pin] ResponseBody), } impl ServiceResponse @@ -255,63 +255,60 @@ where #[project] match this.state.project() { - ServiceResponseState::ServiceCall(call, send) => { - match call.poll(cx) { - Poll::Ready(Ok(res)) => { - let (res, body) = res.into().replace_body(()); + ServiceResponseState::ServiceCall(call, send) => match call.poll(cx) { + Poll::Ready(Ok(res)) => { + let (res, body) = res.into().replace_body(()); - let mut send = send.take().unwrap(); - let mut size = body.size(); - let h2_res = - self.as_mut().prepare_response(res.head(), &mut size); - this = self.as_mut().project(); + let mut send = send.take().unwrap(); + let mut size = body.size(); + let h2_res = self.as_mut().prepare_response(res.head(), &mut size); + this = self.as_mut().project(); - let stream = match send.send_response(h2_res, size.is_eof()) { - Err(e) => { - trace!("Error sending h2 response: {:?}", e); - return Poll::Ready(()); - } - Ok(stream) => stream, - }; - - if size.is_eof() { - Poll::Ready(()) - } else { - this.state.set(ServiceResponseState::SendPayload(stream, body)); - self.poll(cx) + let stream = match send.send_response(h2_res, size.is_eof()) { + Err(e) => { + trace!("Error sending h2 response: {:?}", e); + return Poll::Ready(()); } - } - Poll::Pending => Poll::Pending, - Poll::Ready(Err(e)) => { - let res: Response = e.into().into(); - let (res, body) = res.replace_body(()); + Ok(stream) => stream, + }; - let mut send = send.take().unwrap(); - let mut size = body.size(); - let h2_res = - self.as_mut().prepare_response(res.head(), &mut size); - this = self.as_mut().project(); - - let stream = match send.send_response(h2_res, size.is_eof()) { - Err(e) => { - trace!("Error sending h2 response: {:?}", e); - return Poll::Ready(()); - } - Ok(stream) => stream, - }; - - if size.is_eof() { - Poll::Ready(()) - } else { - this.state.set(ServiceResponseState::SendPayload( - stream, - body.into_body(), - )); - self.poll(cx) - } + if size.is_eof() { + Poll::Ready(()) + } else { + this.state + .set(ServiceResponseState::SendPayload(stream, body)); + self.poll(cx) } } - } + Poll::Pending => Poll::Pending, + Poll::Ready(Err(e)) => { + let res: Response = e.into().into(); + let (res, body) = res.replace_body(()); + + let mut send = send.take().unwrap(); + let mut size = body.size(); + let h2_res = self.as_mut().prepare_response(res.head(), &mut size); + this = self.as_mut().project(); + + let stream = match send.send_response(h2_res, size.is_eof()) { + Err(e) => { + trace!("Error sending h2 response: {:?}", e); + return Poll::Ready(()); + } + Ok(stream) => stream, + }; + + if size.is_eof() { + Poll::Ready(()) + } else { + this.state.set(ServiceResponseState::SendPayload( + stream, + body.into_body(), + )); + self.poll(cx) + } + } + }, ServiceResponseState::SendPayload(ref mut stream, ref mut body) => loop { loop { if let Some(ref mut buffer) = this.buffer { @@ -338,7 +335,7 @@ where } } } else { - match body.poll_next(cx) { + match body.as_mut().poll_next(cx) { Poll::Pending => return Poll::Pending, Poll::Ready(None) => { if let Err(e) = stream.send_data(Bytes::new(), true) { diff --git a/actix-http/src/h2/service.rs b/actix-http/src/h2/service.rs index ff3f69faf..eef5dd02c 100644 --- a/actix-http/src/h2/service.rs +++ b/actix-http/src/h2/service.rs @@ -83,13 +83,11 @@ where Error = DispatchError, InitError = S::InitError, > { - pipeline_factory(fn_factory(|| { - async { - Ok::<_, S::InitError>(fn_service(|io: TcpStream| { - let peer_addr = io.peer_addr().ok(); - ok::<_, DispatchError>((io, peer_addr)) - })) - } + pipeline_factory(fn_factory(|| async { + Ok::<_, S::InitError>(fn_service(|io: TcpStream| { + let peer_addr = io.peer_addr().ok(); + ok::<_, DispatchError>((io, peer_addr)) + })) })) .and_then(self) } diff --git a/actix-http/src/header/common/accept_charset.rs b/actix-http/src/header/common/accept_charset.rs index 117e2015d..291ca53b6 100644 --- a/actix-http/src/header/common/accept_charset.rs +++ b/actix-http/src/header/common/accept_charset.rs @@ -63,7 +63,7 @@ header! { (AcceptCharset, ACCEPT_CHARSET) => (QualityItem)+ test_accept_charset { - /// Test case from RFC + // Test case from RFC test_header!(test1, vec![b"iso-8859-5, unicode-1-1;q=0.8"]); } } diff --git a/actix-http/src/header/common/content_disposition.rs b/actix-http/src/header/common/content_disposition.rs index d0d5af765..aa2e00ec0 100644 --- a/actix-http/src/header/common/content_disposition.rs +++ b/actix-http/src/header/common/content_disposition.rs @@ -423,7 +423,7 @@ impl ContentDisposition { /// Return the value of *name* if exists. pub fn get_name(&self) -> Option<&str> { - self.parameters.iter().filter_map(|p| p.as_name()).nth(0) + self.parameters.iter().filter_map(|p| p.as_name()).next() } /// Return the value of *filename* if exists. @@ -431,7 +431,7 @@ impl ContentDisposition { self.parameters .iter() .filter_map(|p| p.as_filename()) - .nth(0) + .next() } /// Return the value of *filename\** if exists. @@ -439,7 +439,7 @@ impl ContentDisposition { self.parameters .iter() .filter_map(|p| p.as_filename_ext()) - .nth(0) + .next() } /// Return the value of the parameter which the `name` matches. @@ -448,7 +448,7 @@ impl ContentDisposition { self.parameters .iter() .filter_map(|p| p.as_unknown(name)) - .nth(0) + .next() } /// Return the value of the extended parameter which the `name` matches. @@ -457,7 +457,7 @@ impl ContentDisposition { self.parameters .iter() .filter_map(|p| p.as_unknown_ext(name)) - .nth(0) + .next() } } diff --git a/actix-http/src/header/shared/httpdate.rs b/actix-http/src/header/shared/httpdate.rs index 1b52f0de4..81caf6d53 100644 --- a/actix-http/src/header/shared/httpdate.rs +++ b/actix-http/src/header/shared/httpdate.rs @@ -5,7 +5,7 @@ use std::time::{SystemTime, UNIX_EPOCH}; use bytes::{buf::BufMutExt, BytesMut}; use http::header::{HeaderValue, InvalidHeaderValue}; -use time::{PrimitiveDateTime, OffsetDateTime, offset}; +use time::{offset, OffsetDateTime, PrimitiveDateTime}; use crate::error::ParseError; use crate::header::IntoHeaderValue; @@ -20,8 +20,8 @@ impl FromStr for HttpDate { fn from_str(s: &str) -> Result { match time_parser::parse_http_date(s) { - Some(t) => Ok(HttpDate(t.using_offset(offset!(UTC)))), - None => Err(ParseError::Header) + Some(t) => Ok(HttpDate(t.assume_utc())), + None => Err(ParseError::Header), } } } @@ -40,7 +40,7 @@ impl From for HttpDate { impl From for HttpDate { fn from(sys: SystemTime) -> HttpDate { - HttpDate(PrimitiveDateTime::from(sys).using_offset(offset!(UTC))) + HttpDate(PrimitiveDateTime::from(sys).assume_utc()) } } @@ -49,7 +49,14 @@ impl IntoHeaderValue for HttpDate { fn try_into(self) -> Result { let mut wrt = BytesMut::with_capacity(29).writer(); - write!(wrt, "{}", self.0.to_offset(offset!(UTC)).format("%a, %d %b %Y %H:%M:%S GMT")).unwrap(); + write!( + wrt, + "{}", + self.0 + .to_offset(offset!(UTC)) + .format("%a, %d %b %Y %H:%M:%S GMT") + ) + .unwrap(); HeaderValue::from_maybe_shared(wrt.get_mut().split().freeze()) } } @@ -66,14 +73,13 @@ impl From for SystemTime { #[cfg(test)] mod tests { use super::HttpDate; - use time::{PrimitiveDateTime, date, time, offset}; + use time::{date, time, PrimitiveDateTime}; #[test] fn test_date() { - let nov_07 = HttpDate(PrimitiveDateTime::new( - date!(1994-11-07), - time!(8:48:37) - ).using_offset(offset!(UTC))); + let nov_07 = HttpDate( + PrimitiveDateTime::new(date!(1994 - 11 - 07), time!(8:48:37)).assume_utc(), + ); assert_eq!( "Sun, 07 Nov 1994 08:48:37 GMT".parse::().unwrap(), diff --git a/actix-http/src/helpers.rs b/actix-http/src/helpers.rs index 58ebff61f..86f8250b6 100644 --- a/actix-http/src/helpers.rs +++ b/actix-http/src/helpers.rs @@ -1,4 +1,4 @@ -use std::{io, mem, ptr, slice}; +use std::{io, ptr}; use bytes::{BufMut, BytesMut}; use http::Version; @@ -14,9 +14,7 @@ const DEC_DIGITS_LUT: &[u8] = b"0001020304050607080910111213141516171819\ pub(crate) const STATUS_LINE_BUF_SIZE: usize = 13; pub(crate) fn write_status_line(version: Version, mut n: u16, bytes: &mut BytesMut) { - let mut buf: [u8; STATUS_LINE_BUF_SIZE] = [ - b'H', b'T', b'T', b'P', b'/', b'1', b'.', b'1', b' ', b' ', b' ', b' ', b' ', - ]; + let mut buf: [u8; STATUS_LINE_BUF_SIZE] = *b"HTTP/1.1 "; match version { Version::HTTP_2 => buf[5] = b'2', Version::HTTP_10 => buf[7] = b'0', @@ -64,109 +62,104 @@ pub(crate) fn write_status_line(version: Version, mut n: u16, bytes: &mut BytesM } } +const DIGITS_START: u8 = b'0'; + /// NOTE: bytes object has to contain enough space -pub fn write_content_length(mut n: usize, bytes: &mut BytesMut) { +pub fn write_content_length(n: usize, bytes: &mut BytesMut) { + bytes.put_slice(b"\r\ncontent-length: "); + if n < 10 { - let mut buf: [u8; 21] = [ - b'\r', b'\n', b'c', b'o', b'n', b't', b'e', b'n', b't', b'-', b'l', b'e', - b'n', b'g', b't', b'h', b':', b' ', b'0', b'\r', b'\n', - ]; - buf[18] = (n as u8) + b'0'; - bytes.put_slice(&buf); + bytes.put_u8(DIGITS_START + (n as u8)); } else if n < 100 { - let mut buf: [u8; 22] = [ - b'\r', b'\n', b'c', b'o', b'n', b't', b'e', b'n', b't', b'-', b'l', b'e', - b'n', b'g', b't', b'h', b':', b' ', b'0', b'0', b'\r', b'\n', - ]; - let d1 = n << 1; - unsafe { - ptr::copy_nonoverlapping( - DEC_DIGITS_LUT.as_ptr().add(d1), - buf.as_mut_ptr().offset(18), - 2, - ); - } - bytes.put_slice(&buf); + let n = n as u8; + + let d10 = n / 10; + let d1 = n % 10; + + bytes.put_u8(DIGITS_START + d10); + bytes.put_u8(DIGITS_START + d1); } else if n < 1000 { - let mut buf: [u8; 23] = [ - b'\r', b'\n', b'c', b'o', b'n', b't', b'e', b'n', b't', b'-', b'l', b'e', - b'n', b'g', b't', b'h', b':', b' ', b'0', b'0', b'0', b'\r', b'\n', - ]; - // decode 2 more chars, if > 2 chars - let d1 = (n % 100) << 1; - n /= 100; - unsafe { - ptr::copy_nonoverlapping( - DEC_DIGITS_LUT.as_ptr().add(d1), - buf.as_mut_ptr().offset(19), - 2, - ) - }; + let n = n as u16; - // decode last 1 - buf[18] = (n as u8) + b'0'; + let d100 = (n / 100) as u8; + let d10 = ((n / 10) % 10) as u8; + let d1 = (n % 10) as u8; - bytes.put_slice(&buf); + bytes.put_u8(DIGITS_START + d100); + bytes.put_u8(DIGITS_START + d10); + bytes.put_u8(DIGITS_START + d1); + } else if n < 10_000 { + let n = n as u16; + + let d1000 = (n / 1000) as u8; + let d100 = ((n / 100) % 10) as u8; + let d10 = ((n / 10) % 10) as u8; + let d1 = (n % 10) as u8; + + bytes.put_u8(DIGITS_START + d1000); + bytes.put_u8(DIGITS_START + d100); + bytes.put_u8(DIGITS_START + d10); + bytes.put_u8(DIGITS_START + d1); + } else if n < 100_000 { + let n = n as u32; + + let d10000 = (n / 10000) as u8; + let d1000 = ((n / 1000) % 10) as u8; + let d100 = ((n / 100) % 10) as u8; + let d10 = ((n / 10) % 10) as u8; + let d1 = (n % 10) as u8; + + bytes.put_u8(DIGITS_START + d10000); + bytes.put_u8(DIGITS_START + d1000); + bytes.put_u8(DIGITS_START + d100); + bytes.put_u8(DIGITS_START + d10); + bytes.put_u8(DIGITS_START + d1); + } else if n < 1_000_000 { + let n = n as u32; + + let d100000 = (n / 100_000) as u8; + let d10000 = ((n / 10000) % 10) as u8; + let d1000 = ((n / 1000) % 10) as u8; + let d100 = ((n / 100) % 10) as u8; + let d10 = ((n / 10) % 10) as u8; + let d1 = (n % 10) as u8; + + bytes.put_u8(DIGITS_START + d100000); + bytes.put_u8(DIGITS_START + d10000); + bytes.put_u8(DIGITS_START + d1000); + bytes.put_u8(DIGITS_START + d100); + bytes.put_u8(DIGITS_START + d10); + bytes.put_u8(DIGITS_START + d1); } else { - bytes.put_slice(b"\r\ncontent-length: "); - convert_usize(n, bytes); + write_usize(n, bytes); } + + bytes.put_slice(b"\r\n"); } -pub(crate) fn convert_usize(mut n: usize, bytes: &mut BytesMut) { - let mut curr: isize = 39; - let mut buf: [u8; 41] = unsafe { mem::MaybeUninit::uninit().assume_init() }; - buf[39] = b'\r'; - buf[40] = b'\n'; - let buf_ptr = buf.as_mut_ptr(); - let lut_ptr = DEC_DIGITS_LUT.as_ptr(); +pub(crate) fn write_usize(n: usize, bytes: &mut BytesMut) { + let mut n = n; - // eagerly decode 4 characters at a time - while n >= 10_000 { - let rem = (n % 10_000) as isize; - n /= 10_000; + // 20 chars is max length of a usize (2^64) + // digits will be added to the buffer from lsd to msd + let mut buf = BytesMut::with_capacity(20); - let d1 = (rem / 100) << 1; - let d2 = (rem % 100) << 1; - curr -= 4; - unsafe { - ptr::copy_nonoverlapping(lut_ptr.offset(d1), buf_ptr.offset(curr), 2); - ptr::copy_nonoverlapping(lut_ptr.offset(d2), buf_ptr.offset(curr + 2), 2); - } + while n > 9 { + // "pop" the least-significant digit + let lsd = (n % 10) as u8; + + // remove the lsd from n + n /= 10; + + buf.put_u8(DIGITS_START + lsd); } - // if we reach here numbers are <= 9999, so at most 4 chars long - let mut n = n as isize; // possibly reduce 64bit math + // put msd to result buffer + bytes.put_u8(DIGITS_START + (n as u8)); - // decode 2 more chars, if > 2 chars - if n >= 100 { - let d1 = (n % 100) << 1; - n /= 100; - curr -= 2; - unsafe { - ptr::copy_nonoverlapping(lut_ptr.offset(d1), buf_ptr.offset(curr), 2); - } - } - - // decode last 1 or 2 chars - if n < 10 { - curr -= 1; - unsafe { - *buf_ptr.offset(curr) = (n as u8) + b'0'; - } - } else { - let d1 = n << 1; - curr -= 2; - unsafe { - ptr::copy_nonoverlapping(lut_ptr.offset(d1), buf_ptr.offset(curr), 2); - } - } - - unsafe { - bytes.extend_from_slice(slice::from_raw_parts( - buf_ptr.offset(curr), - 41 - curr as usize, - )); + // put, in reverse (msd to lsd), remaining digits to buffer + for i in (0..buf.len()).rev() { + bytes.put_u8(buf[i]); } } @@ -231,5 +224,48 @@ mod tests { bytes.reserve(50); write_content_length(5909, &mut bytes); assert_eq!(bytes.split().freeze(), b"\r\ncontent-length: 5909\r\n"[..]); + bytes.reserve(50); + write_content_length(9999, &mut bytes); + assert_eq!(bytes.split().freeze(), b"\r\ncontent-length: 9999\r\n"[..]); + bytes.reserve(50); + write_content_length(10001, &mut bytes); + assert_eq!(bytes.split().freeze(), b"\r\ncontent-length: 10001\r\n"[..]); + bytes.reserve(50); + write_content_length(59094, &mut bytes); + assert_eq!(bytes.split().freeze(), b"\r\ncontent-length: 59094\r\n"[..]); + bytes.reserve(50); + write_content_length(99999, &mut bytes); + assert_eq!(bytes.split().freeze(), b"\r\ncontent-length: 99999\r\n"[..]); + + bytes.reserve(50); + write_content_length(590947, &mut bytes); + assert_eq!( + bytes.split().freeze(), + b"\r\ncontent-length: 590947\r\n"[..] + ); + bytes.reserve(50); + write_content_length(999999, &mut bytes); + assert_eq!( + bytes.split().freeze(), + b"\r\ncontent-length: 999999\r\n"[..] + ); + bytes.reserve(50); + write_content_length(5909471, &mut bytes); + assert_eq!( + bytes.split().freeze(), + b"\r\ncontent-length: 5909471\r\n"[..] + ); + bytes.reserve(50); + write_content_length(59094718, &mut bytes); + assert_eq!( + bytes.split().freeze(), + b"\r\ncontent-length: 59094718\r\n"[..] + ); + bytes.reserve(50); + write_content_length(4294973728, &mut bytes); + assert_eq!( + bytes.split().freeze(), + b"\r\ncontent-length: 4294973728\r\n"[..] + ); } } diff --git a/actix-http/src/lib.rs b/actix-http/src/lib.rs index a5ae4b447..9f615a129 100644 --- a/actix-http/src/lib.rs +++ b/actix-http/src/lib.rs @@ -1,5 +1,5 @@ //! Basic http primitives for actix-net framework. -#![deny(rust_2018_idioms, warnings)] +#![warn(rust_2018_idioms, warnings)] #![allow( clippy::type_complexity, clippy::too_many_arguments, @@ -10,6 +10,9 @@ #[macro_use] extern crate log; +#[macro_use] +mod macros; + pub mod body; mod builder; pub mod client; diff --git a/actix-http/src/macros.rs b/actix-http/src/macros.rs new file mode 100644 index 000000000..b970b14f2 --- /dev/null +++ b/actix-http/src/macros.rs @@ -0,0 +1,95 @@ +#[macro_export] +macro_rules! downcast_get_type_id { + () => { + /// A helper method to get the type ID of the type + /// this trait is implemented on. + /// This method is unsafe to *implement*, since `downcast_ref` relies + /// on the returned `TypeId` to perform a cast. + /// + /// Unfortunately, Rust has no notion of a trait method that is + /// unsafe to implement (marking it as `unsafe` makes it unsafe + /// to *call*). As a workaround, we require this method + /// to return a private type along with the `TypeId`. This + /// private type (`PrivateHelper`) has a private constructor, + /// making it impossible for safe code to construct outside of + /// this module. This ensures that safe code cannot violate + /// type-safety by implementing this method. + #[doc(hidden)] + fn __private_get_type_id__(&self) -> (std::any::TypeId, PrivateHelper) + where + Self: 'static, + { + (std::any::TypeId::of::(), PrivateHelper(())) + } + }; +} + +//Generate implementation for dyn $name +#[macro_export] +macro_rules! downcast { + ($name:ident) => { + /// A struct with a private constructor, for use with + /// `__private_get_type_id__`. Its single field is private, + /// ensuring that it can only be constructed from this module + #[doc(hidden)] + pub struct PrivateHelper(()); + + impl dyn $name + 'static { + /// Downcasts generic body to a specific type. + pub fn downcast_ref(&self) -> Option<&T> { + if self.__private_get_type_id__().0 == std::any::TypeId::of::() { + // Safety: external crates cannot override the default + // implementation of `__private_get_type_id__`, since + // it requires returning a private type. We can therefore + // rely on the returned `TypeId`, which ensures that this + // case is correct. + unsafe { Some(&*(self as *const dyn $name as *const T)) } + } else { + None + } + } + /// Downcasts a generic body to a mutable specific type. + pub fn downcast_mut(&mut self) -> Option<&mut T> { + if self.__private_get_type_id__().0 == std::any::TypeId::of::() { + // Safety: external crates cannot override the default + // implementation of `__private_get_type_id__`, since + // it requires returning a private type. We can therefore + // rely on the returned `TypeId`, which ensures that this + // case is correct. + unsafe { + Some(&mut *(self as *const dyn $name as *const T as *mut T)) + } + } else { + None + } + } + } + }; +} + +#[cfg(test)] +mod tests { + + trait MB { + downcast_get_type_id!(); + } + + downcast!(MB); + + impl MB for String {} + impl MB for () {} + + #[actix_rt::test] + async fn test_any_casting() { + let mut body = String::from("hello cast"); + let resp_body: &mut dyn MB = &mut body; + let body = resp_body.downcast_ref::().unwrap(); + assert_eq!(body, "hello cast"); + let body = &mut resp_body.downcast_mut::().unwrap(); + body.push_str("!"); + let body = resp_body.downcast_ref::().unwrap(); + assert_eq!(body, "hello cast!"); + let not_body = resp_body.downcast_ref::<()>(); + assert!(not_body.is_none()); + } +} diff --git a/actix-http/src/response.rs b/actix-http/src/response.rs index fcdcd7cdf..7a9b82df2 100644 --- a/actix-http/src/response.rs +++ b/actix-http/src/response.rs @@ -9,7 +9,6 @@ use std::{fmt, str}; use bytes::{Bytes, BytesMut}; use futures_core::Stream; use serde::Serialize; -use serde_json; use crate::body::{Body, BodyStream, MessageBody, ResponseBody}; use crate::cookie::{Cookie, CookieJar}; @@ -637,7 +636,7 @@ impl ResponseBuilder { /// `ResponseBuilder` can not be used after this call. pub fn streaming(&mut self, stream: S) -> Response where - S: Stream> + 'static, + S: Stream> + Unpin + 'static, E: Into + 'static, { self.body(Body::from_message(BodyStream::new(stream))) diff --git a/actix-http/src/time_parser.rs b/actix-http/src/time_parser.rs index f6623d24e..b5b07ccba 100644 --- a/actix-http/src/time_parser.rs +++ b/actix-http/src/time_parser.rs @@ -1,4 +1,4 @@ -use time::{PrimitiveDateTime, Date}; +use time::{Date, OffsetDateTime, PrimitiveDateTime}; /// Attempt to parse a `time` string as one of either RFC 1123, RFC 850, or asctime. pub fn parse_http_date(time: &str) -> Option { @@ -19,7 +19,7 @@ fn try_parse_rfc_850(time: &str) -> Option { // If the `time` string contains a two-digit year, then as per RFC 2616 ยง 19.3, // we consider the year as part of this century if it's within the next 50 years, // otherwise we consider as part of the previous century. - let now = PrimitiveDateTime::now(); + let now = OffsetDateTime::now(); let century_start_year = (now.year() / 100) * 100; let mut expanded_year = century_start_year + dt.year(); @@ -29,10 +29,10 @@ fn try_parse_rfc_850(time: &str) -> Option { match Date::try_from_ymd(expanded_year, dt.month(), dt.day()) { Ok(date) => Some(PrimitiveDateTime::new(date, dt.time())), - Err(_) => None + Err(_) => None, } } - Err(_) => None + Err(_) => None, } } diff --git a/actix-http/src/ws/frame.rs b/actix-http/src/ws/frame.rs index 3c70eb2bd..8f7004f18 100644 --- a/actix-http/src/ws/frame.rs +++ b/actix-http/src/ws/frame.rs @@ -2,7 +2,6 @@ use std::convert::TryFrom; use bytes::{Buf, BufMut, BytesMut}; use log::debug; -use rand; use crate::ws::mask::apply_mask; use crate::ws::proto::{CloseCode, CloseReason, OpCode}; diff --git a/actix-http/src/ws/proto.rs b/actix-http/src/ws/proto.rs index 60af6f08b..7b55cbf1a 100644 --- a/actix-http/src/ws/proto.rs +++ b/actix-http/src/ws/proto.rs @@ -1,5 +1,3 @@ -use base64; -use sha1; use std::convert::{From, Into}; use std::fmt; diff --git a/actix-http/tests/test_openssl.rs b/actix-http/tests/test_openssl.rs index b25f05272..77caa045b 100644 --- a/actix-http/tests/test_openssl.rs +++ b/actix-http/tests/test_openssl.rs @@ -97,11 +97,9 @@ async fn test_h2_body() -> io::Result<()> { let data = "HELLOWORLD".to_owned().repeat(64 * 1024); let mut srv = test_server(move || { HttpService::build() - .h2(|mut req: Request<_>| { - async move { - let body = load_body(req.take_payload()).await?; - Ok::<_, Error>(Response::Ok().body(body)) - } + .h2(|mut req: Request<_>| async move { + let body = load_body(req.take_payload()).await?; + Ok::<_, Error>(Response::Ok().body(body)) }) .openssl(ssl_acceptor()) .map_err(|_| ()) diff --git a/actix-http/tests/test_rustls.rs b/actix-http/tests/test_rustls.rs index bc0c91cc3..933a6c894 100644 --- a/actix-http/tests/test_rustls.rs +++ b/actix-http/tests/test_rustls.rs @@ -104,11 +104,9 @@ async fn test_h2_body1() -> io::Result<()> { let data = "HELLOWORLD".to_owned().repeat(64 * 1024); let mut srv = test_server(move || { HttpService::build() - .h2(|mut req: Request<_>| { - async move { - let body = load_body(req.take_payload()).await?; - Ok::<_, Error>(Response::Ok().body(body)) - } + .h2(|mut req: Request<_>| async move { + let body = load_body(req.take_payload()).await?; + Ok::<_, Error>(Response::Ok().body(body)) }) .rustls(ssl_acceptor()) }); diff --git a/actix-identity/Cargo.toml b/actix-identity/Cargo.toml index efeb24bda..f97b66291 100644 --- a/actix-identity/Cargo.toml +++ b/actix-identity/Cargo.toml @@ -17,7 +17,7 @@ path = "src/lib.rs" [dependencies] actix-web = { version = "2.0.0", default-features = false, features = ["secure-cookies"] } -actix-service = "1.0.2" +actix-service = "1.0.5" futures = "0.3.1" serde = "1.0" serde_json = "1.0" @@ -26,4 +26,4 @@ time = { version = "0.2.5", default-features = false, features = ["std"] } [dev-dependencies] actix-rt = "1.0.0" actix-http = "1.0.1" -bytes = "0.5.3" +bytes = "0.5.4" diff --git a/actix-multipart/CHANGES.md b/actix-multipart/CHANGES.md index d73a69393..ed5c8ad3f 100644 --- a/actix-multipart/CHANGES.md +++ b/actix-multipart/CHANGES.md @@ -4,6 +4,8 @@ * Remove the unused `time` dependency +* Fix missing `std::error::Error` implement for `MultipartError`. + ## [0.2.0] - 2019-12-20 * Release diff --git a/actix-multipart/Cargo.toml b/actix-multipart/Cargo.toml index f9cd7cfd2..a1d91c6a4 100644 --- a/actix-multipart/Cargo.toml +++ b/actix-multipart/Cargo.toml @@ -16,7 +16,7 @@ name = "actix_multipart" path = "src/lib.rs" [dependencies] -actix-web = { version = "2.0.0-rc", default-features = false } +actix-web = { version = "2.0.0", default-features = false } actix-service = "1.0.1" actix-utils = "1.0.3" bytes = "0.5.3" @@ -29,4 +29,4 @@ twoway = "0.2" [dev-dependencies] actix-rt = "1.0.0" -actix-http = "1.0.0" +actix-http = "2.0.0-alpha.1" diff --git a/actix-multipart/src/error.rs b/actix-multipart/src/error.rs index 6677f69c7..cdbb5d395 100644 --- a/actix-multipart/src/error.rs +++ b/actix-multipart/src/error.rs @@ -33,6 +33,8 @@ pub enum MultipartError { NotConsumed, } +impl std::error::Error for MultipartError {} + /// Return `BadRequest` for `MultipartError` impl ResponseError for MultipartError { fn status_code(&self) -> StatusCode { diff --git a/actix-session/Cargo.toml b/actix-session/Cargo.toml index b279c9d89..b0a89ee29 100644 --- a/actix-session/Cargo.toml +++ b/actix-session/Cargo.toml @@ -22,9 +22,9 @@ default = ["cookie-session"] cookie-session = ["actix-web/secure-cookies"] [dependencies] -actix-web = "2.0.0-rc" -actix-service = "1.0.1" -bytes = "0.5.3" +actix-web = { version = "2.0.0" } +actix-service = "1.0.5" +bytes = "0.5.4" derive_more = "0.99.2" futures = "0.3.1" serde = "1.0" diff --git a/actix-web-actors/Cargo.toml b/actix-web-actors/Cargo.toml index 6f573e442..333edb8c0 100644 --- a/actix-web-actors/Cargo.toml +++ b/actix-web-actors/Cargo.toml @@ -16,9 +16,9 @@ name = "actix_web_actors" path = "src/lib.rs" [dependencies] -actix = "0.9.0" -actix-web = "2.0.0-rc" -actix-http = "1.0.1" +actix = "0.10.0-alpha.1" +actix-web = "2.0.0" +actix-http = "2.0.0-alpha.1" actix-codec = "0.2.0" bytes = "0.5.2" futures = "0.3.1" diff --git a/actix-web-codegen/CHANGES.md b/actix-web-codegen/CHANGES.md index 95696abd3..941cd36de 100644 --- a/actix-web-codegen/CHANGES.md +++ b/actix-web-codegen/CHANGES.md @@ -1,8 +1,12 @@ # Changes -## [0.2.NEXT] - 2020-xx-xx +## [0.2.1] - 2020-02-25 -* Allow the handler function to be named as `config` #1290 +* Add `#[allow(missing_docs)]` attribute to generated structs [#1368] +* Allow the handler function to be named as `config` [#1290] + +[#1368]: https://github.com/actix/actix-web/issues/1368 +[#1290]: https://github.com/actix/actix-web/issues/1290 ## [0.2.0] - 2019-12-13 diff --git a/actix-web-codegen/Cargo.toml b/actix-web-codegen/Cargo.toml index 3fe561deb..0b926b807 100644 --- a/actix-web-codegen/Cargo.toml +++ b/actix-web-codegen/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "actix-web-codegen" -version = "0.2.0" +version = "0.2.1" description = "Actix web proc macros" readme = "README.md" authors = ["Nikolay Kim "] @@ -17,6 +17,6 @@ syn = { version = "^1", features = ["full", "parsing"] } proc-macro2 = "^1" [dev-dependencies] -actix-rt = { version = "1.0.0" } -actix-web = { version = "2.0.0-rc" } -futures = { version = "0.3.1" } +actix-rt = "1.0.0" +actix-web = "2.0.0" +futures = "0.3.1" diff --git a/actix-web-codegen/src/route.rs b/actix-web-codegen/src/route.rs index c0edcc109..341329ea2 100644 --- a/actix-web-codegen/src/route.rs +++ b/actix-web-codegen/src/route.rs @@ -189,7 +189,7 @@ impl ToTokens for Route { } = self; let resource_name = name.to_string(); let stream = quote! { - #[allow(non_camel_case_types)] + #[allow(non_camel_case_types, missing_docs)] pub struct #name; impl actix_web::dev::HttpServiceFactory for #name { diff --git a/awc/CHANGES.md b/awc/CHANGES.md index d9b26e453..d410ae514 100644 --- a/awc/CHANGES.md +++ b/awc/CHANGES.md @@ -4,7 +4,6 @@ * Fix compilation with default features off - ## [1.0.0] - 2019-12-13 * Release diff --git a/awc/Cargo.toml b/awc/Cargo.toml index 67e0a3ee4..71b23ece3 100644 --- a/awc/Cargo.toml +++ b/awc/Cargo.toml @@ -36,7 +36,7 @@ compress = ["actix-http/compress"] [dependencies] actix-codec = "0.2.0" actix-service = "1.0.1" -actix-http = "1.0.0" +actix-http = "2.0.0-alpha.1" actix-rt = "1.0.0" base64 = "0.11" @@ -55,8 +55,8 @@ rust-tls = { version = "0.16.0", package="rustls", optional = true, features = [ [dev-dependencies] actix-connect = { version = "1.0.1", features=["openssl"] } -actix-web = { version = "2.0.0-rc", features=["openssl"] } -actix-http = { version = "1.0.1", features=["openssl"] } +actix-web = { version = "2.0.0", features=["openssl"] } +actix-http = { version = "2.0.0-alpha.1", features=["openssl"] } actix-http-test = { version = "1.0.0", features=["openssl"] } actix-utils = "1.0.3" actix-server = "1.0.0" diff --git a/awc/src/lib.rs b/awc/src/lib.rs index 8944fe229..952a15369 100644 --- a/awc/src/lib.rs +++ b/awc/src/lib.rs @@ -1,4 +1,4 @@ -#![deny(rust_2018_idioms, warnings)] +#![warn(rust_2018_idioms, warnings)] #![allow( clippy::type_complexity, clippy::borrow_interior_mutable_const, diff --git a/src/lib.rs b/src/lib.rs index d51005cfe..d7cb45074 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,4 +1,4 @@ -#![deny(rust_2018_idioms, warnings)] +#![warn(rust_2018_idioms, warnings)] #![allow( clippy::needless_doctest_main, clippy::type_complexity, @@ -7,6 +7,12 @@ //! Actix web is a small, pragmatic, and extremely fast web framework //! for Rust. //! +//! ## Example +//! +//! The `#[actix_rt::main]` macro in the example below is provided by the Actix runtime +//! crate, [`actix-rt`](https://crates.io/crates/actix-rt). You will need to include +//! `actix-rt` in your dependencies for it to run. +//! //! ```rust,no_run //! use actix_web::{web, App, Responder, HttpServer}; //! diff --git a/src/middleware/logger.rs b/src/middleware/logger.rs index d692132ce..e40fe648a 100644 --- a/src/middleware/logger.rs +++ b/src/middleware/logger.rs @@ -238,15 +238,20 @@ where } } +use pin_project::{pin_project, pinned_drop}; + +#[pin_project(PinnedDrop)] pub struct StreamLog { + #[pin] body: ResponseBody, format: Option, size: usize, time: OffsetDateTime, } -impl Drop for StreamLog { - fn drop(&mut self) { +#[pinned_drop] +impl PinnedDrop for StreamLog { + fn drop(self: Pin<&mut Self>) { if let Some(ref format) = self.format { let render = |fmt: &mut Formatter<'_>| { for unit in &format.0 { @@ -259,15 +264,17 @@ impl Drop for StreamLog { } } + impl MessageBody for StreamLog { fn size(&self) -> BodySize { self.body.size() } - fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll>> { - match self.body.poll_next(cx) { + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll>> { + let this = self.project(); + match this.body.poll_next(cx) { Poll::Ready(Some(Ok(chunk))) => { - self.size += chunk.len(); + *this.size += chunk.len(); Poll::Ready(Some(Ok(chunk))) } val => val, diff --git a/src/server.rs b/src/server.rs index 11cfbb6bc..97dd9f7f7 100644 --- a/src/server.rs +++ b/src/server.rs @@ -443,8 +443,6 @@ where #[cfg(unix)] /// Start listening for unix domain connections on existing listener. - /// - /// This method is available with `uds` feature. pub fn listen_uds( mut self, lst: std::os::unix::net::UnixListener, @@ -483,8 +481,6 @@ where #[cfg(unix)] /// Start listening for incoming unix domain connections. - /// - /// This method is available with `uds` feature. pub fn bind_uds(mut self, addr: A) -> io::Result where A: AsRef, diff --git a/src/test.rs b/src/test.rs index 956980530..0eb02ff7c 100644 --- a/src/test.rs +++ b/src/test.rs @@ -150,7 +150,7 @@ where pub async fn read_response(app: &mut S, req: Request) -> Bytes where S: Service, Error = Error>, - B: MessageBody, + B: MessageBody + Unpin, { let mut resp = app .call(req) @@ -193,7 +193,7 @@ where /// ``` pub async fn read_body(mut res: ServiceResponse) -> Bytes where - B: MessageBody, + B: MessageBody + Unpin, { let mut body = res.take_body(); let mut bytes = BytesMut::new(); @@ -251,7 +251,7 @@ where pub async fn read_response_json(app: &mut S, req: Request) -> T where S: Service, Error = Error>, - B: MessageBody, + B: MessageBody + Unpin, T: DeserializeOwned, { let body = read_response(app, req).await; @@ -953,7 +953,6 @@ impl Drop for TestServer { #[cfg(test)] mod tests { use actix_http::httpmessage::HttpMessage; - use futures::FutureExt; use serde::{Deserialize, Serialize}; use std::time::SystemTime; @@ -1163,6 +1162,13 @@ mod tests { assert!(res.status().is_success()); } +/* + + Comment out until actix decoupled of actix-http: + https://github.com/actix/actix/issues/321 + + use futures::FutureExt; + #[actix_rt::test] async fn test_actor() { use actix::Actor; @@ -1183,7 +1189,6 @@ mod tests { } } - let addr = MyActor.start(); let mut app = init_service(App::new().service(web::resource("/index.html").to( move || { @@ -1205,4 +1210,5 @@ mod tests { let res = app.call(req).await.unwrap(); assert!(res.status().is_success()); } +*/ } diff --git a/src/web.rs b/src/web.rs index 962c1157b..f47cf865e 100644 --- a/src/web.rs +++ b/src/web.rs @@ -193,6 +193,24 @@ pub fn head() -> Route { method(Method::HEAD) } +/// Create *route* with `TRACE` method guard. +/// +/// ```rust +/// use actix_web::{web, App, HttpResponse}; +/// +/// let app = App::new().service( +/// web::resource("/{project_id}") +/// .route(web::trace().to(|| HttpResponse::Ok())) +/// ); +/// ``` +/// +/// In the above example, one `HEAD` route gets added: +/// * /{project_id} +/// +pub fn trace() -> Route { + method(Method::TRACE) +} + /// Create *route* and add method guard. /// /// ```rust diff --git a/test-server/CHANGES.md b/test-server/CHANGES.md index 617b8092f..96c010355 100644 --- a/test-server/CHANGES.md +++ b/test-server/CHANGES.md @@ -2,8 +2,7 @@ ## [Unreleased] - 2020-xx-xx -* Update the `time` dependency to 0.2.5 - +* Update the `time` dependency to 0.2.7 ## [1.0.0] - 2019-12-13 diff --git a/test-server/Cargo.toml b/test-server/Cargo.toml index b22414e29..117c1a318 100644 --- a/test-server/Cargo.toml +++ b/test-server/Cargo.toml @@ -37,7 +37,7 @@ actix-utils = "1.0.3" actix-rt = "1.0.0" actix-server = "1.0.0" actix-testing = "1.0.0" -awc = "1.0.0" +awc = "1.0.1" base64 = "0.11" bytes = "0.5.3" @@ -51,9 +51,9 @@ serde_json = "1.0" sha1 = "0.6" slab = "0.4" serde_urlencoded = "0.6.1" -time = { version = "0.2.5", default-features = false, features = ["std"] } +time = { version = "0.2.7", default-features = false, features = ["std"] } open-ssl = { version="0.10", package="openssl", optional = true } [dev-dependencies] -actix-web = "2.0.0-rc" -actix-http = "1.0.1" +actix-web = "2.0.0" +actix-http = "2.0.0-alpha.1" diff --git a/tests/test_weird_poll.rs b/tests/test_weird_poll.rs index 21d1d611a..571b69f45 100644 --- a/tests/test_weird_poll.rs +++ b/tests/test_weird_poll.rs @@ -5,6 +5,9 @@ use futures::stream::once; use actix_http::body::{MessageBody, BodyStream}; use bytes::Bytes; +/* +Disable weird poll until actix-web is based on actix-http 2.0.0 + #[test] fn weird_poll() { let (sender, receiver) = futures::channel::oneshot::channel(); @@ -24,3 +27,4 @@ fn weird_poll() { let _ = std::mem::replace(&mut body_stream, Err([0; 32])).unwrap().poll_next(&mut context); } +*/