apply generic executor to actix-server

This commit is contained in:
fakeshadow 2020-10-30 06:17:02 +08:00
parent 9a7a626f7b
commit bb4b0d1a63
16 changed files with 353 additions and 128 deletions

View File

@ -130,8 +130,14 @@ impl<E: ExecFactory> SystemRunner<E> {
} }
} }
pub fn spawn<F>(&mut self, fut: F)
where
F: Future<Output = ()> + 'static,
{
E::spawn_ref(&mut self.rt, fut);
}
/// Execute a future and wait for result. /// Execute a future and wait for result.
#[inline]
pub fn block_on<F, O>(&mut self, fut: F) -> O pub fn block_on<F, O>(&mut self, fut: F) -> O
where where
F: Future<Output = O>, F: Future<Output = O>,

View File

@ -33,7 +33,7 @@ pub fn spawn<F>(f: F)
where where
F: Future<Output = ()> + 'static, F: Future<Output = ()> + 'static,
{ {
DefaultExec::spawn(f) DefaultExec::spawn(f);
} }
/// Asynchronous signal handling /// Asynchronous signal handling

View File

@ -1,13 +1,16 @@
use std::future::Future; use std::future::Future;
use std::io; use std::io;
use std::time::Duration;
use tokio::{runtime, task::LocalSet}; use tokio::{runtime, task::LocalSet};
/// A trait for construct async executor and run future on it. /// A trait for construct async executor and run future on it.
/// ///
/// A factory trait is necessary as `actix` and `actix-web` can run on multiple instances of /// A factory trait is necessary as `actix` and `actix-web` can run on multiple instances of
/// executors. Therefore the executor would be constructed multiple times /// executors. Therefore the executor would be constructed multiple times
pub trait ExecFactory: Sized + Unpin + 'static { pub trait ExecFactory: Sized + Send + Sync + Unpin + 'static {
type Executor; type Executor;
type Sleep: Future<Output = ()> + Send + Unpin + 'static;
fn build() -> io::Result<Self::Executor>; fn build() -> io::Result<Self::Executor>;
@ -64,6 +67,9 @@ pub trait ExecFactory: Sized + Unpin + 'static {
/// ///
/// *. `spawn_ref` is preferred when you can choose between it and `spawn`. /// *. `spawn_ref` is preferred when you can choose between it and `spawn`.
fn spawn_ref<F: Future<Output = ()> + 'static>(exec: &mut Self::Executor, f: F); fn spawn_ref<F: Future<Output = ()> + 'static>(exec: &mut Self::Executor, f: F);
/// Get a timeout sleep future with given duration.
fn sleep(dur: Duration) -> Self::Sleep;
} }
/// Default Single-threaded tokio executor on the current thread. /// Default Single-threaded tokio executor on the current thread.
@ -78,6 +84,7 @@ pub type DefaultExecutor = (runtime::Runtime, LocalSet);
impl ExecFactory for DefaultExec { impl ExecFactory for DefaultExec {
type Executor = DefaultExecutor; type Executor = DefaultExecutor;
type Sleep = tokio::time::Delay;
fn build() -> io::Result<Self::Executor> { fn build() -> io::Result<Self::Executor> {
let rt = runtime::Builder::new() let rt = runtime::Builder::new()
@ -95,6 +102,7 @@ impl ExecFactory for DefaultExec {
rt.block_on(local.run_until(f)) rt.block_on(local.run_until(f))
} }
#[inline]
fn spawn<F: Future<Output = ()> + 'static>(f: F) { fn spawn<F: Future<Output = ()> + 'static>(f: F) {
tokio::task::spawn_local(f); tokio::task::spawn_local(f);
} }
@ -102,4 +110,9 @@ impl ExecFactory for DefaultExec {
fn spawn_ref<F: Future<Output = ()> + 'static>(exec: &mut Self::Executor, f: F) { fn spawn_ref<F: Future<Output = ()> + 'static>(exec: &mut Self::Executor, f: F) {
exec.1.spawn_local(f); exec.1.spawn_local(f);
} }
#[inline]
fn sleep(dur: Duration) -> Self::Sleep {
tokio::time::delay_for(dur)
}
} }

View File

