MQTT publisher is fully implemented but publisher::spawn is never called from main.rs. args.mqtt is never read. Fix adds the channel, wires broadcast_tick_task, and spawns the publisher on startup.
This commit is contained in:
parent
9ad550d95f
commit
4e13c8baeb
|
|
@ -894,6 +894,8 @@ struct AppStateInner {
|
|||
// (not replacing) the window-aggregated `tx` / `/ws/sensing` pipeline.
|
||||
intro: wifi_densepose_sensing_server::introspection::IntrospectionState,
|
||||
intro_tx: broadcast::Sender<String>,
|
||||
#[cfg(feature = "mqtt")]
|
||||
vitals_tx: Option<broadcast::Sender<wifi_densepose_sensing_server::mqtt::state::VitalsSnapshot>>,
|
||||
total_detections: u64,
|
||||
start_time: std::time::Instant,
|
||||
/// Vital sign detector (processes CSI frames to estimate HR/RR).
|
||||
|
|
@ -5289,6 +5291,34 @@ async fn broadcast_tick_task(state: SharedState, tick_ms: u64) {
|
|||
let _ = s.tx.send(json);
|
||||
}
|
||||
}
|
||||
// ADR-115: feed the MQTT publisher (compiled in only when
|
||||
// --features mqtt is active; the field and this block are
|
||||
// stripped entirely in the default feature set).
|
||||
#[cfg(feature = "mqtt")]
|
||||
if let Some(ref vtx) = s.vitals_tx {
|
||||
if vtx.receiver_count() > 0 {
|
||||
use wifi_densepose_sensing_server::mqtt::state::VitalsSnapshot;
|
||||
let vs = &s.latest_vitals;
|
||||
let snap = VitalsSnapshot {
|
||||
node_id: "ruview".to_string(),
|
||||
timestamp_ms: std::time::SystemTime::now()
|
||||
.duration_since(std::time::UNIX_EPOCH)
|
||||
.unwrap_or_default()
|
||||
.as_millis() as i64,
|
||||
presence: update.classification.presence,
|
||||
fall_detected: false,
|
||||
motion: s.smoothed_motion,
|
||||
motion_energy: update.features.motion_band_power,
|
||||
presence_score: s.smoothed_person_score,
|
||||
breathing_rate_bpm: vs.breathing_rate_bpm,
|
||||
heartrate_bpm: vs.heart_rate_bpm,
|
||||
n_persons: s.person_count() as u32,
|
||||
rssi_dbm: update.nodes.first().map(|n| n.rssi_dbm),
|
||||
vital_confidence: (vs.breathing_confidence + vs.heartbeat_confidence) / 2.0,
|
||||
};
|
||||
let _ = vtx.send(snap);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -5995,6 +6025,8 @@ async fn main() {
|
|||
tx,
|
||||
intro: wifi_densepose_sensing_server::introspection::IntrospectionState::new(),
|
||||
intro_tx,
|
||||
#[cfg(feature = "mqtt")]
|
||||
vitals_tx: None,
|
||||
total_detections: 0,
|
||||
start_time: std::time::Instant::now(),
|
||||
vital_detector: VitalSignDetector::new(vital_sample_rate),
|
||||
|
|
@ -6092,6 +6124,47 @@ async fn main() {
|
|||
}
|
||||
}
|
||||
|
||||
// ADR-115: spawn the MQTT publisher when --mqtt / RUVIEW_MQTT=true is set.
|
||||
// The publisher task owns connection lifecycle, HA discovery, and per-entity
|
||||
// state publication. Errors during connect are retried internally by rumqttc.
|
||||
#[cfg(feature = "mqtt")]
|
||||
if args.mqtt {
|
||||
use wifi_densepose_sensing_server::mqtt::{
|
||||
config::MqttConfig,
|
||||
publisher::{spawn as mqtt_spawn, OwnedDiscoveryBuilder},
|
||||
state::VitalsSnapshot,
|
||||
};
|
||||
let mqtt_cfg = MqttConfig::from_args(&args);
|
||||
let can_start = match mqtt_cfg.validate() {
|
||||
Ok(()) => true,
|
||||
Err(ref e) if !e.is_fatal() => {
|
||||
warn!("[mqtt] {e}");
|
||||
true
|
||||
}
|
||||
Err(e) => {
|
||||
error!("[mqtt] invalid config — MQTT disabled: {e}");
|
||||
false
|
||||
}
|
||||
};
|
||||
if can_start {
|
||||
let (vtx, vrx) = broadcast::channel::<VitalsSnapshot>(16);
|
||||
{
|
||||
let mut s = state.write().await;
|
||||
s.vitals_tx = Some(vtx);
|
||||
}
|
||||
let builder = OwnedDiscoveryBuilder {
|
||||
discovery_prefix: mqtt_cfg.discovery_prefix.clone(),
|
||||
node_id: mqtt_cfg.client_id.clone(),
|
||||
node_friendly_name: None,
|
||||
sw_version: env!("CARGO_PKG_VERSION").to_string(),
|
||||
model: "WiFi-DensePose".to_string(),
|
||||
via_device: None,
|
||||
};
|
||||
mqtt_spawn(Arc::new(mqtt_cfg), builder, vrx);
|
||||
info!("[mqtt] publisher task spawned — connecting to broker");
|
||||
}
|
||||
}
|
||||
|
||||
// ADR-050: Parse bind address once, use for all listeners
|
||||
let bind_ip: std::net::IpAddr = args
|
||||
.bind_addr
|
||||
|
|
|
|||
Loading…
Reference in New Issue