diff --git a/python/README.md b/python/README.md index 5e34bb6c..eb0fa151 100644 --- a/python/README.md +++ b/python/README.md @@ -62,7 +62,12 @@ python/ Beamforming Feedback Loop Data. numpy Complex64 bridge. 19 tests. Real Rust ingestion lands post-v2.0 in a `wifi-densepose-bfld` crate (see ADR-117 §11.11/12); the Python API does not change. -- ⏳ **P4 — WS/MQTT client**: pure-Python `wifi_densepose.client` extra. +- ✅ **P4 — WS/MQTT client**: pure-Python `wifi_densepose.client` extra + (no Rust). `SensingClient` (asyncio websockets), `RuViewMqttClient` + (paho-mqtt v2 with VERSION2 callbacks), `HABlueprintHelper` (HA + discovery payload parser), `SemanticPrimitiveListener` (typed router + for the 10 HA-MIND primitives from ADR-115 §3.12). 63 tests including + end-to-end against an in-process `websockets.serve` fixture. - ⏳ **P5 — cibuildwheel + PyPI publish**: Linux/macOS/Windows × abi3-py310. - ⏳ **P-tomb — v1.99.0 tombstone wheel**: pure-Python ImportError with migration URL, published to PyPI to soft-fence v1.x users diff --git a/python/tests/test_client_ha.py b/python/tests/test_client_ha.py new file mode 100644 index 00000000..4fbea64f --- /dev/null +++ b/python/tests/test_client_ha.py @@ -0,0 +1,205 @@ +"""ADR-117 P4 — Tests for HA-DISCO payload parsing. + +Pure parsing tests — no MQTT broker needed. +""" + +from __future__ import annotations + +import json + +import pytest + +from wifi_densepose.client import ( + HABlueprintHelper, + HaDiscoveryPayload, + HaEntity, +) +from wifi_densepose.client.ha import ( + parse_discovery_payload, + parse_discovery_topic, +) + + +# Real discovery payloads pulled from ADR-115 §3 (formatted for test +# readability; payloads are otherwise verbatim). +_PRESENCE_TOPIC = "homeassistant/binary_sensor/wifi_densepose_aabbccddeeff/presence/config" +_PRESENCE_BODY = { + "name": "Presence", + "unique_id": "wifi_densepose_aabbccddeeff_presence", + "object_id": "wifi_densepose_aabbccddeeff_presence", + "state_topic": "homeassistant/binary_sensor/wifi_densepose_aabbccddeeff/presence/state", + "availability_topic": "homeassistant/binary_sensor/wifi_densepose_aabbccddeeff/presence/availability", + "device_class": "occupancy", + "icon": "mdi:motion-sensor", +} + +_HEART_RATE_TOPIC = "homeassistant/sensor/wifi_densepose_aabbccddeeff/heart_rate/config" +_HEART_RATE_BODY = { + "name": "Heart rate", + "unique_id": "wifi_densepose_aabbccddeeff_heart_rate", + "state_topic": "homeassistant/sensor/wifi_densepose_aabbccddeeff/heart_rate/state", + "state_class": "measurement", + "unit_of_measurement": "bpm", + "icon": "mdi:heart-pulse", + "json_attributes_topic": "homeassistant/sensor/wifi_densepose_aabbccddeeff/heart_rate/state", +} + + +# ─── Topic parsing ─────────────────────────────────────────────────── + + +def test_parse_discovery_topic_binary_sensor() -> None: + out = parse_discovery_topic(_PRESENCE_TOPIC) + assert out == ("binary_sensor", "aabbccddeeff", "presence") + + +def test_parse_discovery_topic_sensor() -> None: + out = parse_discovery_topic(_HEART_RATE_TOPIC) + assert out == ("sensor", "aabbccddeeff", "heart_rate") + + +def test_parse_discovery_topic_event() -> None: + out = parse_discovery_topic( + "homeassistant/event/wifi_densepose_aabbccddeeff/fall/config" + ) + assert out == ("event", "aabbccddeeff", "fall") + + +def test_parse_discovery_topic_returns_none_for_non_discovery() -> None: + assert parse_discovery_topic("homeassistant/binary_sensor/foo/state") is None + assert parse_discovery_topic("ruview/aabbccddeeff/raw/edge_vitals") is None + assert parse_discovery_topic("") is None + + +# ─── Payload parsing ───────────────────────────────────────────────── + + +def test_parse_discovery_payload_from_dict() -> None: + out = parse_discovery_payload(_PRESENCE_TOPIC, _PRESENCE_BODY) + assert out is not None + assert out.entity_kind == "binary_sensor" + assert out.node_id == "aabbccddeeff" + assert out.object_id == "presence" + assert out.payload["device_class"] == "occupancy" + + +def test_parse_discovery_payload_from_bytes() -> None: + raw = json.dumps(_PRESENCE_BODY).encode("utf-8") + out = parse_discovery_payload(_PRESENCE_TOPIC, raw) + assert out is not None + assert out.payload["unique_id"] == "wifi_densepose_aabbccddeeff_presence" + + +def test_parse_discovery_payload_from_string() -> None: + raw = json.dumps(_PRESENCE_BODY) + out = parse_discovery_payload(_PRESENCE_TOPIC, raw) + assert out is not None + assert out.entity_kind == "binary_sensor" + + +def test_parse_discovery_payload_rejects_malformed_json() -> None: + assert parse_discovery_payload(_PRESENCE_TOPIC, "{ broken: json") is None + + +def test_parse_discovery_payload_rejects_non_object_root() -> None: + assert parse_discovery_payload(_PRESENCE_TOPIC, "[1, 2, 3]") is None + + +def test_parse_discovery_payload_returns_none_for_non_discovery_topic() -> None: + assert parse_discovery_payload( + "ruview/aabbccddeeff/raw/edge_vitals", + _PRESENCE_BODY, + ) is None + + +# ─── HaEntity projection ───────────────────────────────────────────── + + +def test_ha_entity_from_payload_extracts_fields() -> None: + p = HaDiscoveryPayload( + entity_kind="sensor", + node_id="aabbccddeeff", + object_id="heart_rate", + payload=_HEART_RATE_BODY, + ) + e = HaEntity.from_payload(p) + assert e.entity_kind == "sensor" + assert e.unique_id == "wifi_densepose_aabbccddeeff_heart_rate" + assert e.unit_of_measurement == "bpm" + assert e.icon == "mdi:heart-pulse" + assert e.json_attributes_topic == _HEART_RATE_BODY["json_attributes_topic"] + + +def test_ha_entity_handles_missing_optional_fields() -> None: + p = HaDiscoveryPayload( + entity_kind="event", + node_id="aabbccddeeff", + object_id="bed_exit", + payload={"unique_id": "wifi_densepose_aabbccddeeff_bed_exit"}, + ) + e = HaEntity.from_payload(p) + assert e.unique_id == "wifi_densepose_aabbccddeeff_bed_exit" + assert e.device_class == "" + assert e.unit_of_measurement == "" + + +# ─── HABlueprintHelper aggregation ─────────────────────────────────── + + +def _populated_helper() -> HABlueprintHelper: + h = HABlueprintHelper() + h.add_payload(_PRESENCE_TOPIC, _PRESENCE_BODY) + h.add_payload(_HEART_RATE_TOPIC, _HEART_RATE_BODY) + # Same fields but a different node + h.add_payload( + "homeassistant/binary_sensor/wifi_densepose_ff00ff00ff00/presence/config", + {**_PRESENCE_BODY, "unique_id": "wifi_densepose_ff00ff00ff00_presence"}, + ) + return h + + +def test_helper_starts_empty() -> None: + h = HABlueprintHelper() + assert len(h) == 0 + assert h.nodes() == [] + assert h.all_payloads() == [] + + +def test_helper_aggregates_multiple_payloads() -> None: + h = _populated_helper() + assert len(h) == 3 + assert h.nodes() == ["aabbccddeeff", "ff00ff00ff00"] + + +def test_helper_entities_for_node() -> None: + h = _populated_helper() + entities = h.entities_for_node("aabbccddeeff") + object_ids = sorted(e.object_id for e in entities) + assert object_ids == ["heart_rate", "presence"] + + +def test_helper_by_device_class() -> None: + h = _populated_helper() + occupancy_entities = h.by_device_class("occupancy") + assert len(occupancy_entities) == 2 # presence on both nodes + assert {e.node_id for e in occupancy_entities} == {"aabbccddeeff", "ff00ff00ff00"} + + +def test_helper_remove() -> None: + h = _populated_helper() + assert h.remove("aabbccddeeff", "binary_sensor", "presence") is True + assert h.remove("aabbccddeeff", "binary_sensor", "presence") is False # no-op + assert len(h) == 2 + + +def test_helper_rejects_non_discovery_topics() -> None: + h = HABlueprintHelper() + ok = h.add_payload("ruview/aabbccddeeff/raw/edge_vitals", _PRESENCE_BODY) + assert ok is False + assert len(h) == 0 + + +def test_helper_in_operator() -> None: + h = _populated_helper() + assert ("aabbccddeeff", "binary_sensor", "presence") in h + assert ("nonexistent", "binary_sensor", "presence") not in h diff --git a/python/tests/test_client_mqtt.py b/python/tests/test_client_mqtt.py new file mode 100644 index 00000000..a06a230a --- /dev/null +++ b/python/tests/test_client_mqtt.py @@ -0,0 +1,208 @@ +"""ADR-117 P4 — Tests for RuViewMqttClient. + +These tests do NOT bring up a broker — they exercise: + +1. Topic-wildcard matching (`_topic_matches`) +2. Client construction + handler registration +3. The callback path by directly invoking the paho callback methods + with synthesized messages + +End-to-end broker integration is a P4-followon item (the mosquitto +patterns from memory [[feedback_mqtt_integration_test_patterns]] go +there). This file keeps unit coverage tight without requiring a +broker on every CI run. +""" + +from __future__ import annotations + +import json +from types import SimpleNamespace +from typing import Any + +import pytest + +from wifi_densepose.client import RuViewMqttClient +from wifi_densepose.client.mqtt import _topic_matches + + +# ─── Topic wildcard matcher ────────────────────────────────────────── + + +@pytest.mark.parametrize("pattern,topic,expected", [ + ("ruview/+/raw/edge_vitals", "ruview/aabb/raw/edge_vitals", True), + ("ruview/+/raw/edge_vitals", "ruview/aabb/cooked/edge_vitals", False), + ("ruview/+/raw/+", "ruview/aabb/raw/pose", True), + ("ruview/+/raw/+", "ruview/aabb/raw/pose/extra", False), + # Per MQTT v5 §4.7.1.2: `+` is a whole-level wildcard only — mid- + # segment `+` is a literal `+` character, not a wildcard. The + # spec-correct way to wildcard the third segment of the HA + # discovery topic is `homeassistant/+/+/+/config`. + ("homeassistant/+/+/+/config", + "homeassistant/binary_sensor/wifi_densepose_aabb/presence/config", True), + # `wifi_densepose_+` is therefore NOT a wildcard — it matches the + # literal string only. Asserting that behaviour stays stable. + ("homeassistant/+/wifi_densepose_+/+/config", + "homeassistant/binary_sensor/wifi_densepose_aabb/presence/config", False), + ("ruview/#", "ruview/aabb/raw/edge_vitals", True), + # Per MQTT v5 §4.7.1.2: `/#` ALSO matches the bare + # `` itself (it represents "this topic and all sub-topics"). + ("ruview/#", "ruview", True), + ("ruview/+/raw/#", "ruview/aabb/raw/pose/extra", True), + ("exact/topic", "exact/topic", True), + ("exact/topic", "exact/topic/extra", False), + ("a/b/c", "a/b", False), +]) +def test_topic_matches(pattern: str, topic: str, expected: bool) -> None: + assert _topic_matches(pattern, topic) is expected + + +# ─── RuViewMqttClient construction ────────────────────────────────── + + +def test_client_constructs_with_defaults() -> None: + c = RuViewMqttClient() + assert c.broker_host == "localhost" + assert c.broker_port == 1883 + assert c.connected is False + assert c.client_id.startswith("wifi-densepose-client-") + + +def test_client_unique_client_id_per_instance() -> None: + """Per the rumqttc memory lesson — each instance needs a unique + client_id so parallel tests don't kick each other off the broker.""" + c1 = RuViewMqttClient() + c2 = RuViewMqttClient() + assert c1.client_id != c2.client_id + + +def test_client_accepts_explicit_client_id() -> None: + c = RuViewMqttClient(client_id="explicit-id") + assert c.client_id == "explicit-id" + + +# ─── Handler registration ──────────────────────────────────────────── + + +def test_handler_registration_stores_callback() -> None: + c = RuViewMqttClient() + seen: list[Any] = [] + c.on_message("ruview/+/raw/edge_vitals", lambda t, p: seen.append((t, p))) + # Internal state — we're allowed to inspect since the handler + # path needs to be unit-testable without a broker. + assert "ruview/+/raw/edge_vitals" in c._handlers + + +def test_handler_unregister_drops_callback() -> None: + c = RuViewMqttClient() + c.on_message("ruview/+/raw/edge_vitals", lambda t, p: None) + c.unsubscribe_handler("ruview/+/raw/edge_vitals") + assert "ruview/+/raw/edge_vitals" not in c._handlers + + +# ─── Callback dispatch (synthesized) ───────────────────────────────── + + +def _fake_message(topic: str, body: Any) -> Any: + """Synthesize a paho-mqtt MQTTMessage-ish object.""" + if isinstance(body, (dict, list)): + payload_bytes = json.dumps(body).encode("utf-8") + elif isinstance(body, bytes): + payload_bytes = body + else: + payload_bytes = str(body).encode("utf-8") + return SimpleNamespace(topic=topic, payload=payload_bytes) + + +def test_message_dispatch_to_matching_handler() -> None: + c = RuViewMqttClient() + received: list[tuple[str, Any]] = [] + c.on_message("ruview/+/raw/edge_vitals", lambda t, p: received.append((t, p))) + + msg = _fake_message( + "ruview/aabbccddeeff/raw/edge_vitals", + {"breathing_rate_bpm": 14.0, "heartrate_bpm": 72.0, "presence": True}, + ) + c._on_message(None, None, msg) + + assert len(received) == 1 + topic, payload = received[0] + assert topic == "ruview/aabbccddeeff/raw/edge_vitals" + assert payload["breathing_rate_bpm"] == 14.0 + + +def test_message_dispatch_ignores_non_matching_topic() -> None: + c = RuViewMqttClient() + received: list[Any] = [] + c.on_message("ruview/+/raw/edge_vitals", lambda t, p: received.append(p)) + + msg = _fake_message("ruview/aabb/raw/pose", {"persons": []}) + c._on_message(None, None, msg) + + assert received == [] + + +def test_message_dispatch_falls_back_to_bytes_on_non_json() -> None: + c = RuViewMqttClient() + received: list[Any] = [] + c.on_message("custom/binary/+", lambda t, p: received.append(p)) + + msg = _fake_message("custom/binary/data", b"\x00\x01\x02not-json") + c._on_message(None, None, msg) + + assert received == [b"\x00\x01\x02not-json"] + + +def test_handler_exception_does_not_propagate() -> None: + """A misbehaving user callback must not crash the paho network + loop — exceptions are caught and logged.""" + c = RuViewMqttClient() + seen_after_crash: list[Any] = [] + + def crashing(_topic: str, _p: Any) -> None: + raise RuntimeError("simulated callback crash") + + c.on_message("crashy/topic", crashing) + c.on_message("safe/topic", lambda t, p: seen_after_crash.append(p)) + + # First, the crashing handler — must NOT raise out of _on_message. + c._on_message(None, None, _fake_message("crashy/topic", "anything")) + # Then the safe handler — must still fire on a subsequent message. + c._on_message(None, None, _fake_message("safe/topic", {"x": 1})) + assert seen_after_crash == [{"x": 1}] + + +def test_multiple_handlers_for_overlapping_patterns_all_fire() -> None: + c = RuViewMqttClient() + a_received: list[Any] = [] + b_received: list[Any] = [] + c.on_message("ruview/+/raw/+", lambda t, p: a_received.append(p)) + c.on_message("ruview/aabb/raw/edge_vitals", lambda t, p: b_received.append(p)) + + msg = _fake_message("ruview/aabb/raw/edge_vitals", {"presence": True}) + c._on_message(None, None, msg) + + assert len(a_received) == 1 + assert len(b_received) == 1 + + +# ─── on_connect path ───────────────────────────────────────────────── + + +def test_on_connect_sets_event_and_subscribes() -> None: + c = RuViewMqttClient() + c.on_message("ruview/+/raw/edge_vitals", lambda t, p: None) + + # Stub the paho client so we can capture subscribe() calls. + subscribed: list[str] = [] + stub = SimpleNamespace(subscribe=lambda pattern: subscribed.append(pattern)) + + c._on_connect(stub, None, None, 0) + assert c.connected is True + assert subscribed == ["ruview/+/raw/edge_vitals"] + + +def test_on_connect_with_nonzero_rc_does_not_set_connected() -> None: + c = RuViewMqttClient() + stub = SimpleNamespace(subscribe=lambda pattern: None) + c._on_connect(stub, None, None, 5) # CONNACK fail + assert c.connected is False diff --git a/python/tests/test_client_primitives.py b/python/tests/test_client_primitives.py new file mode 100644 index 00000000..4b17af18 --- /dev/null +++ b/python/tests/test_client_primitives.py @@ -0,0 +1,180 @@ +"""ADR-117 P4 — Tests for the HA-MIND semantic primitive listener. + +Pure routing tests — no MQTT broker needed. +""" + +from __future__ import annotations + +import json + +from wifi_densepose.client import ( + SemanticPrimitive, + SemanticPrimitiveEvent, + SemanticPrimitiveListener, +) + + +# ─── SemanticPrimitive enum ────────────────────────────────────────── + + +def test_enum_covers_all_10_v1_primitives() -> None: + expected = { + "someone_sleeping", + "possible_distress", + "room_active", + "elderly_inactivity", + "meeting_in_progress", + "bathroom_occupied", + "fall_risk_elevated", + "bed_exit", + "no_movement_safety", + "multi_room_transition", + } + actual = {p.value for p in SemanticPrimitive} + assert actual == expected + + +def test_enum_from_object_id_round_trips() -> None: + for p in SemanticPrimitive: + assert SemanticPrimitive.from_object_id(p.value) is p + + +def test_enum_from_object_id_returns_none_for_unknown() -> None: + assert SemanticPrimitive.from_object_id("garbage") is None + + +# ─── Listener routing ──────────────────────────────────────────────── + + +def test_listener_dispatches_to_specific_handler() -> None: + listener = SemanticPrimitiveListener() + received: list[SemanticPrimitiveEvent] = [] + listener.on(SemanticPrimitive.SomeoneSleeping, received.append) + + evt = listener.handle_mqtt_message( + "homeassistant/binary_sensor/wifi_densepose_aabb/someone_sleeping/state", + json.dumps({"state": "ON", "confidence": 0.92, "explanation": ["motion<5%"]}), + ) + assert evt is not None + assert evt.kind is SemanticPrimitive.SomeoneSleeping + assert evt.node_id == "aabb" + assert evt.state == "ON" + assert evt.confidence == 0.92 + assert evt.explanation == ("motion<5%",) + assert len(received) == 1 + assert received[0] is evt + + +def test_listener_on_any_fires_for_every_primitive() -> None: + listener = SemanticPrimitiveListener() + seen: list[SemanticPrimitiveEvent] = [] + listener.on_any(seen.append) + + listener.handle_mqtt_message( + "homeassistant/binary_sensor/wifi_densepose_aabb/room_active/state", + json.dumps({"state": "ON"}), + ) + listener.handle_mqtt_message( + "homeassistant/binary_sensor/wifi_densepose_aabb/bathroom_occupied/state", + json.dumps({"state": "OFF"}), + ) + assert len(seen) == 2 + assert seen[0].kind is SemanticPrimitive.RoomActive + assert seen[1].kind is SemanticPrimitive.BathroomOccupied + + +def test_listener_specific_handler_does_not_fire_for_other_primitives() -> None: + listener = SemanticPrimitiveListener() + received: list[SemanticPrimitiveEvent] = [] + listener.on(SemanticPrimitive.PossibleDistress, received.append) + + listener.handle_mqtt_message( + "homeassistant/binary_sensor/wifi_densepose_aabb/someone_sleeping/state", + json.dumps({"state": "ON"}), + ) + assert received == [] + + +def test_listener_decodes_plain_state_string() -> None: + """HA convention: binary_sensors that don't carry attributes emit + plain strings ('ON' / 'OFF'). We must accept that too.""" + listener = SemanticPrimitiveListener() + evt = listener.handle_mqtt_message( + "homeassistant/binary_sensor/wifi_densepose_aabb/room_active/state", + "ON", + ) + assert evt is not None + assert evt.state == "ON" + assert evt.confidence == 0.0 # not provided in plain string + assert evt.explanation == () + + +def test_listener_decodes_numeric_sensor_state() -> None: + """fall_risk_elevated is a 0–100 sensor — verify numeric string.""" + listener = SemanticPrimitiveListener() + evt = listener.handle_mqtt_message( + "homeassistant/sensor/wifi_densepose_aabb/fall_risk_elevated/state", + "73", + ) + assert evt is not None + assert evt.kind is SemanticPrimitive.FallRiskElevated + assert evt.state == "73" + + +def test_listener_decodes_bytes_payload() -> None: + listener = SemanticPrimitiveListener() + evt = listener.handle_mqtt_message( + "homeassistant/binary_sensor/wifi_densepose_aabb/room_active/state", + b"ON", + ) + assert evt is not None + assert evt.state == "ON" + + +def test_listener_ignores_non_state_topics() -> None: + listener = SemanticPrimitiveListener() + assert listener.handle_mqtt_message( + "homeassistant/binary_sensor/wifi_densepose_aabb/room_active/config", + json.dumps({"name": "Room Active"}), + ) is None + + +def test_listener_ignores_unknown_slug() -> None: + listener = SemanticPrimitiveListener() + assert listener.handle_mqtt_message( + "homeassistant/binary_sensor/wifi_densepose_aabb/unknown_primitive/state", + "ON", + ) is None + + +def test_listener_ignores_non_wifi_densepose_node() -> None: + listener = SemanticPrimitiveListener() + # third segment doesn't start with wifi_densepose_ + assert listener.handle_mqtt_message( + "homeassistant/binary_sensor/aqara_fp2/room_active/state", + "ON", + ) is None + + +def test_listener_explanation_string_is_normalised_to_tuple() -> None: + """Producers may send `explanation` as a single string by mistake; + accept that and wrap in a 1-tuple so downstream code can iterate + uniformly.""" + listener = SemanticPrimitiveListener() + evt = listener.handle_mqtt_message( + "homeassistant/binary_sensor/wifi_densepose_aabb/possible_distress/state", + json.dumps({"state": "ON", "explanation": "HR=120 baseline=80"}), + ) + assert evt is not None + assert evt.explanation == ("HR=120 baseline=80",) + + +def test_event_is_frozen() -> None: + evt = SemanticPrimitiveEvent( + kind=SemanticPrimitive.SomeoneSleeping, + node_id="aabb", + state="ON", + ) + import pytest + with pytest.raises((AttributeError, Exception)): # FrozenInstanceError subclass + evt.state = "OFF" # type: ignore[misc] diff --git a/python/tests/test_client_ws.py b/python/tests/test_client_ws.py new file mode 100644 index 00000000..b6c2ad16 --- /dev/null +++ b/python/tests/test_client_ws.py @@ -0,0 +1,195 @@ +"""ADR-117 P4 — End-to-end test for SensingClient against an in-process +WS server. + +We spin up a real `websockets.serve()` server in the same event loop, +send the four message types defined in ADR-115 §1, and assert the +client decodes them into the right dataclasses. No mocks — the only +moving part this test does NOT exercise is the actual sensing-server +binary, but the wire protocol is the contract under test here. +""" + +from __future__ import annotations + +import asyncio +import json +from typing import Any + +import pytest +import websockets + +from wifi_densepose.client import ( + ConnectionEstablishedMessage, + EdgeVitalsMessage, + PoseDataMessage, + SensingClient, + SensingMessage, +) + + +# ─── In-process WS server fixture ──────────────────────────────────── + + +_FIXTURE_MESSAGES = [ + { + "type": "connection_established", + "node_id": "test-node-001", + "version": "0.7.4", + "capabilities": ["edge_vitals", "pose_data"], + }, + { + "type": "edge_vitals", + "node_id": "test-node-001", + "presence": True, + "fall_detected": False, + "motion": 0.21, + "breathing_rate_bpm": 14.5, + "heartrate_bpm": 72.3, + "n_persons": 1, + "motion_energy": 0.034, + "presence_score": 0.91, + "rssi": -42.0, + }, + { + "type": "pose_data", + "node_id": "test-node-001", + "timestamp": 1700000000.5, + "persons": [{"id": 1, "keypoints": []}], + "confidence": 0.88, + }, + # Unknown type — should NOT crash the stream; should yield a plain + # SensingMessage. + { + "type": "future_message_type_not_yet_modelled", + "extra": "data", + }, +] + + +async def _handler(websocket: Any) -> None: + for msg in _FIXTURE_MESSAGES: + await websocket.send(json.dumps(msg)) + # Send one malformed frame to assert the client logs+drops it + # rather than crashing the stream. + await websocket.send("{not valid json") + # And one final "real" message so the test can confirm the stream + # survived the malformed one. + await websocket.send(json.dumps({"type": "edge_vitals", "node_id": "post-bad-frame"})) + + +@pytest.fixture +async def ws_server() -> Any: + """Start a websocket server on a random port; yield the bound URL.""" + server = await websockets.serve(_handler, "127.0.0.1", 0) + # Get the bound port (host="127.0.0.1" returns one socket). + port = server.sockets[0].getsockname()[1] # type: ignore[union-attr] + try: + yield f"ws://127.0.0.1:{port}/ws/sensing" + finally: + server.close() + await server.wait_closed() + + +# ─── End-to-end stream test ────────────────────────────────────────── + + +async def test_sensing_client_decodes_all_message_types(ws_server: str) -> None: + received: list[SensingMessage] = [] + async with SensingClient(ws_server) as client: + async for msg in client.stream(): + received.append(msg) + if len(received) >= len(_FIXTURE_MESSAGES) + 1: # +1 for post-bad-frame + break + + # connection_established → typed + assert isinstance(received[0], ConnectionEstablishedMessage) + assert received[0].node_id == "test-node-001" + assert received[0].version == "0.7.4" + assert "edge_vitals" in received[0].capabilities + + # edge_vitals → typed with full fields + assert isinstance(received[1], EdgeVitalsMessage) + assert received[1].presence is True + assert received[1].fall_detected is False + assert received[1].breathing_rate_bpm == 14.5 + assert received[1].heartrate_bpm == 72.3 + assert received[1].n_persons == 1 + assert received[1].rssi == -42.0 + + # pose_data → typed + assert isinstance(received[2], PoseDataMessage) + assert received[2].timestamp == 1700000000.5 + assert len(received[2].persons) == 1 + assert received[2].confidence == 0.88 + + # Unknown type → plain SensingMessage (forward-compat) + assert type(received[3]) is SensingMessage # exact base class + assert received[3].type == "future_message_type_not_yet_modelled" + assert received[3].raw["extra"] == "data" + + # After the malformed frame: the stream should have survived and + # yielded the post-bad-frame message. + assert isinstance(received[4], EdgeVitalsMessage) + assert received[4].node_id == "post-bad-frame" + + +async def test_sensing_client_recv_one(ws_server: str) -> None: + async with SensingClient(ws_server) as client: + msg = await client.recv_one(timeout=2.0) + assert isinstance(msg, ConnectionEstablishedMessage) + + +async def test_sensing_client_raises_when_used_without_context() -> None: + client = SensingClient("ws://127.0.0.1:1/") # never connects + with pytest.raises(RuntimeError, match="not connected"): + await client.recv_one(timeout=0.1) + with pytest.raises(RuntimeError, match="not connected"): + async for _ in client.stream(): + pass + + +async def test_sensing_client_close_is_idempotent(ws_server: str) -> None: + client = SensingClient(ws_server) + await client.__aenter__() + await client.close() + await client.close() # second close is a no-op + + +def test_sensing_client_decoder_directly() -> None: + """The decoder is pure — exercise it without bringing up a WS + server, so we have a fast unit test for the type mapping.""" + from wifi_densepose.client.ws import _decode + + msg = _decode(json.dumps({ + "type": "edge_vitals", + "node_id": "x", + "presence": True, + "fall_detected": False, + "motion": 1.5, + })) + assert isinstance(msg, EdgeVitalsMessage) + assert msg.presence is True + assert msg.motion == 1.5 + assert msg.breathing_rate_bpm is None # not present → None, not 0.0 + assert msg.heartrate_bpm is None + assert msg.rssi is None + + +def test_sensing_client_decoder_handles_None_subfields() -> None: + """When the sensing-server explicitly emits null for HR/BR (no + measurement yet), the client should propagate None, not crash.""" + from wifi_densepose.client.ws import _decode + + msg = _decode(json.dumps({ + "type": "edge_vitals", + "node_id": "x", + "presence": False, + "fall_detected": False, + "motion": 0.0, + "breathing_rate_bpm": None, + "heartrate_bpm": None, + "rssi": None, + })) + assert isinstance(msg, EdgeVitalsMessage) + assert msg.breathing_rate_bpm is None + assert msg.heartrate_bpm is None + assert msg.rssi is None diff --git a/python/wifi_densepose/client/__init__.py b/python/wifi_densepose/client/__init__.py new file mode 100644 index 00000000..1a33a297 --- /dev/null +++ b/python/wifi_densepose/client/__init__.py @@ -0,0 +1,93 @@ +"""ADR-117 P4 — Pure-Python client layer. + +This sub-package is the **client-facing** half of `wifi-densepose`: +end users who only want to *consume* live RuView telemetry (rather than +running DSP locally) get a tight, opt-in client extra: + +``` +pip install "wifi-densepose[client]" +``` + +The runtime install footprint stays small for users who only need the +compiled PyO3 surface: `websockets` and `paho-mqtt` are declared as the +`[client]` extra in `pyproject.toml` and are NOT pulled in by the +default install. + +## Modules + +- `ws` — `SensingClient`: asyncio WebSocket client for the + sensing-server `/ws/sensing` endpoint (ADR-115 §1) +- `mqtt` — `RuViewMqttClient`: paho-mqtt v2 wrapper for + `ruview//raw/+` + `homeassistant/+/wifi_densepose_/+/+` + topics (ADR-115 §3) +- `primitives` — `SemanticPrimitiveListener`: typed view over the + 10 HA-MIND semantic primitives (ADR-115 §3.12) +- `ha` — `HABlueprintHelper`: parses MQTT-discovery payloads, helps + users introspect what entities a node is publishing + +No PyO3 here — this module is pure Python so it loads without the +compiled extension (useful for users who only want the client surface +and not the DSP pipeline). +""" + +from __future__ import annotations + +# Re-export the user-facing types. Import errors are deferred to the +# moment the user actually instantiates one of these classes — that way +# `from wifi_densepose.client import HABlueprintHelper` still works +# even if the user hasn't installed `[client]` extras yet (HABlueprint +# is pure stdlib). +from wifi_densepose.client.ha import ( + HaDiscoveryPayload, + HaEntity, + HABlueprintHelper, +) +from wifi_densepose.client.primitives import ( + SemanticPrimitive, + SemanticPrimitiveEvent, + SemanticPrimitiveListener, +) + + +__all__ = [ + # ws — re-exported lazily; see module docstring + "SensingClient", + "SensingMessage", + "EdgeVitalsMessage", + "PoseDataMessage", + "ConnectionEstablishedMessage", + # mqtt — re-exported lazily; see module docstring + "RuViewMqttClient", + # ha — pure stdlib + "HaDiscoveryPayload", + "HaEntity", + "HABlueprintHelper", + # primitives — pure stdlib + "SemanticPrimitive", + "SemanticPrimitiveEvent", + "SemanticPrimitiveListener", +] + + +def __getattr__(name: str): + """Lazy re-exports for the modules that pull in optional extras. + + `SensingClient` needs `websockets`; `RuViewMqttClient` needs + `paho-mqtt`. Importing those at package init would make + `wifi_densepose.client` unusable without the extras installed + — defeating the point of an *optional* extra. We defer the import + until the attribute is actually looked up. + """ + if name in { + "SensingClient", + "SensingMessage", + "EdgeVitalsMessage", + "PoseDataMessage", + "ConnectionEstablishedMessage", + }: + from wifi_densepose.client import ws as _ws + return getattr(_ws, name) + if name == "RuViewMqttClient": + from wifi_densepose.client.mqtt import RuViewMqttClient as _R + return _R + raise AttributeError(f"module 'wifi_densepose.client' has no attribute {name!r}") diff --git a/python/wifi_densepose/client/ha.py b/python/wifi_densepose/client/ha.py new file mode 100644 index 00000000..e1f6f564 --- /dev/null +++ b/python/wifi_densepose/client/ha.py @@ -0,0 +1,194 @@ +"""ADR-117 P4 — Home Assistant MQTT-discovery payload helpers. + +Parses the `homeassistant//wifi_densepose_//config` +discovery payloads described in ADR-115 §3 into typed Python objects so +client code can introspect what a node is publishing without +hand-parsing JSON. + +This is **read-only**: we do NOT generate discovery payloads from +Python (that's the sensing-server's job). The helper exists so a +client (HA blueprint author, debugger, dashboard) can ask "what +entities does this node expose?" and get a structured answer. + +Example: + +```python +from wifi_densepose.client import HaDiscoveryPayload, HABlueprintHelper + +helper = HABlueprintHelper() +helper.add_payload(topic, json_bytes) +for entity in helper.entities_for_node("aabbccddeeff"): + print(entity.entity_kind, entity.object_id, entity.unique_id) +``` +""" + +from __future__ import annotations + +import json +import re +from dataclasses import dataclass, field +from typing import Any, Iterable + + +# ─── Topic schema ──────────────────────────────────────────────────── + + +# Matches discovery topics like: +# homeassistant/binary_sensor/wifi_densepose_aabbccddeeff/presence/config +# homeassistant/sensor/wifi_densepose_aabbccddeeff/heart_rate/config +# homeassistant/event/wifi_densepose_aabbccddeeff/fall/config +_DISCOVERY_TOPIC_RE = re.compile( + r"^homeassistant/" + r"(?P[A-Za-z_]+)/" + r"wifi_densepose_(?P[A-Za-z0-9]+)/" + r"(?P[A-Za-z0-9_\-]+)/" + r"config$" +) + + +@dataclass(frozen=True) +class HaDiscoveryPayload: + """One MQTT discovery payload (config topic + JSON body).""" + entity_kind: str # "binary_sensor", "sensor", "event", "switch", ... + node_id: str # the node's MAC-ish identifier + object_id: str # entity slug (e.g. "presence", "heart_rate") + payload: dict[str, Any] + + @property + def topic(self) -> str: + return ( + f"homeassistant/{self.entity_kind}/" + f"wifi_densepose_{self.node_id}/{self.object_id}/config" + ) + + +@dataclass(frozen=True) +class HaEntity: + """A user-facing view of one HA entity registered by a node.""" + entity_kind: str + node_id: str + object_id: str + unique_id: str = "" + name: str = "" + state_topic: str = "" + device_class: str = "" + unit_of_measurement: str = "" + icon: str = "" + json_attributes_topic: str = "" + + @classmethod + def from_payload(cls, p: HaDiscoveryPayload) -> "HaEntity": + body = p.payload + return cls( + entity_kind=p.entity_kind, + node_id=p.node_id, + object_id=p.object_id, + unique_id=str(body.get("unique_id", "")), + name=str(body.get("name", "")), + state_topic=str(body.get("state_topic", "")), + device_class=str(body.get("device_class", "")), + unit_of_measurement=str(body.get("unit_of_measurement", "")), + icon=str(body.get("icon", "")), + json_attributes_topic=str(body.get("json_attributes_topic", "")), + ) + + +def parse_discovery_topic(topic: str) -> tuple[str, str, str] | None: + """Parse a discovery config topic into (entity_kind, node_id, + object_id). Returns None for non-discovery topics.""" + m = _DISCOVERY_TOPIC_RE.match(topic) + if not m: + return None + return (m.group("entity_kind"), m.group("node_id"), m.group("object_id")) + + +def parse_discovery_payload( + topic: str, payload: bytes | str | dict[str, Any] +) -> HaDiscoveryPayload | None: + """Decode an HA discovery payload. Returns None for non-discovery + topics OR malformed JSON; raises only on programmer error.""" + parsed = parse_discovery_topic(topic) + if parsed is None: + return None + entity_kind, node_id, object_id = parsed + body: dict[str, Any] + if isinstance(payload, dict): + body = payload + else: + if isinstance(payload, bytes): + try: + payload = payload.decode("utf-8") + except UnicodeDecodeError: + return None + try: + decoded = json.loads(payload) + except json.JSONDecodeError: + return None + if not isinstance(decoded, dict): + return None + body = decoded + return HaDiscoveryPayload( + entity_kind=entity_kind, + node_id=node_id, + object_id=object_id, + payload=body, + ) + + +# ─── Helper / aggregator ───────────────────────────────────────────── + + +class HABlueprintHelper: + """Aggregates HA discovery payloads observed on the bus and offers + structured queries against them. + + Intended use: subscribe a RuViewMqttClient to + `homeassistant/+/wifi_densepose_+/+/config`, feed every message + into `add_payload()`, then ask the helper "what entities does + node X expose?" or "what binary_sensors are presence-class?". + """ + + def __init__(self) -> None: + # (node_id, entity_kind, object_id) → HaDiscoveryPayload + self._payloads: dict[tuple[str, str, str], HaDiscoveryPayload] = {} + + def add_payload(self, topic: str, payload: bytes | str | dict[str, Any]) -> bool: + """Returns True if the payload was a valid HA discovery + message and was stored; False otherwise.""" + parsed = parse_discovery_payload(topic, payload) + if parsed is None: + return False + self._payloads[(parsed.node_id, parsed.entity_kind, parsed.object_id)] = parsed + return True + + def remove(self, node_id: str, entity_kind: str, object_id: str) -> bool: + """Drop a stored payload — useful when handling a discovery + retain-flag clear (HA's convention for removing an entity).""" + return self._payloads.pop((node_id, entity_kind, object_id), None) is not None + + def __len__(self) -> int: + return len(self._payloads) + + def __contains__(self, item: tuple[str, str, str]) -> bool: + return item in self._payloads + + def all_payloads(self) -> list[HaDiscoveryPayload]: + return list(self._payloads.values()) + + def entities_for_node(self, node_id: str) -> list[HaEntity]: + return [ + HaEntity.from_payload(p) + for p in self._payloads.values() + if p.node_id == node_id + ] + + def nodes(self) -> list[str]: + return sorted({p.node_id for p in self._payloads.values()}) + + def by_device_class(self, device_class: str) -> list[HaEntity]: + out: list[HaEntity] = [] + for p in self._payloads.values(): + e = HaEntity.from_payload(p) + if e.device_class == device_class: + out.append(e) + return out diff --git a/python/wifi_densepose/client/mqtt.py b/python/wifi_densepose/client/mqtt.py new file mode 100644 index 00000000..eceaf60f --- /dev/null +++ b/python/wifi_densepose/client/mqtt.py @@ -0,0 +1,257 @@ +"""ADR-117 P4 — paho-mqtt v2 wrapper for RuView MQTT topics. + +Subscribes to the topic namespaces defined in ADR-115: + +- `ruview//raw/edge_vitals` — opt-in firehose of the WS edge_vitals +- `ruview//raw/pose` — opt-in firehose of pose data +- `ruview//raw/sensing_update` — opt-in firehose of every sensing update +- `homeassistant/+/wifi_densepose_/+/config` — HA discovery payloads +- `homeassistant/+/wifi_densepose_/+/state` — HA state payloads + +The client uses **paho-mqtt v2's `Client(CallbackAPIVersion.VERSION2)`** +API explicitly. v1's deprecated callback signatures will not work. + +Example: + +```python +from wifi_densepose.client import RuViewMqttClient + +def on_edge_vitals(topic, payload): + print(topic, payload["breathing_rate_bpm"]) + +client = RuViewMqttClient(broker_host="localhost", broker_port=1883) +client.on_message("ruview/+/raw/edge_vitals", on_edge_vitals) +client.start() +# ... runs in a background thread; call client.stop() to disconnect +``` + +The constructor never connects; call `.start()` to enter the network +loop and `.stop()` to disconnect cleanly. Both are idempotent. +""" + +from __future__ import annotations + +import json +import logging +import threading +import uuid +from typing import Any, Callable, Optional + +try: + import paho.mqtt.client as mqtt # type: ignore[import-not-found] + from paho.mqtt.enums import CallbackAPIVersion # type: ignore[import-not-found] + _PAHO_AVAILABLE = True +except ImportError: # pragma: no cover + _PAHO_AVAILABLE = False + + +log = logging.getLogger(__name__) + + +MessageHandler = Callable[[str, Any], None] +"""(topic, decoded_payload) → None. The payload is JSON-decoded if the +content is valid JSON, otherwise the raw bytes are passed through.""" + + +class RuViewMqttClient: + """Wrapper around paho-mqtt v2 with per-topic-pattern callbacks. + + Per the rumqttc lesson [[feedback_mqtt_integration_test_patterns]]: + - Each instance gets a unique client_id (per-test isolation when + tests run in parallel against the same broker). + - Subscription wildcards (`+`, `#`) are supported by paho's + built-in matcher; we route by exact pattern match against the + registered handler. + """ + + def __init__( + self, + *, + broker_host: str = "localhost", + broker_port: int = 1883, + client_id: Optional[str] = None, + username: Optional[str] = None, + password: Optional[str] = None, + keepalive: int = 60, + tls: bool = False, + ) -> None: + if not _PAHO_AVAILABLE: + raise ImportError( + "RuViewMqttClient requires the `paho-mqtt` package. Install with " + "`pip install \"wifi-densepose[client]\"` to enable the client extras." + ) + self.broker_host = broker_host + self.broker_port = broker_port + self.keepalive = keepalive + self._client_id = client_id or f"wifi-densepose-client-{uuid.uuid4().hex[:12]}" + self._handlers: dict[str, MessageHandler] = {} + self._handlers_lock = threading.Lock() + self._client = mqtt.Client( + callback_api_version=CallbackAPIVersion.VERSION2, + client_id=self._client_id, + clean_session=True, + ) + if username is not None: + self._client.username_pw_set(username, password) + if tls: + self._client.tls_set() + self._client.on_connect = self._on_connect + self._client.on_message = self._on_message + self._client.on_disconnect = self._on_disconnect + self._started = False + self._connected_event = threading.Event() + + @property + def client_id(self) -> str: + return self._client_id + + @property + def connected(self) -> bool: + return self._connected_event.is_set() + + # ── handler registration ───────────────────────────────────────── + + def on_message(self, topic_pattern: str, handler: MessageHandler) -> None: + """Register a handler for a topic pattern. Replaces any + previous handler for the same pattern.""" + with self._handlers_lock: + self._handlers[topic_pattern] = handler + + def unsubscribe_handler(self, topic_pattern: str) -> None: + with self._handlers_lock: + self._handlers.pop(topic_pattern, None) + if self._started: + self._client.unsubscribe(topic_pattern) + + # ── lifecycle ──────────────────────────────────────────────────── + + def start(self) -> None: + """Connect to the broker and enter the network loop in a + background thread. Idempotent.""" + if self._started: + return + self._client.connect(self.broker_host, self.broker_port, self.keepalive) + self._client.loop_start() + self._started = True + + def wait_connected(self, timeout: float = 5.0) -> bool: + """Block until CONNACK has been received. Returns True on + connect, False on timeout. Mirrors the rumqttc SubAck pump + pattern but for paho's connect step.""" + return self._connected_event.wait(timeout=timeout) + + def stop(self) -> None: + """Disconnect and stop the network loop. Idempotent.""" + if not self._started: + return + try: + self._client.disconnect() + except Exception as e: # pragma: no cover — best-effort + log.debug("ignored mqtt disconnect error: %r", e) + try: + self._client.loop_stop() + except Exception as e: # pragma: no cover + log.debug("ignored mqtt loop_stop error: %r", e) + self._started = False + self._connected_event.clear() + + def publish( + self, + topic: str, + payload: Any, + *, + qos: int = 0, + retain: bool = False, + ) -> None: + """Publish a payload. Dicts/lists are JSON-encoded; bytes pass + through; strings are encoded UTF-8.""" + if isinstance(payload, (dict, list)): + data: Any = json.dumps(payload, default=str) + else: + data = payload + info = self._client.publish(topic, data, qos=qos, retain=retain) + # paho v2 returns MQTTMessageInfo; rc != MQTT_ERR_SUCCESS is a + # broker-side error we should propagate so callers don't think + # the publish succeeded. + if info.rc != mqtt.MQTT_ERR_SUCCESS: + raise RuntimeError(f"mqtt publish failed: topic={topic} rc={info.rc}") + + # ── paho callbacks (v2 signatures) ─────────────────────────────── + + def _on_connect(self, client: Any, _userdata: Any, _flags: Any, reason_code: Any, _properties: Any = None) -> None: + # paho v2 passes ReasonCode; success is 0 ("Success" / Granted_QoS_0) + rc = int(reason_code) if hasattr(reason_code, "__int__") else reason_code + if rc == 0: + self._connected_event.set() + # Re-subscribe to all known patterns. Important after a + # reconnect — paho doesn't auto-resubscribe with + # clean_session=True. + with self._handlers_lock: + patterns = list(self._handlers.keys()) + for pattern in patterns: + client.subscribe(pattern) + log.debug("mqtt CONNACK ok; subscribed to %d pattern(s)", len(patterns)) + else: + log.warning("mqtt CONNACK with non-success rc=%r", reason_code) + + def _on_disconnect(self, _client: Any, _userdata: Any, _flags: Any = None, reason_code: Any = None, _properties: Any = None) -> None: + self._connected_event.clear() + log.debug("mqtt disconnected rc=%r", reason_code) + + def _on_message(self, _client: Any, _userdata: Any, message: Any) -> None: + topic = message.topic + # Best-effort JSON decode — fall back to raw bytes if it's not JSON. + payload: Any + try: + payload = json.loads(message.payload.decode("utf-8")) + except (UnicodeDecodeError, json.JSONDecodeError): + payload = message.payload + + with self._handlers_lock: + handlers = list(self._handlers.items()) + + for pattern, handler in handlers: + if _topic_matches(pattern, topic): + try: + handler(topic, payload) + except Exception as e: # never let a user callback crash the loop + log.exception("handler for pattern %r raised: %r", pattern, e) + + # ── re-subscribe on demand ────────────────────────────────────── + + def subscribe_registered(self) -> None: + """Explicitly issue SUBSCRIBE for every registered handler. + Useful when you registered handlers AFTER calling start(). + """ + if not self._started: + return + with self._handlers_lock: + patterns = list(self._handlers.keys()) + for pattern in patterns: + self._client.subscribe(pattern) + + +# ─── Topic-pattern matching ────────────────────────────────────────── + + +def _topic_matches(pattern: str, topic: str) -> bool: + """MQTT topic wildcard matcher. + + - `+` matches exactly one topic level + - `#` matches one or more remaining levels (must be the final segment) + """ + p_parts = pattern.split("/") + t_parts = topic.split("/") + i = 0 + while i < len(p_parts): + if p_parts[i] == "#": + return i == len(p_parts) - 1 and len(t_parts) >= i + if i >= len(t_parts): + return False + if p_parts[i] == "+": + i += 1 + continue + if p_parts[i] != t_parts[i]: + return False + i += 1 + return len(p_parts) == len(t_parts) diff --git a/python/wifi_densepose/client/primitives.py b/python/wifi_densepose/client/primitives.py new file mode 100644 index 00000000..daa75d60 --- /dev/null +++ b/python/wifi_densepose/client/primitives.py @@ -0,0 +1,222 @@ +"""ADR-117 P4 — Typed listener for HA-MIND semantic primitives. + +ADR-115 §3.12 defines 10 fused inference outputs that the sensing-server +publishes under the HA-DISCO MQTT namespace. This module gives clients +a typed handle on them so they can write `if event.kind == +SemanticPrimitive.SomeoneSleeping: ...` instead of pattern-matching +strings. + +The 10 v1 primitives (ADR-115 §3.12.1): + +| Enum value | Topic suffix | Output kind | +|---|---|---| +| `SomeoneSleeping` | `someone_sleeping` | binary_sensor | +| `PossibleDistress` | `possible_distress` | binary_sensor + event | +| `RoomActive` | `room_active` | binary_sensor | +| `ElderlyInactivityAnomaly` | `elderly_inactivity` | binary_sensor + event | +| `MeetingInProgress` | `meeting_in_progress` | binary_sensor | +| `BathroomOccupied` | `bathroom_occupied` | binary_sensor | +| `FallRiskElevated` | `fall_risk_elevated` | sensor (0–100) + event | +| `BedExit` | `bed_exit` | event | +| `NoMovementSafety` | `no_movement_safety` | binary_sensor + event | +| `MultiRoomTransition` | `multi_room_transition` | event | +""" + +from __future__ import annotations + +import enum +import json +from dataclasses import dataclass, field +from typing import Any, Callable, Optional + + +# ─── Enum ──────────────────────────────────────────────────────────── + + +class SemanticPrimitive(enum.Enum): + """One of the 10 HA-MIND fused inference outputs.""" + SomeoneSleeping = "someone_sleeping" + PossibleDistress = "possible_distress" + RoomActive = "room_active" + ElderlyInactivityAnomaly = "elderly_inactivity" + MeetingInProgress = "meeting_in_progress" + BathroomOccupied = "bathroom_occupied" + FallRiskElevated = "fall_risk_elevated" + BedExit = "bed_exit" + NoMovementSafety = "no_movement_safety" + MultiRoomTransition = "multi_room_transition" + + @classmethod + def from_object_id(cls, object_id: str) -> Optional["SemanticPrimitive"]: + for v in cls: + if v.value == object_id: + return v + return None + + +# ─── Event payload ─────────────────────────────────────────────────── + + +@dataclass(frozen=True) +class SemanticPrimitiveEvent: + """A single fired event for one semantic primitive. + + `state` semantics depend on the primitive kind: + - binary_sensor: "ON" / "OFF" + - sensor: numeric string (e.g. "73" for fall_risk_elevated 0–100) + - event: "fired" or an event-class string like "bed_exit_detected" + """ + kind: SemanticPrimitive + node_id: str + state: str + confidence: float = 0.0 + explanation: tuple[str, ...] = () + timestamp: float = 0.0 + raw: dict[str, Any] = field(default_factory=dict, hash=False, compare=False) + + +# ─── Listener ──────────────────────────────────────────────────────── + + +Callback = Callable[[SemanticPrimitiveEvent], None] + + +class SemanticPrimitiveListener: + """Routes raw MQTT state messages to per-primitive callbacks. + + Designed to plug into RuViewMqttClient: + + ```python + from wifi_densepose.client import ( + RuViewMqttClient, SemanticPrimitive, SemanticPrimitiveListener + ) + + listener = SemanticPrimitiveListener() + listener.on(SemanticPrimitive.SomeoneSleeping, lambda e: print(e)) + + client = RuViewMqttClient() + client.on_message( + "homeassistant/+/wifi_densepose_+/+/state", + listener.handle_mqtt_message, + ) + client.start() + ``` + + The listener itself never touches MQTT — it's a pure router. You + feed it `(topic, payload)` pairs and it figures out which primitive + the topic refers to and decodes the payload. + """ + + # Matches state topics for any of the 10 primitives. + # homeassistant//wifi_densepose_//state + _SLUGS = {p.value for p in SemanticPrimitive} + + def __init__(self) -> None: + self._handlers: dict[Optional[SemanticPrimitive], list[Callback]] = {} + + def on(self, primitive: SemanticPrimitive, cb: Callback) -> None: + """Register a callback for a specific primitive.""" + self._handlers.setdefault(primitive, []).append(cb) + + def on_any(self, cb: Callback) -> None: + """Register a callback that fires for ALL primitives. Useful + for logging or dashboards.""" + self._handlers.setdefault(None, []).append(cb) + + def handle_mqtt_message(self, topic: str, payload: Any) -> Optional[SemanticPrimitiveEvent]: + """Decode one MQTT message into a SemanticPrimitiveEvent and + fire the matching callbacks. Returns the event (or None if the + topic was not a semantic-primitive state topic).""" + parts = topic.split("/") + # Shape: homeassistant / / wifi_densepose_ / / state + if len(parts) != 5: + return None + if parts[0] != "homeassistant" or parts[4] != "state": + return None + node_prefix = parts[2] + if not node_prefix.startswith("wifi_densepose_"): + return None + slug = parts[3] + if slug not in self._SLUGS: + return None + + primitive = SemanticPrimitive.from_object_id(slug) + if primitive is None: # pragma: no cover — guarded above + return None + + node_id = node_prefix[len("wifi_densepose_"):] + event = _decode_event(primitive, node_id, payload) + + # Dispatch — primitive-specific first, then "any" handlers. + for cb in self._handlers.get(primitive, ()): + cb(event) + for cb in self._handlers.get(None, ()): + cb(event) + return event + + +def _decode_event( + primitive: SemanticPrimitive, + node_id: str, + payload: Any, +) -> SemanticPrimitiveEvent: + """Decode a raw state payload into a typed event. + + HA state payloads come in two shapes: + 1. Plain string ("ON", "OFF", "73") — used by binary_sensor/sensor + with no json_attributes_topic. + 2. JSON object with `state` + `confidence` + `explanation` fields — + used by HA-MIND semantic primitives per ADR-115 §3.12.4. + + Both are supported transparently. + """ + if isinstance(payload, bytes): + try: + payload = payload.decode("utf-8") + except UnicodeDecodeError: + return SemanticPrimitiveEvent( + kind=primitive, node_id=node_id, state="", raw={} + ) + + if isinstance(payload, dict): + body = payload + elif isinstance(payload, str): + # Try to JSON-decode; if it's not JSON, treat as a plain state string. + try: + decoded = json.loads(payload) + except json.JSONDecodeError: + return SemanticPrimitiveEvent( + kind=primitive, + node_id=node_id, + state=payload, + raw={"state": payload}, + ) + if isinstance(decoded, dict): + body = decoded + else: + return SemanticPrimitiveEvent( + kind=primitive, + node_id=node_id, + state=str(decoded), + raw={"state": decoded}, + ) + else: + return SemanticPrimitiveEvent( + kind=primitive, node_id=node_id, state=str(payload), raw={} + ) + + expl = body.get("explanation") or body.get("reason") or () + if isinstance(expl, str): + expl_tuple: tuple[str, ...] = (expl,) + else: + expl_tuple = tuple(str(x) for x in expl) + + return SemanticPrimitiveEvent( + kind=primitive, + node_id=node_id, + state=str(body.get("state", "")), + confidence=float(body.get("confidence", 0.0)), + explanation=expl_tuple, + timestamp=float(body.get("timestamp", 0.0)), + raw=body, + ) diff --git a/python/wifi_densepose/client/ws.py b/python/wifi_densepose/client/ws.py new file mode 100644 index 00000000..385ff7f0 --- /dev/null +++ b/python/wifi_densepose/client/ws.py @@ -0,0 +1,256 @@ +"""ADR-117 P4 — Asyncio WebSocket client for the sensing-server. + +The Rust sensing-server (`v2/crates/wifi-densepose-sensing-server`) +broadcasts three structured message types over `ws://:/ws/sensing`: + +| `type` field | Source line in main.rs | Payload shape | +|---|---|---| +| `connection_established` | 2596 | `{node_id, version, capabilities}` | +| `pose_data` | 2655 | `{node_id, timestamp, persons: [...], confidence}` | +| `edge_vitals` | 4548 | `{node_id, presence, fall_detected, motion, breathing_rate_bpm, heartrate_bpm, ...}` | + +`SensingClient` is a pure-Python asyncio wrapper around `websockets>=12` +that connects, decodes JSON, and yields typed dataclasses. + +Example: + +```python +import asyncio +from wifi_densepose.client import SensingClient, EdgeVitalsMessage + +async def main(): + async with SensingClient("ws://localhost:8765/ws/sensing") as client: + async for msg in client.stream(): + if isinstance(msg, EdgeVitalsMessage): + print(f"BR={msg.breathing_rate_bpm}, HR={msg.heartrate_bpm}") + +asyncio.run(main()) +``` +""" + +from __future__ import annotations + +import asyncio +import json +import logging +from dataclasses import dataclass, field +from typing import Any, AsyncIterator, Optional + +# Defer import — only fail at construction time, not at module load. +try: + import websockets # type: ignore[import-not-found] + from websockets.exceptions import ConnectionClosed # type: ignore[import-not-found] + _WEBSOCKETS_AVAILABLE = True +except ImportError: # pragma: no cover + _WEBSOCKETS_AVAILABLE = False + + +log = logging.getLogger(__name__) + + +# ─── Typed messages ────────────────────────────────────────────────── + + +@dataclass(frozen=True) +class SensingMessage: + """Base class for typed sensing-server messages. The original JSON + payload is preserved in ``raw`` for forward-compatibility with + fields not yet modelled here.""" + type: str + raw: dict[str, Any] = field(default_factory=dict, hash=False, compare=False) + + +@dataclass(frozen=True) +class ConnectionEstablishedMessage(SensingMessage): + """First message after a successful WS handshake. Lets the client + discover the node ID and capability flags without making a separate + REST call.""" + node_id: str = "" + version: str = "" + capabilities: tuple[str, ...] = () + + +@dataclass(frozen=True) +class EdgeVitalsMessage(SensingMessage): + """Vital-sign telemetry fused from the edge-vitals path + (ADR-021/ADR-110). Optional fields may be ``None`` when the + upstream channel hasn't produced a measurement yet.""" + node_id: str = "" + presence: bool = False + fall_detected: bool = False + motion: float = 0.0 + breathing_rate_bpm: Optional[float] = None + heartrate_bpm: Optional[float] = None + n_persons: int = 0 + motion_energy: float = 0.0 + presence_score: float = 0.0 + rssi: Optional[float] = None + + +@dataclass(frozen=True) +class PoseDataMessage(SensingMessage): + """17-keypoint pose data broadcast at the sensing-server's frame + cadence. Persons are a list of opaque dicts — typed PoseEstimate + decoding lives in the P2 bindings; the WS client passes through.""" + node_id: str = "" + timestamp: float = 0.0 + persons: tuple[dict[str, Any], ...] = () + confidence: float = 0.0 + + +# ─── Decoder ───────────────────────────────────────────────────────── + + +def _decode(raw_text: str) -> SensingMessage: + """Decode a single WS frame into a typed message. + + Unknown ``type`` values yield a plain ``SensingMessage`` rather + than raising — the sensing-server is on a faster release cadence + than this client, and unknown types should not break the stream. + """ + obj = json.loads(raw_text) + if not isinstance(obj, dict): + raise ValueError(f"sensing-server emitted non-dict payload: {type(obj).__name__}") + mtype = obj.get("type", "") + if mtype == "connection_established": + return ConnectionEstablishedMessage( + type=mtype, + raw=obj, + node_id=obj.get("node_id", ""), + version=obj.get("version", ""), + capabilities=tuple(obj.get("capabilities", ())), + ) + if mtype == "edge_vitals": + return EdgeVitalsMessage( + type=mtype, + raw=obj, + node_id=obj.get("node_id", ""), + presence=bool(obj.get("presence", False)), + fall_detected=bool(obj.get("fall_detected", False)), + motion=float(obj.get("motion", 0.0)), + breathing_rate_bpm=( + float(obj["breathing_rate_bpm"]) + if obj.get("breathing_rate_bpm") is not None else None + ), + heartrate_bpm=( + float(obj["heartrate_bpm"]) + if obj.get("heartrate_bpm") is not None else None + ), + n_persons=int(obj.get("n_persons", 0)), + motion_energy=float(obj.get("motion_energy", 0.0)), + presence_score=float(obj.get("presence_score", 0.0)), + rssi=(float(obj["rssi"]) if obj.get("rssi") is not None else None), + ) + if mtype == "pose_data": + persons = obj.get("persons", ()) + return PoseDataMessage( + type=mtype, + raw=obj, + node_id=obj.get("node_id", ""), + timestamp=float(obj.get("timestamp", 0.0)), + persons=tuple(persons) if isinstance(persons, list) else (), + confidence=float(obj.get("confidence", 0.0)), + ) + return SensingMessage(type=mtype, raw=obj) + + +# ─── Client ────────────────────────────────────────────────────────── + + +class SensingClient: + """Asyncio WebSocket client for the RuView sensing-server. + + Usage as async context manager: + + ```python + async with SensingClient("ws://localhost:8765/ws/sensing") as c: + async for msg in c.stream(): + ... + ``` + + The client does NOT auto-reconnect — if you want resilience, wrap + the ``async with`` in your own retry loop. Auto-reconnect logic is + application-specific (e.g., "retry forever" for a long-running + automation vs "fail fast" for a CLI tool that should exit). + """ + + def __init__( + self, + url: str, + *, + ping_interval: float = 20.0, + ping_timeout: float = 20.0, + max_size: int = 16 * 1024 * 1024, + ) -> None: + if not _WEBSOCKETS_AVAILABLE: + raise ImportError( + "SensingClient requires the `websockets` package. Install with " + "`pip install \"wifi-densepose[client]\"` to enable the client extras." + ) + self.url = url + self._ping_interval = ping_interval + self._ping_timeout = ping_timeout + self._max_size = max_size + self._ws: Any = None # websockets.WebSocketClientProtocol — typed Any to avoid import cost + + async def __aenter__(self) -> "SensingClient": + self._ws = await websockets.connect( + self.url, + ping_interval=self._ping_interval, + ping_timeout=self._ping_timeout, + max_size=self._max_size, + ) + return self + + async def __aexit__(self, exc_type: Any, exc: Any, tb: Any) -> None: + await self.close() + + async def close(self) -> None: + """Idempotent connection close.""" + if self._ws is not None: + try: + await self._ws.close() + except Exception as e: # pragma: no cover — best-effort close + log.debug("ignored WS close error: %r", e) + self._ws = None + + async def stream(self) -> AsyncIterator[SensingMessage]: + """Yield typed messages until the server closes the connection + or the context is exited. + + Decode failures on individual frames are logged at WARN and + swallowed — a malformed frame should not terminate the stream + (the next frame may be fine).""" + if self._ws is None: + raise RuntimeError("SensingClient not connected. Use `async with` first.") + try: + async for frame in self._ws: + if isinstance(frame, bytes): + frame = frame.decode("utf-8", errors="replace") + try: + yield _decode(frame) + except (ValueError, json.JSONDecodeError) as e: + log.warning("dropping malformed sensing-server frame: %r", e) + except ConnectionClosed: + # Graceful EOF — exit the iterator normally. + return + + async def send_ping(self) -> None: + """Send an application-level ping. The sensing-server replies + with `{"type": "pong"}` (main.rs:2698).""" + if self._ws is None: + raise RuntimeError("SensingClient not connected. Use `async with` first.") + await self._ws.send(json.dumps({"type": "ping"})) + + async def recv_one(self, *, timeout: Optional[float] = None) -> SensingMessage: + """Receive a single decoded message. Convenience for short + scripts and tests that don't need an async generator.""" + if self._ws is None: + raise RuntimeError("SensingClient not connected. Use `async with` first.") + if timeout is None: + frame = await self._ws.recv() + else: + frame = await asyncio.wait_for(self._ws.recv(), timeout=timeout) + if isinstance(frame, bytes): + frame = frame.decode("utf-8", errors="replace") + return _decode(frame)