Compare commits

...

3 Commits

Author SHA1 Message Date
Yuki Okushi e6d09913d9
chore(multipart,derive): prepare 0.8.0 (#4027) 2026-04-18 02:12:22 +09:00
Yuki Okushi 4434a494ee
fix(multipart): count ignored fields towards `MultipartFormConfig` li… (#4026)
fix(multipart): count ignored fields towards `MultipartFormConfig` limits
2026-04-18 00:57:50 +09:00
Yuki Okushi be62050f9d
fix(multipart): set cap for parser buffering (#4025) 2026-04-18 00:24:37 +09:00
12 changed files with 324 additions and 30 deletions

4
Cargo.lock generated
View File

@ -142,7 +142,7 @@ dependencies = [
[[package]]
name = "actix-multipart"
version = "0.7.2"
version = "0.8.0"
dependencies = [
"actix-http",
"actix-multipart-derive",
@ -175,7 +175,7 @@ dependencies = [
[[package]]
name = "actix-multipart-derive"
version = "0.7.0"
version = "0.8.0"
dependencies = [
"actix-multipart",
"actix-web",

View File

@ -2,6 +2,8 @@
## Unreleased
## 0.8.0
- Minimum supported Rust version (MSRV) is now 1.88.
## 0.7.0

View File

@ -1,6 +1,6 @@
[package]
name = "actix-multipart-derive"
version = "0.7.0"
version = "0.8.0"
authors = ["Jacob Halsey <jacob@jhalsey.com>"]
description = "Multipart form derive macro for Actix Web"
keywords = ["http", "web", "framework", "async", "futures"]
@ -24,7 +24,7 @@ quote = "1"
syn = "2"
[dev-dependencies]
actix-multipart = "0.7"
actix-multipart = "0.8"
actix-web = "4"
rustversion-msrv = "0.100"
trybuild = "1"

View File

@ -5,11 +5,11 @@
<!-- prettier-ignore-start -->
[![crates.io](https://img.shields.io/crates/v/actix-multipart-derive?label=latest)](https://crates.io/crates/actix-multipart-derive)
[![Documentation](https://docs.rs/actix-multipart-derive/badge.svg?version=0.7.0)](https://docs.rs/actix-multipart-derive/0.7.0)
[![Documentation](https://docs.rs/actix-multipart-derive/badge.svg?version=0.8.0)](https://docs.rs/actix-multipart-derive/0.8.0)
![Version](https://img.shields.io/badge/rustc-1.88+-ab6000.svg)
![MIT or Apache 2.0 licensed](https://img.shields.io/crates/l/actix-multipart-derive.svg)
<br />
[![dependency status](https://deps.rs/crate/actix-multipart-derive/0.7.0/status.svg)](https://deps.rs/crate/actix-multipart-derive/0.7.0)
[![dependency status](https://deps.rs/crate/actix-multipart-derive/0.8.0/status.svg)](https://deps.rs/crate/actix-multipart-derive/0.8.0)
[![Download](https://img.shields.io/crates/d/actix-multipart-derive.svg)](https://crates.io/crates/actix-multipart-derive)
[![Chat on Discord](https://img.shields.io/discord/771444961383153695?label=chat&logo=discord)](https://discord.gg/NWpN5mmg3x)

View File

@ -227,7 +227,7 @@ pub fn impl_multipart_form(input: proc_macro::TokenStream) -> proc_macro::TokenS
::actix_multipart::MultipartError::UnknownField(field.name().unwrap().to_string())
))
} else {
quote!(::std::result::Result::Ok(()))
quote!(::actix_multipart::form::discard_field(field, limits).await)
};
// Value for duplicate action
@ -289,7 +289,7 @@ pub fn impl_multipart_form(input: proc_macro::TokenStream) -> proc_macro::TokenS
) -> ::std::pin::Pin<::std::boxed::Box<dyn ::std::future::Future<Output = ::std::result::Result<(), ::actix_multipart::MultipartError>> + 't>> {
match field.name().unwrap() {
#handle_field_impl
_ => return ::std::boxed::Box::pin(::std::future::ready(#unknown_field_result)),
_ => return ::std::boxed::Box::pin(async move { #unknown_field_result }),
}
}

View File

@ -2,8 +2,12 @@
## Unreleased
## 0.8.0
- Add multi-field multipart payload builders to `actix_multipart::test`. [#3575]
- Add `MultipartForm` support for `Option<Vec<T>>` fields. [#3577]
- Bound internal multipart parser buffering to prevent unbounded memory growth on malformed bodies.
- behavior change notice: There's now a cap for buffering (64KB). It can be changed with `MultipartConfig::buffer_limit`.
- Fix user-triggerable panic when parsing multipart boundaries.
- Minimum supported Rust version (MSRV) is now 1.88.
- Update `rand` dependency to `0.10`.

View File

@ -1,6 +1,6 @@
[package]
name = "actix-multipart"
version = "0.7.2"
version = "0.8.0"
authors = [
"Nikolay Kim <fafhrd91@gmail.com>",
"Jacob Halsey <jacob@jhalsey.com>",
@ -37,7 +37,7 @@ derive = ["actix-multipart-derive"]
tempfile = ["dep:tempfile", "tokio/fs"]
[dependencies]
actix-multipart-derive = { version = "=0.7.0", optional = true }
actix-multipart-derive = { version = "=0.8.0", optional = true }
actix-utils = "3"
actix-web = { version = "4", default-features = false }

View File

@ -3,11 +3,11 @@
<!-- prettier-ignore-start -->
[![crates.io](https://img.shields.io/crates/v/actix-multipart?label=latest)](https://crates.io/crates/actix-multipart)
[![Documentation](https://docs.rs/actix-multipart/badge.svg?version=0.7.2)](https://docs.rs/actix-multipart/0.7.2)
[![Documentation](https://docs.rs/actix-multipart/badge.svg?version=0.8.0)](https://docs.rs/actix-multipart/0.8.0)
![Version](https://img.shields.io/badge/rustc-1.88+-ab6000.svg)
![MIT or Apache 2.0 licensed](https://img.shields.io/crates/l/actix-multipart.svg)
<br />
[![dependency status](https://deps.rs/crate/actix-multipart/0.7.2/status.svg)](https://deps.rs/crate/actix-multipart/0.7.2)
[![dependency status](https://deps.rs/crate/actix-multipart/0.8.0/status.svg)](https://deps.rs/crate/actix-multipart/0.8.0)
[![Download](https://img.shields.io/crates/d/actix-multipart.svg)](https://crates.io/crates/actix-multipart)
[![Chat on Discord](https://img.shields.io/discord/771444961383153695?label=chat&logo=discord)](https://discord.gg/NWpN5mmg3x)

View File

@ -82,7 +82,9 @@ where
) -> Self::Future {
if state.contains_key(&field.form_field_name) {
match duplicate_field {
DuplicateField::Ignore => return Box::pin(ready(Ok(()))),
DuplicateField::Ignore => {
return Box::pin(async move { discard_field(field, limits).await });
}
DuplicateField::Deny => {
return Box::pin(ready(Err(MultipartError::DuplicateField(
@ -159,7 +161,9 @@ where
) -> Self::Future {
if state.contains_key(&field.form_field_name) {
match duplicate_field {
DuplicateField::Ignore => return Box::pin(ready(Ok(()))),
DuplicateField::Ignore => {
return Box::pin(async move { discard_field(field, limits).await });
}
DuplicateField::Deny => {
return Box::pin(ready(Err(MultipartError::DuplicateField(
@ -312,6 +316,16 @@ impl Limits {
}
}
/// Drain a field that will not be retained while still accounting for form limits.
#[doc(hidden)]
pub async fn discard_field(mut field: Field, limits: &mut Limits) -> Result<(), MultipartError> {
while let Some(chunk) = field.try_next().await? {
limits.try_consume_limits(chunk.len(), false)?;
}
Ok(())
}
/// Typed `multipart/form-data` extractor.
///
/// To extract typed data from a multipart stream, the inner type `T` must implement the
@ -710,6 +724,32 @@ mod tests {
assert_eq!(response.status(), StatusCode::OK);
}
#[actix_rt::test]
async fn test_discarded_fields_count_towards_total_limit() {
let srv = actix_test::start(|| {
App::new()
.route("/unknown", web::post().to(test_upload_limits_memory))
.route("/duplicate", web::post().to(test_duplicate_ignore_route))
.app_data(
MultipartFormConfig::default()
.memory_limit(usize::MAX)
.total_limit(20),
)
});
let mut form = multipart::Form::default();
form.add_text("field", "7 bytes");
form.add_text("unknown", "this string is 28 bytes long");
let response = send_form(&srv, form, "/unknown").await;
assert_eq!(response.status(), StatusCode::BAD_REQUEST);
let mut form = multipart::Form::default();
form.add_text("field", "first_value");
form.add_text("field", "this string is 28 bytes long");
let response = send_form(&srv, form, "/duplicate").await;
assert_eq!(response.status(), StatusCode::BAD_REQUEST);
}
/// Test the Limits.
#[derive(MultipartForm)]
struct TestMemoryUploadLimits {

View File

@ -82,5 +82,5 @@ pub mod test;
pub use self::{
error::Error as MultipartError,
field::{Field, LimitExceeded},
multipart::Multipart,
multipart::{Multipart, MultipartConfig},
};

View File

@ -11,7 +11,7 @@ use actix_web::{
dev,
error::{ParseError, PayloadError},
http::header::{self, ContentDisposition, HeaderMap, HeaderName, HeaderValue},
web::Bytes,
web::{self, Bytes},
HttpRequest,
};
use futures_core::stream::Stream;
@ -20,7 +20,7 @@ use mime::Mime;
use crate::{
error::Error,
field::InnerField,
payload::{PayloadBuffer, PayloadRef},
payload::{PayloadBuffer, PayloadRef, DEFAULT_BUFFER_LIMIT},
safety::Safety,
Field,
};
@ -44,6 +44,46 @@ enum Flow {
Error(Option<Error>),
}
/// [`Multipart`] extractor configuration.
///
/// Add to your app data to have it picked up by [`Multipart`] extractors.
#[derive(Clone, Copy, Debug)]
#[non_exhaustive]
pub struct MultipartConfig {
buffer_limit: usize,
}
impl MultipartConfig {
/// Creates a default multipart extractor configuration.
pub fn new() -> Self {
DEFAULT_CONFIG
}
/// Sets maximum internal parser buffer size. By default this limit is 64 KiB.
pub fn buffer_limit(mut self, buffer_limit: usize) -> Self {
self.buffer_limit = buffer_limit;
self
}
/// Extracts multipart config from app data. Check both `T` and `Data<T>`, in that order, and
/// fall back to the default multipart config.
fn from_req(req: &HttpRequest) -> &Self {
req.app_data::<Self>()
.or_else(|| req.app_data::<web::Data<Self>>().map(|d| d.as_ref()))
.unwrap_or(&DEFAULT_CONFIG)
}
}
static DEFAULT_CONFIG: MultipartConfig = MultipartConfig {
buffer_limit: DEFAULT_BUFFER_LIMIT,
};
impl Default for MultipartConfig {
fn default() -> Self {
Self::new()
}
}
impl Multipart {
/// Creates multipart instance from parts.
pub fn new<S>(headers: &HeaderMap, stream: S) -> Self
@ -58,8 +98,15 @@ impl Multipart {
/// Creates multipart instance from parts.
pub(crate) fn from_req(req: &HttpRequest, payload: &mut dev::Payload) -> Self {
let config = MultipartConfig::from_req(req);
match Self::find_ct_and_boundary(req.headers()) {
Ok((ct, boundary)) => Self::from_ct_and_boundary(ct, boundary, payload.take()),
Ok((ct, boundary)) => Self::from_ct_and_boundary_with_buffer_limit(
ct,
boundary,
payload.take(),
config.buffer_limit,
),
Err(err) => Self::from_error(err),
}
}
@ -93,13 +140,30 @@ impl Multipart {
/// Constructs a new multipart reader from given Content-Type, boundary, and stream.
pub(crate) fn from_ct_and_boundary<S>(ct: Mime, boundary: String, stream: S) -> Multipart
where
S: Stream<Item = Result<Bytes, PayloadError>> + 'static,
{
Self::from_ct_and_boundary_with_buffer_limit(
ct,
boundary,
stream,
DEFAULT_CONFIG.buffer_limit,
)
}
fn from_ct_and_boundary_with_buffer_limit<S>(
ct: Mime,
boundary: String,
stream: S,
buffer_limit: usize,
) -> Multipart
where
S: Stream<Item = Result<Bytes, PayloadError>> + 'static,
{
Multipart {
safety: Safety::new(),
flow: Flow::InFlight(Inner {
payload: PayloadRef::new(PayloadBuffer::new(stream)),
payload: PayloadRef::new(PayloadBuffer::new_with_limit(stream, buffer_limit)),
content_type: ct,
boundary,
state: State::FirstBoundary,
@ -596,6 +660,18 @@ mod tests {
headers
}
fn create_multipart_with_buffer_limit(
body: impl Stream<Item = Result<Bytes, PayloadError>> + 'static,
buffer_limit: usize,
) -> Multipart {
Multipart::from_ct_and_boundary_with_buffer_limit(
"multipart/mixed; boundary=\"a\"".parse().unwrap(),
"a".to_owned(),
body,
buffer_limit,
)
}
#[actix_rt::test]
async fn empty_boundary_does_not_panic() {
let payload = stream::once(async { Ok(Bytes::from_static(b"\n")) });
@ -727,6 +803,69 @@ mod tests {
assert!(multipart.next().await.is_none());
}
#[actix_rt::test]
async fn malformed_preamble_over_buffer_limit_errors() {
let body = stream::iter(
[b"aaaaaaaa", b"bbbbbbbb", b"cccccccc"].map(|chunk| Ok(Bytes::from_static(chunk))),
);
let mut multipart = create_multipart_with_buffer_limit(body, 16);
let res = multipart.next().await.unwrap();
assert_matches!(res, Err(Error::Payload(PayloadError::Overflow)));
}
#[actix_rt::test]
async fn malformed_headers_over_buffer_limit_errors() {
let body = stream::iter(
[
Bytes::from_static(b"--a\r\n"),
Bytes::from_static(b"X-Long: 12345678"),
Bytes::from_static(b"9012345678901234"),
Bytes::from_static(b"5678901234567890"),
]
.map(Ok),
);
let mut multipart = create_multipart_with_buffer_limit(body, 24);
let res = multipart.next().await.unwrap();
assert_matches!(res, Err(Error::Payload(PayloadError::Overflow)));
}
#[actix_rt::test]
async fn raw_extractor_uses_configured_buffer_limit() {
let (req, mut payload) = TestRequest::default()
.insert_header((header::CONTENT_TYPE, "multipart/mixed; boundary=\"a\""))
.app_data(MultipartConfig::default().buffer_limit(16))
.set_payload(Bytes::from_static(b"aaaaaaaabbbbbbbbcccccccc"))
.to_http_parts();
let mut multipart = Multipart::from_request(&req, &mut payload).await.unwrap();
let res = multipart.next().await.unwrap();
assert_matches!(res, Err(Error::Payload(PayloadError::Overflow)));
}
#[actix_rt::test]
async fn valid_large_field_streams_through_small_parser_buffer() {
let mut bytes = BytesMut::new();
bytes.put(&b"--a\r\nContent-Length: 100\r\n\r\n"[..]);
bytes.put(&[b'x'; 100][..]);
bytes.put(&b"\r\n--a--\r\n"[..]);
let body = stream::once(async { Ok(bytes.freeze()) });
let mut multipart = create_multipart_with_buffer_limit(body, 32);
let mut field = multipart.next().await.unwrap().unwrap();
assert_eq!(
get_whole_field(&mut field).await,
Bytes::from(vec![b'x'; 100])
);
drop(field);
assert!(multipart.next().await.is_none());
}
#[actix_rt::test]
async fn test_multipart_no_end_crlf() {
let (sender, payload) = create_stream();

View File

@ -14,6 +14,9 @@ use futures_core::stream::{LocalBoxStream, Stream};
use crate::{error::Error, safety::Safety};
pub(crate) const DEFAULT_BUFFER_LIMIT: usize = 65_536; // 64 KiB
const MAX_READY_CHUNKS_PER_POLL: usize = 16;
pub(crate) struct PayloadRef {
payload: Rc<RefCell<PayloadBuffer>>,
}
@ -45,31 +48,64 @@ impl Clone for PayloadRef {
/// Payload buffer.
pub(crate) struct PayloadBuffer {
pub(crate) stream: LocalBoxStream<'static, Result<Bytes, PayloadError>>,
pending: Option<Bytes>,
pub(crate) buf: BytesMut,
buffer_limit: usize,
/// EOF flag. If true, no more payload reads will be attempted.
pub(crate) eof: bool,
}
impl PayloadBuffer {
/// Constructs new payload buffer.
pub(crate) fn new<S>(stream: S) -> Self
pub(crate) fn new_with_limit<S>(stream: S, buffer_limit: usize) -> Self
where
S: Stream<Item = Result<Bytes, PayloadError>> + 'static,
{
PayloadBuffer {
stream: Box::pin(stream),
pending: None,
buf: BytesMut::with_capacity(1_024), // pre-allocate 1KiB
buffer_limit,
eof: false,
}
}
/// Polls a bounded amount of payload into the parser buffer.
///
/// This does not drain the stream to EOF in one call. Callers must be prepared to poll again
/// after consuming buffered data.
pub(crate) fn poll_stream(&mut self, cx: &mut Context<'_>) -> Result<(), PayloadError> {
loop {
if self.buffer_limit == 0 {
return Err(PayloadError::Overflow);
}
let mut appended = false;
for _ in 0..MAX_READY_CHUNKS_PER_POLL {
if self.pending.is_some() {
appended |= self.append_pending()?;
if self.pending.is_some() || self.buf.len() >= self.buffer_limit {
if appended {
cx.waker().wake_by_ref();
}
return Ok(());
}
continue;
}
match Pin::new(&mut self.stream).poll_next(cx) {
Poll::Ready(Some(Ok(data))) => {
self.buf.extend_from_slice(&data);
// try to read more data
continue;
self.pending = Some(data);
appended |= self.append_pending()?;
if self.pending.is_some() || self.buf.len() >= self.buffer_limit {
if appended {
cx.waker().wake_by_ref();
}
return Ok(());
}
}
Poll::Ready(Some(Err(err))) => return Err(err),
Poll::Ready(None) => {
@ -79,6 +115,40 @@ impl PayloadBuffer {
Poll::Pending => return Ok(()),
}
}
if appended {
cx.waker().wake_by_ref();
}
Ok(())
}
fn append_pending(&mut self) -> Result<bool, PayloadError> {
let Some(mut data) = self.pending.take() else {
return Ok(false);
};
if data.is_empty() {
return Ok(false);
}
if self.buf.len() >= self.buffer_limit {
self.pending = Some(data);
return Err(PayloadError::Overflow);
}
let available = self.buffer_limit - self.buf.len();
let len = cmp::min(data.len(), available);
if len == data.len() {
self.buf.extend_from_slice(&data);
} else {
let chunk = data.split_to(len);
self.buf.extend_from_slice(&chunk);
self.pending = Some(data);
}
Ok(len != 0)
}
/// Reads exact number of bytes.
@ -162,7 +232,7 @@ mod tests {
#[actix_rt::test]
async fn basic() {
let (_, payload) = h1::Payload::create(false);
let mut payload = PayloadBuffer::new(payload);
let mut payload = PayloadBuffer::new_with_limit(payload, DEFAULT_BUFFER_LIMIT);
assert_eq!(payload.buf.len(), 0);
lazy(|cx| payload.poll_stream(cx)).await.unwrap();
@ -172,7 +242,7 @@ mod tests {
#[actix_rt::test]
async fn eof() {
let (mut sender, payload) = h1::Payload::create(false);
let mut payload = PayloadBuffer::new(payload);
let mut payload = PayloadBuffer::new_with_limit(payload, DEFAULT_BUFFER_LIMIT);
assert_eq!(None, payload.read_max(4).unwrap());
sender.feed_data(Bytes::from("data"));
@ -181,6 +251,8 @@ mod tests {
assert_eq!(Some(Bytes::from("data")), payload.read_max(4).unwrap());
assert_eq!(payload.buf.len(), 0);
lazy(|cx| payload.poll_stream(cx)).await.unwrap();
assert!(payload.read_max(1).is_err());
assert!(payload.eof);
}
@ -188,7 +260,7 @@ mod tests {
#[actix_rt::test]
async fn err() {
let (mut sender, payload) = h1::Payload::create(false);
let mut payload = PayloadBuffer::new(payload);
let mut payload = PayloadBuffer::new_with_limit(payload, DEFAULT_BUFFER_LIMIT);
assert_eq!(None, payload.read_max(1).unwrap());
sender.set_error(PayloadError::Incomplete(None));
lazy(|cx| payload.poll_stream(cx)).await.err().unwrap();
@ -197,11 +269,12 @@ mod tests {
#[actix_rt::test]
async fn read_max() {
let (mut sender, payload) = h1::Payload::create(false);
let mut payload = PayloadBuffer::new(payload);
let mut payload = PayloadBuffer::new_with_limit(payload, DEFAULT_BUFFER_LIMIT);
sender.feed_data(Bytes::from("line1"));
sender.feed_data(Bytes::from("line2"));
lazy(|cx| payload.poll_stream(cx)).await.unwrap();
lazy(|cx| payload.poll_stream(cx)).await.unwrap();
assert_eq!(payload.buf.len(), 10);
assert_eq!(Some(Bytes::from("line1")), payload.read_max(5).unwrap());
@ -214,13 +287,14 @@ mod tests {
#[actix_rt::test]
async fn read_exactly() {
let (mut sender, payload) = h1::Payload::create(false);
let mut payload = PayloadBuffer::new(payload);
let mut payload = PayloadBuffer::new_with_limit(payload, DEFAULT_BUFFER_LIMIT);
assert_eq!(None, payload.read_exact(2));
sender.feed_data(Bytes::from("line1"));
sender.feed_data(Bytes::from("line2"));
lazy(|cx| payload.poll_stream(cx)).await.unwrap();
lazy(|cx| payload.poll_stream(cx)).await.unwrap();
assert_eq!(Some(Bytes::from_static(b"li")), payload.read_exact(2));
assert_eq!(payload.buf.len(), 8);
@ -232,13 +306,14 @@ mod tests {
#[actix_rt::test]
async fn read_until() {
let (mut sender, payload) = h1::Payload::create(false);
let mut payload = PayloadBuffer::new(payload);
let mut payload = PayloadBuffer::new_with_limit(payload, DEFAULT_BUFFER_LIMIT);
assert_eq!(None, payload.read_until(b"ne").unwrap());
sender.feed_data(Bytes::from("line1"));
sender.feed_data(Bytes::from("line2"));
lazy(|cx| payload.poll_stream(cx)).await.unwrap();
lazy(|cx| payload.poll_stream(cx)).await.unwrap();
assert_eq!(
Some(Bytes::from("line")),
@ -252,4 +327,38 @@ mod tests {
);
assert_eq!(payload.buf.len(), 0);
}
#[actix_rt::test]
async fn poll_stream_does_not_exceed_buffer_limit() {
let stream = futures_util::stream::iter([
Ok(Bytes::from_static(b"12345678")),
Ok(Bytes::from_static(b"abcdefgh")),
Ok(Bytes::from_static(b"overflow")),
]);
let mut payload = PayloadBuffer::new_with_limit(stream, 16);
lazy(|cx| payload.poll_stream(cx)).await.unwrap();
assert_eq!(payload.buf.len(), 16);
let err = lazy(|cx| payload.poll_stream(cx)).await.unwrap_err();
assert!(matches!(err, PayloadError::Overflow));
assert_eq!(payload.buf.len(), 16);
}
#[actix_rt::test]
async fn oversized_chunk_can_be_consumed_incrementally() {
let stream = futures_util::stream::once(async { Ok(Bytes::from_static(b"12345678")) });
let mut payload = PayloadBuffer::new_with_limit(stream, 4);
lazy(|cx| payload.poll_stream(cx)).await.unwrap();
assert_eq!(payload.buf, Bytes::from_static(b"1234"));
assert_eq!(payload.read_max(4).unwrap().unwrap(), "1234");
lazy(|cx| payload.poll_stream(cx)).await.unwrap();
assert_eq!(payload.buf, Bytes::from_static(b"5678"));
assert_eq!(payload.read_max(4).unwrap().unwrap(), "5678");
lazy(|cx| payload.poll_stream(cx)).await.unwrap();
assert!(payload.eof);
}
}