feat(adr-125 iter 3): BFLD PrivacyGate + semantic-event naming at HAP boundary

Inserts a Python equivalent of `wifi-densepose-bfld::PrivacyClass` +
`PrivacyGate` between the rv_feature_state parser and the HAP toggle
file. ADR-125 §2.1.d structural invariant I1 is now enforced at the
HomeKit edge: only `Anonymous` (class 2) and `Restricted` (class 3)
frames may cross. `Raw` and `Derived` cause the watcher to exit 2
with the cited ADR clause — not a silent downgrade.

Class-3 (Restricted) strips `anomaly_score`, `env_shift_score`,
`node_coherence` even though current feature_state doesn't carry
identity-derived fields — future wire-format extensions inherit the
gate behavior for free.

Operator-facing semantic naming follows ADR-125 §2.1.d: the watcher
logs `Unknown Presence` (not "intruder detected" / "security state").
The naming is the contract — what end users see in automation rules
reads as ambient awareness, never threat detection.

Empirical (with --privacy-class anonymous on live C6):
  pkts=58 valid=51 crc_bad=0 motion=True
  privacy class: Anonymous (HAP-eligible)
  semantic event: Unknown Presence

Refuse path validated:
  $ ~/hap-venv/bin/python c6-presence-watcher.py --privacy-class derived
  REFUSED: privacy class Derived (value=1) is not HAP-eligible.
  ADR-125 §2.1.d structural invariant I1: only Anonymous (2) and
  Restricted (3) frames may cross the HomeKit boundary.
  $ echo $?
  2

Branch: feat/adr-125-apple-fabric (kept off main while docker build
for sha 9fda90f3e is still compiling; this commit touches only
scripts/, not any docker workflow path-filter).

Refs ADR-125 §2.1.d, ADR-118 §2.1/§2.2.

Co-Authored-By: claude-flow <ruv@ruv.net>
This commit is contained in:
ruv 2026-05-25 16:26:21 -04:00
parent 1f13aa96c2
commit 3edf5f07dd
1 changed files with 112 additions and 11 deletions

View File

