diff --git a/docs/adr/ADR-161-homecore-server-layer-security.md b/docs/adr/ADR-161-homecore-server-layer-security.md new file mode 100644 index 00000000..393085f5 --- /dev/null +++ b/docs/adr/ADR-161-homecore-server-layer-security.md @@ -0,0 +1,260 @@ +# ADR-161: HOMECORE Server Layer — WebSocket Auth Bypass, Reply-Theater & Documented-but-No-Op Automation (Security & Honest Labeling) + +- **Status**: accepted +- **Date**: 2026-06-12 +- **Deciders**: ruv +- **Tags**: homecore, http-ws-boundary, websocket-auth-bypass, security, automation-engine, documented-no-op, prove-everything, soundness, honest-labeling +- **Amends**: ADR-130 (HOMECORE-API WS protocol), ADR-129 (HOMECORE-AUTO automation engine), ADR-128 (plugin manifest) + +## Context + +Beyond-SOTA sweep **Milestone 7**, over the HOMECORE **server/network layer** +crates only — `homecore-api`, `homecore-server`, `homecore-automation`, +`homecore-hap`, `homecore-plugins` — executed under the project's +**prove-everything / anti-"AI-slop"** directive. + +### Headline — the library cores are real, but the network boundary was unsound + +The same audit pattern as ADR-160 held for the *library logic*: the automation +trigger/condition/template/action evaluators, the REST handlers, the HAP +mapping, and the plugin manifest parser are **real, tested code** — not stubs. +That is the anti-slop positive and it is cited here as such. + +What the audit found was **not fake business logic but an unsound trust +boundary plus documented-but-no-op features**: + +1. A **CRITICAL WebSocket authentication bypass** — the WS handshake accepted + any non-empty token, ignoring the provisioned token whitelist the REST path + enforces. +2. **Reply-theater** — WS command responses were computed, then logged and + **discarded**; no `result`/`pong`/`event` ever reached the client. +3. **Documented-but-idle automation** — the engine was constructed and dropped + (never started); time triggers, `RunMode`, `Choose` branches, and template + conditions were each **documented as working but were no-ops in the live + path**. + +This is a worse class than ADR-160's over-naming: here the **doc claimed a +capability the code did not deliver** (auth enforcement, reply transport, +running automations). The fix is **implement where feasible, honestly relabel +where not — never leave a false doc.** Every fix is pinned by a test that +**fails on the old code**. + +Grading vocabulary (ADR-152 / ADR-158 / ADR-160): +- **MEASURED** — reproduced in this worktree, command + failing-on-old test recorded. +- **NO-ACTION (already-honest/already-hardened)** — audited, found correct, cited as a positive. +- **ACCEPTED-FUTURE** — deliberately deferred, nothing dropped. + +## Decision — Fixes Landed + +### §A1 — WebSocket auth bypass (CRITICAL, security) — MEASURED + +`homecore-api/src/ws.rs` handshake checked only `token.trim().is_empty()` and +sent `auth_ok` for **any** non-empty token. It never called +`state.tokens().is_valid()` — the check the REST path uses via +`auth::BearerAuth`. With a provisioned `HOMECORE_TOKENS` whitelist, **any +attacker-chosen non-empty token got full WS access** (read all states, call any +service, subscribe to all events). + +**Real fix:** the handshake now calls +`state.tokens().is_valid(&token).await` (the *same* store + method as REST). +A wrong token receives `auth_invalid` and the socket closes. DEV (`allow_any`) +mode still accepts any non-empty bearer with a warn, so smoke tests keep +working; the empty token is rejected inside `is_valid`. + +**Failing-on-old test** (`tests/ws_handshake.rs`): +`wrong_token_is_rejected` — provisions a real (non-dev) store with one good +token, sends a DIFFERENT non-empty token over the WS handshake, asserts +`auth_invalid`. On the old source the client received +`{"type":"auth_ok",…}` (verified: the test panics on old `ws.rs` with +`left: "auth_ok", right: "auth_invalid"`). Companion: `correct_token_is_accepted`. +**Grade: MEASURED. This is the milestone headline.** + +### §A2 — WS replies never transmitted (HIGH, functional) — MEASURED + +`ws.rs::Connection::run` moved the socket into a recv-only task; the only +consumer of the response mpsc just did `debug!("ws emit: {msg}")` and dropped +every message. No command reply ever reached the wire. + +**Real fix:** the socket is split with `futures_util::StreamExt::split`. A +dedicated **writer task** drains the response channel onto `sink.send(...)` +(text frames; a `__pong:` sentinel maps to a Pong control frame); the reader +task parses commands concurrently. On reader exit the senders drop and the +writer task ends cleanly. + +**Failing-on-old tests:** `result_reply_is_received` (connect → auth → +`get_states` → assert a `result` reply is RECEIVED within 5s) and +`ping_pong_reply_is_received`. Both time out on the old source (verified: +`Elapsed` panic). **Grade: MEASURED.** + +### §A8 — `homecore-api` bin: no env-token path, network-exposed (HIGH, security) — MEASURED + +`homecore-api/src/bin/server.rs` bound `0.0.0.0:8123` with +`SharedState::new()` → `allow_any_non_empty()` and **no** `HOMECORE_TOKENS` +path (unlike `homecore-server`), so a provisioned operator had no way to lock +it down. + +**Real fix:** the bin now mirrors `homecore-server`'s provisioning — prefer the +`HOMECORE_TOKENS` whitelist (`LongLivedTokenStore::from_env()`), fall back to an +**explicitly warn-logged** DEV mode only when unset. It also defaults the bind +address to **`127.0.0.1`** (loopback) so a bare `cargo run` is not +network-exposed, with `HOMECORE_BIND` to opt into LAN. + +**Failing-on-old test** (`tests/server_bin_auth.rs`): +`provisioned_bin_rejects_wrong_bearer` reproduces the bin's exact provisioning +path (a populated, non-dev store) and asserts a wrong bearer → 401; +`from_env_path_enforces_whitelist` proves `from_env()` is not dev mode and +enforces the list. The old bin's `allow_any_non_empty()` accepted the wrong +bearer. **Grade: MEASURED.** + +### §A3 — Automation engine never started (HIGH) — MEASURED + +`homecore-server/src/main.rs` did `let _automation_engine = AutomationEngine::new(...)` +then dropped it immediately, while the header doc claimed "Automation engine +subscribed to the state machine." + +**Real fix:** the engine is now built into a long-lived binding and `.start()` +is called, spawning the event loop + timer task; the header/log lines state it +is started with N automations and which trigger classes are active. (With A4–A7 +the running engine is genuinely functional, not theater.) + +**Evidence:** the engine-behavior tests below run against the same +`AutomationEngine::start()` path now wired into the bin. **Grade: MEASURED.** + +### §A4 — `Trigger::Time` hard-coded `false`, no timer (HIGH) — MEASURED + +`trigger.rs::matches_sync` returned `false` for `Time` and there was **no timer +task** anywhere, so time automations could never fire. + +**Real fix:** `AutomationEngine::start_timer` — a 1 Hz tokio interval that +compares each `time:` automation's `at` (`HH:MM` or `HH:MM:SS`) against the +local wall-clock second and fires it once per match (conditions still gate it). +`matches_sync` returning `false` for `Time` is now **correct and documented** +(it is a wall-clock trigger with no state-change context); a public +`fire_time_for_test` exposes the same path deterministically. + +**Failing-on-old test** (`tests/engine_behaviors.rs`): +`time_trigger_fires_via_timer_path` (+ unit `time_at_matches_handles_hh_mm_and_hh_mm_ss`). +The method does not exist on the old engine. **Grade: MEASURED.** + +### §A5 — `RunMode` documented as AtomicBool-enforced but unbounded-parallel (HIGH) — MEASURED + +`engine.rs` doc claimed "RunMode::Single is enforced via a per-automation +AtomicBool" — but no such code existed and **every** trigger spawned an +unbounded parallel task regardless of `mode`. + +**Real fix:** each registered automation carries a `running: Arc`. +`Single`/`IgnoreFirst` modes `compare_exchange` the flag before spawning and +**skip** the trigger if a run is already in flight, clearing it on completion; +`Parallel` (and, for now, `Restart`/`Queued`) spawn on every trigger. + +**Failing-on-old tests** (`tests/engine_behaviors.rs`): +`single_mode_does_not_double_fire_on_rapid_triggers` (two rapid triggers while +the first run sleeps → exactly **1** run; old code fired **2**, verified) and +`parallel_mode_does_fire_concurrently` (→ 2). **Grade: MEASURED (Single/Parallel +honored; bounded `Queued`/`Restart`/`max` ordering → ACCEPTED-FUTURE, see below).** + +### §A6 — `Action::Choose` ignored branches (HIGH) — MEASURED + +`action.rs` discarded `choices` and always ran `default`. + +**Real fix:** `ChoiceBranch::matches` deserialises each branch's +`serde_yaml::Value` conditions into `Condition` and evaluates them (AND +semantics, against an `EvalContext` now carried on `ExecutionContext`). `Choose` +runs the **first matching branch's** sequence and falls to `default` only if +none match. + +**Failing-on-old tests** (`action.rs` inline): +`choose_runs_matching_branch_not_default` (matching branch runs, default does +NOT — old code ran default, verified) and +`choose_falls_to_default_when_no_branch_matches`. **Grade: MEASURED.** + +### §A7 — Template conditions always false in the live engine (MEDIUM) — MEASURED + +`condition.rs` returned `false` for `Template` whenever `template_env` was +`None`, and the engine built every `EvalContext` with `template_env: None` +(`EvalContext::new`), so `template:` conditions could never be true in +production — only in unit tests that hand-built a template env. + +**Real fix:** the engine constructs one `TemplateEnvironment` over the state +machine and threads it into every `EvalContext` via +`EvalContext::with_templates` (event loop, timer task, and +`ExecutionContext` for `Choose` branches). + +**Failing-on-old tests** (`tests/engine_behaviors.rs`): +`template_condition_evaluates_true_in_engine` (a `{{ is_state(...) }}` condition +gates an action true) and `template_condition_evaluates_false_blocks_action`. +On the old engine the action never ran (template always false, verified). +**Grade: MEASURED.** + +### §B5 — Plugin manifest sig/hash "verified before execution" doc was false (LOW, honesty) — relabeled + +`homecore-plugins/src/manifest.rs` documented `wasm_module_hash` as "verified +before execution" and carried `wasm_module_sig` / `publisher_key`, but these +fields are **never read** for verification (only ever set to `None` in tests). + +**Fix (honest labeling — no false capability claimed):** the three fields are +re-doc'd **"(P4 — not yet enforced, ADR-161/B5)"** — parsed and round-tripped, +but no integrity/signature check happens before a plugin runs. No verification +code was added (that is P4); the doc now matches the code. +**Grade: doc-honesty (no behavior change).** + +## Negative Results (NO-ACTION positives — audited, found correct, cited not edited) + +These were checked and are genuinely sound/honest; cited as positives, **not** +touched: +- **CSPRNG correctness** — all IDs are `uuid::v4`; the rng/`randn` suspicion was + **REFUTED**. No weak-randomness issue exists. +- **CORS allowlist** (`app.rs`) — already hardened (explicit `AllowOrigin::list`, + no `permissive()`, `allow_credentials(false)`, env override). NO-ACTION. +- **No path traversal in `homecore-migrate`** — audited, clean. +- **No secrets in logs** — audited, clean. +- **HAP pairing stub** — honestly disclaimed as a surface stub; not over-claimed. +- **`InProcessRuntime` "no sandbox" disclaimer** — honest; left as-is. + +## Deferred Backlog (Nothing Dropped) + +- **Plugin authority-isolation (P5)** — `homecore_permissions` claims are parsed + but not enforced at the host-call boundary. **ACCEPTED-FUTURE.** +- **Plugin signature/hash verification (P4)** — implement the + `wasm_module_hash`/`wasm_module_sig`/`publisher_key` gate that B5 now honestly + says is absent. **ACCEPTED-FUTURE.** +- **HAP real pairing (P2)** — SRP/HKDF pairing + encrypted sessions; current + bridge is an accessory-mapping surface. **ACCEPTED-FUTURE (honestly stubbed).** +- **`RunMode::Queued`/`Restart`/`max` ordering** — `Single`/`Parallel` are + honored; bounded queueing, restart-kill, and `max` concurrency are not yet + wired (every non-Single mode is parallel). **ACCEPTED-FUTURE** — the + `engine.rs` doc states exactly this, no over-claim. +- **Automation YAML load-at-boot** — the engine starts empty; a YAML loader is + P-next. The bin log states "0 automations registered" honestly. + +## Reproduction (MEASURED) + +```bash +cd v2 +cargo test -p homecore-api -p homecore-server -p homecore-automation -p homecore-hap --no-default-features +cargo test -p homecore-plugins --features wasmtime +cargo build --workspace --no-default-features +``` + +Result at time of writing (all 0 failed): +- **homecore-api** — **25 passed** (lib 18; `server_bin_auth` 3; `ws_handshake` 4) +- **homecore-automation** — **42 passed** (lib 37; `engine_behaviors` 5) +- **homecore-hap** — **17 passed** +- **homecore-server** — bin, **0 tests** +- (**homecore-plugins** — **15 passed**: lib 12; integration 3) +- Full workspace `cargo build --workspace --no-default-features` succeeds. + +## Consequences + +- The WebSocket path can no longer be entered with a forged token — it enforces + the same `LongLivedTokenStore` whitelist as REST (A1). +- WS clients now actually receive `result`/`pong`/`event` frames (A2). +- The `homecore-api` dev bin defaults to loopback and honors `HOMECORE_TOKENS` + (A8); it is no longer an open `0.0.0.0` accept-any endpoint by default. +- The automation engine is started for real and its time triggers, `Single` + run-mode, `Choose` branches, and `template:` conditions all function — no doc + claims a capability the code lacks (A3–A7). +- The plugin manifest no longer claims signature verification it does not + perform (B5). +- Files kept under the 500-line guideline (`engine.rs` 462; behavioral tests + moved to `tests/engine_behaviors.rs`). diff --git a/v2/Cargo.lock b/v2/Cargo.lock index 43cacdc4..c31100fd 100644 --- a/v2/Cargo.lock +++ b/v2/Cargo.lock @@ -3472,6 +3472,7 @@ dependencies = [ "axum", "chrono", "dashmap", + "futures-util", "homecore", "http-body-util", "hyper 1.8.1", @@ -3479,6 +3480,7 @@ dependencies = [ "serde_json", "thiserror 1.0.69", "tokio", + "tokio-tungstenite", "tower 0.5.3", "tower-http", "tracing", @@ -10933,7 +10935,7 @@ dependencies = [ [[package]] name = "wifi-densepose-hardware" -version = "0.3.0" +version = "0.3.1" dependencies = [ "approx", "byteorder", @@ -10953,7 +10955,7 @@ dependencies = [ [[package]] name = "wifi-densepose-mat" -version = "0.3.0" +version = "0.3.1" dependencies = [ "anyhow", "approx", @@ -10985,7 +10987,7 @@ dependencies = [ [[package]] name = "wifi-densepose-nn" -version = "0.3.0" +version = "0.3.1" dependencies = [ "anyhow", "candle-core 0.4.1", @@ -11039,7 +11041,7 @@ dependencies = [ [[package]] name = "wifi-densepose-ruvector" -version = "0.3.1" +version = "0.3.2" dependencies = [ "approx", "criterion", @@ -11059,7 +11061,7 @@ dependencies = [ [[package]] name = "wifi-densepose-sensing-server" -version = "0.3.1" +version = "0.3.2" dependencies = [ "axum", "chrono", @@ -11093,7 +11095,7 @@ dependencies = [ [[package]] name = "wifi-densepose-signal" -version = "0.3.2" +version = "0.3.3" dependencies = [ "chrono", "criterion", @@ -11120,7 +11122,7 @@ dependencies = [ [[package]] name = "wifi-densepose-train" -version = "0.3.1" +version = "0.3.2" dependencies = [ "anyhow", "approx", @@ -11158,7 +11160,7 @@ dependencies = [ [[package]] name = "wifi-densepose-vitals" -version = "0.3.0" +version = "0.3.1" dependencies = [ "criterion", "serde", @@ -11190,7 +11192,7 @@ dependencies = [ [[package]] name = "wifi-densepose-wifiscan" -version = "0.3.0" +version = "0.3.1" dependencies = [ "serde", "tokio", diff --git a/v2/crates/homecore-api/Cargo.toml b/v2/crates/homecore-api/Cargo.toml index d1170a80..a86f0691 100644 --- a/v2/crates/homecore-api/Cargo.toml +++ b/v2/crates/homecore-api/Cargo.toml @@ -33,8 +33,12 @@ chrono = { version = "0.4", features = ["serde"] } uuid = { version = "1", features = ["v4", "serde"] } dashmap = "6" +futures-util = { version = "0.3", default-features = false, features = ["sink"] } [dev-dependencies] tower = { version = "0.5", features = ["util"] } hyper = "1" http-body-util = "0.1" +# End-to-end WS handshake + reply tests (HC-WS-01/02, ADR-161). +tokio-tungstenite = "0.24" +futures-util = { version = "0.3", default-features = false } diff --git a/v2/crates/homecore-api/src/app.rs b/v2/crates/homecore-api/src/app.rs index 35b7b969..210cab06 100644 --- a/v2/crates/homecore-api/src/app.rs +++ b/v2/crates/homecore-api/src/app.rs @@ -88,6 +88,11 @@ fn default_origins() -> Vec { mod tests { use super::*; + // `set_var`/`remove_var` mutate process-global state; serialize every test + // that touches HOMECORE_CORS_ORIGINS so they cannot race in parallel. + // Poison-tolerant: a panicking test must not cascade-fail the others. + static ENV_LOCK: std::sync::Mutex<()> = std::sync::Mutex::new(()); + #[test] fn default_origins_includes_vite_and_ha_ports() { let origins = default_origins(); @@ -98,6 +103,7 @@ mod tests { #[test] fn env_override_via_homecore_cors_origins() { + let _env = ENV_LOCK.lock().unwrap_or_else(|e| e.into_inner()); std::env::set_var("HOMECORE_CORS_ORIGINS", "https://example.com,https://other.example.com"); // build_cors_layer() returns a CorsLayer which doesn't expose // its origin list; we test the parse path indirectly by @@ -112,6 +118,7 @@ mod tests { #[test] fn env_empty_falls_back_to_defaults() { + let _env = ENV_LOCK.lock().unwrap_or_else(|e| e.into_inner()); std::env::set_var("HOMECORE_CORS_ORIGINS", " "); let raw = std::env::var("HOMECORE_CORS_ORIGINS").ok(); let trimmed = raw.as_deref().map(|s| s.trim()).unwrap_or(""); diff --git a/v2/crates/homecore-api/src/bin/server.rs b/v2/crates/homecore-api/src/bin/server.rs index 6a2536bd..49778840 100644 --- a/v2/crates/homecore-api/src/bin/server.rs +++ b/v2/crates/homecore-api/src/bin/server.rs @@ -1,15 +1,31 @@ //! `homecore-api-server` binary. Boots a HomeCore runtime and serves -//! the HA-compat REST + WS API on `:8123`. +//! the HA-compat REST + WS API. //! -//! P1: bare-minimum bring-up. No persistence, no plugins, no auth -//! beyond "any non-empty bearer". Useful for `curl` smoke tests of -//! the wire format from the existing HA companion app: +//! ## Auth (ADR-161, HC-WS-08) +//! +//! Token provisioning matches `homecore-server`: if `HOMECORE_TOKENS` +//! is set (comma-separated bearer tokens) the API enforces that +//! whitelist on both the REST and WS paths. If it is **unset**, the +//! binary falls back to an explicitly-logged DEV mode (any non-empty +//! bearer accepted) — before this fix the bin unconditionally used +//! `allow_any_non_empty()` with no env path, so a provisioned operator +//! had no way to lock it down. +//! +//! ## Bind address +//! +//! Defaults to `127.0.0.1` (loopback only) so a bare `cargo run` of +//! this dev binary is not network-exposed. Override with +//! `HOMECORE_BIND=0.0.0.0:8123` for a LAN deployment (and provision +//! `HOMECORE_TOKENS` when you do). //! //! cargo run -p homecore-api --bin homecore-api-server -//! curl -H "Authorization: Bearer test" http://127.0.0.1:8123/api/ +//! HOMECORE_TOKENS=secret curl -H "Authorization: Bearer secret" \ +//! http://127.0.0.1:8123/api/ + +use std::net::SocketAddr; use homecore::HomeCore; -use homecore_api::{router, SharedState, DEFAULT_PORT}; +use homecore_api::{router, LongLivedTokenStore, SharedState, DEFAULT_PORT}; #[tokio::main] async fn main() -> Result<(), Box> { @@ -21,10 +37,34 @@ async fn main() -> Result<(), Box> { .init(); let homecore = HomeCore::new(); - let state = SharedState::new(homecore); + + // Token provisioning (HC-WS-08). Prefer the HOMECORE_TOKENS env + // whitelist; fall back to DEV mode (warn-logged) only when unset. + let tokens = if std::env::var("HOMECORE_TOKENS") + .map(|v| !v.trim().is_empty()) + .unwrap_or(false) + { + let s = LongLivedTokenStore::from_env(); + let n = s.len().await; + tracing::info!("LongLivedTokenStore provisioned with {n} bearer token(s) from HOMECORE_TOKENS"); + s + } else { + tracing::warn!( + "HOMECORE_TOKENS not set — token store in DEV mode (any non-empty bearer \ + accepted). Set HOMECORE_TOKENS before exposing this binary to the network." + ); + LongLivedTokenStore::allow_any_non_empty() + }; + + let state = SharedState::with_tokens(homecore, "Home", env!("CARGO_PKG_VERSION"), tokens); let app = router(state); - let addr = std::net::SocketAddr::from(([0, 0, 0, 0], DEFAULT_PORT)); + // Default to loopback so `cargo run` is not network-exposed; allow + // an explicit HOMECORE_BIND override for LAN deployments. + let addr: SocketAddr = match std::env::var("HOMECORE_BIND") { + Ok(v) if !v.trim().is_empty() => v.parse()?, + _ => SocketAddr::from(([127, 0, 0, 1], DEFAULT_PORT)), + }; tracing::info!("HOMECORE-API listening on http://{addr} (HA-compat /api + /api/websocket)"); let listener = tokio::net::TcpListener::bind(addr).await?; diff --git a/v2/crates/homecore-api/src/ws.rs b/v2/crates/homecore-api/src/ws.rs index 4aa7a1ba..cea9f77f 100644 --- a/v2/crates/homecore-api/src/ws.rs +++ b/v2/crates/homecore-api/src/ws.rs @@ -9,6 +9,16 @@ //! //! `ha_version` is the homecore version string — see ADR-130 Q1 for the //! companion-app feature-detect concern. +//! +//! ## Security (ADR-161) +//! +//! The `auth` token is validated against [`crate::tokens::LongLivedTokenStore`] +//! via `state.tokens().is_valid()` — the *same* store the REST path uses +//! (`auth::BearerAuth`). A wrong token receives `auth_invalid` and the socket +//! is closed. (HC-WS-01 closed the prior bypass where any non-empty token was +//! accepted.) Command replies are transmitted by a dedicated writer task that +//! drains the response channel onto the socket (HC-WS-02 closed the prior +//! reply-theater where responses were logged and discarded). use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; @@ -18,7 +28,7 @@ use axum::extract::State; use axum::response::IntoResponse; use serde::{Deserialize, Serialize}; use tokio::sync::broadcast; -use tracing::{debug, warn}; +use tracing::warn; use homecore::{Context, ServiceCall, ServiceName, SystemEvent}; @@ -58,11 +68,18 @@ async fn handle_socket(mut socket: WebSocket, state: SharedState) { _ => return, }; - // P1: accept any non-empty token. P2: validate against store. - if token.trim().is_empty() { + // Validate the bearer token against the same store the REST path + // uses (`state.tokens().is_valid()` — see `rest.rs` / + // `auth::BearerAuth`). Before the HC-WS-01 fix this checked only + // `token.trim().is_empty()` and accepted ANY non-empty token even + // with a provisioned `HOMECORE_TOKENS` whitelist — a full WS auth + // bypass. `is_valid()` rejects the empty token internally and, in + // DEV (`allow_any`) mode, still accepts any non-empty bearer (with + // a warn) so smoke tests keep working. + if !state.tokens().is_valid(&token).await { let _ = socket .send(Message::Text( - serde_json::json!({"type":"auth_invalid","message":"empty token"}).to_string(), + serde_json::json!({"type":"auth_invalid","message":"invalid token"}).to_string(), )) .await; return; @@ -140,54 +157,71 @@ impl Connection { } } - async fn run(self, mut socket: WebSocket) { + async fn run(self, socket: WebSocket) { + use futures_util::{SinkExt, StreamExt}; + let conn = Arc::new(self); + // Split the socket so a dedicated writer task can drain `rx` onto + // the wire while the reader task processes commands concurrently. + // Before the HC-WS-02 fix the socket was moved into a recv-only + // task and the only `rx` consumer just `debug!`-logged and + // DISCARDED every message — so no `result`/`pong`/`event` ever + // reached the client. Now `rx` feeds `socket.send`. + let (mut sink, mut stream) = socket.split(); let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::(); - let sender_tx = tx.clone(); - let recv_task = { - let conn = Arc::clone(&conn); - tokio::spawn(async move { - while let Some(frame) = socket.recv().await { - match frame { - Ok(Message::Text(raw)) => { - let cmd: WsCommand = match serde_json::from_str(&raw) { - Ok(c) => c, - Err(e) => { - warn!("bad ws command: {e}"); - continue; - } - }; - conn.handle_cmd(cmd, &sender_tx).await; - } - Ok(Message::Ping(p)) => { - let _ = sender_tx.send(format!("__pong:{}", p.len())); - } - Ok(Message::Close(_)) | Err(_) => break, - _ => {} - } + // Writer task: drain replies onto the socket. A `__pong:` + // sentinel maps to a binary Pong control frame; everything else + // is a JSON text frame. + let writer_task = tokio::spawn(async move { + while let Some(msg) = rx.recv().await { + let send_result = if let Some(n) = msg.strip_prefix("__pong:") { + let len: usize = n.parse().unwrap_or(0); + sink.send(Message::Pong(vec![0u8; len])).await + } else { + sink.send(Message::Text(msg)).await + }; + if send_result.is_err() { + break; } - // Cancel all subscriptions on disconnect. - for entry in conn.subs.iter() { - entry.value().abort.abort(); - } - }); + } + }); - tokio::spawn(async move { - while let Some(msg) = rx.recv().await { - if msg.starts_with("__pong:") { - // pong handled inline; skip - continue; + // Reader task: parse and dispatch commands; responses are pushed + // into `tx` and transmitted by the writer task above. + let reader_tx = tx.clone(); + { + let conn = Arc::clone(&conn); + while let Some(frame) = stream.next().await { + match frame { + Ok(Message::Text(raw)) => { + let cmd: WsCommand = match serde_json::from_str(&raw) { + Ok(c) => c, + Err(e) => { + warn!("bad ws command: {e}"); + continue; + } + }; + conn.handle_cmd(cmd, &reader_tx).await; } - // Use the socket from the recv task via a one-shot mpsc - // (in this minimal P1, the recv task owns the socket - // and we ack inline below — this branch is for the - // subscription fan-out emit path) - debug!("ws emit: {msg}"); + Ok(Message::Ping(p)) => { + let _ = reader_tx.send(format!("__pong:{}", p.len())); + } + Ok(Message::Close(_)) | Err(_) => break, + _ => {} } - }) - }; - let _ = recv_task.await; + } + // Cancel all subscriptions on disconnect. + for entry in conn.subs.iter() { + entry.value().abort.abort(); + } + } + + // Reader loop ended → drop the senders so the writer task's `rx` + // closes and the task exits cleanly. + drop(tx); + drop(reader_tx); + let _ = writer_task.await; } async fn handle_cmd(&self, cmd: WsCommand, tx: &tokio::sync::mpsc::UnboundedSender) { diff --git a/v2/crates/homecore-api/tests/server_bin_auth.rs b/v2/crates/homecore-api/tests/server_bin_auth.rs new file mode 100644 index 00000000..feea856e --- /dev/null +++ b/v2/crates/homecore-api/tests/server_bin_auth.rs @@ -0,0 +1,77 @@ +//! HC-WS-08 (ADR-161): the `homecore-api-server` bin must honor the +//! `HOMECORE_TOKENS` env whitelist instead of unconditionally accepting +//! any non-empty bearer. +//! +//! `main()` is not directly callable, so this reproduces the bin's exact +//! token-provisioning path (`LongLivedTokenStore::from_env()` when +//! `HOMECORE_TOKENS` is set) and drives a real HTTP request through the +//! router. On the pre-fix bin — which used `SharedState::new()` → +//! `allow_any_non_empty()` with NO env path — a wrong bearer was +//! accepted; this test asserts it is now rejected with 401. + +use axum::body::Body; +use axum::http::{Request, StatusCode}; +use homecore::HomeCore; +use homecore_api::{router, LongLivedTokenStore, SharedState}; +use tower::ServiceExt; // for `oneshot` + +/// Build the same state the bin builds when HOMECORE_TOKENS is set. +async fn provisioned_state(valid: &str) -> SharedState { + // Mirror `from_env()` deterministically without mutating process + // env (which would race other tests): an `empty()` store with the + // one provisioned token registered is exactly what + // `from_env()` produces for `HOMECORE_TOKENS=`. + let store = LongLivedTokenStore::empty(); + store.register(valid).await; + SharedState::with_tokens(HomeCore::new(), "Home", "test", store) +} + +#[tokio::test] +async fn provisioned_bin_rejects_wrong_bearer() { + let app = router(provisioned_state("the_real_token").await); + let resp = app + .oneshot( + Request::builder() + .uri("/api/states") + .header("Authorization", "Bearer the_wrong_token") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + assert_eq!( + resp.status(), + StatusCode::UNAUTHORIZED, + "a provisioned token store must reject a wrong bearer (HC-WS-08)" + ); +} + +#[tokio::test] +async fn provisioned_bin_accepts_correct_bearer() { + let app = router(provisioned_state("the_real_token").await); + let resp = app + .oneshot( + Request::builder() + .uri("/api/states") + .header("Authorization", "Bearer the_real_token") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + assert_eq!(resp.status(), StatusCode::OK); +} + +#[tokio::test] +async fn from_env_path_enforces_whitelist() { + // Exercise the literal `from_env()` constructor the bin uses, under + // a serialized env mutation, to prove the env path itself enforces. + std::env::set_var("HOMECORE_TOKENS", "env_token_1, env_token_2"); + let store = LongLivedTokenStore::from_env(); + std::env::remove_var("HOMECORE_TOKENS"); + + assert!(store.is_valid("env_token_1").await); + assert!(store.is_valid("env_token_2").await); + assert!(!store.is_valid("not_in_whitelist").await); + assert!(!store.is_dev_mode().await, "from_env must NOT be dev mode"); +} diff --git a/v2/crates/homecore-api/tests/ws_handshake.rs b/v2/crates/homecore-api/tests/ws_handshake.rs new file mode 100644 index 00000000..b4df3424 --- /dev/null +++ b/v2/crates/homecore-api/tests/ws_handshake.rs @@ -0,0 +1,168 @@ +//! End-to-end WebSocket handshake + reply tests (ADR-161, HC-WS-01/02). +//! +//! These bind a real `TcpListener`, serve the full router, and connect +//! with a real WS client (`tokio-tungstenite`). They exercise the wire +//! path the in-crate unit tests cannot. +//! +//! - `wrong_token_is_rejected` — FAILS on the pre-fix `ws.rs` that only +//! checked `token.trim().is_empty()` and accepted any non-empty token +//! (HC-WS-01: WS auth bypass). +//! - `result_reply_is_received` — FAILS on the pre-fix `ws.rs` that moved +//! the socket into a recv-only task and discarded every reply with +//! `debug!("ws emit: {msg}")` (HC-WS-02: reply theater). + +use std::net::SocketAddr; + +use futures_util::{SinkExt, StreamExt}; +use homecore::HomeCore; +use homecore_api::{router, LongLivedTokenStore, SharedState}; +use tokio_tungstenite::connect_async; +use tokio_tungstenite::tungstenite::Message; + +/// Spawn the API on an ephemeral port with a real (non-dev) token store +/// containing exactly one valid token. Returns the bound address. +async fn spawn_server_with_token(valid_token: &str) -> SocketAddr { + let hc = HomeCore::new(); + let tokens = LongLivedTokenStore::empty(); + tokens.register(valid_token).await; + let state = SharedState::with_tokens(hc, "Test", "test-version", tokens); + let app = router(state); + + let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + tokio::spawn(async move { + axum::serve(listener, app).await.unwrap(); + }); + addr +} + +/// Read text frames until one parses as JSON; returns the parsed value. +async fn next_json(ws: &mut S) -> serde_json::Value +where + S: StreamExt> + Unpin, +{ + loop { + match ws.next().await { + Some(Ok(Message::Text(raw))) => { + if let Ok(v) = serde_json::from_str::(&raw) { + return v; + } + } + Some(Ok(_)) => continue, + other => panic!("expected text frame, got {other:?}"), + } + } +} + +#[tokio::test] +async fn wrong_token_is_rejected() { + // HC-WS-01: a provisioned store with one good token must reject a + // DIFFERENT (non-empty) token over the WS handshake. The old code + // sent `auth_ok` for any non-empty token — this asserts the fix. + let addr = spawn_server_with_token("good_token_abc").await; + let url = format!("ws://{addr}/api/websocket"); + let (mut ws, _resp) = connect_async(&url).await.unwrap(); + + // Server → auth_required + let req = next_json(&mut ws).await; + assert_eq!(req["type"], "auth_required"); + + // Client → auth with the WRONG token + ws.send(Message::Text( + serde_json::json!({"type":"auth","access_token":"wrong_token_xyz"}).to_string(), + )) + .await + .unwrap(); + + // Server → auth_invalid (NOT auth_ok) + let resp = next_json(&mut ws).await; + assert_eq!( + resp["type"], "auth_invalid", + "wrong token must be rejected with auth_invalid, got: {resp}" + ); + assert_ne!(resp["type"], "auth_ok", "wrong token must NOT receive auth_ok"); +} + +#[tokio::test] +async fn correct_token_is_accepted() { + let addr = spawn_server_with_token("good_token_abc").await; + let url = format!("ws://{addr}/api/websocket"); + let (mut ws, _resp) = connect_async(&url).await.unwrap(); + + let req = next_json(&mut ws).await; + assert_eq!(req["type"], "auth_required"); + + ws.send(Message::Text( + serde_json::json!({"type":"auth","access_token":"good_token_abc"}).to_string(), + )) + .await + .unwrap(); + + let resp = next_json(&mut ws).await; + assert_eq!(resp["type"], "auth_ok", "correct token should be accepted, got: {resp}"); +} + +#[tokio::test] +async fn result_reply_is_received() { + // HC-WS-02: after a successful auth, a `get_states` command must + // produce a `result` reply RECEIVED over the socket. The old code + // discarded all replies in the rx-draining task, so this hangs/ + // fails on the pre-fix source. + let addr = spawn_server_with_token("good_token_abc").await; + let url = format!("ws://{addr}/api/websocket"); + let (mut ws, _resp) = connect_async(&url).await.unwrap(); + + let req = next_json(&mut ws).await; + assert_eq!(req["type"], "auth_required"); + + ws.send(Message::Text( + serde_json::json!({"type":"auth","access_token":"good_token_abc"}).to_string(), + )) + .await + .unwrap(); + let auth = next_json(&mut ws).await; + assert_eq!(auth["type"], "auth_ok"); + + // Send a command and assert we RECEIVE a result reply. + ws.send(Message::Text( + serde_json::json!({"id": 1, "type": "get_states"}).to_string(), + )) + .await + .unwrap(); + + let reply = tokio::time::timeout(std::time::Duration::from_secs(5), next_json(&mut ws)) + .await + .expect("did not receive a reply within 5s — reply theater (HC-WS-02)"); + assert_eq!(reply["type"], "result", "expected a result reply, got: {reply}"); + assert_eq!(reply["id"], 1); + assert_eq!(reply["success"], true); +} + +#[tokio::test] +async fn ping_pong_reply_is_received() { + // The `ping` command must produce a `pong` reply on the wire — also + // exercises the writer task that HC-WS-02 introduced. + let addr = spawn_server_with_token("good_token_abc").await; + let url = format!("ws://{addr}/api/websocket"); + let (mut ws, _resp) = connect_async(&url).await.unwrap(); + + let _ = next_json(&mut ws).await; // auth_required + ws.send(Message::Text( + serde_json::json!({"type":"auth","access_token":"good_token_abc"}).to_string(), + )) + .await + .unwrap(); + let _ = next_json(&mut ws).await; // auth_ok + + ws.send(Message::Text( + serde_json::json!({"id": 7, "type": "ping"}).to_string(), + )) + .await + .unwrap(); + + let reply = tokio::time::timeout(std::time::Duration::from_secs(5), next_json(&mut ws)) + .await + .expect("did not receive pong within 5s"); + assert_eq!(reply["type"], "pong"); + assert_eq!(reply["id"], 7); +} diff --git a/v2/crates/homecore-automation/src/action.rs b/v2/crates/homecore-automation/src/action.rs index 3faf4f64..d65d8abf 100644 --- a/v2/crates/homecore-automation/src/action.rs +++ b/v2/crates/homecore-automation/src/action.rs @@ -3,15 +3,26 @@ //! Implements the ADR-129 P1 action set: `service_call`, `delay`, `scene`, //! `wait_for_trigger`, `choose`. Complex variants (parallel, repeat, if, //! stop, fire_event, wait_template) land in P2. +//! +//! ## `choose` branch evaluation (ADR-161, HC-WS-06) +//! +//! `Action::Choose` evaluates each branch's `conditions` against the live +//! [`EvalContext`] (deserialising the per-branch `serde_yaml::Value` +//! conditions into [`Condition`]) and runs the FIRST matching branch's +//! sequence. Only if no branch matches does it fall to `default`. Before +//! this fix the branches were discarded and `default` always ran. +use std::sync::Arc; use std::time::Duration; use serde::{Deserialize, Serialize}; use tokio::time::sleep; -use homecore::{Context, HomeCore, ServiceCall, ServiceName}; +use homecore::{Context, HomeCore, ServiceCall, ServiceName, StateMachine}; +use crate::condition::{Condition, EvalContext}; use crate::error::AutomationError; +use crate::template::TemplateEnvironment; /// Runtime context passed into action execution. pub struct ExecutionContext { @@ -21,14 +32,40 @@ pub struct ExecutionContext { pub context: Context, /// Automation ID for tracing/logging. pub automation_id: String, + /// Condition-evaluation context for `Choose` branches. Carries the + /// state-machine snapshot + optional template environment so branch + /// conditions (incl. `template:`) evaluate against live state. + pub eval: EvalContext, } impl ExecutionContext { + /// Build a context whose `Choose` branches evaluate against the + /// HomeCore state machine (no template env — `template:` branch + /// conditions evaluate false; use [`Self::with_templates`] to wire + /// one). pub fn new(hc: HomeCore, automation_id: impl Into) -> Self { + let sm = Arc::new(hc.states().clone()); Self { hc, context: Context::new(), automation_id: automation_id.into(), + eval: EvalContext::new(sm), + } + } + + /// Build a context with a template environment wired into the + /// `Choose` branch-condition evaluator. + pub fn with_templates( + hc: HomeCore, + automation_id: impl Into, + states: Arc, + templates: Arc, + ) -> Self { + Self { + hc, + context: Context::new(), + automation_id: automation_id.into(), + eval: EvalContext::with_templates(states, templates), } } } @@ -72,6 +109,27 @@ pub struct ChoiceBranch { pub sequence: Vec, } +impl ChoiceBranch { + /// Does this branch match? All of its `conditions` must evaluate + /// true (HA `choose` semantics are AND-over-conditions). Each raw + /// `serde_yaml::Value` is deserialised into a [`Condition`]; a + /// condition that fails to parse is treated as non-matching (the + /// branch is skipped) rather than silently passing. An empty + /// `conditions` list matches (an unconditional branch). + pub async fn matches(&self, eval: &EvalContext) -> bool { + for raw in &self.conditions { + let cond: Condition = match serde_yaml::from_value(raw.clone()) { + Ok(c) => c, + Err(_) => return false, + }; + if !cond.evaluate(eval).await { + return false; + } + } + true + } +} + impl Action { /// Execute this action using the provided context. /// @@ -118,9 +176,18 @@ impl Action { } Ok(serde_json::Value::Null) } - Action::Choose { choices: _, default } => { - // P1 stub — condition evaluation for choices lands in P2; - // for now, fall through to default branch. + Action::Choose { choices, default } => { + // Evaluate each branch's conditions against live state; + // run the first branch whose conditions ALL pass. Fall + // to `default` only if no branch matches (HC-WS-06). + for branch in choices { + if branch.matches(&ctx.eval).await { + for a in &branch.sequence { + a.execute(ctx).await?; + } + return Ok(serde_json::Value::Null); + } + } for a in default { a.execute(ctx).await?; } @@ -188,4 +255,100 @@ mod tests { let err = action.execute(&mut exec_ctx).await.unwrap_err(); assert!(matches!(err, AutomationError::ServiceCall(ServiceError::NotRegistered { .. }))); } + + /// Register two recording handlers and return their call logs. + async fn two_recorders( + hc: &HomeCore, + ) -> (Arc>>, Arc>>) { + use homecore::EntityId; + let _ = EntityId::parse("light.x"); // touch import path + let mk = |hc: &HomeCore, svc: &'static str| { + let log: Arc>> = Arc::new(Mutex::new(vec![])); + let log2 = Arc::clone(&log); + let hc = hc.clone(); + async move { + hc.services() + .register( + ServiceName::new("light", svc), + FnHandler(move |call: ServiceCall| { + let l = Arc::clone(&log2); + async move { + l.lock().unwrap().push(call.data.clone()); + Ok(serde_json::Value::Null) + } + }), + ) + .await; + log + } + }; + let branch_log = mk(hc, "branch_service").await; + let default_log = mk(hc, "default_service").await; + (branch_log, default_log) + } + + fn choose_with_match() -> Action { + // A `Choose` whose first branch requires light.gate == "open". + let branch_conditions = vec![serde_yaml::from_str::( + "condition: state\nentity_id: light.gate\nstate: open", + ) + .unwrap()]; + Action::Choose { + choices: vec![ChoiceBranch { + conditions: branch_conditions, + sequence: vec![Action::ServiceCall { + domain: "light".into(), + service: "branch_service".into(), + data: serde_json::json!({"branch": true}), + }], + }], + default: vec![Action::ServiceCall { + domain: "light".into(), + service: "default_service".into(), + data: serde_json::json!({"default": true}), + }], + } + } + + #[tokio::test] + async fn choose_runs_matching_branch_not_default() { + // HC-WS-06: with the branch condition satisfied, the branch + // sequence runs and `default` does NOT. On the pre-fix code + // (choices discarded) `default` ran instead → this fails on old. + use homecore::{Context, EntityId}; + let hc = HomeCore::new(); + let (branch_log, default_log) = two_recorders(&hc).await; + hc.states().set( + EntityId::parse("light.gate").unwrap(), + "open", + serde_json::json!({}), + Context::new(), + ); + + let mut ctx = ExecutionContext::new(hc, "choose_auto"); + choose_with_match().execute(&mut ctx).await.unwrap(); + + assert_eq!(branch_log.lock().unwrap().len(), 1, "matching branch must run"); + assert_eq!(default_log.lock().unwrap().len(), 0, "default must NOT run when a branch matches"); + } + + #[tokio::test] + async fn choose_falls_to_default_when_no_branch_matches() { + use homecore::{Context, EntityId}; + let hc = HomeCore::new(); + let (branch_log, default_log) = two_recorders(&hc).await; + // gate is "closed" → branch condition (== "open") fails. + hc.states().set( + EntityId::parse("light.gate").unwrap(), + "closed", + serde_json::json!({}), + Context::new(), + ); + + let mut ctx = ExecutionContext::new(hc, "choose_auto"); + choose_with_match().execute(&mut ctx).await.unwrap(); + + assert_eq!(branch_log.lock().unwrap().len(), 0, "branch must not run when condition fails"); + assert_eq!(default_log.lock().unwrap().len(), 1, "default must run when no branch matches"); + } } diff --git a/v2/crates/homecore-automation/src/engine.rs b/v2/crates/homecore-automation/src/engine.rs index 34ff3221..b39c675b 100644 --- a/v2/crates/homecore-automation/src/engine.rs +++ b/v2/crates/homecore-automation/src/engine.rs @@ -2,56 +2,129 @@ //! triggers, and runs automation action sequences. //! //! ADR-129 §2 design: one Tokio task per running automation instance. -//! RunMode::Single is enforced via a per-automation `AtomicBool` flag. +//! +//! ## Run modes (ADR-161, HC-WS-05) +//! +//! `RunMode::Single` is enforced via a per-automation `AtomicBool` +//! guard: while an instance is executing, a second trigger is skipped. +//! `Parallel` (and the as-yet-unbounded `Restart`/`Queued`) spawn a +//! fresh instance on every trigger. (Before this fix the doc claimed +//! AtomicBool enforcement but every trigger spawned unbounded parallel +//! tasks regardless of `mode`.) +//! +//! ## Time triggers (ADR-161, HC-WS-04) +//! +//! `Trigger::Time { at: "HH:MM:SS" }` is evaluated by a wall-clock timer +//! task (1 Hz tokio interval) — `Trigger::matches_sync` returns false for +//! `Time` because it has no clock. The timer fires each `time:` +//! automation once when the local wall-clock second equals its `at`. +//! +//! ## Template conditions (ADR-161, HC-WS-07) +//! +//! The engine builds a real [`TemplateEnvironment`] over the state +//! machine and passes it into every `EvalContext` (via +//! `EvalContext::with_templates`), so `template:` conditions evaluate +//! against live state instead of always returning false. +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, Mutex}; +use chrono::{Local, Timelike}; use tokio::sync::broadcast; use homecore::HomeCore; use crate::action::ExecutionContext; -use crate::automation::Automation; +use crate::automation::{Automation, RunMode}; use crate::condition::EvalContext; -use crate::trigger::TriggerContext; +use crate::template::TemplateEnvironment; +use crate::trigger::{Trigger, TriggerContext}; + +/// An automation registered with the engine, plus its runtime run-state. +struct Registered { + auto: Arc, + /// `true` while a `Single`-mode instance is executing. Used to + /// skip re-entrant triggers (HC-WS-05). + running: Arc, +} /// The automation engine. Holds a HOMECORE handle and a list of registered /// automations. Call `start()` to begin listening for events. pub struct AutomationEngine { hc: HomeCore, - automations: Arc>>>, + automations: Arc>>, + templates: Arc, } impl AutomationEngine { /// Create a new engine backed by the given HOMECORE handle. pub fn new(hc: HomeCore) -> Self { + let templates = Arc::new(TemplateEnvironment::new(Arc::new(hc.states().clone()))); Self { hc, automations: Arc::new(Mutex::new(vec![])), + templates, } } /// Register an automation. Can be called before or after `start()`. pub fn register(&self, automation: Automation) { - self.automations.lock().unwrap().push(Arc::new(automation)); + self.automations.lock().unwrap().push(Registered { + auto: Arc::new(automation), + running: Arc::new(AtomicBool::new(false)), + }); + } + + /// Number of registered automations. + pub fn len(&self) -> usize { + self.automations.lock().unwrap().len() + } + + /// Is the engine holding zero automations? + pub fn is_empty(&self) -> bool { + self.len() == 0 + } + + /// Build an `EvalContext` with the engine's template environment + /// wired in, over a fresh snapshot of the state machine. + fn eval_ctx(&self) -> EvalContext { + EvalContext::with_templates( + Arc::new(self.hc.states().clone()), + Arc::clone(&self.templates), + ) } /// Subscribe to the state-machine broadcast channel and start - /// evaluating triggers. Returns a join handle for the background task. + /// evaluating triggers. Also starts the wall-clock timer task that + /// evaluates `time:` triggers. Returns a join handle for the event + /// task (the timer task is detached and tied to the engine handle's + /// lifetime via the broadcast channel close). /// /// The task runs until the broadcast sender is dropped (i.e. the /// `HomeCore` instance is destroyed). pub fn start(&self) -> tokio::task::JoinHandle<()> { + self.start_timer(); + self.start_event_loop() + } + + /// Event-driven loop: state/numeric/event triggers. + fn start_event_loop(&self) -> tokio::task::JoinHandle<()> { let mut rx = self.hc.states().subscribe(); let automations = Arc::clone(&self.automations); let hc = self.hc.clone(); + let templates = Arc::clone(&self.templates); tokio::spawn(async move { loop { match rx.recv().await { Ok(event) => { - let autos = automations.lock().unwrap().clone(); - for automation in autos { + let snapshot: Vec<(Arc, Arc)> = automations + .lock() + .unwrap() + .iter() + .map(|r| (Arc::clone(&r.auto), Arc::clone(&r.running))) + .collect(); + for (automation, running) in snapshot { if !automation.enabled { continue; } @@ -60,7 +133,6 @@ impl AutomationEngine { event.old_state.clone(), event.new_state.clone(), ); - // Check all triggers — fire on first match let triggered = automation .trigger .iter() @@ -68,36 +140,15 @@ impl AutomationEngine { if !triggered { continue; } - // Evaluate conditions - let sm = Arc::new(hc.states().clone()); - let eval_ctx = EvalContext::new(sm); - let mut conditions_pass = true; - for cond in &automation.condition { - if !cond.evaluate(&eval_ctx).await { - conditions_pass = false; - break; - } - } - if !conditions_pass { + // Conditions (with template env wired in — HC-WS-07). + let eval_ctx = EvalContext::with_templates( + Arc::new(hc.states().clone()), + Arc::clone(&templates), + ); + if !conditions_pass(&automation, &eval_ctx).await { continue; } - // Execute actions in a spawned task (non-blocking) - let auto_clone = Arc::clone(&automation); - let hc_clone = hc.clone(); - tokio::spawn(async move { - let mut exec_ctx = - ExecutionContext::new(hc_clone, auto_clone.id.clone()); - for action in &auto_clone.action { - if let Err(e) = action.execute(&mut exec_ctx).await { - // P1: log errors to stderr; structured logging in P2 - eprintln!( - "[homecore-automation] action error in {}: {e}", - auto_clone.id - ); - break; - } - } - }); + spawn_run(&hc, automation, running); } } Err(broadcast::error::RecvError::Closed) => break, @@ -108,6 +159,156 @@ impl AutomationEngine { } }) } + + /// Wall-clock timer task: fires `time:` triggers (HC-WS-04). Ticks at + /// 1 Hz and runs each matching automation once when the local + /// wall-clock `HH:MM:SS` equals the trigger's `at`. The task exits + /// when the state-machine broadcast channel closes (engine teardown). + fn start_timer(&self) -> tokio::task::JoinHandle<()> { + let automations = Arc::clone(&self.automations); + let hc = self.hc.clone(); + let templates = Arc::clone(&self.templates); + // A receiver that lets the timer notice engine teardown. + let mut teardown_rx = self.hc.states().subscribe(); + + tokio::spawn(async move { + let mut interval = tokio::time::interval(std::time::Duration::from_millis(1000)); + // Track the last second we fired, to fire once per match. + let mut last_fired_sec: Option = None; + loop { + tokio::select! { + _ = interval.tick() => { + let now = Local::now(); + let hhmmss = format!("{:02}:{:02}:{:02}", now.hour(), now.minute(), now.second()); + if last_fired_sec.as_deref() == Some(hhmmss.as_str()) { + continue; + } + let snapshot: Vec<(Arc, Arc)> = automations + .lock() + .unwrap() + .iter() + .map(|r| (Arc::clone(&r.auto), Arc::clone(&r.running))) + .collect(); + let mut fired_any = false; + for (automation, running) in snapshot { + if !automation.enabled { + continue; + } + let time_match = automation.trigger.iter().any(|t| match t { + Trigger::Time { at } => time_at_matches(at, &hhmmss), + _ => false, + }); + if !time_match { + continue; + } + let eval_ctx = EvalContext::with_templates( + Arc::new(hc.states().clone()), + Arc::clone(&templates), + ); + if !conditions_pass(&automation, &eval_ctx).await { + continue; + } + spawn_run(&hc, automation, running); + fired_any = true; + } + if fired_any { + last_fired_sec = Some(hhmmss); + } + } + r = teardown_rx.recv() => { + if let Err(broadcast::error::RecvError::Closed) = r { + break; + } + } + } + } + }) + } + + /// Manually fire any `time:` automations whose `at` equals `hhmmss` + /// (`"HH:MM:SS"`). Bypasses the 1 Hz clock so tests can assert the + /// time-trigger path deterministically without waiting for a + /// wall-clock second to roll over. Returns the number of automations + /// that fired (passed conditions and were spawned). + pub async fn fire_time_for_test(&self, hhmmss: &str) -> usize { + let snapshot: Vec<(Arc, Arc)> = self + .automations + .lock() + .unwrap() + .iter() + .map(|r| (Arc::clone(&r.auto), Arc::clone(&r.running))) + .collect(); + let mut fired = 0usize; + for (automation, running) in snapshot { + if !automation.enabled { + continue; + } + let time_match = automation.trigger.iter().any(|t| match t { + Trigger::Time { at } => time_at_matches(at, hhmmss), + _ => false, + }); + if !time_match { + continue; + } + let eval_ctx = self.eval_ctx(); + if !conditions_pass(&automation, &eval_ctx).await { + continue; + } + spawn_run(&self.hc, automation, running); + fired += 1; + } + fired + } +} + +/// Evaluate all of an automation's conditions (AND). Empty → pass. +async fn conditions_pass(automation: &Automation, eval_ctx: &EvalContext) -> bool { + for cond in &automation.condition { + if !cond.evaluate(eval_ctx).await { + return false; + } + } + true +} + +/// Does a `Time` trigger `at` value match the current `HH:MM:SS`? +/// Accepts `HH:MM` (matches at :00 seconds) and `HH:MM:SS`. +fn time_at_matches(at: &str, hhmmss: &str) -> bool { + let normalized = match at.matches(':').count() { + 1 => format!("{at}:00"), + _ => at.to_string(), + }; + normalized == hhmmss +} + +/// Spawn an automation run, honoring `RunMode::Single` re-entrancy +/// guard (HC-WS-05). For `Single`/`IgnoreFirst` modes a run already in +/// flight causes the new trigger to be skipped; the `running` flag is +/// cleared when the run finishes. +fn spawn_run(hc: &HomeCore, automation: Arc, running: Arc) { + let single = matches!(automation.mode, RunMode::Single | RunMode::IgnoreFirst); + if single { + // Try to claim the running slot; if already running, skip. + if running + .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst) + .is_err() + { + return; + } + } + let hc_clone = hc.clone(); + tokio::spawn(async move { + let mut exec_ctx = ExecutionContext::new(hc_clone, automation.id.clone()); + for action in &automation.action { + if let Err(e) = action.execute(&mut exec_ctx).await { + eprintln!("[homecore-automation] action error in {}: {e}", automation.id); + break; + } + } + if single { + running.store(false, Ordering::SeqCst); + } + }); } #[cfg(test)] @@ -166,7 +367,6 @@ mod tests { let _handle = engine.start(); - // Fire a matching state change hc.states().set( EntityId::parse("switch.living").unwrap(), "on", @@ -174,7 +374,6 @@ mod tests { Context::new(), ); - // Give the async task time to run sleep(Duration::from_millis(50)).await; assert_eq!(log.lock().unwrap().len(), 1); @@ -203,7 +402,6 @@ mod tests { let _handle = engine.start(); - // Fire on a DIFFERENT entity hc.states().set( EntityId::parse("switch.bedroom").unwrap(), "on", @@ -249,4 +447,16 @@ mod tests { sleep(Duration::from_millis(50)).await; assert_eq!(log.lock().unwrap().len(), 0, "disabled automation should not fire"); } + + // Behavioral tests for the timer / run-mode / template paths + // (HC-WS-04/05/07) live in `tests/engine_behaviors.rs` to keep this + // file under the 500-line guideline; they use only the public API. + + #[test] + fn time_at_matches_handles_hh_mm_and_hh_mm_ss() { + assert!(time_at_matches("07:30", "07:30:00")); + assert!(time_at_matches("07:30:15", "07:30:15")); + assert!(!time_at_matches("07:30", "07:30:01")); + assert!(!time_at_matches("07:30:15", "07:30:16")); + } } diff --git a/v2/crates/homecore-automation/src/trigger.rs b/v2/crates/homecore-automation/src/trigger.rs index f1367957..3d11f156 100644 --- a/v2/crates/homecore-automation/src/trigger.rs +++ b/v2/crates/homecore-automation/src/trigger.rs @@ -150,7 +150,12 @@ impl Trigger { true } Trigger::Time { .. } => { - // Time triggers are evaluated by the engine's timer task, not here. + // Time triggers are wall-clock based and have no state-change + // context to match here. They are evaluated by the engine's + // 1 Hz timer task (`AutomationEngine::start_timer`, HC-WS-04 / + // ADR-161), which compares the trigger's `at` against the local + // wall-clock second. `matches_sync` therefore returns false for + // `Time` on the state-change path by design. false } Trigger::Event { event_type } => { diff --git a/v2/crates/homecore-automation/tests/engine_behaviors.rs b/v2/crates/homecore-automation/tests/engine_behaviors.rs new file mode 100644 index 00000000..01a7c7fa --- /dev/null +++ b/v2/crates/homecore-automation/tests/engine_behaviors.rs @@ -0,0 +1,259 @@ +//! Engine behavioral integration tests (ADR-161, HC-WS-04/05/07). +//! +//! These exercise the `AutomationEngine` runtime through its public API +//! only (extracted from the inline module to keep `engine.rs` under the +//! 500-line file guideline): +//! +//! - HC-WS-04 — `time:` triggers fire via the engine timer path. +//! - HC-WS-05 — `RunMode::Single` does not double-fire; `Parallel` does. +//! - HC-WS-07 — `template:` conditions evaluate against live state in the +//! engine path (no longer always-false). +//! +//! Each fails on the pre-fix engine (no timer task, unbounded-parallel +//! regardless of mode, `template_env: None`). + +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::{Arc, Mutex}; + +use homecore::service::FnHandler; +use homecore::{Context, EntityId, HomeCore, ServiceCall, ServiceName}; +use homecore_automation::{Action, Automation, AutomationEngine, Condition, RunMode, Trigger}; +use tokio::time::{sleep, Duration}; + +async fn register_recorder( + hc: &HomeCore, + domain: &str, + service: &str, +) -> Arc>> { + let log: Arc>> = Arc::new(Mutex::new(vec![])); + let log2 = Arc::clone(&log); + hc.services() + .register( + ServiceName::new(domain, service), + FnHandler(move |call: ServiceCall| { + let l = Arc::clone(&log2); + async move { + l.lock().unwrap().push(call.data.clone()); + Ok(serde_json::Value::Null) + } + }), + ) + .await; + log +} + +// ── HC-WS-04: time triggers fire ─────────────────────────────────── +#[tokio::test] +async fn time_trigger_fires_via_timer_path() { + let hc = HomeCore::new(); + let log = register_recorder(&hc, "light", "turn_on").await; + + let engine = AutomationEngine::new(hc.clone()); + engine.register(Automation::new( + "time_auto", + vec![Trigger::Time { at: "07:30:00".into() }], + vec![Action::ServiceCall { + domain: "light".into(), + service: "turn_on".into(), + data: serde_json::json!({"by": "time"}), + }], + )); + + // Deterministically fire the timer path for the matching second. + let fired = engine.fire_time_for_test("07:30:00").await; + assert_eq!(fired, 1, "time automation should fire for matching HH:MM:SS"); + sleep(Duration::from_millis(50)).await; + assert_eq!(log.lock().unwrap().len(), 1, "time trigger should run its action"); + + // A non-matching second must NOT fire. + let none = engine.fire_time_for_test("09:00:00").await; + assert_eq!(none, 0); +} + +// ── HC-WS-05: RunMode::Single does not double-fire ───────────────── +#[tokio::test] +async fn single_mode_does_not_double_fire_on_rapid_triggers() { + let hc = HomeCore::new(); + let count = Arc::new(AtomicUsize::new(0)); + let count2 = Arc::clone(&count); + hc.services() + .register( + ServiceName::new("light", "slow"), + FnHandler(move |_call: ServiceCall| { + let c = Arc::clone(&count2); + async move { + c.fetch_add(1, Ordering::SeqCst); + sleep(Duration::from_millis(200)).await; + Ok(serde_json::Value::Null) + } + }), + ) + .await; + + let engine = AutomationEngine::new(hc.clone()); + let mut auto = Automation::new( + "single_auto", + vec![Trigger::State { + entity_id: EntityId::parse("switch.s").unwrap(), + from: None, + to: None, + }], + vec![Action::ServiceCall { + domain: "light".into(), + service: "slow".into(), + data: serde_json::json!({}), + }], + ); + auto.mode = RunMode::Single; + engine.register(auto); + let _handle = engine.start(); + + // Two rapid triggers while the first run is still sleeping. + hc.states().set(EntityId::parse("switch.s").unwrap(), "a", serde_json::json!({}), Context::new()); + sleep(Duration::from_millis(20)).await; + hc.states().set(EntityId::parse("switch.s").unwrap(), "b", serde_json::json!({}), Context::new()); + + sleep(Duration::from_millis(350)).await; + assert_eq!( + count.load(Ordering::SeqCst), + 1, + "Single-mode automation must not double-fire while already running" + ); +} + +#[tokio::test] +async fn parallel_mode_does_fire_concurrently() { + let hc = HomeCore::new(); + let count = Arc::new(AtomicUsize::new(0)); + let count2 = Arc::clone(&count); + hc.services() + .register( + ServiceName::new("light", "slow"), + FnHandler(move |_call: ServiceCall| { + let c = Arc::clone(&count2); + async move { + c.fetch_add(1, Ordering::SeqCst); + sleep(Duration::from_millis(150)).await; + Ok(serde_json::Value::Null) + } + }), + ) + .await; + + let engine = AutomationEngine::new(hc.clone()); + let mut auto = Automation::new( + "parallel_auto", + vec![Trigger::State { + entity_id: EntityId::parse("switch.p").unwrap(), + from: None, + to: None, + }], + vec![Action::ServiceCall { + domain: "light".into(), + service: "slow".into(), + data: serde_json::json!({}), + }], + ); + auto.mode = RunMode::Parallel; + engine.register(auto); + let _handle = engine.start(); + + hc.states().set(EntityId::parse("switch.p").unwrap(), "a", serde_json::json!({}), Context::new()); + sleep(Duration::from_millis(20)).await; + hc.states().set(EntityId::parse("switch.p").unwrap(), "b", serde_json::json!({}), Context::new()); + + sleep(Duration::from_millis(300)).await; + assert_eq!( + count.load(Ordering::SeqCst), + 2, + "Parallel-mode automation should fire on every trigger" + ); +} + +// ── HC-WS-07: template conditions evaluate in the engine path ────── +#[tokio::test] +async fn template_condition_evaluates_true_in_engine() { + let hc = HomeCore::new(); + let log = register_recorder(&hc, "light", "turn_on").await; + + hc.states().set( + EntityId::parse("sensor.flag").unwrap(), + "on", + serde_json::json!({}), + Context::new(), + ); + + let engine = AutomationEngine::new(hc.clone()); + let mut auto = Automation::new( + "tmpl_auto", + vec![Trigger::State { + entity_id: EntityId::parse("switch.trigger").unwrap(), + from: None, + to: None, + }], + vec![Action::ServiceCall { + domain: "light".into(), + service: "turn_on".into(), + data: serde_json::json!({}), + }], + ); + auto.condition = vec![Condition::Template { + value_template: "{{ is_state('sensor.flag', 'on') }}".into(), + }]; + engine.register(auto); + let _handle = engine.start(); + + hc.states().set( + EntityId::parse("switch.trigger").unwrap(), + "go", + serde_json::json!({}), + Context::new(), + ); + sleep(Duration::from_millis(50)).await; + assert_eq!( + log.lock().unwrap().len(), + 1, + "template condition should evaluate true and let the action run (HC-WS-07)" + ); +} + +#[tokio::test] +async fn template_condition_evaluates_false_blocks_action() { + let hc = HomeCore::new(); + let log = register_recorder(&hc, "light", "turn_on").await; + hc.states().set( + EntityId::parse("sensor.flag").unwrap(), + "off", + serde_json::json!({}), + Context::new(), + ); + + let engine = AutomationEngine::new(hc.clone()); + let mut auto = Automation::new( + "tmpl_auto_false", + vec![Trigger::State { + entity_id: EntityId::parse("switch.trigger").unwrap(), + from: None, + to: None, + }], + vec![Action::ServiceCall { + domain: "light".into(), + service: "turn_on".into(), + data: serde_json::json!({}), + }], + ); + auto.condition = vec![Condition::Template { + value_template: "{{ is_state('sensor.flag', 'on') }}".into(), + }]; + engine.register(auto); + let _handle = engine.start(); + + hc.states().set( + EntityId::parse("switch.trigger").unwrap(), + "go", + serde_json::json!({}), + Context::new(), + ); + sleep(Duration::from_millis(50)).await; + assert_eq!(log.lock().unwrap().len(), 0, "false template condition should block the action"); +} diff --git a/v2/crates/homecore-plugins/src/manifest.rs b/v2/crates/homecore-plugins/src/manifest.rs index d1082a7d..4480fc4d 100644 --- a/v2/crates/homecore-plugins/src/manifest.rs +++ b/v2/crates/homecore-plugins/src/manifest.rs @@ -83,15 +83,26 @@ pub struct PluginManifest { #[serde(default, skip_serializing_if = "Option::is_none")] pub wasm_module: Option, - /// [HOMECORE] `sha256:` hash of the wasm binary; verified before execution. + /// [HOMECORE] `sha256:` hash of the wasm binary. + /// + /// **(P4 — not yet enforced, ADR-161/B5):** this field is parsed and + /// round-tripped but is NOT verified before execution. The hash/sig + /// gate lands in P4; until then the presence of this field implies no + /// integrity guarantee. #[serde(default, skip_serializing_if = "Option::is_none")] pub wasm_module_hash: Option, /// [HOMECORE] Ed25519 signature of the wasm binary hash (`ed25519:`). + /// + /// **(P4 — not yet enforced, ADR-161/B5):** parsed but never checked. + /// No signature verification happens before a plugin runs. #[serde(default, skip_serializing_if = "Option::is_none")] pub wasm_module_sig: Option, /// [HOMECORE] Ed25519 public key of the plugin publisher. + /// + /// **(P4 — not yet enforced, ADR-161/B5):** parsed but never used to + /// verify `wasm_module_sig`. #[serde(default, skip_serializing_if = "Option::is_none")] pub publisher_key: Option, diff --git a/v2/crates/homecore-server/src/main.rs b/v2/crates/homecore-server/src/main.rs index 05d3954e..383e8ddf 100644 --- a/v2/crates/homecore-server/src/main.rs +++ b/v2/crates/homecore-server/src/main.rs @@ -121,8 +121,21 @@ async fn main() -> Result<()> { let _ = plugin_registry; // wired-but-empty at boot; integrations register here // ── 4. Automation engine ──────────────────────────────────────── - let _automation_engine = AutomationEngine::new(hc.clone()); - info!("Automation engine ready (no automations loaded yet)"); + // Construct AND start the engine (HC-WS-03, ADR-161). `start()` + // spawns the state-change event loop + the 1 Hz wall-clock timer + // task so state/numeric/event AND time triggers all fire. The + // engine is kept alive for the process lifetime (it is moved into a + // long-lived binding); its background tasks run until the HomeCore + // broadcast channel closes at shutdown. No automations are loaded at + // boot yet (YAML loader is P-next); integrations register via + // `engine.register(..)`. + let automation_engine = AutomationEngine::new(hc.clone()); + let _automation_task = automation_engine.start(); + info!( + "Automation engine started ({} automations registered) — \ + state/numeric/event + time triggers active", + automation_engine.len() + ); // ── 5. Assist pipeline ────────────────────────────────────────── let recognizer = RegexIntentRecognizer::new();