From 06237637cb77c90c518c1d0035b12ed57e46bbbc Mon Sep 17 00:00:00 2001 From: fakeshadow <24548779@qq.com> Date: Tue, 16 Mar 2021 13:25:20 +0800 Subject: [PATCH] override to forked tokio --- Cargo.toml | 3 +++ actix-http/src/h1/dispatcher.rs | 23 ++++++++++++++--------- actix-http/src/test.rs | 18 +++++++++--------- 3 files changed, 26 insertions(+), 18 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 8477c8ede..d21a39fdf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -141,6 +141,9 @@ actix-multipart = { path = "actix-multipart" } actix-files = { path = "actix-files" } awc = { path = "awc" } +actix-rt = { git = "https://github.com/actix/actix-net.git", branch = "feat/net_poll_ready" } +tokio = { git = "https://github.com/fakeshadow/tokio", branch = "feature/net_poll_ready" } + [[bench]] name = "server" harness = false diff --git a/actix-http/src/h1/dispatcher.rs b/actix-http/src/h1/dispatcher.rs index c63498d02..7fc839bd9 100644 --- a/actix-http/src/h1/dispatcher.rs +++ b/actix-http/src/h1/dispatcher.rs @@ -695,15 +695,20 @@ where if timer.as_mut().poll(cx).is_ready() { // payload is pending and it's time to check the ready state of io. if this.flags.contains(Flags::PAYLOAD_PENDING) { - // only interest in the error type. - // The io is ready or not is not important. - let _ = - Pin::new(this.io.as_mut().unwrap()).poll_read_ready(cx)?; - // reset the interval and check again after 1 second. - timer - .as_mut() - .reset(Instant::now() + Duration::from_secs(1)); - let _ = timer.poll(cx); + match Pin::new(this.io.as_mut().unwrap()).poll_read_ready(cx)? { + // if io is ready and already closed resolve with dispatcher error. + Poll::Ready(ready) if ready.is_read_closed() => { + trace!("Response payload pending check determine remote connection is dropped"); + return Err(DispatchError::DisconnectTimeout); + } + _ => { + // reset the interval and check again after 1 second. + timer + .as_mut() + .reset(Instant::now() + Duration::from_secs(1)); + let _ = timer.poll(cx); + } + } // got timeout during shutdown, drop connection } else if this.flags.contains(Flags::SHUTDOWN) { return Err(DispatchError::DisconnectTimeout); diff --git a/actix-http/src/test.rs b/actix-http/src/test.rs index 77bd459f1..c8ad0d726 100644 --- a/actix-http/src/test.rs +++ b/actix-http/src/test.rs @@ -10,7 +10,7 @@ use std::{ }; use actix_codec::{AsyncRead, AsyncWrite, ReadBuf}; -use actix_rt::net::ActixStream; +use actix_rt::net::{ActixStream, Ready}; use bytes::{Bytes, BytesMut}; use http::{Method, Uri, Version}; @@ -399,21 +399,21 @@ impl AsyncWrite for TestSeqBuffer { } impl ActixStream for TestBuffer { - fn poll_read_ready(&self, _: &mut Context<'_>) -> Poll> { - Poll::Ready(Ok(())) + fn poll_read_ready(&self, _: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(Ready::READABLE)) } - fn poll_write_ready(&self, _: &mut Context<'_>) -> Poll> { - Poll::Ready(Ok(())) + fn poll_write_ready(&self, _: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(Ready::WRITABLE)) } } impl ActixStream for TestSeqBuffer { - fn poll_read_ready(&self, _: &mut Context<'_>) -> Poll> { - Poll::Ready(Ok(())) + fn poll_read_ready(&self, _: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(Ready::READABLE)) } - fn poll_write_ready(&self, _: &mut Context<'_>) -> Poll> { - Poll::Ready(Ok(())) + fn poll_write_ready(&self, _: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(Ready::WRITABLE)) } }