From 4a1f3a1e1090523305dc23e31f8c223c9dbda7e8 Mon Sep 17 00:00:00 2001 From: ruv Date: Wed, 13 May 2026 23:00:31 -0400 Subject: [PATCH] feat(sensing-server): wire ADR-099 introspection tap + /ws/introspection + /api/v1/introspection/snapshot MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit I3 (per ADR-099). Three changes in main.rs: 1) AppStateInner: + intro: IntrospectionState + intro_tx: broadcast::Sender (256-slot ring, same shape as the existing tx). 2) ESP32 frame path: after the global frame_history push, before the per-node mutable borrow of s.node_states, compute the per-frame derived feature (mean amplitude across subcarriers), call s.intro.update(ts_ns, feature), and broadcast the snapshot JSON to s.intro_tx. Placement is deliberate — between the global state's mutable touch and the per-node &mut so borrow-checking stays linear; ns is borrowed *after* the tap completes its s.intro / s.intro_tx access. 3) Routes: ws_introspection_handler → /ws/introspection api_introspection_snapshot → /api/v1/introspection/snapshot Same Axum + tokio::sync::broadcast pattern as ws_sensing_handler, subscribed against s.intro_tx. Wrapped by the bearer-auth middleware already on /api/v1/* — orchestrator probes and unauthenticated /ws/sensing reachers continue to land on the existing topic. Verified: cargo build -p wifi-densepose-sensing-server --no-default-features ✓ cargo test -p wifi-densepose-sensing-server --no-default-features lib: 207 passed, 0 failed (199 pre-tap + 8 introspection) integration suites: 70, 8, 16, 18 passed, 0 failed cargo clippy: clean on the introspection surface (pre-existing warnings on -core / -ruvector / -signal unchanged). Co-Authored-By: claude-flow --- .../wifi-densepose-sensing-server/src/main.rs | 91 +++++++++++++++++++ 1 file changed, 91 insertions(+) diff --git a/v2/crates/wifi-densepose-sensing-server/src/main.rs b/v2/crates/wifi-densepose-sensing-server/src/main.rs index 5887a752..6180bab7 100644 --- a/v2/crates/wifi-densepose-sensing-server/src/main.rs +++ b/v2/crates/wifi-densepose-sensing-server/src/main.rs @@ -553,6 +553,11 @@ struct AppStateInner { /// Instant of the last ESP32 UDP frame received (for offline detection). last_esp32_frame: Option, tx: broadcast::Sender, + // ADR-099 D2/D3/D4: real-time CSI introspection tap. Per-frame state + + // a parallel broadcast topic (`/ws/introspection`) running alongside + // (not replacing) the window-aggregated `tx` / `/ws/sensing` pipeline. + intro: wifi_densepose_sensing_server::introspection::IntrospectionState, + intro_tx: broadcast::Sender, total_detections: u64, start_time: std::time::Instant, /// Vital sign detector (processes CSI frames to estimate HR/RR). @@ -2027,6 +2032,59 @@ async fn handle_ws_client(mut socket: WebSocket, state: SharedState) { info!("WebSocket client disconnected (sensing)"); } +// ── ADR-099: real-time CSI introspection — WS topic + REST snapshot ────────── +// +// Parallel to the window-aggregated `/ws/sensing` topic. Subscribers see a +// fresh `IntrospectionSnapshot` JSON frame on every accepted CSI frame +// (regime / Lyapunov exponent / top-k DTW similarity), no window-close delay. + +async fn ws_introspection_handler( + ws: WebSocketUpgrade, + State(state): State, +) -> impl IntoResponse { + ws.on_upgrade(|socket| handle_ws_introspection_client(socket, state)) +} + +async fn handle_ws_introspection_client(mut socket: WebSocket, state: SharedState) { + let mut rx = { + let s = state.read().await; + s.intro_tx.subscribe() + }; + + info!("WebSocket client connected (introspection)"); + + loop { + tokio::select! { + msg = rx.recv() => { + match msg { + Ok(json) => { + if socket.send(Message::Text(json.into())).await.is_err() { + break; + } + } + Err(_) => break, + } + } + msg = socket.recv() => { + match msg { + Some(Ok(Message::Close(_))) | None => break, + _ => {} // ignore client messages + } + } + } + } + + info!("WebSocket client disconnected (introspection)"); +} + +/// `GET /api/v1/introspection/snapshot` — one-shot poll for the latest +/// per-frame snapshot (regime, Lyapunov, top-k similarity). Mirrors the shape +/// of `/api/v1/sensing/latest` for the dashboard one-shot path. +async fn api_introspection_snapshot(State(state): State) -> impl IntoResponse { + let s = state.read().await; + Json(s.intro.snapshot().clone()) +} + // ── Pose WebSocket handler (sends pose_data messages for Live Demo) ────────── async fn ws_pose_handler( @@ -3871,6 +3929,30 @@ async fn udp_receiver_task(state: SharedState, udp_port: u16) { s.frame_history.pop_front(); } + // ── ADR-099: real-time introspection tap ──────────────── + // Per-frame update of the attractor / DTW pipeline running + // parallel to the window-aggregated event path. Placed + // BEFORE the per-node `&mut` borrow of `s.node_states` so + // `s.intro` / `s.intro_tx` stay reachable. Never window- + // blocked; `/ws/introspection` sees a fresh snapshot on + // every accepted frame. + { + let intro_feature = if frame.amplitudes.is_empty() { + 0.0 + } else { + frame.amplitudes.iter().copied().sum::() + / frame.amplitudes.len() as f64 + }; + let intro_ts_ns = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .map(|d| d.as_nanos() as u64) + .unwrap_or(0); + let _ = s.intro.update(intro_ts_ns, intro_feature); + if let Ok(intro_json) = serde_json::to_string(s.intro.snapshot()) { + let _ = s.intro_tx.send(intro_json); + } + } + // ── Per-node processing (issue #249) ────────────────── // Process entirely within per-node state so different // ESP32 nodes never mix their smoothing/vitals buffers. @@ -4767,6 +4849,10 @@ async fn main() { info!("Discovered {} model files, {} recording files", initial_models.len(), initial_recordings.len()); let (tx, _) = broadcast::channel::(256); + // ADR-099: parallel broadcast for the per-frame introspection snapshot stream + // consumed by `/ws/introspection`. Same ring size as `tx` (256) — slow + // clients drop oldest, identical backpressure shape. + let (intro_tx, _) = broadcast::channel::(256); let state: SharedState = Arc::new(RwLock::new(AppStateInner { latest_update: None, rssi_history: VecDeque::new(), @@ -4775,6 +4861,8 @@ async fn main() { source: source.into(), last_esp32_frame: None, tx, + intro: wifi_densepose_sensing_server::introspection::IntrospectionState::new(), + intro_tx, total_detections: 0, start_time: std::time::Instant::now(), vital_detector: VitalSignDetector::new(vital_sample_rate), @@ -4936,6 +5024,9 @@ async fn main() { .route("/api/v1/stream/pose", get(ws_pose_handler)) // Sensing WebSocket on the HTTP port so the UI can reach it without a second port .route("/ws/sensing", get(ws_sensing_handler)) + // ADR-099: real-time introspection — per-frame attractor + DTW snapshot. + .route("/ws/introspection", get(ws_introspection_handler)) + .route("/api/v1/introspection/snapshot", get(api_introspection_snapshot)) // Model management endpoints (UI compatibility) .route("/api/v1/models", get(list_models)) .route("/api/v1/models/active", get(get_active_model))