rework accept new method. bubble up build error instead of panic

This commit is contained in:
fakeshadow 2021-03-31 19:37:33 +08:00
parent acaff5e225
commit 0b35f776fb
8 changed files with 72 additions and 91 deletions

View File

@ -16,7 +16,7 @@ use std::sync::{
use std::{env, io};
use actix_rt::net::TcpStream;
use actix_server::Server;
use actix_server::ServerHandle;
use actix_service::pipeline_factory;
use bytes::BytesMut;
use futures_util::future::ok;
@ -36,7 +36,7 @@ async fn main() -> io::Result<()> {
// Bind socket address and start worker(s). By default, the server uses the number of available
// logical CPU cores as the worker count. For this reason, the closure passed to bind needs
// to return a service *factory*; so it can be created once per worker.
Server::build()
ServerHandle::build()
.bind("echo", addr, move || {
let count = Arc::clone(&count);
let num2 = Arc::clone(&count);

View File

@ -5,11 +5,11 @@ use log::{error, info};
use mio::{Interest, Poll, Token as MioToken};
use slab::Slab;
use crate::server_handle::Server;
use crate::server_handle::ServerHandle;
use crate::socket::{MioListener, SocketAddr};
use crate::waker_queue::{WakerInterest, WakerQueue, WAKER_TOKEN};
use crate::worker::{Conn, WorkerHandle};
use crate::Token;
use crate::worker::{Conn, ServerWorker, WorkerAvailability, WorkerHandle};
use crate::{ServerBuilder, Token};
const DUR_ON_ERR: Duration = Duration::from_millis(500);
@ -32,7 +32,7 @@ pub(crate) struct Accept {
poll: Poll,
waker_queue: WakerQueue,
handles: Vec<WorkerHandle>,
srv: Server,
srv: ServerHandle,
next: usize,
backpressure: bool,
// poll time duration.
@ -54,40 +54,42 @@ fn connection_error(e: &io::Error) -> bool {
}
impl Accept {
pub(crate) fn start<F>(
pub(crate) fn start(
sockets: Vec<(Token, MioListener)>,
server_handle: Server,
worker_factory: F,
) -> WakerQueue
where
F: FnOnce(&WakerQueue) -> Vec<WorkerHandle>,
{
builder: &ServerBuilder,
) -> io::Result<(WakerQueue, Vec<(usize, WorkerHandle)>)> {
let server_handle = ServerHandle::new(builder.cmd_tx.clone());
// construct poll instance and it's waker
let poll = Poll::new().unwrap_or_else(|e| panic!("Can not create `mio::Poll`: {}", e));
let waker_queue = WakerQueue::new(poll.registry())
.unwrap_or_else(|e| panic!("Can not create `mio::Waker`: {}", e));
let waker_clone = waker_queue.clone();
let poll = Poll::new()?;
let waker_queue = WakerQueue::new(poll.registry())?;
// construct workers and collect handles.
let handles = worker_factory(&waker_queue);
let (handles, handles_clone) = (0..builder.threads)
.map(|idx| {
// start workers
let availability = WorkerAvailability::new(waker_queue.clone());
let factories = builder.services.iter().map(|v| v.clone_factory()).collect();
let handle =
ServerWorker::start(idx, factories, availability, builder.worker_config);
let handle_clone = (idx, handle.clone());
(handle, handle_clone)
})
.unzip();
let wake_queue_clone = waker_queue.clone();
let (mut accept, sockets) =
Accept::new_with_sockets(poll, wake_queue_clone, sockets, handles, server_handle)?;
// Accept runs in its own thread.
thread::Builder::new()
.name("actix-server acceptor".to_owned())
.spawn(move || {
let (mut accept, sockets) = Accept::new_with_sockets(
poll,
waker_queue,
sockets,
handles,
server_handle,
);
accept.poll_with(sockets);
})
.spawn(move || accept.poll_with(sockets))
.unwrap();
// return waker to server builder.
waker_clone
// return waker and worker handle clones to server builder.
Ok((waker_queue, handles_clone))
}
fn new_with_sockets(
@ -95,8 +97,8 @@ impl Accept {
waker_queue: WakerQueue,
socks: Vec<(Token, MioListener)>,
handles: Vec<WorkerHandle>,
srv: Server,
) -> (Accept, Slab<ServerSocketInfo>) {
srv: ServerHandle,
) -> io::Result<(Accept, Slab<ServerSocketInfo>)> {
let mut sockets = Slab::new();
for (hnd_token, mut lst) in socks.into_iter() {
let addr = lst.local_addr();
@ -106,8 +108,7 @@ impl Accept {
// Start listening for incoming connections
poll.registry()
.register(&mut lst, MioToken(token), Interest::READABLE)
.unwrap_or_else(|e| panic!("Can not register io: {}", e));
.register(&mut lst, MioToken(token), Interest::READABLE)?;
entry.insert(ServerSocketInfo {
addr,
@ -127,7 +128,7 @@ impl Accept {
timeout: None,
};
(accept, sockets)
Ok((accept, sockets))
}
fn poll_with(&mut self, mut sockets: Slab<ServerSocketInfo>) {

View File

@ -14,7 +14,7 @@ use tokio::sync::oneshot;
use crate::accept::Accept;
use crate::config::{ConfiguredService, ServiceConfig};
use crate::server_handle::{Server, ServerCommand};
use crate::server_handle::{ServerCommand, ServerHandle};
use crate::service::{InternalServiceFactory, ServiceFactory, StreamNewService};
use crate::signals::{Signal, Signals};
use crate::socket::{MioListener, StdSocketAddr, StdTcpListener, ToSocketAddrs};
@ -25,16 +25,16 @@ use crate::Token;
/// Server builder
pub struct ServerBuilder {
threads: usize,
pub(super) threads: usize,
token: Token,
backlog: u32,
services: Vec<Box<dyn InternalServiceFactory>>,
pub(super) services: Vec<Box<dyn InternalServiceFactory>>,
sockets: Vec<(Token, String, MioListener)>,
exit: bool,
no_signals: bool,
cmd_tx: UnboundedSender<ServerCommand>,
pub(super) cmd_tx: UnboundedSender<ServerCommand>,
cmd_rx: UnboundedReceiver<ServerCommand>,
worker_config: ServerWorkerConfig,
pub(super) worker_config: ServerWorkerConfig,
}
impl Default for ServerBuilder {
@ -264,7 +264,7 @@ impl ServerBuilder {
}
/// Starts processing incoming connections and return server controller.
pub fn run(mut self) -> ServerFuture {
pub fn run(mut self) -> Server {
if self.sockets.is_empty() {
panic!("Server should have at least one bound socket");
} else {
@ -278,33 +278,10 @@ impl ServerBuilder {
})
.collect();
// collect worker handles on start.
let mut handles = Vec::new();
// start accept thread. return waker_queue for wake up it.
let waker_queue = Accept::start(
sockets,
Server::new(self.cmd_tx.clone()),
// closure for construct worker and return it's handler.
|waker| {
(0..self.threads)
.map(|idx| {
// start workers
let availability = WorkerAvailability::new(waker.clone());
let factories =
self.services.iter().map(|v| v.clone_factory()).collect();
let handle = ServerWorker::start(
idx,
factories,
availability,
self.worker_config,
);
handles.push((idx, handle.clone()));
handle
})
.collect()
},
);
// start accept thread. return waker_queue and worker handles.
let (waker_queue, handles) = Accept::start(sockets, &self)
// TODO: include error to Server type and poll return it in Future.
.unwrap_or_else(|e| panic!("Can not start Accept: {}", e));
// construct signals future.
let signals = if !self.no_signals {
@ -313,7 +290,7 @@ impl ServerBuilder {
None
};
ServerFuture {
Server {
cmd_tx: self.cmd_tx,
cmd_rx: self.cmd_rx,
handles,
@ -331,7 +308,7 @@ impl ServerBuilder {
/// When awaited or spawned would listen to signal and message from [ServerHandle](crate::server::ServerHandle).
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct ServerFuture {
pub struct Server {
cmd_tx: UnboundedSender<ServerCommand>,
cmd_rx: UnboundedReceiver<ServerCommand>,
handles: Vec<(usize, WorkerHandle)>,
@ -344,12 +321,12 @@ pub struct ServerFuture {
waker_queue: WakerQueue,
}
impl ServerFuture {
impl Server {
/// Obtain a Handle for ServerFuture that can be used to change state of actix server.
///
/// See [ServerHandle](crate::server::ServerHandle) for usage.
pub fn handle(&self) -> Server {
Server::new(self.cmd_tx.clone())
pub fn handle(&self) -> ServerHandle {
ServerHandle::new(self.cmd_tx.clone())
}
fn handle_cmd(&mut self, item: ServerCommand) -> Option<BoxFuture<'static, ()>> {
@ -493,7 +470,7 @@ impl ServerFuture {
}
}
impl Future for ServerFuture {
impl Future for Server {
type Output = io::Result<()>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {

View File

@ -15,9 +15,9 @@ mod test_server;
mod waker_queue;
mod worker;
pub use self::builder::{ServerBuilder, ServerFuture};
pub use self::builder::{Server, ServerBuilder};
pub use self::config::{ServiceConfig, ServiceRuntime};
pub use self::server_handle::Server;
pub use self::server_handle::ServerHandle;
pub use self::service::ServiceFactory;
pub use self::test_server::TestServer;

View File

@ -25,14 +25,14 @@ pub(crate) enum ServerCommand {
}
#[derive(Debug)]
pub struct Server(
pub struct ServerHandle(
UnboundedSender<ServerCommand>,
Option<oneshot::Receiver<()>>,
);
impl Server {
impl ServerHandle {
pub(crate) fn new(tx: UnboundedSender<ServerCommand>) -> Self {
Server(tx, None)
ServerHandle(tx, None)
}
/// Start server building process
@ -80,13 +80,13 @@ impl Server {
}
}
impl Clone for Server {
impl Clone for ServerHandle {
fn clone(&self) -> Self {
Self(self.0.clone(), None)
}
}
impl Future for Server {
impl Future for ServerHandle {
type Output = io::Result<()>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {

View File

@ -3,7 +3,7 @@ use std::{net, thread};
use actix_rt::{net::TcpStream, System};
use crate::{Server, ServerBuilder, ServiceFactory};
use crate::{ServerBuilder, ServerHandle, ServiceFactory};
/// The `TestServer` type.
///
@ -49,7 +49,10 @@ impl TestServer {
// run server in separate thread
thread::spawn(move || {
System::new().block_on(async {
let server = factory(Server::build()).workers(1).disable_signals().run();
let server = factory(ServerHandle::build())
.workers(1)
.disable_signals()
.run();
tx.send(System::current()).unwrap();
server.await
@ -76,7 +79,7 @@ impl TestServer {
let local_addr = tcp.local_addr().unwrap();
sys.block_on(async {
let server = Server::build()
let server = ServerHandle::build()
.listen("test", tcp, factory)
.unwrap()
.workers(1)

View File

@ -2,7 +2,7 @@ use std::sync::atomic::{AtomicUsize, Ordering::Relaxed};
use std::sync::{mpsc, Arc};
use std::{net, thread, time};
use actix_server::Server;
use actix_server::ServerHandle;
use actix_service::fn_service;
use actix_utils::future::ok;
use futures_util::future::lazy;
@ -24,7 +24,7 @@ fn test_bind() {
let h = thread::spawn(move || {
let system = actix_rt::System::new();
system.block_on(async {
let server = Server::build()
let server = ServerHandle::build()
.workers(1)
.disable_signals()
.bind("test", addr, move || fn_service(|_| ok::<_, ()>(())))
@ -55,7 +55,7 @@ fn test_listen() {
let sys = actix_rt::System::new();
let lst = net::TcpListener::bind(addr).unwrap();
sys.block_on(async {
let server = Server::build()
let server = ServerHandle::build()
.disable_signals()
.workers(1)
.listen("test", lst, move || fn_service(|_| ok::<_, ()>(())))
@ -92,7 +92,7 @@ fn test_start() {
let h = thread::spawn(move || {
actix_rt::System::new().block_on(async {
let server = Server::build()
let server = ServerHandle::build()
.backlog(100)
.disable_signals()
.bind("test", addr, move || {
@ -160,7 +160,7 @@ fn test_configure() {
let h = thread::spawn(move || {
let num = num2.clone();
actix_rt::System::new().block_on(async {
let server = Server::build()
let server = ServerHandle::build()
.disable_signals()
.configure(move |cfg| {
let num = num.clone();

View File

@ -30,7 +30,7 @@ use std::{
};
use actix_rt::net::TcpStream;
use actix_server::Server;
use actix_server::ServerHandle;
use actix_service::pipeline_factory;
use actix_tls::accept::rustls::{Acceptor as RustlsAcceptor, TlsStream};
use futures_util::future::ok;
@ -68,7 +68,7 @@ async fn main() -> io::Result<()> {
let addr = ("127.0.0.1", 8443);
info!("starting server on port: {}", &addr.0);
Server::build()
ServerHandle::build()
.bind("tls-example", addr, move || {
let count = Arc::clone(&count);