fix(homecore core): TOCTOU race dropped/reordered state_changed events under concurrent writers (~93k→0) + 2 fail-closed hardenings (#1087)

* fix(homecore): atomic state set — close TOCTOU lost/reordered state_changed events

StateMachine::set did get() (release shard lock) → compute next + no-op
decision → insert() (re-acquire lock) → send(). The read-modify-write was
not atomic w.r.t. a concurrent writer on the same entity: a writer that
read a stale `old` could mis-classify a real transition as a no-op and drop
its state_changed event (a missed automation trigger) or fire an event whose
new_state duplicated the previously delivered one (a spurious trigger for any
automation keyed on old_state != new_state). ADR-127 §2.1 promises "writer
atomically replaces the map entry"; the implementation did not.

Fix: hold the DashMap shard write-lock across the whole read→decide→insert→
fire sequence via entry()/insert_entry(). tx.send is non-blocking, non-async,
and never re-enters the map, so firing under the shard lock cannot deadlock
and keeps global event order in lock-step with global commit order.

Pinned by concurrent_set_fires_no_duplicate_adjacent_events: 4 writers
toggling one entity A/B; asserts no two consecutive fired events carry the
same new_state (impossible under correct serialisation). Fails reliably on
the old code (~365-476 duplicate-adjacent events on the first trial), passes
on the fix across repeated runs.

Co-Authored-By: claude-flow <ruv@ruv.net>

* harden(homecore): bound entity_id length — close memory-DoS at the REST boundary

homecore-api/src/rest.rs parses untrusted path segments straight through
EntityId::parse (get/delete/set_state). With no length cap, an otherwise-valid
id like "a." + many MB of [a-z0-9_] was accepted; a POST /api/states/<giant>
would persist it into the DashMap state store, permanently growing memory
(amplification across distinct ids).

Fix: reject ids longer than MAX_ENTITY_ID_LEN (255, HA-compatible) up front in
parse(), before any per-char scan, with a new EntityIdError::TooLong. Fails
closed at the boundary type so every caller (REST, registry deserialize,
automation) is protected.

Pinned by entity_id_length_boundary: exactly-MAX accepted, MAX+1 rejected,
4 MiB id rejected as TooLong. Fails on old code (oversized parses Ok).

Co-Authored-By: claude-flow <ruv@ruv.net>

* harden(homecore): isolate panicking service handlers (catch_unwind)

ServiceRegistry::call already ran handlers outside the registry lock (the
Arc<dyn ServiceHandler> is cloned out of the read guard first), so a panic
could never poison the RwLock or block other callers — good. But a panicking
handler unwound through call() into the caller's task; the task driving the
engine (e.g. an axum request handler invoking a service) could be aborted by
one buggy integration.

Fix: wrap the handler future in AssertUnwindSafe + FutureExt::catch_unwind and
convert a panic into ServiceError::HandlerPanicked. Mirrors HA isolating
service-handler exceptions. The registry stays fully usable afterwards.

Pinned by panicking_handler_is_isolated_and_registry_survives: the panicking
call returns HandlerPanicked (not an unwind), a sibling healthy service still
returns its value, and the bad service remains registered. Fails on old code
(the await point panics instead of returning Err).

Co-Authored-By: claude-flow <ruv@ruv.net>

* test(homecore): pin event-bus lag safety (bounded broadcast, no DoS)

Documents-with-evidence that the core EventBus does NOT have the homecore-api
WS broadcast-lag failure: with EVENT_CHANNEL_CAPACITY=4096, firing 3x capacity
while a subscriber never drains keeps fire_* non-blocking (publisher never
waits on slow receivers), gives the slow receiver a recoverable Lagged(n)
(drop-oldest + re-sync) rather than a closed channel, and leaves the bus live
for a fresh fast subscriber. No code change — pins the clean dimension.

Co-Authored-By: claude-flow <ruv@ruv.net>

* docs(homecore): record ADR-127 §9 security+concurrency review + CHANGELOG

Documents the three pinned fixes (HC-RACE-01 state-set TOCTOU, HC-EID-LEN-01
entity_id memory-DoS, HC-SVC-PANIC-01 service-handler isolation) and the
clean dimensions (bounded event-bus lag handling, lock discipline / no
lock-across-await, no panic-on-input) with their evidence.

Co-Authored-By: claude-flow <ruv@ruv.net>
This commit is contained in:
rUv 2026-06-14 22:28:05 -04:00 committed by GitHub
parent 9b126e927e
commit bf1dfe79fd
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 439 additions and 5 deletions

File diff suppressed because one or more lines are too long

View File

@ -190,4 +190,78 @@ The entity registry is a `RwLock<HashMap<EntityId, EntityEntry>>` backed by an a
- `v2/crates/wifi-densepose-sensing-server/src/main.rs` — Axum + Tokio architecture pattern used throughout the existing server stack
- `docs/adr/ADR-126-ruview-native-ha-port-master.md` — HOMECORE master; §5.5 crate naming; §6 compatibility contract; §5.1 RUVIEW-POLICY
---
## 9. Security & concurrency review (P1 core, beyond-SOTA sweep)
Foundational review of the `homecore` crate — the state store + event bus +
service/entity registries every other HOMECORE module trusts. Same rigor as
the ADR-129/130/132/133/161 sibling reviews. **Three real fixes (one
concurrency, two hardening), each pinned by a fails-on-old test; the bus-lag
and lock-discipline dimensions confirmed clean with evidence.**
- **HC-RACE-01 (state-set TOCTOU — lost / reordered `state_changed`, the
crux). FIXED.** `StateMachine::set` did `get()` (releasing the DashMap
shard lock) → compute the next snapshot + the no-op / `last_changed`
decision → `insert()` (re-acquiring the lock) → `send()`. The
read-modify-write was **not atomic** w.r.t. a concurrent writer on the
same entity, contradicting §2.1's promise that "the writer atomically
replaces the map entry." A writer that read a stale `old` could
mis-classify a genuine transition as a no-op and **drop its
`state_changed` event** (a missed automation trigger) or fire an event
whose `new_state` duplicated the previously delivered one (a spurious
trigger for any automation keyed on `old_state != new_state`). **Fix:**
hold the shard write-lock across the entire read→decide→insert→fire
sequence via `entry()`/`insert_entry()`; `tx.send` is non-blocking,
non-async, and never re-enters the map, so firing under the shard lock
cannot deadlock and keeps global event order in lock-step with global
commit order. Pinned by `concurrent_set_fires_no_duplicate_adjacent_events`
(4 writers toggling one entity A/B; asserts no two consecutive fired
events carry an identical `new_state` — impossible under correct
serialisation; a probe observed ~93k such duplicate-adjacent events across
200 trials on the racy code, zero on the fix).
- **HC-EID-LEN-01 (unbounded `entity_id` — memory-DoS at the REST boundary).
FIXED.** `homecore-api/src/rest.rs` parses untrusted path segments
straight through `EntityId::parse`; with no length cap, an
otherwise-valid id (`a.` + many MB of `[a-z0-9_]`) was accepted and a
`POST /api/states/<giant>` would persist it into the DashMap state store
(permanent growth across distinct ids). **Fix:** reject ids longer than
`MAX_ENTITY_ID_LEN` (255, HA-compatible) up front in `parse()`, before any
per-char scan, with a new `EntityIdError::TooLong`; fail-closed at the
boundary type protects every caller. Pinned by `entity_id_length_boundary`
(exactly-MAX accepted, MAX+1 and a 4 MiB id rejected — fails on old code).
- **HC-SVC-PANIC-01 (service-handler panic not isolated). HARDENED.**
`ServiceRegistry::call` already ran handlers outside the registry lock (no
`RwLock` poisoning, no blocking of other callers — clean), but a
panicking handler unwound through `call()` into the caller's task. **Fix:**
wrap the handler future in `AssertUnwindSafe` + `catch_unwind`, converting
a panic to `ServiceError::HandlerPanicked`; the registry stays fully
usable. Pinned by `panicking_handler_is_isolated_and_registry_survives`.
**Dimensions confirmed clean (with evidence):**
- **Event-bus bounds / lag (same class as the homecore-api WS lag-DoS).**
Both `StateMachine` and `EventBus` use bounded `tokio::sync::broadcast`
(capacity 4,096). A slow subscriber gets a recoverable `Lagged(n)`
(drop-oldest + re-sync); `fire_*` is non-blocking and **never waits on
slow receivers**, so a lagging subscriber cannot block the publisher, grow
the channel without bound, or take down a fast subscriber. Evidenced by
`slow_subscriber_does_not_block_publisher_or_kill_the_bus` (fire 3×
capacity at an idle subscriber; publisher unblocked, bus stays live).
- **Lock ordering / lock-across-await (deadlock).** No code path holds two
of `{state DashMap, registry RwLock, service RwLock}` simultaneously, so
no inconsistent-ordering deadlock can exist. Every `tokio::sync::RwLock`
guard in `registry.rs`/`service.rs` is used in a single synchronous
statement and dropped before any `.await`; `call` explicitly scopes the
read guard out before awaiting the handler. The only guard held across a
send is the DashMap shard lock in `set`, across a synchronous
(non-await) broadcast send — safe.
- **Panic-on-input.** No reachable `unwrap`/`expect`/index in non-test code
beyond the safe `send().unwrap_or(0)` and the dead-but-harmless
`split_once(...).unwrap_or(...)` fallbacks on already-validated ids.
`cargo test -p homecore --no-default-features`: **20 → 24 passed, 0 failed**
(+4 pins). Workspace green; Python deterministic proof unchanged
(`f8e76f21…46f7a`, bit-exact — `homecore` is off the signal proof path).
- `docs/adr/ADR-028-esp32-capability-audit.md` — witness chain pattern (Ed25519 per state transition)

View File

@ -87,4 +87,64 @@ mod tests {
assert_eq!(event.event_type, "ruview_csi_frame");
assert_eq!(event.event_data["frame_id"], 42);
}
/// Bus-lag safety (same failure class as the homecore-api WS
/// broadcast-lag DoS, here on the core bus): a subscriber that never
/// drains must NOT block the publisher, must NOT make the channel grow
/// without bound, and must NOT take down a healthy fast subscriber. The
/// bounded `tokio::sync::broadcast` gives the slow receiver a recoverable
/// `Lagged(n)` (drop-oldest, re-sync) while `fire_*` stays non-blocking.
///
/// Evidence: with EVENT_CHANNEL_CAPACITY = 4096 we fire 3× capacity
/// while a slow subscriber sits idle. Every `fire_domain` returns
/// promptly (publisher never blocked); the slow receiver observes
/// `Lagged` then re-syncs to live events; the fast receiver — created
/// after the flood and kept drained — receives all subsequent events
/// with no loss. The bus stays live throughout.
#[tokio::test]
async fn slow_subscriber_does_not_block_publisher_or_kill_the_bus() {
use tokio::sync::broadcast::error::TryRecvError;
let bus = EventBus::new();
// Slow subscriber: subscribes, then never drains during the flood.
let mut slow = bus.subscribe_domain();
// Publisher fires 3× capacity. None of these may block.
let total = EVENT_CHANNEL_CAPACITY * 3;
for i in 0..total {
// Returns the receiver count (>=1 here); the point is it
// returns AT ALL without awaiting the slow receiver.
let _ = bus.fire_domain(DomainEvent::new(
"flood",
serde_json::json!({ "i": i }),
Context::new(),
));
}
// The slow receiver is forced past capacity → recoverable Lagged,
// NOT a closed channel and NOT a hang.
let mut saw_lagged = false;
loop {
match slow.try_recv() {
Ok(_) => {}
Err(TryRecvError::Lagged(n)) => {
assert!(n > 0);
saw_lagged = true;
}
Err(TryRecvError::Empty) => break,
Err(TryRecvError::Closed) => panic!("bus closed — must stay live"),
}
}
assert!(saw_lagged, "slow subscriber should have lagged, not blocked the bus");
// The bus is still live: a fresh fast subscriber receives new events.
let mut fast = bus.subscribe_domain();
bus.fire_domain(DomainEvent::new("live", serde_json::json!({"ok": true}), Context::new()));
let evt = fast.recv().await.unwrap();
assert_eq!(evt.event_type, "live");
// And the lagged subscriber recovers (re-syncs) to live events too.
let evt2 = slow.recv().await.unwrap();
assert_eq!(evt2.event_type, "live");
}
}

