//! A streaming capture runtime: a [`CsiSource`](rvcsi_core::CsiSource) + the DSP //! stage + the event pipeline, wired together. The `rvcsi-node` napi-rs //! `RvcsiRuntime` class is a thin `#[napi]` wrapper around [`CaptureRuntime`]. use rvcsi_adapter_file::FileReplayAdapter; use rvcsi_adapter_nexmon::NexmonAdapter; use rvcsi_core::{ validate_frame, AdapterProfile, CsiEvent, CsiFrame, CsiSource, RvcsiError, SessionId, SourceHealth, SourceId, ValidationPolicy, ValidationStatus, }; use rvcsi_dsp::SignalPipeline; use rvcsi_events::EventPipeline; /// Owns a source and the per-frame processing chain. /// /// `next_validated_frame` pulls from the source and guarantees the returned /// frame is *exposable* (Accepted/Degraded/Recovered) — frames that arrive /// `Pending` are validated against the source's profile, and hard-rejected /// frames are skipped (never surfaced). `drain_events` runs the remainder of the /// stream through `SignalPipeline` + `EventPipeline`. pub struct CaptureRuntime { source: Box, profile: AdapterProfile, policy: ValidationPolicy, dsp: SignalPipeline, events: EventPipeline, prev_ts: Option, frames_seen: u64, frames_dropped: u64, } impl CaptureRuntime { fn new(source: Box, policy: ValidationPolicy) -> Self { let profile = source.profile().clone(); let session_id = source.session_id(); let source_id = source.source_id().clone(); CaptureRuntime { source, profile, policy, dsp: SignalPipeline::default(), events: EventPipeline::with_defaults(session_id, source_id), prev_ts: None, frames_seen: 0, frames_dropped: 0, } } /// Open a `.rvcsi` capture file as the source. pub fn open_capture_file(path: &str) -> Result { let source = FileReplayAdapter::open(path)?; Ok(Self::new(Box::new(source), ValidationPolicy::default())) } /// Open a buffer of "rvCSI Nexmon records" (the napi-c shim format) as the source. pub fn open_nexmon_bytes(bytes: Vec, source_id: &str, session_id: u64) -> Self { let source = NexmonAdapter::from_bytes(SourceId::from(source_id), SessionId(session_id), bytes); // Permissive policy: the C-shim records may carry non-default subcarrier counts. Self::new(Box::new(source), ValidationPolicy::default()) } /// Open a Nexmon capture *file* (concatenated records) as the source. pub fn open_nexmon_file(path: &str, source_id: &str, session_id: u64) -> Result { let bytes = std::fs::read(path)?; Ok(Self::open_nexmon_bytes(bytes, source_id, session_id)) } /// Open a real nexmon_csi `.pcap` capture (`tcpdump -i wlan0 dst port 5500 -w …`) /// as the source. `port` is the CSI UDP port (`None` ⇒ 5500). pub fn open_nexmon_pcap( path: &str, source_id: &str, session_id: u64, port: Option, ) -> Result { let source = rvcsi_adapter_nexmon::NexmonPcapAdapter::open( SourceId::from(source_id), SessionId(session_id), path, port, )?; Ok(Self::new(Box::new(source), ValidationPolicy::default())) } /// Open a real nexmon_csi `.pcap` from an in-memory byte buffer. pub fn open_nexmon_pcap_bytes( pcap_bytes: &[u8], source_id: &str, session_id: u64, port: Option, ) -> Result { let source = rvcsi_adapter_nexmon::NexmonPcapAdapter::parse( SourceId::from(source_id), SessionId(session_id), pcap_bytes, port, )?; Ok(Self::new(Box::new(source), ValidationPolicy::default())) } /// Validate (if needed) a freshly pulled frame; `None` if it was hard-rejected. fn admit(&mut self, mut frame: CsiFrame) -> Option { self.frames_seen += 1; if frame.validation == ValidationStatus::Pending { let ts = frame.timestamp_ns; match validate_frame(&mut frame, &self.profile, &self.policy, self.prev_ts) { Ok(()) if frame.is_exposable() => { self.prev_ts = Some(ts); Some(frame) } _ => { self.frames_dropped += 1; None } } } else if frame.is_exposable() { Some(frame) } else { self.frames_dropped += 1; None } } /// Pull the next exposable frame, validating it if necessary. `Ok(None)` at /// end-of-stream. The frame's `amplitude`/`phase` are NOT yet DSP-cleaned /// (call [`CaptureRuntime::next_clean_frame`] for that). pub fn next_validated_frame(&mut self) -> Result, RvcsiError> { loop { match self.source.next_frame()? { None => return Ok(None), Some(frame) => { if let Some(f) = self.admit(frame) { return Ok(Some(f)); } } } } } /// Like [`CaptureRuntime::next_validated_frame`] but with `SignalPipeline` /// applied (DC removal, phase unwrap, Hampel filter, smoothing). pub fn next_clean_frame(&mut self) -> Result, RvcsiError> { match self.next_validated_frame()? { None => Ok(None), Some(mut f) => { self.dsp.process_frame(&mut f); Ok(Some(f)) } } } /// Drain the rest of the stream through DSP + the event pipeline and return /// every emitted event (in order). pub fn drain_events(&mut self) -> Result, RvcsiError> { let mut out = Vec::new(); while let Some(mut f) = self.next_validated_frame()? { self.dsp.process_frame(&mut f); out.extend(self.events.process_frame(&f)); } out.extend(self.events.flush()); Ok(out) } /// Health snapshot combining the source's view and the runtime's counters. pub fn health(&self) -> SourceHealth { let mut h = self.source.health(); // Augment the status with the runtime's drop count. let extra = format!("frames_seen={}, frames_dropped={}", self.frames_seen, self.frames_dropped); h.status = Some(match h.status { Some(s) => format!("{s}; {extra}"), None => extra, }); h } /// Frames pulled from the source so far. pub fn frames_seen(&self) -> u64 { self.frames_seen } /// Frames dropped by validation so far. pub fn frames_dropped(&self) -> u64 { self.frames_dropped } } #[cfg(test)] mod tests { use super::*; use rvcsi_adapter_file::{CaptureHeader, FileRecorder}; use rvcsi_adapter_nexmon::{encode_record, NexmonRecord}; use rvcsi_core::{AdapterKind, FrameId}; fn write_capture(path: &std::path::Path, n: usize) { let header = CaptureHeader::new( SessionId(1), SourceId::from("rt"), AdapterProfile::offline(AdapterKind::File), ); let mut rec = FileRecorder::create(path, &header).unwrap(); for k in 0..n { let amp_scale = if (k / 8) % 2 == 0 { 0.0 } else { 1.5 }; let i: Vec = (0..32).map(|s| 1.0 + amp_scale * (((k + s) % 5) as f32 - 2.0)).collect(); let q: Vec = (0..32).map(|_| 0.5).collect(); let mut f = CsiFrame::from_iq( FrameId(k as u64), SessionId(1), SourceId::from("rt"), AdapterKind::File, 1_000 + k as u64 * 50_000_000, 6, 20, i, q, ) .with_rssi(-55); f.validation = ValidationStatus::Accepted; f.quality_score = 0.9; rec.write_frame(&f).unwrap(); } rec.finish().unwrap(); } #[test] fn streams_validated_frames_from_a_capture() { let tmp = tempfile::NamedTempFile::new().unwrap(); write_capture(tmp.path(), 5); let mut rt = CaptureRuntime::open_capture_file(tmp.path().to_str().unwrap()).unwrap(); let mut count = 0; while let Some(f) = rt.next_validated_frame().unwrap() { assert!(f.is_exposable()); count += 1; } assert_eq!(count, 5); assert_eq!(rt.frames_seen(), 5); assert_eq!(rt.frames_dropped(), 0); let h = rt.health(); assert!(h.status.unwrap().contains("frames_seen=5")); } #[test] fn clean_frame_applies_dsp_without_changing_validation() { let tmp = tempfile::NamedTempFile::new().unwrap(); write_capture(tmp.path(), 3); let mut rt = CaptureRuntime::open_capture_file(tmp.path().to_str().unwrap()).unwrap(); let f = rt.next_clean_frame().unwrap().unwrap(); assert_eq!(f.validation, ValidationStatus::Accepted); assert_eq!(f.quality_score, 0.9); assert_eq!(f.amplitude.len(), 32); } #[test] fn drains_events_from_an_alternating_stream() { let tmp = tempfile::NamedTempFile::new().unwrap(); write_capture(tmp.path(), 64); let mut rt = CaptureRuntime::open_capture_file(tmp.path().to_str().unwrap()).unwrap(); let events = rt.drain_events().unwrap(); assert!(!events.is_empty()); for e in &events { e.validate().unwrap(); } } #[test] fn runs_a_nexmon_record_stream() { let mk = |ts: u64| { let rec = NexmonRecord { subcarrier_count: 64, channel: 36, bandwidth_mhz: 80, rssi_dbm: Some(-60), noise_floor_dbm: Some(-92), timestamp_ns: ts, i_values: (0..64).map(|k| (k as f32 % 3.0) - 1.0).collect(), q_values: (0..64).map(|k| (k as f32 % 5.0) * 0.1).collect(), }; encode_record(&rec).unwrap() }; let mut buf = Vec::new(); for k in 0..40 { buf.extend(mk(1_000 + k * 50_000_000)); } let mut rt = CaptureRuntime::open_nexmon_bytes(buf, "nexmon-rt", 3); let mut n = 0; while let Some(f) = rt.next_validated_frame().unwrap() { assert_eq!(f.adapter_kind, AdapterKind::Nexmon); assert!(f.is_exposable()); n += 1; } assert_eq!(n, 40); } #[test] fn runs_a_real_nexmon_csi_pcap() { use rvcsi_adapter_nexmon::NexmonCsiHeader; let chanspec = 0x1000u16 | 6; // 2.4 GHz ch6 20 MHz let nsub = 64u16; let frames: Vec<(u64, NexmonCsiHeader, Vec, Vec)> = (0..12u64) .map(|k| { let i: Vec = (0..nsub).map(|s| (s as i16 - 32 + k as i16) as f32).collect(); let q: Vec = (0..nsub).map(|_| 1.0f32).collect(); ( 1_000_000_000 + k * 50_000_000, NexmonCsiHeader { rssi_dbm: -55 - k as i16, fctl: 8, src_mac: [0, 1, 2, 3, 4, 5], seq_cnt: k as u16, core: 0, spatial_stream: 0, chanspec, chip_ver: 0x4345, channel: 0, bandwidth_mhz: 0, is_5ghz: false, subcarrier_count: nsub, }, i, q, ) }) .collect(); let pcap = rvcsi_adapter_nexmon::synthetic_nexmon_pcap(&frames, 5500).unwrap(); let mut rt = CaptureRuntime::open_nexmon_pcap_bytes(&pcap, "nexmon-pcap-rt", 1, None).unwrap(); let mut got = 0; while let Some(f) = rt.next_validated_frame().unwrap() { assert_eq!(f.adapter_kind, AdapterKind::Nexmon); assert_eq!(f.channel, 6); assert_eq!(f.bandwidth_mhz, 20); assert!(f.is_exposable()); got += 1; } assert_eq!(got, 12); let events = { let mut rt2 = CaptureRuntime::open_nexmon_pcap_bytes(&pcap, "n", 2, None).unwrap(); rt2.drain_events().unwrap() }; for e in &events { e.validate().unwrap(); } } #[test] fn missing_file_is_an_error() { assert!(CaptureRuntime::open_capture_file("/nope/x.rvcsi").is_err()); assert!(CaptureRuntime::open_nexmon_file("/nope/x.bin", "s", 0).is_err()); assert!(CaptureRuntime::open_nexmon_pcap("/nope/x.pcap", "s", 0, None).is_err()); assert!(CaptureRuntime::open_nexmon_pcap_bytes(&[0u8; 8], "s", 0, None).is_err()); } }