From 194a2e1637f77bd1c5e117899d21ef0eebd842a6 Mon Sep 17 00:00:00 2001 From: ruv Date: Mon, 25 May 2026 16:44:58 -0400 Subject: [PATCH] feat(adr-125 tier1+2 iter 2): sensing-server-equivalent for @ruvnet/rvagent MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit scripts/ruview-sensing-server.py (~210 LOC) exposes the BFLD-gated ESP32-C6 stream as the HTTP API surface @ruvnet/rvagent v0.1.0 (ADR-124, npm) expects. Closes the agentic-capability gap: any MCP client (Claude Code, Codex, custom LLM agent) can now consume the real C6 through the tool catalog without the Rust sensing-server being deployed. Endpoints (mirrors tools/ruview-mcp/src/tools/*.ts): GET /health GET /api/v1/sensing/latest — ADR-102 schema v2 GET /api/v1/edge/registry — node enumeration GET /api/v1/vitals//latest — EdgeVitalsMessage GET /api/v1/bfld//last_scan — BfldScanResponse POST /api/v1/bfld//subscribe — subscription_id c6-presence-watcher.py now writes a companion `/tmp/ruview-last- feature.json` on each gated packet so the sensing-server can serve without going back to the wire. Atomic tmp+rename. The bridge DELIBERATELY returns identity_risk_score=null on every BFLD response — mirroring ADR-125 §2.1.d at the HTTP boundary even though the rvagent schema's slot is nullable. Live smoke test against the real C6 (node_id=12): $ curl -s http://localhost:3000/api/v1/vitals/12/latest {"node_id":"12","timestamp_ms":1779741869154,"presence":true, "n_persons":1,"confidence":1.0,"breathing_rate_bpm":18.75, "heartrate_bpm":40.0,"motion":1.0} $ curl -s http://localhost:3000/api/v1/bfld/12/last_scan {"node_id":"12","identity_risk_score":null,"privacy_class":2, "person_count":1,"confidence":1.0,"presence":true, "timestamp_ns":1779741869154607104} $ curl -s -X POST 'http://localhost:3000/api/v1/bfld/12/subscribe?duration_s=5' {"subscription_id":"sub-1779741869177-12","node_id":"12", "duration_s":5.0,"endpoint_hint":"poll GET ..."} Next: AirPlay 2 voice synthesis (pyatv), bridge-with-children for N rooms, PyO3 BFLD binding (SOTA), Shortcuts scaffolding. Refs ADR-124 (@ruvnet/rvagent contract), ADR-125 §2.1.d, ADR-118. Co-Authored-By: claude-flow --- scripts/c6-presence-watcher.py | 33 +++++ scripts/ruview-sensing-server.py | 221 +++++++++++++++++++++++++++++++ 2 files changed, 254 insertions(+) create mode 100644 scripts/ruview-sensing-server.py diff --git a/scripts/c6-presence-watcher.py b/scripts/c6-presence-watcher.py index fbb46b24..f48a3807 100644 --- a/scripts/c6-presence-watcher.py +++ b/scripts/c6-presence-watcher.py @@ -268,6 +268,37 @@ def main() -> int: except OSError: pass + # Companion contract for `scripts/ruview-sensing-server.py` (the + # @ruvnet/rvagent compatibility layer): write the full BFLD-gated + # feature snapshot so the sensing-server can serve EdgeVitalsMessage + # and BfldScanResponse without going back to the wire. + feature_path = "/tmp/ruview-last-feature.json" + + def write_feature(gated: dict, motion: bool, occupancy: bool, + privacy_cls: int) -> None: + try: + tmp = feature_path + ".tmp" + with open(tmp, "w") as fh: + json.dump({ + "node_id": str(gated["node_id"]), + "timestamp_ms": int(time.time() * 1000), + "presence": occupancy, # sustained + "motion": gated["motion"], # 0..1 float + "presence_score": gated["presence"], + "n_persons": 1 if occupancy else 0, + "confidence": min(1.0, max(0.0, gated["motion"])), + "breathing_rate_bpm": (gated["resp_bpm"] + if gated.get("resp_bpm") else None), + "heartrate_bpm": (gated["hb_bpm"] + if gated.get("hb_bpm") else None), + "anomaly_score": gated.get("anomaly"), + "privacy_class": privacy_cls, + "ts": time.time(), + }, fh) + os.replace(tmp, feature_path) + except OSError: + pass + while running: try: buf, _addr = sock.recvfrom(2048) @@ -336,6 +367,8 @@ def main() -> int: if (motion != prev_motion or not state_path.endswith(".disabled")): write_state(motion, occupancy, last_anomaly_ts) + write_feature(gated, motion, occupancy, + privacy_class) # Idle release — if the C6 stops sending entirely, clear motion # AND occupancy. diff --git a/scripts/ruview-sensing-server.py b/scripts/ruview-sensing-server.py new file mode 100644 index 00000000..8750cab7 --- /dev/null +++ b/scripts/ruview-sensing-server.py @@ -0,0 +1,221 @@ +#!/usr/bin/env python3 +""" +ruview-sensing-server.py — ADR-125 Tier 1+2 iter 2. + +A tiny HTTP server that speaks the subset of the RuView sensing-server +HTTP API that @ruvnet/rvagent (ADR-124, npm v0.1.0) expects, sourced +from the BFLD-gated state files written by c6-presence-watcher.py. + +This is the "sensing-server-equivalent" the cron stop condition names, +and it lets any MCP agent (Claude Code via `claude mcp add rvagent`, +Codex with the matching MCP config, custom LLM client) consume the +real ESP32-C6 stream through the same MCP tool surface that the Rust +sensing-server exposes — without needing the Rust binary to be running. + +Endpoints (matched against tools/ruview-mcp/src/tools/*.ts): + + GET /health — liveness + GET /api/v1/sensing/latest — ADR-102 schema v2 + GET /api/v1/edge/registry — node enumeration + GET /api/v1/vitals//latest — EdgeVitalsMessage + GET /api/v1/bfld//last_scan — BfldScanResponse + POST /api/v1/bfld//subscribe?duration_s=N — { subscription_id } + +The source-of-truth file is `/tmp/ruview-last-feature.json` written +by the watcher on every BFLD-gated feature_state packet. If absent +or stale (> STALENESS_S seconds old), endpoints return 503 with a +hint so the rvagent tool emits a graceful warn shape. + +Bearer-token auth is intentionally OFF in this dev surface — the +Rust sensing-server adds it via the #443 middleware; that path is +out of scope for the demo bridge. +""" +from __future__ import annotations +import json +import os +import re +import sys +import time +from http.server import BaseHTTPRequestHandler, HTTPServer +from urllib.parse import urlparse, parse_qs + +FEATURE_FILE = os.environ.get("RUVIEW_FEATURE_JSON", + "/tmp/ruview-last-feature.json") +STALENESS_S = 10.0 +DEFAULT_PORT = int(os.environ.get("PORT", "3000")) + + +def _load_feature() -> dict | None: + try: + with open(FEATURE_FILE, "r") as fh: + d = json.load(fh) + except (FileNotFoundError, json.JSONDecodeError, OSError): + return None + if not isinstance(d, dict): + return None + age = time.time() - float(d.get("ts", 0)) + if age > STALENESS_S: + return None + return d + + +def vitals_for(node_id: str) -> dict | None: + f = _load_feature() + if f is None or f.get("node_id") != node_id: + return None + return { + "node_id": f["node_id"], + "timestamp_ms": int(f.get("timestamp_ms", + int(time.time() * 1000))), + "presence": bool(f.get("presence", False)), + "n_persons": int(f.get("n_persons", 0)), + "confidence": float(f.get("confidence", 0.0)), + "breathing_rate_bpm": f.get("breathing_rate_bpm"), + "heartrate_bpm": f.get("heartrate_bpm"), + "motion": float(f.get("motion", 0.0)), + } + + +def bfld_scan_for(node_id: str) -> dict | None: + f = _load_feature() + if f is None or f.get("node_id") != node_id: + return None + # ADR-125 §2.1.d: identity_risk_score never crosses the HAP + # boundary. We mirror that here — even though rvagent's schema + # has a nullable identity_risk_score slot, we deliberately + # always return None for it on this bridge. + return { + "node_id": f["node_id"], + "identity_risk_score": None, # ADR-125 §2.1.d invariant + "privacy_class": int(f.get("privacy_class", 2)), + "person_count": int(f.get("n_persons", 0)), + "confidence": float(f.get("confidence", 0.0)), + "presence": bool(f.get("presence", False)), + # timestamp_ns matches BFLD wire format (BfldEvent.timestamp_ns) + "timestamp_ns": int(f.get("ts", time.time()) * 1_000_000_000), + } + + +_PATH_VITALS = re.compile(r"^/api/v1/vitals/([^/]+)/latest$") +_PATH_BFLD_SCAN = re.compile(r"^/api/v1/bfld/([^/]+)/last_scan$") +_PATH_BFLD_SUBSCRIBE = re.compile(r"^/api/v1/bfld/([^/]+)/subscribe$") + + +class Handler(BaseHTTPRequestHandler): + + def log_message(self, fmt: str, *args) -> None: + # Quiet the default per-request log; print on a single line. + sys.stdout.write( + f"[{self.log_date_time_string()}] {self.command} " + f"{self.path} -> {args[1] if len(args) > 1 else '?'}\n" + ) + + def _json(self, code: int, body: dict) -> None: + payload = json.dumps(body).encode() + self.send_response(code) + self.send_header("Content-Type", "application/json") + self.send_header("Content-Length", str(len(payload))) + self.end_headers() + self.wfile.write(payload) + + def do_GET(self) -> None: + parsed = urlparse(self.path) + path = parsed.path + + if path == "/health": + f = _load_feature() + self._json(200, { + "ok": True, + "feature_age_s": (None if f is None + else round(time.time() - f["ts"], 2)), + "source": FEATURE_FILE, + }) + return + + if path == "/api/v1/edge/registry": + f = _load_feature() + nodes = ([{"node_id": f["node_id"], "kind": "esp32-c6", + "online": True}] if f else []) + self._json(200, {"nodes": nodes}) + return + + if path == "/api/v1/sensing/latest": + f = _load_feature() + if f is None: + self._json(503, {"error": "no recent feature_state", + "hint": "is c6-presence-watcher running?"}) + return + # ADR-102 sensing/latest schema v2 — the rvagent + # csi-latest tool ingests this shape. + self._json(200, { + "schema_version": 2, + "node_id": f["node_id"], + "timestamp_ms": f["timestamp_ms"], + "presence": f["presence"], + "n_persons": f["n_persons"], + "confidence": f["confidence"], + "motion": f["motion"], + "breathing_rate_bpm": f.get("breathing_rate_bpm"), + "heartrate_bpm": f.get("heartrate_bpm"), + "privacy_class": f.get("privacy_class", 2), + }) + return + + m = _PATH_VITALS.match(path) + if m: + node_id = m.group(1) + v = vitals_for(node_id) + if v is None: + self._json(503, {"error": f"no recent vitals for {node_id}", + "hint": "watcher running? node_id correct?"}) + return + self._json(200, v) + return + + m = _PATH_BFLD_SCAN.match(path) + if m: + node_id = m.group(1) + r = bfld_scan_for(node_id) + if r is None: + self._json(503, {"error": f"no recent BFLD scan for {node_id}", + "hint": "watcher running? node_id correct?"}) + return + self._json(200, r) + return + + self._json(404, {"error": "not found", "path": path}) + + def do_POST(self) -> None: + parsed = urlparse(self.path) + m = _PATH_BFLD_SUBSCRIBE.match(parsed.path) + if m: + qs = parse_qs(parsed.query) + duration_s = float(qs.get("duration_s", ["10"])[0]) + sub_id = f"sub-{int(time.time() * 1000)}-{m.group(1)}" + self._json(200, { + "subscription_id": sub_id, + "node_id": m.group(1), + "duration_s": duration_s, + "endpoint_hint": (f"poll GET /api/v1/bfld/{m.group(1)}" + "/last_scan every 1 s for the window"), + }) + return + self._json(404, {"error": "not found", "path": parsed.path}) + + +def main() -> int: + port = DEFAULT_PORT + server = HTTPServer(("0.0.0.0", port), Handler) + print(f"[sensing-server] listening on 0.0.0.0:{port}", flush=True) + print(f"[sensing-server] feature source: {FEATURE_FILE}", flush=True) + print(f"[sensing-server] staleness limit: {STALENESS_S} s", flush=True) + try: + server.serve_forever() + except KeyboardInterrupt: + pass + server.server_close() + return 0 + + +if __name__ == "__main__": + sys.exit(main())