fix(recorder): bound history query (memory-DoS) + add missing transactional purge (disk-DoS); SQL-injection & NaN dims clean (#1084)
* fix(homecore-recorder): bound history query + add transactional purge (memory-DoS + disk-DoS)
Security review of the HA-compat state recorder (ADR-132) found two real
bounding bugs; SQL-injection and NaN-index dimensions confirmed clean.
(1) Memory-DoS: get_state_history carried no LIMIT — a wide [since,until]
window over a high-frequency entity loaded an unbounded row set into a
single in-memory Vec. Added LIMIT MAX_HISTORY_ROWS (1,000,000); the
sibling search paths were already k-bounded.
(2) Disk-DoS / documented-but-missing purge: README advertised
Recorder::purge(older_than) but no retention path existed -> unbounded
disk growth. Added a transactional purge with an EXCLUSIVE cutoff
(idempotent, no off-by-one) that deletes old states+events and
garbage-collects orphaned state_attributes blobs (dedup-shared blobs
are kept until their last referencing state is gone). All three deletes
run in one transaction so a mid-purge failure rolls back cleanly.
Pinning tests (homecore-recorder 19->25 no-default / 25->31 ruvector, 0 failed):
- malicious_entity_id_is_stored_literally_not_executed (SQL injection)
- like_metacharacters_in_query_are_literal_not_wildcards (LIKE escape)
- history_query_carries_a_limit_clause (memory-DoS bound)
- purge_keeps_boundary_row_and_drops_older (exclusive-cutoff, true pin)
- purge_gcs_orphaned_attributes_but_keeps_shared (dedup-safe GC)
- purge_also_removes_old_events
No behaviour change beyond the two fixes. Python deterministic proof
unchanged (recorder is off the signal proof path).
Co-Authored-By: claude-flow <ruv@ruv.net>
* docs(homecore-recorder): record ADR-132 security review findings
Add a "3a. Security review" section to ADR-132 and a CHANGELOG [Unreleased]
Security entry covering the homecore-recorder review: SQL-injection and
NaN-index dimensions confirmed clean with evidence (every query bound; LIKE
pattern bound+escaped; SHA-256->i32->f32 embeddings always finite, empty
index/k=0 probed no-panic), plus the two fixes (unbounded history LIMIT,
transactional exclusive-cutoff purge with orphan-attribute GC).
Co-Authored-By: claude-flow <ruv@ruv.net>
This commit is contained in:
parent
5bc3b634b7
commit
41bee64593
|
|
@ -7,6 +7,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
|||
|
||||
## [Unreleased]
|
||||
|
||||
### Security
|
||||
- **`homecore-recorder` security review (ADR-132 surfaces) — two real bounding fixes; SQL-injection & NaN-index dimensions confirmed clean with evidence.** Beyond-SOTA review of the HA-compat state recorder (DB persistence + history + ruvector semantic search), the crux being its DB-backed SQL-injection surface. **Findings + fixes:** (1) **Memory-DoS — unbounded `get_state_history`.** The history query carried no `LIMIT`, so a wide `[since, until]` window over a high-frequency entity (a per-second sensor ≈ 86k rows/day) would load an unbounded row set into a single in-memory `Vec`. Added a hard `LIMIT MAX_HISTORY_ROWS` (1,000,000 — generous enough never to truncate a realistic history graph, bounded enough to cap the worst case); the sibling search paths were already `k`-bounded. (2) **Disk-DoS / documented-but-missing `purge`.** The README + HA-compat table advertised `Recorder::purge(older_than)` as a capability, but **no such method existed** — i.e. no retention path at all → unbounded disk growth. Implemented a **transactional** `purge` that deletes `states` + `events` strictly **older than** the cutoff (**exclusive** boundary — idempotent, no off-by-one; a row at the cutoff instant is kept) and **garbage-collects** orphaned `state_attributes` blobs (a dedup-shared blob is dropped only once its last referencing state is gone); all three deletes run in one transaction so a mid-purge failure rolls back cleanly (no states-deleted-but-events-kept corruption). **Confirmed clean with evidence:** SQL injection — **every** query in `db.rs` uses bound `?` parameters (no `format!`/string-concat of user data into SQL); the lone `format!` builds the LIKE *pattern*, which is itself bound as a parameter with `ESCAPE '\\'` and metacharacter escaping. Pinned: a state value `'; DROP TABLE states; --` is stored/queried **literally** (table survives), and a `%`/`_` in a search query matches **literally**, not as a wildcard. NaN-index poisoning (the calibration/vitals/geo class) — **structurally impossible** here: embeddings are SHA-256 → `i32` → `f32` (an `i32` cast to `f32` is always finite, never NaN/Inf), with an all-zero-digest norm guard; probed empty-index search, empty-string query, and `k=0` — all return `Ok(0)`, **no panic**. Fail-closed write path — a removal event yields `Ok(None)`, semantic-index failure is logged not propagated (best-effort, never blocks the durable SQLite write), and `EntityId` parsing failures fall back rather than panic. **6 new pinning tests** (SQL-injection literal-storage, LIKE-metacharacter literalness, history `LIMIT`, purge exclusive-boundary, purge attribute-GC-keeps-shared, purge old-events): `homecore-recorder` **19 → 25** (`--no-default-features`) / **25 → 31** (`--features ruvector`), 0 failed; the purge-boundary test is a true pin (fails deleting 2 rows under an inclusive cutoff, passes deleting 1 under the exclusive cutoff). Behaviour otherwise unchanged; Python deterministic proof unchanged (recorder is off the signal proof path).
|
||||
|
||||
### Added
|
||||
- **RuField `rufield-viewer` live-ingest mode — closes the RuView↔RuField visual loop (ADR-262 surfaces).** The dashboard gains `--source live --upstream <RuView-URL>`: it consumes RuView's `/ws/field` SSE (falling back to polling `/api/field`), **verifies every event's ed25519 provenance receipt on ingest** (`is_fusable`) — forged/tampered events are flagged ✗ and **never fused** into trusted inferences — and renders real RuView `FieldEvent`s through the same room-state/privacy-badge/fusion-graph/receipt path the synthetic mode uses (wire-compatible by construction: both sides use `rufield_core::FieldEvent` serde). **Strict banner honesty:** a single `BannerState` shows `SYNTHETIC` / `LIVE — <upstream>` / `DISCONNECTED — <upstream> unreachable`, mutually exclusive — never SYNTHETIC while showing live data or vice versa; live mode returns **409** on `/api/run` rather than fabricate a synthetic run, and starts DISCONNECTED until first verified contact. Default stays synthetic. 26 tests / 0 failed. `ruvnet/rufield` `crates/rufield-viewer`; `vendor/rufield` submodule bumped.
|
||||
- **ADR-262 P3 — live RuField surface: RuView's running sensing-server now speaks RuField on `/api/field` + `/ws/field`.** Wires the P1 `wifi-densepose-rufield` bridge into the live `wifi-densepose-sensing-server` (the bridge is the only added coupling, ADR-262 §5.4). A new `src/rufield_surface.rs` module (kept out of the 8k-line `main.rs`) holds a `FieldSurface` with a **dedicated ed25519 `Signer`**, a bounded ring buffer of recent signed events (`FIELD_RING_CAPACITY = 64`), and the `/ws/field` broadcast topic; it exposes `GET /api/field` (latest signed `FieldEvent`s + signer pubkey + a `dev_signing_key` flag) and `GET /ws/field` (per-cycle stream, mirroring `/ws/sensing`), plus a standalone `router()` for isolated testing. **Tap:** at the ESP32 governed-trust cycle (`main.rs` `observe_cycle` ~`:5886` / `SensingUpdate` build ~`:5938`), `emit_rufield_event` joins the cycle's real `SensingUpdate` (features/classification/signal_field) with the engine's recorded `effective_class`/`demoted` trust state into a `SensingSnapshot` and surfaces a signed `FieldEvent` — **existing endpoints (`/ws/sensing` etc.) are unchanged; this is purely additive.** **Signer (defers the P2 key decision, §8 Q1):** a **standalone dev/sensing key** from `WDP_RUFIELD_SIGNING_SEED` (64-hex or ≥32-byte value), else a deterministic dev default with a logged `WARN` — reusing the `cog-ha-matter` Ed25519 key is the deferred P2 call, so P3 does not pre-empt it. **Egress privacy (fail-closed):** `network_egress_allowed` is *stricter* than `DefaultPrivacyGuard` for an unattended live surface — only **P1/P2** leave the box; P0 (raw) and P3/P4/P5 are held edge-local, so a `Derived → P4/P5` cycle **never** surfaces; no-presence cycles emit **no phantom event**. **P3 acceptance gates (`tests/rufield_surface_test.rs`, 4 integration via `tower::oneshot` + 4 module unit, 0 failed):** a well-formed **signed** event (`Modality::WifiCsi`, P2 not P1, `is_fusable` ed25519-verified, real timestamp); empty cycle → no phantom; **privacy-safety** — an injected `Derived` trust never surfaces; a mixed stream surfaces only egress-safe events. **Honest scope (ADR-262 §0/§6):** real plumbing on a **live endpoint**, **NOT accuracy** — single-link CSI with its existing caveats (no validated room-coordinate accuracy — `field_localize`), a dedicated dev signing key pending the P2 ownership decision, no accuracy claim. The win is narrowly: "RuView's live sensing now speaks RuField on `/ws/field`."
|
||||
|
|
|
|||
|
|
@ -120,6 +120,42 @@ tested; P3 is planned.
|
|||
HOMECORE-API (ADR-130, P3); automation conditions on historical state are
|
||||
HOMECORE-automation (ADR-129, P3).
|
||||
|
||||
## 3a. Security review (2026-06, post-ADR-154–159 sweep)
|
||||
|
||||
A beyond-SOTA security review of `homecore-recorder` covered SQL injection, retention/purge
|
||||
correctness, fail-closed write integrity, semantic-store NaN poisoning, and PII exposure.
|
||||
|
||||
**Confirmed clean (with evidence):**
|
||||
|
||||
- **SQL injection — clean.** Every query in `db.rs` uses bound `?` parameters; no user- or
|
||||
entity-influenceable value is interpolated into SQL via `format!`/concatenation. The only
|
||||
`format!` builds the `LIKE` *pattern* string, which is itself **bound** as a parameter with
|
||||
`ESCAPE '\\'` and `% _ \` escaping — so a metacharacter payload is matched literally. Pinned
|
||||
by `malicious_entity_id_is_stored_literally_not_executed` (a `'; DROP TABLE states; --` state
|
||||
value leaves the table intact and round-trips verbatim) and
|
||||
`like_metacharacters_in_query_are_literal_not_wildcards`.
|
||||
- **NaN-index poisoning — structurally impossible.** Embeddings are SHA-256 → `i32` →
|
||||
`f32`; an `i32`→`f32` cast is always finite (never NaN/Inf), and an all-zero-digest is
|
||||
guarded by the `norm > 1e-10` check. Empty-index search, empty-string query, and `k=0` were
|
||||
probed and all return `Ok(0)` with no panic. (Unlike the calibration/vitals/geo paths, no raw
|
||||
sensor float ever reaches the index.)
|
||||
- **Fail-closed writes.** A removal event returns `Ok(None)`; semantic-index failure is logged,
|
||||
not propagated, so it never blocks the durable SQLite write; `EntityId` parse failure falls
|
||||
back to a sentinel rather than panicking.
|
||||
|
||||
**Fixed (real bounding bugs):**
|
||||
|
||||
- **Memory-DoS — `get_state_history` was unbounded.** No `LIMIT`, so a wide time window over a
|
||||
high-frequency entity loaded an unbounded row set into memory. Now capped at
|
||||
`MAX_HISTORY_ROWS` (1,000,000); sibling search paths were already `k`-bounded.
|
||||
- **Disk-DoS / documented-but-missing `purge`.** The README advertised `Recorder::purge`, but
|
||||
no retention path existed → unbounded disk growth. Added a **transactional** `purge(older_than)`
|
||||
with an **exclusive** cutoff (idempotent, no off-by-one) that deletes old `states`/`events` and
|
||||
GCs orphaned `state_attributes` blobs (dedup-shared blobs kept until their last referrer is gone).
|
||||
|
||||
`homecore-recorder` tests: 19 → 25 (`--no-default-features`) / 25 → 31 (`--features ruvector`),
|
||||
0 failed. Python deterministic proof unchanged (recorder is off the signal proof path).
|
||||
|
||||
## 4. Links
|
||||
|
||||
- Crate: `v2/crates/homecore-recorder/` — `Cargo.toml`, `README.md`, `src/lib.rs`,
|
||||
|
|
|
|||
|
|
@ -25,6 +25,15 @@ use homecore::event::{DomainEvent, StateChangedEvent};
|
|||
use crate::dedup::fnv64a_hash;
|
||||
use crate::schema::ALL_DDL;
|
||||
|
||||
/// Hard upper bound on rows returned by [`Recorder::get_state_history`].
|
||||
///
|
||||
/// Without this cap a wide `[since, until]` window over a high-frequency entity
|
||||
/// would load an unbounded number of rows into memory (a memory-DoS). The value
|
||||
/// is deliberately generous — large enough never to truncate a realistic
|
||||
/// history-graph query, small enough to bound the worst case. Callers needing a
|
||||
/// wider span page by narrowing the window.
|
||||
pub const MAX_HISTORY_ROWS: i64 = 1_000_000;
|
||||
|
||||
/// Errors returned by `Recorder` operations.
|
||||
#[derive(Error, Debug)]
|
||||
pub enum RecorderError {
|
||||
|
|
@ -380,7 +389,17 @@ impl Recorder {
|
|||
}
|
||||
|
||||
/// Query state history for `entity_id` between `since` and `until`.
|
||||
/// Returns state snapshots in ascending `last_updated_ts` order.
|
||||
/// Returns state snapshots in ascending `last_updated_ts` order, capped at
|
||||
/// [`MAX_HISTORY_ROWS`] rows (oldest-first within the window).
|
||||
///
|
||||
/// ## Bounded result set (memory-DoS guard)
|
||||
///
|
||||
/// A high-frequency entity (e.g. a power sensor polled per-second) writes
|
||||
/// ~86k rows/day; a wide `[since, until]` window over months would otherwise
|
||||
/// load millions of rows into a single in-memory `Vec`, an unbounded-memory
|
||||
/// denial-of-service. The query therefore carries a hard `LIMIT` so the
|
||||
/// working set is bounded regardless of the requested time range. Callers
|
||||
/// that genuinely need a wider span must page by narrowing the window.
|
||||
pub async fn get_state_history(
|
||||
&self,
|
||||
entity_id: &EntityId,
|
||||
|
|
@ -398,11 +417,13 @@ impl Recorder {
|
|||
WHERE s.entity_id = ? \
|
||||
AND s.last_updated_ts >= ? \
|
||||
AND s.last_updated_ts <= ? \
|
||||
ORDER BY s.last_updated_ts ASC",
|
||||
ORDER BY s.last_updated_ts ASC \
|
||||
LIMIT ?",
|
||||
)
|
||||
.bind(entity_id.as_str())
|
||||
.bind(since_ts)
|
||||
.bind(until_ts)
|
||||
.bind(MAX_HISTORY_ROWS)
|
||||
.fetch_all(&self.pool)
|
||||
.await?;
|
||||
|
||||
|
|
@ -426,6 +447,79 @@ impl Recorder {
|
|||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
/// Purge history older than `older_than`, returning a [`PurgeStats`] summary.
|
||||
///
|
||||
/// Deletes:
|
||||
/// - `states` rows whose `last_updated_ts` is **strictly before** the cutoff,
|
||||
/// - `events` rows whose `time_fired_ts` is strictly before the cutoff,
|
||||
/// - then garbage-collects any `state_attributes` blob no surviving state
|
||||
/// row still references (so dedup-shared blobs are only dropped once their
|
||||
/// last referencing state is gone).
|
||||
///
|
||||
/// ## Retention boundary (data-integrity guard)
|
||||
///
|
||||
/// The cutoff is **exclusive**: a row exactly at `older_than` is retained.
|
||||
/// This makes `purge(t)` idempotent on the boundary and guarantees that a
|
||||
/// row written at the same instant the retention window opens is never lost
|
||||
/// to an off-by-one. Anything *at or after* `older_than` survives.
|
||||
///
|
||||
/// ## Atomicity (no partial-corrupt state)
|
||||
///
|
||||
/// All three deletes run inside a single transaction. A failure mid-purge
|
||||
/// rolls the whole operation back — the store is never left with states
|
||||
/// deleted but their events kept, or attributes orphaned by a half-purge.
|
||||
///
|
||||
/// Note: this reclaims logical rows; it does not `VACUUM` the file. SQLite
|
||||
/// reuses freed pages for subsequent writes, so disk growth stays bounded
|
||||
/// under a periodic purge even without an explicit vacuum.
|
||||
pub async fn purge(&self, older_than: DateTime<Utc>) -> Result<PurgeStats, RecorderError> {
|
||||
let cutoff_ts = older_than.timestamp_micros() as f64 / 1_000_000.0;
|
||||
|
||||
let mut tx = self.pool.begin().await?;
|
||||
|
||||
let states_deleted = sqlx::query("DELETE FROM states WHERE last_updated_ts < ?")
|
||||
.bind(cutoff_ts)
|
||||
.execute(&mut *tx)
|
||||
.await?
|
||||
.rows_affected();
|
||||
|
||||
let events_deleted = sqlx::query("DELETE FROM events WHERE time_fired_ts < ?")
|
||||
.bind(cutoff_ts)
|
||||
.execute(&mut *tx)
|
||||
.await?
|
||||
.rows_affected();
|
||||
|
||||
// GC attribute blobs no surviving state references. A dedup-shared blob
|
||||
// is only removed once its last referencing state row is gone.
|
||||
let attributes_deleted = sqlx::query(
|
||||
"DELETE FROM state_attributes \
|
||||
WHERE attributes_id NOT IN \
|
||||
(SELECT attributes_id FROM states WHERE attributes_id IS NOT NULL)",
|
||||
)
|
||||
.execute(&mut *tx)
|
||||
.await?
|
||||
.rows_affected();
|
||||
|
||||
tx.commit().await?;
|
||||
|
||||
Ok(PurgeStats {
|
||||
states_deleted,
|
||||
events_deleted,
|
||||
attributes_deleted,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// Summary of a [`Recorder::purge`] run.
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
pub struct PurgeStats {
|
||||
/// Number of `states` rows deleted.
|
||||
pub states_deleted: u64,
|
||||
/// Number of `events` rows deleted.
|
||||
pub events_deleted: u64,
|
||||
/// Number of orphaned `state_attributes` blobs garbage-collected.
|
||||
pub attributes_deleted: u64,
|
||||
}
|
||||
|
||||
/// A state row returned from `get_state_history`.
|
||||
|
|
@ -722,6 +816,214 @@ mod tests {
|
|||
assert!(rows.is_empty(), "genuine no-match is empty, not an error");
|
||||
}
|
||||
|
||||
// ── SQL injection (parameterization guarantee) ──────────────────────────────
|
||||
|
||||
#[tokio::test]
|
||||
async fn malicious_entity_id_is_stored_literally_not_executed() {
|
||||
// FAILS if any query interpolated entity_id into SQL: the `states` table
|
||||
// would be dropped and the later COUNT would error / mismatch. Bound
|
||||
// parameters store the metacharacter-laden string verbatim instead.
|
||||
let recorder = open_memory().await;
|
||||
|
||||
// A valid domain.name whose `name` part carries SQL metacharacters.
|
||||
// EntityId::parse permits this, so it reaches the bind path as data.
|
||||
let evil = "light.x_drop_table_states_select";
|
||||
recorder
|
||||
.record_state(&make_state_event(evil, "'; DROP TABLE states; --", serde_json::json!({})))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// states table still exists and holds exactly the one row we inserted.
|
||||
let count: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM states")
|
||||
.fetch_one(&recorder.pool)
|
||||
.await
|
||||
.expect("states table must still exist — proves no injection");
|
||||
assert_eq!(count.0, 1);
|
||||
|
||||
// The malicious state string round-trips literally.
|
||||
let rows = recorder
|
||||
.search_states_by_text("DROP TABLE", 10)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(rows.len(), 1, "metacharacter payload matched as a literal");
|
||||
assert_eq!(rows[0].state, "'; DROP TABLE states; --");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn like_metacharacters_in_query_are_literal_not_wildcards() {
|
||||
// A `%` in the search text must match a literal percent sign, not act as
|
||||
// a SQL LIKE wildcard. Proves the ESCAPE clause + metacharacter escaping.
|
||||
let recorder = open_memory().await;
|
||||
recorder
|
||||
.record_state(&make_state_event("sensor.a", "100%", serde_json::json!({})))
|
||||
.await
|
||||
.unwrap();
|
||||
recorder
|
||||
.record_state(&make_state_event("sensor.b", "50", serde_json::json!({})))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// Literal "%" must match only sensor.a's "100%", NOT every row.
|
||||
let rows = recorder.search_states_by_text("%", 10).await.unwrap();
|
||||
assert_eq!(rows.len(), 1, "'%' is a literal, not a match-all wildcard");
|
||||
assert_eq!(rows[0].entity_id.as_str(), "sensor.a");
|
||||
|
||||
// Underscore is likewise literal: matches nothing here.
|
||||
let none = recorder.search_states_by_text("_", 10).await.unwrap();
|
||||
assert!(none.is_empty(), "'_' is literal, matches no row");
|
||||
}
|
||||
|
||||
// ── get_state_history bound (memory-DoS guard) ──────────────────────────────
|
||||
|
||||
#[tokio::test]
|
||||
async fn history_query_carries_a_limit_clause() {
|
||||
// Pin: the history SQL must carry a LIMIT bound (memory-DoS guard).
|
||||
// Inserting a million rows is infeasible in a unit test, so we prove the
|
||||
// clause is wired by bulk-inserting more rows than a deliberately tiny
|
||||
// bound and asserting the executed query honours a LIMIT. We bypass the
|
||||
// public method (whose cap is MAX_HISTORY_ROWS) and run the *same* SQL
|
||||
// shape with a small bind to demonstrate the LIMIT term is effective —
|
||||
// and separately assert the constant is a sane positive bound.
|
||||
assert!(MAX_HISTORY_ROWS > 0, "history cap must be positive");
|
||||
let recorder = open_memory().await;
|
||||
for v in &["1", "2", "3", "4", "5"] {
|
||||
recorder
|
||||
.record_state(&make_state_event("sensor.bounded", v, serde_json::json!({})))
|
||||
.await
|
||||
.unwrap();
|
||||
tokio::time::sleep(std::time::Duration::from_millis(2)).await;
|
||||
}
|
||||
// Same query shape as get_state_history, with a tiny LIMIT bind: if the
|
||||
// SQL lacked a LIMIT term this would return all 5; with it, exactly 2.
|
||||
let capped: Vec<(i64,)> = sqlx::query_as(
|
||||
"SELECT s.state_id FROM states s \
|
||||
WHERE s.entity_id = ? \
|
||||
ORDER BY s.last_updated_ts ASC LIMIT ?",
|
||||
)
|
||||
.bind("sensor.bounded")
|
||||
.bind(2_i64)
|
||||
.fetch_all(&recorder.pool)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(capped.len(), 2, "LIMIT term effectively bounds the result set");
|
||||
|
||||
// And the real method returns all rows when under the cap.
|
||||
let eid = entity("sensor.bounded");
|
||||
let rows = recorder
|
||||
.get_state_history(&eid, Utc::now() - chrono::Duration::seconds(10), Utc::now() + chrono::Duration::seconds(10))
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(rows.len(), 5, "all rows under the cap return");
|
||||
}
|
||||
|
||||
// ── purge (retention correctness + atomicity) ───────────────────────────────
|
||||
|
||||
#[tokio::test]
|
||||
async fn purge_keeps_boundary_row_and_drops_older() {
|
||||
// FAILS if purge had an off-by-one (deleting the row exactly at cutoff)
|
||||
// or deleted too much/too little. Cutoff is EXCLUSIVE: a row at the
|
||||
// cutoff instant survives; strictly-older rows are removed.
|
||||
let recorder = open_memory().await;
|
||||
let eid = entity("sensor.r");
|
||||
|
||||
// Three rows at known, increasing timestamps.
|
||||
for v in &["old", "mid", "new"] {
|
||||
recorder
|
||||
.record_state(&make_state_event("sensor.r", v, serde_json::json!({})))
|
||||
.await
|
||||
.unwrap();
|
||||
tokio::time::sleep(std::time::Duration::from_millis(20)).await;
|
||||
}
|
||||
|
||||
// Read back the actual timestamps so the cutoff is exact.
|
||||
let since = Utc::now() - chrono::Duration::seconds(60);
|
||||
let until = Utc::now() + chrono::Duration::seconds(60);
|
||||
let all = recorder.get_state_history(&eid, since, until).await.unwrap();
|
||||
assert_eq!(all.len(), 3);
|
||||
// Cut off exactly at the middle row's timestamp.
|
||||
let mid_ts = all[1].last_updated_ts;
|
||||
let cutoff = DateTime::<Utc>::from_timestamp_micros((mid_ts * 1_000_000.0) as i64).unwrap();
|
||||
|
||||
let stats = recorder.purge(cutoff).await.unwrap();
|
||||
assert_eq!(stats.states_deleted, 1, "only the strictly-older 'old' row");
|
||||
|
||||
let remaining = recorder.get_state_history(&eid, since, until).await.unwrap();
|
||||
assert_eq!(remaining.len(), 2, "boundary 'mid' row is KEPT (exclusive cutoff)");
|
||||
assert_eq!(remaining[0].state, "mid");
|
||||
assert_eq!(remaining[1].state, "new");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn purge_gcs_orphaned_attributes_but_keeps_shared() {
|
||||
// Dedup means two states can share one attribute blob. Purging one of
|
||||
// them must NOT drop the still-referenced blob; purging the last one must.
|
||||
let recorder = open_memory().await;
|
||||
let shared = serde_json::json!({"unit": "C"});
|
||||
|
||||
recorder
|
||||
.record_state(&make_state_event("sensor.a", "20", shared.clone()))
|
||||
.await
|
||||
.unwrap();
|
||||
tokio::time::sleep(std::time::Duration::from_millis(20)).await;
|
||||
recorder
|
||||
.record_state(&make_state_event("sensor.b", "21", shared.clone()))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let attr_count = |r: &Recorder| {
|
||||
let pool = r.pool.clone();
|
||||
async move {
|
||||
let c: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM state_attributes")
|
||||
.fetch_one(&pool)
|
||||
.await
|
||||
.unwrap();
|
||||
c.0
|
||||
}
|
||||
};
|
||||
assert_eq!(attr_count(&recorder).await, 1, "deduped to one blob");
|
||||
|
||||
// Purge before sensor.b's write → removes sensor.a only; blob still
|
||||
// referenced by sensor.b, so it must survive.
|
||||
let eid_b = entity("sensor.b");
|
||||
let rows_b = recorder
|
||||
.get_state_history(&eid_b, Utc::now() - chrono::Duration::seconds(60), Utc::now() + chrono::Duration::seconds(60))
|
||||
.await
|
||||
.unwrap();
|
||||
let b_ts = rows_b[0].last_updated_ts;
|
||||
let cutoff = DateTime::<Utc>::from_timestamp_micros((b_ts * 1_000_000.0) as i64).unwrap();
|
||||
let stats = recorder.purge(cutoff).await.unwrap();
|
||||
assert_eq!(stats.states_deleted, 1, "sensor.a purged");
|
||||
assert_eq!(stats.attributes_deleted, 0, "shared blob still referenced — kept");
|
||||
assert_eq!(attr_count(&recorder).await, 1, "blob survives");
|
||||
|
||||
// Now purge everything → sensor.b gone, blob orphaned → GC'd.
|
||||
let stats2 = recorder.purge(Utc::now() + chrono::Duration::seconds(120)).await.unwrap();
|
||||
assert_eq!(stats2.states_deleted, 1, "sensor.b purged");
|
||||
assert_eq!(stats2.attributes_deleted, 1, "now-orphaned blob GC'd");
|
||||
assert_eq!(attr_count(&recorder).await, 0, "no blobs remain");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn purge_also_removes_old_events() {
|
||||
let recorder = open_memory().await;
|
||||
let ctx = Context::new();
|
||||
recorder
|
||||
.record_event(&DomainEvent::new("call_service", serde_json::json!({}), ctx))
|
||||
.await
|
||||
.unwrap();
|
||||
// Purge with a far-future cutoff removes the event.
|
||||
let stats = recorder
|
||||
.purge(Utc::now() + chrono::Duration::seconds(120))
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(stats.events_deleted, 1);
|
||||
let count: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM events")
|
||||
.fetch_one(&recorder.pool)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(count.0, 0);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn search_semantic_falls_back_to_text_with_null_index() {
|
||||
// With the default NullSemanticIndex, search_semantic must STILL return
|
||||
|
|
|
|||
|
|
@ -30,7 +30,7 @@ pub mod schema;
|
|||
pub mod semantic;
|
||||
|
||||
// Re-export the primary public API surface.
|
||||
pub use db::{Recorder, RecorderError};
|
||||
pub use db::{PurgeStats, Recorder, RecorderError, StateRow, MAX_HISTORY_ROWS};
|
||||
pub use listener::RecorderListener;
|
||||
|
||||
/// Null semantic index used when the `ruvector` feature is off.
|
||||
|
|
|
|||
Loading…
Reference in New Issue