add mio stream convertion for windows

This commit is contained in:
fakeshadow 2020-10-19 12:48:01 +08:00
parent 80098d2906
commit 6ee906335a
13 changed files with 71 additions and 80 deletions

View File

@ -23,4 +23,4 @@ futures-sink = { version = "0.3.4", default-features = false }
log = "0.4"
pin-project = "0.4.17"
tokio = "0.3.0"
tokio-util = { version = "0.3.1", default-features = false, features = ["codec"] }
tokio-util = { version = "0.4.0", default-features = false, features = ["codec"] }

View File

@ -222,7 +222,7 @@ impl<T, U> Framed<T, U> {
}
// FixMe: This must be fixed as `poll_read_buf` is removed
// let cnt = match this.io.poll_read_buf(cx, &mut this.read_buf) {
// let cnt = match this.io.poll_read(cx, &mut this.read_buf) {
// Poll::Pending => return Poll::Pending,
// Poll::Ready(Err(e)) => return Poll::Ready(Some(Err(e.into()))),
// Poll::Ready(Ok(cnt)) => cnt,

View File

@ -6,6 +6,10 @@
* Add `System::attach_to_tokio` method. [#173]
### Changed
* update tokio to 0.3
* Remove `'static` lifetime requirement for `Runtime::block_on`. The method would accept a &Self when calling.
## [1.1.1] - 2020-04-30
### Fixed

View File

@ -112,7 +112,7 @@ impl Arbiter {
let handle = thread::Builder::new()
.name(name.clone())
.spawn(move || {
let mut rt = Runtime::new().expect("Can not create Runtime");
let rt = Runtime::new().expect("Can not create Runtime");
let arb = Arbiter::with_sender(arb_tx);
let (stop, stop_rx) = channel();

View File

@ -97,7 +97,7 @@ impl Builder {
// system arbiter
let arb = SystemArbiter::new(stop_tx, sys_receiver);
let mut rt = Runtime::new().unwrap();
let rt = Runtime::new().unwrap();
rt.spawn(arb);
// init system arbiter and run configuration method
@ -157,7 +157,7 @@ impl SystemRunner {
/// This function will start event loop and will finish once the
/// `System::stop()` function is called.
pub fn run(self) -> io::Result<()> {
let SystemRunner { mut rt, stop, .. } = self;
let SystemRunner { rt, stop, .. } = self;
// run loop
Arbiter::run_system(Some(&rt));

View File

@ -81,10 +81,10 @@ impl Runtime {
///
/// The caller is responsible for ensuring that other spawned futures
/// complete execution by calling `block_on` or `run`.
pub fn block_on<F>(&mut self, f: F) -> F::Output
pub fn block_on<F>(&self, f: F) -> F::Output
where
F: Future + 'static,
F: Future,
{
self.local.block_on(&mut self.rt, f)
self.local.block_on(&self.rt, f)
}
}

View File

@ -1,7 +1,10 @@
# Changes
## Unreleased - 2020-xx-xx
* Update mio to 0.7.3
* Use `concurrent-queue` to manage poll wakes instead of `futures::channel::mpsc`.
* Remove `AcceptNotify` type and pass `WakerQueue` to `WorkerClient` for notify the `Accept` more directly.
* Convert `mio::Stream` to `actix_rt::net::TcpStream`(`UnixStream` for uds) using `FromRawFd` and `IntoRawFd`(`IntoRawSocket` and `FromRawSocket` on windows).
## 1.0.4 - 2020-09-12
* Update actix-codec to 0.3.0.

View File

@ -21,21 +21,6 @@ struct ServerSocketInfo {
timeout: Option<Instant>,
}
/// accept notify would clone waker queue from accept loop and use it to push new interest and wake
/// up the accept poll.
#[derive(Clone)]
pub(crate) struct AcceptNotify(WakerQueue);
impl AcceptNotify {
pub(crate) fn new(waker: WakerQueue) -> Self {
Self(waker)
}
pub(crate) fn notify(&self) {
self.0.wake(WakerInterest::Notify);
}
}
/// Accept loop would live with `ServerBuilder`.
///
/// It's tasked with construct `Poll` instance and `WakerQueue` which would be distributed to
@ -63,12 +48,12 @@ impl AcceptLoop {
}
}
pub fn wake_accept(&self, i: WakerInterest) {
self.waker.wake(i);
pub(crate) fn waker_owned(&self) -> WakerQueue {
self.waker.clone()
}
pub fn get_accept_notify(&self) -> AcceptNotify {
AcceptNotify::new(self.waker.clone())
pub fn wake_accept(&self, i: WakerInterest) {
self.waker.wake(i);
}
pub(crate) fn start_accept(
@ -303,23 +288,22 @@ impl Accept {
}
#[cfg(target_os = "windows")]
fn register(&self, token: usize, info: &ServerSocketInfo) -> io::Result<()> {
fn register(&self, token: usize, info: &mut ServerSocketInfo) -> io::Result<()> {
// On windows, calling register without deregister cause an error.
// See https://github.com/actix/actix-web/issues/905
// Calling reregister seems to fix the issue.
self.poll
.registry()
.register(
&info.sock,
&mut info.sock,
mio::Token(token + DELTA),
mio::Ready::readable(),
mio::PollOpt::edge(),
Interest::READABLE,
)
.or_else(|_| {
self.poll.reregister(
&info.sock,
self.poll.registry().reregister(
&mut info.sock,
mio::Token(token + DELTA),
mio::Ready::readable(),
mio::PollOpt::edge(),
Interest::READABLE,
)
})
}

View File

@ -14,13 +14,13 @@ use futures_util::{future::Future, ready, stream::Stream, FutureExt, StreamExt};
use log::{error, info};
use socket2::{Domain, Protocol, Socket, Type};
use crate::accept::{AcceptLoop, AcceptNotify};
use crate::accept::AcceptLoop;
use crate::config::{ConfiguredService, ServiceConfig};
use crate::server::{Server, ServerCommand};
use crate::service::{InternalServiceFactory, ServiceFactory, StreamNewService};
use crate::signals::{Signal, Signals};
use crate::socket::StdListener;
use crate::waker_queue::WakerInterest;
use crate::waker_queue::{WakerInterest, WakerQueue};
use crate::worker::{self, Worker, WorkerAvailability, WorkerClient};
use crate::Token;
@ -266,7 +266,7 @@ impl ServerBuilder {
// start workers
let workers = (0..self.threads)
.map(|idx| {
let worker = self.start_worker(idx, self.accept.get_accept_notify());
let worker = self.start_worker(idx, self.accept.waker_owned());
self.workers.push((idx, worker.clone()));
worker
@ -297,8 +297,8 @@ impl ServerBuilder {
}
}
fn start_worker(&self, idx: usize, notify: AcceptNotify) -> WorkerClient {
let avail = WorkerAvailability::new(notify);
fn start_worker(&self, idx: usize, waker: WakerQueue) -> WorkerClient {
let avail = WorkerAvailability::new(waker);
let services: Vec<Box<dyn InternalServiceFactory>> =
self.services.iter().map(|v| v.clone_factory()).collect();
@ -433,7 +433,7 @@ impl ServerBuilder {
break;
}
let worker = self.start_worker(new_idx, self.accept.get_accept_notify());
let worker = self.start_worker(new_idx, self.accept.waker_owned());
self.workers.push((new_idx, worker.clone()));
self.accept.wake_accept(WakerInterest::Worker(worker));
}

View File

@ -1,5 +1,10 @@
use std::{fmt, io, net};
#[cfg(unix)]
use std::os::unix::io::{FromRawFd, IntoRawFd};
#[cfg(windows)]
use std::os::windows::io::{FromRawSocket, IntoRawSocket};
use mio::event::Source;
use mio::net::{
TcpListener as MioTcpListener, TcpStream as MioTcpStream, UnixListener as MioUnixListener,
@ -166,15 +171,15 @@ pub trait FromStream: AsyncRead + AsyncWrite + Sized {
fn from_mio_stream(sock: MioStream) -> io::Result<Self>;
}
// ToDo: This is a workaround and we need an efficient way to convert between mio and tokio stream
#[cfg(unix)]
impl FromStream for TcpStream {
fn from_mio_stream(sock: MioStream) -> io::Result<Self> {
match sock {
MioStream::Tcp(stream) => {
// FixMe: this only works on unix. We possibly want TcpStream::new from tokio.
let raw = std::os::unix::io::IntoRawFd::into_raw_fd(stream);
TcpStream::from_std(unsafe { std::os::unix::io::FromRawFd::from_raw_fd(raw) })
MioStream::Tcp(mio) => {
let raw = IntoRawFd::into_raw_fd(mio);
TcpStream::from_std(unsafe { FromRawFd::from_raw_fd(raw) })
}
#[cfg(all(unix))]
MioStream::Uds(_) => {
panic!("Should not happen, bug in server impl");
}
@ -182,15 +187,30 @@ impl FromStream for TcpStream {
}
}
#[cfg(all(unix))]
impl FromStream for UnixStream {
// ToDo: This is a workaround and we need an efficient way to convert between mio and tokio stream
#[cfg(windows)]
impl FromStream for TcpStream {
fn from_mio_stream(sock: MioStream) -> io::Result<Self> {
match sock {
MioStream::Tcp(_) => panic!("Should not happen, bug in server impl"),
MioStream::Uds(stream) => {
// FixMe: this only works on unix. Like for TcpStream.
let raw = std::os::unix::io::IntoRawFd::into_raw_fd(stream);
UnixStream::from_std(unsafe { std::os::unix::io::FromRawFd::from_raw_fd(raw) })
MioStream::Tcp(mio) => {
let raw = IntoRawSocket::into_raw_socket(mio);
TcpStream::from_std(unsafe { FromRawSocket::from_raw_socket(raw) })
}
MioStream::Uds(_) => {
panic!("Should not happen, bug in server impl");
}
}
}
}
#[cfg(unix)]
impl FromStream for UnixStream {
fn from_mio_stream(sock: MioStream) -> io::Result<Self> {
match sock {
MioStream::Tcp(_) => panic!("Should not happen, bug in server impl"),
MioStream::Uds(mio) => {
let raw = IntoRawFd::into_raw_fd(mio);
UnixStream::from_std(unsafe { FromRawFd::from_raw_fd(raw) })
}
}
}

View File

@ -4,8 +4,6 @@ use concurrent_queue::{ConcurrentQueue, PopError};
use mio::{Registry, Token as MioToken, Waker};
use crate::worker::WorkerClient;
use futures_util::core_reexport::fmt::Formatter;
use std::fmt::Debug;
/// waker token for `mio::Poll` instance
pub(crate) const WAKER_TOKEN: MioToken = MioToken(1);
@ -41,7 +39,7 @@ impl WakerQueue {
(self.0).0.wake().expect("can not wake up Accept Poll");
}
/// pop an `Interests` from the back of the queue.
/// pop an `WakerInterest` from the back of the queue.
pub(crate) fn pop(&self) -> Result<WakerInterest, WakerQueueError> {
(self.0).1.pop()
}
@ -65,21 +63,4 @@ pub(crate) enum WakerInterest {
Worker(WorkerClient),
}
impl Debug for WakerInterest {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
let mut f = f.debug_struct("WakerInterest");
match *self {
Self::Notify => f.field("type", &"notify"),
Self::Pause => f.field("type", &"pause"),
Self::Resume => f.field("type", &"resume"),
Self::Stop => f.field("type", &"stop"),
Self::Timer => f.field("type", &"timer"),
Self::Worker(_) => f.field("type", &"worker"),
};
f.finish()
}
}
pub(crate) type WakerQueueError = PopError;

View File

@ -13,9 +13,9 @@ use futures_util::future::{join_all, LocalBoxFuture, MapOk};
use futures_util::{future::Future, stream::Stream, FutureExt, TryFutureExt};
use log::{error, info, trace};
use crate::accept::AcceptNotify;
use crate::service::{BoxedServerService, InternalServiceFactory, ServerMessage};
use crate::socket::{MioStream, SocketAddr};
use crate::waker_queue::{WakerInterest, WakerQueue};
use crate::Token;
pub(crate) struct WorkerCommand(Conn);
@ -97,14 +97,14 @@ impl WorkerClient {
#[derive(Clone)]
pub(crate) struct WorkerAvailability {
notify: AcceptNotify,
waker: WakerQueue,
available: Arc<AtomicBool>,
}
impl WorkerAvailability {
pub fn new(notify: AcceptNotify) -> Self {
pub fn new(waker: WakerQueue) -> Self {
WorkerAvailability {
notify,
waker,
available: Arc::new(AtomicBool::new(false)),
}
}
@ -116,7 +116,7 @@ impl WorkerAvailability {
pub fn set(&self, val: bool) {
let old = self.available.swap(val, Ordering::Release);
if !old && val {
self.notify.notify()
self.waker.wake(WakerInterest::Notify);
}
}
}

View File

@ -24,7 +24,6 @@ fn test_bind() {
let h = thread::spawn(move || {
let sys = actix_rt::System::new("test");
let srv = Server::build()
.workers(1)
.disable_signals()
.bind("test", addr, move || fn_service(|_| ok::<_, ()>(())))
.unwrap()