#!/usr/bin/env python3 """ presence_to_pose.py — RSSI tomography centroid → sensing-server pose injector. Polls the macos-rssi-bridge HTTP endpoint at /aps for live per-AP state, computes the multistatic tomography centroid the same way dashboard.html does (Gaussian perturbation strips along each AP↔laptop link, weighted by per-AP RSSI variance), and POSTs the result as a single PersonDetection to the sensing-server's /api/v1/pose/external endpoint. The sensing-server bypasses its heuristic skeleton when external pose is fresh (within EXTERNAL_POSE_TTL = 500ms), so the 3D Observatory renders one honest figure standing where the heatmap localizes you instead of five hallucinated skeletons. Honest about what's real: - Position: derived from real RSSI variance + Fresnel-zone geometry. - Pose: STATIC standing skeleton (no joint estimation from RSSI; that needs CSI hardware). - Count: capped at 1 (single sensor, single observed environment). """ from __future__ import annotations import argparse import json import math import time import urllib.error import urllib.request from typing import Any # Coordinate frame: observatory uses meters with the laptop near origin. # Standing keypoints in poseStanding() (figure-pool's pose-system.js) sit # at y ≈ 0..1.72; we put the figure at the room's floor level (y=0) and # let the pose-system raise the joints. Width of the playable area is # roughly ±4m. ROOM_HALF_X = 3.5 ROOM_HALF_Z = 3.5 # 17-point COCO standing-pose offsets, anchored at the figure's foot # centroid (px, 0, pz). We don't actually need to send these — the # observatory regenerates keypoints from `pose='standing'` + `position`. # We include a sane skeleton anyway so the basic /ui/index.html viewer # also renders something coherent. def standing_keypoints( px: float, pz: float, conf: float, motion: float, t: float ) -> list[dict[str, Any]]: # In meters above floor. Mirrors PoseSystem.poseStanding() in # ui/observatory/js/pose-system.js so both viewers agree. # # `motion` ∈ [0..1] modulates a subtle weight-shift sway so the figure # looks alive when there's real RSSI motion. We're explicit that this # is *not* derived pose — it's animation seeded by total motion energy # so a frozen-looking figure doesn't suggest a frozen sensor. sway_x = math.sin(t * 1.4) * 0.06 * motion sway_z = math.cos(t * 0.9) * 0.04 * motion head_turn = math.sin(t * 0.5) * 0.04 * (0.3 + motion) shoulder_dip = math.sin(t * 1.4) * 0.025 * motion knee_bend_l = max(0.0, math.sin(t * 1.4)) * 0.05 * motion knee_bend_r = max(0.0, -math.sin(t * 1.4)) * 0.05 * motion layout = [ ("nose", (sway_x + head_turn, 1.72, sway_z)), ("left_eye", (-0.03 + sway_x + head_turn, 1.74, -0.02 + sway_z)), ("right_eye", (0.03 + sway_x + head_turn, 1.74, -0.02 + sway_z)), ("left_ear", (-0.07 + sway_x, 1.72, sway_z)), ("right_ear", (0.07 + sway_x, 1.72, sway_z)), ("left_shoulder", (-0.22 + sway_x * 0.7, 1.48 - shoulder_dip, sway_z * 0.7)), ("right_shoulder", (0.22 + sway_x * 0.7, 1.48 + shoulder_dip, sway_z * 0.7)), ("left_elbow", (-0.24 + sway_x * 0.5, 1.18 - shoulder_dip, 0.02 + sway_z * 0.5)), ("right_elbow", (0.24 + sway_x * 0.5, 1.18 + shoulder_dip, 0.02 + sway_z * 0.5)), ("left_wrist", (-0.26 + sway_x * 0.3, 0.92 - shoulder_dip, 0.04)), ("right_wrist", (0.26 + sway_x * 0.3, 0.92 + shoulder_dip, 0.04)), ("left_hip", (-0.14, 1.00, 0.00)), ("right_hip", (0.14, 1.00, 0.00)), ("left_knee", (-0.15, 0.55 + knee_bend_l, 0.00)), ("right_knee", (0.15, 0.55 + knee_bend_r, 0.00)), ("left_ankle", (-0.16, 0.10, 0.00)), ("right_ankle", (0.16, 0.10, 0.00)), ] return [ {"name": name, "x": px + dx, "y": dy, "z": pz + dz, "confidence": conf} for name, (dx, dy, dz) in layout ] def stable_hash_angle(s: str) -> float: """FNV-1a → angle in [0, 2π). Same as dashboard.html so AP layouts agree.""" h = 0x811c9dc5 for c in s.encode("utf-8"): h ^= c h = (h * 0x01000193) & 0xFFFFFFFF return (h / 0x100000000) * math.tau def rssi_to_radius(rssi: float, max_r: float) -> float: """Linear in dB. -30 dBm → close, -100 dBm → max_r away.""" t = max(0.0, min(1.0, (-rssi - 30.0) / 70.0)) return 0.4 + t * (max_r - 0.4) # 0.4m floor so the figure isn't on top of the laptop def compute_centroid(aps: list[dict[str, Any]]) -> tuple[float, float, float]: """Variance-weighted centroid in (x_m, z_m) plus a 'localization weight'.""" if not aps: return 0.0, 0.0, 0.0 # 1. Place each AP in 2D using the same scheme as dashboard.html. placed = [] for ap in aps: angle = stable_hash_angle(ap["id"]) radius = rssi_to_radius(ap["rssi_dbm"], min(ROOM_HALF_X, ROOM_HALF_Z)) ax = math.cos(angle) * radius az = math.sin(angle) * radius placed.append((ax, az, ap.get("variance", 0.0))) # 2. Centroid: along each AP↔laptop line, the most informative point is # the midpoint (Fresnel zone widest there). Weight each midpoint by # variance. sum_w = 0.0 sum_x = 0.0 sum_z = 0.0 for ax, az, var in placed: if var <= 0.01: continue # Midpoint between (0,0) (laptop) and (ax, az) (AP). mx, mz = ax * 0.5, az * 0.5 sum_w += var sum_x += var * mx sum_z += var * mz if sum_w < 1e-6: # No motion — figure parks at the laptop. return 0.0, 0.0, 0.0 return sum_x / sum_w, sum_z / sum_w, sum_w def main() -> int: ap = argparse.ArgumentParser(description=__doc__) ap.add_argument("--bridge-url", default="http://127.0.0.1:9090/aps", help="macos-rssi-bridge /aps endpoint") ap.add_argument("--server-url", default="http://127.0.0.1:8080/api/v1/pose/external", help="sensing-server pose ingestion endpoint") ap.add_argument("--rate-hz", type=float, default=10.0, help="how often to POST a pose (Hz)") ap.add_argument("--smooth", type=float, default=0.6, help="EMA factor for centroid smoothing (0..1, higher = snappier)") ap.add_argument("--amplify", type=float, default=4.0, help="multiplier on the centroid coordinates so small RSSI shifts produce visible figure motion in the room") ap.add_argument("--verbose", action="store_true") args = ap.parse_args() period = 1.0 / args.rate_hz sx, sz = 0.0, 0.0 last_log = 0.0 n_posted = 0 n_fail = 0 print(f"[presence] {args.bridge_url} → {args.server_url} @ {args.rate_hz:.1f} Hz", flush=True) while True: loop_start = time.time() try: with urllib.request.urlopen(args.bridge_url, timeout=1.0) as r: snap = json.loads(r.read()) except (urllib.error.URLError, TimeoutError, json.JSONDecodeError) as e: n_fail += 1 if args.verbose: print(f"[presence] bridge fetch failed: {e}", flush=True) time.sleep(period) continue aps = snap.get("aps", []) cx, cz, weight = compute_centroid(aps) # Amplify the centroid so small RSSI shifts (a few cm of raw # variance-weighted midpoint motion) produce visible motion in # the room view. Without amplification the centroid only spans # ~0.5m which is invisible at 7m room scale. cx *= args.amplify cz *= args.amplify # Clamp to room half-extents so the figure never walks through walls. cx = max(-ROOM_HALF_X + 0.5, min(ROOM_HALF_X - 0.5, cx)) cz = max(-ROOM_HALF_Z + 0.5, min(ROOM_HALF_Z - 0.5, cz)) # EMA smoothing — sensing-server runs at 10 Hz tick, so we want # the figure to glide rather than jitter on every micro-fluctuation. sx = sx * (1 - args.smooth) + cx * args.smooth sz = sz * (1 - args.smooth) + cz * args.smooth # Confidence ~ how much variance is present. Caps at ~1.0 once # there's a few dB² of cumulative motion energy across APs. conf = max(0.3, min(1.0, 0.3 + weight / 10.0)) # Normalize motion intensity from cumulative variance. motion = max(0.0, min(1.0, weight / 5.0)) t = time.time() person = { "id": 1, "confidence": conf, "keypoints": standing_keypoints(sx, sz, conf, motion, t), "bbox": {"x": sx - 0.4, "y": 0.0, "width": 0.8, "height": 1.85}, "zone": "rssi_localized", } body = json.dumps([person]).encode("utf-8") try: req = urllib.request.Request( args.server_url, data=body, headers={"Content-Type": "application/json"}, method="POST", ) with urllib.request.urlopen(req, timeout=1.0) as r: _ = r.read() n_posted += 1 except urllib.error.URLError as e: n_fail += 1 if args.verbose: print(f"[presence] post failed: {e}", flush=True) now = time.time() if args.verbose and now - last_log > 1.0: print( f"[presence] aps={len(aps):>2} centroid=({sx:+.2f}, {sz:+.2f})m " f"weight={weight:>5.2f} conf={conf:.2f} posted={n_posted} fail={n_fail}", flush=True, ) last_log = now elapsed = time.time() - loop_start if elapsed < period: time.sleep(period - elapsed) if __name__ == "__main__": try: raise SystemExit(main()) except KeyboardInterrupt: print("\n[presence] stopped")