update tokio-util

This commit is contained in:
fakeshadow 2021-01-31 04:42:18 -08:00
parent 057e7cd7c9
commit 3de4dbf647
4 changed files with 16 additions and 5 deletions

View File

@ -1,6 +1,10 @@
# Changes # Changes
## Unreleased - 2021-xx-xx ## Unreleased - 2021-xx-xx
* Upgrade `tokio-util` dependency to `0.6.3`. [#260]
* Export `tokio_util::sync::ReusableBoxFuture`. [#260]
[#260]: https://github.com/actix/actix-net/pull/260
## 0.4.0-beta.1 - 2020-12-28 ## 0.4.0-beta.1 - 2020-12-28

View File

@ -23,4 +23,4 @@ futures-sink = { version = "0.3.7", default-features = false }
log = "0.4" log = "0.4"
pin-project-lite = "0.2" pin-project-lite = "0.2"
tokio = "1" tokio = "1"
tokio-util = { version = "0.6", features = ["codec", "io"] } tokio-util = { version = "0.6.3", features = ["codec", "io"] }

View File

@ -21,3 +21,5 @@ pub use self::framed::{Framed, FramedParts};
pub use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; pub use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
pub use tokio_util::codec::{Decoder, Encoder}; pub use tokio_util::codec::{Decoder, Encoder};
pub use tokio_util::io::poll_read_buf; pub use tokio_util::io::poll_read_buf;
// TODO: reconsider ReusableBoxFuture should be re-export from actix-codec.
pub use tokio_util::sync::ReusableBoxFuture;

View File

@ -7,6 +7,7 @@ use std::{
task::{Context, Poll}, task::{Context, Poll},
}; };
use actix_codec::ReusableBoxFuture;
use actix_rt::net::TcpStream; use actix_rt::net::TcpStream;
use actix_service::{Service, ServiceFactory}; use actix_service::{Service, ServiceFactory};
use futures_core::{future::LocalBoxFuture, ready}; use futures_core::{future::LocalBoxFuture, ready};
@ -65,7 +66,7 @@ pub enum TcpConnectorResponse<T> {
req: Option<T>, req: Option<T>,
port: u16, port: u16,
addrs: Option<VecDeque<SocketAddr>>, addrs: Option<VecDeque<SocketAddr>>,
stream: Option<LocalBoxFuture<'static, Result<TcpStream, io::Error>>>, stream: Option<ReusableBoxFuture<Result<TcpStream, io::Error>>>,
}, },
Error(Option<ConnectError>), Error(Option<ConnectError>),
} }
@ -90,7 +91,7 @@ impl<T: Address> TcpConnectorResponse<T> {
req: Some(req), req: Some(req),
port, port,
addrs: None, addrs: None,
stream: Some(Box::pin(TcpStream::connect(addr))), stream: Some(ReusableBoxFuture::new(TcpStream::connect(addr))),
}, },
// when resolver returns multiple socket addr for request they would be popped from // when resolver returns multiple socket addr for request they would be popped from
@ -119,7 +120,7 @@ impl<T: Address> Future for TcpConnectorResponse<T> {
stream, stream,
} => loop { } => loop {
if let Some(new) = stream.as_mut() { if let Some(new) = stream.as_mut() {
match ready!(new.as_mut().poll(cx)) { match ready!(new.get_pin().poll(cx)) {
Ok(sock) => { Ok(sock) => {
let req = req.take().unwrap(); let req = req.take().unwrap();
trace!( trace!(
@ -146,7 +147,11 @@ impl<T: Address> Future for TcpConnectorResponse<T> {
// try to connect // try to connect
let addr = addrs.as_mut().unwrap().pop_front().unwrap(); let addr = addrs.as_mut().unwrap().pop_front().unwrap();
*stream = Some(Box::pin(TcpStream::connect(addr))); if let Some(ref mut stream) = *stream {
stream.set(TcpStream::connect(addr));
} else {
*stream = Some(ReusableBoxFuture::new(TcpStream::connect(addr)));
}
}, },
} }
} }