diff --git a/.gitignore b/.gitignore index 42d0755d..a6909f1f 100644 --- a/.gitignore +++ b/.gitignore @@ -12,3 +12,5 @@ guide/build/ # These are backup files generated by rustfmt **/*.rs.bk + +.idea diff --git a/actix-codec/Cargo.toml b/actix-codec/Cargo.toml index 376e4b31..fefdf718 100644 --- a/actix-codec/Cargo.toml +++ b/actix-codec/Cargo.toml @@ -19,7 +19,8 @@ path = "src/lib.rs" [dependencies] bytes = "0.4.12" -futures = "0.1.24" -tokio-io = "0.1.12" -tokio-codec = "0.1.1" +pin-utils = "0.1.0-alpha.4" +futures = { package = "futures-preview", version = "0.3.0-alpha.18" } +tokio-io = "0.2.0-alpha.4" +tokio-codec = "0.2.0-alpha.4" log = "0.4" \ No newline at end of file diff --git a/actix-codec/src/framed.rs b/actix-codec/src/framed.rs index 7fbb4f27..458ac35a 100644 --- a/actix-codec/src/framed.rs +++ b/actix-codec/src/framed.rs @@ -4,12 +4,14 @@ use std::fmt; use std::io::{self, Read, Write}; use bytes::BytesMut; -use futures::{Poll, Sink, StartSend, Stream}; +use futures::{Poll, Sink, Stream}; use tokio_codec::{Decoder, Encoder}; use tokio_io::{AsyncRead, AsyncWrite}; use super::framed_read::{framed_read2, framed_read2_with_buffer, FramedRead2}; use super::framed_write::{framed_write2, framed_write2_with_buffer, FramedWrite2}; +use std::pin::Pin; +use std::task::Context; const LW: usize = 1024; const HW: usize = 8 * 1024; @@ -221,41 +223,43 @@ impl Framed { } } + impl Stream for Framed where T: AsyncRead, U: Decoder, { - type Item = U::Item; - type Error = U::Error; + type Item = Result; - fn poll(&mut self) -> Poll, Self::Error> { - self.inner.poll() + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + unsafe { self.map_unchecked_mut(|s| &mut s.inner).poll_next(cx )} } + + } -impl Sink for Framed +impl Sink for Framed where T: AsyncWrite, U: Encoder, U::Error: From, { - type SinkItem = U::Item; - type SinkError = U::Error; + type Error = U::Error; - fn start_send( - &mut self, - item: Self::SinkItem, - ) -> StartSend { - self.inner.get_mut().start_send(item) + fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + unsafe { self.map_unchecked_mut(|s| s.inner.get_mut()).poll_ready(cx)} } - fn poll_complete(&mut self) -> Poll<(), Self::SinkError> { - self.inner.get_mut().poll_complete() + fn start_send(self: Pin<&mut Self>, item: ::Item) -> Result<(), Self::Error> { + unsafe { self.map_unchecked_mut(|s| s.inner.get_mut()).start_send(item)} } - fn close(&mut self) -> Poll<(), Self::SinkError> { - self.inner.get_mut().close() + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + unsafe { self.map_unchecked_mut(|s| s.inner.get_mut()).poll_flush(cx)} + } + + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + unsafe { self.map_unchecked_mut(|s| s.inner.get_mut()).poll_close(cx)} } } @@ -280,10 +284,16 @@ impl Read for Fuse { } } + impl AsyncRead for Fuse { + unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool { self.0.prepare_uninitialized_buffer(buf) } + + fn poll_read(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll> { + unsafe { self.map_unchecked_mut(|s| &mut s.0).poll_read(cx, buf)} + } } impl Write for Fuse { @@ -296,12 +306,23 @@ impl Write for Fuse { } } + impl AsyncWrite for Fuse { - fn shutdown(&mut self) -> Poll<(), io::Error> { - self.0.shutdown() + + fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll> { + unsafe { self.map_unchecked_mut(|s| &mut s.0).poll_write(cx, buf)} + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + unsafe { self.map_unchecked_mut(|s| &mut s.0).poll_flush(cx)} + } + + fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + unsafe { self.map_unchecked_mut(|s| &mut s.0).poll_shutdown(cx)} } } + impl Decoder for Fuse { type Item = U::Item; type Error = U::Error; diff --git a/actix-codec/src/framed_read.rs b/actix-codec/src/framed_read.rs index b470d1bf..30e0cd93 100644 --- a/actix-codec/src/framed_read.rs +++ b/actix-codec/src/framed_read.rs @@ -1,12 +1,14 @@ use std::fmt; use bytes::BytesMut; -use futures::{try_ready, Async, Poll, Sink, StartSend, Stream}; +use futures::{Poll, Sink, Stream}; use log::trace; use tokio_codec::Decoder; use tokio_io::AsyncRead; use super::framed::Fuse; +use std::pin::Pin; +use std::task::Context; /// A `Stream` of messages decoded from an `AsyncRead`. pub struct FramedRead { @@ -83,35 +85,35 @@ where T: AsyncRead, D: Decoder, { - type Item = D::Item; - type Error = D::Error; + type Item = Result; - fn poll(&mut self) -> Poll, Self::Error> { - self.inner.poll() + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + unsafe { self.map_unchecked_mut(|s| &mut s.inner).poll_next(cx )} } } -impl Sink for FramedRead +impl Sink for FramedRead where - T: Sink, + T: Sink, { - type SinkItem = T::SinkItem; - type SinkError = T::SinkError; + type Error = T::Error; - fn start_send( - &mut self, - item: Self::SinkItem, - ) -> StartSend { - self.inner.inner.0.start_send(item) + fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + unsafe { self.map_unchecked_mut(|s| &mut s.inner.inner.0).poll_ready(cx)} } - fn poll_complete(&mut self) -> Poll<(), Self::SinkError> { - self.inner.inner.0.poll_complete() + fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> { + unsafe { self.map_unchecked_mut(|s| &mut s.inner.inner.0).start_send(item)} } - fn close(&mut self) -> Poll<(), Self::SinkError> { - self.inner.inner.0.close() + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + unsafe { self.map_unchecked_mut(|s| &mut s.inner.inner.0).poll_flush(cx)} } + + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + unsafe { self.map_unchecked_mut(|s| &mut s.inner.inner.0).poll_close(cx)} + } + } impl fmt::Debug for FramedRead @@ -174,46 +176,66 @@ impl FramedRead2 { impl Stream for FramedRead2 where - T: AsyncRead + Decoder, + T: tokio_io::AsyncRead + Decoder, { - type Item = T::Item; - type Error = T::Error; + type Item = Result; - fn poll(&mut self) -> Poll, Self::Error> { + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut this = unsafe { self.get_unchecked_mut() }; loop { // Repeatedly call `decode` or `decode_eof` as long as it is // "readable". Readable is defined as not having returned `None`. If // the upstream has returned EOF, and the decoder is no longer // readable, it can be assumed that the decoder will never become // readable again, at which point the stream is terminated. - if self.is_readable { - if self.eof { - let frame = self.inner.decode_eof(&mut self.buffer)?; - return Ok(Async::Ready(frame)); + + if this.is_readable { + if this.eof { + match this.inner.decode_eof(&mut this.buffer) { + Ok(Some(frame)) => return Poll::Ready(Some(Ok(frame))), + Ok(None) => return Poll::Ready(None), + Err(e) => return Poll::Ready(Some(Err(e))), + } + } trace!("attempting to decode a frame"); - if let Some(frame) = self.inner.decode(&mut self.buffer)? { - trace!("frame decoded from buffer"); - return Ok(Async::Ready(Some(frame))); + match this.inner.decode(&mut this.buffer) { + Ok(Some(frame)) => { + trace!("frame decoded from buffer"); + return Poll::Ready(Some(Ok(frame))); + } + Err(e) => { + return Poll::Ready(Some(Err(e))) + } + _ => { + // Need more data + } } - self.is_readable = false; + + this.is_readable = false; } - assert!(!self.eof); + assert!(!this.eof); // Otherwise, try to read more data and try again. Make sure we've // got room for at least one byte to read to ensure that we don't // get a spurious 0 that looks like EOF - self.buffer.reserve(1); - if 0 == try_ready!(self.inner.read_buf(&mut self.buffer)) { - trace!("read 0 bytes, mark stream as eof"); - self.eof = true; - } + this.buffer.reserve(1); + unsafe { - self.is_readable = true; + match Pin::new_unchecked(&mut this.inner).poll_read(cx, &mut this.buffer) { + Poll::Pending => return Poll::Pending, + Poll::Ready(Err(e)) => return Poll::Ready(Some(Err(e.into()))), + Poll::Ready(Ok(0)) => { + this.eof = true; + } + Poll::Ready(Ok(_cnt)) => {} + } + } + this.is_readable = true; } } } diff --git a/actix-codec/src/framed_write.rs b/actix-codec/src/framed_write.rs index 6b5c2b19..b2dfaf8e 100644 --- a/actix-codec/src/framed_write.rs +++ b/actix-codec/src/framed_write.rs @@ -2,12 +2,14 @@ use std::fmt; use std::io::{self, Read}; use bytes::BytesMut; -use futures::{try_ready, Async, AsyncSink, Poll, Sink, StartSend, Stream}; +use futures::{ready,Poll, Sink, Stream}; use log::trace; use tokio_codec::{Decoder, Encoder}; use tokio_io::{AsyncRead, AsyncWrite}; use super::framed::Fuse; +use std::task::Context; +use std::pin::Pin; /// A `Sink` of frames encoded to an `AsyncWrite`. pub struct FramedWrite { @@ -95,24 +97,27 @@ where } } -impl Sink for FramedWrite +impl Sink for FramedWrite where T: AsyncWrite, E: Encoder, { - type SinkItem = E::Item; - type SinkError = E::Error; + type Error = E::Error; - fn start_send(&mut self, item: E::Item) -> StartSend { - self.inner.start_send(item) + fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + unsafe { self.map_unchecked_mut(|s| &mut s.inner).poll_ready(cx)} } - fn poll_complete(&mut self) -> Poll<(), Self::SinkError> { - self.inner.poll_complete() + fn start_send(self: Pin<&mut Self>, item: ::Item) -> Result<(), Self::Error> { + unsafe { self.map_unchecked_mut(|s| &mut s.inner).start_send(item)} } - fn close(&mut self) -> Poll<(), Self::SinkError> { - Ok(self.inner.close()?) + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + unsafe { self.map_unchecked_mut(|s| &mut s.inner).poll_flush(cx)} + } + + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + unsafe { self.map_unchecked_mut(|s| &mut s.inner).poll_close(cx)} } } @@ -121,10 +126,9 @@ where T: Stream, { type Item = T::Item; - type Error = T::Error; - fn poll(&mut self) -> Poll, Self::Error> { - self.inner.inner.0.poll() + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + unsafe { self.map_unchecked_mut(|s| &mut s.inner.inner.0).poll_next(cx)} } } @@ -220,13 +224,75 @@ where } } -impl Sink for FramedWrite2 +impl Sink for FramedWrite2 where T: AsyncWrite + Encoder, { - type SinkItem = T::Item; - type SinkError = T::Error; + type Error = T::Error; + fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + let len = self.buffer.len(); + if len >= self.high_watermark { + return Poll::Pending; + } else { + return Poll::Ready(Ok(())); + } + } + + fn start_send(self: Pin<&mut Self>, item: ::Item) -> Result<(), Self::Error> { + let this = unsafe { self.get_unchecked_mut() }; + // Check the buffer capacity + let len = this.buffer.len(); + if len < this.low_watermark { + this.buffer.reserve(this.high_watermark - len) + } + + this.inner.encode(item, &mut this.buffer)?; + + Ok(()) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = unsafe { self.get_unchecked_mut() }; + trace!("flushing framed transport"); + + while !this.buffer.is_empty() { + trace!("writing; remaining={}", this.buffer.len()); + + let n = ready!(unsafe {Pin::new_unchecked(&mut this.inner)}.poll_write(cx, &this.buffer))?; + + if n == 0 { + return Poll::Ready(Err(io::Error::new( + io::ErrorKind::WriteZero, + "failed to \ + write frame to transport", + ) + .into())) + } + + // TODO: Add a way to `bytes` to do this w/o returning the drained + // data. + let _ = this.buffer.split_to(n); + } + + // Try flushing the underlying IO + ready!(unsafe {Pin::new_unchecked(&mut this.inner)}.poll_flush(cx))?; + + trace!("framed transport flushed"); + Poll::Ready(Ok(())) + } + + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut this = unsafe { self.get_unchecked_mut() }; + ready!(unsafe {Pin::new_unchecked(&mut this).map_unchecked_mut(|s|*s)}.poll_flush(cx))?; + ready!(unsafe {Pin::new_unchecked(&mut this.inner)}.poll_shutdown(cx))?; + + Poll::Ready(Ok(())) + } + + + + /* fn start_send(&mut self, item: T::Item) -> StartSend { // Check the buffer capacity let len = self.buffer.len(); @@ -275,6 +341,7 @@ where try_ready!(self.poll_complete()); Ok(self.inner.shutdown()?) } + */ } impl Decoder for FramedWrite2 { @@ -300,4 +367,8 @@ impl AsyncRead for FramedWrite2 { unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool { self.inner.prepare_uninitialized_buffer(buf) } + + fn poll_read(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll> { + unsafe { self.map_unchecked_mut(|s| &mut s.inner).poll_read(cx, buf)} + } } diff --git a/actix-codec/src/lib.rs b/actix-codec/src/lib.rs index 77102a8c..06c66112 100644 --- a/actix-codec/src/lib.rs +++ b/actix-codec/src/lib.rs @@ -21,4 +21,5 @@ pub use self::framed_read::FramedRead; pub use self::framed_write::FramedWrite; pub use tokio_codec::{Decoder, Encoder}; +// TODO: Migrate to futures asyncRead pub use tokio_io::{AsyncRead, AsyncWrite}; diff --git a/actix-rt/Cargo.toml b/actix-rt/Cargo.toml index 8131ed19..9deeaaf6 100644 --- a/actix-rt/Cargo.toml +++ b/actix-rt/Cargo.toml @@ -19,9 +19,13 @@ path = "src/lib.rs" [dependencies] actix-threadpool = "0.1.1" -futures = "0.1.25" -tokio-current-thread = "0.1" -tokio-executor = "0.1.5" -tokio-reactor = "0.1.7" -tokio-timer = "0.2.8" +futures = { package = "futures-preview", version = "0.3.0-alpha.18" } + + +# TODO: Replace this with dependency on tokio-runtime once it is ready +tokio = { version = "0.2.0-alpha.4" } +tokio-timer = "=0.3.0-alpha.4" +tokio-executor = "=0.2.0-alpha.4" +tokio-net = "=0.2.0-alpha.4" + copyless = "0.1.4" diff --git a/actix-rt/src/arbiter.rs b/actix-rt/src/arbiter.rs index 083105b3..f85dcc96 100644 --- a/actix-rt/src/arbiter.rs +++ b/actix-rt/src/arbiter.rs @@ -3,11 +3,13 @@ use std::cell::{Cell, RefCell}; use std::collections::HashMap; use std::sync::atomic::{AtomicUsize, Ordering}; use std::{fmt, thread}; +use std::pin::Pin; +use std::task::Context; -use futures::sync::mpsc::{unbounded, UnboundedReceiver, UnboundedSender}; -use futures::sync::oneshot::{channel, Canceled, Sender}; -use futures::{future, Async, Future, IntoFuture, Poll, Stream}; -use tokio_current_thread::spawn; +use futures::channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender}; +use futures::channel::oneshot::{channel, Canceled, Sender}; +use futures::{future, Future, Poll, FutureExt, Stream}; +use tokio::runtime::current_thread::spawn; use crate::builder::Builder; use crate::system::System; @@ -17,7 +19,7 @@ use copyless::BoxHelper; thread_local!( static ADDR: RefCell> = RefCell::new(None); static RUNNING: Cell = Cell::new(false); - static Q: RefCell>>> = RefCell::new(Vec::new()); + static Q: RefCell>>> = RefCell::new(Vec::new()); static STORAGE: RefCell>> = RefCell::new(HashMap::new()); ); @@ -25,7 +27,7 @@ pub(crate) static COUNT: AtomicUsize = AtomicUsize::new(0); pub(crate) enum ArbiterCommand { Stop, - Execute(Box + Send>), + Execute(Box + Unpin + Send>), ExecuteFn(Box), } @@ -129,7 +131,9 @@ impl Arbiter { Q.with(|cell| { let mut v = cell.borrow_mut(); for fut in v.drain(..) { - spawn(fut); + // We pin the boxed future, so it can never again be moved. + let fut = unsafe { Pin::new_unchecked(fut) }; + tokio_executor::current_thread::spawn( fut); } }); } @@ -142,14 +146,19 @@ impl Arbiter { /// or Arbiter address, it is simply a helper for spawning futures on the current /// thread. pub fn spawn(future: F) - where - F: Future + 'static, + where + F: Future + 'static, { RUNNING.with(move |cell| { if cell.get() { - spawn(Box::alloc().init(future)); + // Spawn the future on running executor + spawn(future); } else { - Q.with(move |cell| cell.borrow_mut().push(Box::alloc().init(future))); + // Box the future and push it to the queue, this results in double boxing + // because the executor boxes the future again, but works for now + Q.with(move |cell| { + cell.borrow_mut().push(Box::alloc().init(future)) + }); } }); } @@ -158,17 +167,17 @@ impl Arbiter { /// or Arbiter address, it is simply a helper for executing futures on the current /// thread. pub fn spawn_fn(f: F) - where - F: FnOnce() -> R + 'static, - R: IntoFuture + 'static, + where + F: FnOnce() -> R + 'static, + R: Future + 'static, { - Arbiter::spawn(future::lazy(f)) + Arbiter::spawn(future::lazy(|_| f()).flatten()) } /// Send a future to the Arbiter's thread, and spawn it. pub fn send(&self, future: F) - where - F: Future + Send + 'static, + where + F: Future + Send + Unpin + 'static, { let _ = self .0 @@ -178,8 +187,8 @@ impl Arbiter { /// Send a function to the Arbiter's thread, and execute it. Any result from the function /// is discarded. pub fn exec_fn(&self, f: F) - where - F: FnOnce() + Send + 'static, + where + F: FnOnce() + Send + 'static, { let _ = self .0 @@ -191,10 +200,10 @@ impl Arbiter { /// Send a function to the Arbiter's thread. This function will be executed asynchronously. /// A future is created, and when resolved will contain the result of the function sent /// to the Arbiters thread. - pub fn exec(&self, f: F) -> impl Future - where - F: FnOnce() -> R + Send + 'static, - R: Send + 'static, + pub fn exec(&self, f: F) -> impl Future> + where + F: FnOnce() -> R + Send + 'static, + R: Send + 'static, { let (tx, rx) = channel(); let _ = self @@ -221,8 +230,8 @@ impl Arbiter { /// /// Panics is item is not inserted pub fn get_item(mut f: F) -> R - where - F: FnMut(&T) -> R, + where + F: FnMut(&T) -> R, { STORAGE.with(move |cell| { let st = cell.borrow(); @@ -238,8 +247,8 @@ impl Arbiter { /// /// Panics is item is not inserted pub fn get_mut_item(mut f: F) -> R - where - F: FnMut(&mut T) -> R, + where + F: FnMut(&mut T) -> R, { STORAGE.with(move |cell| { let mut st = cell.borrow_mut(); @@ -271,28 +280,33 @@ impl Drop for ArbiterController { } impl Future for ArbiterController { - type Item = (); - type Error = (); + type Output = (); - fn poll(&mut self) -> Poll { + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { loop { - match self.rx.poll() { - Ok(Async::Ready(None)) | Err(_) => return Ok(Async::Ready(())), - Ok(Async::Ready(Some(item))) => match item { - ArbiterCommand::Stop => { - if let Some(stop) = self.stop.take() { - let _ = stop.send(0); - }; - return Ok(Async::Ready(())); - } - ArbiterCommand::Execute(fut) => { - spawn(fut); - } - ArbiterCommand::ExecuteFn(f) => { - f.call_box(); - } + match unsafe { self.as_mut().map_unchecked_mut(|p| &mut p.rx) }.poll_next(cx) { + Poll::Ready(None) => { + return Poll::Ready(()) + }, + Poll::Ready(Some(item)) => { + match item { + ArbiterCommand::Stop => { + if let Some(stop) = self.stop.take() { + let _ = stop.send(0); + }; + return Poll::Ready(()); + } + ArbiterCommand::Execute(fut) => { + spawn(fut); + } + ArbiterCommand::ExecuteFn(f) => { + f.call_box(); + } + } + } + Poll::Pending => { + return Poll::Pending }, - Ok(Async::NotReady) => return Ok(Async::NotReady), } } } @@ -323,14 +337,13 @@ impl SystemArbiter { } impl Future for SystemArbiter { - type Item = (); - type Error = (); + type Output = (); - fn poll(&mut self) -> Poll { + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { loop { - match self.commands.poll() { - Ok(Async::Ready(None)) | Err(_) => return Ok(Async::Ready(())), - Ok(Async::Ready(Some(cmd))) => match cmd { + match unsafe { self.as_mut().map_unchecked_mut(|p| &mut p.commands) }.poll_next(cx) { + Poll::Ready(None) => return Poll::Ready(()), + Poll::Ready(Some(cmd)) => match cmd { SystemCommand::Exit(code) => { // stop arbiters for arb in self.arbiters.values() { @@ -348,7 +361,7 @@ impl Future for SystemArbiter { self.arbiters.remove(&name); } }, - Ok(Async::NotReady) => return Ok(Async::NotReady), + Poll::Pending => return Poll::Pending, } } } @@ -359,8 +372,8 @@ pub trait FnExec: Send + 'static { } impl FnExec for F -where - F: FnOnce() + Send + 'static, + where + F: FnOnce() + Send + 'static, { #[allow(clippy::boxed_local)] fn call_box(self: Box) { diff --git a/actix-rt/src/builder.rs b/actix-rt/src/builder.rs index 71509f14..1aa95045 100644 --- a/actix-rt/src/builder.rs +++ b/actix-rt/src/builder.rs @@ -1,19 +1,19 @@ use std::borrow::Cow; use std::io; -use futures::future; +use futures::{future, FutureExt}; use futures::future::{lazy, Future}; -use futures::sync::mpsc::unbounded; -use futures::sync::oneshot::{channel, Receiver}; +use futures::channel::mpsc::unbounded; +use futures::channel::oneshot::{channel, Receiver}; -use tokio_current_thread::{CurrentThread, Handle}; -use tokio_reactor::Reactor; -use tokio_timer::clock::Clock; -use tokio_timer::timer::Timer; +use tokio::runtime::current_thread::Handle; +use tokio_timer::{timer::Timer, clock::Clock}; +use tokio_net::driver::Reactor; use crate::arbiter::{Arbiter, SystemArbiter}; use crate::runtime::Runtime; use crate::system::System; +use tokio_executor::current_thread::CurrentThread; /// Builder struct for a actix runtime. /// @@ -118,7 +118,7 @@ impl Builder { rt.spawn(arb); // init system arbiter and run configuration method - let _ = rt.block_on(lazy(move || { + let _ = rt.block_on(lazy(move |_| { f(); Ok::<_, ()>(()) })); @@ -159,30 +159,30 @@ pub(crate) struct AsyncSystemRunner { impl AsyncSystemRunner { /// This function will start event loop and returns a future that /// resolves once the `System::stop()` function is called. - pub(crate) fn run_nonblocking(self) -> impl Future + Send { + pub(crate) fn run_nonblocking(self) -> impl Future> + Send { let AsyncSystemRunner { stop, .. } = self; // run loop - future::lazy(|| { + future::lazy(|_| { Arbiter::run_system(); - stop.then(|res| match res { - Ok(code) => { - if code != 0 { - Err(io::Error::new( - io::ErrorKind::Other, - format!("Non-zero exit code: {}", code), - )) - } else { - Ok(()) + async { + let res = match stop.await { + Ok(code) => { + if code != 0 { + Err(io::Error::new( + io::ErrorKind::Other, + format!("Non-zero exit code: {}", code), + )) + } else { + Ok(()) + } } - } - Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)), - }) - .then(|result| { + Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)), + }; Arbiter::stop_system(); - result - }) - }) + return res; + } + }).flatten() } } @@ -202,10 +202,10 @@ impl SystemRunner { let SystemRunner { mut rt, stop, .. } = self; // run loop - let _ = rt.block_on(lazy(move || { + let _ = rt.block_on(async { Arbiter::run_system(); Ok::<_, ()>(()) - })); + }); let result = match rt.block_on(stop) { Ok(code) => { if code != 0 { @@ -224,19 +224,19 @@ impl SystemRunner { } /// Execute a future and wait for result. - pub fn block_on(&mut self, fut: F) -> Result + pub fn block_on(&mut self, fut: F) -> O where - F: Future, + F: Future, { - let _ = self.rt.block_on(lazy(move || { + let _ = self.rt.block_on(async { Arbiter::run_system(); - Ok::<_, ()>(()) - })); + }); + let res = self.rt.block_on(fut); - let _ = self.rt.block_on(lazy(move || { + let _ = self.rt.block_on(async { Arbiter::stop_system(); - Ok::<_, ()>(()) - })); + }); + res } } diff --git a/actix-rt/src/lib.rs b/actix-rt/src/lib.rs index 9b16b959..97c56102 100644 --- a/actix-rt/src/lib.rs +++ b/actix-rt/src/lib.rs @@ -20,7 +20,7 @@ pub use actix_threadpool as blocking; /// This function panics if actix system is not running. pub fn spawn(f: F) where - F: futures::Future + 'static, + F: futures::Future + 'static, { if !System::is_set() { panic!("System is not running"); diff --git a/actix-rt/src/runtime.rs b/actix-rt/src/runtime.rs index c6b2a9fc..a68dd858 100644 --- a/actix-rt/src/runtime.rs +++ b/actix-rt/src/runtime.rs @@ -2,11 +2,9 @@ use std::error::Error; use std::{fmt, io}; use futures::Future; -use tokio_current_thread::{self as current_thread, CurrentThread}; -use tokio_executor; -use tokio_reactor::{self, Reactor}; -use tokio_timer::clock::{self, Clock}; -use tokio_timer::timer::{self, Timer}; +use tokio_executor::current_thread::{self, CurrentThread}; +use tokio_timer::{timer::{self, Timer}, clock::Clock}; +use tokio_net::driver::{Reactor, Handle as ReactorHandle}; use crate::builder::Builder; @@ -18,7 +16,7 @@ use crate::builder::Builder; /// [mod]: index.html #[derive(Debug)] pub struct Runtime { - reactor_handle: tokio_reactor::Handle, + reactor_handle: ReactorHandle, timer_handle: timer::Handle, clock: Clock, executor: CurrentThread>, @@ -53,7 +51,7 @@ impl Runtime { } pub(super) fn new2( - reactor_handle: tokio_reactor::Handle, + reactor_handle: ReactorHandle, timer_handle: timer::Handle, clock: Clock, executor: CurrentThread>, @@ -97,7 +95,7 @@ impl Runtime { /// is currently at capacity and is unable to spawn a new future. pub fn spawn(&mut self, future: F) -> &mut Self where - F: Future + 'static, + F: Future + 'static, { self.executor.spawn(future); self @@ -119,14 +117,14 @@ impl Runtime { /// /// The caller is responsible for ensuring that other spawned futures /// complete execution by calling `block_on` or `run`. - pub fn block_on(&mut self, f: F) -> Result + pub fn block_on(&mut self, f: F) -> F::Output where F: Future, { self.enter(|executor| { // Run the provided future let ret = executor.block_on(f); - ret.map_err(|e| e.into_inner().expect("unexpected execution error")) + ret }) } @@ -139,7 +137,7 @@ impl Runtime { fn enter(&mut self, f: F) -> R where - F: FnOnce(&mut current_thread::Entered>) -> R, + F: FnOnce(&mut CurrentThread>) -> R, { let Runtime { ref reactor_handle, @@ -149,26 +147,13 @@ impl Runtime { .. } = *self; - // Binds an executor to this thread - let mut enter = tokio_executor::enter().expect("Multiple executors at once"); + // WARN: We do not enter the executor here, since in tokio 0.2 the executor is entered + // automatically inside its `block_on` and `run` methods + let _reactor_guard = tokio_net::driver::set_default(reactor_handle); + let _timer_guard = tokio_timer::set_default(timer_handle); - // This will set the default handle and timer to use inside the closure - // and run the future. - tokio_reactor::with_default(&reactor_handle, &mut enter, |enter| { - clock::with_default(clock, enter, |enter| { - timer::with_default(&timer_handle, enter, |enter| { - // The TaskExecutor is a fake executor that looks into the - // current single-threaded executor when used. This is a trick, - // because we need two mutable references to the executor (one - // to run the provided future, another to install as the default - // one). We use the fake one here as the default one. - let mut default_executor = current_thread::TaskExecutor::current(); - tokio_executor::with_default(&mut default_executor, enter, |enter| { - let mut executor = executor.enter(enter); - f(&mut executor) - }) - }) - }) + tokio_timer::clock::with_default(clock , || { + f(executor) }) } } diff --git a/actix-rt/src/system.rs b/actix-rt/src/system.rs index 745de00f..c2eb9f37 100644 --- a/actix-rt/src/system.rs +++ b/actix-rt/src/system.rs @@ -2,9 +2,9 @@ use std::cell::RefCell; use std::io; use std::sync::atomic::{AtomicUsize, Ordering}; -use futures::sync::mpsc::UnboundedSender; +use futures::channel::mpsc::UnboundedSender; use futures::Future; -use tokio_current_thread::Handle; +use tokio::runtime::current_thread::Handle; use crate::arbiter::{Arbiter, SystemCommand}; use crate::builder::{Builder, SystemRunner}; @@ -64,7 +64,7 @@ impl System { pub fn run_in_executor>( name: T, executor: Handle, - ) -> impl Future + Send { + ) -> impl Future> + Send { Self::builder() .name(name) .build_async(executor) diff --git a/actix-threadpool/Cargo.toml b/actix-threadpool/Cargo.toml index 5e9d855d..643c380b 100644 --- a/actix-threadpool/Cargo.toml +++ b/actix-threadpool/Cargo.toml @@ -19,7 +19,7 @@ path = "src/lib.rs" [dependencies] derive_more = "0.15" -futures = "0.1.25" +futures = { package = "futures-preview", version = "0.3.0-alpha.18" } parking_lot = "0.9" lazy_static = "1.2" log = "0.4" diff --git a/actix-threadpool/src/lib.rs b/actix-threadpool/src/lib.rs index b9952e62..e78438f5 100644 --- a/actix-threadpool/src/lib.rs +++ b/actix-threadpool/src/lib.rs @@ -1,12 +1,13 @@ //! Thread pool for blocking operations -use std::fmt; +use std::future::Future; +use std::task::{Poll,Context}; use derive_more::Display; -use futures::sync::oneshot; -use futures::{Async, Future, Poll}; +use futures::channel::oneshot; use parking_lot::Mutex; use threadpool::ThreadPool; +use std::pin::Pin; /// Env variable for default cpu pool size const ENV_CPU_POOL_VAR: &str = "ACTIX_THREADPOOL"; @@ -41,20 +42,15 @@ thread_local! { /// Blocking operation execution error #[derive(Debug, Display)] -pub enum BlockingError { - #[display(fmt = "{:?}", _0)] - Error(E), - #[display(fmt = "Thread pool is gone")] - Canceled, -} +#[display(fmt = "Thread pool is gone")] +pub struct Cancelled; /// Execute blocking function on a thread pool, returns future that resolves /// to result of the function execution. -pub fn run(f: F) -> CpuFuture +pub fn run(f: F) -> CpuFuture where - F: FnOnce() -> Result + Send + 'static, + F: FnOnce() -> I + Send + 'static, I: Send + 'static, - E: Send + fmt::Debug + 'static, { let (tx, rx) = oneshot::channel(); POOL.with(|pool| { @@ -70,19 +66,18 @@ where /// Blocking operation completion future. It resolves with results /// of blocking function execution. -pub struct CpuFuture { - rx: oneshot::Receiver>, +pub struct CpuFuture { + rx: oneshot::Receiver, } -impl Future for CpuFuture { - type Item = I; - type Error = BlockingError; +impl Future for CpuFuture { + type Output = Result; - fn poll(&mut self) -> Poll { - let res = futures::try_ready!(self.rx.poll().map_err(|_| BlockingError::Canceled)); - match res { - Ok(val) => Ok(Async::Ready(val)), - Err(err) => Err(BlockingError::Error(err)), - } + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let rx = unsafe{ self.map_unchecked_mut(|s|&mut s.rx)}; + let res = futures::ready!(rx.poll(cx)); + + Poll::Ready(res.map_err(|_| Cancelled)) } + }