feat(rvcsi): rvcsi-adapter-file (.rvcsi capture/replay) + rvcsi-ruvector (RF memory)

- rvcsi-adapter-file (ADR-095 FR1/FR10, D9): the `.rvcsi` JSONL capture format
  (CaptureHeader line + one CsiFrame per line), FileRecorder, FileReplayAdapter
  (a CsiSource — deterministic replay, preserves timestamps/ordering/validation
  verbatim, carries an unenforced replay_speed for the daemon/CLI), read_all().
  20 unit tests + 1 doctest.
- rvcsi-ruvector (ADR-095 FR8, D8) — standin for the production RuVector binding:
  deterministic embeddings (window_embedding = 32 resampled mean_amplitude bins +
  32 resampled phase_variance bins + [motion_energy, presence_score, quality_score,
  ln1p(frame_count)], L2-normalized, dim 68; event_embedding = 10-wide kind
  one-hot + confidence + ln1p(evidence count), dim 12), cosine_similarity, the
  RfMemoryStore trait + value objects (EmbeddingId/RecordKind/SimilarHit/
  DriftReport), and InMemoryRfMemory + JsonlRfMemory (file-backed append log,
  identical query semantics, latest-baseline-per-room-wins on reopen).
  20 unit tests + 1 doctest.

All rvcsi crates build and test together: core 29, dsp 28, events 18,
adapter-file 20(+1), adapter-nexmon 9, ruvector 20(+1) — 124 unit + 2 doc tests,
0 failures. forbid(unsafe_code) everywhere except rvcsi-adapter-nexmon (FFI).

https://claude.ai/code/session_01CdYAPvRTjcch6YrYf42n1z
This commit is contained in:
Claude 2026-05-13 00:03:27 +00:00
parent 46f701bca8
commit 6432dfbd2d
No known key found for this signature in database
10 changed files with 2084 additions and 10 deletions

1
v2/Cargo.lock generated
View File

