mirror of https://github.com/fafhrd91/actix-net
				
				
				
			Migrate actix-server to std::future
Still not finished, this is more WIP, this is an aggregation of several commits, which can be found in semtexzv/std-future-server-tmp branch
This commit is contained in:
		
							parent
							
								
									aa9bbe2114
								
							
						
					
					
						commit
						f3ce89f95a
					
				| 
						 | 
				
			
			@ -17,7 +17,7 @@ path = "src/lib.rs"
 | 
			
		|||
features = ["ssl", "rust-tls", "uds"]
 | 
			
		||||
 | 
			
		||||
[features]
 | 
			
		||||
default = []
 | 
			
		||||
default = ["ssl", "rustls" ]
 | 
			
		||||
 | 
			
		||||
# openssl
 | 
			
		||||
ssl = ["tokio-openssl"]
 | 
			
		||||
| 
						 | 
				
			
			@ -26,13 +26,14 @@ ssl = ["tokio-openssl"]
 | 
			
		|||
rust-tls = ["rustls", "tokio-rustls"]
 | 
			
		||||
 | 
			
		||||
# unix domain sockets
 | 
			
		||||
uds = ["tokio-uds"]
 | 
			
		||||
# TODO: FIXME
 | 
			
		||||
uds = [] # ["tokio-uds"]
 | 
			
		||||
 | 
			
		||||
[dependencies]
 | 
			
		||||
futures = "0.1.25"
 | 
			
		||||
tokio-io = "0.1.12"
 | 
			
		||||
tokio-tcp = "0.1"
 | 
			
		||||
tokio-openssl = { version="0.3.0", optional = true }
 | 
			
		||||
rustls = { version = "0.15.2", optional = true }
 | 
			
		||||
tokio-rustls = { version = "0.9.1", optional = true }
 | 
			
		||||
tokio-uds = { version="0.2.5", optional = true }
 | 
			
		||||
futures = { package = "futures-preview", version = "0.3.0-alpha.18" }
 | 
			
		||||
tokio = "0.2.0-alpha.4"
 | 
			
		||||
 | 
			
		||||
tokio-openssl = { version="0.4.0-alpha.4", optional = true }
 | 
			
		||||
rustls = { version = "0.16.0", optional = true }
 | 
			
		||||
tokio-rustls = { version = "0.12.0-alpha.2", optional = true }
 | 
			
		||||
#tokio-uds = { version="0.3.0-alpha.1", optional = true }
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -3,8 +3,8 @@ use std::net::SocketAddr;
 | 
			
		|||
use std::rc::Rc;
 | 
			
		||||
use std::{fmt, io, net, time};
 | 
			
		||||
 | 
			
		||||
use tokio_io::{AsyncRead, AsyncWrite};
 | 
			
		||||
use tokio_tcp::TcpStream;
 | 
			
		||||
use tokio::io::{AsyncRead, AsyncWrite};
 | 
			
		||||
use tokio::net::tcp::TcpStream;
 | 
			
		||||
 | 
			
		||||
#[derive(Debug, Clone)]
 | 
			
		||||
pub struct ServerConfig {
 | 
			
		||||
| 
						 | 
				
			
			@ -172,25 +172,25 @@ impl IoStream for TcpStream {
 | 
			
		|||
}
 | 
			
		||||
 | 
			
		||||
#[cfg(any(feature = "ssl"))]
 | 
			
		||||
impl<T: IoStream> IoStream for tokio_openssl::SslStream<T> {
 | 
			
		||||
impl<T: IoStream + Unpin> IoStream for tokio_openssl::SslStream<T> {
 | 
			
		||||
    #[inline]
 | 
			
