223 lines
7.8 KiB
Python
223 lines
7.8 KiB
Python
"""ADR-117 P4 — Typed listener for HA-MIND semantic primitives.
|
||
|
||
ADR-115 §3.12 defines 10 fused inference outputs that the sensing-server
|
||
publishes under the HA-DISCO MQTT namespace. This module gives clients
|
||
a typed handle on them so they can write `if event.kind ==
|
||
SemanticPrimitive.SomeoneSleeping: ...` instead of pattern-matching
|
||
strings.
|
||
|
||
The 10 v1 primitives (ADR-115 §3.12.1):
|
||
|
||
| Enum value | Topic suffix | Output kind |
|
||
|---|---|---|
|
||
| `SomeoneSleeping` | `someone_sleeping` | binary_sensor |
|
||
| `PossibleDistress` | `possible_distress` | binary_sensor + event |
|
||
| `RoomActive` | `room_active` | binary_sensor |
|
||
| `ElderlyInactivityAnomaly` | `elderly_inactivity` | binary_sensor + event |
|
||
| `MeetingInProgress` | `meeting_in_progress` | binary_sensor |
|
||
| `BathroomOccupied` | `bathroom_occupied` | binary_sensor |
|
||
| `FallRiskElevated` | `fall_risk_elevated` | sensor (0–100) + event |
|
||
| `BedExit` | `bed_exit` | event |
|
||
| `NoMovementSafety` | `no_movement_safety` | binary_sensor + event |
|
||
| `MultiRoomTransition` | `multi_room_transition` | event |
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import enum
|
||
import json
|
||
from dataclasses import dataclass, field
|
||
from typing import Any, Callable, Optional
|
||
|
||
|
||
# ─── Enum ────────────────────────────────────────────────────────────
|
||
|
||
|
||
class SemanticPrimitive(enum.Enum):
|
||
"""One of the 10 HA-MIND fused inference outputs."""
|
||
SomeoneSleeping = "someone_sleeping"
|
||
PossibleDistress = "possible_distress"
|
||
RoomActive = "room_active"
|
||
ElderlyInactivityAnomaly = "elderly_inactivity"
|
||
MeetingInProgress = "meeting_in_progress"
|
||
BathroomOccupied = "bathroom_occupied"
|
||
FallRiskElevated = "fall_risk_elevated"
|
||
BedExit = "bed_exit"
|
||
NoMovementSafety = "no_movement_safety"
|
||
MultiRoomTransition = "multi_room_transition"
|
||
|
||
@classmethod
|
||
def from_object_id(cls, object_id: str) -> Optional["SemanticPrimitive"]:
|
||
for v in cls:
|
||
if v.value == object_id:
|
||
return v
|
||
return None
|
||
|
||
|
||
# ─── Event payload ───────────────────────────────────────────────────
|
||
|
||
|
||
@dataclass(frozen=True)
|
||
class SemanticPrimitiveEvent:
|
||
"""A single fired event for one semantic primitive.
|
||
|
||
`state` semantics depend on the primitive kind:
|
||
- binary_sensor: "ON" / "OFF"
|
||
- sensor: numeric string (e.g. "73" for fall_risk_elevated 0–100)
|
||
- event: "fired" or an event-class string like "bed_exit_detected"
|
||
"""
|
||
kind: SemanticPrimitive
|
||
node_id: str
|
||
state: str
|
||
confidence: float = 0.0
|
||
explanation: tuple[str, ...] = ()
|
||
timestamp: float = 0.0
|
||
raw: dict[str, Any] = field(default_factory=dict, hash=False, compare=False)
|
||
|
||
|
||
# ─── Listener ────────────────────────────────────────────────────────
|
||
|
||
|
||
Callback = Callable[[SemanticPrimitiveEvent], None]
|
||
|
||
|
||
class SemanticPrimitiveListener:
|
||
"""Routes raw MQTT state messages to per-primitive callbacks.
|
||
|
||
Designed to plug into RuViewMqttClient:
|
||
|
||
```python
|
||
from wifi_densepose.client import (
|
||
RuViewMqttClient, SemanticPrimitive, SemanticPrimitiveListener
|
||
)
|
||
|
||
listener = SemanticPrimitiveListener()
|
||
listener.on(SemanticPrimitive.SomeoneSleeping, lambda e: print(e))
|
||
|
||
client = RuViewMqttClient()
|
||
client.on_message(
|
||
"homeassistant/+/wifi_densepose_+/+/state",
|
||
listener.handle_mqtt_message,
|
||
)
|
||
client.start()
|
||
```
|
||
|
||
The listener itself never touches MQTT — it's a pure router. You
|
||
feed it `(topic, payload)` pairs and it figures out which primitive
|
||
the topic refers to and decodes the payload.
|
||
"""
|
||
|
||
# Matches state topics for any of the 10 primitives.
|
||
# homeassistant/<kind>/wifi_densepose_<node>/<primitive_slug>/state
|
||
_SLUGS = {p.value for p in SemanticPrimitive}
|
||
|
||
def __init__(self) -> None:
|
||
self._handlers: dict[Optional[SemanticPrimitive], list[Callback]] = {}
|
||
|
||
def on(self, primitive: SemanticPrimitive, cb: Callback) -> None:
|
||
"""Register a callback for a specific primitive."""
|
||
self._handlers.setdefault(primitive, []).append(cb)
|
||
|
||
def on_any(self, cb: Callback) -> None:
|
||
"""Register a callback that fires for ALL primitives. Useful
|
||
for logging or dashboards."""
|
||
self._handlers.setdefault(None, []).append(cb)
|
||
|
||
def handle_mqtt_message(self, topic: str, payload: Any) -> Optional[SemanticPrimitiveEvent]:
|
||
"""Decode one MQTT message into a SemanticPrimitiveEvent and
|
||
fire the matching callbacks. Returns the event (or None if the
|
||
topic was not a semantic-primitive state topic)."""
|
||
parts = topic.split("/")
|
||
# Shape: homeassistant / <kind> / wifi_densepose_<node> / <slug> / state
|
||
if len(parts) != 5:
|
||
return None
|
||
if parts[0] != "homeassistant" or parts[4] != "state":
|
||
return None
|
||
node_prefix = parts[2]
|
||
if not node_prefix.startswith("wifi_densepose_"):
|
||
return None
|
||
slug = parts[3]
|
||
if slug not in self._SLUGS:
|
||
return None
|
||
|
||
primitive = SemanticPrimitive.from_object_id(slug)
|
||
if primitive is None: # pragma: no cover — guarded above
|
||
return None
|
||
|
||
node_id = node_prefix[len("wifi_densepose_"):]
|
||
event = _decode_event(primitive, node_id, payload)
|
||
|
||
# Dispatch — primitive-specific first, then "any" handlers.
|
||
for cb in self._handlers.get(primitive, ()):
|
||
cb(event)
|
||
for cb in self._handlers.get(None, ()):
|
||
cb(event)
|
||
return event
|
||
|
||
|
||
def _decode_event(
|
||
primitive: SemanticPrimitive,
|
||
node_id: str,
|
||
payload: Any,
|
||
) -> SemanticPrimitiveEvent:
|
||
"""Decode a raw state payload into a typed event.
|
||
|
||
HA state payloads come in two shapes:
|
||
1. Plain string ("ON", "OFF", "73") — used by binary_sensor/sensor
|
||
with no json_attributes_topic.
|
||
2. JSON object with `state` + `confidence` + `explanation` fields —
|
||
used by HA-MIND semantic primitives per ADR-115 §3.12.4.
|
||
|
||
Both are supported transparently.
|
||
"""
|
||
if isinstance(payload, bytes):
|
||
try:
|
||
payload = payload.decode("utf-8")
|
||
except UnicodeDecodeError:
|
||
return SemanticPrimitiveEvent(
|
||
kind=primitive, node_id=node_id, state="", raw={}
|
||
)
|
||
|
||
if isinstance(payload, dict):
|
||
body = payload
|
||
elif isinstance(payload, str):
|
||
# Try to JSON-decode; if it's not JSON, treat as a plain state string.
|
||
try:
|
||
decoded = json.loads(payload)
|
||
except json.JSONDecodeError:
|
||
return SemanticPrimitiveEvent(
|
||
kind=primitive,
|
||
node_id=node_id,
|
||
state=payload,
|
||
raw={"state": payload},
|
||
)
|
||
if isinstance(decoded, dict):
|
||
body = decoded
|
||
else:
|
||
return SemanticPrimitiveEvent(
|
||
kind=primitive,
|
||
node_id=node_id,
|
||
state=str(decoded),
|
||
raw={"state": decoded},
|
||
)
|
||
else:
|
||
return SemanticPrimitiveEvent(
|
||
kind=primitive, node_id=node_id, state=str(payload), raw={}
|
||
)
|
||
|
||
expl = body.get("explanation") or body.get("reason") or ()
|
||
if isinstance(expl, str):
|
||
expl_tuple: tuple[str, ...] = (expl,)
|
||
else:
|
||
expl_tuple = tuple(str(x) for x in expl)
|
||
|
||
return SemanticPrimitiveEvent(
|
||
kind=primitive,
|
||
node_id=node_id,
|
||
state=str(body.get("state", "")),
|
||
confidence=float(body.get("confidence", 0.0)),
|
||
explanation=expl_tuple,
|
||
timestamp=float(body.get("timestamp", 0.0)),
|
||
raw=body,
|
||
)
|