update actix-server

This commit is contained in:
Nikolay Kim 2019-11-12 10:53:28 +06:00
parent 46382a922a
commit 8e882f0c37
10 changed files with 222 additions and 236 deletions

View File

@ -45,12 +45,13 @@ num_cpus = "1.0"
mio = "0.6.19" mio = "0.6.19"
net2 = "0.2" net2 = "0.2"
futures = { package = "futures-preview", version = "0.3.0-alpha.18" } futures = "0.3.1"
slab = "0.4" slab = "0.4"
tokio = "0.2.0-alpha.4"
tokio-io = "0.2.0-alpha.4" tokio = "0.2.0-alpha.6"
tokio-net = { version = "0.2.0-alpha.4", features = ["signal"] } tokio-io = "0.2.0-alpha.6"
tokio-timer = "0.3.0-alpha.4" tokio-net = { version = "0.2.0-alpha.6", features = ["signal"] }
tokio-timer = "0.3.0-alpha.6"
# unix domain sockets # unix domain sockets
mio-uds = { version="0.6.7", optional = true } mio-uds = { version="0.6.7", optional = true }
@ -61,7 +62,7 @@ native-tls = { version="0.2", optional = true }
# openssl # openssl
openssl = { version="0.10", optional = true } openssl = { version="0.10", optional = true }
tokio-openssl = { version="0.4.0-alpha.4", optional = true } tokio-openssl = { version="0.4.0-alpha.6", optional = true }
# rustls # rustls
rustls = { version = "0.16.0", optional = true } rustls = { version = "0.16.0", optional = true }

View File

