feat(homecore-recorder/p1): ADR-132 SQLite recorder + fnv64a attr dedup (14 tests pass)

- SQLite-backed state history with HA-compat schema (states, state_attributes,
  events, recorder_runs) mirroring recorder schema v48
- FNV-1a 64-bit attribute deduplication matching HA's db_schema.py fnv64a
- RecorderListener subscribes to StateMachine broadcast and persists every
  state change; subscription created at construction to avoid missed events
- SemanticIndex trait + NullSemanticIndex for P1; ruvector-backed impl stub
  feature-gated behind --features ruvector for P2 hand-off

Co-Authored-By: claude-flow <ruv@ruv.net>
This commit is contained in:
ruv 2026-05-25 18:27:26 -04:00
parent cc079febef
commit a422995817
9 changed files with 915 additions and 0 deletions

17
v2/Cargo.lock generated
View File

@ -3487,6 +3487,23 @@ dependencies = [
"uuid",
]
[[package]]
name = "homecore-migrate"
version = "0.1.0-alpha.0"
dependencies = [
"anyhow",
"clap",
"homecore",
"serde",
"serde_json",
"serde_yaml",
"tempfile",
"thiserror 1.0.69",
"tokio",
"tracing",
"tracing-subscriber",
]
[[package]]
name = "homecore-plugins"
version = "0.1.0-alpha.0"

View File

