feat(python): host-side decode for ADR-018 byte 18-19 (ADR-110)
Python ESP32BinaryParser was using struct format '<IBBHIIBB2x' — the
'2x' skipped bytes 18-19 as reserved. After the Rust-side decoder was
extended to surface PPDU type + flags, the Python pipeline (which
archive/v1 still uses for testing + the proof verifier) needs the same
update so its consumers see the HE metadata too.
csi_extractor.py:
- HEADER_FMT now '<IBBHIIBBBB' (captures bytes 18-19)
- New metadata fields: ppdu_type ('ht_legacy'|'he_su'|'he_mu'|'he_tb'|'unknown'),
ppdu_type_raw, he_capable, bw40, stbc, ldpc, ieee802154_sync_valid,
adr018_flags_raw
- Class constants PPDU_HT_LEGACY..PPDU_UNKNOWN mirror the firmware
test_esp32_binary_parser.py:
- build_binary_frame() takes optional ppdu_byte + flags_byte (default 0)
- New TestAdr110ByteEncoding class with 5 tests:
- Pre-ADR-110 zeros decode as 'ht_legacy' + all-flags-false
- HE-SU / HE-MU / HE-TB decode correctly
- 0xFF decodes as 'unknown'
- All-flags-set round-trip (0x1D)
11/11 parser tests pass (6 existing + 5 new). Backwards compat verified.
Pairs with the Rust-side decoder in commit 3959fabf3. Both pipelines now
read the same wire format produced by the C6 firmware's
CONFIG_CSI_FRAME_HE_TAGGING path.
Ref: ruvnet/RuView#762, draft PR #764
Co-Authored-By: claude-flow <ruv@ruv.net>
This commit is contained in:
parent
3959fabf31
commit
8eaa92cf21
|
|
@ -143,13 +143,28 @@ class ESP32BinaryParser:
|
||||||
12 4 Sequence number (LE u32)
|
12 4 Sequence number (LE u32)
|
||||||
16 1 RSSI (i8)
|
16 1 RSSI (i8)
|
||||||
17 1 Noise floor (i8)
|
17 1 Noise floor (i8)
|
||||||
18 2 Reserved
|
18 1 PPDU type (ADR-110): 0=HT/legacy, 1=HE-SU, 2=HE-MU,
|
||||||
|
3=HE-TB, 0xFF=unknown. Pre-ADR-110 firmware sends 0.
|
||||||
|
19 1 Flags (ADR-110): bit 0 = bw40, bit 2 = STBC,
|
||||||
|
bit 3 = LDPC, bit 4 = 802.15.4 sync valid.
|
||||||
20 N*2 I/Q pairs (n_antennas * n_subcarriers * 2 bytes, signed i8)
|
20 N*2 I/Q pairs (n_antennas * n_subcarriers * 2 bytes, signed i8)
|
||||||
"""
|
"""
|
||||||
|
|
||||||
MAGIC = 0xC5110001
|
MAGIC = 0xC5110001
|
||||||
HEADER_SIZE = 20
|
HEADER_SIZE = 20
|
||||||
HEADER_FMT = '<IBBHIIBB2x' # magic, node_id, n_ant, n_sc, freq, seq, rssi, noise
|
# ADR-110: previously '<IBBHIIBB2x' (last 2 bytes skipped as reserved).
|
||||||
|
# Now read those 2 bytes as PPDU type + flags. Pre-ADR-110 firmware
|
||||||
|
# sends zeros, which decode as 'HT/legacy' + 'no flags' — fully
|
||||||
|
# backwards compatible.
|
||||||
|
HEADER_FMT = '<IBBHIIBBBB' # +2 bytes: ppdu_type, flags
|
||||||
|
|
||||||
|
# ADR-110 PPDU type byte values
|
||||||
|
PPDU_HT_LEGACY = 0
|
||||||
|
PPDU_HE_SU = 1
|
||||||
|
PPDU_HE_MU = 2
|
||||||
|
PPDU_HE_TB = 3
|
||||||
|
PPDU_UNKNOWN = 0xFF
|
||||||
|
_PPDU_NAMES = {0: 'ht_legacy', 1: 'he_su', 2: 'he_mu', 3: 'he_tb', 0xFF: 'unknown'}
|
||||||
|
|
||||||
def parse(self, raw_data: bytes) -> CSIData:
|
def parse(self, raw_data: bytes) -> CSIData:
|
||||||
"""Parse an ADR-018 binary frame into CSIData.
|
"""Parse an ADR-018 binary frame into CSIData.
|
||||||
|
|
@ -168,8 +183,8 @@ class ESP32BinaryParser:
|
||||||
f"Frame too short: need {self.HEADER_SIZE} bytes, got {len(raw_data)}"
|
f"Frame too short: need {self.HEADER_SIZE} bytes, got {len(raw_data)}"
|
||||||
)
|
)
|
||||||
|
|
||||||
magic, node_id, n_antennas, n_subcarriers, freq_mhz, sequence, rssi_u8, noise_u8 = \
|
magic, node_id, n_antennas, n_subcarriers, freq_mhz, sequence, rssi_u8, noise_u8, \
|
||||||
struct.unpack_from(self.HEADER_FMT, raw_data, 0)
|
ppdu_byte, flags_byte = struct.unpack_from(self.HEADER_FMT, raw_data, 0)
|
||||||
|
|
||||||
if magic != self.MAGIC:
|
if magic != self.MAGIC:
|
||||||
raise CSIParseError(
|
raise CSIParseError(
|
||||||
|
|
@ -226,6 +241,17 @@ class ESP32BinaryParser:
|
||||||
'rssi_dbm': rssi,
|
'rssi_dbm': rssi,
|
||||||
'noise_floor_dbm': noise_floor,
|
'noise_floor_dbm': noise_floor,
|
||||||
'channel_freq_mhz': freq_mhz,
|
'channel_freq_mhz': freq_mhz,
|
||||||
|
# ADR-110 extension — zeros from pre-ADR-110 firmware land here as
|
||||||
|
# 'ht_legacy' + all-flags-false. New consumers can branch on
|
||||||
|
# ppdu_type / he_capable for HE-LTF-aware DSP.
|
||||||
|
'ppdu_type': self._PPDU_NAMES.get(ppdu_byte, 'unknown'),
|
||||||
|
'ppdu_type_raw': ppdu_byte,
|
||||||
|
'he_capable': ppdu_byte in (1, 2, 3),
|
||||||
|
'bw40': bool(flags_byte & 0x01),
|
||||||
|
'stbc': bool(flags_byte & 0x04),
|
||||||
|
'ldpc': bool(flags_byte & 0x08),
|
||||||
|
'ieee802154_sync_valid': bool(flags_byte & 0x10),
|
||||||
|
'adr018_flags_raw': flags_byte,
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -23,7 +23,10 @@ from hardware.csi_extractor import (
|
||||||
|
|
||||||
# ADR-018 constants
|
# ADR-018 constants
|
||||||
MAGIC = 0xC5110001
|
MAGIC = 0xC5110001
|
||||||
HEADER_FMT = '<IBBHIIBB2x'
|
# ADR-110: bytes 18-19 are now PPDU type + flags (used to be `2x` reserved).
|
||||||
|
# Pre-ADR-110 firmware sends zeros for both, which round-trip as
|
||||||
|
# ('ht_legacy', flags=all-false) — fully backwards compatible.
|
||||||
|
HEADER_FMT = '<IBBHIIBBBB'
|
||||||
HEADER_SIZE = 20
|
HEADER_SIZE = 20
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -36,6 +39,8 @@ def build_binary_frame(
|
||||||
rssi: int = -50,
|
rssi: int = -50,
|
||||||
noise_floor: int = -90,
|
noise_floor: int = -90,
|
||||||
iq_pairs: list = None,
|
iq_pairs: list = None,
|
||||||
|
ppdu_byte: int = 0, # ADR-110: default 0 = HT/legacy (pre-ADR-110 behavior)
|
||||||
|
flags_byte: int = 0, # ADR-110: default 0 = no flags set
|
||||||
) -> bytes:
|
) -> bytes:
|
||||||
"""Build an ADR-018 binary frame for testing."""
|
"""Build an ADR-018 binary frame for testing."""
|
||||||
if iq_pairs is None:
|
if iq_pairs is None:
|
||||||
|
|
@ -54,6 +59,8 @@ def build_binary_frame(
|
||||||
sequence,
|
sequence,
|
||||||
rssi_u8,
|
rssi_u8,
|
||||||
noise_u8,
|
noise_u8,
|
||||||
|
ppdu_byte,
|
||||||
|
flags_byte,
|
||||||
)
|
)
|
||||||
|
|
||||||
iq_data = b''
|
iq_data = b''
|
||||||
|
|
@ -63,6 +70,52 @@ def build_binary_frame(
|
||||||
return header + iq_data
|
return header + iq_data
|
||||||
|
|
||||||
|
|
||||||
|
class TestAdr110ByteEncoding:
|
||||||
|
"""ADR-110: byte 18 = PPDU type, byte 19 = flags."""
|
||||||
|
|
||||||
|
def setup_method(self):
|
||||||
|
self.parser = ESP32BinaryParser()
|
||||||
|
|
||||||
|
def test_pre_adr110_zeros_decode_as_ht_legacy(self):
|
||||||
|
"""Pre-ADR-110 firmware sends zeros → must surface as HT/legacy + no flags."""
|
||||||
|
frame = build_binary_frame() # ppdu_byte=0, flags_byte=0 default
|
||||||
|
csi = self.parser.parse(frame)
|
||||||
|
assert csi.metadata['ppdu_type'] == 'ht_legacy'
|
||||||
|
assert csi.metadata['ppdu_type_raw'] == 0
|
||||||
|
assert csi.metadata['he_capable'] is False
|
||||||
|
assert csi.metadata['bw40'] is False
|
||||||
|
assert csi.metadata['stbc'] is False
|
||||||
|
assert csi.metadata['ldpc'] is False
|
||||||
|
assert csi.metadata['ieee802154_sync_valid'] is False
|
||||||
|
|
||||||
|
def test_he_su_decodes(self):
|
||||||
|
frame = build_binary_frame(ppdu_byte=1)
|
||||||
|
csi = self.parser.parse(frame)
|
||||||
|
assert csi.metadata['ppdu_type'] == 'he_su'
|
||||||
|
assert csi.metadata['he_capable'] is True
|
||||||
|
|
||||||
|
def test_he_mu_and_he_tb_decode(self):
|
||||||
|
for byte, expected in [(2, 'he_mu'), (3, 'he_tb')]:
|
||||||
|
csi = self.parser.parse(build_binary_frame(ppdu_byte=byte))
|
||||||
|
assert csi.metadata['ppdu_type'] == expected
|
||||||
|
assert csi.metadata['he_capable'] is True
|
||||||
|
|
||||||
|
def test_unknown_ppdu_byte(self):
|
||||||
|
csi = self.parser.parse(build_binary_frame(ppdu_byte=0xFF))
|
||||||
|
assert csi.metadata['ppdu_type'] == 'unknown'
|
||||||
|
assert csi.metadata['ppdu_type_raw'] == 0xFF
|
||||||
|
assert csi.metadata['he_capable'] is False
|
||||||
|
|
||||||
|
def test_all_flags_set_round_trip(self):
|
||||||
|
# bw40 (0x01) + STBC (0x04) + LDPC (0x08) + 15.4-sync (0x10) = 0x1D
|
||||||
|
csi = self.parser.parse(build_binary_frame(ppdu_byte=1, flags_byte=0x1D))
|
||||||
|
assert csi.metadata['bw40'] is True
|
||||||
|
assert csi.metadata['stbc'] is True
|
||||||
|
assert csi.metadata['ldpc'] is True
|
||||||
|
assert csi.metadata['ieee802154_sync_valid'] is True
|
||||||
|
assert csi.metadata['adr018_flags_raw'] == 0x1D
|
||||||
|
|
||||||
|
|
||||||
class TestESP32BinaryParser:
|
class TestESP32BinaryParser:
|
||||||
"""Tests for ESP32BinaryParser."""
|
"""Tests for ESP32BinaryParser."""
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue