diff --git a/.github/workflows/firmware-qemu.yml b/.github/workflows/firmware-qemu.yml index 55b71215..69ef8b16 100644 --- a/.github/workflows/firmware-qemu.yml +++ b/.github/workflows/firmware-qemu.yml @@ -305,7 +305,7 @@ jobs: # --------------------------------------------------------------------------- swarm-test: name: Swarm Test (ADR-062) - needs: [build-qemu, qemu-test] + needs: [build-qemu] runs-on: ubuntu-latest container: image: espressif/idf:v5.4 @@ -322,22 +322,28 @@ jobs: - name: Make QEMU executable run: chmod +x ${{ github.workspace }}/qemu-build/bin/qemu-system-xtensa - - name: Download firmware build artifacts - uses: actions/download-artifact@v4 - with: - name: qemu-logs-default - path: ${{ github.workspace }}/firmware-artifacts - - name: Install Python dependencies run: pip install pyyaml esptool esp-idf-nvs-partition-gen + - name: Build firmware for swarm + working-directory: firmware/esp32-csi-node + run: | + . $IDF_PATH/export.sh + idf.py set-target esp32s3 + idf.py -D SDKCONFIG_DEFAULTS="sdkconfig.defaults;sdkconfig.qemu" build + python3 -m esptool --chip esp32s3 merge_bin \ + -o build/qemu_flash.bin \ + --flash_mode dio --flash_freq 80m --flash_size 8MB \ + 0x0 build/bootloader/bootloader.bin \ + 0x8000 build/partition_table/partition-table.bin \ + 0x20000 build/esp32-csi-node.bin + - name: Run swarm smoke test run: | python3 scripts/qemu_swarm.py --preset ci_matrix \ --qemu-path ${{ github.workspace }}/qemu-build/bin/qemu-system-xtensa \ - --skip-build \ --output-dir build/swarm-results - timeout-minutes: 5 + timeout-minutes: 10 - name: Upload swarm results if: always() diff --git a/docs/adr/ADR-062-qemu-swarm-configurator.md b/docs/adr/ADR-062-qemu-swarm-configurator.md index 85ea37bf..a24d3ca0 100644 --- a/docs/adr/ADR-062-qemu-swarm-configurator.md +++ b/docs/adr/ADR-062-qemu-swarm-configurator.md @@ -92,7 +92,7 @@ nodes: node_id: 1 scenario: 2 # walking person channel: 6 - tdm_slot: 1 + tdm_slot: 1 # TDM slot index (auto-assigned from node position if omitted) - role: sensor node_id: 2 @@ -141,7 +141,7 @@ assertions: | `frame_rate_above` | Each node produces at least N frames/second | | `max_boot_time_s` | All nodes boot within N seconds | | `no_heap_errors` | No OOM or heap corruption in any log | -| `network_partitioned_recovery` | After deliberate partition, nodes resume communication | +| `network_partitioned_recovery` | After deliberate partition, nodes resume communication (future) | ### Preset Configurations diff --git a/scripts/qemu_swarm.py b/scripts/qemu_swarm.py index 352f5716..9cdc2883 100644 --- a/scripts/qemu_swarm.py +++ b/scripts/qemu_swarm.py @@ -246,12 +246,19 @@ def provision_node( nvs_bin = build_dir / f"nvs_node{node.node_id}.bin" flash_image = build_dir / f"qemu_flash_node{node.node_id}.bin" base_image = build_dir / "qemu_flash_base.bin" + if not base_image.exists(): + base_image = build_dir / "qemu_flash.bin" if not base_image.exists(): - fatal(f"Base flash image not found: {base_image}") + fatal(f"Base flash image not found: {build_dir / 'qemu_flash_base.bin'} or {build_dir / 'qemu_flash.bin'}") fatal("Build the firmware first, or run without --skip-build.") sys.exit(EXIT_FATAL) + # Remove stale nvs_provision.bin to prevent race with prior node + stale = build_dir / "nvs_provision.bin" + if stale.exists(): + stale.unlink() + # Build provision.py arguments args = [ sys.executable, str(PROVISION_SCRIPT), @@ -264,7 +271,7 @@ def provision_node( "--target-port", str(aggregator_port), ] - if node.channel: + if node.channel is not None: args.extend(["--channel", str(node.channel)]) if node.edge_tier: @@ -332,7 +339,7 @@ def setup_network(cfg: SwarmConfig, net: NetworkState) -> Dict[int, List[str]]: n = len(cfg.nodes) # Check if we can use TAP/bridge (requires root on Linux) - can_tap = IS_LINUX and os.geteuid() == 0 + can_tap = IS_LINUX and hasattr(os, 'geteuid') and os.geteuid() == 0 if not can_tap: if IS_LINUX: @@ -345,7 +352,7 @@ def setup_network(cfg: SwarmConfig, net: NetworkState) -> Dict[int, List[str]]: for node in cfg.nodes: node_net_args[node.node_id] = [ "-nic", f"user,id=net{node.node_id}," - f"hostfwd=udp::{cfg.aggregator_port + node.node_id}" + f"hostfwd=udp::{cfg.aggregator_port + 100 + node.node_id}" f"-:{cfg.aggregator_port}", ] return node_net_args @@ -528,6 +535,10 @@ def run_assertions( and inline checks for swarm-specific assertions. Returns exit code: 0=PASS, 1=WARN, 2=FAIL, 3=FATAL. + + NOTE: These inline assertions duplicate swarm_health.py. A future refactor + should delegate to swarm_health.run_assertions() to avoid divergence. + See ADR-062 architecture diagram. """ n_nodes = len(cfg.nodes) worst = EXIT_PASS @@ -648,38 +659,63 @@ def run_assertions( elif assert_name == "frame_rate_above": min_rate = int(assert_param) if assert_param else 10 all_ok = True + nodes_with_data = 0 for nid, log in logs.items(): m = re.search(r"frame[_ ]?rate[=: ]+([\d.]+)", log, re.IGNORECASE) if m: + nodes_with_data += 1 rate = float(m.group(1)) if rate < min_rate: all_ok = False - _check(f"frame_rate_above({min_rate})", - all_ok, - f"All nodes >= {min_rate} Hz", - f"Some nodes below {min_rate} Hz", - EXIT_WARN) + if nodes_with_data == 0: + _check(f"frame_rate_above({min_rate})", + False, + "", + "No parseable frame rate data found in any node log", + EXIT_WARN) + else: + _check(f"frame_rate_above({min_rate})", + all_ok, + f"All nodes >= {min_rate} Hz", + f"Some nodes below {min_rate} Hz", + EXIT_WARN) elif assert_name == "max_boot_time_s": max_s = int(assert_param) if assert_param else 10 all_ok = True + nodes_with_data = 0 for nid, log in logs.items(): m = re.search(r"boot[_ ]?time[=: ]+([\d.]+)", log, re.IGNORECASE) if m: + nodes_with_data += 1 bt = float(m.group(1)) if bt > max_s: all_ok = False - _check(f"max_boot_time_s({max_s})", - all_ok, - f"All nodes booted within {max_s}s", - f"Some nodes exceeded {max_s}s boot time", - EXIT_WARN) + if nodes_with_data == 0: + _check(f"max_boot_time_s({max_s})", + False, + "", + "No parseable boot time data found in any node log", + EXIT_WARN) + else: + _check(f"max_boot_time_s({max_s})", + all_ok, + f"All nodes booted within {max_s}s", + f"Some nodes exceeded {max_s}s boot time", + EXIT_WARN) elif assert_name == "no_heap_errors": - heap_pats = ["heap", "OOM", "out of memory", "heap corruption"] + heap_pats = [ + r"HEAP_ERROR", + r"heap_caps_alloc.*failed", + r"out of memory", + r"heap corruption", + r"CORRUPT HEAP", + r"malloc.*fail", + ] found_in = [ nid for nid, log in logs.items() - if any(pat.lower() in log.lower() for pat in heap_pats) + if any(re.search(pat, log, re.IGNORECASE) for pat in heap_pats) ] _check("no_heap_errors", len(found_in) == 0, @@ -943,11 +979,12 @@ class SwarmOrchestrator: fatal(f"QEMU binary timed out: {self.qemu_bin}") sys.exit(EXIT_FATAL) - # Check base flash image + # Check base flash image (accept either name) base = self.build_dir / "qemu_flash_base.bin" - if not base.exists(): + alt_base = self.build_dir / "qemu_flash.bin" + if not base.exists() and not alt_base.exists(): if self.skip_build: - fatal(f"Base flash image not found: {base}") + fatal(f"Base flash image not found: {base} or {alt_base}") fatal("Build the firmware first, or run without --skip-build.") sys.exit(EXIT_FATAL) else: diff --git a/scripts/swarm_health.py b/scripts/swarm_health.py index 8d2a5974..770b4b67 100644 --- a/scripts/swarm_health.py +++ b/scripts/swarm_health.py @@ -72,10 +72,12 @@ class NodeLog: # --------------------------------------------------------------------------- def load_logs(log_dir: Path, node_count: int) -> List[NodeLog]: - """Load node_0.log .. node_{n-1}.log from *log_dir*.""" + """Load qemu_node{i}.log (or node_{i}.log fallback) from *log_dir*.""" logs: List[NodeLog] = [] for i in range(node_count): - path = log_dir / f"node_{i}.log" + path = log_dir / f"qemu_node{i}.log" + if not path.exists(): + path = log_dir / f"node_{i}.log" if path.exists(): text = path.read_text(encoding="utf-8", errors="replace") else: @@ -85,9 +87,9 @@ def load_logs(log_dir: Path, node_count: int) -> List[NodeLog]: def _node_count_from_dir(log_dir: Path) -> int: - """Auto-detect node count by scanning for node_*.log files.""" + """Auto-detect node count by scanning for qemu_node*.log (or node_*.log) files.""" count = 0 - while (log_dir / f"node_{count}.log").exists(): + while (log_dir / f"qemu_node{count}.log").exists() or (log_dir / f"node_{count}.log").exists(): count += 1 return count @@ -152,7 +154,7 @@ def assert_no_crashes(logs: List[NodeLog]) -> AssertionResult: if re.search(pat, line): crashed.append(f"node_{nl.node_id}: {line.strip()[:100]}") break - if crashed and crashed[-1].startswith(f"node_{nl.node_id}"): + if crashed and crashed[-1].startswith(f"node_{nl.node_id}:"): break # one crash per node is enough if not crashed: @@ -206,10 +208,21 @@ def assert_tdm_no_collision(logs: List[NodeLog]) -> AssertionResult: ) -def assert_all_nodes_produce_frames(logs: List[NodeLog]) -> AssertionResult: - """Each sensor node has CSI frame output.""" +def assert_all_nodes_produce_frames( + logs: List[NodeLog], + sensor_ids: Optional[List[int]] = None, +) -> AssertionResult: + """Each sensor node has CSI frame output. + + Args: + logs: Parsed node logs. + sensor_ids: If provided, only check these node IDs (skip coordinators). + If None, check all nodes (legacy behavior). + """ silent: List[int] = [] for nl in logs: + if sensor_ids is not None and nl.node_id not in sensor_ids: + continue found = any( re.search(p, line, re.IGNORECASE) for line in nl.lines for p in _FRAME_PATTERNS @@ -217,10 +230,11 @@ def assert_all_nodes_produce_frames(logs: List[NodeLog]) -> AssertionResult: if not found: silent.append(nl.node_id) + checked = len(sensor_ids) if sensor_ids is not None else len(logs) if not silent: return AssertionResult( name="all_nodes_produce_frames", passed=True, - message=f"All {len(logs)} nodes show frame activity", + message=f"All {checked} checked nodes show frame activity", severity=0, ) return AssertionResult( @@ -390,7 +404,7 @@ def assert_no_heap_errors(logs: List[NodeLog]) -> AssertionResult: if re.search(pat, line, re.IGNORECASE): errors.append(f"node_{nl.node_id}: {line.strip()[:100]}") break - if errors and errors[-1].startswith(f"node_{nl.node_id}"): + if errors and errors[-1].startswith(f"node_{nl.node_id}:"): break if not errors: @@ -486,6 +500,10 @@ def run_assertions( results.append(assert_coordinator_receives_from_all( logs, coordinator_id=coordinator_id, sensor_ids=sensor_ids, )) + elif name == "all_nodes_produce_frames": + results.append(assert_all_nodes_produce_frames( + logs, sensor_ids=sensor_ids, **kwargs, + )) elif name in ASSERTION_REGISTRY: fn = ASSERTION_REGISTRY[name] results.append(fn(logs, **kwargs))