fix(swarm): resolve 16 bugs from deep review of ADR-062

CRITICAL:
- Delete stale nvs_provision.bin before provisioning each node
- Fix log filename mismatch: swarm_health.py now finds qemu_node{i}.log
  with node_{i}.log fallback
- CI swarm-test job builds firmware instead of downloading missing artifact
- Accept both qemu_flash.bin and qemu_flash_base.bin as base image

HIGH:
- Replace broad "heap" substring match with precise regex patterns
  (HEAP_ERROR, heap_caps_alloc.*failed, etc.) to avoid false positives
- Guard os.geteuid() with hasattr for Windows compatibility
- Offset SLIRP ports by +100 to avoid collision with aggregator on 5005
- Assertions now WARN (instead of a vacuous PASS) when no parseable data
  is found

MEDIUM:
- Mark network_partitioned_recovery as "(future)" in ADR-062
- Fix node_id prefix dedup bug (node_1 no longer matches node_10)
- Add duplication note in qemu_swarm.py pointing to swarm_health.py
- Document implicit TDM auto-assignment in ADR YAML schema
- swarm_health.py only checks sensor nodes for frame production
- Fix channel 0 being treated as falsy (check `is not None` instead of
  truthiness)

Co-Authored-By: claude-flow <ruv@ruv.net>
This commit is contained in:
ruv 2026-03-14 12:36:25 -04:00
parent a8f5276d9b
commit 21ec163941
4 changed files with 100 additions and 39 deletions

View File

@ -305,7 +305,7 @@ jobs:
# ---------------------------------------------------------------------------
swarm-test:
name: Swarm Test (ADR-062)
needs: [build-qemu, qemu-test]
needs: [build-qemu]
runs-on: ubuntu-latest
container:
image: espressif/idf:v5.4
@ -322,22 +322,28 @@ jobs:
- name: Make QEMU executable
run: chmod +x ${{ github.workspace }}/qemu-build/bin/qemu-system-xtensa
- name: Download firmware build artifacts
uses: actions/download-artifact@v4
with:
name: qemu-logs-default
path: ${{ github.workspace }}/firmware-artifacts
- name: Install Python dependencies
run: pip install pyyaml esptool esp-idf-nvs-partition-gen
- name: Build firmware for swarm
working-directory: firmware/esp32-csi-node
run: |
. $IDF_PATH/export.sh
idf.py set-target esp32s3
idf.py -D SDKCONFIG_DEFAULTS="sdkconfig.defaults;sdkconfig.qemu" build
python3 -m esptool --chip esp32s3 merge_bin \
-o build/qemu_flash.bin \
--flash_mode dio --flash_freq 80m --flash_size 8MB \
0x0 build/bootloader/bootloader.bin \
0x8000 build/partition_table/partition-table.bin \
0x20000 build/esp32-csi-node.bin
- name: Run swarm smoke test
run: |
python3 scripts/qemu_swarm.py --preset ci_matrix \
--qemu-path ${{ github.workspace }}/qemu-build/bin/qemu-system-xtensa \
--skip-build \
--output-dir build/swarm-results
timeout-minutes: 5
timeout-minutes: 10
- name: Upload swarm results
if: always()

View File

@ -92,7 +92,7 @@ nodes:
node_id: 1
scenario: 2 # walking person
channel: 6
tdm_slot: 1
tdm_slot: 1 # TDM slot index (auto-assigned from node position if omitted)
- role: sensor
node_id: 2
@ -141,7 +141,7 @@ assertions:
| `frame_rate_above` | Each node produces at least N frames/second |
| `max_boot_time_s` | All nodes boot within N seconds |
| `no_heap_errors` | No OOM or heap corruption in any log |
| `network_partitioned_recovery` | After deliberate partition, nodes resume communication |
| `network_partitioned_recovery` | After deliberate partition, nodes resume communication (future) |
### Preset Configurations

View File

