feat(python): host-side decode for ADR-018 byte 18-19 (ADR-110)
Python ESP32BinaryParser was using struct format '<IBBHIIBB2x' — the
'2x' skipped bytes 18-19 as reserved. After the Rust-side decoder was
extended to surface PPDU type + flags, the Python pipeline (which
archive/v1 still uses for testing + the proof verifier) needs the same
update so its consumers see the HE metadata too.
csi_extractor.py:
- HEADER_FMT now '<IBBHIIBBBB' (captures bytes 18-19)
- New metadata fields: ppdu_type ('ht_legacy'|'he_su'|'he_mu'|'he_tb'|'unknown'),
ppdu_type_raw, he_capable, bw40, stbc, ldpc, ieee802154_sync_valid,
adr018_flags_raw
- Class constants PPDU_HT_LEGACY..PPDU_UNKNOWN mirror the firmware
test_esp32_binary_parser.py:
- build_binary_frame() takes optional ppdu_byte + flags_byte (default 0)
- New TestAdr110ByteEncoding class with 5 tests:
- Pre-ADR-110 zeros decode as 'ht_legacy' + all-flags-false
- HE-SU / HE-MU / HE-TB decode correctly
- 0xFF decodes as 'unknown'
- All-flags-set round-trip (0x1D)
11/11 parser tests pass (6 existing + 5 new). Backwards compat verified.
Pairs with the Rust-side decoder in commit 3959fabf3. Both pipelines now
read the same wire format produced by the C6 firmware's
CONFIG_CSI_FRAME_HE_TAGGING path.
Ref: ruvnet/RuView#762, draft PR #764
Co-Authored-By: claude-flow <ruv@ruv.net>
This commit is contained in:
parent
3959fabf31
commit
8eaa92cf21
|
|
@ -143,13 +143,28 @@ class ESP32BinaryParser:
|
|||
12 4 Sequence number (LE u32)
|
||||
16 1 RSSI (i8)
|
||||
17 1 Noise floor (i8)
|
||||
18 2 Reserved
|
||||
18 1 PPDU type (ADR-110): 0=HT/legacy, 1=HE-SU, 2=HE-MU,
|
||||
3=HE-TB, 0xFF=unknown. Pre-ADR-110 firmware sends 0.
|
||||
19 1 Flags (ADR-110): bit 0 = bw40, bit 2 = STBC,
|
||||
bit 3 = LDPC, bit 4 = 802.15.4 sync valid.
|
||||
20 N*2 I/Q pairs (n_antennas * n_subcarriers * 2 bytes, signed i8)
|
||||
"""
|
||||
|
||||
MAGIC = 0xC5110001
|
||||
HEADER_SIZE = 20
|
||||
HEADER_FMT = '<IBBHIIBB2x' # magic, node_id, n_ant, n_sc, freq, seq, rssi, noise
|
||||
# ADR-110: previously '<IBBHIIBB2x' (last 2 bytes skipped as reserved).
|
||||
# Now read those 2 bytes as PPDU type + flags. Pre-ADR-110 firmware
|
||||
# sends zeros, which decode as 'HT/legacy' + 'no flags' — fully
|
||||
# backwards compatible.
|
||||
HEADER_FMT = '<IBBHIIBBBB' # +2 bytes: ppdu_type, flags
|
||||
|
||||
# ADR-110 PPDU type byte values
|
||||
PPDU_HT_LEGACY = 0
|
||||
PPDU_HE_SU = 1
|
||||
PPDU_HE_MU = 2
|
||||
PPDU_HE_TB = 3
|
||||
PPDU_UNKNOWN = 0xFF
|
||||
_PPDU_NAMES = {0: 'ht_legacy', 1: 'he_su', 2: 'he_mu', 3: 'he_tb', 0xFF: 'unknown'}
|
||||
|
||||
def parse(self, raw_data: bytes) -> CSIData:
|
||||
"""Parse an ADR-018 binary frame into CSIData.
|
||||
|
|
@ -168,8 +183,8 @@ class ESP32BinaryParser:
|
|||
f"Frame too short: need {self.HEADER_SIZE} bytes, got {len(raw_data)}"
|
||||
)
|
||||
|
||||
magic, node_id, n_antennas, n_subcarriers, freq_mhz, sequence, rssi_u8, noise_u8 = \
|
||||
struct.unpack_from(self.HEADER_FMT, raw_data, 0)
|
||||
magic, node_id, n_antennas, n_subcarriers, freq_mhz, sequence, rssi_u8, noise_u8, \
|
||||
ppdu_byte, flags_byte = struct.unpack_from(self.HEADER_FMT, raw_data, 0)
|
||||
|
||||
if magic != self.MAGIC:
|
||||
raise CSIParseError(
|
||||
|
|
@ -226,6 +241,17 @@ class ESP32BinaryParser:
|
|||
'rssi_dbm': rssi,
|
||||
'noise_floor_dbm': noise_floor,
|
||||
'channel_freq_mhz': freq_mhz,
|
||||
# ADR-110 extension — zeros from pre-ADR-110 firmware land here as
|
||||
# 'ht_legacy' + all-flags-false. New consumers can branch on
|
||||
# ppdu_type / he_capable for HE-LTF-aware DSP.
|
||||
'ppdu_type': self._PPDU_NAMES.get(ppdu_byte, 'unknown'),
|
||||
'ppdu_type_raw': ppdu_byte,
|
||||
'he_capable': ppdu_byte in (1, 2, 3),
|
||||
'bw40': bool(flags_byte & 0x01),
|
||||
'stbc': bool(flags_byte & 0x04),
|
||||
'ldpc': bool(flags_byte & 0x08),
|
||||
'ieee802154_sync_valid': bool(flags_byte & 0x10),
|
||||
'adr018_flags_raw': flags_byte,
|
||||
}
|
||||
)
|
||||
|
||||
|
|
|
|||
|
|
@ -23,7 +23,10 @@ from hardware.csi_extractor import (
|
|||
|
||||
# ADR-018 constants
|
||||
MAGIC = 0xC5110001
|
||||
HEADER_FMT = '<IBBHIIBB2x'
|
||||
# ADR-110: bytes 18-19 are now PPDU type + flags (used to be `2x` reserved).
|
||||
# Pre-ADR-110 firmware sends zeros for both, which round-trip as
|
||||
# ('ht_legacy', flags=all-false) — fully backwards compatible.
|
||||
HEADER_FMT = '<IBBHIIBBBB'
|
||||
HEADER_SIZE = 20
|
||||
|
||||
|
||||
|
|
@ -36,6 +39,8 @@ def build_binary_frame(
|
|||
rssi: int = -50,
|
||||
noise_floor: int = -90,
|
||||
iq_pairs: list = None,
|
||||
ppdu_byte: int = 0, # ADR-110: default 0 = HT/legacy (pre-ADR-110 behavior)
|
||||
flags_byte: int = 0, # ADR-110: default 0 = no flags set
|
||||
) -> bytes:
|
||||
"""Build an ADR-018 binary frame for testing."""
|
||||
if iq_pairs is None:
|
||||
|
|
@ -54,6 +59,8 @@ def build_binary_frame(
|
|||
sequence,
|
||||
rssi_u8,
|
||||
noise_u8,
|
||||
ppdu_byte,
|
||||
flags_byte,
|
||||
)
|
||||
|
||||
iq_data = b''
|
||||
|
|
@ -63,6 +70,52 @@ def build_binary_frame(
|
|||
return header + iq_data
|
||||
|
||||
|
||||
class TestAdr110ByteEncoding:
|
||||
"""ADR-110: byte 18 = PPDU type, byte 19 = flags."""
|
||||
|
||||
def setup_method(self):
|
||||
self.parser = ESP32BinaryParser()
|
||||
|
||||
def test_pre_adr110_zeros_decode_as_ht_legacy(self):
|
||||
"""Pre-ADR-110 firmware sends zeros → must surface as HT/legacy + no flags."""
|
||||
frame = build_binary_frame() # ppdu_byte=0, flags_byte=0 default
|
||||
csi = self.parser.parse(frame)
|
||||
assert csi.metadata['ppdu_type'] == 'ht_legacy'
|
||||
assert csi.metadata['ppdu_type_raw'] == 0
|
||||
assert csi.metadata['he_capable'] is False
|
||||
assert csi.metadata['bw40'] is False
|
||||
assert csi.metadata['stbc'] is False
|
||||
assert csi.metadata['ldpc'] is False
|
||||
assert csi.metadata['ieee802154_sync_valid'] is False
|
||||
|
||||
def test_he_su_decodes(self):
|
||||
frame = build_binary_frame(ppdu_byte=1)
|
||||
csi = self.parser.parse(frame)
|
||||
assert csi.metadata['ppdu_type'] == 'he_su'
|
||||
assert csi.metadata['he_capable'] is True
|
||||
|
||||
def test_he_mu_and_he_tb_decode(self):
|
||||
for byte, expected in [(2, 'he_mu'), (3, 'he_tb')]:
|
||||
csi = self.parser.parse(build_binary_frame(ppdu_byte=byte))
|
||||
assert csi.metadata['ppdu_type'] == expected
|
||||
assert csi.metadata['he_capable'] is True
|
||||
|
||||
def test_unknown_ppdu_byte(self):
|
||||
csi = self.parser.parse(build_binary_frame(ppdu_byte=0xFF))
|
||||
assert csi.metadata['ppdu_type'] == 'unknown'
|
||||
assert csi.metadata['ppdu_type_raw'] == 0xFF
|
||||
assert csi.metadata['he_capable'] is False
|
||||
|
||||
def test_all_flags_set_round_trip(self):
|
||||
# bw40 (0x01) + STBC (0x04) + LDPC (0x08) + 15.4-sync (0x10) = 0x1D
|
||||
csi = self.parser.parse(build_binary_frame(ppdu_byte=1, flags_byte=0x1D))
|
||||
assert csi.metadata['bw40'] is True
|
||||
assert csi.metadata['stbc'] is True
|
||||
assert csi.metadata['ldpc'] is True
|
||||
assert csi.metadata['ieee802154_sync_valid'] is True
|
||||
assert csi.metadata['adr018_flags_raw'] == 0x1D
|
||||
|
||||
|
||||
class TestESP32BinaryParser:
|
||||
"""Tests for ESP32BinaryParser."""
|
||||
|
||||
|
|
|
|||
Loading…
Reference in New Issue