197 lines
7.1 KiB
Python
197 lines
7.1 KiB
Python
"""ADR-117 P3 — Tests for vital-sign extraction bindings.
|
|
|
|
Covers:
|
|
|
|
- VitalStatus enum (eq, eq_int, hash, frozen)
|
|
- VitalEstimate construction + getters + immutability
|
|
- VitalReading composite + getters
|
|
- BreathingExtractor + HeartRateExtractor — esp32_default, explicit
|
|
ctor, extract() return type, validation behaviour
|
|
|
|
The Rust pipeline is unit-tested in `v2/crates/wifi-densepose-vitals/`.
|
|
These tests are deliberately scoped to the *binding* layer — does the
|
|
Python surface return the right shapes, raise the right errors, and
|
|
release the GIL safely.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import math
|
|
from random import Random
|
|
|
|
import pytest
|
|
|
|
import wifi_densepose
|
|
from wifi_densepose import (
|
|
BreathingExtractor,
|
|
HeartRateExtractor,
|
|
VitalEstimate,
|
|
VitalReading,
|
|
VitalStatus,
|
|
)
|
|
|
|
|
|
# ─── VitalStatus enum ────────────────────────────────────────────────
|
|
|
|
|
|
def test_vital_status_variants_present() -> None:
|
|
assert VitalStatus.Valid != VitalStatus.Degraded
|
|
assert VitalStatus.Unreliable != VitalStatus.Unavailable
|
|
|
|
|
|
def test_vital_status_equality_against_int() -> None:
|
|
# eq_int → enum can be compared to int (PyO3 0.22 surface)
|
|
assert VitalStatus.Valid == 0
|
|
assert VitalStatus.Unavailable == 3
|
|
|
|
|
|
def test_vital_status_is_hashable() -> None:
|
|
# frozen + hash → can be used as dict key / set member
|
|
s = {VitalStatus.Valid, VitalStatus.Valid, VitalStatus.Degraded}
|
|
assert len(s) == 2
|
|
|
|
|
|
def test_vital_status_repr_contains_variant_name() -> None:
|
|
r = repr(VitalStatus.Valid)
|
|
assert "VitalStatus" in r and "Valid" in r
|
|
|
|
|
|
# ─── VitalEstimate ───────────────────────────────────────────────────
|
|
|
|
|
|
def test_vital_estimate_construction_and_getters() -> None:
|
|
est = VitalEstimate(value_bpm=72.4, confidence=0.85, status=VitalStatus.Valid)
|
|
assert math.isclose(est.value_bpm, 72.4)
|
|
assert math.isclose(est.confidence, 0.85)
|
|
assert est.status == VitalStatus.Valid
|
|
|
|
|
|
def test_vital_estimate_is_frozen() -> None:
|
|
est = VitalEstimate(value_bpm=72.0, confidence=0.9, status=VitalStatus.Valid)
|
|
with pytest.raises(AttributeError):
|
|
est.value_bpm = 100.0 # type: ignore[misc]
|
|
|
|
|
|
def test_vital_estimate_repr_is_readable() -> None:
|
|
est = VitalEstimate(value_bpm=72.0, confidence=0.9, status=VitalStatus.Valid)
|
|
r = repr(est)
|
|
assert "VitalEstimate" in r
|
|
assert "72" in r
|
|
|
|
|
|
# ─── VitalReading ────────────────────────────────────────────────────
|
|
|
|
|
|
def test_vital_reading_construction_and_getters() -> None:
|
|
br = VitalEstimate(value_bpm=14.0, confidence=0.9, status=VitalStatus.Valid)
|
|
hr = VitalEstimate(value_bpm=72.0, confidence=0.8, status=VitalStatus.Degraded)
|
|
reading = VitalReading(
|
|
respiratory_rate=br,
|
|
heart_rate=hr,
|
|
subcarrier_count=56,
|
|
signal_quality=0.77,
|
|
timestamp_secs=1700000000.5,
|
|
)
|
|
assert reading.respiratory_rate.value_bpm == 14.0
|
|
assert reading.heart_rate.status == VitalStatus.Degraded
|
|
assert reading.subcarrier_count == 56
|
|
assert math.isclose(reading.signal_quality, 0.77)
|
|
assert math.isclose(reading.timestamp_secs, 1700000000.5)
|
|
|
|
|
|
# ─── BreathingExtractor ──────────────────────────────────────────────
|
|
|
|
|
|
def test_breathing_esp32_default_constructs() -> None:
|
|
br = BreathingExtractor.esp32_default()
|
|
assert br is not None
|
|
assert "BreathingExtractor" in repr(br)
|
|
|
|
|
|
def test_breathing_explicit_ctor() -> None:
|
|
br = BreathingExtractor(n_subcarriers=64, sample_rate=200.0, window_secs=20.0)
|
|
assert br is not None
|
|
|
|
|
|
def test_breathing_extract_returns_none_with_too_few_samples() -> None:
|
|
"""One frame can't produce a 30-second window — must return None.
|
|
|
|
Verifies the binding propagates Rust's `Option<VitalEstimate>` →
|
|
Python None correctly (vs raising or returning a default).
|
|
"""
|
|
br = BreathingExtractor.esp32_default()
|
|
out = br.extract(residuals=[0.0] * 56, weights=[])
|
|
assert out is None
|
|
|
|
|
|
def test_breathing_extract_accepts_empty_weights() -> None:
|
|
"""Empty weights vector means "equal weight per subcarrier" by
|
|
convention (per breathing.rs)."""
|
|
br = BreathingExtractor.esp32_default()
|
|
out = br.extract(residuals=[0.01] * 56, weights=[])
|
|
# Even with synthetic input it may return None until enough history
|
|
# accumulates — what matters is that the call doesn't panic.
|
|
assert out is None or isinstance(out, VitalEstimate)
|
|
|
|
|
|
def test_breathing_extract_with_synthetic_signal() -> None:
|
|
"""Drive the extractor with a synthetic 0.25 Hz sine (15 BPM) for
|
|
enough samples to fill the 30-second window. Don't assert the exact
|
|
BPM — just that the extractor *eventually* produces a result (rather
|
|
than returning None forever)."""
|
|
br = BreathingExtractor.esp32_default()
|
|
sample_rate = 100.0
|
|
target_freq = 0.25 # 15 BPM
|
|
# Run 40 seconds of synthetic data — comfortably past the 30s window.
|
|
n_samples = int(40 * sample_rate)
|
|
weights = [1.0] * 56
|
|
|
|
produced_estimate = False
|
|
rng = Random(42)
|
|
for i in range(n_samples):
|
|
t = i / sample_rate
|
|
base = math.sin(2.0 * math.pi * target_freq * t)
|
|
# Per-subcarrier residual: same signal + small per-carrier noise
|
|
residuals = [base + rng.gauss(0.0, 0.01) for _ in range(56)]
|
|
est = br.extract(residuals=residuals, weights=weights)
|
|
if est is not None:
|
|
produced_estimate = True
|
|
assert isinstance(est.value_bpm, float)
|
|
assert 0.0 <= est.confidence <= 1.0
|
|
assert est.status in (
|
|
VitalStatus.Valid,
|
|
VitalStatus.Degraded,
|
|
VitalStatus.Unreliable,
|
|
VitalStatus.Unavailable,
|
|
)
|
|
break
|
|
|
|
assert produced_estimate, "BreathingExtractor never produced an estimate after 40s of synthetic data"
|
|
|
|
|
|
# ─── HeartRateExtractor ──────────────────────────────────────────────
|
|
|
|
|
|
def test_heart_rate_esp32_default_constructs() -> None:
|
|
hr = HeartRateExtractor.esp32_default()
|
|
assert hr is not None
|
|
assert "HeartRateExtractor" in repr(hr)
|
|
|
|
|
|
def test_heart_rate_explicit_ctor() -> None:
|
|
hr = HeartRateExtractor(n_subcarriers=64, sample_rate=200.0, window_secs=10.0)
|
|
assert hr is not None
|
|
|
|
|
|
def test_heart_rate_extract_returns_none_with_too_few_samples() -> None:
|
|
hr = HeartRateExtractor.esp32_default()
|
|
out = hr.extract(residuals=[0.0] * 56, weights=[])
|
|
assert out is None
|
|
|
|
|
|
# ─── Build feature flag ──────────────────────────────────────────────
|
|
|
|
|
|
def test_p3_vitals_in_build_features() -> None:
|
|
assert "p3-vitals-bindings" in wifi_densepose.__build_features__
|