diff --git a/v2/crates/cog-person-count/cog/README.md b/v2/crates/cog-person-count/cog/README.md index 04774ade..48da6840 100644 --- a/v2/crates/cog-person-count/cog/README.md +++ b/v2/crates/cog-person-count/cog/README.md @@ -47,6 +47,17 @@ Downstream consumers can render the **most-likely count** when confidence is hig `cog-person-count health` will load the real safetensors and report `backend: candle-cpu` rather than `backend: stub`, so the cog-gateway can verify the model loaded — but operators should treat the v0.0.1 count outputs as scaffold-validation rather than production data. The 2.36 MB binary + 392 KB weights + 16 KB ONNX are all real and reusable as soon as more data lands. +## Relationship to the in-process `csi.rs::score_to_person_count` heuristic + +This Cog runs **out-of-process** alongside `wifi-densepose-sensing-server`. The two are complementary, not competing: + +- The sensing-server keeps emitting its existing slot-count heuristic from `csi.rs::score_to_person_count` (PR #491's RollingP95 + `dedup_factor`). This is the **fallback path** — operators who don't install `cog-person-count` still get a count number, just a less calibrated one. +- `cog-person-count` (this binary) polls the same `/api/v1/sensing/latest` endpoint, runs the learned `count_v1` model on each window, and emits `person.count` events on stdout. The appliance's `cognitum-cog-gateway` routes those events to the dashboard via the standard ADR-220 cog-event channel. + +Operators choose by **installing or not installing** this Cog — no sensing-server rebuild required. Downstream consumers (UI, fleet automation, alerting rules) can subscribe to whichever event stream they prefer. + +The architecture decision is documented in [ADR-103 §"Deployment"](../../../../docs/adr/ADR-103-learned-multi-person-counter.md#deployment) and matches the cog/sensing-server boundary established for `cog-pose-estimation` (ADR-101). 
+ ## Security The cog has a very small attack surface — by design, it's a pure consumer of CSI data, not a server: diff --git a/v2/crates/cog-person-count/src/lib.rs b/v2/crates/cog-person-count/src/lib.rs index d5d107aa..168696cc 100644 --- a/v2/crates/cog-person-count/src/lib.rs +++ b/v2/crates/cog-person-count/src/lib.rs @@ -10,6 +10,7 @@ pub mod fusion; pub mod inference; pub mod publisher; +pub mod runtime; pub const COG_ID: &str = "person-count"; pub const COG_VERSION: &str = env!("CARGO_PKG_VERSION"); diff --git a/v2/crates/cog-person-count/src/main.rs b/v2/crates/cog-person-count/src/main.rs index 976d2098..5bbb030b 100644 --- a/v2/crates/cog-person-count/src/main.rs +++ b/v2/crates/cog-person-count/src/main.rs @@ -103,10 +103,31 @@ fn cmd_health() -> Result<(), Box> { Ok(()) } -fn cmd_run(_config_path: PathBuf) -> Result<(), Box> { - // Long-running mode is wired in the v0.0.1 release follow-up — same - // approach as cog-pose-estimation's runtime.rs. For now, the cog - // satisfies the four-verb contract; downstream consumers integrate - // via the in-process `InferenceEngine` API. 
-    Err("`run` subcommand wiring is pending v0.0.1 — for now consume via the InferenceEngine library API".into())
+fn cmd_run(config_path: PathBuf) -> Result<(), Box<dyn std::error::Error>> {
+    let raw = std::fs::read_to_string(&config_path)
+        .map_err(|e| format!("failed to read config at {}: {}", config_path.display(), e))?;
+    let cfg: RunConfig = serde_json::from_str(&raw)
+        .map_err(|e| format!("failed to parse config at {}: {}", config_path.display(), e))?;
+    // Build the engine first: if it fails, `?` returns before `run_started` is published.
+    let engine = InferenceEngine::with_weights(cfg.model_path.as_deref())?;
+    publisher::run_started(
+        COG_ID,
+        &cfg.sensing_url,
+        cfg.poll_ms,
+        &cfg.model_path
+            .as_ref()
+            .map(|p| p.display().to_string())
+            .unwrap_or_else(|| "(auto-discover)".to_string()),
+    );
+    // `run_loop` has no exit path, so `block_on` holds this thread for the life of the run.
+    let rt = tokio::runtime::Builder::new_multi_thread()
+        .enable_all()
+        .build()?;
+    rt.block_on(cog_person_count::runtime::run_loop(
+        cog_person_count::runtime::RunConfig {
+            sensing_url: cfg.sensing_url,
+            poll_ms: cfg.poll_ms,
+        },
+        engine,
+    ))
 }
