"""ADR-117 P3 — Tests for vital-sign extraction bindings. Covers: - VitalStatus enum (eq, eq_int, hash, frozen) - VitalEstimate construction + getters + immutability - VitalReading composite + getters - BreathingExtractor + HeartRateExtractor — esp32_default, explicit ctor, extract() return type, validation behaviour The Rust pipeline is unit-tested in `v2/crates/wifi-densepose-vitals/`. These tests are deliberately scoped to the *binding* layer — does the Python surface return the right shapes, raise the right errors, and release the GIL safely. """ from __future__ import annotations import math from random import Random import pytest import wifi_densepose from wifi_densepose import ( BreathingExtractor, HeartRateExtractor, VitalEstimate, VitalReading, VitalStatus, ) # ─── VitalStatus enum ──────────────────────────────────────────────── def test_vital_status_variants_present() -> None: assert VitalStatus.Valid != VitalStatus.Degraded assert VitalStatus.Unreliable != VitalStatus.Unavailable def test_vital_status_equality_against_int() -> None: # eq_int → enum can be compared to int (PyO3 0.22 surface) assert VitalStatus.Valid == 0 assert VitalStatus.Unavailable == 3 def test_vital_status_is_hashable() -> None: # frozen + hash → can be used as dict key / set member s = {VitalStatus.Valid, VitalStatus.Valid, VitalStatus.Degraded} assert len(s) == 2 def test_vital_status_repr_contains_variant_name() -> None: r = repr(VitalStatus.Valid) assert "VitalStatus" in r and "Valid" in r # ─── VitalEstimate ─────────────────────────────────────────────────── def test_vital_estimate_construction_and_getters() -> None: est = VitalEstimate(value_bpm=72.4, confidence=0.85, status=VitalStatus.Valid) assert math.isclose(est.value_bpm, 72.4) assert math.isclose(est.confidence, 0.85) assert est.status == VitalStatus.Valid def test_vital_estimate_is_frozen() -> None: est = VitalEstimate(value_bpm=72.0, confidence=0.9, status=VitalStatus.Valid) with pytest.raises(AttributeError): est.value_bpm = 100.0 # type: ignore[misc] def test_vital_estimate_repr_is_readable() -> None: est = VitalEstimate(value_bpm=72.0, confidence=0.9, status=VitalStatus.Valid) r = repr(est) assert "VitalEstimate" in r assert "72" in r # ─── VitalReading ──────────────────────────────────────────────────── def test_vital_reading_construction_and_getters() -> None: br = VitalEstimate(value_bpm=14.0, confidence=0.9, status=VitalStatus.Valid) hr = VitalEstimate(value_bpm=72.0, confidence=0.8, status=VitalStatus.Degraded) reading = VitalReading( respiratory_rate=br, heart_rate=hr, subcarrier_count=56, signal_quality=0.77, timestamp_secs=1700000000.5, ) assert reading.respiratory_rate.value_bpm == 14.0 assert reading.heart_rate.status == VitalStatus.Degraded assert reading.subcarrier_count == 56 assert math.isclose(reading.signal_quality, 0.77) assert math.isclose(reading.timestamp_secs, 1700000000.5) # ─── BreathingExtractor ────────────────────────────────────────────── def test_breathing_esp32_default_constructs() -> None: br = BreathingExtractor.esp32_default() assert br is not None assert "BreathingExtractor" in repr(br) def test_breathing_explicit_ctor() -> None: br = BreathingExtractor(n_subcarriers=64, sample_rate=200.0, window_secs=20.0) assert br is not None def test_breathing_extract_returns_none_with_too_few_samples() -> None: """One frame can't produce a 30-second window — must return None. Verifies the binding propagates Rust's `Option` → Python None correctly (vs raising or returning a default). """ br = BreathingExtractor.esp32_default() out = br.extract(residuals=[0.0] * 56, weights=[]) assert out is None def test_breathing_extract_accepts_empty_weights() -> None: """Empty weights vector means "equal weight per subcarrier" by convention (per breathing.rs).""" br = BreathingExtractor.esp32_default() out = br.extract(residuals=[0.01] * 56, weights=[]) # Even with synthetic input it may return None until enough history # accumulates — what matters is that the call doesn't panic. assert out is None or isinstance(out, VitalEstimate) def test_breathing_extract_with_synthetic_signal() -> None: """Drive the extractor with a synthetic 0.25 Hz sine (15 BPM) for enough samples to fill the 30-second window. Don't assert the exact BPM — just that the extractor *eventually* produces a result (rather than returning None forever).""" br = BreathingExtractor.esp32_default() sample_rate = 100.0 target_freq = 0.25 # 15 BPM # Run 40 seconds of synthetic data — comfortably past the 30s window. n_samples = int(40 * sample_rate) weights = [1.0] * 56 produced_estimate = False rng = Random(42) for i in range(n_samples): t = i / sample_rate base = math.sin(2.0 * math.pi * target_freq * t) # Per-subcarrier residual: same signal + small per-carrier noise residuals = [base + rng.gauss(0.0, 0.01) for _ in range(56)] est = br.extract(residuals=residuals, weights=weights) if est is not None: produced_estimate = True assert isinstance(est.value_bpm, float) assert 0.0 <= est.confidence <= 1.0 assert est.status in ( VitalStatus.Valid, VitalStatus.Degraded, VitalStatus.Unreliable, VitalStatus.Unavailable, ) break assert produced_estimate, "BreathingExtractor never produced an estimate after 40s of synthetic data" # ─── HeartRateExtractor ────────────────────────────────────────────── def test_heart_rate_esp32_default_constructs() -> None: hr = HeartRateExtractor.esp32_default() assert hr is not None assert "HeartRateExtractor" in repr(hr) def test_heart_rate_explicit_ctor() -> None: hr = HeartRateExtractor(n_subcarriers=64, sample_rate=200.0, window_secs=10.0) assert hr is not None def test_heart_rate_extract_returns_none_with_too_few_samples() -> None: hr = HeartRateExtractor.esp32_default() out = hr.extract(residuals=[0.0] * 56, weights=[]) assert out is None # ─── Build feature flag ────────────────────────────────────────────── def test_p3_vitals_in_build_features() -> None: assert "p3-vitals-bindings" in wifi_densepose.__build_features__