diff --git a/actix-http/src/h1/dispatcher.rs b/actix-http/src/h1/dispatcher.rs index 22f0dc87a..b5532e49e 100644 --- a/actix-http/src/h1/dispatcher.rs +++ b/actix-http/src/h1/dispatcher.rs @@ -695,9 +695,9 @@ where if timer.as_mut().poll(cx).is_ready() { // payload is pending and it's time to check the ready state of io. if this.flags.contains(Flags::PAYLOAD_PENDING) { - match Pin::new(this.io.as_mut().unwrap()).poll_write_ready(cx)? { + match Pin::new(this.io.as_mut().unwrap()).poll_read_ready(cx)? { // if io is ready and already closed resolve with dispatcher error. - Poll::Ready(ready) if ready.is_write_closed() => { + Poll::Ready(ready) if ready.is_read_closed() => { trace!("Response payload pending check determine remote connection is closed"); this.flags .insert(Flags::SHUTDOWN | Flags::WRITE_DISCONNECT); @@ -1002,7 +1002,8 @@ mod tests { use std::str; use actix_service::fn_service; - use futures_util::future::{lazy, ready, Ready}; + use bytes::Bytes; + use futures_util::future::{lazy, poll_fn, ready, Ready}; use super::*; use crate::{ @@ -1012,6 +1013,7 @@ mod tests { test::{TestBuffer, TestSeqBuffer}, HttpMessage, KeepAlive, }; + use tokio::io::AsyncWriteExt; fn find_slice(haystack: &[u8], needle: &[u8], from: usize) -> Option { haystack[from..] @@ -1033,6 +1035,24 @@ mod tests { fn_service(|_req: Request| ready(Ok::<_, Error>(Response::Ok().finish()))) } + fn pending_service() -> impl Service { + struct PendingForever; + impl futures_core::Stream for PendingForever { + type Item = Result; + + fn poll_next( + self: Pin<&mut Self>, + _: &mut Context<'_>, + ) -> Poll> { + Poll::Pending + } + } + + fn_service(|_req: Request| async { + Ok::<_, Error>(Response::Ok().streaming(Box::pin(PendingForever))) + }) + } + fn echo_path_service() -> impl Service { fn_service(|req: Request| { let path = req.path().as_bytes(); @@ -1384,4 +1404,58 @@ mod tests { }) .await; } + + #[actix_rt::test] + async fn test_pending_interval_check() { + // TODO: use unused_addr function from actix-web. It need a counterpart in actix-http + let addr: std::net::SocketAddr = "127.0.0.1:15746" + .parse() + .expect("Test assume port 15746 is free. Please file an issue when you encounter this panic"); + + let lst = tokio::net::TcpListener::bind(addr).await.unwrap(); + + let mut stream = tokio::net::TcpStream::connect(addr).await.unwrap(); + + let (io, _) = lst.accept().await.unwrap(); + + let cfg = ServiceConfig::new(KeepAlive::Disabled, 1, 1, false, None); + + let services = HttpFlow::new(pending_service(), ExpectHandler, None); + + let h1 = Dispatcher::<_, _, _, _, UpgradeHandler>::new( + io, + cfg, + services, + OnConnectData::default(), + None, + ); + + actix_rt::pin!(h1); + + assert!(matches!(&h1.inner, DispatcherState::Normal(_))); + + let req = "\ + GET /abcd HTTP/1.1\r\n\r\n\ + "; + + stream.write(req.as_bytes()).await.unwrap(); + + actix_rt::task::yield_now().await; + + poll_fn(|cx| { + assert!(h1.as_mut().poll(cx).is_pending()); + Poll::Ready(()) + }) + .await; + + stream.shutdown().await.unwrap(); + + actix_rt::time::sleep(Duration::from_secs(2)).await; + + poll_fn(|cx| { + assert!(h1.as_mut().poll(cx).is_ready()); + Poll::Ready(()) + }) + .await; + } }