diff --git a/actix-server/CHANGES.md b/actix-server/CHANGES.md index 58b1bd38..28c7b206 100644 --- a/actix-server/CHANGES.md +++ b/actix-server/CHANGES.md @@ -1,6 +1,10 @@ # Changes ## Unreleased - 2021-xx-xx +* Remove `config` module. `ServiceConfig`, `ServiceRuntime` public types are removed due to this change. [#349] +* Remove `ServerBuilder::configure` [#349] + +[#349]: https://github.com/actix/actix-net/pull/349 ## 2.0.0-beta.5 - 2021-04-20 diff --git a/actix-server/src/accept.rs b/actix-server/src/accept.rs index ab0ae707..9101939e 100644 --- a/actix-server/src/accept.rs +++ b/actix-server/src/accept.rs @@ -9,14 +9,11 @@ use log::{error, info}; use mio::{Interest, Poll, Token as MioToken}; use crate::server::Server; -use crate::socket::{MioListener, SocketAddr}; +use crate::socket::MioListener; use crate::waker_queue::{WakerInterest, WakerQueue, WAKER_TOKEN}; use crate::worker::{Conn, WorkerHandleAccept}; struct ServerSocketInfo { - /// Address of socket. Mainly used for logging. - addr: SocketAddr, - token: usize, lst: MioListener, @@ -189,15 +186,12 @@ impl Accept { let sockets = socks .into_iter() .map(|(token, mut lst)| { - let addr = lst.local_addr(); - // Start listening for incoming connections poll.registry() .register(&mut lst, MioToken(token), Interest::READABLE) .unwrap_or_else(|e| panic!("Can not register io: {}", e)); ServerSocketInfo { - addr, token, lst, timeout: None, @@ -370,14 +364,14 @@ impl Accept { fn register_logged(&self, info: &mut ServerSocketInfo) { match self.register(info) { - Ok(_) => info!("Resume accepting connections on {}", info.addr), + Ok(_) => info!("Resume accepting connections on {}", info.lst.local_addr()), Err(e) => error!("Can not register server socket {}", e), } } fn deregister_logged(&self, info: &mut ServerSocketInfo) { match self.poll.registry().deregister(&mut info.lst) { - Ok(_) => info!("Paused accepting connections on {}", info.addr), + Ok(_) => info!("Paused accepting connections on {}", info.lst.local_addr()), Err(e) => { error!("Can not deregister server socket {}", e) } diff --git a/actix-server/src/builder.rs b/actix-server/src/builder.rs index bd694e7c..e84a887d 100644 --- a/actix-server/src/builder.rs +++ b/actix-server/src/builder.rs @@ -14,7 +14,6 @@ use tokio::sync::{ }; use crate::accept::AcceptLoop; -use crate::config::{ConfiguredService, ServiceConfig}; use crate::join_all; use crate::server::{Server, ServerCommand}; use crate::service::{InternalServiceFactory, ServiceFactory, StreamNewService}; @@ -148,32 +147,6 @@ impl ServerBuilder { self } - /// Execute external configuration as part of the server building process. - /// - /// This function is useful for moving parts of configuration to a different module or - /// even library. - pub fn configure(mut self, f: F) -> io::Result - where - F: Fn(&mut ServiceConfig) -> io::Result<()>, - { - let mut cfg = ServiceConfig::new(self.threads, self.backlog); - - f(&mut cfg)?; - - if let Some(apply) = cfg.apply { - let mut srv = ConfiguredService::new(apply); - for (name, lst) in cfg.services { - let token = self.next_token(); - srv.stream(token, name.clone(), lst.local_addr()?); - self.sockets.push((token, name, MioListener::Tcp(lst))); - } - self.services.push(Box::new(srv)); - } - self.threads = cfg.threads; - - Ok(self) - } - /// Add new service to the server. pub fn bind>(mut self, name: N, addr: U, factory: F) -> io::Result where diff --git a/actix-server/src/lib.rs b/actix-server/src/lib.rs index ba7d0c29..b2117191 100644 --- a/actix-server/src/lib.rs +++ b/actix-server/src/lib.rs @@ -6,7 +6,6 @@ mod accept; mod builder; -mod config; mod server; mod service; mod signals; @@ -16,7 +15,6 @@ mod waker_queue; mod worker; pub use self::builder::ServerBuilder; -pub use self::config::{ServiceConfig, ServiceRuntime}; pub use self::server::Server; pub use self::service::ServiceFactory; pub use self::test_server::TestServer; diff --git a/actix-server/src/service.rs b/actix-server/src/service.rs index bc436e75..28ffb4f1 100644 --- a/actix-server/src/service.rs +++ b/actix-server/src/service.rs @@ -21,7 +21,7 @@ pub(crate) trait InternalServiceFactory: Send { fn clone_factory(&self) -> Box; - fn create(&self) -> LocalBoxFuture<'static, Result, ()>>; + fn create(&self) -> LocalBoxFuture<'static, Result<(usize, BoxedServerService), ()>>; } pub(crate) type BoxedServerService = Box< @@ -128,14 +128,14 @@ where }) } - fn create(&self) -> LocalBoxFuture<'static, Result, ()>> { + fn create(&self) -> LocalBoxFuture<'static, Result<(usize, BoxedServerService), ()>> { let token = self.token; let fut = self.inner.create().new_service(()); Box::pin(async move { match fut.await { Ok(inner) => { let service = Box::new(StreamService::new(inner)) as _; - Ok(vec![(token, service)]) + Ok((token, service)) } Err(_) => Err(()), } diff --git a/actix-server/src/socket.rs b/actix-server/src/socket.rs index 948b5f1f..cd7ccc1a 100644 --- a/actix-server/src/socket.rs +++ b/actix-server/src/socket.rs @@ -23,9 +23,15 @@ pub(crate) enum MioListener { impl MioListener { pub(crate) fn local_addr(&self) -> SocketAddr { match *self { - MioListener::Tcp(ref lst) => SocketAddr::Tcp(lst.local_addr().unwrap()), + MioListener::Tcp(ref lst) => lst + .local_addr() + .map(SocketAddr::Tcp) + .unwrap_or(SocketAddr::Unknown), #[cfg(unix)] - MioListener::Uds(ref lst) => SocketAddr::Uds(lst.local_addr().unwrap()), + MioListener::Uds(ref lst) => lst + .local_addr() + .map(SocketAddr::Uds) + .unwrap_or(SocketAddr::Unknown), } } @@ -110,14 +116,15 @@ impl fmt::Debug for MioListener { impl fmt::Display for MioListener { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match *self { - MioListener::Tcp(ref lst) => write!(f, "{}", lst.local_addr().ok().unwrap()), + MioListener::Tcp(ref lst) => write!(f, "{:?}", lst), #[cfg(unix)] - MioListener::Uds(ref lst) => write!(f, "{:?}", lst.local_addr().ok().unwrap()), + MioListener::Uds(ref lst) => write!(f, "{:?}", lst), } } } pub(crate) enum SocketAddr { + Unknown, Tcp(StdSocketAddr), #[cfg(unix)] Uds(mio::net::SocketAddr), @@ -126,9 +133,10 @@ pub(crate) enum SocketAddr { impl fmt::Display for SocketAddr { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match *self { - SocketAddr::Tcp(ref addr) => write!(f, "{}", addr), + Self::Unknown => write!(f, "Unknown SocketAddr"), + Self::Tcp(ref addr) => write!(f, "{}", addr), #[cfg(unix)] - SocketAddr::Uds(ref addr) => write!(f, "{:?}", addr), + Self::Uds(ref addr) => write!(f, "{:?}", addr), } } } @@ -136,9 +144,10 @@ impl fmt::Display for SocketAddr { impl fmt::Debug for SocketAddr { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match *self { - SocketAddr::Tcp(ref addr) => write!(f, "{:?}", addr), + Self::Unknown => write!(f, "Unknown SocketAddr"), + Self::Tcp(ref addr) => write!(f, "{:?}", addr), #[cfg(unix)] - SocketAddr::Uds(ref addr) => write!(f, "{:?}", addr), + Self::Uds(ref addr) => write!(f, "{:?}", addr), } } } diff --git a/actix-server/src/worker.rs b/actix-server/src/worker.rs index df8bc723..f929fc18 100644 --- a/actix-server/src/worker.rs +++ b/actix-server/src/worker.rs @@ -275,11 +275,7 @@ impl ServerWorker { .enumerate() .map(|(idx, factory)| { let fut = factory.create(); - async move { - fut.await.map(|r| { - r.into_iter().map(|(t, s)| (idx, t, s)).collect::>() - }) - } + async move { fut.await.map(|(t, s)| (idx, t, s)) } }) .collect::>(); @@ -292,9 +288,8 @@ impl ServerWorker { let services = match res { Ok(res) => res .into_iter() - .flatten() - .fold(Vec::new(), |mut services, (factory, idx, service)| { - assert_eq!(idx, services.len()); + .fold(Vec::new(), |mut services, (factory, token, service)| { + assert_eq!(token, services.len()); services.push(WorkerService { factory, service, @@ -332,7 +327,7 @@ impl ServerWorker { self.services[idx].status = WorkerServiceStatus::Restarting; self.state = WorkerState::Restarting(Restart { factory_id, - idx, + token: idx, fut: factory.create(), }); } @@ -402,8 +397,8 @@ enum WorkerState { struct Restart { factory_id: usize, - idx: usize, - fut: LocalBoxFuture<'static, Result, ()>>, + token: usize, + fut: LocalBoxFuture<'static, Result<(usize, BoxedServerService), ()>>, } // Shutdown keep states necessary for server shutdown: @@ -475,21 +470,17 @@ impl Future for ServerWorker { }, WorkerState::Restarting(ref mut restart) => { let factory_id = restart.factory_id; - let token = restart.idx; + let token = restart.token; - let service = ready!(restart.fut.as_mut().poll(cx)) + let (token_new, service) = ready!(restart.fut.as_mut().poll(cx)) .unwrap_or_else(|_| { panic!( "Can not restart {:?} service", this.factories[factory_id].name(token) ) - }) - .into_iter() - // Find the same token from vector. There should be only one - // So the first match would be enough. - .find(|(t, _)| *t == token) - .map(|(_, service)| service) - .expect("No BoxedServerService found"); + }); + + assert_eq!(token, token_new); trace!( "Service {:?} has been restarted", diff --git a/actix-server/tests/test_server.rs b/actix-server/tests/test_server.rs index cc9f8190..78894816 100644 --- a/actix-server/tests/test_server.rs +++ b/actix-server/tests/test_server.rs @@ -142,57 +142,6 @@ fn test_start() { let _ = h.join(); } -#[test] -fn test_configure() { - let addr1 = unused_addr(); - let addr2 = unused_addr(); - let addr3 = unused_addr(); - let (tx, rx) = mpsc::channel(); - let num = Arc::new(AtomicUsize::new(0)); - let num2 = num.clone(); - - let h = thread::spawn(move || { - let num = num2.clone(); - let sys = actix_rt::System::new(); - let srv = sys.block_on(lazy(|_| { - Server::build() - .disable_signals() - .configure(move |cfg| { - let num = num.clone(); - let lst = net::TcpListener::bind(addr3).unwrap(); - cfg.bind("addr1", addr1) - .unwrap() - .bind("addr2", addr2) - .unwrap() - .listen("addr3", lst) - .apply(move |rt| { - let num = num.clone(); - rt.service("addr1", fn_service(|_| ok::<_, ()>(()))); - rt.service("addr3", fn_service(|_| ok::<_, ()>(()))); - rt.on_start(lazy(move |_| { - let _ = num.fetch_add(1, Ordering::Relaxed); - })) - }) - }) - .unwrap() - .workers(1) - .run() - })); - - let _ = tx.send((srv, actix_rt::System::current())); - let _ = sys.run(); - }); - let (_, sys) = rx.recv().unwrap(); - thread::sleep(Duration::from_millis(500)); - - assert!(net::TcpStream::connect(addr1).is_ok()); - assert!(net::TcpStream::connect(addr2).is_ok()); - assert!(net::TcpStream::connect(addr3).is_ok()); - assert_eq!(num.load(Ordering::Relaxed), 1); - sys.stop(); - let _ = h.join(); -} - #[actix_rt::test] async fn test_max_concurrent_connections() { // Note: @@ -305,81 +254,6 @@ async fn test_service_restart() { let num_clone = num.clone(); let num2_clone = num2.clone(); - let h = thread::spawn(move || { - actix_rt::System::new().block_on(async { - let server = Server::build() - .backlog(1) - .disable_signals() - .configure(move |cfg| { - let num = num.clone(); - let num2 = num2.clone(); - cfg.bind("addr1", addr1) - .unwrap() - .bind("addr2", addr2) - .unwrap() - .apply(move |rt| { - let num = num.clone(); - let num2 = num2.clone(); - rt.service( - "addr1", - fn_factory(move || { - let num = num.clone(); - async move { Ok::<_, ()>(TestService(num)) } - }), - ); - rt.service( - "addr2", - fn_factory(move || { - let num2 = num2.clone(); - async move { Ok::<_, ()>(TestService(num2)) } - }), - ); - }) - }) - .unwrap() - .workers(1) - .run(); - - let _ = tx.send((server.clone(), actix_rt::System::current())); - server.await - }) - }); - - let (server, sys) = rx.recv().unwrap(); - - for _ in 0..5 { - TcpStream::connect(addr1) - .await - .unwrap() - .shutdown() - .await - .unwrap(); - TcpStream::connect(addr2) - .await - .unwrap() - .shutdown() - .await - .unwrap(); - } - - sleep(Duration::from_secs(3)).await; - - assert!(num_clone.load(Ordering::SeqCst) > 5); - assert!(num2_clone.load(Ordering::SeqCst) > 5); - - sys.stop(); - let _ = server.stop(false); - let _ = h.join().unwrap(); - - let addr1 = unused_addr(); - let addr2 = unused_addr(); - let (tx, rx) = mpsc::channel(); - let num = Arc::new(AtomicUsize::new(0)); - let num2 = Arc::new(AtomicUsize::new(0)); - - let num_clone = num.clone(); - let num2_clone = num2.clone(); - let h = thread::spawn(move || { let num = num.clone(); actix_rt::System::new().block_on(async {