This commit is contained in:
Yuki Okushi 2026-05-31 00:37:46 +08:00 committed by GitHub
commit db1f0bcc37
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 155 additions and 19 deletions

View File

@ -4,6 +4,8 @@ use bytes::BytesMut;
use criterion::{criterion_group, criterion_main, Criterion};
// Contents of `lorem.txt`, embedded into the benchmark binary at compile time.
const INPUT: &[u8] = include_bytes!("./lorem.txt");
// One 64-byte chunk of `a` bytes with no newline; feeding it to a decoder never completes a line.
const PARTIAL_CHUNK: [u8; 64] = [b'a'; 64];
// How many `PARTIAL_CHUNK`s the partial-decode benchmarks append before the terminating `\n`.
const PARTIAL_CHUNKS: usize = 128;
fn bench_lines_codec(c: &mut Criterion) {
let mut decode_group = c.benchmark_group("lines decode");
@ -30,6 +32,44 @@ fn bench_lines_codec(c: &mut Criterion) {
decode_group.finish();
// "lines decode partial": one long line delivered in many newline-free chunks, with a decode
// attempt after every chunk, followed by a single `\n` that completes the line.
let mut partial_decode_group = c.benchmark_group("lines decode partial");
partial_decode_group.bench_function("actix", |b| {
b.iter(|| {
use actix_codec::Decoder as _;
// Fresh codec and buffer per iteration, so no decoder state carries over between runs.
let mut codec = actix_codec::LinesCodec::default();
// Capacity covers every chunk plus the final newline byte.
let mut buf = BytesMut::with_capacity(PARTIAL_CHUNK.len() * PARTIAL_CHUNKS + 1);
for _ in 0..PARTIAL_CHUNKS {
buf.extend_from_slice(&PARTIAL_CHUNK);
// No delimiter has been appended yet, so every attempt must report "no frame".
assert!(codec.decode(&mut buf).unwrap().is_none());
}
// The newline completes the line; this decode must yield a frame.
buf.extend_from_slice(b"\n");
assert!(codec.decode(&mut buf).unwrap().is_some());
});
});
// Same workload against tokio-util's `LinesCodec` for comparison.
partial_decode_group.bench_function("tokio", |b| {
b.iter(|| {
use tokio_util::codec::Decoder as _;
let mut codec = tokio_util::codec::LinesCodec::new();
let mut buf = BytesMut::with_capacity(PARTIAL_CHUNK.len() * PARTIAL_CHUNKS + 1);
for _ in 0..PARTIAL_CHUNKS {
buf.extend_from_slice(&PARTIAL_CHUNK);
assert!(codec.decode(&mut buf).unwrap().is_none());
}
buf.extend_from_slice(b"\n");
assert!(codec.decode(&mut buf).unwrap().is_some());
});
});
partial_decode_group.finish();
let mut encode_group = c.benchmark_group("lines encode");
encode_group.bench_function("actix", |b| {

View File

@ -20,6 +20,8 @@ use super::{Decoder, Encoder};
#[non_exhaustive]
pub struct LinesCodec {
// Maximum permitted line length, in bytes. `usize::MAX` (the value set by `new`) means no
// limit is enforced.
max_length: usize,
// Next byte index to examine for `\n` after an incomplete decode.
// Set to the buffer length when no delimiter is found, and reset to 0 when the buffer is
// empty or a frame is about to be split off.
next_index: usize,
}
impl LinesCodec {
@ -29,6 +31,7 @@ impl LinesCodec {
pub const fn new() -> Self {
    // `usize::MAX` as the limit means lines are never rejected for length.
    let max_length = usize::MAX;
    // Nothing has been scanned yet, so the delimiter search starts at byte 0.
    let next_index = 0;
    Self {
        max_length,
        next_index,
    }
}
@ -40,7 +43,10 @@ impl LinesCodec {
/// Using a length limit is recommended when working with untrusted input to avoid unbounded
/// buffering.
pub const fn new_with_max_length(max_length: usize) -> Self {
    // Every field must be initialised: the former `Self { max_length }` form no longer
    // compiles now that the struct also carries `next_index`.
    Self {
        max_length,
        // Nothing has been scanned yet, so the first decode searches from byte 0.
        next_index: 0,
    }
}
/// Returns the maximum permitted line length, in bytes.
@ -74,39 +80,55 @@ impl Decoder for LinesCodec {
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
if src.is_empty() {
self.next_index = 0;
return Ok(None);
}
let len = match memchr(b'\n', src) {
Some(n) => n,
None => {
// No delimiter yet; if current buffered data already exceeds the maximum line
// length, abort to avoid unbounded memory growth.
let max = self.max_length;
let max_cr = max.saturating_add(1);
// Framed reads append to the same buffer after incomplete decodes. We do not currently
// expect callers to replace it, but fall back to a fresh scan if they do.
let start = if self.next_index < src.len() {
self.next_index
} else {
0
};
if src.len() > max && !(src.len() == max_cr && src.last() == Some(&b'\r')) {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
"max line length exceeded",
));
let len = match memchr(b'\n', &src[start..]) {
Some(n) => start + n,
None => {
let max = self.max_length;
if max != usize::MAX {
// No delimiter yet; if current buffered data already exceeds the maximum line
// length, abort to avoid unbounded memory growth.
let max_cr = max.saturating_add(1);
if src.len() > max && !(src.len() == max_cr && src.last() == Some(&b'\r')) {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
"max line length exceeded",
));
}
}
self.next_index = src.len();
return Ok(None);
}
};
// Reject overly long lines before splitting/advancing buffers.
let max = self.max_length;
let max_cr = max.saturating_add(1);
if max != usize::MAX {
let max_cr = max.saturating_add(1);
if len > max && !(len == max_cr && src.get(len - 1) == Some(&b'\r')) {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
"max line length exceeded",
));
if len > max && !(len == max_cr && src.get(len - 1) == Some(&b'\r')) {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
"max line length exceeded",
));
}
}
self.next_index = 0;
// split up to new line char
let mut buf = src.split_to(len);
debug_assert_eq!(len, buf.len());
@ -132,6 +154,8 @@ impl Decoder for LinesCodec {
Some(frame) => Ok(Some(frame)),
None if src.is_empty() => Ok(None),
None => {
self.next_index = 0;
let buf = match src.last() {
// if last line ends in a CR then take everything up to it
Some(b'\r') => src.split_to(src.len() - 1),
@ -229,4 +253,76 @@ mod tests {
let err = codec.decode(&mut buf).unwrap_err();
assert_eq!(err.kind(), io::ErrorKind::InvalidData);
}
#[test]
fn lines_decoder_resumes_from_previous_search() {
    let mut decoder = LinesCodec::default();
    let mut pending = BytesMut::from(&b"partial"[..]);

    // No newline yet: the decoder must report an incomplete frame.
    let first = decoder.decode(&mut pending).unwrap();
    assert!(first.is_none());

    // Completing the line must yield the whole text, including the bytes seen earlier.
    pending.put_slice(b" line\n");
    let line = decoder.decode(&mut pending).unwrap().unwrap();
    assert_eq!("partial line", line);

    // The buffer is drained, so nothing further can be decoded.
    let rest = decoder.decode(&mut pending).unwrap();
    assert!(rest.is_none());
}
#[test]
fn lines_decoder_resumes_across_multiple_partial_chunks() {
    let mut decoder = LinesCodec::default();
    let mut pending = BytesMut::new();

    // Each newline-free chunk is appended to the same buffer and must leave the frame
    // incomplete.
    let chunks: [&[u8]; 3] = [b"partial", b" line", b" across chunks"];
    for chunk in chunks.iter() {
        pending.put_slice(chunk);
        assert!(decoder.decode(&mut pending).unwrap().is_none());
    }

    // The delimiter arrives on its own; the frame must contain every earlier chunk.
    pending.put_slice(b"\n");
    let line = decoder.decode(&mut pending).unwrap().unwrap();
    assert_eq!("partial line across chunks", line);
}
#[test]
fn lines_decoder_resumes_with_max_length() {
    // The complete line is 15 bytes, which stays under the 18-byte limit.
    let mut decoder = LinesCodec::new_with_max_length(18);
    let mut pending = BytesMut::new();

    // Partial chunks below the limit are buffered without error.
    let chunks: [&[u8]; 2] = [b"partial", b" line"];
    for chunk in chunks.iter() {
        pending.put_slice(chunk);
        assert!(decoder.decode(&mut pending).unwrap().is_none());
    }

    // The final chunk carries the delimiter and completes the line.
    pending.put_slice(b" ok\n");
    let line = decoder.decode(&mut pending).unwrap().unwrap();
    assert_eq!("partial line ok", line);
}
#[test]
fn lines_decoder_errors_on_overlong_partial_line() {
    let mut decoder = LinesCodec::new_with_max_length(4);
    let mut pending = BytesMut::from(&b"aa"[..]);

    // Two bytes without a newline are still within the 4-byte limit.
    let first = decoder.decode(&mut pending).unwrap();
    assert!(first.is_none());

    // Five buffered bytes without a delimiter exceed the limit and must be rejected.
    pending.put_slice(b"aaa");
    let failure = decoder.decode(&mut pending).unwrap_err();
    assert_eq!(failure.kind(), io::ErrorKind::InvalidData);
}
#[test]
fn lines_decoder_resets_search_after_decode_eof() {
    let mut decoder = LinesCodec::default();
    let mut pending = BytesMut::from(&b"partial"[..]);

    // An incomplete decode leaves the decoder with a remembered search position.
    let first = decoder.decode(&mut pending).unwrap();
    assert!(first.is_none());

    // At end of stream the unterminated text is flushed as the last line.
    let flushed = decoder.decode_eof(&mut pending).unwrap().unwrap();
    assert_eq!("partial", flushed);

    // A later line must be found from the start of the buffer, not from the old position.
    pending.put_slice(b"next\n");
    let line = decoder.decode(&mut pending).unwrap().unwrap();
    assert_eq!("next", line);
}
}