iovec experiments

This commit is contained in:
Nikolay Kim 2018-01-14 18:55:11 -08:00
parent 89a89e7b18
commit c2f820f6b5
8 changed files with 177 additions and 54 deletions

View File

@ -62,6 +62,7 @@ cookie = { version="0.10", features=["percent-encode", "secure"] }
mio = "0.6"
net2 = "0.2"
bytes = "0.4"
iovec = "0.1"
futures = "0.1"
tokio-io = "0.1"
tokio-core = "0.1"

View File

@ -48,6 +48,7 @@
extern crate log;
extern crate time;
extern crate bytes;
extern crate iovec;
extern crate sha1;
extern crate regex;
#[macro_use]

View File

@ -20,7 +20,7 @@ use httprequest::HttpMessage;
use httpresponse::HttpResponse;
use payload::{PayloadSender, PayloadWriter};
use super::shared::SharedBytes;
use super::shared::{SharedIo, SharedBytes};
impl ContentEncoding {
@ -339,11 +339,11 @@ pub(crate) struct PayloadEncoder(ContentEncoder);
impl PayloadEncoder {
pub fn empty(bytes: SharedBytes) -> PayloadEncoder {
pub fn empty(bytes: SharedIo) -> PayloadEncoder {
PayloadEncoder(ContentEncoder::Identity(TransferEncoding::eof(bytes)))
}
pub fn new(buf: SharedBytes, req: &HttpMessage, resp: &mut HttpResponse) -> PayloadEncoder {
pub fn new(buf: SharedIo, req: &HttpMessage, resp: &mut HttpResponse) -> PayloadEncoder {
let version = resp.version().unwrap_or_else(|| req.version);
let mut body = resp.replace_body(Body::Empty);
let has_body = match body {
@ -385,7 +385,7 @@ impl PayloadEncoder {
},
Body::Binary(ref mut bytes) => {
if encoding.is_compression() {
let tmp = SharedBytes::default();
let tmp = SharedIo::default();
let transfer = TransferEncoding::eof(tmp.clone());
let mut enc = match encoding {
ContentEncoding::Deflate => ContentEncoder::Deflate(
@ -441,7 +441,7 @@ impl PayloadEncoder {
)
}
fn streaming_encoding(buf: SharedBytes, version: Version,
fn streaming_encoding(buf: SharedIo, version: Version,
resp: &mut HttpResponse) -> TransferEncoding {
match resp.chunked() {
Some(true) => {
@ -550,7 +550,7 @@ impl ContentEncoder {
#[inline(always)]
pub fn write_eof(&mut self) -> Result<(), io::Error> {
let encoder = mem::replace(
self, ContentEncoder::Identity(TransferEncoding::eof(SharedBytes::empty())));
self, ContentEncoder::Identity(TransferEncoding::eof(SharedIo::default())));
match encoder {
ContentEncoder::Br(encoder) => {
@ -637,7 +637,7 @@ impl ContentEncoder {
#[derive(Debug, Clone)]
pub(crate) struct TransferEncoding {
kind: TransferEncodingKind,
buffer: SharedBytes,
buffer: SharedIo,
}
#[derive(Debug, PartialEq, Clone)]
@ -657,7 +657,7 @@ enum TransferEncodingKind {
impl TransferEncoding {
#[inline]
pub fn eof(bytes: SharedBytes) -> TransferEncoding {
pub fn eof(bytes: SharedIo) -> TransferEncoding {
TransferEncoding {
kind: TransferEncodingKind::Eof,
buffer: bytes,
@ -665,7 +665,7 @@ impl TransferEncoding {
}
#[inline]
pub fn chunked(bytes: SharedBytes) -> TransferEncoding {
pub fn chunked(bytes: SharedIo) -> TransferEncoding {
TransferEncoding {
kind: TransferEncodingKind::Chunked(false),
buffer: bytes,
@ -673,7 +673,7 @@ impl TransferEncoding {
}
#[inline]
pub fn length(len: u64, bytes: SharedBytes) -> TransferEncoding {
pub fn length(len: u64, bytes: SharedIo) -> TransferEncoding {
TransferEncoding {
kind: TransferEncodingKind::Length(len),
buffer: bytes,
@ -695,7 +695,7 @@ impl TransferEncoding {
match self.kind {
TransferEncodingKind::Eof => {
let eof = msg.is_empty();
self.buffer.extend(msg);
self.buffer.push(msg);
Ok(eof)
},
TransferEncodingKind::Chunked(ref mut eof) => {
@ -705,14 +705,14 @@ impl TransferEncoding {
if msg.is_empty() {
*eof = true;
self.buffer.extend_from_slice(b"0\r\n\r\n");
self.buffer.push((&b"0\r\n\r\n"[..]).into());
} else {
let mut buf = BytesMut::new();
write!(&mut buf, "{:X}\r\n", msg.len())
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
self.buffer.extend(buf.into());
self.buffer.extend(msg);
self.buffer.extend_from_slice(b"\r\n");
self.buffer.push(buf.into());
self.buffer.push(msg);
self.buffer.push((&b"\r\n"[..]).into());
}
Ok(*eof)
},
@ -721,7 +721,7 @@ impl TransferEncoding {
return Ok(*remaining == 0)
}
let max = cmp::min(*remaining, msg.len() as u64);
self.buffer.extend(msg.take().split_to(max as usize).into());
self.buffer.push(msg.take().split_to(max as usize).into());
*remaining -= max as u64;
Ok(*remaining == 0)
@ -737,7 +737,7 @@ impl TransferEncoding {
TransferEncodingKind::Chunked(ref mut eof) => {
if !*eof {
*eof = true;
self.buffer.extend_from_slice(b"0\r\n\r\n");
self.buffer.push(Binary::from(&b"0\r\n\r\n"[..]));
}
},
}

View File

@ -19,6 +19,7 @@ use error::{ParseError, PayloadError, ResponseError};
use payload::{Payload, PayloadWriter, DEFAULT_BUFFER_SIZE};
use super::{utils, Writer};
use super::shared::SharedIo;
use super::h1writer::H1Writer;
use super::encoding::PayloadType;
use super::settings::WorkerSettings;
@ -65,11 +66,11 @@ impl<T, H> Http1<T, H>
pub fn new(h: Rc<WorkerSettings<H>>, stream: T, addr: Option<SocketAddr>, buf: BytesMut)
-> Self
{
let bytes = h.get_shared_bytes();
// let bytes = h.get_shared_bytes();
Http1{ flags: Flags::KEEPALIVE,
settings: h,
addr: addr,
stream: H1Writer::new(stream, bytes),
stream: H1Writer::new(stream, SharedIo::default()),
reader: Reader::new(),
read_buf: buf,
tasks: VecDeque::new(),

View File

@ -1,5 +1,5 @@
use std::io;
use bytes::BufMut;
use bytes::{Buf, BufMut, BytesMut};
use futures::{Async, Poll};
use tokio_io::AsyncWrite;
use http::Version;
@ -10,7 +10,7 @@ use body::{Body, Binary};
use httprequest::HttpMessage;
use httpresponse::HttpResponse;
use super::{Writer, WriterState, MAX_WRITE_BUFFER_SIZE};
use super::shared::SharedBytes;
use super::shared::{SharedBytes, SharedIo};
use super::encoding::PayloadEncoder;
const AVERAGE_HEADER_SIZE: usize = 30; // totally scientific
@ -30,12 +30,12 @@ pub(crate) struct H1Writer<T: AsyncWrite> {
encoder: PayloadEncoder,
written: u64,
headers_size: u32,
buffer: SharedBytes,
buffer: SharedIo,
}
impl<T: AsyncWrite> H1Writer<T> {
pub fn new(stream: T, buf: SharedBytes) -> H1Writer<T> {
pub fn new(stream: T, buf: SharedIo) -> H1Writer<T> {
H1Writer {
flags: Flags::empty(),
stream: stream,
@ -56,7 +56,7 @@ impl<T: AsyncWrite> H1Writer<T> {
}
pub fn disconnected(&mut self) {
self.buffer.take();
self.buffer.clear();
}
pub fn keepalive(&self) -> bool {
@ -65,17 +65,19 @@ impl<T: AsyncWrite> H1Writer<T> {
fn write_to_stream(&mut self) -> io::Result<WriterState> {
while !self.buffer.is_empty() {
match self.stream.write(self.buffer.as_ref()) {
Ok(n) => {
let _ = self.buffer.split_to(n);
match self.stream.write_buf(&mut self.buffer) {
Ok(Async::Ready(n)) => {
// println!("advance 2 {:?} {:?}", n, self.buffer.remaining());
// let _ = self.buffer.advance(n);
},
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
if self.buffer.len() > MAX_WRITE_BUFFER_SIZE {
Ok(Async::NotReady) => {
println!("not ready");
if self.buffer.remaining() > MAX_WRITE_BUFFER_SIZE {
return Ok(WriterState::Pause)
} else {
return Ok(WriterState::Done)
}
}
},
Err(err) => return Err(err),
}
}
@ -129,12 +131,13 @@ impl<T: AsyncWrite> Writer for H1Writer<T> {
// render message
{
let mut buffer = self.buffer.get_mut();
if let Body::Binary(ref bytes) = body {
buffer.reserve(256 + msg.headers().len() * AVERAGE_HEADER_SIZE + bytes.len());
} else {
buffer.reserve(256 + msg.headers().len() * AVERAGE_HEADER_SIZE);
}
let mut buffer = BytesMut::with_capacity(
256 + msg.headers().len() * AVERAGE_HEADER_SIZE);
//if let Body::Binary(ref bytes) = body {
// buffer.reserve(256 + msg.headers().len() * AVERAGE_HEADER_SIZE + bytes.len());
//} else {
// buffer.reserve(256 + msg.headers().len() * AVERAGE_HEADER_SIZE);
//}
// status line
helpers::write_status_line(version, msg.status().as_u16(), &mut buffer);
@ -168,6 +171,9 @@ impl<T: AsyncWrite> Writer for H1Writer<T> {
buffer.extend_from_slice(b"\r\n");
}
self.headers_size = buffer.len() as u32;
// push to write buffer
self.buffer.push(buffer.into());
}
if let Body::Binary(bytes) = body {
@ -188,11 +194,11 @@ impl<T: AsyncWrite> Writer for H1Writer<T> {
return Ok(WriterState::Done)
} else {
// might be response to EXCEPT
self.buffer.extend_from_slice(payload.as_ref())
self.buffer.push(payload)
}
}
if self.buffer.len() > MAX_WRITE_BUFFER_SIZE {
if self.buffer.remaining() > MAX_WRITE_BUFFER_SIZE {
Ok(WriterState::Pause)
} else {
Ok(WriterState::Done)
@ -205,7 +211,7 @@ impl<T: AsyncWrite> Writer for H1Writer<T> {
if !self.encoder.is_eof() {
Err(io::Error::new(io::ErrorKind::Other,
"Last payload item, but eof is not reached"))
} else if self.buffer.len() > MAX_WRITE_BUFFER_SIZE {
} else if self.buffer.remaining() > MAX_WRITE_BUFFER_SIZE {
Ok(WriterState::Pause)
} else {
Ok(WriterState::Done)

View File

@ -20,6 +20,7 @@ use httpcodes::HTTPNotFound;
use httprequest::HttpRequest;
use payload::{Payload, PayloadWriter};
use super::shared::SharedIo;
use super::h2writer::H2Writer;
use super::encoding::PayloadType;
use super::settings::WorkerSettings;
@ -287,7 +288,7 @@ impl Entry {
Entry {task: task.unwrap_or_else(|| Pipeline::error(HTTPNotFound)),
payload: psender,
recv: recv,
stream: H2Writer::new(resp, settings.get_shared_bytes()),
stream: H2Writer::new(resp, SharedIo::default()), //settings.get_shared_bytes()),
flags: EntryFlags::empty(),
capacity: 0,
}

View File

@ -1,5 +1,5 @@
use std::{io, cmp};
use bytes::{Bytes, BytesMut};
use bytes::{Buf, Bytes, BytesMut};
use futures::{Async, Poll};
use http2::{Reason, SendStream};
use http2::server::SendResponse;
@ -11,7 +11,7 @@ use body::{Body, Binary};
use httprequest::HttpMessage;
use httpresponse::HttpResponse;
use super::encoding::PayloadEncoder;
use super::shared::SharedBytes;
use super::shared::{SharedIo, SharedBytes};
use super::{Writer, WriterState, MAX_WRITE_BUFFER_SIZE};
const CHUNK_SIZE: usize = 16_384;
@ -30,12 +30,12 @@ pub(crate) struct H2Writer {
encoder: PayloadEncoder,
flags: Flags,
written: u64,
buffer: SharedBytes,
buffer: SharedIo,
}
impl H2Writer {
pub fn new(respond: SendResponse<Bytes>, buf: SharedBytes) -> H2Writer {
pub fn new(respond: SendResponse<Bytes>, buf: SharedIo) -> H2Writer {
H2Writer {
respond: respond,
stream: None,
@ -68,7 +68,7 @@ impl H2Writer {
loop {
match stream.poll_capacity() {
Ok(Async::NotReady) => {
if self.buffer.len() > MAX_WRITE_BUFFER_SIZE {
if self.buffer.remaining() > MAX_WRITE_BUFFER_SIZE {
return Ok(WriterState::Pause)
} else {
return Ok(WriterState::Done)
@ -78,15 +78,15 @@ impl H2Writer {
return Ok(WriterState::Done)
}
Ok(Async::Ready(Some(cap))) => {
let len = self.buffer.len();
let len = self.buffer.remaining();
let bytes = self.buffer.split_to(cmp::min(cap, len));
let eof = self.buffer.is_empty() && self.flags.contains(Flags::EOF);
self.written += bytes.len() as u64;
if let Err(err) = stream.send_data(bytes.freeze(), eof) {
if let Err(err) = stream.send_data(bytes, eof) {
return Err(io::Error::new(io::ErrorKind::Other, err))
} else if !self.buffer.is_empty() {
let cap = cmp::min(self.buffer.len(), CHUNK_SIZE);
let cap = cmp::min(self.buffer.remaining(), CHUNK_SIZE);
stream.reserve_capacity(cap);
} else {
return Ok(WriterState::Pause)
@ -168,7 +168,7 @@ impl Writer for H2Writer {
self.written = bytes.len() as u64;
self.encoder.write(bytes)?;
if let Some(ref mut stream) = self.stream {
stream.reserve_capacity(cmp::min(self.buffer.len(), CHUNK_SIZE));
stream.reserve_capacity(cmp::min(self.buffer.remaining(), CHUNK_SIZE));
}
Ok(WriterState::Pause)
} else {
@ -186,11 +186,11 @@ impl Writer for H2Writer {
self.encoder.write(payload)?;
} else {
// might be response for EXCEPT
self.buffer.extend_from_slice(payload.as_ref())
self.buffer.push(payload);
}
}
if self.buffer.len() > MAX_WRITE_BUFFER_SIZE {
if self.buffer.remaining() > MAX_WRITE_BUFFER_SIZE {
Ok(WriterState::Pause)
} else {
Ok(WriterState::Done)
@ -204,7 +204,7 @@ impl Writer for H2Writer {
if !self.encoder.is_eof() {
Err(io::Error::new(io::ErrorKind::Other,
"Last payload item, but eof is not reached"))
} else if self.buffer.len() > MAX_WRITE_BUFFER_SIZE {
} else if self.buffer.remaining() > MAX_WRITE_BUFFER_SIZE {
Ok(WriterState::Pause)
} else {
Ok(WriterState::Done)

View File

@ -1,8 +1,9 @@
use std::mem;
use std::{cmp, mem};
use std::cell::RefCell;
use std::rc::Rc;
use std::collections::VecDeque;
use bytes::BytesMut;
use iovec::IoVec;
use bytes::{Buf, Bytes, BytesMut};
use body::Binary;
@ -113,3 +114,115 @@ impl Clone for SharedBytes {
SharedBytes(self.0.clone(), self.1.clone())
}
}
#[derive(Debug)]
pub(crate) struct SharedIo(
Rc<VecDeque<Binary>>
);
impl SharedIo {
#[inline(always)]
#[allow(mutable_transmutes)]
#[cfg_attr(feature = "cargo-clippy", allow(mut_from_ref, inline_always))]
fn get_mut(&self) -> &mut VecDeque<Binary> {
let r: &VecDeque<_> = self.0.as_ref();
unsafe{mem::transmute(r)}
}
pub fn clear(&self) {
self.get_mut().clear();
}
pub fn push(&self, data: Binary) {
self.get_mut().push_back(data);
}
pub fn is_empty(&self) -> bool {
self.0.is_empty()
}
pub fn split_to(&self, n: usize) -> Bytes {
let b = Bytes::from(&self.bytes()[..n]);
let slf: &mut SharedIo = unsafe{mem::transmute(self as *const _ as *mut SharedIo)};
slf.advance(n);
b
}
pub fn take(&self) -> Bytes {
match self.0.len() {
0 => Bytes::from_static(b""),
1 => self.get_mut().pop_front().unwrap().into(),
_ => {
self.squash();
self.take()
}
}
}
fn squash(&self) {
let len = self.remaining();
let buf = self.0.iter().fold(
BytesMut::with_capacity(len),
|mut buf, item| {buf.extend_from_slice(item.as_ref()); buf});
let vec = self.get_mut();
vec.clear();
vec.push_back(buf.into());
}
}
impl Default for SharedIo {
fn default() -> SharedIo {
SharedIo(Rc::new(VecDeque::new()))
}
}
impl Clone for SharedIo {
fn clone(&self) -> SharedIo {
SharedIo(Rc::clone(&self.0))
}
}
impl Buf for SharedIo {
fn remaining(&self) -> usize {
self.0.iter().fold(0, |cnt, item| cnt + item.len())
}
fn bytes(&self) -> &[u8] {
match self.0.len() {
0 => b"",
1 => self.0[0].as_ref(),
_ => {
self.squash();
self.bytes()
}
}
}
fn bytes_vec<'a>(&'a self, dst: &mut [&'a IoVec]) -> usize {
let num = cmp::min(dst.len(), self.0.len());
for idx in 0..num {
dst[idx] = self.0[idx].as_ref().into();
}
num
}
fn advance(&mut self, mut cnt: usize) {
let vec = self.get_mut();
while cnt > 0 {
if let Some(mut item) = vec.pop_front() {
if item.len() <= cnt {
cnt -= item.len();
} else {
let mut item = item.take();
item.split_to(cnt);
vec.push_front(item.into());
break
}
} else {
break
}
}
}
}