start migrate to tokio 1.0

This commit is contained in:
fakeshadow 2020-12-23 06:54:25 +08:00
parent 7f785edd28
commit aa8d33a22b
16 changed files with 128 additions and 96 deletions

View File

@ -14,7 +14,7 @@ jobs:
fail-fast: false fail-fast: false
matrix: matrix:
version: version:
- 1.43.0 - 1.44.0
- stable - stable
- nightly - nightly

View File

@ -29,3 +29,7 @@ actix-tracing = { path = "actix-tracing" }
actix-utils = { path = "actix-utils" } actix-utils = { path = "actix-utils" }
actix-router = { path = "router" } actix-router = { path = "router" }
bytestring = { path = "string" } bytestring = { path = "string" }
bytes = { git = "https://github.com/tokio-rs/bytes.git" }
tokio = { git = "https://github.com/tokio-rs/tokio.git" }
tokio-util = { git = "https://github.com/tokio-rs/tokio.git" }

View File

@ -2,8 +2,9 @@
## Unreleased - 2020-xx-xx ## Unreleased - 2020-xx-xx
* Upgrade `pin-project` to `1.0`. * Upgrade `pin-project` to `1.0`.
* Update `tokio` dependency to 0.3.1. * Upgrade `tokio` dependency to `1`.
* Update `tokio-util` dependency to 0.5.1. * Upgrade `tokio-util` dependency to `0.6`.
* Upgrade `bytes` dependency to `1`.
## 0.3.0 - 2020-08-23 ## 0.3.0 - 2020-08-23
* No changes from beta 2. * No changes from beta 2.

View File

@ -17,10 +17,10 @@ path = "src/lib.rs"
[dependencies] [dependencies]
bitflags = "1.2.1" bitflags = "1.2.1"
bytes = "0.6" bytes = "1"
futures-core = { version = "0.3.4", default-features = false } futures-core = { version = "0.3.4", default-features = false }
futures-sink = { version = "0.3.4", default-features = false } futures-sink = { version = "0.3.4", default-features = false }
log = "0.4" log = "0.4"
pin-project = "1.0.0" pin-project = "1.0.0"
tokio = "0.3.1" tokio = "1"
tokio-util = { version = "0.5.1", features = ["codec", "io"] } tokio-util = { version = "0.6", features = ["codec", "io"] }

View File

@ -14,7 +14,7 @@ impl Encoder<Bytes> for BytesCodec {
#[inline] #[inline]
fn encode(&mut self, item: Bytes, dst: &mut BytesMut) -> Result<(), Self::Error> { fn encode(&mut self, item: Bytes, dst: &mut BytesMut) -> Result<(), Self::Error> {
dst.extend_from_slice(item.bytes()); dst.extend_from_slice(item.chunk());
Ok(()) Ok(())
} }
} }

View File

@ -56,6 +56,6 @@ tokio-rustls = { version = "0.20.0", optional = true }
webpki = { version = "0.21", optional = true } webpki = { version = "0.21", optional = true }
[dev-dependencies] [dev-dependencies]
bytes = "0.6" bytes = "1"
actix-testing = "1.0.0" actix-testing = "1.0.0"
futures-util = { version = "0.3.4", default-features = false, features = ["sink"] } futures-util = { version = "0.3.4", default-features = false, features = ["sink"] }

View File

@ -21,7 +21,7 @@ copyless = "0.1.4"
futures-channel = "0.3.4" futures-channel = "0.3.4"
futures-util = { version = "0.3.4", default-features = false, features = ["alloc"] } futures-util = { version = "0.3.4", default-features = false, features = ["alloc"] }
smallvec = "1" smallvec = "1"
tokio = { version = "0.3.1", features = ["rt", "net", "signal", "stream", "time"] } tokio = { version = "1", features = ["rt", "net", "signal", "time"] }
[dev-dependencies] [dev-dependencies]
tokio = { version = "0.3.1", features = ["full"] } tokio = { version = "1", features = ["full"] }

View File

@ -35,7 +35,7 @@ slab = "0.4"
[dev-dependencies] [dev-dependencies]
actix-testing = "1.0.0" actix-testing = "1.0.0"
bytes = "0.6" bytes = "1"
env_logger = "0.7" env_logger = "0.7"
futures-util = { version = "0.3.4", default-features = false, features = ["sink"] } futures-util = { version = "0.3.4", default-features = false, features = ["sink"] }
tokio = { version = "0.3.1", features = ["full"] } tokio = { version = "1", features = ["full"] }

