diff --git a/actix-multipart/CHANGES.md b/actix-multipart/CHANGES.md index 4445bcd1e..1d5ddf55b 100644 --- a/actix-multipart/CHANGES.md +++ b/actix-multipart/CHANGES.md @@ -4,6 +4,8 @@ - Add multi-field multipart payload builders to `actix_multipart::test`. [#3575] - Add `MultipartForm` support for `Option>` fields. [#3577] +- Bound internal multipart parser buffering to prevent unbounded memory growth on malformed bodies. + - behavior change notice: There's now a cap for buffering (64KB). It can be changed with `MultipartConfig::buffer_limit`. - Fix user-triggerable panic when parsing multipart boundaries. - Minimum supported Rust version (MSRV) is now 1.88. - Update `rand` dependency to `0.10`. diff --git a/actix-multipart/src/lib.rs b/actix-multipart/src/lib.rs index ca5166d33..e7830b5e4 100644 --- a/actix-multipart/src/lib.rs +++ b/actix-multipart/src/lib.rs @@ -82,5 +82,5 @@ pub mod test; pub use self::{ error::Error as MultipartError, field::{Field, LimitExceeded}, - multipart::Multipart, + multipart::{Multipart, MultipartConfig}, }; diff --git a/actix-multipart/src/multipart.rs b/actix-multipart/src/multipart.rs index be0bc59f0..bde7d122f 100644 --- a/actix-multipart/src/multipart.rs +++ b/actix-multipart/src/multipart.rs @@ -11,7 +11,7 @@ use actix_web::{ dev, error::{ParseError, PayloadError}, http::header::{self, ContentDisposition, HeaderMap, HeaderName, HeaderValue}, - web::Bytes, + web::{self, Bytes}, HttpRequest, }; use futures_core::stream::Stream; @@ -20,7 +20,7 @@ use mime::Mime; use crate::{ error::Error, field::InnerField, - payload::{PayloadBuffer, PayloadRef}, + payload::{PayloadBuffer, PayloadRef, DEFAULT_BUFFER_LIMIT}, safety::Safety, Field, }; @@ -44,6 +44,46 @@ enum Flow { Error(Option), } +/// [`Multipart`] extractor configuration. +/// +/// Add to your app data to have it picked up by [`Multipart`] extractors. +#[derive(Clone, Copy, Debug)] +#[non_exhaustive] +pub struct MultipartConfig { + buffer_limit: usize, +} + +impl MultipartConfig { + /// Creates a default multipart extractor configuration. + pub fn new() -> Self { + DEFAULT_CONFIG + } + + /// Sets maximum internal parser buffer size. By default this limit is 64 KiB. + pub fn buffer_limit(mut self, buffer_limit: usize) -> Self { + self.buffer_limit = buffer_limit; + self + } + + /// Extracts multipart config from app data. Check both `T` and `Data`, in that order, and + /// fall back to the default multipart config. + fn from_req(req: &HttpRequest) -> &Self { + req.app_data::() + .or_else(|| req.app_data::>().map(|d| d.as_ref())) + .unwrap_or(&DEFAULT_CONFIG) + } +} + +static DEFAULT_CONFIG: MultipartConfig = MultipartConfig { + buffer_limit: DEFAULT_BUFFER_LIMIT, +}; + +impl Default for MultipartConfig { + fn default() -> Self { + Self::new() + } +} + impl Multipart { /// Creates multipart instance from parts. pub fn new(headers: &HeaderMap, stream: S) -> Self @@ -58,8 +98,15 @@ impl Multipart { /// Creates multipart instance from parts. pub(crate) fn from_req(req: &HttpRequest, payload: &mut dev::Payload) -> Self { + let config = MultipartConfig::from_req(req); + match Self::find_ct_and_boundary(req.headers()) { - Ok((ct, boundary)) => Self::from_ct_and_boundary(ct, boundary, payload.take()), + Ok((ct, boundary)) => Self::from_ct_and_boundary_with_buffer_limit( + ct, + boundary, + payload.take(), + config.buffer_limit, + ), Err(err) => Self::from_error(err), } } @@ -93,13 +140,30 @@ impl Multipart { /// Constructs a new multipart reader from given Content-Type, boundary, and stream. pub(crate) fn from_ct_and_boundary(ct: Mime, boundary: String, stream: S) -> Multipart + where + S: Stream> + 'static, + { + Self::from_ct_and_boundary_with_buffer_limit( + ct, + boundary, + stream, + DEFAULT_CONFIG.buffer_limit, + ) + } + + fn from_ct_and_boundary_with_buffer_limit( + ct: Mime, + boundary: String, + stream: S, + buffer_limit: usize, + ) -> Multipart where S: Stream> + 'static, { Multipart { safety: Safety::new(), flow: Flow::InFlight(Inner { - payload: PayloadRef::new(PayloadBuffer::new(stream)), + payload: PayloadRef::new(PayloadBuffer::new_with_limit(stream, buffer_limit)), content_type: ct, boundary, state: State::FirstBoundary, @@ -596,6 +660,18 @@ mod tests { headers } + fn create_multipart_with_buffer_limit( + body: impl Stream> + 'static, + buffer_limit: usize, + ) -> Multipart { + Multipart::from_ct_and_boundary_with_buffer_limit( + "multipart/mixed; boundary=\"a\"".parse().unwrap(), + "a".to_owned(), + body, + buffer_limit, + ) + } + #[actix_rt::test] async fn empty_boundary_does_not_panic() { let payload = stream::once(async { Ok(Bytes::from_static(b"\n")) }); @@ -727,6 +803,69 @@ mod tests { assert!(multipart.next().await.is_none()); } + #[actix_rt::test] + async fn malformed_preamble_over_buffer_limit_errors() { + let body = stream::iter( + [b"aaaaaaaa", b"bbbbbbbb", b"cccccccc"].map(|chunk| Ok(Bytes::from_static(chunk))), + ); + + let mut multipart = create_multipart_with_buffer_limit(body, 16); + let res = multipart.next().await.unwrap(); + + assert_matches!(res, Err(Error::Payload(PayloadError::Overflow))); + } + + #[actix_rt::test] + async fn malformed_headers_over_buffer_limit_errors() { + let body = stream::iter( + [ + Bytes::from_static(b"--a\r\n"), + Bytes::from_static(b"X-Long: 12345678"), + Bytes::from_static(b"9012345678901234"), + Bytes::from_static(b"5678901234567890"), + ] + .map(Ok), + ); + + let mut multipart = create_multipart_with_buffer_limit(body, 24); + let res = multipart.next().await.unwrap(); + + assert_matches!(res, Err(Error::Payload(PayloadError::Overflow))); + } + + #[actix_rt::test] + async fn raw_extractor_uses_configured_buffer_limit() { + let (req, mut payload) = TestRequest::default() + .insert_header((header::CONTENT_TYPE, "multipart/mixed; boundary=\"a\"")) + .app_data(MultipartConfig::default().buffer_limit(16)) + .set_payload(Bytes::from_static(b"aaaaaaaabbbbbbbbcccccccc")) + .to_http_parts(); + + let mut multipart = Multipart::from_request(&req, &mut payload).await.unwrap(); + let res = multipart.next().await.unwrap(); + + assert_matches!(res, Err(Error::Payload(PayloadError::Overflow))); + } + + #[actix_rt::test] + async fn valid_large_field_streams_through_small_parser_buffer() { + let mut bytes = BytesMut::new(); + bytes.put(&b"--a\r\nContent-Length: 100\r\n\r\n"[..]); + bytes.put(&[b'x'; 100][..]); + bytes.put(&b"\r\n--a--\r\n"[..]); + let body = stream::once(async { Ok(bytes.freeze()) }); + + let mut multipart = create_multipart_with_buffer_limit(body, 32); + let mut field = multipart.next().await.unwrap().unwrap(); + + assert_eq!( + get_whole_field(&mut field).await, + Bytes::from(vec![b'x'; 100]) + ); + drop(field); + assert!(multipart.next().await.is_none()); + } + #[actix_rt::test] async fn test_multipart_no_end_crlf() { let (sender, payload) = create_stream(); diff --git a/actix-multipart/src/payload.rs b/actix-multipart/src/payload.rs index 858634bc0..4c9929aed 100644 --- a/actix-multipart/src/payload.rs +++ b/actix-multipart/src/payload.rs @@ -14,6 +14,9 @@ use futures_core::stream::{LocalBoxStream, Stream}; use crate::{error::Error, safety::Safety}; +pub(crate) const DEFAULT_BUFFER_LIMIT: usize = 65_536; // 64 KiB +const MAX_READY_CHUNKS_PER_POLL: usize = 16; + pub(crate) struct PayloadRef { payload: Rc>, } @@ -45,31 +48,64 @@ impl Clone for PayloadRef { /// Payload buffer. pub(crate) struct PayloadBuffer { pub(crate) stream: LocalBoxStream<'static, Result>, + pending: Option, pub(crate) buf: BytesMut, + buffer_limit: usize, /// EOF flag. If true, no more payload reads will be attempted. pub(crate) eof: bool, } impl PayloadBuffer { /// Constructs new payload buffer. - pub(crate) fn new(stream: S) -> Self + pub(crate) fn new_with_limit(stream: S, buffer_limit: usize) -> Self where S: Stream> + 'static, { PayloadBuffer { stream: Box::pin(stream), + pending: None, buf: BytesMut::with_capacity(1_024), // pre-allocate 1KiB + buffer_limit, eof: false, } } + /// Polls a bounded amount of payload into the parser buffer. + /// + /// This does not drain the stream to EOF in one call. Callers must be prepared to poll again + /// after consuming buffered data. pub(crate) fn poll_stream(&mut self, cx: &mut Context<'_>) -> Result<(), PayloadError> { - loop { + if self.buffer_limit == 0 { + return Err(PayloadError::Overflow); + } + + let mut appended = false; + + for _ in 0..MAX_READY_CHUNKS_PER_POLL { + if self.pending.is_some() { + appended |= self.append_pending()?; + + if self.pending.is_some() || self.buf.len() >= self.buffer_limit { + if appended { + cx.waker().wake_by_ref(); + } + return Ok(()); + } + + continue; + } + match Pin::new(&mut self.stream).poll_next(cx) { Poll::Ready(Some(Ok(data))) => { - self.buf.extend_from_slice(&data); - // try to read more data - continue; + self.pending = Some(data); + appended |= self.append_pending()?; + + if self.pending.is_some() || self.buf.len() >= self.buffer_limit { + if appended { + cx.waker().wake_by_ref(); + } + return Ok(()); + } } Poll::Ready(Some(Err(err))) => return Err(err), Poll::Ready(None) => { @@ -79,6 +115,40 @@ impl PayloadBuffer { Poll::Pending => return Ok(()), } } + + if appended { + cx.waker().wake_by_ref(); + } + + Ok(()) + } + + fn append_pending(&mut self) -> Result { + let Some(mut data) = self.pending.take() else { + return Ok(false); + }; + + if data.is_empty() { + return Ok(false); + } + + if self.buf.len() >= self.buffer_limit { + self.pending = Some(data); + return Err(PayloadError::Overflow); + } + + let available = self.buffer_limit - self.buf.len(); + let len = cmp::min(data.len(), available); + + if len == data.len() { + self.buf.extend_from_slice(&data); + } else { + let chunk = data.split_to(len); + self.buf.extend_from_slice(&chunk); + self.pending = Some(data); + } + + Ok(len != 0) } /// Reads exact number of bytes. @@ -162,7 +232,7 @@ mod tests { #[actix_rt::test] async fn basic() { let (_, payload) = h1::Payload::create(false); - let mut payload = PayloadBuffer::new(payload); + let mut payload = PayloadBuffer::new_with_limit(payload, DEFAULT_BUFFER_LIMIT); assert_eq!(payload.buf.len(), 0); lazy(|cx| payload.poll_stream(cx)).await.unwrap(); @@ -172,7 +242,7 @@ mod tests { #[actix_rt::test] async fn eof() { let (mut sender, payload) = h1::Payload::create(false); - let mut payload = PayloadBuffer::new(payload); + let mut payload = PayloadBuffer::new_with_limit(payload, DEFAULT_BUFFER_LIMIT); assert_eq!(None, payload.read_max(4).unwrap()); sender.feed_data(Bytes::from("data")); @@ -181,6 +251,8 @@ mod tests { assert_eq!(Some(Bytes::from("data")), payload.read_max(4).unwrap()); assert_eq!(payload.buf.len(), 0); + + lazy(|cx| payload.poll_stream(cx)).await.unwrap(); assert!(payload.read_max(1).is_err()); assert!(payload.eof); } @@ -188,7 +260,7 @@ mod tests { #[actix_rt::test] async fn err() { let (mut sender, payload) = h1::Payload::create(false); - let mut payload = PayloadBuffer::new(payload); + let mut payload = PayloadBuffer::new_with_limit(payload, DEFAULT_BUFFER_LIMIT); assert_eq!(None, payload.read_max(1).unwrap()); sender.set_error(PayloadError::Incomplete(None)); lazy(|cx| payload.poll_stream(cx)).await.err().unwrap(); @@ -197,11 +269,12 @@ mod tests { #[actix_rt::test] async fn read_max() { let (mut sender, payload) = h1::Payload::create(false); - let mut payload = PayloadBuffer::new(payload); + let mut payload = PayloadBuffer::new_with_limit(payload, DEFAULT_BUFFER_LIMIT); sender.feed_data(Bytes::from("line1")); sender.feed_data(Bytes::from("line2")); lazy(|cx| payload.poll_stream(cx)).await.unwrap(); + lazy(|cx| payload.poll_stream(cx)).await.unwrap(); assert_eq!(payload.buf.len(), 10); assert_eq!(Some(Bytes::from("line1")), payload.read_max(5).unwrap()); @@ -214,13 +287,14 @@ mod tests { #[actix_rt::test] async fn read_exactly() { let (mut sender, payload) = h1::Payload::create(false); - let mut payload = PayloadBuffer::new(payload); + let mut payload = PayloadBuffer::new_with_limit(payload, DEFAULT_BUFFER_LIMIT); assert_eq!(None, payload.read_exact(2)); sender.feed_data(Bytes::from("line1")); sender.feed_data(Bytes::from("line2")); lazy(|cx| payload.poll_stream(cx)).await.unwrap(); + lazy(|cx| payload.poll_stream(cx)).await.unwrap(); assert_eq!(Some(Bytes::from_static(b"li")), payload.read_exact(2)); assert_eq!(payload.buf.len(), 8); @@ -232,13 +306,14 @@ mod tests { #[actix_rt::test] async fn read_until() { let (mut sender, payload) = h1::Payload::create(false); - let mut payload = PayloadBuffer::new(payload); + let mut payload = PayloadBuffer::new_with_limit(payload, DEFAULT_BUFFER_LIMIT); assert_eq!(None, payload.read_until(b"ne").unwrap()); sender.feed_data(Bytes::from("line1")); sender.feed_data(Bytes::from("line2")); lazy(|cx| payload.poll_stream(cx)).await.unwrap(); + lazy(|cx| payload.poll_stream(cx)).await.unwrap(); assert_eq!( Some(Bytes::from("line")), @@ -252,4 +327,38 @@ mod tests { ); assert_eq!(payload.buf.len(), 0); } + + #[actix_rt::test] + async fn poll_stream_does_not_exceed_buffer_limit() { + let stream = futures_util::stream::iter([ + Ok(Bytes::from_static(b"12345678")), + Ok(Bytes::from_static(b"abcdefgh")), + Ok(Bytes::from_static(b"overflow")), + ]); + let mut payload = PayloadBuffer::new_with_limit(stream, 16); + + lazy(|cx| payload.poll_stream(cx)).await.unwrap(); + assert_eq!(payload.buf.len(), 16); + + let err = lazy(|cx| payload.poll_stream(cx)).await.unwrap_err(); + assert!(matches!(err, PayloadError::Overflow)); + assert_eq!(payload.buf.len(), 16); + } + + #[actix_rt::test] + async fn oversized_chunk_can_be_consumed_incrementally() { + let stream = futures_util::stream::once(async { Ok(Bytes::from_static(b"12345678")) }); + let mut payload = PayloadBuffer::new_with_limit(stream, 4); + + lazy(|cx| payload.poll_stream(cx)).await.unwrap(); + assert_eq!(payload.buf, Bytes::from_static(b"1234")); + assert_eq!(payload.read_max(4).unwrap().unwrap(), "1234"); + + lazy(|cx| payload.poll_stream(cx)).await.unwrap(); + assert_eq!(payload.buf, Bytes::from_static(b"5678")); + assert_eq!(payload.read_max(4).unwrap().unwrap(), "5678"); + + lazy(|cx| payload.poll_stream(cx)).await.unwrap(); + assert!(payload.eof); + } }