mirror of https://github.com/fafhrd91/actix-net
fix compat layer on trust-dns
This commit is contained in:
parent
b024b23a53
commit
f8189462c6
|
@ -2,13 +2,12 @@ use std::pin::Pin;
|
||||||
use std::task::{Context, Poll};
|
use std::task::{Context, Poll};
|
||||||
use std::{fmt, io};
|
use std::{fmt, io};
|
||||||
|
|
||||||
use bytes::{Buf, BufMut, BytesMut};
|
use bytes::{Buf, BytesMut};
|
||||||
use futures_core::{ready, Stream};
|
use futures_core::{ready, Stream};
|
||||||
use futures_sink::Sink;
|
use futures_sink::Sink;
|
||||||
use pin_project::pin_project;
|
use pin_project::pin_project;
|
||||||
|
|
||||||
use crate::{AsyncRead, AsyncWrite, Decoder, Encoder, ReadBuf};
|
use crate::{AsyncRead, AsyncWrite, Decoder, Encoder};
|
||||||
use std::mem::MaybeUninit;
|
|
||||||
|
|
||||||
/// Low-water mark
|
/// Low-water mark
|
||||||
const LW: usize = 1024;
|
const LW: usize = 1024;
|
||||||
|
@ -222,30 +221,14 @@ impl<T, U> Framed<T, U> {
|
||||||
this.read_buf.reserve(HW - remaining)
|
this.read_buf.reserve(HW - remaining)
|
||||||
}
|
}
|
||||||
|
|
||||||
// FIXME: Is this the right way to do it for now? (line 225 - 244)
|
// FIXME: Use poll_read_buf from tokio_util
|
||||||
let dst = this.read_buf.bytes_mut();
|
match crate::util::poll_read_buf(this.io, cx, this.read_buf) {
|
||||||
// SAFETY: `BufMut::bytes_mut` only return an empty slice when remaining is 0.
|
|
||||||
let dst = unsafe { &mut *(dst as *mut _ as *mut [MaybeUninit<u8>]) };
|
|
||||||
let mut buf = ReadBuf::uninit(dst);
|
|
||||||
|
|
||||||
let cnt = match this.io.poll_read(cx, &mut buf) {
|
|
||||||
Poll::Pending => return Poll::Pending,
|
Poll::Pending => return Poll::Pending,
|
||||||
Poll::Ready(Err(e)) => return Poll::Ready(Some(Err(e.into()))),
|
Poll::Ready(Err(e)) => return Poll::Ready(Some(Err(e.into()))),
|
||||||
Poll::Ready(Ok(())) => buf.filled().len(),
|
Poll::Ready(Ok(cnt)) if cnt == 0 => this.flags.insert(Flags::EOF),
|
||||||
};
|
_ => this.flags.insert(Flags::READABLE),
|
||||||
|
|
||||||
if cnt == 0 {
|
|
||||||
this.flags.insert(Flags::EOF);
|
|
||||||
} else {
|
|
||||||
// SAFETY: This is guaranteed to be the number of initialized (and read)
|
|
||||||
// bytes due to the invariants provided by `ReadBuf::filled`.
|
|
||||||
unsafe {
|
|
||||||
this.read_buf.advance_mut(cnt);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
this.flags.insert(Flags::READABLE);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Flush write buffer to underlying I/O stream.
|
/// Flush write buffer to underlying I/O stream.
|
||||||
|
|
|
@ -20,3 +20,43 @@ pub use self::framed::{Framed, FramedParts};
|
||||||
|
|
||||||
pub use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
|
pub use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
|
||||||
pub use tokio_util::codec::{Decoder, Encoder};
|
pub use tokio_util::codec::{Decoder, Encoder};
|
||||||
|
|
||||||
|
// FIXME: Remove this
|
||||||
|
/// temporary mod
|
||||||
|
pub mod util {
|
||||||
|
use super::*;
|
||||||
|
|
||||||
|
use std::pin::Pin;
|
||||||
|
use std::task::{Context, Poll};
|
||||||
|
|
||||||
|
/// temporary poll_read_buf function until it lands in tokio_util
|
||||||
|
pub fn poll_read_buf<T: AsyncRead, B: bytes::BufMut>(
|
||||||
|
io: Pin<&mut T>,
|
||||||
|
cx: &mut Context<'_>,
|
||||||
|
buf: &mut B,
|
||||||
|
) -> Poll<std::io::Result<usize>> {
|
||||||
|
if !buf.has_remaining_mut() {
|
||||||
|
return Poll::Ready(Ok(0));
|
||||||
|
}
|
||||||
|
|
||||||
|
let n = {
|
||||||
|
let dst = buf.bytes_mut();
|
||||||
|
let dst = unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>]) };
|
||||||
|
let mut buf = ReadBuf::uninit(dst);
|
||||||
|
let ptr = buf.filled().as_ptr();
|
||||||
|
futures_core::ready!(io.poll_read(cx, &mut buf)?);
|
||||||
|
|
||||||
|
// Ensure the pointer does not change from under us
|
||||||
|
assert_eq!(ptr, buf.filled().as_ptr());
|
||||||
|
buf.filled().len()
|
||||||
|
};
|
||||||
|
|
||||||
|
// Safety: This is guaranteed to be the number of initialized (and read)
|
||||||
|
// bytes due to the invariants provided by `ReadBuf::filled`.
|
||||||
|
unsafe {
|
||||||
|
buf.advance_mut(n);
|
||||||
|
}
|
||||||
|
|
||||||
|
Poll::Ready(Ok(n))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -39,7 +39,9 @@ actix-rt = "1.1.1"
|
||||||
derive_more = "0.99.2"
|
derive_more = "0.99.2"
|
||||||
either = "1.5.3"
|
either = "1.5.3"
|
||||||
futures-util = { version = "0.3.4", default-features = false }
|
futures-util = { version = "0.3.4", default-features = false }
|
||||||
http = { version = "0.2.0", optional = true }
|
# FIXME: Use release version
|
||||||
|
http = { git = "https://github.com/paolobarbolini/http.git", branch = "bytes06", optional = true }
|
||||||
|
#http = { version = "0.2.0", optional = true }
|
||||||
log = "0.4"
|
log = "0.4"
|
||||||
tokio-compat-02 = "0.1.2"
|
tokio-compat-02 = "0.1.2"
|
||||||
trust-dns-proto = { version = "0.19", default-features = false, features = ["tokio-runtime"] }
|
trust-dns-proto = { version = "0.19", default-features = false, features = ["tokio-runtime"] }
|
||||||
|
|
|
@ -45,18 +45,12 @@ pub async fn start_resolver(
|
||||||
) -> Result<AsyncResolver, ConnectError> {
|
) -> Result<AsyncResolver, ConnectError> {
|
||||||
// FIXME: remove compat layer
|
// FIXME: remove compat layer
|
||||||
use tokio_compat_02::FutureExt;
|
use tokio_compat_02::FutureExt;
|
||||||
async {
|
Ok(AsyncResolver::tokio(cfg, opts).compat().await?)
|
||||||
Ok(AsyncResolver::tokio(cfg, opts).await?)
|
|
||||||
}.compat().await
|
|
||||||
}
|
}
|
||||||
|
|
||||||
struct DefaultResolver(AsyncResolver);
|
struct DefaultResolver(AsyncResolver);
|
||||||
|
|
||||||
pub(crate) async fn get_default_resolver() -> Result<AsyncResolver, ConnectError> {
|
pub(crate) async fn get_default_resolver() -> Result<AsyncResolver, ConnectError> {
|
||||||
// FIXME: remove compat layer
|
|
||||||
use tokio_compat_02::FutureExt;
|
|
||||||
async {
|
|
||||||
|
|
||||||
if Arbiter::contains_item::<DefaultResolver>() {
|
if Arbiter::contains_item::<DefaultResolver>() {
|
||||||
Ok(Arbiter::get_item(|item: &DefaultResolver| item.0.clone()))
|
Ok(Arbiter::get_item(|item: &DefaultResolver| item.0.clone()))
|
||||||
} else {
|
} else {
|
||||||
|
@ -68,12 +62,13 @@ pub(crate) async fn get_default_resolver() -> Result<AsyncResolver, ConnectError
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
let resolver = AsyncResolver::tokio(cfg, opts).await?;
|
// FIXME: remove compat layer
|
||||||
|
use tokio_compat_02::FutureExt;
|
||||||
|
let resolver = AsyncResolver::tokio(cfg, opts).compat().await?;
|
||||||
|
|
||||||
Arbiter::set_item(DefaultResolver(resolver.clone()));
|
Arbiter::set_item(DefaultResolver(resolver.clone()));
|
||||||
Ok(resolver)
|
Ok(resolver)
|
||||||
}
|
}
|
||||||
}.compat().await
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn start_default_resolver() -> Result<AsyncResolver, ConnectError> {
|
pub async fn start_default_resolver() -> Result<AsyncResolver, ConnectError> {
|
||||||
|
|
|
@ -163,7 +163,9 @@ impl<T: Address> ResolverFuture<T> {
|
||||||
ResolverFuture {
|
ResolverFuture {
|
||||||
lookup: Box::pin(async move {
|
lookup: Box::pin(async move {
|
||||||
let resolver = resolver_clone;
|
let resolver = resolver_clone;
|
||||||
resolver.lookup_ip(host_clone).await
|
// FIXME: Remove compat layer
|
||||||
|
use tokio_compat_02::FutureExt;
|
||||||
|
resolver.lookup_ip(host_clone).compat().await
|
||||||
}),
|
}),
|
||||||
req: Some(req),
|
req: Some(req),
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,5 +1,7 @@
|
||||||
//! General purpose TCP server.
|
//! General purpose TCP server.
|
||||||
|
|
||||||
|
#![deny(rust_2018_idioms)]
|
||||||
|
|
||||||
mod accept;
|
mod accept;
|
||||||
mod builder;
|
mod builder;
|
||||||
mod config;
|
mod config;
|
||||||
|
|
Loading…
Reference in New Issue