From c2f820f6b56d829b43b6d41b231ad7eb8177721b Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Sun, 14 Jan 2018 18:55:11 -0800 Subject: [PATCH] iovec experiments --- Cargo.toml | 1 + src/lib.rs | 1 + src/server/encoding.rs | 34 ++++++------ src/server/h1.rs | 5 +- src/server/h1writer.rs | 46 +++++++++------- src/server/h2.rs | 3 +- src/server/h2writer.rs | 24 ++++----- src/server/shared.rs | 117 ++++++++++++++++++++++++++++++++++++++++- 8 files changed, 177 insertions(+), 54 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 6c04a4acd..b7b02fd4d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -62,6 +62,7 @@ cookie = { version="0.10", features=["percent-encode", "secure"] } mio = "0.6" net2 = "0.2" bytes = "0.4" +iovec = "0.1" futures = "0.1" tokio-io = "0.1" tokio-core = "0.1" diff --git a/src/lib.rs b/src/lib.rs index 44d4d1518..5ccac0e16 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -48,6 +48,7 @@ extern crate log; extern crate time; extern crate bytes; +extern crate iovec; extern crate sha1; extern crate regex; #[macro_use] diff --git a/src/server/encoding.rs b/src/server/encoding.rs index 1c6ed7d76..0f3469fbd 100644 --- a/src/server/encoding.rs +++ b/src/server/encoding.rs @@ -20,7 +20,7 @@ use httprequest::HttpMessage; use httpresponse::HttpResponse; use payload::{PayloadSender, PayloadWriter}; -use super::shared::SharedBytes; +use super::shared::{SharedIo, SharedBytes}; impl ContentEncoding { @@ -339,11 +339,11 @@ pub(crate) struct PayloadEncoder(ContentEncoder); impl PayloadEncoder { - pub fn empty(bytes: SharedBytes) -> PayloadEncoder { + pub fn empty(bytes: SharedIo) -> PayloadEncoder { PayloadEncoder(ContentEncoder::Identity(TransferEncoding::eof(bytes))) } - pub fn new(buf: SharedBytes, req: &HttpMessage, resp: &mut HttpResponse) -> PayloadEncoder { + pub fn new(buf: SharedIo, req: &HttpMessage, resp: &mut HttpResponse) -> PayloadEncoder { let version = resp.version().unwrap_or_else(|| req.version); let mut body = resp.replace_body(Body::Empty); let has_body = match body { @@ -385,7 +385,7 @@ impl PayloadEncoder { }, Body::Binary(ref mut bytes) => { if encoding.is_compression() { - let tmp = SharedBytes::default(); + let tmp = SharedIo::default(); let transfer = TransferEncoding::eof(tmp.clone()); let mut enc = match encoding { ContentEncoding::Deflate => ContentEncoder::Deflate( @@ -441,7 +441,7 @@ impl PayloadEncoder { ) } - fn streaming_encoding(buf: SharedBytes, version: Version, + fn streaming_encoding(buf: SharedIo, version: Version, resp: &mut HttpResponse) -> TransferEncoding { match resp.chunked() { Some(true) => { @@ -550,7 +550,7 @@ impl ContentEncoder { #[inline(always)] pub fn write_eof(&mut self) -> Result<(), io::Error> { let encoder = mem::replace( - self, ContentEncoder::Identity(TransferEncoding::eof(SharedBytes::empty()))); + self, ContentEncoder::Identity(TransferEncoding::eof(SharedIo::default()))); match encoder { ContentEncoder::Br(encoder) => { @@ -637,7 +637,7 @@ impl ContentEncoder { #[derive(Debug, Clone)] pub(crate) struct TransferEncoding { kind: TransferEncodingKind, - buffer: SharedBytes, + buffer: SharedIo, } #[derive(Debug, PartialEq, Clone)] @@ -657,7 +657,7 @@ enum TransferEncodingKind { impl TransferEncoding { #[inline] - pub fn eof(bytes: SharedBytes) -> TransferEncoding { + pub fn eof(bytes: SharedIo) -> TransferEncoding { TransferEncoding { kind: TransferEncodingKind::Eof, buffer: bytes, @@ -665,7 +665,7 @@ impl TransferEncoding { } #[inline] - pub fn chunked(bytes: SharedBytes) -> TransferEncoding { + pub fn chunked(bytes: SharedIo) -> TransferEncoding { TransferEncoding { kind: TransferEncodingKind::Chunked(false), buffer: bytes, @@ -673,7 +673,7 @@ impl TransferEncoding { } #[inline] - pub fn length(len: u64, bytes: SharedBytes) -> TransferEncoding { + pub fn length(len: u64, bytes: SharedIo) -> TransferEncoding { TransferEncoding { kind: TransferEncodingKind::Length(len), buffer: bytes, @@ -695,7 +695,7 @@ impl TransferEncoding { match self.kind { TransferEncodingKind::Eof => { let eof = msg.is_empty(); - self.buffer.extend(msg); + self.buffer.push(msg); Ok(eof) }, TransferEncodingKind::Chunked(ref mut eof) => { @@ -705,14 +705,14 @@ impl TransferEncoding { if msg.is_empty() { *eof = true; - self.buffer.extend_from_slice(b"0\r\n\r\n"); + self.buffer.push((&b"0\r\n\r\n"[..]).into()); } else { let mut buf = BytesMut::new(); write!(&mut buf, "{:X}\r\n", msg.len()) .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; - self.buffer.extend(buf.into()); - self.buffer.extend(msg); - self.buffer.extend_from_slice(b"\r\n"); + self.buffer.push(buf.into()); + self.buffer.push(msg); + self.buffer.push((&b"\r\n"[..]).into()); } Ok(*eof) }, @@ -721,7 +721,7 @@ impl TransferEncoding { return Ok(*remaining == 0) } let max = cmp::min(*remaining, msg.len() as u64); - self.buffer.extend(msg.take().split_to(max as usize).into()); + self.buffer.push(msg.take().split_to(max as usize).into()); *remaining -= max as u64; Ok(*remaining == 0) @@ -737,7 +737,7 @@ impl TransferEncoding { TransferEncodingKind::Chunked(ref mut eof) => { if !*eof { *eof = true; - self.buffer.extend_from_slice(b"0\r\n\r\n"); + self.buffer.push(Binary::from(&b"0\r\n\r\n"[..])); } }, } diff --git a/src/server/h1.rs b/src/server/h1.rs index 67ec26372..0df8a2765 100644 --- a/src/server/h1.rs +++ b/src/server/h1.rs @@ -19,6 +19,7 @@ use error::{ParseError, PayloadError, ResponseError}; use payload::{Payload, PayloadWriter, DEFAULT_BUFFER_SIZE}; use super::{utils, Writer}; +use super::shared::SharedIo; use super::h1writer::H1Writer; use super::encoding::PayloadType; use super::settings::WorkerSettings; @@ -65,11 +66,11 @@ impl Http1 pub fn new(h: Rc>, stream: T, addr: Option, buf: BytesMut) -> Self { - let bytes = h.get_shared_bytes(); + // let bytes = h.get_shared_bytes(); Http1{ flags: Flags::KEEPALIVE, settings: h, addr: addr, - stream: H1Writer::new(stream, bytes), + stream: H1Writer::new(stream, SharedIo::default()), reader: Reader::new(), read_buf: buf, tasks: VecDeque::new(), diff --git a/src/server/h1writer.rs b/src/server/h1writer.rs index 7f18170fe..2f605bc59 100644 --- a/src/server/h1writer.rs +++ b/src/server/h1writer.rs @@ -1,5 +1,5 @@ use std::io; -use bytes::BufMut; +use bytes::{Buf, BufMut, BytesMut}; use futures::{Async, Poll}; use tokio_io::AsyncWrite; use http::Version; @@ -10,7 +10,7 @@ use body::{Body, Binary}; use httprequest::HttpMessage; use httpresponse::HttpResponse; use super::{Writer, WriterState, MAX_WRITE_BUFFER_SIZE}; -use super::shared::SharedBytes; +use super::shared::{SharedBytes, SharedIo}; use super::encoding::PayloadEncoder; const AVERAGE_HEADER_SIZE: usize = 30; // totally scientific @@ -30,12 +30,12 @@ pub(crate) struct H1Writer { encoder: PayloadEncoder, written: u64, headers_size: u32, - buffer: SharedBytes, + buffer: SharedIo, } impl H1Writer { - pub fn new(stream: T, buf: SharedBytes) -> H1Writer { + pub fn new(stream: T, buf: SharedIo) -> H1Writer { H1Writer { flags: Flags::empty(), stream: stream, @@ -56,7 +56,7 @@ impl H1Writer { } pub fn disconnected(&mut self) { - self.buffer.take(); + self.buffer.clear(); } pub fn keepalive(&self) -> bool { @@ -65,17 +65,19 @@ impl H1Writer { fn write_to_stream(&mut self) -> io::Result { while !self.buffer.is_empty() { - match self.stream.write(self.buffer.as_ref()) { - Ok(n) => { - let _ = self.buffer.split_to(n); + match self.stream.write_buf(&mut self.buffer) { + Ok(Async::Ready(n)) => { + // println!("advance 2 {:?} {:?}", n, self.buffer.remaining()); + // let _ = self.buffer.advance(n); }, - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - if self.buffer.len() > MAX_WRITE_BUFFER_SIZE { + Ok(Async::NotReady) => { + println!("not ready"); + if self.buffer.remaining() > MAX_WRITE_BUFFER_SIZE { return Ok(WriterState::Pause) } else { return Ok(WriterState::Done) } - } + }, Err(err) => return Err(err), } } @@ -129,12 +131,13 @@ impl Writer for H1Writer { // render message { - let mut buffer = self.buffer.get_mut(); - if let Body::Binary(ref bytes) = body { - buffer.reserve(256 + msg.headers().len() * AVERAGE_HEADER_SIZE + bytes.len()); - } else { - buffer.reserve(256 + msg.headers().len() * AVERAGE_HEADER_SIZE); - } + let mut buffer = BytesMut::with_capacity( + 256 + msg.headers().len() * AVERAGE_HEADER_SIZE); + //if let Body::Binary(ref bytes) = body { + // buffer.reserve(256 + msg.headers().len() * AVERAGE_HEADER_SIZE + bytes.len()); + //} else { + // buffer.reserve(256 + msg.headers().len() * AVERAGE_HEADER_SIZE); + //} // status line helpers::write_status_line(version, msg.status().as_u16(), &mut buffer); @@ -168,6 +171,9 @@ impl Writer for H1Writer { buffer.extend_from_slice(b"\r\n"); } self.headers_size = buffer.len() as u32; + + // push to write buffer + self.buffer.push(buffer.into()); } if let Body::Binary(bytes) = body { @@ -188,11 +194,11 @@ impl Writer for H1Writer { return Ok(WriterState::Done) } else { // might be response to EXCEPT - self.buffer.extend_from_slice(payload.as_ref()) + self.buffer.push(payload) } } - if self.buffer.len() > MAX_WRITE_BUFFER_SIZE { + if self.buffer.remaining() > MAX_WRITE_BUFFER_SIZE { Ok(WriterState::Pause) } else { Ok(WriterState::Done) @@ -205,7 +211,7 @@ impl Writer for H1Writer { if !self.encoder.is_eof() { Err(io::Error::new(io::ErrorKind::Other, "Last payload item, but eof is not reached")) - } else if self.buffer.len() > MAX_WRITE_BUFFER_SIZE { + } else if self.buffer.remaining() > MAX_WRITE_BUFFER_SIZE { Ok(WriterState::Pause) } else { Ok(WriterState::Done) diff --git a/src/server/h2.rs b/src/server/h2.rs index c843fee89..15513d439 100644 --- a/src/server/h2.rs +++ b/src/server/h2.rs @@ -20,6 +20,7 @@ use httpcodes::HTTPNotFound; use httprequest::HttpRequest; use payload::{Payload, PayloadWriter}; +use super::shared::SharedIo; use super::h2writer::H2Writer; use super::encoding::PayloadType; use super::settings::WorkerSettings; @@ -287,7 +288,7 @@ impl Entry { Entry {task: task.unwrap_or_else(|| Pipeline::error(HTTPNotFound)), payload: psender, recv: recv, - stream: H2Writer::new(resp, settings.get_shared_bytes()), + stream: H2Writer::new(resp, SharedIo::default()), //settings.get_shared_bytes()), flags: EntryFlags::empty(), capacity: 0, } diff --git a/src/server/h2writer.rs b/src/server/h2writer.rs index 0701d028e..c0b54ab23 100644 --- a/src/server/h2writer.rs +++ b/src/server/h2writer.rs @@ -1,5 +1,5 @@ use std::{io, cmp}; -use bytes::{Bytes, BytesMut}; +use bytes::{Buf, Bytes, BytesMut}; use futures::{Async, Poll}; use http2::{Reason, SendStream}; use http2::server::SendResponse; @@ -11,7 +11,7 @@ use body::{Body, Binary}; use httprequest::HttpMessage; use httpresponse::HttpResponse; use super::encoding::PayloadEncoder; -use super::shared::SharedBytes; +use super::shared::{SharedIo, SharedBytes}; use super::{Writer, WriterState, MAX_WRITE_BUFFER_SIZE}; const CHUNK_SIZE: usize = 16_384; @@ -30,12 +30,12 @@ pub(crate) struct H2Writer { encoder: PayloadEncoder, flags: Flags, written: u64, - buffer: SharedBytes, + buffer: SharedIo, } impl H2Writer { - pub fn new(respond: SendResponse, buf: SharedBytes) -> H2Writer { + pub fn new(respond: SendResponse, buf: SharedIo) -> H2Writer { H2Writer { respond: respond, stream: None, @@ -68,7 +68,7 @@ impl H2Writer { loop { match stream.poll_capacity() { Ok(Async::NotReady) => { - if self.buffer.len() > MAX_WRITE_BUFFER_SIZE { + if self.buffer.remaining() > MAX_WRITE_BUFFER_SIZE { return Ok(WriterState::Pause) } else { return Ok(WriterState::Done) @@ -78,15 +78,15 @@ impl H2Writer { return Ok(WriterState::Done) } Ok(Async::Ready(Some(cap))) => { - let len = self.buffer.len(); + let len = self.buffer.remaining(); let bytes = self.buffer.split_to(cmp::min(cap, len)); let eof = self.buffer.is_empty() && self.flags.contains(Flags::EOF); self.written += bytes.len() as u64; - if let Err(err) = stream.send_data(bytes.freeze(), eof) { + if let Err(err) = stream.send_data(bytes, eof) { return Err(io::Error::new(io::ErrorKind::Other, err)) } else if !self.buffer.is_empty() { - let cap = cmp::min(self.buffer.len(), CHUNK_SIZE); + let cap = cmp::min(self.buffer.remaining(), CHUNK_SIZE); stream.reserve_capacity(cap); } else { return Ok(WriterState::Pause) @@ -168,7 +168,7 @@ impl Writer for H2Writer { self.written = bytes.len() as u64; self.encoder.write(bytes)?; if let Some(ref mut stream) = self.stream { - stream.reserve_capacity(cmp::min(self.buffer.len(), CHUNK_SIZE)); + stream.reserve_capacity(cmp::min(self.buffer.remaining(), CHUNK_SIZE)); } Ok(WriterState::Pause) } else { @@ -186,11 +186,11 @@ impl Writer for H2Writer { self.encoder.write(payload)?; } else { // might be response for EXCEPT - self.buffer.extend_from_slice(payload.as_ref()) + self.buffer.push(payload); } } - if self.buffer.len() > MAX_WRITE_BUFFER_SIZE { + if self.buffer.remaining() > MAX_WRITE_BUFFER_SIZE { Ok(WriterState::Pause) } else { Ok(WriterState::Done) @@ -204,7 +204,7 @@ impl Writer for H2Writer { if !self.encoder.is_eof() { Err(io::Error::new(io::ErrorKind::Other, "Last payload item, but eof is not reached")) - } else if self.buffer.len() > MAX_WRITE_BUFFER_SIZE { + } else if self.buffer.remaining() > MAX_WRITE_BUFFER_SIZE { Ok(WriterState::Pause) } else { Ok(WriterState::Done) diff --git a/src/server/shared.rs b/src/server/shared.rs index 15307e0fe..e08f4462f 100644 --- a/src/server/shared.rs +++ b/src/server/shared.rs @@ -1,8 +1,9 @@ -use std::mem; +use std::{cmp, mem}; use std::cell::RefCell; use std::rc::Rc; use std::collections::VecDeque; -use bytes::BytesMut; +use iovec::IoVec; +use bytes::{Buf, Bytes, BytesMut}; use body::Binary; @@ -113,3 +114,115 @@ impl Clone for SharedBytes { SharedBytes(self.0.clone(), self.1.clone()) } } + + +#[derive(Debug)] +pub(crate) struct SharedIo( + Rc> +); + +impl SharedIo { + #[inline(always)] + #[allow(mutable_transmutes)] + #[cfg_attr(feature = "cargo-clippy", allow(mut_from_ref, inline_always))] + fn get_mut(&self) -> &mut VecDeque { + let r: &VecDeque<_> = self.0.as_ref(); + unsafe{mem::transmute(r)} + } + + pub fn clear(&self) { + self.get_mut().clear(); + } + + pub fn push(&self, data: Binary) { + self.get_mut().push_back(data); + } + + pub fn is_empty(&self) -> bool { + self.0.is_empty() + } + + pub fn split_to(&self, n: usize) -> Bytes { + let b = Bytes::from(&self.bytes()[..n]); + + let slf: &mut SharedIo = unsafe{mem::transmute(self as *const _ as *mut SharedIo)}; + slf.advance(n); + b + } + + pub fn take(&self) -> Bytes { + match self.0.len() { + 0 => Bytes::from_static(b""), + 1 => self.get_mut().pop_front().unwrap().into(), + _ => { + self.squash(); + self.take() + } + } + } + + fn squash(&self) { + let len = self.remaining(); + let buf = self.0.iter().fold( + BytesMut::with_capacity(len), + |mut buf, item| {buf.extend_from_slice(item.as_ref()); buf}); + let vec = self.get_mut(); + vec.clear(); + vec.push_back(buf.into()); + } +} + +impl Default for SharedIo { + fn default() -> SharedIo { + SharedIo(Rc::new(VecDeque::new())) + } +} + +impl Clone for SharedIo { + fn clone(&self) -> SharedIo { + SharedIo(Rc::clone(&self.0)) + } +} + +impl Buf for SharedIo { + fn remaining(&self) -> usize { + self.0.iter().fold(0, |cnt, item| cnt + item.len()) + } + + fn bytes(&self) -> &[u8] { + match self.0.len() { + 0 => b"", + 1 => self.0[0].as_ref(), + _ => { + self.squash(); + self.bytes() + } + } + } + + fn bytes_vec<'a>(&'a self, dst: &mut [&'a IoVec]) -> usize { + let num = cmp::min(dst.len(), self.0.len()); + for idx in 0..num { + dst[idx] = self.0[idx].as_ref().into(); + } + num + } + + fn advance(&mut self, mut cnt: usize) { + let vec = self.get_mut(); + while cnt > 0 { + if let Some(mut item) = vec.pop_front() { + if item.len() <= cnt { + cnt -= item.len(); + } else { + let mut item = item.take(); + item.split_to(cnt); + vec.push_front(item.into()); + break + } + } else { + break + } + } + } +}