From 81bbf725a6f3217b0ae358ff3314e957bb1fa934 Mon Sep 17 00:00:00 2001 From: fakeshadow <24548779@qq.com> Date: Fri, 30 Oct 2020 20:50:05 +0800 Subject: [PATCH] add comment for custom executor example --- actix-rt/src/arbiter.rs | 6 +++--- actix-rt/src/lib.rs | 4 ++-- actix-rt/src/runtime.rs | 8 +++----- actix-rt/src/system.rs | 8 ++++---- actix-server/Cargo.toml | 9 +++++---- actix-server/examples/custom_executor.rs | 14 ++++++++++---- actix-server/src/accept.rs | 4 ++-- actix-server/src/builder.rs | 11 ++++++----- actix-server/src/config.rs | 9 +++++---- actix-server/src/signals.rs | 10 +++++----- actix-server/src/worker.rs | 6 ++---- actix-server/tests/test_server.rs | 6 +++--- actix-tls/examples/basic.rs | 5 +++-- 13 files changed, 53 insertions(+), 47 deletions(-) diff --git a/actix-rt/src/arbiter.rs b/actix-rt/src/arbiter.rs index 0e42838a..20f147d8 100644 --- a/actix-rt/src/arbiter.rs +++ b/actix-rt/src/arbiter.rs @@ -12,7 +12,7 @@ use futures_channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender}; use futures_channel::oneshot::{channel, Canceled, Sender}; use tokio::stream::Stream; -use crate::runtime::{DefaultExec, ExecFactory}; +use crate::runtime::{ActixExec, ExecFactory}; use crate::system::System; thread_local!( @@ -79,7 +79,7 @@ impl Arbiter { #[deprecated(since = "1.2.0", note = "Please use actix_rt::spawn instead")] pub fn spawn + 'static>(f: F) { - DefaultExec::spawn(f) + ActixExec::spawn(f) } /// Returns the current thread's arbiter's address. If no Arbiter is present, then this @@ -105,7 +105,7 @@ impl Arbiter { /// Spawn new thread and run event loop in spawned thread. /// Returns address of newly created arbiter. pub fn new() -> Arbiter { - Self::new_with::() + Self::new_with::() } pub fn new_with() -> Arbiter { diff --git a/actix-rt/src/lib.rs b/actix-rt/src/lib.rs index 7078c7ad..65f0d152 100644 --- a/actix-rt/src/lib.rs +++ b/actix-rt/src/lib.rs @@ -14,7 +14,7 @@ mod system; pub use self::arbiter::Arbiter; pub use self::builder::{Builder, SystemRunner}; -pub use self::runtime::{DefaultExec, ExecFactory}; +pub use self::runtime::{ActixExec, ExecFactory}; pub use self::system::System; #[doc(hidden)] @@ -33,7 +33,7 @@ pub fn spawn(f: F) where F: Future + 'static, { - DefaultExec::spawn(f); + ActixExec::spawn(f); } /// Asynchronous signal handling diff --git a/actix-rt/src/runtime.rs b/actix-rt/src/runtime.rs index b6823376..a79ec46b 100644 --- a/actix-rt/src/runtime.rs +++ b/actix-rt/src/runtime.rs @@ -78,12 +78,10 @@ pub trait ExecFactory: Sized + Send + Sync + Unpin + 'static { /// /// [mod]: index.html #[derive(Copy, Clone, Debug)] -pub struct DefaultExec; +pub struct ActixExec; -pub type DefaultExecutor = (runtime::Runtime, LocalSet); - -impl ExecFactory for DefaultExec { - type Executor = DefaultExecutor; +impl ExecFactory for ActixExec { + type Executor = (runtime::Runtime, LocalSet); type Sleep = tokio::time::Delay; fn build() -> io::Result { diff --git a/actix-rt/src/system.rs b/actix-rt/src/system.rs index feaddc28..f3cfc1b3 100644 --- a/actix-rt/src/system.rs +++ b/actix-rt/src/system.rs @@ -7,7 +7,7 @@ use futures_channel::mpsc::UnboundedSender; use crate::arbiter::{Arbiter, SystemCommand}; use crate::builder::{Builder, SystemRunner}; use crate::runtime::ExecFactory; -use crate::DefaultExec; +use crate::ActixExec; static SYSTEM_COUNT: AtomicUsize = AtomicUsize::new(0); @@ -29,8 +29,8 @@ impl System { /// Create new system. /// /// This method panics if it can not create tokio runtime - pub fn new>(name: T) -> SystemRunner { - Self::new_with::(name) + pub fn new>(name: T) -> SystemRunner { + Self::new_with::(name) } /// This function will start tokio runtime and will finish once the @@ -40,7 +40,7 @@ impl System { where F: FnOnce() + 'static, { - Self::run_with::(f) + Self::run_with::(f) } /// Get current running system. diff --git a/actix-server/Cargo.toml b/actix-server/Cargo.toml index 189e9c54..d6463060 100644 --- a/actix-server/Cargo.toml +++ b/actix-server/Cargo.toml @@ -25,12 +25,12 @@ actix-rt = "1.1.1" actix-codec = "0.3.0" actix-utils = "2.0.0" +futures-channel = { version = "0.3.4", default-features = false } +futures-util = { version = "0.3.4", default-features = false } log = "0.4" num_cpus = "1.13" mio = "0.6.19" socket2 = "0.3" -futures-channel = { version = "0.3.4", default-features = false } -futures-util = { version = "0.3.4", default-features = false, features = ["sink"] } slab = "0.4" # unix domain sockets @@ -38,8 +38,9 @@ slab = "0.4" mio-uds = { version = "0.6.7" } [dev-dependencies] -bytes = "0.5" -env_logger = "0.7" actix-testing = "1.0.0" async-std = { version = "1.6.5", features = ["unstable", "tokio02"] } +bytes = "0.5" +env_logger = "0.7" +futures-util = { version = "0.3.4", default-features = false, features = ["sink"] } tokio = { version = "0.2", features = ["io-util"] } diff --git a/actix-server/examples/custom_executor.rs b/actix-server/examples/custom_executor.rs index 81807274..2aac5690 100644 --- a/actix-server/examples/custom_executor.rs +++ b/actix-server/examples/custom_executor.rs @@ -23,6 +23,7 @@ use futures_util::future::ok; use log::{error, info}; fn main() -> io::Result<()> { + // annotate executor type and block_on it actix_rt::System::new_with::("actix").block_on(async { env::set_var("RUST_LOG", "actix=trace,basic=trace"); env_logger::init(); @@ -32,14 +33,15 @@ fn main() -> io::Result<()> { let addr = ("127.0.0.1", 8080); info!("starting server on port: {}", &addr.0); - // Bind socket address and start worker(s). By default, the server uses the number of available - // logical CPU cores as the worker count. For this reason, the closure passed to bind needs - // to return a service *factory*; so it can be created once per worker. + // annotate again as the server would want to spawn tasks on executor. Server::build_with::() .bind("echo", addr, move || { let count = Arc::clone(&count); let num2 = Arc::clone(&count); + // stream type must impl actix_server::FromStream trait for handling the stream. + // at this point stream type can be generic if you want to passing it though + // pipeline but at last a service must give it a concrete type to work with. pipeline_factory(move |mut stream: AsyncStdTcpStream| { let count = Arc::clone(&count); @@ -89,19 +91,23 @@ fn main() -> io::Result<()> { }) } +// custom executor struct AsyncStdExec; +// stream type can work on the custom executor struct AsyncStdTcpStream(async_std::net::TcpStream); +// server would pass a StdStream enum and you have to convert it to your stream type. impl FromStream for AsyncStdTcpStream { fn from_stdstream(stream: StdStream) -> std::io::Result { match stream { StdStream::Tcp(tcp) => Ok(AsyncStdTcpStream(async_std::net::TcpStream::from(tcp))), - _ => unimplemented!(), + StdStream::Uds(_) => unimplemented!(), } } } +// impl trait for custom executor so server can/block_on spawn tasks impl ExecFactory for AsyncStdExec { type Executor = (); type Sleep = Pin + Send + 'static>>; diff --git a/actix-server/src/accept.rs b/actix-server/src/accept.rs index 60d6a98d..8f045967 100644 --- a/actix-server/src/accept.rs +++ b/actix-server/src/accept.rs @@ -151,7 +151,7 @@ where .name("actix-server accept loop".to_owned()) .spawn(move || { System::set_current(sys); - let mut accept = Accept::::new(rx, socks, workers, srv); + let mut accept = Self::new(rx, socks, workers, srv); // Start listening for incoming commands if let Err(err) = accept.poll.register( @@ -182,7 +182,7 @@ where socks: Vec<(Token, StdListener)>, workers: Vec, srv: Server, - ) -> Accept { + ) -> Self { // Create a poll instance let poll = match mio::Poll::new() { Ok(poll) => poll, diff --git a/actix-server/src/builder.rs b/actix-server/src/builder.rs index 0c4e909f..03b15b54 100644 --- a/actix-server/src/builder.rs +++ b/actix-server/src/builder.rs @@ -1,3 +1,4 @@ +use std::future::Future; use std::marker::PhantomData; use std::pin::Pin; use std::task::{Context, Poll}; @@ -5,12 +6,12 @@ use std::time::Duration; use std::{io, mem, net}; use actix_rt::net::TcpStream; -use actix_rt::{DefaultExec, ExecFactory, System}; +use actix_rt::{ActixExec, ExecFactory, System}; use futures_channel::mpsc::{unbounded, UnboundedReceiver}; use futures_channel::oneshot; use futures_util::future::ready; -use futures_util::stream::FuturesUnordered; -use futures_util::{future::Future, ready, stream::Stream, FutureExt, StreamExt}; +use futures_util::stream::{FuturesUnordered, Stream}; +use futures_util::{ready, FutureExt, StreamExt}; use log::{error, info}; use socket2::{Domain, Protocol, Socket, Type}; @@ -24,7 +25,7 @@ use crate::worker::{self, Worker, WorkerAvailability, WorkerClient}; use crate::{FromStream, Token}; /// Server builder -pub struct ServerBuilder { +pub struct ServerBuilder { threads: usize, token: Token, backlog: i32, @@ -53,7 +54,7 @@ where { /// Create new Server builder instance with default tokio executor. pub fn new() -> Self { - ServerBuilder::::new_with() + ServerBuilder::::new_with() } /// Create new Server builder instance with a generic executor. diff --git a/actix-server/src/config.rs b/actix-server/src/config.rs index 6004290b..0f3adacb 100644 --- a/actix-server/src/config.rs +++ b/actix-server/src/config.rs @@ -1,10 +1,13 @@ use std::collections::HashMap; +use std::future::Future; +use std::marker::PhantomData; use std::{fmt, io, net}; use actix_rt::net::TcpStream; +use actix_rt::ExecFactory; use actix_service as actix; use actix_utils::counter::CounterGuard; -use futures_util::future::{ok, Future, FutureExt, LocalBoxFuture}; +use futures_util::future::{ok, FutureExt, LocalBoxFuture}; use log::error; use super::builder::bind_addr; @@ -12,8 +15,6 @@ use super::service::{ BoxedServerService, InternalServiceFactory, ServerMessage, StreamService, }; use super::Token; -use actix_rt::ExecFactory; -use std::marker::PhantomData; pub struct ServiceConfig { pub(crate) services: Vec<(String, net::TcpListener)>, @@ -276,9 +277,9 @@ where type Request = (Option, ServerMessage); type Response = (); type Error = (); - type InitError = (); type Config = (); type Service = BoxedServerService; + type InitError = (); type Future = LocalBoxFuture<'static, Result>; fn new_service(&self, _: ()) -> Self::Future { diff --git a/actix-server/src/signals.rs b/actix-server/src/signals.rs index c531e455..698864e0 100644 --- a/actix-server/src/signals.rs +++ b/actix-server/src/signals.rs @@ -1,13 +1,12 @@ use std::future::Future; use std::io; +use std::marker::PhantomData; use std::pin::Pin; use std::task::{Context, Poll}; -use futures_util::future::lazy; +use actix_rt::ExecFactory; use crate::server::Server; -use actix_rt::ExecFactory; -use std::marker::PhantomData; /// Different types of process signals #[allow(dead_code)] @@ -37,12 +36,13 @@ where Exec: ExecFactory, { pub(crate) fn start(srv: Server) -> io::Result<()> { - Exec::spawn(lazy(|_| { + Exec::spawn(async { #[cfg(not(unix))] { Exec::spawn(Signals { srv, stream: Box::pin(actix_rt::signal::ctrl_c()), + _exec: PhantomData, }); } #[cfg(unix)] @@ -75,7 +75,7 @@ where _exec: PhantomData, }) } - })); + }); Ok(()) } diff --git a/actix-server/src/worker.rs b/actix-server/src/worker.rs index 7528ee28..310ab0a1 100644 --- a/actix-server/src/worker.rs +++ b/actix-server/src/worker.rs @@ -1,4 +1,4 @@ -use std::marker::PhantomData; +use std::future::Future; use std::pin::Pin; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::Arc; @@ -10,7 +10,7 @@ use actix_utils::counter::Counter; use futures_channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender}; use futures_channel::oneshot; use futures_util::future::{join_all, LocalBoxFuture, MapOk}; -use futures_util::{future::Future, stream::Stream, FutureExt, TryFutureExt}; +use futures_util::{stream::Stream, FutureExt, TryFutureExt}; use log::{error, info, trace}; use crate::accept::AcceptNotify; @@ -137,7 +137,6 @@ where factories: Vec>, state: WorkerState, shutdown_timeout: time::Duration, - _exec: PhantomData, } struct WorkerService { @@ -189,7 +188,6 @@ where services: Vec::new(), conns: conns.clone(), state: WorkerState::Unavailable(Vec::new()), - _exec: PhantomData, }); let mut fut: Vec, _>> = Vec::new(); diff --git a/actix-server/tests/test_server.rs b/actix-server/tests/test_server.rs index 26a682eb..a114d4a5 100644 --- a/actix-server/tests/test_server.rs +++ b/actix-server/tests/test_server.rs @@ -2,7 +2,7 @@ use std::sync::atomic::{AtomicUsize, Ordering::Relaxed}; use std::sync::{mpsc, Arc}; use std::{net, thread, time}; -use actix_rt::DefaultExec; +use actix_rt::ActixExec; use actix_server::Server; use actix_service::fn_service; use futures_util::future::{lazy, ok}; @@ -169,11 +169,11 @@ fn test_configure() { .listen("addr3", lst) .apply(move |rt| { let num = num.clone(); - rt.service::<_, _, DefaultExec>( + rt.service::<_, _, ActixExec>( "addr1", fn_service(|_| ok::<_, ()>(())), ); - rt.service::<_, _, DefaultExec>( + rt.service::<_, _, ActixExec>( "addr3", fn_service(|_| ok::<_, ()>(())), ); diff --git a/actix-tls/examples/basic.rs b/actix-tls/examples/basic.rs index cd706958..f7603257 100644 --- a/actix-tls/examples/basic.rs +++ b/actix-tls/examples/basic.rs @@ -25,9 +25,10 @@ use std::{ }, }; +use actix_rt::net::TcpStream; use actix_server::Server; use actix_service::pipeline_factory; -use actix_tls::rustls::Acceptor as RustlsAcceptor; +use actix_tls::rustls::{Acceptor as RustlsAcceptor, Acceptor}; use futures_util::future::ok; use log::info; use rust_tls::{ @@ -56,7 +57,7 @@ async fn main() -> io::Result<()> { .set_single_cert(cert_chain, keys.remove(0)) .unwrap(); - let tls_acceptor = RustlsAcceptor::new(tls_config); + let tls_acceptor = RustlsAcceptor::::new(tls_config); let count = Arc::new(AtomicUsize::new(0));