From 438451edf2c9fa96a431edeb1c0037c03f77f206 Mon Sep 17 00:00:00 2001 From: fakeshadow <24548779@qq.com> Date: Thu, 11 Feb 2021 20:45:13 -0800 Subject: [PATCH] refactor DateService --- actix-http/src/config.rs | 173 +++++++++++++++++++++++------------ actix-http/src/h1/codec.rs | 4 +- actix-http/src/h1/encoder.rs | 12 +-- 3 files changed, 125 insertions(+), 64 deletions(-) diff --git a/actix-http/src/config.rs b/actix-http/src/config.rs index 61708cfc3..719a04e18 100644 --- a/actix-http/src/config.rs +++ b/actix-http/src/config.rs @@ -4,9 +4,11 @@ use std::rc::Rc; use std::time::Duration; use std::{fmt, net}; -use actix_rt::time::{sleep, sleep_until, Instant, Sleep}; +use actix_rt::{ + task::JoinHandle, + time::{interval, sleep_until, Instant, Sleep}, +}; use bytes::BytesMut; -use futures_util::{future, FutureExt}; use time::OffsetDateTime; /// "Sun, 06 Nov 1994 08:49:37 GMT".len() @@ -49,7 +51,7 @@ struct Inner { ka_enabled: bool, secure: bool, local_addr: Option, - timer: DateService, + date_service: DateService, } impl Clone for ServiceConfig { @@ -91,7 +93,7 @@ impl ServiceConfig { client_disconnect, secure, local_addr, - timer: DateService::new(), + date_service: DateService::new(), })) } @@ -125,7 +127,7 @@ impl ServiceConfig { let delay_time = self.0.client_timeout; if delay_time != 0 { Some(sleep_until( - self.0.timer.now() + Duration::from_millis(delay_time), + self.0.date_service.now() + Duration::from_millis(delay_time), )) } else { None @@ -136,7 +138,7 @@ impl ServiceConfig { pub fn client_timer_expire(&self) -> Option { let delay = self.0.client_timeout; if delay != 0 { - Some(self.0.timer.now() + Duration::from_millis(delay)) + Some(self.0.date_service.now() + Duration::from_millis(delay)) } else { None } @@ -146,7 +148,7 @@ impl ServiceConfig { pub fn client_disconnect_timer(&self) -> Option { let delay = self.0.client_disconnect; if delay != 0 { - Some(self.0.timer.now() + Duration::from_millis(delay)) + Some(self.0.date_service.now() + Duration::from_millis(delay)) } else { None } @@ -156,7 +158,7 @@ impl ServiceConfig { /// Return keep-alive timer delay is configured. pub fn keep_alive_timer(&self) -> Option { if let Some(ka) = self.0.keep_alive { - Some(sleep_until(self.0.timer.now() + ka)) + Some(sleep_until(self.0.date_service.now() + ka)) } else { None } @@ -165,7 +167,7 @@ impl ServiceConfig { /// Keep-alive expire time pub fn keep_alive_expire(&self) -> Option { if let Some(ka) = self.0.keep_alive { - Some(self.0.timer.now() + ka) + Some(self.0.date_service.now() + ka) } else { None } @@ -173,7 +175,7 @@ impl ServiceConfig { #[inline] pub(crate) fn now(&self) -> Instant { - self.0.timer.now() + self.0.date_service.now() } #[doc(hidden)] @@ -181,7 +183,7 @@ impl ServiceConfig { let mut buf: [u8; 39] = [0; 39]; buf[..6].copy_from_slice(b"date: "); self.0 - .timer + .date_service .set_date(|date| buf[6..35].copy_from_slice(&date.bytes)); buf[35..].copy_from_slice(b"\r\n\r\n"); dst.extend_from_slice(&buf); @@ -189,7 +191,7 @@ impl ServiceConfig { pub(crate) fn set_date_header(&self, dst: &mut BytesMut) { self.0 - .timer + .date_service .set_date(|date| dst.extend_from_slice(&date.bytes)); } } @@ -230,57 +232,78 @@ impl fmt::Write for Date { } } -#[derive(Clone)] -struct DateService(Rc); - -struct DateServiceInner { - current: Cell>, +/// Service for update Date and Instant periodically at 500 millis interval. +struct DateService { + current: Rc>, + handle: JoinHandle<()>, } -impl DateServiceInner { - fn new() -> Self { - DateServiceInner { - current: Cell::new(None), - } - } - - fn reset(&self) { - self.current.take(); - } - - fn update(&self) { - let now = Instant::now(); - let date = Date::new(); - self.current.set(Some((date, now))); +impl Drop for DateService { + fn drop(&mut self) { + // stop the timer update async task on drop. + self.handle.abort(); } } impl DateService { fn new() -> Self { - DateService(Rc::new(DateServiceInner::new())) - } + // shared date and timer for DateService and update async task. + let current = Rc::new(Cell::new((Date::new(), Instant::now()))); + let current_clone = current.clone(); + // spawn an async task sleep for 500 milli and update current date/timer in a loop. + // handle is used to stop the task on DateService drop. + let handle = actix_rt::spawn(async move { + #[cfg(test)] + let _notify = notify_on_drop::NotifyOnDrop::new(); - fn check_date(&self) { - if self.0.current.get().is_none() { - self.0.update(); + let mut interval = interval(Duration::from_millis(500)); + loop { + // TODO: consider use the Instant returned by interval stream + // (If it's accurate enough) + let _now = interval.tick().await; + let date = Date::new(); + let now = Instant::now(); + current_clone.set((date, now)); + } + }); - // periodic date update - let s = self.clone(); - actix_rt::spawn(sleep(Duration::from_millis(500)).then(move |_| { - s.0.reset(); - future::ready(()) - })); - } + DateService { current, handle } } fn now(&self) -> Instant { - self.check_date(); - self.0.current.get().unwrap().1 + self.current.get().1 } fn set_date(&self, mut f: F) { - self.check_date(); - f(&self.0.current.get().unwrap().0); + f(&self.current.get().0); + } +} + +// test drop behavior of DateService. only enabled in tests. +// TODO: move to a util module for testing all spawn handle drop style tasks. +#[cfg(test)] +mod notify_on_drop { + use std::sync::atomic::{AtomicBool, Ordering::SeqCst}; + + static NOTIFY_DROPPED: AtomicBool = AtomicBool::new(false); + + pub(crate) fn is_dropped() -> bool { + NOTIFY_DROPPED.load(SeqCst) + } + + pub(crate) struct NotifyOnDrop; + + impl NotifyOnDrop { + pub(crate) fn new() -> Self { + NOTIFY_DROPPED.store(false, SeqCst); + NotifyOnDrop + } + } + + impl Drop for NotifyOnDrop { + fn drop(&mut self) { + NOTIFY_DROPPED.store(true, SeqCst); + } } } @@ -288,14 +311,52 @@ impl DateService { mod tests { use super::*; - // Test modifying the date from within the closure - // passed to `set_date` - #[test] - fn test_evil_date() { - let service = DateService::new(); - // Make sure that `check_date` doesn't try to spawn a task - service.0.update(); - service.set_date(|_| service.0.reset()); + use actix_rt::task::yield_now; + + #[actix_rt::test] + async fn test_date_service_update() { + let settings = ServiceConfig::new(KeepAlive::Os, 0, 0, false, None); + + yield_now().await; + + let mut buf1 = BytesMut::with_capacity(DATE_VALUE_LENGTH + 10); + settings.set_date(&mut buf1); + let now1 = settings.now(); + + sleep_until(Instant::now() + Duration::from_secs(1)).await; + + let now2 = settings.now(); + let mut buf2 = BytesMut::with_capacity(DATE_VALUE_LENGTH + 10); + settings.set_date(&mut buf2); + + assert!(now2 - now1 > Duration::from_millis(500)); + + assert_ne!(buf1, buf2); + + drop(settings); + assert!(notify_on_drop::is_dropped()); + } + + #[actix_rt::test] + async fn test_date_service_drop() { + let service = Rc::new(DateService::new()); + + // yield so date service have a chance to register the spawned timer update task. + yield_now().await; + + let clone1 = service.clone(); + let clone2 = service.clone(); + let clone3 = service.clone(); + + drop(clone1); + assert_eq!(false, notify_on_drop::is_dropped()); + drop(clone2); + assert_eq!(false, notify_on_drop::is_dropped()); + drop(clone3); + assert_eq!(false, notify_on_drop::is_dropped()); + + drop(service); + assert!(notify_on_drop::is_dropped()); } #[test] diff --git a/actix-http/src/h1/codec.rs b/actix-http/src/h1/codec.rs index 4aeb9f120..634ca25e8 100644 --- a/actix-http/src/h1/codec.rs +++ b/actix-http/src/h1/codec.rs @@ -201,8 +201,8 @@ mod tests { use super::*; use crate::HttpMessage; - #[test] - fn test_http_request_chunked_payload_and_next_message() { + #[actix_rt::test] + async fn test_http_request_chunked_payload_and_next_message() { let mut codec = Codec::default(); let mut buf = BytesMut::from( diff --git a/actix-http/src/h1/encoder.rs b/actix-http/src/h1/encoder.rs index 2932a7dce..69e69de42 100644 --- a/actix-http/src/h1/encoder.rs +++ b/actix-http/src/h1/encoder.rs @@ -529,8 +529,8 @@ mod tests { ); } - #[test] - fn test_camel_case() { + #[actix_rt::test] + async fn test_camel_case() { let mut bytes = BytesMut::with_capacity(2048); let mut head = RequestHead::default(); head.set_camel_case_headers(true); @@ -593,8 +593,8 @@ mod tests { assert!(data.contains("date: date\r\n")); } - #[test] - fn test_extra_headers() { + #[actix_rt::test] + async fn test_extra_headers() { let mut bytes = BytesMut::with_capacity(2048); let mut head = RequestHead::default(); @@ -627,8 +627,8 @@ mod tests { assert!(data.contains("date: date\r\n")); } - #[test] - fn test_no_content_length() { + #[actix_rt::test] + async fn test_no_content_length() { let mut bytes = BytesMut::with_capacity(2048); let mut res: Response<()> =