mirror of https://github.com/fafhrd91/actix-web
add expect-continue test
This commit is contained in:
parent
0d3d37d79c
commit
794da673fa
|
@ -815,8 +815,11 @@ where
|
||||||
}
|
}
|
||||||
|
|
||||||
// we didn't get WouldBlock from write operation,
|
// we didn't get WouldBlock from write operation,
|
||||||
// so data get written to kernel completely (OSX)
|
// so data get written to kernel completely (macOS)
|
||||||
// and we have to write again otherwise response can get stuck
|
// and we have to write again otherwise response can get stuck
|
||||||
|
//
|
||||||
|
// TODO: what? is WouldBlock good or bad?
|
||||||
|
// want to find a reference for this macOS behavior
|
||||||
if inner.as_mut().poll_flush(cx)? || !drain {
|
if inner.as_mut().poll_flush(cx)? || !drain {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -845,21 +848,15 @@ where
|
||||||
&& !inner_p.flags.intersects(Flags::KEEPALIVE)
|
&& !inner_p.flags.intersects(Flags::KEEPALIVE)
|
||||||
{
|
{
|
||||||
inner_p.flags.insert(Flags::SHUTDOWN);
|
inner_p.flags.insert(Flags::SHUTDOWN);
|
||||||
eprintln!("flag shutdown inserted, re-poll");
|
|
||||||
self.poll(cx)
|
self.poll(cx)
|
||||||
}
|
}
|
||||||
// disconnect if shutdown
|
// disconnect if shutdown
|
||||||
else if inner_p.flags.contains(Flags::SHUTDOWN) {
|
else if inner_p.flags.contains(Flags::SHUTDOWN) {
|
||||||
eprintln!("flag shutdown set, re-poll");
|
|
||||||
self.poll(cx)
|
self.poll(cx)
|
||||||
} else {
|
} else {
|
||||||
eprintln!("no error, not started or KA, not shutdown");
|
|
||||||
eprintln!("{:?}", &inner_p.flags);
|
|
||||||
Poll::Pending
|
Poll::Pending
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
eprintln!("is_empty and write_buf is_empty");
|
|
||||||
eprintln!("{:?}", &inner_p.flags);
|
|
||||||
Poll::Pending
|
Poll::Pending
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -940,13 +937,18 @@ where
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
|
use std::str;
|
||||||
|
|
||||||
use actix_service::fn_service;
|
use actix_service::fn_service;
|
||||||
use futures_util::future::{lazy, ready};
|
use futures_util::future::{lazy, ready};
|
||||||
|
|
||||||
use super::*;
|
use super::*;
|
||||||
use crate::h1::{ExpectHandler, UpgradeHandler};
|
|
||||||
use crate::test::TestBuffer;
|
use crate::test::TestBuffer;
|
||||||
use crate::{error::Error, KeepAlive};
|
use crate::{error::Error, KeepAlive};
|
||||||
|
use crate::{
|
||||||
|
h1::{ExpectHandler, UpgradeHandler},
|
||||||
|
test::TestSeqBuffer,
|
||||||
|
};
|
||||||
|
|
||||||
fn find_slice(haystack: &[u8], needle: &[u8], from: usize) -> Option<usize> {
|
fn find_slice(haystack: &[u8], needle: &[u8], from: usize) -> Option<usize> {
|
||||||
haystack[from..]
|
haystack[from..]
|
||||||
|
@ -977,6 +979,23 @@ mod tests {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn echo_payload_service(
|
||||||
|
) -> impl Service<Request = Request, Response = Response, Error = Error> {
|
||||||
|
fn_service(|mut req: Request| {
|
||||||
|
Box::pin(async move {
|
||||||
|
use futures_util::stream::StreamExt as _;
|
||||||
|
|
||||||
|
let mut pl = req.take_payload();
|
||||||
|
let mut body = BytesMut::new();
|
||||||
|
while let Some(chunk) = pl.next().await {
|
||||||
|
body.extend_from_slice(chunk.unwrap().bytes())
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok::<_, Error>(Response::Ok().body(body))
|
||||||
|
})
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
#[actix_rt::test]
|
#[actix_rt::test]
|
||||||
async fn test_req_parse_err() {
|
async fn test_req_parse_err() {
|
||||||
lazy(|cx| {
|
lazy(|cx| {
|
||||||
|
@ -1116,4 +1135,74 @@ mod tests {
|
||||||
})
|
})
|
||||||
.await;
|
.await;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[actix_rt::test]
|
||||||
|
async fn test_expect() {
|
||||||
|
lazy(|cx| {
|
||||||
|
let mut buf = TestSeqBuffer::empty();
|
||||||
|
let cfg = ServiceConfig::new(KeepAlive::Disabled, 0, 0, false, None);
|
||||||
|
let mut h1 = Dispatcher::<_, _, _, _, UpgradeHandler<_>>::new(
|
||||||
|
buf.clone(),
|
||||||
|
cfg,
|
||||||
|
CloneableService::new(echo_payload_service()),
|
||||||
|
CloneableService::new(ExpectHandler),
|
||||||
|
None,
|
||||||
|
None,
|
||||||
|
Extensions::new(),
|
||||||
|
None,
|
||||||
|
);
|
||||||
|
|
||||||
|
buf.extend_read_buf(
|
||||||
|
"\
|
||||||
|
POST /upload HTTP/1.1\r\n\
|
||||||
|
Content-Length: 5\r\n\
|
||||||
|
Expect: 100-continue\r\n\
|
||||||
|
\r\n\
|
||||||
|
",
|
||||||
|
);
|
||||||
|
|
||||||
|
assert!(Pin::new(&mut h1).poll(cx).is_pending());
|
||||||
|
assert!(matches!(&h1.inner, DispatcherState::Normal(_)));
|
||||||
|
|
||||||
|
// polls: manual
|
||||||
|
assert_eq!(h1.poll_count, 1);
|
||||||
|
eprintln!("poll count: {}", h1.poll_count);
|
||||||
|
|
||||||
|
if let DispatcherState::Normal(ref inner) = h1.inner {
|
||||||
|
let io = inner.io.as_ref().unwrap();
|
||||||
|
let res = &io.write_buf()[..];
|
||||||
|
assert_eq!(
|
||||||
|
str::from_utf8(res).unwrap(),
|
||||||
|
"HTTP/1.1 100 Continue\r\n\r\n"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
buf.extend_read_buf("12345");
|
||||||
|
assert!(Pin::new(&mut h1).poll(cx).is_ready());
|
||||||
|
|
||||||
|
// polls: manual manual shutdown
|
||||||
|
assert_eq!(h1.poll_count, 3);
|
||||||
|
|
||||||
|
if let DispatcherState::Normal(ref inner) = h1.inner {
|
||||||
|
let io = inner.io.as_ref().unwrap();
|
||||||
|
let mut res = (&io.write_buf()[..]).to_owned();
|
||||||
|
stabilize_date_header(&mut res);
|
||||||
|
|
||||||
|
assert_eq!(
|
||||||
|
str::from_utf8(&res).unwrap(),
|
||||||
|
"\
|
||||||
|
HTTP/1.1 100 Continue\r\n\
|
||||||
|
\r\n\
|
||||||
|
HTTP/1.1 200 OK\r\n\
|
||||||
|
content-length: 5\r\n\
|
||||||
|
connection: close\r\n\
|
||||||
|
date: Thu, 01 Jan 1970 12:34:56 UTC\r\n\
|
||||||
|
\r\n\
|
||||||
|
12345\
|
||||||
|
"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.await;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
use std::task::{Context, Poll};
|
use std::task::{Context, Poll};
|
||||||
|
|
||||||
use actix_service::{Service, ServiceFactory};
|
use actix_service::{Service, ServiceFactory};
|
||||||
use futures_util::future::{ok, Ready};
|
use futures_util::future::{ready, Ready};
|
||||||
|
|
||||||
use crate::error::Error;
|
use crate::error::Error;
|
||||||
use crate::request::Request;
|
use crate::request::Request;
|
||||||
|
@ -17,8 +17,8 @@ impl ServiceFactory for ExpectHandler {
|
||||||
type InitError = Error;
|
type InitError = Error;
|
||||||
type Future = Ready<Result<Self::Service, Self::InitError>>;
|
type Future = Ready<Result<Self::Service, Self::InitError>>;
|
||||||
|
|
||||||
fn new_service(&self, _: ()) -> Self::Future {
|
fn new_service(&self, _: Self::Config) -> Self::Future {
|
||||||
ok(ExpectHandler)
|
ready(Ok(ExpectHandler))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -33,6 +33,8 @@ impl Service for ExpectHandler {
|
||||||
}
|
}
|
||||||
|
|
||||||
fn call(&mut self, req: Request) -> Self::Future {
|
fn call(&mut self, req: Request) -> Self::Future {
|
||||||
ok(req)
|
ready(Ok(req))
|
||||||
|
// TODO: add some way to trigger error
|
||||||
|
// Err(error::ErrorExpectationFailed("test"))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,9 +1,14 @@
|
||||||
//! Test Various helpers for Actix applications to use during testing.
|
//! Various testing helpers for use in internal and app tests.
|
||||||
use std::convert::TryFrom;
|
|
||||||
use std::io::{self, Read, Write};
|
use std::{
|
||||||
use std::pin::Pin;
|
cell::{Ref, RefCell},
|
||||||
use std::str::FromStr;
|
convert::TryFrom,
|
||||||
use std::task::{Context, Poll};
|
io::{self, Read, Write},
|
||||||
|
pin::Pin,
|
||||||
|
rc::Rc,
|
||||||
|
str::FromStr,
|
||||||
|
task::{Context, Poll},
|
||||||
|
};
|
||||||
|
|
||||||
use actix_codec::{AsyncRead, AsyncWrite};
|
use actix_codec::{AsyncRead, AsyncWrite};
|
||||||
use bytes::{Bytes, BytesMut};
|
use bytes::{Bytes, BytesMut};
|
||||||
|
@ -192,11 +197,11 @@ pub struct TestBuffer {
|
||||||
|
|
||||||
impl TestBuffer {
|
impl TestBuffer {
|
||||||
/// Create new `TestBuffer` instance with initial read buffer.
|
/// Create new `TestBuffer` instance with initial read buffer.
|
||||||
pub fn new<T>(data: T) -> TestBuffer
|
pub fn new<T>(data: T) -> Self
|
||||||
where
|
where
|
||||||
T: Into<BytesMut>,
|
T: Into<BytesMut>,
|
||||||
{
|
{
|
||||||
TestBuffer {
|
Self {
|
||||||
read_buf: data.into(),
|
read_buf: data.into(),
|
||||||
write_buf: BytesMut::new(),
|
write_buf: BytesMut::new(),
|
||||||
err: None,
|
err: None,
|
||||||
|
@ -204,8 +209,8 @@ impl TestBuffer {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Create new empty `TestBuffer` instance.
|
/// Create new empty `TestBuffer` instance.
|
||||||
pub fn empty() -> TestBuffer {
|
pub fn empty() -> Self {
|
||||||
TestBuffer::new("")
|
Self::new("")
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Add data to read buffer.
|
/// Add data to read buffer.
|
||||||
|
@ -269,3 +274,113 @@ impl AsyncWrite for TestBuffer {
|
||||||
Poll::Ready(Ok(()))
|
Poll::Ready(Ok(()))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Async I/O test buffer with ability to incrementally add to the read buffer.
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct TestSeqBuffer(Rc<RefCell<TestSeqInner>>);
|
||||||
|
|
||||||
|
impl TestSeqBuffer {
|
||||||
|
/// Create new `TestBuffer` instance with initial read buffer.
|
||||||
|
pub fn new<T>(data: T) -> Self
|
||||||
|
where
|
||||||
|
T: Into<BytesMut>,
|
||||||
|
{
|
||||||
|
Self(Rc::new(RefCell::new(TestSeqInner {
|
||||||
|
read_buf: data.into(),
|
||||||
|
write_buf: BytesMut::new(),
|
||||||
|
err: None,
|
||||||
|
})))
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Create new empty `TestBuffer` instance.
|
||||||
|
pub fn empty() -> Self {
|
||||||
|
Self::new("")
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn read_buf(&self) -> Ref<'_, BytesMut> {
|
||||||
|
Ref::map(self.0.borrow(), |inner| &inner.read_buf)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn write_buf(&self) -> Ref<'_, BytesMut> {
|
||||||
|
Ref::map(self.0.borrow(), |inner| &inner.write_buf)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn err(&self) -> Ref<'_, Option<io::Error>> {
|
||||||
|
Ref::map(self.0.borrow(), |inner| &inner.err)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Add data to read buffer.
|
||||||
|
pub fn extend_read_buf<T: AsRef<[u8]>>(&mut self, data: T) {
|
||||||
|
self.0
|
||||||
|
.borrow_mut()
|
||||||
|
.read_buf
|
||||||
|
.extend_from_slice(data.as_ref())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct TestSeqInner {
|
||||||
|
read_buf: BytesMut,
|
||||||
|
write_buf: BytesMut,
|
||||||
|
err: Option<io::Error>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl io::Read for TestSeqBuffer {
|
||||||
|
fn read(&mut self, dst: &mut [u8]) -> Result<usize, io::Error> {
|
||||||
|
if self.0.borrow().read_buf.is_empty() {
|
||||||
|
if self.0.borrow().err.is_some() {
|
||||||
|
Err(self.0.borrow_mut().err.take().unwrap())
|
||||||
|
} else {
|
||||||
|
Err(io::Error::new(io::ErrorKind::WouldBlock, ""))
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
let size = std::cmp::min(self.0.borrow().read_buf.len(), dst.len());
|
||||||
|
let b = self.0.borrow_mut().read_buf.split_to(size);
|
||||||
|
dst[..size].copy_from_slice(&b);
|
||||||
|
Ok(size)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl io::Write for TestSeqBuffer {
|
||||||
|
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
|
||||||
|
self.0.borrow_mut().write_buf.extend(buf);
|
||||||
|
Ok(buf.len())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn flush(&mut self) -> io::Result<()> {
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl AsyncRead for TestSeqBuffer {
|
||||||
|
fn poll_read(
|
||||||
|
self: Pin<&mut Self>,
|
||||||
|
_: &mut Context<'_>,
|
||||||
|
buf: &mut [u8],
|
||||||
|
) -> Poll<io::Result<usize>> {
|
||||||
|
let r = self.get_mut().read(buf);
|
||||||
|
match r {
|
||||||
|
Ok(n) => Poll::Ready(Ok(n)),
|
||||||
|
Err(err) if err.kind() == io::ErrorKind::WouldBlock => Poll::Pending,
|
||||||
|
Err(err) => Poll::Ready(Err(err)),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl AsyncWrite for TestSeqBuffer {
|
||||||
|
fn poll_write(
|
||||||
|
self: Pin<&mut Self>,
|
||||||
|
_: &mut Context<'_>,
|
||||||
|
buf: &[u8],
|
||||||
|
) -> Poll<io::Result<usize>> {
|
||||||
|
Poll::Ready(self.get_mut().write(buf))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
|
||||||
|
Poll::Ready(Ok(()))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
|
||||||
|
Poll::Ready(Ok(()))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -197,13 +197,13 @@ mod tests {
|
||||||
let req = TestRequest::default().method(Method::POST).finish();
|
let req = TestRequest::default().method(Method::POST).finish();
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
HandshakeError::GetMethodRequired,
|
HandshakeError::GetMethodRequired,
|
||||||
verify_handshake(req.head()).err().unwrap()
|
verify_handshake(req.head()).unwrap_err(),
|
||||||
);
|
);
|
||||||
|
|
||||||
let req = TestRequest::default().finish();
|
let req = TestRequest::default().finish();
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
HandshakeError::NoWebsocketUpgrade,
|
HandshakeError::NoWebsocketUpgrade,
|
||||||
verify_handshake(req.head()).err().unwrap()
|
verify_handshake(req.head()).unwrap_err(),
|
||||||
);
|
);
|
||||||
|
|
||||||
let req = TestRequest::default()
|
let req = TestRequest::default()
|
||||||
|
@ -211,7 +211,7 @@ mod tests {
|
||||||
.finish();
|
.finish();
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
HandshakeError::NoWebsocketUpgrade,
|
HandshakeError::NoWebsocketUpgrade,
|
||||||
verify_handshake(req.head()).err().unwrap()
|
verify_handshake(req.head()).unwrap_err(),
|
||||||
);
|
);
|
||||||
|
|
||||||
let req = TestRequest::default()
|
let req = TestRequest::default()
|
||||||
|
@ -222,7 +222,7 @@ mod tests {
|
||||||
.finish();
|
.finish();
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
HandshakeError::NoConnectionUpgrade,
|
HandshakeError::NoConnectionUpgrade,
|
||||||
verify_handshake(req.head()).err().unwrap()
|
verify_handshake(req.head()).unwrap_err(),
|
||||||
);
|
);
|
||||||
|
|
||||||
let req = TestRequest::default()
|
let req = TestRequest::default()
|
||||||
|
@ -237,7 +237,7 @@ mod tests {
|
||||||
.finish();
|
.finish();
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
HandshakeError::NoVersionHeader,
|
HandshakeError::NoVersionHeader,
|
||||||
verify_handshake(req.head()).err().unwrap()
|
verify_handshake(req.head()).unwrap_err(),
|
||||||
);
|
);
|
||||||
|
|
||||||
let req = TestRequest::default()
|
let req = TestRequest::default()
|
||||||
|
@ -256,7 +256,7 @@ mod tests {
|
||||||
.finish();
|
.finish();
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
HandshakeError::UnsupportedVersion,
|
HandshakeError::UnsupportedVersion,
|
||||||
verify_handshake(req.head()).err().unwrap()
|
verify_handshake(req.head()).unwrap_err(),
|
||||||
);
|
);
|
||||||
|
|
||||||
let req = TestRequest::default()
|
let req = TestRequest::default()
|
||||||
|
@ -275,7 +275,7 @@ mod tests {
|
||||||
.finish();
|
.finish();
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
HandshakeError::BadWebsocketKey,
|
HandshakeError::BadWebsocketKey,
|
||||||
verify_handshake(req.head()).err().unwrap()
|
verify_handshake(req.head()).unwrap_err(),
|
||||||
);
|
);
|
||||||
|
|
||||||
let req = TestRequest::default()
|
let req = TestRequest::default()
|
||||||
|
|
Loading…
Reference in New Issue