feat(sensing-server): wire ADR-099 introspection tap + /ws/introspection + /api/v1/introspection/snapshot

I3 (per ADR-099). Three changes in main.rs:

1) AppStateInner: + intro: IntrospectionState + intro_tx: broadcast::Sender<String>
   (256-slot ring, same shape as the existing tx).

2) ESP32 frame path: after the global frame_history push, before the
   per-node mutable borrow of s.node_states, compute the per-frame derived
   feature (mean amplitude across subcarriers), call s.intro.update(ts_ns,
   feature), and broadcast the snapshot JSON to s.intro_tx. Placement is
   deliberate — between the global state's mutable touch and the per-node
   &mut so borrow-checking stays linear; ns is borrowed *after* the tap
   completes its s.intro / s.intro_tx access.

3) Routes:
     ws_introspection_handler   → /ws/introspection
     api_introspection_snapshot → /api/v1/introspection/snapshot
   Same Axum + tokio::sync::broadcast pattern as ws_sensing_handler,
   subscribed against s.intro_tx. Wrapped by the bearer-auth middleware
   already on /api/v1/* — orchestrator probes and unauthenticated /ws/sensing
   reachers continue to land on the existing topic.

Verified:
  cargo build -p wifi-densepose-sensing-server --no-default-features ✓
  cargo test  -p wifi-densepose-sensing-server --no-default-features
    lib:           207 passed, 0 failed (199 pre-tap + 8 introspection)
    integration suites: 70, 8, 16, 18 passed, 0 failed
  cargo clippy: clean on the introspection surface (pre-existing warnings
                on -core / -ruvector / -signal unchanged).

Co-Authored-By: claude-flow <ruv@ruv.net>
This commit is contained in:
ruv 2026-05-13 23:00:31 -04:00
parent 94ef125240
commit 4a1f3a1e10
1 changed files with 91 additions and 0 deletions

View File

@ -553,6 +553,11 @@ struct AppStateInner {
/// Instant of the last ESP32 UDP frame received (for offline detection).
last_esp32_frame: Option<std::time::Instant>,
tx: broadcast::Sender<String>,
// ADR-099 D2/D3/D4: real-time CSI introspection tap. Per-frame state +
// a parallel broadcast topic (`/ws/introspection`) running alongside
// (not replacing) the window-aggregated `tx` / `/ws/sensing` pipeline.
intro: wifi_densepose_sensing_server::introspection::IntrospectionState,
intro_tx: broadcast::Sender<String>,
total_detections: u64,
start_time: std::time::Instant,
/// Vital sign detector (processes CSI frames to estimate HR/RR).
@ -2027,6 +2032,59 @@ async fn handle_ws_client(mut socket: WebSocket, state: SharedState) {
info!("WebSocket client disconnected (sensing)");
}
// ── ADR-099: real-time CSI introspection — WS topic + REST snapshot ──────────
//
// Parallel to the window-aggregated `/ws/sensing` topic. Subscribers see a
// fresh `IntrospectionSnapshot` JSON frame on every accepted CSI frame
// (regime / Lyapunov exponent / top-k DTW similarity), no window-close delay.
async fn ws_introspection_handler(
ws: WebSocketUpgrade,
State(state): State<SharedState>,
) -> impl IntoResponse {
ws.on_upgrade(|socket| handle_ws_introspection_client(socket, state))
}
async fn handle_ws_introspection_client(mut socket: WebSocket, state: SharedState) {
let mut rx = {
let s = state.read().await;
s.intro_tx.subscribe()
};
info!("WebSocket client connected (introspection)");
loop {
tokio::select! {
msg = rx.recv() => {
match msg {
Ok(json) => {
if socket.send(Message::Text(json.into())).await.is_err() {
break;
}
}
Err(_) => break,
}
}
msg = socket.recv() => {
match msg {
Some(Ok(Message::Close(_))) | None => break,
_ => {} // ignore client messages
}
}
}
}
info!("WebSocket client disconnected (introspection)");
}
/// `GET /api/v1/introspection/snapshot` — one-shot poll for the latest
/// per-frame snapshot (regime, Lyapunov, top-k similarity). Mirrors the shape
/// of `/api/v1/sensing/latest` for the dashboard one-shot path.
async fn api_introspection_snapshot(State(state): State<SharedState>) -> impl IntoResponse {
let s = state.read().await;
Json(s.intro.snapshot().clone())
}
// ── Pose WebSocket handler (sends pose_data messages for Live Demo) ──────────
async fn ws_pose_handler(
@ -3871,6 +3929,30 @@ async fn udp_receiver_task(state: SharedState, udp_port: u16) {
s.frame_history.pop_front();
}
// ── ADR-099: real-time introspection tap ────────────────
// Per-frame update of the attractor / DTW pipeline running
// parallel to the window-aggregated event path. Placed
// BEFORE the per-node `&mut` borrow of `s.node_states` so
// `s.intro` / `s.intro_tx` stay reachable. Never window-
// blocked; `/ws/introspection` sees a fresh snapshot on
// every accepted frame.
{
let intro_feature = if frame.amplitudes.is_empty() {
0.0
} else {
frame.amplitudes.iter().copied().sum::<f64>()
/ frame.amplitudes.len() as f64
};
let intro_ts_ns = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_nanos() as u64)
.unwrap_or(0);
let _ = s.intro.update(intro_ts_ns, intro_feature);
if let Ok(intro_json) = serde_json::to_string(s.intro.snapshot()) {
let _ = s.intro_tx.send(intro_json);
}
}
// ── Per-node processing (issue #249) ──────────────────
// Process entirely within per-node state so different
// ESP32 nodes never mix their smoothing/vitals buffers.
@ -4767,6 +4849,10 @@ async fn main() {
info!("Discovered {} model files, {} recording files", initial_models.len(), initial_recordings.len());
let (tx, _) = broadcast::channel::<String>(256);
// ADR-099: parallel broadcast for the per-frame introspection snapshot stream
// consumed by `/ws/introspection`. Same ring size as `tx` (256) — slow
// clients drop oldest, identical backpressure shape.
let (intro_tx, _) = broadcast::channel::<String>(256);
let state: SharedState = Arc::new(RwLock::new(AppStateInner {
latest_update: None,
rssi_history: VecDeque::new(),
@ -4775,6 +4861,8 @@ async fn main() {
source: source.into(),
last_esp32_frame: None,
tx,
intro: wifi_densepose_sensing_server::introspection::IntrospectionState::new(),
intro_tx,
total_detections: 0,
start_time: std::time::Instant::now(),
vital_detector: VitalSignDetector::new(vital_sample_rate),
@ -4936,6 +5024,9 @@ async fn main() {
.route("/api/v1/stream/pose", get(ws_pose_handler))
// Sensing WebSocket on the HTTP port so the UI can reach it without a second port
.route("/ws/sensing", get(ws_sensing_handler))
// ADR-099: real-time introspection — per-frame attractor + DTW snapshot.
.route("/ws/introspection", get(ws_introspection_handler))
.route("/api/v1/introspection/snapshot", get(api_introspection_snapshot))
// Model management endpoints (UI compatibility)
.route("/api/v1/models", get(list_models))
.route("/api/v1/models/active", get(get_active_model))