Migrate actix-server to std::future (#50)

Still not finished, this is more WIP, this is an aggregation of several commits, which
can be found in semtexzv/std-future-server-tmp branch
This commit is contained in:
Michal Hornický 2019-11-12 00:37:04 +01:00 committed by Nikolay Kim
parent ae40042ea7
commit 46382a922a
19 changed files with 564 additions and 263 deletions

View File

@ -17,7 +17,7 @@ path = "src/lib.rs"
features = ["ssl", "rust-tls", "uds"] features = ["ssl", "rust-tls", "uds"]
[features] [features]
default = [] default = ["ssl", "rustls" ]
# openssl # openssl
ssl = ["tokio-openssl"] ssl = ["tokio-openssl"]
@ -26,13 +26,14 @@ ssl = ["tokio-openssl"]
rust-tls = ["rustls", "tokio-rustls"] rust-tls = ["rustls", "tokio-rustls"]
# unix domain sockets # unix domain sockets
uds = ["tokio-uds"] # TODO: FIXME
uds = [] # ["tokio-uds"]
[dependencies] [dependencies]
futures = "0.1.25" futures = { package = "futures-preview", version = "0.3.0-alpha.18" }
tokio-io = "0.1.12" tokio = "0.2.0-alpha.4"
tokio-tcp = "0.1"
tokio-openssl = { version="0.3.0", optional = true } tokio-openssl = { version="0.4.0-alpha.4", optional = true }
rustls = { version = "0.16.0", optional = true } rustls = { version = "0.16.0", optional = true }
tokio-rustls = { version = "0.10.0", optional = true } tokio-rustls = { version = "0.12.0-alpha.2", optional = true }
tokio-uds = { version="0.2.5", optional = true } #tokio-uds = { version="0.3.0-alpha.1", optional = true }

View File

@ -3,8 +3,8 @@ use std::net::SocketAddr;
use std::rc::Rc; use std::rc::Rc;
use std::{fmt, io, net, time}; use std::{fmt, io, net, time};
use tokio_io::{AsyncRead, AsyncWrite}; use tokio::io::{AsyncRead, AsyncWrite};
use tokio_tcp::TcpStream; use tokio::net::tcp::TcpStream;
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct ServerConfig { pub struct ServerConfig {
@ -172,25 +172,25 @@ impl IoStream for TcpStream {
} }
#[cfg(any(feature = "ssl"))] #[cfg(any(feature = "ssl"))]
impl<T: IoStream> IoStream for tokio_openssl::SslStream<T> { impl<T: IoStream + Unpin> IoStream for tokio_openssl::SslStream<T> {
#[inline] #[inline]
fn peer_addr(&self) -> Option<net::SocketAddr> { fn peer_addr(&self) -> Option<net::SocketAddr> {
self.get_ref().get_ref().peer_addr() self.get_ref().peer_addr()
} }
#[inline] #[inline]
fn set_nodelay(&mut self, nodelay: bool) -> io::Result<()> { fn set_nodelay(&mut self, nodelay: bool) -> io::Result<()> {
self.get_mut().get_mut().set_nodelay(nodelay) self.get_mut().set_nodelay(nodelay)
} }
#[inline] #[inline]
fn set_linger(&mut self, dur: Option<time::Duration>) -> io::Result<()> { fn set_linger(&mut self, dur: Option<time::Duration>) -> io::Result<()> {
self.get_mut().get_mut().set_linger(dur) self.get_mut().set_linger(dur)
} }
#[inline] #[inline]
fn set_keepalive(&mut self, dur: Option<time::Duration>) -> io::Result<()> { fn set_keepalive(&mut self, dur: Option<time::Duration>) -> io::Result<()> {
self.get_mut().get_mut().set_keepalive(dur) self.get_mut().set_keepalive(dur)
} }
} }
@ -218,7 +218,7 @@ impl<T: IoStream> IoStream for tokio_rustls::server::TlsStream<T> {
} }
#[cfg(all(unix, feature = "uds"))] #[cfg(all(unix, feature = "uds"))]
impl IoStream for tokio_uds::UnixStream { impl IoStream for t::UnixStream {
#[inline] #[inline]
fn peer_addr(&self) -> Option<net::SocketAddr> { fn peer_addr(&self) -> Option<net::SocketAddr> {
None None

View File

@ -7,6 +7,7 @@
* Update `rustls` to 0.16 * Update `rustls` to 0.16
* Minimum required Rust version upped to 1.37.0 * Minimum required Rust version upped to 1.37.0
## [0.6.1] - 2019-09-25 ## [0.6.1] - 2019-09-25
### Added ### Added

View File

@ -33,7 +33,7 @@ ssl = ["openssl", "tokio-openssl", "actix-server-config/ssl"]
rust-tls = ["rustls", "tokio-rustls", "webpki", "webpki-roots", "actix-server-config/rust-tls"] rust-tls = ["rustls", "tokio-rustls", "webpki", "webpki-roots", "actix-server-config/rust-tls"]
# uds # uds
uds = ["mio-uds", "tokio-uds", "actix-server-config/uds"] # uds = ["mio-uds", "tokio-uds", "actix-server-config/uds"]
[dependencies] [dependencies]
actix-rt = "0.2.2" actix-rt = "0.2.2"
@ -45,28 +45,27 @@ num_cpus = "1.0"
mio = "0.6.19" mio = "0.6.19"
net2 = "0.2" net2 = "0.2"
futures = "0.1" futures = { package = "futures-preview", version = "0.3.0-alpha.18" }
slab = "0.4" slab = "0.4"
tokio-io = "0.1" tokio = "0.2.0-alpha.4"
tokio-tcp = "0.1" tokio-io = "0.2.0-alpha.4"
tokio-timer = "0.2.8" tokio-net = { version = "0.2.0-alpha.4", features = ["signal"] }
tokio-reactor = "0.1" tokio-timer = "0.3.0-alpha.4"
tokio-signal = "0.2"
# unix domain sockets # unix domain sockets
mio-uds = { version="0.6.7", optional = true } mio-uds = { version="0.6.7", optional = true }
tokio-uds = { version="0.2.5", optional = true } #tokio-uds = { version="0.2.5", optional = true }
# native-tls # native-tls
native-tls = { version="0.2", optional = true } native-tls = { version="0.2", optional = true }
# openssl # openssl
openssl = { version="0.10", optional = true } openssl = { version="0.10", optional = true }
tokio-openssl = { version="0.3", optional = true } tokio-openssl = { version="0.4.0-alpha.4", optional = true }
# rustls # rustls
rustls = { version = "0.16.0", optional = true } rustls = { version = "0.16.0", optional = true }
tokio-rustls = { version = "0.10.0", optional = true } tokio-rustls = { version = "0.12.0-alpha.2", optional = true }
webpki = { version = "0.21", optional = true } webpki = { version = "0.21", optional = true }
webpki-roots = { version = "0.17", optional = true } webpki-roots = { version = "0.17", optional = true }

View File

@ -3,15 +3,16 @@ use std::time::{Duration, Instant};
use std::{io, thread}; use std::{io, thread};
use actix_rt::System; use actix_rt::System;
use futures::future::{lazy, Future};
use log::{error, info}; use log::{error, info};
use slab::Slab; use slab::Slab;
use tokio_timer::Delay; use tokio_timer::delay;
use crate::server::Server; use crate::server::Server;
use crate::socket::{SocketAddr, SocketListener, StdListener}; use crate::socket::{SocketAddr, SocketListener, StdListener};
use crate::worker::{Conn, WorkerClient}; use crate::worker::{Conn, WorkerClient};
use crate::Token; use crate::Token;
use futures::{Future, FutureExt};
pub(crate) enum Command { pub(crate) enum Command {
Pause, Pause,
@ -440,14 +441,13 @@ impl Accept {
info.timeout = Some(Instant::now() + Duration::from_millis(500)); info.timeout = Some(Instant::now() + Duration::from_millis(500));
let r = self.timer.1.clone(); let r = self.timer.1.clone();
System::current().arbiter().send(lazy(move || { System::current().arbiter().send(
Delay::new(Instant::now() + Duration::from_millis(510)) async move {
.map_err(|_| ()) delay(Instant::now() + Duration::from_millis(510)).await;
.and_then(move |_| { r.set_readiness(mio::Ready::readable());
let _ = r.set_readiness(mio::Ready::readable()); }
Ok(()) .boxed(),
}) );
}));
return; return;
} }
} }

View File

@ -2,14 +2,14 @@ use std::time::Duration;
use std::{io, mem, net}; use std::{io, mem, net};
use actix_rt::{spawn, Arbiter, System}; use actix_rt::{spawn, Arbiter, System};
use futures::future::{lazy, ok}; use futures::channel::mpsc::{unbounded, UnboundedReceiver};
use futures::stream::futures_unordered; use futures::future::ready;
use futures::sync::mpsc::{unbounded, UnboundedReceiver}; use futures::stream::FuturesUnordered;
use futures::{Async, Future, Poll, Stream}; use futures::{ready, Future, FutureExt, Poll, Stream, StreamExt};
use log::{error, info}; use log::{error, info};
use net2::TcpBuilder; use net2::TcpBuilder;
use num_cpus; use num_cpus;
use tokio_tcp::TcpStream; use tokio_net::tcp::TcpStream;
use tokio_timer::sleep; use tokio_timer::sleep;
use crate::accept::{AcceptLoop, AcceptNotify, Command}; use crate::accept::{AcceptLoop, AcceptNotify, Command};
@ -20,6 +20,8 @@ use crate::signals::{Signal, Signals};
use crate::socket::StdListener; use crate::socket::StdListener;
use crate::worker::{self, Worker, WorkerAvailability, WorkerClient}; use crate::worker::{self, Worker, WorkerAvailability, WorkerClient};
use crate::{ssl, Token}; use crate::{ssl, Token};
use std::pin::Pin;
use std::task::Context;
/// Server builder /// Server builder
pub struct ServerBuilder { pub struct ServerBuilder {
@ -320,10 +322,12 @@ impl ServerBuilder {
let services: Vec<Box<dyn InternalServiceFactory>> = let services: Vec<Box<dyn InternalServiceFactory>> =
self.services.iter().map(|v| v.clone_factory()).collect(); self.services.iter().map(|v| v.clone_factory()).collect();
Arbiter::new().send(lazy(move || { Arbiter::new().send(
Worker::start(rx1, rx2, services, avail, timeout); async move {
Ok::<_, ()>(()) Worker::start(rx1, rx2, services, avail, timeout);
})); }
.boxed(),
);
worker worker
} }
@ -381,31 +385,34 @@ impl ServerBuilder {
// stop workers // stop workers
if !self.workers.is_empty() && graceful { if !self.workers.is_empty() && graceful {
spawn( spawn(
futures_unordered( self.workers
self.workers .iter()
.iter() .map(move |worker| worker.1.stop(graceful))
.map(move |worker| worker.1.stop(graceful)), .collect::<FuturesUnordered<_>>()
) .collect::<Vec<_>>()
.collect() .then(move |_| {
.then(move |_| { if let Some(tx) = completion {
if let Some(tx) = completion { let _ = tx.send(());
let _ = tx.send(()); }
} if exit {
if exit { spawn(
spawn(sleep(Duration::from_millis(300)).then(|_| { async {
System::current().stop(); tokio_timer::sleep(Duration::from_millis(300))
ok(()) .await;
})); System::current().stop();
} }
ok(()) .boxed(),
}), );
}
ready(())
}),
) )
} else { } else {
// we need to stop system if server was spawned // we need to stop system if server was spawned
if self.exit { if self.exit {
spawn(sleep(Duration::from_millis(300)).then(|_| { spawn(sleep(Duration::from_millis(300)).then(|_| {
System::current().stop(); System::current().stop();
ok(()) ready(())
})); }));
} }
if let Some(tx) = completion { if let Some(tx) = completion {
@ -447,15 +454,15 @@ impl ServerBuilder {
} }
impl Future for ServerBuilder { impl Future for ServerBuilder {
type Item = (); type Output = ();
type Error = ();
fn poll(&mut self) -> Poll<Self::Item, Self::Error> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
loop { loop {
match self.cmd.poll() { match ready!(Pin::new(&mut self.cmd).poll_next(cx)) {
Ok(Async::Ready(None)) | Err(_) => return Ok(Async::Ready(())), Some(it) => self.as_mut().get_mut().handle_cmd(it),
Ok(Async::NotReady) => return Ok(Async::NotReady), None => {
Ok(Async::Ready(Some(item))) => self.handle_cmd(item), return Poll::Pending;
}
} }
} }
} }

View File

@ -1,19 +1,20 @@
use std::collections::HashMap; use std::collections::HashMap;
use std::{fmt, io, net}; use std::{fmt, io, net};
use actix_server_config::{Io, ServerConfig};
use actix_service::{IntoNewService, NewService};
use futures::future::{join_all, Future};
use log::error;
use tokio_tcp::TcpStream;
use crate::counter::CounterGuard; use crate::counter::CounterGuard;
use actix_server_config::{Io, ServerConfig};
use actix_service::{IntoNewService, NewService, ServiceExt};
use futures::future::{join_all, Future, FutureExt, LocalBoxFuture, TryFutureExt};
use log::error;
use std::pin::Pin;
use tokio_net::tcp::TcpStream;
use super::builder::bind_addr; use super::builder::bind_addr;
use super::services::{ use super::services::{
BoxedServerService, InternalServiceFactory, ServerMessage, StreamService, BoxedServerService, InternalServiceFactory, ServerMessage, StreamService,
}; };
use super::Token; use super::Token;
use std::process::Output;
pub struct ServiceConfig { pub struct ServiceConfig {
pub(crate) services: Vec<(String, net::TcpListener)>, pub(crate) services: Vec<(String, net::TcpListener)>,
@ -108,50 +109,39 @@ impl InternalServiceFactory for ConfiguredService {
}) })
} }
fn create(&self) -> Box<dyn Future<Item = Vec<(Token, BoxedServerService)>, Error = ()>> { fn create(&self) -> LocalBoxFuture<'static, Result<Vec<(Token, BoxedServerService)>, ()>> {
// configure services // configure services
let mut rt = ServiceRuntime::new(self.services.clone()); let mut rt = ServiceRuntime::new(self.services.clone());
self.rt.configure(&mut rt); self.rt.configure(&mut rt);
rt.validate(); rt.validate();
let services = rt.services; let mut names = self.names.clone();
// on start futures // construct services
if rt.onstart.is_empty() { async move {
// construct services let services = rt.services;
let mut fut = Vec::new(); // TODO: Proper error handling here
for (token, ns) in services { for f in rt.onstart.into_iter() {
let config = ServerConfig::new(self.names[&token].1); f.await;
fut.push(ns.new_service(&config).map(move |service| (token, service)));
} }
let mut res = vec![];
for (token, ns) in services.into_iter() {
let config = ServerConfig::new(names[&token].1);
Box::new(join_all(fut).map_err(|e| { let newserv = ns.new_service(&config);
error!("Can not construct service: {:?}", e); match newserv.await {
})) Ok(serv) => {
} else { res.push((token, serv));
let names = self.names.clone(); }
Err(e) => {
// run onstart future and then construct services error!("Can not construct service {:?}", e);
Box::new( return Err(e);
join_all(rt.onstart) }
.map_err(|e| { };
error!("Can not construct service: {:?}", e); }
}) return Ok(res);
.and_then(move |_| {
// construct services
let mut fut = Vec::new();
for (token, ns) in services {
let config = ServerConfig::new(names[&token].1);
fut.push(
ns.new_service(&config).map(move |service| (token, service)),
);
}
join_all(fut).map_err(|e| {
error!("Can not construct service: {:?}", e);
})
}),
)
} }
.boxed_local()
} }
} }
@ -181,7 +171,7 @@ fn not_configured(_: &mut ServiceRuntime) {
pub struct ServiceRuntime { pub struct ServiceRuntime {
names: HashMap<String, Token>, names: HashMap<String, Token>,
services: HashMap<Token, BoxedNewService>, services: HashMap<Token, BoxedNewService>,
onstart: Vec<Box<dyn Future<Item = (), Error = ()>>>, onstart: Vec<LocalBoxFuture<'static, ()>>,
} }
impl ServiceRuntime { impl ServiceRuntime {
@ -215,12 +205,15 @@ impl ServiceRuntime {
{ {
// let name = name.to_owned(); // let name = name.to_owned();
if let Some(token) = self.names.get(name) { if let Some(token) = self.names.get(name) {
self.services.insert( self.services.insert(
token.clone(), token.clone(),
Box::new(ServiceFactory { Box::new(ServiceFactory {
inner: service.into_new_service(), inner: service.into_new_service(),
}), }),
); );
} else { } else {
panic!("Unknown service: {:?}", name); panic!("Unknown service: {:?}", name);
} }
@ -229,9 +222,9 @@ impl ServiceRuntime {
/// Execute future before services initialization. /// Execute future before services initialization.
pub fn on_start<F>(&mut self, fut: F) pub fn on_start<F>(&mut self, fut: F)
where where
F: Future<Item = (), Error = ()> + 'static, F: Future<Output = ()> + 'static,
{ {
self.onstart.push(Box::new(fut)) self.onstart.push(fut.boxed_local())
} }
} }
@ -243,7 +236,7 @@ type BoxedNewService = Box<
InitError = (), InitError = (),
Config = ServerConfig, Config = ServerConfig,
Service = BoxedServerService, Service = BoxedServerService,
Future = Box<dyn Future<Item = BoxedServerService, Error = ()>>, Future = LocalBoxFuture<'static, Result<BoxedServerService, ()>>,
>, >,
>; >;
@ -265,12 +258,18 @@ where
type InitError = (); type InitError = ();
type Config = ServerConfig; type Config = ServerConfig;
type Service = BoxedServerService; type Service = BoxedServerService;
type Future = Box<dyn Future<Item = BoxedServerService, Error = ()>>; type Future = LocalBoxFuture<'static, Result<BoxedServerService, ()>>;
// Box<dyn Future<Output=Result<Vec<(Token, BoxedServerService)>, ()>>>;
fn new_service(&self, cfg: &ServerConfig) -> Self::Future { fn new_service(&self, cfg: &ServerConfig) -> Self::Future {
Box::new(self.inner.new_service(cfg).map_err(|_| ()).map(|s| { let fut = self.inner.new_service(cfg);
let service: BoxedServerService = Box::new(StreamService::new(s)); async move {
service return match fut.await {
})) Ok(s) => Ok(Box::new(StreamService::new(s)) as BoxedServerService),
Err(e) => Err(()),
};
}
.boxed_local()
} }
} }

View File

@ -1,7 +1,8 @@
use std::cell::Cell; use std::cell::Cell;
use std::rc::Rc; use std::rc::Rc;
use futures::task::AtomicTask; use futures::task::AtomicWaker;
use std::task;
#[derive(Clone)] #[derive(Clone)]
/// Simple counter with ability to notify task on reaching specific number /// Simple counter with ability to notify task on reaching specific number
@ -13,7 +14,7 @@ pub struct Counter(Rc<CounterInner>);
struct CounterInner { struct CounterInner {
count: Cell<usize>, count: Cell<usize>,
capacity: usize, capacity: usize,
task: AtomicTask, task: AtomicWaker,
} }
impl Counter { impl Counter {
@ -22,7 +23,7 @@ impl Counter {
Counter(Rc::new(CounterInner { Counter(Rc::new(CounterInner {
capacity, capacity,
count: Cell::new(0), count: Cell::new(0),
task: AtomicTask::new(), task: AtomicWaker::new(),
})) }))
} }
@ -31,8 +32,8 @@ impl Counter {
} }
/// Check if counter is not at capacity /// Check if counter is not at capacity
pub fn available(&self) -> bool { pub fn available(&self, cx: &mut task::Context) -> bool {
self.0.available() self.0.available(cx)
} }
/// Get total number of acquired counts /// Get total number of acquired counts
@ -66,14 +67,14 @@ impl CounterInner {
let num = self.count.get(); let num = self.count.get();
self.count.set(num - 1); self.count.set(num - 1);
if num == self.capacity { if num == self.capacity {
self.task.notify(); self.task.wake();
} }
} }
fn available(&self) -> bool { fn available(&self, cx: &mut task::Context) -> bool {
let avail = self.count.get() < self.capacity; let avail = self.count.get() < self.capacity;
if !avail { if !avail {
self.task.register(); self.task.register(cx.waker());
} }
avail avail
} }

View File

@ -1,6 +1,6 @@
use futures::sync::mpsc::UnboundedSender; use futures::channel::mpsc::UnboundedSender;
use futures::sync::oneshot; use futures::channel::oneshot;
use futures::Future; use futures::{Future, TryFutureExt};
use crate::builder::ServerBuilder; use crate::builder::ServerBuilder;
use crate::signals::Signal; use crate::signals::Signal;
@ -43,14 +43,14 @@ impl Server {
/// ///
/// If socket contains some pending connection, they might be dropped. /// If socket contains some pending connection, they might be dropped.
/// All opened connection remains active. /// All opened connection remains active.
pub fn pause(&self) -> impl Future<Item = (), Error = ()> { pub fn pause(&self) -> impl Future<Output = Result<(), ()>> {
let (tx, rx) = oneshot::channel(); let (tx, rx) = oneshot::channel();
let _ = self.0.unbounded_send(ServerCommand::Pause(tx)); let _ = self.0.unbounded_send(ServerCommand::Pause(tx));
rx.map_err(|_| ()) rx.map_err(|_| ())
} }
/// Resume accepting incoming connections /// Resume accepting incoming connections
pub fn resume(&self) -> impl Future<Item = (), Error = ()> { pub fn resume(&self) -> impl Future<Output = Result<(), ()>> {
let (tx, rx) = oneshot::channel(); let (tx, rx) = oneshot::channel();
let _ = self.0.unbounded_send(ServerCommand::Resume(tx)); let _ = self.0.unbounded_send(ServerCommand::Resume(tx));
rx.map_err(|_| ()) rx.map_err(|_| ())
@ -59,7 +59,7 @@ impl Server {
/// Stop incoming connection processing, stop all workers and exit. /// Stop incoming connection processing, stop all workers and exit.
/// ///
/// If server starts with `spawn()` method, then spawned thread get terminated. /// If server starts with `spawn()` method, then spawned thread get terminated.
pub fn stop(&self, graceful: bool) -> impl Future<Item = (), Error = ()> { pub fn stop(&self, graceful: bool) -> impl Future<Output = Result<(), ()>> {
let (tx, rx) = oneshot::channel(); let (tx, rx) = oneshot::channel();
let _ = self.0.unbounded_send(ServerCommand::Stop { let _ = self.0.unbounded_send(ServerCommand::Stop {
graceful, graceful,

View File

@ -4,14 +4,16 @@ use std::time::Duration;
use actix_rt::spawn; use actix_rt::spawn;
use actix_server_config::{Io, ServerConfig}; use actix_server_config::{Io, ServerConfig};
use actix_service::{NewService, Service}; use actix_service::{NewService, Service, ServiceExt};
use futures::future::{err, ok, FutureResult}; use futures::future::{err, ok, LocalBoxFuture, Ready};
use futures::{Future, Poll}; use futures::{Future, FutureExt, Poll, StreamExt, TryFutureExt};
use log::error; use log::error;
use super::Token; use super::Token;
use crate::counter::CounterGuard; use crate::counter::CounterGuard;
use crate::socket::{FromStream, StdStream}; use crate::socket::{FromStream, StdStream};
use std::pin::Pin;
use std::task::Context;
/// Server message /// Server message
pub(crate) enum ServerMessage { pub(crate) enum ServerMessage {
@ -34,7 +36,7 @@ pub(crate) trait InternalServiceFactory: Send {
fn clone_factory(&self) -> Box<dyn InternalServiceFactory>; fn clone_factory(&self) -> Box<dyn InternalServiceFactory>;
fn create(&self) -> Box<dyn Future<Item = Vec<(Token, BoxedServerService)>, Error = ()>>; fn create(&self) -> LocalBoxFuture<'static, Result<Vec<(Token, BoxedServerService)>, ()>>;
} }
pub(crate) type BoxedServerService = Box< pub(crate) type BoxedServerService = Box<
@ -42,7 +44,7 @@ pub(crate) type BoxedServerService = Box<
Request = (Option<CounterGuard>, ServerMessage), Request = (Option<CounterGuard>, ServerMessage),
Response = (), Response = (),
Error = (), Error = (),
Future = FutureResult<(), ()>, Future = Ready<Result<(), ()>>,
>, >,
>; >;
@ -66,12 +68,20 @@ where
type Request = (Option<CounterGuard>, ServerMessage); type Request = (Option<CounterGuard>, ServerMessage);
type Response = (); type Response = ();
type Error = (); type Error = ();
type Future = FutureResult<(), ()>; type Future = Ready<Result<(), ()>>;
fn poll_ready(
self: Pin<&mut Self>,
ctx: &mut Context<'_>,
) -> Poll<Result<(), Self::Error>> {
unimplemented!()
}
/*
fn poll_ready(&mut self) -> Poll<(), Self::Error> { fn poll_ready(&mut self) -> Poll<(), Self::Error> {
self.service.poll_ready().map_err(|_| ()) self.service.poll_ready().map_err(|_| ())
} }
*/
fn call(&mut self, (guard, req): (Option<CounterGuard>, ServerMessage)) -> Self::Future { fn call(&mut self, (guard, req): (Option<CounterGuard>, ServerMessage)) -> Self::Future {
match req { match req {
ServerMessage::Connect(stream) => { ServerMessage::Connect(stream) => {
@ -80,10 +90,14 @@ where
}); });
if let Ok(stream) = stream { if let Ok(stream) = stream {
spawn(self.service.call(Io::new(stream)).then(move |res| { let f = self.service.call(Io::new(stream));
drop(guard); spawn(
res.map_err(|_| ()).map(|_| ()) async move {
})); f.await;
drop(guard);
}
.boxed_local(),
);
ok(()) ok(())
} else { } else {
err(()) err(())
@ -142,19 +156,19 @@ where
}) })
} }
fn create(&self) -> Box<dyn Future<Item = Vec<(Token, BoxedServerService)>, Error = ()>> { fn create(&self) -> LocalBoxFuture<'static, Result<Vec<(Token, BoxedServerService)>, ()>> {
let token = self.token; let token = self.token;
let config = ServerConfig::new(self.addr); let config = ServerConfig::new(self.addr);
Box::new(
self.inner self.inner
.create() .create()
.new_service(&config) .new_service(&config)
.map_err(|_| ()) .map_err(|_| ())
.map(move |inner| { .map_ok(move |inner| {
let service: BoxedServerService = Box::new(StreamService::new(inner)); let service: BoxedServerService = Box::new(StreamService::new(inner));
vec![(token, service)] vec![(token, service)]
}), })
) .boxed_local()
} }
} }
@ -167,7 +181,7 @@ impl InternalServiceFactory for Box<dyn InternalServiceFactory> {
self.as_ref().clone_factory() self.as_ref().clone_factory()
} }
fn create(&self) -> Box<dyn Future<Item = Vec<(Token, BoxedServerService)>, Error = ()>> { fn create(&self) -> LocalBoxFuture<'static, Result<Vec<(Token, BoxedServerService)>, ()>> {
self.as_ref().create() self.as_ref().create()
} }
} }

View File

@ -1,10 +1,17 @@
use std::io; use std::io;
use actix_rt::spawn; use actix_rt::spawn;
use futures::stream::futures_unordered; use futures::stream::{futures_unordered, FuturesUnordered, LocalBoxStream};
use futures::{Async, Future, Poll, Stream}; use futures::{
Future, FutureExt, Poll, Stream, StreamExt, TryFutureExt, TryStream, TryStreamExt,
};
use crate::server::Server; use crate::server::Server;
use actix_service::ServiceExt;
use futures::future::LocalBoxFuture;
use std::pin::Pin;
use std::task::Context;
use tokio_net::signal::unix::signal;
/// Different types of process signals /// Different types of process signals
#[derive(PartialEq, Clone, Copy, Debug)] #[derive(PartialEq, Clone, Copy, Debug)]
@ -27,14 +34,14 @@ pub(crate) struct Signals {
streams: Vec<SigStream>, streams: Vec<SigStream>,
} }
type SigStream = Box<dyn Stream<Item = Signal, Error = io::Error>>; type SigStream = LocalBoxStream<'static, Result<Signal, io::Error>>;
impl Signals { impl Signals {
pub(crate) fn start(srv: Server) { pub(crate) fn start(srv: Server) {
let fut = { let fut = {
#[cfg(not(unix))] #[cfg(not(unix))]
{ {
tokio_signal::ctrl_c() tokio_net::signal::ctrl_c()
.map_err(|_| ()) .map_err(|_| ())
.and_then(move |stream| Signals { .and_then(move |stream| Signals {
srv, srv,
@ -44,51 +51,79 @@ impl Signals {
#[cfg(unix)] #[cfg(unix)]
{ {
use tokio_signal::unix; use tokio_net::signal::unix;
let mut sigs: Vec<Box<dyn Future<Item = SigStream, Error = io::Error>>> = let mut sigs: Vec<_> = Vec::new();
Vec::new();
sigs.push(Box::new( let mut SIG_MAP = [
tokio_signal::unix::Signal::new(tokio_signal::unix::SIGINT).map(|stream| { (
let s: SigStream = Box::new(stream.map(|_| Signal::Int)); tokio_net::signal::unix::SignalKind::interrupt(),
s Signal::Int,
}), ),
)); (tokio_net::signal::unix::SignalKind::hangup(), Signal::Hup),
sigs.push(Box::new( (
tokio_signal::unix::Signal::new(tokio_signal::unix::SIGHUP).map( tokio_net::signal::unix::SignalKind::terminate(),
|stream: unix::Signal| { Signal::Term,
),
(tokio_net::signal::unix::SignalKind::quit(), Signal::Quit),
];
for (kind, sig) in SIG_MAP.into_iter() {
let sig = sig.clone();
let fut = signal(*kind).unwrap();
sigs.push(fut.map(move |_| Ok(sig)).boxed_local());
}
/* TODO: Finish rewriting this
sigs.push(
tokio_net::signal::unix::signal(tokio_net::signal::si).unwrap()
.map(|stream| {
let s: SigStream = Box::new(stream.map(|_| Signal::Int));
s
}).boxed()
);
sigs.push(
tokio_net::signal::unix::signal(tokio_net::signal::unix::SignalKind::hangup()).unwrap()
.map(|stream: unix::Signal| {
let s: SigStream = Box::new(stream.map(|_| Signal::Hup)); let s: SigStream = Box::new(stream.map(|_| Signal::Hup));
s s
}, }).boxed()
), );
)); sigs.push(
sigs.push(Box::new( tokio_net::signal::unix::signal(
tokio_signal::unix::Signal::new(tokio_signal::unix::SIGTERM).map( tokio_net::signal::unix::SignalKind::terminate()
|stream| { ).unwrap()
.map(|stream| {
let s: SigStream = Box::new(stream.map(|_| Signal::Term)); let s: SigStream = Box::new(stream.map(|_| Signal::Term));
s s
}, }).boxed(),
), );
)); sigs.push(
sigs.push(Box::new( tokio_net::signal::unix::signal(
tokio_signal::unix::Signal::new(tokio_signal::unix::SIGQUIT).map( tokio_net::signal::unix::SignalKind::quit()
|stream| { ).unwrap()
.map(|stream| {
let s: SigStream = Box::new(stream.map(|_| Signal::Quit)); let s: SigStream = Box::new(stream.map(|_| Signal::Quit));
s s
}, }).boxed()
), );
)); */
futures_unordered(sigs)
.collect() Signals { srv, streams: sigs }
.map_err(|_| ())
.and_then(move |streams| Signals { srv, streams })
} }
}; };
spawn(fut); spawn(async {});
} }
} }
impl Future for Signals { impl Future for Signals {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
unimplemented!()
}
/*
type Item = (); type Item = ();
type Error = (); type Error = ();
@ -115,4 +150,5 @@ impl Future for Signals {
Ok(Async::NotReady) Ok(Async::NotReady)
} }
} }
*/
} }

View File

@ -1,8 +1,8 @@
use std::{fmt, io, net}; use std::{fmt, io, net};
use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::{AsyncRead, AsyncWrite};
use tokio_reactor::Handle; use tokio_net::driver::Handle;
use tokio_tcp::TcpStream; use tokio_net::tcp::TcpStream;
pub(crate) enum StdListener { pub(crate) enum StdListener {
Tcp(net::TcpListener), Tcp(net::TcpListener),

View File

@ -9,6 +9,8 @@ use tokio_io::{AsyncRead, AsyncWrite};
use crate::counter::{Counter, CounterGuard}; use crate::counter::{Counter, CounterGuard};
use crate::ssl::MAX_CONN_COUNTER; use crate::ssl::MAX_CONN_COUNTER;
use crate::{Io, Protocol, ServerConfig}; use crate::{Io, Protocol, ServerConfig};
use std::pin::Pin;
use std::task::Context;
/// Support `SSL` connections via native-tls package /// Support `SSL` connections via native-tls package
/// ///
@ -72,6 +74,18 @@ impl<T: AsyncRead + AsyncWrite, P> Service for NativeTlsAcceptorService<T, P> {
type Error = Error; type Error = Error;
type Future = Accept<T, P>; type Future = Accept<T, P>;
fn poll_ready(
self: Pin<&mut Self>,
ctx: &mut Context<'_>,
) -> Poll<Result<(), Self::Error>> {
if self.conns.available(ctx) {
Ok(Async::Ready(()))
} else {
Ok(Async::NotReady)
}
}
/*
fn poll_ready(&mut self) -> Poll<(), Self::Error> { fn poll_ready(&mut self) -> Poll<(), Self::Error> {
if self.conns.available() { if self.conns.available() {
Ok(Async::Ready(())) Ok(Async::Ready(()))
@ -79,6 +93,7 @@ impl<T: AsyncRead + AsyncWrite, P> Service for NativeTlsAcceptorService<T, P> {
Ok(Async::NotReady) Ok(Async::NotReady)
} }
} }
*/
fn call(&mut self, req: Self::Request) -> Self::Future { fn call(&mut self, req: Self::Request) -> Self::Future {
let (io, params, _) = req.into_parts(); let (io, params, _) = req.into_parts();

View File

@ -1,14 +1,18 @@
use std::marker::PhantomData; use std::marker::PhantomData;
use actix_service::{NewService, Service}; use actix_service::{NewService, Service};
use futures::{future::ok, future::FutureResult, Async, Future, Poll}; use futures::{future::ok, future::Ready, Future, FutureExt, Poll};
use openssl::ssl::{HandshakeError, SslAcceptor}; use openssl::ssl::SslAcceptor;
use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::{AsyncRead, AsyncWrite};
use tokio_openssl::{AcceptAsync, SslAcceptorExt, SslStream}; use tokio_openssl::{HandshakeError, SslStream};
use crate::counter::{Counter, CounterGuard}; use crate::counter::{Counter, CounterGuard};
use crate::ssl::MAX_CONN_COUNTER; use crate::ssl::MAX_CONN_COUNTER;
use crate::{Io, Protocol, ServerConfig}; use crate::{Io, Protocol, ServerConfig};
use futures::future::LocalBoxFuture;
use std::io;
use std::pin::Pin;
use std::task::Context;
/// Support `SSL` connections via openssl package /// Support `SSL` connections via openssl package
/// ///
@ -37,14 +41,14 @@ impl<T: AsyncRead + AsyncWrite, P> Clone for OpensslAcceptor<T, P> {
} }
} }
impl<T: AsyncRead + AsyncWrite, P> NewService for OpensslAcceptor<T, P> { impl<T: AsyncRead + AsyncWrite + Unpin + 'static, P> NewService for OpensslAcceptor<T, P> {
type Request = Io<T, P>; type Request = Io<T, P>;
type Response = Io<SslStream<T>, P>; type Response = Io<SslStream<T>, P>;
type Error = HandshakeError<T>; type Error = HandshakeError<T>;
type Config = ServerConfig; type Config = ServerConfig;
type Service = OpensslAcceptorService<T, P>; type Service = OpensslAcceptorService<T, P>;
type InitError = (); type InitError = ();
type Future = FutureResult<Self::Service, Self::InitError>; type Future = Ready<Result<Self::Service, Self::InitError>>;
fn new_service(&self, cfg: &ServerConfig) -> Self::Future { fn new_service(&self, cfg: &ServerConfig) -> Self::Future {
cfg.set_secure(); cfg.set_secure();
@ -65,12 +69,20 @@ pub struct OpensslAcceptorService<T, P> {
io: PhantomData<(T, P)>, io: PhantomData<(T, P)>,
} }
impl<T: AsyncRead + AsyncWrite, P> Service for OpensslAcceptorService<T, P> { impl<T: AsyncRead + AsyncWrite + Unpin + 'static, P> Service for OpensslAcceptorService<T, P> {
type Request = Io<T, P>; type Request = Io<T, P>;
type Response = Io<SslStream<T>, P>; type Response = Io<SslStream<T>, P>;
type Error = HandshakeError<T>; type Error = HandshakeError<T>;
type Future = OpensslAcceptorServiceFut<T, P>; type Future = OpensslAcceptorServiceFut<T, P>;
fn poll_ready(
self: Pin<&mut Self>,
ctx: &mut Context<'_>,
) -> Poll<Result<(), Self::Error>> {
unimplemented!()
}
/*
fn poll_ready(&mut self) -> Poll<(), Self::Error> { fn poll_ready(&mut self) -> Poll<(), Self::Error> {
if self.conns.available() { if self.conns.available() {
Ok(Async::Ready(())) Ok(Async::Ready(()))
@ -78,12 +90,18 @@ impl<T: AsyncRead + AsyncWrite, P> Service for OpensslAcceptorService<T, P> {
Ok(Async::NotReady) Ok(Async::NotReady)
} }
} }
*/
fn call(&mut self, req: Self::Request) -> Self::Future { fn call(&mut self, req: Self::Request) -> Self::Future {
let (io, params, _) = req.into_parts(); let (io, params, _) = req.into_parts();
let acc = self.acceptor.clone();
OpensslAcceptorServiceFut { OpensslAcceptorServiceFut {
_guard: self.conns.get(), _guard: self.conns.get(),
fut: SslAcceptorExt::accept_async(&self.acceptor, io), fut: async move {
let acc = acc;
tokio_openssl::accept(&acc, io).await
}
.boxed_local::<'static>(),
params: Some(params), params: Some(params),
} }
} }
@ -93,17 +111,24 @@ pub struct OpensslAcceptorServiceFut<T, P>
where where
T: AsyncRead + AsyncWrite, T: AsyncRead + AsyncWrite,
{ {
fut: AcceptAsync<T>, fut: LocalBoxFuture<'static, Result<SslStream<T>, HandshakeError<T>>>,
params: Option<P>, params: Option<P>,
_guard: CounterGuard, _guard: CounterGuard,
} }
impl<T: AsyncRead + AsyncWrite, P> Future for OpensslAcceptorServiceFut<T, P> { impl<T: AsyncRead + AsyncWrite, P> Future for OpensslAcceptorServiceFut<T, P> {
type Output = Result<Io<SslStream<T>, P>, HandshakeError<T>>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
unimplemented!()
}
/*
type Item = Io<SslStream<T>, P>; type Item = Io<SslStream<T>, P>;
type Error = HandshakeError<T>; type Error = HandshakeError<T>;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> { fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
let io = futures::try_ready!(self.fut.poll()); let io = futures::ready!(self.fut.poll())?;
let proto = if let Some(protos) = io.get_ref().ssl().selected_alpn_protocol() { let proto = if let Some(protos) = io.get_ref().ssl().selected_alpn_protocol() {
const H2: &[u8] = b"\x02h2"; const H2: &[u8] = b"\x02h2";
const HTTP10: &[u8] = b"\x08http/1.0"; const HTTP10: &[u8] = b"\x08http/1.0";
@ -127,4 +152,5 @@ impl<T: AsyncRead + AsyncWrite, P> Future for OpensslAcceptorServiceFut<T, P> {
proto, proto,
))) )))
} }
*/
} }

View File

@ -11,6 +11,8 @@ use tokio_rustls::{server::TlsStream, Accept, TlsAcceptor};
use crate::counter::{Counter, CounterGuard}; use crate::counter::{Counter, CounterGuard};
use crate::ssl::MAX_CONN_COUNTER; use crate::ssl::MAX_CONN_COUNTER;
use crate::{Io, Protocol, ServerConfig as SrvConfig}; use crate::{Io, Protocol, ServerConfig as SrvConfig};
use std::pin::Pin;
use std::task::Context;
/// Support `SSL` connections via rustls package /// Support `SSL` connections via rustls package
/// ///
@ -74,8 +76,11 @@ impl<T: AsyncRead + AsyncWrite, P> Service for RustlsAcceptorService<T, P> {
type Error = io::Error; type Error = io::Error;
type Future = RustlsAcceptorServiceFut<T, P>; type Future = RustlsAcceptorServiceFut<T, P>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> { fn poll_ready(
if self.conns.available() { self: Pin<&mut Self>,
ctx: &mut Context<'_>,
) -> Poll<Result<(), Self::Error>> {
if self.conns.available(cx) {
Ok(Async::Ready(())) Ok(Async::Ready(()))
} else { } else {
Ok(Async::NotReady) Ok(Async::NotReady)

View File

@ -1,11 +1,11 @@
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc; use std::sync::Arc;
use std::{mem, time}; use std::{mem, task, time};
use actix_rt::{spawn, Arbiter}; use futures::channel::mpsc::{UnboundedReceiver, UnboundedSender};
use futures::sync::mpsc::{UnboundedReceiver, UnboundedSender}; use futures::channel::oneshot;
use futures::sync::oneshot; use futures::{future, Future, Poll, Stream, TryFutureExt};
use futures::{future, Async, Future, Poll, Stream}; use futures::{FutureExt, StreamExt};
use log::{error, info, trace}; use log::{error, info, trace};
use tokio_timer::{sleep, Delay}; use tokio_timer::{sleep, Delay};
@ -14,6 +14,10 @@ use crate::counter::Counter;
use crate::services::{BoxedServerService, InternalServiceFactory, ServerMessage}; use crate::services::{BoxedServerService, InternalServiceFactory, ServerMessage};
use crate::socket::{SocketAddr, StdStream}; use crate::socket::{SocketAddr, StdStream};
use crate::Token; use crate::Token;
use actix_rt::spawn;
use futures::future::{LocalBoxFuture, MapOk};
use std::pin::Pin;
use std::task::Context;
pub(crate) struct WorkerCommand(Conn); pub(crate) struct WorkerCommand(Conn);
@ -153,31 +157,36 @@ impl Worker {
state: WorkerState::Unavailable(Vec::new()), state: WorkerState::Unavailable(Vec::new()),
}); });
let mut fut = Vec::new(); let mut fut: Vec<MapOk<LocalBoxFuture<'static, _>, _>> = Vec::new();
for (idx, factory) in wrk.factories.iter().enumerate() { for (idx, factory) in wrk.factories.iter().enumerate() {
fut.push(factory.create().map(move |res| { fut.push(factory.create().map_ok(move |r| {
res.into_iter() r.into_iter()
.map(|(t, s)| (idx, t, s)) .map(|(t, s): (Token, _)| (idx, t, s))
.collect::<Vec<_>>() .collect::<Vec<_>>()
})); }));
} }
spawn( spawn(
future::join_all(fut) async {
.map_err(|e| { let mut res = future::join_all(fut).await;
error!("Can not start worker: {:?}", e); let res: Result<Vec<_>, _> = res.into_iter().collect();
Arbiter::current().stop(); match res {
}) Ok(services) => {
.and_then(move |services| { for item in services {
for item in services { for (idx, token, service) in item {
for (idx, token, service) in item { while token.0 >= wrk.services.len() {
while token.0 >= wrk.services.len() { wrk.services.push(None);
wrk.services.push(None); }
} }
wrk.services[token.0] = Some((idx, service));
} }
Ok::<_, ()>(wrk);
} }
wrk Err(e) => {
}), //return Err(e);
}
}
}
.boxed_local(),
); );
} }
@ -198,13 +207,18 @@ impl Worker {
} }
} }
fn check_readiness(&mut self, trace: bool) -> Result<bool, (Token, usize)> { fn check_readiness(
&mut self,
trace: bool,
cx: &mut Context<'_>,
) -> Result<bool, (Token, usize)> {
/*
let mut ready = self.conns.available(); let mut ready = self.conns.available();
let mut failed = None; let mut failed = None;
for (token, service) in &mut self.services.iter_mut().enumerate() { for (token, service) in &mut self.services.iter_mut().enumerate() {
if let Some(service) = service { if let Some(service) = service {
match service.1.poll_ready() { match service.1.poll_ready(cx) {
Ok(Async::Ready(_)) => { Poll::Ready(Ok(_)) => {
if trace { if trace {
trace!( trace!(
"Service {:?} is available", "Service {:?} is available",
@ -212,8 +226,8 @@ impl Worker {
); );
} }
} }
Ok(Async::NotReady) => ready = false, Poll::NotReady => ready = false,
Err(_) => { Poll::Ready(Err(_)) => {
error!( error!(
"Service {:?} readiness check returned error, restarting", "Service {:?} readiness check returned error, restarting",
self.factories[service.0].name(Token(token)) self.factories[service.0].name(Token(token))
@ -227,7 +241,8 @@ impl Worker {
Err(idx) Err(idx)
} else { } else {
Ok(ready) Ok(ready)
} }*/
unimplemented!()
} }
} }
@ -238,15 +253,19 @@ enum WorkerState {
Restarting( Restarting(
usize, usize,
Token, Token,
Box<dyn Future<Item = Vec<(Token, BoxedServerService)>, Error = ()>>, Box<dyn Future<Output = Result<Vec<(Token, BoxedServerService)>, ()>>>,
), ),
Shutdown(Delay, Delay, oneshot::Sender<bool>), Shutdown(Delay, Delay, oneshot::Sender<bool>),
} }
impl Future for Worker { impl Future for Worker {
type Item = (); type Output = ();
type Error = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
unimplemented!()
}
/*
fn poll(&mut self) -> Poll<Self::Item, Self::Error> { fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
// `StopWorker` message handler // `StopWorker` message handler
if let Ok(Async::Ready(Some(StopCommand { graceful, result }))) = self.rx2.poll() { if let Ok(Async::Ready(Some(StopCommand { graceful, result }))) = self.rx2.poll() {
@ -435,4 +454,5 @@ impl Future for Worker {
WorkerState::None => panic!(), WorkerState::None => panic!(),
}; };
} }
*/
} }

View File

@ -4,11 +4,12 @@ use std::{net, thread, time};
use actix_codec::{BytesCodec, Framed}; use actix_codec::{BytesCodec, Framed};
use actix_server::{Io, Server, ServerConfig}; use actix_server::{Io, Server, ServerConfig};
use actix_service::{new_service_cfg, service_fn, IntoService}; use actix_service::{new_service_cfg, service_fn, IntoService, ServiceExt};
use bytes::Bytes; use bytes::Bytes;
use futures::{Future, Sink}; use futures::{Future, FutureExt, Sink, SinkExt};
use net2::TcpBuilder; use net2::TcpBuilder;
use tokio_tcp::TcpStream; use tokio::future::ok;
use tokio_net::tcp::TcpStream;
fn unused_addr() -> net::SocketAddr { fn unused_addr() -> net::SocketAddr {
let addr: net::SocketAddr = "127.0.0.1:0".parse().unwrap(); let addr: net::SocketAddr = "127.0.0.1:0".parse().unwrap();
@ -30,7 +31,7 @@ fn test_bind() {
.bind("test", addr, move || { .bind("test", addr, move || {
new_service_cfg(move |cfg: &ServerConfig| { new_service_cfg(move |cfg: &ServerConfig| {
assert_eq!(cfg.local_addr(), addr); assert_eq!(cfg.local_addr(), addr);
Ok::<_, ()>((|_| Ok::<_, ()>(())).into_service()) ok::<_, ()>((|_| ok::<_, ()>(())).into_service())
}) })
}) })
.unwrap() .unwrap()
@ -54,7 +55,7 @@ fn test_bind_no_config() {
let h = thread::spawn(move || { let h = thread::spawn(move || {
let sys = actix_rt::System::new("test"); let sys = actix_rt::System::new("test");
let srv = Server::build() let srv = Server::build()
.bind("test", addr, move || service_fn(|_| Ok::<_, ()>(()))) .bind("test", addr, move || service_fn(|_| ok::<_, ()>(())))
.unwrap() .unwrap()
.start(); .start();
let _ = tx.send((srv, actix_rt::System::current())); let _ = tx.send((srv, actix_rt::System::current()));
@ -78,7 +79,7 @@ fn test_listen() {
.listen("test", lst, move || { .listen("test", lst, move || {
new_service_cfg(move |cfg: &ServerConfig| { new_service_cfg(move |cfg: &ServerConfig| {
assert_eq!(cfg.local_addr(), addr); assert_eq!(cfg.local_addr(), addr);
Ok::<_, ()>((|_| Ok::<_, ()>(())).into_service()) ok::<_, ()>((|_| ok::<_, ()>(())).into_service())
}) })
}) })
.unwrap() .unwrap()
@ -107,14 +108,15 @@ fn test_start() {
.bind("test", addr, move || { .bind("test", addr, move || {
new_service_cfg(move |cfg: &ServerConfig| { new_service_cfg(move |cfg: &ServerConfig| {
assert_eq!(cfg.local_addr(), addr); assert_eq!(cfg.local_addr(), addr);
Ok::<_, ()>(
(|io: Io<TcpStream>| { let serv_creator = (move |io: Io<TcpStream>| async {
Framed::new(io.into_parts().0, BytesCodec) panic!("Stream");
.send(Bytes::from_static(b"test")) let mut f = Framed::new(io.into_parts().0, BytesCodec);
.then(|_| Ok::<_, ()>(())) f.send(Bytes::from_static(b"test")).await.unwrap();
}) Ok::<_, ()>(())
.into_service(), }).into_service();
)
ok::<_, ()>(serv_creator)
}) })
}) })
.unwrap() .unwrap()
@ -125,7 +127,7 @@ fn test_start() {
}); });
let (srv, sys) = rx.recv().unwrap(); let (srv, sys) = rx.recv().unwrap();
let mut buf = [0u8; 4]; let mut buf = [1u8; 4];
let mut conn = net::TcpStream::connect(addr).unwrap(); let mut conn = net::TcpStream::connect(addr).unwrap();
let _ = conn.read_exact(&mut buf); let _ = conn.read_exact(&mut buf);
assert_eq!(buf, b"test"[..]); assert_eq!(buf, b"test"[..]);

View File

@ -1,6 +1,6 @@
[package] [package]
name = "actix-test-server" name = "actix-test-server"
version = "0.2.3" version = "0.2.2"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"] authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "Actix test server" description = "Actix test server"
keywords = ["network", "framework", "async", "futures"] keywords = ["network", "framework", "async", "futures"]
@ -11,11 +11,27 @@ categories = ["network-programming", "asynchronous"]
license = "MIT/Apache-2.0" license = "MIT/Apache-2.0"
exclude = [".gitignore", ".travis.yml", ".cargo/config", "appveyor.yml"] exclude = [".gitignore", ".travis.yml", ".cargo/config", "appveyor.yml"]
edition = "2018" edition = "2018"
workspace = ".."
[package.metadata.docs.rs]
features = ["ssl", "tls", "rust-tls"]
[lib] [lib]
name = "actix_test_server" name = "actix_test_server"
path = "src/lib.rs" path = "src/lib.rs"
[features]
default = []
# tls
tls = ["native-tls", "actix-server/tls"]
# openssl
ssl = ["openssl", "actix-server/ssl"]
# rustls
rust-tls = ["rustls", "tokio-rustls", "webpki", "webpki-roots"]
[dependencies] [dependencies]
actix-rt = "0.2.1" actix-rt = "0.2.1"
actix-server = "0.5.0" actix-server = "0.5.0"
@ -24,9 +40,21 @@ actix-testing = "0.1.0"
log = "0.4" log = "0.4"
net2 = "0.2" net2 = "0.2"
futures = "0.1" futures = { package = "futures-preview", version = "0.3.0-alpha.18"}
tokio-tcp = "0.1" tokio-io = "0.2.0-alpha.4"
tokio-reactor = "0.1" tokio-net = "0.2.0-alpha.4"
# native-tls
native-tls = { version="0.2", optional = true }
# openssl
openssl = { version="0.10", optional = true }
#rustls
rustls = { version = "^0.16", optional = true }
tokio-rustls = { version = "^0.12.0-alpha.2", optional = true }
webpki = { version = "0.21", optional = true }
webpki-roots = { version = "0.17", optional = true }
[dev-dependencies] [dev-dependencies]
actix-service = "0.4.0" actix-service = "0.4.0"

View File

@ -1,2 +1,149 @@
//! Various helpers for Actix applications to use during testing. //! Various helpers for Actix applications to use during testing.
pub use actix_testing::*; use std::sync::mpsc;
use std::{net, thread};
use actix_rt::{Runtime, System};
use actix_server::{Server, StreamServiceFactory};
pub use actix_server_config::{Io, ServerConfig};
use futures::future::{lazy, Future, IntoFuture};
use net2::TcpBuilder;
use tokio_reactor::Handle;
use tokio_tcp::TcpStream;
/// The `TestServer` type.
///
/// `TestServer` is very simple test server that simplify process of writing
/// integration tests for actix-net applications.
///
/// # Examples
///
/// ```rust
/// use actix_service::{service_fn, IntoNewService};
/// use actix_test_server::TestServer;
///
/// fn main() {
/// let srv = TestServer::with(|| service_fn(
/// |sock| {
/// println!("New connection: {:?}", sock);
/// Ok::<_, ()>(())
/// }
/// ));
///
/// println!("SOCKET: {:?}", srv.connect());
/// }
/// ```
pub struct TestServer;
/// Test server runstime
pub struct TestServerRuntime {
addr: net::SocketAddr,
host: String,
port: u16,
rt: Runtime,
}
impl TestServer {
/// Start new test server with application factory
pub fn with<F: StreamServiceFactory>(factory: F) -> TestServerRuntime {
let (tx, rx) = mpsc::channel();
// run server in separate thread
thread::spawn(move || {
let sys = System::new("actix-test-server");
let tcp = net::TcpListener::bind("127.0.0.1:0").unwrap();
let local_addr = tcp.local_addr().unwrap();
Server::build()
.listen("test", tcp, factory)?
.workers(1)
.disable_signals()
.start();
tx.send((System::current(), local_addr)).unwrap();
sys.run()
});
let (system, addr) = rx.recv().unwrap();
System::set_current(system);
let rt = Runtime::new().unwrap();
let host = format!("{}", addr.ip());
let port = addr.port();
TestServerRuntime {
addr,
rt,
host,
port,
}
}
/// Get firat available unused local address
pub fn unused_addr() -> net::SocketAddr {
let addr: net::SocketAddr = "127.0.0.1:0".parse().unwrap();
let socket = TcpBuilder::new_v4().unwrap();
socket.bind(&addr).unwrap();
socket.reuse_address(true).unwrap();
let tcp = socket.to_tcp_listener().unwrap();
tcp.local_addr().unwrap()
}
}
impl TestServerRuntime {
/// Execute future on current runtime
pub fn block_on<F, I, E>(&mut self, fut: F) -> Result<I, E>
where
F: Future<Item = I, Error = E>,
{
self.rt.block_on(fut)
}
/// Runs the provided function, with runtime enabled.
pub fn run_on<F, R>(&mut self, f: F) -> Result<R::Item, R::Error>
where
F: FnOnce() -> R,
R: IntoFuture,
{
self.rt.block_on(lazy(|| f().into_future()))
}
/// Spawn future to the current runtime
pub fn spawn<F>(&mut self, fut: F)
where
F: Future<Item = (), Error = ()> + 'static,
{
self.rt.spawn(fut);
}
/// Test server host
pub fn host(&self) -> &str {
&self.host
}
/// Test server port
pub fn port(&self) -> u16 {
self.port
}
/// Get test server address
pub fn addr(&self) -> net::SocketAddr {
self.addr
}
/// Stop http server
fn stop(&mut self) {
System::current().stop();
}
/// Connect to server, return tokio TcpStream
pub fn connect(&self) -> std::io::Result<TcpStream> {
TcpStream::from_std(net::TcpStream::connect(self.addr)?, &Handle::default())
}
}
impl Drop for TestServerRuntime {
fn drop(&mut self) {
self.stop()
}
}