This commit is contained in:
Rob Ede 2021-11-01 03:21:11 +00:00 committed by GitHub
commit fadc4e1748
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
23 changed files with 529 additions and 266 deletions

View File

@ -1,8 +1,14 @@
# Changes # Changes
## Unreleased - 2021-xx-xx ## Unreleased - 2021-xx-xx
* Rename `Server` to `ServerHandle`. [#403]
* Rename `ServerBuilder::{maxconn => max_concurrent_connections}`. [#403]
* Remove wrapper `service::ServiceFactory` trait. [#403]
* `Server::bind` and related methods now take a regular `ServiceFactory` (from actix-service crate). [#403]
* Minimum supported Rust version (MSRV) is now 1.52. * Minimum supported Rust version (MSRV) is now 1.52.
[#403]: https://github.com/actix/actix-net/pull/403
## 2.0.0-beta.6 - 2021-10-11 ## 2.0.0-beta.6 - 2021-10-11
* Add experimental (semver-exempt) `io-uring` feature for enabling async file I/O on linux. [#374] * Add experimental (semver-exempt) `io-uring` feature for enabling async file I/O on linux. [#374]

View File

@ -38,4 +38,4 @@ actix-rt = "2.0.0"
bytes = "1" bytes = "1"
env_logger = "0.9" env_logger = "0.9"
futures-util = { version = "0.3.7", default-features = false, features = ["sink"] } futures-util = { version = "0.3.7", default-features = false, features = ["sink"] }
tokio = { version = "1.5.1", features = ["io-util"] } tokio = { version = "1.5.1", features = ["io-util", "rt-multi-thread", "macros"] }

View File

@ -0,0 +1,33 @@
use std::io;
use actix_rt::net::TcpStream;
use actix_server::Server;
use actix_service::{fn_factory, fn_service};
use log::info;
#[actix_rt::main]
async fn main() -> io::Result<()> {
env_logger::Builder::from_env(env_logger::Env::new().default_filter_or("trace,mio=info"))
.init();
let addr = ("127.0.0.1", 8080);
info!("starting server on port: {}", &addr.0);
Server::build()
.bind(
"startup-fail",
addr,
fn_factory(|| async move {
if 1 > 2 {
Ok(fn_service(move |mut _stream: TcpStream| async move {
Ok::<u32, u32>(0)
}))
} else {
Err(42)
}
}),
)?
.workers(2)
.run()
.await
}

View File

@ -19,9 +19,8 @@ use std::{
use actix_rt::net::TcpStream; use actix_rt::net::TcpStream;
use actix_server::Server; use actix_server::Server;
use actix_service::{fn_service, ServiceFactoryExt as _}; use actix_service::{fn_factory, fn_service, ServiceExt as _};
use bytes::BytesMut; use bytes::BytesMut;
use futures_util::future::ok;
use log::{error, info}; use log::{error, info};
use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::io::{AsyncReadExt, AsyncWriteExt};
@ -39,17 +38,26 @@ async fn main() -> io::Result<()> {
// logical CPU cores as the worker count. For this reason, the closure passed to bind needs // logical CPU cores as the worker count. For this reason, the closure passed to bind needs
// to return a service *factory*; so it can be created once per worker. // to return a service *factory*; so it can be created once per worker.
Server::build() Server::build()
.bind("echo", addr, move || { .bind("echo", addr, {
let count = Arc::clone(&count); fn_factory::<_, (), _, _, _, _>(move || {
let num2 = Arc::clone(&count);
fn_service(move |mut stream: TcpStream| {
let count = Arc::clone(&count); let count = Arc::clone(&count);
async move { async move {
let num = count.fetch_add(1, Ordering::SeqCst); let count = Arc::clone(&count);
let num = num + 1; let count2 = Arc::clone(&count);
let svc = fn_service(move |mut stream: TcpStream| {
let count = Arc::clone(&count);
let num = count.fetch_add(1, Ordering::SeqCst) + 1;
info!(
"[{}] accepting connection from: {}",
num,
stream.peer_addr().unwrap()
);
async move {
let mut size = 0; let mut size = 0;
let mut buf = BytesMut::new(); let mut buf = BytesMut::new();
@ -79,12 +87,16 @@ async fn main() -> io::Result<()> {
}) })
.map_err(|err| error!("Service Error: {:?}", err)) .map_err(|err| error!("Service Error: {:?}", err))
.and_then(move |(_, size)| { .and_then(move |(_, size)| {
let num = num2.load(Ordering::SeqCst); let num = count2.load(Ordering::SeqCst);
info!("[{}] total bytes read: {}", num, size); info!("[{}] total bytes read: {}", num, size);
ok(size) async move { Ok(size) }
});
Ok::<_, ()>(svc.clone())
}
}) })
})? })?
.workers(1) .workers(2)
.run() .run()
.await .await
} }

View File

@ -8,7 +8,7 @@ use actix_rt::{
use log::{debug, error, info}; use log::{debug, error, info};
use mio::{Interest, Poll, Token as MioToken}; use mio::{Interest, Poll, Token as MioToken};
use crate::server::Server; use crate::server::ServerHandle;
use crate::socket::MioListener; use crate::socket::MioListener;
use crate::waker_queue::{WakerInterest, WakerQueue, WAKER_TOKEN}; use crate::waker_queue::{WakerInterest, WakerQueue, WAKER_TOKEN};
use crate::worker::{Conn, WorkerHandleAccept}; use crate::worker::{Conn, WorkerHandleAccept};
@ -30,13 +30,13 @@ struct ServerSocketInfo {
/// ///
/// It would also listen to `ServerCommand` and push interests to `WakerQueue`. /// It would also listen to `ServerCommand` and push interests to `WakerQueue`.
pub(crate) struct AcceptLoop { pub(crate) struct AcceptLoop {
srv: Option<Server>, srv: Option<ServerHandle>,
poll: Option<Poll>, poll: Option<Poll>,
waker: WakerQueue, waker: WakerQueue,
} }
impl AcceptLoop { impl AcceptLoop {
pub fn new(srv: Server) -> Self { pub fn new(srv: ServerHandle) -> Self {
let poll = Poll::new().unwrap_or_else(|e| panic!("Can not create `mio::Poll`: {}", e)); let poll = Poll::new().unwrap_or_else(|e| panic!("Can not create `mio::Poll`: {}", e));
let waker = WakerQueue::new(poll.registry()) let waker = WakerQueue::new(poll.registry())
.unwrap_or_else(|e| panic!("Can not create `mio::Waker`: {}", e)); .unwrap_or_else(|e| panic!("Can not create `mio::Waker`: {}", e));
@ -74,7 +74,7 @@ struct Accept {
poll: Poll, poll: Poll,
waker: WakerQueue, waker: WakerQueue,
handles: Vec<WorkerHandleAccept>, handles: Vec<WorkerHandleAccept>,
srv: Server, srv: ServerHandle,
next: usize, next: usize,
avail: Availability, avail: Availability,
paused: bool, paused: bool,
@ -153,7 +153,7 @@ impl Accept {
poll: Poll, poll: Poll,
waker: WakerQueue, waker: WakerQueue,
socks: Vec<(usize, MioListener)>, socks: Vec<(usize, MioListener)>,
srv: Server, srv: ServerHandle,
handles: Vec<WorkerHandleAccept>, handles: Vec<WorkerHandleAccept>,
) { ) {
// Accept runs in its own thread and would want to spawn additional futures to current // Accept runs in its own thread and would want to spawn additional futures to current
@ -176,7 +176,7 @@ impl Accept {
waker: WakerQueue, waker: WakerQueue,
socks: Vec<(usize, MioListener)>, socks: Vec<(usize, MioListener)>,
handles: Vec<WorkerHandleAccept>, handles: Vec<WorkerHandleAccept>,
srv: Server, srv: ServerHandle,
) -> (Accept, Vec<ServerSocketInfo>) { ) -> (Accept, Vec<ServerSocketInfo>) {
let sockets = socks let sockets = socks
.into_iter() .into_iter()

View File

@ -1,4 +1,5 @@
use std::{ use std::{
fmt,
future::Future, future::Future,
io, mem, io, mem,
pin::Pin, pin::Pin,
@ -7,21 +8,25 @@ use std::{
}; };
use actix_rt::{self as rt, net::TcpStream, time::sleep, System}; use actix_rt::{self as rt, net::TcpStream, time::sleep, System};
use actix_service::ServiceFactory;
use log::{error, info}; use log::{error, info};
use tokio::sync::{ use tokio::sync::{
mpsc::{unbounded_channel, UnboundedReceiver}, mpsc::{unbounded_channel, UnboundedReceiver},
oneshot, oneshot,
}; };
use crate::accept::AcceptLoop; use crate::{
use crate::join_all; accept::AcceptLoop,
use crate::server::{Server, ServerCommand}; join_all,
use crate::service::{InternalServiceFactory, ServiceFactory, StreamNewService}; server::{ServerCommand, ServerHandle},
use crate::signals::{Signal, Signals}; service::{ServerServiceFactory, StreamNewService},
use crate::socket::{MioListener, StdSocketAddr, StdTcpListener, ToSocketAddrs}; signals::{Signal, Signals},
use crate::socket::{MioTcpListener, MioTcpSocket}; socket::{
use crate::waker_queue::{WakerInterest, WakerQueue}; MioListener, MioTcpListener, MioTcpSocket, StdSocketAddr, StdTcpListener, ToSocketAddrs,
use crate::worker::{ServerWorker, ServerWorkerConfig, WorkerHandleAccept, WorkerHandleServer}; },
waker_queue::{WakerInterest, WakerQueue},
worker::{ServerWorker, ServerWorkerConfig, WorkerHandleAccept, WorkerHandleServer},
};
/// Server builder /// Server builder
pub struct ServerBuilder { pub struct ServerBuilder {
@ -29,13 +34,13 @@ pub struct ServerBuilder {
token: usize, token: usize,
backlog: u32, backlog: u32,
handles: Vec<(usize, WorkerHandleServer)>, handles: Vec<(usize, WorkerHandleServer)>,
services: Vec<Box<dyn InternalServiceFactory>>, services: Vec<Box<dyn ServerServiceFactory>>,
sockets: Vec<(usize, String, MioListener)>, sockets: Vec<(usize, String, MioListener)>,
accept: AcceptLoop, accept: AcceptLoop,
exit: bool, exit: bool,
no_signals: bool, no_signals: bool,
cmd: UnboundedReceiver<ServerCommand>, cmd: UnboundedReceiver<ServerCommand>,
server: Server, server: ServerHandle,
notify: Vec<oneshot::Sender<()>>, notify: Vec<oneshot::Sender<()>>,
worker_config: ServerWorkerConfig, worker_config: ServerWorkerConfig,
} }
@ -50,7 +55,7 @@ impl ServerBuilder {
/// Create new Server builder instance /// Create new Server builder instance
pub fn new() -> ServerBuilder { pub fn new() -> ServerBuilder {
let (tx, rx) = unbounded_channel(); let (tx, rx) = unbounded_channel();
let server = Server::new(tx); let server = ServerHandle::new(tx);
ServerBuilder { ServerBuilder {
threads: num_cpus::get(), threads: num_cpus::get(),
@ -114,15 +119,21 @@ impl ServerBuilder {
/// Sets the maximum per-worker number of concurrent connections. /// Sets the maximum per-worker number of concurrent connections.
/// ///
/// All socket listeners will stop accepting connections when this limit is /// All socket listeners will stop accepting connections when this limit is reached for
/// reached for each worker. /// each worker.
/// ///
/// By default max connections is set to a 25k per worker. /// By default max connections is set to a 25,600 per worker.
pub fn maxconn(mut self, num: usize) -> Self { pub fn max_concurrent_connections(mut self, num: usize) -> Self {
self.worker_config.max_concurrent_connections(num); self.worker_config.max_concurrent_connections(num);
self self
} }
#[doc(hidden)]
#[deprecated(since = "2.0.0", note = "Renamed to `max_concurrent_connections`.")]
pub fn maxconn(self, num: usize) -> Self {
self.max_concurrent_connections(num)
}
/// Stop Actix system. /// Stop Actix system.
pub fn system_exit(mut self) -> Self { pub fn system_exit(mut self) -> Self {
self.exit = true; self.exit = true;
@ -147,91 +158,61 @@ impl ServerBuilder {
self self
} }
/// Add new service to the server. /// Bind server to socket addresses.
pub fn bind<F, U, N: AsRef<str>>(mut self, name: N, addr: U, factory: F) -> io::Result<Self> ///
/// Binds to all network interface addresses that resolve from the `addr` argument.
/// Eg. using `localhost` might bind to both IPv4 and IPv6 addresses. Bind to multiple distinct
/// interfaces at the same time by passing a list of socket addresses.
///
/// This fails only if all addresses fail to bind.
pub fn bind<F, U, InitErr>(
mut self,
name: impl AsRef<str>,
addr: U,
factory: F,
) -> io::Result<Self>
where where
F: ServiceFactory<TcpStream>, F: ServiceFactory<TcpStream, Config = (), InitError = InitErr> + Send + Clone + 'static,
InitErr: fmt::Debug + Send + 'static,
U: ToSocketAddrs, U: ToSocketAddrs,
{ {
let sockets = bind_addr(addr, self.backlog)?; let sockets = bind_addr(addr, self.backlog)?;
for lst in sockets { for lst in sockets {
let token = self.next_token(); let token = self.next_token();
self.services.push(StreamNewService::create( self.services.push(StreamNewService::create(
name.as_ref().to_string(), name.as_ref().to_string(),
token, token,
factory.clone(), factory.clone(),
lst.local_addr()?, lst.local_addr()?,
)); ));
self.sockets self.sockets
.push((token, name.as_ref().to_string(), MioListener::Tcp(lst))); .push((token, name.as_ref().to_string(), MioListener::Tcp(lst)));
} }
Ok(self) Ok(self)
} }
/// Add new unix domain service to the server. /// Bind server to existing TCP listener.
#[cfg(unix)] ///
pub fn bind_uds<F, U, N>(self, name: N, addr: U, factory: F) -> io::Result<Self> /// Useful when running as a systemd service and a socket FD can be passed to the process.
where pub fn listen<F, InitErr>(
F: ServiceFactory<actix_rt::net::UnixStream>,
N: AsRef<str>,
U: AsRef<std::path::Path>,
{
// The path must not exist when we try to bind.
// Try to remove it to avoid bind error.
if let Err(e) = std::fs::remove_file(addr.as_ref()) {
// NotFound is expected and not an issue. Anything else is.
if e.kind() != std::io::ErrorKind::NotFound {
return Err(e);
}
}
let lst = crate::socket::StdUnixListener::bind(addr)?;
self.listen_uds(name, lst, factory)
}
/// Add new unix domain service to the server.
/// Useful when running as a systemd service and
/// a socket FD can be acquired using the systemd crate.
#[cfg(unix)]
pub fn listen_uds<F, N: AsRef<str>>(
mut self, mut self,
name: N, name: impl AsRef<str>,
lst: crate::socket::StdUnixListener,
factory: F,
) -> io::Result<Self>
where
F: ServiceFactory<actix_rt::net::UnixStream>,
{
use std::net::{IpAddr, Ipv4Addr};
lst.set_nonblocking(true)?;
let token = self.next_token();
let addr = StdSocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
self.services.push(StreamNewService::create(
name.as_ref().to_string(),
token,
factory,
addr,
));
self.sockets
.push((token, name.as_ref().to_string(), MioListener::from(lst)));
Ok(self)
}
/// Add new service to the server.
pub fn listen<F, N: AsRef<str>>(
mut self,
name: N,
lst: StdTcpListener, lst: StdTcpListener,
factory: F, factory: F,
) -> io::Result<Self> ) -> io::Result<Self>
where where
F: ServiceFactory<TcpStream>, F: ServiceFactory<TcpStream, Config = (), InitError = InitErr> + Send + Clone + 'static,
InitErr: fmt::Debug + Send + 'static,
{ {
lst.set_nonblocking(true)?; lst.set_nonblocking(true)?;
let addr = lst.local_addr()?;
let addr = lst.local_addr()?;
let token = self.next_token(); let token = self.next_token();
self.services.push(StreamNewService::create( self.services.push(StreamNewService::create(
name.as_ref().to_string(), name.as_ref().to_string(),
token, token,
@ -246,7 +227,7 @@ impl ServerBuilder {
} }
/// Starts processing incoming connections and return server controller. /// Starts processing incoming connections and return server controller.
pub fn run(mut self) -> Server { pub fn run(mut self) -> ServerHandle {
if self.sockets.is_empty() { if self.sockets.is_empty() {
panic!("Server should have at least one bound socket"); panic!("Server should have at least one bound socket");
} else { } else {
@ -284,7 +265,7 @@ impl ServerBuilder {
Signals::start(self.server.clone()); Signals::start(self.server.clone());
} }
// start http server actor // start http server
let server = self.server.clone(); let server = self.server.clone();
rt::spawn(self); rt::spawn(self);
server server
@ -423,6 +404,74 @@ impl ServerBuilder {
} }
} }
/// Unix Domain Socket (UDS) support.
#[cfg(unix)]
impl ServerBuilder {
/// Add new unix domain service to the server.
pub fn bind_uds<F, U, InitErr>(
self,
name: impl AsRef<str>,
addr: U,
factory: F,
) -> io::Result<Self>
where
F: ServiceFactory<actix_rt::net::UnixStream, Config = (), InitError = InitErr>
+ Send
+ Clone
+ 'static,
U: AsRef<std::path::Path>,
InitErr: fmt::Debug + Send + 'static,
{
// The path must not exist when we try to bind.
// Try to remove it to avoid bind error.
if let Err(e) = std::fs::remove_file(addr.as_ref()) {
// NotFound is expected and not an issue. Anything else is.
if e.kind() != std::io::ErrorKind::NotFound {
return Err(e);
}
}
let lst = crate::socket::StdUnixListener::bind(addr)?;
self.listen_uds(name, lst, factory)
}
/// Add new unix domain service to the server.
///
/// Useful when running as a systemd service and a socket FD can be passed to the process.
pub fn listen_uds<F, InitErr>(
mut self,
name: impl AsRef<str>,
lst: crate::socket::StdUnixListener,
factory: F,
) -> io::Result<Self>
where
F: ServiceFactory<actix_rt::net::UnixStream, Config = (), InitError = InitErr>
+ Send
+ Clone
+ 'static,
InitErr: fmt::Debug + Send + 'static,
{
use std::net::{IpAddr, Ipv4Addr};
lst.set_nonblocking(true)?;
let addr = StdSocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
let token = self.next_token();
self.services.push(StreamNewService::create(
name.as_ref().to_string(),
token,
factory,
addr,
));
self.sockets
.push((token, name.as_ref().to_string(), MioListener::from(lst)));
Ok(self)
}
}
impl Future for ServerBuilder { impl Future for ServerBuilder {
type Output = (); type Output = ();
@ -441,30 +490,29 @@ pub(super) fn bind_addr<S: ToSocketAddrs>(
backlog: u32, backlog: u32,
) -> io::Result<Vec<MioTcpListener>> { ) -> io::Result<Vec<MioTcpListener>> {
let mut err = None; let mut err = None;
let mut succ = false; let mut success = false;
let mut sockets = Vec::new(); let mut sockets = Vec::new();
for addr in addr.to_socket_addrs()? { for addr in addr.to_socket_addrs()? {
match create_tcp_listener(addr, backlog) { match create_tcp_listener(addr, backlog) {
Ok(lst) => { Ok(lst) => {
succ = true; success = true;
sockets.push(lst); sockets.push(lst);
} }
Err(e) => err = Some(e), Err(e) => err = Some(e),
} }
} }
if !succ { if success {
if let Some(e) = err.take() { Ok(sockets)
Err(e) } else if let Some(err) = err.take() {
Err(err)
} else { } else {
Err(io::Error::new( Err(io::Error::new(
io::ErrorKind::Other, io::ErrorKind::Other,
"Can not bind to address.", "Can not bind to socket address",
)) ))
} }
} else {
Ok(sockets)
}
} }
fn create_tcp_listener(addr: StdSocketAddr, backlog: u32) -> io::Result<MioTcpListener> { fn create_tcp_listener(addr: StdSocketAddr, backlog: u32) -> io::Result<MioTcpListener> {

View File

@ -15,8 +15,7 @@ mod waker_queue;
mod worker; mod worker;
pub use self::builder::ServerBuilder; pub use self::builder::ServerBuilder;
pub use self::server::Server; pub use self::server::{Server, ServerHandle};
pub use self::service::ServiceFactory;
pub use self::test_server::TestServer; pub use self::test_server::TestServer;
#[doc(hidden)] #[doc(hidden)]

View File

@ -6,8 +6,18 @@ use std::task::{Context, Poll};
use tokio::sync::mpsc::UnboundedSender; use tokio::sync::mpsc::UnboundedSender;
use tokio::sync::oneshot; use tokio::sync::oneshot;
use crate::builder::ServerBuilder; use crate::{signals::Signal, ServerBuilder};
use crate::signals::Signal;
#[derive(Debug)]
#[non_exhaustive]
pub struct Server;
impl Server {
/// Start server building process.
pub fn build() -> ServerBuilder {
ServerBuilder::default()
}
}
#[derive(Debug)] #[derive(Debug)]
pub(crate) enum ServerCommand { pub(crate) enum ServerCommand {
@ -32,19 +42,14 @@ pub(crate) enum ServerCommand {
/// ///
/// A graceful shutdown will wait for all workers to stop first. /// A graceful shutdown will wait for all workers to stop first.
#[derive(Debug)] #[derive(Debug)]
pub struct Server( pub struct ServerHandle(
UnboundedSender<ServerCommand>, UnboundedSender<ServerCommand>,
Option<oneshot::Receiver<()>>, Option<oneshot::Receiver<()>>,
); );
impl Server { impl ServerHandle {
pub(crate) fn new(tx: UnboundedSender<ServerCommand>) -> Self { pub(crate) fn new(tx: UnboundedSender<ServerCommand>) -> Self {
Server(tx, None) ServerHandle(tx, None)
}
/// Start server building process
pub fn build() -> ServerBuilder {
ServerBuilder::default()
} }
pub(crate) fn signal(&self, sig: Signal) { pub(crate) fn signal(&self, sig: Signal) {
@ -91,13 +96,13 @@ impl Server {
} }
} }
impl Clone for Server { impl Clone for ServerHandle {
fn clone(&self) -> Self { fn clone(&self) -> Self {
Self(self.0.clone(), None) Self(self.0.clone(), None)
} }
} }
impl Future for Server { impl Future for ServerHandle {
type Output = io::Result<()>; type Output = io::Result<()>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {

View File

@ -1,26 +1,26 @@
use std::marker::PhantomData; use std::{
use std::net::SocketAddr; fmt,
use std::task::{Context, Poll}; marker::PhantomData,
net::SocketAddr,
task::{Context, Poll},
};
use actix_service::{Service, ServiceFactory as BaseServiceFactory}; use actix_service::{Service, ServiceFactory};
use actix_utils::future::{ready, Ready}; use actix_utils::future::{ready, Ready};
use futures_core::future::LocalBoxFuture; use futures_core::future::LocalBoxFuture;
use log::error; use log::error;
use crate::socket::{FromStream, MioStream}; use crate::{
use crate::worker::WorkerCounterGuard; socket::{FromStream, MioStream},
worker::WorkerCounterGuard,
};
pub trait ServiceFactory<Stream: FromStream>: Send + Clone + 'static { pub(crate) trait ServerServiceFactory: Send {
type Factory: BaseServiceFactory<Stream, Config = ()>;
fn create(&self) -> Self::Factory;
}
pub(crate) trait InternalServiceFactory: Send {
fn name(&self, token: usize) -> &str; fn name(&self, token: usize) -> &str;
fn clone_factory(&self) -> Box<dyn InternalServiceFactory>; fn clone_factory(&self) -> Box<dyn ServerServiceFactory>;
/// Initialize Mio stream handler service and return it with its service token.
fn create(&self) -> LocalBoxFuture<'static, Result<(usize, BoxedServerService), ()>>; fn create(&self) -> LocalBoxFuture<'static, Result<(usize, BoxedServerService), ()>>;
} }
@ -56,7 +56,7 @@ where
{ {
type Response = (); type Response = ();
type Error = (); type Error = ();
type Future = Ready<Result<(), ()>>; type Future = Ready<Result<Self::Response, Self::Error>>;
fn poll_ready(&self, ctx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { fn poll_ready(&self, ctx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.service.poll_ready(ctx).map_err(|_| ()) self.service.poll_ready(ctx).map_err(|_| ())
@ -72,25 +72,26 @@ where
}); });
Ok(()) Ok(())
} }
Err(e) => { Err(err) => {
error!("Can not convert to an async tcp stream: {}", e); error!("Can not convert Mio stream to an async TCP stream: {}", err);
Err(()) Err(())
} }
}) })
} }
} }
pub(crate) struct StreamNewService<F: ServiceFactory<Io>, Io: FromStream> { pub(crate) struct StreamNewService<F, Io, InitErr> {
name: String, name: String,
inner: F, inner: F,
token: usize, token: usize,
addr: SocketAddr, addr: SocketAddr,
_t: PhantomData<Io>, _t: PhantomData<(Io, InitErr)>,
} }
impl<F, Io> StreamNewService<F, Io> impl<F, Io, InitErr> StreamNewService<F, Io, InitErr>
where where
F: ServiceFactory<Io>, F: ServiceFactory<Io, Config = (), InitError = InitErr> + Send + Clone + 'static,
InitErr: fmt::Debug + Send + 'static,
Io: FromStream + Send + 'static, Io: FromStream + Send + 'static,
{ {
pub(crate) fn create( pub(crate) fn create(
@ -98,7 +99,7 @@ where
token: usize, token: usize,
inner: F, inner: F,
addr: SocketAddr, addr: SocketAddr,
) -> Box<dyn InternalServiceFactory> { ) -> Box<dyn ServerServiceFactory> {
Box::new(Self { Box::new(Self {
name, name,
token, token,
@ -109,16 +110,17 @@ where
} }
} }
impl<F, Io> InternalServiceFactory for StreamNewService<F, Io> impl<F, Io, InitErr> ServerServiceFactory for StreamNewService<F, Io, InitErr>
where where
F: ServiceFactory<Io>, F: ServiceFactory<Io, Config = (), InitError = InitErr> + Send + Clone + 'static,
InitErr: fmt::Debug + Send + 'static,
Io: FromStream + Send + 'static, Io: FromStream + Send + 'static,
{ {
fn name(&self, _: usize) -> &str { fn name(&self, _: usize) -> &str {
&self.name &self.name
} }
fn clone_factory(&self) -> Box<dyn InternalServiceFactory> { fn clone_factory(&self) -> Box<dyn ServerServiceFactory> {
Box::new(Self { Box::new(Self {
name: self.name.clone(), name: self.name.clone(),
inner: self.inner.clone(), inner: self.inner.clone(),
@ -130,28 +132,18 @@ where
fn create(&self) -> LocalBoxFuture<'static, Result<(usize, BoxedServerService), ()>> { fn create(&self) -> LocalBoxFuture<'static, Result<(usize, BoxedServerService), ()>> {
let token = self.token; let token = self.token;
let fut = self.inner.create().new_service(()); let fut = self.inner.new_service(());
Box::pin(async move { Box::pin(async move {
match fut.await { match fut.await {
Ok(inner) => { Ok(svc) => {
let service = Box::new(StreamService::new(inner)) as _; let service = Box::new(StreamService::new(svc)) as _;
Ok((token, service)) Ok((token, service))
} }
Err(_) => Err(()), Err(err) => {
error!("{:?}", err);
Err(())
}
} }
}) })
} }
} }
impl<F, T, I> ServiceFactory<I> for F
where
F: Fn() -> T + Send + Clone + 'static,
T: BaseServiceFactory<I, Config = ()>,
I: FromStream,
{
type Factory = T;
fn create(&self) -> T {
(self)()
}
}

View File

@ -2,7 +2,7 @@ use std::future::Future;
use std::pin::Pin; use std::pin::Pin;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use crate::server::Server; use crate::server::ServerHandle;
/// Types of process signals. /// Types of process signals.
#[allow(dead_code)] #[allow(dead_code)]
@ -20,7 +20,7 @@ pub(crate) enum Signal {
/// Process signal listener. /// Process signal listener.
pub(crate) struct Signals { pub(crate) struct Signals {
srv: Server, srv: ServerHandle,
#[cfg(not(unix))] #[cfg(not(unix))]
signals: futures_core::future::LocalBoxFuture<'static, std::io::Result<()>>, signals: futures_core::future::LocalBoxFuture<'static, std::io::Result<()>>,
@ -31,7 +31,7 @@ pub(crate) struct Signals {
impl Signals { impl Signals {
/// Spawns a signal listening future that is able to send commands to the `Server`. /// Spawns a signal listening future that is able to send commands to the `Server`.
pub(crate) fn start(srv: Server) { pub(crate) fn start(srv: ServerHandle) {
#[cfg(not(unix))] #[cfg(not(unix))]
{ {
actix_rt::spawn(Signals { actix_rt::spawn(Signals {

View File

@ -1,9 +1,9 @@
use std::sync::mpsc; use std::{fmt, net, sync::mpsc, thread};
use std::{net, thread};
use actix_rt::{net::TcpStream, System}; use actix_rt::{net::TcpStream, System};
use actix_service::ServiceFactory;
use crate::{Server, ServerBuilder, ServiceFactory}; use crate::{Server, ServerBuilder};
/// A testing server. /// A testing server.
/// ///
@ -12,13 +12,13 @@ use crate::{Server, ServerBuilder, ServiceFactory};
/// ///
/// # Examples /// # Examples
/// ``` /// ```
/// use actix_service::fn_service;
/// use actix_server::TestServer; /// use actix_server::TestServer;
/// use actix_service::fn_service;
/// ///
/// #[actix_rt::main] /// #[actix_rt::main]
/// async fn main() { /// async fn main() {
/// let srv = TestServer::with(|| fn_service( /// let srv = TestServer::with(fn_service(|sock|
/// |sock| async move { /// async move {
/// println!("New connection: {:?}", sock); /// println!("New connection: {:?}", sock);
/// Ok::<_, ()>(()) /// Ok::<_, ()>(())
/// } /// }
@ -27,9 +27,10 @@ use crate::{Server, ServerBuilder, ServiceFactory};
/// println!("SOCKET: {:?}", srv.connect()); /// println!("SOCKET: {:?}", srv.connect());
/// } /// }
/// ``` /// ```
#[non_exhaustive]
pub struct TestServer; pub struct TestServer;
/// Test server runtime /// Test server runtime.
pub struct TestServerRuntime { pub struct TestServerRuntime {
addr: net::SocketAddr, addr: net::SocketAddr,
host: String, host: String,
@ -38,7 +39,7 @@ pub struct TestServerRuntime {
} }
impl TestServer { impl TestServer {
/// Start new server with server builder. /// Start new server using server builder.
pub fn start<F>(mut factory: F) -> TestServerRuntime pub fn start<F>(mut factory: F) -> TestServerRuntime
where where
F: FnMut(ServerBuilder) -> ServerBuilder + Send + 'static, F: FnMut(ServerBuilder) -> ServerBuilder + Send + 'static,
@ -63,8 +64,12 @@ impl TestServer {
} }
} }
/// Start new test server with application factory. /// Start new test server with default settings using application factory.
pub fn with<F: ServiceFactory<TcpStream>>(factory: F) -> TestServerRuntime { pub fn with<F, InitErr>(factory: F) -> TestServerRuntime
where
F: ServiceFactory<TcpStream, Config = (), InitError = InitErr> + Send + Clone + 'static,
InitErr: fmt::Debug + Send + 'static,
{
let (tx, rx) = mpsc::channel(); let (tx, rx) = mpsc::channel();
// run server in separate thread // run server in separate thread

View File

@ -24,10 +24,12 @@ use tokio::sync::{
}; };
use crate::join_all; use crate::join_all;
use crate::service::{BoxedServerService, InternalServiceFactory}; use crate::service::{BoxedServerService, ServerServiceFactory};
use crate::socket::MioStream; use crate::socket::MioStream;
use crate::waker_queue::{WakerInterest, WakerQueue}; use crate::waker_queue::{WakerInterest, WakerQueue};
const DEFAULT_SHUTDOWN_DURATION: Duration = Duration::from_secs(30);
/// Stop worker message. Returns `true` on successful graceful shutdown. /// Stop worker message. Returns `true` on successful graceful shutdown.
/// and `false` if some connections still alive when shutdown execute. /// and `false` if some connections still alive when shutdown execute.
pub(crate) struct Stop { pub(crate) struct Stop {
@ -196,7 +198,7 @@ impl WorkerHandleServer {
/// Service worker. /// Service worker.
/// ///
/// Worker accepts Socket objects via unbounded channel and starts stream processing. /// Worker accepts socket objects via unbounded channel and starts stream processing.
pub(crate) struct ServerWorker { pub(crate) struct ServerWorker {
// UnboundedReceiver<Conn> should always be the first field. // UnboundedReceiver<Conn> should always be the first field.
// It must be dropped as soon as ServerWorker dropping. // It must be dropped as soon as ServerWorker dropping.
@ -204,7 +206,7 @@ pub(crate) struct ServerWorker {
rx2: UnboundedReceiver<Stop>, rx2: UnboundedReceiver<Stop>,
counter: WorkerCounter, counter: WorkerCounter,
services: Box<[WorkerService]>, services: Box<[WorkerService]>,
factories: Box<[Box<dyn InternalServiceFactory>]>, factories: Box<[Box<dyn ServerServiceFactory>]>,
state: WorkerState, state: WorkerState,
shutdown_timeout: Duration, shutdown_timeout: Duration,
} }
@ -244,10 +246,11 @@ impl Default for ServerWorkerConfig {
fn default() -> Self { fn default() -> Self {
// 512 is the default max blocking thread count of tokio runtime. // 512 is the default max blocking thread count of tokio runtime.
let max_blocking_threads = std::cmp::max(512 / num_cpus::get(), 1); let max_blocking_threads = std::cmp::max(512 / num_cpus::get(), 1);
Self { Self {
shutdown_timeout: Duration::from_secs(30), shutdown_timeout: DEFAULT_SHUTDOWN_DURATION,
max_blocking_threads, max_blocking_threads,
max_concurrent_connections: 25600, max_concurrent_connections: 25_600,
} }
} }
} }
@ -269,7 +272,7 @@ impl ServerWorkerConfig {
impl ServerWorker { impl ServerWorker {
pub(crate) fn start( pub(crate) fn start(
idx: usize, idx: usize,
factories: Vec<Box<dyn InternalServiceFactory>>, factories: Vec<Box<dyn ServerServiceFactory>>,
waker_queue: WakerQueue, waker_queue: WakerQueue,
config: ServerWorkerConfig, config: ServerWorkerConfig,
) -> (WorkerHandleAccept, WorkerHandleServer) { ) -> (WorkerHandleAccept, WorkerHandleServer) {
@ -314,6 +317,7 @@ impl ServerWorker {
.await .await
.into_iter() .into_iter()
.collect::<Result<Vec<_>, _>>(); .collect::<Result<Vec<_>, _>>();
let services = match res { let services = match res {
Ok(res) => res Ok(res) => res
.into_iter() .into_iter()
@ -327,8 +331,9 @@ impl ServerWorker {
services services
}) })
.into_boxed_slice(), .into_boxed_slice(),
Err(e) => {
error!("Can not start worker: {:?}", e); Err(err) => {
error!("Can not start worker: {:?}", err);
Arbiter::current().stop(); Arbiter::current().stop();
return; return;
} }

View File

@ -25,9 +25,7 @@ fn test_bind() {
let srv = Server::build() let srv = Server::build()
.workers(1) .workers(1)
.disable_signals() .disable_signals()
.bind("test", addr, move || { .bind("test", addr, fn_service(|_| async { Ok::<_, ()>(()) }))?
fn_service(|_| async { Ok::<_, ()>(()) })
})?
.run(); .run();
let _ = tx.send((srv.clone(), actix_rt::System::current())); let _ = tx.send((srv.clone(), actix_rt::System::current()));
@ -56,9 +54,7 @@ fn test_listen() {
let srv = Server::build() let srv = Server::build()
.disable_signals() .disable_signals()
.workers(1) .workers(1)
.listen("test", lst, move || { .listen("test", lst, fn_service(|_| async { Ok::<_, ()>(()) }))?
fn_service(|_| async { Ok::<_, ()>(()) })
})?
.run(); .run();
let _ = tx.send((srv.clone(), actix_rt::System::current())); let _ = tx.send((srv.clone(), actix_rt::System::current()));
@ -94,13 +90,15 @@ fn test_start() {
let srv = Server::build() let srv = Server::build()
.backlog(100) .backlog(100)
.disable_signals() .disable_signals()
.bind("test", addr, move || { .bind(
"test",
addr,
fn_service(|io: TcpStream| async move { fn_service(|io: TcpStream| async move {
let mut f = Framed::new(io, BytesCodec); let mut f = Framed::new(io, BytesCodec);
f.send(Bytes::from_static(b"test")).await.unwrap(); f.send(Bytes::from_static(b"test")).await.unwrap();
Ok::<_, ()>(()) Ok::<_, ()>(())
}) }),
})? )?
.run(); .run();
let _ = tx.send((srv.clone(), actix_rt::System::current())); let _ = tx.send((srv.clone(), actix_rt::System::current()));
@ -170,10 +168,10 @@ async fn test_max_concurrent_connections() {
// Set a relative higher backlog. // Set a relative higher backlog.
.backlog(12) .backlog(12)
// max connection for a worker is 3. // max connection for a worker is 3.
.maxconn(max_conn) .max_concurrent_connections(max_conn)
.workers(1) .workers(1)
.disable_signals() .disable_signals()
.bind("test", addr, move || { .bind("test", addr, {
let counter = counter.clone(); let counter = counter.clone();
fn_service(move |_io: TcpStream| { fn_service(move |_io: TcpStream| {
let counter = counter.clone(); let counter = counter.clone();
@ -263,14 +261,14 @@ async fn test_service_restart() {
let server = Server::build() let server = Server::build()
.backlog(1) .backlog(1)
.disable_signals() .disable_signals()
.bind("addr1", addr1, move || { .bind("addr1", addr1, {
let num = num.clone(); let num = num.clone();
fn_factory(move || { fn_factory(move || {
let num = num.clone(); let num = num.clone();
async move { Ok::<_, ()>(TestService(num)) } async move { Ok::<_, ()>(TestService(num)) }
}) })
})? })?
.bind("addr2", addr2, move || { .bind("addr2", addr2, {
let num2 = num2.clone(); let num2 = num2.clone();
fn_factory(move || { fn_factory(move || {
let num2 = num2.clone(); let num2 = num2.clone();
@ -319,6 +317,7 @@ async fn worker_restart() {
use futures_core::future::LocalBoxFuture; use futures_core::future::LocalBoxFuture;
use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::io::{AsyncReadExt, AsyncWriteExt};
#[derive(Debug, Clone)]
struct TestServiceFactory(Arc<AtomicUsize>); struct TestServiceFactory(Arc<AtomicUsize>);
impl ServiceFactory<TcpStream> for TestServiceFactory { impl ServiceFactory<TcpStream> for TestServiceFactory {
@ -381,7 +380,7 @@ async fn worker_restart() {
actix_rt::System::new().block_on(async { actix_rt::System::new().block_on(async {
let server = Server::build() let server = Server::build()
.disable_signals() .disable_signals()
.bind("addr", addr, move || TestServiceFactory(counter.clone()))? .bind("addr", addr, TestServiceFactory(counter.clone()))?
.workers(2) .workers(2)
.run(); .run();

View File

@ -1,6 +1,9 @@
# Changes # Changes
## Unreleased - 2021-xx-xx ## Unreleased - 2021-xx-xx
* `fn_factory[_with_config]` types now impl `Send` even when config, service, request types do not. [#403]
[#403]: https://github.com/actix/actix-net/pull/403
## 2.0.1 - 2021-10-11 ## 2.0.1 - 2021-10-11

View File

@ -14,7 +14,7 @@ use super::{Service, ServiceFactory};
/// Service for the `and_then` combinator, chaining a computation onto the end of another service /// Service for the `and_then` combinator, chaining a computation onto the end of another service
/// which completes successfully. /// which completes successfully.
/// ///
/// This is created by the `Pipeline::and_then` method. /// Created by the `.and_then()` combinator.
pub struct AndThenService<A, B, Req>(Rc<(A, B)>, PhantomData<Req>); pub struct AndThenService<A, B, Req>(Rc<(A, B)>, PhantomData<Req>);
impl<A, B, Req> AndThenService<A, B, Req> { impl<A, B, Req> AndThenService<A, B, Req> {
@ -116,7 +116,7 @@ where
} }
} }
/// `.and_then()` service factory combinator /// Service factory created by the `.and_then()` combinator.
pub struct AndThenServiceFactory<A, B, Req> pub struct AndThenServiceFactory<A, B, Req>
where where
A: ServiceFactory<Req>, A: ServiceFactory<Req>,

View File

@ -63,7 +63,7 @@ where
} }
} }
/// Convert `Fn(Config, &Server) -> Future<Service>` fn to NewService\ /// Convert `Fn(Config, &Server) -> Future<Service>` fn to ServiceFactory.
struct ApplyConfigService<S1, Req, F, Cfg, Fut, S2, Err> struct ApplyConfigService<S1, Req, F, Cfg, Fut, S2, Err>
where where
S1: Service<Req>, S1: Service<Req>,

View File

@ -44,7 +44,7 @@ pub trait ServiceExt<Req>: Service<Req> {
/// Call another service after call to this one has resolved successfully. /// Call another service after call to this one has resolved successfully.
/// ///
/// This function can be used to chain two services together and ensure that the second service /// This function can be used to chain two services together and ensure that the second service
/// isn't called until call to the fist service have finished. Result of the call to the first /// isn't called until call to the fist service has resolved. Result of the call to the first
/// service is used as an input parameter for the second service's call. /// service is used as an input parameter for the second service's call.
/// ///
/// Note that this function consumes the receiving service and returns a wrapped version of it. /// Note that this function consumes the receiving service and returns a wrapped version of it.

View File

@ -3,6 +3,7 @@ use core::{future::Future, marker::PhantomData};
use crate::{ok, IntoService, IntoServiceFactory, Ready, Service, ServiceFactory}; use crate::{ok, IntoService, IntoServiceFactory, Ready, Service, ServiceFactory};
/// Create `ServiceFactory` for function that can act as a `Service` /// Create `ServiceFactory` for function that can act as a `Service`
// TODO: remove unnecessary Cfg type param
pub fn fn_service<F, Fut, Req, Res, Err, Cfg>( pub fn fn_service<F, Fut, Req, Res, Err, Cfg>(
f: F, f: F,
) -> FnServiceFactory<F, Fut, Req, Res, Err, Cfg> ) -> FnServiceFactory<F, Fut, Req, Res, Err, Cfg>
@ -48,6 +49,7 @@ where
/// Ok(()) /// Ok(())
/// } /// }
/// ``` /// ```
// TODO: remove unnecessary Cfg type param
pub fn fn_factory<F, Cfg, Srv, Req, Fut, Err>( pub fn fn_factory<F, Cfg, Srv, Req, Fut, Err>(
f: F, f: F,
) -> FnServiceNoConfig<F, Cfg, Srv, Req, Fut, Err> ) -> FnServiceNoConfig<F, Cfg, Srv, Req, Fut, Err>
@ -160,7 +162,7 @@ where
Fut: Future<Output = Result<Res, Err>>, Fut: Future<Output = Result<Res, Err>>,
{ {
f: F, f: F,
_t: PhantomData<(Req, Cfg)>, _t: PhantomData<fn(Cfg, Req)>,
} }
impl<F, Fut, Req, Res, Err, Cfg> FnServiceFactory<F, Fut, Req, Res, Err, Cfg> impl<F, Fut, Req, Res, Err, Cfg> FnServiceFactory<F, Fut, Req, Res, Err, Cfg>
@ -237,7 +239,7 @@ where
Srv: Service<Req>, Srv: Service<Req>,
{ {
f: F, f: F,
_t: PhantomData<(Fut, Cfg, Req, Srv, Err)>, _t: PhantomData<fn(Cfg, Req)>,
} }
impl<F, Fut, Cfg, Srv, Req, Err> FnServiceConfig<F, Fut, Cfg, Srv, Req, Err> impl<F, Fut, Cfg, Srv, Req, Err> FnServiceConfig<F, Fut, Cfg, Srv, Req, Err>
@ -293,7 +295,7 @@ where
Fut: Future<Output = Result<Srv, Err>>, Fut: Future<Output = Result<Srv, Err>>,
{ {
f: F, f: F,
_t: PhantomData<(Cfg, Req)>, _t: PhantomData<fn(Cfg, Req)>,
} }
impl<F, Cfg, Srv, Req, Fut, Err> FnServiceNoConfig<F, Cfg, Srv, Req, Fut, Err> impl<F, Cfg, Srv, Req, Fut, Err> FnServiceNoConfig<F, Cfg, Srv, Req, Fut, Err>
@ -353,10 +355,11 @@ where
mod tests { mod tests {
use core::task::Poll; use core::task::Poll;
use alloc::rc::Rc;
use futures_util::future::lazy; use futures_util::future::lazy;
use super::*; use super::*;
use crate::{ok, Service, ServiceFactory}; use crate::{boxed, ok, Service, ServiceExt, ServiceFactory, ServiceFactoryExt};
#[actix_rt::test] #[actix_rt::test]
async fn test_fn_service() { async fn test_fn_service() {
@ -391,4 +394,142 @@ mod tests {
assert!(res.is_ok()); assert!(res.is_ok());
assert_eq!(res.unwrap(), ("srv", 1)); assert_eq!(res.unwrap(), ("srv", 1));
} }
// these three properties of a service factory are usually important
fn is_static<T: 'static>(_t: &T) {}
fn impls_clone<T: Clone>(_t: &T) {}
fn impls_send<T: Send>(_t: &T) {}
#[actix_rt::test]
async fn test_fn_factory_impl_send() {
let svc_fac = fn_factory_with_config(|cfg: usize| {
ok::<_, ()>(fn_service(move |()| ok::<_, ()>(("srv", cfg))))
});
is_static(&svc_fac);
impls_clone(&svc_fac);
impls_send(&svc_fac);
// Cfg type is explicitly !Send
let svc_fac = fn_factory_with_config(|cfg: Rc<usize>| {
let cfg = Rc::clone(&cfg);
ok::<_, ()>(fn_service(move |_: ()| ok::<_, ()>(("srv", *cfg))))
});
is_static(&svc_fac);
impls_clone(&svc_fac);
impls_send(&svc_fac);
let svc_fac = fn_factory::<_, (), _, _, _, _>(|| {
ok::<_, ()>(fn_service(move |()| ok::<_, ()>("srv")))
});
is_static(&svc_fac);
impls_clone(&svc_fac);
impls_send(&svc_fac);
// Req type is explicitly !Send
let svc_fac = fn_factory::<_, (), _, _, _, _>(|| {
ok::<_, ()>(fn_service(move |_: Rc<()>| ok::<_, ()>("srv")))
});
is_static(&svc_fac);
impls_clone(&svc_fac);
impls_send(&svc_fac);
// Service type is explicitly !Send
let svc_fac = fn_factory::<_, (), _, _, _, _>(|| {
ok::<_, ()>(boxed::rc_service(fn_service(move |_: ()| {
ok::<_, ()>("srv")
})))
});
is_static(&svc_fac);
impls_clone(&svc_fac);
impls_send(&svc_fac);
}
#[actix_rt::test]
async fn test_service_combinators_impls() {
#[derive(Clone)]
struct Ident;
impl<T: 'static> Service<T> for Ident {
type Response = T;
type Error = ();
type Future = Ready<Result<Self::Response, Self::Error>>;
crate::always_ready!();
fn call(&self, req: T) -> Self::Future {
ok(req)
}
}
let svc = Ident;
is_static(&svc);
impls_clone(&svc);
impls_send(&svc);
let svc = ServiceExt::map(Ident, core::convert::identity);
impls_send(&svc);
svc.call(()).await.unwrap();
let svc = ServiceExt::map_err(Ident, core::convert::identity);
impls_send(&svc);
svc.call(()).await.unwrap();
let svc = ServiceExt::and_then(Ident, Ident);
// impls_send(&svc); // fails to compile :(
svc.call(()).await.unwrap();
// let svc = ServiceExt::and_then_send(Ident, Ident);
// impls_send(&svc);
// svc.call(()).await.unwrap();
}
#[actix_rt::test]
async fn test_factory_combinators_impls() {
#[derive(Clone)]
struct Ident;
impl<T: 'static> ServiceFactory<T> for Ident {
type Response = T;
type Error = ();
type Config = ();
// explicitly !Send result service
type Service = boxed::RcService<T, Self::Response, Self::Error>;
type InitError = ();
type Future = Ready<Result<Self::Service, Self::Error>>;
fn new_service(&self, _cfg: Self::Config) -> Self::Future {
ok(boxed::rc_service(fn_service(ok)))
}
}
let svc_fac = Ident;
is_static(&svc_fac);
impls_clone(&svc_fac);
impls_send(&svc_fac);
let svc_fac = ServiceFactoryExt::map(Ident, core::convert::identity);
impls_send(&svc_fac);
let svc = svc_fac.new_service(()).await.unwrap();
svc.call(()).await.unwrap();
let svc_fac = ServiceFactoryExt::map_err(Ident, core::convert::identity);
impls_send(&svc_fac);
let svc = svc_fac.new_service(()).await.unwrap();
svc.call(()).await.unwrap();
let svc_fac = ServiceFactoryExt::map_init_err(Ident, core::convert::identity);
impls_send(&svc_fac);
let svc = svc_fac.new_service(()).await.unwrap();
svc.call(()).await.unwrap();
let svc_fac = ServiceFactoryExt::and_then(Ident, Ident);
// impls_send(&svc_fac); // fails to compile :(
let svc = svc_fac.new_service(()).await.unwrap();
svc.call(()).await.unwrap();
// let svc_fac = ServiceFactoryExt::and_then_send(Ident, Ident);
// impls_send(&svc_fac);
// let svc = svc_fac.new_service(()).await.unwrap();
// svc.call(()).await.unwrap();
}
} }

View File

@ -103,7 +103,7 @@ where
} }
} }
/// `MapNewService` new service combinator /// `MapServiceFactory` new service combinator.
pub struct MapServiceFactory<A, F, Req, Res> { pub struct MapServiceFactory<A, F, Req, Res> {
a: A, a: A,
f: F, f: F,

View File

@ -238,8 +238,7 @@ where
} }
} }
/// Map this service's output to a different type, returning a new service /// Map this service's output to a different type, returning a new service.
/// of the resulting type.
pub fn map<F, R>(self, f: F) -> PipelineFactory<MapServiceFactory<SF, F, Req, R>, Req> pub fn map<F, R>(self, f: F) -> PipelineFactory<MapServiceFactory<SF, F, Req, R>, Req>
where where
Self: Sized, Self: Sized,
@ -251,7 +250,7 @@ where
} }
} }
/// Map this service's error to a different error, returning a new service. /// Map this service's error to a different type, returning a new service.
pub fn map_err<F, E>( pub fn map_err<F, E>(
self, self,
f: F, f: F,
@ -266,7 +265,7 @@ where
} }
} }
/// Map this factory's init error to a different error, returning a new service. /// Map this factory's init error to a different type, returning a new service.
pub fn map_init_err<F, E>(self, f: F) -> PipelineFactory<MapInitErr<SF, F, Req, E>, Req> pub fn map_init_err<F, E>(self, f: F) -> PipelineFactory<MapInitErr<SF, F, Req, E>, Req>
where where
Self: Sized, Self: Sized,

View File

@ -31,21 +31,24 @@ use std::{
use actix_rt::net::TcpStream; use actix_rt::net::TcpStream;
use actix_server::Server; use actix_server::Server;
use actix_service::ServiceFactoryExt as _; use actix_service::{fn_factory, fn_service, ServiceExt as _, ServiceFactory};
use actix_tls::accept::rustls::{Acceptor as RustlsAcceptor, TlsStream}; use actix_tls::accept::rustls::{Acceptor as RustlsAcceptor, TlsStream};
use futures_util::future::ok; use futures_util::future::ok;
use log::info; use log::info;
use rustls::{server::ServerConfig, Certificate, PrivateKey}; use rustls::{server::ServerConfig, Certificate, PrivateKey};
use rustls_pemfile::{certs, rsa_private_keys}; use rustls_pemfile::{certs, rsa_private_keys};
const CERT_PATH: &str = concat![env!("CARGO_MANIFEST_DIR"), "/examples/cert.pem"];
const KEY_PATH: &str = concat![env!("CARGO_MANIFEST_DIR"), "/examples/key.pem"];
#[actix_rt::main] #[actix_rt::main]
async fn main() -> io::Result<()> { async fn main() -> io::Result<()> {
env::set_var("RUST_LOG", "info"); env::set_var("RUST_LOG", "info");
env_logger::init(); env_logger::init();
// Load TLS key and cert files // Load TLS key and cert files
let cert_file = &mut BufReader::new(File::open("./examples/cert.pem").unwrap()); let cert_file = &mut BufReader::new(File::open(CERT_PATH).unwrap());
let key_file = &mut BufReader::new(File::open("./examples/key.pem").unwrap()); let key_file = &mut BufReader::new(File::open(KEY_PATH).unwrap());
let cert_chain = certs(cert_file) let cert_chain = certs(cert_file)
.unwrap() .unwrap()
@ -68,17 +71,33 @@ async fn main() -> io::Result<()> {
info!("starting server on port: {}", &addr.0); info!("starting server on port: {}", &addr.0);
Server::build() Server::build()
.bind("tls-example", addr, move || { .bind("tls-example", addr, {
let count = Arc::clone(&count); let count = Arc::clone(&count);
// Set up TLS service factory // Set up TLS service factory
tls_acceptor // note: moving rustls acceptor into fn_factory scope
.clone() fn_factory(move || {
// manually call new_service so that and_then can be used from ServiceExt
// type annotation for inner stream type is required
let svc = <RustlsAcceptor as ServiceFactory<TcpStream>>::new_service(
&tls_acceptor,
(),
);
let count = Arc::clone(&count);
async move {
let svc = svc
.await?
.map_err(|err| println!("Rustls error: {:?}", err)) .map_err(|err| println!("Rustls error: {:?}", err))
.and_then(move |stream: TlsStream<TcpStream>| { .and_then(fn_service(move |stream: TlsStream<TcpStream>| {
let num = count.fetch_add(1, Ordering::Relaxed); let num = count.fetch_add(1, Ordering::Relaxed) + 1;
info!("[{}] Got TLS connection: {:?}", num, &*stream); info!("[{}] Got TLS connection: {:?}", num, &*stream);
ok(()) ok(())
}));
Ok::<_, ()>(svc)
}
}) })
})? })?
.workers(1) .workers(1)

View File

@ -17,7 +17,7 @@ use actix_tls::connect::{self as actix_connect, Connect};
#[cfg(feature = "openssl")] #[cfg(feature = "openssl")]
#[actix_rt::test] #[actix_rt::test]
async fn test_string() { async fn test_string() {
let srv = TestServer::with(|| { let srv = TestServer::with({
fn_service(|io: TcpStream| async { fn_service(|io: TcpStream| async {
let mut framed = Framed::new(io, BytesCodec); let mut framed = Framed::new(io, BytesCodec);
framed.send(Bytes::from_static(b"test")).await?; framed.send(Bytes::from_static(b"test")).await?;
@ -34,7 +34,7 @@ async fn test_string() {
#[cfg(feature = "rustls")] #[cfg(feature = "rustls")]
#[actix_rt::test] #[actix_rt::test]
async fn test_rustls_string() { async fn test_rustls_string() {
let srv = TestServer::with(|| { let srv = TestServer::with({
fn_service(|io: TcpStream| async { fn_service(|io: TcpStream| async {
let mut framed = Framed::new(io, BytesCodec); let mut framed = Framed::new(io, BytesCodec);
framed.send(Bytes::from_static(b"test")).await?; framed.send(Bytes::from_static(b"test")).await?;
@ -50,13 +50,11 @@ async fn test_rustls_string() {
#[actix_rt::test] #[actix_rt::test]
async fn test_static_str() { async fn test_static_str() {
let srv = TestServer::with(|| { let srv = TestServer::with(fn_service(|io: TcpStream| async {
fn_service(|io: TcpStream| async {
let mut framed = Framed::new(io, BytesCodec); let mut framed = Framed::new(io, BytesCodec);
framed.send(Bytes::from_static(b"test")).await?; framed.send(Bytes::from_static(b"test")).await?;
Ok::<_, io::Error>(()) Ok::<_, io::Error>(())
}) }));
});
let conn = actix_connect::default_connector(); let conn = actix_connect::default_connector();
@ -75,13 +73,11 @@ async fn test_static_str() {
#[actix_rt::test] #[actix_rt::test]
async fn test_new_service() { async fn test_new_service() {
let srv = TestServer::with(|| { let srv = TestServer::with(fn_service(|io: TcpStream| async {
fn_service(|io: TcpStream| async {
let mut framed = Framed::new(io, BytesCodec); let mut framed = Framed::new(io, BytesCodec);
framed.send(Bytes::from_static(b"test")).await?; framed.send(Bytes::from_static(b"test")).await?;
Ok::<_, io::Error>(()) Ok::<_, io::Error>(())
}) }));
});
let factory = actix_connect::default_connector_factory(); let factory = actix_connect::default_connector_factory();
@ -98,7 +94,7 @@ async fn test_new_service() {
async fn test_openssl_uri() { async fn test_openssl_uri() {
use std::convert::TryFrom; use std::convert::TryFrom;
let srv = TestServer::with(|| { let srv = TestServer::with({
fn_service(|io: TcpStream| async { fn_service(|io: TcpStream| async {
let mut framed = Framed::new(io, BytesCodec); let mut framed = Framed::new(io, BytesCodec);
framed.send(Bytes::from_static(b"test")).await?; framed.send(Bytes::from_static(b"test")).await?;
@ -117,7 +113,7 @@ async fn test_openssl_uri() {
async fn test_rustls_uri() { async fn test_rustls_uri() {
use std::convert::TryFrom; use std::convert::TryFrom;
let srv = TestServer::with(|| { let srv = TestServer::with({
fn_service(|io: TcpStream| async { fn_service(|io: TcpStream| async {
let mut framed = Framed::new(io, BytesCodec); let mut framed = Framed::new(io, BytesCodec);
framed.send(Bytes::from_static(b"test")).await?; framed.send(Bytes::from_static(b"test")).await?;
@ -133,7 +129,7 @@ async fn test_rustls_uri() {
#[actix_rt::test] #[actix_rt::test]
async fn test_local_addr() { async fn test_local_addr() {
let srv = TestServer::with(|| { let srv = TestServer::with({
fn_service(|io: TcpStream| async { fn_service(|io: TcpStream| async {
let mut framed = Framed::new(io, BytesCodec); let mut framed = Framed::new(io, BytesCodec);
framed.send(Bytes::from_static(b"test")).await?; framed.send(Bytes::from_static(b"test")).await?;

View File

@ -38,8 +38,9 @@ async fn custom_resolver() {
async fn custom_resolver_connect() { async fn custom_resolver_connect() {
use trust_dns_resolver::TokioAsyncResolver; use trust_dns_resolver::TokioAsyncResolver;
let srv = let srv = TestServer::with(fn_service(|_io: TcpStream| async {
TestServer::with(|| fn_service(|_io: TcpStream| async { Ok::<_, io::Error>(()) })); Ok::<_, io::Error>(())
}));
struct MyResolver { struct MyResolver {
trust_dns: TokioAsyncResolver, trust_dns: TokioAsyncResolver,