Add tests for h1 half closures

Signed-off-by: Thales Fragoso <thales.fragoso@axiros.com>
This commit is contained in:
Thales Fragoso 2025-09-23 20:58:07 -03:00 committed by Yuki Okushi
parent fa5fdcbfea
commit bb8874cf59
2 changed files with 147 additions and 11 deletions

View File

@ -1,4 +1,10 @@
use std::{future::Future, str, task::Poll, time::Duration};
use std::{
future::Future,
pin::Pin,
str,
task::{Context, Poll},
time::Duration,
};
use actix_codec::Framed;
use actix_rt::{pin, time::sleep};
@ -9,7 +15,7 @@ use futures_util::future::lazy;
use super::dispatcher::{Dispatcher, DispatcherState, DispatcherStateProj, Flags};
use crate::{
body::MessageBody,
body::{BoxBody, MessageBody},
config::ServiceConfig,
h1::{Codec, ExpectHandler, UpgradeHandler},
service::HttpFlow,
@ -17,6 +23,26 @@ use crate::{
Error, HttpMessage, KeepAlive, Method, OnConnectData, Request, Response, StatusCode,
};
struct YieldService;
impl Service<Request> for YieldService {
type Response = Response<BoxBody>;
type Error = Response<BoxBody>;
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>>>>;
actix_service::always_ready!();
fn call(&self, _: Request) -> Self::Future {
Box::pin(async {
// Yield twice because the dispatcher can poll the service twice per dispatcher's poll:
// once in `handle_request` and another in `poll_response`
actix_rt::task::yield_now().await;
actix_rt::task::yield_now().await;
Ok(Response::ok())
})
}
}
fn find_slice(haystack: &[u8], needle: &[u8], from: usize) -> Option<usize> {
memchr::memmem::find(&haystack[from..], needle)
}
@ -924,6 +950,92 @@ async fn handler_drop_payload() {
.await;
}
#[actix_rt::test]
async fn allow_half_closed() {
let buf = TestSeqBuffer::new(http_msg("GET / HTTP/1.1"));
buf.close_read();
let services = HttpFlow::new(YieldService, ExpectHandler, None::<UpgradeHandler>);
let mut cx = Context::from_waker(futures_util::task::noop_waker_ref());
let disptacher = Dispatcher::new(
buf.clone(),
services,
ServiceConfig::default(),
None,
OnConnectData::default(),
);
pin!(disptacher);
assert!(disptacher.as_mut().poll(&mut cx).is_pending());
assert_eq!(disptacher.poll_count, 1);
assert!(disptacher.as_mut().poll(&mut cx).is_ready());
assert_eq!(disptacher.poll_count, 3);
let mut res = BytesMut::from(buf.take_write_buf().as_ref());
stabilize_date_header(&mut res);
let exp = http_msg(
r"
HTTP/1.1 200 OK
content-length: 0
date: Thu, 01 Jan 1970 12:34:56 UTC
",
);
assert_eq!(
res,
exp,
"\nexpected response not in write buffer:\n\
response: {:?}\n\
expected: {:?}",
String::from_utf8_lossy(&res),
String::from_utf8_lossy(&exp)
);
let DispatcherStateProj::Normal { inner } = disptacher.as_mut().project().inner.project()
else {
panic!("End dispatcher state should be Normal");
};
assert!(inner.state.is_none());
}
#[actix_rt::test]
async fn disallow_half_closed() {
use crate::config::ServiceConfigBuilder;
use crate::h1::dispatcher::State;
let buf = TestSeqBuffer::new(http_msg("GET / HTTP/1.1"));
buf.close_read();
let services = HttpFlow::new(YieldService, ExpectHandler, None::<UpgradeHandler>);
let config = ServiceConfigBuilder::new()
.h1_allow_half_closed(false)
.build();
let mut cx = Context::from_waker(futures_util::task::noop_waker_ref());
let disptacher = Dispatcher::new(
buf.clone(),
services,
config,
None,
OnConnectData::default(),
);
pin!(disptacher);
assert!(disptacher.as_mut().poll(&mut cx).is_pending());
assert_eq!(disptacher.poll_count, 1);
assert!(disptacher.as_mut().poll(&mut cx).is_ready());
assert_eq!(disptacher.poll_count, 2);
let res = BytesMut::from(buf.take_write_buf().as_ref());
assert!(res.is_empty());
let DispatcherStateProj::Normal { inner } = disptacher.as_mut().project().inner.project()
else {
panic!("End dispatcher state should be Normal");
};
assert!(matches!(inner.state, State::ServiceCall { .. }))
}
fn http_msg(msg: impl AsRef<str>) -> BytesMut {
let mut msg = msg
.as_ref()

View File

@ -275,6 +275,7 @@ impl TestSeqBuffer {
{
Self(Rc::new(RefCell::new(TestSeqInner {
read_buf: data.into(),
read_closed: false,
write_buf: BytesMut::new(),
err: None,
})))
@ -293,36 +294,59 @@ impl TestSeqBuffer {
Ref::map(self.0.borrow(), |inner| &inner.write_buf)
}
pub fn take_write_buf(&self) -> Bytes {
self.0.borrow_mut().write_buf.split().freeze()
}
pub fn err(&self) -> Ref<'_, Option<io::Error>> {
Ref::map(self.0.borrow(), |inner| &inner.err)
}
/// Add data to read buffer.
///
/// # Panics
///
/// Panics if called after [`TestSeqBuffer::close_read`] has been called
pub fn extend_read_buf<T: AsRef<[u8]>>(&mut self, data: T) {
self.0
.borrow_mut()
.read_buf
.extend_from_slice(data.as_ref())
let mut inner = self.0.borrow_mut();
if inner.read_closed {
panic!("Tried to extend the read buffer after calling close_read");
}
inner.read_buf.extend_from_slice(data.as_ref())
}
/// Closes the [`AsyncRead`]/[`Read`] part of this test buffer.
///
/// The current data in the buffer will still be returned by a call to read/poll_read, however,
/// after the buffer is empty, it will return `Ok(0)` to signify the EOF condition
pub fn close_read(&self) {
self.0.borrow_mut().read_closed = true;
}
}
pub struct TestSeqInner {
read_buf: BytesMut,
read_closed: bool,
write_buf: BytesMut,
err: Option<io::Error>,
}
impl io::Read for TestSeqBuffer {
fn read(&mut self, dst: &mut [u8]) -> Result<usize, io::Error> {
if self.0.borrow().read_buf.is_empty() {
if self.0.borrow().err.is_some() {
Err(self.0.borrow_mut().err.take().unwrap())
let mut inner = self.0.borrow_mut();
if inner.read_buf.is_empty() {
if let Some(e) = inner.err.take() {
Err(e)
} else if inner.read_closed {
Ok(0)
} else {
Err(io::Error::new(io::ErrorKind::WouldBlock, ""))
}
} else {
let size = std::cmp::min(self.0.borrow().read_buf.len(), dst.len());
let b = self.0.borrow_mut().read_buf.split_to(size);
let size = std::cmp::min(inner.read_buf.len(), dst.len());
let b = inner.read_buf.split_to(size);
dst[..size].copy_from_slice(&b);
Ok(size)
}