feat(qemu): ADR-062 QEMU swarm configurator for multi-ESP32 testing

YAML-driven orchestrator for testing multiple ESP32-S3 QEMU instances
with configurable topologies (star/mesh/line/ring), role-based nodes
(sensor/coordinator/gateway), and swarm-level health assertions.

New files:
- ADR-062: architecture decision record
- qemu_swarm.py: main orchestrator (1097 lines)
  - YAML config parsing with schema validation
  - 4 topology implementations with TAP/SLIRP fallback
  - Per-node NVS provisioning via provision.py --dry-run
  - Signal-safe cleanup, dry-run mode, JSON results output
- swarm_health.py: 9-assertion health oracle (653 lines)
- 7 preset configs: smoke (2n/15s), standard (3n/60s),
  large-mesh (6n/90s), line-relay (4n/60s), ring-fault (4n/75s),
  heterogeneous (5n/90s), ci-matrix (3n/30s)
- CI: swarm-test job in firmware-qemu.yml

Co-Authored-By: claude-flow <ruv@ruv.net>
ruv 2026-03-14 12:24:06 -04:00
parent e574cbe129
commit a8f5276d9b
11 changed files with 2286 additions and 0 deletions

@ -7,6 +7,9 @@ on:
- 'scripts/qemu-esp32s3-test.sh'
- 'scripts/validate_qemu_output.py'
- 'scripts/generate_nvs_matrix.py'
- 'scripts/qemu_swarm.py'
- 'scripts/swarm_health.py'
- 'scripts/swarm_presets/**'
- '.github/workflows/firmware-qemu.yml'
pull_request:
paths:
@ -14,6 +17,9 @@ on:
- 'scripts/qemu-esp32s3-test.sh'
- 'scripts/validate_qemu_output.py'
- 'scripts/generate_nvs_matrix.py'
- 'scripts/qemu_swarm.py'
- 'scripts/swarm_health.py'
- 'scripts/swarm_presets/**'
- '.github/workflows/firmware-qemu.yml'
env:
@ -284,3 +290,60 @@ jobs:
fi
echo " OK: $(basename $f) ($SIZE bytes)"
done
  # ---------------------------------------------------------------------------
  # ADR-062: QEMU Swarm Configurator Test
  #
  # Runs a lightweight 3-node swarm (ci_matrix preset) under QEMU to validate
  # multi-node orchestration, TDM slot coordination, and swarm-level health
  # assertions. Uses the pre-built QEMU binary from the build-qemu job and the
  # firmware built by qemu-test.
  #
  # The CI runner is non-root, so TAP bridge networking is unavailable.
  # The orchestrator (qemu_swarm.py) detects this and falls back to SLIRP
  # user-mode networking, which is sufficient for the ci_matrix preset.
  # ---------------------------------------------------------------------------
  swarm-test:
    name: Swarm Test (ADR-062)
    needs: [build-qemu, qemu-test]
    runs-on: ubuntu-latest
    container:
      image: espressif/idf:v5.4

    steps:
      - uses: actions/checkout@v4

      - name: Download QEMU artifact
        uses: actions/download-artifact@v4
        with:
          name: qemu-esp32
          path: ${{ github.workspace }}/qemu-build

      - name: Make QEMU executable
        run: chmod +x ${{ github.workspace }}/qemu-build/bin/qemu-system-xtensa

      - name: Download firmware build artifacts
        uses: actions/download-artifact@v4
        with:
          name: qemu-logs-default
          path: ${{ github.workspace }}/firmware-artifacts

      - name: Install Python dependencies
        run: pip install pyyaml esptool esp-idf-nvs-partition-gen

      - name: Run swarm smoke test
        run: |
          python3 scripts/qemu_swarm.py --preset ci_matrix \
            --qemu-path ${{ github.workspace }}/qemu-build/bin/qemu-system-xtensa \
            --skip-build \
            --output-dir build/swarm-results
        timeout-minutes: 5

      - name: Upload swarm results
        if: always()
        uses: actions/upload-artifact@v4
        with:
          name: swarm-results
          path: |
            build/swarm-results/
          retention-days: 14

@ -0,0 +1,199 @@
# ADR-062: QEMU ESP32-S3 Swarm Configurator
| Field | Value |
|-------------|------------------------------------------------|
| **Status** | Accepted |
| **Date** | 2026-03-14 |
| **Authors** | RuView Team |
| **Relates** | ADR-061 (QEMU testing platform), ADR-060 (channel/MAC filter), ADR-018 (binary frame), ADR-039 (edge intel) |
## Glossary
| Term | Definition |
|------|-----------|
| Swarm | A group of N QEMU ESP32-S3 instances running simultaneously |
| Topology | How nodes are connected: star, mesh, line, ring |
| Role | Node function: `sensor` (collects CSI), `coordinator` (aggregates + forwards), `gateway` (bridges to host) |
| Scenario matrix | Cross-product of topology × node count × NVS config × mock scenario |
| Health oracle | Python process that monitors all node UART logs and declares swarm health |
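
The scenario matrix defined in the glossary is a plain cross-product, which can be sketched with `itertools.product`. The axis values below (node counts, NVS config labels, scenario IDs) are illustrative assumptions and are not taken from the presets in this ADR.

```python
from itertools import product

# Illustrative axis values only; the NVS config labels are hypothetical.
TOPOLOGIES = ["star", "mesh", "line", "ring"]
NODE_COUNTS = [2, 3, 6]
NVS_CONFIGS = ["default", "edge_tier_2"]
SCENARIOS = [0, 2, 3]  # mock CSI scenario IDs (assumed subset)


def scenario_matrix():
    """Return every (topology, node_count, nvs_config, scenario) combination."""
    return list(product(TOPOLOGIES, NODE_COUNTS, NVS_CONFIGS, SCENARIOS))


matrix = scenario_matrix()
print(len(matrix))  # 4 * 3 * 2 * 3 = 72 combinations
```

The matrix grows multiplicatively with each axis, which is one reason a small set of hand-picked presets can be more practical for CI than the full cross-product.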
## Context
ADR-061 Layer 3 provides a basic multi-node mesh test: N identical nodes with sequential TDM slots connected via a Linux bridge. This is useful but limited:
1. **All nodes are identical** — real deployments have heterogeneous roles (sensor, coordinator, gateway)
2. **Single topology** — only fully-connected bridge; no star, line, or ring topologies
3. **No scenario variation per node** — all nodes run the same mock CSI scenario
4. **Manual configuration** — each test requires hand-editing env vars and arguments
5. **No swarm-level health monitoring** — validation checks individual nodes, not collective behavior
6. **No cross-node timing validation** — TDM slot ordering and inter-frame gaps aren't verified

Real WiFi-DensePose deployments use 3-8 ESP32-S3 nodes in various topologies. A single coordinator aggregates CSI from multiple sensors. The firmware must handle TDM conflicts, missing nodes, role-based behavior differences, and network partitions — none of which ADR-061 Layer 3 tests.
## Decision
Build a **QEMU Swarm Configurator** — a YAML-driven tool that defines multi-node test scenarios declaratively and orchestrates them under QEMU with swarm-level validation.
### Architecture
```
┌─────────────────────────────────────────────────────┐
│                  swarm_config.yaml                  │
│  nodes: [{role: sensor, scenario: 2, channel: 6}]   │
│  topology: star                                     │
│  duration: 60s                                      │
│ assertions: [all_nodes_boot, tdm_no_collision, ...] │
└──────────────────────┬──────────────────────────────┘
          ┌────────────▼────────────┐
          │      qemu_swarm.py      │
          │     (orchestrator)      │
          └───┬────┬────┬───┬───────┘
              │    │    │   │
          ┌───▼─┐ ┌▼──┐ ▼  ┌▼─────┐
          │Node0│ │N1 │... │N(n-1)│  QEMU instances
          │sens │ │sen│    │coord │
          └──┬──┘ └─┬─┘    └──┬───┘
             │      │         │
          ┌──▼──────▼─────────▼──┐
          │   Virtual Network    │  TAP bridge / SLIRP
          │  (topology-shaped)   │
          └──────────┬───────────┘
          ┌──────────▼───────────┐
          │  Aggregator (Rust)   │  Collects frames
          └──────────┬───────────┘
          ┌──────────▼───────────┐
          │    Health Oracle     │  Swarm-level assertions
          │  (swarm_health.py)   │
          └──────────────────────┘
```
### YAML Configuration Schema
```yaml
# swarm_config.yaml
swarm:
  name: "3-sensor-star"
  duration_s: 60
  topology: star          # star | mesh | line | ring
  aggregator_port: 5005

nodes:
  - role: coordinator
    node_id: 0
    scenario: 0           # empty room (baseline)
    channel: 6
    edge_tier: 2
    is_gateway: true      # receives aggregated frames

  - role: sensor
    node_id: 1
    scenario: 2           # walking person
    channel: 6
    tdm_slot: 1

  - role: sensor
    node_id: 2
    scenario: 3           # fall event
    channel: 6
    tdm_slot: 2

assertions:
  - all_nodes_boot
  - no_crashes
  - tdm_no_collision
  - all_nodes_produce_frames
  - coordinator_receives_from_all
  - fall_detected_by_node_2
  - frame_rate_above: 15  # Hz minimum per node
  - max_boot_time_s: 10
```
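
The schema above implies a few structural checks. The sketch below runs them against an already-parsed config (a plain dict, as `yaml.safe_load` would return). `validate_swarm_config` and its error strings are hypothetical names for illustration; the validation in `qemu_swarm.py` may differ.

```python
from typing import Any, Dict, List

VALID_TOPOLOGIES = {"star", "mesh", "line", "ring"}
VALID_ROLES = {"sensor", "coordinator", "gateway"}


def validate_swarm_config(config: Dict[str, Any]) -> List[str]:
    """Return a list of human-readable problems; an empty list means valid."""
    errors: List[str] = []
    swarm = config.get("swarm", {})
    nodes = config.get("nodes", [])

    if swarm.get("topology") not in VALID_TOPOLOGIES:
        errors.append(f"unknown topology: {swarm.get('topology')!r}")
    if not nodes:
        errors.append("at least one node is required")

    seen_ids = set()
    seen_slots = set()
    for node in nodes:
        if node.get("role") not in VALID_ROLES:
            errors.append(
                f"node {node.get('node_id')}: unknown role {node.get('role')!r}"
            )
        node_id = node.get("node_id")
        if node_id in seen_ids:
            errors.append(f"duplicate node_id: {node_id}")
        seen_ids.add(node_id)
        slot = node.get("tdm_slot")
        if slot is not None:
            if slot in seen_slots:
                errors.append(f"duplicate tdm_slot: {slot}")
            seen_slots.add(slot)
    return errors


example = {
    "swarm": {"name": "3-sensor-star", "duration_s": 60, "topology": "star"},
    "nodes": [
        {"role": "coordinator", "node_id": 0},
        {"role": "sensor", "node_id": 1, "tdm_slot": 1},
        {"role": "sensor", "node_id": 2, "tdm_slot": 1},  # deliberate clash
    ],
}
print(validate_swarm_config(example))  # ['duplicate tdm_slot: 1']
```

Collecting all problems in a list, rather than raising on the first one, lets a single run report every mistake in a hand-written config.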
### Topologies
| Topology | Network | Description |
|----------|---------|-------------|
| `star` | All sensors connect to coordinator; coordinator has TAP to each sensor | Hub-and-spoke, most common |
| `mesh` | All nodes on same bridge (existing Layer 3 behavior) | Every node sees every other |
| `line` | Node 0 ↔ Node 1 ↔ Node 2 ↔ ... | Linear chain, tests multi-hop |
| `ring` | Like line but last connects to first | Circular, tests routing |
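
The four topologies in the table can be expressed as sets of point-to-point links. This is a minimal sketch under two assumptions: node 0 is the hub of a star (as in the example config), and links are undirected. `topology_edges` is a hypothetical helper, not a function from `qemu_swarm.py`.

```python
from typing import List, Set, Tuple

Edge = Tuple[int, int]


def topology_edges(topology: str, node_count: int, hub: int = 0) -> Set[Edge]:
    """Return undirected links as (low_id, high_id) pairs."""
    ids: List[int] = list(range(node_count))
    if topology == "star":
        pairs = [(hub, n) for n in ids if n != hub]
    elif topology == "mesh":
        pairs = [(a, b) for a in ids for b in ids if a < b]
    elif topology in ("line", "ring"):
        pairs = [(n, n + 1) for n in ids[:-1]]
        # A ring is a line whose last node links back to the first.
        if topology == "ring" and node_count > 2:
            pairs.append((ids[-1], ids[0]))
    else:
        raise ValueError(f"unknown topology: {topology}")
    return {(min(a, b), max(a, b)) for a, b in pairs}


for name in ("star", "mesh", "line", "ring"):
    print(name, sorted(topology_edges(name, 4)))
```

For four nodes this yields 3 links for star and line, 4 for ring, and 6 for mesh, which gives a rough sense of how many virtual links each topology needs.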
### Node Roles
| Role | Behavior | NVS Keys |
|------|----------|----------|
| `sensor` | Runs mock CSI, sends frames to coordinator | `node_id`, `tdm_slot`, `target_ip` |
| `coordinator` | Receives frames from sensors, runs edge aggregation | `node_id`, `tdm_slot=0`, `edge_tier=2` |
| `gateway` | Like coordinator but also bridges to host UDP | `node_id`, `target_ip=host`, `is_gateway=1` |
### Assertions (Swarm-Level)
| Assertion | What It Checks |
|-----------|---------------|
| `all_nodes_boot` | Every node's UART log shows boot indicators within timeout |
| `no_crashes` | No Guru Meditation, assert, panic in any log |
| `tdm_no_collision` | No two nodes transmit in the same TDM slot |
| `all_nodes_produce_frames` | Every sensor node's log contains CSI frame output |
| `coordinator_receives_from_all` | Coordinator log shows frames from each sensor's node_id |
| `fall_detected_by_node_N` | Node N's log reports a fall detection event |
| `frame_rate_above` | Each node produces at least N frames/second |
| `max_boot_time_s` | All nodes boot within N seconds |
| `no_heap_errors` | No OOM or heap corruption in any log |
| `network_partitioned_recovery` | After deliberate partition, nodes resume communication (not in the `swarm_health.py` assertion registry in this commit) |
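
As a worked example of one assertion, the sketch below applies the `tdm_no_collision` idea to in-memory log lines. The regular expression is the one used in `swarm_health.py`; the helper name `find_tdm_collisions` and the sample log lines are hypothetical, and real firmware output may be formatted differently.

```python
import re
from typing import Dict, List

# Same pattern swarm_health.py uses to find a slot announcement in a UART line.
TDM_PAT = re.compile(r"tdm[_ ]?slot[=: ]+(\d+)", re.IGNORECASE)


def find_tdm_collisions(logs: Dict[int, List[str]]) -> Dict[int, List[int]]:
    """Map each contested slot to the node IDs that claimed it."""
    slot_map: Dict[int, List[int]] = {}
    for node_id, lines in logs.items():
        for line in lines:
            m = TDM_PAT.search(line)
            if m:
                slot_map.setdefault(int(m.group(1)), []).append(node_id)
                break  # only the first announcement per node counts
    return {slot: ids for slot, ids in slot_map.items() if len(ids) > 1}


# Hypothetical log excerpts for three nodes.
sample_logs = {
    0: ["I (312) main: ESP32-S3 CSI Node", "I (330) tdm: tdm_slot=0"],
    1: ["I (298) main: ESP32-S3 CSI Node", "I (341) tdm: TDM slot: 1"],
    2: ["I (305) main: ESP32-S3 CSI Node", "I (350) tdm: tdm_slot=1"],
}
print(find_tdm_collisions(sample_logs))  # {1: [1, 2]}
```

An empty result means every node announced a distinct slot. Note that a node whose log never mentions a slot is simply absent from the map, so this check alone cannot tell "no collision" apart from "no TDM output".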
### Preset Configurations
| Preset | Nodes | Topology | Purpose |
|--------|-------|----------|---------|
| `smoke` | 2 | star | Quick CI smoke test (15s) |
| `standard` | 3 | star | Default 3-node (sensor + sensor + coordinator) |
| `large-mesh` | 6 | mesh | Scale test with 6 fully-connected nodes |
| `line-relay` | 4 | line | Multi-hop relay chain |
| `ring-fault` | 4 | ring | Ring with fault injection mid-test |
| `heterogeneous` | 5 | star | Mixed scenarios: walk, fall, static, channel-sweep, empty |
| `ci-matrix` | 3 | star | CI-optimized preset (30s, minimal assertions) |
## File Layout
```
scripts/
├── qemu_swarm.py            # Main orchestrator (CLI entry point)
├── swarm_health.py          # Swarm-level health oracle
└── swarm_presets/
    ├── smoke.yaml
    ├── standard.yaml
    ├── large_mesh.yaml
    ├── line_relay.yaml
    ├── ring_fault.yaml
    ├── heterogeneous.yaml
    └── ci_matrix.yaml
.github/workflows/
└── firmware-qemu.yml        # MODIFIED: add swarm test job
```
## Consequences
### Benefits
1. **Declarative testing** — define swarm topology in YAML, not shell scripts
2. **Role-based nodes** — test coordinator/sensor/gateway interactions
3. **Topology variety** — star/mesh/line/ring match real deployment patterns
4. **Swarm-level assertions** — validate collective behavior, not just individual nodes
5. **Preset library** — quick CI smoke tests and thorough manual validation
6. **Reproducible** — YAML configs are version-controlled and shareable
### Limitations
1. **Still requires root** for TAP bridge topologies (star, line, ring); mesh can use SLIRP
2. **QEMU resource usage** — 6+ QEMU instances use ~2GB RAM, may slow CI runners
3. **No real RF** — inter-node communication is IP-based, not WiFi CSI multipath
## References
- ADR-061: QEMU ESP32-S3 firmware testing platform (Layers 1-9)
- ADR-060: Channel override and MAC address filter provisioning
- ADR-018: Binary CSI frame format (magic `0xC5110001`)
- ADR-039: Edge intelligence pipeline (biquad, vitals, fall detection)

1097
scripts/qemu_swarm.py Normal file

File diff suppressed because it is too large

653
scripts/swarm_health.py Normal file
@ -0,0 +1,653 @@
#!/usr/bin/env python3
"""
QEMU Swarm Health Oracle (ADR-062)

Validates collective health of a multi-node ESP32-S3 QEMU swarm.
Checks cross-node assertions like TDM ordering, inter-node communication,
and swarm-level frame rates.

Usage:
    python3 swarm_health.py --config swarm_config.yaml --log-dir build/swarm_logs/
    python3 swarm_health.py --log-dir build/swarm_logs/ --assertions all_nodes_boot no_crashes
"""

import argparse
import re
import sys
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Dict, List, Optional

try:
    import yaml
except ImportError:
    yaml = None  # type: ignore[assignment]
# ---------------------------------------------------------------------------
# ANSI helpers (disabled when not a TTY)
# ---------------------------------------------------------------------------
USE_COLOR = sys.stdout.isatty()


def _color(text: str, code: str) -> str:
    return f"\033[{code}m{text}\033[0m" if USE_COLOR else text


def green(t: str) -> str:
    return _color(t, "32")


def yellow(t: str) -> str:
    return _color(t, "33")


def red(t: str) -> str:
    return _color(t, "1;31")


# ---------------------------------------------------------------------------
# Data types
# ---------------------------------------------------------------------------
@dataclass
class AssertionResult:
    """Result of a single swarm-level assertion."""
    name: str
    passed: bool
    message: str
    severity: int  # 0 = pass, 1 = warn, 2 = fail


@dataclass
class NodeLog:
    """Parsed log for a single QEMU node."""
    node_id: int
    lines: List[str]
    text: str
# ---------------------------------------------------------------------------
# Log loading
# ---------------------------------------------------------------------------
def load_logs(log_dir: Path, node_count: int) -> List[NodeLog]:
    """Load node_0.log .. node_{n-1}.log from *log_dir*."""
    logs: List[NodeLog] = []
    for i in range(node_count):
        path = log_dir / f"node_{i}.log"
        if path.exists():
            text = path.read_text(encoding="utf-8", errors="replace")
        else:
            text = ""
        logs.append(NodeLog(node_id=i, lines=text.splitlines(), text=text))
    return logs


def _node_count_from_dir(log_dir: Path) -> int:
    """Auto-detect node count by scanning for node_*.log files."""
    count = 0
    while (log_dir / f"node_{count}.log").exists():
        count += 1
    return count
# ---------------------------------------------------------------------------
# Individual assertions
# ---------------------------------------------------------------------------
_BOOT_PATTERNS = [
    r"app_main\(\)", r"main_task:", r"main:", r"ESP32-S3 CSI Node",
]
_CRASH_PATTERNS = [
    r"Guru Meditation", r"assert failed", r"abort\(\)", r"panic",
    r"LoadProhibited", r"StoreProhibited", r"InstrFetchProhibited",
    r"IllegalInstruction", r"Unhandled debug exception", r"Fatal exception",
]
_HEAP_PATTERNS = [
    r"HEAP_ERROR", r"out of memory", r"heap_caps_alloc.*failed",
    r"malloc.*fail", r"heap corruption", r"CORRUPT HEAP",
    r"multi_heap", r"heap_lock",
]
_FRAME_PATTERNS = [
    r"frame", r"CSI", r"mock_csi", r"iq_data", r"subcarrier",
    r"csi_collector", r"enqueue",
]
_FALL_PATTERNS = [r"fall[=: ]+1", r"fall detected", r"fall_event"]


def assert_all_nodes_boot(logs: List[NodeLog], timeout_s: float = 10.0) -> AssertionResult:
    """Check each node's log for boot patterns."""
    missing: List[int] = []
    for nl in logs:
        found = any(
            re.search(p, nl.text) for p in _BOOT_PATTERNS
        )
        if not found:
            missing.append(nl.node_id)
    if not missing:
        return AssertionResult(
            name="all_nodes_boot", passed=True,
            message=f"All {len(logs)} nodes booted (timeout={timeout_s}s)",
            severity=0,
        )
    return AssertionResult(
        name="all_nodes_boot", passed=False,
        message=f"Nodes missing boot indicator: {missing}",
        severity=2,
    )


def assert_no_crashes(logs: List[NodeLog]) -> AssertionResult:
    """Check no node has crash patterns."""
    crashed: List[str] = []
    for nl in logs:
        for line in nl.lines:
            for pat in _CRASH_PATTERNS:
                if re.search(pat, line):
                    crashed.append(f"node_{nl.node_id}: {line.strip()[:100]}")
                    break
            if crashed and crashed[-1].startswith(f"node_{nl.node_id}"):
                break  # one crash per node is enough
    if not crashed:
        return AssertionResult(
            name="no_crashes", passed=True,
            message="No crash indicators in any node",
            severity=0,
        )
    return AssertionResult(
        name="no_crashes", passed=False,
        message=f"Crashes found: {crashed[0]}" + (
            f" (+{len(crashed)-1} more)" if len(crashed) > 1 else ""
        ),
        severity=2,
    )


def assert_tdm_no_collision(logs: List[NodeLog]) -> AssertionResult:
    """Parse TDM slot assignments from logs, verify uniqueness."""
    slot_map: Dict[int, List[int]] = {}  # slot -> [node_ids]
    tdm_pat = re.compile(r"tdm[_ ]?slot[=: ]+(\d+)", re.IGNORECASE)
    for nl in logs:
        for line in nl.lines:
            m = tdm_pat.search(line)
            if m:
                slot = int(m.group(1))
                slot_map.setdefault(slot, [])
                if nl.node_id not in slot_map[slot]:
                    slot_map[slot].append(nl.node_id)
                break  # first occurrence per node
    collisions = {s: nids for s, nids in slot_map.items() if len(nids) > 1}
    if not slot_map:
        return AssertionResult(
            name="tdm_no_collision", passed=True,
            message="No TDM slot assignments found (may be N/A)",
            severity=0,
        )
    if not collisions:
        return AssertionResult(
            name="tdm_no_collision", passed=True,
            message=f"TDM slots unique across {len(slot_map)} assignments",
            severity=0,
        )
    return AssertionResult(
        name="tdm_no_collision", passed=False,
        message=f"TDM collisions: {collisions}",
        severity=2,
    )


def assert_all_nodes_produce_frames(logs: List[NodeLog]) -> AssertionResult:
    """Each sensor node has CSI frame output."""
    silent: List[int] = []
    for nl in logs:
        found = any(
            re.search(p, line, re.IGNORECASE)
            for line in nl.lines for p in _FRAME_PATTERNS
        )
        if not found:
            silent.append(nl.node_id)
    if not silent:
        return AssertionResult(
            name="all_nodes_produce_frames", passed=True,
            message=f"All {len(logs)} nodes show frame activity",
            severity=0,
        )
    return AssertionResult(
        name="all_nodes_produce_frames", passed=False,
        message=f"Nodes with no frame activity: {silent}",
        severity=1,
    )


def assert_coordinator_receives_from_all(
    logs: List[NodeLog],
    coordinator_id: int = 0,
    sensor_ids: Optional[List[int]] = None,
) -> AssertionResult:
    """Coordinator log shows frames from each sensor's node_id."""
    coord_log = None
    for nl in logs:
        if nl.node_id == coordinator_id:
            coord_log = nl
            break
    if coord_log is None:
        return AssertionResult(
            name="coordinator_receives_from_all", passed=False,
            message=f"Coordinator node_{coordinator_id} log not found",
            severity=2,
        )
    if sensor_ids is None:
        sensor_ids = [nl.node_id for nl in logs if nl.node_id != coordinator_id]
    missing: List[int] = []
    recv_pat = re.compile(r"(from|node_id|src)[=: ]+(\d+)", re.IGNORECASE)
    received_ids: set = set()
    for line in coord_log.lines:
        m = recv_pat.search(line)
        if m:
            received_ids.add(int(m.group(2)))
    for sid in sensor_ids:
        if sid not in received_ids:
            missing.append(sid)
    if not missing:
        return AssertionResult(
            name="coordinator_receives_from_all", passed=True,
            message=f"Coordinator received from all sensors: {sensor_ids}",
            severity=0,
        )
    return AssertionResult(
        name="coordinator_receives_from_all", passed=False,
        message=f"Coordinator missing frames from nodes: {missing}",
        severity=1,
    )


def assert_fall_detected(logs: List[NodeLog], node_id: int) -> AssertionResult:
    """Specific node reports fall detection."""
    for nl in logs:
        if nl.node_id == node_id:
            found = any(
                re.search(p, line, re.IGNORECASE)
                for line in nl.lines for p in _FALL_PATTERNS
            )
            if found:
                return AssertionResult(
                    name=f"fall_detected_node_{node_id}", passed=True,
                    message=f"Node {node_id} reported fall event",
                    severity=0,
                )
            return AssertionResult(
                name=f"fall_detected_node_{node_id}", passed=False,
                message=f"Node {node_id} did not report fall event",
                severity=1,
            )
    return AssertionResult(
        name=f"fall_detected_node_{node_id}", passed=False,
        message=f"Node {node_id} log not found",
        severity=2,
    )


def assert_frame_rate_above(logs: List[NodeLog], min_fps: float = 10.0) -> AssertionResult:
    """Each node meets minimum frame rate."""
    fps_pat = re.compile(r"(?:fps|frame.?rate)[=: ]+([0-9.]+)", re.IGNORECASE)
    count_pat = re.compile(r"(?:frame[_ ]?count|frames)[=: ]+(\d+)", re.IGNORECASE)
    below: List[str] = []
    for nl in logs:
        best_fps: Optional[float] = None
        # Try explicit FPS
        for line in nl.lines:
            m = fps_pat.search(line)
            if m:
                try:
                    best_fps = max(best_fps or 0.0, float(m.group(1)))
                except ValueError:
                    pass
        # Fallback: estimate from frame count (assume 1-second intervals)
        if best_fps is None:
            counts = []
            for line in nl.lines:
                m = count_pat.search(line)
                if m:
                    try:
                        counts.append(int(m.group(1)))
                    except ValueError:
                        pass
            if len(counts) >= 2:
                best_fps = float(counts[-1] - counts[0]) / max(len(counts) - 1, 1)
        if best_fps is not None and best_fps < min_fps:
            below.append(f"node_{nl.node_id}={best_fps:.1f}")
    if not below:
        return AssertionResult(
            name="frame_rate_above", passed=True,
            message=f"All nodes meet minimum {min_fps} fps",
            severity=0,
        )
    return AssertionResult(
        name="frame_rate_above", passed=False,
        message=f"Nodes below {min_fps} fps: {', '.join(below)}",
        severity=1,
    )


def assert_max_boot_time(logs: List[NodeLog], max_seconds: float = 10.0) -> AssertionResult:
    """All nodes boot within N seconds (based on timestamp in log)."""
    boot_time_pat = re.compile(r"\((\d+)\)\s", re.IGNORECASE)
    slow: List[str] = []
    for nl in logs:
        boot_found = False
        for line in nl.lines:
            if any(re.search(p, line) for p in _BOOT_PATTERNS):
                boot_found = True
                m = boot_time_pat.search(line)
                if m:
                    ms = int(m.group(1))
                    if ms > max_seconds * 1000:
                        slow.append(f"node_{nl.node_id}={ms}ms")
                break
        if not boot_found:
            slow.append(f"node_{nl.node_id}=no_boot")
    if not slow:
        return AssertionResult(
            name="max_boot_time", passed=True,
            message=f"All nodes booted within {max_seconds}s",
            severity=0,
        )
    return AssertionResult(
        name="max_boot_time", passed=False,
        message=f"Slow/missing boot: {', '.join(slow)}",
        severity=1,
    )


def assert_no_heap_errors(logs: List[NodeLog]) -> AssertionResult:
    """No OOM/heap errors in any log."""
    errors: List[str] = []
    for nl in logs:
        for line in nl.lines:
            for pat in _HEAP_PATTERNS:
                if re.search(pat, line, re.IGNORECASE):
                    errors.append(f"node_{nl.node_id}: {line.strip()[:100]}")
                    break
            if errors and errors[-1].startswith(f"node_{nl.node_id}"):
                break
    if not errors:
        return AssertionResult(
            name="no_heap_errors", passed=True,
            message="No heap errors in any node",
            severity=0,
        )
    return AssertionResult(
        name="no_heap_errors", passed=False,
        message=f"Heap errors: {errors[0]}" + (
            f" (+{len(errors)-1} more)" if len(errors) > 1 else ""
        ),
        severity=2,
    )


# ---------------------------------------------------------------------------
# Assertion registry & dispatcher
# ---------------------------------------------------------------------------
ASSERTION_REGISTRY: Dict[str, Any] = {
    "all_nodes_boot": assert_all_nodes_boot,
    "no_crashes": assert_no_crashes,
    "tdm_no_collision": assert_tdm_no_collision,
    "all_nodes_produce_frames": assert_all_nodes_produce_frames,
    "coordinator_receives_from_all": assert_coordinator_receives_from_all,
    "frame_rate_above": assert_frame_rate_above,
    "max_boot_time": assert_max_boot_time,
    "no_heap_errors": assert_no_heap_errors,
    # fall_detected is parameterized, handled separately
}


def _parse_assertion_spec(spec: Any) -> tuple:
    """Parse a YAML assertion entry into (name, kwargs).

    Supported forms:
      - "all_nodes_boot"           -> ("all_nodes_boot", {})
      - {"frame_rate_above": 15}   -> ("frame_rate_above", {"min_fps": 15.0})
      - "fall_detected_by_node_2"  -> ("fall_detected", {"node_id": 2})
      - {"max_boot_time_s": 10}    -> ("max_boot_time", {"max_seconds": 10.0})
      - "max_boot_time_s"          -> ("max_boot_time", {})  (default limit)
    """
    if isinstance(spec, str):
        # Check for fall_detected_by_node_N pattern
        m = re.match(r"fall_detected_by_node_(\d+)", spec)
        if m:
            return ("fall_detected", {"node_id": int(m.group(1))})
        # The YAML key is max_boot_time_s but the registry name is
        # max_boot_time; accept the bare key (e.g. from --assertions) too.
        if spec == "max_boot_time_s":
            return ("max_boot_time", {})
        return (spec, {})
    if isinstance(spec, dict):
        for key, val in spec.items():
            m = re.match(r"fall_detected_by_node_(\d+)", str(key))
            if m:
                return ("fall_detected", {"node_id": int(m.group(1))})
            if key == "frame_rate_above":
                return ("frame_rate_above", {"min_fps": float(val)})
            if key == "max_boot_time_s":
                return ("max_boot_time", {"max_seconds": float(val)})
            if key == "coordinator_receives_from_all":
                return ("coordinator_receives_from_all", {})
            return (str(key), {})
    return (str(spec), {})


def run_assertions(
    logs: List[NodeLog],
    assertion_specs: List[Any],
    config: Optional[Dict] = None,
) -> List[AssertionResult]:
    """Run all requested assertions against loaded logs."""
    results: List[AssertionResult] = []
    # Derive coordinator/sensor IDs from config if available
    coordinator_id = 0
    sensor_ids: Optional[List[int]] = None
    if config and "nodes" in config:
        for node_def in config["nodes"]:
            if node_def.get("role") == "coordinator":
                coordinator_id = node_def.get("node_id", 0)
        sensor_ids = [
            n["node_id"] for n in config["nodes"]
            if n.get("role") == "sensor"
        ]
    for spec in assertion_specs:
        name, kwargs = _parse_assertion_spec(spec)
        if name == "fall_detected":
            results.append(assert_fall_detected(logs, **kwargs))
        elif name == "coordinator_receives_from_all":
            results.append(assert_coordinator_receives_from_all(
                logs, coordinator_id=coordinator_id, sensor_ids=sensor_ids,
            ))
        elif name in ASSERTION_REGISTRY:
            fn = ASSERTION_REGISTRY[name]
            results.append(fn(logs, **kwargs))
        else:
            results.append(AssertionResult(
                name=name, passed=False,
                message=f"Unknown assertion: {name}",
                severity=1,
            ))
    return results
# ---------------------------------------------------------------------------
# Report printing
# ---------------------------------------------------------------------------
def print_report(results: List[AssertionResult], swarm_name: str = "") -> int:
    """Print the assertion report and return max severity."""
    header = "QEMU Swarm Health Report (ADR-062)"
    if swarm_name:
        header += f" - {swarm_name}"
    print()
    print("=" * 60)
    print(f" {header}")
    print("=" * 60)
    print()
    max_sev = 0
    for r in results:
        if r.severity == 0:
            icon = green("PASS")
        elif r.severity == 1:
            icon = yellow("WARN")
        else:
            icon = red("FAIL")
        print(f" [{icon}] {r.name}: {r.message}")
        max_sev = max(max_sev, r.severity)
    print()
    passed = sum(1 for r in results if r.passed)
    total = len(results)
    summary = f" {passed}/{total} assertions passed"
    if max_sev == 0:
        print(green(summary))
    elif max_sev == 1:
        print(yellow(summary + " (with warnings)"))
    else:
        print(red(summary + " (with failures)"))
    print()
    return max_sev
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def main():
    parser = argparse.ArgumentParser(
        description="QEMU Swarm Health Oracle (ADR-062)",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=(
            "Example:\n"
            " python3 swarm_health.py --config scripts/swarm_presets/standard.yaml \\\n"
            " --log-dir build/swarm_logs/\n"
            "\n"
            " python3 swarm_health.py --log-dir build/swarm_logs/ \\\n"
            " --assertions all_nodes_boot no_crashes\n"
            "\n"
            "Example output:\n"
            " ============================================================\n"
            " QEMU Swarm Health Report (ADR-062) - standard\n"
            " ============================================================\n"
            "\n"
            " [PASS] all_nodes_boot: All 3 nodes booted (timeout=10.0s)\n"
            " [PASS] no_crashes: No crash indicators in any node\n"
            " [PASS] tdm_no_collision: TDM slots unique across 3 assignments\n"
            " [PASS] all_nodes_produce_frames: All 3 nodes show frame activity\n"
            " [PASS] coordinator_receives_from_all: Coordinator received from all\n"
            " [WARN] fall_detected_node_2: Node 2 did not report fall event\n"
            " [PASS] frame_rate_above: All nodes meet minimum 15.0 fps\n"
            "\n"
            " 6/7 assertions passed (with warnings)\n"
        ),
    )
    parser.add_argument(
        "--config", type=str, default=None,
        help="Path to swarm YAML config (defines nodes and assertions)",
    )
    parser.add_argument(
        "--log-dir", type=str, required=True,
        help="Directory containing node_0.log, node_1.log, etc.",
    )
    parser.add_argument(
        "--assertions", nargs="*", default=None,
        help="Override assertions (space-separated). Ignores YAML assertion list.",
    )
    parser.add_argument(
        "--node-count", type=int, default=None,
        help="Number of nodes (auto-detected from log files if omitted)",
    )
    args = parser.parse_args()

    log_dir = Path(args.log_dir)
    if not log_dir.is_dir():
        print(f"ERROR: Log directory not found: {log_dir}", file=sys.stderr)
        sys.exit(2)

    # Load YAML config if provided
    config: Optional[Dict] = None
    swarm_name = ""
    yaml_assertions: List[Any] = []
    if args.config:
        if yaml is None:
            print("ERROR: PyYAML is required for --config. Install with: pip install pyyaml",
                  file=sys.stderr)
            sys.exit(2)
        config_path = Path(args.config)
        if not config_path.exists():
            print(f"ERROR: Config file not found: {config_path}", file=sys.stderr)
            sys.exit(2)
        with open(config_path, "r") as f:
            # safe_load returns None for an empty file; fall back to an
            # empty dict so the .get() calls below do not raise.
            config = yaml.safe_load(f) or {}
        swarm_name = config.get("swarm", {}).get("name", "")
        yaml_assertions = config.get("assertions", [])

    # Determine node count
    if args.node_count is not None:
        node_count = args.node_count
    elif config and "nodes" in config:
        node_count = len(config["nodes"])
    else:
        node_count = _node_count_from_dir(log_dir)

    if node_count == 0:
        print("ERROR: No node logs found and node count not specified.", file=sys.stderr)
        sys.exit(2)

    # Load logs
    logs = load_logs(log_dir, node_count)

    # Determine which assertions to run
    if args.assertions is not None:
        assertion_specs = args.assertions
    elif yaml_assertions:
        assertion_specs = yaml_assertions
    else:
        # Default set
        assertion_specs = ["all_nodes_boot", "no_crashes", "no_heap_errors"]

    # Run assertions
    results = run_assertions(logs, assertion_specs, config)

    # Print report and exit (exit code = max severity: 0 pass, 1 warn, 2 fail)
    max_sev = print_report(results, swarm_name)
    sys.exit(max_sev)


if __name__ == "__main__":
    main()

@ -0,0 +1,31 @@
# CI-optimized preset: 3 nodes, star topology, 30s, minimal assertions
swarm:
  name: ci-matrix
  duration_s: 30
  topology: star
  aggregator_port: 5005

nodes:
  - role: coordinator
    node_id: 0
    scenario: 0
    channel: 6
    edge_tier: 1

  - role: sensor
    node_id: 1
    scenario: 1
    channel: 6
    tdm_slot: 1

  - role: sensor
    node_id: 2
    scenario: 2
    channel: 6
    tdm_slot: 2

assertions:
  - all_nodes_boot
  - no_crashes
  - tdm_no_collision
  - max_boot_time_s: 10

@ -0,0 +1,49 @@
# Mixed scenarios: 5 nodes with different CSI scenarios, star topology, 90s
swarm:
  name: heterogeneous
  duration_s: 90
  topology: star
  aggregator_port: 5005

nodes:
  - role: coordinator
    node_id: 0
    scenario: 0
    channel: 6
    edge_tier: 2
    is_gateway: true

  - role: sensor
    node_id: 1
    scenario: 1
    channel: 6
    tdm_slot: 1

  - role: sensor
    node_id: 2
    scenario: 2
    channel: 6
    tdm_slot: 2

  - role: sensor
    node_id: 3
    scenario: 3
    channel: 6
    tdm_slot: 3

  - role: sensor
    node_id: 4
    scenario: 5
    channel: 11
    tdm_slot: 4

assertions:
  - all_nodes_boot
  - no_crashes
  - tdm_no_collision
  - all_nodes_produce_frames
  - coordinator_receives_from_all
  - fall_detected_by_node_3
  - no_heap_errors
  - frame_rate_above: 12
  - max_boot_time_s: 12

@ -0,0 +1,54 @@
# Scale test: 6 fully-connected nodes in mesh topology, 90s
swarm:
  name: large-mesh
  duration_s: 90
  topology: mesh
  aggregator_port: 5005

nodes:
  - role: coordinator
    node_id: 0
    scenario: 0
    channel: 6
    edge_tier: 2
    is_gateway: true

  - role: sensor
    node_id: 1
    scenario: 1
    channel: 6
    tdm_slot: 1

  - role: sensor
    node_id: 2
    scenario: 2
    channel: 6
    tdm_slot: 2

  - role: sensor
    node_id: 3
    scenario: 3
    channel: 6
    tdm_slot: 3

  - role: sensor
    node_id: 4
    scenario: 4
    channel: 6
    tdm_slot: 4

  - role: sensor
    node_id: 5
    scenario: 5
    channel: 6
    tdm_slot: 5

assertions:
  - all_nodes_boot
  - no_crashes
  - tdm_no_collision
  - all_nodes_produce_frames
  - coordinator_receives_from_all
  - no_heap_errors
  - frame_rate_above: 10
  - max_boot_time_s: 15

@ -0,0 +1,39 @@
# Multi-hop relay chain: 4 nodes in line topology, 60s
swarm:
  name: line-relay
  duration_s: 60
  topology: line
  aggregator_port: 5005

nodes:
  - role: gateway
    node_id: 0
    scenario: 0
    channel: 6
    edge_tier: 2
    is_gateway: true

  - role: coordinator
    node_id: 1
    scenario: 0
    channel: 6
    edge_tier: 1

  - role: sensor
    node_id: 2
    scenario: 2
    channel: 6
    tdm_slot: 2

  - role: sensor
    node_id: 3
    scenario: 1
    channel: 6
    tdm_slot: 3

assertions:
  - all_nodes_boot
  - no_crashes
  - tdm_no_collision
  - all_nodes_produce_frames
  - max_boot_time_s: 12

@ -0,0 +1,41 @@
# Ring topology with fault injection: 4 nodes, 75s
swarm:
  name: ring-fault
  duration_s: 75
  topology: ring
  aggregator_port: 5005

nodes:
  - role: coordinator
    node_id: 0
    scenario: 0
    channel: 6
    edge_tier: 2
    is_gateway: true

  - role: sensor
    node_id: 1
    scenario: 1
    channel: 6
    tdm_slot: 1

  - role: sensor
    node_id: 2
    scenario: 2
    channel: 6
    tdm_slot: 2

  - role: sensor
    node_id: 3
    scenario: 3
    channel: 6
    tdm_slot: 3

assertions:
  - all_nodes_boot
  - no_crashes
  - tdm_no_collision
  - all_nodes_produce_frames
  - coordinator_receives_from_all
  - no_heap_errors
  - max_boot_time_s: 12

@ -0,0 +1,24 @@
# Quick CI smoke test: 2 nodes, star topology, 15s duration
swarm:
  name: smoke
  duration_s: 15
  topology: star
  aggregator_port: 5005

nodes:
  - role: coordinator
    node_id: 0
    scenario: 0
    channel: 6
    edge_tier: 1

  - role: sensor
    node_id: 1
    scenario: 1
    channel: 6
    tdm_slot: 1

assertions:
  - all_nodes_boot
  - no_crashes
  - max_boot_time_s: 10

@ -0,0 +1,36 @@
# Standard 3-node test: 2 sensors + 1 coordinator, star topology, 60s
swarm:
  name: standard
  duration_s: 60
  topology: star
  aggregator_port: 5005

nodes:
  - role: coordinator
    node_id: 0
    scenario: 0
    channel: 6
    edge_tier: 2
    is_gateway: true

  - role: sensor
    node_id: 1
    scenario: 2
    channel: 6
    tdm_slot: 1

  - role: sensor
    node_id: 2
    scenario: 3
    channel: 6
    tdm_slot: 2

assertions:
  - all_nodes_boot
  - no_crashes
  - tdm_no_collision
  - all_nodes_produce_frames
  - coordinator_receives_from_all
  - fall_detected_by_node_2
  - frame_rate_above: 15
  - max_boot_time_s: 10