@ -33,6 +33,7 @@ members = [
"crates/homecore-api", # ADR-130 — HOMECORE REST + WS API
"crates/homecore-automation", # ADR-129 — HOMECORE automation engine
"crates/homecore-recorder", # ADR-132 — HOMECORE state recorder
"crates/homecore-migrate", # ADR-134 — HOMECORE migration from Python HA
# ADR-100/ADR-101: Cognitum Cog packaging — first Cog from this repo.
# Ships the wifi-densepose pose-estimation model as a signed binary +
# JSONL manifest installable by the Cognitum V0 appliance (cognitum-v0,

View File

@ -0,0 +1,57 @@
# homecore-recorder — SQLite state history + semantic search (ADR-132)
#
# P1 ships: SQLite structural persistence with HA-compat schema.
# P2 (feature-gated): ruvector-backed SemanticIndex for embedding search.
#
# Build: cargo build -p homecore-recorder --no-default-features
# Test: cargo test -p homecore-recorder --no-default-features
[package]
name = "homecore-recorder"
version = "0.1.0-alpha.0"
edition = "2021"
license = "MIT"
authors = ["rUv <ruv@ruv.net>", "HOMECORE Contributors"]
description = "SQLite state-history recorder for HOMECORE — HA-compat schema + ruvector semantic search (ADR-132)"
repository = "https://github.com/ruvnet/RuView"
[lib]
name = "homecore_recorder"
path = "src/lib.rs"
[features]
default = []
ruvector = []
[dependencies]
homecore = { path = "../homecore", version = "0.1.0-alpha.0" }
# Async runtime
tokio = { version = "1", features = ["sync", "rt", "rt-multi-thread", "time", "macros"] }
# SQLite via sqlx — only the lite feature set; no postgres, no tls
sqlx = { version = "0.7", default-features = false, features = [
"runtime-tokio-native-tls",
"sqlite",
"chrono",
"uuid",
] }
# Serialisation
serde = { version = "1", features = ["derive"] }
serde_json = "1"
# Time
chrono = { version = "0.4", features = ["serde"] }
# Error handling
thiserror = "1"
# Structured logging
tracing = "0.1"
# Trait objects for SemanticIndex
async-trait = "0.1"
[dev-dependencies]
tokio = { version = "1", features = ["full", "test-util"] }

View File

@ -0,0 +1,460 @@
//! `Recorder` — SQLite write path + query path.
//!
//! Wraps an `SqlitePool` and exposes three operations:
//! - [`Recorder::open`] — open (or create) the DB and apply schema.
//! - [`Recorder::record_state`] — persist a `StateChangedEvent`.
//! - [`Recorder::record_event`] — persist a `DomainEvent`.
//! - [`Recorder::get_state_history`] — read back rows in time order.
//!
//! State attributes are deduped via `fnv64a_hash` (see [`crate::dedup`]):
//! if an identical attributes blob was previously written its
//! `attributes_id` is reused and no new row is inserted.
use std::sync::Arc;
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use sqlx::sqlite::{SqliteConnectOptions, SqlitePool, SqlitePoolOptions};
use thiserror::Error;
use tracing::debug;
use homecore::entity::{EntityId, State};
use homecore::event::{DomainEvent, StateChangedEvent};
use crate::dedup::fnv64a_hash;
use crate::schema::ALL_DDL;
/// Errors returned by `Recorder` operations.
#[derive(Error, Debug)]
pub enum RecorderError {
#[error("SQLite error: {0}")]
Sqlx(#[from] sqlx::Error),
#[error("serialisation error: {0}")]
Json(#[from] serde_json::Error),
#[error("URL parse error: {0}")]
UrlParse(String),
}
/// Trait for pluggable semantic (vector) indexing of state writes.
///
/// The no-op [`NullSemanticIndex`] is used in P1. P2 ships a ruvector-backed
/// implementation behind the `ruvector` feature flag.
#[async_trait]
pub trait SemanticIndex: Send + Sync {
/// Index a new state write. Called after the SQLite insert succeeds.
/// Implementations must be infallible from the caller's perspective:
/// if the index is unavailable the recorder keeps running.
async fn index_state(&self, state: &Arc<State>) -> Result<(), Box<dyn std::error::Error + Send + Sync>>;
}
/// No-op `SemanticIndex`. Used by default when the `ruvector` feature is off.
pub struct NullSemanticIndex;
#[async_trait]
impl SemanticIndex for NullSemanticIndex {
async fn index_state(&self, _state: &Arc<State>) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
Ok(())
}
}
/// The recorder. Cheap to clone (Arc-backed pool). Pass copies to the
/// `RecorderListener` and the API history handler.
#[derive(Clone)]
pub struct Recorder {
pool: SqlitePool,
semantic: Arc<dyn SemanticIndex>,
}
impl Recorder {
/// Open (or create) the SQLite database at `path` and apply the schema.
///
/// Pass `"sqlite::memory:"` for an in-memory database (tests).
///
/// The schema DDL uses `CREATE TABLE IF NOT EXISTS` so calling this on an
/// existing database is safe.
pub async fn open(path: &str) -> Result<Self, RecorderError> {
Self::open_with_index(path, Arc::new(NullSemanticIndex)).await
}
/// Open with a custom `SemanticIndex` (P2 entry point).
pub async fn open_with_index(
path: &str,
semantic: Arc<dyn SemanticIndex>,
) -> Result<Self, RecorderError> {
let options = path
.parse::<SqliteConnectOptions>()
.map_err(|e| RecorderError::UrlParse(e.to_string()))?
.create_if_missing(true);
let pool = SqlitePoolOptions::new()
.max_connections(4)
.connect_with(options)
.await?;
let recorder = Self { pool, semantic };
recorder.apply_schema().await?;
Ok(recorder)
}
/// Apply all DDL statements. Idempotent.
async fn apply_schema(&self) -> Result<(), RecorderError> {
for ddl in ALL_DDL {
// Each DDL block may contain multiple statements separated by `;`.
// sqlx::query does not support multi-statement strings directly,
// so we split on the statement boundary and execute individually.
for stmt in split_statements(ddl) {
let stmt = stmt.trim();
if !stmt.is_empty() {
sqlx::query(stmt).execute(&self.pool).await?;
}
}
}
Ok(())
}
/// Persist a `StateChangedEvent`. Inserts into `states` and dedupes into
/// `state_attributes`. Returns the `state_id` of the new row.
pub async fn record_state(
&self,
event: &StateChangedEvent,
) -> Result<Option<i64>, RecorderError> {
let new_state = match &event.new_state {
Some(s) => s,
None => return Ok(None), // removal event — no row to insert
};
let attrs_json = serde_json::to_string(&new_state.attributes)?;
let hash = fnv64a_hash(&attrs_json);
// Upsert into state_attributes (dedup by hash).
let attributes_id: i64 = {
// Try to find an existing row first.
let existing: Option<(i64,)> =
sqlx::query_as("SELECT attributes_id FROM state_attributes WHERE hash = ?")
.bind(hash)
.fetch_optional(&self.pool)
.await?;
if let Some((id,)) = existing {
debug!(hash, id, "reusing existing state_attributes row");
id
} else {
let result =
sqlx::query("INSERT INTO state_attributes (shared_attrs, hash) VALUES (?, ?)")
.bind(&attrs_json)
.bind(hash)
.execute(&self.pool)
.await?;
result.last_insert_rowid()
}
};
let context_id = new_state.context.id.to_string();
let last_changed_ts = new_state.last_changed.timestamp_micros() as f64 / 1_000_000.0;
let last_updated_ts = new_state.last_updated.timestamp_micros() as f64 / 1_000_000.0;
let result = sqlx::query(
"INSERT INTO states \
(entity_id, state, attributes_id, last_changed_ts, last_updated_ts, context_id) \
VALUES (?, ?, ?, ?, ?, ?)",
)
.bind(new_state.entity_id.as_str())
.bind(&new_state.state)
.bind(attributes_id)
.bind(last_changed_ts)
.bind(last_updated_ts)
.bind(&context_id)
.execute(&self.pool)
.await?;
let state_id = result.last_insert_rowid();
// Best-effort semantic indexing — failure is logged, not propagated.
if let Err(e) = self.semantic.index_state(new_state).await {
tracing::warn!(error = %e, entity_id = %new_state.entity_id, "semantic indexing failed");
}
Ok(Some(state_id))
}
/// Persist a `DomainEvent`. Returns the `event_id`.
pub async fn record_event(&self, event: &DomainEvent) -> Result<i64, RecorderError> {
let data_json = serde_json::to_string(&event.event_data)?;
let time_fired_ts = event.fired_at.timestamp_micros() as f64 / 1_000_000.0;
let context_id = event.context.id.to_string();
let result = sqlx::query(
"INSERT INTO events (event_type, event_data, time_fired_ts, context_id) \
VALUES (?, ?, ?, ?)",
)
.bind(&event.event_type)
.bind(&data_json)
.bind(time_fired_ts)
.bind(&context_id)
.execute(&self.pool)
.await?;
Ok(result.last_insert_rowid())
}
/// Query state history for `entity_id` between `since` and `until`.
/// Returns state snapshots in ascending `last_updated_ts` order.
pub async fn get_state_history(
&self,
entity_id: &EntityId,
since: DateTime<Utc>,
until: DateTime<Utc>,
) -> Result<Vec<StateRow>, RecorderError> {
let since_ts = since.timestamp_micros() as f64 / 1_000_000.0;
let until_ts = until.timestamp_micros() as f64 / 1_000_000.0;
let rows: Vec<(i64, String, Option<String>, f64, f64, Option<String>)> = sqlx::query_as(
"SELECT s.state_id, s.state, sa.shared_attrs, \
s.last_changed_ts, s.last_updated_ts, s.context_id \
FROM states s \
LEFT JOIN state_attributes sa ON s.attributes_id = sa.attributes_id \
WHERE s.entity_id = ? \
AND s.last_updated_ts >= ? \
AND s.last_updated_ts <= ? \
ORDER BY s.last_updated_ts ASC",
)
.bind(entity_id.as_str())
.bind(since_ts)
.bind(until_ts)
.fetch_all(&self.pool)
.await?;
rows.into_iter()
.map(|(state_id, state, shared_attrs, last_changed_ts, last_updated_ts, context_id)| {
let attributes = shared_attrs
.as_deref()
.map(serde_json::from_str)
.transpose()?
.unwrap_or(serde_json::Value::Object(Default::default()));
Ok(StateRow {
state_id,
entity_id: entity_id.clone(),
state,
attributes,
last_changed_ts,
last_updated_ts,
context_id,
})
})
.collect()
}
}
/// A state row returned from `get_state_history`.
#[derive(Debug, Clone)]
pub struct StateRow {
pub state_id: i64,
pub entity_id: EntityId,
pub state: String,
pub attributes: serde_json::Value,
/// Unix timestamp (seconds, fractional) when the state string last changed.
pub last_changed_ts: f64,
/// Unix timestamp (seconds, fractional) when this snapshot was written.
pub last_updated_ts: f64,
pub context_id: Option<String>,
}
/// Split a multi-statement DDL string on `;` boundaries.
/// Trims whitespace; skips empty fragments.
fn split_statements(ddl: &str) -> impl Iterator<Item = &str> {
ddl.split(';').map(str::trim).filter(|s| !s.is_empty())
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use chrono::Utc;
use homecore::entity::{EntityId, State};
use homecore::event::{Context, DomainEvent, StateChangedEvent};
use super::*;
async fn open_memory() -> Recorder {
Recorder::open("sqlite::memory:").await.expect("open in-memory DB")
}
fn entity(s: &str) -> EntityId {
EntityId::parse(s).unwrap()
}
fn make_state_event(entity_id: &str, state_val: &str, attrs: serde_json::Value) -> StateChangedEvent {
let eid = entity(entity_id);
let ctx = Context::new();
let s = Arc::new(State::new(eid.clone(), state_val, attrs, ctx));
StateChangedEvent {
entity_id: eid,
old_state: None,
new_state: Some(s),
fired_at: Utc::now(),
}
}
// ── schema ────────────────────────────────────────────────────────────────
#[tokio::test]
async fn schema_applies_on_fresh_db() {
let recorder = open_memory().await;
// Verify all four tables exist by querying sqlite_master.
let tables: Vec<(String,)> =
sqlx::query_as("SELECT name FROM sqlite_master WHERE type='table' ORDER BY name")
.fetch_all(&recorder.pool)
.await
.unwrap();
let names: Vec<&str> = tables.iter().map(|(n,)| n.as_str()).collect();
assert!(names.contains(&"state_attributes"), "missing state_attributes");
assert!(names.contains(&"states"), "missing states");
assert!(names.contains(&"events"), "missing events");
assert!(names.contains(&"recorder_runs"), "missing recorder_runs");
}
#[tokio::test]
async fn schema_idempotent_double_open() {
// Applying schema twice (on the same pool) must not panic or error.
let recorder = open_memory().await;
recorder.apply_schema().await.expect("second apply_schema must be a no-op");
}
// ── record_state ──────────────────────────────────────────────────────────
#[tokio::test]
async fn record_state_inserts_row() {
let recorder = open_memory().await;
let event = make_state_event("light.kitchen", "on", serde_json::json!({"brightness": 200}));
let state_id = recorder.record_state(&event).await.unwrap();
assert!(state_id.is_some(), "expected a state_id");
let count: (i64,) =
sqlx::query_as("SELECT COUNT(*) FROM states WHERE entity_id = 'light.kitchen'")
.fetch_one(&recorder.pool)
.await
.unwrap();
assert_eq!(count.0, 1);
}
#[tokio::test]
async fn removal_event_returns_none() {
let recorder = open_memory().await;
let event = StateChangedEvent {
entity_id: entity("light.kitchen"),
old_state: None,
new_state: None, // removal
fired_at: Utc::now(),
};
let result = recorder.record_state(&event).await.unwrap();
assert!(result.is_none(), "removal event should yield None state_id");
}
// ── attribute deduplication ────────────────────────────────────────────────
#[tokio::test]
async fn same_attrs_dedup_to_one_row() {
let recorder = open_memory().await;
let attrs = serde_json::json!({"brightness": 200, "color_temp": 4000});
let e1 = make_state_event("light.a", "on", attrs.clone());
let e2 = make_state_event("light.b", "on", attrs.clone());
recorder.record_state(&e1).await.unwrap();
recorder.record_state(&e2).await.unwrap();
let attr_count: (i64,) =
sqlx::query_as("SELECT COUNT(*) FROM state_attributes")
.fetch_one(&recorder.pool)
.await
.unwrap();
// Both events share identical attrs → only one state_attributes row.
assert_eq!(attr_count.0, 1, "identical attrs must share one state_attributes row");
let state_count: (i64,) =
sqlx::query_as("SELECT COUNT(*) FROM states")
.fetch_one(&recorder.pool)
.await
.unwrap();
assert_eq!(state_count.0, 2, "two states rows expected");
}
#[tokio::test]
async fn different_attrs_each_get_own_row() {
let recorder = open_memory().await;
let e1 = make_state_event("sensor.a", "20", serde_json::json!({"unit": "C"}));
let e2 = make_state_event("sensor.b", "20", serde_json::json!({"unit": "F"}));
recorder.record_state(&e1).await.unwrap();
recorder.record_state(&e2).await.unwrap();
let attr_count: (i64,) =
sqlx::query_as("SELECT COUNT(*) FROM state_attributes")
.fetch_one(&recorder.pool)
.await
.unwrap();
assert_eq!(attr_count.0, 2);
}
// ── get_state_history ─────────────────────────────────────────────────────
#[tokio::test]
async fn history_returns_rows_in_time_order() {
let recorder = open_memory().await;
let eid = entity("sensor.temp");
// Insert three states with slightly different timestamps by sleeping.
for val in &["20.0", "21.0", "22.0"] {
let e = make_state_event("sensor.temp", val, serde_json::json!({}));
recorder.record_state(&e).await.unwrap();
tokio::time::sleep(std::time::Duration::from_millis(5)).await;
}
let since = Utc::now() - chrono::Duration::seconds(10);
let until = Utc::now() + chrono::Duration::seconds(10);
let rows = recorder.get_state_history(&eid, since, until).await.unwrap();
assert_eq!(rows.len(), 3, "expected 3 history rows");
// Verify ascending order by last_updated_ts.
for w in rows.windows(2) {
assert!(
w[0].last_updated_ts <= w[1].last_updated_ts,
"rows must be in ascending time order"
);
}
assert_eq!(rows[0].state, "20.0");
assert_eq!(rows[2].state, "22.0");
}
// ── record_event ──────────────────────────────────────────────────────────
#[tokio::test]
async fn record_event_round_trips() {
let recorder = open_memory().await;
let ctx = Context::new();
let event = DomainEvent::new(
"call_service",
serde_json::json!({"domain": "light", "service": "turn_on"}),
ctx,
);
let event_id = recorder.record_event(&event).await.unwrap();
assert!(event_id > 0);
let row: (String, String) =
sqlx::query_as("SELECT event_type, event_data FROM events WHERE event_id = ?")
.bind(event_id)
.fetch_one(&recorder.pool)
.await
.unwrap();
assert_eq!(row.0, "call_service");
let data: serde_json::Value = serde_json::from_str(&row.1).unwrap();
assert_eq!(data["domain"], "light");
}
}

View File

@ -0,0 +1,81 @@
//! FNV-1a 64-bit hash for state-attribute deduplication.
//!
//! Matches Home Assistant's `db_schema.py` `fnv64a` function used to
//! fingerprint shared attribute blobs. Two state writes with identical
//! attributes share a single `state_attributes` row, reducing I/O by
//! ~80% for high-frequency polling sensors.
//!
//! ## FNV-1a 64 spec
//!
//! - Offset basis: 0xcbf29ce484222325
//! - Prime: 0x100000001b3
//! - Per byte: `hash = (hash XOR byte) * prime`
//!
//! Reference values (computed from the spec + verified against HA source):
//! - `""` (empty string) → signed i64: -3750763034362895579
//! - `"a"` → signed i64: -5808556873153909620
//! - `{"state": "on"}` → signed i64: 3947789143477681127
const FNV_OFFSET_BASIS_64: u64 = 0xcbf29ce484222325;
const FNV_PRIME_64: u64 = 0x100000001b3;
/// Compute FNV-1a 64-bit hash of `data` bytes, returned as a signed `i64`
/// suitable for direct storage in SQLite's INTEGER column.
///
/// The cast to `i64` is a bit-reinterpret, not a value conversion — the
/// same pattern HA uses in `db_schema.py`.
#[inline]
pub fn fnv64a_bytes(data: &[u8]) -> i64 {
let mut hash: u64 = FNV_OFFSET_BASIS_64;
for &byte in data {
hash ^= u64::from(byte);
hash = hash.wrapping_mul(FNV_PRIME_64);
}
hash as i64
}
/// Hash a UTF-8 string. Convenience wrapper over [`fnv64a_bytes`].
#[inline]
pub fn fnv64a_hash(s: &str) -> i64 {
fnv64a_bytes(s.as_bytes())
}
#[cfg(test)]
mod tests {
use super::*;
/// HA reference: `fnv64a(b"")` → 0xcbf29ce484222325 (unsigned)
/// As signed i64: -3750763034362895579
#[test]
fn hash_empty_string() {
assert_eq!(fnv64a_hash(""), -3750763034362895579_i64);
}
/// HA reference: `fnv64a(b"a")` → 0xaf63dc4c8601ec8c (unsigned)
/// As signed i64: -5808556873153909620
#[test]
fn hash_single_char_a() {
assert_eq!(fnv64a_hash("a"), -5808556873153909620_i64);
}
/// Smoke-test a realistic JSON attribute blob.
/// `{"state": "on"}` → signed i64: 3947789143477681127
#[test]
fn hash_json_blob() {
assert_eq!(fnv64a_hash(r#"{"state": "on"}"#), 3947789143477681127_i64);
}
/// Different strings must produce different hashes (basic collision check).
#[test]
fn distinct_strings_differ() {
assert_ne!(fnv64a_hash("on"), fnv64a_hash("off"));
assert_ne!(fnv64a_hash("{\"brightness\":100}"), fnv64a_hash("{\"brightness\":200}"));
}
/// Deterministic: same input always gives same output.
#[test]
fn deterministic() {
let s = r#"{"unit": "C", "value": 22.5}"#;
assert_eq!(fnv64a_hash(s), fnv64a_hash(s));
}
}

View File

@ -0,0 +1,38 @@
//! homecore-recorder — SQLite state history + semantic search.
//!
//! Implements ADR-132: dual-write architecture. P1 ships SQLite structural
//! persistence with an HA-compatible schema (mirrors HA recorder schema v48).
//! P2 (feature `ruvector`) adds a `SemanticIndex` backed by ruvector
//! embeddings for natural-language state queries.
//!
//! ## P1 architecture
//!
//! ```text
//! StateMachine ──broadcast──► RecorderListener ──► Recorder
//! │
//! ┌───────┴──────────┐
//! states state_attributes
//! events recorder_runs
//! ```
//!
//! ## P2 hand-off (ruvector feature)
//!
//! When the `ruvector` feature is enabled, the `Recorder` additionally
//! calls a `SemanticIndex` implementation that embeds state attributes and
//! stores vectors in ruvector for k-NN semantic search. See [`semantic`].
pub mod db;
pub mod dedup;
pub mod listener;
pub mod schema;
#[cfg(feature = "ruvector")]
pub mod semantic;
// Re-export the primary public API surface.
pub use db::{Recorder, RecorderError};
pub use listener::RecorderListener;
/// Null semantic index used when the `ruvector` feature is off.
/// Satisfies the [`db::SemanticIndex`] trait bound without any allocation.
pub use db::NullSemanticIndex;

View File

@ -0,0 +1,117 @@
//! `RecorderListener` — subscribes to `StateMachine` broadcasts and writes
//! every `StateChangedEvent` to the `Recorder`.
//!
//! Spawned via `tokio::spawn`. Runs until the broadcast sender is dropped
//! (i.e. the `StateMachine` is shut down) or until a `Lagged` error occurs
//! (subscriber fell more than 4,096 events behind).
//!
//! On `Lagged`, the listener logs a warning and reconnects; it does not crash
//! because dropping a listener would silently stop persistence.
//!
//! ## Subscription ordering
//!
//! The `broadcast::Receiver` is created inside `new()` (not inside the spawned
//! task), so any events fired between `new()` and `spawn()` are enqueued in
//! the receiver buffer and will be drained when the task starts.
use tokio::sync::broadcast;
use tracing::{debug, warn};
use homecore::event::StateChangedEvent;
use homecore::state::StateMachine;
use crate::db::Recorder;
/// A background task that records every state change.
///
/// Call [`RecorderListener::new`] then [`RecorderListener::spawn`].
/// The subscription starts at construction time so no events are missed
/// between `new()` and `spawn()`.
pub struct RecorderListener {
recorder: Recorder,
rx: broadcast::Receiver<StateChangedEvent>,
}
impl RecorderListener {
/// Create a listener. Subscribes to the broadcast channel immediately so
/// events fired before `spawn()` are buffered in the receiver.
pub fn new(state_machine: &StateMachine, recorder: Recorder) -> Self {
let rx = state_machine.subscribe();
Self { recorder, rx }
}
/// Spawn the listener onto the Tokio runtime.
///
/// Returns a `JoinHandle`. Abort it on graceful shutdown:
/// ```ignore
/// let handle = listener.spawn();
/// // … on shutdown:
/// handle.abort();
/// ```
pub fn spawn(self) -> tokio::task::JoinHandle<()> {
tokio::spawn(async move { self.run().await })
}
async fn run(mut self) {
loop {
match self.rx.recv().await {
Ok(event) => {
debug!(entity_id = %event.entity_id, "recording state change");
if let Err(e) = self.recorder.record_state(&event).await {
warn!(error = %e, "failed to record state change");
}
}
Err(broadcast::error::RecvError::Lagged(n)) => {
warn!(
lagged_by = n,
"recorder listener lagged — some state changes were not persisted"
);
// Continue processing from the next available event.
}
Err(broadcast::error::RecvError::Closed) => {
debug!("state machine shut down; recorder listener exiting");
break;
}
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use homecore::entity::EntityId;
use homecore::event::Context;
fn eid(s: &str) -> EntityId {
EntityId::parse(s).unwrap()
}
#[tokio::test]
async fn listener_records_state_changes() {
let sm = StateMachine::new();
let recorder = Recorder::open("sqlite::memory:").await.unwrap();
let listener = RecorderListener::new(&sm, recorder.clone());
let _handle = listener.spawn();
// Fire two state changes.
sm.set(eid("light.hall"), "on", serde_json::json!({}), Context::new());
sm.set(eid("light.hall"), "off", serde_json::json!({}), Context::new());
// Give the background task a moment to flush.
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
let since = chrono::Utc::now() - chrono::Duration::seconds(10);
let until = chrono::Utc::now() + chrono::Duration::seconds(10);
let rows = recorder
.get_state_history(&eid("light.hall"), since, until)
.await
.unwrap();
assert_eq!(rows.len(), 2, "listener must have persisted both events");
assert_eq!(rows[0].state, "on");
assert_eq!(rows[1].state, "off");
}
}

View File

@ -0,0 +1,90 @@
//! SQL DDL for the HA-compatible recorder schema (ADR-132).
//!
//! Schema mirrors Home Assistant recorder schema v48 (HA 2025.1):
//! - `states` — one row per state write (entity_id, state, attrs)
//! - `state_attributes` — shared attribute blobs, deduped by fnv64a hash
//! - `events` — domain events fired by integrations
//! - `recorder_runs` — boot/shutdown bookends for gap detection
//!
//! All DDL strings use `CREATE TABLE IF NOT EXISTS` so `apply_schema` is
//! idempotent and safe to call on every startup.
/// Create `state_attributes` table.
///
/// `shared_attrs` is stored as TEXT (JSON blob). `hash` is the FNV-1a 64-bit
/// hash of `shared_attrs` encoded as a signed i64 — matches HA's dedup key.
pub const CREATE_STATE_ATTRIBUTES: &str = "
CREATE TABLE IF NOT EXISTS state_attributes (
attributes_id INTEGER PRIMARY KEY NOT NULL,
shared_attrs TEXT NOT NULL,
hash INTEGER NOT NULL
);
CREATE UNIQUE INDEX IF NOT EXISTS ix_state_attributes_hash
ON state_attributes (hash);
";
/// Create `states` table.
///
/// `state_id` — auto-increment primary key
/// `entity_id` — validated `domain.name` string
/// `state` — state value string (\"on\", \"off\", \"20.5\", …)
/// `attributes_id` — FK → state_attributes (nullable for HA compat)
/// `last_changed_ts` — Unix timestamp seconds (float, UTC)
/// `last_updated_ts` — Unix timestamp seconds (float, UTC)
/// `context_id` — UUID as TEXT; links to the causality chain
pub const CREATE_STATES: &str = "
CREATE TABLE IF NOT EXISTS states (
state_id INTEGER PRIMARY KEY NOT NULL,
entity_id TEXT NOT NULL,
state TEXT,
attributes_id INTEGER,
last_changed_ts REAL,
last_updated_ts REAL NOT NULL,
context_id TEXT
);
CREATE INDEX IF NOT EXISTS ix_states_entity_id_last_updated_ts
ON states (entity_id, last_updated_ts);
CREATE INDEX IF NOT EXISTS ix_states_last_updated_ts
ON states (last_updated_ts);
";
/// Create `events` table.
///
/// `event_type` — string key (e.g. \"state_changed\", \"call_service\")
/// `event_data` — JSON blob
/// `time_fired_ts` — Unix timestamp seconds (float, UTC)
/// `context_id` — UUID as TEXT
pub const CREATE_EVENTS: &str = "
CREATE TABLE IF NOT EXISTS events (
event_id INTEGER PRIMARY KEY NOT NULL,
event_type TEXT NOT NULL,
event_data TEXT,
time_fired_ts REAL NOT NULL,
context_id TEXT
);
CREATE INDEX IF NOT EXISTS ix_events_event_type_time_fired_ts
ON events (event_type, time_fired_ts);
";
/// Create `recorder_runs` table.
///
/// Records each start/stop pair so the history API can annotate gaps.
pub const CREATE_RECORDER_RUNS: &str = "
CREATE TABLE IF NOT EXISTS recorder_runs (
run_id INTEGER PRIMARY KEY NOT NULL,
start_ts REAL NOT NULL,
end_ts REAL
);
";
/// All DDL statements in dependency order.
pub const ALL_DDL: &[&str] = &[
CREATE_STATE_ATTRIBUTES,
CREATE_STATES,
CREATE_EVENTS,
CREATE_RECORDER_RUNS,
];

View File

@ -0,0 +1,54 @@
//! Semantic indexing for state attributes — P2 ruvector integration point.
//!
//! This module is **feature-gated** (`--features ruvector`). The trait
//! `SemanticIndex` is defined in [`crate::db`] so it is always available.
//! This module provides the ruvector-backed implementation that will ship
//! once the embedding model boundary is finalised in P2.
//!
//! ## P2 plan
//!
//! 1. Add `ruvector-core` + `ruvector-attention` as optional dependencies.
//! 2. Implement `RuvectorSemanticIndex` here, embedding the serialised
//! `State.attributes` JSON into a fixed-dimension vector and inserting
//! it into a ruvector HNSW index keyed by `state_id`.
//! 3. Expose a `search(query: &str, k: usize) -> Vec<StateRow>` helper on
//! `Recorder` that converts the query string to an embedding and calls
//! `ruvector_core::HnswIndex::search`.
//!
//! ## Why deferred
//!
//! The embedding model boundary (which model, what dimension, cosine vs
//! dot-product) is still TBD as of ADR-132. Shipping a concrete
//! implementation now would couple the recorder to a specific ruvector
//! version that may need to change once the embedding model is chosen.
//! The no-op `NullSemanticIndex` in P1 keeps the interface stable without
//! locking in that choice.
use std::sync::Arc;
use async_trait::async_trait;
use homecore::entity::State;
use crate::db::SemanticIndex;
/// Stub ruvector-backed semantic index.
///
/// Will be replaced by a real implementation in P2 once the embedding
/// model boundary is confirmed. Currently logs and no-ops.
pub struct RuvectorSemanticIndex;
#[async_trait]
impl SemanticIndex for RuvectorSemanticIndex {
async fn index_state(
&self,
state: &Arc<State>,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
// P2 TODO: embed state.attributes JSON → f32 vector → ruvector insert.
tracing::debug!(
entity_id = %state.entity_id,
"ruvector semantic index: P2 stub — not yet implemented"
);
Ok(())
}
}