diff --git a/rust-port/wifi-densepose-rs/crates/wifi-densepose-sensing-server/src/main.rs b/rust-port/wifi-densepose-rs/crates/wifi-densepose-sensing-server/src/main.rs index afd553c8..7bbaa427 100644 --- a/rust-port/wifi-densepose-rs/crates/wifi-densepose-sensing-server/src/main.rs +++ b/rust-port/wifi-densepose-rs/crates/wifi-densepose-sensing-server/src/main.rs @@ -299,6 +299,8 @@ struct NodeState { latest_vitals: VitalSigns, last_frame_time: Option, edge_vitals: Option, + /// Latest extracted features for cross-node fusion. + latest_features: Option, // ── RuVector Phase 2: Temporal smoothing & coherence gating ── /// Previous frame's smoothed keypoint positions for EMA temporal smoothing. prev_keypoints: Option>, @@ -344,6 +346,7 @@ impl NodeState { latest_vitals: VitalSigns::default(), last_frame_time: None, edge_vitals: None, + latest_features: None, prev_keypoints: None, motion_energy_history: VecDeque::with_capacity(COHERENCE_WINDOW), coherence_score: 1.0, // assume stable initially @@ -1988,6 +1991,61 @@ async fn latest(State(state): State) -> Json { /// with a stride-swing pattern applied to arms and legs. // ── Multi-person estimation (issue #97) ────────────────────────────────────── +/// Fuse features across all active nodes for higher SNR. +/// +/// When multiple ESP32 nodes observe the same room, their CSI features +/// can be combined: +/// - Variance: use max (most sensitive node dominates) +/// - Motion/breathing/spectral power: weighted average by RSSI (closer node = higher weight) +/// - Dominant frequency: weighted average +/// - Change points: keep current node's value (not meaningful to average) +/// - Mean RSSI: use max (best signal) +fn fuse_multi_node_features( + current_features: &FeatureInfo, + node_states: &HashMap, +) -> FeatureInfo { + let now = std::time::Instant::now(); + let active: Vec<(&FeatureInfo, f64)> = node_states.values() + .filter(|ns| ns.last_frame_time.map_or(false, |t| now.duration_since(t).as_secs() < 10)) + .filter_map(|ns| { + let feat = ns.latest_features.as_ref()?; + let rssi = ns.rssi_history.back().copied().unwrap_or(-80.0); + Some((feat, rssi)) + }) + .collect(); + + if active.len() <= 1 { + return current_features.clone(); + } + + // RSSI-based weights: higher RSSI = closer to person = more weight. + // Map RSSI relative to best node into [0.1, 1.0]. + let max_rssi = active.iter().map(|(_, r)| *r).fold(f64::NEG_INFINITY, f64::max); + let weights: Vec = active.iter() + .map(|(_, r)| (1.0 + (r - max_rssi + 20.0) / 20.0).clamp(0.1, 1.0)) + .collect(); + let w_sum: f64 = weights.iter().sum::().max(1e-9); + + FeatureInfo { + // Weighted average variance (not max — max inflates person score + // and causes count flips between 1↔2 persons). + variance: active.iter().zip(&weights) + .map(|((f, _), w)| f.variance * w).sum::() / w_sum, + // Weighted average for motion/breathing/spectral + motion_band_power: active.iter().zip(&weights) + .map(|((f, _), w)| f.motion_band_power * w).sum::() / w_sum, + breathing_band_power: active.iter().zip(&weights) + .map(|((f, _), w)| f.breathing_band_power * w).sum::() / w_sum, + spectral_power: active.iter().zip(&weights) + .map(|((f, _), w)| f.spectral_power * w).sum::() / w_sum, + dominant_freq_hz: active.iter().zip(&weights) + .map(|((f, _), w)| f.dominant_freq_hz * w).sum::() / w_sum, + change_points: current_features.change_points, // keep current node's value + // Best RSSI across nodes + mean_rssi: active.iter().map(|(f, _)| f.mean_rssi).fold(f64::NEG_INFINITY, f64::max), + } +} + /// Estimate person count from CSI features using a weighted composite heuristic. /// /// Single ESP32 link limitations: variance-based detection can reliably detect @@ -3248,13 +3306,31 @@ async fn udp_receiver_task(state: SharedState, udp_port: u16) { change_points: 0, spectral_power: vitals.motion_energy as f64, }; - let classification = ClassificationInfo { + + // Store latest features on node for cross-node fusion. + s.node_states.get_mut(&node_id) + .map(|ns| ns.latest_features = Some(features.clone())); + + // Cross-node fusion: combine features from all active nodes. + let fused_features = fuse_multi_node_features(&features, &s.node_states); + + let mut classification = ClassificationInfo { motion_level: motion_level.to_string(), presence: vitals.presence, confidence: vitals.presence_score as f64, }; + + // Boost classification confidence with multi-node coverage. + let n_active = s.node_states.values() + .filter(|ns| ns.last_frame_time.map_or(false, |t| now.duration_since(t).as_secs() < 10)) + .count(); + if n_active > 1 { + classification.confidence = (classification.confidence + * (1.0 + 0.15 * (n_active as f64 - 1.0))).clamp(0.0, 1.0); + } + let signal_field = generate_signal_field( - vitals.rssi as f64, motion_score, vitals.breathing_rate_bpm / 60.0, + fused_features.mean_rssi, motion_score, vitals.breathing_rate_bpm / 60.0, (vitals.presence_score as f64).min(1.0), &[], ); @@ -3264,7 +3340,7 @@ async fn udp_receiver_task(state: SharedState, udp_port: u16) { source: "esp32".to_string(), tick, nodes: active_nodes, - features: features.clone(), + features: fused_features.clone(), classification, signal_field, vital_signs: Some(VitalSigns { @@ -3398,7 +3474,9 @@ async fn udp_receiver_task(state: SharedState, udp_port: u16) { ns.latest_vitals = vitals.clone(); let raw_score = compute_person_score(&features); - ns.smoothed_person_score = ns.smoothed_person_score * 0.90 + raw_score * 0.10; + // Slower EMA (0.05) for person score to prevent count flips + // from frame-to-frame variance oscillation in fused features. + ns.smoothed_person_score = ns.smoothed_person_score * 0.95 + raw_score * 0.05; if classification.presence { let count = score_to_person_count(ns.smoothed_person_score, ns.prev_person_count); ns.prev_person_count = count; @@ -3406,6 +3484,9 @@ async fn udp_receiver_task(state: SharedState, udp_port: u16) { ns.prev_person_count = 0; } + // Store latest features on node for cross-node fusion. + ns.latest_features = Some(features.clone()); + // Done with per-node mutable borrow; now read aggregated // state from all nodes (the borrow of `ns` ends here). // (We re-borrow node_states immutably via `s` below.) @@ -3416,6 +3497,9 @@ async fn udp_receiver_task(state: SharedState, udp_port: u16) { } s.latest_vitals = vitals.clone(); + // Cross-node fusion: combine features from all active nodes. + let fused_features = fuse_multi_node_features(&features, &s.node_states); + s.tick += 1; let tick = s.tick; @@ -3433,6 +3517,15 @@ async fn udp_receiver_task(state: SharedState, udp_port: u16) { .max() .unwrap_or(0); + // Boost classification confidence with multi-node coverage. + let n_active = s.node_states.values() + .filter(|ns| ns.last_frame_time.map_or(false, |t| now.duration_since(t).as_secs() < 10)) + .count(); + if n_active > 1 { + classification.confidence = (classification.confidence + * (1.0 + 0.15 * (n_active as f64 - 1.0))).clamp(0.0, 1.0); + } + // Build nodes array with all active nodes. let active_nodes: Vec = s.node_states.iter() .filter(|(_, n)| n.last_frame_time.map_or(false, |t| now.duration_since(t).as_secs() < 10)) @@ -3453,11 +3546,11 @@ async fn udp_receiver_task(state: SharedState, udp_port: u16) { source: "esp32".to_string(), tick, nodes: active_nodes, - features: features.clone(), + features: fused_features.clone(), classification, signal_field: generate_signal_field( - features.mean_rssi, motion_score, breathing_rate_hz, - features.variance.min(1.0), &sub_variances, + fused_features.mean_rssi, motion_score, breathing_rate_hz, + fused_features.variance.min(1.0), &sub_variances, ), vital_signs: Some(vitals), enhanced_motion: None,