diff --git a/CHANGELOG.md b/CHANGELOG.md index a63ff7fa..a766aebd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - **ADR-260: RuField MFS — the open specification for camera-free multimodal field sensing.** A common event / tensor / calibration / privacy / provenance model that sits *above* WiFi CSI/CIR/BFLD, UWB, BLE Channel Sounding, mmWave radar, ultrasound, subsonic, infrared, and future quantum sensors (each modality emits a normalized `FieldEvent` → `FieldTensor` → `FusionGraph` → `PrivacyClass` → `ProvenanceReceipt`). Published as a **standalone repo** [`ruvnet/rufield`](https://github.com/ruvnet/rufield) and vendored here as the `vendor/rufield` submodule (the `vendor/rvcsi` pattern — not a `v2/` workspace member). The v0.1 reference stack is a self-contained 6-crate Rust workspace (`rufield-core`, `-provenance` [sha256 + ed25519], `-privacy` [P0–P5 guard], `-adapters` [deterministic `SyntheticSim` across wifi_csi/mmwave_radar/infrared_thermal], `-fusion` [graph + TOML weighted-Bayes rules → 7 room-state inferences], `-bench` [deterministic runner + the §31 acceptance test]). **60 tests / 0 failed, clippy-clean.** §27 acceptance criteria 1–8 and 10 PASS; the live dashboard (9) is deferred. **All benchmark metrics are SYNTHETIC** (scored against the simulator's own ground truth — presence/breathing/bed_exit/room_transition F1 = 1.000, nocturnal_scratch 0.923 reported honestly, p95 latency ~0.01 ms, provenance coverage 100%, 0 privacy violations) — they prove the pipeline recovers known truth, **not** field accuracy; real hardware adapters (ESP32 CSI, mmWave, thermal IR) are a documented roadmap item, none validated in v0.1. The Python deterministic proof is unchanged (rufield is off the signal-processing proof path). ### Security +- **`homecore-api` (HA-wire-compat REST + WebSocket) beyond-SOTA security review — `GET /api/` auth-gate gap FIXED + WS event-stream lag-DoS robustness FIXED; auth/traversal/injection/info-leak dimensions confirmed clean (ADR-161 / ADR-130).** Network-facing review of the HA-wire-compat API layer (remote attack surface), not covered by the ADR-154–159 sweep — same scrutiny the sibling `wifi-densepose-engine` and `-bfld` reviews got. **Two real bugs fixed, each pinned by a fails-on-old test.** + - **HC-API-AUTH-01 (auth-gate gap, LOW) — `GET /api/` was unauthenticated; FIXED.** Every sibling REST route (`/api/config`, `/api/states`, `/api/services`, …) calls `BearerAuth::from_headers` first, but `rest::api_root` took no headers and unconditionally returned `200 {"message":"API running."}`. HA's `APIStatusView` inherits `requires_auth = True`, so an unauthenticated/wrong-token request to `/api/` must be **401** — HA clients use this status route as a token-validation probe, and a 200 both told a bad-token client its token was good and let an unauthenticated party confirm a live endpoint. Severity is LOW (the body is a static string — no entity/state data leaks), reported at true severity, not inflated. **Fix:** `api_root` now validates the bearer like its siblings. Pinned by `api_root_rejects_missing_bearer` + `api_root_rejects_wrong_bearer` (both 200→assert-401 on old code) and guarded by `api_root_accepts_correct_bearer`. + - **HC-WS-LAG-01 (DoS-adjacent silent failure, LOW) — `subscribe_events` killed the event stream on a broadcast lag; FIXED.** The per-subscription task matched `Err(_) => break` on both `broadcast::Receiver::recv()` arms, but `Lagged(n)` (a slow consumer falling >4,096 events — `EVENT_CHANNEL_CAPACITY` — behind) is **recoverable**: the bus doc itself says "Lagged receivers must re-sync", and HA's WS contract keeps the subscription alive across a lag. The old code treated the first lag as fatal, so after an event burst the client's stream went **permanently silent** with no error frame — a self-inflicted event-delivery DoS under load. **Fix:** `Lagged(_) => continue` (skip the dropped window, re-sync), `Closed => break`, on both the system and domain arms. Pinned by `subscription_survives_broadcast_lag` (subscribes, floods 6,000 filtered events past the 4,096 capacity to force a `Lagged`, then asserts a subsequent subscribed event is still delivered — 5s-timeout panic on old code). + - **Dimensions confirmed clean (with evidence, no invented issues):** (1) **AuthN/AuthZ** — all 7 other REST handlers (`get_config`/`get_states`/`get_state`/`set_state`/`delete_state`/`get_services`/`call_service`) gate on `BearerAuth::from_headers` → `LongLivedTokenStore::is_valid` before any work; the WS handshake validates the `auth` token against the **same** store before entering the command loop and the privileged commands are unreachable pre-`auth_ok` (HC-WS-01, already fixed). Token compare is a `HashSet::contains` (content-independent timing, not the byte-`==` oracle ADR-157 §B4 fixed in hardware) — no timing-oracle finding. No route skips the gate, no result-ignored check, no default/empty token accepted (`is_valid` rejects empty internally; `from_env` is non-dev). (2) **Path traversal** — **no route maps user input to a filesystem path** (state lives in an in-memory `DashMap`); `:entity_id` is funneled through `EntityId::parse`, a strict `[a-z0-9_]+\.[a-z0-9_]+` ASCII allowlist that rejects `..`, `/`, `\`, and absolute paths. No traversal surface exists. (3) **Injection** — no SQL, no shell/subprocess, no `format!`-into-response; `call_service`/`set_state` bodies are typed `serde_json::Value` passed to the in-process service registry (matches HA). (4) **Info-leak** — `ApiError` maps to fixed status + a `{message}` derived only from typed variants; `call_service`'s `ServiceError::HandlerFailed(String)` is integration-controlled (mirrors HA surfacing the handler error), not framework internals/paths/stack-traces (no ADR-080-class leak). (5) **CORS** is an explicit allowlist (`allow_credentials(false)`, HC-05 already fixed), not `permissive()`. (6) **De-magic** — no bare security-relevant literals in this crate worth extracting (`EVENT_CHANNEL_CAPACITY` already named in `homecore`; CORS dev-default ports are documented). `homecore-api --no-default-features`: **25→29 tests**, 0 failed (+2 api-root auth, +1 api-root accept-guard, +1 WS lag-survival); workspace green; Python deterministic proof unchanged (homecore-api is off the signal proof path). Review notes appended to ADR-161. - **`wifi-densepose-engine` governed-trust review — witness domain-separation gap FIXED + privacy monotonicity confirmed clean (ADR-137 / ADR-141 / ADR-032).** Beyond-SOTA correctness+security review of the security-critical composition root (the cycle enforcing RuView's privacy guarantees), not covered by the ADR-154–159 sweep. **One real witness-integrity bug fixed.** `witness_of` concatenated `model_version`, `calibration_version`, and `privacy_decision` boundary-to-boundary and left the variable-length evidence list without a count, so a string straddling a field boundary collided with a *different* trust decision — e.g. a per-room adapter id (ADR-150 §3.4, operator-influenceable) absorbing the leading bytes of the calibration epoch (`model="…cal:00a"`,`cal="b"`) yields the same witness as `model="…"`,`cal="cal:00ab"`. Two distinct privacy-relevant input tuples → one witness defeats the ADR-137 §2.7 "any privacy-relevant delta → different witness" tamper/drift audit. **Fix:** domain-tag the BLAKE3 hash (`ruview.engine.witness.v1`), write an explicit evidence count, and **length-prefix every field** (8-byte LE length ‖ bytes) — unambiguous framing regardless of contents. Witness-layout change by design (prior witness bytes invalidated); downstream consumers (`engine_bridge`, rufield) assert only witness *relationships* (`assert_ne`/`assert_eq` across runs), never absolute bytes, so nothing breaks. Pinned by two fails-on-old tests: `witness_distinguishes_model_calibration_boundary`, `witness_distinguishes_evidence_model_boundary`. **Dimensions confirmed clean (with evidence, no invented issues):** (1) **privacy monotonicity** — `effective_class` is recomputed each cycle from the active mode's floor with at most a single-step `demote_one` (clamped at `Restricted`), no cross-cycle state, proven over **all 5 modes** by `forced_contradiction_never_relaxes_class` (forced contradiction only ever raises the class byte; clean cycle == base); (2) **fail-closed** — empty cycle errors with no degenerate output (`empty_cycle_fails_closed`), single-node boundary characterized (`single_node_cycle_is_well_formed`), NaN coupling → `max(0.0)`→absent edge→at-risk (more restrictive); (3) **witness determinism** — no HashMap iteration / float formatting feeds the hash; (4) **mesh_guard** (ADR-032) — partition-risk → demotion path verified, thresholds already named documented fields. De-magicked the engine-construction literals (coherence accept gate, ADR-143 SLAM discovery + static-anchor thresholds) into named documented consts, value-identical, pinned by `engine_constants_match_prior_values`. `wifi-densepose-engine --no-default-features`: **27→33 tests**, 0 failed (+2 witness, +1 monotonicity property, +2 fail-closed boundary, +1 de-magic pin). Python deterministic proof unchanged (`f8e76f21…46f7a`, bit-exact — the engine is off the signal proof path). Review notes appended to ADR-137 (witness) and ADR-141 (monotonicity). - **ADR-141 BFLD privacy-bypass closed — `process_to_frame` now routes the payload through `PrivacyGate` (`wifi-densepose-bfld`).** `BfldPipeline::process_to_frame` stamped the emitted `BfldFrame` header with the active `PrivacyClass` but serialized the caller-supplied `BfldPayload` **unchanged** via `BfldFrame::from_payload`. A frame labeled `Anonymous`(2) or `Restricted`(3) therefore carried the full identity-leaky `compressed_angle_matrix` (the beamforming-angle identity surface) + amplitude/phase proxies + `csi_delta` — exactly the sections `PrivacyGate::demote` is documented and tested (`privacy_gate_demote.rs`) to strip at those classes. Because a `NetworkSink` accepts class ≥ `Derived`(1), such a frame would publish the identity surface across the node boundary despite its restrictive class byte; the class byte lied about payload content. **Fix:** after building the frame at the active class, apply `PrivacyGate::demote` to the same class — a no-op class transition that strips the sections that class forbids (research classes `Raw`/`Derived` keep the full payload). Pinned by three fails-on-old tests in `pipeline_to_frame.rs` (`…_at_anonymous_strips_identity_leaky_sections`, `…_in_privacy_mode_strips_amplitude_and_phase` — both FAILED pre-fix; `…_at_derived_preserves_full_payload` guards against over-stripping). Grade: privacy-bypass FIXED + regression-pinned. - **ADR-157 Milestone-1 B4 - constant-time HMAC sync-beacon tag compare (`wifi-densepose-hardware`).** `AuthenticatedBeacon::verify` compared the 8-byte HMAC-SHA256 tag with `self.hmac_tag == expected`, which short-circuits on the first differing byte and leaks, through verification latency, how many leading bytes an attacker's forged tag matched - a byte-by-byte tag-recovery oracle (~256*N trials instead of 256^N). Replaced with a hand-rolled branch-free `constant_time_tag_eq` (XOR-accumulate every byte difference into a single `u8`, no early exit, `#[inline(never)]` + `core::hint::black_box` to stop the optimizer reintroducing a short-circuit or a non-constant-time `memcmp`). **No new dependency** - ADR-157 had deferred this only to avoid adding the `subtle` crate; a fixed 8-byte compare needs none. Grade MEASURED (constant-time *construction*; micro-timing on a noisy host is a smoke check only, gated `#[ignore]`). Pinned by `tag_compare_is_constant_time_shape` (equal/first-differ/last-differ/all-differ/length-mismatch + an end-to-end `verify()` last-byte tamper), proven to fail on a last-byte-skipping constant-time bug. ADR-157 §8 B4 -> RESOLVED. diff --git a/docs/adr/ADR-161-homecore-server-layer-security.md b/docs/adr/ADR-161-homecore-server-layer-security.md index 4404c608..6fea0862 100644 --- a/docs/adr/ADR-161-homecore-server-layer-security.md +++ b/docs/adr/ADR-161-homecore-server-layer-security.md @@ -265,3 +265,74 @@ Result at time of writing (all 0 failed): perform (B5). - Files kept under the 500-line guideline (`engine.rs` 462; behavioral tests moved to `tests/engine_behaviors.rs`). + +## Addendum — `homecore-api` follow-up security review (beyond-SOTA pass) + +A later network-facing review of `homecore-api` (the remote REST + WS attack +surface) — independent of the ADR-154–159 sweep — found and fixed two real +issues the original M7 pass (which focused on the WS auth bypass HC-WS-01, the +reply-theater HC-WS-02, and the bin token provisioning HC-WS-08) did not catch. +Both are LOW severity and reported at true severity. + +### HC-API-AUTH-01 — `GET /api/` was unauthenticated (FIXED) + +`rest::api_root` took no headers and unconditionally returned +`200 {"message":"API running."}`, while every sibling route gates on +`BearerAuth::from_headers`. HA's `APIStatusView` inherits `requires_auth = True`, +so `/api/` must return **401** for a missing/wrong bearer. HA clients use the +status route as a token-validation probe; a 200 told a bad-token client its +token was valid and let an unauthenticated party confirm a live endpoint. +LOW severity (the body is a static string; no entity/state data leaks). + +**Fix:** `api_root(headers, State)` now validates the bearer like `get_config`. +**Pinned by** (fail-on-old, `tests/server_bin_auth.rs`): +`api_root_rejects_missing_bearer`, `api_root_rejects_wrong_bearer` (both 200→401), +guarded by `api_root_accepts_correct_bearer` (still 200 with a valid token). + +### HC-WS-LAG-01 — `subscribe_events` killed the stream on a broadcast lag (FIXED) + +The per-subscription task matched `Err(_) => break` on both broadcast +`recv()` arms. `RecvError::Lagged(n)` (a slow consumer falling +>`EVENT_CHANNEL_CAPACITY` = 4,096 events behind) is **recoverable** — the bus +doc says "Lagged receivers must re-sync" and HA keeps the subscription alive +across a lag. The old code treated the first lag as fatal, so after an event +burst the client's stream went permanently silent with no error frame — a +self-inflicted event-delivery DoS under load. + +**Fix:** `Lagged(_) => continue` (skip the dropped window, re-sync), +`Closed => break`, on both the system and domain arms of the `select!`. +**Pinned by** `subscription_survives_broadcast_lag` (`tests/ws_handshake.rs`): +subscribes to a filtered event type, floods 6,000 unrelated events past the +4,096 capacity to force a `Lagged`, then asserts a subsequent subscribed event +is still delivered (old code: 5s-timeout panic). + +### Dimensions confirmed clean (with evidence) + +- **AuthN/AuthZ** — all 7 other REST handlers gate on `BearerAuth::from_headers` + → `LongLivedTokenStore::is_valid` before any work; the WS handshake validates + the `auth` token against the same store before the command loop, and + privileged commands are unreachable pre-`auth_ok`. Token compare is + `HashSet::contains` (content-independent timing — not the byte-`==` oracle of + ADR-157 §B4), so no timing-oracle finding. No route skips the gate; no + result-ignored check; no default/empty token accepted. +- **Path traversal** — no route maps user input to a filesystem path (state is an + in-memory `DashMap`); `:entity_id` passes through `EntityId::parse`, a strict + `[a-z0-9_]+\.[a-z0-9_]+` ASCII allowlist that rejects `..`, `/`, `\`, and + absolute paths. No traversal surface. +- **Injection** — no SQL, no shell/subprocess, no `format!`-into-response; + service/state bodies are typed `serde_json::Value` handed to the in-process + registry (HA-equivalent). +- **Info-leak** — `ApiError` maps to fixed status + a typed `{message}`; + `ServiceError::HandlerFailed(String)` is integration-controlled (HA surfaces + the handler error too), never framework internals/paths/stack-traces — no + ADR-080-class leak. +- **CORS** — explicit allowlist with `allow_credentials(false)` (HC-05), + not `permissive()`. +- **De-magic** — no bare security-relevant literals in the crate worth + extracting (`EVENT_CHANNEL_CAPACITY` is already named in `homecore`; CORS + dev-default ports are documented). + +**Tests:** `homecore-api --no-default-features` **25 → 29** (+2 api-root auth, ++1 api-root accept-guard, +1 WS lag-survival), 0 failed. Workspace green. +Python deterministic proof unchanged (homecore-api is off the signal proof +path). diff --git a/v2/crates/homecore-api/src/rest.rs b/v2/crates/homecore-api/src/rest.rs index b2965fd3..ba5f5dbf 100644 --- a/v2/crates/homecore-api/src/rest.rs +++ b/v2/crates/homecore-api/src/rest.rs @@ -12,8 +12,20 @@ use crate::state::SharedState; #[derive(Serialize)] pub struct ApiRunning { message: &'static str } -pub async fn api_root() -> Json { - Json(ApiRunning { message: "API running." }) +/// `GET /api/` — the HA `APIStatusView` ("API running." ping). +/// +/// Security (HC-API-AUTH-01): HA's `APIStatusView` inherits +/// `requires_auth = True` from `HomeAssistantView`, so an unauthenticated +/// (or wrong-token) request to `/api/` returns **401**, not 200. HA +/// clients (and the companion app) rely on this status route as a +/// *token-validation probe* — a 200 here would tell a client a bad token +/// is good, and would let an unauthenticated party confirm a live +/// HOMECORE-API endpoint. The P2 handler skipped the bearer gate that +/// every sibling route applies; this restores wire-compat by validating +/// the bearer like `get_config`/`get_states` before replying. +pub async fn api_root(headers: HeaderMap, State(s): State) -> ApiResult> { + let _ = BearerAuth::from_headers(&headers, s.tokens()).await?; + Ok(Json(ApiRunning { message: "API running." })) } #[derive(Serialize)] diff --git a/v2/crates/homecore-api/src/ws.rs b/v2/crates/homecore-api/src/ws.rs index cea9f77f..352cbec1 100644 --- a/v2/crates/homecore-api/src/ws.rs +++ b/v2/crates/homecore-api/src/ws.rs @@ -298,7 +298,17 @@ impl Connection { } } Ok(_) => {} - Err(_) => break, + // A slow consumer that falls >4,096 events behind + // gets `Lagged(n)`, which is RECOVERABLE: the bus + // doc (`bus.rs` §"Lagged receivers must re-sync") + // and HA's WS contract both keep the subscription + // alive across a lag. The pre-fix `Err(_) => break` + // treated `Lagged` as fatal, silently killing the + // client's event stream on a burst (HC-WS-LAG-01). + // Skip the dropped window and continue; only a + // `Closed` sender ends the task. + Err(broadcast::error::RecvError::Lagged(_)) => continue, + Err(broadcast::error::RecvError::Closed) => break, }, evt = domain_rx.recv() => match evt { Ok(de) => { @@ -316,7 +326,12 @@ impl Connection { if tx_clone.send(payload.to_string()).is_err() { break; } } } - Err(_) => break, + // Same recoverable-lag handling as the system arm + // above (HC-WS-LAG-01): a lagged domain-event + // receiver re-syncs and continues; only `Closed` + // terminates the subscription. + Err(broadcast::error::RecvError::Lagged(_)) => continue, + Err(broadcast::error::RecvError::Closed) => break, } } } diff --git a/v2/crates/homecore-api/tests/server_bin_auth.rs b/v2/crates/homecore-api/tests/server_bin_auth.rs index feea856e..a1f886c1 100644 --- a/v2/crates/homecore-api/tests/server_bin_auth.rs +++ b/v2/crates/homecore-api/tests/server_bin_auth.rs @@ -75,3 +75,72 @@ async fn from_env_path_enforces_whitelist() { assert!(!store.is_valid("not_in_whitelist").await); assert!(!store.is_dev_mode().await, "from_env must NOT be dev mode"); } + +// ─── HC-API-AUTH-01: `GET /api/` must be auth-gated like every sibling ─── +// +// HA's `APIStatusView` inherits `requires_auth = True`, so `/api/` returns +// 401 for a missing/wrong bearer and 200 only for a valid one. The pre-fix +// `api_root` took no headers and unconditionally returned 200 — these two +// tests FAIL on that code. + +#[tokio::test] +async fn api_root_rejects_missing_bearer() { + let app = router(provisioned_state("the_real_token").await); + let resp = app + .oneshot( + Request::builder() + .uri("/api/") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + assert_eq!( + resp.status(), + StatusCode::UNAUTHORIZED, + "GET /api/ with NO bearer must be 401 (HC-API-AUTH-01) — HA's \ + APIStatusView requires_auth=True; a 200 here lets an \ + unauthenticated party confirm a live endpoint and tells a \ + token-validation probe a bad token is good" + ); +} + +#[tokio::test] +async fn api_root_rejects_wrong_bearer() { + let app = router(provisioned_state("the_real_token").await); + let resp = app + .oneshot( + Request::builder() + .uri("/api/") + .header("Authorization", "Bearer the_wrong_token") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + assert_eq!( + resp.status(), + StatusCode::UNAUTHORIZED, + "GET /api/ with a WRONG bearer must be 401 (HC-API-AUTH-01)" + ); +} + +#[tokio::test] +async fn api_root_accepts_correct_bearer() { + let app = router(provisioned_state("the_real_token").await); + let resp = app + .oneshot( + Request::builder() + .uri("/api/") + .header("Authorization", "Bearer the_real_token") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + assert_eq!( + resp.status(), + StatusCode::OK, + "GET /api/ with the correct bearer must still return 200 (API running.)" + ); +} diff --git a/v2/crates/homecore-api/tests/ws_handshake.rs b/v2/crates/homecore-api/tests/ws_handshake.rs index b4df3424..b629753b 100644 --- a/v2/crates/homecore-api/tests/ws_handshake.rs +++ b/v2/crates/homecore-api/tests/ws_handshake.rs @@ -166,3 +166,100 @@ async fn ping_pong_reply_is_received() { assert_eq!(reply["type"], "pong"); assert_eq!(reply["id"], 7); } + +/// Variant of [`spawn_server_with_token`] that also returns a `HomeCore` +/// handle (cheap `Arc` clone) so the test can fire events into the *same* +/// bus the served subscription reads from. +async fn spawn_server_returning_homecore(valid_token: &str) -> (SocketAddr, HomeCore) { + let hc = HomeCore::new(); + let tokens = LongLivedTokenStore::empty(); + tokens.register(valid_token).await; + let state = SharedState::with_tokens(hc.clone(), "Test", "test-version", tokens); + let app = router(state); + + let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + tokio::spawn(async move { + axum::serve(listener, app).await.unwrap(); + }); + (addr, hc) +} + +#[tokio::test] +async fn subscription_survives_broadcast_lag() { + // HC-WS-LAG-01: the per-subscription event task must treat a broadcast + // `Lagged(n)` as RECOVERABLE (re-sync + continue), matching the bus + // contract ("Lagged receivers must re-sync") and HA's WS semantics. + // + // The pre-fix `Err(_) => break` killed the whole event-stream task on + // the first lag, so after a >4,096-event burst the client's stream + // went permanently silent. This test fires far more than the 4,096 + // channel capacity to force a `Lagged`, then fires ONE more event and + // asserts the subscription still delivers it. FAILS (5s timeout) on + // the old code because the task is already dead. + use homecore::{Context, DomainEvent}; + + let (addr, hc) = spawn_server_returning_homecore("good_token_abc").await; + let url = format!("ws://{addr}/api/websocket"); + let (mut ws, _resp) = connect_async(&url).await.unwrap(); + + let _ = next_json(&mut ws).await; // auth_required + ws.send(Message::Text( + serde_json::json!({"type":"auth","access_token":"good_token_abc"}).to_string(), + )) + .await + .unwrap(); + let auth = next_json(&mut ws).await; + assert_eq!(auth["type"], "auth_ok"); + + // Subscribe to a specific domain event type so unrelated traffic is + // filtered out and we can deterministically match the post-lag event. + ws.send(Message::Text( + serde_json::json!({"id": 1, "type": "subscribe_events", "event_type": "lag_probe"}) + .to_string(), + )) + .await + .unwrap(); + let ack = next_json(&mut ws).await; // result ok for the subscribe + assert_eq!(ack["type"], "result"); + assert_eq!(ack["success"], true); + + // Flood the bus far past EVENT_CHANNEL_CAPACITY (4,096) with events the + // subscription FILTERS OUT (different event_type). Because the client + // never reads them off the WS, the server-side broadcast receiver falls + // behind and the NEXT `recv()` yields `Lagged`. We fire synchronously + // and don't yield to the WS reader, guaranteeing the overflow. + for i in 0..6000u32 { + hc.bus().fire_domain(DomainEvent::new( + "noise", + serde_json::json!({ "i": i }), + Context::new(), + )); + } + + // Now fire the event the client IS subscribed to. On the fixed code the + // task recovered from `Lagged` and continues, so this is delivered. On + // the old code the task broke on `Lagged` and this never arrives. + hc.bus().fire_domain(DomainEvent::new( + "lag_probe", + serde_json::json!({ "marker": "post-lag" }), + Context::new(), + )); + + // Drain frames until we see our post-lag event (ignoring any noise the + // filter let slip before the lag), bounded by a timeout. + let got = tokio::time::timeout(std::time::Duration::from_secs(5), async { + loop { + let v = next_json(&mut ws).await; + if v["type"] == "event" && v["event"]["event_type"] == "lag_probe" { + return v; + } + } + }) + .await + .expect( + "subscription went silent after a broadcast lag — Lagged was treated \ + as fatal (HC-WS-LAG-01)", + ); + assert_eq!(got["event"]["data"]["marker"], "post-lag"); +}