diff --git a/actix-server-config/Cargo.toml b/actix-server-config/Cargo.toml index 5c7053be..58d0486d 100644 --- a/actix-server-config/Cargo.toml +++ b/actix-server-config/Cargo.toml @@ -17,7 +17,7 @@ path = "src/lib.rs" features = ["ssl", "rust-tls", "uds"] [features] -default = [] +default = ["ssl", "rustls" ] # openssl ssl = ["tokio-openssl"] @@ -26,13 +26,14 @@ ssl = ["tokio-openssl"] rust-tls = ["rustls", "tokio-rustls"] # unix domain sockets -uds = ["tokio-uds"] +# TODO: FIXME +uds = [] # ["tokio-uds"] [dependencies] -futures = "0.1.25" -tokio-io = "0.1.12" -tokio-tcp = "0.1" -tokio-openssl = { version="0.3.0", optional = true } +futures = { package = "futures-preview", version = "0.3.0-alpha.18" } +tokio = "0.2.0-alpha.4" + +tokio-openssl = { version="0.4.0-alpha.4", optional = true } rustls = { version = "0.16.0", optional = true } -tokio-rustls = { version = "0.10.0", optional = true } -tokio-uds = { version="0.2.5", optional = true } +tokio-rustls = { version = "0.12.0-alpha.2", optional = true } +#tokio-uds = { version="0.3.0-alpha.1", optional = true } diff --git a/actix-server-config/src/lib.rs b/actix-server-config/src/lib.rs index fc5dac04..bc487b86 100644 --- a/actix-server-config/src/lib.rs +++ b/actix-server-config/src/lib.rs @@ -3,8 +3,8 @@ use std::net::SocketAddr; use std::rc::Rc; use std::{fmt, io, net, time}; -use tokio_io::{AsyncRead, AsyncWrite}; -use tokio_tcp::TcpStream; +use tokio::io::{AsyncRead, AsyncWrite}; +use tokio::net::tcp::TcpStream; #[derive(Debug, Clone)] pub struct ServerConfig { @@ -172,25 +172,25 @@ impl IoStream for TcpStream { } #[cfg(any(feature = "ssl"))] -impl IoStream for tokio_openssl::SslStream { +impl IoStream for tokio_openssl::SslStream { #[inline] fn peer_addr(&self) -> Option { - self.get_ref().get_ref().peer_addr() + self.get_ref().peer_addr() } #[inline] fn set_nodelay(&mut self, nodelay: bool) -> io::Result<()> { - self.get_mut().get_mut().set_nodelay(nodelay) + self.get_mut().set_nodelay(nodelay) } #[inline] fn set_linger(&mut self, dur: Option) -> io::Result<()> { - self.get_mut().get_mut().set_linger(dur) + self.get_mut().set_linger(dur) } #[inline] fn set_keepalive(&mut self, dur: Option) -> io::Result<()> { - self.get_mut().get_mut().set_keepalive(dur) + self.get_mut().set_keepalive(dur) } } @@ -218,7 +218,7 @@ impl IoStream for tokio_rustls::server::TlsStream { } #[cfg(all(unix, feature = "uds"))] -impl IoStream for tokio_uds::UnixStream { +impl IoStream for t::UnixStream { #[inline] fn peer_addr(&self) -> Option { None diff --git a/actix-server/CHANGES.md b/actix-server/CHANGES.md index 1b3aa478..8680373f 100644 --- a/actix-server/CHANGES.md +++ b/actix-server/CHANGES.md @@ -7,6 +7,7 @@ * Update `rustls` to 0.16 * Minimum required Rust version upped to 1.37.0 + ## [0.6.1] - 2019-09-25 ### Added diff --git a/actix-server/Cargo.toml b/actix-server/Cargo.toml index 3720b887..011c76c3 100644 --- a/actix-server/Cargo.toml +++ b/actix-server/Cargo.toml @@ -33,7 +33,7 @@ ssl = ["openssl", "tokio-openssl", "actix-server-config/ssl"] rust-tls = ["rustls", "tokio-rustls", "webpki", "webpki-roots", "actix-server-config/rust-tls"] # uds -uds = ["mio-uds", "tokio-uds", "actix-server-config/uds"] +# uds = ["mio-uds", "tokio-uds", "actix-server-config/uds"] [dependencies] actix-rt = "0.2.2" @@ -45,28 +45,27 @@ num_cpus = "1.0" mio = "0.6.19" net2 = "0.2" -futures = "0.1" +futures = { package = "futures-preview", version = "0.3.0-alpha.18" } slab = "0.4" -tokio-io = "0.1" -tokio-tcp = "0.1" -tokio-timer = "0.2.8" -tokio-reactor = "0.1" -tokio-signal = "0.2" +tokio = "0.2.0-alpha.4" +tokio-io = "0.2.0-alpha.4" +tokio-net = { version = "0.2.0-alpha.4", features = ["signal"] } +tokio-timer = "0.3.0-alpha.4" # unix domain sockets mio-uds = { version="0.6.7", optional = true } -tokio-uds = { version="0.2.5", optional = true } +#tokio-uds = { version="0.2.5", optional = true } # native-tls native-tls = { version="0.2", optional = true } # openssl openssl = { version="0.10", optional = true } -tokio-openssl = { version="0.3", optional = true } +tokio-openssl = { version="0.4.0-alpha.4", optional = true } # rustls rustls = { version = "0.16.0", optional = true } -tokio-rustls = { version = "0.10.0", optional = true } +tokio-rustls = { version = "0.12.0-alpha.2", optional = true } webpki = { version = "0.21", optional = true } webpki-roots = { version = "0.17", optional = true } diff --git a/actix-server/src/accept.rs b/actix-server/src/accept.rs index ce7c9325..597471ea 100644 --- a/actix-server/src/accept.rs +++ b/actix-server/src/accept.rs @@ -3,15 +3,16 @@ use std::time::{Duration, Instant}; use std::{io, thread}; use actix_rt::System; -use futures::future::{lazy, Future}; + use log::{error, info}; use slab::Slab; -use tokio_timer::Delay; +use tokio_timer::delay; use crate::server::Server; use crate::socket::{SocketAddr, SocketListener, StdListener}; use crate::worker::{Conn, WorkerClient}; use crate::Token; +use futures::{Future, FutureExt}; pub(crate) enum Command { Pause, @@ -440,14 +441,13 @@ impl Accept { info.timeout = Some(Instant::now() + Duration::from_millis(500)); let r = self.timer.1.clone(); - System::current().arbiter().send(lazy(move || { - Delay::new(Instant::now() + Duration::from_millis(510)) - .map_err(|_| ()) - .and_then(move |_| { - let _ = r.set_readiness(mio::Ready::readable()); - Ok(()) - }) - })); + System::current().arbiter().send( + async move { + delay(Instant::now() + Duration::from_millis(510)).await; + r.set_readiness(mio::Ready::readable()); + } + .boxed(), + ); return; } } diff --git a/actix-server/src/builder.rs b/actix-server/src/builder.rs index a0f0c3aa..7a22b156 100644 --- a/actix-server/src/builder.rs +++ b/actix-server/src/builder.rs @@ -2,14 +2,14 @@ use std::time::Duration; use std::{io, mem, net}; use actix_rt::{spawn, Arbiter, System}; -use futures::future::{lazy, ok}; -use futures::stream::futures_unordered; -use futures::sync::mpsc::{unbounded, UnboundedReceiver}; -use futures::{Async, Future, Poll, Stream}; +use futures::channel::mpsc::{unbounded, UnboundedReceiver}; +use futures::future::ready; +use futures::stream::FuturesUnordered; +use futures::{ready, Future, FutureExt, Poll, Stream, StreamExt}; use log::{error, info}; use net2::TcpBuilder; use num_cpus; -use tokio_tcp::TcpStream; +use tokio_net::tcp::TcpStream; use tokio_timer::sleep; use crate::accept::{AcceptLoop, AcceptNotify, Command}; @@ -20,6 +20,8 @@ use crate::signals::{Signal, Signals}; use crate::socket::StdListener; use crate::worker::{self, Worker, WorkerAvailability, WorkerClient}; use crate::{ssl, Token}; +use std::pin::Pin; +use std::task::Context; /// Server builder pub struct ServerBuilder { @@ -320,10 +322,12 @@ impl ServerBuilder { let services: Vec> = self.services.iter().map(|v| v.clone_factory()).collect(); - Arbiter::new().send(lazy(move || { - Worker::start(rx1, rx2, services, avail, timeout); - Ok::<_, ()>(()) - })); + Arbiter::new().send( + async move { + Worker::start(rx1, rx2, services, avail, timeout); + } + .boxed(), + ); worker } @@ -381,31 +385,34 @@ impl ServerBuilder { // stop workers if !self.workers.is_empty() && graceful { spawn( - futures_unordered( - self.workers - .iter() - .map(move |worker| worker.1.stop(graceful)), - ) - .collect() - .then(move |_| { - if let Some(tx) = completion { - let _ = tx.send(()); - } - if exit { - spawn(sleep(Duration::from_millis(300)).then(|_| { - System::current().stop(); - ok(()) - })); - } - ok(()) - }), + self.workers + .iter() + .map(move |worker| worker.1.stop(graceful)) + .collect::>() + .collect::>() + .then(move |_| { + if let Some(tx) = completion { + let _ = tx.send(()); + } + if exit { + spawn( + async { + tokio_timer::sleep(Duration::from_millis(300)) + .await; + System::current().stop(); + } + .boxed(), + ); + } + ready(()) + }), ) } else { // we need to stop system if server was spawned if self.exit { spawn(sleep(Duration::from_millis(300)).then(|_| { System::current().stop(); - ok(()) + ready(()) })); } if let Some(tx) = completion { @@ -447,15 +454,15 @@ impl ServerBuilder { } impl Future for ServerBuilder { - type Item = (); - type Error = (); + type Output = (); - fn poll(&mut self) -> Poll { + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { loop { - match self.cmd.poll() { - Ok(Async::Ready(None)) | Err(_) => return Ok(Async::Ready(())), - Ok(Async::NotReady) => return Ok(Async::NotReady), - Ok(Async::Ready(Some(item))) => self.handle_cmd(item), + match ready!(Pin::new(&mut self.cmd).poll_next(cx)) { + Some(it) => self.as_mut().get_mut().handle_cmd(it), + None => { + return Poll::Pending; + } } } } diff --git a/actix-server/src/config.rs b/actix-server/src/config.rs index e224d6a4..e8b84d56 100644 --- a/actix-server/src/config.rs +++ b/actix-server/src/config.rs @@ -1,19 +1,20 @@ use std::collections::HashMap; use std::{fmt, io, net}; -use actix_server_config::{Io, ServerConfig}; -use actix_service::{IntoNewService, NewService}; -use futures::future::{join_all, Future}; -use log::error; -use tokio_tcp::TcpStream; - use crate::counter::CounterGuard; +use actix_server_config::{Io, ServerConfig}; +use actix_service::{IntoNewService, NewService, ServiceExt}; +use futures::future::{join_all, Future, FutureExt, LocalBoxFuture, TryFutureExt}; +use log::error; +use std::pin::Pin; +use tokio_net::tcp::TcpStream; use super::builder::bind_addr; use super::services::{ BoxedServerService, InternalServiceFactory, ServerMessage, StreamService, }; use super::Token; +use std::process::Output; pub struct ServiceConfig { pub(crate) services: Vec<(String, net::TcpListener)>, @@ -108,50 +109,39 @@ impl InternalServiceFactory for ConfiguredService { }) } - fn create(&self) -> Box, Error = ()>> { + fn create(&self) -> LocalBoxFuture<'static, Result, ()>> { // configure services let mut rt = ServiceRuntime::new(self.services.clone()); self.rt.configure(&mut rt); rt.validate(); - let services = rt.services; + let mut names = self.names.clone(); - // on start futures - if rt.onstart.is_empty() { - // construct services - let mut fut = Vec::new(); - for (token, ns) in services { - let config = ServerConfig::new(self.names[&token].1); - fut.push(ns.new_service(&config).map(move |service| (token, service))); + // construct services + async move { + let services = rt.services; + // TODO: Proper error handling here + for f in rt.onstart.into_iter() { + f.await; } + let mut res = vec![]; + for (token, ns) in services.into_iter() { + let config = ServerConfig::new(names[&token].1); - Box::new(join_all(fut).map_err(|e| { - error!("Can not construct service: {:?}", e); - })) - } else { - let names = self.names.clone(); - - // run onstart future and then construct services - Box::new( - join_all(rt.onstart) - .map_err(|e| { - error!("Can not construct service: {:?}", e); - }) - .and_then(move |_| { - // construct services - let mut fut = Vec::new(); - for (token, ns) in services { - let config = ServerConfig::new(names[&token].1); - fut.push( - ns.new_service(&config).map(move |service| (token, service)), - ); - } - join_all(fut).map_err(|e| { - error!("Can not construct service: {:?}", e); - }) - }), - ) + let newserv = ns.new_service(&config); + match newserv.await { + Ok(serv) => { + res.push((token, serv)); + } + Err(e) => { + error!("Can not construct service {:?}", e); + return Err(e); + } + }; + } + return Ok(res); } + .boxed_local() } } @@ -181,7 +171,7 @@ fn not_configured(_: &mut ServiceRuntime) { pub struct ServiceRuntime { names: HashMap, services: HashMap, - onstart: Vec>>, + onstart: Vec>, } impl ServiceRuntime { @@ -215,12 +205,15 @@ impl ServiceRuntime { { // let name = name.to_owned(); if let Some(token) = self.names.get(name) { + + self.services.insert( - token.clone(), - Box::new(ServiceFactory { - inner: service.into_new_service(), - }), - ); + token.clone(), + Box::new(ServiceFactory { + inner: service.into_new_service(), + }), + ); + } else { panic!("Unknown service: {:?}", name); } @@ -229,9 +222,9 @@ impl ServiceRuntime { /// Execute future before services initialization. pub fn on_start(&mut self, fut: F) where - F: Future + 'static, + F: Future + 'static, { - self.onstart.push(Box::new(fut)) + self.onstart.push(fut.boxed_local()) } } @@ -243,7 +236,7 @@ type BoxedNewService = Box< InitError = (), Config = ServerConfig, Service = BoxedServerService, - Future = Box>, + Future = LocalBoxFuture<'static, Result>, >, >; @@ -265,12 +258,18 @@ where type InitError = (); type Config = ServerConfig; type Service = BoxedServerService; - type Future = Box>; + type Future = LocalBoxFuture<'static, Result>; + + // Box, ()>>>; fn new_service(&self, cfg: &ServerConfig) -> Self::Future { - Box::new(self.inner.new_service(cfg).map_err(|_| ()).map(|s| { - let service: BoxedServerService = Box::new(StreamService::new(s)); - service - })) + let fut = self.inner.new_service(cfg); + async move { + return match fut.await { + Ok(s) => Ok(Box::new(StreamService::new(s)) as BoxedServerService), + Err(e) => Err(()), + }; + } + .boxed_local() } } diff --git a/actix-server/src/counter.rs b/actix-server/src/counter.rs index 539ce497..74f716da 100644 --- a/actix-server/src/counter.rs +++ b/actix-server/src/counter.rs @@ -1,7 +1,8 @@ use std::cell::Cell; use std::rc::Rc; -use futures::task::AtomicTask; +use futures::task::AtomicWaker; +use std::task; #[derive(Clone)] /// Simple counter with ability to notify task on reaching specific number @@ -13,7 +14,7 @@ pub struct Counter(Rc); struct CounterInner { count: Cell, capacity: usize, - task: AtomicTask, + task: AtomicWaker, } impl Counter { @@ -22,7 +23,7 @@ impl Counter { Counter(Rc::new(CounterInner { capacity, count: Cell::new(0), - task: AtomicTask::new(), + task: AtomicWaker::new(), })) } @@ -31,8 +32,8 @@ impl Counter { } /// Check if counter is not at capacity - pub fn available(&self) -> bool { - self.0.available() + pub fn available(&self, cx: &mut task::Context) -> bool { + self.0.available(cx) } /// Get total number of acquired counts @@ -66,14 +67,14 @@ impl CounterInner { let num = self.count.get(); self.count.set(num - 1); if num == self.capacity { - self.task.notify(); + self.task.wake(); } } - fn available(&self) -> bool { + fn available(&self, cx: &mut task::Context) -> bool { let avail = self.count.get() < self.capacity; if !avail { - self.task.register(); + self.task.register(cx.waker()); } avail } diff --git a/actix-server/src/server.rs b/actix-server/src/server.rs index f1d862c9..bb7f0649 100644 --- a/actix-server/src/server.rs +++ b/actix-server/src/server.rs @@ -1,6 +1,6 @@ -use futures::sync::mpsc::UnboundedSender; -use futures::sync::oneshot; -use futures::Future; +use futures::channel::mpsc::UnboundedSender; +use futures::channel::oneshot; +use futures::{Future, TryFutureExt}; use crate::builder::ServerBuilder; use crate::signals::Signal; @@ -43,14 +43,14 @@ impl Server { /// /// If socket contains some pending connection, they might be dropped. /// All opened connection remains active. - pub fn pause(&self) -> impl Future { + pub fn pause(&self) -> impl Future> { let (tx, rx) = oneshot::channel(); let _ = self.0.unbounded_send(ServerCommand::Pause(tx)); rx.map_err(|_| ()) } /// Resume accepting incoming connections - pub fn resume(&self) -> impl Future { + pub fn resume(&self) -> impl Future> { let (tx, rx) = oneshot::channel(); let _ = self.0.unbounded_send(ServerCommand::Resume(tx)); rx.map_err(|_| ()) @@ -59,7 +59,7 @@ impl Server { /// Stop incoming connection processing, stop all workers and exit. /// /// If server starts with `spawn()` method, then spawned thread get terminated. - pub fn stop(&self, graceful: bool) -> impl Future { + pub fn stop(&self, graceful: bool) -> impl Future> { let (tx, rx) = oneshot::channel(); let _ = self.0.unbounded_send(ServerCommand::Stop { graceful, diff --git a/actix-server/src/services.rs b/actix-server/src/services.rs index 6dd90838..5d6181a5 100644 --- a/actix-server/src/services.rs +++ b/actix-server/src/services.rs @@ -4,14 +4,16 @@ use std::time::Duration; use actix_rt::spawn; use actix_server_config::{Io, ServerConfig}; -use actix_service::{NewService, Service}; -use futures::future::{err, ok, FutureResult}; -use futures::{Future, Poll}; +use actix_service::{NewService, Service, ServiceExt}; +use futures::future::{err, ok, LocalBoxFuture, Ready}; +use futures::{Future, FutureExt, Poll, StreamExt, TryFutureExt}; use log::error; use super::Token; use crate::counter::CounterGuard; use crate::socket::{FromStream, StdStream}; +use std::pin::Pin; +use std::task::Context; /// Server message pub(crate) enum ServerMessage { @@ -34,7 +36,7 @@ pub(crate) trait InternalServiceFactory: Send { fn clone_factory(&self) -> Box; - fn create(&self) -> Box, Error = ()>>; + fn create(&self) -> LocalBoxFuture<'static, Result, ()>>; } pub(crate) type BoxedServerService = Box< @@ -42,7 +44,7 @@ pub(crate) type BoxedServerService = Box< Request = (Option, ServerMessage), Response = (), Error = (), - Future = FutureResult<(), ()>, + Future = Ready>, >, >; @@ -66,12 +68,20 @@ where type Request = (Option, ServerMessage); type Response = (); type Error = (); - type Future = FutureResult<(), ()>; + type Future = Ready>; + fn poll_ready( + self: Pin<&mut Self>, + ctx: &mut Context<'_>, + ) -> Poll> { + unimplemented!() + } + + /* fn poll_ready(&mut self) -> Poll<(), Self::Error> { self.service.poll_ready().map_err(|_| ()) } - + */ fn call(&mut self, (guard, req): (Option, ServerMessage)) -> Self::Future { match req { ServerMessage::Connect(stream) => { @@ -80,10 +90,14 @@ where }); if let Ok(stream) = stream { - spawn(self.service.call(Io::new(stream)).then(move |res| { - drop(guard); - res.map_err(|_| ()).map(|_| ()) - })); + let f = self.service.call(Io::new(stream)); + spawn( + async move { + f.await; + drop(guard); + } + .boxed_local(), + ); ok(()) } else { err(()) @@ -142,19 +156,19 @@ where }) } - fn create(&self) -> Box, Error = ()>> { + fn create(&self) -> LocalBoxFuture<'static, Result, ()>> { let token = self.token; let config = ServerConfig::new(self.addr); - Box::new( - self.inner - .create() - .new_service(&config) - .map_err(|_| ()) - .map(move |inner| { - let service: BoxedServerService = Box::new(StreamService::new(inner)); - vec![(token, service)] - }), - ) + + self.inner + .create() + .new_service(&config) + .map_err(|_| ()) + .map_ok(move |inner| { + let service: BoxedServerService = Box::new(StreamService::new(inner)); + vec![(token, service)] + }) + .boxed_local() } } @@ -167,7 +181,7 @@ impl InternalServiceFactory for Box { self.as_ref().clone_factory() } - fn create(&self) -> Box, Error = ()>> { + fn create(&self) -> LocalBoxFuture<'static, Result, ()>> { self.as_ref().create() } } diff --git a/actix-server/src/signals.rs b/actix-server/src/signals.rs index 4d9e085f..85c88734 100644 --- a/actix-server/src/signals.rs +++ b/actix-server/src/signals.rs @@ -1,10 +1,17 @@ use std::io; use actix_rt::spawn; -use futures::stream::futures_unordered; -use futures::{Async, Future, Poll, Stream}; +use futures::stream::{futures_unordered, FuturesUnordered, LocalBoxStream}; +use futures::{ + Future, FutureExt, Poll, Stream, StreamExt, TryFutureExt, TryStream, TryStreamExt, +}; use crate::server::Server; +use actix_service::ServiceExt; +use futures::future::LocalBoxFuture; +use std::pin::Pin; +use std::task::Context; +use tokio_net::signal::unix::signal; /// Different types of process signals #[derive(PartialEq, Clone, Copy, Debug)] @@ -27,14 +34,14 @@ pub(crate) struct Signals { streams: Vec, } -type SigStream = Box>; +type SigStream = LocalBoxStream<'static, Result>; impl Signals { pub(crate) fn start(srv: Server) { let fut = { #[cfg(not(unix))] { - tokio_signal::ctrl_c() + tokio_net::signal::ctrl_c() .map_err(|_| ()) .and_then(move |stream| Signals { srv, @@ -44,51 +51,79 @@ impl Signals { #[cfg(unix)] { - use tokio_signal::unix; + use tokio_net::signal::unix; - let mut sigs: Vec>> = - Vec::new(); - sigs.push(Box::new( - tokio_signal::unix::Signal::new(tokio_signal::unix::SIGINT).map(|stream| { - let s: SigStream = Box::new(stream.map(|_| Signal::Int)); - s - }), - )); - sigs.push(Box::new( - tokio_signal::unix::Signal::new(tokio_signal::unix::SIGHUP).map( - |stream: unix::Signal| { + let mut sigs: Vec<_> = Vec::new(); + + let mut SIG_MAP = [ + ( + tokio_net::signal::unix::SignalKind::interrupt(), + Signal::Int, + ), + (tokio_net::signal::unix::SignalKind::hangup(), Signal::Hup), + ( + tokio_net::signal::unix::SignalKind::terminate(), + Signal::Term, + ), + (tokio_net::signal::unix::SignalKind::quit(), Signal::Quit), + ]; + + for (kind, sig) in SIG_MAP.into_iter() { + let sig = sig.clone(); + let fut = signal(*kind).unwrap(); + sigs.push(fut.map(move |_| Ok(sig)).boxed_local()); + } + /* TODO: Finish rewriting this + sigs.push( + tokio_net::signal::unix::signal(tokio_net::signal::si).unwrap() + .map(|stream| { + let s: SigStream = Box::new(stream.map(|_| Signal::Int)); + s + }).boxed() + ); + sigs.push( + + tokio_net::signal::unix::signal(tokio_net::signal::unix::SignalKind::hangup()).unwrap() + .map(|stream: unix::Signal| { let s: SigStream = Box::new(stream.map(|_| Signal::Hup)); s - }, - ), - )); - sigs.push(Box::new( - tokio_signal::unix::Signal::new(tokio_signal::unix::SIGTERM).map( - |stream| { + }).boxed() + ); + sigs.push( + tokio_net::signal::unix::signal( + tokio_net::signal::unix::SignalKind::terminate() + ).unwrap() + .map(|stream| { let s: SigStream = Box::new(stream.map(|_| Signal::Term)); s - }, - ), - )); - sigs.push(Box::new( - tokio_signal::unix::Signal::new(tokio_signal::unix::SIGQUIT).map( - |stream| { + }).boxed(), + ); + sigs.push( + tokio_net::signal::unix::signal( + tokio_net::signal::unix::SignalKind::quit() + ).unwrap() + .map(|stream| { let s: SigStream = Box::new(stream.map(|_| Signal::Quit)); s - }, - ), - )); - futures_unordered(sigs) - .collect() - .map_err(|_| ()) - .and_then(move |streams| Signals { srv, streams }) + }).boxed() + ); + */ + + Signals { srv, streams: sigs } } }; - spawn(fut); + spawn(async {}); } } impl Future for Signals { + type Output = (); + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + unimplemented!() + } + + /* type Item = (); type Error = (); @@ -115,4 +150,5 @@ impl Future for Signals { Ok(Async::NotReady) } } + */ } diff --git a/actix-server/src/socket.rs b/actix-server/src/socket.rs index 4cbefb80..574e6a1e 100644 --- a/actix-server/src/socket.rs +++ b/actix-server/src/socket.rs @@ -1,8 +1,8 @@ use std::{fmt, io, net}; use tokio_io::{AsyncRead, AsyncWrite}; -use tokio_reactor::Handle; -use tokio_tcp::TcpStream; +use tokio_net::driver::Handle; +use tokio_net::tcp::TcpStream; pub(crate) enum StdListener { Tcp(net::TcpListener), diff --git a/actix-server/src/ssl/nativetls.rs b/actix-server/src/ssl/nativetls.rs index 3a6859fe..60099bef 100644 --- a/actix-server/src/ssl/nativetls.rs +++ b/actix-server/src/ssl/nativetls.rs @@ -9,6 +9,8 @@ use tokio_io::{AsyncRead, AsyncWrite}; use crate::counter::{Counter, CounterGuard}; use crate::ssl::MAX_CONN_COUNTER; use crate::{Io, Protocol, ServerConfig}; +use std::pin::Pin; +use std::task::Context; /// Support `SSL` connections via native-tls package /// @@ -72,6 +74,18 @@ impl Service for NativeTlsAcceptorService { type Error = Error; type Future = Accept; + fn poll_ready( + self: Pin<&mut Self>, + ctx: &mut Context<'_>, + ) -> Poll> { + if self.conns.available(ctx) { + Ok(Async::Ready(())) + } else { + Ok(Async::NotReady) + } + } + + /* fn poll_ready(&mut self) -> Poll<(), Self::Error> { if self.conns.available() { Ok(Async::Ready(())) @@ -79,6 +93,7 @@ impl Service for NativeTlsAcceptorService { Ok(Async::NotReady) } } + */ fn call(&mut self, req: Self::Request) -> Self::Future { let (io, params, _) = req.into_parts(); diff --git a/actix-server/src/ssl/openssl.rs b/actix-server/src/ssl/openssl.rs index 9a9b2120..22e9b3e7 100644 --- a/actix-server/src/ssl/openssl.rs +++ b/actix-server/src/ssl/openssl.rs @@ -1,14 +1,18 @@ use std::marker::PhantomData; use actix_service::{NewService, Service}; -use futures::{future::ok, future::FutureResult, Async, Future, Poll}; -use openssl::ssl::{HandshakeError, SslAcceptor}; +use futures::{future::ok, future::Ready, Future, FutureExt, Poll}; +use openssl::ssl::SslAcceptor; use tokio_io::{AsyncRead, AsyncWrite}; -use tokio_openssl::{AcceptAsync, SslAcceptorExt, SslStream}; +use tokio_openssl::{HandshakeError, SslStream}; use crate::counter::{Counter, CounterGuard}; use crate::ssl::MAX_CONN_COUNTER; use crate::{Io, Protocol, ServerConfig}; +use futures::future::LocalBoxFuture; +use std::io; +use std::pin::Pin; +use std::task::Context; /// Support `SSL` connections via openssl package /// @@ -37,14 +41,14 @@ impl Clone for OpensslAcceptor { } } -impl NewService for OpensslAcceptor { +impl NewService for OpensslAcceptor { type Request = Io; type Response = Io, P>; type Error = HandshakeError; type Config = ServerConfig; type Service = OpensslAcceptorService; type InitError = (); - type Future = FutureResult; + type Future = Ready>; fn new_service(&self, cfg: &ServerConfig) -> Self::Future { cfg.set_secure(); @@ -65,12 +69,20 @@ pub struct OpensslAcceptorService { io: PhantomData<(T, P)>, } -impl Service for OpensslAcceptorService { +impl Service for OpensslAcceptorService { type Request = Io; type Response = Io, P>; type Error = HandshakeError; type Future = OpensslAcceptorServiceFut; + fn poll_ready( + self: Pin<&mut Self>, + ctx: &mut Context<'_>, + ) -> Poll> { + unimplemented!() + } + + /* fn poll_ready(&mut self) -> Poll<(), Self::Error> { if self.conns.available() { Ok(Async::Ready(())) @@ -78,12 +90,18 @@ impl Service for OpensslAcceptorService { Ok(Async::NotReady) } } + */ fn call(&mut self, req: Self::Request) -> Self::Future { let (io, params, _) = req.into_parts(); + let acc = self.acceptor.clone(); OpensslAcceptorServiceFut { _guard: self.conns.get(), - fut: SslAcceptorExt::accept_async(&self.acceptor, io), + fut: async move { + let acc = acc; + tokio_openssl::accept(&acc, io).await + } + .boxed_local::<'static>(), params: Some(params), } } @@ -93,17 +111,24 @@ pub struct OpensslAcceptorServiceFut where T: AsyncRead + AsyncWrite, { - fut: AcceptAsync, + fut: LocalBoxFuture<'static, Result, HandshakeError>>, params: Option