@ -52,6 +52,47 @@ RV_FEATURE_STATE_MAGIC = 0xC5110006
RV_QFLAG_PRESENCE_VALID = 1 << 0
PACKET_SIZE = 60
class PrivacyClass:
"""Mirror of `wifi-densepose-bfld::PrivacyClass` (Rust, ADR-118 §2.1).
The HAP boundary is governed by ADR-125 §2.1.d + ADR-122 §2.4: only
`Anonymous` (2) and `Restricted` (3) frames may cross. `Raw` (0) and
`Derived` (1) are HAP-ineligible by structural invariant I1.
"""
RAW = 0
DERIVED = 1
ANONYMOUS = 2
RESTRICTED = 3
_names = {RAW: "Raw", DERIVED: "Derived", ANONYMOUS: "Anonymous",
RESTRICTED: "Restricted"}
@classmethod
def name(cls, value: int) -> str:
return cls._names.get(value, f"Unknown({value})")
@classmethod
def from_str(cls, s: str) -> int:
m = {"raw": cls.RAW, "derived": cls.DERIVED,
"anonymous": cls.ANONYMOUS, "restricted": cls.RESTRICTED}
if s.lower() not in m:
raise ValueError(f"invalid privacy class {s!r}; "
f"expected one of {list(m.keys())}")
return m[s.lower()]
@classmethod
def allows_hap(cls, value: int) -> bool:
"""ADR-125 §2.1.d gate: only class-2/3 cross the HomeKit boundary."""
return value in (cls.ANONYMOUS, cls.RESTRICTED)
# Semantic-event naming per ADR-125 §2.1.d. The HAP bridge keeps
# advertising a generic MotionSensor; this is the operator-facing
# *label* for the event, written into the watcher log + summary line
# so the operator never sees "intruder detected" framing.
SEMANTIC_EVENT_UNKNOWN_PRESENCE = "Unknown Presence"
# Hysteresis — entry / exit thresholds keep the HomeKit characteristic
# from flapping when presence_score sits near the boundary.
PRESENCE_ON_THRESHOLD = 0.40
@ -93,7 +134,8 @@ def parse_packet(buf: bytes):
}
def set_motion(toggle_file: str, on: bool, current: bool) -> bool:
def set_motion(toggle_file: str, on: bool, current: bool,
semantic: str = SEMANTIC_EVENT_UNKNOWN_PRESENCE) -> bool:
"""Touch / unlink the toggle file iff state changes. Return new state."""
if on == current:
return current
@ -105,17 +147,68 @@ def set_motion(toggle_file: str, on: bool, current: bool) -> bool:
os.unlink(toggle_file)
except FileNotFoundError:
pass
print(f"[{time.strftime('%H:%M:%S')}] motion -> {on}", flush=True)
label = semantic if on else f"clear {semantic}"
print(f"[{time.strftime('%H:%M:%S')}] {label} (motion -> {on})",
flush=True)
return on
def apply_privacy_gate(pkt: dict, allowed_class: int) -> dict | None:
"""ADR-118 PrivacyGate equivalent at the HAP boundary.
The C6 emits sensor-aggregate `feature_state` frames *not* raw BFI,
*not* identity embeddings. We classify the emit at the chosen
operator class. Returns the (possibly redacted) event dict, or
`None` if the class doesn't allow HAP crossing.
"""
if not PrivacyClass.allows_hap(allowed_class):
return None
# `Restricted` (3) strips anything that could be a per-occupant
# fingerprint — even though feature_state currently carries none.
# Future iters extending the wire format will need to respect this.
if allowed_class == PrivacyClass.RESTRICTED:
return {
"presence": pkt["presence"], "motion": pkt["motion"],
"presence_valid": pkt["presence_valid"],
"node_id": pkt["node_id"], "seq": pkt["seq"],
# anomaly_score / env_shift / coherence dropped (could
# reveal longitudinal drift signatures over time).
}
# `Anonymous` (2) — production default. Carries the aggregate
# vitals so HomeKit `Unknown Presence` automations can pick up
# context, but no identity-derived fields.
return {
"presence": pkt["presence"], "motion": pkt["motion"],
"presence_valid": pkt["presence_valid"],
"node_id": pkt["node_id"], "seq": pkt["seq"],
"resp_bpm": pkt["resp_bpm"], "hb_bpm": pkt["hb_bpm"],
"anomaly": pkt["anomaly"], "env_shift": pkt["env_shift"],
"coherence": pkt["coherence"],
}
def main() -> int:
p = argparse.ArgumentParser()
p.add_argument("--port", type=int, default=5005)
p.add_argument("--toggle", default="/tmp/ruview-motion")
p.add_argument("--bind", default="0.0.0.0")
p.add_argument("--privacy-class", default="anonymous",
choices=["raw", "derived", "anonymous", "restricted"],
help="ADR-118 PrivacyClass; only anonymous/restricted "
"may cross the HAP boundary (ADR-125 §2.1.d).")
args = p.parse_args()
privacy_class = PrivacyClass.from_str(args.privacy_class)
if not PrivacyClass.allows_hap(privacy_class):
sys.stderr.write(
f"REFUSED: privacy class {PrivacyClass.name(privacy_class)} "
f"(value={privacy_class}) is not HAP-eligible. "
f"ADR-125 §2.1.d structural invariant I1: only Anonymous (2) "
f"and Restricted (3) frames may cross the HomeKit boundary. "
f"Use --privacy-class anonymous (default) or restricted.\n"
)
return 2
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
if hasattr(socket, "SO_REUSEPORT"):
@ -128,6 +221,10 @@ def main() -> int:
print(f"[c6-presence] thresholds: on>={PRESENCE_ON_THRESHOLD}, "
f"off<={PRESENCE_OFF_THRESHOLD}, idle_release={IDLE_RELEASE_S}s",
flush=True)
print(f"[c6-presence] privacy class: "
f"{PrivacyClass.name(privacy_class)} (HAP-eligible)", flush=True)
print(f"[c6-presence] semantic event: {SEMANTIC_EVENT_UNKNOWN_PRESENCE}",
flush=True)
running = True
def _stop(*_):
@ -156,15 +253,19 @@ def main() -> int:
if pkt is not None:
if not pkt["crc_ok"]:
n_crc_bad += 1
elif pkt["presence_valid"]:
n_valid += 1
presence_sum += pkt["presence"]
motion_sum += pkt["motion"]
last_packet_ts = now
if not motion and pkt["presence"] >= PRESENCE_ON_THRESHOLD:
motion = set_motion(args.toggle, True, motion)
elif motion and pkt["presence"] <= PRESENCE_OFF_THRESHOLD:
motion = set_motion(args.toggle, False, motion)
else:
# ADR-118 PrivacyGate: classify + redact before the
# HAP boundary. Returns None for non-eligible classes.
gated = apply_privacy_gate(pkt, privacy_class)
if gated is not None and gated["presence_valid"]:
n_valid += 1
presence_sum += gated["presence"]
motion_sum += gated["motion"]
last_packet_ts = now
if not motion and gated["presence"] >= PRESENCE_ON_THRESHOLD:
motion = set_motion(args.toggle, True, motion)
elif motion and gated["presence"] <= PRESENCE_OFF_THRESHOLD:
motion = set_motion(args.toggle, False, motion)
# Idle release — if the C6 stops sending entirely, clear motion.
if motion and last_packet_ts and (now - last_packet_ts) > IDLE_RELEASE_S: