From a844941b9f7d8852db953b0dd82cd4ff638aa680 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michal=20Hornick=C3=BD?= Date: Fri, 13 Sep 2019 15:45:52 +0200 Subject: [PATCH 01/17] Migrate actix-codec, actix-rt, and actix-threadpool to std::future --- .gitignore | 2 + Cargo.toml | 13 +++- actix-codec/Cargo.toml | 7 +- actix-codec/src/framed.rs | 59 ++++++++++----- actix-codec/src/framed_read.rs | 95 ++++++++++++++---------- actix-codec/src/framed_write.rs | 103 +++++++++++++++++++++----- actix-codec/src/lib.rs | 1 + actix-rt/Cargo.toml | 14 ++-- actix-rt/src/arbiter.rs | 123 ++++++++++++++++++-------------- actix-rt/src/builder.rs | 72 +++++++++---------- actix-rt/src/lib.rs | 2 +- actix-rt/src/runtime.rs | 45 ++++-------- actix-rt/src/system.rs | 6 +- actix-threadpool/Cargo.toml | 2 +- actix-threadpool/src/lib.rs | 41 +++++------ 15 files changed, 355 insertions(+), 230 deletions(-) diff --git a/.gitignore b/.gitignore index 42d0755d..a6909f1f 100644 --- a/.gitignore +++ b/.gitignore @@ -12,3 +12,5 @@ guide/build/ # These are backup files generated by rustfmt **/*.rs.bk + +.idea diff --git a/Cargo.toml b/Cargo.toml index cc9e4596..a6ca7caf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,13 +29,22 @@ members = [ "router", ] + [dev-dependencies] -actix-service = "0.4.0" +actix-service = "0.4.2" actix-codec = "0.1.1" actix-rt = "0.2.0" actix-server = { version="0.5.0", features=["ssl"] } env_logger = "0.6" -futures = "0.1.25" +futures = { package = "futures-preview", version = "0.3.0-alpha.18" } openssl = "0.10" + tokio-tcp = "0.1" tokio-openssl = "0.3" + +[patch.crates-io] +actix-codec = { path = "./actix-codec" } +actix-connect = { path = "./actix-connect" } +actix-rt = { path = "./actix-rt" } +actix-service = { path = "./actix-service" } +actix-threadpool = { path = "./actix-threadpool" } \ No newline at end of file diff --git a/actix-codec/Cargo.toml b/actix-codec/Cargo.toml index 376e4b31..fefdf718 100644 --- a/actix-codec/Cargo.toml +++ b/actix-codec/Cargo.toml @@ -19,7 +19,8 @@ path = "src/lib.rs" [dependencies] bytes = "0.4.12" -futures = "0.1.24" -tokio-io = "0.1.12" -tokio-codec = "0.1.1" +pin-utils = "0.1.0-alpha.4" +futures = { package = "futures-preview", version = "0.3.0-alpha.18" } +tokio-io = "0.2.0-alpha.4" +tokio-codec = "0.2.0-alpha.4" log = "0.4" \ No newline at end of file diff --git a/actix-codec/src/framed.rs b/actix-codec/src/framed.rs index 7fbb4f27..458ac35a 100644 --- a/actix-codec/src/framed.rs +++ b/actix-codec/src/framed.rs @@ -4,12 +4,14 @@ use std::fmt; use std::io::{self, Read, Write}; use bytes::BytesMut; -use futures::{Poll, Sink, StartSend, Stream}; +use futures::{Poll, Sink, Stream}; use tokio_codec::{Decoder, Encoder}; use tokio_io::{AsyncRead, AsyncWrite}; use super::framed_read::{framed_read2, framed_read2_with_buffer, FramedRead2}; use super::framed_write::{framed_write2, framed_write2_with_buffer, FramedWrite2}; +use std::pin::Pin; +use std::task::Context; const LW: usize = 1024; const HW: usize = 8 * 1024; @@ -221,41 +223,43 @@ impl Framed { } } + impl Stream for Framed where T: AsyncRead, U: Decoder, { - type Item = U::Item; - type Error = U::Error; + type Item = Result; - fn poll(&mut self) -> Poll, Self::Error> { - self.inner.poll() + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + unsafe { self.map_unchecked_mut(|s| &mut s.inner).poll_next(cx )} } + + } -impl Sink for Framed +impl Sink for Framed where T: AsyncWrite, U: Encoder, U::Error: From, { - type SinkItem = U::Item; - type SinkError = U::Error; + type Error = U::Error; - fn start_send( - &mut self, - item: Self::SinkItem, - ) -> StartSend { - self.inner.get_mut().start_send(item) + fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + unsafe { self.map_unchecked_mut(|s| s.inner.get_mut()).poll_ready(cx)} } - fn poll_complete(&mut self) -> Poll<(), Self::SinkError> { - self.inner.get_mut().poll_complete() + fn start_send(self: Pin<&mut Self>, item: ::Item) -> Result<(), Self::Error> { + unsafe { self.map_unchecked_mut(|s| s.inner.get_mut()).start_send(item)} } - fn close(&mut self) -> Poll<(), Self::SinkError> { - self.inner.get_mut().close() + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + unsafe { self.map_unchecked_mut(|s| s.inner.get_mut()).poll_flush(cx)} + } + + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + unsafe { self.map_unchecked_mut(|s| s.inner.get_mut()).poll_close(cx)} } } @@ -280,10 +284,16 @@ impl Read for Fuse { } } + impl AsyncRead for Fuse { + unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool { self.0.prepare_uninitialized_buffer(buf) } + + fn poll_read(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll> { + unsafe { self.map_unchecked_mut(|s| &mut s.0).poll_read(cx, buf)} + } } impl Write for Fuse { @@ -296,12 +306,23 @@ impl Write for Fuse { } } + impl AsyncWrite for Fuse { - fn shutdown(&mut self) -> Poll<(), io::Error> { - self.0.shutdown() + + fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll> { + unsafe { self.map_unchecked_mut(|s| &mut s.0).poll_write(cx, buf)} + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + unsafe { self.map_unchecked_mut(|s| &mut s.0).poll_flush(cx)} + } + + fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + unsafe { self.map_unchecked_mut(|s| &mut s.0).poll_shutdown(cx)} } } + impl Decoder for Fuse { type Item = U::Item; type Error = U::Error; diff --git a/actix-codec/src/framed_read.rs b/actix-codec/src/framed_read.rs index f54b2c23..30e0cd93 100644 --- a/actix-codec/src/framed_read.rs +++ b/actix-codec/src/framed_read.rs @@ -1,12 +1,14 @@ use std::fmt; use bytes::BytesMut; -use futures::{try_ready, Async, Poll, Sink, StartSend, Stream}; +use futures::{Poll, Sink, Stream}; use log::trace; use tokio_codec::Decoder; use tokio_io::AsyncRead; use super::framed::Fuse; +use std::pin::Pin; +use std::task::Context; /// A `Stream` of messages decoded from an `AsyncRead`. pub struct FramedRead { @@ -83,35 +85,35 @@ where T: AsyncRead, D: Decoder, { - type Item = D::Item; - type Error = D::Error; + type Item = Result; - fn poll(&mut self) -> Poll, Self::Error> { - self.inner.poll() + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + unsafe { self.map_unchecked_mut(|s| &mut s.inner).poll_next(cx )} } } -impl Sink for FramedRead +impl Sink for FramedRead where - T: Sink, + T: Sink, { - type SinkItem = T::SinkItem; - type SinkError = T::SinkError; + type Error = T::Error; - fn start_send( - &mut self, - item: Self::SinkItem, - ) -> StartSend { - self.inner.inner.0.start_send(item) + fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + unsafe { self.map_unchecked_mut(|s| &mut s.inner.inner.0).poll_ready(cx)} } - fn poll_complete(&mut self) -> Poll<(), Self::SinkError> { - self.inner.inner.0.poll_complete() + fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> { + unsafe { self.map_unchecked_mut(|s| &mut s.inner.inner.0).start_send(item)} } - fn close(&mut self) -> Poll<(), Self::SinkError> { - self.inner.inner.0.close() + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + unsafe { self.map_unchecked_mut(|s| &mut s.inner.inner.0).poll_flush(cx)} } + + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + unsafe { self.map_unchecked_mut(|s| &mut s.inner.inner.0).poll_close(cx)} + } + } impl fmt::Debug for FramedRead @@ -174,45 +176,66 @@ impl FramedRead2 { impl Stream for FramedRead2 where - T: AsyncRead + Decoder, + T: tokio_io::AsyncRead + Decoder, { - type Item = T::Item; - type Error = T::Error; + type Item = Result; - fn poll(&mut self) -> Poll, Self::Error> { + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut this = unsafe { self.get_unchecked_mut() }; loop { // Repeatedly call `decode` or `decode_eof` as long as it is // "readable". Readable is defined as not having returned `None`. If // the upstream has returned EOF, and the decoder is no longer // readable, it can be assumed that the decoder will never become // readable again, at which point the stream is terminated. - if self.is_readable { - if self.eof { - let frame = self.inner.decode_eof(&mut self.buffer)?; - return Ok(Async::Ready(frame)); + + if this.is_readable { + if this.eof { + match this.inner.decode_eof(&mut this.buffer) { + Ok(Some(frame)) => return Poll::Ready(Some(Ok(frame))), + Ok(None) => return Poll::Ready(None), + Err(e) => return Poll::Ready(Some(Err(e))), + } + } trace!("attempting to decode a frame"); - if let Some(frame) = self.inner.decode(&mut self.buffer)? { - trace!("frame decoded from buffer"); - return Ok(Async::Ready(Some(frame))); + match this.inner.decode(&mut this.buffer) { + Ok(Some(frame)) => { + trace!("frame decoded from buffer"); + return Poll::Ready(Some(Ok(frame))); + } + Err(e) => { + return Poll::Ready(Some(Err(e))) + } + _ => { + // Need more data + } } - self.is_readable = false; + + this.is_readable = false; } - assert!(!self.eof); + assert!(!this.eof); // Otherwise, try to read more data and try again. Make sure we've // got room for at least one byte to read to ensure that we don't // get a spurious 0 that looks like EOF - self.buffer.reserve(1); - if 0 == try_ready!(self.inner.read_buf(&mut self.buffer)) { - self.eof = true; - } + this.buffer.reserve(1); + unsafe { - self.is_readable = true; + match Pin::new_unchecked(&mut this.inner).poll_read(cx, &mut this.buffer) { + Poll::Pending => return Poll::Pending, + Poll::Ready(Err(e)) => return Poll::Ready(Some(Err(e.into()))), + Poll::Ready(Ok(0)) => { + this.eof = true; + } + Poll::Ready(Ok(_cnt)) => {} + } + } + this.is_readable = true; } } } diff --git a/actix-codec/src/framed_write.rs b/actix-codec/src/framed_write.rs index 6b5c2b19..b2dfaf8e 100644 --- a/actix-codec/src/framed_write.rs +++ b/actix-codec/src/framed_write.rs @@ -2,12 +2,14 @@ use std::fmt; use std::io::{self, Read}; use bytes::BytesMut; -use futures::{try_ready, Async, AsyncSink, Poll, Sink, StartSend, Stream}; +use futures::{ready,Poll, Sink, Stream}; use log::trace; use tokio_codec::{Decoder, Encoder}; use tokio_io::{AsyncRead, AsyncWrite}; use super::framed::Fuse; +use std::task::Context; +use std::pin::Pin; /// A `Sink` of frames encoded to an `AsyncWrite`. pub struct FramedWrite { @@ -95,24 +97,27 @@ where } } -impl Sink for FramedWrite +impl Sink for FramedWrite where T: AsyncWrite, E: Encoder, { - type SinkItem = E::Item; - type SinkError = E::Error; + type Error = E::Error; - fn start_send(&mut self, item: E::Item) -> StartSend { - self.inner.start_send(item) + fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + unsafe { self.map_unchecked_mut(|s| &mut s.inner).poll_ready(cx)} } - fn poll_complete(&mut self) -> Poll<(), Self::SinkError> { - self.inner.poll_complete() + fn start_send(self: Pin<&mut Self>, item: ::Item) -> Result<(), Self::Error> { + unsafe { self.map_unchecked_mut(|s| &mut s.inner).start_send(item)} } - fn close(&mut self) -> Poll<(), Self::SinkError> { - Ok(self.inner.close()?) + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + unsafe { self.map_unchecked_mut(|s| &mut s.inner).poll_flush(cx)} + } + + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + unsafe { self.map_unchecked_mut(|s| &mut s.inner).poll_close(cx)} } } @@ -121,10 +126,9 @@ where T: Stream, { type Item = T::Item; - type Error = T::Error; - fn poll(&mut self) -> Poll, Self::Error> { - self.inner.inner.0.poll() + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + unsafe { self.map_unchecked_mut(|s| &mut s.inner.inner.0).poll_next(cx)} } } @@ -220,13 +224,75 @@ where } } -impl Sink for FramedWrite2 +impl Sink for FramedWrite2 where T: AsyncWrite + Encoder, { - type SinkItem = T::Item; - type SinkError = T::Error; + type Error = T::Error; + fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + let len = self.buffer.len(); + if len >= self.high_watermark { + return Poll::Pending; + } else { + return Poll::Ready(Ok(())); + } + } + + fn start_send(self: Pin<&mut Self>, item: ::Item) -> Result<(), Self::Error> { + let this = unsafe { self.get_unchecked_mut() }; + // Check the buffer capacity + let len = this.buffer.len(); + if len < this.low_watermark { + this.buffer.reserve(this.high_watermark - len) + } + + this.inner.encode(item, &mut this.buffer)?; + + Ok(()) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = unsafe { self.get_unchecked_mut() }; + trace!("flushing framed transport"); + + while !this.buffer.is_empty() { + trace!("writing; remaining={}", this.buffer.len()); + + let n = ready!(unsafe {Pin::new_unchecked(&mut this.inner)}.poll_write(cx, &this.buffer))?; + + if n == 0 { + return Poll::Ready(Err(io::Error::new( + io::ErrorKind::WriteZero, + "failed to \ + write frame to transport", + ) + .into())) + } + + // TODO: Add a way to `bytes` to do this w/o returning the drained + // data. + let _ = this.buffer.split_to(n); + } + + // Try flushing the underlying IO + ready!(unsafe {Pin::new_unchecked(&mut this.inner)}.poll_flush(cx))?; + + trace!("framed transport flushed"); + Poll::Ready(Ok(())) + } + + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut this = unsafe { self.get_unchecked_mut() }; + ready!(unsafe {Pin::new_unchecked(&mut this).map_unchecked_mut(|s|*s)}.poll_flush(cx))?; + ready!(unsafe {Pin::new_unchecked(&mut this.inner)}.poll_shutdown(cx))?; + + Poll::Ready(Ok(())) + } + + + + /* fn start_send(&mut self, item: T::Item) -> StartSend { // Check the buffer capacity let len = self.buffer.len(); @@ -275,6 +341,7 @@ where try_ready!(self.poll_complete()); Ok(self.inner.shutdown()?) } + */ } impl Decoder for FramedWrite2 { @@ -300,4 +367,8 @@ impl AsyncRead for FramedWrite2 { unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool { self.inner.prepare_uninitialized_buffer(buf) } + + fn poll_read(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll> { + unsafe { self.map_unchecked_mut(|s| &mut s.inner).poll_read(cx, buf)} + } } diff --git a/actix-codec/src/lib.rs b/actix-codec/src/lib.rs index 77102a8c..06c66112 100644 --- a/actix-codec/src/lib.rs +++ b/actix-codec/src/lib.rs @@ -21,4 +21,5 @@ pub use self::framed_read::FramedRead; pub use self::framed_write::FramedWrite; pub use tokio_codec::{Decoder, Encoder}; +// TODO: Migrate to futures asyncRead pub use tokio_io::{AsyncRead, AsyncWrite}; diff --git a/actix-rt/Cargo.toml b/actix-rt/Cargo.toml index 8131ed19..9deeaaf6 100644 --- a/actix-rt/Cargo.toml +++ b/actix-rt/Cargo.toml @@ -19,9 +19,13 @@ path = "src/lib.rs" [dependencies] actix-threadpool = "0.1.1" -futures = "0.1.25" -tokio-current-thread = "0.1" -tokio-executor = "0.1.5" -tokio-reactor = "0.1.7" -tokio-timer = "0.2.8" +futures = { package = "futures-preview", version = "0.3.0-alpha.18" } + + +# TODO: Replace this with dependency on tokio-runtime once it is ready +tokio = { version = "0.2.0-alpha.4" } +tokio-timer = "=0.3.0-alpha.4" +tokio-executor = "=0.2.0-alpha.4" +tokio-net = "=0.2.0-alpha.4" + copyless = "0.1.4" diff --git a/actix-rt/src/arbiter.rs b/actix-rt/src/arbiter.rs index 285ad3c9..8b1946e0 100644 --- a/actix-rt/src/arbiter.rs +++ b/actix-rt/src/arbiter.rs @@ -3,11 +3,13 @@ use std::cell::{Cell, RefCell}; use std::collections::HashMap; use std::sync::atomic::{AtomicUsize, Ordering}; use std::{fmt, thread}; +use std::pin::Pin; +use std::task::Context; -use futures::sync::mpsc::{unbounded, UnboundedReceiver, UnboundedSender}; -use futures::sync::oneshot::{channel, Canceled, Sender}; -use futures::{future, Async, Future, IntoFuture, Poll, Stream}; -use tokio_current_thread::spawn; +use futures::channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender}; +use futures::channel::oneshot::{channel, Canceled, Sender}; +use futures::{future, Future, Poll, FutureExt, Stream}; +use tokio::runtime::current_thread::spawn; use crate::builder::Builder; use crate::system::System; @@ -17,7 +19,7 @@ use copyless::BoxHelper; thread_local!( static ADDR: RefCell> = RefCell::new(None); static RUNNING: Cell = Cell::new(false); - static Q: RefCell>>> = RefCell::new(Vec::new()); + static Q: RefCell>>> = RefCell::new(Vec::new()); static STORAGE: RefCell>> = RefCell::new(HashMap::new()); ); @@ -25,7 +27,7 @@ pub(crate) static COUNT: AtomicUsize = AtomicUsize::new(0); pub(crate) enum ArbiterCommand { Stop, - Execute(Box + Send>), + Execute(Box + Unpin + Send>), ExecuteFn(Box), } @@ -129,7 +131,9 @@ impl Arbiter { Q.with(|cell| { let mut v = cell.borrow_mut(); for fut in v.drain(..) { - spawn(fut); + // We pin the boxed future, so it can never again be moved. + let fut = unsafe { Pin::new_unchecked(fut) }; + tokio_executor::current_thread::spawn( fut); } }); } @@ -142,14 +146,19 @@ impl Arbiter { /// or Arbiter address, it is simply a helper for spawning futures on the current /// thread. pub fn spawn(future: F) - where - F: Future + 'static, + where + F: Future + 'static, { RUNNING.with(move |cell| { if cell.get() { - spawn(Box::alloc().init(future)); + // Spawn the future on running executor + spawn(future); } else { - Q.with(move |cell| cell.borrow_mut().push(Box::alloc().init(future))); + // Box the future and push it to the queue, this results in double boxing + // because the executor boxes the future again, but works for now + Q.with(move |cell| { + cell.borrow_mut().push(Box::alloc().init(future)) + }); } }); } @@ -158,17 +167,17 @@ impl Arbiter { /// or Arbiter address, it is simply a helper for executing futures on the current /// thread. pub fn spawn_fn(f: F) - where - F: FnOnce() -> R + 'static, - R: IntoFuture + 'static, + where + F: FnOnce() -> R + 'static, + R: Future + 'static, { - Arbiter::spawn(future::lazy(f)) + Arbiter::spawn(future::lazy(|_| f()).flatten()) } /// Send a future to the Arbiter's thread, and spawn it. pub fn send(&self, future: F) - where - F: Future + Send + 'static, + where + F: Future + Send + Unpin + 'static, { let _ = self .0 @@ -178,8 +187,8 @@ impl Arbiter { /// Send a function to the Arbiter's thread, and execute it. Any result from the function /// is discarded. pub fn exec_fn(&self, f: F) - where - F: FnOnce() + Send + 'static, + where + F: FnOnce() + Send + 'static, { let _ = self .0 @@ -191,10 +200,10 @@ impl Arbiter { /// Send a function to the Arbiter's thread. This function will be executed asynchronously. /// A future is created, and when resolved will contain the result of the function sent /// to the Arbiters thread. - pub fn exec(&self, f: F) -> impl Future - where - F: FnOnce() -> R + Send + 'static, - R: Send + 'static, + pub fn exec(&self, f: F) -> impl Future> + where + F: FnOnce() -> R + Send + 'static, + R: Send + 'static, { let (tx, rx) = channel(); let _ = self @@ -221,8 +230,8 @@ impl Arbiter { /// /// Panics is item is not inserted pub fn get_item(mut f: F) -> R - where - F: FnMut(&T) -> R, + where + F: FnMut(&T) -> R, { STORAGE.with(move |cell| { let st = cell.borrow(); @@ -238,8 +247,8 @@ impl Arbiter { /// /// Panics is item is not inserted pub fn get_mut_item(mut f: F) -> R - where - F: FnMut(&mut T) -> R, + where + F: FnMut(&mut T) -> R, { STORAGE.with(move |cell| { let mut st = cell.borrow_mut(); @@ -269,28 +278,33 @@ impl Drop for ArbiterController { } impl Future for ArbiterController { - type Item = (); - type Error = (); + type Output = (); - fn poll(&mut self) -> Poll { + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { loop { - match self.rx.poll() { - Ok(Async::Ready(None)) | Err(_) => return Ok(Async::Ready(())), - Ok(Async::Ready(Some(item))) => match item { - ArbiterCommand::Stop => { - if let Some(stop) = self.stop.take() { - let _ = stop.send(0); - }; - return Ok(Async::Ready(())); - } - ArbiterCommand::Execute(fut) => { - spawn(fut); - } - ArbiterCommand::ExecuteFn(f) => { - f.call_box(); - } + match unsafe { self.as_mut().map_unchecked_mut(|p| &mut p.rx) }.poll_next(cx) { + Poll::Ready(None) => { + return Poll::Ready(()) + }, + Poll::Ready(Some(item)) => { + match item { + ArbiterCommand::Stop => { + if let Some(stop) = self.stop.take() { + let _ = stop.send(0); + }; + return Poll::Ready(()); + } + ArbiterCommand::Execute(fut) => { + spawn(fut); + } + ArbiterCommand::ExecuteFn(f) => { + f.call_box(); + } + } + } + Poll::Pending => { + return Poll::Pending }, - Ok(Async::NotReady) => return Ok(Async::NotReady), } } } @@ -321,14 +335,13 @@ impl SystemArbiter { } impl Future for SystemArbiter { - type Item = (); - type Error = (); + type Output = (); - fn poll(&mut self) -> Poll { + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { loop { - match self.commands.poll() { - Ok(Async::Ready(None)) | Err(_) => return Ok(Async::Ready(())), - Ok(Async::Ready(Some(cmd))) => match cmd { + match unsafe { self.as_mut().map_unchecked_mut(|p| &mut p.commands) }.poll_next(cx) { + Poll::Ready(None) => return Poll::Ready(()), + Poll::Ready(Some(cmd)) => match cmd { SystemCommand::Exit(code) => { // stop arbiters for arb in self.arbiters.values() { @@ -346,7 +359,7 @@ impl Future for SystemArbiter { self.arbiters.remove(&name); } }, - Ok(Async::NotReady) => return Ok(Async::NotReady), + Poll::Pending => return Poll::Pending, } } } @@ -357,8 +370,8 @@ pub trait FnExec: Send + 'static { } impl FnExec for F -where - F: FnOnce() + Send + 'static, + where + F: FnOnce() + Send + 'static, { #[allow(clippy::boxed_local)] fn call_box(self: Box) { diff --git a/actix-rt/src/builder.rs b/actix-rt/src/builder.rs index 71509f14..1aa95045 100644 --- a/actix-rt/src/builder.rs +++ b/actix-rt/src/builder.rs @@ -1,19 +1,19 @@ use std::borrow::Cow; use std::io; -use futures::future; +use futures::{future, FutureExt}; use futures::future::{lazy, Future}; -use futures::sync::mpsc::unbounded; -use futures::sync::oneshot::{channel, Receiver}; +use futures::channel::mpsc::unbounded; +use futures::channel::oneshot::{channel, Receiver}; -use tokio_current_thread::{CurrentThread, Handle}; -use tokio_reactor::Reactor; -use tokio_timer::clock::Clock; -use tokio_timer::timer::Timer; +use tokio::runtime::current_thread::Handle; +use tokio_timer::{timer::Timer, clock::Clock}; +use tokio_net::driver::Reactor; use crate::arbiter::{Arbiter, SystemArbiter}; use crate::runtime::Runtime; use crate::system::System; +use tokio_executor::current_thread::CurrentThread; /// Builder struct for a actix runtime. /// @@ -118,7 +118,7 @@ impl Builder { rt.spawn(arb); // init system arbiter and run configuration method - let _ = rt.block_on(lazy(move || { + let _ = rt.block_on(lazy(move |_| { f(); Ok::<_, ()>(()) })); @@ -159,30 +159,30 @@ pub(crate) struct AsyncSystemRunner { impl AsyncSystemRunner { /// This function will start event loop and returns a future that /// resolves once the `System::stop()` function is called. - pub(crate) fn run_nonblocking(self) -> impl Future + Send { + pub(crate) fn run_nonblocking(self) -> impl Future> + Send { let AsyncSystemRunner { stop, .. } = self; // run loop - future::lazy(|| { + future::lazy(|_| { Arbiter::run_system(); - stop.then(|res| match res { - Ok(code) => { - if code != 0 { - Err(io::Error::new( - io::ErrorKind::Other, - format!("Non-zero exit code: {}", code), - )) - } else { - Ok(()) + async { + let res = match stop.await { + Ok(code) => { + if code != 0 { + Err(io::Error::new( + io::ErrorKind::Other, + format!("Non-zero exit code: {}", code), + )) + } else { + Ok(()) + } } - } - Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)), - }) - .then(|result| { + Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)), + }; Arbiter::stop_system(); - result - }) - }) + return res; + } + }).flatten() } } @@ -202,10 +202,10 @@ impl SystemRunner { let SystemRunner { mut rt, stop, .. } = self; // run loop - let _ = rt.block_on(lazy(move || { + let _ = rt.block_on(async { Arbiter::run_system(); Ok::<_, ()>(()) - })); + }); let result = match rt.block_on(stop) { Ok(code) => { if code != 0 { @@ -224,19 +224,19 @@ impl SystemRunner { } /// Execute a future and wait for result. - pub fn block_on(&mut self, fut: F) -> Result + pub fn block_on(&mut self, fut: F) -> O where - F: Future, + F: Future, { - let _ = self.rt.block_on(lazy(move || { + let _ = self.rt.block_on(async { Arbiter::run_system(); - Ok::<_, ()>(()) - })); + }); + let res = self.rt.block_on(fut); - let _ = self.rt.block_on(lazy(move || { + let _ = self.rt.block_on(async { Arbiter::stop_system(); - Ok::<_, ()>(()) - })); + }); + res } } diff --git a/actix-rt/src/lib.rs b/actix-rt/src/lib.rs index 9b16b959..97c56102 100644 --- a/actix-rt/src/lib.rs +++ b/actix-rt/src/lib.rs @@ -20,7 +20,7 @@ pub use actix_threadpool as blocking; /// This function panics if actix system is not running. pub fn spawn(f: F) where - F: futures::Future + 'static, + F: futures::Future + 'static, { if !System::is_set() { panic!("System is not running"); diff --git a/actix-rt/src/runtime.rs b/actix-rt/src/runtime.rs index c6b2a9fc..a68dd858 100644 --- a/actix-rt/src/runtime.rs +++ b/actix-rt/src/runtime.rs @@ -2,11 +2,9 @@ use std::error::Error; use std::{fmt, io}; use futures::Future; -use tokio_current_thread::{self as current_thread, CurrentThread}; -use tokio_executor; -use tokio_reactor::{self, Reactor}; -use tokio_timer::clock::{self, Clock}; -use tokio_timer::timer::{self, Timer}; +use tokio_executor::current_thread::{self, CurrentThread}; +use tokio_timer::{timer::{self, Timer}, clock::Clock}; +use tokio_net::driver::{Reactor, Handle as ReactorHandle}; use crate::builder::Builder; @@ -18,7 +16,7 @@ use crate::builder::Builder; /// [mod]: index.html #[derive(Debug)] pub struct Runtime { - reactor_handle: tokio_reactor::Handle, + reactor_handle: ReactorHandle, timer_handle: timer::Handle, clock: Clock, executor: CurrentThread>, @@ -53,7 +51,7 @@ impl Runtime { } pub(super) fn new2( - reactor_handle: tokio_reactor::Handle, + reactor_handle: ReactorHandle, timer_handle: timer::Handle, clock: Clock, executor: CurrentThread>, @@ -97,7 +95,7 @@ impl Runtime { /// is currently at capacity and is unable to spawn a new future. pub fn spawn(&mut self, future: F) -> &mut Self where - F: Future + 'static, + F: Future + 'static, { self.executor.spawn(future); self @@ -119,14 +117,14 @@ impl Runtime { /// /// The caller is responsible for ensuring that other spawned futures /// complete execution by calling `block_on` or `run`. - pub fn block_on(&mut self, f: F) -> Result + pub fn block_on(&mut self, f: F) -> F::Output where F: Future, { self.enter(|executor| { // Run the provided future let ret = executor.block_on(f); - ret.map_err(|e| e.into_inner().expect("unexpected execution error")) + ret }) } @@ -139,7 +137,7 @@ impl Runtime { fn enter(&mut self, f: F) -> R where - F: FnOnce(&mut current_thread::Entered>) -> R, + F: FnOnce(&mut CurrentThread>) -> R, { let Runtime { ref reactor_handle, @@ -149,26 +147,13 @@ impl Runtime { .. } = *self; - // Binds an executor to this thread - let mut enter = tokio_executor::enter().expect("Multiple executors at once"); + // WARN: We do not enter the executor here, since in tokio 0.2 the executor is entered + // automatically inside its `block_on` and `run` methods + let _reactor_guard = tokio_net::driver::set_default(reactor_handle); + let _timer_guard = tokio_timer::set_default(timer_handle); - // This will set the default handle and timer to use inside the closure - // and run the future. - tokio_reactor::with_default(&reactor_handle, &mut enter, |enter| { - clock::with_default(clock, enter, |enter| { - timer::with_default(&timer_handle, enter, |enter| { - // The TaskExecutor is a fake executor that looks into the - // current single-threaded executor when used. This is a trick, - // because we need two mutable references to the executor (one - // to run the provided future, another to install as the default - // one). We use the fake one here as the default one. - let mut default_executor = current_thread::TaskExecutor::current(); - tokio_executor::with_default(&mut default_executor, enter, |enter| { - let mut executor = executor.enter(enter); - f(&mut executor) - }) - }) - }) + tokio_timer::clock::with_default(clock , || { + f(executor) }) } } diff --git a/actix-rt/src/system.rs b/actix-rt/src/system.rs index 745de00f..c2eb9f37 100644 --- a/actix-rt/src/system.rs +++ b/actix-rt/src/system.rs @@ -2,9 +2,9 @@ use std::cell::RefCell; use std::io; use std::sync::atomic::{AtomicUsize, Ordering}; -use futures::sync::mpsc::UnboundedSender; +use futures::channel::mpsc::UnboundedSender; use futures::Future; -use tokio_current_thread::Handle; +use tokio::runtime::current_thread::Handle; use crate::arbiter::{Arbiter, SystemCommand}; use crate::builder::{Builder, SystemRunner}; @@ -64,7 +64,7 @@ impl System { pub fn run_in_executor>( name: T, executor: Handle, - ) -> impl Future + Send { + ) -> impl Future> + Send { Self::builder() .name(name) .build_async(executor) diff --git a/actix-threadpool/Cargo.toml b/actix-threadpool/Cargo.toml index 5e9d855d..643c380b 100644 --- a/actix-threadpool/Cargo.toml +++ b/actix-threadpool/Cargo.toml @@ -19,7 +19,7 @@ path = "src/lib.rs" [dependencies] derive_more = "0.15" -futures = "0.1.25" +futures = { package = "futures-preview", version = "0.3.0-alpha.18" } parking_lot = "0.9" lazy_static = "1.2" log = "0.4" diff --git a/actix-threadpool/src/lib.rs b/actix-threadpool/src/lib.rs index b9952e62..e78438f5 100644 --- a/actix-threadpool/src/lib.rs +++ b/actix-threadpool/src/lib.rs @@ -1,12 +1,13 @@ //! Thread pool for blocking operations -use std::fmt; +use std::future::Future; +use std::task::{Poll,Context}; use derive_more::Display; -use futures::sync::oneshot; -use futures::{Async, Future, Poll}; +use futures::channel::oneshot; use parking_lot::Mutex; use threadpool::ThreadPool; +use std::pin::Pin; /// Env variable for default cpu pool size const ENV_CPU_POOL_VAR: &str = "ACTIX_THREADPOOL"; @@ -41,20 +42,15 @@ thread_local! { /// Blocking operation execution error #[derive(Debug, Display)] -pub enum BlockingError { - #[display(fmt = "{:?}", _0)] - Error(E), - #[display(fmt = "Thread pool is gone")] - Canceled, -} +#[display(fmt = "Thread pool is gone")] +pub struct Cancelled; /// Execute blocking function on a thread pool, returns future that resolves /// to result of the function execution. -pub fn run(f: F) -> CpuFuture +pub fn run(f: F) -> CpuFuture where - F: FnOnce() -> Result + Send + 'static, + F: FnOnce() -> I + Send + 'static, I: Send + 'static, - E: Send + fmt::Debug + 'static, { let (tx, rx) = oneshot::channel(); POOL.with(|pool| { @@ -70,19 +66,18 @@ where /// Blocking operation completion future. It resolves with results /// of blocking function execution. -pub struct CpuFuture { - rx: oneshot::Receiver>, +pub struct CpuFuture { + rx: oneshot::Receiver, } -impl Future for CpuFuture { - type Item = I; - type Error = BlockingError; +impl Future for CpuFuture { + type Output = Result; - fn poll(&mut self) -> Poll { - let res = futures::try_ready!(self.rx.poll().map_err(|_| BlockingError::Canceled)); - match res { - Ok(val) => Ok(Async::Ready(val)), - Err(err) => Err(BlockingError::Error(err)), - } + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let rx = unsafe{ self.map_unchecked_mut(|s|&mut s.rx)}; + let res = futures::ready!(rx.poll(cx)); + + Poll::Ready(res.map_err(|_| Cancelled)) } + } From 8f05986a9f3cb5013b1cfea8d68b11c4bf1dfd33 Mon Sep 17 00:00:00 2001 From: Yuki Okushi Date: Thu, 3 Oct 2019 14:55:44 +0900 Subject: [PATCH 02/17] Use `map()` instead of `and_then()` (#51) --- router/src/url.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/router/src/url.rs b/router/src/url.rs index 79fe2b63..a45e2cd8 100644 --- a/router/src/url.rs +++ b/router/src/url.rs @@ -195,7 +195,7 @@ fn from_hex(v: u8) -> Option { #[inline] fn restore_ch(d1: u8, d2: u8) -> Option { - from_hex(d1).and_then(|d1| from_hex(d2).and_then(move |d2| Some(d1 << 4 | d2))) + from_hex(d1).and_then(|d1| from_hex(d2).map(move |d2| d1 << 4 | d2)) } #[cfg(test)] From e733c562d95cc5dca7637bb9bf64925fb7d41d65 Mon Sep 17 00:00:00 2001 From: Jerome Gravel-Niquet Date: Thu, 3 Oct 2019 14:32:32 -0400 Subject: [PATCH 03/17] Update rustls, tokio-rustls and webpki across the board (#42) * Update rustls, tokio-rustls and webpki across the board * bump minimum rust version to 1.37 * updated readme and changelogs to reflect changes and minimum required rust version --- .travis.yml | 2 +- README.md | 2 +- actix-connect/CHANGES.md | 7 +++++++ actix-connect/Cargo.toml | 6 +++--- actix-connect/src/ssl/rustls.rs | 13 +++++-------- actix-server-config/Cargo.toml | 4 ++-- actix-server-config/changes.md | 7 +++++++ actix-server-config/src/lib.rs | 2 +- actix-server/CHANGES.md | 7 +++++++ actix-server/Cargo.toml | 8 ++++---- actix-server/src/ssl/rustls.rs | 10 +++++----- 11 files changed, 43 insertions(+), 25 deletions(-) diff --git a/.travis.yml b/.travis.yml index 21c3ebc4..6eaf6d1e 100644 --- a/.travis.yml +++ b/.travis.yml @@ -10,7 +10,7 @@ matrix: include: - rust: stable - rust: beta - - rust: 1.36.0 + - rust: 1.37.0 - rust: nightly-2019-06-15 allow_failures: - rust: nightly-2019-06-15 diff --git a/README.md b/README.md index 99ecd5fc..5c100657 100644 --- a/README.md +++ b/README.md @@ -7,7 +7,7 @@ Actix net - framework for composable network services * [API Documentation (Development)](https://actix.rs/actix-net/actix_net/) * [Chat on gitter](https://gitter.im/actix/actix) * Cargo package: [actix-net](https://crates.io/crates/actix-net) -* Minimum supported Rust version: 1.36 or later +* Minimum supported Rust version: 1.37 or later ## Example diff --git a/actix-connect/CHANGES.md b/actix-connect/CHANGES.md index 95ac002e..00dfe4d6 100644 --- a/actix-connect/CHANGES.md +++ b/actix-connect/CHANGES.md @@ -1,5 +1,12 @@ # Changes +## Next + +### Changed + +* Update `rustls` to 0.16 +* Minimum required Rust version upped to 1.37.0 + ## [0.2.5] - 2019-09-05 * Add `TcpConnectService` diff --git a/actix-connect/Cargo.toml b/actix-connect/Cargo.toml index d7d0c276..4a2a0146 100644 --- a/actix-connect/Cargo.toml +++ b/actix-connect/Cargo.toml @@ -51,9 +51,9 @@ openssl = { version="0.10", optional = true } tokio-openssl = { version="0.3", optional = true } #rustls -rustls = { version = "0.15.2", optional = true } -tokio-rustls = { version = "0.9.1", optional = true } -webpki = { version = "0.19", optional = true } +rustls = { version = "0.16.0", optional = true } +tokio-rustls = { version = "0.10.0", optional = true } +webpki = { version = "0.21", optional = true } [dev-dependencies] bytes = "0.4" diff --git a/actix-connect/src/ssl/rustls.rs b/actix-connect/src/ssl/rustls.rs index 9a518507..7d6f5720 100644 --- a/actix-connect/src/ssl/rustls.rs +++ b/actix-connect/src/ssl/rustls.rs @@ -5,10 +5,7 @@ use actix_codec::{AsyncRead, AsyncWrite}; use actix_service::{NewService, Service}; use futures::{future::ok, future::FutureResult, Async, Future, Poll}; use std::sync::Arc; -use tokio_rustls::{ - rustls::{ClientConfig, ClientSession}, - Connect, TlsConnector, TlsStream, -}; +use tokio_rustls::{client::TlsStream, rustls::ClientConfig, Connect, TlsConnector}; use webpki::DNSNameRef; use crate::{Address, Connection}; @@ -37,7 +34,7 @@ where connector: Arc, ) -> impl Service< Request = Connection, - Response = Connection>, + Response = Connection>, Error = std::io::Error, > { RustlsConnectorService { @@ -61,7 +58,7 @@ where U: AsyncRead + AsyncWrite + fmt::Debug, { type Request = Connection; - type Response = Connection>; + type Response = Connection>; type Error = std::io::Error; type Config = (); type Service = RustlsConnectorService; @@ -86,7 +83,7 @@ where U: AsyncRead + AsyncWrite + fmt::Debug, { type Request = Connection; - type Response = Connection>; + type Response = Connection>; type Error = std::io::Error; type Future = ConnectAsyncExt; @@ -114,7 +111,7 @@ impl Future for ConnectAsyncExt where U: AsyncRead + AsyncWrite + fmt::Debug, { - type Item = Connection>; + type Item = Connection>; type Error = std::io::Error; fn poll(&mut self) -> Poll { diff --git a/actix-server-config/Cargo.toml b/actix-server-config/Cargo.toml index 08af1acf..3e892742 100644 --- a/actix-server-config/Cargo.toml +++ b/actix-server-config/Cargo.toml @@ -33,6 +33,6 @@ futures = "0.1.25" tokio-io = "0.1.12" tokio-tcp = "0.1" tokio-openssl = { version="0.3.0", optional = true } -rustls = { version = "0.15.2", optional = true } -tokio-rustls = { version = "0.9.1", optional = true } +rustls = { version = "0.16.0", optional = true } +tokio-rustls = { version = "0.10.0", optional = true } tokio-uds = { version="0.2.5", optional = true } diff --git a/actix-server-config/changes.md b/actix-server-config/changes.md index 7e8ebfbd..6f514080 100644 --- a/actix-server-config/changes.md +++ b/actix-server-config/changes.md @@ -1,5 +1,12 @@ # Changes +## Next + +### Changed + +* Update `rustls` to 0.16 +* Minimum required Rust version upped to 1.37.0 + ## [0.1.2] - 2019-07-18 ### Added diff --git a/actix-server-config/src/lib.rs b/actix-server-config/src/lib.rs index 85f72bdb..fc5dac04 100644 --- a/actix-server-config/src/lib.rs +++ b/actix-server-config/src/lib.rs @@ -195,7 +195,7 @@ impl IoStream for tokio_openssl::SslStream { } #[cfg(any(feature = "rust-tls"))] -impl IoStream for tokio_rustls::TlsStream { +impl IoStream for tokio_rustls::server::TlsStream { #[inline] fn peer_addr(&self) -> Option { self.get_ref().0.peer_addr() diff --git a/actix-server/CHANGES.md b/actix-server/CHANGES.md index 6ab3bcc9..c317b56d 100644 --- a/actix-server/CHANGES.md +++ b/actix-server/CHANGES.md @@ -1,5 +1,12 @@ # Changes +## Next + +### Changed + +* Update `rustls` to 0.16 +* Minimum required Rust version upped to 1.37.0 + ## [0.6.1] - 2019-09-25 ### Added diff --git a/actix-server/Cargo.toml b/actix-server/Cargo.toml index 2bda582a..30183a08 100644 --- a/actix-server/Cargo.toml +++ b/actix-server/Cargo.toml @@ -65,10 +65,10 @@ openssl = { version="0.10", optional = true } tokio-openssl = { version="0.3", optional = true } # rustls -rustls = { version = "0.15.2", optional = true } -tokio-rustls = { version = "0.9.1", optional = true } -webpki = { version = "0.19", optional = true } -webpki-roots = { version = "0.16", optional = true } +rustls = { version = "0.16.0", optional = true } +tokio-rustls = { version = "0.10.0", optional = true } +webpki = { version = "0.21", optional = true } +webpki-roots = { version = "0.17", optional = true } [dev-dependencies] bytes = "0.4" diff --git a/actix-server/src/ssl/rustls.rs b/actix-server/src/ssl/rustls.rs index 151bbbd6..06edc0f5 100644 --- a/actix-server/src/ssl/rustls.rs +++ b/actix-server/src/ssl/rustls.rs @@ -4,9 +4,9 @@ use std::sync::Arc; use actix_service::{NewService, Service}; use futures::{future::ok, future::FutureResult, Async, Future, Poll}; -use rustls::{ServerConfig, ServerSession}; +use rustls::ServerConfig; use tokio_io::{AsyncRead, AsyncWrite}; -use tokio_rustls::{Accept, TlsAcceptor, TlsStream}; +use tokio_rustls::{server::TlsStream, Accept, TlsAcceptor}; use crate::counter::{Counter, CounterGuard}; use crate::ssl::MAX_CONN_COUNTER; @@ -41,7 +41,7 @@ impl Clone for RustlsAcceptor { impl NewService for RustlsAcceptor { type Request = Io; - type Response = Io, P>; + type Response = Io, P>; type Error = io::Error; type Config = SrvConfig; @@ -70,7 +70,7 @@ pub struct RustlsAcceptorService { impl Service for RustlsAcceptorService { type Request = Io; - type Response = Io, P>; + type Response = Io, P>; type Error = io::Error; type Future = RustlsAcceptorServiceFut; @@ -102,7 +102,7 @@ where } impl Future for RustlsAcceptorServiceFut { - type Item = Io, P>; + type Item = Io, P>; type Error = io::Error; fn poll(&mut self) -> Poll { From fba2002702295b82d98405db36d2b628831ba35f Mon Sep 17 00:00:00 2001 From: Yuki Okushi Date: Fri, 4 Oct 2019 06:21:59 +0900 Subject: [PATCH 04/17] Prepare actix-connect release (#53) --- actix-connect/CHANGES.md | 2 +- actix-connect/Cargo.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/actix-connect/CHANGES.md b/actix-connect/CHANGES.md index 00dfe4d6..baea3d30 100644 --- a/actix-connect/CHANGES.md +++ b/actix-connect/CHANGES.md @@ -1,6 +1,6 @@ # Changes -## Next +## [0.3.0] - 2019-10-03 ### Changed diff --git a/actix-connect/Cargo.toml b/actix-connect/Cargo.toml index 4a2a0146..517a185b 100644 --- a/actix-connect/Cargo.toml +++ b/actix-connect/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "actix-connect" -version = "0.2.5" +version = "0.3.0" authors = ["Nikolay Kim "] description = "Actix Connector - tcp connector service" keywords = ["network", "framework", "async", "futures"] From 2667850d6002942abce895ef8302dfafd8e831c7 Mon Sep 17 00:00:00 2001 From: Yuki Okushi Date: Fri, 4 Oct 2019 09:13:33 +0900 Subject: [PATCH 05/17] Prepare actix-server-config release (#54) * Prepare actix-server-config release * Bump up actix-server-config to 0.2.0 --- actix-connect/Cargo.toml | 2 +- actix-ioframe/Cargo.toml | 2 +- actix-server-config/Cargo.toml | 2 +- actix-server-config/changes.md | 2 +- actix-server/Cargo.toml | 2 +- actix-test-server/Cargo.toml | 2 +- actix-testing/Cargo.toml | 2 +- 7 files changed, 7 insertions(+), 7 deletions(-) diff --git a/actix-connect/Cargo.toml b/actix-connect/Cargo.toml index 517a185b..15e24cbc 100644 --- a/actix-connect/Cargo.toml +++ b/actix-connect/Cargo.toml @@ -58,4 +58,4 @@ webpki = { version = "0.21", optional = true } [dev-dependencies] bytes = "0.4" actix-testing = { version="0.1.0" } -actix-server-config = "0.1.0" +actix-server-config = "0.2.0" diff --git a/actix-ioframe/Cargo.toml b/actix-ioframe/Cargo.toml index b5838832..7eac1f9b 100644 --- a/actix-ioframe/Cargo.toml +++ b/actix-ioframe/Cargo.toml @@ -30,6 +30,6 @@ log = "0.4" actix-rt = "0.2.2" actix-connect = "0.2.0" actix-testing = "0.1.0" -actix-server-config = "0.1.1" +actix-server-config = "0.2.0" tokio-tcp = "0.1" tokio-timer = "0.2" diff --git a/actix-server-config/Cargo.toml b/actix-server-config/Cargo.toml index 3e892742..5c7053be 100644 --- a/actix-server-config/Cargo.toml +++ b/actix-server-config/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "actix-server-config" -version = "0.1.2" +version = "0.2.0" authors = ["Nikolay Kim "] description = "Actix server config utils" homepage = "https://actix.rs" diff --git a/actix-server-config/changes.md b/actix-server-config/changes.md index 6f514080..9af452a5 100644 --- a/actix-server-config/changes.md +++ b/actix-server-config/changes.md @@ -1,6 +1,6 @@ # Changes -## Next +## [0.2.0] - 2019-10-03 ### Changed diff --git a/actix-server/Cargo.toml b/actix-server/Cargo.toml index 30183a08..5420b140 100644 --- a/actix-server/Cargo.toml +++ b/actix-server/Cargo.toml @@ -38,7 +38,7 @@ uds = ["mio-uds", "tokio-uds", "actix-server-config/uds"] [dependencies] actix-rt = "0.2.2" actix-service = "0.4.1" -actix-server-config = "0.1.2" +actix-server-config = "0.2.0" log = "0.4" num_cpus = "1.0" diff --git a/actix-test-server/Cargo.toml b/actix-test-server/Cargo.toml index 8e063c38..2ce973a3 100644 --- a/actix-test-server/Cargo.toml +++ b/actix-test-server/Cargo.toml @@ -19,7 +19,7 @@ path = "src/lib.rs" [dependencies] actix-rt = "0.2.1" actix-server = "0.5.0" -actix-server-config = "0.1.0" +actix-server-config = "0.2.0" actix-testing = "0.1.0" log = "0.4" diff --git a/actix-testing/Cargo.toml b/actix-testing/Cargo.toml index 3c101b13..10d42edc 100644 --- a/actix-testing/Cargo.toml +++ b/actix-testing/Cargo.toml @@ -19,7 +19,7 @@ path = "src/lib.rs" [dependencies] actix-rt = "0.2.1" actix-server = "0.6.0" -actix-server-config = "0.1.0" +actix-server-config = "0.2.0" actix-service = "0.4.0" log = "0.4" From f6f9e1fcdb1fcefad69adc8c4cb3f71e5d80738f Mon Sep 17 00:00:00 2001 From: Sven-Hendrik Haase Date: Fri, 4 Oct 2019 07:30:13 +0200 Subject: [PATCH 06/17] Add an error message if we receive a non-hostname-based dest This is more helpful than an unwrap and at least points users at the right location. Upstream issue is https://github.com/briansmith/webpki/issues/54 --- actix-connect/src/ssl/rustls.rs | 3 ++- actix-connect/tests/test_connect.rs | 1 + 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/actix-connect/src/ssl/rustls.rs b/actix-connect/src/ssl/rustls.rs index 7d6f5720..465ef1e4 100644 --- a/actix-connect/src/ssl/rustls.rs +++ b/actix-connect/src/ssl/rustls.rs @@ -94,7 +94,8 @@ where fn call(&mut self, stream: Connection) -> Self::Future { trace!("SSL Handshake start for: {:?}", stream.host()); let (io, stream) = stream.replace(()); - let host = DNSNameRef::try_from_ascii_str(stream.host()).unwrap(); + let host = DNSNameRef::try_from_ascii_str(stream.host()) + .expect("rustls currently only handles hostname-based connections. See https://github.com/briansmith/webpki/issues/54"); ConnectAsyncExt { fut: TlsConnector::from(self.connector.clone()).connect(host, io), stream: Some(stream), diff --git a/actix-connect/tests/test_connect.rs b/actix-connect/tests/test_connect.rs index a8fdb1b6..471dd314 100644 --- a/actix-connect/tests/test_connect.rs +++ b/actix-connect/tests/test_connect.rs @@ -42,6 +42,7 @@ fn test_rustls_string() { let con = test::call_service(&mut conn, addr.into()); assert_eq!(con.peer_addr().unwrap(), srv.addr()); } + #[test] fn test_static_str() { let srv = TestServer::with(|| { From e3155957a8db7ac8e2f429508da9459455a8e7ba Mon Sep 17 00:00:00 2001 From: Yuki Okushi Date: Fri, 4 Oct 2019 17:36:23 +0900 Subject: [PATCH 07/17] Prepare actix-server release (#55) --- actix-server/CHANGES.md | 2 +- actix-server/Cargo.toml | 2 +- actix-testing/Cargo.toml | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/actix-server/CHANGES.md b/actix-server/CHANGES.md index c317b56d..1b3aa478 100644 --- a/actix-server/CHANGES.md +++ b/actix-server/CHANGES.md @@ -1,6 +1,6 @@ # Changes -## Next +## [0.7.0] - 2019-10-04 ### Changed diff --git a/actix-server/Cargo.toml b/actix-server/Cargo.toml index 5420b140..3720b887 100644 --- a/actix-server/Cargo.toml +++ b/actix-server/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "actix-server" -version = "0.6.1" +version = "0.7.0" authors = ["Nikolay Kim "] description = "Actix server - General purpose tcp server" keywords = ["network", "framework", "async", "futures"] diff --git a/actix-testing/Cargo.toml b/actix-testing/Cargo.toml index 10d42edc..b11e6455 100644 --- a/actix-testing/Cargo.toml +++ b/actix-testing/Cargo.toml @@ -18,7 +18,7 @@ path = "src/lib.rs" [dependencies] actix-rt = "0.2.1" -actix-server = "0.6.0" +actix-server = "0.7.0" actix-server-config = "0.2.0" actix-service = "0.4.0" From fa72975f34d2f3b19860240a3b564afb02037ff7 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Tue, 8 Oct 2019 14:46:22 +0600 Subject: [PATCH 08/17] extra trace logging --- actix-codec/src/framed_read.rs | 1 + actix-ioframe/src/dispatcher.rs | 1 + 2 files changed, 2 insertions(+) diff --git a/actix-codec/src/framed_read.rs b/actix-codec/src/framed_read.rs index f54b2c23..b470d1bf 100644 --- a/actix-codec/src/framed_read.rs +++ b/actix-codec/src/framed_read.rs @@ -209,6 +209,7 @@ where // get a spurious 0 that looks like EOF self.buffer.reserve(1); if 0 == try_ready!(self.inner.read_buf(&mut self.buffer)) { + trace!("read 0 bytes, mark stream as eof"); self.eof = true; } diff --git a/actix-ioframe/src/dispatcher.rs b/actix-ioframe/src/dispatcher.rs index 422a10d4..57713b10 100644 --- a/actix-ioframe/src/dispatcher.rs +++ b/actix-ioframe/src/dispatcher.rs @@ -147,6 +147,7 @@ where } Ok(Async::NotReady) => return false, Ok(Async::Ready(None)) => { + log::trace!("Client disconnected"); self.dispatch_state = FramedState::Stopping; return true; } From 9982a9498d5c36be672101e939d9576cd681aa52 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Tue, 8 Oct 2019 15:02:43 +0600 Subject: [PATCH 09/17] register current task in counters available method. --- actix-utils/CHANGES.md | 5 +++++ actix-utils/Cargo.toml | 2 +- actix-utils/src/counter.rs | 17 ++++++++++------- 3 files changed, 16 insertions(+), 8 deletions(-) diff --git a/actix-utils/CHANGES.md b/actix-utils/CHANGES.md index 97131b76..d2db8811 100644 --- a/actix-utils/CHANGES.md +++ b/actix-utils/CHANGES.md @@ -1,5 +1,10 @@ # Changes +## [0.4.6] - 2019-10-08 + +* Refactor `Counter` type. register current task in available method. + + ## [0.4.5] - 2019-07-19 ### Removed diff --git a/actix-utils/Cargo.toml b/actix-utils/Cargo.toml index f4f11ab6..b687d030 100644 --- a/actix-utils/Cargo.toml +++ b/actix-utils/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "actix-utils" -version = "0.4.5" +version = "0.4.6" authors = ["Nikolay Kim "] description = "Actix utils - various actix net related services" keywords = ["network", "framework", "async", "futures"] diff --git a/actix-utils/src/counter.rs b/actix-utils/src/counter.rs index 1302c91c..2f355094 100644 --- a/actix-utils/src/counter.rs +++ b/actix-utils/src/counter.rs @@ -25,11 +25,13 @@ impl Counter { })) } + /// Get counter guard. pub fn get(&self) -> CounterGuard { CounterGuard::new(self.0.clone()) } - /// Check if counter is not at capacity + /// Check if counter is not at capacity. If counter at capacity + /// it registers notification for current task. pub fn available(&self) -> bool { self.0.available() } @@ -57,11 +59,7 @@ impl Drop for CounterGuard { impl CounterInner { fn inc(&self) { - let num = self.count.get() + 1; - self.count.set(num); - if num == self.capacity { - self.task.register(); - } + self.count.set(self.count.get() + 1); } fn dec(&self) { @@ -73,6 +71,11 @@ impl CounterInner { } fn available(&self) -> bool { - self.count.get() < self.capacity + if self.count.get() < self.capacity { + true + } else { + self.task.register(); + false + } } } From 35e32d8e553924d26e4c729793618e72364c71a0 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Mon, 14 Oct 2019 10:30:27 +0600 Subject: [PATCH 10/17] prepare actix-testing release --- Cargo.toml | 26 -------------------------- actix-testing/CHANGES.md | 5 +++++ actix-testing/Cargo.toml | 6 +++--- 3 files changed, 8 insertions(+), 29 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 7b33514f..f489c5c3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,18 +1,3 @@ -[package] -name = "actix-net" -version = "0.3.0" -authors = ["Nikolay Kim "] -description = "Actix net - framework for the composable network services for Rust" -readme = "README.md" -keywords = ["network", "framework", "async", "futures"] -homepage = "https://actix.rs" -repository = "https://github.com/actix/actix-net.git" -documentation = "https://docs.rs/actix-net/" -categories = ["network-programming", "asynchronous"] -license = "MIT/Apache-2.0" -exclude = [".gitignore", ".travis.yml", ".cargo/config", "appveyor.yml"] -edition = "2018" - [workspace] members = [ "actix-codec", @@ -28,17 +13,6 @@ members = [ "router", ] -[dev-dependencies] -actix-service = "0.4.0" -actix-codec = "0.1.1" -actix-rt = "0.2.0" -actix-server = { version="0.5.0", features=["ssl"] } -env_logger = "0.6" -futures = "0.1.25" -openssl = "0.10" -tokio-tcp = "0.1" -tokio-openssl = "0.3" - [patch.crates-io] actix-codec = { path = "actix-codec" } actix-connect = { path = "actix-connect" } diff --git a/actix-testing/CHANGES.md b/actix-testing/CHANGES.md index 3dfd22fb..7896c18e 100644 --- a/actix-testing/CHANGES.md +++ b/actix-testing/CHANGES.md @@ -1,5 +1,10 @@ # Changes +## [0.2.0] - 2019-10-14 + +* Upgrade actix-server and actix-server-config deps + + ## [0.1.0] - 2019-09-25 * Initial impl diff --git a/actix-testing/Cargo.toml b/actix-testing/Cargo.toml index b11e6455..eb9ac455 100644 --- a/actix-testing/Cargo.toml +++ b/actix-testing/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "actix-testing" -version = "0.1.0" +version = "0.2.0" authors = ["Nikolay Kim "] description = "Actix testing utils" keywords = ["network", "framework", "async", "futures"] @@ -17,10 +17,10 @@ name = "actix_testing" path = "src/lib.rs" [dependencies] -actix-rt = "0.2.1" +actix-rt = "0.2.5" actix-server = "0.7.0" actix-server-config = "0.2.0" -actix-service = "0.4.0" +actix-service = "0.4.2" log = "0.4" net2 = "0.2" From 0b0060fe47f1a5bc8921b33571b530df64ee17fe Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Mon, 14 Oct 2019 10:37:48 +0600 Subject: [PATCH 11/17] update deps --- actix-connect/Cargo.toml | 2 +- actix-ioframe/Cargo.toml | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/actix-connect/Cargo.toml b/actix-connect/Cargo.toml index 15e24cbc..0fa5c903 100644 --- a/actix-connect/Cargo.toml +++ b/actix-connect/Cargo.toml @@ -57,5 +57,5 @@ webpki = { version = "0.21", optional = true } [dev-dependencies] bytes = "0.4" -actix-testing = { version="0.1.0" } +actix-testing = { version="0.2.0" } actix-server-config = "0.2.0" diff --git a/actix-ioframe/Cargo.toml b/actix-ioframe/Cargo.toml index 7eac1f9b..53639e36 100644 --- a/actix-ioframe/Cargo.toml +++ b/actix-ioframe/Cargo.toml @@ -28,8 +28,8 @@ log = "0.4" [dev-dependencies] actix-rt = "0.2.2" -actix-connect = "0.2.0" -actix-testing = "0.1.0" +actix-connect = "0.3.0" +actix-testing = "0.2.0" actix-server-config = "0.2.0" tokio-tcp = "0.1" tokio-timer = "0.2" From 115e82329fe47f3aad5c3b946c01821bf849f90d Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Mon, 14 Oct 2019 11:19:08 +0600 Subject: [PATCH 12/17] fix arbiter thread panic message --- actix-rt/CHANGES.md | 7 +++++++ actix-rt/src/arbiter.rs | 4 +++- 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/actix-rt/CHANGES.md b/actix-rt/CHANGES.md index c82a0d07..98f6a601 100644 --- a/actix-rt/CHANGES.md +++ b/actix-rt/CHANGES.md @@ -1,5 +1,12 @@ # Changes +## [0.2.6] - Unreleased + +### Fixed + +* Fix arbiter's thread panic message. + + ## [0.2.5] - 2019-09-02 ### Added diff --git a/actix-rt/src/arbiter.rs b/actix-rt/src/arbiter.rs index 285ad3c9..083105b3 100644 --- a/actix-rt/src/arbiter.rs +++ b/actix-rt/src/arbiter.rs @@ -260,9 +260,11 @@ struct ArbiterController { impl Drop for ArbiterController { fn drop(&mut self) { if thread::panicking() { - eprintln!("Panic in Arbiter thread, shutting down system."); if System::current().stop_on_panic() { + eprintln!("Panic in Arbiter thread, shutting down system."); System::current().stop_with_code(1) + } else { + eprintln!("Panic in Arbiter thread."); } } } From 2e8c2c77337ac3f7a1a0fd2d3c8636a163c8607a Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Mon, 14 Oct 2019 17:55:52 +0600 Subject: [PATCH 13/17] Re-register task on every future poll --- actix-ioframe/CHANGES.md | 6 ++++++ actix-ioframe/Cargo.toml | 2 +- actix-ioframe/src/cell.rs | 4 ++++ actix-ioframe/src/dispatcher.rs | 3 ++- actix-utils/CHANGES.md | 5 +++++ actix-utils/Cargo.toml | 2 +- actix-utils/src/framed.rs | 3 ++- 7 files changed, 21 insertions(+), 4 deletions(-) diff --git a/actix-ioframe/CHANGES.md b/actix-ioframe/CHANGES.md index 90905f5f..a7aa244f 100644 --- a/actix-ioframe/CHANGES.md +++ b/actix-ioframe/CHANGES.md @@ -1,5 +1,11 @@ # Changes + +## [0.1.1] - 2019-10-14 + +* Re-register task on every dispatcher poll. + + ## [0.1.0] - 2019-09-25 * Initial release diff --git a/actix-ioframe/Cargo.toml b/actix-ioframe/Cargo.toml index 53639e36..97e32152 100644 --- a/actix-ioframe/Cargo.toml +++ b/actix-ioframe/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "actix-ioframe" -version = "0.1.0" +version = "0.1.1" authors = ["Nikolay Kim "] description = "Actix framed service" keywords = ["network", "framework", "async", "futures"] diff --git a/actix-ioframe/src/cell.rs b/actix-ioframe/src/cell.rs index e517aca1..3e82aee4 100644 --- a/actix-ioframe/src/cell.rs +++ b/actix-ioframe/src/cell.rs @@ -29,6 +29,10 @@ impl Cell { } } + pub(crate) unsafe fn get_ref(&mut self) -> &T { + &*self.inner.as_ref().get() + } + pub(crate) unsafe fn get_mut(&mut self) -> &mut T { &mut *self.inner.as_ref().get() } diff --git a/actix-ioframe/src/dispatcher.rs b/actix-ioframe/src/dispatcher.rs index 57713b10..05a7d05f 100644 --- a/actix-ioframe/src/dispatcher.rs +++ b/actix-ioframe/src/dispatcher.rs @@ -154,7 +154,6 @@ where }; let mut cell = self.inner.clone(); - unsafe { cell.get_mut().task.register() }; tokio_current_thread::spawn( self.service .call(Item::new(self.state.clone(), self.sink.clone(), item)) @@ -275,6 +274,8 @@ where type Error = ServiceError; fn poll(&mut self) -> Poll { + unsafe { self.inner.get_ref().task.register() }; + match mem::replace(&mut self.dispatch_state, FramedState::Processing) { FramedState::Processing => { if self.poll_read() || self.poll_write() { diff --git a/actix-utils/CHANGES.md b/actix-utils/CHANGES.md index d2db8811..4e4ac8d7 100644 --- a/actix-utils/CHANGES.md +++ b/actix-utils/CHANGES.md @@ -1,5 +1,10 @@ # Changes +## [0.4.7] - 2019-10-14 + +* Re-register task on every framed transport poll. + + ## [0.4.6] - 2019-10-08 * Refactor `Counter` type. register current task in available method. diff --git a/actix-utils/Cargo.toml b/actix-utils/Cargo.toml index b687d030..ee3c1dba 100644 --- a/actix-utils/Cargo.toml +++ b/actix-utils/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "actix-utils" -version = "0.4.6" +version = "0.4.7" authors = ["Nikolay Kim "] description = "Actix utils - various actix net related services" keywords = ["network", "framework", "async", "futures"] diff --git a/actix-utils/src/framed.rs b/actix-utils/src/framed.rs index 7be2c750..2fa2d8e9 100644 --- a/actix-utils/src/framed.rs +++ b/actix-utils/src/framed.rs @@ -129,7 +129,6 @@ where }; let mut cell = self.inner.clone(); - cell.get_mut().task.register(); tokio_current_thread::spawn(self.service.call(item).then(move |item| { let inner = cell.get_mut(); inner.buf.push_back(item); @@ -293,6 +292,8 @@ where type Error = FramedTransportError; fn poll(&mut self) -> Poll { + self.inner.get_ref().task.register(); + match mem::replace(&mut self.state, TransportState::Processing) { TransportState::Processing => { if self.poll_read() || self.poll_write() { From e7631f76f2c76d27fb19b2bb4deeb03c5181ad49 Mon Sep 17 00:00:00 2001 From: Christian Battaglia Date: Thu, 7 Nov 2019 17:49:54 -0500 Subject: [PATCH 14/17] update to latest tokio alpha and futures-rs --- actix-codec/Cargo.toml | 6 +++--- actix-rt/Cargo.toml | 11 +++++------ actix-threadpool/Cargo.toml | 2 +- 3 files changed, 9 insertions(+), 10 deletions(-) diff --git a/actix-codec/Cargo.toml b/actix-codec/Cargo.toml index fefdf718..1eb604d6 100644 --- a/actix-codec/Cargo.toml +++ b/actix-codec/Cargo.toml @@ -20,7 +20,7 @@ path = "src/lib.rs" [dependencies] bytes = "0.4.12" pin-utils = "0.1.0-alpha.4" -futures = { package = "futures-preview", version = "0.3.0-alpha.18" } -tokio-io = "0.2.0-alpha.4" -tokio-codec = "0.2.0-alpha.4" +futures = "0.3.1" +tokio-io = "0.2.0-alpha.5" +tokio-codec = "0.2.0-alpha.5" log = "0.4" \ No newline at end of file diff --git a/actix-rt/Cargo.toml b/actix-rt/Cargo.toml index 9deeaaf6..db3782ab 100644 --- a/actix-rt/Cargo.toml +++ b/actix-rt/Cargo.toml @@ -19,13 +19,12 @@ path = "src/lib.rs" [dependencies] actix-threadpool = "0.1.1" -futures = { package = "futures-preview", version = "0.3.0-alpha.18" } - +futures = "0.3.1" # TODO: Replace this with dependency on tokio-runtime once it is ready -tokio = { version = "0.2.0-alpha.4" } -tokio-timer = "=0.3.0-alpha.4" -tokio-executor = "=0.2.0-alpha.4" -tokio-net = "=0.2.0-alpha.4" +tokio = { version = "0.2.0-alpha.5" } +tokio-timer = "=0.3.0-alpha.5" +tokio-executor = "=0.2.0-alpha.5" +tokio-net = "=0.2.0-alpha.5" copyless = "0.1.4" diff --git a/actix-threadpool/Cargo.toml b/actix-threadpool/Cargo.toml index 643c380b..03e3e027 100644 --- a/actix-threadpool/Cargo.toml +++ b/actix-threadpool/Cargo.toml @@ -19,7 +19,7 @@ path = "src/lib.rs" [dependencies] derive_more = "0.15" -futures = { package = "futures-preview", version = "0.3.0-alpha.18" } +futures = "0.3.1" parking_lot = "0.9" lazy_static = "1.2" log = "0.4" From 7528e7f1cb9cf2bc0af800b05fee8fbb80a90f18 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michal=20Hornick=C3=BD?= Date: Tue, 1 Oct 2019 09:54:02 +0200 Subject: [PATCH 15/17] Migrate actix-service to std::future, This is a squash of ~8 commits, since it included a lot of experimentation. To see the commits, look into the semtexzv/std-future-service-tmp branch. --- actix-service/Cargo.toml | 4 +- actix-service/src/and_then.rs | 156 +++++++++++++--------- actix-service/src/and_then_apply.rs | 97 ++++++++------ actix-service/src/and_then_apply_fn.rs | 153 ++++++++++++--------- actix-service/src/apply.rs | 81 ++++++----- actix-service/src/apply_cfg.rs | 72 ++++++---- actix-service/src/blank.rs | 17 ++- actix-service/src/boxed.rs | 41 ++++-- actix-service/src/cell.rs | 4 + actix-service/src/fn_service.rs | 27 ++-- actix-service/src/fn_transform.rs | 7 +- actix-service/src/from_err.rs | 99 ++++++++------ actix-service/src/lib.rs | 104 +++++++++++++-- actix-service/src/map.rs | 90 +++++++------ actix-service/src/map_err.rs | 97 ++++++++------ actix-service/src/map_init_err.rs | 15 ++- actix-service/src/then.rs | 178 +++++++++++++++---------- actix-service/src/transform.rs | 37 ++--- actix-service/src/transform_err.rs | 24 ++-- 19 files changed, 813 insertions(+), 490 deletions(-) diff --git a/actix-service/Cargo.toml b/actix-service/Cargo.toml index 8bdf6bcf..c4908713 100644 --- a/actix-service/Cargo.toml +++ b/actix-service/Cargo.toml @@ -23,7 +23,9 @@ name = "actix_service" path = "src/lib.rs" [dependencies] -futures = "0.1.25" +futures = { package = "futures-preview", version = "0.3.0-alpha.18" } +pin-project = "0.4.0-alpha.11" [dev-dependencies] +tokio = "0.2.0-alpha.4" actix-rt = "0.2" \ No newline at end of file diff --git a/actix-service/src/and_then.rs b/actix-service/src/and_then.rs index 208c7b09..44bb1a45 100644 --- a/actix-service/src/and_then.rs +++ b/actix-service/src/and_then.rs @@ -1,14 +1,21 @@ -use futures::{Async, Future, Poll}; +use futures::{Future, Poll}; use super::{IntoNewService, NewService, Service}; use crate::cell::Cell; +use pin_project::pin_project; +use std::pin::Pin; +use std::task::Context; + /// Service for the `and_then` combinator, chaining a computation onto the end /// of another service which completes successfully. /// /// This is created by the `ServiceExt::and_then` method. +#[pin_project] pub struct AndThen { + #[pin] a: A, + #[pin] b: Cell, } @@ -45,12 +52,16 @@ where type Error = A::Error; type Future = AndThenFuture; - fn poll_ready(&mut self) -> Poll<(), Self::Error> { - let not_ready = self.a.poll_ready()?.is_not_ready(); - if self.b.get_mut().poll_ready()?.is_not_ready() || not_ready { - Ok(Async::NotReady) + fn poll_ready( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + let this = self.project(); + let not_ready = !this.a.poll_ready(cx)?.is_ready(); + if !this.b.get_pin().poll_ready(cx)?.is_ready() || not_ready { + Poll::Pending } else { - Ok(Async::Ready(())) + Poll::Ready(Ok(())) } } @@ -59,13 +70,16 @@ where } } +#[pin_project] pub struct AndThenFuture where A: Service, B: Service, { b: Cell, + #[pin] fut_b: Option, + #[pin] fut_a: Option, } @@ -88,22 +102,33 @@ where A: Service, B: Service, { - type Item = B::Response; - type Error = A::Error; + type Output = Result; - fn poll(&mut self) -> Poll { - if let Some(ref mut fut) = self.fut_b { - return fut.poll(); - } + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut this = self.project(); - match self.fut_a.as_mut().expect("Bug in actix-service").poll() { - Ok(Async::Ready(resp)) => { - let _ = self.fut_a.take(); - self.fut_b = Some(self.b.get_mut().call(resp)); - self.poll() + loop { + let mut fut_a = this.fut_a.as_mut(); + let mut fut_b = this.fut_b.as_mut(); + + if let Some(fut) = fut_b.as_mut().as_pin_mut() { + return fut.poll(cx); + } + + match fut_a + .as_mut() + .as_pin_mut() + .expect("Bug in actix-service") + .poll(cx) + { + Poll::Ready(Ok(resp)) => { + fut_a.set(None); + let new_fut = this.b.get_mut().call(resp); + fut_b.set(Some(new_fut)); + } + Poll::Ready(Err(e)) => return Poll::Ready(Err(e)), + Poll::Pending => return Poll::Pending, } - Ok(Async::NotReady) => Ok(Async::NotReady), - Err(err) => Err(err), } } } @@ -174,13 +199,17 @@ where } } +#[pin_project] pub struct AndThenNewServiceFuture where A: NewService, B: NewService, { + #[pin] fut_b: B::Future, + #[pin] fut_a: A::Future, + a: Option, b: Option, } @@ -205,37 +234,35 @@ where A: NewService, B: NewService, { - type Item = AndThen; - type Error = A::InitError; + type Output = Result, A::InitError>; - fn poll(&mut self) -> Poll { - if self.a.is_none() { - if let Async::Ready(service) = self.fut_a.poll()? { - self.a = Some(service); + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + if this.a.is_none() { + if let Poll::Ready(service) = this.fut_a.poll(cx)? { + *this.a = Some(service); } } - - if self.b.is_none() { - if let Async::Ready(service) = self.fut_b.poll()? { - self.b = Some(service); + if this.b.is_none() { + if let Poll::Ready(service) = this.fut_b.poll(cx)? { + *this.b = Some(service); } } - - if self.a.is_some() && self.b.is_some() { - Ok(Async::Ready(AndThen::new( - self.a.take().unwrap(), - self.b.take().unwrap(), + if this.a.is_some() && this.b.is_some() { + Poll::Ready(Ok(AndThen::new( + this.a.take().unwrap(), + this.b.take().unwrap(), ))) } else { - Ok(Async::NotReady) + Poll::Pending } } } #[cfg(test)] mod tests { - use futures::future::{ok, FutureResult}; - use futures::{Async, Poll}; + use futures::future::{ok, poll_fn, ready, Ready}; + use futures::Poll; use std::cell::Cell; use std::rc::Rc; @@ -243,15 +270,19 @@ mod tests { use crate::{NewService, Service, ServiceExt}; struct Srv1(Rc>); + impl Service for Srv1 { type Request = &'static str; type Response = &'static str; type Error = (); - type Future = FutureResult; + type Future = Ready>; - fn poll_ready(&mut self) -> Poll<(), Self::Error> { + fn poll_ready( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { self.0.set(self.0.get() + 1); - Ok(Async::Ready(())) + Poll::Ready(Ok(())) } fn call(&mut self, req: &'static str) -> Self::Future { @@ -266,11 +297,14 @@ mod tests { type Request = &'static str; type Response = (&'static str, &'static str); type Error = (); - type Future = FutureResult; + type Future = Ready>; - fn poll_ready(&mut self) -> Poll<(), Self::Error> { + fn poll_ready( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { self.0.set(self.0.get() + 1); - Ok(Async::Ready(())) + Poll::Ready(Ok(())) } fn call(&mut self, req: &'static str) -> Self::Future { @@ -278,39 +312,35 @@ mod tests { } } - #[test] - fn test_poll_ready() { + #[tokio::test] + async fn test_poll_ready() { let cnt = Rc::new(Cell::new(0)); let mut srv = Srv1(cnt.clone()).and_then(Srv2(cnt.clone())); - let res = srv.poll_ready(); - assert!(res.is_ok()); - assert_eq!(res.unwrap(), Async::Ready(())); + let res = srv.poll_once().await; + assert_eq!(res, Poll::Ready(Ok(()))); assert_eq!(cnt.get(), 2); } - #[test] - fn test_call() { + #[tokio::test] + async fn test_call() { let cnt = Rc::new(Cell::new(0)); let mut srv = Srv1(cnt.clone()).and_then(Srv2(cnt)); - let res = srv.call("srv1").poll(); + let res = srv.call("srv1").await; assert!(res.is_ok()); - assert_eq!(res.unwrap(), Async::Ready(("srv1", "srv2"))); + assert_eq!(res.unwrap(), (("srv1", "srv2"))); } - #[test] - fn test_new_service() { + #[tokio::test] + async fn test_new_service() { let cnt = Rc::new(Cell::new(0)); let cnt2 = cnt.clone(); - let blank = move || Ok::<_, ()>(Srv1(cnt2.clone())); + let blank = move || ready(Ok::<_, ()>(Srv1(cnt2.clone()))); let new_srv = blank .into_new_service() - .and_then(move || Ok(Srv2(cnt.clone()))); - if let Async::Ready(mut srv) = new_srv.new_service(&()).poll().unwrap() { - let res = srv.call("srv1").poll(); - assert!(res.is_ok()); - assert_eq!(res.unwrap(), Async::Ready(("srv1", "srv2"))); - } else { - panic!() - } + .and_then(move || ready(Ok(Srv2(cnt.clone())))); + let mut srv = new_srv.new_service(&()).await.unwrap(); + let res = srv.call("srv1").await; + assert!(res.is_ok()); + assert_eq!(res.unwrap(), ("srv1", "srv2")); } } diff --git a/actix-service/src/and_then_apply.rs b/actix-service/src/and_then_apply.rs index a8ce6991..f26b1517 100644 --- a/actix-service/src/and_then_apply.rs +++ b/actix-service/src/and_then_apply.rs @@ -1,11 +1,15 @@ use std::rc::Rc; -use futures::{Async, Future, Poll}; +use futures::{Future, Poll}; use crate::and_then::AndThen; use crate::from_err::FromErr; use crate::{NewService, Transform}; +use pin_project::pin_project; +use std::pin::Pin; +use std::task::Context; + /// `Apply` new service combinator pub struct AndThenTransform { a: A, @@ -72,6 +76,7 @@ where } } +#[pin_project] pub struct AndThenTransformFuture where A: NewService, @@ -79,8 +84,11 @@ where T: Transform, T::Error: From, { + #[pin] fut_a: A::Future, + #[pin] fut_b: B::Future, + #[pin] fut_t: Option, a: Option, t: Option, @@ -94,56 +102,63 @@ where T: Transform, T::Error: From, { - type Item = AndThen, T::Transform>; - type Error = T::InitError; + type Output = Result, T::Transform>, T::InitError>; - fn poll(&mut self) -> Poll { - if self.fut_t.is_none() { - if let Async::Ready(service) = self.fut_b.poll()? { - self.fut_t = Some(self.t_cell.new_transform(service)); + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut this = self.project(); + + if this.fut_t.is_none() { + if let Poll::Ready(svc) = this.fut_b.poll(cx)? { + this.fut_t.set(Some(this.t_cell.new_transform(svc))) } } - if self.a.is_none() { - if let Async::Ready(service) = self.fut_a.poll()? { - self.a = Some(service); + if this.a.is_none() { + if let Poll::Ready(svc) = this.fut_a.poll(cx)? { + *this.a = Some(svc) } } - if let Some(ref mut fut) = self.fut_t { - if let Async::Ready(transform) = fut.poll()? { - self.t = Some(transform); + if let Some(fut) = this.fut_t.as_pin_mut() { + if let Poll::Ready(transform) = fut.poll(cx)? { + *this.t = Some(transform) } } - if self.a.is_some() && self.t.is_some() { - Ok(Async::Ready(AndThen::new( - FromErr::new(self.a.take().unwrap()), - self.t.take().unwrap(), + if this.a.is_some() && this.t.is_some() { + Poll::Ready(Ok(AndThen::new( + FromErr::new(this.a.take().unwrap()), + this.t.take().unwrap(), ))) } else { - Ok(Async::NotReady) + Poll::Pending } } } #[cfg(test)] mod tests { - use futures::future::{ok, FutureResult}; - use futures::{Async, Future, Poll}; + use futures::future::{ok, ready, Ready}; + use futures::{Future, FutureExt, Poll, TryFutureExt}; use crate::{IntoNewService, IntoService, NewService, Service, ServiceExt}; + use std::pin::Pin; + use std::task::Context; #[derive(Clone)] struct Srv; + impl Service for Srv { type Request = (); type Response = (); type Error = (); - type Future = FutureResult<(), ()>; + type Future = Ready>; - fn poll_ready(&mut self) -> Poll<(), Self::Error> { - Ok(Async::Ready(())) + fn poll_ready( + self: Pin<&mut Self>, + ctx: &mut Context<'_>, + ) -> Poll> { + Poll::Ready(Err(())) } fn call(&mut self, _: ()) -> Self::Future { @@ -151,36 +166,32 @@ mod tests { } } - #[test] - fn test_apply() { - let blank = |req| Ok(req); + #[tokio::test] + async fn test_apply() { + let blank = |req| ready(Ok(req)); let mut srv = blank .into_service() .apply_fn(Srv, |req: &'static str, srv: &mut Srv| { - srv.call(()).map(move |res| (req, res)) + srv.call(()).map_ok(move |res| (req, res)) }); - assert!(srv.poll_ready().is_ok()); - let res = srv.call("srv").poll(); + let res = srv.call("srv").await; assert!(res.is_ok()); - assert_eq!(res.unwrap(), Async::Ready(("srv", ()))); + assert_eq!(res.unwrap(), (("srv", ()))); } - #[test] - fn test_new_service() { - let blank = || Ok::<_, ()>((|req| Ok(req)).into_service()); + #[tokio::test] + async fn test_new_service() { + let blank = move || ok::<_, ()>((|req| ok(req)).into_service()); let new_srv = blank.into_new_service().apply( - |req: &'static str, srv: &mut Srv| srv.call(()).map(move |res| (req, res)), - || Ok(Srv), + |req: &'static str, srv: &mut Srv| srv.call(()).map_ok(move |res| (req, res)), + || ok(Srv), ); - if let Async::Ready(mut srv) = new_srv.new_service(&()).poll().unwrap() { - assert!(srv.poll_ready().is_ok()); - let res = srv.call("srv").poll(); - assert!(res.is_ok()); - assert_eq!(res.unwrap(), Async::Ready(("srv", ()))); - } else { - panic!() - } + let mut srv = new_srv.new_service(&()).await.unwrap(); + + let res = srv.call("srv").await; + assert!(res.is_ok()); + assert_eq!(res.unwrap(), (("srv", ()))); } } diff --git a/actix-service/src/and_then_apply_fn.rs b/actix-service/src/and_then_apply_fn.rs index 5fd846b7..6b006c73 100644 --- a/actix-service/src/and_then_apply_fn.rs +++ b/actix-service/src/and_then_apply_fn.rs @@ -1,11 +1,16 @@ +use futures::{Future, Poll}; use std::marker::PhantomData; - -use futures::{Async, Future, IntoFuture, Poll}; +use std::pin::Pin; +use std::task::Context; use super::{IntoNewService, IntoService, NewService, Service}; use crate::cell::Cell; +use crate::IntoFuture; +use pin_project::pin_project; + /// `Apply` service combinator +#[pin_project] pub struct AndThenApply where A: Service, @@ -14,8 +19,11 @@ where Out: IntoFuture, Out::Error: Into, { + #[pin] a: A, + #[pin] b: Cell, + #[pin] f: Cell, r: PhantomData<(Out,)>, } @@ -70,12 +78,16 @@ where type Error = A::Error; type Future = AndThenApplyFuture; - fn poll_ready(&mut self) -> Poll<(), Self::Error> { - let not_ready = self.a.poll_ready()?.is_not_ready(); - if self.b.get_mut().poll_ready()?.is_not_ready() || not_ready { - Ok(Async::NotReady) + fn poll_ready( + mut self: Pin<&mut Self>, + ctx: &mut Context<'_>, + ) -> Poll> { + let this = self.project(); + let not_ready = !this.a.poll_ready(ctx)?.is_ready(); + if !this.b.get_pin().poll_ready(ctx).is_ready() || not_ready { + Poll::Pending } else { - Ok(Async::Ready(())) + Poll::Ready(Ok(())) } } @@ -89,6 +101,7 @@ where } } +#[pin_project] pub struct AndThenApplyFuture where A: Service, @@ -99,7 +112,9 @@ where { b: Cell, f: Cell, + #[pin] fut_a: Option, + #[pin] fut_b: Option, } @@ -111,23 +126,30 @@ where Out: IntoFuture, Out::Error: Into, { - type Item = Out::Item; - type Error = A::Error; + type Output = Result; - fn poll(&mut self) -> Poll { - if let Some(ref mut fut) = self.fut_b { - return fut.poll().map_err(|e| e.into()); - } - - match self.fut_a.as_mut().expect("Bug in actix-service").poll() { - Ok(Async::Ready(resp)) => { - let _ = self.fut_a.take(); - self.fut_b = - Some((&mut *self.f.get_mut())(resp, self.b.get_mut()).into_future()); - self.poll() + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut this = self.project(); + loop { + if let Some(fut) = this.fut_b.as_mut().as_pin_mut() { + return fut.poll(cx).map_err(|e| e.into()); + } + + match this + .fut_a + .as_mut() + .as_pin_mut() + .expect("Bug in actix-service") + .poll(cx)? + { + Poll::Ready(resp) => { + this.fut_a.set(None); + this.fut_b.set(Some( + (&mut *this.f.get_mut())(resp, this.b.get_mut()).into_future(), + )); + } + Poll::Pending => return Poll::Pending, } - Ok(Async::NotReady) => Ok(Async::NotReady), - Err(err) => Err(err), } } } @@ -195,12 +217,13 @@ where a: None, b: None, f: self.f.clone(), - fut_a: self.a.new_service(cfg).into_future(), - fut_b: self.b.new_service(cfg).into_future(), + fut_a: self.a.new_service(cfg), + fut_b: self.b.new_service(cfg), } } } +#[pin_project] pub struct AndThenApplyNewServiceFuture where A: NewService, @@ -209,7 +232,9 @@ where Out: IntoFuture, Out::Error: Into, { + #[pin] fut_b: B::Future, + #[pin] fut_a: A::Future, f: Cell, a: Option, @@ -224,53 +249,60 @@ where Out: IntoFuture, Out::Error: Into, { - type Item = AndThenApply; - type Error = A::InitError; + type Output = Result, A::InitError>; - fn poll(&mut self) -> Poll { - if self.a.is_none() { - if let Async::Ready(service) = self.fut_a.poll()? { - self.a = Some(service); + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + + if this.a.is_none() { + if let Poll::Ready(service) = this.fut_a.poll(cx)? { + *this.a = Some(service); } } - if self.b.is_none() { - if let Async::Ready(service) = self.fut_b.poll()? { - self.b = Some(service); + if this.b.is_none() { + if let Poll::Ready(service) = this.fut_b.poll(cx)? { + *this.b = Some(service); } } - if self.a.is_some() && self.b.is_some() { - Ok(Async::Ready(AndThenApply { - f: self.f.clone(), - a: self.a.take().unwrap(), - b: Cell::new(self.b.take().unwrap()), + if this.a.is_some() && this.b.is_some() { + Poll::Ready(Ok(AndThenApply { + f: this.f.clone(), + a: this.a.take().unwrap(), + b: Cell::new(this.b.take().unwrap()), r: PhantomData, })) } else { - Ok(Async::NotReady) + Poll::Pending } } } #[cfg(test)] mod tests { - use futures::future::{ok, FutureResult}; - use futures::{Async, Future, Poll}; + use futures::future::{ok, Ready}; + use futures::{Future, Poll, TryFutureExt}; use crate::blank::{Blank, BlankNewService}; use crate::{NewService, Service, ServiceExt}; + use std::pin::Pin; + use std::task::Context; #[derive(Clone)] struct Srv; + impl Service for Srv { type Request = (); type Response = (); type Error = (); - type Future = FutureResult<(), ()>; + type Future = Ready>; - fn poll_ready(&mut self) -> Poll<(), Self::Error> { - Ok(Async::Ready(())) + fn poll_ready( + self: Pin<&mut Self>, + ctx: &mut Context<'_>, + ) -> Poll> { + Poll::Ready(Ok(())) } fn call(&mut self, _: ()) -> Self::Future { @@ -278,30 +310,27 @@ mod tests { } } - #[test] - fn test_call() { + #[tokio::test] + async fn test_call() { let mut srv = Blank::new().apply_fn(Srv, |req: &'static str, srv| { - srv.call(()).map(move |res| (req, res)) + srv.call(()).map_ok(move |res| (req, res)) }); - assert!(srv.poll_ready().is_ok()); - let res = srv.call("srv").poll(); + assert_eq!(srv.poll_once().await, Poll::Ready(Ok(()))); + let res = srv.call("srv").await; assert!(res.is_ok()); - assert_eq!(res.unwrap(), Async::Ready(("srv", ()))); + assert_eq!(res.unwrap(), (("srv", ()))); } - #[test] - fn test_new_service() { + #[tokio::test] + async fn test_new_service() { let new_srv = BlankNewService::new_unit().apply_fn( - || Ok(Srv), - |req: &'static str, srv| srv.call(()).map(move |res| (req, res)), + || ok(Srv), + |req: &'static str, srv| srv.call(()).map_ok(move |res| (req, res)), ); - if let Async::Ready(mut srv) = new_srv.new_service(&()).poll().unwrap() { - assert!(srv.poll_ready().is_ok()); - let res = srv.call("srv").poll(); - assert!(res.is_ok()); - assert_eq!(res.unwrap(), Async::Ready(("srv", ()))); - } else { - panic!() - } + let mut srv = new_srv.new_service(&()).await.unwrap(); + assert_eq!(srv.poll_once().await, Poll::Ready(Ok(()))); + let res = srv.call("srv").await; + assert!(res.is_ok()); + assert_eq!(res.unwrap(), (("srv", ()))); } } diff --git a/actix-service/src/apply.rs b/actix-service/src/apply.rs index d5f84463..9bb96773 100644 --- a/actix-service/src/apply.rs +++ b/actix-service/src/apply.rs @@ -1,9 +1,14 @@ use std::marker::PhantomData; -use futures::{Async, Future, IntoFuture, Poll}; +use futures::{ready, Future, Poll}; use super::{IntoNewService, IntoService, NewService, Service}; +use crate::IntoFuture; +use pin_project::pin_project; +use std::pin::Pin; +use std::task::Context; + /// Apply tranform function to a service pub fn apply_fn(service: U, f: F) -> Apply where @@ -30,10 +35,12 @@ where #[doc(hidden)] /// `Apply` service combinator +#[pin_project] pub struct Apply where T: Service, { + #[pin] service: T, f: F, r: PhantomData<(In, Out)>, @@ -82,8 +89,11 @@ where type Error = Out::Error; type Future = Out::Future; - fn poll_ready(&mut self) -> Poll<(), Self::Error> { - self.service.poll_ready().map_err(|e| e.into()) + fn poll_ready( + mut self: Pin<&mut Self>, + ctx: &mut Context<'_>, + ) -> Poll> { + Poll::Ready(ready!(self.project().service.poll_ready(ctx)).map_err(|e| e.into())) } fn call(&mut self, req: In) -> Self::Future { @@ -154,12 +164,14 @@ where } } +#[pin_project] pub struct ApplyNewServiceFuture where T: NewService, F: FnMut(In, &mut T::Service) -> Out + Clone, Out: IntoFuture, { + #[pin] fut: T::Future, f: Option, r: PhantomData<(In, Out)>, @@ -187,36 +199,40 @@ where Out: IntoFuture, Out::Error: From, { - type Item = Apply; - type Error = T::InitError; + type Output = Result, T::InitError>; - fn poll(&mut self) -> Poll { - if let Async::Ready(service) = self.fut.poll()? { - Ok(Async::Ready(Apply::new(service, self.f.take().unwrap()))) + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + if let Poll::Ready(svc) = this.fut.poll(cx)? { + Poll::Ready(Ok(Apply::new(svc, this.f.take().unwrap()))) } else { - Ok(Async::NotReady) + Poll::Pending } } } #[cfg(test)] mod tests { - use futures::future::{ok, FutureResult}; - use futures::{Async, Future, Poll}; + use futures::future::{ok, Ready}; + use futures::{Future, Poll, TryFutureExt}; use super::*; use crate::{IntoService, NewService, Service, ServiceExt}; #[derive(Clone)] struct Srv; + impl Service for Srv { type Request = (); type Response = (); type Error = (); - type Future = FutureResult<(), ()>; + type Future = Ready>; - fn poll_ready(&mut self) -> Poll<(), Self::Error> { - Ok(Async::Ready(())) + fn poll_ready( + self: Pin<&mut Self>, + ctx: &mut Context<'_>, + ) -> Poll> { + Poll::Ready(Ok(())) } fn call(&mut self, _: ()) -> Self::Future { @@ -224,34 +240,33 @@ mod tests { } } - #[test] - fn test_call() { - let blank = |req| Ok(req); + #[tokio::test] + async fn test_call() { + let blank = |req| ok(req); let mut srv = blank .into_service() .apply_fn(Srv, |req: &'static str, srv| { - srv.call(()).map(move |res| (req, res)) + srv.call(()).map_ok(move |res| (req, res)) }); - assert!(srv.poll_ready().is_ok()); - let res = srv.call("srv").poll(); + assert_eq!(srv.poll_once().await, Poll::Ready(Ok(()))); + let res = srv.call("srv").await; assert!(res.is_ok()); - assert_eq!(res.unwrap(), Async::Ready(("srv", ()))); + assert_eq!(res.unwrap(), (("srv", ()))); } - #[test] - fn test_new_service() { + #[tokio::test] + async fn test_new_service() { let new_srv = ApplyNewService::new( - || Ok::<_, ()>(Srv), - |req: &'static str, srv| srv.call(()).map(move |res| (req, res)), + || ok::<_, ()>(Srv), + |req: &'static str, srv| srv.call(()).map_ok(move |res| (req, res)), ); - if let Async::Ready(mut srv) = new_srv.new_service(&()).poll().unwrap() { - assert!(srv.poll_ready().is_ok()); - let res = srv.call("srv").poll(); - assert!(res.is_ok()); - assert_eq!(res.unwrap(), Async::Ready(("srv", ()))); - } else { - panic!() - } + + let mut srv = new_srv.new_service(&()).await.unwrap(); + + assert_eq!(srv.poll_once().await, Poll::Ready(Ok(()))); + let res = srv.call("srv").await; + assert!(res.is_ok()); + assert_eq!(res.unwrap(), (("srv", ()))); } } diff --git a/actix-service/src/apply_cfg.rs b/actix-service/src/apply_cfg.rs index a63de936..d0ce6fa1 100644 --- a/actix-service/src/apply_cfg.rs +++ b/actix-service/src/apply_cfg.rs @@ -1,10 +1,14 @@ use std::marker::PhantomData; use futures::future::Future; -use futures::{try_ready, Async, IntoFuture, Poll}; +use futures::{ready, Poll}; use crate::cell::Cell; -use crate::{IntoService, NewService, Service}; +use crate::{IntoFuture, IntoService, NewService, Service}; +use std::pin::Pin; +use std::task::Context; + +use pin_project::pin_project; /// Convert `Fn(&Config, &mut Service) -> Future` fn to a NewService pub fn apply_cfg( @@ -61,7 +65,8 @@ where } } -/// Convert `Fn(&Config) -> Future` fn to NewService +/// Convert `Fn(&Config) -> Future` fn to NewService\ +#[pin_project] struct ApplyConfigService where F: FnMut(&C, &mut T) -> R, @@ -71,6 +76,7 @@ where S: Service, { f: Cell, + #[pin] srv: Cell, _t: PhantomData<(C, R, S)>, } @@ -118,12 +124,14 @@ where } } +#[pin_project] struct FnNewServiceConfigFut where R: IntoFuture, R::Item: IntoService, S: Service, { + #[pin] fut: R::Future, _t: PhantomData<(S,)>, } @@ -134,11 +142,10 @@ where R::Item: IntoService, S: Service, { - type Item = S; - type Error = R::Error; + type Output = Result; - fn poll(&mut self) -> Poll { - Ok(Async::Ready(try_ready!(self.fut.poll()).into_service())) + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + Poll::Ready(Ok(ready!(self.project().fut.poll(cx))?.into_service())) } } @@ -206,6 +213,7 @@ where } } +#[pin_project] struct ApplyConfigNewServiceFut where C: Clone, @@ -218,8 +226,11 @@ where { cfg: C, f: Cell, + #[pin] srv: Option, + #[pin] srv_fut: Option, + #[pin] fut: Option, _t: PhantomData<(S,)>, } @@ -234,33 +245,38 @@ where R::Item: IntoService, S: Service, { - type Item = S; - type Error = R::Error; + type Output = Result; - fn poll(&mut self) -> Poll { - if let Some(ref mut fut) = self.srv_fut { - match fut.poll()? { - Async::NotReady => return Ok(Async::NotReady), - Async::Ready(srv) => { - let _ = self.srv_fut.take(); - self.srv = Some(srv); - return self.poll(); + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut this = self.project(); + 'poll: loop { + if let Some(fut) = this.srv_fut.as_mut().as_pin_mut() { + match fut.poll(cx)? { + Poll::Pending => return Poll::Pending, + Poll::Ready(srv) => { + this.srv_fut.set(None); + this.srv.set(Some(srv)); + continue 'poll; + } } } - } - if let Some(ref mut fut) = self.fut { - Ok(Async::Ready(try_ready!(fut.poll()).into_service())) - } else if let Some(ref mut srv) = self.srv { - match srv.poll_ready()? { - Async::NotReady => Ok(Async::NotReady), - Async::Ready(_) => { - self.fut = Some(self.f.get_mut()(&self.cfg, srv).into_future()); - return self.poll(); + if let Some(fut) = this.fut.as_mut().as_pin_mut() { + return Poll::Ready(Ok(ready!(fut.poll(cx))?.into_service())); + } else if let Some(mut srv) = this.srv.as_mut().as_pin_mut() { + match srv.as_mut().poll_ready(cx)? { + Poll::Ready(_) => { + this.fut.set(Some( + this.f.get_mut()(&this.cfg, unsafe { Pin::get_unchecked_mut(srv) }) + .into_future(), + )); + continue 'poll; + } + Poll::Pending => return Poll::Pending, } + } else { + return Poll::Pending; } - } else { - Ok(Async::NotReady) } } } diff --git a/actix-service/src/blank.rs b/actix-service/src/blank.rs index d02f75f8..e213c4c0 100644 --- a/actix-service/src/blank.rs +++ b/actix-service/src/blank.rs @@ -1,9 +1,11 @@ use std::marker::PhantomData; -use futures::future::{ok, FutureResult}; -use futures::{Async, Poll}; +use futures::future::{ok, Ready}; +use futures::Poll; use super::{NewService, Service}; +use std::pin::Pin; +use std::task::Context; /// Empty service #[derive(Clone)] @@ -34,10 +36,13 @@ impl Service for Blank { type Request = R; type Response = R; type Error = E; - type Future = FutureResult; + type Future = Ready>; - fn poll_ready(&mut self) -> Poll<(), Self::Error> { - Ok(Async::Ready(())) + fn poll_ready( + self: Pin<&mut Self>, + _ctx: &mut Context<'_>, + ) -> Poll> { + Poll::Ready(Ok(())) } fn call(&mut self, req: R) -> Self::Future { @@ -76,7 +81,7 @@ impl NewService for BlankNewService { type Config = (); type Service = Blank; type InitError = E2; - type Future = FutureResult; + type Future = Ready>; fn new_service(&self, _: &()) -> Self::Future { ok(Blank::default()) diff --git a/actix-service/src/boxed.rs b/actix-service/src/boxed.rs index 16e37fa2..b405ca26 100644 --- a/actix-service/src/boxed.rs +++ b/actix-service/src/boxed.rs @@ -1,7 +1,11 @@ -use futures::future::{err, ok, Either, FutureResult}; -use futures::{Async, Future, IntoFuture, Poll}; +use std::pin::Pin; -use crate::{NewService, Service}; +use crate::{IntoFuture, NewService, Service, ServiceExt}; +use futures::future::FutureExt; +use futures::future::LocalBoxFuture; +use futures::future::{err, ok, Either, Ready}; +use futures::{Future, Poll}; +use std::task::Context; pub type BoxedService = Box< dyn Service< @@ -13,7 +17,7 @@ pub type BoxedService = Box< >; pub type BoxedServiceResponse = - Either, Box>>; + Either>, LocalBoxFuture<'static, Result>>; pub struct BoxedNewService(Inner); @@ -53,7 +57,7 @@ type Inner = Box< Error = Err, InitError = InitErr, Service = BoxedService, - Future = Box, Error = InitErr>>, + Future = LocalBoxFuture<'static, Result, InitErr>>, >, >; @@ -70,7 +74,8 @@ where type InitError = InitErr; type Config = C; type Service = BoxedService; - type Future = Box>; + + type Future = LocalBoxFuture<'static, Result>; fn new_service(&self, cfg: &C) -> Self::Future { self.0.new_service(cfg) @@ -99,15 +104,18 @@ where type InitError = InitErr; type Config = C; type Service = BoxedService; - type Future = Box>; + type Future = LocalBoxFuture<'static, Result>; fn new_service(&self, cfg: &C) -> Self::Future { - Box::new( + /* TODO: Figure out what the hell is hapenning here + Box::new( self.service .new_service(cfg) .into_future() .map(ServiceWrapper::boxed), ) + */ + unimplemented!() } } @@ -132,10 +140,22 @@ where type Response = Res; type Error = Err; type Future = Either< - FutureResult, - Box>, + Ready>, + LocalBoxFuture<'static, Result>, >; + fn poll_ready( + self: Pin<&mut Self>, + ctx: &mut Context<'_>, + ) -> Poll> { + unimplemented!() + } + + fn call(&mut self, req: Self::Request) -> Self::Future { + unimplemented!() + } + + /* fn poll_ready(&mut self) -> Poll<(), Self::Error> { self.0.poll_ready() } @@ -148,4 +168,5 @@ where Ok(Async::NotReady) => Either::B(Box::new(fut)), } } + */ } diff --git a/actix-service/src/cell.rs b/actix-service/src/cell.rs index 2dc6d6a6..e9bb1bcf 100644 --- a/actix-service/src/cell.rs +++ b/actix-service/src/cell.rs @@ -1,4 +1,5 @@ //! Custom cell impl +use std::pin::Pin; use std::{cell::UnsafeCell, fmt, rc::Rc}; pub(crate) struct Cell { @@ -33,6 +34,9 @@ impl Cell { pub(crate) fn get_mut(&mut self) -> &mut T { unsafe { &mut *self.inner.as_ref().get() } } + pub(crate) fn get_pin(self: Pin<&mut Self>) -> Pin<&mut T> { + unsafe { Pin::new_unchecked(&mut *Pin::get_unchecked_mut(self).inner.as_ref().get()) } + } #[allow(clippy::mut_from_ref)] pub(crate) unsafe fn get_mut_unsafe(&self) -> &mut T { diff --git a/actix-service/src/fn_service.rs b/actix-service/src/fn_service.rs index 8adcfd55..5deeec66 100644 --- a/actix-service/src/fn_service.rs +++ b/actix-service/src/fn_service.rs @@ -1,9 +1,14 @@ use std::marker::PhantomData; -use futures::future::{ok, Future, FutureResult}; -use futures::{try_ready, Async, IntoFuture, Poll}; +use crate::IntoFuture; +use futures::future::{ok, Future, Ready}; +use futures::{ready, Poll}; use crate::{IntoNewService, IntoService, NewService, Service}; +use std::pin::Pin; +use std::task::Context; + +use pin_project::pin_project; /// Create `NewService` for function that can act as a Service pub fn service_fn(f: F) -> NewServiceFn @@ -75,8 +80,11 @@ where type Error = Out::Error; type Future = Out::Future; - fn poll_ready(&mut self) -> Poll<(), Self::Error> { - Ok(Async::Ready(())) + fn poll_ready( + self: Pin<&mut Self>, + _ctx: &mut Context<'_>, + ) -> Poll> { + Poll::Ready(Ok(())) } fn call(&mut self, req: Req) -> Self::Future { @@ -135,7 +143,7 @@ where type Config = Cfg; type Service = ServiceFn; type InitError = (); - type Future = FutureResult; + type Future = Ready>; fn new_service(&self, _: &Cfg) -> Self::Future { ok(ServiceFn::new(self.f.clone())) @@ -210,12 +218,14 @@ where } } +#[pin_project] pub struct FnNewServiceConfigFut where R: IntoFuture, R::Item: IntoService, S: Service, { + #[pin] fut: R::Future, _t: PhantomData<(S,)>, } @@ -226,11 +236,10 @@ where R::Item: IntoService, S: Service, { - type Item = S; - type Error = R::Error; + type Output = Result; - fn poll(&mut self) -> Poll { - Ok(Async::Ready(try_ready!(self.fut.poll()).into_service())) + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + Poll::Ready(Ok(ready!(self.project().fut.poll(cx))?.into_service())) } } diff --git a/actix-service/src/fn_transform.rs b/actix-service/src/fn_transform.rs index 2e4be2d5..ea132ada 100644 --- a/actix-service/src/fn_transform.rs +++ b/actix-service/src/fn_transform.rs @@ -1,10 +1,9 @@ use std::marker::PhantomData; -use futures::future::{ok, FutureResult}; -use futures::IntoFuture; +use futures::future::{ok, Ready}; use crate::apply::Apply; -use crate::{IntoTransform, Service, Transform}; +use crate::{IntoFuture, IntoTransform, Service, Transform}; /// Use function as transform service pub fn transform_fn( @@ -50,7 +49,7 @@ where type Error = Out::Error; type Transform = Apply; type InitError = Err; - type Future = FutureResult; + type Future = Ready>; fn new_transform(&self, service: S) -> Self::Future { ok(Apply::new(service, self.f.clone())) diff --git a/actix-service/src/from_err.rs b/actix-service/src/from_err.rs index 5d37d725..72281972 100644 --- a/actix-service/src/from_err.rs +++ b/actix-service/src/from_err.rs @@ -1,13 +1,19 @@ use std::marker::PhantomData; -use futures::{Async, Future, Poll}; +use futures::{Future, Poll}; use super::{NewService, Service}; +use std::pin::Pin; +use std::task::Context; + +use pin_project::pin_project; /// Service for the `from_err` combinator, changing the error type of a service. /// /// This is created by the `ServiceExt::from_err` method. +#[pin_project] pub struct FromErr { + #[pin] service: A, f: PhantomData, } @@ -47,8 +53,11 @@ where type Error = E; type Future = FromErrFuture; - fn poll_ready(&mut self) -> Poll<(), E> { - self.service.poll_ready().map_err(E::from) + fn poll_ready( + self: Pin<&mut Self>, + ctx: &mut Context<'_>, + ) -> Poll> { + self.project().service.poll_ready(ctx).map_err(E::from) } fn call(&mut self, req: A::Request) -> Self::Future { @@ -59,7 +68,9 @@ where } } +#[pin_project] pub struct FromErrFuture { + #[pin] fut: A::Future, f: PhantomData, } @@ -69,11 +80,10 @@ where A: Service, E: From, { - type Item = A::Response; - type Error = E; + type Output = Result; - fn poll(&mut self) -> Poll { - self.fut.poll().map_err(E::from) + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + self.project().fut.poll(cx).map_err(E::from) } } @@ -131,11 +141,13 @@ where } } +#[pin_project] pub struct FromErrNewServiceFuture where A: NewService, E: From, { + #[pin] fut: A::Future, e: PhantomData, } @@ -145,34 +157,48 @@ where A: NewService, E: From, { - type Item = FromErr; - type Error = A::InitError; + type Output = Result, A::InitError>; - fn poll(&mut self) -> Poll { - if let Async::Ready(service) = self.fut.poll()? { - Ok(Async::Ready(FromErr::new(service))) + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + if let Poll::Ready(svc) = self.project().fut.poll(cx)? { + Poll::Ready(Ok(FromErr::new(svc))) } else { - Ok(Async::NotReady) + Poll::Pending } } + + /* + fn poll(&mut self) -> Poll { + if let Poll::Ready(service) = self.fut.poll()? { + Ok(Poll::Ready(FromErr::new(service))) + } else { + Ok(Poll::Pending) + } + } + */ } #[cfg(test)] mod tests { - use futures::future::{err, FutureResult}; + use futures::future::{err, Ready}; use super::*; use crate::{IntoNewService, NewService, Service, ServiceExt}; + use tokio::future::ok; struct Srv; + impl Service for Srv { type Request = (); type Response = (); type Error = (); - type Future = FutureResult<(), ()>; + type Future = Ready>; - fn poll_ready(&mut self) -> Poll<(), Self::Error> { - Err(()) + fn poll_ready( + self: Pin<&mut Self>, + ctx: &mut Context<'_>, + ) -> Poll> { + Poll::Ready(Err(())) } fn call(&mut self, _: ()) -> Self::Future { @@ -189,32 +215,29 @@ mod tests { } } - #[test] - fn test_poll_ready() { + #[tokio::test] + async fn test_poll_ready() { let mut srv = Srv.from_err::(); - let res = srv.poll_ready(); + let res = srv.poll_once().await; + + assert_eq!(res, Poll::Ready(Err(Error))); + } + + #[tokio::test] + async fn test_call() { + let mut srv = Srv.from_err::(); + let res = srv.call(()).await; assert!(res.is_err()); assert_eq!(res.err().unwrap(), Error); } - #[test] - fn test_call() { - let mut srv = Srv.from_err::(); - let res = srv.call(()).poll(); - assert!(res.is_err()); - assert_eq!(res.err().unwrap(), Error); - } - - #[test] - fn test_new_service() { - let blank = || Ok::<_, ()>(Srv); + #[tokio::test] + async fn test_new_service() { + let blank = || ok::<_, ()>(Srv); let new_srv = blank.into_new_service().from_err::(); - if let Async::Ready(mut srv) = new_srv.new_service(&()).poll().unwrap() { - let res = srv.call(()).poll(); - assert!(res.is_err()); - assert_eq!(res.err().unwrap(), Error); - } else { - panic!() - } + let mut srv = new_srv.new_service(&()).await.unwrap(); + let res = srv.call(()).await; + assert!(res.is_err()); + assert_eq!(res.err().unwrap(), Error); } } diff --git a/actix-service/src/lib.rs b/actix-service/src/lib.rs index ee327da0..185e79d4 100644 --- a/actix-service/src/lib.rs +++ b/actix-service/src/lib.rs @@ -1,8 +1,13 @@ +use futures::future::{ready, LocalBoxFuture, Ready}; +use futures::{Future, Poll}; use std::cell::RefCell; +use std::pin::Pin; use std::rc::Rc; use std::sync::Arc; +use std::task; +use std::task::Context; -use futures::{Future, IntoFuture, Poll}; +mod cell; mod and_then; mod and_then_apply; @@ -11,7 +16,6 @@ mod apply; mod apply_cfg; pub mod blank; pub mod boxed; -mod cell; mod fn_service; mod fn_transform; mod from_err; @@ -24,6 +28,9 @@ mod transform; mod transform_err; pub use self::and_then::{AndThen, AndThenNewService}; + +use self::and_then_apply::AndThenTransform; +use self::and_then_apply_fn::{AndThenApply, AndThenApplyNewService}; pub use self::apply::{apply_fn, new_apply_fn, Apply, ApplyNewService}; pub use self::apply_cfg::{apply_cfg, new_apply_cfg}; pub use self::fn_service::{new_service_cfg, new_service_fn, service_fn, ServiceFn}; @@ -36,8 +43,34 @@ pub use self::map_init_err::MapInitErr; pub use self::then::{Then, ThenNewService}; pub use self::transform::{apply_transform, IntoTransform, Transform}; -use self::and_then_apply::AndThenTransform; -use self::and_then_apply_fn::{AndThenApply, AndThenApplyNewService}; +pub trait IntoFuture { + type Item; + type Error; + type Future: Future>; + fn into_future(self) -> Self::Future; +} + +impl>, I, E> IntoFuture for F { + type Item = I; + type Error = E; + type Future = F; + + fn into_future(self) -> Self::Future { + self + } +} + +/* +impl IntoFuture for Result { + type Item = I; + type Error = E; + type Future = Ready; + + fn into_future(self) -> Self::Future { + ready(self) + } +} +*/ /// An asynchronous function from `Request` to a `Response`. pub trait Service { @@ -51,7 +84,7 @@ pub trait Service { type Error; /// The future response value. - type Future: Future; + type Future: Future>; /// Returns `Ready` when the service is able to process requests. /// @@ -62,7 +95,10 @@ pub trait Service { /// This is a **best effort** implementation. False positives are permitted. /// It is permitted for the service to return `Ready` from a `poll_ready` /// call and the next invocation of `call` results in an error. - fn poll_ready(&mut self) -> Poll<(), Self::Error>; + fn poll_ready( + self: Pin<&mut Self>, + ctx: &mut task::Context<'_>, + ) -> Poll>; /// Process the request and return the response asynchronously. /// @@ -74,6 +110,31 @@ pub trait Service { /// Calling `call` without calling `poll_ready` is permitted. The /// implementation must be resilient to this fact. fn call(&mut self, req: Self::Request) -> Self::Future; + + #[cfg(test)] + fn poll_test(&mut self) -> Poll> { + // kinda stupid method, but works for our test purposes + unsafe { + let mut this = Pin::new_unchecked(self); + tokio::runtime::current_thread::Builder::new() + .build() + .unwrap() + .block_on(futures::future::poll_fn(move |cx| { + let this = &mut this; + Poll::Ready(this.as_mut().poll_ready(cx)) + })) + } + } + + fn poll_once<'a>(&'a mut self) -> LocalBoxFuture<'a, Poll>> { + unsafe { + let mut this = Pin::new_unchecked(self); + Pin::new_unchecked(Box::new(futures::future::poll_fn(move |cx| { + let this = &mut this; + Poll::Ready(this.as_mut().poll_ready(cx)) + }))) + } + } } /// An extension trait for `Service`s that provides a variety of convenient @@ -206,7 +267,7 @@ pub trait NewService { type InitError; /// The future of the `Service` instance. - type Future: Future; + type Future: Future>; /// Create and return a new service value asynchronously. fn new_service(&self, cfg: &Self::Config) -> Self::Future; @@ -343,8 +404,11 @@ where type Error = S::Error; type Future = S::Future; - fn poll_ready(&mut self) -> Poll<(), S::Error> { - (**self).poll_ready() + fn poll_ready( + self: Pin<&mut Self>, + ctx: &mut Context<'_>, + ) -> Poll> { + unsafe { self.map_unchecked_mut(|s| &mut **s).poll_ready(ctx) } } fn call(&mut self, request: Self::Request) -> S::Future { @@ -361,8 +425,14 @@ where type Error = S::Error; type Future = S::Future; - fn poll_ready(&mut self) -> Poll<(), S::Error> { - (**self).poll_ready() + fn poll_ready( + mut self: Pin<&mut Self>, + ctx: &mut Context<'_>, + ) -> Poll> { + unsafe { + let p: &mut S = Pin::as_mut(&mut self).get_mut(); + Pin::new_unchecked(p).poll_ready(ctx) + } } fn call(&mut self, request: Self::Request) -> S::Future { @@ -379,12 +449,18 @@ where type Error = S::Error; type Future = S::Future; - fn poll_ready(&mut self) -> Poll<(), S::Error> { - self.borrow_mut().poll_ready() + fn poll_ready( + self: Pin<&mut Self>, + ctx: &mut Context<'_>, + ) -> Poll> { + unsafe { + let r = self.get_unchecked_mut(); + Pin::new_unchecked(&mut (*(**r).borrow_mut())).poll_ready(ctx) + } } fn call(&mut self, request: Self::Request) -> S::Future { - self.borrow_mut().call(request) + (&mut (**self).borrow_mut()).call(request) } } diff --git a/actix-service/src/map.rs b/actix-service/src/map.rs index ac3654b0..a29690b0 100644 --- a/actix-service/src/map.rs +++ b/actix-service/src/map.rs @@ -1,13 +1,19 @@ use std::marker::PhantomData; -use futures::{Async, Future, Poll}; +use futures::{Future, Poll}; use super::{NewService, Service}; +use std::pin::Pin; +use std::task::Context; + +use pin_project::pin_project; /// Service for the `map` combinator, changing the type of a service's response. /// /// This is created by the `ServiceExt::map` method. +#[pin_project] pub struct Map { + #[pin] service: A, f: F, _t: PhantomData, @@ -52,8 +58,11 @@ where type Error = A::Error; type Future = MapFuture; - fn poll_ready(&mut self) -> Poll<(), Self::Error> { - self.service.poll_ready() + fn poll_ready( + self: Pin<&mut Self>, + ctx: &mut Context<'_>, + ) -> Poll> { + self.project().service.poll_ready(ctx) } fn call(&mut self, req: A::Request) -> Self::Future { @@ -61,12 +70,14 @@ where } } +#[pin_project] pub struct MapFuture where A: Service, F: FnMut(A::Response) -> Response, { f: F, + #[pin] fut: A::Future, } @@ -85,13 +96,14 @@ where A: Service, F: FnMut(A::Response) -> Response, { - type Item = Response; - type Error = A::Error; + type Output = Result; - fn poll(&mut self) -> Poll { - match self.fut.poll()? { - Async::Ready(resp) => Ok(Async::Ready((self.f)(resp))), - Async::NotReady => Ok(Async::NotReady), + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + match this.fut.poll(cx) { + Poll::Ready(Ok(resp)) => Poll::Ready(Ok((this.f)(resp))), + Poll::Ready(Err(e)) => Poll::Ready(Err(e)), + Poll::Pending => Poll::Pending, } } } @@ -151,11 +163,13 @@ where } } +#[pin_project] pub struct MapNewServiceFuture where A: NewService, F: FnMut(A::Response) -> Res, { + #[pin] fut: A::Future, f: Option, } @@ -175,34 +189,38 @@ where A: NewService, F: FnMut(A::Response) -> Res, { - type Item = Map; - type Error = A::InitError; + type Output = Result, A::InitError>; - fn poll(&mut self) -> Poll { - if let Async::Ready(service) = self.fut.poll()? { - Ok(Async::Ready(Map::new(service, self.f.take().unwrap()))) + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + if let Poll::Ready(svc) = this.fut.poll(cx)? { + Poll::Ready(Ok(Map::new(svc, this.f.take().unwrap()))) } else { - Ok(Async::NotReady) + Poll::Pending } } } #[cfg(test)] mod tests { - use futures::future::{ok, FutureResult}; + use futures::future::{ok, Ready}; use super::*; use crate::{IntoNewService, Service, ServiceExt}; struct Srv; + impl Service for Srv { type Request = (); type Response = (); type Error = (); - type Future = FutureResult<(), ()>; + type Future = Ready>; - fn poll_ready(&mut self) -> Poll<(), Self::Error> { - Ok(Async::Ready(())) + fn poll_ready( + self: Pin<&mut Self>, + ctx: &mut Context<'_>, + ) -> Poll> { + Poll::Ready(Ok(())) } fn call(&mut self, _: ()) -> Self::Future { @@ -210,32 +228,28 @@ mod tests { } } - #[test] - fn test_poll_ready() { + #[tokio::test] + async fn test_poll_ready() { let mut srv = Srv.map(|_| "ok"); - let res = srv.poll_ready(); - assert!(res.is_ok()); - assert_eq!(res.unwrap(), Async::Ready(())); + let res = srv.poll_once().await; + assert_eq!(res, Poll::Ready(Ok(()))); } - #[test] - fn test_call() { + #[tokio::test] + async fn test_call() { let mut srv = Srv.map(|_| "ok"); - let res = srv.call(()).poll(); + let res = srv.call(()).await; assert!(res.is_ok()); - assert_eq!(res.unwrap(), Async::Ready("ok")); + assert_eq!(res.unwrap(), "ok"); } - #[test] - fn test_new_service() { - let blank = || Ok::<_, ()>(Srv); + #[tokio::test] + async fn test_new_service() { + let blank = || ok::<_, ()>(Srv); let new_srv = blank.into_new_service().map(|_| "ok"); - if let Async::Ready(mut srv) = new_srv.new_service(&()).poll().unwrap() { - let res = srv.call(()).poll(); - assert!(res.is_ok()); - assert_eq!(res.unwrap(), Async::Ready("ok")); - } else { - panic!() - } + let mut srv = new_srv.new_service(&()).await.unwrap(); + let res = srv.call(()).await; + assert!(res.is_ok()); + assert_eq!(res.unwrap(), ("ok")); } } diff --git a/actix-service/src/map_err.rs b/actix-service/src/map_err.rs index 47ce11fc..0bbd5bf0 100644 --- a/actix-service/src/map_err.rs +++ b/actix-service/src/map_err.rs @@ -1,14 +1,20 @@ use std::marker::PhantomData; -use futures::{Async, Future, Poll}; +use futures::{Future, Poll}; use super::{NewService, Service}; +use pin_project::pin_project; +use std::pin::Pin; +use std::task::Context; + /// Service for the `map_err` combinator, changing the type of a service's /// error. /// /// This is created by the `ServiceExt::map_err` method. +#[pin_project] pub struct MapErr { + #[pin] service: A, f: F, _t: PhantomData, @@ -53,8 +59,12 @@ where type Error = E; type Future = MapErrFuture; - fn poll_ready(&mut self) -> Poll<(), Self::Error> { - self.service.poll_ready().map_err(&self.f) + fn poll_ready( + mut self: Pin<&mut Self>, + ctx: &mut Context<'_>, + ) -> Poll> { + let mut this = self.project(); + this.service.poll_ready(ctx).map_err(this.f) } fn call(&mut self, req: A::Request) -> Self::Future { @@ -62,12 +72,14 @@ where } } +#[pin_project] pub struct MapErrFuture where A: Service, F: Fn(A::Error) -> E, { f: F, + #[pin] fut: A::Future, } @@ -86,11 +98,11 @@ where A: Service, F: Fn(A::Error) -> E, { - type Item = A::Response; - type Error = E; + type Output = Result; - fn poll(&mut self) -> Poll { - self.fut.poll().map_err(&self.f) + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut this = self.project(); + this.fut.poll(cx).map_err(this.f) } } @@ -156,11 +168,13 @@ where } } +#[pin_project] pub struct MapErrNewServiceFuture where A: NewService, F: Fn(A::Error) -> E, { + #[pin] fut: A::Future, f: F, } @@ -180,24 +194,25 @@ where A: NewService, F: Fn(A::Error) -> E + Clone, { - type Item = MapErr; - type Error = A::InitError; + type Output = Result, A::InitError>; - fn poll(&mut self) -> Poll { - if let Async::Ready(service) = self.fut.poll()? { - Ok(Async::Ready(MapErr::new(service, self.f.clone()))) + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + if let Poll::Ready(svc) = this.fut.poll(cx)? { + Poll::Ready(Ok(MapErr::new(svc, this.f.clone()))) } else { - Ok(Async::NotReady) + Poll::Pending } } } #[cfg(test)] mod tests { - use futures::future::{err, FutureResult}; + use futures::future::{err, Ready}; use super::*; use crate::{IntoNewService, NewService, Service, ServiceExt}; + use tokio::future::ok; struct Srv; @@ -205,10 +220,13 @@ mod tests { type Request = (); type Response = (); type Error = (); - type Future = FutureResult<(), ()>; + type Future = Ready>; - fn poll_ready(&mut self) -> Poll<(), Self::Error> { - Err(()) + fn poll_ready( + self: Pin<&mut Self>, + ctx: &mut Context<'_>, + ) -> Poll> { + Poll::Ready(Err(())) } fn call(&mut self, _: ()) -> Self::Future { @@ -216,32 +234,33 @@ mod tests { } } - #[test] - fn test_poll_ready() { + #[tokio::test] + async fn test_poll_ready() { let mut srv = Srv.map_err(|_| "error"); - let res = srv.poll_ready(); - assert!(res.is_err()); - assert_eq!(res.err().unwrap(), "error"); - } - - #[test] - fn test_call() { - let mut srv = Srv.map_err(|_| "error"); - let res = srv.call(()).poll(); - assert!(res.is_err()); - assert_eq!(res.err().unwrap(), "error"); - } - - #[test] - fn test_new_service() { - let blank = || Ok::<_, ()>(Srv); - let new_srv = blank.into_new_service().map_err(|_| "error"); - if let Async::Ready(mut srv) = new_srv.new_service(&()).poll().unwrap() { - let res = srv.call(()).poll(); + let res = srv.poll_once().await; + if let Poll::Ready(res) = res { assert!(res.is_err()); assert_eq!(res.err().unwrap(), "error"); } else { - panic!() + panic!("Should be ready"); } } + + #[tokio::test] + async fn test_call() { + let mut srv = Srv.map_err(|_| "error"); + let res = srv.call(()).await; + assert!(res.is_err()); + assert_eq!(res.err().unwrap(), "error"); + } + + #[tokio::test] + async fn test_new_service() { + let blank = || ok::<_, ()>(Srv); + let new_srv = blank.into_new_service().map_err(|_| "error"); + let mut srv = new_srv.new_service(&()).await.unwrap(); + let res = srv.call(()).await; + assert!(res.is_err()); + assert_eq!(res.err().unwrap(), "error"); + } } diff --git a/actix-service/src/map_init_err.rs b/actix-service/src/map_init_err.rs index 4866370a..c9225eb3 100644 --- a/actix-service/src/map_init_err.rs +++ b/actix-service/src/map_init_err.rs @@ -4,6 +4,10 @@ use futures::{Future, Poll}; use super::NewService; +use pin_project::pin_project; +use std::pin::Pin; +use std::task::Context; + /// `MapInitErr` service combinator pub struct MapInitErr { a: A, @@ -58,13 +62,14 @@ where MapInitErrFuture::new(self.a.new_service(cfg), self.f.clone()) } } - +#[pin_project] pub struct MapInitErrFuture where A: NewService, F: Fn(A::InitError) -> E, { f: F, + #[pin] fut: A::Future, } @@ -83,10 +88,10 @@ where A: NewService, F: Fn(A::InitError) -> E, { - type Item = A::Service; - type Error = E; + type Output = Result; - fn poll(&mut self) -> Poll { - self.fut.poll().map_err(&self.f) + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut this = self.project(); + this.fut.poll(cx).map_err(this.f) } } diff --git a/actix-service/src/then.rs b/actix-service/src/then.rs index 56fae3a1..7cfbfe88 100644 --- a/actix-service/src/then.rs +++ b/actix-service/src/then.rs @@ -1,14 +1,21 @@ -use futures::{Async, Future, Poll}; +use futures::{Future, Poll}; +use std::pin::Pin; +use std::task::Context; use super::{IntoNewService, NewService, Service}; use crate::cell::Cell; +use pin_project::pin_project; + /// Service for the `then` combinator, chaining a computation onto the end of /// another service. /// /// This is created by the `ServiceExt::then` method. +#[pin_project] pub struct Then { + #[pin] a: A, + #[pin] b: Cell, } @@ -45,12 +52,16 @@ where type Error = B::Error; type Future = ThenFuture; - fn poll_ready(&mut self) -> Poll<(), Self::Error> { - let not_ready = self.a.poll_ready()?.is_not_ready(); - if self.b.get_mut().poll_ready()?.is_not_ready() || not_ready { - Ok(Async::NotReady) + fn poll_ready( + self: Pin<&mut Self>, + ctx: &mut Context<'_>, + ) -> Poll> { + let this = self.project(); + let not_ready = !this.a.poll_ready(ctx)?.is_ready(); + if !this.b.get_pin().poll_ready(ctx)?.is_ready() || not_ready { + Poll::Pending } else { - Ok(Async::Ready(())) + Poll::Ready(Ok(())) } } @@ -59,13 +70,16 @@ where } } +#[pin_project] pub struct ThenFuture where A: Service, B: Service>, { b: Cell, + #[pin] fut_b: Option, + #[pin] fut_a: Option, } @@ -88,26 +102,33 @@ where A: Service, B: Service>, { - type Item = B::Response; - type Error = B::Error; + type Output = Result; - fn poll(&mut self) -> Poll { - if let Some(ref mut fut) = self.fut_b { - return fut.poll(); - } + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut this = self.project(); - match self.fut_a.as_mut().expect("bug in actix-service").poll() { - Ok(Async::Ready(resp)) => { - let _ = self.fut_a.take(); - self.fut_b = Some(self.b.get_mut().call(Ok(resp))); - self.poll() + loop { + let mut fut_a = this.fut_a.as_mut(); + let mut fut_b = this.fut_b.as_mut(); + + if let Some(fut) = fut_b.as_mut().as_pin_mut() { + return fut.poll(cx); } - Err(err) => { - let _ = self.fut_a.take(); - self.fut_b = Some(self.b.get_mut().call(Err(err))); - self.poll() + + match fut_a + .as_mut() + .as_pin_mut() + .expect("Bug in actix-service") + .poll(cx) + { + Poll::Ready(r) => { + fut_a.set(None); + let new_fut = this.b.get_mut().call(r); + fut_b.set(Some(new_fut)); + } + + Poll::Pending => return Poll::Pending, } - Ok(Async::NotReady) => Ok(Async::NotReady), } } } @@ -175,6 +196,7 @@ where } } +#[pin_project] pub struct ThenNewServiceFuture where A: NewService, @@ -185,7 +207,9 @@ where InitError = A::InitError, >, { + #[pin] fut_b: B::Future, + #[pin] fut_a: A::Future, a: Option, b: Option, @@ -221,53 +245,59 @@ where InitError = A::InitError, >, { - type Item = Then; - type Error = A::InitError; + type Output = Result, A::InitError>; - fn poll(&mut self) -> Poll { - if self.a.is_none() { - if let Async::Ready(service) = self.fut_a.poll()? { - self.a = Some(service); + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + if this.a.is_none() { + if let Poll::Ready(service) = this.fut_a.poll(cx)? { + *this.a = Some(service); } } - - if self.b.is_none() { - if let Async::Ready(service) = self.fut_b.poll()? { - self.b = Some(service); + if this.b.is_none() { + if let Poll::Ready(service) = this.fut_b.poll(cx)? { + *this.b = Some(service); } } - - if self.a.is_some() && self.b.is_some() { - Ok(Async::Ready(Then::new( - self.a.take().unwrap(), - self.b.take().unwrap(), + if this.a.is_some() && this.b.is_some() { + Poll::Ready(Ok(Then::new( + this.a.take().unwrap(), + this.b.take().unwrap(), ))) } else { - Ok(Async::NotReady) + Poll::Pending } } } #[cfg(test)] mod tests { - use futures::future::{err, ok, FutureResult}; - use futures::{Async, Future, Poll}; + use futures::future::{err, ok, ready, Ready}; + use futures::{Future, Poll}; use std::cell::Cell; use std::rc::Rc; use crate::{IntoNewService, NewService, Service, ServiceExt}; + use std::pin::Pin; + use std::task::Context; #[derive(Clone)] struct Srv1(Rc>); + impl Service for Srv1 { type Request = Result<&'static str, &'static str>; type Response = &'static str; type Error = (); - type Future = FutureResult; + type Future = Ready>; - fn poll_ready(&mut self) -> Poll<(), Self::Error> { - self.0.set(self.0.get() + 1); - Ok(Async::Ready(())) + fn poll_ready( + self: Pin<&mut Self>, + ctx: &mut Context<'_>, + ) -> Poll> { + let mut this = self.get_mut(); + + this.0.set(this.0.get() + 1); + Poll::Ready(Ok(())) } fn call(&mut self, req: Result<&'static str, &'static str>) -> Self::Future { @@ -284,11 +314,15 @@ mod tests { type Request = Result<&'static str, ()>; type Response = (&'static str, &'static str); type Error = (); - type Future = FutureResult; + type Future = Ready>; - fn poll_ready(&mut self) -> Poll<(), Self::Error> { - self.0.set(self.0.get() + 1); - Ok(Async::Ready(())) + fn poll_ready( + self: Pin<&mut Self>, + ctx: &mut Context<'_>, + ) -> Poll> { + let mut this = self.get_mut(); + this.0.set(this.0.get() + 1); + Poll::Ready(Err(())) } fn call(&mut self, req: Result<&'static str, ()>) -> Self::Future { @@ -299,46 +333,44 @@ mod tests { } } - #[test] - fn test_poll_ready() { + #[tokio::test] + async fn test_poll_ready() { let cnt = Rc::new(Cell::new(0)); let mut srv = Srv1(cnt.clone()).then(Srv2(cnt.clone())); - let res = srv.poll_ready(); - assert!(res.is_ok()); - assert_eq!(res.unwrap(), Async::Ready(())); + let res = srv.poll_once().await; + assert_eq!(res, Poll::Ready(Err(()))); assert_eq!(cnt.get(), 2); } - #[test] - fn test_call() { + #[tokio::test] + async fn test_call() { let cnt = Rc::new(Cell::new(0)); let mut srv = Srv1(cnt.clone()).then(Srv2(cnt)).clone(); - let res = srv.call(Ok("srv1")).poll(); + let res = srv.call(Ok("srv1")).await; assert!(res.is_ok()); - assert_eq!(res.unwrap(), Async::Ready(("srv1", "ok"))); + assert_eq!(res.unwrap(), (("srv1", "ok"))); - let res = srv.call(Err("srv")).poll(); + let res = srv.call(Err("srv")).await; assert!(res.is_ok()); - assert_eq!(res.unwrap(), Async::Ready(("srv2", "err"))); + assert_eq!(res.unwrap(), (("srv2", "err"))); } - #[test] - fn test_new_service() { + #[tokio::test] + async fn test_new_service() { let cnt = Rc::new(Cell::new(0)); let cnt2 = cnt.clone(); - let blank = move || Ok::<_, ()>(Srv1(cnt2.clone())); - let new_srv = blank.into_new_service().then(move || Ok(Srv2(cnt.clone()))); - if let Async::Ready(mut srv) = new_srv.clone().new_service(&()).poll().unwrap() { - let res = srv.call(Ok("srv1")).poll(); - assert!(res.is_ok()); - assert_eq!(res.unwrap(), Async::Ready(("srv1", "ok"))); + let blank = move || ready(Ok::<_, ()>(Srv1(cnt2.clone()))); + let new_srv = blank + .into_new_service() + .then(move || ready(Ok(Srv2(cnt.clone())))); + let mut srv = new_srv.clone().new_service(&()).await.unwrap(); + let res = srv.call(Ok("srv1")).await; + assert!(res.is_ok()); + assert_eq!(res.unwrap(), (("srv1", "ok"))); - let res = srv.call(Err("srv")).poll(); - assert!(res.is_ok()); - assert_eq!(res.unwrap(), Async::Ready(("srv2", "err"))); - } else { - panic!() - } + let res = srv.call(Err("srv")).await; + assert!(res.is_ok()); + assert_eq!(res.unwrap(), (("srv2", "err"))); } } diff --git a/actix-service/src/transform.rs b/actix-service/src/transform.rs index 0e93bf72..e4682b63 100644 --- a/actix-service/src/transform.rs +++ b/actix-service/src/transform.rs @@ -1,10 +1,13 @@ use std::rc::Rc; use std::sync::Arc; -use futures::{Async, Future, IntoFuture, Poll}; - use crate::transform_err::{TransformFromErr, TransformMapInitErr}; use crate::{IntoNewService, NewService, Service}; +use futures::{Future, Poll}; +use std::pin::Pin; +use std::task::Context; + +use pin_project::pin_project; /// The `Transform` trait defines the interface of a Service factory. `Transform` /// is often implemented for middleware, defining how to construct a @@ -32,7 +35,7 @@ pub trait Transform { type InitError; /// The future response value. - type Future: Future; + type Future: Future>; /// Creates and returns a new Service component, asynchronously fn new_transform(&self, service: S) -> Self::Future; @@ -193,19 +196,21 @@ where fn new_service(&self, cfg: &S::Config) -> Self::Future { ApplyTransformFuture { t_cell: self.t.clone(), - fut_a: self.s.new_service(cfg).into_future(), + fut_a: self.s.new_service(cfg), fut_t: None, } } } - +#[pin_project] pub struct ApplyTransformFuture where S: NewService, T: Transform, { + #[pin] fut_a: S::Future, - fut_t: Option<::Future>, + #[pin] + fut_t: Option, t_cell: Rc, } @@ -214,19 +219,21 @@ where S: NewService, T: Transform, { - type Item = T::Transform; - type Error = T::InitError; + type Output = Result; - fn poll(&mut self) -> Poll { - if self.fut_t.is_none() { - if let Async::Ready(service) = self.fut_a.poll()? { - self.fut_t = Some(self.t_cell.new_transform(service).into_future()); + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut this = self.project(); + + if this.fut_t.as_mut().as_pin_mut().is_none() { + if let Poll::Ready(service) = this.fut_a.poll(cx)? { + this.fut_t.set(Some(this.t_cell.new_transform(service))); } } - if let Some(ref mut fut) = self.fut_t { - fut.poll() + + if let Some(fut) = this.fut_t.as_mut().as_pin_mut() { + fut.poll(cx) } else { - Ok(Async::NotReady) + Poll::Pending } } } diff --git a/actix-service/src/transform_err.rs b/actix-service/src/transform_err.rs index dcc3c245..a6940707 100644 --- a/actix-service/src/transform_err.rs +++ b/actix-service/src/transform_err.rs @@ -3,6 +3,10 @@ use std::marker::PhantomData; use futures::{Future, Poll}; use super::Transform; +use std::pin::Pin; +use std::task::Context; + +use pin_project::pin_project; /// Transform for the `map_err` combinator, changing the type of a new /// transform's init error. @@ -63,12 +67,13 @@ where } } } - +#[pin_project] pub struct TransformMapInitErrFuture where T: Transform, F: Fn(T::InitError) -> E, { + #[pin] fut: T::Future, f: F, } @@ -78,11 +83,11 @@ where T: Transform, F: Fn(T::InitError) -> E + Clone, { - type Item = T::Transform; - type Error = E; + type Output = Result; - fn poll(&mut self) -> Poll { - self.fut.poll().map_err(&self.f) + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + this.fut.poll(cx).map_err(this.f) } } @@ -139,11 +144,13 @@ where } } +#[pin_project] pub struct TransformFromErrFuture where T: Transform, E: From, { + #[pin] fut: T::Future, _t: PhantomData, } @@ -153,10 +160,9 @@ where T: Transform, E: From, { - type Item = T::Transform; - type Error = E; + type Output = Result; - fn poll(&mut self) -> Poll { - self.fut.poll().map_err(E::from) + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + self.project().fut.poll(cx).map_err(E::from) } } From 172b35fae578ffc2653c9d6fba9e122c64bb07a2 Mon Sep 17 00:00:00 2001 From: Christian Battaglia Date: Thu, 7 Nov 2019 18:38:05 -0500 Subject: [PATCH 16/17] update futures-rs and tokio --- actix-service/Cargo.toml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/actix-service/Cargo.toml b/actix-service/Cargo.toml index c4908713..af76325a 100644 --- a/actix-service/Cargo.toml +++ b/actix-service/Cargo.toml @@ -23,9 +23,9 @@ name = "actix_service" path = "src/lib.rs" [dependencies] -futures = { package = "futures-preview", version = "0.3.0-alpha.18" } +futures = "0.3.1" pin-project = "0.4.0-alpha.11" [dev-dependencies] -tokio = "0.2.0-alpha.4" +tokio = "0.2.0-alpha.5" actix-rt = "0.2" \ No newline at end of file From 05ae2585f353a007a532bf6dc7709fa6afb2faa3 Mon Sep 17 00:00:00 2001 From: Kai Ren Date: Sun, 10 Nov 2019 19:43:02 +0200 Subject: [PATCH 17/17] Migrate actix-threadpool to std::future (#59) * Migrate actix-threadpool to std::future * Cosmetic refactor - turn log::error! into log::warn! as it doesn't throw any error - add Clone and Copy impls for Cancelled making it cheap to operate with - apply rustfmt * Bump up crate version to 0.2.0 and pre-fill its changelog * Disable patching 'actix-threadpool' crate in global workspace as unnecessary * Revert patching and fix 'actix-rt' --- actix-rt/Cargo.toml | 2 +- actix-rt/src/arbiter.rs | 4 ++-- actix-threadpool/CHANGES.md | 6 +++++ actix-threadpool/Cargo.toml | 2 +- actix-threadpool/src/lib.rs | 44 ++++++++++++++++++------------------- 5 files changed, 31 insertions(+), 27 deletions(-) diff --git a/actix-rt/Cargo.toml b/actix-rt/Cargo.toml index db3782ab..2130afaa 100644 --- a/actix-rt/Cargo.toml +++ b/actix-rt/Cargo.toml @@ -18,7 +18,7 @@ name = "actix_rt" path = "src/lib.rs" [dependencies] -actix-threadpool = "0.1.1" +actix-threadpool = "0.2" futures = "0.3.1" # TODO: Replace this with dependency on tokio-runtime once it is ready diff --git a/actix-rt/src/arbiter.rs b/actix-rt/src/arbiter.rs index f85dcc96..dc3d0fdc 100644 --- a/actix-rt/src/arbiter.rs +++ b/actix-rt/src/arbiter.rs @@ -4,11 +4,11 @@ use std::collections::HashMap; use std::sync::atomic::{AtomicUsize, Ordering}; use std::{fmt, thread}; use std::pin::Pin; -use std::task::Context; +use std::task::{Context, Poll}; use futures::channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender}; use futures::channel::oneshot::{channel, Canceled, Sender}; -use futures::{future, Future, Poll, FutureExt, Stream}; +use futures::{future, Future, FutureExt, Stream}; use tokio::runtime::current_thread::spawn; use crate::builder::Builder; diff --git a/actix-threadpool/CHANGES.md b/actix-threadpool/CHANGES.md index 1050c503..aadf9ac9 100644 --- a/actix-threadpool/CHANGES.md +++ b/actix-threadpool/CHANGES.md @@ -1,5 +1,11 @@ # Changes +## [0.2.0] - 2019-??-?? + +### Changed + +* Migrate to `std::future` + ## [0.1.2] - 2019-08-05 ### Changed diff --git a/actix-threadpool/Cargo.toml b/actix-threadpool/Cargo.toml index 03e3e027..da6a9133 100644 --- a/actix-threadpool/Cargo.toml +++ b/actix-threadpool/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "actix-threadpool" -version = "0.1.2" +version = "0.2.0" authors = ["Nikolay Kim "] description = "Actix thread pool for sync code" keywords = ["actix", "network", "framework", "async", "futures"] diff --git a/actix-threadpool/src/lib.rs b/actix-threadpool/src/lib.rs index e78438f5..5ed6b7ca 100644 --- a/actix-threadpool/src/lib.rs +++ b/actix-threadpool/src/lib.rs @@ -1,34 +1,34 @@ //! Thread pool for blocking operations -use std::future::Future; -use std::task::{Poll,Context}; +use std::{ + future::Future, + pin::Pin, + task::{Context, Poll}, +}; use derive_more::Display; use futures::channel::oneshot; use parking_lot::Mutex; use threadpool::ThreadPool; -use std::pin::Pin; -/// Env variable for default cpu pool size +/// Env variable for default cpu pool size. const ENV_CPU_POOL_VAR: &str = "ACTIX_THREADPOOL"; lazy_static::lazy_static! { pub(crate) static ref DEFAULT_POOL: Mutex = { - let default = match std::env::var(ENV_CPU_POOL_VAR) { - Ok(val) => { - if let Ok(val) = val.parse() { - val - } else { - log::error!("Can not parse ACTIX_THREADPOOL value"); - num_cpus::get() * 5 - } - } - Err(_) => num_cpus::get() * 5, - }; + let num = std::env::var(ENV_CPU_POOL_VAR) + .map_err(|_| ()) + .and_then(|val| { + val.parse().map_err(|_| log::warn!( + "Can not parse {} value, using default", + ENV_CPU_POOL_VAR, + )) + }) + .unwrap_or_else(|_| num_cpus::get() * 5); Mutex::new( threadpool::Builder::new() .thread_name("actix-web".to_owned()) - .num_threads(default) + .num_threads(num) .build(), ) }; @@ -40,8 +40,8 @@ thread_local! { }; } -/// Blocking operation execution error -#[derive(Debug, Display)] +/// Error of blocking operation execution being cancelled. +#[derive(Clone, Copy, Debug, Display)] #[display(fmt = "Thread pool is gone")] pub struct Cancelled; @@ -71,13 +71,11 @@ pub struct CpuFuture { } impl Future for CpuFuture { - type Output = Result; + type Output = Result; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let rx = unsafe{ self.map_unchecked_mut(|s|&mut s.rx)}; - let res = futures::ready!(rx.poll(cx)); - + let rx = Pin::new(&mut Pin::get_mut(self).rx); + let res = futures::ready!(rx.poll(cx)); Poll::Ready(res.map_err(|_| Cancelled)) } - }