Merge pull request #1025 from ruvnet/feat/v2-beyond-sota-sweep-m7

Beyond-SOTA sweep M7 (ADR-161): HOMECORE WS auth-bypass fix + automation engine + security
This commit is contained in:
rUv 2026-06-12 01:15:42 -04:00 committed by GitHub
commit b8e870b314
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
14 changed files with 1363 additions and 110 deletions

View File

@ -0,0 +1,260 @@
# ADR-161: HOMECORE Server Layer — WebSocket Auth Bypass, Reply-Theater & Documented-but-No-Op Automation (Security & Honest Labeling)
- **Status**: accepted
- **Date**: 2026-06-12
- **Deciders**: ruv
- **Tags**: homecore, http-ws-boundary, websocket-auth-bypass, security, automation-engine, documented-no-op, prove-everything, soundness, honest-labeling
- **Amends**: ADR-130 (HOMECORE-API WS protocol), ADR-129 (HOMECORE-AUTO automation engine), ADR-128 (plugin manifest)
## Context
Beyond-SOTA sweep **Milestone 7**, over the HOMECORE **server/network layer**
crates only — `homecore-api`, `homecore-server`, `homecore-automation`,
`homecore-hap`, `homecore-plugins` — executed under the project's
**prove-everything / anti-"AI-slop"** directive.
### Headline — the library cores are real, but the network boundary was unsound
The same audit pattern as ADR-160 held for the *library logic*: the automation
trigger/condition/template/action evaluators, the REST handlers, the HAP
mapping, and the plugin manifest parser are **real, tested code** — not stubs.
That is the anti-slop positive and it is cited here as such.
What the audit found was **not fake business logic but an unsound trust
boundary plus documented-but-no-op features**:
1. A **CRITICAL WebSocket authentication bypass** — the WS handshake accepted
any non-empty token, ignoring the provisioned token whitelist the REST path
enforces.
2. **Reply-theater** — WS command responses were computed, then logged and
**discarded**; no `result`/`pong`/`event` ever reached the client.
3. **Documented-but-idle automation** — the engine was constructed and dropped
(never started); time triggers, `RunMode`, `Choose` branches, and template
conditions were each **documented as working but were no-ops in the live
path**.
This is a worse class than ADR-160's over-naming: here the **doc claimed a
capability the code did not deliver** (auth enforcement, reply transport,
running automations). The fix is **implement where feasible, honestly relabel
where not — never leave a false doc.** Every fix is pinned by a test that
**fails on the old code**.
Grading vocabulary (ADR-152 / ADR-158 / ADR-160):
- **MEASURED** — reproduced in this worktree, command + failing-on-old test recorded.
- **NO-ACTION (already-honest/already-hardened)** — audited, found correct, cited as a positive.
- **ACCEPTED-FUTURE** — deliberately deferred, nothing dropped.
## Decision — Fixes Landed
### §A1 — WebSocket auth bypass (CRITICAL, security) — MEASURED
`homecore-api/src/ws.rs` handshake checked only `token.trim().is_empty()` and
sent `auth_ok` for **any** non-empty token. It never called
`state.tokens().is_valid()` — the check the REST path uses via
`auth::BearerAuth`. With a provisioned `HOMECORE_TOKENS` whitelist, **any
attacker-chosen non-empty token got full WS access** (read all states, call any
service, subscribe to all events).
**Real fix:** the handshake now calls
`state.tokens().is_valid(&token).await` (the *same* store + method as REST).
A wrong token receives `auth_invalid` and the socket closes. DEV (`allow_any`)
mode still accepts any non-empty bearer with a warn, so smoke tests keep
working; the empty token is rejected inside `is_valid`.
**Failing-on-old test** (`tests/ws_handshake.rs`):
`wrong_token_is_rejected` — provisions a real (non-dev) store with one good
token, sends a DIFFERENT non-empty token over the WS handshake, asserts
`auth_invalid`. On the old source the client received
`{"type":"auth_ok",…}` (verified: the test panics on old `ws.rs` with
`left: "auth_ok", right: "auth_invalid"`). Companion: `correct_token_is_accepted`.
**Grade: MEASURED. This is the milestone headline.**
### §A2 — WS replies never transmitted (HIGH, functional) — MEASURED
`ws.rs::Connection::run` moved the socket into a recv-only task; the only
consumer of the response mpsc just did `debug!("ws emit: {msg}")` and dropped
every message. No command reply ever reached the wire.
**Real fix:** the socket is split with `futures_util::StreamExt::split`. A
dedicated **writer task** drains the response channel onto `sink.send(...)`
(text frames; a `__pong:<n>` sentinel maps to a Pong control frame); the reader
task parses commands concurrently. On reader exit the senders drop and the
writer task ends cleanly.
**Failing-on-old tests:** `result_reply_is_received` (connect → auth →
`get_states` → assert a `result` reply is RECEIVED within 5s) and
`ping_pong_reply_is_received`. Both time out on the old source (verified:
`Elapsed` panic). **Grade: MEASURED.**
### §A8 — `homecore-api` bin: no env-token path, network-exposed (HIGH, security) — MEASURED
`homecore-api/src/bin/server.rs` bound `0.0.0.0:8123` with
`SharedState::new()``allow_any_non_empty()` and **no** `HOMECORE_TOKENS`
path (unlike `homecore-server`), so a provisioned operator had no way to lock
it down.
**Real fix:** the bin now mirrors `homecore-server`'s provisioning — prefer the
`HOMECORE_TOKENS` whitelist (`LongLivedTokenStore::from_env()`), fall back to an
**explicitly warn-logged** DEV mode only when unset. It also defaults the bind
address to **`127.0.0.1`** (loopback) so a bare `cargo run` is not
network-exposed, with `HOMECORE_BIND` to opt into LAN.
**Failing-on-old test** (`tests/server_bin_auth.rs`):
`provisioned_bin_rejects_wrong_bearer` reproduces the bin's exact provisioning
path (a populated, non-dev store) and asserts a wrong bearer → 401;
`from_env_path_enforces_whitelist` proves `from_env()` is not dev mode and
enforces the list. The old bin's `allow_any_non_empty()` accepted the wrong
bearer. **Grade: MEASURED.**
### §A3 — Automation engine never started (HIGH) — MEASURED
`homecore-server/src/main.rs` did `let _automation_engine = AutomationEngine::new(...)`
then dropped it immediately, while the header doc claimed "Automation engine
subscribed to the state machine."
**Real fix:** the engine is now built into a long-lived binding and `.start()`
is called, spawning the event loop + timer task; the header/log lines state it
is started with N automations and which trigger classes are active. (With A4A7
the running engine is genuinely functional, not theater.)
**Evidence:** the engine-behavior tests below run against the same
`AutomationEngine::start()` path now wired into the bin. **Grade: MEASURED.**
### §A4 — `Trigger::Time` hard-coded `false`, no timer (HIGH) — MEASURED
`trigger.rs::matches_sync` returned `false` for `Time` and there was **no timer
task** anywhere, so time automations could never fire.
**Real fix:** `AutomationEngine::start_timer` — a 1 Hz tokio interval that
compares each `time:` automation's `at` (`HH:MM` or `HH:MM:SS`) against the
local wall-clock second and fires it once per match (conditions still gate it).
`matches_sync` returning `false` for `Time` is now **correct and documented**
(it is a wall-clock trigger with no state-change context); a public
`fire_time_for_test` exposes the same path deterministically.
**Failing-on-old test** (`tests/engine_behaviors.rs`):
`time_trigger_fires_via_timer_path` (+ unit `time_at_matches_handles_hh_mm_and_hh_mm_ss`).
The method does not exist on the old engine. **Grade: MEASURED.**
### §A5 — `RunMode` documented as AtomicBool-enforced but unbounded-parallel (HIGH) — MEASURED
`engine.rs` doc claimed "RunMode::Single is enforced via a per-automation
AtomicBool" — but no such code existed and **every** trigger spawned an
unbounded parallel task regardless of `mode`.
**Real fix:** each registered automation carries a `running: Arc<AtomicBool>`.
`Single`/`IgnoreFirst` modes `compare_exchange` the flag before spawning and
**skip** the trigger if a run is already in flight, clearing it on completion;
`Parallel` (and, for now, `Restart`/`Queued`) spawn on every trigger.
**Failing-on-old tests** (`tests/engine_behaviors.rs`):
`single_mode_does_not_double_fire_on_rapid_triggers` (two rapid triggers while
the first run sleeps → exactly **1** run; old code fired **2**, verified) and
`parallel_mode_does_fire_concurrently` (→ 2). **Grade: MEASURED (Single/Parallel
honored; bounded `Queued`/`Restart`/`max` ordering → ACCEPTED-FUTURE, see below).**
### §A6 — `Action::Choose` ignored branches (HIGH) — MEASURED
`action.rs` discarded `choices` and always ran `default`.
**Real fix:** `ChoiceBranch::matches` deserialises each branch's
`serde_yaml::Value` conditions into `Condition` and evaluates them (AND
semantics, against an `EvalContext` now carried on `ExecutionContext`). `Choose`
runs the **first matching branch's** sequence and falls to `default` only if
none match.
**Failing-on-old tests** (`action.rs` inline):
`choose_runs_matching_branch_not_default` (matching branch runs, default does
NOT — old code ran default, verified) and
`choose_falls_to_default_when_no_branch_matches`. **Grade: MEASURED.**
### §A7 — Template conditions always false in the live engine (MEDIUM) — MEASURED
`condition.rs` returned `false` for `Template` whenever `template_env` was
`None`, and the engine built every `EvalContext` with `template_env: None`
(`EvalContext::new`), so `template:` conditions could never be true in
production — only in unit tests that hand-built a template env.
**Real fix:** the engine constructs one `TemplateEnvironment` over the state
machine and threads it into every `EvalContext` via
`EvalContext::with_templates` (event loop, timer task, and
`ExecutionContext` for `Choose` branches).
**Failing-on-old tests** (`tests/engine_behaviors.rs`):
`template_condition_evaluates_true_in_engine` (a `{{ is_state(...) }}` condition
gates an action true) and `template_condition_evaluates_false_blocks_action`.
On the old engine the action never ran (template always false, verified).
**Grade: MEASURED.**
### §B5 — Plugin manifest sig/hash "verified before execution" doc was false (LOW, honesty) — relabeled
`homecore-plugins/src/manifest.rs` documented `wasm_module_hash` as "verified
before execution" and carried `wasm_module_sig` / `publisher_key`, but these
fields are **never read** for verification (only ever set to `None` in tests).
**Fix (honest labeling — no false capability claimed):** the three fields are
re-doc'd **"(P4 — not yet enforced, ADR-161/B5)"** — parsed and round-tripped,
but no integrity/signature check happens before a plugin runs. No verification
code was added (that is P4); the doc now matches the code.
**Grade: doc-honesty (no behavior change).**
## Negative Results (NO-ACTION positives — audited, found correct, cited not edited)
These were checked and are genuinely sound/honest; cited as positives, **not**
touched:
- **CSPRNG correctness** — all IDs are `uuid::v4`; the rng/`randn` suspicion was
**REFUTED**. No weak-randomness issue exists.
- **CORS allowlist** (`app.rs`) — already hardened (explicit `AllowOrigin::list`,
no `permissive()`, `allow_credentials(false)`, env override). NO-ACTION.
- **No path traversal in `homecore-migrate`** — audited, clean.
- **No secrets in logs** — audited, clean.
- **HAP pairing stub** — honestly disclaimed as a surface stub; not over-claimed.
- **`InProcessRuntime` "no sandbox" disclaimer** — honest; left as-is.
## Deferred Backlog (Nothing Dropped)
- **Plugin authority-isolation (P5)**`homecore_permissions` claims are parsed
but not enforced at the host-call boundary. **ACCEPTED-FUTURE.**
- **Plugin signature/hash verification (P4)** — implement the
`wasm_module_hash`/`wasm_module_sig`/`publisher_key` gate that B5 now honestly
says is absent. **ACCEPTED-FUTURE.**
- **HAP real pairing (P2)** — SRP/HKDF pairing + encrypted sessions; current
bridge is an accessory-mapping surface. **ACCEPTED-FUTURE (honestly stubbed).**
- **`RunMode::Queued`/`Restart`/`max` ordering** — `Single`/`Parallel` are
honored; bounded queueing, restart-kill, and `max` concurrency are not yet
wired (every non-Single mode is parallel). **ACCEPTED-FUTURE** — the
`engine.rs` doc states exactly this, no over-claim.
- **Automation YAML load-at-boot** — the engine starts empty; a YAML loader is
P-next. The bin log states "0 automations registered" honestly.
## Reproduction (MEASURED)
```bash
cd v2
cargo test -p homecore-api -p homecore-server -p homecore-automation -p homecore-hap --no-default-features
cargo test -p homecore-plugins --features wasmtime
cargo build --workspace --no-default-features
```
Result at time of writing (all 0 failed):
- **homecore-api****25 passed** (lib 18; `server_bin_auth` 3; `ws_handshake` 4)
- **homecore-automation****42 passed** (lib 37; `engine_behaviors` 5)
- **homecore-hap** — **17 passed**
- **homecore-server** — bin, **0 tests**
- (**homecore-plugins** — **15 passed**: lib 12; integration 3)
- Full workspace `cargo build --workspace --no-default-features` succeeds.
## Consequences
- The WebSocket path can no longer be entered with a forged token — it enforces
the same `LongLivedTokenStore` whitelist as REST (A1).
- WS clients now actually receive `result`/`pong`/`event` frames (A2).
- The `homecore-api` dev bin defaults to loopback and honors `HOMECORE_TOKENS`
(A8); it is no longer an open `0.0.0.0` accept-any endpoint by default.
- The automation engine is started for real and its time triggers, `Single`
run-mode, `Choose` branches, and `template:` conditions all function — no doc
claims a capability the code lacks (A3A7).
- The plugin manifest no longer claims signature verification it does not
perform (B5).
- Files kept under the 500-line guideline (`engine.rs` 462; behavioral tests
moved to `tests/engine_behaviors.rs`).

