// Mirror of https://github.com/fafhrd91/actix-web
// 1585 lines, 42 KiB, Rust
use std::{
|
|
cell::Cell,
|
|
future::Future,
|
|
io,
|
|
pin::Pin,
|
|
rc::Rc,
|
|
str,
|
|
task::{Context, Poll},
|
|
time::Duration,
|
|
};
|
|
|
|
use actix_codec::Framed;
|
|
use actix_rt::{
|
|
pin,
|
|
time::{sleep, timeout},
|
|
};
|
|
use actix_service::{fn_service, Service};
|
|
use actix_utils::future::{ready, Ready};
|
|
use bytes::{Buf, Bytes, BytesMut};
|
|
use futures_util::future::lazy;
|
|
|
|
use super::dispatcher::{Dispatcher, DispatcherState, DispatcherStateProj, Flags};
|
|
use crate::{
|
|
body::{BoxBody, MessageBody},
|
|
config::ServiceConfig,
|
|
h1::{Codec, ExpectHandler, UpgradeHandler},
|
|
service::HttpFlow,
|
|
test::{TestBuffer, TestSeqBuffer},
|
|
Error, HttpMessage, KeepAlive, Method, OnConnectData, Request, Response, StatusCode,
|
|
};
|
|
|
|
struct YieldService;
|
|
|
|
impl Service<Request> for YieldService {
|
|
type Response = Response<BoxBody>;
|
|
type Error = Response<BoxBody>;
|
|
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>>>>;
|
|
|
|
actix_service::always_ready!();
|
|
|
|
fn call(&self, _: Request) -> Self::Future {
|
|
Box::pin(async {
|
|
// Yield twice because the dispatcher can poll the service twice per dispatcher's poll:
|
|
// once in `handle_request` and another in `poll_response`
|
|
actix_rt::task::yield_now().await;
|
|
actix_rt::task::yield_now().await;
|
|
Ok(Response::ok())
|
|
})
|
|
}
|
|
}
|
|
|
|
struct ReadyChunkBody {
|
|
chunk_polls: Rc<Cell<usize>>,
|
|
remaining: usize,
|
|
chunk_len: usize,
|
|
}
|
|
|
|
impl ReadyChunkBody {
|
|
fn new(chunk_polls: Rc<Cell<usize>>, remaining: usize, chunk_len: usize) -> Self {
|
|
Self {
|
|
chunk_polls,
|
|
remaining,
|
|
chunk_len,
|
|
}
|
|
}
|
|
}
|
|
|
|
impl MessageBody for ReadyChunkBody {
|
|
type Error = Error;
|
|
|
|
fn size(&self) -> crate::body::BodySize {
|
|
crate::body::BodySize::Stream
|
|
}
|
|
|
|
fn poll_next(
|
|
mut self: Pin<&mut Self>,
|
|
_: &mut Context<'_>,
|
|
) -> Poll<Option<Result<Bytes, Self::Error>>> {
|
|
if self.remaining == 0 {
|
|
return Poll::Ready(None);
|
|
}
|
|
|
|
self.remaining -= 1;
|
|
self.chunk_polls.set(self.chunk_polls.get() + 1);
|
|
|
|
Poll::Ready(Some(Ok(Bytes::from(vec![b'x'; self.chunk_len]))))
|
|
}
|
|
}
|
|
|
|
struct PendingOnceWriteBuf {
|
|
io: TestBuffer,
|
|
block_next_write: bool,
|
|
}
|
|
|
|
impl PendingOnceWriteBuf {
|
|
fn new<T>(data: T) -> Self
|
|
where
|
|
T: Into<BytesMut>,
|
|
{
|
|
Self {
|
|
io: TestBuffer::new(data),
|
|
block_next_write: true,
|
|
}
|
|
}
|
|
}
|
|
|
|
impl io::Read for PendingOnceWriteBuf {
|
|
fn read(&mut self, dst: &mut [u8]) -> Result<usize, io::Error> {
|
|
self.io.read(dst)
|
|
}
|
|
}
|
|
|
|
impl io::Write for PendingOnceWriteBuf {
|
|
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
|
|
self.io.write(buf)
|
|
}
|
|
|
|
fn flush(&mut self) -> io::Result<()> {
|
|
self.io.flush()
|
|
}
|
|
}
|
|
|
|
impl actix_codec::AsyncRead for PendingOnceWriteBuf {
|
|
fn poll_read(
|
|
mut self: Pin<&mut Self>,
|
|
cx: &mut Context<'_>,
|
|
buf: &mut actix_codec::ReadBuf<'_>,
|
|
) -> Poll<io::Result<()>> {
|
|
Pin::new(&mut self.io).poll_read(cx, buf)
|
|
}
|
|
}
|
|
|
|
impl actix_codec::AsyncWrite for PendingOnceWriteBuf {
|
|
fn poll_write(
|
|
mut self: Pin<&mut Self>,
|
|
cx: &mut Context<'_>,
|
|
buf: &[u8],
|
|
) -> Poll<io::Result<usize>> {
|
|
if self.block_next_write {
|
|
self.block_next_write = false;
|
|
cx.waker().wake_by_ref();
|
|
return Poll::Pending;
|
|
}
|
|
|
|
Pin::new(&mut self.io).poll_write(cx, buf)
|
|
}
|
|
|
|
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
|
|
Pin::new(&mut self.io).poll_flush(cx)
|
|
}
|
|
|
|
fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
|
|
Pin::new(&mut self.io).poll_shutdown(cx)
|
|
}
|
|
}
|
|
|
|
fn find_slice(haystack: &[u8], needle: &[u8], from: usize) -> Option<usize> {
|
|
memchr::memmem::find(&haystack[from..], needle)
|
|
}
|
|
|
|
fn stabilize_date_header(payload: &mut [u8]) {
|
|
let mut from = 0;
|
|
while let Some(pos) = find_slice(payload, b"date", from) {
|
|
payload[(from + pos)..(from + pos + 35)]
|
|
.copy_from_slice(b"date: Thu, 01 Jan 1970 12:34:56 UTC");
|
|
from += 35;
|
|
}
|
|
}
|
|
|
|
fn ok_service() -> impl Service<Request, Response = Response<impl MessageBody>, Error = Error> {
|
|
status_service(StatusCode::OK)
|
|
}
|
|
|
|
fn status_service(
|
|
status: StatusCode,
|
|
) -> impl Service<Request, Response = Response<impl MessageBody>, Error = Error> {
|
|
fn_service(move |_req: Request| ready(Ok::<_, Error>(Response::new(status))))
|
|
}
|
|
|
|
fn echo_path_service() -> impl Service<Request, Response = Response<impl MessageBody>, Error = Error>
|
|
{
|
|
fn_service(|req: Request| {
|
|
let path = req.path().as_bytes();
|
|
ready(Ok::<_, Error>(
|
|
Response::ok().set_body(Bytes::copy_from_slice(path)),
|
|
))
|
|
})
|
|
}
|
|
|
|
fn drop_payload_service() -> impl Service<Request, Response = Response<&'static str>, Error = Error>
|
|
{
|
|
fn_service(|mut req: Request| async move {
|
|
let _ = req.take_payload();
|
|
Ok::<_, Error>(Response::with_body(StatusCode::OK, "payload dropped"))
|
|
})
|
|
}
|
|
|
|
fn ignore_payload_service(
|
|
) -> impl Service<Request, Response = Response<&'static str>, Error = Error> {
|
|
fn_service(|_req: Request| ready(Ok::<_, Error>(Response::with_body(StatusCode::OK, "ok"))))
|
|
}
|
|
|
|
fn echo_payload_service() -> impl Service<Request, Response = Response<Bytes>, Error = Error> {
|
|
fn_service(|mut req: Request| {
|
|
Box::pin(async move {
|
|
use futures_util::StreamExt as _;
|
|
|
|
let mut pl = req.take_payload();
|
|
let mut body = BytesMut::new();
|
|
while let Some(chunk) = pl.next().await {
|
|
body.extend_from_slice(chunk.unwrap().chunk())
|
|
}
|
|
|
|
Ok::<_, Error>(Response::ok().set_body(body.freeze()))
|
|
})
|
|
})
|
|
}
|
|
|
|
fn ready_chunk_body_service(
|
|
chunk_polls: Rc<Cell<usize>>,
|
|
chunk_count: usize,
|
|
chunk_len: usize,
|
|
) -> impl Service<Request, Response = Response<ReadyChunkBody>, Error = Error> {
|
|
fn_service(move |_req: Request| {
|
|
ready(Ok::<_, Error>(Response::ok().set_body(
|
|
ReadyChunkBody::new(chunk_polls.clone(), chunk_count, chunk_len),
|
|
)))
|
|
})
|
|
}
|
|
|
|
#[actix_rt::test]
|
|
async fn late_request() {
|
|
let mut buf = TestBuffer::empty();
|
|
|
|
let cfg = ServiceConfig::new(
|
|
KeepAlive::Disabled,
|
|
Duration::from_millis(100),
|
|
Duration::ZERO,
|
|
false,
|
|
None,
|
|
);
|
|
let services = HttpFlow::new(ok_service(), ExpectHandler, None);
|
|
|
|
let h1 = Dispatcher::<_, _, _, _, UpgradeHandler>::new(
|
|
buf.clone(),
|
|
services,
|
|
cfg,
|
|
None,
|
|
OnConnectData::default(),
|
|
);
|
|
pin!(h1);
|
|
|
|
lazy(|cx| {
|
|
assert!(matches!(&h1.inner, DispatcherState::Normal { .. }));
|
|
|
|
match h1.as_mut().poll(cx) {
|
|
Poll::Ready(_) => panic!("first poll should not be ready"),
|
|
Poll::Pending => {}
|
|
}
|
|
|
|
// polls: initial
|
|
assert_eq!(h1.poll_count, 1);
|
|
|
|
buf.extend_read_buf("GET /abcd HTTP/1.1\r\nConnection: close\r\n\r\n");
|
|
|
|
match h1.as_mut().poll(cx) {
|
|
Poll::Pending => panic!("second poll should not be pending"),
|
|
Poll::Ready(res) => assert!(res.is_ok()),
|
|
}
|
|
|
|
// polls: initial pending => handle req => shutdown
|
|
assert_eq!(h1.poll_count, 3);
|
|
|
|
let mut res = buf.take_write_buf().to_vec();
|
|
stabilize_date_header(&mut res);
|
|
let res = &res[..];
|
|
|
|
let exp = b"\
|
|
HTTP/1.1 200 OK\r\n\
|
|
content-length: 0\r\n\
|
|
connection: close\r\n\
|
|
date: Thu, 01 Jan 1970 12:34:56 UTC\r\n\r\n\
|
|
";
|
|
|
|
assert_eq!(
|
|
res,
|
|
exp,
|
|
"\nexpected response not in write buffer:\n\
|
|
response: {:?}\n\
|
|
expected: {:?}",
|
|
String::from_utf8_lossy(res),
|
|
String::from_utf8_lossy(exp)
|
|
);
|
|
})
|
|
.await;
|
|
}
|
|
|
|
#[actix_rt::test]
|
|
async fn oneshot_connection() {
|
|
let buf = TestBuffer::new("GET /abcd HTTP/1.1\r\n\r\n");
|
|
|
|
let cfg = ServiceConfig::new(
|
|
KeepAlive::Disabled,
|
|
Duration::from_millis(100),
|
|
Duration::ZERO,
|
|
false,
|
|
None,
|
|
);
|
|
let services = HttpFlow::new(echo_path_service(), ExpectHandler, None);
|
|
|
|
let h1 = Dispatcher::<_, _, _, _, UpgradeHandler>::new(
|
|
buf.clone(),
|
|
services,
|
|
cfg,
|
|
None,
|
|
OnConnectData::default(),
|
|
);
|
|
pin!(h1);
|
|
|
|
lazy(|cx| {
|
|
assert!(matches!(&h1.inner, DispatcherState::Normal { .. }));
|
|
|
|
match h1.as_mut().poll(cx) {
|
|
Poll::Pending => panic!("first poll should not be pending"),
|
|
Poll::Ready(res) => assert!(res.is_ok()),
|
|
}
|
|
|
|
// polls: initial => shutdown
|
|
assert_eq!(h1.poll_count, 2);
|
|
|
|
let mut res = buf.take_write_buf().to_vec();
|
|
stabilize_date_header(&mut res);
|
|
let res = &res[..];
|
|
|
|
let exp = http_msg(
|
|
r"
|
|
HTTP/1.1 200 OK
|
|
content-length: 5
|
|
connection: close
|
|
date: Thu, 01 Jan 1970 12:34:56 UTC
|
|
|
|
/abcd
|
|
",
|
|
);
|
|
|
|
assert_eq!(
|
|
res,
|
|
exp,
|
|
"\nexpected response not in write buffer:\n\
|
|
response: {:?}\n\
|
|
expected: {:?}",
|
|
String::from_utf8_lossy(res),
|
|
String::from_utf8_lossy(&exp)
|
|
);
|
|
})
|
|
.await;
|
|
}
|
|
|
|
#[actix_rt::test]
|
|
async fn keep_alive_timeout() {
|
|
let buf = TestBuffer::new("GET /abcd HTTP/1.1\r\n\r\n");
|
|
|
|
let cfg = ServiceConfig::new(
|
|
KeepAlive::Timeout(Duration::from_millis(200)),
|
|
Duration::from_millis(100),
|
|
Duration::ZERO,
|
|
false,
|
|
None,
|
|
);
|
|
let services = HttpFlow::new(echo_path_service(), ExpectHandler, None);
|
|
|
|
let h1 = Dispatcher::<_, _, _, _, UpgradeHandler>::new(
|
|
buf.clone(),
|
|
services,
|
|
cfg,
|
|
None,
|
|
OnConnectData::default(),
|
|
);
|
|
pin!(h1);
|
|
|
|
lazy(|cx| {
|
|
assert!(matches!(&h1.inner, DispatcherState::Normal { .. }));
|
|
|
|
assert!(
|
|
h1.as_mut().poll(cx).is_pending(),
|
|
"keep-alive should prevent poll from resolving"
|
|
);
|
|
|
|
// polls: initial
|
|
assert_eq!(h1.poll_count, 1);
|
|
|
|
let mut res = buf.take_write_buf().to_vec();
|
|
stabilize_date_header(&mut res);
|
|
let res = &res[..];
|
|
|
|
let exp = b"\
|
|
HTTP/1.1 200 OK\r\n\
|
|
content-length: 5\r\n\
|
|
date: Thu, 01 Jan 1970 12:34:56 UTC\r\n\r\n\
|
|
/abcd\
|
|
";
|
|
|
|
assert_eq!(
|
|
res,
|
|
exp,
|
|
"\nexpected response not in write buffer:\n\
|
|
response: {:?}\n\
|
|
expected: {:?}",
|
|
String::from_utf8_lossy(res),
|
|
String::from_utf8_lossy(exp)
|
|
);
|
|
})
|
|
.await;
|
|
|
|
// sleep slightly longer than keep-alive timeout
|
|
sleep(Duration::from_millis(250)).await;
|
|
|
|
lazy(|cx| {
|
|
assert!(
|
|
h1.as_mut().poll(cx).is_ready(),
|
|
"keep-alive should have resolved",
|
|
);
|
|
|
|
// polls: initial => keep-alive wake-up shutdown
|
|
assert_eq!(h1.poll_count, 2);
|
|
|
|
if let DispatcherStateProj::Normal { inner } = h1.project().inner.project() {
|
|
// connection closed
|
|
assert!(inner.flags.contains(Flags::SHUTDOWN));
|
|
assert!(inner.flags.contains(Flags::WRITE_DISCONNECT));
|
|
// and nothing added to write buffer
|
|
assert!(buf.write_buf_slice().is_empty());
|
|
}
|
|
})
|
|
.await;
|
|
}
|
|
|
|
#[actix_rt::test]
|
|
async fn keep_alive_follow_up_req() {
|
|
let mut buf = TestBuffer::new("GET /abcd HTTP/1.1\r\n\r\n");
|
|
|
|
let cfg = ServiceConfig::new(
|
|
KeepAlive::Timeout(Duration::from_millis(500)),
|
|
Duration::from_millis(100),
|
|
Duration::ZERO,
|
|
false,
|
|
None,
|
|
);
|
|
let services = HttpFlow::new(echo_path_service(), ExpectHandler, None);
|
|
|
|
let h1 = Dispatcher::<_, _, _, _, UpgradeHandler>::new(
|
|
buf.clone(),
|
|
services,
|
|
cfg,
|
|
None,
|
|
OnConnectData::default(),
|
|
);
|
|
pin!(h1);
|
|
|
|
lazy(|cx| {
|
|
assert!(matches!(&h1.inner, DispatcherState::Normal { .. }));
|
|
|
|
assert!(
|
|
h1.as_mut().poll(cx).is_pending(),
|
|
"keep-alive should prevent poll from resolving"
|
|
);
|
|
|
|
// polls: initial
|
|
assert_eq!(h1.poll_count, 1);
|
|
|
|
let mut res = buf.take_write_buf().to_vec();
|
|
stabilize_date_header(&mut res);
|
|
let res = &res[..];
|
|
|
|
let exp = b"\
|
|
HTTP/1.1 200 OK\r\n\
|
|
content-length: 5\r\n\
|
|
date: Thu, 01 Jan 1970 12:34:56 UTC\r\n\r\n\
|
|
/abcd\
|
|
";
|
|
|
|
assert_eq!(
|
|
res,
|
|
exp,
|
|
"\nexpected response not in write buffer:\n\
|
|
response: {:?}\n\
|
|
expected: {:?}",
|
|
String::from_utf8_lossy(res),
|
|
String::from_utf8_lossy(exp)
|
|
);
|
|
})
|
|
.await;
|
|
|
|
// sleep for less than KA timeout
|
|
sleep(Duration::from_millis(100)).await;
|
|
|
|
lazy(|cx| {
|
|
assert!(
|
|
h1.as_mut().poll(cx).is_pending(),
|
|
"keep-alive should not have resolved dispatcher yet",
|
|
);
|
|
|
|
// polls: initial => manual
|
|
assert_eq!(h1.poll_count, 2);
|
|
|
|
if let DispatcherStateProj::Normal { inner } = h1.as_mut().project().inner.project() {
|
|
// connection not closed
|
|
assert!(!inner.flags.contains(Flags::SHUTDOWN));
|
|
assert!(!inner.flags.contains(Flags::WRITE_DISCONNECT));
|
|
// and nothing added to write buffer
|
|
assert!(buf.write_buf_slice().is_empty());
|
|
}
|
|
})
|
|
.await;
|
|
|
|
lazy(|cx| {
|
|
buf.extend_read_buf(
|
|
"\
|
|
GET /efg HTTP/1.1\r\n\
|
|
Connection: close\r\n\
|
|
\r\n\r\n",
|
|
);
|
|
|
|
assert!(
|
|
h1.as_mut().poll(cx).is_ready(),
|
|
"connection close header should override keep-alive setting",
|
|
);
|
|
|
|
// polls: initial => manual => follow-up req => shutdown
|
|
assert_eq!(h1.poll_count, 4);
|
|
|
|
if let DispatcherStateProj::Normal { inner } = h1.as_mut().project().inner.project() {
|
|
// connection closed
|
|
assert!(inner.flags.contains(Flags::SHUTDOWN));
|
|
assert!(!inner.flags.contains(Flags::WRITE_DISCONNECT));
|
|
}
|
|
|
|
let mut res = buf.take_write_buf().to_vec();
|
|
stabilize_date_header(&mut res);
|
|
let res = &res[..];
|
|
|
|
let exp = b"\
|
|
HTTP/1.1 200 OK\r\n\
|
|
content-length: 4\r\n\
|
|
connection: close\r\n\
|
|
date: Thu, 01 Jan 1970 12:34:56 UTC\r\n\r\n\
|
|
/efg\
|
|
";
|
|
|
|
assert_eq!(
|
|
res,
|
|
exp,
|
|
"\nexpected response not in write buffer:\n\
|
|
response: {:?}\n\
|
|
expected: {:?}",
|
|
String::from_utf8_lossy(res),
|
|
String::from_utf8_lossy(exp)
|
|
);
|
|
})
|
|
.await;
|
|
}
|
|
|
|
#[actix_rt::test]
|
|
async fn req_parse_err() {
|
|
lazy(|cx| {
|
|
let buf = TestBuffer::new("GET /test HTTP/1\r\n\r\n");
|
|
|
|
let services = HttpFlow::new(ok_service(), ExpectHandler, None);
|
|
|
|
let h1 = Dispatcher::<_, _, _, _, UpgradeHandler>::new(
|
|
buf.clone(),
|
|
services,
|
|
ServiceConfig::default(),
|
|
None,
|
|
OnConnectData::default(),
|
|
);
|
|
|
|
pin!(h1);
|
|
|
|
match h1.as_mut().poll(cx) {
|
|
Poll::Pending => panic!(),
|
|
Poll::Ready(res) => assert!(res.is_err()),
|
|
}
|
|
|
|
if let DispatcherStateProj::Normal { inner } = h1.project().inner.project() {
|
|
assert!(inner.flags.contains(Flags::READ_DISCONNECT));
|
|
assert_eq!(
|
|
&buf.write_buf_slice()[..26],
|
|
b"HTTP/1.1 400 Bad Request\r\n"
|
|
);
|
|
}
|
|
})
|
|
.await;
|
|
}
|
|
|
|
#[actix_rt::test]
|
|
async fn pipelining_ok_then_ok() {
|
|
lazy(|cx| {
|
|
let buf = TestBuffer::new(
|
|
"\
|
|
GET /abcd HTTP/1.1\r\n\r\n\
|
|
GET /def HTTP/1.1\r\n\r\n\
|
|
",
|
|
);
|
|
|
|
let cfg = ServiceConfig::new(
|
|
KeepAlive::Disabled,
|
|
Duration::from_millis(1),
|
|
Duration::from_millis(1),
|
|
false,
|
|
None,
|
|
);
|
|
|
|
let services = HttpFlow::new(echo_path_service(), ExpectHandler, None);
|
|
|
|
let h1 = Dispatcher::<_, _, _, _, UpgradeHandler>::new(
|
|
buf.clone(),
|
|
services,
|
|
cfg,
|
|
None,
|
|
OnConnectData::default(),
|
|
);
|
|
|
|
pin!(h1);
|
|
|
|
assert!(matches!(&h1.inner, DispatcherState::Normal { .. }));
|
|
|
|
match h1.as_mut().poll(cx) {
|
|
Poll::Pending => panic!("first poll should not be pending"),
|
|
Poll::Ready(res) => assert!(res.is_ok()),
|
|
}
|
|
|
|
// polls: initial => shutdown
|
|
assert_eq!(h1.poll_count, 2);
|
|
|
|
let mut res = buf.write_buf_slice_mut();
|
|
stabilize_date_header(&mut res);
|
|
let res = &res[..];
|
|
|
|
let exp = b"\
|
|
HTTP/1.1 200 OK\r\n\
|
|
content-length: 5\r\n\
|
|
connection: close\r\n\
|
|
date: Thu, 01 Jan 1970 12:34:56 UTC\r\n\r\n\
|
|
/abcd\
|
|
HTTP/1.1 200 OK\r\n\
|
|
content-length: 4\r\n\
|
|
connection: close\r\n\
|
|
date: Thu, 01 Jan 1970 12:34:56 UTC\r\n\r\n\
|
|
/def\
|
|
";
|
|
|
|
assert_eq!(
|
|
res,
|
|
exp,
|
|
"\nexpected response not in write buffer:\n\
|
|
response: {:?}\n\
|
|
expected: {:?}",
|
|
String::from_utf8_lossy(res),
|
|
String::from_utf8_lossy(exp)
|
|
);
|
|
})
|
|
.await;
|
|
}
|
|
|
|
#[actix_rt::test]
|
|
async fn early_response_with_payload_lingers_before_closing() {
|
|
lazy(|cx| {
|
|
let buf = TestSeqBuffer::new(http_msg(
|
|
r"
|
|
GET /unfinished HTTP/1.1
|
|
Content-Length: 2
|
|
",
|
|
));
|
|
|
|
let cfg = ServiceConfig::new(
|
|
KeepAlive::Os,
|
|
Duration::from_millis(1),
|
|
Duration::from_millis(1),
|
|
false,
|
|
None,
|
|
);
|
|
|
|
let services = HttpFlow::new(echo_path_service(), ExpectHandler, None);
|
|
|
|
let h1 = Dispatcher::<_, _, _, _, UpgradeHandler>::new(
|
|
buf.clone(),
|
|
services,
|
|
cfg,
|
|
None,
|
|
OnConnectData::default(),
|
|
);
|
|
|
|
pin!(h1);
|
|
|
|
assert!(matches!(&h1.inner, DispatcherState::Normal { .. }));
|
|
|
|
match h1.as_mut().poll(cx) {
|
|
Poll::Pending => {}
|
|
Poll::Ready(res) => panic!("should still be lingering: {:?}", res),
|
|
}
|
|
|
|
// polls: initial
|
|
assert_eq!(h1.poll_count, 1);
|
|
|
|
let mut res = buf.take_write_buf().to_vec();
|
|
stabilize_date_header(&mut res);
|
|
let res = &res[..];
|
|
|
|
let exp = b"\
|
|
HTTP/1.1 200 OK\r\n\
|
|
content-length: 11\r\n\
|
|
connection: close\r\n\
|
|
date: Thu, 01 Jan 1970 12:34:56 UTC\r\n\r\n\
|
|
/unfinished\
|
|
";
|
|
|
|
assert_eq!(
|
|
res,
|
|
exp,
|
|
"\nexpected response not in write buffer:\n\
|
|
response: {:?}\n\
|
|
expected: {:?}",
|
|
String::from_utf8_lossy(res),
|
|
String::from_utf8_lossy(exp)
|
|
);
|
|
|
|
buf.close_read();
|
|
|
|
assert!(h1.as_mut().poll(cx).is_pending());
|
|
assert!(h1.as_mut().poll(cx).is_ready());
|
|
})
|
|
.await;
|
|
}
|
|
|
|
#[actix_rt::test]
|
|
async fn buffered_upload_ignored_by_handler_should_not_shutdown_immediately() {
|
|
lazy(|cx| {
|
|
let buf = TestSeqBuffer::new(http_msg(
|
|
r"
|
|
POST / HTTP/1.1
|
|
Content-Length: 8
|
|
|
|
ab
|
|
",
|
|
));
|
|
|
|
let cfg = ServiceConfig::new(
|
|
KeepAlive::Os,
|
|
Duration::from_millis(1),
|
|
Duration::from_millis(1),
|
|
false,
|
|
None,
|
|
);
|
|
|
|
let services = HttpFlow::new(ignore_payload_service(), ExpectHandler, None);
|
|
|
|
let h1 = Dispatcher::<_, _, _, _, UpgradeHandler>::new(
|
|
buf.clone(),
|
|
services,
|
|
cfg,
|
|
None,
|
|
OnConnectData::default(),
|
|
);
|
|
|
|
pin!(h1);
|
|
|
|
assert!(matches!(&h1.inner, DispatcherState::Normal { .. }));
|
|
|
|
match h1.as_mut().poll(cx) {
|
|
Poll::Pending => {}
|
|
Poll::Ready(res) => panic!("closed connection early: {:?}", res),
|
|
}
|
|
|
|
let mut res = BytesMut::from(buf.take_write_buf().as_ref());
|
|
stabilize_date_header(&mut res);
|
|
let res = &res[..];
|
|
|
|
let exp = http_msg(
|
|
r"
|
|
HTTP/1.1 200 OK
|
|
content-length: 2
|
|
connection: close
|
|
date: Thu, 01 Jan 1970 12:34:56 UTC
|
|
|
|
ok
|
|
",
|
|
);
|
|
|
|
assert_eq!(
|
|
res,
|
|
exp,
|
|
"\nexpected response not in write buffer:\n\
|
|
response: {:?}\n\
|
|
expected: {:?}",
|
|
String::from_utf8_lossy(res),
|
|
String::from_utf8_lossy(&exp)
|
|
);
|
|
|
|
buf.close_read();
|
|
|
|
assert!(h1.as_mut().poll(cx).is_pending());
|
|
assert!(h1.as_mut().poll(cx).is_ready());
|
|
})
|
|
.await;
|
|
}
|
|
|
|
#[actix_rt::test]
|
|
async fn lingering_timeout_uses_graceful_shutdown() {
|
|
let buf = TestSeqBuffer::new(
|
|
"\
|
|
POST / HTTP/1.1\r\n\
|
|
Content-Length: 8\r\n\
|
|
\r\n\
|
|
ab\
|
|
",
|
|
);
|
|
|
|
let cfg = ServiceConfig::new(
|
|
KeepAlive::Disabled,
|
|
Duration::ZERO,
|
|
Duration::from_millis(1),
|
|
false,
|
|
None,
|
|
);
|
|
|
|
let services = HttpFlow::new(ignore_payload_service(), ExpectHandler, None);
|
|
|
|
let h1 = Dispatcher::<_, _, _, _, UpgradeHandler>::new(
|
|
buf.clone(),
|
|
services,
|
|
cfg,
|
|
None,
|
|
OnConnectData::default(),
|
|
);
|
|
|
|
assert!(matches!(
|
|
timeout(Duration::from_millis(100), h1).await,
|
|
Ok(Ok(()))
|
|
));
|
|
|
|
let mut res = buf.take_write_buf().to_vec();
|
|
stabilize_date_header(&mut res);
|
|
let res = &res[..];
|
|
|
|
let exp = b"\
|
|
HTTP/1.1 200 OK\r\n\
|
|
content-length: 2\r\n\
|
|
connection: close\r\n\
|
|
date: Thu, 01 Jan 1970 12:34:56 UTC\r\n\r\n\
|
|
ok\
|
|
";
|
|
|
|
assert_eq!(
|
|
res,
|
|
exp,
|
|
"\nexpected response not in write buffer:\n\
|
|
response: {:?}\n\
|
|
expected: {:?}",
|
|
String::from_utf8_lossy(res),
|
|
String::from_utf8_lossy(exp)
|
|
);
|
|
}
|
|
|
|
#[actix_rt::test]
|
|
async fn pipelining_ok_then_bad() {
|
|
lazy(|cx| {
|
|
let buf = TestBuffer::new(
|
|
"\
|
|
GET /abcd HTTP/1.1\r\n\r\n\
|
|
GET /def HTTP/1\r\n\r\n\
|
|
",
|
|
);
|
|
|
|
let cfg = ServiceConfig::new(
|
|
KeepAlive::Disabled,
|
|
Duration::from_millis(1),
|
|
Duration::from_millis(1),
|
|
false,
|
|
None,
|
|
);
|
|
|
|
let services = HttpFlow::new(echo_path_service(), ExpectHandler, None);
|
|
|
|
let h1 = Dispatcher::<_, _, _, _, UpgradeHandler>::new(
|
|
buf.clone(),
|
|
services,
|
|
cfg,
|
|
None,
|
|
OnConnectData::default(),
|
|
);
|
|
|
|
pin!(h1);
|
|
|
|
assert!(matches!(&h1.inner, DispatcherState::Normal { .. }));
|
|
|
|
match h1.as_mut().poll(cx) {
|
|
Poll::Pending => panic!("first poll should not be pending"),
|
|
Poll::Ready(res) => assert!(res.is_err()),
|
|
}
|
|
|
|
// polls: initial => shutdown
|
|
assert_eq!(h1.poll_count, 1);
|
|
|
|
let mut res = buf.write_buf_slice_mut();
|
|
stabilize_date_header(&mut res);
|
|
let res = &res[..];
|
|
|
|
let exp = b"\
|
|
HTTP/1.1 200 OK\r\n\
|
|
content-length: 5\r\n\
|
|
connection: close\r\n\
|
|
date: Thu, 01 Jan 1970 12:34:56 UTC\r\n\r\n\
|
|
/abcd\
|
|
HTTP/1.1 400 Bad Request\r\n\
|
|
content-length: 0\r\n\
|
|
connection: close\r\n\
|
|
date: Thu, 01 Jan 1970 12:34:56 UTC\r\n\r\n\
|
|
";
|
|
|
|
assert_eq!(
|
|
res,
|
|
exp,
|
|
"\nexpected response not in write buffer:\n\
|
|
response: {:?}\n\
|
|
expected: {:?}",
|
|
String::from_utf8_lossy(res),
|
|
String::from_utf8_lossy(exp)
|
|
);
|
|
})
|
|
.await;
|
|
}
|
|
|
|
#[actix_rt::test]
|
|
async fn expect_handling() {
|
|
lazy(|cx| {
|
|
let mut buf = TestSeqBuffer::empty();
|
|
let cfg = ServiceConfig::new(
|
|
KeepAlive::Disabled,
|
|
Duration::ZERO,
|
|
Duration::ZERO,
|
|
false,
|
|
None,
|
|
);
|
|
|
|
let services = HttpFlow::new(echo_payload_service(), ExpectHandler, None);
|
|
|
|
let h1 = Dispatcher::<_, _, _, _, UpgradeHandler>::new(
|
|
buf.clone(),
|
|
services,
|
|
cfg,
|
|
None,
|
|
OnConnectData::default(),
|
|
);
|
|
|
|
buf.extend_read_buf(
|
|
"\
|
|
POST /upload HTTP/1.1\r\n\
|
|
Content-Length: 5\r\n\
|
|
Expect: 100-continue\r\n\
|
|
\r\n\
|
|
",
|
|
);
|
|
|
|
pin!(h1);
|
|
|
|
assert!(h1.as_mut().poll(cx).is_pending());
|
|
assert!(matches!(&h1.inner, DispatcherState::Normal { .. }));
|
|
|
|
// polls: manual
|
|
assert_eq!(h1.poll_count, 1);
|
|
|
|
if let DispatcherState::Normal { ref inner } = h1.inner {
|
|
let io = inner.io.as_ref().unwrap();
|
|
let res = &io.write_buf()[..];
|
|
assert_eq!(
|
|
str::from_utf8(res).unwrap(),
|
|
"HTTP/1.1 100 Continue\r\n\r\n"
|
|
);
|
|
}
|
|
|
|
buf.extend_read_buf("12345");
|
|
assert!(h1.as_mut().poll(cx).is_ready());
|
|
|
|
// polls: manual manual shutdown
|
|
assert_eq!(h1.poll_count, 3);
|
|
|
|
if let DispatcherState::Normal { ref inner } = h1.inner {
|
|
let io = inner.io.as_ref().unwrap();
|
|
let mut res = io.write_buf()[..].to_owned();
|
|
stabilize_date_header(&mut res);
|
|
|
|
assert_eq!(
|
|
str::from_utf8(&res).unwrap(),
|
|
"\
|
|
HTTP/1.1 100 Continue\r\n\
|
|
\r\n\
|
|
HTTP/1.1 200 OK\r\n\
|
|
content-length: 5\r\n\
|
|
connection: close\r\n\
|
|
date: Thu, 01 Jan 1970 12:34:56 UTC\r\n\
|
|
\r\n\
|
|
12345\
|
|
"
|
|
);
|
|
}
|
|
})
|
|
.await;
|
|
}
|
|
|
|
#[actix_rt::test]
|
|
async fn expect_eager() {
|
|
lazy(|cx| {
|
|
let mut buf = TestSeqBuffer::empty();
|
|
let cfg = ServiceConfig::new(
|
|
KeepAlive::Disabled,
|
|
Duration::ZERO,
|
|
Duration::ZERO,
|
|
false,
|
|
None,
|
|
);
|
|
|
|
let services = HttpFlow::new(echo_path_service(), ExpectHandler, None);
|
|
|
|
let h1 = Dispatcher::<_, _, _, _, UpgradeHandler>::new(
|
|
buf.clone(),
|
|
services,
|
|
cfg,
|
|
None,
|
|
OnConnectData::default(),
|
|
);
|
|
|
|
buf.extend_read_buf(
|
|
"\
|
|
POST /upload HTTP/1.1\r\n\
|
|
Content-Length: 5\r\n\
|
|
Expect: 100-continue\r\n\
|
|
\r\n\
|
|
",
|
|
);
|
|
|
|
pin!(h1);
|
|
|
|
assert!(h1.as_mut().poll(cx).is_ready());
|
|
assert!(matches!(&h1.inner, DispatcherState::Normal { .. }));
|
|
|
|
// polls: manual shutdown
|
|
assert_eq!(h1.poll_count, 2);
|
|
|
|
if let DispatcherState::Normal { ref inner } = h1.inner {
|
|
let io = inner.io.as_ref().unwrap();
|
|
let mut res = io.write_buf()[..].to_owned();
|
|
stabilize_date_header(&mut res);
|
|
|
|
// Despite the content-length header and even though the request payload has not
|
|
// been sent, this test expects a complete service response since the payload
|
|
// is not used at all. The service passed to dispatcher is path echo and doesn't
|
|
// consume payload bytes.
|
|
assert_eq!(
|
|
str::from_utf8(&res).unwrap(),
|
|
"\
|
|
HTTP/1.1 100 Continue\r\n\
|
|
\r\n\
|
|
HTTP/1.1 200 OK\r\n\
|
|
content-length: 7\r\n\
|
|
connection: close\r\n\
|
|
date: Thu, 01 Jan 1970 12:34:56 UTC\r\n\
|
|
\r\n\
|
|
/upload\
|
|
"
|
|
);
|
|
}
|
|
})
|
|
.await;
|
|
}
|
|
|
|
#[actix_rt::test]
|
|
async fn upgrade_handling() {
|
|
struct TestUpgrade;
|
|
|
|
impl<T> Service<(Request, Framed<T, Codec>)> for TestUpgrade {
|
|
type Response = ();
|
|
type Error = Error;
|
|
type Future = Ready<Result<Self::Response, Self::Error>>;
|
|
|
|
actix_service::always_ready!();
|
|
|
|
fn call(&self, (req, _framed): (Request, Framed<T, Codec>)) -> Self::Future {
|
|
assert_eq!(req.method(), Method::GET);
|
|
assert!(req.upgrade());
|
|
assert_eq!(req.headers().get("upgrade").unwrap(), "websocket");
|
|
ready(Ok(()))
|
|
}
|
|
}
|
|
|
|
lazy(|cx| {
|
|
let mut buf = TestSeqBuffer::empty();
|
|
let cfg = ServiceConfig::new(
|
|
KeepAlive::Disabled,
|
|
Duration::ZERO,
|
|
Duration::ZERO,
|
|
false,
|
|
None,
|
|
);
|
|
|
|
let services = HttpFlow::new(ok_service(), ExpectHandler, Some(TestUpgrade));
|
|
|
|
let h1 = Dispatcher::<_, _, _, _, TestUpgrade>::new(
|
|
buf.clone(),
|
|
services,
|
|
cfg,
|
|
None,
|
|
OnConnectData::default(),
|
|
);
|
|
|
|
buf.extend_read_buf(
|
|
"\
|
|
GET /ws HTTP/1.1\r\n\
|
|
Connection: Upgrade\r\n\
|
|
Upgrade: websocket\r\n\
|
|
\r\n\
|
|
",
|
|
);
|
|
|
|
pin!(h1);
|
|
|
|
assert!(h1.as_mut().poll(cx).is_ready());
|
|
assert!(matches!(&h1.inner, DispatcherState::Upgrade { .. }));
|
|
|
|
// polls: manual shutdown
|
|
assert_eq!(h1.poll_count, 2);
|
|
})
|
|
.await;
|
|
}
|
|
|
|
// fix in #2624 reverted temporarily
|
|
// complete fix tracked in #2745
|
|
#[ignore]
|
|
#[actix_rt::test]
|
|
async fn handler_drop_payload() {
|
|
let _ = env_logger::try_init();
|
|
|
|
let mut buf = TestBuffer::new(http_msg(
|
|
r"
|
|
POST /drop-payload HTTP/1.1
|
|
Content-Length: 3
|
|
|
|
abc
|
|
",
|
|
));
|
|
|
|
let services = HttpFlow::new(
|
|
drop_payload_service(),
|
|
ExpectHandler,
|
|
None::<UpgradeHandler>,
|
|
);
|
|
|
|
let h1 = Dispatcher::new(
|
|
buf.clone(),
|
|
services,
|
|
ServiceConfig::default(),
|
|
None,
|
|
OnConnectData::default(),
|
|
);
|
|
pin!(h1);
|
|
|
|
lazy(|cx| {
|
|
assert!(h1.as_mut().poll(cx).is_pending());
|
|
|
|
// polls: manual
|
|
assert_eq!(h1.poll_count, 1);
|
|
|
|
let mut res = BytesMut::from(buf.take_write_buf().as_ref());
|
|
stabilize_date_header(&mut res);
|
|
let res = &res[..];
|
|
|
|
let exp = http_msg(
|
|
r"
|
|
HTTP/1.1 200 OK
|
|
content-length: 15
|
|
date: Thu, 01 Jan 1970 12:34:56 UTC
|
|
|
|
payload dropped
|
|
",
|
|
);
|
|
|
|
assert_eq!(
|
|
res,
|
|
exp,
|
|
"\nexpected response not in write buffer:\n\
|
|
response: {:?}\n\
|
|
expected: {:?}",
|
|
String::from_utf8_lossy(res),
|
|
String::from_utf8_lossy(&exp)
|
|
);
|
|
|
|
if let DispatcherStateProj::Normal { inner } = h1.as_mut().project().inner.project() {
|
|
assert!(inner.state.is_none());
|
|
}
|
|
})
|
|
.await;
|
|
|
|
lazy(|cx| {
|
|
// add message that claims to have payload longer than provided
|
|
buf.extend_read_buf(http_msg(
|
|
r"
|
|
POST /drop-payload HTTP/1.1
|
|
Content-Length: 200
|
|
|
|
abc
|
|
",
|
|
));
|
|
|
|
assert!(h1.as_mut().poll(cx).is_pending());
|
|
|
|
// polls: manual => manual
|
|
assert_eq!(h1.poll_count, 2);
|
|
|
|
let mut res = BytesMut::from(buf.take_write_buf().as_ref());
|
|
stabilize_date_header(&mut res);
|
|
let res = &res[..];
|
|
|
|
// expect response immediately even though request side has not finished reading payload
|
|
let exp = http_msg(
|
|
r"
|
|
HTTP/1.1 200 OK
|
|
content-length: 15
|
|
date: Thu, 01 Jan 1970 12:34:56 UTC
|
|
|
|
payload dropped
|
|
",
|
|
);
|
|
|
|
assert_eq!(
|
|
res,
|
|
exp,
|
|
"\nexpected response not in write buffer:\n\
|
|
response: {:?}\n\
|
|
expected: {:?}",
|
|
String::from_utf8_lossy(res),
|
|
String::from_utf8_lossy(&exp)
|
|
);
|
|
})
|
|
.await;
|
|
|
|
lazy(|cx| {
|
|
assert!(h1.as_mut().poll(cx).is_ready());
|
|
|
|
// polls: manual => manual => manual
|
|
assert_eq!(h1.poll_count, 3);
|
|
|
|
let mut res = BytesMut::from(buf.take_write_buf().as_ref());
|
|
stabilize_date_header(&mut res);
|
|
let res = &res[..];
|
|
|
|
// expect that unrequested error response is sent back since connection could not be cleaned
|
|
let exp = http_msg(
|
|
r"
|
|
HTTP/1.1 500 Internal Server Error
|
|
content-length: 0
|
|
connection: close
|
|
date: Thu, 01 Jan 1970 12:34:56 UTC
|
|
|
|
",
|
|
);
|
|
|
|
assert_eq!(
|
|
res,
|
|
exp,
|
|
"\nexpected response not in write buffer:\n\
|
|
response: {:?}\n\
|
|
expected: {:?}",
|
|
String::from_utf8_lossy(res),
|
|
String::from_utf8_lossy(&exp)
|
|
);
|
|
})
|
|
.await;
|
|
}
|
|
|
|
// Handler drops request payload without reading it. Server should keep reading and discarding the
|
|
// rest of the request body so clients that do not read the response until they've finished
|
|
// writing the request (like `requests` in Python) do not deadlock.
|
|
// ref. https://github.com/actix/actix-web/issues/2972
|
|
#[actix_rt::test]
|
|
async fn handler_drop_payload_drains_body() {
|
|
let _ = env_logger::try_init();
|
|
|
|
let mut buf = TestSeqBuffer::new(http_msg(
|
|
r"
|
|
POST /drop-payload HTTP/1.1
|
|
Transfer-Encoding: chunked
|
|
|
|
",
|
|
));
|
|
|
|
let services = HttpFlow::new(
|
|
drop_payload_service(),
|
|
ExpectHandler,
|
|
None::<UpgradeHandler>,
|
|
);
|
|
|
|
let h1 = Dispatcher::new(
|
|
buf.clone(),
|
|
services,
|
|
ServiceConfig::default(),
|
|
None,
|
|
OnConnectData::default(),
|
|
);
|
|
pin!(h1);
|
|
|
|
lazy(|cx| {
|
|
assert!(h1.as_mut().poll(cx).is_pending());
|
|
|
|
let mut res = BytesMut::from(buf.take_write_buf().as_ref());
|
|
stabilize_date_header(&mut res);
|
|
let res = &res[..];
|
|
|
|
let exp = http_msg(
|
|
r"
|
|
HTTP/1.1 200 OK
|
|
content-length: 15
|
|
date: Thu, 01 Jan 1970 12:34:56 UTC
|
|
|
|
payload dropped
|
|
",
|
|
);
|
|
|
|
assert_eq!(
|
|
res,
|
|
exp,
|
|
"\nexpected response not in write buffer:\n\
|
|
response: {:?}\n\
|
|
expected: {:?}",
|
|
String::from_utf8_lossy(res),
|
|
String::from_utf8_lossy(&exp)
|
|
);
|
|
})
|
|
.await;
|
|
|
|
// stream a body larger than the dispatcher read buffer limit; it should still be drained
|
|
// (read + decoded + discarded) without stalling.
|
|
for _ in 0..32 {
|
|
let data = vec![b'a'; 8192];
|
|
let mut chunk = BytesMut::new();
|
|
chunk.extend_from_slice(format!("{:x}\r\n", data.len()).as_bytes());
|
|
chunk.extend_from_slice(&data);
|
|
chunk.extend_from_slice(b"\r\n");
|
|
|
|
buf.extend_read_buf(chunk);
|
|
|
|
lazy(|cx| {
|
|
assert!(h1.as_mut().poll(cx).is_pending());
|
|
assert!(buf.take_write_buf().is_empty());
|
|
assert!(buf.read_buf().is_empty());
|
|
})
|
|
.await;
|
|
}
|
|
|
|
// terminating chunk
|
|
buf.extend_read_buf(b"0\r\n\r\n");
|
|
|
|
lazy(|cx| {
|
|
assert!(h1.as_mut().poll(cx).is_pending());
|
|
assert!(buf.take_write_buf().is_empty());
|
|
assert!(buf.read_buf().is_empty());
|
|
})
|
|
.await;
|
|
|
|
// connection should be able to accept another request after draining the previous body
|
|
buf.extend_read_buf(http_msg("GET /drop-payload HTTP/1.1"));
|
|
|
|
lazy(|cx| {
|
|
assert!(h1.as_mut().poll(cx).is_pending());
|
|
|
|
let mut res = BytesMut::from(buf.take_write_buf().as_ref());
|
|
stabilize_date_header(&mut res);
|
|
let res = &res[..];
|
|
|
|
let exp = http_msg(
|
|
r"
|
|
HTTP/1.1 200 OK
|
|
content-length: 15
|
|
date: Thu, 01 Jan 1970 12:34:56 UTC
|
|
|
|
payload dropped
|
|
",
|
|
);
|
|
|
|
assert_eq!(
|
|
res,
|
|
exp,
|
|
"\nexpected response not in write buffer:\n\
|
|
response: {:?}\n\
|
|
expected: {:?}",
|
|
String::from_utf8_lossy(res),
|
|
String::from_utf8_lossy(&exp)
|
|
);
|
|
})
|
|
.await;
|
|
}
|
|
|
|
#[actix_rt::test]
|
|
async fn allow_half_closed() {
|
|
let buf = TestSeqBuffer::new(http_msg("GET / HTTP/1.1"));
|
|
buf.close_read();
|
|
let services = HttpFlow::new(YieldService, ExpectHandler, None::<UpgradeHandler>);
|
|
|
|
let mut cx = Context::from_waker(futures_util::task::noop_waker_ref());
|
|
let disptacher = Dispatcher::new(
|
|
buf.clone(),
|
|
services,
|
|
ServiceConfig::default(),
|
|
None,
|
|
OnConnectData::default(),
|
|
);
|
|
pin!(disptacher);
|
|
|
|
assert!(disptacher.as_mut().poll(&mut cx).is_pending());
|
|
assert_eq!(disptacher.poll_count, 1);
|
|
|
|
assert!(disptacher.as_mut().poll(&mut cx).is_ready());
|
|
assert_eq!(disptacher.poll_count, 3);
|
|
|
|
let mut res = BytesMut::from(buf.take_write_buf().as_ref());
|
|
stabilize_date_header(&mut res);
|
|
let exp = http_msg(
|
|
r"
|
|
HTTP/1.1 200 OK
|
|
content-length: 0
|
|
date: Thu, 01 Jan 1970 12:34:56 UTC
|
|
",
|
|
);
|
|
assert_eq!(
|
|
res,
|
|
exp,
|
|
"\nexpected response not in write buffer:\n\
|
|
response: {:?}\n\
|
|
expected: {:?}",
|
|
String::from_utf8_lossy(&res),
|
|
String::from_utf8_lossy(&exp)
|
|
);
|
|
|
|
let DispatcherStateProj::Normal { inner } = disptacher.as_mut().project().inner.project()
|
|
else {
|
|
panic!("End dispatcher state should be Normal");
|
|
};
|
|
assert!(inner.state.is_none());
|
|
}
|
|
|
|
#[actix_rt::test]
|
|
async fn disallow_half_closed() {
|
|
use crate::{config::ServiceConfigBuilder, h1::dispatcher::State};
|
|
|
|
let buf = TestSeqBuffer::new(http_msg("GET / HTTP/1.1"));
|
|
buf.close_read();
|
|
let services = HttpFlow::new(YieldService, ExpectHandler, None::<UpgradeHandler>);
|
|
let config = ServiceConfigBuilder::new()
|
|
.h1_allow_half_closed(false)
|
|
.build();
|
|
|
|
let mut cx = Context::from_waker(futures_util::task::noop_waker_ref());
|
|
let disptacher = Dispatcher::new(
|
|
buf.clone(),
|
|
services,
|
|
config,
|
|
None,
|
|
OnConnectData::default(),
|
|
);
|
|
pin!(disptacher);
|
|
|
|
assert!(disptacher.as_mut().poll(&mut cx).is_pending());
|
|
assert_eq!(disptacher.poll_count, 1);
|
|
|
|
assert!(disptacher.as_mut().poll(&mut cx).is_ready());
|
|
assert_eq!(disptacher.poll_count, 2);
|
|
|
|
let res = BytesMut::from(buf.take_write_buf().as_ref());
|
|
assert!(res.is_empty());
|
|
|
|
let DispatcherStateProj::Normal { inner } = disptacher.as_mut().project().inner.project()
|
|
else {
|
|
panic!("End dispatcher state should be Normal");
|
|
};
|
|
assert!(matches!(inner.state, State::ServiceCall { .. }))
|
|
}
|
|
|
|
#[actix_rt::test]
|
|
async fn h1_write_buffer_size_limits_buffering() {
|
|
let request = "GET /stream HTTP/1.1\r\nConnection: close\r\n\r\n";
|
|
|
|
let default_polls = Rc::new(Cell::new(0));
|
|
let default_services = HttpFlow::new(
|
|
ready_chunk_body_service(default_polls.clone(), 8, 1024),
|
|
ExpectHandler,
|
|
None::<UpgradeHandler>,
|
|
);
|
|
let default_io = PendingOnceWriteBuf::new(request);
|
|
let default_dispatcher = Dispatcher::new(
|
|
default_io,
|
|
default_services,
|
|
ServiceConfig::default(),
|
|
None,
|
|
OnConnectData::default(),
|
|
);
|
|
pin!(default_dispatcher);
|
|
|
|
let mut cx = Context::from_waker(futures_util::task::noop_waker_ref());
|
|
assert!(default_dispatcher.as_mut().poll(&mut cx).is_pending());
|
|
assert_eq!(default_polls.get(), 8);
|
|
|
|
let custom_polls = Rc::new(Cell::new(0));
|
|
let custom_services = HttpFlow::new(
|
|
ready_chunk_body_service(custom_polls.clone(), 8, 1024),
|
|
ExpectHandler,
|
|
None::<UpgradeHandler>,
|
|
);
|
|
let custom_io = PendingOnceWriteBuf::new(request);
|
|
let custom_dispatcher = Dispatcher::new(
|
|
custom_io,
|
|
custom_services,
|
|
crate::config::ServiceConfigBuilder::new()
|
|
.h1_write_buffer_size(1024)
|
|
.build(),
|
|
None,
|
|
OnConnectData::default(),
|
|
);
|
|
pin!(custom_dispatcher);
|
|
|
|
assert!(custom_dispatcher.as_mut().poll(&mut cx).is_pending());
|
|
assert_eq!(custom_polls.get(), 1);
|
|
}
|
|
|
|
#[actix_rt::test]
|
|
#[should_panic(expected = "HTTP/1 write buffer size must be greater than zero")]
|
|
async fn h1_write_buffer_size_rejects_zero() {
|
|
let _ = crate::config::ServiceConfigBuilder::new().h1_write_buffer_size(0);
|
|
}
|
|
|
|
fn http_msg(msg: impl AsRef<str>) -> BytesMut {
|
|
let mut msg = msg
|
|
.as_ref()
|
|
.trim()
|
|
.split('\n')
|
|
.map(|line| [line.trim_start(), "\r"].concat())
|
|
.collect::<Vec<_>>()
|
|
.join("\n");
|
|
|
|
// remove trailing \r
|
|
msg.pop();
|
|
|
|
if !msg.is_empty() && !msg.contains("\r\n\r\n") {
|
|
msg.push_str("\r\n\r\n");
|
|
}
|
|
|
|
BytesMut::from(msg.as_bytes())
|
|
}
|
|
|
|
// Sanity checks for the `http_msg` helper: empty input stays empty, a message with a body is
// left without a trailing terminator, and a head-only message gets its blank line appended.
#[test]
fn http_msg_creates_msg() {
    // (input literal, expected wire bytes)
    let cases = [
        (r"", ""),
        (
            r"
            POST / HTTP/1.1
            Content-Length: 3

            abc
            ",
            "POST / HTTP/1.1\r\nContent-Length: 3\r\n\r\nabc",
        ),
        (
            r"
            GET / HTTP/1.1
            Content-Length: 3

            ",
            "GET / HTTP/1.1\r\nContent-Length: 3\r\n\r\n",
        ),
    ];

    for (input, expected) in cases {
        assert_eq!(http_msg(input), expected);
    }
}
|