feat(homecore/p1): ADR-127 state machine scaffold (20 tests pass)

New crate v2/crates/homecore/ — DashMap state machine, tokio
broadcast event bus, service registry (direct-dispatch P1),
in-memory entity registry, HA-compat wire constants.

20/20 unit tests pass. EntityId rejects unicode per ADR-127 Q1
(ASCII strict P1). State machine suppresses no-op writes,
preserves last_changed on attribute-only updates, fires
state_changed broadcast for every real write.

Critical path foundation — ADR-130 (API) and ADR-128 (plugins)
can begin P1 once this is in main.

Refs: docs/adr/ADR-127-homecore-state-machine-rust.md
Refs: #798

Co-Authored-By: claude-flow <ruv@ruv.net>
This commit is contained in:
ruv 2026-05-25 18:03:27 -04:00
parent f05dca3798
commit a52e046143
10 changed files with 1190 additions and 0 deletions

View File

@ -28,6 +28,7 @@ members = [
"crates/wifi-densepose-geo",
"crates/nvsim",
"crates/nvsim-server",
"crates/homecore", # ADR-127 — HOMECORE state machine
# ADR-100/ADR-101: Cognitum Cog packaging — first Cog from this repo.
# Ships the wifi-densepose pose-estimation model as a signed binary +
# JSONL manifest installable by the Cognitum V0 appliance (cognitum-v0,

View File

@ -0,0 +1,43 @@
# HOMECORE — Rust state machine, event bus, service registry, entity registry.
# Implements ADR-127 (HOMECORE-CORE), the foundation of the HOMECORE Home
# Assistant port (ADR-126 master + ADR-128/129/130/131/132/133/134 sub-ADRs).
#
# P1 scaffold (this commit): public types + DashMap-backed state machine +
# Tokio broadcast event bus + minimal entity registry. Persistence and the
# full HA-compat serde schema land in P2.
[package]
name = "homecore"
version = "0.1.0-alpha.0"
edition = "2021"
license = "MIT"
authors = ["rUv <ruv@ruv.net>", "HOMECORE Contributors"]
description = "Rust state machine + event bus + service registry — the foundation of the HOMECORE Home Assistant port (ADR-127)"
repository = "https://github.com/ruvnet/RuView"
[lib]
name = "homecore"
path = "src/lib.rs"
[dependencies]
# Core async runtime — matches the rest of the v2/ workspace (sensing-server etc.)
tokio = { version = "1", features = ["sync", "rt", "rt-multi-thread", "time", "macros"] }
# DashMap for the concurrent state store — ADR-127 §2.1.
dashmap = "6"
# Typed event channels + service handler boxing
futures = "0.3"
async-trait = "0.1"
# Time types matched to HA's UTC datetime usage
chrono = { version = "0.4", features = ["serde"] }
# Schema validation replacement for voluptuous (ADR-127 §3)
serde = { version = "1", features = ["derive"] }
serde_json = "1"
# Unique IDs (Context, ConfigEntryId, DeviceId)
uuid = { version = "1", features = ["v4", "serde"] }
# Error handling
thiserror = "1"
# Read-only static catalogs (event type names etc.)
once_cell = "1"
[dev-dependencies]
tokio = { version = "1", features = ["sync", "rt", "rt-multi-thread", "time", "macros", "test-util"] }

View File

@ -0,0 +1,90 @@
//! Event bus — typed system events + untyped domain events.
//!
//! ADR-127 §2.2: HA's single dict-typed event channel becomes two:
//! - typed `SystemEvent` channel for known shapes (recorder, automation)
//! - untyped `DomainEvent` channel for arbitrary integration events
//!
//! Capacity 4,096 on both. Lagged receivers must re-sync (recorder
//! re-reads current state; automation re-evaluates triggers).
use std::sync::Arc;
use tokio::sync::broadcast;
use crate::event::{DomainEvent, SystemEvent};
pub const EVENT_CHANNEL_CAPACITY: usize = 4096;
#[derive(Clone)]
pub struct EventBus {
inner: Arc<EventBusInner>,
}
struct EventBusInner {
system_tx: broadcast::Sender<SystemEvent>,
domain_tx: broadcast::Sender<DomainEvent>,
}
impl EventBus {
pub fn new() -> Self {
let (system_tx, _) = broadcast::channel(EVENT_CHANNEL_CAPACITY);
let (domain_tx, _) = broadcast::channel(EVENT_CHANNEL_CAPACITY);
Self {
inner: Arc::new(EventBusInner { system_tx, domain_tx }),
}
}
pub fn subscribe_system(&self) -> broadcast::Receiver<SystemEvent> {
self.inner.system_tx.subscribe()
}
pub fn subscribe_domain(&self) -> broadcast::Receiver<DomainEvent> {
self.inner.domain_tx.subscribe()
}
/// Fire a typed system event. Returns the number of active
/// receivers (zero is fine).
pub fn fire_system(&self, event: SystemEvent) -> usize {
self.inner.system_tx.send(event).unwrap_or(0)
}
/// Fire an untyped domain event. Mirrors `hass.bus.async_fire`.
pub fn fire_domain(&self, event: DomainEvent) -> usize {
self.inner.domain_tx.send(event).unwrap_or(0)
}
}
impl Default for EventBus {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::event::Context;
#[tokio::test]
async fn fire_system_reaches_subscriber() {
let bus = EventBus::new();
let mut rx = bus.subscribe_system();
bus.fire_system(SystemEvent::HomeCoreStarted);
let event = rx.recv().await.unwrap();
assert!(matches!(event, SystemEvent::HomeCoreStarted));
}
#[tokio::test]
async fn fire_domain_reaches_subscriber() {
let bus = EventBus::new();
let mut rx = bus.subscribe_domain();
bus.fire_domain(DomainEvent::new(
"ruview_csi_frame",
serde_json::json!({"frame_id": 42}),
Context::new(),
));
let event = rx.recv().await.unwrap();
assert_eq!(event.event_type, "ruview_csi_frame");
assert_eq!(event.event_data["frame_id"], 42);
}
}

View File

@ -0,0 +1,238 @@
//! Entity ID newtype + immutable state snapshot type.
//!
//! Mirrors `homeassistant/core.py` `State` and the `entity_id` string
//! validation that every public HA call performs.
//!
//! ## EntityId validation (ADR-127 §2.1 + Q1)
//!
//! HA accepts unicode entity IDs since 2024.3. HOMECORE P1 accepts the
//! ASCII subset `[a-z0-9_]+\.[a-z0-9_]+` and rejects everything else
//! with a clear error. Unicode acceptance is deferred to P2 once the
//! Q1 strictness decision is made (see ADR-127 §8).
use std::fmt;
use std::sync::Arc;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use thiserror::Error;
use crate::event::Context;
/// Validated `domain.name` entity identifier.
///
/// Construct via [`EntityId::parse`] or [`EntityId::new`]; both validate
/// against the format `[a-z0-9_]+\.[a-z0-9_]+`. Custom `Serialize` /
/// `Deserialize` round-trips as a plain JSON string (matching HA's wire
/// format) and re-validates on deserialize so invalid IDs from disk
/// fail at load time rather than at first use.
#[derive(Clone, Eq, PartialEq, Hash)]
pub struct EntityId(Arc<str>);
impl Serialize for EntityId {
fn serialize<S: Serializer>(&self, ser: S) -> Result<S::Ok, S::Error> {
ser.serialize_str(&self.0)
}
}
impl<'de> Deserialize<'de> for EntityId {
fn deserialize<D: Deserializer<'de>>(de: D) -> Result<Self, D::Error> {
let s = String::deserialize(de)?;
EntityId::parse(s).map_err(serde::de::Error::custom)
}
}
impl EntityId {
/// Validates and constructs an `EntityId`. Returns
/// [`EntityIdError`] if the input is not `domain.name` shape with
/// ASCII lowercase / digits / underscore in each segment.
pub fn parse(s: impl Into<String>) -> Result<Self, EntityIdError> {
let s: String = s.into();
let (domain, name) = s
.split_once('.')
.ok_or_else(|| EntityIdError::MissingDot(s.clone()))?;
if domain.is_empty() {
return Err(EntityIdError::EmptyDomain(s));
}
if name.is_empty() {
return Err(EntityIdError::EmptyName(s));
}
for ch in domain.chars().chain(name.chars()) {
if !(ch.is_ascii_lowercase() || ch.is_ascii_digit() || ch == '_') {
return Err(EntityIdError::InvalidChar { entity_id: s, ch });
}
}
Ok(Self(Arc::from(s)))
}
/// Same as [`Self::parse`] but takes a `&str` and returns
/// `Result<&'static EntityId, ...>` for constant entity IDs known
/// at compile time. Used by ADR-128 plugins to register fixed-name
/// services like `homeassistant.restart`.
pub fn new(s: &str) -> Result<Self, EntityIdError> {
Self::parse(s.to_owned())
}
/// Returns the `domain` part (everything before the first `.`).
pub fn domain(&self) -> &str {
self.0.split_once('.').map(|(d, _)| d).unwrap_or(&self.0)
}
/// Returns the `name` part (everything after the first `.`).
pub fn name(&self) -> &str {
self.0.split_once('.').map(|(_, n)| n).unwrap_or("")
}
/// Underlying string view.
pub fn as_str(&self) -> &str {
&self.0
}
}
impl fmt::Debug for EntityId {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "EntityId({})", self.0)
}
}
impl fmt::Display for EntityId {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str(&self.0)
}
}
#[derive(Error, Debug, Clone, Eq, PartialEq)]
pub enum EntityIdError {
#[error("entity_id {0:?} is missing the required '.' between domain and name")]
MissingDot(String),
#[error("entity_id {0:?} has an empty domain segment")]
EmptyDomain(String),
#[error("entity_id {0:?} has an empty name segment")]
EmptyName(String),
#[error("entity_id {entity_id:?} contains invalid character {ch:?} — only [a-z0-9_] allowed (HA-compat ASCII subset; see ADR-127 §Q1)")]
InvalidChar { entity_id: String, ch: char },
}
/// Immutable state snapshot for one entity at one moment in time.
///
/// Mirrors `homeassistant.core.State`. Reader-cloneable via `Arc<State>`;
/// writers atomically replace the entry in the `DashMap` so observers
/// never see a partial mutation.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct State {
pub entity_id: EntityId,
pub state: String,
/// Attribute bag — accepts whatever JSON the integration emits.
/// Mirrors HA's `Dict[str, Any]` attribute model.
pub attributes: serde_json::Value,
/// When the `state` field last changed value. Only bumped if the
/// new state string differs from the old; attribute-only updates
/// preserve this timestamp.
pub last_changed: DateTime<Utc>,
/// When this snapshot was written. Bumped on every `set` call,
/// including attribute-only updates.
pub last_updated: DateTime<Utc>,
/// Causality context — links state changes to the user / automation
/// / service call that originated them. Mirrors HA's `Context`.
pub context: Context,
}
impl State {
/// Construct a fresh state snapshot at `now`.
pub fn new(
entity_id: EntityId,
state: impl Into<String>,
attributes: serde_json::Value,
context: Context,
) -> Self {
let now = Utc::now();
Self {
entity_id,
state: state.into(),
attributes,
last_changed: now,
last_updated: now,
context,
}
}
/// Construct the next state snapshot. If the new `state` string
/// equals the prior `state`, `last_changed` is preserved.
pub fn next(
&self,
new_state: impl Into<String>,
new_attributes: serde_json::Value,
context: Context,
) -> Self {
let new_state = new_state.into();
let now = Utc::now();
let last_changed = if new_state == self.state {
self.last_changed
} else {
now
};
Self {
entity_id: self.entity_id.clone(),
state: new_state,
attributes: new_attributes,
last_changed,
last_updated: now,
context,
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn entity_id_parses_valid() {
let e = EntityId::parse("light.living_room").unwrap();
assert_eq!(e.domain(), "light");
assert_eq!(e.name(), "living_room");
assert_eq!(e.as_str(), "light.living_room");
}
#[test]
fn entity_id_rejects_missing_dot() {
assert!(matches!(
EntityId::parse("light_living_room"),
Err(EntityIdError::MissingDot(_))
));
}
#[test]
fn entity_id_rejects_uppercase() {
let err = EntityId::parse("light.LivingRoom").unwrap_err();
match err {
EntityIdError::InvalidChar { ch, .. } => assert_eq!(ch, 'L'),
other => panic!("expected InvalidChar, got {other:?}"),
}
}
#[test]
fn entity_id_rejects_unicode() {
// ADR-127 §Q1 — P1 is strict ASCII. Unicode acceptance deferred.
assert!(EntityId::parse("light.küche").is_err());
}
#[test]
fn state_next_preserves_last_changed_when_state_unchanged() {
let id = EntityId::parse("sensor.temp").unwrap();
let s1 = State::new(id.clone(), "20.0", serde_json::json!({}), Context::default());
std::thread::sleep(std::time::Duration::from_millis(2));
let s2 = s1.next("20.0", serde_json::json!({"updated": true}), Context::default());
assert_eq!(s1.last_changed, s2.last_changed);
assert!(s2.last_updated > s1.last_updated);
}
#[test]
fn state_next_bumps_last_changed_when_state_changes() {
let id = EntityId::parse("sensor.temp").unwrap();
let s1 = State::new(id, "20.0", serde_json::json!({}), Context::default());
std::thread::sleep(std::time::Duration::from_millis(2));
let s2 = s1.next("21.0", serde_json::json!({}), Context::default());
assert!(s2.last_changed > s1.last_changed);
}
}

View File

@ -0,0 +1,163 @@
//! Typed system events + untyped domain events + Context.
//!
//! Mirrors `homeassistant.core.EventBus` + `homeassistant.const.EVENT_*`
//! constants. ADR-127 §2.2 splits HA's single dict-typed event channel
//! into two: a typed system channel (zero-allocation read path) and a
//! json-blob domain channel (for arbitrary integration-fired events).
use std::sync::Arc;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use uuid::Uuid;
use crate::entity::{EntityId, State};
/// Well-known HA event-type string constants.
///
/// Mirrors `homeassistant/const.py` `EVENT_*` constants. Used by
/// integrations that fire untyped [`DomainEvent`]s.
#[non_exhaustive]
pub struct EventType;
impl EventType {
pub const STATE_CHANGED: &'static str = "state_changed";
pub const SERVICE_REGISTERED: &'static str = "service_registered";
pub const SERVICE_REMOVED: &'static str = "service_removed";
pub const CALL_SERVICE: &'static str = "call_service";
pub const COMPONENT_LOADED: &'static str = "component_loaded";
pub const PLATFORM_DISCOVERED: &'static str = "platform_discovered";
pub const HOMEASSISTANT_START: &'static str = "homeassistant_start";
pub const HOMEASSISTANT_STARTED: &'static str = "homeassistant_started";
pub const HOMEASSISTANT_STOP: &'static str = "homeassistant_stop";
pub const HOMEASSISTANT_FINAL_WRITE: &'static str = "homeassistant_final_write";
pub const HOMEASSISTANT_CLOSE: &'static str = "homeassistant_close";
}
/// Causality context for a state change or service call.
///
/// Mirrors `homeassistant.core.Context`. Used by automations to detect
/// loops ("don't re-fire on a state change my own automation caused")
/// and by the recorder (ADR-132) to attribute changes to users.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct Context {
pub id: Uuid,
pub user_id: Option<String>,
pub parent_id: Option<Uuid>,
}
impl Context {
pub fn new() -> Self {
Self::default()
}
pub fn with_user(user_id: impl Into<String>) -> Self {
Self {
id: Uuid::new_v4(),
user_id: Some(user_id.into()),
parent_id: None,
}
}
pub fn child_of(parent: &Context) -> Self {
Self {
id: Uuid::new_v4(),
user_id: parent.user_id.clone(),
parent_id: Some(parent.id),
}
}
}
impl Default for Context {
fn default() -> Self {
Self {
id: Uuid::new_v4(),
user_id: None,
parent_id: None,
}
}
}
/// Typed enum of system events. Subscribers that only care about a
/// specific shape (the recorder, the websocket subscriber) can match on
/// the variant without going through `serde_json::Value`.
#[derive(Clone, Debug)]
pub enum SystemEvent {
StateChanged(StateChangedEvent),
ServiceRegistered { domain: String, service: String },
ServiceRemoved { domain: String, service: String },
ComponentLoaded { component: String },
HomeCoreStart,
HomeCoreStarted,
HomeCoreStop,
}
/// State-change event payload. Carries the old and new snapshots so a
/// subscriber doesn't need to read the state machine again to learn
/// what changed.
///
/// Mirrors HA's event_data `{ entity_id, old_state, new_state }`.
#[derive(Clone, Debug)]
pub struct StateChangedEvent {
pub entity_id: EntityId,
pub old_state: Option<Arc<State>>,
pub new_state: Option<Arc<State>>,
pub fired_at: DateTime<Utc>,
}
/// Untyped event fired by integrations. Mirrors HA's
/// `EventBus.async_fire(event_type, event_data)`.
#[derive(Clone, Debug)]
pub struct DomainEvent {
pub event_type: String,
pub event_data: serde_json::Value,
pub origin: EventOrigin,
pub context: Context,
pub fired_at: DateTime<Utc>,
}
/// Where an event originated. Mirrors HA's `EventOrigin` enum (`local`
/// vs `remote`).
#[derive(Clone, Debug, Copy, Eq, PartialEq, Serialize, Deserialize)]
pub enum EventOrigin {
Local,
Remote,
}
impl DomainEvent {
pub fn new(
event_type: impl Into<String>,
event_data: serde_json::Value,
context: Context,
) -> Self {
Self {
event_type: event_type.into(),
event_data,
origin: EventOrigin::Local,
context,
fired_at: Utc::now(),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn context_child_inherits_user_id() {
let parent = Context::with_user("alice");
let child = Context::child_of(&parent);
assert_eq!(child.user_id.as_deref(), Some("alice"));
assert_eq!(child.parent_id, Some(parent.id));
assert_ne!(child.id, parent.id);
}
#[test]
fn event_type_constants_match_ha_names() {
// These string values are wire-format with HA — must match
// exactly so ADR-130 can serve a wire-compat WebSocket API.
assert_eq!(EventType::STATE_CHANGED, "state_changed");
assert_eq!(EventType::HOMEASSISTANT_START, "homeassistant_start");
}
}

View File

@ -0,0 +1,75 @@
//! `HomeCore` runtime coordinator. Mirrors `homeassistant.core.HomeAssistant`.
//!
//! Cheap to clone — all internals are `Arc`-shared so tasks can each
//! hold their own `HomeCore` handle without coordination overhead.
use std::sync::Arc;
use crate::bus::EventBus;
use crate::registry::EntityRegistry;
use crate::service::ServiceRegistry;
use crate::state::StateMachine;
#[derive(Clone)]
pub struct HomeCore {
inner: Arc<HomeCoreInner>,
}
struct HomeCoreInner {
pub bus: EventBus,
pub states: StateMachine,
pub services: ServiceRegistry,
pub entities: EntityRegistry,
}
impl HomeCore {
pub fn new() -> Self {
Self {
inner: Arc::new(HomeCoreInner {
bus: EventBus::new(),
states: StateMachine::new(),
services: ServiceRegistry::new(),
entities: EntityRegistry::new(),
}),
}
}
pub fn bus(&self) -> &EventBus {
&self.inner.bus
}
pub fn states(&self) -> &StateMachine {
&self.inner.states
}
pub fn services(&self) -> &ServiceRegistry {
&self.inner.services
}
pub fn entities(&self) -> &EntityRegistry {
&self.inner.entities
}
}
impl Default for HomeCore {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::entity::EntityId;
use crate::event::Context;
#[tokio::test]
async fn end_to_end_set_then_get() {
let hc = HomeCore::new();
let id = EntityId::parse("light.kitchen").unwrap();
hc.states().set(id.clone(), "on", serde_json::json!({"brightness": 200}), Context::new());
let snap = hc.states().get(&id).unwrap();
assert_eq!(snap.state, "on");
assert_eq!(snap.attributes["brightness"], 200);
}
}

View File

@ -0,0 +1,59 @@
//! HOMECORE — Rust port of `homeassistant/core.py`.
//!
//! Implements [ADR-127](../../docs/adr/ADR-127-homecore-state-machine-rust.md):
//! the state machine, event bus, service registry, and entity registry that
//! every other HOMECORE module depends on.
//!
//! ## Layout (P1 scaffold)
//!
//! - [`entity`] — `EntityId` newtype + validation; `State` snapshot type
//! - [`event`] — typed `SystemEvent` + untyped `DomainEvent` + `Context`
//! - [`state`] — `StateMachine`: DashMap-backed concurrent state store
//! - [`bus`] — `EventBus`: tokio broadcast wiring for system + domain events
//! - [`service`] — `ServiceRegistry` (stub; full mpsc dispatch lands in P2)
//! - [`registry`] — `EntityRegistry` (in-memory P1; persistence lands in P2)
//! - [`homecore`] — `HomeCore` runtime coordinator: holds bus + states + services
//!
//! ## Threading model
//!
//! HOMECORE is multi-threaded — concurrent reads from any number of tasks
//! return zero-copy `Arc<State>` clones. Writes are serialised per-entity
//! by the DashMap shard lock but the global state machine itself is never
//! locked. See ADR-127 §2.1.
//!
//! ## What's NOT here yet (deferred to P2+)
//!
//! - Persistence of entity registry to `.homecore/storage/core.entity_registry`
//! - Schema validation (`schemas` module from §3 stub)
//! - Service handler mpsc dispatch (`service::ServiceRegistry::call`)
//! - Device registry (mirror of HA's `core.device_registry`)
//! - Witness chain integration (ADR-028)
//!
//! Each is marked `// TODO P2:` at the relevant call site.
pub mod entity;
pub mod event;
pub mod state;
pub mod bus;
pub mod service;
pub mod registry;
mod homecore;
pub use homecore::HomeCore;
pub use entity::{EntityId, EntityIdError, State};
pub use event::{Context, DomainEvent, EventType, StateChangedEvent, SystemEvent};
pub use state::StateMachine;
pub use bus::EventBus;
pub use service::{ServiceCall, ServiceError, ServiceName, ServiceRegistry};
pub use registry::{EntityCategory, EntityEntry, EntityRegistry};
/// HOMECORE protocol/data-model version. Bumped when the public surface
/// or on-disk persistence schema changes in a backwards-incompatible way.
/// Mirrors HA's `core.entity_registry` schema version (currently 13).
pub const HOMECORE_VERSION: u32 = 1;
/// Compile-time identifier for the HOMECORE build. Wired in by `vergen`
/// or git SHA in a later phase; constant for now.
pub const HOMECORE_BUILD_TAG: &str = env!("CARGO_PKG_VERSION");

View File

@ -0,0 +1,130 @@
//! In-memory entity registry (P1). Persistence to
//! `.homecore/storage/core.entity_registry` lands in P2.
//!
//! Schema fields mirror HA `core.entity_registry` v13 per ADR-127 §2.4.
use std::collections::HashMap;
use std::sync::Arc;
use serde::{Deserialize, Serialize};
use tokio::sync::RwLock;
use crate::entity::EntityId;
/// Entity category enum. Mirrors HA `homeassistant.helpers.entity.EntityCategory`.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum EntityCategory {
Config,
Diagnostic,
}
/// Source that disabled an entity. Mirrors HA `disabled_by` enum.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum DisabledBy {
User,
Integration,
ConfigEntry,
Device,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct EntityEntry {
pub entity_id: EntityId,
pub unique_id: Option<String>,
pub platform: String,
/// User-set display name. None means "use the entity's default name".
pub name: Option<String>,
pub disabled_by: Option<DisabledBy>,
pub area_id: Option<String>,
pub device_id: Option<String>,
pub entity_category: Option<EntityCategory>,
pub config_entry_id: Option<String>,
}
#[derive(Clone)]
pub struct EntityRegistry {
entries: Arc<RwLock<HashMap<EntityId, EntityEntry>>>,
}
impl EntityRegistry {
pub fn new() -> Self {
Self {
entries: Arc::new(RwLock::new(HashMap::new())),
}
}
pub async fn register(&self, entry: EntityEntry) {
self.entries.write().await.insert(entry.entity_id.clone(), entry);
}
pub async fn get(&self, entity_id: &EntityId) -> Option<EntityEntry> {
self.entries.read().await.get(entity_id).cloned()
}
pub async fn remove(&self, entity_id: &EntityId) -> Option<EntityEntry> {
self.entries.write().await.remove(entity_id)
}
pub async fn all(&self) -> Vec<EntityEntry> {
self.entries.read().await.values().cloned().collect()
}
pub async fn len(&self) -> usize {
self.entries.read().await.len()
}
}
impl Default for EntityRegistry {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn register_and_read() {
let reg = EntityRegistry::new();
let id = EntityId::parse("light.kitchen").unwrap();
reg.register(EntityEntry {
entity_id: id.clone(),
unique_id: Some("hue_lamp_42".into()),
platform: "hue".into(),
name: Some("Kitchen lamp".into()),
disabled_by: None,
area_id: Some("kitchen".into()),
device_id: None,
entity_category: None,
config_entry_id: None,
})
.await;
let got = reg.get(&id).await.unwrap();
assert_eq!(got.platform, "hue");
assert_eq!(got.name.as_deref(), Some("Kitchen lamp"));
}
#[tokio::test]
async fn disabled_by_round_trips_via_serde() {
let entry = EntityEntry {
entity_id: EntityId::parse("sensor.x").unwrap(),
unique_id: None,
platform: "test".into(),
name: None,
disabled_by: Some(DisabledBy::Integration),
area_id: None,
device_id: None,
entity_category: Some(EntityCategory::Diagnostic),
config_entry_id: None,
};
let json = serde_json::to_string(&entry).unwrap();
// HA wire format uses snake_case for the disabled_by enum.
assert!(json.contains("\"disabled_by\":\"integration\""));
assert!(json.contains("\"entity_category\":\"diagnostic\""));
let back: EntityEntry = serde_json::from_str(&json).unwrap();
assert_eq!(back.disabled_by, Some(DisabledBy::Integration));
}
}

View File

@ -0,0 +1,170 @@
//! Service registry stub.
//!
//! Mirrors `homeassistant.core.ServiceRegistry`. P1 ships the public
//! surface + a simple direct-dispatch `call` so downstream ADRs can
//! depend on it; ADR-127 P2 replaces direct dispatch with the
//! mpsc-router pattern described in §2.3.
use std::collections::HashMap;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use thiserror::Error;
use tokio::sync::RwLock;
use crate::event::Context;
/// Service name within a domain. e.g. `light.turn_on` → domain
/// `"light"`, service `"turn_on"`.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct ServiceName {
pub domain: String,
pub service: String,
}
impl ServiceName {
pub fn new(domain: impl Into<String>, service: impl Into<String>) -> Self {
Self {
domain: domain.into(),
service: service.into(),
}
}
}
/// Inbound service-call payload. Mirrors HA's `service_data` dict
/// plus the originating `Context`.
#[derive(Clone, Debug)]
pub struct ServiceCall {
pub name: ServiceName,
pub data: serde_json::Value,
pub context: Context,
}
#[derive(Error, Debug)]
pub enum ServiceError {
#[error("service not registered: {domain}.{service}")]
NotRegistered { domain: String, service: String },
#[error("service handler returned error: {0}")]
HandlerFailed(String),
}
/// Handler trait. Integration code implements this and registers via
/// [`ServiceRegistry::register`]. P2 will add schema validation via
/// `serde` `Deserialize<'_>`.
#[async_trait]
pub trait ServiceHandler: Send + Sync + 'static {
async fn call(&self, call: ServiceCall) -> Result<serde_json::Value, ServiceError>;
}
/// Direct closure adapter so simple handlers don't need a struct.
pub struct FnHandler<F>(pub F);
#[async_trait]
impl<F, Fut> ServiceHandler for FnHandler<F>
where
F: Fn(ServiceCall) -> Fut + Send + Sync + 'static,
Fut: Future<Output = Result<serde_json::Value, ServiceError>> + Send + 'static,
{
async fn call(&self, call: ServiceCall) -> Result<serde_json::Value, ServiceError> {
(self.0)(call).await
}
}
#[derive(Clone)]
pub struct ServiceRegistry {
handlers: Arc<RwLock<HashMap<ServiceName, Arc<dyn ServiceHandler>>>>,
}
impl ServiceRegistry {
pub fn new() -> Self {
Self {
handlers: Arc::new(RwLock::new(HashMap::new())),
}
}
pub async fn register<H: ServiceHandler>(&self, name: ServiceName, handler: H) {
self.handlers.write().await.insert(name, Arc::new(handler));
}
pub async fn remove(&self, name: &ServiceName) {
self.handlers.write().await.remove(name);
}
pub async fn has(&self, name: &ServiceName) -> bool {
self.handlers.read().await.contains_key(name)
}
/// Call a service. P1 direct dispatch; P2 routes through the
/// event bus per ADR-127 §2.3.
pub async fn call(&self, call: ServiceCall) -> Result<serde_json::Value, ServiceError> {
let handler = {
let guard = self.handlers.read().await;
guard.get(&call.name).cloned()
};
match handler {
Some(h) => h.call(call).await,
None => Err(ServiceError::NotRegistered {
domain: call.name.domain.clone(),
service: call.name.service.clone(),
}),
}
}
pub async fn registered_services(&self) -> Vec<ServiceName> {
self.handlers.read().await.keys().cloned().collect()
}
}
impl Default for ServiceRegistry {
fn default() -> Self {
Self::new()
}
}
// Suppress unused-import warning when no consumer of Pin/Box uses them yet
#[allow(dead_code)]
type _UnusedFutureType = Pin<Box<dyn Future<Output = ()> + Send>>;
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn register_and_call_returns_handler_value() {
let reg = ServiceRegistry::new();
reg.register(
ServiceName::new("light", "turn_on"),
FnHandler(|call: ServiceCall| async move {
Ok(serde_json::json!({"called_with": call.data}))
}),
)
.await;
let resp = reg
.call(ServiceCall {
name: ServiceName::new("light", "turn_on"),
data: serde_json::json!({"brightness": 200}),
context: Context::new(),
})
.await
.unwrap();
assert_eq!(resp["called_with"]["brightness"], 200);
}
#[tokio::test]
async fn unregistered_service_returns_error() {
let reg = ServiceRegistry::new();
let err = reg
.call(ServiceCall {
name: ServiceName::new("light", "turn_on"),
data: serde_json::json!({}),
context: Context::new(),
})
.await
.unwrap_err();
assert!(matches!(err, ServiceError::NotRegistered { .. }));
}
}