@ -74,6 +74,7 @@ struct TokioCompatExec;
impl ExecFactory for TokioCompatExec { impl ExecFactory for TokioCompatExec {
type Executor = tokio_compat::runtime::current_thread::Runtime; type Executor = tokio_compat::runtime::current_thread::Runtime;
type Sleep = tokio::time::Delay;
fn build() -> std::io::Result<Self::Executor> { fn build() -> std::io::Result<Self::Executor> {
let rt = tokio_compat::runtime::current_thread::Runtime::new()?; let rt = tokio_compat::runtime::current_thread::Runtime::new()?;
@ -94,6 +95,10 @@ impl ExecFactory for TokioCompatExec {
fn spawn_ref<F: Future<Output = ()> + 'static>(exec: &mut Self::Executor, f: F) { fn spawn_ref<F: Future<Output = ()> + 'static>(exec: &mut Self::Executor, f: F) {
exec.spawn_std(f); exec.spawn_std(f);
} }
fn sleep(dur: Duration) -> Self::Sleep {
tokio::time::delay_for(dur)
}
} }
#[test] #[test]

View File

@ -41,4 +41,5 @@ mio-uds = { version = "0.6.7" }
bytes = "0.5" bytes = "0.5"
env_logger = "0.7" env_logger = "0.7"
actix-testing = "1.0.0" actix-testing = "1.0.0"
async-std = { version = "1.6.5", features = ["unstable", "tokio02"] }
tokio = { version = "0.2", features = ["io-util"] } tokio = { version = "0.2", features = ["io-util"] }

View File

@ -0,0 +1,128 @@
//! Simple composite-service TCP echo server.
//!
//! Using the following command:
//!
//! ```sh
//! nc 127.0.0.1 8080
//! ```
//!
//! Start typing. When you press enter the typed line will be echoed back. The server will log
//! the length of each line it echos and the total size of data sent when the connection is closed.
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;
use std::{env, io};
use actix_rt::ExecFactory;
use actix_server::{FromStream, Server, StdStream};
use actix_service::pipeline_factory;
use futures_util::future::ok;
use log::{error, info};
fn main() -> io::Result<()> {
actix_rt::System::new_with::<AsyncStdExec, _>("actix").block_on(async {
env::set_var("RUST_LOG", "actix=trace,basic=trace");
env_logger::init();
let count = Arc::new(AtomicUsize::new(0));
let addr = ("127.0.0.1", 8080);
info!("starting server on port: {}", &addr.0);
// Bind socket address and start worker(s). By default, the server uses the number of available
// logical CPU cores as the worker count. For this reason, the closure passed to bind needs
// to return a service *factory*; so it can be created once per worker.
Server::build_with::<AsyncStdExec>()
.bind("echo", addr, move || {
let count = Arc::clone(&count);
let num2 = Arc::clone(&count);
pipeline_factory(move |mut stream: AsyncStdTcpStream| {
let count = Arc::clone(&count);
async move {
let num = count.fetch_add(1, Ordering::SeqCst);
let num = num + 1;
let mut size = 0;
let mut buf = vec![0; 1024];
use async_std::prelude::*;
loop {
match stream.0.read(&mut buf).await {
// end of stream; bail from loop
Ok(0) => break,
// more bytes to process
Ok(bytes_read) => {
info!("[{}] read {} bytes", num, bytes_read);
stream.0.write_all(&buf[size..]).await.unwrap();
size += bytes_read;
}
// stream error; bail from loop with error
Err(err) => {
error!("Stream Error: {:?}", err);
return Err(());
}
}
}
// send data down service pipeline
Ok((buf.len(), size))
}
})
.map_err(|err| error!("Service Error: {:?}", err))
.and_then(move |(_, size)| {
let num = num2.load(Ordering::SeqCst);
info!("[{}] total bytes read: {}", num, size);
ok(size)
})
})?
.workers(1)
.run()
.await
})
}
struct AsyncStdExec;
struct AsyncStdTcpStream(async_std::net::TcpStream);
impl FromStream for AsyncStdTcpStream {
fn from_stdstream(stream: StdStream) -> std::io::Result<Self> {
match stream {
StdStream::Tcp(tcp) => Ok(AsyncStdTcpStream(async_std::net::TcpStream::from(tcp))),
_ => unimplemented!(),
}
}
}
impl ExecFactory for AsyncStdExec {
type Executor = ();
type Sleep = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;
fn build() -> std::io::Result<Self::Executor> {
Ok(())
}
fn block_on<F: Future>(_: &mut Self::Executor, f: F) -> <F as Future>::Output {
async_std::task::block_on(f)
}
fn spawn<F: Future<Output = ()> + 'static>(f: F) {
async_std::task::spawn_local(f);
}
fn spawn_ref<F: Future<Output = ()> + 'static>(_: &mut Self::Executor, f: F) {
async_std::task::spawn_local(f);
}
fn sleep(dur: Duration) -> Self::Sleep {
Box::pin(async_std::task::sleep(dur))
}
}

View File

