diff --git a/actix-server/Cargo.toml b/actix-server/Cargo.toml index db9d4d8b..01d4af09 100755 --- a/actix-server/Cargo.toml +++ b/actix-server/Cargo.toml @@ -40,4 +40,4 @@ actix-rt = "2.0.0" bytes = "1" env_logger = "0.8" futures-util = { version = "0.3.7", default-features = false, features = ["sink"] } -tokio = { version = "1", features = ["io-util"] } +tokio = { version = "1", features = ["io-util", "macros", "rt-multi-thread"] } diff --git a/actix-server/examples/basic.rs b/actix-server/examples/basic.rs index 45e473a9..76918967 100644 --- a/actix-server/examples/basic.rs +++ b/actix-server/examples/basic.rs @@ -16,14 +16,14 @@ use std::sync::{ use std::{env, io}; use actix_rt::net::TcpStream; -use actix_server::Server; +use actix_server::ServerHandle; use actix_service::pipeline_factory; use bytes::BytesMut; use futures_util::future::ok; use log::{error, info}; use tokio::io::{AsyncReadExt, AsyncWriteExt}; -#[actix_rt::main] +#[tokio::main] async fn main() -> io::Result<()> { env::set_var("RUST_LOG", "actix=trace,basic=trace"); env_logger::init(); @@ -36,7 +36,7 @@ async fn main() -> io::Result<()> { // Bind socket address and start worker(s). By default, the server uses the number of available // logical CPU cores as the worker count. For this reason, the closure passed to bind needs // to return a service *factory*; so it can be created once per worker. - Server::build() + ServerHandle::build() .bind("echo", addr, move || { let count = Arc::clone(&count); let num2 = Arc::clone(&count); diff --git a/actix-server/src/accept.rs b/actix-server/src/accept.rs index 170ef1dc..b78ebf70 100644 --- a/actix-server/src/accept.rs +++ b/actix-server/src/accept.rs @@ -1,20 +1,18 @@ -use std::time::Duration; +use std::time::{Duration, Instant}; use std::{io, thread}; -use actix_rt::{ - time::{sleep_until, Instant}, - System, -}; use log::{error, info}; use mio::{Interest, Poll, Token as MioToken}; use slab::Slab; -use crate::server::Server; +use crate::server_handle::ServerHandle; use crate::socket::{MioListener, SocketAddr}; use crate::waker_queue::{WakerInterest, WakerQueue, WAKER_TOKEN}; use crate::worker::{Conn, WorkerHandle}; use crate::Token; +const DUR_ON_ERR: Duration = Duration::from_millis(500); + struct ServerSocketInfo { // addr for socket. mainly used for logging. addr: SocketAddr, @@ -22,19 +20,21 @@ struct ServerSocketInfo { // mio::Token token: Token, lst: MioListener, - // timeout is used to mark the deadline when this socket's listener should be registered again - // after an error. - timeout: Option, + // mark the deadline when this socket's listener should be registered again + timeout_deadline: Option, } /// poll instance of the server. pub(crate) struct Accept { poll: Poll, - waker: WakerQueue, + waker_queue: WakerQueue, handles: Vec, - srv: Server, + srv: ServerHandle, next: usize, backpressure: bool, + // poll time duration. + // use the smallest duration from sockets timeout_deadline. + timeout: Option, } /// This function defines errors that are per-connection. Which basically @@ -53,7 +53,7 @@ fn connection_error(e: &io::Error) -> bool { impl Accept { pub(crate) fn start( sockets: Vec<(Token, MioListener)>, - server: Server, + server_handle: ServerHandle, worker_factory: F, ) -> WakerQueue where @@ -61,22 +61,24 @@ impl Accept { { // construct poll instance and it's waker let poll = Poll::new().unwrap_or_else(|e| panic!("Can not create `mio::Poll`: {}", e)); - let waker = WakerQueue::new(poll.registry()) + let waker_queue = WakerQueue::new(poll.registry()) .unwrap_or_else(|e| panic!("Can not create `mio::Waker`: {}", e)); - let waker_clone = waker.clone(); + let waker_clone = waker_queue.clone(); // construct workers and collect handles. - let handles = worker_factory(&waker); + let handles = worker_factory(&waker_queue); - // Accept runs in its own thread and would want to spawn additional futures to current - // actix system. - let sys = System::current(); + // Accept runs in its own thread. thread::Builder::new() - .name("actix-server accept loop".to_owned()) + .name("actix-server acceptor".to_owned()) .spawn(move || { - System::set_current(sys); - let (mut accept, sockets) = - Accept::new_with_sockets(poll, waker, sockets, handles, server); + let (mut accept, sockets) = Accept::new_with_sockets( + poll, + waker_queue, + sockets, + handles, + server_handle, + ); accept.poll_with(sockets); }) .unwrap(); @@ -87,10 +89,10 @@ impl Accept { fn new_with_sockets( poll: Poll, - waker: WakerQueue, + waker_queue: WakerQueue, socks: Vec<(Token, MioListener)>, handles: Vec, - srv: Server, + srv: ServerHandle, ) -> (Accept, Slab) { let mut sockets = Slab::new(); for (hnd_token, mut lst) in socks.into_iter() { @@ -108,17 +110,18 @@ impl Accept { addr, token: hnd_token, lst, - timeout: None, + timeout_deadline: None, }); } let accept = Accept { poll, - waker, + waker_queue, handles, srv, next: 0, backpressure: false, + timeout: None, }; (accept, sockets) @@ -128,9 +131,11 @@ impl Accept { let mut events = mio::Events::with_capacity(128); loop { - if let Err(e) = self.poll.poll(&mut events, None) { + if let Err(e) = self.poll.poll(&mut events, self.timeout) { match e.kind() { std::io::ErrorKind::Interrupted => { + // check for timeout and re-register sockets. + self.process_timeout(&mut sockets); continue; } _ => { @@ -148,7 +153,7 @@ impl Accept { WAKER_TOKEN => 'waker: loop { // take guard with every iteration so no new interest can be added // until the current task is done. - let mut guard = self.waker.guard(); + let mut guard = self.waker_queue.guard(); match guard.pop_front() { // worker notify it becomes available. we may want to recover // from backpressure. @@ -164,12 +169,6 @@ impl Accept { self.maybe_backpressure(&mut sockets, false); self.handles.push(handle); } - // got timer interest and it's time to try register socket(s) - // again. - Some(WakerInterest::Timer) => { - drop(guard); - self.process_timer(&mut sockets) - } Some(WakerInterest::Pause) => { drop(guard); sockets.iter_mut().for_each(|(_, info)| { @@ -208,21 +207,43 @@ impl Accept { } } } + + // check for timeout and re-register sockets. + self.process_timeout(&mut sockets); } } - fn process_timer(&self, sockets: &mut Slab) { - let now = Instant::now(); - sockets.iter_mut().for_each(|(token, info)| { - // only the ServerSocketInfo have an associate timeout value was de registered. - if let Some(inst) = info.timeout.take() { - if now > inst { - self.register_logged(token, info); - } else { - info.timeout = Some(inst); + fn process_timeout(&mut self, sockets: &mut Slab) { + // take old timeout as it's no use after each iteration. + if self.timeout.take().is_some() { + let now = Instant::now(); + sockets.iter_mut().for_each(|(token, info)| { + // only the ServerSocketInfo have an associate timeout value was de registered. + if let Some(inst) = info.timeout_deadline { + // timeout expired register socket again. + if now >= inst { + info.timeout_deadline = None; + self.register_logged(token, info); + } else { + // still timed out. try set new timeout. + let dur = inst - now; + self.set_timeout(dur); + } + } + }); + } + } + + // update Accept timeout duration. would keep the smallest duration. + fn set_timeout(&mut self, dur: Duration) { + match self.timeout { + Some(timeout) => { + if timeout > dur { + self.timeout = Some(dur); } } - }); + None => self.timeout = Some(dur), + } } #[cfg(not(target_os = "windows"))] @@ -272,7 +293,7 @@ impl Accept { if !on { self.backpressure = false; for (token, info) in sockets.iter_mut() { - if info.timeout.is_some() { + if info.timeout_deadline.is_some() { // socket will attempt to re-register itself when its timeout completes continue; } @@ -370,17 +391,11 @@ impl Accept { error!("Can not deregister server socket {}", err); } - // sleep after error. write the timeout to socket info as later the poll - // would need it mark which socket and when it's listener should be - // registered. - info.timeout = Some(Instant::now() + Duration::from_millis(500)); - - // after the sleep a Timer interest is sent to Accept Poll - let waker = self.waker.clone(); - System::current().arbiter().spawn(async move { - sleep_until(Instant::now() + Duration::from_millis(510)).await; - waker.wake(WakerInterest::Timer); - }); + // sleep after error. write the timeout deadline to socket info + // as later the poll would need it mark which socket and when + // it's listener should be registered again. + info.timeout_deadline = Some(Instant::now() + DUR_ON_ERR); + self.set_timeout(DUR_ON_ERR); return; } diff --git a/actix-server/src/builder.rs b/actix-server/src/builder.rs index 429be852..29f6d7b7 100644 --- a/actix-server/src/builder.rs +++ b/actix-server/src/builder.rs @@ -6,15 +6,15 @@ use std::{io, mem}; use actix_rt::net::TcpStream; use actix_rt::time::{sleep_until, Instant}; -use actix_rt::{self as rt, System}; +use actix_rt::System; use futures_core::future::BoxFuture; use log::{error, info}; -use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver}; +use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; use tokio::sync::oneshot; use crate::accept::Accept; use crate::config::{ConfiguredService, ServiceConfig}; -use crate::server::{Server, ServerCommand}; +use crate::server_handle::{ServerCommand, ServerHandle}; use crate::service::{InternalServiceFactory, ServiceFactory, StreamNewService}; use crate::signals::{Signal, Signals}; use crate::socket::{MioListener, StdSocketAddr, StdTcpListener, ToSocketAddrs}; @@ -32,8 +32,8 @@ pub struct ServerBuilder { sockets: Vec<(Token, String, MioListener)>, exit: bool, no_signals: bool, - cmd: UnboundedReceiver, - server: Server, + cmd_tx: UnboundedSender, + cmd_rx: UnboundedReceiver, worker_config: ServerWorkerConfig, } @@ -47,8 +47,6 @@ impl ServerBuilder { /// Create new Server builder instance pub fn new() -> ServerBuilder { let (tx, rx) = unbounded_channel(); - let server = Server::new(tx); - ServerBuilder { threads: num_cpus::get(), token: Token::default(), @@ -57,8 +55,8 @@ impl ServerBuilder { backlog: 2048, exit: false, no_signals: false, - cmd: rx, - server, + cmd_tx: tx, + cmd_rx: rx, worker_config: ServerWorkerConfig::default(), } } @@ -267,7 +265,7 @@ impl ServerBuilder { } /// Starts processing incoming connections and return server controller. - pub fn run(mut self) -> Server { + pub fn run(mut self) -> ServerFuture { if self.sockets.is_empty() { panic!("Server should have at least one bound socket"); } else { @@ -287,7 +285,7 @@ impl ServerBuilder { // start accept thread. return waker_queue for wake up it. let waker_queue = Accept::start( sockets, - self.server.clone(), + ServerHandle::new(self.cmd_tx.clone()), // closure for construct worker and return it's handler. |waker| { (0..self.threads) @@ -316,8 +314,9 @@ impl ServerBuilder { None }; - let server_future = ServerFuture { - cmd: self.cmd, + ServerFuture { + cmd_tx: self.cmd_tx, + cmd_rx: self.cmd_rx, handles, services: self.services, notify: Vec::new(), @@ -326,19 +325,16 @@ impl ServerBuilder { signals, on_stop_task: None, waker_queue, - }; - - // spawn server future. - rt::spawn(server_future); - - self.server + } } } } -/// `ServerFuture` when awaited or spawned would listen to signal and message from `Server`. -struct ServerFuture { - cmd: UnboundedReceiver, +/// When awaited or spawned would listen to signal and message from [ServerHandle](crate::server::ServerHandle). +#[must_use = "futures do nothing unless you `.await` or poll them"] +pub struct ServerFuture { + cmd_tx: UnboundedSender, + cmd_rx: UnboundedReceiver, handles: Vec<(usize, WorkerHandle)>, services: Vec>, notify: Vec>, @@ -350,6 +346,13 @@ struct ServerFuture { } impl ServerFuture { + /// Obtain a Handle for ServerFuture that can be used to change state of actix server. + /// + /// See [ServerHandle](crate::server::ServerHandle) for usage. + pub fn handle(&self) -> ServerHandle { + ServerHandle::new(self.cmd_tx.clone()) + } + fn handle_cmd(&mut self, item: ServerCommand) -> Option> { match item { ServerCommand::Pause(tx) => { @@ -415,6 +418,7 @@ impl ServerFuture { .map(move |worker| worker.1.stop(graceful)) .collect::>(); + // TODO: this async block can return io::Error. Some(Box::pin(async move { for handle in iter { let _ = handle.await; @@ -433,6 +437,7 @@ impl ServerFuture { } else { // we need to stop system if server was spawned let exit = self.exit; + // TODO: this async block can return io::Error. Some(Box::pin(async move { if exit { sleep_until(Instant::now() + Duration::from_millis(300)).await; @@ -490,7 +495,7 @@ impl ServerFuture { } impl Future for ServerFuture { - type Output = (); + type Output = io::Result<()>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = self.as_mut().get_mut(); @@ -509,10 +514,10 @@ impl Future for ServerFuture { loop { // got on stop task. resolve it exclusively and exit. if let Some(ref mut fut) = this.on_stop_task { - return fut.as_mut().poll(cx); + return fut.as_mut().poll(cx).map(|_| Ok(())); } - match Pin::new(&mut this.cmd).poll_recv(cx) { + match Pin::new(&mut this.cmd_rx).poll_recv(cx) { Poll::Ready(Some(it)) => { this.on_stop_task = this.handle_cmd(it); } diff --git a/actix-server/src/lib.rs b/actix-server/src/lib.rs index 24129b5a..d9c1eeb2 100644 --- a/actix-server/src/lib.rs +++ b/actix-server/src/lib.rs @@ -7,7 +7,7 @@ mod accept; mod builder; mod config; -mod server; +mod server_handle; mod service; mod signals; mod socket; @@ -17,7 +17,7 @@ mod worker; pub use self::builder::ServerBuilder; pub use self::config::{ServiceConfig, ServiceRuntime}; -pub use self::server::Server; +pub use self::server_handle::ServerHandle; pub use self::service::ServiceFactory; pub use self::test_server::TestServer; diff --git a/actix-server/src/server.rs b/actix-server/src/server_handle.rs similarity index 95% rename from actix-server/src/server.rs rename to actix-server/src/server_handle.rs index 74228b3f..679dc1a1 100644 --- a/actix-server/src/server.rs +++ b/actix-server/src/server_handle.rs @@ -25,14 +25,14 @@ pub(crate) enum ServerCommand { } #[derive(Debug)] -pub struct Server( +pub struct ServerHandle( UnboundedSender, Option>, ); -impl Server { +impl ServerHandle { pub(crate) fn new(tx: UnboundedSender) -> Self { - Server(tx, None) + ServerHandle(tx, None) } /// Start server building process @@ -80,13 +80,13 @@ impl Server { } } -impl Clone for Server { +impl Clone for ServerHandle { fn clone(&self) -> Self { Self(self.0.clone(), None) } } -impl Future for Server { +impl Future for ServerHandle { type Output = io::Result<()>; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { diff --git a/actix-server/src/test_server.rs b/actix-server/src/test_server.rs index 864f391c..996faa2a 100644 --- a/actix-server/src/test_server.rs +++ b/actix-server/src/test_server.rs @@ -3,7 +3,7 @@ use std::{net, thread}; use actix_rt::{net::TcpStream, System}; -use crate::{Server, ServerBuilder, ServiceFactory}; +use crate::{ServerBuilder, ServerHandle, ServiceFactory}; /// The `TestServer` type. /// @@ -49,7 +49,15 @@ impl TestServer { // run server in separate thread thread::spawn(move || { let sys = System::new(); - factory(Server::build()).workers(1).disable_signals().run(); + sys.block_on(async { + actix_rt::spawn(async move { + let _ = factory(ServerHandle::build()) + .workers(1) + .disable_signals() + .run() + .await; + }) + }); tx.send(System::current()).unwrap(); sys.run() @@ -75,12 +83,15 @@ impl TestServer { let local_addr = tcp.local_addr().unwrap(); sys.block_on(async { - Server::build() - .listen("test", tcp, factory) - .unwrap() - .workers(1) - .disable_signals() - .run(); + actix_rt::spawn(async move { + let _ = ServerHandle::build() + .listen("test", tcp, factory) + .unwrap() + .workers(1) + .disable_signals() + .run() + .await; + }); tx.send((System::current(), local_addr)).unwrap(); }); sys.run() diff --git a/actix-server/src/waker_queue.rs b/actix-server/src/waker_queue.rs index f92363b5..6b103689 100644 --- a/actix-server/src/waker_queue.rs +++ b/actix-server/src/waker_queue.rs @@ -78,10 +78,6 @@ pub(crate) enum WakerInterest { Pause, Resume, Stop, - /// `Timer` is an interest sent as a delayed future. When an error happens on accepting - /// connection `Accept` would deregister socket listener temporary and wake up the poll and - /// register them again after the delayed future resolve. - Timer, /// `Worker` is an interest happen after a worker runs into faulted state(This is determined /// by if work can be sent to it successfully).`Accept` would be waked up and add the new /// `WorkerHandle`. diff --git a/actix-server/src/worker.rs b/actix-server/src/worker.rs index defc7306..aad4c7a3 100644 --- a/actix-server/src/worker.rs +++ b/actix-server/src/worker.rs @@ -6,7 +6,6 @@ use std::task::{Context, Poll}; use std::time::Duration; use actix_rt::time::{sleep_until, Instant, Sleep}; -use actix_rt::{spawn, Arbiter}; use actix_utils::counter::Counter; use futures_core::future::LocalBoxFuture; use log::{error, info, trace}; @@ -198,17 +197,11 @@ impl ServerWorker { let (tx2, rx2) = unbounded_channel(); let avail = availability.clone(); + availability.set(false); + // every worker runs in it's own arbiter. // use a custom tokio runtime builder to change the settings of runtime. - Arbiter::with_tokio_rt(move || { - tokio::runtime::Builder::new_current_thread() - .enable_all() - .max_blocking_threads(config.max_blocking_threads) - .build() - .unwrap() - }) - .spawn(async move { - availability.set(false); + std::thread::spawn(move || { let mut wrk = MAX_CONNS_COUNTER.with(move |conns| ServerWorker { rx, rx2, @@ -219,7 +212,6 @@ impl ServerWorker { conns: conns.clone(), state: WorkerState::Unavailable, }); - let fut = wrk .factories .iter() @@ -234,30 +226,35 @@ impl ServerWorker { }) .collect::>(); - // a second spawn to make sure worker future runs as non boxed future. - // As Arbiter::spawn would box the future before send it to arbiter. - spawn(async move { - let res: Result, _> = join_all(fut).await.into_iter().collect(); - match res { - Ok(services) => { - for item in services { - for (factory, token, service) in item { - assert_eq!(token.0, wrk.services.len()); - wrk.services.push(WorkerService { - factory, - service, - status: WorkerServiceStatus::Unavailable, - }); + tokio::runtime::Builder::new_current_thread() + .enable_all() + .max_blocking_threads(config.max_blocking_threads) + .build() + .unwrap() + .block_on(tokio::task::LocalSet::new().run_until(async move { + let res = join_all(fut) + .await + .into_iter() + .collect::, _>>(); + match res { + Ok(services) => { + for item in services { + for (factory, token, service) in item { + assert_eq!(token.0, wrk.services.len()); + wrk.services.push(WorkerService { + factory, + service, + status: WorkerServiceStatus::Unavailable, + }); + } } } + Err(e) => { + error!("Can not start worker: {:?}", e); + } } - Err(e) => { - error!("Can not start worker: {:?}", e); - Arbiter::current().stop(); - } - } - wrk.await - }); + wrk.await + })) }); WorkerHandle::new(idx, tx1, tx2, avail) @@ -424,7 +421,6 @@ impl Future for ServerWorker { let num = num_connections(); if num == 0 { let _ = tx.take().unwrap().send(true); - Arbiter::current().stop(); return Poll::Ready(()); } @@ -432,7 +428,6 @@ impl Future for ServerWorker { if Pin::new(t2).poll(cx).is_ready() { let _ = tx.take().unwrap().send(false); self.shutdown(true); - Arbiter::current().stop(); return Poll::Ready(()); } diff --git a/actix-server/tests/test_server.rs b/actix-server/tests/test_server.rs index 86ec25e6..23e4635c 100644 --- a/actix-server/tests/test_server.rs +++ b/actix-server/tests/test_server.rs @@ -2,7 +2,7 @@ use std::sync::atomic::{AtomicUsize, Ordering::Relaxed}; use std::sync::{mpsc, Arc}; use std::{net, thread, time}; -use actix_server::Server; +use actix_server::ServerHandle; use actix_service::fn_service; use futures_util::future::{lazy, ok}; @@ -22,18 +22,21 @@ fn test_bind() { let h = thread::spawn(move || { let sys = actix_rt::System::new(); - let srv = sys.block_on(lazy(|_| { - Server::build() - .workers(1) - .disable_signals() - .bind("test", addr, move || fn_service(|_| ok::<_, ()>(()))) - .unwrap() - .run() - })); - let _ = tx.send((srv, actix_rt::System::current())); + sys.block_on(async { + actix_rt::spawn(async move { + let _ = ServerHandle::build() + .workers(1) + .disable_signals() + .bind("test", addr, move || fn_service(|_| ok::<_, ()>(()))) + .unwrap() + .run() + .await; + }); + }); + let _ = tx.send(actix_rt::System::current()); let _ = sys.run(); }); - let (_, sys) = rx.recv().unwrap(); + let sys = rx.recv().unwrap(); thread::sleep(time::Duration::from_millis(500)); assert!(net::TcpStream::connect(addr).is_ok()); @@ -50,14 +53,17 @@ fn test_listen() { let sys = actix_rt::System::new(); let lst = net::TcpListener::bind(addr).unwrap(); sys.block_on(async { - Server::build() - .disable_signals() - .workers(1) - .listen("test", lst, move || fn_service(|_| ok::<_, ()>(()))) - .unwrap() - .run(); - let _ = tx.send(actix_rt::System::current()); + actix_rt::spawn(async move { + let _ = ServerHandle::build() + .disable_signals() + .workers(1) + .listen("test", lst, move || fn_service(|_| ok::<_, ()>(()))) + .unwrap() + .run() + .await; + }); }); + let _ = tx.send(actix_rt::System::current()); let _ = sys.run(); }); let sys = rx.recv().unwrap(); @@ -81,9 +87,8 @@ fn test_start() { let (tx, rx) = mpsc::channel(); let h = thread::spawn(move || { - let sys = actix_rt::System::new(); - let srv = sys.block_on(lazy(|_| { - Server::build() + actix_rt::System::new().block_on(async { + let server = ServerHandle::build() .backlog(100) .disable_signals() .bind("test", addr, move || { @@ -94,11 +99,11 @@ fn test_start() { }) }) .unwrap() - .run() - })); - - let _ = tx.send((srv, actix_rt::System::current())); - let _ = sys.run(); + .run(); + let handle = server.handle(); + let _ = tx.send((handle, actix_rt::System::current())); + let _ = server.await; + }); }); let (srv, sys) = rx.recv().unwrap(); @@ -150,9 +155,8 @@ fn test_configure() { let h = thread::spawn(move || { let num = num2.clone(); - let sys = actix_rt::System::new(); - let srv = sys.block_on(lazy(|_| { - Server::build() + actix_rt::System::new().block_on(async { + let server = ServerHandle::build() .disable_signals() .configure(move |cfg| { let num = num.clone(); @@ -173,18 +177,21 @@ fn test_configure() { }) .unwrap() .workers(1) - .run() - })); - let _ = tx.send((srv, actix_rt::System::current())); - let _ = sys.run(); + .run(); + + let handle = server.handle(); + let _ = tx.send((handle, actix_rt::System::current())); + let _ = server.await; + }); }); - let (_, sys) = rx.recv().unwrap(); + let (server, sys) = rx.recv().unwrap(); thread::sleep(time::Duration::from_millis(500)); assert!(net::TcpStream::connect(addr1).is_ok()); assert!(net::TcpStream::connect(addr2).is_ok()); assert!(net::TcpStream::connect(addr3).is_ok()); assert_eq!(num.load(Relaxed), 1); + let _ = server.stop(true); sys.stop(); let _ = h.join(); } diff --git a/actix-tls/examples/basic.rs b/actix-tls/examples/basic.rs index d1762b08..0f02cf38 100644 --- a/actix-tls/examples/basic.rs +++ b/actix-tls/examples/basic.rs @@ -29,7 +29,7 @@ use std::{ }, }; -use actix_server::Server; +use actix_server::ServerHandle; use actix_service::pipeline_factory; use actix_tls::accept::rustls::Acceptor as RustlsAcceptor; use futures_util::future::ok; @@ -67,7 +67,7 @@ async fn main() -> io::Result<()> { let addr = ("127.0.0.1", 8443); info!("starting server on port: {}", &addr.0); - Server::build() + ServerHandle::build() .bind("tls-example", addr, move || { let count = Arc::clone(&count); diff --git a/actix-tls/src/connect/ssl/rustls.rs b/actix-tls/src/connect/ssl/rustls.rs index 811c7adc..46b4b11d 100755 --- a/actix-tls/src/connect/ssl/rustls.rs +++ b/actix-tls/src/connect/ssl/rustls.rs @@ -14,8 +14,8 @@ use actix_codec::{AsyncRead, AsyncWrite}; use actix_service::{Service, ServiceFactory}; use futures_core::{future::LocalBoxFuture, ready}; use log::trace; -use tokio_rustls::{Connect, TlsConnector}; use tokio_rustls::webpki::DNSNameRef; +use tokio_rustls::{Connect, TlsConnector}; use crate::connect::{Address, Connection};