mirror of https://github.com/fafhrd91/actix-net
Merge remote-tracking branch 'semtexzv/master' into std-future
* semtexzv/master: Migrate actix-codec, actix-rt, and actix-threadpool to std::future
This commit is contained in:
commit
43c04fc041
|
@ -12,3 +12,5 @@ guide/build/
|
|||
|
||||
# These are backup files generated by rustfmt
|
||||
**/*.rs.bk
|
||||
|
||||
.idea
|
||||
|
|
|
@ -19,7 +19,8 @@ path = "src/lib.rs"
|
|||
|
||||
[dependencies]
|
||||
bytes = "0.4.12"
|
||||
futures = "0.1.24"
|
||||
tokio-io = "0.1.12"
|
||||
tokio-codec = "0.1.1"
|
||||
pin-utils = "0.1.0-alpha.4"
|
||||
futures = { package = "futures-preview", version = "0.3.0-alpha.18" }
|
||||
tokio-io = "0.2.0-alpha.4"
|
||||
tokio-codec = "0.2.0-alpha.4"
|
||||
log = "0.4"
|
|
@ -4,12 +4,14 @@ use std::fmt;
|
|||
use std::io::{self, Read, Write};
|
||||
|
||||
use bytes::BytesMut;
|
||||
use futures::{Poll, Sink, StartSend, Stream};
|
||||
use futures::{Poll, Sink, Stream};
|
||||
use tokio_codec::{Decoder, Encoder};
|
||||
use tokio_io::{AsyncRead, AsyncWrite};
|
||||
|
||||
use super::framed_read::{framed_read2, framed_read2_with_buffer, FramedRead2};
|
||||
use super::framed_write::{framed_write2, framed_write2_with_buffer, FramedWrite2};
|
||||
use std::pin::Pin;
|
||||
use std::task::Context;
|
||||
|
||||
const LW: usize = 1024;
|
||||
const HW: usize = 8 * 1024;
|
||||
|
@ -221,41 +223,43 @@ impl<T, U> Framed<T, U> {
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
impl<T, U> Stream for Framed<T, U>
|
||||
where
|
||||
T: AsyncRead,
|
||||
U: Decoder,
|
||||
{
|
||||
type Item = U::Item;
|
||||
type Error = U::Error;
|
||||
type Item = Result<U::Item,U::Error>;
|
||||
|
||||
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
|
||||
self.inner.poll()
|
||||
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||
unsafe { self.map_unchecked_mut(|s| &mut s.inner).poll_next(cx )}
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
impl<T, U> Sink for Framed<T, U>
|
||||
impl<T, U> Sink<U::Item> for Framed<T, U>
|
||||
where
|
||||
T: AsyncWrite,
|
||||
U: Encoder,
|
||||
U::Error: From<io::Error>,
|
||||
{
|
||||
type SinkItem = U::Item;
|
||||
type SinkError = U::Error;
|
||||
type Error = U::Error;
|
||||
|
||||
fn start_send(
|
||||
&mut self,
|
||||
item: Self::SinkItem,
|
||||
) -> StartSend<Self::SinkItem, Self::SinkError> {
|
||||
self.inner.get_mut().start_send(item)
|
||||
fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
unsafe { self.map_unchecked_mut(|s| s.inner.get_mut()).poll_ready(cx)}
|
||||
}
|
||||
|
||||
fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
|
||||
self.inner.get_mut().poll_complete()
|
||||
fn start_send(self: Pin<&mut Self>, item: <U as Encoder>::Item) -> Result<(), Self::Error> {
|
||||
unsafe { self.map_unchecked_mut(|s| s.inner.get_mut()).start_send(item)}
|
||||
}
|
||||
|
||||
fn close(&mut self) -> Poll<(), Self::SinkError> {
|
||||
self.inner.get_mut().close()
|
||||
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
unsafe { self.map_unchecked_mut(|s| s.inner.get_mut()).poll_flush(cx)}
|
||||
}
|
||||
|
||||
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
unsafe { self.map_unchecked_mut(|s| s.inner.get_mut()).poll_close(cx)}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -280,10 +284,16 @@ impl<T: Read, U> Read for Fuse<T, U> {
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
impl<T: AsyncRead, U> AsyncRead for Fuse<T, U> {
|
||||
|
||||
unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool {
|
||||
self.0.prepare_uninitialized_buffer(buf)
|
||||
}
|
||||
|
||||
fn poll_read(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<io::Result<usize>> {
|
||||
unsafe { self.map_unchecked_mut(|s| &mut s.0).poll_read(cx, buf)}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Write, U> Write for Fuse<T, U> {
|
||||
|
@ -296,12 +306,23 @@ impl<T: Write, U> Write for Fuse<T, U> {
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
impl<T: AsyncWrite, U> AsyncWrite for Fuse<T, U> {
|
||||
fn shutdown(&mut self) -> Poll<(), io::Error> {
|
||||
self.0.shutdown()
|
||||
|
||||
fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>> {
|
||||
unsafe { self.map_unchecked_mut(|s| &mut s.0).poll_write(cx, buf)}
|
||||
}
|
||||
|
||||
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
|
||||
unsafe { self.map_unchecked_mut(|s| &mut s.0).poll_flush(cx)}
|
||||
}
|
||||
|
||||
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
|
||||
unsafe { self.map_unchecked_mut(|s| &mut s.0).poll_shutdown(cx)}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
impl<T, U: Decoder> Decoder for Fuse<T, U> {
|
||||
type Item = U::Item;
|
||||
type Error = U::Error;
|
||||
|
|
|
@ -1,12 +1,14 @@
|
|||
use std::fmt;
|
||||
|
||||
use bytes::BytesMut;
|
||||
use futures::{try_ready, Async, Poll, Sink, StartSend, Stream};
|
||||
use futures::{Poll, Sink, Stream};
|
||||
use log::trace;
|
||||
use tokio_codec::Decoder;
|
||||
use tokio_io::AsyncRead;
|
||||
|
||||
use super::framed::Fuse;
|
||||
use std::pin::Pin;
|
||||
use std::task::Context;
|
||||
|
||||
/// A `Stream` of messages decoded from an `AsyncRead`.
|
||||
pub struct FramedRead<T, D> {
|
||||
|
@ -83,35 +85,35 @@ where
|
|||
T: AsyncRead,
|
||||
D: Decoder,
|
||||
{
|
||||
type Item = D::Item;
|
||||
type Error = D::Error;
|
||||
type Item = Result<D::Item,D::Error>;
|
||||
|
||||
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
|
||||
self.inner.poll()
|
||||
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||
unsafe { self.map_unchecked_mut(|s| &mut s.inner).poll_next(cx )}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T, D> Sink for FramedRead<T, D>
|
||||
impl<I, T, D> Sink<I> for FramedRead<T, D>
|
||||
where
|
||||
T: Sink,
|
||||
T: Sink<I>,
|
||||
{
|
||||
type SinkItem = T::SinkItem;
|
||||
type SinkError = T::SinkError;
|
||||
type Error = T::Error;
|
||||
|
||||
fn start_send(
|
||||
&mut self,
|
||||
item: Self::SinkItem,
|
||||
) -> StartSend<Self::SinkItem, Self::SinkError> {
|
||||
self.inner.inner.0.start_send(item)
|
||||
fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
unsafe { self.map_unchecked_mut(|s| &mut s.inner.inner.0).poll_ready(cx)}
|
||||
}
|
||||
|
||||
fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
|
||||
self.inner.inner.0.poll_complete()
|
||||
fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> {
|
||||
unsafe { self.map_unchecked_mut(|s| &mut s.inner.inner.0).start_send(item)}
|
||||
}
|
||||
|
||||
fn close(&mut self) -> Poll<(), Self::SinkError> {
|
||||
self.inner.inner.0.close()
|
||||
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
unsafe { self.map_unchecked_mut(|s| &mut s.inner.inner.0).poll_flush(cx)}
|
||||
}
|
||||
|
||||
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
unsafe { self.map_unchecked_mut(|s| &mut s.inner.inner.0).poll_close(cx)}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
impl<T, D> fmt::Debug for FramedRead<T, D>
|
||||
|
@ -174,46 +176,66 @@ impl<T> FramedRead2<T> {
|
|||
|
||||
impl<T> Stream for FramedRead2<T>
|
||||
where
|
||||
T: AsyncRead + Decoder,
|
||||
T: tokio_io::AsyncRead + Decoder,
|
||||
{
|
||||
type Item = T::Item;
|
||||
type Error = T::Error;
|
||||
type Item = Result<T::Item,T::Error>;
|
||||
|
||||
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
|
||||
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||
let mut this = unsafe { self.get_unchecked_mut() };
|
||||
loop {
|
||||
// Repeatedly call `decode` or `decode_eof` as long as it is
|
||||
// "readable". Readable is defined as not having returned `None`. If
|
||||
// the upstream has returned EOF, and the decoder is no longer
|
||||
// readable, it can be assumed that the decoder will never become
|
||||
// readable again, at which point the stream is terminated.
|
||||
if self.is_readable {
|
||||
if self.eof {
|
||||
let frame = self.inner.decode_eof(&mut self.buffer)?;
|
||||
return Ok(Async::Ready(frame));
|
||||
|
||||
if this.is_readable {
|
||||
if this.eof {
|
||||
match this.inner.decode_eof(&mut this.buffer) {
|
||||
Ok(Some(frame)) => return Poll::Ready(Some(Ok(frame))),
|
||||
Ok(None) => return Poll::Ready(None),
|
||||
Err(e) => return Poll::Ready(Some(Err(e))),
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
trace!("attempting to decode a frame");
|
||||
|
||||
if let Some(frame) = self.inner.decode(&mut self.buffer)? {
|
||||
trace!("frame decoded from buffer");
|
||||
return Ok(Async::Ready(Some(frame)));
|
||||
match this.inner.decode(&mut this.buffer) {
|
||||
Ok(Some(frame)) => {
|
||||
trace!("frame decoded from buffer");
|
||||
return Poll::Ready(Some(Ok(frame)));
|
||||
}
|
||||
Err(e) => {
|
||||
return Poll::Ready(Some(Err(e)))
|
||||
}
|
||||
_ => {
|
||||
// Need more data
|
||||
}
|
||||
}
|
||||
|
||||
self.is_readable = false;
|
||||
|
||||
this.is_readable = false;
|
||||
}
|
||||
|
||||
assert!(!self.eof);
|
||||
assert!(!this.eof);
|
||||
|
||||
// Otherwise, try to read more data and try again. Make sure we've
|
||||
// got room for at least one byte to read to ensure that we don't
|
||||
// get a spurious 0 that looks like EOF
|
||||
self.buffer.reserve(1);
|
||||
if 0 == try_ready!(self.inner.read_buf(&mut self.buffer)) {
|
||||
trace!("read 0 bytes, mark stream as eof");
|
||||
self.eof = true;
|
||||
}
|
||||
this.buffer.reserve(1);
|
||||
unsafe {
|
||||
|
||||
self.is_readable = true;
|
||||
match Pin::new_unchecked(&mut this.inner).poll_read(cx, &mut this.buffer) {
|
||||
Poll::Pending => return Poll::Pending,
|
||||
Poll::Ready(Err(e)) => return Poll::Ready(Some(Err(e.into()))),
|
||||
Poll::Ready(Ok(0)) => {
|
||||
this.eof = true;
|
||||
}
|
||||
Poll::Ready(Ok(_cnt)) => {}
|
||||
}
|
||||
}
|
||||
this.is_readable = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -2,12 +2,14 @@ use std::fmt;
|
|||
use std::io::{self, Read};
|
||||
|
||||
use bytes::BytesMut;
|
||||
use futures::{try_ready, Async, AsyncSink, Poll, Sink, StartSend, Stream};
|
||||
use futures::{ready,Poll, Sink, Stream};
|
||||
use log::trace;
|
||||
use tokio_codec::{Decoder, Encoder};
|
||||
use tokio_io::{AsyncRead, AsyncWrite};
|
||||
|
||||
use super::framed::Fuse;
|
||||
use std::task::Context;
|
||||
use std::pin::Pin;
|
||||
|
||||
/// A `Sink` of frames encoded to an `AsyncWrite`.
|
||||
pub struct FramedWrite<T, E> {
|
||||
|
@ -95,24 +97,27 @@ where
|
|||
}
|
||||
}
|
||||
|
||||
impl<T, E> Sink for FramedWrite<T, E>
|
||||
impl<T, E> Sink<E::Item> for FramedWrite<T, E>
|
||||
where
|
||||
T: AsyncWrite,
|
||||
E: Encoder,
|
||||
{
|
||||
type SinkItem = E::Item;
|
||||
type SinkError = E::Error;
|
||||
type Error = E::Error;
|
||||
|
||||
fn start_send(&mut self, item: E::Item) -> StartSend<E::Item, E::Error> {
|
||||
self.inner.start_send(item)
|
||||
fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
unsafe { self.map_unchecked_mut(|s| &mut s.inner).poll_ready(cx)}
|
||||
}
|
||||
|
||||
fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
|
||||
self.inner.poll_complete()
|
||||
fn start_send(self: Pin<&mut Self>, item: <E as Encoder>::Item) -> Result<(), Self::Error> {
|
||||
unsafe { self.map_unchecked_mut(|s| &mut s.inner).start_send(item)}
|
||||
}
|
||||
|
||||
fn close(&mut self) -> Poll<(), Self::SinkError> {
|
||||
Ok(self.inner.close()?)
|
||||
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
unsafe { self.map_unchecked_mut(|s| &mut s.inner).poll_flush(cx)}
|
||||
}
|
||||
|
||||
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
unsafe { self.map_unchecked_mut(|s| &mut s.inner).poll_close(cx)}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -121,10 +126,9 @@ where
|
|||
T: Stream,
|
||||
{
|
||||
type Item = T::Item;
|
||||
type Error = T::Error;
|
||||
|
||||
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
|
||||
self.inner.inner.0.poll()
|
||||
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||
unsafe { self.map_unchecked_mut(|s| &mut s.inner.inner.0).poll_next(cx)}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -220,13 +224,75 @@ where
|
|||
}
|
||||
}
|
||||
|
||||
impl<T> Sink for FramedWrite2<T>
|
||||
impl<T> Sink<T::Item> for FramedWrite2<T>
|
||||
where
|
||||
T: AsyncWrite + Encoder,
|
||||
{
|
||||
type SinkItem = T::Item;
|
||||
type SinkError = T::Error;
|
||||
type Error = T::Error;
|
||||
|
||||
fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
let len = self.buffer.len();
|
||||
if len >= self.high_watermark {
|
||||
return Poll::Pending;
|
||||
} else {
|
||||
return Poll::Ready(Ok(()));
|
||||
}
|
||||
}
|
||||
|
||||
fn start_send(self: Pin<&mut Self>, item: <T as Encoder>::Item) -> Result<(), Self::Error> {
|
||||
let this = unsafe { self.get_unchecked_mut() };
|
||||
// Check the buffer capacity
|
||||
let len = this.buffer.len();
|
||||
if len < this.low_watermark {
|
||||
this.buffer.reserve(this.high_watermark - len)
|
||||
}
|
||||
|
||||
this.inner.encode(item, &mut this.buffer)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
let this = unsafe { self.get_unchecked_mut() };
|
||||
trace!("flushing framed transport");
|
||||
|
||||
while !this.buffer.is_empty() {
|
||||
trace!("writing; remaining={}", this.buffer.len());
|
||||
|
||||
let n = ready!(unsafe {Pin::new_unchecked(&mut this.inner)}.poll_write(cx, &this.buffer))?;
|
||||
|
||||
if n == 0 {
|
||||
return Poll::Ready(Err(io::Error::new(
|
||||
io::ErrorKind::WriteZero,
|
||||
"failed to \
|
||||
write frame to transport",
|
||||
)
|
||||
.into()))
|
||||
}
|
||||
|
||||
// TODO: Add a way to `bytes` to do this w/o returning the drained
|
||||
// data.
|
||||
let _ = this.buffer.split_to(n);
|
||||
}
|
||||
|
||||
// Try flushing the underlying IO
|
||||
ready!(unsafe {Pin::new_unchecked(&mut this.inner)}.poll_flush(cx))?;
|
||||
|
||||
trace!("framed transport flushed");
|
||||
Poll::Ready(Ok(()))
|
||||
}
|
||||
|
||||
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
let mut this = unsafe { self.get_unchecked_mut() };
|
||||
ready!(unsafe {Pin::new_unchecked(&mut this).map_unchecked_mut(|s|*s)}.poll_flush(cx))?;
|
||||
ready!(unsafe {Pin::new_unchecked(&mut this.inner)}.poll_shutdown(cx))?;
|
||||
|
||||
Poll::Ready(Ok(()))
|
||||
}
|
||||
|
||||
|
||||
|
||||
/*
|
||||
fn start_send(&mut self, item: T::Item) -> StartSend<T::Item, T::Error> {
|
||||
// Check the buffer capacity
|
||||
let len = self.buffer.len();
|
||||
|
@ -275,6 +341,7 @@ where
|
|||
try_ready!(self.poll_complete());
|
||||
Ok(self.inner.shutdown()?)
|
||||
}
|
||||
*/
|
||||
}
|
||||
|
||||
impl<T: Decoder> Decoder for FramedWrite2<T> {
|
||||
|
@ -300,4 +367,8 @@ impl<T: AsyncRead> AsyncRead for FramedWrite2<T> {
|
|||
unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool {
|
||||
self.inner.prepare_uninitialized_buffer(buf)
|
||||
}
|
||||
|
||||
fn poll_read(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<io::Result<usize>> {
|
||||
unsafe { self.map_unchecked_mut(|s| &mut s.inner).poll_read(cx, buf)}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -21,4 +21,5 @@ pub use self::framed_read::FramedRead;
|
|||
pub use self::framed_write::FramedWrite;
|
||||
|
||||
pub use tokio_codec::{Decoder, Encoder};
|
||||
// TODO: Migrate to futures asyncRead
|
||||
pub use tokio_io::{AsyncRead, AsyncWrite};
|
||||
|
|
|
@ -19,9 +19,13 @@ path = "src/lib.rs"
|
|||
|
||||
[dependencies]
|
||||
actix-threadpool = "0.1.1"
|
||||
futures = "0.1.25"
|
||||
tokio-current-thread = "0.1"
|
||||
tokio-executor = "0.1.5"
|
||||
tokio-reactor = "0.1.7"
|
||||
tokio-timer = "0.2.8"
|
||||
futures = { package = "futures-preview", version = "0.3.0-alpha.18" }
|
||||
|
||||
|
||||
# TODO: Replace this with dependency on tokio-runtime once it is ready
|
||||
tokio = { version = "0.2.0-alpha.4" }
|
||||
tokio-timer = "=0.3.0-alpha.4"
|
||||
tokio-executor = "=0.2.0-alpha.4"
|
||||
tokio-net = "=0.2.0-alpha.4"
|
||||
|
||||
copyless = "0.1.4"
|
||||
|
|
|
@ -3,11 +3,13 @@ use std::cell::{Cell, RefCell};
|
|||
use std::collections::HashMap;
|
||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
use std::{fmt, thread};
|
||||
use std::pin::Pin;
|
||||
use std::task::Context;
|
||||
|
||||
use futures::sync::mpsc::{unbounded, UnboundedReceiver, UnboundedSender};
|
||||
use futures::sync::oneshot::{channel, Canceled, Sender};
|
||||
use futures::{future, Async, Future, IntoFuture, Poll, Stream};
|
||||
use tokio_current_thread::spawn;
|
||||
use futures::channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender};
|
||||
use futures::channel::oneshot::{channel, Canceled, Sender};
|
||||
use futures::{future, Future, Poll, FutureExt, Stream};
|
||||
use tokio::runtime::current_thread::spawn;
|
||||
|
||||
use crate::builder::Builder;
|
||||
use crate::system::System;
|
||||
|
@ -17,7 +19,7 @@ use copyless::BoxHelper;
|
|||
thread_local!(
|
||||
static ADDR: RefCell<Option<Arbiter>> = RefCell::new(None);
|
||||
static RUNNING: Cell<bool> = Cell::new(false);
|
||||
static Q: RefCell<Vec<Box<dyn Future<Item = (), Error = ()>>>> = RefCell::new(Vec::new());
|
||||
static Q: RefCell<Vec<Box<dyn Future<Output = ()>>>> = RefCell::new(Vec::new());
|
||||
static STORAGE: RefCell<HashMap<TypeId, Box<dyn Any>>> = RefCell::new(HashMap::new());
|
||||
);
|
||||
|
||||
|
@ -25,7 +27,7 @@ pub(crate) static COUNT: AtomicUsize = AtomicUsize::new(0);
|
|||
|
||||
pub(crate) enum ArbiterCommand {
|
||||
Stop,
|
||||
Execute(Box<dyn Future<Item = (), Error = ()> + Send>),
|
||||
Execute(Box<dyn Future<Output=()> + Unpin + Send>),
|
||||
ExecuteFn(Box<dyn FnExec>),
|
||||
}
|
||||
|
||||
|
@ -129,7 +131,9 @@ impl Arbiter {
|
|||
Q.with(|cell| {
|
||||
let mut v = cell.borrow_mut();
|
||||
for fut in v.drain(..) {
|
||||
spawn(fut);
|
||||
// We pin the boxed future, so it can never again be moved.
|
||||
let fut = unsafe { Pin::new_unchecked(fut) };
|
||||
tokio_executor::current_thread::spawn( fut);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
@ -142,14 +146,19 @@ impl Arbiter {
|
|||
/// or Arbiter address, it is simply a helper for spawning futures on the current
|
||||
/// thread.
|
||||
pub fn spawn<F>(future: F)
|
||||
where
|
||||
F: Future<Item = (), Error = ()> + 'static,
|
||||
where
|
||||
F: Future<Output=()> + 'static,
|
||||
{
|
||||
RUNNING.with(move |cell| {
|
||||
if cell.get() {
|
||||
spawn(Box::alloc().init(future));
|
||||
// Spawn the future on running executor
|
||||
spawn(future);
|
||||
} else {
|
||||
Q.with(move |cell| cell.borrow_mut().push(Box::alloc().init(future)));
|
||||
// Box the future and push it to the queue, this results in double boxing
|
||||
// because the executor boxes the future again, but works for now
|
||||
Q.with(move |cell| {
|
||||
cell.borrow_mut().push(Box::alloc().init(future))
|
||||
});
|
||||
}
|
||||
});
|
||||
}
|
||||
|
@ -158,17 +167,17 @@ impl Arbiter {
|
|||
/// or Arbiter address, it is simply a helper for executing futures on the current
|
||||
/// thread.
|
||||
pub fn spawn_fn<F, R>(f: F)
|
||||
where
|
||||
F: FnOnce() -> R + 'static,
|
||||
R: IntoFuture<Item = (), Error = ()> + 'static,
|
||||
where
|
||||
F: FnOnce() -> R + 'static,
|
||||
R: Future<Output=()> + 'static,
|
||||
{
|
||||
Arbiter::spawn(future::lazy(f))
|
||||
Arbiter::spawn(future::lazy(|_| f()).flatten())
|
||||
}
|
||||
|
||||
/// Send a future to the Arbiter's thread, and spawn it.
|
||||
pub fn send<F>(&self, future: F)
|
||||
where
|
||||
F: Future<Item = (), Error = ()> + Send + 'static,
|
||||
where
|
||||
F: Future<Output=()> + Send + Unpin + 'static,
|
||||
{
|
||||
let _ = self
|
||||
.0
|
||||
|
@ -178,8 +187,8 @@ impl Arbiter {
|
|||
/// Send a function to the Arbiter's thread, and execute it. Any result from the function
|
||||
/// is discarded.
|
||||
pub fn exec_fn<F>(&self, f: F)
|
||||
where
|
||||
F: FnOnce() + Send + 'static,
|
||||
where
|
||||
F: FnOnce() + Send + 'static,
|
||||
{
|
||||
let _ = self
|
||||
.0
|
||||
|
@ -191,10 +200,10 @@ impl Arbiter {
|
|||
/// Send a function to the Arbiter's thread. This function will be executed asynchronously.
|
||||
/// A future is created, and when resolved will contain the result of the function sent
|
||||
/// to the Arbiters thread.
|
||||
pub fn exec<F, R>(&self, f: F) -> impl Future<Item = R, Error = Canceled>
|
||||
where
|
||||
F: FnOnce() -> R + Send + 'static,
|
||||
R: Send + 'static,
|
||||
pub fn exec<F, R>(&self, f: F) -> impl Future<Output=Result<R, Canceled>>
|
||||
where
|
||||
F: FnOnce() -> R + Send + 'static,
|
||||
R: Send + 'static,
|
||||
{
|
||||
let (tx, rx) = channel();
|
||||
let _ = self
|
||||
|
@ -221,8 +230,8 @@ impl Arbiter {
|
|||
///
|
||||
/// Panics is item is not inserted
|
||||
pub fn get_item<T: 'static, F, R>(mut f: F) -> R
|
||||
where
|
||||
F: FnMut(&T) -> R,
|
||||
where
|
||||
F: FnMut(&T) -> R,
|
||||
{
|
||||
STORAGE.with(move |cell| {
|
||||
let st = cell.borrow();
|
||||
|
@ -238,8 +247,8 @@ impl Arbiter {
|
|||
///
|
||||
/// Panics is item is not inserted
|
||||
pub fn get_mut_item<T: 'static, F, R>(mut f: F) -> R
|
||||
where
|
||||
F: FnMut(&mut T) -> R,
|
||||
where
|
||||
F: FnMut(&mut T) -> R,
|
||||
{
|
||||
STORAGE.with(move |cell| {
|
||||
let mut st = cell.borrow_mut();
|
||||
|
@ -271,28 +280,33 @@ impl Drop for ArbiterController {
|
|||
}
|
||||
|
||||
impl Future for ArbiterController {
|
||||
type Item = ();
|
||||
type Error = ();
|
||||
type Output = ();
|
||||
|
||||
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
loop {
|
||||
match self.rx.poll() {
|
||||
Ok(Async::Ready(None)) | Err(_) => return Ok(Async::Ready(())),
|
||||
Ok(Async::Ready(Some(item))) => match item {
|
||||
ArbiterCommand::Stop => {
|
||||
if let Some(stop) = self.stop.take() {
|
||||
let _ = stop.send(0);
|
||||
};
|
||||
return Ok(Async::Ready(()));
|
||||
}
|
||||
ArbiterCommand::Execute(fut) => {
|
||||
spawn(fut);
|
||||
}
|
||||
ArbiterCommand::ExecuteFn(f) => {
|
||||
f.call_box();
|
||||
}
|
||||
match unsafe { self.as_mut().map_unchecked_mut(|p| &mut p.rx) }.poll_next(cx) {
|
||||
Poll::Ready(None) => {
|
||||
return Poll::Ready(())
|
||||
},
|
||||
Poll::Ready(Some(item)) => {
|
||||
match item {
|
||||
ArbiterCommand::Stop => {
|
||||
if let Some(stop) = self.stop.take() {
|
||||
let _ = stop.send(0);
|
||||
};
|
||||
return Poll::Ready(());
|
||||
}
|
||||
ArbiterCommand::Execute(fut) => {
|
||||
spawn(fut);
|
||||
}
|
||||
ArbiterCommand::ExecuteFn(f) => {
|
||||
f.call_box();
|
||||
}
|
||||
}
|
||||
}
|
||||
Poll::Pending => {
|
||||
return Poll::Pending
|
||||
},
|
||||
Ok(Async::NotReady) => return Ok(Async::NotReady),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -323,14 +337,13 @@ impl SystemArbiter {
|
|||
}
|
||||
|
||||
impl Future for SystemArbiter {
|
||||
type Item = ();
|
||||
type Error = ();
|
||||
type Output = ();
|
||||
|
||||
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
loop {
|
||||
match self.commands.poll() {
|
||||
Ok(Async::Ready(None)) | Err(_) => return Ok(Async::Ready(())),
|
||||
Ok(Async::Ready(Some(cmd))) => match cmd {
|
||||
match unsafe { self.as_mut().map_unchecked_mut(|p| &mut p.commands) }.poll_next(cx) {
|
||||
Poll::Ready(None) => return Poll::Ready(()),
|
||||
Poll::Ready(Some(cmd)) => match cmd {
|
||||
SystemCommand::Exit(code) => {
|
||||
// stop arbiters
|
||||
for arb in self.arbiters.values() {
|
||||
|
@ -348,7 +361,7 @@ impl Future for SystemArbiter {
|
|||
self.arbiters.remove(&name);
|
||||
}
|
||||
},
|
||||
Ok(Async::NotReady) => return Ok(Async::NotReady),
|
||||
Poll::Pending => return Poll::Pending,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -359,8 +372,8 @@ pub trait FnExec: Send + 'static {
|
|||
}
|
||||
|
||||
impl<F> FnExec for F
|
||||
where
|
||||
F: FnOnce() + Send + 'static,
|
||||
where
|
||||
F: FnOnce() + Send + 'static,
|
||||
{
|
||||
#[allow(clippy::boxed_local)]
|
||||
fn call_box(self: Box<Self>) {
|
||||
|
|
|
@ -1,19 +1,19 @@
|
|||
use std::borrow::Cow;
|
||||
use std::io;
|
||||
|
||||
use futures::future;
|
||||
use futures::{future, FutureExt};
|
||||
use futures::future::{lazy, Future};
|
||||
use futures::sync::mpsc::unbounded;
|
||||
use futures::sync::oneshot::{channel, Receiver};
|
||||
use futures::channel::mpsc::unbounded;
|
||||
use futures::channel::oneshot::{channel, Receiver};
|
||||
|
||||
use tokio_current_thread::{CurrentThread, Handle};
|
||||
use tokio_reactor::Reactor;
|
||||
use tokio_timer::clock::Clock;
|
||||
use tokio_timer::timer::Timer;
|
||||
use tokio::runtime::current_thread::Handle;
|
||||
use tokio_timer::{timer::Timer, clock::Clock};
|
||||
use tokio_net::driver::Reactor;
|
||||
|
||||
use crate::arbiter::{Arbiter, SystemArbiter};
|
||||
use crate::runtime::Runtime;
|
||||
use crate::system::System;
|
||||
use tokio_executor::current_thread::CurrentThread;
|
||||
|
||||
/// Builder struct for a actix runtime.
|
||||
///
|
||||
|
@ -118,7 +118,7 @@ impl Builder {
|
|||
rt.spawn(arb);
|
||||
|
||||
// init system arbiter and run configuration method
|
||||
let _ = rt.block_on(lazy(move || {
|
||||
let _ = rt.block_on(lazy(move |_| {
|
||||
f();
|
||||
Ok::<_, ()>(())
|
||||
}));
|
||||
|
@ -159,30 +159,30 @@ pub(crate) struct AsyncSystemRunner {
|
|||
impl AsyncSystemRunner {
|
||||
/// This function will start event loop and returns a future that
|
||||
/// resolves once the `System::stop()` function is called.
|
||||
pub(crate) fn run_nonblocking(self) -> impl Future<Item = (), Error = io::Error> + Send {
|
||||
pub(crate) fn run_nonblocking(self) -> impl Future<Output = Result<(),io::Error>> + Send {
|
||||
let AsyncSystemRunner { stop, .. } = self;
|
||||
|
||||
// run loop
|
||||
future::lazy(|| {
|
||||
future::lazy(|_| {
|
||||
Arbiter::run_system();
|
||||
stop.then(|res| match res {
|
||||
Ok(code) => {
|
||||
if code != 0 {
|
||||
Err(io::Error::new(
|
||||
io::ErrorKind::Other,
|
||||
format!("Non-zero exit code: {}", code),
|
||||
))
|
||||
} else {
|
||||
Ok(())
|
||||
async {
|
||||
let res = match stop.await {
|
||||
Ok(code) => {
|
||||
if code != 0 {
|
||||
Err(io::Error::new(
|
||||
io::ErrorKind::Other,
|
||||
format!("Non-zero exit code: {}", code),
|
||||
))
|
||||
} else {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)),
|
||||
})
|
||||
.then(|result| {
|
||||
Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)),
|
||||
};
|
||||
Arbiter::stop_system();
|
||||
result
|
||||
})
|
||||
})
|
||||
return res;
|
||||
}
|
||||
}).flatten()
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -202,10 +202,10 @@ impl SystemRunner {
|
|||
let SystemRunner { mut rt, stop, .. } = self;
|
||||
|
||||
// run loop
|
||||
let _ = rt.block_on(lazy(move || {
|
||||
let _ = rt.block_on(async {
|
||||
Arbiter::run_system();
|
||||
Ok::<_, ()>(())
|
||||
}));
|
||||
});
|
||||
let result = match rt.block_on(stop) {
|
||||
Ok(code) => {
|
||||
if code != 0 {
|
||||
|
@ -224,19 +224,19 @@ impl SystemRunner {
|
|||
}
|
||||
|
||||
/// Execute a future and wait for result.
|
||||
pub fn block_on<F, I, E>(&mut self, fut: F) -> Result<I, E>
|
||||
pub fn block_on<F, O>(&mut self, fut: F) -> O
|
||||
where
|
||||
F: Future<Item = I, Error = E>,
|
||||
F: Future<Output = O>,
|
||||
{
|
||||
let _ = self.rt.block_on(lazy(move || {
|
||||
let _ = self.rt.block_on(async {
|
||||
Arbiter::run_system();
|
||||
Ok::<_, ()>(())
|
||||
}));
|
||||
});
|
||||
|
||||
let res = self.rt.block_on(fut);
|
||||
let _ = self.rt.block_on(lazy(move || {
|
||||
let _ = self.rt.block_on(async {
|
||||
Arbiter::stop_system();
|
||||
Ok::<_, ()>(())
|
||||
}));
|
||||
});
|
||||
|
||||
res
|
||||
}
|
||||
}
|
||||
|
|
|
@ -20,7 +20,7 @@ pub use actix_threadpool as blocking;
|
|||
/// This function panics if actix system is not running.
|
||||
pub fn spawn<F>(f: F)
|
||||
where
|
||||
F: futures::Future<Item = (), Error = ()> + 'static,
|
||||
F: futures::Future<Output = ()> + 'static,
|
||||
{
|
||||
if !System::is_set() {
|
||||
panic!("System is not running");
|
||||
|
|
|
@ -2,11 +2,9 @@ use std::error::Error;
|
|||
use std::{fmt, io};
|
||||
|
||||
use futures::Future;
|
||||
use tokio_current_thread::{self as current_thread, CurrentThread};
|
||||
use tokio_executor;
|
||||
use tokio_reactor::{self, Reactor};
|
||||
use tokio_timer::clock::{self, Clock};
|
||||
use tokio_timer::timer::{self, Timer};
|
||||
use tokio_executor::current_thread::{self, CurrentThread};
|
||||
use tokio_timer::{timer::{self, Timer}, clock::Clock};
|
||||
use tokio_net::driver::{Reactor, Handle as ReactorHandle};
|
||||
|
||||
use crate::builder::Builder;
|
||||
|
||||
|
@ -18,7 +16,7 @@ use crate::builder::Builder;
|
|||
/// [mod]: index.html
|
||||
#[derive(Debug)]
|
||||
pub struct Runtime {
|
||||
reactor_handle: tokio_reactor::Handle,
|
||||
reactor_handle: ReactorHandle,
|
||||
timer_handle: timer::Handle,
|
||||
clock: Clock,
|
||||
executor: CurrentThread<Timer<Reactor>>,
|
||||
|
@ -53,7 +51,7 @@ impl Runtime {
|
|||
}
|
||||
|
||||
pub(super) fn new2(
|
||||
reactor_handle: tokio_reactor::Handle,
|
||||
reactor_handle: ReactorHandle,
|
||||
timer_handle: timer::Handle,
|
||||
clock: Clock,
|
||||
executor: CurrentThread<Timer<Reactor>>,
|
||||
|
@ -97,7 +95,7 @@ impl Runtime {
|
|||
/// is currently at capacity and is unable to spawn a new future.
|
||||
pub fn spawn<F>(&mut self, future: F) -> &mut Self
|
||||
where
|
||||
F: Future<Item = (), Error = ()> + 'static,
|
||||
F: Future<Output = (),> + 'static,
|
||||
{
|
||||
self.executor.spawn(future);
|
||||
self
|
||||
|
@ -119,14 +117,14 @@ impl Runtime {
|
|||
///
|
||||
/// The caller is responsible for ensuring that other spawned futures
|
||||
/// complete execution by calling `block_on` or `run`.
|
||||
pub fn block_on<F>(&mut self, f: F) -> Result<F::Item, F::Error>
|
||||
pub fn block_on<F>(&mut self, f: F) -> F::Output
|
||||
where
|
||||
F: Future,
|
||||
{
|
||||
self.enter(|executor| {
|
||||
// Run the provided future
|
||||
let ret = executor.block_on(f);
|
||||
ret.map_err(|e| e.into_inner().expect("unexpected execution error"))
|
||||
ret
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -139,7 +137,7 @@ impl Runtime {
|
|||
|
||||
fn enter<F, R>(&mut self, f: F) -> R
|
||||
where
|
||||
F: FnOnce(&mut current_thread::Entered<Timer<Reactor>>) -> R,
|
||||
F: FnOnce(&mut CurrentThread<Timer<Reactor>>) -> R,
|
||||
{
|
||||
let Runtime {
|
||||
ref reactor_handle,
|
||||
|
@ -149,26 +147,13 @@ impl Runtime {
|
|||
..
|
||||
} = *self;
|
||||
|
||||
// Binds an executor to this thread
|
||||
let mut enter = tokio_executor::enter().expect("Multiple executors at once");
|
||||
// WARN: We do not enter the executor here, since in tokio 0.2 the executor is entered
|
||||
// automatically inside its `block_on` and `run` methods
|
||||
let _reactor_guard = tokio_net::driver::set_default(reactor_handle);
|
||||
let _timer_guard = tokio_timer::set_default(timer_handle);
|
||||
|
||||
// This will set the default handle and timer to use inside the closure
|
||||
// and run the future.
|
||||
tokio_reactor::with_default(&reactor_handle, &mut enter, |enter| {
|
||||
clock::with_default(clock, enter, |enter| {
|
||||
timer::with_default(&timer_handle, enter, |enter| {
|
||||
// The TaskExecutor is a fake executor that looks into the
|
||||
// current single-threaded executor when used. This is a trick,
|
||||
// because we need two mutable references to the executor (one
|
||||
// to run the provided future, another to install as the default
|
||||
// one). We use the fake one here as the default one.
|
||||
let mut default_executor = current_thread::TaskExecutor::current();
|
||||
tokio_executor::with_default(&mut default_executor, enter, |enter| {
|
||||
let mut executor = executor.enter(enter);
|
||||
f(&mut executor)
|
||||
})
|
||||
})
|
||||
})
|
||||
tokio_timer::clock::with_default(clock , || {
|
||||
f(executor)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
|
@ -2,9 +2,9 @@ use std::cell::RefCell;
|
|||
use std::io;
|
||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
|
||||
use futures::sync::mpsc::UnboundedSender;
|
||||
use futures::channel::mpsc::UnboundedSender;
|
||||
use futures::Future;
|
||||
use tokio_current_thread::Handle;
|
||||
use tokio::runtime::current_thread::Handle;
|
||||
|
||||
use crate::arbiter::{Arbiter, SystemCommand};
|
||||
use crate::builder::{Builder, SystemRunner};
|
||||
|
@ -64,7 +64,7 @@ impl System {
|
|||
pub fn run_in_executor<T: Into<String>>(
|
||||
name: T,
|
||||
executor: Handle,
|
||||
) -> impl Future<Item = (), Error = io::Error> + Send {
|
||||
) -> impl Future<Output = Result<(), io::Error>> + Send {
|
||||
Self::builder()
|
||||
.name(name)
|
||||
.build_async(executor)
|
||||
|
|
|
@ -19,7 +19,7 @@ path = "src/lib.rs"
|
|||
|
||||
[dependencies]
|
||||
derive_more = "0.15"
|
||||
futures = "0.1.25"
|
||||
futures = { package = "futures-preview", version = "0.3.0-alpha.18" }
|
||||
parking_lot = "0.9"
|
||||
lazy_static = "1.2"
|
||||
log = "0.4"
|
||||
|
|
|
@ -1,12 +1,13 @@
|
|||
//! Thread pool for blocking operations
|
||||
|
||||
use std::fmt;
|
||||
use std::future::Future;
|
||||
use std::task::{Poll,Context};
|
||||
|
||||
use derive_more::Display;
|
||||
use futures::sync::oneshot;
|
||||
use futures::{Async, Future, Poll};
|
||||
use futures::channel::oneshot;
|
||||
use parking_lot::Mutex;
|
||||
use threadpool::ThreadPool;
|
||||
use std::pin::Pin;
|
||||
|
||||
/// Env variable for default cpu pool size
|
||||
const ENV_CPU_POOL_VAR: &str = "ACTIX_THREADPOOL";
|
||||
|
@ -41,20 +42,15 @@ thread_local! {
|
|||
|
||||
/// Blocking operation execution error
|
||||
#[derive(Debug, Display)]
|
||||
pub enum BlockingError<E: fmt::Debug> {
|
||||
#[display(fmt = "{:?}", _0)]
|
||||
Error(E),
|
||||
#[display(fmt = "Thread pool is gone")]
|
||||
Canceled,
|
||||
}
|
||||
#[display(fmt = "Thread pool is gone")]
|
||||
pub struct Cancelled;
|
||||
|
||||
/// Execute blocking function on a thread pool, returns future that resolves
|
||||
/// to result of the function execution.
|
||||
pub fn run<F, I, E>(f: F) -> CpuFuture<I, E>
|
||||
pub fn run<F, I>(f: F) -> CpuFuture<I>
|
||||
where
|
||||
F: FnOnce() -> Result<I, E> + Send + 'static,
|
||||
F: FnOnce() -> I + Send + 'static,
|
||||
I: Send + 'static,
|
||||
E: Send + fmt::Debug + 'static,
|
||||
{
|
||||
let (tx, rx) = oneshot::channel();
|
||||
POOL.with(|pool| {
|
||||
|
@ -70,19 +66,18 @@ where
|
|||
|
||||
/// Blocking operation completion future. It resolves with results
|
||||
/// of blocking function execution.
|
||||
pub struct CpuFuture<I, E> {
|
||||
rx: oneshot::Receiver<Result<I, E>>,
|
||||
pub struct CpuFuture<I> {
|
||||
rx: oneshot::Receiver<I>,
|
||||
}
|
||||
|
||||
impl<I, E: fmt::Debug> Future for CpuFuture<I, E> {
|
||||
type Item = I;
|
||||
type Error = BlockingError<E>;
|
||||
impl<I> Future for CpuFuture<I> {
|
||||
type Output = Result<I,Cancelled>;
|
||||
|
||||
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
||||
let res = futures::try_ready!(self.rx.poll().map_err(|_| BlockingError::Canceled));
|
||||
match res {
|
||||
Ok(val) => Ok(Async::Ready(val)),
|
||||
Err(err) => Err(BlockingError::Error(err)),
|
||||
}
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
let rx = unsafe{ self.map_unchecked_mut(|s|&mut s.rx)};
|
||||
let res = futures::ready!(rx.poll(cx));
|
||||
|
||||
Poll::Ready(res.map_err(|_| Cancelled))
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue