From ce02d8d24101748725b043fa5de3d18b770b6d16 Mon Sep 17 00:00:00 2001 From: fakeshadow <24548779@qq.com> Date: Wed, 28 Apr 2021 20:25:04 +0800 Subject: [PATCH] remove garbage --- actix-server/src/config.rs | 289 ------------------------------------- 1 file changed, 289 deletions(-) delete mode 100644 actix-server/src/config.rs diff --git a/actix-server/src/config.rs b/actix-server/src/config.rs deleted file mode 100644 index 111daf90..00000000 --- a/actix-server/src/config.rs +++ /dev/null @@ -1,289 +0,0 @@ -use std::collections::HashMap; -use std::future::Future; -use std::{fmt, io}; - -use actix_rt::net::TcpStream; -use actix_service::{ - fn_service, IntoServiceFactory as IntoBaseServiceFactory, - ServiceFactory as BaseServiceFactory, -}; -use actix_utils::future::ready; -use futures_core::future::LocalBoxFuture; -use log::error; - -use crate::{ - builder::bind_addr, - service::{BoxedServerService, InternalServiceFactory, StreamService}, - socket::{MioStream, MioTcpListener, StdSocketAddr, StdTcpListener, ToSocketAddrs}, - worker::WorkerCounterGuard, -}; - -pub struct ServiceConfig { - pub(crate) services: Vec<(String, MioTcpListener)>, - pub(crate) apply: Option>, - pub(crate) threads: usize, - pub(crate) backlog: u32, -} - -impl ServiceConfig { - pub(super) fn new(threads: usize, backlog: u32) -> ServiceConfig { - ServiceConfig { - threads, - backlog, - services: Vec::new(), - apply: None, - } - } - - /// Set number of workers to start. - /// - /// By default server uses number of available logical cpu as workers - /// count. - pub fn workers(&mut self, num: usize) { - self.threads = num; - } - - /// Add new service to server - pub fn bind>(&mut self, name: N, addr: U) -> io::Result<&mut Self> - where - U: ToSocketAddrs, - { - let sockets = bind_addr(addr, self.backlog)?; - - for lst in sockets { - self._listen(name.as_ref(), lst); - } - - Ok(self) - } - - /// Add new service to server - pub fn listen>(&mut self, name: N, lst: StdTcpListener) -> &mut Self { - self._listen(name, MioTcpListener::from_std(lst)) - } - - /// Register service configuration function. This function get called - /// during worker runtime configuration. It get executed in worker thread. - pub fn apply(&mut self, f: F) -> io::Result<()> - where - F: Fn(&mut ServiceRuntime) + Send + Clone + 'static, - { - self.apply = Some(Box::new(f)); - Ok(()) - } - - fn _listen>(&mut self, name: N, lst: MioTcpListener) -> &mut Self { - if self.apply.is_none() { - self.apply = Some(Box::new(not_configured)); - } - self.services.push((name.as_ref().to_string(), lst)); - self - } -} - -pub(super) struct ConfiguredService { - rt: Box, - names: HashMap, - topics: HashMap, - services: Vec, -} - -impl ConfiguredService { - pub(super) fn new(rt: Box) -> Self { - ConfiguredService { - rt, - names: HashMap::new(), - topics: HashMap::new(), - services: Vec::new(), - } - } - - pub(super) fn stream(&mut self, token: usize, name: String, addr: StdSocketAddr) { - self.names.insert(token, (name.clone(), addr)); - self.topics.insert(name, token); - self.services.push(token); - } -} - -impl InternalServiceFactory for ConfiguredService { - fn name(&self, idx: usize) -> &str { - &self.names[&idx].0 - } - - fn clone_factory(&self) -> Box { - Box::new(Self { - rt: self.rt.clone(), - names: self.names.clone(), - topics: self.topics.clone(), - services: self.services.clone(), - }) - } - - fn create(&self) -> LocalBoxFuture<'static, Result, ()>> { - // configure services - let mut rt = ServiceRuntime::new(self.topics.clone()); - self.rt.configure(&mut rt); - rt.validate(); - let mut names = self.names.clone(); - let tokens = self.services.clone(); - - // construct services - Box::pin(async move { - let mut services = rt.services; - // TODO: Proper error handling here - for f in rt.onstart.into_iter() { - f.await; - } - let mut res = vec![]; - for token in tokens { - if let Some(srv) = services.remove(&token) { - let newserv = srv.new_service(()); - match newserv.await { - Ok(serv) => { - res.push((token, serv)); - } - Err(_) => { - error!("Can not construct service"); - return Err(()); - } - } - } else { - let name = names.remove(&token).unwrap().0; - res.push(( - token, - Box::new(StreamService::new(fn_service(move |_: TcpStream| { - error!("Service {:?} is not configured", name); - ready::>(Ok(())) - }))), - )); - }; - } - Ok(res) - }) - } -} - -pub(super) trait ServiceRuntimeConfiguration: Send { - fn clone(&self) -> Box; - - fn configure(&self, rt: &mut ServiceRuntime); -} - -impl ServiceRuntimeConfiguration for F -where - F: Fn(&mut ServiceRuntime) + Send + Clone + 'static, -{ - fn clone(&self) -> Box { - Box::new(self.clone()) - } - - fn configure(&self, rt: &mut ServiceRuntime) { - (self)(rt) - } -} - -fn not_configured(_: &mut ServiceRuntime) { - error!("Service is not configured"); -} - -pub struct ServiceRuntime { - names: HashMap, - services: HashMap, - onstart: Vec>, -} - -impl ServiceRuntime { - fn new(names: HashMap) -> Self { - ServiceRuntime { - names, - services: HashMap::new(), - onstart: Vec::new(), - } - } - - fn validate(&self) { - for (name, token) in &self.names { - if !self.services.contains_key(&token) { - error!("Service {:?} is not configured", name); - } - } - } - - /// Register service. - /// - /// Name of the service must be registered during configuration stage with - /// *ServiceConfig::bind()* or *ServiceConfig::listen()* methods. - pub fn service(&mut self, name: &str, service: F) - where - F: IntoBaseServiceFactory, - T: BaseServiceFactory + 'static, - T::Future: 'static, - T::Service: 'static, - T::InitError: fmt::Debug, - { - // let name = name.to_owned(); - if let Some(token) = self.names.get(name) { - self.services.insert( - *token, - Box::new(ServiceFactory { - inner: service.into_factory(), - }), - ); - } else { - panic!("Unknown service: {:?}", name); - } - } - - /// Execute future before services initialization. - pub fn on_start(&mut self, fut: F) - where - F: Future + 'static, - { - self.onstart.push(Box::pin(fut)) - } -} - -type BoxedNewService = Box< - dyn BaseServiceFactory< - (WorkerCounterGuard, MioStream), - Response = (), - Error = (), - InitError = (), - Config = (), - Service = BoxedServerService, - Future = LocalBoxFuture<'static, Result>, - >, ->; - -struct ServiceFactory { - inner: T, -} - -impl BaseServiceFactory<(WorkerCounterGuard, MioStream)> for ServiceFactory -where - T: BaseServiceFactory, - T::Future: 'static, - T::Service: 'static, - T::Error: 'static, - T::InitError: fmt::Debug + 'static, -{ - type Response = (); - type Error = (); - type Config = (); - type Service = BoxedServerService; - type InitError = (); - type Future = LocalBoxFuture<'static, Result>; - - fn new_service(&self, _: ()) -> Self::Future { - let fut = self.inner.new_service(()); - Box::pin(async move { - match fut.await { - Ok(s) => Ok(Box::new(StreamService::new(s)) as BoxedServerService), - Err(e) => { - error!("Can not construct service: {:?}", e); - Err(()) - } - } - }) - } -}