fix(multipart): set cap for parser buffering (#4025)

This commit is contained in:
Yuki Okushi 2026-04-18 00:24:37 +09:00 committed by GitHub
parent be4566d669
commit be62050f9d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 266 additions and 16 deletions

View File

@ -4,6 +4,8 @@
- Add multi-field multipart payload builders to `actix_multipart::test`. [#3575]
- Add `MultipartForm` support for `Option<Vec<T>>` fields. [#3577]
- Bound internal multipart parser buffering to prevent unbounded memory growth on malformed bodies.
- behavior change notice: There's now a cap for buffering (64KB). It can be changed with `MultipartConfig::buffer_limit`.
- Fix user-triggerable panic when parsing multipart boundaries.
- Minimum supported Rust version (MSRV) is now 1.88.
- Update `rand` dependency to `0.10`.

View File

@ -82,5 +82,5 @@ pub mod test;
pub use self::{
error::Error as MultipartError,
field::{Field, LimitExceeded},
multipart::Multipart,
multipart::{Multipart, MultipartConfig},
};

View File

@ -11,7 +11,7 @@ use actix_web::{
dev,
error::{ParseError, PayloadError},
http::header::{self, ContentDisposition, HeaderMap, HeaderName, HeaderValue},
web::Bytes,
web::{self, Bytes},
HttpRequest,
};
use futures_core::stream::Stream;
@ -20,7 +20,7 @@ use mime::Mime;
use crate::{
error::Error,
field::InnerField,
payload::{PayloadBuffer, PayloadRef},
payload::{PayloadBuffer, PayloadRef, DEFAULT_BUFFER_LIMIT},
safety::Safety,
Field,
};
@ -44,6 +44,46 @@ enum Flow {
Error(Option<Error>),
}
/// [`Multipart`] extractor configuration.
///
/// Add to your app data to have it picked up by [`Multipart`] extractors.
#[derive(Clone, Copy, Debug)]
#[non_exhaustive]
pub struct MultipartConfig {
buffer_limit: usize,
}
impl MultipartConfig {
/// Creates a default multipart extractor configuration.
pub fn new() -> Self {
DEFAULT_CONFIG
}
/// Sets maximum internal parser buffer size. By default this limit is 64 KiB.
pub fn buffer_limit(mut self, buffer_limit: usize) -> Self {
self.buffer_limit = buffer_limit;
self
}
/// Extracts multipart config from app data. Check both `T` and `Data<T>`, in that order, and
/// fall back to the default multipart config.
fn from_req(req: &HttpRequest) -> &Self {
req.app_data::<Self>()
.or_else(|| req.app_data::<web::Data<Self>>().map(|d| d.as_ref()))
.unwrap_or(&DEFAULT_CONFIG)
}
}
static DEFAULT_CONFIG: MultipartConfig = MultipartConfig {
buffer_limit: DEFAULT_BUFFER_LIMIT,
};
impl Default for MultipartConfig {
fn default() -> Self {
Self::new()
}
}
impl Multipart {
/// Creates multipart instance from parts.
pub fn new<S>(headers: &HeaderMap, stream: S) -> Self
@ -58,8 +98,15 @@ impl Multipart {
/// Creates multipart instance from parts.
pub(crate) fn from_req(req: &HttpRequest, payload: &mut dev::Payload) -> Self {
let config = MultipartConfig::from_req(req);
match Self::find_ct_and_boundary(req.headers()) {
Ok((ct, boundary)) => Self::from_ct_and_boundary(ct, boundary, payload.take()),
Ok((ct, boundary)) => Self::from_ct_and_boundary_with_buffer_limit(
ct,
boundary,
payload.take(),
config.buffer_limit,
),
Err(err) => Self::from_error(err),
}
}
@ -93,13 +140,30 @@ impl Multipart {
/// Constructs a new multipart reader from given Content-Type, boundary, and stream.
pub(crate) fn from_ct_and_boundary<S>(ct: Mime, boundary: String, stream: S) -> Multipart
where
S: Stream<Item = Result<Bytes, PayloadError>> + 'static,
{
Self::from_ct_and_boundary_with_buffer_limit(
ct,
boundary,
stream,
DEFAULT_CONFIG.buffer_limit,
)
}
fn from_ct_and_boundary_with_buffer_limit<S>(
ct: Mime,
boundary: String,
stream: S,
buffer_limit: usize,
) -> Multipart
where
S: Stream<Item = Result<Bytes, PayloadError>> + 'static,
{
Multipart {
safety: Safety::new(),
flow: Flow::InFlight(Inner {
payload: PayloadRef::new(PayloadBuffer::new(stream)),
payload: PayloadRef::new(PayloadBuffer::new_with_limit(stream, buffer_limit)),
content_type: ct,
boundary,
state: State::FirstBoundary,
@ -596,6 +660,18 @@ mod tests {
headers
}
fn create_multipart_with_buffer_limit(
body: impl Stream<Item = Result<Bytes, PayloadError>> + 'static,
buffer_limit: usize,
) -> Multipart {
Multipart::from_ct_and_boundary_with_buffer_limit(
"multipart/mixed; boundary=\"a\"".parse().unwrap(),
"a".to_owned(),
body,
buffer_limit,
)
}
#[actix_rt::test]
async fn empty_boundary_does_not_panic() {
let payload = stream::once(async { Ok(Bytes::from_static(b"\n")) });
@ -727,6 +803,69 @@ mod tests {
assert!(multipart.next().await.is_none());
}
#[actix_rt::test]
async fn malformed_preamble_over_buffer_limit_errors() {
let body = stream::iter(
[b"aaaaaaaa", b"bbbbbbbb", b"cccccccc"].map(|chunk| Ok(Bytes::from_static(chunk))),
);
let mut multipart = create_multipart_with_buffer_limit(body, 16);
let res = multipart.next().await.unwrap();
assert_matches!(res, Err(Error::Payload(PayloadError::Overflow)));
}
#[actix_rt::test]
async fn malformed_headers_over_buffer_limit_errors() {
let body = stream::iter(
[
Bytes::from_static(b"--a\r\n"),
Bytes::from_static(b"X-Long: 12345678"),
Bytes::from_static(b"9012345678901234"),
Bytes::from_static(b"5678901234567890"),
]
.map(Ok),
);
let mut multipart = create_multipart_with_buffer_limit(body, 24);
let res = multipart.next().await.unwrap();
assert_matches!(res, Err(Error::Payload(PayloadError::Overflow)));
}
#[actix_rt::test]
async fn raw_extractor_uses_configured_buffer_limit() {
let (req, mut payload) = TestRequest::default()
.insert_header((header::CONTENT_TYPE, "multipart/mixed; boundary=\"a\""))
.app_data(MultipartConfig::default().buffer_limit(16))
.set_payload(Bytes::from_static(b"aaaaaaaabbbbbbbbcccccccc"))
.to_http_parts();
let mut multipart = Multipart::from_request(&req, &mut payload).await.unwrap();
let res = multipart.next().await.unwrap();
assert_matches!(res, Err(Error::Payload(PayloadError::Overflow)));
}
#[actix_rt::test]
async fn valid_large_field_streams_through_small_parser_buffer() {
let mut bytes = BytesMut::new();
bytes.put(&b"--a\r\nContent-Length: 100\r\n\r\n"[..]);
bytes.put(&[b'x'; 100][..]);
bytes.put(&b"\r\n--a--\r\n"[..]);
let body = stream::once(async { Ok(bytes.freeze()) });
let mut multipart = create_multipart_with_buffer_limit(body, 32);
let mut field = multipart.next().await.unwrap().unwrap();
assert_eq!(
get_whole_field(&mut field).await,
Bytes::from(vec![b'x'; 100])
);
drop(field);
assert!(multipart.next().await.is_none());
}
#[actix_rt::test]
async fn test_multipart_no_end_crlf() {
let (sender, payload) = create_stream();

View File

@ -14,6 +14,9 @@ use futures_core::stream::{LocalBoxStream, Stream};
use crate::{error::Error, safety::Safety};
pub(crate) const DEFAULT_BUFFER_LIMIT: usize = 65_536; // 64 KiB
const MAX_READY_CHUNKS_PER_POLL: usize = 16;
pub(crate) struct PayloadRef {
payload: Rc<RefCell<PayloadBuffer>>,
}
@ -45,31 +48,64 @@ impl Clone for PayloadRef {
/// Payload buffer.
pub(crate) struct PayloadBuffer {
pub(crate) stream: LocalBoxStream<'static, Result<Bytes, PayloadError>>,
pending: Option<Bytes>,
pub(crate) buf: BytesMut,
buffer_limit: usize,
/// EOF flag. If true, no more payload reads will be attempted.
pub(crate) eof: bool,
}
impl PayloadBuffer {
/// Constructs new payload buffer.
pub(crate) fn new<S>(stream: S) -> Self
pub(crate) fn new_with_limit<S>(stream: S, buffer_limit: usize) -> Self
where
S: Stream<Item = Result<Bytes, PayloadError>> + 'static,
{
PayloadBuffer {
stream: Box::pin(stream),
pending: None,
buf: BytesMut::with_capacity(1_024), // pre-allocate 1KiB
buffer_limit,
eof: false,
}
}
/// Polls a bounded amount of payload into the parser buffer.
///
/// This does not drain the stream to EOF in one call. Callers must be prepared to poll again
/// after consuming buffered data.
pub(crate) fn poll_stream(&mut self, cx: &mut Context<'_>) -> Result<(), PayloadError> {
loop {
if self.buffer_limit == 0 {
return Err(PayloadError::Overflow);
}
let mut appended = false;
for _ in 0..MAX_READY_CHUNKS_PER_POLL {
if self.pending.is_some() {
appended |= self.append_pending()?;
if self.pending.is_some() || self.buf.len() >= self.buffer_limit {
if appended {
cx.waker().wake_by_ref();
}
return Ok(());
}
continue;
}
match Pin::new(&mut self.stream).poll_next(cx) {
Poll::Ready(Some(Ok(data))) => {
self.buf.extend_from_slice(&data);
// try to read more data
continue;
self.pending = Some(data);
appended |= self.append_pending()?;
if self.pending.is_some() || self.buf.len() >= self.buffer_limit {
if appended {
cx.waker().wake_by_ref();
}
return Ok(());
}
}
Poll::Ready(Some(Err(err))) => return Err(err),
Poll::Ready(None) => {
@ -79,6 +115,40 @@ impl PayloadBuffer {
Poll::Pending => return Ok(()),
}
}
if appended {
cx.waker().wake_by_ref();
}
Ok(())
}
fn append_pending(&mut self) -> Result<bool, PayloadError> {
let Some(mut data) = self.pending.take() else {
return Ok(false);
};
if data.is_empty() {
return Ok(false);
}
if self.buf.len() >= self.buffer_limit {
self.pending = Some(data);
return Err(PayloadError::Overflow);
}
let available = self.buffer_limit - self.buf.len();
let len = cmp::min(data.len(), available);
if len == data.len() {
self.buf.extend_from_slice(&data);
} else {
let chunk = data.split_to(len);
self.buf.extend_from_slice(&chunk);
self.pending = Some(data);
}
Ok(len != 0)
}
/// Reads exact number of bytes.
@ -162,7 +232,7 @@ mod tests {
#[actix_rt::test]
async fn basic() {
let (_, payload) = h1::Payload::create(false);
let mut payload = PayloadBuffer::new(payload);
let mut payload = PayloadBuffer::new_with_limit(payload, DEFAULT_BUFFER_LIMIT);
assert_eq!(payload.buf.len(), 0);
lazy(|cx| payload.poll_stream(cx)).await.unwrap();
@ -172,7 +242,7 @@ mod tests {
#[actix_rt::test]
async fn eof() {
let (mut sender, payload) = h1::Payload::create(false);
let mut payload = PayloadBuffer::new(payload);
let mut payload = PayloadBuffer::new_with_limit(payload, DEFAULT_BUFFER_LIMIT);
assert_eq!(None, payload.read_max(4).unwrap());
sender.feed_data(Bytes::from("data"));
@ -181,6 +251,8 @@ mod tests {
assert_eq!(Some(Bytes::from("data")), payload.read_max(4).unwrap());
assert_eq!(payload.buf.len(), 0);
lazy(|cx| payload.poll_stream(cx)).await.unwrap();
assert!(payload.read_max(1).is_err());
assert!(payload.eof);
}
@ -188,7 +260,7 @@ mod tests {
#[actix_rt::test]
async fn err() {
let (mut sender, payload) = h1::Payload::create(false);
let mut payload = PayloadBuffer::new(payload);
let mut payload = PayloadBuffer::new_with_limit(payload, DEFAULT_BUFFER_LIMIT);
assert_eq!(None, payload.read_max(1).unwrap());
sender.set_error(PayloadError::Incomplete(None));
lazy(|cx| payload.poll_stream(cx)).await.err().unwrap();
@ -197,11 +269,12 @@ mod tests {
#[actix_rt::test]
async fn read_max() {
let (mut sender, payload) = h1::Payload::create(false);
let mut payload = PayloadBuffer::new(payload);
let mut payload = PayloadBuffer::new_with_limit(payload, DEFAULT_BUFFER_LIMIT);
sender.feed_data(Bytes::from("line1"));
sender.feed_data(Bytes::from("line2"));
lazy(|cx| payload.poll_stream(cx)).await.unwrap();
lazy(|cx| payload.poll_stream(cx)).await.unwrap();
assert_eq!(payload.buf.len(), 10);
assert_eq!(Some(Bytes::from("line1")), payload.read_max(5).unwrap());
@ -214,13 +287,14 @@ mod tests {
#[actix_rt::test]
async fn read_exactly() {
let (mut sender, payload) = h1::Payload::create(false);
let mut payload = PayloadBuffer::new(payload);
let mut payload = PayloadBuffer::new_with_limit(payload, DEFAULT_BUFFER_LIMIT);
assert_eq!(None, payload.read_exact(2));
sender.feed_data(Bytes::from("line1"));
sender.feed_data(Bytes::from("line2"));
lazy(|cx| payload.poll_stream(cx)).await.unwrap();
lazy(|cx| payload.poll_stream(cx)).await.unwrap();
assert_eq!(Some(Bytes::from_static(b"li")), payload.read_exact(2));
assert_eq!(payload.buf.len(), 8);
@ -232,13 +306,14 @@ mod tests {
#[actix_rt::test]
async fn read_until() {
let (mut sender, payload) = h1::Payload::create(false);
let mut payload = PayloadBuffer::new(payload);
let mut payload = PayloadBuffer::new_with_limit(payload, DEFAULT_BUFFER_LIMIT);
assert_eq!(None, payload.read_until(b"ne").unwrap());
sender.feed_data(Bytes::from("line1"));
sender.feed_data(Bytes::from("line2"));
lazy(|cx| payload.poll_stream(cx)).await.unwrap();
lazy(|cx| payload.poll_stream(cx)).await.unwrap();
assert_eq!(
Some(Bytes::from("line")),
@ -252,4 +327,38 @@ mod tests {
);
assert_eq!(payload.buf.len(), 0);
}
#[actix_rt::test]
async fn poll_stream_does_not_exceed_buffer_limit() {
let stream = futures_util::stream::iter([
Ok(Bytes::from_static(b"12345678")),
Ok(Bytes::from_static(b"abcdefgh")),
Ok(Bytes::from_static(b"overflow")),
]);
let mut payload = PayloadBuffer::new_with_limit(stream, 16);
lazy(|cx| payload.poll_stream(cx)).await.unwrap();
assert_eq!(payload.buf.len(), 16);
let err = lazy(|cx| payload.poll_stream(cx)).await.unwrap_err();
assert!(matches!(err, PayloadError::Overflow));
assert_eq!(payload.buf.len(), 16);
}
#[actix_rt::test]
async fn oversized_chunk_can_be_consumed_incrementally() {
let stream = futures_util::stream::once(async { Ok(Bytes::from_static(b"12345678")) });
let mut payload = PayloadBuffer::new_with_limit(stream, 4);
lazy(|cx| payload.poll_stream(cx)).await.unwrap();
assert_eq!(payload.buf, Bytes::from_static(b"1234"));
assert_eq!(payload.read_max(4).unwrap().unwrap(), "1234");
lazy(|cx| payload.poll_stream(cx)).await.unwrap();
assert_eq!(payload.buf, Bytes::from_static(b"5678"));
assert_eq!(payload.read_max(4).unwrap().unwrap(), "5678");
lazy(|cx| payload.poll_stream(cx)).await.unwrap();
assert!(payload.eof);
}
}