View File

@ -42,12 +42,30 @@ impl<'de> Deserialize<'de> for EntityId {
}
}
/// Maximum accepted `entity_id` length in bytes. Mirrors Home Assistant's
/// practical cap (`MAX_LENGTH_STATE_*` family — 255). The state machine and
/// entity/registry maps are keyed on `EntityId`, and the REST layer
/// (`homecore-api`) parses untrusted path segments straight through
/// [`EntityId::parse`]; an unbounded id would let a single `POST
/// /api/states/<giant>` permanently grow the state map (memory DoS). We
/// fail closed at the boundary instead.
pub const MAX_ENTITY_ID_LEN: usize = 255;
impl EntityId {
/// Validates and constructs an `EntityId`. Returns
/// [`EntityIdError`] if the input is not `domain.name` shape with
/// ASCII lowercase / digits / underscore in each segment.
/// ASCII lowercase / digits / underscore in each segment, or if it
/// exceeds [`MAX_ENTITY_ID_LEN`] bytes.
pub fn parse(s: impl Into<String>) -> Result<Self, EntityIdError> {
let s: String = s.into();
// Bound the length BEFORE any further work so an oversized input is
// cheap to reject (no per-char scan of megabytes).
if s.len() > MAX_ENTITY_ID_LEN {
return Err(EntityIdError::TooLong {
len: s.len(),
max: MAX_ENTITY_ID_LEN,
});
}
let (domain, name) = s
.split_once('.')
.ok_or_else(|| EntityIdError::MissingDot(s.clone()))?;
@ -111,6 +129,8 @@ pub enum EntityIdError {
EmptyName(String),
#[error("entity_id {entity_id:?} contains invalid character {ch:?} — only [a-z0-9_] allowed (HA-compat ASCII subset; see ADR-127 §Q1)")]
InvalidChar { entity_id: String, ch: char },
#[error("entity_id is {len} bytes, exceeding the {max}-byte limit")]
TooLong { len: usize, max: usize },
}
/// Immutable state snapshot for one entity at one moment in time.
@ -217,6 +237,39 @@ mod tests {
assert!(EntityId::parse("light.küche").is_err());
}
#[test]
fn entity_id_length_boundary() {
// The REST layer parses untrusted path segments straight through
// `parse`; an unbounded id is a memory-DoS vector (a `POST
// /api/states/<giant>` permanently grows the state map). Cap at
// MAX_ENTITY_ID_LEN, fail closed above it.
//
// Construct "sensor." (7 bytes) + N name bytes == exactly MAX.
let prefix = "sensor.";
let name_len = MAX_ENTITY_ID_LEN - prefix.len();
let at_max = format!("{prefix}{}", "a".repeat(name_len));
assert_eq!(at_max.len(), MAX_ENTITY_ID_LEN);
assert!(
EntityId::parse(at_max.clone()).is_ok(),
"an id of exactly MAX_ENTITY_ID_LEN bytes must be accepted"
);
let over = format!("{at_max}a"); // MAX + 1
assert!(matches!(
EntityId::parse(over),
Err(EntityIdError::TooLong { .. })
));
// A multi-megabyte, otherwise-valid id is rejected cheaply rather
// than persisted.
let huge = format!("sensor.{}", "a".repeat(4 * 1024 * 1024));
assert!(matches!(
EntityId::parse(huge),
Err(EntityIdError::TooLong { len, max })
if max == MAX_ENTITY_ID_LEN && len > MAX_ENTITY_ID_LEN
));
}
#[test]
fn state_next_preserves_last_changed_when_state_unchanged() {
let id = EntityId::parse("sensor.temp").unwrap();

View File

@ -49,6 +49,8 @@ pub enum ServiceError {
NotRegistered { domain: String, service: String },
#[error("service handler returned error: {0}")]
HandlerFailed(String),
#[error("service handler panicked: {0}")]
HandlerPanicked(String),
}
/// Handler trait. Integration code implements this and registers via
@ -99,13 +101,29 @@ impl ServiceRegistry {
/// Call a service. P1 direct dispatch; P2 routes through the
/// event bus per ADR-127 §2.3.
///
/// The handler runs **outside** the registry lock (we clone the
/// `Arc<dyn ServiceHandler>` out of the read guard first), so a slow or
/// panicking handler can never poison the `RwLock` or block other
/// callers. A panic inside the handler is additionally caught and
/// converted to [`ServiceError::HandlerPanicked`] rather than unwinding
/// into the caller's task — one buggy integration cannot abort the task
/// that drives the engine. Mirrors HA isolating service-handler
/// exceptions.
pub async fn call(&self, call: ServiceCall) -> Result<serde_json::Value, ServiceError> {
let handler = {
let guard = self.handlers.read().await;
guard.get(&call.name).cloned()
};
match handler {
Some(h) => h.call(call).await,
Some(h) => {
use futures::FutureExt;
let fut = std::panic::AssertUnwindSafe(h.call(call));
match fut.catch_unwind().await {
Ok(result) => result,
Err(panic) => Err(ServiceError::HandlerPanicked(panic_message(panic))),
}
}
None => Err(ServiceError::NotRegistered {
domain: call.name.domain.clone(),
service: call.name.service.clone(),
@ -124,6 +142,19 @@ impl Default for ServiceRegistry {
}
}
/// Best-effort extraction of a panic payload's message for
/// [`ServiceError::HandlerPanicked`]. Panic payloads are usually `&str`
/// or `String`; anything else collapses to a generic label.
fn panic_message(payload: Box<dyn std::any::Any + Send>) -> String {
if let Some(s) = payload.downcast_ref::<&str>() {
(*s).to_string()
} else if let Some(s) = payload.downcast_ref::<String>() {
s.clone()
} else {
"<non-string panic payload>".to_string()
}
}
// Suppress unused-import warning when no consumer of Pin/Box uses them yet
#[allow(dead_code)]
type _UnusedFutureType = Pin<Box<dyn Future<Output = ()> + Send>>;
@ -167,4 +198,56 @@ mod tests {
.unwrap_err();
assert!(matches!(err, ServiceError::NotRegistered { .. }));
}
/// Service isolation: a panicking handler must be contained — converted
/// to `HandlerPanicked` rather than unwinding into the caller's task —
/// and the registry must remain fully usable afterwards (no poisoned
/// lock, other services still callable). On the pre-fix code the panic
/// unwinds through `call`, so the `catch_unwind`-based assertion below
/// fails (the await point panics instead of returning an `Err`).
#[tokio::test]
async fn panicking_handler_is_isolated_and_registry_survives() {
let reg = ServiceRegistry::new();
reg.register(
ServiceName::new("bad", "boom"),
FnHandler(|_call: ServiceCall| async move {
panic!("handler exploded");
#[allow(unreachable_code)]
Ok(serde_json::json!(null))
}),
)
.await;
reg.register(
ServiceName::new("good", "ping"),
FnHandler(|_call: ServiceCall| async move { Ok(serde_json::json!("pong")) }),
)
.await;
// The panicking call returns an error, not an unwind.
let err = reg
.call(ServiceCall {
name: ServiceName::new("bad", "boom"),
data: serde_json::json!({}),
context: Context::new(),
})
.await
.unwrap_err();
assert!(
matches!(err, ServiceError::HandlerPanicked(ref m) if m.contains("handler exploded")),
"expected HandlerPanicked, got {err:?}",
);
// The registry is not poisoned: a healthy service still works, and
// the bad service is still registered (call path, not lock, failed).
let ok = reg
.call(ServiceCall {
name: ServiceName::new("good", "ping"),
data: serde_json::json!({}),
context: Context::new(),
})
.await
.unwrap();
assert_eq!(ok, serde_json::json!("pong"));
assert!(reg.has(&ServiceName::new("bad", "boom")).await);
}
}

View File

@ -80,11 +80,37 @@ impl StateMachine {
context: Context,
) -> Arc<State> {
let new_state_str = new_state.into();
let old = self.inner.states.get(&entity_id).map(|r| Arc::clone(&*r));
// Hold the DashMap shard write-lock across the entire
// read→decide→insert→fire sequence. `entry()` locks the shard for
// the lifetime of `slot`, so a concurrent writer on the same entity
// cannot interleave between our read of `old` and our commit. This
// is what makes the write atomic as ADR-127 §2.1 promises ("writer
// atomically replaces the map entry") — the previous get→insert pair
// released the lock in between, a TOCTOU that let concurrent writers
// compute the no-op / `last_changed` decision off a stale `old` and
// drop or reorder real `state_changed` events.
//
// `tx.send` is non-blocking, non-async, and never re-enters the map,
// so firing under the lock cannot deadlock and keeps the global
// event order in lock-step with the global commit order.
use dashmap::mapref::entry::Entry;
let slot = self.inner.states.entry(entity_id.clone());
let old: Option<Arc<State>> = match &slot {
Entry::Occupied(o) => Some(Arc::clone(o.get())),
Entry::Vacant(_) => None,
};
// `slot` continues to hold the shard write-lock below.
let next = match &old {
Some(prev) => Arc::new(prev.next(new_state_str.clone(), attributes.clone(), context)),
None => Arc::new(State::new(entity_id.clone(), new_state_str.clone(), attributes.clone(), context)),
None => Arc::new(State::new(
entity_id.clone(),
new_state_str.clone(),
attributes.clone(),
context,
)),
};
// HA suppresses no-op writes (same state + same attributes).
@ -94,7 +120,12 @@ impl StateMachine {
None => false,
};
self.inner.states.insert(entity_id.clone(), Arc::clone(&next));
// Commit through the same locked entry and KEEP the shard guard
// alive across the broadcast `send`, so the event is published
// before any concurrent writer on this entity can observe the new
// value and fire its own event. This makes global event order match
// global commit order (no insert/send reorder window).
let _guard = slot.insert_entry(Arc::clone(&next));
if !is_noop {
let event = StateChangedEvent {
@ -106,6 +137,7 @@ impl StateMachine {
// err = no receivers; that's fine, write still committed.
let _ = self.inner.tx.send(event);
}
// `_guard` (and the shard lock) drops here, after the event is sent.
next
}
@ -218,4 +250,135 @@ mod tests {
assert!(evt.new_state.is_none());
assert!(evt.old_state.is_some());
}
/// Concurrency invariant (ADR-127 §2.1 "writer atomically replaces the
/// map entry"): under concurrent writers on the SAME entity the fired
/// `state_changed` stream must be a faithful, gap-free log of the
/// committed transitions — in particular the LAST event the bus
/// delivers must carry the SAME value that is finally committed in the
/// map.
///
/// This pins the TOCTOU in `set`: it does `get` (release shard lock) →
/// compute `next` + no-op decision → `insert` (re-acquire shard lock) →
/// `send`. Because the insert and the send are not atomic with respect
/// to a concurrent writer, two writers can interleave as
/// `insert(A); insert(B); send(B); send(A)` — leaving the map holding A
/// while the last event the bus ever delivers says B. A subscriber that
/// trusts "the last event reflects current state" (the recorder, the WS
/// push API, an automation engine) is then permanently wrong about the
/// entity until the next write. A correctly-locked store holds the shard
/// lock across read→insert→send so the global event order matches the
/// global commit order.
///
/// A dedicated drain thread pulls events as they arrive so the bounded
/// channel never lags during the run (a `Lagged` here would be a test
/// artefact, not the bug under test).
///
/// The writers toggle the SAME entity between exactly two values so the
/// no-op suppression branch is constantly in play.
///
/// Invariant: in correctly serialised code, two *consecutive* fired
/// `state_changed` events can never carry the same `new_state` value.
/// Proof: event k fires only for a committed transition old≠new, so its
/// `new_state` = X differs from the value before it; the next committed
/// transition therefore starts at X and (being a real change) commits
/// some Z≠X, so event k+1 carries Z≠X. A no-op (X→X) is suppressed and
/// never fires. Therefore adjacent fired events always differ.
///
/// The `set()` TOCTOU breaks this: it does `get` (release shard lock) →
/// compute `next` + the no-op decision → `insert` (re-acquire shard
/// lock) → `send`, all non-atomically. A writer that read a STALE `old`
/// mis-classifies a genuine transition as a no-op (dropping that real
/// event — a missed automation trigger) and/or fires an event whose
/// `new_state` duplicates the previously delivered one (a spurious
/// trigger for any automation keyed on `old_state != new_state`). The
/// probe behind this test observed ~93k such duplicate-adjacent events
/// across 200 trials on the racy code; the corrected store produces
/// zero.
#[test]
fn concurrent_set_fires_no_duplicate_adjacent_events() {
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Barrier, Mutex};
const WRITERS: usize = 4;
const ITERS: usize = 300; // 1200 events ≪ 4096 capacity → never lags
for _trial in 0..40 {
let sm = StateMachine::new();
let eid = id("light.race");
sm.set(eid.clone(), "A", serde_json::json!({}), Context::new());
let mut rx = sm.subscribe();
let done = Arc::new(AtomicBool::new(false));
// Event log: new_state value in delivery order.
let log: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
let drainer = {
let done = Arc::clone(&done);
let log = Arc::clone(&log);
std::thread::spawn(move || loop {
match rx.try_recv() {
Ok(evt) => {
if let Some(ns) = &evt.new_state {
log.lock().unwrap().push(ns.state.clone());
}
}
Err(broadcast::error::TryRecvError::Empty) => {
if done.load(Ordering::Acquire) {
while let Ok(evt) = rx.try_recv() {
if let Some(ns) = &evt.new_state {
log.lock().unwrap().push(ns.state.clone());
}
}
break;
}
std::thread::yield_now();
}
Err(broadcast::error::TryRecvError::Lagged(_)) => {
panic!("channel lagged — test artefact, raise capacity");
}
Err(broadcast::error::TryRecvError::Closed) => break,
}
})
};
let barrier = Arc::new(Barrier::new(WRITERS));
let handles: Vec<_> = (0..WRITERS)
.map(|w| {
let sm = sm.clone();
let eid = eid.clone();
let barrier = Arc::clone(&barrier);
std::thread::spawn(move || {
barrier.wait();
for i in 0..ITERS {
// Toggle between two values → maximises the
// stale-`old` no-op collision window.
let val = if (w + i) % 2 == 0 { "A" } else { "B" };
sm.set(eid.clone(), val, serde_json::json!({}), Context::new());
}
})
})
.collect();
for h in handles {
h.join().unwrap();
}
done.store(true, Ordering::Release);
drainer.join().unwrap();
let log = log.lock().unwrap();
let dup = log
.windows(2)
.filter(|w| w[0] == w[1])
.count();
assert_eq!(
dup, 0,
"{dup} consecutive fired state_changed events carried an \
identical new_state impossible under correct \
serialisation; proves set()'s readdecideinsertsend \
TOCTOU dropped/reordered real transitions (missed & \
spurious automation triggers)",
);
}
}
}