diff --git a/.github/workflows/firmware-qemu.yml b/.github/workflows/firmware-qemu.yml index dd52eda1..55b71215 100644 --- a/.github/workflows/firmware-qemu.yml +++ b/.github/workflows/firmware-qemu.yml @@ -7,6 +7,9 @@ on: - 'scripts/qemu-esp32s3-test.sh' - 'scripts/validate_qemu_output.py' - 'scripts/generate_nvs_matrix.py' + - 'scripts/qemu_swarm.py' + - 'scripts/swarm_health.py' + - 'scripts/swarm_presets/**' - '.github/workflows/firmware-qemu.yml' pull_request: paths: @@ -14,6 +17,9 @@ on: - 'scripts/qemu-esp32s3-test.sh' - 'scripts/validate_qemu_output.py' - 'scripts/generate_nvs_matrix.py' + - 'scripts/qemu_swarm.py' + - 'scripts/swarm_health.py' + - 'scripts/swarm_presets/**' - '.github/workflows/firmware-qemu.yml' env: @@ -284,3 +290,60 @@ jobs: fi echo " OK: $(basename $f) ($SIZE bytes)" done + + # --------------------------------------------------------------------------- + # ADR-062: QEMU Swarm Configurator Test + # + # Runs a lightweight 3-node swarm (ci_matrix preset) under QEMU to validate + # multi-node orchestration, TDM slot coordination, and swarm-level health + # assertions. Uses the pre-built QEMU binary from the build-qemu job and the + # firmware built by qemu-test. + # + # The CI runner is non-root, so TAP bridge networking is unavailable. + # The orchestrator (qemu_swarm.py) detects this and falls back to SLIRP + # user-mode networking, which is sufficient for the ci_matrix preset. 
+ # --------------------------------------------------------------------------- + swarm-test: + name: Swarm Test (ADR-062) + needs: [build-qemu, qemu-test] + runs-on: ubuntu-latest + container: + image: espressif/idf:v5.4 + + steps: + - uses: actions/checkout@v4 + + - name: Download QEMU artifact + uses: actions/download-artifact@v4 + with: + name: qemu-esp32 + path: ${{ github.workspace }}/qemu-build + + - name: Make QEMU executable + run: chmod +x ${{ github.workspace }}/qemu-build/bin/qemu-system-xtensa + + - name: Download firmware build artifacts + uses: actions/download-artifact@v4 + with: + name: qemu-logs-default + path: ${{ github.workspace }}/firmware-artifacts + + - name: Install Python dependencies + run: pip install pyyaml esptool esp-idf-nvs-partition-gen + + - name: Run swarm smoke test + run: | + python3 scripts/qemu_swarm.py --preset ci_matrix \ + --qemu-path ${{ github.workspace }}/qemu-build/bin/qemu-system-xtensa \ + --skip-build \ + --output-dir build/swarm-results + timeout-minutes: 5 + + - name: Upload swarm results + if: always() + uses: actions/upload-artifact@v4 + with: + name: swarm-results + path: | + build/swarm-results/ + retention-days: 14 diff --git a/docs/adr/ADR-062-qemu-swarm-configurator.md b/docs/adr/ADR-062-qemu-swarm-configurator.md new file mode 100644 index 00000000..85ea37bf --- /dev/null +++ b/docs/adr/ADR-062-qemu-swarm-configurator.md @@ -0,0 +1,199 @@ +# ADR-062: QEMU ESP32-S3 Swarm Configurator + +| Field | Value | +|-------------|------------------------------------------------| +| **Status** | Accepted | +| **Date** | 2026-03-14 | +| **Authors** | RuView Team | +| **Relates** | ADR-061 (QEMU testing platform), ADR-060 (channel/MAC filter), ADR-018 (binary frame), ADR-039 (edge intel) | + +## Glossary + +| Term | Definition | +|------|-----------| +| Swarm | A group of N QEMU ESP32-S3 instances running simultaneously | +| Topology | How nodes are connected: star, mesh, line, ring | +| Role | Node function: `sensor` 
(collects CSI), `coordinator` (aggregates + forwards), `gateway` (bridges to host) | +| Scenario matrix | Cross-product of topology × node count × NVS config × mock scenario | +| Health oracle | Python process that monitors all node UART logs and declares swarm health | + +## Context + +ADR-061 Layer 3 provides a basic multi-node mesh test: N identical nodes with sequential TDM slots connected via a Linux bridge. This is useful but limited: + +1. **All nodes are identical** — real deployments have heterogeneous roles (sensor, coordinator, gateway) +2. **Single topology** — only fully-connected bridge; no star, line, or ring topologies +3. **No scenario variation per node** — all nodes run the same mock CSI scenario +4. **Manual configuration** — each test requires hand-editing env vars and arguments +5. **No swarm-level health monitoring** — validation checks individual nodes, not collective behavior +6. **No cross-node timing validation** — TDM slot ordering and inter-frame gaps aren't verified + +Real WiFi-DensePose deployments use 3-8 ESP32-S3 nodes in various topologies. A single coordinator aggregates CSI from multiple sensors. The firmware must handle TDM conflicts, missing nodes, role-based behavior differences, and network partitions — none of which ADR-061 Layer 3 tests. + +## Decision + +Build a **QEMU Swarm Configurator** — a YAML-driven tool that defines multi-node test scenarios declaratively and orchestrates them under QEMU with swarm-level validation. + +### Architecture + +``` +┌─────────────────────────────────────────────────────┐ +│ swarm_config.yaml │ +│ nodes: [{role: sensor, scenario: 2, channel: 6}] │ +│ topology: star │ +│ duration: 60s │ +│ assertions: [all_nodes_boot, tdm_no_collision, ...] │ +└──────────────────────┬──────────────────────────────┘ + │ + ┌────────────▼────────────┐ + │ qemu_swarm.py │ + │ (orchestrator) │ + └───┬────┬────┬───┬──────┘ + │ │ │ │ + ┌────▼┐ ┌▼──┐ ▼ ┌▼────┐ + │Node0│ │N1 │... 
│N(n-1)│ QEMU instances + │sens │ │sen│ │coord │ + └──┬──┘ └─┬─┘ └──┬───┘ + │ │ │ + ┌──▼──────▼─────────▼──┐ + │ Virtual Network │ TAP bridge / SLIRP + │ (topology-shaped) │ + └──────────┬───────────┘ + │ + ┌──────────▼───────────┐ + │ Aggregator (Rust) │ Collects frames + └──────────┬───────────┘ + │ + ┌──────────▼───────────┐ + │ Health Oracle │ Swarm-level assertions + │ (swarm_health.py) │ + └──────────────────────┘ +``` + +### YAML Configuration Schema + +```yaml +# swarm_config.yaml +swarm: + name: "3-sensor-star" + duration_s: 60 + topology: star # star | mesh | line | ring + aggregator_port: 5005 + +nodes: + - role: coordinator + node_id: 0 + scenario: 0 # empty room (baseline) + channel: 6 + edge_tier: 2 + is_gateway: true # receives aggregated frames + + - role: sensor + node_id: 1 + scenario: 2 # walking person + channel: 6 + tdm_slot: 1 + + - role: sensor + node_id: 2 + scenario: 3 # fall event + channel: 6 + tdm_slot: 2 + +assertions: + - all_nodes_boot + - no_crashes + - tdm_no_collision + - all_nodes_produce_frames + - coordinator_receives_from_all + - fall_detected_by_node_2 + - frame_rate_above: 15 # Hz minimum per node + - max_boot_time_s: 10 +``` + +### Topologies + +| Topology | Network | Description | +|----------|---------|-------------| +| `star` | All sensors connect to coordinator; coordinator has TAP to each sensor | Hub-and-spoke, most common | +| `mesh` | All nodes on same bridge (existing Layer 3 behavior) | Every node sees every other | +| `line` | Node 0 ↔ Node 1 ↔ Node 2 ↔ ... 
| Linear chain, tests multi-hop | +| `ring` | Like line but last connects to first | Circular, tests routing | + +### Node Roles + +| Role | Behavior | NVS Keys | +|------|----------|----------| +| `sensor` | Runs mock CSI, sends frames to coordinator | `node_id`, `tdm_slot`, `target_ip` | +| `coordinator` | Receives frames from sensors, runs edge aggregation | `node_id`, `tdm_slot=0`, `edge_tier=2` | +| `gateway` | Like coordinator but also bridges to host UDP | `node_id`, `target_ip=host`, `is_gateway=1` | + +### Assertions (Swarm-Level) + +| Assertion | What It Checks | +|-----------|---------------| +| `all_nodes_boot` | Every node's UART log shows boot indicators within timeout | +| `no_crashes` | No Guru Meditation, assert, panic in any log | +| `tdm_no_collision` | No two nodes transmit in the same TDM slot | +| `all_nodes_produce_frames` | Every sensor node's log contains CSI frame output | +| `coordinator_receives_from_all` | Coordinator log shows frames from each sensor's node_id | +| `fall_detected_by_node_N` | Node N's log reports a fall detection event | +| `frame_rate_above` | Each node produces at least N frames/second | +| `max_boot_time_s` | All nodes boot within N seconds | +| `no_heap_errors` | No OOM or heap corruption in any log | +| `network_partitioned_recovery` | After deliberate partition, nodes resume communication | + +### Preset Configurations + +| Preset | Nodes | Topology | Purpose | +|--------|-------|----------|---------| +| `smoke` | 2 | star | Quick CI smoke test (15s) | +| `standard` | 3 | star | Default 3-node (sensor + sensor + coordinator) | +| `large-mesh` | 6 | mesh | Scale test with 6 fully-connected nodes | +| `line-relay` | 4 | line | Multi-hop relay chain | +| `ring-fault` | 4 | ring | Ring with fault injection mid-test | +| `heterogeneous` | 5 | star | Mixed scenarios: walk, fall, static, channel-sweep, empty | +| `ci-matrix` | 3 | star | CI-optimized preset (30s, minimal assertions) | + +## File Layout + +``` +scripts/ 
+├── qemu_swarm.py # Main orchestrator (CLI entry point) +├── swarm_health.py # Swarm-level health oracle +└── swarm_presets/ + ├── smoke.yaml + ├── standard.yaml + ├── large_mesh.yaml + ├── line_relay.yaml + ├── ring_fault.yaml + ├── heterogeneous.yaml + └── ci_matrix.yaml + +.github/workflows/ +└── firmware-qemu.yml # MODIFIED: add swarm test job +``` + +## Consequences + +### Benefits + +1. **Declarative testing** — define swarm topology in YAML, not shell scripts +2. **Role-based nodes** — test coordinator/sensor/gateway interactions +3. **Topology variety** — star/mesh/line/ring match real deployment patterns +4. **Swarm-level assertions** — validate collective behavior, not just individual nodes +5. **Preset library** — quick CI smoke tests and thorough manual validation +6. **Reproducible** — YAML configs are version-controlled and shareable + +### Limitations + +1. **TAP bridge networking still requires root** on Linux — without it every topology (star, mesh, line, ring) falls back to SLIRP user-mode networking, where nodes can reach the aggregator but cannot see each other +2. **QEMU resource usage** — 6+ QEMU instances use ~2GB RAM, may slow CI runners +3. **No real RF** — inter-node communication is IP-based, not WiFi CSI multipath + +## References + +- ADR-061: QEMU ESP32-S3 firmware testing platform (Layers 1-9) +- ADR-060: Channel override and MAC address filter provisioning +- ADR-018: Binary CSI frame format (magic `0xC5110001`) +- ADR-039: Edge intelligence pipeline (biquad, vitals, fall detection) diff --git a/scripts/qemu_swarm.py b/scripts/qemu_swarm.py new file mode 100644 index 00000000..352f5716 --- /dev/null +++ b/scripts/qemu_swarm.py @@ -0,0 +1,1097 @@ +#!/usr/bin/env python3 +""" +QEMU ESP32-S3 Swarm Configurator (ADR-062) + +Orchestrates multiple QEMU ESP32-S3 instances from a YAML configuration. +Supports star/mesh/line/ring topologies, role-based nodes (sensor/coordinator/ +gateway), per-node NVS provisioning, and swarm-level health assertions. 
+ +Usage: + python3 qemu_swarm.py --config swarm_presets/standard.yaml + python3 qemu_swarm.py --preset smoke + python3 qemu_swarm.py --preset standard --timeout 90 + python3 qemu_swarm.py --list-presets + python3 qemu_swarm.py --config custom.yaml --dry-run +""" + +import argparse +import atexit +import json +import os +import platform +import re +import shutil +import signal +import subprocess +import sys +import time +from dataclasses import dataclass, field +from pathlib import Path +from typing import Any, Dict, List, Optional, Tuple + +# --------------------------------------------------------------------------- +# Optional YAML import with helpful error +# --------------------------------------------------------------------------- +try: + import yaml +except ImportError: + print("ERROR: PyYAML is required but not installed.") + print(" Install: pip install pyyaml") + print(" Or: pip3 install pyyaml") + sys.exit(3) + +# --------------------------------------------------------------------------- +# Constants +# --------------------------------------------------------------------------- +SCRIPT_DIR = Path(__file__).resolve().parent +PROJECT_ROOT = SCRIPT_DIR.parent +FIRMWARE_DIR = PROJECT_ROOT / "firmware" / "esp32-csi-node" +RUST_DIR = PROJECT_ROOT / "rust-port" / "wifi-densepose-rs" +PROVISION_SCRIPT = FIRMWARE_DIR / "provision.py" +PRESETS_DIR = SCRIPT_DIR / "swarm_presets" + +VALID_TOPOLOGIES = ("star", "mesh", "line", "ring") +VALID_ROLES = ("sensor", "coordinator", "gateway") +EXIT_PASS = 0 +EXIT_WARN = 1 +EXIT_FAIL = 2 +EXIT_FATAL = 3 + +NVS_OFFSET = 0x9000 # NVS partition offset in flash image + +IS_LINUX = platform.system() == "Linux" + +# --------------------------------------------------------------------------- +# Logging helpers +# --------------------------------------------------------------------------- +USE_COLOR = sys.stdout.isatty() + + +def _c(text: str, code: str) -> str: + return f"\033[{code}m{text}\033[0m" if USE_COLOR else text + + +def 
info(msg: str) -> None: + print(f"[INFO] {msg}") + + +def warn(msg: str) -> None: + print(f"[{_c('WARN', '33')}] {msg}") + + +def error(msg: str) -> None: + print(f"[{_c('ERROR', '1;31')}] {msg}", file=sys.stderr) + + +def fatal(msg: str) -> None: + print(f"[{_c('FATAL', '1;31')}] {msg}", file=sys.stderr) + + +# --------------------------------------------------------------------------- +# Schema validation +# --------------------------------------------------------------------------- +@dataclass +class NodeConfig: + role: str + node_id: int + scenario: int = 0 + channel: int = 6 + tdm_slot: Optional[int] = None + edge_tier: int = 0 + is_gateway: bool = False + filter_mac: Optional[str] = None + + +@dataclass +class SwarmConfig: + name: str + duration_s: int + topology: str + aggregator_port: int + nodes: List[NodeConfig] + assertions: List[Any] + + def coordinator_nodes(self) -> List[NodeConfig]: + return [n for n in self.nodes if n.role in ("coordinator", "gateway")] + + def sensor_nodes(self) -> List[NodeConfig]: + return [n for n in self.nodes if n.role == "sensor"] + + +def validate_config(raw: dict) -> SwarmConfig: + """Parse and validate YAML config into a SwarmConfig.""" + errors: List[str] = [] + + swarm = raw.get("swarm", {}) + name = swarm.get("name", "unnamed-swarm") + duration_s = int(swarm.get("duration_s", 60)) + topology = swarm.get("topology", "mesh") + aggregator_port = int(swarm.get("aggregator_port", 5005)) + + if topology not in VALID_TOPOLOGIES: + errors.append(f"Invalid topology '{topology}'; must be one of {VALID_TOPOLOGIES}") + + if duration_s < 5: + errors.append(f"duration_s={duration_s} too short; minimum is 5") + + raw_nodes = raw.get("nodes", []) + if not raw_nodes: + errors.append("No nodes defined") + + nodes: List[NodeConfig] = [] + seen_ids: set = set() + for idx, rn in enumerate(raw_nodes): + if not isinstance(rn, dict): + errors.append(f"nodes[{idx}]: expected dict, got {type(rn).__name__}") + continue + + role = rn.get("role", 
"sensor") + if role not in VALID_ROLES: + errors.append(f"nodes[{idx}]: invalid role '{role}'; must be one of {VALID_ROLES}") + + node_id = rn.get("node_id", idx) + if node_id in seen_ids: + errors.append(f"nodes[{idx}]: duplicate node_id={node_id}") + seen_ids.add(node_id) + + nodes.append(NodeConfig( + role=role, + node_id=int(node_id), + scenario=int(rn.get("scenario", 0)), + channel=int(rn.get("channel", 6)), + tdm_slot=rn.get("tdm_slot"), + edge_tier=int(rn.get("edge_tier", 0)), + is_gateway=bool(rn.get("is_gateway", False)), + filter_mac=rn.get("filter_mac"), + )) + + # Auto-assign TDM slots if not set + for i, n in enumerate(nodes): + if n.tdm_slot is None: + n.tdm_slot = i + + assertions = raw.get("assertions", []) + + if errors: + for e in errors: + error(e) + fatal(f"{len(errors)} config validation error(s)") + sys.exit(EXIT_FATAL) + + return SwarmConfig( + name=name, + duration_s=duration_s, + topology=topology, + aggregator_port=aggregator_port, + nodes=nodes, + assertions=assertions, + ) + + +# --------------------------------------------------------------------------- +# Preset loading +# --------------------------------------------------------------------------- +def list_presets() -> List[Tuple[str, str]]: + """Return list of (name, description) for available presets.""" + presets = [] + if not PRESETS_DIR.is_dir(): + return presets + for f in sorted(PRESETS_DIR.glob("*.yaml")): + name = f.stem + # Read first comment line as description + desc = "" + try: + text = f.read_text(encoding="utf-8") + for line in text.splitlines(): + if line.startswith("#"): + desc = line.lstrip("#").strip() + break + except OSError: + pass + presets.append((name, desc)) + return presets + + +def load_preset(name: str) -> dict: + """Load a preset YAML file by name.""" + path = PRESETS_DIR / f"{name}.yaml" + if not path.exists(): + # Try with underscores/hyphens swapped + alt = PRESETS_DIR / f"{name.replace('-', '_')}.yaml" + if alt.exists(): + path = alt + else: + 
fatal(f"Preset '{name}' not found at {path}") + available = list_presets() + if available: + print("Available presets:") + for pname, pdesc in available: + print(f" {pname:20s} {pdesc}") + sys.exit(EXIT_FATAL) + return yaml.safe_load(path.read_text(encoding="utf-8")) + + +# --------------------------------------------------------------------------- +# Node provisioning +# --------------------------------------------------------------------------- +def provision_node( + node: NodeConfig, + build_dir: Path, + n_total: int, + aggregator_ip: str, + aggregator_port: int, +) -> Path: + """Generate NVS binary and per-node flash image. Returns flash image path.""" + + nvs_bin = build_dir / f"nvs_node{node.node_id}.bin" + flash_image = build_dir / f"qemu_flash_node{node.node_id}.bin" + base_image = build_dir / "qemu_flash_base.bin" + + if not base_image.exists(): + fatal(f"Base flash image not found: {base_image}") + fatal("Build the firmware first, or run without --skip-build.") + sys.exit(EXIT_FATAL) + + # Build provision.py arguments + args = [ + sys.executable, str(PROVISION_SCRIPT), + "--port", "/dev/null", + "--dry-run", + "--node-id", str(node.node_id), + "--tdm-slot", str(node.tdm_slot), + "--tdm-total", str(n_total), + "--target-ip", aggregator_ip, + "--target-port", str(aggregator_port), + ] + + if node.channel: + args.extend(["--channel", str(node.channel)]) + + if node.edge_tier: + args.extend(["--edge-tier", str(node.edge_tier)]) + + if node.filter_mac: + args.extend(["--filter-mac", node.filter_mac]) + + info(f" Provisioning node {node.node_id} ({node.role}, scenario={node.scenario}, " + f"tdm={node.tdm_slot}/{n_total}, ch={node.channel})") + + result = subprocess.run( + args, + capture_output=True, text=True, + cwd=str(build_dir), + timeout=30, + ) + + if result.returncode != 0: + error(f" provision.py failed for node {node.node_id}:") + error(f" stdout: {result.stdout.strip()}") + error(f" stderr: {result.stderr.strip()}") + sys.exit(EXIT_FATAL) + + # 
provision.py --dry-run writes nvs_provision.bin in cwd + nvs_src = build_dir / "nvs_provision.bin" + if not nvs_src.exists(): + fatal(f" provision.py did not produce nvs_provision.bin for node {node.node_id}") + sys.exit(EXIT_FATAL) + + nvs_src.rename(nvs_bin) + + # Copy base image and inject NVS at 0x9000 + shutil.copy2(str(base_image), str(flash_image)) + + with open(flash_image, "r+b") as f: + f.seek(NVS_OFFSET) + f.write(nvs_bin.read_bytes()) + + return flash_image + + +# --------------------------------------------------------------------------- +# Network topology setup (Linux TAP/bridge) +# --------------------------------------------------------------------------- +@dataclass +class NetworkState: + """Tracks created bridges and TAPs for cleanup.""" + bridges: List[str] = field(default_factory=list) + taps: List[str] = field(default_factory=list) + use_slirp: bool = False + + +def _run_ip(args: List[str], check: bool = False) -> subprocess.CompletedProcess: + return subprocess.run(["ip"] + args, capture_output=True, text=True, check=check) + + +def setup_network(cfg: SwarmConfig, net: NetworkState) -> Dict[int, List[str]]: + """ + Create network topology. Returns dict mapping node_id -> QEMU network args. + + Falls back to SLIRP user-mode networking if not root or not Linux. 
+ """ + node_net_args: Dict[int, List[str]] = {} + n = len(cfg.nodes) + + # TAP/bridge needs root on Linux plus /dev/net/tun and the iproute2 "ip" tool (often absent in containers) + can_tap = IS_LINUX and os.geteuid() == 0 and os.path.exists("/dev/net/tun") and shutil.which("ip") is not None + + if not can_tap: + if IS_LINUX: + warn("TAP/bridge unavailable (needs root, /dev/net/tun and 'ip'); falling back to SLIRP user-mode networking.") + warn("Nodes can reach the aggregator but cannot see each other.") + else: + info("Non-Linux platform; using SLIRP user-mode networking.") + + net.use_slirp = True + for node in cfg.nodes: + node_net_args[node.node_id] = [ + "-nic", f"user,id=net{node.node_id}," + f"hostfwd=udp::{cfg.aggregator_port + node.node_id}" + f"-:{cfg.aggregator_port}", + ] + return node_net_args + + # --- TAP/bridge topology --- + info(f"Setting up {cfg.topology} topology with TAP/bridge...") + + if cfg.topology == "mesh": + # Single bridge, all nodes attached + br = "qemu-sw0" + _run_ip(["link", "add", "name", br, "type", "bridge"]) + _run_ip(["addr", "add", "10.0.0.1/24", "dev", br]) + _run_ip(["link", "set", br, "up"]) + net.bridges.append(br) + + for node in cfg.nodes: + tap = f"tap{node.node_id}" + mac = f"52:54:00:00:00:{node.node_id:02x}" + _run_ip(["tuntap", "add", "dev", tap, "mode", "tap"]) + _run_ip(["link", "set", tap, "master", br]) + _run_ip(["link", "set", tap, "up"]) + net.taps.append(tap) + + node_net_args[node.node_id] = [ + "-nic", f"tap,ifname={tap},script=no,downscript=no,mac={mac}", + ] + + elif cfg.topology == "star": + # One bridge per sensor; coordinator has a TAP on each bridge + coord_ids = {n.node_id for n in cfg.coordinator_nodes()} + for idx, sensor in enumerate(cfg.sensor_nodes()): + br = f"qemu-br{idx}" + _run_ip(["link", "add", "name", br, "type", "bridge"]) + _run_ip(["addr", "add", f"10.0.{idx + 1}.1/24", "dev", br]) + _run_ip(["link", "set", br, "up"]) + net.bridges.append(br) + + # Sensor TAP + s_tap = f"tap-s{sensor.node_id}" + s_mac = f"52:54:00:01:{idx:02x}:{sensor.node_id:02x}" + _run_ip(["tuntap", "add", "dev", s_tap, "mode", "tap"]) + _run_ip(["link", "set", s_tap, 
"master", br]) + _run_ip(["link", "set", s_tap, "up"]) + net.taps.append(s_tap) + node_net_args.setdefault(sensor.node_id, []).extend([ + "-nic", f"tap,ifname={s_tap},script=no,downscript=no,mac={s_mac}", + ]) + + # Coordinator TAP on this bridge + for cnode in cfg.coordinator_nodes(): + c_tap = f"tap-c{cnode.node_id}-b{idx}" + c_mac = f"52:54:00:02:{idx:02x}:{cnode.node_id:02x}" + _run_ip(["tuntap", "add", "dev", c_tap, "mode", "tap"]) + _run_ip(["link", "set", c_tap, "master", br]) + _run_ip(["link", "set", c_tap, "up"]) + net.taps.append(c_tap) + node_net_args.setdefault(cnode.node_id, []).extend([ + "-nic", f"tap,ifname={c_tap},script=no,downscript=no,mac={c_mac}", + ]) + + elif cfg.topology in ("line", "ring"): + # Chain of bridges: br_i connects node_i <-> node_(i+1) + pairs = list(range(n - 1)) + if cfg.topology == "ring" and n > 2: + pairs.append(n - 1) # extra bridge: last <-> first + + for pair_idx in range(len(pairs)): + left_idx = pairs[pair_idx] + right_idx = (pairs[pair_idx] + 1) % n + + left_node = cfg.nodes[left_idx] + right_node = cfg.nodes[right_idx] + + br = f"qemu-br{pair_idx}" + _run_ip(["link", "add", "name", br, "type", "bridge"]) + _run_ip(["addr", "add", f"10.0.{pair_idx + 1}.1/24", "dev", br]) + _run_ip(["link", "set", br, "up"]) + net.bridges.append(br) + + for side, nd in [("l", left_node), ("r", right_node)]: + tap = f"tap-{side}{nd.node_id}-b{pair_idx}" + mac = f"52:54:00:03:{pair_idx:02x}:{nd.node_id:02x}" + _run_ip(["tuntap", "add", "dev", tap, "mode", "tap"]) + _run_ip(["link", "set", tap, "master", br]) + _run_ip(["link", "set", tap, "up"]) + net.taps.append(tap) + node_net_args.setdefault(nd.node_id, []).extend([ + "-nic", f"tap,ifname={tap},script=no,downscript=no,mac={mac}", + ]) + + return node_net_args + + +def teardown_network(net: NetworkState) -> None: + """Remove all created TAP interfaces and bridges.""" + if not IS_LINUX or net.use_slirp: + return + + for tap in net.taps: + _run_ip(["link", "set", tap, "down"]) + 
_run_ip(["link", "delete", tap]) + + for br in net.bridges: + _run_ip(["link", "set", br, "down"]) + _run_ip(["link", "delete", br, "type", "bridge"]) + + +# --------------------------------------------------------------------------- +# QEMU instance launch +# --------------------------------------------------------------------------- +def launch_node( + node: NodeConfig, + flash_image: Path, + log_file: Path, + net_args: List[str], + qemu_bin: str, +) -> subprocess.Popen: + """Launch a single QEMU ESP32-S3 instance. Returns the Popen handle.""" + args = [ + qemu_bin, + "-machine", "esp32s3", + "-nographic", + "-drive", f"file={flash_image},if=mtd,format=raw", + "-serial", f"file:{log_file}", + "-no-reboot", + ] + args.extend(net_args) + + return subprocess.Popen( + args, + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + ) + + +# --------------------------------------------------------------------------- +# Aggregator +# --------------------------------------------------------------------------- +def start_aggregator( + port: int, n_nodes: int, output_file: Path, log_file: Path +) -> Optional[subprocess.Popen]: + """Start the Rust aggregator via cargo. Returns Popen, or None if it cannot be started (no workspace, no cargo, or early exit).""" + cargo_toml = RUST_DIR / "Cargo.toml" + if not cargo_toml.exists() or shutil.which("cargo") is None: # Popen would raise FileNotFoundError without cargo + warn(f"Rust workspace not found at {RUST_DIR} or cargo not on PATH; skipping aggregator.") + return None + + args = [ + "cargo", "run", + "--manifest-path", str(cargo_toml), + "-p", "wifi-densepose-hardware", + "--bin", "aggregator", "--", + "--listen", f"0.0.0.0:{port}", + "--expect-nodes", str(n_nodes), + "--output", str(output_file), + ] + + with open(log_file, "w") as lf: + proc = subprocess.Popen(args, stdout=lf, stderr=subprocess.STDOUT) + + # Give it a moment to bind + time.sleep(1) + if proc.poll() is not None: + error(f"Aggregator failed to start. 
Check {log_file}") + return None + + return proc + + +# --------------------------------------------------------------------------- +# Swarm-level health assertions +# --------------------------------------------------------------------------- +def run_assertions( + cfg: SwarmConfig, + build_dir: Path, + results_file: Path, +) -> int: + """ + Evaluate each entry of cfg.assertions inline against the per-node serial logs + (build_dir/qemu_node<id>.log); unknown names are skipped with a warning. results_file is not used. + + Returns exit code: 0=PASS, 1=WARN, 2=FAIL, 3=FATAL. + """ + n_nodes = len(cfg.nodes) + worst = EXIT_PASS + + # Collect node logs + logs: Dict[int, str] = {} + for node in cfg.nodes: + log_path = build_dir / f"qemu_node{node.node_id}.log" + if log_path.exists(): + logs[node.node_id] = log_path.read_text(encoding="utf-8", errors="replace") + else: + logs[node.node_id] = "" + + def _check(name: str, passed: bool, msg_pass: str, msg_fail: str, level: int = EXIT_FAIL): + nonlocal worst + if passed: + print(f" [{_c('PASS', '32')}] {name}: {msg_pass}") + else: + sev_str = {EXIT_WARN: "WARN", EXIT_FAIL: "FAIL", EXIT_FATAL: "FATAL"}.get(level, "FAIL") + col = "33" if level == EXIT_WARN else "1;31" + print(f" [{_c(sev_str, col)}] {name}: {msg_fail}") + worst = max(worst, level) + + print() + print("=" * 60) + print(f" Swarm Validation: {cfg.name}") + print("=" * 60) + print() + + for assertion in cfg.assertions: + # Handle parameterized assertions like {frame_rate_above: 15} + if isinstance(assertion, dict): + assert_name = list(assertion.keys())[0] + assert_param = assertion[assert_name] + else: + assert_name = str(assertion) + assert_param = None + + if assert_name == "all_nodes_boot": + booted = [ + nid for nid, log in logs.items() + if any(kw in log for kw in ["app_main", "main_task", "ESP32-S3 CSI Node"]) + ] + _check("all_nodes_boot", + len(booted) == n_nodes, + f"All {n_nodes} nodes booted", + f"Only {len(booted)}/{n_nodes} booted", + EXIT_FATAL if len(booted) == 0 else EXIT_FAIL) + + elif assert_name == 
"no_crashes": + crash_pats = ["Guru Meditation", "assert failed", "abort()", + "panic", "LoadProhibited", "StoreProhibited"] + crashed = [ + nid for nid, log in logs.items() + if any(pat in log for pat in crash_pats) + ] + _check("no_crashes", + len(crashed) == 0, + "No crashes detected", + f"Crashes in nodes: {crashed}", + EXIT_FATAL) + + elif assert_name == "tdm_no_collision": + slots: Dict[int, List[int]] = {} + for nid, log in logs.items(): + m = re.search(r"TDM slot[=: ]+(\d+)", log, re.IGNORECASE) + if m: + slot = int(m.group(1)) + slots.setdefault(slot, []).append(nid) + collisions = {s: ns for s, ns in slots.items() if len(ns) > 1} + _check("tdm_no_collision", + len(collisions) == 0, + "No TDM slot collisions", + f"Collisions: {collisions}", + EXIT_FAIL) + + elif assert_name == "all_nodes_produce_frames": + producing = [] + for nid, log in logs.items(): + node_cfg = next((n for n in cfg.nodes if n.node_id == nid), None) + if node_cfg and node_cfg.role == "sensor": + if re.search(r"frame|CSI|emitted", log, re.IGNORECASE): + producing.append(nid) + sensors = cfg.sensor_nodes() + _check("all_nodes_produce_frames", + len(producing) == len(sensors), + f"All {len(sensors)} sensors producing frames", + f"Only {len(producing)}/{len(sensors)} sensors producing", + EXIT_FAIL) + + elif assert_name == "coordinator_receives_from_all": + coord_logs = [ + logs.get(n.node_id, "") for n in cfg.coordinator_nodes() + ] + all_coord_text = "\n".join(coord_logs) + received_from = set() + for sensor in cfg.sensor_nodes(): + # Look for the sensor's node_id mentioned in coordinator logs + if re.search(rf"node[_ ]?id[=: ]+{sensor.node_id}\b", all_coord_text, re.IGNORECASE): + received_from.add(sensor.node_id) + sensor_ids = {s.node_id for s in cfg.sensor_nodes()} + _check("coordinator_receives_from_all", + received_from == sensor_ids, + f"Coordinator received from all {len(sensor_ids)} sensors", + f"Missing: {sensor_ids - received_from}", + EXIT_FAIL) + + elif 
assert_name.startswith("fall_detected_by_node_"): + target_id = int(assert_name.split("_")[-1]) + log_text = logs.get(target_id, "") + found = bool(re.search(r"fall[_ ]?detect|fall[_ ]?event", log_text, re.IGNORECASE)) + _check(assert_name, + found, + f"Node {target_id} detected fall event", + f"Node {target_id} did not report fall detection", + EXIT_WARN) + + elif assert_name == "frame_rate_above": + min_rate = int(assert_param) if assert_param else 10 + all_ok = True + for nid, log in logs.items(): + m = re.search(r"frame[_ ]?rate[=: ]+([\d.]+)", log, re.IGNORECASE) + if m: + rate = float(m.group(1)) + if rate < min_rate: + all_ok = False + _check(f"frame_rate_above({min_rate})", + all_ok, + f"All nodes >= {min_rate} Hz", + f"Some nodes below {min_rate} Hz", + EXIT_WARN) + + elif assert_name == "max_boot_time_s": + max_s = int(assert_param) if assert_param else 10 + all_ok = True + for nid, log in logs.items(): + m = re.search(r"boot[_ ]?time[=: ]+([\d.]+)", log, re.IGNORECASE) + if m: + bt = float(m.group(1)) + if bt > max_s: + all_ok = False + _check(f"max_boot_time_s({max_s})", + all_ok, + f"All nodes booted within {max_s}s", + f"Some nodes exceeded {max_s}s boot time", + EXIT_WARN) + + elif assert_name == "no_heap_errors": + heap_pats = [r"\bOOM\b", r"out of memory", r"heap corruption", r"corrupt heap"] # regexes; bare "heap"/"oom" substrings hit the heap_init boot banner and words like "room" + found_in = [ + nid for nid, log in logs.items() + if any(re.search(pat, log, re.IGNORECASE) for pat in heap_pats) + ] + _check("no_heap_errors", + len(found_in) == 0, + "No heap errors", + f"Heap errors in nodes: {found_in}", + EXIT_FAIL) + + else: + warn(f" Unknown assertion: {assert_name} (skipped)") + + print() + verdict = {EXIT_PASS: "PASS", EXIT_WARN: "WARN", EXIT_FAIL: "FAIL", EXIT_FATAL: "FATAL"} + print(f" Verdict: {_c(verdict[worst], '32' if worst == 0 else '33' if worst == 1 else '1;31')}") + print() + + return worst + + +# --------------------------------------------------------------------------- +# Orchestrator +# 
---------------------------------------------------------------------------
+class SwarmOrchestrator:
+    """Manages the lifecycle of a QEMU swarm test.
+
+    A single instance drives one run: it provisions a flash image per node,
+    sets up the network, starts the aggregator and the QEMU processes, waits
+    for the configured duration, stops everything, evaluates the assertions
+    and writes a JSON summary into the output directory.
+    """
+
+    def __init__(
+        self,
+        cfg: SwarmConfig,
+        qemu_bin: str,
+        output_dir: Path,
+        skip_build: bool,
+        dry_run: bool,
+    ):
+        """Store the run parameters and register cleanup hooks.
+
+        Args:
+            cfg: Validated swarm configuration.
+            qemu_bin: Path or command name of the QEMU binary.
+            output_dir: Directory that receives logs, results and the summary.
+            skip_build: When True, a missing base flash image is fatal.
+            dry_run: When True, run() only prints the plan.
+        """
+        self.cfg = cfg
+        self.qemu_bin = qemu_bin
+        self.output_dir = output_dir
+        self.skip_build = skip_build
+        self.dry_run = dry_run
+
+        self.build_dir = FIRMWARE_DIR / "build"
+        self.results_file = output_dir / "swarm_results.json"
+
+        self.qemu_procs: List[subprocess.Popen] = []
+        self.agg_proc: Optional[subprocess.Popen] = None
+        self.net_state = NetworkState()
+        # Set once cleanup() has fully completed, so the atexit hook does not
+        # tear the network down a second time after a signal-triggered cleanup.
+        self._cleaned_up = False
+
+        # Register cleanup
+        atexit.register(self.cleanup)
+        signal.signal(signal.SIGTERM, self._signal_handler)
+        signal.signal(signal.SIGINT, self._signal_handler)
+
+    def _signal_handler(self, signum: int, frame: Any) -> None:
+        """Clean up and exit with EXIT_FATAL when SIGTERM/SIGINT arrives."""
+        info(f"Received signal {signum}, shutting down...")
+        self.cleanup()
+        sys.exit(EXIT_FATAL)
+
+    @staticmethod
+    def _stop_process(proc: subprocess.Popen, timeout: float = 5.0) -> None:
+        """Terminate *proc* and wait for it; kill it if it does not exit in time."""
+        if proc.poll() is not None:
+            return
+        try:
+            proc.terminate()
+            proc.wait(timeout=timeout)
+        except (subprocess.TimeoutExpired, OSError):
+            try:
+                proc.kill()
+            except OSError:
+                pass
+
+    def cleanup(self) -> None:
+        """Kill all QEMU processes and tear down network.
+
+        Safe to call more than once: after one complete pass later calls
+        return immediately.
+        """
+        if self._cleaned_up:
+            return
+
+        for proc in self.qemu_procs:
+            self._stop_process(proc)
+
+        if self.agg_proc:
+            self._stop_process(self.agg_proc)
+
+        teardown_network(self.net_state)
+        # Only mark as done at the very end, so a cleanup interrupted by a
+        # second signal is still repeated in full.
+        self._cleaned_up = True
+
+    def run(self) -> int:
+        """Execute the full swarm test. Returns exit code."""
+        n = len(self.cfg.nodes)
+        info(f"Swarm: {self.cfg.name}")
+        info(f"Topology: {self.cfg.topology}")
+        info(f"Nodes: {n}")
+        info(f"Duration: {self.cfg.duration_s}s")
+        info(f"Assertions: {len(self.cfg.assertions)}")
+        info(f"Output: {self.output_dir}")
+        print()
+
+        if self.dry_run:
+            return self._dry_run()
+
+        # Ensure output dir exists
+        self.output_dir.mkdir(parents=True, exist_ok=True)
+        self.build_dir.mkdir(parents=True, exist_ok=True)
+
+        # 1. Check prerequisites
+        self._check_prerequisites()
+
+        # 2. Provision each node
+        info("--- Provisioning nodes ---")
+        flash_images: Dict[int, Path] = {}
+        aggregator_ip = "10.0.0.1"
+        for node in self.cfg.nodes:
+            flash_images[node.node_id] = provision_node(
+                node=node,
+                build_dir=self.build_dir,
+                n_total=n,
+                aggregator_ip=aggregator_ip,
+                aggregator_port=self.cfg.aggregator_port,
+            )
+        print()
+
+        # 3. Setup network topology
+        info("--- Setting up network ---")
+        node_net_args = setup_network(self.cfg, self.net_state)
+        print()
+
+        # 4. Start aggregator if needed
+        if self.cfg.coordinator_nodes():
+            info("--- Starting aggregator ---")
+            agg_log = self.output_dir / "aggregator.log"
+            self.agg_proc = start_aggregator(
+                port=self.cfg.aggregator_port,
+                n_nodes=n,
+                output_file=self.results_file,
+                log_file=agg_log,
+            )
+            if self.agg_proc:
+                info(f" Aggregator PID: {self.agg_proc.pid}")
+            print()
+
+        # 5. Launch QEMU instances
+        info(f"--- Launching {n} QEMU nodes ---")
+        for node in self.cfg.nodes:
+            log_file = self.output_dir / f"qemu_node{node.node_id}.log"
+            net_args = node_net_args.get(node.node_id, [])
+
+            proc = launch_node(
+                node=node,
+                flash_image=flash_images[node.node_id],
+                log_file=log_file,
+                net_args=net_args,
+                qemu_bin=self.qemu_bin,
+            )
+            self.qemu_procs.append(proc)
+            info(f" Node {node.node_id} ({node.role}): PID={proc.pid}, log={log_file}")
+        print()
+
+        # 6. Wait for test duration
+        info(f"All nodes launched. Waiting {self.cfg.duration_s}s...")
+        try:
+            time.sleep(self.cfg.duration_s)
+        except KeyboardInterrupt:
+            # NOTE(review): __init__ installs a SIGINT handler, so Ctrl-C is
+            # routed to _signal_handler and this branch looks unreachable.
+            warn("Interrupted by user.")
+
+        # 7. Stop QEMU instances
+        info("Duration elapsed. Stopping nodes...")
+        # Signal every node first so they shut down in parallel ...
+        for proc in self.qemu_procs:
+            if proc.poll() is None:
+                proc.terminate()
+        # ... then wait for each one, so the per-node logs are complete before
+        # the assertions read them.
+        for proc in self.qemu_procs:
+            self._stop_process(proc)
+        # Give aggregator time to flush
+        time.sleep(2)
+        if self.agg_proc:
+            # Wait for the exit so the results file is complete before it is read.
+            self._stop_process(self.agg_proc)
+        print()
+
+        # 8. Copy logs to output dir (they're already there via log_file paths)
+        # Also copy from build_dir if assertions reference those paths
+        for node in self.cfg.nodes:
+            src = self.output_dir / f"qemu_node{node.node_id}.log"
+            dst = self.build_dir / f"qemu_node{node.node_id}.log"
+            if src.exists() and src != dst:
+                shutil.copy2(str(src), str(dst))
+
+        # 9. Run assertions
+        exit_code = run_assertions(
+            cfg=self.cfg,
+            build_dir=self.output_dir,
+            results_file=self.results_file,
+        )
+
+        # 10. Write JSON results summary
+        self._write_summary(exit_code)
+
+        return exit_code
+
+    def _dry_run(self) -> int:
+        """Show what would be launched without actually running anything."""
+        print(_c("=== DRY RUN ===", "1;33"))
+        print()
+        print(f"Swarm: {self.cfg.name}")
+        print(f"Topology: {self.cfg.topology}")
+        print(f"Duration: {self.cfg.duration_s}s")
+        print(f"Aggregator port: {self.cfg.aggregator_port}")
+        print()
+
+        print("Nodes:")
+        for node in self.cfg.nodes:
+            gw = " [GATEWAY]" if node.is_gateway else ""
+            print(f" node_id={node.node_id} role={node.role} scenario={node.scenario} "
+                  f"channel={node.channel} tdm={node.tdm_slot}/{len(self.cfg.nodes)} "
+                  f"edge_tier={node.edge_tier}{gw}")
+        print()
+
+        print("Network:")
+        if self.cfg.topology == "mesh":
+            print(" Single bridge: all nodes on qemu-sw0")
+        elif self.cfg.topology == "star":
+            for i, s in enumerate(self.cfg.sensor_nodes()):
+                print(f" Bridge qemu-br{i}: sensor {s.node_id} <-> coordinator(s)")
+        elif self.cfg.topology in ("line", "ring"):
+            n = len(self.cfg.nodes)
+            # One bridge per adjacent pair; a ring adds the last -> first link.
+            pairs = list(range(n - 1))
+            if self.cfg.topology == "ring" and n > 2:
+                pairs.append(n - 1)
+            for p, left in enumerate(pairs):
+                right = (left + 1) % n
+                print(f" Bridge qemu-br{p}: node {self.cfg.nodes[left].node_id} "
+                      f"<-> node {self.cfg.nodes[right].node_id}")
+        print()
+
+        print("QEMU command (per node):")
+        # Upper-case words are placeholders for the per-node values.
+        print(f" {self.qemu_bin} -machine esp32s3 -nographic "
+              f"-drive file=FLASH_IMAGE,if=mtd,format=raw "
+              f"-serial file:LOG_FILE -no-reboot NET_ARGS")
+        print()
+
+        print("Assertions:")
+        for a in self.cfg.assertions:
+            if isinstance(a, dict):
+                name = list(a.keys())[0]
+                param = a[name]
+                print(f" - {name}: {param}")
+            else:
+                print(f" - {a}")
+        print()
+
+        return EXIT_PASS
+
+    def _check_prerequisites(self) -> None:
+        """Verify QEMU binary and build artifacts exist.
+
+        Exits the process with EXIT_FATAL when a prerequisite is missing.
+        """
+        # Check QEMU binary
+        try:
+            result = subprocess.run(
+                [self.qemu_bin, "--version"],
+                capture_output=True, text=True, timeout=10,
+            )
+            if result.returncode != 0:
+                fatal(f"QEMU binary returned error: {self.qemu_bin}")
+                sys.exit(EXIT_FATAL)
+        except FileNotFoundError:
+            fatal(f"QEMU binary not found: {self.qemu_bin}")
+            print(" Install: sudo apt install qemu-system-misc # Debian/Ubuntu")
+            print(" Or set --qemu-path to the qemu-system-xtensa binary.")
+            sys.exit(EXIT_FATAL)
+        except subprocess.TimeoutExpired:
+            fatal(f"QEMU binary timed out: {self.qemu_bin}")
+            sys.exit(EXIT_FATAL)
+        except OSError as exc:
+            # e.g. PermissionError when the binary is not executable
+            fatal(f"QEMU binary could not be executed: {self.qemu_bin} ({exc})")
+            sys.exit(EXIT_FATAL)
+
+        # Check base flash image
+        base = self.build_dir / "qemu_flash_base.bin"
+        if not base.exists():
+            if self.skip_build:
+                fatal(f"Base flash image not found: {base}")
+                fatal("Build the firmware first, or run without --skip-build.")
+                sys.exit(EXIT_FATAL)
+            else:
+                warn("Base flash image not found; firmware build will create it.")
+
+        # Check provision.py
+        if not PROVISION_SCRIPT.exists():
+            fatal(f"Provisioning script not found: {PROVISION_SCRIPT}")
+            sys.exit(EXIT_FATAL)
+
+    def _write_summary(self, exit_code: int) -> None:
+        """Write JSON summary of the swarm test run."""
+        verdict_map = {EXIT_PASS: "PASS", EXIT_WARN: "WARN",
+                       EXIT_FAIL: "FAIL", EXIT_FATAL: "FATAL"}
+        summary = {
+            "swarm": self.cfg.name,
+            "topology": self.cfg.topology,
+            "node_count": len(self.cfg.nodes),
+            "duration_s": self.cfg.duration_s,
+            "verdict": verdict_map.get(exit_code, "UNKNOWN"),
+            "exit_code": exit_code,
+            "nodes": [
+                {
+                    "node_id": n.node_id,
+                    "role": n.role,
+                    "scenario": n.scenario,
+                    "channel": n.channel,
+                    "tdm_slot": n.tdm_slot,
+                }
+                for n in self.cfg.nodes
+            ],
+            "assertions": [
+                str(a) if not isinstance(a, dict) else a
+                for a in self.cfg.assertions
+            ],
+        }
+
+        summary_path = self.output_dir / "swarm_summary.json"
+        summary_path.write_text(json.dumps(summary, indent=2) + "\n", encoding="utf-8")
+        info(f"Summary written to {summary_path}")
+
+
+# ---------------------------------------------------------------------------
+# CLI
+# ---------------------------------------------------------------------------
+def build_parser() -> argparse.ArgumentParser:
+    """Build the command-line parser for the swarm configurator."""
+    parser = argparse.ArgumentParser(
+        prog="qemu_swarm.py",
+        description="QEMU ESP32-S3 Swarm Configurator (ADR-062)",
+        formatter_class=argparse.RawDescriptionHelpFormatter,
+        epilog="""\
+Examples:
+  python3 qemu_swarm.py --config swarm_presets/standard.yaml
+  python3 qemu_swarm.py --preset smoke
+  python3 qemu_swarm.py --preset standard --timeout 90
+  python3 qemu_swarm.py --list-presets
+  python3 qemu_swarm.py --config custom.yaml --dry-run
+
+Exit codes:
+  0 PASS - all assertions passed
+  1 WARN - non-critical assertions failed
+  2 FAIL - critical assertions failed
+  3 FATAL - infrastructure or build failure
+""",
+    )
+
+    # --config, --preset and --list-presets are mutually exclusive.
+    source = parser.add_mutually_exclusive_group()
+    source.add_argument("--config", metavar="FILE",
+                        help="Path to YAML swarm configuration file")
+    source.add_argument("--preset", metavar="NAME",
+                        help="Use a built-in preset (e.g. smoke, standard, large-mesh)")
+    source.add_argument("--list-presets", action="store_true",
+                        help="List available preset configurations and exit")
+
+    parser.add_argument("--timeout", type=int, default=None,
+                        help="Override swarm duration_s from config")
+    parser.add_argument("--dry-run", action="store_true",
+                        help="Show what would be launched without running")
+    parser.add_argument("--qemu-path", default="qemu-system-xtensa",
+                        help="Path to QEMU binary (default: qemu-system-xtensa)")
+    parser.add_argument("--skip-build", action="store_true",
+                        help="Skip firmware build step")
+    parser.add_argument("--output-dir", metavar="DIR", default=None,
+                        help="Directory for logs and results (default: build/swarm_NAME)")
+
+    return parser
+
+
+def main() -> int:
+    """CLI entry point. Returns the process exit code."""
+    parser = build_parser()
+    args = parser.parse_args()
+
+    # List presets
+    if args.list_presets:
+        presets = list_presets()
+        if not presets:
+            print(f"No presets found in {PRESETS_DIR}")
+            return EXIT_PASS
+        print("Available swarm presets:")
+        print()
+        for name, desc in presets:
+            print(f" {name:20s} {desc}")
+        print()
+        print("Use: python3 qemu_swarm.py --preset NAME")
+        return EXIT_PASS
+
+    # Load config
+    if args.config:
+        config_path = Path(args.config)
+        if not config_path.exists():
+            fatal(f"Config file not found: {config_path}")
+            return EXIT_FATAL
+        raw = yaml.safe_load(config_path.read_text(encoding="utf-8"))
+        if raw is None:
+            # safe_load returns None for an empty document
+            fatal(f"Config file is empty: {config_path}")
+            return EXIT_FATAL
+    elif args.preset:
+        raw = load_preset(args.preset)
+    else:
+        parser.print_help()
+        print()
+        error("Provide --config FILE or --preset NAME (or use --list-presets)")
+        return EXIT_FATAL
+
+    cfg = validate_config(raw)
+
+    # Apply overrides
+    if args.timeout is not None:
+        cfg.duration_s = args.timeout
+
+    # Determine output directory
+    if args.output_dir:
+        output_dir = Path(args.output_dir)
+    else:
+        output_dir = FIRMWARE_DIR / "build" / f"swarm_{cfg.name.replace(' ', '_')}"
+
+    # Run orchestrator
+    orch = SwarmOrchestrator(
+        cfg=cfg,
+        qemu_bin=args.qemu_path,
+        output_dir=output_dir,
+        skip_build=args.skip_build,
+        dry_run=args.dry_run,
+    )
+
+    return orch.run()
+
+
+if __name__ == "__main__":
+    sys.exit(main())
diff --git a/scripts/swarm_health.py b/scripts/swarm_health.py
new file mode 100644
index 00000000..8d2a5974
--- /dev/null
+++ b/scripts/swarm_health.py
@@ -0,0 +1,653 @@
+#!/usr/bin/env python3
+"""
+QEMU Swarm Health Oracle (ADR-062)
+
+Validates collective health of a multi-node ESP32-S3 QEMU swarm.
+Checks cross-node assertions like TDM ordering, inter-node communication,
+and swarm-level frame rates.
+
+Usage:
+    python3 swarm_health.py --config swarm_config.yaml --log-dir build/swarm_logs/
+    python3 swarm_health.py --log-dir build/swarm_logs/ --assertions all_nodes_boot no_crashes
+"""
+
+import argparse
+import re
+import sys
+from dataclasses import dataclass, field
+from pathlib import Path
+from typing import Any, Dict, List, Optional
+
+try:
+    import yaml
+except ImportError:
+    yaml = None  # type: ignore[assignment]
+
+
+# ---------------------------------------------------------------------------
+# ANSI helpers (disabled when not a TTY)
+# ---------------------------------------------------------------------------
+USE_COLOR = sys.stdout.isatty()
+
+
+def _color(text: str, code: str) -> str:
+    return f"\033[{code}m{text}\033[0m" if USE_COLOR else text
+
+
+def green(t: str) -> str:
+    return _color(t, "32")
+
+
+def yellow(t: str) -> str:
+    return _color(t, "33")
+
+
+def red(t: str) -> str:
+    return _color(t, "1;31")
+
+
+# ---------------------------------------------------------------------------
+# Data types
+# ---------------------------------------------------------------------------
+
+@dataclass
+class AssertionResult:
+    """Result of a single swarm-level assertion."""
+    name: str
+    passed: bool
+    message: str
+    severity: int  # 0 = pass, 1 = warn, 2 = fail
+
+
+@dataclass
+class NodeLog:
+    """Parsed log for a single QEMU node."""
+    node_id: int
+    lines: List[str]
+    text: str
+
+
+# 
---------------------------------------------------------------------------
+# Log loading
+# ---------------------------------------------------------------------------
+
+# File-name templates tried for each node, in order. The second form is the
+# name the swarm orchestrator (qemu_swarm.py) gives its per-node logs.
+_LOG_NAME_TEMPLATES = ("node_{}.log", "qemu_node{}.log")
+
+
+def _node_log_path(log_dir: Path, node_id: int) -> Optional[Path]:
+    """Return the existing log file for *node_id* in *log_dir*, or None."""
+    for template in _LOG_NAME_TEMPLATES:
+        candidate = log_dir / template.format(node_id)
+        if candidate.exists():
+            return candidate
+    return None
+
+
+def load_logs(log_dir: Path, node_count: int) -> List[NodeLog]:
+    """Load the logs of nodes 0 .. node_count-1 from *log_dir*.
+
+    Both node_N.log and qemu_nodeN.log are accepted. A node without a log
+    file gets an empty NodeLog, so assertions report it instead of crashing.
+    """
+    logs: List[NodeLog] = []
+    for i in range(node_count):
+        path = _node_log_path(log_dir, i)
+        if path is not None:
+            text = path.read_text(encoding="utf-8", errors="replace")
+        else:
+            text = ""
+        logs.append(NodeLog(node_id=i, lines=text.splitlines(), text=text))
+    return logs
+
+
+def _node_count_from_dir(log_dir: Path) -> int:
+    """Auto-detect node count by scanning for consecutive node log files."""
+    count = 0
+    while _node_log_path(log_dir, count) is not None:
+        count += 1
+    return count
+
+
+# ---------------------------------------------------------------------------
+# Individual assertions
+# ---------------------------------------------------------------------------
+
+_BOOT_PATTERNS = [
+    r"app_main\(\)", r"main_task:", r"main:", r"ESP32-S3 CSI Node",
+]
+
+_CRASH_PATTERNS = [
+    r"Guru Meditation", r"assert failed", r"abort\(\)", r"panic",
+    r"LoadProhibited", r"StoreProhibited", r"InstrFetchProhibited",
+    r"IllegalInstruction", r"Unhandled debug exception", r"Fatal exception",
+]
+
+_HEAP_PATTERNS = [
+    r"HEAP_ERROR", r"out of memory", r"heap_caps_alloc.*failed",
+    r"malloc.*fail", r"heap corruption", r"CORRUPT HEAP",
+    r"multi_heap", r"heap_lock",
+]
+
+_FRAME_PATTERNS = [
+    r"frame", r"CSI", r"mock_csi", r"iq_data", r"subcarrier",
+    r"csi_collector", r"enqueue",
+]
+
+_FALL_PATTERNS = [r"fall[=: ]+1", r"fall detected", r"fall_event"]
+
+
+def _first_matching_line(lines: List[str], patterns: List[str], flags: int = 0) -> Optional[str]:
+    """Return the first line that matches any of *patterns*, or None."""
+    for line in lines:
+        if any(re.search(pat, line, flags) for pat in patterns):
+            return line
+    return None
+
+
+def assert_all_nodes_boot(logs: List[NodeLog], timeout_s: float = 10.0) -> AssertionResult:
+    """Check each node's log for boot patterns.
+
+    *timeout_s* is only echoed in the pass message; it is not enforced here.
+    """
+    missing: List[int] = []
+    for nl in logs:
+        found = any(
+            re.search(p, nl.text) for p in _BOOT_PATTERNS
+        )
+        if not found:
+            missing.append(nl.node_id)
+
+    if not missing:
+        return AssertionResult(
+            name="all_nodes_boot", passed=True,
+            message=f"All {len(logs)} nodes booted (timeout={timeout_s}s)",
+            severity=0,
+        )
+    return AssertionResult(
+        name="all_nodes_boot", passed=False,
+        message=f"Nodes missing boot indicator: {missing}",
+        severity=2,
+    )
+
+
+def assert_no_crashes(logs: List[NodeLog]) -> AssertionResult:
+    """Check no node has crash patterns (at most one hit is reported per node)."""
+    crashed: List[str] = []
+    for nl in logs:
+        hit = _first_matching_line(nl.lines, _CRASH_PATTERNS)
+        if hit is not None:
+            crashed.append(f"node_{nl.node_id}: {hit.strip()[:100]}")
+
+    if not crashed:
+        return AssertionResult(
+            name="no_crashes", passed=True,
+            message="No crash indicators in any node",
+            severity=0,
+        )
+    return AssertionResult(
+        name="no_crashes", passed=False,
+        message=f"Crashes found: {crashed[0]}" + (
+            f" (+{len(crashed)-1} more)" if len(crashed) > 1 else ""
+        ),
+        severity=2,
+    )
+
+
+def assert_tdm_no_collision(logs: List[NodeLog]) -> AssertionResult:
+    """Parse TDM slot assignments from logs, verify uniqueness.
+
+    Only the first slot mention in each node's log is used. Passes when no
+    node mentions a slot at all.
+    """
+    slot_map: Dict[int, List[int]] = {}  # slot -> [node_ids]
+    tdm_pat = re.compile(r"tdm[_ ]?slot[=: ]+(\d+)", re.IGNORECASE)
+
+    for nl in logs:
+        for line in nl.lines:
+            m = tdm_pat.search(line)
+            if m:
+                slot = int(m.group(1))
+                slot_map.setdefault(slot, [])
+                if nl.node_id not in slot_map[slot]:
+                    slot_map[slot].append(nl.node_id)
+                break  # first occurrence per node
+
+    collisions = {s: nids for s, nids in slot_map.items() if len(nids) > 1}
+
+    if not slot_map:
+        return AssertionResult(
+            name="tdm_no_collision", passed=True,
+            message="No TDM slot assignments found (may be N/A)",
+            severity=0,
+        )
+    if not collisions:
+        return AssertionResult(
+            name="tdm_no_collision", passed=True,
+            message=f"TDM slots unique across {len(slot_map)} assignments",
+            severity=0,
+        )
+    return AssertionResult(
+        name="tdm_no_collision", passed=False,
+        message=f"TDM collisions: {collisions}",
+        severity=2,
+    )
+
+
+def assert_all_nodes_produce_frames(logs: List[NodeLog]) -> AssertionResult:
+    """Every node's log shows frame activity (any frame pattern, case-insensitive)."""
+    silent: List[int] = []
+    for nl in logs:
+        found = any(
+            re.search(p, line, re.IGNORECASE)
+            for line in nl.lines for p in _FRAME_PATTERNS
+        )
+        if not found:
+            silent.append(nl.node_id)
+
+    if not silent:
+        return AssertionResult(
+            name="all_nodes_produce_frames", passed=True,
+            message=f"All {len(logs)} nodes show frame activity",
+            severity=0,
+        )
+    return AssertionResult(
+        name="all_nodes_produce_frames", passed=False,
+        message=f"Nodes with no frame activity: {silent}",
+        severity=1,
+    )
+
+
+def assert_coordinator_receives_from_all(
+    logs: List[NodeLog],
+    coordinator_id: int = 0,
+    sensor_ids: Optional[List[int]] = None,
+) -> AssertionResult:
+    """Coordinator log shows frames from each sensor's node_id.
+
+    When *sensor_ids* is None, every node other than the coordinator is
+    treated as a sensor.
+    """
+    coord_log = None
+    for nl in logs:
+        if nl.node_id == coordinator_id:
+            coord_log = nl
+            break
+
+    if coord_log is None:
+        return AssertionResult(
+            name="coordinator_receives_from_all", passed=False,
+            message=f"Coordinator node_{coordinator_id} log not found",
+            severity=2,
+        )
+
+    if sensor_ids is None:
+        sensor_ids = [nl.node_id for nl in logs if nl.node_id != coordinator_id]
+
+    missing: List[int] = []
+    recv_pat = re.compile(r"(from|node_id|src)[=: ]+(\d+)", re.IGNORECASE)
+    received_ids: set = set()
+    for line in coord_log.lines:
+        m = recv_pat.search(line)
+        if m:
+            received_ids.add(int(m.group(2)))
+
+    for sid in sensor_ids:
+        if sid not in received_ids:
+            missing.append(sid)
+
+    if not missing:
+        return AssertionResult(
+            name="coordinator_receives_from_all", passed=True,
+            message=f"Coordinator received from all sensors: {sensor_ids}",
+            severity=0,
+        )
+    return AssertionResult(
+        name="coordinator_receives_from_all", passed=False,
+        message=f"Coordinator missing frames from nodes: {missing}",
+        severity=1,
+    )
+
+
+def assert_fall_detected(logs: List[NodeLog], node_id: int) -> AssertionResult:
+    """Specific node reports fall detection."""
+    for nl in logs:
+        if nl.node_id == node_id:
+            found = any(
+                re.search(p, line, re.IGNORECASE)
+                for line in nl.lines for p in _FALL_PATTERNS
+            )
+            if found:
+                return AssertionResult(
+                    name=f"fall_detected_node_{node_id}", passed=True,
+                    message=f"Node {node_id} reported fall event",
+                    severity=0,
+                )
+            return AssertionResult(
+                name=f"fall_detected_node_{node_id}", passed=False,
+                message=f"Node {node_id} did not report fall event",
+                severity=1,
+            )
+
+    return AssertionResult(
+        name=f"fall_detected_node_{node_id}", passed=False,
+        message=f"Node {node_id} log not found",
+        severity=2,
+    )
+
+
+def assert_frame_rate_above(logs: List[NodeLog], min_fps: float = 10.0) -> AssertionResult:
+    """Each node meets minimum frame rate.
+
+    Nodes whose logs contain no rate information are not counted as failures.
+    """
+    fps_pat = re.compile(r"(?:fps|frame.?rate)[=: ]+([0-9.]+)", re.IGNORECASE)
+    count_pat = re.compile(r"(?:frame[_ ]?count|frames)[=: ]+(\d+)", re.IGNORECASE)
+    below: List[str] = []
+
+    for nl in logs:
+        best_fps: Optional[float] = None
+        # Try explicit FPS
+        for line in nl.lines:
+            m = fps_pat.search(line)
+            if m:
+                try:
+                    best_fps = max(best_fps or 0.0, float(m.group(1)))
+                except ValueError:
+                    pass
+        # Fallback: estimate from frame count (assume 1-second intervals)
+        if best_fps is None:
+            counts = []
+            for line in nl.lines:
+                m = count_pat.search(line)
+                if m:
+                    try:
+                        counts.append(int(m.group(1)))
+                    except ValueError:
+                        pass
+            if len(counts) >= 2:
+                best_fps = float(counts[-1] - counts[0]) / max(len(counts) - 1, 1)
+
+        if best_fps is not None and best_fps < min_fps:
+            below.append(f"node_{nl.node_id}={best_fps:.1f}")
+
+    if not below:
+        return AssertionResult(
+            name="frame_rate_above", passed=True,
+            message=f"All nodes meet minimum {min_fps} fps",
+            severity=0,
+        )
+    return AssertionResult(
+        name="frame_rate_above", passed=False,
+        message=f"Nodes below {min_fps} fps: {', '.join(below)}",
+        severity=1,
+    )
+
+
+def assert_max_boot_time(logs: List[NodeLog], max_seconds: float = 10.0) -> AssertionResult:
+    """All nodes boot within N seconds (based on timestamp in log).
+
+    The parenthesised number on the first boot line is read as milliseconds.
+    """
+    boot_time_pat = re.compile(r"\((\d+)\)\s", re.IGNORECASE)
+    slow: List[str] = []
+
+    for nl in logs:
+        boot_found = False
+        for line in nl.lines:
+            if any(re.search(p, line) for p in _BOOT_PATTERNS):
+                boot_found = True
+                m = boot_time_pat.search(line)
+                if m:
+                    ms = int(m.group(1))
+                    if ms > max_seconds * 1000:
+                        slow.append(f"node_{nl.node_id}={ms}ms")
+                break
+        if not boot_found:
+            slow.append(f"node_{nl.node_id}=no_boot")
+
+    if not slow:
+        return AssertionResult(
+            name="max_boot_time", passed=True,
+            message=f"All nodes booted within {max_seconds}s",
+            severity=0,
+        )
+    return AssertionResult(
+        name="max_boot_time", passed=False,
+        message=f"Slow/missing boot: {', '.join(slow)}",
+        severity=1,
+    )
+
+
+def assert_no_heap_errors(logs: List[NodeLog]) -> AssertionResult:
+    """No OOM/heap errors in any log (at most one hit is reported per node)."""
+    errors: List[str] = []
+    for nl in logs:
+        hit = _first_matching_line(nl.lines, _HEAP_PATTERNS, re.IGNORECASE)
+        if hit is not None:
+            errors.append(f"node_{nl.node_id}: {hit.strip()[:100]}")
+
+    if not errors:
+        return AssertionResult(
+            name="no_heap_errors", passed=True,
+            message="No heap errors in any node",
+            severity=0,
+        )
+    return AssertionResult(
+        name="no_heap_errors", passed=False,
+        message=f"Heap errors: {errors[0]}" + (
+            f" (+{len(errors)-1} more)" if len(errors) > 1 else ""
+        ),
+        severity=2,
+    )
+
+
+# ---------------------------------------------------------------------------
+# Assertion registry & dispatcher
+# ---------------------------------------------------------------------------
+
+ASSERTION_REGISTRY: Dict[str, Any] = {
+    "all_nodes_boot": assert_all_nodes_boot,
+    "no_crashes": assert_no_crashes,
+    "tdm_no_collision": assert_tdm_no_collision,
+    "all_nodes_produce_frames": assert_all_nodes_produce_frames,
+    "coordinator_receives_from_all": assert_coordinator_receives_from_all,
+    "frame_rate_above": assert_frame_rate_above,
+    "max_boot_time": assert_max_boot_time,
+    "no_heap_errors": assert_no_heap_errors,
+    # fall_detected is parameterized, handled separately
+}
+
+
+def _parse_assertion_spec(spec: Any) -> tuple:
+    """Parse a YAML assertion entry into (name, kwargs).
+
+    Supported forms:
+    - "all_nodes_boot" -> ("all_nodes_boot", {})
+    - {"frame_rate_above": 15} -> ("frame_rate_above", {"min_fps": 15})
+    - "fall_detected_by_node_2" -> ("fall_detected", {"node_id": 2})
+    - {"max_boot_time_s": 10} -> ("max_boot_time", {"max_seconds": 10})
+    - "max_boot_time_s" -> ("max_boot_time", {})
+
+    A parameterized assertion given without a value (bare string, or a dict
+    whose value is null) runs with the assertion function's default. Only the
+    first key of a dict spec is used.
+    """
+    if isinstance(spec, str):
+        key, val = spec, None
+    elif isinstance(spec, dict) and spec:
+        raw_key, val = next(iter(spec.items()))
+        key = str(raw_key)
+    else:
+        return (str(spec), {})
+
+    # Check for fall_detected_by_node_N pattern
+    m = re.match(r"fall_detected_by_node_(\d+)", key)
+    if m:
+        return ("fall_detected", {"node_id": int(m.group(1))})
+    if key == "frame_rate_above":
+        return ("frame_rate_above", {} if val is None else {"min_fps": float(val)})
+    if key in ("max_boot_time_s", "max_boot_time"):
+        return ("max_boot_time", {} if val is None else {"max_seconds": float(val)})
+    return (key, {})
+
+
+def run_assertions(
+    logs: List[NodeLog],
+    assertion_specs: List[Any],
+    config: Optional[Dict] = None,
+) -> List[AssertionResult]:
+    """Run all requested assertions against loaded logs.
+
+    Unknown assertion names produce a failed result with warning severity
+    rather than raising.
+    """
+    results: List[AssertionResult] = []
+
+    # Derive coordinator/sensor IDs from config if available
+    coordinator_id = 0
+    sensor_ids: Optional[List[int]] = None
+    nodes = config.get("nodes") if config else None
+    if nodes is not None:
+        for node_def in nodes:
+            # With several coordinators the last one listed wins.
+            if node_def.get("role") == "coordinator":
+                coordinator_id = node_def.get("node_id", 0)
+        sensor_ids = [
+            n["node_id"] for n in nodes
+            if n.get("role") == "sensor"
+        ]
+
+    for spec in assertion_specs:
+        name, kwargs = _parse_assertion_spec(spec)
+
+        if name == "fall_detected":
+            results.append(assert_fall_detected(logs, **kwargs))
+        elif name == "coordinator_receives_from_all":
+            results.append(assert_coordinator_receives_from_all(
+                logs, coordinator_id=coordinator_id, sensor_ids=sensor_ids,
+            ))
+        elif name in ASSERTION_REGISTRY:
+            fn = ASSERTION_REGISTRY[name]
+            results.append(fn(logs, **kwargs))
+        else:
+            results.append(AssertionResult(
+                name=name, passed=False,
+                message=f"Unknown assertion: {name}",
+                severity=1,
+            ))
+
+    return results
+
+
+# ---------------------------------------------------------------------------
+# Report printing
+# ---------------------------------------------------------------------------
+
+def print_report(results: List[AssertionResult], swarm_name: str = "") -> int:
+    """Print the assertion report and return max severity."""
+    header = "QEMU Swarm Health Report (ADR-062)"
+    if swarm_name:
+        header += f" - {swarm_name}"
+
+    print()
+    print("=" * 60)
+    print(f" {header}")
+    print("=" * 60)
+    print()
+
+    max_sev = 0
+    for r in results:
+        if r.severity == 0:
+            icon = green("PASS")
+        elif r.severity == 1:
+            icon = yellow("WARN")
+        else:
+            icon = red("FAIL")
+
+        print(f" [{icon}] {r.name}: {r.message}")
+        max_sev = max(max_sev, r.severity)
+
+    print()
+    passed = sum(1 for r in results if r.passed)
+    total = len(results)
+    summary = f" {passed}/{total} assertions passed"
+
+    if max_sev == 0:
+        print(green(summary))
+    elif max_sev == 1:
+        print(yellow(summary + " (with warnings)"))
+    else:
+        print(red(summary + " (with failures)"))
+
+    print()
+    return max_sev
+
+
+# ---------------------------------------------------------------------------
+# Main
+# ---------------------------------------------------------------------------
+
+def main():
+    """CLI entry point: load logs, run assertions, exit with the max severity."""
+    parser = argparse.ArgumentParser(
+        description="QEMU Swarm Health Oracle (ADR-062)",
+        formatter_class=argparse.RawDescriptionHelpFormatter,
+        epilog=(
+            "Example:\n"
+            " python3 swarm_health.py --config scripts/swarm_presets/standard.yaml \\\n"
+            " --log-dir build/swarm_logs/\n"
+            "\n"
+            " python3 swarm_health.py --log-dir build/swarm_logs/ \\\n"
+            " --assertions all_nodes_boot no_crashes\n"
+            "\n"
+            "Example output:\n"
+            " ============================================================\n"
+            " QEMU Swarm Health Report (ADR-062) - standard\n"
+            " ============================================================\n"
+            "\n"
+            " [PASS] all_nodes_boot: All 3 nodes booted (timeout=10.0s)\n"
+            " [PASS] no_crashes: No crash indicators in any node\n"
+            " [PASS] tdm_no_collision: TDM slots unique across 3 assignments\n"
+            " [PASS] all_nodes_produce_frames: All 3 nodes show frame activity\n"
+            " [PASS] coordinator_receives_from_all: Coordinator received from all\n"
+            " [WARN] fall_detected_node_2: Node 2 did not report fall event\n"
+            " [PASS] frame_rate_above: All nodes meet minimum 15.0 fps\n"
+            "\n"
+            " 6/7 assertions passed (with warnings)\n"
+        ),
+    )
+    parser.add_argument(
+        "--config", type=str, default=None,
+        help="Path to swarm YAML config (defines nodes and assertions)",
+    )
+    parser.add_argument(
+        "--log-dir", type=str, required=True,
+        help="Directory containing node_0.log, node_1.log, etc. "
+             "(qemu_node0.log, qemu_node1.log, ... are accepted too)",
+    )
+    parser.add_argument(
+        "--assertions", nargs="*", default=None,
+        help="Override assertions (space-separated). Ignores YAML assertion list.",
+    )
+    parser.add_argument(
+        "--node-count", type=int, default=None,
+        help="Number of nodes (auto-detected from log files if omitted)",
+    )
+    args = parser.parse_args()
+
+    log_dir = Path(args.log_dir)
+    if not log_dir.is_dir():
+        print(f"ERROR: Log directory not found: {log_dir}", file=sys.stderr)
+        sys.exit(2)
+
+    # Load YAML config if provided
+    config: Optional[Dict] = None
+    swarm_name = ""
+    yaml_assertions: List[Any] = []
+
+    if args.config:
+        if yaml is None:
+            print("ERROR: PyYAML is required for --config. Install with: pip install pyyaml",
+                  file=sys.stderr)
+            sys.exit(2)
+        config_path = Path(args.config)
+        if not config_path.exists():
+            print(f"ERROR: Config file not found: {config_path}", file=sys.stderr)
+            sys.exit(2)
+        with open(config_path, "r", encoding="utf-8") as f:
+            # safe_load returns None for an empty document
+            config = yaml.safe_load(f) or {}
+        if not isinstance(config, dict):
+            print(f"ERROR: Config file is not a YAML mapping: {config_path}", file=sys.stderr)
+            sys.exit(2)
+        # A bare "swarm:" / "assertions:" key parses as None; treat it as empty.
+        swarm_name = (config.get("swarm") or {}).get("name", "")
+        yaml_assertions = config.get("assertions") or []
+
+    # Determine node count
+    if args.node_count is not None:
+        node_count = args.node_count
+    elif config and config.get("nodes"):
+        node_count = len(config["nodes"])
+    else:
+        node_count = _node_count_from_dir(log_dir)
+
+    if node_count == 0:
+        print("ERROR: No node logs found and node count not specified.", file=sys.stderr)
+        sys.exit(2)
+
+    # Load logs
+    logs = load_logs(log_dir, node_count)
+
+    # Determine which assertions to run
+    if args.assertions is not None:
+        assertion_specs = args.assertions
+    elif yaml_assertions:
+        assertion_specs = yaml_assertions
+    else:
+        # Default set
+        assertion_specs = ["all_nodes_boot", "no_crashes", "no_heap_errors"]
+
+    # Run assertions
+    results = run_assertions(logs, assertion_specs, config)
+
+    # Print report and exit
+    max_sev = print_report(results, swarm_name)
+    sys.exit(max_sev)
+
+
+if __name__ == "__main__":
+    main()
diff --git a/scripts/swarm_presets/ci_matrix.yaml b/scripts/swarm_presets/ci_matrix.yaml
new file mode 100644
index 00000000..aa7a4c45
--- /dev/null
+++ b/scripts/swarm_presets/ci_matrix.yaml
@@ -0,0 +1,31 @@
+# CI-optimized preset: 3 nodes, star topology, 30s, minimal assertions
+swarm:
+  name: ci-matrix
+  duration_s: 30
+  topology: star
+  aggregator_port: 5005
+
+nodes:
+  - role: coordinator
+    node_id: 0
+    scenario: 0
+    channel: 6
+    edge_tier: 1
+
+  - role: sensor
+    node_id: 1
+    scenario: 1
+    channel: 6
+    tdm_slot: 1
+
+  - role: sensor
+    node_id: 2
+    scenario: 2
+    channel: 6
+    tdm_slot: 2
+
+assertions:
+  - all_nodes_boot
+  - no_crashes
+  - tdm_no_collision
+  - max_boot_time_s: 10
diff --git 
a/scripts/swarm_presets/heterogeneous.yaml b/scripts/swarm_presets/heterogeneous.yaml new file mode 100644 index 00000000..6b597d3e --- /dev/null +++ b/scripts/swarm_presets/heterogeneous.yaml @@ -0,0 +1,49 @@ +# Mixed scenarios: 5 nodes with different CSI scenarios, star topology, 90s +swarm: + name: heterogeneous + duration_s: 90 + topology: star + aggregator_port: 5005 + +nodes: + - role: coordinator + node_id: 0 + scenario: 0 + channel: 6 + edge_tier: 2 + is_gateway: true + + - role: sensor + node_id: 1 + scenario: 1 + channel: 6 + tdm_slot: 1 + + - role: sensor + node_id: 2 + scenario: 2 + channel: 6 + tdm_slot: 2 + + - role: sensor + node_id: 3 + scenario: 3 + channel: 6 + tdm_slot: 3 + + - role: sensor + node_id: 4 + scenario: 5 + channel: 11 + tdm_slot: 4 + +assertions: + - all_nodes_boot + - no_crashes + - tdm_no_collision + - all_nodes_produce_frames + - coordinator_receives_from_all + - fall_detected_by_node_3 + - no_heap_errors + - frame_rate_above: 12 + - max_boot_time_s: 12 diff --git a/scripts/swarm_presets/large_mesh.yaml b/scripts/swarm_presets/large_mesh.yaml new file mode 100644 index 00000000..c6ed4f8e --- /dev/null +++ b/scripts/swarm_presets/large_mesh.yaml @@ -0,0 +1,54 @@ +# Scale test: 6 fully-connected nodes in mesh topology, 90s +swarm: + name: large-mesh + duration_s: 90 + topology: mesh + aggregator_port: 5005 + +nodes: + - role: coordinator + node_id: 0 + scenario: 0 + channel: 6 + edge_tier: 2 + is_gateway: true + + - role: sensor + node_id: 1 + scenario: 1 + channel: 6 + tdm_slot: 1 + + - role: sensor + node_id: 2 + scenario: 2 + channel: 6 + tdm_slot: 2 + + - role: sensor + node_id: 3 + scenario: 3 + channel: 6 + tdm_slot: 3 + + - role: sensor + node_id: 4 + scenario: 4 + channel: 6 + tdm_slot: 4 + + - role: sensor + node_id: 5 + scenario: 5 + channel: 6 + tdm_slot: 5 + +assertions: + - all_nodes_boot + - no_crashes + - tdm_no_collision + - all_nodes_produce_frames + - coordinator_receives_from_all + - no_heap_errors + - 
frame_rate_above: 10 + - max_boot_time_s: 15 diff --git a/scripts/swarm_presets/line_relay.yaml b/scripts/swarm_presets/line_relay.yaml new file mode 100644 index 00000000..0d2045fe --- /dev/null +++ b/scripts/swarm_presets/line_relay.yaml @@ -0,0 +1,39 @@ +# Multi-hop relay chain: 4 nodes in line topology, 60s +swarm: + name: line-relay + duration_s: 60 + topology: line + aggregator_port: 5005 + +nodes: + - role: gateway + node_id: 0 + scenario: 0 + channel: 6 + edge_tier: 2 + is_gateway: true + + - role: coordinator + node_id: 1 + scenario: 0 + channel: 6 + edge_tier: 1 + + - role: sensor + node_id: 2 + scenario: 2 + channel: 6 + tdm_slot: 2 + + - role: sensor + node_id: 3 + scenario: 1 + channel: 6 + tdm_slot: 3 + +assertions: + - all_nodes_boot + - no_crashes + - tdm_no_collision + - all_nodes_produce_frames + - max_boot_time_s: 12 diff --git a/scripts/swarm_presets/ring_fault.yaml b/scripts/swarm_presets/ring_fault.yaml new file mode 100644 index 00000000..0fbb0407 --- /dev/null +++ b/scripts/swarm_presets/ring_fault.yaml @@ -0,0 +1,41 @@ +# Ring topology with fault injection: 4 nodes, 75s +swarm: + name: ring-fault + duration_s: 75 + topology: ring + aggregator_port: 5005 + +nodes: + - role: coordinator + node_id: 0 + scenario: 0 + channel: 6 + edge_tier: 2 + is_gateway: true + + - role: sensor + node_id: 1 + scenario: 1 + channel: 6 + tdm_slot: 1 + + - role: sensor + node_id: 2 + scenario: 2 + channel: 6 + tdm_slot: 2 + + - role: sensor + node_id: 3 + scenario: 3 + channel: 6 + tdm_slot: 3 + +assertions: + - all_nodes_boot + - no_crashes + - tdm_no_collision + - all_nodes_produce_frames + - coordinator_receives_from_all + - no_heap_errors + - max_boot_time_s: 12 diff --git a/scripts/swarm_presets/smoke.yaml b/scripts/swarm_presets/smoke.yaml new file mode 100644 index 00000000..7beef1d5 --- /dev/null +++ b/scripts/swarm_presets/smoke.yaml @@ -0,0 +1,24 @@ +# Quick CI smoke test: 2 nodes, star topology, 15s duration +swarm: + name: smoke + duration_s: 15 + 
topology: star + aggregator_port: 5005 + +nodes: + - role: coordinator + node_id: 0 + scenario: 0 + channel: 6 + edge_tier: 1 + + - role: sensor + node_id: 1 + scenario: 1 + channel: 6 + tdm_slot: 1 + +assertions: + - all_nodes_boot + - no_crashes + - max_boot_time_s: 10 diff --git a/scripts/swarm_presets/standard.yaml b/scripts/swarm_presets/standard.yaml new file mode 100644 index 00000000..07820716 --- /dev/null +++ b/scripts/swarm_presets/standard.yaml @@ -0,0 +1,36 @@ +# Standard 3-node test: 2 sensors + 1 coordinator, star topology, 60s +swarm: + name: standard + duration_s: 60 + topology: star + aggregator_port: 5005 + +nodes: + - role: coordinator + node_id: 0 + scenario: 0 + channel: 6 + edge_tier: 2 + is_gateway: true + + - role: sensor + node_id: 1 + scenario: 2 + channel: 6 + tdm_slot: 1 + + - role: sensor + node_id: 2 + scenario: 3 + channel: 6 + tdm_slot: 2 + +assertions: + - all_nodes_boot + - no_crashes + - tdm_no_collision + - all_nodes_produce_frames + - coordinator_receives_from_all + - fall_detected_by_node_2 + - frame_rate_above: 15 + - max_boot_time_s: 10