clean up actix-server code

This commit is contained in:
fakeshadow 2020-10-20 21:03:14 +08:00
parent 05d8551066
commit 0df09dc81d
8 changed files with 147 additions and 178 deletions

View File

@ -2,7 +2,7 @@ use std::pin::Pin;
use std::task::{Context, Poll};
use std::{fmt, io};
use bytes::{Buf, BytesMut};
use bytes::{Buf, BufMut, BytesMut};
use futures_core::{ready, Stream};
use futures_sink::Sink;
use pin_project::pin_project;
@ -222,15 +222,19 @@ impl<T, U> Framed<T, U> {
}
// FixMe: Is this the right way to do it for now?
let mut buf = ReadBuf::new(&mut this.read_buf);
let cnt = match this.io.poll_read(cx, &mut buf) {
let mut read = ReadBuf::uninit(this.read_buf.bytes_mut());
let cnt = match this.io.poll_read(cx, &mut read) {
Poll::Pending => return Poll::Pending,
Poll::Ready(Err(e)) => return Poll::Ready(Some(Err(e.into()))),
Poll::Ready(Ok(())) => buf.filled().len(),
Poll::Ready(Ok(())) => read.filled().len(),
};
if cnt == 0 {
this.flags.insert(Flags::EOF);
} else {
unsafe {
this.read_buf.advance_mut(cnt);
}
}
this.flags.insert(Flags::READABLE);
}

View File

@ -44,49 +44,49 @@ async fn test_rustls_string() {
assert_eq!(con.peer_addr().unwrap(), srv.addr());
}
#[actix_rt::test]
async fn test_static_str() {
let srv = TestServer::with(|| {
fn_service(|io: TcpStream| async {
let mut framed = Framed::new(io, BytesCodec);
framed.send(Bytes::from_static(b"test")).await?;
Ok::<_, io::Error>(())
})
});
// #[actix_rt::test]
// async fn test_static_str() {
// let srv = TestServer::with(|| {
// fn_service(|io: TcpStream| async {
// let mut framed = Framed::new(io, BytesCodec);
// framed.send(Bytes::from_static(b"test")).await?;
// Ok::<_, io::Error>(())
// })
// });
//
// let resolver = actix_connect::start_default_resolver().await.unwrap();
// let mut conn = actix_connect::new_connector(resolver.clone());
//
// let con = conn.call(Connect::with("10", srv.addr())).await.unwrap();
// assert_eq!(con.peer_addr().unwrap(), srv.addr());
//
// let connect = Connect::new(srv.host().to_owned());
// let mut conn = actix_connect::new_connector(resolver);
// let con = conn.call(connect).await;
// assert!(con.is_err());
// }
let resolver = actix_connect::start_default_resolver().await.unwrap();
let mut conn = actix_connect::new_connector(resolver.clone());
let con = conn.call(Connect::with("10", srv.addr())).await.unwrap();
assert_eq!(con.peer_addr().unwrap(), srv.addr());
let connect = Connect::new(srv.host().to_owned());
let mut conn = actix_connect::new_connector(resolver);
let con = conn.call(connect).await;
assert!(con.is_err());
}
#[actix_rt::test]
async fn test_new_service() {
let srv = TestServer::with(|| {
fn_service(|io: TcpStream| async {
let mut framed = Framed::new(io, BytesCodec);
framed.send(Bytes::from_static(b"test")).await?;
Ok::<_, io::Error>(())
})
});
let resolver =
actix_connect::start_resolver(ResolverConfig::default(), ResolverOpts::default())
.await
.unwrap();
let factory = actix_connect::new_connector_factory(resolver);
let mut conn = factory.new_service(()).await.unwrap();
let con = conn.call(Connect::with("10", srv.addr())).await.unwrap();
assert_eq!(con.peer_addr().unwrap(), srv.addr());
}
// #[actix_rt::test]
// async fn test_new_service() {
// let srv = TestServer::with(|| {
// fn_service(|io: TcpStream| async {
// let mut framed = Framed::new(io, BytesCodec);
// framed.send(Bytes::from_static(b"test")).await?;
// Ok::<_, io::Error>(())
// })
// });
//
// let resolver =
// actix_connect::start_resolver(ResolverConfig::default(), ResolverOpts::default())
// .await
// .unwrap();
//
// let factory = actix_connect::new_connector_factory(resolver);
//
// let mut conn = factory.new_service(()).await.unwrap();
// let con = conn.call(Connect::with("10", srv.addr())).await.unwrap();
// assert_eq!(con.peer_addr().unwrap(), srv.addr());
// }
#[cfg(all(feature = "openssl", feature = "uri"))]
#[actix_rt::test]

View File

@ -121,9 +121,9 @@ fn non_static_block_on() {
let sys = actix_rt::System::new("borrow some");
sys.block_on(async {
actix_rt::time::sleep(Duration::from_millis(1)).await;
assert_eq!("test_str", str);
});
actix_rt::time::sleep(Duration::from_millis(1)).await;
assert_eq!("test_str", str);
});
let rt = actix_rt::Runtime::new().unwrap();
@ -135,5 +135,6 @@ fn non_static_block_on() {
actix_rt::System::run(|| {
assert_eq!("test_str", str);
actix_rt::System::current().stop();
} ).unwrap();
}
})
.unwrap();
}

View File

@ -172,76 +172,40 @@ impl Accept {
let token = event.token();
match token {
// This is a loop because interests for command were a loop that would try to
// drain the command channel. We break at first iter with other kind interests.
// drain the command channel.
WAKER_TOKEN => 'waker: loop {
match self.waker.pop() {
Ok(i) => {
match i {
WakerInterest::Pause => {
for (_, info) in sockets.iter_mut() {
if let Err(err) =
self.poll.registry().deregister(&mut info.sock)
{
error!(
"Can not deregister server socket {}",
err
);
} else {
info!(
"Paused accepting connections on {}",
info.addr
);
}
}
}
WakerInterest::Resume => {
for (token, info) in sockets.iter_mut() {
if let Err(err) = self.register(token, info) {
error!(
"Can not resume socket accept process: {}",
err
);
} else {
info!(
"Accepting connections on {} has been resumed",
info.addr
);
}
}
}
WakerInterest::Stop => {
for (_, info) in sockets.iter_mut() {
let _ =
self.poll.registry().deregister(&mut info.sock);
}
return;
}
WakerInterest::Worker(worker) => {
self.backpressure(&mut sockets, false);
self.workers.push(worker);
}
// timer and notify interests need to break the loop at first iter.
WakerInterest::Timer => {
self.process_timer(&mut sockets);
break 'waker;
}
WakerInterest::Notify => {
self.backpressure(&mut sockets, false);
break 'waker;
Ok(WakerInterest::Notify) => {
self.maybe_backpressure(&mut sockets, false)
}
Ok(WakerInterest::Pause) => {
for (_, info) in sockets.iter_mut() {
if let Err(err) =
self.poll.registry().deregister(&mut info.sock)
{
error!("Can not deregister server socket {}", err);
} else {
info!("Paused accepting connections on {}", info.addr);
}
}
}
Err(err) => match err {
// the waker queue is empty so we break the loop
WakerQueueError::Empty => break 'waker,
// the waker queue is closed so we return
WakerQueueError::Closed => {
for (_, info) in sockets.iter_mut() {
let _ = self.poll.registry().deregister(&mut info.sock);
}
return;
Ok(WakerInterest::Resume) => {
for (token, info) in sockets.iter_mut() {
self.register_logged(token, info);
}
},
}
Ok(WakerInterest::Stop) => {
return self.deregister_all(&mut sockets)
}
Ok(WakerInterest::Worker(worker)) => {
self.maybe_backpressure(&mut sockets, false);
self.workers.push(worker);
}
Ok(WakerInterest::Timer) => self.process_timer(&mut sockets),
Err(WakerQueueError::Empty) => break 'waker,
Err(WakerQueueError::Closed) => {
return self.deregister_all(&mut sockets);
}
}
},
_ => {
@ -261,15 +225,7 @@ impl Accept {
for (token, info) in sockets.iter_mut() {
if let Some(inst) = info.timeout.take() {
if now > inst {
if let Err(err) = self.poll.registry().register(
&mut info.sock,
MioToken(token + DELTA),
Interest::READABLE,
) {
error!("Can not register server socket {}", err);
} else {
info!("Resume accepting connections on {}", info.addr);
}
self.register_logged(token, info);
} else {
info.timeout = Some(inst);
}
@ -307,23 +263,30 @@ impl Accept {
})
}
fn backpressure(&mut self, sockets: &mut Slab<ServerSocketInfo>, on: bool) {
fn register_logged(&self, token: usize, info: &mut ServerSocketInfo) {
match self.register(token, info) {
Ok(_) => info!("Resume accepting connections on {}", info.addr),
Err(e) => error!("Can not register server socket {}", e),
}
}
fn deregister_all(&mut self, sockets: &mut Slab<ServerSocketInfo>) {
sockets.iter_mut().for_each(|(_, info)| {
let _ = self.poll.registry().deregister(&mut info.sock);
});
}
fn maybe_backpressure(&mut self, sockets: &mut Slab<ServerSocketInfo>, on: bool) {
if self.backpressure {
if !on {
self.backpressure = false;
for (token, info) in sockets.iter_mut() {
if let Err(err) = self.register(token, info) {
error!("Can not resume socket accept process: {}", err);
} else {
info!("Accepting connections on {} has been resumed", info.addr);
}
self.register_logged(token, info);
}
}
} else if on {
self.backpressure = true;
for (_, info) in sockets.iter_mut() {
let _ = self.poll.registry().deregister(&mut info.sock);
}
self.deregister_all(sockets);
}
}
@ -331,7 +294,10 @@ impl Accept {
if self.backpressure {
while !self.workers.is_empty() {
match self.workers[self.next].send(msg) {
Ok(_) => (),
Ok(_) => {
self.set_next();
break;
}
Err(tmp) => {
self.srv.worker_faulted(self.workers[self.next].idx);
msg = tmp;
@ -345,8 +311,6 @@ impl Accept {
continue;
}
}
self.next = (self.next + 1) % self.workers.len();
break;
}
} else {
let mut idx = 0;
@ -355,7 +319,7 @@ impl Accept {
if self.workers[self.next].available() {
match self.workers[self.next].send(msg) {
Ok(_) => {
self.next = (self.next + 1) % self.workers.len();
self.set_next();
return;
}
Err(tmp) => {
@ -364,7 +328,7 @@ impl Accept {
self.workers.swap_remove(self.next);
if self.workers.is_empty() {
error!("No workers");
self.backpressure(sockets, true);
self.maybe_backpressure(sockets, true);
return;
} else if self.workers.len() <= self.next {
self.next = 0;
@ -373,14 +337,19 @@ impl Accept {
}
}
}
self.next = (self.next + 1) % self.workers.len();
self.set_next();
}
// enable backpressure
self.backpressure(sockets, true);
self.maybe_backpressure(sockets, true);
self.accept_one(sockets, msg);
}
}
// set next worker that would accept work.
fn set_next(&mut self) {
self.next = (self.next + 1) % self.workers.len();
}
fn accept(&mut self, sockets: &mut Slab<ServerSocketInfo>, token: usize) {
loop {
let msg = if let Some(info) = sockets.get_mut(token) {
@ -402,10 +371,10 @@ impl Accept {
// sleep after error
info.timeout = Some(Instant::now() + Duration::from_millis(500));
let w = self.waker.clone();
let waker = self.waker.clone();
System::current().arbiter().send(Box::pin(async move {
sleep_until(Instant::now() + Duration::from_millis(510)).await;
w.wake(WakerInterest::Timer);
waker.wake(WakerInterest::Timer);
}));
return;
}

View File

@ -1,7 +1,5 @@
//! General purpose TCP server.
#![deny(rust_2018_idioms)]
mod accept;
mod builder;
mod config;

View File

@ -229,7 +229,7 @@ impl Worker {
self.services.iter_mut().for_each(|srv| {
if srv.status == WorkerServiceStatus::Available {
srv.status = WorkerServiceStatus::Stopped;
actix_rt::spawn(
spawn(
srv.service
.call((None, ServerMessage::ForceShutdown))
.map(|_| ()),
@ -241,7 +241,7 @@ impl Worker {
self.services.iter_mut().for_each(move |srv| {
if srv.status == WorkerServiceStatus::Available {
srv.status = WorkerServiceStatus::Stopping;
actix_rt::spawn(
spawn(
srv.service
.call((None, ServerMessage::Shutdown(timeout)))
.map(|_| ()),
@ -304,21 +304,15 @@ enum WorkerState {
Restarting(
usize,
Token,
#[allow(clippy::type_complexity)]
Pin<Box<dyn Future<Output = Result<Vec<(Token, BoxedServerService)>, ()>>>>,
),
Shutdown(
Pin<Box<Sleep>>,
Pin<Box<Sleep>>,
Option<oneshot::Sender<bool>>,
LocalBoxFuture<'static, Result<Vec<(Token, BoxedServerService)>, ()>>,
),
Shutdown(Sleep, Sleep, Option<oneshot::Sender<bool>>),
}
impl Future for Worker {
type Output = ();
// FIXME: remove this attribute
#[allow(clippy::never_loop)]
// #[allow(clippy::never_loop)]
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// `StopWorker` message handler
if let Poll::Ready(Some(StopCommand { graceful, result })) =
@ -336,8 +330,8 @@ impl Future for Worker {
if num != 0 {
info!("Graceful worker shutdown, {} connections", num);
self.state = WorkerState::Shutdown(
Box::pin(sleep_until(Instant::now() + time::Duration::from_secs(1))),
Box::pin(sleep_until(Instant::now() + self.shutdown_timeout)),
sleep_until(Instant::now() + time::Duration::from_secs(1)),
sleep_until(Instant::now() + self.shutdown_timeout),
Some(result),
);
} else {
@ -391,9 +385,10 @@ impl Future for Worker {
}
}
WorkerState::Restarting(idx, token, ref mut fut) => {
match Pin::new(fut).poll(cx) {
match fut.as_mut().poll(cx) {
Poll::Ready(Ok(item)) => {
for (token, service) in item {
// only interest in the first item?
if let Some((token, service)) = item.into_iter().next() {
trace!(
"Service {:?} has been restarted",
self.factories[idx].name(token)
@ -402,6 +397,15 @@ impl Future for Worker {
self.state = WorkerState::Unavailable(Vec::new());
return self.poll(cx);
}
// for (token, service) in item {
// trace!(
// "Service {:?} has been restarted",
// self.factories[idx].name(token)
// );
// self.services[token.0].created(service);
// self.state = WorkerState::Unavailable(Vec::new());
// return self.poll(cx);
// }
}
Poll::Ready(Err(_)) => {
panic!(
@ -424,26 +428,19 @@ impl Future for Worker {
}
// check graceful timeout
match t2.as_mut().poll(cx) {
Poll::Pending => (),
Poll::Ready(_) => {
let _ = tx.take().unwrap().send(false);
self.shutdown(true);
Arbiter::current().stop();
return Poll::Ready(());
}
if Pin::new(t2).poll(cx).is_ready() {
let _ = tx.take().unwrap().send(false);
self.shutdown(true);
Arbiter::current().stop();
return Poll::Ready(());
}
// sleep for 1 second and then check again
match t1.as_mut().poll(cx) {
Poll::Pending => (),
Poll::Ready(_) => {
*t1 = Box::pin(sleep_until(
Instant::now() + time::Duration::from_secs(1),
));
let _ = t1.as_mut().poll(cx);
}
if Pin::new(&mut *t1).poll(cx).is_ready() {
*t1 = sleep_until(Instant::now() + time::Duration::from_secs(1));
let _ = Pin::new(t1).poll(cx);
}
Poll::Pending
}
WorkerState::Available => {

View File

@ -283,7 +283,7 @@ pub fn bench_async_service<S>(c: &mut Criterion, srv: S, name: &str)
where
S: Service<Request = (), Response = usize, Error = ()> + Clone + 'static,
{
let mut rt = actix_rt::System::new("test");
let rt = actix_rt::System::new("test");
// start benchmark loops
c.bench_function(name, move |b| {

View File

@ -85,7 +85,7 @@ pub fn bench_async_service<S>(c: &mut Criterion, srv: S, name: &str)
where
S: Service<Request = (), Response = usize, Error = ()> + Clone + 'static,
{
let mut rt = actix_rt::System::new("test");
let rt = actix_rt::System::new("test");
// start benchmark loops
c.bench_function(name, move |b| {
@ -94,7 +94,7 @@ where
// exclude request generation, it appears it takes significant time vs call (3us vs 1us)
let start = std::time::Instant::now();
// benchmark body
rt.block_on(async move { join_all(srvs.iter_mut().map(|srv| srv.call(()))).await });
rt.block_on(async { join_all(srvs.iter_mut().map(|srv| srv.call(()))).await });
// check that at least first request succeeded
start.elapsed()
})