diff --git a/CHANGES.md b/CHANGES.md
index 87c021b1e..ee9b9308d 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -1,6 +1,9 @@
 # Changes

 ## Unreleased - 2020-xx-xx
+### Changed
+* Bumped `rand` to `0.8`
+
 ### Fixed
 * added the actual parsing error to `test::read_body_json` [#1812]
diff --git a/Cargo.toml b/Cargo.toml
index 31c4cca7e..6ed327f56 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -112,7 +112,7 @@ tinyvec = { version = "1", features = ["alloc"] }
 [dev-dependencies]
 actix = "0.10.0"
 actix-http = { version = "2.1.0", features = ["actors"] }
-rand = "0.7"
+rand = "0.8"
 env_logger = "0.8"
 serde_derive = "1.0"
 brotli2 = "0.3.2"
diff --git a/actix-http/CHANGES.md b/actix-http/CHANGES.md
index c602ab2e1..81577688d 100644
--- a/actix-http/CHANGES.md
+++ b/actix-http/CHANGES.md
@@ -1,7 +1,8 @@
 # Changes

 ## Unreleased - 2020-xx-xx
-
+### Changed
+* Bumped `rand` to `0.8`

 ## 2.2.0 - 2020-11-25

 ### Added
diff --git a/actix-http/Cargo.toml b/actix-http/Cargo.toml
index 7375c6eb3..7cf344487 100644
--- a/actix-http/Cargo.toml
+++ b/actix-http/Cargo.toml
@@ -72,7 +72,7 @@ log = "0.4"
 mime = "0.3"
 percent-encoding = "2.1"
 pin-project = "1.0.0"
-rand = "0.7"
+rand = "0.8"
 regex = "1.3"
 serde = "1.0"
 serde_json = "1.0"
diff --git a/actix-http/src/client/pool.rs b/actix-http/src/client/pool.rs
index 08abc6277..a8687dbeb 100644
--- a/actix-http/src/client/pool.rs
+++ b/actix-http/src/client/pool.rs
@@ -9,8 +9,9 @@ use std::time::{Duration, Instant};
 use actix_codec::{AsyncRead, AsyncWrite};
 use actix_rt::time::{delay_for, Delay};
 use actix_service::Service;
-use actix_utils::{oneshot, task::LocalWaker};
+use actix_utils::task::LocalWaker;
 use bytes::Bytes;
+use futures_channel::oneshot;
 use futures_util::future::{poll_fn, FutureExt, LocalBoxFuture};
 use fxhash::FxHashMap;
 use h2::client::{Connection, SendRequest};
diff --git a/actix-http/src/cloneable.rs b/actix-http/src/cloneable.rs
index b64c299fc..0e77c455c 100644
--- a/actix-http/src/cloneable.rs
+++ b/actix-http/src/cloneable.rs
@@ -4,12 +4,12 @@ use std::task::{Context, Poll};

 use actix_service::Service;

-#[doc(hidden)]
 /// Service that allows to turn non-clone service to a service with `Clone` impl
 ///
 /// # Panics
 /// CloneableService might panic with some creative use of thread local storage.
 /// See https://github.com/actix/actix-web/issues/1295 for example
+#[doc(hidden)]
 pub(crate) struct CloneableService(Rc>);

 impl CloneableService {
diff --git a/actix-http/src/extensions.rs b/actix-http/src/extensions.rs
index 7dda74731..b20dfe11d 100644
--- a/actix-http/src/extensions.rs
+++ b/actix-http/src/extensions.rs
@@ -3,8 +3,8 @@ use std::{fmt, mem};

 use fxhash::FxHashMap;

-#[derive(Default)]
 /// A type map of request extensions.
+#[derive(Default)]
 pub struct Extensions {
     /// Use FxHasher with a std HashMap with for faster
     /// lookups on the small `TypeId` (u64 equivalent) keys.
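// Note on the pool.rs change above: the client pool now takes its oneshot
// channel from `futures_channel` instead of `actix_utils`. A minimal sketch of
// the replacement channel's send/receive contract (the function name is
// illustrative and not part of this patch):
//
// use futures_channel::oneshot;
//
// async fn oneshot_roundtrip() {
//     let (tx, rx) = oneshot::channel::<u32>();
//
//     // `send` consumes the sender; it only fails if the receiver was dropped.
//     tx.send(42).expect("receiver still alive");
//
//     // Awaiting the receiver yields `Err(Canceled)` if the sender is dropped
//     // before sending, which is the case the pool has to handle.
//     assert_eq!(rx.await.unwrap(), 42);
// }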
diff --git a/actix-http/src/h1/codec.rs b/actix-http/src/h1/codec.rs
index 036f16670..c9a62dc30 100644
--- a/actix-http/src/h1/codec.rs
+++ b/actix-http/src/h1/codec.rs
@@ -58,6 +58,7 @@ impl Codec {
         } else {
             Flags::empty()
         };
+
         Codec {
             config,
             flags,
@@ -69,26 +70,26 @@ impl Codec {
         }
     }

+    /// Check if request is upgrade.
     #[inline]
-    /// Check if request is upgrade
     pub fn upgrade(&self) -> bool {
         self.ctype == ConnectionType::Upgrade
     }

+    /// Check if last response is keep-alive.
     #[inline]
-    /// Check if last response is keep-alive
     pub fn keepalive(&self) -> bool {
         self.ctype == ConnectionType::KeepAlive
     }

+    /// Check if keep-alive enabled on server level.
     #[inline]
-    /// Check if keep-alive enabled on server level
     pub fn keepalive_enabled(&self) -> bool {
         self.flags.contains(Flags::KEEPALIVE_ENABLED)
     }

+    /// Check last request's message type.
     #[inline]
-    /// Check last request's message type
     pub fn message_type(&self) -> MessageType {
         if self.flags.contains(Flags::STREAM) {
             MessageType::Stream
diff --git a/actix-http/src/h1/dispatcher.rs b/actix-http/src/h1/dispatcher.rs
index ace4144e3..1311a0987 100644
--- a/actix-http/src/h1/dispatcher.rs
+++ b/actix-http/src/h1/dispatcher.rs
@@ -1,8 +1,11 @@
-use std::collections::VecDeque;
-use std::future::Future;
-use std::pin::Pin;
-use std::task::{Context, Poll};
-use std::{fmt, io, net};
+use std::{
+    collections::VecDeque,
+    fmt,
+    future::Future,
+    io, mem, net,
+    pin::Pin,
+    task::{Context, Poll},
+};

 use actix_codec::{AsyncRead, AsyncWrite, Decoder, Encoder, Framed, FramedParts};
 use actix_rt::time::{delay_until, Delay, Instant};
@@ -59,6 +62,9 @@ where
 {
     #[pin]
     inner: DispatcherState,
+
+    #[cfg(test)]
+    poll_count: u64,
 }

 #[pin_project(project = DispatcherStateProj)]
@@ -247,6 +253,9 @@ where
                 ka_expire,
                 ka_timer,
             }),
+
+            #[cfg(test)]
+            poll_count: 0,
         }
     }
 }
@@ -511,12 +520,12 @@ where
         }
     }

-    /// Process one incoming requests
+    /// Process one incoming request.
     pub(self) fn poll_request(
         mut self: Pin<&mut Self>,
         cx: &mut Context<'_>,
     ) -> Result {
-        // limit a mount of non processed requests
+        // limit amount of non-processed requests
         if self.messages.len() >= MAX_PIPELINED_MESSAGES || !self.can_read(cx) {
             return Ok(false);
         }
@@ -725,6 +734,12 @@ where
     #[inline]
     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll {
         let this = self.as_mut().project();
+
+        #[cfg(test)]
+        {
+            *this.poll_count += 1;
+        }
+
         match this.inner.project() {
             DispatcherStateProj::Normal(mut inner) => {
                 inner.as_mut().poll_keepalive(cx)?;
@@ -788,10 +803,10 @@ where
                             let inner_p = inner.as_mut().project();
                             let mut parts = FramedParts::with_read_buf(
                                 inner_p.io.take().unwrap(),
-                                std::mem::take(inner_p.codec),
-                                std::mem::take(inner_p.read_buf),
+                                mem::take(inner_p.codec),
+                                mem::take(inner_p.read_buf),
                             );
-                            parts.write_buf = std::mem::take(inner_p.write_buf);
+                            parts.write_buf = mem::take(inner_p.write_buf);
                             let framed = Framed::from_parts(parts);
                             let upgrade = inner_p.upgrade.take().unwrap().call((req, framed));
@@ -803,8 +818,11 @@ where
                     }

                     // we didn't get WouldBlock from write operation,
-                    // so data get written to kernel completely (OSX)
+                    // so data get written to kernel completely (macOS)
                     // and we have to write again otherwise response can get stuck
+                    //
+                    // TODO: what? is WouldBlock good or bad?
+                    // want to find a reference for this macOS behavior
                     if inner.as_mut().poll_flush(cx)? || !drain {
                         break;
                     }
@@ -854,6 +872,11 @@ where
     }
 }

+/// Returns either:
+/// - `Ok(Some(true))` - data was read and done reading all data.
+/// - `Ok(Some(false))` - data was read but there should be more to read.
+/// - `Ok(None)` - no data was read but there should be more to read later.
+/// - Unhandled Errors fn read_available( cx: &mut Context<'_>, io: &mut T, @@ -887,17 +910,17 @@ where read_some = true; } } - Poll::Ready(Err(e)) => { - return if e.kind() == io::ErrorKind::WouldBlock { + Poll::Ready(Err(err)) => { + return if err.kind() == io::ErrorKind::WouldBlock { if read_some { Ok(Some(false)) } else { Ok(None) } - } else if e.kind() == io::ErrorKind::ConnectionReset && read_some { + } else if err.kind() == io::ErrorKind::ConnectionReset && read_some { Ok(Some(true)) } else { - Err(e) + Err(err) } } } @@ -917,13 +940,64 @@ where #[cfg(test)] mod tests { - use actix_service::IntoService; - use futures_util::future::{lazy, ok}; + use std::{marker::PhantomData, str}; + + use actix_service::fn_service; + use futures_util::future::{lazy, ready}; use super::*; - use crate::error::Error; - use crate::h1::{ExpectHandler, UpgradeHandler}; use crate::test::TestBuffer; + use crate::{error::Error, KeepAlive}; + use crate::{ + h1::{ExpectHandler, UpgradeHandler}, + test::TestSeqBuffer, + }; + + fn find_slice(haystack: &[u8], needle: &[u8], from: usize) -> Option { + haystack[from..] + .windows(needle.len()) + .position(|window| window == needle) + } + + fn stabilize_date_header(payload: &mut [u8]) { + let mut from = 0; + + while let Some(pos) = find_slice(&payload, b"date", from) { + payload[(from + pos)..(from + pos + 35)] + .copy_from_slice(b"date: Thu, 01 Jan 1970 12:34:56 UTC"); + from += 35; + } + } + + fn ok_service() -> impl Service + { + fn_service(|_req: Request| ready(Ok::<_, Error>(Response::Ok().finish()))) + } + + fn echo_path_service( + ) -> impl Service { + fn_service(|req: Request| { + let path = req.path().as_bytes(); + ready(Ok::<_, Error>(Response::Ok().body(Body::from_slice(path)))) + }) + } + + fn echo_payload_service( + ) -> impl Service { + fn_service(|mut req: Request| { + Box::pin(async move { + use futures_util::stream::StreamExt as _; + + let mut pl = req.take_payload(); + let mut body = BytesMut::new(); + while let Some(chunk) = pl.next().await { + body.extend_from_slice(chunk.unwrap().bytes()) + } + + Ok::<_, Error>(Response::Ok().body(body)) + }) + }) + } #[actix_rt::test] async fn test_req_parse_err() { @@ -933,9 +1007,7 @@ mod tests { let mut h1 = Dispatcher::<_, _, _, _, UpgradeHandler>::new( buf, ServiceConfig::default(), - CloneableService::new( - (|_| ok::<_, Error>(Response::Ok().finish())).into_service(), - ), + CloneableService::new(ok_service()), CloneableService::new(ExpectHandler), None, None, @@ -958,4 +1030,274 @@ mod tests { }) .await; } + + #[actix_rt::test] + async fn test_pipelining() { + lazy(|cx| { + let buf = TestBuffer::new( + "\ + GET /abcd HTTP/1.1\r\n\r\n\ + GET /def HTTP/1.1\r\n\r\n\ + ", + ); + + let cfg = ServiceConfig::new(KeepAlive::Disabled, 1, 1, false, None); + + let mut h1 = Dispatcher::<_, _, _, _, UpgradeHandler>::new( + buf, + cfg, + CloneableService::new(echo_path_service()), + CloneableService::new(ExpectHandler), + None, + None, + Extensions::new(), + None, + ); + + assert!(matches!(&h1.inner, DispatcherState::Normal(_))); + + match Pin::new(&mut h1).poll(cx) { + Poll::Pending => panic!("first poll should not be pending"), + Poll::Ready(res) => assert!(res.is_ok()), + } + + // polls: initial => shutdown + assert_eq!(h1.poll_count, 2); + + if let DispatcherState::Normal(ref mut inner) = h1.inner { + let res = &mut inner.io.take().unwrap().write_buf[..]; + stabilize_date_header(res); + + let exp = b"\ + HTTP/1.1 200 OK\r\n\ + content-length: 5\r\n\ + connection: close\r\n\ + date: Thu, 01 Jan 1970 12:34:56 
UTC\r\n\r\n\ + /abcd\ + HTTP/1.1 200 OK\r\n\ + content-length: 4\r\n\ + connection: close\r\n\ + date: Thu, 01 Jan 1970 12:34:56 UTC\r\n\r\n\ + /def\ + "; + + assert_eq!(res.to_vec(), exp.to_vec()); + } + }) + .await; + + lazy(|cx| { + let buf = TestBuffer::new( + "\ + GET /abcd HTTP/1.1\r\n\r\n\ + GET /def HTTP/1\r\n\r\n\ + ", + ); + + let cfg = ServiceConfig::new(KeepAlive::Disabled, 1, 1, false, None); + + let mut h1 = Dispatcher::<_, _, _, _, UpgradeHandler>::new( + buf, + cfg, + CloneableService::new(echo_path_service()), + CloneableService::new(ExpectHandler), + None, + None, + Extensions::new(), + None, + ); + + assert!(matches!(&h1.inner, DispatcherState::Normal(_))); + + match Pin::new(&mut h1).poll(cx) { + Poll::Pending => panic!("first poll should not be pending"), + Poll::Ready(res) => assert!(res.is_err()), + } + + // polls: initial => shutdown + assert_eq!(h1.poll_count, 1); + + if let DispatcherState::Normal(ref mut inner) = h1.inner { + let res = &mut inner.io.take().unwrap().write_buf[..]; + stabilize_date_header(res); + + let exp = b"\ + HTTP/1.1 200 OK\r\n\ + content-length: 5\r\n\ + connection: close\r\n\ + date: Thu, 01 Jan 1970 12:34:56 UTC\r\n\r\n\ + /abcd\ + HTTP/1.1 400 Bad Request\r\n\ + content-length: 0\r\n\ + connection: close\r\n\ + date: Thu, 01 Jan 1970 12:34:56 UTC\r\n\r\n\ + "; + + assert_eq!(res.to_vec(), exp.to_vec()); + } + }) + .await; + } + + #[actix_rt::test] + async fn test_expect() { + lazy(|cx| { + let mut buf = TestSeqBuffer::empty(); + let cfg = ServiceConfig::new(KeepAlive::Disabled, 0, 0, false, None); + let mut h1 = Dispatcher::<_, _, _, _, UpgradeHandler<_>>::new( + buf.clone(), + cfg, + CloneableService::new(echo_payload_service()), + CloneableService::new(ExpectHandler), + None, + None, + Extensions::new(), + None, + ); + + buf.extend_read_buf( + "\ + POST /upload HTTP/1.1\r\n\ + Content-Length: 5\r\n\ + Expect: 100-continue\r\n\ + \r\n\ + ", + ); + + assert!(Pin::new(&mut h1).poll(cx).is_pending()); + assert!(matches!(&h1.inner, DispatcherState::Normal(_))); + + // polls: manual + assert_eq!(h1.poll_count, 1); + eprintln!("poll count: {}", h1.poll_count); + + if let DispatcherState::Normal(ref inner) = h1.inner { + let io = inner.io.as_ref().unwrap(); + let res = &io.write_buf()[..]; + assert_eq!( + str::from_utf8(res).unwrap(), + "HTTP/1.1 100 Continue\r\n\r\n" + ); + } + + buf.extend_read_buf("12345"); + assert!(Pin::new(&mut h1).poll(cx).is_ready()); + + // polls: manual manual shutdown + assert_eq!(h1.poll_count, 3); + + if let DispatcherState::Normal(ref inner) = h1.inner { + let io = inner.io.as_ref().unwrap(); + let mut res = (&io.write_buf()[..]).to_owned(); + stabilize_date_header(&mut res); + + assert_eq!( + str::from_utf8(&res).unwrap(), + "\ + HTTP/1.1 100 Continue\r\n\ + \r\n\ + HTTP/1.1 200 OK\r\n\ + content-length: 5\r\n\ + connection: close\r\n\ + date: Thu, 01 Jan 1970 12:34:56 UTC\r\n\ + \r\n\ + 12345\ + " + ); + } + }) + .await; + } + + #[actix_rt::test] + async fn test_eager_expect() { + lazy(|cx| { + let mut buf = TestSeqBuffer::empty(); + let cfg = ServiceConfig::new(KeepAlive::Disabled, 0, 0, false, None); + let mut h1 = Dispatcher::<_, _, _, _, UpgradeHandler<_>>::new( + buf.clone(), + cfg, + CloneableService::new(echo_path_service()), + CloneableService::new(ExpectHandler), + None, + None, + Extensions::new(), + None, + ); + + buf.extend_read_buf( + "\ + POST /upload HTTP/1.1\r\n\ + Content-Length: 5\r\n\ + Expect: 100-continue\r\n\ + \r\n\ + ", + ); + + assert!(Pin::new(&mut h1).poll(cx).is_ready()); + 
assert!(matches!(&h1.inner, DispatcherState::Normal(_))); + + // polls: manual shutdown + assert_eq!(h1.poll_count, 2); + + if let DispatcherState::Normal(ref inner) = h1.inner { + let io = inner.io.as_ref().unwrap(); + let mut res = (&io.write_buf()[..]).to_owned(); + stabilize_date_header(&mut res); + + // Despite the content-length header and even though the request payload has not + // been sent, this test expects a complete service response since the payload + // is not used at all. The service passed to dispatcher is path echo and doesn't + // consume payload bytes. + assert_eq!( + str::from_utf8(&res).unwrap(), + "\ + HTTP/1.1 100 Continue\r\n\ + \r\n\ + HTTP/1.1 200 OK\r\n\ + content-length: 7\r\n\ + connection: close\r\n\ + date: Thu, 01 Jan 1970 12:34:56 UTC\r\n\ + \r\n\ + /upload\ + " + ); + } + }) + .await; + } + + #[actix_rt::test] + async fn test_upgrade() { + lazy(|cx| { + let mut buf = TestSeqBuffer::empty(); + let cfg = ServiceConfig::new(KeepAlive::Disabled, 0, 0, false, None); + let mut h1 = Dispatcher::<_, _, _, _, UpgradeHandler<_>>::new( + buf.clone(), + cfg, + CloneableService::new(ok_service()), + CloneableService::new(ExpectHandler), + Some(CloneableService::new(UpgradeHandler(PhantomData))), + None, + Extensions::new(), + None, + ); + + buf.extend_read_buf( + "\ + GET /ws HTTP/1.1\r\n\ + Connection: Upgrade\r\n\ + Upgrade: websocket\r\n\ + \r\n\ + ", + ); + + assert!(Pin::new(&mut h1).poll(cx).is_ready()); + assert!(matches!(&h1.inner, DispatcherState::Upgrade(_))); + + // polls: manual shutdown + assert_eq!(h1.poll_count, 2); + }) + .await; + } } diff --git a/actix-http/src/h1/expect.rs b/actix-http/src/h1/expect.rs index 6c08df08e..b89c7ff74 100644 --- a/actix-http/src/h1/expect.rs +++ b/actix-http/src/h1/expect.rs @@ -1,7 +1,7 @@ use std::task::{Context, Poll}; use actix_service::{Service, ServiceFactory}; -use futures_util::future::{ok, Ready}; +use futures_util::future::{ready, Ready}; use crate::error::Error; use crate::request::Request; @@ -17,8 +17,8 @@ impl ServiceFactory for ExpectHandler { type InitError = Error; type Future = Ready>; - fn new_service(&self, _: ()) -> Self::Future { - ok(ExpectHandler) + fn new_service(&self, _: Self::Config) -> Self::Future { + ready(Ok(ExpectHandler)) } } @@ -33,6 +33,8 @@ impl Service for ExpectHandler { } fn call(&mut self, req: Request) -> Self::Future { - ok(req) + ready(Ok(req)) + // TODO: add some way to trigger error + // Err(error::ErrorExpectationFailed("test")) } } diff --git a/actix-http/src/h1/upgrade.rs b/actix-http/src/h1/upgrade.rs index 22ba99e26..8615f27a8 100644 --- a/actix-http/src/h1/upgrade.rs +++ b/actix-http/src/h1/upgrade.rs @@ -3,13 +3,13 @@ use std::task::{Context, Poll}; use actix_codec::Framed; use actix_service::{Service, ServiceFactory}; -use futures_util::future::Ready; +use futures_util::future::{ready, Ready}; use crate::error::Error; use crate::h1::Codec; use crate::request::Request; -pub struct UpgradeHandler(PhantomData); +pub struct UpgradeHandler(pub(crate) PhantomData); impl ServiceFactory for UpgradeHandler { type Config = (); @@ -36,6 +36,6 @@ impl Service for UpgradeHandler { } fn call(&mut self, _: Self::Request) -> Self::Future { - unimplemented!() + ready(Ok(())) } } diff --git a/actix-http/src/test.rs b/actix-http/src/test.rs index b79f5a73c..4512e72c2 100644 --- a/actix-http/src/test.rs +++ b/actix-http/src/test.rs @@ -1,9 +1,14 @@ -//! Test Various helpers for Actix applications to use during testing. 
-use std::convert::TryFrom; -use std::io::{self, Read, Write}; -use std::pin::Pin; -use std::str::FromStr; -use std::task::{Context, Poll}; +//! Various testing helpers for use in internal and app tests. + +use std::{ + cell::{Ref, RefCell}, + convert::TryFrom, + io::{self, Read, Write}, + pin::Pin, + rc::Rc, + str::FromStr, + task::{Context, Poll}, +}; use actix_codec::{AsyncRead, AsyncWrite}; use bytes::{Bytes, BytesMut}; @@ -183,7 +188,7 @@ fn parts(parts: &mut Option) -> &mut Inner { parts.as_mut().expect("cannot reuse test request builder") } -/// Async io buffer +/// Async I/O test buffer. pub struct TestBuffer { pub read_buf: BytesMut, pub write_buf: BytesMut, @@ -191,24 +196,24 @@ pub struct TestBuffer { } impl TestBuffer { - /// Create new TestBuffer instance - pub fn new(data: T) -> TestBuffer + /// Create new `TestBuffer` instance with initial read buffer. + pub fn new(data: T) -> Self where - BytesMut: From, + T: Into, { - TestBuffer { - read_buf: BytesMut::from(data), + Self { + read_buf: data.into(), write_buf: BytesMut::new(), err: None, } } - /// Create new empty TestBuffer instance - pub fn empty() -> TestBuffer { - TestBuffer::new("") + /// Create new empty `TestBuffer` instance. + pub fn empty() -> Self { + Self::new("") } - /// Add extra data to read buffer. + /// Add data to read buffer. pub fn extend_read_buf>(&mut self, data: T) { self.read_buf.extend_from_slice(data.as_ref()) } @@ -236,6 +241,7 @@ impl io::Write for TestBuffer { self.write_buf.extend(buf); Ok(buf.len()) } + fn flush(&mut self) -> io::Result<()> { Ok(()) } @@ -268,3 +274,113 @@ impl AsyncWrite for TestBuffer { Poll::Ready(Ok(())) } } + +/// Async I/O test buffer with ability to incrementally add to the read buffer. +#[derive(Clone)] +pub struct TestSeqBuffer(Rc>); + +impl TestSeqBuffer { + /// Create new `TestBuffer` instance with initial read buffer. + pub fn new(data: T) -> Self + where + T: Into, + { + Self(Rc::new(RefCell::new(TestSeqInner { + read_buf: data.into(), + write_buf: BytesMut::new(), + err: None, + }))) + } + + /// Create new empty `TestBuffer` instance. + pub fn empty() -> Self { + Self::new("") + } + + pub fn read_buf(&self) -> Ref<'_, BytesMut> { + Ref::map(self.0.borrow(), |inner| &inner.read_buf) + } + + pub fn write_buf(&self) -> Ref<'_, BytesMut> { + Ref::map(self.0.borrow(), |inner| &inner.write_buf) + } + + pub fn err(&self) -> Ref<'_, Option> { + Ref::map(self.0.borrow(), |inner| &inner.err) + } + + /// Add data to read buffer. 
+ pub fn extend_read_buf>(&mut self, data: T) { + self.0 + .borrow_mut() + .read_buf + .extend_from_slice(data.as_ref()) + } +} + +pub struct TestSeqInner { + read_buf: BytesMut, + write_buf: BytesMut, + err: Option, +} + +impl io::Read for TestSeqBuffer { + fn read(&mut self, dst: &mut [u8]) -> Result { + if self.0.borrow().read_buf.is_empty() { + if self.0.borrow().err.is_some() { + Err(self.0.borrow_mut().err.take().unwrap()) + } else { + Err(io::Error::new(io::ErrorKind::WouldBlock, "")) + } + } else { + let size = std::cmp::min(self.0.borrow().read_buf.len(), dst.len()); + let b = self.0.borrow_mut().read_buf.split_to(size); + dst[..size].copy_from_slice(&b); + Ok(size) + } + } +} + +impl io::Write for TestSeqBuffer { + fn write(&mut self, buf: &[u8]) -> io::Result { + self.0.borrow_mut().write_buf.extend(buf); + Ok(buf.len()) + } + + fn flush(&mut self) -> io::Result<()> { + Ok(()) + } +} + +impl AsyncRead for TestSeqBuffer { + fn poll_read( + self: Pin<&mut Self>, + _: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll> { + let r = self.get_mut().read(buf); + match r { + Ok(n) => Poll::Ready(Ok(n)), + Err(err) if err.kind() == io::ErrorKind::WouldBlock => Poll::Pending, + Err(err) => Poll::Ready(Err(err)), + } + } +} + +impl AsyncWrite for TestSeqBuffer { + fn poll_write( + self: Pin<&mut Self>, + _: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + Poll::Ready(self.get_mut().write(buf)) + } + + fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } +} diff --git a/actix-http/src/ws/mod.rs b/actix-http/src/ws/mod.rs index 6ffdecc35..cd212fb7e 100644 --- a/actix-http/src/ws/mod.rs +++ b/actix-http/src/ws/mod.rs @@ -197,13 +197,13 @@ mod tests { let req = TestRequest::default().method(Method::POST).finish(); assert_eq!( HandshakeError::GetMethodRequired, - verify_handshake(req.head()).err().unwrap() + verify_handshake(req.head()).unwrap_err(), ); let req = TestRequest::default().finish(); assert_eq!( HandshakeError::NoWebsocketUpgrade, - verify_handshake(req.head()).err().unwrap() + verify_handshake(req.head()).unwrap_err(), ); let req = TestRequest::default() @@ -211,7 +211,7 @@ mod tests { .finish(); assert_eq!( HandshakeError::NoWebsocketUpgrade, - verify_handshake(req.head()).err().unwrap() + verify_handshake(req.head()).unwrap_err(), ); let req = TestRequest::default() @@ -222,7 +222,7 @@ mod tests { .finish(); assert_eq!( HandshakeError::NoConnectionUpgrade, - verify_handshake(req.head()).err().unwrap() + verify_handshake(req.head()).unwrap_err(), ); let req = TestRequest::default() @@ -237,7 +237,7 @@ mod tests { .finish(); assert_eq!( HandshakeError::NoVersionHeader, - verify_handshake(req.head()).err().unwrap() + verify_handshake(req.head()).unwrap_err(), ); let req = TestRequest::default() @@ -256,7 +256,7 @@ mod tests { .finish(); assert_eq!( HandshakeError::UnsupportedVersion, - verify_handshake(req.head()).err().unwrap() + verify_handshake(req.head()).unwrap_err(), ); let req = TestRequest::default() @@ -275,7 +275,7 @@ mod tests { .finish(); assert_eq!( HandshakeError::BadWebsocketKey, - verify_handshake(req.head()).err().unwrap() + verify_handshake(req.head()).unwrap_err(), ); let req = TestRequest::default() diff --git a/awc/CHANGES.md b/awc/CHANGES.md index 7ca415336..e4f801bbe 100644 --- a/awc/CHANGES.md +++ b/awc/CHANGES.md @@ -1,6 +1,8 @@ # Changes ## Unreleased - 2020-xx-xx +### Changed +* Bumped `rand` to `0.8` ## 2.0.3 - 
2020-11-29
diff --git a/awc/Cargo.toml b/awc/Cargo.toml
index 3c1963d6b..2e92526d2 100644
--- a/awc/Cargo.toml
+++ b/awc/Cargo.toml
@@ -50,7 +50,7 @@ futures-core = { version = "0.3.5", default-features = false }
 log =" 0.4"
 mime = "0.3"
 percent-encoding = "2.1"
-rand = "0.7"
+rand = "0.8"
 serde = "1.0"
 serde_json = "1.0"
 serde_urlencoded = "0.7"
diff --git a/awc/tests/test_client.rs b/awc/tests/test_client.rs
index a9552d0d5..0024c6652 100644
--- a/awc/tests/test_client.rs
+++ b/awc/tests/test_client.rs
@@ -480,6 +480,7 @@ async fn test_client_gzip_encoding_large_random() {
     let data = rand::thread_rng()
         .sample_iter(&rand::distributions::Alphanumeric)
         .take(100_000)
+        .map(char::from)
         .collect::();

     let srv = test::start(|| {
@@ -529,6 +530,7 @@ async fn test_client_brotli_encoding_large_random() {
     let data = rand::thread_rng()
         .sample_iter(&rand::distributions::Alphanumeric)
         .take(70_000)
+        .map(char::from)
         .collect::();

     let srv = test::start(|| {
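// Why the `.map(char::from)` additions in the test hunks above: in rand 0.8
// the `Alphanumeric` distribution samples `u8` instead of `char`, so the
// collected `String` needs an explicit conversion. A minimal standalone sketch
// (the helper name is illustrative, not part of this patch):
//
// use rand::{distributions::Alphanumeric, Rng};
//
// fn random_ascii(len: usize) -> String {
//     rand::thread_rng()
//         .sample_iter(&Alphanumeric) // yields `u8` in rand 0.8
//         .take(len)
//         .map(char::from)            // was unnecessary with rand 0.7
//         .collect()
// }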
diff --git a/src/middleware/normalize.rs b/src/middleware/normalize.rs
index ac8ad71d5..ad9f51079 100644
--- a/src/middleware/normalize.rs
+++ b/src/middleware/normalize.rs
@@ -1,10 +1,11 @@
-//! `Middleware` to normalize request's URI
+//! For middleware documentation, see [`NormalizePath`].
+
 use std::task::{Context, Poll};

 use actix_http::http::{PathAndQuery, Uri};
 use actix_service::{Service, Transform};
 use bytes::Bytes;
-use futures_util::future::{ok, Ready};
+use futures_util::future::{ready, Ready};
 use regex::Regex;

 use crate::service::{ServiceRequest, ServiceResponse};
@@ -17,10 +18,12 @@ pub enum TrailingSlash {
     /// Always add a trailing slash to the end of the path.
     /// This will require all routes to end in a trailing slash for them to be accessible.
     Always,
+
     /// Only merge any present multiple trailing slashes.
     ///
-    /// Note: This option provides the best compatibility with the v2 version of this middlware.
+    /// Note: This option provides the best compatibility with the v2 version of this middleware.
     MergeOnly,
+
     /// Trim trailing slashes from the end of the path.
     Trim,
 }
@@ -32,28 +35,53 @@ impl Default for TrailingSlash {
 }

 #[derive(Default, Clone, Copy)]
-/// `Middleware` to normalize request's URI in place
+/// Middleware to normalize a request's path so that routes can be matched less strictly.
 ///
-/// Performs following:
-///
-/// - Merges multiple slashes into one.
+/// # Normalization Steps
+/// - Merges multiple consecutive slashes into one. (For example, `/path//one` always
+/// becomes `/path/one`.)
 /// - Appends a trailing slash if one is not present, removes one if present, or keeps trailing
-/// slashes as-is, depending on the supplied `TrailingSlash` variant.
+/// slashes as-is, depending on which [`TrailingSlash`] variant is supplied
+/// to [`new`](NormalizePath::new()).
 ///
+/// # Default Behavior
+/// The default constructor chooses to strip trailing slashes from the end
+/// ([`TrailingSlash::Trim`]), the effect is that route definitions should be defined without
+/// trailing slashes or else they will be inaccessible.
+///
+/// # Example
 /// ```rust
-/// use actix_web::{web, http, middleware, App, HttpResponse};
+/// use actix_web::{web, middleware, App};
 ///
-/// # fn main() {
+/// # #[actix_rt::test]
+/// # async fn normalize() {
 /// let app = App::new()
 ///     .wrap(middleware::NormalizePath::default())
-///     .service(
-///         web::resource("/test")
-///             .route(web::get().to(|| HttpResponse::Ok()))
-///             .route(web::method(http::Method::HEAD).to(|| HttpResponse::MethodNotAllowed()))
-///     );
+///     .route("/test", web::get().to(|| async { "test" }))
+///     .route("/unmatchable/", web::get().to(|| async { "unmatchable" }));
+///
+/// use actix_web::http::StatusCode;
+/// use actix_web::test::{call_service, init_service, TestRequest};
+///
+/// let mut app = init_service(app).await;
+///
+/// let req = TestRequest::with_uri("/test").to_request();
+/// let res = call_service(&mut app, req).await;
+/// assert_eq!(res.status(), StatusCode::OK);
+///
+/// let req = TestRequest::with_uri("/test/").to_request();
+/// let res = call_service(&mut app, req).await;
+/// assert_eq!(res.status(), StatusCode::OK);
+///
+/// let req = TestRequest::with_uri("/unmatchable").to_request();
+/// let res = call_service(&mut app, req).await;
+/// assert_eq!(res.status(), StatusCode::NOT_FOUND);
+///
+/// let req = TestRequest::with_uri("/unmatchable/").to_request();
+/// let res = call_service(&mut app, req).await;
+/// assert_eq!(res.status(), StatusCode::NOT_FOUND);
 /// # }
 /// ```
-
 pub struct NormalizePath(TrailingSlash);

 impl NormalizePath {
@@ -76,11 +104,11 @@ where
     type Future = Ready>;

     fn new_transform(&self, service: S) -> Self::Future {
-        ok(NormalizePathNormalization {
+        ready(Ok(NormalizePathNormalization {
             service,
             merge_slash: Regex::new("//+").unwrap(),
             trailing_slash_behavior: self.0,
-        })
+        }))
     }
 }
@@ -160,9 +188,11 @@ mod tests {
     use actix_service::IntoService;

     use super::*;
-    use crate::dev::ServiceRequest;
-    use crate::test::{call_service, init_service, TestRequest};
-    use crate::{web, App, HttpResponse};
+    use crate::{
+        dev::ServiceRequest,
+        test::{call_service, init_service, TestRequest},
+        web, App, HttpResponse,
+    };

     #[actix_rt::test]
     async fn test_wrap() {
@@ -244,7 +274,7 @@
     }

     #[actix_rt::test]
-    async fn keep_trailing_slash_unchange() {
+    async fn keep_trailing_slash_unchanged() {
         let mut app = init_service(
             App::new()
                 .wrap(NormalizePath(TrailingSlash::MergeOnly))
@@ -279,7 +309,7 @@
     async fn test_in_place_normalization() {
         let srv = |req: ServiceRequest| {
             assert_eq!("/v1/something/", req.path());
-            ok(req.into_response(HttpResponse::Ok().finish()))
+            ready(Ok(req.into_response(HttpResponse::Ok().finish())))
         };

         let mut normalize = NormalizePath::default()
@@ -310,7 +340,7 @@
         let srv = |req: ServiceRequest| {
             assert_eq!(URI, req.path());
-            ok(req.into_response(HttpResponse::Ok().finish()))
+            ready(Ok(req.into_response(HttpResponse::Ok().finish())))
         };

         let mut normalize = NormalizePath::default()
@@ -324,12 +354,12 @@
     }

     #[actix_rt::test]
-    async fn should_normalize_notrail() {
+    async fn should_normalize_no_trail() {
         const URI: &str = "/v1/something";

         let srv = |req: ServiceRequest| {
             assert_eq!(URI.to_string() + "/", req.path());
-            ok(req.into_response(HttpResponse::Ok().finish()))
+            ready(Ok(req.into_response(HttpResponse::Ok().finish())))
         };

         let mut normalize = NormalizePath::default()
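// The normalization middleware above collapses duplicate slashes with the
// pre-compiled `Regex::new("//+")`. A standalone sketch of just that step
// (illustrative only; the middleware additionally applies the configured
// `TrailingSlash` behavior and rebuilds the request URI):
//
// use regex::Regex;
//
// fn merge_slashes(path: &str) -> String {
//     let merge_slash = Regex::new("//+").unwrap();
//     // `/v1//something////` becomes `/v1/something/`.
//     merge_slash.replace_all(path, "/").into_owned()
// }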
diff --git a/tests/test_server.rs b/tests/test_server.rs
index f8a9ab86d..c6c316f0d 100644
--- a/tests/test_server.rs
+++ b/tests/test_server.rs
@@ -248,6 +248,7 @@ async fn test_body_gzip_large_random() {
     let data = rand::thread_rng()
         .sample_iter(&Alphanumeric)
         .take(70_000)
+        .map(char::from)
         .collect::();

     let srv_data = data.clone();
@@ -529,6 +530,7 @@ async fn test_reading_gzip_encoding_large_random() {
     let data = rand::thread_rng()
         .sample_iter(&Alphanumeric)
         .take(60_000)
+        .map(char::from)
         .collect::();

     let srv = test::start_with(test::config().h1(), || {
@@ -614,6 +616,7 @@ async fn test_reading_deflate_encoding_large_random() {
     let data = rand::thread_rng()
         .sample_iter(&Alphanumeric)
         .take(160_000)
+        .map(char::from)
         .collect::();

     let srv = test::start_with(test::config().h1(), || {
@@ -672,6 +675,7 @@ async fn test_brotli_encoding_large() {
     let data = rand::thread_rng()
         .sample_iter(&Alphanumeric)
         .take(320_000)
+        .map(char::from)
         .collect::();

     let srv = test::start_with(test::config().h1(), || {
@@ -753,6 +757,7 @@ async fn test_reading_deflate_encoding_large_random_rustls() {
     let data = rand::thread_rng()
         .sample_iter(&Alphanumeric)
         .take(160_000)
+        .map(char::from)
         .collect::();

     // load ssl keys
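// The large-random-body tests above feed the generated data through gzip,
// deflate, and brotli encoders before asserting on the decoded response. A
// minimal sketch of the gzip round trip those assertions rely on, written
// against the `flate2` crate (an assumption for illustration; the real tests
// drive compression through the client/server encoders):
//
// use std::io::{Read, Write};
//
// use flate2::{read::GzDecoder, write::GzEncoder, Compression};
//
// fn gzip_roundtrip(data: &[u8]) -> Vec<u8> {
//     // Compress into an in-memory buffer.
//     let mut enc = GzEncoder::new(Vec::new(), Compression::default());
//     enc.write_all(data).unwrap();
//     let compressed = enc.finish().unwrap();
//
//     // Decompress and return the original bytes.
//     let mut dec = GzDecoder::new(&compressed[..]);
//     let mut out = Vec::new();
//     dec.read_to_end(&mut out).unwrap();
//     out
// }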