stopping point

This commit is contained in:
Nikolay Kim 2018-04-21 07:54:48 -07:00
parent f8af3ef7f4
commit 2788a838bc
21 changed files with 1620 additions and 1126 deletions

View File

@ -1,6 +1,6 @@
[package]
name = "actix-web"
version = "0.5.5"
version = "0.6.0-dev"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "Actix web is a simple, pragmatic and extremely fast web framework for Rust."
readme = "README.md"
@ -79,8 +79,11 @@ bytes = "0.4"
byteorder = "1"
futures = "0.1"
futures-cpupool = "0.1"
tokio-io = "0.1"
tokio-core = "0.1"
tokio-io = "=0.1.5"
tokio-core = "=0.1.12"
slab = "0.4"
iovec = "0.1"
# native-tls
native-tls = { version="0.1", optional = true }
@ -102,6 +105,9 @@ lto = true
opt-level = 3
codegen-units = 1
[replace]
"mio:0.6.14" = {path="../mio"}
[workspace]
members = [
"./",

29
examples/test.rs Normal file
View File

@ -0,0 +1,29 @@
extern crate actix;
extern crate actix_web;
extern crate env_logger;
use actix_web::{middleware, server, App, HttpRequest};
fn index(_req: HttpRequest) -> &'static str {
"Hello world!"
}
fn main() {
//::std::env::set_var("RUST_LOG", "actix_web=info");
//env_logger::init();
let sys = actix::System::new("hello-world");
server::new(|| {
App::new()
// enable logger
//.middleware(middleware::Logger::default())
.resource("/index.html", |r| r.f(|_| "Hello world!"))
.resource("/", |r| r.f(index))
}).bind("127.0.0.1:8080")
.unwrap()
.threads(1)
.start();
println!("Started http server: 127.0.0.1:8080");
let _ = sys.run();
}

View File

@ -1,7 +1,5 @@
max_width = 89
reorder_imports = true
reorder_imports_in_group = true
reorder_imported_names = true
wrap_comments = true
fn_args_density = "Compressed"
#use_small_heuristics = false
use_small_heuristics = false

View File

@ -7,18 +7,19 @@ use std::mem;
use error::{ParseError, PayloadError};
use server::h1::{chunked, Decoder};
//use server::h1decoder::{chunked, EncodingDecoder};
use server::h1decoder::EncodingDecoder;
use server::{utils, IoStream};
use super::ClientResponse;
use super::response::ClientMessage;
use super::ClientResponse;
const MAX_BUFFER_SIZE: usize = 131_072;
const MAX_HEADERS: usize = 96;
#[derive(Default)]
pub struct HttpResponseParser {
decoder: Option<Decoder>,
decoder: Option<EncodingDecoder>,
}
#[derive(Debug, Fail)]
@ -32,7 +33,7 @@ pub enum HttpResponseParserError {
impl HttpResponseParser {
pub fn parse<T>(
&mut self, io: &mut T, buf: &mut BytesMut
&mut self, io: &mut T, buf: &mut BytesMut,
) -> Poll<ClientResponse, HttpResponseParserError>
where
T: IoStream,
@ -75,7 +76,7 @@ impl HttpResponseParser {
}
pub fn parse_payload<T>(
&mut self, io: &mut T, buf: &mut BytesMut
&mut self, io: &mut T, buf: &mut BytesMut,
) -> Poll<Option<Bytes>, PayloadError>
where
T: IoStream,
@ -113,8 +114,8 @@ impl HttpResponseParser {
}
fn parse_message(
buf: &mut BytesMut
) -> Poll<(ClientResponse, Option<Decoder>), ParseError> {
buf: &mut BytesMut,
) -> Poll<(ClientResponse, Option<EncodingDecoder>), ParseError> {
// Parse http message
let bytes_ptr = buf.as_ref().as_ptr() as usize;
let mut headers: [httparse::Header; MAX_HEADERS] =
@ -160,12 +161,12 @@ impl HttpResponseParser {
}
let decoder = if status == StatusCode::SWITCHING_PROTOCOLS {
Some(Decoder::eof())
Some(EncodingDecoder::eof())
} else if let Some(len) = hdrs.get(header::CONTENT_LENGTH) {
// Content-Length
if let Ok(s) = len.to_str() {
if let Ok(len) = s.parse::<u64>() {
Some(Decoder::length(len))
Some(EncodingDecoder::length(len))
} else {
debug!("illegal Content-Length: {:?}", len);
return Err(ParseError::Header);
@ -174,9 +175,9 @@ impl HttpResponseParser {
debug!("illegal Content-Length: {:?}", len);
return Err(ParseError::Header);
}
} else if chunked(&hdrs)? {
// Chunked encoding
Some(Decoder::chunked())
//} else if chunked(&hdrs)? {
// Chunked encoding
// Some(EncodingDecoder::chunked())
} else {
None
};

View File

@ -1,9 +1,10 @@
use std::marker::PhantomData;
use std::mem;
use futures::sync::oneshot::Sender;
use futures::unsync::oneshot;
use futures::{Async, Future, Poll};
use smallvec::SmallVec;
use std::marker::PhantomData;
use std::mem;
use actix::dev::{ContextImpl, SyncEnvelope, ToEnvelope};
use actix::fut::ActorFuture;
@ -261,7 +262,7 @@ impl<A: Actor> ActorFuture for Drain<A> {
#[inline]
fn poll(
&mut self, _: &mut A, _: &mut <Self::Actor as Actor>::Context
&mut self, _: &mut A, _: &mut <Self::Actor as Actor>::Context,
) -> Poll<Self::Item, Self::Error> {
self.fut.poll().map_err(|_| ())
}

View File

@ -314,7 +314,7 @@ impl<S> HttpRequest<S> {
/// }
/// ```
pub fn url_for<U, I>(
&self, name: &str, elements: U
&self, name: &str, elements: U,
) -> Result<Url, UrlGenerationError>
where
U: IntoIterator<Item = I>,
@ -592,6 +592,29 @@ impl<S> fmt::Debug for HttpRequest<S> {
}
}
impl fmt::Debug for HttpInnerMessage {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let res = writeln!(
f,
"\nHttpInnerMessage {:?} {}:{}",
self.version,
self.method,
self.url.path()
);
if let Some(query) = self.url.uri().query().as_ref() {
let _ = writeln!(f, " query: ?{:?}", query);
}
if !self.params.is_empty() {
let _ = writeln!(f, " params: {:?}", self.params);
}
let _ = writeln!(f, " headers:");
for (key, val) in self.headers.iter() {
let _ = writeln!(f, " {:?}: {:?}", key, val);
}
res
}
}
#[cfg(test)]
mod tests {
#![allow(deprecated)]
@ -681,12 +704,10 @@ mod tests {
let mut resource = ResourceHandler::<()>::default();
resource.name("index");
let routes = vec![
(
Resource::new("index", "/user/{name}.{ext}"),
Some(resource),
),
];
let routes = vec![(
Resource::new("index", "/user/{name}.{ext}"),
Some(resource),
)];
let (router, _) = Router::new("/", ServerSettings::default(), routes);
assert!(router.has_route("/user/test.html"));
assert!(!router.has_route("/test/unknown"));
@ -715,12 +736,10 @@ mod tests {
let mut resource = ResourceHandler::<()>::default();
resource.name("index");
let routes = vec![
(
Resource::new("index", "/user/{name}.{ext}"),
Some(resource),
),
];
let routes = vec![(
Resource::new("index", "/user/{name}.{ext}"),
Some(resource),
)];
let (router, _) = Router::new("/prefix/", ServerSettings::default(), routes);
assert!(router.has_route("/user/test.html"));
assert!(!router.has_route("/prefix/user/test.html"));
@ -739,12 +758,10 @@ mod tests {
let mut resource = ResourceHandler::<()>::default();
resource.name("index");
let routes = vec![
(
Resource::external("youtube", "https://youtube.com/watch/{video_id}"),
None,
),
];
let routes = vec![(
Resource::external("youtube", "https://youtube.com/watch/{video_id}"),
None,
)];
let (router, _) = Router::new::<()>("", ServerSettings::default(), routes);
assert!(!router.has_route("https://youtube.com/watch/unknown"));

156
src/io/channel.rs Normal file
View File

@ -0,0 +1,156 @@
#![allow(dead_code, unused_imports)]
use std::cell::UnsafeCell;
use std::net::{SocketAddr, TcpStream};
use std::rc::Rc;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{mpsc, Arc};
use std::{io, thread};
use bytes::Bytes;
use futures::task::{current as current_task, Task};
use mio;
use super::IoStream;
use io::{Core, IoToken};
use server::h1decoder::{Decoder, DecoderError, Message};
pub(crate) enum TaskCommand {
Stream(IoStream),
}
pub(crate) enum IoCommand {
AddSource(TcpStream, Option<SocketAddr>),
Bytes(IoToken, Bytes),
Pause(IoToken),
Drain(IoToken),
Resume(IoToken),
Done { token: IoToken, graceful: bool },
}
struct Shared {
io: AtomicBool,
io_tx: mpsc::Sender<IoCommand>,
io_rx: mpsc::Receiver<IoCommand>,
io_reg: mio::Registration,
io_notify: mio::SetReadiness,
task: AtomicBool,
task_tx: mpsc::Sender<TaskCommand>,
task_rx: mpsc::Receiver<TaskCommand>,
task_notify: UnsafeCell<Task>,
}
pub(crate) struct IoChannel {
shared: Arc<Shared>,
}
impl IoChannel {
pub fn new() -> Self {
let (reg, notify) = mio::Registration::new2();
let (tx1, rx1) = mpsc::channel();
let (tx2, rx2) = mpsc::channel();
let shared = Arc::new(Shared {
io: AtomicBool::new(true),
io_tx: tx1,
io_rx: rx1,
io_reg: reg,
io_notify: notify,
task: AtomicBool::new(true),
task_tx: tx2,
task_rx: rx2,
task_notify: UnsafeCell::new(current_task()),
});
let ch = TaskChannel {
shared: Arc::clone(&shared),
};
thread::spawn(move || {
let core = Core::new(ch).unwrap();
core.run()
});
IoChannel { shared }
}
pub fn notify(&self) {
if !self.shared.io.load(Ordering::Relaxed) {
let _ = self.shared
.io_notify
.set_readiness(mio::Ready::readable());
}
}
pub fn set_notify(&self, task: Task) {
unsafe { *self.shared.task_notify.get() = task };
self.shared.task.store(false, Ordering::Relaxed);
}
pub fn add_source(&self, io: TcpStream, peer: Option<SocketAddr>, _http2: bool) {
self.send(IoCommand::AddSource(io, peer));
self.notify();
}
#[inline]
pub fn start(&self) {
self.shared.task.store(true, Ordering::Relaxed)
}
#[inline]
pub fn end(&self) {
self.shared.task.store(false, Ordering::Relaxed)
}
#[inline]
pub fn send(&self, msg: IoCommand) {
let _ = self.shared.io_tx.send(msg);
self.notify();
}
#[inline]
pub fn try_recv(&self) -> Result<TaskCommand, mpsc::TryRecvError> {
self.shared.task_rx.try_recv()
}
}
pub(crate) struct TaskChannel {
shared: Arc<Shared>,
}
unsafe impl Send for TaskChannel {}
impl TaskChannel {
#[inline]
pub fn notify(&self) {
if !self.shared.task.load(Ordering::Relaxed) {
let task = unsafe { &mut *self.shared.task_notify.get() };
task.notify();
}
}
#[inline]
pub fn send(&self, msg: TaskCommand) {
let _ = self.shared.task_tx.send(msg);
}
#[inline]
pub fn registration(&self) -> &mio::Registration {
&self.shared.io_reg
}
#[inline]
pub fn start(&self) {
self.shared.io.store(true, Ordering::Relaxed)
}
#[inline]
pub fn end(&self) {
self.shared.io.store(false, Ordering::Relaxed)
}
#[inline]
pub fn try_recv(&self) -> Result<IoCommand, mpsc::TryRecvError> {
self.shared.io_rx.try_recv()
}
}
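The two halves above are meant to be used as a pair: the worker side holds the IoChannel, pushes accepted sockets to the dedicated io thread, and drains the streams that come back as TaskCommand::Stream. A minimal sketch of that flow, assuming only the types defined in this file (the function name and the driving loop are illustrative, not part of this diff):
fn worker_side(channel: &IoChannel, sock: std::net::TcpStream) {
    // hand the accepted socket to the io thread and wake its poll loop
    channel.add_source(sock, None, false);
    // later, on each actor poll, drain any streams the io thread has produced
    while let Ok(TaskCommand::Stream(stream)) = channel.try_recv() {
        let _peer = stream.peer(); // dispatch `stream` to an HTTP protocol task here
    }
}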

383
src/io/mod.rs Normal file
View File

@ -0,0 +1,383 @@
use std::cell::UnsafeCell;
use std::collections::VecDeque;
use std::io::{self, Read, Write};
use std::rc::Rc;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering, ATOMIC_BOOL_INIT};
use std::sync::{mpsc, Arc};
use std::{mem, net};
use bytes::{BufMut, Bytes, BytesMut};
use futures::task::Task;
use futures::{Async, Poll};
use mio;
use mio::event::Evented;
use mio::net::TcpStream;
use slab::Slab;
use server::h1decoder::{Decoder, DecoderError, Message};
use server::helpers::SharedMessagePool;
mod channel;
pub(crate) use self::channel::{IoChannel, IoCommand, TaskChannel, TaskCommand};
const TOKEN_NOTIFY: usize = 0;
const TOKEN_START: usize = 2;
const LW_BUFFER_SIZE: usize = 4096;
const HW_BUFFER_SIZE: usize = 32_768;
pub(crate) struct Core {
mio: mio::Poll,
events: mio::Events,
state: Arc<AtomicBool>,
io: Slab<Io>,
channel: TaskChannel,
pool: Arc<SharedMessagePool>,
}
impl Core {
pub fn new(channel: TaskChannel) -> io::Result<Core> {
let mio = mio::Poll::new()?;
let notify = mio::Token(TOKEN_NOTIFY);
// notify stream
mio.register(
channel.registration(),
notify,
mio::Ready::readable(),
mio::PollOpt::edge(),
)?;
Ok(Core {
mio,
channel,
io: Slab::new(),
events: mio::Events::with_capacity(1024),
state: Arc::new(AtomicBool::new(true)),
pool: Arc::new(SharedMessagePool::new()),
})
}
pub fn run(mut self) {
loop {
self.dispatch_commands();
self.poll_io();
}
}
fn poll_io(&mut self) {
//println!("POLL IO");
// Block waiting for an event to happen
let _amt = match self.mio.poll(&mut self.events, None) {
Ok(a) => a,
Err(ref e) if e.kind() == io::ErrorKind::Interrupted => return,
Err(e) => panic!("Poll error: {}", e),
};
let mut modified = false;
for event in self.events.iter() {
let token = usize::from(event.token());
//println!("event: {:?}", event);
if token != TOKEN_NOTIFY {
let mut remove = false;
if let Some(io) = self.io.get_mut(token - TOKEN_START) {
match io.poll(event.readiness(), &self.channel) {
IoResult::Notify => modified = true,
IoResult::Remove => remove = true,
IoResult::StopReading => {}
IoResult::StopWriting => {}
IoResult::NotReady => (),
}
}
if remove {
if self.io.contains(token - TOKEN_START) {
let _ = self.io.remove(token - TOKEN_START);
}
}
}
}
if modified {
self.channel.notify();
}
}
fn dispatch_commands(&mut self) {
self.channel.start();
loop {
match self.channel.try_recv() {
Ok(IoCommand::AddSource(source, peer)) => {
match TcpStream::from_stream(source) {
Ok(stream) => match self.add_source(stream, peer) {
Ok(token) => (),
Err(_) => (),
},
Err(e) => {
error!("Can not register io object: {}", e);
}
}
}
Ok(IoCommand::Bytes(token, bytes)) => {
if let Some(io) = self.io.get_mut(token.token) {
io.write(bytes);
}
}
Ok(IoCommand::Drain(token)) => {}
Ok(IoCommand::Pause(token)) => {}
Ok(IoCommand::Resume(token)) => {
if let Some(io) = self.io.get_mut(token.token) {
// io.as_mut().ready = true;
}
}
Ok(IoCommand::Done {
token,
graceful,
}) => {
if self.io.contains(token.token) {
let _ = self.io.remove(token.token);
}
}
Err(_) => break,
}
}
self.channel.end();
}
fn add_source(
&mut self, io: TcpStream, peer: Option<net::SocketAddr>,
) -> io::Result<IoToken> {
debug!("adding a new I/O source");
if self.io.len() == self.io.capacity() {
let amt = self.io.len();
self.io.reserve_exact(amt);
}
let entry = self.io.vacant_entry();
let token = entry.key();
self.mio.register(
&io,
mio::Token(TOKEN_START + token),
mio::Ready::readable() | mio::Ready::writable(),
mio::PollOpt::edge(),
)?;
let token = IoToken {
token,
};
let decoder = Decoder::new(Arc::clone(&self.pool));
let io = Io {
buf: BytesMut::with_capacity(HW_BUFFER_SIZE),
inner: Arc::new(UnsafeCell::new(Inner {
io,
token,
decoder,
peer,
lock: ATOMIC_BOOL_INIT,
task: None,
ready: false,
started: false,
buf: BytesMut::with_capacity(HW_BUFFER_SIZE),
messages: VecDeque::new(),
})),
};
entry.insert(io);
Ok(token)
}
}
#[derive(Clone, Copy, Debug)]
pub struct IoToken {
token: usize,
}
#[derive(Debug)]
enum IoResult {
NotReady,
Notify,
Remove,
StopReading,
StopWriting,
}
struct Io {
inner: Arc<UnsafeCell<Inner>>,
buf: BytesMut,
}
impl Io {
#[inline]
fn as_mut(&mut self) -> &mut Inner {
unsafe { &mut *self.inner.as_ref().get() }
}
fn write(&mut self, data: Bytes) {
//self.buf.extend_from_slice(&data);
let inner: &mut Inner = unsafe { &mut *self.inner.as_ref().get() };
//while !self.buf.is_empty() {
match inner.io.write(&data) {
Ok(0) => {
// self.disconnected();
// return Err(io::Error::new(io::ErrorKind::WriteZero, ""));
return
}
Ok(n) => {
//self.buf.split_to(n);
}
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
return
}
Err(err) => return // Err(err),
}
//}
}
fn poll(&mut self, ready: mio::Ready, channel: &TaskChannel) -> IoResult {
let inner: &mut Inner = unsafe { mem::transmute(self.as_mut()) };
let mut updated = IoResult::NotReady;
let (read, eof) = match inner.read_from_io() {
Ok(Async::Ready((n, eof))) => (n, eof),
Ok(Async::NotReady) => return IoResult::NotReady,
Err(e) => {
if inner.started {
info!("error during io: {:?}", e);
inner.send(Err(e.into()));
return IoResult::NotReady;
} else {
info!("error during io before message: {:?}", e);
// first message is not ready, so we can drop connection
return IoResult::Remove;
}
}
};
loop {
let msg = match inner.decoder.decode(&mut inner.buf) {
Ok(Async::NotReady) => {
if eof {
if inner.started {
inner.send(Ok(Message::Hup));
} else {
return IoResult::Remove;
}
}
break;
}
Ok(Async::Ready(msg)) => Ok(msg),
Err(e) => Err(e),
};
if inner.started {
inner.send(msg);
} else {
if msg.is_ok() {
inner.started = true;
inner.messages.push_back(msg);
let inner = self.inner.clone();
let _ = channel.send(TaskCommand::Stream(IoStream {
inner,
}));
} else {
// first message is not ready, so we can drop connection
return IoResult::Remove;
}
}
}
//println!("READY {:?} {:?}", ready, updated);
updated
}
}
pub(crate) struct IoStream {
inner: Arc<UnsafeCell<Inner>>,
}
impl IoStream {
pub fn token(&self) -> IoToken {
self.as_mut().token
}
pub fn peer(&self) -> Option<net::SocketAddr> {
self.as_mut().peer
}
pub fn set_notify(&self, task: Task) {
let inner = self.as_mut();
while inner.lock.compare_and_swap(false, true, Ordering::Acquire) != false {}
inner.task = Some(task);
inner.lock.store(false, Ordering::Release);
}
#[inline]
fn as_mut(&self) -> &mut Inner {
unsafe { &mut *self.inner.as_ref().get() }
}
pub fn try_recv(&self) -> Option<Result<Message, DecoderError>> {
let inner = self.as_mut();
while inner.lock.compare_and_swap(false, true, Ordering::Acquire) != false {}
let result = inner.messages.pop_front();
inner.lock.store(false, Ordering::Release);
result
}
}
struct Inner {
lock: AtomicBool,
token: IoToken,
io: TcpStream,
decoder: Decoder,
buf: BytesMut,
task: Option<Task>,
peer: Option<net::SocketAddr>,
ready: bool,
started: bool,
messages: VecDeque<Result<Message, DecoderError>>,
}
impl Inner {
fn send(&mut self, msg: Result<Message, DecoderError>) {
while self.lock.compare_and_swap(false, true, Ordering::Acquire) != false {}
self.messages.push_back(msg);
if let Some(ref task) = self.task.as_ref() {
task.notify()
}
self.lock.store(false, Ordering::Release);
}
fn read_from_io(&mut self) -> Poll<(usize, bool), io::Error> {
let mut read = 0;
loop {
unsafe {
if self.buf.remaining_mut() < LW_BUFFER_SIZE {
self.buf.reserve(HW_BUFFER_SIZE);
}
match self.io.read(self.buf.bytes_mut()) {
Ok(n) => {
read += n;
if n == 0 {
return Ok(Async::Ready((read, true)));
} else {
self.buf.advance_mut(n);
}
}
Err(e) => {
return if e.kind() == io::ErrorKind::WouldBlock {
if read != 0 {
Ok(Async::Ready((read, false)))
} else {
Ok(Async::NotReady)
}
} else {
Err(e)
};
}
}
}
}
}
}

View File

@ -64,8 +64,11 @@
#![cfg_attr(actix_nightly, feature(
specialization, // for impl ErrorResponse for std::error::Error
))]
#![cfg_attr(feature = "cargo-clippy",
allow(decimal_literal_representation, suspicious_arithmetic_impl))]
#![cfg_attr(
feature = "cargo-clippy",
allow(decimal_literal_representation, suspicious_arithmetic_impl)
)]
#![allow(dead_code, unused_mut, unused_variables, unused_imports)]
#[macro_use]
extern crate log;
@ -127,6 +130,9 @@ extern crate openssl;
#[cfg(feature = "openssl")]
extern crate tokio_openssl;
extern crate iovec;
extern crate slab;
mod application;
mod body;
mod context;
@ -139,6 +145,7 @@ mod httpmessage;
mod httprequest;
mod httpresponse;
mod info;
mod io;
mod json;
mod param;
mod payload;
@ -215,6 +222,7 @@ pub mod http {
//! Various HTTP related types
// re-exports
pub use modhttp::header::{HeaderName, HeaderValue};
pub use modhttp::{Method, StatusCode, Version};
#[doc(hidden)]

View File

@ -1,3 +1,8 @@
#![allow(dead_code, unused_imports, unreachable_code, unused_variables)]
use std::net::{Shutdown, SocketAddr};
use std::rc::Rc;
use std::{io, mem, ptr, time};
@ -7,10 +12,12 @@ use futures::{Async, Future, Poll};
use tokio_io::{AsyncRead, AsyncWrite};
use super::settings::WorkerSettings;
use super::{utils, HttpHandler, IoStream, h1, h2};
//use super::{h1, h2, utils, HttpHandler, IoStream};
use super::{h1, utils, HttpHandler, IoStream};
const HTTP2_PREFACE: [u8; 14] = *b"PRI * HTTP/2.0";
/*
enum HttpProtocol<T: IoStream, H: 'static> {
H1(h1::Http1<T, H>),
H2(h2::Http2<T, H>),
@ -163,10 +170,7 @@ where
match kind {
ProtocolKind::Http1 => {
self.proto = Some(HttpProtocol::H1(h1::Http1::new(
settings,
io,
addr,
buf,
settings, io, addr, buf,
)));
return self.poll();
}
@ -183,7 +187,7 @@ where
}
unreachable!()
}
}
}*/
pub(crate) struct Node<T> {
next: Option<*mut Node<()>>,
@ -250,9 +254,9 @@ impl Node<()> {
next = n.next.as_ref();
if !n.element.is_null() {
let ch: &mut HttpChannel<T, H> =
mem::transmute(&mut *(n.element as *mut _));
ch.shutdown();
//let ch: &mut HttpChannel<T, H> =
// mem::transmute(&mut *(n.element as *mut _));
//ch.shutdown();
}
}
} else {

File diff suppressed because it is too large

501
src/server/h1decoder.rs Normal file
View File

@ -0,0 +1,501 @@
#![allow(dead_code, unused_variables)]
use std::rc::Rc;
use std::sync::Arc;
use std::{io, mem};
use bytes::{Bytes, BytesMut};
use futures::{Async, Poll};
use httparse;
use super::helpers::{SharedHttpInnerMessage, SharedMessagePool};
use error::ParseError;
use http::{header, HeaderName, HeaderValue, HttpTryFrom, Method, Uri, Version};
use uri::Url;
const MAX_BUFFER_SIZE: usize = 131_072;
const MAX_HEADERS: usize = 96;
pub(crate) struct Decoder {
decoder: Option<EncodingDecoder>,
pool: Arc<SharedMessagePool>,
}
#[derive(Debug)]
pub(crate) enum Message {
Message {
msg: SharedHttpInnerMessage,
payload: bool,
},
Chunk(Bytes),
Eof,
Hup,
}
unsafe impl Send for Message {}
unsafe impl Sync for Message {}
#[derive(Debug)]
pub(crate) enum DecoderError {
Io(io::Error),
Error(ParseError),
}
impl From<io::Error> for DecoderError {
fn from(err: io::Error) -> DecoderError {
DecoderError::Io(err)
}
}
impl Decoder {
pub fn new(pool: Arc<SharedMessagePool>) -> Decoder {
Decoder {
pool,
decoder: None,
}
}
pub fn decode(&mut self, src: &mut BytesMut) -> Poll<Message, DecoderError> {
// read payload
if self.decoder.is_some() {
match self.decoder.as_mut().unwrap().decode(src)? {
Async::Ready(Some(bytes)) => {
return Ok(Async::Ready(Message::Chunk(bytes)))
}
Async::Ready(None) => {
self.decoder.take();
return Ok(Async::Ready(Message::Eof));
}
Async::NotReady => return Ok(Async::NotReady),
}
}
match self.parse_message(src).map_err(DecoderError::Error)? {
Async::Ready((msg, decoder)) => {
if let Some(decoder) = decoder {
self.decoder = Some(decoder);
return Ok(Async::Ready(Message::Message {
msg,
payload: true,
}));
} else {
return Ok(Async::Ready(Message::Message {
msg,
payload: false,
}));
}
}
Async::NotReady => {
if src.len() >= MAX_BUFFER_SIZE {
error!("MAX_BUFFER_SIZE unprocessed data reached, closing");
return Err(DecoderError::Error(ParseError::TooLarge));
}
return Ok(Async::NotReady);
}
}
}
fn parse_message(
&self, buf: &mut BytesMut,
) -> Poll<(SharedHttpInnerMessage, Option<EncodingDecoder>), ParseError> {
// Parse http message
let mut has_upgrade = false;
let mut chunked = false;
let mut content_length = None;
let msg = {
let bytes_ptr = buf.as_ref().as_ptr() as usize;
let mut headers: [httparse::Header; MAX_HEADERS] =
unsafe { mem::uninitialized() };
let (len, method, path, version, headers_len) = {
let b = unsafe {
let b: &[u8] = buf;
mem::transmute(b)
};
let mut req = httparse::Request::new(&mut headers);
match req.parse(b)? {
httparse::Status::Complete(len) => {
let method = Method::from_bytes(req.method.unwrap().as_bytes())
.map_err(|_| ParseError::Method)?;
let path = Url::new(Uri::try_from(req.path.unwrap())?);
let version = if req.version.unwrap() == 1 {
Version::HTTP_11
} else {
Version::HTTP_10
};
(len, method, path, version, req.headers.len())
}
httparse::Status::Partial => return Ok(Async::NotReady),
}
};
let slice = buf.split_to(len).freeze();
// convert headers
let msg =
SharedHttpInnerMessage::new(self.pool.get(), Arc::clone(&self.pool));
{
let msg_mut = msg.get_mut();
msg_mut.keep_alive = version != Version::HTTP_10;
for header in headers[..headers_len].iter() {
if let Ok(name) = HeaderName::from_bytes(header.name.as_bytes()) {
has_upgrade = has_upgrade || name == header::UPGRADE;
let v_start = header.value.as_ptr() as usize - bytes_ptr;
let v_end = v_start + header.value.len();
let value = unsafe {
HeaderValue::from_shared_unchecked(
slice.slice(v_start, v_end),
)
};
match name {
header::CONTENT_LENGTH => {
if let Ok(s) = value.to_str() {
if let Ok(len) = s.parse::<u64>() {
content_length = Some(len)
} else {
debug!("illegal Content-Length: {:?}", len);
return Err(ParseError::Header);
}
} else {
debug!("illegal Content-Length: {:?}", len);
return Err(ParseError::Header);
}
}
// transfer-encoding
header::TRANSFER_ENCODING => {
if let Ok(s) = value.to_str() {
chunked = s.to_lowercase().contains("chunked");
} else {
return Err(ParseError::Header);
}
}
// connection keep-alive state
header::CONNECTION => {
msg_mut.keep_alive = if let Ok(conn) = value.to_str() {
if version == Version::HTTP_10
&& conn.contains("keep-alive")
{
true
} else {
version == Version::HTTP_11
&& !(conn.contains("close")
|| conn.contains("upgrade"))
}
} else {
false
};
}
_ => (),
}
msg_mut.headers.append(name, value);
} else {
return Err(ParseError::Header);
}
}
msg_mut.url = path;
msg_mut.method = method;
msg_mut.version = version;
}
msg
};
// https://tools.ietf.org/html/rfc7230#section-3.3.3
let decoder = if chunked {
// Chunked encoding
Some(EncodingDecoder::chunked())
} else if let Some(len) = content_length {
// Content-Length
Some(EncodingDecoder::length(len))
} else if has_upgrade || msg.get_ref().method == Method::CONNECT {
// upgrade(websocket) or connect
Some(EncodingDecoder::eof())
} else {
None
};
Ok(Async::Ready((msg, decoder)))
}
}
/// Decoders to handle different Transfer-Encodings.
///
/// If a message body does not include a Transfer-Encoding, it *should*
/// include a Content-Length header.
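///
/// Illustrative mapping (this mirrors how `parse_message` above selects a
/// decoder; the concrete values are examples only): `Transfer-Encoding: chunked`
/// => `EncodingDecoder::chunked()`, `Content-Length: 1024` =>
/// `EncodingDecoder::length(1024)`, upgrade or `CONNECT` => `EncodingDecoder::eof()`.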
#[derive(Debug, Clone, PartialEq)]
pub struct EncodingDecoder {
kind: Kind,
}
impl EncodingDecoder {
pub fn length(x: u64) -> EncodingDecoder {
EncodingDecoder {
kind: Kind::Length(x),
}
}
pub fn chunked() -> EncodingDecoder {
EncodingDecoder {
kind: Kind::Chunked(ChunkedState::Size, 0),
}
}
pub fn eof() -> EncodingDecoder {
EncodingDecoder {
kind: Kind::Eof(false),
}
}
}
#[derive(Debug, Clone, PartialEq)]
enum Kind {
/// A Reader used when a Content-Length header is passed with a positive
/// integer.
Length(u64),
/// A Reader used when Transfer-Encoding is `chunked`.
Chunked(ChunkedState, u64),
/// A Reader used for responses that don't indicate a length or chunked encoding.
///
/// Note: This should only be used for `Response`s. It is illegal for a
/// `Request` to omit both `Content-Length` and
/// `Transfer-Encoding: chunked`, as explained in the spec:
///
/// > If a Transfer-Encoding header field is present in a response and
/// > the chunked transfer coding is not the final encoding, the
/// > message body length is determined by reading the connection until
/// > it is closed by the server. If a Transfer-Encoding header field
/// > is present in a request and the chunked transfer coding is not
/// > the final encoding, the message body length cannot be determined
/// > reliably; the server MUST respond with the 400 (Bad Request)
/// > status code and then close the connection.
Eof(bool),
}
#[derive(Debug, PartialEq, Clone)]
enum ChunkedState {
Size,
SizeLws,
Extension,
SizeLf,
Body,
BodyCr,
BodyLf,
EndCr,
EndLf,
End,
}
impl EncodingDecoder {
pub fn is_eof(&self) -> bool {
match self.kind {
Kind::Length(0) | Kind::Chunked(ChunkedState::End, _) | Kind::Eof(true) => {
true
}
_ => false,
}
}
pub fn decode(&mut self, body: &mut BytesMut) -> Poll<Option<Bytes>, io::Error> {
match self.kind {
Kind::Length(ref mut remaining) => {
if *remaining == 0 {
Ok(Async::Ready(None))
} else {
if body.is_empty() {
return Ok(Async::NotReady);
}
let len = body.len() as u64;
let buf;
if *remaining > len {
buf = body.take().freeze();
*remaining -= len;
} else {
buf = body.split_to(*remaining as usize).freeze();
*remaining = 0;
}
trace!("Length read: {}", buf.len());
Ok(Async::Ready(Some(buf)))
}
}
Kind::Chunked(ref mut state, ref mut size) => {
loop {
let mut buf = None;
// advances the chunked state
*state = try_ready!(state.step(body, size, &mut buf));
if *state == ChunkedState::End {
trace!("End of chunked stream");
return Ok(Async::Ready(None));
}
if let Some(buf) = buf {
return Ok(Async::Ready(Some(buf)));
}
if body.is_empty() {
return Ok(Async::NotReady);
}
}
}
Kind::Eof(ref mut is_eof) => {
if *is_eof {
Ok(Async::Ready(None))
} else if !body.is_empty() {
Ok(Async::Ready(Some(body.take().freeze())))
} else {
Ok(Async::NotReady)
}
}
}
}
}
macro_rules! byte (
($rdr:ident) => ({
if $rdr.len() > 0 {
let b = $rdr[0];
$rdr.split_to(1);
b
} else {
return Ok(Async::NotReady)
}
})
);
impl ChunkedState {
fn step(
&self, body: &mut BytesMut, size: &mut u64, buf: &mut Option<Bytes>,
) -> Poll<ChunkedState, io::Error> {
use self::ChunkedState::*;
match *self {
Size => ChunkedState::read_size(body, size),
SizeLws => ChunkedState::read_size_lws(body),
Extension => ChunkedState::read_extension(body),
SizeLf => ChunkedState::read_size_lf(body, size),
Body => ChunkedState::read_body(body, size, buf),
BodyCr => ChunkedState::read_body_cr(body),
BodyLf => ChunkedState::read_body_lf(body),
EndCr => ChunkedState::read_end_cr(body),
EndLf => ChunkedState::read_end_lf(body),
End => Ok(Async::Ready(ChunkedState::End)),
}
}
fn read_size(rdr: &mut BytesMut, size: &mut u64) -> Poll<ChunkedState, io::Error> {
let radix = 16;
match byte!(rdr) {
b @ b'0'...b'9' => {
*size *= radix;
*size += u64::from(b - b'0');
}
b @ b'a'...b'f' => {
*size *= radix;
*size += u64::from(b + 10 - b'a');
}
b @ b'A'...b'F' => {
*size *= radix;
*size += u64::from(b + 10 - b'A');
}
b'\t' | b' ' => return Ok(Async::Ready(ChunkedState::SizeLws)),
b';' => return Ok(Async::Ready(ChunkedState::Extension)),
b'\r' => return Ok(Async::Ready(ChunkedState::SizeLf)),
_ => {
return Err(io::Error::new(
io::ErrorKind::InvalidInput,
"Invalid chunk size line: Invalid Size",
));
}
}
Ok(Async::Ready(ChunkedState::Size))
}
fn read_size_lws(rdr: &mut BytesMut) -> Poll<ChunkedState, io::Error> {
trace!("read_size_lws");
match byte!(rdr) {
// LWS can follow the chunk size, but no more digits can come
b'\t' | b' ' => Ok(Async::Ready(ChunkedState::SizeLws)),
b';' => Ok(Async::Ready(ChunkedState::Extension)),
b'\r' => Ok(Async::Ready(ChunkedState::SizeLf)),
_ => Err(io::Error::new(
io::ErrorKind::InvalidInput,
"Invalid chunk size linear white space",
)),
}
}
fn read_extension(rdr: &mut BytesMut) -> Poll<ChunkedState, io::Error> {
match byte!(rdr) {
b'\r' => Ok(Async::Ready(ChunkedState::SizeLf)),
_ => Ok(Async::Ready(ChunkedState::Extension)), // no supported extensions
}
}
fn read_size_lf(
rdr: &mut BytesMut, size: &mut u64,
) -> Poll<ChunkedState, io::Error> {
match byte!(rdr) {
b'\n' if *size > 0 => Ok(Async::Ready(ChunkedState::Body)),
b'\n' if *size == 0 => Ok(Async::Ready(ChunkedState::EndCr)),
_ => {
Err(io::Error::new(io::ErrorKind::InvalidInput, "Invalid chunk size LF"))
}
}
}
fn read_body(
rdr: &mut BytesMut, rem: &mut u64, buf: &mut Option<Bytes>,
) -> Poll<ChunkedState, io::Error> {
trace!("Chunked read, remaining={:?}", rem);
let len = rdr.len() as u64;
if len == 0 {
Ok(Async::Ready(ChunkedState::Body))
} else {
let slice;
if *rem > len {
slice = rdr.take().freeze();
*rem -= len;
} else {
slice = rdr.split_to(*rem as usize).freeze();
*rem = 0;
}
*buf = Some(slice);
if *rem > 0 {
Ok(Async::Ready(ChunkedState::Body))
} else {
Ok(Async::Ready(ChunkedState::BodyCr))
}
}
}
fn read_body_cr(rdr: &mut BytesMut) -> Poll<ChunkedState, io::Error> {
match byte!(rdr) {
b'\r' => Ok(Async::Ready(ChunkedState::BodyLf)),
_ => {
Err(io::Error::new(io::ErrorKind::InvalidInput, "Invalid chunk body CR"))
}
}
}
fn read_body_lf(rdr: &mut BytesMut) -> Poll<ChunkedState, io::Error> {
match byte!(rdr) {
b'\n' => Ok(Async::Ready(ChunkedState::Size)),
_ => {
Err(io::Error::new(io::ErrorKind::InvalidInput, "Invalid chunk body LF"))
}
}
}
fn read_end_cr(rdr: &mut BytesMut) -> Poll<ChunkedState, io::Error> {
match byte!(rdr) {
b'\r' => Ok(Async::Ready(ChunkedState::EndLf)),
_ => {
Err(io::Error::new(io::ErrorKind::InvalidInput, "Invalid chunk end CR"))
}
}
}
fn read_end_lf(rdr: &mut BytesMut) -> Poll<ChunkedState, io::Error> {
match byte!(rdr) {
b'\n' => Ok(Async::Ready(ChunkedState::End)),
_ => {
Err(io::Error::new(io::ErrorKind::InvalidInput, "Invalid chunk end LF"))
}
}
}
}
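For reference, the state machine above consumes the standard HTTP/1.1 chunked framing: a hex size line, the chunk payload, a CRLF, and a terminating zero-size chunk. A minimal sketch of driving the decoder by hand, assuming the EncodingDecoder defined in this file (the helper function and the literal body are illustrative, not part of this diff):
use bytes::BytesMut;
use futures::Async;
fn decode_chunked_example() -> Result<(), std::io::Error> {
    // "4\r\nWiki\r\n5\r\npedia\r\n0\r\n\r\n" carries the chunks "Wiki" and "pedia"
    let mut body = BytesMut::from(&b"4\r\nWiki\r\n5\r\npedia\r\n0\r\n\r\n"[..]);
    let mut dec = EncodingDecoder::chunked();
    loop {
        match dec.decode(&mut body)? {
            Async::Ready(Some(chunk)) => println!("chunk: {:?}", chunk),
            Async::Ready(None) => break,  // terminating chunk seen, body complete
            Async::NotReady => break,     // would need more bytes from the socket
        }
    }
    Ok(())
}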

View File

@ -1,4 +1,5 @@
#![cfg_attr(feature = "cargo-clippy", allow(redundant_field_names))]
#![allow(dead_code, unused_mut, unused_variables)]
use bytes::BufMut;
use futures::{Async, Poll};
@ -13,10 +14,12 @@ use super::shared::SharedBytes;
use super::{Writer, WriterState, MAX_WRITE_BUFFER_SIZE};
use body::{Binary, Body};
use header::ContentEncoding;
use http::header::{HeaderValue, CONNECTION, CONTENT_LENGTH, DATE};
use http::{Method, Version};
use httprequest::HttpInnerMessage;
use httpresponse::HttpResponse;
use http::{Method, Version};
use http::header::{HeaderValue, CONNECTION, CONTENT_LENGTH, DATE};
use io::{IoCommand, IoToken};
use server::worker::IoWriter;
const AVERAGE_HEADER_SIZE: usize = 30; // totally scientific
@ -29,9 +32,10 @@ bitflags! {
}
}
pub(crate) struct H1Writer<T: AsyncWrite, H: 'static> {
pub(crate) struct H1Writer<H: 'static> {
flags: Flags,
stream: T,
token: IoToken,
stream: IoWriter,
encoder: ContentEncoder,
written: u64,
headers_size: u32,
@ -40,23 +44,25 @@ pub(crate) struct H1Writer<T: AsyncWrite, H: 'static> {
settings: Rc<WorkerSettings<H>>,
}
impl<T: AsyncWrite, H: 'static> H1Writer<T, H> {
impl<H: 'static> H1Writer<H> {
pub fn new(
stream: T, buf: SharedBytes, settings: Rc<WorkerSettings<H>>
) -> H1Writer<T, H> {
token: IoToken, stream: IoWriter, buf: SharedBytes,
settings: Rc<WorkerSettings<H>>,
) -> H1Writer<H> {
H1Writer {
token,
stream,
settings,
flags: Flags::empty(),
encoder: ContentEncoder::empty(buf.clone()),
written: 0,
headers_size: 0,
buffer: buf,
buffer_capacity: 0,
stream,
settings,
}
}
pub fn get_mut(&mut self) -> &mut T {
pub fn get_mut(&mut self) -> &mut IoWriter {
&mut self.stream
}
@ -73,28 +79,19 @@ impl<T: AsyncWrite, H: 'static> H1Writer<T, H> {
self.flags.contains(Flags::KEEPALIVE) && !self.flags.contains(Flags::UPGRADE)
}
fn write_data(&mut self, data: &[u8]) -> io::Result<usize> {
let mut written = 0;
while written < data.len() {
match self.stream.write(&data[written..]) {
Ok(0) => {
self.disconnected();
return Err(io::Error::new(io::ErrorKind::WriteZero, ""));
}
Ok(n) => {
written += n;
}
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
return Ok(written)
}
Err(err) => return Err(err),
}
}
Ok(written)
pub fn done(&self, graceful: bool) {
self.stream.send(IoCommand::Done {
graceful,
token: self.token,
});
}
pub fn resume(&self) {
self.stream.send(IoCommand::Resume(self.token));
}
}
impl<T: AsyncWrite, H: 'static> Writer for H1Writer<T, H> {
impl<H: 'static> Writer for H1Writer<H> {
#[inline]
fn written(&self) -> u64 {
self.written
@ -227,16 +224,7 @@ impl<T: AsyncWrite, H: 'static> Writer for H1Writer<T, H> {
if self.flags.contains(Flags::STARTED) {
// shortcut for upgraded connection
if self.flags.contains(Flags::UPGRADE) {
if self.buffer.is_empty() {
let pl: &[u8] = payload.as_ref();
let n = self.write_data(pl)?;
if n < pl.len() {
self.buffer.extend_from_slice(&pl[n..]);
return Ok(WriterState::Done);
}
} else {
self.buffer.extend(payload);
}
self.buffer.extend(payload);
} else {
// TODO: add warning, write after EOF
self.encoder.write(payload)?;
@ -272,6 +260,12 @@ impl<T: AsyncWrite, H: 'static> Writer for H1Writer<T, H> {
#[inline]
fn poll_completed(&mut self, shutdown: bool) -> Poll<(), io::Error> {
if !self.buffer.is_empty() {
self.stream.send(IoCommand::Bytes(
self.token,
self.buffer.take().freeze(),
));
}
/*
let buf: &[u8] = unsafe { mem::transmute(self.buffer.as_ref()) };
let written = self.write_data(buf)?;
let _ = self.buffer.split_to(written);
@ -281,8 +275,7 @@ impl<T: AsyncWrite, H: 'static> Writer for H1Writer<T, H> {
}
if shutdown {
self.stream.shutdown()
} else {
Ok(Async::Ready(()))
}
} else {*/
Ok(Async::Ready(()))
}
}

View File

@ -1,24 +1,55 @@
use bytes::{BufMut, BytesMut};
use http::Version;
use std::cell::RefCell;
use std::collections::VecDeque;
use std::rc::Rc;
use std::{mem, ptr, slice};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::{fmt, mem, ptr, slice};
use http::Version;
use httprequest::HttpInnerMessage;
const SIZE: usize = 128;
/// Internal use only! unsafe
pub(crate) struct SharedMessagePool(RefCell<VecDeque<Rc<HttpInnerMessage>>>);
pub(crate) struct SharedMessagePool {
head: AtomicUsize,
tail: AtomicUsize,
data: Option<[Rc<HttpInnerMessage>; 128]>,
}
impl SharedMessagePool {
pub fn new() -> SharedMessagePool {
SharedMessagePool(RefCell::new(VecDeque::with_capacity(128)))
SharedMessagePool {
head: AtomicUsize::new(0),
tail: AtomicUsize::new(0),
data: Some(unsafe { mem::uninitialized() }),
}
}
#[inline]
pub fn len(&self) -> usize {
let tail = self.tail.load(Ordering::Relaxed);
let head = self.head.load(Ordering::Relaxed);
if tail > head {
tail - head
} else if tail < head {
tail + SIZE - head
} else {
0
}
}
fn as_mut_ptr(&self) -> *mut Rc<HttpInnerMessage> {
&self.data as *const _ as *mut _
}
#[inline]
pub fn get(&self) -> Rc<HttpInnerMessage> {
if let Some(msg) = self.0.borrow_mut().pop_front() {
msg
if self.len() > 0 {
let head = self.head.load(Ordering::Relaxed);
self.head.store((head + 1) % SIZE, Ordering::Relaxed);
unsafe { ptr::read(self.as_mut_ptr().offset(head as isize)) }
} else {
Rc::new(HttpInnerMessage::default())
}
@ -26,24 +57,32 @@ impl SharedMessagePool {
#[inline]
pub fn release(&self, mut msg: Rc<HttpInnerMessage>) {
let v = &mut self.0.borrow_mut();
if v.len() < 128 {
Rc::get_mut(&mut msg).unwrap().reset();
v.push_front(msg);
if self.len() < SIZE - 1 {
let tail = self.tail.load(Ordering::Relaxed);
unsafe {
ptr::write(self.as_mut_ptr().offset(tail as isize), msg);
}
self.tail.store((tail + 1) % SIZE, Ordering::Relaxed);
}
}
}
pub(crate) struct SharedHttpInnerMessage(
Option<Rc<HttpInnerMessage>>,
Option<Rc<SharedMessagePool>>,
Option<Arc<SharedMessagePool>>,
);
impl Drop for SharedHttpInnerMessage {
fn drop(&mut self) {
if let Some(ref pool) = self.1 {
if let Some(msg) = self.0.take() {
if Rc::strong_count(&msg) == 1 {
if let Some(mut msg) = self.0.take() {
let release = if let Some(ref mut msg) = Rc::get_mut(&mut msg) {
msg.reset();
true
} else {
false
};
if release {
pool.release(msg);
}
}
@ -63,13 +102,19 @@ impl Default for SharedHttpInnerMessage {
}
}
impl fmt::Debug for SharedHttpInnerMessage {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
self.0.fmt(f)
}
}
impl SharedHttpInnerMessage {
pub fn from_message(msg: HttpInnerMessage) -> SharedHttpInnerMessage {
SharedHttpInnerMessage(Some(Rc::new(msg)), None)
}
pub fn new(
msg: Rc<HttpInnerMessage>, pool: Rc<SharedMessagePool>
msg: Rc<HttpInnerMessage>, pool: Arc<SharedMessagePool>,
) -> SharedHttpInnerMessage {
SharedHttpInnerMessage(Some(msg), Some(pool))
}
@ -96,9 +141,8 @@ const DEC_DIGITS_LUT: &[u8] = b"0001020304050607080910111213141516171819\
8081828384858687888990919293949596979899";
pub(crate) fn write_status_line(version: Version, mut n: u16, bytes: &mut BytesMut) {
let mut buf: [u8; 13] = [
b'H', b'T', b'T', b'P', b'/', b'1', b'.', b'1', b' ', b' ', b' ', b' ', b' '
];
let mut buf: [u8; 13] =
[b'H', b'T', b'T', b'P', b'/', b'1', b'.', b'1', b' ', b' ', b' ', b' ', b' '];
match version {
Version::HTTP_2 => buf[5] = b'2',
Version::HTTP_10 => buf[7] = b'0',
@ -251,63 +295,33 @@ mod tests {
let mut bytes = BytesMut::new();
bytes.reserve(50);
write_content_length(0, &mut bytes);
assert_eq!(
bytes.take().freeze(),
b"\r\ncontent-length: 0\r\n"[..]
);
assert_eq!(bytes.take().freeze(), b"\r\ncontent-length: 0\r\n"[..]);
bytes.reserve(50);
write_content_length(9, &mut bytes);
assert_eq!(
bytes.take().freeze(),
b"\r\ncontent-length: 9\r\n"[..]
);
assert_eq!(bytes.take().freeze(), b"\r\ncontent-length: 9\r\n"[..]);
bytes.reserve(50);
write_content_length(10, &mut bytes);
assert_eq!(
bytes.take().freeze(),
b"\r\ncontent-length: 10\r\n"[..]
);
assert_eq!(bytes.take().freeze(), b"\r\ncontent-length: 10\r\n"[..]);
bytes.reserve(50);
write_content_length(99, &mut bytes);
assert_eq!(
bytes.take().freeze(),
b"\r\ncontent-length: 99\r\n"[..]
);
assert_eq!(bytes.take().freeze(), b"\r\ncontent-length: 99\r\n"[..]);
bytes.reserve(50);
write_content_length(100, &mut bytes);
assert_eq!(
bytes.take().freeze(),
b"\r\ncontent-length: 100\r\n"[..]
);
assert_eq!(bytes.take().freeze(), b"\r\ncontent-length: 100\r\n"[..]);
bytes.reserve(50);
write_content_length(101, &mut bytes);
assert_eq!(
bytes.take().freeze(),
b"\r\ncontent-length: 101\r\n"[..]
);
assert_eq!(bytes.take().freeze(), b"\r\ncontent-length: 101\r\n"[..]);
bytes.reserve(50);
write_content_length(998, &mut bytes);
assert_eq!(
bytes.take().freeze(),
b"\r\ncontent-length: 998\r\n"[..]
);
assert_eq!(bytes.take().freeze(), b"\r\ncontent-length: 998\r\n"[..]);
bytes.reserve(50);
write_content_length(1000, &mut bytes);
assert_eq!(
bytes.take().freeze(),
b"\r\ncontent-length: 1000\r\n"[..]
);
assert_eq!(bytes.take().freeze(), b"\r\ncontent-length: 1000\r\n"[..]);
bytes.reserve(50);
write_content_length(1001, &mut bytes);
assert_eq!(
bytes.take().freeze(),
b"\r\ncontent-length: 1001\r\n"[..]
);
assert_eq!(bytes.take().freeze(), b"\r\ncontent-length: 1001\r\n"[..]);
bytes.reserve(50);
write_content_length(5909, &mut bytes);
assert_eq!(
bytes.take().freeze(),
b"\r\ncontent-length: 5909\r\n"[..]
);
assert_eq!(bytes.take().freeze(), b"\r\ncontent-length: 5909\r\n"[..]);
}
}

View File

@ -10,9 +10,10 @@ use tokio_io::{AsyncRead, AsyncWrite};
mod channel;
pub(crate) mod encoding;
pub(crate) mod h1;
pub(crate) mod h1decoder;
mod h1writer;
mod h2;
mod h2writer;
//mod h2;
//mod h2writer;
pub(crate) mod helpers;
mod settings;
pub(crate) mod shared;
@ -28,6 +29,8 @@ use error::Error;
use header::ContentEncoding;
use httprequest::{HttpInnerMessage, HttpRequest};
use httpresponse::HttpResponse;
use io::IoCommand;
use server::worker::IoWriter;
/// max buffer size 64k
pub(crate) const MAX_WRITE_BUFFER_SIZE: usize = 65_536;

View File

@ -1,3 +1,5 @@
#![allow(dead_code, unused_mut, unused_variables)]
use bytes::BytesMut;
use futures_cpupool::{Builder, CpuPool};
use http::StatusCode;
@ -8,10 +10,10 @@ use std::sync::Arc;
use std::{fmt, mem, net};
use time;
use super::KeepAlive;
use super::channel::Node;
use super::helpers;
use super::shared::{SharedBytes, SharedBytesPool};
use super::KeepAlive;
use body::Body;
use httpresponse::{HttpResponse, HttpResponseBuilder, HttpResponsePool};
@ -72,7 +74,7 @@ impl Default for ServerSettings {
impl ServerSettings {
/// Create server settings instance
pub(crate) fn new(
addr: Option<net::SocketAddr>, host: &Option<String>, secure: bool
addr: Option<net::SocketAddr>, host: &Option<String>, secure: bool,
) -> ServerSettings {
let host = if let Some(ref host) = *host {
host.clone()
@ -119,7 +121,7 @@ impl ServerSettings {
#[inline]
pub(crate) fn get_response_builder(
&self, status: StatusCode
&self, status: StatusCode,
) -> HttpResponseBuilder {
HttpResponsePool::get_builder(&self.responses, status)
}
@ -133,7 +135,7 @@ pub(crate) struct WorkerSettings<H> {
keep_alive: u64,
ka_enabled: bool,
bytes: Rc<SharedBytesPool>,
messages: Rc<helpers::SharedMessagePool>,
messages: Arc<helpers::SharedMessagePool>,
channels: Cell<usize>,
node: Box<Node<()>>,
date: UnsafeCell<Date>,
@ -152,7 +154,7 @@ impl<H> WorkerSettings<H> {
ka_enabled,
h: RefCell::new(h),
bytes: Rc::new(SharedBytesPool::new()),
messages: Rc::new(helpers::SharedMessagePool::new()),
messages: Arc::new(helpers::SharedMessagePool::new()),
channels: Cell::new(0),
node: Box::new(Node::head()),
date: UnsafeCell::new(Date::new()),
@ -186,7 +188,7 @@ impl<H> WorkerSettings<H> {
pub fn get_http_message(&self) -> helpers::SharedHttpInnerMessage {
helpers::SharedHttpInnerMessage::new(
self.messages.get(),
Rc::clone(&self.messages),
Arc::clone(&self.messages),
)
}
@ -255,10 +257,7 @@ mod tests {
#[test]
fn test_date_len() {
assert_eq!(
DATE_VALUE_LENGTH,
"Sun, 06 Nov 1994 08:49:37 GMT".len()
);
assert_eq!(DATE_VALUE_LENGTH, "Sun, 06 Nov 1994 08:49:37 GMT".len());
}
#[test]

View File

@ -18,7 +18,8 @@ use native_tls::TlsAcceptor;
#[cfg(feature = "alpn")]
use openssl::ssl::{AlpnError, SslAcceptorBuilder};
use super::channel::{HttpChannel, WrapperStream};
//use super::channel::{HttpChannel, WrapperStream};
use super::channel::WrapperStream;
use super::settings::{ServerSettings, WorkerSettings};
use super::worker::{Conn, StopWorker, StreamHandlerType, Worker};
use super::{IntoHttpHandler, IoStream, KeepAlive};
@ -230,7 +231,7 @@ where
}
fn start_workers(
&mut self, settings: &ServerSettings, handler: &StreamHandlerType
&mut self, settings: &ServerSettings, handler: &StreamHandlerType,
) -> Vec<(usize, mpsc::UnboundedSender<Conn<net::TcpStream>>)> {
// start workers
let mut workers = Vec::new();
@ -427,7 +428,7 @@ impl<H: IntoHttpHandler> HttpServer<H> {
///
/// This method sets alpn protocols to "h2" and "http/1.1"
pub fn start_ssl(
mut self, mut builder: SslAcceptorBuilder
mut self, mut builder: SslAcceptorBuilder,
) -> io::Result<Addr<Syn, Self>> {
if self.sockets.is_empty() {
Err(io::Error::new(
@ -643,12 +644,12 @@ where
type Result = ();
fn handle(&mut self, msg: Conn<T>, _: &mut Context<Self>) -> Self::Result {
Arbiter::handle().spawn(HttpChannel::new(
Rc::clone(self.h.as_ref().unwrap()),
msg.io,
msg.peer,
msg.http2,
));
//Arbiter::handle().spawn(HttpChannel::new(
// Rc::clone(self.h.as_ref().unwrap()),
// msg.io,
// msg.peer,
// msg.http2,
//));
}
}
@ -777,7 +778,7 @@ fn start_accept_thread(
&reg,
CMD,
mio::Ready::readable(),
mio::PollOpt::edge(),
mio::PollOpt::level(),
) {
panic!("Can not register Registration: {}", err);
}
@ -789,133 +790,115 @@ fn start_accept_thread(
let sleep = Duration::from_millis(100);
let mut next = 0;
loop {
if let Err(err) = poll.poll(&mut events, None) {
panic!("Poll error: {}", err);
}
#[cfg_attr(rustfmt, rustfmt_skip)]
for event in events.iter() {
match event.token() {
SRV => if let Some(ref server) = server {
loop {
match server.accept_std() {
Ok((sock, addr)) => {
let mut msg = Conn {
io: sock,
peer: Some(addr),
http2: false,
};
while !workers.is_empty() {
match workers[next].1.unbounded_send(msg) {
Ok(_) => (),
Err(err) => {
let _ = srv.unbounded_send(
ServerCommand::WorkerDied(
workers[next].0,
info.clone(),
),
);
msg = err.into_inner();
workers.swap_remove(next);
if workers.is_empty() {
error!("No workers");
thread::sleep(sleep);
break;
} else if workers.len() <= next {
next = 0;
}
continue;
}
}
next = (next + 1) % workers.len();
loop {
if let Err(err) = poll.poll(&mut events, None) {
panic!("Poll error: {}", err);
}
for event in events.iter() {
match event.token() {
SRV => if let Some(ref server) = server {
loop {
match server.accept_std() {
Ok((sock, addr)) => {
let mut msg = Conn {
io: sock,
peer: Some(addr),
http2: false,
};
while !workers.is_empty() {
match workers[next].1.unbounded_send(msg) {
Ok(_) => (),
Err(err) => {
let _ = srv.unbounded_send(
ServerCommand::WorkerDied(
workers[next].0, info.clone()));
msg = err.into_inner();
workers.swap_remove(next);
if workers.is_empty() {
error!("No workers");
thread::sleep(sleep);
break;
} else if workers.len() <= next {
next = 0;
}
}
Err(ref e)
if e.kind() == io::ErrorKind::WouldBlock =>
{
break
}
Err(ref e) if connection_error(e) => continue,
Err(e) => {
error!("Error accepting connection: {}", e);
// sleep after error
thread::sleep(sleep);
break;
continue;
}
}
next = (next + 1) % workers.len();
break;
}
},
CMD => match rx.try_recv() {
Ok(cmd) => match cmd {
Command::Pause => if let Some(server) = server.take() {
if let Err(err) = poll.deregister(&server) {
error!(
"Can not deregister server socket {}",
err
);
} else {
info!(
"Paused accepting connections on {}",
addr
);
}
},
Command::Resume => {
let lst = create_tcp_listener(addr, backlog)
.expect("Can not create net::TcpListener");
server = Some(
mio::net::TcpListener::from_std(lst).expect(
"Can not create mio::net::TcpListener",
),
);
if let Some(ref server) = server {
if let Err(err) = poll.register(
server,
SRV,
mio::Ready::readable(),
mio::PollOpt::edge(),
) {
error!("Can not resume socket accept process: {}", err);
} else {
info!("Accepting connections on {} has been resumed",
addr);
}
}
}
Command::Stop => {
if let Some(server) = server.take() {
let _ = poll.deregister(&server);
}
return;
}
Command::Worker(idx, addr) => {
workers.push((idx, addr));
}
},
Err(err) => match err {
sync_mpsc::TryRecvError::Empty => (),
sync_mpsc::TryRecvError::Disconnected => {
if let Some(server) = server.take() {
let _ = poll.deregister(&server);
}
return;
}
},
},
_ => unreachable!(),
}
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { break }
Err(ref e) if connection_error(e) => continue,
Err(e) => {
error!("Error accepting connection: {}", e);
// sleep after error
thread::sleep(sleep);
break;
}
}
}
}
},
CMD => match rx.try_recv() {
Ok(cmd) => match cmd {
Command::Pause => if let Some(server) = server.take() {
if let Err(err) = poll.deregister(&server) {
error!("Can not deregister server socket {}", err);
} else {
info!("Paused accepting connections on {}", addr);
}
},
Command::Resume => {
let lst = create_tcp_listener(addr, backlog)
.expect("Can not create net::TcpListener");
server = Some(mio::net::TcpListener::from_std(lst).expect(
"Can not create mio::net::TcpListener"));
if let Some(ref server) = server {
if let Err(err) = poll.register(
server, SRV, mio::Ready::readable(), mio::PollOpt::edge())
{
error!("Can not resume socket accept process: {}", err);
} else {
info!("Accepting connections on {} has been resumed", addr);
}
}
}
Command::Stop => {
if let Some(server) = server.take() {
let _ = poll.deregister(&server);
}
return;
}
Command::Worker(idx, addr) => {
workers.push((idx, addr));
}
},
Err(err) => match err {
sync_mpsc::TryRecvError::Empty => (),
sync_mpsc::TryRecvError::Disconnected => {
if let Some(server) = server.take() {
let _ = poll.deregister(&server);
}
return;
}
},
},
_ => unreachable!(),
}
}
}
});
(readiness, tx)
}
fn create_tcp_listener(
addr: net::SocketAddr, backlog: i32
addr: net::SocketAddr, backlog: i32,
) -> io::Result<net::TcpListener> {
let builder = match addr {
net::SocketAddr::V4(_) => TcpBuilder::new_v4()?,

View File

@ -1,8 +1,10 @@
use futures::Future;
use futures::unsync::oneshot;
use net2::TcpStreamExt;
use std::rc::Rc;
use std::{net, time};
use std::{marker, net, time};
use futures::task::current as current_task;
use futures::unsync::oneshot;
use futures::{Async, Future, Poll};
use net2::TcpStreamExt;
use tokio_core::net::TcpStream;
use tokio_core::reactor::Handle;
@ -22,10 +24,14 @@ use tokio_openssl::SslAcceptorExt;
use actix::msgs::StopArbiter;
use actix::*;
use server::channel::HttpChannel;
//use server::channel::HttpChannel;
use server::h1;
use server::h1decoder::Message as H1Message;
use server::settings::WorkerSettings;
use server::{HttpHandler, KeepAlive};
use io::{IoChannel, IoCommand, TaskCommand};
#[derive(Message)]
pub(crate) struct Conn<T> {
pub io: T,
@ -55,21 +61,26 @@ where
hnd: Handle,
handler: StreamHandlerType,
tcp_ka: Option<time::Duration>,
io: IoWriter,
}
impl<H: HttpHandler + 'static> Worker<H> {
pub(crate) fn new(
h: Vec<H>, handler: StreamHandlerType, keep_alive: KeepAlive
h: Vec<H>, handler: StreamHandlerType, keep_alive: KeepAlive,
) -> Worker<H> {
let tcp_ka = if let KeepAlive::Tcp(val) = keep_alive {
Some(time::Duration::new(val as u64, 0))
} else {
None
};
let io = IoWriter {
inner: Rc::new(IoChannel::new()),
};
Worker {
settings: Rc::new(WorkerSettings::new(h, keep_alive)),
hnd: Arbiter::handle().clone(),
io,
handler,
tcp_ka,
}
@ -83,7 +94,7 @@ impl<H: HttpHandler + 'static> Worker<H> {
}
fn shutdown_timeout(
&self, ctx: &mut Context<Self>, tx: oneshot::Sender<bool>, dur: time::Duration
&self, ctx: &mut Context<Self>, tx: oneshot::Sender<bool>, dur: time::Duration,
) {
// sleep for 1 second and then check again
ctx.run_later(time::Duration::new(1, 0), move |slf, ctx| {
@ -111,6 +122,8 @@ where
fn started(&mut self, ctx: &mut Self::Context) {
self.update_time(ctx);
self.io.inner.set_notify(current_task());
ctx.spawn(IoQueue(marker::PhantomData));
}
}
@ -124,8 +137,10 @@ where
if self.tcp_ka.is_some() && msg.io.set_keepalive(self.tcp_ka).is_err() {
error!("Can not set socket keep-alive option");
}
self.handler
.handle(Rc::clone(&self.settings), &self.hnd, msg);
//println!("HANDLE: {:?}", msg.io);
self.io.inner.add_source(msg.io, msg.peer, msg.http2);
//self.handler
//.handle(Rc::clone(&self.settings), &self.hnd, msg);
}
}
@ -163,9 +178,10 @@ pub(crate) enum StreamHandlerType {
Alpn(SslAcceptor),
}
/*
impl StreamHandlerType {
fn handle<H: HttpHandler>(
&mut self, h: Rc<WorkerSettings<H>>, hnd: &Handle, msg: Conn<net::TcpStream>
&mut self, h: Rc<WorkerSettings<H>>, hnd: &Handle, msg: Conn<net::TcpStream>,
) {
match *self {
StreamHandlerType::Normal => {
@ -185,12 +201,8 @@ impl StreamHandlerType {
hnd.spawn(
TlsAcceptorExt::accept_async(acceptor, io).then(move |res| {
match res {
Ok(io) => Arbiter::handle().spawn(HttpChannel::new(
h,
io,
peer,
http2,
)),
Ok(io) => Arbiter::handle()
.spawn(HttpChannel::new(h, io, peer, http2)),
Err(err) => {
trace!("Error during handling tls connection: {}", err)
}
@ -217,12 +229,8 @@ impl StreamHandlerType {
} else {
false
};
Arbiter::handle().spawn(HttpChannel::new(
h,
io,
peer,
http2,
));
Arbiter::handle()
.spawn(HttpChannel::new(h, io, peer, http2));
}
Err(err) => {
trace!("Error during handling tls connection: {}", err)
@ -234,4 +242,43 @@ impl StreamHandlerType {
}
}
}
}*/
struct IoQueue<H>(marker::PhantomData<H>);
impl<H: HttpHandler + 'static> fut::ActorFuture for IoQueue<H> {
type Item = ();
type Error = ();
type Actor = Worker<H>;
fn poll(&mut self, act: &mut Worker<H>, _: &mut Context<Worker<H>>) -> Poll<(), ()> {
act.io.inner.as_ref().start();
loop {
match act.io.inner.try_recv() {
Ok(TaskCommand::Stream(stream)) => {
act.hnd.spawn(h1::Http1::new(
Rc::clone(&act.settings),
stream,
act.io.clone(),
));
}
Err(_) => break,
}
}
act.io.inner.as_ref().end();
Ok(Async::NotReady)
}
}
#[derive(Clone)]
pub(crate) struct IoWriter {
pub inner: Rc<IoChannel>,
}
impl IoWriter {
pub fn send(&self, msg: IoCommand) {
self.inner.send(msg)
}
}

View File

@ -9,8 +9,8 @@ use std::io::Read;
use bytes::Bytes;
use flate2::read::GzDecoder;
use futures::Future;
use futures::stream::once;
use futures::Future;
use rand::Rng;
use actix_web::*;

View File

@ -14,9 +14,9 @@ extern crate brotli2;
#[cfg(feature = "brotli")]
use brotli2::write::{BrotliDecoder, BrotliEncoder};
use bytes::{Bytes, BytesMut};
use flate2::Compression;
use flate2::read::GzDecoder;
use flate2::write::{DeflateDecoder, DeflateEncoder, GzEncoder};
use flate2::Compression;
use futures::stream::once;
use futures::{future, Future, Stream};
use h2::client as h2client;
@ -62,11 +62,9 @@ fn test_start() {
thread::spawn(move || {
let sys = System::new("test");
let srv = server::new(|| {
vec![
App::new().resource("/", |r| {
r.method(http::Method::GET).f(|_| HttpResponse::Ok())
}),
]
vec![App::new().resource("/", |r| {
r.method(http::Method::GET).f(|_| HttpResponse::Ok())
})]
});
let srv = srv.bind("127.0.0.1:0").unwrap();
@ -113,11 +111,9 @@ fn test_shutdown() {
thread::spawn(move || {
let sys = System::new("test");
let srv = server::new(|| {
vec![
App::new().resource("/", |r| {
r.method(http::Method::GET).f(|_| HttpResponse::Ok())
}),
]
vec![App::new().resource("/", |r| {
r.method(http::Method::GET).f(|_| HttpResponse::Ok())
})]
});
let srv = srv.bind("127.0.0.1:0").unwrap();
@ -844,7 +840,7 @@ impl<S> middleware::Middleware<S> for MiddlewareTest {
}
fn response(
&self, _: &mut HttpRequest<S>, resp: HttpResponse
&self, _: &mut HttpRequest<S>, resp: HttpResponse,
) -> Result<middleware::Response> {
self.response.store(
self.response.load(Ordering::Relaxed) + 1,

View File

@ -14,8 +14,8 @@ extern crate url;
use futures::Future;
use rand::{thread_rng, Rng};
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;
use actix::prelude::*;