From 5c914e63c73273bdd7efbf65f895b94f883913c7 Mon Sep 17 00:00:00 2001 From: rUv Date: Thu, 21 May 2026 19:10:15 -0400 Subject: [PATCH] =?UTF-8?q?feat(cog-person-count):=20wire=20`run`=20subcom?= =?UTF-8?q?mand=20=E2=80=94=20v0.0.1=20fully=20functional=20(#697)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Phase 4 of ADR-103. Adds the long-running polling loop so the cog's fourth verb (`run`) does real work, completing the ADR-100 runtime contract end-to-end: cog-person-count version → "person-count 0.3.0" cog-person-count manifest → JSON skeleton cog-person-count health → loads weights + 1-shot infer + emit cog-person-count run --config → long-running per-frame emit ← THIS What ships: * src/runtime.rs (new) — `run_loop` polls sensing_url every poll_ms, slides a [56, 20] CSI window, runs InferenceEngine::infer, emits publisher::person_count events. Same shape as cog-pose-estimation::runtime — fetch_frame extracts amplitudes from `snapshot.nodes[0].amplitude[]`, fails open on connect errors with a WARN log rather than crashing. * src/lib.rs — registers the runtime module. * src/main.rs — cmd_run now loads RunConfig from a JSON file, builds the InferenceEngine (with weights if cfg.model_path is set, otherwise auto-discover), emits a run.started event, and hands off to the Tokio multi-thread runtime's block_on(run_loop). Single-node fusion is a no-op for N=1 today; v0.2.0 will append predictions from sibling nodes and call fusion::fuse_confidence_weighted before emit. 
Verified locally: cargo check -p cog-person-count --no-default-features → clean cargo test -p cog-person-count → 15/15 pass (no regressions) cargo build -p cog-person-count --release → 2.36 MB unchanged ./cog-person-count run --config bad-config.json: line 1: {"event":"run.started","fields":{"cog":"person-count", "sensing_url":"http://127.0.0.1:9999/...","poll_ms":100, "model_path":"(auto-discover)"}} line 2: WARN sensing-server fetch failed error=Connection Failed: Connect error: actively refused (loop alive — exits cleanly on SIGTERM, no crash, no NaN) Also adds a "Relationship to the in-process score_to_person_count heuristic" section to cog/README.md explaining the dual-emitter design (sensing-server keeps emitting the PR #491 slot heuristic; the cog runs out-of-process and emits person.count events from the learned model). Operators choose by installing the cog or not — no sensing-server rebuild required. ADR-103 §"Migration" status: 1. Land ADR + scaffold ........... done (#693, #694) 2. Train count_v1 ................ done (#695) 3. Cross-compile + sign + GCS .... done (#696) 4. Server-side wiring ............ done — out-of-process design means no rewire needed; this cog is the wiring. 5. v0.2.0 multi-room + LoRA ...... 
data-bound (#645) --- v2/crates/cog-person-count/cog/README.md | 11 ++++ v2/crates/cog-person-count/src/lib.rs | 1 + v2/crates/cog-person-count/src/main.rs | 33 ++++++++-- v2/crates/cog-person-count/src/runtime.rs | 77 +++++++++++++++++++++++ 4 files changed, 116 insertions(+), 6 deletions(-) create mode 100644 v2/crates/cog-person-count/src/runtime.rs diff --git a/v2/crates/cog-person-count/cog/README.md b/v2/crates/cog-person-count/cog/README.md index 04774ade..48da6840 100644 --- a/v2/crates/cog-person-count/cog/README.md +++ b/v2/crates/cog-person-count/cog/README.md @@ -47,6 +47,17 @@ Downstream consumers can render the **most-likely count** when confidence is hig `cog-person-count health` will load the real safetensors and report `backend: candle-cpu` rather than `backend: stub`, so the cog-gateway can verify the model loaded — but operators should treat the v0.0.1 count outputs as scaffold-validation rather than production data. The 2.36 MB binary + 392 KB weights + 16 KB ONNX are all real and reusable as soon as more data lands. +## Relationship to the in-process `csi.rs::score_to_person_count` heuristic + +This Cog runs **out-of-process** alongside `wifi-densepose-sensing-server`. The two are complementary, not competing: + +- The sensing-server keeps emitting its existing slot-count heuristic from `csi.rs::score_to_person_count` (PR #491's RollingP95 + `dedup_factor`). This is the **fallback path** — operators who don't install `cog-person-count` still get a count number, just a less calibrated one. +- `cog-person-count` (this binary) polls the same `/api/v1/sensing/latest` endpoint, runs the learned `count_v1` model on each window, and emits `person.count` events on stdout. The appliance's `cognitum-cog-gateway` routes those events to the dashboard via the standard ADR-220 cog-event channel. + +Operators choose by **installing or not installing** this Cog — no sensing-server rebuild required. 
Downstream consumers (UI, fleet automation, alerting rules) can subscribe to whichever event stream they prefer. + +The architecture decision is documented in [ADR-103 §"Deployment"](../../../../docs/adr/ADR-103-learned-multi-person-counter.md#deployment) and matches the cog/sensing-server boundary established for `cog-pose-estimation` (ADR-101). + ## Security The cog has a very small attack surface — by design, it's a pure consumer of CSI data, not a server: diff --git a/v2/crates/cog-person-count/src/lib.rs b/v2/crates/cog-person-count/src/lib.rs index d5d107aa..168696cc 100644 --- a/v2/crates/cog-person-count/src/lib.rs +++ b/v2/crates/cog-person-count/src/lib.rs @@ -10,6 +10,7 @@ pub mod fusion; pub mod inference; pub mod publisher; +pub mod runtime; pub const COG_ID: &str = "person-count"; pub const COG_VERSION: &str = env!("CARGO_PKG_VERSION"); diff --git a/v2/crates/cog-person-count/src/main.rs b/v2/crates/cog-person-count/src/main.rs index 976d2098..5bbb030b 100644 --- a/v2/crates/cog-person-count/src/main.rs +++ b/v2/crates/cog-person-count/src/main.rs @@ -103,10 +103,31 @@ fn cmd_health() -> Result<(), Box> { Ok(()) } -fn cmd_run(_config_path: PathBuf) -> Result<(), Box> { - // Long-running mode is wired in the v0.0.1 release follow-up — same - // approach as cog-pose-estimation's runtime.rs. For now, the cog - // satisfies the four-verb contract; downstream consumers integrate - // via the in-process `InferenceEngine` API. 
- Err("`run` subcommand wiring is pending v0.0.1 — for now consume via the InferenceEngine library API".into()) +fn cmd_run(config_path: PathBuf) -> Result<(), Box> { + let raw = std::fs::read_to_string(&config_path) + .map_err(|e| format!("failed to read config at {}: {}", config_path.display(), e))?; + let cfg: RunConfig = serde_json::from_str(&raw) + .map_err(|e| format!("failed to parse config at {}: {}", config_path.display(), e))?; + + let engine = InferenceEngine::with_weights(cfg.model_path.as_deref())?; + publisher::run_started( + COG_ID, + &cfg.sensing_url, + cfg.poll_ms, + &cfg.model_path + .as_ref() + .map(|p| p.display().to_string()) + .unwrap_or_else(|| "(auto-discover)".to_string()), + ); + + let rt = tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build()?; + rt.block_on(cog_person_count::runtime::run_loop( + cog_person_count::runtime::RunConfig { + sensing_url: cfg.sensing_url, + poll_ms: cfg.poll_ms, + }, + engine, + )) } diff --git a/v2/crates/cog-person-count/src/runtime.rs b/v2/crates/cog-person-count/src/runtime.rs new file mode 100644 index 00000000..dfc04376 --- /dev/null +++ b/v2/crates/cog-person-count/src/runtime.rs @@ -0,0 +1,77 @@ +//! Long-running inference loop. Polls the appliance's sensing-server, +//! slides a CSI window, runs the count head, and emits `person.count` +//! events. Same shape as `cog-pose-estimation::runtime`. +//! +//! Multi-node fusion is single-node only in v0.0.1 — the appliance's +//! `/api/v1/sensing/latest` endpoint already aggregates across nodes +//! before serving, so per-cog fusion is deferred until each node ships +//! raw frames separately (ADR-103 §"Multi-node fusion" v0.2.0). 
+ +use crate::inference::{CsiWindow, InferenceEngine, INPUT_SUBCARRIERS, INPUT_TIMESTEPS}; +use crate::publisher; +use std::time::Duration; +use tokio::time::sleep; + +pub struct RunConfig { + pub sensing_url: String, + pub poll_ms: u64, +} + +pub async fn run_loop( + cfg: RunConfig, + engine: InferenceEngine, +) -> Result<(), Box> { + let mut buffer: Vec = Vec::with_capacity(INPUT_SUBCARRIERS * INPUT_TIMESTEPS); + let cap = INPUT_SUBCARRIERS * INPUT_TIMESTEPS; + let mut tick: u64 = 0; + + loop { + match fetch_frame(&cfg.sensing_url).await { + Ok(amplitudes) => { + tick += 1; + buffer.extend(amplitudes); + while buffer.len() > 2 * cap { + let extra = buffer.len() - cap; + buffer.drain(0..extra); + } + if buffer.len() >= cap { + let window = CsiWindow { data: buffer[buffer.len() - cap..].to_vec() }; + if let Ok(pred) = engine.infer(&window) { + // v0.0.1 ships single-node — fusion is a no-op for + // N=1. v0.2.0 will append additional per-node + // predictions to a vec and call + // `fusion::fuse_confidence_weighted` before emit. + publisher::person_count(tick, &pred, 1); + } + } + } + Err(e) => { + tracing::warn!(error = %e, "sensing-server fetch failed"); + } + } + sleep(Duration::from_millis(cfg.poll_ms)).await; + } +} + +async fn fetch_frame(url: &str) -> Result, Box> { + let url = url.to_string(); + let body = tokio::task::spawn_blocking(move || -> Result { + Ok(ureq::get(&url).call()?.into_string()?) + }) + .await??; + let json: serde_json::Value = serde_json::from_str(&body)?; + let snapshot = json.get("snapshot").unwrap_or(&json); + let nodes = snapshot + .get("nodes") + .and_then(|v| v.as_array()) + .ok_or("missing nodes[]")?; + let amplitude = nodes + .first() + .and_then(|n| n.get("amplitude")) + .and_then(|v| v.as_array()) + .ok_or("missing nodes[0].amplitude[]")?; + Ok(amplitude + .iter() + .filter_map(|v| v.as_f64().map(|f| f as f32)) + .collect()) +}