rename Server to ServerHandle and do not return it directly

fakeshadow 2021-02-05 04:58:40 -08:00
parent 6b832b293c
commit b12b897e1d
12 changed files with 204 additions and 175 deletions
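In short: `ServerBuilder::run()` no longer spawns the server in the background and returns the old `Server` control type. It now returns a must-use `ServerFuture` that resolves to `io::Result<()>`, and a control handle is obtained explicitly via `ServerFuture::handle()`. A minimal sketch of the resulting usage, assuming the builder API shown in the diffs below (the address and echo-style service are illustrative, not part of this commit):

    use actix_server::ServerHandle;
    use actix_service::fn_service;
    use futures_util::future::ok;

    #[tokio::main]
    async fn main() -> std::io::Result<()> {
        // `run()` now returns the server future instead of spawning it
        // and handing back a control object.
        let server = ServerHandle::build()
            .workers(1)
            .disable_signals()
            .bind("example", ("127.0.0.1", 8080), || fn_service(|_| ok::<_, ()>(())))?
            .run();

        // pause/resume/stop go through an explicitly obtained handle.
        let handle = server.handle();
        tokio::spawn(async move {
            // e.g. a graceful stop triggered from another task.
            handle.stop(true).await;
        });

        // the caller decides where the server future runs; here we await it.
        server.await
    }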

View File

@@ -40,4 +40,4 @@ actix-rt = "2.0.0"
 bytes = "1"
 env_logger = "0.8"
 futures-util = { version = "0.3.7", default-features = false, features = ["sink"] }
-tokio = { version = "1", features = ["io-util"] }
+tokio = { version = "1", features = ["io-util", "macros", "rt-multi-thread"] }

View File

@@ -16,14 +16,14 @@ use std::sync::{
 use std::{env, io};

 use actix_rt::net::TcpStream;
-use actix_server::Server;
+use actix_server::ServerHandle;
 use actix_service::pipeline_factory;
 use bytes::BytesMut;
 use futures_util::future::ok;
 use log::{error, info};
 use tokio::io::{AsyncReadExt, AsyncWriteExt};

-#[actix_rt::main]
+#[tokio::main]
 async fn main() -> io::Result<()> {
     env::set_var("RUST_LOG", "actix=trace,basic=trace");
     env_logger::init();
@@ -36,7 +36,7 @@ async fn main() -> io::Result<()> {
     // Bind socket address and start worker(s). By default, the server uses the number of available
     // logical CPU cores as the worker count. For this reason, the closure passed to bind needs
     // to return a service *factory*; so it can be created once per worker.
-    Server::build()
+    ServerHandle::build()
         .bind("echo", addr, move || {
             let count = Arc::clone(&count);
             let num2 = Arc::clone(&count);

View File

@@ -1,20 +1,18 @@
-use std::time::Duration;
+use std::time::{Duration, Instant};
 use std::{io, thread};

-use actix_rt::{
-    time::{sleep_until, Instant},
-    System,
-};
 use log::{error, info};
 use mio::{Interest, Poll, Token as MioToken};
 use slab::Slab;

-use crate::server::Server;
+use crate::server_handle::ServerHandle;
 use crate::socket::{MioListener, SocketAddr};
 use crate::waker_queue::{WakerInterest, WakerQueue, WAKER_TOKEN};
 use crate::worker::{Conn, WorkerHandle};
 use crate::Token;

+const DUR_ON_ERR: Duration = Duration::from_millis(500);
+
 struct ServerSocketInfo {
     // addr for socket. mainly used for logging.
     addr: SocketAddr,
@@ -22,19 +20,21 @@ struct ServerSocketInfo {
     // mio::Token
     token: Token,
     lst: MioListener,
-    // timeout is used to mark the deadline when this socket's listener should be registered again
-    // after an error.
-    timeout: Option<Instant>,
+    // mark the deadline when this socket's listener should be registered again
+    timeout_deadline: Option<Instant>,
 }

 /// poll instance of the server.
 pub(crate) struct Accept {
     poll: Poll,
-    waker: WakerQueue,
+    waker_queue: WakerQueue,
     handles: Vec<WorkerHandle>,
-    srv: Server,
+    srv: ServerHandle,
     next: usize,
     backpressure: bool,
+    // poll time duration.
+    // use the smallest duration from sockets timeout_deadline.
+    timeout: Option<Duration>,
 }

 /// This function defines errors that are per-connection. Which basically
@@ -53,7 +53,7 @@ fn connection_error(e: &io::Error) -> bool {
 impl Accept {
     pub(crate) fn start<F>(
         sockets: Vec<(Token, MioListener)>,
-        server: Server,
+        server_handle: ServerHandle,
         worker_factory: F,
     ) -> WakerQueue
     where
@@ -61,22 +61,24 @@ impl Accept {
     {
         // construct poll instance and it's waker
         let poll = Poll::new().unwrap_or_else(|e| panic!("Can not create `mio::Poll`: {}", e));
-        let waker = WakerQueue::new(poll.registry())
+        let waker_queue = WakerQueue::new(poll.registry())
             .unwrap_or_else(|e| panic!("Can not create `mio::Waker`: {}", e));
-        let waker_clone = waker.clone();
+        let waker_clone = waker_queue.clone();

         // construct workers and collect handles.
-        let handles = worker_factory(&waker);
+        let handles = worker_factory(&waker_queue);

-        // Accept runs in its own thread and would want to spawn additional futures to current
-        // actix system.
-        let sys = System::current();
+        // Accept runs in its own thread.
         thread::Builder::new()
-            .name("actix-server accept loop".to_owned())
+            .name("actix-server acceptor".to_owned())
             .spawn(move || {
-                System::set_current(sys);
-                let (mut accept, sockets) =
-                    Accept::new_with_sockets(poll, waker, sockets, handles, server);
+                let (mut accept, sockets) = Accept::new_with_sockets(
+                    poll,
+                    waker_queue,
+                    sockets,
+                    handles,
+                    server_handle,
+                );
                 accept.poll_with(sockets);
             })
             .unwrap();
@@ -87,10 +89,10 @@ impl Accept {
     fn new_with_sockets(
         poll: Poll,
-        waker: WakerQueue,
+        waker_queue: WakerQueue,
         socks: Vec<(Token, MioListener)>,
         handles: Vec<WorkerHandle>,
-        srv: Server,
+        srv: ServerHandle,
     ) -> (Accept, Slab<ServerSocketInfo>) {
         let mut sockets = Slab::new();
         for (hnd_token, mut lst) in socks.into_iter() {
@@ -108,17 +110,18 @@ impl Accept {
                 addr,
                 token: hnd_token,
                 lst,
-                timeout: None,
+                timeout_deadline: None,
             });
         }

         let accept = Accept {
             poll,
-            waker,
+            waker_queue,
             handles,
             srv,
             next: 0,
             backpressure: false,
+            timeout: None,
         };

         (accept, sockets)
@@ -128,9 +131,11 @@ impl Accept {
         let mut events = mio::Events::with_capacity(128);

         loop {
-            if let Err(e) = self.poll.poll(&mut events, None) {
+            if let Err(e) = self.poll.poll(&mut events, self.timeout) {
                 match e.kind() {
                     std::io::ErrorKind::Interrupted => {
+                        // check for timeout and re-register sockets.
+                        self.process_timeout(&mut sockets);
                         continue;
                     }
                     _ => {
@@ -148,7 +153,7 @@ impl Accept {
                     WAKER_TOKEN => 'waker: loop {
                         // take guard with every iteration so no new interest can be added
                         // until the current task is done.
-                        let mut guard = self.waker.guard();
+                        let mut guard = self.waker_queue.guard();
                         match guard.pop_front() {
                             // worker notify it becomes available. we may want to recover
                             // from backpressure.
@@ -164,12 +169,6 @@ impl Accept {
                                 self.maybe_backpressure(&mut sockets, false);
                                 self.handles.push(handle);
                             }
-                            // got timer interest and it's time to try register socket(s)
-                            // again.
-                            Some(WakerInterest::Timer) => {
-                                drop(guard);
-                                self.process_timer(&mut sockets)
-                            }
                             Some(WakerInterest::Pause) => {
                                 drop(guard);
                                 sockets.iter_mut().for_each(|(_, info)| {
@@ -208,22 +207,44 @@ impl Accept {
                     }
                 }
             }
+
+            // check for timeout and re-register sockets.
+            self.process_timeout(&mut sockets);
         }
     }

-    fn process_timer(&self, sockets: &mut Slab<ServerSocketInfo>) {
-        let now = Instant::now();
-        sockets.iter_mut().for_each(|(token, info)| {
-            // only the ServerSocketInfo have an associate timeout value was de registered.
-            if let Some(inst) = info.timeout.take() {
-                if now > inst {
-                    self.register_logged(token, info);
-                } else {
-                    info.timeout = Some(inst);
-                }
-            }
-        });
-    }
+    fn process_timeout(&mut self, sockets: &mut Slab<ServerSocketInfo>) {
+        // take old timeout as it's no use after each iteration.
+        if self.timeout.take().is_some() {
+            let now = Instant::now();
+            sockets.iter_mut().for_each(|(token, info)| {
+                // only the ServerSocketInfo have an associate timeout value was de registered.
+                if let Some(inst) = info.timeout_deadline {
+                    // timeout expired register socket again.
+                    if now >= inst {
+                        info.timeout_deadline = None;
+                        self.register_logged(token, info);
+                    } else {
+                        // still timed out. try set new timeout.
+                        let dur = inst - now;
+                        self.set_timeout(dur);
+                    }
+                }
+            });
+        }
+    }
+
+    // update Accept timeout duration. would keep the smallest duration.
+    fn set_timeout(&mut self, dur: Duration) {
+        match self.timeout {
+            Some(timeout) => {
+                if timeout > dur {
+                    self.timeout = Some(dur);
+                }
+            }
+            None => self.timeout = Some(dur),
+        }
+    }

     #[cfg(not(target_os = "windows"))]
     fn register(&self, token: usize, info: &mut ServerSocketInfo) -> io::Result<()> {
@@ -272,7 +293,7 @@ impl Accept {
         if !on {
             self.backpressure = false;
             for (token, info) in sockets.iter_mut() {
-                if info.timeout.is_some() {
+                if info.timeout_deadline.is_some() {
                     // socket will attempt to re-register itself when its timeout completes
                     continue;
                 }
@@ -370,17 +391,11 @@ impl Accept {
                         error!("Can not deregister server socket {}", err);
                     }

-                    // sleep after error. write the timeout to socket info as later the poll
-                    // would need it mark which socket and when it's listener should be
-                    // registered.
-                    info.timeout = Some(Instant::now() + Duration::from_millis(500));
-
-                    // after the sleep a Timer interest is sent to Accept Poll
-                    let waker = self.waker.clone();
-                    System::current().arbiter().spawn(async move {
-                        sleep_until(Instant::now() + Duration::from_millis(510)).await;
-                        waker.wake(WakerInterest::Timer);
-                    });
+                    // sleep after error. write the timeout deadline to socket info
+                    // as later the poll would need it mark which socket and when
+                    // it's listener should be registered again.
+                    info.timeout_deadline = Some(Instant::now() + DUR_ON_ERR);
+                    self.set_timeout(DUR_ON_ERR);

                     return;
                 }
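The net effect in this file: the old flow parked an errored listener, spawned a sleep future on the actix system, and woke the poll loop with a `WakerInterest::Timer`; the new flow records a deadline per socket, hands the smallest remaining duration to `mio::Poll::poll` as its timeout, and re-registers expired listeners when poll returns. A standalone sketch of that bookkeeping (simplified model, not the actual `Accept` code; a sleep stands in for the poll call):

    use std::time::{Duration, Instant};

    const DUR_ON_ERR: Duration = Duration::from_millis(500);

    struct Listener {
        timeout_deadline: Option<Instant>,
    }

    fn main() {
        let mut listeners = vec![Listener { timeout_deadline: None }];
        let mut poll_timeout: Option<Duration> = None;

        // 1. accept error: deregister the listener and record its deadline,
        //    keeping the smallest pending duration as the poll timeout.
        listeners[0].timeout_deadline = Some(Instant::now() + DUR_ON_ERR);
        poll_timeout = match poll_timeout {
            Some(t) if t <= DUR_ON_ERR => Some(t),
            _ => Some(DUR_ON_ERR),
        };

        // 2. the poll loop passes `poll_timeout` to `mio::Poll::poll`, which
        //    returns by itself once the timeout elapses; no helper future or
        //    `Timer` waker interest is needed.
        std::thread::sleep(poll_timeout.take().unwrap());

        // 3. after poll returns, re-register every listener whose deadline has
        //    passed; deadlines still pending set a fresh, smaller poll timeout.
        let now = Instant::now();
        for info in &mut listeners {
            if let Some(deadline) = info.timeout_deadline {
                if now >= deadline {
                    info.timeout_deadline = None; // register_logged() in the real code
                }
            }
        }
        assert!(listeners[0].timeout_deadline.is_none());
    }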

View File

@@ -6,15 +6,15 @@ use std::{io, mem};
 use actix_rt::net::TcpStream;
 use actix_rt::time::{sleep_until, Instant};
-use actix_rt::{self as rt, System};
+use actix_rt::System;
 use futures_core::future::BoxFuture;
 use log::{error, info};
-use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver};
+use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
 use tokio::sync::oneshot;

 use crate::accept::Accept;
 use crate::config::{ConfiguredService, ServiceConfig};
-use crate::server::{Server, ServerCommand};
+use crate::server_handle::{ServerCommand, ServerHandle};
 use crate::service::{InternalServiceFactory, ServiceFactory, StreamNewService};
 use crate::signals::{Signal, Signals};
 use crate::socket::{MioListener, StdSocketAddr, StdTcpListener, ToSocketAddrs};
@@ -32,8 +32,8 @@ pub struct ServerBuilder {
     sockets: Vec<(Token, String, MioListener)>,
     exit: bool,
     no_signals: bool,
-    cmd: UnboundedReceiver<ServerCommand>,
-    server: Server,
+    cmd_tx: UnboundedSender<ServerCommand>,
+    cmd_rx: UnboundedReceiver<ServerCommand>,
     worker_config: ServerWorkerConfig,
 }
@@ -47,8 +47,6 @@ impl ServerBuilder {
     /// Create new Server builder instance
     pub fn new() -> ServerBuilder {
         let (tx, rx) = unbounded_channel();
-        let server = Server::new(tx);
-
         ServerBuilder {
             threads: num_cpus::get(),
             token: Token::default(),
@@ -57,8 +55,8 @@ impl ServerBuilder {
             backlog: 2048,
             exit: false,
             no_signals: false,
-            cmd: rx,
-            server,
+            cmd_tx: tx,
+            cmd_rx: rx,
             worker_config: ServerWorkerConfig::default(),
         }
     }
@@ -267,7 +265,7 @@ impl ServerBuilder {
     }

     /// Starts processing incoming connections and return server controller.
-    pub fn run(mut self) -> Server {
+    pub fn run(mut self) -> ServerFuture {
         if self.sockets.is_empty() {
             panic!("Server should have at least one bound socket");
         } else {
@@ -287,7 +285,7 @@ impl ServerBuilder {
             // start accept thread. return waker_queue for wake up it.
             let waker_queue = Accept::start(
                 sockets,
-                self.server.clone(),
+                ServerHandle::new(self.cmd_tx.clone()),
                 // closure for construct worker and return it's handler.
                 |waker| {
                     (0..self.threads)
@@ -316,8 +314,9 @@ impl ServerBuilder {
                 None
             };

-            let server_future = ServerFuture {
-                cmd: self.cmd,
+            ServerFuture {
+                cmd_tx: self.cmd_tx,
+                cmd_rx: self.cmd_rx,
                 handles,
                 services: self.services,
                 notify: Vec::new(),
@@ -326,19 +325,16 @@ impl ServerBuilder {
                 signals,
                 on_stop_task: None,
                 waker_queue,
-            };
-
-            // spawn server future.
-            rt::spawn(server_future);
-
-            self.server
+            }
         }
     }
 }

-/// `ServerFuture` when awaited or spawned would listen to signal and message from `Server`.
-struct ServerFuture {
-    cmd: UnboundedReceiver<ServerCommand>,
+/// When awaited or spawned would listen to signal and message from [ServerHandle](crate::server::ServerHandle).
+#[must_use = "futures do nothing unless you `.await` or poll them"]
+pub struct ServerFuture {
+    cmd_tx: UnboundedSender<ServerCommand>,
+    cmd_rx: UnboundedReceiver<ServerCommand>,
     handles: Vec<(usize, WorkerHandle)>,
     services: Vec<Box<dyn InternalServiceFactory>>,
     notify: Vec<oneshot::Sender<()>>,
@@ -350,6 +346,13 @@ struct ServerFuture {
 }

 impl ServerFuture {
+    /// Obtain a Handle for ServerFuture that can be used to change state of actix server.
+    ///
+    /// See [ServerHandle](crate::server::ServerHandle) for usage.
+    pub fn handle(&self) -> ServerHandle {
+        ServerHandle::new(self.cmd_tx.clone())
+    }
+
     fn handle_cmd(&mut self, item: ServerCommand) -> Option<BoxFuture<'static, ()>> {
         match item {
             ServerCommand::Pause(tx) => {
@@ -415,6 +418,7 @@ impl ServerFuture {
                     .map(move |worker| worker.1.stop(graceful))
                     .collect::<Vec<_>>();

+                // TODO: this async block can return io::Error.
                 Some(Box::pin(async move {
                     for handle in iter {
                         let _ = handle.await;
@@ -433,6 +437,7 @@ impl ServerFuture {
                 } else {
                     // we need to stop system if server was spawned
                     let exit = self.exit;
+                    // TODO: this async block can return io::Error.
                     Some(Box::pin(async move {
                         if exit {
                             sleep_until(Instant::now() + Duration::from_millis(300)).await;
@@ -490,7 +495,7 @@ impl ServerFuture {
 }

 impl Future for ServerFuture {
-    type Output = ();
+    type Output = io::Result<()>;

     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
         let this = self.as_mut().get_mut();
@@ -509,10 +514,10 @@ impl Future for ServerFuture {
         loop {
             // got on stop task. resolve it exclusively and exit.
             if let Some(ref mut fut) = this.on_stop_task {
-                return fut.as_mut().poll(cx);
+                return fut.as_mut().poll(cx).map(|_| Ok(()));
             }

-            match Pin::new(&mut this.cmd).poll_recv(cx) {
+            match Pin::new(&mut this.cmd_rx).poll_recv(cx) {
                 Poll::Ready(Some(it)) => {
                     this.on_stop_task = this.handle_cmd(it);
                 }

View File

@@ -7,7 +7,7 @@
 mod accept;
 mod builder;
 mod config;
-mod server;
+mod server_handle;
 mod service;
 mod signals;
 mod socket;
@@ -17,7 +17,7 @@ mod worker;
 pub use self::builder::ServerBuilder;
 pub use self::config::{ServiceConfig, ServiceRuntime};
-pub use self::server::Server;
+pub use self::server_handle::ServerHandle;
 pub use self::service::ServiceFactory;
 pub use self::test_server::TestServer;

View File

@@ -25,14 +25,14 @@ pub(crate) enum ServerCommand {
 }

 #[derive(Debug)]
-pub struct Server(
+pub struct ServerHandle(
     UnboundedSender<ServerCommand>,
     Option<oneshot::Receiver<()>>,
 );

-impl Server {
+impl ServerHandle {
     pub(crate) fn new(tx: UnboundedSender<ServerCommand>) -> Self {
-        Server(tx, None)
+        ServerHandle(tx, None)
     }

     /// Start server building process
@@ -80,13 +80,13 @@ impl Server {
     }
 }

-impl Clone for Server {
+impl Clone for ServerHandle {
     fn clone(&self) -> Self {
         Self(self.0.clone(), None)
     }
 }

-impl Future for Server {
+impl Future for ServerHandle {
     type Output = io::Result<()>;

     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {

View File

@@ -3,7 +3,7 @@ use std::{net, thread};

 use actix_rt::{net::TcpStream, System};

-use crate::{Server, ServerBuilder, ServiceFactory};
+use crate::{ServerBuilder, ServerHandle, ServiceFactory};

 /// The `TestServer` type.
 ///
@@ -49,7 +49,15 @@ impl TestServer {
         // run server in separate thread
         thread::spawn(move || {
             let sys = System::new();
-            factory(Server::build()).workers(1).disable_signals().run();
+            sys.block_on(async {
+                actix_rt::spawn(async move {
+                    let _ = factory(ServerHandle::build())
+                        .workers(1)
+                        .disable_signals()
+                        .run()
+                        .await;
+                })
+            });

             tx.send(System::current()).unwrap();
             sys.run()
@@ -75,12 +83,15 @@ impl TestServer {
             let local_addr = tcp.local_addr().unwrap();

             sys.block_on(async {
-                Server::build()
+                actix_rt::spawn(async move {
+                    let _ = ServerHandle::build()
                         .listen("test", tcp, factory)
                         .unwrap()
                         .workers(1)
                         .disable_signals()
-                    .run();
+                        .run()
+                        .await;
+                });
                 tx.send((System::current(), local_addr)).unwrap();
             });

             sys.run()

View File

@@ -78,10 +78,6 @@ pub(crate) enum WakerInterest {
     Pause,
     Resume,
     Stop,
-    /// `Timer` is an interest sent as a delayed future. When an error happens on accepting
-    /// connection `Accept` would deregister socket listener temporary and wake up the poll and
-    /// register them again after the delayed future resolve.
-    Timer,
     /// `Worker` is an interest happen after a worker runs into faulted state(This is determined
     /// by if work can be sent to it successfully).`Accept` would be waked up and add the new
     /// `WorkerHandle`.

View File

@@ -6,7 +6,6 @@ use std::task::{Context, Poll};
 use std::time::Duration;

 use actix_rt::time::{sleep_until, Instant, Sleep};
-use actix_rt::{spawn, Arbiter};
 use actix_utils::counter::Counter;
 use futures_core::future::LocalBoxFuture;
 use log::{error, info, trace};
@@ -198,17 +197,11 @@ impl ServerWorker {
         let (tx2, rx2) = unbounded_channel();
         let avail = availability.clone();
+        availability.set(false);

         // every worker runs in it's own arbiter.
         // use a custom tokio runtime builder to change the settings of runtime.
-        Arbiter::with_tokio_rt(move || {
-            tokio::runtime::Builder::new_current_thread()
-                .enable_all()
-                .max_blocking_threads(config.max_blocking_threads)
-                .build()
-                .unwrap()
-        })
-        .spawn(async move {
-            availability.set(false);
+        std::thread::spawn(move || {
             let mut wrk = MAX_CONNS_COUNTER.with(move |conns| ServerWorker {
                 rx,
                 rx2,
@@ -219,7 +212,6 @@ impl ServerWorker {
                 conns: conns.clone(),
                 state: WorkerState::Unavailable,
             });
-
             let fut = wrk
                 .factories
                 .iter()
@@ -234,10 +226,16 @@ impl ServerWorker {
                 })
                 .collect::<Vec<_>>();

-            // a second spawn to make sure worker future runs as non boxed future.
-            // As Arbiter::spawn would box the future before send it to arbiter.
-            spawn(async move {
-                let res: Result<Vec<_>, _> = join_all(fut).await.into_iter().collect();
+            tokio::runtime::Builder::new_current_thread()
+                .enable_all()
+                .max_blocking_threads(config.max_blocking_threads)
+                .build()
+                .unwrap()
+                .block_on(tokio::task::LocalSet::new().run_until(async move {
+                    let res = join_all(fut)
+                        .await
+                        .into_iter()
+                        .collect::<Result<Vec<_>, _>>();
                     match res {
                         Ok(services) => {
                             for item in services {
@@ -253,11 +251,10 @@ impl ServerWorker {
                         }
                         Err(e) => {
                             error!("Can not start worker: {:?}", e);
-                            Arbiter::current().stop();
                         }
                     }
                     wrk.await
-            });
+                }))
         });

         WorkerHandle::new(idx, tx1, tx2, avail)
@@ -424,7 +421,6 @@ impl Future for ServerWorker {
             let num = num_connections();
             if num == 0 {
                 let _ = tx.take().unwrap().send(true);
-                Arbiter::current().stop();
                 return Poll::Ready(());
             }
@@ -432,7 +428,6 @@ impl Future for ServerWorker {
             if Pin::new(t2).poll(cx).is_ready() {
                 let _ = tx.take().unwrap().send(false);
                 self.shutdown(true);
-                Arbiter::current().stop();
                 return Poll::Ready(());
             }
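Worker startup now bypasses `Arbiter` entirely: each worker gets a plain `std::thread` that builds its own current-thread Tokio runtime and drives the non-`Send` worker future inside a `LocalSet`. When that future resolves, `block_on` returns and the thread ends, which is why the explicit `Arbiter::current().stop()` calls above are gone. A minimal sketch of the pattern (the worker body and `max_blocking_threads` value are placeholders, not this crate's code):

    use tokio::task::LocalSet;

    fn spawn_worker() -> std::thread::JoinHandle<()> {
        std::thread::spawn(|| {
            tokio::runtime::Builder::new_current_thread()
                .enable_all()
                .max_blocking_threads(4) // taken from config in the real code
                .build()
                .unwrap()
                // `run_until` drives `!Send` futures (spawned via `spawn_local`)
                // on this thread until the given future completes.
                .block_on(LocalSet::new().run_until(async {
                    // the worker future runs here
                    tokio::task::yield_now().await;
                }))
        })
    }

    fn main() {
        spawn_worker().join().unwrap();
    }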

View File

@@ -2,7 +2,7 @@ use std::sync::atomic::{AtomicUsize, Ordering::Relaxed};
 use std::sync::{mpsc, Arc};
 use std::{net, thread, time};

-use actix_server::Server;
+use actix_server::ServerHandle;
 use actix_service::fn_service;
 use futures_util::future::{lazy, ok};
@@ -22,18 +22,21 @@ fn test_bind() {
     let h = thread::spawn(move || {
         let sys = actix_rt::System::new();
-        let srv = sys.block_on(lazy(|_| {
-            Server::build()
+        sys.block_on(async {
+            actix_rt::spawn(async move {
+                let _ = ServerHandle::build()
                     .workers(1)
                     .disable_signals()
                     .bind("test", addr, move || fn_service(|_| ok::<_, ()>(())))
                     .unwrap()
                     .run()
-        }));
-        let _ = tx.send((srv, actix_rt::System::current()));
+                    .await;
+            });
+        });
+        let _ = tx.send(actix_rt::System::current());
         let _ = sys.run();
     });
-    let (_, sys) = rx.recv().unwrap();
+    let sys = rx.recv().unwrap();

     thread::sleep(time::Duration::from_millis(500));
     assert!(net::TcpStream::connect(addr).is_ok());
@@ -50,14 +53,17 @@ fn test_listen() {
         let sys = actix_rt::System::new();
         let lst = net::TcpListener::bind(addr).unwrap();
         sys.block_on(async {
-            Server::build()
+            actix_rt::spawn(async move {
+                let _ = ServerHandle::build()
                     .disable_signals()
                     .workers(1)
                     .listen("test", lst, move || fn_service(|_| ok::<_, ()>(())))
                     .unwrap()
-                .run();
-            let _ = tx.send(actix_rt::System::current());
+                    .run()
+                    .await;
+            });
         });
+        let _ = tx.send(actix_rt::System::current());
         let _ = sys.run();
     });
     let sys = rx.recv().unwrap();
@@ -81,9 +87,8 @@ fn test_start() {
     let (tx, rx) = mpsc::channel();

     let h = thread::spawn(move || {
-        let sys = actix_rt::System::new();
-        let srv = sys.block_on(lazy(|_| {
-            Server::build()
+        actix_rt::System::new().block_on(async {
+            let server = ServerHandle::build()
                 .backlog(100)
                 .disable_signals()
                 .bind("test", addr, move || {
@@ -94,11 +99,11 @@ fn test_start() {
                     })
                 })
                 .unwrap()
-                .run()
-        }));
-
-        let _ = tx.send((srv, actix_rt::System::current()));
-        let _ = sys.run();
+                .run();
+            let handle = server.handle();
+            let _ = tx.send((handle, actix_rt::System::current()));
+            let _ = server.await;
+        });
     });

     let (srv, sys) = rx.recv().unwrap();
@@ -150,9 +155,8 @@ fn test_configure() {
     let h = thread::spawn(move || {
         let num = num2.clone();
-        let sys = actix_rt::System::new();
-        let srv = sys.block_on(lazy(|_| {
-            Server::build()
+        actix_rt::System::new().block_on(async {
+            let server = ServerHandle::build()
                 .disable_signals()
                 .configure(move |cfg| {
                     let num = num.clone();
@@ -173,18 +177,21 @@ fn test_configure() {
                 })
                 .unwrap()
                 .workers(1)
-                .run()
-        }));
-
-        let _ = tx.send((srv, actix_rt::System::current()));
-        let _ = sys.run();
+                .run();
+            let handle = server.handle();
+            let _ = tx.send((handle, actix_rt::System::current()));
+            let _ = server.await;
+        });
     });
-    let (_, sys) = rx.recv().unwrap();
+    let (server, sys) = rx.recv().unwrap();

     thread::sleep(time::Duration::from_millis(500));
     assert!(net::TcpStream::connect(addr1).is_ok());
     assert!(net::TcpStream::connect(addr2).is_ok());
     assert!(net::TcpStream::connect(addr3).is_ok());
     assert_eq!(num.load(Relaxed), 1);
+    let _ = server.stop(true);
     sys.stop();
     let _ = h.join();
 }

View File

@@ -29,7 +29,7 @@ use std::{
     },
 };

-use actix_server::Server;
+use actix_server::ServerHandle;
 use actix_service::pipeline_factory;
 use actix_tls::accept::rustls::Acceptor as RustlsAcceptor;
 use futures_util::future::ok;
@@ -67,7 +67,7 @@ async fn main() -> io::Result<()> {
     let addr = ("127.0.0.1", 8443);
     info!("starting server on port: {}", &addr.0);

-    Server::build()
+    ServerHandle::build()
         .bind("tls-example", addr, move || {
             let count = Arc::clone(&count);

View File

@@ -14,8 +14,8 @@ use actix_codec::{AsyncRead, AsyncWrite};
 use actix_service::{Service, ServiceFactory};
 use futures_core::{future::LocalBoxFuture, ready};
 use log::trace;
-use tokio_rustls::{Connect, TlsConnector};
 use tokio_rustls::webpki::DNSNameRef;
+use tokio_rustls::{Connect, TlsConnector};

 use crate::connect::{Address, Connection};