From bb8874cf5953c4a075129b5dfd32aa223365f525 Mon Sep 17 00:00:00 2001 From: Thales Fragoso Date: Tue, 23 Sep 2025 20:58:07 -0300 Subject: [PATCH] Add tests for h1 half closures Signed-off-by: Thales Fragoso --- actix-http/src/h1/dispatcher_tests.rs | 116 +++++++++++++++++++++++++- actix-http/src/test.rs | 42 ++++++++-- 2 files changed, 147 insertions(+), 11 deletions(-) diff --git a/actix-http/src/h1/dispatcher_tests.rs b/actix-http/src/h1/dispatcher_tests.rs index 50259e6ce..5095e0b47 100644 --- a/actix-http/src/h1/dispatcher_tests.rs +++ b/actix-http/src/h1/dispatcher_tests.rs @@ -1,4 +1,10 @@ -use std::{future::Future, str, task::Poll, time::Duration}; +use std::{ + future::Future, + pin::Pin, + str, + task::{Context, Poll}, + time::Duration, +}; use actix_codec::Framed; use actix_rt::{pin, time::sleep}; @@ -9,7 +15,7 @@ use futures_util::future::lazy; use super::dispatcher::{Dispatcher, DispatcherState, DispatcherStateProj, Flags}; use crate::{ - body::MessageBody, + body::{BoxBody, MessageBody}, config::ServiceConfig, h1::{Codec, ExpectHandler, UpgradeHandler}, service::HttpFlow, @@ -17,6 +23,26 @@ use crate::{ Error, HttpMessage, KeepAlive, Method, OnConnectData, Request, Response, StatusCode, }; +struct YieldService; + +impl Service for YieldService { + type Response = Response; + type Error = Response; + type Future = Pin>>>; + + actix_service::always_ready!(); + + fn call(&self, _: Request) -> Self::Future { + Box::pin(async { + // Yield twice because the dispatcher can poll the service twice per dispatcher's poll: + // once in `handle_request` and another in `poll_response` + actix_rt::task::yield_now().await; + actix_rt::task::yield_now().await; + Ok(Response::ok()) + }) + } +} + fn find_slice(haystack: &[u8], needle: &[u8], from: usize) -> Option { memchr::memmem::find(&haystack[from..], needle) } @@ -924,6 +950,92 @@ async fn handler_drop_payload() { .await; } +#[actix_rt::test] +async fn allow_half_closed() { + let buf = TestSeqBuffer::new(http_msg("GET / HTTP/1.1")); + buf.close_read(); + let services = HttpFlow::new(YieldService, ExpectHandler, None::); + + let mut cx = Context::from_waker(futures_util::task::noop_waker_ref()); + let disptacher = Dispatcher::new( + buf.clone(), + services, + ServiceConfig::default(), + None, + OnConnectData::default(), + ); + pin!(disptacher); + + assert!(disptacher.as_mut().poll(&mut cx).is_pending()); + assert_eq!(disptacher.poll_count, 1); + + assert!(disptacher.as_mut().poll(&mut cx).is_ready()); + assert_eq!(disptacher.poll_count, 3); + + let mut res = BytesMut::from(buf.take_write_buf().as_ref()); + stabilize_date_header(&mut res); + let exp = http_msg( + r" + HTTP/1.1 200 OK + content-length: 0 + date: Thu, 01 Jan 1970 12:34:56 UTC + ", + ); + assert_eq!( + res, + exp, + "\nexpected response not in write buffer:\n\ + response: {:?}\n\ + expected: {:?}", + String::from_utf8_lossy(&res), + String::from_utf8_lossy(&exp) + ); + + let DispatcherStateProj::Normal { inner } = disptacher.as_mut().project().inner.project() + else { + panic!("End dispatcher state should be Normal"); + }; + assert!(inner.state.is_none()); +} + +#[actix_rt::test] +async fn disallow_half_closed() { + use crate::config::ServiceConfigBuilder; + use crate::h1::dispatcher::State; + + let buf = TestSeqBuffer::new(http_msg("GET / HTTP/1.1")); + buf.close_read(); + let services = HttpFlow::new(YieldService, ExpectHandler, None::); + let config = ServiceConfigBuilder::new() + .h1_allow_half_closed(false) + .build(); + + let mut cx = Context::from_waker(futures_util::task::noop_waker_ref()); + let disptacher = Dispatcher::new( + buf.clone(), + services, + config, + None, + OnConnectData::default(), + ); + pin!(disptacher); + + assert!(disptacher.as_mut().poll(&mut cx).is_pending()); + assert_eq!(disptacher.poll_count, 1); + + assert!(disptacher.as_mut().poll(&mut cx).is_ready()); + assert_eq!(disptacher.poll_count, 2); + + let res = BytesMut::from(buf.take_write_buf().as_ref()); + assert!(res.is_empty()); + + let DispatcherStateProj::Normal { inner } = disptacher.as_mut().project().inner.project() + else { + panic!("End dispatcher state should be Normal"); + }; + assert!(matches!(inner.state, State::ServiceCall { .. })) +} + fn http_msg(msg: impl AsRef) -> BytesMut { let mut msg = msg .as_ref() diff --git a/actix-http/src/test.rs b/actix-http/src/test.rs index 5632a310c..32d09d12c 100644 --- a/actix-http/src/test.rs +++ b/actix-http/src/test.rs @@ -275,6 +275,7 @@ impl TestSeqBuffer { { Self(Rc::new(RefCell::new(TestSeqInner { read_buf: data.into(), + read_closed: false, write_buf: BytesMut::new(), err: None, }))) @@ -293,36 +294,59 @@ impl TestSeqBuffer { Ref::map(self.0.borrow(), |inner| &inner.write_buf) } + pub fn take_write_buf(&self) -> Bytes { + self.0.borrow_mut().write_buf.split().freeze() + } + pub fn err(&self) -> Ref<'_, Option> { Ref::map(self.0.borrow(), |inner| &inner.err) } /// Add data to read buffer. + /// + /// # Panics + /// + /// Panics if called after [`TestSeqBuffer::close_read`] has been called pub fn extend_read_buf>(&mut self, data: T) { - self.0 - .borrow_mut() - .read_buf - .extend_from_slice(data.as_ref()) + let mut inner = self.0.borrow_mut(); + if inner.read_closed { + panic!("Tried to extend the read buffer after calling close_read"); + } + + inner.read_buf.extend_from_slice(data.as_ref()) + } + + /// Closes the [`AsyncRead`]/[`Read`] part of this test buffer. + /// + /// The current data in the buffer will still be returned by a call to read/poll_read, however, + /// after the buffer is empty, it will return `Ok(0)` to signify the EOF condition + pub fn close_read(&self) { + self.0.borrow_mut().read_closed = true; } } pub struct TestSeqInner { read_buf: BytesMut, + read_closed: bool, write_buf: BytesMut, err: Option, } impl io::Read for TestSeqBuffer { fn read(&mut self, dst: &mut [u8]) -> Result { - if self.0.borrow().read_buf.is_empty() { - if self.0.borrow().err.is_some() { - Err(self.0.borrow_mut().err.take().unwrap()) + let mut inner = self.0.borrow_mut(); + + if inner.read_buf.is_empty() { + if let Some(e) = inner.err.take() { + Err(e) + } else if inner.read_closed { + Ok(0) } else { Err(io::Error::new(io::ErrorKind::WouldBlock, "")) } } else { - let size = std::cmp::min(self.0.borrow().read_buf.len(), dst.len()); - let b = self.0.borrow_mut().read_buf.split_to(size); + let size = std::cmp::min(inner.read_buf.len(), dst.len()); + let b = inner.read_buf.split_to(size); dst[..size].copy_from_slice(&b); Ok(size) }