From 4ff632320269a8557391802cfc1ac2de0dd6f3c9 Mon Sep 17 00:00:00 2001 From: fakeshadow <24548779@qq.com> Date: Wed, 21 Apr 2021 21:30:38 +0800 Subject: [PATCH] Remove actix System from Accept thread --- actix-server/src/accept.rs | 121 +++++++++++++++++++------------------ 1 file changed, 63 insertions(+), 58 deletions(-) diff --git a/actix-server/src/accept.rs b/actix-server/src/accept.rs index 6064b849..bc02ecf5 100644 --- a/actix-server/src/accept.rs +++ b/actix-server/src/accept.rs @@ -1,10 +1,13 @@ -use std::time::Duration; -use std::{io, thread}; - -use actix_rt::{ - time::{sleep, Instant, Sleep}, - System, +use std::{ + future::Future, + io, + pin::Pin, + task::{Context, Poll}, + thread, + time::Duration, }; + +use actix_rt::time::{sleep, Instant, Sleep}; use log::error; use tokio::sync::mpsc::UnboundedReceiver; @@ -25,10 +28,16 @@ pub(crate) struct Accept { srv: Server, next: usize, avail: Availability, - paused: bool, + state: AcceptState, timeout: Pin>, } +enum AcceptState { + Accept, + Pause, + Stop, +} + pub(crate) enum Interest { Pause, Resume, @@ -44,14 +53,10 @@ impl Accept { srv: Server, handles: Vec, ) { - // Accept runs in its own thread and would want to spawn additional futures to current - // actix system. - let sys = System::current(); + // Accept runs in its own thread and tokio runtime thread::Builder::new() .name("actix-server accept loop".to_owned()) .spawn(move || { - System::set_current(sys); - tokio::runtime::Builder::new_current_thread() .enable_all() .build() @@ -87,11 +92,49 @@ impl Accept { srv, next: 0, avail, - paused: false, + state: AcceptState::Accept, timeout: Box::pin(sleep(Duration::from_millis(500))), } } + fn poll_accept(&mut self, cx: &mut Context<'_>) -> Poll<()> { + let len = self.sockets.len(); + let mut idx = 0; + while idx < len { + 'socket: loop { + if !self.avail.available() { + return Poll::Pending; + } + + let socket = &mut self.sockets[idx]; + + match socket.lst.poll_accept(cx) { + Poll::Ready(Ok(io)) => { + let conn = Conn { + io, + token: socket.token, + }; + self.accept_one(conn); + } + Poll::Ready(Err(ref e)) if connection_error(e) => continue 'socket, + Poll::Ready(Err(ref e)) => { + error!("Error accepting connection: {}", e); + + let deadline = Instant::now() + Duration::from_millis(500); + self.timeout.as_mut().reset(deadline); + let _ = self.timeout.as_mut().poll(cx); + + break 'socket; + } + Poll::Pending => break 'socket, + }; + } + idx += 1; + } + + Poll::Pending + } + // Send connection to worker and handle error. fn send_connection(&mut self, conn: Conn) -> Result<(), Conn> { let next = self.next(); @@ -171,10 +214,6 @@ impl Accept { } } -use std::future::Future; -use std::pin::Pin; -use std::task::{Context, Poll}; - impl Future for Accept { type Output = (); @@ -190,51 +229,17 @@ impl Future for Accept { this.avail.set_available(handle.idx(), true); this.handles.push(handle); } - Interest::Pause => this.paused = true, - Interest::Resume => this.paused = false, - Interest::Stop => return Poll::Ready(()), + Interest::Pause => this.state = AcceptState::Pause, + Interest::Resume => this.state = AcceptState::Accept, + Interest::Stop => this.state = AcceptState::Stop, } } - if this.paused { - return Poll::Pending; + match this.state { + AcceptState::Accept => this.poll_accept(cx), + AcceptState::Pause => Poll::Pending, + AcceptState::Stop => Poll::Ready(()), } - - let len = this.sockets.len(); - let mut idx = 0; - while idx < len { - 'socket: loop { - if !this.avail.available() { - return Poll::Pending; - } - - let socket = &mut this.sockets[idx]; - - match socket.lst.poll_accept(cx) { - Poll::Ready(Ok(io)) => { - let conn = Conn { - io, - token: socket.token, - }; - this.accept_one(conn); - } - Poll::Ready(Err(ref e)) if connection_error(e) => continue 'socket, - Poll::Ready(Err(ref e)) => { - error!("Error accepting connection: {}", e); - - let deadline = Instant::now() + Duration::from_millis(500); - this.timeout.as_mut().reset(deadline); - let _ = this.timeout.as_mut().poll(cx); - - break 'socket; - } - Poll::Pending => break 'socket, - }; - } - idx += 1; - } - - Poll::Pending } }