fix FromStream trait

This commit is contained in:
fakeshadow 2021-04-21 22:37:10 +08:00
parent 4ff6323202
commit 0f10bdbfab
3 changed files with 71 additions and 42 deletions

View File

@ -100,36 +100,48 @@ impl Accept {
fn poll_accept(&mut self, cx: &mut Context<'_>) -> Poll<()> {
let len = self.sockets.len();
let mut idx = 0;
while idx < len {
'socket: loop {
if !self.avail.available() {
return Poll::Pending;
// Only accept when there is worker available.
while self.avail.available() {
// use manual indexing so Accept::accept_one can reborrow self.
let socket = &mut self.sockets[idx];
match socket.lst.poll_accept(cx) {
Poll::Ready(Ok(io)) => {
let conn = Conn {
io,
token: socket.token,
};
self.accept_one(conn);
}
// error type that can continue to next connection.
Poll::Ready(Err(ref e)) if connection_error(e) => continue,
// error type that should be handled as delayed wakeup.
Poll::Ready(Err(ref e)) => {
error!("Error accepting connection: {}", e);
let socket = &mut self.sockets[idx];
// use a sleep timer to wake up Accept after 500 ms.
// This delay is used for not busy loop on errors.
// Therefore the timing is not important and can be overwritten
// by next socket having a similar error.
let deadline = Instant::now() + Duration::from_millis(500);
self.timeout.as_mut().reset(deadline);
let _ = self.timeout.as_mut().poll(cx);
match socket.lst.poll_accept(cx) {
Poll::Ready(Ok(io)) => {
let conn = Conn {
io,
token: socket.token,
};
self.accept_one(conn);
// increment index and handle next socket
idx += 1;
if idx == len {
break;
}
Poll::Ready(Err(ref e)) if connection_error(e) => continue 'socket,
Poll::Ready(Err(ref e)) => {
error!("Error accepting connection: {}", e);
let deadline = Instant::now() + Duration::from_millis(500);
self.timeout.as_mut().reset(deadline);
let _ = self.timeout.as_mut().poll(cx);
break 'socket;
}
Poll::Pending => {
// increment index and handle next socket
idx += 1;
if idx == len {
break;
}
Poll::Pending => break 'socket,
};
}
idx += 1;
}
};
}
Poll::Pending

View File

@ -5,6 +5,7 @@ use std::task::{Context, Poll};
use actix_service::{Service, ServiceFactory as BaseServiceFactory};
use actix_utils::future::{ready, Ready};
use futures_core::future::LocalBoxFuture;
use log::error;
use crate::socket::{FromStream, Stream};
use crate::worker::WorkerCounterGuard;
@ -62,15 +63,20 @@ where
}
fn call(&self, (guard, req): (WorkerCounterGuard, Stream)) -> Self::Future {
let stream = FromStream::from_stream(req);
let f = self.service.call(stream);
actix_rt::spawn(async move {
let _ = f.await;
drop(guard);
});
ready(Ok(()))
ready(match FromStream::from_stream(req) {
Ok(stream) => {
let f = self.service.call(stream);
actix_rt::spawn(async move {
let _ = f.await;
drop(guard);
});
Ok(())
}
Err(e) => {
error!("Can not convert to an async tcp stream: {}", e);
Err(())
}
})
}
}

View File

@ -80,7 +80,7 @@ pub enum Stream {
/// helper trait for converting mio stream to tokio stream.
pub trait FromStream: Sized {
fn from_stream(stream: Stream) -> Self;
fn from_stream(stream: Stream) -> io::Result<Self>;
}
#[cfg(windows)]
@ -88,9 +88,12 @@ mod win_impl {
use super::*;
impl FromStream for TcpStream {
fn from_stream(stream: Stream) -> Self {
fn from_stream(stream: Stream) -> io::Result<Self> {
match stream {
Stream::Tcp(stream) => stream,
Stream::Tcp(stream) => {
let stream = stream.into_std()?;
TcpStream::from_std(stream)
}
}
}
}
@ -100,10 +103,15 @@ mod win_impl {
mod unix_impl {
use super::*;
use actix_rt::net::UnixStream;
impl FromStream for TcpStream {
fn from_stream(stream: Stream) -> Self {
fn from_stream(stream: Stream) -> io::Result<Self> {
match stream {
Stream::Tcp(stream) => stream,
Stream::Tcp(stream) => {
let stream = stream.into_std()?;
TcpStream::from_std(stream)
}
Stream::Uds(_) => {
panic!("Should not happen, bug in server impl");
}
@ -111,11 +119,14 @@ mod unix_impl {
}
}
impl FromStream for actix_rt::net::UnixStream {
fn from_stream(stream: Stream) -> Self {
impl FromStream for UnixStream {
fn from_stream(stream: Stream) -> io::Result<Self> {
match stream {
Stream::Tcp(_) => panic!("Should not happen, bug in server impl"),
Stream::Uds(stream) => stream,
Stream::Uds(stream) => {
let stream = stream.into_std()?;
UnixStream::from_std(stream)
}
}
}
}