diff --git a/v2/Cargo.toml b/v2/Cargo.toml index c285f9f6..b6a11087 100644 --- a/v2/Cargo.toml +++ b/v2/Cargo.toml @@ -28,6 +28,7 @@ members = [ "crates/wifi-densepose-geo", "crates/nvsim", "crates/nvsim-server", + "crates/homecore", # ADR-127 — HOMECORE state machine # ADR-100/ADR-101: Cognitum Cog packaging — first Cog from this repo. # Ships the wifi-densepose pose-estimation model as a signed binary + # JSONL manifest installable by the Cognitum V0 appliance (cognitum-v0, diff --git a/v2/crates/homecore/Cargo.toml b/v2/crates/homecore/Cargo.toml new file mode 100644 index 00000000..2242e6f4 --- /dev/null +++ b/v2/crates/homecore/Cargo.toml @@ -0,0 +1,43 @@ +# HOMECORE — Rust state machine, event bus, service registry, entity registry. +# Implements ADR-127 (HOMECORE-CORE), the foundation of the HOMECORE Home +# Assistant port (ADR-126 master + ADR-128/129/130/131/132/133/134 sub-ADRs). +# +# P1 scaffold (this commit): public types + DashMap-backed state machine + +# Tokio broadcast event bus + minimal entity registry. Persistence and the +# full HA-compat serde schema land in P2. + +[package] +name = "homecore" +version = "0.1.0-alpha.0" +edition = "2021" +license = "MIT" +authors = ["rUv ", "HOMECORE Contributors"] +description = "Rust state machine + event bus + service registry — the foundation of the HOMECORE Home Assistant port (ADR-127)" +repository = "https://github.com/ruvnet/RuView" + +[lib] +name = "homecore" +path = "src/lib.rs" + +[dependencies] +# Core async runtime — matches the rest of the v2/ workspace (sensing-server etc.) +tokio = { version = "1", features = ["sync", "rt", "rt-multi-thread", "time", "macros"] } +# DashMap for the concurrent state store — ADR-127 §2.1. +dashmap = "6" +# Typed event channels + service handler boxing +futures = "0.3" +async-trait = "0.1" +# Time types matched to HA's UTC datetime usage +chrono = { version = "0.4", features = ["serde"] } +# Schema validation replacement for voluptuous (ADR-127 §3) +serde = { version = "1", features = ["derive"] } +serde_json = "1" +# Unique IDs (Context, ConfigEntryId, DeviceId) +uuid = { version = "1", features = ["v4", "serde"] } +# Error handling +thiserror = "1" +# Read-only static catalogs (event type names etc.) +once_cell = "1" + +[dev-dependencies] +tokio = { version = "1", features = ["sync", "rt", "rt-multi-thread", "time", "macros", "test-util"] } diff --git a/v2/crates/homecore/src/bus.rs b/v2/crates/homecore/src/bus.rs new file mode 100644 index 00000000..661c3606 --- /dev/null +++ b/v2/crates/homecore/src/bus.rs @@ -0,0 +1,90 @@ +//! Event bus — typed system events + untyped domain events. +//! +//! ADR-127 §2.2: HA's single dict-typed event channel becomes two: +//! - typed `SystemEvent` channel for known shapes (recorder, automation) +//! - untyped `DomainEvent` channel for arbitrary integration events +//! +//! Capacity 4,096 on both. Lagged receivers must re-sync (recorder +//! re-reads current state; automation re-evaluates triggers). + +use std::sync::Arc; + +use tokio::sync::broadcast; + +use crate::event::{DomainEvent, SystemEvent}; + +pub const EVENT_CHANNEL_CAPACITY: usize = 4096; + +#[derive(Clone)] +pub struct EventBus { + inner: Arc, +} + +struct EventBusInner { + system_tx: broadcast::Sender, + domain_tx: broadcast::Sender, +} + +impl EventBus { + pub fn new() -> Self { + let (system_tx, _) = broadcast::channel(EVENT_CHANNEL_CAPACITY); + let (domain_tx, _) = broadcast::channel(EVENT_CHANNEL_CAPACITY); + Self { + inner: Arc::new(EventBusInner { system_tx, domain_tx }), + } + } + + pub fn subscribe_system(&self) -> broadcast::Receiver { + self.inner.system_tx.subscribe() + } + + pub fn subscribe_domain(&self) -> broadcast::Receiver { + self.inner.domain_tx.subscribe() + } + + /// Fire a typed system event. Returns the number of active + /// receivers (zero is fine). + pub fn fire_system(&self, event: SystemEvent) -> usize { + self.inner.system_tx.send(event).unwrap_or(0) + } + + /// Fire an untyped domain event. Mirrors `hass.bus.async_fire`. + pub fn fire_domain(&self, event: DomainEvent) -> usize { + self.inner.domain_tx.send(event).unwrap_or(0) + } +} + +impl Default for EventBus { + fn default() -> Self { + Self::new() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::event::Context; + + #[tokio::test] + async fn fire_system_reaches_subscriber() { + let bus = EventBus::new(); + let mut rx = bus.subscribe_system(); + bus.fire_system(SystemEvent::HomeCoreStarted); + let event = rx.recv().await.unwrap(); + assert!(matches!(event, SystemEvent::HomeCoreStarted)); + } + + #[tokio::test] + async fn fire_domain_reaches_subscriber() { + let bus = EventBus::new(); + let mut rx = bus.subscribe_domain(); + bus.fire_domain(DomainEvent::new( + "ruview_csi_frame", + serde_json::json!({"frame_id": 42}), + Context::new(), + )); + let event = rx.recv().await.unwrap(); + assert_eq!(event.event_type, "ruview_csi_frame"); + assert_eq!(event.event_data["frame_id"], 42); + } +} diff --git a/v2/crates/homecore/src/entity.rs b/v2/crates/homecore/src/entity.rs new file mode 100644 index 00000000..e3308bb4 --- /dev/null +++ b/v2/crates/homecore/src/entity.rs @@ -0,0 +1,238 @@ +//! Entity ID newtype + immutable state snapshot type. +//! +//! Mirrors `homeassistant/core.py` `State` and the `entity_id` string +//! validation that every public HA call performs. +//! +//! ## EntityId validation (ADR-127 §2.1 + Q1) +//! +//! HA accepts unicode entity IDs since 2024.3. HOMECORE P1 accepts the +//! ASCII subset `[a-z0-9_]+\.[a-z0-9_]+` and rejects everything else +//! with a clear error. Unicode acceptance is deferred to P2 once the +//! Q1 strictness decision is made (see ADR-127 §8). + +use std::fmt; +use std::sync::Arc; + +use chrono::{DateTime, Utc}; +use serde::{Deserialize, Deserializer, Serialize, Serializer}; +use thiserror::Error; + +use crate::event::Context; + +/// Validated `domain.name` entity identifier. +/// +/// Construct via [`EntityId::parse`] or [`EntityId::new`]; both validate +/// against the format `[a-z0-9_]+\.[a-z0-9_]+`. Custom `Serialize` / +/// `Deserialize` round-trips as a plain JSON string (matching HA's wire +/// format) and re-validates on deserialize so invalid IDs from disk +/// fail at load time rather than at first use. +#[derive(Clone, Eq, PartialEq, Hash)] +pub struct EntityId(Arc); + +impl Serialize for EntityId { + fn serialize(&self, ser: S) -> Result { + ser.serialize_str(&self.0) + } +} + +impl<'de> Deserialize<'de> for EntityId { + fn deserialize>(de: D) -> Result { + let s = String::deserialize(de)?; + EntityId::parse(s).map_err(serde::de::Error::custom) + } +} + +impl EntityId { + /// Validates and constructs an `EntityId`. Returns + /// [`EntityIdError`] if the input is not `domain.name` shape with + /// ASCII lowercase / digits / underscore in each segment. + pub fn parse(s: impl Into) -> Result { + let s: String = s.into(); + let (domain, name) = s + .split_once('.') + .ok_or_else(|| EntityIdError::MissingDot(s.clone()))?; + if domain.is_empty() { + return Err(EntityIdError::EmptyDomain(s)); + } + if name.is_empty() { + return Err(EntityIdError::EmptyName(s)); + } + for ch in domain.chars().chain(name.chars()) { + if !(ch.is_ascii_lowercase() || ch.is_ascii_digit() || ch == '_') { + return Err(EntityIdError::InvalidChar { entity_id: s, ch }); + } + } + Ok(Self(Arc::from(s))) + } + + /// Same as [`Self::parse`] but takes a `&str` and returns + /// `Result<&'static EntityId, ...>` for constant entity IDs known + /// at compile time. Used by ADR-128 plugins to register fixed-name + /// services like `homeassistant.restart`. + pub fn new(s: &str) -> Result { + Self::parse(s.to_owned()) + } + + /// Returns the `domain` part (everything before the first `.`). + pub fn domain(&self) -> &str { + self.0.split_once('.').map(|(d, _)| d).unwrap_or(&self.0) + } + + /// Returns the `name` part (everything after the first `.`). + pub fn name(&self) -> &str { + self.0.split_once('.').map(|(_, n)| n).unwrap_or("") + } + + /// Underlying string view. + pub fn as_str(&self) -> &str { + &self.0 + } +} + +impl fmt::Debug for EntityId { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "EntityId({})", self.0) + } +} + +impl fmt::Display for EntityId { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str(&self.0) + } +} + +#[derive(Error, Debug, Clone, Eq, PartialEq)] +pub enum EntityIdError { + #[error("entity_id {0:?} is missing the required '.' between domain and name")] + MissingDot(String), + #[error("entity_id {0:?} has an empty domain segment")] + EmptyDomain(String), + #[error("entity_id {0:?} has an empty name segment")] + EmptyName(String), + #[error("entity_id {entity_id:?} contains invalid character {ch:?} — only [a-z0-9_] allowed (HA-compat ASCII subset; see ADR-127 §Q1)")] + InvalidChar { entity_id: String, ch: char }, +} + +/// Immutable state snapshot for one entity at one moment in time. +/// +/// Mirrors `homeassistant.core.State`. Reader-cloneable via `Arc`; +/// writers atomically replace the entry in the `DashMap` so observers +/// never see a partial mutation. +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct State { + pub entity_id: EntityId, + pub state: String, + /// Attribute bag — accepts whatever JSON the integration emits. + /// Mirrors HA's `Dict[str, Any]` attribute model. + pub attributes: serde_json::Value, + /// When the `state` field last changed value. Only bumped if the + /// new state string differs from the old; attribute-only updates + /// preserve this timestamp. + pub last_changed: DateTime, + /// When this snapshot was written. Bumped on every `set` call, + /// including attribute-only updates. + pub last_updated: DateTime, + /// Causality context — links state changes to the user / automation + /// / service call that originated them. Mirrors HA's `Context`. + pub context: Context, +} + +impl State { + /// Construct a fresh state snapshot at `now`. + pub fn new( + entity_id: EntityId, + state: impl Into, + attributes: serde_json::Value, + context: Context, + ) -> Self { + let now = Utc::now(); + Self { + entity_id, + state: state.into(), + attributes, + last_changed: now, + last_updated: now, + context, + } + } + + /// Construct the next state snapshot. If the new `state` string + /// equals the prior `state`, `last_changed` is preserved. + pub fn next( + &self, + new_state: impl Into, + new_attributes: serde_json::Value, + context: Context, + ) -> Self { + let new_state = new_state.into(); + let now = Utc::now(); + let last_changed = if new_state == self.state { + self.last_changed + } else { + now + }; + Self { + entity_id: self.entity_id.clone(), + state: new_state, + attributes: new_attributes, + last_changed, + last_updated: now, + context, + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn entity_id_parses_valid() { + let e = EntityId::parse("light.living_room").unwrap(); + assert_eq!(e.domain(), "light"); + assert_eq!(e.name(), "living_room"); + assert_eq!(e.as_str(), "light.living_room"); + } + + #[test] + fn entity_id_rejects_missing_dot() { + assert!(matches!( + EntityId::parse("light_living_room"), + Err(EntityIdError::MissingDot(_)) + )); + } + + #[test] + fn entity_id_rejects_uppercase() { + let err = EntityId::parse("light.LivingRoom").unwrap_err(); + match err { + EntityIdError::InvalidChar { ch, .. } => assert_eq!(ch, 'L'), + other => panic!("expected InvalidChar, got {other:?}"), + } + } + + #[test] + fn entity_id_rejects_unicode() { + // ADR-127 §Q1 — P1 is strict ASCII. Unicode acceptance deferred. + assert!(EntityId::parse("light.küche").is_err()); + } + + #[test] + fn state_next_preserves_last_changed_when_state_unchanged() { + let id = EntityId::parse("sensor.temp").unwrap(); + let s1 = State::new(id.clone(), "20.0", serde_json::json!({}), Context::default()); + std::thread::sleep(std::time::Duration::from_millis(2)); + let s2 = s1.next("20.0", serde_json::json!({"updated": true}), Context::default()); + assert_eq!(s1.last_changed, s2.last_changed); + assert!(s2.last_updated > s1.last_updated); + } + + #[test] + fn state_next_bumps_last_changed_when_state_changes() { + let id = EntityId::parse("sensor.temp").unwrap(); + let s1 = State::new(id, "20.0", serde_json::json!({}), Context::default()); + std::thread::sleep(std::time::Duration::from_millis(2)); + let s2 = s1.next("21.0", serde_json::json!({}), Context::default()); + assert!(s2.last_changed > s1.last_changed); + } +} diff --git a/v2/crates/homecore/src/event.rs b/v2/crates/homecore/src/event.rs new file mode 100644 index 00000000..7856a1be --- /dev/null +++ b/v2/crates/homecore/src/event.rs @@ -0,0 +1,163 @@ +//! Typed system events + untyped domain events + Context. +//! +//! Mirrors `homeassistant.core.EventBus` + `homeassistant.const.EVENT_*` +//! constants. ADR-127 §2.2 splits HA's single dict-typed event channel +//! into two: a typed system channel (zero-allocation read path) and a +//! json-blob domain channel (for arbitrary integration-fired events). + +use std::sync::Arc; + +use chrono::{DateTime, Utc}; +use serde::{Deserialize, Serialize}; +use uuid::Uuid; + +use crate::entity::{EntityId, State}; + +/// Well-known HA event-type string constants. +/// +/// Mirrors `homeassistant/const.py` `EVENT_*` constants. Used by +/// integrations that fire untyped [`DomainEvent`]s. +#[non_exhaustive] +pub struct EventType; + +impl EventType { + pub const STATE_CHANGED: &'static str = "state_changed"; + pub const SERVICE_REGISTERED: &'static str = "service_registered"; + pub const SERVICE_REMOVED: &'static str = "service_removed"; + pub const CALL_SERVICE: &'static str = "call_service"; + pub const COMPONENT_LOADED: &'static str = "component_loaded"; + pub const PLATFORM_DISCOVERED: &'static str = "platform_discovered"; + pub const HOMEASSISTANT_START: &'static str = "homeassistant_start"; + pub const HOMEASSISTANT_STARTED: &'static str = "homeassistant_started"; + pub const HOMEASSISTANT_STOP: &'static str = "homeassistant_stop"; + pub const HOMEASSISTANT_FINAL_WRITE: &'static str = "homeassistant_final_write"; + pub const HOMEASSISTANT_CLOSE: &'static str = "homeassistant_close"; +} + +/// Causality context for a state change or service call. +/// +/// Mirrors `homeassistant.core.Context`. Used by automations to detect +/// loops ("don't re-fire on a state change my own automation caused") +/// and by the recorder (ADR-132) to attribute changes to users. +#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)] +pub struct Context { + pub id: Uuid, + pub user_id: Option, + pub parent_id: Option, +} + +impl Context { + pub fn new() -> Self { + Self::default() + } + + pub fn with_user(user_id: impl Into) -> Self { + Self { + id: Uuid::new_v4(), + user_id: Some(user_id.into()), + parent_id: None, + } + } + + pub fn child_of(parent: &Context) -> Self { + Self { + id: Uuid::new_v4(), + user_id: parent.user_id.clone(), + parent_id: Some(parent.id), + } + } +} + +impl Default for Context { + fn default() -> Self { + Self { + id: Uuid::new_v4(), + user_id: None, + parent_id: None, + } + } +} + +/// Typed enum of system events. Subscribers that only care about a +/// specific shape (the recorder, the websocket subscriber) can match on +/// the variant without going through `serde_json::Value`. +#[derive(Clone, Debug)] +pub enum SystemEvent { + StateChanged(StateChangedEvent), + ServiceRegistered { domain: String, service: String }, + ServiceRemoved { domain: String, service: String }, + ComponentLoaded { component: String }, + HomeCoreStart, + HomeCoreStarted, + HomeCoreStop, +} + +/// State-change event payload. Carries the old and new snapshots so a +/// subscriber doesn't need to read the state machine again to learn +/// what changed. +/// +/// Mirrors HA's event_data `{ entity_id, old_state, new_state }`. +#[derive(Clone, Debug)] +pub struct StateChangedEvent { + pub entity_id: EntityId, + pub old_state: Option>, + pub new_state: Option>, + pub fired_at: DateTime, +} + +/// Untyped event fired by integrations. Mirrors HA's +/// `EventBus.async_fire(event_type, event_data)`. +#[derive(Clone, Debug)] +pub struct DomainEvent { + pub event_type: String, + pub event_data: serde_json::Value, + pub origin: EventOrigin, + pub context: Context, + pub fired_at: DateTime, +} + +/// Where an event originated. Mirrors HA's `EventOrigin` enum (`local` +/// vs `remote`). +#[derive(Clone, Debug, Copy, Eq, PartialEq, Serialize, Deserialize)] +pub enum EventOrigin { + Local, + Remote, +} + +impl DomainEvent { + pub fn new( + event_type: impl Into, + event_data: serde_json::Value, + context: Context, + ) -> Self { + Self { + event_type: event_type.into(), + event_data, + origin: EventOrigin::Local, + context, + fired_at: Utc::now(), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn context_child_inherits_user_id() { + let parent = Context::with_user("alice"); + let child = Context::child_of(&parent); + assert_eq!(child.user_id.as_deref(), Some("alice")); + assert_eq!(child.parent_id, Some(parent.id)); + assert_ne!(child.id, parent.id); + } + + #[test] + fn event_type_constants_match_ha_names() { + // These string values are wire-format with HA — must match + // exactly so ADR-130 can serve a wire-compat WebSocket API. + assert_eq!(EventType::STATE_CHANGED, "state_changed"); + assert_eq!(EventType::HOMEASSISTANT_START, "homeassistant_start"); + } +} diff --git a/v2/crates/homecore/src/homecore.rs b/v2/crates/homecore/src/homecore.rs new file mode 100644 index 00000000..c71a8a03 --- /dev/null +++ b/v2/crates/homecore/src/homecore.rs @@ -0,0 +1,75 @@ +//! `HomeCore` runtime coordinator. Mirrors `homeassistant.core.HomeAssistant`. +//! +//! Cheap to clone — all internals are `Arc`-shared so tasks can each +//! hold their own `HomeCore` handle without coordination overhead. + +use std::sync::Arc; + +use crate::bus::EventBus; +use crate::registry::EntityRegistry; +use crate::service::ServiceRegistry; +use crate::state::StateMachine; + +#[derive(Clone)] +pub struct HomeCore { + inner: Arc, +} + +struct HomeCoreInner { + pub bus: EventBus, + pub states: StateMachine, + pub services: ServiceRegistry, + pub entities: EntityRegistry, +} + +impl HomeCore { + pub fn new() -> Self { + Self { + inner: Arc::new(HomeCoreInner { + bus: EventBus::new(), + states: StateMachine::new(), + services: ServiceRegistry::new(), + entities: EntityRegistry::new(), + }), + } + } + + pub fn bus(&self) -> &EventBus { + &self.inner.bus + } + + pub fn states(&self) -> &StateMachine { + &self.inner.states + } + + pub fn services(&self) -> &ServiceRegistry { + &self.inner.services + } + + pub fn entities(&self) -> &EntityRegistry { + &self.inner.entities + } +} + +impl Default for HomeCore { + fn default() -> Self { + Self::new() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::entity::EntityId; + use crate::event::Context; + + #[tokio::test] + async fn end_to_end_set_then_get() { + let hc = HomeCore::new(); + let id = EntityId::parse("light.kitchen").unwrap(); + hc.states().set(id.clone(), "on", serde_json::json!({"brightness": 200}), Context::new()); + let snap = hc.states().get(&id).unwrap(); + assert_eq!(snap.state, "on"); + assert_eq!(snap.attributes["brightness"], 200); + } +} diff --git a/v2/crates/homecore/src/lib.rs b/v2/crates/homecore/src/lib.rs new file mode 100644 index 00000000..ef0b7e39 --- /dev/null +++ b/v2/crates/homecore/src/lib.rs @@ -0,0 +1,59 @@ +//! HOMECORE — Rust port of `homeassistant/core.py`. +//! +//! Implements [ADR-127](../../docs/adr/ADR-127-homecore-state-machine-rust.md): +//! the state machine, event bus, service registry, and entity registry that +//! every other HOMECORE module depends on. +//! +//! ## Layout (P1 scaffold) +//! +//! - [`entity`] — `EntityId` newtype + validation; `State` snapshot type +//! - [`event`] — typed `SystemEvent` + untyped `DomainEvent` + `Context` +//! - [`state`] — `StateMachine`: DashMap-backed concurrent state store +//! - [`bus`] — `EventBus`: tokio broadcast wiring for system + domain events +//! - [`service`] — `ServiceRegistry` (stub; full mpsc dispatch lands in P2) +//! - [`registry`] — `EntityRegistry` (in-memory P1; persistence lands in P2) +//! - [`homecore`] — `HomeCore` runtime coordinator: holds bus + states + services +//! +//! ## Threading model +//! +//! HOMECORE is multi-threaded — concurrent reads from any number of tasks +//! return zero-copy `Arc` clones. Writes are serialised per-entity +//! by the DashMap shard lock but the global state machine itself is never +//! locked. See ADR-127 §2.1. +//! +//! ## What's NOT here yet (deferred to P2+) +//! +//! - Persistence of entity registry to `.homecore/storage/core.entity_registry` +//! - Schema validation (`schemas` module from §3 stub) +//! - Service handler mpsc dispatch (`service::ServiceRegistry::call`) +//! - Device registry (mirror of HA's `core.device_registry`) +//! - Witness chain integration (ADR-028) +//! +//! Each is marked `// TODO P2:` at the relevant call site. + +pub mod entity; +pub mod event; +pub mod state; +pub mod bus; +pub mod service; +pub mod registry; + +mod homecore; + +pub use homecore::HomeCore; + +pub use entity::{EntityId, EntityIdError, State}; +pub use event::{Context, DomainEvent, EventType, StateChangedEvent, SystemEvent}; +pub use state::StateMachine; +pub use bus::EventBus; +pub use service::{ServiceCall, ServiceError, ServiceName, ServiceRegistry}; +pub use registry::{EntityCategory, EntityEntry, EntityRegistry}; + +/// HOMECORE protocol/data-model version. Bumped when the public surface +/// or on-disk persistence schema changes in a backwards-incompatible way. +/// Mirrors HA's `core.entity_registry` schema version (currently 13). +pub const HOMECORE_VERSION: u32 = 1; + +/// Compile-time identifier for the HOMECORE build. Wired in by `vergen` +/// or git SHA in a later phase; constant for now. +pub const HOMECORE_BUILD_TAG: &str = env!("CARGO_PKG_VERSION"); diff --git a/v2/crates/homecore/src/registry.rs b/v2/crates/homecore/src/registry.rs new file mode 100644 index 00000000..2e637956 --- /dev/null +++ b/v2/crates/homecore/src/registry.rs @@ -0,0 +1,130 @@ +//! In-memory entity registry (P1). Persistence to +//! `.homecore/storage/core.entity_registry` lands in P2. +//! +//! Schema fields mirror HA `core.entity_registry` v13 per ADR-127 §2.4. + +use std::collections::HashMap; +use std::sync::Arc; + +use serde::{Deserialize, Serialize}; +use tokio::sync::RwLock; + +use crate::entity::EntityId; + +/// Entity category enum. Mirrors HA `homeassistant.helpers.entity.EntityCategory`. +#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)] +#[serde(rename_all = "lowercase")] +pub enum EntityCategory { + Config, + Diagnostic, +} + +/// Source that disabled an entity. Mirrors HA `disabled_by` enum. +#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum DisabledBy { + User, + Integration, + ConfigEntry, + Device, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct EntityEntry { + pub entity_id: EntityId, + pub unique_id: Option, + pub platform: String, + /// User-set display name. None means "use the entity's default name". + pub name: Option, + pub disabled_by: Option, + pub area_id: Option, + pub device_id: Option, + pub entity_category: Option, + pub config_entry_id: Option, +} + +#[derive(Clone)] +pub struct EntityRegistry { + entries: Arc>>, +} + +impl EntityRegistry { + pub fn new() -> Self { + Self { + entries: Arc::new(RwLock::new(HashMap::new())), + } + } + + pub async fn register(&self, entry: EntityEntry) { + self.entries.write().await.insert(entry.entity_id.clone(), entry); + } + + pub async fn get(&self, entity_id: &EntityId) -> Option { + self.entries.read().await.get(entity_id).cloned() + } + + pub async fn remove(&self, entity_id: &EntityId) -> Option { + self.entries.write().await.remove(entity_id) + } + + pub async fn all(&self) -> Vec { + self.entries.read().await.values().cloned().collect() + } + + pub async fn len(&self) -> usize { + self.entries.read().await.len() + } +} + +impl Default for EntityRegistry { + fn default() -> Self { + Self::new() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn register_and_read() { + let reg = EntityRegistry::new(); + let id = EntityId::parse("light.kitchen").unwrap(); + reg.register(EntityEntry { + entity_id: id.clone(), + unique_id: Some("hue_lamp_42".into()), + platform: "hue".into(), + name: Some("Kitchen lamp".into()), + disabled_by: None, + area_id: Some("kitchen".into()), + device_id: None, + entity_category: None, + config_entry_id: None, + }) + .await; + let got = reg.get(&id).await.unwrap(); + assert_eq!(got.platform, "hue"); + assert_eq!(got.name.as_deref(), Some("Kitchen lamp")); + } + + #[tokio::test] + async fn disabled_by_round_trips_via_serde() { + let entry = EntityEntry { + entity_id: EntityId::parse("sensor.x").unwrap(), + unique_id: None, + platform: "test".into(), + name: None, + disabled_by: Some(DisabledBy::Integration), + area_id: None, + device_id: None, + entity_category: Some(EntityCategory::Diagnostic), + config_entry_id: None, + }; + let json = serde_json::to_string(&entry).unwrap(); + // HA wire format uses snake_case for the disabled_by enum. + assert!(json.contains("\"disabled_by\":\"integration\"")); + assert!(json.contains("\"entity_category\":\"diagnostic\"")); + let back: EntityEntry = serde_json::from_str(&json).unwrap(); + assert_eq!(back.disabled_by, Some(DisabledBy::Integration)); + } +} diff --git a/v2/crates/homecore/src/service.rs b/v2/crates/homecore/src/service.rs new file mode 100644 index 00000000..68db2d5c --- /dev/null +++ b/v2/crates/homecore/src/service.rs @@ -0,0 +1,170 @@ +//! Service registry stub. +//! +//! Mirrors `homeassistant.core.ServiceRegistry`. P1 ships the public +//! surface + a simple direct-dispatch `call` so downstream ADRs can +//! depend on it; ADR-127 P2 replaces direct dispatch with the +//! mpsc-router pattern described in §2.3. + +use std::collections::HashMap; +use std::future::Future; +use std::pin::Pin; +use std::sync::Arc; + +use async_trait::async_trait; +use serde::{Deserialize, Serialize}; +use thiserror::Error; +use tokio::sync::RwLock; + +use crate::event::Context; + +/// Service name within a domain. e.g. `light.turn_on` → domain +/// `"light"`, service `"turn_on"`. +#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)] +pub struct ServiceName { + pub domain: String, + pub service: String, +} + +impl ServiceName { + pub fn new(domain: impl Into, service: impl Into) -> Self { + Self { + domain: domain.into(), + service: service.into(), + } + } +} + +/// Inbound service-call payload. Mirrors HA's `service_data` dict +/// plus the originating `Context`. +#[derive(Clone, Debug)] +pub struct ServiceCall { + pub name: ServiceName, + pub data: serde_json::Value, + pub context: Context, +} + +#[derive(Error, Debug)] +pub enum ServiceError { + #[error("service not registered: {domain}.{service}")] + NotRegistered { domain: String, service: String }, + #[error("service handler returned error: {0}")] + HandlerFailed(String), +} + +/// Handler trait. Integration code implements this and registers via +/// [`ServiceRegistry::register`]. P2 will add schema validation via +/// `serde` `Deserialize<'_>`. +#[async_trait] +pub trait ServiceHandler: Send + Sync + 'static { + async fn call(&self, call: ServiceCall) -> Result; +} + +/// Direct closure adapter so simple handlers don't need a struct. +pub struct FnHandler(pub F); + +#[async_trait] +impl ServiceHandler for FnHandler +where + F: Fn(ServiceCall) -> Fut + Send + Sync + 'static, + Fut: Future> + Send + 'static, +{ + async fn call(&self, call: ServiceCall) -> Result { + (self.0)(call).await + } +} + +#[derive(Clone)] +pub struct ServiceRegistry { + handlers: Arc>>>, +} + +impl ServiceRegistry { + pub fn new() -> Self { + Self { + handlers: Arc::new(RwLock::new(HashMap::new())), + } + } + + pub async fn register(&self, name: ServiceName, handler: H) { + self.handlers.write().await.insert(name, Arc::new(handler)); + } + + pub async fn remove(&self, name: &ServiceName) { + self.handlers.write().await.remove(name); + } + + pub async fn has(&self, name: &ServiceName) -> bool { + self.handlers.read().await.contains_key(name) + } + + /// Call a service. P1 direct dispatch; P2 routes through the + /// event bus per ADR-127 §2.3. + pub async fn call(&self, call: ServiceCall) -> Result { + let handler = { + let guard = self.handlers.read().await; + guard.get(&call.name).cloned() + }; + match handler { + Some(h) => h.call(call).await, + None => Err(ServiceError::NotRegistered { + domain: call.name.domain.clone(), + service: call.name.service.clone(), + }), + } + } + + pub async fn registered_services(&self) -> Vec { + self.handlers.read().await.keys().cloned().collect() + } +} + +impl Default for ServiceRegistry { + fn default() -> Self { + Self::new() + } +} + +// Suppress unused-import warning when no consumer of Pin/Box uses them yet +#[allow(dead_code)] +type _UnusedFutureType = Pin + Send>>; + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn register_and_call_returns_handler_value() { + let reg = ServiceRegistry::new(); + reg.register( + ServiceName::new("light", "turn_on"), + FnHandler(|call: ServiceCall| async move { + Ok(serde_json::json!({"called_with": call.data})) + }), + ) + .await; + + let resp = reg + .call(ServiceCall { + name: ServiceName::new("light", "turn_on"), + data: serde_json::json!({"brightness": 200}), + context: Context::new(), + }) + .await + .unwrap(); + assert_eq!(resp["called_with"]["brightness"], 200); + } + + #[tokio::test] + async fn unregistered_service_returns_error() { + let reg = ServiceRegistry::new(); + let err = reg + .call(ServiceCall { + name: ServiceName::new("light", "turn_on"), + data: serde_json::json!({}), + context: Context::new(), + }) + .await + .unwrap_err(); + assert!(matches!(err, ServiceError::NotRegistered { .. })); + } +} diff --git a/v2/crates/homecore/src/state.rs b/v2/crates/homecore/src/state.rs new file mode 100644 index 00000000..5563af86 --- /dev/null +++ b/v2/crates/homecore/src/state.rs @@ -0,0 +1,221 @@ +//! Concurrent state machine — the heart of HOMECORE. +//! +//! Mirrors `homeassistant.core.StateMachine`. Differences from HA per +//! ADR-127 §2.1: +//! +//! - DashMap shard-locked instead of one asyncio.Lock for the whole map +//! - Writers atomically replace `Arc` entries; readers get +//! zero-copy clones +//! - State changes fan out via a tokio broadcast channel (capacity +//! 4,096); slow subscribers get `Lagged` and must re-sync from the +//! current map +//! +//! ## NOT in P1 (deferred to P2+) +//! +//! - `async_set_internal` schema validation +//! - Bulk delete of an entire domain (`async_remove_domain`) +//! - Restore-state on startup from the recorder (ADR-132) + +use std::sync::Arc; + +use chrono::Utc; +use dashmap::DashMap; +use tokio::sync::broadcast; + +use crate::entity::{EntityId, State}; +use crate::event::{Context, StateChangedEvent}; + +/// Broadcast channel capacity for state-changed events. 4,096 events +/// at 20 Hz per entity covers ~3 minutes of backlog for a single hot +/// entity. Slow subscribers must re-sync from the current map. +pub const STATE_CHANGED_CHANNEL_CAPACITY: usize = 4096; + +/// The state machine. Cheap to clone (one `Arc`) — pass copies to as +/// many tasks as you like. +#[derive(Clone)] +pub struct StateMachine { + inner: Arc, +} + +struct StateMachineInner { + states: DashMap>, + tx: broadcast::Sender, +} + +impl StateMachine { + pub fn new() -> Self { + let (tx, _) = broadcast::channel(STATE_CHANGED_CHANNEL_CAPACITY); + Self { + inner: Arc::new(StateMachineInner { + states: DashMap::with_capacity(256), + tx, + }), + } + } + + /// Subscribe to state-changed events. Each subscriber gets an + /// independent receiver; capacity is shared. Falling behind by + /// 4,096 events yields `RecvError::Lagged(n)`. + pub fn subscribe(&self) -> broadcast::Receiver { + self.inner.tx.subscribe() + } + + /// Read a state. Returns `None` if the entity is unknown. + /// Zero-copy: caller gets an `Arc` clone. + pub fn get(&self, entity_id: &EntityId) -> Option> { + self.inner.states.get(entity_id).map(|s| Arc::clone(&s)) + } + + /// Write a state. Fires a `state_changed` broadcast even on the + /// first write (old_state = None). HA semantics: only fires if the + /// state string OR attributes changed; pure no-op writes are + /// suppressed. + /// + /// Returns the new state snapshot. + pub fn set( + &self, + entity_id: EntityId, + new_state: impl Into, + attributes: serde_json::Value, + context: Context, + ) -> Arc { + let new_state_str = new_state.into(); + let old = self.inner.states.get(&entity_id).map(|r| Arc::clone(&*r)); + + let next = match &old { + Some(prev) => Arc::new(prev.next(new_state_str.clone(), attributes.clone(), context)), + None => Arc::new(State::new(entity_id.clone(), new_state_str.clone(), attributes.clone(), context)), + }; + + // HA suppresses no-op writes (same state + same attributes). + // We follow the same rule to keep the broadcast channel quiet. + let is_noop = match &old { + Some(prev) => prev.state == new_state_str && prev.attributes == attributes, + None => false, + }; + + self.inner.states.insert(entity_id.clone(), Arc::clone(&next)); + + if !is_noop { + let event = StateChangedEvent { + entity_id, + old_state: old, + new_state: Some(Arc::clone(&next)), + fired_at: Utc::now(), + }; + // err = no receivers; that's fine, write still committed. + let _ = self.inner.tx.send(event); + } + next + } + + /// Remove a state. Fires `state_changed` with `new_state = None`. + pub fn remove(&self, entity_id: &EntityId) -> Option> { + let removed = self.inner.states.remove(entity_id).map(|(_, s)| s); + if let Some(old) = &removed { + let event = StateChangedEvent { + entity_id: entity_id.clone(), + old_state: Some(Arc::clone(old)), + new_state: None, + fired_at: Utc::now(), + }; + let _ = self.inner.tx.send(event); + } + removed + } + + /// Snapshot all current states. Allocates a new Vec — useful for + /// the REST GET /api/states path (ADR-130). + pub fn all(&self) -> Vec> { + self.inner.states.iter().map(|r| Arc::clone(r.value())).collect() + } + + /// Snapshot all states whose entity_id matches a domain prefix. + /// Mirrors HA's `hass.states.async_all(domain)`. + pub fn all_by_domain(&self, domain: &str) -> Vec> { + self.inner + .states + .iter() + .filter(|r| r.key().domain() == domain) + .map(|r| Arc::clone(r.value())) + .collect() + } + + /// Number of entities currently tracked. + pub fn len(&self) -> usize { + self.inner.states.len() + } + + pub fn is_empty(&self) -> bool { + self.inner.states.len() == 0 + } +} + +impl Default for StateMachine { + fn default() -> Self { + Self::new() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn id(s: &str) -> EntityId { + EntityId::parse(s).unwrap() + } + + #[tokio::test] + async fn set_writes_and_fires() { + let sm = StateMachine::new(); + let mut rx = sm.subscribe(); + sm.set(id("light.kitchen"), "on", serde_json::json!({"brightness": 200}), Context::new()); + let evt = rx.recv().await.unwrap(); + assert_eq!(evt.entity_id.as_str(), "light.kitchen"); + assert!(evt.old_state.is_none()); + assert_eq!(evt.new_state.as_ref().unwrap().state, "on"); + } + + #[tokio::test] + async fn noop_writes_are_suppressed() { + let sm = StateMachine::new(); + sm.set(id("light.k"), "on", serde_json::json!({}), Context::new()); + let mut rx = sm.subscribe(); + // Same state + same attributes → no event. + sm.set(id("light.k"), "on", serde_json::json!({}), Context::new()); + let try_recv = tokio::time::timeout(std::time::Duration::from_millis(50), rx.recv()).await; + assert!(try_recv.is_err(), "expected no event for no-op write"); + } + + #[tokio::test] + async fn attribute_only_change_fires_but_preserves_last_changed() { + let sm = StateMachine::new(); + let s1 = sm.set(id("sensor.t"), "20", serde_json::json!({"unit": "C"}), Context::new()); + tokio::time::sleep(std::time::Duration::from_millis(2)).await; + let s2 = sm.set(id("sensor.t"), "20", serde_json::json!({"unit": "F"}), Context::new()); + assert_eq!(s1.last_changed, s2.last_changed); + assert!(s2.last_updated > s1.last_updated); + } + + #[test] + fn all_by_domain_filters() { + let sm = StateMachine::new(); + sm.set(id("light.a"), "on", serde_json::json!({}), Context::new()); + sm.set(id("light.b"), "off", serde_json::json!({}), Context::new()); + sm.set(id("sensor.t"), "20", serde_json::json!({}), Context::new()); + assert_eq!(sm.all_by_domain("light").len(), 2); + assert_eq!(sm.all_by_domain("sensor").len(), 1); + assert_eq!(sm.all().len(), 3); + } + + #[tokio::test] + async fn remove_fires_with_no_new_state() { + let sm = StateMachine::new(); + sm.set(id("light.k"), "on", serde_json::json!({}), Context::new()); + let mut rx = sm.subscribe(); + sm.remove(&id("light.k")); + let evt = rx.recv().await.unwrap(); + assert!(evt.new_state.is_none()); + assert!(evt.old_state.is_some()); + } +}