diff --git a/actix-connect/Cargo.toml b/actix-connect/Cargo.toml index b8b23299..4cea0f31 100644 --- a/actix-connect/Cargo.toml +++ b/actix-connect/Cargo.toml @@ -33,7 +33,7 @@ uri = ["http"] [dependencies] actix-service = "1.0.6" actix-codec = "0.3.0" -actix-utils = "2.0.0" +actix-utils = "3.0.0" actix-rt = "1.1.1" derive_more = "0.99.2" diff --git a/actix-connect/src/ssl/openssl.rs b/actix-connect/src/ssl/openssl.rs index d5050307..3435b094 100644 --- a/actix-connect/src/ssl/openssl.rs +++ b/actix-connect/src/ssl/openssl.rs @@ -157,7 +157,7 @@ where Self::Error(e) => e.take().unwrap(), Self::Accept(acc) => { let (stream, _) = acc.as_mut().unwrap(); - match ready!(Pin::new(stream).poll_accept(cx)) { + match ready!(Pin::new(stream).poll_connect(cx)) { Ok(()) => { let (stream, connection) = acc.take().unwrap(); trace!("SSL Handshake success: {:?}", connection.host()); diff --git a/actix-rt/CHANGES.md b/actix-rt/CHANGES.md index 3e6221f3..3290a664 100644 --- a/actix-rt/CHANGES.md +++ b/actix-rt/CHANGES.md @@ -9,8 +9,16 @@ ### Changed * Update `tokio` dependency to `1` * Rename `time` module `delay_for` to `sleep`, `delay_until` to `sleep_until`, `Delay` to `Sleep` to keep inline with tokio. -* Remove `'static` lifetime requirement for `Runtime::block_on` and `SystemRunner::block_on`. These methods would accept a &Self when calling. -* Remove `'static` lifetime requirement for `System::run` +* Remove `'static` lifetime requirement for `Runtime::block_on` and `SystemRunner::block_on`. + Remove `'static` lifetime requirement for `System::run` and `Builder::run`. + `Arbiter::spawn` would panic when `System` is not in scope. [#207] + +### Fixed + +* Fix work load issue by removing `PENDDING` thread local. [#207] + +[#207]: https://github.com/actix/actix-net/pull/207 + ## [1.1.1] - 2020-04-30 diff --git a/actix-rt/Cargo.toml b/actix-rt/Cargo.toml index d849cee9..eff206f2 100644 --- a/actix-rt/Cargo.toml +++ b/actix-rt/Cargo.toml @@ -17,11 +17,5 @@ path = "src/lib.rs" [dependencies] actix-macros = "0.1.0" -copyless = "0.1.4" -futures-channel = "0.3.7" -futures-util = { version = "0.3.7", default-features = false, features = ["alloc"] } -smallvec = "1" -tokio = { version = "1", features = ["rt", "net", "signal", "time"] } -[dev-dependencies] -tokio = { version = "1", features = ["full"] } +tokio = { version = "1", features = ["rt", "net", "signal", "sync", "time"] } diff --git a/actix-rt/src/arbiter.rs b/actix-rt/src/arbiter.rs index 24d3d125..7aae7cd2 100644 --- a/actix-rt/src/arbiter.rs +++ b/actix-rt/src/arbiter.rs @@ -1,5 +1,5 @@ use std::any::{Any, TypeId}; -use std::cell::{Cell, RefCell}; +use std::cell::RefCell; use std::collections::HashMap; use std::future::Future; use std::pin::Pin; @@ -7,25 +7,24 @@ use std::sync::atomic::{AtomicUsize, Ordering}; use std::task::{Context, Poll}; use std::{fmt, thread}; -use copyless::BoxHelper; -use futures_channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender}; -use futures_channel::oneshot::{channel, Canceled, Sender}; -use futures_util::{ - future::{self, FutureExt}, - stream::Stream, -}; -use smallvec::SmallVec; - -pub use tokio::task::JoinHandle; +use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; +use tokio::sync::oneshot::{channel, error::RecvError as Canceled, Sender}; +// use futures_util::stream::FuturesUnordered; +// use tokio::task::JoinHandle; +// use tokio::stream::StreamExt; +use tokio::task::LocalSet; use crate::runtime::Runtime; use crate::system::System; thread_local!( static ADDR: RefCell> = RefCell::new(None); - static RUNNING: Cell = Cell::new(false); - static Q: RefCell>>>> = RefCell::new(Vec::new()); - static PENDING: RefCell; 8]>> = RefCell::new(SmallVec::new()); + // TODO: Commented out code are for Arbiter::local_join function. + // It can be safely removed if this function is not used in actix-*. + // + // /// stores join handle for spawned async tasks. + // static HANDLE: RefCell>> = + // RefCell::new(FuturesUnordered::new()); static STORAGE: RefCell>> = RefCell::new(HashMap::new()); ); @@ -69,14 +68,14 @@ impl Default for Arbiter { } impl Arbiter { - pub(crate) fn new_system() -> Self { - let (tx, rx) = unbounded(); + pub(crate) fn new_system(local: &LocalSet) -> Self { + let (tx, rx) = unbounded_channel(); let arb = Arbiter::with_sender(tx); ADDR.with(|cell| *cell.borrow_mut() = Some(arb.clone())); - RUNNING.with(|cell| cell.set(false)); STORAGE.with(|cell| cell.borrow_mut().clear()); - Arbiter::spawn(ArbiterController { stop: None, rx }); + + local.spawn_local(ArbiterController { rx }); arb } @@ -91,13 +90,14 @@ impl Arbiter { } /// Check if current arbiter is running. + #[deprecated(note = "Thread local variables for running state of Arbiter is removed")] pub fn is_running() -> bool { - RUNNING.with(|cell| cell.get()) + false } /// Stop arbiter from continuing it's event loop. pub fn stop(&self) { - let _ = self.sender.unbounded_send(ArbiterCommand::Stop); + let _ = self.sender.send(ArbiterCommand::Stop); } /// Spawn new thread and run event loop in spawned thread. @@ -106,69 +106,47 @@ impl Arbiter { let id = COUNT.fetch_add(1, Ordering::Relaxed); let name = format!("actix-rt:worker:{}", id); let sys = System::current(); - let (arb_tx, arb_rx) = unbounded(); - let arb_tx2 = arb_tx.clone(); + let (tx, rx) = unbounded_channel(); let handle = thread::Builder::new() .name(name.clone()) - .spawn(move || { - let rt = Runtime::new().expect("Can not create Runtime"); - let arb = Arbiter::with_sender(arb_tx); + .spawn({ + let tx = tx.clone(); + move || { + let rt = Runtime::new().expect("Can not create Runtime"); + let arb = Arbiter::with_sender(tx); - let (stop, stop_rx) = channel(); - RUNNING.with(|cell| cell.set(true)); - STORAGE.with(|cell| cell.borrow_mut().clear()); + STORAGE.with(|cell| cell.borrow_mut().clear()); - System::set_current(sys); + System::set_current(sys); - // start arbiter controller - rt.spawn(ArbiterController { - stop: Some(stop), - rx: arb_rx, - }); - ADDR.with(|cell| *cell.borrow_mut() = Some(arb.clone())); + ADDR.with(|cell| *cell.borrow_mut() = Some(arb.clone())); - // register arbiter - let _ = System::current() - .sys() - .unbounded_send(SystemCommand::RegisterArbiter(id, arb)); + // register arbiter + let _ = System::current() + .sys() + .send(SystemCommand::RegisterArbiter(id, arb)); - // run loop - let _ = rt.block_on(stop_rx).unwrap_or(1); + // start arbiter controller + // run loop + rt.block_on(ArbiterController { rx }); - // unregister arbiter - let _ = System::current() - .sys() - .unbounded_send(SystemCommand::UnregisterArbiter(id)); + // unregister arbiter + let _ = System::current() + .sys() + .send(SystemCommand::UnregisterArbiter(id)); + } }) .unwrap_or_else(|err| { panic!("Cannot spawn an arbiter's thread {:?}: {:?}", &name, err) }); Arbiter { - sender: arb_tx2, + sender: tx, thread_handle: Some(handle), } } - pub(crate) fn run_system(rt: Option<&Runtime>) { - RUNNING.with(|cell| cell.set(true)); - Q.with(|cell| { - let mut v = cell.borrow_mut(); - for fut in v.drain(..) { - if let Some(rt) = rt { - rt.spawn(fut); - } else { - tokio::task::spawn_local(fut); - } - } - }); - } - - pub(crate) fn stop_system() { - RUNNING.with(|cell| cell.set(false)); - } - /// Spawn a future on the current thread. This does not create a new Arbiter /// or Arbiter address, it is simply a helper for spawning futures on the current /// thread. @@ -176,26 +154,12 @@ impl Arbiter { where F: Future + 'static, { - RUNNING.with(move |cell| { - if cell.get() { - // Spawn the future on running executor - let len = PENDING.with(move |cell| { - let mut p = cell.borrow_mut(); - p.push(tokio::task::spawn_local(future)); - p.len() - }); - if len > 7 { - // Before reaching the inline size - tokio::task::spawn_local(CleanupPending); - } - } else { - // Box the future and push it to the queue, this results in double boxing - // because the executor boxes the future again, but works for now - Q.with(move |cell| { - cell.borrow_mut().push(Pin::from(Box::alloc().init(future))) - }); - } - }); + // HANDLE.with(|handle| { + // let handle = handle.borrow(); + // handle.push(tokio::task::spawn_local(future)); + // }); + // let _ = tokio::task::spawn_local(CleanupPending); + let _ = tokio::task::spawn_local(future); } /// Executes a future on the current thread. This does not create a new Arbiter @@ -206,7 +170,9 @@ impl Arbiter { F: FnOnce() -> R + 'static, R: Future + 'static, { - Arbiter::spawn(future::lazy(|_| f()).flatten()) + Arbiter::spawn(async { + f(); + }) } /// Send a future to the Arbiter's thread, and spawn it. @@ -214,9 +180,7 @@ impl Arbiter { where F: Future + Send + Unpin + 'static, { - let _ = self - .sender - .unbounded_send(ArbiterCommand::Execute(Box::new(future))); + let _ = self.sender.send(ArbiterCommand::Execute(Box::new(future))); } /// Send a function to the Arbiter's thread, and execute it. Any result from the function @@ -227,7 +191,7 @@ impl Arbiter { { let _ = self .sender - .unbounded_send(ArbiterCommand::ExecuteFn(Box::new(move || { + .send(ArbiterCommand::ExecuteFn(Box::new(move || { f(); }))); } @@ -243,8 +207,8 @@ impl Arbiter { let (tx, rx) = channel(); let _ = self .sender - .unbounded_send(ArbiterCommand::ExecuteFn(Box::new(move || { - if !tx.is_canceled() { + .send(ArbiterCommand::ExecuteFn(Box::new(move || { + if !tx.is_closed() { let _ = tx.send(f()); } }))); @@ -313,40 +277,33 @@ impl Arbiter { /// Returns a future that will be completed once all currently spawned futures /// have completed. - pub fn local_join() -> impl Future { - PENDING.with(move |cell| { - let current = cell.replace(SmallVec::new()); - future::join_all(current).map(|_| ()) - }) + #[deprecated(since = "1.2.0", note = "Arbiter::local_join function is removed.")] + pub async fn local_join() { + // let handle = HANDLE.with(|fut| std::mem::take(&mut *fut.borrow_mut())); + // async move { + // handle.collect::>().await; + // } + unimplemented!("Arbiter::local_join function is removed.") } } -/// Future used for cleaning-up already finished `JoinHandle`s -/// from the `PENDING` list so the vector doesn't grow indefinitely -struct CleanupPending; - -impl Future for CleanupPending { - type Output = (); - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - PENDING.with(move |cell| { - let mut pending = cell.borrow_mut(); - let mut i = 0; - while i != pending.len() { - if Pin::new(&mut pending[i]).poll(cx).is_ready() { - pending.remove(i); - } else { - i += 1; - } - } - }); - - Poll::Ready(()) - } -} +// /// Future used for cleaning-up already finished `JoinHandle`s +// /// from the `PENDING` list so the vector doesn't grow indefinitely +// struct CleanupPending; +// +// impl Future for CleanupPending { +// type Output = (); +// +// fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { +// HANDLE.with(move |handle| { +// recycle_join_handle(&mut *handle.borrow_mut(), cx); +// }); +// +// Poll::Ready(()) +// } +// } struct ArbiterController { - stop: Option>, rx: UnboundedReceiver, } @@ -368,25 +325,17 @@ impl Future for ArbiterController { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { loop { - match Pin::new(&mut self.rx).poll_next(cx) { + match Pin::new(&mut self.rx).poll_recv(cx) { Poll::Ready(None) => return Poll::Ready(()), Poll::Ready(Some(item)) => match item { - ArbiterCommand::Stop => { - if let Some(stop) = self.stop.take() { - let _ = stop.send(0); - }; - return Poll::Ready(()); - } + ArbiterCommand::Stop => return Poll::Ready(()), ArbiterCommand::Execute(fut) => { - let len = PENDING.with(move |cell| { - let mut p = cell.borrow_mut(); - p.push(tokio::task::spawn_local(fut)); - p.len() - }); - if len > 7 { - // Before reaching the inline size - tokio::task::spawn_local(CleanupPending); - } + // HANDLE.with(|handle| { + // let mut handle = handle.borrow_mut(); + // handle.push(tokio::task::spawn_local(fut)); + // recycle_join_handle(&mut *handle, cx); + // }); + tokio::task::spawn_local(fut); } ArbiterCommand::ExecuteFn(f) => { f.call_box(); @@ -398,6 +347,20 @@ impl Future for ArbiterController { } } +// fn recycle_join_handle(handle: &mut FuturesUnordered>, cx: &mut Context<'_>) { +// let _ = Pin::new(&mut *handle).poll_next(cx); +// +// // Try to recycle more join handles and free up memory. +// // +// // this is a guess. The yield limit for FuturesUnordered is 32. +// // So poll an extra 3 times would make the total poll below 128. +// if handle.len() > 64 { +// (0..3).for_each(|_| { +// let _ = Pin::new(&mut *handle).poll_next(cx); +// }) +// } +// } + #[derive(Debug)] pub(crate) enum SystemCommand { Exit(i32), @@ -427,7 +390,7 @@ impl Future for SystemArbiter { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { loop { - match Pin::new(&mut self.commands).poll_next(cx) { + match Pin::new(&mut self.commands).poll_recv(cx) { Poll::Ready(None) => return Poll::Ready(()), Poll::Ready(Some(cmd)) => match cmd { SystemCommand::Exit(code) => { diff --git a/actix-rt/src/builder.rs b/actix-rt/src/builder.rs index e875a296..ff7b0e06 100644 --- a/actix-rt/src/builder.rs +++ b/actix-rt/src/builder.rs @@ -2,9 +2,8 @@ use std::borrow::Cow; use std::future::Future; use std::io; -use futures_channel::mpsc::unbounded; -use futures_channel::oneshot::{channel, Receiver}; -use futures_util::future::{lazy, FutureExt}; +use tokio::sync::mpsc::unbounded_channel; +use tokio::sync::oneshot::{channel, Receiver}; use tokio::task::LocalSet; use crate::arbiter::{Arbiter, SystemArbiter}; @@ -73,9 +72,10 @@ impl Builder { fn create_async_runtime(self, local: &LocalSet) -> AsyncSystemRunner { let (stop_tx, stop) = channel(); - let (sys_sender, sys_receiver) = unbounded(); + let (sys_sender, sys_receiver) = unbounded_channel(); - let system = System::construct(sys_sender, Arbiter::new_system(), self.stop_on_panic); + let system = + System::construct(sys_sender, Arbiter::new_system(local), self.stop_on_panic); // system arbiter let arb = SystemArbiter::new(stop_tx, sys_receiver); @@ -91,18 +91,23 @@ impl Builder { F: FnOnce(), { let (stop_tx, stop) = channel(); - let (sys_sender, sys_receiver) = unbounded(); + let (sys_sender, sys_receiver) = unbounded_channel(); - let system = System::construct(sys_sender, Arbiter::new_system(), self.stop_on_panic); + let rt = Runtime::new().unwrap(); + + let system = System::construct( + sys_sender, + Arbiter::new_system(rt.local()), + self.stop_on_panic, + ); // system arbiter let arb = SystemArbiter::new(stop_tx, sys_receiver); - let rt = Runtime::new().unwrap(); rt.spawn(arb); // init system arbiter and run configuration method - rt.block_on(lazy(move |_| f())); + rt.block_on(async { f() }); SystemRunner { rt, stop, system } } @@ -121,27 +126,21 @@ impl AsyncSystemRunner { let AsyncSystemRunner { stop, .. } = self; // run loop - lazy(|_| { - Arbiter::run_system(None); - async { - let res = match stop.await { - Ok(code) => { - if code != 0 { - Err(io::Error::new( - io::ErrorKind::Other, - format!("Non-zero exit code: {}", code), - )) - } else { - Ok(()) - } + async { + match stop.await { + Ok(code) => { + if code != 0 { + Err(io::Error::new( + io::ErrorKind::Other, + format!("Non-zero exit code: {}", code), + )) + } else { + Ok(()) } - Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)), - }; - Arbiter::stop_system(); - res + } + Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)), } - }) - .flatten() + } } } @@ -161,8 +160,7 @@ impl SystemRunner { let SystemRunner { rt, stop, .. } = self; // run loop - Arbiter::run_system(Some(&rt)); - let result = match rt.block_on(stop) { + match rt.block_on(stop) { Ok(code) => { if code != 0 { Err(io::Error::new( @@ -174,19 +172,12 @@ impl SystemRunner { } } Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)), - }; - Arbiter::stop_system(); - result + } } /// Execute a future and wait for result. - pub fn block_on(&self, fut: F) -> O - where - F: Future, - { - Arbiter::run_system(Some(&self.rt)); - let res = self.rt.block_on(fut); - Arbiter::stop_system(); - res + #[inline] + pub fn block_on(&self, fut: F) -> F::Output { + self.rt.block_on(fut) } } diff --git a/actix-rt/src/lib.rs b/actix-rt/src/lib.rs index 4b0fde9f..3fd94bf9 100644 --- a/actix-rt/src/lib.rs +++ b/actix-rt/src/lib.rs @@ -4,6 +4,8 @@ #![doc(html_logo_url = "https://actix.rs/img/logo.png")] #![doc(html_favicon_url = "https://actix.rs/favicon.ico")] +use std::future::Future; + #[cfg(not(test))] // Work around for rust-lang/rust#62127 pub use actix_macros::{main, test}; @@ -22,15 +24,12 @@ pub use self::system::System; /// # Panics /// /// This function panics if actix system is not running. +#[inline] pub fn spawn(f: F) where - F: std::future::Future + 'static, + F: Future + 'static, { - if !System::is_set() { - panic!("System is not running"); - } - - Arbiter::spawn(f); + Arbiter::spawn(f) } /// Asynchronous signal handling diff --git a/actix-rt/src/runtime.rs b/actix-rt/src/runtime.rs index c19aa2b7..a72f492c 100644 --- a/actix-rt/src/runtime.rs +++ b/actix-rt/src/runtime.rs @@ -29,6 +29,10 @@ impl Runtime { }) } + pub(super) fn local(&self) -> &LocalSet { + &self.local + } + /// Spawn a future onto the single-threaded runtime. /// /// See [module level][mod] documentation for more details. @@ -43,7 +47,7 @@ impl Runtime { /// /// # fn dox() { /// // Create the runtime - /// let mut rt = Runtime::new().unwrap(); + /// let rt = Runtime::new().unwrap(); /// /// // Spawn a future onto the runtime /// rt.spawn(future::lazy(|_| { diff --git a/actix-rt/src/system.rs b/actix-rt/src/system.rs index 07e0575c..1fbbc0ee 100644 --- a/actix-rt/src/system.rs +++ b/actix-rt/src/system.rs @@ -3,7 +3,7 @@ use std::future::Future; use std::io; use std::sync::atomic::{AtomicUsize, Ordering}; -use futures_channel::mpsc::UnboundedSender; +use tokio::sync::mpsc::UnboundedSender; use tokio::task::LocalSet; use crate::arbiter::{Arbiter, SystemCommand}; @@ -70,7 +70,7 @@ impl System { /// /// # Examples /// - /// ``` + /// ```rust,ignore /// use tokio::{runtime::Runtime, task::LocalSet}; /// use actix_rt::System; /// use futures_util::future::try_join_all; @@ -139,7 +139,7 @@ impl System { /// /// # Examples /// - /// ``` + /// ```rust,ignore /// use tokio::runtime::Runtime; /// use actix_rt::System; /// use futures_util::future::try_join_all; @@ -231,7 +231,7 @@ impl System { /// Stop the system with a particular exit code. pub fn stop_with_code(&self, code: i32) { - let _ = self.sys.unbounded_send(SystemCommand::Exit(code)); + let _ = self.sys.send(SystemCommand::Exit(code)); } pub(crate) fn sys(&self) -> &UnboundedSender { diff --git a/actix-rt/tests/integration_tests.rs b/actix-rt/tests/integration_tests.rs index 5471f800..12ceb4ef 100644 --- a/actix-rt/tests/integration_tests.rs +++ b/actix-rt/tests/integration_tests.rs @@ -1,19 +1,5 @@ use std::time::{Duration, Instant}; -#[test] -fn start_and_stop() { - actix_rt::System::new("start_and_stop").block_on(async move { - assert!( - actix_rt::Arbiter::is_running(), - "System doesn't seem to have started" - ); - }); - assert!( - !actix_rt::Arbiter::is_running(), - "System doesn't seem to have stopped" - ); -} - #[test] fn await_for_timer() { let time = Duration::from_secs(2); @@ -76,42 +62,42 @@ fn join_another_arbiter() { ); } -#[test] -fn join_current_arbiter() { - let time = Duration::from_secs(2); - - let instant = Instant::now(); - actix_rt::System::new("test_join_current_arbiter").block_on(async move { - actix_rt::spawn(async move { - tokio::time::sleep(time).await; - actix_rt::Arbiter::current().stop(); - }); - actix_rt::Arbiter::local_join().await; - }); - assert!( - instant.elapsed() >= time, - "Join on current arbiter should wait for all spawned futures" - ); - - let large_timer = Duration::from_secs(20); - let instant = Instant::now(); - actix_rt::System::new("test_join_current_arbiter").block_on(async move { - actix_rt::spawn(async move { - tokio::time::sleep(time).await; - actix_rt::Arbiter::current().stop(); - }); - let f = actix_rt::Arbiter::local_join(); - actix_rt::spawn(async move { - tokio::time::sleep(large_timer).await; - actix_rt::Arbiter::current().stop(); - }); - f.await; - }); - assert!( - instant.elapsed() < large_timer, - "local_join should await only for the already spawned futures" - ); -} +// #[test] +// fn join_current_arbiter() { +// let time = Duration::from_secs(2); +// +// let instant = Instant::now(); +// actix_rt::System::new("test_join_current_arbiter").block_on(async move { +// actix_rt::spawn(async move { +// tokio::time::delay_for(time).await; +// actix_rt::Arbiter::current().stop(); +// }); +// actix_rt::Arbiter::local_join().await; +// }); +// assert!( +// instant.elapsed() >= time, +// "Join on current arbiter should wait for all spawned futures" +// ); +// +// let large_timer = Duration::from_secs(20); +// let instant = Instant::now(); +// actix_rt::System::new("test_join_current_arbiter").block_on(async move { +// actix_rt::spawn(async move { +// tokio::time::delay_for(time).await; +// actix_rt::Arbiter::current().stop(); +// }); +// let f = actix_rt::Arbiter::local_join(); +// actix_rt::spawn(async move { +// tokio::time::delay_for(large_timer).await; +// actix_rt::Arbiter::current().stop(); +// }); +// f.await; +// }); +// assert!( +// instant.elapsed() < large_timer, +// "local_join should await only for the already spawned futures" +// ); +// } #[test] fn non_static_block_on() { diff --git a/actix-server/Cargo.toml b/actix-server/Cargo.toml index 615c18b7..2bb86ef4 100644 --- a/actix-server/Cargo.toml +++ b/actix-server/Cargo.toml @@ -23,7 +23,7 @@ default = [] actix-service = "1.0.6" actix-rt = "1.1.1" actix-codec = "0.3.0" -actix-utils = "2.0.0" +actix-utils = "3.0.0" concurrent-queue = "1.2.2" futures-channel = { version = "0.3.7", default-features = false } diff --git a/actix-server/tests/test_server.rs b/actix-server/tests/test_server.rs index b3ade4e1..0a1b525b 100644 --- a/actix-server/tests/test_server.rs +++ b/actix-server/tests/test_server.rs @@ -22,11 +22,14 @@ fn test_bind() { let h = thread::spawn(move || { let sys = actix_rt::System::new("test"); - let srv = Server::build() - .disable_signals() - .bind("test", addr, move || fn_service(|_| ok::<_, ()>(()))) - .unwrap() - .start(); + let srv = sys.block_on(lazy(|_| { + Server::build() + .workers(1) + .disable_signals() + .bind("test", addr, move || fn_service(|_| ok::<_, ()>(()))) + .unwrap() + .start() + })); let _ = tx.send((srv, actix_rt::System::current())); let _ = sys.run(); }); @@ -46,12 +49,14 @@ fn test_listen() { let h = thread::spawn(move || { let sys = actix_rt::System::new("test"); let lst = net::TcpListener::bind(addr).unwrap(); - Server::build() - .disable_signals() - .workers(1) - .listen("test", lst, move || fn_service(|_| ok::<_, ()>(()))) - .unwrap() - .start(); + sys.block_on(lazy(|_| { + Server::build() + .disable_signals() + .workers(1) + .listen("test", lst, move || fn_service(|_| ok::<_, ()>(()))) + .unwrap() + .start() + })); let _ = tx.send(actix_rt::System::current()); let _ = sys.run(); }); @@ -77,18 +82,20 @@ fn test_start() { let h = thread::spawn(move || { let sys = actix_rt::System::new("test"); - let srv: Server = Server::build() - .backlog(100) - .disable_signals() - .bind("test", addr, move || { - fn_service(|io: TcpStream| async move { - let mut f = Framed::new(io, BytesCodec); - f.send(Bytes::from_static(b"test")).await.unwrap(); - Ok::<_, ()>(()) + let srv = sys.block_on(lazy(|_| { + Server::build() + .backlog(100) + .disable_signals() + .bind("test", addr, move || { + fn_service(|io: TcpStream| async move { + let mut f = Framed::new(io, BytesCodec); + f.send(Bytes::from_static(b"test")).await.unwrap(); + Ok::<_, ()>(()) + }) }) - }) - .unwrap() - .start(); + .unwrap() + .start() + })); let _ = tx.send((srv, actix_rt::System::current())); let _ = sys.run(); @@ -144,28 +151,30 @@ fn test_configure() { let h = thread::spawn(move || { let num = num2.clone(); let sys = actix_rt::System::new("test"); - let srv = Server::build() - .disable_signals() - .configure(move |cfg| { - let num = num.clone(); - let lst = net::TcpListener::bind(addr3).unwrap(); - cfg.bind("addr1", addr1) - .unwrap() - .bind("addr2", addr2) - .unwrap() - .listen("addr3", lst) - .apply(move |rt| { - let num = num.clone(); - rt.service("addr1", fn_service(|_| ok::<_, ()>(()))); - rt.service("addr3", fn_service(|_| ok::<_, ()>(()))); - rt.on_start(lazy(move |_| { - let _ = num.fetch_add(1, Relaxed); - })) - }) - }) - .unwrap() - .workers(1) - .start(); + let srv = sys.block_on(lazy(|_| { + Server::build() + .disable_signals() + .configure(move |cfg| { + let num = num.clone(); + let lst = net::TcpListener::bind(addr3).unwrap(); + cfg.bind("addr1", addr1) + .unwrap() + .bind("addr2", addr2) + .unwrap() + .listen("addr3", lst) + .apply(move |rt| { + let num = num.clone(); + rt.service("addr1", fn_service(|_| ok::<_, ()>(()))); + rt.service("addr3", fn_service(|_| ok::<_, ()>(()))); + rt.on_start(lazy(move |_| { + let _ = num.fetch_add(1, Relaxed); + })) + }) + }) + .unwrap() + .workers(1) + .start() + })); let _ = tx.send((srv, actix_rt::System::current())); let _ = sys.run(); }); diff --git a/actix-testing/src/lib.rs b/actix-testing/src/lib.rs index efcdd394..de933085 100644 --- a/actix-testing/src/lib.rs +++ b/actix-testing/src/lib.rs @@ -87,11 +87,14 @@ impl TestServer { let tcp = net::TcpListener::bind("127.0.0.1:0").unwrap(); let local_addr = tcp.local_addr().unwrap(); - Server::build() - .listen("test", tcp, factory)? - .workers(1) - .disable_signals() - .start(); + sys.block_on(async { + Server::build() + .listen("test", tcp, factory) + .unwrap() + .workers(1) + .disable_signals() + .start(); + }); tx.send((System::current(), local_addr)).unwrap(); sys.run() diff --git a/actix-tls/Cargo.toml b/actix-tls/Cargo.toml index 09e1ba08..6b183be1 100644 --- a/actix-tls/Cargo.toml +++ b/actix-tls/Cargo.toml @@ -37,7 +37,7 @@ nativetls = ["native-tls", "tokio-native-tls"] [dependencies] actix-service = "1.0.0" actix-codec = "0.3.0" -actix-utils = "2.0.0" +actix-utils = "3.0.0" futures-util = { version = "0.3.7", default-features = false } diff --git a/actix-utils/CHANGES.md b/actix-utils/CHANGES.md index 1f09732d..5debbaf5 100644 --- a/actix-utils/CHANGES.md +++ b/actix-utils/CHANGES.md @@ -1,8 +1,15 @@ # Changes ## Unreleased - 2020-xx-xx +<<<<<<< HEAD * Upgrade `pin-project` to `1.0`. * Update `bytes` dependency to `1`. +======= +* Use `pin-project-lite` to replace `pin-project`. [#229] +* Remove `condition`,`either`,`inflight`,`keepalive`,`oneshot`,`order`,`stream` and `time` mods. [#229] + +[#229]: https://github.com/actix/actix-net/pull/229 +>>>>>>> upstream/master ## 2.0.0 - 2020-08-23 * No changes from beta 1. diff --git a/actix-utils/Cargo.toml b/actix-utils/Cargo.toml index 9c842434..f5bd5793 100644 --- a/actix-utils/Cargo.toml +++ b/actix-utils/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "actix-utils" -version = "2.0.0" +version = "3.0.0" authors = ["Nikolay Kim "] description = "Various network related services and utilities for the Actix ecosystem." keywords = ["network", "framework", "async", "futures"] @@ -19,12 +19,11 @@ path = "src/lib.rs" actix-codec = "0.3.0" actix-rt = "1.1.1" actix-service = "1.0.6" -bitflags = "1.2.1" -bytes = "1" -either = "1.5.3" -futures-channel = { version = "0.3.7", default-features = false } + +futures-core = { version = "0.3.7", default-features = false } futures-sink = { version = "0.3.7", default-features = false } -futures-util = { version = "0.3.7", default-features = false } log = "0.4" -pin-project = "1.0.0" -slab = "0.4" +pin-project-lite = "0.2.0" + +[dev-dependencies] +futures-util = { version = "0.3.7", default-features = false } diff --git a/actix-utils/src/condition.rs b/actix-utils/src/condition.rs deleted file mode 100644 index 9c7c977c..00000000 --- a/actix-utils/src/condition.rs +++ /dev/null @@ -1,129 +0,0 @@ -use std::cell::RefCell; -use std::future::Future; -use std::pin::Pin; -use std::rc::Rc; -use std::task::{Context, Poll}; - -use slab::Slab; - -use crate::task::LocalWaker; - -/// Condition allows to notify multiple receivers at the same time -pub struct Condition(Rc>); - -struct Inner { - data: Slab>, -} - -impl Default for Condition { - fn default() -> Self { - Self::new() - } -} - -impl Condition { - pub fn new() -> Condition { - Condition(Rc::new(RefCell::new(Inner { data: Slab::new() }))) - } - - /// Get condition waiter - pub fn wait(&mut self) -> Waiter { - let token = self.0.borrow_mut().data.insert(None); - Waiter { - token, - inner: self.0.clone(), - } - } - - /// Notify all waiters - pub fn notify(&self) { - let inner = self.0.borrow(); - for item in inner.data.iter() { - if let Some(waker) = item.1 { - waker.wake(); - } - } - } -} - -impl Drop for Condition { - fn drop(&mut self) { - self.notify() - } -} - -#[must_use = "Waiter do nothing unless polled"] -pub struct Waiter { - token: usize, - inner: Rc>, -} - -impl Clone for Waiter { - fn clone(&self) -> Self { - let token = self.inner.borrow_mut().data.insert(None); - Waiter { - token, - inner: self.inner.clone(), - } - } -} - -impl Future for Waiter { - type Output = (); - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.get_mut(); - - let mut inner = this.inner.borrow_mut(); - let inner = unsafe { inner.data.get_unchecked_mut(this.token) }; - if inner.is_none() { - let waker = LocalWaker::default(); - waker.register(cx.waker()); - *inner = Some(waker); - Poll::Pending - } else if inner.as_mut().unwrap().register(cx.waker()) { - Poll::Pending - } else { - Poll::Ready(()) - } - } -} - -impl Drop for Waiter { - fn drop(&mut self) { - self.inner.borrow_mut().data.remove(self.token); - } -} - -#[cfg(test)] -mod tests { - use super::*; - use futures_util::future::lazy; - - #[actix_rt::test] - async fn test_condition() { - let mut cond = Condition::new(); - let mut waiter = cond.wait(); - assert_eq!( - lazy(|cx| Pin::new(&mut waiter).poll(cx)).await, - Poll::Pending - ); - cond.notify(); - waiter.await; - - let mut waiter = cond.wait(); - assert_eq!( - lazy(|cx| Pin::new(&mut waiter).poll(cx)).await, - Poll::Pending - ); - let mut waiter2 = waiter.clone(); - assert_eq!( - lazy(|cx| Pin::new(&mut waiter2).poll(cx)).await, - Poll::Pending - ); - - drop(cond); - waiter.await; - waiter2.await; - } -} diff --git a/actix-utils/src/counter.rs b/actix-utils/src/counter.rs index 4fe9dd0a..0b5984d2 100644 --- a/actix-utils/src/counter.rs +++ b/actix-utils/src/counter.rs @@ -1,6 +1,7 @@ -use std::cell::Cell; +use core::cell::Cell; +use core::task; + use std::rc::Rc; -use std::task; use crate::task::LocalWaker; diff --git a/actix-utils/src/dispatcher.rs b/actix-utils/src/dispatcher.rs index 1ee72564..c3cb4f16 100644 --- a/actix-utils/src/dispatcher.rs +++ b/actix-utils/src/dispatcher.rs @@ -2,13 +2,14 @@ #![allow(type_alias_bounds)] -use std::pin::Pin; -use std::task::{Context, Poll}; -use std::{fmt, mem}; +use core::future::Future; +use core::pin::Pin; +use core::task::{Context, Poll}; +use core::{fmt, mem}; use actix_codec::{AsyncRead, AsyncWrite, Decoder, Encoder, Framed}; use actix_service::{IntoService, Service}; -use futures_util::{future::Future, stream::Stream, FutureExt}; +use futures_core::stream::Stream; use log::debug; use crate::mpsc; @@ -61,25 +62,28 @@ pub enum Message { Close, } -/// Dispatcher is a future that reads frames from Framed object -/// and passes them to the service. -#[pin_project::pin_project] -pub struct Dispatcher -where - S: Service::Item, Response = I>, - S::Error: 'static, - S::Future: 'static, - T: AsyncRead + AsyncWrite, - U: Encoder + Decoder, - I: 'static, - >::Error: std::fmt::Debug, -{ - service: S, - state: State, - #[pin] - framed: Framed, - rx: mpsc::Receiver, S::Error>>, - tx: mpsc::Sender, S::Error>>, +pin_project_lite::pin_project! { + /// Dispatcher is a future that reads frames from Framed object + /// and passes them to the service. + pub struct Dispatcher + where + S: Service::Item, Response = I>, + S::Error: 'static, + S::Future: 'static, + T: AsyncRead, + T: AsyncWrite, + U: Encoder, + U: Decoder, + I: 'static, + >::Error: fmt::Debug, + { + service: S, + state: State, + #[pin] + framed: Framed, + rx: mpsc::Receiver, S::Error>>, + tx: mpsc::Sender, S::Error>>, + } } enum State + Decoder, I> { @@ -114,8 +118,8 @@ where T: AsyncRead + AsyncWrite, U: Decoder + Encoder, I: 'static, - ::Error: std::fmt::Debug, - >::Error: std::fmt::Debug, + ::Error: fmt::Debug, + >::Error: fmt::Debug, { pub fn new>(framed: Framed, service: F) -> Self { let (tx, rx) = mpsc::channel(); @@ -178,7 +182,7 @@ where T: AsyncRead + AsyncWrite, U: Decoder + Encoder, I: 'static, - >::Error: std::fmt::Debug, + >::Error: fmt::Debug, { loop { let this = self.as_mut().project(); @@ -198,9 +202,11 @@ where }; let tx = this.tx.clone(); - actix_rt::spawn(this.service.call(item).map(move |item| { + let fut = this.service.call(item); + actix_rt::spawn(async move { + let item = fut.await; let _ = tx.send(item.map(Message::Item)); - })); + }); } Poll::Pending => return false, Poll::Ready(Err(err)) => { @@ -220,7 +226,7 @@ where T: AsyncRead + AsyncWrite, U: Decoder + Encoder, I: 'static, - >::Error: std::fmt::Debug, + >::Error: fmt::Debug, { loop { let mut this = self.as_mut().project(); @@ -271,8 +277,8 @@ where T: AsyncRead + AsyncWrite, U: Decoder + Encoder, I: 'static, - >::Error: std::fmt::Debug, - ::Error: std::fmt::Debug, + >::Error: fmt::Debug, + ::Error: fmt::Debug, { type Output = Result<(), DispatcherError>; diff --git a/actix-utils/src/either.rs b/actix-utils/src/either.rs deleted file mode 100644 index fdf15ffe..00000000 --- a/actix-utils/src/either.rs +++ /dev/null @@ -1,153 +0,0 @@ -//! Contains `Either` service and related types and functions. -use std::pin::Pin; -use std::task::{Context, Poll}; - -use actix_service::{Service, ServiceFactory}; -use futures_util::{future, future::Future, ready}; - -/// Combine two different service types into a single type. -/// -/// Both services must be of the same request, response, and error types. -/// `EitherService` is useful for handling conditional branching in service -/// middleware to different inner service types. -pub struct EitherService { - left: A, - right: B, -} - -impl Clone for EitherService { - fn clone(&self) -> Self { - EitherService { - left: self.left.clone(), - right: self.right.clone(), - } - } -} - -impl Service for EitherService -where - A: Service, - B: Service, -{ - type Request = either::Either; - type Response = A::Response; - type Error = A::Error; - type Future = future::Either; - - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { - let left = self.left.poll_ready(cx)?; - let right = self.right.poll_ready(cx)?; - - if left.is_ready() && right.is_ready() { - Poll::Ready(Ok(())) - } else { - Poll::Pending - } - } - - fn call(&mut self, req: either::Either) -> Self::Future { - match req { - either::Either::Left(req) => future::Either::Left(self.left.call(req)), - either::Either::Right(req) => future::Either::Right(self.right.call(req)), - } - } -} - -/// Combine two different new service types into a single service. -pub struct Either { - left: A, - right: B, -} - -impl Either { - pub fn new(left: A, right: B) -> Either - where - A: ServiceFactory, - A::Config: Clone, - B: ServiceFactory< - Config = A::Config, - Response = A::Response, - Error = A::Error, - InitError = A::InitError, - >, - { - Either { left, right } - } -} - -impl ServiceFactory for Either -where - A: ServiceFactory, - A::Config: Clone, - B: ServiceFactory< - Config = A::Config, - Response = A::Response, - Error = A::Error, - InitError = A::InitError, - >, -{ - type Request = either::Either; - type Response = A::Response; - type Error = A::Error; - type InitError = A::InitError; - type Config = A::Config; - type Service = EitherService; - type Future = EitherNewService; - - fn new_service(&self, cfg: A::Config) -> Self::Future { - EitherNewService { - left: None, - right: None, - left_fut: self.left.new_service(cfg.clone()), - right_fut: self.right.new_service(cfg), - } - } -} - -impl Clone for Either { - fn clone(&self) -> Self { - Self { - left: self.left.clone(), - right: self.right.clone(), - } - } -} - -#[doc(hidden)] -#[pin_project::pin_project] -pub struct EitherNewService { - left: Option, - right: Option, - #[pin] - left_fut: A::Future, - #[pin] - right_fut: B::Future, -} - -impl Future for EitherNewService -where - A: ServiceFactory, - B: ServiceFactory, -{ - type Output = Result, A::InitError>; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.project(); - - if this.left.is_none() { - *this.left = Some(ready!(this.left_fut.poll(cx))?); - } - if this.right.is_none() { - *this.right = Some(ready!(this.right_fut.poll(cx))?); - } - - if this.left.is_some() && this.right.is_some() { - Poll::Ready(Ok(EitherService { - left: this.left.take().unwrap(), - right: this.right.take().unwrap(), - })) - } else { - Poll::Pending - } - } -} diff --git a/actix-utils/src/inflight.rs b/actix-utils/src/inflight.rs deleted file mode 100644 index 2207f82a..00000000 --- a/actix-utils/src/inflight.rs +++ /dev/null @@ -1,169 +0,0 @@ -use std::convert::Infallible; -use std::future::Future; -use std::pin::Pin; -use std::task::{Context, Poll}; - -use actix_service::{IntoService, Service, Transform}; -use futures_util::future::{ok, Ready}; - -use super::counter::{Counter, CounterGuard}; - -/// InFlight - new service for service that can limit number of in-flight -/// async requests. -/// -/// Default number of in-flight requests is 15 -pub struct InFlight { - max_inflight: usize, -} - -impl InFlight { - pub fn new(max: usize) -> Self { - Self { max_inflight: max } - } -} - -impl Default for InFlight { - fn default() -> Self { - Self::new(15) - } -} - -impl Transform for InFlight -where - S: Service, -{ - type Request = S::Request; - type Response = S::Response; - type Error = S::Error; - type InitError = Infallible; - type Transform = InFlightService; - type Future = Ready>; - - fn new_transform(&self, service: S) -> Self::Future { - ok(InFlightService::new(self.max_inflight, service)) - } -} - -pub struct InFlightService { - count: Counter, - service: S, -} - -impl InFlightService -where - S: Service, -{ - pub fn new(max: usize, service: U) -> Self - where - U: IntoService, - { - Self { - count: Counter::new(max), - service: service.into_service(), - } - } -} - -impl Service for InFlightService -where - T: Service, -{ - type Request = T::Request; - type Response = T::Response; - type Error = T::Error; - type Future = InFlightServiceResponse; - - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { - if self.service.poll_ready(cx)?.is_pending() { - Poll::Pending - } else if !self.count.available(cx) { - log::trace!("InFlight limit exceeded"); - Poll::Pending - } else { - Poll::Ready(Ok(())) - } - } - - fn call(&mut self, req: T::Request) -> Self::Future { - InFlightServiceResponse { - fut: self.service.call(req), - _guard: self.count.get(), - } - } -} - -#[doc(hidden)] -#[pin_project::pin_project] -pub struct InFlightServiceResponse { - #[pin] - fut: T::Future, - _guard: CounterGuard, -} - -impl Future for InFlightServiceResponse { - type Output = Result; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - self.project().fut.poll(cx) - } -} - -#[cfg(test)] -mod tests { - - use std::task::{Context, Poll}; - use std::time::Duration; - - use super::*; - use actix_service::{apply, fn_factory, Service, ServiceFactory}; - use futures_util::future::{lazy, ok, FutureExt, LocalBoxFuture}; - - struct SleepService(Duration); - - impl Service for SleepService { - type Request = (); - type Response = (); - type Error = (); - type Future = LocalBoxFuture<'static, Result<(), ()>>; - - fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll> { - Poll::Ready(Ok(())) - } - - fn call(&mut self, _: ()) -> Self::Future { - actix_rt::time::sleep(self.0) - .then(|_| ok::<_, ()>(())) - .boxed_local() - } - } - - #[actix_rt::test] - async fn test_transform() { - let wait_time = Duration::from_millis(50); - - let mut srv = InFlightService::new(1, SleepService(wait_time)); - assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(()))); - - let res = srv.call(()); - assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Pending); - - let _ = res.await; - assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(()))); - } - - #[actix_rt::test] - async fn test_new_transform() { - let wait_time = Duration::from_millis(50); - - let srv = apply(InFlight::new(1), fn_factory(|| ok(SleepService(wait_time)))); - - let mut srv = srv.new_service(&()).await.unwrap(); - assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(()))); - - let res = srv.call(()); - assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Pending); - - let _ = res.await; - assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(()))); - } -} diff --git a/actix-utils/src/keepalive.rs b/actix-utils/src/keepalive.rs deleted file mode 100644 index 61ea783a..00000000 --- a/actix-utils/src/keepalive.rs +++ /dev/null @@ -1,125 +0,0 @@ -use std::convert::Infallible; -use std::future::Future; -use std::marker::PhantomData; -use std::pin::Pin; -use std::task::{Context, Poll}; -use std::time::Duration; - -use actix_rt::time::{sleep_until, Instant, Sleep}; -use actix_service::{Service, ServiceFactory}; -use futures_util::future::{ok, Ready}; - -use super::time::{LowResTime, LowResTimeService}; - -pub struct KeepAlive { - f: F, - ka: Duration, - time: LowResTime, - _t: PhantomData<(R, E)>, -} - -impl KeepAlive -where - F: Fn() -> E + Clone, -{ - pub fn new(ka: Duration, time: LowResTime, f: F) -> Self { - KeepAlive { - f, - ka, - time, - _t: PhantomData, - } - } -} - -impl Clone for KeepAlive -where - F: Clone, -{ - fn clone(&self) -> Self { - KeepAlive { - f: self.f.clone(), - ka: self.ka, - time: self.time.clone(), - _t: PhantomData, - } - } -} - -impl ServiceFactory for KeepAlive -where - F: Fn() -> E + Clone, -{ - type Request = R; - type Response = R; - type Error = E; - type Config = (); - type Service = KeepAliveService; - type InitError = Infallible; - type Future = Ready>; - - fn new_service(&self, _: ()) -> Self::Future { - ok(KeepAliveService::new( - self.ka, - self.time.timer(), - self.f.clone(), - )) - } -} - -pub struct KeepAliveService { - f: F, - ka: Duration, - time: LowResTimeService, - delay: Pin>, - expire: Instant, - _t: PhantomData<(R, E)>, -} - -impl KeepAliveService -where - F: Fn() -> E, -{ - pub fn new(ka: Duration, time: LowResTimeService, f: F) -> Self { - let expire = Instant::from_std(time.now() + ka); - KeepAliveService { - f, - ka, - time, - expire, - delay: Box::pin(sleep_until(expire)), - _t: PhantomData, - } - } -} - -impl Service for KeepAliveService -where - F: Fn() -> E, -{ - type Request = R; - type Response = R; - type Error = E; - type Future = Ready>; - - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { - match self.delay.as_mut().poll(cx) { - Poll::Ready(_) => { - let now = Instant::from_std(self.time.now()); - if self.expire <= now { - Poll::Ready(Err((self.f)())) - } else { - self.delay.as_mut().reset(self.expire); - let _ = Pin::new(&mut self.delay).poll(cx); - Poll::Ready(Ok(())) - } - } - Poll::Pending => Poll::Ready(Ok(())), - } - } - - fn call(&mut self, req: R) -> Self::Future { - self.expire = Instant::from_std(self.time.now() + self.ka); - ok(req) - } -} diff --git a/actix-utils/src/lib.rs b/actix-utils/src/lib.rs index 19df225b..4c4f019c 100644 --- a/actix-utils/src/lib.rs +++ b/actix-utils/src/lib.rs @@ -5,16 +5,8 @@ #![doc(html_logo_url = "https://actix.rs/img/logo.png")] #![doc(html_favicon_url = "https://actix.rs/favicon.ico")] -pub mod condition; pub mod counter; pub mod dispatcher; -pub mod either; -pub mod inflight; -pub mod keepalive; pub mod mpsc; -pub mod oneshot; -pub mod order; -pub mod stream; pub mod task; -pub mod time; pub mod timeout; diff --git a/actix-utils/src/mpsc.rs b/actix-utils/src/mpsc.rs index 5905e123..2299dedb 100644 --- a/actix-utils/src/mpsc.rs +++ b/actix-utils/src/mpsc.rs @@ -1,15 +1,16 @@ //! A multi-producer, single-consumer, futures-aware, FIFO queue. -use std::any::Any; -use std::cell::RefCell; +use core::any::Any; +use core::cell::RefCell; +use core::fmt; +use core::pin::Pin; +use core::task::{Context, Poll}; + use std::collections::VecDeque; use std::error::Error; -use std::fmt; -use std::pin::Pin; use std::rc::Rc; -use std::task::{Context, Poll}; +use futures_core::stream::Stream; use futures_sink::Sink; -use futures_util::stream::Stream; use crate::task::LocalWaker; diff --git a/actix-utils/src/oneshot.rs b/actix-utils/src/oneshot.rs deleted file mode 100644 index e75fad60..00000000 --- a/actix-utils/src/oneshot.rs +++ /dev/null @@ -1,316 +0,0 @@ -//! A one-shot, futures-aware channel. -use std::cell::RefCell; -use std::future::Future; -use std::pin::Pin; -use std::rc::Rc; -use std::task::{Context, Poll}; - -pub use futures_channel::oneshot::Canceled; -use slab::Slab; - -use crate::task::LocalWaker; - -/// Creates a new futures-aware, one-shot channel. -pub fn channel() -> (Sender, Receiver) { - let inner = Rc::new(RefCell::new(Inner { - value: None, - rx_task: LocalWaker::new(), - })); - let tx = Sender { - inner: inner.clone(), - }; - let rx = Receiver { inner }; - (tx, rx) -} - -/// Creates a new futures-aware, pool of one-shot's. -pub fn pool() -> Pool { - Pool(Rc::new(RefCell::new(Slab::new()))) -} - -/// Represents the completion half of a oneshot through which the result of a -/// computation is signaled. -#[derive(Debug)] -pub struct Sender { - inner: Rc>>, -} - -/// A future representing the completion of a computation happening elsewhere in -/// memory. -#[derive(Debug)] -#[must_use = "futures do nothing unless polled"] -pub struct Receiver { - inner: Rc>>, -} - -// The channels do not ever project Pin to the inner T -impl Unpin for Receiver {} -impl Unpin for Sender {} - -#[derive(Debug)] -struct Inner { - value: Option, - rx_task: LocalWaker, -} - -impl Sender { - /// Completes this oneshot with a successful result. - /// - /// This function will consume `self` and indicate to the other end, the - /// `Receiver`, that the error provided is the result of the computation this - /// represents. - /// - /// If the value is successfully enqueued for the remote end to receive, - /// then `Ok(())` is returned. If the receiving end was dropped before - /// this function was called, however, then `Err` is returned with the value - /// provided. - pub fn send(self, val: T) -> Result<(), T> { - if Rc::strong_count(&self.inner) == 2 { - let mut inner = self.inner.borrow_mut(); - inner.value = Some(val); - inner.rx_task.wake(); - Ok(()) - } else { - Err(val) - } - } - - /// Tests to see whether this `Sender`'s corresponding `Receiver` - /// has gone away. - pub fn is_canceled(&self) -> bool { - Rc::strong_count(&self.inner) == 1 - } -} - -impl Drop for Sender { - fn drop(&mut self) { - self.inner.borrow().rx_task.wake(); - } -} - -impl Future for Receiver { - type Output = Result; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.get_mut(); - - // If we've got a value, then skip the logic below as we're done. - if let Some(val) = this.inner.borrow_mut().value.take() { - return Poll::Ready(Ok(val)); - } - - // Check if sender is dropped and return error if it is. - if Rc::strong_count(&this.inner) == 1 { - Poll::Ready(Err(Canceled)) - } else { - this.inner.borrow().rx_task.register(cx.waker()); - Poll::Pending - } - } -} - -/// Futures-aware, pool of one-shot's. -pub struct Pool(Rc>>>); - -bitflags::bitflags! { - pub struct Flags: u8 { - const SENDER = 0b0000_0001; - const RECEIVER = 0b0000_0010; - } -} - -#[derive(Debug)] -struct PoolInner { - flags: Flags, - value: Option, - waker: LocalWaker, -} - -impl Pool { - pub fn channel(&mut self) -> (PSender, PReceiver) { - let token = self.0.borrow_mut().insert(PoolInner { - flags: Flags::all(), - value: None, - waker: LocalWaker::default(), - }); - - ( - PSender { - token, - inner: self.0.clone(), - }, - PReceiver { - token, - inner: self.0.clone(), - }, - ) - } -} - -impl Clone for Pool { - fn clone(&self) -> Self { - Pool(self.0.clone()) - } -} - -/// Represents the completion half of a oneshot through which the result of a -/// computation is signaled. -#[derive(Debug)] -pub struct PSender { - token: usize, - inner: Rc>>>, -} - -/// A future representing the completion of a computation happening elsewhere in -/// memory. -#[derive(Debug)] -#[must_use = "futures do nothing unless polled"] -pub struct PReceiver { - token: usize, - inner: Rc>>>, -} - -// The one-shots do not ever project Pin to the inner T -impl Unpin for PReceiver {} -impl Unpin for PSender {} - -impl PSender { - /// Completes this oneshot with a successful result. - /// - /// This function will consume `self` and indicate to the other end, the - /// `Receiver`, that the error provided is the result of the computation this - /// represents. - /// - /// If the value is successfully enqueued for the remote end to receive, - /// then `Ok(())` is returned. If the receiving end was dropped before - /// this function was called, however, then `Err` is returned with the value - /// provided. - pub fn send(self, val: T) -> Result<(), T> { - let mut inner = self.inner.borrow_mut(); - let inner = unsafe { inner.get_unchecked_mut(self.token) }; - - if inner.flags.contains(Flags::RECEIVER) { - inner.value = Some(val); - inner.waker.wake(); - Ok(()) - } else { - Err(val) - } - } - - /// Tests to see whether this `Sender`'s corresponding `Receiver` - /// has gone away. - pub fn is_canceled(&self) -> bool { - !unsafe { self.inner.borrow().get_unchecked(self.token) } - .flags - .contains(Flags::RECEIVER) - } -} - -impl Drop for PSender { - fn drop(&mut self) { - let mut inner = self.inner.borrow_mut(); - let inner_token = unsafe { inner.get_unchecked_mut(self.token) }; - if inner_token.flags.contains(Flags::RECEIVER) { - inner_token.waker.wake(); - inner_token.flags.remove(Flags::SENDER); - } else { - inner.remove(self.token); - } - } -} - -impl Drop for PReceiver { - fn drop(&mut self) { - let mut inner = self.inner.borrow_mut(); - let inner_token = unsafe { inner.get_unchecked_mut(self.token) }; - if inner_token.flags.contains(Flags::SENDER) { - inner_token.flags.remove(Flags::RECEIVER); - } else { - inner.remove(self.token); - } - } -} - -impl Future for PReceiver { - type Output = Result; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.get_mut(); - let mut inner = this.inner.borrow_mut(); - let inner = unsafe { inner.get_unchecked_mut(this.token) }; - - // If we've got a value, then skip the logic below as we're done. - if let Some(val) = inner.value.take() { - return Poll::Ready(Ok(val)); - } - - // Check if sender is dropped and return error if it is. - if !inner.flags.contains(Flags::SENDER) { - Poll::Ready(Err(Canceled)) - } else { - inner.waker.register(cx.waker()); - Poll::Pending - } - } -} - -#[cfg(test)] -mod tests { - use super::*; - use futures_util::future::lazy; - - #[actix_rt::test] - async fn test_oneshot() { - let (tx, rx) = channel(); - tx.send("test").unwrap(); - assert_eq!(rx.await.unwrap(), "test"); - - let (tx, rx) = channel(); - assert!(!tx.is_canceled()); - drop(rx); - assert!(tx.is_canceled()); - assert!(tx.send("test").is_err()); - - let (tx, rx) = channel::<&'static str>(); - drop(tx); - assert!(rx.await.is_err()); - - let (tx, mut rx) = channel::<&'static str>(); - assert_eq!(lazy(|cx| Pin::new(&mut rx).poll(cx)).await, Poll::Pending); - tx.send("test").unwrap(); - assert_eq!(rx.await.unwrap(), "test"); - - let (tx, mut rx) = channel::<&'static str>(); - assert_eq!(lazy(|cx| Pin::new(&mut rx).poll(cx)).await, Poll::Pending); - drop(tx); - assert!(rx.await.is_err()); - } - - #[actix_rt::test] - async fn test_pool() { - let (tx, rx) = pool().channel(); - tx.send("test").unwrap(); - assert_eq!(rx.await.unwrap(), "test"); - - let (tx, rx) = pool().channel(); - assert!(!tx.is_canceled()); - drop(rx); - assert!(tx.is_canceled()); - assert!(tx.send("test").is_err()); - - let (tx, rx) = pool::<&'static str>().channel(); - drop(tx); - assert!(rx.await.is_err()); - - let (tx, mut rx) = pool::<&'static str>().channel(); - assert_eq!(lazy(|cx| Pin::new(&mut rx).poll(cx)).await, Poll::Pending); - tx.send("test").unwrap(); - assert_eq!(rx.await.unwrap(), "test"); - - let (tx, mut rx) = pool::<&'static str>().channel(); - assert_eq!(lazy(|cx| Pin::new(&mut rx).poll(cx)).await, Poll::Pending); - drop(tx); - assert!(rx.await.is_err()); - } -} diff --git a/actix-utils/src/order.rs b/actix-utils/src/order.rs deleted file mode 100644 index 6d79ae4c..00000000 --- a/actix-utils/src/order.rs +++ /dev/null @@ -1,283 +0,0 @@ -use std::collections::VecDeque; -use std::convert::Infallible; -use std::fmt; -use std::future::Future; -use std::marker::PhantomData; -use std::pin::Pin; -use std::rc::Rc; -use std::task::{Context, Poll}; - -use actix_service::{IntoService, Service, Transform}; -use futures_util::future::{ok, Ready}; - -use crate::oneshot; -use crate::task::LocalWaker; - -struct Record { - rx: oneshot::Receiver>, - tx: oneshot::Sender>, -} - -/// Timeout error -pub enum InOrderError { - /// Service error - Service(E), - /// Service call dropped - Disconnected, -} - -impl From for InOrderError { - fn from(err: E) -> Self { - InOrderError::Service(err) - } -} - -impl fmt::Debug for InOrderError { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - InOrderError::Service(e) => write!(f, "InOrderError::Service({:?})", e), - InOrderError::Disconnected => write!(f, "InOrderError::Disconnected"), - } - } -} - -impl fmt::Display for InOrderError { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - InOrderError::Service(e) => e.fmt(f), - InOrderError::Disconnected => write!(f, "InOrder service disconnected"), - } - } -} - -/// InOrder - The service will yield responses as they become available, -/// in the order that their originating requests were submitted to the service. -pub struct InOrder { - _t: PhantomData, -} - -impl InOrder -where - S: Service, - S::Response: 'static, - S::Future: 'static, - S::Error: 'static, -{ - pub fn new() -> Self { - Self { _t: PhantomData } - } - - pub fn service(service: S) -> InOrderService { - InOrderService::new(service) - } -} - -impl Default for InOrder -where - S: Service, - S::Response: 'static, - S::Future: 'static, - S::Error: 'static, -{ - fn default() -> Self { - Self::new() - } -} - -impl Transform for InOrder -where - S: Service, - S::Response: 'static, - S::Future: 'static, - S::Error: 'static, -{ - type Request = S::Request; - type Response = S::Response; - type Error = InOrderError; - type Transform = InOrderService; - type InitError = Infallible; - type Future = Ready>; - - fn new_transform(&self, service: S) -> Self::Future { - ok(InOrderService::new(service)) - } -} - -pub struct InOrderService { - service: S, - waker: Rc, - acks: VecDeque>, -} - -impl InOrderService -where - S: Service, - S::Response: 'static, - S::Future: 'static, - S::Error: 'static, -{ - pub fn new(service: U) -> Self - where - U: IntoService, - { - Self { - service: service.into_service(), - acks: VecDeque::new(), - waker: Rc::new(LocalWaker::new()), - } - } -} - -impl Service for InOrderService -where - S: Service, - S::Response: 'static, - S::Future: 'static, - S::Error: 'static, -{ - type Request = S::Request; - type Response = S::Response; - type Error = InOrderError; - type Future = InOrderServiceResponse; - - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { - // poll_ready could be called from different task - self.waker.register(cx.waker()); - - // check acks - while !self.acks.is_empty() { - let rec = self.acks.front_mut().unwrap(); - match Pin::new(&mut rec.rx).poll(cx) { - Poll::Ready(Ok(res)) => { - let rec = self.acks.pop_front().unwrap(); - let _ = rec.tx.send(res); - } - Poll::Pending => break, - Poll::Ready(Err(oneshot::Canceled)) => { - return Poll::Ready(Err(InOrderError::Disconnected)) - } - } - } - - // check nested service - if self - .service - .poll_ready(cx) - .map_err(InOrderError::Service)? - .is_pending() - { - Poll::Pending - } else { - Poll::Ready(Ok(())) - } - } - - fn call(&mut self, request: S::Request) -> Self::Future { - let (tx1, rx1) = oneshot::channel(); - let (tx2, rx2) = oneshot::channel(); - self.acks.push_back(Record { rx: rx1, tx: tx2 }); - - let waker = self.waker.clone(); - let fut = self.service.call(request); - actix_rt::spawn(async move { - let res = fut.await; - waker.wake(); - let _ = tx1.send(res); - }); - - InOrderServiceResponse { rx: rx2 } - } -} - -#[doc(hidden)] -pub struct InOrderServiceResponse { - rx: oneshot::Receiver>, -} - -impl Future for InOrderServiceResponse { - type Output = Result>; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - match Pin::new(&mut self.rx).poll(cx) { - Poll::Pending => Poll::Pending, - Poll::Ready(Ok(Ok(res))) => Poll::Ready(Ok(res)), - Poll::Ready(Ok(Err(e))) => Poll::Ready(Err(e.into())), - Poll::Ready(Err(_)) => Poll::Ready(Err(InOrderError::Disconnected)), - } - } -} - -#[cfg(test)] -mod tests { - - use std::task::{Context, Poll}; - use std::time::Duration; - - use super::*; - use actix_service::Service; - use futures_channel::oneshot; - use futures_util::future::{lazy, poll_fn, FutureExt, LocalBoxFuture}; - - struct Srv; - - impl Service for Srv { - type Request = oneshot::Receiver; - type Response = usize; - type Error = (); - type Future = LocalBoxFuture<'static, Result>; - - fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll> { - Poll::Ready(Ok(())) - } - - fn call(&mut self, req: oneshot::Receiver) -> Self::Future { - req.map(|res| res.map_err(|_| ())).boxed_local() - } - } - - #[actix_rt::test] - async fn test_in_order() { - let (tx1, rx1) = oneshot::channel(); - let (tx2, rx2) = oneshot::channel(); - let (tx3, rx3) = oneshot::channel(); - let (tx_stop, rx_stop) = oneshot::channel(); - - let h = std::thread::spawn(move || { - let rx1 = rx1; - let rx2 = rx2; - let rx3 = rx3; - let tx_stop = tx_stop; - actix_rt::System::new("test").block_on(async { - let mut srv = InOrderService::new(Srv); - - let _ = lazy(|cx| srv.poll_ready(cx)).await; - let res1 = srv.call(rx1); - let res2 = srv.call(rx2); - let res3 = srv.call(rx3); - - actix_rt::spawn(async move { - poll_fn(|cx| { - let _ = srv.poll_ready(cx); - Poll::<()>::Pending - }) - .await; - }); - - assert_eq!(res1.await.unwrap(), 1); - assert_eq!(res2.await.unwrap(), 2); - assert_eq!(res3.await.unwrap(), 3); - - let _ = tx_stop.send(()); - actix_rt::System::current().stop(); - }); - }); - - let _ = tx3.send(3); - std::thread::sleep(Duration::from_millis(50)); - let _ = tx2.send(2); - let _ = tx1.send(1); - - let _ = rx_stop.await; - let _ = h.join(); - } -} diff --git a/actix-utils/src/stream.rs b/actix-utils/src/stream.rs deleted file mode 100644 index 72e9e019..00000000 --- a/actix-utils/src/stream.rs +++ /dev/null @@ -1,76 +0,0 @@ -use std::future::Future; -use std::pin::Pin; -use std::task::{Context, Poll}; - -use actix_service::{IntoService, Service}; -use futures_util::{stream::Stream, FutureExt}; - -use crate::mpsc; - -#[pin_project::pin_project] -pub struct Dispatcher -where - S: Stream, - T: Service + 'static, -{ - #[pin] - stream: S, - service: T, - err_rx: mpsc::Receiver, - err_tx: mpsc::Sender, -} - -impl Dispatcher -where - S: Stream, - T: Service + 'static, -{ - pub fn new(stream: S, service: F) -> Self - where - F: IntoService, - { - let (err_tx, err_rx) = mpsc::channel(); - Dispatcher { - err_rx, - err_tx, - stream, - service: service.into_service(), - } - } -} - -impl Future for Dispatcher -where - S: Stream, - T: Service + 'static, -{ - type Output = Result<(), T::Error>; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let mut this = self.as_mut().project(); - - if let Poll::Ready(Some(e)) = Pin::new(&mut this.err_rx).poll_next(cx) { - return Poll::Ready(Err(e)); - } - - loop { - return match this.service.poll_ready(cx)? { - Poll::Ready(_) => match this.stream.poll_next(cx) { - Poll::Ready(Some(item)) => { - let stop = this.err_tx.clone(); - actix_rt::spawn(this.service.call(item).map(move |res| { - if let Err(e) = res { - let _ = stop.send(e); - } - })); - this = self.as_mut().project(); - continue; - } - Poll::Pending => Poll::Pending, - Poll::Ready(None) => Poll::Ready(Ok(())), - }, - Poll::Pending => Poll::Pending, - }; - } - } -} diff --git a/actix-utils/src/task.rs b/actix-utils/src/task.rs index cb32eb8d..8f85f5e4 100644 --- a/actix-utils/src/task.rs +++ b/actix-utils/src/task.rs @@ -1,7 +1,7 @@ -use std::cell::UnsafeCell; -use std::marker::PhantomData; -use std::task::Waker; -use std::{fmt, rc}; +use core::cell::UnsafeCell; +use core::fmt; +use core::marker::PhantomData; +use core::task::Waker; /// A synchronization primitive for task wakeup. /// @@ -23,7 +23,8 @@ use std::{fmt, rc}; #[derive(Default)] pub struct LocalWaker { pub(crate) waker: UnsafeCell>, - _t: PhantomData>, + // mark LocalWaker as a !Send type. + _t: PhantomData<*const ()>, } impl LocalWaker { diff --git a/actix-utils/src/time.rs b/actix-utils/src/time.rs deleted file mode 100644 index e8daf4bf..00000000 --- a/actix-utils/src/time.rs +++ /dev/null @@ -1,225 +0,0 @@ -use std::cell::RefCell; -use std::convert::Infallible; -use std::rc::Rc; -use std::task::{Context, Poll}; -use std::time::{self, Duration, Instant}; - -use actix_rt::time::sleep; -use actix_service::{Service, ServiceFactory}; -use futures_util::future::{ok, ready, FutureExt, Ready}; - -#[derive(Clone, Debug)] -pub struct LowResTime(Rc>); - -#[derive(Debug)] -struct Inner { - resolution: Duration, - current: Option, -} - -impl Inner { - fn new(resolution: Duration) -> Self { - Inner { - resolution, - current: None, - } - } -} - -impl LowResTime { - pub fn with(resolution: Duration) -> LowResTime { - LowResTime(Rc::new(RefCell::new(Inner::new(resolution)))) - } - - pub fn timer(&self) -> LowResTimeService { - LowResTimeService(self.0.clone()) - } -} - -impl Default for LowResTime { - fn default() -> Self { - LowResTime(Rc::new(RefCell::new(Inner::new(Duration::from_secs(1))))) - } -} - -impl ServiceFactory for LowResTime { - type Request = (); - type Response = Instant; - type Error = Infallible; - type InitError = Infallible; - type Config = (); - type Service = LowResTimeService; - type Future = Ready>; - - fn new_service(&self, _: ()) -> Self::Future { - ok(self.timer()) - } -} - -#[derive(Clone, Debug)] -pub struct LowResTimeService(Rc>); - -impl LowResTimeService { - pub fn with(resolution: Duration) -> LowResTimeService { - LowResTimeService(Rc::new(RefCell::new(Inner::new(resolution)))) - } - - /// Get current time. This function has to be called from - /// future's poll method, otherwise it panics. - pub fn now(&self) -> Instant { - let cur = self.0.borrow().current; - if let Some(cur) = cur { - cur - } else { - let now = Instant::now(); - let inner = self.0.clone(); - let interval = { - let mut b = inner.borrow_mut(); - b.current = Some(now); - b.resolution - }; - - actix_rt::spawn(sleep(interval).then(move |_| { - inner.borrow_mut().current.take(); - ready(()) - })); - now - } - } -} - -impl Service for LowResTimeService { - type Request = (); - type Response = Instant; - type Error = Infallible; - type Future = Ready>; - - fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll> { - Poll::Ready(Ok(())) - } - - fn call(&mut self, _: ()) -> Self::Future { - ok(self.now()) - } -} - -#[derive(Clone, Debug)] -pub struct SystemTime(Rc>); - -#[derive(Debug)] -struct SystemTimeInner { - resolution: Duration, - current: Option, -} - -impl SystemTimeInner { - fn new(resolution: Duration) -> Self { - SystemTimeInner { - resolution, - current: None, - } - } -} - -#[derive(Clone, Debug)] -pub struct SystemTimeService(Rc>); - -impl SystemTimeService { - pub fn with(resolution: Duration) -> SystemTimeService { - SystemTimeService(Rc::new(RefCell::new(SystemTimeInner::new(resolution)))) - } - - /// Get current time. This function has to be called from - /// future's poll method, otherwise it panics. - pub fn now(&self) -> time::SystemTime { - let cur = self.0.borrow().current; - if let Some(cur) = cur { - cur - } else { - let now = time::SystemTime::now(); - let inner = self.0.clone(); - let interval = { - let mut b = inner.borrow_mut(); - b.current = Some(now); - b.resolution - }; - - actix_rt::spawn(sleep(interval).then(move |_| { - inner.borrow_mut().current.take(); - ready(()) - })); - now - } - } -} - -#[cfg(test)] -mod tests { - use super::*; - use std::time::{Duration, SystemTime}; - - /// State Under Test: Two calls of `SystemTimeService::now()` return the same value if they are done within resolution interval of `SystemTimeService`. - /// - /// Expected Behavior: Two back-to-back calls of `SystemTimeService::now()` return the same value. - #[actix_rt::test] - async fn system_time_service_time_does_not_immediately_change() { - let resolution = Duration::from_millis(50); - - let time_service = SystemTimeService::with(resolution); - assert_eq!(time_service.now(), time_service.now()); - } - - /// State Under Test: Two calls of `LowResTimeService::now()` return the same value if they are done within resolution interval of `SystemTimeService`. - /// - /// Expected Behavior: Two back-to-back calls of `LowResTimeService::now()` return the same value. - #[actix_rt::test] - async fn low_res_time_service_time_does_not_immediately_change() { - let resolution = Duration::from_millis(50); - let time_service = LowResTimeService::with(resolution); - assert_eq!(time_service.now(), time_service.now()); - } - - /// State Under Test: `SystemTimeService::now()` updates returned value every resolution period. - /// - /// Expected Behavior: Two calls of `LowResTimeService::now()` made in subsequent resolution interval return different values - /// and second value is greater than the first one at least by a resolution interval. - #[actix_rt::test] - async fn system_time_service_time_updates_after_resolution_interval() { - let resolution = Duration::from_millis(100); - let wait_time = Duration::from_millis(300); - - let time_service = SystemTimeService::with(resolution); - - let first_time = time_service - .now() - .duration_since(SystemTime::UNIX_EPOCH) - .unwrap(); - - sleep(wait_time).await; - - let second_time = time_service - .now() - .duration_since(SystemTime::UNIX_EPOCH) - .unwrap(); - - assert!(second_time - first_time >= wait_time); - } - - /// State Under Test: `LowResTimeService::now()` updates returned value every resolution period. - /// - /// Expected Behavior: Two calls of `LowResTimeService::now()` made in subsequent resolution interval return different values - /// and second value is greater than the first one at least by a resolution interval. - #[actix_rt::test] - async fn low_res_time_service_time_updates_after_resolution_interval() { - let resolution = Duration::from_millis(100); - let wait_time = Duration::from_millis(300); - let time_service = LowResTimeService::with(resolution); - - let first_time = time_service.now(); - - sleep(wait_time).await; - - let second_time = time_service.now(); - assert!(second_time - first_time >= wait_time); - } -} diff --git a/actix-utils/src/timeout.rs b/actix-utils/src/timeout.rs index 5c3b6d70..17b851a4 100644 --- a/actix-utils/src/timeout.rs +++ b/actix-utils/src/timeout.rs @@ -2,15 +2,14 @@ //! //! If the response does not complete within the specified timeout, the response //! will be aborted. -use std::future::Future; -use std::marker::PhantomData; -use std::pin::Pin; -use std::task::{Context, Poll}; -use std::{fmt, time}; +use core::future::Future; +use core::marker::PhantomData; +use core::pin::Pin; +use core::task::{Context, Poll}; +use core::{fmt, time}; use actix_rt::time::{sleep, Sleep}; use actix_service::{IntoService, Service, Transform}; -use futures_util::future::{ok, Ready}; /// Applies a timeout to requests. #[derive(Debug)] @@ -87,13 +86,33 @@ where type Error = TimeoutError; type Transform = TimeoutService; type InitError = E; - type Future = Ready>; + type Future = TimeoutFuture; fn new_transform(&self, service: S) -> Self::Future { - ok(TimeoutService { + let service = TimeoutService { service, timeout: self.timeout, - }) + }; + + TimeoutFuture { + service: Some(service), + _err: PhantomData, + } + } +} + +pub struct TimeoutFuture { + service: Option, + _err: PhantomData, +} + +impl Unpin for TimeoutFuture {} + +impl Future for TimeoutFuture { + type Output = Result; + + fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll { + Poll::Ready(Ok(self.get_mut().service.take().unwrap())) } } @@ -140,14 +159,15 @@ where } } -/// `TimeoutService` response future -#[pin_project::pin_project] -#[derive(Debug)] -pub struct TimeoutServiceResponse { - #[pin] - fut: T::Future, - #[pin] - sleep: Sleep, +pin_project_lite::pin_project! { + /// `TimeoutService` response future + #[derive(Debug)] + pub struct TimeoutServiceResponse { + #[pin] + fut: T::Future, + #[pin] + sleep: Sleep, + } } impl Future for TimeoutServiceResponse @@ -160,17 +180,15 @@ where let this = self.project(); // First, try polling the future - match this.fut.poll(cx) { - Poll::Ready(Ok(v)) => return Poll::Ready(Ok(v)), - Poll::Ready(Err(e)) => return Poll::Ready(Err(TimeoutError::Service(e))), - Poll::Pending => {} + if let Poll::Ready(res) = this.fut.poll(cx) { + return match res { + Ok(v) => Poll::Ready(Ok(v)), + Err(e) => Poll::Ready(Err(TimeoutError::Service(e))), + }; } // Now check the sleep - match this.sleep.poll(cx) { - Poll::Pending => Poll::Pending, - Poll::Ready(_) => Poll::Ready(Err(TimeoutError::Timeout)), - } + this.sleep.poll(cx).map(|_| Err(TimeoutError::Timeout)) } }