diff --git a/actix-router/CHANGES.md b/actix-router/CHANGES.md index fefab81c..581243fb 100644 --- a/actix-router/CHANGES.md +++ b/actix-router/CHANGES.md @@ -1,7 +1,7 @@ # Changes ## Unreleased - 2021-xx-xx -* `Router::recognize_mut_checked` take `&self` and return `Option<(&T, ResourceId)>` [#247] +* Add `Router::recognize_checked` [#247] [#247]: https://github.com/actix/actix-net/pull/247 diff --git a/actix-router/src/router.rs b/actix-router/src/router.rs index b6932f9e..aeb1aa36 100644 --- a/actix-router/src/router.rs +++ b/actix-router/src/router.rs @@ -45,7 +45,7 @@ impl Router { None } - pub fn recognize_mut_checked( + pub fn recognize_checked( &self, resource: &mut R, check: F, @@ -62,6 +62,24 @@ impl Router { } None } + + pub fn recognize_mut_checked( + &mut self, + resource: &mut R, + check: F, + ) -> Option<(&mut T, ResourceId)> + where + F: Fn(&R, &Option) -> bool, + R: Resource

, + P: ResourcePath, + { + for item in self.0.iter_mut() { + if item.0.match_path_checked(resource, &check, &item.2) { + return Some((&mut item.1, ResourceId(item.0.id()))); + } + } + None + } } pub struct RouterBuilder { diff --git a/actix-server/Cargo.toml b/actix-server/Cargo.toml index ead85de0..fdffd139 100644 --- a/actix-server/Cargo.toml +++ b/actix-server/Cargo.toml @@ -24,7 +24,7 @@ default = [] [dependencies] actix-codec = "0.4.0-beta.1" -actix-rt = "2.0.0-beta.2" +actix-rt = { version = "2.0.0-beta.2", default-features = false } actix-service = "2.0.0-beta.3" actix-utils = "3.0.0-beta.1" @@ -36,6 +36,7 @@ slab = "0.4" tokio = { version = "1", features = ["sync"] } [dev-dependencies] +actix-rt = "2.0.0-beta.2" bytes = "1" env_logger = "0.8" futures-util = { version = "0.3.7", default-features = false, features = ["sink"] } diff --git a/actix-server/src/worker.rs b/actix-server/src/worker.rs index 91e98fc2..eb18590a 100644 --- a/actix-server/src/worker.rs +++ b/actix-server/src/worker.rs @@ -6,9 +6,9 @@ use std::task::{Context, Poll}; use std::time::Duration; use actix_rt::time::{sleep_until, Instant, Sleep}; -use actix_rt::{spawn, Arbiter}; use actix_utils::counter::Counter; use futures_core::future::LocalBoxFuture; +use futures_core::ready; use log::{error, info, trace}; use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; use tokio::sync::oneshot; @@ -172,56 +172,63 @@ impl Worker { let avail = availability.clone(); // every worker runs in it's own arbiter. - Arbiter::new().send(Box::pin(async move { - availability.set(false); - let mut wrk = MAX_CONNS_COUNTER.with(move |conns| Worker { - rx, - rx2, - availability, - factories, - shutdown_timeout, - services: Vec::new(), - conns: conns.clone(), - state: WorkerState::Unavailable, - }); + std::thread::spawn(move || { + tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap() + .block_on(tokio::task::LocalSet::new().run_until(async { + availability.set(false); + let mut wrk = MAX_CONNS_COUNTER.with(move |conns| Worker { + rx, + rx2, + availability, + factories, + shutdown_timeout, + services: Vec::new(), + conns: conns.clone(), + state: WorkerState::Unavailable, + }); - let fut = wrk - .factories - .iter() - .enumerate() - .map(|(idx, factory)| { - let fut = factory.create(); - async move { - fut.await.map(|r| { - r.into_iter().map(|(t, s)| (idx, t, s)).collect::>() - }) - } - }) - .collect::>(); - - spawn(async move { - let res: Result, _> = join_all(fut).await.into_iter().collect(); - match res { - Ok(services) => { - for item in services { - for (factory, token, service) in item { - assert_eq!(token.0, wrk.services.len()); - wrk.services.push(WorkerService { - factory, - service, - status: WorkerServiceStatus::Unavailable, - }); + let fut = wrk + .factories + .iter() + .enumerate() + .map(|(idx, factory)| { + let fut = factory.create(); + async move { + fut.await.map(|res| { + res.into_iter() + .map(|(t, s)| (idx, t, s)) + .collect::>() + }) } + }) + .collect::>(); + + let res = join_all(fut) + .await + .into_iter() + .collect::, _>>(); + + match res { + Ok(res) => { + res.into_iter() + .flatten() + .for_each(|(factory, token, service)| { + assert_eq!(token.0, wrk.services.len()); + wrk.services.push(WorkerService { + factory, + service, + status: WorkerServiceStatus::Unavailable, + }); + }); + wrk.await } + Err(e) => error!("Can not start worker: {:?}", e), } - Err(e) => { - error!("Can not start worker: {:?}", e); - Arbiter::current().stop(); - } - } - wrk.await - }); - })); + })); + }); WorkerHandle::new(idx, tx1, tx2, avail) } @@ -360,8 +367,8 @@ impl Future for Worker { } }, WorkerState::Restarting(idx, token, ref mut fut) => { - match fut.as_mut().poll(cx) { - Poll::Ready(Ok(item)) => { + match ready!(fut.as_mut().poll(cx)) { + Ok(item) => { // only interest in the first item? if let Some((token, service)) = item.into_iter().next() { trace!( @@ -373,13 +380,12 @@ impl Future for Worker { return self.poll(cx); } } - Poll::Ready(Err(_)) => { + Err(_) => { panic!( "Can not restart {:?} service", self.factories[idx].name(token) ); } - Poll::Pending => return Poll::Pending, } self.poll(cx) } @@ -387,7 +393,6 @@ impl Future for Worker { let num = num_connections(); if num == 0 { let _ = tx.take().unwrap().send(true); - Arbiter::current().stop(); return Poll::Ready(()); } @@ -395,7 +400,6 @@ impl Future for Worker { if Pin::new(t2).poll(cx).is_ready() { let _ = tx.take().unwrap().send(false); self.shutdown(true); - Arbiter::current().stop(); return Poll::Ready(()); } @@ -430,16 +434,15 @@ impl Future for Worker { } } - match Pin::new(&mut self.rx).poll_recv(cx) { + match ready!(Pin::new(&mut self.rx).poll_recv(cx)) { // handle incoming io stream - Poll::Ready(Some(WorkerCommand(msg))) => { + Some(WorkerCommand(msg)) => { let guard = self.conns.get(); let _ = self.services[msg.token.0] .service .call((Some(guard), msg.io)); } - Poll::Pending => return Poll::Pending, - Poll::Ready(None) => return Poll::Ready(()), + None => return Poll::Ready(()), }; }, } diff --git a/actix-tls/Cargo.toml b/actix-tls/Cargo.toml index 8f23d60c..9b51fae5 100644 --- a/actix-tls/Cargo.toml +++ b/actix-tls/Cargo.toml @@ -45,7 +45,7 @@ uri = ["http"] [dependencies] actix-codec = "0.4.0-beta.1" -actix-rt = "2.0.0-beta.2" +actix-rt = { version = "2.0.0-beta.2", default-features = false } actix-service = "2.0.0-beta.3" actix-utils = "3.0.0-beta.1" diff --git a/actix-utils/Cargo.toml b/actix-utils/Cargo.toml index f038414c..c82cf79e 100644 --- a/actix-utils/Cargo.toml +++ b/actix-utils/Cargo.toml @@ -17,7 +17,7 @@ path = "src/lib.rs" [dependencies] actix-codec = "0.4.0-beta.1" -actix-rt = "2.0.0-beta.2" +actix-rt = { version = "2.0.0-beta.2", default-features = false } actix-service = "2.0.0-beta.3" futures-core = { version = "0.3.7", default-features = false }