reduce import

This commit is contained in:
fakeshadow 2020-10-21 14:29:19 +08:00
parent 8582aaaeb6
commit 2d667f328d
8 changed files with 68 additions and 67 deletions

View File

@ -10,7 +10,7 @@ use slab::Slab;
use crate::server::Server; use crate::server::Server;
use crate::socket::{MioSocketListener, SocketAddr, StdListener}; use crate::socket::{MioSocketListener, SocketAddr, StdListener};
use crate::waker_queue::{WakerInterest, WakerQueue, WakerQueueError, WAKER_TOKEN}; use crate::waker_queue::{WakerInterest, WakerQueue, WakerQueueError, WAKER_TOKEN};
use crate::worker::{Conn, WorkerClient}; use crate::worker::{Conn, WorkerHandle};
use crate::Token; use crate::Token;
struct ServerSocketInfo { struct ServerSocketInfo {
@ -56,7 +56,7 @@ impl AcceptLoop {
pub(crate) fn start( pub(crate) fn start(
&mut self, &mut self,
socks: Vec<(Token, StdListener)>, socks: Vec<(Token, StdListener)>,
workers: Vec<WorkerClient>, workers: Vec<WorkerHandle>,
) { ) {
let srv = self.srv.take().expect("Can not re-use AcceptInfo"); let srv = self.srv.take().expect("Can not re-use AcceptInfo");
let poll = self.poll.take().unwrap(); let poll = self.poll.take().unwrap();
@ -70,7 +70,7 @@ impl AcceptLoop {
struct Accept { struct Accept {
poll: Poll, poll: Poll,
waker: WakerQueue, waker: WakerQueue,
workers: Vec<WorkerClient>, workers: Vec<WorkerHandle>,
srv: Server, srv: Server,
next: usize, next: usize,
backpressure: bool, backpressure: bool,
@ -97,7 +97,7 @@ impl Accept {
waker: WakerQueue, waker: WakerQueue,
socks: Vec<(Token, StdListener)>, socks: Vec<(Token, StdListener)>,
srv: Server, srv: Server,
workers: Vec<WorkerClient>, workers: Vec<WorkerHandle>,
) { ) {
// Accept runs in its own thread and would want to spawn additional futures to current // Accept runs in its own thread and would want to spawn additional futures to current
// actix system. // actix system.
@ -117,10 +117,9 @@ impl Accept {
poll: Poll, poll: Poll,
waker: WakerQueue, waker: WakerQueue,
socks: Vec<(Token, StdListener)>, socks: Vec<(Token, StdListener)>,
workers: Vec<WorkerClient>, workers: Vec<WorkerHandle>,
srv: Server, srv: Server,
) -> (Accept, Slab<ServerSocketInfo>) { ) -> (Accept, Slab<ServerSocketInfo>) {
let mut sockets = Slab::new(); let mut sockets = Slab::new();
for (hnd_token, lst) in socks.into_iter() { for (hnd_token, lst) in socks.into_iter() {
let addr = lst.local_addr(); let addr = lst.local_addr();

View File

@ -22,7 +22,7 @@ use crate::service::{InternalServiceFactory, ServiceFactory, StreamNewService};
use crate::signals::{Signal, Signals}; use crate::signals::{Signal, Signals};
use crate::socket::StdListener; use crate::socket::StdListener;
use crate::waker_queue::{WakerInterest, WakerQueue}; use crate::waker_queue::{WakerInterest, WakerQueue};
use crate::worker::{self, Worker, WorkerAvailability, WorkerClient}; use crate::worker::{self, Worker, WorkerAvailability, WorkerHandle};
use crate::Token; use crate::Token;
/// Server builder /// Server builder
@ -30,7 +30,7 @@ pub struct ServerBuilder {
threads: usize, threads: usize,
token: Token, token: Token,
backlog: i32, backlog: i32,
workers: Vec<(usize, WorkerClient)>, workers: Vec<(usize, WorkerHandle)>,
services: Vec<Box<dyn InternalServiceFactory>>, services: Vec<Box<dyn InternalServiceFactory>>,
sockets: Vec<(Token, String, StdListener)>, sockets: Vec<(Token, String, StdListener)>,
accept: AcceptLoop, accept: AcceptLoop,
@ -298,7 +298,7 @@ impl ServerBuilder {
} }
} }
fn start_worker(&self, idx: usize, waker: WakerQueue) -> WorkerClient { fn start_worker(&self, idx: usize, waker: WakerQueue) -> WorkerHandle {
let avail = WorkerAvailability::new(waker); let avail = WorkerAvailability::new(waker);
let services: Vec<Box<dyn InternalServiceFactory>> = let services: Vec<Box<dyn InternalServiceFactory>> =
self.services.iter().map(|v| v.clone_factory()).collect(); self.services.iter().map(|v| v.clone_factory()).collect();

View File

@ -1,10 +1,11 @@
use std::collections::HashMap; use std::collections::HashMap;
use std::future::Future;
use std::{fmt, io, net}; use std::{fmt, io, net};
use actix_rt::net::TcpStream; use actix_rt::net::TcpStream;
use actix_service as actix; use actix_service as actix;
use actix_utils::counter::CounterGuard; use actix_utils::counter::CounterGuard;
use futures_util::future::{ready, Future, FutureExt, LocalBoxFuture}; use futures_util::future::{ready, FutureExt, LocalBoxFuture};
use log::error; use log::error;
use super::builder::bind_addr; use super::builder::bind_addr;
@ -119,7 +120,7 @@ impl InternalServiceFactory for ConfiguredService {
let tokens = self.services.clone(); let tokens = self.services.clone();
// construct services // construct services
async move { Box::pin(async move {
let mut services = rt.services; let mut services = rt.services;
// TODO: Proper error handling here // TODO: Proper error handling here
for f in rt.onstart.into_iter() { for f in rt.onstart.into_iter() {
@ -152,8 +153,7 @@ impl InternalServiceFactory for ConfiguredService {
}; };
} }
Ok(res) Ok(res)
} })
.boxed_local()
} }
} }

View File

@ -30,6 +30,9 @@ impl Token {
} }
} }
pub(crate) type LocalBoxFuture<'a, T> =
std::pin::Pin<Box<dyn std::future::Future<Output = T> + 'a>>;
/// Start server building process /// Start server building process
pub fn new() -> ServerBuilder { pub fn new() -> ServerBuilder {
ServerBuilder::default() ServerBuilder::default()

View File

@ -6,12 +6,12 @@ use std::time::Duration;
use actix_rt::spawn; use actix_rt::spawn;
use actix_service::{self as actix, Service, ServiceFactory as ActixServiceFactory}; use actix_service::{self as actix, Service, ServiceFactory as ActixServiceFactory};
use actix_utils::counter::CounterGuard; use actix_utils::counter::CounterGuard;
use futures_util::future::{ready, LocalBoxFuture, Ready}; use futures_util::future::{ready, Ready};
use futures_util::{FutureExt, TryFutureExt};
use log::error; use log::error;
use super::Token;
use crate::socket::{FromStream, MioStream}; use crate::socket::{FromStream, MioStream};
use crate::LocalBoxFuture;
use crate::Token;
/// Server message /// Server message
pub(crate) enum ServerMessage { pub(crate) enum ServerMessage {
@ -76,22 +76,20 @@ where
fn call(&mut self, (guard, req): (Option<CounterGuard>, ServerMessage)) -> Self::Future { fn call(&mut self, (guard, req): (Option<CounterGuard>, ServerMessage)) -> Self::Future {
match req { match req {
ServerMessage::Connect(stream) => { ServerMessage::Connect(stream) => match FromStream::from_mio(stream) {
let stream = FromStream::from_mio(stream).map_err(|e| { Ok(stream) => {
error!("Can not convert to an async tcp stream: {}", e);
});
if let Ok(stream) = stream {
let f = self.service.call(stream); let f = self.service.call(stream);
spawn(async move { spawn(async move {
let _ = f.await; let _ = f.await;
drop(guard); drop(guard);
}); });
ready(Ok(())) ready(Ok(()))
} else { }
Err(e) => {
error!("Can not convert to an async tcp stream: {}", e);
ready(Err(())) ready(Err(()))
} }
} },
_ => ready(Ok(())), _ => ready(Ok(())),
} }
} }
@ -147,15 +145,16 @@ where
fn create(&self) -> LocalBoxFuture<'static, Result<Vec<(Token, BoxedServerService)>, ()>> { fn create(&self) -> LocalBoxFuture<'static, Result<Vec<(Token, BoxedServerService)>, ()>> {
let token = self.token; let token = self.token;
self.inner let fut = self.inner.create().new_service(());
.create() Box::pin(async move {
.new_service(()) match fut.await {
.map_err(|_| ()) Ok(inner) => {
.map_ok(move |inner| { let service = Box::new(StreamService::new(inner)) as BoxedServerService;
let service: BoxedServerService = Box::new(StreamService::new(inner)); Ok(vec![(token, service)])
vec![(token, service)] }
}) Err(_) => Err(()),
.boxed_local() }
})
} }
} }

View File

@ -94,9 +94,7 @@ impl StdListener {
StdListener::Uds(lst) => { StdListener::Uds(lst) => {
// ToDo: the same as above // ToDo: the same as above
lst.set_nonblocking(true)?; lst.set_nonblocking(true)?;
Ok(MioSocketListener::Uds(MioUnixListener::from_std( Ok(MioSocketListener::Uds(MioUnixListener::from_std(lst)))
lst,
)))
} }
} }
} }

View File

@ -3,13 +3,13 @@ use std::sync::Arc;
use concurrent_queue::{ConcurrentQueue, PopError}; use concurrent_queue::{ConcurrentQueue, PopError};
use mio::{Registry, Token as MioToken, Waker}; use mio::{Registry, Token as MioToken, Waker};
use crate::worker::WorkerClient; use crate::worker::WorkerHandle;
/// waker token for `mio::Poll` instance /// waker token for `mio::Poll` instance
pub(crate) const WAKER_TOKEN: MioToken = MioToken(1); pub(crate) const WAKER_TOKEN: MioToken = MioToken(1);
/// `mio::Waker` with a queue for waking up the `Accept`'s `Poll` and contains the `WakerInterest` /// `mio::Waker` with a queue for waking up the `Accept`'s `Poll` and contains the `WakerInterest`
/// we want `Poll` to look into. /// the `Poll` would want to look into.
pub(crate) struct WakerQueue(Arc<(Waker, ConcurrentQueue<WakerInterest>)>); pub(crate) struct WakerQueue(Arc<(Waker, ConcurrentQueue<WakerInterest>)>);
impl Clone for WakerQueue { impl Clone for WakerQueue {
@ -62,8 +62,8 @@ pub(crate) enum WakerInterest {
Timer, Timer,
/// `Worker` ins an interest happen after a worker runs into faulted state(This is determined by /// `Worker` ins an interest happen after a worker runs into faulted state(This is determined by
/// if work can be sent to it successfully).`Accept` would be waked up and add the new /// if work can be sent to it successfully).`Accept` would be waked up and add the new
/// `WorkerClient` to workers. /// `WorkerHandle`.
Worker(WorkerClient), Worker(WorkerHandle),
} }
pub(crate) type WakerQueueError = PopError; pub(crate) type WakerQueueError = PopError;

View File

@ -10,13 +10,14 @@ use actix_rt::{spawn, Arbiter};
use actix_utils::counter::Counter; use actix_utils::counter::Counter;
use futures_channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender}; use futures_channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender};
use futures_channel::oneshot; use futures_channel::oneshot;
use futures_util::future::{join_all, LocalBoxFuture, MapOk}; use futures_util::future::join_all;
use futures_util::{stream::Stream, FutureExt, TryFutureExt}; use futures_util::{stream::Stream, TryFutureExt};
use log::{error, info, trace}; use log::{error, info, trace};
use crate::service::{BoxedServerService, InternalServiceFactory, ServerMessage}; use crate::service::{BoxedServerService, InternalServiceFactory, ServerMessage};
use crate::socket::{MioStream, SocketAddr}; use crate::socket::{MioStream, SocketAddr};
use crate::waker_queue::{WakerInterest, WakerQueue}; use crate::waker_queue::{WakerInterest, WakerQueue};
use crate::LocalBoxFuture;
use crate::Token; use crate::Token;
pub(crate) struct WorkerCommand(Conn); pub(crate) struct WorkerCommand(Conn);
@ -56,22 +57,24 @@ thread_local! {
Counter::new(MAX_CONNS.load(Ordering::Relaxed)); Counter::new(MAX_CONNS.load(Ordering::Relaxed));
} }
// a handle to worker that can send message to worker and share the availability of worker to other
// thread.
#[derive(Clone)] #[derive(Clone)]
pub(crate) struct WorkerClient { pub(crate) struct WorkerHandle {
pub idx: usize, pub idx: usize,
tx1: UnboundedSender<WorkerCommand>, tx1: UnboundedSender<WorkerCommand>,
tx2: UnboundedSender<StopCommand>, tx2: UnboundedSender<StopCommand>,
avail: WorkerAvailability, avail: WorkerAvailability,
} }
impl WorkerClient { impl WorkerHandle {
pub fn new( pub fn new(
idx: usize, idx: usize,
tx1: UnboundedSender<WorkerCommand>, tx1: UnboundedSender<WorkerCommand>,
tx2: UnboundedSender<StopCommand>, tx2: UnboundedSender<StopCommand>,
avail: WorkerAvailability, avail: WorkerAvailability,
) -> Self { ) -> Self {
WorkerClient { WorkerHandle {
idx, idx,
tx1, tx1,
tx2, tx2,
@ -166,7 +169,7 @@ impl Worker {
factories: Vec<Box<dyn InternalServiceFactory>>, factories: Vec<Box<dyn InternalServiceFactory>>,
availability: WorkerAvailability, availability: WorkerAvailability,
shutdown_timeout: Duration, shutdown_timeout: Duration,
) -> WorkerClient { ) -> WorkerHandle {
let (tx1, rx) = unbounded(); let (tx1, rx) = unbounded();
let (tx2, rx2) = unbounded(); let (tx2, rx2) = unbounded();
let avail = availability.clone(); let avail = availability.clone();
@ -184,14 +187,16 @@ impl Worker {
state: WorkerState::Unavailable(Vec::new()), state: WorkerState::Unavailable(Vec::new()),
}); });
let mut fut: Vec<MapOk<LocalBoxFuture<'static, _>, _>> = Vec::new(); let fut = wrk
for (idx, factory) in wrk.factories.iter().enumerate() { .factories
fut.push(factory.create().map_ok(move |r| { .iter()
r.into_iter() .enumerate()
.map(|(t, s): (Token, _)| (idx, t, s)) .map(|(idx, factory)| {
.collect::<Vec<_>>() factory.create().map_ok(move |r| {
})); r.into_iter().map(|(t, s)| (idx, t, s)).collect::<Vec<_>>()
} })
})
.collect::<Vec<_>>();
spawn(async move { spawn(async move {
let res = join_all(fut).await; let res = join_all(fut).await;
@ -218,7 +223,7 @@ impl Worker {
}); });
})); }));
WorkerClient::new(idx, tx1, tx2, avail) WorkerHandle::new(idx, tx1, tx2, avail)
} }
fn shutdown(&mut self, force: bool) { fn shutdown(&mut self, force: bool) {
@ -226,11 +231,10 @@ impl Worker {
self.services.iter_mut().for_each(|srv| { self.services.iter_mut().for_each(|srv| {
if srv.status == WorkerServiceStatus::Available { if srv.status == WorkerServiceStatus::Available {
srv.status = WorkerServiceStatus::Stopped; srv.status = WorkerServiceStatus::Stopped;
spawn( let fut = srv.service.call((None, ServerMessage::ForceShutdown));
srv.service spawn(async {
.call((None, ServerMessage::ForceShutdown)) let _ = fut.await;
.map(|_| ()), });
);
} }
}); });
} else { } else {
@ -238,11 +242,10 @@ impl Worker {
self.services.iter_mut().for_each(move |srv| { self.services.iter_mut().for_each(move |srv| {
if srv.status == WorkerServiceStatus::Available { if srv.status == WorkerServiceStatus::Available {
srv.status = WorkerServiceStatus::Stopping; srv.status = WorkerServiceStatus::Stopping;
spawn( let fut = srv.service.call((None, ServerMessage::Shutdown(timeout)));
srv.service spawn(async {
.call((None, ServerMessage::Shutdown(timeout))) let _ = fut.await;
.map(|_| ()), });
);
} }
}); });
} }
@ -309,7 +312,6 @@ enum WorkerState {
impl Future for Worker { impl Future for Worker {
type Output = (); type Output = ();
// #[allow(clippy::never_loop)]
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// `StopWorker` message handler // `StopWorker` message handler
if let Poll::Ready(Some(StopCommand { graceful, result })) = if let Poll::Ready(Some(StopCommand { graceful, result })) =
@ -480,7 +482,7 @@ impl Future for Worker {
Poll::Pending Poll::Pending
} }
Poll::Ready(None) => Poll::Ready(()), Poll::Ready(None) => Poll::Ready(()),
} };
} }
} }
} }