@ -3,7 +3,7 @@ use std::time::{Duration, Instant};
use std::{io, thread}; use std::{io, thread};
use actix_rt::System; use actix_rt::System;
use futures::FutureExt;
use log::{error, info}; use log::{error, info};
use slab::Slab; use slab::Slab;
use tokio_timer::delay; use tokio_timer::delay;
@ -12,7 +12,6 @@ use crate::server::Server;
use crate::socket::{SocketAddr, SocketListener, StdListener}; use crate::socket::{SocketAddr, SocketListener, StdListener};
use crate::worker::{Conn, WorkerClient}; use crate::worker::{Conn, WorkerClient};
use crate::Token; use crate::Token;
use futures::{Future, FutureExt};
pub(crate) enum Command { pub(crate) enum Command {
Pause, Pause,
@ -371,7 +370,7 @@ impl Accept {
match self.workers[self.next].send(msg) { match self.workers[self.next].send(msg) {
Ok(_) => (), Ok(_) => (),
Err(tmp) => { Err(tmp) => {
self.srv.worker_died(self.workers[self.next].idx); self.srv.worker_faulted(self.workers[self.next].idx);
msg = tmp; msg = tmp;
self.workers.swap_remove(self.next); self.workers.swap_remove(self.next);
if self.workers.is_empty() { if self.workers.is_empty() {
@ -397,7 +396,7 @@ impl Accept {
return; return;
} }
Err(tmp) => { Err(tmp) => {
self.srv.worker_died(self.workers[self.next].idx); self.srv.worker_faulted(self.workers[self.next].idx);
msg = tmp; msg = tmp;
self.workers.swap_remove(self.next); self.workers.swap_remove(self.next);
if self.workers.is_empty() { if self.workers.is_empty() {
@ -444,7 +443,7 @@ impl Accept {
System::current().arbiter().send( System::current().arbiter().send(
async move { async move {
delay(Instant::now() + Duration::from_millis(510)).await; delay(Instant::now() + Duration::from_millis(510)).await;
r.set_readiness(mio::Ready::readable()); let _ = r.set_readiness(mio::Ready::readable());
} }
.boxed(), .boxed(),
); );

View File

@ -1,27 +1,27 @@
use std::time::Duration; use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
use std::{io, mem, net}; use std::{io, mem, net};
use actix_rt::{spawn, Arbiter, System}; use actix_rt::{spawn, Arbiter, System};
use futures::channel::mpsc::{unbounded, UnboundedReceiver}; use futures::channel::mpsc::{unbounded, UnboundedReceiver};
use futures::future::ready; use futures::future::ready;
use futures::stream::FuturesUnordered; use futures::stream::FuturesUnordered;
use futures::{ready, Future, FutureExt, Poll, Stream, StreamExt}; use futures::{ready, Future, FutureExt, Stream, StreamExt};
use log::{error, info}; use log::{error, info};
use net2::TcpBuilder; use net2::TcpBuilder;
use num_cpus; use num_cpus;
use tokio_net::tcp::TcpStream; use tokio_net::tcp::TcpStream;
use tokio_timer::sleep; use tokio_timer::delay;
use crate::accept::{AcceptLoop, AcceptNotify, Command}; use crate::accept::{AcceptLoop, AcceptNotify, Command};
use crate::config::{ConfiguredService, ServiceConfig}; use crate::config::{ConfiguredService, ServiceConfig};
use crate::server::{Server, ServerCommand}; use crate::server::{Server, ServerCommand};
use crate::services::{InternalServiceFactory, ServiceFactory, StreamNewService}; use crate::services::{InternalServiceFactory, ServiceFactory, StreamNewService};
use crate::signals::{Signal, Signals}; // use crate::signals::{Signal, Signals};
use crate::socket::StdListener; use crate::socket::StdListener;
use crate::worker::{self, Worker, WorkerAvailability, WorkerClient}; use crate::worker::{self, Worker, WorkerAvailability, WorkerClient};
use crate::{ssl, Token}; use crate::{ssl, Token};
use std::pin::Pin;
use std::task::Context;
/// Server builder /// Server builder
pub struct ServerBuilder { pub struct ServerBuilder {
@ -303,7 +303,7 @@ impl ServerBuilder {
// handle signals // handle signals
if !self.no_signals { if !self.no_signals {
Signals::start(self.server.clone()); // Signals::start(self.server.clone());
} }
// start http server actor // start http server actor
@ -342,37 +342,37 @@ impl ServerBuilder {
self.accept.send(Command::Resume); self.accept.send(Command::Resume);
let _ = tx.send(()); let _ = tx.send(());
} }
ServerCommand::Signal(sig) => { // ServerCommand::Signal(sig) => {
// Signals support // Signals support
// Handle `SIGINT`, `SIGTERM`, `SIGQUIT` signals and stop actix system // Handle `SIGINT`, `SIGTERM`, `SIGQUIT` signals and stop actix system
match sig { // match sig {
Signal::Int => { // Signal::Int => {
info!("SIGINT received, exiting"); // info!("SIGINT received, exiting");
self.exit = true; // self.exit = true;
self.handle_cmd(ServerCommand::Stop { // self.handle_cmd(ServerCommand::Stop {
graceful: false, // graceful: false,
completion: None, // completion: None,
}) // })
} // }
Signal::Term => { // Signal::Term => {
info!("SIGTERM received, stopping"); // info!("SIGTERM received, stopping");
self.exit = true; // self.exit = true;
self.handle_cmd(ServerCommand::Stop { // self.handle_cmd(ServerCommand::Stop {
graceful: true, // graceful: true,
completion: None, // completion: None,
}) // })
} // }
Signal::Quit => { // Signal::Quit => {
info!("SIGQUIT received, exiting"); // info!("SIGQUIT received, exiting");
self.exit = true; // self.exit = true;
self.handle_cmd(ServerCommand::Stop { // self.handle_cmd(ServerCommand::Stop {
graceful: false, // graceful: false,
completion: None, // completion: None,
}) // })
} // }
_ => (), // _ => (),
} // }
} // }
ServerCommand::Stop { ServerCommand::Stop {
graceful, graceful,
completion, completion,
@ -397,7 +397,7 @@ impl ServerBuilder {
if exit { if exit {
spawn( spawn(
async { async {
tokio_timer::sleep(Duration::from_millis(300)) delay(Instant::now() + Duration::from_millis(300))
.await; .await;
System::current().stop(); System::current().stop();
} }
@ -410,17 +410,19 @@ impl ServerBuilder {
} else { } else {
// we need to stop system if server was spawned // we need to stop system if server was spawned
if self.exit { if self.exit {
spawn(sleep(Duration::from_millis(300)).then(|_| { spawn(
delay(Instant::now() + Duration::from_millis(300)).then(|_| {
System::current().stop(); System::current().stop();
ready(()) ready(())
})); }),
);
} }
if let Some(tx) = completion { if let Some(tx) = completion {
let _ = tx.send(()); let _ = tx.send(());
} }
} }
} }
ServerCommand::WorkerDied(idx) => { ServerCommand::WorkerFaulted(idx) => {
let mut found = false; let mut found = false;
for i in 0..self.workers.len() { for i in 0..self.workers.len() {
if self.workers[i].0 == idx { if self.workers[i].0 == idx {

View File

@ -1,12 +1,10 @@
use std::collections::HashMap; use std::collections::HashMap;
use std::{fmt, io, net}; use std::{fmt, io, net};
use crate::counter::CounterGuard;
use actix_server_config::{Io, ServerConfig}; use actix_server_config::{Io, ServerConfig};
use actix_service::{IntoNewService, NewService, ServiceExt}; use actix_service::{Factory, IntoFactory};
use futures::future::{join_all, Future, FutureExt, LocalBoxFuture, TryFutureExt}; use futures::future::{Future, FutureExt, LocalBoxFuture};
use log::error; use log::error;
use std::pin::Pin;
use tokio_net::tcp::TcpStream; use tokio_net::tcp::TcpStream;
use super::builder::bind_addr; use super::builder::bind_addr;
@ -14,7 +12,7 @@ use super::services::{
BoxedServerService, InternalServiceFactory, ServerMessage, StreamService, BoxedServerService, InternalServiceFactory, ServerMessage, StreamService,
}; };
use super::Token; use super::Token;
use std::process::Output; use crate::counter::CounterGuard;
pub struct ServiceConfig { pub struct ServiceConfig {
pub(crate) services: Vec<(String, net::TcpListener)>, pub(crate) services: Vec<(String, net::TcpListener)>,
@ -115,7 +113,7 @@ impl InternalServiceFactory for ConfiguredService {
self.rt.configure(&mut rt); self.rt.configure(&mut rt);
rt.validate(); rt.validate();
let mut names = self.names.clone(); let names = self.names.clone();
// construct services // construct services
async move { async move {
@ -197,23 +195,20 @@ impl ServiceRuntime {
/// *ServiceConfig::bind()* or *ServiceConfig::listen()* methods. /// *ServiceConfig::bind()* or *ServiceConfig::listen()* methods.
pub fn service<T, F>(&mut self, name: &str, service: F) pub fn service<T, F>(&mut self, name: &str, service: F)
where where
F: IntoNewService<T>, F: IntoFactory<T>,
T: NewService<Config = ServerConfig, Request = Io<TcpStream>> + 'static, T: Factory<Config = ServerConfig, Request = Io<TcpStream>> + 'static,
T::Future: 'static, T::Future: 'static,
T::Service: 'static, T::Service: 'static,
T::InitError: fmt::Debug, T::InitError: fmt::Debug,
{ {
// let name = name.to_owned(); // let name = name.to_owned();
if let Some(token) = self.names.get(name) { if let Some(token) = self.names.get(name) {
self.services.insert( self.services.insert(
token.clone(), token.clone(),
Box::new(ServiceFactory { Box::new(ServiceFactory {
inner: service.into_new_service(), inner: service.into_factory(),
}), }),
); );
} else { } else {
panic!("Unknown service: {:?}", name); panic!("Unknown service: {:?}", name);
} }
@ -229,7 +224,7 @@ impl ServiceRuntime {
} }
type BoxedNewService = Box< type BoxedNewService = Box<
dyn NewService< dyn Factory<
Request = (Option<CounterGuard>, ServerMessage), Request = (Option<CounterGuard>, ServerMessage),
Response = (), Response = (),
Error = (), Error = (),
@ -244,9 +239,9 @@ struct ServiceFactory<T> {
inner: T, inner: T,
} }
impl<T> NewService for ServiceFactory<T> impl<T> Factory for ServiceFactory<T>
where where
T: NewService<Config = ServerConfig, Request = Io<TcpStream>>, T: Factory<Config = ServerConfig, Request = Io<TcpStream>>,
T::Future: 'static, T::Future: 'static,
T::Service: 'static, T::Service: 'static,
T::Error: 'static, T::Error: 'static,
@ -260,14 +255,15 @@ where
type Service = BoxedServerService; type Service = BoxedServerService;
type Future = LocalBoxFuture<'static, Result<BoxedServerService, ()>>; type Future = LocalBoxFuture<'static, Result<BoxedServerService, ()>>;
// Box<dyn Future<Output=Result<Vec<(Token, BoxedServerService)>, ()>>>;
fn new_service(&self, cfg: &ServerConfig) -> Self::Future { fn new_service(&self, cfg: &ServerConfig) -> Self::Future {
let fut = self.inner.new_service(cfg); let fut = self.inner.new_service(cfg);
async move { async move {
return match fut.await { return match fut.await {
Ok(s) => Ok(Box::new(StreamService::new(s)) as BoxedServerService), Ok(s) => Ok(Box::new(StreamService::new(s)) as BoxedServerService),
Err(e) => Err(()), Err(e) => {
error!("Can not construct service: {:?}", e);
Err(())
}
}; };
} }
.boxed_local() .boxed_local()

View File

@ -6,7 +6,7 @@ mod config;
mod counter; mod counter;
mod server; mod server;
mod services; mod services;
mod signals; // mod signals;
mod socket; mod socket;
pub mod ssl; pub mod ssl;
mod worker; mod worker;

View File

@ -3,14 +3,14 @@ use futures::channel::oneshot;
use futures::{Future, TryFutureExt}; use futures::{Future, TryFutureExt};
use crate::builder::ServerBuilder; use crate::builder::ServerBuilder;
use crate::signals::Signal; // use crate::signals::Signal;
#[derive(Debug)] #[derive(Debug)]
pub(crate) enum ServerCommand { pub(crate) enum ServerCommand {
WorkerDied(usize), WorkerFaulted(usize),
Pause(oneshot::Sender<()>), Pause(oneshot::Sender<()>),
Resume(oneshot::Sender<()>), Resume(oneshot::Sender<()>),
Signal(Signal), // Signal(Signal),
/// Whether to try and shut down gracefully /// Whether to try and shut down gracefully
Stop { Stop {
graceful: bool, graceful: bool,
@ -31,12 +31,12 @@ impl Server {
ServerBuilder::default() ServerBuilder::default()
} }
pub(crate) fn signal(&self, sig: Signal) { // pub(crate) fn signal(&self, sig: Signal) {
let _ = self.0.unbounded_send(ServerCommand::Signal(sig)); // let _ = self.0.unbounded_send(ServerCommand::Signal(sig));
} // }
pub(crate) fn worker_died(&self, idx: usize) { pub(crate) fn worker_faulted(&self, idx: usize) {
let _ = self.0.unbounded_send(ServerCommand::WorkerDied(idx)); let _ = self.0.unbounded_send(ServerCommand::WorkerFaulted(idx));
} }
/// Pause accepting incoming connections /// Pause accepting incoming connections

View File

@ -1,19 +1,18 @@
use std::marker::PhantomData; use std::marker::PhantomData;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::task::{Context, Poll};
use std::time::Duration; use std::time::Duration;
use actix_rt::spawn; use actix_rt::spawn;
use actix_server_config::{Io, ServerConfig}; use actix_server_config::{Io, ServerConfig};
use actix_service::{NewService, Service, ServiceExt}; use actix_service::{Factory, Service};
use futures::future::{err, ok, LocalBoxFuture, Ready}; use futures::future::{err, ok, LocalBoxFuture, Ready};
use futures::{Future, FutureExt, Poll, StreamExt, TryFutureExt}; use futures::{FutureExt, TryFutureExt};
use log::error; use log::error;
use super::Token; use super::Token;
use crate::counter::CounterGuard; use crate::counter::CounterGuard;
use crate::socket::{FromStream, StdStream}; use crate::socket::{FromStream, StdStream};
use std::pin::Pin;
use std::task::Context;
/// Server message /// Server message
pub(crate) enum ServerMessage { pub(crate) enum ServerMessage {
@ -26,7 +25,7 @@ pub(crate) enum ServerMessage {
} }
pub trait ServiceFactory<Stream: FromStream>: Send + Clone + 'static { pub trait ServiceFactory<Stream: FromStream>: Send + Clone + 'static {
type NewService: NewService<Config = ServerConfig, Request = Io<Stream>>; type NewService: Factory<Config = ServerConfig, Request = Io<Stream>>;
fn create(&self) -> Self::NewService; fn create(&self) -> Self::NewService;
} }
@ -70,18 +69,10 @@ where
type Error = (); type Error = ();
type Future = Ready<Result<(), ()>>; type Future = Ready<Result<(), ()>>;
fn poll_ready( fn poll_ready(&mut self, ctx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self: Pin<&mut Self>, self.service.poll_ready(ctx).map_err(|_| ())
ctx: &mut Context<'_>,
) -> Poll<Result<(), Self::Error>> {
unimplemented!()
} }
/*
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
self.service.poll_ready().map_err(|_| ())
}
*/
fn call(&mut self, (guard, req): (Option<CounterGuard>, ServerMessage)) -> Self::Future { fn call(&mut self, (guard, req): (Option<CounterGuard>, ServerMessage)) -> Self::Future {
match req { match req {
ServerMessage::Connect(stream) => { ServerMessage::Connect(stream) => {
@ -93,7 +84,7 @@ where
let f = self.service.call(Io::new(stream)); let f = self.service.call(Io::new(stream));
spawn( spawn(
async move { async move {
f.await; let _ = f.await;
drop(guard); drop(guard);
} }
.boxed_local(), .boxed_local(),
@ -189,7 +180,7 @@ impl InternalServiceFactory for Box<dyn InternalServiceFactory> {
impl<F, T, I> ServiceFactory<I> for F impl<F, T, I> ServiceFactory<I> for F
where where
F: Fn() -> T + Send + Clone + 'static, F: Fn() -> T + Send + Clone + 'static,
T: NewService<Config = ServerConfig, Request = Io<I>>, T: Factory<Config = ServerConfig, Request = Io<I>>,
I: FromStream, I: FromStream,
{ {
type NewService = T; type NewService = T;

View File

@ -1,17 +1,15 @@
use std::future::Future;
use std::io; use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};
use actix_rt::spawn; use actix_rt::spawn;
use futures::future::LocalBoxFuture;
use futures::stream::{futures_unordered, FuturesUnordered, LocalBoxStream}; use futures::stream::{futures_unordered, FuturesUnordered, LocalBoxStream};
use futures::{ use futures::{FutureExt, Stream, StreamExt, TryFutureExt, TryStream, TryStreamExt};
Future, FutureExt, Poll, Stream, StreamExt, TryFutureExt, TryStream, TryStreamExt, use tokio_net::signal::unix::signal;
};
use crate::server::Server; use crate::server::Server;
use actix_service::ServiceExt;
use futures::future::LocalBoxFuture;
use std::pin::Pin;
use std::task::Context;
use tokio_net::signal::unix::signal;
/// Different types of process signals /// Different types of process signals
#[derive(PartialEq, Clone, Copy, Debug)] #[derive(PartialEq, Clone, Copy, Debug)]

View File

@ -1,23 +1,22 @@
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc; use std::sync::Arc;
use std::{mem, task, time}; use std::task::{Context, Poll};
use std::{mem, time};
use actix_rt::{spawn, Arbiter};
use futures::channel::mpsc::{UnboundedReceiver, UnboundedSender}; use futures::channel::mpsc::{UnboundedReceiver, UnboundedSender};
use futures::channel::oneshot; use futures::channel::oneshot;
use futures::{future, Future, Poll, Stream, TryFutureExt}; use futures::future::{join_all, LocalBoxFuture, MapOk};
use futures::{FutureExt, StreamExt}; use futures::{Future, FutureExt, Stream, TryFutureExt};
use log::{error, info, trace}; use log::{error, info, trace};
use tokio_timer::{sleep, Delay}; use tokio_timer::{delay, Delay};
use crate::accept::AcceptNotify; use crate::accept::AcceptNotify;
use crate::counter::Counter; use crate::counter::Counter;
use crate::services::{BoxedServerService, InternalServiceFactory, ServerMessage}; use crate::services::{BoxedServerService, InternalServiceFactory, ServerMessage};
use crate::socket::{SocketAddr, StdStream}; use crate::socket::{SocketAddr, StdStream};
use crate::Token; use crate::Token;
use actix_rt::spawn;
use futures::future::{LocalBoxFuture, MapOk};
use std::pin::Pin;
use std::task::Context;
pub(crate) struct WorkerCommand(Conn); pub(crate) struct WorkerCommand(Conn);
@ -167,8 +166,8 @@ impl Worker {
} }
spawn( spawn(
async { async move {
let mut res = future::join_all(fut).await; let res = join_all(fut).await;
let res: Result<Vec<_>, _> = res.into_iter().collect(); let res: Result<Vec<_>, _> = res.into_iter().collect();
match res { match res {
Ok(services) => { Ok(services) => {
@ -177,12 +176,13 @@ impl Worker {
while token.0 >= wrk.services.len() { while token.0 >= wrk.services.len() {
wrk.services.push(None); wrk.services.push(None);
} }
wrk.services[token.0] = Some((idx, service));
} }
} }
Ok::<_, ()>(wrk);
} }
Err(e) => { Err(e) => {
//return Err(e); error!("Can not start worker: {:?}", e);
Arbiter::current().stop();
} }
} }
} }
@ -212,8 +212,7 @@ impl Worker {
trace: bool, trace: bool,
cx: &mut Context<'_>, cx: &mut Context<'_>,
) -> Result<bool, (Token, usize)> { ) -> Result<bool, (Token, usize)> {
/* let mut ready = self.conns.available(cx);
let mut ready = self.conns.available();
let mut failed = None; let mut failed = None;
for (token, service) in &mut self.services.iter_mut().enumerate() { for (token, service) in &mut self.services.iter_mut().enumerate() {
if let Some(service) = service { if let Some(service) = service {
@ -226,7 +225,7 @@ impl Worker {
); );
} }
} }
Poll::NotReady => ready = false, Poll::Pending => ready = false,
Poll::Ready(Err(_)) => { Poll::Ready(Err(_)) => {
error!( error!(
"Service {:?} readiness check returned error, restarting", "Service {:?} readiness check returned error, restarting",
@ -241,8 +240,7 @@ impl Worker {
Err(idx) Err(idx)
} else { } else {
Ok(ready) Ok(ready)
}*/ }
unimplemented!()
} }
} }
@ -253,7 +251,7 @@ enum WorkerState {
Restarting( Restarting(
usize, usize,
Token, Token,
Box<dyn Future<Output = Result<Vec<(Token, BoxedServerService)>, ()>>>, Pin<Box<dyn Future<Output = Result<Vec<(Token, BoxedServerService)>, ()>>>>,
), ),
Shutdown(Delay, Delay, oneshot::Sender<bool>), Shutdown(Delay, Delay, oneshot::Sender<bool>),
} }
@ -261,39 +259,36 @@ enum WorkerState {
impl Future for Worker { impl Future for Worker {
type Output = (); type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
unimplemented!()
}
/*
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
// `StopWorker` message handler // `StopWorker` message handler
if let Ok(Async::Ready(Some(StopCommand { graceful, result }))) = self.rx2.poll() { if let Poll::Ready(Some(StopCommand { graceful, result })) =
Pin::new(&mut self.rx2).poll_next(cx)
{
self.availability.set(false); self.availability.set(false);
let num = num_connections(); let num = num_connections();
if num == 0 { if num == 0 {
info!("Shutting down worker, 0 connections"); info!("Shutting down worker, 0 connections");
let _ = result.send(true); let _ = result.send(true);
return Ok(Async::Ready(())); return Poll::Ready(());
} else if graceful { } else if graceful {
self.shutdown(false); self.shutdown(false);
let num = num_connections(); let num = num_connections();
if num != 0 { if num != 0 {
info!("Graceful worker shutdown, {} connections", num); info!("Graceful worker shutdown, {} connections", num);
self.state = WorkerState::Shutdown( self.state = WorkerState::Shutdown(
sleep(time::Duration::from_secs(1)), delay(time::Instant::now() + time::Duration::from_secs(1)),
sleep(self.shutdown_timeout), delay(time::Instant::now() + self.shutdown_timeout),
result, result,
); );
} else { } else {
let _ = result.send(true); let _ = result.send(true);
return Ok(Async::Ready(())); return Poll::Ready(());
} }
} else { } else {
info!("Force shutdown worker, {} connections", num); info!("Force shutdown worker, {} connections", num);
self.shutdown(true); self.shutdown(true);
let _ = result.send(false); let _ = result.send(false);
return Ok(Async::Ready(())); return Poll::Ready(());
} }
} }
@ -301,13 +296,13 @@ impl Future for Worker {
match state { match state {
WorkerState::Unavailable(mut conns) => { WorkerState::Unavailable(mut conns) => {
match self.check_readiness(true) { match self.check_readiness(true, cx) {
Ok(true) => { Ok(true) => {
self.state = WorkerState::Available; self.state = WorkerState::Available;
// process requests from wait queue // process requests from wait queue
while let Some(msg) = conns.pop() { while let Some(msg) = conns.pop() {
match self.check_readiness(false) { match self.check_readiness(false, cx) {
Ok(true) => { Ok(true) => {
let guard = self.conns.get(); let guard = self.conns.get();
let _ = self.services[msg.token.0] let _ = self.services[msg.token.0]
@ -319,7 +314,7 @@ impl Future for Worker {
Ok(false) => { Ok(false) => {
trace!("Worker is unavailable"); trace!("Worker is unavailable");
self.state = WorkerState::Unavailable(conns); self.state = WorkerState::Unavailable(conns);
return self.poll(); return self.poll(cx);
} }
Err((token, idx)) => { Err((token, idx)) => {
trace!( trace!(
@ -331,16 +326,16 @@ impl Future for Worker {
token, token,
self.factories[idx].create(), self.factories[idx].create(),
); );
return self.poll(); return self.poll(cx);
} }
} }
} }
self.availability.set(true); self.availability.set(true);
return self.poll(); return self.poll(cx);
} }
Ok(false) => { Ok(false) => {
self.state = WorkerState::Unavailable(conns); self.state = WorkerState::Unavailable(conns);
return Ok(Async::NotReady); return Poll::Pending;
} }
Err((token, idx)) => { Err((token, idx)) => {
trace!( trace!(
@ -349,13 +344,13 @@ impl Future for Worker {
); );
self.state = self.state =
WorkerState::Restarting(idx, token, self.factories[idx].create()); WorkerState::Restarting(idx, token, self.factories[idx].create());
return self.poll(); return self.poll(cx);
} }
} }
} }
WorkerState::Restarting(idx, token, mut fut) => { WorkerState::Restarting(idx, token, mut fut) => {
match fut.poll() { match Pin::new(&mut fut).poll(cx) {
Ok(Async::Ready(item)) => { Poll::Ready(Ok(item)) => {
for (token, service) in item { for (token, service) in item {
trace!( trace!(
"Service {:?} has been restarted", "Service {:?} has been restarted",
@ -365,55 +360,55 @@ impl Future for Worker {
self.state = WorkerState::Unavailable(Vec::new()); self.state = WorkerState::Unavailable(Vec::new());
} }
} }
Ok(Async::NotReady) => { Poll::Ready(Err(_)) => {
self.state = WorkerState::Restarting(idx, token, fut);
return Ok(Async::NotReady);
}
Err(_) => {
panic!( panic!(
"Can not restart {:?} service", "Can not restart {:?} service",
self.factories[idx].name(token) self.factories[idx].name(token)
); );
} }
Poll::Pending => {
self.state = WorkerState::Restarting(idx, token, fut);
return Poll::Pending;
} }
return self.poll(); }
return self.poll(cx);
} }
WorkerState::Shutdown(mut t1, mut t2, tx) => { WorkerState::Shutdown(mut t1, mut t2, tx) => {
let num = num_connections(); let num = num_connections();
if num == 0 { if num == 0 {
let _ = tx.send(true); let _ = tx.send(true);
Arbiter::current().stop(); Arbiter::current().stop();
return Ok(Async::Ready(())); return Poll::Ready(());
} }
// check graceful timeout // check graceful timeout
match t2.poll().unwrap() { match Pin::new(&mut t2).poll(cx) {
Async::NotReady => (), Poll::Pending => (),
Async::Ready(_) => { Poll::Ready(_) => {
self.shutdown(true); self.shutdown(true);
let _ = tx.send(false); let _ = tx.send(false);
Arbiter::current().stop(); Arbiter::current().stop();
return Ok(Async::Ready(())); return Poll::Ready(());
} }
} }
// sleep for 1 second and then check again // sleep for 1 second and then check again
match t1.poll().unwrap() { match Pin::new(&mut t1).poll(cx) {
Async::NotReady => (), Poll::Pending => (),
Async::Ready(_) => { Poll::Ready(_) => {
t1 = sleep(time::Duration::from_secs(1)); t1 = delay(time::Instant::now() + time::Duration::from_secs(1));
let _ = t1.poll(); let _ = Pin::new(&mut t1).poll(cx);
} }
} }
self.state = WorkerState::Shutdown(t1, t2, tx); self.state = WorkerState::Shutdown(t1, t2, tx);
return Ok(Async::NotReady); return Poll::Pending;
} }
WorkerState::Available => { WorkerState::Available => {
loop { loop {
match self.rx.poll() { match Pin::new(&mut self.rx).poll_next(cx) {
// handle incoming tcp stream // handle incoming tcp stream
Ok(Async::Ready(Some(WorkerCommand(msg)))) => { Poll::Ready(Some(WorkerCommand(msg))) => {
match self.check_readiness(false) { match self.check_readiness(false, cx) {
Ok(true) => { Ok(true) => {
let guard = self.conns.get(); let guard = self.conns.get();
let _ = self.services[msg.token.0] let _ = self.services[msg.token.0]
@ -441,18 +436,17 @@ impl Future for Worker {
); );
} }
} }
return self.poll(); return self.poll(cx);
} }
Ok(Async::NotReady) => { Poll::Pending => {
self.state = WorkerState::Available; self.state = WorkerState::Available;
return Ok(Async::NotReady); return Poll::Pending;
} }
Ok(Async::Ready(None)) | Err(_) => return Ok(Async::Ready(())), Poll::Ready(None) => return Poll::Ready(()),
} }
} }
} }
WorkerState::None => panic!(), WorkerState::None => panic!(),
}; };
} }
*/
} }

View File

@ -1,10 +1,10 @@
use std::io::Read; use std::io::{self, Read};
use std::sync::mpsc; use std::sync::mpsc;
use std::{net, thread, time}; use std::{net, thread, time};
use actix_codec::{BytesCodec, Framed}; use actix_codec::{BytesCodec, Framed};
use actix_server::{Io, Server, ServerConfig}; use actix_server::{Io, Server, ServerConfig};
use actix_service::{new_service_cfg, service_fn, IntoService, ServiceExt}; use actix_service::{into_service, service_fn, service_fn_config, IntoService};
use bytes::Bytes; use bytes::Bytes;
use futures::{Future, FutureExt, Sink, SinkExt}; use futures::{Future, FutureExt, Sink, SinkExt};
use net2::TcpBuilder; use net2::TcpBuilder;
@ -29,7 +29,7 @@ fn test_bind() {
let sys = actix_rt::System::new("test"); let sys = actix_rt::System::new("test");
let srv = Server::build() let srv = Server::build()
.bind("test", addr, move || { .bind("test", addr, move || {
new_service_cfg(move |cfg: &ServerConfig| { service_fn_config(move |cfg: &ServerConfig| {
assert_eq!(cfg.local_addr(), addr); assert_eq!(cfg.local_addr(), addr);
ok::<_, ()>((|_| ok::<_, ()>(())).into_service()) ok::<_, ()>((|_| ok::<_, ()>(())).into_service())
}) })
@ -77,7 +77,7 @@ fn test_listen() {
let lst = net::TcpListener::bind(addr).unwrap(); let lst = net::TcpListener::bind(addr).unwrap();
let srv = Server::build() let srv = Server::build()
.listen("test", lst, move || { .listen("test", lst, move || {
new_service_cfg(move |cfg: &ServerConfig| { service_fn_config(move |cfg: &ServerConfig| {
assert_eq!(cfg.local_addr(), addr); assert_eq!(cfg.local_addr(), addr);
ok::<_, ()>((|_| ok::<_, ()>(())).into_service()) ok::<_, ()>((|_| ok::<_, ()>(())).into_service())
}) })
@ -95,70 +95,75 @@ fn test_listen() {
let _ = h.join(); let _ = h.join();
} }
#[test] // #[test]
#[cfg(unix)] // #[cfg(unix)]
fn test_start() { // fn test_start() {
let addr = unused_addr(); // let addr = unused_addr();
let (tx, rx) = mpsc::channel(); // let (tx, rx) = mpsc::channel();
let h = thread::spawn(move || { // let h = thread::spawn(move || {
let sys = actix_rt::System::new("test"); // let sys = actix_rt::System::new("test");
let srv = Server::build() // let srv: Server = Server::build()
.backlog(100) // .backlog(100)
.bind("test", addr, move || { // .bind("test", addr, move || {
new_service_cfg(move |cfg: &ServerConfig| { // service_fn_config(move |cfg: &ServerConfig| {
assert_eq!(cfg.local_addr(), addr); // assert_eq!(cfg.local_addr(), addr);
let serv_creator = (move |io: Io<TcpStream>| async { // let srv = into_service(
panic!("Stream"); // (|io: Io<TcpStream>| {
let mut f = Framed::new(io.into_parts().0, BytesCodec); // let t = async {
f.send(Bytes::from_static(b"test")).await.unwrap(); // let mut f = Framed::new(io.into_parts().0, BytesCodec);
Ok::<_, ()>(()) // f.send(Bytes::from_static(b"test")).await.unwrap();
}).into_service(); // Ok::<_, ()>(())
// };
// //ok::<_, ()>(())
// t
// }),
// );
ok::<_, ()>(serv_creator) // ok::<_, ()>(srv)
}) // })
}) // })
.unwrap() // .unwrap()
.start(); // .start();
let _ = tx.send((srv, actix_rt::System::current())); // let _ = tx.send((srv, actix_rt::System::current()));
let _ = sys.run(); // let _ = sys.run();
}); // });
let (srv, sys) = rx.recv().unwrap(); // let (srv, sys) = rx.recv().unwrap();
let mut buf = [1u8; 4]; // let mut buf = [1u8; 4];
let mut conn = net::TcpStream::connect(addr).unwrap(); // let mut conn = net::TcpStream::connect(addr).unwrap();
let _ = conn.read_exact(&mut buf); // let _ = conn.read_exact(&mut buf);
assert_eq!(buf, b"test"[..]); // assert_eq!(buf, b"test"[..]);
// pause // // pause
let _ = srv.pause(); // let _ = srv.pause();
thread::sleep(time::Duration::from_millis(200)); // thread::sleep(time::Duration::from_millis(200));
let mut conn = net::TcpStream::connect(addr).unwrap(); // let mut conn = net::TcpStream::connect(addr).unwrap();
conn.set_read_timeout(Some(time::Duration::from_millis(100))) // conn.set_read_timeout(Some(time::Duration::from_millis(100)))
.unwrap(); // .unwrap();
let res = conn.read_exact(&mut buf); // let res = conn.read_exact(&mut buf);
assert!(res.is_err()); // assert!(res.is_err());
// resume // // resume
let _ = srv.resume(); // let _ = srv.resume();
thread::sleep(time::Duration::from_millis(100)); // thread::sleep(time::Duration::from_millis(100));
assert!(net::TcpStream::connect(addr).is_ok()); // assert!(net::TcpStream::connect(addr).is_ok());
assert!(net::TcpStream::connect(addr).is_ok()); // assert!(net::TcpStream::connect(addr).is_ok());
assert!(net::TcpStream::connect(addr).is_ok()); // assert!(net::TcpStream::connect(addr).is_ok());
let mut buf = [0u8; 4]; // let mut buf = [0u8; 4];
let mut conn = net::TcpStream::connect(addr).unwrap(); // let mut conn = net::TcpStream::connect(addr).unwrap();
let _ = conn.read_exact(&mut buf); // let _ = conn.read_exact(&mut buf);
assert_eq!(buf, b"test"[..]); // assert_eq!(buf, b"test"[..]);
// stop // // stop
let _ = srv.stop(false); // let _ = srv.stop(false);
thread::sleep(time::Duration::from_millis(100)); // thread::sleep(time::Duration::from_millis(100));
assert!(net::TcpStream::connect(addr).is_err()); // assert!(net::TcpStream::connect(addr).is_err());
thread::sleep(time::Duration::from_millis(100)); // thread::sleep(time::Duration::from_millis(100));
let _ = sys.stop(); // let _ = sys.stop();
let _ = h.join(); // let _ = h.join();
} // }