@ -1,9 +1,9 @@
use std::marker::PhantomData;
use std::sync::mpsc as sync_mpsc; use std::sync::mpsc as sync_mpsc;
use std::time::Duration; use std::time::{Duration, Instant};
use std::{io, thread}; use std::{io, thread};
use actix_rt::time::{delay_until, Instant}; use actix_rt::{ExecFactory, System};
use actix_rt::System;
use log::{error, info}; use log::{error, info};
use slab::Slab; use slab::Slab;
@ -81,14 +81,16 @@ impl AcceptLoop {
AcceptNotify::new(self.notify_ready.clone()) AcceptNotify::new(self.notify_ready.clone())
} }
pub(crate) fn start( pub(crate) fn start<Exec>(
&mut self, &mut self,
socks: Vec<(Token, StdListener)>, socks: Vec<(Token, StdListener)>,
workers: Vec<WorkerClient>, workers: Vec<WorkerClient>,
) { ) where
Exec: ExecFactory,
{
let srv = self.srv.take().expect("Can not re-use AcceptInfo"); let srv = self.srv.take().expect("Can not re-use AcceptInfo");
Accept::start( Accept::<Exec>::start(
self.rx.take().expect("Can not re-use AcceptInfo"), self.rx.take().expect("Can not re-use AcceptInfo"),
self.cmd_reg.take().expect("Can not re-use AcceptInfo"), self.cmd_reg.take().expect("Can not re-use AcceptInfo"),
self.notify_reg.take().expect("Can not re-use AcceptInfo"), self.notify_reg.take().expect("Can not re-use AcceptInfo"),
@ -99,7 +101,7 @@ impl AcceptLoop {
} }
} }
struct Accept { struct Accept<Exec> {
poll: mio::Poll, poll: mio::Poll,
rx: sync_mpsc::Receiver<Command>, rx: sync_mpsc::Receiver<Command>,
sockets: Slab<ServerSocketInfo>, sockets: Slab<ServerSocketInfo>,
@ -108,6 +110,7 @@ struct Accept {
timer: (mio::Registration, mio::SetReadiness), timer: (mio::Registration, mio::SetReadiness),
next: usize, next: usize,
backpressure: bool, backpressure: bool,
_exec: PhantomData<Exec>,
} }
const DELTA: usize = 100; const DELTA: usize = 100;
@ -128,7 +131,10 @@ fn connection_error(e: &io::Error) -> bool {
|| e.kind() == io::ErrorKind::ConnectionReset || e.kind() == io::ErrorKind::ConnectionReset
} }
impl Accept { impl<Exec> Accept<Exec>
where
Exec: ExecFactory,
{
#![allow(clippy::too_many_arguments)] #![allow(clippy::too_many_arguments)]
pub(crate) fn start( pub(crate) fn start(
rx: sync_mpsc::Receiver<Command>, rx: sync_mpsc::Receiver<Command>,
@ -145,7 +151,7 @@ impl Accept {
.name("actix-server accept loop".to_owned()) .name("actix-server accept loop".to_owned())
.spawn(move || { .spawn(move || {
System::set_current(sys); System::set_current(sys);
let mut accept = Accept::new(rx, socks, workers, srv); let mut accept = Accept::<Exec>::new(rx, socks, workers, srv);
// Start listening for incoming commands // Start listening for incoming commands
if let Err(err) = accept.poll.register( if let Err(err) = accept.poll.register(
@ -176,7 +182,7 @@ impl Accept {
socks: Vec<(Token, StdListener)>, socks: Vec<(Token, StdListener)>,
workers: Vec<WorkerClient>, workers: Vec<WorkerClient>,
srv: Server, srv: Server,
) -> Accept { ) -> Accept<Exec> {
// Create a poll instance // Create a poll instance
let poll = match mio::Poll::new() { let poll = match mio::Poll::new() {
Ok(poll) => poll, Ok(poll) => poll,
@ -227,6 +233,7 @@ impl Accept {
next: 0, next: 0,
timer: (tm, tmr), timer: (tm, tmr),
backpressure: false, backpressure: false,
_exec: PhantomData,
} }
} }
@ -462,7 +469,7 @@ impl Accept {
let r = self.timer.1.clone(); let r = self.timer.1.clone();
System::current().arbiter().send(Box::pin(async move { System::current().arbiter().send(Box::pin(async move {
delay_until(Instant::now() + Duration::from_millis(510)).await; Exec::sleep(Duration::from_millis(510)).await;
let _ = r.set_readiness(mio::Ready::readable()); let _ = r.set_readiness(mio::Ready::readable());
})); }));
return; return;

View File

@ -1,11 +1,11 @@
use std::marker::PhantomData;
use std::pin::Pin; use std::pin::Pin;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use std::time::Duration; use std::time::Duration;
use std::{io, mem, net}; use std::{io, mem, net};
use actix_rt::net::TcpStream; use actix_rt::net::TcpStream;
use actix_rt::time::{delay_until, Instant}; use actix_rt::{DefaultExec, ExecFactory, System};
use actix_rt::{spawn, System};
use futures_channel::mpsc::{unbounded, UnboundedReceiver}; use futures_channel::mpsc::{unbounded, UnboundedReceiver};
use futures_channel::oneshot; use futures_channel::oneshot;
use futures_util::future::ready; use futures_util::future::ready;
@ -21,10 +21,10 @@ use crate::service::{InternalServiceFactory, ServiceFactory, StreamNewService};
use crate::signals::{Signal, Signals}; use crate::signals::{Signal, Signals};
use crate::socket::StdListener; use crate::socket::StdListener;
use crate::worker::{self, Worker, WorkerAvailability, WorkerClient}; use crate::worker::{self, Worker, WorkerAvailability, WorkerClient};
use crate::Token; use crate::{FromStream, Token};
/// Server builder /// Server builder
pub struct ServerBuilder { pub struct ServerBuilder<Exec = DefaultExec> {
threads: usize, threads: usize,
token: Token, token: Token,
backlog: i32, backlog: i32,
@ -38,6 +38,7 @@ pub struct ServerBuilder {
cmd: UnboundedReceiver<ServerCommand>, cmd: UnboundedReceiver<ServerCommand>,
server: Server, server: Server,
notify: Vec<oneshot::Sender<()>>, notify: Vec<oneshot::Sender<()>>,
_exec: PhantomData<Exec>,
} }
impl Default for ServerBuilder { impl Default for ServerBuilder {
@ -46,9 +47,20 @@ impl Default for ServerBuilder {
} }
} }
impl ServerBuilder { impl<Exec> ServerBuilder<Exec>
/// Create new Server builder instance where
pub fn new() -> ServerBuilder { Exec: ExecFactory,
{
/// Create new Server builder instance with default tokio executor.
pub fn new() -> Self {
ServerBuilder::<DefaultExec>::new_with()
}
/// Create new Server builder instance with a generic executor.
pub fn new_with<E>() -> ServerBuilder<E>
where
E: ExecFactory,
{
let (tx, rx) = unbounded(); let (tx, rx) = unbounded();
let server = Server::new(tx); let server = Server::new(tx);
@ -66,6 +78,7 @@ impl ServerBuilder {
cmd: rx, cmd: rx,
notify: Vec::new(), notify: Vec::new(),
server, server,
_exec: Default::default(),
} }
} }
@ -134,7 +147,7 @@ impl ServerBuilder {
/// ///
/// This function is useful for moving parts of configuration to a /// This function is useful for moving parts of configuration to a
/// different module or even library. /// different module or even library.
pub fn configure<F>(mut self, f: F) -> io::Result<ServerBuilder> pub fn configure<F>(mut self, f: F) -> io::Result<ServerBuilder<Exec>>
where where
F: Fn(&mut ServiceConfig) -> io::Result<()>, F: Fn(&mut ServiceConfig) -> io::Result<()>,
{ {
@ -143,7 +156,7 @@ impl ServerBuilder {
f(&mut cfg)?; f(&mut cfg)?;
if let Some(apply) = cfg.apply { if let Some(apply) = cfg.apply {
let mut srv = ConfiguredService::new(apply); let mut srv = ConfiguredService::<Exec>::new(apply);
for (name, lst) in cfg.services { for (name, lst) in cfg.services {
let token = self.token.next(); let token = self.token.next();
srv.stream(token, name.clone(), lst.local_addr()?); srv.stream(token, name.clone(), lst.local_addr()?);
@ -157,16 +170,18 @@ impl ServerBuilder {
} }
/// Add new service to the server. /// Add new service to the server.
pub fn bind<F, U, N: AsRef<str>>(mut self, name: N, addr: U, factory: F) -> io::Result<Self> pub fn bind<F, U, N, S>(mut self, name: N, addr: U, factory: F) -> io::Result<Self>
where where
F: ServiceFactory<TcpStream>, F: ServiceFactory<S>,
U: net::ToSocketAddrs, U: net::ToSocketAddrs,
N: AsRef<str>,
S: FromStream,
{ {
let sockets = bind_addr(addr, self.backlog)?; let sockets = bind_addr(addr, self.backlog)?;
for lst in sockets { for lst in sockets {
let token = self.token.next(); let token = self.token.next();
self.services.push(StreamNewService::create( self.services.push(StreamNewService::<_, _, Exec>::create(
name.as_ref().to_string(), name.as_ref().to_string(),
token, token,
factory.clone(), factory.clone(),
@ -180,11 +195,12 @@ impl ServerBuilder {
#[cfg(all(unix))] #[cfg(all(unix))]
/// Add new unix domain service to the server. /// Add new unix domain service to the server.
pub fn bind_uds<F, U, N>(self, name: N, addr: U, factory: F) -> io::Result<Self> pub fn bind_uds<F, U, N, S>(self, name: N, addr: U, factory: F) -> io::Result<Self>
where where
F: ServiceFactory<actix_rt::net::UnixStream>, F: ServiceFactory<S>,
N: AsRef<str>,
U: AsRef<std::path::Path>, U: AsRef<std::path::Path>,
N: AsRef<str>,
S: FromStream,
{ {
use std::os::unix::net::UnixListener; use std::os::unix::net::UnixListener;
@ -205,19 +221,21 @@ impl ServerBuilder {
/// Add new unix domain service to the server. /// Add new unix domain service to the server.
/// Useful when running as a systemd service and /// Useful when running as a systemd service and
/// a socket FD can be acquired using the systemd crate. /// a socket FD can be acquired using the systemd crate.
pub fn listen_uds<F, N: AsRef<str>>( pub fn listen_uds<F, N, S>(
mut self, mut self,
name: N, name: N,
lst: std::os::unix::net::UnixListener, lst: std::os::unix::net::UnixListener,
factory: F, factory: F,
) -> io::Result<Self> ) -> io::Result<Self>
where where
F: ServiceFactory<actix_rt::net::UnixStream>, F: ServiceFactory<S>,
N: AsRef<str>,
S: FromStream,
{ {
use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::net::{IpAddr, Ipv4Addr, SocketAddr};
let token = self.token.next(); let token = self.token.next();
let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080); let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
self.services.push(StreamNewService::create( self.services.push(StreamNewService::<_, _, Exec>::create(
name.as_ref().to_string(), name.as_ref().to_string(),
token, token,
factory, factory,
@ -239,7 +257,7 @@ impl ServerBuilder {
F: ServiceFactory<TcpStream>, F: ServiceFactory<TcpStream>,
{ {
let token = self.token.next(); let token = self.token.next();
self.services.push(StreamNewService::create( self.services.push(StreamNewService::<_, _, Exec>::create(
name.as_ref().to_string(), name.as_ref().to_string(),
token, token,
factory, factory,
@ -276,7 +294,7 @@ impl ServerBuilder {
for sock in &self.sockets { for sock in &self.sockets {
info!("Starting \"{}\" service on {}", sock.1, sock.2); info!("Starting \"{}\" service on {}", sock.1, sock.2);
} }
self.accept.start( self.accept.start::<Exec>(
mem::take(&mut self.sockets) mem::take(&mut self.sockets)
.into_iter() .into_iter()
.map(|t| (t.0, t.2)) .map(|t| (t.0, t.2))
@ -286,12 +304,12 @@ impl ServerBuilder {
// handle signals // handle signals
if !self.no_signals { if !self.no_signals {
Signals::start(self.server.clone()).unwrap(); Signals::<Exec>::start(self.server.clone()).unwrap();
} }
// start http server actor // start http server actor
let server = self.server.clone(); let server = self.server.clone();
spawn(self); Exec::spawn(self);
server server
} }
} }
@ -301,7 +319,7 @@ impl ServerBuilder {
let services: Vec<Box<dyn InternalServiceFactory>> = let services: Vec<Box<dyn InternalServiceFactory>> =
self.services.iter().map(|v| v.clone_factory()).collect(); self.services.iter().map(|v| v.clone_factory()).collect();
Worker::start(idx, services, avail, self.shutdown_timeout) Worker::<Exec>::start(idx, services, avail, self.shutdown_timeout)
} }
fn handle_cmd(&mut self, item: ServerCommand) { fn handle_cmd(&mut self, item: ServerCommand) {
@ -360,7 +378,7 @@ impl ServerBuilder {
// stop workers // stop workers
if !self.workers.is_empty() && graceful { if !self.workers.is_empty() && graceful {
spawn( Exec::spawn(
self.workers self.workers
.iter() .iter()
.map(move |worker| worker.1.stop(graceful)) .map(move |worker| worker.1.stop(graceful))
@ -374,16 +392,10 @@ impl ServerBuilder {
let _ = tx.send(()); let _ = tx.send(());
} }
if exit { if exit {
spawn( Exec::spawn(async {
async { Exec::sleep(Duration::from_millis(300)).await;
delay_until(
Instant::now() + Duration::from_millis(300),
)
.await;
System::current().stop(); System::current().stop();
} });
.boxed(),
);
} }
ready(()) ready(())
}), }),
@ -391,14 +403,10 @@ impl ServerBuilder {
} else { } else {
// we need to stop system if server was spawned // we need to stop system if server was spawned
if self.exit { if self.exit {
spawn( Exec::spawn(async {
delay_until(Instant::now() + Duration::from_millis(300)).then( Exec::sleep(Duration::from_millis(300)).await;
|_| {
System::current().stop(); System::current().stop();
ready(()) });
},
),
);
} }
if let Some(tx) = completion { if let Some(tx) = completion {
let _ = tx.send(()); let _ = tx.send(());
@ -441,7 +449,10 @@ impl ServerBuilder {
} }
} }
impl Future for ServerBuilder { impl<Exec> Future for ServerBuilder<Exec>
where
Exec: ExecFactory,
{
type Output = (); type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {

View File

@ -12,6 +12,8 @@ use super::service::{
BoxedServerService, InternalServiceFactory, ServerMessage, StreamService, BoxedServerService, InternalServiceFactory, ServerMessage, StreamService,
}; };
use super::Token; use super::Token;
use actix_rt::ExecFactory;
use std::marker::PhantomData;
pub struct ServiceConfig { pub struct ServiceConfig {
pub(crate) services: Vec<(String, net::TcpListener)>, pub(crate) services: Vec<(String, net::TcpListener)>,
@ -72,20 +74,22 @@ impl ServiceConfig {
} }
} }
pub(super) struct ConfiguredService { pub(super) struct ConfiguredService<Exec> {
rt: Box<dyn ServiceRuntimeConfiguration>, rt: Box<dyn ServiceRuntimeConfiguration>,
names: HashMap<Token, (String, net::SocketAddr)>, names: HashMap<Token, (String, net::SocketAddr)>,
topics: HashMap<String, Token>, topics: HashMap<String, Token>,
services: Vec<Token>, services: Vec<Token>,
_exec: PhantomData<Exec>,
} }
impl ConfiguredService { impl<Exec> ConfiguredService<Exec> {
pub(super) fn new(rt: Box<dyn ServiceRuntimeConfiguration>) -> Self { pub(super) fn new(rt: Box<dyn ServiceRuntimeConfiguration>) -> Self {
ConfiguredService { ConfiguredService {
rt, rt,
names: HashMap::new(), names: HashMap::new(),
topics: HashMap::new(), topics: HashMap::new(),
services: Vec::new(), services: Vec::new(),
_exec: Default::default(),
} }
} }
@ -96,7 +100,10 @@ impl ConfiguredService {
} }
} }
impl InternalServiceFactory for ConfiguredService { impl<Exec> InternalServiceFactory for ConfiguredService<Exec>
where
Exec: ExecFactory,
{
fn name(&self, token: Token) -> &str { fn name(&self, token: Token) -> &str {
&self.names[&token].0 &self.names[&token].0
} }
@ -107,6 +114,7 @@ impl InternalServiceFactory for ConfiguredService {
names: self.names.clone(), names: self.names.clone(),
topics: self.topics.clone(), topics: self.topics.clone(),
services: self.services.clone(), services: self.services.clone(),
_exec: PhantomData,
}) })
} }
@ -142,7 +150,7 @@ impl InternalServiceFactory for ConfiguredService {
let name = names.remove(&token).unwrap().0; let name = names.remove(&token).unwrap().0;
res.push(( res.push((
token, token,
Box::new(StreamService::new(actix::fn_service( Box::new(StreamService::<_, Exec>::new(actix::fn_service(
move |_: TcpStream| { move |_: TcpStream| {
error!("Service {:?} is not configured", name); error!("Service {:?} is not configured", name);
ok::<_, ()>(()) ok::<_, ()>(())
@ -207,20 +215,22 @@ impl ServiceRuntime {
/// ///
/// Name of the service must be registered during configuration stage with /// Name of the service must be registered during configuration stage with
/// *ServiceConfig::bind()* or *ServiceConfig::listen()* methods. /// *ServiceConfig::bind()* or *ServiceConfig::listen()* methods.
pub fn service<T, F>(&mut self, name: &str, service: F) pub fn service<T, F, Exec>(&mut self, name: &str, service: F)
where where
F: actix::IntoServiceFactory<T>, F: actix::IntoServiceFactory<T>,
T: actix::ServiceFactory<Config = (), Request = TcpStream> + 'static, T: actix::ServiceFactory<Config = (), Request = TcpStream> + 'static,
T::Future: 'static, T::Future: 'static,
T::Service: 'static, T::Service: 'static,
T::InitError: fmt::Debug, T::InitError: fmt::Debug,
Exec: ExecFactory,
{ {
// let name = name.to_owned(); // let name = name.to_owned();
if let Some(token) = self.names.get(name) { if let Some(token) = self.names.get(name) {
self.services.insert( self.services.insert(
*token, *token,
Box::new(ServiceFactory { Box::new(ServiceFactory::<_, Exec> {
inner: service.into_factory(), inner: service.into_factory(),
_exec: PhantomData,
}), }),
); );
} else { } else {
@ -249,17 +259,19 @@ type BoxedNewService = Box<
>, >,
>; >;
struct ServiceFactory<T> { struct ServiceFactory<T, Exec> {
inner: T, inner: T,
_exec: PhantomData<Exec>,
} }
impl<T> actix::ServiceFactory for ServiceFactory<T> impl<T, Exec> actix::ServiceFactory for ServiceFactory<T, Exec>
where where
T: actix::ServiceFactory<Config = (), Request = TcpStream>, T: actix::ServiceFactory<Config = (), Request = TcpStream>,
T::Future: 'static, T::Future: 'static,
T::Service: 'static, T::Service: 'static,
T::Error: 'static, T::Error: 'static,
T::InitError: fmt::Debug + 'static, T::InitError: fmt::Debug + 'static,
Exec: ExecFactory,
{ {
type Request = (Option<CounterGuard>, ServerMessage); type Request = (Option<CounterGuard>, ServerMessage);
type Response = (); type Response = ();
@ -273,7 +285,7 @@ where
let fut = self.inner.new_service(()); let fut = self.inner.new_service(());
async move { async move {
match fut.await { match fut.await {
Ok(s) => Ok(Box::new(StreamService::new(s)) as BoxedServerService), Ok(s) => Ok(Box::new(StreamService::<_, Exec>::new(s)) as BoxedServerService),
Err(e) => { Err(e) => {
error!("Can not construct service: {:?}", e); error!("Can not construct service: {:?}", e);
Err(()) Err(())

View File

@ -15,6 +15,7 @@ pub use self::builder::ServerBuilder;
pub use self::config::{ServiceConfig, ServiceRuntime}; pub use self::config::{ServiceConfig, ServiceRuntime};
pub use self::server::Server; pub use self::server::Server;
pub use self::service::ServiceFactory; pub use self::service::ServiceFactory;
pub use self::socket::StdStream;
#[doc(hidden)] #[doc(hidden)]
pub use self::socket::FromStream; pub use self::socket::FromStream;

View File

@ -3,6 +3,7 @@ use std::io;
use std::pin::Pin; use std::pin::Pin;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use actix_rt::ExecFactory;
use futures_channel::mpsc::UnboundedSender; use futures_channel::mpsc::UnboundedSender;
use futures_channel::oneshot; use futures_channel::oneshot;
use futures_util::FutureExt; use futures_util::FutureExt;
@ -41,6 +42,11 @@ impl Server {
ServerBuilder::default() ServerBuilder::default()
} }
/// Start server building process with a custom executor
pub fn build_with<Exec: ExecFactory>() -> ServerBuilder<Exec> {
ServerBuilder::<Exec>::new_with()
}
pub(crate) fn signal(&self, sig: Signal) { pub(crate) fn signal(&self, sig: Signal) {
let _ = self.0.unbounded_send(ServerCommand::Signal(sig)); let _ = self.0.unbounded_send(ServerCommand::Signal(sig));
} }

View File

@ -3,7 +3,7 @@ use std::net::SocketAddr;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use std::time::Duration; use std::time::Duration;
use actix_rt::spawn; use actix_rt::ExecFactory;
use actix_service::{self as actix, Service, ServiceFactory as ActixServiceFactory}; use actix_service::{self as actix, Service, ServiceFactory as ActixServiceFactory};
use actix_utils::counter::CounterGuard; use actix_utils::counter::CounterGuard;
use futures_util::future::{err, ok, LocalBoxFuture, Ready}; use futures_util::future::{err, ok, LocalBoxFuture, Ready};
@ -48,22 +48,27 @@ pub(crate) type BoxedServerService = Box<
>, >,
>; >;
pub(crate) struct StreamService<T> { pub(crate) struct StreamService<T, Exec> {
service: T, service: T,
_exec: PhantomData<Exec>,
} }
impl<T> StreamService<T> { impl<T, Exec> StreamService<T, Exec> {
pub(crate) fn new(service: T) -> Self { pub(crate) fn new(service: T) -> Self {
StreamService { service } StreamService {
service,
_exec: PhantomData,
}
} }
} }
impl<T, I> Service for StreamService<T> impl<T, I, Exec> Service for StreamService<T, Exec>
where where
T: Service<Request = I>, T: Service<Request = I>,
T::Future: 'static, T::Future: 'static,
T::Error: 'static, T::Error: 'static,
I: FromStream, I: FromStream,
Exec: ExecFactory,
{ {
type Request = (Option<CounterGuard>, ServerMessage); type Request = (Option<CounterGuard>, ServerMessage);
type Response = (); type Response = ();
@ -83,7 +88,7 @@ where
if let Ok(stream) = stream { if let Ok(stream) = stream {
let f = self.service.call(stream); let f = self.service.call(stream);
spawn(async move { Exec::spawn(async move {
let _ = f.await; let _ = f.await;
drop(guard); drop(guard);
}); });
@ -97,18 +102,24 @@ where
} }
} }
pub(crate) struct StreamNewService<F: ServiceFactory<Io>, Io: FromStream> { pub(crate) struct StreamNewService<F, Io, Exec>
where
F: ServiceFactory<Io>,
Io: FromStream,
Exec: ExecFactory,
{
name: String, name: String,
inner: F, inner: F,
token: Token, token: Token,
addr: SocketAddr, addr: SocketAddr,
_t: PhantomData<Io>, _t: PhantomData<(Io, Exec)>,
} }
impl<F, Io> StreamNewService<F, Io> impl<F, Io, Exec> StreamNewService<F, Io, Exec>
where where
F: ServiceFactory<Io>, F: ServiceFactory<Io>,
Io: FromStream + Send + 'static, Io: FromStream + Send + 'static,
Exec: ExecFactory,
{ {
pub(crate) fn create( pub(crate) fn create(
name: String, name: String,
@ -126,10 +137,11 @@ where
} }
} }
impl<F, Io> InternalServiceFactory for StreamNewService<F, Io> impl<F, Io, Exec> InternalServiceFactory for StreamNewService<F, Io, Exec>
where where
F: ServiceFactory<Io>, F: ServiceFactory<Io>,
Io: FromStream + Send + 'static, Io: FromStream + Send + 'static,
Exec: ExecFactory,
{ {
fn name(&self, _: Token) -> &str { fn name(&self, _: Token) -> &str {
&self.name &self.name
@ -152,7 +164,8 @@ where
.new_service(()) .new_service(())
.map_err(|_| ()) .map_err(|_| ())
.map_ok(move |inner| { .map_ok(move |inner| {
let service: BoxedServerService = Box::new(StreamService::new(inner)); let service: BoxedServerService =
Box::new(StreamService::<_, Exec>::new(inner));
vec![(token, service)] vec![(token, service)]
}) })
.boxed_local() .boxed_local()

View File

@ -6,6 +6,8 @@ use std::task::{Context, Poll};
use futures_util::future::lazy; use futures_util::future::lazy;
use crate::server::Server; use crate::server::Server;
use actix_rt::ExecFactory;
use std::marker::PhantomData;
/// Different types of process signals /// Different types of process signals
#[allow(dead_code)] #[allow(dead_code)]
@ -21,20 +23,24 @@ pub(crate) enum Signal {
Quit, Quit,
} }
pub(crate) struct Signals { pub(crate) struct Signals<Exec> {
srv: Server, srv: Server,
#[cfg(not(unix))] #[cfg(not(unix))]
stream: Pin<Box<dyn Future<Output = io::Result<()>>>>, stream: Pin<Box<dyn Future<Output = io::Result<()>>>>,
#[cfg(unix)] #[cfg(unix)]
streams: Vec<(Signal, actix_rt::signal::unix::Signal)>, streams: Vec<(Signal, actix_rt::signal::unix::Signal)>,
_exec: PhantomData<Exec>,
} }
impl Signals { impl<Exec> Signals<Exec>
where
Exec: ExecFactory,
{
pub(crate) fn start(srv: Server) -> io::Result<()> { pub(crate) fn start(srv: Server) -> io::Result<()> {
actix_rt::spawn(lazy(|_| { Exec::spawn(lazy(|_| {
#[cfg(not(unix))] #[cfg(not(unix))]
{ {
actix_rt::spawn(Signals { Exec::spawn(Signals {
srv, srv,
stream: Box::pin(actix_rt::signal::ctrl_c()), stream: Box::pin(actix_rt::signal::ctrl_c()),
}); });
@ -63,7 +69,11 @@ impl Signals {
} }
} }
actix_rt::spawn(Signals { srv, streams }) Exec::spawn(Self {
srv,
streams,
_exec: PhantomData,
})
} }
})); }));
@ -71,7 +81,10 @@ impl Signals {
} }
} }
impl Future for Signals { impl<Exec> Future for Signals<Exec>
where
Exec: Unpin,
{
type Output = (); type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {

View File

@ -1,6 +1,5 @@
use std::{fmt, io, net}; use std::{fmt, io, net};
use actix_codec::{AsyncRead, AsyncWrite};
use actix_rt::net::TcpStream; use actix_rt::net::TcpStream;
pub(crate) enum StdListener { pub(crate) enum StdListener {
@ -143,7 +142,7 @@ impl mio::Evented for SocketListener {
} }
} }
pub trait FromStream: AsyncRead + AsyncWrite + Sized { pub trait FromStream: Sized + Send + 'static {
fn from_stdstream(sock: StdStream) -> io::Result<Self>; fn from_stdstream(sock: StdStream) -> io::Result<Self>;
} }

View File

@ -1,11 +1,11 @@
use std::marker::PhantomData;
use std::pin::Pin; use std::pin::Pin;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc; use std::sync::Arc;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use std::time; use std::time;
use actix_rt::time::{delay_until, Delay, Instant}; use actix_rt::{Arbiter, ExecFactory};
use actix_rt::{spawn, Arbiter};
use actix_utils::counter::Counter; use actix_utils::counter::Counter;
use futures_channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender}; use futures_channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender};
use futures_channel::oneshot; use futures_channel::oneshot;
@ -125,15 +125,19 @@ impl WorkerAvailability {
/// ///
/// Worker accepts Socket objects via unbounded channel and starts stream /// Worker accepts Socket objects via unbounded channel and starts stream
/// processing. /// processing.
pub(crate) struct Worker { pub(crate) struct Worker<Exec>
where
Exec: ExecFactory,
{
rx: UnboundedReceiver<WorkerCommand>, rx: UnboundedReceiver<WorkerCommand>,
rx2: UnboundedReceiver<StopCommand>, rx2: UnboundedReceiver<StopCommand>,
services: Vec<WorkerService>, services: Vec<WorkerService>,
availability: WorkerAvailability, availability: WorkerAvailability,
conns: Counter, conns: Counter,
factories: Vec<Box<dyn InternalServiceFactory>>, factories: Vec<Box<dyn InternalServiceFactory>>,
state: WorkerState, state: WorkerState<Exec>,
shutdown_timeout: time::Duration, shutdown_timeout: time::Duration,
_exec: PhantomData<Exec>,
} }
struct WorkerService { struct WorkerService {
@ -159,7 +163,10 @@ enum WorkerServiceStatus {
Stopped, Stopped,
} }
impl Worker { impl<Exec> Worker<Exec>
where
Exec: ExecFactory,
{
pub(crate) fn start( pub(crate) fn start(
idx: usize, idx: usize,
factories: Vec<Box<dyn InternalServiceFactory>>, factories: Vec<Box<dyn InternalServiceFactory>>,
@ -170,10 +177,10 @@ impl Worker {
let (tx2, rx2) = unbounded(); let (tx2, rx2) = unbounded();
let avail = availability.clone(); let avail = availability.clone();
Arbiter::new().send( Arbiter::new_with::<Exec>().send(
async move { async move {
availability.set(false); availability.set(false);
let mut wrk = MAX_CONNS_COUNTER.with(move |conns| Worker { let mut wrk: Worker<Exec> = MAX_CONNS_COUNTER.with(move |conns| Worker {
rx, rx,
rx2, rx2,
availability, availability,
@ -182,6 +189,7 @@ impl Worker {
services: Vec::new(), services: Vec::new(),
conns: conns.clone(), conns: conns.clone(),
state: WorkerState::Unavailable(Vec::new()), state: WorkerState::Unavailable(Vec::new()),
_exec: PhantomData,
}); });
let mut fut: Vec<MapOk<LocalBoxFuture<'static, _>, _>> = Vec::new(); let mut fut: Vec<MapOk<LocalBoxFuture<'static, _>, _>> = Vec::new();
@ -193,7 +201,7 @@ impl Worker {
})); }));
} }
spawn(async move { Exec::spawn(async move {
let res = join_all(fut).await; let res = join_all(fut).await;
let res: Result<Vec<_>, _> = res.into_iter().collect(); let res: Result<Vec<_>, _> = res.into_iter().collect();
match res { match res {
@ -228,7 +236,7 @@ impl Worker {
self.services.iter_mut().for_each(|srv| { self.services.iter_mut().for_each(|srv| {
if srv.status == WorkerServiceStatus::Available { if srv.status == WorkerServiceStatus::Available {
srv.status = WorkerServiceStatus::Stopped; srv.status = WorkerServiceStatus::Stopped;
actix_rt::spawn( Exec::spawn(
srv.service srv.service
.call((None, ServerMessage::ForceShutdown)) .call((None, ServerMessage::ForceShutdown))
.map(|_| ()), .map(|_| ()),
@ -240,7 +248,7 @@ impl Worker {
self.services.iter_mut().for_each(move |srv| { self.services.iter_mut().for_each(move |srv| {
if srv.status == WorkerServiceStatus::Available { if srv.status == WorkerServiceStatus::Available {
srv.status = WorkerServiceStatus::Stopping; srv.status = WorkerServiceStatus::Stopping;
actix_rt::spawn( Exec::spawn(
srv.service srv.service
.call((None, ServerMessage::Shutdown(timeout))) .call((None, ServerMessage::Shutdown(timeout)))
.map(|_| ()), .map(|_| ()),
@ -297,7 +305,7 @@ impl Worker {
} }
} }
enum WorkerState { enum WorkerState<Exec: ExecFactory> {
Available, Available,
Unavailable(Vec<Conn>), Unavailable(Vec<Conn>),
Restarting( Restarting(
@ -306,14 +314,13 @@ enum WorkerState {
#[allow(clippy::type_complexity)] #[allow(clippy::type_complexity)]
Pin<Box<dyn Future<Output = Result<Vec<(Token, BoxedServerService)>, ()>>>>, Pin<Box<dyn Future<Output = Result<Vec<(Token, BoxedServerService)>, ()>>>>,
), ),
Shutdown( Shutdown(Exec::Sleep, Exec::Sleep, Option<oneshot::Sender<bool>>),
Pin<Box<Delay>>,
Pin<Box<Delay>>,
Option<oneshot::Sender<bool>>,
),
} }
impl Future for Worker { impl<Exec> Future for Worker<Exec>
where
Exec: ExecFactory,
{
type Output = (); type Output = ();
// FIXME: remove this attribute // FIXME: remove this attribute
@ -335,8 +342,8 @@ impl Future for Worker {
if num != 0 { if num != 0 {
info!("Graceful worker shutdown, {} connections", num); info!("Graceful worker shutdown, {} connections", num);
self.state = WorkerState::Shutdown( self.state = WorkerState::Shutdown(
Box::pin(delay_until(Instant::now() + time::Duration::from_secs(1))), Exec::sleep(time::Duration::from_secs(1)),
Box::pin(delay_until(Instant::now() + self.shutdown_timeout)), Exec::sleep(self.shutdown_timeout),
Some(result), Some(result),
); );
} else { } else {
@ -423,31 +430,24 @@ impl Future for Worker {
} }
// check graceful timeout // check graceful timeout
match t2.as_mut().poll(cx) { if Pin::new(t2).poll(cx).is_ready() {
Poll::Pending => (),
Poll::Ready(_) => {
let _ = tx.take().unwrap().send(false); let _ = tx.take().unwrap().send(false);
self.shutdown(true); self.shutdown(true);
Arbiter::current().stop(); Arbiter::current().stop();
return Poll::Ready(()); return Poll::Ready(());
} }
}
// sleep for 1 second and then check again // sleep for 1 second and then check again
match t1.as_mut().poll(cx) { if Pin::new(&mut *t1).poll(cx).is_ready() {
Poll::Pending => (), *t1 = Exec::sleep(time::Duration::from_secs(1));
Poll::Ready(_) => { let _ = Pin::new(t1).poll(cx);
*t1 = Box::pin(delay_until(
Instant::now() + time::Duration::from_secs(1),
));
let _ = t1.as_mut().poll(cx);
}
} }
Poll::Pending Poll::Pending
} }
WorkerState::Available => { WorkerState::Available => {
loop { loop {
match Pin::new(&mut self.rx).poll_next(cx) { return match Pin::new(&mut self.rx).poll_next(cx) {
// handle incoming io stream // handle incoming io stream
Poll::Ready(Some(WorkerCommand(msg))) => { Poll::Ready(Some(WorkerCommand(msg))) => {
match self.check_readiness(cx) { match self.check_readiness(cx) {
@ -478,14 +478,14 @@ impl Future for Worker {
); );
} }
} }
return self.poll(cx); self.poll(cx)
} }
Poll::Pending => { Poll::Pending => {
self.state = WorkerState::Available; self.state = WorkerState::Available;
return Poll::Pending; Poll::Pending
}
Poll::Ready(None) => return Poll::Ready(()),
} }
Poll::Ready(None) => Poll::Ready(()),
};
} }
} }
} }

View File

@ -2,10 +2,12 @@ use std::sync::atomic::{AtomicUsize, Ordering::Relaxed};
use std::sync::{mpsc, Arc}; use std::sync::{mpsc, Arc};
use std::{net, thread, time}; use std::{net, thread, time};
use actix_rt::DefaultExec;
use actix_server::Server; use actix_server::Server;
use actix_service::fn_service; use actix_service::fn_service;
use futures_util::future::{lazy, ok}; use futures_util::future::{lazy, ok};
use socket2::{Domain, Protocol, Socket, Type}; use socket2::{Domain, Protocol, Socket, Type};
use tokio::net::TcpStream;
fn unused_addr() -> net::SocketAddr { fn unused_addr() -> net::SocketAddr {
let addr: net::SocketAddr = "127.0.0.1:0".parse().unwrap(); let addr: net::SocketAddr = "127.0.0.1:0".parse().unwrap();
@ -28,7 +30,9 @@ fn test_bind() {
Server::build() Server::build()
.workers(1) .workers(1)
.disable_signals() .disable_signals()
.bind("test", addr, move || fn_service(|_| ok::<_, ()>(()))) .bind("test", addr, move || {
fn_service(|_: TcpStream| ok::<_, ()>(()))
})
.unwrap() .unwrap()
.start() .start()
})); }));
@ -165,8 +169,14 @@ fn test_configure() {
.listen("addr3", lst) .listen("addr3", lst)
.apply(move |rt| { .apply(move |rt| {
let num = num.clone(); let num = num.clone();
rt.service("addr1", fn_service(|_| ok::<_, ()>(()))); rt.service::<_, _, DefaultExec>(
rt.service("addr3", fn_service(|_| ok::<_, ()>(()))); "addr1",
fn_service(|_| ok::<_, ()>(())),
);
rt.service::<_, _, DefaultExec>(
"addr3",
fn_service(|_| ok::<_, ()>(())),
);
rt.on_start(lazy(move |_| { rt.on_start(lazy(move |_| {
let _ = num.fetch_add(1, Relaxed); let _ = num.fetch_add(1, Relaxed);
})) }))