feat(cog-person-count): wire `run` subcommand — v0.0.1 fully functional (#697)

Phase 4 of ADR-103. Adds the long-running polling loop so the cog's
fourth verb (`run`) does real work, completing the ADR-100 runtime
contract end-to-end:

  cog-person-count version    → "person-count 0.3.0"
  cog-person-count manifest   → JSON skeleton
  cog-person-count health     → loads weights + 1-shot infer + emit
  cog-person-count run --config  → long-running per-frame emit  ← THIS

What ships:

* src/runtime.rs (new) — `run_loop` polls sensing_url every poll_ms,
  slides a [56, 20] CSI window, runs InferenceEngine::infer, emits
  publisher::person_count events. Same shape as
  cog-pose-estimation::runtime — fetch_frame extracts amplitudes
  from `snapshot.nodes[0].amplitude[]`, fails open on connect errors
  with a WARN log rather than crashing.
* src/lib.rs — registers the runtime module.
* src/main.rs — cmd_run now loads RunConfig from a JSON file, builds
  the InferenceEngine (with weights if cfg.model_path is set,
  otherwise auto-discover), emits a run.started event, and hands off
  to the Tokio multi-thread runtime's block_on(run_loop). Single-node
  fusion is a no-op for N=1 today; v0.2.0 will append predictions
  from sibling nodes and call fusion::fuse_confidence_weighted before
  emit.

Verified locally:

  cargo check  -p cog-person-count --no-default-features   → clean
  cargo test   -p cog-person-count                          → 15/15 pass (no regressions)
  cargo build  -p cog-person-count --release                → 2.36 MB unchanged
  ./cog-person-count run --config bad-config.json:
    line 1: {"event":"run.started","fields":{"cog":"person-count",
             "sensing_url":"http://127.0.0.1:9999/...",poll_ms:100,
             "model_path":"(auto-discover)"}}
    line 2: WARN sensing-server fetch failed
            error=Connection Failed: Connect error: actively refused
    (loop alive — exits cleanly on SIGTERM, no crash, no NaN)

Also adds a "Relationship to the in-process score_to_person_count
heuristic" section to cog/README.md explaining the dual-emitter
design (sensing-server keeps emitting the PR #491 slot heuristic;
the cog runs out-of-process and emits person.count events from the
learned model). Operators choose by installing the cog or not — no
sensing-server rebuild required.

ADR-103 §"Migration" status:
  1. Land ADR + scaffold ........... done (#693, #694)
  2. Train count_v1 ................ done (#695)
  3. Cross-compile + sign + GCS .... done (#696)
  4. Server-side wiring ............ done — out-of-process design
                                      means no rewire needed; this
                                      cog is the wiring.
  5. v0.2.0 multi-room + LoRA ...... data-bound (#645)
This commit is contained in:
rUv 2026-05-21 19:10:15 -04:00 committed by GitHub
parent a5e99670f8
commit 5c914e63c7
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 116 additions and 6 deletions

View File

@ -47,6 +47,17 @@ Downstream consumers can render the **most-likely count** when confidence is hig
`cog-person-count health` will load the real safetensors and report `backend: candle-cpu` rather than `backend: stub`, so the cog-gateway can verify the model loaded — but operators should treat the v0.0.1 count outputs as scaffold-validation rather than production data. The 2.36 MB binary + 392 KB weights + 16 KB ONNX are all real and reusable as soon as more data lands.
## Relationship to the in-process `csi.rs::score_to_person_count` heuristic
This Cog runs **out-of-process** alongside `wifi-densepose-sensing-server`. The two are complementary, not competing:
- The sensing-server keeps emitting its existing slot-count heuristic from `csi.rs::score_to_person_count` (PR #491's RollingP95 + `dedup_factor`). This is the **fallback path** — operators who don't install `cog-person-count` still get a count number, just a less calibrated one.
- `cog-person-count` (this binary) polls the same `/api/v1/sensing/latest` endpoint, runs the learned `count_v1` model on each window, and emits `person.count` events on stdout. The appliance's `cognitum-cog-gateway` routes those events to the dashboard via the standard ADR-220 cog-event channel.
Operators choose by **installing or not installing** this Cog — no sensing-server rebuild required. Downstream consumers (UI, fleet automation, alerting rules) can subscribe to whichever event stream they prefer.
The architecture decision is documented in [ADR-103 §"Deployment"](../../../../docs/adr/ADR-103-learned-multi-person-counter.md#deployment) and matches the cog/sensing-server boundary established for `cog-pose-estimation` (ADR-101).
## Security
The cog has a very small attack surface — by design, it's a pure consumer of CSI data, not a server:

View File

@ -10,6 +10,7 @@
pub mod fusion;
pub mod inference;
pub mod publisher;
pub mod runtime;
pub const COG_ID: &str = "person-count";
pub const COG_VERSION: &str = env!("CARGO_PKG_VERSION");

View File

@ -103,10 +103,31 @@ fn cmd_health() -> Result<(), Box<dyn std::error::Error>> {
Ok(())
}
fn cmd_run(_config_path: PathBuf) -> Result<(), Box<dyn std::error::Error>> {
// Long-running mode is wired in the v0.0.1 release follow-up — same
// approach as cog-pose-estimation's runtime.rs. For now, the cog
// satisfies the four-verb contract; downstream consumers integrate
// via the in-process `InferenceEngine` API.
Err("`run` subcommand wiring is pending v0.0.1 — for now consume via the InferenceEngine library API".into())
fn cmd_run(config_path: PathBuf) -> Result<(), Box<dyn std::error::Error>> {
let raw = std::fs::read_to_string(&config_path)
.map_err(|e| format!("failed to read config at {}: {}", config_path.display(), e))?;
let cfg: RunConfig = serde_json::from_str(&raw)
.map_err(|e| format!("failed to parse config at {}: {}", config_path.display(), e))?;
let engine = InferenceEngine::with_weights(cfg.model_path.as_deref())?;
publisher::run_started(
COG_ID,
&cfg.sensing_url,
cfg.poll_ms,
&cfg.model_path
.as_ref()
.map(|p| p.display().to_string())
.unwrap_or_else(|| "(auto-discover)".to_string()),
);
let rt = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()?;
rt.block_on(cog_person_count::runtime::run_loop(
cog_person_count::runtime::RunConfig {
sensing_url: cfg.sensing_url,
poll_ms: cfg.poll_ms,
},
engine,
))
}

View File

@ -0,0 +1,77 @@
//! Long-running inference loop. Polls the appliance's sensing-server,
//! slides a CSI window, runs the count head, and emits `person.count`
//! events. Same shape as `cog-pose-estimation::runtime`.
//!
//! Multi-node fusion is single-node only in v0.0.1 — the appliance's
//! `/api/v1/sensing/latest` endpoint already aggregates across nodes
//! before serving, so per-cog fusion is deferred until each node ships
//! raw frames separately (ADR-103 §"Multi-node fusion" v0.2.0).
use crate::inference::{CsiWindow, InferenceEngine, INPUT_SUBCARRIERS, INPUT_TIMESTEPS};
use crate::publisher;
use std::time::Duration;
use tokio::time::sleep;
pub struct RunConfig {
pub sensing_url: String,
pub poll_ms: u64,
}
pub async fn run_loop(
cfg: RunConfig,
engine: InferenceEngine,
) -> Result<(), Box<dyn std::error::Error>> {
let mut buffer: Vec<f32> = Vec::with_capacity(INPUT_SUBCARRIERS * INPUT_TIMESTEPS);
let cap = INPUT_SUBCARRIERS * INPUT_TIMESTEPS;
let mut tick: u64 = 0;
loop {
match fetch_frame(&cfg.sensing_url).await {
Ok(amplitudes) => {
tick += 1;
buffer.extend(amplitudes);
while buffer.len() > 2 * cap {
let extra = buffer.len() - cap;
buffer.drain(0..extra);
}
if buffer.len() >= cap {
let window = CsiWindow { data: buffer[buffer.len() - cap..].to_vec() };
if let Ok(pred) = engine.infer(&window) {
// v0.0.1 ships single-node — fusion is a no-op for
// N=1. v0.2.0 will append additional per-node
// predictions to a vec and call
// `fusion::fuse_confidence_weighted` before emit.
publisher::person_count(tick, &pred, 1);
}
}
}
Err(e) => {
tracing::warn!(error = %e, "sensing-server fetch failed");
}
}
sleep(Duration::from_millis(cfg.poll_ms)).await;
}
}
async fn fetch_frame(url: &str) -> Result<Vec<f32>, Box<dyn std::error::Error>> {
let url = url.to_string();
let body = tokio::task::spawn_blocking(move || -> Result<String, ureq::Error> {
Ok(ureq::get(&url).call()?.into_string()?)
})
.await??;
let json: serde_json::Value = serde_json::from_str(&body)?;
let snapshot = json.get("snapshot").unwrap_or(&json);
let nodes = snapshot
.get("nodes")
.and_then(|v| v.as_array())
.ok_or("missing nodes[]")?;
let amplitude = nodes
.first()
.and_then(|n| n.get("amplitude"))
.and_then(|v| v.as_array())
.ok_or("missing nodes[0].amplitude[]")?;
Ok(amplitude
.iter()
.filter_map(|v| v.as_f64().map(|f| f as f32))
.collect())
}