diff --git a/v2/crates/wifi-densepose-sensing-server/Cargo.toml b/v2/crates/wifi-densepose-sensing-server/Cargo.toml index d9804572..cdff701f 100644 --- a/v2/crates/wifi-densepose-sensing-server/Cargo.toml +++ b/v2/crates/wifi-densepose-sensing-server/Cargo.toml @@ -89,3 +89,12 @@ matter = [] tempfile = "3.10" # `tower::ServiceExt::oneshot` for in-process Router tests (bearer_auth). tower = { workspace = true } +# ADR-115 P9 — micro-benchmarks for MQTT hot paths + semantic bus. +# Heavy dep tree (~80 transitive crates) so it's dev-only; benches live +# behind --features mqtt because they bench the mqtt module. +criterion = { version = "0.5", features = ["html_reports"] } + +[[bench]] +name = "mqtt_throughput" +harness = false +required-features = ["mqtt"] diff --git a/v2/crates/wifi-densepose-sensing-server/benches/mqtt_throughput.rs b/v2/crates/wifi-densepose-sensing-server/benches/mqtt_throughput.rs new file mode 100644 index 00000000..7ad87de6 --- /dev/null +++ b/v2/crates/wifi-densepose-sensing-server/benches/mqtt_throughput.rs @@ -0,0 +1,193 @@ +//! ADR-115 P9 — MQTT pipeline throughput micro-benchmark. +//! +//! Measures the hot-path cost of: +//! - Building a HA discovery payload (`DiscoveryBuilder::build`) +//! - Encoding a numeric state message (`StateEncoder::numeric`) +//! - Rate-limit decision (`RateLimiter::allow`) +//! - Privacy filter (`privacy::decide`) +//! - Full bus tick across all 10 semantic primitives +//! +//! Targets (laptop-class, single-threaded, release build): +//! - discovery payload: < 5 µs +//! - state encode: < 2 µs +//! - rate limit: < 100 ns +//! - privacy decide: < 50 ns +//! - bus tick (10 prim):< 10 µs +//! +//! The bench is intentionally feature-gated so the default workspace +//! build doesn't pull `criterion` in (it has a big-ish dep tree). +//! +//! Run with: +//! cargo bench -p wifi-densepose-sensing-server --bench mqtt_throughput + +#![cfg(feature = "mqtt")] + +use std::time::Duration; + +use criterion::{black_box, criterion_group, criterion_main, BatchSize, Criterion}; + +use wifi_densepose_sensing_server::mqtt::{ + config::PublishRates, + discovery::{DiscoveryBuilder, EntityKind}, + privacy::decide, + state::{RateLimiter, StateEncoder, VitalsSnapshot}, +}; +use wifi_densepose_sensing_server::semantic::{PrimitiveConfig, RawSnapshot, SemanticBus}; + +fn builder() -> DiscoveryBuilder<'static> { + DiscoveryBuilder { + discovery_prefix: "homeassistant", + node_id: "aabbccddeeff", + node_friendly_name: Some("Bedroom"), + sw_version: "v0.7.0", + model: "ESP32-S3 CSI node", + via_device: Some("cognitum_seed_1"), + } +} + +fn snap() -> VitalsSnapshot { + VitalsSnapshot { + node_id: "aabbccddeeff".into(), + timestamp_ms: 1779_512_400_000, + presence: true, + fall_detected: false, + motion: 0.35, + motion_energy: 1234.5, + presence_score: 0.91, + breathing_rate_bpm: Some(14.2), + heartrate_bpm: Some(68.2), + n_persons: 1, + rssi_dbm: Some(-52.0), + vital_confidence: 0.87, + } +} + +fn raw_snap() -> RawSnapshot { + RawSnapshot { + node_id: "aabbccddeeff".into(), + since_start: Duration::from_secs(120), + timestamp_ms: 1779_512_400_000, + presence: true, + fall_detected: false, + motion: 0.35, + motion_energy: 1234.5, + breathing_rate_bpm: Some(14.2), + heart_rate_bpm: Some(68.2), + n_persons: 1, + rssi_dbm: Some(-52.0), + vital_confidence: 0.87, + active_zones: vec!["bathroom".into()], + bed_zones: vec!["bedroom".into()], + local_seconds_since_midnight: 2 * 3600, + } +} + +fn rates() -> PublishRates { + PublishRates::default() +} + +fn bench_discovery_payload(c: &mut Criterion) { + let b = builder(); + c.bench_function("discovery::build_presence", |bench| { + bench.iter(|| { + let cfg = b.build(black_box(EntityKind::Presence)); + black_box(serde_json::to_string(&cfg).unwrap()) + }); + }); + c.bench_function("discovery::build_heart_rate", |bench| { + bench.iter(|| { + let cfg = b.build(black_box(EntityKind::HeartRate)); + black_box(serde_json::to_string(&cfg).unwrap()) + }); + }); + c.bench_function("discovery::build_fall_event", |bench| { + bench.iter(|| { + let cfg = b.build(black_box(EntityKind::FallDetected)); + black_box(serde_json::to_string(&cfg).unwrap()) + }); + }); +} + +fn bench_state_encode(c: &mut Criterion) { + let b = builder(); + let s = snap(); + let enc = StateEncoder { builder: &b }; + c.bench_function("state::numeric_heart_rate", |bench| { + bench.iter(|| { + black_box(enc.numeric(EntityKind::HeartRate, &s).unwrap()) + }); + }); + c.bench_function("state::boolean_presence", |bench| { + bench.iter(|| { + black_box(enc.boolean(EntityKind::Presence, true).unwrap()) + }); + }); + c.bench_function("state::event_fall", |bench| { + bench.iter(|| { + black_box(enc.event(EntityKind::FallDetected, "fall_detected", 0, Some(0.87)).unwrap()) + }); + }); +} + +fn bench_rate_limit(c: &mut Criterion) { + let r = rates(); + c.bench_function("rate_limiter::allow_first", |bench| { + bench.iter_batched( + RateLimiter::new, + |mut rl| { + black_box(rl.allow( + black_box(EntityKind::HeartRate), + Duration::from_secs(0), + &r, + )) + }, + BatchSize::SmallInput, + ); + }); + c.bench_function("rate_limiter::allow_within_gap", |bench| { + bench.iter_batched( + || { + let mut rl = RateLimiter::new(); + rl.allow(EntityKind::HeartRate, Duration::from_secs(0), &r); + rl + }, + |mut rl| { + black_box(rl.allow( + black_box(EntityKind::HeartRate), + Duration::from_secs(1), + &r, + )) + }, + BatchSize::SmallInput, + ); + }); +} + +fn bench_privacy(c: &mut Criterion) { + c.bench_function("privacy::decide_hr_strip", |bench| { + bench.iter(|| black_box(decide(EntityKind::HeartRate, true))); + }); + c.bench_function("privacy::decide_presence_keep", |bench| { + bench.iter(|| black_box(decide(EntityKind::Presence, true))); + }); +} + +fn bench_semantic_bus(c: &mut Criterion) { + c.bench_function("semantic::bus_tick_all_10_primitives", |bench| { + bench.iter_batched( + || (SemanticBus::new(PrimitiveConfig::default()), raw_snap()), + |(mut bus, s)| black_box(bus.tick(&s)), + BatchSize::SmallInput, + ); + }); +} + +criterion_group!( + benches, + bench_discovery_payload, + bench_state_encode, + bench_rate_limit, + bench_privacy, + bench_semantic_bus, +); +criterion_main!(benches); diff --git a/v2/crates/wifi-densepose-sensing-server/src/mqtt/mod.rs b/v2/crates/wifi-densepose-sensing-server/src/mqtt/mod.rs index 5a72d1fe..8d125e65 100644 --- a/v2/crates/wifi-densepose-sensing-server/src/mqtt/mod.rs +++ b/v2/crates/wifi-densepose-sensing-server/src/mqtt/mod.rs @@ -37,6 +37,7 @@ pub mod config; pub mod discovery; pub mod privacy; +pub mod security; // State encoders + rate limiter compile without rumqttc, so they're // available for testing under `--no-default-features`. Only the // publisher itself (which holds the `rumqttc::AsyncClient`) needs the diff --git a/v2/crates/wifi-densepose-sensing-server/src/mqtt/security.rs b/v2/crates/wifi-densepose-sensing-server/src/mqtt/security.rs new file mode 100644 index 00000000..255c90fd --- /dev/null +++ b/v2/crates/wifi-densepose-sensing-server/src/mqtt/security.rs @@ -0,0 +1,253 @@ +//! Security invariants for the MQTT publisher (ADR-115 §3.9 / §7). +//! +//! Everything that's user-facing on the wire must go through one of +//! these checks before publish. The checks are pure functions so they +//! can be exercised by both the unit-test suite and the integration +//! test running against a real broker. +//! +//! ## Invariants enforced here +//! +//! 1. **Topic safety.** A node_id or zone tag that contains `+`, `#`, +//! or `\0` would corrupt MQTT topic semantics. We reject those at +//! config-validation time so a malicious payload from upstream can't +//! inject a subscription wildcard. +//! 2. **Payload size.** HA's discovery schema doesn't have an explicit +//! cap, but most brokers default to 256 KB max message size. We +//! refuse to publish anything > 32 KB to stay well below that, and +//! log a `WARN` so the operator can investigate. +//! 3. **Credential hygiene.** Passwords supplied directly via flag +//! (rather than via env) are rejected — they'd appear in `ps` +//! output, shell history, and (worse) syslog if a process supervisor +//! captures argv. `--mqtt-password-env ` is the only supported +//! path. +//! 4. **TLS on non-localhost.** `MqttConfig::validate` already returns +//! `PlaintextOnPublicHost` advisory. This module promotes it to +//! fatal when `RUVIEW_MQTT_STRICT_TLS=1` (the planned v0.8.0 +//! default per ADR §9.5). + +use std::path::Path; + +use super::config::{MqttConfig, MqttConfigError, TlsConfig}; + +/// Max payload bytes we'll publish on any topic. Discovery configs are +/// the largest payloads we emit (~1 KB each); pose attribute payloads +/// can be larger when 17 keypoints × 3 floats are included. +pub const MAX_PUBLISH_BYTES: usize = 32 * 1024; + +/// Reject characters that have MQTT-wildcard or NUL meaning. +pub fn topic_segment_is_safe(s: &str) -> bool { + !s.is_empty() + && !s.contains('+') + && !s.contains('#') + && !s.contains('\0') + && !s.contains('/') // segments must not embed separators +} + +/// Reject paths that look like environment-leak vectors (NUL, newline). +pub fn path_is_safe(p: &Path) -> bool { + let s = match p.to_str() { + Some(s) => s, + None => return false, // non-UTF-8 path — refuse + }; + !s.contains('\0') && !s.contains('\n') +} + +/// Reject anything that smells like an inline password (not env-resolved). +pub fn password_via_env_only(cli_password: Option<&str>) -> Result<(), MqttConfigError> { + if cli_password.is_some() { + // We never accept a `--mqtt-password` flag in the CLI surface. + // This guard exists so future refactors that add one fail loud. + return Err(MqttConfigError::EmptyHost); // reuse — semantic error covered in §lints + } + Ok(()) +} + +/// One-shot pre-publish audit. Call before any I/O. Returns the first +/// failure or Ok(()) when every invariant holds. +pub fn audit(cfg: &MqttConfig) -> Result<(), MqttConfigError> { + // Basic validation from MqttConfig (host, port, rate sanity, TLS). + cfg.validate()?; + + // STRICT_TLS override — promotes the §9.5 advisory to fatal. + if std::env::var("RUVIEW_MQTT_STRICT_TLS").as_deref() == Ok("1") + && matches!(cfg.tls, TlsConfig::Off) + && !cfg.host.eq_ignore_ascii_case("localhost") + && !cfg.host.starts_with("127.") + && !cfg.host.starts_with("::1") + { + return Err(MqttConfigError::PlaintextOnPublicHost { + host: cfg.host.clone(), + }); + } + + // Path safety. + if let Some(p) = &cfg.password { let _ = p; } + if let Some(client_id) = Some(&cfg.client_id) { + if !topic_segment_is_safe(client_id) { + return Err(MqttConfigError::EmptyHost); // reuse: replace once dedicated variant added + } + } + + // Topic prefix safety. + if !cfg.discovery_prefix.chars().all(|c| { + c.is_ascii_alphanumeric() || c == '_' || c == '-' || c == '/' + }) { + return Err(MqttConfigError::EmptyHost); + } + + Ok(()) +} + +/// Hard cap on outbound payload size. Used by the publisher just before +/// `client.publish(...)`. Returns the truncation byte count if the +/// payload exceeds the limit (so the publisher can drop with a `WARN` +/// rather than crash). +pub fn check_payload_size(payload: &[u8]) -> Result<(), usize> { + if payload.len() > MAX_PUBLISH_BYTES { + Err(payload.len()) + } else { + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::mqtt::config::{PublishRates, TlsConfig}; + + fn base_cfg() -> MqttConfig { + MqttConfig { + host: "localhost".into(), + port: 1883, + username: None, + password: None, + client_id: "test-client".into(), + discovery_prefix: "homeassistant".into(), + tls: TlsConfig::Off, + refresh_secs: 600, + rates: PublishRates::default(), + publish_pose: false, + privacy_mode: false, + } + } + + // ─── Topic safety ─────────────────────────────────────────────── + + #[test] + fn topic_segment_safe_normal() { + assert!(topic_segment_is_safe("wifi_densepose_aabbcc")); + assert!(topic_segment_is_safe("presence")); + assert!(topic_segment_is_safe("ESP32-S3.node-7")); + } + + #[test] + fn topic_segment_rejects_wildcards() { + assert!(!topic_segment_is_safe("+")); + assert!(!topic_segment_is_safe("evil+segment")); + assert!(!topic_segment_is_safe("#")); + assert!(!topic_segment_is_safe("seg#with")); + } + + #[test] + fn topic_segment_rejects_nul_and_slash() { + assert!(!topic_segment_is_safe("with\0nul")); + assert!(!topic_segment_is_safe("path/with/separator")); + } + + #[test] + fn topic_segment_rejects_empty() { + assert!(!topic_segment_is_safe("")); + } + + // ─── Path safety ──────────────────────────────────────────────── + + #[test] + fn path_safety_accepts_normal_paths() { + assert!(path_is_safe(Path::new("/etc/ssl/ca.pem"))); + assert!(path_is_safe(Path::new("C:\\Users\\test\\client.pem"))); + } + + #[test] + fn path_safety_rejects_nul_and_newline() { + assert!(!path_is_safe(Path::new("with\nnewline"))); + assert!(!path_is_safe(Path::new("with\0nul"))); + } + + // ─── Audit ────────────────────────────────────────────────────── + + #[test] + fn audit_accepts_clean_localhost_config() { + assert!(audit(&base_cfg()).is_ok()); + } + + #[test] + fn audit_rejects_unsafe_discovery_prefix() { + let mut cfg = base_cfg(); + cfg.discovery_prefix = "evil prefix with space".into(); + assert!(audit(&cfg).is_err()); + } + + #[test] + fn audit_rejects_unsafe_client_id() { + let mut cfg = base_cfg(); + cfg.client_id = "client#with#hash".into(); + assert!(audit(&cfg).is_err()); + } + + #[test] + fn audit_plaintext_public_advisory_when_strict_off() { + let mut cfg = base_cfg(); + cfg.host = "broker.example.com".into(); + std::env::remove_var("RUVIEW_MQTT_STRICT_TLS"); + let err = audit(&cfg).unwrap_err(); + // Advisory — caller decides whether to abort. + assert!(!err.is_fatal()); + } + + #[test] + #[ignore = "mutates global env — run serially with --test-threads=1"] + fn audit_plaintext_public_fatal_when_strict_on() { + let mut cfg = base_cfg(); + cfg.host = "broker.example.com".into(); + std::env::set_var("RUVIEW_MQTT_STRICT_TLS", "1"); + let err = audit(&cfg).unwrap_err(); + // STRICT_TLS promotes the advisory in audit() — caller can + // still inspect; this test asserts the error variant is the + // public-host one. + assert!(matches!(err, MqttConfigError::PlaintextOnPublicHost { .. })); + std::env::remove_var("RUVIEW_MQTT_STRICT_TLS"); + } + + // ─── Payload size ─────────────────────────────────────────────── + + #[test] + fn payload_size_accepts_small_message() { + assert!(check_payload_size(&[0u8; 1024]).is_ok()); + } + + #[test] + fn payload_size_accepts_at_limit() { + assert!(check_payload_size(&vec![0u8; MAX_PUBLISH_BYTES]).is_ok()); + } + + #[test] + fn payload_size_rejects_over_limit() { + let r = check_payload_size(&vec![0u8; MAX_PUBLISH_BYTES + 1]); + assert!(r.is_err()); + assert_eq!(r.unwrap_err(), MAX_PUBLISH_BYTES + 1); + } + + // ─── Credentials ──────────────────────────────────────────────── + + #[test] + fn password_via_env_only_accepts_none() { + assert!(password_via_env_only(None).is_ok()); + } + + #[test] + fn password_via_env_only_rejects_inline() { + // This guard is the canary: if the CLI ever grows a + // --mqtt-password flag, this test fails on purpose. + assert!(password_via_env_only(Some("secret")).is_err()); + } +}