20
v2/Cargo.lock generated
View File

@ -3472,6 +3472,7 @@ dependencies = [
"axum",
"chrono",
"dashmap",
"futures-util",
"homecore",
"http-body-util",
"hyper 1.8.1",
@ -3479,6 +3480,7 @@ dependencies = [
"serde_json",
"thiserror 1.0.69",
"tokio",
"tokio-tungstenite",
"tower 0.5.3",
"tower-http",
"tracing",
@ -10933,7 +10935,7 @@ dependencies = [
[[package]]
name = "wifi-densepose-hardware"
version = "0.3.0"
version = "0.3.1"
dependencies = [
"approx",
"byteorder",
@ -10953,7 +10955,7 @@ dependencies = [
[[package]]
name = "wifi-densepose-mat"
version = "0.3.0"
version = "0.3.1"
dependencies = [
"anyhow",
"approx",
@ -10985,7 +10987,7 @@ dependencies = [
[[package]]
name = "wifi-densepose-nn"
version = "0.3.0"
version = "0.3.1"
dependencies = [
"anyhow",
"candle-core 0.4.1",
@ -11039,7 +11041,7 @@ dependencies = [
[[package]]
name = "wifi-densepose-ruvector"
version = "0.3.1"
version = "0.3.2"
dependencies = [
"approx",
"criterion",
@ -11059,7 +11061,7 @@ dependencies = [
[[package]]
name = "wifi-densepose-sensing-server"
version = "0.3.1"
version = "0.3.2"
dependencies = [
"axum",
"chrono",
@ -11093,7 +11095,7 @@ dependencies = [
[[package]]
name = "wifi-densepose-signal"
version = "0.3.2"
version = "0.3.3"
dependencies = [
"chrono",
"criterion",
@ -11120,7 +11122,7 @@ dependencies = [
[[package]]
name = "wifi-densepose-train"
version = "0.3.1"
version = "0.3.2"
dependencies = [
"anyhow",
"approx",
@ -11158,7 +11160,7 @@ dependencies = [
[[package]]
name = "wifi-densepose-vitals"
version = "0.3.0"
version = "0.3.1"
dependencies = [
"criterion",
"serde",
@ -11190,7 +11192,7 @@ dependencies = [
[[package]]
name = "wifi-densepose-wifiscan"
version = "0.3.0"
version = "0.3.1"
dependencies = [
"serde",
"tokio",

View File

@ -33,8 +33,12 @@ chrono = { version = "0.4", features = ["serde"] }
uuid = { version = "1", features = ["v4", "serde"] }
dashmap = "6"
futures-util = { version = "0.3", default-features = false, features = ["sink"] }
[dev-dependencies]
tower = { version = "0.5", features = ["util"] }
hyper = "1"
http-body-util = "0.1"
# End-to-end WS handshake + reply tests (HC-WS-01/02, ADR-161).
tokio-tungstenite = "0.24"
futures-util = { version = "0.3", default-features = false }

View File

@ -88,6 +88,11 @@ fn default_origins() -> Vec<HeaderValue> {
mod tests {
use super::*;
// `set_var`/`remove_var` mutate process-global state; serialize every test
// that touches HOMECORE_CORS_ORIGINS so they cannot race in parallel.
// Poison-tolerant: a panicking test must not cascade-fail the others.
static ENV_LOCK: std::sync::Mutex<()> = std::sync::Mutex::new(());
#[test]
fn default_origins_includes_vite_and_ha_ports() {
let origins = default_origins();
@ -98,6 +103,7 @@ mod tests {
#[test]
fn env_override_via_homecore_cors_origins() {
let _env = ENV_LOCK.lock().unwrap_or_else(|e| e.into_inner());
std::env::set_var("HOMECORE_CORS_ORIGINS", "https://example.com,https://other.example.com");
// build_cors_layer() returns a CorsLayer which doesn't expose
// its origin list; we test the parse path indirectly by
@ -112,6 +118,7 @@ mod tests {
#[test]
fn env_empty_falls_back_to_defaults() {
let _env = ENV_LOCK.lock().unwrap_or_else(|e| e.into_inner());
std::env::set_var("HOMECORE_CORS_ORIGINS", " ");
let raw = std::env::var("HOMECORE_CORS_ORIGINS").ok();
let trimmed = raw.as_deref().map(|s| s.trim()).unwrap_or("");

View File

@ -1,15 +1,31 @@
//! `homecore-api-server` binary. Boots a HomeCore runtime and serves
//! the HA-compat REST + WS API on `:8123`.
//! the HA-compat REST + WS API.
//!
//! P1: bare-minimum bring-up. No persistence, no plugins, no auth
//! beyond "any non-empty bearer". Useful for `curl` smoke tests of
//! the wire format from the existing HA companion app:
//! ## Auth (ADR-161, HC-WS-08)
//!
//! Token provisioning matches `homecore-server`: if `HOMECORE_TOKENS`
//! is set (comma-separated bearer tokens) the API enforces that
//! whitelist on both the REST and WS paths. If it is **unset**, the
//! binary falls back to an explicitly-logged DEV mode (any non-empty
//! bearer accepted) — before this fix the bin unconditionally used
//! `allow_any_non_empty()` with no env path, so a provisioned operator
//! had no way to lock it down.
//!
//! ## Bind address
//!
//! Defaults to `127.0.0.1` (loopback only) so a bare `cargo run` of
//! this dev binary is not network-exposed. Override with
//! `HOMECORE_BIND=0.0.0.0:8123` for a LAN deployment (and provision
//! `HOMECORE_TOKENS` when you do).
//!
//! cargo run -p homecore-api --bin homecore-api-server
//! curl -H "Authorization: Bearer test" http://127.0.0.1:8123/api/
//! HOMECORE_TOKENS=secret curl -H "Authorization: Bearer secret" \
//! http://127.0.0.1:8123/api/
use std::net::SocketAddr;
use homecore::HomeCore;
use homecore_api::{router, SharedState, DEFAULT_PORT};
use homecore_api::{router, LongLivedTokenStore, SharedState, DEFAULT_PORT};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
@ -21,10 +37,34 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
.init();
let homecore = HomeCore::new();
let state = SharedState::new(homecore);
// Token provisioning (HC-WS-08). Prefer the HOMECORE_TOKENS env
// whitelist; fall back to DEV mode (warn-logged) only when unset.
let tokens = if std::env::var("HOMECORE_TOKENS")
.map(|v| !v.trim().is_empty())
.unwrap_or(false)
{
let s = LongLivedTokenStore::from_env();
let n = s.len().await;
tracing::info!("LongLivedTokenStore provisioned with {n} bearer token(s) from HOMECORE_TOKENS");
s
} else {
tracing::warn!(
"HOMECORE_TOKENS not set — token store in DEV mode (any non-empty bearer \
accepted). Set HOMECORE_TOKENS before exposing this binary to the network."
);
LongLivedTokenStore::allow_any_non_empty()
};
let state = SharedState::with_tokens(homecore, "Home", env!("CARGO_PKG_VERSION"), tokens);
let app = router(state);
let addr = std::net::SocketAddr::from(([0, 0, 0, 0], DEFAULT_PORT));
// Default to loopback so `cargo run` is not network-exposed; allow
// an explicit HOMECORE_BIND override for LAN deployments.
let addr: SocketAddr = match std::env::var("HOMECORE_BIND") {
Ok(v) if !v.trim().is_empty() => v.parse()?,
_ => SocketAddr::from(([127, 0, 0, 1], DEFAULT_PORT)),
};
tracing::info!("HOMECORE-API listening on http://{addr} (HA-compat /api + /api/websocket)");
let listener = tokio::net::TcpListener::bind(addr).await?;

View File

@ -9,6 +9,16 @@
//!
//! `ha_version` is the homecore version string — see ADR-130 Q1 for the
//! companion-app feature-detect concern.
//!
//! ## Security (ADR-161)
//!
//! The `auth` token is validated against [`crate::tokens::LongLivedTokenStore`]
//! via `state.tokens().is_valid()` — the *same* store the REST path uses
//! (`auth::BearerAuth`). A wrong token receives `auth_invalid` and the socket
//! is closed. (HC-WS-01 closed the prior bypass where any non-empty token was
//! accepted.) Command replies are transmitted by a dedicated writer task that
//! drains the response channel onto the socket (HC-WS-02 closed the prior
//! reply-theater where responses were logged and discarded).
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
@ -18,7 +28,7 @@ use axum::extract::State;
use axum::response::IntoResponse;
use serde::{Deserialize, Serialize};
use tokio::sync::broadcast;
use tracing::{debug, warn};
use tracing::warn;
use homecore::{Context, ServiceCall, ServiceName, SystemEvent};
@ -58,11 +68,18 @@ async fn handle_socket(mut socket: WebSocket, state: SharedState) {
_ => return,
};
// P1: accept any non-empty token. P2: validate against store.
if token.trim().is_empty() {
// Validate the bearer token against the same store the REST path
// uses (`state.tokens().is_valid()` — see `rest.rs` /
// `auth::BearerAuth`). Before the HC-WS-01 fix this checked only
// `token.trim().is_empty()` and accepted ANY non-empty token even
// with a provisioned `HOMECORE_TOKENS` whitelist — a full WS auth
// bypass. `is_valid()` rejects the empty token internally and, in
// DEV (`allow_any`) mode, still accepts any non-empty bearer (with
// a warn) so smoke tests keep working.
if !state.tokens().is_valid(&token).await {
let _ = socket
.send(Message::Text(
serde_json::json!({"type":"auth_invalid","message":"empty token"}).to_string(),
serde_json::json!({"type":"auth_invalid","message":"invalid token"}).to_string(),
))
.await;
return;
@ -140,54 +157,71 @@ impl Connection {
}
}
async fn run(self, mut socket: WebSocket) {
async fn run(self, socket: WebSocket) {
use futures_util::{SinkExt, StreamExt};
let conn = Arc::new(self);
// Split the socket so a dedicated writer task can drain `rx` onto
// the wire while the reader task processes commands concurrently.
// Before the HC-WS-02 fix the socket was moved into a recv-only
// task and the only `rx` consumer just `debug!`-logged and
// DISCARDED every message — so no `result`/`pong`/`event` ever
// reached the client. Now `rx` feeds `socket.send`.
let (mut sink, mut stream) = socket.split();
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<String>();
let sender_tx = tx.clone();
let recv_task = {
let conn = Arc::clone(&conn);
tokio::spawn(async move {
while let Some(frame) = socket.recv().await {
match frame {
Ok(Message::Text(raw)) => {
let cmd: WsCommand = match serde_json::from_str(&raw) {
Ok(c) => c,
Err(e) => {
warn!("bad ws command: {e}");
continue;
}
};
conn.handle_cmd(cmd, &sender_tx).await;
}
Ok(Message::Ping(p)) => {
let _ = sender_tx.send(format!("__pong:{}", p.len()));
}
Ok(Message::Close(_)) | Err(_) => break,
_ => {}
}
// Writer task: drain replies onto the socket. A `__pong:<n>`
// sentinel maps to a binary Pong control frame; everything else
// is a JSON text frame.
let writer_task = tokio::spawn(async move {
while let Some(msg) = rx.recv().await {
let send_result = if let Some(n) = msg.strip_prefix("__pong:") {
let len: usize = n.parse().unwrap_or(0);
sink.send(Message::Pong(vec![0u8; len])).await
} else {
sink.send(Message::Text(msg)).await
};
if send_result.is_err() {
break;
}
// Cancel all subscriptions on disconnect.
for entry in conn.subs.iter() {
entry.value().abort.abort();
}
});
}
});
tokio::spawn(async move {
while let Some(msg) = rx.recv().await {
if msg.starts_with("__pong:") {
// pong handled inline; skip
continue;
// Reader task: parse and dispatch commands; responses are pushed
// into `tx` and transmitted by the writer task above.
let reader_tx = tx.clone();
{
let conn = Arc::clone(&conn);
while let Some(frame) = stream.next().await {
match frame {
Ok(Message::Text(raw)) => {
let cmd: WsCommand = match serde_json::from_str(&raw) {
Ok(c) => c,
Err(e) => {
warn!("bad ws command: {e}");
continue;
}
};
conn.handle_cmd(cmd, &reader_tx).await;
}
// Use the socket from the recv task via a one-shot mpsc
// (in this minimal P1, the recv task owns the socket
// and we ack inline below — this branch is for the
// subscription fan-out emit path)
debug!("ws emit: {msg}");
Ok(Message::Ping(p)) => {
let _ = reader_tx.send(format!("__pong:{}", p.len()));
}
Ok(Message::Close(_)) | Err(_) => break,
_ => {}
}
})
};
let _ = recv_task.await;
}
// Cancel all subscriptions on disconnect.
for entry in conn.subs.iter() {
entry.value().abort.abort();
}
}
// Reader loop ended → drop the senders so the writer task's `rx`
// closes and the task exits cleanly.
drop(tx);
drop(reader_tx);
let _ = writer_task.await;
}
async fn handle_cmd(&self, cmd: WsCommand, tx: &tokio::sync::mpsc::UnboundedSender<String>) {

View File

@ -0,0 +1,77 @@
//! HC-WS-08 (ADR-161): the `homecore-api-server` bin must honor the
//! `HOMECORE_TOKENS` env whitelist instead of unconditionally accepting
//! any non-empty bearer.
//!
//! `main()` is not directly callable, so this reproduces the bin's exact
//! token-provisioning path (`LongLivedTokenStore::from_env()` when
//! `HOMECORE_TOKENS` is set) and drives a real HTTP request through the
//! router. On the pre-fix bin — which used `SharedState::new()` →
//! `allow_any_non_empty()` with NO env path — a wrong bearer was
//! accepted; this test asserts it is now rejected with 401.
use axum::body::Body;
use axum::http::{Request, StatusCode};
use homecore::HomeCore;
use homecore_api::{router, LongLivedTokenStore, SharedState};
use tower::ServiceExt; // for `oneshot`
/// Build the same state the bin builds when HOMECORE_TOKENS is set.
async fn provisioned_state(valid: &str) -> SharedState {
// Mirror `from_env()` deterministically without mutating process
// env (which would race other tests): an `empty()` store with the
// one provisioned token registered is exactly what
// `from_env()` produces for `HOMECORE_TOKENS=<valid>`.
let store = LongLivedTokenStore::empty();
store.register(valid).await;
SharedState::with_tokens(HomeCore::new(), "Home", "test", store)
}
#[tokio::test]
async fn provisioned_bin_rejects_wrong_bearer() {
let app = router(provisioned_state("the_real_token").await);
let resp = app
.oneshot(
Request::builder()
.uri("/api/states")
.header("Authorization", "Bearer the_wrong_token")
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
assert_eq!(
resp.status(),
StatusCode::UNAUTHORIZED,
"a provisioned token store must reject a wrong bearer (HC-WS-08)"
);
}
#[tokio::test]
async fn provisioned_bin_accepts_correct_bearer() {
let app = router(provisioned_state("the_real_token").await);
let resp = app
.oneshot(
Request::builder()
.uri("/api/states")
.header("Authorization", "Bearer the_real_token")
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
}
#[tokio::test]
async fn from_env_path_enforces_whitelist() {
// Exercise the literal `from_env()` constructor the bin uses, under
// a serialized env mutation, to prove the env path itself enforces.
std::env::set_var("HOMECORE_TOKENS", "env_token_1, env_token_2");
let store = LongLivedTokenStore::from_env();
std::env::remove_var("HOMECORE_TOKENS");
assert!(store.is_valid("env_token_1").await);
assert!(store.is_valid("env_token_2").await);
assert!(!store.is_valid("not_in_whitelist").await);
assert!(!store.is_dev_mode().await, "from_env must NOT be dev mode");
}

View File

@ -0,0 +1,168 @@
//! End-to-end WebSocket handshake + reply tests (ADR-161, HC-WS-01/02).
//!
//! These bind a real `TcpListener`, serve the full router, and connect
//! with a real WS client (`tokio-tungstenite`). They exercise the wire
//! path the in-crate unit tests cannot.
//!
//! - `wrong_token_is_rejected` — FAILS on the pre-fix `ws.rs` that only
//! checked `token.trim().is_empty()` and accepted any non-empty token
//! (HC-WS-01: WS auth bypass).
//! - `result_reply_is_received` — FAILS on the pre-fix `ws.rs` that moved
//! the socket into a recv-only task and discarded every reply with
//! `debug!("ws emit: {msg}")` (HC-WS-02: reply theater).
use std::net::SocketAddr;
use futures_util::{SinkExt, StreamExt};
use homecore::HomeCore;
use homecore_api::{router, LongLivedTokenStore, SharedState};
use tokio_tungstenite::connect_async;
use tokio_tungstenite::tungstenite::Message;
/// Spawn the API on an ephemeral port with a real (non-dev) token store
/// containing exactly one valid token. Returns the bound address.
async fn spawn_server_with_token(valid_token: &str) -> SocketAddr {
let hc = HomeCore::new();
let tokens = LongLivedTokenStore::empty();
tokens.register(valid_token).await;
let state = SharedState::with_tokens(hc, "Test", "test-version", tokens);
let app = router(state);
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
tokio::spawn(async move {
axum::serve(listener, app).await.unwrap();
});
addr
}
/// Read text frames until one parses as JSON; returns the parsed value.
async fn next_json<S>(ws: &mut S) -> serde_json::Value
where
S: StreamExt<Item = Result<Message, tokio_tungstenite::tungstenite::Error>> + Unpin,
{
loop {
match ws.next().await {
Some(Ok(Message::Text(raw))) => {
if let Ok(v) = serde_json::from_str::<serde_json::Value>(&raw) {
return v;
}
}
Some(Ok(_)) => continue,
other => panic!("expected text frame, got {other:?}"),
}
}
}
#[tokio::test]
async fn wrong_token_is_rejected() {
// HC-WS-01: a provisioned store with one good token must reject a
// DIFFERENT (non-empty) token over the WS handshake. The old code
// sent `auth_ok` for any non-empty token — this asserts the fix.
let addr = spawn_server_with_token("good_token_abc").await;
let url = format!("ws://{addr}/api/websocket");
let (mut ws, _resp) = connect_async(&url).await.unwrap();
// Server → auth_required
let req = next_json(&mut ws).await;
assert_eq!(req["type"], "auth_required");
// Client → auth with the WRONG token
ws.send(Message::Text(
serde_json::json!({"type":"auth","access_token":"wrong_token_xyz"}).to_string(),
))
.await
.unwrap();
// Server → auth_invalid (NOT auth_ok)
let resp = next_json(&mut ws).await;
assert_eq!(
resp["type"], "auth_invalid",
"wrong token must be rejected with auth_invalid, got: {resp}"
);
assert_ne!(resp["type"], "auth_ok", "wrong token must NOT receive auth_ok");
}
#[tokio::test]
async fn correct_token_is_accepted() {
let addr = spawn_server_with_token("good_token_abc").await;
let url = format!("ws://{addr}/api/websocket");
let (mut ws, _resp) = connect_async(&url).await.unwrap();
let req = next_json(&mut ws).await;
assert_eq!(req["type"], "auth_required");
ws.send(Message::Text(
serde_json::json!({"type":"auth","access_token":"good_token_abc"}).to_string(),
))
.await
.unwrap();
let resp = next_json(&mut ws).await;
assert_eq!(resp["type"], "auth_ok", "correct token should be accepted, got: {resp}");
}
#[tokio::test]
async fn result_reply_is_received() {
// HC-WS-02: after a successful auth, a `get_states` command must
// produce a `result` reply RECEIVED over the socket. The old code
// discarded all replies in the rx-draining task, so this hangs/
// fails on the pre-fix source.
let addr = spawn_server_with_token("good_token_abc").await;
let url = format!("ws://{addr}/api/websocket");
let (mut ws, _resp) = connect_async(&url).await.unwrap();
let req = next_json(&mut ws).await;
assert_eq!(req["type"], "auth_required");
ws.send(Message::Text(
serde_json::json!({"type":"auth","access_token":"good_token_abc"}).to_string(),
))
.await
.unwrap();
let auth = next_json(&mut ws).await;
assert_eq!(auth["type"], "auth_ok");
// Send a command and assert we RECEIVE a result reply.
ws.send(Message::Text(
serde_json::json!({"id": 1, "type": "get_states"}).to_string(),
))
.await
.unwrap();
let reply = tokio::time::timeout(std::time::Duration::from_secs(5), next_json(&mut ws))
.await
.expect("did not receive a reply within 5s — reply theater (HC-WS-02)");
assert_eq!(reply["type"], "result", "expected a result reply, got: {reply}");
assert_eq!(reply["id"], 1);
assert_eq!(reply["success"], true);
}
#[tokio::test]
async fn ping_pong_reply_is_received() {
// The `ping` command must produce a `pong` reply on the wire — also
// exercises the writer task that HC-WS-02 introduced.
let addr = spawn_server_with_token("good_token_abc").await;
let url = format!("ws://{addr}/api/websocket");
let (mut ws, _resp) = connect_async(&url).await.unwrap();
let _ = next_json(&mut ws).await; // auth_required
ws.send(Message::Text(
serde_json::json!({"type":"auth","access_token":"good_token_abc"}).to_string(),
))
.await
.unwrap();
let _ = next_json(&mut ws).await; // auth_ok
ws.send(Message::Text(
serde_json::json!({"id": 7, "type": "ping"}).to_string(),
))
.await
.unwrap();
let reply = tokio::time::timeout(std::time::Duration::from_secs(5), next_json(&mut ws))
.await
.expect("did not receive pong within 5s");
assert_eq!(reply["type"], "pong");
assert_eq!(reply["id"], 7);
}

View File

@ -3,15 +3,26 @@
//! Implements the ADR-129 P1 action set: `service_call`, `delay`, `scene`,
//! `wait_for_trigger`, `choose`. Complex variants (parallel, repeat, if,
//! stop, fire_event, wait_template) land in P2.
//!
//! ## `choose` branch evaluation (ADR-161, HC-WS-06)
//!
//! `Action::Choose` evaluates each branch's `conditions` against the live
//! [`EvalContext`] (deserialising the per-branch `serde_yaml::Value`
//! conditions into [`Condition`]) and runs the FIRST matching branch's
//! sequence. Only if no branch matches does it fall to `default`. Before
//! this fix the branches were discarded and `default` always ran.
use std::sync::Arc;
use std::time::Duration;
use serde::{Deserialize, Serialize};
use tokio::time::sleep;
use homecore::{Context, HomeCore, ServiceCall, ServiceName};
use homecore::{Context, HomeCore, ServiceCall, ServiceName, StateMachine};
use crate::condition::{Condition, EvalContext};
use crate::error::AutomationError;
use crate::template::TemplateEnvironment;
/// Runtime context passed into action execution.
pub struct ExecutionContext {
@ -21,14 +32,40 @@ pub struct ExecutionContext {
pub context: Context,
/// Automation ID for tracing/logging.
pub automation_id: String,
/// Condition-evaluation context for `Choose` branches. Carries the
/// state-machine snapshot + optional template environment so branch
/// conditions (incl. `template:`) evaluate against live state.
pub eval: EvalContext,
}
impl ExecutionContext {
/// Build a context whose `Choose` branches evaluate against the
/// HomeCore state machine (no template env — `template:` branch
/// conditions evaluate false; use [`Self::with_templates`] to wire
/// one).
pub fn new(hc: HomeCore, automation_id: impl Into<String>) -> Self {
let sm = Arc::new(hc.states().clone());
Self {
hc,
context: Context::new(),
automation_id: automation_id.into(),
eval: EvalContext::new(sm),
}
}
/// Build a context with a template environment wired into the
/// `Choose` branch-condition evaluator.
pub fn with_templates(
hc: HomeCore,
automation_id: impl Into<String>,
states: Arc<StateMachine>,
templates: Arc<TemplateEnvironment>,
) -> Self {
Self {
hc,
context: Context::new(),
automation_id: automation_id.into(),
eval: EvalContext::with_templates(states, templates),
}
}
}
@ -72,6 +109,27 @@ pub struct ChoiceBranch {
pub sequence: Vec<Action>,
}
impl ChoiceBranch {
/// Does this branch match? All of its `conditions` must evaluate
/// true (HA `choose` semantics are AND-over-conditions). Each raw
/// `serde_yaml::Value` is deserialised into a [`Condition`]; a
/// condition that fails to parse is treated as non-matching (the
/// branch is skipped) rather than silently passing. An empty
/// `conditions` list matches (an unconditional branch).
pub async fn matches(&self, eval: &EvalContext) -> bool {
for raw in &self.conditions {
let cond: Condition = match serde_yaml::from_value(raw.clone()) {
Ok(c) => c,
Err(_) => return false,
};
if !cond.evaluate(eval).await {
return false;
}
}
true
}
}
impl Action {
/// Execute this action using the provided context.
///
@ -118,9 +176,18 @@ impl Action {
}
Ok(serde_json::Value::Null)
}
Action::Choose { choices: _, default } => {
// P1 stub — condition evaluation for choices lands in P2;
// for now, fall through to default branch.
Action::Choose { choices, default } => {
// Evaluate each branch's conditions against live state;
// run the first branch whose conditions ALL pass. Fall
// to `default` only if no branch matches (HC-WS-06).
for branch in choices {
if branch.matches(&ctx.eval).await {
for a in &branch.sequence {
a.execute(ctx).await?;
}
return Ok(serde_json::Value::Null);
}
}
for a in default {
a.execute(ctx).await?;
}
@ -188,4 +255,100 @@ mod tests {
let err = action.execute(&mut exec_ctx).await.unwrap_err();
assert!(matches!(err, AutomationError::ServiceCall(ServiceError::NotRegistered { .. })));
}
/// Register two recording handlers and return their call logs.
async fn two_recorders(
hc: &HomeCore,
) -> (Arc<Mutex<Vec<serde_json::Value>>>, Arc<Mutex<Vec<serde_json::Value>>>) {
use homecore::EntityId;
let _ = EntityId::parse("light.x"); // touch import path
let mk = |hc: &HomeCore, svc: &'static str| {
let log: Arc<Mutex<Vec<serde_json::Value>>> = Arc::new(Mutex::new(vec![]));
let log2 = Arc::clone(&log);
let hc = hc.clone();
async move {
hc.services()
.register(
ServiceName::new("light", svc),
FnHandler(move |call: ServiceCall| {
let l = Arc::clone(&log2);
async move {
l.lock().unwrap().push(call.data.clone());
Ok(serde_json::Value::Null)
}
}),
)
.await;
log
}
};
let branch_log = mk(hc, "branch_service").await;
let default_log = mk(hc, "default_service").await;
(branch_log, default_log)
}
fn choose_with_match() -> Action {
// A `Choose` whose first branch requires light.gate == "open".
let branch_conditions = vec![serde_yaml::from_str::<serde_yaml::Value>(
"condition: state\nentity_id: light.gate\nstate: open",
)
.unwrap()];
Action::Choose {
choices: vec![ChoiceBranch {
conditions: branch_conditions,
sequence: vec![Action::ServiceCall {
domain: "light".into(),
service: "branch_service".into(),
data: serde_json::json!({"branch": true}),
}],
}],
default: vec![Action::ServiceCall {
domain: "light".into(),
service: "default_service".into(),
data: serde_json::json!({"default": true}),
}],
}
}
#[tokio::test]
async fn choose_runs_matching_branch_not_default() {
// HC-WS-06: with the branch condition satisfied, the branch
// sequence runs and `default` does NOT. On the pre-fix code
// (choices discarded) `default` ran instead → this fails on old.
use homecore::{Context, EntityId};
let hc = HomeCore::new();
let (branch_log, default_log) = two_recorders(&hc).await;
hc.states().set(
EntityId::parse("light.gate").unwrap(),
"open",
serde_json::json!({}),
Context::new(),
);
let mut ctx = ExecutionContext::new(hc, "choose_auto");
choose_with_match().execute(&mut ctx).await.unwrap();
assert_eq!(branch_log.lock().unwrap().len(), 1, "matching branch must run");
assert_eq!(default_log.lock().unwrap().len(), 0, "default must NOT run when a branch matches");
}
#[tokio::test]
async fn choose_falls_to_default_when_no_branch_matches() {
use homecore::{Context, EntityId};
let hc = HomeCore::new();
let (branch_log, default_log) = two_recorders(&hc).await;
// gate is "closed" → branch condition (== "open") fails.
hc.states().set(
EntityId::parse("light.gate").unwrap(),
"closed",
serde_json::json!({}),
Context::new(),
);
let mut ctx = ExecutionContext::new(hc, "choose_auto");
choose_with_match().execute(&mut ctx).await.unwrap();
assert_eq!(branch_log.lock().unwrap().len(), 0, "branch must not run when condition fails");
assert_eq!(default_log.lock().unwrap().len(), 1, "default must run when no branch matches");
}
}

View File

@ -2,56 +2,129 @@
//! triggers, and runs automation action sequences.
//!
//! ADR-129 §2 design: one Tokio task per running automation instance.
//! RunMode::Single is enforced via a per-automation `AtomicBool` flag.
//!
//! ## Run modes (ADR-161, HC-WS-05)
//!
//! `RunMode::Single` is enforced via a per-automation `AtomicBool`
//! guard: while an instance is executing, a second trigger is skipped.
//! `Parallel` (and the as-yet-unbounded `Restart`/`Queued`) spawn a
//! fresh instance on every trigger. (Before this fix the doc claimed
//! AtomicBool enforcement but every trigger spawned unbounded parallel
//! tasks regardless of `mode`.)
//!
//! ## Time triggers (ADR-161, HC-WS-04)
//!
//! `Trigger::Time { at: "HH:MM:SS" }` is evaluated by a wall-clock timer
//! task (1 Hz tokio interval) — `Trigger::matches_sync` returns false for
//! `Time` because it has no clock. The timer fires each `time:`
//! automation once when the local wall-clock second equals its `at`.
//!
//! ## Template conditions (ADR-161, HC-WS-07)
//!
//! The engine builds a real [`TemplateEnvironment`] over the state
//! machine and passes it into every `EvalContext` (via
//! `EvalContext::with_templates`), so `template:` conditions evaluate
//! against live state instead of always returning false.
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use chrono::{Local, Timelike};
use tokio::sync::broadcast;
use homecore::HomeCore;
use crate::action::ExecutionContext;
use crate::automation::Automation;
use crate::automation::{Automation, RunMode};
use crate::condition::EvalContext;
use crate::trigger::TriggerContext;
use crate::template::TemplateEnvironment;
use crate::trigger::{Trigger, TriggerContext};
/// An automation registered with the engine, plus its runtime run-state.
struct Registered {
auto: Arc<Automation>,
/// `true` while a `Single`-mode instance is executing. Used to
/// skip re-entrant triggers (HC-WS-05).
running: Arc<AtomicBool>,
}
/// The automation engine. Holds a HOMECORE handle and a list of registered
/// automations. Call `start()` to begin listening for events.
pub struct AutomationEngine {
hc: HomeCore,
automations: Arc<Mutex<Vec<Arc<Automation>>>>,
automations: Arc<Mutex<Vec<Registered>>>,
templates: Arc<TemplateEnvironment>,
}
impl AutomationEngine {
/// Create a new engine backed by the given HOMECORE handle.
pub fn new(hc: HomeCore) -> Self {
let templates = Arc::new(TemplateEnvironment::new(Arc::new(hc.states().clone())));
Self {
hc,
automations: Arc::new(Mutex::new(vec![])),
templates,
}
}
/// Register an automation. Can be called before or after `start()`.
pub fn register(&self, automation: Automation) {
self.automations.lock().unwrap().push(Arc::new(automation));
self.automations.lock().unwrap().push(Registered {
auto: Arc::new(automation),
running: Arc::new(AtomicBool::new(false)),
});
}
/// Number of registered automations.
pub fn len(&self) -> usize {
self.automations.lock().unwrap().len()
}
/// Is the engine holding zero automations?
pub fn is_empty(&self) -> bool {
self.len() == 0
}
/// Build an `EvalContext` with the engine's template environment
/// wired in, over a fresh snapshot of the state machine.
fn eval_ctx(&self) -> EvalContext {
EvalContext::with_templates(
Arc::new(self.hc.states().clone()),
Arc::clone(&self.templates),
)
}
/// Subscribe to the state-machine broadcast channel and start
/// evaluating triggers. Returns a join handle for the background task.
/// evaluating triggers. Also starts the wall-clock timer task that
/// evaluates `time:` triggers. Returns a join handle for the event
/// task (the timer task is detached and tied to the engine handle's
/// lifetime via the broadcast channel close).
///
/// The task runs until the broadcast sender is dropped (i.e. the
/// `HomeCore` instance is destroyed).
pub fn start(&self) -> tokio::task::JoinHandle<()> {
self.start_timer();
self.start_event_loop()
}
/// Event-driven loop: state/numeric/event triggers.
fn start_event_loop(&self) -> tokio::task::JoinHandle<()> {
let mut rx = self.hc.states().subscribe();
let automations = Arc::clone(&self.automations);
let hc = self.hc.clone();
let templates = Arc::clone(&self.templates);
tokio::spawn(async move {
loop {
match rx.recv().await {
Ok(event) => {
let autos = automations.lock().unwrap().clone();
for automation in autos {
let snapshot: Vec<(Arc<Automation>, Arc<AtomicBool>)> = automations
.lock()
.unwrap()
.iter()
.map(|r| (Arc::clone(&r.auto), Arc::clone(&r.running)))
.collect();
for (automation, running) in snapshot {
if !automation.enabled {
continue;
}
@ -60,7 +133,6 @@ impl AutomationEngine {
event.old_state.clone(),
event.new_state.clone(),
);
// Check all triggers — fire on first match
let triggered = automation
.trigger
.iter()
@ -68,36 +140,15 @@ impl AutomationEngine {
if !triggered {
continue;
}
// Evaluate conditions
let sm = Arc::new(hc.states().clone());
let eval_ctx = EvalContext::new(sm);
let mut conditions_pass = true;
for cond in &automation.condition {
if !cond.evaluate(&eval_ctx).await {
conditions_pass = false;
break;
}
}
if !conditions_pass {
// Conditions (with template env wired in — HC-WS-07).
let eval_ctx = EvalContext::with_templates(
Arc::new(hc.states().clone()),
Arc::clone(&templates),
);
if !conditions_pass(&automation, &eval_ctx).await {
continue;
}
// Execute actions in a spawned task (non-blocking)
let auto_clone = Arc::clone(&automation);
let hc_clone = hc.clone();
tokio::spawn(async move {
let mut exec_ctx =
ExecutionContext::new(hc_clone, auto_clone.id.clone());
for action in &auto_clone.action {
if let Err(e) = action.execute(&mut exec_ctx).await {
// P1: log errors to stderr; structured logging in P2
eprintln!(
"[homecore-automation] action error in {}: {e}",
auto_clone.id
);
break;
}
}
});
spawn_run(&hc, automation, running);
}
}
Err(broadcast::error::RecvError::Closed) => break,
@ -108,6 +159,156 @@ impl AutomationEngine {
}
})
}
/// Wall-clock timer task: fires `time:` triggers (HC-WS-04). Ticks at
/// 1 Hz and runs each matching automation once when the local
/// wall-clock `HH:MM:SS` equals the trigger's `at`. The task exits
/// when the state-machine broadcast channel closes (engine teardown).
fn start_timer(&self) -> tokio::task::JoinHandle<()> {
let automations = Arc::clone(&self.automations);
let hc = self.hc.clone();
let templates = Arc::clone(&self.templates);
// A receiver that lets the timer notice engine teardown.
let mut teardown_rx = self.hc.states().subscribe();
tokio::spawn(async move {
let mut interval = tokio::time::interval(std::time::Duration::from_millis(1000));
// Track the last second we fired, to fire once per match.
let mut last_fired_sec: Option<String> = None;
loop {
tokio::select! {
_ = interval.tick() => {
let now = Local::now();
let hhmmss = format!("{:02}:{:02}:{:02}", now.hour(), now.minute(), now.second());
if last_fired_sec.as_deref() == Some(hhmmss.as_str()) {
continue;
}
let snapshot: Vec<(Arc<Automation>, Arc<AtomicBool>)> = automations
.lock()
.unwrap()
.iter()
.map(|r| (Arc::clone(&r.auto), Arc::clone(&r.running)))
.collect();
let mut fired_any = false;
for (automation, running) in snapshot {
if !automation.enabled {
continue;
}
let time_match = automation.trigger.iter().any(|t| match t {
Trigger::Time { at } => time_at_matches(at, &hhmmss),
_ => false,
});
if !time_match {
continue;
}
let eval_ctx = EvalContext::with_templates(
Arc::new(hc.states().clone()),
Arc::clone(&templates),
);
if !conditions_pass(&automation, &eval_ctx).await {
continue;
}
spawn_run(&hc, automation, running);
fired_any = true;
}
if fired_any {
last_fired_sec = Some(hhmmss);
}
}
r = teardown_rx.recv() => {
if let Err(broadcast::error::RecvError::Closed) = r {
break;
}
}
}
}
})
}
/// Manually fire any `time:` automations whose `at` equals `hhmmss`
/// (`"HH:MM:SS"`). Bypasses the 1 Hz clock so tests can assert the
/// time-trigger path deterministically without waiting for a
/// wall-clock second to roll over. Returns the number of automations
/// that fired (passed conditions and were spawned).
pub async fn fire_time_for_test(&self, hhmmss: &str) -> usize {
let snapshot: Vec<(Arc<Automation>, Arc<AtomicBool>)> = self
.automations
.lock()
.unwrap()
.iter()
.map(|r| (Arc::clone(&r.auto), Arc::clone(&r.running)))
.collect();
let mut fired = 0usize;
for (automation, running) in snapshot {
if !automation.enabled {
continue;
}
let time_match = automation.trigger.iter().any(|t| match t {
Trigger::Time { at } => time_at_matches(at, hhmmss),
_ => false,
});
if !time_match {
continue;
}
let eval_ctx = self.eval_ctx();
if !conditions_pass(&automation, &eval_ctx).await {
continue;
}
spawn_run(&self.hc, automation, running);
fired += 1;
}
fired
}
}
/// Evaluate all of an automation's conditions (AND). Empty → pass.
async fn conditions_pass(automation: &Automation, eval_ctx: &EvalContext) -> bool {
for cond in &automation.condition {
if !cond.evaluate(eval_ctx).await {
return false;
}
}
true
}
/// Does a `Time` trigger `at` value match the current `HH:MM:SS`?
/// Accepts `HH:MM` (matches at :00 seconds) and `HH:MM:SS`.
fn time_at_matches(at: &str, hhmmss: &str) -> bool {
let normalized = match at.matches(':').count() {
1 => format!("{at}:00"),
_ => at.to_string(),
};
normalized == hhmmss
}
/// Spawn an automation run, honoring `RunMode::Single` re-entrancy
/// guard (HC-WS-05). For `Single`/`IgnoreFirst` modes a run already in
/// flight causes the new trigger to be skipped; the `running` flag is
/// cleared when the run finishes.
fn spawn_run(hc: &HomeCore, automation: Arc<Automation>, running: Arc<AtomicBool>) {
let single = matches!(automation.mode, RunMode::Single | RunMode::IgnoreFirst);
if single {
// Try to claim the running slot; if already running, skip.
if running
.compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
.is_err()
{
return;
}
}
let hc_clone = hc.clone();
tokio::spawn(async move {
let mut exec_ctx = ExecutionContext::new(hc_clone, automation.id.clone());
for action in &automation.action {
if let Err(e) = action.execute(&mut exec_ctx).await {
eprintln!("[homecore-automation] action error in {}: {e}", automation.id);
break;
}
}
if single {
running.store(false, Ordering::SeqCst);
}
});
}
#[cfg(test)]
@ -166,7 +367,6 @@ mod tests {
let _handle = engine.start();
// Fire a matching state change
hc.states().set(
EntityId::parse("switch.living").unwrap(),
"on",
@ -174,7 +374,6 @@ mod tests {
Context::new(),
);
// Give the async task time to run
sleep(Duration::from_millis(50)).await;
assert_eq!(log.lock().unwrap().len(), 1);
@ -203,7 +402,6 @@ mod tests {
let _handle = engine.start();
// Fire on a DIFFERENT entity
hc.states().set(
EntityId::parse("switch.bedroom").unwrap(),
"on",
@ -249,4 +447,16 @@ mod tests {
sleep(Duration::from_millis(50)).await;
assert_eq!(log.lock().unwrap().len(), 0, "disabled automation should not fire");
}
// Behavioral tests for the timer / run-mode / template paths
// (HC-WS-04/05/07) live in `tests/engine_behaviors.rs` to keep this
// file under the 500-line guideline; they use only the public API.
#[test]
fn time_at_matches_handles_hh_mm_and_hh_mm_ss() {
assert!(time_at_matches("07:30", "07:30:00"));
assert!(time_at_matches("07:30:15", "07:30:15"));
assert!(!time_at_matches("07:30", "07:30:01"));
assert!(!time_at_matches("07:30:15", "07:30:16"));
}
}

View File

@ -150,7 +150,12 @@ impl Trigger {
true
}
Trigger::Time { .. } => {
// Time triggers are evaluated by the engine's timer task, not here.
// Time triggers are wall-clock based and have no state-change
// context to match here. They are evaluated by the engine's
// 1 Hz timer task (`AutomationEngine::start_timer`, HC-WS-04 /
// ADR-161), which compares the trigger's `at` against the local
// wall-clock second. `matches_sync` therefore returns false for
// `Time` on the state-change path by design.
false
}
Trigger::Event { event_type } => {

View File

@ -0,0 +1,259 @@
//! Engine behavioral integration tests (ADR-161, HC-WS-04/05/07).
//!
//! These exercise the `AutomationEngine` runtime through its public API
//! only (extracted from the inline module to keep `engine.rs` under the
//! 500-line file guideline):
//!
//! - HC-WS-04 — `time:` triggers fire via the engine timer path.
//! - HC-WS-05 — `RunMode::Single` does not double-fire; `Parallel` does.
//! - HC-WS-07 — `template:` conditions evaluate against live state in the
//! engine path (no longer always-false).
//!
//! Each fails on the pre-fix engine (no timer task, unbounded-parallel
//! regardless of mode, `template_env: None`).
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use homecore::service::FnHandler;
use homecore::{Context, EntityId, HomeCore, ServiceCall, ServiceName};
use homecore_automation::{Action, Automation, AutomationEngine, Condition, RunMode, Trigger};
use tokio::time::{sleep, Duration};
async fn register_recorder(
hc: &HomeCore,
domain: &str,
service: &str,
) -> Arc<Mutex<Vec<serde_json::Value>>> {
let log: Arc<Mutex<Vec<serde_json::Value>>> = Arc::new(Mutex::new(vec![]));
let log2 = Arc::clone(&log);
hc.services()
.register(
ServiceName::new(domain, service),
FnHandler(move |call: ServiceCall| {
let l = Arc::clone(&log2);
async move {
l.lock().unwrap().push(call.data.clone());
Ok(serde_json::Value::Null)
}
}),
)
.await;
log
}
// ── HC-WS-04: time triggers fire ───────────────────────────────────
#[tokio::test]
async fn time_trigger_fires_via_timer_path() {
let hc = HomeCore::new();
let log = register_recorder(&hc, "light", "turn_on").await;
let engine = AutomationEngine::new(hc.clone());
engine.register(Automation::new(
"time_auto",
vec![Trigger::Time { at: "07:30:00".into() }],
vec![Action::ServiceCall {
domain: "light".into(),
service: "turn_on".into(),
data: serde_json::json!({"by": "time"}),
}],
));
// Deterministically fire the timer path for the matching second.
let fired = engine.fire_time_for_test("07:30:00").await;
assert_eq!(fired, 1, "time automation should fire for matching HH:MM:SS");
sleep(Duration::from_millis(50)).await;
assert_eq!(log.lock().unwrap().len(), 1, "time trigger should run its action");
// A non-matching second must NOT fire.
let none = engine.fire_time_for_test("09:00:00").await;
assert_eq!(none, 0);
}
// ── HC-WS-05: RunMode::Single does not double-fire ─────────────────
#[tokio::test]
async fn single_mode_does_not_double_fire_on_rapid_triggers() {
let hc = HomeCore::new();
let count = Arc::new(AtomicUsize::new(0));
let count2 = Arc::clone(&count);
hc.services()
.register(
ServiceName::new("light", "slow"),
FnHandler(move |_call: ServiceCall| {
let c = Arc::clone(&count2);
async move {
c.fetch_add(1, Ordering::SeqCst);
sleep(Duration::from_millis(200)).await;
Ok(serde_json::Value::Null)
}
}),
)
.await;
let engine = AutomationEngine::new(hc.clone());
let mut auto = Automation::new(
"single_auto",
vec![Trigger::State {
entity_id: EntityId::parse("switch.s").unwrap(),
from: None,
to: None,
}],
vec![Action::ServiceCall {
domain: "light".into(),
service: "slow".into(),
data: serde_json::json!({}),
}],
);
auto.mode = RunMode::Single;
engine.register(auto);
let _handle = engine.start();
// Two rapid triggers while the first run is still sleeping.
hc.states().set(EntityId::parse("switch.s").unwrap(), "a", serde_json::json!({}), Context::new());
sleep(Duration::from_millis(20)).await;
hc.states().set(EntityId::parse("switch.s").unwrap(), "b", serde_json::json!({}), Context::new());
sleep(Duration::from_millis(350)).await;
assert_eq!(
count.load(Ordering::SeqCst),
1,
"Single-mode automation must not double-fire while already running"
);
}
#[tokio::test]
async fn parallel_mode_does_fire_concurrently() {
let hc = HomeCore::new();
let count = Arc::new(AtomicUsize::new(0));
let count2 = Arc::clone(&count);
hc.services()
.register(
ServiceName::new("light", "slow"),
FnHandler(move |_call: ServiceCall| {
let c = Arc::clone(&count2);
async move {
c.fetch_add(1, Ordering::SeqCst);
sleep(Duration::from_millis(150)).await;
Ok(serde_json::Value::Null)
}
}),
)
.await;
let engine = AutomationEngine::new(hc.clone());
let mut auto = Automation::new(
"parallel_auto",
vec![Trigger::State {
entity_id: EntityId::parse("switch.p").unwrap(),
from: None,
to: None,
}],
vec![Action::ServiceCall {
domain: "light".into(),
service: "slow".into(),
data: serde_json::json!({}),
}],
);
auto.mode = RunMode::Parallel;
engine.register(auto);
let _handle = engine.start();
hc.states().set(EntityId::parse("switch.p").unwrap(), "a", serde_json::json!({}), Context::new());
sleep(Duration::from_millis(20)).await;
hc.states().set(EntityId::parse("switch.p").unwrap(), "b", serde_json::json!({}), Context::new());
sleep(Duration::from_millis(300)).await;
assert_eq!(
count.load(Ordering::SeqCst),
2,
"Parallel-mode automation should fire on every trigger"
);
}
// ── HC-WS-07: template conditions evaluate in the engine path ──────
#[tokio::test]
async fn template_condition_evaluates_true_in_engine() {
let hc = HomeCore::new();
let log = register_recorder(&hc, "light", "turn_on").await;
hc.states().set(
EntityId::parse("sensor.flag").unwrap(),
"on",
serde_json::json!({}),
Context::new(),
);
let engine = AutomationEngine::new(hc.clone());
let mut auto = Automation::new(
"tmpl_auto",
vec![Trigger::State {
entity_id: EntityId::parse("switch.trigger").unwrap(),
from: None,
to: None,
}],
vec![Action::ServiceCall {
domain: "light".into(),
service: "turn_on".into(),
data: serde_json::json!({}),
}],
);
auto.condition = vec![Condition::Template {
value_template: "{{ is_state('sensor.flag', 'on') }}".into(),
}];
engine.register(auto);
let _handle = engine.start();
hc.states().set(
EntityId::parse("switch.trigger").unwrap(),
"go",
serde_json::json!({}),
Context::new(),
);
sleep(Duration::from_millis(50)).await;
assert_eq!(
log.lock().unwrap().len(),
1,
"template condition should evaluate true and let the action run (HC-WS-07)"
);
}
#[tokio::test]
async fn template_condition_evaluates_false_blocks_action() {
let hc = HomeCore::new();
let log = register_recorder(&hc, "light", "turn_on").await;
hc.states().set(
EntityId::parse("sensor.flag").unwrap(),
"off",
serde_json::json!({}),
Context::new(),
);
let engine = AutomationEngine::new(hc.clone());
let mut auto = Automation::new(
"tmpl_auto_false",
vec![Trigger::State {
entity_id: EntityId::parse("switch.trigger").unwrap(),
from: None,
to: None,
}],
vec![Action::ServiceCall {
domain: "light".into(),
service: "turn_on".into(),
data: serde_json::json!({}),
}],
);
auto.condition = vec![Condition::Template {
value_template: "{{ is_state('sensor.flag', 'on') }}".into(),
}];
engine.register(auto);
let _handle = engine.start();
hc.states().set(
EntityId::parse("switch.trigger").unwrap(),
"go",
serde_json::json!({}),
Context::new(),
);
sleep(Duration::from_millis(50)).await;
assert_eq!(log.lock().unwrap().len(), 0, "false template condition should block the action");
}

View File

@ -83,15 +83,26 @@ pub struct PluginManifest {
#[serde(default, skip_serializing_if = "Option::is_none")]
pub wasm_module: Option<String>,
/// [HOMECORE] `sha256:<hex>` hash of the wasm binary; verified before execution.
/// [HOMECORE] `sha256:<hex>` hash of the wasm binary.
///
/// **(P4 — not yet enforced, ADR-161/B5):** this field is parsed and
/// round-tripped but is NOT verified before execution. The hash/sig
/// gate lands in P4; until then the presence of this field implies no
/// integrity guarantee.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub wasm_module_hash: Option<String>,
/// [HOMECORE] Ed25519 signature of the wasm binary hash (`ed25519:<base64>`).
///
/// **(P4 — not yet enforced, ADR-161/B5):** parsed but never checked.
/// No signature verification happens before a plugin runs.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub wasm_module_sig: Option<String>,
/// [HOMECORE] Ed25519 public key of the plugin publisher.
///
/// **(P4 — not yet enforced, ADR-161/B5):** parsed but never used to
/// verify `wasm_module_sig`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub publisher_key: Option<String>,

View File

@ -121,8 +121,21 @@ async fn main() -> Result<()> {
let _ = plugin_registry; // wired-but-empty at boot; integrations register here
// ── 4. Automation engine ────────────────────────────────────────
let _automation_engine = AutomationEngine::new(hc.clone());
info!("Automation engine ready (no automations loaded yet)");
// Construct AND start the engine (HC-WS-03, ADR-161). `start()`
// spawns the state-change event loop + the 1 Hz wall-clock timer
// task so state/numeric/event AND time triggers all fire. The
// engine is kept alive for the process lifetime (it is moved into a
// long-lived binding); its background tasks run until the HomeCore
// broadcast channel closes at shutdown. No automations are loaded at
// boot yet (YAML loader is P-next); integrations register via
// `engine.register(..)`.
let automation_engine = AutomationEngine::new(hc.clone());
let _automation_task = automation_engine.start();
info!(
"Automation engine started ({} automations registered) — \
state/numeric/event + time triggers active",
automation_engine.len()
);
// ── 5. Assist pipeline ──────────────────────────────────────────
let recognizer = RegexIntentRecognizer::new();