From bf1dfe79fd61987e5e02086660256d5046d4fc56 Mon Sep 17 00:00:00 2001 From: rUv Date: Sun, 14 Jun 2026 22:28:05 -0400 Subject: [PATCH] =?UTF-8?q?fix(homecore=20core):=20TOCTOU=20race=20dropped?= =?UTF-8?q?/reordered=20state=5Fchanged=20events=20under=20concurrent=20wr?= =?UTF-8?q?iters=20(~93k=E2=86=920)=20+=202=20fail-closed=20hardenings=20(?= =?UTF-8?q?#1087)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * fix(homecore): atomic state set — close TOCTOU lost/reordered state_changed events StateMachine::set did get() (release shard lock) → compute next + no-op decision → insert() (re-acquire lock) → send(). The read-modify-write was not atomic w.r.t. a concurrent writer on the same entity: a writer that read a stale `old` could mis-classify a real transition as a no-op and drop its state_changed event (a missed automation trigger) or fire an event whose new_state duplicated the previously delivered one (a spurious trigger for any automation keyed on old_state != new_state). ADR-127 §2.1 promises "writer atomically replaces the map entry"; the implementation did not. Fix: hold the DashMap shard write-lock across the whole read→decide→insert→ fire sequence via entry()/insert_entry(). tx.send is non-blocking, non-async, and never re-enters the map, so firing under the shard lock cannot deadlock and keeps global event order in lock-step with global commit order. Pinned by concurrent_set_fires_no_duplicate_adjacent_events: 4 writers toggling one entity A/B; asserts no two consecutive fired events carry the same new_state (impossible under correct serialisation). Fails reliably on the old code (~365-476 duplicate-adjacent events on the first trial), passes on the fix across repeated runs. Co-Authored-By: claude-flow * harden(homecore): bound entity_id length — close memory-DoS at the REST boundary homecore-api/src/rest.rs parses untrusted path segments straight through EntityId::parse (get/delete/set_state). With no length cap, an otherwise-valid id like "a." + many MB of [a-z0-9_] was accepted; a POST /api/states/ would persist it into the DashMap state store, permanently growing memory (amplification across distinct ids). Fix: reject ids longer than MAX_ENTITY_ID_LEN (255, HA-compatible) up front in parse(), before any per-char scan, with a new EntityIdError::TooLong. Fails closed at the boundary type so every caller (REST, registry deserialize, automation) is protected. Pinned by entity_id_length_boundary: exactly-MAX accepted, MAX+1 rejected, 4 MiB id rejected as TooLong. Fails on old code (oversized parses Ok). Co-Authored-By: claude-flow * harden(homecore): isolate panicking service handlers (catch_unwind) ServiceRegistry::call already ran handlers outside the registry lock (the Arc is cloned out of the read guard first), so a panic could never poison the RwLock or block other callers — good. But a panicking handler unwound through call() into the caller's task; the task driving the engine (e.g. an axum request handler invoking a service) could be aborted by one buggy integration. Fix: wrap the handler future in AssertUnwindSafe + FutureExt::catch_unwind and convert a panic into ServiceError::HandlerPanicked. Mirrors HA isolating service-handler exceptions. The registry stays fully usable afterwards. Pinned by panicking_handler_is_isolated_and_registry_survives: the panicking call returns HandlerPanicked (not an unwind), a sibling healthy service still returns its value, and the bad service remains registered. Fails on old code (the await point panics instead of returning Err). Co-Authored-By: claude-flow * test(homecore): pin event-bus lag safety (bounded broadcast, no DoS) Documents-with-evidence that the core EventBus does NOT have the homecore-api WS broadcast-lag failure: with EVENT_CHANNEL_CAPACITY=4096, firing 3x capacity while a subscriber never drains keeps fire_* non-blocking (publisher never waits on slow receivers), gives the slow receiver a recoverable Lagged(n) (drop-oldest + re-sync) rather than a closed channel, and leaves the bus live for a fresh fast subscriber. No code change — pins the clean dimension. Co-Authored-By: claude-flow * docs(homecore): record ADR-127 §9 security+concurrency review + CHANGELOG Documents the three pinned fixes (HC-RACE-01 state-set TOCTOU, HC-EID-LEN-01 entity_id memory-DoS, HC-SVC-PANIC-01 service-handler isolation) and the clean dimensions (bounded event-bus lag handling, lock discipline / no lock-across-await, no panic-on-input) with their evidence. Co-Authored-By: claude-flow --- CHANGELOG.md | 1 + .../ADR-127-homecore-state-machine-rust.md | 74 ++++++++ v2/crates/homecore/src/bus.rs | 60 +++++++ v2/crates/homecore/src/entity.rs | 55 +++++- v2/crates/homecore/src/service.rs | 85 ++++++++- v2/crates/homecore/src/state.rs | 169 +++++++++++++++++- 6 files changed, 439 insertions(+), 5 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9833590c..531f2b7d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] ### Security +- **`homecore` foundational state-machine review (ADR-127) — one real concurrency bug fixed (state-set TOCTOU dropping/reordering `state_changed` events) + two hardening fixes (entity_id memory-DoS, service-handler panic isolation), each pinned by a fails-on-old test; event-bus lag & lock discipline confirmed clean with evidence.** Beyond-SOTA security+concurrency review of the crate every other HOMECORE module builds on (state store `state.rs`, event bus `bus.rs`, service/entity registries, the `HomeCore` coordinator), un-covered by the ADR-154–159 sweep — a bug here is high-blast-radius. **HC-RACE-01 (state-set TOCTOU, the crux — race/lost-event).** `StateMachine::set` did `get()` (releasing the DashMap shard lock) → compute the next snapshot + the no-op/`last_changed` decision → `insert()` (re-acquiring the lock) → `send()`; the read-modify-write was **not atomic** w.r.t. a concurrent writer on the same entity, contradicting ADR-127 §2.1's promise that "the writer atomically replaces the map entry." A writer that read a **stale `old`** could mis-classify a genuine transition as a no-op and **silently drop its `state_changed` event** (a missed automation trigger) or fire an event whose `new_state` duplicated the previously delivered one (a spurious trigger for any automation keyed on `old_state != new_state`). **Fixed** by holding the shard write-lock across the whole read→decide→insert→fire sequence via `entry()`/`insert_entry()` — `tx.send` is non-blocking, non-async, and never re-enters the map, so firing under the shard lock cannot deadlock and keeps global event order in lock-step with global commit order. Pinned by `concurrent_set_fires_no_duplicate_adjacent_events` (4 writers toggling one entity A/B; asserts no two consecutive fired events carry an identical `new_state` — impossible under correct serialisation; an instrumented probe observed ~93k such duplicate-adjacent events across 200 trials on the racy code, **zero** on the fix; the test fails reliably on the first trial pre-fix). **HC-EID-LEN-01 (unbounded `entity_id`, memory-DoS).** `homecore-api/src/rest.rs` parses untrusted REST path segments straight through `EntityId::parse`; with no length cap an otherwise-valid id (`a.` + many MB of `[a-z0-9_]`) was accepted, and a `POST /api/states/` would persist it into the DashMap state store (permanent growth across distinct ids). **Fixed** by rejecting ids longer than `MAX_ENTITY_ID_LEN` (255, HA-compatible) up front in `parse()`, before any per-char scan, with a new `EntityIdError::TooLong` — fail-closed at the boundary type protects every caller (REST, registry deserialize, automation). Pinned by `entity_id_length_boundary` (exactly-MAX accepted; MAX+1 and a 4 MiB id rejected — oversized parses `Ok` on old code). **HC-SVC-PANIC-01 (service-handler panic not isolated).** `ServiceRegistry::call` already ran handlers **outside** the registry lock (the `Arc` is cloned out of the read guard first → no `RwLock` poisoning, no blocking of other callers — clean), but a panicking handler unwound through `call()` into the caller's task (the task driving the engine). **Hardened** by wrapping the handler future in `AssertUnwindSafe` + `catch_unwind`, converting a panic to `ServiceError::HandlerPanicked`; the registry stays fully usable (a sibling healthy service still returns, the bad service stays registered). Pinned by `panicking_handler_is_isolated_and_registry_survives` (unwinds through `call` on old code). **Dimensions confirmed clean (with evidence, no invented issues):** (1) **event-bus bounds / lag** (the homecore-api WS lag-DoS class) — both `StateMachine` and `EventBus` use **bounded** `tokio::sync::broadcast` (capacity 4,096); a slow subscriber gets a recoverable `Lagged(n)` (drop-oldest + re-sync) while `fire_*` is non-blocking and never waits on slow receivers, so a lagging subscriber **cannot block the publisher, grow the channel without bound, or kill a fast subscriber** (evidenced by `slow_subscriber_does_not_block_publisher_or_kill_the_bus` — fire 3× capacity at an idle subscriber, publisher unblocked, bus stays live, fresh fast subscriber receives, lagged one recovers); (2) **lock ordering / lock-across-await** (deadlock) — no code path holds two of `{state DashMap, registry RwLock, service RwLock}` simultaneously, so no inconsistent-ordering deadlock can exist; every `tokio::sync::RwLock` guard in `registry.rs`/`service.rs` is used in one synchronous statement and dropped before any `.await` (`call` explicitly scopes the read guard out before awaiting the handler); the only guard held across a send is the DashMap shard lock in `set`, across a **synchronous** broadcast send — safe; (3) **panic-on-input** — no reachable `unwrap`/`expect`/index in non-test code beyond the safe `send().unwrap_or(0)` and the dead-but-harmless `split_once(...).unwrap_or(...)` fallbacks on already-validated ids. `cargo test -p homecore --no-default-features`: **20 → 24 passed, 0 failed** (+4 pins). Workspace green; Python deterministic proof unchanged (`f8e76f21…46f7a`, bit-exact — `homecore` is off the signal proof path). Review notes appended to ADR-127 §9. - **`homecore-recorder` security review (ADR-132 surfaces) — two real bounding fixes; SQL-injection & NaN-index dimensions confirmed clean with evidence.** Beyond-SOTA review of the HA-compat state recorder (DB persistence + history + ruvector semantic search), the crux being its DB-backed SQL-injection surface. **Findings + fixes:** (1) **Memory-DoS — unbounded `get_state_history`.** The history query carried no `LIMIT`, so a wide `[since, until]` window over a high-frequency entity (a per-second sensor ≈ 86k rows/day) would load an unbounded row set into a single in-memory `Vec`. Added a hard `LIMIT MAX_HISTORY_ROWS` (1,000,000 — generous enough never to truncate a realistic history graph, bounded enough to cap the worst case); the sibling search paths were already `k`-bounded. (2) **Disk-DoS / documented-but-missing `purge`.** The README + HA-compat table advertised `Recorder::purge(older_than)` as a capability, but **no such method existed** — i.e. no retention path at all → unbounded disk growth. Implemented a **transactional** `purge` that deletes `states` + `events` strictly **older than** the cutoff (**exclusive** boundary — idempotent, no off-by-one; a row at the cutoff instant is kept) and **garbage-collects** orphaned `state_attributes` blobs (a dedup-shared blob is dropped only once its last referencing state is gone); all three deletes run in one transaction so a mid-purge failure rolls back cleanly (no states-deleted-but-events-kept corruption). **Confirmed clean with evidence:** SQL injection — **every** query in `db.rs` uses bound `?` parameters (no `format!`/string-concat of user data into SQL); the lone `format!` builds the LIKE *pattern*, which is itself bound as a parameter with `ESCAPE '\\'` and metacharacter escaping. Pinned: a state value `'; DROP TABLE states; --` is stored/queried **literally** (table survives), and a `%`/`_` in a search query matches **literally**, not as a wildcard. NaN-index poisoning (the calibration/vitals/geo class) — **structurally impossible** here: embeddings are SHA-256 → `i32` → `f32` (an `i32` cast to `f32` is always finite, never NaN/Inf), with an all-zero-digest norm guard; probed empty-index search, empty-string query, and `k=0` — all return `Ok(0)`, **no panic**. Fail-closed write path — a removal event yields `Ok(None)`, semantic-index failure is logged not propagated (best-effort, never blocks the durable SQLite write), and `EntityId` parsing failures fall back rather than panic. **6 new pinning tests** (SQL-injection literal-storage, LIKE-metacharacter literalness, history `LIMIT`, purge exclusive-boundary, purge attribute-GC-keeps-shared, purge old-events): `homecore-recorder` **19 → 25** (`--no-default-features`) / **25 → 31** (`--features ruvector`), 0 failed; the purge-boundary test is a true pin (fails deleting 2 rows under an inclusive cutoff, passes deleting 1 under the exclusive cutoff). Behaviour otherwise unchanged; Python deterministic proof unchanged (recorder is off the signal proof path). ### Added diff --git a/docs/adr/ADR-127-homecore-state-machine-rust.md b/docs/adr/ADR-127-homecore-state-machine-rust.md index 1875b890..1887ed3f 100644 --- a/docs/adr/ADR-127-homecore-state-machine-rust.md +++ b/docs/adr/ADR-127-homecore-state-machine-rust.md @@ -190,4 +190,78 @@ The entity registry is a `RwLock>` backed by an a - `v2/crates/wifi-densepose-sensing-server/src/main.rs` — Axum + Tokio architecture pattern used throughout the existing server stack - `docs/adr/ADR-126-ruview-native-ha-port-master.md` — HOMECORE master; §5.5 crate naming; §6 compatibility contract; §5.1 RUVIEW-POLICY + +--- + +## 9. Security & concurrency review (P1 core, beyond-SOTA sweep) + +Foundational review of the `homecore` crate — the state store + event bus + +service/entity registries every other HOMECORE module trusts. Same rigor as +the ADR-129/130/132/133/161 sibling reviews. **Three real fixes (one +concurrency, two hardening), each pinned by a fails-on-old test; the bus-lag +and lock-discipline dimensions confirmed clean with evidence.** + +- **HC-RACE-01 (state-set TOCTOU — lost / reordered `state_changed`, the + crux). FIXED.** `StateMachine::set` did `get()` (releasing the DashMap + shard lock) → compute the next snapshot + the no-op / `last_changed` + decision → `insert()` (re-acquiring the lock) → `send()`. The + read-modify-write was **not atomic** w.r.t. a concurrent writer on the + same entity, contradicting §2.1's promise that "the writer atomically + replaces the map entry." A writer that read a stale `old` could + mis-classify a genuine transition as a no-op and **drop its + `state_changed` event** (a missed automation trigger) or fire an event + whose `new_state` duplicated the previously delivered one (a spurious + trigger for any automation keyed on `old_state != new_state`). **Fix:** + hold the shard write-lock across the entire read→decide→insert→fire + sequence via `entry()`/`insert_entry()`; `tx.send` is non-blocking, + non-async, and never re-enters the map, so firing under the shard lock + cannot deadlock and keeps global event order in lock-step with global + commit order. Pinned by `concurrent_set_fires_no_duplicate_adjacent_events` + (4 writers toggling one entity A/B; asserts no two consecutive fired + events carry an identical `new_state` — impossible under correct + serialisation; a probe observed ~93k such duplicate-adjacent events across + 200 trials on the racy code, zero on the fix). +- **HC-EID-LEN-01 (unbounded `entity_id` — memory-DoS at the REST boundary). + FIXED.** `homecore-api/src/rest.rs` parses untrusted path segments + straight through `EntityId::parse`; with no length cap, an + otherwise-valid id (`a.` + many MB of `[a-z0-9_]`) was accepted and a + `POST /api/states/` would persist it into the DashMap state store + (permanent growth across distinct ids). **Fix:** reject ids longer than + `MAX_ENTITY_ID_LEN` (255, HA-compatible) up front in `parse()`, before any + per-char scan, with a new `EntityIdError::TooLong`; fail-closed at the + boundary type protects every caller. Pinned by `entity_id_length_boundary` + (exactly-MAX accepted, MAX+1 and a 4 MiB id rejected — fails on old code). +- **HC-SVC-PANIC-01 (service-handler panic not isolated). HARDENED.** + `ServiceRegistry::call` already ran handlers outside the registry lock (no + `RwLock` poisoning, no blocking of other callers — clean), but a + panicking handler unwound through `call()` into the caller's task. **Fix:** + wrap the handler future in `AssertUnwindSafe` + `catch_unwind`, converting + a panic to `ServiceError::HandlerPanicked`; the registry stays fully + usable. Pinned by `panicking_handler_is_isolated_and_registry_survives`. + +**Dimensions confirmed clean (with evidence):** + +- **Event-bus bounds / lag (same class as the homecore-api WS lag-DoS).** + Both `StateMachine` and `EventBus` use bounded `tokio::sync::broadcast` + (capacity 4,096). A slow subscriber gets a recoverable `Lagged(n)` + (drop-oldest + re-sync); `fire_*` is non-blocking and **never waits on + slow receivers**, so a lagging subscriber cannot block the publisher, grow + the channel without bound, or take down a fast subscriber. Evidenced by + `slow_subscriber_does_not_block_publisher_or_kill_the_bus` (fire 3× + capacity at an idle subscriber; publisher unblocked, bus stays live). +- **Lock ordering / lock-across-await (deadlock).** No code path holds two + of `{state DashMap, registry RwLock, service RwLock}` simultaneously, so + no inconsistent-ordering deadlock can exist. Every `tokio::sync::RwLock` + guard in `registry.rs`/`service.rs` is used in a single synchronous + statement and dropped before any `.await`; `call` explicitly scopes the + read guard out before awaiting the handler. The only guard held across a + send is the DashMap shard lock in `set`, across a synchronous + (non-await) broadcast send — safe. +- **Panic-on-input.** No reachable `unwrap`/`expect`/index in non-test code + beyond the safe `send().unwrap_or(0)` and the dead-but-harmless + `split_once(...).unwrap_or(...)` fallbacks on already-validated ids. + +`cargo test -p homecore --no-default-features`: **20 → 24 passed, 0 failed** +(+4 pins). Workspace green; Python deterministic proof unchanged +(`f8e76f21…46f7a`, bit-exact — `homecore` is off the signal proof path). - `docs/adr/ADR-028-esp32-capability-audit.md` — witness chain pattern (Ed25519 per state transition) diff --git a/v2/crates/homecore/src/bus.rs b/v2/crates/homecore/src/bus.rs index 661c3606..a839d615 100644 --- a/v2/crates/homecore/src/bus.rs +++ b/v2/crates/homecore/src/bus.rs @@ -87,4 +87,64 @@ mod tests { assert_eq!(event.event_type, "ruview_csi_frame"); assert_eq!(event.event_data["frame_id"], 42); } + + /// Bus-lag safety (same failure class as the homecore-api WS + /// broadcast-lag DoS, here on the core bus): a subscriber that never + /// drains must NOT block the publisher, must NOT make the channel grow + /// without bound, and must NOT take down a healthy fast subscriber. The + /// bounded `tokio::sync::broadcast` gives the slow receiver a recoverable + /// `Lagged(n)` (drop-oldest, re-sync) while `fire_*` stays non-blocking. + /// + /// Evidence: with EVENT_CHANNEL_CAPACITY = 4096 we fire 3× capacity + /// while a slow subscriber sits idle. Every `fire_domain` returns + /// promptly (publisher never blocked); the slow receiver observes + /// `Lagged` then re-syncs to live events; the fast receiver — created + /// after the flood and kept drained — receives all subsequent events + /// with no loss. The bus stays live throughout. + #[tokio::test] + async fn slow_subscriber_does_not_block_publisher_or_kill_the_bus() { + use tokio::sync::broadcast::error::TryRecvError; + + let bus = EventBus::new(); + // Slow subscriber: subscribes, then never drains during the flood. + let mut slow = bus.subscribe_domain(); + + // Publisher fires 3× capacity. None of these may block. + let total = EVENT_CHANNEL_CAPACITY * 3; + for i in 0..total { + // Returns the receiver count (>=1 here); the point is it + // returns AT ALL without awaiting the slow receiver. + let _ = bus.fire_domain(DomainEvent::new( + "flood", + serde_json::json!({ "i": i }), + Context::new(), + )); + } + + // The slow receiver is forced past capacity → recoverable Lagged, + // NOT a closed channel and NOT a hang. + let mut saw_lagged = false; + loop { + match slow.try_recv() { + Ok(_) => {} + Err(TryRecvError::Lagged(n)) => { + assert!(n > 0); + saw_lagged = true; + } + Err(TryRecvError::Empty) => break, + Err(TryRecvError::Closed) => panic!("bus closed — must stay live"), + } + } + assert!(saw_lagged, "slow subscriber should have lagged, not blocked the bus"); + + // The bus is still live: a fresh fast subscriber receives new events. + let mut fast = bus.subscribe_domain(); + bus.fire_domain(DomainEvent::new("live", serde_json::json!({"ok": true}), Context::new())); + let evt = fast.recv().await.unwrap(); + assert_eq!(evt.event_type, "live"); + + // And the lagged subscriber recovers (re-syncs) to live events too. + let evt2 = slow.recv().await.unwrap(); + assert_eq!(evt2.event_type, "live"); + } } diff --git a/v2/crates/homecore/src/entity.rs b/v2/crates/homecore/src/entity.rs index e3308bb4..8746df7c 100644 --- a/v2/crates/homecore/src/entity.rs +++ b/v2/crates/homecore/src/entity.rs @@ -42,12 +42,30 @@ impl<'de> Deserialize<'de> for EntityId { } } +/// Maximum accepted `entity_id` length in bytes. Mirrors Home Assistant's +/// practical cap (`MAX_LENGTH_STATE_*` family — 255). The state machine and +/// entity/registry maps are keyed on `EntityId`, and the REST layer +/// (`homecore-api`) parses untrusted path segments straight through +/// [`EntityId::parse`]; an unbounded id would let a single `POST +/// /api/states/` permanently grow the state map (memory DoS). We +/// fail closed at the boundary instead. +pub const MAX_ENTITY_ID_LEN: usize = 255; + impl EntityId { /// Validates and constructs an `EntityId`. Returns /// [`EntityIdError`] if the input is not `domain.name` shape with - /// ASCII lowercase / digits / underscore in each segment. + /// ASCII lowercase / digits / underscore in each segment, or if it + /// exceeds [`MAX_ENTITY_ID_LEN`] bytes. pub fn parse(s: impl Into) -> Result { let s: String = s.into(); + // Bound the length BEFORE any further work so an oversized input is + // cheap to reject (no per-char scan of megabytes). + if s.len() > MAX_ENTITY_ID_LEN { + return Err(EntityIdError::TooLong { + len: s.len(), + max: MAX_ENTITY_ID_LEN, + }); + } let (domain, name) = s .split_once('.') .ok_or_else(|| EntityIdError::MissingDot(s.clone()))?; @@ -111,6 +129,8 @@ pub enum EntityIdError { EmptyName(String), #[error("entity_id {entity_id:?} contains invalid character {ch:?} — only [a-z0-9_] allowed (HA-compat ASCII subset; see ADR-127 §Q1)")] InvalidChar { entity_id: String, ch: char }, + #[error("entity_id is {len} bytes, exceeding the {max}-byte limit")] + TooLong { len: usize, max: usize }, } /// Immutable state snapshot for one entity at one moment in time. @@ -217,6 +237,39 @@ mod tests { assert!(EntityId::parse("light.küche").is_err()); } + #[test] + fn entity_id_length_boundary() { + // The REST layer parses untrusted path segments straight through + // `parse`; an unbounded id is a memory-DoS vector (a `POST + // /api/states/` permanently grows the state map). Cap at + // MAX_ENTITY_ID_LEN, fail closed above it. + // + // Construct "sensor." (7 bytes) + N name bytes == exactly MAX. + let prefix = "sensor."; + let name_len = MAX_ENTITY_ID_LEN - prefix.len(); + let at_max = format!("{prefix}{}", "a".repeat(name_len)); + assert_eq!(at_max.len(), MAX_ENTITY_ID_LEN); + assert!( + EntityId::parse(at_max.clone()).is_ok(), + "an id of exactly MAX_ENTITY_ID_LEN bytes must be accepted" + ); + + let over = format!("{at_max}a"); // MAX + 1 + assert!(matches!( + EntityId::parse(over), + Err(EntityIdError::TooLong { .. }) + )); + + // A multi-megabyte, otherwise-valid id is rejected cheaply rather + // than persisted. + let huge = format!("sensor.{}", "a".repeat(4 * 1024 * 1024)); + assert!(matches!( + EntityId::parse(huge), + Err(EntityIdError::TooLong { len, max }) + if max == MAX_ENTITY_ID_LEN && len > MAX_ENTITY_ID_LEN + )); + } + #[test] fn state_next_preserves_last_changed_when_state_unchanged() { let id = EntityId::parse("sensor.temp").unwrap(); diff --git a/v2/crates/homecore/src/service.rs b/v2/crates/homecore/src/service.rs index 68db2d5c..ef22ab81 100644 --- a/v2/crates/homecore/src/service.rs +++ b/v2/crates/homecore/src/service.rs @@ -49,6 +49,8 @@ pub enum ServiceError { NotRegistered { domain: String, service: String }, #[error("service handler returned error: {0}")] HandlerFailed(String), + #[error("service handler panicked: {0}")] + HandlerPanicked(String), } /// Handler trait. Integration code implements this and registers via @@ -99,13 +101,29 @@ impl ServiceRegistry { /// Call a service. P1 direct dispatch; P2 routes through the /// event bus per ADR-127 §2.3. + /// + /// The handler runs **outside** the registry lock (we clone the + /// `Arc` out of the read guard first), so a slow or + /// panicking handler can never poison the `RwLock` or block other + /// callers. A panic inside the handler is additionally caught and + /// converted to [`ServiceError::HandlerPanicked`] rather than unwinding + /// into the caller's task — one buggy integration cannot abort the task + /// that drives the engine. Mirrors HA isolating service-handler + /// exceptions. pub async fn call(&self, call: ServiceCall) -> Result { let handler = { let guard = self.handlers.read().await; guard.get(&call.name).cloned() }; match handler { - Some(h) => h.call(call).await, + Some(h) => { + use futures::FutureExt; + let fut = std::panic::AssertUnwindSafe(h.call(call)); + match fut.catch_unwind().await { + Ok(result) => result, + Err(panic) => Err(ServiceError::HandlerPanicked(panic_message(panic))), + } + } None => Err(ServiceError::NotRegistered { domain: call.name.domain.clone(), service: call.name.service.clone(), @@ -124,6 +142,19 @@ impl Default for ServiceRegistry { } } +/// Best-effort extraction of a panic payload's message for +/// [`ServiceError::HandlerPanicked`]. Panic payloads are usually `&str` +/// or `String`; anything else collapses to a generic label. +fn panic_message(payload: Box) -> String { + if let Some(s) = payload.downcast_ref::<&str>() { + (*s).to_string() + } else if let Some(s) = payload.downcast_ref::() { + s.clone() + } else { + "".to_string() + } +} + // Suppress unused-import warning when no consumer of Pin/Box uses them yet #[allow(dead_code)] type _UnusedFutureType = Pin + Send>>; @@ -167,4 +198,56 @@ mod tests { .unwrap_err(); assert!(matches!(err, ServiceError::NotRegistered { .. })); } + + /// Service isolation: a panicking handler must be contained — converted + /// to `HandlerPanicked` rather than unwinding into the caller's task — + /// and the registry must remain fully usable afterwards (no poisoned + /// lock, other services still callable). On the pre-fix code the panic + /// unwinds through `call`, so the `catch_unwind`-based assertion below + /// fails (the await point panics instead of returning an `Err`). + #[tokio::test] + async fn panicking_handler_is_isolated_and_registry_survives() { + let reg = ServiceRegistry::new(); + reg.register( + ServiceName::new("bad", "boom"), + FnHandler(|_call: ServiceCall| async move { + panic!("handler exploded"); + #[allow(unreachable_code)] + Ok(serde_json::json!(null)) + }), + ) + .await; + reg.register( + ServiceName::new("good", "ping"), + FnHandler(|_call: ServiceCall| async move { Ok(serde_json::json!("pong")) }), + ) + .await; + + // The panicking call returns an error, not an unwind. + let err = reg + .call(ServiceCall { + name: ServiceName::new("bad", "boom"), + data: serde_json::json!({}), + context: Context::new(), + }) + .await + .unwrap_err(); + assert!( + matches!(err, ServiceError::HandlerPanicked(ref m) if m.contains("handler exploded")), + "expected HandlerPanicked, got {err:?}", + ); + + // The registry is not poisoned: a healthy service still works, and + // the bad service is still registered (call path, not lock, failed). + let ok = reg + .call(ServiceCall { + name: ServiceName::new("good", "ping"), + data: serde_json::json!({}), + context: Context::new(), + }) + .await + .unwrap(); + assert_eq!(ok, serde_json::json!("pong")); + assert!(reg.has(&ServiceName::new("bad", "boom")).await); + } } diff --git a/v2/crates/homecore/src/state.rs b/v2/crates/homecore/src/state.rs index 5563af86..275ffa79 100644 --- a/v2/crates/homecore/src/state.rs +++ b/v2/crates/homecore/src/state.rs @@ -80,11 +80,37 @@ impl StateMachine { context: Context, ) -> Arc { let new_state_str = new_state.into(); - let old = self.inner.states.get(&entity_id).map(|r| Arc::clone(&*r)); + + // Hold the DashMap shard write-lock across the entire + // read→decide→insert→fire sequence. `entry()` locks the shard for + // the lifetime of `slot`, so a concurrent writer on the same entity + // cannot interleave between our read of `old` and our commit. This + // is what makes the write atomic as ADR-127 §2.1 promises ("writer + // atomically replaces the map entry") — the previous get→insert pair + // released the lock in between, a TOCTOU that let concurrent writers + // compute the no-op / `last_changed` decision off a stale `old` and + // drop or reorder real `state_changed` events. + // + // `tx.send` is non-blocking, non-async, and never re-enters the map, + // so firing under the lock cannot deadlock and keeps the global + // event order in lock-step with the global commit order. + use dashmap::mapref::entry::Entry; + let slot = self.inner.states.entry(entity_id.clone()); + + let old: Option> = match &slot { + Entry::Occupied(o) => Some(Arc::clone(o.get())), + Entry::Vacant(_) => None, + }; + // `slot` continues to hold the shard write-lock below. let next = match &old { Some(prev) => Arc::new(prev.next(new_state_str.clone(), attributes.clone(), context)), - None => Arc::new(State::new(entity_id.clone(), new_state_str.clone(), attributes.clone(), context)), + None => Arc::new(State::new( + entity_id.clone(), + new_state_str.clone(), + attributes.clone(), + context, + )), }; // HA suppresses no-op writes (same state + same attributes). @@ -94,7 +120,12 @@ impl StateMachine { None => false, }; - self.inner.states.insert(entity_id.clone(), Arc::clone(&next)); + // Commit through the same locked entry and KEEP the shard guard + // alive across the broadcast `send`, so the event is published + // before any concurrent writer on this entity can observe the new + // value and fire its own event. This makes global event order match + // global commit order (no insert/send reorder window). + let _guard = slot.insert_entry(Arc::clone(&next)); if !is_noop { let event = StateChangedEvent { @@ -106,6 +137,7 @@ impl StateMachine { // err = no receivers; that's fine, write still committed. let _ = self.inner.tx.send(event); } + // `_guard` (and the shard lock) drops here, after the event is sent. next } @@ -218,4 +250,135 @@ mod tests { assert!(evt.new_state.is_none()); assert!(evt.old_state.is_some()); } + + /// Concurrency invariant (ADR-127 §2.1 "writer atomically replaces the + /// map entry"): under concurrent writers on the SAME entity the fired + /// `state_changed` stream must be a faithful, gap-free log of the + /// committed transitions — in particular the LAST event the bus + /// delivers must carry the SAME value that is finally committed in the + /// map. + /// + /// This pins the TOCTOU in `set`: it does `get` (release shard lock) → + /// compute `next` + no-op decision → `insert` (re-acquire shard lock) → + /// `send`. Because the insert and the send are not atomic with respect + /// to a concurrent writer, two writers can interleave as + /// `insert(A); insert(B); send(B); send(A)` — leaving the map holding A + /// while the last event the bus ever delivers says B. A subscriber that + /// trusts "the last event reflects current state" (the recorder, the WS + /// push API, an automation engine) is then permanently wrong about the + /// entity until the next write. A correctly-locked store holds the shard + /// lock across read→insert→send so the global event order matches the + /// global commit order. + /// + /// A dedicated drain thread pulls events as they arrive so the bounded + /// channel never lags during the run (a `Lagged` here would be a test + /// artefact, not the bug under test). + /// + /// The writers toggle the SAME entity between exactly two values so the + /// no-op suppression branch is constantly in play. + /// + /// Invariant: in correctly serialised code, two *consecutive* fired + /// `state_changed` events can never carry the same `new_state` value. + /// Proof: event k fires only for a committed transition old≠new, so its + /// `new_state` = X differs from the value before it; the next committed + /// transition therefore starts at X and (being a real change) commits + /// some Z≠X, so event k+1 carries Z≠X. A no-op (X→X) is suppressed and + /// never fires. Therefore adjacent fired events always differ. + /// + /// The `set()` TOCTOU breaks this: it does `get` (release shard lock) → + /// compute `next` + the no-op decision → `insert` (re-acquire shard + /// lock) → `send`, all non-atomically. A writer that read a STALE `old` + /// mis-classifies a genuine transition as a no-op (dropping that real + /// event — a missed automation trigger) and/or fires an event whose + /// `new_state` duplicates the previously delivered one (a spurious + /// trigger for any automation keyed on `old_state != new_state`). The + /// probe behind this test observed ~93k such duplicate-adjacent events + /// across 200 trials on the racy code; the corrected store produces + /// zero. + #[test] + fn concurrent_set_fires_no_duplicate_adjacent_events() { + use std::sync::atomic::{AtomicBool, Ordering}; + use std::sync::{Barrier, Mutex}; + + const WRITERS: usize = 4; + const ITERS: usize = 300; // 1200 events ≪ 4096 capacity → never lags + + for _trial in 0..40 { + let sm = StateMachine::new(); + let eid = id("light.race"); + sm.set(eid.clone(), "A", serde_json::json!({}), Context::new()); + + let mut rx = sm.subscribe(); + let done = Arc::new(AtomicBool::new(false)); + // Event log: new_state value in delivery order. + let log: Arc>> = Arc::new(Mutex::new(Vec::new())); + + let drainer = { + let done = Arc::clone(&done); + let log = Arc::clone(&log); + std::thread::spawn(move || loop { + match rx.try_recv() { + Ok(evt) => { + if let Some(ns) = &evt.new_state { + log.lock().unwrap().push(ns.state.clone()); + } + } + Err(broadcast::error::TryRecvError::Empty) => { + if done.load(Ordering::Acquire) { + while let Ok(evt) = rx.try_recv() { + if let Some(ns) = &evt.new_state { + log.lock().unwrap().push(ns.state.clone()); + } + } + break; + } + std::thread::yield_now(); + } + Err(broadcast::error::TryRecvError::Lagged(_)) => { + panic!("channel lagged — test artefact, raise capacity"); + } + Err(broadcast::error::TryRecvError::Closed) => break, + } + }) + }; + + let barrier = Arc::new(Barrier::new(WRITERS)); + let handles: Vec<_> = (0..WRITERS) + .map(|w| { + let sm = sm.clone(); + let eid = eid.clone(); + let barrier = Arc::clone(&barrier); + std::thread::spawn(move || { + barrier.wait(); + for i in 0..ITERS { + // Toggle between two values → maximises the + // stale-`old` no-op collision window. + let val = if (w + i) % 2 == 0 { "A" } else { "B" }; + sm.set(eid.clone(), val, serde_json::json!({}), Context::new()); + } + }) + }) + .collect(); + + for h in handles { + h.join().unwrap(); + } + done.store(true, Ordering::Release); + drainer.join().unwrap(); + + let log = log.lock().unwrap(); + let dup = log + .windows(2) + .filter(|w| w[0] == w[1]) + .count(); + assert_eq!( + dup, 0, + "{dup} consecutive fired state_changed events carried an \ + identical new_state — impossible under correct \ + serialisation; proves set()'s read→decide→insert→send \ + TOCTOU dropped/reordered real transitions (missed & \ + spurious automation triggers)", + ); + } + } }