add pipeline dispatch test

This commit is contained in:
Rob Ede 2020-12-19 02:20:44 +00:00
parent 95ccf1c9bc
commit 0c81d9634a
No known key found for this signature in database
GPG Key ID: C2A3B36E841A91E6
5 changed files with 122 additions and 23 deletions

View File

@ -4,12 +4,12 @@ use std::task::{Context, Poll};
use actix_service::Service; use actix_service::Service;
#[doc(hidden)]
/// Service that allows to turn non-clone service to a service with `Clone` impl /// Service that allows to turn non-clone service to a service with `Clone` impl
/// ///
/// # Panics /// # Panics
/// CloneableService might panic with some creative use of thread local storage. /// CloneableService might panic with some creative use of thread local storage.
/// See https://github.com/actix/actix-web/issues/1295 for example /// See https://github.com/actix/actix-web/issues/1295 for example
#[doc(hidden)]
pub(crate) struct CloneableService<T: Service>(Rc<RefCell<T>>); pub(crate) struct CloneableService<T: Service>(Rc<RefCell<T>>);
impl<T: Service> CloneableService<T> { impl<T: Service> CloneableService<T> {

View File

@ -3,8 +3,8 @@ use std::{fmt, mem};
use fxhash::FxHashMap; use fxhash::FxHashMap;
#[derive(Default)]
/// A type map of request extensions. /// A type map of request extensions.
#[derive(Default)]
pub struct Extensions { pub struct Extensions {
/// Use FxHasher with a std HashMap with for faster /// Use FxHasher with a std HashMap with for faster
/// lookups on the small `TypeId` (u64 equivalent) keys. /// lookups on the small `TypeId` (u64 equivalent) keys.

View File

@ -58,6 +58,7 @@ impl Codec {
} else { } else {
Flags::empty() Flags::empty()
}; };
Codec { Codec {
config, config,
flags, flags,
@ -69,26 +70,26 @@ impl Codec {
} }
} }
/// Check if request is upgrade.
#[inline] #[inline]
/// Check if request is upgrade
pub fn upgrade(&self) -> bool { pub fn upgrade(&self) -> bool {
self.ctype == ConnectionType::Upgrade self.ctype == ConnectionType::Upgrade
} }
/// Check if last response is keep-alive.
#[inline] #[inline]
/// Check if last response is keep-alive
pub fn keepalive(&self) -> bool { pub fn keepalive(&self) -> bool {
self.ctype == ConnectionType::KeepAlive self.ctype == ConnectionType::KeepAlive
} }
/// Check if keep-alive enabled on server level.
#[inline] #[inline]
/// Check if keep-alive enabled on server level
pub fn keepalive_enabled(&self) -> bool { pub fn keepalive_enabled(&self) -> bool {
self.flags.contains(Flags::KEEPALIVE_ENABLED) self.flags.contains(Flags::KEEPALIVE_ENABLED)
} }
/// Check last request's message type.
#[inline] #[inline]
/// Check last request's message type
pub fn message_type(&self) -> MessageType { pub fn message_type(&self) -> MessageType {
if self.flags.contains(Flags::STREAM) { if self.flags.contains(Flags::STREAM) {
MessageType::Stream MessageType::Stream

View File

@ -59,6 +59,9 @@ where
{ {
#[pin] #[pin]
inner: DispatcherState<T, S, B, X, U>, inner: DispatcherState<T, S, B, X, U>,
#[cfg(test)]
poll_count: u64,
} }
#[pin_project(project = DispatcherStateProj)] #[pin_project(project = DispatcherStateProj)]
@ -247,6 +250,9 @@ where
ka_expire, ka_expire,
ka_timer, ka_timer,
}), }),
#[cfg(test)]
poll_count: 0,
} }
} }
} }
@ -511,12 +517,12 @@ where
} }
} }
/// Process one incoming requests /// Process one incoming request.
pub(self) fn poll_request( pub(self) fn poll_request(
mut self: Pin<&mut Self>, mut self: Pin<&mut Self>,
cx: &mut Context<'_>, cx: &mut Context<'_>,
) -> Result<bool, DispatchError> { ) -> Result<bool, DispatchError> {
// limit a mount of non processed requests // limit amount of non-processed requests
if self.messages.len() >= MAX_PIPELINED_MESSAGES || !self.can_read(cx) { if self.messages.len() >= MAX_PIPELINED_MESSAGES || !self.can_read(cx) {
return Ok(false); return Ok(false);
} }
@ -725,6 +731,12 @@ where
#[inline] #[inline]
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.as_mut().project(); let this = self.as_mut().project();
#[cfg(test)]
{
*this.poll_count += 1;
}
match this.inner.project() { match this.inner.project() {
DispatcherStateProj::Normal(mut inner) => { DispatcherStateProj::Normal(mut inner) => {
inner.as_mut().poll_keepalive(cx)?; inner.as_mut().poll_keepalive(cx)?;
@ -833,15 +845,21 @@ where
&& !inner_p.flags.intersects(Flags::KEEPALIVE) && !inner_p.flags.intersects(Flags::KEEPALIVE)
{ {
inner_p.flags.insert(Flags::SHUTDOWN); inner_p.flags.insert(Flags::SHUTDOWN);
eprintln!("flag shutdown inserted, re-poll");
self.poll(cx) self.poll(cx)
} }
// disconnect if shutdown // disconnect if shutdown
else if inner_p.flags.contains(Flags::SHUTDOWN) { else if inner_p.flags.contains(Flags::SHUTDOWN) {
eprintln!("flag shutdown set, re-poll");
self.poll(cx) self.poll(cx)
} else { } else {
eprintln!("no error, not started or KA, not shutdown");
eprintln!("{:?}", &inner_p.flags);
Poll::Pending Poll::Pending
} }
} else { } else {
eprintln!("is_empty and write_buf is_empty");
eprintln!("{:?}", &inner_p.flags);
Poll::Pending Poll::Pending
} }
} }
@ -854,6 +872,11 @@ where
} }
} }
/// Returns either:
/// - `Ok(Some(true))` - data was read and done reading all data.
/// - `Ok(Some(false))` - data was read but there should be more to read.
/// - `Ok(None)` - no data was read but there should be more to read later.
/// - Unhandled Errors
fn read_available<T>( fn read_available<T>(
cx: &mut Context<'_>, cx: &mut Context<'_>,
io: &mut T, io: &mut T,
@ -887,17 +910,17 @@ where
read_some = true; read_some = true;
} }
} }
Poll::Ready(Err(e)) => { Poll::Ready(Err(err)) => {
return if e.kind() == io::ErrorKind::WouldBlock { return if err.kind() == io::ErrorKind::WouldBlock {
if read_some { if read_some {
Ok(Some(false)) Ok(Some(false))
} else { } else {
Ok(None) Ok(None)
} }
} else if e.kind() == io::ErrorKind::ConnectionReset && read_some { } else if err.kind() == io::ErrorKind::ConnectionReset && read_some {
Ok(Some(true)) Ok(Some(true))
} else { } else {
Err(e) Err(err)
} }
} }
} }
@ -918,12 +941,33 @@ where
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use actix_service::IntoService; use actix_service::IntoService;
use futures_util::future::{lazy, ok}; use futures_util::future::{lazy, ready};
use super::*; use super::*;
use crate::error::Error;
use crate::h1::{ExpectHandler, UpgradeHandler}; use crate::h1::{ExpectHandler, UpgradeHandler};
use crate::test::TestBuffer; use crate::test::TestBuffer;
use crate::{error::Error, KeepAlive};
fn ok_service() -> impl Service<Request = Request, Response = Response, Error = Error>
{
(|_| ready(Ok::<_, Error>(Response::Ok().finish()))).into_service()
}
fn find_slice(haystack: &[u8], needle: &[u8], from: usize) -> Option<usize> {
haystack[from..]
.windows(needle.len())
.position(|window| window == needle)
}
fn stabilize_date_header(payload: &mut [u8]) {
let mut from = 0;
while let Some(pos) = find_slice(&payload, b"date", from) {
payload[(from + pos)..(from + pos + 35)]
.copy_from_slice(b"date: Thu, 01 Jan 1970 12:34:56 UTC");
from += 35;
}
}
#[actix_rt::test] #[actix_rt::test]
async fn test_req_parse_err() { async fn test_req_parse_err() {
@ -933,9 +977,7 @@ mod tests {
let mut h1 = Dispatcher::<_, _, _, _, UpgradeHandler<TestBuffer>>::new( let mut h1 = Dispatcher::<_, _, _, _, UpgradeHandler<TestBuffer>>::new(
buf, buf,
ServiceConfig::default(), ServiceConfig::default(),
CloneableService::new( CloneableService::new(ok_service()),
(|_| ok::<_, Error>(Response::Ok().finish())).into_service(),
),
CloneableService::new(ExpectHandler), CloneableService::new(ExpectHandler),
None, None,
None, None,
@ -958,4 +1000,59 @@ mod tests {
}) })
.await; .await;
} }
#[actix_rt::test]
async fn test_pipelining() {
lazy(|cx| {
let buf = TestBuffer::new(
"\
GET /test HTTP/1.1\r\n\r\n\
GET /test HTTP/1.1\r\n\r\n\
",
);
let cfg = ServiceConfig::new(KeepAlive::Disabled, 1, 1, false, None);
let mut h1 = Dispatcher::<_, _, _, _, UpgradeHandler<TestBuffer>>::new(
buf,
cfg,
CloneableService::new(ok_service()),
CloneableService::new(ExpectHandler),
None,
None,
Extensions::new(),
None,
);
assert!(matches!(&h1.inner, DispatcherState::Normal(_)));
match Pin::new(&mut h1).poll(cx) {
Poll::Pending => panic!("first poll should not be pending"),
Poll::Ready(res) => assert!(res.is_ok()),
}
// polls: initial => shutdown
assert_eq!(h1.poll_count, 2);
if let DispatcherState::Normal(ref mut inner) = h1.inner {
let res = &mut inner.io.take().unwrap().write_buf[..];
stabilize_date_header(res);
assert_eq!(
&res,
b"\
HTTP/1.1 200 OK\r\n\
content-length: 0\r\n\
connection: close\r\n\
date: Thu, 01 Jan 1970 12:34:56 UTC\r\n\r\n\
HTTP/1.1 200 OK\r\n\
content-length: 0\r\n\
connection: close\r\n\
date: Thu, 01 Jan 1970 12:34:56 UTC\r\n\r\n\
"
);
}
})
.await;
}
} }

