mirror of https://github.com/fafhrd91/actix-web
add test for pending interval check
This commit is contained in:
parent
1048d7df70
commit
56159a9b70
|
@ -695,9 +695,9 @@ where
|
|||
if timer.as_mut().poll(cx).is_ready() {
|
||||
// payload is pending and it's time to check the ready state of io.
|
||||
if this.flags.contains(Flags::PAYLOAD_PENDING) {
|
||||
match Pin::new(this.io.as_mut().unwrap()).poll_write_ready(cx)? {
|
||||
match Pin::new(this.io.as_mut().unwrap()).poll_read_ready(cx)? {
|
||||
// if io is ready and already closed resolve with dispatcher error.
|
||||
Poll::Ready(ready) if ready.is_write_closed() => {
|
||||
Poll::Ready(ready) if ready.is_read_closed() => {
|
||||
trace!("Response payload pending check determine remote connection is closed");
|
||||
this.flags
|
||||
.insert(Flags::SHUTDOWN | Flags::WRITE_DISCONNECT);
|
||||
|
@ -1002,7 +1002,8 @@ mod tests {
|
|||
use std::str;
|
||||
|
||||
use actix_service::fn_service;
|
||||
use futures_util::future::{lazy, ready, Ready};
|
||||
use bytes::Bytes;
|
||||
use futures_util::future::{lazy, poll_fn, ready, Ready};
|
||||
|
||||
use super::*;
|
||||
use crate::{
|
||||
|
@ -1012,6 +1013,7 @@ mod tests {
|
|||
test::{TestBuffer, TestSeqBuffer},
|
||||
HttpMessage, KeepAlive,
|
||||
};
|
||||
use tokio::io::AsyncWriteExt;
|
||||
|
||||
fn find_slice(haystack: &[u8], needle: &[u8], from: usize) -> Option<usize> {
|
||||
haystack[from..]
|
||||
|
@ -1033,6 +1035,24 @@ mod tests {
|
|||
fn_service(|_req: Request| ready(Ok::<_, Error>(Response::Ok().finish())))
|
||||
}
|
||||
|
||||
fn pending_service() -> impl Service<Request, Response = Response, Error = Error> {
|
||||
struct PendingForever;
|
||||
impl futures_core::Stream for PendingForever {
|
||||
type Item = Result<Bytes, Error>;
|
||||
|
||||
fn poll_next(
|
||||
self: Pin<&mut Self>,
|
||||
_: &mut Context<'_>,
|
||||
) -> Poll<Option<Self::Item>> {
|
||||
Poll::Pending
|
||||
}
|
||||
}
|
||||
|
||||
fn_service(|_req: Request| async {
|
||||
Ok::<_, Error>(Response::Ok().streaming(Box::pin(PendingForever)))
|
||||
})
|
||||
}
|
||||
|
||||
fn echo_path_service() -> impl Service<Request, Response = Response, Error = Error> {
|
||||
fn_service(|req: Request| {
|
||||
let path = req.path().as_bytes();
|
||||
|
@ -1384,4 +1404,58 @@ mod tests {
|
|||
})
|
||||
.await;
|
||||
}
|
||||
|
||||
#[actix_rt::test]
|
||||
async fn test_pending_interval_check() {
|
||||
// TODO: use unused_addr function from actix-web. It need a counterpart in actix-http
|
||||
let addr: std::net::SocketAddr = "127.0.0.1:15746"
|
||||
.parse()
|
||||
.expect("Test assume port 15746 is free. Please file an issue when you encounter this panic");
|
||||
|
||||
let lst = tokio::net::TcpListener::bind(addr).await.unwrap();
|
||||
|
||||
let mut stream = tokio::net::TcpStream::connect(addr).await.unwrap();
|
||||
|
||||
let (io, _) = lst.accept().await.unwrap();
|
||||
|
||||
let cfg = ServiceConfig::new(KeepAlive::Disabled, 1, 1, false, None);
|
||||
|
||||
let services = HttpFlow::new(pending_service(), ExpectHandler, None);
|
||||
|
||||
let h1 = Dispatcher::<_, _, _, _, UpgradeHandler>::new(
|
||||
io,
|
||||
cfg,
|
||||
services,
|
||||
OnConnectData::default(),
|
||||
None,
|
||||
);
|
||||
|
||||
actix_rt::pin!(h1);
|
||||
|
||||
assert!(matches!(&h1.inner, DispatcherState::Normal(_)));
|
||||
|
||||
let req = "\
|
||||
GET /abcd HTTP/1.1\r\n\r\n\
|
||||
";
|
||||
|
||||
stream.write(req.as_bytes()).await.unwrap();
|
||||
|
||||
actix_rt::task::yield_now().await;
|
||||
|
||||
poll_fn(|cx| {
|
||||
assert!(h1.as_mut().poll(cx).is_pending());
|
||||
Poll::Ready(())
|
||||
})
|
||||
.await;
|
||||
|
||||
stream.shutdown().await.unwrap();
|
||||
|
||||
actix_rt::time::sleep(Duration::from_secs(2)).await;
|
||||
|
||||
poll_fn(|cx| {
|
||||
assert!(h1.as_mut().poll(cx).is_ready());
|
||||
Poll::Ready(())
|
||||
})
|
||||
.await;
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue