feat(homecore-automation): implement bounded RunModes Restart/Queued/max (ADR-162, completes ADR-161 §A5)

ADR-161 implemented RunMode::Single (AtomicBool re-entrancy guard) + Parallel
but honestly left Restart/Queued/max as "ACCEPTED-FUTURE / unbounded parallel" —
every non-Single mode spawned an unbounded task. This makes them real.

New `runmode` module — per-automation RunState owns the machinery:
- Restart: aborts the in-flight action task (tokio::task::AbortHandle) and
  starts a fresh one.
- Queued: serializes runs in arrival order via a per-automation async Mutex —
  sequential, never concurrent, nothing dropped.
- max: N: caps concurrency at N via a per-automation Semaphore; triggers beyond
  N queue (await a permit) rather than running concurrently (HA bounded
  semantics). Documented in the module table.
- Single/IgnoreFirst/Parallel preserved.

engine.rs now holds a RunState per registration and calls run_state.dispatch()
at all three trigger sites (event loop, timer, fire_time_for_test); the old
spawn_run is removed. engine.rs trimmed to 433 lines.

Tests (tests/engine_behaviors.rs) — verified to FAIL on the old unbounded-
parallel dispatch (simulated and confirmed each panics), pass on the new:
- restart_mode_cancels_prior_run (old: both runs complete → 2; new: 1)
- queued_mode_runs_sequentially_not_concurrently (old: max concurrency 3; new:
  all 3 run, max concurrency 1)
- max_two_caps_concurrency_at_two (old: 4 concurrent; new: all 4 run, max 2)

homecore-automation --no-default-features: 45 passed (lib 37, engine_behaviors
8), 0 failed.

Co-Authored-By: claude-flow <ruv@ruv.net>
This commit is contained in:
ruv 2026-06-12 01:40:23 -04:00
parent 0ca903b497
commit 3292bd2c5d
4 changed files with 340 additions and 56 deletions

View File

