diff --git a/actix-codec/benches/lines.rs b/actix-codec/benches/lines.rs index c6d13123..4fa022c2 100644 --- a/actix-codec/benches/lines.rs +++ b/actix-codec/benches/lines.rs @@ -4,6 +4,8 @@ use bytes::BytesMut; use criterion::{criterion_group, criterion_main, Criterion}; const INPUT: &[u8] = include_bytes!("./lorem.txt"); +const PARTIAL_CHUNK: [u8; 64] = [b'a'; 64]; +const PARTIAL_CHUNKS: usize = 128; fn bench_lines_codec(c: &mut Criterion) { let mut decode_group = c.benchmark_group("lines decode"); @@ -30,6 +32,44 @@ fn bench_lines_codec(c: &mut Criterion) { decode_group.finish(); + let mut partial_decode_group = c.benchmark_group("lines decode partial"); + + partial_decode_group.bench_function("actix", |b| { + b.iter(|| { + use actix_codec::Decoder as _; + + let mut codec = actix_codec::LinesCodec::default(); + let mut buf = BytesMut::with_capacity(PARTIAL_CHUNK.len() * PARTIAL_CHUNKS + 1); + + for _ in 0..PARTIAL_CHUNKS { + buf.extend_from_slice(&PARTIAL_CHUNK); + assert!(codec.decode(&mut buf).unwrap().is_none()); + } + + buf.extend_from_slice(b"\n"); + assert!(codec.decode(&mut buf).unwrap().is_some()); + }); + }); + + partial_decode_group.bench_function("tokio", |b| { + b.iter(|| { + use tokio_util::codec::Decoder as _; + + let mut codec = tokio_util::codec::LinesCodec::new(); + let mut buf = BytesMut::with_capacity(PARTIAL_CHUNK.len() * PARTIAL_CHUNKS + 1); + + for _ in 0..PARTIAL_CHUNKS { + buf.extend_from_slice(&PARTIAL_CHUNK); + assert!(codec.decode(&mut buf).unwrap().is_none()); + } + + buf.extend_from_slice(b"\n"); + assert!(codec.decode(&mut buf).unwrap().is_some()); + }); + }); + + partial_decode_group.finish(); + let mut encode_group = c.benchmark_group("lines encode"); encode_group.bench_function("actix", |b| { diff --git a/actix-codec/src/lines.rs b/actix-codec/src/lines.rs index a41855cf..75cbfa35 100644 --- a/actix-codec/src/lines.rs +++ b/actix-codec/src/lines.rs @@ -20,6 +20,8 @@ use super::{Decoder, Encoder}; #[non_exhaustive] pub struct LinesCodec { max_length: usize, + // Next byte index to examine for `\n` after an incomplete decode. + next_index: usize, } impl LinesCodec { @@ -29,6 +31,7 @@ impl LinesCodec { pub const fn new() -> Self { Self { max_length: usize::MAX, + next_index: 0, } } @@ -40,7 +43,10 @@ impl LinesCodec { /// Using a length limit is recommended when working with untrusted input to avoid unbounded /// buffering. pub const fn new_with_max_length(max_length: usize) -> Self { - Self { max_length } + Self { + max_length, + next_index: 0, + } } /// Returns the maximum permitted line length, in bytes. @@ -74,39 +80,55 @@ impl Decoder for LinesCodec { fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { if src.is_empty() { + self.next_index = 0; return Ok(None); } - let len = match memchr(b'\n', src) { - Some(n) => n, - None => { - // No delimiter yet; if current buffered data already exceeds the maximum line - // length, abort to avoid unbounded memory growth. - let max = self.max_length; - let max_cr = max.saturating_add(1); + // Framed reads append to the same buffer after incomplete decodes. We do not currently + // expect callers to replace it, but fall back to a fresh scan if they do. + let start = if self.next_index < src.len() { + self.next_index + } else { + 0 + }; - if src.len() > max && !(src.len() == max_cr && src.last() == Some(&b'\r')) { - return Err(io::Error::new( - io::ErrorKind::InvalidData, - "max line length exceeded", - )); + let len = match memchr(b'\n', &src[start..]) { + Some(n) => start + n, + None => { + let max = self.max_length; + if max != usize::MAX { + // No delimiter yet; if current buffered data already exceeds the maximum line + // length, abort to avoid unbounded memory growth. + let max_cr = max.saturating_add(1); + + if src.len() > max && !(src.len() == max_cr && src.last() == Some(&b'\r')) { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + "max line length exceeded", + )); + } } + self.next_index = src.len(); return Ok(None); } }; // Reject overly long lines before splitting/advancing buffers. let max = self.max_length; - let max_cr = max.saturating_add(1); + if max != usize::MAX { + let max_cr = max.saturating_add(1); - if len > max && !(len == max_cr && src.get(len - 1) == Some(&b'\r')) { - return Err(io::Error::new( - io::ErrorKind::InvalidData, - "max line length exceeded", - )); + if len > max && !(len == max_cr && src.get(len - 1) == Some(&b'\r')) { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + "max line length exceeded", + )); + } } + self.next_index = 0; + // split up to new line char let mut buf = src.split_to(len); debug_assert_eq!(len, buf.len()); @@ -132,6 +154,8 @@ impl Decoder for LinesCodec { Some(frame) => Ok(Some(frame)), None if src.is_empty() => Ok(None), None => { + self.next_index = 0; + let buf = match src.last() { // if last line ends in a CR then take everything up to it Some(b'\r') => src.split_to(src.len() - 1), @@ -229,4 +253,76 @@ mod tests { let err = codec.decode(&mut buf).unwrap_err(); assert_eq!(err.kind(), io::ErrorKind::InvalidData); } + + #[test] + fn lines_decoder_resumes_from_previous_search() { + let mut codec = LinesCodec::default(); + let mut buf = BytesMut::from(&b"partial"[..]); + + assert!(codec.decode(&mut buf).unwrap().is_none()); + + buf.put_slice(b" line\n"); + assert_eq!("partial line", codec.decode(&mut buf).unwrap().unwrap()); + assert!(codec.decode(&mut buf).unwrap().is_none()); + } + + #[test] + fn lines_decoder_resumes_across_multiple_partial_chunks() { + let mut codec = LinesCodec::default(); + let mut buf = BytesMut::new(); + + buf.put_slice(b"partial"); + assert!(codec.decode(&mut buf).unwrap().is_none()); + + buf.put_slice(b" line"); + assert!(codec.decode(&mut buf).unwrap().is_none()); + + buf.put_slice(b" across chunks"); + assert!(codec.decode(&mut buf).unwrap().is_none()); + + buf.put_slice(b"\n"); + assert_eq!( + "partial line across chunks", + codec.decode(&mut buf).unwrap().unwrap() + ); + } + + #[test] + fn lines_decoder_resumes_with_max_length() { + let mut codec = LinesCodec::new_with_max_length(18); + let mut buf = BytesMut::new(); + + buf.put_slice(b"partial"); + assert!(codec.decode(&mut buf).unwrap().is_none()); + + buf.put_slice(b" line"); + assert!(codec.decode(&mut buf).unwrap().is_none()); + + buf.put_slice(b" ok\n"); + assert_eq!("partial line ok", codec.decode(&mut buf).unwrap().unwrap()); + } + + #[test] + fn lines_decoder_errors_on_overlong_partial_line() { + let mut codec = LinesCodec::new_with_max_length(4); + let mut buf = BytesMut::from(&b"aa"[..]); + + assert!(codec.decode(&mut buf).unwrap().is_none()); + + buf.put_slice(b"aaa"); + let err = codec.decode(&mut buf).unwrap_err(); + assert_eq!(err.kind(), io::ErrorKind::InvalidData); + } + + #[test] + fn lines_decoder_resets_search_after_decode_eof() { + let mut codec = LinesCodec::default(); + let mut buf = BytesMut::from(&b"partial"[..]); + + assert!(codec.decode(&mut buf).unwrap().is_none()); + assert_eq!("partial", codec.decode_eof(&mut buf).unwrap().unwrap()); + + buf.put_slice(b"next\n"); + assert_eq!("next", codec.decode(&mut buf).unwrap().unwrap()); + } }