diff --git a/actix-server/src/accept/acceptable.rs b/actix-server/src/accept/acceptable.rs index e0371775..494ac083 100644 --- a/actix-server/src/accept/acceptable.rs +++ b/actix-server/src/accept/acceptable.rs @@ -1,14 +1,19 @@ -use std::{fmt, io}; +use std::{fmt, io, marker::PhantomData}; use mio::{Registry, Token}; +use crate::waker_queue::WakerQueue; + #[doc(hidden)] /// Trait define IO source that can be managed by [super::Accept]. pub trait Acceptable: fmt::Debug { /// Type accepted from IO source. type Connection: Send + 'static; - fn accept(&mut self) -> io::Result>; + fn accept( + &mut self, + cx: &mut AcceptContext<'_, Self::Connection>, + ) -> io::Result>; /// Register IO source to Acceptor [Registry](mio::Registry). /// Self must impl [Source](mio::event::Source) trait. @@ -18,3 +23,24 @@ pub trait Acceptable: fmt::Debug { /// Self must impl [Source](mio::event::Source) trait. fn deregister(&mut self, registry: &Registry) -> io::Result<()>; } + +#[doc(hidden)] +/// Context type of Accept thread. Expose Waker and possible other types to public. +pub struct AcceptContext<'a, C> { + waker: WakerQueue, + _p: PhantomData<&'a ()>, +} + +impl<'a, C> AcceptContext<'a, C> { + pub(super) fn new(waker: WakerQueue) -> Self { + Self { + waker, + _p: PhantomData, + } + } + + // TODO: Make this public + pub(super) fn waker(&self) -> &WakerQueue { + &self.waker + } +} diff --git a/actix-server/src/accept/mod.rs b/actix-server/src/accept/mod.rs index 914bde44..eb05fa6e 100644 --- a/actix-server/src/accept/mod.rs +++ b/actix-server/src/accept/mod.rs @@ -7,7 +7,7 @@ use loom::thread; mod acceptable; mod availability; -pub use acceptable::Acceptable; +pub use acceptable::{AcceptContext, Acceptable}; use std::{io, time::Duration}; @@ -78,7 +78,7 @@ where struct Accept { poll: Poll, source: Box<[Source]>, - waker: WakerQueue, + context: AcceptContext<'static, A::Connection>, handles: Vec>, srv: Server, next: usize, @@ -143,7 +143,7 @@ where // actix system. let sys = System::current(); thread::Builder::new() - .name("actix-server acceptor".to_owned()) + .name("actix-server accept".to_owned()) .spawn(move || { System::set_current(sys); let source = source @@ -168,10 +168,12 @@ where avail.set_available(handle.idx(), true); }); + let context = AcceptContext::new(waker); + let accept = Accept { poll, source, - waker, + context, handles, srv, next: 0, @@ -221,7 +223,7 @@ where loop { // take guard with every iteration so no new interest can be added // until the current task is done. - let mut guard = self.waker.guard(); + let mut guard = self.context.waker().guard(); match guard.pop_front() { // worker notify it becomes available. Some(WakerInterest::WorkerAvailable(idx)) => { @@ -363,8 +365,8 @@ where fn accept(&mut self, token: usize) { while self.avail.available() { let source = &mut self.source[token]; - - match source.acceptable.accept() { + let cx = &mut self.context; + match source.acceptable.accept(cx) { Ok(Some(io)) => { let conn = Conn { token, io }; self.accept_one(conn); @@ -384,7 +386,7 @@ where source.timeout = Some(Instant::now() + Duration::from_millis(500)); // after the sleep a Timer interest is sent to Accept Poll - let waker = self.waker.clone(); + let waker = self.context.waker().clone(); System::current().arbiter().spawn(async move { sleep(Duration::from_millis(510)).await; waker.wake(WakerInterest::Timer); diff --git a/actix-server/src/lib.rs b/actix-server/src/lib.rs index 3505ce46..0ffb3e68 100644 --- a/actix-server/src/lib.rs +++ b/actix-server/src/lib.rs @@ -14,7 +14,7 @@ mod test_server; mod waker_queue; mod worker; -pub use self::accept::Acceptable; +pub use self::accept::{AcceptContext, Acceptable}; pub use self::builder::ServerBuilder; pub use self::server::Server; pub use self::service::ServiceFactory; diff --git a/actix-server/src/socket.rs b/actix-server/src/socket.rs index 551414d4..f385f5d4 100644 --- a/actix-server/src/socket.rs +++ b/actix-server/src/socket.rs @@ -15,13 +15,16 @@ use actix_rt::net::TcpStream; use mio::net::TcpStream as MioTcpStream; use mio::{event::Source, Interest, Registry, Token}; -use crate::accept::Acceptable; +use crate::accept::{AcceptContext, Acceptable}; /// impl Acceptable trait for [mio::net::TcpListener] so it can be managed by server and it's [mio::Poll] instance. impl Acceptable for MioTcpListener { type Connection = MioTcpStream; - fn accept(&mut self) -> io::Result> { + fn accept( + &mut self, + _: &mut AcceptContext<'_, Self::Connection>, + ) -> io::Result> { Self::accept(self).map(|stream| Some(stream.0)) } @@ -49,7 +52,10 @@ impl From for MioListener { impl Acceptable for MioListener { type Connection = MioStream; - fn accept(&mut self) -> io::Result> { + fn accept( + &mut self, + _: &mut AcceptContext<'_, Self::Connection>, + ) -> io::Result> { match *self { MioListener::Tcp(ref mut lst) => { MioTcpListener::accept(lst).map(|stream| Some(MioStream::Tcp(stream.0))) @@ -157,7 +163,10 @@ mod unix_impl { impl Acceptable for MioUnixListener { type Connection = MioUnixStream; - fn accept(&mut self) -> io::Result> { + fn accept( + &mut self, + _: &mut AcceptContext<'_, Self::Connection>, + ) -> io::Result> { Self::accept(self).map(|stream| Some(stream.0)) }