@ -3,14 +3,15 @@
//!
//! ADR-129 §2 design: one Tokio task per running automation instance.
//!
//! ## Run modes (ADR-161, HC-WS-05)
//! ## Run modes (ADR-161 §A5 → completed in ADR-162)
//!
//! `RunMode::Single` is enforced via a per-automation `AtomicBool`
//! guard: while an instance is executing, a second trigger is skipped.
//! `Parallel` (and the as-yet-unbounded `Restart`/`Queued`) spawn a
//! fresh instance on every trigger. (Before this fix the doc claimed
//! AtomicBool enforcement but every trigger spawned unbounded parallel
//! tasks regardless of `mode`.)
//! Each registered automation owns a [`RunState`] that implements its
//! `RunMode`: `Single`/`IgnoreFirst` skip re-entrant triggers, `Restart`
//! aborts the in-flight run and starts a fresh one, `Queued` serializes
//! runs in arrival order (nothing dropped), `Parallel` spawns on every
//! trigger, and `max: N` caps concurrency via a per-automation semaphore.
//! (ADR-161 only honored Single/Parallel; Restart/Queued/max were
//! honestly documented as unbounded-parallel until ADR-162.)
//!
//! ## Time triggers (ADR-161, HC-WS-04)
//!
@ -26,7 +27,6 @@
//! `EvalContext::with_templates`), so `template:` conditions evaluate
//! against live state instead of always returning false.
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use chrono::{Local, Timelike};
@ -34,18 +34,18 @@ use tokio::sync::broadcast;
use homecore::HomeCore;
use crate::action::ExecutionContext;
use crate::automation::{Automation, RunMode};
use crate::automation::Automation;
use crate::condition::EvalContext;
use crate::runmode::RunState;
use crate::template::TemplateEnvironment;
use crate::trigger::{Trigger, TriggerContext};
/// An automation registered with the engine, plus its runtime run-state.
struct Registered {
auto: Arc<Automation>,
/// `true` while a `Single`-mode instance is executing. Used to
/// skip re-entrant triggers (HC-WS-05).
running: Arc<AtomicBool>,
/// Run-mode machinery (re-entrancy guard / restart abort handle /
/// queue mutex / concurrency semaphore) for this automation.
run_state: RunState,
}
/// The automation engine. Holds a HOMECORE handle and a list of registered
@ -69,9 +69,10 @@ impl AutomationEngine {
/// Register an automation. Can be called before or after `start()`.
pub fn register(&self, automation: Automation) {
let run_state = RunState::new(&automation);
self.automations.lock().unwrap().push(Registered {
auto: Arc::new(automation),
running: Arc::new(AtomicBool::new(false)),
run_state,
});
}
@ -118,13 +119,13 @@ impl AutomationEngine {
loop {
match rx.recv().await {
Ok(event) => {
let snapshot: Vec<(Arc<Automation>, Arc<AtomicBool>)> = automations
let snapshot: Vec<(Arc<Automation>, RunState)> = automations
.lock()
.unwrap()
.iter()
.map(|r| (Arc::clone(&r.auto), Arc::clone(&r.running)))
.map(|r| (Arc::clone(&r.auto), r.run_state.clone()))
.collect();
for (automation, running) in snapshot {
for (automation, run_state) in snapshot {
if !automation.enabled {
continue;
}
@ -148,7 +149,7 @@ impl AutomationEngine {
if !conditions_pass(&automation, &eval_ctx).await {
continue;
}
spawn_run(&hc, automation, running);
run_state.dispatch(&hc, automation);
}
}
Err(broadcast::error::RecvError::Closed) => break,
@ -183,14 +184,14 @@ impl AutomationEngine {
if last_fired_sec.as_deref() == Some(hhmmss.as_str()) {
continue;
}
let snapshot: Vec<(Arc<Automation>, Arc<AtomicBool>)> = automations
let snapshot: Vec<(Arc<Automation>, RunState)> = automations
.lock()
.unwrap()
.iter()
.map(|r| (Arc::clone(&r.auto), Arc::clone(&r.running)))
.map(|r| (Arc::clone(&r.auto), r.run_state.clone()))
.collect();
let mut fired_any = false;
for (automation, running) in snapshot {
for (automation, run_state) in snapshot {
if !automation.enabled {
continue;
}
@ -208,7 +209,7 @@ impl AutomationEngine {
if !conditions_pass(&automation, &eval_ctx).await {
continue;
}
spawn_run(&hc, automation, running);
run_state.dispatch(&hc, automation);
fired_any = true;
}
if fired_any {
@ -231,15 +232,15 @@ impl AutomationEngine {
/// wall-clock second to roll over. Returns the number of automations
/// that fired (passed conditions and were spawned).
pub async fn fire_time_for_test(&self, hhmmss: &str) -> usize {
let snapshot: Vec<(Arc<Automation>, Arc<AtomicBool>)> = self
let snapshot: Vec<(Arc<Automation>, RunState)> = self
.automations
.lock()
.unwrap()
.iter()
.map(|r| (Arc::clone(&r.auto), Arc::clone(&r.running)))
.map(|r| (Arc::clone(&r.auto), r.run_state.clone()))
.collect();
let mut fired = 0usize;
for (automation, running) in snapshot {
for (automation, run_state) in snapshot {
if !automation.enabled {
continue;
}
@ -254,7 +255,7 @@ impl AutomationEngine {
if !conditions_pass(&automation, &eval_ctx).await {
continue;
}
spawn_run(&self.hc, automation, running);
run_state.dispatch(&self.hc, automation);
fired += 1;
}
fired
@ -281,36 +282,6 @@ fn time_at_matches(at: &str, hhmmss: &str) -> bool {
normalized == hhmmss
}
/// Spawn an automation run, honoring `RunMode::Single` re-entrancy
/// guard (HC-WS-05). For `Single`/`IgnoreFirst` modes a run already in
/// flight causes the new trigger to be skipped; the `running` flag is
/// cleared when the run finishes.
fn spawn_run(hc: &HomeCore, automation: Arc<Automation>, running: Arc<AtomicBool>) {
let single = matches!(automation.mode, RunMode::Single | RunMode::IgnoreFirst);
if single {
// Try to claim the running slot; if already running, skip.
if running
.compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
.is_err()
{
return;
}
}
let hc_clone = hc.clone();
tokio::spawn(async move {
let mut exec_ctx = ExecutionContext::new(hc_clone, automation.id.clone());
for action in &automation.action {
if let Err(e) = action.execute(&mut exec_ctx).await {
eprintln!("[homecore-automation] action error in {}: {e}", automation.id);
break;
}
}
if single {
running.store(false, Ordering::SeqCst);
}
});
}
#[cfg(test)]
mod tests {
use super::*;

View File

@ -19,6 +19,7 @@ pub mod condition;
pub mod action;
pub mod template;
pub mod engine;
pub mod runmode;
pub mod error;
pub use automation::{Automation, RunMode};

View File

@ -0,0 +1,153 @@
//! Per-automation run-mode machinery (ADR-162, completes ADR-161 §A5).
//!
//! ADR-161 implemented `RunMode::Single` (a per-automation `AtomicBool`
//! re-entrancy guard) and `Parallel`, but honestly left `Restart`, `Queued`
//! and `max: N` as "ACCEPTED-FUTURE / unbounded parallel" — every non-Single
//! mode spawned an unbounded task. This module makes them real:
//!
//! | Mode | Semantics implemented |
//! |------|-----------------------|
//! | `Single` / `IgnoreFirst` | re-entrancy guard: skip while a run is in flight (ADR-161). |
//! | `Restart` | **cancel** the in-flight run (`tokio::task::AbortHandle`) and start a fresh one. |
//! | `Queued` | **serialize**: runs execute sequentially in arrival order via a per-automation async mutex — nothing is dropped. |
//! | `Parallel` | spawn on every trigger (optionally capped, see below). |
//! | `max: N` | cap concurrency at **N** via a per-automation semaphore; triggers beyond N **queue** (await a permit) rather than running concurrently — matching HA's bounded `parallel`/`queued`. |
//!
//! Each registered automation owns one [`RunState`]; the engine calls
//! [`RunState::dispatch`] on every (trigger + conditions-passed) event.
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use tokio::sync::{Mutex as AsyncMutex, Semaphore};
use homecore::HomeCore;
use crate::action::ExecutionContext;
use crate::automation::{Automation, RunMode};
/// Per-automation runtime state backing the run-mode dispatch.
///
/// Cheap to clone (all fields are `Arc`); the engine clones it into each
/// spawned run so the machinery (abort handle, queue mutex, semaphore) is
/// shared across all triggers of the same automation.
#[derive(Clone)]
pub struct RunState {
/// `Single`/`IgnoreFirst` re-entrancy guard (ADR-161 §A5).
running: Arc<AtomicBool>,
/// `Restart`: handle to the currently-running action task, so a new
/// trigger can abort it before starting a fresh one.
current: Arc<Mutex<Option<tokio::task::AbortHandle>>>,
/// `Queued`: serializes runs in arrival order (one at a time, FIFO via
/// fair async mutex acquisition).
queue_lock: Arc<AsyncMutex<()>>,
/// `max: N` (and bounded `Parallel`): caps concurrent runs at N.
/// `None` when no cap applies.
semaphore: Option<Arc<Semaphore>>,
}
impl RunState {
/// Build run-state for an automation, sizing the concurrency semaphore
/// from its `max:` field (only meaningful for `Queued`/`Parallel`).
pub fn new(automation: &Automation) -> Self {
let semaphore = automation
.max
.filter(|n| *n > 0)
.map(|n| Arc::new(Semaphore::new(n)));
Self {
running: Arc::new(AtomicBool::new(false)),
current: Arc::new(Mutex::new(None)),
queue_lock: Arc::new(AsyncMutex::new(())),
semaphore,
}
}
/// Dispatch one trigger for `automation` according to its `RunMode`.
/// Honors Single re-entrancy, Restart cancel-and-replace, Queued
/// serialization, and `max:` concurrency capping.
pub fn dispatch(&self, hc: &HomeCore, automation: Arc<Automation>) {
match automation.mode {
RunMode::Single | RunMode::IgnoreFirst => self.dispatch_single(hc, automation),
RunMode::Restart => self.dispatch_restart(hc, automation),
RunMode::Queued => self.dispatch_queued(hc, automation),
RunMode::Parallel => self.dispatch_parallel(hc, automation),
}
}
/// `Single`: skip if a run is already in flight; clear the flag on done.
fn dispatch_single(&self, hc: &HomeCore, automation: Arc<Automation>) {
if self
.running
.compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
.is_err()
{
return; // already running — skip re-entrant trigger.
}
let hc = hc.clone();
let running = Arc::clone(&self.running);
tokio::spawn(async move {
run_actions(&hc, &automation).await;
running.store(false, Ordering::SeqCst);
});
}
/// `Restart`: abort the in-flight run (if any), then start a fresh one
/// and record its abort handle.
fn dispatch_restart(&self, hc: &HomeCore, automation: Arc<Automation>) {
// Abort any prior run before starting the new one.
if let Some(prev) = self.current.lock().unwrap().take() {
prev.abort();
}
let hc = hc.clone();
let slot = Arc::clone(&self.current);
let handle = tokio::spawn(async move {
run_actions(&hc, &automation).await;
});
*slot.lock().unwrap() = Some(handle.abort_handle());
}
/// `Queued`: serialize via the per-automation async mutex. Each trigger
/// spawns a task that waits its turn, so all triggers run in arrival
/// order, one at a time — nothing is dropped.
fn dispatch_queued(&self, hc: &HomeCore, automation: Arc<Automation>) {
let hc = hc.clone();
let lock = Arc::clone(&self.queue_lock);
let sem = self.semaphore.clone();
tokio::spawn(async move {
// Optional `max:` cap still applies on top of serialization.
let _permit = match &sem {
Some(s) => Some(s.acquire().await.expect("semaphore not closed")),
None => None,
};
let _guard = lock.lock().await; // FIFO turn — sequential execution.
run_actions(&hc, &automation).await;
});
}
/// `Parallel`: spawn on every trigger, capped at `max:` if set.
fn dispatch_parallel(&self, hc: &HomeCore, automation: Arc<Automation>) {
let hc = hc.clone();
let sem = self.semaphore.clone();
tokio::spawn(async move {
let _permit = match &sem {
Some(s) => Some(s.acquire().await.expect("semaphore not closed")),
None => None,
};
run_actions(&hc, &automation).await;
});
}
}
/// Execute an automation's action sequence once.
async fn run_actions(hc: &HomeCore, automation: &Automation) {
let mut exec_ctx = ExecutionContext::new(hc.clone(), automation.id.clone());
for action in &automation.action {
if let Err(e) = action.execute(&mut exec_ctx).await {
eprintln!(
"[homecore-automation] action error in {}: {e}",
automation.id
);
break;
}
}
}

View File

@ -257,3 +257,162 @@ async fn template_condition_evaluates_false_blocks_action() {
sleep(Duration::from_millis(50)).await;
assert_eq!(log.lock().unwrap().len(), 0, "false template condition should block the action");
}
// ── ADR-162 (completes ADR-161 §A5): bounded RunModes ───────────────
//
// ADR-161 honored only Single/Parallel; Restart/Queued/max were honestly
// documented as unbounded-parallel. These tests drive the real
// Restart/Queued/max machinery and FAIL on the old engine (where every
// non-Single mode spawned an unbounded parallel task).
/// A service that increments a live concurrency gauge on entry, sleeps,
/// then decrements — recording the maximum concurrency ever observed and
/// the total number of completed runs. Returns `(max_concurrency, completed)`.
async fn register_gauge(
hc: &HomeCore,
domain: &str,
service: &str,
work: Duration,
) -> (Arc<AtomicUsize>, Arc<AtomicUsize>) {
let live = Arc::new(AtomicUsize::new(0));
let max_seen = Arc::new(AtomicUsize::new(0));
let completed = Arc::new(AtomicUsize::new(0));
let (l, m, c) = (Arc::clone(&live), Arc::clone(&max_seen), Arc::clone(&completed));
hc.services()
.register(
ServiceName::new(domain, service),
FnHandler(move |_call: ServiceCall| {
let (l, m, c) = (Arc::clone(&l), Arc::clone(&m), Arc::clone(&c));
async move {
let now = l.fetch_add(1, Ordering::SeqCst) + 1;
m.fetch_max(now, Ordering::SeqCst);
sleep(work).await;
l.fetch_sub(1, Ordering::SeqCst);
c.fetch_add(1, Ordering::SeqCst);
Ok(serde_json::Value::Null)
}
}),
)
.await;
(max_seen, completed)
}
fn state_auto(id: &str, entity: &str, domain: &str, service: &str) -> Automation {
Automation::new(
id,
vec![Trigger::State {
entity_id: EntityId::parse(entity).unwrap(),
from: None,
to: None,
}],
vec![Action::ServiceCall {
domain: domain.into(),
service: service.into(),
data: serde_json::json!({}),
}],
)
}
// ── Restart: cancels the in-flight run ─────────────────────────────
#[tokio::test]
async fn restart_mode_cancels_prior_run() {
let hc = HomeCore::new();
// Each run sleeps 300ms before recording completion.
let (_max, completed) =
register_gauge(&hc, "light", "slow", Duration::from_millis(300)).await;
let engine = AutomationEngine::new(hc.clone());
let mut auto = state_auto("restart_auto", "switch.r", "light", "slow");
auto.mode = RunMode::Restart;
engine.register(auto);
let _handle = engine.start();
// Trigger 1 starts the slow run.
hc.states().set(EntityId::parse("switch.r").unwrap(), "a", serde_json::json!({}), Context::new());
sleep(Duration::from_millis(80)).await;
// Trigger 2 arrives mid-run → must ABORT run 1 and start run 2.
hc.states().set(EntityId::parse("switch.r").unwrap(), "b", serde_json::json!({}), Context::new());
// Wait long enough for run 2 (started ~80ms in) to finish, but run 1
// (aborted at ~80ms, would have finished at ~300ms) must NOT complete.
sleep(Duration::from_millis(400)).await;
assert_eq!(
completed.load(Ordering::SeqCst),
1,
"Restart must cancel the in-flight run: exactly the restarted run completes (not both). \
On the old engine both ran to completion 2."
);
}
// ── Queued: serialize N rapid triggers, all run, never concurrent ──
#[tokio::test]
async fn queued_mode_runs_sequentially_not_concurrently() {
let hc = HomeCore::new();
let (max_seen, completed) =
register_gauge(&hc, "light", "slow", Duration::from_millis(120)).await;
let engine = AutomationEngine::new(hc.clone());
let mut auto = state_auto("queued_auto", "switch.q", "light", "slow");
auto.mode = RunMode::Queued;
engine.register(auto);
let _handle = engine.start();
// Three rapid triggers.
for v in ["a", "b", "c"] {
hc.states().set(EntityId::parse("switch.q").unwrap(), v, serde_json::json!({}), Context::new());
sleep(Duration::from_millis(10)).await;
}
// 3 runs × 120ms serialized ≈ 360ms; wait generously.
sleep(Duration::from_millis(600)).await;
assert_eq!(
completed.load(Ordering::SeqCst),
3,
"Queued must run every trigger (nothing dropped)"
);
assert_eq!(
max_seen.load(Ordering::SeqCst),
1,
"Queued must never run two instances concurrently. On the old engine all 3 ran in \
parallel max concurrency 3."
);
}
// ── max: 2 → never more than 2 concurrent ──────────────────────────
#[tokio::test]
async fn max_two_caps_concurrency_at_two() {
let hc = HomeCore::new();
let (max_seen, completed) =
register_gauge(&hc, "light", "slow", Duration::from_millis(150)).await;
let engine = AutomationEngine::new(hc.clone());
let mut auto = state_auto("max_auto", "switch.m", "light", "slow");
auto.mode = RunMode::Parallel;
auto.max = Some(2);
engine.register(auto);
let _handle = engine.start();
// Four rapid triggers — without the cap all 4 would run at once.
for v in ["a", "b", "c", "d"] {
hc.states().set(EntityId::parse("switch.m").unwrap(), v, serde_json::json!({}), Context::new());
sleep(Duration::from_millis(10)).await;
}
sleep(Duration::from_millis(600)).await;
assert_eq!(
completed.load(Ordering::SeqCst),
4,
"max:2 must still run all 4 triggers (queued beyond the cap, not dropped)"
);
assert!(
max_seen.load(Ordering::SeqCst) <= 2,
"max:2 must never exceed 2 concurrent runs (observed {}). On the old engine all 4 ran \
concurrently 4.",
max_seen.load(Ordering::SeqCst)
);
assert!(
max_seen.load(Ordering::SeqCst) >= 2,
"max:2 should reach the cap of 2 with 4 rapid triggers (observed {})",
max_seen.load(Ordering::SeqCst)
);
}