From 6ee906335a9fa700a2de14f910a813a0f82b1f7d Mon Sep 17 00:00:00 2001 From: fakeshadow <24548779@qq.com> Date: Mon, 19 Oct 2020 12:48:01 +0800 Subject: [PATCH] add mio stream convertion for windows --- actix-codec/Cargo.toml | 2 +- actix-codec/src/framed.rs | 2 +- actix-rt/CHANGES.md | 4 +++ actix-rt/src/arbiter.rs | 2 +- actix-rt/src/builder.rs | 4 +-- actix-rt/src/runtime.rs | 6 ++--- actix-server/CHANGES.md | 5 +++- actix-server/src/accept.rs | 38 ++++++++------------------ actix-server/src/builder.rs | 12 ++++----- actix-server/src/socket.rs | 44 ++++++++++++++++++++++--------- actix-server/src/waker_queue.rs | 21 +-------------- actix-server/src/worker.rs | 10 +++---- actix-server/tests/test_server.rs | 1 - 13 files changed, 71 insertions(+), 80 deletions(-) diff --git a/actix-codec/Cargo.toml b/actix-codec/Cargo.toml index 9f73cb80..f6407de2 100644 --- a/actix-codec/Cargo.toml +++ b/actix-codec/Cargo.toml @@ -23,4 +23,4 @@ futures-sink = { version = "0.3.4", default-features = false } log = "0.4" pin-project = "0.4.17" tokio = "0.3.0" -tokio-util = { version = "0.3.1", default-features = false, features = ["codec"] } +tokio-util = { version = "0.4.0", default-features = false, features = ["codec"] } diff --git a/actix-codec/src/framed.rs b/actix-codec/src/framed.rs index 96d0df49..37beaac9 100644 --- a/actix-codec/src/framed.rs +++ b/actix-codec/src/framed.rs @@ -222,7 +222,7 @@ impl Framed { } // FixMe: This must be fixed as `poll_read_buf` is removed - // let cnt = match this.io.poll_read_buf(cx, &mut this.read_buf) { + // let cnt = match this.io.poll_read(cx, &mut this.read_buf) { // Poll::Pending => return Poll::Pending, // Poll::Ready(Err(e)) => return Poll::Ready(Some(Err(e.into()))), // Poll::Ready(Ok(cnt)) => cnt, diff --git a/actix-rt/CHANGES.md b/actix-rt/CHANGES.md index 9b5fb636..c6cbedd7 100644 --- a/actix-rt/CHANGES.md +++ b/actix-rt/CHANGES.md @@ -6,6 +6,10 @@ * Add `System::attach_to_tokio` method. [#173] +### Changed +* update tokio to 0.3 +* Remove `'static` lifetime requirement for `Runtime::block_on`. The method would accept a &Self when calling. + ## [1.1.1] - 2020-04-30 ### Fixed diff --git a/actix-rt/src/arbiter.rs b/actix-rt/src/arbiter.rs index eff10ca3..7f64095e 100644 --- a/actix-rt/src/arbiter.rs +++ b/actix-rt/src/arbiter.rs @@ -112,7 +112,7 @@ impl Arbiter { let handle = thread::Builder::new() .name(name.clone()) .spawn(move || { - let mut rt = Runtime::new().expect("Can not create Runtime"); + let rt = Runtime::new().expect("Can not create Runtime"); let arb = Arbiter::with_sender(arb_tx); let (stop, stop_rx) = channel(); diff --git a/actix-rt/src/builder.rs b/actix-rt/src/builder.rs index f4d9b1bf..2760b983 100644 --- a/actix-rt/src/builder.rs +++ b/actix-rt/src/builder.rs @@ -97,7 +97,7 @@ impl Builder { // system arbiter let arb = SystemArbiter::new(stop_tx, sys_receiver); - let mut rt = Runtime::new().unwrap(); + let rt = Runtime::new().unwrap(); rt.spawn(arb); // init system arbiter and run configuration method @@ -157,7 +157,7 @@ impl SystemRunner { /// This function will start event loop and will finish once the /// `System::stop()` function is called. pub fn run(self) -> io::Result<()> { - let SystemRunner { mut rt, stop, .. } = self; + let SystemRunner { rt, stop, .. } = self; // run loop Arbiter::run_system(Some(&rt)); diff --git a/actix-rt/src/runtime.rs b/actix-rt/src/runtime.rs index b3726329..8c15e799 100644 --- a/actix-rt/src/runtime.rs +++ b/actix-rt/src/runtime.rs @@ -81,10 +81,10 @@ impl Runtime { /// /// The caller is responsible for ensuring that other spawned futures /// complete execution by calling `block_on` or `run`. - pub fn block_on(&mut self, f: F) -> F::Output + pub fn block_on(&self, f: F) -> F::Output where - F: Future + 'static, + F: Future, { - self.local.block_on(&mut self.rt, f) + self.local.block_on(&self.rt, f) } } diff --git a/actix-server/CHANGES.md b/actix-server/CHANGES.md index 9a1b56c9..61e571a4 100644 --- a/actix-server/CHANGES.md +++ b/actix-server/CHANGES.md @@ -1,7 +1,10 @@ # Changes ## Unreleased - 2020-xx-xx - +* Update mio to 0.7.3 +* Use `concurrent-queue` to manage poll wakes instead of `futures::channel::mpsc`. +* Remove `AcceptNotify` type and pass `WakerQueue` to `WorkerClient` for notify the `Accept` more directly. +* Convert `mio::Stream` to `actix_rt::net::TcpStream`(`UnixStream` for uds) using `FromRawFd` and `IntoRawFd`(`IntoRawSocket` and `FromRawSocket` on windows). ## 1.0.4 - 2020-09-12 * Update actix-codec to 0.3.0. diff --git a/actix-server/src/accept.rs b/actix-server/src/accept.rs index 4227f536..a8710c4e 100644 --- a/actix-server/src/accept.rs +++ b/actix-server/src/accept.rs @@ -21,21 +21,6 @@ struct ServerSocketInfo { timeout: Option, } -/// accept notify would clone waker queue from accept loop and use it to push new interest and wake -/// up the accept poll. -#[derive(Clone)] -pub(crate) struct AcceptNotify(WakerQueue); - -impl AcceptNotify { - pub(crate) fn new(waker: WakerQueue) -> Self { - Self(waker) - } - - pub(crate) fn notify(&self) { - self.0.wake(WakerInterest::Notify); - } -} - /// Accept loop would live with `ServerBuilder`. /// /// It's tasked with construct `Poll` instance and `WakerQueue` which would be distributed to @@ -63,12 +48,12 @@ impl AcceptLoop { } } - pub fn wake_accept(&self, i: WakerInterest) { - self.waker.wake(i); + pub(crate) fn waker_owned(&self) -> WakerQueue { + self.waker.clone() } - pub fn get_accept_notify(&self) -> AcceptNotify { - AcceptNotify::new(self.waker.clone()) + pub fn wake_accept(&self, i: WakerInterest) { + self.waker.wake(i); } pub(crate) fn start_accept( @@ -303,23 +288,22 @@ impl Accept { } #[cfg(target_os = "windows")] - fn register(&self, token: usize, info: &ServerSocketInfo) -> io::Result<()> { + fn register(&self, token: usize, info: &mut ServerSocketInfo) -> io::Result<()> { // On windows, calling register without deregister cause an error. // See https://github.com/actix/actix-web/issues/905 // Calling reregister seems to fix the issue. self.poll + .registry() .register( - &info.sock, + &mut info.sock, mio::Token(token + DELTA), - mio::Ready::readable(), - mio::PollOpt::edge(), + Interest::READABLE, ) .or_else(|_| { - self.poll.reregister( - &info.sock, + self.poll.registry().reregister( + &mut info.sock, mio::Token(token + DELTA), - mio::Ready::readable(), - mio::PollOpt::edge(), + Interest::READABLE, ) }) } diff --git a/actix-server/src/builder.rs b/actix-server/src/builder.rs index b5a9d298..e51ed8f7 100644 --- a/actix-server/src/builder.rs +++ b/actix-server/src/builder.rs @@ -14,13 +14,13 @@ use futures_util::{future::Future, ready, stream::Stream, FutureExt, StreamExt}; use log::{error, info}; use socket2::{Domain, Protocol, Socket, Type}; -use crate::accept::{AcceptLoop, AcceptNotify}; +use crate::accept::AcceptLoop; use crate::config::{ConfiguredService, ServiceConfig}; use crate::server::{Server, ServerCommand}; use crate::service::{InternalServiceFactory, ServiceFactory, StreamNewService}; use crate::signals::{Signal, Signals}; use crate::socket::StdListener; -use crate::waker_queue::WakerInterest; +use crate::waker_queue::{WakerInterest, WakerQueue}; use crate::worker::{self, Worker, WorkerAvailability, WorkerClient}; use crate::Token; @@ -266,7 +266,7 @@ impl ServerBuilder { // start workers let workers = (0..self.threads) .map(|idx| { - let worker = self.start_worker(idx, self.accept.get_accept_notify()); + let worker = self.start_worker(idx, self.accept.waker_owned()); self.workers.push((idx, worker.clone())); worker @@ -297,8 +297,8 @@ impl ServerBuilder { } } - fn start_worker(&self, idx: usize, notify: AcceptNotify) -> WorkerClient { - let avail = WorkerAvailability::new(notify); + fn start_worker(&self, idx: usize, waker: WakerQueue) -> WorkerClient { + let avail = WorkerAvailability::new(waker); let services: Vec> = self.services.iter().map(|v| v.clone_factory()).collect(); @@ -433,7 +433,7 @@ impl ServerBuilder { break; } - let worker = self.start_worker(new_idx, self.accept.get_accept_notify()); + let worker = self.start_worker(new_idx, self.accept.waker_owned()); self.workers.push((new_idx, worker.clone())); self.accept.wake_accept(WakerInterest::Worker(worker)); } diff --git a/actix-server/src/socket.rs b/actix-server/src/socket.rs index c3084484..afbf4637 100644 --- a/actix-server/src/socket.rs +++ b/actix-server/src/socket.rs @@ -1,5 +1,10 @@ use std::{fmt, io, net}; +#[cfg(unix)] +use std::os::unix::io::{FromRawFd, IntoRawFd}; +#[cfg(windows)] +use std::os::windows::io::{FromRawSocket, IntoRawSocket}; + use mio::event::Source; use mio::net::{ TcpListener as MioTcpListener, TcpStream as MioTcpStream, UnixListener as MioUnixListener, @@ -166,15 +171,15 @@ pub trait FromStream: AsyncRead + AsyncWrite + Sized { fn from_mio_stream(sock: MioStream) -> io::Result; } +// ToDo: This is a workaround and we need an efficient way to convert between mio and tokio stream +#[cfg(unix)] impl FromStream for TcpStream { fn from_mio_stream(sock: MioStream) -> io::Result { match sock { - MioStream::Tcp(stream) => { - // FixMe: this only works on unix. We possibly want TcpStream::new from tokio. - let raw = std::os::unix::io::IntoRawFd::into_raw_fd(stream); - TcpStream::from_std(unsafe { std::os::unix::io::FromRawFd::from_raw_fd(raw) }) + MioStream::Tcp(mio) => { + let raw = IntoRawFd::into_raw_fd(mio); + TcpStream::from_std(unsafe { FromRawFd::from_raw_fd(raw) }) } - #[cfg(all(unix))] MioStream::Uds(_) => { panic!("Should not happen, bug in server impl"); } @@ -182,15 +187,30 @@ impl FromStream for TcpStream { } } -#[cfg(all(unix))] -impl FromStream for UnixStream { +// ToDo: This is a workaround and we need an efficient way to convert between mio and tokio stream +#[cfg(windows)] +impl FromStream for TcpStream { fn from_mio_stream(sock: MioStream) -> io::Result { match sock { - MioStream::Tcp(_) => panic!("Should not happen, bug in server impl"), - MioStream::Uds(stream) => { - // FixMe: this only works on unix. Like for TcpStream. - let raw = std::os::unix::io::IntoRawFd::into_raw_fd(stream); - UnixStream::from_std(unsafe { std::os::unix::io::FromRawFd::from_raw_fd(raw) }) + MioStream::Tcp(mio) => { + let raw = IntoRawSocket::into_raw_socket(mio); + TcpStream::from_std(unsafe { FromRawSocket::from_raw_socket(raw) }) + } + MioStream::Uds(_) => { + panic!("Should not happen, bug in server impl"); + } + } + } +} + +#[cfg(unix)] +impl FromStream for UnixStream { + fn from_mio_stream(sock: MioStream) -> io::Result { + match sock { + MioStream::Tcp(_) => panic!("Should not happen, bug in server impl"), + MioStream::Uds(mio) => { + let raw = IntoRawFd::into_raw_fd(mio); + UnixStream::from_std(unsafe { FromRawFd::from_raw_fd(raw) }) } } } diff --git a/actix-server/src/waker_queue.rs b/actix-server/src/waker_queue.rs index d7e40a95..06a2155e 100644 --- a/actix-server/src/waker_queue.rs +++ b/actix-server/src/waker_queue.rs @@ -4,8 +4,6 @@ use concurrent_queue::{ConcurrentQueue, PopError}; use mio::{Registry, Token as MioToken, Waker}; use crate::worker::WorkerClient; -use futures_util::core_reexport::fmt::Formatter; -use std::fmt::Debug; /// waker token for `mio::Poll` instance pub(crate) const WAKER_TOKEN: MioToken = MioToken(1); @@ -41,7 +39,7 @@ impl WakerQueue { (self.0).0.wake().expect("can not wake up Accept Poll"); } - /// pop an `Interests` from the back of the queue. + /// pop an `WakerInterest` from the back of the queue. pub(crate) fn pop(&self) -> Result { (self.0).1.pop() } @@ -65,21 +63,4 @@ pub(crate) enum WakerInterest { Worker(WorkerClient), } -impl Debug for WakerInterest { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - let mut f = f.debug_struct("WakerInterest"); - - match *self { - Self::Notify => f.field("type", &"notify"), - Self::Pause => f.field("type", &"pause"), - Self::Resume => f.field("type", &"resume"), - Self::Stop => f.field("type", &"stop"), - Self::Timer => f.field("type", &"timer"), - Self::Worker(_) => f.field("type", &"worker"), - }; - - f.finish() - } -} - pub(crate) type WakerQueueError = PopError; diff --git a/actix-server/src/worker.rs b/actix-server/src/worker.rs index 45d8d3aa..c879ca30 100644 --- a/actix-server/src/worker.rs +++ b/actix-server/src/worker.rs @@ -13,9 +13,9 @@ use futures_util::future::{join_all, LocalBoxFuture, MapOk}; use futures_util::{future::Future, stream::Stream, FutureExt, TryFutureExt}; use log::{error, info, trace}; -use crate::accept::AcceptNotify; use crate::service::{BoxedServerService, InternalServiceFactory, ServerMessage}; use crate::socket::{MioStream, SocketAddr}; +use crate::waker_queue::{WakerInterest, WakerQueue}; use crate::Token; pub(crate) struct WorkerCommand(Conn); @@ -97,14 +97,14 @@ impl WorkerClient { #[derive(Clone)] pub(crate) struct WorkerAvailability { - notify: AcceptNotify, + waker: WakerQueue, available: Arc, } impl WorkerAvailability { - pub fn new(notify: AcceptNotify) -> Self { + pub fn new(waker: WakerQueue) -> Self { WorkerAvailability { - notify, + waker, available: Arc::new(AtomicBool::new(false)), } } @@ -116,7 +116,7 @@ impl WorkerAvailability { pub fn set(&self, val: bool) { let old = self.available.swap(val, Ordering::Release); if !old && val { - self.notify.notify() + self.waker.wake(WakerInterest::Notify); } } } diff --git a/actix-server/tests/test_server.rs b/actix-server/tests/test_server.rs index 7217aab2..0b3ebb40 100644 --- a/actix-server/tests/test_server.rs +++ b/actix-server/tests/test_server.rs @@ -24,7 +24,6 @@ fn test_bind() { let h = thread::spawn(move || { let sys = actix_rt::System::new("test"); let srv = Server::build() - .workers(1) .disable_signals() .bind("test", addr, move || fn_service(|_| ok::<_, ()>(()))) .unwrap()