View File

@ -0,0 +1,221 @@
//! Concurrent state machine — the heart of HOMECORE.
//!
//! Mirrors `homeassistant.core.StateMachine`. Differences from HA per
//! ADR-127 §2.1:
//!
//! - DashMap shard-locked instead of one asyncio.Lock for the whole map
//! - Writers atomically replace `Arc<State>` entries; readers get
//! zero-copy clones
//! - State changes fan out via a tokio broadcast channel (capacity
//! 4,096); slow subscribers get `Lagged` and must re-sync from the
//! current map
//!
//! ## NOT in P1 (deferred to P2+)
//!
//! - `async_set_internal` schema validation
//! - Bulk delete of an entire domain (`async_remove_domain`)
//! - Restore-state on startup from the recorder (ADR-132)
use std::sync::Arc;
use chrono::Utc;
use dashmap::DashMap;
use tokio::sync::broadcast;
use crate::entity::{EntityId, State};
use crate::event::{Context, StateChangedEvent};
/// Broadcast channel capacity for state-changed events. 4,096 events
/// at 20 Hz per entity covers ~3 minutes of backlog for a single hot
/// entity. Slow subscribers must re-sync from the current map.
pub const STATE_CHANGED_CHANNEL_CAPACITY: usize = 4096;
/// The state machine. Cheap to clone (one `Arc`) — pass copies to as
/// many tasks as you like.
#[derive(Clone)]
pub struct StateMachine {
inner: Arc<StateMachineInner>,
}
struct StateMachineInner {
states: DashMap<EntityId, Arc<State>>,
tx: broadcast::Sender<StateChangedEvent>,
}
impl StateMachine {
pub fn new() -> Self {
let (tx, _) = broadcast::channel(STATE_CHANGED_CHANNEL_CAPACITY);
Self {
inner: Arc::new(StateMachineInner {
states: DashMap::with_capacity(256),
tx,
}),
}
}
/// Subscribe to state-changed events. Each subscriber gets an
/// independent receiver; capacity is shared. Falling behind by
/// 4,096 events yields `RecvError::Lagged(n)`.
pub fn subscribe(&self) -> broadcast::Receiver<StateChangedEvent> {
self.inner.tx.subscribe()
}
/// Read a state. Returns `None` if the entity is unknown.
/// Zero-copy: caller gets an `Arc<State>` clone.
pub fn get(&self, entity_id: &EntityId) -> Option<Arc<State>> {
self.inner.states.get(entity_id).map(|s| Arc::clone(&s))
}
/// Write a state. Fires a `state_changed` broadcast even on the
/// first write (old_state = None). HA semantics: only fires if the
/// state string OR attributes changed; pure no-op writes are
/// suppressed.
///
/// Returns the new state snapshot.
pub fn set(
&self,
entity_id: EntityId,
new_state: impl Into<String>,
attributes: serde_json::Value,
context: Context,
) -> Arc<State> {
let new_state_str = new_state.into();
let old = self.inner.states.get(&entity_id).map(|r| Arc::clone(&*r));
let next = match &old {
Some(prev) => Arc::new(prev.next(new_state_str.clone(), attributes.clone(), context)),
None => Arc::new(State::new(entity_id.clone(), new_state_str.clone(), attributes.clone(), context)),
};
// HA suppresses no-op writes (same state + same attributes).
// We follow the same rule to keep the broadcast channel quiet.
let is_noop = match &old {
Some(prev) => prev.state == new_state_str && prev.attributes == attributes,
None => false,
};
self.inner.states.insert(entity_id.clone(), Arc::clone(&next));
if !is_noop {
let event = StateChangedEvent {
entity_id,
old_state: old,
new_state: Some(Arc::clone(&next)),
fired_at: Utc::now(),
};
// err = no receivers; that's fine, write still committed.
let _ = self.inner.tx.send(event);
}
next
}
/// Remove a state. Fires `state_changed` with `new_state = None`.
pub fn remove(&self, entity_id: &EntityId) -> Option<Arc<State>> {
let removed = self.inner.states.remove(entity_id).map(|(_, s)| s);
if let Some(old) = &removed {
let event = StateChangedEvent {
entity_id: entity_id.clone(),
old_state: Some(Arc::clone(old)),
new_state: None,
fired_at: Utc::now(),
};
let _ = self.inner.tx.send(event);
}
removed
}
/// Snapshot all current states. Allocates a new Vec — useful for
/// the REST GET /api/states path (ADR-130).
pub fn all(&self) -> Vec<Arc<State>> {
self.inner.states.iter().map(|r| Arc::clone(r.value())).collect()
}
/// Snapshot all states whose entity_id matches a domain prefix.
/// Mirrors HA's `hass.states.async_all(domain)`.
pub fn all_by_domain(&self, domain: &str) -> Vec<Arc<State>> {
self.inner
.states
.iter()
.filter(|r| r.key().domain() == domain)
.map(|r| Arc::clone(r.value()))
.collect()
}
/// Number of entities currently tracked.
pub fn len(&self) -> usize {
self.inner.states.len()
}
pub fn is_empty(&self) -> bool {
self.inner.states.len() == 0
}
}
impl Default for StateMachine {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
use super::*;
fn id(s: &str) -> EntityId {
EntityId::parse(s).unwrap()
}
#[tokio::test]
async fn set_writes_and_fires() {
let sm = StateMachine::new();
let mut rx = sm.subscribe();
sm.set(id("light.kitchen"), "on", serde_json::json!({"brightness": 200}), Context::new());
let evt = rx.recv().await.unwrap();
assert_eq!(evt.entity_id.as_str(), "light.kitchen");
assert!(evt.old_state.is_none());
assert_eq!(evt.new_state.as_ref().unwrap().state, "on");
}
#[tokio::test]
async fn noop_writes_are_suppressed() {
let sm = StateMachine::new();
sm.set(id("light.k"), "on", serde_json::json!({}), Context::new());
let mut rx = sm.subscribe();
// Same state + same attributes → no event.
sm.set(id("light.k"), "on", serde_json::json!({}), Context::new());
let try_recv = tokio::time::timeout(std::time::Duration::from_millis(50), rx.recv()).await;
assert!(try_recv.is_err(), "expected no event for no-op write");
}
#[tokio::test]
async fn attribute_only_change_fires_but_preserves_last_changed() {
let sm = StateMachine::new();
let s1 = sm.set(id("sensor.t"), "20", serde_json::json!({"unit": "C"}), Context::new());
tokio::time::sleep(std::time::Duration::from_millis(2)).await;
let s2 = sm.set(id("sensor.t"), "20", serde_json::json!({"unit": "F"}), Context::new());
assert_eq!(s1.last_changed, s2.last_changed);
assert!(s2.last_updated > s1.last_updated);
}
#[test]
fn all_by_domain_filters() {
let sm = StateMachine::new();
sm.set(id("light.a"), "on", serde_json::json!({}), Context::new());
sm.set(id("light.b"), "off", serde_json::json!({}), Context::new());
sm.set(id("sensor.t"), "20", serde_json::json!({}), Context::new());
assert_eq!(sm.all_by_domain("light").len(), 2);
assert_eq!(sm.all_by_domain("sensor").len(), 1);
assert_eq!(sm.all().len(), 3);
}
#[tokio::test]
async fn remove_fires_with_no_new_state() {
let sm = StateMachine::new();
sm.set(id("light.k"), "on", serde_json::json!({}), Context::new());
let mut rx = sm.subscribe();
sm.remove(&id("light.k"));
let evt = rx.recv().await.unwrap();
assert!(evt.new_state.is_none());
assert!(evt.old_state.is_some());
}
}