Add AcceptContext type for passing Accept state to Acceptable trait

This commit is contained in:
fakeshadow 2021-05-04 09:53:56 +08:00
parent f3c9e465e0
commit 48ee38501a
4 changed files with 52 additions and 15 deletions

View File

@ -1,14 +1,19 @@
use std::{fmt, io};
use std::{fmt, io, marker::PhantomData};
use mio::{Registry, Token};
use crate::waker_queue::WakerQueue;
#[doc(hidden)]
/// Trait define IO source that can be managed by [super::Accept].
pub trait Acceptable: fmt::Debug {
/// Type accepted from IO source.
type Connection: Send + 'static;
fn accept(&mut self) -> io::Result<Option<Self::Connection>>;
fn accept(
&mut self,
cx: &mut AcceptContext<'_, Self::Connection>,
) -> io::Result<Option<Self::Connection>>;
/// Register IO source to Acceptor [Registry](mio::Registry).
/// Self must impl [Source](mio::event::Source) trait.
@ -18,3 +23,24 @@ pub trait Acceptable: fmt::Debug {
/// Self must impl [Source](mio::event::Source) trait.
fn deregister(&mut self, registry: &Registry) -> io::Result<()>;
}
#[doc(hidden)]
/// Context type of Accept thread. Expose Waker and possible other types to public.
pub struct AcceptContext<'a, C> {
waker: WakerQueue<C>,
_p: PhantomData<&'a ()>,
}
impl<'a, C> AcceptContext<'a, C> {
pub(super) fn new(waker: WakerQueue<C>) -> Self {
Self {
waker,
_p: PhantomData,
}
}
// TODO: Make this public
pub(super) fn waker(&self) -> &WakerQueue<C> {
&self.waker
}
}

View File

@ -7,7 +7,7 @@ use loom::thread;
mod acceptable;
mod availability;
pub use acceptable::Acceptable;
pub use acceptable::{AcceptContext, Acceptable};
use std::{io, time::Duration};
@ -78,7 +78,7 @@ where
struct Accept<A: Acceptable> {
poll: Poll,
source: Box<[Source<A>]>,
waker: WakerQueue<A::Connection>,
context: AcceptContext<'static, A::Connection>,
handles: Vec<WorkerHandleAccept<A::Connection>>,
srv: Server,
next: usize,
@ -143,7 +143,7 @@ where
// actix system.
let sys = System::current();
thread::Builder::new()
.name("actix-server acceptor".to_owned())
.name("actix-server accept".to_owned())
.spawn(move || {
System::set_current(sys);
let source = source
@ -168,10 +168,12 @@ where
avail.set_available(handle.idx(), true);
});
let context = AcceptContext::new(waker);
let accept = Accept {
poll,
source,
waker,
context,
handles,
srv,
next: 0,
@ -221,7 +223,7 @@ where
loop {
// take guard with every iteration so no new interest can be added
// until the current task is done.
let mut guard = self.waker.guard();
let mut guard = self.context.waker().guard();
match guard.pop_front() {
// worker notify it becomes available.
Some(WakerInterest::WorkerAvailable(idx)) => {
@ -363,8 +365,8 @@ where
fn accept(&mut self, token: usize) {
while self.avail.available() {
let source = &mut self.source[token];
match source.acceptable.accept() {
let cx = &mut self.context;
match source.acceptable.accept(cx) {
Ok(Some(io)) => {
let conn = Conn { token, io };
self.accept_one(conn);
@ -384,7 +386,7 @@ where
source.timeout = Some(Instant::now() + Duration::from_millis(500));
// after the sleep a Timer interest is sent to Accept Poll
let waker = self.waker.clone();
let waker = self.context.waker().clone();
System::current().arbiter().spawn(async move {
sleep(Duration::from_millis(510)).await;
waker.wake(WakerInterest::Timer);

View File

@ -14,7 +14,7 @@ mod test_server;
mod waker_queue;
mod worker;
pub use self::accept::Acceptable;
pub use self::accept::{AcceptContext, Acceptable};
pub use self::builder::ServerBuilder;
pub use self::server::Server;
pub use self::service::ServiceFactory;

View File

@ -15,13 +15,16 @@ use actix_rt::net::TcpStream;
use mio::net::TcpStream as MioTcpStream;
use mio::{event::Source, Interest, Registry, Token};
use crate::accept::Acceptable;
use crate::accept::{AcceptContext, Acceptable};
/// impl Acceptable trait for [mio::net::TcpListener] so it can be managed by server and it's [mio::Poll] instance.
impl Acceptable for MioTcpListener {
type Connection = MioTcpStream;
fn accept(&mut self) -> io::Result<Option<Self::Connection>> {
fn accept(
&mut self,
_: &mut AcceptContext<'_, Self::Connection>,
) -> io::Result<Option<Self::Connection>> {
Self::accept(self).map(|stream| Some(stream.0))
}
@ -49,7 +52,10 @@ impl From<StdTcpListener> for MioListener {
impl Acceptable for MioListener {
type Connection = MioStream;
fn accept(&mut self) -> io::Result<Option<Self::Connection>> {
fn accept(
&mut self,
_: &mut AcceptContext<'_, Self::Connection>,
) -> io::Result<Option<Self::Connection>> {
match *self {
MioListener::Tcp(ref mut lst) => {
MioTcpListener::accept(lst).map(|stream| Some(MioStream::Tcp(stream.0)))
@ -157,7 +163,10 @@ mod unix_impl {
impl Acceptable for MioUnixListener {
type Connection = MioUnixStream;
fn accept(&mut self) -> io::Result<Option<Self::Connection>> {
fn accept(
&mut self,
_: &mut AcceptContext<'_, Self::Connection>,
) -> io::Result<Option<Self::Connection>> {
Self::accept(self).map(|stream| Some(stream.0))
}