diff --git a/v2/crates/wifi-densepose-sensing-server/src/main.rs b/v2/crates/wifi-densepose-sensing-server/src/main.rs index b4af1348..26f05ffe 100644 --- a/v2/crates/wifi-densepose-sensing-server/src/main.rs +++ b/v2/crates/wifi-densepose-sensing-server/src/main.rs @@ -553,6 +553,21 @@ impl NodeState { /// `udp_receiver_task`. Uses `last_frame_time` as the previous-frame /// anchor; the first frame after init seeds the timer without producing /// a sample (no prior dt to measure). + /// ADR-110 iter 32 — apply a freshly-decoded sync packet to this node. + /// Overwrites `latest_sync` with the new packet and stamps + /// `latest_sync_at` so the staleness gate in `mesh_aligned_us_for_csi_frame` + /// can age it out after 9 s. Used by `udp_receiver_task` on every + /// successful magic-dispatched sync datagram; extracted so the dispatch + /// path is testable without spinning up the tokio UDP socket. + pub(crate) fn apply_sync_packet( + &mut self, + pkt: wifi_densepose_hardware::SyncPacket, + now: std::time::Instant, + ) { + self.latest_sync = Some(pkt); + self.latest_sync_at = Some(now); + } + /// ADR-110 iter 30 — pure snapshot of this node's mesh-sync state. /// Returns `None` when no sync packet has been observed. Used by both /// the WebSocket broadcaster (iter 23) and the REST handlers (iter 29); @@ -4390,10 +4405,10 @@ async fn udp_receiver_task(state: SharedState, udp_port: u16) { sync.flags.smoothed_used, sync.sequence, sync.local_minus_epoch_us()); let mut s = state.write().await; - let ns = s.node_states.entry(sync.node_id) + let node_id = sync.node_id; + let ns = s.node_states.entry(node_id) .or_insert_with(NodeState::new); - ns.latest_sync = Some(sync); - ns.latest_sync_at = Some(std::time::Instant::now()); + ns.apply_sync_packet(sync, std::time::Instant::now()); continue; } Err(e) => { @@ -5885,6 +5900,48 @@ mod sync_snapshot_helper_tests { assert_eq!(snap.csi_fps_samples, 42); } + #[test] + fn apply_sync_packet_populates_a_fresh_node() { + // Mirrors what udp_receiver_task does on the very first sync + // packet from a previously-unseen node. + let mut ns = NodeState::new(); + assert!(ns.latest_sync.is_none()); + assert!(ns.latest_sync_at.is_none()); + + let now = std::time::Instant::now(); + ns.apply_sync_packet(populated_sync(9), now); + + let sync = ns.latest_sync.as_ref().expect("must be populated"); + assert_eq!(sync.node_id, 9); + assert_eq!(sync.sequence, 20); + // latest_sync_at must be exactly the Instant we passed (no clock skew). + assert_eq!(ns.latest_sync_at, Some(now)); + // sync_snapshot now produces a value (REST 200 OK path). + assert!(ns.sync_snapshot().is_some()); + } + + #[test] + fn apply_sync_packet_overwrites_older_data() { + // Subsequent packets must replace, not accumulate. Otherwise the + // §A0.10-smoothed offset would lag the latest beacon. + let mut ns = NodeState::new(); + let t0 = std::time::Instant::now(); + ns.apply_sync_packet(populated_sync(9), t0); + + // Second packet: same node, advanced sequence + offset. + let mut second = populated_sync(9); + second.sequence = 40; + second.local_us = 30_000_000; + second.epoch_us = 28_834_900; + let t1 = t0 + std::time::Duration::from_secs(2); + ns.apply_sync_packet(second, t1); + + let cur = ns.latest_sync.as_ref().unwrap(); + assert_eq!(cur.sequence, 40); // newer sequence persisted + assert_eq!(cur.local_us, 30_000_000); // newer local persisted + assert_eq!(ns.latest_sync_at, Some(t1)); // staleness clock reset + } + #[test] fn snapshot_reflects_leader_state() { // Same data shape that /api/v1/mesh emits for a leader node.