, _guard: CounterGuard, } impl Future for OpensslAcceptorServiceFut { + type Output = Result, P>, HandshakeError>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + unimplemented!() + } + + /* type Item = Io, P>; type Error = HandshakeError; fn poll(&mut self) -> Poll { - let io = futures::try_ready!(self.fut.poll()); + let io = futures::ready!(self.fut.poll())?; let proto = if let Some(protos) = io.get_ref().ssl().selected_alpn_protocol() { const H2: &[u8] = b"\x02h2"; const HTTP10: &[u8] = b"\x08http/1.0"; @@ -127,4 +152,5 @@ impl Future for OpensslAcceptorServiceFut { proto, ))) } + */ } diff --git a/actix-server/src/ssl/rustls.rs b/actix-server/src/ssl/rustls.rs index 06edc0f5..605863f3 100644 --- a/actix-server/src/ssl/rustls.rs +++ b/actix-server/src/ssl/rustls.rs @@ -11,6 +11,8 @@ use tokio_rustls::{server::TlsStream, Accept, TlsAcceptor}; use crate::counter::{Counter, CounterGuard}; use crate::ssl::MAX_CONN_COUNTER; use crate::{Io, Protocol, ServerConfig as SrvConfig}; +use std::pin::Pin; +use std::task::Context; /// Support `SSL` connections via rustls package /// @@ -74,8 +76,11 @@ impl Service for RustlsAcceptorService { type Error = io::Error; type Future = RustlsAcceptorServiceFut; - fn poll_ready(&mut self) -> Poll<(), Self::Error> { - if self.conns.available() { + fn poll_ready( + self: Pin<&mut Self>, + ctx: &mut Context<'_>, + ) -> Poll> { + if self.conns.available(cx) { Ok(Async::Ready(())) } else { Ok(Async::NotReady) diff --git a/actix-server/src/worker.rs b/actix-server/src/worker.rs index e03688a8..613d8eaa 100644 --- a/actix-server/src/worker.rs +++ b/actix-server/src/worker.rs @@ -1,11 +1,11 @@ use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::Arc; -use std::{mem, time}; +use std::{mem, task, time}; -use actix_rt::{spawn, Arbiter}; -use futures::sync::mpsc::{UnboundedReceiver, UnboundedSender}; -use futures::sync::oneshot; -use futures::{future, Async, Future, Poll, Stream}; +use futures::channel::mpsc::{UnboundedReceiver, UnboundedSender}; +use futures::channel::oneshot; +use futures::{future, Future, Poll, Stream, TryFutureExt}; +use futures::{FutureExt, StreamExt}; use log::{error, info, trace}; use tokio_timer::{sleep, Delay}; @@ -14,6 +14,10 @@ use crate::counter::Counter; use crate::services::{BoxedServerService, InternalServiceFactory, ServerMessage}; use crate::socket::{SocketAddr, StdStream}; use crate::Token; +use actix_rt::spawn; +use futures::future::{LocalBoxFuture, MapOk}; +use std::pin::Pin; +use std::task::Context; pub(crate) struct WorkerCommand(Conn); @@ -153,31 +157,36 @@ impl Worker { state: WorkerState::Unavailable(Vec::new()), }); - let mut fut = Vec::new(); + let mut fut: Vec, _>> = Vec::new(); for (idx, factory) in wrk.factories.iter().enumerate() { - fut.push(factory.create().map(move |res| { - res.into_iter() - .map(|(t, s)| (idx, t, s)) + fut.push(factory.create().map_ok(move |r| { + r.into_iter() + .map(|(t, s): (Token, _)| (idx, t, s)) .collect::>() })); } + spawn( - future::join_all(fut) - .map_err(|e| { - error!("Can not start worker: {:?}", e); - Arbiter::current().stop(); - }) - .and_then(move |services| { - for item in services { - for (idx, token, service) in item { - while token.0 >= wrk.services.len() { - wrk.services.push(None); + async { + let mut res = future::join_all(fut).await; + let res: Result, _> = res.into_iter().collect(); + match res { + Ok(services) => { + for item in services { + for (idx, token, service) in item { + while token.0 >= wrk.services.len() { + wrk.services.push(None); + } } - wrk.services[token.0] = Some((idx, service)); } + Ok::<_, ()>(wrk); } - wrk - }), + Err(e) => { + //return Err(e); + } + } + } + .boxed_local(), ); } @@ -198,13 +207,18 @@ impl Worker { } } - fn check_readiness(&mut self, trace: bool) -> Result { + fn check_readiness( + &mut self, + trace: bool, + cx: &mut Context<'_>, + ) -> Result { + /* let mut ready = self.conns.available(); let mut failed = None; for (token, service) in &mut self.services.iter_mut().enumerate() { if let Some(service) = service { - match service.1.poll_ready() { - Ok(Async::Ready(_)) => { + match service.1.poll_ready(cx) { + Poll::Ready(Ok(_)) => { if trace { trace!( "Service {:?} is available", @@ -212,8 +226,8 @@ impl Worker { ); } } - Ok(Async::NotReady) => ready = false, - Err(_) => { + Poll::NotReady => ready = false, + Poll::Ready(Err(_)) => { error!( "Service {:?} readiness check returned error, restarting", self.factories[service.0].name(Token(token)) @@ -227,7 +241,8 @@ impl Worker { Err(idx) } else { Ok(ready) - } + }*/ + unimplemented!() } } @@ -238,15 +253,19 @@ enum WorkerState { Restarting( usize, Token, - Box, Error = ()>>, + Box, ()>>>, ), Shutdown(Delay, Delay, oneshot::Sender), } impl Future for Worker { - type Item = (); - type Error = (); + type Output = (); + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + unimplemented!() + } + + /* fn poll(&mut self) -> Poll { // `StopWorker` message handler if let Ok(Async::Ready(Some(StopCommand { graceful, result }))) = self.rx2.poll() { @@ -435,4 +454,5 @@ impl Future for Worker { WorkerState::None => panic!(), }; } + */ } diff --git a/actix-server/tests/test_server.rs b/actix-server/tests/test_server.rs index de7f7b09..2cce4a7d 100644 --- a/actix-server/tests/test_server.rs +++ b/actix-server/tests/test_server.rs @@ -4,11 +4,12 @@ use std::{net, thread, time}; use actix_codec::{BytesCodec, Framed}; use actix_server::{Io, Server, ServerConfig}; -use actix_service::{new_service_cfg, service_fn, IntoService}; +use actix_service::{new_service_cfg, service_fn, IntoService, ServiceExt}; use bytes::Bytes; -use futures::{Future, Sink}; +use futures::{Future, FutureExt, Sink, SinkExt}; use net2::TcpBuilder; -use tokio_tcp::TcpStream; +use tokio::future::ok; +use tokio_net::tcp::TcpStream; fn unused_addr() -> net::SocketAddr { let addr: net::SocketAddr = "127.0.0.1:0".parse().unwrap(); @@ -30,7 +31,7 @@ fn test_bind() { .bind("test", addr, move || { new_service_cfg(move |cfg: &ServerConfig| { assert_eq!(cfg.local_addr(), addr); - Ok::<_, ()>((|_| Ok::<_, ()>(())).into_service()) + ok::<_, ()>((|_| ok::<_, ()>(())).into_service()) }) }) .unwrap() @@ -54,7 +55,7 @@ fn test_bind_no_config() { let h = thread::spawn(move || { let sys = actix_rt::System::new("test"); let srv = Server::build() - .bind("test", addr, move || service_fn(|_| Ok::<_, ()>(()))) + .bind("test", addr, move || service_fn(|_| ok::<_, ()>(()))) .unwrap() .start(); let _ = tx.send((srv, actix_rt::System::current())); @@ -78,7 +79,7 @@ fn test_listen() { .listen("test", lst, move || { new_service_cfg(move |cfg: &ServerConfig| { assert_eq!(cfg.local_addr(), addr); - Ok::<_, ()>((|_| Ok::<_, ()>(())).into_service()) + ok::<_, ()>((|_| ok::<_, ()>(())).into_service()) }) }) .unwrap() @@ -107,14 +108,15 @@ fn test_start() { .bind("test", addr, move || { new_service_cfg(move |cfg: &ServerConfig| { assert_eq!(cfg.local_addr(), addr); - Ok::<_, ()>( - (|io: Io| { - Framed::new(io.into_parts().0, BytesCodec) - .send(Bytes::from_static(b"test")) - .then(|_| Ok::<_, ()>(())) - }) - .into_service(), - ) + + let serv_creator = (move |io: Io| async { + panic!("Stream"); + let mut f = Framed::new(io.into_parts().0, BytesCodec); + f.send(Bytes::from_static(b"test")).await.unwrap(); + Ok::<_, ()>(()) + }).into_service(); + + ok::<_, ()>(serv_creator) }) }) .unwrap() @@ -125,7 +127,7 @@ fn test_start() { }); let (srv, sys) = rx.recv().unwrap(); - let mut buf = [0u8; 4]; + let mut buf = [1u8; 4]; let mut conn = net::TcpStream::connect(addr).unwrap(); let _ = conn.read_exact(&mut buf); assert_eq!(buf, b"test"[..]); diff --git a/actix-test-server/Cargo.toml b/actix-test-server/Cargo.toml index 2ce973a3..e0f07bb9 100644 --- a/actix-test-server/Cargo.toml +++ b/actix-test-server/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "actix-test-server" -version = "0.2.3" +version = "0.2.2" authors = ["Nikolay Kim "] description = "Actix test server" keywords = ["network", "framework", "async", "futures"] @@ -11,11 +11,27 @@ categories = ["network-programming", "asynchronous"] license = "MIT/Apache-2.0" exclude = [".gitignore", ".travis.yml", ".cargo/config", "appveyor.yml"] edition = "2018" +workspace = ".." + +[package.metadata.docs.rs] +features = ["ssl", "tls", "rust-tls"] [lib] name = "actix_test_server" path = "src/lib.rs" +[features] +default = [] + +# tls +tls = ["native-tls", "actix-server/tls"] + +# openssl +ssl = ["openssl", "actix-server/ssl"] + +# rustls +rust-tls = ["rustls", "tokio-rustls", "webpki", "webpki-roots"] + [dependencies] actix-rt = "0.2.1" actix-server = "0.5.0" @@ -24,9 +40,21 @@ actix-testing = "0.1.0" log = "0.4" net2 = "0.2" -futures = "0.1" -tokio-tcp = "0.1" -tokio-reactor = "0.1" +futures = { package = "futures-preview", version = "0.3.0-alpha.18"} +tokio-io = "0.2.0-alpha.4" +tokio-net = "0.2.0-alpha.4" + +# native-tls +native-tls = { version="0.2", optional = true } + +# openssl +openssl = { version="0.10", optional = true } + +#rustls +rustls = { version = "^0.16", optional = true } +tokio-rustls = { version = "^0.12.0-alpha.2", optional = true } +webpki = { version = "0.21", optional = true } +webpki-roots = { version = "0.17", optional = true } [dev-dependencies] actix-service = "0.4.0" diff --git a/actix-test-server/src/lib.rs b/actix-test-server/src/lib.rs index 4681ce9b..7a3e0623 100644 --- a/actix-test-server/src/lib.rs +++ b/actix-test-server/src/lib.rs @@ -1,2 +1,149 @@ //! Various helpers for Actix applications to use during testing. -pub use actix_testing::*; +use std::sync::mpsc; +use std::{net, thread}; + +use actix_rt::{Runtime, System}; +use actix_server::{Server, StreamServiceFactory}; +pub use actix_server_config::{Io, ServerConfig}; + +use futures::future::{lazy, Future, IntoFuture}; +use net2::TcpBuilder; +use tokio_reactor::Handle; +use tokio_tcp::TcpStream; + +/// The `TestServer` type. +/// +/// `TestServer` is very simple test server that simplify process of writing +/// integration tests for actix-net applications. +/// +/// # Examples +/// +/// ```rust +/// use actix_service::{service_fn, IntoNewService}; +/// use actix_test_server::TestServer; +/// +/// fn main() { +/// let srv = TestServer::with(|| service_fn( +/// |sock| { +/// println!("New connection: {:?}", sock); +/// Ok::<_, ()>(()) +/// } +/// )); +/// +/// println!("SOCKET: {:?}", srv.connect()); +/// } +/// ``` +pub struct TestServer; + +/// Test server runstime +pub struct TestServerRuntime { + addr: net::SocketAddr, + host: String, + port: u16, + rt: Runtime, +} + +impl TestServer { + /// Start new test server with application factory + pub fn with(factory: F) -> TestServerRuntime { + let (tx, rx) = mpsc::channel(); + + // run server in separate thread + thread::spawn(move || { + let sys = System::new("actix-test-server"); + let tcp = net::TcpListener::bind("127.0.0.1:0").unwrap(); + let local_addr = tcp.local_addr().unwrap(); + + Server::build() + .listen("test", tcp, factory)? + .workers(1) + .disable_signals() + .start(); + + tx.send((System::current(), local_addr)).unwrap(); + sys.run() + }); + + let (system, addr) = rx.recv().unwrap(); + System::set_current(system); + + let rt = Runtime::new().unwrap(); + let host = format!("{}", addr.ip()); + let port = addr.port(); + + TestServerRuntime { + addr, + rt, + host, + port, + } + } + + /// Get firat available unused local address + pub fn unused_addr() -> net::SocketAddr { + let addr: net::SocketAddr = "127.0.0.1:0".parse().unwrap(); + let socket = TcpBuilder::new_v4().unwrap(); + socket.bind(&addr).unwrap(); + socket.reuse_address(true).unwrap(); + let tcp = socket.to_tcp_listener().unwrap(); + tcp.local_addr().unwrap() + } +} + +impl TestServerRuntime { + /// Execute future on current runtime + pub fn block_on(&mut self, fut: F) -> Result + where + F: Future, + { + self.rt.block_on(fut) + } + + /// Runs the provided function, with runtime enabled. + pub fn run_on(&mut self, f: F) -> Result + where + F: FnOnce() -> R, + R: IntoFuture, + { + self.rt.block_on(lazy(|| f().into_future())) + } + + /// Spawn future to the current runtime + pub fn spawn(&mut self, fut: F) + where + F: Future + 'static, + { + self.rt.spawn(fut); + } + + /// Test server host + pub fn host(&self) -> &str { + &self.host + } + + /// Test server port + pub fn port(&self) -> u16 { + self.port + } + + /// Get test server address + pub fn addr(&self) -> net::SocketAddr { + self.addr + } + + /// Stop http server + fn stop(&mut self) { + System::current().stop(); + } + + /// Connect to server, return tokio TcpStream + pub fn connect(&self) -> std::io::Result { + TcpStream::from_std(net::TcpStream::connect(self.addr)?, &Handle::default()) + } +} + +impl Drop for TestServerRuntime { + fn drop(&mut self) { + self.stop() + } +}