diff --git a/actix-rt/src/builder.rs b/actix-rt/src/builder.rs index 4eb65991..c1a273f4 100644 --- a/actix-rt/src/builder.rs +++ b/actix-rt/src/builder.rs @@ -130,8 +130,14 @@ impl SystemRunner { } } + pub fn spawn(&mut self, fut: F) + where + F: Future + 'static, + { + E::spawn_ref(&mut self.rt, fut); + } + /// Execute a future and wait for result. - #[inline] pub fn block_on(&mut self, fut: F) -> O where F: Future, diff --git a/actix-rt/src/lib.rs b/actix-rt/src/lib.rs index 8d92eca5..7078c7ad 100644 --- a/actix-rt/src/lib.rs +++ b/actix-rt/src/lib.rs @@ -33,7 +33,7 @@ pub fn spawn(f: F) where F: Future + 'static, { - DefaultExec::spawn(f) + DefaultExec::spawn(f); } /// Asynchronous signal handling diff --git a/actix-rt/src/runtime.rs b/actix-rt/src/runtime.rs index 91920d11..b6823376 100644 --- a/actix-rt/src/runtime.rs +++ b/actix-rt/src/runtime.rs @@ -1,13 +1,16 @@ use std::future::Future; use std::io; +use std::time::Duration; + use tokio::{runtime, task::LocalSet}; /// A trait for construct async executor and run future on it. /// /// A factory trait is necessary as `actix` and `actix-web` can run on multiple instances of /// executors. Therefore the executor would be constructed multiple times -pub trait ExecFactory: Sized + Unpin + 'static { +pub trait ExecFactory: Sized + Send + Sync + Unpin + 'static { type Executor; + type Sleep: Future + Send + Unpin + 'static; fn build() -> io::Result; @@ -64,6 +67,9 @@ pub trait ExecFactory: Sized + Unpin + 'static { /// /// *. `spawn_ref` is preferred when you can choose between it and `spawn`. fn spawn_ref + 'static>(exec: &mut Self::Executor, f: F); + + /// Get a timeout sleep future with given duration. + fn sleep(dur: Duration) -> Self::Sleep; } /// Default Single-threaded tokio executor on the current thread. @@ -78,6 +84,7 @@ pub type DefaultExecutor = (runtime::Runtime, LocalSet); impl ExecFactory for DefaultExec { type Executor = DefaultExecutor; + type Sleep = tokio::time::Delay; fn build() -> io::Result { let rt = runtime::Builder::new() @@ -95,6 +102,7 @@ impl ExecFactory for DefaultExec { rt.block_on(local.run_until(f)) } + #[inline] fn spawn + 'static>(f: F) { tokio::task::spawn_local(f); } @@ -102,4 +110,9 @@ impl ExecFactory for DefaultExec { fn spawn_ref + 'static>(exec: &mut Self::Executor, f: F) { exec.1.spawn_local(f); } + + #[inline] + fn sleep(dur: Duration) -> Self::Sleep { + tokio::time::delay_for(dur) + } } diff --git a/actix-rt/tests/integration_tests.rs b/actix-rt/tests/integration_tests.rs index 4e228d38..dcf70f25 100644 --- a/actix-rt/tests/integration_tests.rs +++ b/actix-rt/tests/integration_tests.rs @@ -74,6 +74,7 @@ struct TokioCompatExec; impl ExecFactory for TokioCompatExec { type Executor = tokio_compat::runtime::current_thread::Runtime; + type Sleep = tokio::time::Delay; fn build() -> std::io::Result { let rt = tokio_compat::runtime::current_thread::Runtime::new()?; @@ -94,6 +95,10 @@ impl ExecFactory for TokioCompatExec { fn spawn_ref + 'static>(exec: &mut Self::Executor, f: F) { exec.spawn_std(f); } + + fn sleep(dur: Duration) -> Self::Sleep { + tokio::time::delay_for(dur) + } } #[test] diff --git a/actix-server/Cargo.toml b/actix-server/Cargo.toml index 1a67f61c..189e9c54 100644 --- a/actix-server/Cargo.toml +++ b/actix-server/Cargo.toml @@ -41,4 +41,5 @@ mio-uds = { version = "0.6.7" } bytes = "0.5" env_logger = "0.7" actix-testing = "1.0.0" +async-std = { version = "1.6.5", features = ["unstable", "tokio02"] } tokio = { version = "0.2", features = ["io-util"] } diff --git a/actix-server/examples/custom_executor.rs b/actix-server/examples/custom_executor.rs new file mode 100644 index 00000000..81807274 --- /dev/null +++ b/actix-server/examples/custom_executor.rs @@ -0,0 +1,128 @@ +//! Simple composite-service TCP echo server. +//! +//! Using the following command: +//! +//! ```sh +//! nc 127.0.0.1 8080 +//! ``` +//! +//! Start typing. When you press enter the typed line will be echoed back. The server will log +//! the length of each line it echos and the total size of data sent when the connection is closed. + +use std::future::Future; +use std::pin::Pin; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Arc; +use std::time::Duration; +use std::{env, io}; + +use actix_rt::ExecFactory; +use actix_server::{FromStream, Server, StdStream}; +use actix_service::pipeline_factory; +use futures_util::future::ok; +use log::{error, info}; + +fn main() -> io::Result<()> { + actix_rt::System::new_with::("actix").block_on(async { + env::set_var("RUST_LOG", "actix=trace,basic=trace"); + env_logger::init(); + + let count = Arc::new(AtomicUsize::new(0)); + + let addr = ("127.0.0.1", 8080); + info!("starting server on port: {}", &addr.0); + + // Bind socket address and start worker(s). By default, the server uses the number of available + // logical CPU cores as the worker count. For this reason, the closure passed to bind needs + // to return a service *factory*; so it can be created once per worker. + Server::build_with::() + .bind("echo", addr, move || { + let count = Arc::clone(&count); + let num2 = Arc::clone(&count); + + pipeline_factory(move |mut stream: AsyncStdTcpStream| { + let count = Arc::clone(&count); + + async move { + let num = count.fetch_add(1, Ordering::SeqCst); + let num = num + 1; + + let mut size = 0; + let mut buf = vec![0; 1024]; + + use async_std::prelude::*; + + loop { + match stream.0.read(&mut buf).await { + // end of stream; bail from loop + Ok(0) => break, + + // more bytes to process + Ok(bytes_read) => { + info!("[{}] read {} bytes", num, bytes_read); + stream.0.write_all(&buf[size..]).await.unwrap(); + size += bytes_read; + } + + // stream error; bail from loop with error + Err(err) => { + error!("Stream Error: {:?}", err); + return Err(()); + } + } + } + + // send data down service pipeline + Ok((buf.len(), size)) + } + }) + .map_err(|err| error!("Service Error: {:?}", err)) + .and_then(move |(_, size)| { + let num = num2.load(Ordering::SeqCst); + info!("[{}] total bytes read: {}", num, size); + ok(size) + }) + })? + .workers(1) + .run() + .await + }) +} + +struct AsyncStdExec; + +struct AsyncStdTcpStream(async_std::net::TcpStream); + +impl FromStream for AsyncStdTcpStream { + fn from_stdstream(stream: StdStream) -> std::io::Result { + match stream { + StdStream::Tcp(tcp) => Ok(AsyncStdTcpStream(async_std::net::TcpStream::from(tcp))), + _ => unimplemented!(), + } + } +} + +impl ExecFactory for AsyncStdExec { + type Executor = (); + type Sleep = Pin + Send + 'static>>; + + fn build() -> std::io::Result { + Ok(()) + } + + fn block_on(_: &mut Self::Executor, f: F) -> ::Output { + async_std::task::block_on(f) + } + + fn spawn + 'static>(f: F) { + async_std::task::spawn_local(f); + } + + fn spawn_ref + 'static>(_: &mut Self::Executor, f: F) { + async_std::task::spawn_local(f); + } + + fn sleep(dur: Duration) -> Self::Sleep { + Box::pin(async_std::task::sleep(dur)) + } +} diff --git a/actix-server/src/accept.rs b/actix-server/src/accept.rs index 4dc218fd..60d6a98d 100644 --- a/actix-server/src/accept.rs +++ b/actix-server/src/accept.rs @@ -1,9 +1,9 @@ +use std::marker::PhantomData; use std::sync::mpsc as sync_mpsc; -use std::time::Duration; +use std::time::{Duration, Instant}; use std::{io, thread}; -use actix_rt::time::{delay_until, Instant}; -use actix_rt::System; +use actix_rt::{ExecFactory, System}; use log::{error, info}; use slab::Slab; @@ -81,14 +81,16 @@ impl AcceptLoop { AcceptNotify::new(self.notify_ready.clone()) } - pub(crate) fn start( + pub(crate) fn start( &mut self, socks: Vec<(Token, StdListener)>, workers: Vec, - ) { + ) where + Exec: ExecFactory, + { let srv = self.srv.take().expect("Can not re-use AcceptInfo"); - Accept::start( + Accept::::start( self.rx.take().expect("Can not re-use AcceptInfo"), self.cmd_reg.take().expect("Can not re-use AcceptInfo"), self.notify_reg.take().expect("Can not re-use AcceptInfo"), @@ -99,7 +101,7 @@ impl AcceptLoop { } } -struct Accept { +struct Accept { poll: mio::Poll, rx: sync_mpsc::Receiver, sockets: Slab, @@ -108,6 +110,7 @@ struct Accept { timer: (mio::Registration, mio::SetReadiness), next: usize, backpressure: bool, + _exec: PhantomData, } const DELTA: usize = 100; @@ -128,7 +131,10 @@ fn connection_error(e: &io::Error) -> bool { || e.kind() == io::ErrorKind::ConnectionReset } -impl Accept { +impl Accept +where + Exec: ExecFactory, +{ #![allow(clippy::too_many_arguments)] pub(crate) fn start( rx: sync_mpsc::Receiver, @@ -145,7 +151,7 @@ impl Accept { .name("actix-server accept loop".to_owned()) .spawn(move || { System::set_current(sys); - let mut accept = Accept::new(rx, socks, workers, srv); + let mut accept = Accept::::new(rx, socks, workers, srv); // Start listening for incoming commands if let Err(err) = accept.poll.register( @@ -176,7 +182,7 @@ impl Accept { socks: Vec<(Token, StdListener)>, workers: Vec, srv: Server, - ) -> Accept { + ) -> Accept { // Create a poll instance let poll = match mio::Poll::new() { Ok(poll) => poll, @@ -227,6 +233,7 @@ impl Accept { next: 0, timer: (tm, tmr), backpressure: false, + _exec: PhantomData, } } @@ -462,7 +469,7 @@ impl Accept { let r = self.timer.1.clone(); System::current().arbiter().send(Box::pin(async move { - delay_until(Instant::now() + Duration::from_millis(510)).await; + Exec::sleep(Duration::from_millis(510)).await; let _ = r.set_readiness(mio::Ready::readable()); })); return; diff --git a/actix-server/src/builder.rs b/actix-server/src/builder.rs index 8a90d598..0c4e909f 100644 --- a/actix-server/src/builder.rs +++ b/actix-server/src/builder.rs @@ -1,11 +1,11 @@ +use std::marker::PhantomData; use std::pin::Pin; use std::task::{Context, Poll}; use std::time::Duration; use std::{io, mem, net}; use actix_rt::net::TcpStream; -use actix_rt::time::{delay_until, Instant}; -use actix_rt::{spawn, System}; +use actix_rt::{DefaultExec, ExecFactory, System}; use futures_channel::mpsc::{unbounded, UnboundedReceiver}; use futures_channel::oneshot; use futures_util::future::ready; @@ -21,10 +21,10 @@ use crate::service::{InternalServiceFactory, ServiceFactory, StreamNewService}; use crate::signals::{Signal, Signals}; use crate::socket::StdListener; use crate::worker::{self, Worker, WorkerAvailability, WorkerClient}; -use crate::Token; +use crate::{FromStream, Token}; /// Server builder -pub struct ServerBuilder { +pub struct ServerBuilder { threads: usize, token: Token, backlog: i32, @@ -38,6 +38,7 @@ pub struct ServerBuilder { cmd: UnboundedReceiver, server: Server, notify: Vec>, + _exec: PhantomData, } impl Default for ServerBuilder { @@ -46,9 +47,20 @@ impl Default for ServerBuilder { } } -impl ServerBuilder { - /// Create new Server builder instance - pub fn new() -> ServerBuilder { +impl ServerBuilder +where + Exec: ExecFactory, +{ + /// Create new Server builder instance with default tokio executor. + pub fn new() -> Self { + ServerBuilder::::new_with() + } + + /// Create new Server builder instance with a generic executor. + pub fn new_with() -> ServerBuilder + where + E: ExecFactory, + { let (tx, rx) = unbounded(); let server = Server::new(tx); @@ -66,6 +78,7 @@ impl ServerBuilder { cmd: rx, notify: Vec::new(), server, + _exec: Default::default(), } } @@ -134,7 +147,7 @@ impl ServerBuilder { /// /// This function is useful for moving parts of configuration to a /// different module or even library. - pub fn configure(mut self, f: F) -> io::Result + pub fn configure(mut self, f: F) -> io::Result> where F: Fn(&mut ServiceConfig) -> io::Result<()>, { @@ -143,7 +156,7 @@ impl ServerBuilder { f(&mut cfg)?; if let Some(apply) = cfg.apply { - let mut srv = ConfiguredService::new(apply); + let mut srv = ConfiguredService::::new(apply); for (name, lst) in cfg.services { let token = self.token.next(); srv.stream(token, name.clone(), lst.local_addr()?); @@ -157,16 +170,18 @@ impl ServerBuilder { } /// Add new service to the server. - pub fn bind>(mut self, name: N, addr: U, factory: F) -> io::Result + pub fn bind(mut self, name: N, addr: U, factory: F) -> io::Result where - F: ServiceFactory, + F: ServiceFactory, U: net::ToSocketAddrs, + N: AsRef, + S: FromStream, { let sockets = bind_addr(addr, self.backlog)?; for lst in sockets { let token = self.token.next(); - self.services.push(StreamNewService::create( + self.services.push(StreamNewService::<_, _, Exec>::create( name.as_ref().to_string(), token, factory.clone(), @@ -180,11 +195,12 @@ impl ServerBuilder { #[cfg(all(unix))] /// Add new unix domain service to the server. - pub fn bind_uds(self, name: N, addr: U, factory: F) -> io::Result + pub fn bind_uds(self, name: N, addr: U, factory: F) -> io::Result where - F: ServiceFactory, - N: AsRef, + F: ServiceFactory, U: AsRef, + N: AsRef, + S: FromStream, { use std::os::unix::net::UnixListener; @@ -205,19 +221,21 @@ impl ServerBuilder { /// Add new unix domain service to the server. /// Useful when running as a systemd service and /// a socket FD can be acquired using the systemd crate. - pub fn listen_uds>( + pub fn listen_uds( mut self, name: N, lst: std::os::unix::net::UnixListener, factory: F, ) -> io::Result where - F: ServiceFactory, + F: ServiceFactory, + N: AsRef, + S: FromStream, { use std::net::{IpAddr, Ipv4Addr, SocketAddr}; let token = self.token.next(); let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080); - self.services.push(StreamNewService::create( + self.services.push(StreamNewService::<_, _, Exec>::create( name.as_ref().to_string(), token, factory, @@ -239,7 +257,7 @@ impl ServerBuilder { F: ServiceFactory, { let token = self.token.next(); - self.services.push(StreamNewService::create( + self.services.push(StreamNewService::<_, _, Exec>::create( name.as_ref().to_string(), token, factory, @@ -276,7 +294,7 @@ impl ServerBuilder { for sock in &self.sockets { info!("Starting \"{}\" service on {}", sock.1, sock.2); } - self.accept.start( + self.accept.start::( mem::take(&mut self.sockets) .into_iter() .map(|t| (t.0, t.2)) @@ -286,12 +304,12 @@ impl ServerBuilder { // handle signals if !self.no_signals { - Signals::start(self.server.clone()).unwrap(); + Signals::::start(self.server.clone()).unwrap(); } // start http server actor let server = self.server.clone(); - spawn(self); + Exec::spawn(self); server } } @@ -301,7 +319,7 @@ impl ServerBuilder { let services: Vec> = self.services.iter().map(|v| v.clone_factory()).collect(); - Worker::start(idx, services, avail, self.shutdown_timeout) + Worker::::start(idx, services, avail, self.shutdown_timeout) } fn handle_cmd(&mut self, item: ServerCommand) { @@ -360,7 +378,7 @@ impl ServerBuilder { // stop workers if !self.workers.is_empty() && graceful { - spawn( + Exec::spawn( self.workers .iter() .map(move |worker| worker.1.stop(graceful)) @@ -374,16 +392,10 @@ impl ServerBuilder { let _ = tx.send(()); } if exit { - spawn( - async { - delay_until( - Instant::now() + Duration::from_millis(300), - ) - .await; - System::current().stop(); - } - .boxed(), - ); + Exec::spawn(async { + Exec::sleep(Duration::from_millis(300)).await; + System::current().stop(); + }); } ready(()) }), @@ -391,14 +403,10 @@ impl ServerBuilder { } else { // we need to stop system if server was spawned if self.exit { - spawn( - delay_until(Instant::now() + Duration::from_millis(300)).then( - |_| { - System::current().stop(); - ready(()) - }, - ), - ); + Exec::spawn(async { + Exec::sleep(Duration::from_millis(300)).await; + System::current().stop(); + }); } if let Some(tx) = completion { let _ = tx.send(()); @@ -441,7 +449,10 @@ impl ServerBuilder { } } -impl Future for ServerBuilder { +impl Future for ServerBuilder +where + Exec: ExecFactory, +{ type Output = (); fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { diff --git a/actix-server/src/config.rs b/actix-server/src/config.rs index fda1ade9..6004290b 100644 --- a/actix-server/src/config.rs +++ b/actix-server/src/config.rs @@ -12,6 +12,8 @@ use super::service::{ BoxedServerService, InternalServiceFactory, ServerMessage, StreamService, }; use super::Token; +use actix_rt::ExecFactory; +use std::marker::PhantomData; pub struct ServiceConfig { pub(crate) services: Vec<(String, net::TcpListener)>, @@ -72,20 +74,22 @@ impl ServiceConfig { } } -pub(super) struct ConfiguredService { +pub(super) struct ConfiguredService { rt: Box, names: HashMap, topics: HashMap, services: Vec, + _exec: PhantomData, } -impl ConfiguredService { +impl ConfiguredService { pub(super) fn new(rt: Box) -> Self { ConfiguredService { rt, names: HashMap::new(), topics: HashMap::new(), services: Vec::new(), + _exec: Default::default(), } } @@ -96,7 +100,10 @@ impl ConfiguredService { } } -impl InternalServiceFactory for ConfiguredService { +impl InternalServiceFactory for ConfiguredService +where + Exec: ExecFactory, +{ fn name(&self, token: Token) -> &str { &self.names[&token].0 } @@ -107,6 +114,7 @@ impl InternalServiceFactory for ConfiguredService { names: self.names.clone(), topics: self.topics.clone(), services: self.services.clone(), + _exec: PhantomData, }) } @@ -142,7 +150,7 @@ impl InternalServiceFactory for ConfiguredService { let name = names.remove(&token).unwrap().0; res.push(( token, - Box::new(StreamService::new(actix::fn_service( + Box::new(StreamService::<_, Exec>::new(actix::fn_service( move |_: TcpStream| { error!("Service {:?} is not configured", name); ok::<_, ()>(()) @@ -207,20 +215,22 @@ impl ServiceRuntime { /// /// Name of the service must be registered during configuration stage with /// *ServiceConfig::bind()* or *ServiceConfig::listen()* methods. - pub fn service(&mut self, name: &str, service: F) + pub fn service(&mut self, name: &str, service: F) where F: actix::IntoServiceFactory, T: actix::ServiceFactory + 'static, T::Future: 'static, T::Service: 'static, T::InitError: fmt::Debug, + Exec: ExecFactory, { // let name = name.to_owned(); if let Some(token) = self.names.get(name) { self.services.insert( *token, - Box::new(ServiceFactory { + Box::new(ServiceFactory::<_, Exec> { inner: service.into_factory(), + _exec: PhantomData, }), ); } else { @@ -249,17 +259,19 @@ type BoxedNewService = Box< >, >; -struct ServiceFactory { +struct ServiceFactory { inner: T, + _exec: PhantomData, } -impl actix::ServiceFactory for ServiceFactory +impl actix::ServiceFactory for ServiceFactory where T: actix::ServiceFactory, T::Future: 'static, T::Service: 'static, T::Error: 'static, T::InitError: fmt::Debug + 'static, + Exec: ExecFactory, { type Request = (Option, ServerMessage); type Response = (); @@ -273,7 +285,7 @@ where let fut = self.inner.new_service(()); async move { match fut.await { - Ok(s) => Ok(Box::new(StreamService::new(s)) as BoxedServerService), + Ok(s) => Ok(Box::new(StreamService::<_, Exec>::new(s)) as BoxedServerService), Err(e) => { error!("Can not construct service: {:?}", e); Err(()) diff --git a/actix-server/src/lib.rs b/actix-server/src/lib.rs index 8efc29d3..01f486f6 100644 --- a/actix-server/src/lib.rs +++ b/actix-server/src/lib.rs @@ -15,6 +15,7 @@ pub use self::builder::ServerBuilder; pub use self::config::{ServiceConfig, ServiceRuntime}; pub use self::server::Server; pub use self::service::ServiceFactory; +pub use self::socket::StdStream; #[doc(hidden)] pub use self::socket::FromStream; diff --git a/actix-server/src/server.rs b/actix-server/src/server.rs index b29a9e02..54930529 100644 --- a/actix-server/src/server.rs +++ b/actix-server/src/server.rs @@ -3,6 +3,7 @@ use std::io; use std::pin::Pin; use std::task::{Context, Poll}; +use actix_rt::ExecFactory; use futures_channel::mpsc::UnboundedSender; use futures_channel::oneshot; use futures_util::FutureExt; @@ -41,6 +42,11 @@ impl Server { ServerBuilder::default() } + /// Start server building process with a custom executor + pub fn build_with() -> ServerBuilder { + ServerBuilder::::new_with() + } + pub(crate) fn signal(&self, sig: Signal) { let _ = self.0.unbounded_send(ServerCommand::Signal(sig)); } diff --git a/actix-server/src/service.rs b/actix-server/src/service.rs index 984e5228..77145e2f 100644 --- a/actix-server/src/service.rs +++ b/actix-server/src/service.rs @@ -3,7 +3,7 @@ use std::net::SocketAddr; use std::task::{Context, Poll}; use std::time::Duration; -use actix_rt::spawn; +use actix_rt::ExecFactory; use actix_service::{self as actix, Service, ServiceFactory as ActixServiceFactory}; use actix_utils::counter::CounterGuard; use futures_util::future::{err, ok, LocalBoxFuture, Ready}; @@ -48,22 +48,27 @@ pub(crate) type BoxedServerService = Box< >, >; -pub(crate) struct StreamService { +pub(crate) struct StreamService { service: T, + _exec: PhantomData, } -impl StreamService { +impl StreamService { pub(crate) fn new(service: T) -> Self { - StreamService { service } + StreamService { + service, + _exec: PhantomData, + } } } -impl Service for StreamService +impl Service for StreamService where T: Service, T::Future: 'static, T::Error: 'static, I: FromStream, + Exec: ExecFactory, { type Request = (Option, ServerMessage); type Response = (); @@ -83,7 +88,7 @@ where if let Ok(stream) = stream { let f = self.service.call(stream); - spawn(async move { + Exec::spawn(async move { let _ = f.await; drop(guard); }); @@ -97,18 +102,24 @@ where } } -pub(crate) struct StreamNewService, Io: FromStream> { +pub(crate) struct StreamNewService +where + F: ServiceFactory, + Io: FromStream, + Exec: ExecFactory, +{ name: String, inner: F, token: Token, addr: SocketAddr, - _t: PhantomData, + _t: PhantomData<(Io, Exec)>, } -impl StreamNewService +impl StreamNewService where F: ServiceFactory, Io: FromStream + Send + 'static, + Exec: ExecFactory, { pub(crate) fn create( name: String, @@ -126,10 +137,11 @@ where } } -impl InternalServiceFactory for StreamNewService +impl InternalServiceFactory for StreamNewService where F: ServiceFactory, Io: FromStream + Send + 'static, + Exec: ExecFactory, { fn name(&self, _: Token) -> &str { &self.name @@ -152,7 +164,8 @@ where .new_service(()) .map_err(|_| ()) .map_ok(move |inner| { - let service: BoxedServerService = Box::new(StreamService::new(inner)); + let service: BoxedServerService = + Box::new(StreamService::<_, Exec>::new(inner)); vec![(token, service)] }) .boxed_local() diff --git a/actix-server/src/signals.rs b/actix-server/src/signals.rs index b6339621..c531e455 100644 --- a/actix-server/src/signals.rs +++ b/actix-server/src/signals.rs @@ -6,6 +6,8 @@ use std::task::{Context, Poll}; use futures_util::future::lazy; use crate::server::Server; +use actix_rt::ExecFactory; +use std::marker::PhantomData; /// Different types of process signals #[allow(dead_code)] @@ -21,20 +23,24 @@ pub(crate) enum Signal { Quit, } -pub(crate) struct Signals { +pub(crate) struct Signals { srv: Server, #[cfg(not(unix))] stream: Pin>>>, #[cfg(unix)] streams: Vec<(Signal, actix_rt::signal::unix::Signal)>, + _exec: PhantomData, } -impl Signals { +impl Signals +where + Exec: ExecFactory, +{ pub(crate) fn start(srv: Server) -> io::Result<()> { - actix_rt::spawn(lazy(|_| { + Exec::spawn(lazy(|_| { #[cfg(not(unix))] { - actix_rt::spawn(Signals { + Exec::spawn(Signals { srv, stream: Box::pin(actix_rt::signal::ctrl_c()), }); @@ -63,7 +69,11 @@ impl Signals { } } - actix_rt::spawn(Signals { srv, streams }) + Exec::spawn(Self { + srv, + streams, + _exec: PhantomData, + }) } })); @@ -71,7 +81,10 @@ impl Signals { } } -impl Future for Signals { +impl Future for Signals +where + Exec: Unpin, +{ type Output = (); fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { diff --git a/actix-server/src/socket.rs b/actix-server/src/socket.rs index 3025660a..7155f9dd 100644 --- a/actix-server/src/socket.rs +++ b/actix-server/src/socket.rs @@ -1,6 +1,5 @@ use std::{fmt, io, net}; -use actix_codec::{AsyncRead, AsyncWrite}; use actix_rt::net::TcpStream; pub(crate) enum StdListener { @@ -143,7 +142,7 @@ impl mio::Evented for SocketListener { } } -pub trait FromStream: AsyncRead + AsyncWrite + Sized { +pub trait FromStream: Sized + Send + 'static { fn from_stdstream(sock: StdStream) -> io::Result; } diff --git a/actix-server/src/worker.rs b/actix-server/src/worker.rs index 35331757..7528ee28 100644 --- a/actix-server/src/worker.rs +++ b/actix-server/src/worker.rs @@ -1,11 +1,11 @@ +use std::marker::PhantomData; use std::pin::Pin; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::Arc; use std::task::{Context, Poll}; use std::time; -use actix_rt::time::{delay_until, Delay, Instant}; -use actix_rt::{spawn, Arbiter}; +use actix_rt::{Arbiter, ExecFactory}; use actix_utils::counter::Counter; use futures_channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender}; use futures_channel::oneshot; @@ -125,15 +125,19 @@ impl WorkerAvailability { /// /// Worker accepts Socket objects via unbounded channel and starts stream /// processing. -pub(crate) struct Worker { +pub(crate) struct Worker +where + Exec: ExecFactory, +{ rx: UnboundedReceiver, rx2: UnboundedReceiver, services: Vec, availability: WorkerAvailability, conns: Counter, factories: Vec>, - state: WorkerState, + state: WorkerState, shutdown_timeout: time::Duration, + _exec: PhantomData, } struct WorkerService { @@ -159,7 +163,10 @@ enum WorkerServiceStatus { Stopped, } -impl Worker { +impl Worker +where + Exec: ExecFactory, +{ pub(crate) fn start( idx: usize, factories: Vec>, @@ -170,10 +177,10 @@ impl Worker { let (tx2, rx2) = unbounded(); let avail = availability.clone(); - Arbiter::new().send( + Arbiter::new_with::().send( async move { availability.set(false); - let mut wrk = MAX_CONNS_COUNTER.with(move |conns| Worker { + let mut wrk: Worker = MAX_CONNS_COUNTER.with(move |conns| Worker { rx, rx2, availability, @@ -182,6 +189,7 @@ impl Worker { services: Vec::new(), conns: conns.clone(), state: WorkerState::Unavailable(Vec::new()), + _exec: PhantomData, }); let mut fut: Vec, _>> = Vec::new(); @@ -193,7 +201,7 @@ impl Worker { })); } - spawn(async move { + Exec::spawn(async move { let res = join_all(fut).await; let res: Result, _> = res.into_iter().collect(); match res { @@ -228,7 +236,7 @@ impl Worker { self.services.iter_mut().for_each(|srv| { if srv.status == WorkerServiceStatus::Available { srv.status = WorkerServiceStatus::Stopped; - actix_rt::spawn( + Exec::spawn( srv.service .call((None, ServerMessage::ForceShutdown)) .map(|_| ()), @@ -240,7 +248,7 @@ impl Worker { self.services.iter_mut().for_each(move |srv| { if srv.status == WorkerServiceStatus::Available { srv.status = WorkerServiceStatus::Stopping; - actix_rt::spawn( + Exec::spawn( srv.service .call((None, ServerMessage::Shutdown(timeout))) .map(|_| ()), @@ -297,7 +305,7 @@ impl Worker { } } -enum WorkerState { +enum WorkerState { Available, Unavailable(Vec), Restarting( @@ -306,14 +314,13 @@ enum WorkerState { #[allow(clippy::type_complexity)] Pin, ()>>>>, ), - Shutdown( - Pin>, - Pin>, - Option>, - ), + Shutdown(Exec::Sleep, Exec::Sleep, Option>), } -impl Future for Worker { +impl Future for Worker +where + Exec: ExecFactory, +{ type Output = (); // FIXME: remove this attribute @@ -335,8 +342,8 @@ impl Future for Worker { if num != 0 { info!("Graceful worker shutdown, {} connections", num); self.state = WorkerState::Shutdown( - Box::pin(delay_until(Instant::now() + time::Duration::from_secs(1))), - Box::pin(delay_until(Instant::now() + self.shutdown_timeout)), + Exec::sleep(time::Duration::from_secs(1)), + Exec::sleep(self.shutdown_timeout), Some(result), ); } else { @@ -423,31 +430,24 @@ impl Future for Worker { } // check graceful timeout - match t2.as_mut().poll(cx) { - Poll::Pending => (), - Poll::Ready(_) => { - let _ = tx.take().unwrap().send(false); - self.shutdown(true); - Arbiter::current().stop(); - return Poll::Ready(()); - } + if Pin::new(t2).poll(cx).is_ready() { + let _ = tx.take().unwrap().send(false); + self.shutdown(true); + Arbiter::current().stop(); + return Poll::Ready(()); } // sleep for 1 second and then check again - match t1.as_mut().poll(cx) { - Poll::Pending => (), - Poll::Ready(_) => { - *t1 = Box::pin(delay_until( - Instant::now() + time::Duration::from_secs(1), - )); - let _ = t1.as_mut().poll(cx); - } + if Pin::new(&mut *t1).poll(cx).is_ready() { + *t1 = Exec::sleep(time::Duration::from_secs(1)); + let _ = Pin::new(t1).poll(cx); } + Poll::Pending } WorkerState::Available => { loop { - match Pin::new(&mut self.rx).poll_next(cx) { + return match Pin::new(&mut self.rx).poll_next(cx) { // handle incoming io stream Poll::Ready(Some(WorkerCommand(msg))) => { match self.check_readiness(cx) { @@ -478,14 +478,14 @@ impl Future for Worker { ); } } - return self.poll(cx); + self.poll(cx) } Poll::Pending => { self.state = WorkerState::Available; - return Poll::Pending; + Poll::Pending } - Poll::Ready(None) => return Poll::Ready(()), - } + Poll::Ready(None) => Poll::Ready(()), + }; } } } diff --git a/actix-server/tests/test_server.rs b/actix-server/tests/test_server.rs index 838c3cf1..26a682eb 100644 --- a/actix-server/tests/test_server.rs +++ b/actix-server/tests/test_server.rs @@ -2,10 +2,12 @@ use std::sync::atomic::{AtomicUsize, Ordering::Relaxed}; use std::sync::{mpsc, Arc}; use std::{net, thread, time}; +use actix_rt::DefaultExec; use actix_server::Server; use actix_service::fn_service; use futures_util::future::{lazy, ok}; use socket2::{Domain, Protocol, Socket, Type}; +use tokio::net::TcpStream; fn unused_addr() -> net::SocketAddr { let addr: net::SocketAddr = "127.0.0.1:0".parse().unwrap(); @@ -28,7 +30,9 @@ fn test_bind() { Server::build() .workers(1) .disable_signals() - .bind("test", addr, move || fn_service(|_| ok::<_, ()>(()))) + .bind("test", addr, move || { + fn_service(|_: TcpStream| ok::<_, ()>(())) + }) .unwrap() .start() })); @@ -165,8 +169,14 @@ fn test_configure() { .listen("addr3", lst) .apply(move |rt| { let num = num.clone(); - rt.service("addr1", fn_service(|_| ok::<_, ()>(()))); - rt.service("addr3", fn_service(|_| ok::<_, ()>(()))); + rt.service::<_, _, DefaultExec>( + "addr1", + fn_service(|_| ok::<_, ()>(())), + ); + rt.service::<_, _, DefaultExec>( + "addr3", + fn_service(|_| ok::<_, ()>(())), + ); rt.on_start(lazy(move |_| { let _ = num.fetch_add(1, Relaxed); }))