From a575583a6a3b16877a65ffed55e9c0a27e68cf16 Mon Sep 17 00:00:00 2001 From: fakeshadow <24548779@qq.com> Date: Fri, 4 Dec 2020 08:32:46 +0800 Subject: [PATCH] update tokio-util. --- actix-codec/Cargo.toml | 2 +- actix-codec/src/framed.rs | 11 +++++--- actix-codec/src/lib.rs | 40 ----------------------------- actix-rt/tests/integration_tests.rs | 26 ------------------- actix-server/src/signals.rs | 2 +- 5 files changed, 9 insertions(+), 72 deletions(-) diff --git a/actix-codec/Cargo.toml b/actix-codec/Cargo.toml index e9672bf9..97806b74 100644 --- a/actix-codec/Cargo.toml +++ b/actix-codec/Cargo.toml @@ -23,4 +23,4 @@ futures-sink = { version = "0.3.4", default-features = false } log = "0.4" pin-project = "1.0.0" tokio = "0.3.1" -tokio-util = { version = "0.5.0", default-features = false, features = ["codec"] } +tokio-util = { version = "0.5.1", features = ["codec", "io"] } diff --git a/actix-codec/src/framed.rs b/actix-codec/src/framed.rs index 2eb33b99..3c6447a9 100644 --- a/actix-codec/src/framed.rs +++ b/actix-codec/src/framed.rs @@ -221,13 +221,16 @@ impl Framed { this.read_buf.reserve(HW - remaining) } - // FIXME: Use poll_read_buf from tokio_util - match crate::util::poll_read_buf(this.io, cx, this.read_buf) { + let cnt = match tokio_util::io::poll_read_buf(this.io, cx, this.read_buf) { Poll::Pending => return Poll::Pending, Poll::Ready(Err(e)) => return Poll::Ready(Some(Err(e.into()))), - Poll::Ready(Ok(cnt)) if cnt == 0 => this.flags.insert(Flags::EOF), - _ => this.flags.insert(Flags::READABLE), + Poll::Ready(Ok(cnt)) => cnt, + }; + + if cnt == 0 { + this.flags.insert(Flags::EOF); } + this.flags.insert(Flags::READABLE); } } diff --git a/actix-codec/src/lib.rs b/actix-codec/src/lib.rs index 9318ae5c..b5f32066 100644 --- a/actix-codec/src/lib.rs +++ b/actix-codec/src/lib.rs @@ -20,43 +20,3 @@ pub use self::framed::{Framed, FramedParts}; pub use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; pub use tokio_util::codec::{Decoder, Encoder}; - -// FIXME: Remove this -/// temporary mod -pub mod util { - use super::*; - - use std::pin::Pin; - use std::task::{Context, Poll}; - - /// temporary poll_read_buf function until it lands in tokio_util - pub fn poll_read_buf( - io: Pin<&mut T>, - cx: &mut Context<'_>, - buf: &mut B, - ) -> Poll> { - if !buf.has_remaining_mut() { - return Poll::Ready(Ok(0)); - } - - let n = { - let dst = buf.bytes_mut(); - let dst = unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit]) }; - let mut buf = ReadBuf::uninit(dst); - let ptr = buf.filled().as_ptr(); - futures_core::ready!(io.poll_read(cx, &mut buf)?); - - // Ensure the pointer does not change from under us - assert_eq!(ptr, buf.filled().as_ptr()); - buf.filled().len() - }; - - // Safety: This is guaranteed to be the number of initialized (and read) - // bytes due to the invariants provided by `ReadBuf::filled`. - unsafe { - buf.advance_mut(n); - } - - Poll::Ready(Ok(n)) - } -} diff --git a/actix-rt/tests/integration_tests.rs b/actix-rt/tests/integration_tests.rs index 5471f800..c1d2b910 100644 --- a/actix-rt/tests/integration_tests.rs +++ b/actix-rt/tests/integration_tests.rs @@ -112,29 +112,3 @@ fn join_current_arbiter() { "local_join should await only for the already spawned futures" ); } - -#[test] -fn non_static_block_on() { - let string = String::from("test_str"); - let str = string.as_str(); - - let sys = actix_rt::System::new("borrow some"); - - sys.block_on(async { - actix_rt::time::sleep(Duration::from_millis(1)).await; - assert_eq!("test_str", str); - }); - - let rt = actix_rt::Runtime::new().unwrap(); - - rt.block_on(async { - actix_rt::time::sleep(Duration::from_millis(1)).await; - assert_eq!("test_str", str); - }); - - actix_rt::System::run(|| { - assert_eq!("test_str", str); - actix_rt::System::current().stop(); - }) - .unwrap(); -} diff --git a/actix-server/src/signals.rs b/actix-server/src/signals.rs index 72208e14..6a0d7da9 100644 --- a/actix-server/src/signals.rs +++ b/actix-server/src/signals.rs @@ -21,7 +21,7 @@ pub(crate) enum Signal { pub(crate) struct Signals { srv: Server, #[cfg(not(unix))] - stream: Pin>>>, + stream: Pin>>>, #[cfg(unix)] streams: Vec<(Signal, actix_rt::signal::unix::Signal)>, }