From aa8d33a22be0266e7ea1b17633179ddef8a150b1 Mon Sep 17 00:00:00 2001 From: fakeshadow <24548779@qq.com> Date: Wed, 23 Dec 2020 06:54:25 +0800 Subject: [PATCH] start migrate to tokio 1.0 --- .github/workflows/linux.yml | 2 +- Cargo.toml | 4 ++ actix-codec/CHANGES.md | 5 +- actix-codec/Cargo.toml | 6 +- actix-codec/src/bcodec.rs | 2 +- actix-connect/Cargo.toml | 2 +- actix-rt/Cargo.toml | 4 +- actix-server/Cargo.toml | 4 +- actix-server/src/signals.rs | 104 ++++++++++++++++++++-------------- actix-server/src/worker.rs | 16 ++++-- actix-service/src/pipeline.rs | 52 +++++++++-------- actix-tls/Cargo.toml | 2 +- actix-utils/Cargo.toml | 2 +- actix-utils/src/keepalive.rs | 10 ++-- actix-utils/src/timeout.rs | 7 ++- string/Cargo.toml | 2 +- 16 files changed, 128 insertions(+), 96 deletions(-) diff --git a/.github/workflows/linux.yml b/.github/workflows/linux.yml index 8a5e19d5..4394457a 100644 --- a/.github/workflows/linux.yml +++ b/.github/workflows/linux.yml @@ -14,7 +14,7 @@ jobs: fail-fast: false matrix: version: - - 1.43.0 + - 1.44.0 - stable - nightly diff --git a/Cargo.toml b/Cargo.toml index f032478a..63941d7d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,3 +29,7 @@ actix-tracing = { path = "actix-tracing" } actix-utils = { path = "actix-utils" } actix-router = { path = "router" } bytestring = { path = "string" } + +bytes = { git = "https://github.com/tokio-rs/bytes.git" } +tokio = { git = "https://github.com/tokio-rs/tokio.git" } +tokio-util = { git = "https://github.com/tokio-rs/tokio.git" } \ No newline at end of file diff --git a/actix-codec/CHANGES.md b/actix-codec/CHANGES.md index 3182f737..a9d6772c 100644 --- a/actix-codec/CHANGES.md +++ b/actix-codec/CHANGES.md @@ -2,8 +2,9 @@ ## Unreleased - 2020-xx-xx * Upgrade `pin-project` to `1.0`. -* Update `tokio` dependency to 0.3.1. -* Update `tokio-util` dependency to 0.5.1. +* Upgrade `tokio` dependency to `1`. +* Upgrade `tokio-util` dependency to `0.6`. +* Upgrade `bytes` dependency to `1`. ## 0.3.0 - 2020-08-23 * No changes from beta 2. diff --git a/actix-codec/Cargo.toml b/actix-codec/Cargo.toml index 97806b74..88ad4196 100644 --- a/actix-codec/Cargo.toml +++ b/actix-codec/Cargo.toml @@ -17,10 +17,10 @@ path = "src/lib.rs" [dependencies] bitflags = "1.2.1" -bytes = "0.6" +bytes = "1" futures-core = { version = "0.3.4", default-features = false } futures-sink = { version = "0.3.4", default-features = false } log = "0.4" pin-project = "1.0.0" -tokio = "0.3.1" -tokio-util = { version = "0.5.1", features = ["codec", "io"] } +tokio = "1" +tokio-util = { version = "0.6", features = ["codec", "io"] } diff --git a/actix-codec/src/bcodec.rs b/actix-codec/src/bcodec.rs index 045b20a2..b06279ea 100644 --- a/actix-codec/src/bcodec.rs +++ b/actix-codec/src/bcodec.rs @@ -14,7 +14,7 @@ impl Encoder for BytesCodec { #[inline] fn encode(&mut self, item: Bytes, dst: &mut BytesMut) -> Result<(), Self::Error> { - dst.extend_from_slice(item.bytes()); + dst.extend_from_slice(item.chunk()); Ok(()) } } diff --git a/actix-connect/Cargo.toml b/actix-connect/Cargo.toml index a93722b1..ca57c1b7 100644 --- a/actix-connect/Cargo.toml +++ b/actix-connect/Cargo.toml @@ -56,6 +56,6 @@ tokio-rustls = { version = "0.20.0", optional = true } webpki = { version = "0.21", optional = true } [dev-dependencies] -bytes = "0.6" +bytes = "1" actix-testing = "1.0.0" futures-util = { version = "0.3.4", default-features = false, features = ["sink"] } diff --git a/actix-rt/Cargo.toml b/actix-rt/Cargo.toml index 1918c17c..676b7358 100644 --- a/actix-rt/Cargo.toml +++ b/actix-rt/Cargo.toml @@ -21,7 +21,7 @@ copyless = "0.1.4" futures-channel = "0.3.4" futures-util = { version = "0.3.4", default-features = false, features = ["alloc"] } smallvec = "1" -tokio = { version = "0.3.1", features = ["rt", "net", "signal", "stream", "time"] } +tokio = { version = "1", features = ["rt", "net", "signal", "time"] } [dev-dependencies] -tokio = { version = "0.3.1", features = ["full"] } +tokio = { version = "1", features = ["full"] } diff --git a/actix-server/Cargo.toml b/actix-server/Cargo.toml index 1e47f92e..0cd46dc3 100644 --- a/actix-server/Cargo.toml +++ b/actix-server/Cargo.toml @@ -35,7 +35,7 @@ slab = "0.4" [dev-dependencies] actix-testing = "1.0.0" -bytes = "0.6" +bytes = "1" env_logger = "0.7" futures-util = { version = "0.3.4", default-features = false, features = ["sink"] } -tokio = { version = "0.3.1", features = ["full"] } +tokio = { version = "1", features = ["full"] } diff --git a/actix-server/src/signals.rs b/actix-server/src/signals.rs index 6a0d7da9..7a85b9a8 100644 --- a/actix-server/src/signals.rs +++ b/actix-server/src/signals.rs @@ -1,6 +1,6 @@ -use std::future::Future; -use std::pin::Pin; -use std::task::{Context, Poll}; +// use std::future::Future; +// use std::pin::Pin; +// use std::task::{Context, Poll}; use crate::server::Server; @@ -19,11 +19,11 @@ pub(crate) enum Signal { } pub(crate) struct Signals { - srv: Server, - #[cfg(not(unix))] - stream: Pin>>>, - #[cfg(unix)] - streams: Vec<(Signal, actix_rt::signal::unix::Signal)>, + // srv: Server, +// #[cfg(not(unix))] +// stream: Pin>>>, +// #[cfg(unix)] +// streams: Vec<(Signal, actix_rt::signal::unix::Signal)>, } impl Signals { @@ -39,8 +39,6 @@ impl Signals { { use actix_rt::signal::unix; - let mut streams = Vec::new(); - let sig_map = [ (unix::SignalKind::interrupt(), Signal::Int), (unix::SignalKind::hangup(), Signal::Hup), @@ -50,7 +48,14 @@ impl Signals { for (kind, sig) in sig_map.iter() { match unix::signal(*kind) { - Ok(stream) => streams.push((*sig, stream)), + Ok(mut stream) => { + let sig = *sig; + let srv = srv.clone(); + actix_rt::spawn(async move { + stream.recv().await; + srv.signal(sig); + }); + } Err(e) => log::error!( "Can not initialize stream handler for {:?} err: {}", sig, @@ -59,40 +64,51 @@ impl Signals { } } - actix_rt::spawn(Signals { srv, streams }); + // for (kind, sig) in sig_map.iter() { + // match unix::signal(*kind) { + // Ok(stream) => streams.push((*sig, stream)), + // Err(e) => log::error!( + // "Can not initialize stream handler for {:?} err: {}", + // sig, + // e + // ), + // } + // } + // + // actix_rt::spawn(Signals { srv, streams }); } } } -impl Future for Signals { - type Output = (); - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - #[cfg(not(unix))] - match Pin::new(&mut self.stream).poll(cx) { - Poll::Ready(_) => { - self.srv.signal(Signal::Int); - Poll::Ready(()) - } - Poll::Pending => Poll::Pending, - } - #[cfg(unix)] - { - use futures_util::stream::Stream; - - for idx in 0..self.streams.len() { - loop { - match Pin::new(&mut self.streams[idx].1).poll_next(cx) { - Poll::Ready(None) => return Poll::Ready(()), - Poll::Pending => break, - Poll::Ready(Some(_)) => { - let sig = self.streams[idx].0; - self.srv.signal(sig); - } - } - } - } - Poll::Pending - } - } -} +// impl Future for Signals { +// type Output = (); +// +// fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { +// #[cfg(not(unix))] +// match Pin::new(&mut self.stream).poll(cx) { +// Poll::Ready(_) => { +// self.srv.signal(Signal::Int); +// Poll::Ready(()) +// } +// Poll::Pending => Poll::Pending, +// } +// #[cfg(unix)] +// { +// use futures_util::stream::Stream; +// +// for idx in 0..self.streams.len() { +// loop { +// match Pin::new(&mut self.streams[idx].1).poll_next(cx) { +// Poll::Ready(None) => return Poll::Ready(()), +// Poll::Pending => break, +// Poll::Ready(Some(_)) => { +// let sig = self.streams[idx].0; +// self.srv.signal(sig); +// } +// } +// } +// } +// Poll::Pending +// } +// } +// } diff --git a/actix-server/src/worker.rs b/actix-server/src/worker.rs index 298c8048..a235d076 100644 --- a/actix-server/src/worker.rs +++ b/actix-server/src/worker.rs @@ -299,7 +299,11 @@ enum WorkerState { Token, LocalBoxFuture<'static, Result, ()>>, ), - Shutdown(Sleep, Sleep, Option>), + Shutdown( + Pin>, + Pin>, + Option>, + ), } impl Future for Worker { @@ -322,8 +326,8 @@ impl Future for Worker { if num != 0 { info!("Graceful worker shutdown, {} connections", num); self.state = WorkerState::Shutdown( - sleep_until(Instant::now() + Duration::from_secs(1)), - sleep_until(Instant::now() + self.shutdown_timeout), + Box::pin(sleep_until(Instant::now() + Duration::from_secs(1))), + Box::pin(sleep_until(Instant::now() + self.shutdown_timeout)), Some(result), ); } else { @@ -398,9 +402,9 @@ impl Future for Worker { } // sleep for 1 second and then check again - if Pin::new(&mut *t1).poll(cx).is_ready() { - *t1 = sleep_until(Instant::now() + Duration::from_secs(1)); - let _ = Pin::new(t1).poll(cx); + if t1.as_mut().poll(cx).is_ready() { + *t1 = Box::pin(sleep_until(Instant::now() + Duration::from_secs(1))); + let _ = t1.as_mut().poll(cx); } Poll::Pending diff --git a/actix-service/src/pipeline.rs b/actix-service/src/pipeline.rs index 3951809d..75cd6af9 100644 --- a/actix-service/src/pipeline.rs +++ b/actix-service/src/pipeline.rs @@ -183,14 +183,17 @@ impl PipelineFactory { factory: F, ) -> PipelineFactory< impl ServiceFactory< - Request = T::Request, - Response = U::Response, - Error = T::Error, - Config = T::Config, - InitError = T::InitError, - Service = impl Service - + Clone, - > + Clone, + Request = T::Request, + Response = U::Response, + Error = T::Error, + Config = T::Config, + InitError = T::InitError, + Service = impl Service< + Request = T::Request, + Response = U::Response, + Error = T::Error, + > + Clone, + > + Clone, > where Self: Sized, @@ -218,13 +221,13 @@ impl PipelineFactory { f: F, ) -> PipelineFactory< impl ServiceFactory< - Request = T::Request, - Response = Res, - Error = Err, - Config = T::Config, - InitError = T::InitError, - Service = impl Service + Clone, - > + Clone, + Request = T::Request, + Response = Res, + Error = Err, + Config = T::Config, + InitError = T::InitError, + Service = impl Service + Clone, + > + Clone, > where Self: Sized, @@ -251,14 +254,17 @@ impl PipelineFactory { factory: F, ) -> PipelineFactory< impl ServiceFactory< - Request = T::Request, - Response = U::Response, - Error = T::Error, - Config = T::Config, - InitError = T::InitError, - Service = impl Service - + Clone, - > + Clone, + Request = T::Request, + Response = U::Response, + Error = T::Error, + Config = T::Config, + InitError = T::InitError, + Service = impl Service< + Request = T::Request, + Response = U::Response, + Error = T::Error, + > + Clone, + > + Clone, > where Self: Sized, diff --git a/actix-tls/Cargo.toml b/actix-tls/Cargo.toml index b68e135c..7acc2087 100644 --- a/actix-tls/Cargo.toml +++ b/actix-tls/Cargo.toml @@ -56,7 +56,7 @@ native-tls = { version = "0.2", optional = true } tokio-native-tls = { version = "0.2.0", optional = true } [dev-dependencies] -bytes = "0.6" +bytes = "1" log = "0.4" env_logger = "0.7" actix-testing = "1.0.0" diff --git a/actix-utils/Cargo.toml b/actix-utils/Cargo.toml index 92087ed8..4c3bf06a 100644 --- a/actix-utils/Cargo.toml +++ b/actix-utils/Cargo.toml @@ -20,7 +20,7 @@ actix-codec = "0.3.0" actix-rt = "1.1.1" actix-service = "1.0.6" bitflags = "1.2.1" -bytes = "0.6" +bytes = "1" either = "1.5.3" futures-channel = { version = "0.3.4", default-features = false } futures-sink = { version = "0.3.4", default-features = false } diff --git a/actix-utils/src/keepalive.rs b/actix-utils/src/keepalive.rs index 70850cfb..61ea783a 100644 --- a/actix-utils/src/keepalive.rs +++ b/actix-utils/src/keepalive.rs @@ -53,9 +53,9 @@ where type Request = R; type Response = R; type Error = E; - type InitError = Infallible; type Config = (); type Service = KeepAliveService; + type InitError = Infallible; type Future = Ready>; fn new_service(&self, _: ()) -> Self::Future { @@ -71,7 +71,7 @@ pub struct KeepAliveService { f: F, ka: Duration, time: LowResTimeService, - delay: Sleep, + delay: Pin>, expire: Instant, _t: PhantomData<(R, E)>, } @@ -87,7 +87,7 @@ where ka, time, expire, - delay: sleep_until(expire), + delay: Box::pin(sleep_until(expire)), _t: PhantomData, } } @@ -103,13 +103,13 @@ where type Future = Ready>; fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { - match Pin::new(&mut self.delay).poll(cx) { + match self.delay.as_mut().poll(cx) { Poll::Ready(_) => { let now = Instant::from_std(self.time.now()); if self.expire <= now { Poll::Ready(Err((self.f)())) } else { - self.delay.reset(self.expire); + self.delay.as_mut().reset(self.expire); let _ = Pin::new(&mut self.delay).poll(cx); Poll::Ready(Ok(())) } diff --git a/actix-utils/src/timeout.rs b/actix-utils/src/timeout.rs index 2dc73d02..5c3b6d70 100644 --- a/actix-utils/src/timeout.rs +++ b/actix-utils/src/timeout.rs @@ -85,8 +85,8 @@ where type Request = S::Request; type Response = S::Response; type Error = TimeoutError; - type InitError = E; type Transform = TimeoutService; + type InitError = E; type Future = Ready>; fn new_transform(&self, service: S) -> Self::Future { @@ -146,6 +146,7 @@ where pub struct TimeoutServiceResponse { #[pin] fut: T::Future, + #[pin] sleep: Sleep, } @@ -156,7 +157,7 @@ where type Output = Result>; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let mut this = self.project(); + let this = self.project(); // First, try polling the future match this.fut.poll(cx) { @@ -166,7 +167,7 @@ where } // Now check the sleep - match Pin::new(&mut this.sleep).poll(cx) { + match this.sleep.poll(cx) { Poll::Pending => Poll::Pending, Poll::Ready(_) => Poll::Ready(Err(TimeoutError::Timeout)), } diff --git a/string/Cargo.toml b/string/Cargo.toml index 99085072..d3c290c8 100644 --- a/string/Cargo.toml +++ b/string/Cargo.toml @@ -15,7 +15,7 @@ name = "bytestring" path = "src/lib.rs" [dependencies] -bytes = "0.6" +bytes = "1" serde = { version = "1.0", optional = true } [dev-dependencies]