From 58230b15b9cd67a98e65a074652bd384e24757f6 Mon Sep 17 00:00:00 2001
From: Nikolay Kim <fafhrd91@gmail.com>
Date: Tue, 31 Jul 2018 19:51:26 -0700
Subject: [PATCH] use one thread for accept loop; refactor rust-tls support

---
 .travis.yml          |   6 +-
 src/server/accept.rs | 439 +++++++++++++++++++++++++++----------------
 src/server/mod.rs    |   6 +-
 src/server/srv.rs    |  57 +++---
 src/test.rs          |  65 ++++---
 tests/test_server.rs |  56 ++++++
 tests/test_ws.rs     |   9 +-
 7 files changed, 406 insertions(+), 232 deletions(-)

diff --git a/.travis.yml b/.travis.yml
index 54a86aa7..f03c9523 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -32,12 +32,12 @@ script:
   - |
     if [[ "$TRAVIS_RUST_VERSION" != "stable" ]]; then
     cargo clean
-    cargo test --features="alpn,tls" -- --nocapture
+    cargo test --features="alpn,tls,rust-tls" -- --nocapture
     fi
   - |
     if [[ "$TRAVIS_RUST_VERSION" == "stable" ]]; then
     RUSTFLAGS="--cfg procmacro2_semver_exempt" cargo install -f cargo-tarpaulin
