diff --git a/v2/crates/wifi-densepose-sensing-server/src/main.rs b/v2/crates/wifi-densepose-sensing-server/src/main.rs index 722ab4fd..ae9c4529 100644 --- a/v2/crates/wifi-densepose-sensing-server/src/main.rs +++ b/v2/crates/wifi-densepose-sensing-server/src/main.rs @@ -349,6 +349,21 @@ fn amp_baseline_override_init() -> &'static Mutex> = OnceLock::new(); +fn baseline_last_written_init() -> &'static Mutex { + BASELINE_LAST_WRITTEN.get_or_init(|| Mutex::new(std::time::UNIX_EPOCH)) +} + +/// ADR-107: in-progress calibration state for the REST endpoint. +/// 'idle' | 'running' | 'complete' | 'error: …' +static BASELINE_CALIBRATION_STATUS: OnceLock> = OnceLock::new(); +fn baseline_calib_status_init() -> &'static Mutex { + BASELINE_CALIBRATION_STATUS.get_or_init(|| Mutex::new("idle".to_string())) +} + /// Load persistent baseline from JSON file. Tolerant: missing file or /// parse errors are non-fatal (server falls back to rolling p95). fn load_baseline_file(path: &str) { @@ -408,6 +423,12 @@ fn load_baseline_file(path: &str) { let mut o = amp_baseline_cv_init().lock().unwrap(); for (id, cv) in &loaded_cv { o.insert(*id, *cv); } } + // ADR-107: track when the baseline file was last loaded/written so + // the auto-recalibrator and REST endpoint can stage cool-downs. + { + let mut t = baseline_last_written_init().lock().unwrap(); + *t = std::time::SystemTime::now(); + } let summary: Vec = loaded.iter().map(|(id, b)| format!("node{id}={b:.2}")).collect(); let cv_summary: Vec = loaded_cv.iter() .map(|(id, cv)| format!("node{id}_cv={:.2}%", cv * 100.0)).collect(); @@ -769,6 +790,18 @@ struct Args { #[arg(long, default_value = "20")] csi_keepalive_pps: u32, + /// ADR-107: auto-recalibrate baseline in background when the room + /// has been `absent` and quiet for N seconds. Set to 0 to disable. + /// Default 1800 = 30 min — long enough that someone occasionally + /// in the room won't trigger spurious recalibrations. + #[arg(long, default_value = "1800")] + auto_recalibrate_quiet_sec: f64, + + /// ADR-107: cool-down (seconds) between auto-recalibration writes. + /// Default 3600 = at most once per hour. + #[arg(long, default_value = "3600")] + auto_recalibrate_min_age_sec: f64, + /// Path to UI static files #[arg(long, default_value = "../../ui")] ui_path: PathBuf, @@ -3674,6 +3707,82 @@ async fn pose_zones_summary(State(state): State) -> Json Json { + let overrides: Vec<(u8, f64)> = { + let m = amp_baseline_override_init().lock().unwrap(); + m.iter().map(|(k,v)| (*k, *v)).collect() + }; + let cvs: Vec<(u8, f64)> = { + let m = amp_baseline_cv_init().lock().unwrap(); + m.iter().map(|(k,v)| (*k, *v)).collect() + }; + let last_written_secs = { + let t = baseline_last_written_init().lock().unwrap(); + t.elapsed().map(|d| d.as_secs() as i64).unwrap_or(-1) + }; + let status = baseline_calib_status_init().lock().unwrap().clone(); + let mut nodes = serde_json::Map::new(); + for (id, b) in overrides { + let cv = cvs.iter().find(|(i,_)| *i == id).map(|(_,c)| *c * 100.0).unwrap_or(0.0); + nodes.insert(id.to_string(), serde_json::json!({ + "full_broadband_p95": b, + "full_broadband_cv_pct": cv, + })); + } + Json(serde_json::json!({ + "nodes": nodes, + "last_written_sec_ago": last_written_secs, + "calibration_status": status, + })) +} + +/// ADR-107: POST /api/v1/baseline/calibrate — kick off a background +/// capture. Body (optional JSON): { "duration_sec": 90, "trim_sec": 15, +/// "clean_window_sec": 30, "out": "data/baseline.json" }. Returns +/// immediately with status; client polls GET /api/v1/baseline to see +/// calibration_status transition idle → running → complete | error: … +async fn baseline_calibrate(body: Option>) -> Json { + let cfg = body.map(|j| j.0).unwrap_or_else(|| serde_json::json!({})); + let duration = cfg.get("duration_sec").and_then(|v| v.as_f64()).unwrap_or(90.0); + let trim = cfg.get("trim_sec").and_then(|v| v.as_f64()).unwrap_or(15.0); + let win = cfg.get("clean_window_sec").and_then(|v| v.as_f64()).unwrap_or(30.0); + let out = cfg.get("out").and_then(|v| v.as_str()) + .unwrap_or("data/baseline.json").to_string(); + + { + let mut s = baseline_calib_status_init().lock().unwrap(); + if s.starts_with("running") { + return Json(serde_json::json!({ + "started": false, + "reason": "calibration already running", + "status": *s, + })); + } + *s = "running".to_string(); + } + + let out_for_task = out.clone(); + tokio::spawn(async move { + let res = capture_baseline_to_disk(duration, trim, win, &out_for_task).await; + let mut s = baseline_calib_status_init().lock().unwrap(); + *s = match res { + Ok(_) => "complete".to_string(), + Err(e) => format!("error: {e}"), + }; + }); + + Json(serde_json::json!({ + "started": true, + "duration_sec": duration, + "trim_sec": trim, + "clean_window_sec": win, + "out": out, + "hint": "operator must step out of the room within ~5 seconds; poll GET /api/v1/baseline for status", + })) +} + async fn stream_status(State(state): State) -> Json { let s = state.read().await; Json(serde_json::json!({ @@ -4546,6 +4655,235 @@ async fn csi_keepalive_task(pps: u32) { } } +/// ADR-107: capture an empty-room baseline from the live WS stream +/// and persist it to disk. Mirrors what `scripts/record-baseline.py` +/// does, but runs in-process so the REST endpoint and the auto- +/// recalibrator can both call it. +/// +/// Records `duration_sec` of frames, trims `trim_sec` from head and +/// tail, finds the lowest-CV sub-window, computes per-node FULL- +/// broadband mean / median / p95 / std / CV %, writes +/// `data/baseline.json` and reloads it live. +async fn capture_baseline_to_disk( + duration_sec: f64, + trim_sec: f64, + clean_window_sec: f64, + out_path: &str, +) -> Result { + use std::time::{Instant, SystemTime, UNIX_EPOCH}; + let mut by_node: std::collections::HashMap, f64)>> = + std::collections::HashMap::new(); + + // Read off the broadcast channel directly via subscribing to a WS + // stream loop. We share the same tx that broadcasts JSON; just + // subscribe and parse. + let mut rx = BASELINE_BUS.get().ok_or("baseline bus not initialised yet")? + .subscribe(); + + let start = Instant::now(); + while start.elapsed().as_secs_f64() < duration_sec { + match tokio::time::timeout( + std::time::Duration::from_secs(1), + rx.recv() + ).await { + Ok(Ok(json)) => { + let d: serde_json::Value = match serde_json::from_str(&json) { + Ok(v) => v, Err(_) => continue, + }; + if d.get("type").and_then(|v| v.as_str()) != Some("sensing_update") { + continue; + } + let t = start.elapsed().as_secs_f64(); + if let Some(arr) = d.get("nodes").and_then(|v| v.as_array()) { + for n in arr { + let nid = match n.get("node_id").and_then(|v| v.as_u64()) { + Some(x) => x as u8, None => continue, + }; + let amps: Vec = n.get("amplitude") + .and_then(|v| v.as_array()) + .map(|a| a.iter().filter_map(|x| x.as_f64()).collect()) + .unwrap_or_default(); + if amps.is_empty() { continue; } + let rssi = n.get("rssi_dbm").and_then(|v| v.as_f64()).unwrap_or(0.0); + by_node.entry(nid).or_default().push((t, amps, rssi)); + } + } + } + _ => continue, + } + } + + if by_node.is_empty() { + return Err("no per-node frames captured during the window".into()); + } + + // Per-node trim + cleanest sub-window selection. + let mut nodes_out = serde_json::Map::new(); + for (nid, frames) in &by_node { + if frames.is_empty() { continue; } + let t0 = frames.first().unwrap().0; + let t1 = frames.last().unwrap().0; + let dur = t1 - t0; + let (head, tail) = if dur < trim_sec * 2.0 + clean_window_sec / 2.0 { + (dur / 6.0, dur / 6.0) + } else { (trim_sec, trim_sec) }; + let trimmed: Vec<&(f64, Vec, f64)> = frames.iter() + .filter(|f| f.0 >= t0 + head && f.0 <= t1 - tail).collect(); + if trimmed.is_empty() { continue; } + + let full_mean = |amps: &[f64]| { + let v: Vec = amps.iter().copied().filter(|x| *x > 0.0).collect(); + if v.is_empty() { 0.0 } else { v.iter().sum::() / v.len() as f64 } + }; + + // Scan windows for lowest-CV chunk. + let win = clean_window_sec; + let chunk: Vec<&&(f64, Vec, f64)> = if trimmed.last().unwrap().0 - trimmed.first().unwrap().0 <= win { + trimmed.iter().collect() + } else { + let mut best: Option<(f64, Vec<&&(f64, Vec, f64)>)> = None; + let step = 5.0; + let mut cursor = trimmed.first().unwrap().0; + while cursor + win <= trimmed.last().unwrap().0 { + let w: Vec<&&(f64, Vec, f64)> = trimmed.iter() + .filter(|f| f.0 >= cursor && f.0 <= cursor + win).collect(); + if w.len() >= 5 { + let bms: Vec = w.iter().map(|f| full_mean(&f.1)).collect(); + let mu: f64 = bms.iter().sum::() / bms.len() as f64; + if mu > 0.0 { + let var: f64 = bms.iter().map(|x| (x-mu).powi(2)).sum::() / bms.len() as f64; + let cv = var.sqrt() / mu; + if best.as_ref().map_or(true, |b| cv < b.0) { + best = Some((cv, w)); + } + } + } + cursor += step; + } + match best { Some((_, w)) => w, None => trimmed.iter().collect() } + }; + + let bms: Vec = chunk.iter().map(|f| full_mean(&f.1)).collect(); + let mean = bms.iter().sum::() / bms.len() as f64; + let var = bms.iter().map(|x| (x-mean).powi(2)).sum::() / bms.len() as f64; + let std = var.sqrt(); + let cv = if mean > 0.0 { std / mean } else { 0.0 }; + let mut sorted_bms = bms.clone(); + sorted_bms.sort_by(|a,b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal)); + let p50 = sorted_bms[sorted_bms.len() / 2]; + let p95 = sorted_bms[(sorted_bms.len() as f64 * 0.95) as usize]; + let rssis: Vec = chunk.iter().map(|f| f.2).filter(|x| *x != 0.0).collect(); + let rssi_mean = if rssis.is_empty() { 0.0 } else { rssis.iter().sum::() / rssis.len() as f64 }; + + nodes_out.insert(nid.to_string(), serde_json::json!({ + "full_broadband_mean": mean, + "full_broadband_p50": p50, + "full_broadband_p95": p95, + "full_broadband_std": std, + "full_broadband_cv_pct": cv * 100.0, + "rssi_dbm": rssi_mean, + "n_samples": chunk.len(), + })); + } + + if nodes_out.is_empty() { + return Err("trimming yielded zero usable windows".into()); + } + + let payload = serde_json::json!({ + "version": 2, + "captured_at": chrono::Utc::now().to_rfc3339(), + "duration_sec": duration_sec, + "trim_head_sec": trim_sec, + "trim_tail_sec": trim_sec, + "clean_window_sec": clean_window_sec, + "method": "in-process (ADR-107): record → trim → lowest-CV sub-window → FULL-broadband stats", + "nodes": nodes_out, + }); + + if let Some(parent) = std::path::Path::new(out_path).parent() { + let _ = std::fs::create_dir_all(parent); + } + std::fs::write(out_path, serde_json::to_string_pretty(&payload).map_err(|e| e.to_string())?) + .map_err(|e| format!("write {out_path}: {e}"))?; + + // Hot-reload override map without restart. + load_baseline_file(out_path); + + { + let mut t = baseline_last_written_init().lock().unwrap(); + *t = SystemTime::now(); + } + + let _ = UNIX_EPOCH; + Ok(payload) +} + +/// ADR-107: subscribed broadcast handle of the WS JSON stream so +/// capture_baseline_to_disk and the auto-recalibrator can read live +/// frames without re-binding the UDP socket. +static BASELINE_BUS: OnceLock> = OnceLock::new(); + +/// ADR-107: background task — when the classifier reports `absent` and +/// CV stays low for `quiet_window_sec`, run a baseline capture in the +/// background. Cool-down `min_age_sec` between writes so we don't loop. +async fn auto_recalibrate_task( + state: SharedState, + enabled: bool, + quiet_window_sec: f64, + min_age_sec: f64, + capture_dur_sec: f64, +) { + if !enabled { + info!("Auto-recalibrate disabled (--auto-recalibrate 0)"); + return; + } + info!("Auto-recalibrate enabled: trigger after {quiet_window_sec:.0}s of `absent`+low-CV, min {min_age_sec:.0}s between writes"); + let mut quiet_since: Option = None; + let mut tick = tokio::time::interval(std::time::Duration::from_secs(5)); + loop { + tick.tick().await; + let (level, cv) = { + let s = state.read().await; + match &s.latest_update { + Some(u) => (u.classification.motion_level.clone(), u.classification.confidence), + None => continue, + } + }; + let quiet = level == "absent" && cv < 0.08; + if !quiet { quiet_since = None; continue; } + let started = quiet_since.get_or_insert_with(std::time::Instant::now); + if started.elapsed().as_secs_f64() < quiet_window_sec { continue; } + + // Cool-down vs last write + let age_sec = { + let t = baseline_last_written_init().lock().unwrap(); + t.elapsed().map(|d| d.as_secs_f64()).unwrap_or(f64::INFINITY) + }; + if age_sec < min_age_sec { continue; } + + info!("auto-recalibrate: room quiet for {:.0}s, refreshing baseline...", started.elapsed().as_secs_f64()); + { + let mut s = baseline_calib_status_init().lock().unwrap(); + *s = "running (auto)".to_string(); + } + let path = std::env::var("RUVIEW_BASELINE_FILE").unwrap_or_else(|_| "data/baseline.json".into()); + match capture_baseline_to_disk(capture_dur_sec, 5.0, capture_dur_sec * 0.5, &path).await { + Ok(_) => { + info!("auto-recalibrate: saved new baseline to {path}"); + let mut s = baseline_calib_status_init().lock().unwrap(); + *s = "complete (auto)".to_string(); + } + Err(e) => { + error!("auto-recalibrate: capture failed: {e}"); + let mut s = baseline_calib_status_init().lock().unwrap(); + *s = format!("error (auto): {e}"); + } + } + quiet_since = None; + } +} + async fn udp_receiver_task(state: SharedState, udp_port: u16) { let addr = format!("0.0.0.0:{udp_port}"); let socket = match UdpSocket::bind(&addr).await { @@ -5910,6 +6248,28 @@ async fn main() { }, })); + // ADR-107: initialise the baseline broadcast bus — capture + // baseline reads from this. We forward every JSON message broadcast + // on the WS into the bus so the in-process capture stays decoupled + // from individual WS clients. + { + let (tx, _rx) = tokio::sync::broadcast::channel::(256); + let _ = BASELINE_BUS.set(tx); + } + { + // Forwarder: subscribe to AppState.tx, push each message into + // BASELINE_BUS. Decouples baseline capture from the live WS + // clients (no client subscribing to the bus when no calibration + // is running). + let mut rx_from_state = state.read().await.tx.subscribe(); + let bus_tx = BASELINE_BUS.get().unwrap().clone(); + tokio::spawn(async move { + while let Ok(msg) = rx_from_state.recv().await { + let _ = bus_tx.send(msg); + } + }); + } + // Start background tasks based on source match source { "esp32" => { @@ -5917,6 +6277,14 @@ async fn main() { // ADR-106: drive CSI rate by pinging sensors back ourselves // instead of relying on the operator's ad-hoc `ping -i 0.05 …`. tokio::spawn(csi_keepalive_task(args.csi_keepalive_pps)); + // ADR-107: auto-recalibrate baseline silently when room is quiet. + tokio::spawn(auto_recalibrate_task( + state.clone(), + args.auto_recalibrate_quiet_sec > 0.0, + args.auto_recalibrate_quiet_sec, + args.auto_recalibrate_min_age_sec, + 90.0, // capture window + )); tokio::spawn(broadcast_tick_task(state.clone(), args.tick_ms)); } "wifi" => { @@ -6002,6 +6370,9 @@ async fn main() { .route("/api/v1/pose/current", get(pose_current)) .route("/api/v1/pose/stats", get(pose_stats)) .route("/api/v1/pose/zones/summary", get(pose_zones_summary)) + // ADR-107: baseline calibration REST. + .route("/api/v1/baseline", get(baseline_get)) + .route("/api/v1/baseline/calibrate", axum::routing::post(baseline_calibrate)) // Stream endpoints .route("/api/v1/stream/status", get(stream_status)) .route("/api/v1/stream/pose", get(ws_pose_handler))