From 1bfbf228bd50f13bf39994ce237e31a952960696 Mon Sep 17 00:00:00 2001 From: Rob Ede Date: Mon, 29 Mar 2021 10:21:47 +0100 Subject: [PATCH] use new crates --- .cargo/config.toml | 2 + actix-codec/CHANGES.md | 3 + actix-codec/Cargo.toml | 4 + .../src/dispatcher.rs | 31 ++- actix-codec/src/lib.rs | 1 + actix-macros/Cargo.toml | 2 +- actix-service/src/macros.rs | 1 + actix-tracing/Cargo.toml | 2 +- actix-utils/CHANGES.md | 5 +- actix-utils/Cargo.toml | 2 +- actix-utils/src/counter.rs | 4 +- actix-utils/src/future/mod.rs | 7 + actix-utils/src/{ => future}/poll_fn.rs | 12 +- actix-utils/src/future/ready.rs | 122 +++++++++ actix-utils/src/lib.rs | 7 +- actix-utils/src/mpsc.rs | 253 ------------------ actix-utils/src/task.rs | 65 ----- actix-utils/src/timeout.rs | 30 ++- local-channel/CHANGES.md | 2 +- local-channel/Cargo.toml | 7 +- local-waker/CHANGES.md | 2 +- local-waker/Cargo.toml | 7 +- 22 files changed, 208 insertions(+), 363 deletions(-) create mode 100644 .cargo/config.toml rename {actix-utils => actix-codec}/src/dispatcher.rs (94%) create mode 100644 actix-utils/src/future/mod.rs rename actix-utils/src/{ => future}/poll_fn.rs (77%) create mode 100644 actix-utils/src/future/ready.rs delete mode 100644 actix-utils/src/mpsc.rs delete mode 100644 actix-utils/src/task.rs diff --git a/.cargo/config.toml b/.cargo/config.toml new file mode 100644 index 00000000..6b518f97 --- /dev/null +++ b/.cargo/config.toml @@ -0,0 +1,2 @@ +[alias] +lint = "hack --clean-per-run clippy --workspace --tests" diff --git a/actix-codec/CHANGES.md b/actix-codec/CHANGES.md index f6102cbf..f08cd8c8 100644 --- a/actix-codec/CHANGES.md +++ b/actix-codec/CHANGES.md @@ -1,6 +1,9 @@ # Changes ## Unreleased - 2021-xx-xx +* Move dispatcher from actix-utils. [#???] + +[#???]: https://github.com/actix/actix-net/pull/??? ## 0.4.0-beta.1 - 2020-12-28 diff --git a/actix-codec/Cargo.toml b/actix-codec/Cargo.toml index 95a24764..5778c4b6 100644 --- a/actix-codec/Cargo.toml +++ b/actix-codec/Cargo.toml @@ -16,10 +16,14 @@ name = "actix_codec" path = "src/lib.rs" [dependencies] +actix-service = "2.0.0-beta.5" +actix-rt = "2.0.0" + bitflags = "1.2.1" bytes = "1" futures-core = { version = "0.3.7", default-features = false } futures-sink = { version = "0.3.7", default-features = false } +local-channel = "0.1.0" log = "0.4" pin-project-lite = "0.2" tokio = "1" diff --git a/actix-utils/src/dispatcher.rs b/actix-codec/src/dispatcher.rs similarity index 94% rename from actix-utils/src/dispatcher.rs rename to actix-codec/src/dispatcher.rs index 94ac9971..6aea7e22 100644 --- a/actix-utils/src/dispatcher.rs +++ b/actix-codec/src/dispatcher.rs @@ -2,23 +2,31 @@ #![allow(type_alias_bounds)] -use core::future::Future; -use core::pin::Pin; -use core::task::{Context, Poll}; -use core::{fmt, mem}; +use core::{ + fmt, + future::Future, + mem, + pin::Pin, + task::{Context, Poll}, +}; -use actix_codec::{AsyncRead, AsyncWrite, Decoder, Encoder, Framed}; use actix_service::{IntoService, Service}; use futures_core::stream::Stream; +use local_channel::mpsc; use log::debug; use pin_project_lite::pin_project; -use crate::mpsc; +use crate::{AsyncRead, AsyncWrite, Decoder, Encoder, Framed}; /// Framed transport errors pub enum DispatcherError + Decoder, I> { + /// Inner service error. Service(E), + + /// Frame encoding error. Encoder(>::Error), + + /// Frame decoding error. Decoder(::Error), } @@ -58,14 +66,17 @@ where } } +/// Message type wrapper for signalling end of message stream. pub enum Message { + /// Message item. Item(T), + + /// Signal from service to flush all messages and stop processing. Close, } pin_project! { - /// Dispatcher is a future that reads frames from Framed object - /// and passes them to the service. + /// A future that reads frames from a [`Framed`] object and passes them to a [`Service`]. pub struct Dispatcher where S: Service<::Item, Response = I>, @@ -130,6 +141,7 @@ where ::Error: fmt::Debug, >::Error: fmt::Debug, { + /// Create new `Dispatcher`. pub fn new(framed: Framed, service: F) -> Self where F: IntoService::Item>, @@ -188,6 +200,7 @@ where &mut self.framed } + /// Read from framed object. fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> bool where S: Service<::Item, Response = I>, @@ -231,7 +244,7 @@ where } } - /// write to framed object + /// Write to framed object. fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> bool where S: Service<::Item, Response = I>, diff --git a/actix-codec/src/lib.rs b/actix-codec/src/lib.rs index dec30ba6..c3c36342 100644 --- a/actix-codec/src/lib.rs +++ b/actix-codec/src/lib.rs @@ -13,6 +13,7 @@ #![doc(html_favicon_url = "https://actix.rs/favicon.ico")] mod bcodec; +pub mod dispatcher; mod framed; pub use self::bcodec::BytesCodec; diff --git a/actix-macros/Cargo.toml b/actix-macros/Cargo.toml index 0555f990..1664fc27 100644 --- a/actix-macros/Cargo.toml +++ b/actix-macros/Cargo.toml @@ -19,5 +19,5 @@ syn = { version = "^1", features = ["full"] } [dev-dependencies] actix-rt = "2.0.0" -futures-util = { version = "0.3", default-features = false } +futures-util = { version = "0.3.7", default-features = false } trybuild = "1" diff --git a/actix-service/src/macros.rs b/actix-service/src/macros.rs index 4a083895..1656468e 100644 --- a/actix-service/src/macros.rs +++ b/actix-service/src/macros.rs @@ -147,6 +147,7 @@ mod tests { forward_ready!(inner); + #[allow(clippy::unit_arg)] fn call(&self, req: ()) -> Self::Future { self.inner.call(req) } diff --git a/actix-tracing/Cargo.toml b/actix-tracing/Cargo.toml index 7f043f4b..992edbf4 100644 --- a/actix-tracing/Cargo.toml +++ b/actix-tracing/Cargo.toml @@ -18,7 +18,7 @@ path = "src/lib.rs" [dependencies] actix-service = "2.0.0-beta.5" -futures-util = { version = "0.3.4", default-features = false } +futures-util = { version = "0.3.7", default-features = false } tracing = "0.1" tracing-futures = "0.2" diff --git a/actix-utils/CHANGES.md b/actix-utils/CHANGES.md index 8d97b741..85d34274 100644 --- a/actix-utils/CHANGES.md +++ b/actix-utils/CHANGES.md @@ -1,7 +1,9 @@ # Changes ## Unreleased - 2021-xx-xx -* Add `async fn mpsc::Receiver::recv`. [#286] +* Moved `mpsc` to own crate `local-channel`. [#???] +* Moved `task::LocalWaker` to own crate `local-waker`. [#???] +* Expose `future` mod with `ready` and `poll_fn` helpers. [#???] * `SendError` inner field is now public. [#286] * Rename `Dispatcher::{get_sink => tx}`. [#286] * Rename `Dispatcher::{get_ref => service}`. [#286] @@ -10,6 +12,7 @@ * Rename `Dispatcher::{get_framed_mut => framed_mut}`. [#286] [#286]: https://github.com/actix/actix-net/pull/286 +[#???]: https://github.com/actix/actix-net/pull/??? ## 3.0.0-beta.2 - 2021-02-06 diff --git a/actix-utils/Cargo.toml b/actix-utils/Cargo.toml index 9c21dd1b..2384c191 100644 --- a/actix-utils/Cargo.toml +++ b/actix-utils/Cargo.toml @@ -16,12 +16,12 @@ name = "actix_utils" path = "src/lib.rs" [dependencies] -actix-codec = "0.4.0-beta.1" actix-rt = { version = "2.0.0", default-features = false } actix-service = "2.0.0-beta.5" futures-core = { version = "0.3.7", default-features = false } futures-sink = { version = "0.3.7", default-features = false } +local-waker = "0.1" log = "0.4" pin-project-lite = "0.2.0" diff --git a/actix-utils/src/counter.rs b/actix-utils/src/counter.rs index 0b5984d2..026db0af 100644 --- a/actix-utils/src/counter.rs +++ b/actix-utils/src/counter.rs @@ -1,9 +1,11 @@ +//! Task-notifying counter. + use core::cell::Cell; use core::task; use std::rc::Rc; -use crate::task::LocalWaker; +use local_waker::LocalWaker; #[derive(Clone)] /// Simple counter with ability to notify task on reaching specific number diff --git a/actix-utils/src/future/mod.rs b/actix-utils/src/future/mod.rs new file mode 100644 index 00000000..0ad84ec7 --- /dev/null +++ b/actix-utils/src/future/mod.rs @@ -0,0 +1,7 @@ +//! Asynchronous values. + +mod poll_fn; +mod ready; + +pub use self::poll_fn::{poll_fn, PollFn}; +pub use self::ready::{err, ok, ready, Ready}; diff --git a/actix-utils/src/poll_fn.rs b/actix-utils/src/future/poll_fn.rs similarity index 77% rename from actix-utils/src/poll_fn.rs rename to actix-utils/src/future/poll_fn.rs index 2180f4a4..8061d42f 100644 --- a/actix-utils/src/poll_fn.rs +++ b/actix-utils/src/future/poll_fn.rs @@ -3,20 +3,20 @@ use core::{ fmt, future::Future, - task::{self, Poll}, + task::{Context, Poll}, }; use std::pin::Pin; /// Create a future driven by the provided function that receives a task context. -pub(crate) fn poll_fn(f: F) -> PollFn +pub fn poll_fn(f: F) -> PollFn where - F: FnMut(&mut task::Context<'_>) -> Poll, + F: FnMut(&mut Context<'_>) -> Poll, { PollFn { f } } /// A Future driven by the inner function. -pub(crate) struct PollFn { +pub struct PollFn { f: F, } @@ -30,11 +30,11 @@ impl fmt::Debug for PollFn { impl Future for PollFn where - F: FnMut(&mut task::Context<'_>) -> task::Poll, + F: FnMut(&mut Context<'_>) -> Poll, { type Output = T; - fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { (self.f)(cx) } } diff --git a/actix-utils/src/future/ready.rs b/actix-utils/src/future/ready.rs new file mode 100644 index 00000000..be2ee146 --- /dev/null +++ b/actix-utils/src/future/ready.rs @@ -0,0 +1,122 @@ +//! When MSRV is 1.48, replace with `core::future::Ready` and `core::future::ready()`. + +use core::{ + future::Future, + pin::Pin, + task::{Context, Poll}, +}; + +/// Future for the [`ready`](ready()) function. +/// +/// Panic will occur if polled more than once. +/// +/// # Examples +/// ``` +/// use actix_utils::future::ready; +/// +/// // async +/// # async fn run() { +/// let a = ready(1); +/// assert_eq!(a.await, 1); +/// # } +/// +/// // sync +/// let a = ready(1); +/// assert_eq!(a.into_inner(), 1); +/// ``` +#[derive(Debug, Clone)] +#[must_use = "futures do nothing unless you `.await` or poll them"] +pub struct Ready { + val: Option, +} + +impl Ready { + /// Unwraps the value from this immediately ready future. + #[inline] + pub fn into_inner(mut self) -> T { + self.val.take().unwrap() + } +} + +impl Unpin for Ready {} + +impl Future for Ready { + type Output = T; + + #[inline] + fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll { + let val = self.val.take().expect("Ready polled after completion"); + Poll::Ready(val) + } +} + +/// Creates a future that is immediately ready with a value. +/// +/// # Examples +/// ```no_run +/// use actix_utils::future::ready; +/// +/// # async fn run() { +/// let a = ready(1); +/// assert_eq!(a.await, 1); +/// # } +/// +/// // sync +/// let a = ready(1); +/// assert_eq!(a.into_inner(), 1); +/// ``` +pub fn ready(val: T) -> Ready { + Ready { val: Some(val) } +} + +/// Create a future that is immediately ready with a success value. +/// +/// # Examples +/// ```no_run +/// use actix_utils::future::ok; +/// +/// # async fn run() { +/// let a = ok::<_, ()>(1); +/// assert_eq!(a.await, Ok(1)); +/// # } +/// ``` +pub fn ok(val: T) -> Ready> { + Ready { val: Some(Ok(val)) } +} + +/// Create a future that is immediately ready with an error value. +/// +/// # Examples +/// ```no_run +/// use actix_utils::future::err; +/// +/// # async fn run() { +/// let a = err::<(), _>(1); +/// assert_eq!(a.await, Err(1)); +/// # } +/// ``` +pub fn err(err: E) -> Ready> { + Ready { + val: Some(Err(err)), + } +} + +#[cfg(test)] +mod tests { + use futures_util::task::noop_waker; + + use super::*; + + #[test] + #[should_panic] + fn multiple_poll_panics() { + let waker = noop_waker(); + let mut cx = Context::from_waker(&waker); + + let mut ready = ready(1); + assert_eq!(Pin::new(&mut ready).poll(&mut cx), Poll::Ready(1)); + + // panic! + let _ = Pin::new(&mut ready).poll(&mut cx); + } +} diff --git a/actix-utils/src/lib.rs b/actix-utils/src/lib.rs index 6658cba8..911610fd 100644 --- a/actix-utils/src/lib.rs +++ b/actix-utils/src/lib.rs @@ -6,10 +6,5 @@ #![doc(html_favicon_url = "https://actix.rs/favicon.ico")] pub mod counter; -pub mod dispatcher; -pub mod mpsc; -mod poll_fn; -pub mod task; +pub mod future; pub mod timeout; - -use self::poll_fn::poll_fn; diff --git a/actix-utils/src/mpsc.rs b/actix-utils/src/mpsc.rs deleted file mode 100644 index 9c7a5a0e..00000000 --- a/actix-utils/src/mpsc.rs +++ /dev/null @@ -1,253 +0,0 @@ -//! A multi-producer, single-consumer, futures-aware, FIFO queue. - -use core::{ - cell::RefCell, - fmt, - pin::Pin, - task::{Context, Poll}, -}; - -use std::{collections::VecDeque, error::Error, rc::Rc}; - -use futures_core::stream::Stream; -use futures_sink::Sink; - -use crate::{poll_fn, task::LocalWaker}; - -/// Creates a unbounded in-memory channel with buffered storage. -/// -/// [Sender]s and [Receiver]s are `!Send`. -pub fn channel() -> (Sender, Receiver) { - let shared = Rc::new(RefCell::new(Shared { - has_receiver: true, - buffer: VecDeque::new(), - blocked_recv: LocalWaker::new(), - })); - - let sender = Sender { - shared: shared.clone(), - }; - - let receiver = Receiver { shared }; - - (sender, receiver) -} - -#[derive(Debug)] -struct Shared { - buffer: VecDeque, - blocked_recv: LocalWaker, - has_receiver: bool, -} - -/// The transmission end of a channel. -/// -/// This is created by the `channel` function. -#[derive(Debug)] -pub struct Sender { - shared: Rc>>, -} - -impl Unpin for Sender {} - -impl Sender { - /// Sends the provided message along this channel. - pub fn send(&self, item: T) -> Result<(), SendError> { - let mut shared = self.shared.borrow_mut(); - - if !shared.has_receiver { - // receiver was dropped - return Err(SendError(item)); - }; - - shared.buffer.push_back(item); - shared.blocked_recv.wake(); - - Ok(()) - } - - /// Closes the sender half. - /// - /// This prevents any further messages from being sent on the channel, by any sender, while - /// still enabling the receiver to drain messages that are already buffered. - pub fn close(&mut self) { - self.shared.borrow_mut().has_receiver = false; - } -} - -impl Clone for Sender { - fn clone(&self) -> Self { - Sender { - shared: self.shared.clone(), - } - } -} - -impl Sink for Sender { - type Error = SendError; - - fn poll_ready(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { - Poll::Ready(Ok(())) - } - - fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), SendError> { - self.send(item) - } - - fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll>> { - Poll::Ready(Ok(())) - } - - fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { - Poll::Ready(Ok(())) - } -} - -impl Drop for Sender { - fn drop(&mut self) { - let count = Rc::strong_count(&self.shared); - let shared = self.shared.borrow_mut(); - - // check is last sender is about to drop - if shared.has_receiver && count == 2 { - // Wake up receiver as its stream has ended - shared.blocked_recv.wake(); - } - } -} - -/// The receiving end of a channel which implements the `Stream` trait. -/// -/// This is created by the [`channel`] function. -#[derive(Debug)] -pub struct Receiver { - shared: Rc>>, -} - -impl Receiver { - /// Receive the next value. - /// - /// Returns `None` if the channel is empty and has been [closed](Sender::close) explicitly or - /// when all senders have been dropped and, therefore, no more values can ever be sent though - /// this channel. - pub async fn recv(&mut self) -> Option { - let mut this = Pin::new(self); - poll_fn(|cx| this.as_mut().poll_next(cx)).await - } - - /// Create an associated [Sender]. - pub fn sender(&self) -> Sender { - Sender { - shared: self.shared.clone(), - } - } -} - -impl Unpin for Receiver {} - -impl Stream for Receiver { - type Item = T; - - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let mut shared = self.shared.borrow_mut(); - - if Rc::strong_count(&self.shared) == 1 { - // All senders have been dropped, so drain the buffer and end the stream. - return Poll::Ready(shared.buffer.pop_front()); - } - - if let Some(msg) = shared.buffer.pop_front() { - Poll::Ready(Some(msg)) - } else { - shared.blocked_recv.register(cx.waker()); - Poll::Pending - } - } -} - -impl Drop for Receiver { - fn drop(&mut self) { - let mut shared = self.shared.borrow_mut(); - shared.buffer.clear(); - shared.has_receiver = false; - } -} - -/// Error returned when attempting to send after the channels' [Receiver] is dropped or closed. -pub struct SendError(pub T); - -impl SendError { - /// Returns the message that was attempted to be sent but failed. - pub fn into_inner(self) -> T { - self.0 - } -} - -impl fmt::Debug for SendError { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - fmt.debug_tuple("SendError").field(&"...").finish() - } -} - -impl fmt::Display for SendError { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(fmt, "send failed because receiver is gone") - } -} - -impl Error for SendError {} - -#[cfg(test)] -mod tests { - use super::*; - use futures_util::future::lazy; - use futures_util::{stream::Stream, StreamExt}; - - #[actix_rt::test] - async fn test_mpsc() { - let (tx, mut rx) = channel(); - tx.send("test").unwrap(); - assert_eq!(rx.next().await.unwrap(), "test"); - - let tx2 = tx.clone(); - tx2.send("test2").unwrap(); - assert_eq!(rx.next().await.unwrap(), "test2"); - - assert_eq!( - lazy(|cx| Pin::new(&mut rx).poll_next(cx)).await, - Poll::Pending - ); - drop(tx2); - assert_eq!( - lazy(|cx| Pin::new(&mut rx).poll_next(cx)).await, - Poll::Pending - ); - drop(tx); - assert_eq!(rx.next().await, None); - - let (tx, rx) = channel(); - tx.send("test").unwrap(); - drop(rx); - assert!(tx.send("test").is_err()); - - let (mut tx, _) = channel(); - let tx2 = tx.clone(); - tx.close(); - assert!(tx.send("test").is_err()); - assert!(tx2.send("test").is_err()); - } - - #[actix_rt::test] - async fn test_recv() { - let (tx, mut rx) = channel(); - tx.send("test").unwrap(); - assert_eq!(rx.recv().await.unwrap(), "test"); - drop(tx); - - let (tx, mut rx) = channel(); - tx.send("test").unwrap(); - assert_eq!(rx.recv().await.unwrap(), "test"); - drop(tx); - assert!(rx.recv().await.is_none()); - } -} diff --git a/actix-utils/src/task.rs b/actix-utils/src/task.rs deleted file mode 100644 index 507bfc14..00000000 --- a/actix-utils/src/task.rs +++ /dev/null @@ -1,65 +0,0 @@ -use core::{cell::Cell, fmt, marker::PhantomData, task::Waker}; - -/// A synchronization primitive for task wakeup. -/// -/// Sometimes the task interested in a given event will change over time. A `LocalWaker` can -/// coordinate concurrent notifications with the consumer, potentially "updating" the underlying -/// task to wake up. This is useful in scenarios where a computation completes in another task and -/// wants to notify the consumer, but the consumer is in the process of being migrated to a new -/// logical task. -/// -/// Consumers should call [`register`] before checking the result of a computation and producers -/// should call [`wake`] after producing the computation (this differs from the usual `thread::park` -/// pattern). It is also permitted for [`wake`] to be called _before_ [`register`]. This results in -/// a no-op. -/// -/// A single `LocalWaker` may be reused for any number of calls to [`register`] or [`wake`]. -/// -/// [`register`]: LocalWaker::register -/// [`wake`]: LocalWaker::wake -#[derive(Default)] -pub struct LocalWaker { - pub(crate) waker: Cell>, - // mark LocalWaker as a !Send type. - _phantom: PhantomData<*const ()>, -} - -impl LocalWaker { - /// Creates a new, empty `LocalWaker`. - pub fn new() -> Self { - LocalWaker::default() - } - - /// Registers the waker to be notified on calls to `wake`. - /// - /// Returns `true` if waker was registered before. - #[inline] - pub fn register(&self, waker: &Waker) -> bool { - let last_waker = self.waker.replace(Some(waker.clone())); - last_waker.is_some() - } - - /// Calls `wake` on the last `Waker` passed to `register`. - /// - /// If `register` has not been called yet, then this does nothing. - #[inline] - pub fn wake(&self) { - if let Some(waker) = self.take() { - waker.wake(); - } - } - - /// Returns the last `Waker` passed to `register`, so that the user can wake it. - /// - /// If a waker has not been registered, this returns `None`. - #[inline] - pub fn take(&self) -> Option { - self.waker.take() - } -} - -impl fmt::Debug for LocalWaker { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "LocalWaker") - } -} diff --git a/actix-utils/src/timeout.rs b/actix-utils/src/timeout.rs index f13c7ffa..affcd62b 100644 --- a/actix-utils/src/timeout.rs +++ b/actix-utils/src/timeout.rs @@ -2,11 +2,14 @@ //! //! If the response does not complete within the specified timeout, the response will be aborted. -use core::future::Future; -use core::marker::PhantomData; -use core::pin::Pin; -use core::task::{Context, Poll}; -use core::{fmt, time}; +use core::{ + fmt, + future::Future, + marker::PhantomData, + pin::Pin, + task::{Context, Poll}, + time, +}; use actix_rt::time::{sleep, Sleep}; use actix_service::{IntoService, Service, Transform}; @@ -19,11 +22,12 @@ pub struct Timeout { _t: PhantomData, } -/// Timeout error +/// Service or timeout error. pub enum TimeoutError { - /// Service error + /// Inner service error. Service(E), - /// Service call timeout + + /// Timeout during service call. Timeout, } @@ -45,8 +49,8 @@ impl fmt::Debug for TimeoutError { impl fmt::Display for TimeoutError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { - TimeoutError::Service(e) => e.fmt(f), - TimeoutError::Timeout => write!(f, "Service call timeout"), + TimeoutError::Service(err) => err.fmt(f), + TimeoutError::Timeout => write!(f, "timeout during service call"), } } } @@ -160,7 +164,7 @@ where } pin_project! { - /// `TimeoutService` response future + /// `TimeoutService` response future. #[derive(Debug)] pub struct TimeoutServiceResponse where @@ -182,7 +186,7 @@ where fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = self.project(); - // First, try polling the future + // first try polling the future if let Poll::Ready(res) = this.fut.poll(cx) { return match res { Ok(v) => Poll::Ready(Ok(v)), @@ -190,7 +194,7 @@ where }; } - // Now check the sleep + // now check the sleep this.sleep.poll(cx).map(|_| Err(TimeoutError::Timeout)) } } diff --git a/local-channel/CHANGES.md b/local-channel/CHANGES.md index 34286df2..cccf9609 100644 --- a/local-channel/CHANGES.md +++ b/local-channel/CHANGES.md @@ -3,5 +3,5 @@ ## Unreleased - 2021-xx-xx -## 0.1.0 - 2021-03-29 +## 0.1.1 - 2021-03-29 * Move local mpsc channel to it's own crate. diff --git a/local-channel/Cargo.toml b/local-channel/Cargo.toml index 93bacf08..a9d3691e 100644 --- a/local-channel/Cargo.toml +++ b/local-channel/Cargo.toml @@ -1,8 +1,11 @@ [package] name = "local-channel" -version = "0.1.0" +version = "0.1.1" description = "A non-threadsafe multi-producer, single-consumer, futures-aware, FIFO queue" -authors = ["Rob Ede "] +authors = [ + "Nikolay Kim ", + "Rob Ede ", +] edition = "2018" license = "MIT OR Apache-2.0" repository = "https://github.com/actix/actix-net.git" diff --git a/local-waker/CHANGES.md b/local-waker/CHANGES.md index 2b8d3e30..edb5aa3e 100644 --- a/local-waker/CHANGES.md +++ b/local-waker/CHANGES.md @@ -3,5 +3,5 @@ ## Unreleased - 2021-xx-xx -## 0.1.0 - 2021-03-29 +## 0.1.1 - 2021-03-29 * Move `LocalWaker` to it's own crate. diff --git a/local-waker/Cargo.toml b/local-waker/Cargo.toml index 166454bf..df1f9ab8 100644 --- a/local-waker/Cargo.toml +++ b/local-waker/Cargo.toml @@ -1,8 +1,11 @@ [package] name = "local-waker" -version = "0.1.0" +version = "0.1.1" description = "A synchronization primitive for thread-local task wakeup" -authors = ["Rob Ede "] +authors = [ + "Nikolay Kim ", + "Rob Ede ", +] keywords = ["waker", "local", "futures", "no-std"] repository = "https://github.com/actix/actix-net.git" documentation = "https://docs.rs/local-waker"