mirror of https://github.com/fafhrd91/actix-net
perf(codec): improve performance by storing next index
This commit is contained in:
parent
4839132d08
commit
063deead6b
|
|
@ -4,6 +4,8 @@ use bytes::BytesMut;
|
||||||
use criterion::{criterion_group, criterion_main, Criterion};
|
use criterion::{criterion_group, criterion_main, Criterion};
|
||||||
|
|
||||||
const INPUT: &[u8] = include_bytes!("./lorem.txt");
|
const INPUT: &[u8] = include_bytes!("./lorem.txt");
|
||||||
|
const PARTIAL_CHUNK: [u8; 64] = [b'a'; 64];
|
||||||
|
const PARTIAL_CHUNKS: usize = 128;
|
||||||
|
|
||||||
fn bench_lines_codec(c: &mut Criterion) {
|
fn bench_lines_codec(c: &mut Criterion) {
|
||||||
let mut decode_group = c.benchmark_group("lines decode");
|
let mut decode_group = c.benchmark_group("lines decode");
|
||||||
|
|
@ -30,6 +32,44 @@ fn bench_lines_codec(c: &mut Criterion) {
|
||||||
|
|
||||||
decode_group.finish();
|
decode_group.finish();
|
||||||
|
|
||||||
|
let mut partial_decode_group = c.benchmark_group("lines decode partial");
|
||||||
|
|
||||||
|
partial_decode_group.bench_function("actix", |b| {
|
||||||
|
b.iter(|| {
|
||||||
|
use actix_codec::Decoder as _;
|
||||||
|
|
||||||
|
let mut codec = actix_codec::LinesCodec::default();
|
||||||
|
let mut buf = BytesMut::with_capacity(PARTIAL_CHUNK.len() * PARTIAL_CHUNKS + 1);
|
||||||
|
|
||||||
|
for _ in 0..PARTIAL_CHUNKS {
|
||||||
|
buf.extend_from_slice(&PARTIAL_CHUNK);
|
||||||
|
assert!(codec.decode(&mut buf).unwrap().is_none());
|
||||||
|
}
|
||||||
|
|
||||||
|
buf.extend_from_slice(b"\n");
|
||||||
|
assert!(codec.decode(&mut buf).unwrap().is_some());
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
partial_decode_group.bench_function("tokio", |b| {
|
||||||
|
b.iter(|| {
|
||||||
|
use tokio_util::codec::Decoder as _;
|
||||||
|
|
||||||
|
let mut codec = tokio_util::codec::LinesCodec::new();
|
||||||
|
let mut buf = BytesMut::with_capacity(PARTIAL_CHUNK.len() * PARTIAL_CHUNKS + 1);
|
||||||
|
|
||||||
|
for _ in 0..PARTIAL_CHUNKS {
|
||||||
|
buf.extend_from_slice(&PARTIAL_CHUNK);
|
||||||
|
assert!(codec.decode(&mut buf).unwrap().is_none());
|
||||||
|
}
|
||||||
|
|
||||||
|
buf.extend_from_slice(b"\n");
|
||||||
|
assert!(codec.decode(&mut buf).unwrap().is_some());
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
partial_decode_group.finish();
|
||||||
|
|
||||||
let mut encode_group = c.benchmark_group("lines encode");
|
let mut encode_group = c.benchmark_group("lines encode");
|
||||||
|
|
||||||
encode_group.bench_function("actix", |b| {
|
encode_group.bench_function("actix", |b| {
|
||||||
|
|
|
||||||
|
|
@ -16,10 +16,19 @@ use super::{Decoder, Encoder};
|
||||||
/// [`LinesCodec::new_with_max_length`]. Without a length limit, the internal read buffer can grow
|
/// [`LinesCodec::new_with_max_length`]. Without a length limit, the internal read buffer can grow
|
||||||
/// without bound if a peer sends an unbounded amount of data without a `\n`, potentially leading
|
/// without bound if a peer sends an unbounded amount of data without a `\n`, potentially leading
|
||||||
/// to memory exhaustion (DoS).
|
/// to memory exhaustion (DoS).
|
||||||
|
///
|
||||||
|
/// # Direct `Decoder` Use
|
||||||
|
///
|
||||||
|
/// `LinesCodec` caches the index after the last byte it searched when [`Decoder::decode`] returns
|
||||||
|
/// `Ok(None)`. Callers that invoke `decode` directly must pass the same [`BytesMut`] with newly
|
||||||
|
/// read bytes appended. If the buffer is replaced or bytes before the cached offset are changed,
|
||||||
|
/// create a new `LinesCodec` before decoding the replacement buffer.
|
||||||
#[derive(Debug, Copy, Clone)]
|
#[derive(Debug, Copy, Clone)]
|
||||||
#[non_exhaustive]
|
#[non_exhaustive]
|
||||||
pub struct LinesCodec {
|
pub struct LinesCodec {
|
||||||
max_length: usize,
|
max_length: usize,
|
||||||
|
// Next byte index to examine for `\n` after a previous incomplete decode.
|
||||||
|
next_index: usize,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl LinesCodec {
|
impl LinesCodec {
|
||||||
|
|
@ -29,6 +38,7 @@ impl LinesCodec {
|
||||||
pub const fn new() -> Self {
|
pub const fn new() -> Self {
|
||||||
Self {
|
Self {
|
||||||
max_length: usize::MAX,
|
max_length: usize::MAX,
|
||||||
|
next_index: 0,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -40,7 +50,10 @@ impl LinesCodec {
|
||||||
/// Using a length limit is recommended when working with untrusted input to avoid unbounded
|
/// Using a length limit is recommended when working with untrusted input to avoid unbounded
|
||||||
/// buffering.
|
/// buffering.
|
||||||
pub const fn new_with_max_length(max_length: usize) -> Self {
|
pub const fn new_with_max_length(max_length: usize) -> Self {
|
||||||
Self { max_length }
|
Self {
|
||||||
|
max_length,
|
||||||
|
next_index: 0,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns the maximum permitted line length, in bytes.
|
/// Returns the maximum permitted line length, in bytes.
|
||||||
|
|
@ -74,39 +87,53 @@ impl Decoder for LinesCodec {
|
||||||
|
|
||||||
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
|
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
|
||||||
if src.is_empty() {
|
if src.is_empty() {
|
||||||
|
self.next_index = 0;
|
||||||
return Ok(None);
|
return Ok(None);
|
||||||
}
|
}
|
||||||
|
|
||||||
let len = match memchr(b'\n', src) {
|
let start = self.next_index.min(src.len());
|
||||||
Some(n) => n,
|
debug_assert!(
|
||||||
None => {
|
memchr(b'\n', &src[..start]).is_none(),
|
||||||
// No delimiter yet; if current buffered data already exceeds the maximum line
|
"LinesCodec buffer changed before cached search offset"
|
||||||
// length, abort to avoid unbounded memory growth.
|
);
|
||||||
let max = self.max_length;
|
|
||||||
let max_cr = max.saturating_add(1);
|
|
||||||
|
|
||||||
if src.len() > max && !(src.len() == max_cr && src.last() == Some(&b'\r')) {
|
let len = match memchr(b'\n', &src[start..]) {
|
||||||
return Err(io::Error::new(
|
Some(n) => start + n,
|
||||||
io::ErrorKind::InvalidData,
|
None => {
|
||||||
"max line length exceeded",
|
let max = self.max_length;
|
||||||
));
|
if max != usize::MAX {
|
||||||
|
// No delimiter yet; if current buffered data already exceeds the maximum line
|
||||||
|
// length, abort to avoid unbounded memory growth.
|
||||||
|
let max_cr = max.saturating_add(1);
|
||||||
|
|
||||||
|
if src.len() > max && !(src.len() == max_cr && src.last() == Some(&b'\r')) {
|
||||||
|
return Err(io::Error::new(
|
||||||
|
io::ErrorKind::InvalidData,
|
||||||
|
"max line length exceeded",
|
||||||
|
));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
self.next_index = src.len();
|
||||||
return Ok(None);
|
return Ok(None);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
// Reject overly long lines before splitting/advancing buffers.
|
// Reject overly long lines before splitting/advancing buffers.
|
||||||
let max = self.max_length;
|
let max = self.max_length;
|
||||||
let max_cr = max.saturating_add(1);
|
if max != usize::MAX {
|
||||||
|
let max_cr = max.saturating_add(1);
|
||||||
|
|
||||||
if len > max && !(len == max_cr && src.get(len - 1) == Some(&b'\r')) {
|
if len > max && !(len == max_cr && src.get(len - 1) == Some(&b'\r')) {
|
||||||
return Err(io::Error::new(
|
return Err(io::Error::new(
|
||||||
io::ErrorKind::InvalidData,
|
io::ErrorKind::InvalidData,
|
||||||
"max line length exceeded",
|
"max line length exceeded",
|
||||||
));
|
));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
self.next_index = 0;
|
||||||
|
|
||||||
// split up to new line char
|
// split up to new line char
|
||||||
let mut buf = src.split_to(len);
|
let mut buf = src.split_to(len);
|
||||||
debug_assert_eq!(len, buf.len());
|
debug_assert_eq!(len, buf.len());
|
||||||
|
|
@ -132,6 +159,8 @@ impl Decoder for LinesCodec {
|
||||||
Some(frame) => Ok(Some(frame)),
|
Some(frame) => Ok(Some(frame)),
|
||||||
None if src.is_empty() => Ok(None),
|
None if src.is_empty() => Ok(None),
|
||||||
None => {
|
None => {
|
||||||
|
self.next_index = 0;
|
||||||
|
|
||||||
let buf = match src.last() {
|
let buf = match src.last() {
|
||||||
// if last line ends in a CR then take everything up to it
|
// if last line ends in a CR then take everything up to it
|
||||||
Some(b'\r') => src.split_to(src.len() - 1),
|
Some(b'\r') => src.split_to(src.len() - 1),
|
||||||
|
|
@ -229,4 +258,76 @@ mod tests {
|
||||||
let err = codec.decode(&mut buf).unwrap_err();
|
let err = codec.decode(&mut buf).unwrap_err();
|
||||||
assert_eq!(err.kind(), io::ErrorKind::InvalidData);
|
assert_eq!(err.kind(), io::ErrorKind::InvalidData);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn lines_decoder_resumes_from_previous_search() {
|
||||||
|
let mut codec = LinesCodec::default();
|
||||||
|
let mut buf = BytesMut::from(&b"partial"[..]);
|
||||||
|
|
||||||
|
assert!(codec.decode(&mut buf).unwrap().is_none());
|
||||||
|
|
||||||
|
buf.put_slice(b" line\n");
|
||||||
|
assert_eq!("partial line", codec.decode(&mut buf).unwrap().unwrap());
|
||||||
|
assert!(codec.decode(&mut buf).unwrap().is_none());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn lines_decoder_resumes_across_multiple_partial_chunks() {
|
||||||
|
let mut codec = LinesCodec::default();
|
||||||
|
let mut buf = BytesMut::new();
|
||||||
|
|
||||||
|
buf.put_slice(b"partial");
|
||||||
|
assert!(codec.decode(&mut buf).unwrap().is_none());
|
||||||
|
|
||||||
|
buf.put_slice(b" line");
|
||||||
|
assert!(codec.decode(&mut buf).unwrap().is_none());
|
||||||
|
|
||||||
|
buf.put_slice(b" across chunks");
|
||||||
|
assert!(codec.decode(&mut buf).unwrap().is_none());
|
||||||
|
|
||||||
|
buf.put_slice(b"\n");
|
||||||
|
assert_eq!(
|
||||||
|
"partial line across chunks",
|
||||||
|
codec.decode(&mut buf).unwrap().unwrap()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn lines_decoder_resumes_with_max_length() {
|
||||||
|
let mut codec = LinesCodec::new_with_max_length(18);
|
||||||
|
let mut buf = BytesMut::new();
|
||||||
|
|
||||||
|
buf.put_slice(b"partial");
|
||||||
|
assert!(codec.decode(&mut buf).unwrap().is_none());
|
||||||
|
|
||||||
|
buf.put_slice(b" line");
|
||||||
|
assert!(codec.decode(&mut buf).unwrap().is_none());
|
||||||
|
|
||||||
|
buf.put_slice(b" ok\n");
|
||||||
|
assert_eq!("partial line ok", codec.decode(&mut buf).unwrap().unwrap());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn lines_decoder_errors_on_overlong_partial_line() {
|
||||||
|
let mut codec = LinesCodec::new_with_max_length(4);
|
||||||
|
let mut buf = BytesMut::from(&b"aa"[..]);
|
||||||
|
|
||||||
|
assert!(codec.decode(&mut buf).unwrap().is_none());
|
||||||
|
|
||||||
|
buf.put_slice(b"aaa");
|
||||||
|
let err = codec.decode(&mut buf).unwrap_err();
|
||||||
|
assert_eq!(err.kind(), io::ErrorKind::InvalidData);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn lines_decoder_resets_search_after_decode_eof() {
|
||||||
|
let mut codec = LinesCodec::default();
|
||||||
|
let mut buf = BytesMut::from(&b"partial"[..]);
|
||||||
|
|
||||||
|
assert!(codec.decode(&mut buf).unwrap().is_none());
|
||||||
|
assert_eq!("partial", codec.decode_eof(&mut buf).unwrap().unwrap());
|
||||||
|
|
||||||
|
buf.put_slice(b"next\n");
|
||||||
|
assert_eq!("next", codec.decode(&mut buf).unwrap().unwrap());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue