From 0f10bdbfab4ff2af81a4e4f9f6aac646dcafeeb8 Mon Sep 17 00:00:00 2001 From: fakeshadow <24548779@qq.com> Date: Wed, 21 Apr 2021 22:37:10 +0800 Subject: [PATCH] fix FromStream trait --- actix-server/src/accept.rs | 62 ++++++++++++++++++++++--------------- actix-server/src/service.rs | 24 ++++++++------ actix-server/src/socket.rs | 27 +++++++++++----- 3 files changed, 71 insertions(+), 42 deletions(-) diff --git a/actix-server/src/accept.rs b/actix-server/src/accept.rs index bc02ecf5..ea28c84b 100644 --- a/actix-server/src/accept.rs +++ b/actix-server/src/accept.rs @@ -100,36 +100,48 @@ impl Accept { fn poll_accept(&mut self, cx: &mut Context<'_>) -> Poll<()> { let len = self.sockets.len(); let mut idx = 0; - while idx < len { - 'socket: loop { - if !self.avail.available() { - return Poll::Pending; + + // Only accept when there is worker available. + while self.avail.available() { + // use manual indexing so Accept::accept_one can reborrow self. + let socket = &mut self.sockets[idx]; + + match socket.lst.poll_accept(cx) { + Poll::Ready(Ok(io)) => { + let conn = Conn { + io, + token: socket.token, + }; + self.accept_one(conn); } + // error type that can continue to next connection. + Poll::Ready(Err(ref e)) if connection_error(e) => continue, + // error type that should be handled as delayed wakeup. + Poll::Ready(Err(ref e)) => { + error!("Error accepting connection: {}", e); - let socket = &mut self.sockets[idx]; + // use a sleep timer to wake up Accept after 500 ms. + // This delay is used for not busy loop on errors. + // Therefore the timing is not important and can be overwritten + // by next socket having a similar error. + let deadline = Instant::now() + Duration::from_millis(500); + self.timeout.as_mut().reset(deadline); + let _ = self.timeout.as_mut().poll(cx); - match socket.lst.poll_accept(cx) { - Poll::Ready(Ok(io)) => { - let conn = Conn { - io, - token: socket.token, - }; - self.accept_one(conn); + // increment index and handle next socket + idx += 1; + if idx == len { + break; } - Poll::Ready(Err(ref e)) if connection_error(e) => continue 'socket, - Poll::Ready(Err(ref e)) => { - error!("Error accepting connection: {}", e); - - let deadline = Instant::now() + Duration::from_millis(500); - self.timeout.as_mut().reset(deadline); - let _ = self.timeout.as_mut().poll(cx); - - break 'socket; + } + Poll::Pending => { + // increment index and handle next socket + idx += 1; + if idx == len { + break; } - Poll::Pending => break 'socket, - }; - } - idx += 1; + } + }; } Poll::Pending diff --git a/actix-server/src/service.rs b/actix-server/src/service.rs index 0e539cd7..12b899e7 100644 --- a/actix-server/src/service.rs +++ b/actix-server/src/service.rs @@ -5,6 +5,7 @@ use std::task::{Context, Poll}; use actix_service::{Service, ServiceFactory as BaseServiceFactory}; use actix_utils::future::{ready, Ready}; use futures_core::future::LocalBoxFuture; +use log::error; use crate::socket::{FromStream, Stream}; use crate::worker::WorkerCounterGuard; @@ -62,15 +63,20 @@ where } fn call(&self, (guard, req): (WorkerCounterGuard, Stream)) -> Self::Future { - let stream = FromStream::from_stream(req); - - let f = self.service.call(stream); - actix_rt::spawn(async move { - let _ = f.await; - drop(guard); - }); - - ready(Ok(())) + ready(match FromStream::from_stream(req) { + Ok(stream) => { + let f = self.service.call(stream); + actix_rt::spawn(async move { + let _ = f.await; + drop(guard); + }); + Ok(()) + } + Err(e) => { + error!("Can not convert to an async tcp stream: {}", e); + Err(()) + } + }) } } diff --git a/actix-server/src/socket.rs b/actix-server/src/socket.rs index 8966bed4..78124599 100644 --- a/actix-server/src/socket.rs +++ b/actix-server/src/socket.rs @@ -80,7 +80,7 @@ pub enum Stream { /// helper trait for converting mio stream to tokio stream. pub trait FromStream: Sized { - fn from_stream(stream: Stream) -> Self; + fn from_stream(stream: Stream) -> io::Result; } #[cfg(windows)] @@ -88,9 +88,12 @@ mod win_impl { use super::*; impl FromStream for TcpStream { - fn from_stream(stream: Stream) -> Self { + fn from_stream(stream: Stream) -> io::Result { match stream { - Stream::Tcp(stream) => stream, + Stream::Tcp(stream) => { + let stream = stream.into_std()?; + TcpStream::from_std(stream) + } } } } @@ -100,10 +103,15 @@ mod win_impl { mod unix_impl { use super::*; + use actix_rt::net::UnixStream; + impl FromStream for TcpStream { - fn from_stream(stream: Stream) -> Self { + fn from_stream(stream: Stream) -> io::Result { match stream { - Stream::Tcp(stream) => stream, + Stream::Tcp(stream) => { + let stream = stream.into_std()?; + TcpStream::from_std(stream) + } Stream::Uds(_) => { panic!("Should not happen, bug in server impl"); } @@ -111,11 +119,14 @@ mod unix_impl { } } - impl FromStream for actix_rt::net::UnixStream { - fn from_stream(stream: Stream) -> Self { + impl FromStream for UnixStream { + fn from_stream(stream: Stream) -> io::Result { match stream { Stream::Tcp(_) => panic!("Should not happen, bug in server impl"), - Stream::Uds(stream) => stream, + Stream::Uds(stream) => { + let stream = stream.into_std()?; + UnixStream::from_std(stream) + } } } }