diff --git a/Cargo.toml b/Cargo.toml index 78c0d723b..f982eb302 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "actix-web" -version = "0.5.5" +version = "0.6.0-dev" authors = ["Nikolay Kim "] description = "Actix web is a simple, pragmatic and extremely fast web framework for Rust." readme = "README.md" @@ -79,8 +79,11 @@ bytes = "0.4" byteorder = "1" futures = "0.1" futures-cpupool = "0.1" -tokio-io = "0.1" -tokio-core = "0.1" +tokio-io = "=0.1.5" +tokio-core = "=0.1.12" + +slab = "0.4" +iovec = "0.1" # native-tls native-tls = { version="0.1", optional = true } @@ -102,6 +105,9 @@ lto = true opt-level = 3 codegen-units = 1 +[replace] +"mio:0.6.14" = {path="../mio"} + [workspace] members = [ "./", diff --git a/examples/test.rs b/examples/test.rs new file mode 100644 index 000000000..8dd72873f --- /dev/null +++ b/examples/test.rs @@ -0,0 +1,29 @@ +extern crate actix; +extern crate actix_web; +extern crate env_logger; + +use actix_web::{middleware, server, App, HttpRequest}; + +fn index(_req: HttpRequest) -> &'static str { + "Hello world!" +} + +fn main() { + //::std::env::set_var("RUST_LOG", "actix_web=info"); + //env_logger::init(); + let sys = actix::System::new("hello-world"); + + server::new(|| { + App::new() + // enable logger + //.middleware(middleware::Logger::default()) + .resource("/index.html", |r| r.f(|_| "Hello world!")) + .resource("/", |r| r.f(index)) + }).bind("127.0.0.1:8080") + .unwrap() + .threads(1) + .start(); + + println!("Started http server: 127.0.0.1:8080"); + let _ = sys.run(); +} diff --git a/rustfmt.toml b/rustfmt.toml index 98d2ba7db..465636641 100644 --- a/rustfmt.toml +++ b/rustfmt.toml @@ -1,7 +1,5 @@ max_width = 89 reorder_imports = true -reorder_imports_in_group = true -reorder_imported_names = true wrap_comments = true fn_args_density = "Compressed" -#use_small_heuristics = false +use_small_heuristics = false diff --git a/src/client/parser.rs b/src/client/parser.rs index 0d4da4c4d..d9e248850 100644 --- a/src/client/parser.rs +++ b/src/client/parser.rs @@ -7,18 +7,19 @@ use std::mem; use error::{ParseError, PayloadError}; -use server::h1::{chunked, Decoder}; +//use server::h1decoder::{chunked, EncodingDecoder}; +use server::h1decoder::EncodingDecoder; use server::{utils, IoStream}; -use super::ClientResponse; use super::response::ClientMessage; +use super::ClientResponse; const MAX_BUFFER_SIZE: usize = 131_072; const MAX_HEADERS: usize = 96; #[derive(Default)] pub struct HttpResponseParser { - decoder: Option, + decoder: Option, } #[derive(Debug, Fail)] @@ -32,7 +33,7 @@ pub enum HttpResponseParserError { impl HttpResponseParser { pub fn parse( - &mut self, io: &mut T, buf: &mut BytesMut + &mut self, io: &mut T, buf: &mut BytesMut, ) -> Poll where T: IoStream, @@ -75,7 +76,7 @@ impl HttpResponseParser { } pub fn parse_payload( - &mut self, io: &mut T, buf: &mut BytesMut + &mut self, io: &mut T, buf: &mut BytesMut, ) -> Poll, PayloadError> where T: IoStream, @@ -113,8 +114,8 @@ impl HttpResponseParser { } fn parse_message( - buf: &mut BytesMut - ) -> Poll<(ClientResponse, Option), ParseError> { + buf: &mut BytesMut, + ) -> Poll<(ClientResponse, Option), ParseError> { // Parse http message let bytes_ptr = buf.as_ref().as_ptr() as usize; let mut headers: [httparse::Header; MAX_HEADERS] = @@ -160,12 +161,12 @@ impl HttpResponseParser { } let decoder = if status == StatusCode::SWITCHING_PROTOCOLS { - Some(Decoder::eof()) + Some(EncodingDecoder::eof()) } else if let Some(len) = hdrs.get(header::CONTENT_LENGTH) { // Content-Length if let Ok(s) = len.to_str() { if let Ok(len) = s.parse::() { - Some(Decoder::length(len)) + Some(EncodingDecoder::length(len)) } else { debug!("illegal Content-Length: {:?}", len); return Err(ParseError::Header); @@ -174,9 +175,9 @@ impl HttpResponseParser { debug!("illegal Content-Length: {:?}", len); return Err(ParseError::Header); } - } else if chunked(&hdrs)? { - // Chunked encoding - Some(Decoder::chunked()) + //} else if chunked(&hdrs)? { + // Chunked encoding + // Some(EncodingDecoder::chunked()) } else { None }; diff --git a/src/context.rs b/src/context.rs index b095c29bc..1c7c4eb72 100644 --- a/src/context.rs +++ b/src/context.rs @@ -1,9 +1,10 @@ +use std::marker::PhantomData; +use std::mem; + use futures::sync::oneshot::Sender; use futures::unsync::oneshot; use futures::{Async, Future, Poll}; use smallvec::SmallVec; -use std::marker::PhantomData; -use std::mem; use actix::dev::{ContextImpl, SyncEnvelope, ToEnvelope}; use actix::fut::ActorFuture; @@ -261,7 +262,7 @@ impl ActorFuture for Drain { #[inline] fn poll( - &mut self, _: &mut A, _: &mut ::Context + &mut self, _: &mut A, _: &mut ::Context, ) -> Poll { self.fut.poll().map_err(|_| ()) } diff --git a/src/httprequest.rs b/src/httprequest.rs index ee2bd5a79..b69ddae26 100644 --- a/src/httprequest.rs +++ b/src/httprequest.rs @@ -314,7 +314,7 @@ impl HttpRequest { /// } /// ``` pub fn url_for( - &self, name: &str, elements: U + &self, name: &str, elements: U, ) -> Result where U: IntoIterator, @@ -592,6 +592,29 @@ impl fmt::Debug for HttpRequest { } } +impl fmt::Debug for HttpInnerMessage { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + let res = writeln!( + f, + "\nHttpInnerMessage {:?} {}:{}", + self.version, + self.method, + self.url.path() + ); + if let Some(query) = self.url.uri().query().as_ref() { + let _ = writeln!(f, " query: ?{:?}", query); + } + if !self.params.is_empty() { + let _ = writeln!(f, " params: {:?}", self.params); + } + let _ = writeln!(f, " headers:"); + for (key, val) in self.headers.iter() { + let _ = writeln!(f, " {:?}: {:?}", key, val); + } + res + } +} + #[cfg(test)] mod tests { #![allow(deprecated)] @@ -681,12 +704,10 @@ mod tests { let mut resource = ResourceHandler::<()>::default(); resource.name("index"); - let routes = vec![ - ( - Resource::new("index", "/user/{name}.{ext}"), - Some(resource), - ), - ]; + let routes = vec![( + Resource::new("index", "/user/{name}.{ext}"), + Some(resource), + )]; let (router, _) = Router::new("/", ServerSettings::default(), routes); assert!(router.has_route("/user/test.html")); assert!(!router.has_route("/test/unknown")); @@ -715,12 +736,10 @@ mod tests { let mut resource = ResourceHandler::<()>::default(); resource.name("index"); - let routes = vec![ - ( - Resource::new("index", "/user/{name}.{ext}"), - Some(resource), - ), - ]; + let routes = vec![( + Resource::new("index", "/user/{name}.{ext}"), + Some(resource), + )]; let (router, _) = Router::new("/prefix/", ServerSettings::default(), routes); assert!(router.has_route("/user/test.html")); assert!(!router.has_route("/prefix/user/test.html")); @@ -739,12 +758,10 @@ mod tests { let mut resource = ResourceHandler::<()>::default(); resource.name("index"); - let routes = vec![ - ( - Resource::external("youtube", "https://youtube.com/watch/{video_id}"), - None, - ), - ]; + let routes = vec![( + Resource::external("youtube", "https://youtube.com/watch/{video_id}"), + None, + )]; let (router, _) = Router::new::<()>("", ServerSettings::default(), routes); assert!(!router.has_route("https://youtube.com/watch/unknown")); diff --git a/src/io/channel.rs b/src/io/channel.rs new file mode 100644 index 000000000..51778d9f2 --- /dev/null +++ b/src/io/channel.rs @@ -0,0 +1,156 @@ +#![allow(dead_code, unused_imports)] +use std::cell::UnsafeCell; +use std::net::{SocketAddr, TcpStream}; +use std::rc::Rc; +use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; +use std::sync::{mpsc, Arc}; +use std::{io, thread}; + +use bytes::Bytes; +use futures::task::{current as current_task, Task}; +use mio; + +use super::IoStream; +use io::{Core, IoToken}; +use server::h1decoder::{Decoder, DecoderError, Message}; + +pub(crate) enum TaskCommand { + Stream(IoStream), +} + +pub(crate) enum IoCommand { + AddSource(TcpStream, Option), + Bytes(IoToken, Bytes), + Pause(IoToken), + Drain(IoToken), + Resume(IoToken), + Done { token: IoToken, graceful: bool }, +} + +struct Shared { + io: AtomicBool, + io_tx: mpsc::Sender, + io_rx: mpsc::Receiver, + io_reg: mio::Registration, + io_notify: mio::SetReadiness, + + task: AtomicBool, + task_tx: mpsc::Sender, + task_rx: mpsc::Receiver, + task_notify: UnsafeCell, +} + +pub(crate) struct IoChannel { + shared: Arc, +} + +impl IoChannel { + pub fn new() -> Self { + let (reg, notify) = mio::Registration::new2(); + let (tx1, rx1) = mpsc::channel(); + let (tx2, rx2) = mpsc::channel(); + + let shared = Arc::new(Shared { + io: AtomicBool::new(true), + io_tx: tx1, + io_rx: rx1, + io_reg: reg, + io_notify: notify, + + task: AtomicBool::new(true), + task_tx: tx2, + task_rx: rx2, + task_notify: UnsafeCell::new(current_task()), + }); + + let ch = TaskChannel { + shared: Arc::clone(&shared), + }; + thread::spawn(move || { + let core = Core::new(ch).unwrap(); + core.run() + }); + + IoChannel { shared } + } + + pub fn notify(&self) { + if !self.shared.io.load(Ordering::Relaxed) { + let _ = self.shared + .io_notify + .set_readiness(mio::Ready::readable()); + } + } + + pub fn set_notify(&self, task: Task) { + unsafe { *self.shared.task_notify.get() = task }; + self.shared.task.store(false, Ordering::Relaxed); + } + + pub fn add_source(&self, io: TcpStream, peer: Option, _http2: bool) { + self.send(IoCommand::AddSource(io, peer)); + self.notify(); + } + + #[inline] + pub fn start(&self) { + self.shared.task.store(true, Ordering::Relaxed) + } + + #[inline] + pub fn end(&self) { + self.shared.task.store(false, Ordering::Relaxed) + } + + #[inline] + pub fn send(&self, msg: IoCommand) { + let _ = self.shared.io_tx.send(msg); + self.notify(); + } + + #[inline] + pub fn try_recv(&self) -> Result { + self.shared.task_rx.try_recv() + } +} + +pub(crate) struct TaskChannel { + shared: Arc, +} + +unsafe impl Send for TaskChannel {} + +impl TaskChannel { + #[inline] + pub fn notify(&self) { + if !self.shared.task.load(Ordering::Relaxed) { + let task = unsafe { &mut *self.shared.task_notify.get() }; + task.notify(); + } + } + + #[inline] + pub fn send(&self, msg: TaskCommand) { + let _ = self.shared.task_tx.send(msg); + } + + #[inline] + pub fn registration(&self) -> &mio::Registration { + &self.shared.io_reg + } + + #[inline] + pub fn start(&self) { + self.shared.io.store(true, Ordering::Relaxed) + } + + #[inline] + pub fn end(&self) { + self.shared.io.store(false, Ordering::Relaxed) + } + + #[inline] + pub fn try_recv(&self) -> Result { + self.shared.io_rx.try_recv() + } +} diff --git a/src/io/mod.rs b/src/io/mod.rs new file mode 100644 index 000000000..befaece30 --- /dev/null +++ b/src/io/mod.rs @@ -0,0 +1,383 @@ +use std::cell::UnsafeCell; +use std::collections::VecDeque; +use std::io::{self, Read, Write}; +use std::rc::Rc; +use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering, ATOMIC_BOOL_INIT}; +use std::sync::{mpsc, Arc}; +use std::{mem, net}; + +use bytes::{BufMut, Bytes, BytesMut}; +use futures::task::Task; +use futures::{Async, Poll}; +use mio; +use mio::event::Evented; +use mio::net::TcpStream; +use slab::Slab; + +use server::h1decoder::{Decoder, DecoderError, Message}; +use server::helpers::SharedMessagePool; + +mod channel; +pub(crate) use self::channel::{IoChannel, IoCommand, TaskChannel, TaskCommand}; + +const TOKEN_NOTIFY: usize = 0; +const TOKEN_START: usize = 2; +const LW_BUFFER_SIZE: usize = 4096; +const HW_BUFFER_SIZE: usize = 32_768; + +pub(crate) struct Core { + mio: mio::Poll, + events: mio::Events, + state: Arc, + io: Slab, + channel: TaskChannel, + pool: Arc, +} + +impl Core { + pub fn new(channel: TaskChannel) -> io::Result { + let mio = mio::Poll::new()?; + let notify = mio::Token(TOKEN_NOTIFY); + + // notify stream + mio.register( + channel.registration(), + notify, + mio::Ready::readable(), + mio::PollOpt::edge(), + )?; + + Ok(Core { + mio, + channel, + io: Slab::new(), + events: mio::Events::with_capacity(1024), + state: Arc::new(AtomicBool::new(true)), + pool: Arc::new(SharedMessagePool::new()), + }) + } + + pub fn run(mut self) { + loop { + self.dispatch_commands(); + self.poll_io(); + } + } + + fn poll_io(&mut self) { + //println!("POLL IO"); + // Block waiting for an event to happen + let _amt = match self.mio.poll(&mut self.events, None) { + Ok(a) => a, + Err(ref e) if e.kind() == io::ErrorKind::Interrupted => return, + Err(e) => panic!("Poll error: {}", e), + }; + + let mut modified = false; + for event in self.events.iter() { + let token = usize::from(event.token()); + //println!("event: {:?}", event); + + if token != TOKEN_NOTIFY { + let mut remove = false; + if let Some(io) = self.io.get_mut(token - TOKEN_START) { + match io.poll(event.readiness(), &self.channel) { + IoResult::Notify => modified = true, + IoResult::Remove => remove = true, + IoResult::StopReading => {} + IoResult::StopWriting => {} + IoResult::NotReady => (), + } + } + if remove { + if self.io.contains(token - TOKEN_START) { + let _ = self.io.remove(token - TOKEN_START); + } + } + } + } + + if modified { + self.channel.notify(); + } + } + + fn dispatch_commands(&mut self) { + self.channel.start(); + loop { + match self.channel.try_recv() { + Ok(IoCommand::AddSource(source, peer)) => { + match TcpStream::from_stream(source) { + Ok(stream) => match self.add_source(stream, peer) { + Ok(token) => (), + Err(_) => (), + }, + Err(e) => { + error!("Can not register io object: {}", e); + } + } + } + Ok(IoCommand::Bytes(token, bytes)) => { + if let Some(io) = self.io.get_mut(token.token) { + io.write(bytes); + } + } + Ok(IoCommand::Drain(token)) => {} + Ok(IoCommand::Pause(token)) => {} + Ok(IoCommand::Resume(token)) => { + if let Some(io) = self.io.get_mut(token.token) { + // io.as_mut().ready = true; + } + } + Ok(IoCommand::Done { + token, + graceful, + }) => { + if self.io.contains(token.token) { + let _ = self.io.remove(token.token); + } + } + Err(_) => break, + } + } + self.channel.end(); + } + + fn add_source( + &mut self, io: TcpStream, peer: Option, + ) -> io::Result { + debug!("adding a new I/O source"); + if self.io.len() == self.io.capacity() { + let amt = self.io.len(); + self.io.reserve_exact(amt); + } + let entry = self.io.vacant_entry(); + let token = entry.key(); + + self.mio.register( + &io, + mio::Token(TOKEN_START + token), + mio::Ready::readable() | mio::Ready::writable(), + mio::PollOpt::edge(), + )?; + + let token = IoToken { + token, + }; + let decoder = Decoder::new(Arc::clone(&self.pool)); + let io = Io { + buf: BytesMut::with_capacity(HW_BUFFER_SIZE), + inner: Arc::new(UnsafeCell::new(Inner { + io, + token, + decoder, + peer, + lock: ATOMIC_BOOL_INIT, + task: None, + ready: false, + started: false, + buf: BytesMut::with_capacity(HW_BUFFER_SIZE), + messages: VecDeque::new(), + })), + }; + + entry.insert(io); + Ok(token) + } +} + +#[derive(Clone, Copy, Debug)] +pub struct IoToken { + token: usize, +} + +#[derive(Debug)] +enum IoResult { + NotReady, + Notify, + Remove, + StopReading, + StopWriting, +} + +struct Io { + inner: Arc>, + buf: BytesMut, +} + +impl Io { + #[inline] + fn as_mut(&mut self) -> &mut Inner { + unsafe { &mut *self.inner.as_ref().get() } + } + + fn write(&mut self, data: Bytes) { + //self.buf.extend_from_slice(&data); + + let inner: &mut Inner = unsafe { &mut *self.inner.as_ref().get() }; + + //while !self.buf.is_empty() { + match inner.io.write(&data) { + Ok(0) => { + // self.disconnected(); + // return Err(io::Error::new(io::ErrorKind::WriteZero, "")); + return + } + Ok(n) => { + //self.buf.split_to(n); + } + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + return + } + Err(err) => return // Err(err), + } + //} + } + + fn poll(&mut self, ready: mio::Ready, channel: &TaskChannel) -> IoResult { + let inner: &mut Inner = unsafe { mem::transmute(self.as_mut()) }; + let mut updated = IoResult::NotReady; + let (read, eof) = match inner.read_from_io() { + Ok(Async::Ready((n, eof))) => (n, eof), + Ok(Async::NotReady) => return IoResult::NotReady, + Err(e) => { + if inner.started { + info!("error during io: {:?}", e); + inner.send(Err(e.into())); + return IoResult::NotReady; + } else { + info!("error during io before message: {:?}", e); + // first message is not ready, so we can drop connection + return IoResult::Remove; + } + } + }; + loop { + let msg = match inner.decoder.decode(&mut inner.buf) { + Ok(Async::NotReady) => { + if eof { + if inner.started { + inner.send(Ok(Message::Hup)); + } else { + return IoResult::Remove; + } + } + break; + } + Ok(Async::Ready(msg)) => Ok(msg), + Err(e) => Err(e), + }; + + if inner.started { + inner.send(msg); + } else { + if msg.is_ok() { + inner.started = true; + inner.messages.push_back(msg); + let inner = self.inner.clone(); + let _ = channel.send(TaskCommand::Stream(IoStream { + inner, + })); + } else { + // first message is not ready, so we can drop connection + return IoResult::Remove; + } + } + } + //println!("READY {:?} {:?}", ready, updated); + updated + } +} + +pub(crate) struct IoStream { + inner: Arc>, +} + +impl IoStream { + pub fn token(&self) -> IoToken { + self.as_mut().token + } + + pub fn peer(&self) -> Option { + self.as_mut().peer + } + + pub fn set_notify(&self, task: Task) { + let inner = self.as_mut(); + while inner.lock.compare_and_swap(false, true, Ordering::Acquire) != false {} + inner.task = Some(task); + inner.lock.store(false, Ordering::Release); + } + + #[inline] + fn as_mut(&self) -> &mut Inner { + unsafe { &mut *self.inner.as_ref().get() } + } + + pub fn try_recv(&self) -> Option> { + let inner = self.as_mut(); + while inner.lock.compare_and_swap(false, true, Ordering::Acquire) != false {} + let result = inner.messages.pop_front(); + + inner.lock.store(false, Ordering::Release); + result + } +} + +struct Inner { + lock: AtomicBool, + token: IoToken, + io: TcpStream, + decoder: Decoder, + buf: BytesMut, + task: Option, + peer: Option, + ready: bool, + started: bool, + messages: VecDeque>, +} + +impl Inner { + fn send(&mut self, msg: Result) { + while self.lock.compare_and_swap(false, true, Ordering::Acquire) != false {} + self.messages.push_back(msg); + + if let Some(ref task) = self.task.as_ref() { + task.notify() + } + + self.lock.store(false, Ordering::Release); + } + + fn read_from_io(&mut self) -> Poll<(usize, bool), io::Error> { + let mut read = 0; + loop { + unsafe { + if self.buf.remaining_mut() < LW_BUFFER_SIZE { + self.buf.reserve(HW_BUFFER_SIZE); + } + match self.io.read(self.buf.bytes_mut()) { + Ok(n) => { + read += n; + if n == 0 { + return Ok(Async::Ready((read, true))); + } else { + self.buf.advance_mut(n); + } + } + Err(e) => { + return if e.kind() == io::ErrorKind::WouldBlock { + if read != 0 { + Ok(Async::Ready((read, false))) + } else { + Ok(Async::NotReady) + } + } else { + Err(e) + }; + } + } + } + } + } +} diff --git a/src/lib.rs b/src/lib.rs index 1a0ac8ade..e64d92690 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -64,8 +64,11 @@ #![cfg_attr(actix_nightly, feature( specialization, // for impl ErrorResponse for std::error::Error ))] -#![cfg_attr(feature = "cargo-clippy", - allow(decimal_literal_representation, suspicious_arithmetic_impl))] +#![cfg_attr( + feature = "cargo-clippy", + allow(decimal_literal_representation, suspicious_arithmetic_impl) +)] +#![allow(dead_code, unused_mut, unused_variables, unused_imports)] #[macro_use] extern crate log; @@ -127,6 +130,9 @@ extern crate openssl; #[cfg(feature = "openssl")] extern crate tokio_openssl; +extern crate iovec; +extern crate slab; + mod application; mod body; mod context; @@ -139,6 +145,7 @@ mod httpmessage; mod httprequest; mod httpresponse; mod info; +mod io; mod json; mod param; mod payload; @@ -215,6 +222,7 @@ pub mod http { //! Various HTTP related types // re-exports + pub use modhttp::header::{HeaderName, HeaderValue}; pub use modhttp::{Method, StatusCode, Version}; #[doc(hidden)] diff --git a/src/server/channel.rs b/src/server/channel.rs index 7a4bc64bf..ab5146752 100644 --- a/src/server/channel.rs +++ b/src/server/channel.rs @@ -1,3 +1,8 @@ +#![allow( + dead_code, unused_imports, unused_imports, unreachable_code, unreachable_code, + unused_variables +)] + use std::net::{Shutdown, SocketAddr}; use std::rc::Rc; use std::{io, mem, ptr, time}; @@ -7,10 +12,12 @@ use futures::{Async, Future, Poll}; use tokio_io::{AsyncRead, AsyncWrite}; use super::settings::WorkerSettings; -use super::{utils, HttpHandler, IoStream, h1, h2}; +//use super::{h1, h2, utils, HttpHandler, IoStream}; +use super::{h1, utils, HttpHandler, IoStream}; const HTTP2_PREFACE: [u8; 14] = *b"PRI * HTTP/2.0"; +/* enum HttpProtocol { H1(h1::Http1), H2(h2::Http2), @@ -163,10 +170,7 @@ where match kind { ProtocolKind::Http1 => { self.proto = Some(HttpProtocol::H1(h1::Http1::new( - settings, - io, - addr, - buf, + settings, io, addr, buf, ))); return self.poll(); } @@ -183,7 +187,7 @@ where } unreachable!() } -} +}*/ pub(crate) struct Node { next: Option<*mut Node<()>>, @@ -250,9 +254,9 @@ impl Node<()> { next = n.next.as_ref(); if !n.element.is_null() { - let ch: &mut HttpChannel = - mem::transmute(&mut *(n.element as *mut _)); - ch.shutdown(); + //let ch: &mut HttpChannel = + // mem::transmute(&mut *(n.element as *mut _)); + //ch.shutdown(); } } } else { diff --git a/src/server/h1.rs b/src/server/h1.rs index ec0b1938a..eb0906c95 100644 --- a/src/server/h1.rs +++ b/src/server/h1.rs @@ -1,13 +1,17 @@ -#![cfg_attr(feature = "cargo-clippy", allow(redundant_field_names))] +#![allow( + dead_code, unused_imports, unused_imports, unreachable_code, unreachable_code, + unused_variables +)] use std::collections::VecDeque; use std::net::SocketAddr; use std::rc::Rc; +use std::sync::Arc; use std::time::Duration; -use std::{self, io}; use actix::Arbiter; use bytes::{Bytes, BytesMut}; +use futures::task::current; use futures::{Async, Future, Poll}; use http::header::{self, HeaderName, HeaderValue}; use http::{HeaderMap, HttpTryFrom, Method, Uri, Version}; @@ -22,10 +26,13 @@ use pipeline::Pipeline; use uri::Url; use super::encoding::PayloadType; +use super::h1decoder::{DecoderError, Message}; use super::h1writer::H1Writer; use super::settings::WorkerSettings; -use super::{HttpHandler, HttpHandlerTask, IoStream}; +use super::worker::IoWriter; use super::{utils, Writer}; +use super::{HttpHandler, HttpHandlerTask}; +use io::{IoCommand, IoStream}; const MAX_BUFFER_SIZE: usize = 131_072; const MAX_HEADERS: usize = 96; @@ -37,6 +44,7 @@ bitflags! { const ERROR = 0b0000_0010; const KEEPALIVE = 0b0000_0100; const SHUTDOWN = 0b0000_1000; + const DISCONNECTED = 0b0001_0000; } } @@ -48,14 +56,15 @@ bitflags! { } } -pub(crate) struct Http1 { +pub(crate) struct Http1 { flags: Flags, settings: Rc>, addr: Option, - stream: H1Writer, - reader: Reader, - read_buf: BytesMut, + stream: IoStream, + writer: H1Writer, + payload: Option, tasks: VecDeque, + others: VecDeque, keepalive_timer: Option, } @@ -64,25 +73,27 @@ struct Entry { flags: EntryFlags, } -impl Http1 +impl Http1 where - T: IoStream, H: HttpHandler + 'static, { pub fn new( - settings: Rc>, stream: T, addr: Option, - read_buf: BytesMut, + settings: Rc>, stream: IoStream, writer: IoWriter, ) -> Self { + let addr = stream.peer(); + let token = stream.token(); let bytes = settings.get_shared_bytes(); + let writer = H1Writer::new(token, writer, bytes, Rc::clone(&settings)); Http1 { - flags: Flags::KEEPALIVE, - stream: H1Writer::new(stream, bytes, Rc::clone(&settings)), - reader: Reader::new(), - tasks: VecDeque::new(), - keepalive_timer: None, addr, - read_buf, + stream, settings, + writer, + flags: Flags::KEEPALIVE, + tasks: VecDeque::new(), + others: VecDeque::new(), + payload: None, + keepalive_timer: None, } } @@ -90,127 +101,96 @@ where self.settings.as_ref() } - pub(crate) fn io(&mut self) -> &mut T { - self.stream.get_mut() - } - - pub fn poll(&mut self) -> Poll<(), ()> { - // keep-alive timer - if let Some(ref mut timer) = self.keepalive_timer { - match timer.poll() { - Ok(Async::Ready(_)) => { - trace!("Keep-alive timeout, close connection"); - self.flags.insert(Flags::SHUTDOWN); - } - Ok(Async::NotReady) => (), - Err(_) => unreachable!(), - } - } - - // shutdown - if self.flags.contains(Flags::SHUTDOWN) { - match self.stream.poll_completed(true) { - Ok(Async::NotReady) => return Ok(Async::NotReady), - Ok(Async::Ready(_)) => return Ok(Async::Ready(())), - Err(err) => { - debug!("Error sending data: {}", err); - return Err(()); - } - } - } - - loop { - match self.poll_io()? { - Async::Ready(true) => (), - Async::Ready(false) => { - self.flags.insert(Flags::SHUTDOWN); - return self.poll(); - } - Async::NotReady => return Ok(Async::NotReady), - } - } - } - - // TODO: refactor - pub fn poll_io(&mut self) -> Poll { - // read incoming data - let need_read = if !self.flags.intersects(Flags::ERROR) - && self.tasks.len() < MAX_PIPELINED_MESSAGES - { - 'outer: loop { - match self.reader.parse( - self.stream.get_mut(), - &mut self.read_buf, - &self.settings, - ) { - Ok(Async::Ready(mut req)) => { - self.flags.insert(Flags::STARTED); - - // set remote addr - req.set_peer_addr(self.addr); - - // stop keepalive timer - self.keepalive_timer.take(); - - // start request processing - for h in self.settings.handlers().iter_mut() { - req = match h.handle(req) { - Ok(pipe) => { - self.tasks.push_back(Entry { - pipe, - flags: EntryFlags::empty(), - }); - continue 'outer; - } - Err(req) => req, - } - } - - self.tasks.push_back(Entry { - pipe: Pipeline::error(HttpResponse::NotFound()), - flags: EntryFlags::empty(), - }); - continue; - } - Ok(Async::NotReady) => (), - Err(err) => { - trace!("Parse error: {:?}", err); - - // notify all tasks - self.stream.disconnected(); - for entry in &mut self.tasks { - entry.pipe.disconnected() - } - - // kill keepalive - self.flags.remove(Flags::KEEPALIVE); - self.keepalive_timer.take(); - - // on parse error, stop reading stream but tasks need to be - // completed - self.flags.insert(Flags::ERROR); - - match err { - ReaderError::Disconnect => (), - _ => if self.tasks.is_empty() { - if let ReaderError::Error(err) = err { - self.tasks.push_back(Entry { - pipe: Pipeline::error(err.error_response()), - flags: EntryFlags::empty(), - }); - } - }, - } - } - } - break; - } - false + #[inline] + fn need_read(&self) -> PayloadStatus { + if let Some(ref info) = self.payload { + info.need_read() } else { - true - }; + PayloadStatus::Read + } + } - let retry = self.reader.need_read() == PayloadStatus::Read; + fn poll_stream(&mut self) { + //println!("STREAM"); + + 'outter: loop { + match self.stream.try_recv() { + Some(Ok(Message::Message { + msg, + payload, + })) => { + if payload { + let (ps, pl) = Payload::new(false); + msg.get_mut().payload = Some(pl); + self.payload = + Some(PayloadType::new(&msg.get_ref().headers, ps)); + } + + let mut req = HttpRequest::from_message(msg); + //println!("{:?}", req); + + // search handler for request + for h in self.settings.handlers().iter_mut() { + req = match h.handle(req) { + Ok(pipe) => { + self.tasks.push_back(Entry { + pipe, + flags: EntryFlags::empty(), + }); + continue 'outter; + } + Err(req) => req, + } + } + + // handler is not found + self.tasks.push_back(Entry { + pipe: Pipeline::error(HttpResponse::NotFound()), + flags: EntryFlags::empty(), + }); + } + Some(Ok(Message::Chunk(chunk))) => { + if let Some(ref mut payload) = self.payload { + payload.feed_data(chunk); + } else { + panic!(""); + } + } + Some(Ok(Message::Eof)) => { + if let Some(ref mut payload) = self.payload { + payload.feed_eof(); + } else { + panic!(""); + } + } + Some(Ok(Message::Hup)) => { + self.writer.done(false); + self.flags.insert(Flags::DISCONNECTED); + if let Some(ref mut payload) = self.payload { + payload.set_error(PayloadError::Incomplete); + } + break; + } + Some(Err(e)) => { + self.writer.done(false); + self.flags.insert(Flags::ERROR); + if let Some(ref mut payload) = self.payload { + let e = match e { + DecoderError::Io(e) => PayloadError::Io(e), + DecoderError::Error(e) => PayloadError::EncodingCorrupted, + }; + payload.set_error(e); + } + } + None => break, + } + } + } + + fn poll_io(&mut self) -> Poll { + //println!("IO"); + + let retry = self.need_read() == PayloadStatus::Read; // check in-flight messages let mut io = false; @@ -221,35 +201,32 @@ where if !io && !item.flags.contains(EntryFlags::EOF) { // io is corrupted, send buffer if item.flags.contains(EntryFlags::ERROR) { - if let Ok(Async::NotReady) = self.stream.poll_completed(true) { - return Ok(Async::NotReady); - } + self.writer.done(true); return Err(()); } - match item.pipe.poll_io(&mut self.stream) { + match item.pipe.poll_io(&mut self.writer) { Ok(Async::Ready(ready)) => { // override keep-alive state - if self.stream.keepalive() { - self.flags.insert(Flags::KEEPALIVE); - } else { - self.flags.remove(Flags::KEEPALIVE); - } + // if self.stream.keepalive() { + // self.flags.insert(Flags::KEEPALIVE); + // } else { + // self.flags.remove(Flags::KEEPALIVE); + // } // prepare stream for next response - self.stream.reset(); + // self.stream.reset(); if ready { - item.flags - .insert(EntryFlags::EOF | EntryFlags::FINISHED); + item.flags.insert(EntryFlags::EOF | EntryFlags::FINISHED); } else { item.flags.insert(EntryFlags::FINISHED); } } // no more IO for this iteration Ok(Async::NotReady) => { - if self.reader.need_read() == PayloadStatus::Read && !retry { - return Ok(Async::Ready(true)); - } + //if self.need_read() == PayloadStatus::Read && !retry { + //self.writer.resume(); + //} io = true; } Err(err) => { @@ -259,9 +236,8 @@ where item.flags.insert(EntryFlags::ERROR); // check stream state, we still can have valid data in buffer - if let Ok(Async::NotReady) = self.stream.poll_completed(true) { - return Ok(Async::NotReady); - } + self.writer.done(true); + return Err(()); } } @@ -271,7 +247,7 @@ where Ok(Async::Ready(_)) => item.flags.insert(EntryFlags::FINISHED), Err(err) => { item.flags.insert(EntryFlags::ERROR); - error!("Unhandled error: {}", err); + error!("Unhandled handler error: {}", err); } } } @@ -281,23 +257,17 @@ where // cleanup finished tasks let mut popped = false; while !self.tasks.is_empty() { - if self.tasks[0] - .flags - .contains(EntryFlags::EOF | EntryFlags::FINISHED) - { + if self.tasks[0].flags.contains(EntryFlags::EOF | EntryFlags::FINISHED) { popped = true; self.tasks.pop_front(); } else { break; } } - if need_read && popped { - return self.poll_io(); - } // check stream state if self.flags.contains(Flags::STARTED) { - match self.stream.poll_completed(false) { + match self.writer.poll_completed(false) { Ok(Async::NotReady) => return Ok(Async::NotReady), Err(err) => { debug!("Error sending data: {}", err); @@ -307,634 +277,37 @@ where } } - // deal with keep-alive - if self.tasks.is_empty() { - // no keep-alive situations - if self.flags.contains(Flags::ERROR) - || (!self.flags.contains(Flags::KEEPALIVE) - || !self.settings.keep_alive_enabled()) - && self.flags.contains(Flags::STARTED) - { - return Ok(Async::Ready(false)); - } - - // start keep-alive timer - let keep_alive = self.settings.keep_alive(); - if self.keepalive_timer.is_none() && keep_alive > 0 { - trace!("Start keep-alive timer"); - let mut timer = - Timeout::new(Duration::new(keep_alive, 0), Arbiter::handle()) - .unwrap(); - // register timer - let _ = timer.poll(); - self.keepalive_timer = Some(timer); - } - } Ok(Async::NotReady) } } -struct Reader { - payload: Option, -} +impl Future for Http1 +where + H: HttpHandler + 'static, +{ + type Item = (); + type Error = (); -enum Decoding { - Ready, - NotReady, -} + fn poll(&mut self) -> Poll<(), ()> { + //println!("HTTP1 POLL"); -struct PayloadInfo { - tx: PayloadType, - decoder: Decoder, -} - -#[derive(Debug)] -enum ReaderError { - Disconnect, - Payload, - PayloadDropped, - Error(ParseError), -} - -impl Reader { - pub fn new() -> Reader { - Reader { payload: None } - } - - #[inline] - fn need_read(&self) -> PayloadStatus { - if let Some(ref info) = self.payload { - info.tx.need_read() - } else { - PayloadStatus::Read + // started + if !self.flags.contains(Flags::STARTED) { + self.flags.insert(Flags::STARTED); + self.stream.set_notify(current()); } - } - - #[inline] - fn decode( - &mut self, buf: &mut BytesMut, payload: &mut PayloadInfo - ) -> Result { - while !buf.is_empty() { - match payload.decoder.decode(buf) { - Ok(Async::Ready(Some(bytes))) => { - payload.tx.feed_data(bytes); - if payload.decoder.is_eof() { - payload.tx.feed_eof(); - return Ok(Decoding::Ready); - } - } - Ok(Async::Ready(None)) => { - payload.tx.feed_eof(); - return Ok(Decoding::Ready); - } - Ok(Async::NotReady) => return Ok(Decoding::NotReady), - Err(err) => { - payload.tx.set_error(err.into()); - return Err(ReaderError::Payload); - } - } - } - Ok(Decoding::NotReady) - } - - pub fn parse( - &mut self, io: &mut T, buf: &mut BytesMut, settings: &WorkerSettings - ) -> Poll - where - T: IoStream, - { - match self.need_read() { - PayloadStatus::Read => (), - PayloadStatus::Pause => return Ok(Async::NotReady), - PayloadStatus::Dropped => return Err(ReaderError::PayloadDropped), - } - - // read payload - let done = { - if let Some(ref mut payload) = self.payload { - 'buf: loop { - let not_ready = match utils::read_from_io(io, buf) { - Ok(Async::Ready(0)) => { - payload.tx.set_error(PayloadError::Incomplete); - - // http channel should not deal with payload errors - return Err(ReaderError::Payload); - } - Ok(Async::NotReady) => true, - Err(err) => { - payload.tx.set_error(err.into()); - - // http channel should not deal with payload errors - return Err(ReaderError::Payload); - } - _ => false, - }; - loop { - match payload.decoder.decode(buf) { - Ok(Async::Ready(Some(bytes))) => { - payload.tx.feed_data(bytes); - if payload.decoder.is_eof() { - payload.tx.feed_eof(); - break 'buf true; - } - } - Ok(Async::Ready(None)) => { - payload.tx.feed_eof(); - break 'buf true; - } - Ok(Async::NotReady) => { - // if buffer is full then - // socket still can contain more data - if not_ready { - return Ok(Async::NotReady); - } - continue 'buf; - } - Err(err) => { - payload.tx.set_error(err.into()); - return Err(ReaderError::Payload); - } - } - } - } - } else { - false - } - }; - if done { - self.payload = None - } - - // if buf is empty parse_message will always return NotReady, let's avoid that - if buf.is_empty() { - match utils::read_from_io(io, buf) { - Ok(Async::Ready(0)) => return Err(ReaderError::Disconnect), - Ok(Async::Ready(_)) => (), - Ok(Async::NotReady) => return Ok(Async::NotReady), - Err(err) => return Err(ReaderError::Error(err.into())), - } - }; loop { - match Reader::parse_message(buf, settings).map_err(ReaderError::Error)? { - Async::Ready((msg, decoder)) => { - // process payload - if let Some(mut payload) = decoder { - match self.decode(buf, &mut payload)? { - Decoding::Ready => (), - Decoding::NotReady => self.payload = Some(payload), - } - } - return Ok(Async::Ready(msg)); - } - Async::NotReady => { - if buf.len() >= MAX_BUFFER_SIZE { - error!("MAX_BUFFER_SIZE unprocessed data reached, closing"); - return Err(ReaderError::Error(ParseError::TooLarge)); - } - match utils::read_from_io(io, buf) { - Ok(Async::Ready(0)) => { - debug!("Ignored premature client disconnection"); - return Err(ReaderError::Disconnect); - } - Ok(Async::Ready(_)) => (), - Ok(Async::NotReady) => return Ok(Async::NotReady), - Err(err) => return Err(ReaderError::Error(err.into())), - } - } + // process input stream + if !self.flags.contains(Flags::DISCONNECTED) { + self.poll_stream(); } - } - } - fn parse_message( - buf: &mut BytesMut, settings: &WorkerSettings - ) -> Poll<(HttpRequest, Option), ParseError> { - // Parse http message - let mut has_upgrade = false; - let mut chunked = false; - let mut content_length = None; - - let msg = { - let bytes_ptr = buf.as_ref().as_ptr() as usize; - let mut headers: [httparse::Header; MAX_HEADERS] = - unsafe { std::mem::uninitialized() }; - - let (len, method, path, version, headers_len) = { - let b = unsafe { - let b: &[u8] = buf; - std::mem::transmute(b) - }; - let mut req = httparse::Request::new(&mut headers); - match req.parse(b)? { - httparse::Status::Complete(len) => { - let method = Method::from_bytes(req.method.unwrap().as_bytes()) - .map_err(|_| ParseError::Method)?; - let path = Url::new(Uri::try_from(req.path.unwrap())?); - let version = if req.version.unwrap() == 1 { - Version::HTTP_11 - } else { - Version::HTTP_10 - }; - (len, method, path, version, req.headers.len()) - } - httparse::Status::Partial => return Ok(Async::NotReady), - } - }; - - let slice = buf.split_to(len).freeze(); - - // convert headers - let msg = settings.get_http_message(); - { - let msg_mut = msg.get_mut(); - msg_mut.keep_alive = version != Version::HTTP_10; - - for header in headers[..headers_len].iter() { - if let Ok(name) = HeaderName::from_bytes(header.name.as_bytes()) { - has_upgrade = has_upgrade || name == header::UPGRADE; - let v_start = header.value.as_ptr() as usize - bytes_ptr; - let v_end = v_start + header.value.len(); - let value = unsafe { - HeaderValue::from_shared_unchecked( - slice.slice(v_start, v_end), - ) - }; - match name { - header::CONTENT_LENGTH => { - if let Ok(s) = value.to_str() { - if let Ok(len) = s.parse::() { - content_length = Some(len) - } else { - debug!("illegal Content-Length: {:?}", len); - return Err(ParseError::Header); - } - } else { - debug!("illegal Content-Length: {:?}", len); - return Err(ParseError::Header); - } - }, - // transfer-encoding - header::TRANSFER_ENCODING => { - if let Ok(s) = value.to_str() { - chunked = s.to_lowercase().contains("chunked"); - } else { - return Err(ParseError::Header) - } - }, - // connection keep-alive state - header::CONNECTION => { - msg_mut.keep_alive = if let Ok(conn) = value.to_str() { - if version == Version::HTTP_10 - && conn.contains("keep-alive") - { - true - } else { - version == Version::HTTP_11 - && !(conn.contains("close") - || conn.contains("upgrade")) - } - } else { - false - }; - }, - _ => (), - } - - msg_mut.headers.append(name, value); - } else { - return Err(ParseError::Header); - } - } - - msg_mut.url = path; - msg_mut.method = method; - msg_mut.version = version; + match self.poll_io()? { + Async::Ready(true) => (), + Async::Ready(false) => return Ok(Async::Ready(())), + Async::NotReady => return Ok(Async::NotReady), } - msg - }; - - // https://tools.ietf.org/html/rfc7230#section-3.3.3 - let decoder = if chunked { - // Chunked encoding - Some(Decoder::chunked()) - } else if let Some(len) = content_length { - // Content-Length - Some(Decoder::length(len)) - } else if has_upgrade || msg.get_ref().method == Method::CONNECT { - // upgrade(websocket) or connect - Some(Decoder::eof()) - } else { - None - }; - - if let Some(decoder) = decoder { - let (psender, payload) = Payload::new(false); - let info = PayloadInfo { - tx: PayloadType::new(&msg.get_ref().headers, psender), - decoder, - }; - msg.get_mut().payload = Some(payload); - Ok(Async::Ready(( - HttpRequest::from_message(msg), - Some(info), - ))) - } else { - Ok(Async::Ready((HttpRequest::from_message(msg), None))) - } - } -} - -/// Check if request has chunked transfer encoding -pub fn chunked(headers: &HeaderMap) -> Result { - if let Some(encodings) = headers.get(header::TRANSFER_ENCODING) { - if let Ok(s) = encodings.to_str() { - Ok(s.to_lowercase().contains("chunked")) - } else { - Err(ParseError::Header) - } - } else { - Ok(false) - } -} - -/// Decoders to handle different Transfer-Encodings. -/// -/// If a message body does not include a Transfer-Encoding, it *should* -/// include a Content-Length header. -#[derive(Debug, Clone, PartialEq)] -pub struct Decoder { - kind: Kind, -} - -impl Decoder { - pub fn length(x: u64) -> Decoder { - Decoder { - kind: Kind::Length(x), - } - } - - pub fn chunked() -> Decoder { - Decoder { - kind: Kind::Chunked(ChunkedState::Size, 0), - } - } - - pub fn eof() -> Decoder { - Decoder { - kind: Kind::Eof(false), - } - } -} - -#[derive(Debug, Clone, PartialEq)] -enum Kind { - /// A Reader used when a Content-Length header is passed with a positive - /// integer. - Length(u64), - /// A Reader used when Transfer-Encoding is `chunked`. - Chunked(ChunkedState, u64), - /// A Reader used for responses that don't indicate a length or chunked. - /// - /// Note: This should only used for `Response`s. It is illegal for a - /// `Request` to be made with both `Content-Length` and - /// `Transfer-Encoding: chunked` missing, as explained from the spec: - /// - /// > If a Transfer-Encoding header field is present in a response and - /// > the chunked transfer coding is not the final encoding, the - /// > message body length is determined by reading the connection until - /// > it is closed by the server. If a Transfer-Encoding header field - /// > is present in a request and the chunked transfer coding is not - /// > the final encoding, the message body length cannot be determined - /// > reliably; the server MUST respond with the 400 (Bad Request) - /// > status code and then close the connection. - Eof(bool), -} - -#[derive(Debug, PartialEq, Clone)] -enum ChunkedState { - Size, - SizeLws, - Extension, - SizeLf, - Body, - BodyCr, - BodyLf, - EndCr, - EndLf, - End, -} - -impl Decoder { - pub fn is_eof(&self) -> bool { - match self.kind { - Kind::Length(0) | Kind::Chunked(ChunkedState::End, _) | Kind::Eof(true) => { - true - } - _ => false, - } - } - - pub fn decode(&mut self, body: &mut BytesMut) -> Poll, io::Error> { - match self.kind { - Kind::Length(ref mut remaining) => { - if *remaining == 0 { - Ok(Async::Ready(None)) - } else { - if body.is_empty() { - return Ok(Async::NotReady); - } - let len = body.len() as u64; - let buf; - if *remaining > len { - buf = body.take().freeze(); - *remaining -= len; - } else { - buf = body.split_to(*remaining as usize).freeze(); - *remaining = 0; - } - trace!("Length read: {}", buf.len()); - Ok(Async::Ready(Some(buf))) - } - } - Kind::Chunked(ref mut state, ref mut size) => { - loop { - let mut buf = None; - // advances the chunked state - *state = try_ready!(state.step(body, size, &mut buf)); - if *state == ChunkedState::End { - trace!("End of chunked stream"); - return Ok(Async::Ready(None)); - } - if let Some(buf) = buf { - return Ok(Async::Ready(Some(buf))); - } - if body.is_empty() { - return Ok(Async::NotReady); - } - } - } - Kind::Eof(ref mut is_eof) => { - if *is_eof { - Ok(Async::Ready(None)) - } else if !body.is_empty() { - Ok(Async::Ready(Some(body.take().freeze()))) - } else { - Ok(Async::NotReady) - } - } - } - } -} - -macro_rules! byte ( - ($rdr:ident) => ({ - if $rdr.len() > 0 { - let b = $rdr[0]; - $rdr.split_to(1); - b - } else { - return Ok(Async::NotReady) - } - }) -); - -impl ChunkedState { - fn step( - &self, body: &mut BytesMut, size: &mut u64, buf: &mut Option - ) -> Poll { - use self::ChunkedState::*; - match *self { - Size => ChunkedState::read_size(body, size), - SizeLws => ChunkedState::read_size_lws(body), - Extension => ChunkedState::read_extension(body), - SizeLf => ChunkedState::read_size_lf(body, size), - Body => ChunkedState::read_body(body, size, buf), - BodyCr => ChunkedState::read_body_cr(body), - BodyLf => ChunkedState::read_body_lf(body), - EndCr => ChunkedState::read_end_cr(body), - EndLf => ChunkedState::read_end_lf(body), - End => Ok(Async::Ready(ChunkedState::End)), - } - } - fn read_size(rdr: &mut BytesMut, size: &mut u64) -> Poll { - let radix = 16; - match byte!(rdr) { - b @ b'0'...b'9' => { - *size *= radix; - *size += u64::from(b - b'0'); - } - b @ b'a'...b'f' => { - *size *= radix; - *size += u64::from(b + 10 - b'a'); - } - b @ b'A'...b'F' => { - *size *= radix; - *size += u64::from(b + 10 - b'A'); - } - b'\t' | b' ' => return Ok(Async::Ready(ChunkedState::SizeLws)), - b';' => return Ok(Async::Ready(ChunkedState::Extension)), - b'\r' => return Ok(Async::Ready(ChunkedState::SizeLf)), - _ => { - return Err(io::Error::new( - io::ErrorKind::InvalidInput, - "Invalid chunk size line: Invalid Size", - )); - } - } - Ok(Async::Ready(ChunkedState::Size)) - } - fn read_size_lws(rdr: &mut BytesMut) -> Poll { - trace!("read_size_lws"); - match byte!(rdr) { - // LWS can follow the chunk size, but no more digits can come - b'\t' | b' ' => Ok(Async::Ready(ChunkedState::SizeLws)), - b';' => Ok(Async::Ready(ChunkedState::Extension)), - b'\r' => Ok(Async::Ready(ChunkedState::SizeLf)), - _ => Err(io::Error::new( - io::ErrorKind::InvalidInput, - "Invalid chunk size linear white space", - )), - } - } - fn read_extension(rdr: &mut BytesMut) -> Poll { - match byte!(rdr) { - b'\r' => Ok(Async::Ready(ChunkedState::SizeLf)), - _ => Ok(Async::Ready(ChunkedState::Extension)), // no supported extensions - } - } - fn read_size_lf( - rdr: &mut BytesMut, size: &mut u64 - ) -> Poll { - match byte!(rdr) { - b'\n' if *size > 0 => Ok(Async::Ready(ChunkedState::Body)), - b'\n' if *size == 0 => Ok(Async::Ready(ChunkedState::EndCr)), - _ => Err(io::Error::new( - io::ErrorKind::InvalidInput, - "Invalid chunk size LF", - )), - } - } - - fn read_body( - rdr: &mut BytesMut, rem: &mut u64, buf: &mut Option - ) -> Poll { - trace!("Chunked read, remaining={:?}", rem); - - let len = rdr.len() as u64; - if len == 0 { - Ok(Async::Ready(ChunkedState::Body)) - } else { - let slice; - if *rem > len { - slice = rdr.take().freeze(); - *rem -= len; - } else { - slice = rdr.split_to(*rem as usize).freeze(); - *rem = 0; - } - *buf = Some(slice); - if *rem > 0 { - Ok(Async::Ready(ChunkedState::Body)) - } else { - Ok(Async::Ready(ChunkedState::BodyCr)) - } - } - } - - fn read_body_cr(rdr: &mut BytesMut) -> Poll { - match byte!(rdr) { - b'\r' => Ok(Async::Ready(ChunkedState::BodyLf)), - _ => Err(io::Error::new( - io::ErrorKind::InvalidInput, - "Invalid chunk body CR", - )), - } - } - fn read_body_lf(rdr: &mut BytesMut) -> Poll { - match byte!(rdr) { - b'\n' => Ok(Async::Ready(ChunkedState::Size)), - _ => Err(io::Error::new( - io::ErrorKind::InvalidInput, - "Invalid chunk body LF", - )), - } - } - fn read_end_cr(rdr: &mut BytesMut) -> Poll { - match byte!(rdr) { - b'\r' => Ok(Async::Ready(ChunkedState::EndLf)), - _ => Err(io::Error::new( - io::ErrorKind::InvalidInput, - "Invalid chunk end CR", - )), - } - } - fn read_end_lf(rdr: &mut BytesMut) -> Poll { - match byte!(rdr) { - b'\n' => Ok(Async::Ready(ChunkedState::End)), - _ => Err(io::Error::new( - io::ErrorKind::InvalidInput, - "Invalid chunk end LF", - )), } } } @@ -1204,10 +577,7 @@ mod tests { assert_eq!(req.version(), Version::HTTP_11); assert_eq!(*req.method(), Method::GET); assert_eq!(req.path(), "/test"); - assert_eq!( - req.headers().get("test").unwrap().as_bytes(), - b"value" - ); + assert_eq!(req.headers().get("test").unwrap().as_bytes(), b"value"); } Ok(_) | Err(_) => unreachable!("Error during parsing http request"), } @@ -1430,10 +800,7 @@ mod tests { let mut req = parse_ready!(&mut buf); assert!(!req.keep_alive()); assert!(req.upgrade()); - assert_eq!( - req.payload_mut().readall().unwrap().as_ref(), - b"some raw data" - ); + assert_eq!(req.payload_mut().readall().unwrap().as_ref(), b"some raw data"); } #[test] @@ -1491,10 +858,7 @@ mod tests { let _ = req.payload_mut().poll(); not_ready!(reader.parse(&mut buf, &mut readbuf, &settings)); assert!(!req.payload().eof()); - assert_eq!( - req.payload_mut().readall().unwrap().as_ref(), - b"dataline" - ); + assert_eq!(req.payload_mut().readall().unwrap().as_ref(), b"dataline"); assert!(req.payload().eof()); } @@ -1526,10 +890,7 @@ mod tests { assert!(req2.chunked().unwrap()); assert!(!req2.payload().eof()); - assert_eq!( - req.payload_mut().readall().unwrap().as_ref(), - b"dataline" - ); + assert_eq!(req.payload_mut().readall().unwrap().as_ref(), b"dataline"); assert!(req.payload().eof()); } @@ -1577,10 +938,7 @@ mod tests { let _ = req.payload_mut().poll(); not_ready!(reader.parse(&mut buf, &mut readbuf, &settings)); - assert_eq!( - req.payload_mut().readall().unwrap().as_ref(), - b"dataline" - ); + assert_eq!(req.payload_mut().readall().unwrap().as_ref(), b"dataline"); assert!(!req.payload().eof()); buf.feed_data("\r\n"); @@ -1608,10 +966,7 @@ mod tests { let _ = req.payload_mut().poll(); not_ready!(reader.parse(&mut buf, &mut readbuf, &settings)); assert!(!req.payload().eof()); - assert_eq!( - req.payload_mut().readall().unwrap().as_ref(), - b"dataline" - ); + assert_eq!(req.payload_mut().readall().unwrap().as_ref(), b"dataline"); assert!(req.payload().eof()); } diff --git a/src/server/h1decoder.rs b/src/server/h1decoder.rs new file mode 100644 index 000000000..48ff7c42a --- /dev/null +++ b/src/server/h1decoder.rs @@ -0,0 +1,501 @@ +#![allow(dead_code, unused_variables)] + +use std::rc::Rc; +use std::sync::Arc; +use std::{io, mem}; + +use bytes::{Bytes, BytesMut}; +use futures::{Async, Poll}; +use httparse; + +use super::helpers::{SharedHttpInnerMessage, SharedMessagePool}; +use error::ParseError; +use http::{header, HeaderName, HeaderValue, HttpTryFrom, Method, Uri, Version}; +use uri::Url; + +const MAX_BUFFER_SIZE: usize = 131_072; +const MAX_HEADERS: usize = 96; + +pub(crate) struct Decoder { + decoder: Option, + pool: Arc, +} + +#[derive(Debug)] +pub(crate) enum Message { + Message { + msg: SharedHttpInnerMessage, + payload: bool, + }, + Chunk(Bytes), + Eof, + Hup, +} + +unsafe impl Send for Message {} +unsafe impl Sync for Message {} + +#[derive(Debug)] +pub(crate) enum DecoderError { + Io(io::Error), + Error(ParseError), +} + +impl From for DecoderError { + fn from(err: io::Error) -> DecoderError { + DecoderError::Io(err) + } +} + +impl Decoder { + pub fn new(pool: Arc) -> Decoder { + Decoder { + pool, + decoder: None, + } + } + + pub fn decode(&mut self, src: &mut BytesMut) -> Poll { + // read payload + if self.decoder.is_some() { + match self.decoder.as_mut().unwrap().decode(src)? { + Async::Ready(Some(bytes)) => { + return Ok(Async::Ready(Message::Chunk(bytes))) + } + Async::Ready(None) => { + self.decoder.take(); + return Ok(Async::Ready(Message::Eof)); + } + Async::NotReady => return Ok(Async::NotReady), + } + } + + match self.parse_message(src).map_err(DecoderError::Error)? { + Async::Ready((msg, decoder)) => { + if let Some(decoder) = decoder { + self.decoder = Some(decoder); + return Ok(Async::Ready(Message::Message { + msg, + payload: true, + })); + } else { + return Ok(Async::Ready(Message::Message { + msg, + payload: false, + })); + } + } + Async::NotReady => { + if src.len() >= MAX_BUFFER_SIZE { + error!("MAX_BUFFER_SIZE unprocessed data reached, closing"); + return Err(DecoderError::Error(ParseError::TooLarge)); + } + return Ok(Async::NotReady); + } + } + } + + fn parse_message( + &self, buf: &mut BytesMut, + ) -> Poll<(SharedHttpInnerMessage, Option), ParseError> { + // Parse http message + let mut has_upgrade = false; + let mut chunked = false; + let mut content_length = None; + + let msg = { + let bytes_ptr = buf.as_ref().as_ptr() as usize; + let mut headers: [httparse::Header; MAX_HEADERS] = + unsafe { mem::uninitialized() }; + + let (len, method, path, version, headers_len) = { + let b = unsafe { + let b: &[u8] = buf; + mem::transmute(b) + }; + let mut req = httparse::Request::new(&mut headers); + match req.parse(b)? { + httparse::Status::Complete(len) => { + let method = Method::from_bytes(req.method.unwrap().as_bytes()) + .map_err(|_| ParseError::Method)?; + let path = Url::new(Uri::try_from(req.path.unwrap())?); + let version = if req.version.unwrap() == 1 { + Version::HTTP_11 + } else { + Version::HTTP_10 + }; + (len, method, path, version, req.headers.len()) + } + httparse::Status::Partial => return Ok(Async::NotReady), + } + }; + + let slice = buf.split_to(len).freeze(); + + // convert headers + let msg = + SharedHttpInnerMessage::new(self.pool.get(), Arc::clone(&self.pool)); + { + let msg_mut = msg.get_mut(); + let msg_mut = msg.get_mut(); + msg_mut.keep_alive = version != Version::HTTP_10; + + for header in headers[..headers_len].iter() { + if let Ok(name) = HeaderName::from_bytes(header.name.as_bytes()) { + has_upgrade = has_upgrade || name == header::UPGRADE; + + let v_start = header.value.as_ptr() as usize - bytes_ptr; + let v_end = v_start + header.value.len(); + let value = unsafe { + HeaderValue::from_shared_unchecked( + slice.slice(v_start, v_end), + ) + }; + match name { + header::CONTENT_LENGTH => { + if let Ok(s) = value.to_str() { + if let Ok(len) = s.parse::() { + content_length = Some(len) + } else { + debug!("illegal Content-Length: {:?}", len); + return Err(ParseError::Header); + } + } else { + debug!("illegal Content-Length: {:?}", len); + return Err(ParseError::Header); + } + } + // transfer-encoding + header::TRANSFER_ENCODING => { + if let Ok(s) = value.to_str() { + chunked = s.to_lowercase().contains("chunked"); + } else { + return Err(ParseError::Header); + } + } + // connection keep-alive state + header::CONNECTION => { + msg_mut.keep_alive = if let Ok(conn) = value.to_str() { + if version == Version::HTTP_10 + && conn.contains("keep-alive") + { + true + } else { + version == Version::HTTP_11 + && !(conn.contains("close") + || conn.contains("upgrade")) + } + } else { + false + }; + } + _ => (), + } + + msg_mut.headers.append(name, value); + } else { + return Err(ParseError::Header); + } + } + + msg_mut.url = path; + msg_mut.method = method; + msg_mut.version = version; + } + msg + }; + + // https://tools.ietf.org/html/rfc7230#section-3.3.3 + let decoder = if chunked { + // Chunked encoding + Some(EncodingDecoder::chunked()) + } else if let Some(len) = content_length { + // Content-Length + Some(EncodingDecoder::length(len)) + } else if has_upgrade || msg.get_ref().method == Method::CONNECT { + // upgrade(websocket) or connect + Some(EncodingDecoder::eof()) + } else { + None + }; + + Ok(Async::Ready((msg, decoder))) + } +} + +/// Decoders to handle different Transfer-Encodings. +/// +/// If a message body does not include a Transfer-Encoding, it *should* +/// include a Content-Length header. +#[derive(Debug, Clone, PartialEq)] +pub struct EncodingDecoder { + kind: Kind, +} + +impl EncodingDecoder { + pub fn length(x: u64) -> EncodingDecoder { + EncodingDecoder { + kind: Kind::Length(x), + } + } + + pub fn chunked() -> EncodingDecoder { + EncodingDecoder { + kind: Kind::Chunked(ChunkedState::Size, 0), + } + } + + pub fn eof() -> EncodingDecoder { + EncodingDecoder { + kind: Kind::Eof(false), + } + } +} + +#[derive(Debug, Clone, PartialEq)] +enum Kind { + /// A Reader used when a Content-Length header is passed with a positive + /// integer. + Length(u64), + /// A Reader used when Transfer-Encoding is `chunked`. + Chunked(ChunkedState, u64), + /// A Reader used for responses that don't indicate a length or chunked. + /// + /// Note: This should only used for `Response`s. It is illegal for a + /// `Request` to be made with both `Content-Length` and + /// `Transfer-Encoding: chunked` missing, as explained from the spec: + /// + /// > If a Transfer-Encoding header field is present in a response and + /// > the chunked transfer coding is not the final encoding, the + /// > message body length is determined by reading the connection until + /// > it is closed by the server. If a Transfer-Encoding header field + /// > is present in a request and the chunked transfer coding is not + /// > the final encoding, the message body length cannot be determined + /// > reliably; the server MUST respond with the 400 (Bad Request) + /// > status code and then close the connection. + Eof(bool), +} + +#[derive(Debug, PartialEq, Clone)] +enum ChunkedState { + Size, + SizeLws, + Extension, + SizeLf, + Body, + BodyCr, + BodyLf, + EndCr, + EndLf, + End, +} + +impl EncodingDecoder { + pub fn is_eof(&self) -> bool { + match self.kind { + Kind::Length(0) | Kind::Chunked(ChunkedState::End, _) | Kind::Eof(true) => { + true + } + _ => false, + } + } + + pub fn decode(&mut self, body: &mut BytesMut) -> Poll, io::Error> { + match self.kind { + Kind::Length(ref mut remaining) => { + if *remaining == 0 { + Ok(Async::Ready(None)) + } else { + if body.is_empty() { + return Ok(Async::NotReady); + } + let len = body.len() as u64; + let buf; + if *remaining > len { + buf = body.take().freeze(); + *remaining -= len; + } else { + buf = body.split_to(*remaining as usize).freeze(); + *remaining = 0; + } + trace!("Length read: {}", buf.len()); + Ok(Async::Ready(Some(buf))) + } + } + Kind::Chunked(ref mut state, ref mut size) => { + loop { + let mut buf = None; + // advances the chunked state + *state = try_ready!(state.step(body, size, &mut buf)); + if *state == ChunkedState::End { + trace!("End of chunked stream"); + return Ok(Async::Ready(None)); + } + if let Some(buf) = buf { + return Ok(Async::Ready(Some(buf))); + } + if body.is_empty() { + return Ok(Async::NotReady); + } + } + } + Kind::Eof(ref mut is_eof) => { + if *is_eof { + Ok(Async::Ready(None)) + } else if !body.is_empty() { + Ok(Async::Ready(Some(body.take().freeze()))) + } else { + Ok(Async::NotReady) + } + } + } + } +} + +macro_rules! byte ( + ($rdr:ident) => ({ + if $rdr.len() > 0 { + let b = $rdr[0]; + $rdr.split_to(1); + b + } else { + return Ok(Async::NotReady) + } + }) +); + +impl ChunkedState { + fn step( + &self, body: &mut BytesMut, size: &mut u64, buf: &mut Option, + ) -> Poll { + use self::ChunkedState::*; + match *self { + Size => ChunkedState::read_size(body, size), + SizeLws => ChunkedState::read_size_lws(body), + Extension => ChunkedState::read_extension(body), + SizeLf => ChunkedState::read_size_lf(body, size), + Body => ChunkedState::read_body(body, size, buf), + BodyCr => ChunkedState::read_body_cr(body), + BodyLf => ChunkedState::read_body_lf(body), + EndCr => ChunkedState::read_end_cr(body), + EndLf => ChunkedState::read_end_lf(body), + End => Ok(Async::Ready(ChunkedState::End)), + } + } + fn read_size(rdr: &mut BytesMut, size: &mut u64) -> Poll { + let radix = 16; + match byte!(rdr) { + b @ b'0'...b'9' => { + *size *= radix; + *size += u64::from(b - b'0'); + } + b @ b'a'...b'f' => { + *size *= radix; + *size += u64::from(b + 10 - b'a'); + } + b @ b'A'...b'F' => { + *size *= radix; + *size += u64::from(b + 10 - b'A'); + } + b'\t' | b' ' => return Ok(Async::Ready(ChunkedState::SizeLws)), + b';' => return Ok(Async::Ready(ChunkedState::Extension)), + b'\r' => return Ok(Async::Ready(ChunkedState::SizeLf)), + _ => { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + "Invalid chunk size line: Invalid Size", + )); + } + } + Ok(Async::Ready(ChunkedState::Size)) + } + fn read_size_lws(rdr: &mut BytesMut) -> Poll { + trace!("read_size_lws"); + match byte!(rdr) { + // LWS can follow the chunk size, but no more digits can come + b'\t' | b' ' => Ok(Async::Ready(ChunkedState::SizeLws)), + b';' => Ok(Async::Ready(ChunkedState::Extension)), + b'\r' => Ok(Async::Ready(ChunkedState::SizeLf)), + _ => Err(io::Error::new( + io::ErrorKind::InvalidInput, + "Invalid chunk size linear white space", + )), + } + } + fn read_extension(rdr: &mut BytesMut) -> Poll { + match byte!(rdr) { + b'\r' => Ok(Async::Ready(ChunkedState::SizeLf)), + _ => Ok(Async::Ready(ChunkedState::Extension)), // no supported extensions + } + } + fn read_size_lf( + rdr: &mut BytesMut, size: &mut u64, + ) -> Poll { + match byte!(rdr) { + b'\n' if *size > 0 => Ok(Async::Ready(ChunkedState::Body)), + b'\n' if *size == 0 => Ok(Async::Ready(ChunkedState::EndCr)), + _ => { + Err(io::Error::new(io::ErrorKind::InvalidInput, "Invalid chunk size LF")) + } + } + } + + fn read_body( + rdr: &mut BytesMut, rem: &mut u64, buf: &mut Option, + ) -> Poll { + trace!("Chunked read, remaining={:?}", rem); + + let len = rdr.len() as u64; + if len == 0 { + Ok(Async::Ready(ChunkedState::Body)) + } else { + let slice; + if *rem > len { + slice = rdr.take().freeze(); + *rem -= len; + } else { + slice = rdr.split_to(*rem as usize).freeze(); + *rem = 0; + } + *buf = Some(slice); + if *rem > 0 { + Ok(Async::Ready(ChunkedState::Body)) + } else { + Ok(Async::Ready(ChunkedState::BodyCr)) + } + } + } + + fn read_body_cr(rdr: &mut BytesMut) -> Poll { + match byte!(rdr) { + b'\r' => Ok(Async::Ready(ChunkedState::BodyLf)), + _ => { + Err(io::Error::new(io::ErrorKind::InvalidInput, "Invalid chunk body CR")) + } + } + } + fn read_body_lf(rdr: &mut BytesMut) -> Poll { + match byte!(rdr) { + b'\n' => Ok(Async::Ready(ChunkedState::Size)), + _ => { + Err(io::Error::new(io::ErrorKind::InvalidInput, "Invalid chunk body LF")) + } + } + } + fn read_end_cr(rdr: &mut BytesMut) -> Poll { + match byte!(rdr) { + b'\r' => Ok(Async::Ready(ChunkedState::EndLf)), + _ => { + Err(io::Error::new(io::ErrorKind::InvalidInput, "Invalid chunk end CR")) + } + } + } + fn read_end_lf(rdr: &mut BytesMut) -> Poll { + match byte!(rdr) { + b'\n' => Ok(Async::Ready(ChunkedState::End)), + _ => { + Err(io::Error::new(io::ErrorKind::InvalidInput, "Invalid chunk end LF")) + } + } + } +} diff --git a/src/server/h1writer.rs b/src/server/h1writer.rs index 3d94d44cf..598c842a6 100644 --- a/src/server/h1writer.rs +++ b/src/server/h1writer.rs @@ -1,4 +1,5 @@ #![cfg_attr(feature = "cargo-clippy", allow(redundant_field_names))] +#![allow(dead_code, unused_mut, unused_variables)] use bytes::BufMut; use futures::{Async, Poll}; @@ -13,10 +14,12 @@ use super::shared::SharedBytes; use super::{Writer, WriterState, MAX_WRITE_BUFFER_SIZE}; use body::{Binary, Body}; use header::ContentEncoding; +use http::header::{HeaderValue, CONNECTION, CONTENT_LENGTH, DATE}; +use http::{Method, Version}; use httprequest::HttpInnerMessage; use httpresponse::HttpResponse; -use http::{Method, Version}; -use http::header::{HeaderValue, CONNECTION, CONTENT_LENGTH, DATE}; +use io::{IoCommand, IoToken}; +use server::worker::IoWriter; const AVERAGE_HEADER_SIZE: usize = 30; // totally scientific @@ -29,9 +32,10 @@ bitflags! { } } -pub(crate) struct H1Writer { +pub(crate) struct H1Writer { flags: Flags, - stream: T, + token: IoToken, + stream: IoWriter, encoder: ContentEncoder, written: u64, headers_size: u32, @@ -40,23 +44,25 @@ pub(crate) struct H1Writer { settings: Rc>, } -impl H1Writer { +impl H1Writer { pub fn new( - stream: T, buf: SharedBytes, settings: Rc> - ) -> H1Writer { + token: IoToken, stream: IoWriter, buf: SharedBytes, + settings: Rc>, + ) -> H1Writer { H1Writer { + token, + stream, + settings, flags: Flags::empty(), encoder: ContentEncoder::empty(buf.clone()), written: 0, headers_size: 0, buffer: buf, buffer_capacity: 0, - stream, - settings, } } - pub fn get_mut(&mut self) -> &mut T { + pub fn get_mut(&mut self) -> &mut IoWriter { &mut self.stream } @@ -73,28 +79,19 @@ impl H1Writer { self.flags.contains(Flags::KEEPALIVE) && !self.flags.contains(Flags::UPGRADE) } - fn write_data(&mut self, data: &[u8]) -> io::Result { - let mut written = 0; - while written < data.len() { - match self.stream.write(&data[written..]) { - Ok(0) => { - self.disconnected(); - return Err(io::Error::new(io::ErrorKind::WriteZero, "")); - } - Ok(n) => { - written += n; - } - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - return Ok(written) - } - Err(err) => return Err(err), - } - } - Ok(written) + pub fn done(&self, graceful: bool) { + self.stream.send(IoCommand::Done { + graceful, + token: self.token, + }); + } + + pub fn resume(&self) { + self.stream.send(IoCommand::Resume(self.token)); } } -impl Writer for H1Writer { +impl Writer for H1Writer { #[inline] fn written(&self) -> u64 { self.written @@ -227,16 +224,7 @@ impl Writer for H1Writer { if self.flags.contains(Flags::STARTED) { // shortcut for upgraded connection if self.flags.contains(Flags::UPGRADE) { - if self.buffer.is_empty() { - let pl: &[u8] = payload.as_ref(); - let n = self.write_data(pl)?; - if n < pl.len() { - self.buffer.extend_from_slice(&pl[n..]); - return Ok(WriterState::Done); - } - } else { - self.buffer.extend(payload); - } + self.buffer.extend(payload); } else { // TODO: add warning, write after EOF self.encoder.write(payload)?; @@ -272,6 +260,12 @@ impl Writer for H1Writer { #[inline] fn poll_completed(&mut self, shutdown: bool) -> Poll<(), io::Error> { if !self.buffer.is_empty() { + self.stream.send(IoCommand::Bytes( + self.token, + self.buffer.take().freeze(), + )); + } + /* let buf: &[u8] = unsafe { mem::transmute(self.buffer.as_ref()) }; let written = self.write_data(buf)?; let _ = self.buffer.split_to(written); @@ -281,8 +275,7 @@ impl Writer for H1Writer { } if shutdown { self.stream.shutdown() - } else { - Ok(Async::Ready(())) - } + } else {*/ + Ok(Async::Ready(())) } } diff --git a/src/server/helpers.rs b/src/server/helpers.rs index bb8730ec6..0bef60f00 100644 --- a/src/server/helpers.rs +++ b/src/server/helpers.rs @@ -1,24 +1,55 @@ use bytes::{BufMut, BytesMut}; -use http::Version; -use std::cell::RefCell; -use std::collections::VecDeque; use std::rc::Rc; -use std::{mem, ptr, slice}; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Arc; +use std::{fmt, mem, ptr, slice}; +use http::Version; use httprequest::HttpInnerMessage; +const SIZE: usize = 128; + /// Internal use only! unsafe -pub(crate) struct SharedMessagePool(RefCell>>); +pub(crate) struct SharedMessagePool { + head: AtomicUsize, + tail: AtomicUsize, + data: Option<[Rc; 128]>, +} impl SharedMessagePool { pub fn new() -> SharedMessagePool { - SharedMessagePool(RefCell::new(VecDeque::with_capacity(128))) + SharedMessagePool { + head: AtomicUsize::new(0), + tail: AtomicUsize::new(0), + data: Some(unsafe { mem::uninitialized() }), + } + } + + #[inline] + pub fn len(&self) -> usize { + let tail = self.tail.load(Ordering::Relaxed); + let head = self.head.load(Ordering::Relaxed); + + if tail > head { + tail - head + } else if tail < head { + tail + SIZE - head + } else { + 0 + } + } + + fn as_mut_ptr(&self) -> *mut Rc { + &self.data as *const _ as *mut _ } #[inline] pub fn get(&self) -> Rc { - if let Some(msg) = self.0.borrow_mut().pop_front() { - msg + if self.len() > 0 { + let head = self.head.load(Ordering::Relaxed); + self.head.store((head + 1) % SIZE, Ordering::Relaxed); + + unsafe { ptr::read(self.as_mut_ptr().offset(head as isize)) } } else { Rc::new(HttpInnerMessage::default()) } @@ -26,24 +57,32 @@ impl SharedMessagePool { #[inline] pub fn release(&self, mut msg: Rc) { - let v = &mut self.0.borrow_mut(); - if v.len() < 128 { - Rc::get_mut(&mut msg).unwrap().reset(); - v.push_front(msg); + if self.len() < SIZE - 1 { + let tail = self.tail.load(Ordering::Relaxed); + unsafe { + ptr::write(self.as_mut_ptr().offset(tail as isize), msg); + } + self.tail.store((tail + 1) % SIZE, Ordering::Relaxed); } } } pub(crate) struct SharedHttpInnerMessage( Option>, - Option>, + Option>, ); impl Drop for SharedHttpInnerMessage { fn drop(&mut self) { if let Some(ref pool) = self.1 { - if let Some(msg) = self.0.take() { - if Rc::strong_count(&msg) == 1 { + if let Some(mut msg) = self.0.take() { + let release = if let Some(ref mut msg) = Rc::get_mut(&mut msg) { + msg.reset(); + true + } else { + false + }; + if release { pool.release(msg); } } @@ -63,13 +102,19 @@ impl Default for SharedHttpInnerMessage { } } +impl fmt::Debug for SharedHttpInnerMessage { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + self.0.fmt(f) + } +} + impl SharedHttpInnerMessage { pub fn from_message(msg: HttpInnerMessage) -> SharedHttpInnerMessage { SharedHttpInnerMessage(Some(Rc::new(msg)), None) } pub fn new( - msg: Rc, pool: Rc + msg: Rc, pool: Arc, ) -> SharedHttpInnerMessage { SharedHttpInnerMessage(Some(msg), Some(pool)) } @@ -96,9 +141,8 @@ const DEC_DIGITS_LUT: &[u8] = b"0001020304050607080910111213141516171819\ 8081828384858687888990919293949596979899"; pub(crate) fn write_status_line(version: Version, mut n: u16, bytes: &mut BytesMut) { - let mut buf: [u8; 13] = [ - b'H', b'T', b'T', b'P', b'/', b'1', b'.', b'1', b' ', b' ', b' ', b' ', b' ' - ]; + let mut buf: [u8; 13] = + [b'H', b'T', b'T', b'P', b'/', b'1', b'.', b'1', b' ', b' ', b' ', b' ', b' ']; match version { Version::HTTP_2 => buf[5] = b'2', Version::HTTP_10 => buf[7] = b'0', @@ -251,63 +295,33 @@ mod tests { let mut bytes = BytesMut::new(); bytes.reserve(50); write_content_length(0, &mut bytes); - assert_eq!( - bytes.take().freeze(), - b"\r\ncontent-length: 0\r\n"[..] - ); + assert_eq!(bytes.take().freeze(), b"\r\ncontent-length: 0\r\n"[..]); bytes.reserve(50); write_content_length(9, &mut bytes); - assert_eq!( - bytes.take().freeze(), - b"\r\ncontent-length: 9\r\n"[..] - ); + assert_eq!(bytes.take().freeze(), b"\r\ncontent-length: 9\r\n"[..]); bytes.reserve(50); write_content_length(10, &mut bytes); - assert_eq!( - bytes.take().freeze(), - b"\r\ncontent-length: 10\r\n"[..] - ); + assert_eq!(bytes.take().freeze(), b"\r\ncontent-length: 10\r\n"[..]); bytes.reserve(50); write_content_length(99, &mut bytes); - assert_eq!( - bytes.take().freeze(), - b"\r\ncontent-length: 99\r\n"[..] - ); + assert_eq!(bytes.take().freeze(), b"\r\ncontent-length: 99\r\n"[..]); bytes.reserve(50); write_content_length(100, &mut bytes); - assert_eq!( - bytes.take().freeze(), - b"\r\ncontent-length: 100\r\n"[..] - ); + assert_eq!(bytes.take().freeze(), b"\r\ncontent-length: 100\r\n"[..]); bytes.reserve(50); write_content_length(101, &mut bytes); - assert_eq!( - bytes.take().freeze(), - b"\r\ncontent-length: 101\r\n"[..] - ); + assert_eq!(bytes.take().freeze(), b"\r\ncontent-length: 101\r\n"[..]); bytes.reserve(50); write_content_length(998, &mut bytes); - assert_eq!( - bytes.take().freeze(), - b"\r\ncontent-length: 998\r\n"[..] - ); + assert_eq!(bytes.take().freeze(), b"\r\ncontent-length: 998\r\n"[..]); bytes.reserve(50); write_content_length(1000, &mut bytes); - assert_eq!( - bytes.take().freeze(), - b"\r\ncontent-length: 1000\r\n"[..] - ); + assert_eq!(bytes.take().freeze(), b"\r\ncontent-length: 1000\r\n"[..]); bytes.reserve(50); write_content_length(1001, &mut bytes); - assert_eq!( - bytes.take().freeze(), - b"\r\ncontent-length: 1001\r\n"[..] - ); + assert_eq!(bytes.take().freeze(), b"\r\ncontent-length: 1001\r\n"[..]); bytes.reserve(50); write_content_length(5909, &mut bytes); - assert_eq!( - bytes.take().freeze(), - b"\r\ncontent-length: 5909\r\n"[..] - ); + assert_eq!(bytes.take().freeze(), b"\r\ncontent-length: 5909\r\n"[..]); } } diff --git a/src/server/mod.rs b/src/server/mod.rs index 85faf77b3..edc29c441 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -10,9 +10,10 @@ use tokio_io::{AsyncRead, AsyncWrite}; mod channel; pub(crate) mod encoding; pub(crate) mod h1; +pub(crate) mod h1decoder; mod h1writer; -mod h2; -mod h2writer; +//mod h2; +//mod h2writer; pub(crate) mod helpers; mod settings; pub(crate) mod shared; @@ -28,6 +29,8 @@ use error::Error; use header::ContentEncoding; use httprequest::{HttpInnerMessage, HttpRequest}; use httpresponse::HttpResponse; +use io::IoCommand; +use server::worker::IoWriter; /// max buffer size 64k pub(crate) const MAX_WRITE_BUFFER_SIZE: usize = 65_536; diff --git a/src/server/settings.rs b/src/server/settings.rs index 1b57db1a2..fde5b3ed2 100644 --- a/src/server/settings.rs +++ b/src/server/settings.rs @@ -1,3 +1,5 @@ +#![allow(dead_code, unused_mut, unused_variables)] + use bytes::BytesMut; use futures_cpupool::{Builder, CpuPool}; use http::StatusCode; @@ -8,10 +10,10 @@ use std::sync::Arc; use std::{fmt, mem, net}; use time; -use super::KeepAlive; use super::channel::Node; use super::helpers; use super::shared::{SharedBytes, SharedBytesPool}; +use super::KeepAlive; use body::Body; use httpresponse::{HttpResponse, HttpResponseBuilder, HttpResponsePool}; @@ -72,7 +74,7 @@ impl Default for ServerSettings { impl ServerSettings { /// Crate server settings instance pub(crate) fn new( - addr: Option, host: &Option, secure: bool + addr: Option, host: &Option, secure: bool, ) -> ServerSettings { let host = if let Some(ref host) = *host { host.clone() @@ -119,7 +121,7 @@ impl ServerSettings { #[inline] pub(crate) fn get_response_builder( - &self, status: StatusCode + &self, status: StatusCode, ) -> HttpResponseBuilder { HttpResponsePool::get_builder(&self.responses, status) } @@ -133,7 +135,7 @@ pub(crate) struct WorkerSettings { keep_alive: u64, ka_enabled: bool, bytes: Rc, - messages: Rc, + messages: Arc, channels: Cell, node: Box>, date: UnsafeCell, @@ -152,7 +154,7 @@ impl WorkerSettings { ka_enabled, h: RefCell::new(h), bytes: Rc::new(SharedBytesPool::new()), - messages: Rc::new(helpers::SharedMessagePool::new()), + messages: Arc::new(helpers::SharedMessagePool::new()), channels: Cell::new(0), node: Box::new(Node::head()), date: UnsafeCell::new(Date::new()), @@ -186,7 +188,7 @@ impl WorkerSettings { pub fn get_http_message(&self) -> helpers::SharedHttpInnerMessage { helpers::SharedHttpInnerMessage::new( self.messages.get(), - Rc::clone(&self.messages), + Arc::clone(&self.messages), ) } @@ -255,10 +257,7 @@ mod tests { #[test] fn test_date_len() { - assert_eq!( - DATE_VALUE_LENGTH, - "Sun, 06 Nov 1994 08:49:37 GMT".len() - ); + assert_eq!(DATE_VALUE_LENGTH, "Sun, 06 Nov 1994 08:49:37 GMT".len()); } #[test] diff --git a/src/server/srv.rs b/src/server/srv.rs index 314dc8369..58fc276ba 100644 --- a/src/server/srv.rs +++ b/src/server/srv.rs @@ -18,7 +18,8 @@ use native_tls::TlsAcceptor; #[cfg(feature = "alpn")] use openssl::ssl::{AlpnError, SslAcceptorBuilder}; -use super::channel::{HttpChannel, WrapperStream}; +//use super::channel::{HttpChannel, WrapperStream}; +use super::channel::WrapperStream; use super::settings::{ServerSettings, WorkerSettings}; use super::worker::{Conn, StopWorker, StreamHandlerType, Worker}; use super::{IntoHttpHandler, IoStream, KeepAlive}; @@ -230,7 +231,7 @@ where } fn start_workers( - &mut self, settings: &ServerSettings, handler: &StreamHandlerType + &mut self, settings: &ServerSettings, handler: &StreamHandlerType, ) -> Vec<(usize, mpsc::UnboundedSender>)> { // start workers let mut workers = Vec::new(); @@ -427,7 +428,7 @@ impl HttpServer { /// /// This method sets alpn protocols to "h2" and "http/1.1" pub fn start_ssl( - mut self, mut builder: SslAcceptorBuilder + mut self, mut builder: SslAcceptorBuilder, ) -> io::Result> { if self.sockets.is_empty() { Err(io::Error::new( @@ -643,12 +644,12 @@ where type Result = (); fn handle(&mut self, msg: Conn, _: &mut Context) -> Self::Result { - Arbiter::handle().spawn(HttpChannel::new( - Rc::clone(self.h.as_ref().unwrap()), - msg.io, - msg.peer, - msg.http2, - )); + //Arbiter::handle().spawn(HttpChannel::new( + // Rc::clone(self.h.as_ref().unwrap()), + // msg.io, + // msg.peer, + // msg.http2, + //)); } } @@ -777,7 +778,7 @@ fn start_accept_thread( ®, CMD, mio::Ready::readable(), - mio::PollOpt::edge(), + mio::PollOpt::level(), ) { panic!("Can not register Registration: {}", err); } @@ -789,133 +790,115 @@ fn start_accept_thread( let sleep = Duration::from_millis(100); let mut next = 0; - loop { - if let Err(err) = poll.poll(&mut events, None) { - panic!("Poll error: {}", err); - } + #[cfg_attr(rustfmt, rustfmt_skip)] + +loop { + if let Err(err) = poll.poll(&mut events, None) { + panic!("Poll error: {}", err); + } - for event in events.iter() { - match event.token() { - SRV => if let Some(ref server) = server { - loop { - match server.accept_std() { - Ok((sock, addr)) => { - let mut msg = Conn { - io: sock, - peer: Some(addr), - http2: false, - }; - while !workers.is_empty() { - match workers[next].1.unbounded_send(msg) { - Ok(_) => (), - Err(err) => { - let _ = srv.unbounded_send( - ServerCommand::WorkerDied( - workers[next].0, - info.clone(), - ), - ); - msg = err.into_inner(); - workers.swap_remove(next); - if workers.is_empty() { - error!("No workers"); - thread::sleep(sleep); - break; - } else if workers.len() <= next { - next = 0; - } - continue; - } - } - next = (next + 1) % workers.len(); + for event in events.iter() { + match event.token() { + SRV => if let Some(ref server) = server { + loop { + match server.accept_std() { + Ok((sock, addr)) => { + let mut msg = Conn { + io: sock, + peer: Some(addr), + http2: false, + }; + while !workers.is_empty() { + match workers[next].1.unbounded_send(msg) { + Ok(_) => (), + Err(err) => { + let _ = srv.unbounded_send( + ServerCommand::WorkerDied( + workers[next].0, info.clone())); + msg = err.into_inner(); + workers.swap_remove(next); + if workers.is_empty() { + error!("No workers"); + thread::sleep(sleep); break; + } else if workers.len() <= next { + next = 0; } - } - Err(ref e) - if e.kind() == io::ErrorKind::WouldBlock => - { - break - } - Err(ref e) if connection_error(e) => continue, - Err(e) => { - error!("Error accepting connection: {}", e); - // sleep after error - thread::sleep(sleep); - break; + continue; } } + next = (next + 1) % workers.len(); + break; } - }, - CMD => match rx.try_recv() { - Ok(cmd) => match cmd { - Command::Pause => if let Some(server) = server.take() { - if let Err(err) = poll.deregister(&server) { - error!( - "Can not deregister server socket {}", - err - ); - } else { - info!( - "Paused accepting connections on {}", - addr - ); - } - }, - Command::Resume => { - let lst = create_tcp_listener(addr, backlog) - .expect("Can not create net::TcpListener"); - - server = Some( - mio::net::TcpListener::from_std(lst).expect( - "Can not create mio::net::TcpListener", - ), - ); - - if let Some(ref server) = server { - if let Err(err) = poll.register( - server, - SRV, - mio::Ready::readable(), - mio::PollOpt::edge(), - ) { - error!("Can not resume socket accept process: {}", err); - } else { - info!("Accepting connections on {} has been resumed", - addr); - } - } - } - Command::Stop => { - if let Some(server) = server.take() { - let _ = poll.deregister(&server); - } - return; - } - Command::Worker(idx, addr) => { - workers.push((idx, addr)); - } - }, - Err(err) => match err { - sync_mpsc::TryRecvError::Empty => (), - sync_mpsc::TryRecvError::Disconnected => { - if let Some(server) = server.take() { - let _ = poll.deregister(&server); - } - return; - } - }, - }, - _ => unreachable!(), + } + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { break } + Err(ref e) if connection_error(e) => continue, + Err(e) => { + error!("Error accepting connection: {}", e); + // sleep after error + thread::sleep(sleep); + break; + } } } - } + }, + CMD => match rx.try_recv() { + Ok(cmd) => match cmd { + Command::Pause => if let Some(server) = server.take() { + if let Err(err) = poll.deregister(&server) { + error!("Can not deregister server socket {}", err); + } else { + info!("Paused accepting connections on {}", addr); + } + }, + Command::Resume => { + let lst = create_tcp_listener(addr, backlog) + .expect("Can not create net::TcpListener"); + + server = Some(mio::net::TcpListener::from_std(lst).expect( + "Can not create mio::net::TcpListener")); + + if let Some(ref server) = server { + if let Err(err) = poll.register( + server, SRV, mio::Ready::readable(), mio::PollOpt::edge()) + { + error!("Can not resume socket accept process: {}", err); + } else { + info!("Accepting connections on {} has been resumed", addr); + } + } + } + Command::Stop => { + if let Some(server) = server.take() { + let _ = poll.deregister(&server); + } + return; + } + Command::Worker(idx, addr) => { + workers.push((idx, addr)); + } + }, + Err(err) => match err { + sync_mpsc::TryRecvError::Empty => (), + sync_mpsc::TryRecvError::Disconnected => { + if let Some(server) = server.take() { + let _ = poll.deregister(&server); + } + return; + } + }, + }, + _ => unreachable!(), + } + } +} }); (readiness, tx) } fn create_tcp_listener( - addr: net::SocketAddr, backlog: i32 + addr: net::SocketAddr, backlog: i32, ) -> io::Result { let builder = match addr { net::SocketAddr::V4(_) => TcpBuilder::new_v4()?, diff --git a/src/server/worker.rs b/src/server/worker.rs index a6ec0711d..3a7f1056c 100644 --- a/src/server/worker.rs +++ b/src/server/worker.rs @@ -1,8 +1,10 @@ -use futures::Future; -use futures::unsync::oneshot; -use net2::TcpStreamExt; use std::rc::Rc; -use std::{net, time}; +use std::{marker, net, time}; + +use futures::task::current as current_task; +use futures::unsync::oneshot; +use futures::{Async, Future, Poll}; +use net2::TcpStreamExt; use tokio_core::net::TcpStream; use tokio_core::reactor::Handle; @@ -22,10 +24,14 @@ use tokio_openssl::SslAcceptorExt; use actix::msgs::StopArbiter; use actix::*; -use server::channel::HttpChannel; +//use server::channel::HttpChannel; +use server::h1; +use server::h1decoder::Message as H1Message; use server::settings::WorkerSettings; use server::{HttpHandler, KeepAlive}; +use io::{IoChannel, IoCommand, TaskCommand}; + #[derive(Message)] pub(crate) struct Conn { pub io: T, @@ -55,21 +61,26 @@ where hnd: Handle, handler: StreamHandlerType, tcp_ka: Option, + io: IoWriter, } impl Worker { pub(crate) fn new( - h: Vec, handler: StreamHandlerType, keep_alive: KeepAlive + h: Vec, handler: StreamHandlerType, keep_alive: KeepAlive, ) -> Worker { let tcp_ka = if let KeepAlive::Tcp(val) = keep_alive { Some(time::Duration::new(val as u64, 0)) } else { None }; + let io = IoWriter { + inner: Rc::new(IoChannel::new()), + }; Worker { settings: Rc::new(WorkerSettings::new(h, keep_alive)), hnd: Arbiter::handle().clone(), + io, handler, tcp_ka, } @@ -83,7 +94,7 @@ impl Worker { } fn shutdown_timeout( - &self, ctx: &mut Context, tx: oneshot::Sender, dur: time::Duration + &self, ctx: &mut Context, tx: oneshot::Sender, dur: time::Duration, ) { // sleep for 1 second and then check again ctx.run_later(time::Duration::new(1, 0), move |slf, ctx| { @@ -111,6 +122,8 @@ where fn started(&mut self, ctx: &mut Self::Context) { self.update_time(ctx); + self.io.inner.set_notify(current_task()); + ctx.spawn(IoQueue(marker::PhantomData)); } } @@ -124,8 +137,10 @@ where if self.tcp_ka.is_some() && msg.io.set_keepalive(self.tcp_ka).is_err() { error!("Can not set socket keep-alive option"); } - self.handler - .handle(Rc::clone(&self.settings), &self.hnd, msg); + //println!("HANDLE: {:?}", msg.io); + self.io.inner.add_source(msg.io, msg.peer, msg.http2); + //self.handler + //.handle(Rc::clone(&self.settings), &self.hnd, msg); } } @@ -163,9 +178,10 @@ pub(crate) enum StreamHandlerType { Alpn(SslAcceptor), } +/* impl StreamHandlerType { fn handle( - &mut self, h: Rc>, hnd: &Handle, msg: Conn + &mut self, h: Rc>, hnd: &Handle, msg: Conn, ) { match *self { StreamHandlerType::Normal => { @@ -185,12 +201,8 @@ impl StreamHandlerType { hnd.spawn( TlsAcceptorExt::accept_async(acceptor, io).then(move |res| { match res { - Ok(io) => Arbiter::handle().spawn(HttpChannel::new( - h, - io, - peer, - http2, - )), + Ok(io) => Arbiter::handle() + .spawn(HttpChannel::new(h, io, peer, http2)), Err(err) => { trace!("Error during handling tls connection: {}", err) } @@ -217,12 +229,8 @@ impl StreamHandlerType { } else { false }; - Arbiter::handle().spawn(HttpChannel::new( - h, - io, - peer, - http2, - )); + Arbiter::handle() + .spawn(HttpChannel::new(h, io, peer, http2)); } Err(err) => { trace!("Error during handling tls connection: {}", err) @@ -234,4 +242,43 @@ impl StreamHandlerType { } } } +}*/ + +struct IoQueue(marker::PhantomData); + +impl fut::ActorFuture for IoQueue { + type Item = (); + type Error = (); + type Actor = Worker; + + fn poll(&mut self, act: &mut Worker, _: &mut Context>) -> Poll<(), ()> { + act.io.inner.as_ref().start(); + + loop { + match act.io.inner.try_recv() { + Ok(TaskCommand::Stream(stream)) => { + act.hnd.spawn(h1::Http1::new( + Rc::clone(&act.settings), + stream, + act.io.clone(), + )); + } + Err(_) => break, + } + } + act.io.inner.as_ref().end(); + + Ok(Async::NotReady) + } +} + +#[derive(Clone)] +pub(crate) struct IoWriter { + pub inner: Rc, +} + +impl IoWriter { + pub fn send(&self, msg: IoCommand) { + self.inner.send(msg) + } } diff --git a/tests/test_client.rs b/tests/test_client.rs index b9154cc45..3c4c85861 100644 --- a/tests/test_client.rs +++ b/tests/test_client.rs @@ -9,8 +9,8 @@ use std::io::Read; use bytes::Bytes; use flate2::read::GzDecoder; -use futures::Future; use futures::stream::once; +use futures::Future; use rand::Rng; use actix_web::*; diff --git a/tests/test_server.rs b/tests/test_server.rs index cfbff6d87..19b6c9195 100644 --- a/tests/test_server.rs +++ b/tests/test_server.rs @@ -14,9 +14,9 @@ extern crate brotli2; #[cfg(feature = "brotli")] use brotli2::write::{BrotliDecoder, BrotliEncoder}; use bytes::{Bytes, BytesMut}; -use flate2::Compression; use flate2::read::GzDecoder; use flate2::write::{DeflateDecoder, DeflateEncoder, GzEncoder}; +use flate2::Compression; use futures::stream::once; use futures::{future, Future, Stream}; use h2::client as h2client; @@ -62,11 +62,9 @@ fn test_start() { thread::spawn(move || { let sys = System::new("test"); let srv = server::new(|| { - vec![ - App::new().resource("/", |r| { - r.method(http::Method::GET).f(|_| HttpResponse::Ok()) - }), - ] + vec![App::new().resource("/", |r| { + r.method(http::Method::GET).f(|_| HttpResponse::Ok()) + })] }); let srv = srv.bind("127.0.0.1:0").unwrap(); @@ -113,11 +111,9 @@ fn test_shutdown() { thread::spawn(move || { let sys = System::new("test"); let srv = server::new(|| { - vec![ - App::new().resource("/", |r| { - r.method(http::Method::GET).f(|_| HttpResponse::Ok()) - }), - ] + vec![App::new().resource("/", |r| { + r.method(http::Method::GET).f(|_| HttpResponse::Ok()) + })] }); let srv = srv.bind("127.0.0.1:0").unwrap(); @@ -844,7 +840,7 @@ impl middleware::Middleware for MiddlewareTest { } fn response( - &self, _: &mut HttpRequest, resp: HttpResponse + &self, _: &mut HttpRequest, resp: HttpResponse, ) -> Result { self.response.store( self.response.load(Ordering::Relaxed) + 1, diff --git a/tools/wsload/src/wsclient.rs b/tools/wsload/src/wsclient.rs index 6c431f2d6..d8d7b660e 100644 --- a/tools/wsload/src/wsclient.rs +++ b/tools/wsload/src/wsclient.rs @@ -14,8 +14,8 @@ extern crate url; use futures::Future; use rand::{thread_rng, Rng}; -use std::sync::Arc; use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Arc; use std::time::Duration; use actix::prelude::*;