View File

@ -183,7 +183,7 @@ fn parts(parts: &mut Option<Inner>) -> &mut Inner {
parts.as_mut().expect("cannot reuse test request builder") parts.as_mut().expect("cannot reuse test request builder")
} }
/// Async io buffer /// Async I/O test buffer.
pub struct TestBuffer { pub struct TestBuffer {
pub read_buf: BytesMut, pub read_buf: BytesMut,
pub write_buf: BytesMut, pub write_buf: BytesMut,
@ -191,24 +191,24 @@ pub struct TestBuffer {
} }
impl TestBuffer { impl TestBuffer {
/// Create new TestBuffer instance /// Create new `TestBuffer` instance with initial read buffer.
pub fn new<T>(data: T) -> TestBuffer pub fn new<T>(data: T) -> TestBuffer
where where
BytesMut: From<T>, T: Into<BytesMut>,
{ {
TestBuffer { TestBuffer {
read_buf: BytesMut::from(data), read_buf: data.into(),
write_buf: BytesMut::new(), write_buf: BytesMut::new(),
err: None, err: None,
} }
} }
/// Create new empty TestBuffer instance /// Create new empty `TestBuffer` instance.
pub fn empty() -> TestBuffer { pub fn empty() -> TestBuffer {
TestBuffer::new("") TestBuffer::new("")
} }
/// Add extra data to read buffer. /// Add data to read buffer.
pub fn extend_read_buf<T: AsRef<[u8]>>(&mut self, data: T) { pub fn extend_read_buf<T: AsRef<[u8]>>(&mut self, data: T) {
self.read_buf.extend_from_slice(data.as_ref()) self.read_buf.extend_from_slice(data.as_ref())
} }
@ -236,6 +236,7 @@ impl io::Write for TestBuffer {
self.write_buf.extend(buf); self.write_buf.extend(buf);
Ok(buf.len()) Ok(buf.len())
} }
fn flush(&mut self) -> io::Result<()> { fn flush(&mut self) -> io::Result<()> {
Ok(()) Ok(())
} }