"""Tests for ESP32BinaryParser (ADR-018 binary frame format).""" import asyncio import math import socket import struct import threading import time import numpy as np import pytest import sys import os sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', '..', 'src')) from hardware.csi_extractor import ( ESP32BinaryParser, CSIExtractor, CSIParseError, CSIExtractionError, SyncPacket, SyncPacketParser, ) # ADR-018 constants MAGIC = 0xC5110001 # ADR-110: bytes 18-19 are now PPDU type + flags (used to be `2x` reserved). # Pre-ADR-110 firmware sends zeros for both, which round-trip as # ('ht_legacy', flags=all-false) — fully backwards compatible. HEADER_FMT = ' bytes: """Build an ADR-018 binary frame for testing.""" if iq_pairs is None: iq_pairs = [(i % 50, (i * 2) % 50) for i in range(n_antennas * n_subcarriers)] rssi_u8 = rssi & 0xFF noise_u8 = noise_floor & 0xFF header = struct.pack( HEADER_FMT, MAGIC, node_id, n_antennas, n_subcarriers, freq_mhz, sequence, rssi_u8, noise_u8, ppdu_byte, flags_byte, ) iq_data = b'' for i_val, q_val in iq_pairs: iq_data += struct.pack(' sqrt(9+16) = 5.0 assert abs(result.amplitude[0, 0] - 5.0) < 0.001 # I=0, Q=10 -> 10.0 assert abs(result.amplitude[0, 1] - 10.0) < 0.001 def test_parse_frame_too_short(self): """Reject frames shorter than the 20-byte header.""" with pytest.raises(CSIParseError, match="too short"): self.parser.parse(b'\x00' * 10) def test_parse_invalid_magic(self): """Reject frames with wrong magic number.""" bad_frame = build_binary_frame() # Corrupt magic bad_frame = b'\xFF\xFF\xFF\xFF' + bad_frame[4:] with pytest.raises(CSIParseError, match="Invalid magic"): self.parser.parse(bad_frame) def test_parse_multi_antenna_frame(self): """Parse a frame with 3 antennas and 4 subcarriers.""" n_ant = 3 n_sc = 4 iq = [(i + 1, i + 2) for i in range(n_ant * n_sc)] frame_bytes = build_binary_frame( node_id=5, n_antennas=n_ant, n_subcarriers=n_sc, iq_pairs=iq, ) result = self.parser.parse(frame_bytes) assert result.num_antennas == 3 assert result.num_subcarriers == 4 assert result.amplitude.shape == (3, 4) assert result.phase.shape == (3, 4) def test_udp_read_with_mock_server(self): """Send a frame via UDP and verify CSIExtractor receives it.""" # Find a free port sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) sock.bind(('127.0.0.1', 0)) port = sock.getsockname()[1] sock.close() frame_bytes = build_binary_frame( node_id=3, n_antennas=1, n_subcarriers=4, freq_mhz=2412, sequence=99, ) config = { 'hardware_type': 'esp32', 'parser_format': 'binary', 'sampling_rate': 100, 'buffer_size': 2048, 'timeout': 2, 'aggregator_host': '127.0.0.1', 'aggregator_port': port, } extractor = CSIExtractor(config) async def run_test(): # Connect await extractor.connect() # Send frame after a short delay from a background thread def send(): time.sleep(0.2) s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) s.sendto(frame_bytes, ('127.0.0.1', port)) s.close() sender = threading.Thread(target=send, daemon=True) sender.start() result = await extractor.extract_csi() sender.join(timeout=2) assert result.metadata['node_id'] == 3 assert result.metadata['sequence'] == 99 assert result.num_subcarriers == 4 await extractor.disconnect() asyncio.run(run_test()) def test_udp_timeout(self): """Verify timeout when no UDP server is sending data.""" # Find a free port (nothing will send to it) sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) sock.bind(('127.0.0.1', 0)) port = sock.getsockname()[1] sock.close() config = { 'hardware_type': 'esp32', 'parser_format': 'binary', 'sampling_rate': 100, 'buffer_size': 2048, 'timeout': 0.5, 'retry_attempts': 1, 'aggregator_host': '127.0.0.1', 'aggregator_port': port, } extractor = CSIExtractor(config) async def run_test(): await extractor.connect() with pytest.raises(CSIExtractionError, match="timed out"): await extractor.extract_csi() await extractor.disconnect() asyncio.run(run_test()) # ============================================================================ # ADR-110 §A0.12 — SyncPacket / SyncPacketParser tests (firmware v0.6.9+) # ============================================================================ SYNC_MAGIC = 0xC511A110 SYNC_SIZE = 32 SYNC_FMT = ' bytes: flags = 0 if is_leader: flags |= 0x01 if is_valid: flags |= 0x02 if smoothed_used: flags |= 0x04 return struct.pack( SYNC_FMT, SYNC_MAGIC, node_id, proto_ver, flags, 0, local_us, epoch_us, sequence, ) class TestSyncPacketParser: """ADR-110 §A0.12: 32-byte UDP sync packet (magic 0xC511A110).""" def test_follower_typical_packet_roundtrips(self): """Match the COM9-witnessed sync-pkt #1 byte-for-byte.""" raw = build_sync_packet( node_id=9, is_leader=False, is_valid=True, smoothed_used=True, local_us=28798450, epoch_us=27634885, sequence=20, ) assert len(raw) == SYNC_SIZE pkt = SyncPacketParser.parse(raw) assert isinstance(pkt, SyncPacket) assert pkt.node_id == 9 assert pkt.proto_ver == 1 assert pkt.is_leader is False assert pkt.is_valid is True assert pkt.smoothed_used is True assert pkt.local_us == 28798450 assert pkt.epoch_us == 27634885 assert pkt.sequence == 20 # The 1.16-second boot delta from §A0.10 should be recoverable assert pkt.local_us - pkt.epoch_us == 1163565 def test_leader_packet_has_local_close_to_epoch(self): """COM12 (leader) had flags=0x03 and epoch ≈ local.""" raw = build_sync_packet( node_id=12, is_leader=True, is_valid=True, smoothed_used=False, local_us=28864932, epoch_us=28864939, sequence=20, ) pkt = SyncPacketParser.parse(raw) assert pkt.node_id == 12 assert pkt.is_leader is True assert pkt.is_valid is True assert pkt.smoothed_used is False assert pkt.flags_raw == 0x03 assert pkt.local_us - pkt.epoch_us == -7 # leader has zero offset def test_magic_mismatch_raises(self): """A non-sync datagram must not silently decode.""" raw = bytearray(build_sync_packet()) raw[0] = 0x01 # corrupt magic low byte with pytest.raises(CSIParseError, match="magic mismatch"): SyncPacketParser.parse(bytes(raw)) def test_short_packet_raises(self): """Below 32 bytes must error early, not silently truncate.""" raw = build_sync_packet()[:16] with pytest.raises(CSIParseError, match="too short"): SyncPacketParser.parse(raw) def test_all_flag_combinations(self): """Each flag bit decodes independently.""" for is_leader in (False, True): for is_valid in (False, True): for smoothed_used in (False, True): raw = build_sync_packet( is_leader=is_leader, is_valid=is_valid, smoothed_used=smoothed_used, ) pkt = SyncPacketParser.parse(raw) assert pkt.is_leader == is_leader assert pkt.is_valid == is_valid assert pkt.smoothed_used == smoothed_used def test_dispatch_distinguishes_csi_from_sync(self): """A host can pick CSI vs sync by leading magic.""" csi_magic = struct.unpack_from('