		||||
    fn peer_addr(&self) -> Option<net::SocketAddr> {
 | 
			
		||||
        self.get_ref().get_ref().peer_addr()
 | 
			
		||||
        self.get_ref().peer_addr()
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    #[inline]
 | 
			
		||||
    fn set_nodelay(&mut self, nodelay: bool) -> io::Result<()> {
 | 
			
		||||
        self.get_mut().get_mut().set_nodelay(nodelay)
 | 
			
		||||
        self.get_mut().set_nodelay(nodelay)
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    #[inline]
 | 
			
		||||
    fn set_linger(&mut self, dur: Option<time::Duration>) -> io::Result<()> {
 | 
			
		||||
        self.get_mut().get_mut().set_linger(dur)
 | 
			
		||||
        self.get_mut().set_linger(dur)
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    #[inline]
 | 
			
		||||
    fn set_keepalive(&mut self, dur: Option<time::Duration>) -> io::Result<()> {
 | 
			
		||||
        self.get_mut().get_mut().set_keepalive(dur)
 | 
			
		||||
        self.get_mut().set_keepalive(dur)
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -218,7 +218,7 @@ impl<T: IoStream> IoStream for tokio_rustls::TlsStream<T, rustls::ServerSession>
 | 
			
		|||
}
 | 
			
		||||
 | 
			
		||||
#[cfg(all(unix, feature = "uds"))]
 | 
			
		||||
impl IoStream for tokio_uds::UnixStream {
 | 
			
		||||
impl IoStream for t::UnixStream {
 | 
			
		||||
    #[inline]
 | 
			
		||||
    fn peer_addr(&self) -> Option<net::SocketAddr> {
 | 
			
		||||
        None
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -1,12 +1,5 @@
 | 
			
		|||
# Changes
 | 
			
		||||
 | 
			
		||||
## [0.6.1] - 2019-09-25
 | 
			
		||||
 | 
			
		||||
### Added
 | 
			
		||||
 | 
			
		||||
* Add UDS listening support to `ServerBuilder`
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
## [0.6.0] - 2019-07-18
 | 
			
		||||
 | 
			
		||||
### Added
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -1,6 +1,6 @@
 | 
			
		|||
[package]
 | 
			
		||||
name = "actix-server"
 | 
			
		||||
version = "0.6.1"
 | 
			
		||||
version = "0.6.0"
 | 
			
		||||
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
 | 
			
		||||
description = "Actix server - General purpose tcp server"
 | 
			
		||||
keywords = ["network", "framework", "async", "futures"]
 | 
			
		||||
| 
						 | 
				
			
			@ -33,7 +33,7 @@ ssl = ["openssl", "tokio-openssl", "actix-server-config/ssl"]
 | 
			
		|||
rust-tls = ["rustls", "tokio-rustls", "webpki", "webpki-roots", "actix-server-config/rust-tls"]
 | 
			
		||||
 | 
			
		||||
# uds
 | 
			
		||||
uds = ["mio-uds", "tokio-uds", "actix-server-config/uds"]
 | 
			
		||||
# uds = ["mio-uds", "tokio-uds", "actix-server-config/uds"]
 | 
			
		||||
 | 
			
		||||
[dependencies]
 | 
			
		||||
actix-rt = "0.2.2"
 | 
			
		||||
| 
						 | 
				
			
			@ -45,30 +45,29 @@ num_cpus = "1.0"
 | 
			
		|||
 | 
			
		||||
mio = "0.6.19"
 | 
			
		||||
net2 = "0.2"
 | 
			
		||||
futures = "0.1"
 | 
			
		||||
futures = { package = "futures-preview", version = "0.3.0-alpha.18" }
 | 
			
		||||
slab = "0.4"
 | 
			
		||||
tokio-io = "0.1"
 | 
			
		||||
tokio-tcp = "0.1"
 | 
			
		||||
tokio-timer = "0.2.8"
 | 
			
		||||
tokio-reactor = "0.1"
 | 
			
		||||
tokio-signal = "0.2"
 | 
			
		||||
tokio = "0.2.0-alpha.4"
 | 
			
		||||
tokio-io = "0.2.0-alpha.4"
 | 
			
		||||
tokio-net = { version = "0.2.0-alpha.4", features = ["signal"] }
 | 
			
		||||
tokio-timer = "0.3.0-alpha.4"
 | 
			
		||||
 | 
			
		||||
# unix domain sockets
 | 
			
		||||
mio-uds = { version="0.6.7", optional = true }
 | 
			
		||||
tokio-uds = { version="0.2.5", optional = true }
 | 
			
		||||
#tokio-uds = { version="0.2.5", optional = true }
 | 
			
		||||
 | 
			
		||||
# native-tls
 | 
			
		||||
native-tls = { version="0.2", optional = true }
 | 
			
		||||
 | 
			
		||||
# openssl
 | 
			
		||||
openssl = { version="0.10", optional = true }
 | 
			
		||||
tokio-openssl = { version="0.3", optional = true }
 | 
			
		||||
tokio-openssl = { version="0.4.0-alpha.4", optional = true }
 | 
			
		||||
 | 
			
		||||
# rustls
 | 
			
		||||
rustls = { version = "0.15.2", optional = true }
 | 
			
		||||
tokio-rustls = { version = "0.9.1", optional = true }
 | 
			
		||||
webpki = { version = "0.19", optional = true }
 | 
			
		||||
webpki-roots = { version = "0.16", optional = true }
 | 
			
		||||
rustls = { version = "0.16.0", optional = true }
 | 
			
		||||
tokio-rustls = { version = "0.12.0-alpha.2", optional = true }
 | 
			
		||||
webpki = { version = "0.21", optional = true }
 | 
			
		||||
webpki-roots = { version = "0.17", optional = true }
 | 
			
		||||
 | 
			
		||||
[dev-dependencies]
 | 
			
		||||
bytes = "0.4"
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -3,15 +3,16 @@ use std::time::{Duration, Instant};
 | 
			
		|||
use std::{io, thread};
 | 
			
		||||
 | 
			
		||||
use actix_rt::System;
 | 
			
		||||
use futures::future::{lazy, Future};
 | 
			
		||||
 | 
			
		||||
use log::{error, info};
 | 
			
		||||
use slab::Slab;
 | 
			
		||||
use tokio_timer::Delay;
 | 
			
		||||
use tokio_timer::delay;
 | 
			
		||||
 | 
			
		||||
use crate::server::Server;
 | 
			
		||||
use crate::socket::{SocketAddr, SocketListener, StdListener};
 | 
			
		||||
use crate::worker::{Conn, WorkerClient};
 | 
			
		||||
use crate::Token;
 | 
			
		||||
use futures::{Future, FutureExt};
 | 
			
		||||
 | 
			
		||||
pub(crate) enum Command {
 | 
			
		||||
    Pause,
 | 
			
		||||
| 
						 | 
				
			
			@ -440,14 +441,13 @@ impl Accept {
 | 
			
		|||
                        info.timeout = Some(Instant::now() + Duration::from_millis(500));
 | 
			
		||||
 | 
			
		||||
                        let r = self.timer.1.clone();
 | 
			
		||||
                        System::current().arbiter().send(lazy(move || {
 | 
			
		||||
                            Delay::new(Instant::now() + Duration::from_millis(510))
 | 
			
		||||
                                .map_err(|_| ())
 | 
			
		||||
                                .and_then(move |_| {
 | 
			
		||||
                                    let _ = r.set_readiness(mio::Ready::readable());
 | 
			
		||||
                                    Ok(())
 | 
			
		||||
                                })
 | 
			
		||||
                        }));
 | 
			
		||||
                        System::current().arbiter().send(
 | 
			
		||||
                            async move {
 | 
			
		||||
                                delay(Instant::now() + Duration::from_millis(510)).await;
 | 
			
		||||
                                r.set_readiness(mio::Ready::readable());
 | 
			
		||||
                            }
 | 
			
		||||
                                .boxed(),
 | 
			
		||||
                        );
 | 
			
		||||
                        return;
 | 
			
		||||
                    }
 | 
			
		||||
                }
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -2,14 +2,14 @@ use std::time::Duration;
 | 
			
		|||
use std::{io, mem, net};
 | 
			
		||||
 | 
			
		||||
use actix_rt::{spawn, Arbiter, System};
 | 
			
		||||
use futures::future::{lazy, ok};
 | 
			
		||||
use futures::stream::futures_unordered;
 | 
			
		||||
use futures::sync::mpsc::{unbounded, UnboundedReceiver};
 | 
			
		||||
use futures::{Async, Future, Poll, Stream};
 | 
			
		||||
use futures::channel::mpsc::{unbounded, UnboundedReceiver};
 | 
			
		||||
use futures::future::ready;
 | 
			
		||||
use futures::stream::FuturesUnordered;
 | 
			
		||||
use futures::{ready, Future, FutureExt, Poll, Stream, StreamExt};
 | 
			
		||||
use log::{error, info};
 | 
			
		||||
use net2::TcpBuilder;
 | 
			
		||||
use num_cpus;
 | 
			
		||||
use tokio_tcp::TcpStream;
 | 
			
		||||
use tokio_net::tcp::TcpStream;
 | 
			
		||||
use tokio_timer::sleep;
 | 
			
		||||
 | 
			
		||||
use crate::accept::{AcceptLoop, AcceptNotify, Command};
 | 
			
		||||
| 
						 | 
				
			
			@ -20,6 +20,8 @@ use crate::signals::{Signal, Signals};
 | 
			
		|||
use crate::socket::StdListener;
 | 
			
		||||
use crate::worker::{self, Worker, WorkerAvailability, WorkerClient};
 | 
			
		||||
use crate::{ssl, Token};
 | 
			
		||||
use std::pin::Pin;
 | 
			
		||||
use std::task::Context;
 | 
			
		||||
 | 
			
		||||
/// Server builder
 | 
			
		||||
pub struct ServerBuilder {
 | 
			
		||||
| 
						 | 
				
			
			@ -320,10 +322,12 @@ impl ServerBuilder {
 | 
			
		|||
        let services: Vec<Box<dyn InternalServiceFactory>> =
 | 
			
		||||
            self.services.iter().map(|v| v.clone_factory()).collect();
 | 
			
		||||
 | 
			
		||||
        Arbiter::new().send(lazy(move || {
 | 
			
		||||
            Worker::start(rx1, rx2, services, avail, timeout);
 | 
			
		||||
            Ok::<_, ()>(())
 | 
			
		||||
        }));
 | 
			
		||||
        Arbiter::new().send(
 | 
			
		||||
            async move {
 | 
			
		||||
                Worker::start(rx1, rx2, services, avail, timeout);
 | 
			
		||||
            }
 | 
			
		||||
                .boxed(),
 | 
			
		||||
        );
 | 
			
		||||
 | 
			
		||||
        worker
 | 
			
		||||
    }
 | 
			
		||||
| 
						 | 
				
			
			@ -381,31 +385,34 @@ impl ServerBuilder {
 | 
			
		|||
                // stop workers
 | 
			
		||||
                if !self.workers.is_empty() && graceful {
 | 
			
		||||
                    spawn(
 | 
			
		||||
                        futures_unordered(
 | 
			
		||||
                            self.workers
 | 
			
		||||
                                .iter()
 | 
			
		||||
                                .map(move |worker| worker.1.stop(graceful)),
 | 
			
		||||
                        )
 | 
			
		||||
                        .collect()
 | 
			
		||||
                        .then(move |_| {
 | 
			
		||||
                            if let Some(tx) = completion {
 | 
			
		||||
                                let _ = tx.send(());
 | 
			
		||||
                            }
 | 
			
		||||
                            if exit {
 | 
			
		||||
                                spawn(sleep(Duration::from_millis(300)).then(|_| {
 | 
			
		||||
                                    System::current().stop();
 | 
			
		||||
                                    ok(())
 | 
			
		||||
                                }));
 | 
			
		||||
                            }
 | 
			
		||||
                            ok(())
 | 
			
		||||
                        }),
 | 
			
		||||
                        self.workers
 | 
			
		||||
                            .iter()
 | 
			
		||||
                            .map(move |worker| worker.1.stop(graceful))
 | 
			
		||||
                            .collect::<FuturesUnordered<_>>()
 | 
			
		||||
                            .collect::<Vec<_>>()
 | 
			
		||||
                            .then(move |_| {
 | 
			
		||||
                                if let Some(tx) = completion {
 | 
			
		||||
                                    let _ = tx.send(());
 | 
			
		||||
                                }
 | 
			
		||||
                                if exit {
 | 
			
		||||
                                    spawn(
 | 
			
		||||
                                        async {
 | 
			
		||||
                                            tokio_timer::sleep(Duration::from_millis(300))
 | 
			
		||||
                                                .await;
 | 
			
		||||
                                            System::current().stop();
 | 
			
		||||
                                        }
 | 
			
		||||
                                            .boxed(),
 | 
			
		||||
                                    );
 | 
			
		||||
                                }
 | 
			
		||||
                                ready(())
 | 
			
		||||
                            }),
 | 
			
		||||
                    )
 | 
			
		||||
                } else {
 | 
			
		||||
                    // we need to stop system if server was spawned
 | 
			
		||||
                    if self.exit {
 | 
			
		||||
                        spawn(sleep(Duration::from_millis(300)).then(|_| {
 | 
			
		||||
                            System::current().stop();
 | 
			
		||||
                            ok(())
 | 
			
		||||
                            ready(())
 | 
			
		||||
                        }));
 | 
			
		||||
                    }
 | 
			
		||||
                    if let Some(tx) = completion {
 | 
			
		||||
| 
						 | 
				
			
			@ -447,15 +454,15 @@ impl ServerBuilder {
 | 
			
		|||
}
 | 
			
		||||
 | 
			
		||||
impl Future for ServerBuilder {
 | 
			
		||||
    type Item = ();
 | 
			
		||||
    type Error = ();
 | 
			
		||||
    type Output = ();
 | 
			
		||||
 | 
			
		||||
    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
 | 
			
		||||
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
 | 
			
		||||
        loop {
 | 
			
		||||
            match self.cmd.poll() {
 | 
			
		||||
                Ok(Async::Ready(None)) | Err(_) => return Ok(Async::Ready(())),
 | 
			
		||||
                Ok(Async::NotReady) => return Ok(Async::NotReady),
 | 
			
		||||
                Ok(Async::Ready(Some(item))) => self.handle_cmd(item),
 | 
			
		||||
            match ready!(Pin::new(&mut self.cmd).poll_next(cx)) {
 | 
			
		||||
                Some(it) => self.as_mut().get_mut().handle_cmd(it),
 | 
			
		||||
                None => {
 | 
			
		||||
                    return Poll::Pending;
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -1,19 +1,20 @@
 | 
			
		|||
use std::collections::HashMap;
 | 
			
		||||
use std::{fmt, io, net};
 | 
			
		||||
 | 
			
		||||
use actix_server_config::{Io, ServerConfig};
 | 
			
		||||
use actix_service::{IntoNewService, NewService};
 | 
			
		||||
use futures::future::{join_all, Future};
 | 
			
		||||
use log::error;
 | 
			
		||||
use tokio_tcp::TcpStream;
 | 
			
		||||
 | 
			
		||||
use crate::counter::CounterGuard;
 | 
			
		||||
use actix_server_config::{Io, ServerConfig};
 | 
			
		||||
use actix_service::{IntoNewService, NewService, ServiceExt};
 | 
			
		||||
use futures::future::{join_all, Future, FutureExt, LocalBoxFuture, TryFutureExt};
 | 
			
		||||
use log::error;
 | 
			
		||||
use std::pin::Pin;
 | 
			
		||||
use tokio_net::tcp::TcpStream;
 | 
			
		||||
 | 
			
		||||
use super::builder::bind_addr;
 | 
			
		||||
use super::services::{
 | 
			
		||||
    BoxedServerService, InternalServiceFactory, ServerMessage, StreamService,
 | 
			
		||||
};
 | 
			
		||||
use super::Token;
 | 
			
		||||
use std::process::Output;
 | 
			
		||||
 | 
			
		||||
pub struct ServiceConfig {
 | 
			
		||||
    pub(crate) services: Vec<(String, net::TcpListener)>,
 | 
			
		||||
| 
						 | 
				
			
			@ -108,50 +109,39 @@ impl InternalServiceFactory for ConfiguredService {
 | 
			
		|||
        })
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    fn create(&self) -> Box<dyn Future<Item = Vec<(Token, BoxedServerService)>, Error = ()>> {
 | 
			
		||||
    fn create(&self) -> LocalBoxFuture<'static, Result<Vec<(Token, BoxedServerService)>, ()>> {
 | 
			
		||||
        // configure services
 | 
			
		||||
        let mut rt = ServiceRuntime::new(self.services.clone());
 | 
			
		||||
        self.rt.configure(&mut rt);
 | 
			
		||||
        rt.validate();
 | 
			
		||||
 | 
			
		||||
        let services = rt.services;
 | 
			
		||||
        let mut names = self.names.clone();
 | 
			
		||||
 | 
			
		||||
        // on start futures
 | 
			
		||||
        if rt.onstart.is_empty() {
 | 
			
		||||
            // construct services
 | 
			
		||||
            let mut fut = Vec::new();
 | 
			
		||||
            for (token, ns) in services {
 | 
			
		||||
                let config = ServerConfig::new(self.names[&token].1);
 | 
			
		||||
                fut.push(ns.new_service(&config).map(move |service| (token, service)));
 | 
			
		||||
        // construct services
 | 
			
		||||
        async move {
 | 
			
		||||
            let services = rt.services;
 | 
			
		||||
            // TODO: Proper error handling here
 | 
			
		||||
            for f in rt.onstart.into_iter() {
 | 
			
		||||
                f.await;
 | 
			
		||||
            }
 | 
			
		||||
            let mut res = vec![];
 | 
			
		||||
            for (token, ns) in services.into_iter() {
 | 
			
		||||
                let config = ServerConfig::new(names[&token].1);
 | 
			
		||||
 | 
			
		||||
            Box::new(join_all(fut).map_err(|e| {
 | 
			
		||||
                error!("Can not construct service: {:?}", e);
 | 
			
		||||
            }))
 | 
			
		||||
        } else {
 | 
			
		||||
            let names = self.names.clone();
 | 
			
		||||
 | 
			
		||||
            // run onstart future and then construct services
 | 
			
		||||
            Box::new(
 | 
			
		||||
                join_all(rt.onstart)
 | 
			
		||||
                    .map_err(|e| {
 | 
			
		||||
                        error!("Can not construct service: {:?}", e);
 | 
			
		||||
                    })
 | 
			
		||||
                    .and_then(move |_| {
 | 
			
		||||
                        // construct services
 | 
			
		||||
                        let mut fut = Vec::new();
 | 
			
		||||
                        for (token, ns) in services {
 | 
			
		||||
                            let config = ServerConfig::new(names[&token].1);
 | 
			
		||||
                            fut.push(
 | 
			
		||||
                                ns.new_service(&config).map(move |service| (token, service)),
 | 
			
		||||
                            );
 | 
			
		||||
                        }
 | 
			
		||||
                        join_all(fut).map_err(|e| {
 | 
			
		||||
                            error!("Can not construct service: {:?}", e);
 | 
			
		||||
                        })
 | 
			
		||||
                    }),
 | 
			
		||||
            )
 | 
			
		||||
                let newserv = ns.new_service(&config);
 | 
			
		||||
                match newserv.await {
 | 
			
		||||
                    Ok(serv) => {
 | 
			
		||||
                        res.push((token, serv));
 | 
			
		||||
                    }
 | 
			
		||||
                    Err(e) => {
 | 
			
		||||
                        error!("Can not construct service {:?}", e);
 | 
			
		||||
                        return Err(e);
 | 
			
		||||
                    }
 | 
			
		||||
                };
 | 
			
		||||
            }
 | 
			
		||||
            return Ok(res);
 | 
			
		||||
        }
 | 
			
		||||
            .boxed_local()
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -181,7 +171,7 @@ fn not_configured(_: &mut ServiceRuntime) {
 | 
			
		|||
pub struct ServiceRuntime {
 | 
			
		||||
    names: HashMap<String, Token>,
 | 
			
		||||
    services: HashMap<Token, BoxedNewService>,
 | 
			
		||||
    onstart: Vec<Box<dyn Future<Item = (), Error = ()>>>,
 | 
			
		||||
    onstart: Vec<LocalBoxFuture<'static, ()>>,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl ServiceRuntime {
 | 
			
		||||
| 
						 | 
				
			
			@ -215,12 +205,15 @@ impl ServiceRuntime {
 | 
			
		|||
    {
 | 
			
		||||
        // let name = name.to_owned();
 | 
			
		||||
        if let Some(token) = self.names.get(name) {
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
            self.services.insert(
 | 
			
		||||
                token.clone(),
 | 
			
		||||
                Box::new(ServiceFactory {
 | 
			
		||||
                    inner: service.into_new_service(),
 | 
			
		||||
                }),
 | 
			
		||||
            );
 | 
			
		||||
                 token.clone(),
 | 
			
		||||
                 Box::new(ServiceFactory {
 | 
			
		||||
                     inner: service.into_new_service(),
 | 
			
		||||
                 }),
 | 
			
		||||
             );
 | 
			
		||||
 | 
			
		||||
        } else {
 | 
			
		||||
            panic!("Unknown service: {:?}", name);
 | 
			
		||||
        }
 | 
			
		||||
| 
						 | 
				
			
			@ -229,9 +222,9 @@ impl ServiceRuntime {
 | 
			
		|||
    /// Execute future before services initialization.
 | 
			
		||||
    pub fn on_start<F>(&mut self, fut: F)
 | 
			
		||||
    where
 | 
			
		||||
        F: Future<Item = (), Error = ()> + 'static,
 | 
			
		||||
        F: Future<Output = ()> + 'static,
 | 
			
		||||
    {
 | 
			
		||||
        self.onstart.push(Box::new(fut))
 | 
			
		||||
        self.onstart.push(fut.boxed_local())
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -243,7 +236,7 @@ type BoxedNewService = Box<
 | 
			
		|||
        InitError = (),
 | 
			
		||||
        Config = ServerConfig,
 | 
			
		||||
        Service = BoxedServerService,
 | 
			
		||||
        Future = Box<dyn Future<Item = BoxedServerService, Error = ()>>,
 | 
			
		||||
        Future = LocalBoxFuture<'static, Result<BoxedServerService, ()>>,
 | 
			
		||||
    >,
 | 
			
		||||
>;
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -265,12 +258,18 @@ where
 | 
			
		|||
    type InitError = ();
 | 
			
		||||
    type Config = ServerConfig;
 | 
			
		||||
    type Service = BoxedServerService;
 | 
			
		||||
    type Future = Box<dyn Future<Item = BoxedServerService, Error = ()>>;
 | 
			
		||||
    type Future = LocalBoxFuture<'static, Result<BoxedServerService, ()>>;
 | 
			
		||||
 | 
			
		||||
    // Box<dyn Future<Output=Result<Vec<(Token, BoxedServerService)>, ()>>>;
 | 
			
		||||
 | 
			
		||||
    fn new_service(&self, cfg: &ServerConfig) -> Self::Future {
 | 
			
		||||
        Box::new(self.inner.new_service(cfg).map_err(|_| ()).map(|s| {
 | 
			
		||||
            let service: BoxedServerService = Box::new(StreamService::new(s));
 | 
			
		||||
            service
 | 
			
		||||
        }))
 | 
			
		||||
        let fut = self.inner.new_service(cfg);
 | 
			
		||||
        async move {
 | 
			
		||||
            return match fut.await {
 | 
			
		||||
                Ok(s) => Ok(Box::new(StreamService::new(s)) as BoxedServerService),
 | 
			
		||||
                Err(e) => Err(()),
 | 
			
		||||
            };
 | 
			
		||||
        }
 | 
			
		||||
            .boxed_local()
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -1,7 +1,8 @@
 | 
			
		|||
use std::cell::Cell;
 | 
			
		||||
use std::rc::Rc;
 | 
			
		||||
 | 
			
		||||
use futures::task::AtomicTask;
 | 
			
		||||
use futures::task::AtomicWaker;
 | 
			
		||||
use std::task;
 | 
			
		||||
 | 
			
		||||
#[derive(Clone)]
 | 
			
		||||
/// Simple counter with ability to notify task on reaching specific number
 | 
			
		||||
| 
						 | 
				
			
			@ -13,7 +14,7 @@ pub struct Counter(Rc<CounterInner>);
 | 
			
		|||
struct CounterInner {
 | 
			
		||||
    count: Cell<usize>,
 | 
			
		||||
    capacity: usize,
 | 
			
		||||
    task: AtomicTask,
 | 
			
		||||
    task: AtomicWaker,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl Counter {
 | 
			
		||||
| 
						 | 
				
			
			@ -22,7 +23,7 @@ impl Counter {
 | 
			
		|||
        Counter(Rc::new(CounterInner {
 | 
			
		||||
            capacity,
 | 
			
		||||
            count: Cell::new(0),
 | 
			
		||||
            task: AtomicTask::new(),
 | 
			
		||||
            task: AtomicWaker::new(),
 | 
			
		||||
        }))
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -31,8 +32,8 @@ impl Counter {
 | 
			
		|||
    }
 | 
			
		||||
 | 
			
		||||
    /// Check if counter is not at capacity
 | 
			
		||||
    pub fn available(&self) -> bool {
 | 
			
		||||
        self.0.available()
 | 
			
		||||
    pub fn available(&self, cx: &mut task::Context) -> bool {
 | 
			
		||||
        self.0.available(cx)
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /// Get total number of acquired counts
 | 
			
		||||
| 
						 | 
				
			
			@ -66,14 +67,14 @@ impl CounterInner {
 | 
			
		|||
        let num = self.count.get();
 | 
			
		||||
        self.count.set(num - 1);
 | 
			
		||||
        if num == self.capacity {
 | 
			
		||||
            self.task.notify();
 | 
			
		||||
            self.task.wake();
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    fn available(&self) -> bool {
 | 
			
		||||
    fn available(&self, cx: &mut task::Context) -> bool {
 | 
			
		||||
        let avail = self.count.get() < self.capacity;
 | 
			
		||||
        if !avail {
 | 
			
		||||
            self.task.register();
 | 
			
		||||
            self.task.register(cx.waker());
 | 
			
		||||
        }
 | 
			
		||||
        avail
 | 
			
		||||
    }
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -1,6 +1,6 @@
 | 
			
		|||
use futures::sync::mpsc::UnboundedSender;
 | 
			
		||||
use futures::sync::oneshot;
 | 
			
		||||
use futures::Future;
 | 
			
		||||
use futures::channel::mpsc::UnboundedSender;
 | 
			
		||||
use futures::channel::oneshot;
 | 
			
		||||
use futures::{Future, TryFutureExt};
 | 
			
		||||
 | 
			
		||||
use crate::builder::ServerBuilder;
 | 
			
		||||
use crate::signals::Signal;
 | 
			
		||||
| 
						 | 
				
			
			@ -43,14 +43,14 @@ impl Server {
 | 
			
		|||
    ///
 | 
			
		||||
    /// If socket contains some pending connection, they might be dropped.
 | 
			
		||||
    /// All opened connection remains active.
 | 
			
		||||
    pub fn pause(&self) -> impl Future<Item = (), Error = ()> {
 | 
			
		||||
    pub fn pause(&self) -> impl Future<Output = Result<(), ()>> {
 | 
			
		||||
        let (tx, rx) = oneshot::channel();
 | 
			
		||||
        let _ = self.0.unbounded_send(ServerCommand::Pause(tx));
 | 
			
		||||
        rx.map_err(|_| ())
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /// Resume accepting incoming connections
 | 
			
		||||
    pub fn resume(&self) -> impl Future<Item = (), Error = ()> {
 | 
			
		||||
    pub fn resume(&self) -> impl Future<Output = Result<(), ()>> {
 | 
			
		||||
        let (tx, rx) = oneshot::channel();
 | 
			
		||||
        let _ = self.0.unbounded_send(ServerCommand::Resume(tx));
 | 
			
		||||
        rx.map_err(|_| ())
 | 
			
		||||
| 
						 | 
				
			
			@ -59,7 +59,7 @@ impl Server {
 | 
			
		|||
    /// Stop incoming connection processing, stop all workers and exit.
 | 
			
		||||
    ///
 | 
			
		||||
    /// If server starts with `spawn()` method, then spawned thread get terminated.
 | 
			
		||||
    pub fn stop(&self, graceful: bool) -> impl Future<Item = (), Error = ()> {
 | 
			
		||||
    pub fn stop(&self, graceful: bool) -> impl Future<Output = Result<(), ()>> {
 | 
			
		||||
        let (tx, rx) = oneshot::channel();
 | 
			
		||||
        let _ = self.0.unbounded_send(ServerCommand::Stop {
 | 
			
		||||
            graceful,
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -4,14 +4,16 @@ use std::time::Duration;
 | 
			
		|||
 | 
			
		||||
use actix_rt::spawn;
 | 
			
		||||
use actix_server_config::{Io, ServerConfig};
 | 
			
		||||
use actix_service::{NewService, Service};
 | 
			
		||||
use futures::future::{err, ok, FutureResult};
 | 
			
		||||
use futures::{Future, Poll};
 | 
			
		||||
use actix_service::{NewService, Service, ServiceExt};
 | 
			
		||||
use futures::future::{err, ok, LocalBoxFuture, Ready};
 | 
			
		||||
use futures::{Future, FutureExt, Poll, StreamExt, TryFutureExt};
 | 
			
		||||
use log::error;
 | 
			
		||||
 | 
			
		||||
use super::Token;
 | 
			
		||||
use crate::counter::CounterGuard;
 | 
			
		||||
use crate::socket::{FromStream, StdStream};
 | 
			
		||||
use std::pin::Pin;
 | 
			
		||||
use std::task::Context;
 | 
			
		||||
 | 
			
		||||
/// Server message
 | 
			
		||||
pub(crate) enum ServerMessage {
 | 
			
		||||
| 
						 | 
				
			
			@ -34,7 +36,7 @@ pub(crate) trait InternalServiceFactory: Send {
 | 
			
		|||
 | 
			
		||||
    fn clone_factory(&self) -> Box<dyn InternalServiceFactory>;
 | 
			
		||||
 | 
			
		||||
    fn create(&self) -> Box<dyn Future<Item = Vec<(Token, BoxedServerService)>, Error = ()>>;
 | 
			
		||||
    fn create(&self) -> LocalBoxFuture<'static, Result<Vec<(Token, BoxedServerService)>, ()>>;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
pub(crate) type BoxedServerService = Box<
 | 
			
		||||
| 
						 | 
				
			
			@ -42,7 +44,7 @@ pub(crate) type BoxedServerService = Box<
 | 
			
		|||
        Request = (Option<CounterGuard>, ServerMessage),
 | 
			
		||||
        Response = (),
 | 
			
		||||
        Error = (),
 | 
			
		||||
        Future = FutureResult<(), ()>,
 | 
			
		||||
        Future = Ready<Result<(), ()>>,
 | 
			
		||||
    >,
 | 
			
		||||
>;
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -66,12 +68,20 @@ where
 | 
			
		|||
    type Request = (Option<CounterGuard>, ServerMessage);
 | 
			
		||||
    type Response = ();
 | 
			
		||||
    type Error = ();
 | 
			
		||||
    type Future = FutureResult<(), ()>;
 | 
			
		||||
    type Future = Ready<Result<(), ()>>;
 | 
			
		||||
 | 
			
		||||
    fn poll_ready(
 | 
			
		||||
        self: Pin<&mut Self>,
 | 
			
		||||
        ctx: &mut Context<'_>,
 | 
			
		||||
    ) -> Poll<Result<(), Self::Error>> {
 | 
			
		||||
        unimplemented!()
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /*
 | 
			
		||||
    fn poll_ready(&mut self) -> Poll<(), Self::Error> {
 | 
			
		||||
        self.service.poll_ready().map_err(|_| ())
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    */
 | 
			
		||||
    fn call(&mut self, (guard, req): (Option<CounterGuard>, ServerMessage)) -> Self::Future {
 | 
			
		||||
        match req {
 | 
			
		||||
            ServerMessage::Connect(stream) => {
 | 
			
		||||
| 
						 | 
				
			
			@ -80,10 +90,14 @@ where
 | 
			
		|||
                });
 | 
			
		||||
 | 
			
		||||
                if let Ok(stream) = stream {
 | 
			
		||||
                    spawn(self.service.call(Io::new(stream)).then(move |res| {
 | 
			
		||||
                        drop(guard);
 | 
			
		||||
                        res.map_err(|_| ()).map(|_| ())
 | 
			
		||||
                    }));
 | 
			
		||||
                    let f = self.service.call(Io::new(stream));
 | 
			
		||||
                    spawn(
 | 
			
		||||
                        async move {
 | 
			
		||||
                            f.await;
 | 
			
		||||
                            drop(guard);
 | 
			
		||||
                        }
 | 
			
		||||
                            .boxed_local(),
 | 
			
		||||
                    );
 | 
			
		||||
                    ok(())
 | 
			
		||||
                } else {
 | 
			
		||||
                    err(())
 | 
			
		||||
| 
						 | 
				
			
			@ -142,19 +156,19 @@ where
 | 
			
		|||
        })
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    fn create(&self) -> Box<dyn Future<Item = Vec<(Token, BoxedServerService)>, Error = ()>> {
 | 
			
		||||
    fn create(&self) -> LocalBoxFuture<'static, Result<Vec<(Token, BoxedServerService)>, ()>> {
 | 
			
		||||
        let token = self.token;
 | 
			
		||||
        let config = ServerConfig::new(self.addr);
 | 
			
		||||
        Box::new(
 | 
			
		||||
            self.inner
 | 
			
		||||
                .create()
 | 
			
		||||
                .new_service(&config)
 | 
			
		||||
                .map_err(|_| ())
 | 
			
		||||
                .map(move |inner| {
 | 
			
		||||
                    let service: BoxedServerService = Box::new(StreamService::new(inner));
 | 
			
		||||
                    vec![(token, service)]
 | 
			
		||||
                }),
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
        self.inner
 | 
			
		||||
            .create()
 | 
			
		||||
            .new_service(&config)
 | 
			
		||||
            .map_err(|_| ())
 | 
			
		||||
            .map_ok(move |inner| {
 | 
			
		||||
                let service: BoxedServerService = Box::new(StreamService::new(inner));
 | 
			
		||||
                vec![(token, service)]
 | 
			
		||||
            })
 | 
			
		||||
            .boxed_local()
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -167,7 +181,7 @@ impl InternalServiceFactory for Box<dyn InternalServiceFactory> {
 | 
			
		|||
        self.as_ref().clone_factory()
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    fn create(&self) -> Box<dyn Future<Item = Vec<(Token, BoxedServerService)>, Error = ()>> {
 | 
			
		||||
    fn create(&self) -> LocalBoxFuture<'static, Result<Vec<(Token, BoxedServerService)>, ()>> {
 | 
			
		||||
        self.as_ref().create()
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -1,10 +1,17 @@
 | 
			
		|||
use std::io;
 | 
			
		||||
 | 
			
		||||
use actix_rt::spawn;
 | 
			
		||||
use futures::stream::futures_unordered;
 | 
			
		||||
use futures::{Async, Future, Poll, Stream};
 | 
			
		||||
use futures::stream::{futures_unordered, FuturesUnordered, LocalBoxStream};
 | 
			
		||||
use futures::{
 | 
			
		||||
    Future, FutureExt, Poll, Stream, StreamExt, TryFutureExt, TryStream, TryStreamExt,
 | 
			
		||||
};
 | 
			
		||||
 | 
			
		||||
use crate::server::Server;
 | 
			
		||||
use actix_service::ServiceExt;
 | 
			
		||||
use futures::future::LocalBoxFuture;
 | 
			
		||||
use std::pin::Pin;
 | 
			
		||||
use std::task::Context;
 | 
			
		||||
use tokio_net::signal::unix::signal;
 | 
			
		||||
 | 
			
		||||
/// Different types of process signals
 | 
			
		||||
#[derive(PartialEq, Clone, Copy, Debug)]
 | 
			
		||||
| 
						 | 
				
			
			@ -27,14 +34,14 @@ pub(crate) struct Signals {
 | 
			
		|||
    streams: Vec<SigStream>,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type SigStream = Box<dyn Stream<Item = Signal, Error = io::Error>>;
 | 
			
		||||
type SigStream = LocalBoxStream<'static, Result<Signal, io::Error>>;
 | 
			
		||||
 | 
			
		||||
impl Signals {
 | 
			
		||||
    pub(crate) fn start(srv: Server) {
 | 
			
		||||
        let fut = {
 | 
			
		||||
            #[cfg(not(unix))]
 | 
			
		||||
            {
 | 
			
		||||
                tokio_signal::ctrl_c()
 | 
			
		||||
                tokio_net::signal::ctrl_c()
 | 
			
		||||
                    .map_err(|_| ())
 | 
			
		||||
                    .and_then(move |stream| Signals {
 | 
			
		||||
                        srv,
 | 
			
		||||
| 
						 | 
				
			
			@ -44,51 +51,79 @@ impl Signals {
 | 
			
		|||
 | 
			
		||||
            #[cfg(unix)]
 | 
			
		||||
            {
 | 
			
		||||
                use tokio_signal::unix;
 | 
			
		||||
                use tokio_net::signal::unix;
 | 
			
		||||
 | 
			
		||||
                let mut sigs: Vec<Box<dyn Future<Item = SigStream, Error = io::Error>>> =
 | 
			
		||||
                    Vec::new();
 | 
			
		||||
                sigs.push(Box::new(
 | 
			
		||||
                    tokio_signal::unix::Signal::new(tokio_signal::unix::SIGINT).map(|stream| {
 | 
			
		||||
                        let s: SigStream = Box::new(stream.map(|_| Signal::Int));
 | 
			
		||||
                        s
 | 
			
		||||
                    }),
 | 
			
		||||
                ));
 | 
			
		||||
                sigs.push(Box::new(
 | 
			
		||||
                    tokio_signal::unix::Signal::new(tokio_signal::unix::SIGHUP).map(
 | 
			
		||||
                        |stream: unix::Signal| {
 | 
			
		||||
                let mut sigs: Vec<_> = Vec::new();
 | 
			
		||||
 | 
			
		||||
                let mut SIG_MAP = [
 | 
			
		||||
                    (
 | 
			
		||||
                        tokio_net::signal::unix::SignalKind::interrupt(),
 | 
			
		||||
                        Signal::Int,
 | 
			
		||||
                    ),
 | 
			
		||||
                    (tokio_net::signal::unix::SignalKind::hangup(), Signal::Hup),
 | 
			
		||||
                    (
 | 
			
		||||
                        tokio_net::signal::unix::SignalKind::terminate(),
 | 
			
		||||
                        Signal::Term,
 | 
			
		||||
                    ),
 | 
			
		||||
                    (tokio_net::signal::unix::SignalKind::quit(), Signal::Quit),
 | 
			
		||||
                ];
 | 
			
		||||
 | 
			
		||||
                for (kind, sig) in SIG_MAP.into_iter() {
 | 
			
		||||
                    let sig = sig.clone();
 | 
			
		||||
                    let fut = signal(*kind).unwrap();
 | 
			
		||||
                    sigs.push(fut.map(move |_| Ok(sig)).boxed_local());
 | 
			
		||||
                }
 | 
			
		||||
                /* TODO: Finish rewriting this
 | 
			
		||||
                sigs.push(
 | 
			
		||||
                    tokio_net::signal::unix::signal(tokio_net::signal::si).unwrap()
 | 
			
		||||
                        .map(|stream| {
 | 
			
		||||
                            let s: SigStream = Box::new(stream.map(|_| Signal::Int));
 | 
			
		||||
                            s
 | 
			
		||||
                        }).boxed()
 | 
			
		||||
                );
 | 
			
		||||
                sigs.push(
 | 
			
		||||
 | 
			
		||||
                    tokio_net::signal::unix::signal(tokio_net::signal::unix::SignalKind::hangup()).unwrap()
 | 
			
		||||
                        .map(|stream: unix::Signal| {
 | 
			
		||||
                            let s: SigStream = Box::new(stream.map(|_| Signal::Hup));
 | 
			
		||||
                            s
 | 
			
		||||
                        },
 | 
			
		||||
                    ),
 | 
			
		||||
                ));
 | 
			
		||||
                sigs.push(Box::new(
 | 
			
		||||
                    tokio_signal::unix::Signal::new(tokio_signal::unix::SIGTERM).map(
 | 
			
		||||
                        |stream| {
 | 
			
		||||
                        }).boxed()
 | 
			
		||||
                );
 | 
			
		||||
                sigs.push(
 | 
			
		||||
                    tokio_net::signal::unix::signal(
 | 
			
		||||
                        tokio_net::signal::unix::SignalKind::terminate()
 | 
			
		||||
                    ).unwrap()
 | 
			
		||||
                        .map(|stream| {
 | 
			
		||||
                            let s: SigStream = Box::new(stream.map(|_| Signal::Term));
 | 
			
		||||
                            s
 | 
			
		||||
                        },
 | 
			
		||||
                    ),
 | 
			
		||||
                ));
 | 
			
		||||
                sigs.push(Box::new(
 | 
			
		||||
                    tokio_signal::unix::Signal::new(tokio_signal::unix::SIGQUIT).map(
 | 
			
		||||
                        |stream| {
 | 
			
		||||
                        }).boxed(),
 | 
			
		||||
                );
 | 
			
		||||
                sigs.push(
 | 
			
		||||
                    tokio_net::signal::unix::signal(
 | 
			
		||||
                        tokio_net::signal::unix::SignalKind::quit()
 | 
			
		||||
                    ).unwrap()
 | 
			
		||||
                        .map(|stream| {
 | 
			
		||||
                            let s: SigStream = Box::new(stream.map(|_| Signal::Quit));
 | 
			
		||||
                            s
 | 
			
		||||
                        },
 | 
			
		||||
                    ),
 | 
			
		||||
                ));
 | 
			
		||||
                futures_unordered(sigs)
 | 
			
		||||
                    .collect()
 | 
			
		||||
                    .map_err(|_| ())
 | 
			
		||||
                    .and_then(move |streams| Signals { srv, streams })
 | 
			
		||||
                        }).boxed()
 | 
			
		||||
                );
 | 
			
		||||
                */
 | 
			
		||||
 | 
			
		||||
                Signals { srv, streams: sigs }
 | 
			
		||||
            }
 | 
			
		||||
        };
 | 
			
		||||
        spawn(fut);
 | 
			
		||||
        spawn(async {});
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl Future for Signals {
 | 
			
		||||
    type Output = ();
 | 
			
		||||
 | 
			
		||||
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
 | 
			
		||||
        unimplemented!()
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /*
 | 
			
		||||
    type Item = ();
 | 
			
		||||
    type Error = ();
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -115,4 +150,5 @@ impl Future for Signals {
 | 
			
		|||
            Ok(Async::NotReady)
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
    */
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -1,8 +1,8 @@
 | 
			
		|||
use std::{fmt, io, net};
 | 
			
		||||
 | 
			
		||||
use tokio_io::{AsyncRead, AsyncWrite};
 | 
			
		||||
use tokio_reactor::Handle;
 | 
			
		||||
use tokio_tcp::TcpStream;
 | 
			
		||||
use tokio_net::driver::Handle;
 | 
			
		||||
use tokio_net::tcp::TcpStream;
 | 
			
		||||
 | 
			
		||||
pub(crate) enum StdListener {
 | 
			
		||||
    Tcp(net::TcpListener),
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -9,6 +9,8 @@ use tokio_io::{AsyncRead, AsyncWrite};
 | 
			
		|||
use crate::counter::{Counter, CounterGuard};
 | 
			
		||||
use crate::ssl::MAX_CONN_COUNTER;
 | 
			
		||||
use crate::{Io, Protocol, ServerConfig};
 | 
			
		||||
use std::pin::Pin;
 | 
			
		||||
use std::task::Context;
 | 
			
		||||
 | 
			
		||||
/// Support `SSL` connections via native-tls package
 | 
			
		||||
///
 | 
			
		||||
| 
						 | 
				
			
			@ -72,6 +74,18 @@ impl<T: AsyncRead + AsyncWrite, P> Service for NativeTlsAcceptorService<T, P> {
 | 
			
		|||
    type Error = Error;
 | 
			
		||||
    type Future = Accept<T, P>;
 | 
			
		||||
 | 
			
		||||
    fn poll_ready(
 | 
			
		||||
        self: Pin<&mut Self>,
 | 
			
		||||
        ctx: &mut Context<'_>,
 | 
			
		||||
    ) -> Poll<Result<(), Self::Error>> {
 | 
			
		||||
        if self.conns.available(ctx) {
 | 
			
		||||
            Ok(Async::Ready(()))
 | 
			
		||||
        } else {
 | 
			
		||||
            Ok(Async::NotReady)
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /*
 | 
			
		||||
    fn poll_ready(&mut self) -> Poll<(), Self::Error> {
 | 
			
		||||
        if self.conns.available() {
 | 
			
		||||
            Ok(Async::Ready(()))
 | 
			
		||||
| 
						 | 
				
			
			@ -79,6 +93,7 @@ impl<T: AsyncRead + AsyncWrite, P> Service for NativeTlsAcceptorService<T, P> {
 | 
			
		|||
            Ok(Async::NotReady)
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
    */
 | 
			
		||||
 | 
			
		||||
    fn call(&mut self, req: Self::Request) -> Self::Future {
 | 
			
		||||
        let (io, params, _) = req.into_parts();
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -1,14 +1,18 @@
 | 
			
		|||
use std::marker::PhantomData;
 | 
			
		||||
 | 
			
		||||
use actix_service::{NewService, Service};
 | 
			
		||||
use futures::{future::ok, future::FutureResult, Async, Future, Poll};
 | 
			
		||||
use openssl::ssl::{HandshakeError, SslAcceptor};
 | 
			
		||||
use futures::{future::ok, future::Ready, Future, FutureExt, Poll};
 | 
			
		||||
use openssl::ssl::SslAcceptor;
 | 
			
		||||
use tokio_io::{AsyncRead, AsyncWrite};
 | 
			
		||||
use tokio_openssl::{AcceptAsync, SslAcceptorExt, SslStream};
 | 
			
		||||
use tokio_openssl::{HandshakeError, SslStream};
 | 
			
		||||
 | 
			
		||||
use crate::counter::{Counter, CounterGuard};
 | 
			
		||||
use crate::ssl::MAX_CONN_COUNTER;
 | 
			
		||||
use crate::{Io, Protocol, ServerConfig};
 | 
			
		||||
use futures::future::LocalBoxFuture;
 | 
			
		||||
use std::io;
 | 
			
		||||
use std::pin::Pin;
 | 
			
		||||
use std::task::Context;
 | 
			
		||||
 | 
			
		||||
/// Support `SSL` connections via openssl package
 | 
			
		||||
///
 | 
			
		||||
| 
						 | 
				
			
			@ -37,14 +41,14 @@ impl<T: AsyncRead + AsyncWrite, P> Clone for OpensslAcceptor<T, P> {
 | 
			
		|||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl<T: AsyncRead + AsyncWrite, P> NewService for OpensslAcceptor<T, P> {
 | 
			
		||||
impl<T: AsyncRead + AsyncWrite + Unpin + 'static, P> NewService for OpensslAcceptor<T, P> {
 | 
			
		||||
    type Request = Io<T, P>;
 | 
			
		||||
    type Response = Io<SslStream<T>, P>;
 | 
			
		||||
    type Error = HandshakeError<T>;
 | 
			
		||||
    type Config = ServerConfig;
 | 
			
		||||
    type Service = OpensslAcceptorService<T, P>;
 | 
			
		||||
    type InitError = ();
 | 
			
		||||
    type Future = FutureResult<Self::Service, Self::InitError>;
 | 
			
		||||
    type Future = Ready<Result<Self::Service, Self::InitError>>;
 | 
			
		||||
 | 
			
		||||
    fn new_service(&self, cfg: &ServerConfig) -> Self::Future {
 | 
			
		||||
        cfg.set_secure();
 | 
			
		||||
| 
						 | 
				
			
			@ -65,12 +69,20 @@ pub struct OpensslAcceptorService<T, P> {
 | 
			
		|||
    io: PhantomData<(T, P)>,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl<T: AsyncRead + AsyncWrite, P> Service for OpensslAcceptorService<T, P> {
 | 
			
		||||
impl<T: AsyncRead + AsyncWrite + Unpin + 'static, P> Service for OpensslAcceptorService<T, P> {
 | 
			
		||||
    type Request = Io<T, P>;
 | 
			
		||||
    type Response = Io<SslStream<T>, P>;
 | 
			
		||||
    type Error = HandshakeError<T>;
 | 
			
		||||
    type Future = OpensslAcceptorServiceFut<T, P>;
 | 
			
		||||
 | 
			
		||||
    fn poll_ready(
 | 
			
		||||
        self: Pin<&mut Self>,
 | 
			
		||||
        ctx: &mut Context<'_>,
 | 
			
		||||
    ) -> Poll<Result<(), Self::Error>> {
 | 
			
		||||
        unimplemented!()
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /*
 | 
			
		||||
    fn poll_ready(&mut self) -> Poll<(), Self::Error> {
 | 
			
		||||
        if self.conns.available() {
 | 
			
		||||
            Ok(Async::Ready(()))
 | 
			
		||||
| 
						 | 
				
			
			@ -78,12 +90,18 @@ impl<T: AsyncRead + AsyncWrite, P> Service for OpensslAcceptorService<T, P> {
 | 
			
		|||
            Ok(Async::NotReady)
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
    */
 | 
			
		||||
 | 
			
		||||
    fn call(&mut self, req: Self::Request) -> Self::Future {
 | 
			
		||||
        let (io, params, _) = req.into_parts();
 | 
			
		||||
        let acc = self.acceptor.clone();
 | 
			
		||||
        OpensslAcceptorServiceFut {
 | 
			
		||||
            _guard: self.conns.get(),
 | 
			
		||||
            fut: SslAcceptorExt::accept_async(&self.acceptor, io),
 | 
			
		||||
            fut: async move {
 | 
			
		||||
                let acc = acc;
 | 
			
		||||
                tokio_openssl::accept(&acc, io).await
 | 
			
		||||
            }
 | 
			
		||||
                .boxed_local::<'static>(),
 | 
			
		||||
            params: Some(params),
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
| 
						 | 
				
			
			@ -93,17 +111,24 @@ pub struct OpensslAcceptorServiceFut<T, P>
 | 
			
		|||
where
 | 
			
		||||
    T: AsyncRead + AsyncWrite,
 | 
			
		||||
{
 | 
			
		||||
    fut: AcceptAsync<T>,
 | 
			
		||||
    fut: LocalBoxFuture<'static, Result<SslStream<T>, HandshakeError<T>>>,
 | 
			
		||||
    params: Option<P>,
 | 
			
		||||
    _guard: CounterGuard,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl<T: AsyncRead + AsyncWrite, P> Future for OpensslAcceptorServiceFut<T, P> {
 | 
			
		||||
    type Output = Result<Io<SslStream<T>, P>, HandshakeError<T>>;
 | 
			
		||||
 | 
			
		||||
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
 | 
			
		||||
        unimplemented!()
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /*
 | 
			
		||||
    type Item = Io<SslStream<T>, P>;
 | 
			
		||||
    type Error = HandshakeError<T>;
 | 
			
		||||
 | 
			
		||||
    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
 | 
			
		||||
        let io = futures::try_ready!(self.fut.poll());
 | 
			
		||||
        let io = futures::ready!(self.fut.poll())?;
 | 
			
		||||
        let proto = if let Some(protos) = io.get_ref().ssl().selected_alpn_protocol() {
 | 
			
		||||
            const H2: &[u8] = b"\x02h2";
 | 
			
		||||
            const HTTP10: &[u8] = b"\x08http/1.0";
 | 
			
		||||
| 
						 | 
				
			
			@ -127,4 +152,5 @@ impl<T: AsyncRead + AsyncWrite, P> Future for OpensslAcceptorServiceFut<T, P> {
 | 
			
		|||
            proto,
 | 
			
		||||
        )))
 | 
			
		||||
    }
 | 
			
		||||
    */
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -11,6 +11,8 @@ use tokio_rustls::{Accept, TlsAcceptor, TlsStream};
 | 
			
		|||
use crate::counter::{Counter, CounterGuard};
 | 
			
		||||
use crate::ssl::MAX_CONN_COUNTER;
 | 
			
		||||
use crate::{Io, Protocol, ServerConfig as SrvConfig};
 | 
			
		||||
use std::pin::Pin;
 | 
			
		||||
use std::task::Context;
 | 
			
		||||
 | 
			
		||||
/// Support `SSL` connections via rustls package
 | 
			
		||||
///
 | 
			
		||||
| 
						 | 
				
			
			@ -74,8 +76,11 @@ impl<T: AsyncRead + AsyncWrite, P> Service for RustlsAcceptorService<T, P> {
 | 
			
		|||
    type Error = io::Error;
 | 
			
		||||
    type Future = RustlsAcceptorServiceFut<T, P>;
 | 
			
		||||
 | 
			
		||||
    fn poll_ready(&mut self) -> Poll<(), Self::Error> {
 | 
			
		||||
        if self.conns.available() {
 | 
			
		||||
    fn poll_ready(
 | 
			
		||||
        self: Pin<&mut Self>,
 | 
			
		||||
        ctx: &mut Context<'_>,
 | 
			
		||||
    ) -> Poll<Result<(), Self::Error>> {
 | 
			
		||||
        if self.conns.available(cx) {
 | 
			
		||||
            Ok(Async::Ready(()))
 | 
			
		||||
        } else {
 | 
			
		||||
            Ok(Async::NotReady)
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -1,11 +1,11 @@
 | 
			
		|||
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
 | 
			
		||||
use std::sync::Arc;
 | 
			
		||||
use std::{mem, time};
 | 
			
		||||
use std::{mem, task, time};
 | 
			
		||||
 | 
			
		||||
use actix_rt::{spawn, Arbiter};
 | 
			
		||||
use futures::sync::mpsc::{UnboundedReceiver, UnboundedSender};
 | 
			
		||||
use futures::sync::oneshot;
 | 
			
		||||
use futures::{future, Async, Future, Poll, Stream};
 | 
			
		||||
use futures::channel::mpsc::{UnboundedReceiver, UnboundedSender};
 | 
			
		||||
use futures::channel::oneshot;
 | 
			
		||||
use futures::{future, Future, Poll, Stream, TryFutureExt};
 | 
			
		||||
use futures::{FutureExt, StreamExt};
 | 
			
		||||
use log::{error, info, trace};
 | 
			
		||||
use tokio_timer::{sleep, Delay};
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -14,6 +14,10 @@ use crate::counter::Counter;
 | 
			
		|||
use crate::services::{BoxedServerService, InternalServiceFactory, ServerMessage};
 | 
			
		||||
use crate::socket::{SocketAddr, StdStream};
 | 
			
		||||
use crate::Token;
 | 
			
		||||
use actix_rt::spawn;
 | 
			
		||||
use futures::future::{LocalBoxFuture, MapOk};
 | 
			
		||||
use std::pin::Pin;
 | 
			
		||||
use std::task::Context;
 | 
			
		||||
 | 
			
		||||
pub(crate) struct WorkerCommand(Conn);
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -153,31 +157,36 @@ impl Worker {
 | 
			
		|||
            state: WorkerState::Unavailable(Vec::new()),
 | 
			
		||||
        });
 | 
			
		||||
 | 
			
		||||
        let mut fut = Vec::new();
 | 
			
		||||
        let mut fut: Vec<MapOk<LocalBoxFuture<'static, _>, _>> = Vec::new();
 | 
			
		||||
        for (idx, factory) in wrk.factories.iter().enumerate() {
 | 
			
		||||
            fut.push(factory.create().map(move |res| {
 | 
			
		||||
                res.into_iter()
 | 
			
		||||
                    .map(|(t, s)| (idx, t, s))
 | 
			
		||||
            fut.push(factory.create().map_ok(move |r| {
 | 
			
		||||
                r.into_iter()
 | 
			
		||||
                    .map(|(t, s): (Token, _)| (idx, t, s))
 | 
			
		||||
                    .collect::<Vec<_>>()
 | 
			
		||||
            }));
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        spawn(
 | 
			
		||||
            future::join_all(fut)
 | 
			
		||||
                .map_err(|e| {
 | 
			
		||||
                    error!("Can not start worker: {:?}", e);
 | 
			
		||||
                    Arbiter::current().stop();
 | 
			
		||||
                })
 | 
			
		||||
                .and_then(move |services| {
 | 
			
		||||
                    for item in services {
 | 
			
		||||
                        for (idx, token, service) in item {
 | 
			
		||||
                            while token.0 >= wrk.services.len() {
 | 
			
		||||
                                wrk.services.push(None);
 | 
			
		||||
            async {
 | 
			
		||||
                let mut res = future::join_all(fut).await;
 | 
			
		||||
                let res: Result<Vec<_>, _> = res.into_iter().collect();
 | 
			
		||||
                match res {
 | 
			
		||||
                    Ok(services) => {
 | 
			
		||||
                        for item in services {
 | 
			
		||||
                            for (idx, token, service) in item {
 | 
			
		||||
                                while token.0 >= wrk.services.len() {
 | 
			
		||||
                                    wrk.services.push(None);
 | 
			
		||||
                                }
 | 
			
		||||
                            }
 | 
			
		||||
                            wrk.services[token.0] = Some((idx, service));
 | 
			
		||||
                        }
 | 
			
		||||
                        Ok::<_, ()>(wrk);
 | 
			
		||||
                    }
 | 
			
		||||
                    wrk
 | 
			
		||||
                }),
 | 
			
		||||
                    Err(e) => {
 | 
			
		||||
                        //return Err(e);
 | 
			
		||||
                    }
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
                .boxed_local(),
 | 
			
		||||
        );
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -198,13 +207,18 @@ impl Worker {
 | 
			
		|||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    fn check_readiness(&mut self, trace: bool) -> Result<bool, (Token, usize)> {
 | 
			
		||||
    fn check_readiness(
 | 
			
		||||
        &mut self,
 | 
			
		||||
        trace: bool,
 | 
			
		||||
        cx: &mut Context<'_>,
 | 
			
		||||
    ) -> Result<bool, (Token, usize)> {
 | 
			
		||||
        /*
 | 
			
		||||
        let mut ready = self.conns.available();
 | 
			
		||||
        let mut failed = None;
 | 
			
		||||
        for (token, service) in &mut self.services.iter_mut().enumerate() {
 | 
			
		||||
            if let Some(service) = service {
 | 
			
		||||
                match service.1.poll_ready() {
 | 
			
		||||
                    Ok(Async::Ready(_)) => {
 | 
			
		||||
                match service.1.poll_ready(cx) {
 | 
			
		||||
                    Poll::Ready(Ok(_)) => {
 | 
			
		||||
                        if trace {
 | 
			
		||||
                            trace!(
 | 
			
		||||
                                "Service {:?} is available",
 | 
			
		||||
| 
						 | 
				
			
			@ -212,8 +226,8 @@ impl Worker {
 | 
			
		|||
                            );
 | 
			
		||||
                        }
 | 
			
		||||
                    }
 | 
			
		||||
                    Ok(Async::NotReady) => ready = false,
 | 
			
		||||
                    Err(_) => {
 | 
			
		||||
                    Poll::NotReady => ready = false,
 | 
			
		||||
                    Poll::Ready(Err(_)) => {
 | 
			
		||||
                        error!(
 | 
			
		||||
                            "Service {:?} readiness check returned error, restarting",
 | 
			
		||||
                            self.factories[service.0].name(Token(token))
 | 
			
		||||
| 
						 | 
				
			
			@ -227,7 +241,8 @@ impl Worker {
 | 
			
		|||
            Err(idx)
 | 
			
		||||
        } else {
 | 
			
		||||
            Ok(ready)
 | 
			
		||||
        }
 | 
			
		||||
        }*/
 | 
			
		||||
        unimplemented!()
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -238,15 +253,19 @@ enum WorkerState {
 | 
			
		|||
    Restarting(
 | 
			
		||||
        usize,
 | 
			
		||||
        Token,
 | 
			
		||||
        Box<dyn Future<Item = Vec<(Token, BoxedServerService)>, Error = ()>>,
 | 
			
		||||
        Box<dyn Future<Output = Result<Vec<(Token, BoxedServerService)>, ()>>>,
 | 
			
		||||
    ),
 | 
			
		||||
    Shutdown(Delay, Delay, oneshot::Sender<bool>),
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl Future for Worker {
 | 
			
		||||
    type Item = ();
 | 
			
		||||
    type Error = ();
 | 
			
		||||
    type Output = ();
 | 
			
		||||
 | 
			
		||||
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
 | 
			
		||||
        unimplemented!()
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /*
 | 
			
		||||
    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
 | 
			
		||||
        // `StopWorker` message handler
 | 
			
		||||
        if let Ok(Async::Ready(Some(StopCommand { graceful, result }))) = self.rx2.poll() {
 | 
			
		||||
| 
						 | 
				
			
			@ -435,4 +454,5 @@ impl Future for Worker {
 | 
			
		|||
            WorkerState::None => panic!(),
 | 
			
		||||
        };
 | 
			
		||||
    }
 | 
			
		||||
    */
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -4,11 +4,12 @@ use std::{net, thread, time};
 | 
			
		|||
 | 
			
		||||
use actix_codec::{BytesCodec, Framed};
 | 
			
		||||
use actix_server::{Io, Server, ServerConfig};
 | 
			
		||||
use actix_service::{new_service_cfg, service_fn, IntoService};
 | 
			
		||||
use actix_service::{new_service_cfg, service_fn, IntoService, ServiceExt};
 | 
			
		||||
use bytes::Bytes;
 | 
			
		||||
use futures::{Future, Sink};
 | 
			
		||||
use futures::{Future, FutureExt, Sink, SinkExt};
 | 
			
		||||
use net2::TcpBuilder;
 | 
			
		||||
use tokio_tcp::TcpStream;
 | 
			
		||||
use tokio::future::ok;
 | 
			
		||||
use tokio_net::tcp::TcpStream;
 | 
			
		||||
 | 
			
		||||
fn unused_addr() -> net::SocketAddr {
 | 
			
		||||
    let addr: net::SocketAddr = "127.0.0.1:0".parse().unwrap();
 | 
			
		||||
| 
						 | 
				
			
			@ -30,7 +31,7 @@ fn test_bind() {
 | 
			
		|||
            .bind("test", addr, move || {
 | 
			
		||||
                new_service_cfg(move |cfg: &ServerConfig| {
 | 
			
		||||
                    assert_eq!(cfg.local_addr(), addr);
 | 
			
		||||
                    Ok::<_, ()>((|_| Ok::<_, ()>(())).into_service())
 | 
			
		||||
                    ok::<_, ()>((|_| ok::<_, ()>(())).into_service())
 | 
			
		||||
                })
 | 
			
		||||
            })
 | 
			
		||||
            .unwrap()
 | 
			
		||||
| 
						 | 
				
			
			@ -54,7 +55,7 @@ fn test_bind_no_config() {
 | 
			
		|||
    let h = thread::spawn(move || {
 | 
			
		||||
        let sys = actix_rt::System::new("test");
 | 
			
		||||
        let srv = Server::build()
 | 
			
		||||
            .bind("test", addr, move || service_fn(|_| Ok::<_, ()>(())))
 | 
			
		||||
            .bind("test", addr, move || service_fn(|_| ok::<_, ()>(())))
 | 
			
		||||
            .unwrap()
 | 
			
		||||
            .start();
 | 
			
		||||
        let _ = tx.send((srv, actix_rt::System::current()));
 | 
			
		||||
| 
						 | 
				
			
			@ -78,7 +79,7 @@ fn test_listen() {
 | 
			
		|||
            .listen("test", lst, move || {
 | 
			
		||||
                new_service_cfg(move |cfg: &ServerConfig| {
 | 
			
		||||
                    assert_eq!(cfg.local_addr(), addr);
 | 
			
		||||
                    Ok::<_, ()>((|_| Ok::<_, ()>(())).into_service())
 | 
			
		||||
                    ok::<_, ()>((|_| ok::<_, ()>(())).into_service())
 | 
			
		||||
                })
 | 
			
		||||
            })
 | 
			
		||||
            .unwrap()
 | 
			
		||||
| 
						 | 
				
			
			@ -107,14 +108,15 @@ fn test_start() {
 | 
			
		|||
            .bind("test", addr, move || {
 | 
			
		||||
                new_service_cfg(move |cfg: &ServerConfig| {
 | 
			
		||||
                    assert_eq!(cfg.local_addr(), addr);
 | 
			
		||||
                    Ok::<_, ()>(
 | 
			
		||||
                        (|io: Io<TcpStream>| {
 | 
			
		||||
                            Framed::new(io.into_parts().0, BytesCodec)
 | 
			
		||||
                                .send(Bytes::from_static(b"test"))
 | 
			
		||||
                                .then(|_| Ok::<_, ()>(()))
 | 
			
		||||
                        })
 | 
			
		||||
                        .into_service(),
 | 
			
		||||
                    )
 | 
			
		||||
 | 
			
		||||
                    let serv_creator = (move |io: Io<TcpStream>| async {
 | 
			
		||||
                        panic!("Stream");
 | 
			
		||||
                        let mut f = Framed::new(io.into_parts().0, BytesCodec);
 | 
			
		||||
                        f.send(Bytes::from_static(b"test")).await.unwrap();
 | 
			
		||||
                        Ok::<_, ()>(())
 | 
			
		||||
                    }).into_service();
 | 
			
		||||
 | 
			
		||||
                    ok::<_, ()>(serv_creator)
 | 
			
		||||
                })
 | 
			
		||||
            })
 | 
			
		||||
            .unwrap()
 | 
			
		||||
| 
						 | 
				
			
			@ -125,7 +127,7 @@ fn test_start() {
 | 
			
		|||
    });
 | 
			
		||||
    let (srv, sys) = rx.recv().unwrap();
 | 
			
		||||
 | 
			
		||||
    let mut buf = [0u8; 4];
 | 
			
		||||
    let mut buf = [1u8; 4];
 | 
			
		||||
    let mut conn = net::TcpStream::connect(addr).unwrap();
 | 
			
		||||
    let _ = conn.read_exact(&mut buf);
 | 
			
		||||
    assert_eq!(buf, b"test"[..]);
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -1,6 +1,6 @@
 | 
			
		|||
[package]
 | 
			
		||||
name = "actix-test-server"
 | 
			
		||||
version = "0.2.3"
 | 
			
		||||
version = "0.2.2"
 | 
			
		||||
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
 | 
			
		||||
description = "Actix test server"
 | 
			
		||||
keywords = ["network", "framework", "async", "futures"]
 | 
			
		||||
| 
						 | 
				
			
			@ -11,22 +11,49 @@ categories = ["network-programming", "asynchronous"]
 | 
			
		|||
license = "MIT/Apache-2.0"
 | 
			
		||||
exclude = [".gitignore", ".travis.yml", ".cargo/config", "appveyor.yml"]
 | 
			
		||||
edition = "2018"
 | 
			
		||||
workspace = ".."
 | 
			
		||||
 | 
			
		||||
[package.metadata.docs.rs]
 | 
			
		||||
features = ["ssl", "tls", "rust-tls"]
 | 
			
		||||
 | 
			
		||||
[lib]
 | 
			
		||||
name = "actix_test_server"
 | 
			
		||||
path = "src/lib.rs"
 | 
			
		||||
 | 
			
		||||
[features]
 | 
			
		||||
default = []
 | 
			
		||||
 | 
			
		||||
# tls
 | 
			
		||||
tls = ["native-tls", "actix-server/tls"]
 | 
			
		||||
 | 
			
		||||
# openssl
 | 
			
		||||
ssl = ["openssl", "actix-server/ssl"]
 | 
			
		||||
 | 
			
		||||
# rustls
 | 
			
		||||
rust-tls = ["rustls", "tokio-rustls", "webpki", "webpki-roots"]
 | 
			
		||||
 | 
			
		||||
[dependencies]
 | 
			
		||||
actix-rt = "0.2.1"
 | 
			
		||||
actix-server = "0.5.0"
 | 
			
		||||
actix-server-config = "0.1.0"
 | 
			
		||||
actix-testing = "0.1.0"
 | 
			
		||||
 | 
			
		||||
log = "0.4"
 | 
			
		||||
net2 = "0.2"
 | 
			
		||||
futures = "0.1"
 | 
			
		||||
tokio-tcp = "0.1"
 | 
			
		||||
tokio-reactor = "0.1"
 | 
			
		||||
futures = { package = "futures-preview", version = "0.3.0-alpha.18"}
 | 
			
		||||
tokio-io = "0.2.0-alpha.4"
 | 
			
		||||
tokio-net = "0.2.0-alpha.4"
 | 
			
		||||
 | 
			
		||||
# native-tls
 | 
			
		||||
native-tls = { version="0.2", optional = true }
 | 
			
		||||
 | 
			
		||||
# openssl
 | 
			
		||||
openssl = { version="0.10", optional = true }
 | 
			
		||||
 | 
			
		||||
#rustls
 | 
			
		||||
rustls = { version = "^0.16", optional = true }
 | 
			
		||||
tokio-rustls = { version = "^0.12.0-alpha.2", optional = true }
 | 
			
		||||
webpki = { version = "0.21", optional = true }
 | 
			
		||||
webpki-roots = { version = "0.17", optional = true }
 | 
			
		||||
 | 
			
		||||
[dev-dependencies]
 | 
			
		||||
actix-service = "0.4.0"
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -1,2 +1,149 @@
 | 
			
		|||
//! Various helpers for Actix applications to use during testing.
 | 
			
		||||
pub use actix_testing::*;
 | 
			
		||||
use std::sync::mpsc;
 | 
			
		||||
use std::{net, thread};
 | 
			
		||||
 | 
			
		||||
use actix_rt::{Runtime, System};
 | 
			
		||||
use actix_server::{Server, StreamServiceFactory};
 | 
			
		||||
pub use actix_server_config::{Io, ServerConfig};
 | 
			
		||||
 | 
			
		||||
use futures::future::{lazy, Future, IntoFuture};
 | 
			
		||||
use net2::TcpBuilder;
 | 
			
		||||
use tokio_reactor::Handle;
 | 
			
		||||
use tokio_tcp::TcpStream;
 | 
			
		||||
 | 
			
		||||
/// The `TestServer` type.
 | 
			
		||||
///
 | 
			
		||||
/// `TestServer` is very simple test server that simplify process of writing
 | 
			
		||||
/// integration tests for actix-net applications.
 | 
			
		||||
///
 | 
			
		||||
/// # Examples
 | 
			
		||||
///
 | 
			
		||||
/// ```rust
 | 
			
		||||
/// use actix_service::{service_fn, IntoNewService};
 | 
			
		||||
/// use actix_test_server::TestServer;
 | 
			
		||||
///
 | 
			
		||||
/// fn main() {
 | 
			
		||||
///     let srv = TestServer::with(|| service_fn(
 | 
			
		||||
///         |sock| {
 | 
			
		||||
///             println!("New connection: {:?}", sock);
 | 
			
		||||
///             Ok::<_, ()>(())
 | 
			
		||||
///         }
 | 
			
		||||
///     ));
 | 
			
		||||
///
 | 
			
		||||
///     println!("SOCKET: {:?}", srv.connect());
 | 
			
		||||
/// }
 | 
			
		||||
/// ```
 | 
			
		||||
pub struct TestServer;
 | 
			
		||||
 | 
			
		||||
/// Test server runstime
 | 
			
		||||
pub struct TestServerRuntime {
 | 
			
		||||
    addr: net::SocketAddr,
 | 
			
		||||
    host: String,
 | 
			
		||||
    port: u16,
 | 
			
		||||
    rt: Runtime,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl TestServer {
 | 
			
		||||
    /// Start new test server with application factory
 | 
			
		||||
    pub fn with<F: StreamServiceFactory>(factory: F) -> TestServerRuntime {
 | 
			
		||||
        let (tx, rx) = mpsc::channel();
 | 
			
		||||
 | 
			
		||||
        // run server in separate thread
 | 
			
		||||
        thread::spawn(move || {
 | 
			
		||||
            let sys = System::new("actix-test-server");
 | 
			
		||||
            let tcp = net::TcpListener::bind("127.0.0.1:0").unwrap();
 | 
			
		||||
            let local_addr = tcp.local_addr().unwrap();
 | 
			
		||||
 | 
			
		||||
            Server::build()
 | 
			
		||||
                .listen("test", tcp, factory)?
 | 
			
		||||
                .workers(1)
 | 
			
		||||
                .disable_signals()
 | 
			
		||||
                .start();
 | 
			
		||||
 | 
			
		||||
            tx.send((System::current(), local_addr)).unwrap();
 | 
			
		||||
            sys.run()
 | 
			
		||||
        });
 | 
			
		||||
 | 
			
		||||
        let (system, addr) = rx.recv().unwrap();
 | 
			
		||||
        System::set_current(system);
 | 
			
		||||
 | 
			
		||||
        let rt = Runtime::new().unwrap();
 | 
			
		||||
        let host = format!("{}", addr.ip());
 | 
			
		||||
        let port = addr.port();
 | 
			
		||||
 | 
			
		||||
        TestServerRuntime {
 | 
			
		||||
            addr,
 | 
			
		||||
            rt,
 | 
			
		||||
            host,
 | 
			
		||||
            port,
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /// Get firat available unused local address
 | 
			
		||||
    pub fn unused_addr() -> net::SocketAddr {
 | 
			
		||||
        let addr: net::SocketAddr = "127.0.0.1:0".parse().unwrap();
 | 
			
		||||
        let socket = TcpBuilder::new_v4().unwrap();
 | 
			
		||||
        socket.bind(&addr).unwrap();
 | 
			
		||||
        socket.reuse_address(true).unwrap();
 | 
			
		||||
        let tcp = socket.to_tcp_listener().unwrap();
 | 
			
		||||
        tcp.local_addr().unwrap()
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl TestServerRuntime {
 | 
			
		||||
    /// Execute future on current runtime
 | 
			
		||||
    pub fn block_on<F, I, E>(&mut self, fut: F) -> Result<I, E>
 | 
			
		||||
    where
 | 
			
		||||
        F: Future<Item = I, Error = E>,
 | 
			
		||||
    {
 | 
			
		||||
        self.rt.block_on(fut)
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /// Runs the provided function, with runtime enabled.
 | 
			
		||||
    pub fn run_on<F, R>(&mut self, f: F) -> Result<R::Item, R::Error>
 | 
			
		||||
    where
 | 
			
		||||
        F: FnOnce() -> R,
 | 
			
		||||
        R: IntoFuture,
 | 
			
		||||
    {
 | 
			
		||||
        self.rt.block_on(lazy(|| f().into_future()))
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /// Spawn future to the current runtime
 | 
			
		||||
    pub fn spawn<F>(&mut self, fut: F)
 | 
			
		||||
    where
 | 
			
		||||
        F: Future<Item = (), Error = ()> + 'static,
 | 
			
		||||
    {
 | 
			
		||||
        self.rt.spawn(fut);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /// Test server host
 | 
			
		||||
    pub fn host(&self) -> &str {
 | 
			
		||||
        &self.host
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /// Test server port
 | 
			
		||||
    pub fn port(&self) -> u16 {
 | 
			
		||||
        self.port
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /// Get test server address
 | 
			
		||||
    pub fn addr(&self) -> net::SocketAddr {
 | 
			
		||||
        self.addr
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /// Stop http server
 | 
			
		||||
    fn stop(&mut self) {
 | 
			
		||||
        System::current().stop();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /// Connect to server, return tokio TcpStream
 | 
			
		||||
    pub fn connect(&self) -> std::io::Result<TcpStream> {
 | 
			
		||||
        TcpStream::from_std(net::TcpStream::connect(self.addr)?, &Handle::default())
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl Drop for TestServerRuntime {
 | 
			
		||||
    fn drop(&mut self) {
 | 
			
		||||
        self.stop()
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue