diff --git a/v2/crates/rvcsi-events/Cargo.toml b/v2/crates/rvcsi-events/Cargo.toml index 10a74caf..b374bceb 100644 --- a/v2/crates/rvcsi-events/Cargo.toml +++ b/v2/crates/rvcsi-events/Cargo.toml @@ -11,7 +11,6 @@ categories = ["science"] [dependencies] rvcsi-core = { path = "../rvcsi-core" } -rvcsi-dsp = { path = "../rvcsi-dsp" } serde = { workspace = true } serde_json = { workspace = true } thiserror = { workspace = true } diff --git a/v2/crates/rvcsi-events/src/detectors.rs b/v2/crates/rvcsi-events/src/detectors.rs new file mode 100644 index 00000000..43d87766 --- /dev/null +++ b/v2/crates/rvcsi-events/src/detectors.rs @@ -0,0 +1,779 @@ +//! Event detectors — small deterministic state machines over [`CsiWindow`]s. +//! +//! Every detector implements [`EventDetector`]; an [`crate::EventPipeline`] +//! runs each in turn on every closed window and concatenates the emitted +//! [`CsiEvent`]s. Detectors are intentionally tiny and side-effect-free: the +//! only state they keep is the bare minimum to debounce / hysteresis-gate, so +//! replaying the same window stream is fully deterministic. + +use rvcsi_core::{CsiEvent, CsiEventKind, CsiWindow, IdGenerator, WindowId}; + +/// Consumes [`CsiWindow`]s and emits [`CsiEvent`]s. +pub trait EventDetector { + /// Process one window; return any events it triggers (possibly empty). + fn on_window(&mut self, window: &CsiWindow, ids: &IdGenerator) -> Vec; + + /// Stable name for logging / inspection. + fn name(&self) -> &'static str; +} + +/// Build a single-window-evidence [`CsiEvent`] (validated in debug builds). +fn make_event( + ids: &IdGenerator, + kind: CsiEventKind, + window: &CsiWindow, + timestamp_ns: u64, + confidence: f32, +) -> CsiEvent { + let evidence: Vec = vec![window.window_id]; + let confidence = confidence.clamp(0.0, 1.0); + let event = CsiEvent::new( + ids.next_event(), + kind, + window.session_id, + window.source_id.clone(), + timestamp_ns, + confidence, + evidence, + ); + debug_assert!( + event.validate().is_ok(), + "detector produced an invalid CsiEvent: {:?}", + event.validate() + ); + event +} + +// --------------------------------------------------------------------------- +// PresenceDetector +// --------------------------------------------------------------------------- + +/// Tunables for [`PresenceDetector`]. +#[derive(Debug, Clone, Copy, PartialEq)] +pub struct PresenceConfig { + /// Enter `Present` when `presence_score >= on_threshold` for `enter_windows` windows. + pub on_threshold: f32, + /// Exit to `Absent` when `presence_score <= off_threshold` for `exit_windows` windows. + pub off_threshold: f32, + /// Consecutive high windows required to declare presence. + pub enter_windows: u32, + /// Consecutive low windows required to declare absence. + pub exit_windows: u32, +} + +impl Default for PresenceConfig { + fn default() -> Self { + // A truly quiet window has `presence_score ≈ 0.40` (the + // `WindowBuffer` logistic floor at zero motion), so `off_threshold` + // sits just above that and `on_threshold` well above it. + PresenceConfig { + on_threshold: 0.7, + off_threshold: 0.45, + enter_windows: 2, + exit_windows: 3, + } + } +} + +impl PresenceConfig { + /// Validate the relationship `on_threshold > off_threshold` and positivity. + fn checked(self) -> Self { + assert!( + self.on_threshold > self.off_threshold, + "PresenceConfig requires on_threshold > off_threshold" + ); + assert!(self.enter_windows >= 1 && self.exit_windows >= 1); + self + } +} + +#[derive(Debug, Clone, Copy, PartialEq)] +enum PresenceState { + Absent, + Present, +} + +/// Hysteresis state machine over [`CsiWindow::presence_score`]. +/// +/// Emits a single [`CsiEventKind::PresenceStarted`] when the score has been +/// high for `enter_windows` consecutive windows, and a single +/// [`CsiEventKind::PresenceEnded`] when it has been low for `exit_windows` +/// consecutive windows. A window that breaks the streak resets the counter. +#[derive(Debug, Clone)] +pub struct PresenceDetector { + cfg: PresenceConfig, + state: PresenceState, + streak: u32, +} + +impl Default for PresenceDetector { + fn default() -> Self { + Self::new() + } +} + +impl PresenceDetector { + /// New detector with default thresholds. + pub fn new() -> Self { + Self::with_config(PresenceConfig::default()) + } + + /// New detector with explicit config. + /// + /// # Panics + /// Panics if `on_threshold <= off_threshold` or a window count is zero. + pub fn with_config(cfg: PresenceConfig) -> Self { + PresenceDetector { + cfg: cfg.checked(), + state: PresenceState::Absent, + streak: 0, + } + } +} + +impl EventDetector for PresenceDetector { + fn on_window(&mut self, window: &CsiWindow, ids: &IdGenerator) -> Vec { + let p = window.presence_score; + match self.state { + PresenceState::Absent => { + if p >= self.cfg.on_threshold { + self.streak += 1; + if self.streak >= self.cfg.enter_windows { + self.state = PresenceState::Present; + self.streak = 0; + return vec![make_event( + ids, + CsiEventKind::PresenceStarted, + window, + window.end_ns, + p, + )]; + } + } else { + self.streak = 0; + } + } + PresenceState::Present => { + if p <= self.cfg.off_threshold { + self.streak += 1; + if self.streak >= self.cfg.exit_windows { + self.state = PresenceState::Absent; + self.streak = 0; + return vec![make_event( + ids, + CsiEventKind::PresenceEnded, + window, + window.end_ns, + (1.0 - p).clamp(0.0, 1.0), + )]; + } + } else { + self.streak = 0; + } + } + } + Vec::new() + } + + fn name(&self) -> &'static str { + "presence" + } +} + +// --------------------------------------------------------------------------- +// MotionDetector +// --------------------------------------------------------------------------- + +/// Tunables for [`MotionDetector`]. +#[derive(Debug, Clone, Copy, PartialEq)] +pub struct MotionConfig { + /// Rising-edge threshold on `motion_energy`. + pub on_threshold: f32, + /// Falling-edge threshold on `motion_energy` (`< on_threshold`). + pub off_threshold: f32, + /// Consecutive windows above/below the relevant threshold before firing. + pub debounce_windows: u32, +} + +impl Default for MotionConfig { + fn default() -> Self { + MotionConfig { + on_threshold: 0.05, + off_threshold: 0.02, + debounce_windows: 2, + } + } +} + +impl MotionConfig { + fn checked(self) -> Self { + assert!( + self.on_threshold > self.off_threshold, + "MotionConfig requires on_threshold > off_threshold" + ); + assert!(self.debounce_windows >= 1); + self + } +} + +#[derive(Debug, Clone, Copy, PartialEq)] +enum MotionState { + Settled, + Moving, +} + +/// State machine over [`CsiWindow::motion_energy`]. +/// +/// Emits [`CsiEventKind::MotionDetected`] on a debounced rising edge and +/// [`CsiEventKind::MotionSettled`] on a debounced falling edge. +#[derive(Debug, Clone)] +pub struct MotionDetector { + cfg: MotionConfig, + state: MotionState, + streak: u32, +} + +impl Default for MotionDetector { + fn default() -> Self { + Self::new() + } +} + +impl MotionDetector { + /// New detector with default thresholds. + pub fn new() -> Self { + Self::with_config(MotionConfig::default()) + } + + /// New detector with explicit config. + /// + /// # Panics + /// Panics if `on_threshold <= off_threshold` or `debounce_windows == 0`. + pub fn with_config(cfg: MotionConfig) -> Self { + MotionDetector { + cfg: cfg.checked(), + state: MotionState::Settled, + streak: 0, + } + } +} + +impl EventDetector for MotionDetector { + fn on_window(&mut self, window: &CsiWindow, ids: &IdGenerator) -> Vec { + let m = window.motion_energy; + match self.state { + MotionState::Settled => { + if m > self.cfg.on_threshold { + self.streak += 1; + if self.streak >= self.cfg.debounce_windows { + self.state = MotionState::Moving; + self.streak = 0; + let conf = (m / (2.0 * self.cfg.on_threshold)).clamp(0.0, 1.0); + return vec![make_event( + ids, + CsiEventKind::MotionDetected, + window, + window.end_ns, + conf, + )]; + } + } else { + self.streak = 0; + } + } + MotionState::Moving => { + if m < self.cfg.off_threshold { + self.streak += 1; + if self.streak >= self.cfg.debounce_windows { + self.state = MotionState::Settled; + self.streak = 0; + let rise = (m / (2.0 * self.cfg.on_threshold)).clamp(0.0, 1.0); + return vec![make_event( + ids, + CsiEventKind::MotionSettled, + window, + window.end_ns, + (1.0 - rise).clamp(0.0, 1.0), + )]; + } + } else { + self.streak = 0; + } + } + } + Vec::new() + } + + fn name(&self) -> &'static str { + "motion" + } +} + +// --------------------------------------------------------------------------- +// QualityDetector +// --------------------------------------------------------------------------- + +/// Tunables for [`QualityDetector`]. +#[derive(Debug, Clone, Copy, PartialEq)] +pub struct QualityConfig { + /// `quality_score` below this (debounced) raises [`CsiEventKind::SignalQualityDropped`]. + pub drop_threshold: f32, + /// Consecutive low windows before [`CsiEventKind::SignalQualityDropped`] fires. + pub debounce_windows: u32, + /// Consecutive low windows (counting from the first low one) before + /// [`CsiEventKind::CalibrationRequired`] also fires — once per low stretch. + pub calib_windows: u32, +} + +impl Default for QualityConfig { + fn default() -> Self { + QualityConfig { + drop_threshold: 0.4, + debounce_windows: 2, + calib_windows: 4, + } + } +} + +impl QualityConfig { + fn checked(self) -> Self { + assert!(self.debounce_windows >= 1 && self.calib_windows >= 1); + self + } +} + +/// State machine over [`CsiWindow::quality_score`]. +/// +/// While `quality_score` stays below `drop_threshold` it counts a low streak. +/// At `debounce_windows` it emits [`CsiEventKind::SignalQualityDropped`]; at +/// `calib_windows` it additionally emits [`CsiEventKind::CalibrationRequired`] +/// (only once until quality recovers). Any window at or above `drop_threshold` +/// resets the streak and re-arms both events. +#[derive(Debug, Clone)] +pub struct QualityDetector { + cfg: QualityConfig, + low_streak: u32, + dropped_emitted: bool, + calib_emitted: bool, +} + +impl Default for QualityDetector { + fn default() -> Self { + Self::new() + } +} + +impl QualityDetector { + /// New detector with default thresholds. + pub fn new() -> Self { + Self::with_config(QualityConfig::default()) + } + + /// New detector with explicit config. + pub fn with_config(cfg: QualityConfig) -> Self { + QualityDetector { + cfg: cfg.checked(), + low_streak: 0, + dropped_emitted: false, + calib_emitted: false, + } + } +} + +impl EventDetector for QualityDetector { + fn on_window(&mut self, window: &CsiWindow, ids: &IdGenerator) -> Vec { + let q = window.quality_score; + if q < self.cfg.drop_threshold { + self.low_streak += 1; + let mut out = Vec::new(); + if !self.dropped_emitted && self.low_streak >= self.cfg.debounce_windows { + self.dropped_emitted = true; + out.push(make_event( + ids, + CsiEventKind::SignalQualityDropped, + window, + window.end_ns, + (1.0 - q).clamp(0.0, 1.0), + )); + } + if !self.calib_emitted && self.low_streak >= self.cfg.calib_windows { + self.calib_emitted = true; + out.push(make_event( + ids, + CsiEventKind::CalibrationRequired, + window, + window.end_ns, + (1.0 - q).clamp(0.0, 1.0), + )); + } + out + } else { + self.low_streak = 0; + self.dropped_emitted = false; + self.calib_emitted = false; + Vec::new() + } + } + + fn name(&self) -> &'static str { + "quality" + } +} + +// --------------------------------------------------------------------------- +// BaselineDriftDetector +// --------------------------------------------------------------------------- + +/// Tunables for [`BaselineDriftDetector`]. +#[derive(Debug, Clone, Copy, PartialEq)] +pub struct BaselineDriftConfig { + /// Per-window drift `||mean_amplitude - baseline||_2 / sqrt(n)` above this + /// for `drift_windows` windows in a row triggers [`CsiEventKind::BaselineChanged`]. + pub drift_threshold: f32, + /// Consecutive drifting windows before [`CsiEventKind::BaselineChanged`] fires. + pub drift_windows: u32, + /// A single window with drift above this (much larger) value triggers + /// [`CsiEventKind::AnomalyDetected`]. + pub anomaly_threshold: f32, + /// EWMA smoothing factor for the running baseline (`baseline = a*current + (1-a)*baseline`). + pub ewma_alpha: f32, +} + +impl Default for BaselineDriftConfig { + fn default() -> Self { + BaselineDriftConfig { + drift_threshold: 0.15, + drift_windows: 3, + anomaly_threshold: 1.0, + ewma_alpha: 0.1, + } + } +} + +impl BaselineDriftConfig { + fn checked(self) -> Self { + assert!(self.drift_windows >= 1); + assert!(self.anomaly_threshold > self.drift_threshold); + assert!(self.ewma_alpha > 0.0 && self.ewma_alpha <= 1.0); + self + } +} + +/// Tracks an EWMA baseline of `mean_amplitude` and flags sustained drift / +/// single-window anomalies. +#[derive(Debug, Clone)] +pub struct BaselineDriftDetector { + cfg: BaselineDriftConfig, + baseline: Option>, + drift_streak: u32, +} + +impl Default for BaselineDriftDetector { + fn default() -> Self { + Self::new() + } +} + +impl BaselineDriftDetector { + /// New detector with default thresholds. + pub fn new() -> Self { + Self::with_config(BaselineDriftConfig::default()) + } + + /// New detector with explicit config. + pub fn with_config(cfg: BaselineDriftConfig) -> Self { + BaselineDriftDetector { + cfg: cfg.checked(), + baseline: None, + drift_streak: 0, + } + } + + /// L2 distance between two equal-length vectors, normalized by `sqrt(len)`. + fn rms_distance(a: &[f32], b: &[f32]) -> f32 { + let n = a.len(); + if n == 0 { + return 0.0; + } + let mut sq = 0.0f64; + for k in 0..n { + let d = (a[k] - b[k]) as f64; + sq += d * d; + } + (sq.sqrt() / (n as f64).sqrt()) as f32 + } + + fn update_ewma(&mut self, current: &[f32]) { + match &mut self.baseline { + None => self.baseline = Some(current.to_vec()), + Some(b) if b.len() != current.len() => { + self.baseline = Some(current.to_vec()); + } + Some(b) => { + let a = self.cfg.ewma_alpha; + for k in 0..b.len() { + b[k] = a * current[k] + (1.0 - a) * b[k]; + } + } + } + } +} + +impl EventDetector for BaselineDriftDetector { + fn on_window(&mut self, window: &CsiWindow, ids: &IdGenerator) -> Vec { + let current = &window.mean_amplitude; + let baseline = match &self.baseline { + None => { + // First window establishes the baseline; no drift possible yet. + self.baseline = Some(current.clone()); + return Vec::new(); + } + Some(b) if b.len() != current.len() => { + // Subcarrier count changed — reset and skip this window. + self.baseline = Some(current.clone()); + self.drift_streak = 0; + return Vec::new(); + } + Some(b) => b.clone(), + }; + + let drift = Self::rms_distance(current, &baseline); + let mut out = Vec::new(); + + if drift > self.cfg.anomaly_threshold { + out.push(make_event( + ids, + CsiEventKind::AnomalyDetected, + window, + window.end_ns, + (drift / (2.0 * self.cfg.anomaly_threshold)).clamp(0.0, 1.0), + )); + } + + if drift > self.cfg.drift_threshold { + self.drift_streak += 1; + if self.drift_streak >= self.cfg.drift_windows { + out.push(make_event( + ids, + CsiEventKind::BaselineChanged, + window, + window.end_ns, + (drift / (2.0 * self.cfg.drift_threshold)).clamp(0.0, 1.0), + )); + self.drift_streak = 0; + // Hard-reset the baseline to the new operating point. + self.baseline = Some(current.clone()); + return out; + } + } else { + self.drift_streak = 0; + } + + self.update_ewma(current); + out + } + + fn name(&self) -> &'static str { + "baseline_drift" + } +} + +#[cfg(test)] +mod tests { + use super::*; + use rvcsi_core::{SessionId, SourceId}; + + fn window(window_id: u64, end_ns: u64, motion: f32, presence: f32, quality: f32) -> CsiWindow { + let end_ns = end_ns.max(1); + CsiWindow { + window_id: WindowId(window_id), + session_id: SessionId(0), + source_id: SourceId::from("s"), + start_ns: end_ns.saturating_sub(1_000), + end_ns, + frame_count: 8, + mean_amplitude: vec![1.0; 8], + phase_variance: vec![0.0; 8], + motion_energy: motion, + presence_score: presence, + quality_score: quality, + } + } + + fn window_amp(window_id: u64, end_ns: u64, amp: Vec) -> CsiWindow { + let n = amp.len(); + CsiWindow { + window_id: WindowId(window_id), + session_id: SessionId(0), + source_id: SourceId::from("s"), + start_ns: 0, + end_ns: end_ns.max(1), + frame_count: 8, + mean_amplitude: amp, + phase_variance: vec![0.0; n], + motion_energy: 0.0, + presence_score: 0.0, + quality_score: 0.9, + } + } + + #[test] + fn presence_detector_emits_started_then_ended() { + let g = IdGenerator::new(); + let mut d = PresenceDetector::with_config(PresenceConfig { + on_threshold: 0.6, + off_threshold: 0.35, + enter_windows: 2, + exit_windows: 3, + }); + let mut events = Vec::new(); + // Low windows. + for k in 0..3u64 { + events.extend(d.on_window(&window(k, (k + 1) * 1_000, 0.0, 0.05, 0.9), &g)); + } + assert!(events.is_empty()); + // High run -> PresenceStarted after the 2nd one. + for k in 3..8u64 { + events.extend(d.on_window(&window(k, (k + 1) * 1_000, 0.5, 0.95, 0.9), &g)); + } + // Low run -> PresenceEnded after the 3rd low one. + for k in 8..13u64 { + events.extend(d.on_window(&window(k, (k + 1) * 1_000, 0.0, 0.05, 0.9), &g)); + } + assert_eq!(events.len(), 2, "events = {events:?}"); + assert_eq!(events[0].kind, CsiEventKind::PresenceStarted); + assert_eq!(events[1].kind, CsiEventKind::PresenceEnded); + for e in &events { + assert!(e.validate().is_ok()); + assert!(!e.evidence_window_ids.is_empty()); + assert!((0.0..=1.0).contains(&e.confidence)); + } + } + + #[test] + fn presence_detector_streak_reset() { + let g = IdGenerator::new(); + let mut d = PresenceDetector::new(); + // 1 high, 1 low (resets), then enough highs. + assert!(d.on_window(&window(0, 1_000, 0.0, 0.95, 0.9), &g).is_empty()); + assert!(d.on_window(&window(1, 2_000, 0.0, 0.05, 0.9), &g).is_empty()); + assert!(d.on_window(&window(2, 3_000, 0.0, 0.95, 0.9), &g).is_empty()); + let e = d.on_window(&window(3, 4_000, 0.0, 0.95, 0.9), &g); + assert_eq!(e.len(), 1); + assert_eq!(e[0].kind, CsiEventKind::PresenceStarted); + } + + #[test] + fn motion_detector_emits_detected_then_settled() { + let g = IdGenerator::new(); + let mut d = MotionDetector::with_config(MotionConfig { + on_threshold: 0.05, + off_threshold: 0.02, + debounce_windows: 2, + }); + let mut events = Vec::new(); + for k in 0..2u64 { + events.extend(d.on_window(&window(k, (k + 1) * 1_000, 0.001, 0.0, 0.9), &g)); + } + for k in 2..6u64 { + events.extend(d.on_window(&window(k, (k + 1) * 1_000, 0.3, 0.0, 0.9), &g)); + } + for k in 6..10u64 { + events.extend(d.on_window(&window(k, (k + 1) * 1_000, 0.0, 0.0, 0.9), &g)); + } + assert_eq!(events.len(), 2, "events = {events:?}"); + assert_eq!(events[0].kind, CsiEventKind::MotionDetected); + assert_eq!(events[1].kind, CsiEventKind::MotionSettled); + for e in &events { + assert!(e.validate().is_ok()); + } + } + + #[test] + fn quality_detector_drop_then_calibration_once() { + let g = IdGenerator::new(); + let mut d = QualityDetector::with_config(QualityConfig { + drop_threshold: 0.4, + debounce_windows: 2, + calib_windows: 4, + }); + let mut events = Vec::new(); + // Good window first. + events.extend(d.on_window(&window(0, 1_000, 0.0, 0.0, 0.9), &g)); + // Low run. + for k in 1..8u64 { + events.extend(d.on_window(&window(k, (k + 1) * 1_000, 0.0, 0.0, 0.1), &g)); + } + let dropped = events + .iter() + .filter(|e| e.kind == CsiEventKind::SignalQualityDropped) + .count(); + let calib = events + .iter() + .filter(|e| e.kind == CsiEventKind::CalibrationRequired) + .count(); + assert_eq!(dropped, 1, "events = {events:?}"); + assert_eq!(calib, 1, "events = {events:?}"); + for e in &events { + assert!(e.validate().is_ok()); + } + // Recover and drop again -> re-armed. + events.clear(); + events.extend(d.on_window(&window(8, 9_000, 0.0, 0.0, 0.95), &g)); + for k in 9..14u64 { + events.extend(d.on_window(&window(k, (k + 1) * 1_000, 0.0, 0.0, 0.1), &g)); + } + assert_eq!( + events + .iter() + .filter(|e| e.kind == CsiEventKind::SignalQualityDropped) + .count(), + 1 + ); + } + + #[test] + fn baseline_drift_stable_then_shift_then_anomaly() { + let g = IdGenerator::new(); + let mut d = BaselineDriftDetector::with_config(BaselineDriftConfig { + drift_threshold: 0.15, + drift_windows: 3, + anomaly_threshold: 1.0, + ewma_alpha: 0.1, + }); + // Stable baseline -> no events. + let mut events = Vec::new(); + for k in 0..5u64 { + events.extend(d.on_window(&window_amp(k, (k + 1) * 1_000, vec![1.0; 8]), &g)); + } + assert!(events.is_empty(), "events = {events:?}"); + // Sustained shift -> BaselineChanged. + for k in 5..10u64 { + events.extend(d.on_window(&window_amp(k, (k + 1) * 1_000, vec![1.5; 8]), &g)); + } + assert!( + events.iter().any(|e| e.kind == CsiEventKind::BaselineChanged), + "events = {events:?}" + ); + // Single huge spike -> AnomalyDetected. + events.clear(); + events.extend(d.on_window(&window_amp(10, 11_000, vec![50.0; 8]), &g)); + assert!( + events.iter().any(|e| e.kind == CsiEventKind::AnomalyDetected), + "events = {events:?}" + ); + for e in &events { + assert!(e.validate().is_ok()); + } + } + + #[test] + fn baseline_drift_resets_on_subcarrier_change() { + let g = IdGenerator::new(); + let mut d = BaselineDriftDetector::new(); + assert!(d.on_window(&window_amp(0, 1_000, vec![1.0; 8]), &g).is_empty()); + // Different length -> reset, no event. + assert!(d.on_window(&window_amp(1, 2_000, vec![1.0; 16]), &g).is_empty()); + assert!(d.on_window(&window_amp(2, 3_000, vec![1.0; 16]), &g).is_empty()); + } +} diff --git a/v2/crates/rvcsi-events/src/lib.rs b/v2/crates/rvcsi-events/src/lib.rs index 67811f8a..bdc74204 100644 --- a/v2/crates/rvcsi-events/src/lib.rs +++ b/v2/crates/rvcsi-events/src/lib.rs @@ -1,7 +1,37 @@ -//! # rvCSI events (skeleton — implemented by the rvcsi-events swarm agent) +//! # rvCSI events — window aggregation + semantic event extraction (ADR-095 FR5) //! -//! Window aggregation and semantic event extraction (ADR-095 FR5). -#![forbid(unsafe_code)] +//! This crate turns a stream of validated [`rvcsi_core::CsiFrame`]s into +//! [`rvcsi_core::CsiWindow`]s and then into [`rvcsi_core::CsiEvent`]s. +//! +//! The pipeline has three layers: +//! +//! 1. [`WindowBuffer`] — buffers exposable frames from one +//! `(session_id, source_id)` and emits a [`rvcsi_core::CsiWindow`] when a +//! frame-count or duration threshold is hit. Per-subcarrier statistics +//! (`mean_amplitude`, `phase_variance`) and the scalar `motion_energy`, +//! `presence_score` and `quality_score` are computed here. +//! 2. [`EventDetector`] implementations — small, deterministic state machines +//! that consume windows and emit events: +//! [`PresenceDetector`], [`MotionDetector`], [`QualityDetector`] and +//! [`BaselineDriftDetector`]. +//! 3. [`EventPipeline`] — wires a [`WindowBuffer`] and a set of detectors +//! together and owns an [`rvcsi_core::IdGenerator`]. +//! +//! Determinism: feeding the same frame stream through an [`EventPipeline`] +//! always produces the same event list (modulo the ids, which are minted in a +//! deterministic order). All "noise" in the tests comes from a tiny LCG, never +//! from `rand`. -/// Placeholder so the crate compiles before the agent fills it in. -pub fn __rvcsi_events_placeholder() {} +#![forbid(unsafe_code)] +#![warn(missing_docs)] + +mod detectors; +mod pipeline; +mod window_buffer; + +pub use detectors::{ + BaselineDriftConfig, BaselineDriftDetector, EventDetector, MotionConfig, MotionDetector, + PresenceConfig, PresenceDetector, QualityConfig, QualityDetector, +}; +pub use pipeline::EventPipeline; +pub use window_buffer::{WindowBuffer, WindowBufferConfig}; diff --git a/v2/crates/rvcsi-events/src/pipeline.rs b/v2/crates/rvcsi-events/src/pipeline.rs new file mode 100644 index 00000000..f85f7bce --- /dev/null +++ b/v2/crates/rvcsi-events/src/pipeline.rs @@ -0,0 +1,260 @@ +//! [`EventPipeline`] — wires a [`WindowBuffer`] to a set of [`EventDetector`]s. +//! +//! A pipeline owns its own [`IdGenerator`] so window/event ids are minted in a +//! deterministic order. Feed it frames with [`EventPipeline::process_frame`] +//! and drain the tail with [`EventPipeline::flush`]. + +use rvcsi_core::{CsiEvent, CsiFrame, CsiWindow, IdGenerator, SessionId, SourceId}; + +use crate::detectors::{ + BaselineDriftDetector, EventDetector, MotionDetector, PresenceDetector, QualityDetector, +}; +use crate::window_buffer::{WindowBuffer, WindowBufferConfig}; + +/// How many recently-closed windows the pipeline keeps for inspection. +const RECENT_WINDOW_CAP: usize = 32; + +/// Aggregates frames into windows and runs detectors over them. +pub struct EventPipeline { + buffer: WindowBuffer, + detectors: Vec>, + ids: IdGenerator, + recent: Vec, +} + +impl core::fmt::Debug for EventPipeline { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + f.debug_struct("EventPipeline") + .field("detectors", &self.detectors.iter().map(|d| d.name()).collect::>()) + .field("pending_frame_count", &self.buffer.pending_frame_count()) + .field("recent_windows", &self.recent.len()) + .finish() + } +} + +impl EventPipeline { + /// New pipeline with the given window-buffer config and no detectors. + /// + /// Add detectors with [`EventPipeline::add_detector`]. + pub fn new(session_id: SessionId, source_id: SourceId, buffer_cfg: WindowBufferConfig) -> Self { + EventPipeline { + buffer: WindowBuffer::with_config(session_id, source_id, buffer_cfg), + detectors: Vec::new(), + ids: IdGenerator::new(), + recent: Vec::new(), + } + } + + /// New pipeline with the four default detectors and a 16-frame / 1-second + /// window buffer. + pub fn with_defaults(session_id: SessionId, source_id: SourceId) -> Self { + let mut p = Self::new( + session_id, + source_id, + WindowBufferConfig::new(16, 1_000_000_000), + ); + p.add_detector(Box::new(PresenceDetector::new())); + p.add_detector(Box::new(MotionDetector::new())); + p.add_detector(Box::new(QualityDetector::new())); + p.add_detector(Box::new(BaselineDriftDetector::new())); + p + } + + /// Append a detector. Detectors run in insertion order on every window. + pub fn add_detector(&mut self, detector: Box) { + self.detectors.push(detector); + } + + /// Names of the registered detectors, in order. + pub fn detector_names(&self) -> Vec<&'static str> { + self.detectors.iter().map(|d| d.name()).collect() + } + + /// The most-recently-closed windows (newest last), capped at 32. + pub fn recent_windows(&self) -> &[CsiWindow] { + &self.recent + } + + /// Frames buffered but not yet emitted as a window. + pub fn pending_frame_count(&self) -> usize { + self.buffer.pending_frame_count() + } + + /// Push one frame; if it closes a window, run every detector on that window + /// and return their concatenated events. Otherwise return an empty `Vec`. + pub fn process_frame(&mut self, frame: &CsiFrame) -> Vec { + match self.buffer.push(frame, &self.ids) { + Some(window) => self.run_detectors(window), + None => Vec::new(), + } + } + + /// Close whatever frames remain in the buffer into a final window and run + /// detectors on it. Returns an empty `Vec` if the buffer was empty. + pub fn flush(&mut self) -> Vec { + match self.buffer.flush(&self.ids) { + Some(window) => self.run_detectors(window), + None => Vec::new(), + } + } + + fn run_detectors(&mut self, window: CsiWindow) -> Vec { + let mut events = Vec::new(); + for d in &mut self.detectors { + events.extend(d.on_window(&window, &self.ids)); + } + debug_assert!(events.iter().all(|e| e.validate().is_ok())); + self.recent.push(window); + if self.recent.len() > RECENT_WINDOW_CAP { + let overflow = self.recent.len() - RECENT_WINDOW_CAP; + self.recent.drain(0..overflow); + } + events + } +} + +#[cfg(test)] +mod tests { + use super::*; + use rvcsi_core::{AdapterKind, CsiEventKind, FrameId, ValidationStatus}; + + /// Deterministic LCG (Numerical Recipes constants) -> `[0.0, 1.0)`. + struct Lcg(u64); + impl Lcg { + fn new(seed: u64) -> Self { + Lcg(seed) + } + fn next_unit(&mut self) -> f32 { + self.0 = self.0.wrapping_mul(6364136223846793005).wrapping_add(1442695040888963407); + // top 24 bits -> [0,1) + ((self.0 >> 40) as f32) / (1u64 << 24) as f32 + } + } + + fn accepted_frame(frame_id: u64, ts: u64, amp: &[f32], quality: f32) -> CsiFrame { + let i: Vec = amp.to_vec(); + let q: Vec = vec![0.0; amp.len()]; + let mut f = CsiFrame::from_iq( + FrameId(frame_id), + SessionId(1), + SourceId::from("dev"), + AdapterKind::Synthetic, + ts, + 6, + 20, + i, + q, + ); + f.validation = ValidationStatus::Accepted; + f.quality_score = quality; + f + } + + /// Build a quiet / active / quiet frame stream with monotonic 50 ms + /// timestamps. Long enough that the default 16-frame window buffer yields + /// enough windows for the detectors' debounce / hysteresis chains. + fn synthetic_stream() -> Vec { + let mut rng = Lcg::new(0xC0FFEE); + let mut frames = Vec::new(); + let dt = 50_000_000u64; // 50 ms + let quiet_a = 30u64; + let active = 60u64; + let quiet_b = 60u64; + let total = quiet_a + active + quiet_b; + for k in 0..total { + let ts = k * dt; + let is_active = (quiet_a..quiet_a + active).contains(&k); + let amp: Vec = (0..32) + .map(|_| { + if is_active { + // Large per-frame jitter. + 1.0 + (rng.next_unit() - 0.5) * 4.0 + } else { + // Tiny deterministic noise around 1.0. + 1.0 + (rng.next_unit() - 0.5) * 0.001 + } + }) + .collect(); + frames.push(accepted_frame(k, ts, &, 0.9)); + } + frames + } + + fn run_stream(frames: &[CsiFrame]) -> Vec { + let mut p = EventPipeline::with_defaults(SessionId(1), SourceId::from("dev")); + let mut events = Vec::new(); + for f in frames { + events.extend(p.process_frame(f)); + } + events.extend(p.flush()); + events + } + + #[test] + fn pipeline_detects_motion_and_presence_and_settles() { + let frames = synthetic_stream(); + let events = run_stream(&frames); + assert!(!events.is_empty(), "expected some events"); + for e in &events { + assert!(e.validate().is_ok(), "invalid event: {e:?}"); + } + let kinds: Vec = events.iter().map(|e| e.kind).collect(); + assert!(kinds.contains(&CsiEventKind::MotionDetected), "kinds = {kinds:?}"); + assert!(kinds.contains(&CsiEventKind::PresenceStarted), "kinds = {kinds:?}"); + assert!(kinds.contains(&CsiEventKind::MotionSettled), "kinds = {kinds:?}"); + assert!(kinds.contains(&CsiEventKind::PresenceEnded), "kinds = {kinds:?}"); + + // MotionDetected should come before MotionSettled. + let det = events.iter().position(|e| e.kind == CsiEventKind::MotionDetected).unwrap(); + let set = events.iter().position(|e| e.kind == CsiEventKind::MotionSettled).unwrap(); + assert!(det < set); + let start = events.iter().position(|e| e.kind == CsiEventKind::PresenceStarted).unwrap(); + let end = events.iter().position(|e| e.kind == CsiEventKind::PresenceEnded).unwrap(); + assert!(start < end); + } + + #[test] + fn pipeline_is_deterministic() { + let frames = synthetic_stream(); + let a = run_stream(&frames); + let b = run_stream(&frames); + assert_eq!(a, b, "same stream must yield identical events"); + } + + #[test] + fn pipeline_recent_windows_and_pending_count() { + let mut p = EventPipeline::with_defaults(SessionId(1), SourceId::from("dev")); + let amp = vec![1.0f32; 32]; + // Two windows worth of frames (16 each at the 16-frame cap). + for k in 0..16u64 { + p.process_frame(&accepted_frame(k, k * 10_000, &, 0.9)); + } + assert_eq!(p.recent_windows().len(), 1); + assert_eq!(p.pending_frame_count(), 0); + p.process_frame(&accepted_frame(16, 200_000, &, 0.9)); + assert_eq!(p.pending_frame_count(), 1); + let leftover = p.flush(); + let _ = leftover; + assert_eq!(p.recent_windows().len(), 2); + assert_eq!(p.pending_frame_count(), 0); + } + + #[test] + fn pipeline_skips_foreign_frames() { + let mut p = EventPipeline::with_defaults(SessionId(1), SourceId::from("dev")); + let amp = vec![1.0f32; 8]; + let mut foreign = accepted_frame(0, 0, &, 0.9); + foreign.session_id = SessionId(99); + assert!(p.process_frame(&foreign).is_empty()); + assert_eq!(p.pending_frame_count(), 0); + } + + #[test] + fn detector_names_in_order() { + let p = EventPipeline::with_defaults(SessionId(1), SourceId::from("dev")); + assert_eq!( + p.detector_names(), + vec!["presence", "motion", "quality", "baseline_drift"] + ); + } +} diff --git a/v2/crates/rvcsi-events/src/window_buffer.rs b/v2/crates/rvcsi-events/src/window_buffer.rs new file mode 100644 index 00000000..fdf6ca98 --- /dev/null +++ b/v2/crates/rvcsi-events/src/window_buffer.rs @@ -0,0 +1,392 @@ +//! [`WindowBuffer`] — aggregates exposable [`CsiFrame`]s into [`CsiWindow`]s. + +use rvcsi_core::{CsiFrame, CsiWindow, IdGenerator, SessionId, SourceId}; + +/// Tunables for a [`WindowBuffer`]. +#[derive(Debug, Clone, Copy, PartialEq)] +pub struct WindowBufferConfig { + /// Close the window once this many frames have been buffered. Must be `>= 2`. + pub max_frames: usize, + /// Close the window once `last_ts - first_ts >= max_duration_ns`. + pub max_duration_ns: u64, + /// Centre of the logistic that maps `motion_energy` to `presence_score`. + pub presence_threshold: f32, +} + +impl WindowBufferConfig { + /// Build a config with a default `presence_threshold` of `0.05`. + /// + /// # Panics + /// Panics if `max_frames < 2`. + pub fn new(max_frames: usize, max_duration_ns: u64) -> Self { + assert!(max_frames >= 2, "WindowBuffer max_frames must be >= 2"); + WindowBufferConfig { + max_frames, + max_duration_ns, + presence_threshold: 0.05, + } + } + + /// Builder-style setter for [`WindowBufferConfig::presence_threshold`]. + pub fn with_presence_threshold(mut self, t: f32) -> Self { + self.presence_threshold = t; + self + } +} + +/// Buffers frames from one `(session_id, source_id)` and emits windows. +/// +/// Use [`WindowBuffer::push`] for each incoming frame; it returns `Some(window)` +/// on the frame that closes a window (that frame being the last in the window). +/// Call [`WindowBuffer::flush`] at end-of-stream to drain whatever is buffered. +#[derive(Debug, Clone)] +pub struct WindowBuffer { + session_id: SessionId, + source_id: SourceId, + cfg: WindowBufferConfig, + /// Subcarrier count fixed by the first buffered frame of the current window. + subcarrier_count: Option, + /// Buffered `amplitude` vectors (one per accepted frame). + amplitudes: Vec>, + /// Buffered `phase` vectors (one per accepted frame). + phases: Vec>, + /// Buffered `quality_score`s. + qualities: Vec, + /// Buffered timestamps (ns). + timestamps: Vec, +} + +impl WindowBuffer { + /// Create a buffer for `session_id` / `source_id` with the given thresholds. + /// + /// # Panics + /// Panics if `max_frames < 2`. + pub fn new( + session_id: SessionId, + source_id: SourceId, + max_frames: usize, + max_duration_ns: u64, + ) -> Self { + Self::with_config( + session_id, + source_id, + WindowBufferConfig::new(max_frames, max_duration_ns), + ) + } + + /// Create a buffer from a [`WindowBufferConfig`]. + /// + /// # Panics + /// Panics if `cfg.max_frames < 2`. + pub fn with_config(session_id: SessionId, source_id: SourceId, cfg: WindowBufferConfig) -> Self { + assert!(cfg.max_frames >= 2, "WindowBuffer max_frames must be >= 2"); + WindowBuffer { + session_id, + source_id, + cfg, + subcarrier_count: None, + amplitudes: Vec::new(), + phases: Vec::new(), + qualities: Vec::new(), + timestamps: Vec::new(), + } + } + + /// Number of frames currently buffered (not yet emitted as a window). + pub fn pending_frame_count(&self) -> usize { + self.amplitudes.len() + } + + /// Add a frame; returns `Some(window)` if this frame closed a window. + /// + /// Frames are skipped (returning `None`, not buffered) when: + /// * `!frame.is_exposable()`, + /// * the frame's `session_id` / `source_id` don't match the buffer's, or + /// * the frame's `subcarrier_count` differs from the first buffered frame's. + pub fn push(&mut self, frame: &CsiFrame, ids: &IdGenerator) -> Option { + if !frame.is_exposable() { + return None; + } + if frame.session_id != self.session_id || frame.source_id != self.source_id { + return None; + } + match self.subcarrier_count { + None => self.subcarrier_count = Some(frame.subcarrier_count), + Some(n) if n != frame.subcarrier_count => return None, + Some(_) => {} + } + + self.amplitudes.push(frame.amplitude.clone()); + self.phases.push(frame.phase.clone()); + self.qualities.push(frame.quality_score); + self.timestamps.push(frame.timestamp_ns); + + let reached_count = self.amplitudes.len() >= self.cfg.max_frames; + let reached_duration = match (self.timestamps.first(), self.timestamps.last()) { + (Some(&first), Some(&last)) => last.saturating_sub(first) >= self.cfg.max_duration_ns, + _ => false, + }; + if reached_count || reached_duration { + Some(self.close(ids)) + } else { + None + } + } + + /// Drain whatever is buffered (>= 1 frame) into a final window. + /// + /// Returns `None` when the buffer is empty. + pub fn flush(&mut self, ids: &IdGenerator) -> Option { + if self.amplitudes.is_empty() { + None + } else { + Some(self.close(ids)) + } + } + + /// Build the [`CsiWindow`] from the buffered frames and reset the buffer. + fn close(&mut self, ids: &IdGenerator) -> CsiWindow { + let frame_count = self.amplitudes.len(); + debug_assert!(frame_count >= 1, "close() called on an empty buffer"); + let n = self.subcarrier_count.unwrap_or(0) as usize; + + // Per-subcarrier mean amplitude. + let mut mean_amplitude = vec![0.0f32; n]; + for amp in &self.amplitudes { + for (slot, a) in mean_amplitude.iter_mut().zip(amp.iter()) { + *slot += *a; + } + } + for v in &mut mean_amplitude { + *v /= frame_count as f32; + } + + // Per-subcarrier population variance of the phase. + let mut phase_mean = vec![0.0f32; n]; + for ph in &self.phases { + for (slot, p) in phase_mean.iter_mut().zip(ph.iter()) { + *slot += *p; + } + } + for v in &mut phase_mean { + *v /= frame_count as f32; + } + let mut phase_variance = vec![0.0f32; n]; + for ph in &self.phases { + for k in 0..n { + let d = ph.get(k).copied().unwrap_or(0.0) - phase_mean[k]; + phase_variance[k] += d * d; + } + } + for v in &mut phase_variance { + *v /= frame_count as f32; + } + + // Motion energy: mean over consecutive pairs of ||amp_b - amp_a||_2 / sqrt(n). + let motion_energy = if frame_count < 2 || n == 0 { + 0.0 + } else { + let mut acc = 0.0f64; + for w in self.amplitudes.windows(2) { + let (a, b) = (&w[0], &w[1]); + let mut sq = 0.0f64; + for k in 0..n { + let d = (b.get(k).copied().unwrap_or(0.0) - a.get(k).copied().unwrap_or(0.0)) + as f64; + sq += d * d; + } + acc += sq.sqrt() / (n as f64).sqrt(); + } + (acc / (frame_count - 1) as f64) as f32 + }; + let motion_energy = if motion_energy.is_finite() && motion_energy >= 0.0 { + motion_energy + } else { + 0.0 + }; + + // Presence score: logistic of (motion_energy - threshold). + let z = (motion_energy - self.cfg.presence_threshold) * 8.0; + let presence_score = (1.0 / (1.0 + (-z).exp())).clamp(0.0, 1.0); + + // Quality score: mean of frame quality scores. + let quality_sum: f32 = self.qualities.iter().sum(); + let quality_score = (quality_sum / frame_count as f32).clamp(0.0, 1.0); + + let start_ns = *self.timestamps.first().unwrap(); + let raw_end = *self.timestamps.last().unwrap(); + // Edge case: a single-frame window would have start_ns == end_ns, which + // CsiWindow::validate() rejects. Bump the end by 1 ns so it stays valid. + let end_ns = if raw_end > start_ns { raw_end } else { start_ns + 1 }; + + let window = CsiWindow { + window_id: ids.next_window(), + session_id: self.session_id, + source_id: self.source_id.clone(), + start_ns, + end_ns, + frame_count: frame_count as u32, + mean_amplitude, + phase_variance, + motion_energy, + presence_score, + quality_score, + }; + debug_assert!( + window.validate().is_ok(), + "WindowBuffer produced an invalid CsiWindow: {:?}", + window.validate() + ); + + // Reset for the next window. + self.subcarrier_count = None; + self.amplitudes.clear(); + self.phases.clear(); + self.qualities.clear(); + self.timestamps.clear(); + + window + } +} + +#[cfg(test)] +mod tests { + use super::*; + use rvcsi_core::{AdapterKind, FrameId, ValidationStatus}; + + fn frame( + session: u64, + source: &str, + frame_id: u64, + ts: u64, + amp: &[f32], + quality: f32, + ) -> CsiFrame { + // Build I/Q so that amplitude == amp and phase == 0. + let i: Vec = amp.to_vec(); + let q: Vec = vec![0.0; amp.len()]; + let mut f = CsiFrame::from_iq( + FrameId(frame_id), + SessionId(session), + SourceId::from(source), + AdapterKind::Synthetic, + ts, + 6, + 20, + i, + q, + ); + f.validation = ValidationStatus::Accepted; + f.quality_score = quality; + f + } + + #[test] + fn closes_after_exactly_max_frames() { + let g = IdGenerator::new(); + let mut buf = WindowBuffer::new(SessionId(0), SourceId::from("s"), 4, u64::MAX); + let amp = [1.0f32, 1.0, 1.0]; + assert!(buf.push(&frame(0, "s", 0, 0, &, 0.9), &g).is_none()); + assert!(buf.push(&frame(0, "s", 1, 10, &, 0.9), &g).is_none()); + assert!(buf.push(&frame(0, "s", 2, 20, &, 0.9), &g).is_none()); + assert_eq!(buf.pending_frame_count(), 3); + let w = buf.push(&frame(0, "s", 3, 30, &, 0.9), &g).expect("window"); + assert_eq!(w.frame_count, 4); + assert_eq!(buf.pending_frame_count(), 0); + assert!(w.validate().is_ok()); + } + + #[test] + fn closes_on_duration_with_fewer_frames() { + let g = IdGenerator::new(); + let mut buf = WindowBuffer::new(SessionId(0), SourceId::from("s"), 100, 1_000); + let amp = [1.0f32, 2.0]; + assert!(buf.push(&frame(0, "s", 0, 0, &, 0.8), &g).is_none()); + assert!(buf.push(&frame(0, "s", 1, 500, &, 0.8), &g).is_none()); + let w = buf + .push(&frame(0, "s", 2, 1_000, &, 0.8), &g) + .expect("window closed on duration"); + assert_eq!(w.frame_count, 3); + assert_eq!(w.start_ns, 0); + assert_eq!(w.end_ns, 1_000); + assert!(w.validate().is_ok()); + } + + #[test] + fn flush_returns_remainder_and_handles_single_frame() { + let g = IdGenerator::new(); + let mut buf = WindowBuffer::new(SessionId(0), SourceId::from("s"), 10, u64::MAX); + let amp = [1.0f32, 1.0]; + assert!(buf.push(&frame(0, "s", 0, 100, &, 0.7), &g).is_none()); + let w = buf.flush(&g).expect("flush returns the single buffered frame"); + assert_eq!(w.frame_count, 1); + assert_eq!(w.start_ns, 100); + assert_eq!(w.end_ns, 101); // bumped so validate() passes + assert_eq!(w.motion_energy, 0.0); + assert!(w.validate().is_ok()); + assert!(buf.flush(&g).is_none()); + } + + #[test] + fn skips_mismatched_session_and_source() { + let g = IdGenerator::new(); + let mut buf = WindowBuffer::new(SessionId(7), SourceId::from("good"), 4, u64::MAX); + let amp = [1.0f32, 1.0]; + assert!(buf.push(&frame(7, "good", 0, 0, &, 0.9), &g).is_none()); + // Wrong session. + assert!(buf.push(&frame(8, "good", 1, 10, &, 0.9), &g).is_none()); + // Wrong source. + assert!(buf.push(&frame(7, "bad", 2, 20, &, 0.9), &g).is_none()); + assert_eq!(buf.pending_frame_count(), 1); + } + + #[test] + fn skips_non_exposable_and_mismatched_subcarrier_count() { + let g = IdGenerator::new(); + let mut buf = WindowBuffer::new(SessionId(0), SourceId::from("s"), 4, u64::MAX); + // Non-exposable frame is dropped. + let mut bad = frame(0, "s", 0, 0, &[1.0, 1.0], 0.9); + bad.validation = ValidationStatus::Pending; + assert!(buf.push(&bad, &g).is_none()); + assert_eq!(buf.pending_frame_count(), 0); + // First good frame fixes subcarrier count = 2. + assert!(buf.push(&frame(0, "s", 1, 10, &[1.0, 1.0], 0.9), &g).is_none()); + // Different subcarrier count is dropped. + assert!(buf + .push(&frame(0, "s", 2, 20, &[1.0, 1.0, 1.0], 0.9), &g) + .is_none()); + assert_eq!(buf.pending_frame_count(), 1); + } + + #[test] + fn identical_frames_have_zero_motion_low_presence() { + let g = IdGenerator::new(); + let mut buf = WindowBuffer::new(SessionId(0), SourceId::from("s"), 8, u64::MAX); + let amp = [1.0f32; 32]; + let mut last = None; + for k in 0..8u64 { + last = buf.push(&frame(0, "s", k, k * 10, &, 0.9), &g); + } + let w = last.expect("window"); + assert_eq!(w.motion_energy, 0.0); + assert!(w.presence_score < 0.5, "presence_score = {}", w.presence_score); + assert!(w.validate().is_ok()); + } + + #[test] + fn growing_jitter_raises_motion_and_presence() { + let g = IdGenerator::new(); + let mut buf = WindowBuffer::new(SessionId(0), SourceId::from("s"), 16, u64::MAX); + // Large alternating jitter -> high motion energy. + let mut last = None; + for k in 0..16u64 { + let bump = if k % 2 == 0 { 0.0 } else { 1.0 }; + let amp: Vec = (0..32).map(|_| 1.0 + bump).collect(); + last = buf.push(&frame(0, "s", k, k * 10, &, 0.9), &g); + } + let w = last.expect("window"); + assert!(w.motion_energy > 0.1, "motion_energy = {}", w.motion_energy); + assert!(w.presence_score > 0.5, "presence_score = {}", w.presence_score); + assert!(w.validate().is_ok()); + } +}