merge master into mio-0.7.3. fix openssl connector

This commit is contained in:
fakeshadow 2020-12-27 10:14:17 +08:00
commit ca89c0df8b
30 changed files with 368 additions and 1862 deletions

View File

@ -33,7 +33,7 @@ uri = ["http"]
[dependencies] [dependencies]
actix-service = "1.0.6" actix-service = "1.0.6"
actix-codec = "0.3.0" actix-codec = "0.3.0"
actix-utils = "2.0.0" actix-utils = "3.0.0"
actix-rt = "1.1.1" actix-rt = "1.1.1"
derive_more = "0.99.2" derive_more = "0.99.2"

View File

@ -157,7 +157,7 @@ where
Self::Error(e) => e.take().unwrap(), Self::Error(e) => e.take().unwrap(),
Self::Accept(acc) => { Self::Accept(acc) => {
let (stream, _) = acc.as_mut().unwrap(); let (stream, _) = acc.as_mut().unwrap();
match ready!(Pin::new(stream).poll_accept(cx)) { match ready!(Pin::new(stream).poll_connect(cx)) {
Ok(()) => { Ok(()) => {
let (stream, connection) = acc.take().unwrap(); let (stream, connection) = acc.take().unwrap();
trace!("SSL Handshake success: {:?}", connection.host()); trace!("SSL Handshake success: {:?}", connection.host());

View File

@ -9,8 +9,16 @@
### Changed ### Changed
* Update `tokio` dependency to `1` * Update `tokio` dependency to `1`
* Rename `time` module `delay_for` to `sleep`, `delay_until` to `sleep_until`, `Delay` to `Sleep` to keep inline with tokio. * Rename `time` module `delay_for` to `sleep`, `delay_until` to `sleep_until`, `Delay` to `Sleep` to keep inline with tokio.
* Remove `'static` lifetime requirement for `Runtime::block_on` and `SystemRunner::block_on`. These methods would accept a &Self when calling. * Remove `'static` lifetime requirement for `Runtime::block_on` and `SystemRunner::block_on`.
* Remove `'static` lifetime requirement for `System::run` Remove `'static` lifetime requirement for `System::run` and `Builder::run`.
`Arbiter::spawn` would panic when `System` is not in scope. [#207]
### Fixed
* Fix work load issue by removing `PENDDING` thread local. [#207]
[#207]: https://github.com/actix/actix-net/pull/207
## [1.1.1] - 2020-04-30 ## [1.1.1] - 2020-04-30

View File

@ -17,11 +17,5 @@ path = "src/lib.rs"
[dependencies] [dependencies]
actix-macros = "0.1.0" actix-macros = "0.1.0"
copyless = "0.1.4"
futures-channel = "0.3.7"
futures-util = { version = "0.3.7", default-features = false, features = ["alloc"] }
smallvec = "1"
tokio = { version = "1", features = ["rt", "net", "signal", "time"] }
[dev-dependencies] tokio = { version = "1", features = ["rt", "net", "signal", "sync", "time"] }
tokio = { version = "1", features = ["full"] }

View File

@ -1,5 +1,5 @@
use std::any::{Any, TypeId}; use std::any::{Any, TypeId};
use std::cell::{Cell, RefCell}; use std::cell::RefCell;
use std::collections::HashMap; use std::collections::HashMap;
use std::future::Future; use std::future::Future;
use std::pin::Pin; use std::pin::Pin;
@ -7,25 +7,24 @@ use std::sync::atomic::{AtomicUsize, Ordering};
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use std::{fmt, thread}; use std::{fmt, thread};
use copyless::BoxHelper; use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
use futures_channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender}; use tokio::sync::oneshot::{channel, error::RecvError as Canceled, Sender};
use futures_channel::oneshot::{channel, Canceled, Sender}; // use futures_util::stream::FuturesUnordered;
use futures_util::{ // use tokio::task::JoinHandle;
future::{self, FutureExt}, // use tokio::stream::StreamExt;
stream::Stream, use tokio::task::LocalSet;
};
use smallvec::SmallVec;
pub use tokio::task::JoinHandle;
use crate::runtime::Runtime; use crate::runtime::Runtime;
use crate::system::System; use crate::system::System;
thread_local!( thread_local!(
static ADDR: RefCell<Option<Arbiter>> = RefCell::new(None); static ADDR: RefCell<Option<Arbiter>> = RefCell::new(None);
static RUNNING: Cell<bool> = Cell::new(false); // TODO: Commented out code are for Arbiter::local_join function.
static Q: RefCell<Vec<Pin<Box<dyn Future<Output = ()>>>>> = RefCell::new(Vec::new()); // It can be safely removed if this function is not used in actix-*.
static PENDING: RefCell<SmallVec<[JoinHandle<()>; 8]>> = RefCell::new(SmallVec::new()); //
// /// stores join handle for spawned async tasks.
// static HANDLE: RefCell<FuturesUnordered<JoinHandle<()>>> =
// RefCell::new(FuturesUnordered::new());
static STORAGE: RefCell<HashMap<TypeId, Box<dyn Any>>> = RefCell::new(HashMap::new()); static STORAGE: RefCell<HashMap<TypeId, Box<dyn Any>>> = RefCell::new(HashMap::new());
); );
@ -69,14 +68,14 @@ impl Default for Arbiter {
} }
impl Arbiter { impl Arbiter {
pub(crate) fn new_system() -> Self { pub(crate) fn new_system(local: &LocalSet) -> Self {
let (tx, rx) = unbounded(); let (tx, rx) = unbounded_channel();
let arb = Arbiter::with_sender(tx); let arb = Arbiter::with_sender(tx);
ADDR.with(|cell| *cell.borrow_mut() = Some(arb.clone())); ADDR.with(|cell| *cell.borrow_mut() = Some(arb.clone()));
RUNNING.with(|cell| cell.set(false));
STORAGE.with(|cell| cell.borrow_mut().clear()); STORAGE.with(|cell| cell.borrow_mut().clear());
Arbiter::spawn(ArbiterController { stop: None, rx });
local.spawn_local(ArbiterController { rx });
arb arb
} }
@ -91,13 +90,14 @@ impl Arbiter {
} }
/// Check if current arbiter is running. /// Check if current arbiter is running.
#[deprecated(note = "Thread local variables for running state of Arbiter is removed")]
pub fn is_running() -> bool { pub fn is_running() -> bool {
RUNNING.with(|cell| cell.get()) false
} }
/// Stop arbiter from continuing it's event loop. /// Stop arbiter from continuing it's event loop.
pub fn stop(&self) { pub fn stop(&self) {
let _ = self.sender.unbounded_send(ArbiterCommand::Stop); let _ = self.sender.send(ArbiterCommand::Stop);
} }
/// Spawn new thread and run event loop in spawned thread. /// Spawn new thread and run event loop in spawned thread.
@ -106,69 +106,47 @@ impl Arbiter {
let id = COUNT.fetch_add(1, Ordering::Relaxed); let id = COUNT.fetch_add(1, Ordering::Relaxed);
let name = format!("actix-rt:worker:{}", id); let name = format!("actix-rt:worker:{}", id);
let sys = System::current(); let sys = System::current();
let (arb_tx, arb_rx) = unbounded(); let (tx, rx) = unbounded_channel();
let arb_tx2 = arb_tx.clone();
let handle = thread::Builder::new() let handle = thread::Builder::new()
.name(name.clone()) .name(name.clone())
.spawn(move || { .spawn({
let rt = Runtime::new().expect("Can not create Runtime"); let tx = tx.clone();
let arb = Arbiter::with_sender(arb_tx); move || {
let rt = Runtime::new().expect("Can not create Runtime");
let arb = Arbiter::with_sender(tx);
let (stop, stop_rx) = channel(); STORAGE.with(|cell| cell.borrow_mut().clear());
RUNNING.with(|cell| cell.set(true));
STORAGE.with(|cell| cell.borrow_mut().clear());
System::set_current(sys); System::set_current(sys);
// start arbiter controller ADDR.with(|cell| *cell.borrow_mut() = Some(arb.clone()));
rt.spawn(ArbiterController {
stop: Some(stop),
rx: arb_rx,
});
ADDR.with(|cell| *cell.borrow_mut() = Some(arb.clone()));
// register arbiter // register arbiter
let _ = System::current() let _ = System::current()
.sys() .sys()
.unbounded_send(SystemCommand::RegisterArbiter(id, arb)); .send(SystemCommand::RegisterArbiter(id, arb));
// run loop // start arbiter controller
let _ = rt.block_on(stop_rx).unwrap_or(1); // run loop
rt.block_on(ArbiterController { rx });
// unregister arbiter // unregister arbiter
let _ = System::current() let _ = System::current()
.sys() .sys()
.unbounded_send(SystemCommand::UnregisterArbiter(id)); .send(SystemCommand::UnregisterArbiter(id));
}
}) })
.unwrap_or_else(|err| { .unwrap_or_else(|err| {
panic!("Cannot spawn an arbiter's thread {:?}: {:?}", &name, err) panic!("Cannot spawn an arbiter's thread {:?}: {:?}", &name, err)
}); });
Arbiter { Arbiter {
sender: arb_tx2, sender: tx,
thread_handle: Some(handle), thread_handle: Some(handle),
} }
} }
pub(crate) fn run_system(rt: Option<&Runtime>) {
RUNNING.with(|cell| cell.set(true));
Q.with(|cell| {
let mut v = cell.borrow_mut();
for fut in v.drain(..) {
if let Some(rt) = rt {
rt.spawn(fut);
} else {
tokio::task::spawn_local(fut);
}
}
});
}
pub(crate) fn stop_system() {
RUNNING.with(|cell| cell.set(false));
}
/// Spawn a future on the current thread. This does not create a new Arbiter /// Spawn a future on the current thread. This does not create a new Arbiter
/// or Arbiter address, it is simply a helper for spawning futures on the current /// or Arbiter address, it is simply a helper for spawning futures on the current
/// thread. /// thread.
@ -176,26 +154,12 @@ impl Arbiter {
where where
F: Future<Output = ()> + 'static, F: Future<Output = ()> + 'static,
{ {
RUNNING.with(move |cell| { // HANDLE.with(|handle| {
if cell.get() { // let handle = handle.borrow();
// Spawn the future on running executor // handle.push(tokio::task::spawn_local(future));
let len = PENDING.with(move |cell| { // });
let mut p = cell.borrow_mut(); // let _ = tokio::task::spawn_local(CleanupPending);
p.push(tokio::task::spawn_local(future)); let _ = tokio::task::spawn_local(future);
p.len()
});
if len > 7 {
// Before reaching the inline size
tokio::task::spawn_local(CleanupPending);
}
} else {
// Box the future and push it to the queue, this results in double boxing
// because the executor boxes the future again, but works for now
Q.with(move |cell| {
cell.borrow_mut().push(Pin::from(Box::alloc().init(future)))
});
}
});
} }
/// Executes a future on the current thread. This does not create a new Arbiter /// Executes a future on the current thread. This does not create a new Arbiter
@ -206,7 +170,9 @@ impl Arbiter {
F: FnOnce() -> R + 'static, F: FnOnce() -> R + 'static,
R: Future<Output = ()> + 'static, R: Future<Output = ()> + 'static,
{ {
Arbiter::spawn(future::lazy(|_| f()).flatten()) Arbiter::spawn(async {
f();
})
} }
/// Send a future to the Arbiter's thread, and spawn it. /// Send a future to the Arbiter's thread, and spawn it.
@ -214,9 +180,7 @@ impl Arbiter {
where where
F: Future<Output = ()> + Send + Unpin + 'static, F: Future<Output = ()> + Send + Unpin + 'static,
{ {
let _ = self let _ = self.sender.send(ArbiterCommand::Execute(Box::new(future)));
.sender
.unbounded_send(ArbiterCommand::Execute(Box::new(future)));
} }
/// Send a function to the Arbiter's thread, and execute it. Any result from the function /// Send a function to the Arbiter's thread, and execute it. Any result from the function
@ -227,7 +191,7 @@ impl Arbiter {
{ {
let _ = self let _ = self
.sender .sender
.unbounded_send(ArbiterCommand::ExecuteFn(Box::new(move || { .send(ArbiterCommand::ExecuteFn(Box::new(move || {
f(); f();
}))); })));
} }
@ -243,8 +207,8 @@ impl Arbiter {
let (tx, rx) = channel(); let (tx, rx) = channel();
let _ = self let _ = self
.sender .sender
.unbounded_send(ArbiterCommand::ExecuteFn(Box::new(move || { .send(ArbiterCommand::ExecuteFn(Box::new(move || {
if !tx.is_canceled() { if !tx.is_closed() {
let _ = tx.send(f()); let _ = tx.send(f());
} }
}))); })));
@ -313,40 +277,33 @@ impl Arbiter {
/// Returns a future that will be completed once all currently spawned futures /// Returns a future that will be completed once all currently spawned futures
/// have completed. /// have completed.
pub fn local_join() -> impl Future<Output = ()> { #[deprecated(since = "1.2.0", note = "Arbiter::local_join function is removed.")]
PENDING.with(move |cell| { pub async fn local_join() {
let current = cell.replace(SmallVec::new()); // let handle = HANDLE.with(|fut| std::mem::take(&mut *fut.borrow_mut()));
future::join_all(current).map(|_| ()) // async move {
}) // handle.collect::<Vec<_>>().await;
// }
unimplemented!("Arbiter::local_join function is removed.")
} }
} }
/// Future used for cleaning-up already finished `JoinHandle`s // /// Future used for cleaning-up already finished `JoinHandle`s
/// from the `PENDING` list so the vector doesn't grow indefinitely // /// from the `PENDING` list so the vector doesn't grow indefinitely
struct CleanupPending; // struct CleanupPending;
//
impl Future for CleanupPending { // impl Future for CleanupPending {
type Output = (); // type Output = ();
//
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { // fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
PENDING.with(move |cell| { // HANDLE.with(move |handle| {
let mut pending = cell.borrow_mut(); // recycle_join_handle(&mut *handle.borrow_mut(), cx);
let mut i = 0; // });
while i != pending.len() { //
if Pin::new(&mut pending[i]).poll(cx).is_ready() { // Poll::Ready(())
pending.remove(i); // }
} else { // }
i += 1;
}
}
});
Poll::Ready(())
}
}
struct ArbiterController { struct ArbiterController {
stop: Option<Sender<i32>>,
rx: UnboundedReceiver<ArbiterCommand>, rx: UnboundedReceiver<ArbiterCommand>,
} }
@ -368,25 +325,17 @@ impl Future for ArbiterController {
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
loop { loop {
match Pin::new(&mut self.rx).poll_next(cx) { match Pin::new(&mut self.rx).poll_recv(cx) {
Poll::Ready(None) => return Poll::Ready(()), Poll::Ready(None) => return Poll::Ready(()),
Poll::Ready(Some(item)) => match item { Poll::Ready(Some(item)) => match item {
ArbiterCommand::Stop => { ArbiterCommand::Stop => return Poll::Ready(()),
if let Some(stop) = self.stop.take() {
let _ = stop.send(0);
};
return Poll::Ready(());
}
ArbiterCommand::Execute(fut) => { ArbiterCommand::Execute(fut) => {
let len = PENDING.with(move |cell| { // HANDLE.with(|handle| {
let mut p = cell.borrow_mut(); // let mut handle = handle.borrow_mut();
p.push(tokio::task::spawn_local(fut)); // handle.push(tokio::task::spawn_local(fut));
p.len() // recycle_join_handle(&mut *handle, cx);
}); // });
if len > 7 { tokio::task::spawn_local(fut);
// Before reaching the inline size
tokio::task::spawn_local(CleanupPending);
}
} }
ArbiterCommand::ExecuteFn(f) => { ArbiterCommand::ExecuteFn(f) => {
f.call_box(); f.call_box();
@ -398,6 +347,20 @@ impl Future for ArbiterController {
} }
} }
// fn recycle_join_handle(handle: &mut FuturesUnordered<JoinHandle<()>>, cx: &mut Context<'_>) {
// let _ = Pin::new(&mut *handle).poll_next(cx);
//
// // Try to recycle more join handles and free up memory.
// //
// // this is a guess. The yield limit for FuturesUnordered is 32.
// // So poll an extra 3 times would make the total poll below 128.
// if handle.len() > 64 {
// (0..3).for_each(|_| {
// let _ = Pin::new(&mut *handle).poll_next(cx);
// })
// }
// }
#[derive(Debug)] #[derive(Debug)]
pub(crate) enum SystemCommand { pub(crate) enum SystemCommand {
Exit(i32), Exit(i32),
@ -427,7 +390,7 @@ impl Future for SystemArbiter {
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
loop { loop {
match Pin::new(&mut self.commands).poll_next(cx) { match Pin::new(&mut self.commands).poll_recv(cx) {
Poll::Ready(None) => return Poll::Ready(()), Poll::Ready(None) => return Poll::Ready(()),
Poll::Ready(Some(cmd)) => match cmd { Poll::Ready(Some(cmd)) => match cmd {
SystemCommand::Exit(code) => { SystemCommand::Exit(code) => {

View File

@ -2,9 +2,8 @@ use std::borrow::Cow;
use std::future::Future; use std::future::Future;
use std::io; use std::io;
use futures_channel::mpsc::unbounded; use tokio::sync::mpsc::unbounded_channel;
use futures_channel::oneshot::{channel, Receiver}; use tokio::sync::oneshot::{channel, Receiver};
use futures_util::future::{lazy, FutureExt};
use tokio::task::LocalSet; use tokio::task::LocalSet;
use crate::arbiter::{Arbiter, SystemArbiter}; use crate::arbiter::{Arbiter, SystemArbiter};
@ -73,9 +72,10 @@ impl Builder {
fn create_async_runtime(self, local: &LocalSet) -> AsyncSystemRunner { fn create_async_runtime(self, local: &LocalSet) -> AsyncSystemRunner {
let (stop_tx, stop) = channel(); let (stop_tx, stop) = channel();
let (sys_sender, sys_receiver) = unbounded(); let (sys_sender, sys_receiver) = unbounded_channel();
let system = System::construct(sys_sender, Arbiter::new_system(), self.stop_on_panic); let system =
System::construct(sys_sender, Arbiter::new_system(local), self.stop_on_panic);
// system arbiter // system arbiter
let arb = SystemArbiter::new(stop_tx, sys_receiver); let arb = SystemArbiter::new(stop_tx, sys_receiver);
@ -91,18 +91,23 @@ impl Builder {
F: FnOnce(), F: FnOnce(),
{ {
let (stop_tx, stop) = channel(); let (stop_tx, stop) = channel();
let (sys_sender, sys_receiver) = unbounded(); let (sys_sender, sys_receiver) = unbounded_channel();
let system = System::construct(sys_sender, Arbiter::new_system(), self.stop_on_panic); let rt = Runtime::new().unwrap();
let system = System::construct(
sys_sender,
Arbiter::new_system(rt.local()),
self.stop_on_panic,
);
// system arbiter // system arbiter
let arb = SystemArbiter::new(stop_tx, sys_receiver); let arb = SystemArbiter::new(stop_tx, sys_receiver);
let rt = Runtime::new().unwrap();
rt.spawn(arb); rt.spawn(arb);
// init system arbiter and run configuration method // init system arbiter and run configuration method
rt.block_on(lazy(move |_| f())); rt.block_on(async { f() });
SystemRunner { rt, stop, system } SystemRunner { rt, stop, system }
} }
@ -121,27 +126,21 @@ impl AsyncSystemRunner {
let AsyncSystemRunner { stop, .. } = self; let AsyncSystemRunner { stop, .. } = self;
// run loop // run loop
lazy(|_| { async {
Arbiter::run_system(None); match stop.await {
async { Ok(code) => {
let res = match stop.await { if code != 0 {
Ok(code) => { Err(io::Error::new(
if code != 0 { io::ErrorKind::Other,
Err(io::Error::new( format!("Non-zero exit code: {}", code),
io::ErrorKind::Other, ))
format!("Non-zero exit code: {}", code), } else {
)) Ok(())
} else {
Ok(())
}
} }
Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)), }
}; Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)),
Arbiter::stop_system();
res
} }
}) }
.flatten()
} }
} }
@ -161,8 +160,7 @@ impl SystemRunner {
let SystemRunner { rt, stop, .. } = self; let SystemRunner { rt, stop, .. } = self;
// run loop // run loop
Arbiter::run_system(Some(&rt)); match rt.block_on(stop) {
let result = match rt.block_on(stop) {
Ok(code) => { Ok(code) => {
if code != 0 { if code != 0 {
Err(io::Error::new( Err(io::Error::new(
@ -174,19 +172,12 @@ impl SystemRunner {
} }
} }
Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)), Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)),
}; }
Arbiter::stop_system();
result
} }
/// Execute a future and wait for result. /// Execute a future and wait for result.
pub fn block_on<F, O>(&self, fut: F) -> O #[inline]
where pub fn block_on<F: Future>(&self, fut: F) -> F::Output {
F: Future<Output = O>, self.rt.block_on(fut)
{
Arbiter::run_system(Some(&self.rt));
let res = self.rt.block_on(fut);
Arbiter::stop_system();
res
} }
} }

View File

@ -4,6 +4,8 @@
#![doc(html_logo_url = "https://actix.rs/img/logo.png")] #![doc(html_logo_url = "https://actix.rs/img/logo.png")]
#![doc(html_favicon_url = "https://actix.rs/favicon.ico")] #![doc(html_favicon_url = "https://actix.rs/favicon.ico")]
use std::future::Future;
#[cfg(not(test))] // Work around for rust-lang/rust#62127 #[cfg(not(test))] // Work around for rust-lang/rust#62127
pub use actix_macros::{main, test}; pub use actix_macros::{main, test};
@ -22,15 +24,12 @@ pub use self::system::System;
/// # Panics /// # Panics
/// ///
/// This function panics if actix system is not running. /// This function panics if actix system is not running.
#[inline]
pub fn spawn<F>(f: F) pub fn spawn<F>(f: F)
where where
F: std::future::Future<Output = ()> + 'static, F: Future<Output = ()> + 'static,
{ {
if !System::is_set() { Arbiter::spawn(f)
panic!("System is not running");
}
Arbiter::spawn(f);
} }
/// Asynchronous signal handling /// Asynchronous signal handling

View File

@ -29,6 +29,10 @@ impl Runtime {
}) })
} }
pub(super) fn local(&self) -> &LocalSet {
&self.local
}
/// Spawn a future onto the single-threaded runtime. /// Spawn a future onto the single-threaded runtime.
/// ///
/// See [module level][mod] documentation for more details. /// See [module level][mod] documentation for more details.
@ -43,7 +47,7 @@ impl Runtime {
/// ///
/// # fn dox() { /// # fn dox() {
/// // Create the runtime /// // Create the runtime
/// let mut rt = Runtime::new().unwrap(); /// let rt = Runtime::new().unwrap();
/// ///
/// // Spawn a future onto the runtime /// // Spawn a future onto the runtime
/// rt.spawn(future::lazy(|_| { /// rt.spawn(future::lazy(|_| {

View File

@ -3,7 +3,7 @@ use std::future::Future;
use std::io; use std::io;
use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::atomic::{AtomicUsize, Ordering};
use futures_channel::mpsc::UnboundedSender; use tokio::sync::mpsc::UnboundedSender;
use tokio::task::LocalSet; use tokio::task::LocalSet;
use crate::arbiter::{Arbiter, SystemCommand}; use crate::arbiter::{Arbiter, SystemCommand};
@ -70,7 +70,7 @@ impl System {
/// ///
/// # Examples /// # Examples
/// ///
/// ``` /// ```rust,ignore
/// use tokio::{runtime::Runtime, task::LocalSet}; /// use tokio::{runtime::Runtime, task::LocalSet};
/// use actix_rt::System; /// use actix_rt::System;
/// use futures_util::future::try_join_all; /// use futures_util::future::try_join_all;
@ -139,7 +139,7 @@ impl System {
/// ///
/// # Examples /// # Examples
/// ///
/// ``` /// ```rust,ignore
/// use tokio::runtime::Runtime; /// use tokio::runtime::Runtime;
/// use actix_rt::System; /// use actix_rt::System;
/// use futures_util::future::try_join_all; /// use futures_util::future::try_join_all;
@ -231,7 +231,7 @@ impl System {
/// Stop the system with a particular exit code. /// Stop the system with a particular exit code.
pub fn stop_with_code(&self, code: i32) { pub fn stop_with_code(&self, code: i32) {
let _ = self.sys.unbounded_send(SystemCommand::Exit(code)); let _ = self.sys.send(SystemCommand::Exit(code));
} }
pub(crate) fn sys(&self) -> &UnboundedSender<SystemCommand> { pub(crate) fn sys(&self) -> &UnboundedSender<SystemCommand> {

View File

@ -1,19 +1,5 @@
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
#[test]
fn start_and_stop() {
actix_rt::System::new("start_and_stop").block_on(async move {
assert!(
actix_rt::Arbiter::is_running(),
"System doesn't seem to have started"
);
});
assert!(
!actix_rt::Arbiter::is_running(),
"System doesn't seem to have stopped"
);
}
#[test] #[test]
fn await_for_timer() { fn await_for_timer() {
let time = Duration::from_secs(2); let time = Duration::from_secs(2);
@ -76,42 +62,42 @@ fn join_another_arbiter() {
); );
} }
#[test] // #[test]
fn join_current_arbiter() { // fn join_current_arbiter() {
let time = Duration::from_secs(2); // let time = Duration::from_secs(2);
//
let instant = Instant::now(); // let instant = Instant::now();
actix_rt::System::new("test_join_current_arbiter").block_on(async move { // actix_rt::System::new("test_join_current_arbiter").block_on(async move {
actix_rt::spawn(async move { // actix_rt::spawn(async move {
tokio::time::sleep(time).await; // tokio::time::delay_for(time).await;
actix_rt::Arbiter::current().stop(); // actix_rt::Arbiter::current().stop();
}); // });
actix_rt::Arbiter::local_join().await; // actix_rt::Arbiter::local_join().await;
}); // });
assert!( // assert!(
instant.elapsed() >= time, // instant.elapsed() >= time,
"Join on current arbiter should wait for all spawned futures" // "Join on current arbiter should wait for all spawned futures"
); // );
//
let large_timer = Duration::from_secs(20); // let large_timer = Duration::from_secs(20);
let instant = Instant::now(); // let instant = Instant::now();
actix_rt::System::new("test_join_current_arbiter").block_on(async move { // actix_rt::System::new("test_join_current_arbiter").block_on(async move {
actix_rt::spawn(async move { // actix_rt::spawn(async move {
tokio::time::sleep(time).await; // tokio::time::delay_for(time).await;
actix_rt::Arbiter::current().stop(); // actix_rt::Arbiter::current().stop();
}); // });
let f = actix_rt::Arbiter::local_join(); // let f = actix_rt::Arbiter::local_join();
actix_rt::spawn(async move { // actix_rt::spawn(async move {
tokio::time::sleep(large_timer).await; // tokio::time::delay_for(large_timer).await;
actix_rt::Arbiter::current().stop(); // actix_rt::Arbiter::current().stop();
}); // });
f.await; // f.await;
}); // });
assert!( // assert!(
instant.elapsed() < large_timer, // instant.elapsed() < large_timer,
"local_join should await only for the already spawned futures" // "local_join should await only for the already spawned futures"
); // );
} // }
#[test] #[test]
fn non_static_block_on() { fn non_static_block_on() {

View File

@ -23,7 +23,7 @@ default = []
actix-service = "1.0.6" actix-service = "1.0.6"
actix-rt = "1.1.1" actix-rt = "1.1.1"
actix-codec = "0.3.0" actix-codec = "0.3.0"
actix-utils = "2.0.0" actix-utils = "3.0.0"
concurrent-queue = "1.2.2" concurrent-queue = "1.2.2"
futures-channel = { version = "0.3.7", default-features = false } futures-channel = { version = "0.3.7", default-features = false }

View File

@ -22,11 +22,14 @@ fn test_bind() {
let h = thread::spawn(move || { let h = thread::spawn(move || {
let sys = actix_rt::System::new("test"); let sys = actix_rt::System::new("test");
let srv = Server::build() let srv = sys.block_on(lazy(|_| {
.disable_signals() Server::build()
.bind("test", addr, move || fn_service(|_| ok::<_, ()>(()))) .workers(1)
.unwrap() .disable_signals()
.start(); .bind("test", addr, move || fn_service(|_| ok::<_, ()>(())))
.unwrap()
.start()
}));
let _ = tx.send((srv, actix_rt::System::current())); let _ = tx.send((srv, actix_rt::System::current()));
let _ = sys.run(); let _ = sys.run();
}); });
@ -46,12 +49,14 @@ fn test_listen() {
let h = thread::spawn(move || { let h = thread::spawn(move || {
let sys = actix_rt::System::new("test"); let sys = actix_rt::System::new("test");
let lst = net::TcpListener::bind(addr).unwrap(); let lst = net::TcpListener::bind(addr).unwrap();
Server::build() sys.block_on(lazy(|_| {
.disable_signals() Server::build()
.workers(1) .disable_signals()
.listen("test", lst, move || fn_service(|_| ok::<_, ()>(()))) .workers(1)
.unwrap() .listen("test", lst, move || fn_service(|_| ok::<_, ()>(())))
.start(); .unwrap()
.start()
}));
let _ = tx.send(actix_rt::System::current()); let _ = tx.send(actix_rt::System::current());
let _ = sys.run(); let _ = sys.run();
}); });
@ -77,18 +82,20 @@ fn test_start() {
let h = thread::spawn(move || { let h = thread::spawn(move || {
let sys = actix_rt::System::new("test"); let sys = actix_rt::System::new("test");
let srv: Server = Server::build() let srv = sys.block_on(lazy(|_| {
.backlog(100) Server::build()
.disable_signals() .backlog(100)
.bind("test", addr, move || { .disable_signals()
fn_service(|io: TcpStream| async move { .bind("test", addr, move || {
let mut f = Framed::new(io, BytesCodec); fn_service(|io: TcpStream| async move {
f.send(Bytes::from_static(b"test")).await.unwrap(); let mut f = Framed::new(io, BytesCodec);
Ok::<_, ()>(()) f.send(Bytes::from_static(b"test")).await.unwrap();
Ok::<_, ()>(())
})
}) })
}) .unwrap()
.unwrap() .start()
.start(); }));
let _ = tx.send((srv, actix_rt::System::current())); let _ = tx.send((srv, actix_rt::System::current()));
let _ = sys.run(); let _ = sys.run();
@ -144,28 +151,30 @@ fn test_configure() {
let h = thread::spawn(move || { let h = thread::spawn(move || {
let num = num2.clone(); let num = num2.clone();
let sys = actix_rt::System::new("test"); let sys = actix_rt::System::new("test");
let srv = Server::build() let srv = sys.block_on(lazy(|_| {
.disable_signals() Server::build()
.configure(move |cfg| { .disable_signals()
let num = num.clone(); .configure(move |cfg| {
let lst = net::TcpListener::bind(addr3).unwrap(); let num = num.clone();
cfg.bind("addr1", addr1) let lst = net::TcpListener::bind(addr3).unwrap();
.unwrap() cfg.bind("addr1", addr1)
.bind("addr2", addr2) .unwrap()
.unwrap() .bind("addr2", addr2)
.listen("addr3", lst) .unwrap()
.apply(move |rt| { .listen("addr3", lst)
let num = num.clone(); .apply(move |rt| {
rt.service("addr1", fn_service(|_| ok::<_, ()>(()))); let num = num.clone();
rt.service("addr3", fn_service(|_| ok::<_, ()>(()))); rt.service("addr1", fn_service(|_| ok::<_, ()>(())));
rt.on_start(lazy(move |_| { rt.service("addr3", fn_service(|_| ok::<_, ()>(())));
let _ = num.fetch_add(1, Relaxed); rt.on_start(lazy(move |_| {
})) let _ = num.fetch_add(1, Relaxed);
}) }))
}) })
.unwrap() })
.workers(1) .unwrap()
.start(); .workers(1)
.start()
}));
let _ = tx.send((srv, actix_rt::System::current())); let _ = tx.send((srv, actix_rt::System::current()));
let _ = sys.run(); let _ = sys.run();
}); });

View File

@ -87,11 +87,14 @@ impl TestServer {
let tcp = net::TcpListener::bind("127.0.0.1:0").unwrap(); let tcp = net::TcpListener::bind("127.0.0.1:0").unwrap();
let local_addr = tcp.local_addr().unwrap(); let local_addr = tcp.local_addr().unwrap();
Server::build() sys.block_on(async {
.listen("test", tcp, factory)? Server::build()
.workers(1) .listen("test", tcp, factory)
.disable_signals() .unwrap()
.start(); .workers(1)
.disable_signals()
.start();
});
tx.send((System::current(), local_addr)).unwrap(); tx.send((System::current(), local_addr)).unwrap();
sys.run() sys.run()

View File

@ -37,7 +37,7 @@ nativetls = ["native-tls", "tokio-native-tls"]
[dependencies] [dependencies]
actix-service = "1.0.0" actix-service = "1.0.0"
actix-codec = "0.3.0" actix-codec = "0.3.0"
actix-utils = "2.0.0" actix-utils = "3.0.0"
futures-util = { version = "0.3.7", default-features = false } futures-util = { version = "0.3.7", default-features = false }

View File

@ -1,8 +1,15 @@
# Changes # Changes
## Unreleased - 2020-xx-xx ## Unreleased - 2020-xx-xx
<<<<<<< HEAD
* Upgrade `pin-project` to `1.0`. * Upgrade `pin-project` to `1.0`.
* Update `bytes` dependency to `1`. * Update `bytes` dependency to `1`.
=======
* Use `pin-project-lite` to replace `pin-project`. [#229]
* Remove `condition`,`either`,`inflight`,`keepalive`,`oneshot`,`order`,`stream` and `time` mods. [#229]
[#229]: https://github.com/actix/actix-net/pull/229
>>>>>>> upstream/master
## 2.0.0 - 2020-08-23 ## 2.0.0 - 2020-08-23
* No changes from beta 1. * No changes from beta 1.

View File

@ -1,6 +1,6 @@
[package] [package]
name = "actix-utils" name = "actix-utils"
version = "2.0.0" version = "3.0.0"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"] authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "Various network related services and utilities for the Actix ecosystem." description = "Various network related services and utilities for the Actix ecosystem."
keywords = ["network", "framework", "async", "futures"] keywords = ["network", "framework", "async", "futures"]
@ -19,12 +19,11 @@ path = "src/lib.rs"
actix-codec = "0.3.0" actix-codec = "0.3.0"
actix-rt = "1.1.1" actix-rt = "1.1.1"
actix-service = "1.0.6" actix-service = "1.0.6"
bitflags = "1.2.1"
bytes = "1" futures-core = { version = "0.3.7", default-features = false }
either = "1.5.3"
futures-channel = { version = "0.3.7", default-features = false }
futures-sink = { version = "0.3.7", default-features = false } futures-sink = { version = "0.3.7", default-features = false }
futures-util = { version = "0.3.7", default-features = false }
log = "0.4" log = "0.4"
pin-project = "1.0.0" pin-project-lite = "0.2.0"
slab = "0.4"
[dev-dependencies]
futures-util = { version = "0.3.7", default-features = false }

View File

@ -1,129 +0,0 @@
use std::cell::RefCell;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::task::{Context, Poll};
use slab::Slab;
use crate::task::LocalWaker;
/// Condition allows to notify multiple receivers at the same time
pub struct Condition(Rc<RefCell<Inner>>);
struct Inner {
data: Slab<Option<LocalWaker>>,
}
impl Default for Condition {
fn default() -> Self {
Self::new()
}
}
impl Condition {
pub fn new() -> Condition {
Condition(Rc::new(RefCell::new(Inner { data: Slab::new() })))
}
/// Get condition waiter
pub fn wait(&mut self) -> Waiter {
let token = self.0.borrow_mut().data.insert(None);
Waiter {
token,
inner: self.0.clone(),
}
}
/// Notify all waiters
pub fn notify(&self) {
let inner = self.0.borrow();
for item in inner.data.iter() {
if let Some(waker) = item.1 {
waker.wake();
}
}
}
}
impl Drop for Condition {
fn drop(&mut self) {
self.notify()
}
}
#[must_use = "Waiter do nothing unless polled"]
pub struct Waiter {
token: usize,
inner: Rc<RefCell<Inner>>,
}
impl Clone for Waiter {
fn clone(&self) -> Self {
let token = self.inner.borrow_mut().data.insert(None);
Waiter {
token,
inner: self.inner.clone(),
}
}
}
impl Future for Waiter {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
let mut inner = this.inner.borrow_mut();
let inner = unsafe { inner.data.get_unchecked_mut(this.token) };
if inner.is_none() {
let waker = LocalWaker::default();
waker.register(cx.waker());
*inner = Some(waker);
Poll::Pending
} else if inner.as_mut().unwrap().register(cx.waker()) {
Poll::Pending
} else {
Poll::Ready(())
}
}
}
impl Drop for Waiter {
fn drop(&mut self) {
self.inner.borrow_mut().data.remove(self.token);
}
}
#[cfg(test)]
mod tests {
use super::*;
use futures_util::future::lazy;
#[actix_rt::test]
async fn test_condition() {
let mut cond = Condition::new();
let mut waiter = cond.wait();
assert_eq!(
lazy(|cx| Pin::new(&mut waiter).poll(cx)).await,
Poll::Pending
);
cond.notify();
waiter.await;
let mut waiter = cond.wait();
assert_eq!(
lazy(|cx| Pin::new(&mut waiter).poll(cx)).await,
Poll::Pending
);
let mut waiter2 = waiter.clone();
assert_eq!(
lazy(|cx| Pin::new(&mut waiter2).poll(cx)).await,
Poll::Pending
);
drop(cond);
waiter.await;
waiter2.await;
}
}

View File

@ -1,6 +1,7 @@
use std::cell::Cell; use core::cell::Cell;
use core::task;
use std::rc::Rc; use std::rc::Rc;
use std::task;
use crate::task::LocalWaker; use crate::task::LocalWaker;

View File

@ -2,13 +2,14 @@
#![allow(type_alias_bounds)] #![allow(type_alias_bounds)]
use std::pin::Pin; use core::future::Future;
use std::task::{Context, Poll}; use core::pin::Pin;
use std::{fmt, mem}; use core::task::{Context, Poll};
use core::{fmt, mem};
use actix_codec::{AsyncRead, AsyncWrite, Decoder, Encoder, Framed}; use actix_codec::{AsyncRead, AsyncWrite, Decoder, Encoder, Framed};
use actix_service::{IntoService, Service}; use actix_service::{IntoService, Service};
use futures_util::{future::Future, stream::Stream, FutureExt}; use futures_core::stream::Stream;
use log::debug; use log::debug;
use crate::mpsc; use crate::mpsc;
@ -61,25 +62,28 @@ pub enum Message<T> {
Close, Close,
} }
/// Dispatcher is a future that reads frames from Framed object pin_project_lite::pin_project! {
/// and passes them to the service. /// Dispatcher is a future that reads frames from Framed object
#[pin_project::pin_project] /// and passes them to the service.
pub struct Dispatcher<S, T, U, I> pub struct Dispatcher<S, T, U, I>
where where
S: Service<Request = <U as Decoder>::Item, Response = I>, S: Service<Request = <U as Decoder>::Item, Response = I>,
S::Error: 'static, S::Error: 'static,
S::Future: 'static, S::Future: 'static,
T: AsyncRead + AsyncWrite, T: AsyncRead,
U: Encoder<I> + Decoder, T: AsyncWrite,
I: 'static, U: Encoder<I>,
<U as Encoder<I>>::Error: std::fmt::Debug, U: Decoder,
{ I: 'static,
service: S, <U as Encoder<I>>::Error: fmt::Debug,
state: State<S, U, I>, {
#[pin] service: S,
framed: Framed<T, U>, state: State<S, U, I>,
rx: mpsc::Receiver<Result<Message<I>, S::Error>>, #[pin]
tx: mpsc::Sender<Result<Message<I>, S::Error>>, framed: Framed<T, U>,
rx: mpsc::Receiver<Result<Message<I>, S::Error>>,
tx: mpsc::Sender<Result<Message<I>, S::Error>>,
}
} }
enum State<S: Service, U: Encoder<I> + Decoder, I> { enum State<S: Service, U: Encoder<I> + Decoder, I> {
@ -114,8 +118,8 @@ where
T: AsyncRead + AsyncWrite, T: AsyncRead + AsyncWrite,
U: Decoder + Encoder<I>, U: Decoder + Encoder<I>,
I: 'static, I: 'static,
<U as Decoder>::Error: std::fmt::Debug, <U as Decoder>::Error: fmt::Debug,
<U as Encoder<I>>::Error: std::fmt::Debug, <U as Encoder<I>>::Error: fmt::Debug,
{ {
pub fn new<F: IntoService<S>>(framed: Framed<T, U>, service: F) -> Self { pub fn new<F: IntoService<S>>(framed: Framed<T, U>, service: F) -> Self {
let (tx, rx) = mpsc::channel(); let (tx, rx) = mpsc::channel();
@ -178,7 +182,7 @@ where
T: AsyncRead + AsyncWrite, T: AsyncRead + AsyncWrite,
U: Decoder + Encoder<I>, U: Decoder + Encoder<I>,
I: 'static, I: 'static,
<U as Encoder<I>>::Error: std::fmt::Debug, <U as Encoder<I>>::Error: fmt::Debug,
{ {
loop { loop {
let this = self.as_mut().project(); let this = self.as_mut().project();
@ -198,9 +202,11 @@ where
}; };
let tx = this.tx.clone(); let tx = this.tx.clone();
actix_rt::spawn(this.service.call(item).map(move |item| { let fut = this.service.call(item);
actix_rt::spawn(async move {
let item = fut.await;
let _ = tx.send(item.map(Message::Item)); let _ = tx.send(item.map(Message::Item));
})); });
} }
Poll::Pending => return false, Poll::Pending => return false,
Poll::Ready(Err(err)) => { Poll::Ready(Err(err)) => {
@ -220,7 +226,7 @@ where
T: AsyncRead + AsyncWrite, T: AsyncRead + AsyncWrite,
U: Decoder + Encoder<I>, U: Decoder + Encoder<I>,
I: 'static, I: 'static,
<U as Encoder<I>>::Error: std::fmt::Debug, <U as Encoder<I>>::Error: fmt::Debug,
{ {
loop { loop {
let mut this = self.as_mut().project(); let mut this = self.as_mut().project();
@ -271,8 +277,8 @@ where
T: AsyncRead + AsyncWrite, T: AsyncRead + AsyncWrite,
U: Decoder + Encoder<I>, U: Decoder + Encoder<I>,
I: 'static, I: 'static,
<U as Encoder<I>>::Error: std::fmt::Debug, <U as Encoder<I>>::Error: fmt::Debug,
<U as Decoder>::Error: std::fmt::Debug, <U as Decoder>::Error: fmt::Debug,
{ {
type Output = Result<(), DispatcherError<S::Error, U, I>>; type Output = Result<(), DispatcherError<S::Error, U, I>>;

View File

@ -1,153 +0,0 @@
//! Contains `Either` service and related types and functions.
use std::pin::Pin;
use std::task::{Context, Poll};
use actix_service::{Service, ServiceFactory};
use futures_util::{future, future::Future, ready};
/// Combine two different service types into a single type.
///
/// Both services must be of the same request, response, and error types.
/// `EitherService` is useful for handling conditional branching in service
/// middleware to different inner service types.
pub struct EitherService<A, B> {
left: A,
right: B,
}
impl<A: Clone, B: Clone> Clone for EitherService<A, B> {
fn clone(&self) -> Self {
EitherService {
left: self.left.clone(),
right: self.right.clone(),
}
}
}
impl<A, B> Service for EitherService<A, B>
where
A: Service,
B: Service<Response = A::Response, Error = A::Error>,
{
type Request = either::Either<A::Request, B::Request>;
type Response = A::Response;
type Error = A::Error;
type Future = future::Either<A::Future, B::Future>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let left = self.left.poll_ready(cx)?;
let right = self.right.poll_ready(cx)?;
if left.is_ready() && right.is_ready() {
Poll::Ready(Ok(()))
} else {
Poll::Pending
}
}
fn call(&mut self, req: either::Either<A::Request, B::Request>) -> Self::Future {
match req {
either::Either::Left(req) => future::Either::Left(self.left.call(req)),
either::Either::Right(req) => future::Either::Right(self.right.call(req)),
}
}
}
/// Combine two different new service types into a single service.
pub struct Either<A, B> {
left: A,
right: B,
}
impl<A, B> Either<A, B> {
pub fn new(left: A, right: B) -> Either<A, B>
where
A: ServiceFactory,
A::Config: Clone,
B: ServiceFactory<
Config = A::Config,
Response = A::Response,
Error = A::Error,
InitError = A::InitError,
>,
{
Either { left, right }
}
}
impl<A, B> ServiceFactory for Either<A, B>
where
A: ServiceFactory,
A::Config: Clone,
B: ServiceFactory<
Config = A::Config,
Response = A::Response,
Error = A::Error,
InitError = A::InitError,
>,
{
type Request = either::Either<A::Request, B::Request>;
type Response = A::Response;
type Error = A::Error;
type InitError = A::InitError;
type Config = A::Config;
type Service = EitherService<A::Service, B::Service>;
type Future = EitherNewService<A, B>;
fn new_service(&self, cfg: A::Config) -> Self::Future {
EitherNewService {
left: None,
right: None,
left_fut: self.left.new_service(cfg.clone()),
right_fut: self.right.new_service(cfg),
}
}
}
impl<A: Clone, B: Clone> Clone for Either<A, B> {
fn clone(&self) -> Self {
Self {
left: self.left.clone(),
right: self.right.clone(),
}
}
}
#[doc(hidden)]
#[pin_project::pin_project]
pub struct EitherNewService<A: ServiceFactory, B: ServiceFactory> {
left: Option<A::Service>,
right: Option<B::Service>,
#[pin]
left_fut: A::Future,
#[pin]
right_fut: B::Future,
}
impl<A, B> Future for EitherNewService<A, B>
where
A: ServiceFactory,
B: ServiceFactory<Response = A::Response, Error = A::Error, InitError = A::InitError>,
{
type Output = Result<EitherService<A::Service, B::Service>, A::InitError>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
if this.left.is_none() {
*this.left = Some(ready!(this.left_fut.poll(cx))?);
}
if this.right.is_none() {
*this.right = Some(ready!(this.right_fut.poll(cx))?);
}
if this.left.is_some() && this.right.is_some() {
Poll::Ready(Ok(EitherService {
left: this.left.take().unwrap(),
right: this.right.take().unwrap(),
}))
} else {
Poll::Pending
}
}
}

View File

@ -1,169 +0,0 @@
use std::convert::Infallible;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use actix_service::{IntoService, Service, Transform};
use futures_util::future::{ok, Ready};
use super::counter::{Counter, CounterGuard};
/// InFlight - new service for service that can limit number of in-flight
/// async requests.
///
/// Default number of in-flight requests is 15
pub struct InFlight {
max_inflight: usize,
}
impl InFlight {
pub fn new(max: usize) -> Self {
Self { max_inflight: max }
}
}
impl Default for InFlight {
fn default() -> Self {
Self::new(15)
}
}
impl<S> Transform<S> for InFlight
where
S: Service,
{
type Request = S::Request;
type Response = S::Response;
type Error = S::Error;
type InitError = Infallible;
type Transform = InFlightService<S>;
type Future = Ready<Result<Self::Transform, Self::InitError>>;
fn new_transform(&self, service: S) -> Self::Future {
ok(InFlightService::new(self.max_inflight, service))
}
}
pub struct InFlightService<S> {
count: Counter,
service: S,
}
impl<S> InFlightService<S>
where
S: Service,
{
pub fn new<U>(max: usize, service: U) -> Self
where
U: IntoService<S>,
{
Self {
count: Counter::new(max),
service: service.into_service(),
}
}
}
impl<T> Service for InFlightService<T>
where
T: Service,
{
type Request = T::Request;
type Response = T::Response;
type Error = T::Error;
type Future = InFlightServiceResponse<T>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
if self.service.poll_ready(cx)?.is_pending() {
Poll::Pending
} else if !self.count.available(cx) {
log::trace!("InFlight limit exceeded");
Poll::Pending
} else {
Poll::Ready(Ok(()))
}
}
fn call(&mut self, req: T::Request) -> Self::Future {
InFlightServiceResponse {
fut: self.service.call(req),
_guard: self.count.get(),
}
}
}
#[doc(hidden)]
#[pin_project::pin_project]
pub struct InFlightServiceResponse<T: Service> {
#[pin]
fut: T::Future,
_guard: CounterGuard,
}
impl<T: Service> Future for InFlightServiceResponse<T> {
type Output = Result<T::Response, T::Error>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.project().fut.poll(cx)
}
}
#[cfg(test)]
mod tests {
use std::task::{Context, Poll};
use std::time::Duration;
use super::*;
use actix_service::{apply, fn_factory, Service, ServiceFactory};
use futures_util::future::{lazy, ok, FutureExt, LocalBoxFuture};
struct SleepService(Duration);
impl Service for SleepService {
type Request = ();
type Response = ();
type Error = ();
type Future = LocalBoxFuture<'static, Result<(), ()>>;
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, _: ()) -> Self::Future {
actix_rt::time::sleep(self.0)
.then(|_| ok::<_, ()>(()))
.boxed_local()
}
}
#[actix_rt::test]
async fn test_transform() {
let wait_time = Duration::from_millis(50);
let mut srv = InFlightService::new(1, SleepService(wait_time));
assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(())));
let res = srv.call(());
assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Pending);
let _ = res.await;
assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(())));
}
#[actix_rt::test]
async fn test_new_transform() {
let wait_time = Duration::from_millis(50);
let srv = apply(InFlight::new(1), fn_factory(|| ok(SleepService(wait_time))));
let mut srv = srv.new_service(&()).await.unwrap();
assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(())));
let res = srv.call(());
assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Pending);
let _ = res.await;
assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(())));
}
}

View File

@ -1,125 +0,0 @@
use std::convert::Infallible;
use std::future::Future;
use std::marker::PhantomData;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;
use actix_rt::time::{sleep_until, Instant, Sleep};
use actix_service::{Service, ServiceFactory};
use futures_util::future::{ok, Ready};
use super::time::{LowResTime, LowResTimeService};
pub struct KeepAlive<R, E, F> {
f: F,
ka: Duration,
time: LowResTime,
_t: PhantomData<(R, E)>,
}
impl<R, E, F> KeepAlive<R, E, F>
where
F: Fn() -> E + Clone,
{
pub fn new(ka: Duration, time: LowResTime, f: F) -> Self {
KeepAlive {
f,
ka,
time,
_t: PhantomData,
}
}
}
impl<R, E, F> Clone for KeepAlive<R, E, F>
where
F: Clone,
{
fn clone(&self) -> Self {
KeepAlive {
f: self.f.clone(),
ka: self.ka,
time: self.time.clone(),
_t: PhantomData,
}
}
}
impl<R, E, F> ServiceFactory for KeepAlive<R, E, F>
where
F: Fn() -> E + Clone,
{
type Request = R;
type Response = R;
type Error = E;
type Config = ();
type Service = KeepAliveService<R, E, F>;
type InitError = Infallible;
type Future = Ready<Result<Self::Service, Self::InitError>>;
fn new_service(&self, _: ()) -> Self::Future {
ok(KeepAliveService::new(
self.ka,
self.time.timer(),
self.f.clone(),
))
}
}
pub struct KeepAliveService<R, E, F> {
f: F,
ka: Duration,
time: LowResTimeService,
delay: Pin<Box<Sleep>>,
expire: Instant,
_t: PhantomData<(R, E)>,
}
impl<R, E, F> KeepAliveService<R, E, F>
where
F: Fn() -> E,
{
pub fn new(ka: Duration, time: LowResTimeService, f: F) -> Self {
let expire = Instant::from_std(time.now() + ka);
KeepAliveService {
f,
ka,
time,
expire,
delay: Box::pin(sleep_until(expire)),
_t: PhantomData,
}
}
}
impl<R, E, F> Service for KeepAliveService<R, E, F>
where
F: Fn() -> E,
{
type Request = R;
type Response = R;
type Error = E;
type Future = Ready<Result<R, E>>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
match self.delay.as_mut().poll(cx) {
Poll::Ready(_) => {
let now = Instant::from_std(self.time.now());
if self.expire <= now {
Poll::Ready(Err((self.f)()))
} else {
self.delay.as_mut().reset(self.expire);
let _ = Pin::new(&mut self.delay).poll(cx);
Poll::Ready(Ok(()))
}
}
Poll::Pending => Poll::Ready(Ok(())),
}
}
fn call(&mut self, req: R) -> Self::Future {
self.expire = Instant::from_std(self.time.now() + self.ka);
ok(req)
}
}

View File

@ -5,16 +5,8 @@
#![doc(html_logo_url = "https://actix.rs/img/logo.png")] #![doc(html_logo_url = "https://actix.rs/img/logo.png")]
#![doc(html_favicon_url = "https://actix.rs/favicon.ico")] #![doc(html_favicon_url = "https://actix.rs/favicon.ico")]
pub mod condition;
pub mod counter; pub mod counter;
pub mod dispatcher; pub mod dispatcher;
pub mod either;
pub mod inflight;
pub mod keepalive;
pub mod mpsc; pub mod mpsc;
pub mod oneshot;
pub mod order;
pub mod stream;
pub mod task; pub mod task;
pub mod time;
pub mod timeout; pub mod timeout;

View File

@ -1,15 +1,16 @@
//! A multi-producer, single-consumer, futures-aware, FIFO queue. //! A multi-producer, single-consumer, futures-aware, FIFO queue.
use std::any::Any; use core::any::Any;
use std::cell::RefCell; use core::cell::RefCell;
use core::fmt;
use core::pin::Pin;
use core::task::{Context, Poll};
use std::collections::VecDeque; use std::collections::VecDeque;
use std::error::Error; use std::error::Error;
use std::fmt;
use std::pin::Pin;
use std::rc::Rc; use std::rc::Rc;
use std::task::{Context, Poll};
use futures_core::stream::Stream;
use futures_sink::Sink; use futures_sink::Sink;
use futures_util::stream::Stream;
use crate::task::LocalWaker; use crate::task::LocalWaker;

View File

@ -1,316 +0,0 @@
//! A one-shot, futures-aware channel.
use std::cell::RefCell;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::task::{Context, Poll};
pub use futures_channel::oneshot::Canceled;
use slab::Slab;
use crate::task::LocalWaker;
/// Creates a new futures-aware, one-shot channel.
pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
let inner = Rc::new(RefCell::new(Inner {
value: None,
rx_task: LocalWaker::new(),
}));
let tx = Sender {
inner: inner.clone(),
};
let rx = Receiver { inner };
(tx, rx)
}
/// Creates a new futures-aware, pool of one-shot's.
pub fn pool<T>() -> Pool<T> {
Pool(Rc::new(RefCell::new(Slab::new())))
}
/// Represents the completion half of a oneshot through which the result of a
/// computation is signaled.
#[derive(Debug)]
pub struct Sender<T> {
inner: Rc<RefCell<Inner<T>>>,
}
/// A future representing the completion of a computation happening elsewhere in
/// memory.
#[derive(Debug)]
#[must_use = "futures do nothing unless polled"]
pub struct Receiver<T> {
inner: Rc<RefCell<Inner<T>>>,
}
// The channels do not ever project Pin to the inner T
impl<T> Unpin for Receiver<T> {}
impl<T> Unpin for Sender<T> {}
#[derive(Debug)]
struct Inner<T> {
value: Option<T>,
rx_task: LocalWaker,
}
impl<T> Sender<T> {
/// Completes this oneshot with a successful result.
///
/// This function will consume `self` and indicate to the other end, the
/// `Receiver`, that the error provided is the result of the computation this
/// represents.
///
/// If the value is successfully enqueued for the remote end to receive,
/// then `Ok(())` is returned. If the receiving end was dropped before
/// this function was called, however, then `Err` is returned with the value
/// provided.
pub fn send(self, val: T) -> Result<(), T> {
if Rc::strong_count(&self.inner) == 2 {
let mut inner = self.inner.borrow_mut();
inner.value = Some(val);
inner.rx_task.wake();
Ok(())
} else {
Err(val)
}
}
/// Tests to see whether this `Sender`'s corresponding `Receiver`
/// has gone away.
pub fn is_canceled(&self) -> bool {
Rc::strong_count(&self.inner) == 1
}
}
impl<T> Drop for Sender<T> {
fn drop(&mut self) {
self.inner.borrow().rx_task.wake();
}
}
impl<T> Future for Receiver<T> {
type Output = Result<T, Canceled>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
// If we've got a value, then skip the logic below as we're done.
if let Some(val) = this.inner.borrow_mut().value.take() {
return Poll::Ready(Ok(val));
}
// Check if sender is dropped and return error if it is.
if Rc::strong_count(&this.inner) == 1 {
Poll::Ready(Err(Canceled))
} else {
this.inner.borrow().rx_task.register(cx.waker());
Poll::Pending
}
}
}
/// Futures-aware, pool of one-shot's.
pub struct Pool<T>(Rc<RefCell<Slab<PoolInner<T>>>>);
bitflags::bitflags! {
pub struct Flags: u8 {
const SENDER = 0b0000_0001;
const RECEIVER = 0b0000_0010;
}
}
#[derive(Debug)]
struct PoolInner<T> {
flags: Flags,
value: Option<T>,
waker: LocalWaker,
}
impl<T> Pool<T> {
pub fn channel(&mut self) -> (PSender<T>, PReceiver<T>) {
let token = self.0.borrow_mut().insert(PoolInner {
flags: Flags::all(),
value: None,
waker: LocalWaker::default(),
});
(
PSender {
token,
inner: self.0.clone(),
},
PReceiver {
token,
inner: self.0.clone(),
},
)
}
}
impl<T> Clone for Pool<T> {
fn clone(&self) -> Self {
Pool(self.0.clone())
}
}
/// Represents the completion half of a oneshot through which the result of a
/// computation is signaled.
#[derive(Debug)]
pub struct PSender<T> {
token: usize,
inner: Rc<RefCell<Slab<PoolInner<T>>>>,
}
/// A future representing the completion of a computation happening elsewhere in
/// memory.
#[derive(Debug)]
#[must_use = "futures do nothing unless polled"]
pub struct PReceiver<T> {
token: usize,
inner: Rc<RefCell<Slab<PoolInner<T>>>>,
}
// The one-shots do not ever project Pin to the inner T
impl<T> Unpin for PReceiver<T> {}
impl<T> Unpin for PSender<T> {}
impl<T> PSender<T> {
/// Completes this oneshot with a successful result.
///
/// This function will consume `self` and indicate to the other end, the
/// `Receiver`, that the error provided is the result of the computation this
/// represents.
///
/// If the value is successfully enqueued for the remote end to receive,
/// then `Ok(())` is returned. If the receiving end was dropped before
/// this function was called, however, then `Err` is returned with the value
/// provided.
pub fn send(self, val: T) -> Result<(), T> {
let mut inner = self.inner.borrow_mut();
let inner = unsafe { inner.get_unchecked_mut(self.token) };
if inner.flags.contains(Flags::RECEIVER) {
inner.value = Some(val);
inner.waker.wake();
Ok(())
} else {
Err(val)
}
}
/// Tests to see whether this `Sender`'s corresponding `Receiver`
/// has gone away.
pub fn is_canceled(&self) -> bool {
!unsafe { self.inner.borrow().get_unchecked(self.token) }
.flags
.contains(Flags::RECEIVER)
}
}
impl<T> Drop for PSender<T> {
fn drop(&mut self) {
let mut inner = self.inner.borrow_mut();
let inner_token = unsafe { inner.get_unchecked_mut(self.token) };
if inner_token.flags.contains(Flags::RECEIVER) {
inner_token.waker.wake();
inner_token.flags.remove(Flags::SENDER);
} else {
inner.remove(self.token);
}
}
}
impl<T> Drop for PReceiver<T> {
fn drop(&mut self) {
let mut inner = self.inner.borrow_mut();
let inner_token = unsafe { inner.get_unchecked_mut(self.token) };
if inner_token.flags.contains(Flags::SENDER) {
inner_token.flags.remove(Flags::RECEIVER);
} else {
inner.remove(self.token);
}
}
}
impl<T> Future for PReceiver<T> {
type Output = Result<T, Canceled>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
let mut inner = this.inner.borrow_mut();
let inner = unsafe { inner.get_unchecked_mut(this.token) };
// If we've got a value, then skip the logic below as we're done.
if let Some(val) = inner.value.take() {
return Poll::Ready(Ok(val));
}
// Check if sender is dropped and return error if it is.
if !inner.flags.contains(Flags::SENDER) {
Poll::Ready(Err(Canceled))
} else {
inner.waker.register(cx.waker());
Poll::Pending
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use futures_util::future::lazy;
#[actix_rt::test]
async fn test_oneshot() {
let (tx, rx) = channel();
tx.send("test").unwrap();
assert_eq!(rx.await.unwrap(), "test");
let (tx, rx) = channel();
assert!(!tx.is_canceled());
drop(rx);
assert!(tx.is_canceled());
assert!(tx.send("test").is_err());
let (tx, rx) = channel::<&'static str>();
drop(tx);
assert!(rx.await.is_err());
let (tx, mut rx) = channel::<&'static str>();
assert_eq!(lazy(|cx| Pin::new(&mut rx).poll(cx)).await, Poll::Pending);
tx.send("test").unwrap();
assert_eq!(rx.await.unwrap(), "test");
let (tx, mut rx) = channel::<&'static str>();
assert_eq!(lazy(|cx| Pin::new(&mut rx).poll(cx)).await, Poll::Pending);
drop(tx);
assert!(rx.await.is_err());
}
#[actix_rt::test]
async fn test_pool() {
let (tx, rx) = pool().channel();
tx.send("test").unwrap();
assert_eq!(rx.await.unwrap(), "test");
let (tx, rx) = pool().channel();
assert!(!tx.is_canceled());
drop(rx);
assert!(tx.is_canceled());
assert!(tx.send("test").is_err());
let (tx, rx) = pool::<&'static str>().channel();
drop(tx);
assert!(rx.await.is_err());
let (tx, mut rx) = pool::<&'static str>().channel();
assert_eq!(lazy(|cx| Pin::new(&mut rx).poll(cx)).await, Poll::Pending);
tx.send("test").unwrap();
assert_eq!(rx.await.unwrap(), "test");
let (tx, mut rx) = pool::<&'static str>().channel();
assert_eq!(lazy(|cx| Pin::new(&mut rx).poll(cx)).await, Poll::Pending);
drop(tx);
assert!(rx.await.is_err());
}
}

View File

@ -1,283 +0,0 @@
use std::collections::VecDeque;
use std::convert::Infallible;
use std::fmt;
use std::future::Future;
use std::marker::PhantomData;
use std::pin::Pin;
use std::rc::Rc;
use std::task::{Context, Poll};
use actix_service::{IntoService, Service, Transform};
use futures_util::future::{ok, Ready};
use crate::oneshot;
use crate::task::LocalWaker;
struct Record<I, E> {
rx: oneshot::Receiver<Result<I, E>>,
tx: oneshot::Sender<Result<I, E>>,
}
/// Timeout error
pub enum InOrderError<E> {
/// Service error
Service(E),
/// Service call dropped
Disconnected,
}
impl<E> From<E> for InOrderError<E> {
fn from(err: E) -> Self {
InOrderError::Service(err)
}
}
impl<E: fmt::Debug> fmt::Debug for InOrderError<E> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
InOrderError::Service(e) => write!(f, "InOrderError::Service({:?})", e),
InOrderError::Disconnected => write!(f, "InOrderError::Disconnected"),
}
}
}
impl<E: fmt::Display> fmt::Display for InOrderError<E> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
InOrderError::Service(e) => e.fmt(f),
InOrderError::Disconnected => write!(f, "InOrder service disconnected"),
}
}
}
/// InOrder - The service will yield responses as they become available,
/// in the order that their originating requests were submitted to the service.
pub struct InOrder<S> {
_t: PhantomData<S>,
}
impl<S> InOrder<S>
where
S: Service,
S::Response: 'static,
S::Future: 'static,
S::Error: 'static,
{
pub fn new() -> Self {
Self { _t: PhantomData }
}
pub fn service(service: S) -> InOrderService<S> {
InOrderService::new(service)
}
}
impl<S> Default for InOrder<S>
where
S: Service,
S::Response: 'static,
S::Future: 'static,
S::Error: 'static,
{
fn default() -> Self {
Self::new()
}
}
impl<S> Transform<S> for InOrder<S>
where
S: Service,
S::Response: 'static,
S::Future: 'static,
S::Error: 'static,
{
type Request = S::Request;
type Response = S::Response;
type Error = InOrderError<S::Error>;
type Transform = InOrderService<S>;
type InitError = Infallible;
type Future = Ready<Result<Self::Transform, Self::InitError>>;
fn new_transform(&self, service: S) -> Self::Future {
ok(InOrderService::new(service))
}
}
pub struct InOrderService<S: Service> {
service: S,
waker: Rc<LocalWaker>,
acks: VecDeque<Record<S::Response, S::Error>>,
}
impl<S> InOrderService<S>
where
S: Service,
S::Response: 'static,
S::Future: 'static,
S::Error: 'static,
{
pub fn new<U>(service: U) -> Self
where
U: IntoService<S>,
{
Self {
service: service.into_service(),
acks: VecDeque::new(),
waker: Rc::new(LocalWaker::new()),
}
}
}
impl<S> Service for InOrderService<S>
where
S: Service,
S::Response: 'static,
S::Future: 'static,
S::Error: 'static,
{
type Request = S::Request;
type Response = S::Response;
type Error = InOrderError<S::Error>;
type Future = InOrderServiceResponse<S>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
// poll_ready could be called from different task
self.waker.register(cx.waker());
// check acks
while !self.acks.is_empty() {
let rec = self.acks.front_mut().unwrap();
match Pin::new(&mut rec.rx).poll(cx) {
Poll::Ready(Ok(res)) => {
let rec = self.acks.pop_front().unwrap();
let _ = rec.tx.send(res);
}
Poll::Pending => break,
Poll::Ready(Err(oneshot::Canceled)) => {
return Poll::Ready(Err(InOrderError::Disconnected))
}
}
}
// check nested service
if self
.service
.poll_ready(cx)
.map_err(InOrderError::Service)?
.is_pending()
{
Poll::Pending
} else {
Poll::Ready(Ok(()))
}
}
fn call(&mut self, request: S::Request) -> Self::Future {
let (tx1, rx1) = oneshot::channel();
let (tx2, rx2) = oneshot::channel();
self.acks.push_back(Record { rx: rx1, tx: tx2 });
let waker = self.waker.clone();
let fut = self.service.call(request);
actix_rt::spawn(async move {
let res = fut.await;
waker.wake();
let _ = tx1.send(res);
});
InOrderServiceResponse { rx: rx2 }
}
}
#[doc(hidden)]
pub struct InOrderServiceResponse<S: Service> {
rx: oneshot::Receiver<Result<S::Response, S::Error>>,
}
impl<S: Service> Future for InOrderServiceResponse<S> {
type Output = Result<S::Response, InOrderError<S::Error>>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match Pin::new(&mut self.rx).poll(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(Ok(Ok(res))) => Poll::Ready(Ok(res)),
Poll::Ready(Ok(Err(e))) => Poll::Ready(Err(e.into())),
Poll::Ready(Err(_)) => Poll::Ready(Err(InOrderError::Disconnected)),
}
}
}
#[cfg(test)]
mod tests {
use std::task::{Context, Poll};
use std::time::Duration;
use super::*;
use actix_service::Service;
use futures_channel::oneshot;
use futures_util::future::{lazy, poll_fn, FutureExt, LocalBoxFuture};
struct Srv;
impl Service for Srv {
type Request = oneshot::Receiver<usize>;
type Response = usize;
type Error = ();
type Future = LocalBoxFuture<'static, Result<usize, ()>>;
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, req: oneshot::Receiver<usize>) -> Self::Future {
req.map(|res| res.map_err(|_| ())).boxed_local()
}
}
#[actix_rt::test]
async fn test_in_order() {
let (tx1, rx1) = oneshot::channel();
let (tx2, rx2) = oneshot::channel();
let (tx3, rx3) = oneshot::channel();
let (tx_stop, rx_stop) = oneshot::channel();
let h = std::thread::spawn(move || {
let rx1 = rx1;
let rx2 = rx2;
let rx3 = rx3;
let tx_stop = tx_stop;
actix_rt::System::new("test").block_on(async {
let mut srv = InOrderService::new(Srv);
let _ = lazy(|cx| srv.poll_ready(cx)).await;
let res1 = srv.call(rx1);
let res2 = srv.call(rx2);
let res3 = srv.call(rx3);
actix_rt::spawn(async move {
poll_fn(|cx| {
let _ = srv.poll_ready(cx);
Poll::<()>::Pending
})
.await;
});
assert_eq!(res1.await.unwrap(), 1);
assert_eq!(res2.await.unwrap(), 2);
assert_eq!(res3.await.unwrap(), 3);
let _ = tx_stop.send(());
actix_rt::System::current().stop();
});
});
let _ = tx3.send(3);
std::thread::sleep(Duration::from_millis(50));
let _ = tx2.send(2);
let _ = tx1.send(1);
let _ = rx_stop.await;
let _ = h.join();
}
}

View File

@ -1,76 +0,0 @@
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use actix_service::{IntoService, Service};
use futures_util::{stream::Stream, FutureExt};
use crate::mpsc;
#[pin_project::pin_project]
pub struct Dispatcher<S, T>
where
S: Stream,
T: Service<Request = S::Item, Response = ()> + 'static,
{
#[pin]
stream: S,
service: T,
err_rx: mpsc::Receiver<T::Error>,
err_tx: mpsc::Sender<T::Error>,
}
impl<S, T> Dispatcher<S, T>
where
S: Stream,
T: Service<Request = S::Item, Response = ()> + 'static,
{
pub fn new<F>(stream: S, service: F) -> Self
where
F: IntoService<T>,
{
let (err_tx, err_rx) = mpsc::channel();
Dispatcher {
err_rx,
err_tx,
stream,
service: service.into_service(),
}
}
}
impl<S, T> Future for Dispatcher<S, T>
where
S: Stream,
T: Service<Request = S::Item, Response = ()> + 'static,
{
type Output = Result<(), T::Error>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.as_mut().project();
if let Poll::Ready(Some(e)) = Pin::new(&mut this.err_rx).poll_next(cx) {
return Poll::Ready(Err(e));
}
loop {
return match this.service.poll_ready(cx)? {
Poll::Ready(_) => match this.stream.poll_next(cx) {
Poll::Ready(Some(item)) => {
let stop = this.err_tx.clone();
actix_rt::spawn(this.service.call(item).map(move |res| {
if let Err(e) = res {
let _ = stop.send(e);
}
}));
this = self.as_mut().project();
continue;
}
Poll::Pending => Poll::Pending,
Poll::Ready(None) => Poll::Ready(Ok(())),
},
Poll::Pending => Poll::Pending,
};
}
}
}

View File

@ -1,7 +1,7 @@
use std::cell::UnsafeCell; use core::cell::UnsafeCell;
use std::marker::PhantomData; use core::fmt;
use std::task::Waker; use core::marker::PhantomData;
use std::{fmt, rc}; use core::task::Waker;
/// A synchronization primitive for task wakeup. /// A synchronization primitive for task wakeup.
/// ///
@ -23,7 +23,8 @@ use std::{fmt, rc};
#[derive(Default)] #[derive(Default)]
pub struct LocalWaker { pub struct LocalWaker {
pub(crate) waker: UnsafeCell<Option<Waker>>, pub(crate) waker: UnsafeCell<Option<Waker>>,
_t: PhantomData<rc::Rc<()>>, // mark LocalWaker as a !Send type.
_t: PhantomData<*const ()>,
} }
impl LocalWaker { impl LocalWaker {

View File

@ -1,225 +0,0 @@
use std::cell::RefCell;
use std::convert::Infallible;
use std::rc::Rc;
use std::task::{Context, Poll};
use std::time::{self, Duration, Instant};
use actix_rt::time::sleep;
use actix_service::{Service, ServiceFactory};
use futures_util::future::{ok, ready, FutureExt, Ready};
#[derive(Clone, Debug)]
pub struct LowResTime(Rc<RefCell<Inner>>);
#[derive(Debug)]
struct Inner {
resolution: Duration,
current: Option<Instant>,
}
impl Inner {
fn new(resolution: Duration) -> Self {
Inner {
resolution,
current: None,
}
}
}
impl LowResTime {
pub fn with(resolution: Duration) -> LowResTime {
LowResTime(Rc::new(RefCell::new(Inner::new(resolution))))
}
pub fn timer(&self) -> LowResTimeService {
LowResTimeService(self.0.clone())
}
}
impl Default for LowResTime {
fn default() -> Self {
LowResTime(Rc::new(RefCell::new(Inner::new(Duration::from_secs(1)))))
}
}
impl ServiceFactory for LowResTime {
type Request = ();
type Response = Instant;
type Error = Infallible;
type InitError = Infallible;
type Config = ();
type Service = LowResTimeService;
type Future = Ready<Result<Self::Service, Self::InitError>>;
fn new_service(&self, _: ()) -> Self::Future {
ok(self.timer())
}
}
#[derive(Clone, Debug)]
pub struct LowResTimeService(Rc<RefCell<Inner>>);
impl LowResTimeService {
pub fn with(resolution: Duration) -> LowResTimeService {
LowResTimeService(Rc::new(RefCell::new(Inner::new(resolution))))
}
/// Get current time. This function has to be called from
/// future's poll method, otherwise it panics.
pub fn now(&self) -> Instant {
let cur = self.0.borrow().current;
if let Some(cur) = cur {
cur
} else {
let now = Instant::now();
let inner = self.0.clone();
let interval = {
let mut b = inner.borrow_mut();
b.current = Some(now);
b.resolution
};
actix_rt::spawn(sleep(interval).then(move |_| {
inner.borrow_mut().current.take();
ready(())
}));
now
}
}
}
impl Service for LowResTimeService {
type Request = ();
type Response = Instant;
type Error = Infallible;
type Future = Ready<Result<Self::Response, Self::Error>>;
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, _: ()) -> Self::Future {
ok(self.now())
}
}
#[derive(Clone, Debug)]
pub struct SystemTime(Rc<RefCell<SystemTimeInner>>);
#[derive(Debug)]
struct SystemTimeInner {
resolution: Duration,
current: Option<time::SystemTime>,
}
impl SystemTimeInner {
fn new(resolution: Duration) -> Self {
SystemTimeInner {
resolution,
current: None,
}
}
}
#[derive(Clone, Debug)]
pub struct SystemTimeService(Rc<RefCell<SystemTimeInner>>);
impl SystemTimeService {
pub fn with(resolution: Duration) -> SystemTimeService {
SystemTimeService(Rc::new(RefCell::new(SystemTimeInner::new(resolution))))
}
/// Get current time. This function has to be called from
/// future's poll method, otherwise it panics.
pub fn now(&self) -> time::SystemTime {
let cur = self.0.borrow().current;
if let Some(cur) = cur {
cur
} else {
let now = time::SystemTime::now();
let inner = self.0.clone();
let interval = {
let mut b = inner.borrow_mut();
b.current = Some(now);
b.resolution
};
actix_rt::spawn(sleep(interval).then(move |_| {
inner.borrow_mut().current.take();
ready(())
}));
now
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::time::{Duration, SystemTime};
/// State Under Test: Two calls of `SystemTimeService::now()` return the same value if they are done within resolution interval of `SystemTimeService`.
///
/// Expected Behavior: Two back-to-back calls of `SystemTimeService::now()` return the same value.
#[actix_rt::test]
async fn system_time_service_time_does_not_immediately_change() {
let resolution = Duration::from_millis(50);
let time_service = SystemTimeService::with(resolution);
assert_eq!(time_service.now(), time_service.now());
}
/// State Under Test: Two calls of `LowResTimeService::now()` return the same value if they are done within resolution interval of `SystemTimeService`.
///
/// Expected Behavior: Two back-to-back calls of `LowResTimeService::now()` return the same value.
#[actix_rt::test]
async fn low_res_time_service_time_does_not_immediately_change() {
let resolution = Duration::from_millis(50);
let time_service = LowResTimeService::with(resolution);
assert_eq!(time_service.now(), time_service.now());
}
/// State Under Test: `SystemTimeService::now()` updates returned value every resolution period.
///
/// Expected Behavior: Two calls of `LowResTimeService::now()` made in subsequent resolution interval return different values
/// and second value is greater than the first one at least by a resolution interval.
#[actix_rt::test]
async fn system_time_service_time_updates_after_resolution_interval() {
let resolution = Duration::from_millis(100);
let wait_time = Duration::from_millis(300);
let time_service = SystemTimeService::with(resolution);
let first_time = time_service
.now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap();
sleep(wait_time).await;
let second_time = time_service
.now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap();
assert!(second_time - first_time >= wait_time);
}
/// State Under Test: `LowResTimeService::now()` updates returned value every resolution period.
///
/// Expected Behavior: Two calls of `LowResTimeService::now()` made in subsequent resolution interval return different values
/// and second value is greater than the first one at least by a resolution interval.
#[actix_rt::test]
async fn low_res_time_service_time_updates_after_resolution_interval() {
let resolution = Duration::from_millis(100);
let wait_time = Duration::from_millis(300);
let time_service = LowResTimeService::with(resolution);
let first_time = time_service.now();
sleep(wait_time).await;
let second_time = time_service.now();
assert!(second_time - first_time >= wait_time);
}
}

View File

@ -2,15 +2,14 @@
//! //!
//! If the response does not complete within the specified timeout, the response //! If the response does not complete within the specified timeout, the response
//! will be aborted. //! will be aborted.
use std::future::Future; use core::future::Future;
use std::marker::PhantomData; use core::marker::PhantomData;
use std::pin::Pin; use core::pin::Pin;
use std::task::{Context, Poll}; use core::task::{Context, Poll};
use std::{fmt, time}; use core::{fmt, time};
use actix_rt::time::{sleep, Sleep}; use actix_rt::time::{sleep, Sleep};
use actix_service::{IntoService, Service, Transform}; use actix_service::{IntoService, Service, Transform};
use futures_util::future::{ok, Ready};
/// Applies a timeout to requests. /// Applies a timeout to requests.
#[derive(Debug)] #[derive(Debug)]
@ -87,13 +86,33 @@ where
type Error = TimeoutError<S::Error>; type Error = TimeoutError<S::Error>;
type Transform = TimeoutService<S>; type Transform = TimeoutService<S>;
type InitError = E; type InitError = E;
type Future = Ready<Result<Self::Transform, Self::InitError>>; type Future = TimeoutFuture<Self::Transform, Self::InitError>;
fn new_transform(&self, service: S) -> Self::Future { fn new_transform(&self, service: S) -> Self::Future {
ok(TimeoutService { let service = TimeoutService {
service, service,
timeout: self.timeout, timeout: self.timeout,
}) };
TimeoutFuture {
service: Some(service),
_err: PhantomData,
}
}
}
pub struct TimeoutFuture<T, E> {
service: Option<T>,
_err: PhantomData<E>,
}
impl<T, E> Unpin for TimeoutFuture<T, E> {}
impl<T, E> Future for TimeoutFuture<T, E> {
type Output = Result<T, E>;
fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Self::Output> {
Poll::Ready(Ok(self.get_mut().service.take().unwrap()))
} }
} }
@ -140,14 +159,15 @@ where
} }
} }
/// `TimeoutService` response future pin_project_lite::pin_project! {
#[pin_project::pin_project] /// `TimeoutService` response future
#[derive(Debug)] #[derive(Debug)]
pub struct TimeoutServiceResponse<T: Service> { pub struct TimeoutServiceResponse<T: Service> {
#[pin] #[pin]
fut: T::Future, fut: T::Future,
#[pin] #[pin]
sleep: Sleep, sleep: Sleep,
}
} }
impl<T> Future for TimeoutServiceResponse<T> impl<T> Future for TimeoutServiceResponse<T>
@ -160,17 +180,15 @@ where
let this = self.project(); let this = self.project();
// First, try polling the future // First, try polling the future
match this.fut.poll(cx) { if let Poll::Ready(res) = this.fut.poll(cx) {
Poll::Ready(Ok(v)) => return Poll::Ready(Ok(v)), return match res {
Poll::Ready(Err(e)) => return Poll::Ready(Err(TimeoutError::Service(e))), Ok(v) => Poll::Ready(Ok(v)),
Poll::Pending => {} Err(e) => Poll::Ready(Err(TimeoutError::Service(e))),
};
} }
// Now check the sleep // Now check the sleep
match this.sleep.poll(cx) { this.sleep.poll(cx).map(|_| Err(TimeoutError::Timeout))
Poll::Pending => Poll::Pending,
Poll::Ready(_) => Poll::Ready(Err(TimeoutError::Timeout)),
}
} }
} }