fix keepalive wake up

This commit is contained in:
Rob Ede 2022-01-29 04:24:06 +00:00
parent 42c29e47cb
commit e9aabbfd4f
No known key found for this signature in database
GPG Key ID: 97C636207D3EF933
5 changed files with 30 additions and 15 deletions

View File

@ -80,13 +80,12 @@ impl ServiceConfig {
local_addr: Option<net::SocketAddr>, local_addr: Option<net::SocketAddr>,
) -> ServiceConfig { ) -> ServiceConfig {
let (keep_alive, ka_enabled) = match keep_alive { let (keep_alive, ka_enabled) = match keep_alive {
KeepAlive::Timeout(Duration::ZERO) => (Duration::ZERO, false),
KeepAlive::Timeout(val) => (val, true), KeepAlive::Timeout(val) => (val, true),
KeepAlive::Os => (Duration::ZERO, true), KeepAlive::Os => (Duration::ZERO, true),
KeepAlive::Disabled => (Duration::ZERO, false), KeepAlive::Disabled => (Duration::ZERO, false),
}; };
let keep_alive = ka_enabled.then(|| keep_alive); let keep_alive = (ka_enabled && keep_alive > Duration::ZERO).then(|| keep_alive);
ServiceConfig(Rc::new(Inner { ServiceConfig(Rc::new(Inner {
keep_alive, keep_alive,

View File

@ -207,6 +207,12 @@ impl TimerState {
*self = Self::Inactive; *self = Self::Inactive;
} }
fn init(&mut self, cx: &mut Context<'_>) {
if let TimerState::Active { timer } = self {
let _ = timer.as_mut().poll(cx);
}
}
} }
impl fmt::Display for TimerState { impl fmt::Display for TimerState {
@ -770,6 +776,8 @@ where
} }
/// Process one incoming request. /// Process one incoming request.
///
/// Boolean in return type indicates if any meaningful work was done.
fn poll_request( fn poll_request(
mut self: Pin<&mut Self>, mut self: Pin<&mut Self>,
cx: &mut Context<'_>, cx: &mut Context<'_>,
@ -786,11 +794,15 @@ where
let mut this = self.as_mut().project(); let mut this = self.as_mut().project();
let mut updated = false;
loop { loop {
log::trace!("attempt to decode frame"); log::trace!("attempt to decode frame");
match this.codec.decode(this.read_buf) { match this.codec.decode(this.read_buf) {
Ok(Some(msg)) => { Ok(Some(msg)) => {
updated = true;
log::trace!("found full frame (head)"); log::trace!("found full frame (head)");
match msg { match msg {
@ -919,8 +931,7 @@ where
} }
} }
// TODO: what's this boolean do now? Ok(updated)
Ok(false)
} }
fn poll_head_timer( fn poll_head_timer(
@ -1241,10 +1252,7 @@ where
// poll response to populate write buffer // poll response to populate write buffer
// drain indicates whether write buffer should be emptied before next run // drain indicates whether write buffer should be emptied before next run
let drain = match inner.as_mut().poll_response(cx)? { let drain = match inner.as_mut().poll_response(cx)? {
PollResponse::DrainWriteBuf => { PollResponse::DrainWriteBuf => true,
inner.flags.contains(Flags::KEEP_ALIVE);
true
}
PollResponse::DoNothing => { PollResponse::DoNothing => {
if inner.flags.contains(Flags::KEEP_ALIVE) { if inner.flags.contains(Flags::KEEP_ALIVE) {
@ -1255,6 +1263,8 @@ where
.project() .project()
.ka_timer .ka_timer
.set(deadline, line!()); .set(deadline, line!());
inner.as_mut().project().ka_timer.init(cx);
} }
} }
@ -1316,6 +1326,7 @@ where
"start shutdown because keep-alive is disabled or opted \ "start shutdown because keep-alive is disabled or opted \
out for this connection" out for this connection"
); );
inner_p.flags.remove(Flags::FINISHED);
inner_p.flags.insert(Flags::SHUTDOWN); inner_p.flags.insert(Flags::SHUTDOWN);
return self.poll(cx); return self.poll(cx);
} }

View File

@ -71,8 +71,6 @@ fn echo_payload_service() -> impl Service<Request, Response = Response<Bytes>, E
#[actix_rt::test] #[actix_rt::test]
async fn late_request() { async fn late_request() {
let _ = env_logger::try_init();
let mut buf = TestBuffer::empty(); let mut buf = TestBuffer::empty();
let cfg = ServiceConfig::new(KeepAlive::Disabled, 100, 0, false, None); let cfg = ServiceConfig::new(KeepAlive::Disabled, 100, 0, false, None);

View File

@ -217,8 +217,6 @@ mod tests {
#[actix_rt::test] #[actix_rt::test]
async fn camel_case_headers() { async fn camel_case_headers() {
let _ = env_logger::try_init();
let mut srv = actix_http_test::test_server(|| { let mut srv = actix_http_test::test_server(|| {
H1Service::with_config(ServiceConfig::default(), |req: Request| async move { H1Service::with_config(ServiceConfig::default(), |req: Request| async move {
let mut res = Response::ok(); let mut res = Response::ok();

View File

@ -309,6 +309,8 @@ async fn test_http1_keepalive() {
#[actix_rt::test] #[actix_rt::test]
async fn test_http1_keepalive_timeout() { async fn test_http1_keepalive_timeout() {
let _ = env_logger::try_init();
let mut srv = test_server(|| { let mut srv = test_server(|| {
HttpService::build() HttpService::build()
.keep_alive(1) .keep_alive(1)
@ -317,14 +319,21 @@ async fn test_http1_keepalive_timeout() {
}) })
.await; .await;
log::debug!(target: "actix-test", "connect");
let mut stream = net::TcpStream::connect(srv.addr()).unwrap(); let mut stream = net::TcpStream::connect(srv.addr()).unwrap();
let _ = stream.write_all(b"GET /test/tests/test HTTP/1.1\r\n\r\n");
let mut data = vec![0; 1024]; log::debug!(target: "actix-test", "send req");
let _ = stream.write_all(b"GET /test HTTP/1.1\r\n\r\n");
let mut data = vec![0; 256];
log::debug!(target: "actix-test", "first read");
let _ = stream.read(&mut data); let _ = stream.read(&mut data);
assert_eq!(&data[..17], b"HTTP/1.1 200 OK\r\n"); assert_eq!(&data[..17], b"HTTP/1.1 200 OK\r\n");
log::debug!(target: "actix-test", "sleep");
thread::sleep(Duration::from_millis(1100)); thread::sleep(Duration::from_millis(1100));
let mut data = vec![0; 1024]; log::debug!(target: "actix-test", "second read");
let mut data = vec![0; 256];
let res = stream.read(&mut data).unwrap(); let res = stream.read(&mut data).unwrap();
assert_eq!(res, 0); assert_eq!(res, 0);