From 8805c8ec0b47b71f4e7b626eb662703618d9c97a Mon Sep 17 00:00:00 2001 From: ruv Date: Sat, 23 May 2026 14:44:25 -0400 Subject: [PATCH] test+refactor(adr-110): NodeState::apply_sync_packet + 2 tests for the receive-side dispatch MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Iter 32 — completes the helper-extraction discipline started in iter 30. The iter 15 inline `ns.latest_sync = Some(sync); ns.latest_sync_at = ...` was the LAST untested receive-side mutation; now it's a named method with 2 tests covering its full state-transition surface. Refactor: Add `NodeState::apply_sync_packet(pkt, now)` taking an Instant so the test can pass deterministic timing. udp_receiver_task now calls the method instead of touching the fields inline — one less place to break the staleness gate. Tests (2 new — sync_snapshot_helper_tests module now at 5 tests): apply_sync_packet_populates_a_fresh_node Mirrors udp_receiver_task's first-packet-from-unknown-node path: asserts latest_sync goes from None → Some, latest_sync_at matches the passed Instant exactly (no clock skew from real Instant::now()), and sync_snapshot() now returns Some (REST 200 OK path lit up). apply_sync_packet_overwrites_older_data Subsequent packets must replace, not accumulate. Asserts sequence, local_us advance, and the staleness clock resets. This is what keeps the §A0.10-smoothed offset tracking the latest beacon rather than drifting with stale state. cargo test sync_snapshot_helper → 5/5 green. Branch-coord clean — no Cargo.toml / cli.rs touched. Co-Authored-By: claude-flow --- .../wifi-densepose-sensing-server/src/main.rs | 63 ++++++++++++++++++- 1 file changed, 60 insertions(+), 3 deletions(-) diff --git a/v2/crates/wifi-densepose-sensing-server/src/main.rs b/v2/crates/wifi-densepose-sensing-server/src/main.rs index b4af1348..26f05ffe 100644 --- a/v2/crates/wifi-densepose-sensing-server/src/main.rs +++ b/v2/crates/wifi-densepose-sensing-server/src/main.rs @@ -553,6 +553,21 @@ impl NodeState { /// `udp_receiver_task`. Uses `last_frame_time` as the previous-frame /// anchor; the first frame after init seeds the timer without producing /// a sample (no prior dt to measure). + /// ADR-110 iter 32 — apply a freshly-decoded sync packet to this node. + /// Overwrites `latest_sync` with the new packet and stamps + /// `latest_sync_at` so the staleness gate in `mesh_aligned_us_for_csi_frame` + /// can age it out after 9 s. Used by `udp_receiver_task` on every + /// successful magic-dispatched sync datagram; extracted so the dispatch + /// path is testable without spinning up the tokio UDP socket. + pub(crate) fn apply_sync_packet( + &mut self, + pkt: wifi_densepose_hardware::SyncPacket, + now: std::time::Instant, + ) { + self.latest_sync = Some(pkt); + self.latest_sync_at = Some(now); + } + /// ADR-110 iter 30 — pure snapshot of this node's mesh-sync state. /// Returns `None` when no sync packet has been observed. Used by both /// the WebSocket broadcaster (iter 23) and the REST handlers (iter 29); @@ -4390,10 +4405,10 @@ async fn udp_receiver_task(state: SharedState, udp_port: u16) { sync.flags.smoothed_used, sync.sequence, sync.local_minus_epoch_us()); let mut s = state.write().await; - let ns = s.node_states.entry(sync.node_id) + let node_id = sync.node_id; + let ns = s.node_states.entry(node_id) .or_insert_with(NodeState::new); - ns.latest_sync = Some(sync); - ns.latest_sync_at = Some(std::time::Instant::now()); + ns.apply_sync_packet(sync, std::time::Instant::now()); continue; } Err(e) => { @@ -5885,6 +5900,48 @@ mod sync_snapshot_helper_tests { assert_eq!(snap.csi_fps_samples, 42); } + #[test] + fn apply_sync_packet_populates_a_fresh_node() { + // Mirrors what udp_receiver_task does on the very first sync + // packet from a previously-unseen node. + let mut ns = NodeState::new(); + assert!(ns.latest_sync.is_none()); + assert!(ns.latest_sync_at.is_none()); + + let now = std::time::Instant::now(); + ns.apply_sync_packet(populated_sync(9), now); + + let sync = ns.latest_sync.as_ref().expect("must be populated"); + assert_eq!(sync.node_id, 9); + assert_eq!(sync.sequence, 20); + // latest_sync_at must be exactly the Instant we passed (no clock skew). + assert_eq!(ns.latest_sync_at, Some(now)); + // sync_snapshot now produces a value (REST 200 OK path). + assert!(ns.sync_snapshot().is_some()); + } + + #[test] + fn apply_sync_packet_overwrites_older_data() { + // Subsequent packets must replace, not accumulate. Otherwise the + // §A0.10-smoothed offset would lag the latest beacon. + let mut ns = NodeState::new(); + let t0 = std::time::Instant::now(); + ns.apply_sync_packet(populated_sync(9), t0); + + // Second packet: same node, advanced sequence + offset. + let mut second = populated_sync(9); + second.sequence = 40; + second.local_us = 30_000_000; + second.epoch_us = 28_834_900; + let t1 = t0 + std::time::Duration::from_secs(2); + ns.apply_sync_packet(second, t1); + + let cur = ns.latest_sync.as_ref().unwrap(); + assert_eq!(cur.sequence, 40); // newer sequence persisted + assert_eq!(cur.local_us, 30_000_000); // newer local persisted + assert_eq!(ns.latest_sync_at, Some(t1)); // staleness clock reset + } + #[test] fn snapshot_reflects_leader_state() { // Same data shape that /api/v1/mesh emits for a leader node.