diff --git a/v2/Cargo.lock b/v2/Cargo.lock index 7b825927..bc892cd2 100644 --- a/v2/Cargo.lock +++ b/v2/Cargo.lock @@ -3487,6 +3487,23 @@ dependencies = [ "uuid", ] +[[package]] +name = "homecore-migrate" +version = "0.1.0-alpha.0" +dependencies = [ + "anyhow", + "clap", + "homecore", + "serde", + "serde_json", + "serde_yaml", + "tempfile", + "thiserror 1.0.69", + "tokio", + "tracing", + "tracing-subscriber", +] + [[package]] name = "homecore-plugins" version = "0.1.0-alpha.0" diff --git a/v2/Cargo.toml b/v2/Cargo.toml index 23369461..f05d50b2 100644 --- a/v2/Cargo.toml +++ b/v2/Cargo.toml @@ -33,6 +33,7 @@ members = [ "crates/homecore-api", # ADR-130 — HOMECORE REST + WS API "crates/homecore-automation", # ADR-129 — HOMECORE automation engine "crates/homecore-recorder", # ADR-132 — HOMECORE state recorder + "crates/homecore-migrate", # ADR-134 — HOMECORE migration from Python HA # ADR-100/ADR-101: Cognitum Cog packaging — first Cog from this repo. # Ships the wifi-densepose pose-estimation model as a signed binary + # JSONL manifest installable by the Cognitum V0 appliance (cognitum-v0, diff --git a/v2/crates/homecore-recorder/Cargo.toml b/v2/crates/homecore-recorder/Cargo.toml new file mode 100644 index 00000000..06d0bf7a --- /dev/null +++ b/v2/crates/homecore-recorder/Cargo.toml @@ -0,0 +1,57 @@ +# homecore-recorder — SQLite state history + semantic search (ADR-132) +# +# P1 ships: SQLite structural persistence with HA-compat schema. +# P2 (feature-gated): ruvector-backed SemanticIndex for embedding search. +# +# Build: cargo build -p homecore-recorder --no-default-features +# Test: cargo test -p homecore-recorder --no-default-features + +[package] +name = "homecore-recorder" +version = "0.1.0-alpha.0" +edition = "2021" +license = "MIT" +authors = ["rUv ", "HOMECORE Contributors"] +description = "SQLite state-history recorder for HOMECORE — HA-compat schema + ruvector semantic search (ADR-132)" +repository = "https://github.com/ruvnet/RuView" + +[lib] +name = "homecore_recorder" +path = "src/lib.rs" + +[features] +default = [] +ruvector = [] + +[dependencies] +homecore = { path = "../homecore", version = "0.1.0-alpha.0" } + +# Async runtime +tokio = { version = "1", features = ["sync", "rt", "rt-multi-thread", "time", "macros"] } + +# SQLite via sqlx — only the lite feature set; no postgres, no tls +sqlx = { version = "0.7", default-features = false, features = [ + "runtime-tokio-native-tls", + "sqlite", + "chrono", + "uuid", +] } + +# Serialisation +serde = { version = "1", features = ["derive"] } +serde_json = "1" + +# Time +chrono = { version = "0.4", features = ["serde"] } + +# Error handling +thiserror = "1" + +# Structured logging +tracing = "0.1" + +# Trait objects for SemanticIndex +async-trait = "0.1" + +[dev-dependencies] +tokio = { version = "1", features = ["full", "test-util"] } diff --git a/v2/crates/homecore-recorder/src/db.rs b/v2/crates/homecore-recorder/src/db.rs new file mode 100644 index 00000000..db576e68 --- /dev/null +++ b/v2/crates/homecore-recorder/src/db.rs @@ -0,0 +1,460 @@ +//! `Recorder` — SQLite write path + query path. +//! +//! Wraps an `SqlitePool` and exposes three operations: +//! - [`Recorder::open`] — open (or create) the DB and apply schema. +//! - [`Recorder::record_state`] — persist a `StateChangedEvent`. +//! - [`Recorder::record_event`] — persist a `DomainEvent`. +//! - [`Recorder::get_state_history`] — read back rows in time order. +//! +//! State attributes are deduped via `fnv64a_hash` (see [`crate::dedup`]): +//! if an identical attributes blob was previously written its +//! `attributes_id` is reused and no new row is inserted. + +use std::sync::Arc; + +use async_trait::async_trait; +use chrono::{DateTime, Utc}; +use sqlx::sqlite::{SqliteConnectOptions, SqlitePool, SqlitePoolOptions}; +use thiserror::Error; +use tracing::debug; + +use homecore::entity::{EntityId, State}; +use homecore::event::{DomainEvent, StateChangedEvent}; + +use crate::dedup::fnv64a_hash; +use crate::schema::ALL_DDL; + +/// Errors returned by `Recorder` operations. +#[derive(Error, Debug)] +pub enum RecorderError { + #[error("SQLite error: {0}")] + Sqlx(#[from] sqlx::Error), + + #[error("serialisation error: {0}")] + Json(#[from] serde_json::Error), + + #[error("URL parse error: {0}")] + UrlParse(String), +} + +/// Trait for pluggable semantic (vector) indexing of state writes. +/// +/// The no-op [`NullSemanticIndex`] is used in P1. P2 ships a ruvector-backed +/// implementation behind the `ruvector` feature flag. +#[async_trait] +pub trait SemanticIndex: Send + Sync { + /// Index a new state write. Called after the SQLite insert succeeds. + /// Implementations must be infallible from the caller's perspective: + /// if the index is unavailable the recorder keeps running. + async fn index_state(&self, state: &Arc) -> Result<(), Box>; +} + +/// No-op `SemanticIndex`. Used by default when the `ruvector` feature is off. +pub struct NullSemanticIndex; + +#[async_trait] +impl SemanticIndex for NullSemanticIndex { + async fn index_state(&self, _state: &Arc) -> Result<(), Box> { + Ok(()) + } +} + +/// The recorder. Cheap to clone (Arc-backed pool). Pass copies to the +/// `RecorderListener` and the API history handler. +#[derive(Clone)] +pub struct Recorder { + pool: SqlitePool, + semantic: Arc, +} + +impl Recorder { + /// Open (or create) the SQLite database at `path` and apply the schema. + /// + /// Pass `"sqlite::memory:"` for an in-memory database (tests). + /// + /// The schema DDL uses `CREATE TABLE IF NOT EXISTS` so calling this on an + /// existing database is safe. + pub async fn open(path: &str) -> Result { + Self::open_with_index(path, Arc::new(NullSemanticIndex)).await + } + + /// Open with a custom `SemanticIndex` (P2 entry point). + pub async fn open_with_index( + path: &str, + semantic: Arc, + ) -> Result { + let options = path + .parse::() + .map_err(|e| RecorderError::UrlParse(e.to_string()))? + .create_if_missing(true); + + let pool = SqlitePoolOptions::new() + .max_connections(4) + .connect_with(options) + .await?; + + let recorder = Self { pool, semantic }; + recorder.apply_schema().await?; + Ok(recorder) + } + + /// Apply all DDL statements. Idempotent. + async fn apply_schema(&self) -> Result<(), RecorderError> { + for ddl in ALL_DDL { + // Each DDL block may contain multiple statements separated by `;`. + // sqlx::query does not support multi-statement strings directly, + // so we split on the statement boundary and execute individually. + for stmt in split_statements(ddl) { + let stmt = stmt.trim(); + if !stmt.is_empty() { + sqlx::query(stmt).execute(&self.pool).await?; + } + } + } + Ok(()) + } + + /// Persist a `StateChangedEvent`. Inserts into `states` and dedupes into + /// `state_attributes`. Returns the `state_id` of the new row. + pub async fn record_state( + &self, + event: &StateChangedEvent, + ) -> Result, RecorderError> { + let new_state = match &event.new_state { + Some(s) => s, + None => return Ok(None), // removal event — no row to insert + }; + + let attrs_json = serde_json::to_string(&new_state.attributes)?; + let hash = fnv64a_hash(&attrs_json); + + // Upsert into state_attributes (dedup by hash). + let attributes_id: i64 = { + // Try to find an existing row first. + let existing: Option<(i64,)> = + sqlx::query_as("SELECT attributes_id FROM state_attributes WHERE hash = ?") + .bind(hash) + .fetch_optional(&self.pool) + .await?; + + if let Some((id,)) = existing { + debug!(hash, id, "reusing existing state_attributes row"); + id + } else { + let result = + sqlx::query("INSERT INTO state_attributes (shared_attrs, hash) VALUES (?, ?)") + .bind(&attrs_json) + .bind(hash) + .execute(&self.pool) + .await?; + result.last_insert_rowid() + } + }; + + let context_id = new_state.context.id.to_string(); + let last_changed_ts = new_state.last_changed.timestamp_micros() as f64 / 1_000_000.0; + let last_updated_ts = new_state.last_updated.timestamp_micros() as f64 / 1_000_000.0; + + let result = sqlx::query( + "INSERT INTO states \ + (entity_id, state, attributes_id, last_changed_ts, last_updated_ts, context_id) \ + VALUES (?, ?, ?, ?, ?, ?)", + ) + .bind(new_state.entity_id.as_str()) + .bind(&new_state.state) + .bind(attributes_id) + .bind(last_changed_ts) + .bind(last_updated_ts) + .bind(&context_id) + .execute(&self.pool) + .await?; + + let state_id = result.last_insert_rowid(); + + // Best-effort semantic indexing — failure is logged, not propagated. + if let Err(e) = self.semantic.index_state(new_state).await { + tracing::warn!(error = %e, entity_id = %new_state.entity_id, "semantic indexing failed"); + } + + Ok(Some(state_id)) + } + + /// Persist a `DomainEvent`. Returns the `event_id`. + pub async fn record_event(&self, event: &DomainEvent) -> Result { + let data_json = serde_json::to_string(&event.event_data)?; + let time_fired_ts = event.fired_at.timestamp_micros() as f64 / 1_000_000.0; + let context_id = event.context.id.to_string(); + + let result = sqlx::query( + "INSERT INTO events (event_type, event_data, time_fired_ts, context_id) \ + VALUES (?, ?, ?, ?)", + ) + .bind(&event.event_type) + .bind(&data_json) + .bind(time_fired_ts) + .bind(&context_id) + .execute(&self.pool) + .await?; + + Ok(result.last_insert_rowid()) + } + + /// Query state history for `entity_id` between `since` and `until`. + /// Returns state snapshots in ascending `last_updated_ts` order. + pub async fn get_state_history( + &self, + entity_id: &EntityId, + since: DateTime, + until: DateTime, + ) -> Result, RecorderError> { + let since_ts = since.timestamp_micros() as f64 / 1_000_000.0; + let until_ts = until.timestamp_micros() as f64 / 1_000_000.0; + + let rows: Vec<(i64, String, Option, f64, f64, Option)> = sqlx::query_as( + "SELECT s.state_id, s.state, sa.shared_attrs, \ + s.last_changed_ts, s.last_updated_ts, s.context_id \ + FROM states s \ + LEFT JOIN state_attributes sa ON s.attributes_id = sa.attributes_id \ + WHERE s.entity_id = ? \ + AND s.last_updated_ts >= ? \ + AND s.last_updated_ts <= ? \ + ORDER BY s.last_updated_ts ASC", + ) + .bind(entity_id.as_str()) + .bind(since_ts) + .bind(until_ts) + .fetch_all(&self.pool) + .await?; + + rows.into_iter() + .map(|(state_id, state, shared_attrs, last_changed_ts, last_updated_ts, context_id)| { + let attributes = shared_attrs + .as_deref() + .map(serde_json::from_str) + .transpose()? + .unwrap_or(serde_json::Value::Object(Default::default())); + + Ok(StateRow { + state_id, + entity_id: entity_id.clone(), + state, + attributes, + last_changed_ts, + last_updated_ts, + context_id, + }) + }) + .collect() + } +} + +/// A state row returned from `get_state_history`. +#[derive(Debug, Clone)] +pub struct StateRow { + pub state_id: i64, + pub entity_id: EntityId, + pub state: String, + pub attributes: serde_json::Value, + /// Unix timestamp (seconds, fractional) when the state string last changed. + pub last_changed_ts: f64, + /// Unix timestamp (seconds, fractional) when this snapshot was written. + pub last_updated_ts: f64, + pub context_id: Option, +} + +/// Split a multi-statement DDL string on `;` boundaries. +/// Trims whitespace; skips empty fragments. +fn split_statements(ddl: &str) -> impl Iterator { + ddl.split(';').map(str::trim).filter(|s| !s.is_empty()) +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use chrono::Utc; + + use homecore::entity::{EntityId, State}; + use homecore::event::{Context, DomainEvent, StateChangedEvent}; + + use super::*; + + async fn open_memory() -> Recorder { + Recorder::open("sqlite::memory:").await.expect("open in-memory DB") + } + + fn entity(s: &str) -> EntityId { + EntityId::parse(s).unwrap() + } + + fn make_state_event(entity_id: &str, state_val: &str, attrs: serde_json::Value) -> StateChangedEvent { + let eid = entity(entity_id); + let ctx = Context::new(); + let s = Arc::new(State::new(eid.clone(), state_val, attrs, ctx)); + StateChangedEvent { + entity_id: eid, + old_state: None, + new_state: Some(s), + fired_at: Utc::now(), + } + } + + // ── schema ──────────────────────────────────────────────────────────────── + + #[tokio::test] + async fn schema_applies_on_fresh_db() { + let recorder = open_memory().await; + // Verify all four tables exist by querying sqlite_master. + let tables: Vec<(String,)> = + sqlx::query_as("SELECT name FROM sqlite_master WHERE type='table' ORDER BY name") + .fetch_all(&recorder.pool) + .await + .unwrap(); + let names: Vec<&str> = tables.iter().map(|(n,)| n.as_str()).collect(); + assert!(names.contains(&"state_attributes"), "missing state_attributes"); + assert!(names.contains(&"states"), "missing states"); + assert!(names.contains(&"events"), "missing events"); + assert!(names.contains(&"recorder_runs"), "missing recorder_runs"); + } + + #[tokio::test] + async fn schema_idempotent_double_open() { + // Applying schema twice (on the same pool) must not panic or error. + let recorder = open_memory().await; + recorder.apply_schema().await.expect("second apply_schema must be a no-op"); + } + + // ── record_state ────────────────────────────────────────────────────────── + + #[tokio::test] + async fn record_state_inserts_row() { + let recorder = open_memory().await; + let event = make_state_event("light.kitchen", "on", serde_json::json!({"brightness": 200})); + + let state_id = recorder.record_state(&event).await.unwrap(); + assert!(state_id.is_some(), "expected a state_id"); + + let count: (i64,) = + sqlx::query_as("SELECT COUNT(*) FROM states WHERE entity_id = 'light.kitchen'") + .fetch_one(&recorder.pool) + .await + .unwrap(); + assert_eq!(count.0, 1); + } + + #[tokio::test] + async fn removal_event_returns_none() { + let recorder = open_memory().await; + let event = StateChangedEvent { + entity_id: entity("light.kitchen"), + old_state: None, + new_state: None, // removal + fired_at: Utc::now(), + }; + let result = recorder.record_state(&event).await.unwrap(); + assert!(result.is_none(), "removal event should yield None state_id"); + } + + // ── attribute deduplication ──────────────────────────────────────────────── + + #[tokio::test] + async fn same_attrs_dedup_to_one_row() { + let recorder = open_memory().await; + let attrs = serde_json::json!({"brightness": 200, "color_temp": 4000}); + + let e1 = make_state_event("light.a", "on", attrs.clone()); + let e2 = make_state_event("light.b", "on", attrs.clone()); + + recorder.record_state(&e1).await.unwrap(); + recorder.record_state(&e2).await.unwrap(); + + let attr_count: (i64,) = + sqlx::query_as("SELECT COUNT(*) FROM state_attributes") + .fetch_one(&recorder.pool) + .await + .unwrap(); + // Both events share identical attrs → only one state_attributes row. + assert_eq!(attr_count.0, 1, "identical attrs must share one state_attributes row"); + + let state_count: (i64,) = + sqlx::query_as("SELECT COUNT(*) FROM states") + .fetch_one(&recorder.pool) + .await + .unwrap(); + assert_eq!(state_count.0, 2, "two states rows expected"); + } + + #[tokio::test] + async fn different_attrs_each_get_own_row() { + let recorder = open_memory().await; + let e1 = make_state_event("sensor.a", "20", serde_json::json!({"unit": "C"})); + let e2 = make_state_event("sensor.b", "20", serde_json::json!({"unit": "F"})); + + recorder.record_state(&e1).await.unwrap(); + recorder.record_state(&e2).await.unwrap(); + + let attr_count: (i64,) = + sqlx::query_as("SELECT COUNT(*) FROM state_attributes") + .fetch_one(&recorder.pool) + .await + .unwrap(); + assert_eq!(attr_count.0, 2); + } + + // ── get_state_history ───────────────────────────────────────────────────── + + #[tokio::test] + async fn history_returns_rows_in_time_order() { + let recorder = open_memory().await; + let eid = entity("sensor.temp"); + + // Insert three states with slightly different timestamps by sleeping. + for val in &["20.0", "21.0", "22.0"] { + let e = make_state_event("sensor.temp", val, serde_json::json!({})); + recorder.record_state(&e).await.unwrap(); + tokio::time::sleep(std::time::Duration::from_millis(5)).await; + } + + let since = Utc::now() - chrono::Duration::seconds(10); + let until = Utc::now() + chrono::Duration::seconds(10); + let rows = recorder.get_state_history(&eid, since, until).await.unwrap(); + + assert_eq!(rows.len(), 3, "expected 3 history rows"); + // Verify ascending order by last_updated_ts. + for w in rows.windows(2) { + assert!( + w[0].last_updated_ts <= w[1].last_updated_ts, + "rows must be in ascending time order" + ); + } + assert_eq!(rows[0].state, "20.0"); + assert_eq!(rows[2].state, "22.0"); + } + + // ── record_event ────────────────────────────────────────────────────────── + + #[tokio::test] + async fn record_event_round_trips() { + let recorder = open_memory().await; + let ctx = Context::new(); + let event = DomainEvent::new( + "call_service", + serde_json::json!({"domain": "light", "service": "turn_on"}), + ctx, + ); + + let event_id = recorder.record_event(&event).await.unwrap(); + assert!(event_id > 0); + + let row: (String, String) = + sqlx::query_as("SELECT event_type, event_data FROM events WHERE event_id = ?") + .bind(event_id) + .fetch_one(&recorder.pool) + .await + .unwrap(); + + assert_eq!(row.0, "call_service"); + let data: serde_json::Value = serde_json::from_str(&row.1).unwrap(); + assert_eq!(data["domain"], "light"); + } +} diff --git a/v2/crates/homecore-recorder/src/dedup.rs b/v2/crates/homecore-recorder/src/dedup.rs new file mode 100644 index 00000000..d1d7b18f --- /dev/null +++ b/v2/crates/homecore-recorder/src/dedup.rs @@ -0,0 +1,81 @@ +//! FNV-1a 64-bit hash for state-attribute deduplication. +//! +//! Matches Home Assistant's `db_schema.py` `fnv64a` function used to +//! fingerprint shared attribute blobs. Two state writes with identical +//! attributes share a single `state_attributes` row, reducing I/O by +//! ~80% for high-frequency polling sensors. +//! +//! ## FNV-1a 64 spec +//! +//! - Offset basis: 0xcbf29ce484222325 +//! - Prime: 0x100000001b3 +//! - Per byte: `hash = (hash XOR byte) * prime` +//! +//! Reference values (computed from the spec + verified against HA source): +//! - `""` (empty string) → signed i64: -3750763034362895579 +//! - `"a"` → signed i64: -5808556873153909620 +//! - `{"state": "on"}` → signed i64: 3947789143477681127 + +const FNV_OFFSET_BASIS_64: u64 = 0xcbf29ce484222325; +const FNV_PRIME_64: u64 = 0x100000001b3; + +/// Compute FNV-1a 64-bit hash of `data` bytes, returned as a signed `i64` +/// suitable for direct storage in SQLite's INTEGER column. +/// +/// The cast to `i64` is a bit-reinterpret, not a value conversion — the +/// same pattern HA uses in `db_schema.py`. +#[inline] +pub fn fnv64a_bytes(data: &[u8]) -> i64 { + let mut hash: u64 = FNV_OFFSET_BASIS_64; + for &byte in data { + hash ^= u64::from(byte); + hash = hash.wrapping_mul(FNV_PRIME_64); + } + hash as i64 +} + +/// Hash a UTF-8 string. Convenience wrapper over [`fnv64a_bytes`]. +#[inline] +pub fn fnv64a_hash(s: &str) -> i64 { + fnv64a_bytes(s.as_bytes()) +} + +#[cfg(test)] +mod tests { + use super::*; + + /// HA reference: `fnv64a(b"")` → 0xcbf29ce484222325 (unsigned) + /// As signed i64: -3750763034362895579 + #[test] + fn hash_empty_string() { + assert_eq!(fnv64a_hash(""), -3750763034362895579_i64); + } + + /// HA reference: `fnv64a(b"a")` → 0xaf63dc4c8601ec8c (unsigned) + /// As signed i64: -5808556873153909620 + #[test] + fn hash_single_char_a() { + assert_eq!(fnv64a_hash("a"), -5808556873153909620_i64); + } + + /// Smoke-test a realistic JSON attribute blob. + /// `{"state": "on"}` → signed i64: 3947789143477681127 + #[test] + fn hash_json_blob() { + assert_eq!(fnv64a_hash(r#"{"state": "on"}"#), 3947789143477681127_i64); + } + + /// Different strings must produce different hashes (basic collision check). + #[test] + fn distinct_strings_differ() { + assert_ne!(fnv64a_hash("on"), fnv64a_hash("off")); + assert_ne!(fnv64a_hash("{\"brightness\":100}"), fnv64a_hash("{\"brightness\":200}")); + } + + /// Deterministic: same input always gives same output. + #[test] + fn deterministic() { + let s = r#"{"unit": "C", "value": 22.5}"#; + assert_eq!(fnv64a_hash(s), fnv64a_hash(s)); + } +} diff --git a/v2/crates/homecore-recorder/src/lib.rs b/v2/crates/homecore-recorder/src/lib.rs new file mode 100644 index 00000000..02add36e --- /dev/null +++ b/v2/crates/homecore-recorder/src/lib.rs @@ -0,0 +1,38 @@ +//! homecore-recorder — SQLite state history + semantic search. +//! +//! Implements ADR-132: dual-write architecture. P1 ships SQLite structural +//! persistence with an HA-compatible schema (mirrors HA recorder schema v48). +//! P2 (feature `ruvector`) adds a `SemanticIndex` backed by ruvector +//! embeddings for natural-language state queries. +//! +//! ## P1 architecture +//! +//! ```text +//! StateMachine ──broadcast──► RecorderListener ──► Recorder +//! │ +//! ┌───────┴──────────┐ +//! states state_attributes +//! events recorder_runs +//! ``` +//! +//! ## P2 hand-off (ruvector feature) +//! +//! When the `ruvector` feature is enabled, the `Recorder` additionally +//! calls a `SemanticIndex` implementation that embeds state attributes and +//! stores vectors in ruvector for k-NN semantic search. See [`semantic`]. + +pub mod db; +pub mod dedup; +pub mod listener; +pub mod schema; + +#[cfg(feature = "ruvector")] +pub mod semantic; + +// Re-export the primary public API surface. +pub use db::{Recorder, RecorderError}; +pub use listener::RecorderListener; + +/// Null semantic index used when the `ruvector` feature is off. +/// Satisfies the [`db::SemanticIndex`] trait bound without any allocation. +pub use db::NullSemanticIndex; diff --git a/v2/crates/homecore-recorder/src/listener.rs b/v2/crates/homecore-recorder/src/listener.rs new file mode 100644 index 00000000..efb758ea --- /dev/null +++ b/v2/crates/homecore-recorder/src/listener.rs @@ -0,0 +1,117 @@ +//! `RecorderListener` — subscribes to `StateMachine` broadcasts and writes +//! every `StateChangedEvent` to the `Recorder`. +//! +//! Spawned via `tokio::spawn`. Runs until the broadcast sender is dropped +//! (i.e. the `StateMachine` is shut down) or until a `Lagged` error occurs +//! (subscriber fell more than 4,096 events behind). +//! +//! On `Lagged`, the listener logs a warning and reconnects; it does not crash +//! because dropping a listener would silently stop persistence. +//! +//! ## Subscription ordering +//! +//! The `broadcast::Receiver` is created inside `new()` (not inside the spawned +//! task), so any events fired between `new()` and `spawn()` are enqueued in +//! the receiver buffer and will be drained when the task starts. + +use tokio::sync::broadcast; +use tracing::{debug, warn}; + +use homecore::event::StateChangedEvent; +use homecore::state::StateMachine; + +use crate::db::Recorder; + +/// A background task that records every state change. +/// +/// Call [`RecorderListener::new`] then [`RecorderListener::spawn`]. +/// The subscription starts at construction time so no events are missed +/// between `new()` and `spawn()`. +pub struct RecorderListener { + recorder: Recorder, + rx: broadcast::Receiver, +} + +impl RecorderListener { + /// Create a listener. Subscribes to the broadcast channel immediately so + /// events fired before `spawn()` are buffered in the receiver. + pub fn new(state_machine: &StateMachine, recorder: Recorder) -> Self { + let rx = state_machine.subscribe(); + Self { recorder, rx } + } + + /// Spawn the listener onto the Tokio runtime. + /// + /// Returns a `JoinHandle`. Abort it on graceful shutdown: + /// ```ignore + /// let handle = listener.spawn(); + /// // … on shutdown: + /// handle.abort(); + /// ``` + pub fn spawn(self) -> tokio::task::JoinHandle<()> { + tokio::spawn(async move { self.run().await }) + } + + async fn run(mut self) { + loop { + match self.rx.recv().await { + Ok(event) => { + debug!(entity_id = %event.entity_id, "recording state change"); + if let Err(e) = self.recorder.record_state(&event).await { + warn!(error = %e, "failed to record state change"); + } + } + Err(broadcast::error::RecvError::Lagged(n)) => { + warn!( + lagged_by = n, + "recorder listener lagged — some state changes were not persisted" + ); + // Continue processing from the next available event. + } + Err(broadcast::error::RecvError::Closed) => { + debug!("state machine shut down; recorder listener exiting"); + break; + } + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + use homecore::entity::EntityId; + use homecore::event::Context; + + fn eid(s: &str) -> EntityId { + EntityId::parse(s).unwrap() + } + + #[tokio::test] + async fn listener_records_state_changes() { + let sm = StateMachine::new(); + let recorder = Recorder::open("sqlite::memory:").await.unwrap(); + + let listener = RecorderListener::new(&sm, recorder.clone()); + let _handle = listener.spawn(); + + // Fire two state changes. + sm.set(eid("light.hall"), "on", serde_json::json!({}), Context::new()); + sm.set(eid("light.hall"), "off", serde_json::json!({}), Context::new()); + + // Give the background task a moment to flush. + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + + let since = chrono::Utc::now() - chrono::Duration::seconds(10); + let until = chrono::Utc::now() + chrono::Duration::seconds(10); + let rows = recorder + .get_state_history(&eid("light.hall"), since, until) + .await + .unwrap(); + + assert_eq!(rows.len(), 2, "listener must have persisted both events"); + assert_eq!(rows[0].state, "on"); + assert_eq!(rows[1].state, "off"); + } +} diff --git a/v2/crates/homecore-recorder/src/schema.rs b/v2/crates/homecore-recorder/src/schema.rs new file mode 100644 index 00000000..6941ee31 --- /dev/null +++ b/v2/crates/homecore-recorder/src/schema.rs @@ -0,0 +1,90 @@ +//! SQL DDL for the HA-compatible recorder schema (ADR-132). +//! +//! Schema mirrors Home Assistant recorder schema v48 (HA 2025.1): +//! - `states` — one row per state write (entity_id, state, attrs) +//! - `state_attributes` — shared attribute blobs, deduped by fnv64a hash +//! - `events` — domain events fired by integrations +//! - `recorder_runs` — boot/shutdown bookends for gap detection +//! +//! All DDL strings use `CREATE TABLE IF NOT EXISTS` so `apply_schema` is +//! idempotent and safe to call on every startup. + +/// Create `state_attributes` table. +/// +/// `shared_attrs` is stored as TEXT (JSON blob). `hash` is the FNV-1a 64-bit +/// hash of `shared_attrs` encoded as a signed i64 — matches HA's dedup key. +pub const CREATE_STATE_ATTRIBUTES: &str = " +CREATE TABLE IF NOT EXISTS state_attributes ( + attributes_id INTEGER PRIMARY KEY NOT NULL, + shared_attrs TEXT NOT NULL, + hash INTEGER NOT NULL +); + +CREATE UNIQUE INDEX IF NOT EXISTS ix_state_attributes_hash + ON state_attributes (hash); +"; + +/// Create `states` table. +/// +/// `state_id` — auto-increment primary key +/// `entity_id` — validated `domain.name` string +/// `state` — state value string (\"on\", \"off\", \"20.5\", …) +/// `attributes_id` — FK → state_attributes (nullable for HA compat) +/// `last_changed_ts` — Unix timestamp seconds (float, UTC) +/// `last_updated_ts` — Unix timestamp seconds (float, UTC) +/// `context_id` — UUID as TEXT; links to the causality chain +pub const CREATE_STATES: &str = " +CREATE TABLE IF NOT EXISTS states ( + state_id INTEGER PRIMARY KEY NOT NULL, + entity_id TEXT NOT NULL, + state TEXT, + attributes_id INTEGER, + last_changed_ts REAL, + last_updated_ts REAL NOT NULL, + context_id TEXT +); + +CREATE INDEX IF NOT EXISTS ix_states_entity_id_last_updated_ts + ON states (entity_id, last_updated_ts); + +CREATE INDEX IF NOT EXISTS ix_states_last_updated_ts + ON states (last_updated_ts); +"; + +/// Create `events` table. +/// +/// `event_type` — string key (e.g. \"state_changed\", \"call_service\") +/// `event_data` — JSON blob +/// `time_fired_ts` — Unix timestamp seconds (float, UTC) +/// `context_id` — UUID as TEXT +pub const CREATE_EVENTS: &str = " +CREATE TABLE IF NOT EXISTS events ( + event_id INTEGER PRIMARY KEY NOT NULL, + event_type TEXT NOT NULL, + event_data TEXT, + time_fired_ts REAL NOT NULL, + context_id TEXT +); + +CREATE INDEX IF NOT EXISTS ix_events_event_type_time_fired_ts + ON events (event_type, time_fired_ts); +"; + +/// Create `recorder_runs` table. +/// +/// Records each start/stop pair so the history API can annotate gaps. +pub const CREATE_RECORDER_RUNS: &str = " +CREATE TABLE IF NOT EXISTS recorder_runs ( + run_id INTEGER PRIMARY KEY NOT NULL, + start_ts REAL NOT NULL, + end_ts REAL +); +"; + +/// All DDL statements in dependency order. +pub const ALL_DDL: &[&str] = &[ + CREATE_STATE_ATTRIBUTES, + CREATE_STATES, + CREATE_EVENTS, + CREATE_RECORDER_RUNS, +]; diff --git a/v2/crates/homecore-recorder/src/semantic.rs b/v2/crates/homecore-recorder/src/semantic.rs new file mode 100644 index 00000000..c868abf6 --- /dev/null +++ b/v2/crates/homecore-recorder/src/semantic.rs @@ -0,0 +1,54 @@ +//! Semantic indexing for state attributes — P2 ruvector integration point. +//! +//! This module is **feature-gated** (`--features ruvector`). The trait +//! `SemanticIndex` is defined in [`crate::db`] so it is always available. +//! This module provides the ruvector-backed implementation that will ship +//! once the embedding model boundary is finalised in P2. +//! +//! ## P2 plan +//! +//! 1. Add `ruvector-core` + `ruvector-attention` as optional dependencies. +//! 2. Implement `RuvectorSemanticIndex` here, embedding the serialised +//! `State.attributes` JSON into a fixed-dimension vector and inserting +//! it into a ruvector HNSW index keyed by `state_id`. +//! 3. Expose a `search(query: &str, k: usize) -> Vec` helper on +//! `Recorder` that converts the query string to an embedding and calls +//! `ruvector_core::HnswIndex::search`. +//! +//! ## Why deferred +//! +//! The embedding model boundary (which model, what dimension, cosine vs +//! dot-product) is still TBD as of ADR-132. Shipping a concrete +//! implementation now would couple the recorder to a specific ruvector +//! version that may need to change once the embedding model is chosen. +//! The no-op `NullSemanticIndex` in P1 keeps the interface stable without +//! locking in that choice. + +use std::sync::Arc; + +use async_trait::async_trait; + +use homecore::entity::State; + +use crate::db::SemanticIndex; + +/// Stub ruvector-backed semantic index. +/// +/// Will be replaced by a real implementation in P2 once the embedding +/// model boundary is confirmed. Currently logs and no-ops. +pub struct RuvectorSemanticIndex; + +#[async_trait] +impl SemanticIndex for RuvectorSemanticIndex { + async fn index_state( + &self, + state: &Arc, + ) -> Result<(), Box> { + // P2 TODO: embed state.attributes JSON → f32 vector → ruvector insert. + tracing::debug!( + entity_id = %state.entity_id, + "ruvector semantic index: P2 stub — not yet implemented" + ); + Ok(()) + } +}