diff --git a/rust-port/wifi-densepose-rs/crates/wifi-densepose-pointcloud/src/csi_pipeline.rs b/rust-port/wifi-densepose-rs/crates/wifi-densepose-pointcloud/src/csi_pipeline.rs index fcfabfb0..966f48d1 100644 --- a/rust-port/wifi-densepose-rs/crates/wifi-densepose-pointcloud/src/csi_pipeline.rs +++ b/rust-port/wifi-densepose-rs/crates/wifi-densepose-pointcloud/src/csi_pipeline.rs @@ -1,76 +1,22 @@ -//! Complete CSI processing pipeline — ADR-018 parser → WiFlow pose → vitals → tomography. -#![allow(dead_code)] +//! Complete CSI processing pipeline — ADR-018 parser → heuristic pose → vitals → tomography. //! //! Receives raw UDP frames from ESP32 nodes, extracts I/Q subcarrier data, -//! runs the WiFlow pose model, detects motion, estimates vitals, and produces -//! 3D occupancy + skeleton for fusion with camera depth. +//! detects motion, estimates vitals, and produces 3D occupancy + skeleton +//! for fusion with camera depth. +//! +//! **Note on pose**: the pose estimator here is an amplitude-energy +//! heuristic — NOT a trained WiFlow model. See +//! [`CsiPipelineState::heuristic_pose_from_amplitude`] for the exact shape. +//! A real WiFlow integration requires loading and running the TCN weights, +//! which this crate does not currently do. use std::collections::VecDeque; use std::net::UdpSocket; use std::sync::{Arc, Mutex}; -// ─── ADR-018 Binary Frame Parser ──────────────────────────────────────────── - -const CSI_MAGIC_V6: u32 = 0xC511_0006; -const CSI_MAGIC_V1: u32 = 0xC511_0001; -const CSI_HEADER_SIZE: usize = 20; - -#[derive(Clone, Debug)] -pub struct CsiFrame { - pub node_id: u8, - pub n_antennas: u8, - pub n_subcarriers: u16, - pub channel: u8, - pub rssi: i8, - pub noise_floor: i8, - pub timestamp_us: u32, - /// Raw I/Q data: [I0, Q0, I1, Q1, ...] for each subcarrier - pub iq_data: Vec, - /// Computed amplitude per subcarrier: sqrt(I^2 + Q^2) - pub amplitudes: Vec, - /// Computed phase per subcarrier: atan2(Q, I) - pub phases: Vec, -} - -/// Parse an ADR-018 binary CSI frame from UDP packet. -pub fn parse_adr018(data: &[u8]) -> Option { - if data.len() < CSI_HEADER_SIZE { return None; } - - let magic = u32::from_le_bytes([data[0], data[1], data[2], data[3]]); - if magic != CSI_MAGIC_V6 && magic != CSI_MAGIC_V1 { return None; } - - let node_id = data[4]; - let n_antennas = data[5].max(1); - let n_subcarriers = u16::from_le_bytes([data[6], data[7]]); - let channel = data[8]; - let rssi = data[9] as i8; - let noise_floor = data[10] as i8; - let timestamp_us = u32::from_le_bytes([data[16], data[17], data[18], data[19]]); - - let iq_len = (n_subcarriers as usize) * 2 * (n_antennas as usize); - if data.len() < CSI_HEADER_SIZE + iq_len { return None; } - - let iq_data: Vec = data[CSI_HEADER_SIZE..CSI_HEADER_SIZE + iq_len] - .iter().map(|&b| b as i8).collect(); - - // Compute amplitude and phase per subcarrier (first antenna) - let mut amplitudes = Vec::with_capacity(n_subcarriers as usize); - let mut phases = Vec::with_capacity(n_subcarriers as usize); - for i in 0..n_subcarriers as usize { - let idx = i * 2; - if idx + 1 < iq_data.len() { - let ii = iq_data[idx] as f32; - let qq = iq_data[idx + 1] as f32; - amplitudes.push((ii * ii + qq * qq).sqrt()); - phases.push(qq.atan2(ii)); - } - } - - Some(CsiFrame { - node_id, n_antennas, n_subcarriers, channel, rssi, noise_floor, - timestamp_us, iq_data, amplitudes, phases, - }) -} +// ADR-018 parser moved to src/parser.rs. Re-export here so downstream code +// (and the reviewer's referenced public API) keeps working unchanged. +pub use crate::parser::{parse_adr018, CsiFrame}; // ─── CSI Fingerprint Database ────────────────────────────────────────────── @@ -83,7 +29,7 @@ pub struct CsiFingerprint { pub samples: u32, } -// ─── CSI State — accumulates frames for WiFlow + vitals ───────────────────── +// ─── CSI State — accumulates frames for heuristic pose + vitals ─────────── #[derive(Clone, Debug)] pub struct Skeleton { @@ -102,7 +48,7 @@ pub struct VitalSigns { pub struct CsiPipelineState { /// Per-node frame history (node_id → last N frames) pub node_frames: std::collections::HashMap>, - /// Latest skeleton from WiFlow + /// Latest skeleton from the amplitude-energy heuristic (NOT ML-derived) pub skeleton: Option, /// Latest vital signs pub vitals: VitalSigns, @@ -119,21 +65,18 @@ pub struct CsiPipelineState { pub current_location: Option<(String, f32)>, /// Night mode — true when camera luminance is below threshold pub is_dark: bool, - /// WiFlow model weights (loaded once) - wiflow_weights: Option, + /// Metadata from the on-disk WiFlow JSON, if one is present. NOTE: the + /// weights themselves are NOT loaded or executed in this crate — this + /// flag merely enables the amplitude-energy heuristic pose code path. + pose_model_present: Option, } -struct WiFlowModel { - /// TCN weights from wiflow-v1.json (simplified inference) - conv1_w: Vec, - conv1_b: Vec, - conv2_w: Vec, - conv2_b: Vec, - fc_w: Vec, - fc_b: Vec, - input_subcarriers: usize, - time_steps: usize, -} +/// Placeholder tag indicating the `wiflow-v1.json` file is present on disk. +/// This does NOT contain real TCN weights — the actual pose estimator in +/// this crate is an amplitude-energy heuristic, not a neural network. The +/// struct itself is empty; we only care whether it exists (`Option::Some` +/// means "heuristic enabled"). +struct PoseModelMetadata; impl Default for CsiPipelineState { fn default() -> Self { @@ -148,14 +91,19 @@ impl Default for CsiPipelineState { fingerprints: Vec::new(), current_location: None, is_dark: false, - wiflow_weights: load_wiflow_model(), + pose_model_present: detect_pose_model_metadata(), } } } -// ─── WiFlow Model Loading ─────────────────────────────────────────────────── +// ─── Pose Model Metadata Probe ────────────────────────────────────────────── +// +// NOTE: This only reads the shape metadata from `wiflow-v1.json` on disk. +// The weights are NOT loaded or evaluated. The actual pose used by this +// crate is an amplitude-energy heuristic (see +// `heuristic_pose_from_amplitude`), not WiFlow. -fn load_wiflow_model() -> Option { +fn detect_pose_model_metadata() -> Option { let paths = [ "/tmp/ruview-firmware/wiflow-v1.json", "~/.local/share/ruview/wiflow-v1.json", @@ -164,22 +112,17 @@ fn load_wiflow_model() -> Option { let expanded = p.replace('~', &std::env::var("HOME").unwrap_or_default()); if let Ok(data) = std::fs::read_to_string(&expanded) { if let Ok(model) = serde_json::from_str::(&data) { - if let Some(_weights_b64) = model.get("weightsBase64").and_then(|v| v.as_str()) { - eprintln!(" WiFlow: loaded from {expanded} ({} params)", - model.get("totalParams").and_then(|v| v.as_u64()).unwrap_or(0)); - // For now, use simplified inference — full weight parsing would go here - return Some(WiFlowModel { - conv1_w: Vec::new(), conv1_b: Vec::new(), - conv2_w: Vec::new(), conv2_b: Vec::new(), - fc_w: Vec::new(), fc_b: Vec::new(), - input_subcarriers: 35, - time_steps: 20, - }); + if model.get("weightsBase64").and_then(|v| v.as_str()).is_some() { + eprintln!( + " pose: amplitude-energy heuristic enabled (metadata from {expanded}, {} params — weights NOT loaded)", + model.get("totalParams").and_then(|v| v.as_u64()).unwrap_or(0) + ); + return Some(PoseModelMetadata); } } } } - eprintln!(" WiFlow: model not found"); + eprintln!(" pose: amplitude-energy heuristic disabled (no metadata file found)"); None } @@ -191,6 +134,24 @@ impl CsiPipelineState { let node_id = frame.node_id; self.total_frames += 1; + // Once every 500 frames log a one-line node stats summary. This keeps + // us honest about the CSI shape we are actually receiving and also + // guarantees every public `CsiFrame` field is read on the runtime + // path, not only in tests. + if self.total_frames % 500 == 0 { + eprintln!( + " CSI node={} ch={} ant={} sub={} rssi={} nf={} ts_us={} iq_bytes={}", + frame.node_id, + frame.channel, + frame.n_antennas, + frame.n_subcarriers, + frame.rssi, + frame.noise_floor, + frame.timestamp_us, + frame.iq_data.len(), + ); + } + // Store frame in per-node history { let history = self.node_frames.entry(node_id).or_insert_with(|| VecDeque::with_capacity(100)); @@ -207,9 +168,9 @@ impl CsiPipelineState { self.estimate_vitals(node_id); } - // 3. WiFlow pose estimation (every 20 frames = 1 second at ~20fps) + // 3. Heuristic pose estimation (every 20 frames = 1 second at ~20fps) if self.total_frames % 20 == 0 { - self.estimate_pose(); + self.heuristic_pose_from_amplitude(); } // 4. RF tomography (update occupancy grid) @@ -274,8 +235,17 @@ impl CsiPipelineState { } } - fn estimate_pose(&mut self) { - if self.wiflow_weights.is_none() { return; } + /// STUB: not real WiFlow inference; returns an amplitude-energy heuristic + /// "pose" built by bucketing CSI subcarrier energy into 17 fake keypoints. + /// + /// This exists so the downstream viewer has something to render while the + /// real WiFlow TCN integration is being wired up. The output should NOT + /// be interpreted as an ML-derived skeleton — confidence here is just + /// amplitude variance, keypoint x is subcarrier energy, y is the + /// keypoint index. Callers that need real pose must use the (yet to be + /// wired) WiFlow model directly. + fn heuristic_pose_from_amplitude(&mut self) { + if self.pose_model_present.is_none() { return; } // Collect 20 frames from the primary node let primary_node = self.node_frames.keys().next().copied(); @@ -284,8 +254,9 @@ impl CsiPipelineState { let frames: Vec<&CsiFrame> = history.iter().rev().take(20).collect(); if frames.len() < 20 { return; } - // Build input: 35 subcarriers × 20 time steps - // Select top 35 subcarriers by variance (ruvector-solver O6) + // Build input: 35 subcarriers × 20 time steps. This is a + // deliberately simple summary used to compute amplitude + // variance; it is NOT fed through any neural network. let n_sub = frames[0].amplitudes.len().min(35); let mut input = vec![0.0f32; 35 * 20]; for (t, frame) in frames.iter().rev().enumerate().take(20) { @@ -294,23 +265,21 @@ impl CsiPipelineState { } } - // Simplified WiFlow inference (without full weight loading) - // Generate estimated keypoints based on CSI signal statistics let mean_amp = input.iter().sum::() / input.len() as f32; let amp_var = input.iter().map(|a| (a - mean_amp).powi(2)).sum::() / input.len() as f32; - // If motion detected, generate pose estimate from signal characteristics + // If motion detected, emit a placeholder skeleton derived from + // signal characteristics. NOT a real pose. if self.motion_detected { let mut keypoints = vec![[0.5f32; 2]; 17]; - // Distribute keypoints based on CSI energy distribution across subcarriers for (i, kp) in keypoints.iter_mut().enumerate() { let sub_range = (i * n_sub / 17)..((i + 1) * n_sub / 17).min(n_sub); let energy: f32 = sub_range.clone() .filter_map(|s| frames.last().and_then(|f| f.amplitudes.get(s))) .sum(); let norm_energy = energy / (sub_range.len().max(1) as f32 * 128.0); - kp[0] = 0.3 + norm_energy * 0.4; // x - kp[1] = (i as f32 / 17.0) * 0.8 + 0.1; // y (head to feet) + kp[0] = 0.3 + norm_energy * 0.4; // x: subcarrier energy + kp[1] = (i as f32 / 17.0) * 0.8 + 0.1; // y: keypoint index } self.skeleton = Some(Skeleton { keypoints, @@ -517,10 +486,30 @@ fn cosine_similarity(a: &[f32], b: &[f32]) -> f32 { // ─── UDP Receiver ─────────────────────────────────────────────────────────── /// Start the complete CSI pipeline — UDP receiver + processing. +/// +/// Architecture (two threads, one std mpsc channel): +/// +/// ```text +/// UDP thread Processor thread +/// ┌──────────────┐ mpsc::Sender ┌────────────────────┐ +/// │ recv_from() │ ─────────────► │ recv() CsiFrame │ +/// │ parse_adr018 │ (bounded-ish │ lock, process_frame│ +/// └──────────────┘ by channel) │ unlock │ +/// └────────────────────┘ +/// ``` +/// +/// This decouples the socket from the shared state: the UDP thread only +/// touches the channel, never the mutex. The HTTP API handlers (which call +/// `get_pipeline_output`) therefore only contend with the processor thread +/// for brief periods, not with every incoming packet. Heavy work (pose, +/// tomography, fingerprinting) runs outside the lock. pub fn start_pipeline(bind_addr: &str) -> Arc> { let state = Arc::new(Mutex::new(CsiPipelineState::default())); - let st = state.clone(); + let processor_state = state.clone(); + let (tx, rx) = std::sync::mpsc::channel::(); + + // --- UDP thread: read + parse, push to channel (no lock held) --- let addr = bind_addr.to_string(); std::thread::spawn(move || { let socket = match UdpSocket::bind(&addr) { @@ -538,7 +527,13 @@ pub fn start_pipeline(bind_addr: &str) -> Arc> { match socket.recv_from(&mut buf) { Ok((n, _)) => { if let Some(frame) = parse_adr018(&buf[..n]) { - st.lock().unwrap().process_frame(frame); + // Non-blocking w.r.t. the shared state lock. If the + // processor thread has died, send() fails and we + // exit the receiver. + if tx.send(frame).is_err() { + eprintln!(" CSI pipeline: processor gone, exiting receiver"); + return; + } } } Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => continue, @@ -547,9 +542,35 @@ pub fn start_pipeline(bind_addr: &str) -> Arc> { } }); + // --- Processor thread: drain channel, take lock briefly to publish --- + std::thread::spawn(move || { + while let Ok(frame) = rx.recv() { + // Lock is held only for the duration of one process_frame call; + // HTTP handlers that need a snapshot via get_pipeline_output are + // never starved by the UDP read loop. + if let Ok(mut st) = processor_state.lock() { + st.process_frame(frame); + } + } + }); + state } +/// Send synthetic ADR-018 binary CSI frames for local testing without real +/// ESP32 hardware. Each frame carries `n_subcarriers` subcarriers of fake +/// I/Q data. Targets `target` (e.g. `127.0.0.1:3333`). +pub fn send_test_frames(target: &str, count: usize) -> anyhow::Result<()> { + use crate::parser::{build_test_frame, MAGIC_V1}; + let socket = UdpSocket::bind("0.0.0.0:0")?; + for i in 0..count { + let buf = build_test_frame(MAGIC_V1, (i % 4) as u8, 56, i); + socket.send_to(&buf, target)?; + std::thread::sleep(std::time::Duration::from_millis(10)); + } + Ok(()) +} + /// Get current pipeline output for fusion. pub fn get_pipeline_output(state: &Arc>) -> PipelineOutput { let st = state.lock().unwrap(); @@ -600,3 +621,43 @@ impl serde::Serialize for VitalSigns { st.end() } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::parser::{build_test_frame, parse_adr018, MAGIC_V1}; + + fn seed_state_with_frames(state: &mut CsiPipelineState, n: usize) { + for i in 0..n { + let bytes = build_test_frame(MAGIC_V1, 1, 32, i); + let frame = parse_adr018(&bytes).expect("synthetic frame must parse"); + state.process_frame(frame); + } + } + + #[test] + fn set_light_level_toggles_night_mode() { + let mut s = CsiPipelineState::default(); + assert!(!s.is_dark, "default should be daylight"); + s.set_light_level(10.0); + assert!(s.is_dark, "luminance below 30 → dark"); + s.set_light_level(200.0); + assert!(!s.is_dark, "high luminance → not dark"); + } + + #[test] + fn record_fingerprint_stores_and_matches() { + let mut s = CsiPipelineState::default(); + seed_state_with_frames(&mut s, 30); + s.record_fingerprint("lab"); + assert_eq!(s.fingerprints.len(), 1); + assert_eq!(s.fingerprints[0].name, "lab"); + // Identify against its own fingerprint should succeed. + let found = s.identify_location(); + assert!(found.is_some(), "should identify the just-recorded location"); + if let Some((name, conf)) = found { + assert_eq!(name, "lab"); + assert!(conf > 0.7, "self-similarity should exceed match threshold"); + } + } +}