From 063deead6b3837be8817496bb94bcc6422a83da8 Mon Sep 17 00:00:00 2001 From: Yuki Okushi Date: Thu, 30 Apr 2026 17:33:02 +0900 Subject: [PATCH 1/2] perf(codec): improve performance by storing next index --- actix-codec/benches/lines.rs | 40 ++++++++++ actix-codec/src/lines.rs | 139 ++++++++++++++++++++++++++++++----- 2 files changed, 160 insertions(+), 19 deletions(-) diff --git a/actix-codec/benches/lines.rs b/actix-codec/benches/lines.rs index c6d13123..4fa022c2 100644 --- a/actix-codec/benches/lines.rs +++ b/actix-codec/benches/lines.rs @@ -4,6 +4,8 @@ use bytes::BytesMut; use criterion::{criterion_group, criterion_main, Criterion}; const INPUT: &[u8] = include_bytes!("./lorem.txt"); +const PARTIAL_CHUNK: [u8; 64] = [b'a'; 64]; +const PARTIAL_CHUNKS: usize = 128; fn bench_lines_codec(c: &mut Criterion) { let mut decode_group = c.benchmark_group("lines decode"); @@ -30,6 +32,44 @@ fn bench_lines_codec(c: &mut Criterion) { decode_group.finish(); + let mut partial_decode_group = c.benchmark_group("lines decode partial"); + + partial_decode_group.bench_function("actix", |b| { + b.iter(|| { + use actix_codec::Decoder as _; + + let mut codec = actix_codec::LinesCodec::default(); + let mut buf = BytesMut::with_capacity(PARTIAL_CHUNK.len() * PARTIAL_CHUNKS + 1); + + for _ in 0..PARTIAL_CHUNKS { + buf.extend_from_slice(&PARTIAL_CHUNK); + assert!(codec.decode(&mut buf).unwrap().is_none()); + } + + buf.extend_from_slice(b"\n"); + assert!(codec.decode(&mut buf).unwrap().is_some()); + }); + }); + + partial_decode_group.bench_function("tokio", |b| { + b.iter(|| { + use tokio_util::codec::Decoder as _; + + let mut codec = tokio_util::codec::LinesCodec::new(); + let mut buf = BytesMut::with_capacity(PARTIAL_CHUNK.len() * PARTIAL_CHUNKS + 1); + + for _ in 0..PARTIAL_CHUNKS { + buf.extend_from_slice(&PARTIAL_CHUNK); + assert!(codec.decode(&mut buf).unwrap().is_none()); + } + + buf.extend_from_slice(b"\n"); + assert!(codec.decode(&mut buf).unwrap().is_some()); + }); + }); + + 
partial_decode_group.finish(); + let mut encode_group = c.benchmark_group("lines encode"); encode_group.bench_function("actix", |b| { diff --git a/actix-codec/src/lines.rs b/actix-codec/src/lines.rs index a41855cf..e8fbb771 100644 --- a/actix-codec/src/lines.rs +++ b/actix-codec/src/lines.rs @@ -16,10 +16,19 @@ use super::{Decoder, Encoder}; /// [`LinesCodec::new_with_max_length`]. Without a length limit, the internal read buffer can grow /// without bound if a peer sends an unbounded amount of data without a `\n`, potentially leading /// to memory exhaustion (DoS). +/// +/// # Direct `Decoder` Use +/// +/// `LinesCodec` caches the index after the last byte it searched when [`Decoder::decode`] returns +/// `Ok(None)`. Callers that invoke `decode` directly must pass the same [`BytesMut`] with newly +/// read bytes appended. If the buffer is replaced or bytes before the cached offset are changed, +/// create a new `LinesCodec` before decoding the replacement buffer. #[derive(Debug, Copy, Clone)] #[non_exhaustive] pub struct LinesCodec { max_length: usize, + // Next byte index to examine for `\n` after a previous incomplete decode. + next_index: usize, } impl LinesCodec { @@ -29,6 +38,7 @@ impl LinesCodec { pub const fn new() -> Self { Self { max_length: usize::MAX, + next_index: 0, } } @@ -40,7 +50,10 @@ impl LinesCodec { /// Using a length limit is recommended when working with untrusted input to avoid unbounded /// buffering. pub const fn new_with_max_length(max_length: usize) -> Self { - Self { max_length } + Self { + max_length, + next_index: 0, + } } /// Returns the maximum permitted line length, in bytes. 
@@ -74,39 +87,53 @@ impl Decoder for LinesCodec { fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { if src.is_empty() { + self.next_index = 0; return Ok(None); } - let len = match memchr(b'\n', src) { - Some(n) => n, - None => { - // No delimiter yet; if current buffered data already exceeds the maximum line - // length, abort to avoid unbounded memory growth. - let max = self.max_length; - let max_cr = max.saturating_add(1); + let start = self.next_index.min(src.len()); + debug_assert!( + memchr(b'\n', &src[..start]).is_none(), + "LinesCodec buffer changed before cached search offset" + ); - if src.len() > max && !(src.len() == max_cr && src.last() == Some(&b'\r')) { - return Err(io::Error::new( - io::ErrorKind::InvalidData, - "max line length exceeded", - )); + let len = match memchr(b'\n', &src[start..]) { + Some(n) => start + n, + None => { + let max = self.max_length; + if max != usize::MAX { + // No delimiter yet; if current buffered data already exceeds the maximum line + // length, abort to avoid unbounded memory growth. + let max_cr = max.saturating_add(1); + + if src.len() > max && !(src.len() == max_cr && src.last() == Some(&b'\r')) { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + "max line length exceeded", + )); + } } + self.next_index = src.len(); return Ok(None); } }; // Reject overly long lines before splitting/advancing buffers. 
let max = self.max_length; - let max_cr = max.saturating_add(1); + if max != usize::MAX { + let max_cr = max.saturating_add(1); - if len > max && !(len == max_cr && src.get(len - 1) == Some(&b'\r')) { - return Err(io::Error::new( - io::ErrorKind::InvalidData, - "max line length exceeded", - )); + if len > max && !(len == max_cr && src.get(len - 1) == Some(&b'\r')) { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + "max line length exceeded", + )); + } } + self.next_index = 0; + // split up to new line char let mut buf = src.split_to(len); debug_assert_eq!(len, buf.len()); @@ -132,6 +159,8 @@ impl Decoder for LinesCodec { Some(frame) => Ok(Some(frame)), None if src.is_empty() => Ok(None), None => { + self.next_index = 0; + let buf = match src.last() { // if last line ends in a CR then take everything up to it Some(b'\r') => src.split_to(src.len() - 1), @@ -229,4 +258,76 @@ mod tests { let err = codec.decode(&mut buf).unwrap_err(); assert_eq!(err.kind(), io::ErrorKind::InvalidData); } + + #[test] + fn lines_decoder_resumes_from_previous_search() { + let mut codec = LinesCodec::default(); + let mut buf = BytesMut::from(&b"partial"[..]); + + assert!(codec.decode(&mut buf).unwrap().is_none()); + + buf.put_slice(b" line\n"); + assert_eq!("partial line", codec.decode(&mut buf).unwrap().unwrap()); + assert!(codec.decode(&mut buf).unwrap().is_none()); + } + + #[test] + fn lines_decoder_resumes_across_multiple_partial_chunks() { + let mut codec = LinesCodec::default(); + let mut buf = BytesMut::new(); + + buf.put_slice(b"partial"); + assert!(codec.decode(&mut buf).unwrap().is_none()); + + buf.put_slice(b" line"); + assert!(codec.decode(&mut buf).unwrap().is_none()); + + buf.put_slice(b" across chunks"); + assert!(codec.decode(&mut buf).unwrap().is_none()); + + buf.put_slice(b"\n"); + assert_eq!( + "partial line across chunks", + codec.decode(&mut buf).unwrap().unwrap() + ); + } + + #[test] + fn lines_decoder_resumes_with_max_length() { + let mut codec = 
LinesCodec::new_with_max_length(18); + let mut buf = BytesMut::new(); + + buf.put_slice(b"partial"); + assert!(codec.decode(&mut buf).unwrap().is_none()); + + buf.put_slice(b" line"); + assert!(codec.decode(&mut buf).unwrap().is_none()); + + buf.put_slice(b" ok\n"); + assert_eq!("partial line ok", codec.decode(&mut buf).unwrap().unwrap()); + } + + #[test] + fn lines_decoder_errors_on_overlong_partial_line() { + let mut codec = LinesCodec::new_with_max_length(4); + let mut buf = BytesMut::from(&b"aa"[..]); + + assert!(codec.decode(&mut buf).unwrap().is_none()); + + buf.put_slice(b"aaa"); + let err = codec.decode(&mut buf).unwrap_err(); + assert_eq!(err.kind(), io::ErrorKind::InvalidData); + } + + #[test] + fn lines_decoder_resets_search_after_decode_eof() { + let mut codec = LinesCodec::default(); + let mut buf = BytesMut::from(&b"partial"[..]); + + assert!(codec.decode(&mut buf).unwrap().is_none()); + assert_eq!("partial", codec.decode_eof(&mut buf).unwrap().unwrap()); + + buf.put_slice(b"next\n"); + assert_eq!("next", codec.decode(&mut buf).unwrap().unwrap()); + } } From d26ccb6c0bdf0b054e95c3aa18737673d5cf9a25 Mon Sep 17 00:00:00 2001 From: Yuki Okushi Date: Thu, 7 May 2026 19:49:07 +0900 Subject: [PATCH 2/2] do not support external use --- actix-codec/src/lines.rs | 21 ++++++++------------- 1 file changed, 8 insertions(+), 13 deletions(-) diff --git a/actix-codec/src/lines.rs b/actix-codec/src/lines.rs index e8fbb771..75cbfa35 100644 --- a/actix-codec/src/lines.rs +++ b/actix-codec/src/lines.rs @@ -16,18 +16,11 @@ use super::{Decoder, Encoder}; /// [`LinesCodec::new_with_max_length`]. Without a length limit, the internal read buffer can grow /// without bound if a peer sends an unbounded amount of data without a `\n`, potentially leading /// to memory exhaustion (DoS). -/// -/// # Direct `Decoder` Use -/// -/// `LinesCodec` caches the index after the last byte it searched when [`Decoder::decode`] returns -/// `Ok(None)`. 
Callers that invoke `decode` directly must pass the same [`BytesMut`] with newly -/// read bytes appended. If the buffer is replaced or bytes before the cached offset are changed, -/// create a new `LinesCodec` before decoding the replacement buffer. #[derive(Debug, Copy, Clone)] #[non_exhaustive] pub struct LinesCodec { max_length: usize, - // Next byte index to examine for `\n` after a previous incomplete decode. + // Next byte index to examine for `\n` after an incomplete decode. next_index: usize, } @@ -91,11 +84,13 @@ impl Decoder for LinesCodec { return Ok(None); } - let start = self.next_index.min(src.len()); - debug_assert!( - memchr(b'\n', &src[..start]).is_none(), - "LinesCodec buffer changed before cached search offset" - ); + // Framed reads append to the same buffer after incomplete decodes, so resume at the cached + // index. If the buffer no longer reaches past it (e.g. it was replaced), scan from 0. + let start = if self.next_index < src.len() { + self.next_index + } else { + 0 + }; let len = match memchr(b'\n', &src[start..]) { Some(n) => start + n,