feat(adr-107): REST /api/v1/baseline/* + auto-recalibrate in background

Eliminates the manual `scripts/record-baseline.py` ritual:

REST endpoints
  GET  /api/v1/baseline             — current per-node baseline +
                                       last_written_sec_ago + calibration_status
  POST /api/v1/baseline/calibrate   — start a background capture, optional
                                       JSON body { duration_sec, trim_sec,
                                       clean_window_sec, out }. Returns
                                       immediately; status transitions
                                       idle → running → complete | error: ...

Auto-recalibrate background task
  Watches the live classifier. When motion_level=="absent" and CV<0.08 for
  --auto-recalibrate-quiet-sec (default 1800 = 30 min) AND the last write
  is older than --auto-recalibrate-min-age-sec (default 3600 = 1h),
  silently re-runs the capture and live-reloads the override map. No
  operator action needed.

Implementation
  capture_baseline_to_disk()  — in-process port of record-baseline.py:
                                trim head/tail, scan windows for lowest-
                                CV chunk, compute full-broadband stats,
                                write baseline.json, hot-reload override.
  BASELINE_BUS                — broadcast bus carrying every sensing_update
                                JSON so the capture can read live frames
                                without re-binding any sockets.
  BASELINE_LAST_WRITTEN       — SystemTime tracker for the cool-down.
  BASELINE_CALIBRATION_STATUS — status string for the REST endpoint.

Verified live: POST /api/v1/baseline/calibrate (5 s test window) ->
capture wrote `/tmp/test_baseline.json` with n_samples=86 per node,
override hot-reloaded (visible via GET /api/v1/baseline). Real baseline
restored on next server restart from data/baseline.json.
This commit is contained in:
arsen 2026-05-17 12:12:24 +07:00
parent 68068d73d8
commit 0f373467e5
1 changed files with 371 additions and 0 deletions

View File

@ -349,6 +349,21 @@ fn amp_baseline_override_init() -> &'static Mutex<std::collections::HashMap<u8,
AMP_BASELINE_OVERRIDE.get_or_init(|| Mutex::new(std::collections::HashMap::new()))
}
/// ADR-107: timestamp of the most recent baseline load/write. Auto
/// recalibrator uses this to enforce a cool-down between writes; the
/// REST endpoint reports it so the UI can show "calibrated X min ago".
static BASELINE_LAST_WRITTEN: OnceLock<Mutex<std::time::SystemTime>> = OnceLock::new();
fn baseline_last_written_init() -> &'static Mutex<std::time::SystemTime> {
BASELINE_LAST_WRITTEN.get_or_init(|| Mutex::new(std::time::UNIX_EPOCH))
}
/// ADR-107: in-progress calibration state for the REST endpoint.
/// 'idle' | 'running' | 'complete' | 'error: …'
static BASELINE_CALIBRATION_STATUS: OnceLock<Mutex<String>> = OnceLock::new();
fn baseline_calib_status_init() -> &'static Mutex<String> {
BASELINE_CALIBRATION_STATUS.get_or_init(|| Mutex::new("idle".to_string()))
}
/// Load persistent baseline from JSON file. Tolerant: missing file or
/// parse errors are non-fatal (server falls back to rolling p95).
fn load_baseline_file(path: &str) {
@ -408,6 +423,12 @@ fn load_baseline_file(path: &str) {
let mut o = amp_baseline_cv_init().lock().unwrap();
for (id, cv) in &loaded_cv { o.insert(*id, *cv); }
}
// ADR-107: track when the baseline file was last loaded/written so
// the auto-recalibrator and REST endpoint can stage cool-downs.
{
let mut t = baseline_last_written_init().lock().unwrap();
*t = std::time::SystemTime::now();
}
let summary: Vec<String> = loaded.iter().map(|(id, b)| format!("node{id}={b:.2}")).collect();
let cv_summary: Vec<String> = loaded_cv.iter()
.map(|(id, cv)| format!("node{id}_cv={:.2}%", cv * 100.0)).collect();
@ -769,6 +790,18 @@ struct Args {
#[arg(long, default_value = "20")]
csi_keepalive_pps: u32,
/// ADR-107: auto-recalibrate baseline in background when the room
/// has been `absent` and quiet for N seconds. Set to 0 to disable.
/// Default 1800 = 30 min — long enough that someone occasionally
/// in the room won't trigger spurious recalibrations.
#[arg(long, default_value = "1800")]
auto_recalibrate_quiet_sec: f64,
/// ADR-107: cool-down (seconds) between auto-recalibration writes.
/// Default 3600 = at most once per hour.
#[arg(long, default_value = "3600")]
auto_recalibrate_min_age_sec: f64,
/// Path to UI static files
#[arg(long, default_value = "../../ui")]
ui_path: PathBuf,
@ -3674,6 +3707,82 @@ async fn pose_zones_summary(State(state): State<SharedState>) -> Json<serde_json
}))
}
/// ADR-107: GET /api/v1/baseline — current loaded baseline (per-node)
/// + when it was last written + calibration job status.
async fn baseline_get() -> Json<serde_json::Value> {
let overrides: Vec<(u8, f64)> = {
let m = amp_baseline_override_init().lock().unwrap();
m.iter().map(|(k,v)| (*k, *v)).collect()
};
let cvs: Vec<(u8, f64)> = {
let m = amp_baseline_cv_init().lock().unwrap();
m.iter().map(|(k,v)| (*k, *v)).collect()
};
let last_written_secs = {
let t = baseline_last_written_init().lock().unwrap();
t.elapsed().map(|d| d.as_secs() as i64).unwrap_or(-1)
};
let status = baseline_calib_status_init().lock().unwrap().clone();
let mut nodes = serde_json::Map::new();
for (id, b) in overrides {
let cv = cvs.iter().find(|(i,_)| *i == id).map(|(_,c)| *c * 100.0).unwrap_or(0.0);
nodes.insert(id.to_string(), serde_json::json!({
"full_broadband_p95": b,
"full_broadband_cv_pct": cv,
}));
}
Json(serde_json::json!({
"nodes": nodes,
"last_written_sec_ago": last_written_secs,
"calibration_status": status,
}))
}
/// ADR-107: POST /api/v1/baseline/calibrate — kick off a background
/// capture. Body (optional JSON): { "duration_sec": 90, "trim_sec": 15,
/// "clean_window_sec": 30, "out": "data/baseline.json" }. Returns
/// immediately with status; client polls GET /api/v1/baseline to see
/// calibration_status transition idle → running → complete | error: …
async fn baseline_calibrate(body: Option<Json<serde_json::Value>>) -> Json<serde_json::Value> {
let cfg = body.map(|j| j.0).unwrap_or_else(|| serde_json::json!({}));
let duration = cfg.get("duration_sec").and_then(|v| v.as_f64()).unwrap_or(90.0);
let trim = cfg.get("trim_sec").and_then(|v| v.as_f64()).unwrap_or(15.0);
let win = cfg.get("clean_window_sec").and_then(|v| v.as_f64()).unwrap_or(30.0);
let out = cfg.get("out").and_then(|v| v.as_str())
.unwrap_or("data/baseline.json").to_string();
{
let mut s = baseline_calib_status_init().lock().unwrap();
if s.starts_with("running") {
return Json(serde_json::json!({
"started": false,
"reason": "calibration already running",
"status": *s,
}));
}
*s = "running".to_string();
}
let out_for_task = out.clone();
tokio::spawn(async move {
let res = capture_baseline_to_disk(duration, trim, win, &out_for_task).await;
let mut s = baseline_calib_status_init().lock().unwrap();
*s = match res {
Ok(_) => "complete".to_string(),
Err(e) => format!("error: {e}"),
};
});
Json(serde_json::json!({
"started": true,
"duration_sec": duration,
"trim_sec": trim,
"clean_window_sec": win,
"out": out,
"hint": "operator must step out of the room within ~5 seconds; poll GET /api/v1/baseline for status",
}))
}
async fn stream_status(State(state): State<SharedState>) -> Json<serde_json::Value> {
let s = state.read().await;
Json(serde_json::json!({
@ -4546,6 +4655,235 @@ async fn csi_keepalive_task(pps: u32) {
}
}
/// ADR-107: capture an empty-room baseline from the live WS stream
/// and persist it to disk. Mirrors what `scripts/record-baseline.py`
/// does, but runs in-process so the REST endpoint and the auto-
/// recalibrator can both call it.
///
/// Records `duration_sec` of frames, trims `trim_sec` from head and
/// tail, finds the lowest-CV sub-window, computes per-node FULL-
/// broadband mean / median / p95 / std / CV %, writes
/// `data/baseline.json` and reloads it live.
async fn capture_baseline_to_disk(
duration_sec: f64,
trim_sec: f64,
clean_window_sec: f64,
out_path: &str,
) -> Result<serde_json::Value, String> {
use std::time::{Instant, SystemTime, UNIX_EPOCH};
let mut by_node: std::collections::HashMap<u8, Vec<(f64, Vec<f64>, f64)>> =
std::collections::HashMap::new();
// Read off the broadcast channel directly via subscribing to a WS
// stream loop. We share the same tx that broadcasts JSON; just
// subscribe and parse.
let mut rx = BASELINE_BUS.get().ok_or("baseline bus not initialised yet")?
.subscribe();
let start = Instant::now();
while start.elapsed().as_secs_f64() < duration_sec {
match tokio::time::timeout(
std::time::Duration::from_secs(1),
rx.recv()
).await {
Ok(Ok(json)) => {
let d: serde_json::Value = match serde_json::from_str(&json) {
Ok(v) => v, Err(_) => continue,
};
if d.get("type").and_then(|v| v.as_str()) != Some("sensing_update") {
continue;
}
let t = start.elapsed().as_secs_f64();
if let Some(arr) = d.get("nodes").and_then(|v| v.as_array()) {
for n in arr {
let nid = match n.get("node_id").and_then(|v| v.as_u64()) {
Some(x) => x as u8, None => continue,
};
let amps: Vec<f64> = n.get("amplitude")
.and_then(|v| v.as_array())
.map(|a| a.iter().filter_map(|x| x.as_f64()).collect())
.unwrap_or_default();
if amps.is_empty() { continue; }
let rssi = n.get("rssi_dbm").and_then(|v| v.as_f64()).unwrap_or(0.0);
by_node.entry(nid).or_default().push((t, amps, rssi));
}
}
}
_ => continue,
}
}
if by_node.is_empty() {
return Err("no per-node frames captured during the window".into());
}
// Per-node trim + cleanest sub-window selection.
let mut nodes_out = serde_json::Map::new();
for (nid, frames) in &by_node {
if frames.is_empty() { continue; }
let t0 = frames.first().unwrap().0;
let t1 = frames.last().unwrap().0;
let dur = t1 - t0;
let (head, tail) = if dur < trim_sec * 2.0 + clean_window_sec / 2.0 {
(dur / 6.0, dur / 6.0)
} else { (trim_sec, trim_sec) };
let trimmed: Vec<&(f64, Vec<f64>, f64)> = frames.iter()
.filter(|f| f.0 >= t0 + head && f.0 <= t1 - tail).collect();
if trimmed.is_empty() { continue; }
let full_mean = |amps: &[f64]| {
let v: Vec<f64> = amps.iter().copied().filter(|x| *x > 0.0).collect();
if v.is_empty() { 0.0 } else { v.iter().sum::<f64>() / v.len() as f64 }
};
// Scan windows for lowest-CV chunk.
let win = clean_window_sec;
let chunk: Vec<&&(f64, Vec<f64>, f64)> = if trimmed.last().unwrap().0 - trimmed.first().unwrap().0 <= win {
trimmed.iter().collect()
} else {
let mut best: Option<(f64, Vec<&&(f64, Vec<f64>, f64)>)> = None;
let step = 5.0;
let mut cursor = trimmed.first().unwrap().0;
while cursor + win <= trimmed.last().unwrap().0 {
let w: Vec<&&(f64, Vec<f64>, f64)> = trimmed.iter()
.filter(|f| f.0 >= cursor && f.0 <= cursor + win).collect();
if w.len() >= 5 {
let bms: Vec<f64> = w.iter().map(|f| full_mean(&f.1)).collect();
let mu: f64 = bms.iter().sum::<f64>() / bms.len() as f64;
if mu > 0.0 {
let var: f64 = bms.iter().map(|x| (x-mu).powi(2)).sum::<f64>() / bms.len() as f64;
let cv = var.sqrt() / mu;
if best.as_ref().map_or(true, |b| cv < b.0) {
best = Some((cv, w));
}
}
}
cursor += step;
}
match best { Some((_, w)) => w, None => trimmed.iter().collect() }
};
let bms: Vec<f64> = chunk.iter().map(|f| full_mean(&f.1)).collect();
let mean = bms.iter().sum::<f64>() / bms.len() as f64;
let var = bms.iter().map(|x| (x-mean).powi(2)).sum::<f64>() / bms.len() as f64;
let std = var.sqrt();
let cv = if mean > 0.0 { std / mean } else { 0.0 };
let mut sorted_bms = bms.clone();
sorted_bms.sort_by(|a,b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
let p50 = sorted_bms[sorted_bms.len() / 2];
let p95 = sorted_bms[(sorted_bms.len() as f64 * 0.95) as usize];
let rssis: Vec<f64> = chunk.iter().map(|f| f.2).filter(|x| *x != 0.0).collect();
let rssi_mean = if rssis.is_empty() { 0.0 } else { rssis.iter().sum::<f64>() / rssis.len() as f64 };
nodes_out.insert(nid.to_string(), serde_json::json!({
"full_broadband_mean": mean,
"full_broadband_p50": p50,
"full_broadband_p95": p95,
"full_broadband_std": std,
"full_broadband_cv_pct": cv * 100.0,
"rssi_dbm": rssi_mean,
"n_samples": chunk.len(),
}));
}
if nodes_out.is_empty() {
return Err("trimming yielded zero usable windows".into());
}
let payload = serde_json::json!({
"version": 2,
"captured_at": chrono::Utc::now().to_rfc3339(),
"duration_sec": duration_sec,
"trim_head_sec": trim_sec,
"trim_tail_sec": trim_sec,
"clean_window_sec": clean_window_sec,
"method": "in-process (ADR-107): record → trim → lowest-CV sub-window → FULL-broadband stats",
"nodes": nodes_out,
});
if let Some(parent) = std::path::Path::new(out_path).parent() {
let _ = std::fs::create_dir_all(parent);
}
std::fs::write(out_path, serde_json::to_string_pretty(&payload).map_err(|e| e.to_string())?)
.map_err(|e| format!("write {out_path}: {e}"))?;
// Hot-reload override map without restart.
load_baseline_file(out_path);
{
let mut t = baseline_last_written_init().lock().unwrap();
*t = SystemTime::now();
}
let _ = UNIX_EPOCH;
Ok(payload)
}
/// ADR-107: subscribed broadcast handle of the WS JSON stream so
/// capture_baseline_to_disk and the auto-recalibrator can read live
/// frames without re-binding the UDP socket.
static BASELINE_BUS: OnceLock<tokio::sync::broadcast::Sender<String>> = OnceLock::new();
/// ADR-107: background task — when the classifier reports `absent` and
/// CV stays low for `quiet_window_sec`, run a baseline capture in the
/// background. Cool-down `min_age_sec` between writes so we don't loop.
async fn auto_recalibrate_task(
state: SharedState,
enabled: bool,
quiet_window_sec: f64,
min_age_sec: f64,
capture_dur_sec: f64,
) {
if !enabled {
info!("Auto-recalibrate disabled (--auto-recalibrate 0)");
return;
}
info!("Auto-recalibrate enabled: trigger after {quiet_window_sec:.0}s of `absent`+low-CV, min {min_age_sec:.0}s between writes");
let mut quiet_since: Option<std::time::Instant> = None;
let mut tick = tokio::time::interval(std::time::Duration::from_secs(5));
loop {
tick.tick().await;
let (level, cv) = {
let s = state.read().await;
match &s.latest_update {
Some(u) => (u.classification.motion_level.clone(), u.classification.confidence),
None => continue,
}
};
let quiet = level == "absent" && cv < 0.08;
if !quiet { quiet_since = None; continue; }
let started = quiet_since.get_or_insert_with(std::time::Instant::now);
if started.elapsed().as_secs_f64() < quiet_window_sec { continue; }
// Cool-down vs last write
let age_sec = {
let t = baseline_last_written_init().lock().unwrap();
t.elapsed().map(|d| d.as_secs_f64()).unwrap_or(f64::INFINITY)
};
if age_sec < min_age_sec { continue; }
info!("auto-recalibrate: room quiet for {:.0}s, refreshing baseline...", started.elapsed().as_secs_f64());
{
let mut s = baseline_calib_status_init().lock().unwrap();
*s = "running (auto)".to_string();
}
let path = std::env::var("RUVIEW_BASELINE_FILE").unwrap_or_else(|_| "data/baseline.json".into());
match capture_baseline_to_disk(capture_dur_sec, 5.0, capture_dur_sec * 0.5, &path).await {
Ok(_) => {
info!("auto-recalibrate: saved new baseline to {path}");
let mut s = baseline_calib_status_init().lock().unwrap();
*s = "complete (auto)".to_string();
}
Err(e) => {
error!("auto-recalibrate: capture failed: {e}");
let mut s = baseline_calib_status_init().lock().unwrap();
*s = format!("error (auto): {e}");
}
}
quiet_since = None;
}
}
async fn udp_receiver_task(state: SharedState, udp_port: u16) {
let addr = format!("0.0.0.0:{udp_port}");
let socket = match UdpSocket::bind(&addr).await {
@ -5910,6 +6248,28 @@ async fn main() {
},
}));
// ADR-107: initialise the baseline broadcast bus — capture
// baseline reads from this. We forward every JSON message broadcast
// on the WS into the bus so the in-process capture stays decoupled
// from individual WS clients.
{
let (tx, _rx) = tokio::sync::broadcast::channel::<String>(256);
let _ = BASELINE_BUS.set(tx);
}
{
// Forwarder: subscribe to AppState.tx, push each message into
// BASELINE_BUS. Decouples baseline capture from the live WS
// clients (no client subscribing to the bus when no calibration
// is running).
let mut rx_from_state = state.read().await.tx.subscribe();
let bus_tx = BASELINE_BUS.get().unwrap().clone();
tokio::spawn(async move {
while let Ok(msg) = rx_from_state.recv().await {
let _ = bus_tx.send(msg);
}
});
}
// Start background tasks based on source
match source {
"esp32" => {
@ -5917,6 +6277,14 @@ async fn main() {
// ADR-106: drive CSI rate by pinging sensors back ourselves
// instead of relying on the operator's ad-hoc `ping -i 0.05 …`.
tokio::spawn(csi_keepalive_task(args.csi_keepalive_pps));
// ADR-107: auto-recalibrate baseline silently when room is quiet.
tokio::spawn(auto_recalibrate_task(
state.clone(),
args.auto_recalibrate_quiet_sec > 0.0,
args.auto_recalibrate_quiet_sec,
args.auto_recalibrate_min_age_sec,
90.0, // capture window
));
tokio::spawn(broadcast_tick_task(state.clone(), args.tick_ms));
}
"wifi" => {
@ -6002,6 +6370,9 @@ async fn main() {
.route("/api/v1/pose/current", get(pose_current))
.route("/api/v1/pose/stats", get(pose_stats))
.route("/api/v1/pose/zones/summary", get(pose_zones_summary))
// ADR-107: baseline calibration REST.
.route("/api/v1/baseline", get(baseline_get))
.route("/api/v1/baseline/calibrate", axum::routing::post(baseline_calibrate))
// Stream endpoints
.route("/api/v1/stream/status", get(stream_status))
.route("/api/v1/stream/pose", get(ws_pose_handler))