86 lines
3.2 KiB
Python
86 lines
3.2 KiB
Python
"""ADR-117 hardening sweep — Benchmarks for the P3 vitals hot paths.
|
||
|
||
Targets the ESP32 production rate: 100 Hz × 56 subcarriers, which is
|
||
what `BreathingExtractor.esp32_default()` is tuned for. The bench
|
||
asserts the *per-extract* cost is comfortably below 10 ms — at 100 Hz
|
||
that's the entire frame budget, so anything above 10 ms means the
|
||
Python binding would be the bottleneck instead of the radio.
|
||
|
||
Run with:
|
||
pytest python/bench/ --benchmark-only
|
||
|
||
The benchmarks are skipped by default (`addopts` in pyproject.toml
|
||
doesn't include them) — they live in a sibling `bench/` directory
|
||
so the main test run stays fast.
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import math
|
||
from random import Random
|
||
|
||
import pytest
|
||
|
||
from wifi_densepose import BreathingExtractor, HeartRateExtractor
|
||
|
||
|
||
def _synth_frame(n_subcarriers: int, sample_rate: float, t: float, freq_hz: float, rng: Random) -> tuple[list[float], list[float]]:
|
||
"""Build one ESP32-shape frame at time `t`: sine at `freq_hz` plus
|
||
tiny per-subcarrier noise."""
|
||
base = math.sin(2.0 * math.pi * freq_hz * t)
|
||
residuals = [base + rng.gauss(0.0, 0.01) for _ in range(n_subcarriers)]
|
||
weights = [1.0] * n_subcarriers
|
||
return residuals, weights
|
||
|
||
|
||
def test_breathing_extract_per_frame_cost(benchmark) -> None:
|
||
"""One BreathingExtractor.extract() at ESP32 defaults should
|
||
finish well under 10 ms — that's the 100 Hz frame budget."""
|
||
br = BreathingExtractor.esp32_default()
|
||
rng = Random(42)
|
||
# Pre-fill ~25 seconds of history so the bench measures the
|
||
# steady-state cost, not the cold-start cost.
|
||
for i in range(2500):
|
||
residuals, weights = _synth_frame(56, 100.0, i / 100.0, 0.25, rng)
|
||
br.extract(residuals=residuals, weights=weights)
|
||
|
||
def _one_frame():
|
||
residuals, weights = _synth_frame(56, 100.0, 30.0, 0.25, rng)
|
||
return br.extract(residuals=residuals, weights=weights)
|
||
|
||
benchmark(_one_frame)
|
||
|
||
|
||
def test_heart_rate_extract_per_frame_cost(benchmark) -> None:
|
||
"""One HeartRateExtractor.extract() at ESP32 defaults — same 10 ms
|
||
target."""
|
||
hr = HeartRateExtractor.esp32_default()
|
||
rng = Random(43)
|
||
for i in range(1500):
|
||
residuals, weights = _synth_frame(56, 100.0, i / 100.0, 1.2, rng)
|
||
hr.extract(residuals=residuals, weights=weights)
|
||
|
||
def _one_frame():
|
||
residuals, weights = _synth_frame(56, 100.0, 16.0, 1.2, rng)
|
||
return hr.extract(residuals=residuals, weights=weights)
|
||
|
||
benchmark(_one_frame)
|
||
|
||
|
||
@pytest.mark.parametrize("n_subcarriers", [56, 114, 242])
|
||
def test_breathing_extract_scaling(benchmark, n_subcarriers: int) -> None:
|
||
"""Sanity check: cost should scale roughly linearly with the
|
||
subcarrier count. Catches accidental O(n^2) regressions."""
|
||
sample_rate = 100.0
|
||
br = BreathingExtractor(n_subcarriers, sample_rate, 30.0)
|
||
rng = Random(n_subcarriers)
|
||
for i in range(2500):
|
||
residuals, weights = _synth_frame(n_subcarriers, sample_rate, i / sample_rate, 0.25, rng)
|
||
br.extract(residuals=residuals, weights=weights)
|
||
|
||
def _one_frame():
|
||
residuals, weights = _synth_frame(n_subcarriers, sample_rate, 30.0, 0.25, rng)
|
||
return br.extract(residuals=residuals, weights=weights)
|
||
|
||
benchmark(_one_frame)
|