@ -6015,7 +6015,6 @@ name = "rvcsi-events"
version = "0.3.0"
dependencies = [
"rvcsi-core",
"rvcsi-dsp",
"serde",
"serde_json",
"thiserror 1.0.69",

View File

@ -0,0 +1,144 @@
//! The `.rvcsi` capture container format (ADR-095 FR1/FR10, D9).
//!
//! A `.rvcsi` file is plain [JSONL]: the **first line** is a
//! [`CaptureHeader`] object describing the session; every **subsequent line**
//! is one [`rvcsi_core::CsiFrame`] serialized as JSON. This keeps the format
//! simple, deterministic, append-friendly and trivially debuggable with `head`
//! / `jq`.
//!
//! [JSONL]: https://jsonlines.org/
use rvcsi_core::{AdapterProfile, SessionId, SourceId, ValidationPolicy};
use serde::{Deserialize, Serialize};
/// Current `.rvcsi` capture format version. Written into every header and
/// checked on read.
pub const CAPTURE_VERSION: u32 = 1;
/// Header object — the first line of every `.rvcsi` capture file.
///
/// It records enough context to replay the session faithfully: the originating
/// session/source ids, the source's [`AdapterProfile`], the
/// [`ValidationPolicy`] that was in force, the calibration version (if any),
/// and an opaque `runtime_config_json` blob the caller may use for whatever it
/// likes (defaults to `"{}"`).
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct CaptureHeader {
/// Capture format version (always [`CAPTURE_VERSION`] when written).
pub rvcsi_capture_version: u32,
/// Session this capture belongs to.
pub session_id: SessionId,
/// Source the frames were captured from.
pub source_id: SourceId,
/// Capability descriptor of the source at capture time.
pub adapter_profile: AdapterProfile,
/// Validation policy that was in force during capture.
pub validation_policy: ValidationPolicy,
/// Calibration version frames were processed against, if any.
pub calibration_version: Option<String>,
/// Opaque caller-supplied runtime config (JSON; default `"{}"`).
pub runtime_config_json: String,
/// Wall-clock creation time, nanoseconds since the Unix epoch (`0` if unknown).
pub created_unix_ns: u64,
}
impl CaptureHeader {
/// Build a header for `session_id` / `source_id` / `adapter_profile` with
/// sensible defaults: version [`CAPTURE_VERSION`], [`ValidationPolicy::default`],
/// no calibration version, `runtime_config_json == "{}"`, and
/// `created_unix_ns` taken from the system clock (or `0` if it is unavailable
/// or before the epoch).
pub fn new(session_id: SessionId, source_id: SourceId, adapter_profile: AdapterProfile) -> Self {
CaptureHeader {
rvcsi_capture_version: CAPTURE_VERSION,
session_id,
source_id,
adapter_profile,
validation_policy: ValidationPolicy::default(),
calibration_version: None,
runtime_config_json: "{}".to_string(),
created_unix_ns: now_unix_ns(),
}
}
/// Builder: override the validation policy.
pub fn with_validation_policy(mut self, policy: ValidationPolicy) -> Self {
self.validation_policy = policy;
self
}
/// Builder: set the calibration version.
pub fn with_calibration_version(mut self, version: impl Into<String>) -> Self {
self.calibration_version = Some(version.into());
self
}
/// Builder: set the opaque runtime config blob.
pub fn with_runtime_config_json(mut self, json: impl Into<String>) -> Self {
self.runtime_config_json = json.into();
self
}
/// Builder: pin `created_unix_ns` (useful for deterministic tests).
pub fn with_created_unix_ns(mut self, ns: u64) -> Self {
self.created_unix_ns = ns;
self
}
}
/// Best-effort "nanoseconds since the Unix epoch" using the system clock;
/// returns `0` when the clock is unavailable or set before the epoch.
fn now_unix_ns() -> u64 {
use std::time::{SystemTime, UNIX_EPOCH};
SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|d| d.as_nanos().min(u128::from(u64::MAX)) as u64)
.unwrap_or(0)
}
#[cfg(test)]
mod tests {
use super::*;
use rvcsi_core::AdapterKind;
#[test]
fn header_defaults() {
let h = CaptureHeader::new(
SessionId(7),
SourceId::from("file:lab.rvcsi"),
AdapterProfile::offline(AdapterKind::File),
);
assert_eq!(h.rvcsi_capture_version, CAPTURE_VERSION);
assert_eq!(h.runtime_config_json, "{}");
assert!(h.calibration_version.is_none());
assert_eq!(h.validation_policy, ValidationPolicy::default());
}
#[test]
fn header_builders() {
let h = CaptureHeader::new(
SessionId(1),
SourceId::from("s"),
AdapterProfile::offline(AdapterKind::File),
)
.with_calibration_version("room@v2")
.with_runtime_config_json(r#"{"foo":1}"#)
.with_created_unix_ns(42);
assert_eq!(h.calibration_version.as_deref(), Some("room@v2"));
assert_eq!(h.runtime_config_json, r#"{"foo":1}"#);
assert_eq!(h.created_unix_ns, 42);
}
#[test]
fn header_json_roundtrips() {
let h = CaptureHeader::new(
SessionId(3),
SourceId::from("esp32"),
AdapterProfile::esp32_default(),
)
.with_created_unix_ns(123);
let json = serde_json::to_string(&h).unwrap();
let back: CaptureHeader = serde_json::from_str(&json).unwrap();
assert_eq!(h, back);
}
}

View File

@ -1,7 +1,342 @@
//! # rvCSI file/replay adapter (skeleton — implemented by the adapters swarm agent)
//! # rvCSI file/replay adapter
//!
//! Records and replays `.rvcsi` capture sessions deterministically (ADR-095 D9).
#![forbid(unsafe_code)]
//! The `.rvcsi` capture container, its [`FileRecorder`], and the
//! [`FileReplayAdapter`] [`CsiSource`](rvcsi_core::CsiSource) (ADR-095 FR1/FR10,
//! D9).
//!
//! A `.rvcsi` file is plain [JSONL]: the first line is a [`CaptureHeader`]
//! describing the session; every subsequent line is one
//! [`rvcsi_core::CsiFrame`] serialized as compact JSON. The format is simple,
//! deterministic, append-friendly and trivially inspectable with `head` / `jq`.
//!
//! Typical use:
//!
//! ```no_run
//! use rvcsi_adapter_file::{CaptureHeader, FileRecorder, FileReplayAdapter};
//! use rvcsi_core::{AdapterKind, AdapterProfile, CsiSource, SessionId, SourceId};
//!
//! # fn demo() -> rvcsi_core::Result<()> {
//! let header = CaptureHeader::new(
//! SessionId(1),
//! SourceId::from("file:lab.rvcsi"),
//! AdapterProfile::offline(AdapterKind::File),
//! );
//! let mut rec = FileRecorder::create("lab.rvcsi", &header)?;
//! // rec.write_frame(&frame)?; ...
//! rec.finish()?;
//!
//! let mut replay = FileReplayAdapter::open("lab.rvcsi")?;
//! while let Some(frame) = replay.next_frame()? {
//! // hand `frame` downstream — its ValidationStatus is preserved as recorded
//! let _ = frame;
//! }
//! # Ok(())
//! # }
//! ```
//!
//! [JSONL]: https://jsonlines.org/
/// Placeholder so the crate compiles before the agent fills it in.
pub fn __rvcsi_adapter_file_placeholder() {}
#![forbid(unsafe_code)]
#![warn(missing_docs)]
mod format;
mod recorder;
mod replay;
pub use format::{CaptureHeader, CAPTURE_VERSION};
pub use recorder::FileRecorder;
pub use replay::FileReplayAdapter;
use std::path::Path;
use rvcsi_core::{CsiFrame, Result};
/// Read an entire `.rvcsi` capture into memory: its [`CaptureHeader`] and every
/// [`CsiFrame`] it contains, in recording order.
///
/// This is a convenience wrapper over [`FileReplayAdapter`]; for large captures
/// or streaming use, prefer iterating [`FileReplayAdapter`] directly. Errors are
/// the same as [`FileReplayAdapter::open`] / [`FileReplayAdapter::next_frame`]:
/// an [`rvcsi_core::RvcsiError::Io`] for a missing/unreadable file, an
/// [`rvcsi_core::RvcsiError::Parse`] (offset `0`) for a bad header, or an
/// [`rvcsi_core::RvcsiError::Parse`] carrying the 1-based line number for a
/// malformed frame line.
pub fn read_all(path: impl AsRef<Path>) -> Result<(CaptureHeader, Vec<CsiFrame>)> {
use rvcsi_core::CsiSource;
let mut adapter = FileReplayAdapter::open(path)?;
let header = adapter.header().clone();
let mut frames = Vec::new();
while let Some(frame) = adapter.next_frame()? {
frames.push(frame);
}
Ok((header, frames))
}
#[cfg(test)]
mod tests {
use super::*;
use rvcsi_core::{
AdapterKind, AdapterProfile, CsiSource, FrameId, RvcsiError, SessionId, SourceId,
ValidationStatus,
};
use std::fs::File;
use std::io::{Read, Write};
fn header() -> CaptureHeader {
CaptureHeader::new(
SessionId(1),
SourceId::from("it-test"),
AdapterProfile::offline(AdapterKind::File),
)
.with_created_unix_ns(0)
.with_calibration_version("room@v1")
.with_runtime_config_json(r#"{"window_ms":500}"#)
}
/// A small varied set of frames: two accepted (quality 0.9), two degraded
/// with reasons, one recovered — varying timestamps / channels / subcarrier
/// counts.
fn sample_frames() -> Vec<CsiFrame> {
let mut frames = Vec::new();
let mut f0 = CsiFrame::from_iq(
FrameId(0),
SessionId(1),
SourceId::from("it-test"),
AdapterKind::File,
1_000,
1,
20,
vec![1.0, 2.0, 3.0, 4.0],
vec![0.5, 0.5, 0.5, 0.5],
)
.with_rssi(-55);
f0.validation = ValidationStatus::Accepted;
f0.quality_score = 0.9;
frames.push(f0);
let mut f1 = CsiFrame::from_iq(
FrameId(1),
SessionId(1),
SourceId::from("it-test"),
AdapterKind::File,
2_000,
6,
40,
vec![0.1; 8],
vec![0.2; 8],
);
f1.validation = ValidationStatus::Degraded;
f1.quality_score = 0.4;
f1.quality_reasons = vec!["missing rssi".to_string(), "low snr".to_string()];
frames.push(f1);
let mut f2 = CsiFrame::from_iq(
FrameId(2),
SessionId(1),
SourceId::from("it-test"),
AdapterKind::File,
3_000,
11,
20,
vec![5.0, 6.0],
vec![1.0, -1.0],
)
.with_rssi(-70)
.with_noise_floor(-95);
f2.validation = ValidationStatus::Accepted;
f2.quality_score = 0.9;
frames.push(f2);
let mut f3 = CsiFrame::from_iq(
FrameId(3),
SessionId(1),
SourceId::from("it-test"),
AdapterKind::File,
2_500, // deliberately out of order — replay preserves it verbatim
6,
20,
vec![0.0; 3],
vec![0.0; 3],
);
f3.validation = ValidationStatus::Recovered;
f3.quality_score = 0.3;
frames.push(f3);
let mut f4 = CsiFrame::from_iq(
FrameId(4),
SessionId(1),
SourceId::from("it-test"),
AdapterKind::File,
4_000,
36,
80,
vec![2.0; 6],
vec![0.0; 6],
);
f4.validation = ValidationStatus::Degraded;
f4.quality_score = 0.5;
f4.quality_reasons = vec!["amplitude spike".to_string()];
frames.push(f4);
frames
}
#[test]
fn record_then_replay_roundtrips_exactly() {
let tmp = tempfile::NamedTempFile::new().unwrap();
let header = header();
let frames = sample_frames();
let mut rec = FileRecorder::create(tmp.path(), &header).unwrap();
for f in &frames {
rec.write_frame(f).unwrap();
}
assert_eq!(rec.frames_written(), frames.len() as u64);
rec.finish().unwrap();
let mut adapter = FileReplayAdapter::open(tmp.path()).unwrap();
assert_eq!(adapter.header(), &header);
let mut got = Vec::new();
while let Some(f) = adapter.next_frame().unwrap() {
got.push(f);
}
assert_eq!(got, frames);
assert_eq!(adapter.health().frames_delivered, frames.len() as u64);
assert!(!adapter.health().connected);
}
#[test]
fn re_serializing_replayed_frames_is_byte_identical() {
let tmp = tempfile::NamedTempFile::new().unwrap();
let header = header();
let frames = sample_frames();
let mut rec = FileRecorder::create(tmp.path(), &header).unwrap();
for f in &frames {
rec.write_frame(f).unwrap();
}
rec.finish().unwrap();
let mut original = String::new();
File::open(tmp.path()).unwrap().read_to_string(&mut original).unwrap();
// Round-trip the whole capture and re-emit it; bytes must match.
let (h, fs) = read_all(tmp.path()).unwrap();
let tmp2 = tempfile::NamedTempFile::new().unwrap();
let mut rec2 = FileRecorder::create(tmp2.path(), &h).unwrap();
for f in &fs {
rec2.write_frame(f).unwrap();
}
rec2.finish().unwrap();
let mut reemitted = String::new();
File::open(tmp2.path()).unwrap().read_to_string(&mut reemitted).unwrap();
assert_eq!(original, reemitted);
}
#[test]
fn read_all_matches_replay() {
let tmp = tempfile::NamedTempFile::new().unwrap();
let header = header();
let frames = sample_frames();
let mut rec = FileRecorder::create(tmp.path(), &header).unwrap();
for f in &frames {
rec.write_frame(f).unwrap();
}
rec.finish().unwrap();
let (h, fs) = read_all(tmp.path()).unwrap();
assert_eq!(h, header);
assert_eq!(fs, frames);
}
#[test]
fn header_only_capture_has_no_frames() {
let tmp = tempfile::NamedTempFile::new().unwrap();
let header = header();
FileRecorder::create(tmp.path(), &header).unwrap().finish().unwrap();
let mut adapter = FileReplayAdapter::open(tmp.path()).unwrap();
assert!(adapter.next_frame().unwrap().is_none());
let (h, fs) = read_all(tmp.path()).unwrap();
assert_eq!(h, header);
assert!(fs.is_empty());
}
#[test]
fn bad_header_line_is_parse_error_at_offset_zero() {
let tmp = tempfile::NamedTempFile::new().unwrap();
{
let mut f = File::create(tmp.path()).unwrap();
f.write_all(b"not json\n").unwrap();
}
match FileReplayAdapter::open(tmp.path()) {
Err(RvcsiError::Parse { offset, .. }) => assert_eq!(offset, 0),
other => panic!("expected Parse at offset 0, got {other:?}"),
}
match read_all(tmp.path()) {
Err(RvcsiError::Parse { offset, .. }) => assert_eq!(offset, 0),
other => panic!("expected Parse at offset 0, got {other:?}"),
}
}
#[test]
fn garbage_frame_after_good_frames_reports_line_number() {
let tmp = tempfile::NamedTempFile::new().unwrap();
let header = header();
{
let mut f = File::create(tmp.path()).unwrap();
serde_json::to_writer(&mut f, &header).unwrap();
f.write_all(b"\n").unwrap();
// lines 2 + 3: good frames
let frames = sample_frames();
serde_json::to_writer(&mut f, &frames[0]).unwrap();
f.write_all(b"\n").unwrap();
serde_json::to_writer(&mut f, &frames[1]).unwrap();
f.write_all(b"\n").unwrap();
// line 4: garbage
f.write_all(b"{ not a frame }\n").unwrap();
}
let mut adapter = FileReplayAdapter::open(tmp.path()).unwrap();
assert!(adapter.next_frame().unwrap().is_some()); // line 2
assert!(adapter.next_frame().unwrap().is_some()); // line 3
match adapter.next_frame() {
Err(RvcsiError::Parse { offset, .. }) => assert_eq!(offset, 4),
other => panic!("expected Parse at line 4, got {other:?}"),
}
}
#[test]
fn nonexistent_path_is_io_error() {
match FileReplayAdapter::open("/no/such/file/at/all.rvcsi") {
Err(RvcsiError::Io(_)) => {}
other => panic!("expected Io error, got {other:?}"),
}
match read_all("/no/such/file/at/all.rvcsi") {
Err(RvcsiError::Io(_)) => {}
other => panic!("expected Io error, got {other:?}"),
}
}
#[test]
fn counters_are_consistent() {
let tmp = tempfile::NamedTempFile::new().unwrap();
let header = header();
let frames = sample_frames();
let mut rec = FileRecorder::create(tmp.path(), &header).unwrap();
for (i, f) in frames.iter().enumerate() {
rec.write_frame(f).unwrap();
assert_eq!(rec.frames_written(), (i + 1) as u64);
}
rec.finish().unwrap();
let mut adapter = FileReplayAdapter::open(tmp.path()).unwrap();
let mut n = 0u64;
while adapter.next_frame().unwrap().is_some() {
n += 1;
assert_eq!(adapter.health().frames_delivered, n);
}
assert_eq!(n, frames.len() as u64);
}
}

View File

@ -0,0 +1,113 @@
//! [`FileRecorder`] — writes a `.rvcsi` capture: a header line followed by one
//! JSON line per [`CsiFrame`].
use std::fs::File;
use std::io::{BufWriter, Write};
use std::path::Path;
use rvcsi_core::{CsiFrame, Result};
use crate::format::CaptureHeader;
/// Append-only writer for a `.rvcsi` capture file.
///
/// Create one with [`FileRecorder::create`] (which writes the header line),
/// push frames with [`FileRecorder::write_frame`], and call
/// [`FileRecorder::finish`] (or just drop it after [`FileRecorder::flush`]) to
/// be sure everything reached disk.
pub struct FileRecorder {
writer: BufWriter<File>,
frames_written: u64,
}
impl FileRecorder {
/// Create `path` (truncating any existing file) and write `header` as the
/// first line.
pub fn create(path: impl AsRef<Path>, header: &CaptureHeader) -> Result<Self> {
let file = File::create(path.as_ref())?;
let mut writer = BufWriter::new(file);
write_json_line(&mut writer, header)?;
Ok(FileRecorder {
writer,
frames_written: 0,
})
}
/// Append one frame as a JSON line.
pub fn write_frame(&mut self, frame: &CsiFrame) -> Result<()> {
write_json_line(&mut self.writer, frame)?;
self.frames_written += 1;
Ok(())
}
/// Flush buffered bytes to the underlying file.
pub fn flush(&mut self) -> Result<()> {
self.writer.flush()?;
Ok(())
}
/// Number of frames written so far (the header line is not counted).
pub fn frames_written(&self) -> u64 {
self.frames_written
}
/// Flush and close the file, consuming the recorder.
pub fn finish(mut self) -> Result<()> {
self.flush()
}
}
/// Serialize `value` as a single JSON line (no embedded newlines — `serde_json`
/// compact form never produces them) followed by `\n`.
fn write_json_line<W: Write, T: serde::Serialize>(writer: &mut W, value: &T) -> Result<()> {
serde_json::to_writer(&mut *writer, value)?;
writer.write_all(b"\n")?;
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
use rvcsi_core::{AdapterKind, AdapterProfile, FrameId, SessionId, SourceId};
use std::io::Read;
fn frame(id: u64, ts: u64) -> CsiFrame {
CsiFrame::from_iq(
FrameId(id),
SessionId(1),
SourceId::from("rec-test"),
AdapterKind::File,
ts,
6,
20,
vec![1.0, 2.0, 3.0],
vec![0.5, 0.5, 0.5],
)
}
#[test]
fn writes_header_then_frames_and_counts() {
let tmp = tempfile::NamedTempFile::new().unwrap();
let header = CaptureHeader::new(
SessionId(1),
SourceId::from("rec-test"),
AdapterProfile::offline(AdapterKind::File),
)
.with_created_unix_ns(0);
let mut rec = FileRecorder::create(tmp.path(), &header).unwrap();
assert_eq!(rec.frames_written(), 0);
rec.write_frame(&frame(0, 100)).unwrap();
rec.write_frame(&frame(1, 200)).unwrap();
assert_eq!(rec.frames_written(), 2);
rec.finish().unwrap();
let mut contents = String::new();
File::open(tmp.path()).unwrap().read_to_string(&mut contents).unwrap();
let lines: Vec<&str> = contents.lines().collect();
assert_eq!(lines.len(), 3);
let parsed_header: CaptureHeader = serde_json::from_str(lines[0]).unwrap();
assert_eq!(parsed_header, header);
let f0: CsiFrame = serde_json::from_str(lines[1]).unwrap();
assert_eq!(f0, frame(0, 100));
}
}

View File

@ -0,0 +1,304 @@
//! [`FileReplayAdapter`] — a [`CsiSource`] that replays a `.rvcsi` capture
//! file, frame by frame, exactly as it was recorded.
use std::fs::File;
use std::io::{BufRead, BufReader};
use std::path::Path;
use rvcsi_core::{
AdapterProfile, CsiFrame, CsiSource, Result, RvcsiError, SessionId, SourceHealth, SourceId,
};
use crate::format::{CaptureHeader, CAPTURE_VERSION};
/// Deterministic replay source backed by a `.rvcsi` capture file.
///
/// The header is parsed eagerly on [`FileReplayAdapter::open`]; frames are
/// parsed lazily, one line at a time, on each [`CsiSource::next_frame`] call.
/// Timestamps, ordering and per-frame [`rvcsi_core::ValidationStatus`] are
/// preserved verbatim — replay does not re-validate or re-order anything, it
/// only deserializes what was stored.
///
/// `replay_speed` is carried for the daemon/CLI to pace playback with; the
/// adapter itself never sleeps.
#[derive(Debug)]
pub struct FileReplayAdapter {
header: CaptureHeader,
profile: AdapterProfile,
source_id: SourceId,
reader: BufReader<File>,
/// 1-based line number of the line a subsequent `next_frame` will read.
next_line: usize,
frames_delivered: u64,
at_eof: bool,
replay_speed: f32,
last_status: Option<String>,
}
impl FileReplayAdapter {
/// Open `path` for replay at real-time speed (`replay_speed == 1.0`).
pub fn open(path: impl AsRef<Path>) -> Result<Self> {
Self::open_with_speed(path, 1.0)
}
/// Open `path` for replay, carrying `replay_speed` for downstream pacing.
pub fn open_with_speed(path: impl AsRef<Path>, replay_speed: f32) -> Result<Self> {
let file = File::open(path.as_ref())?;
let mut reader = BufReader::new(file);
let mut first = String::new();
let n = reader.read_line(&mut first)?;
if n == 0 {
return Err(RvcsiError::parse(0, "empty capture file: missing header line"));
}
let header: CaptureHeader = serde_json::from_str(first.trim_end_matches(['\n', '\r']))
.map_err(|e| RvcsiError::parse(0, format!("invalid .rvcsi header line: {e}")))?;
if header.rvcsi_capture_version != CAPTURE_VERSION {
return Err(RvcsiError::parse(
0,
format!(
"unsupported .rvcsi capture version {} (this build supports {})",
header.rvcsi_capture_version, CAPTURE_VERSION
),
));
}
let profile = header.adapter_profile.clone();
let source_id = header.source_id.clone();
Ok(FileReplayAdapter {
header,
profile,
source_id,
reader,
next_line: 2,
frames_delivered: 0,
at_eof: false,
replay_speed,
last_status: None,
})
}
/// The capture header parsed from the file.
pub fn header(&self) -> &CaptureHeader {
&self.header
}
/// Playback speed multiplier carried for the daemon/CLI (the adapter itself
/// does not sleep).
pub fn replay_speed(&self) -> f32 {
self.replay_speed
}
/// Whether the underlying file has been fully consumed.
pub fn is_at_eof(&self) -> bool {
self.at_eof
}
}
impl CsiSource for FileReplayAdapter {
fn profile(&self) -> &AdapterProfile {
&self.profile
}
fn session_id(&self) -> SessionId {
self.header.session_id
}
fn source_id(&self) -> &SourceId {
&self.source_id
}
fn next_frame(&mut self) -> core::result::Result<Option<CsiFrame>, RvcsiError> {
if self.at_eof {
return Ok(None);
}
loop {
let mut line = String::new();
let read = self.reader.read_line(&mut line)?;
if read == 0 {
self.at_eof = true;
return Ok(None);
}
let line_no = self.next_line;
self.next_line += 1;
let trimmed = line.trim_end_matches(['\n', '\r']);
if trimmed.is_empty() {
// Tolerate blank lines (e.g. a trailing newline at EOF).
continue;
}
let frame: CsiFrame = serde_json::from_str(trimmed).map_err(|e| {
self.last_status = Some(format!("parse error at line {line_no}"));
RvcsiError::parse(line_no, format!("invalid frame line {line_no}: {e}"))
})?;
self.frames_delivered += 1;
return Ok(Some(frame));
}
}
fn health(&self) -> SourceHealth {
SourceHealth {
connected: !self.at_eof,
frames_delivered: self.frames_delivered,
frames_rejected: 0,
status: self.last_status.clone(),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::recorder::FileRecorder;
use rvcsi_core::{AdapterKind, FrameId, ValidationStatus};
use std::io::Write;
fn frame(id: u64, ts: u64) -> CsiFrame {
CsiFrame::from_iq(
FrameId(id),
SessionId(1),
SourceId::from("rep-test"),
AdapterKind::File,
ts,
6,
20,
vec![1.0, 2.0],
vec![0.0, 1.0],
)
}
fn write_capture(path: &Path, frames: &[CsiFrame]) -> CaptureHeader {
let header = CaptureHeader::new(
SessionId(1),
SourceId::from("rep-test"),
AdapterProfile::offline(AdapterKind::File),
)
.with_created_unix_ns(0);
let mut rec = FileRecorder::create(path, &header).unwrap();
for f in frames {
rec.write_frame(f).unwrap();
}
rec.finish().unwrap();
header
}
#[test]
fn open_speed_default_is_one() {
let tmp = tempfile::NamedTempFile::new().unwrap();
write_capture(tmp.path(), &[]);
let a = FileReplayAdapter::open(tmp.path()).unwrap();
assert_eq!(a.replay_speed(), 1.0);
let b = FileReplayAdapter::open_with_speed(tmp.path(), 4.0).unwrap();
assert_eq!(b.replay_speed(), 4.0);
}
#[test]
fn replays_frames_in_order() {
let tmp = tempfile::NamedTempFile::new().unwrap();
let frames = vec![frame(0, 10), frame(1, 20), frame(2, 30)];
let header = write_capture(tmp.path(), &frames);
let mut a = FileReplayAdapter::open(tmp.path()).unwrap();
assert_eq!(a.header(), &header);
assert_eq!(a.session_id(), SessionId(1));
assert_eq!(a.source_id(), &SourceId::from("rep-test"));
let mut got = Vec::new();
while let Some(f) = a.next_frame().unwrap() {
got.push(f);
}
assert_eq!(got, frames);
assert!(a.is_at_eof());
assert!(!a.health().connected);
assert_eq!(a.health().frames_delivered, 3);
// Repeated calls after EOF stay at None.
assert!(a.next_frame().unwrap().is_none());
}
#[test]
fn header_only_file_yields_no_frames() {
let tmp = tempfile::NamedTempFile::new().unwrap();
write_capture(tmp.path(), &[]);
let mut a = FileReplayAdapter::open(tmp.path()).unwrap();
assert!(a.next_frame().unwrap().is_none());
assert_eq!(a.health().frames_delivered, 0);
}
#[test]
fn validation_status_preserved() {
let tmp = tempfile::NamedTempFile::new().unwrap();
let mut f = frame(0, 1);
f.validation = ValidationStatus::Degraded;
f.quality_score = 0.42;
f.quality_reasons = vec!["missing rssi".to_string()];
write_capture(tmp.path(), &[f.clone()]);
let mut a = FileReplayAdapter::open(tmp.path()).unwrap();
let back = a.next_frame().unwrap().unwrap();
assert_eq!(back, f);
assert_eq!(back.validation, ValidationStatus::Degraded);
assert_eq!(back.quality_reasons, vec!["missing rssi".to_string()]);
}
#[test]
fn bad_header_is_parse_error_at_offset_zero() {
let tmp = tempfile::NamedTempFile::new().unwrap();
{
let mut f = File::create(tmp.path()).unwrap();
f.write_all(b"not json\n").unwrap();
}
let err = FileReplayAdapter::open(tmp.path()).unwrap_err();
match err {
RvcsiError::Parse { offset, .. } => assert_eq!(offset, 0),
other => panic!("expected Parse, got {other:?}"),
}
}
#[test]
fn garbage_frame_line_is_parse_error_with_line_number() {
let tmp = tempfile::NamedTempFile::new().unwrap();
let header = CaptureHeader::new(
SessionId(1),
SourceId::from("rep-test"),
AdapterProfile::offline(AdapterKind::File),
)
.with_created_unix_ns(0);
{
let mut f = File::create(tmp.path()).unwrap();
serde_json::to_writer(&mut f, &header).unwrap();
f.write_all(b"\n").unwrap();
// line 2: a good frame
serde_json::to_writer(&mut f, &frame(0, 1)).unwrap();
f.write_all(b"\n").unwrap();
// line 3: garbage
f.write_all(b"{not a frame}\n").unwrap();
}
let mut a = FileReplayAdapter::open(tmp.path()).unwrap();
assert!(a.next_frame().unwrap().is_some()); // line 2 ok
let err = a.next_frame().unwrap_err(); // line 3
match err {
RvcsiError::Parse { offset, .. } => assert_eq!(offset, 3),
other => panic!("expected Parse at line 3, got {other:?}"),
}
}
#[test]
fn nonexistent_path_is_io_error() {
let err = FileReplayAdapter::open("/no/such/rvcsi/file.rvcsi").unwrap_err();
assert!(matches!(err, RvcsiError::Io(_)), "expected Io, got {err:?}");
}
#[test]
fn wrong_version_rejected() {
let tmp = tempfile::NamedTempFile::new().unwrap();
let mut header = CaptureHeader::new(
SessionId(1),
SourceId::from("x"),
AdapterProfile::offline(AdapterKind::File),
);
header.rvcsi_capture_version = 999;
{
let mut f = File::create(tmp.path()).unwrap();
serde_json::to_writer(&mut f, &header).unwrap();
f.write_all(b"\n").unwrap();
}
let err = FileReplayAdapter::open(tmp.path()).unwrap_err();
assert!(matches!(err, RvcsiError::Parse { offset: 0, .. }));
}
}

View File

@ -0,0 +1,272 @@
//! Deterministic, dependency-free embedding functions for RF memory records.
//!
//! [`window_embedding`] turns a [`CsiWindow`] into a fixed-length
//! [`WINDOW_EMBEDDING_DIM`]-vector regardless of subcarrier count;
//! [`event_embedding`] turns a [`CsiEvent`] into a fixed-length
//! [`EVENT_EMBEDDING_DIM`]-vector. [`cosine_similarity`] is the comparison
//! metric used by the [`crate::RfMemoryStore`] implementations.
//!
//! All functions are pure and deterministic — the same input always yields the
//! same bytes, with no clocks, randomness, threads or floating-point
//! reductions whose order could vary.
use rvcsi_core::{CsiEvent, CsiEventKind, CsiWindow};
/// Length of a [`window_embedding`] vector.
///
/// Layout (all indices into the returned `Vec<f32>`):
/// * `0..32` — `mean_amplitude` linearly resampled to 32 bins
/// * `32..64` — `phase_variance` linearly resampled to 32 bins
/// * `64` — `motion_energy`
/// * `65` — `presence_score`
/// * `66` — `quality_score`
/// * `67` — `ln(1 + frame_count)`
///
/// The whole vector is then L2-normalized (left all-zero if its norm is 0,
/// e.g. for an empty window).
pub const WINDOW_EMBEDDING_DIM: usize = 68;
/// Length of an [`event_embedding`] vector.
///
/// Layout:
/// * `0..10` — one-hot of [`CsiEventKind`] in declaration order (see
/// [`kind_index`])
/// * `10` — `confidence`
/// * `11` — `ln(1 + evidence_window_ids.len())`
///
/// Event embeddings are **not** normalized (the one-hot block already gives
/// them a stable scale).
pub const EVENT_EMBEDDING_DIM: usize = 12;
/// Number of bins each per-subcarrier vector is resampled to.
const SUBCARRIER_BINS: usize = 32;
/// Linearly resample `src` (length `n`) to length `m`.
///
/// * `n == 0` → `vec![0.0; m]`
/// * `n == 1` → `vec![src[0]; m]`
/// * otherwise, for each output index `j`: `pos = j * (n-1) / (m-1)`,
/// `lo = floor(pos)`, `frac = pos - lo`, value `src[lo] * (1 - frac) +
/// src[min(lo+1, n-1)] * frac`.
fn resample_linear(src: &[f32], m: usize) -> Vec<f32> {
let n = src.len();
if n == 0 {
return vec![0.0; m];
}
if n == 1 {
return vec![src[0]; m];
}
if m == 0 {
return Vec::new();
}
if m == 1 {
// Degenerate target: just take the first sample (avoids /0 below).
return vec![src[0]];
}
let mut out = Vec::with_capacity(m);
let denom = (m - 1) as f32;
let span = (n - 1) as f32;
for j in 0..m {
let pos = j as f32 * span / denom;
let lo = pos.floor() as usize;
let frac = pos - lo as f32;
let hi = (lo + 1).min(n - 1);
out.push(src[lo] * (1.0 - frac) + src[hi] * frac);
}
out
}
/// L2 norm of a slice (`0.0` for an empty slice).
fn l2_norm(v: &[f32]) -> f32 {
v.iter().map(|x| x * x).sum::<f32>().sqrt()
}
/// In-place L2 normalization; leaves `v` unchanged if its norm is `0` or
/// non-finite.
fn l2_normalize(v: &mut [f32]) {
let norm = l2_norm(v);
if norm.is_finite() && norm > 0.0 {
for x in v.iter_mut() {
*x /= norm;
}
}
}
/// Build the deterministic embedding for a [`CsiWindow`].
///
/// The returned vector has length [`WINDOW_EMBEDDING_DIM`]; see that constant's
/// docs for the exact bin layout. The result is L2-normalized (or all-zero for
/// an empty window — i.e. `subcarrier_count == 0` and `frame_count == 0`).
pub fn window_embedding(w: &CsiWindow) -> Vec<f32> {
let mut out = Vec::with_capacity(WINDOW_EMBEDDING_DIM);
out.extend(resample_linear(&w.mean_amplitude, SUBCARRIER_BINS));
out.extend(resample_linear(&w.phase_variance, SUBCARRIER_BINS));
out.push(w.motion_energy);
out.push(w.presence_score);
out.push(w.quality_score);
out.push((w.frame_count as f32).ln_1p());
debug_assert_eq!(out.len(), WINDOW_EMBEDDING_DIM);
l2_normalize(&mut out);
out
}
/// Fixed index of a [`CsiEventKind`] in the one-hot block of an event
/// embedding — the variant declaration order in `rvcsi_core`.
fn kind_index(k: CsiEventKind) -> usize {
match k {
CsiEventKind::PresenceStarted => 0,
CsiEventKind::PresenceEnded => 1,
CsiEventKind::MotionDetected => 2,
CsiEventKind::MotionSettled => 3,
CsiEventKind::BaselineChanged => 4,
CsiEventKind::SignalQualityDropped => 5,
CsiEventKind::DeviceDisconnected => 6,
CsiEventKind::BreathingCandidate => 7,
CsiEventKind::AnomalyDetected => 8,
CsiEventKind::CalibrationRequired => 9,
}
}
/// Build the deterministic embedding for a [`CsiEvent`].
///
/// The returned vector has length [`EVENT_EMBEDDING_DIM`]; see that constant's
/// docs for the exact layout. Not normalized.
pub fn event_embedding(e: &CsiEvent) -> Vec<f32> {
let mut out = vec![0.0_f32; EVENT_EMBEDDING_DIM];
out[kind_index(e.kind)] = 1.0;
out[10] = e.confidence;
out[11] = (e.evidence_window_ids.len() as f32).ln_1p();
out
}
/// Cosine similarity of two equal-length vectors.
///
/// Returns `0.0` if the lengths differ or either vector is all-zero (or has a
/// non-finite norm); otherwise `dot(a, b) / (||a|| * ||b||)` clamped to
/// `[-1.0, 1.0]`.
pub fn cosine_similarity(a: &[f32], b: &[f32]) -> f32 {
if a.len() != b.len() || a.is_empty() {
return 0.0;
}
let na = l2_norm(a);
let nb = l2_norm(b);
if !(na.is_finite() && nb.is_finite()) || na == 0.0 || nb == 0.0 {
return 0.0;
}
let dot: f32 = a.iter().zip(b.iter()).map(|(x, y)| x * y).sum();
(dot / (na * nb)).clamp(-1.0, 1.0)
}
#[cfg(test)]
mod tests {
use super::*;
use rvcsi_core::{EventId, SessionId, SourceId, WindowId};
fn window() -> CsiWindow {
CsiWindow {
window_id: WindowId(7),
session_id: SessionId(1),
source_id: SourceId::from("emb-test"),
start_ns: 1_000,
end_ns: 2_000,
frame_count: 12,
mean_amplitude: vec![1.0, 2.0, 3.0, 4.0, 5.0],
phase_variance: vec![0.1, 0.2, 0.1, 0.3, 0.2],
motion_energy: 0.42,
presence_score: 0.8,
quality_score: 0.9,
}
}
fn event(kind: CsiEventKind) -> CsiEvent {
CsiEvent::new(
EventId(3),
kind,
SessionId(1),
SourceId::from("emb-test"),
5_000,
0.75,
vec![WindowId(1), WindowId(2)],
)
}
#[test]
fn resample_edge_cases() {
assert_eq!(resample_linear(&[], 4), vec![0.0; 4]);
assert_eq!(resample_linear(&[2.5], 3), vec![2.5, 2.5, 2.5]);
// identity-ish: 3 -> 3 keeps endpoints
let r = resample_linear(&[0.0, 1.0, 2.0], 3);
assert!((r[0] - 0.0).abs() < 1e-6);
assert!((r[1] - 1.0).abs() < 1e-6);
assert!((r[2] - 2.0).abs() < 1e-6);
// upsample 2 -> 5 is a straight line
let r = resample_linear(&[0.0, 4.0], 5);
assert!((r[2] - 2.0).abs() < 1e-6);
}
#[test]
fn window_embedding_is_deterministic_and_unit_length() {
let w = window();
let a = window_embedding(&w);
let b = window_embedding(&w);
assert_eq!(a, b);
assert_eq!(a.len(), WINDOW_EMBEDDING_DIM);
let norm = l2_norm(&a);
assert!((norm - 1.0).abs() < 1e-5, "norm was {norm}");
}
#[test]
fn empty_window_embeds_to_zero() {
let mut w = window();
w.mean_amplitude.clear();
w.phase_variance.clear();
w.motion_energy = 0.0;
w.presence_score = 0.0;
w.quality_score = 0.0;
w.frame_count = 0;
let e = window_embedding(&w);
assert_eq!(e.len(), WINDOW_EMBEDDING_DIM);
assert!(e.iter().all(|x| *x == 0.0));
}
#[test]
fn window_embedding_length_independent_of_subcarrier_count() {
let mut a = window();
a.mean_amplitude = vec![1.0; 56];
a.phase_variance = vec![0.1; 56];
let mut b = window();
b.mean_amplitude = vec![1.0; 234];
b.phase_variance = vec![0.1; 234];
assert_eq!(window_embedding(&a).len(), window_embedding(&b).len());
}
#[test]
fn event_embedding_layout() {
let e = event(CsiEventKind::MotionDetected);
let v = event_embedding(&e);
assert_eq!(v.len(), EVENT_EMBEDDING_DIM);
assert_eq!(v[kind_index(CsiEventKind::MotionDetected)], 1.0);
// exactly one hot in the first 10
assert_eq!(v[..10].iter().filter(|x| **x == 1.0).count(), 1);
assert!((v[10] - 0.75).abs() < 1e-6);
assert!((v[11] - (2.0_f32).ln_1p()).abs() < 1e-6);
// a different kind lights a different bin
let v2 = event_embedding(&event(CsiEventKind::AnomalyDetected));
assert_eq!(v2[kind_index(CsiEventKind::AnomalyDetected)], 1.0);
assert_ne!(v, v2);
}
#[test]
fn cosine_basic_identities() {
let v = window_embedding(&window());
assert!((cosine_similarity(&v, &v) - 1.0).abs() < 1e-5);
let neg: Vec<f32> = v.iter().map(|x| -x).collect();
assert!((cosine_similarity(&v, &neg) + 1.0).abs() < 1e-5);
// mismatched lengths -> 0
assert_eq!(cosine_similarity(&v, &v[..3]), 0.0);
// all-zero -> 0
assert_eq!(cosine_similarity(&[0.0; 4], &[1.0; 4]), 0.0);
assert_eq!(cosine_similarity(&[], &[]), 0.0);
}
}

View File

@ -0,0 +1,396 @@
//! [`JsonlRfMemory`] — a file-backed [`RfMemoryStore`].
//!
//! The store is a [JSONL] file: each line is one JSON object that is *either* a
//! stored record:
//!
//! ```json
//! {"record":{"id":3,"kind":"Window","source_id":"esp32","timestamp_ns":1700,"embedding":[0.1,0.2]}}
//! ```
//!
//! or a baseline write:
//!
//! ```json
//! {"baseline":{"room":"livingroom","version":"v3","embedding":[0.1,0.2]}}
//! ```
//!
//! Opening replays every line into an in-memory index identical to
//! [`crate::InMemoryRfMemory`], so queries are all in-memory; `store_*` /
//! `set_baseline` append a line (and `flush`) so a crash loses at most the
//! line currently being written. The **last** baseline line for a room wins.
//!
//! [JSONL]: https://jsonlines.org/
use std::fs::{File, OpenOptions};
use std::io::{BufRead, BufReader, BufWriter, Write};
use std::path::{Path, PathBuf};
use serde::{Deserialize, Serialize};
use rvcsi_core::{CsiEvent, CsiWindow, RvcsiError, SourceId};
use crate::embedding::{event_embedding, window_embedding};
use crate::memory::{IndexRecord, RfIndex};
use crate::store::{DriftReport, EmbeddingId, RecordKind, RfMemoryStore, SimilarHit};
/// On-disk shape of a stored record line.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct RecordLine {
id: u64,
kind: RecordKind,
source_id: SourceId,
timestamp_ns: u64,
embedding: Vec<f32>,
}
/// On-disk shape of a baseline line.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct BaselineLine {
room: String,
version: String,
embedding: Vec<f32>,
}
/// One line in the JSONL store — exactly one field is present.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct StoreLine {
#[serde(skip_serializing_if = "Option::is_none", default)]
record: Option<RecordLine>,
#[serde(skip_serializing_if = "Option::is_none", default)]
baseline: Option<BaselineLine>,
}
impl StoreLine {
fn record(r: RecordLine) -> Self {
StoreLine {
record: Some(r),
baseline: None,
}
}
fn baseline(b: BaselineLine) -> Self {
StoreLine {
record: None,
baseline: Some(b),
}
}
}
/// A file-backed [`RfMemoryStore`]. See the module docs for the on-disk format.
#[derive(Debug)]
pub struct JsonlRfMemory {
path: PathBuf,
writer: BufWriter<File>,
index: RfIndex,
}
impl JsonlRfMemory {
/// Create a new, empty store at `path`, truncating any existing file.
pub fn create(path: impl AsRef<Path>) -> Result<Self, RvcsiError> {
let path = path.as_ref().to_path_buf();
let file = File::create(&path)?;
Ok(JsonlRfMemory {
path,
writer: BufWriter::new(file),
index: RfIndex::new(),
})
}
/// Open an existing store at `path`, replaying every line into the
/// in-memory index, then positioning for appends. The file must exist (use
/// [`JsonlRfMemory::create`] otherwise).
pub fn open(path: impl AsRef<Path>) -> Result<Self, RvcsiError> {
let path = path.as_ref().to_path_buf();
let mut index = RfIndex::new();
{
let file = File::open(&path)?;
let reader = BufReader::new(file);
for (i, line) in reader.lines().enumerate() {
let line = line?;
let trimmed = line.trim();
if trimmed.is_empty() {
continue;
}
let parsed: StoreLine = serde_json::from_str(trimmed).map_err(|e| {
RvcsiError::parse(i + 1, format!("invalid RF-memory line {}: {e}", i + 1))
})?;
match (parsed.record, parsed.baseline) {
(Some(r), None) => index.insert(IndexRecord {
id: EmbeddingId(r.id),
kind: r.kind,
source_id: r.source_id,
timestamp_ns: r.timestamp_ns,
embedding: r.embedding,
}),
(None, Some(b)) => index.set_baseline(&b.room, &b.version, b.embedding),
_ => {
return Err(RvcsiError::parse(
i + 1,
format!("RF-memory line {} must have exactly one of 'record'/'baseline'", i + 1),
))
}
}
}
}
let file = OpenOptions::new().append(true).open(&path)?;
Ok(JsonlRfMemory {
path,
writer: BufWriter::new(file),
index,
})
}
/// Path the store is backed by.
pub fn path(&self) -> &Path {
&self.path
}
/// Flush buffered writes to disk.
pub fn flush(&mut self) -> Result<(), RvcsiError> {
self.writer.flush()?;
Ok(())
}
fn append_line(&mut self, line: &StoreLine) -> Result<(), RvcsiError> {
serde_json::to_writer(&mut self.writer, line)?;
self.writer.write_all(b"\n")?;
self.writer.flush()?;
Ok(())
}
fn append_record(
&mut self,
kind: RecordKind,
source_id: SourceId,
timestamp_ns: u64,
embedding: Vec<f32>,
) -> Result<EmbeddingId, RvcsiError> {
let id = self.index.mint_id();
self.append_line(&StoreLine::record(RecordLine {
id: id.0,
kind,
source_id: source_id.clone(),
timestamp_ns,
embedding: embedding.clone(),
}))?;
self.index.insert(IndexRecord {
id,
kind,
source_id,
timestamp_ns,
embedding,
});
Ok(id)
}
}
impl RfMemoryStore for JsonlRfMemory {
fn store_window(&mut self, w: &CsiWindow) -> Result<EmbeddingId, RvcsiError> {
self.append_record(
RecordKind::Window,
w.source_id.clone(),
w.start_ns,
window_embedding(w),
)
}
fn store_event(&mut self, e: &CsiEvent) -> Result<EmbeddingId, RvcsiError> {
self.append_record(
RecordKind::Event,
e.source_id.clone(),
e.timestamp_ns,
event_embedding(e),
)
}
fn query_similar(&self, query: &[f32], k: usize) -> Result<Vec<SimilarHit>, RvcsiError> {
Ok(self.index.query_similar(query, k))
}
fn set_baseline(
&mut self,
room: &str,
version: &str,
embedding: Vec<f32>,
) -> Result<(), RvcsiError> {
self.append_line(&StoreLine::baseline(BaselineLine {
room: room.to_string(),
version: version.to_string(),
embedding: embedding.clone(),
}))?;
self.index.set_baseline(room, version, embedding);
Ok(())
}
fn compute_drift(
&self,
room: &str,
current: &[f32],
threshold: f32,
) -> Result<Option<DriftReport>, RvcsiError> {
Ok(self.index.compute_drift(room, current, threshold))
}
fn len(&self) -> usize {
self.index.len()
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::embedding::window_embedding;
use rvcsi_core::{CsiEventKind, EventId, SessionId, WindowId};
fn window(id: u64, amp: f32) -> CsiWindow {
CsiWindow {
window_id: WindowId(id),
session_id: SessionId(1),
source_id: SourceId::from(format!("src-{id}").as_str()),
start_ns: 1_000 + id,
end_ns: 2_000 + id,
frame_count: 10,
mean_amplitude: vec![amp, amp + 1.0, amp + 2.0],
phase_variance: vec![0.1, 0.2, 0.1],
motion_energy: amp / 5.0,
presence_score: 0.6,
quality_score: 0.9,
}
}
fn event() -> CsiEvent {
CsiEvent::new(
EventId(0),
CsiEventKind::MotionDetected,
SessionId(1),
SourceId::from("ev"),
9_000,
0.7,
vec![WindowId(1), WindowId(2)],
)
}
#[test]
fn persist_and_reopen() {
let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("rf.jsonl");
let w1 = window(0, 1.0);
let w2 = window(1, 50.0);
let e = event();
let base_emb = window_embedding(&window(7, 5.0));
{
let mut mem = JsonlRfMemory::create(&path).unwrap();
mem.store_window(&w1).unwrap();
mem.store_window(&w2).unwrap();
mem.store_event(&e).unwrap();
mem.set_baseline("room1", "v1", base_emb.clone()).unwrap();
mem.flush().unwrap();
}
let reopened = JsonlRfMemory::open(&path).unwrap();
assert_eq!(reopened.len(), 3);
let hits = reopened.query_similar(&window_embedding(&w1), 3).unwrap();
assert!((hits[0].score - 1.0).abs() < 1e-5);
let ev_hits = reopened.query_similar(&crate::embedding::event_embedding(&e), 1).unwrap();
assert_eq!(ev_hits[0].kind, RecordKind::Event);
// baseline persisted
let drift = reopened.compute_drift("room1", &base_emb, 0.1).unwrap().unwrap();
assert_eq!(drift.baseline_version, "v1");
assert!(!drift.exceeded);
assert!(drift.distance < 1e-5);
assert!(reopened.compute_drift("other", &base_emb, 0.1).unwrap().is_none());
}
#[test]
fn newer_baseline_wins_after_reopen() {
let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("rf.jsonl");
let v1_emb = window_embedding(&window(1, 1.0));
let v2_emb = window_embedding(&window(2, 2.0));
{
let mut mem = JsonlRfMemory::create(&path).unwrap();
mem.set_baseline("r", "v1", v1_emb.clone()).unwrap();
mem.flush().unwrap();
}
{
let mut mem = JsonlRfMemory::open(&path).unwrap();
mem.set_baseline("r", "v2", v2_emb.clone()).unwrap();
mem.flush().unwrap();
}
let reopened = JsonlRfMemory::open(&path).unwrap();
let drift = reopened.compute_drift("r", &v2_emb, 0.5).unwrap().unwrap();
assert_eq!(drift.baseline_version, "v2");
assert!(drift.distance < 1e-5);
assert!(!drift.exceeded);
}
#[test]
fn ids_stay_unique_across_reopen() {
let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("rf.jsonl");
let (id0, id1);
{
let mut mem = JsonlRfMemory::create(&path).unwrap();
id0 = mem.store_window(&window(0, 1.0)).unwrap();
id1 = mem.store_window(&window(1, 2.0)).unwrap();
mem.flush().unwrap();
}
assert_eq!(id0, EmbeddingId(0));
assert_eq!(id1, EmbeddingId(1));
let id2 = {
let mut mem = JsonlRfMemory::open(&path).unwrap();
mem.store_window(&window(2, 3.0)).unwrap()
};
assert_eq!(id2, EmbeddingId(2));
assert_eq!(JsonlRfMemory::open(&path).unwrap().len(), 3);
}
#[test]
fn open_missing_file_is_io_error() {
match JsonlRfMemory::open("/no/such/rf/store.jsonl") {
Err(RvcsiError::Io(_)) => {}
other => panic!("expected Io error, got {other:?}"),
}
}
#[test]
fn garbage_line_is_parse_error_with_line_number() {
let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("rf.jsonl");
{
let mut mem = JsonlRfMemory::create(&path).unwrap();
mem.store_window(&window(0, 1.0)).unwrap();
mem.flush().unwrap();
}
// append a garbage line manually
{
use std::io::Write as _;
let mut f = OpenOptions::new().append(true).open(&path).unwrap();
f.write_all(b"{not valid}\n").unwrap();
}
match JsonlRfMemory::open(&path) {
Err(RvcsiError::Parse { offset, .. }) => assert_eq!(offset, 2),
other => panic!("expected Parse at line 2, got {other:?}"),
}
}
#[test]
fn determinism_across_rebuilds() {
let dir = tempfile::tempdir().unwrap();
let build = |name: &str| {
let path = dir.path().join(name);
let mut mem = JsonlRfMemory::create(&path).unwrap();
for i in 0..4 {
mem.store_window(&window(i, (i as f32 + 1.0) * 2.0)).unwrap();
}
mem.set_baseline("r", "v1", window_embedding(&window(0, 1.0))).unwrap();
mem.flush().unwrap();
JsonlRfMemory::open(&path).unwrap()
};
let a = build("a.jsonl");
let b = build("b.jsonl");
assert_eq!(a.len(), b.len());
let q = window_embedding(&window(1, 4.0));
assert_eq!(a.query_similar(&q, 4).unwrap(), b.query_similar(&q, 4).unwrap());
}
}

View File

@ -1,8 +1,58 @@
//! # rvCSI RuVector bridge (skeleton — implemented by the rvcsi-ruvector swarm agent)
//! # rvCSI RuVector bridge
//!
//! Exports temporal RF embeddings + event metadata as a queryable RF-memory
//! store (ADR-095 FR8, D8).
#![forbid(unsafe_code)]
//!
//! This crate is a **standin** for the production RuVector vector-database
//! binding (which gets wired in later). It provides:
//!
//! * deterministic, dependency-free embedding functions —
//! [`window_embedding`] / [`event_embedding`] / [`cosine_similarity`];
//! * the [`RfMemoryStore`] trait plus value objects ([`EmbeddingId`],
//! [`RecordKind`], [`SimilarHit`], [`DriftReport`]);
//! * two implementations: the in-process [`InMemoryRfMemory`] and the
//! file-backed [`JsonlRfMemory`] (JSONL append log, identical query semantics).
//!
//! Everything here is pure and deterministic given the same sequence of
//! operations — no clocks, randomness, or order-dependent reductions — so
//! captures replayed twice yield byte-identical stores and query results.
//!
//! ```
//! use rvcsi_ruvector::{InMemoryRfMemory, RfMemoryStore, window_embedding};
//! use rvcsi_core::{CsiWindow, SessionId, SourceId, WindowId};
//!
//! let w = CsiWindow {
//! window_id: WindowId(0),
//! session_id: SessionId(1),
//! source_id: SourceId::from("esp32"),
//! start_ns: 1_000,
//! end_ns: 2_000,
//! frame_count: 10,
//! mean_amplitude: vec![1.0, 2.0, 3.0],
//! phase_variance: vec![0.1, 0.2, 0.1],
//! motion_energy: 0.3,
//! presence_score: 0.7,
//! quality_score: 0.9,
//! };
//! let mut mem = InMemoryRfMemory::new();
//! let id = mem.store_window(&w).unwrap();
//! let hits = mem.query_similar(&window_embedding(&w), 1).unwrap();
//! assert_eq!(hits[0].id, id);
//! assert!((hits[0].score - 1.0).abs() < 1e-5);
//! ```
/// Placeholder so the crate compiles before the agent fills it in.
pub fn __rvcsi_ruvector_placeholder() {}
#![forbid(unsafe_code)]
#![warn(missing_docs)]
mod embedding;
mod jsonl;
mod memory;
mod store;
pub use embedding::{
cosine_similarity, event_embedding, window_embedding, EVENT_EMBEDDING_DIM,
WINDOW_EMBEDDING_DIM,
};
pub use jsonl::JsonlRfMemory;
pub use memory::InMemoryRfMemory;
pub use store::{DriftReport, EmbeddingId, RecordKind, RfMemoryStore, SimilarHit};

View File

@ -0,0 +1,313 @@
//! [`InMemoryRfMemory`] — an in-process [`RfMemoryStore`] backed by plain
//! `Vec`s. Also defines the shared [`RfIndex`] used by the file-backed store.
use std::collections::HashMap;
use rvcsi_core::{CsiEvent, CsiWindow, RvcsiError, SourceId};
use crate::embedding::{cosine_similarity, event_embedding, window_embedding};
use crate::store::{DriftReport, EmbeddingId, RecordKind, RfMemoryStore, SimilarHit};
/// One stored record inside an [`RfIndex`].
#[derive(Debug, Clone, PartialEq)]
pub(crate) struct IndexRecord {
pub(crate) id: EmbeddingId,
pub(crate) kind: RecordKind,
pub(crate) source_id: SourceId,
pub(crate) timestamp_ns: u64,
pub(crate) embedding: Vec<f32>,
}
/// The in-memory index that both [`InMemoryRfMemory`] and the file-backed store
/// build queries on top of. Holds records (with monotonic ids) and the latest
/// baseline per room.
#[derive(Debug, Default, Clone)]
pub(crate) struct RfIndex {
records: Vec<IndexRecord>,
/// room -> (version, embedding); the most recently set wins.
baselines: HashMap<String, (String, Vec<f32>)>,
next_id: u64,
}
impl RfIndex {
pub(crate) fn new() -> Self {
RfIndex::default()
}
pub(crate) fn mint_id(&mut self) -> EmbeddingId {
let id = EmbeddingId(self.next_id);
self.next_id += 1;
id
}
/// Insert an already-built record. The record's `id` must come from
/// [`RfIndex::mint_id`] (or be a replay of a previously-minted id, in which
/// case `next_id` is advanced past it so future mints stay unique).
pub(crate) fn insert(&mut self, rec: IndexRecord) {
if rec.id.0 >= self.next_id {
self.next_id = rec.id.0 + 1;
}
self.records.push(rec);
}
pub(crate) fn set_baseline(&mut self, room: &str, version: &str, embedding: Vec<f32>) {
self.baselines
.insert(room.to_string(), (version.to_string(), embedding));
}
pub(crate) fn len(&self) -> usize {
self.records.len()
}
pub(crate) fn query_similar(&self, query: &[f32], k: usize) -> Vec<SimilarHit> {
if k == 0 {
return Vec::new();
}
let mut scored: Vec<(usize, f32)> = self
.records
.iter()
.enumerate()
.map(|(i, r)| (i, cosine_similarity(query, &r.embedding)))
.collect();
// Deterministic sort: by score desc, ties broken by record id asc.
scored.sort_by(|(ia, sa), (ib, sb)| {
sb.partial_cmp(sa)
.unwrap_or(std::cmp::Ordering::Equal)
.then(self.records[*ia].id.cmp(&self.records[*ib].id))
});
scored
.into_iter()
.take(k)
.map(|(i, score)| {
let r = &self.records[i];
SimilarHit {
id: r.id,
score,
kind: r.kind,
source_id: r.source_id.clone(),
timestamp_ns: r.timestamp_ns,
}
})
.collect()
}
pub(crate) fn compute_drift(
&self,
room: &str,
current: &[f32],
threshold: f32,
) -> Option<DriftReport> {
let (version, baseline) = self.baselines.get(room)?;
let distance = 1.0 - cosine_similarity(baseline, current);
Some(DriftReport {
room: room.to_string(),
baseline_version: version.clone(),
distance,
threshold,
exceeded: distance > threshold,
})
}
}
/// An entirely in-process [`RfMemoryStore`] — no persistence.
///
/// Useful for tests, ephemeral runs, and as the query engine behind the
/// file-backed [`crate::JsonlRfMemory`].
#[derive(Debug, Default, Clone)]
pub struct InMemoryRfMemory {
index: RfIndex,
}
impl InMemoryRfMemory {
/// A fresh, empty store.
pub fn new() -> Self {
InMemoryRfMemory {
index: RfIndex::new(),
}
}
}
impl RfMemoryStore for InMemoryRfMemory {
fn store_window(&mut self, w: &CsiWindow) -> Result<EmbeddingId, RvcsiError> {
let id = self.index.mint_id();
self.index.insert(IndexRecord {
id,
kind: RecordKind::Window,
source_id: w.source_id.clone(),
timestamp_ns: w.start_ns,
embedding: window_embedding(w),
});
Ok(id)
}
fn store_event(&mut self, e: &CsiEvent) -> Result<EmbeddingId, RvcsiError> {
let id = self.index.mint_id();
self.index.insert(IndexRecord {
id,
kind: RecordKind::Event,
source_id: e.source_id.clone(),
timestamp_ns: e.timestamp_ns,
embedding: event_embedding(e),
});
Ok(id)
}
fn query_similar(&self, query: &[f32], k: usize) -> Result<Vec<SimilarHit>, RvcsiError> {
Ok(self.index.query_similar(query, k))
}
fn set_baseline(
&mut self,
room: &str,
version: &str,
embedding: Vec<f32>,
) -> Result<(), RvcsiError> {
self.index.set_baseline(room, version, embedding);
Ok(())
}
fn compute_drift(
&self,
room: &str,
current: &[f32],
threshold: f32,
) -> Result<Option<DriftReport>, RvcsiError> {
Ok(self.index.compute_drift(room, current, threshold))
}
fn len(&self) -> usize {
self.index.len()
}
}
#[cfg(test)]
mod tests {
use super::*;
use rvcsi_core::{CsiEventKind, EventId, SessionId, SourceId, WindowId};
fn window(id: u64, amp: f32) -> CsiWindow {
CsiWindow {
window_id: WindowId(id),
session_id: SessionId(1),
source_id: SourceId::from(format!("src-{id}").as_str()),
start_ns: 1_000 + id,
end_ns: 2_000 + id,
frame_count: 10 + id as u32,
mean_amplitude: vec![amp, amp + 1.0, amp + 2.0, amp + 3.0],
phase_variance: vec![0.1, 0.2, 0.1, 0.05],
motion_energy: amp / 10.0,
presence_score: 0.5,
quality_score: 0.9,
}
}
fn event() -> CsiEvent {
CsiEvent::new(
EventId(0),
CsiEventKind::PresenceStarted,
SessionId(1),
SourceId::from("ev"),
9_000,
0.8,
vec![WindowId(1)],
)
}
#[test]
fn store_and_query_windows() {
let mut mem = InMemoryRfMemory::new();
let w1 = window(0, 1.0);
let w2 = window(1, 50.0);
let w3 = window(2, 100.0);
let id1 = mem.store_window(&w1).unwrap();
mem.store_window(&w2).unwrap();
mem.store_window(&w3).unwrap();
assert_eq!(mem.len(), 3);
assert!(!mem.is_empty());
let q = window_embedding(&w1);
let hits = mem.query_similar(&q, 3).unwrap();
assert_eq!(hits.len(), 3);
assert_eq!(hits[0].id, id1);
assert_eq!(hits[0].kind, RecordKind::Window);
assert!((hits[0].score - 1.0).abs() < 1e-5);
// descending
assert!(hits[0].score >= hits[1].score);
assert!(hits[1].score >= hits[2].score);
}
#[test]
fn store_and_query_event() {
let mut mem = InMemoryRfMemory::new();
mem.store_window(&window(0, 1.0)).unwrap();
let e = event();
let eid = mem.store_event(&e).unwrap();
let hits = mem.query_similar(&event_embedding(&e), 1).unwrap();
assert_eq!(hits.len(), 1);
assert_eq!(hits[0].id, eid);
assert_eq!(hits[0].kind, RecordKind::Event);
assert!((hits[0].score - 1.0).abs() < 1e-5);
assert_eq!(hits[0].timestamp_ns, 9_000);
}
#[test]
fn baseline_drift() {
let mut mem = InMemoryRfMemory::new();
let base = window(0, 10.0);
let base_emb = window_embedding(&base);
mem.set_baseline("room1", "v1", base_emb.clone()).unwrap();
// near-identical: tiny perturbation
let mut near = base.clone();
near.motion_energy += 0.001;
let near_emb = window_embedding(&near);
let r = mem.compute_drift("room1", &near_emb, 0.2).unwrap().unwrap();
assert_eq!(r.room, "room1");
assert_eq!(r.baseline_version, "v1");
assert!(!r.exceeded, "distance was {}", r.distance);
// very different
let far_emb = window_embedding(&window(9, 1_000.0));
let r2 = mem.compute_drift("room1", &far_emb, 0.001).unwrap().unwrap();
assert!(r2.exceeded, "distance was {}", r2.distance);
// unknown room
assert!(mem.compute_drift("nope", &near_emb, 0.2).unwrap().is_none());
}
#[test]
fn replaying_baseline_keeps_latest() {
let mut mem = InMemoryRfMemory::new();
mem.set_baseline("r", "v1", window_embedding(&window(0, 1.0)))
.unwrap();
let v2_emb = window_embedding(&window(1, 2.0));
mem.set_baseline("r", "v2", v2_emb.clone()).unwrap();
let r = mem.compute_drift("r", &v2_emb, 0.5).unwrap().unwrap();
assert_eq!(r.baseline_version, "v2");
assert!(!r.exceeded);
assert!(r.distance < 1e-5);
}
#[test]
fn deterministic_across_rebuilds() {
let build = || {
let mut m = InMemoryRfMemory::new();
for i in 0..5 {
m.store_window(&window(i, (i as f32 + 1.0) * 3.0)).unwrap();
}
m
};
let a = build();
let b = build();
assert_eq!(a.len(), b.len());
let q = window_embedding(&window(2, 9.0));
assert_eq!(a.query_similar(&q, 5).unwrap(), b.query_similar(&q, 5).unwrap());
}
#[test]
fn k_zero_returns_empty() {
let mut m = InMemoryRfMemory::new();
m.store_window(&window(0, 1.0)).unwrap();
assert!(m.query_similar(&window_embedding(&window(0, 1.0)), 0).unwrap().is_empty());
}
}

View File

@ -0,0 +1,148 @@
//! The [`RfMemoryStore`] trait and its value objects.
//!
//! An RF-memory store keeps embeddings of [`CsiWindow`](rvcsi_core::CsiWindow)s
//! and [`CsiEvent`](rvcsi_core::CsiEvent)s plus per-room baseline embeddings,
//! and answers similarity / drift queries over them. This is a standin for the
//! production RuVector binding (ADR-095 FR8, D8) — see the crate docs.
use serde::{Deserialize, Serialize};
use rvcsi_core::{CsiEvent, CsiWindow, RvcsiError, SourceId};
/// Identifier minted for each stored embedding (monotonic within a store).
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub struct EmbeddingId(pub u64);
impl EmbeddingId {
/// The raw integer value.
#[inline]
pub const fn value(self) -> u64 {
self.0
}
}
impl From<u64> for EmbeddingId {
#[inline]
fn from(v: u64) -> Self {
EmbeddingId(v)
}
}
/// Which kind of record an embedding came from.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum RecordKind {
/// Embedding of a [`CsiWindow`](rvcsi_core::CsiWindow).
Window,
/// Embedding of a [`CsiEvent`](rvcsi_core::CsiEvent).
Event,
}
/// One hit returned by [`RfMemoryStore::query_similar`].
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct SimilarHit {
/// Id of the matched stored embedding.
pub id: EmbeddingId,
/// Cosine similarity to the query in `[-1.0, 1.0]`.
pub score: f32,
/// Whether the matched record was a window or an event.
pub kind: RecordKind,
/// Source the matched record came from.
pub source_id: SourceId,
/// Timestamp of the matched record (ns).
pub timestamp_ns: u64,
}
/// Result of a baseline-drift comparison ([`RfMemoryStore::compute_drift`]).
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct DriftReport {
/// Room the baseline belongs to.
pub room: String,
/// Baseline version that was compared against.
pub baseline_version: String,
/// Cosine *distance* `1 - cosine_similarity(baseline, current)` in `[0.0, 2.0]`.
pub distance: f32,
/// Threshold the distance was compared against.
pub threshold: f32,
/// Whether `distance > threshold`.
pub exceeded: bool,
}
/// A queryable RF-memory store: append window/event embeddings, search by
/// cosine similarity, and track per-room baseline drift.
///
/// Implementations are deterministic given the same sequence of operations.
pub trait RfMemoryStore {
/// Store the embedding of `w`, returning its newly-minted id.
fn store_window(&mut self, w: &CsiWindow) -> Result<EmbeddingId, RvcsiError>;
/// Store the embedding of `e`, returning its newly-minted id.
fn store_event(&mut self, e: &CsiEvent) -> Result<EmbeddingId, RvcsiError>;
/// Return up to `k` stored records most similar to `query`, by descending
/// cosine similarity. Records whose embedding length differs from `query`
/// (e.g. events vs. window queries) score `0.0` and so sort last.
fn query_similar(&self, query: &[f32], k: usize) -> Result<Vec<SimilarHit>, RvcsiError>;
/// Set (or replace) the baseline embedding for `room` at `version`.
fn set_baseline(
&mut self,
room: &str,
version: &str,
embedding: Vec<f32>,
) -> Result<(), RvcsiError>;
/// Compare `current` against `room`'s baseline. Returns `None` if there is
/// no baseline for `room`, otherwise a [`DriftReport`] with
/// `distance = 1 - cosine_similarity(baseline, current)` and
/// `exceeded = distance > threshold`.
fn compute_drift(
&self,
room: &str,
current: &[f32],
threshold: f32,
) -> Result<Option<DriftReport>, RvcsiError>;
/// Number of stored records (windows + events; baselines are not counted).
fn len(&self) -> usize;
/// Whether [`RfMemoryStore::len`] is zero.
fn is_empty(&self) -> bool {
self.len() == 0
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn embedding_id_roundtrips() {
let id = EmbeddingId::from(42);
assert_eq!(id.value(), 42);
let json = serde_json::to_string(&id).unwrap();
assert_eq!(serde_json::from_str::<EmbeddingId>(&json).unwrap(), id);
}
#[test]
fn value_objects_serde() {
let hit = SimilarHit {
id: EmbeddingId(1),
score: 0.9,
kind: RecordKind::Window,
source_id: SourceId::from("s"),
timestamp_ns: 5,
};
let json = serde_json::to_string(&hit).unwrap();
assert_eq!(serde_json::from_str::<SimilarHit>(&json).unwrap(), hit);
let d = DriftReport {
room: "lab".into(),
baseline_version: "v1".into(),
distance: 0.1,
threshold: 0.2,
exceeded: false,
};
let json = serde_json::to_string(&d).unwrap();
assert_eq!(serde_json::from_str::<DriftReport>(&json).unwrap(), d);
}
}