mirror of https://github.com/fafhrd91/actix-net
update tokio-util.
This commit is contained in:
parent
582660910b
commit
a575583a6a
|
@ -23,4 +23,4 @@ futures-sink = { version = "0.3.4", default-features = false }
|
||||||
log = "0.4"
|
log = "0.4"
|
||||||
pin-project = "1.0.0"
|
pin-project = "1.0.0"
|
||||||
tokio = "0.3.1"
|
tokio = "0.3.1"
|
||||||
tokio-util = { version = "0.5.0", default-features = false, features = ["codec"] }
|
tokio-util = { version = "0.5.1", features = ["codec", "io"] }
|
||||||
|
|
|
@ -221,13 +221,16 @@ impl<T, U> Framed<T, U> {
|
||||||
this.read_buf.reserve(HW - remaining)
|
this.read_buf.reserve(HW - remaining)
|
||||||
}
|
}
|
||||||
|
|
||||||
// FIXME: Use poll_read_buf from tokio_util
|
let cnt = match tokio_util::io::poll_read_buf(this.io, cx, this.read_buf) {
|
||||||
match crate::util::poll_read_buf(this.io, cx, this.read_buf) {
|
|
||||||
Poll::Pending => return Poll::Pending,
|
Poll::Pending => return Poll::Pending,
|
||||||
Poll::Ready(Err(e)) => return Poll::Ready(Some(Err(e.into()))),
|
Poll::Ready(Err(e)) => return Poll::Ready(Some(Err(e.into()))),
|
||||||
Poll::Ready(Ok(cnt)) if cnt == 0 => this.flags.insert(Flags::EOF),
|
Poll::Ready(Ok(cnt)) => cnt,
|
||||||
_ => this.flags.insert(Flags::READABLE),
|
};
|
||||||
|
|
||||||
|
if cnt == 0 {
|
||||||
|
this.flags.insert(Flags::EOF);
|
||||||
}
|
}
|
||||||
|
this.flags.insert(Flags::READABLE);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -20,43 +20,3 @@ pub use self::framed::{Framed, FramedParts};
|
||||||
|
|
||||||
pub use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
|
pub use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
|
||||||
pub use tokio_util::codec::{Decoder, Encoder};
|
pub use tokio_util::codec::{Decoder, Encoder};
|
||||||
|
|
||||||
// FIXME: Remove this
|
|
||||||
/// temporary mod
|
|
||||||
pub mod util {
|
|
||||||
use super::*;
|
|
||||||
|
|
||||||
use std::pin::Pin;
|
|
||||||
use std::task::{Context, Poll};
|
|
||||||
|
|
||||||
/// temporary poll_read_buf function until it lands in tokio_util
|
|
||||||
pub fn poll_read_buf<T: AsyncRead, B: bytes::BufMut>(
|
|
||||||
io: Pin<&mut T>,
|
|
||||||
cx: &mut Context<'_>,
|
|
||||||
buf: &mut B,
|
|
||||||
) -> Poll<std::io::Result<usize>> {
|
|
||||||
if !buf.has_remaining_mut() {
|
|
||||||
return Poll::Ready(Ok(0));
|
|
||||||
}
|
|
||||||
|
|
||||||
let n = {
|
|
||||||
let dst = buf.bytes_mut();
|
|
||||||
let dst = unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>]) };
|
|
||||||
let mut buf = ReadBuf::uninit(dst);
|
|
||||||
let ptr = buf.filled().as_ptr();
|
|
||||||
futures_core::ready!(io.poll_read(cx, &mut buf)?);
|
|
||||||
|
|
||||||
// Ensure the pointer does not change from under us
|
|
||||||
assert_eq!(ptr, buf.filled().as_ptr());
|
|
||||||
buf.filled().len()
|
|
||||||
};
|
|
||||||
|
|
||||||
// Safety: This is guaranteed to be the number of initialized (and read)
|
|
||||||
// bytes due to the invariants provided by `ReadBuf::filled`.
|
|
||||||
unsafe {
|
|
||||||
buf.advance_mut(n);
|
|
||||||
}
|
|
||||||
|
|
||||||
Poll::Ready(Ok(n))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
|
@ -112,29 +112,3 @@ fn join_current_arbiter() {
|
||||||
"local_join should await only for the already spawned futures"
|
"local_join should await only for the already spawned futures"
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn non_static_block_on() {
|
|
||||||
let string = String::from("test_str");
|
|
||||||
let str = string.as_str();
|
|
||||||
|
|
||||||
let sys = actix_rt::System::new("borrow some");
|
|
||||||
|
|
||||||
sys.block_on(async {
|
|
||||||
actix_rt::time::sleep(Duration::from_millis(1)).await;
|
|
||||||
assert_eq!("test_str", str);
|
|
||||||
});
|
|
||||||
|
|
||||||
let rt = actix_rt::Runtime::new().unwrap();
|
|
||||||
|
|
||||||
rt.block_on(async {
|
|
||||||
actix_rt::time::sleep(Duration::from_millis(1)).await;
|
|
||||||
assert_eq!("test_str", str);
|
|
||||||
});
|
|
||||||
|
|
||||||
actix_rt::System::run(|| {
|
|
||||||
assert_eq!("test_str", str);
|
|
||||||
actix_rt::System::current().stop();
|
|
||||||
})
|
|
||||||
.unwrap();
|
|
||||||
}
|
|
||||||
|
|
|
@ -21,7 +21,7 @@ pub(crate) enum Signal {
|
||||||
pub(crate) struct Signals {
|
pub(crate) struct Signals {
|
||||||
srv: Server,
|
srv: Server,
|
||||||
#[cfg(not(unix))]
|
#[cfg(not(unix))]
|
||||||
stream: Pin<Box<dyn Future<Output = io::Result<()>>>>,
|
stream: Pin<Box<dyn Future<Output = std::io::Result<()>>>>,
|
||||||
#[cfg(unix)]
|
#[cfg(unix)]
|
||||||
streams: Vec<(Signal, actix_rt::signal::unix::Signal)>,
|
streams: Vec<(Signal, actix_rt::signal::unix::Signal)>,
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue