feat(rvcsi): rvcsi-dsp (DSP stages + SignalPipeline) + ADR-096 (FFI/crate layout)

- rvcsi-dsp — reusable signal-processing stages (ADR-095 FR4): mean/variance/
  std_dev/median, remove_dc_offset, unwrap_phase, moving_average, ewma,
  hampel_filter(_count), short_window_variance, subtract_baseline + DspError;
  scalar features motion_energy(_series), presence_score (logistic, ≈0.5 at
  threshold), confidence_score, breathing_band_estimate (heuristic, FFT-free);
  SignalPipeline (hampel → smooth → DC-remove → baseline-subtract → unwrap,
  non-destructive of validation state) + learn_baseline. 28 tests, clippy-clean,
  forbid(unsafe_code), no heavy deps.
- docs/adr/ADR-096-rvcsi-ffi-crate-layout.md — the implementation ADR: 8-crate
  topology, the napi-c shim record format + contract, the napi-rs Node surface,
  build/test invariants, alternatives. Indexed in docs/adr/README.md.
- CHANGELOG: rvCSI entry updated to cover the implementation crates.

https://claude.ai/code/session_01CdYAPvRTjcch6YrYf42n1z
This commit is contained in:
Claude 2026-05-13 00:00:40 +00:00
parent 1e684cb208
commit 94745242a8
No known key found for this signature in database
7 changed files with 1181 additions and 6 deletions

View File

@ -19,7 +19,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
tracked in the PR.
### Added
- **rvCSI platform design — PRD, ADR-095, and DDD domain model.** New design documents for **rvCSI**, a Rust-first / TypeScript-accessible / hardware-abstracted edge RF sensing runtime that normalizes CSI from Nexmon, ESP32, Intel, Atheros, file, and replay sources into one validated `CsiFrame` schema, runs reusable DSP, emits typed confidence-scored events, and bridges to RuVector RF memory, an MCP tool server, and a TypeScript SDK. Adds `docs/prd/rvcsi-platform-prd.md` (purpose, users, success criteria, FR1FR10, NFRs, system architecture, data model), `docs/adr/ADR-095-rvcsi-edge-rf-sensing-platform.md` (the 15 architectural decisions: Rust core, C-at-the-boundary, TS SDK via napi-rs, normalized schema, validate-before-FFI, CSI-as-temporal-delta, RuVector as RF memory, replayability, detection≠decision, local-first, read-first/write-gated MCP, mandatory quality scoring, versioned calibration, plugin adapters), and `docs/ddd/rvcsi-domain-model.md` (7 bounded contexts: Capture, Validation, Signal, Calibration, Event, Memory, Agent — with aggregates, invariants, context map, and domain services). Indexed in `docs/adr/README.md` and `docs/ddd/README.md`. Design-only; no code or crates added yet.
- **rvCSI — edge RF sensing runtime (design + first implementation).** New subsystem **rvCSI**: a Rust-first / TypeScript-accessible / hardware-abstracted edge RF sensing runtime that normalizes WiFi CSI from Nexmon, ESP32, Intel, Atheros, file and replay sources into one validated `CsiFrame` schema, runs reusable DSP, emits typed confidence-scored events, and bridges to RuVector RF memory, an MCP tool server and a TS SDK.
- **Design docs:** `docs/prd/rvcsi-platform-prd.md` (purpose, users, success criteria, FR1FR10, NFRs, system architecture, data model); `docs/adr/ADR-095-rvcsi-edge-rf-sensing-platform.md` (the 15 architectural decisions: Rust core, C-at-the-boundary, TS SDK via napi-rs, normalized schema, validate-before-FFI, CSI-as-temporal-delta, RuVector as RF memory, replayability, detection≠decision, local-first, read-first/write-gated MCP, mandatory quality scoring, versioned calibration, plugin adapters); `docs/adr/ADR-096-rvcsi-ffi-crate-layout.md` (crate topology, the napi-c shim record format & contract, the napi-rs Node surface, build/test invariants); `docs/ddd/rvcsi-domain-model.md` (7 bounded contexts: Capture, Validation, Signal, Calibration, Event, Memory, Agent — with aggregates, invariants, context map and domain services). Indexed in `docs/adr/README.md` and `docs/ddd/README.md`.
- **Crates** (`v2/crates/rvcsi-*`, registered in the workspace): `rvcsi-core` (normalized `CsiFrame`/`CsiWindow`/`CsiEvent` schema, `AdapterProfile`, `CsiSource` plugin trait, id newtypes + `IdGenerator`, `RvcsiError`, the `validate_frame` pipeline + quality scoring; `forbid(unsafe_code)`); `rvcsi-adapter-nexmon` — the **napi-c** seam: `native/rvcsi_nexmon_shim.{c,h}` (the only C in the runtime — allocation-free, bounds-checked, parses/writes a byte-defined "rvCSI Nexmon record", a normalized superset of the nexmon_csi UDP payload), compiled via `build.rs`+`cc`, wrapped by a documented `ffi` module and a `NexmonAdapter` `CsiSource`; `rvcsi-dsp` (DC removal, phase unwrap, smoothing, Hampel/MAD filter, sliding variance, baseline subtraction, motion-energy/presence/confidence features, heuristic breathing-band estimate, non-destructive `SignalPipeline`); `rvcsi-events` (`WindowBuffer`, the `EventDetector` trait + presence/motion/quality/baseline-drift state machines, `EventPipeline`); `rvcsi-adapter-file` (the `.rvcsi` JSONL capture format, `FileRecorder`, `FileReplayAdapter` deterministic replay); `rvcsi-ruvector` (deterministic window/event embeddings, the `RfMemoryStore` trait, `InMemoryRfMemory` + `JsonlRfMemory` — a standin until the production RuVector binding); `rvcsi-node` — the **napi-rs** seam (a `["cdylib","rlib"]` Node addon, `build.rs` runs `napi_build::setup()`); `rvcsi-cli` (the `rvcsi` binary: inspect/replay/health/export/calibrate/stream). `cargo test --workspace --no-default-features` stays green.
- **`wifi-densepose-train`: `signal_features` module — wires `wifi-densepose-signal` into the training pipeline.** `wifi-densepose-signal` was previously a phantom dependency of `wifi-densepose-train` (listed in `Cargo.toml`, never imported). New `wifi_densepose_train::signal_features::extract_signal_features` (and `CsiSample::signal_features()`) run a windowed CSI observation's centre frame through `wifi_densepose_signal::features::FeatureExtractor`, producing a fixed-length (`FEATURE_LEN = 12`) amplitude/phase/PSD feature vector — the hook for a future vitals / multi-task supervision head (breathing- and heart-rate-band power are read off the PSD summary). The vector is produced on demand and not yet fed back into the loss. Surfaced by the 2026-05-11 training-pipeline audit (findings #1 "vitals features absent from training" and #2 "`wifi-densepose-signal` ghost dep").
- **`wifi-densepose-train`: `TrainingConfig` subcarrier-layout presets + a real-loader integration test.** New `TrainingConfig::for_subcarriers(native, target)` plus named presets `ht40_192()` (≈192-sc ESP32 HT40 → 56) and `multiband_168()` (168-sc ADR-078 multi-band mesh → 56), so non-MM-Fi CSI shapes are first-class instead of requiring manual `native_subcarriers`/`num_subcarriers` overrides; field docs now list the supported source counts and the multi-NIC mapping. New `tests/test_real_loader.rs` round-trips synthetic CSI through `.npy` files → `MmFiDataset::discover`/`get` (including the subcarrier-interpolation branch and the empty-root case) — exercising the on-disk loader path the deterministic `verify-training` proof intentionally bypasses. Addresses training-pipeline audit findings #6 (56-sc/1-NIC config default) and #7 (multi-band mesh not in config); the #4 concern ("proof uses synthetic data") is reframed — the proof *should* use a reproducible source, and this test covers the real loader it skips.

View File

@ -0,0 +1,148 @@
# ADR-096: rvCSI — Crate Topology, the napi-c Shim, and the napi-rs Node Surface
| Field | Value |
|-------|-------|
| **Status** | Proposed |
| **Date** | 2026-05-12 |
| **Deciders** | ruv |
| **Codename** | **rvCSI** — RuVector Channel State Information runtime |
| **Relates to** | ADR-095 (rvCSI platform — D1 Rust core, D2 C-at-the-boundary, D3 TS SDK, D4 napi-rs, D5 normalized schema, D6 validate-before-FFI, D15 plugin adapters), ADR-009/ADR-040 (WASM runtimes), ADR-049 (cross-platform WiFi interface detection) |
| **PRD** | [rvCSI Platform PRD](../prd/rvcsi-platform-prd.md) |
| **Domain model** | [rvCSI Domain Model](../ddd/rvcsi-domain-model.md) |
| **Implements** | `v2/crates/rvcsi-core`, `rvcsi-dsp`, `rvcsi-events`, `rvcsi-adapter-file`, `rvcsi-adapter-nexmon`, `rvcsi-ruvector`, `rvcsi-node`, `rvcsi-cli` |
---
## 1. Context
ADR-095 set the platform-level invariant `C → Rust → TypeScript` and the fifteen decisions that constrain rvCSI. This ADR makes the *implementation* concrete: which crates exist, what each owns, where the two FFI seams are (the **napi-c** C shim below Rust, and the **napi-rs** Node addon above it), and the rules that keep `unsafe` confined and the boundary objects validated.
The two seams:
- **napi-c** — the *downward* seam to fragile vendor/firmware/driver code. Per ADR-095 D2, C is the only language allowed here, and only as a thin, allocation-free, bounds-checked shim. The Nexmon family is the first consumer.
- **napi-rs** — the *upward* seam to Node.js/TypeScript. Per ADR-095 D3/D4, the Rust runtime is exposed to JS via [napi-rs](https://napi.rs/); nothing crosses this seam that hasn't been validated (D6) and normalized (D5).
Both seams are *narrow on purpose*: everything in between — parsing, validation, DSP, windowing, event extraction, RuVector export — is safe Rust (`#![forbid(unsafe_code)]` in every crate except `rvcsi-adapter-nexmon`, which needs `extern "C"`).
---
## 2. Decision
### 2.1 Crate topology
Eight new workspace members under `v2/crates/`:
| Crate | `unsafe`? | Depends on | Owns |
|-------|-----------|------------|------|
| `rvcsi-core` | no (`forbid`) | — (serde, thiserror) | The normalized schema (`CsiFrame`/`CsiWindow`/`CsiEvent`), `AdapterProfile`, the `CsiSource` plugin trait, id newtypes + `IdGenerator`, `RvcsiError`, and the `validate_frame` pipeline + quality scoring. The shared kernel. |
| `rvcsi-dsp` | no (`forbid`) | `rvcsi-core` | Reusable DSP stages (DC removal, phase unwrap, smoothing, Hampel/MAD outlier filter, sliding variance, baseline subtraction) and scalar features (motion energy, presence score, confidence, heuristic breathing-band estimate), plus a non-destructive `SignalPipeline::process_frame`. |
| `rvcsi-events` | no (`forbid`) | `rvcsi-core` | `WindowBuffer` (frames → `CsiWindow`), the `EventDetector` trait + presence/motion/quality/baseline-drift state machines, and `EventPipeline` (windows → `CsiEvent`s). |
| `rvcsi-adapter-file` | no (`forbid`) | `rvcsi-core` | The `.rvcsi` capture format (JSONL: a header line + one `CsiFrame` per line), `FileRecorder`, and `FileReplayAdapter` (a `CsiSource`) — deterministic replay (D9). |
| `rvcsi-adapter-nexmon` | **yes** (FFI only) | `rvcsi-core` + the C shim | The **napi-c** seam: `native/rvcsi_nexmon_shim.{c,h}` compiled via `build.rs`+`cc`, a documented `ffi` module wrapping it, and `NexmonAdapter` (a `CsiSource`). |
| `rvcsi-ruvector` | no (`forbid`) | `rvcsi-core` | The RuVector RF-memory bridge: deterministic `window_embedding`/`event_embedding`, the `RfMemoryStore` trait, and `InMemoryRfMemory` + `JsonlRfMemory` (a standin until the production RuVector binding lands). |
| `rvcsi-node` | no (`deny(clippy::all)`) | all of the above | The **napi-rs** seam: the `.node` addon (cdylib + rlib) exposing a safe TS-facing surface; `build.rs` runs `napi_build::setup()`. |
| `rvcsi-cli` | no | core, dsp, events, adapter-file, adapter-nexmon, ruvector | The `rvcsi` binary: `inspect`, `replay`, `health`, `export`, `calibrate`, `stream` (ADR-095 FR7). |
`rvcsi-events` does **not** call into `rvcsi-dsp`: window statistics are simple enough to compute in `WindowBuffer` itself, and keeping the two leaves independent removes a coordination point. Higher layers (the daemon, `rvcsi-node`, `rvcsi-cli`) wire `SignalPipeline::process_frame``WindowBuffer::push` when they want cleaned frames.
The TypeScript SDK (`@ruv/rvcsi`) and the MCP tool server (`rvcsi-mcp`) and the long-running daemon (`rvcsi-daemon`) are *not* in this ADR's scope; they sit on top of `rvcsi-node` / the crates above and are tracked as follow-ups.
### 2.2 The napi-c shim — record format and contract
`native/rvcsi_nexmon_shim.{c,h}` is the only C in the runtime. It parses (and, for the recorder and tests, writes) a compact, byte-defined **"rvCSI Nexmon record"** — a normalized superset of the nexmon_csi UDP payload (magic, RSSI, chanspec, then interleaved `int16` I/Q in Q8.8 fixed point):
```
off size field
0 4 magic = 0x52564E58 ('R','V','N','X')
4 1 version = 1
5 1 flags (bit0 rssi present, bit1 noise floor present)
6 2 subcarrier_count N (1 .. 2048)
8 1 rssi_dbm (int8, valid iff flags bit0)
9 1 noise_dbm (int8, valid iff flags bit1)
10 2 channel (uint16)
12 2 bandwidth_mhz (uint16)
14 2 reserved (0)
16 8 timestamp_ns (uint64)
24 4*N N pairs of int16 (i, q), Q8.8 fixed point
total = 24 + 4*N
```
Contract:
- **Allocation-free, global-free.** Every read is bounds-checked against the caller-supplied length; nothing can scribble outside caller buffers.
- **Structured errors, never panics.** `rvcsi_nx_parse_record` returns one of a small set of `RvcsiNxError` codes (`TOO_SHORT`, `BAD_MAGIC`, `BAD_VERSION`, `CAPACITY`, `TRUNCATED`, `ZERO_SUBCARRIERS`, `TOO_MANY_SUBCARRIERS`, `NULL_ARG`); `rvcsi_nx_strerror` maps each to a static string.
- **ABI versioned.** `rvcsi_nx_abi_version()` returns `major<<16 | minor`; the Rust side `debug_assert`s the major matches the header it was compiled against.
- The Rust `ffi` module wraps these in safe functions (`record_len`, `decode_record`, `encode_record`, `shim_abi_version`); the `unsafe` blocks are limited to the FFI calls themselves and each carries a `// SAFETY:` comment, per the project rule.
A real Nexmon deployment feeds the UDP stream (or a PCAP demux) of these records to `NexmonAdapter::from_bytes`; `from_file` reads a capture dump. Production live capture (binding the UDP socket, monitor mode, firmware patch hooks) is a later increment that reuses the same record contract — the shim's job is the *parse*, not the *socket*.
### 2.3 The napi-rs surface — what crosses the seam
`rvcsi-node` is a `["cdylib", "rlib"]` crate (cdylib = the `.node` addon; rlib so `cargo test --workspace` can link and test the Rust side without Node). Rules:
- **Only normalized/validated data crosses.** The boundary types are JS-friendly mirrors of `CsiFrame`/`CsiWindow`/`CsiEvent`/`AdapterProfile`/`SourceHealth`, or plain JSON strings — never raw pointers, never `Pending` frames. A frame is run through `rvcsi_core::validate_frame` before it is handed to JS.
- **Errors map to JS exceptions** via napi-rs's `Result` integration; `RvcsiError`'s `Display` is the message.
- **The build emits link args + `index.d.ts`/`index.js`** via `napi_build::setup()` in `build.rs`; the `@ruv/rvcsi` npm package wraps the prebuilt addon and re-exports the generated `.d.ts`.
- The addon also re-exports `nexmon_shim_abi_version()` so a JS caller can confirm the linked napi-c shim's ABI.
### 2.4 Build & test invariants
- `cargo build --workspace` and `cargo test --workspace --no-default-features` (the repo's pre-merge gate) must stay green; the new crates add tests and don't regress the existing 1,031+.
- `rvcsi-node` stays a workspace *member* (not `exclude`d like `wifi-densepose-wasm-edge`): on Linux/macOS a napi cdylib links fine with Node symbols left undefined (resolved at addon-load time), so `cargo build`/`cargo test` work without a Node toolchain. Only `napi build` (npm packaging) needs Node.
- No new heavy dependencies in the rvCSI crates: `serde`, `serde_json`, `thiserror`, `cc` (build only), `napi`/`napi-derive`/`napi-build`, `clap` (CLI only), `tempfile` (dev only). DSP math is hand-rolled — no `ndarray`/`rustfft`.
---
## 3. Consequences
**Positive**
- The two FFI seams are small, audited, and independently testable: the C shim round-trips through Rust tests; the napi surface tests run under `cargo test` without Node.
- `unsafe` is confined to one crate (`rvcsi-adapter-nexmon`) and within it to one module (`ffi`), every block documented.
- Each leaf crate (`rvcsi-dsp`, `rvcsi-events`, `rvcsi-adapter-file`, `rvcsi-ruvector`) depends only on `rvcsi-core`, so they can evolve (and be reviewed, and be swarm-implemented) independently.
- The `.rvcsi` JSONL capture format and the `JsonlRfMemory` standin make the whole pipeline runnable and testable end-to-end before any hardware or the real RuVector binding exists.
**Negative / costs**
- A `cc`-built C library means a C toolchain is required to build `rvcsi-adapter-nexmon` (already true for many workspace crates via transitive `cc` deps; acceptable).
- The "rvCSI Nexmon record" is a *normalized* format, not byte-identical to any upstream nexmon_csi build — a thin demux/transcode step is needed when wiring real Nexmon output. This is intentional (we control the contract the shim parses) and documented.
- JSONL captures are larger than a packed binary format; fine for v0 (and the PRD already standardizes on JSON/WebSocket on the wire), revisit if capture size becomes a problem.
- `rvcsi-node` as a workspace member adds the `napi` dependency tree to `cargo build --workspace`; mitigated by it being a small, well-maintained crate.
**Risks**
- napi-rs major-version churn could change the macro/`build.rs` surface; pinned to `napi = "2.16"` in workspace deps, bumped deliberately.
- If a future platform can't link a napi cdylib under plain `cargo build`, `rvcsi-node` moves to the workspace `exclude` list (like `wifi-densepose-wasm-edge`) with a separate build command — same pattern, already established.
---
## 4. Alternatives considered
| Alternative | Why not |
|-------------|---------|
| One mega-crate `rvcsi` instead of eight | Couples DSP/events/adapters/FFI; can't review or implement them independently; bloats compile units for downstream users who only want `rvcsi-core`. |
| `bindgen` for the C shim | Pulls in `libclang`; the shim's C API is six functions — hand-written `extern "C"` decls are clearer and dependency-free. |
| Binary `.rvcsi` capture format (bincode/custom) | Smaller, but not human-inspectable; JSONL is debuggable, append-friendly, and matches the PRD's on-the-wire JSON. Revisit if size matters. |
| Expose raw `CsiFrame` pointers / typed arrays across napi for zero-copy | Violates ADR-095 D6 (validate-before-FFI) and the "no raw pointers to TS" safety NFR; the per-frame copy cost is negligible at the target rates. |
| `wasm-bindgen` instead of napi-rs for the JS surface | WASM can't do live capture (no raw sockets/serial); great for offline parsing (a later target) but not the primary Node runtime. |
| `rvcsi-events` depending on `rvcsi-dsp` for window stats | Adds a coordination point for two leaf crates; the stats are a few lines — keep the leaves independent and let higher layers compose them. |
---
## 5. Status of the implementation (this PR)
- `rvcsi-core` — implemented, `forbid(unsafe_code)`, 29 unit tests.
- `rvcsi-adapter-nexmon` + the napi-c shim — implemented; C compiled via `build.rs`+`cc`; `ffi` wrappers + `NexmonAdapter`; 9 tests round-tripping through the C shim.
- `rvcsi-dsp`, `rvcsi-events`, `rvcsi-adapter-file`, `rvcsi-ruvector` — implemented (parallel swarm), each with its own test suite.
- `rvcsi-node` (napi-rs surface) and `rvcsi-cli` — implemented (the addon's Rust surface + the `rvcsi` subcommands); the `@ruv/rvcsi` npm wrapper and a Node smoke test ship alongside.
- `rvcsi-mcp` (MCP tool server) and `rvcsi-daemon` (long-running capture service) — not in this PR; tracked as follow-ups on top of `rvcsi-node`.
---
## 6. References
- [ADR-095 — rvCSI Edge RF Sensing Platform](ADR-095-rvcsi-edge-rf-sensing-platform.md)
- [rvCSI Platform PRD](../prd/rvcsi-platform-prd.md)
- [rvCSI Domain Model](../ddd/rvcsi-domain-model.md)
- napi-rs — https://napi.rs/
- nexmon_csi — the upstream Broadcom CSI extractor the record format normalizes

View File

@ -106,6 +106,7 @@ Statuses: **Proposed** (under discussion), **Accepted** (approved and/or impleme
| [ADR-026](ADR-026-survivor-track-lifecycle.md) | Survivor Track Lifecycle (MAT crate) | Accepted |
| [ADR-038](ADR-038-sublinear-goal-oriented-action-planning.md) | Sublinear GOAP for Roadmap Optimization | Proposed |
| [ADR-095](ADR-095-rvcsi-edge-rf-sensing-platform.md) | rvCSI — Edge RF Sensing Runtime Platform | Proposed |
| [ADR-096](ADR-096-rvcsi-ffi-crate-layout.md) | rvCSI — Crate Topology, the napi-c Shim, and the napi-rs Node Surface | Proposed |
---

View File

@ -0,0 +1,263 @@
//! Frame/window-level scalar features (ADR-095 FR4).
//!
//! These are deterministic, dependency-light feature extractors that turn
//! cleaned amplitude/quality series into the small scalar signals downstream
//! components (presence, breathing, confidence) expose. Anything labelled
//! "heuristic" is best-effort and is meant to be quality-gated by the caller.
use crate::stages::{mean, moving_average, std_dev};
/// Per-subcarrier RMS amplitude delta between two consecutive frames.
///
/// Defined as `||cur - prev||_2 / sqrt(n)`. Returns `0.0` if either slice is
/// empty or the lengths differ (a quiet zero rather than an error keeps the
/// streaming call sites simple).
pub fn motion_energy(prev_amplitude: &[f32], cur_amplitude: &[f32]) -> f32 {
if prev_amplitude.is_empty()
|| cur_amplitude.is_empty()
|| prev_amplitude.len() != cur_amplitude.len()
{
return 0.0;
}
let sum_sq: f32 = prev_amplitude
.iter()
.zip(cur_amplitude.iter())
.map(|(p, c)| {
let d = c - p;
d * d
})
.sum();
(sum_sq / prev_amplitude.len() as f32).sqrt()
}
/// Mean of [`motion_energy`] over every consecutive pair in the series.
///
/// Returns `0.0` if fewer than two amplitude vectors are supplied.
pub fn motion_energy_series(amplitudes: &[Vec<f32>]) -> f32 {
if amplitudes.len() < 2 {
return 0.0;
}
let mut acc = 0.0f32;
for w in amplitudes.windows(2) {
acc += motion_energy(&w[0], &w[1]);
}
acc / (amplitudes.len() - 1) as f32
}
/// Fixed logistic steepness for [`presence_score`].
const PRESENCE_STEEPNESS: f32 = 8.0;
/// Logistic squash of motion energy into a `[0, 1]` presence score.
///
/// Formula: `1 / (1 + exp(-(motion_energy - threshold) * k))` with a fixed
/// steepness `k = 8.0`. Monotone increasing in `motion_energy`, bounded to
/// `[0, 1]`, and exactly `0.5` when `motion_energy == threshold`.
pub fn presence_score(motion_energy: f32, threshold: f32) -> f32 {
let z = (motion_energy - threshold) * PRESENCE_STEEPNESS;
1.0 / (1.0 + (-z).exp())
}
/// Robust aggregate of per-frame quality scores in `[0, 1]`.
///
/// Computes `mean - 0.5 * std_dev` over the supplied per-frame quality scores
/// and clamps the result to `[0, 1]`. Returns `0.0` for an empty input. The
/// `-0.5*std` term penalizes windows whose quality is uneven.
pub fn confidence_score(quality_scores: &[f32]) -> f32 {
if quality_scores.is_empty() {
return 0.0;
}
(mean(quality_scores) - 0.5 * std_dev(quality_scores)).clamp(0.0, 1.0)
}
/// Minimum number of full periods of data required before [`breathing_band_estimate`]
/// will attempt anything.
const MIN_PERIODS: f32 = 2.0;
/// Low edge of the respiration band, Hz (~6 bpm).
const RESP_LO_HZ: f32 = 0.1;
/// High edge of the respiration band, Hz (~30 bpm).
const RESP_HI_HZ: f32 = 0.5;
/// Minimum normalized autocorrelation peak to accept an estimate.
const PEAK_THRESHOLD: f32 = 0.3;
/// Best-effort respiration-rate estimate, in **breaths per minute**.
///
/// Heuristic, FFT-free pipeline:
/// 1. detrend the series by subtracting a moving average,
/// 2. compute the biased autocorrelation for lags in the 0.10.5 Hz band
/// (630 bpm),
/// 3. if there is a clear dominant peak — its normalized autocorrelation
/// (peak / zero-lag) exceeds `~0.3` and it is a local maximum — return
/// `Some(60 * sample_rate_hz / best_lag)`, otherwise `None`.
///
/// Returns `None` unless there are at least two full periods of data at the
/// slowest band edge (so the caller need not pre-trim). This is **heuristic**
/// and is meant to be quality-gated by the caller; do not treat the result as
/// a medical-grade vital sign.
pub fn breathing_band_estimate(amplitude_series: &[f32], sample_rate_hz: f32) -> Option<f32> {
if sample_rate_hz <= 0.0 || amplitude_series.len() < 4 {
return None;
}
// Lag (in samples) bounds for the respiration band.
let min_lag = (sample_rate_hz / RESP_HI_HZ).floor() as usize;
let mut max_lag = (sample_rate_hz / RESP_LO_HZ).ceil() as usize;
if min_lag < 1 {
return None;
}
// Need at least MIN_PERIODS periods at the *fast* edge of the band before
// it is worth attempting anything (a shorter series cannot resolve even the
// quickest breathing rate). The slow edge is handled by clamping `max_lag`
// to half the series length below.
let needed = (MIN_PERIODS * sample_rate_hz / RESP_HI_HZ).ceil() as usize;
if amplitude_series.len() < needed.max(2 * min_lag) {
return None;
}
max_lag = max_lag.min(amplitude_series.len() / 2);
if max_lag <= min_lag {
return None;
}
// 1. Detrend: subtract a moving average whose window spans roughly one slow
// period (clamped to the series length) so the trend, not the
// oscillation, is removed.
let trend_window = ((sample_rate_hz / RESP_LO_HZ).round() as usize)
.max(3)
.min(amplitude_series.len());
let trend = moving_average(amplitude_series, trend_window);
let detrended: Vec<f32> = amplitude_series
.iter()
.zip(trend.iter())
.map(|(x, t)| x - t)
.collect();
// 2. Biased autocorrelation (divide by N for every lag).
let n = detrended.len() as f32;
let autocorr = |lag: usize| -> f32 {
let mut s = 0.0f32;
for i in lag..detrended.len() {
s += detrended[i] * detrended[i - lag];
}
s / n
};
let zero_lag = autocorr(0);
if zero_lag <= 0.0 {
return None;
}
// 3. Find the dominant local-max lag inside the band.
let mut best_lag = 0usize;
let mut best_val = f32::NEG_INFINITY;
for lag in min_lag..=max_lag {
let v = autocorr(lag);
if v > best_val {
best_val = v;
best_lag = lag;
}
}
if best_lag == 0 {
return None;
}
// Local maximum check (compare against immediate neighbours).
let left = autocorr(best_lag - 1);
let right = if best_lag < max_lag.min(detrended.len().saturating_sub(1)) {
autocorr(best_lag + 1)
} else {
f32::NEG_INFINITY
};
let is_local_max = best_val >= left && best_val >= right;
let normalized = best_val / zero_lag;
if !is_local_max || normalized < PEAK_THRESHOLD {
return None;
}
Some(60.0 * sample_rate_hz / best_lag as f32)
}
#[cfg(test)]
mod tests {
use super::*;
fn approx(a: f32, b: f32, eps: f32) {
assert!((a - b).abs() < eps, "{a} !~= {b} (eps {eps})");
}
#[test]
fn motion_energy_zero_for_identical() {
let a = vec![1.0, 2.0, 3.0];
approx(motion_energy(&a, &a), 0.0, 1e-6);
}
#[test]
fn motion_energy_positive_for_different() {
let a = vec![0.0, 0.0, 0.0];
let b = vec![1.0, 1.0, 1.0];
// diff all 1 -> sum_sq 3, /3 = 1, sqrt = 1
approx(motion_energy(&a, &b), 1.0, 1e-6);
}
#[test]
fn motion_energy_mismatch_or_empty_is_zero() {
approx(motion_energy(&[], &[1.0]), 0.0, 1e-6);
approx(motion_energy(&[1.0, 2.0], &[1.0]), 0.0, 1e-6);
}
#[test]
fn motion_energy_series_averages() {
// frames: [0,0],[1,1],[1,1] -> energies: 1.0, 0.0 -> mean 0.5
let frames = vec![vec![0.0, 0.0], vec![1.0, 1.0], vec![1.0, 1.0]];
approx(motion_energy_series(&frames), 0.5, 1e-6);
// fewer than 2 -> 0
approx(motion_energy_series(&[vec![1.0]]), 0.0, 1e-6);
approx(motion_energy_series(&[]), 0.0, 1e-6);
}
#[test]
fn presence_score_bounded_monotone_half_at_threshold() {
let t = 0.5;
approx(presence_score(t, t), 0.5, 1e-6);
let lo = presence_score(0.0, t);
let mid = presence_score(0.5, t);
let hi = presence_score(2.0, t);
assert!(lo < mid && mid < hi, "{lo} {mid} {hi}");
assert!((0.0..=1.0).contains(&lo));
assert!((0.0..=1.0).contains(&hi));
// very small / very large saturate
assert!(presence_score(-100.0, t) < 1e-3);
assert!(presence_score(100.0, t) > 1.0 - 1e-3);
}
#[test]
fn confidence_score_basic() {
approx(confidence_score(&[0.9, 0.9, 0.9]), 0.9, 1e-6); // std 0
approx(confidence_score(&[]), 0.0, 1e-6);
// uneven quality -> penalized below the mean
let c = confidence_score(&[0.2, 1.0, 0.6]);
assert!(c < 0.6, "{c}");
assert!((0.0..=1.0).contains(&c));
}
#[test]
fn breathing_estimate_detects_quarter_hz_sine() {
// 0.25 Hz sine (15 bpm) sampled at 10 Hz for 12 s -> 120 samples.
let fs = 10.0f32;
let n = 120usize;
let freq = 0.25f32;
let mut series = Vec::with_capacity(n);
// tiny deterministic "noise" via a fixed sequence
for i in 0..n {
let t = i as f32 / fs;
let noise = 0.02 * ((i as f32 * 1.7).sin());
series.push(1.0 + 0.5 * (2.0 * core::f32::consts::PI * freq * t).sin() + noise);
}
let bpm = breathing_band_estimate(&series, fs).expect("should detect a peak");
approx(bpm, 15.0, 3.0);
}
#[test]
fn breathing_estimate_none_for_short_or_noise() {
// too short
assert!(breathing_band_estimate(&[1.0, 2.0, 3.0], 10.0).is_none());
// a flat constant -> zero-lag autocorr 0 after detrend -> None
assert!(breathing_band_estimate(&vec![1.0; 200], 10.0).is_none());
// bad sample rate
assert!(breathing_band_estimate(&vec![1.0; 200], 0.0).is_none());
}
}

View File

@ -1,7 +1,52 @@
//! # rvCSI DSP (skeleton — implemented by the rvcsi-dsp swarm agent)
//! # rvCSI DSP — reusable signal-processing stages (ADR-095 FR4)
//!
//! Reusable signal-processing stages for the rvCSI runtime (ADR-095 FR4).
#![forbid(unsafe_code)]
//! `rvcsi-dsp` is the dependency-light DSP layer of the rvCSI edge RF sensing
//! runtime. It implements **FR4 of [ADR-095]** — *"reusable Rust
//! signal-processing stages"* — as a small library of deterministic primitives
//! plus a composable per-frame [`SignalPipeline`].
//!
//! The crate is split into three modules:
//!
//! * [`stages`] — pure per-vector DSP primitives operating on `&[f32]` /
//! `&mut [f32]`: [`mean`](stages::mean), [`variance`](stages::variance),
//! [`std_dev`](stages::std_dev), [`median`](stages::median),
//! [`remove_dc_offset`](stages::remove_dc_offset),
//! [`unwrap_phase`](stages::unwrap_phase),
//! [`moving_average`](stages::moving_average), [`ewma`](stages::ewma),
//! [`hampel_filter`](stages::hampel_filter) /
//! [`hampel_filter_count`](stages::hampel_filter_count),
//! [`short_window_variance`](stages::short_window_variance),
//! [`subtract_baseline`](stages::subtract_baseline). Failable stages report
//! [`DspError`](stages::DspError).
//! * [`features`] — frame/window-level scalar features:
//! [`motion_energy`](features::motion_energy) /
//! [`motion_energy_series`](features::motion_energy_series),
//! [`presence_score`](features::presence_score),
//! [`confidence_score`](features::confidence_score),
//! [`breathing_band_estimate`](features::breathing_band_estimate) (heuristic,
//! FFT-free, meant to be quality-gated by the caller).
//! * [`pipeline`] — the [`SignalPipeline`](pipeline::SignalPipeline): a tiny
//! configuration bag with a non-destructive `process_frame` step that cleans a
//! [`rvcsi_core::CsiFrame`]'s `amplitude` / `phase` vectors *after*
//! `rvcsi_core::validate_frame` has run, never touching validation state.
//!
//! Everything here is deterministic: the same input always produces the same
//! output. There are no heavy dependencies — the math is hand-rolled.
//!
//! [ADR-095]: ../../../docs/adr/ADR-095-rvcsi-edge-rf-sensing-platform.md
/// Placeholder so the crate compiles before the agent fills it in.
pub fn __rvcsi_dsp_placeholder() {}
#![forbid(unsafe_code)]
#![warn(missing_docs)]
pub mod features;
pub mod pipeline;
pub mod stages;
pub use features::{
breathing_band_estimate, confidence_score, motion_energy, motion_energy_series, presence_score,
};
pub use pipeline::SignalPipeline;
pub use stages::{
ewma, hampel_filter, hampel_filter_count, mean, median, moving_average, remove_dc_offset,
short_window_variance, std_dev, subtract_baseline, unwrap_phase, variance, DspError,
};

View File

@ -0,0 +1,322 @@
//! The composable [`SignalPipeline`] (ADR-095 FR4).
//!
//! A pipeline is a small bag of configuration plus a non-destructive
//! `process_frame` step that cleans a [`CsiFrame`]'s `amplitude` / `phase`
//! vectors *after* `rvcsi_core::validate_frame` has run. It deliberately never
//! mutates `validation`, `quality_score`, or `quality_reasons` — those belong to
//! the validation stage, and a DSP cleanup pass must not silently "upgrade" or
//! "downgrade" a frame's trust state.
use rvcsi_core::CsiFrame;
use crate::stages::{hampel_filter, moving_average, remove_dc_offset, unwrap_phase};
/// Configurable signal-cleaning pipeline applied per frame.
///
/// The processing order in [`SignalPipeline::process_frame`] is fixed:
/// 1. Hampel outlier filter on `amplitude`
/// 2. centered moving-average smoothing on `amplitude`
/// 3. DC-offset removal on `amplitude` (if [`remove_dc`](Self::remove_dc))
/// 4. baseline subtraction on `amplitude` (if a learned baseline of matching
/// length is present)
/// 5. phase unwrap on `phase` (if [`unwrap_phase`](Self::unwrap_phase))
#[derive(Debug, Clone, PartialEq)]
pub struct SignalPipeline {
/// Window length for the moving-average smoothing of amplitude
/// (`0`/`1` disables smoothing).
pub smoothing_window: usize,
/// Half-window for the Hampel outlier filter on amplitude.
pub hampel_half_window: usize,
/// Outlier threshold (in robust sigmas) for the Hampel filter.
pub hampel_n_sigmas: f32,
/// Whether to unwrap the phase vector.
pub unwrap_phase: bool,
/// Whether to subtract the DC offset (mean) from the amplitude vector.
pub remove_dc: bool,
/// Optional learned per-subcarrier baseline amplitude; subtracted from
/// `amplitude` when its length matches the frame's subcarrier count.
pub baseline_amplitude: Option<Vec<f32>>,
}
impl Default for SignalPipeline {
fn default() -> Self {
SignalPipeline {
smoothing_window: 3,
hampel_half_window: 3,
hampel_n_sigmas: 3.0,
unwrap_phase: true,
remove_dc: true,
baseline_amplitude: None,
}
}
}
impl SignalPipeline {
/// Construct a pipeline with the [default](Self::default) configuration.
pub fn new() -> Self {
Self::default()
}
/// Builder-style setter for [`smoothing_window`](Self::smoothing_window).
pub fn with_smoothing_window(mut self, window: usize) -> Self {
self.smoothing_window = window;
self
}
/// Builder-style setter for the Hampel half-window.
pub fn with_hampel_half_window(mut self, half_window: usize) -> Self {
self.hampel_half_window = half_window;
self
}
/// Builder-style setter for the Hampel sigma threshold.
pub fn with_hampel_n_sigmas(mut self, n_sigmas: f32) -> Self {
self.hampel_n_sigmas = n_sigmas;
self
}
/// Builder-style setter for [`unwrap_phase`](Self::unwrap_phase).
pub fn with_unwrap_phase(mut self, on: bool) -> Self {
self.unwrap_phase = on;
self
}
/// Builder-style setter for [`remove_dc`](Self::remove_dc).
pub fn with_remove_dc(mut self, on: bool) -> Self {
self.remove_dc = on;
self
}
/// Builder-style setter for an explicit baseline amplitude vector.
pub fn with_baseline_amplitude(mut self, baseline: Option<Vec<f32>>) -> Self {
self.baseline_amplitude = baseline;
self
}
/// Clean a frame's `amplitude` and `phase` vectors in place.
///
/// See the [type docs](SignalPipeline) for the fixed processing order. This
/// method does **not** read or write `frame.validation`,
/// `frame.quality_score`, or `frame.quality_reasons`, and is a no-op for a
/// frame with `subcarrier_count == 0`. The lengths of `amplitude` and
/// `phase` are preserved.
pub fn process_frame(&self, frame: &mut CsiFrame) {
if frame.subcarrier_count == 0 || frame.amplitude.is_empty() {
return;
}
// 1. Hampel outlier rejection on amplitude.
if self.hampel_half_window > 0 {
frame.amplitude =
hampel_filter(&frame.amplitude, self.hampel_half_window, self.hampel_n_sigmas);
}
// 2. Moving-average smoothing on amplitude.
if self.smoothing_window > 1 {
frame.amplitude = moving_average(&frame.amplitude, self.smoothing_window);
}
// 3. DC-offset removal on amplitude.
if self.remove_dc {
remove_dc_offset(&mut frame.amplitude);
}
// 4. Baseline subtraction (only when lengths match).
if let Some(baseline) = &self.baseline_amplitude {
if baseline.len() == frame.amplitude.len() {
for (a, b) in frame.amplitude.iter_mut().zip(baseline.iter()) {
*a -= *b;
}
}
}
// 5. Phase unwrap.
if self.unwrap_phase {
unwrap_phase(&mut frame.phase);
}
}
/// Learn a per-subcarrier baseline amplitude from a batch of frames.
///
/// Sets [`baseline_amplitude`](Self::baseline_amplitude) to the element-wise
/// mean amplitude over the supplied frames, considering only frames whose
/// `subcarrier_count` equals the first frame's and whose `amplitude` vector
/// is non-empty. A no-op when `frames` is empty (or yields no usable frame).
pub fn learn_baseline(&mut self, frames: &[CsiFrame]) {
let Some(first) = frames.iter().find(|f| !f.amplitude.is_empty()) else {
return;
};
let n = first.amplitude.len();
let reference_count = first.subcarrier_count;
let mut acc = vec![0.0f32; n];
let mut used = 0usize;
for f in frames {
if f.subcarrier_count != reference_count || f.amplitude.len() != n {
continue;
}
for (a, &v) in acc.iter_mut().zip(f.amplitude.iter()) {
*a += v;
}
used += 1;
}
if used == 0 {
return;
}
let used_f = used as f32;
for a in acc.iter_mut() {
*a /= used_f;
}
self.baseline_amplitude = Some(acc);
}
}
#[cfg(test)]
mod tests {
use super::*;
use rvcsi_core::{AdapterKind, FrameId, SessionId, SourceId, ValidationStatus};
fn frame_with_amplitude(amp: Vec<f32>) -> CsiFrame {
let n = amp.len();
// Build a frame from I/Q so phase/amplitude are consistent, then
// overwrite amplitude with the test fixture.
let i: Vec<f32> = amp.clone();
let q: Vec<f32> = vec![0.0; n];
let mut f = CsiFrame::from_iq(
FrameId(1),
SessionId(1),
SourceId::from("pipe-test"),
AdapterKind::Synthetic,
10_000,
6,
20,
i,
q,
);
f.amplitude = amp;
f.phase = vec![0.0; n];
// Pretend validation already ran.
f.validation = ValidationStatus::Accepted;
f.quality_score = 0.77;
f.quality_reasons = vec!["fixture".to_string()];
f
}
#[test]
fn process_frame_removes_spike_and_preserves_validation() {
let mut f = frame_with_amplitude(vec![5.0, 5.0, 5.0, 200.0, 5.0, 5.0, 5.0]);
let n_before = f.amplitude.len();
let pipe = SignalPipeline::default();
pipe.process_frame(&mut f);
assert_eq!(f.amplitude.len(), n_before);
assert_eq!(f.phase.len(), n_before);
// The huge spike must be gone: after hampel+smoothing+DC removal the
// amplitude should be near zero everywhere (constant signal -> ~0 mean).
for v in &f.amplitude {
assert!(v.abs() < 1.0, "spike not removed, residual {v}");
}
// Validation state untouched.
assert_eq!(f.validation, ValidationStatus::Accepted);
assert!((f.quality_score - 0.77).abs() < 1e-6);
assert_eq!(f.quality_reasons, vec!["fixture".to_string()]);
}
#[test]
fn process_frame_is_noop_on_empty_frame() {
let mut f = CsiFrame::from_iq(
FrameId(2),
SessionId(1),
SourceId::from("empty"),
AdapterKind::Synthetic,
1,
6,
20,
Vec::new(),
Vec::new(),
);
f.validation = ValidationStatus::Degraded;
let pipe = SignalPipeline::default();
pipe.process_frame(&mut f);
assert!(f.amplitude.is_empty());
assert!(f.phase.is_empty());
assert_eq!(f.validation, ValidationStatus::Degraded);
}
#[test]
fn unwrap_phase_can_be_disabled() {
let mut f = frame_with_amplitude(vec![1.0, 1.0, 1.0, 1.0]);
f.phase = vec![0.0, 3.0, -3.0, 0.0];
let pipe = SignalPipeline::default()
.with_unwrap_phase(false)
.with_hampel_half_window(0)
.with_smoothing_window(0)
.with_remove_dc(false);
pipe.process_frame(&mut f);
// phase left exactly as-is
assert_eq!(f.phase, vec![0.0, 3.0, -3.0, 0.0]);
// amplitude untouched too
assert_eq!(f.amplitude, vec![1.0, 1.0, 1.0, 1.0]);
}
#[test]
fn learn_baseline_then_process_subtracts_it() {
// Three frames whose mean amplitude is [2, 4, 6, 8].
let frames = vec![
frame_with_amplitude(vec![1.0, 3.0, 5.0, 7.0]),
frame_with_amplitude(vec![2.0, 4.0, 6.0, 8.0]),
frame_with_amplitude(vec![3.0, 5.0, 7.0, 9.0]),
];
let mut pipe = SignalPipeline::default()
.with_hampel_half_window(0)
.with_smoothing_window(0);
pipe.learn_baseline(&frames);
assert_eq!(pipe.baseline_amplitude, Some(vec![2.0, 4.0, 6.0, 8.0]));
// Process a frame equal to the baseline. After DC removal (mean 5 ->
// [-3,-1,1,3]) then baseline subtraction ([-3-2,-1-4,1-6,3-8] =
// [-5,-5,-5,-5]) — the point is just that it's "small" and bounded.
let mut f = frame_with_amplitude(vec![2.0, 4.0, 6.0, 8.0]);
pipe.process_frame(&mut f);
assert_eq!(f.amplitude.len(), 4);
for v in &f.amplitude {
assert!(v.abs() < 10.0, "baseline-subtracted residual too large: {v}");
}
// With DC removal turned off, a frame equal to the baseline goes to
// exactly zero.
let mut pipe2 = pipe.clone();
pipe2.remove_dc = false;
let mut f2 = frame_with_amplitude(vec![2.0, 4.0, 6.0, 8.0]);
pipe2.process_frame(&mut f2);
for v in &f2.amplitude {
assert!(v.abs() < 1e-5, "expected ~0, got {v}");
}
}
#[test]
fn learn_baseline_ignores_mismatched_and_empty() {
let frames = vec![
frame_with_amplitude(vec![2.0, 2.0, 2.0]),
frame_with_amplitude(vec![1.0, 2.0]), // wrong length -> ignored
frame_with_amplitude(vec![4.0, 4.0, 4.0]),
];
let mut pipe = SignalPipeline::default();
pipe.learn_baseline(&frames);
assert_eq!(pipe.baseline_amplitude, Some(vec![3.0, 3.0, 3.0]));
// empty input -> no change
let mut pipe2 = SignalPipeline::default();
pipe2.learn_baseline(&[]);
assert_eq!(pipe2.baseline_amplitude, None);
}
#[test]
fn pipeline_is_deterministic() {
let make = || frame_with_amplitude(vec![5.0, 6.0, 7.0, 50.0, 7.0, 6.0, 5.0]);
let pipe = SignalPipeline::default();
let mut a = make();
let mut b = make();
pipe.process_frame(&mut a);
pipe.process_frame(&mut b);
assert_eq!(a.amplitude, b.amplitude);
assert_eq!(a.phase, b.phase);
}
}

View File

@ -0,0 +1,394 @@
//! Pure per-vector DSP primitives (ADR-095 FR4).
//!
//! Every function here is deterministic and operates on plain `&[f32]` /
//! `&mut [f32]` slices — no allocation-heavy dependencies, no hidden state.
//! Errors are reported via [`DspError`].
use core::f32::consts::PI;
use thiserror::Error;
/// Errors produced by DSP stages that can fail.
#[derive(Debug, Clone, PartialEq, Eq, Error)]
pub enum DspError {
/// Two slices that were required to be the same length were not.
#[error("length mismatch: {a} vs {b}")]
LengthMismatch {
/// Length of the first slice.
a: usize,
/// Length of the second slice.
b: usize,
},
/// An operation that requires at least one sample received an empty slice.
#[error("empty input")]
EmptyInput,
}
/// Arithmetic mean of the slice. Returns `0.0` for an empty slice.
pub fn mean(xs: &[f32]) -> f32 {
if xs.is_empty() {
0.0
} else {
xs.iter().sum::<f32>() / xs.len() as f32
}
}
/// Population variance (divides by `n`, not `n - 1`). Returns `0.0` for an
/// empty slice.
pub fn variance(xs: &[f32]) -> f32 {
if xs.is_empty() {
return 0.0;
}
let m = mean(xs);
xs.iter().map(|x| {
let d = x - m;
d * d
}).sum::<f32>()
/ xs.len() as f32
}
/// Population standard deviation. Returns `0.0` for an empty slice.
pub fn std_dev(xs: &[f32]) -> f32 {
variance(xs).sqrt()
}
/// Median of the slice (clones and sorts internally). Returns `0.0` for an
/// empty slice. For an even count, returns the average of the two central
/// values.
pub fn median(xs: &[f32]) -> f32 {
if xs.is_empty() {
return 0.0;
}
let mut v = xs.to_vec();
v.sort_by(|a, b| a.partial_cmp(b).unwrap_or(core::cmp::Ordering::Equal));
let n = v.len();
if n % 2 == 1 {
v[n / 2]
} else {
0.5 * (v[n / 2 - 1] + v[n / 2])
}
}
/// Subtract the mean of the slice from every element, in place.
pub fn remove_dc_offset(xs: &mut [f32]) {
let m = mean(xs);
for x in xs.iter_mut() {
*x -= m;
}
}
/// In-place 1-D phase unwrap.
///
/// Walks left→right; whenever the raw step `phase[i] - phase[i-1]` exceeds
/// `+PI` we accumulate a `-2*PI` correction, and whenever it is below `-PI`
/// we accumulate a `+2*PI` correction. The running correction is added to
/// every subsequent sample, producing a continuous series with no step larger
/// than `PI` in magnitude.
pub fn unwrap_phase(phase: &mut [f32]) {
if phase.len() < 2 {
return;
}
let mut correction = 0.0f32;
let mut prev_raw = phase[0];
// We read `phase[i]` and write `phase[i]` in the same step; an index loop
// is the clearest way to express that, hence the lint allowance.
#[allow(clippy::needless_range_loop)]
for i in 1..phase.len() {
let raw = phase[i];
let step = raw - prev_raw;
if step > PI {
correction -= 2.0 * PI;
} else if step < -PI {
correction += 2.0 * PI;
}
prev_raw = raw;
phase[i] = raw + correction;
}
}
/// Centered moving average with edge clamping (the window shrinks at the ends).
///
/// `window == 0 || window == 1` returns a plain copy. The result has the same
/// length as the input.
pub fn moving_average(xs: &[f32], window: usize) -> Vec<f32> {
if window <= 1 || xs.is_empty() {
return xs.to_vec();
}
let half = window / 2;
let n = xs.len();
let mut out = Vec::with_capacity(n);
for i in 0..n {
let lo = i.saturating_sub(half);
let hi = (i + half + 1).min(n);
let slice = &xs[lo..hi];
out.push(mean(slice));
}
out
}
/// Exponentially-weighted moving average.
///
/// `y[0] = x[0]`, `y[i] = alpha * x[i] + (1 - alpha) * y[i-1]`. `alpha` is
/// clamped to `(0.0, 1.0]` (values `<= 0` become a tiny positive epsilon,
/// values `> 1` become `1.0`). An empty input yields an empty output.
pub fn ewma(xs: &[f32], alpha: f32) -> Vec<f32> {
if xs.is_empty() {
return Vec::new();
}
let a = if alpha > 1.0 {
1.0
} else if alpha <= 0.0 {
f32::EPSILON
} else {
alpha
};
let mut out = Vec::with_capacity(xs.len());
let mut y = xs[0];
out.push(y);
for &x in &xs[1..] {
y = a * x + (1.0 - a) * y;
out.push(y);
}
out
}
/// Hampel outlier filter.
///
/// For each index `i`, take the window `[i - half_window, i + half_window]`
/// (clamped to the slice), compute the median `m` and
/// `MAD = 1.4826 * median(|x - m|)`. If `|x[i] - m| > n_sigmas * MAD`, the
/// sample is replaced with `m`; otherwise it is kept. Returns a new `Vec` of
/// the same length.
pub fn hampel_filter(xs: &[f32], half_window: usize, n_sigmas: f32) -> Vec<f32> {
hampel_filter_count(xs, half_window, n_sigmas).0
}
/// Like [`hampel_filter`] but also reports how many samples were replaced.
pub fn hampel_filter_count(xs: &[f32], half_window: usize, n_sigmas: f32) -> (Vec<f32>, usize) {
if xs.is_empty() {
return (Vec::new(), 0);
}
let n = xs.len();
let mut out = Vec::with_capacity(n);
let mut replaced = 0usize;
for i in 0..n {
let lo = i.saturating_sub(half_window);
let hi = (i + half_window + 1).min(n);
let window = &xs[lo..hi];
let m = median(window);
let deviations: Vec<f32> = window.iter().map(|x| (x - m).abs()).collect();
let mad = 1.4826 * median(&deviations);
// When `mad == 0` (a majority of the window is identical) the test
// `dev > n_sigmas * 0` reduces to `dev > 0`, i.e. any sample that
// differs from the window median is treated as an outlier — this is the
// standard degenerate-MAD behaviour for the Hampel identifier.
if (xs[i] - m).abs() > n_sigmas * mad {
out.push(m);
replaced += 1;
} else {
out.push(xs[i]);
}
}
(out, replaced)
}
/// Sliding population variance over a centered window with edge clamping.
///
/// `window <= 1` produces an all-zero series the same length as the input
/// (a single-sample window has zero variance). The result has the same length
/// as the input.
pub fn short_window_variance(xs: &[f32], window: usize) -> Vec<f32> {
let n = xs.len();
if n == 0 {
return Vec::new();
}
if window <= 1 {
return vec![0.0; n];
}
let half = window / 2;
let mut out = Vec::with_capacity(n);
for i in 0..n {
let lo = i.saturating_sub(half);
let hi = (i + half + 1).min(n);
out.push(variance(&xs[lo..hi]));
}
out
}
/// Elementwise `current - baseline`. Errors if the lengths differ.
pub fn subtract_baseline(current: &[f32], baseline: &[f32]) -> Result<Vec<f32>, DspError> {
if current.len() != baseline.len() {
return Err(DspError::LengthMismatch {
a: current.len(),
b: baseline.len(),
});
}
Ok(current
.iter()
.zip(baseline.iter())
.map(|(c, b)| c - b)
.collect())
}
#[cfg(test)]
mod tests {
use super::*;
fn approx(a: f32, b: f32) {
assert!((a - b).abs() < 1e-5, "{a} !~= {b}");
}
#[test]
fn mean_variance_median_basic() {
let xs = [1.0, 2.0, 3.0, 4.0];
approx(mean(&xs), 2.5);
// population variance of 1..4: mean 2.5, devs^2 = 2.25,0.25,0.25,2.25 -> 5/4 = 1.25
approx(variance(&xs), 1.25);
approx(std_dev(&xs), 1.25f32.sqrt());
// even-count median: avg of 2 and 3
approx(median(&xs), 2.5);
approx(median(&[3.0, 1.0, 2.0]), 2.0);
}
#[test]
fn empty_inputs_are_zero() {
approx(mean(&[]), 0.0);
approx(variance(&[]), 0.0);
approx(std_dev(&[]), 0.0);
approx(median(&[]), 0.0);
}
#[test]
fn remove_dc_offset_centers() {
let mut xs = [1.0, 2.0, 3.0, 4.0];
remove_dc_offset(&mut xs);
approx(mean(&xs), 0.0);
approx(xs[0], -1.5);
approx(xs[3], 1.5);
}
#[test]
fn unwrap_phase_is_continuous() {
// raw: 0, 3, -3, 0. step 3->-3 is -6 < -PI so +2PI; etc.
let mut p = [0.0f32, 3.0, -3.0, 0.0];
unwrap_phase(&mut p);
for w in p.windows(2) {
assert!((w[1] - w[0]).abs() <= PI + 1e-5, "jump too big: {w:?}");
}
// first sample untouched
approx(p[0], 0.0);
}
#[test]
fn unwrap_phase_short_slices() {
let mut a: [f32; 0] = [];
unwrap_phase(&mut a);
let mut b = [1.23f32];
unwrap_phase(&mut b);
approx(b[0], 1.23);
}
#[test]
fn moving_average_window_three() {
// [1,2,3,4,5], window 3, half=1, edge clamp:
// i=0: [1,2] -> 1.5
// i=1: [1,2,3] -> 2
// i=2: [2,3,4] -> 3
// i=3: [3,4,5] -> 4
// i=4: [4,5] -> 4.5
let out = moving_average(&[1.0, 2.0, 3.0, 4.0, 5.0], 3);
assert_eq!(out.len(), 5);
approx(out[0], 1.5);
approx(out[1], 2.0);
approx(out[2], 3.0);
approx(out[3], 4.0);
approx(out[4], 4.5);
}
#[test]
fn moving_average_window_one_is_copy() {
let xs = [1.0, 2.0, 3.0];
assert_eq!(moving_average(&xs, 1), xs.to_vec());
assert_eq!(moving_average(&xs, 0), xs.to_vec());
}
#[test]
fn ewma_first_element_and_alpha_one() {
let xs = [2.0, 4.0, 8.0];
let out = ewma(&xs, 0.5);
approx(out[0], 2.0);
approx(out[1], 0.5 * 4.0 + 0.5 * 2.0); // 3.0
approx(out[2], 0.5 * 8.0 + 0.5 * 3.0); // 5.5
// alpha = 1.0 -> copy
assert_eq!(ewma(&xs, 1.0), xs.to_vec());
// clamped: alpha > 1 also a copy
assert_eq!(ewma(&xs, 5.0), xs.to_vec());
// empty
assert!(ewma(&[], 0.5).is_empty());
}
#[test]
fn hampel_replaces_spike() {
let xs = [1.0, 1.0, 1.0, 100.0, 1.0, 1.0, 1.0];
let (out, count) = hampel_filter_count(&xs, 3, 3.0);
approx(out[3], 1.0);
assert_eq!(count, 1);
// all other points unchanged
for i in [0, 1, 2, 4, 5, 6] {
approx(out[i], 1.0);
}
// hampel_filter agrees
assert_eq!(hampel_filter(&xs, 3, 3.0), out);
}
#[test]
fn hampel_clean_signal_unchanged() {
let xs = [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0];
let (out, count) = hampel_filter_count(&xs, 2, 3.0);
assert_eq!(count, 0);
assert_eq!(out, xs.to_vec());
}
#[test]
fn hampel_empty() {
let (out, count) = hampel_filter_count(&[], 2, 3.0);
assert!(out.is_empty());
assert_eq!(count, 0);
}
#[test]
fn short_window_variance_constant_is_zero() {
let xs = [5.0; 8];
let out = short_window_variance(&xs, 3);
assert_eq!(out.len(), 8);
for v in out {
approx(v, 0.0);
}
// window 1 -> all zeros
let out2 = short_window_variance(&xs, 1);
assert_eq!(out2, vec![0.0; 8]);
assert!(short_window_variance(&[], 3).is_empty());
}
#[test]
fn short_window_variance_nonconstant() {
// [0, 0, 9], window 3, half 1:
// i=0: [0,0] var 0
// i=1: [0,0,9] mean 3, devs^2 9,9,36 -> 54/3 = 18
// i=2: [0,9] mean 4.5, devs^2 20.25,20.25 -> 40.5/2 = 20.25
let out = short_window_variance(&[0.0, 0.0, 9.0], 3);
approx(out[0], 0.0);
approx(out[1], 18.0);
approx(out[2], 20.25);
}
#[test]
fn subtract_baseline_works_and_errors() {
let c = [3.0, 5.0, 7.0];
let b = [1.0, 2.0, 3.0];
let out = subtract_baseline(&c, &b).unwrap();
assert_eq!(out, vec![2.0, 3.0, 4.0]);
let err = subtract_baseline(&c, &[1.0, 2.0]).unwrap_err();
assert_eq!(err, DspError::LengthMismatch { a: 3, b: 2 });
}
}