feat(adr-118/p6.13): examples/bfld_handle.rs worker-thread pattern (319/319 GREEN)
Iter 48. Ships the production-recommended operator example: full
lifecycle through the worker-thread handle. Companion to iter-47's
minimal example which uses BfldPipeline::process directly. The
handle example demonstrates the multi-thread pattern operators
actually deploy with HA + MQTT.
Lifecycle demonstrated in the example:
1. publish_availability_online (retained → HA marks device online)
2. publish_discovery (retained → HA auto-creates 6 BFLD entities)
3. BfldPipelineHandle::spawn (worker owns gate + ring + hasher)
4. handle.send(input) per BFI frame (worker process + publish)
5. handle.shutdown() (clean worker join)
6. publish_availability_offline (explicit graceful disconnect)
Example output (verified pre-commit):
bootstrap: 1 availability + 6 discovery payloads
total messages published: 33
first three topics:
ruview/seed-handle-demo/bfld/availability
homeassistant/binary_sensor/seed-handle-demo_bfld_presence/config
homeassistant/sensor/seed-handle-demo_bfld_motion/config
last three topics:
ruview/seed-handle-demo/bfld/confidence/state
ruview/seed-handle-demo/bfld/identity_risk/state
ruview/seed-handle-demo/bfld/availability
Added:
- v2/crates/wifi-densepose-bfld/examples/bfld_handle.rs (~110 LOC):
* Documents the 6-phase lifecycle with inline comments
* Pointer to RumqttPublisher::connect_with_lwt for prod use
* 5 sensing frames × 5 state topics = 25 per-frame messages
- v2/crates/wifi-densepose-bfld/tests/example_handle.rs (4 named tests):
handle_example_documents_full_lifecycle_phases
(doc drift guard: 8 operator-facing symbols must appear)
handle_example_carries_run_instructions_and_prod_pointer
(cargo run line + RumqttPublisher pointer present)
handle_example_lifecycle_produces_expected_message_counts
*** Re-executes full lifecycle inline; asserts total == 33,
first message payload == "online", last == "offline" ***
handle_example_returns_box_dyn_error_for_main_signature
- v2/crates/wifi-densepose-bfld/Cargo.toml:
[[example]] name = "bfld_handle", required-features = ["std"]
ADR-124 status (iter step 0 sibling check):
- docs/adr/ADR-124-rvagent-mcp-ruvector-npm-integration.md unchanged
at 431 lines. SENSE-BRIDGE scope remains orthogonal.
ACs progressed:
- ADR-118 §2.1 documentation surface — two runnable operator examples
now shipped (iter 47 minimal, iter 48 worker-thread). Together
they cover the two operator patterns: simple in-process consumer
(process + to_json) and the full HA-integration deployment
(handle + bootstrap + lifecycle).
- ADR-122 §2.1 + §2.2 + §2.6 — the worker example exercises every
layer of the HA-DISCO publish chain in one runnable file:
availability, discovery, state, graceful shutdown.
Test config:
- cargo test --no-default-features → 101 passed (example_handle cfg-out)
- cargo test → 319 passed (315 + 4)
Out of scope (next iter target):
- PR-readiness pivot still pending. External-resource-gated work
(KIT BFId, Pi5/Nexmon) still skipped.
Co-Authored-By: claude-flow <ruv@ruv.net>
This commit is contained in:
parent
ea7b5711a1
commit
519e0044b1
|
|
@ -46,6 +46,11 @@ proptest.workspace = true
|
|||
name = "bfld_minimal"
|
||||
required-features = ["serde-json"]
|
||||
|
||||
# The handle example uses the std-only publish helpers and pipeline handle.
|
||||
[[example]]
|
||||
name = "bfld_handle"
|
||||
required-features = ["std"]
|
||||
|
||||
[lints.rust]
|
||||
unsafe_code = "forbid"
|
||||
missing_docs = "warn"
|
||||
|
|
|
|||
|
|
@ -0,0 +1,109 @@
|
|||
//! Worker-thread BFLD example — the production-recommended pattern.
|
||||
//!
|
||||
//! Demonstrates the full operator lifecycle:
|
||||
//! 1. publish_availability_online (retained) → HA marks device online
|
||||
//! 2. publish_discovery (retained) → HA auto-creates 6 BFLD entities
|
||||
//! 3. BfldPipelineHandle::spawn → worker owns gate + ring + hasher
|
||||
//! 4. handle.send(input) per BFI frame → worker process + publish
|
||||
//! 5. handle.shutdown() → clean worker join
|
||||
//! 6. publish_availability_offline → HA marks device offline
|
||||
//!
|
||||
//! Run with:
|
||||
//! ```sh
|
||||
//! cargo run -p wifi-densepose-bfld --example bfld_handle
|
||||
//! ```
|
||||
//!
|
||||
//! For a real broker, swap `CapturePublisher` for `RumqttPublisher::connect_with_lwt(...)`
|
||||
//! (requires `--features mqtt`).
|
||||
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::thread;
|
||||
use std::time::Duration;
|
||||
|
||||
use wifi_densepose_bfld::{
|
||||
publish_availability_offline, publish_availability_online, publish_discovery, BfldConfig,
|
||||
BfldPipeline, BfldPipelineHandle, CapturePublisher, IdentityEmbedding, PipelineInput,
|
||||
PrivacyClass, SensingInputs, SignatureHasher, EMBEDDING_DIM, SITE_SALT_LEN,
|
||||
};
|
||||
|
||||
fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
let node_id = "seed-handle-demo";
|
||||
let site_salt: [u8; SITE_SALT_LEN] = [0xC0; SITE_SALT_LEN];
|
||||
|
||||
// Shared publisher (CapturePublisher for demo; RumqttPublisher in prod).
|
||||
let publisher = Arc::new(Mutex::new(CapturePublisher::default()));
|
||||
|
||||
// ----------------------------------------------------------------
|
||||
// Phase 1 — Bootstrap. Three messages land on the broker (or
|
||||
// capture log) BEFORE the worker starts: online + 6 discovery payloads.
|
||||
// In production these should be published with retain=true so HA picks
|
||||
// them up on reconnect.
|
||||
// ----------------------------------------------------------------
|
||||
publish_availability_online(&mut publisher.clone(), node_id)?;
|
||||
let discovery_count = publish_discovery(&mut publisher.clone(), node_id, PrivacyClass::Anonymous)?;
|
||||
println!("bootstrap: 1 availability + {discovery_count} discovery payloads");
|
||||
|
||||
// ----------------------------------------------------------------
|
||||
// Phase 2 — Spawn the worker thread. From this point on, the
|
||||
// operator only calls handle.send(...) per frame; the worker owns
|
||||
// every piece of pipeline state.
|
||||
// ----------------------------------------------------------------
|
||||
let pipeline = BfldPipeline::new(
|
||||
BfldConfig::new(node_id).with_signature_hasher(SignatureHasher::new(site_salt)),
|
||||
);
|
||||
let handle = BfldPipelineHandle::spawn(pipeline, publisher.clone());
|
||||
|
||||
// ----------------------------------------------------------------
|
||||
// Phase 3 — Drive 5 sensing frames. Each one becomes 5 MQTT state
|
||||
// messages (presence/motion/count/conf/identity_risk for Anonymous
|
||||
// class, no zone configured).
|
||||
// ----------------------------------------------------------------
|
||||
for i in 0..5u64 {
|
||||
let timestamp_ns = 1_700_000_000_000_000_000 + i * 200_000_000;
|
||||
let mut emb = [0.0f32; EMBEDDING_DIM];
|
||||
for (j, v) in emb.iter_mut().enumerate() {
|
||||
*v = (j as f32 + i as f32) * 0.005;
|
||||
}
|
||||
let input = PipelineInput {
|
||||
inputs: SensingInputs {
|
||||
timestamp_ns,
|
||||
presence: true,
|
||||
motion: 0.3 + (i as f32) * 0.1,
|
||||
person_count: 1,
|
||||
sensing_confidence: 0.9,
|
||||
sep: 0.2,
|
||||
stab: 0.2,
|
||||
consist: 0.2,
|
||||
risk_conf: 0.2,
|
||||
rf_signature_hash: None,
|
||||
},
|
||||
embedding: Some(IdentityEmbedding::from_raw(emb)),
|
||||
};
|
||||
handle.send(input)?;
|
||||
}
|
||||
|
||||
// Give the worker time to drain the channel before shutdown.
|
||||
thread::sleep(Duration::from_millis(100));
|
||||
|
||||
// ----------------------------------------------------------------
|
||||
// Phase 4 — Graceful shutdown. handle.shutdown() joins the worker;
|
||||
// publish_availability_offline then signals HA explicitly (the LWT
|
||||
// configured on RumqttPublisher::connect_with_lwt would handle the
|
||||
// crash case).
|
||||
// ----------------------------------------------------------------
|
||||
handle.shutdown();
|
||||
publish_availability_offline(&mut publisher.clone(), node_id)?;
|
||||
|
||||
// Print a summary so the example produces visible output.
|
||||
let log = publisher.lock().expect("publisher mutex");
|
||||
println!("total messages published: {}", log.published.len());
|
||||
println!("first three topics:");
|
||||
for msg in log.published.iter().take(3) {
|
||||
println!(" {}", msg.topic);
|
||||
}
|
||||
println!("last three topics:");
|
||||
for msg in log.published.iter().rev().take(3).collect::<Vec<_>>().iter().rev() {
|
||||
println!(" {}", msg.topic);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
|
@ -0,0 +1,120 @@
|
|||
//! Validate `examples/bfld_handle.rs` operator quickstart. Re-runs the same
|
||||
//! lifecycle inline so CI proves the worker-thread pattern works end-to-end.
|
||||
|
||||
#![cfg(feature = "std")]
|
||||
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::thread;
|
||||
use std::time::Duration;
|
||||
|
||||
use wifi_densepose_bfld::{
|
||||
publish_availability_offline, publish_availability_online, publish_discovery, BfldConfig,
|
||||
BfldPipeline, BfldPipelineHandle, CapturePublisher, IdentityEmbedding, PipelineInput,
|
||||
PrivacyClass, SensingInputs, SignatureHasher, EMBEDDING_DIM, SITE_SALT_LEN,
|
||||
};
|
||||
|
||||
const HANDLE_EXAMPLE: &str = include_str!("../examples/bfld_handle.rs");
|
||||
|
||||
#[test]
|
||||
fn handle_example_documents_full_lifecycle_phases() {
|
||||
// Doc drift guard: every operator-facing symbol must appear in the file.
|
||||
for needle in [
|
||||
"publish_availability_online",
|
||||
"publish_discovery",
|
||||
"BfldPipelineHandle::spawn",
|
||||
"handle.send",
|
||||
"handle.shutdown",
|
||||
"publish_availability_offline",
|
||||
"SignatureHasher",
|
||||
"PipelineInput",
|
||||
] {
|
||||
assert!(
|
||||
HANDLE_EXAMPLE.contains(needle),
|
||||
"example must reference {needle}",
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn handle_example_carries_run_instructions_and_prod_pointer() {
|
||||
assert!(
|
||||
HANDLE_EXAMPLE.contains("cargo run -p wifi-densepose-bfld --example bfld_handle"),
|
||||
"example must document its own run command",
|
||||
);
|
||||
assert!(
|
||||
HANDLE_EXAMPLE.contains("RumqttPublisher::connect_with_lwt"),
|
||||
"example must point operators at the production publisher path",
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn handle_example_lifecycle_produces_expected_message_counts() {
|
||||
// Re-execute the lifecycle inline. End state must show:
|
||||
// 1 (online) + 6 (discovery anonymous + zone-less) + 5×5 (state per
|
||||
// send) + 1 (offline) = 33 messages.
|
||||
let node_id = "seed-handle-test";
|
||||
let site_salt: [u8; SITE_SALT_LEN] = [0xC0; SITE_SALT_LEN];
|
||||
|
||||
let publisher = Arc::new(Mutex::new(CapturePublisher::default()));
|
||||
|
||||
publish_availability_online(&mut publisher.clone(), node_id).expect("online");
|
||||
let discovery_count =
|
||||
publish_discovery(&mut publisher.clone(), node_id, PrivacyClass::Anonymous)
|
||||
.expect("discovery");
|
||||
assert_eq!(discovery_count, 6);
|
||||
|
||||
let pipeline = BfldPipeline::new(
|
||||
BfldConfig::new(node_id).with_signature_hasher(SignatureHasher::new(site_salt)),
|
||||
);
|
||||
let handle = BfldPipelineHandle::spawn(pipeline, publisher.clone());
|
||||
|
||||
for i in 0..5u64 {
|
||||
let timestamp_ns = 1_700_000_000_000_000_000 + i * 200_000_000;
|
||||
let input = PipelineInput {
|
||||
inputs: SensingInputs {
|
||||
timestamp_ns,
|
||||
presence: true,
|
||||
motion: 0.3 + (i as f32) * 0.1,
|
||||
person_count: 1,
|
||||
sensing_confidence: 0.9,
|
||||
sep: 0.2,
|
||||
stab: 0.2,
|
||||
consist: 0.2,
|
||||
risk_conf: 0.2,
|
||||
rf_signature_hash: None,
|
||||
},
|
||||
embedding: Some(IdentityEmbedding::from_raw([0.05; EMBEDDING_DIM])),
|
||||
};
|
||||
handle.send(input).expect("send");
|
||||
}
|
||||
thread::sleep(Duration::from_millis(120));
|
||||
handle.shutdown();
|
||||
|
||||
publish_availability_offline(&mut publisher.clone(), node_id).expect("offline");
|
||||
|
||||
let log = publisher.lock().expect("publisher mutex");
|
||||
let total = log.published.len();
|
||||
|
||||
// Expected: 1 online + 6 discovery + 5 × 5 state + 1 offline = 33.
|
||||
assert_eq!(
|
||||
total, 33,
|
||||
"expected 33 total messages from full lifecycle, got {total}; \
|
||||
topics: {:?}",
|
||||
log.published
|
||||
.iter()
|
||||
.map(|m| &m.topic)
|
||||
.collect::<Vec<_>>(),
|
||||
);
|
||||
|
||||
// First message is the online availability.
|
||||
assert_eq!(log.published[0].payload, "online");
|
||||
// Last message is the offline availability.
|
||||
assert_eq!(log.published[total - 1].payload, "offline");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn handle_example_returns_box_dyn_error_for_main_signature() {
|
||||
assert!(
|
||||
HANDLE_EXAMPLE.contains("fn main() -> Result<(), Box<dyn std::error::Error>>"),
|
||||
);
|
||||
}
|
||||
Loading…
Reference in New Issue