diff --git a/v2/Cargo.lock b/v2/Cargo.lock index 6bc8bcec..b5cb1d31 100644 --- a/v2/Cargo.lock +++ b/v2/Cargo.lock @@ -6015,7 +6015,6 @@ name = "rvcsi-events" version = "0.3.0" dependencies = [ "rvcsi-core", - "rvcsi-dsp", "serde", "serde_json", "thiserror 1.0.69", diff --git a/v2/crates/rvcsi-adapter-file/src/format.rs b/v2/crates/rvcsi-adapter-file/src/format.rs new file mode 100644 index 00000000..df0389dc --- /dev/null +++ b/v2/crates/rvcsi-adapter-file/src/format.rs @@ -0,0 +1,144 @@ +//! The `.rvcsi` capture container format (ADR-095 FR1/FR10, D9). +//! +//! A `.rvcsi` file is plain [JSONL]: the **first line** is a +//! [`CaptureHeader`] object describing the session; every **subsequent line** +//! is one [`rvcsi_core::CsiFrame`] serialized as JSON. This keeps the format +//! simple, deterministic, append-friendly and trivially debuggable with `head` +//! / `jq`. +//! +//! [JSONL]: https://jsonlines.org/ + +use rvcsi_core::{AdapterProfile, SessionId, SourceId, ValidationPolicy}; +use serde::{Deserialize, Serialize}; + +/// Current `.rvcsi` capture format version. Written into every header and +/// checked on read. +pub const CAPTURE_VERSION: u32 = 1; + +/// Header object — the first line of every `.rvcsi` capture file. +/// +/// It records enough context to replay the session faithfully: the originating +/// session/source ids, the source's [`AdapterProfile`], the +/// [`ValidationPolicy`] that was in force, the calibration version (if any), +/// and an opaque `runtime_config_json` blob the caller may use for whatever it +/// likes (defaults to `"{}"`). +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct CaptureHeader { + /// Capture format version (always [`CAPTURE_VERSION`] when written). + pub rvcsi_capture_version: u32, + /// Session this capture belongs to. + pub session_id: SessionId, + /// Source the frames were captured from. + pub source_id: SourceId, + /// Capability descriptor of the source at capture time. + pub adapter_profile: AdapterProfile, + /// Validation policy that was in force during capture. + pub validation_policy: ValidationPolicy, + /// Calibration version frames were processed against, if any. + pub calibration_version: Option, + /// Opaque caller-supplied runtime config (JSON; default `"{}"`). + pub runtime_config_json: String, + /// Wall-clock creation time, nanoseconds since the Unix epoch (`0` if unknown). + pub created_unix_ns: u64, +} + +impl CaptureHeader { + /// Build a header for `session_id` / `source_id` / `adapter_profile` with + /// sensible defaults: version [`CAPTURE_VERSION`], [`ValidationPolicy::default`], + /// no calibration version, `runtime_config_json == "{}"`, and + /// `created_unix_ns` taken from the system clock (or `0` if it is unavailable + /// or before the epoch). + pub fn new(session_id: SessionId, source_id: SourceId, adapter_profile: AdapterProfile) -> Self { + CaptureHeader { + rvcsi_capture_version: CAPTURE_VERSION, + session_id, + source_id, + adapter_profile, + validation_policy: ValidationPolicy::default(), + calibration_version: None, + runtime_config_json: "{}".to_string(), + created_unix_ns: now_unix_ns(), + } + } + + /// Builder: override the validation policy. + pub fn with_validation_policy(mut self, policy: ValidationPolicy) -> Self { + self.validation_policy = policy; + self + } + + /// Builder: set the calibration version. + pub fn with_calibration_version(mut self, version: impl Into) -> Self { + self.calibration_version = Some(version.into()); + self + } + + /// Builder: set the opaque runtime config blob. + pub fn with_runtime_config_json(mut self, json: impl Into) -> Self { + self.runtime_config_json = json.into(); + self + } + + /// Builder: pin `created_unix_ns` (useful for deterministic tests). + pub fn with_created_unix_ns(mut self, ns: u64) -> Self { + self.created_unix_ns = ns; + self + } +} + +/// Best-effort "nanoseconds since the Unix epoch" using the system clock; +/// returns `0` when the clock is unavailable or set before the epoch. +fn now_unix_ns() -> u64 { + use std::time::{SystemTime, UNIX_EPOCH}; + SystemTime::now() + .duration_since(UNIX_EPOCH) + .map(|d| d.as_nanos().min(u128::from(u64::MAX)) as u64) + .unwrap_or(0) +} + +#[cfg(test)] +mod tests { + use super::*; + use rvcsi_core::AdapterKind; + + #[test] + fn header_defaults() { + let h = CaptureHeader::new( + SessionId(7), + SourceId::from("file:lab.rvcsi"), + AdapterProfile::offline(AdapterKind::File), + ); + assert_eq!(h.rvcsi_capture_version, CAPTURE_VERSION); + assert_eq!(h.runtime_config_json, "{}"); + assert!(h.calibration_version.is_none()); + assert_eq!(h.validation_policy, ValidationPolicy::default()); + } + + #[test] + fn header_builders() { + let h = CaptureHeader::new( + SessionId(1), + SourceId::from("s"), + AdapterProfile::offline(AdapterKind::File), + ) + .with_calibration_version("room@v2") + .with_runtime_config_json(r#"{"foo":1}"#) + .with_created_unix_ns(42); + assert_eq!(h.calibration_version.as_deref(), Some("room@v2")); + assert_eq!(h.runtime_config_json, r#"{"foo":1}"#); + assert_eq!(h.created_unix_ns, 42); + } + + #[test] + fn header_json_roundtrips() { + let h = CaptureHeader::new( + SessionId(3), + SourceId::from("esp32"), + AdapterProfile::esp32_default(), + ) + .with_created_unix_ns(123); + let json = serde_json::to_string(&h).unwrap(); + let back: CaptureHeader = serde_json::from_str(&json).unwrap(); + assert_eq!(h, back); + } +} diff --git a/v2/crates/rvcsi-adapter-file/src/lib.rs b/v2/crates/rvcsi-adapter-file/src/lib.rs index 390cc8fd..a601af98 100644 --- a/v2/crates/rvcsi-adapter-file/src/lib.rs +++ b/v2/crates/rvcsi-adapter-file/src/lib.rs @@ -1,7 +1,342 @@ -//! # rvCSI file/replay adapter (skeleton — implemented by the adapters swarm agent) +//! # rvCSI file/replay adapter //! -//! Records and replays `.rvcsi` capture sessions deterministically (ADR-095 D9). -#![forbid(unsafe_code)] +//! The `.rvcsi` capture container, its [`FileRecorder`], and the +//! [`FileReplayAdapter`] [`CsiSource`](rvcsi_core::CsiSource) (ADR-095 FR1/FR10, +//! D9). +//! +//! A `.rvcsi` file is plain [JSONL]: the first line is a [`CaptureHeader`] +//! describing the session; every subsequent line is one +//! [`rvcsi_core::CsiFrame`] serialized as compact JSON. The format is simple, +//! deterministic, append-friendly and trivially inspectable with `head` / `jq`. +//! +//! Typical use: +//! +//! ```no_run +//! use rvcsi_adapter_file::{CaptureHeader, FileRecorder, FileReplayAdapter}; +//! use rvcsi_core::{AdapterKind, AdapterProfile, CsiSource, SessionId, SourceId}; +//! +//! # fn demo() -> rvcsi_core::Result<()> { +//! let header = CaptureHeader::new( +//! SessionId(1), +//! SourceId::from("file:lab.rvcsi"), +//! AdapterProfile::offline(AdapterKind::File), +//! ); +//! let mut rec = FileRecorder::create("lab.rvcsi", &header)?; +//! // rec.write_frame(&frame)?; ... +//! rec.finish()?; +//! +//! let mut replay = FileReplayAdapter::open("lab.rvcsi")?; +//! while let Some(frame) = replay.next_frame()? { +//! // hand `frame` downstream — its ValidationStatus is preserved as recorded +//! let _ = frame; +//! } +//! # Ok(()) +//! # } +//! ``` +//! +//! [JSONL]: https://jsonlines.org/ -/// Placeholder so the crate compiles before the agent fills it in. -pub fn __rvcsi_adapter_file_placeholder() {} +#![forbid(unsafe_code)] +#![warn(missing_docs)] + +mod format; +mod recorder; +mod replay; + +pub use format::{CaptureHeader, CAPTURE_VERSION}; +pub use recorder::FileRecorder; +pub use replay::FileReplayAdapter; + +use std::path::Path; + +use rvcsi_core::{CsiFrame, Result}; + +/// Read an entire `.rvcsi` capture into memory: its [`CaptureHeader`] and every +/// [`CsiFrame`] it contains, in recording order. +/// +/// This is a convenience wrapper over [`FileReplayAdapter`]; for large captures +/// or streaming use, prefer iterating [`FileReplayAdapter`] directly. Errors are +/// the same as [`FileReplayAdapter::open`] / [`FileReplayAdapter::next_frame`]: +/// an [`rvcsi_core::RvcsiError::Io`] for a missing/unreadable file, an +/// [`rvcsi_core::RvcsiError::Parse`] (offset `0`) for a bad header, or an +/// [`rvcsi_core::RvcsiError::Parse`] carrying the 1-based line number for a +/// malformed frame line. +pub fn read_all(path: impl AsRef) -> Result<(CaptureHeader, Vec)> { + use rvcsi_core::CsiSource; + let mut adapter = FileReplayAdapter::open(path)?; + let header = adapter.header().clone(); + let mut frames = Vec::new(); + while let Some(frame) = adapter.next_frame()? { + frames.push(frame); + } + Ok((header, frames)) +} + +#[cfg(test)] +mod tests { + use super::*; + use rvcsi_core::{ + AdapterKind, AdapterProfile, CsiSource, FrameId, RvcsiError, SessionId, SourceId, + ValidationStatus, + }; + use std::fs::File; + use std::io::{Read, Write}; + + fn header() -> CaptureHeader { + CaptureHeader::new( + SessionId(1), + SourceId::from("it-test"), + AdapterProfile::offline(AdapterKind::File), + ) + .with_created_unix_ns(0) + .with_calibration_version("room@v1") + .with_runtime_config_json(r#"{"window_ms":500}"#) + } + + /// A small varied set of frames: two accepted (quality 0.9), two degraded + /// with reasons, one recovered — varying timestamps / channels / subcarrier + /// counts. + fn sample_frames() -> Vec { + let mut frames = Vec::new(); + + let mut f0 = CsiFrame::from_iq( + FrameId(0), + SessionId(1), + SourceId::from("it-test"), + AdapterKind::File, + 1_000, + 1, + 20, + vec![1.0, 2.0, 3.0, 4.0], + vec![0.5, 0.5, 0.5, 0.5], + ) + .with_rssi(-55); + f0.validation = ValidationStatus::Accepted; + f0.quality_score = 0.9; + frames.push(f0); + + let mut f1 = CsiFrame::from_iq( + FrameId(1), + SessionId(1), + SourceId::from("it-test"), + AdapterKind::File, + 2_000, + 6, + 40, + vec![0.1; 8], + vec![0.2; 8], + ); + f1.validation = ValidationStatus::Degraded; + f1.quality_score = 0.4; + f1.quality_reasons = vec!["missing rssi".to_string(), "low snr".to_string()]; + frames.push(f1); + + let mut f2 = CsiFrame::from_iq( + FrameId(2), + SessionId(1), + SourceId::from("it-test"), + AdapterKind::File, + 3_000, + 11, + 20, + vec![5.0, 6.0], + vec![1.0, -1.0], + ) + .with_rssi(-70) + .with_noise_floor(-95); + f2.validation = ValidationStatus::Accepted; + f2.quality_score = 0.9; + frames.push(f2); + + let mut f3 = CsiFrame::from_iq( + FrameId(3), + SessionId(1), + SourceId::from("it-test"), + AdapterKind::File, + 2_500, // deliberately out of order — replay preserves it verbatim + 6, + 20, + vec![0.0; 3], + vec![0.0; 3], + ); + f3.validation = ValidationStatus::Recovered; + f3.quality_score = 0.3; + frames.push(f3); + + let mut f4 = CsiFrame::from_iq( + FrameId(4), + SessionId(1), + SourceId::from("it-test"), + AdapterKind::File, + 4_000, + 36, + 80, + vec![2.0; 6], + vec![0.0; 6], + ); + f4.validation = ValidationStatus::Degraded; + f4.quality_score = 0.5; + f4.quality_reasons = vec!["amplitude spike".to_string()]; + frames.push(f4); + + frames + } + + #[test] + fn record_then_replay_roundtrips_exactly() { + let tmp = tempfile::NamedTempFile::new().unwrap(); + let header = header(); + let frames = sample_frames(); + + let mut rec = FileRecorder::create(tmp.path(), &header).unwrap(); + for f in &frames { + rec.write_frame(f).unwrap(); + } + assert_eq!(rec.frames_written(), frames.len() as u64); + rec.finish().unwrap(); + + let mut adapter = FileReplayAdapter::open(tmp.path()).unwrap(); + assert_eq!(adapter.header(), &header); + let mut got = Vec::new(); + while let Some(f) = adapter.next_frame().unwrap() { + got.push(f); + } + assert_eq!(got, frames); + assert_eq!(adapter.health().frames_delivered, frames.len() as u64); + assert!(!adapter.health().connected); + } + + #[test] + fn re_serializing_replayed_frames_is_byte_identical() { + let tmp = tempfile::NamedTempFile::new().unwrap(); + let header = header(); + let frames = sample_frames(); + let mut rec = FileRecorder::create(tmp.path(), &header).unwrap(); + for f in &frames { + rec.write_frame(f).unwrap(); + } + rec.finish().unwrap(); + + let mut original = String::new(); + File::open(tmp.path()).unwrap().read_to_string(&mut original).unwrap(); + + // Round-trip the whole capture and re-emit it; bytes must match. + let (h, fs) = read_all(tmp.path()).unwrap(); + let tmp2 = tempfile::NamedTempFile::new().unwrap(); + let mut rec2 = FileRecorder::create(tmp2.path(), &h).unwrap(); + for f in &fs { + rec2.write_frame(f).unwrap(); + } + rec2.finish().unwrap(); + let mut reemitted = String::new(); + File::open(tmp2.path()).unwrap().read_to_string(&mut reemitted).unwrap(); + + assert_eq!(original, reemitted); + } + + #[test] + fn read_all_matches_replay() { + let tmp = tempfile::NamedTempFile::new().unwrap(); + let header = header(); + let frames = sample_frames(); + let mut rec = FileRecorder::create(tmp.path(), &header).unwrap(); + for f in &frames { + rec.write_frame(f).unwrap(); + } + rec.finish().unwrap(); + + let (h, fs) = read_all(tmp.path()).unwrap(); + assert_eq!(h, header); + assert_eq!(fs, frames); + } + + #[test] + fn header_only_capture_has_no_frames() { + let tmp = tempfile::NamedTempFile::new().unwrap(); + let header = header(); + FileRecorder::create(tmp.path(), &header).unwrap().finish().unwrap(); + + let mut adapter = FileReplayAdapter::open(tmp.path()).unwrap(); + assert!(adapter.next_frame().unwrap().is_none()); + + let (h, fs) = read_all(tmp.path()).unwrap(); + assert_eq!(h, header); + assert!(fs.is_empty()); + } + + #[test] + fn bad_header_line_is_parse_error_at_offset_zero() { + let tmp = tempfile::NamedTempFile::new().unwrap(); + { + let mut f = File::create(tmp.path()).unwrap(); + f.write_all(b"not json\n").unwrap(); + } + match FileReplayAdapter::open(tmp.path()) { + Err(RvcsiError::Parse { offset, .. }) => assert_eq!(offset, 0), + other => panic!("expected Parse at offset 0, got {other:?}"), + } + match read_all(tmp.path()) { + Err(RvcsiError::Parse { offset, .. }) => assert_eq!(offset, 0), + other => panic!("expected Parse at offset 0, got {other:?}"), + } + } + + #[test] + fn garbage_frame_after_good_frames_reports_line_number() { + let tmp = tempfile::NamedTempFile::new().unwrap(); + let header = header(); + { + let mut f = File::create(tmp.path()).unwrap(); + serde_json::to_writer(&mut f, &header).unwrap(); + f.write_all(b"\n").unwrap(); + // lines 2 + 3: good frames + let frames = sample_frames(); + serde_json::to_writer(&mut f, &frames[0]).unwrap(); + f.write_all(b"\n").unwrap(); + serde_json::to_writer(&mut f, &frames[1]).unwrap(); + f.write_all(b"\n").unwrap(); + // line 4: garbage + f.write_all(b"{ not a frame }\n").unwrap(); + } + let mut adapter = FileReplayAdapter::open(tmp.path()).unwrap(); + assert!(adapter.next_frame().unwrap().is_some()); // line 2 + assert!(adapter.next_frame().unwrap().is_some()); // line 3 + match adapter.next_frame() { + Err(RvcsiError::Parse { offset, .. }) => assert_eq!(offset, 4), + other => panic!("expected Parse at line 4, got {other:?}"), + } + } + + #[test] + fn nonexistent_path_is_io_error() { + match FileReplayAdapter::open("/no/such/file/at/all.rvcsi") { + Err(RvcsiError::Io(_)) => {} + other => panic!("expected Io error, got {other:?}"), + } + match read_all("/no/such/file/at/all.rvcsi") { + Err(RvcsiError::Io(_)) => {} + other => panic!("expected Io error, got {other:?}"), + } + } + + #[test] + fn counters_are_consistent() { + let tmp = tempfile::NamedTempFile::new().unwrap(); + let header = header(); + let frames = sample_frames(); + let mut rec = FileRecorder::create(tmp.path(), &header).unwrap(); + for (i, f) in frames.iter().enumerate() { + rec.write_frame(f).unwrap(); + assert_eq!(rec.frames_written(), (i + 1) as u64); + } + rec.finish().unwrap(); + + let mut adapter = FileReplayAdapter::open(tmp.path()).unwrap(); + let mut n = 0u64; + while adapter.next_frame().unwrap().is_some() { + n += 1; + assert_eq!(adapter.health().frames_delivered, n); + } + assert_eq!(n, frames.len() as u64); + } +} diff --git a/v2/crates/rvcsi-adapter-file/src/recorder.rs b/v2/crates/rvcsi-adapter-file/src/recorder.rs new file mode 100644 index 00000000..81d69cd7 --- /dev/null +++ b/v2/crates/rvcsi-adapter-file/src/recorder.rs @@ -0,0 +1,113 @@ +//! [`FileRecorder`] — writes a `.rvcsi` capture: a header line followed by one +//! JSON line per [`CsiFrame`]. + +use std::fs::File; +use std::io::{BufWriter, Write}; +use std::path::Path; + +use rvcsi_core::{CsiFrame, Result}; + +use crate::format::CaptureHeader; + +/// Append-only writer for a `.rvcsi` capture file. +/// +/// Create one with [`FileRecorder::create`] (which writes the header line), +/// push frames with [`FileRecorder::write_frame`], and call +/// [`FileRecorder::finish`] (or just drop it after [`FileRecorder::flush`]) to +/// be sure everything reached disk. +pub struct FileRecorder { + writer: BufWriter, + frames_written: u64, +} + +impl FileRecorder { + /// Create `path` (truncating any existing file) and write `header` as the + /// first line. + pub fn create(path: impl AsRef, header: &CaptureHeader) -> Result { + let file = File::create(path.as_ref())?; + let mut writer = BufWriter::new(file); + write_json_line(&mut writer, header)?; + Ok(FileRecorder { + writer, + frames_written: 0, + }) + } + + /// Append one frame as a JSON line. + pub fn write_frame(&mut self, frame: &CsiFrame) -> Result<()> { + write_json_line(&mut self.writer, frame)?; + self.frames_written += 1; + Ok(()) + } + + /// Flush buffered bytes to the underlying file. + pub fn flush(&mut self) -> Result<()> { + self.writer.flush()?; + Ok(()) + } + + /// Number of frames written so far (the header line is not counted). + pub fn frames_written(&self) -> u64 { + self.frames_written + } + + /// Flush and close the file, consuming the recorder. + pub fn finish(mut self) -> Result<()> { + self.flush() + } +} + +/// Serialize `value` as a single JSON line (no embedded newlines — `serde_json` +/// compact form never produces them) followed by `\n`. +fn write_json_line(writer: &mut W, value: &T) -> Result<()> { + serde_json::to_writer(&mut *writer, value)?; + writer.write_all(b"\n")?; + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + use rvcsi_core::{AdapterKind, AdapterProfile, FrameId, SessionId, SourceId}; + use std::io::Read; + + fn frame(id: u64, ts: u64) -> CsiFrame { + CsiFrame::from_iq( + FrameId(id), + SessionId(1), + SourceId::from("rec-test"), + AdapterKind::File, + ts, + 6, + 20, + vec![1.0, 2.0, 3.0], + vec![0.5, 0.5, 0.5], + ) + } + + #[test] + fn writes_header_then_frames_and_counts() { + let tmp = tempfile::NamedTempFile::new().unwrap(); + let header = CaptureHeader::new( + SessionId(1), + SourceId::from("rec-test"), + AdapterProfile::offline(AdapterKind::File), + ) + .with_created_unix_ns(0); + let mut rec = FileRecorder::create(tmp.path(), &header).unwrap(); + assert_eq!(rec.frames_written(), 0); + rec.write_frame(&frame(0, 100)).unwrap(); + rec.write_frame(&frame(1, 200)).unwrap(); + assert_eq!(rec.frames_written(), 2); + rec.finish().unwrap(); + + let mut contents = String::new(); + File::open(tmp.path()).unwrap().read_to_string(&mut contents).unwrap(); + let lines: Vec<&str> = contents.lines().collect(); + assert_eq!(lines.len(), 3); + let parsed_header: CaptureHeader = serde_json::from_str(lines[0]).unwrap(); + assert_eq!(parsed_header, header); + let f0: CsiFrame = serde_json::from_str(lines[1]).unwrap(); + assert_eq!(f0, frame(0, 100)); + } +} diff --git a/v2/crates/rvcsi-adapter-file/src/replay.rs b/v2/crates/rvcsi-adapter-file/src/replay.rs new file mode 100644 index 00000000..48a58020 --- /dev/null +++ b/v2/crates/rvcsi-adapter-file/src/replay.rs @@ -0,0 +1,304 @@ +//! [`FileReplayAdapter`] — a [`CsiSource`] that replays a `.rvcsi` capture +//! file, frame by frame, exactly as it was recorded. + +use std::fs::File; +use std::io::{BufRead, BufReader}; +use std::path::Path; + +use rvcsi_core::{ + AdapterProfile, CsiFrame, CsiSource, Result, RvcsiError, SessionId, SourceHealth, SourceId, +}; + +use crate::format::{CaptureHeader, CAPTURE_VERSION}; + +/// Deterministic replay source backed by a `.rvcsi` capture file. +/// +/// The header is parsed eagerly on [`FileReplayAdapter::open`]; frames are +/// parsed lazily, one line at a time, on each [`CsiSource::next_frame`] call. +/// Timestamps, ordering and per-frame [`rvcsi_core::ValidationStatus`] are +/// preserved verbatim — replay does not re-validate or re-order anything, it +/// only deserializes what was stored. +/// +/// `replay_speed` is carried for the daemon/CLI to pace playback with; the +/// adapter itself never sleeps. +#[derive(Debug)] +pub struct FileReplayAdapter { + header: CaptureHeader, + profile: AdapterProfile, + source_id: SourceId, + reader: BufReader, + /// 1-based line number of the line a subsequent `next_frame` will read. + next_line: usize, + frames_delivered: u64, + at_eof: bool, + replay_speed: f32, + last_status: Option, +} + +impl FileReplayAdapter { + /// Open `path` for replay at real-time speed (`replay_speed == 1.0`). + pub fn open(path: impl AsRef) -> Result { + Self::open_with_speed(path, 1.0) + } + + /// Open `path` for replay, carrying `replay_speed` for downstream pacing. + pub fn open_with_speed(path: impl AsRef, replay_speed: f32) -> Result { + let file = File::open(path.as_ref())?; + let mut reader = BufReader::new(file); + + let mut first = String::new(); + let n = reader.read_line(&mut first)?; + if n == 0 { + return Err(RvcsiError::parse(0, "empty capture file: missing header line")); + } + let header: CaptureHeader = serde_json::from_str(first.trim_end_matches(['\n', '\r'])) + .map_err(|e| RvcsiError::parse(0, format!("invalid .rvcsi header line: {e}")))?; + if header.rvcsi_capture_version != CAPTURE_VERSION { + return Err(RvcsiError::parse( + 0, + format!( + "unsupported .rvcsi capture version {} (this build supports {})", + header.rvcsi_capture_version, CAPTURE_VERSION + ), + )); + } + + let profile = header.adapter_profile.clone(); + let source_id = header.source_id.clone(); + Ok(FileReplayAdapter { + header, + profile, + source_id, + reader, + next_line: 2, + frames_delivered: 0, + at_eof: false, + replay_speed, + last_status: None, + }) + } + + /// The capture header parsed from the file. + pub fn header(&self) -> &CaptureHeader { + &self.header + } + + /// Playback speed multiplier carried for the daemon/CLI (the adapter itself + /// does not sleep). + pub fn replay_speed(&self) -> f32 { + self.replay_speed + } + + /// Whether the underlying file has been fully consumed. + pub fn is_at_eof(&self) -> bool { + self.at_eof + } +} + +impl CsiSource for FileReplayAdapter { + fn profile(&self) -> &AdapterProfile { + &self.profile + } + + fn session_id(&self) -> SessionId { + self.header.session_id + } + + fn source_id(&self) -> &SourceId { + &self.source_id + } + + fn next_frame(&mut self) -> core::result::Result, RvcsiError> { + if self.at_eof { + return Ok(None); + } + loop { + let mut line = String::new(); + let read = self.reader.read_line(&mut line)?; + if read == 0 { + self.at_eof = true; + return Ok(None); + } + let line_no = self.next_line; + self.next_line += 1; + let trimmed = line.trim_end_matches(['\n', '\r']); + if trimmed.is_empty() { + // Tolerate blank lines (e.g. a trailing newline at EOF). + continue; + } + let frame: CsiFrame = serde_json::from_str(trimmed).map_err(|e| { + self.last_status = Some(format!("parse error at line {line_no}")); + RvcsiError::parse(line_no, format!("invalid frame line {line_no}: {e}")) + })?; + self.frames_delivered += 1; + return Ok(Some(frame)); + } + } + + fn health(&self) -> SourceHealth { + SourceHealth { + connected: !self.at_eof, + frames_delivered: self.frames_delivered, + frames_rejected: 0, + status: self.last_status.clone(), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::recorder::FileRecorder; + use rvcsi_core::{AdapterKind, FrameId, ValidationStatus}; + use std::io::Write; + + fn frame(id: u64, ts: u64) -> CsiFrame { + CsiFrame::from_iq( + FrameId(id), + SessionId(1), + SourceId::from("rep-test"), + AdapterKind::File, + ts, + 6, + 20, + vec![1.0, 2.0], + vec![0.0, 1.0], + ) + } + + fn write_capture(path: &Path, frames: &[CsiFrame]) -> CaptureHeader { + let header = CaptureHeader::new( + SessionId(1), + SourceId::from("rep-test"), + AdapterProfile::offline(AdapterKind::File), + ) + .with_created_unix_ns(0); + let mut rec = FileRecorder::create(path, &header).unwrap(); + for f in frames { + rec.write_frame(f).unwrap(); + } + rec.finish().unwrap(); + header + } + + #[test] + fn open_speed_default_is_one() { + let tmp = tempfile::NamedTempFile::new().unwrap(); + write_capture(tmp.path(), &[]); + let a = FileReplayAdapter::open(tmp.path()).unwrap(); + assert_eq!(a.replay_speed(), 1.0); + let b = FileReplayAdapter::open_with_speed(tmp.path(), 4.0).unwrap(); + assert_eq!(b.replay_speed(), 4.0); + } + + #[test] + fn replays_frames_in_order() { + let tmp = tempfile::NamedTempFile::new().unwrap(); + let frames = vec![frame(0, 10), frame(1, 20), frame(2, 30)]; + let header = write_capture(tmp.path(), &frames); + let mut a = FileReplayAdapter::open(tmp.path()).unwrap(); + assert_eq!(a.header(), &header); + assert_eq!(a.session_id(), SessionId(1)); + assert_eq!(a.source_id(), &SourceId::from("rep-test")); + let mut got = Vec::new(); + while let Some(f) = a.next_frame().unwrap() { + got.push(f); + } + assert_eq!(got, frames); + assert!(a.is_at_eof()); + assert!(!a.health().connected); + assert_eq!(a.health().frames_delivered, 3); + // Repeated calls after EOF stay at None. + assert!(a.next_frame().unwrap().is_none()); + } + + #[test] + fn header_only_file_yields_no_frames() { + let tmp = tempfile::NamedTempFile::new().unwrap(); + write_capture(tmp.path(), &[]); + let mut a = FileReplayAdapter::open(tmp.path()).unwrap(); + assert!(a.next_frame().unwrap().is_none()); + assert_eq!(a.health().frames_delivered, 0); + } + + #[test] + fn validation_status_preserved() { + let tmp = tempfile::NamedTempFile::new().unwrap(); + let mut f = frame(0, 1); + f.validation = ValidationStatus::Degraded; + f.quality_score = 0.42; + f.quality_reasons = vec!["missing rssi".to_string()]; + write_capture(tmp.path(), &[f.clone()]); + let mut a = FileReplayAdapter::open(tmp.path()).unwrap(); + let back = a.next_frame().unwrap().unwrap(); + assert_eq!(back, f); + assert_eq!(back.validation, ValidationStatus::Degraded); + assert_eq!(back.quality_reasons, vec!["missing rssi".to_string()]); + } + + #[test] + fn bad_header_is_parse_error_at_offset_zero() { + let tmp = tempfile::NamedTempFile::new().unwrap(); + { + let mut f = File::create(tmp.path()).unwrap(); + f.write_all(b"not json\n").unwrap(); + } + let err = FileReplayAdapter::open(tmp.path()).unwrap_err(); + match err { + RvcsiError::Parse { offset, .. } => assert_eq!(offset, 0), + other => panic!("expected Parse, got {other:?}"), + } + } + + #[test] + fn garbage_frame_line_is_parse_error_with_line_number() { + let tmp = tempfile::NamedTempFile::new().unwrap(); + let header = CaptureHeader::new( + SessionId(1), + SourceId::from("rep-test"), + AdapterProfile::offline(AdapterKind::File), + ) + .with_created_unix_ns(0); + { + let mut f = File::create(tmp.path()).unwrap(); + serde_json::to_writer(&mut f, &header).unwrap(); + f.write_all(b"\n").unwrap(); + // line 2: a good frame + serde_json::to_writer(&mut f, &frame(0, 1)).unwrap(); + f.write_all(b"\n").unwrap(); + // line 3: garbage + f.write_all(b"{not a frame}\n").unwrap(); + } + let mut a = FileReplayAdapter::open(tmp.path()).unwrap(); + assert!(a.next_frame().unwrap().is_some()); // line 2 ok + let err = a.next_frame().unwrap_err(); // line 3 + match err { + RvcsiError::Parse { offset, .. } => assert_eq!(offset, 3), + other => panic!("expected Parse at line 3, got {other:?}"), + } + } + + #[test] + fn nonexistent_path_is_io_error() { + let err = FileReplayAdapter::open("/no/such/rvcsi/file.rvcsi").unwrap_err(); + assert!(matches!(err, RvcsiError::Io(_)), "expected Io, got {err:?}"); + } + + #[test] + fn wrong_version_rejected() { + let tmp = tempfile::NamedTempFile::new().unwrap(); + let mut header = CaptureHeader::new( + SessionId(1), + SourceId::from("x"), + AdapterProfile::offline(AdapterKind::File), + ); + header.rvcsi_capture_version = 999; + { + let mut f = File::create(tmp.path()).unwrap(); + serde_json::to_writer(&mut f, &header).unwrap(); + f.write_all(b"\n").unwrap(); + } + let err = FileReplayAdapter::open(tmp.path()).unwrap_err(); + assert!(matches!(err, RvcsiError::Parse { offset: 0, .. })); + } +} diff --git a/v2/crates/rvcsi-ruvector/src/embedding.rs b/v2/crates/rvcsi-ruvector/src/embedding.rs new file mode 100644 index 00000000..baa62505 --- /dev/null +++ b/v2/crates/rvcsi-ruvector/src/embedding.rs @@ -0,0 +1,272 @@ +//! Deterministic, dependency-free embedding functions for RF memory records. +//! +//! [`window_embedding`] turns a [`CsiWindow`] into a fixed-length +//! [`WINDOW_EMBEDDING_DIM`]-vector regardless of subcarrier count; +//! [`event_embedding`] turns a [`CsiEvent`] into a fixed-length +//! [`EVENT_EMBEDDING_DIM`]-vector. [`cosine_similarity`] is the comparison +//! metric used by the [`crate::RfMemoryStore`] implementations. +//! +//! All functions are pure and deterministic — the same input always yields the +//! same bytes, with no clocks, randomness, threads or floating-point +//! reductions whose order could vary. + +use rvcsi_core::{CsiEvent, CsiEventKind, CsiWindow}; + +/// Length of a [`window_embedding`] vector. +/// +/// Layout (all indices into the returned `Vec`): +/// * `0..32` — `mean_amplitude` linearly resampled to 32 bins +/// * `32..64` — `phase_variance` linearly resampled to 32 bins +/// * `64` — `motion_energy` +/// * `65` — `presence_score` +/// * `66` — `quality_score` +/// * `67` — `ln(1 + frame_count)` +/// +/// The whole vector is then L2-normalized (left all-zero if its norm is 0, +/// e.g. for an empty window). +pub const WINDOW_EMBEDDING_DIM: usize = 68; + +/// Length of an [`event_embedding`] vector. +/// +/// Layout: +/// * `0..10` — one-hot of [`CsiEventKind`] in declaration order (see +/// [`kind_index`]) +/// * `10` — `confidence` +/// * `11` — `ln(1 + evidence_window_ids.len())` +/// +/// Event embeddings are **not** normalized (the one-hot block already gives +/// them a stable scale). +pub const EVENT_EMBEDDING_DIM: usize = 12; + +/// Number of bins each per-subcarrier vector is resampled to. +const SUBCARRIER_BINS: usize = 32; + +/// Linearly resample `src` (length `n`) to length `m`. +/// +/// * `n == 0` → `vec![0.0; m]` +/// * `n == 1` → `vec![src[0]; m]` +/// * otherwise, for each output index `j`: `pos = j * (n-1) / (m-1)`, +/// `lo = floor(pos)`, `frac = pos - lo`, value `src[lo] * (1 - frac) + +/// src[min(lo+1, n-1)] * frac`. +fn resample_linear(src: &[f32], m: usize) -> Vec { + let n = src.len(); + if n == 0 { + return vec![0.0; m]; + } + if n == 1 { + return vec![src[0]; m]; + } + if m == 0 { + return Vec::new(); + } + if m == 1 { + // Degenerate target: just take the first sample (avoids /0 below). + return vec![src[0]]; + } + let mut out = Vec::with_capacity(m); + let denom = (m - 1) as f32; + let span = (n - 1) as f32; + for j in 0..m { + let pos = j as f32 * span / denom; + let lo = pos.floor() as usize; + let frac = pos - lo as f32; + let hi = (lo + 1).min(n - 1); + out.push(src[lo] * (1.0 - frac) + src[hi] * frac); + } + out +} + +/// L2 norm of a slice (`0.0` for an empty slice). +fn l2_norm(v: &[f32]) -> f32 { + v.iter().map(|x| x * x).sum::().sqrt() +} + +/// In-place L2 normalization; leaves `v` unchanged if its norm is `0` or +/// non-finite. +fn l2_normalize(v: &mut [f32]) { + let norm = l2_norm(v); + if norm.is_finite() && norm > 0.0 { + for x in v.iter_mut() { + *x /= norm; + } + } +} + +/// Build the deterministic embedding for a [`CsiWindow`]. +/// +/// The returned vector has length [`WINDOW_EMBEDDING_DIM`]; see that constant's +/// docs for the exact bin layout. The result is L2-normalized (or all-zero for +/// an empty window — i.e. `subcarrier_count == 0` and `frame_count == 0`). +pub fn window_embedding(w: &CsiWindow) -> Vec { + let mut out = Vec::with_capacity(WINDOW_EMBEDDING_DIM); + out.extend(resample_linear(&w.mean_amplitude, SUBCARRIER_BINS)); + out.extend(resample_linear(&w.phase_variance, SUBCARRIER_BINS)); + out.push(w.motion_energy); + out.push(w.presence_score); + out.push(w.quality_score); + out.push((w.frame_count as f32).ln_1p()); + debug_assert_eq!(out.len(), WINDOW_EMBEDDING_DIM); + l2_normalize(&mut out); + out +} + +/// Fixed index of a [`CsiEventKind`] in the one-hot block of an event +/// embedding — the variant declaration order in `rvcsi_core`. +fn kind_index(k: CsiEventKind) -> usize { + match k { + CsiEventKind::PresenceStarted => 0, + CsiEventKind::PresenceEnded => 1, + CsiEventKind::MotionDetected => 2, + CsiEventKind::MotionSettled => 3, + CsiEventKind::BaselineChanged => 4, + CsiEventKind::SignalQualityDropped => 5, + CsiEventKind::DeviceDisconnected => 6, + CsiEventKind::BreathingCandidate => 7, + CsiEventKind::AnomalyDetected => 8, + CsiEventKind::CalibrationRequired => 9, + } +} + +/// Build the deterministic embedding for a [`CsiEvent`]. +/// +/// The returned vector has length [`EVENT_EMBEDDING_DIM`]; see that constant's +/// docs for the exact layout. Not normalized. +pub fn event_embedding(e: &CsiEvent) -> Vec { + let mut out = vec![0.0_f32; EVENT_EMBEDDING_DIM]; + out[kind_index(e.kind)] = 1.0; + out[10] = e.confidence; + out[11] = (e.evidence_window_ids.len() as f32).ln_1p(); + out +} + +/// Cosine similarity of two equal-length vectors. +/// +/// Returns `0.0` if the lengths differ or either vector is all-zero (or has a +/// non-finite norm); otherwise `dot(a, b) / (||a|| * ||b||)` clamped to +/// `[-1.0, 1.0]`. +pub fn cosine_similarity(a: &[f32], b: &[f32]) -> f32 { + if a.len() != b.len() || a.is_empty() { + return 0.0; + } + let na = l2_norm(a); + let nb = l2_norm(b); + if !(na.is_finite() && nb.is_finite()) || na == 0.0 || nb == 0.0 { + return 0.0; + } + let dot: f32 = a.iter().zip(b.iter()).map(|(x, y)| x * y).sum(); + (dot / (na * nb)).clamp(-1.0, 1.0) +} + +#[cfg(test)] +mod tests { + use super::*; + use rvcsi_core::{EventId, SessionId, SourceId, WindowId}; + + fn window() -> CsiWindow { + CsiWindow { + window_id: WindowId(7), + session_id: SessionId(1), + source_id: SourceId::from("emb-test"), + start_ns: 1_000, + end_ns: 2_000, + frame_count: 12, + mean_amplitude: vec![1.0, 2.0, 3.0, 4.0, 5.0], + phase_variance: vec![0.1, 0.2, 0.1, 0.3, 0.2], + motion_energy: 0.42, + presence_score: 0.8, + quality_score: 0.9, + } + } + + fn event(kind: CsiEventKind) -> CsiEvent { + CsiEvent::new( + EventId(3), + kind, + SessionId(1), + SourceId::from("emb-test"), + 5_000, + 0.75, + vec![WindowId(1), WindowId(2)], + ) + } + + #[test] + fn resample_edge_cases() { + assert_eq!(resample_linear(&[], 4), vec![0.0; 4]); + assert_eq!(resample_linear(&[2.5], 3), vec![2.5, 2.5, 2.5]); + // identity-ish: 3 -> 3 keeps endpoints + let r = resample_linear(&[0.0, 1.0, 2.0], 3); + assert!((r[0] - 0.0).abs() < 1e-6); + assert!((r[1] - 1.0).abs() < 1e-6); + assert!((r[2] - 2.0).abs() < 1e-6); + // upsample 2 -> 5 is a straight line + let r = resample_linear(&[0.0, 4.0], 5); + assert!((r[2] - 2.0).abs() < 1e-6); + } + + #[test] + fn window_embedding_is_deterministic_and_unit_length() { + let w = window(); + let a = window_embedding(&w); + let b = window_embedding(&w); + assert_eq!(a, b); + assert_eq!(a.len(), WINDOW_EMBEDDING_DIM); + let norm = l2_norm(&a); + assert!((norm - 1.0).abs() < 1e-5, "norm was {norm}"); + } + + #[test] + fn empty_window_embeds_to_zero() { + let mut w = window(); + w.mean_amplitude.clear(); + w.phase_variance.clear(); + w.motion_energy = 0.0; + w.presence_score = 0.0; + w.quality_score = 0.0; + w.frame_count = 0; + let e = window_embedding(&w); + assert_eq!(e.len(), WINDOW_EMBEDDING_DIM); + assert!(e.iter().all(|x| *x == 0.0)); + } + + #[test] + fn window_embedding_length_independent_of_subcarrier_count() { + let mut a = window(); + a.mean_amplitude = vec![1.0; 56]; + a.phase_variance = vec![0.1; 56]; + let mut b = window(); + b.mean_amplitude = vec![1.0; 234]; + b.phase_variance = vec![0.1; 234]; + assert_eq!(window_embedding(&a).len(), window_embedding(&b).len()); + } + + #[test] + fn event_embedding_layout() { + let e = event(CsiEventKind::MotionDetected); + let v = event_embedding(&e); + assert_eq!(v.len(), EVENT_EMBEDDING_DIM); + assert_eq!(v[kind_index(CsiEventKind::MotionDetected)], 1.0); + // exactly one hot in the first 10 + assert_eq!(v[..10].iter().filter(|x| **x == 1.0).count(), 1); + assert!((v[10] - 0.75).abs() < 1e-6); + assert!((v[11] - (2.0_f32).ln_1p()).abs() < 1e-6); + + // a different kind lights a different bin + let v2 = event_embedding(&event(CsiEventKind::AnomalyDetected)); + assert_eq!(v2[kind_index(CsiEventKind::AnomalyDetected)], 1.0); + assert_ne!(v, v2); + } + + #[test] + fn cosine_basic_identities() { + let v = window_embedding(&window()); + assert!((cosine_similarity(&v, &v) - 1.0).abs() < 1e-5); + let neg: Vec = v.iter().map(|x| -x).collect(); + assert!((cosine_similarity(&v, &neg) + 1.0).abs() < 1e-5); + // mismatched lengths -> 0 + assert_eq!(cosine_similarity(&v, &v[..3]), 0.0); + // all-zero -> 0 + assert_eq!(cosine_similarity(&[0.0; 4], &[1.0; 4]), 0.0); + assert_eq!(cosine_similarity(&[], &[]), 0.0); + } +} diff --git a/v2/crates/rvcsi-ruvector/src/jsonl.rs b/v2/crates/rvcsi-ruvector/src/jsonl.rs new file mode 100644 index 00000000..0bb12e07 --- /dev/null +++ b/v2/crates/rvcsi-ruvector/src/jsonl.rs @@ -0,0 +1,396 @@ +//! [`JsonlRfMemory`] — a file-backed [`RfMemoryStore`]. +//! +//! The store is a [JSONL] file: each line is one JSON object that is *either* a +//! stored record: +//! +//! ```json +//! {"record":{"id":3,"kind":"Window","source_id":"esp32","timestamp_ns":1700,"embedding":[0.1,0.2]}} +//! ``` +//! +//! or a baseline write: +//! +//! ```json +//! {"baseline":{"room":"livingroom","version":"v3","embedding":[0.1,0.2]}} +//! ``` +//! +//! Opening replays every line into an in-memory index identical to +//! [`crate::InMemoryRfMemory`], so queries are all in-memory; `store_*` / +//! `set_baseline` append a line (and `flush`) so a crash loses at most the +//! line currently being written. The **last** baseline line for a room wins. +//! +//! [JSONL]: https://jsonlines.org/ + +use std::fs::{File, OpenOptions}; +use std::io::{BufRead, BufReader, BufWriter, Write}; +use std::path::{Path, PathBuf}; + +use serde::{Deserialize, Serialize}; + +use rvcsi_core::{CsiEvent, CsiWindow, RvcsiError, SourceId}; + +use crate::embedding::{event_embedding, window_embedding}; +use crate::memory::{IndexRecord, RfIndex}; +use crate::store::{DriftReport, EmbeddingId, RecordKind, RfMemoryStore, SimilarHit}; + +/// On-disk shape of a stored record line. +#[derive(Debug, Clone, Serialize, Deserialize)] +struct RecordLine { + id: u64, + kind: RecordKind, + source_id: SourceId, + timestamp_ns: u64, + embedding: Vec, +} + +/// On-disk shape of a baseline line. +#[derive(Debug, Clone, Serialize, Deserialize)] +struct BaselineLine { + room: String, + version: String, + embedding: Vec, +} + +/// One line in the JSONL store — exactly one field is present. +#[derive(Debug, Clone, Serialize, Deserialize)] +struct StoreLine { + #[serde(skip_serializing_if = "Option::is_none", default)] + record: Option, + #[serde(skip_serializing_if = "Option::is_none", default)] + baseline: Option, +} + +impl StoreLine { + fn record(r: RecordLine) -> Self { + StoreLine { + record: Some(r), + baseline: None, + } + } + fn baseline(b: BaselineLine) -> Self { + StoreLine { + record: None, + baseline: Some(b), + } + } +} + +/// A file-backed [`RfMemoryStore`]. See the module docs for the on-disk format. +#[derive(Debug)] +pub struct JsonlRfMemory { + path: PathBuf, + writer: BufWriter, + index: RfIndex, +} + +impl JsonlRfMemory { + /// Create a new, empty store at `path`, truncating any existing file. + pub fn create(path: impl AsRef) -> Result { + let path = path.as_ref().to_path_buf(); + let file = File::create(&path)?; + Ok(JsonlRfMemory { + path, + writer: BufWriter::new(file), + index: RfIndex::new(), + }) + } + + /// Open an existing store at `path`, replaying every line into the + /// in-memory index, then positioning for appends. The file must exist (use + /// [`JsonlRfMemory::create`] otherwise). + pub fn open(path: impl AsRef) -> Result { + let path = path.as_ref().to_path_buf(); + let mut index = RfIndex::new(); + { + let file = File::open(&path)?; + let reader = BufReader::new(file); + for (i, line) in reader.lines().enumerate() { + let line = line?; + let trimmed = line.trim(); + if trimmed.is_empty() { + continue; + } + let parsed: StoreLine = serde_json::from_str(trimmed).map_err(|e| { + RvcsiError::parse(i + 1, format!("invalid RF-memory line {}: {e}", i + 1)) + })?; + match (parsed.record, parsed.baseline) { + (Some(r), None) => index.insert(IndexRecord { + id: EmbeddingId(r.id), + kind: r.kind, + source_id: r.source_id, + timestamp_ns: r.timestamp_ns, + embedding: r.embedding, + }), + (None, Some(b)) => index.set_baseline(&b.room, &b.version, b.embedding), + _ => { + return Err(RvcsiError::parse( + i + 1, + format!("RF-memory line {} must have exactly one of 'record'/'baseline'", i + 1), + )) + } + } + } + } + let file = OpenOptions::new().append(true).open(&path)?; + Ok(JsonlRfMemory { + path, + writer: BufWriter::new(file), + index, + }) + } + + /// Path the store is backed by. + pub fn path(&self) -> &Path { + &self.path + } + + /// Flush buffered writes to disk. + pub fn flush(&mut self) -> Result<(), RvcsiError> { + self.writer.flush()?; + Ok(()) + } + + fn append_line(&mut self, line: &StoreLine) -> Result<(), RvcsiError> { + serde_json::to_writer(&mut self.writer, line)?; + self.writer.write_all(b"\n")?; + self.writer.flush()?; + Ok(()) + } + + fn append_record( + &mut self, + kind: RecordKind, + source_id: SourceId, + timestamp_ns: u64, + embedding: Vec, + ) -> Result { + let id = self.index.mint_id(); + self.append_line(&StoreLine::record(RecordLine { + id: id.0, + kind, + source_id: source_id.clone(), + timestamp_ns, + embedding: embedding.clone(), + }))?; + self.index.insert(IndexRecord { + id, + kind, + source_id, + timestamp_ns, + embedding, + }); + Ok(id) + } +} + +impl RfMemoryStore for JsonlRfMemory { + fn store_window(&mut self, w: &CsiWindow) -> Result { + self.append_record( + RecordKind::Window, + w.source_id.clone(), + w.start_ns, + window_embedding(w), + ) + } + + fn store_event(&mut self, e: &CsiEvent) -> Result { + self.append_record( + RecordKind::Event, + e.source_id.clone(), + e.timestamp_ns, + event_embedding(e), + ) + } + + fn query_similar(&self, query: &[f32], k: usize) -> Result, RvcsiError> { + Ok(self.index.query_similar(query, k)) + } + + fn set_baseline( + &mut self, + room: &str, + version: &str, + embedding: Vec, + ) -> Result<(), RvcsiError> { + self.append_line(&StoreLine::baseline(BaselineLine { + room: room.to_string(), + version: version.to_string(), + embedding: embedding.clone(), + }))?; + self.index.set_baseline(room, version, embedding); + Ok(()) + } + + fn compute_drift( + &self, + room: &str, + current: &[f32], + threshold: f32, + ) -> Result, RvcsiError> { + Ok(self.index.compute_drift(room, current, threshold)) + } + + fn len(&self) -> usize { + self.index.len() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::embedding::window_embedding; + use rvcsi_core::{CsiEventKind, EventId, SessionId, WindowId}; + + fn window(id: u64, amp: f32) -> CsiWindow { + CsiWindow { + window_id: WindowId(id), + session_id: SessionId(1), + source_id: SourceId::from(format!("src-{id}").as_str()), + start_ns: 1_000 + id, + end_ns: 2_000 + id, + frame_count: 10, + mean_amplitude: vec![amp, amp + 1.0, amp + 2.0], + phase_variance: vec![0.1, 0.2, 0.1], + motion_energy: amp / 5.0, + presence_score: 0.6, + quality_score: 0.9, + } + } + + fn event() -> CsiEvent { + CsiEvent::new( + EventId(0), + CsiEventKind::MotionDetected, + SessionId(1), + SourceId::from("ev"), + 9_000, + 0.7, + vec![WindowId(1), WindowId(2)], + ) + } + + #[test] + fn persist_and_reopen() { + let dir = tempfile::tempdir().unwrap(); + let path = dir.path().join("rf.jsonl"); + + let w1 = window(0, 1.0); + let w2 = window(1, 50.0); + let e = event(); + let base_emb = window_embedding(&window(7, 5.0)); + { + let mut mem = JsonlRfMemory::create(&path).unwrap(); + mem.store_window(&w1).unwrap(); + mem.store_window(&w2).unwrap(); + mem.store_event(&e).unwrap(); + mem.set_baseline("room1", "v1", base_emb.clone()).unwrap(); + mem.flush().unwrap(); + } + + let reopened = JsonlRfMemory::open(&path).unwrap(); + assert_eq!(reopened.len(), 3); + let hits = reopened.query_similar(&window_embedding(&w1), 3).unwrap(); + assert!((hits[0].score - 1.0).abs() < 1e-5); + let ev_hits = reopened.query_similar(&crate::embedding::event_embedding(&e), 1).unwrap(); + assert_eq!(ev_hits[0].kind, RecordKind::Event); + + // baseline persisted + let drift = reopened.compute_drift("room1", &base_emb, 0.1).unwrap().unwrap(); + assert_eq!(drift.baseline_version, "v1"); + assert!(!drift.exceeded); + assert!(drift.distance < 1e-5); + assert!(reopened.compute_drift("other", &base_emb, 0.1).unwrap().is_none()); + } + + #[test] + fn newer_baseline_wins_after_reopen() { + let dir = tempfile::tempdir().unwrap(); + let path = dir.path().join("rf.jsonl"); + let v1_emb = window_embedding(&window(1, 1.0)); + let v2_emb = window_embedding(&window(2, 2.0)); + { + let mut mem = JsonlRfMemory::create(&path).unwrap(); + mem.set_baseline("r", "v1", v1_emb.clone()).unwrap(); + mem.flush().unwrap(); + } + { + let mut mem = JsonlRfMemory::open(&path).unwrap(); + mem.set_baseline("r", "v2", v2_emb.clone()).unwrap(); + mem.flush().unwrap(); + } + let reopened = JsonlRfMemory::open(&path).unwrap(); + let drift = reopened.compute_drift("r", &v2_emb, 0.5).unwrap().unwrap(); + assert_eq!(drift.baseline_version, "v2"); + assert!(drift.distance < 1e-5); + assert!(!drift.exceeded); + } + + #[test] + fn ids_stay_unique_across_reopen() { + let dir = tempfile::tempdir().unwrap(); + let path = dir.path().join("rf.jsonl"); + let (id0, id1); + { + let mut mem = JsonlRfMemory::create(&path).unwrap(); + id0 = mem.store_window(&window(0, 1.0)).unwrap(); + id1 = mem.store_window(&window(1, 2.0)).unwrap(); + mem.flush().unwrap(); + } + assert_eq!(id0, EmbeddingId(0)); + assert_eq!(id1, EmbeddingId(1)); + let id2 = { + let mut mem = JsonlRfMemory::open(&path).unwrap(); + mem.store_window(&window(2, 3.0)).unwrap() + }; + assert_eq!(id2, EmbeddingId(2)); + assert_eq!(JsonlRfMemory::open(&path).unwrap().len(), 3); + } + + #[test] + fn open_missing_file_is_io_error() { + match JsonlRfMemory::open("/no/such/rf/store.jsonl") { + Err(RvcsiError::Io(_)) => {} + other => panic!("expected Io error, got {other:?}"), + } + } + + #[test] + fn garbage_line_is_parse_error_with_line_number() { + let dir = tempfile::tempdir().unwrap(); + let path = dir.path().join("rf.jsonl"); + { + let mut mem = JsonlRfMemory::create(&path).unwrap(); + mem.store_window(&window(0, 1.0)).unwrap(); + mem.flush().unwrap(); + } + // append a garbage line manually + { + use std::io::Write as _; + let mut f = OpenOptions::new().append(true).open(&path).unwrap(); + f.write_all(b"{not valid}\n").unwrap(); + } + match JsonlRfMemory::open(&path) { + Err(RvcsiError::Parse { offset, .. }) => assert_eq!(offset, 2), + other => panic!("expected Parse at line 2, got {other:?}"), + } + } + + #[test] + fn determinism_across_rebuilds() { + let dir = tempfile::tempdir().unwrap(); + let build = |name: &str| { + let path = dir.path().join(name); + let mut mem = JsonlRfMemory::create(&path).unwrap(); + for i in 0..4 { + mem.store_window(&window(i, (i as f32 + 1.0) * 2.0)).unwrap(); + } + mem.set_baseline("r", "v1", window_embedding(&window(0, 1.0))).unwrap(); + mem.flush().unwrap(); + JsonlRfMemory::open(&path).unwrap() + }; + let a = build("a.jsonl"); + let b = build("b.jsonl"); + assert_eq!(a.len(), b.len()); + let q = window_embedding(&window(1, 4.0)); + assert_eq!(a.query_similar(&q, 4).unwrap(), b.query_similar(&q, 4).unwrap()); + } +} diff --git a/v2/crates/rvcsi-ruvector/src/lib.rs b/v2/crates/rvcsi-ruvector/src/lib.rs index 643e0e27..7a8ff3f1 100644 --- a/v2/crates/rvcsi-ruvector/src/lib.rs +++ b/v2/crates/rvcsi-ruvector/src/lib.rs @@ -1,8 +1,58 @@ -//! # rvCSI RuVector bridge (skeleton — implemented by the rvcsi-ruvector swarm agent) +//! # rvCSI RuVector bridge //! //! Exports temporal RF embeddings + event metadata as a queryable RF-memory //! store (ADR-095 FR8, D8). -#![forbid(unsafe_code)] +//! +//! This crate is a **standin** for the production RuVector vector-database +//! binding (which gets wired in later). It provides: +//! +//! * deterministic, dependency-free embedding functions — +//! [`window_embedding`] / [`event_embedding`] / [`cosine_similarity`]; +//! * the [`RfMemoryStore`] trait plus value objects ([`EmbeddingId`], +//! [`RecordKind`], [`SimilarHit`], [`DriftReport`]); +//! * two implementations: the in-process [`InMemoryRfMemory`] and the +//! file-backed [`JsonlRfMemory`] (JSONL append log, identical query semantics). +//! +//! Everything here is pure and deterministic given the same sequence of +//! operations — no clocks, randomness, or order-dependent reductions — so +//! captures replayed twice yield byte-identical stores and query results. +//! +//! ``` +//! use rvcsi_ruvector::{InMemoryRfMemory, RfMemoryStore, window_embedding}; +//! use rvcsi_core::{CsiWindow, SessionId, SourceId, WindowId}; +//! +//! let w = CsiWindow { +//! window_id: WindowId(0), +//! session_id: SessionId(1), +//! source_id: SourceId::from("esp32"), +//! start_ns: 1_000, +//! end_ns: 2_000, +//! frame_count: 10, +//! mean_amplitude: vec![1.0, 2.0, 3.0], +//! phase_variance: vec![0.1, 0.2, 0.1], +//! motion_energy: 0.3, +//! presence_score: 0.7, +//! quality_score: 0.9, +//! }; +//! let mut mem = InMemoryRfMemory::new(); +//! let id = mem.store_window(&w).unwrap(); +//! let hits = mem.query_similar(&window_embedding(&w), 1).unwrap(); +//! assert_eq!(hits[0].id, id); +//! assert!((hits[0].score - 1.0).abs() < 1e-5); +//! ``` -/// Placeholder so the crate compiles before the agent fills it in. -pub fn __rvcsi_ruvector_placeholder() {} +#![forbid(unsafe_code)] +#![warn(missing_docs)] + +mod embedding; +mod jsonl; +mod memory; +mod store; + +pub use embedding::{ + cosine_similarity, event_embedding, window_embedding, EVENT_EMBEDDING_DIM, + WINDOW_EMBEDDING_DIM, +}; +pub use jsonl::JsonlRfMemory; +pub use memory::InMemoryRfMemory; +pub use store::{DriftReport, EmbeddingId, RecordKind, RfMemoryStore, SimilarHit}; diff --git a/v2/crates/rvcsi-ruvector/src/memory.rs b/v2/crates/rvcsi-ruvector/src/memory.rs new file mode 100644 index 00000000..545945cc --- /dev/null +++ b/v2/crates/rvcsi-ruvector/src/memory.rs @@ -0,0 +1,313 @@ +//! [`InMemoryRfMemory`] — an in-process [`RfMemoryStore`] backed by plain +//! `Vec`s. Also defines the shared [`RfIndex`] used by the file-backed store. + +use std::collections::HashMap; + +use rvcsi_core::{CsiEvent, CsiWindow, RvcsiError, SourceId}; + +use crate::embedding::{cosine_similarity, event_embedding, window_embedding}; +use crate::store::{DriftReport, EmbeddingId, RecordKind, RfMemoryStore, SimilarHit}; + +/// One stored record inside an [`RfIndex`]. +#[derive(Debug, Clone, PartialEq)] +pub(crate) struct IndexRecord { + pub(crate) id: EmbeddingId, + pub(crate) kind: RecordKind, + pub(crate) source_id: SourceId, + pub(crate) timestamp_ns: u64, + pub(crate) embedding: Vec, +} + +/// The in-memory index that both [`InMemoryRfMemory`] and the file-backed store +/// build queries on top of. Holds records (with monotonic ids) and the latest +/// baseline per room. +#[derive(Debug, Default, Clone)] +pub(crate) struct RfIndex { + records: Vec, + /// room -> (version, embedding); the most recently set wins. + baselines: HashMap)>, + next_id: u64, +} + +impl RfIndex { + pub(crate) fn new() -> Self { + RfIndex::default() + } + + pub(crate) fn mint_id(&mut self) -> EmbeddingId { + let id = EmbeddingId(self.next_id); + self.next_id += 1; + id + } + + /// Insert an already-built record. The record's `id` must come from + /// [`RfIndex::mint_id`] (or be a replay of a previously-minted id, in which + /// case `next_id` is advanced past it so future mints stay unique). + pub(crate) fn insert(&mut self, rec: IndexRecord) { + if rec.id.0 >= self.next_id { + self.next_id = rec.id.0 + 1; + } + self.records.push(rec); + } + + pub(crate) fn set_baseline(&mut self, room: &str, version: &str, embedding: Vec) { + self.baselines + .insert(room.to_string(), (version.to_string(), embedding)); + } + + pub(crate) fn len(&self) -> usize { + self.records.len() + } + + pub(crate) fn query_similar(&self, query: &[f32], k: usize) -> Vec { + if k == 0 { + return Vec::new(); + } + let mut scored: Vec<(usize, f32)> = self + .records + .iter() + .enumerate() + .map(|(i, r)| (i, cosine_similarity(query, &r.embedding))) + .collect(); + // Deterministic sort: by score desc, ties broken by record id asc. + scored.sort_by(|(ia, sa), (ib, sb)| { + sb.partial_cmp(sa) + .unwrap_or(std::cmp::Ordering::Equal) + .then(self.records[*ia].id.cmp(&self.records[*ib].id)) + }); + scored + .into_iter() + .take(k) + .map(|(i, score)| { + let r = &self.records[i]; + SimilarHit { + id: r.id, + score, + kind: r.kind, + source_id: r.source_id.clone(), + timestamp_ns: r.timestamp_ns, + } + }) + .collect() + } + + pub(crate) fn compute_drift( + &self, + room: &str, + current: &[f32], + threshold: f32, + ) -> Option { + let (version, baseline) = self.baselines.get(room)?; + let distance = 1.0 - cosine_similarity(baseline, current); + Some(DriftReport { + room: room.to_string(), + baseline_version: version.clone(), + distance, + threshold, + exceeded: distance > threshold, + }) + } +} + +/// An entirely in-process [`RfMemoryStore`] — no persistence. +/// +/// Useful for tests, ephemeral runs, and as the query engine behind the +/// file-backed [`crate::JsonlRfMemory`]. +#[derive(Debug, Default, Clone)] +pub struct InMemoryRfMemory { + index: RfIndex, +} + +impl InMemoryRfMemory { + /// A fresh, empty store. + pub fn new() -> Self { + InMemoryRfMemory { + index: RfIndex::new(), + } + } +} + +impl RfMemoryStore for InMemoryRfMemory { + fn store_window(&mut self, w: &CsiWindow) -> Result { + let id = self.index.mint_id(); + self.index.insert(IndexRecord { + id, + kind: RecordKind::Window, + source_id: w.source_id.clone(), + timestamp_ns: w.start_ns, + embedding: window_embedding(w), + }); + Ok(id) + } + + fn store_event(&mut self, e: &CsiEvent) -> Result { + let id = self.index.mint_id(); + self.index.insert(IndexRecord { + id, + kind: RecordKind::Event, + source_id: e.source_id.clone(), + timestamp_ns: e.timestamp_ns, + embedding: event_embedding(e), + }); + Ok(id) + } + + fn query_similar(&self, query: &[f32], k: usize) -> Result, RvcsiError> { + Ok(self.index.query_similar(query, k)) + } + + fn set_baseline( + &mut self, + room: &str, + version: &str, + embedding: Vec, + ) -> Result<(), RvcsiError> { + self.index.set_baseline(room, version, embedding); + Ok(()) + } + + fn compute_drift( + &self, + room: &str, + current: &[f32], + threshold: f32, + ) -> Result, RvcsiError> { + Ok(self.index.compute_drift(room, current, threshold)) + } + + fn len(&self) -> usize { + self.index.len() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use rvcsi_core::{CsiEventKind, EventId, SessionId, SourceId, WindowId}; + + fn window(id: u64, amp: f32) -> CsiWindow { + CsiWindow { + window_id: WindowId(id), + session_id: SessionId(1), + source_id: SourceId::from(format!("src-{id}").as_str()), + start_ns: 1_000 + id, + end_ns: 2_000 + id, + frame_count: 10 + id as u32, + mean_amplitude: vec![amp, amp + 1.0, amp + 2.0, amp + 3.0], + phase_variance: vec![0.1, 0.2, 0.1, 0.05], + motion_energy: amp / 10.0, + presence_score: 0.5, + quality_score: 0.9, + } + } + + fn event() -> CsiEvent { + CsiEvent::new( + EventId(0), + CsiEventKind::PresenceStarted, + SessionId(1), + SourceId::from("ev"), + 9_000, + 0.8, + vec![WindowId(1)], + ) + } + + #[test] + fn store_and_query_windows() { + let mut mem = InMemoryRfMemory::new(); + let w1 = window(0, 1.0); + let w2 = window(1, 50.0); + let w3 = window(2, 100.0); + let id1 = mem.store_window(&w1).unwrap(); + mem.store_window(&w2).unwrap(); + mem.store_window(&w3).unwrap(); + assert_eq!(mem.len(), 3); + assert!(!mem.is_empty()); + + let q = window_embedding(&w1); + let hits = mem.query_similar(&q, 3).unwrap(); + assert_eq!(hits.len(), 3); + assert_eq!(hits[0].id, id1); + assert_eq!(hits[0].kind, RecordKind::Window); + assert!((hits[0].score - 1.0).abs() < 1e-5); + // descending + assert!(hits[0].score >= hits[1].score); + assert!(hits[1].score >= hits[2].score); + } + + #[test] + fn store_and_query_event() { + let mut mem = InMemoryRfMemory::new(); + mem.store_window(&window(0, 1.0)).unwrap(); + let e = event(); + let eid = mem.store_event(&e).unwrap(); + let hits = mem.query_similar(&event_embedding(&e), 1).unwrap(); + assert_eq!(hits.len(), 1); + assert_eq!(hits[0].id, eid); + assert_eq!(hits[0].kind, RecordKind::Event); + assert!((hits[0].score - 1.0).abs() < 1e-5); + assert_eq!(hits[0].timestamp_ns, 9_000); + } + + #[test] + fn baseline_drift() { + let mut mem = InMemoryRfMemory::new(); + let base = window(0, 10.0); + let base_emb = window_embedding(&base); + mem.set_baseline("room1", "v1", base_emb.clone()).unwrap(); + + // near-identical: tiny perturbation + let mut near = base.clone(); + near.motion_energy += 0.001; + let near_emb = window_embedding(&near); + let r = mem.compute_drift("room1", &near_emb, 0.2).unwrap().unwrap(); + assert_eq!(r.room, "room1"); + assert_eq!(r.baseline_version, "v1"); + assert!(!r.exceeded, "distance was {}", r.distance); + + // very different + let far_emb = window_embedding(&window(9, 1_000.0)); + let r2 = mem.compute_drift("room1", &far_emb, 0.001).unwrap().unwrap(); + assert!(r2.exceeded, "distance was {}", r2.distance); + + // unknown room + assert!(mem.compute_drift("nope", &near_emb, 0.2).unwrap().is_none()); + } + + #[test] + fn replaying_baseline_keeps_latest() { + let mut mem = InMemoryRfMemory::new(); + mem.set_baseline("r", "v1", window_embedding(&window(0, 1.0))) + .unwrap(); + let v2_emb = window_embedding(&window(1, 2.0)); + mem.set_baseline("r", "v2", v2_emb.clone()).unwrap(); + let r = mem.compute_drift("r", &v2_emb, 0.5).unwrap().unwrap(); + assert_eq!(r.baseline_version, "v2"); + assert!(!r.exceeded); + assert!(r.distance < 1e-5); + } + + #[test] + fn deterministic_across_rebuilds() { + let build = || { + let mut m = InMemoryRfMemory::new(); + for i in 0..5 { + m.store_window(&window(i, (i as f32 + 1.0) * 3.0)).unwrap(); + } + m + }; + let a = build(); + let b = build(); + assert_eq!(a.len(), b.len()); + let q = window_embedding(&window(2, 9.0)); + assert_eq!(a.query_similar(&q, 5).unwrap(), b.query_similar(&q, 5).unwrap()); + } + + #[test] + fn k_zero_returns_empty() { + let mut m = InMemoryRfMemory::new(); + m.store_window(&window(0, 1.0)).unwrap(); + assert!(m.query_similar(&window_embedding(&window(0, 1.0)), 0).unwrap().is_empty()); + } +} diff --git a/v2/crates/rvcsi-ruvector/src/store.rs b/v2/crates/rvcsi-ruvector/src/store.rs new file mode 100644 index 00000000..c762b04e --- /dev/null +++ b/v2/crates/rvcsi-ruvector/src/store.rs @@ -0,0 +1,148 @@ +//! The [`RfMemoryStore`] trait and its value objects. +//! +//! An RF-memory store keeps embeddings of [`CsiWindow`](rvcsi_core::CsiWindow)s +//! and [`CsiEvent`](rvcsi_core::CsiEvent)s plus per-room baseline embeddings, +//! and answers similarity / drift queries over them. This is a standin for the +//! production RuVector binding (ADR-095 FR8, D8) — see the crate docs. + +use serde::{Deserialize, Serialize}; + +use rvcsi_core::{CsiEvent, CsiWindow, RvcsiError, SourceId}; + +/// Identifier minted for each stored embedding (monotonic within a store). +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)] +pub struct EmbeddingId(pub u64); + +impl EmbeddingId { + /// The raw integer value. + #[inline] + pub const fn value(self) -> u64 { + self.0 + } +} + +impl From for EmbeddingId { + #[inline] + fn from(v: u64) -> Self { + EmbeddingId(v) + } +} + +/// Which kind of record an embedding came from. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +pub enum RecordKind { + /// Embedding of a [`CsiWindow`](rvcsi_core::CsiWindow). + Window, + /// Embedding of a [`CsiEvent`](rvcsi_core::CsiEvent). + Event, +} + +/// One hit returned by [`RfMemoryStore::query_similar`]. +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct SimilarHit { + /// Id of the matched stored embedding. + pub id: EmbeddingId, + /// Cosine similarity to the query in `[-1.0, 1.0]`. + pub score: f32, + /// Whether the matched record was a window or an event. + pub kind: RecordKind, + /// Source the matched record came from. + pub source_id: SourceId, + /// Timestamp of the matched record (ns). + pub timestamp_ns: u64, +} + +/// Result of a baseline-drift comparison ([`RfMemoryStore::compute_drift`]). +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct DriftReport { + /// Room the baseline belongs to. + pub room: String, + /// Baseline version that was compared against. + pub baseline_version: String, + /// Cosine *distance* `1 - cosine_similarity(baseline, current)` in `[0.0, 2.0]`. + pub distance: f32, + /// Threshold the distance was compared against. + pub threshold: f32, + /// Whether `distance > threshold`. + pub exceeded: bool, +} + +/// A queryable RF-memory store: append window/event embeddings, search by +/// cosine similarity, and track per-room baseline drift. +/// +/// Implementations are deterministic given the same sequence of operations. +pub trait RfMemoryStore { + /// Store the embedding of `w`, returning its newly-minted id. + fn store_window(&mut self, w: &CsiWindow) -> Result; + + /// Store the embedding of `e`, returning its newly-minted id. + fn store_event(&mut self, e: &CsiEvent) -> Result; + + /// Return up to `k` stored records most similar to `query`, by descending + /// cosine similarity. Records whose embedding length differs from `query` + /// (e.g. events vs. window queries) score `0.0` and so sort last. + fn query_similar(&self, query: &[f32], k: usize) -> Result, RvcsiError>; + + /// Set (or replace) the baseline embedding for `room` at `version`. + fn set_baseline( + &mut self, + room: &str, + version: &str, + embedding: Vec, + ) -> Result<(), RvcsiError>; + + /// Compare `current` against `room`'s baseline. Returns `None` if there is + /// no baseline for `room`, otherwise a [`DriftReport`] with + /// `distance = 1 - cosine_similarity(baseline, current)` and + /// `exceeded = distance > threshold`. + fn compute_drift( + &self, + room: &str, + current: &[f32], + threshold: f32, + ) -> Result, RvcsiError>; + + /// Number of stored records (windows + events; baselines are not counted). + fn len(&self) -> usize; + + /// Whether [`RfMemoryStore::len`] is zero. + fn is_empty(&self) -> bool { + self.len() == 0 + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn embedding_id_roundtrips() { + let id = EmbeddingId::from(42); + assert_eq!(id.value(), 42); + let json = serde_json::to_string(&id).unwrap(); + assert_eq!(serde_json::from_str::(&json).unwrap(), id); + } + + #[test] + fn value_objects_serde() { + let hit = SimilarHit { + id: EmbeddingId(1), + score: 0.9, + kind: RecordKind::Window, + source_id: SourceId::from("s"), + timestamp_ns: 5, + }; + let json = serde_json::to_string(&hit).unwrap(); + assert_eq!(serde_json::from_str::(&json).unwrap(), hit); + + let d = DriftReport { + room: "lab".into(), + baseline_version: "v1".into(), + distance: 0.1, + threshold: 0.2, + exceeded: false, + }; + let json = serde_json::to_string(&d).unwrap(); + assert_eq!(serde_json::from_str::(&json).unwrap(), d); + } +}