@ -246,12 +246,19 @@ def provision_node(
nvs_bin = build_dir / f"nvs_node{node.node_id}.bin"
flash_image = build_dir / f"qemu_flash_node{node.node_id}.bin"
base_image = build_dir / "qemu_flash_base.bin"
if not base_image.exists():
base_image = build_dir / "qemu_flash.bin"
if not base_image.exists():
fatal(f"Base flash image not found: {base_image}")
fatal(f"Base flash image not found: {build_dir / 'qemu_flash_base.bin'} or {build_dir / 'qemu_flash.bin'}")
fatal("Build the firmware first, or run without --skip-build.")
sys.exit(EXIT_FATAL)
# Remove stale nvs_provision.bin to prevent race with prior node
stale = build_dir / "nvs_provision.bin"
if stale.exists():
stale.unlink()
# Build provision.py arguments
args = [
sys.executable, str(PROVISION_SCRIPT),
@ -264,7 +271,7 @@ def provision_node(
"--target-port", str(aggregator_port),
]
if node.channel:
if node.channel is not None:
args.extend(["--channel", str(node.channel)])
if node.edge_tier:
@ -332,7 +339,7 @@ def setup_network(cfg: SwarmConfig, net: NetworkState) -> Dict[int, List[str]]:
n = len(cfg.nodes)
# Check if we can use TAP/bridge (requires root on Linux)
can_tap = IS_LINUX and os.geteuid() == 0
can_tap = IS_LINUX and hasattr(os, 'geteuid') and os.geteuid() == 0
if not can_tap:
if IS_LINUX:
@ -345,7 +352,7 @@ def setup_network(cfg: SwarmConfig, net: NetworkState) -> Dict[int, List[str]]:
for node in cfg.nodes:
node_net_args[node.node_id] = [
"-nic", f"user,id=net{node.node_id},"
f"hostfwd=udp::{cfg.aggregator_port + node.node_id}"
f"hostfwd=udp::{cfg.aggregator_port + 100 + node.node_id}"
f"-:{cfg.aggregator_port}",
]
return node_net_args
@ -528,6 +535,10 @@ def run_assertions(
and inline checks for swarm-specific assertions.
Returns exit code: 0=PASS, 1=WARN, 2=FAIL, 3=FATAL.
NOTE: These inline assertions duplicate swarm_health.py. A future refactor
should delegate to swarm_health.run_assertions() to avoid divergence.
See ADR-062 architecture diagram.
"""
n_nodes = len(cfg.nodes)
worst = EXIT_PASS
@ -648,38 +659,63 @@ def run_assertions(
elif assert_name == "frame_rate_above":
min_rate = int(assert_param) if assert_param else 10
all_ok = True
nodes_with_data = 0
for nid, log in logs.items():
m = re.search(r"frame[_ ]?rate[=: ]+([\d.]+)", log, re.IGNORECASE)
if m:
nodes_with_data += 1
rate = float(m.group(1))
if rate < min_rate:
all_ok = False
_check(f"frame_rate_above({min_rate})",
all_ok,
f"All nodes >= {min_rate} Hz",
f"Some nodes below {min_rate} Hz",
EXIT_WARN)
if nodes_with_data == 0:
_check(f"frame_rate_above({min_rate})",
False,
"",
"No parseable frame rate data found in any node log",
EXIT_WARN)
else:
_check(f"frame_rate_above({min_rate})",
all_ok,
f"All nodes >= {min_rate} Hz",
f"Some nodes below {min_rate} Hz",
EXIT_WARN)
elif assert_name == "max_boot_time_s":
max_s = int(assert_param) if assert_param else 10
all_ok = True
nodes_with_data = 0
for nid, log in logs.items():
m = re.search(r"boot[_ ]?time[=: ]+([\d.]+)", log, re.IGNORECASE)
if m:
nodes_with_data += 1
bt = float(m.group(1))
if bt > max_s:
all_ok = False
_check(f"max_boot_time_s({max_s})",
all_ok,
f"All nodes booted within {max_s}s",
f"Some nodes exceeded {max_s}s boot time",
EXIT_WARN)
if nodes_with_data == 0:
_check(f"max_boot_time_s({max_s})",
False,
"",
"No parseable boot time data found in any node log",
EXIT_WARN)
else:
_check(f"max_boot_time_s({max_s})",
all_ok,
f"All nodes booted within {max_s}s",
f"Some nodes exceeded {max_s}s boot time",
EXIT_WARN)
elif assert_name == "no_heap_errors":
heap_pats = ["heap", "OOM", "out of memory", "heap corruption"]
heap_pats = [
r"HEAP_ERROR",
r"heap_caps_alloc.*failed",
r"out of memory",
r"heap corruption",
r"CORRUPT HEAP",
r"malloc.*fail",
]
found_in = [
nid for nid, log in logs.items()
if any(pat.lower() in log.lower() for pat in heap_pats)
if any(re.search(pat, log, re.IGNORECASE) for pat in heap_pats)
]
_check("no_heap_errors",
len(found_in) == 0,
@ -943,11 +979,12 @@ class SwarmOrchestrator:
fatal(f"QEMU binary timed out: {self.qemu_bin}")
sys.exit(EXIT_FATAL)
# Check base flash image
# Check base flash image (accept either name)
base = self.build_dir / "qemu_flash_base.bin"
if not base.exists():
alt_base = self.build_dir / "qemu_flash.bin"
if not base.exists() and not alt_base.exists():
if self.skip_build:
fatal(f"Base flash image not found: {base}")
fatal(f"Base flash image not found: {base} or {alt_base}")
fatal("Build the firmware first, or run without --skip-build.")
sys.exit(EXIT_FATAL)
else:

View File

@ -72,10 +72,12 @@ class NodeLog:
# ---------------------------------------------------------------------------
def load_logs(log_dir: Path, node_count: int) -> List[NodeLog]:
"""Load node_0.log .. node_{n-1}.log from *log_dir*."""
"""Load qemu_node{i}.log (or node_{i}.log fallback) from *log_dir*."""
logs: List[NodeLog] = []
for i in range(node_count):
path = log_dir / f"node_{i}.log"
path = log_dir / f"qemu_node{i}.log"
if not path.exists():
path = log_dir / f"node_{i}.log"
if path.exists():
text = path.read_text(encoding="utf-8", errors="replace")
else:
@ -85,9 +87,9 @@ def load_logs(log_dir: Path, node_count: int) -> List[NodeLog]:
def _node_count_from_dir(log_dir: Path) -> int:
"""Auto-detect node count by scanning for node_*.log files."""
"""Auto-detect node count by scanning for qemu_node*.log (or node_*.log) files."""
count = 0
while (log_dir / f"node_{count}.log").exists():
while (log_dir / f"qemu_node{count}.log").exists() or (log_dir / f"node_{count}.log").exists():
count += 1
return count
@ -152,7 +154,7 @@ def assert_no_crashes(logs: List[NodeLog]) -> AssertionResult:
if re.search(pat, line):
crashed.append(f"node_{nl.node_id}: {line.strip()[:100]}")
break
if crashed and crashed[-1].startswith(f"node_{nl.node_id}"):
if crashed and crashed[-1].startswith(f"node_{nl.node_id}:"):
break # one crash per node is enough
if not crashed:
@ -206,10 +208,21 @@ def assert_tdm_no_collision(logs: List[NodeLog]) -> AssertionResult:
)
def assert_all_nodes_produce_frames(logs: List[NodeLog]) -> AssertionResult:
"""Each sensor node has CSI frame output."""
def assert_all_nodes_produce_frames(
logs: List[NodeLog],
sensor_ids: Optional[List[int]] = None,
) -> AssertionResult:
"""Each sensor node has CSI frame output.
Args:
logs: Parsed node logs.
sensor_ids: If provided, only check these node IDs (skip coordinators).
If None, check all nodes (legacy behavior).
"""
silent: List[int] = []
for nl in logs:
if sensor_ids is not None and nl.node_id not in sensor_ids:
continue
found = any(
re.search(p, line, re.IGNORECASE)
for line in nl.lines for p in _FRAME_PATTERNS
@ -217,10 +230,11 @@ def assert_all_nodes_produce_frames(logs: List[NodeLog]) -> AssertionResult:
if not found:
silent.append(nl.node_id)
checked = len(sensor_ids) if sensor_ids is not None else len(logs)
if not silent:
return AssertionResult(
name="all_nodes_produce_frames", passed=True,
message=f"All {len(logs)} nodes show frame activity",
message=f"All {checked} checked nodes show frame activity",
severity=0,
)
return AssertionResult(
@ -390,7 +404,7 @@ def assert_no_heap_errors(logs: List[NodeLog]) -> AssertionResult:
if re.search(pat, line, re.IGNORECASE):
errors.append(f"node_{nl.node_id}: {line.strip()[:100]}")
break
if errors and errors[-1].startswith(f"node_{nl.node_id}"):
if errors and errors[-1].startswith(f"node_{nl.node_id}:"):
break
if not errors:
@ -486,6 +500,10 @@ def run_assertions(
results.append(assert_coordinator_receives_from_all(
logs, coordinator_id=coordinator_id, sensor_ids=sensor_ids,
))
elif name == "all_nodes_produce_frames":
results.append(assert_all_nodes_produce_frames(
logs, sensor_ids=sensor_ids, **kwargs,
))
elif name in ASSERTION_REGISTRY:
fn = ASSERTION_REGISTRY[name]
results.append(fn(logs, **kwargs))