mirror of https://github.com/fafhrd91/actix-net
revert actix-router change.add Router::recognize_checked
This commit is contained in:
parent
6627c6b129
commit
409c63f948
|
@ -1,7 +1,7 @@
|
||||||
# Changes
|
# Changes
|
||||||
|
|
||||||
## Unreleased - 2021-xx-xx
|
## Unreleased - 2021-xx-xx
|
||||||
* `Router::recognize_mut_checked` take `&self` and return `Option<(&T, ResourceId)>` [#247]
|
* Add `Router::recognize_checked` [#247]
|
||||||
|
|
||||||
[#247]: https://github.com/actix/actix-net/pull/247
|
[#247]: https://github.com/actix/actix-net/pull/247
|
||||||
|
|
||||||
|
|
|
@ -45,7 +45,7 @@ impl<T, U> Router<T, U> {
|
||||||
None
|
None
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn recognize_mut_checked<R, P, F>(
|
pub fn recognize_checked<R, P, F>(
|
||||||
&self,
|
&self,
|
||||||
resource: &mut R,
|
resource: &mut R,
|
||||||
check: F,
|
check: F,
|
||||||
|
@ -62,6 +62,24 @@ impl<T, U> Router<T, U> {
|
||||||
}
|
}
|
||||||
None
|
None
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn recognize_mut_checked<R, P, F>(
|
||||||
|
&mut self,
|
||||||
|
resource: &mut R,
|
||||||
|
check: F,
|
||||||
|
) -> Option<(&mut T, ResourceId)>
|
||||||
|
where
|
||||||
|
F: Fn(&R, &Option<U>) -> bool,
|
||||||
|
R: Resource<P>,
|
||||||
|
P: ResourcePath,
|
||||||
|
{
|
||||||
|
for item in self.0.iter_mut() {
|
||||||
|
if item.0.match_path_checked(resource, &check, &item.2) {
|
||||||
|
return Some((&mut item.1, ResourceId(item.0.id())));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
None
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct RouterBuilder<T, U = ()> {
|
pub struct RouterBuilder<T, U = ()> {
|
||||||
|
|
|
@ -24,7 +24,7 @@ default = []
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
actix-codec = "0.4.0-beta.1"
|
actix-codec = "0.4.0-beta.1"
|
||||||
actix-rt = "2.0.0-beta.2"
|
actix-rt = { version = "2.0.0-beta.2", default-features = false }
|
||||||
actix-service = "2.0.0-beta.3"
|
actix-service = "2.0.0-beta.3"
|
||||||
actix-utils = "3.0.0-beta.1"
|
actix-utils = "3.0.0-beta.1"
|
||||||
|
|
||||||
|
@ -36,6 +36,7 @@ slab = "0.4"
|
||||||
tokio = { version = "1", features = ["sync"] }
|
tokio = { version = "1", features = ["sync"] }
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
|
actix-rt = "2.0.0-beta.2"
|
||||||
bytes = "1"
|
bytes = "1"
|
||||||
env_logger = "0.8"
|
env_logger = "0.8"
|
||||||
futures-util = { version = "0.3.7", default-features = false, features = ["sink"] }
|
futures-util = { version = "0.3.7", default-features = false, features = ["sink"] }
|
||||||
|
|
|
@ -6,9 +6,9 @@ use std::task::{Context, Poll};
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
use actix_rt::time::{sleep_until, Instant, Sleep};
|
use actix_rt::time::{sleep_until, Instant, Sleep};
|
||||||
use actix_rt::{spawn, Arbiter};
|
|
||||||
use actix_utils::counter::Counter;
|
use actix_utils::counter::Counter;
|
||||||
use futures_core::future::LocalBoxFuture;
|
use futures_core::future::LocalBoxFuture;
|
||||||
|
use futures_core::ready;
|
||||||
use log::{error, info, trace};
|
use log::{error, info, trace};
|
||||||
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
|
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
|
||||||
use tokio::sync::oneshot;
|
use tokio::sync::oneshot;
|
||||||
|
@ -172,7 +172,12 @@ impl Worker {
|
||||||
let avail = availability.clone();
|
let avail = availability.clone();
|
||||||
|
|
||||||
// every worker runs in it's own arbiter.
|
// every worker runs in it's own arbiter.
|
||||||
Arbiter::new().send(Box::pin(async move {
|
std::thread::spawn(move || {
|
||||||
|
tokio::runtime::Builder::new_current_thread()
|
||||||
|
.enable_all()
|
||||||
|
.build()
|
||||||
|
.unwrap()
|
||||||
|
.block_on(tokio::task::LocalSet::new().run_until(async {
|
||||||
availability.set(false);
|
availability.set(false);
|
||||||
let mut wrk = MAX_CONNS_COUNTER.with(move |conns| Worker {
|
let mut wrk = MAX_CONNS_COUNTER.with(move |conns| Worker {
|
||||||
rx,
|
rx,
|
||||||
|
@ -192,36 +197,38 @@ impl Worker {
|
||||||
.map(|(idx, factory)| {
|
.map(|(idx, factory)| {
|
||||||
let fut = factory.create();
|
let fut = factory.create();
|
||||||
async move {
|
async move {
|
||||||
fut.await.map(|r| {
|
fut.await.map(|res| {
|
||||||
r.into_iter().map(|(t, s)| (idx, t, s)).collect::<Vec<_>>()
|
res.into_iter()
|
||||||
|
.map(|(t, s)| (idx, t, s))
|
||||||
|
.collect::<Vec<_>>()
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
.collect::<Vec<_>>();
|
.collect::<Vec<_>>();
|
||||||
|
|
||||||
spawn(async move {
|
let res = join_all(fut)
|
||||||
let res: Result<Vec<_>, _> = join_all(fut).await.into_iter().collect();
|
.await
|
||||||
|
.into_iter()
|
||||||
|
.collect::<Result<Vec<_>, _>>();
|
||||||
|
|
||||||
match res {
|
match res {
|
||||||
Ok(services) => {
|
Ok(res) => {
|
||||||
for item in services {
|
res.into_iter()
|
||||||
for (factory, token, service) in item {
|
.flatten()
|
||||||
|
.for_each(|(factory, token, service)| {
|
||||||
assert_eq!(token.0, wrk.services.len());
|
assert_eq!(token.0, wrk.services.len());
|
||||||
wrk.services.push(WorkerService {
|
wrk.services.push(WorkerService {
|
||||||
factory,
|
factory,
|
||||||
service,
|
service,
|
||||||
status: WorkerServiceStatus::Unavailable,
|
status: WorkerServiceStatus::Unavailable,
|
||||||
});
|
});
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
error!("Can not start worker: {:?}", e);
|
|
||||||
Arbiter::current().stop();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
wrk.await
|
|
||||||
});
|
});
|
||||||
|
wrk.await
|
||||||
|
}
|
||||||
|
Err(e) => error!("Can not start worker: {:?}", e),
|
||||||
|
}
|
||||||
}));
|
}));
|
||||||
|
});
|
||||||
|
|
||||||
WorkerHandle::new(idx, tx1, tx2, avail)
|
WorkerHandle::new(idx, tx1, tx2, avail)
|
||||||
}
|
}
|
||||||
|
@ -360,8 +367,8 @@ impl Future for Worker {
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
WorkerState::Restarting(idx, token, ref mut fut) => {
|
WorkerState::Restarting(idx, token, ref mut fut) => {
|
||||||
match fut.as_mut().poll(cx) {
|
match ready!(fut.as_mut().poll(cx)) {
|
||||||
Poll::Ready(Ok(item)) => {
|
Ok(item) => {
|
||||||
// only interest in the first item?
|
// only interest in the first item?
|
||||||
if let Some((token, service)) = item.into_iter().next() {
|
if let Some((token, service)) = item.into_iter().next() {
|
||||||
trace!(
|
trace!(
|
||||||
|
@ -373,13 +380,12 @@ impl Future for Worker {
|
||||||
return self.poll(cx);
|
return self.poll(cx);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Poll::Ready(Err(_)) => {
|
Err(_) => {
|
||||||
panic!(
|
panic!(
|
||||||
"Can not restart {:?} service",
|
"Can not restart {:?} service",
|
||||||
self.factories[idx].name(token)
|
self.factories[idx].name(token)
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
Poll::Pending => return Poll::Pending,
|
|
||||||
}
|
}
|
||||||
self.poll(cx)
|
self.poll(cx)
|
||||||
}
|
}
|
||||||
|
@ -387,7 +393,6 @@ impl Future for Worker {
|
||||||
let num = num_connections();
|
let num = num_connections();
|
||||||
if num == 0 {
|
if num == 0 {
|
||||||
let _ = tx.take().unwrap().send(true);
|
let _ = tx.take().unwrap().send(true);
|
||||||
Arbiter::current().stop();
|
|
||||||
return Poll::Ready(());
|
return Poll::Ready(());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -395,7 +400,6 @@ impl Future for Worker {
|
||||||
if Pin::new(t2).poll(cx).is_ready() {
|
if Pin::new(t2).poll(cx).is_ready() {
|
||||||
let _ = tx.take().unwrap().send(false);
|
let _ = tx.take().unwrap().send(false);
|
||||||
self.shutdown(true);
|
self.shutdown(true);
|
||||||
Arbiter::current().stop();
|
|
||||||
return Poll::Ready(());
|
return Poll::Ready(());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -430,16 +434,15 @@ impl Future for Worker {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
match Pin::new(&mut self.rx).poll_recv(cx) {
|
match ready!(Pin::new(&mut self.rx).poll_recv(cx)) {
|
||||||
// handle incoming io stream
|
// handle incoming io stream
|
||||||
Poll::Ready(Some(WorkerCommand(msg))) => {
|
Some(WorkerCommand(msg)) => {
|
||||||
let guard = self.conns.get();
|
let guard = self.conns.get();
|
||||||
let _ = self.services[msg.token.0]
|
let _ = self.services[msg.token.0]
|
||||||
.service
|
.service
|
||||||
.call((Some(guard), msg.io));
|
.call((Some(guard), msg.io));
|
||||||
}
|
}
|
||||||
Poll::Pending => return Poll::Pending,
|
None => return Poll::Ready(()),
|
||||||
Poll::Ready(None) => return Poll::Ready(()),
|
|
||||||
};
|
};
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
|
@ -45,7 +45,7 @@ uri = ["http"]
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
actix-codec = "0.4.0-beta.1"
|
actix-codec = "0.4.0-beta.1"
|
||||||
actix-rt = "2.0.0-beta.2"
|
actix-rt = { version = "2.0.0-beta.2", default-features = false }
|
||||||
actix-service = "2.0.0-beta.3"
|
actix-service = "2.0.0-beta.3"
|
||||||
actix-utils = "3.0.0-beta.1"
|
actix-utils = "3.0.0-beta.1"
|
||||||
|
|
||||||
|
|
|
@ -17,7 +17,7 @@ path = "src/lib.rs"
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
actix-codec = "0.4.0-beta.1"
|
actix-codec = "0.4.0-beta.1"
|
||||||
actix-rt = "2.0.0-beta.2"
|
actix-rt = { version = "2.0.0-beta.2", default-features = false }
|
||||||
actix-service = "2.0.0-beta.3"
|
actix-service = "2.0.0-beta.3"
|
||||||
|
|
||||||
futures-core = { version = "0.3.7", default-features = false }
|
futures-core = { version = "0.3.7", default-features = false }
|
||||||
|
|
Loading…
Reference in New Issue