override to forked tokio

This commit is contained in:
fakeshadow 2021-03-16 13:25:20 +08:00
parent 6f03ef4c8c
commit 06237637cb
3 changed files with 26 additions and 18 deletions

View File

@ -141,6 +141,9 @@ actix-multipart = { path = "actix-multipart" }
actix-files = { path = "actix-files" } actix-files = { path = "actix-files" }
awc = { path = "awc" } awc = { path = "awc" }
actix-rt = { git = "https://github.com/actix/actix-net.git", branch = "feat/net_poll_ready" }
tokio = { git = "https://github.com/fakeshadow/tokio", branch = "feature/net_poll_ready" }
[[bench]] [[bench]]
name = "server" name = "server"
harness = false harness = false

View File

@ -695,15 +695,20 @@ where
if timer.as_mut().poll(cx).is_ready() { if timer.as_mut().poll(cx).is_ready() {
// payload is pending and it's time to check the ready state of io. // payload is pending and it's time to check the ready state of io.
if this.flags.contains(Flags::PAYLOAD_PENDING) { if this.flags.contains(Flags::PAYLOAD_PENDING) {
// only interest in the error type. match Pin::new(this.io.as_mut().unwrap()).poll_read_ready(cx)? {
// The io is ready or not is not important. // if io is ready and already closed resolve with dispatcher error.
let _ = Poll::Ready(ready) if ready.is_read_closed() => {
Pin::new(this.io.as_mut().unwrap()).poll_read_ready(cx)?; trace!("Response payload pending check determine remote connection is dropped");
// reset the interval and check again after 1 second. return Err(DispatchError::DisconnectTimeout);
timer }
.as_mut() _ => {
.reset(Instant::now() + Duration::from_secs(1)); // reset the interval and check again after 1 second.
let _ = timer.poll(cx); timer
.as_mut()
.reset(Instant::now() + Duration::from_secs(1));
let _ = timer.poll(cx);
}
}
// got timeout during shutdown, drop connection // got timeout during shutdown, drop connection
} else if this.flags.contains(Flags::SHUTDOWN) { } else if this.flags.contains(Flags::SHUTDOWN) {
return Err(DispatchError::DisconnectTimeout); return Err(DispatchError::DisconnectTimeout);

View File

@ -10,7 +10,7 @@ use std::{
}; };
use actix_codec::{AsyncRead, AsyncWrite, ReadBuf}; use actix_codec::{AsyncRead, AsyncWrite, ReadBuf};
use actix_rt::net::ActixStream; use actix_rt::net::{ActixStream, Ready};
use bytes::{Bytes, BytesMut}; use bytes::{Bytes, BytesMut};
use http::{Method, Uri, Version}; use http::{Method, Uri, Version};
@ -399,21 +399,21 @@ impl AsyncWrite for TestSeqBuffer {
} }
impl ActixStream for TestBuffer { impl ActixStream for TestBuffer {
fn poll_read_ready(&self, _: &mut Context<'_>) -> Poll<io::Result<()>> { fn poll_read_ready(&self, _: &mut Context<'_>) -> Poll<io::Result<Ready>> {
Poll::Ready(Ok(())) Poll::Ready(Ok(Ready::READABLE))
} }
fn poll_write_ready(&self, _: &mut Context<'_>) -> Poll<io::Result<()>> { fn poll_write_ready(&self, _: &mut Context<'_>) -> Poll<io::Result<Ready>> {
Poll::Ready(Ok(())) Poll::Ready(Ok(Ready::WRITABLE))
} }
} }
impl ActixStream for TestSeqBuffer { impl ActixStream for TestSeqBuffer {
fn poll_read_ready(&self, _: &mut Context<'_>) -> Poll<io::Result<()>> { fn poll_read_ready(&self, _: &mut Context<'_>) -> Poll<io::Result<Ready>> {
Poll::Ready(Ok(())) Poll::Ready(Ok(Ready::READABLE))
} }
fn poll_write_ready(&self, _: &mut Context<'_>) -> Poll<io::Result<()>> { fn poll_write_ready(&self, _: &mut Context<'_>) -> Poll<io::Result<Ready>> {
Poll::Ready(Ok(())) Poll::Ready(Ok(Ready::WRITABLE))
} }
} }