View File

@ -1,6 +1,6 @@
use std::future::Future; // use std::future::Future;
use std::pin::Pin; // use std::pin::Pin;
use std::task::{Context, Poll}; // use std::task::{Context, Poll};
use crate::server::Server; use crate::server::Server;
@ -19,11 +19,11 @@ pub(crate) enum Signal {
} }
pub(crate) struct Signals { pub(crate) struct Signals {
srv: Server, // srv: Server,
#[cfg(not(unix))] // #[cfg(not(unix))]
stream: Pin<Box<dyn Future<Output = std::io::Result<()>>>>, // stream: Pin<Box<dyn Future<Output = std::io::Result<()>>>>,
#[cfg(unix)] // #[cfg(unix)]
streams: Vec<(Signal, actix_rt::signal::unix::Signal)>, // streams: Vec<(Signal, actix_rt::signal::unix::Signal)>,
} }
impl Signals { impl Signals {
@ -39,8 +39,6 @@ impl Signals {
{ {
use actix_rt::signal::unix; use actix_rt::signal::unix;
let mut streams = Vec::new();
let sig_map = [ let sig_map = [
(unix::SignalKind::interrupt(), Signal::Int), (unix::SignalKind::interrupt(), Signal::Int),
(unix::SignalKind::hangup(), Signal::Hup), (unix::SignalKind::hangup(), Signal::Hup),
@ -50,7 +48,14 @@ impl Signals {
for (kind, sig) in sig_map.iter() { for (kind, sig) in sig_map.iter() {
match unix::signal(*kind) { match unix::signal(*kind) {
Ok(stream) => streams.push((*sig, stream)), Ok(mut stream) => {
let sig = *sig;
let srv = srv.clone();
actix_rt::spawn(async move {
stream.recv().await;
srv.signal(sig);
});
}
Err(e) => log::error!( Err(e) => log::error!(
"Can not initialize stream handler for {:?} err: {}", "Can not initialize stream handler for {:?} err: {}",
sig, sig,
@ -59,40 +64,51 @@ impl Signals {
} }
} }
actix_rt::spawn(Signals { srv, streams }); // for (kind, sig) in sig_map.iter() {
// match unix::signal(*kind) {
// Ok(stream) => streams.push((*sig, stream)),
// Err(e) => log::error!(
// "Can not initialize stream handler for {:?} err: {}",
// sig,
// e
// ),
// }
// }
//
// actix_rt::spawn(Signals { srv, streams });
} }
} }
} }
impl Future for Signals { // impl Future for Signals {
type Output = (); // type Output = ();
//
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { // fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
#[cfg(not(unix))] // #[cfg(not(unix))]
match Pin::new(&mut self.stream).poll(cx) { // match Pin::new(&mut self.stream).poll(cx) {
Poll::Ready(_) => { // Poll::Ready(_) => {
self.srv.signal(Signal::Int); // self.srv.signal(Signal::Int);
Poll::Ready(()) // Poll::Ready(())
} // }
Poll::Pending => Poll::Pending, // Poll::Pending => Poll::Pending,
} // }
#[cfg(unix)] // #[cfg(unix)]
{ // {
use futures_util::stream::Stream; // use futures_util::stream::Stream;
//
for idx in 0..self.streams.len() { // for idx in 0..self.streams.len() {
loop { // loop {
match Pin::new(&mut self.streams[idx].1).poll_next(cx) { // match Pin::new(&mut self.streams[idx].1).poll_next(cx) {
Poll::Ready(None) => return Poll::Ready(()), // Poll::Ready(None) => return Poll::Ready(()),
Poll::Pending => break, // Poll::Pending => break,
Poll::Ready(Some(_)) => { // Poll::Ready(Some(_)) => {
let sig = self.streams[idx].0; // let sig = self.streams[idx].0;
self.srv.signal(sig); // self.srv.signal(sig);
} // }
} // }
} // }
} // }
Poll::Pending // Poll::Pending
} // }
} // }
} // }

View File

@ -299,7 +299,11 @@ enum WorkerState {
Token, Token,
LocalBoxFuture<'static, Result<Vec<(Token, BoxedServerService)>, ()>>, LocalBoxFuture<'static, Result<Vec<(Token, BoxedServerService)>, ()>>,
), ),
Shutdown(Sleep, Sleep, Option<oneshot::Sender<bool>>), Shutdown(
Pin<Box<Sleep>>,
Pin<Box<Sleep>>,
Option<oneshot::Sender<bool>>,
),
} }
impl Future for Worker { impl Future for Worker {
@ -322,8 +326,8 @@ impl Future for Worker {
if num != 0 { if num != 0 {
info!("Graceful worker shutdown, {} connections", num); info!("Graceful worker shutdown, {} connections", num);
self.state = WorkerState::Shutdown( self.state = WorkerState::Shutdown(
sleep_until(Instant::now() + Duration::from_secs(1)), Box::pin(sleep_until(Instant::now() + Duration::from_secs(1))),
sleep_until(Instant::now() + self.shutdown_timeout), Box::pin(sleep_until(Instant::now() + self.shutdown_timeout)),
Some(result), Some(result),
); );
} else { } else {
@ -398,9 +402,9 @@ impl Future for Worker {
} }
// sleep for 1 second and then check again // sleep for 1 second and then check again
if Pin::new(&mut *t1).poll(cx).is_ready() { if t1.as_mut().poll(cx).is_ready() {
*t1 = sleep_until(Instant::now() + Duration::from_secs(1)); *t1 = Box::pin(sleep_until(Instant::now() + Duration::from_secs(1)));
let _ = Pin::new(t1).poll(cx); let _ = t1.as_mut().poll(cx);
} }
Poll::Pending Poll::Pending

View File

@ -183,14 +183,17 @@ impl<T: ServiceFactory> PipelineFactory<T> {
factory: F, factory: F,
) -> PipelineFactory< ) -> PipelineFactory<
impl ServiceFactory< impl ServiceFactory<
Request = T::Request, Request = T::Request,
Response = U::Response, Response = U::Response,
Error = T::Error, Error = T::Error,
Config = T::Config, Config = T::Config,
InitError = T::InitError, InitError = T::InitError,
Service = impl Service<Request = T::Request, Response = U::Response, Error = T::Error> Service = impl Service<
+ Clone, Request = T::Request,
> + Clone, Response = U::Response,
Error = T::Error,
> + Clone,
> + Clone,
> >
where where
Self: Sized, Self: Sized,
@ -218,13 +221,13 @@ impl<T: ServiceFactory> PipelineFactory<T> {
f: F, f: F,
) -> PipelineFactory< ) -> PipelineFactory<
impl ServiceFactory< impl ServiceFactory<
Request = T::Request, Request = T::Request,
Response = Res, Response = Res,
Error = Err, Error = Err,
Config = T::Config, Config = T::Config,
InitError = T::InitError, InitError = T::InitError,
Service = impl Service<Request = T::Request, Response = Res, Error = Err> + Clone, Service = impl Service<Request = T::Request, Response = Res, Error = Err> + Clone,
> + Clone, > + Clone,
> >
where where
Self: Sized, Self: Sized,
@ -251,14 +254,17 @@ impl<T: ServiceFactory> PipelineFactory<T> {
factory: F, factory: F,
) -> PipelineFactory< ) -> PipelineFactory<
impl ServiceFactory< impl ServiceFactory<
Request = T::Request, Request = T::Request,
Response = U::Response, Response = U::Response,
Error = T::Error, Error = T::Error,
Config = T::Config, Config = T::Config,
InitError = T::InitError, InitError = T::InitError,
Service = impl Service<Request = T::Request, Response = U::Response, Error = T::Error> Service = impl Service<
+ Clone, Request = T::Request,
> + Clone, Response = U::Response,
Error = T::Error,
> + Clone,
> + Clone,
> >
where where
Self: Sized, Self: Sized,

View File

@ -56,7 +56,7 @@ native-tls = { version = "0.2", optional = true }
tokio-native-tls = { version = "0.2.0", optional = true } tokio-native-tls = { version = "0.2.0", optional = true }
[dev-dependencies] [dev-dependencies]
bytes = "0.6" bytes = "1"
log = "0.4" log = "0.4"
env_logger = "0.7" env_logger = "0.7"
actix-testing = "1.0.0" actix-testing = "1.0.0"

View File

@ -20,7 +20,7 @@ actix-codec = "0.3.0"
actix-rt = "1.1.1" actix-rt = "1.1.1"
actix-service = "1.0.6" actix-service = "1.0.6"
bitflags = "1.2.1" bitflags = "1.2.1"
bytes = "0.6" bytes = "1"
either = "1.5.3" either = "1.5.3"
futures-channel = { version = "0.3.4", default-features = false } futures-channel = { version = "0.3.4", default-features = false }
futures-sink = { version = "0.3.4", default-features = false } futures-sink = { version = "0.3.4", default-features = false }

View File

@ -53,9 +53,9 @@ where
type Request = R; type Request = R;
type Response = R; type Response = R;
type Error = E; type Error = E;
type InitError = Infallible;
type Config = (); type Config = ();
type Service = KeepAliveService<R, E, F>; type Service = KeepAliveService<R, E, F>;
type InitError = Infallible;
type Future = Ready<Result<Self::Service, Self::InitError>>; type Future = Ready<Result<Self::Service, Self::InitError>>;
fn new_service(&self, _: ()) -> Self::Future { fn new_service(&self, _: ()) -> Self::Future {
@ -71,7 +71,7 @@ pub struct KeepAliveService<R, E, F> {
f: F, f: F,
ka: Duration, ka: Duration,
time: LowResTimeService, time: LowResTimeService,
delay: Sleep, delay: Pin<Box<Sleep>>,
expire: Instant, expire: Instant,
_t: PhantomData<(R, E)>, _t: PhantomData<(R, E)>,
} }
@ -87,7 +87,7 @@ where
ka, ka,
time, time,
expire, expire,
delay: sleep_until(expire), delay: Box::pin(sleep_until(expire)),
_t: PhantomData, _t: PhantomData,
} }
} }
@ -103,13 +103,13 @@ where
type Future = Ready<Result<R, E>>; type Future = Ready<Result<R, E>>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
match Pin::new(&mut self.delay).poll(cx) { match self.delay.as_mut().poll(cx) {
Poll::Ready(_) => { Poll::Ready(_) => {
let now = Instant::from_std(self.time.now()); let now = Instant::from_std(self.time.now());
if self.expire <= now { if self.expire <= now {
Poll::Ready(Err((self.f)())) Poll::Ready(Err((self.f)()))
} else { } else {
self.delay.reset(self.expire); self.delay.as_mut().reset(self.expire);
let _ = Pin::new(&mut self.delay).poll(cx); let _ = Pin::new(&mut self.delay).poll(cx);
Poll::Ready(Ok(())) Poll::Ready(Ok(()))
} }

View File

@ -85,8 +85,8 @@ where
type Request = S::Request; type Request = S::Request;
type Response = S::Response; type Response = S::Response;
type Error = TimeoutError<S::Error>; type Error = TimeoutError<S::Error>;
type InitError = E;
type Transform = TimeoutService<S>; type Transform = TimeoutService<S>;
type InitError = E;
type Future = Ready<Result<Self::Transform, Self::InitError>>; type Future = Ready<Result<Self::Transform, Self::InitError>>;
fn new_transform(&self, service: S) -> Self::Future { fn new_transform(&self, service: S) -> Self::Future {
@ -146,6 +146,7 @@ where
pub struct TimeoutServiceResponse<T: Service> { pub struct TimeoutServiceResponse<T: Service> {
#[pin] #[pin]
fut: T::Future, fut: T::Future,
#[pin]
sleep: Sleep, sleep: Sleep,
} }
@ -156,7 +157,7 @@ where
type Output = Result<T::Response, TimeoutError<T::Error>>; type Output = Result<T::Response, TimeoutError<T::Error>>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.project(); let this = self.project();
// First, try polling the future // First, try polling the future
match this.fut.poll(cx) { match this.fut.poll(cx) {
@ -166,7 +167,7 @@ where
} }
// Now check the sleep // Now check the sleep
match Pin::new(&mut this.sleep).poll(cx) { match this.sleep.poll(cx) {
Poll::Pending => Poll::Pending, Poll::Pending => Poll::Pending,
Poll::Ready(_) => Poll::Ready(Err(TimeoutError::Timeout)), Poll::Ready(_) => Poll::Ready(Err(TimeoutError::Timeout)),
} }

View File

@ -15,7 +15,7 @@ name = "bytestring"
path = "src/lib.rs" path = "src/lib.rs"
[dependencies] [dependencies]
bytes = "0.6" bytes = "1"
serde = { version = "1.0", optional = true } serde = { version = "1.0", optional = true }
[dev-dependencies] [dev-dependencies]