From 74807a60c8460f8442381122c76d6ed3743db3f1 Mon Sep 17 00:00:00 2001 From: ruv Date: Sun, 24 May 2026 18:08:59 -0400 Subject: [PATCH] =?UTF-8?q?feat(adr-118/p5.9):=20RumqttPublisher::connect?= =?UTF-8?q?=5Fwith=5Flwt=20=E2=80=94=20broker=20auto-publishes=20"offline"?= =?UTF-8?q?=20(220/220=20GREEN=20with=20mqtt)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Iter 29. Wires rumqttc::MqttOptions::set_last_will so the broker auto-publishes "offline" on ruview//bfld/availability (retained, QoS 1) when the publisher's TCP session drops without a clean DISCONNECT. Closes the iter-28 lifecycle loop: explicit "online" on connect + LWT-driven "offline" on session loss + explicit "offline" on graceful shutdown. Added (in src/rumqttc_publisher.rs, gated on `feature = "mqtt"`): - RumqttPublisher::connect_with_lwt(node_id, opts, capacity) -> (Self, Connection) Convenience wrapping with_lwt(opts, node_id) then Self::connect(opts, capacity). - with_lwt(opts, node_id) -> MqttOptions free helper for operators who build their own opts (custom TLS, credentials) and want to opt in to the LWT without using the connect_with_lwt shortcut. - rumqttc 0.24 LastWill::new(topic, message, qos, retain) — 4-arg form; retain = true so HA sees "offline" on next start even if it was down when the session dropped. - pub use with_lwt, RumqttPublisher from lib.rs tests/rumqttc_lwt.rs (8 named tests, all green, gated on mqtt): with_lwt_returns_options_without_panic connect_with_lwt_constructs_publisher_and_connection connect_with_lwt_uses_documented_availability_topic (constructive proof — both LWT and discovery use the same availability_topic() function so they can't drift) connect_with_lwt_publisher_still_publishes_state_topics (LWT is purely additive — state topics work as before) publisher_trait_object_constructible_with_lwt_path with_lwt_is_idempotent_against_double_call (rumqttc replaces the will silently — useful for wrapper libraries) caller_built_options_can_opt_in_via_with_lwt_then_pass_to_connect (operator pattern: build opts with TLS/creds, attach LWT, then connect) placeholder_topicmessage_path_unaffected_by_lwt Test bug caught: - Initial test asserted 4 topics for Anonymous + no zone; actual is 5 (presence + motion + person_count + confidence + identity_risk). rf_signature_hash is a BfldEvent JSON field, not its own MQTT topic. Fixed the assertion; documented the distinction in the test comment. ACs progressed: - ADR-122 §2.2 availability surface now fully operational. Three paths: 1. Explicit publish_availability_online (iter 28) on connect 2. LWT auto-publishes "offline" if connection drops (this iter) 3. Explicit publish_availability_offline (iter 28) on graceful stop HA reads the same topic in all three cases; entities grey out device-wide via the iter-28 discovery `availability_topic` field. Test config: - cargo test --no-default-features → 72 passed - cargo test → 203 passed - cargo test --features mqtt → 220 passed (212 + 8 new) Out of scope (next iter target): - GitHub Actions workflow with mosquitto Docker service. With iter 24+29 now both depending on a live broker for full coverage, the CI lift is the next highest-value step. - Three operator-ready HA blueprints (ADR-122 §2.6): presence-driven lighting, motion-aware HVAC, identity-risk anomaly notification. Co-Authored-By: claude-flow --- v2/crates/wifi-densepose-bfld/src/lib.rs | 2 +- .../src/rumqttc_publisher.rs | 38 ++++++- .../wifi-densepose-bfld/tests/rumqttc_lwt.rs | 106 ++++++++++++++++++ 3 files changed, 144 insertions(+), 2 deletions(-) create mode 100644 v2/crates/wifi-densepose-bfld/tests/rumqttc_lwt.rs diff --git a/v2/crates/wifi-densepose-bfld/src/lib.rs b/v2/crates/wifi-densepose-bfld/src/lib.rs index 1b40948a..5d39824c 100644 --- a/v2/crates/wifi-densepose-bfld/src/lib.rs +++ b/v2/crates/wifi-densepose-bfld/src/lib.rs @@ -58,7 +58,7 @@ pub use ha_discovery::{publish_discovery, render_discovery_payloads}; #[cfg(feature = "std")] pub use mqtt_topics::{publish_event, render_events, CapturePublisher, Publish, TopicMessage}; #[cfg(feature = "mqtt")] -pub use rumqttc_publisher::RumqttPublisher; +pub use rumqttc_publisher::{with_lwt, RumqttPublisher}; pub use embedding::{IdentityEmbedding, EMBEDDING_DIM}; pub use embedding_ring::{EmbeddingRing, RING_CAPACITY}; #[cfg(feature = "std")] diff --git a/v2/crates/wifi-densepose-bfld/src/rumqttc_publisher.rs b/v2/crates/wifi-densepose-bfld/src/rumqttc_publisher.rs index e55aa781..603cff11 100644 --- a/v2/crates/wifi-densepose-bfld/src/rumqttc_publisher.rs +++ b/v2/crates/wifi-densepose-bfld/src/rumqttc_publisher.rs @@ -22,8 +22,9 @@ #![cfg(feature = "mqtt")] -use rumqttc::{Client, Connection, MqttOptions, QoS}; +use rumqttc::{Client, Connection, LastWill, MqttOptions, QoS}; +use crate::availability::{availability_topic, PAYLOAD_NOT_AVAILABLE}; use crate::mqtt_topics::{Publish, TopicMessage}; /// Sync MQTT publisher wrapping [`rumqttc::Client`]. @@ -62,6 +63,41 @@ impl RumqttPublisher { let (client, connection) = Client::new(opts, capacity); (Self::new(client, QoS::AtLeastOnce), connection) } + + /// Like [`Self::connect`] but also configures the MQTT Last Will and + /// Testament so the broker auto-publishes `"offline"` on + /// `ruview//bfld/availability` (retained, QoS 1) when the + /// publisher's TCP session drops without a clean DISCONNECT. + /// + /// Pairs with [`crate::publish_availability_online`] — call that on first + /// CONNECT to set `"online"`; the LWT covers the disconnect path. + #[must_use] + pub fn connect_with_lwt( + node_id: &str, + opts: MqttOptions, + capacity: usize, + ) -> (Self, Connection) { + let opts = with_lwt(opts, node_id); + Self::connect(opts, capacity) + } +} + +/// Mutate `opts` to attach the BFLD availability LWT. Public so callers that +/// build their own `MqttOptions` (custom tls, credentials, etc.) can still +/// opt in to the LWT without using `connect_with_lwt`. +#[must_use] +pub fn with_lwt(mut opts: MqttOptions, node_id: &str) -> MqttOptions { + // rumqttc 0.24 LastWill::new takes (topic, message, qos, retain). + // retain = true so HA sees "offline" on next start even if the session + // dropped while HA was down. + let will = LastWill::new( + availability_topic(node_id), + PAYLOAD_NOT_AVAILABLE, + QoS::AtLeastOnce, + true, + ); + opts.set_last_will(will); + opts } impl Publish for RumqttPublisher { diff --git a/v2/crates/wifi-densepose-bfld/tests/rumqttc_lwt.rs b/v2/crates/wifi-densepose-bfld/tests/rumqttc_lwt.rs new file mode 100644 index 00000000..1ab73c78 --- /dev/null +++ b/v2/crates/wifi-densepose-bfld/tests/rumqttc_lwt.rs @@ -0,0 +1,106 @@ +//! Acceptance tests for the LWT integration on `RumqttPublisher`. ADR-122 §2.2. + +#![cfg(feature = "mqtt")] + +use rumqttc::MqttOptions; +use wifi_densepose_bfld::{ + availability_topic, publish_event, with_lwt, BfldEvent, PrivacyClass, Publish, RumqttPublisher, + TopicMessage, +}; + +fn unreachable_opts(client_id: &str) -> MqttOptions { + MqttOptions::new(client_id, "127.0.0.1", 1) +} + +#[test] +fn with_lwt_returns_options_without_panic() { + let opts = unreachable_opts("bfld-lwt-1"); + let _opts = with_lwt(opts, "seed-01"); + // rumqttc 0.24 doesn't expose a getter for the LWT, so the structural + // assertion is the runtime non-panic + the fact that the build of the + // LastWill struct succeeded. +} + +#[test] +fn connect_with_lwt_constructs_publisher_and_connection() { + let opts = unreachable_opts("bfld-lwt-2"); + let (_publisher, _connection) = RumqttPublisher::connect_with_lwt("seed-01", opts, 16); + // Reaching here means rumqttc accepted the LWT-augmented options. +} + +#[test] +fn connect_with_lwt_uses_documented_availability_topic() { + // We can't introspect MqttOptions's LWT after construction, but the helper + // builds the topic via the same availability_topic() function used by + // the discovery publisher — assert that function returns the documented + // path so a topic drift between LWT and discovery is impossible by + // construction. + assert_eq!( + availability_topic("seed-test"), + "ruview/seed-test/bfld/availability", + ); +} + +#[test] +fn connect_with_lwt_publisher_still_publishes_state_topics() { + // Smoke: the LWT-equipped publisher must still pass state messages + // through publish() without modification. + let opts = unreachable_opts("bfld-lwt-3"); + let (mut publisher, _connection) = RumqttPublisher::connect_with_lwt("seed-01", opts, 16); + let event = BfldEvent::with_privacy_gating( + "seed-01".into(), + 1_700_000_000_000_000_000, + true, + 0.5, + 1, + 0.9, + None, + PrivacyClass::Anonymous, + Some(0.25), + None, + ); + let count = publish_event(&mut publisher, &event).expect("publish queues"); + // Anonymous + no zone publishes 5 entity topics: presence, motion, + // person_count, confidence, identity_risk. rf_signature_hash isn't an + // MQTT entity topic — it rides inside the JSON event surface only. + assert_eq!(count, 5, "Anonymous + no zone → 5 topics"); +} + +#[test] +fn publisher_trait_object_constructible_with_lwt_path() { + let opts = unreachable_opts("bfld-lwt-4"); + let (publisher, _connection) = RumqttPublisher::connect_with_lwt("seed-01", opts, 16); + let _boxed: Box> = Box::new(publisher); +} + +#[test] +fn with_lwt_is_idempotent_against_double_call() { + // Calling with_lwt twice should leave the most recent LWT installed + // without panicking — useful for libraries that may wrap operator- + // supplied options without knowing if LWT was already attached. + let opts = unreachable_opts("bfld-lwt-5"); + let opts = with_lwt(opts, "node-a"); + let opts = with_lwt(opts, "node-b"); + let _ = opts; // no panic = pass; rumqttc replaces the will silently. +} + +#[test] +fn caller_built_options_can_opt_in_via_with_lwt_then_pass_to_connect() { + // Operators with custom MqttOptions (e.g., TLS, credentials) build their + // own opts, then call with_lwt before passing to RumqttPublisher::connect. + let mut opts = unreachable_opts("bfld-lwt-6"); + opts.set_keep_alive(std::time::Duration::from_secs(30)); + let opts = with_lwt(opts, "seed-01"); + let (_publisher, _connection) = RumqttPublisher::connect(opts, 16); +} + +#[test] +fn placeholder_topicmessage_path_unaffected_by_lwt() { + // Sanity: TopicMessage and Publish surfaces from the non-mqtt path stay + // unchanged when the mqtt feature is on; the LWT addition is purely additive. + let m = TopicMessage { + topic: "ruview/x/bfld/presence/state".into(), + payload: "true".into(), + }; + assert_eq!(m.topic, "ruview/x/bfld/presence/state"); +}