test+refactor(adr-110): NodeState::apply_sync_packet + 2 tests for the receive-side dispatch
Iter 32 — completes the helper-extraction discipline started in iter 30.
The iter 15 inline `ns.latest_sync = Some(sync); ns.latest_sync_at = ...`
was the LAST untested receive-side mutation; now it's a named method
with 2 tests covering its full state-transition surface.
Refactor:
Add `NodeState::apply_sync_packet(pkt, now)` taking an Instant so
the test can pass deterministic timing.
udp_receiver_task now calls the method instead of touching the
fields inline — one less place to break the staleness gate.
Tests (2 new — sync_snapshot_helper_tests module now at 5 tests):
apply_sync_packet_populates_a_fresh_node
Mirrors udp_receiver_task's first-packet-from-unknown-node path:
asserts latest_sync goes from None → Some, latest_sync_at matches
the passed Instant exactly (no clock skew from real Instant::now()),
and sync_snapshot() now returns Some (REST 200 OK path lit up).
apply_sync_packet_overwrites_older_data
Subsequent packets must replace, not accumulate. Asserts sequence,
local_us advance, and the staleness clock resets. This is what
keeps the §A0.10-smoothed offset tracking the latest beacon rather
than drifting with stale state.
cargo test sync_snapshot_helper → 5/5 green.
Branch-coord clean — no Cargo.toml / cli.rs touched.
Co-Authored-By: claude-flow <ruv@ruv.net>
This commit is contained in:
parent
473c5d11db
commit
8805c8ec0b
|
|
@ -553,6 +553,21 @@ impl NodeState {
|
|||
/// `udp_receiver_task`. Uses `last_frame_time` as the previous-frame
|
||||
/// anchor; the first frame after init seeds the timer without producing
|
||||
/// a sample (no prior dt to measure).
|
||||
/// ADR-110 iter 32 — apply a freshly-decoded sync packet to this node.
|
||||
/// Overwrites `latest_sync` with the new packet and stamps
|
||||
/// `latest_sync_at` so the staleness gate in `mesh_aligned_us_for_csi_frame`
|
||||
/// can age it out after 9 s. Used by `udp_receiver_task` on every
|
||||
/// successful magic-dispatched sync datagram; extracted so the dispatch
|
||||
/// path is testable without spinning up the tokio UDP socket.
|
||||
pub(crate) fn apply_sync_packet(
|
||||
&mut self,
|
||||
pkt: wifi_densepose_hardware::SyncPacket,
|
||||
now: std::time::Instant,
|
||||
) {
|
||||
self.latest_sync = Some(pkt);
|
||||
self.latest_sync_at = Some(now);
|
||||
}
|
||||
|
||||
/// ADR-110 iter 30 — pure snapshot of this node's mesh-sync state.
|
||||
/// Returns `None` when no sync packet has been observed. Used by both
|
||||
/// the WebSocket broadcaster (iter 23) and the REST handlers (iter 29);
|
||||
|
|
@ -4390,10 +4405,10 @@ async fn udp_receiver_task(state: SharedState, udp_port: u16) {
|
|||
sync.flags.smoothed_used, sync.sequence,
|
||||
sync.local_minus_epoch_us());
|
||||
let mut s = state.write().await;
|
||||
let ns = s.node_states.entry(sync.node_id)
|
||||
let node_id = sync.node_id;
|
||||
let ns = s.node_states.entry(node_id)
|
||||
.or_insert_with(NodeState::new);
|
||||
ns.latest_sync = Some(sync);
|
||||
ns.latest_sync_at = Some(std::time::Instant::now());
|
||||
ns.apply_sync_packet(sync, std::time::Instant::now());
|
||||
continue;
|
||||
}
|
||||
Err(e) => {
|
||||
|
|
@ -5885,6 +5900,48 @@ mod sync_snapshot_helper_tests {
|
|||
assert_eq!(snap.csi_fps_samples, 42);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn apply_sync_packet_populates_a_fresh_node() {
|
||||
// Mirrors what udp_receiver_task does on the very first sync
|
||||
// packet from a previously-unseen node.
|
||||
let mut ns = NodeState::new();
|
||||
assert!(ns.latest_sync.is_none());
|
||||
assert!(ns.latest_sync_at.is_none());
|
||||
|
||||
let now = std::time::Instant::now();
|
||||
ns.apply_sync_packet(populated_sync(9), now);
|
||||
|
||||
let sync = ns.latest_sync.as_ref().expect("must be populated");
|
||||
assert_eq!(sync.node_id, 9);
|
||||
assert_eq!(sync.sequence, 20);
|
||||
// latest_sync_at must be exactly the Instant we passed (no clock skew).
|
||||
assert_eq!(ns.latest_sync_at, Some(now));
|
||||
// sync_snapshot now produces a value (REST 200 OK path).
|
||||
assert!(ns.sync_snapshot().is_some());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn apply_sync_packet_overwrites_older_data() {
|
||||
// Subsequent packets must replace, not accumulate. Otherwise the
|
||||
// §A0.10-smoothed offset would lag the latest beacon.
|
||||
let mut ns = NodeState::new();
|
||||
let t0 = std::time::Instant::now();
|
||||
ns.apply_sync_packet(populated_sync(9), t0);
|
||||
|
||||
// Second packet: same node, advanced sequence + offset.
|
||||
let mut second = populated_sync(9);
|
||||
second.sequence = 40;
|
||||
second.local_us = 30_000_000;
|
||||
second.epoch_us = 28_834_900;
|
||||
let t1 = t0 + std::time::Duration::from_secs(2);
|
||||
ns.apply_sync_packet(second, t1);
|
||||
|
||||
let cur = ns.latest_sync.as_ref().unwrap();
|
||||
assert_eq!(cur.sequence, 40); // newer sequence persisted
|
||||
assert_eq!(cur.local_us, 30_000_000); // newer local persisted
|
||||
assert_eq!(ns.latest_sync_at, Some(t1)); // staleness clock reset
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn snapshot_reflects_leader_state() {
|
||||
// Same data shape that /api/v1/mesh emits for a leader node.
|
||||
|
|
|
|||
Loading…
Reference in New Issue