merge master

This commit is contained in:
fakeshadow 2021-04-28 20:23:38 +08:00
commit 32dcccf2b1
8 changed files with 38 additions and 195 deletions

View File

@ -1,6 +1,10 @@
# Changes # Changes
## Unreleased - 2021-xx-xx ## Unreleased - 2021-xx-xx
* Remove `config` module. `ServiceConfig`, `ServiceRuntime` public types are removed due to this change. [#349]
* Remove `ServerBuilder::configure` [#349]
[#349]: https://github.com/actix/actix-net/pull/349
## 2.0.0-beta.5 - 2021-04-20 ## 2.0.0-beta.5 - 2021-04-20

View File

@ -9,14 +9,11 @@ use log::{error, info};
use mio::{Interest, Poll, Token as MioToken}; use mio::{Interest, Poll, Token as MioToken};
use crate::server::Server; use crate::server::Server;
use crate::socket::{MioListener, SocketAddr}; use crate::socket::MioListener;
use crate::waker_queue::{WakerInterest, WakerQueue, WAKER_TOKEN}; use crate::waker_queue::{WakerInterest, WakerQueue, WAKER_TOKEN};
use crate::worker::{Conn, WorkerHandleAccept}; use crate::worker::{Conn, WorkerHandleAccept};
struct ServerSocketInfo { struct ServerSocketInfo {
/// Address of socket. Mainly used for logging.
addr: SocketAddr,
token: usize, token: usize,
lst: MioListener, lst: MioListener,
@ -189,15 +186,12 @@ impl Accept {
let sockets = socks let sockets = socks
.into_iter() .into_iter()
.map(|(token, mut lst)| { .map(|(token, mut lst)| {
let addr = lst.local_addr();
// Start listening for incoming connections // Start listening for incoming connections
poll.registry() poll.registry()
.register(&mut lst, MioToken(token), Interest::READABLE) .register(&mut lst, MioToken(token), Interest::READABLE)
.unwrap_or_else(|e| panic!("Can not register io: {}", e)); .unwrap_or_else(|e| panic!("Can not register io: {}", e));
ServerSocketInfo { ServerSocketInfo {
addr,
token, token,
lst, lst,
timeout: None, timeout: None,
@ -370,14 +364,14 @@ impl Accept {
fn register_logged(&self, info: &mut ServerSocketInfo) { fn register_logged(&self, info: &mut ServerSocketInfo) {
match self.register(info) { match self.register(info) {
Ok(_) => info!("Resume accepting connections on {}", info.addr), Ok(_) => info!("Resume accepting connections on {}", info.lst.local_addr()),
Err(e) => error!("Can not register server socket {}", e), Err(e) => error!("Can not register server socket {}", e),
} }
} }
fn deregister_logged(&self, info: &mut ServerSocketInfo) { fn deregister_logged(&self, info: &mut ServerSocketInfo) {
match self.poll.registry().deregister(&mut info.lst) { match self.poll.registry().deregister(&mut info.lst) {
Ok(_) => info!("Paused accepting connections on {}", info.addr), Ok(_) => info!("Paused accepting connections on {}", info.lst.local_addr()),
Err(e) => { Err(e) => {
error!("Can not deregister server socket {}", e) error!("Can not deregister server socket {}", e)
} }

View File

@ -14,7 +14,6 @@ use tokio::sync::{
}; };
use crate::accept::AcceptLoop; use crate::accept::AcceptLoop;
use crate::config::{ConfiguredService, ServiceConfig};
use crate::join_all; use crate::join_all;
use crate::server::{Server, ServerCommand}; use crate::server::{Server, ServerCommand};
use crate::service::{InternalServiceFactory, ServiceFactory, StreamNewService}; use crate::service::{InternalServiceFactory, ServiceFactory, StreamNewService};
@ -148,32 +147,6 @@ impl ServerBuilder {
self self
} }
/// Execute external configuration as part of the server building process.
///
/// This function is useful for moving parts of configuration to a different module or
/// even library.
pub fn configure<F>(mut self, f: F) -> io::Result<ServerBuilder>
where
F: Fn(&mut ServiceConfig) -> io::Result<()>,
{
let mut cfg = ServiceConfig::new(self.threads, self.backlog);
f(&mut cfg)?;
if let Some(apply) = cfg.apply {
let mut srv = ConfiguredService::new(apply);
for (name, lst) in cfg.services {
let token = self.next_token();
srv.stream(token, name.clone(), lst.local_addr()?);
self.sockets.push((token, name, MioListener::Tcp(lst)));
}
self.services.push(Box::new(srv));
}
self.threads = cfg.threads;
Ok(self)
}
/// Add new service to the server. /// Add new service to the server.
pub fn bind<F, U, N: AsRef<str>>(mut self, name: N, addr: U, factory: F) -> io::Result<Self> pub fn bind<F, U, N: AsRef<str>>(mut self, name: N, addr: U, factory: F) -> io::Result<Self>
where where

View File

@ -6,7 +6,6 @@
mod accept; mod accept;
mod builder; mod builder;
mod config;
mod server; mod server;
mod service; mod service;
mod signals; mod signals;
@ -16,7 +15,6 @@ mod waker_queue;
mod worker; mod worker;
pub use self::builder::ServerBuilder; pub use self::builder::ServerBuilder;
pub use self::config::{ServiceConfig, ServiceRuntime};
pub use self::server::Server; pub use self::server::Server;
pub use self::service::ServiceFactory; pub use self::service::ServiceFactory;
pub use self::test_server::TestServer; pub use self::test_server::TestServer;

View File

@ -21,7 +21,7 @@ pub(crate) trait InternalServiceFactory: Send {
fn clone_factory(&self) -> Box<dyn InternalServiceFactory>; fn clone_factory(&self) -> Box<dyn InternalServiceFactory>;
fn create(&self) -> LocalBoxFuture<'static, Result<Vec<(usize, BoxedServerService)>, ()>>; fn create(&self) -> LocalBoxFuture<'static, Result<(usize, BoxedServerService), ()>>;
} }
pub(crate) type BoxedServerService = Box< pub(crate) type BoxedServerService = Box<
@ -128,14 +128,14 @@ where
}) })
} }
fn create(&self) -> LocalBoxFuture<'static, Result<Vec<(usize, BoxedServerService)>, ()>> { fn create(&self) -> LocalBoxFuture<'static, Result<(usize, BoxedServerService), ()>> {
let token = self.token; let token = self.token;
let fut = self.inner.create().new_service(()); let fut = self.inner.create().new_service(());
Box::pin(async move { Box::pin(async move {
match fut.await { match fut.await {
Ok(inner) => { Ok(inner) => {
let service = Box::new(StreamService::new(inner)) as _; let service = Box::new(StreamService::new(inner)) as _;
Ok(vec![(token, service)]) Ok((token, service))
} }
Err(_) => Err(()), Err(_) => Err(()),
} }

View File

@ -23,9 +23,15 @@ pub(crate) enum MioListener {
impl MioListener { impl MioListener {
pub(crate) fn local_addr(&self) -> SocketAddr { pub(crate) fn local_addr(&self) -> SocketAddr {
match *self { match *self {
MioListener::Tcp(ref lst) => SocketAddr::Tcp(lst.local_addr().unwrap()), MioListener::Tcp(ref lst) => lst
.local_addr()
.map(SocketAddr::Tcp)
.unwrap_or(SocketAddr::Unknown),
#[cfg(unix)] #[cfg(unix)]
MioListener::Uds(ref lst) => SocketAddr::Uds(lst.local_addr().unwrap()), MioListener::Uds(ref lst) => lst
.local_addr()
.map(SocketAddr::Uds)
.unwrap_or(SocketAddr::Unknown),
} }
} }
@ -110,14 +116,15 @@ impl fmt::Debug for MioListener {
impl fmt::Display for MioListener { impl fmt::Display for MioListener {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match *self { match *self {
MioListener::Tcp(ref lst) => write!(f, "{}", lst.local_addr().ok().unwrap()), MioListener::Tcp(ref lst) => write!(f, "{:?}", lst),
#[cfg(unix)] #[cfg(unix)]
MioListener::Uds(ref lst) => write!(f, "{:?}", lst.local_addr().ok().unwrap()), MioListener::Uds(ref lst) => write!(f, "{:?}", lst),
} }
} }
} }
pub(crate) enum SocketAddr { pub(crate) enum SocketAddr {
Unknown,
Tcp(StdSocketAddr), Tcp(StdSocketAddr),
#[cfg(unix)] #[cfg(unix)]
Uds(mio::net::SocketAddr), Uds(mio::net::SocketAddr),
@ -126,9 +133,10 @@ pub(crate) enum SocketAddr {
impl fmt::Display for SocketAddr { impl fmt::Display for SocketAddr {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match *self { match *self {
SocketAddr::Tcp(ref addr) => write!(f, "{}", addr), Self::Unknown => write!(f, "Unknown SocketAddr"),
Self::Tcp(ref addr) => write!(f, "{}", addr),
#[cfg(unix)] #[cfg(unix)]
SocketAddr::Uds(ref addr) => write!(f, "{:?}", addr), Self::Uds(ref addr) => write!(f, "{:?}", addr),
} }
} }
} }
@ -136,9 +144,10 @@ impl fmt::Display for SocketAddr {
impl fmt::Debug for SocketAddr { impl fmt::Debug for SocketAddr {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match *self { match *self {
SocketAddr::Tcp(ref addr) => write!(f, "{:?}", addr), Self::Unknown => write!(f, "Unknown SocketAddr"),
Self::Tcp(ref addr) => write!(f, "{:?}", addr),
#[cfg(unix)] #[cfg(unix)]
SocketAddr::Uds(ref addr) => write!(f, "{:?}", addr), Self::Uds(ref addr) => write!(f, "{:?}", addr),
} }
} }
} }

View File

@ -275,11 +275,7 @@ impl ServerWorker {
.enumerate() .enumerate()
.map(|(idx, factory)| { .map(|(idx, factory)| {
let fut = factory.create(); let fut = factory.create();
async move { async move { fut.await.map(|(t, s)| (idx, t, s)) }
fut.await.map(|r| {
r.into_iter().map(|(t, s)| (idx, t, s)).collect::<Vec<_>>()
})
}
}) })
.collect::<Vec<_>>(); .collect::<Vec<_>>();
@ -292,9 +288,8 @@ impl ServerWorker {
let services = match res { let services = match res {
Ok(res) => res Ok(res) => res
.into_iter() .into_iter()
.flatten() .fold(Vec::new(), |mut services, (factory, token, service)| {
.fold(Vec::new(), |mut services, (factory, idx, service)| { assert_eq!(token, services.len());
assert_eq!(idx, services.len());
services.push(WorkerService { services.push(WorkerService {
factory, factory,
service, service,
@ -332,7 +327,7 @@ impl ServerWorker {
self.services[idx].status = WorkerServiceStatus::Restarting; self.services[idx].status = WorkerServiceStatus::Restarting;
self.state = WorkerState::Restarting(Restart { self.state = WorkerState::Restarting(Restart {
factory_id, factory_id,
idx, token: idx,
fut: factory.create(), fut: factory.create(),
}); });
} }
@ -402,8 +397,8 @@ enum WorkerState {
struct Restart { struct Restart {
factory_id: usize, factory_id: usize,
idx: usize, token: usize,
fut: LocalBoxFuture<'static, Result<Vec<(usize, BoxedServerService)>, ()>>, fut: LocalBoxFuture<'static, Result<(usize, BoxedServerService), ()>>,
} }
// Shutdown keep states necessary for server shutdown: // Shutdown keep states necessary for server shutdown:
@ -475,21 +470,17 @@ impl Future for ServerWorker {
}, },
WorkerState::Restarting(ref mut restart) => { WorkerState::Restarting(ref mut restart) => {
let factory_id = restart.factory_id; let factory_id = restart.factory_id;
let token = restart.idx; let token = restart.token;
let service = ready!(restart.fut.as_mut().poll(cx)) let (token_new, service) = ready!(restart.fut.as_mut().poll(cx))
.unwrap_or_else(|_| { .unwrap_or_else(|_| {
panic!( panic!(
"Can not restart {:?} service", "Can not restart {:?} service",
this.factories[factory_id].name(token) this.factories[factory_id].name(token)
) )
}) });
.into_iter()
// Find the same token from vector. There should be only one assert_eq!(token, token_new);
// So the first match would be enough.
.find(|(t, _)| *t == token)
.map(|(_, service)| service)
.expect("No BoxedServerService found");
trace!( trace!(
"Service {:?} has been restarted", "Service {:?} has been restarted",

View File

@ -142,57 +142,6 @@ fn test_start() {
let _ = h.join(); let _ = h.join();
} }
#[test]
fn test_configure() {
let addr1 = unused_addr();
let addr2 = unused_addr();
let addr3 = unused_addr();
let (tx, rx) = mpsc::channel();
let num = Arc::new(AtomicUsize::new(0));
let num2 = num.clone();
let h = thread::spawn(move || {
let num = num2.clone();
let sys = actix_rt::System::new();
let srv = sys.block_on(lazy(|_| {
Server::build()
.disable_signals()
.configure(move |cfg| {
let num = num.clone();
let lst = net::TcpListener::bind(addr3).unwrap();
cfg.bind("addr1", addr1)
.unwrap()
.bind("addr2", addr2)
.unwrap()
.listen("addr3", lst)
.apply(move |rt| {
let num = num.clone();
rt.service("addr1", fn_service(|_| ok::<_, ()>(())));
rt.service("addr3", fn_service(|_| ok::<_, ()>(())));
rt.on_start(lazy(move |_| {
let _ = num.fetch_add(1, Ordering::Relaxed);
}))
})
})
.unwrap()
.workers(1)
.run()
}));
let _ = tx.send((srv, actix_rt::System::current()));
let _ = sys.run();
});
let (_, sys) = rx.recv().unwrap();
thread::sleep(Duration::from_millis(500));
assert!(net::TcpStream::connect(addr1).is_ok());
assert!(net::TcpStream::connect(addr2).is_ok());
assert!(net::TcpStream::connect(addr3).is_ok());
assert_eq!(num.load(Ordering::Relaxed), 1);
sys.stop();
let _ = h.join();
}
#[actix_rt::test] #[actix_rt::test]
async fn test_max_concurrent_connections() { async fn test_max_concurrent_connections() {
// Note: // Note:
@ -305,81 +254,6 @@ async fn test_service_restart() {
let num_clone = num.clone(); let num_clone = num.clone();
let num2_clone = num2.clone(); let num2_clone = num2.clone();
let h = thread::spawn(move || {
actix_rt::System::new().block_on(async {
let server = Server::build()
.backlog(1)
.disable_signals()
.configure(move |cfg| {
let num = num.clone();
let num2 = num2.clone();
cfg.bind("addr1", addr1)
.unwrap()
.bind("addr2", addr2)
.unwrap()
.apply(move |rt| {
let num = num.clone();
let num2 = num2.clone();
rt.service(
"addr1",
fn_factory(move || {
let num = num.clone();
async move { Ok::<_, ()>(TestService(num)) }
}),
);
rt.service(
"addr2",
fn_factory(move || {
let num2 = num2.clone();
async move { Ok::<_, ()>(TestService(num2)) }
}),
);
})
})
.unwrap()
.workers(1)
.run();
let _ = tx.send((server.clone(), actix_rt::System::current()));
server.await
})
});
let (server, sys) = rx.recv().unwrap();
for _ in 0..5 {
TcpStream::connect(addr1)
.await
.unwrap()
.shutdown()
.await
.unwrap();
TcpStream::connect(addr2)
.await
.unwrap()
.shutdown()
.await
.unwrap();
}
sleep(Duration::from_secs(3)).await;
assert!(num_clone.load(Ordering::SeqCst) > 5);
assert!(num2_clone.load(Ordering::SeqCst) > 5);
sys.stop();
let _ = server.stop(false);
let _ = h.join().unwrap();
let addr1 = unused_addr();
let addr2 = unused_addr();
let (tx, rx) = mpsc::channel();
let num = Arc::new(AtomicUsize::new(0));
let num2 = Arc::new(AtomicUsize::new(0));
let num_clone = num.clone();
let num2_clone = num2.clone();
let h = thread::spawn(move || { let h = thread::spawn(move || {
let num = num.clone(); let num = num.clone();
actix_rt::System::new().block_on(async { actix_rt::System::new().block_on(async {