From 3292bd2c5da487e9f0740645995eb31364c24621 Mon Sep 17 00:00:00 2001 From: ruv Date: Fri, 12 Jun 2026 01:40:23 -0400 Subject: [PATCH] =?UTF-8?q?feat(homecore-automation):=20implement=20bounde?= =?UTF-8?q?d=20RunModes=20Restart/Queued/max=20(ADR-162,=20completes=20ADR?= =?UTF-8?q?-161=20=C2=A7A5)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ADR-161 implemented RunMode::Single (AtomicBool re-entrancy guard) + Parallel but honestly left Restart/Queued/max as "ACCEPTED-FUTURE / unbounded parallel" — every non-Single mode spawned an unbounded task. This makes them real. New `runmode` module — per-automation RunState owns the machinery: - Restart: aborts the in-flight action task (tokio::task::AbortHandle) and starts a fresh one. - Queued: serializes runs in arrival order via a per-automation async Mutex — sequential, never concurrent, nothing dropped. - max: N: caps concurrency at N via a per-automation Semaphore; triggers beyond N queue (await a permit) rather than running concurrently (HA bounded semantics). Documented in the module table. - Single/IgnoreFirst/Parallel preserved. engine.rs now holds a RunState per registration and calls run_state.dispatch() at all three trigger sites (event loop, timer, fire_time_for_test); the old spawn_run is removed. engine.rs trimmed to 433 lines. Tests (tests/engine_behaviors.rs) — verified to FAIL on the old unbounded- parallel dispatch (simulated and confirmed each panics), pass on the new: - restart_mode_cancels_prior_run (old: both runs complete → 2; new: 1) - queued_mode_runs_sequentially_not_concurrently (old: max concurrency 3; new: all 3 run, max concurrency 1) - max_two_caps_concurrency_at_two (old: 4 concurrent; new: all 4 run, max 2) homecore-automation --no-default-features: 45 passed (lib 37, engine_behaviors 8), 0 failed. Co-Authored-By: claude-flow --- v2/crates/homecore-automation/src/engine.rs | 83 +++------ v2/crates/homecore-automation/src/lib.rs | 1 + v2/crates/homecore-automation/src/runmode.rs | 153 +++++++++++++++++ .../tests/engine_behaviors.rs | 159 ++++++++++++++++++ 4 files changed, 340 insertions(+), 56 deletions(-) create mode 100644 v2/crates/homecore-automation/src/runmode.rs diff --git a/v2/crates/homecore-automation/src/engine.rs b/v2/crates/homecore-automation/src/engine.rs index b39c675b..9962dbf1 100644 --- a/v2/crates/homecore-automation/src/engine.rs +++ b/v2/crates/homecore-automation/src/engine.rs @@ -3,14 +3,15 @@ //! //! ADR-129 §2 design: one Tokio task per running automation instance. //! -//! ## Run modes (ADR-161, HC-WS-05) +//! ## Run modes (ADR-161 §A5 → completed in ADR-162) //! -//! `RunMode::Single` is enforced via a per-automation `AtomicBool` -//! guard: while an instance is executing, a second trigger is skipped. -//! `Parallel` (and the as-yet-unbounded `Restart`/`Queued`) spawn a -//! fresh instance on every trigger. (Before this fix the doc claimed -//! AtomicBool enforcement but every trigger spawned unbounded parallel -//! tasks regardless of `mode`.) +//! Each registered automation owns a [`RunState`] that implements its +//! `RunMode`: `Single`/`IgnoreFirst` skip re-entrant triggers, `Restart` +//! aborts the in-flight run and starts a fresh one, `Queued` serializes +//! runs in arrival order (nothing dropped), `Parallel` spawns on every +//! trigger, and `max: N` caps concurrency via a per-automation semaphore. +//! (ADR-161 only honored Single/Parallel; Restart/Queued/max were +//! honestly documented as unbounded-parallel until ADR-162.) //! //! ## Time triggers (ADR-161, HC-WS-04) //! @@ -26,7 +27,6 @@ //! `EvalContext::with_templates`), so `template:` conditions evaluate //! against live state instead of always returning false. -use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, Mutex}; use chrono::{Local, Timelike}; @@ -34,18 +34,18 @@ use tokio::sync::broadcast; use homecore::HomeCore; -use crate::action::ExecutionContext; -use crate::automation::{Automation, RunMode}; +use crate::automation::Automation; use crate::condition::EvalContext; +use crate::runmode::RunState; use crate::template::TemplateEnvironment; use crate::trigger::{Trigger, TriggerContext}; /// An automation registered with the engine, plus its runtime run-state. struct Registered { auto: Arc, - /// `true` while a `Single`-mode instance is executing. Used to - /// skip re-entrant triggers (HC-WS-05). - running: Arc, + /// Run-mode machinery (re-entrancy guard / restart abort handle / + /// queue mutex / concurrency semaphore) for this automation. + run_state: RunState, } /// The automation engine. Holds a HOMECORE handle and a list of registered @@ -69,9 +69,10 @@ impl AutomationEngine { /// Register an automation. Can be called before or after `start()`. pub fn register(&self, automation: Automation) { + let run_state = RunState::new(&automation); self.automations.lock().unwrap().push(Registered { auto: Arc::new(automation), - running: Arc::new(AtomicBool::new(false)), + run_state, }); } @@ -118,13 +119,13 @@ impl AutomationEngine { loop { match rx.recv().await { Ok(event) => { - let snapshot: Vec<(Arc, Arc)> = automations + let snapshot: Vec<(Arc, RunState)> = automations .lock() .unwrap() .iter() - .map(|r| (Arc::clone(&r.auto), Arc::clone(&r.running))) + .map(|r| (Arc::clone(&r.auto), r.run_state.clone())) .collect(); - for (automation, running) in snapshot { + for (automation, run_state) in snapshot { if !automation.enabled { continue; } @@ -148,7 +149,7 @@ impl AutomationEngine { if !conditions_pass(&automation, &eval_ctx).await { continue; } - spawn_run(&hc, automation, running); + run_state.dispatch(&hc, automation); } } Err(broadcast::error::RecvError::Closed) => break, @@ -183,14 +184,14 @@ impl AutomationEngine { if last_fired_sec.as_deref() == Some(hhmmss.as_str()) { continue; } - let snapshot: Vec<(Arc, Arc)> = automations + let snapshot: Vec<(Arc, RunState)> = automations .lock() .unwrap() .iter() - .map(|r| (Arc::clone(&r.auto), Arc::clone(&r.running))) + .map(|r| (Arc::clone(&r.auto), r.run_state.clone())) .collect(); let mut fired_any = false; - for (automation, running) in snapshot { + for (automation, run_state) in snapshot { if !automation.enabled { continue; } @@ -208,7 +209,7 @@ impl AutomationEngine { if !conditions_pass(&automation, &eval_ctx).await { continue; } - spawn_run(&hc, automation, running); + run_state.dispatch(&hc, automation); fired_any = true; } if fired_any { @@ -231,15 +232,15 @@ impl AutomationEngine { /// wall-clock second to roll over. Returns the number of automations /// that fired (passed conditions and were spawned). pub async fn fire_time_for_test(&self, hhmmss: &str) -> usize { - let snapshot: Vec<(Arc, Arc)> = self + let snapshot: Vec<(Arc, RunState)> = self .automations .lock() .unwrap() .iter() - .map(|r| (Arc::clone(&r.auto), Arc::clone(&r.running))) + .map(|r| (Arc::clone(&r.auto), r.run_state.clone())) .collect(); let mut fired = 0usize; - for (automation, running) in snapshot { + for (automation, run_state) in snapshot { if !automation.enabled { continue; } @@ -254,7 +255,7 @@ impl AutomationEngine { if !conditions_pass(&automation, &eval_ctx).await { continue; } - spawn_run(&self.hc, automation, running); + run_state.dispatch(&self.hc, automation); fired += 1; } fired @@ -281,36 +282,6 @@ fn time_at_matches(at: &str, hhmmss: &str) -> bool { normalized == hhmmss } -/// Spawn an automation run, honoring `RunMode::Single` re-entrancy -/// guard (HC-WS-05). For `Single`/`IgnoreFirst` modes a run already in -/// flight causes the new trigger to be skipped; the `running` flag is -/// cleared when the run finishes. -fn spawn_run(hc: &HomeCore, automation: Arc, running: Arc) { - let single = matches!(automation.mode, RunMode::Single | RunMode::IgnoreFirst); - if single { - // Try to claim the running slot; if already running, skip. - if running - .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst) - .is_err() - { - return; - } - } - let hc_clone = hc.clone(); - tokio::spawn(async move { - let mut exec_ctx = ExecutionContext::new(hc_clone, automation.id.clone()); - for action in &automation.action { - if let Err(e) = action.execute(&mut exec_ctx).await { - eprintln!("[homecore-automation] action error in {}: {e}", automation.id); - break; - } - } - if single { - running.store(false, Ordering::SeqCst); - } - }); -} - #[cfg(test)] mod tests { use super::*; diff --git a/v2/crates/homecore-automation/src/lib.rs b/v2/crates/homecore-automation/src/lib.rs index 803625af..17922899 100644 --- a/v2/crates/homecore-automation/src/lib.rs +++ b/v2/crates/homecore-automation/src/lib.rs @@ -19,6 +19,7 @@ pub mod condition; pub mod action; pub mod template; pub mod engine; +pub mod runmode; pub mod error; pub use automation::{Automation, RunMode}; diff --git a/v2/crates/homecore-automation/src/runmode.rs b/v2/crates/homecore-automation/src/runmode.rs new file mode 100644 index 00000000..7590264d --- /dev/null +++ b/v2/crates/homecore-automation/src/runmode.rs @@ -0,0 +1,153 @@ +//! Per-automation run-mode machinery (ADR-162, completes ADR-161 §A5). +//! +//! ADR-161 implemented `RunMode::Single` (a per-automation `AtomicBool` +//! re-entrancy guard) and `Parallel`, but honestly left `Restart`, `Queued` +//! and `max: N` as "ACCEPTED-FUTURE / unbounded parallel" — every non-Single +//! mode spawned an unbounded task. This module makes them real: +//! +//! | Mode | Semantics implemented | +//! |------|-----------------------| +//! | `Single` / `IgnoreFirst` | re-entrancy guard: skip while a run is in flight (ADR-161). | +//! | `Restart` | **cancel** the in-flight run (`tokio::task::AbortHandle`) and start a fresh one. | +//! | `Queued` | **serialize**: runs execute sequentially in arrival order via a per-automation async mutex — nothing is dropped. | +//! | `Parallel` | spawn on every trigger (optionally capped, see below). | +//! | `max: N` | cap concurrency at **N** via a per-automation semaphore; triggers beyond N **queue** (await a permit) rather than running concurrently — matching HA's bounded `parallel`/`queued`. | +//! +//! Each registered automation owns one [`RunState`]; the engine calls +//! [`RunState::dispatch`] on every (trigger + conditions-passed) event. + +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::{Arc, Mutex}; + +use tokio::sync::{Mutex as AsyncMutex, Semaphore}; + +use homecore::HomeCore; + +use crate::action::ExecutionContext; +use crate::automation::{Automation, RunMode}; + +/// Per-automation runtime state backing the run-mode dispatch. +/// +/// Cheap to clone (all fields are `Arc`); the engine clones it into each +/// spawned run so the machinery (abort handle, queue mutex, semaphore) is +/// shared across all triggers of the same automation. +#[derive(Clone)] +pub struct RunState { + /// `Single`/`IgnoreFirst` re-entrancy guard (ADR-161 §A5). + running: Arc, + /// `Restart`: handle to the currently-running action task, so a new + /// trigger can abort it before starting a fresh one. + current: Arc>>, + /// `Queued`: serializes runs in arrival order (one at a time, FIFO via + /// fair async mutex acquisition). + queue_lock: Arc>, + /// `max: N` (and bounded `Parallel`): caps concurrent runs at N. + /// `None` when no cap applies. + semaphore: Option>, +} + +impl RunState { + /// Build run-state for an automation, sizing the concurrency semaphore + /// from its `max:` field (only meaningful for `Queued`/`Parallel`). + pub fn new(automation: &Automation) -> Self { + let semaphore = automation + .max + .filter(|n| *n > 0) + .map(|n| Arc::new(Semaphore::new(n))); + Self { + running: Arc::new(AtomicBool::new(false)), + current: Arc::new(Mutex::new(None)), + queue_lock: Arc::new(AsyncMutex::new(())), + semaphore, + } + } + + /// Dispatch one trigger for `automation` according to its `RunMode`. + /// Honors Single re-entrancy, Restart cancel-and-replace, Queued + /// serialization, and `max:` concurrency capping. + pub fn dispatch(&self, hc: &HomeCore, automation: Arc) { + match automation.mode { + RunMode::Single | RunMode::IgnoreFirst => self.dispatch_single(hc, automation), + RunMode::Restart => self.dispatch_restart(hc, automation), + RunMode::Queued => self.dispatch_queued(hc, automation), + RunMode::Parallel => self.dispatch_parallel(hc, automation), + } + } + + /// `Single`: skip if a run is already in flight; clear the flag on done. + fn dispatch_single(&self, hc: &HomeCore, automation: Arc) { + if self + .running + .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst) + .is_err() + { + return; // already running — skip re-entrant trigger. + } + let hc = hc.clone(); + let running = Arc::clone(&self.running); + tokio::spawn(async move { + run_actions(&hc, &automation).await; + running.store(false, Ordering::SeqCst); + }); + } + + /// `Restart`: abort the in-flight run (if any), then start a fresh one + /// and record its abort handle. + fn dispatch_restart(&self, hc: &HomeCore, automation: Arc) { + // Abort any prior run before starting the new one. + if let Some(prev) = self.current.lock().unwrap().take() { + prev.abort(); + } + let hc = hc.clone(); + let slot = Arc::clone(&self.current); + let handle = tokio::spawn(async move { + run_actions(&hc, &automation).await; + }); + *slot.lock().unwrap() = Some(handle.abort_handle()); + } + + /// `Queued`: serialize via the per-automation async mutex. Each trigger + /// spawns a task that waits its turn, so all triggers run in arrival + /// order, one at a time — nothing is dropped. + fn dispatch_queued(&self, hc: &HomeCore, automation: Arc) { + let hc = hc.clone(); + let lock = Arc::clone(&self.queue_lock); + let sem = self.semaphore.clone(); + tokio::spawn(async move { + // Optional `max:` cap still applies on top of serialization. + let _permit = match &sem { + Some(s) => Some(s.acquire().await.expect("semaphore not closed")), + None => None, + }; + let _guard = lock.lock().await; // FIFO turn — sequential execution. + run_actions(&hc, &automation).await; + }); + } + + /// `Parallel`: spawn on every trigger, capped at `max:` if set. + fn dispatch_parallel(&self, hc: &HomeCore, automation: Arc) { + let hc = hc.clone(); + let sem = self.semaphore.clone(); + tokio::spawn(async move { + let _permit = match &sem { + Some(s) => Some(s.acquire().await.expect("semaphore not closed")), + None => None, + }; + run_actions(&hc, &automation).await; + }); + } +} + +/// Execute an automation's action sequence once. +async fn run_actions(hc: &HomeCore, automation: &Automation) { + let mut exec_ctx = ExecutionContext::new(hc.clone(), automation.id.clone()); + for action in &automation.action { + if let Err(e) = action.execute(&mut exec_ctx).await { + eprintln!( + "[homecore-automation] action error in {}: {e}", + automation.id + ); + break; + } + } +} diff --git a/v2/crates/homecore-automation/tests/engine_behaviors.rs b/v2/crates/homecore-automation/tests/engine_behaviors.rs index 01a7c7fa..be2f1088 100644 --- a/v2/crates/homecore-automation/tests/engine_behaviors.rs +++ b/v2/crates/homecore-automation/tests/engine_behaviors.rs @@ -257,3 +257,162 @@ async fn template_condition_evaluates_false_blocks_action() { sleep(Duration::from_millis(50)).await; assert_eq!(log.lock().unwrap().len(), 0, "false template condition should block the action"); } + +// ── ADR-162 (completes ADR-161 §A5): bounded RunModes ─────────────── +// +// ADR-161 honored only Single/Parallel; Restart/Queued/max were honestly +// documented as unbounded-parallel. These tests drive the real +// Restart/Queued/max machinery and FAIL on the old engine (where every +// non-Single mode spawned an unbounded parallel task). + +/// A service that increments a live concurrency gauge on entry, sleeps, +/// then decrements — recording the maximum concurrency ever observed and +/// the total number of completed runs. Returns `(max_concurrency, completed)`. +async fn register_gauge( + hc: &HomeCore, + domain: &str, + service: &str, + work: Duration, +) -> (Arc, Arc) { + let live = Arc::new(AtomicUsize::new(0)); + let max_seen = Arc::new(AtomicUsize::new(0)); + let completed = Arc::new(AtomicUsize::new(0)); + let (l, m, c) = (Arc::clone(&live), Arc::clone(&max_seen), Arc::clone(&completed)); + hc.services() + .register( + ServiceName::new(domain, service), + FnHandler(move |_call: ServiceCall| { + let (l, m, c) = (Arc::clone(&l), Arc::clone(&m), Arc::clone(&c)); + async move { + let now = l.fetch_add(1, Ordering::SeqCst) + 1; + m.fetch_max(now, Ordering::SeqCst); + sleep(work).await; + l.fetch_sub(1, Ordering::SeqCst); + c.fetch_add(1, Ordering::SeqCst); + Ok(serde_json::Value::Null) + } + }), + ) + .await; + (max_seen, completed) +} + +fn state_auto(id: &str, entity: &str, domain: &str, service: &str) -> Automation { + Automation::new( + id, + vec![Trigger::State { + entity_id: EntityId::parse(entity).unwrap(), + from: None, + to: None, + }], + vec![Action::ServiceCall { + domain: domain.into(), + service: service.into(), + data: serde_json::json!({}), + }], + ) +} + +// ── Restart: cancels the in-flight run ───────────────────────────── +#[tokio::test] +async fn restart_mode_cancels_prior_run() { + let hc = HomeCore::new(); + // Each run sleeps 300ms before recording completion. + let (_max, completed) = + register_gauge(&hc, "light", "slow", Duration::from_millis(300)).await; + + let engine = AutomationEngine::new(hc.clone()); + let mut auto = state_auto("restart_auto", "switch.r", "light", "slow"); + auto.mode = RunMode::Restart; + engine.register(auto); + let _handle = engine.start(); + + // Trigger 1 starts the slow run. + hc.states().set(EntityId::parse("switch.r").unwrap(), "a", serde_json::json!({}), Context::new()); + sleep(Duration::from_millis(80)).await; + // Trigger 2 arrives mid-run → must ABORT run 1 and start run 2. + hc.states().set(EntityId::parse("switch.r").unwrap(), "b", serde_json::json!({}), Context::new()); + + // Wait long enough for run 2 (started ~80ms in) to finish, but run 1 + // (aborted at ~80ms, would have finished at ~300ms) must NOT complete. + sleep(Duration::from_millis(400)).await; + assert_eq!( + completed.load(Ordering::SeqCst), + 1, + "Restart must cancel the in-flight run: exactly the restarted run completes (not both). \ + On the old engine both ran to completion → 2." + ); +} + +// ── Queued: serialize N rapid triggers, all run, never concurrent ── +#[tokio::test] +async fn queued_mode_runs_sequentially_not_concurrently() { + let hc = HomeCore::new(); + let (max_seen, completed) = + register_gauge(&hc, "light", "slow", Duration::from_millis(120)).await; + + let engine = AutomationEngine::new(hc.clone()); + let mut auto = state_auto("queued_auto", "switch.q", "light", "slow"); + auto.mode = RunMode::Queued; + engine.register(auto); + let _handle = engine.start(); + + // Three rapid triggers. + for v in ["a", "b", "c"] { + hc.states().set(EntityId::parse("switch.q").unwrap(), v, serde_json::json!({}), Context::new()); + sleep(Duration::from_millis(10)).await; + } + + // 3 runs × 120ms serialized ≈ 360ms; wait generously. + sleep(Duration::from_millis(600)).await; + assert_eq!( + completed.load(Ordering::SeqCst), + 3, + "Queued must run every trigger (nothing dropped)" + ); + assert_eq!( + max_seen.load(Ordering::SeqCst), + 1, + "Queued must never run two instances concurrently. On the old engine all 3 ran in \ + parallel → max concurrency 3." + ); +} + +// ── max: 2 → never more than 2 concurrent ────────────────────────── +#[tokio::test] +async fn max_two_caps_concurrency_at_two() { + let hc = HomeCore::new(); + let (max_seen, completed) = + register_gauge(&hc, "light", "slow", Duration::from_millis(150)).await; + + let engine = AutomationEngine::new(hc.clone()); + let mut auto = state_auto("max_auto", "switch.m", "light", "slow"); + auto.mode = RunMode::Parallel; + auto.max = Some(2); + engine.register(auto); + let _handle = engine.start(); + + // Four rapid triggers — without the cap all 4 would run at once. + for v in ["a", "b", "c", "d"] { + hc.states().set(EntityId::parse("switch.m").unwrap(), v, serde_json::json!({}), Context::new()); + sleep(Duration::from_millis(10)).await; + } + + sleep(Duration::from_millis(600)).await; + assert_eq!( + completed.load(Ordering::SeqCst), + 4, + "max:2 must still run all 4 triggers (queued beyond the cap, not dropped)" + ); + assert!( + max_seen.load(Ordering::SeqCst) <= 2, + "max:2 must never exceed 2 concurrent runs (observed {}). On the old engine all 4 ran \ + concurrently → 4.", + max_seen.load(Ordering::SeqCst) + ); + assert!( + max_seen.load(Ordering::SeqCst) >= 2, + "max:2 should reach the cap of 2 with 4 rapid triggers (observed {})", + max_seen.load(Ordering::SeqCst) + ); +}