From 2d667f328d988099011147bf1049c79ec07017a6 Mon Sep 17 00:00:00 2001 From: fakeshadow <24548779@qq.com> Date: Wed, 21 Oct 2020 14:29:19 +0800 Subject: [PATCH] reduce import --- actix-server/src/accept.rs | 11 +++---- actix-server/src/builder.rs | 6 ++-- actix-server/src/config.rs | 8 ++--- actix-server/src/lib.rs | 3 ++ actix-server/src/service.rs | 39 +++++++++++------------ actix-server/src/socket.rs | 4 +-- actix-server/src/waker_queue.rs | 8 ++--- actix-server/src/worker.rs | 56 +++++++++++++++++---------------- 8 files changed, 68 insertions(+), 67 deletions(-) diff --git a/actix-server/src/accept.rs b/actix-server/src/accept.rs index 93c49ead..389d4022 100644 --- a/actix-server/src/accept.rs +++ b/actix-server/src/accept.rs @@ -10,7 +10,7 @@ use slab::Slab; use crate::server::Server; use crate::socket::{MioSocketListener, SocketAddr, StdListener}; use crate::waker_queue::{WakerInterest, WakerQueue, WakerQueueError, WAKER_TOKEN}; -use crate::worker::{Conn, WorkerClient}; +use crate::worker::{Conn, WorkerHandle}; use crate::Token; struct ServerSocketInfo { @@ -56,7 +56,7 @@ impl AcceptLoop { pub(crate) fn start( &mut self, socks: Vec<(Token, StdListener)>, - workers: Vec, + workers: Vec, ) { let srv = self.srv.take().expect("Can not re-use AcceptInfo"); let poll = self.poll.take().unwrap(); @@ -70,7 +70,7 @@ impl AcceptLoop { struct Accept { poll: Poll, waker: WakerQueue, - workers: Vec, + workers: Vec, srv: Server, next: usize, backpressure: bool, @@ -97,7 +97,7 @@ impl Accept { waker: WakerQueue, socks: Vec<(Token, StdListener)>, srv: Server, - workers: Vec, + workers: Vec, ) { // Accept runs in its own thread and would want to spawn additional futures to current // actix system. @@ -117,10 +117,9 @@ impl Accept { poll: Poll, waker: WakerQueue, socks: Vec<(Token, StdListener)>, - workers: Vec, + workers: Vec, srv: Server, ) -> (Accept, Slab) { - let mut sockets = Slab::new(); for (hnd_token, lst) in socks.into_iter() { let addr = lst.local_addr(); diff --git a/actix-server/src/builder.rs b/actix-server/src/builder.rs index e5551201..dc32bc22 100644 --- a/actix-server/src/builder.rs +++ b/actix-server/src/builder.rs @@ -22,7 +22,7 @@ use crate::service::{InternalServiceFactory, ServiceFactory, StreamNewService}; use crate::signals::{Signal, Signals}; use crate::socket::StdListener; use crate::waker_queue::{WakerInterest, WakerQueue}; -use crate::worker::{self, Worker, WorkerAvailability, WorkerClient}; +use crate::worker::{self, Worker, WorkerAvailability, WorkerHandle}; use crate::Token; /// Server builder @@ -30,7 +30,7 @@ pub struct ServerBuilder { threads: usize, token: Token, backlog: i32, - workers: Vec<(usize, WorkerClient)>, + workers: Vec<(usize, WorkerHandle)>, services: Vec>, sockets: Vec<(Token, String, StdListener)>, accept: AcceptLoop, @@ -298,7 +298,7 @@ impl ServerBuilder { } } - fn start_worker(&self, idx: usize, waker: WakerQueue) -> WorkerClient { + fn start_worker(&self, idx: usize, waker: WakerQueue) -> WorkerHandle { let avail = WorkerAvailability::new(waker); let services: Vec> = self.services.iter().map(|v| v.clone_factory()).collect(); diff --git a/actix-server/src/config.rs b/actix-server/src/config.rs index e409fd7a..c17c98dd 100644 --- a/actix-server/src/config.rs +++ b/actix-server/src/config.rs @@ -1,10 +1,11 @@ use std::collections::HashMap; +use std::future::Future; use std::{fmt, io, net}; use actix_rt::net::TcpStream; use actix_service as actix; use actix_utils::counter::CounterGuard; -use futures_util::future::{ready, Future, FutureExt, LocalBoxFuture}; +use futures_util::future::{ready, FutureExt, LocalBoxFuture}; use log::error; use super::builder::bind_addr; @@ -119,7 +120,7 @@ impl InternalServiceFactory for ConfiguredService { let tokens = self.services.clone(); // construct services - async move { + Box::pin(async move { let mut services = rt.services; // TODO: Proper error handling here for f in rt.onstart.into_iter() { @@ -152,8 +153,7 @@ impl InternalServiceFactory for ConfiguredService { }; } Ok(res) - } - .boxed_local() + }) } } diff --git a/actix-server/src/lib.rs b/actix-server/src/lib.rs index ffecae8d..b15ff26e 100644 --- a/actix-server/src/lib.rs +++ b/actix-server/src/lib.rs @@ -30,6 +30,9 @@ impl Token { } } +pub(crate) type LocalBoxFuture<'a, T> = + std::pin::Pin + 'a>>; + /// Start server building process pub fn new() -> ServerBuilder { ServerBuilder::default() diff --git a/actix-server/src/service.rs b/actix-server/src/service.rs index 435f3eab..29359fb6 100644 --- a/actix-server/src/service.rs +++ b/actix-server/src/service.rs @@ -6,12 +6,12 @@ use std::time::Duration; use actix_rt::spawn; use actix_service::{self as actix, Service, ServiceFactory as ActixServiceFactory}; use actix_utils::counter::CounterGuard; -use futures_util::future::{ready, LocalBoxFuture, Ready}; -use futures_util::{FutureExt, TryFutureExt}; +use futures_util::future::{ready, Ready}; use log::error; -use super::Token; use crate::socket::{FromStream, MioStream}; +use crate::LocalBoxFuture; +use crate::Token; /// Server message pub(crate) enum ServerMessage { @@ -76,22 +76,20 @@ where fn call(&mut self, (guard, req): (Option, ServerMessage)) -> Self::Future { match req { - ServerMessage::Connect(stream) => { - let stream = FromStream::from_mio(stream).map_err(|e| { - error!("Can not convert to an async tcp stream: {}", e); - }); - - if let Ok(stream) = stream { + ServerMessage::Connect(stream) => match FromStream::from_mio(stream) { + Ok(stream) => { let f = self.service.call(stream); spawn(async move { let _ = f.await; drop(guard); }); ready(Ok(())) - } else { + } + Err(e) => { + error!("Can not convert to an async tcp stream: {}", e); ready(Err(())) } - } + }, _ => ready(Ok(())), } } @@ -147,15 +145,16 @@ where fn create(&self) -> LocalBoxFuture<'static, Result, ()>> { let token = self.token; - self.inner - .create() - .new_service(()) - .map_err(|_| ()) - .map_ok(move |inner| { - let service: BoxedServerService = Box::new(StreamService::new(inner)); - vec![(token, service)] - }) - .boxed_local() + let fut = self.inner.create().new_service(()); + Box::pin(async move { + match fut.await { + Ok(inner) => { + let service = Box::new(StreamService::new(inner)) as BoxedServerService; + Ok(vec![(token, service)]) + } + Err(_) => Err(()), + } + }) } } diff --git a/actix-server/src/socket.rs b/actix-server/src/socket.rs index e72bd185..ed1f9d31 100644 --- a/actix-server/src/socket.rs +++ b/actix-server/src/socket.rs @@ -94,9 +94,7 @@ impl StdListener { StdListener::Uds(lst) => { // ToDo: the same as above lst.set_nonblocking(true)?; - Ok(MioSocketListener::Uds(MioUnixListener::from_std( - lst, - ))) + Ok(MioSocketListener::Uds(MioUnixListener::from_std(lst))) } } } diff --git a/actix-server/src/waker_queue.rs b/actix-server/src/waker_queue.rs index 544e05fb..ab6d9a38 100644 --- a/actix-server/src/waker_queue.rs +++ b/actix-server/src/waker_queue.rs @@ -3,13 +3,13 @@ use std::sync::Arc; use concurrent_queue::{ConcurrentQueue, PopError}; use mio::{Registry, Token as MioToken, Waker}; -use crate::worker::WorkerClient; +use crate::worker::WorkerHandle; /// waker token for `mio::Poll` instance pub(crate) const WAKER_TOKEN: MioToken = MioToken(1); /// `mio::Waker` with a queue for waking up the `Accept`'s `Poll` and contains the `WakerInterest` -/// we want `Poll` to look into. +/// the `Poll` would want to look into. pub(crate) struct WakerQueue(Arc<(Waker, ConcurrentQueue)>); impl Clone for WakerQueue { @@ -62,8 +62,8 @@ pub(crate) enum WakerInterest { Timer, /// `Worker` ins an interest happen after a worker runs into faulted state(This is determined by /// if work can be sent to it successfully).`Accept` would be waked up and add the new - /// `WorkerClient` to workers. - Worker(WorkerClient), + /// `WorkerHandle`. + Worker(WorkerHandle), } pub(crate) type WakerQueueError = PopError; diff --git a/actix-server/src/worker.rs b/actix-server/src/worker.rs index e122d87d..ec88e0e5 100644 --- a/actix-server/src/worker.rs +++ b/actix-server/src/worker.rs @@ -10,13 +10,14 @@ use actix_rt::{spawn, Arbiter}; use actix_utils::counter::Counter; use futures_channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender}; use futures_channel::oneshot; -use futures_util::future::{join_all, LocalBoxFuture, MapOk}; -use futures_util::{stream::Stream, FutureExt, TryFutureExt}; +use futures_util::future::join_all; +use futures_util::{stream::Stream, TryFutureExt}; use log::{error, info, trace}; use crate::service::{BoxedServerService, InternalServiceFactory, ServerMessage}; use crate::socket::{MioStream, SocketAddr}; use crate::waker_queue::{WakerInterest, WakerQueue}; +use crate::LocalBoxFuture; use crate::Token; pub(crate) struct WorkerCommand(Conn); @@ -56,22 +57,24 @@ thread_local! { Counter::new(MAX_CONNS.load(Ordering::Relaxed)); } +// a handle to worker that can send message to worker and share the availability of worker to other +// thread. #[derive(Clone)] -pub(crate) struct WorkerClient { +pub(crate) struct WorkerHandle { pub idx: usize, tx1: UnboundedSender, tx2: UnboundedSender, avail: WorkerAvailability, } -impl WorkerClient { +impl WorkerHandle { pub fn new( idx: usize, tx1: UnboundedSender, tx2: UnboundedSender, avail: WorkerAvailability, ) -> Self { - WorkerClient { + WorkerHandle { idx, tx1, tx2, @@ -166,7 +169,7 @@ impl Worker { factories: Vec>, availability: WorkerAvailability, shutdown_timeout: Duration, - ) -> WorkerClient { + ) -> WorkerHandle { let (tx1, rx) = unbounded(); let (tx2, rx2) = unbounded(); let avail = availability.clone(); @@ -184,14 +187,16 @@ impl Worker { state: WorkerState::Unavailable(Vec::new()), }); - let mut fut: Vec, _>> = Vec::new(); - for (idx, factory) in wrk.factories.iter().enumerate() { - fut.push(factory.create().map_ok(move |r| { - r.into_iter() - .map(|(t, s): (Token, _)| (idx, t, s)) - .collect::>() - })); - } + let fut = wrk + .factories + .iter() + .enumerate() + .map(|(idx, factory)| { + factory.create().map_ok(move |r| { + r.into_iter().map(|(t, s)| (idx, t, s)).collect::>() + }) + }) + .collect::>(); spawn(async move { let res = join_all(fut).await; @@ -218,7 +223,7 @@ impl Worker { }); })); - WorkerClient::new(idx, tx1, tx2, avail) + WorkerHandle::new(idx, tx1, tx2, avail) } fn shutdown(&mut self, force: bool) { @@ -226,11 +231,10 @@ impl Worker { self.services.iter_mut().for_each(|srv| { if srv.status == WorkerServiceStatus::Available { srv.status = WorkerServiceStatus::Stopped; - spawn( - srv.service - .call((None, ServerMessage::ForceShutdown)) - .map(|_| ()), - ); + let fut = srv.service.call((None, ServerMessage::ForceShutdown)); + spawn(async { + let _ = fut.await; + }); } }); } else { @@ -238,11 +242,10 @@ impl Worker { self.services.iter_mut().for_each(move |srv| { if srv.status == WorkerServiceStatus::Available { srv.status = WorkerServiceStatus::Stopping; - spawn( - srv.service - .call((None, ServerMessage::Shutdown(timeout))) - .map(|_| ()), - ); + let fut = srv.service.call((None, ServerMessage::Shutdown(timeout))); + spawn(async { + let _ = fut.await; + }); } }); } @@ -309,7 +312,6 @@ enum WorkerState { impl Future for Worker { type Output = (); - // #[allow(clippy::never_loop)] fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { // `StopWorker` message handler if let Poll::Ready(Some(StopCommand { graceful, result })) = @@ -480,7 +482,7 @@ impl Future for Worker { Poll::Pending } Poll::Ready(None) => Poll::Ready(()), - } + }; } } }