-    cargo tarpaulin --features="alpn,tls" --out Xml --no-count
+    cargo tarpaulin --features="alpn,tls,rust-tls" --out Xml --no-count
     bash <(curl -s https://codecov.io/bash)
     echo "Uploaded code coverage"
     fi
@@ -46,7 +46,7 @@ script:
 after_success:
   - |
     if [[ "$TRAVIS_OS_NAME" == "linux" && "$TRAVIS_PULL_REQUEST" = "false" && "$TRAVIS_BRANCH" == "master" && "$TRAVIS_RUST_VERSION" == "beta" ]]; then
-      cargo doc --features "alpn, tls, session" --no-deps &&
+      cargo doc --features "alpn, tls, rust-tls, session" --no-deps &&
       echo "<meta http-equiv=refresh content=0;url=os_balloon/index.html>" > target/doc/index.html &&
       git clone https://github.com/davisp/ghp-import.git &&
       ./ghp-import/ghp_import.py -n -p -f -m "Documentation upload" -r https://"$GH_TOKEN"@github.com/"$TRAVIS_REPO_SLUG.git" target/doc &&
diff --git a/src/server/accept.rs b/src/server/accept.rs
index a91ca814..75280560 100644
--- a/src/server/accept.rs
+++ b/src/server/accept.rs
@@ -1,22 +1,16 @@
 use std::sync::mpsc as sync_mpsc;
-use std::time::Duration;
+use std::time::{Duration, Instant};
 use std::{io, net, thread};
 
-use futures::sync::mpsc;
+use futures::{sync::mpsc, Future};
 use mio;
 use slab::Slab;
+use tokio_timer::Delay;
 
-#[cfg(feature = "tls")]
-use native_tls::TlsAcceptor;
-
-#[cfg(feature = "alpn")]
-use openssl::ssl::{AlpnError, SslAcceptorBuilder};
-
-#[cfg(feature = "rust-tls")]
-use rustls::ServerConfig;
+use actix::{msgs::Execute, Arbiter, System};
 
 use super::srv::{ServerCommand, Socket};
-use super::worker::{Conn, SocketInfo};
+use super::worker::Conn;
 
 pub(crate) enum Command {
     Pause,
@@ -25,169 +19,43 @@ pub(crate) enum Command {
     Worker(usize, mpsc::UnboundedSender<Conn<net::TcpStream>>),
 }
 
+struct ServerSocketInfo {
+    addr: net::SocketAddr,
+    token: usize,
+    sock: mio::net::TcpListener,
+    timeout: Option<Instant>,
+}
+
+struct Accept {
+    poll: mio::Poll,
+    rx: sync_mpsc::Receiver<Command>,
+    sockets: Slab<ServerSocketInfo>,
+    workers: Vec<(usize, mpsc::UnboundedSender<Conn<net::TcpStream>>)>,
+    _reg: mio::Registration,
+    next: usize,
+    srv: mpsc::UnboundedSender<ServerCommand>,
+    timer: (mio::Registration, mio::SetReadiness),
+}
+
+const CMD: mio::Token = mio::Token(0);
+const TIMER: mio::Token = mio::Token(1);
+
 pub(crate) fn start_accept_thread(
-    token: usize, sock: Socket, srv: mpsc::UnboundedSender<ServerCommand>,
-    socks: Slab<SocketInfo>,
-    mut workers: Vec<(usize, mpsc::UnboundedSender<Conn<net::TcpStream>>)>,
+    socks: Vec<(usize, Socket)>, srv: mpsc::UnboundedSender<ServerCommand>,
+    workers: Vec<(usize, mpsc::UnboundedSender<Conn<net::TcpStream>>)>,
 ) -> (mio::SetReadiness, sync_mpsc::Sender<Command>) {
     let (tx, rx) = sync_mpsc::channel();
     let (reg, readiness) = mio::Registration::new2();
 
+    let sys = System::current();
+
     // start accept thread
     #[cfg_attr(feature = "cargo-clippy", allow(cyclomatic_complexity))]
     let _ = thread::Builder::new()
-        .name(format!("Accept on {}", sock.addr))
+        .name("actix-web accept loop".to_owned())
         .spawn(move || {
-            const SRV: mio::Token = mio::Token(0);
-            const CMD: mio::Token = mio::Token(1);
-
-            let addr = sock.addr;
-            let mut server = Some(
-                mio::net::TcpListener::from_std(sock.lst)
-                    .expect("Can not create mio::net::TcpListener"),
-            );
-
-            // Create a poll instance
-            let poll = match mio::Poll::new() {
-                Ok(poll) => poll,
-                Err(err) => panic!("Can not create mio::Poll: {}", err),
-            };
-
-            // Start listening for incoming connections
-            if let Some(ref srv) = server {
-                if let Err(err) =
-                    poll.register(srv, SRV, mio::Ready::readable(), mio::PollOpt::edge())
-                {
-                    panic!("Can not register io: {}", err);
-                }
-            }
-
-            // Start listening for incoming commands
-            if let Err(err) =
-                poll.register(&reg, CMD, mio::Ready::readable(), mio::PollOpt::edge())
-            {
-                panic!("Can not register Registration: {}", err);
-            }
-
-            // Create storage for events
-            let mut events = mio::Events::with_capacity(128);
-
-            // Sleep on error
-            let sleep = Duration::from_millis(100);
-
-            let mut next = 0;
-            loop {
-                if let Err(err) = poll.poll(&mut events, None) {
-                    panic!("Poll error: {}", err);
-                }
-
-                for event in events.iter() {
-                    match event.token() {
-                        SRV => if let Some(ref server) = server {
-                            loop {
-                                match server.accept_std() {
-                                    Ok((io, addr)) => {
-                                        let mut msg = Conn {
-                                            io,
-                                            token,
-                                            peer: Some(addr),
-                                            http2: false,
-                                        };
-                                        while !workers.is_empty() {
-                                            match workers[next].1.unbounded_send(msg) {
-                                                Ok(_) => (),
-                                                Err(err) => {
-                                                    let _ = srv.unbounded_send(
-                                                        ServerCommand::WorkerDied(
-                                                            workers[next].0,
-                                                            socks.clone(),
-                                                        ),
-                                                    );
-                                                    msg = err.into_inner();
-                                                    workers.swap_remove(next);
-                                                    if workers.is_empty() {
-                                                        error!("No workers");
-                                                        thread::sleep(sleep);
-                                                        break;
-                                                    } else if workers.len() <= next {
-                                                        next = 0;
-                                                    }
-                                                    continue;
-                                                }
-                                            }
-                                            next = (next + 1) % workers.len();
-                                            break;
-                                        }
-                                    }
-                                    Err(ref e)
-                                        if e.kind() == io::ErrorKind::WouldBlock =>
-                                    {
-                                        break
-                                    }
-                                    Err(ref e) if connection_error(e) => continue,
-                                    Err(e) => {
-                                        error!("Error accepting connection: {}", e);
-                                        // sleep after error
-                                        thread::sleep(sleep);
-                                        break;
-                                    }
-                                }
-                            }
-                        },
-                        CMD => match rx.try_recv() {
-                            Ok(cmd) => match cmd {
-                                Command::Pause => if let Some(ref server) = server {
-                                    if let Err(err) = poll.deregister(server) {
-                                        error!(
-                                            "Can not deregister server socket {}",
-                                            err
-                                        );
-                                    } else {
-                                        info!(
-                                            "Paused accepting connections on {}",
-                                            addr
-                                        );
-                                    }
-                                },
-                                Command::Resume => {
-                                    if let Some(ref server) = server {
-                                        if let Err(err) = poll.register(
-                                            server,
-                                            SRV,
-                                            mio::Ready::readable(),
-                                            mio::PollOpt::edge(),
-                                        ) {
-                                            error!("Can not resume socket accept process: {}", err);
-                                        } else {
-                                            info!("Accepting connections on {} has been resumed",
-                                                  addr);
-                                        }
-                                    }
-                                }
-                                Command::Stop => {
-                                    if let Some(server) = server.take() {
-                                        let _ = poll.deregister(&server);
-                                    }
-                                    return;
-                                }
-                                Command::Worker(idx, addr) => {
-                                    workers.push((idx, addr));
-                                }
-                            },
-                            Err(err) => match err {
-                                sync_mpsc::TryRecvError::Empty => (),
-                                sync_mpsc::TryRecvError::Disconnected => {
-                                    if let Some(server) = server.take() {
-                                        let _ = poll.deregister(&server);
-                                    }
-                                    return;
-                                }
-                            },
-                        },
-                        _ => unreachable!(),
-                    }
-                }
-            }
+            System::set_current(sys);
+            Accept::new(reg, rx, socks, workers, srv).poll();
         });
 
     (readiness, tx)
@@ -205,3 +73,244 @@ fn connection_error(e: &io::Error) -> bool {
         || e.kind() == io::ErrorKind::ConnectionAborted
         || e.kind() == io::ErrorKind::ConnectionReset
 }
+
+impl Accept {
+    fn new(
+        _reg: mio::Registration, rx: sync_mpsc::Receiver<Command>,
+        socks: Vec<(usize, Socket)>,
+        workers: Vec<(usize, mpsc::UnboundedSender<Conn<net::TcpStream>>)>,
+        srv: mpsc::UnboundedSender<ServerCommand>,
+    ) -> Accept {
+        // Create a poll instance
+        let poll = match mio::Poll::new() {
+            Ok(poll) => poll,
+            Err(err) => panic!("Can not create mio::Poll: {}", err),
+        };
+
+        // Start listening for incoming commands
+        if let Err(err) =
+            poll.register(&_reg, CMD, mio::Ready::readable(), mio::PollOpt::edge())
+        {
+            panic!("Can not register Registration: {}", err);
+        }
+
+        // Start accept
+        let mut sockets = Slab::new();
+        for (stoken, sock) in socks {
+            let server = mio::net::TcpListener::from_std(sock.lst)
+                .expect("Can not create mio::net::TcpListener");
+
+            let entry = sockets.vacant_entry();
+            let token = entry.key();
+
+            // Start listening for incoming connections
+            if let Err(err) = poll.register(
+                &server,
+                mio::Token(token + 1000),
+                mio::Ready::readable(),
+                mio::PollOpt::edge(),
+            ) {
+                panic!("Can not register io: {}", err);
+            }
+
+            entry.insert(ServerSocketInfo {
+                token: stoken,
+                addr: sock.addr,
+                sock: server,
+                timeout: None,
+            });
+        }
+
+        // Timer
+        let (tm, tmr) = mio::Registration::new2();
+        if let Err(err) =
+            poll.register(&tm, TIMER, mio::Ready::readable(), mio::PollOpt::edge())
+        {
+            panic!("Can not register Registration: {}", err);
+        }
+
+        Accept {
+            poll,
+            rx,
+            _reg,
+            sockets,
+            workers,
+            srv,
+            next: 0,
+            timer: (tm, tmr),
+        }
+    }
+
+    fn poll(&mut self) {
+        // Create storage for events
+        let mut events = mio::Events::with_capacity(128);
+
+        loop {
+            if let Err(err) = self.poll.poll(&mut events, None) {
+                panic!("Poll error: {}", err);
+            }
+
+            for event in events.iter() {
+                let token = event.token();
+                match token {
+                    CMD => if !self.process_cmd() {
+                        return;
+                    },
+                    TIMER => self.process_timer(),
+                    _ => self.accept(token),
+                }
+            }
+        }
+    }
+
+    fn process_timer(&mut self) {
+        let now = Instant::now();
+        for (token, info) in self.sockets.iter_mut() {
+            if let Some(inst) = info.timeout.take() {
+                if now > inst {
+                    if let Err(err) = self.poll.register(
+                        &info.sock,
+                        mio::Token(token + 1000),
+                        mio::Ready::readable(),
+                        mio::PollOpt::edge(),
+                    ) {
+                        error!("Can not register server socket {}", err);
+                    } else {
+                        info!("Resume accepting connections on {}", info.addr);
+                    }
+                } else {
+                    info.timeout = Some(inst);
+                }
+            }
+        }
+    }
+
+    fn process_cmd(&mut self) -> bool {
+        loop {
+            match self.rx.try_recv() {
+                Ok(cmd) => match cmd {
+                    Command::Pause => {
+                        for (_, info) in self.sockets.iter_mut() {
+                            if let Err(err) = self.poll.deregister(&info.sock) {
+                                error!("Can not deregister server socket {}", err);
+                            } else {
+                                info!("Paused accepting connections on {}", info.addr);
+                            }
+                        }
+                    }
+                    Command::Resume => {
+                        for (token, info) in self.sockets.iter() {
+                            if let Err(err) = self.poll.register(
+                                &info.sock,
+                                mio::Token(token + 1000),
+                                mio::Ready::readable(),
+                                mio::PollOpt::edge(),
+                            ) {
+                                error!("Can not resume socket accept process: {}", err);
+                            } else {
+                                info!(
+                                    "Accepting connections on {} has been resumed",
+                                    info.addr
+                                );
+                            }
+                        }
+                    }
+                    Command::Stop => {
+                        for (_, info) in self.sockets.iter() {
+                            let _ = self.poll.deregister(&info.sock);
+                        }
+                        return false;
+                    }
+                    Command::Worker(idx, addr) => {
+                        self.workers.push((idx, addr));
+                    }
+                },
+                Err(err) => match err {
+                    sync_mpsc::TryRecvError::Empty => break,
+                    sync_mpsc::TryRecvError::Disconnected => {
+                        for (_, info) in self.sockets.iter() {
+                            let _ = self.poll.deregister(&info.sock);
+                        }
+                        return false;
+                    }
+                },
+            }
+        }
+        true
+    }
+
+    fn accept(&mut self, token: mio::Token) {
+        let token = usize::from(token);
+        if token < 1000 {
+            return;
+        }
+
+        if let Some(info) = self.sockets.get_mut(token - 1000) {
+            loop {
+                match info.sock.accept_std() {
+                    Ok((io, addr)) => {
+                        let mut msg = Conn {
+                            io,
+                            token: info.token,
+                            peer: Some(addr),
+                            http2: false,
+                        };
+                        while !self.workers.is_empty() {
+                            match self.workers[self.next].1.unbounded_send(msg) {
+                                Ok(_) => (),
+                                Err(err) => {
+                                    let _ = self.srv.unbounded_send(
+                                        ServerCommand::WorkerDied(
+                                            self.workers[self.next].0,
+                                        ),
+                                    );
+                                    msg = err.into_inner();
+                                    self.workers.swap_remove(self.next);
+                                    if self.workers.is_empty() {
+                                        error!("No workers");
+                                        thread::sleep(Duration::from_millis(100));
+                                        break;
+                                    } else if self.workers.len() <= self.next {
+                                        self.next = 0;
+                                    }
+                                    continue;
+                                }
+                            }
+                            self.next = (self.next + 1) % self.workers.len();
+                            break;
+                        }
+                    }
+                    Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => break,
+                    Err(ref e) if connection_error(e) => continue,
+                    Err(e) => {
+                        error!("Error accepting connection: {}", e);
+                        if let Err(err) = self.poll.deregister(&info.sock) {
+                            error!("Can not deregister server socket {}", err);
+                        }
+
+                        // sleep after error
+                        info.timeout = Some(Instant::now() + Duration::from_millis(500));
+
+                        let r = self.timer.1.clone();
+                        System::current().arbiter().do_send(Execute::new(
+                            move || -> Result<(), ()> {
+                                Arbiter::spawn(
+                                    Delay::new(
+                                        Instant::now() + Duration::from_millis(510),
+                                    ).map_err(|_| ())
+                                        .and_then(move |_| {
+                                            let _ =
+                                                r.set_readiness(mio::Ready::readable());
+                                            Ok(())
+                                        }),
+                                );
+                                Ok(())
+                            },
+                        ));
+                        break;
+                    }
+                }
+            }
+        }
+    }
+}
diff --git a/src/server/mod.rs b/src/server/mod.rs
index a4f5e87d..429e293f 100644
--- a/src/server/mod.rs
+++ b/src/server/mod.rs
@@ -315,10 +315,10 @@ impl IoStream for TlsStream<TcpStream> {
 #[cfg(feature = "rust-tls")]
 use rustls::{ClientSession, ServerSession};
 #[cfg(feature = "rust-tls")]
-use tokio_rustls::TlsStream;
+use tokio_rustls::TlsStream as RustlsStream;
 
 #[cfg(feature = "rust-tls")]
-impl IoStream for TlsStream<TcpStream, ClientSession> {
+impl IoStream for RustlsStream<TcpStream, ClientSession> {
     #[inline]
     fn shutdown(&mut self, _how: Shutdown) -> io::Result<()> {
         let _ = <Self as AsyncWrite>::shutdown(self);
@@ -337,7 +337,7 @@ impl IoStream for TlsStream<TcpStream, ClientSession> {
 }
 
 #[cfg(feature = "rust-tls")]
-impl IoStream for TlsStream<TcpStream, ServerSession> {
+impl IoStream for RustlsStream<TcpStream, ServerSession> {
     #[inline]
     fn shutdown(&mut self, _how: Shutdown) -> io::Result<()> {
         let _ = <Self as AsyncWrite>::shutdown(self);
diff --git a/src/server/srv.rs b/src/server/srv.rs
index a054d5a7..e776f742 100644
--- a/src/server/srv.rs
+++ b/src/server/srv.rs
@@ -46,14 +46,6 @@ fn configure_alpn(builder: &mut SslAcceptorBuilder) -> io::Result<()> {
     Ok(())
 }
 
-#[cfg(all(feature = "rust-tls", not(feature = "alpn")))]
-fn configure_alpn(builder: &mut Arc<ServerConfig>) -> io::Result<()> {
-    Arc::<ServerConfig>::get_mut(builder)
-        .unwrap()
-        .set_protocols(&vec!["h2".to_string(), "http/1.1".to_string()]);
-    Ok(())
-}
-
 /// An HTTP Server
 pub struct HttpServer<H>
 where
@@ -68,7 +60,11 @@ where
     #[cfg_attr(feature = "cargo-clippy", allow(type_complexity))]
     workers: Vec<(usize, Addr<Worker<H::Handler>>)>,
     sockets: Vec<Socket>,
-    accept: Vec<(mio::SetReadiness, sync_mpsc::Sender<Command>)>,
+    accept: Option<(
+        mio::SetReadiness,
+        sync_mpsc::Sender<Command>,
+        Slab<SocketInfo>,
+    )>,
     exit: bool,
     shutdown_timeout: u16,
     signals: Option<Addr<signal::ProcessSignals>>,
@@ -77,7 +73,7 @@ where
 }
 
 pub(crate) enum ServerCommand {
-    WorkerDied(usize, Slab<SocketInfo>),
+    WorkerDied(usize),
 }
 
 impl<H> Actor for HttpServer<H>
@@ -114,7 +110,7 @@ where
             factory: Arc::new(f),
             workers: Vec::new(),
             sockets: Vec::new(),
-            accept: Vec::new(),
+            accept: None,
             exit: false,
             shutdown_timeout: 30,
             signals: None,
@@ -280,22 +276,22 @@ where
         Ok(self)
     }
 
-    #[cfg(all(feature = "rust-tls", not(feature = "alpn")))]
+    #[cfg(feature = "rust-tls")]
     /// Use listener for accepting incoming tls connection requests
     ///
     /// This method sets alpn protocols to "h2" and "http/1.1"
-    pub fn listen_ssl(
-        mut self, lst: net::TcpListener, mut builder: Arc<ServerConfig>,
+    pub fn listen_rustls(
+        mut self, lst: net::TcpListener, mut builder: ServerConfig,
     ) -> io::Result<Self> {
         // alpn support
         if !self.no_http2 {
-            configure_alpn(&mut builder)?;
+            builder.set_protocols(&vec!["h2".to_string(), "http/1.1".to_string()]);
         }
         let addr = lst.local_addr().unwrap();
         self.sockets.push(Socket {
             addr,
             lst,
-            tp: StreamHandlerType::Rustls(builder.clone()),
+            tp: StreamHandlerType::Rustls(Arc::new(builder)),
         });
         Ok(self)
     }
@@ -378,20 +374,21 @@ where
         Ok(self)
     }
 
-    #[cfg(all(feature = "rust-tls", not(feature = "alpn")))]
+    #[cfg(feature = "rust-tls")]
     /// Start listening for incoming tls connections.
     ///
     /// This method sets alpn protocols to "h2" and "http/1.1"
-    pub fn bind_ssl<S: net::ToSocketAddrs>(
-        mut self, addr: S, mut builder: Arc<ServerConfig>,
+    pub fn bind_rustls<S: net::ToSocketAddrs>(
+        mut self, addr: S, mut builder: ServerConfig,
     ) -> io::Result<Self> {
         // alpn support
         if !self.no_http2 {
-            configure_alpn(&mut builder)?;
+            builder.set_protocols(&vec!["h2".to_string(), "http/1.1".to_string()]);
         }
 
+        let builder = Arc::new(builder);
         let sockets = self.bind2(addr)?;
-        self.sockets.extend(sockets.into_iter().map(|mut s| {
+        self.sockets.extend(sockets.into_iter().map(move |mut s| {
             s.tp = StreamHandlerType::Rustls(builder.clone());
             s
         }));
@@ -487,17 +484,12 @@ impl<H: IntoHttpHandler> HttpServer<H> {
             let settings = ServerSettings::new(Some(addrs[0].1.addr), &self.host, false);
             let workers = self.start_workers(&settings, &socks);
 
-            // start acceptors threads
-            for (token, sock) in addrs {
+            // start accept thread
+            for (_, sock) in &addrs {
                 info!("Starting server on http://{}", sock.addr);
-                self.accept.push(start_accept_thread(
-                    token,
-                    sock,
-                    tx.clone(),
-                    socks.clone(),
-                    workers.clone(),
-                ));
             }
+            let (r, cmd) = start_accept_thread(addrs, tx.clone(), workers.clone());
+            self.accept = Some((r, cmd, socks));
 
             // start http server actor
             let signals = self.subscribe_to_signals();
@@ -672,7 +664,7 @@ impl<H: IntoHttpHandler> StreamHandler<ServerCommand, ()> for HttpServer<H> {
 
     fn handle(&mut self, msg: ServerCommand, _: &mut Context<Self>) {
         match msg {
-            ServerCommand::WorkerDied(idx, socks) => {
+            ServerCommand::WorkerDied(idx) => {
                 let mut found = false;
                 for i in 0..self.workers.len() {
                     if self.workers[i].0 == idx {
@@ -700,6 +692,7 @@ impl<H: IntoHttpHandler> StreamHandler<ServerCommand, ()> for HttpServer<H> {
                     let ka = self.keep_alive;
                     let factory = Arc::clone(&self.factory);
                     let host = self.host.clone();
+                    let socks = self.accept.as_ref().unwrap().2.clone();
                     let addr = socks[0].addr;
 
                     let addr = Arbiter::start(move |ctx: &mut Context<_>| {
@@ -709,7 +702,7 @@ impl<H: IntoHttpHandler> StreamHandler<ServerCommand, ()> for HttpServer<H> {
                         ctx.add_message_stream(rx);
                         Worker::new(apps, socks, ka, settings)
                     });
-                    for item in &self.accept {
+                    if let Some(ref item) = &self.accept {
                         let _ = item.1.send(Command::Worker(new_idx, tx.clone()));
                         let _ = item.0.set_readiness(mio::Ready::readable());
                     }
diff --git a/src/test.rs b/src/test.rs
index f94732dd..5c520a75 100644
--- a/src/test.rs
+++ b/src/test.rs
@@ -15,10 +15,10 @@ use tokio::runtime::current_thread::Runtime;
 
 #[cfg(feature = "alpn")]
 use openssl::ssl::SslAcceptorBuilder;
-#[cfg(feature = "rust-tls")]
+#[cfg(all(feature = "rust-tls"))]
 use rustls::ServerConfig;
-#[cfg(feature = "rust-tls")]
-use std::sync::Arc;
+//#[cfg(all(feature = "rust-tls"))]
+//use std::sync::Arc;
 
 use application::{App, HttpApplication};
 use body::Binary;
@@ -144,7 +144,7 @@ impl TestServer {
             builder.set_verify(SslVerifyMode::NONE);
             ClientConnector::with_connector(builder.build()).start()
         }
-        #[cfg(feature = "rust-tls")]
+        #[cfg(all(feature = "rust-tls", not(feature = "alpn")))]
         {
             use rustls::ClientConfig;
             use std::fs::File;
@@ -256,7 +256,7 @@ pub struct TestServerBuilder<S> {
     #[cfg(feature = "alpn")]
     ssl: Option<SslAcceptorBuilder>,
     #[cfg(feature = "rust-tls")]
-    ssl: Option<Arc<ServerConfig>>,
+    rust_ssl: Option<ServerConfig>,
 }
 
 impl<S: 'static> TestServerBuilder<S> {
@@ -267,8 +267,10 @@ impl<S: 'static> TestServerBuilder<S> {
     {
         TestServerBuilder {
             state: Box::new(state),
-            #[cfg(any(feature = "alpn", feature = "rust-tls"))]
+            #[cfg(feature = "alpn")]
             ssl: None,
+            #[cfg(feature = "rust-tls")]
+            rust_ssl: None,
         }
     }
 
@@ -280,9 +282,9 @@ impl<S: 'static> TestServerBuilder<S> {
     }
 
     #[cfg(feature = "rust-tls")]
-    /// Create ssl server
-    pub fn ssl(mut self, ssl: Arc<ServerConfig>) -> Self {
-        self.ssl = Some(ssl);
+    /// Create rust tls server
+    pub fn rustls(mut self, ssl: ServerConfig) -> Self {
+        self.rust_ssl = Some(ssl);
         self
     }
 
@@ -294,41 +296,56 @@ impl<S: 'static> TestServerBuilder<S> {
     {
         let (tx, rx) = mpsc::channel();
 
-        #[cfg(any(feature = "alpn", feature = "rust-tls"))]
-        let ssl = self.ssl.is_some();
-        #[cfg(not(any(feature = "alpn", feature = "rust-tls")))]
-        let ssl = false;
+        let mut has_ssl = false;
+
+        #[cfg(feature = "alpn")]
+        {
+            has_ssl = has_ssl || self.ssl.is_some();
+        }
+
+        #[cfg(feature = "rust-tls")]
+        {
+            has_ssl = has_ssl || self.rust_ssl.is_some();
+        }
 
         // run server in separate thread
         thread::spawn(move || {
-            let tcp = net::TcpListener::bind("127.0.0.1:0").unwrap();
-            let local_addr = tcp.local_addr().unwrap();
+            let addr = TestServer::unused_addr();
 
             let sys = System::new("actix-test-server");
             let state = self.state;
-            let srv = HttpServer::new(move || {
+            let mut srv = HttpServer::new(move || {
                 let mut app = TestApp::new(state());
                 config(&mut app);
                 vec![app]
             }).workers(1)
                 .disable_signals();
 
-            tx.send((System::current(), local_addr, TestServer::get_conn()))
+            tx.send((System::current(), addr, TestServer::get_conn()))
                 .unwrap();
 
-            #[cfg(any(feature = "alpn", feature = "rust-tls"))]
+            #[cfg(feature = "alpn")]
             {
                 let ssl = self.ssl.take();
                 if let Some(ssl) = ssl {
-                    srv.listen_ssl(tcp, ssl).unwrap().start();
-                } else {
-                    srv.listen(tcp).start();
+                    let tcp = net::TcpListener::bind(addr).unwrap();
+                    srv = srv.listen_ssl(tcp, ssl).unwrap();
                 }
             }
-            #[cfg(not(any(feature = "alpn", feature = "rust-tls")))]
+            #[cfg(feature = "rust-tls")]
             {
-                srv.listen(tcp).start();
+                let ssl = self.rust_ssl.take();
+                if let Some(ssl) = ssl {
+                    let tcp = net::TcpListener::bind(addr).unwrap();
+                    srv = srv.listen_rustls(tcp, ssl).unwrap();
+                }
             }
+            if !has_ssl {
+                let tcp = net::TcpListener::bind(addr).unwrap();
+                srv = srv.listen(tcp);
+            }
+            srv.start();
+
             sys.run();
         });
 
@@ -336,8 +353,8 @@ impl<S: 'static> TestServerBuilder<S> {
         System::set_current(system);
         TestServer {
             addr,
-            ssl,
             conn,
+            ssl: has_ssl,
             rt: Runtime::new().unwrap(),
         }
     }
diff --git a/tests/test_server.rs b/tests/test_server.rs
index 82a318e5..3a825928 100644
--- a/tests/test_server.rs
+++ b/tests/test_server.rs
@@ -153,6 +153,62 @@ fn test_shutdown() {
     let _ = sys.stop();
 }
 
+#[test]
+#[cfg(unix)]
+fn test_panic() {
+    let _ = test::TestServer::unused_addr();
+    let (tx, rx) = mpsc::channel();
+
+    thread::spawn(|| {
+        System::run(move || {
+            let srv = server::new(|| {
+                App::new()
+                    .resource("/panic", |r| {
+                        r.method(http::Method::GET).f(|_| -> &'static str {
+                            panic!("error");
+                        });
+                    })
+                    .resource("/", |r| {
+                        r.method(http::Method::GET).f(|_| HttpResponse::Ok())
+                    })
+            }).workers(1);
+
+            let srv = srv.bind("127.0.0.1:0").unwrap();
+            let addr = srv.addrs()[0];
+            srv.start();
+            let _ = tx.send((addr, System::current()));
+        });
+    });
+    let (addr, sys) = rx.recv().unwrap();
+    System::set_current(sys.clone());
+
+    let mut rt = Runtime::new().unwrap();
+    {
+        let req = client::ClientRequest::get(format!("http://{}/panic", addr).as_str())
+            .finish()
+            .unwrap();
+        let response = rt.block_on(req.send());
+        assert!(response.is_err());
+    }
+
+    {
+        let req = client::ClientRequest::get(format!("http://{}/", addr).as_str())
+            .finish()
+            .unwrap();
+        let response = rt.block_on(req.send());
+        assert!(response.is_err());
+    }
+    {
+        let req = client::ClientRequest::get(format!("http://{}/", addr).as_str())
+            .finish()
+            .unwrap();
+        let response = rt.block_on(req.send()).unwrap();
+        assert!(response.status().is_success());
+    }
+
+    let _ = sys.stop();
+}
+
 #[test]
 fn test_simple() {
     let mut srv = test::TestServer::new(|app| app.handler(|_| HttpResponse::Ok()));
diff --git a/tests/test_ws.rs b/tests/test_ws.rs
index 1ed80bf7..94f38978 100644
--- a/tests/test_ws.rs
+++ b/tests/test_ws.rs
@@ -277,13 +277,12 @@ fn test_ws_server_ssl() {
 
 #[test]
 #[cfg(feature = "rust-tls")]
-fn test_ws_server_ssl() {
+fn test_ws_server_rust_tls() {
     extern crate rustls;
-    use rustls::{ServerConfig, NoClientAuth};
     use rustls::internal::pemfile::{certs, rsa_private_keys};
-    use std::io::BufReader;
-    use std::sync::Arc;
+    use rustls::{NoClientAuth, ServerConfig};
     use std::fs::File;
+    use std::io::BufReader;
 
     // load ssl keys
     let mut config = ServerConfig::new(NoClientAuth::new());
@@ -293,7 +292,7 @@ fn test_ws_server_ssl() {
     let mut keys = rsa_private_keys(key_file).unwrap();
     config.set_single_cert(cert_chain, keys.remove(0)).unwrap();
 
-    let mut srv = test::TestServer::build().ssl(Arc::new(config)).start(|app| {
+    let mut srv = test::TestServer::build().rustls(config).start(|app| {
         app.handler(|req| {
             ws::start(
                 req,