From 35cadbbe0b2be6cd0d5a665d2b2fa6e2973d91a8 Mon Sep 17 00:00:00 2001 From: asonix Date: Sat, 18 May 2024 12:30:19 -0500 Subject: [PATCH 01/13] actix-http: play with allowing bytes passthrough in the h1 encoder --- actix-http/src/h1/big_bytes.rs | 111 +++++++++++ actix-http/src/h1/codec.rs | 34 +++- actix-http/src/h1/dispatcher.rs | 38 ++-- actix-http/src/h1/encoder.rs | 323 +++++++++++++++++++++++++++++++- actix-http/src/h1/mod.rs | 1 + 5 files changed, 478 insertions(+), 29 deletions(-) create mode 100644 actix-http/src/h1/big_bytes.rs diff --git a/actix-http/src/h1/big_bytes.rs b/actix-http/src/h1/big_bytes.rs new file mode 100644 index 00000000..ef31a61d --- /dev/null +++ b/actix-http/src/h1/big_bytes.rs @@ -0,0 +1,111 @@ +use std::collections::VecDeque; + +use bytes::{Buf, BufMut, Bytes, BytesMut}; + +const SIXTYFOUR_KB: usize = 1024 * 64; + +pub(super) struct BigBytes { + buffer: BytesMut, + frozen: VecDeque, + frozen_len: usize, +} + +impl BigBytes { + pub(super) fn with_capacity(capacity: usize) -> Self { + Self { + buffer: BytesMut::with_capacity(capacity), + frozen: VecDeque::default(), + frozen_len: 0, + } + } + + // Clear the internal queue and buffer, resetting length to zero + pub(super) fn clear(&mut self) { + std::mem::take(&mut self.frozen); + self.frozen_len = 0; + self.buffer.clear(); + } + + // Return a mutable reference to the underlying buffer. This should only be used when dealing + // with small allocations (e.g. writing headers) + pub(super) fn buffer_mut(&mut self) -> &mut BytesMut { + &mut self.buffer + } + + // Reserve the requested size, if fewer than 64KB + pub(super) fn reserve(&mut self, count: usize) { + if count < SIXTYFOUR_KB { + self.buffer.reserve(count); + } + } + + pub(super) fn total_len(&mut self) -> usize { + self.frozen_len + self.buffer.len() + } + + pub(super) fn is_empty(&self) -> bool { + self.frozen_len == 0 && self.buffer.is_empty() + } + + // Add the `bytes` to the internal structure. If `bytes` exceeds 64KB, it is pushed into a + // queue, otherwise, it is added to a buffer. + pub(super) fn put_bytes(&mut self, bytes: Bytes) { + if bytes.len() < SIXTYFOUR_KB { + self.buffer.extend_from_slice(&bytes); + } else { + if !self.buffer.is_empty() { + let current = self.buffer.split().freeze(); + self.frozen_len += current.len(); + self.frozen.push_back(current); + } + + self.frozen_len += bytes.len(); + self.frozen.push_back(bytes); + } + } + + // Put a slice into the internal structure. This is always added to the internal buffer + pub(super) fn extend_from_slice(&mut self, slice: &[u8]) { + self.buffer.extend_from_slice(slice); + } + + // Returns a slice of the frontmost buffer + pub(super) fn front_slice(&self) -> &[u8] { + if let Some(front) = self.frozen.front() { + &front + } else { + &self.buffer + } + } + + // Advances the first buffer by `count` bytes. If the first buffer is advanced to completion, + // it is popped from the queue + pub(super) fn advance(&mut self, count: usize) { + if let Some(front) = self.frozen.front_mut() { + front.advance(count); + + if front.is_empty() { + self.frozen.pop_front(); + } + + self.frozen_len -= count; + } else { + self.buffer.advance(count); + } + } + + // Drain the BibBytes, writing everything into the provided BytesMut + pub(super) fn write_to(&mut self, dst: &mut BytesMut) { + dst.reserve(self.total_len()); + + for buf in &self.frozen { + dst.put_slice(buf); + } + + dst.put_slice(&self.buffer.split()); + + self.frozen_len = 0; + + std::mem::take(&mut self.frozen); + } +} diff --git a/actix-http/src/h1/codec.rs b/actix-http/src/h1/codec.rs index 2b452f8f..a6ac3f89 100644 --- a/actix-http/src/h1/codec.rs +++ b/actix-http/src/h1/codec.rs @@ -6,6 +6,7 @@ use http::{Method, Version}; use tokio_util::codec::{Decoder, Encoder}; use super::{ + big_bytes::BigBytes, decoder::{self, PayloadDecoder, PayloadItem, PayloadType}, encoder, Message, MessageType, }; @@ -146,14 +147,12 @@ impl Decoder for Codec { } } -impl Encoder, BodySize)>> for Codec { - type Error = io::Error; - - fn encode( +impl Codec { + pub(super) fn encode_bigbytes( &mut self, item: Message<(Response<()>, BodySize)>, - dst: &mut BytesMut, - ) -> Result<(), Self::Error> { + dst: &mut BigBytes, + ) -> std::io::Result<()> { match item { Message::Item((mut res, length)) => { // set response version @@ -171,7 +170,7 @@ impl Encoder, BodySize)>> for Codec { }; // encode message - self.encoder.encode( + self.encoder.encode_bigbytes( dst, &mut res, self.flags.contains(Flags::HEAD), @@ -184,11 +183,11 @@ impl Encoder, BodySize)>> for Codec { } Message::Chunk(Some(bytes)) => { - self.encoder.encode_chunk(bytes.as_ref(), dst)?; + self.encoder.encode_chunk_bigbytes(bytes, dst)?; } Message::Chunk(None) => { - self.encoder.encode_eof(dst)?; + self.encoder.encode_eof_bigbytes(dst)?; } } @@ -196,6 +195,23 @@ impl Encoder, BodySize)>> for Codec { } } +impl Encoder, BodySize)>> for Codec { + type Error = io::Error; + + fn encode( + &mut self, + item: Message<(Response<()>, BodySize)>, + dst: &mut BytesMut, + ) -> Result<(), Self::Error> { + let mut bigbytes = BigBytes::with_capacity(1024 * 8); + self.encode_bigbytes(item, &mut bigbytes)?; + + bigbytes.write_to(dst); + + Ok(()) + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/actix-http/src/h1/dispatcher.rs b/actix-http/src/h1/dispatcher.rs index 00b51360..30d8c61d 100644 --- a/actix-http/src/h1/dispatcher.rs +++ b/actix-http/src/h1/dispatcher.rs @@ -12,14 +12,15 @@ use actix_codec::{Framed, FramedParts}; use actix_rt::time::sleep_until; use actix_service::Service; use bitflags::bitflags; -use bytes::{Buf, BytesMut}; +use bytes::BytesMut; use futures_core::ready; use pin_project_lite::pin_project; use tokio::io::{AsyncRead, AsyncWrite}; -use tokio_util::codec::{Decoder as _, Encoder as _}; +use tokio_util::codec::Decoder as _; use tracing::{error, trace}; use super::{ + big_bytes::BigBytes, codec::Codec, decoder::MAX_BUFFER_SIZE, payload::{Payload, PayloadSender, PayloadStatus}, @@ -165,7 +166,7 @@ pin_project! { pub(super) io: Option, read_buf: BytesMut, - write_buf: BytesMut, + write_buf: BigBytes, codec: Codec, } } @@ -277,7 +278,7 @@ where io: Some(io), read_buf: BytesMut::with_capacity(HW_BUFFER_SIZE), - write_buf: BytesMut::with_capacity(HW_BUFFER_SIZE), + write_buf: BigBytes::with_capacity(HW_BUFFER_SIZE), codec: Codec::new(config), }, }, @@ -329,20 +330,17 @@ where let InnerDispatcherProj { io, write_buf, .. } = self.project(); let mut io = Pin::new(io.as_mut().unwrap()); - let len = write_buf.len(); - let mut written = 0; - - while written < len { - match io.as_mut().poll_write(cx, &write_buf[written..])? { + while write_buf.total_len() > 0 { + match io.as_mut().poll_write(cx, write_buf.front_slice())? { Poll::Ready(0) => { + println!("WRITE ZERO"); error!("write zero; closing"); return Poll::Ready(Err(io::Error::new(io::ErrorKind::WriteZero, ""))); } - Poll::Ready(n) => written += n, + Poll::Ready(n) => write_buf.advance(n), Poll::Pending => { - write_buf.advance(written); return Poll::Pending; } } @@ -365,7 +363,7 @@ where let size = body.size(); this.codec - .encode(Message::Item((res, size)), this.write_buf) + .encode_bigbytes(Message::Item((res, size)), this.write_buf) .map_err(|err| { if let Some(mut payload) = this.payload.take() { payload.set_error(PayloadError::Incomplete(None)); @@ -493,15 +491,16 @@ where StateProj::SendPayload { mut body } => { // keep populate writer buffer until buffer size limit hit, // get blocked or finished. - while this.write_buf.len() < super::payload::MAX_BUFFER_SIZE { + while this.write_buf.total_len() < super::payload::MAX_BUFFER_SIZE { match body.as_mut().poll_next(cx) { Poll::Ready(Some(Ok(item))) => { this.codec - .encode(Message::Chunk(Some(item)), this.write_buf)?; + .encode_bigbytes(Message::Chunk(Some(item)), this.write_buf)?; } Poll::Ready(None) => { - this.codec.encode(Message::Chunk(None), this.write_buf)?; + this.codec + .encode_bigbytes(Message::Chunk(None), this.write_buf)?; // payload stream finished. // set state to None and handle next message @@ -532,15 +531,16 @@ where // keep populate writer buffer until buffer size limit hit, // get blocked or finished. - while this.write_buf.len() < super::payload::MAX_BUFFER_SIZE { + while this.write_buf.total_len() < super::payload::MAX_BUFFER_SIZE { match body.as_mut().poll_next(cx) { Poll::Ready(Some(Ok(item))) => { this.codec - .encode(Message::Chunk(Some(item)), this.write_buf)?; + .encode_bigbytes(Message::Chunk(Some(item)), this.write_buf)?; } Poll::Ready(None) => { - this.codec.encode(Message::Chunk(None), this.write_buf)?; + this.codec + .encode_bigbytes(Message::Chunk(None), this.write_buf)?; // payload stream finished // set state to None and handle next message @@ -1027,7 +1027,7 @@ where mem::take(this.codec), mem::take(this.read_buf), ); - parts.write_buf = mem::take(this.write_buf); + this.write_buf.write_to(&mut parts.write_buf); let framed = Framed::from_parts(parts); this.flow.upgrade.as_ref().unwrap().call((req, framed)) } diff --git a/actix-http/src/h1/encoder.rs b/actix-http/src/h1/encoder.rs index abe396ce..90eef591 100644 --- a/actix-http/src/h1/encoder.rs +++ b/actix-http/src/h1/encoder.rs @@ -6,7 +6,7 @@ use std::{ slice::from_raw_parts_mut, }; -use bytes::{BufMut, BytesMut}; +use bytes::{BufMut, Bytes, BytesMut}; use crate::{ body::BodySize, @@ -16,6 +16,8 @@ use crate::{ helpers, ConnectionType, RequestHeadType, Response, ServiceConfig, StatusCode, Version, }; +use super::big_bytes::BigBytes; + const AVERAGE_HEADER_SIZE: usize = 30; #[derive(Debug)] @@ -49,8 +51,183 @@ pub(crate) trait MessageType: Sized { fn chunked(&self) -> bool; + fn encode_status_bigbytes(&mut self, dst: &mut BigBytes) -> io::Result<()>; fn encode_status(&mut self, dst: &mut BytesMut) -> io::Result<()>; + fn encode_headers_bigbytes( + &mut self, + dst: &mut BigBytes, + version: Version, + mut length: BodySize, + conn_type: ConnectionType, + config: &ServiceConfig, + ) -> io::Result<()> { + let chunked = self.chunked(); + let mut skip_len = length != BodySize::Stream; + let camel_case = self.camel_case(); + + // Content length + if let Some(status) = self.status() { + match status { + StatusCode::CONTINUE + | StatusCode::SWITCHING_PROTOCOLS + | StatusCode::PROCESSING + | StatusCode::NO_CONTENT => { + // skip content-length and transfer-encoding headers + // see https://datatracker.ietf.org/doc/html/rfc7230#section-3.3.1 + // and https://datatracker.ietf.org/doc/html/rfc7230#section-3.3.2 + skip_len = true; + length = BodySize::None + } + + StatusCode::NOT_MODIFIED => { + // 304 responses should never have a body but should retain a manually set + // content-length header + // see https://datatracker.ietf.org/doc/html/rfc7232#section-4.1 + skip_len = false; + length = BodySize::None; + } + + _ => {} + } + } + + match length { + BodySize::Stream => { + if chunked { + skip_len = true; + if camel_case { + dst.extend_from_slice(b"\r\nTransfer-Encoding: chunked\r\n") + } else { + dst.extend_from_slice(b"\r\ntransfer-encoding: chunked\r\n") + } + } else { + skip_len = false; + dst.extend_from_slice(b"\r\n"); + } + } + BodySize::Sized(0) if camel_case => dst.extend_from_slice(b"\r\nContent-Length: 0\r\n"), + BodySize::Sized(0) => dst.extend_from_slice(b"\r\ncontent-length: 0\r\n"), + BodySize::Sized(len) => { + helpers::write_content_length(len, dst.buffer_mut(), camel_case) + } + BodySize::None => dst.extend_from_slice(b"\r\n"), + } + + // Connection + match conn_type { + ConnectionType::Upgrade => dst.extend_from_slice(b"connection: upgrade\r\n"), + ConnectionType::KeepAlive if version < Version::HTTP_11 => { + if camel_case { + dst.extend_from_slice(b"Connection: keep-alive\r\n") + } else { + dst.extend_from_slice(b"connection: keep-alive\r\n") + } + } + ConnectionType::Close if version >= Version::HTTP_11 => { + if camel_case { + dst.extend_from_slice(b"Connection: close\r\n") + } else { + dst.extend_from_slice(b"connection: close\r\n") + } + } + _ => {} + } + + // write headers + + let mut has_date = false; + + let dst = dst.buffer_mut(); + + let mut buf = dst.chunk_mut().as_mut_ptr(); + let mut remaining = dst.capacity() - dst.len(); + + // tracks bytes written since last buffer resize + // since buf is a raw pointer to a bytes container storage but is written to without the + // container's knowledge, this is used to sync the containers cursor after data is written + let mut pos = 0; + + self.write_headers(|key, value| { + match *key { + CONNECTION => return, + TRANSFER_ENCODING | CONTENT_LENGTH if skip_len => return, + DATE => has_date = true, + _ => {} + } + + let k = key.as_str().as_bytes(); + let k_len = k.len(); + + for val in value.iter() { + let v = val.as_ref(); + let v_len = v.len(); + + // key length + value length + colon + space + \r\n + let len = k_len + v_len + 4; + + if len > remaining { + // SAFETY: all the bytes written up to position "pos" are initialized + // the written byte count and pointer advancement are kept in sync + unsafe { + dst.advance_mut(pos); + } + + pos = 0; + dst.reserve(len * 2); + remaining = dst.capacity() - dst.len(); + + // re-assign buf raw pointer since it's possible that the buffer was + // reallocated and/or resized + buf = dst.chunk_mut().as_mut_ptr(); + } + + // SAFETY: on each write, it is enough to ensure that the advancement of + // the cursor matches the number of bytes written + unsafe { + if camel_case { + // use Camel-Case headers + write_camel_case(k, buf, k_len); + } else { + write_data(k, buf, k_len); + } + + buf = buf.add(k_len); + + write_data(b": ", buf, 2); + buf = buf.add(2); + + write_data(v, buf, v_len); + buf = buf.add(v_len); + + write_data(b"\r\n", buf, 2); + buf = buf.add(2); + }; + + pos += len; + remaining -= len; + } + }); + + // final cursor synchronization with the bytes container + // + // SAFETY: all the bytes written up to position "pos" are initialized + // the written byte count and pointer advancement are kept in sync + unsafe { + dst.advance_mut(pos); + } + + if !has_date { + // optimized date header, write_date_header writes its own \r\n + config.write_date_header(dst, camel_case); + } + + // end-of-headers marker + dst.extend_from_slice(b"\r\n"); + + Ok(()) + } + fn encode_headers( &mut self, dst: &mut BytesMut, @@ -263,6 +440,17 @@ impl MessageType for Response<()> { .contains(crate::message::Flags::CAMEL_CASE) } + fn encode_status_bigbytes(&mut self, dst: &mut BigBytes) -> io::Result<()> { + let head = self.head(); + let reason = head.reason().as_bytes(); + dst.reserve(256 + head.headers.len() * AVERAGE_HEADER_SIZE + reason.len()); + + // status line + helpers::write_status_line(head.version, head.status.as_u16(), dst.buffer_mut()); + dst.extend_from_slice(reason); + Ok(()) + } + fn encode_status(&mut self, dst: &mut BytesMut) -> io::Result<()> { let head = self.head(); let reason = head.reason().as_bytes(); @@ -296,6 +484,26 @@ impl MessageType for RequestHeadType { self.extra_headers() } + fn encode_status_bigbytes(&mut self, dst: &mut BigBytes) -> io::Result<()> { + let head = self.as_ref(); + dst.reserve(256 + head.headers.len() * AVERAGE_HEADER_SIZE); + write!( + helpers::MutWriter(dst.buffer_mut()), + "{} {} {}", + head.method, + head.uri.path_and_query().map(|u| u.as_str()).unwrap_or("/"), + match head.version { + Version::HTTP_09 => "HTTP/0.9", + Version::HTTP_10 => "HTTP/1.0", + Version::HTTP_11 => "HTTP/1.1", + Version::HTTP_2 => "HTTP/2.0", + Version::HTTP_3 => "HTTP/3.0", + _ => return Err(io::Error::new(io::ErrorKind::Other, "unsupported version")), + } + ) + .map_err(|e| io::Error::new(io::ErrorKind::Other, e)) + } + fn encode_status(&mut self, dst: &mut BytesMut) -> io::Result<()> { let head = self.as_ref(); dst.reserve(256 + head.headers.len() * AVERAGE_HEADER_SIZE); @@ -323,11 +531,57 @@ impl MessageEncoder { self.te.encode(msg, buf) } + pub(super) fn encode_chunk_bigbytes( + &mut self, + msg: Bytes, + buf: &mut BigBytes, + ) -> io::Result { + self.te.encode_bigbytes(msg, buf) + } + /// Encode EOF. pub fn encode_eof(&mut self, buf: &mut BytesMut) -> io::Result<()> { self.te.encode_eof(buf) } + pub(super) fn encode_eof_bigbytes(&mut self, buf: &mut BigBytes) -> io::Result<()> { + self.te.encode_eof_bigbytes(buf) + } + + /// Encode message. + pub(super) fn encode_bigbytes( + &mut self, + dst: &mut BigBytes, + message: &mut T, + head: bool, + stream: bool, + version: Version, + length: BodySize, + conn_type: ConnectionType, + config: &ServiceConfig, + ) -> io::Result<()> { + // transfer encoding + if !head { + self.te = match length { + BodySize::Sized(0) => TransferEncoding::empty(), + BodySize::Sized(len) => TransferEncoding::length(len), + BodySize::Stream => { + if message.chunked() && !stream { + TransferEncoding::chunked() + } else { + TransferEncoding::eof() + } + } + BodySize::None => TransferEncoding::empty(), + }; + } else { + self.te = TransferEncoding::empty(); + } + + message.encode_status_bigbytes(dst)?; + message.encode_headers_bigbytes(dst, version, length, conn_type, config) + } + /// Encode message. pub fn encode( &mut self, @@ -414,6 +668,51 @@ impl TransferEncoding { } } + #[inline] + /// Encode message. Return `EOF` state of encoder + pub(super) fn encode_bigbytes(&mut self, msg: Bytes, buf: &mut BigBytes) -> io::Result { + match self.kind { + TransferEncodingKind::Eof => { + let eof = msg.is_empty(); + buf.put_bytes(msg); + Ok(eof) + } + TransferEncodingKind::Chunked(ref mut eof) => { + if *eof { + return Ok(true); + } + + if msg.is_empty() { + *eof = true; + buf.extend_from_slice(b"0\r\n\r\n"); + } else { + writeln!(helpers::MutWriter(buf.buffer_mut()), "{:X}\r", msg.len()) + .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; + + buf.reserve(msg.len() + 2); + buf.put_bytes(msg); + buf.extend_from_slice(b"\r\n"); + } + Ok(*eof) + } + TransferEncodingKind::Length(ref mut remaining) => { + if *remaining > 0 { + if msg.is_empty() { + return Ok(*remaining == 0); + } + let len = cmp::min(*remaining, msg.len() as u64); + + buf.put_bytes(msg.slice(..len as usize)); + + *remaining -= len; + Ok(*remaining == 0) + } else { + Ok(true) + } + } + } + } + /// Encode message. Return `EOF` state of encoder #[inline] pub fn encode(&mut self, msg: &[u8], buf: &mut BytesMut) -> io::Result { @@ -459,6 +758,28 @@ impl TransferEncoding { } } + /// Encode eof. Return `EOF` state of encoder + #[inline] + pub fn encode_eof_bigbytes(&mut self, buf: &mut BigBytes) -> io::Result<()> { + match self.kind { + TransferEncodingKind::Eof => Ok(()), + TransferEncodingKind::Length(rem) => { + if rem != 0 { + Err(io::Error::new(io::ErrorKind::UnexpectedEof, "")) + } else { + Ok(()) + } + } + TransferEncodingKind::Chunked(ref mut eof) => { + if !*eof { + *eof = true; + buf.extend_from_slice(b"0\r\n\r\n"); + } + Ok(()) + } + } + } + /// Encode eof. Return `EOF` state of encoder #[inline] pub fn encode_eof(&mut self, buf: &mut BytesMut) -> io::Result<()> { diff --git a/actix-http/src/h1/mod.rs b/actix-http/src/h1/mod.rs index 9e44608d..267b2012 100644 --- a/actix-http/src/h1/mod.rs +++ b/actix-http/src/h1/mod.rs @@ -2,6 +2,7 @@ use bytes::{Bytes, BytesMut}; +mod big_bytes; mod chunked; mod client; mod codec; From b492b27e4a7f4d604194592f90b822a7fc834e79 Mon Sep 17 00:00:00 2001 From: asonix Date: Sat, 18 May 2024 12:38:07 -0500 Subject: [PATCH 02/13] clippy --- actix-http/src/h1/big_bytes.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/actix-http/src/h1/big_bytes.rs b/actix-http/src/h1/big_bytes.rs index ef31a61d..2025916c 100644 --- a/actix-http/src/h1/big_bytes.rs +++ b/actix-http/src/h1/big_bytes.rs @@ -72,7 +72,7 @@ impl BigBytes { // Returns a slice of the frontmost buffer pub(super) fn front_slice(&self) -> &[u8] { if let Some(front) = self.frozen.front() { - &front + front } else { &self.buffer } From 09b460c72e68565cf809f83112241571cd8c4c5f Mon Sep 17 00:00:00 2001 From: asonix Date: Sat, 18 May 2024 12:53:41 -0500 Subject: [PATCH 03/13] Promote BigBytes from pub(super) to pub(crate) --- actix-http/src/h1/big_bytes.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/actix-http/src/h1/big_bytes.rs b/actix-http/src/h1/big_bytes.rs index 2025916c..755b42b9 100644 --- a/actix-http/src/h1/big_bytes.rs +++ b/actix-http/src/h1/big_bytes.rs @@ -4,7 +4,7 @@ use bytes::{Buf, BufMut, Bytes, BytesMut}; const SIXTYFOUR_KB: usize = 1024 * 64; -pub(super) struct BigBytes { +pub(crate) struct BigBytes { buffer: BytesMut, frozen: VecDeque, frozen_len: usize, From a6b5c9893de0054d1907b357f593a48a3135aa0d Mon Sep 17 00:00:00 2001 From: asonix Date: Sat, 18 May 2024 13:14:10 -0500 Subject: [PATCH 04/13] Make fmt lint happy --- actix-http/src/h1/encoder.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/actix-http/src/h1/encoder.rs b/actix-http/src/h1/encoder.rs index 90eef591..6fce4700 100644 --- a/actix-http/src/h1/encoder.rs +++ b/actix-http/src/h1/encoder.rs @@ -8,6 +8,7 @@ use std::{ use bytes::{BufMut, Bytes, BytesMut}; +use super::big_bytes::BigBytes; use crate::{ body::BodySize, header::{ @@ -16,8 +17,6 @@ use crate::{ helpers, ConnectionType, RequestHeadType, Response, ServiceConfig, StatusCode, Version, }; -use super::big_bytes::BigBytes; - const AVERAGE_HEADER_SIZE: usize = 30; #[derive(Debug)] From 96f5ebb549bfbcab812523d55127f3c0e1cf44a5 Mon Sep 17 00:00:00 2001 From: asonix Date: Sun, 19 May 2024 16:08:55 -0500 Subject: [PATCH 05/13] Simplify bigbytes - always put bytes into queue --- actix-http/src/h1/big_bytes.rs | 30 +--- actix-http/src/h1/codec.rs | 6 +- actix-http/src/h1/dispatcher.rs | 2 + actix-http/src/h1/encoder.rs | 271 +------------------------------- 4 files changed, 13 insertions(+), 296 deletions(-) diff --git a/actix-http/src/h1/big_bytes.rs b/actix-http/src/h1/big_bytes.rs index 755b42b9..63fb78f0 100644 --- a/actix-http/src/h1/big_bytes.rs +++ b/actix-http/src/h1/big_bytes.rs @@ -2,8 +2,6 @@ use std::collections::VecDeque; use bytes::{Buf, BufMut, Bytes, BytesMut}; -const SIXTYFOUR_KB: usize = 1024 * 64; - pub(crate) struct BigBytes { buffer: BytesMut, frozen: VecDeque, @@ -32,13 +30,6 @@ impl BigBytes { &mut self.buffer } - // Reserve the requested size, if fewer than 64KB - pub(super) fn reserve(&mut self, count: usize) { - if count < SIXTYFOUR_KB { - self.buffer.reserve(count); - } - } - pub(super) fn total_len(&mut self) -> usize { self.frozen_len + self.buffer.len() } @@ -50,23 +41,14 @@ impl BigBytes { // Add the `bytes` to the internal structure. If `bytes` exceeds 64KB, it is pushed into a // queue, otherwise, it is added to a buffer. pub(super) fn put_bytes(&mut self, bytes: Bytes) { - if bytes.len() < SIXTYFOUR_KB { - self.buffer.extend_from_slice(&bytes); - } else { - if !self.buffer.is_empty() { - let current = self.buffer.split().freeze(); - self.frozen_len += current.len(); - self.frozen.push_back(current); - } - - self.frozen_len += bytes.len(); - self.frozen.push_back(bytes); + if !self.buffer.is_empty() { + let current = self.buffer.split().freeze(); + self.frozen_len += current.len(); + self.frozen.push_back(current); } - } - // Put a slice into the internal structure. This is always added to the internal buffer - pub(super) fn extend_from_slice(&mut self, slice: &[u8]) { - self.buffer.extend_from_slice(slice); + self.frozen_len += bytes.len(); + self.frozen.push_back(bytes); } // Returns a slice of the frontmost buffer diff --git a/actix-http/src/h1/codec.rs b/actix-http/src/h1/codec.rs index a6ac3f89..a648bd49 100644 --- a/actix-http/src/h1/codec.rs +++ b/actix-http/src/h1/codec.rs @@ -170,8 +170,8 @@ impl Codec { }; // encode message - self.encoder.encode_bigbytes( - dst, + self.encoder.encode( + dst.buffer_mut(), &mut res, self.flags.contains(Flags::HEAD), self.flags.contains(Flags::STREAM), @@ -187,7 +187,7 @@ impl Codec { } Message::Chunk(None) => { - self.encoder.encode_eof_bigbytes(dst)?; + self.encoder.encode_eof(dst.buffer_mut())?; } } diff --git a/actix-http/src/h1/dispatcher.rs b/actix-http/src/h1/dispatcher.rs index 30d8c61d..64a5d460 100644 --- a/actix-http/src/h1/dispatcher.rs +++ b/actix-http/src/h1/dispatcher.rs @@ -414,6 +414,7 @@ where fn send_continue(self: Pin<&mut Self>) { self.project() .write_buf + .buffer_mut() .extend_from_slice(b"HTTP/1.1 100 Continue\r\n\r\n"); } @@ -575,6 +576,7 @@ where // to service call. Poll::Ready(Ok(req)) => { this.write_buf + .buffer_mut() .extend_from_slice(b"HTTP/1.1 100 Continue\r\n\r\n"); let fut = this.flow.service.call(req); this.state.set(State::ServiceCall { fut }); diff --git a/actix-http/src/h1/encoder.rs b/actix-http/src/h1/encoder.rs index 6fce4700..97ba0adf 100644 --- a/actix-http/src/h1/encoder.rs +++ b/actix-http/src/h1/encoder.rs @@ -50,183 +50,8 @@ pub(crate) trait MessageType: Sized { fn chunked(&self) -> bool; - fn encode_status_bigbytes(&mut self, dst: &mut BigBytes) -> io::Result<()>; fn encode_status(&mut self, dst: &mut BytesMut) -> io::Result<()>; - fn encode_headers_bigbytes( - &mut self, - dst: &mut BigBytes, - version: Version, - mut length: BodySize, - conn_type: ConnectionType, - config: &ServiceConfig, - ) -> io::Result<()> { - let chunked = self.chunked(); - let mut skip_len = length != BodySize::Stream; - let camel_case = self.camel_case(); - - // Content length - if let Some(status) = self.status() { - match status { - StatusCode::CONTINUE - | StatusCode::SWITCHING_PROTOCOLS - | StatusCode::PROCESSING - | StatusCode::NO_CONTENT => { - // skip content-length and transfer-encoding headers - // see https://datatracker.ietf.org/doc/html/rfc7230#section-3.3.1 - // and https://datatracker.ietf.org/doc/html/rfc7230#section-3.3.2 - skip_len = true; - length = BodySize::None - } - - StatusCode::NOT_MODIFIED => { - // 304 responses should never have a body but should retain a manually set - // content-length header - // see https://datatracker.ietf.org/doc/html/rfc7232#section-4.1 - skip_len = false; - length = BodySize::None; - } - - _ => {} - } - } - - match length { - BodySize::Stream => { - if chunked { - skip_len = true; - if camel_case { - dst.extend_from_slice(b"\r\nTransfer-Encoding: chunked\r\n") - } else { - dst.extend_from_slice(b"\r\ntransfer-encoding: chunked\r\n") - } - } else { - skip_len = false; - dst.extend_from_slice(b"\r\n"); - } - } - BodySize::Sized(0) if camel_case => dst.extend_from_slice(b"\r\nContent-Length: 0\r\n"), - BodySize::Sized(0) => dst.extend_from_slice(b"\r\ncontent-length: 0\r\n"), - BodySize::Sized(len) => { - helpers::write_content_length(len, dst.buffer_mut(), camel_case) - } - BodySize::None => dst.extend_from_slice(b"\r\n"), - } - - // Connection - match conn_type { - ConnectionType::Upgrade => dst.extend_from_slice(b"connection: upgrade\r\n"), - ConnectionType::KeepAlive if version < Version::HTTP_11 => { - if camel_case { - dst.extend_from_slice(b"Connection: keep-alive\r\n") - } else { - dst.extend_from_slice(b"connection: keep-alive\r\n") - } - } - ConnectionType::Close if version >= Version::HTTP_11 => { - if camel_case { - dst.extend_from_slice(b"Connection: close\r\n") - } else { - dst.extend_from_slice(b"connection: close\r\n") - } - } - _ => {} - } - - // write headers - - let mut has_date = false; - - let dst = dst.buffer_mut(); - - let mut buf = dst.chunk_mut().as_mut_ptr(); - let mut remaining = dst.capacity() - dst.len(); - - // tracks bytes written since last buffer resize - // since buf is a raw pointer to a bytes container storage but is written to without the - // container's knowledge, this is used to sync the containers cursor after data is written - let mut pos = 0; - - self.write_headers(|key, value| { - match *key { - CONNECTION => return, - TRANSFER_ENCODING | CONTENT_LENGTH if skip_len => return, - DATE => has_date = true, - _ => {} - } - - let k = key.as_str().as_bytes(); - let k_len = k.len(); - - for val in value.iter() { - let v = val.as_ref(); - let v_len = v.len(); - - // key length + value length + colon + space + \r\n - let len = k_len + v_len + 4; - - if len > remaining { - // SAFETY: all the bytes written up to position "pos" are initialized - // the written byte count and pointer advancement are kept in sync - unsafe { - dst.advance_mut(pos); - } - - pos = 0; - dst.reserve(len * 2); - remaining = dst.capacity() - dst.len(); - - // re-assign buf raw pointer since it's possible that the buffer was - // reallocated and/or resized - buf = dst.chunk_mut().as_mut_ptr(); - } - - // SAFETY: on each write, it is enough to ensure that the advancement of - // the cursor matches the number of bytes written - unsafe { - if camel_case { - // use Camel-Case headers - write_camel_case(k, buf, k_len); - } else { - write_data(k, buf, k_len); - } - - buf = buf.add(k_len); - - write_data(b": ", buf, 2); - buf = buf.add(2); - - write_data(v, buf, v_len); - buf = buf.add(v_len); - - write_data(b"\r\n", buf, 2); - buf = buf.add(2); - }; - - pos += len; - remaining -= len; - } - }); - - // final cursor synchronization with the bytes container - // - // SAFETY: all the bytes written up to position "pos" are initialized - // the written byte count and pointer advancement are kept in sync - unsafe { - dst.advance_mut(pos); - } - - if !has_date { - // optimized date header, write_date_header writes its own \r\n - config.write_date_header(dst, camel_case); - } - - // end-of-headers marker - dst.extend_from_slice(b"\r\n"); - - Ok(()) - } - fn encode_headers( &mut self, dst: &mut BytesMut, @@ -439,17 +264,6 @@ impl MessageType for Response<()> { .contains(crate::message::Flags::CAMEL_CASE) } - fn encode_status_bigbytes(&mut self, dst: &mut BigBytes) -> io::Result<()> { - let head = self.head(); - let reason = head.reason().as_bytes(); - dst.reserve(256 + head.headers.len() * AVERAGE_HEADER_SIZE + reason.len()); - - // status line - helpers::write_status_line(head.version, head.status.as_u16(), dst.buffer_mut()); - dst.extend_from_slice(reason); - Ok(()) - } - fn encode_status(&mut self, dst: &mut BytesMut) -> io::Result<()> { let head = self.head(); let reason = head.reason().as_bytes(); @@ -483,26 +297,6 @@ impl MessageType for RequestHeadType { self.extra_headers() } - fn encode_status_bigbytes(&mut self, dst: &mut BigBytes) -> io::Result<()> { - let head = self.as_ref(); - dst.reserve(256 + head.headers.len() * AVERAGE_HEADER_SIZE); - write!( - helpers::MutWriter(dst.buffer_mut()), - "{} {} {}", - head.method, - head.uri.path_and_query().map(|u| u.as_str()).unwrap_or("/"), - match head.version { - Version::HTTP_09 => "HTTP/0.9", - Version::HTTP_10 => "HTTP/1.0", - Version::HTTP_11 => "HTTP/1.1", - Version::HTTP_2 => "HTTP/2.0", - Version::HTTP_3 => "HTTP/3.0", - _ => return Err(io::Error::new(io::ErrorKind::Other, "unsupported version")), - } - ) - .map_err(|e| io::Error::new(io::ErrorKind::Other, e)) - } - fn encode_status(&mut self, dst: &mut BytesMut) -> io::Result<()> { let head = self.as_ref(); dst.reserve(256 + head.headers.len() * AVERAGE_HEADER_SIZE); @@ -543,44 +337,6 @@ impl MessageEncoder { self.te.encode_eof(buf) } - pub(super) fn encode_eof_bigbytes(&mut self, buf: &mut BigBytes) -> io::Result<()> { - self.te.encode_eof_bigbytes(buf) - } - - /// Encode message. - pub(super) fn encode_bigbytes( - &mut self, - dst: &mut BigBytes, - message: &mut T, - head: bool, - stream: bool, - version: Version, - length: BodySize, - conn_type: ConnectionType, - config: &ServiceConfig, - ) -> io::Result<()> { - // transfer encoding - if !head { - self.te = match length { - BodySize::Sized(0) => TransferEncoding::empty(), - BodySize::Sized(len) => TransferEncoding::length(len), - BodySize::Stream => { - if message.chunked() && !stream { - TransferEncoding::chunked() - } else { - TransferEncoding::eof() - } - } - BodySize::None => TransferEncoding::empty(), - }; - } else { - self.te = TransferEncoding::empty(); - } - - message.encode_status_bigbytes(dst)?; - message.encode_headers_bigbytes(dst, version, length, conn_type, config) - } - /// Encode message. pub fn encode( &mut self, @@ -683,14 +439,13 @@ impl TransferEncoding { if msg.is_empty() { *eof = true; - buf.extend_from_slice(b"0\r\n\r\n"); + buf.buffer_mut().extend_from_slice(b"0\r\n\r\n"); } else { writeln!(helpers::MutWriter(buf.buffer_mut()), "{:X}\r", msg.len()) .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; - buf.reserve(msg.len() + 2); buf.put_bytes(msg); - buf.extend_from_slice(b"\r\n"); + buf.buffer_mut().extend_from_slice(b"\r\n"); } Ok(*eof) } @@ -757,28 +512,6 @@ impl TransferEncoding { } } - /// Encode eof. Return `EOF` state of encoder - #[inline] - pub fn encode_eof_bigbytes(&mut self, buf: &mut BigBytes) -> io::Result<()> { - match self.kind { - TransferEncodingKind::Eof => Ok(()), - TransferEncodingKind::Length(rem) => { - if rem != 0 { - Err(io::Error::new(io::ErrorKind::UnexpectedEof, "")) - } else { - Ok(()) - } - } - TransferEncodingKind::Chunked(ref mut eof) => { - if !*eof { - *eof = true; - buf.extend_from_slice(b"0\r\n\r\n"); - } - Ok(()) - } - } - } - /// Encode eof. Return `EOF` state of encoder #[inline] pub fn encode_eof(&mut self, buf: &mut BytesMut) -> io::Result<()> { From 8eb1d10bae954b95c3e3592b00652a7d5e2b3c1c Mon Sep 17 00:00:00 2001 From: asonix Date: Sun, 19 May 2024 16:17:44 -0500 Subject: [PATCH 06/13] Don't add empty bytes to queue --- actix-http/src/h1/big_bytes.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/actix-http/src/h1/big_bytes.rs b/actix-http/src/h1/big_bytes.rs index 63fb78f0..b845d917 100644 --- a/actix-http/src/h1/big_bytes.rs +++ b/actix-http/src/h1/big_bytes.rs @@ -47,8 +47,10 @@ impl BigBytes { self.frozen.push_back(current); } - self.frozen_len += bytes.len(); - self.frozen.push_back(bytes); + if !bytes.is_empty() { + self.frozen_len += bytes.len(); + self.frozen.push_back(bytes); + } } // Returns a slice of the frontmost buffer From 42bd5eebdb0b55ad388e4a064e653da98ca6028a Mon Sep 17 00:00:00 2001 From: asonix Date: Sun, 19 May 2024 16:34:43 -0500 Subject: [PATCH 07/13] Add buffer heuristic --- actix-http/src/h1/encoder.rs | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/actix-http/src/h1/encoder.rs b/actix-http/src/h1/encoder.rs index 97ba0adf..75aa8a82 100644 --- a/actix-http/src/h1/encoder.rs +++ b/actix-http/src/h1/encoder.rs @@ -429,7 +429,11 @@ impl TransferEncoding { match self.kind { TransferEncodingKind::Eof => { let eof = msg.is_empty(); - buf.put_bytes(msg); + if msg.len() > 1024 * 64 { + buf.put_bytes(msg); + } else { + buf.buffer_mut().extend_from_slice(&msg); + } Ok(eof) } TransferEncodingKind::Chunked(ref mut eof) => { @@ -444,7 +448,12 @@ impl TransferEncoding { writeln!(helpers::MutWriter(buf.buffer_mut()), "{:X}\r", msg.len()) .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; - buf.put_bytes(msg); + if msg.len() > 1024 * 64 { + buf.put_bytes(msg); + } else { + buf.buffer_mut().reserve(msg.len() + 2); + buf.buffer_mut().extend_from_slice(&msg); + } buf.buffer_mut().extend_from_slice(b"\r\n"); } Ok(*eof) @@ -456,7 +465,11 @@ impl TransferEncoding { } let len = cmp::min(*remaining, msg.len() as u64); - buf.put_bytes(msg.slice(..len as usize)); + if len > 1024 * 64 { + buf.put_bytes(msg.slice(..len as usize)); + } else { + buf.buffer_mut().extend_from_slice(&msg[..len as usize]); + } *remaining -= len; Ok(*remaining == 0) From a94b5b89a566bdce2c727beb9b710634202af4c2 Mon Sep 17 00:00:00 2001 From: asonix Date: Sun, 19 May 2024 16:52:59 -0500 Subject: [PATCH 08/13] Add other endpoints to actix web example --- actix-http/examples/actix-web.rs | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/actix-http/examples/actix-web.rs b/actix-http/examples/actix-web.rs index 449e5899..75578e51 100644 --- a/actix-http/examples/actix-web.rs +++ b/actix-http/examples/actix-web.rs @@ -1,19 +1,33 @@ +use std::sync::OnceLock; + use actix_http::HttpService; use actix_server::Server; use actix_service::map_config; use actix_web::{dev::AppConfig, get, App}; +static LARGE: OnceLock = OnceLock::new(); + #[get("/")] async fn index() -> &'static str { "Hello, world. From Actix Web!" } +#[get("/large")] +async fn large() -> &'static str { + LARGE.get_or_init(|| "123456890".repeat(1024 * 10)) +} + +#[get("/medium")] +async fn medium() -> &'static str { + LARGE.get_or_init(|| "123456890".repeat(1024 * 5)) +} + #[tokio::main(flavor = "current_thread")] async fn main() -> std::io::Result<()> { Server::build() .bind("hello-world", "127.0.0.1:8080", || { // construct actix-web app - let app = App::new().service(index); + let app = App::new().service(index).service(large).service(medium); HttpService::build() // pass the app to service builder From 69ca0e7c572b212da2f35295c88494fc9c2aa4f2 Mon Sep 17 00:00:00 2001 From: asonix Date: Sun, 19 May 2024 17:06:05 -0500 Subject: [PATCH 09/13] Separate medium & large, make large bigger --- actix-http/examples/actix-web.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/actix-http/examples/actix-web.rs b/actix-http/examples/actix-web.rs index 75578e51..b1feffbd 100644 --- a/actix-http/examples/actix-web.rs +++ b/actix-http/examples/actix-web.rs @@ -5,6 +5,7 @@ use actix_server::Server; use actix_service::map_config; use actix_web::{dev::AppConfig, get, App}; +static MEDIUM: OnceLock = OnceLock::new(); static LARGE: OnceLock = OnceLock::new(); #[get("/")] @@ -14,12 +15,12 @@ async fn index() -> &'static str { #[get("/large")] async fn large() -> &'static str { - LARGE.get_or_init(|| "123456890".repeat(1024 * 10)) + LARGE.get_or_init(|| "123456890".repeat(1024 * 100)) } #[get("/medium")] async fn medium() -> &'static str { - LARGE.get_or_init(|| "123456890".repeat(1024 * 5)) + MEDIUM.get_or_init(|| "123456890".repeat(1024 * 5)) } #[tokio::main(flavor = "current_thread")] From 94c5d4d641cd7073fab05991fac2fe132808de88 Mon Sep 17 00:00:00 2001 From: asonix Date: Sun, 19 May 2024 19:24:37 -0500 Subject: [PATCH 10/13] Drop buffers in clear if 'too big' --- actix-http/src/h1/big_bytes.rs | 12 +++++++++++- actix-http/src/h1/dispatcher.rs | 2 +- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/actix-http/src/h1/big_bytes.rs b/actix-http/src/h1/big_bytes.rs index b845d917..04ae8a6c 100644 --- a/actix-http/src/h1/big_bytes.rs +++ b/actix-http/src/h1/big_bytes.rs @@ -2,6 +2,9 @@ use std::collections::VecDeque; use bytes::{Buf, BufMut, Bytes, BytesMut}; +// 64KB max capacity (arbitrarily chosen) +const MAX_CAPACITY: usize = 1024 * 64; + pub(crate) struct BigBytes { buffer: BytesMut, frozen: VecDeque, @@ -18,10 +21,17 @@ impl BigBytes { } // Clear the internal queue and buffer, resetting length to zero - pub(super) fn clear(&mut self) { + // + // if the internal buffer capacity exceeds 64KB or new_capacity, whichever is greater, it will + // be freed and a new buffer of capacity `new_capacity` will be allocated + pub(super) fn clear(&mut self, new_capacity: usize) { std::mem::take(&mut self.frozen); self.frozen_len = 0; self.buffer.clear(); + + if self.buffer.capacity() > new_capacity.max(MAX_CAPACITY) { + self.buffer = BytesMut::with_capacity(new_capacity); + } } // Return a mutable reference to the underlying buffer. This should only be used when dealing diff --git a/actix-http/src/h1/dispatcher.rs b/actix-http/src/h1/dispatcher.rs index 64a5d460..d1bff8db 100644 --- a/actix-http/src/h1/dispatcher.rs +++ b/actix-http/src/h1/dispatcher.rs @@ -347,7 +347,7 @@ where } // everything has written to I/O; clear buffer - write_buf.clear(); + write_buf.clear(HW_BUFFER_SIZE); // flush the I/O and check if get blocked io.poll_flush(cx) From 627d11332379f4e718ceee5d7ddde76171f2a6f3 Mon Sep 17 00:00:00 2001 From: asonix Date: Mon, 4 Nov 2024 13:06:47 -0600 Subject: [PATCH 11/13] Expand BigBytes usage to ws encoding --- actix-http/src/big_bytes.rs | 124 ++++++++++++++++++++++++++++++++ actix-http/src/h1/big_bytes.rs | 105 --------------------------- actix-http/src/h1/codec.rs | 6 +- actix-http/src/h1/dispatcher.rs | 2 +- actix-http/src/h1/encoder.rs | 2 +- actix-http/src/h1/mod.rs | 1 - actix-http/src/lib.rs | 1 + actix-http/src/ws/codec.rs | 60 ++++++++++------ actix-http/src/ws/frame.rs | 79 ++++++++++++++------ 9 files changed, 227 insertions(+), 153 deletions(-) create mode 100644 actix-http/src/big_bytes.rs delete mode 100644 actix-http/src/h1/big_bytes.rs diff --git a/actix-http/src/big_bytes.rs b/actix-http/src/big_bytes.rs new file mode 100644 index 00000000..49839ab4 --- /dev/null +++ b/actix-http/src/big_bytes.rs @@ -0,0 +1,124 @@ +use std::collections::VecDeque; + +use bytes::{Buf, BufMut, Bytes, BytesMut}; + +// 64KB max capacity (arbitrarily chosen) +const MAX_CAPACITY: usize = 1024 * 64; + +pub struct BigBytes { + buffer: BytesMut, + frozen: VecDeque, + frozen_len: usize, +} + +impl BigBytes { + /// Initialize a new BigBytes with the internal buffer set to `capacity` capacity + pub fn with_capacity(capacity: usize) -> Self { + Self { + buffer: BytesMut::with_capacity(capacity), + frozen: VecDeque::default(), + frozen_len: 0, + } + } + + /// Clear the internal queue and buffer, resetting length to zero + /// + /// if the internal buffer capacity exceeds 64KB or new_capacity, whichever is greater, it will + /// be freed and a new buffer of capacity `new_capacity` will be allocated + pub fn clear(&mut self, new_capacity: usize) { + std::mem::take(&mut self.frozen); + self.frozen_len = 0; + self.buffer.clear(); + + if self.buffer.capacity() > new_capacity.max(MAX_CAPACITY) { + self.buffer = BytesMut::with_capacity(new_capacity); + } + } + + /// Return a mutable reference to the underlying buffer. This should only be used when dealing + /// with small allocations (e.g. writing headers) + pub fn buffer_mut(&mut self) -> &mut BytesMut { + &mut self.buffer + } + + /// Return the total length of the bytes stored in BigBytes + pub fn total_len(&mut self) -> usize { + self.frozen_len + self.buffer.len() + } + + /// Return whether there are no bytes present in the BigBytes + pub fn is_empty(&self) -> bool { + self.frozen_len == 0 && self.buffer.is_empty() + } + + /// Add the `bytes` to the internal structure. If `bytes` exceeds 64KB, it is pushed into a + /// queue, otherwise, it is added to a buffer. + pub fn put_bytes(&mut self, bytes: Bytes) { + if !self.buffer.is_empty() { + let current = self.buffer.split().freeze(); + self.frozen_len += current.len(); + self.frozen.push_back(current); + } + + if !bytes.is_empty() { + self.frozen_len += bytes.len(); + self.frozen.push_back(bytes); + } + } + + /// Returns a slice of the frontmost buffer + /// + /// While there are bytes present in BigBytes, front_slice is guaranteed not to return an empty + /// slice. + pub fn front_slice(&self) -> &[u8] { + if let Some(front) = self.frozen.front() { + front + } else { + &self.buffer + } + } + + /// Advances the first buffer by `count` bytes. If the first buffer is advanced to completion, + /// it is popped from the queue + pub fn advance(&mut self, count: usize) { + if let Some(front) = self.frozen.front_mut() { + front.advance(count); + + if front.is_empty() { + self.frozen.pop_front(); + } + + self.frozen_len -= count; + } else { + self.buffer.advance(count); + } + } + + /// Pops the front Bytes from the BigBytes, or splits and freezes the internal buffer if no + /// Bytes are present. + pub fn pop_front(&mut self) -> Option { + if let Some(front) = self.frozen.pop_front() { + self.frozen_len -= front.len(); + Some(front) + } else if !self.buffer.is_empty() { + Some(self.buffer.split().freeze()) + } else { + None + } + } + + /// Drain the BigBytes, writing everything into the provided BytesMut + pub fn write_to(&mut self, dst: &mut BytesMut) { + dst.reserve(self.total_len()); + + for buf in &self.frozen { + dst.put_slice(buf); + } + + dst.put_slice(&self.buffer.split()); + + self.frozen_len = 0; + + std::mem::take(&mut self.frozen); + } +} diff --git a/actix-http/src/h1/big_bytes.rs b/actix-http/src/h1/big_bytes.rs deleted file mode 100644 index 04ae8a6c..00000000 --- a/actix-http/src/h1/big_bytes.rs +++ /dev/null @@ -1,105 +0,0 @@ -use std::collections::VecDeque; - -use bytes::{Buf, BufMut, Bytes, BytesMut}; - -// 64KB max capacity (arbitrarily chosen) -const MAX_CAPACITY: usize = 1024 * 64; - -pub(crate) struct BigBytes { - buffer: BytesMut, - frozen: VecDeque, - frozen_len: usize, -} - -impl BigBytes { - pub(super) fn with_capacity(capacity: usize) -> Self { - Self { - buffer: BytesMut::with_capacity(capacity), - frozen: VecDeque::default(), - frozen_len: 0, - } - } - - // Clear the internal queue and buffer, resetting length to zero - // - // if the internal buffer capacity exceeds 64KB or new_capacity, whichever is greater, it will - // be freed and a new buffer of capacity `new_capacity` will be allocated - pub(super) fn clear(&mut self, new_capacity: usize) { - std::mem::take(&mut self.frozen); - self.frozen_len = 0; - self.buffer.clear(); - - if self.buffer.capacity() > new_capacity.max(MAX_CAPACITY) { - self.buffer = BytesMut::with_capacity(new_capacity); - } - } - - // Return a mutable reference to the underlying buffer. This should only be used when dealing - // with small allocations (e.g. writing headers) - pub(super) fn buffer_mut(&mut self) -> &mut BytesMut { - &mut self.buffer - } - - pub(super) fn total_len(&mut self) -> usize { - self.frozen_len + self.buffer.len() - } - - pub(super) fn is_empty(&self) -> bool { - self.frozen_len == 0 && self.buffer.is_empty() - } - - // Add the `bytes` to the internal structure. If `bytes` exceeds 64KB, it is pushed into a - // queue, otherwise, it is added to a buffer. - pub(super) fn put_bytes(&mut self, bytes: Bytes) { - if !self.buffer.is_empty() { - let current = self.buffer.split().freeze(); - self.frozen_len += current.len(); - self.frozen.push_back(current); - } - - if !bytes.is_empty() { - self.frozen_len += bytes.len(); - self.frozen.push_back(bytes); - } - } - - // Returns a slice of the frontmost buffer - pub(super) fn front_slice(&self) -> &[u8] { - if let Some(front) = self.frozen.front() { - front - } else { - &self.buffer - } - } - - // Advances the first buffer by `count` bytes. If the first buffer is advanced to completion, - // it is popped from the queue - pub(super) fn advance(&mut self, count: usize) { - if let Some(front) = self.frozen.front_mut() { - front.advance(count); - - if front.is_empty() { - self.frozen.pop_front(); - } - - self.frozen_len -= count; - } else { - self.buffer.advance(count); - } - } - - // Drain the BibBytes, writing everything into the provided BytesMut - pub(super) fn write_to(&mut self, dst: &mut BytesMut) { - dst.reserve(self.total_len()); - - for buf in &self.frozen { - dst.put_slice(buf); - } - - dst.put_slice(&self.buffer.split()); - - self.frozen_len = 0; - - std::mem::take(&mut self.frozen); - } -} diff --git a/actix-http/src/h1/codec.rs b/actix-http/src/h1/codec.rs index a648bd49..b097ddf9 100644 --- a/actix-http/src/h1/codec.rs +++ b/actix-http/src/h1/codec.rs @@ -6,11 +6,13 @@ use http::{Method, Version}; use tokio_util::codec::{Decoder, Encoder}; use super::{ - big_bytes::BigBytes, decoder::{self, PayloadDecoder, PayloadItem, PayloadType}, encoder, Message, MessageType, }; -use crate::{body::BodySize, error::ParseError, ConnectionType, Request, Response, ServiceConfig}; +use crate::{ + big_bytes::BigBytes, body::BodySize, error::ParseError, ConnectionType, Request, Response, + ServiceConfig, +}; bitflags! { #[derive(Debug, Clone, Copy)] diff --git a/actix-http/src/h1/dispatcher.rs b/actix-http/src/h1/dispatcher.rs index d1bff8db..40b47293 100644 --- a/actix-http/src/h1/dispatcher.rs +++ b/actix-http/src/h1/dispatcher.rs @@ -20,7 +20,6 @@ use tokio_util::codec::Decoder as _; use tracing::{error, trace}; use super::{ - big_bytes::BigBytes, codec::Codec, decoder::MAX_BUFFER_SIZE, payload::{Payload, PayloadSender, PayloadStatus}, @@ -28,6 +27,7 @@ use super::{ Message, MessageType, }; use crate::{ + big_bytes::BigBytes, body::{BodySize, BoxBody, MessageBody}, config::ServiceConfig, error::{DispatchError, ParseError, PayloadError}, diff --git a/actix-http/src/h1/encoder.rs b/actix-http/src/h1/encoder.rs index 75aa8a82..42d7d677 100644 --- a/actix-http/src/h1/encoder.rs +++ b/actix-http/src/h1/encoder.rs @@ -8,8 +8,8 @@ use std::{ use bytes::{BufMut, Bytes, BytesMut}; -use super::big_bytes::BigBytes; use crate::{ + big_bytes::BigBytes, body::BodySize, header::{ map::Value, HeaderMap, HeaderName, CONNECTION, CONTENT_LENGTH, DATE, TRANSFER_ENCODING, diff --git a/actix-http/src/h1/mod.rs b/actix-http/src/h1/mod.rs index 267b2012..9e44608d 100644 --- a/actix-http/src/h1/mod.rs +++ b/actix-http/src/h1/mod.rs @@ -2,7 +2,6 @@ use bytes::{Bytes, BytesMut}; -mod big_bytes; mod chunked; mod client; mod codec; diff --git a/actix-http/src/lib.rs b/actix-http/src/lib.rs index f9697c4d..5d287a17 100644 --- a/actix-http/src/lib.rs +++ b/actix-http/src/lib.rs @@ -33,6 +33,7 @@ pub use http::{uri, uri::Uri, Method, StatusCode, Version}; +pub mod big_bytes; pub mod body; mod builder; mod config; diff --git a/actix-http/src/ws/codec.rs b/actix-http/src/ws/codec.rs index ad487e40..0a2c8996 100644 --- a/actix-http/src/ws/codec.rs +++ b/actix-http/src/ws/codec.rs @@ -4,6 +4,8 @@ use bytestring::ByteString; use tokio_util::codec::{Decoder, Encoder}; use tracing::error; +use crate::big_bytes::BigBytes; + use super::{ frame::Parser, proto::{CloseReason, OpCode}, @@ -116,51 +118,55 @@ impl Default for Codec { } } -impl Encoder for Codec { - type Error = ProtocolError; - - fn encode(&mut self, item: Message, dst: &mut BytesMut) -> Result<(), Self::Error> { +impl Codec { + pub fn encode_bigbytes( + &mut self, + item: Message, + dst: &mut BigBytes, + ) -> Result<(), ProtocolError> { match item { - Message::Text(txt) => Parser::write_message( + Message::Text(txt) => Parser::write_message_bigbytes( dst, - txt, + txt.into_bytes(), OpCode::Text, true, !self.flags.contains(Flags::SERVER), ), - Message::Binary(bin) => Parser::write_message( + Message::Binary(bin) => Parser::write_message_bigbytes( dst, bin, OpCode::Binary, true, !self.flags.contains(Flags::SERVER), ), - Message::Ping(txt) => Parser::write_message( + Message::Ping(txt) => Parser::write_message_bigbytes( dst, txt, OpCode::Ping, true, !self.flags.contains(Flags::SERVER), ), - Message::Pong(txt) => Parser::write_message( + Message::Pong(txt) => Parser::write_message_bigbytes( dst, txt, OpCode::Pong, true, !self.flags.contains(Flags::SERVER), ), - Message::Close(reason) => { - Parser::write_close(dst, reason, !self.flags.contains(Flags::SERVER)) - } + Message::Close(reason) => Parser::write_close( + dst.buffer_mut(), + reason, + !self.flags.contains(Flags::SERVER), + ), Message::Continuation(cont) => match cont { Item::FirstText(data) => { if self.flags.contains(Flags::W_CONTINUATION) { return Err(ProtocolError::ContinuationStarted); } else { self.flags.insert(Flags::W_CONTINUATION); - Parser::write_message( + Parser::write_message_bigbytes( dst, - &data[..], + data, OpCode::Text, false, !self.flags.contains(Flags::SERVER), @@ -172,9 +178,9 @@ impl Encoder for Codec { return Err(ProtocolError::ContinuationStarted); } else { self.flags.insert(Flags::W_CONTINUATION); - Parser::write_message( + Parser::write_message_bigbytes( dst, - &data[..], + data, OpCode::Binary, false, !self.flags.contains(Flags::SERVER), @@ -183,9 +189,9 @@ impl Encoder for Codec { } Item::Continue(data) => { if self.flags.contains(Flags::W_CONTINUATION) { - Parser::write_message( + Parser::write_message_bigbytes( dst, - &data[..], + data, OpCode::Continue, false, !self.flags.contains(Flags::SERVER), @@ -197,9 +203,9 @@ impl Encoder for Codec { Item::Last(data) => { if self.flags.contains(Flags::W_CONTINUATION) { self.flags.remove(Flags::W_CONTINUATION); - Parser::write_message( + Parser::write_message_bigbytes( dst, - &data[..], + data, OpCode::Continue, true, !self.flags.contains(Flags::SERVER), @@ -215,6 +221,20 @@ impl Encoder for Codec { } } +impl Encoder for Codec { + type Error = ProtocolError; + + fn encode(&mut self, item: Message, dst: &mut BytesMut) -> Result<(), Self::Error> { + let mut big_bytes = BigBytes::with_capacity(0); + + self.encode_bigbytes(item, &mut big_bytes)?; + + big_bytes.write_to(dst); + + Ok(()) + } +} + impl Decoder for Codec { type Item = Frame; type Error = ProtocolError; diff --git a/actix-http/src/ws/frame.rs b/actix-http/src/ws/frame.rs index c9fb0cde..a70063a4 100644 --- a/actix-http/src/ws/frame.rs +++ b/actix-http/src/ws/frame.rs @@ -1,8 +1,10 @@ use std::cmp::min; -use bytes::{Buf, BufMut, BytesMut}; +use bytes::{Buf, BufMut, Bytes, BytesMut}; use tracing::debug; +use crate::big_bytes::BigBytes; + use super::{ mask::apply_mask, proto::{CloseCode, CloseReason, OpCode}, @@ -156,21 +158,19 @@ impl Parser { } } - /// Generate binary representation - pub fn write_message>( - dst: &mut BytesMut, - pl: B, + pub fn write_message_bigbytes( + dst: &mut BigBytes, + pl: Bytes, op: OpCode, fin: bool, mask: bool, ) { - let payload = pl.as_ref(); let one: u8 = if fin { 0x80 | Into::::into(op) } else { op.into() }; - let payload_len = payload.len(); + let payload_len = pl.len(); let (two, p_len) = if mask { (0x80, payload_len + 4) } else { @@ -178,29 +178,50 @@ impl Parser { }; if payload_len < 126 { - dst.reserve(p_len + 2 + if mask { 4 } else { 0 }); - dst.put_slice(&[one, two | payload_len as u8]); + dst.buffer_mut() + .reserve(p_len + 2 + if mask { 4 } else { 0 }); + dst.buffer_mut().put_slice(&[one, two | payload_len as u8]); } else if payload_len <= 65_535 { - dst.reserve(p_len + 4 + if mask { 4 } else { 0 }); - dst.put_slice(&[one, two | 126]); - dst.put_u16(payload_len as u16); + dst.buffer_mut() + .reserve(p_len + 4 + if mask { 4 } else { 0 }); + dst.buffer_mut().put_slice(&[one, two | 126]); + dst.buffer_mut().put_u16(payload_len as u16); } else { - dst.reserve(p_len + 10 + if mask { 4 } else { 0 }); - dst.put_slice(&[one, two | 127]); - dst.put_u64(payload_len as u64); + dst.buffer_mut() + .reserve(p_len + 10 + if mask { 4 } else { 0 }); + dst.buffer_mut().put_slice(&[one, two | 127]); + dst.buffer_mut().put_u64(payload_len as u64); }; if mask { let mask = rand::random::<[u8; 4]>(); - dst.put_slice(mask.as_ref()); - dst.put_slice(payload.as_ref()); - let pos = dst.len() - payload_len; - apply_mask(&mut dst[pos..], mask); + dst.buffer_mut().put_slice(mask.as_ref()); + + match pl.try_into_mut() { + Ok(mut pl_mut) => { + apply_mask(&mut pl_mut, mask); + dst.put_bytes(pl_mut.freeze()); + } + Err(pl) => { + dst.buffer_mut().put_slice(pl.as_ref()); + let pos = dst.buffer_mut().len() - payload_len; + apply_mask(&mut dst.buffer_mut()[pos..], mask); + } + } } else { - dst.put_slice(payload.as_ref()); + dst.put_bytes(pl) } } + /// Generate binary representation + pub fn write_message(dst: &mut BytesMut, pl: Bytes, op: OpCode, fin: bool, mask: bool) { + let mut big_bytes = BigBytes::with_capacity(0); + + Self::write_message_bigbytes(&mut big_bytes, pl, op, fin, mask); + + big_bytes.write_to(dst); + } + /// Create a new Close control frame. #[inline] pub fn write_close(dst: &mut BytesMut, reason: Option, mask: bool) { @@ -215,7 +236,7 @@ impl Parser { } }; - Parser::write_message(dst, payload, OpCode::Close, true, mask) + Parser::write_message(dst, Bytes::from(payload), OpCode::Close, true, mask) } } @@ -368,7 +389,13 @@ mod tests { #[test] fn test_ping_frame() { let mut buf = BytesMut::new(); - Parser::write_message(&mut buf, Vec::from("data"), OpCode::Ping, true, false); + Parser::write_message( + &mut buf, + Bytes::from(Vec::from("data")), + OpCode::Ping, + true, + false, + ); let mut v = vec![137u8, 4u8]; v.extend(b"data"); @@ -378,7 +405,13 @@ mod tests { #[test] fn test_pong_frame() { let mut buf = BytesMut::new(); - Parser::write_message(&mut buf, Vec::from("data"), OpCode::Pong, true, false); + Parser::write_message( + &mut buf, + Bytes::from(Vec::from("data")), + OpCode::Pong, + true, + false, + ); let mut v = vec![138u8, 4u8]; v.extend(b"data"); From ab04286b01735a10f4981cad400f9d7ed326e36b Mon Sep 17 00:00:00 2001 From: asonix Date: Mon, 4 Nov 2024 17:33:02 -0600 Subject: [PATCH 12/13] Ensure bytes >=1.7 --- actix-http/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/actix-http/Cargo.toml b/actix-http/Cargo.toml index a999e73c..c2551713 100644 --- a/actix-http/Cargo.toml +++ b/actix-http/Cargo.toml @@ -87,7 +87,7 @@ actix-rt = { version = "2.2", default-features = false } ahash = "0.8" bitflags = "2" -bytes = "1" +bytes = "1.7" bytestring = "1" derive_more = "0.99.5" encoding_rs = "0.8" From 36a7e8cc6a20c5b8a1ea605f91e2cbbf94d001a7 Mon Sep 17 00:00:00 2001 From: asonix Date: Mon, 4 Nov 2024 17:58:37 -0600 Subject: [PATCH 13/13] Don't reserve unless we need to --- actix-http/src/ws/frame.rs | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/actix-http/src/ws/frame.rs b/actix-http/src/ws/frame.rs index c95c1508..22c0eee9 100644 --- a/actix-http/src/ws/frame.rs +++ b/actix-http/src/ws/frame.rs @@ -171,21 +171,17 @@ impl Parser { op.into() }; let payload_len = pl.len(); - let (two, p_len) = if mask { - (0x80, payload_len + 4) - } else { - (0, payload_len) - }; + let two = if mask { 0x80 } else { 0 }; if payload_len < 126 { - dst.buffer_mut().reserve(p_len + 2); + dst.buffer_mut().reserve(2); dst.buffer_mut().put_slice(&[one, two | payload_len as u8]); } else if payload_len <= 65_535 { - dst.buffer_mut().reserve(p_len + 4); + dst.buffer_mut().reserve(4); dst.buffer_mut().put_slice(&[one, two | 126]); dst.buffer_mut().put_u16(payload_len as u16); } else { - dst.buffer_mut().reserve(p_len + 10); + dst.buffer_mut().reserve(10); dst.buffer_mut().put_slice(&[one, two | 127]); dst.buffer_mut().put_u64(payload_len as u64); }; @@ -195,11 +191,16 @@ impl Parser { dst.buffer_mut().put_slice(mask.as_ref()); match pl.try_into_mut() { + // Avoid copying bytes by mutating in-place Ok(mut pl_mut) => { apply_mask(&mut pl_mut, mask); dst.put_bytes(pl_mut.freeze()); } + + // We need to copy the bytes anyway at this point, so put them in the buffer + // directly Err(pl) => { + dst.buffer_mut().reserve(pl.len()); dst.buffer_mut().put_slice(pl.as_ref()); let pos = dst.buffer_mut().len() - payload_len; apply_mask(&mut dst.buffer_mut()[pos..], mask);