diff --git a/Cargo.toml b/Cargo.toml index 205e178b..0e95c327 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -60,8 +60,8 @@ flate2-rust = ["flate2/rust_backend"] [dependencies] actix = "0.7.0" -actix-net = { git="https://github.com/actix/actix-net.git" } -#actix-net = { path = "../actix-net" } +#actix-net = { git="https://github.com/actix/actix-net.git" } +actix-net = { path = "../actix-net" } base64 = "0.9" bitflags = "1.0" diff --git a/src/client/connector.rs b/src/client/connector.rs index 8d71913f..6e82e3fd 100644 --- a/src/client/connector.rs +++ b/src/client/connector.rs @@ -51,7 +51,7 @@ type SslConnector = Arc<ClientConfig>; feature = "ssl", feature = "tls", feature = "rust-tls", -),))] +)))] type SslConnector = (); use server::IoStream; @@ -290,7 +290,7 @@ impl Default for ClientConnector { feature = "ssl", feature = "tls", feature = "rust-tls", - ),))] + )))] { () } diff --git a/src/server/acceptor.rs b/src/server/acceptor.rs index d7847416..caad0e2e 100644 --- a/src/server/acceptor.rs +++ b/src/server/acceptor.rs @@ -1,3 +1,4 @@ +use std::net; use std::time::Duration; use actix_net::server::ServerMessage; @@ -8,6 +9,7 @@ use tokio_reactor::Handle; use tokio_tcp::TcpStream; use tokio_timer::{sleep, Delay}; +use super::error::AcceptorError; use super::handler::HttpHandler; use super::settings::WorkerSettings; use super::IoStream; @@ -15,12 +17,7 @@ use super::IoStream; /// This trait indicates types that can create acceptor service for http server. pub trait AcceptorServiceFactory: Send + Clone + 'static { type Io: IoStream + Send; - type NewService: NewService< - Request = TcpStream, - Response = Self::Io, - Error = (), - InitError = (), - >; + type NewService: NewService<Request = TcpStream, Response = Self::Io>; fn create(&self) -> Self::NewService; } @@ -29,7 +26,7 @@ impl<F, T> AcceptorServiceFactory for F where F: Fn() -> T + Send + Clone + 'static, T::Response: IoStream + Send, - T: NewService<Request = TcpStream, Error = (), InitError = ()>, + T: NewService<Request = TcpStream>, { type Io = T::Response; type NewService = T; @@ -80,144 +77,91 @@ impl Service for DefaultAcceptor { } } -pub(crate) struct TcpAcceptor<T, H: HttpHandler> { +pub(crate) struct TcpAcceptor<T> { inner: T, - settings: WorkerSettings<H>, } -impl<T, H> TcpAcceptor<T, H> +impl<T, E> TcpAcceptor<T> where - H: HttpHandler, - T: NewService<Request = TcpStream>, + T: NewService<Request = TcpStream, Error = AcceptorError<E>>, { - pub(crate) fn new(settings: WorkerSettings<H>, inner: T) -> Self { - TcpAcceptor { inner, settings } + pub(crate) fn new(inner: T) -> Self { + TcpAcceptor { inner } } } -impl<T, H> NewService for TcpAcceptor<T, H> +impl<T, E> NewService for TcpAcceptor<T> where - H: HttpHandler, - T: NewService<Request = TcpStream>, + T: NewService<Request = TcpStream, Error = AcceptorError<E>>, { - type Request = ServerMessage; - type Response = (); - type Error = (); - type InitError = (); - type Service = TcpAcceptorService<T::Service, H>; - type Future = TcpAcceptorResponse<T, H>; + type Request = net::TcpStream; + type Response = T::Response; + type Error = AcceptorError<E>; + type InitError = T::InitError; + type Service = TcpAcceptorService<T::Service>; + type Future = TcpAcceptorResponse<T>; fn new_service(&self) -> Self::Future { TcpAcceptorResponse { fut: self.inner.new_service(), - settings: self.settings.clone(), } } } -pub(crate) struct TcpAcceptorResponse<T, H> +pub(crate) struct TcpAcceptorResponse<T> where - H: HttpHandler, T: NewService<Request = TcpStream>, { fut: T::Future, - settings: WorkerSettings<H>, } -impl<T, H> Future for TcpAcceptorResponse<T, H> +impl<T> Future for TcpAcceptorResponse<T> where - H: HttpHandler, T: NewService<Request = TcpStream>, { - type Item = TcpAcceptorService<T::Service, H>; - type Error = (); + type Item = TcpAcceptorService<T::Service>; + type Error = T::InitError; fn poll(&mut self) -> Poll<Self::Item, Self::Error> { - match self.fut.poll() { - Err(_) => Err(()), - Ok(Async::NotReady) => Ok(Async::NotReady), - Ok(Async::Ready(service)) => Ok(Async::Ready(TcpAcceptorService { - inner: service, - settings: self.settings.clone(), - })), + match self.fut.poll()? { + Async::NotReady => Ok(Async::NotReady), + Async::Ready(service) => { + Ok(Async::Ready(TcpAcceptorService { inner: service })) + } } } } -pub(crate) struct TcpAcceptorService<T, H: HttpHandler> { +pub(crate) struct TcpAcceptorService<T> { inner: T, - settings: WorkerSettings<H>, } -impl<T, H> Service for TcpAcceptorService<T, H> +impl<T, E> Service for TcpAcceptorService<T> where - H: HttpHandler, - T: Service<Request = TcpStream>, + T: Service<Request = TcpStream, Error = AcceptorError<E>>, { - type Request = ServerMessage; - type Response = (); - type Error = (); - type Future = Either<TcpAcceptorServiceFut<T::Future>, FutureResult<(), ()>>; + type Request = net::TcpStream; + type Response = T::Response; + type Error = AcceptorError<E>; + type Future = Either<T::Future, FutureResult<Self::Response, Self::Error>>; fn poll_ready(&mut self) -> Poll<(), Self::Error> { - self.inner.poll_ready().map_err(|_| ()) + self.inner.poll_ready() } fn call(&mut self, req: Self::Request) -> Self::Future { - match req { - ServerMessage::Connect(stream) => { - let stream = - TcpStream::from_std(stream, &Handle::default()).map_err(|e| { - error!("Can not convert to an async tcp stream: {}", e); - }); + let stream = TcpStream::from_std(req, &Handle::default()).map_err(|e| { + error!("Can not convert to an async tcp stream: {}", e); + AcceptorError::Io(e) + }); - if let Ok(stream) = stream { - Either::A(TcpAcceptorServiceFut { - fut: self.inner.call(stream), - }) - } else { - Either::B(err(())) - } - } - ServerMessage::Shutdown(timeout) => Either::B(ok(())), - ServerMessage::ForceShutdown => { - // self.settings.head().traverse::<TcpStream, H>(); - Either::B(ok(())) - } + match stream { + Ok(stream) => Either::A(self.inner.call(stream)), + Err(e) => Either::B(err(e)), } } } -pub(crate) struct TcpAcceptorServiceFut<T: Future> { - fut: T, -} - -impl<T> Future for TcpAcceptorServiceFut<T> -where - T: Future, -{ - type Item = (); - type Error = (); - - fn poll(&mut self) -> Poll<Self::Item, Self::Error> { - match self.fut.poll() { - Err(_) => Err(()), - Ok(Async::NotReady) => Ok(Async::NotReady), - Ok(Async::Ready(_)) => Ok(Async::Ready(())), - } - } -} - -/// Errors produced by `AcceptorTimeout` service. -#[derive(Debug)] -pub enum TimeoutError<T> { - /// The inner service error - Service(T), - - /// The request did not complete within the specified timeout. - Timeout, -} - /// Acceptor timeout middleware /// /// Applies timeout to request prcoessing. @@ -235,7 +179,7 @@ impl<T: NewService> AcceptorTimeout<T> { impl<T: NewService> NewService for AcceptorTimeout<T> { type Request = T::Request; type Response = T::Response; - type Error = TimeoutError<T::Error>; + type Error = AcceptorError<T::Error>; type InitError = T::InitError; type Service = AcceptorTimeoutService<T::Service>; type Future = AcceptorTimeoutFut<T>; @@ -278,11 +222,11 @@ pub(crate) struct AcceptorTimeoutService<T> { impl<T: Service> Service for AcceptorTimeoutService<T> { type Request = T::Request; type Response = T::Response; - type Error = TimeoutError<T::Error>; + type Error = AcceptorError<T::Error>; type Future = AcceptorTimeoutResponse<T>; fn poll_ready(&mut self) -> Poll<(), Self::Error> { - self.inner.poll_ready().map_err(TimeoutError::Service) + self.inner.poll_ready().map_err(AcceptorError::Service) } fn call(&mut self, req: Self::Request) -> Self::Future { @@ -299,17 +243,134 @@ pub(crate) struct AcceptorTimeoutResponse<T: Service> { } impl<T: Service> Future for AcceptorTimeoutResponse<T> { type Item = T::Response; - type Error = TimeoutError<T::Error>; + type Error = AcceptorError<T::Error>; fn poll(&mut self) -> Poll<Self::Item, Self::Error> { - match self.fut.poll() { - Ok(Async::NotReady) => match self.sleep.poll() { - Err(_) => Err(TimeoutError::Timeout), - Ok(Async::Ready(_)) => Err(TimeoutError::Timeout), + match self.fut.poll().map_err(AcceptorError::Service)? { + Async::NotReady => match self.sleep.poll() { + Err(_) => Err(AcceptorError::Timeout), + Ok(Async::Ready(_)) => Err(AcceptorError::Timeout), Ok(Async::NotReady) => Ok(Async::NotReady), }, - Ok(Async::Ready(resp)) => Ok(Async::Ready(resp)), - Err(err) => Err(TimeoutError::Service(err)), + Async::Ready(resp) => Ok(Async::Ready(resp)), + } + } +} + +pub(crate) struct ServerMessageAcceptor<T, H: HttpHandler> { + inner: T, + settings: WorkerSettings<H>, +} + +impl<T, H> ServerMessageAcceptor<T, H> +where + H: HttpHandler, + T: NewService<Request = net::TcpStream>, +{ + pub(crate) fn new(settings: WorkerSettings<H>, inner: T) -> Self { + ServerMessageAcceptor { inner, settings } + } +} + +impl<T, H> NewService for ServerMessageAcceptor<T, H> +where + H: HttpHandler, + T: NewService<Request = net::TcpStream>, +{ + type Request = ServerMessage; + type Response = (); + type Error = T::Error; + type InitError = T::InitError; + type Service = ServerMessageAcceptorService<T::Service, H>; + type Future = ServerMessageAcceptorResponse<T, H>; + + fn new_service(&self) -> Self::Future { + ServerMessageAcceptorResponse { + fut: self.inner.new_service(), + settings: self.settings.clone(), + } + } +} + +pub(crate) struct ServerMessageAcceptorResponse<T, H> +where + H: HttpHandler, + T: NewService<Request = net::TcpStream>, +{ + fut: T::Future, + settings: WorkerSettings<H>, +} + +impl<T, H> Future for ServerMessageAcceptorResponse<T, H> +where + H: HttpHandler, + T: NewService<Request = net::TcpStream>, +{ + type Item = ServerMessageAcceptorService<T::Service, H>; + type Error = T::InitError; + + fn poll(&mut self) -> Poll<Self::Item, Self::Error> { + match self.fut.poll()? { + Async::NotReady => Ok(Async::NotReady), + Async::Ready(service) => Ok(Async::Ready(ServerMessageAcceptorService { + inner: service, + settings: self.settings.clone(), + })), + } + } +} + +pub(crate) struct ServerMessageAcceptorService<T, H: HttpHandler> { + inner: T, + settings: WorkerSettings<H>, +} + +impl<T, H> Service for ServerMessageAcceptorService<T, H> +where + H: HttpHandler, + T: Service<Request = net::TcpStream>, +{ + type Request = ServerMessage; + type Response = (); + type Error = T::Error; + type Future = + Either<ServerMessageAcceptorServiceFut<T>, FutureResult<(), Self::Error>>; + + fn poll_ready(&mut self) -> Poll<(), Self::Error> { + self.inner.poll_ready() + } + + fn call(&mut self, req: Self::Request) -> Self::Future { + match req { + ServerMessage::Connect(stream) => { + Either::A(ServerMessageAcceptorServiceFut { + fut: self.inner.call(stream), + }) + } + ServerMessage::Shutdown(timeout) => Either::B(ok(())), + ServerMessage::ForceShutdown => { + // self.settings.head().traverse::<TcpStream, H>(); + Either::B(ok(())) + } + } + } +} + +pub(crate) struct ServerMessageAcceptorServiceFut<T: Service> { + fut: T::Future, +} + +impl<T> Future for ServerMessageAcceptorServiceFut<T> +where + T: Service, +{ + type Item = (); + type Error = T::Error; + + fn poll(&mut self) -> Poll<Self::Item, Self::Error> { + match self.fut.poll()? { + Async::NotReady => Ok(Async::NotReady), + Async::Ready(_) => Ok(Async::Ready(())), } } } diff --git a/src/server/builder.rs b/src/server/builder.rs index 28541820..46ab9f46 100644 --- a/src/server/builder.rs +++ b/src/server/builder.rs @@ -5,7 +5,10 @@ use actix_net::either::Either; use actix_net::server::{Server, ServiceFactory}; use actix_net::service::{NewService, NewServiceExt}; -use super::acceptor::{AcceptorServiceFactory, AcceptorTimeout, TcpAcceptor}; +use super::acceptor::{ + AcceptorServiceFactory, AcceptorTimeout, ServerMessageAcceptor, TcpAcceptor, +}; +use super::error::AcceptorError; use super::handler::{HttpHandler, IntoHttpHandler}; use super::service::HttpService; use super::settings::{ServerSettings, WorkerSettings}; @@ -99,16 +102,30 @@ where ); if timeout == 0 { - Either::A(TcpAcceptor::new( + Either::A(ServerMessageAcceptor::new( settings.clone(), - acceptor.create().and_then(pipeline.create(settings)), + TcpAcceptor::new(acceptor.create().map_err(AcceptorError::Service)) + .map_err(|_| ()) + .map_init_err(|_| ()) + .and_then( + pipeline + .create(settings) + .map_init_err(|_| ()) + .map_err(|_| ()), + ), )) } else { - Either::B(TcpAcceptor::new( + Either::B(ServerMessageAcceptor::new( settings.clone(), - AcceptorTimeout::new(timeout, acceptor.create()) + TcpAcceptor::new(AcceptorTimeout::new(timeout, acceptor.create())) .map_err(|_| ()) - .and_then(pipeline.create(settings)), + .map_init_err(|_| ()) + .and_then( + pipeline + .create(settings) + .map_init_err(|_| ()) + .map_err(|_| ()), + ), )) } } @@ -153,12 +170,7 @@ where pub trait HttpPipelineFactory<H: HttpHandler>: Send + Clone + 'static { type Io: IoStream; - type NewService: NewService< - Request = Self::Io, - Response = (), - Error = (), - InitError = (), - >; + type NewService: NewService<Request = Self::Io, Response = ()>; fn create(&self, settings: WorkerSettings<H>) -> Self::NewService; } @@ -166,7 +178,7 @@ pub trait HttpPipelineFactory<H: HttpHandler>: Send + Clone + 'static { impl<F, T, H> HttpPipelineFactory<H> for F where F: Fn(WorkerSettings<H>) -> T + Send + Clone + 'static, - T: NewService<Response = (), Error = (), InitError = ()>, + T: NewService<Response = ()>, T::Request: IoStream, H: HttpHandler, { diff --git a/src/server/channel.rs b/src/server/channel.rs index 0d92c23a..c1e6b6b2 100644 --- a/src/server/channel.rs +++ b/src/server/channel.rs @@ -1,7 +1,6 @@ use std::net::{Shutdown, SocketAddr}; use std::{io, ptr, time}; -use actix::Message; use bytes::{Buf, BufMut, BytesMut}; use futures::{Async, Future, Poll}; use tokio_io::{AsyncRead, AsyncWrite}; @@ -283,10 +282,6 @@ where io: T, } -impl<T: AsyncRead + AsyncWrite + 'static> Message for WrapperStream<T> { - type Result = (); -} - impl<T> WrapperStream<T> where T: AsyncRead + AsyncWrite + 'static, diff --git a/src/server/error.rs b/src/server/error.rs index d08ccf87..ff8b831a 100644 --- a/src/server/error.rs +++ b/src/server/error.rs @@ -1,9 +1,24 @@ +use std::io; + use futures::{Async, Poll}; use super::{helpers, HttpHandlerTask, Writer}; use http::{StatusCode, Version}; use Error; +/// Errors produced by `AcceptorError` service. +#[derive(Debug)] +pub enum AcceptorError<T> { + /// The inner service error + Service(T), + + /// Io specific error + Io(io::Error), + + /// The request did not complete within the specified timeout. + Timeout, +} + pub(crate) struct ServerError(Version, StatusCode); impl ServerError { diff --git a/src/server/http.rs b/src/server/http.rs index 81c4d3ad..846f7f01 100644 --- a/src/server/http.rs +++ b/src/server/http.rs @@ -1,13 +1,11 @@ use std::{io, mem, net}; -use actix::{Actor, Addr, Arbiter, AsyncContext, Context, Handler, System}; +use actix::{Addr, System}; use actix_net::server::Server; use actix_net::ssl; -use futures::Stream; use net2::TcpBuilder; use num_cpus; -use tokio_io::{AsyncRead, AsyncWrite}; #[cfg(feature = "tls")] use native_tls::TlsAcceptor; @@ -20,9 +18,6 @@ use rustls::ServerConfig; use super::acceptor::{AcceptorServiceFactory, DefaultAcceptor}; use super::builder::{DefaultPipelineFactory, HttpServiceBuilder, ServiceProvider}; -use super::channel::{HttpChannel, WrapperStream}; -use super::handler::HttpHandler; -use super::settings::{ServerSettings, WorkerSettings}; use super::{IntoHttpHandler, KeepAlive}; struct Socket { @@ -42,9 +37,10 @@ where H: IntoHttpHandler + 'static, F: Fn() -> H + Send + Clone, { - factory: F, - host: Option<String>, - keep_alive: KeepAlive, + pub(super) factory: F, + pub(super) host: Option<String>, + pub(super) keep_alive: KeepAlive, + pub(super) client_timeout: usize, backlog: i32, threads: usize, exit: bool, @@ -53,7 +49,6 @@ where no_signals: bool, maxconn: usize, maxconnrate: usize, - client_timeout: usize, sockets: Vec<Socket>, } @@ -524,61 +519,6 @@ impl<H: IntoHttpHandler, F: Fn() -> H + Send + Clone> HttpServer<H, F> { } } -impl<H, F> HttpServer<H, F> -where - H: IntoHttpHandler, - F: Fn() -> H + Send + Clone, -{ - #[doc(hidden)] - #[deprecated(since = "0.7.8")] - /// Start listening for incoming connections from a stream. - /// - /// This method uses only one thread for handling incoming connections. - pub fn start_incoming<T, S>(self, stream: S, secure: bool) - where - S: Stream<Item = T, Error = io::Error> + 'static, - T: AsyncRead + AsyncWrite + 'static, - { - // set server settings - let addr: net::SocketAddr = "127.0.0.1:8080".parse().unwrap(); - let apps = (self.factory)().into_handler(); - let settings = WorkerSettings::new( - apps, - self.keep_alive, - self.client_timeout as u64, - ServerSettings::new(Some(addr), &self.host, secure), - ); - - // start server - HttpIncoming::create(move |ctx| { - ctx.add_message_stream( - stream.map_err(|_| ()).map(move |t| WrapperStream::new(t)), - ); - HttpIncoming { settings } - }); - } -} - -struct HttpIncoming<H: HttpHandler> { - settings: WorkerSettings<H>, -} - -impl<H: HttpHandler> Actor for HttpIncoming<H> { - type Context = Context<Self>; -} - -impl<T, H> Handler<WrapperStream<T>> for HttpIncoming<H> -where - T: AsyncRead + AsyncWrite, - H: HttpHandler, -{ - type Result = (); - - fn handle(&mut self, msg: WrapperStream<T>, _: &mut Context<Self>) -> Self::Result { - Arbiter::spawn(HttpChannel::new(self.settings.clone(), msg, None)); - } -} - fn create_tcp_listener( addr: net::SocketAddr, backlog: i32, ) -> io::Result<net::TcpListener> { diff --git a/src/server/incoming.rs b/src/server/incoming.rs new file mode 100644 index 00000000..7ab289d0 --- /dev/null +++ b/src/server/incoming.rs @@ -0,0 +1,70 @@ +//! Support for `Stream<Item=T::AsyncReady+AsyncWrite>`, deprecated! +use std::{io, net}; + +use actix::{Actor, Arbiter, AsyncContext, Context, Handler, Message}; +use futures::Stream; +use tokio_io::{AsyncRead, AsyncWrite}; + +use super::channel::{HttpChannel, WrapperStream}; +use super::handler::{HttpHandler, IntoHttpHandler}; +use super::http::HttpServer; +use super::settings::{ServerSettings, WorkerSettings}; + +impl<T: AsyncRead + AsyncWrite + 'static> Message for WrapperStream<T> { + type Result = (); +} + +impl<H, F> HttpServer<H, F> +where + H: IntoHttpHandler, + F: Fn() -> H + Send + Clone, +{ + #[doc(hidden)] + #[deprecated(since = "0.7.8")] + /// Start listening for incoming connections from a stream. + /// + /// This method uses only one thread for handling incoming connections. + pub fn start_incoming<T, S>(self, stream: S, secure: bool) + where + S: Stream<Item = T, Error = io::Error> + 'static, + T: AsyncRead + AsyncWrite + 'static, + { + // set server settings + let addr: net::SocketAddr = "127.0.0.1:8080".parse().unwrap(); + let apps = (self.factory)().into_handler(); + let settings = WorkerSettings::new( + apps, + self.keep_alive, + self.client_timeout as u64, + ServerSettings::new(Some(addr), &self.host, secure), + ); + + // start server + HttpIncoming::create(move |ctx| { + ctx.add_message_stream( + stream.map_err(|_| ()).map(move |t| WrapperStream::new(t)), + ); + HttpIncoming { settings } + }); + } +} + +struct HttpIncoming<H: HttpHandler> { + settings: WorkerSettings<H>, +} + +impl<H: HttpHandler> Actor for HttpIncoming<H> { + type Context = Context<Self>; +} + +impl<T, H> Handler<WrapperStream<T>> for HttpIncoming<H> +where + T: AsyncRead + AsyncWrite, + H: HttpHandler, +{ + type Result = (); + + fn handle(&mut self, msg: WrapperStream<T>, _: &mut Context<Self>) -> Self::Result { + Arbiter::spawn(HttpChannel::new(self.settings.clone(), msg, None)); + } +} diff --git a/src/server/mod.rs b/src/server/mod.rs index 9e91eda0..1e145571 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -129,6 +129,7 @@ mod h2writer; mod handler; pub(crate) mod helpers; mod http; +pub(crate) mod incoming; pub(crate) mod input; pub(crate) mod message; pub(crate) mod output;