diff --git a/v2/crates/cog-person-count/src/runtime.rs b/v2/crates/cog-person-count/src/runtime.rs
new file mode 100644
index 00000000..dfc04376
--- /dev/null
+++ b/v2/crates/cog-person-count/src/runtime.rs
@@ -0,0 +1,108 @@
+//! Long-running inference loop. Polls the appliance's sensing-server,
+//! slides a CSI window, runs the count head, and emits `person.count`
+//! events. Same shape as `cog-pose-estimation::runtime`.
+//!
+//! v0.0.1 is single-node only — the appliance's
+//! `/api/v1/sensing/latest` endpoint already aggregates across nodes
+//! before serving, so per-cog fusion is deferred until each node ships
+//! raw frames separately (ADR-103 §"Multi-node fusion" v0.2.0).
+
+use crate::inference::{CsiWindow, InferenceEngine, INPUT_SUBCARRIERS, INPUT_TIMESTEPS};
+use crate::publisher;
+use std::time::Duration;
+use tokio::time::sleep;
+
+/// Upper bound on a single sensing-server request, so a stalled server
+/// cannot wedge the polling loop indefinitely.
+const FETCH_TIMEOUT: Duration = Duration::from_secs(5);
+
+/// Settings for [`run_loop`].
+pub struct RunConfig {
+    /// URL polled for the latest sensing snapshot (JSON).
+    pub sensing_url: String,
+    /// Delay between polls, in milliseconds.
+    pub poll_ms: u64,
+}
+
+/// Polls `cfg.sensing_url` forever, keeping a sliding buffer of amplitudes
+/// and publishing a `person.count` event for every successful inference.
+///
+/// Fetch and inference failures are logged and skipped. The loop has no
+/// exit path, so the returned future never resolves on its own.
+pub async fn run_loop(
+    cfg: RunConfig,
+    engine: InferenceEngine,
+) -> Result<(), Box<dyn std::error::Error>> {
+    let cap = INPUT_SUBCARRIERS * INPUT_TIMESTEPS;
+    let mut buffer: Vec<f32> = Vec::with_capacity(cap);
+    let mut tick: u64 = 0;
+
+    loop {
+        match fetch_frame(&cfg.sensing_url).await {
+            Ok(amplitudes) => {
+                tick += 1;
+                buffer.extend(amplitudes);
+                // Trim only once the backlog exceeds two windows, then keep
+                // just the newest `cap` samples.
+                if buffer.len() > 2 * cap {
+                    let extra = buffer.len() - cap;
+                    buffer.drain(..extra);
+                }
+                if buffer.len() >= cap {
+                    let window = CsiWindow {
+                        data: buffer[buffer.len() - cap..].to_vec(),
+                    };
+                    match engine.infer(&window) {
+                        Ok(pred) => {
+                            // v0.0.1 ships single-node — fusion is a no-op for
+                            // N=1. v0.2.0 will append additional per-node
+                            // predictions to a vec and call
+                            // `fusion::fuse_confidence_weighted` before emit.
+                            publisher::person_count(tick, &pred, 1);
+                        }
+                        // Surface inference failures instead of dropping them silently.
+                        Err(e) => {
+                            tracing::warn!(error = %e, "inference failed");
+                        }
+                    }
+                }
+            }
+            Err(e) => {
+                tracing::warn!(error = %e, "sensing-server fetch failed");
+            }
+        }
+        sleep(Duration::from_millis(cfg.poll_ms)).await;
+    }
+}
+
+/// Fetches one snapshot from `url` and returns the first node's amplitudes.
+///
+/// The `nodes` array is read from the `snapshot` object when present, else
+/// from the top level. Non-numeric amplitude entries are skipped.
+///
+/// # Errors
+/// Fails if the request errors or times out, the body is not JSON, or
+/// `nodes[0].amplitude` is missing or not an array.
+async fn fetch_frame(url: &str) -> Result<Vec<f32>, Box<dyn std::error::Error>> {
+    let url = url.to_string();
+    // ureq is a blocking client, so run the request on Tokio's blocking pool.
+    let body = tokio::task::spawn_blocking(move || -> Result<String, ureq::Error> {
+        Ok(ureq::get(&url).timeout(FETCH_TIMEOUT).call()?.into_string()?)
+    })
+    .await??;
+    let json: serde_json::Value = serde_json::from_str(&body)?;
+    let snapshot = json.get("snapshot").unwrap_or(&json);
+    let nodes = snapshot
+        .get("nodes")
+        .and_then(|v| v.as_array())
+        .ok_or("missing nodes[]")?;
+    let amplitude = nodes
+        .first()
+        .and_then(|n| n.get("amplitude"))
+        .and_then(|v| v.as_array())
+        .ok_or("missing nodes[0].amplitude[]")?;
+    Ok(amplitude
+        .iter()
+        .filter_map(|v| v.as_f64().map(|f| f as f32))
+        .collect())
+}