mirror of https://github.com/fafhrd91/actix-net
use new crates
This commit is contained in:
parent
14d13bca5d
commit
1bfbf228bd
|
@ -0,0 +1,2 @@
|
|||
[alias]
|
||||
lint = "hack --clean-per-run clippy --workspace --tests"
|
|
@ -1,6 +1,9 @@
|
|||
# Changes
|
||||
|
||||
## Unreleased - 2021-xx-xx
|
||||
* Move dispatcher from actix-utils. [#???]
|
||||
|
||||
[#???]: https://github.com/actix/actix-net/pull/???
|
||||
|
||||
|
||||
## 0.4.0-beta.1 - 2020-12-28
|
||||
|
|
|
@ -16,10 +16,14 @@ name = "actix_codec"
|
|||
path = "src/lib.rs"
|
||||
|
||||
[dependencies]
|
||||
actix-service = "2.0.0-beta.5"
|
||||
actix-rt = "2.0.0"
|
||||
|
||||
bitflags = "1.2.1"
|
||||
bytes = "1"
|
||||
futures-core = { version = "0.3.7", default-features = false }
|
||||
futures-sink = { version = "0.3.7", default-features = false }
|
||||
local-channel = "0.1.0"
|
||||
log = "0.4"
|
||||
pin-project-lite = "0.2"
|
||||
tokio = "1"
|
||||
|
|
|
@ -2,23 +2,31 @@
|
|||
|
||||
#![allow(type_alias_bounds)]
|
||||
|
||||
use core::future::Future;
|
||||
use core::pin::Pin;
|
||||
use core::task::{Context, Poll};
|
||||
use core::{fmt, mem};
|
||||
use core::{
|
||||
fmt,
|
||||
future::Future,
|
||||
mem,
|
||||
pin::Pin,
|
||||
task::{Context, Poll},
|
||||
};
|
||||
|
||||
use actix_codec::{AsyncRead, AsyncWrite, Decoder, Encoder, Framed};
|
||||
use actix_service::{IntoService, Service};
|
||||
use futures_core::stream::Stream;
|
||||
use local_channel::mpsc;
|
||||
use log::debug;
|
||||
use pin_project_lite::pin_project;
|
||||
|
||||
use crate::mpsc;
|
||||
use crate::{AsyncRead, AsyncWrite, Decoder, Encoder, Framed};
|
||||
|
||||
/// Framed transport errors
|
||||
pub enum DispatcherError<E, U: Encoder<I> + Decoder, I> {
|
||||
/// Inner service error.
|
||||
Service(E),
|
||||
|
||||
/// Frame encoding error.
|
||||
Encoder(<U as Encoder<I>>::Error),
|
||||
|
||||
/// Frame decoding error.
|
||||
Decoder(<U as Decoder>::Error),
|
||||
}
|
||||
|
||||
|
@ -58,14 +66,17 @@ where
|
|||
}
|
||||
}
|
||||
|
||||
/// Message type wrapper for signalling end of message stream.
|
||||
pub enum Message<T> {
|
||||
/// Message item.
|
||||
Item(T),
|
||||
|
||||
/// Signal from service to flush all messages and stop processing.
|
||||
Close,
|
||||
}
|
||||
|
||||
pin_project! {
|
||||
/// Dispatcher is a future that reads frames from Framed object
|
||||
/// and passes them to the service.
|
||||
/// A future that reads frames from a [`Framed`] object and passes them to a [`Service`].
|
||||
pub struct Dispatcher<S, T, U, I>
|
||||
where
|
||||
S: Service<<U as Decoder>::Item, Response = I>,
|
||||
|
@ -130,6 +141,7 @@ where
|
|||
<U as Decoder>::Error: fmt::Debug,
|
||||
<U as Encoder<I>>::Error: fmt::Debug,
|
||||
{
|
||||
/// Create new `Dispatcher`.
|
||||
pub fn new<F>(framed: Framed<T, U>, service: F) -> Self
|
||||
where
|
||||
F: IntoService<S, <U as Decoder>::Item>,
|
||||
|
@ -188,6 +200,7 @@ where
|
|||
&mut self.framed
|
||||
}
|
||||
|
||||
/// Read from framed object.
|
||||
fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> bool
|
||||
where
|
||||
S: Service<<U as Decoder>::Item, Response = I>,
|
||||
|
@ -231,7 +244,7 @@ where
|
|||
}
|
||||
}
|
||||
|
||||
/// write to framed object
|
||||
/// Write to framed object.
|
||||
fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> bool
|
||||
where
|
||||
S: Service<<U as Decoder>::Item, Response = I>,
|
|
@ -13,6 +13,7 @@
|
|||
#![doc(html_favicon_url = "https://actix.rs/favicon.ico")]
|
||||
|
||||
mod bcodec;
|
||||
pub mod dispatcher;
|
||||
mod framed;
|
||||
|
||||
pub use self::bcodec::BytesCodec;
|
||||
|
|
|
@ -19,5 +19,5 @@ syn = { version = "^1", features = ["full"] }
|
|||
[dev-dependencies]
|
||||
actix-rt = "2.0.0"
|
||||
|
||||
futures-util = { version = "0.3", default-features = false }
|
||||
futures-util = { version = "0.3.7", default-features = false }
|
||||
trybuild = "1"
|
||||
|
|
|
@ -147,6 +147,7 @@ mod tests {
|
|||
|
||||
forward_ready!(inner);
|
||||
|
||||
#[allow(clippy::unit_arg)]
|
||||
fn call(&self, req: ()) -> Self::Future {
|
||||
self.inner.call(req)
|
||||
}
|
||||
|
|
|
@ -18,7 +18,7 @@ path = "src/lib.rs"
|
|||
[dependencies]
|
||||
actix-service = "2.0.0-beta.5"
|
||||
|
||||
futures-util = { version = "0.3.4", default-features = false }
|
||||
futures-util = { version = "0.3.7", default-features = false }
|
||||
tracing = "0.1"
|
||||
tracing-futures = "0.2"
|
||||
|
||||
|
|
|
@ -1,7 +1,9 @@
|
|||
# Changes
|
||||
|
||||
## Unreleased - 2021-xx-xx
|
||||
* Add `async fn mpsc::Receiver::recv`. [#286]
|
||||
* Moved `mpsc` to own crate `local-channel`. [#???]
|
||||
* Moved `task::LocalWaker` to own crate `local-waker`. [#???]
|
||||
* Expose `future` mod with `ready` and `poll_fn` helpers. [#???]
|
||||
* `SendError` inner field is now public. [#286]
|
||||
* Rename `Dispatcher::{get_sink => tx}`. [#286]
|
||||
* Rename `Dispatcher::{get_ref => service}`. [#286]
|
||||
|
@ -10,6 +12,7 @@
|
|||
* Rename `Dispatcher::{get_framed_mut => framed_mut}`. [#286]
|
||||
|
||||
[#286]: https://github.com/actix/actix-net/pull/286
|
||||
[#???]: https://github.com/actix/actix-net/pull/???
|
||||
|
||||
|
||||
## 3.0.0-beta.2 - 2021-02-06
|
||||
|
|
|
@ -16,12 +16,12 @@ name = "actix_utils"
|
|||
path = "src/lib.rs"
|
||||
|
||||
[dependencies]
|
||||
actix-codec = "0.4.0-beta.1"
|
||||
actix-rt = { version = "2.0.0", default-features = false }
|
||||
actix-service = "2.0.0-beta.5"
|
||||
|
||||
futures-core = { version = "0.3.7", default-features = false }
|
||||
futures-sink = { version = "0.3.7", default-features = false }
|
||||
local-waker = "0.1"
|
||||
log = "0.4"
|
||||
pin-project-lite = "0.2.0"
|
||||
|
||||
|
|
|
@ -1,9 +1,11 @@
|
|||
//! Task-notifying counter.
|
||||
|
||||
use core::cell::Cell;
|
||||
use core::task;
|
||||
|
||||
use std::rc::Rc;
|
||||
|
||||
use crate::task::LocalWaker;
|
||||
use local_waker::LocalWaker;
|
||||
|
||||
#[derive(Clone)]
|
||||
/// Simple counter with ability to notify task on reaching specific number
|
||||
|
|
|
@ -0,0 +1,7 @@
|
|||
//! Asynchronous values.
|
||||
|
||||
mod poll_fn;
|
||||
mod ready;
|
||||
|
||||
pub use self::poll_fn::{poll_fn, PollFn};
|
||||
pub use self::ready::{err, ok, ready, Ready};
|
|
@ -3,20 +3,20 @@
|
|||
use core::{
|
||||
fmt,
|
||||
future::Future,
|
||||
task::{self, Poll},
|
||||
task::{Context, Poll},
|
||||
};
|
||||
use std::pin::Pin;
|
||||
|
||||
/// Create a future driven by the provided function that receives a task context.
|
||||
pub(crate) fn poll_fn<F, T>(f: F) -> PollFn<F>
|
||||
pub fn poll_fn<F, T>(f: F) -> PollFn<F>
|
||||
where
|
||||
F: FnMut(&mut task::Context<'_>) -> Poll<T>,
|
||||
F: FnMut(&mut Context<'_>) -> Poll<T>,
|
||||
{
|
||||
PollFn { f }
|
||||
}
|
||||
|
||||
/// A Future driven by the inner function.
|
||||
pub(crate) struct PollFn<F> {
|
||||
pub struct PollFn<F> {
|
||||
f: F,
|
||||
}
|
||||
|
||||
|
@ -30,11 +30,11 @@ impl<F> fmt::Debug for PollFn<F> {
|
|||
|
||||
impl<F, T> Future for PollFn<F>
|
||||
where
|
||||
F: FnMut(&mut task::Context<'_>) -> task::Poll<T>,
|
||||
F: FnMut(&mut Context<'_>) -> Poll<T>,
|
||||
{
|
||||
type Output = T;
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
(self.f)(cx)
|
||||
}
|
||||
}
|
|
@ -0,0 +1,122 @@
|
|||
//! When MSRV is 1.48, replace with `core::future::Ready` and `core::future::ready()`.
|
||||
|
||||
use core::{
|
||||
future::Future,
|
||||
pin::Pin,
|
||||
task::{Context, Poll},
|
||||
};
|
||||
|
||||
/// Future for the [`ready`](ready()) function.
|
||||
///
|
||||
/// Panic will occur if polled more than once.
|
||||
///
|
||||
/// # Examples
|
||||
/// ```
|
||||
/// use actix_utils::future::ready;
|
||||
///
|
||||
/// // async
|
||||
/// # async fn run() {
|
||||
/// let a = ready(1);
|
||||
/// assert_eq!(a.await, 1);
|
||||
/// # }
|
||||
///
|
||||
/// // sync
|
||||
/// let a = ready(1);
|
||||
/// assert_eq!(a.into_inner(), 1);
|
||||
/// ```
|
||||
#[derive(Debug, Clone)]
|
||||
#[must_use = "futures do nothing unless you `.await` or poll them"]
|
||||
pub struct Ready<T> {
|
||||
val: Option<T>,
|
||||
}
|
||||
|
||||
impl<T> Ready<T> {
|
||||
/// Unwraps the value from this immediately ready future.
|
||||
#[inline]
|
||||
pub fn into_inner(mut self) -> T {
|
||||
self.val.take().unwrap()
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Unpin for Ready<T> {}
|
||||
|
||||
impl<T> Future for Ready<T> {
|
||||
type Output = T;
|
||||
|
||||
#[inline]
|
||||
fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<T> {
|
||||
let val = self.val.take().expect("Ready polled after completion");
|
||||
Poll::Ready(val)
|
||||
}
|
||||
}
|
||||
|
||||
/// Creates a future that is immediately ready with a value.
|
||||
///
|
||||
/// # Examples
|
||||
/// ```no_run
|
||||
/// use actix_utils::future::ready;
|
||||
///
|
||||
/// # async fn run() {
|
||||
/// let a = ready(1);
|
||||
/// assert_eq!(a.await, 1);
|
||||
/// # }
|
||||
///
|
||||
/// // sync
|
||||
/// let a = ready(1);
|
||||
/// assert_eq!(a.into_inner(), 1);
|
||||
/// ```
|
||||
pub fn ready<T>(val: T) -> Ready<T> {
|
||||
Ready { val: Some(val) }
|
||||
}
|
||||
|
||||
/// Create a future that is immediately ready with a success value.
|
||||
///
|
||||
/// # Examples
|
||||
/// ```no_run
|
||||
/// use actix_utils::future::ok;
|
||||
///
|
||||
/// # async fn run() {
|
||||
/// let a = ok::<_, ()>(1);
|
||||
/// assert_eq!(a.await, Ok(1));
|
||||
/// # }
|
||||
/// ```
|
||||
pub fn ok<T, E>(val: T) -> Ready<Result<T, E>> {
|
||||
Ready { val: Some(Ok(val)) }
|
||||
}
|
||||
|
||||
/// Create a future that is immediately ready with an error value.
|
||||
///
|
||||
/// # Examples
|
||||
/// ```no_run
|
||||
/// use actix_utils::future::err;
|
||||
///
|
||||
/// # async fn run() {
|
||||
/// let a = err::<(), _>(1);
|
||||
/// assert_eq!(a.await, Err(1));
|
||||
/// # }
|
||||
/// ```
|
||||
pub fn err<T, E>(err: E) -> Ready<Result<T, E>> {
|
||||
Ready {
|
||||
val: Some(Err(err)),
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use futures_util::task::noop_waker;
|
||||
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
#[should_panic]
|
||||
fn multiple_poll_panics() {
|
||||
let waker = noop_waker();
|
||||
let mut cx = Context::from_waker(&waker);
|
||||
|
||||
let mut ready = ready(1);
|
||||
assert_eq!(Pin::new(&mut ready).poll(&mut cx), Poll::Ready(1));
|
||||
|
||||
// panic!
|
||||
let _ = Pin::new(&mut ready).poll(&mut cx);
|
||||
}
|
||||
}
|
|
@ -6,10 +6,5 @@
|
|||
#![doc(html_favicon_url = "https://actix.rs/favicon.ico")]
|
||||
|
||||
pub mod counter;
|
||||
pub mod dispatcher;
|
||||
pub mod mpsc;
|
||||
mod poll_fn;
|
||||
pub mod task;
|
||||
pub mod future;
|
||||
pub mod timeout;
|
||||
|
||||
use self::poll_fn::poll_fn;
|
||||
|
|
|
@ -1,253 +0,0 @@
|
|||
//! A multi-producer, single-consumer, futures-aware, FIFO queue.
|
||||
|
||||
use core::{
|
||||
cell::RefCell,
|
||||
fmt,
|
||||
pin::Pin,
|
||||
task::{Context, Poll},
|
||||
};
|
||||
|
||||
use std::{collections::VecDeque, error::Error, rc::Rc};
|
||||
|
||||
use futures_core::stream::Stream;
|
||||
use futures_sink::Sink;
|
||||
|
||||
use crate::{poll_fn, task::LocalWaker};
|
||||
|
||||
/// Creates a unbounded in-memory channel with buffered storage.
|
||||
///
|
||||
/// [Sender]s and [Receiver]s are `!Send`.
|
||||
pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
|
||||
let shared = Rc::new(RefCell::new(Shared {
|
||||
has_receiver: true,
|
||||
buffer: VecDeque::new(),
|
||||
blocked_recv: LocalWaker::new(),
|
||||
}));
|
||||
|
||||
let sender = Sender {
|
||||
shared: shared.clone(),
|
||||
};
|
||||
|
||||
let receiver = Receiver { shared };
|
||||
|
||||
(sender, receiver)
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct Shared<T> {
|
||||
buffer: VecDeque<T>,
|
||||
blocked_recv: LocalWaker,
|
||||
has_receiver: bool,
|
||||
}
|
||||
|
||||
/// The transmission end of a channel.
|
||||
///
|
||||
/// This is created by the `channel` function.
|
||||
#[derive(Debug)]
|
||||
pub struct Sender<T> {
|
||||
shared: Rc<RefCell<Shared<T>>>,
|
||||
}
|
||||
|
||||
impl<T> Unpin for Sender<T> {}
|
||||
|
||||
impl<T> Sender<T> {
|
||||
/// Sends the provided message along this channel.
|
||||
pub fn send(&self, item: T) -> Result<(), SendError<T>> {
|
||||
let mut shared = self.shared.borrow_mut();
|
||||
|
||||
if !shared.has_receiver {
|
||||
// receiver was dropped
|
||||
return Err(SendError(item));
|
||||
};
|
||||
|
||||
shared.buffer.push_back(item);
|
||||
shared.blocked_recv.wake();
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Closes the sender half.
|
||||
///
|
||||
/// This prevents any further messages from being sent on the channel, by any sender, while
|
||||
/// still enabling the receiver to drain messages that are already buffered.
|
||||
pub fn close(&mut self) {
|
||||
self.shared.borrow_mut().has_receiver = false;
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Clone for Sender<T> {
|
||||
fn clone(&self) -> Self {
|
||||
Sender {
|
||||
shared: self.shared.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Sink<T> for Sender<T> {
|
||||
type Error = SendError<T>;
|
||||
|
||||
fn poll_ready(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
Poll::Ready(Ok(()))
|
||||
}
|
||||
|
||||
fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), SendError<T>> {
|
||||
self.send(item)
|
||||
}
|
||||
|
||||
fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), SendError<T>>> {
|
||||
Poll::Ready(Ok(()))
|
||||
}
|
||||
|
||||
fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
Poll::Ready(Ok(()))
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Drop for Sender<T> {
|
||||
fn drop(&mut self) {
|
||||
let count = Rc::strong_count(&self.shared);
|
||||
let shared = self.shared.borrow_mut();
|
||||
|
||||
// check is last sender is about to drop
|
||||
if shared.has_receiver && count == 2 {
|
||||
// Wake up receiver as its stream has ended
|
||||
shared.blocked_recv.wake();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// The receiving end of a channel which implements the `Stream` trait.
|
||||
///
|
||||
/// This is created by the [`channel`] function.
|
||||
#[derive(Debug)]
|
||||
pub struct Receiver<T> {
|
||||
shared: Rc<RefCell<Shared<T>>>,
|
||||
}
|
||||
|
||||
impl<T> Receiver<T> {
|
||||
/// Receive the next value.
|
||||
///
|
||||
/// Returns `None` if the channel is empty and has been [closed](Sender::close) explicitly or
|
||||
/// when all senders have been dropped and, therefore, no more values can ever be sent though
|
||||
/// this channel.
|
||||
pub async fn recv(&mut self) -> Option<T> {
|
||||
let mut this = Pin::new(self);
|
||||
poll_fn(|cx| this.as_mut().poll_next(cx)).await
|
||||
}
|
||||
|
||||
/// Create an associated [Sender].
|
||||
pub fn sender(&self) -> Sender<T> {
|
||||
Sender {
|
||||
shared: self.shared.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Unpin for Receiver<T> {}
|
||||
|
||||
impl<T> Stream for Receiver<T> {
|
||||
type Item = T;
|
||||
|
||||
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||
let mut shared = self.shared.borrow_mut();
|
||||
|
||||
if Rc::strong_count(&self.shared) == 1 {
|
||||
// All senders have been dropped, so drain the buffer and end the stream.
|
||||
return Poll::Ready(shared.buffer.pop_front());
|
||||
}
|
||||
|
||||
if let Some(msg) = shared.buffer.pop_front() {
|
||||
Poll::Ready(Some(msg))
|
||||
} else {
|
||||
shared.blocked_recv.register(cx.waker());
|
||||
Poll::Pending
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Drop for Receiver<T> {
|
||||
fn drop(&mut self) {
|
||||
let mut shared = self.shared.borrow_mut();
|
||||
shared.buffer.clear();
|
||||
shared.has_receiver = false;
|
||||
}
|
||||
}
|
||||
|
||||
/// Error returned when attempting to send after the channels' [Receiver] is dropped or closed.
|
||||
pub struct SendError<T>(pub T);
|
||||
|
||||
impl<T> SendError<T> {
|
||||
/// Returns the message that was attempted to be sent but failed.
|
||||
pub fn into_inner(self) -> T {
|
||||
self.0
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> fmt::Debug for SendError<T> {
|
||||
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
fmt.debug_tuple("SendError").field(&"...").finish()
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> fmt::Display for SendError<T> {
|
||||
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
write!(fmt, "send failed because receiver is gone")
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Error for SendError<T> {}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use futures_util::future::lazy;
|
||||
use futures_util::{stream::Stream, StreamExt};
|
||||
|
||||
#[actix_rt::test]
|
||||
async fn test_mpsc() {
|
||||
let (tx, mut rx) = channel();
|
||||
tx.send("test").unwrap();
|
||||
assert_eq!(rx.next().await.unwrap(), "test");
|
||||
|
||||
let tx2 = tx.clone();
|
||||
tx2.send("test2").unwrap();
|
||||
assert_eq!(rx.next().await.unwrap(), "test2");
|
||||
|
||||
assert_eq!(
|
||||
lazy(|cx| Pin::new(&mut rx).poll_next(cx)).await,
|
||||
Poll::Pending
|
||||
);
|
||||
drop(tx2);
|
||||
assert_eq!(
|
||||
lazy(|cx| Pin::new(&mut rx).poll_next(cx)).await,
|
||||
Poll::Pending
|
||||
);
|
||||
drop(tx);
|
||||
assert_eq!(rx.next().await, None);
|
||||
|
||||
let (tx, rx) = channel();
|
||||
tx.send("test").unwrap();
|
||||
drop(rx);
|
||||
assert!(tx.send("test").is_err());
|
||||
|
||||
let (mut tx, _) = channel();
|
||||
let tx2 = tx.clone();
|
||||
tx.close();
|
||||
assert!(tx.send("test").is_err());
|
||||
assert!(tx2.send("test").is_err());
|
||||
}
|
||||
|
||||
#[actix_rt::test]
|
||||
async fn test_recv() {
|
||||
let (tx, mut rx) = channel();
|
||||
tx.send("test").unwrap();
|
||||
assert_eq!(rx.recv().await.unwrap(), "test");
|
||||
drop(tx);
|
||||
|
||||
let (tx, mut rx) = channel();
|
||||
tx.send("test").unwrap();
|
||||
assert_eq!(rx.recv().await.unwrap(), "test");
|
||||
drop(tx);
|
||||
assert!(rx.recv().await.is_none());
|
||||
}
|
||||
}
|
|
@ -1,65 +0,0 @@
|
|||
use core::{cell::Cell, fmt, marker::PhantomData, task::Waker};
|
||||
|
||||
/// A synchronization primitive for task wakeup.
|
||||
///
|
||||
/// Sometimes the task interested in a given event will change over time. A `LocalWaker` can
|
||||
/// coordinate concurrent notifications with the consumer, potentially "updating" the underlying
|
||||
/// task to wake up. This is useful in scenarios where a computation completes in another task and
|
||||
/// wants to notify the consumer, but the consumer is in the process of being migrated to a new
|
||||
/// logical task.
|
||||
///
|
||||
/// Consumers should call [`register`] before checking the result of a computation and producers
|
||||
/// should call [`wake`] after producing the computation (this differs from the usual `thread::park`
|
||||
/// pattern). It is also permitted for [`wake`] to be called _before_ [`register`]. This results in
|
||||
/// a no-op.
|
||||
///
|
||||
/// A single `LocalWaker` may be reused for any number of calls to [`register`] or [`wake`].
|
||||
///
|
||||
/// [`register`]: LocalWaker::register
|
||||
/// [`wake`]: LocalWaker::wake
|
||||
#[derive(Default)]
|
||||
pub struct LocalWaker {
|
||||
pub(crate) waker: Cell<Option<Waker>>,
|
||||
// mark LocalWaker as a !Send type.
|
||||
_phantom: PhantomData<*const ()>,
|
||||
}
|
||||
|
||||
impl LocalWaker {
|
||||
/// Creates a new, empty `LocalWaker`.
|
||||
pub fn new() -> Self {
|
||||
LocalWaker::default()
|
||||
}
|
||||
|
||||
/// Registers the waker to be notified on calls to `wake`.
|
||||
///
|
||||
/// Returns `true` if waker was registered before.
|
||||
#[inline]
|
||||
pub fn register(&self, waker: &Waker) -> bool {
|
||||
let last_waker = self.waker.replace(Some(waker.clone()));
|
||||
last_waker.is_some()
|
||||
}
|
||||
|
||||
/// Calls `wake` on the last `Waker` passed to `register`.
|
||||
///
|
||||
/// If `register` has not been called yet, then this does nothing.
|
||||
#[inline]
|
||||
pub fn wake(&self) {
|
||||
if let Some(waker) = self.take() {
|
||||
waker.wake();
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the last `Waker` passed to `register`, so that the user can wake it.
|
||||
///
|
||||
/// If a waker has not been registered, this returns `None`.
|
||||
#[inline]
|
||||
pub fn take(&self) -> Option<Waker> {
|
||||
self.waker.take()
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Debug for LocalWaker {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
write!(f, "LocalWaker")
|
||||
}
|
||||
}
|
|
@ -2,11 +2,14 @@
|
|||
//!
|
||||
//! If the response does not complete within the specified timeout, the response will be aborted.
|
||||
|
||||
use core::future::Future;
|
||||
use core::marker::PhantomData;
|
||||
use core::pin::Pin;
|
||||
use core::task::{Context, Poll};
|
||||
use core::{fmt, time};
|
||||
use core::{
|
||||
fmt,
|
||||
future::Future,
|
||||
marker::PhantomData,
|
||||
pin::Pin,
|
||||
task::{Context, Poll},
|
||||
time,
|
||||
};
|
||||
|
||||
use actix_rt::time::{sleep, Sleep};
|
||||
use actix_service::{IntoService, Service, Transform};
|
||||
|
@ -19,11 +22,12 @@ pub struct Timeout<E = ()> {
|
|||
_t: PhantomData<E>,
|
||||
}
|
||||
|
||||
/// Timeout error
|
||||
/// Service or timeout error.
|
||||
pub enum TimeoutError<E> {
|
||||
/// Service error
|
||||
/// Inner service error.
|
||||
Service(E),
|
||||
/// Service call timeout
|
||||
|
||||
/// Timeout during service call.
|
||||
Timeout,
|
||||
}
|
||||
|
||||
|
@ -45,8 +49,8 @@ impl<E: fmt::Debug> fmt::Debug for TimeoutError<E> {
|
|||
impl<E: fmt::Display> fmt::Display for TimeoutError<E> {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
match self {
|
||||
TimeoutError::Service(e) => e.fmt(f),
|
||||
TimeoutError::Timeout => write!(f, "Service call timeout"),
|
||||
TimeoutError::Service(err) => err.fmt(f),
|
||||
TimeoutError::Timeout => write!(f, "timeout during service call"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -160,7 +164,7 @@ where
|
|||
}
|
||||
|
||||
pin_project! {
|
||||
/// `TimeoutService` response future
|
||||
/// `TimeoutService` response future.
|
||||
#[derive(Debug)]
|
||||
pub struct TimeoutServiceResponse<S, Req>
|
||||
where
|
||||
|
@ -182,7 +186,7 @@ where
|
|||
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
let this = self.project();
|
||||
|
||||
// First, try polling the future
|
||||
// first try polling the future
|
||||
if let Poll::Ready(res) = this.fut.poll(cx) {
|
||||
return match res {
|
||||
Ok(v) => Poll::Ready(Ok(v)),
|
||||
|
@ -190,7 +194,7 @@ where
|
|||
};
|
||||
}
|
||||
|
||||
// Now check the sleep
|
||||
// now check the sleep
|
||||
this.sleep.poll(cx).map(|_| Err(TimeoutError::Timeout))
|
||||
}
|
||||
}
|
||||
|
|
|
@ -3,5 +3,5 @@
|
|||
## Unreleased - 2021-xx-xx
|
||||
|
||||
|
||||
## 0.1.0 - 2021-03-29
|
||||
## 0.1.1 - 2021-03-29
|
||||
* Move local mpsc channel to it's own crate.
|
||||
|
|
|
@ -1,8 +1,11 @@
|
|||
[package]
|
||||
name = "local-channel"
|
||||
version = "0.1.0"
|
||||
version = "0.1.1"
|
||||
description = "A non-threadsafe multi-producer, single-consumer, futures-aware, FIFO queue"
|
||||
authors = ["Rob Ede <robjtede@icloud.com>"]
|
||||
authors = [
|
||||
"Nikolay Kim <fafhrd91@gmail.com>",
|
||||
"Rob Ede <robjtede@icloud.com>",
|
||||
]
|
||||
edition = "2018"
|
||||
license = "MIT OR Apache-2.0"
|
||||
repository = "https://github.com/actix/actix-net.git"
|
||||
|
|
|
@ -3,5 +3,5 @@
|
|||
## Unreleased - 2021-xx-xx
|
||||
|
||||
|
||||
## 0.1.0 - 2021-03-29
|
||||
## 0.1.1 - 2021-03-29
|
||||
* Move `LocalWaker` to it's own crate.
|
||||
|
|
|
@ -1,8 +1,11 @@
|
|||
[package]
|
||||
name = "local-waker"
|
||||
version = "0.1.0"
|
||||
version = "0.1.1"
|
||||
description = "A synchronization primitive for thread-local task wakeup"
|
||||
authors = ["Rob Ede <robjtede@icloud.com>"]
|
||||
authors = [
|
||||
"Nikolay Kim <fafhrd91@gmail.com>",
|
||||
"Rob Ede <robjtede@icloud.com>",
|
||||
]
|
||||
keywords = ["waker", "local", "futures", "no-std"]
|
||||
repository = "https://github.com/actix/actix-net.git"
|
||||
documentation = "https://docs.rs/local-waker"
|
||||
|
|
Loading…
Reference in New Issue