from enum import IntEnum
import ctypes
import binascii
import numpy as np
from . import constants
# Internal constants
# 32-bit packet type headers. The radar TX report parser in this file reads the first four
# packet bytes as a little-endian integer and compares them to SPI_TYPE_HEADER_RADAR_TX_REPORT.
SPI_TYPE_HEADER_RADAR_TX_REPORT = 0x52545852
SPI_TYPE_HEADER_JUMBO_FRAME = 0xDECAFBAD
# NOTE(review): the jumbo-frame / csistream constants below are not referenced in the visible
# part of this file; their exact semantics should be confirmed against their users.
JUMBO_FRAGMENT_TERMINATOR_UID = 0
CSISTREAM_UID_SENSOR_SHIFT = 29
CSISTREAM_UID_SENSOR_MASK = 0x7
# Other constants
HT_COEFFICIENTS_PER_CHANNEL = 57
"Number of channel coefficients (active subcarriers) per Wi-Fi channel in HT mode (HT-LTF)"
LEGACY_COEFFICIENTS_PER_CHANNEL = 53
"Number of channel coefficients (active subcarriers) per Wi-Fi channel in legacy mode (L-LTF)"
HE20_COEFFICIENTS_PER_CHANNEL = 245
"Number of channel coefficients (active plus invalid subcarriers) for a 20 MHz HE-LTF"
HT40_GAP_SUBCARRIERS = 3
"Gap between primary and secondary channel in HT40 mode, in subcarriers"
# FFT lengths used by the fixed-point ("fix32") FFT models further down in this file.
COMPRESSED_LLTF_FFT_SIZE = 64
COMPRESSED_HT20_FFT_SIZE = 64
COMPRESSED_HT40_FFT_SIZE = 128
COMPRESSED_HE20_FFT_SIZE = 256
# Number of fractional bits: the FFT models quantize inputs as rint(x * (1 << shift)).
COMPRESSED_LLTF_FIX32_SHIFT = 17
COMPRESSED_LLTF_8BIT_MODE_FIX32_SHIFT = 21
COMPRESSED_HT20_FIX32_SHIFT = 21
COMPRESSED_HT40_FIX32_SHIFT = 21
COMPRESSED_HE20_FIX32_SHIFT = 21
# Number of taps kept per compressed CSI record, and the index (in the centered tap order
# used by the FFT models) of the first kept tap for each mode.
COMPRESSED_TAP_COUNT = 16
COMPRESSED_LLTF_TAP_START = 27
COMPRESSED_HT20_TAP_START = 27
COMPRESSED_HT40_TAP_START = 55
COMPRESSED_HE20_TAP_START = 120
# TLV record type identifiers understood by serialized_csi_tlv_t.
SERIALIZED_CSI_TLV_TYPE_FRAME_META = 2
SERIALIZED_CSI_TLV_TYPE_TIMING_META = 3
SERIALIZED_CSI_TLV_TYPE_ACQUIRE_META = 4
SERIALIZED_CSI_TLV_TYPE_RX_CTRL_RAW = 5
SERIALIZED_CSI_TLV_TYPE_CSI_RAW = 6
SERIALIZED_CSI_TLV_TYPE_CSI_COMPRESSED = 7
SERIALIZED_CSI_TLV_TYPE_RX_CTRL_COMPRESSED = 8
SERIALIZED_CSI_TLV_TYPE_GAIN_TABLE_ENTRY = 9
SERIALIZED_CSI_TLV_TYPE_CRC32 = 255
# TLV record type identifiers understood by radar_tx_report_tlv_t.
RADAR_TX_REPORT_TLV_TYPE_FRAME_META = 1
RADAR_TX_REPORT_TLV_TYPE_TIMING_META = 2
RADAR_TX_REPORT_TLV_TYPE_RADAR_META = 3
RADAR_TX_REPORT_TLV_TYPE_TX_META = 4
RADAR_TX_REPORT_TLV_TYPE_RAW_META = 5
RADAR_TX_REPORT_TLV_TYPE_CRC32 = 255
# Bit masks for the 16-bit "flags" field of the radar TX report TX-meta record.
RADAR_TX_REPORT_FLAG_HAS_HW_TIMESTAMP = 1 << 0
# Bit masks for the 16-bit frame flags of the CSI frame-meta record.
SERIALIZED_CSI_TLV_FRAME_FLAG_IS_CALIB = 1 << 0
SERIALIZED_CSI_TLV_FRAME_FLAG_IS_RADAR = 1 << 1
SERIALIZED_CSI_TLV_FRAME_FLAG_FIRST_WORD_INVALID = 1 << 2
# Bit masks for the 16-bit acquire flags of the CSI acquire-meta record.
SERIALIZED_CSI_TLV_ACQUIRE_FLAG_FORCE_LLTF = 1 << 0
SERIALIZED_CSI_TLV_ACQUIRE_FLAG_LLTF_8BIT_MODE = 1 << 1
# Bit masks for the "flags" field of compressed_rx_ctrl_t.
SERIALIZED_CSI_TLV_RX_CTRL_COMPRESSED_FLAG_IS_HT40 = 1 << 0
SERIALIZED_CSI_TLV_RX_CTRL_COMPRESSED_FLAG_CHANNEL_ESTIMATE_INFO_VLD = 1 << 1
#####################################################
# Enums used by multiple PHY versions #
#####################################################
[docs]
class wifi_sig_mode_t(IntEnum):
    """
    Signal mode identifiers: legacy, HT, HE and VHT.

    NOTE(review): presumably the values stored in the ``sig_mode`` field of the RX control structures — confirm.
    """

    SIG_MODE_LEGACY = 0
    SIG_MODE_HT = 1
    SIG_MODE_HE = 2
    SIG_MODE_VHT = 3
[docs]
class wifi_phy_mode_t(IntEnum):
    """
    Wi-Fi PHY mode identifiers (LR, 11b/g/a, HT20/HT40, HE20, VHT20).
    """

    WIFI_PHY_MODE_LR = 0
    WIFI_PHY_MODE_11B = 1
    WIFI_PHY_MODE_11G = 2
    WIFI_PHY_MODE_11A = 3
    WIFI_PHY_MODE_HT20 = 4
    WIFI_PHY_MODE_HT40 = 5
    WIFI_PHY_MODE_HE20 = 6
    WIFI_PHY_MODE_VHT20 = 7
[docs]
class wifi_phy_rate_t(IntEnum):
    """
    Wi-Fi PHY rate identifiers.

    The members cover 1-11 Mbit/s rates (``_L`` / ``_S`` suffixed variants), 6-54 Mbit/s rates,
    MCS0-MCS9 with ``LGI`` and ``SGI`` suffixes, and two ``LORA`` rates. Note that the value
    0x04 and the range 0x24-0x28 are not assigned to any member.
    """

    # 1-11 Mbit/s rates
    WIFI_PHY_RATE_1M_L = 0x00
    WIFI_PHY_RATE_2M_L = 0x01
    WIFI_PHY_RATE_5M_L = 0x02
    WIFI_PHY_RATE_11M_L = 0x03
    WIFI_PHY_RATE_2M_S = 0x05
    WIFI_PHY_RATE_5M_S = 0x06
    WIFI_PHY_RATE_11M_S = 0x07
    # 6-54 Mbit/s rates
    WIFI_PHY_RATE_48M = 0x08
    WIFI_PHY_RATE_24M = 0x09
    WIFI_PHY_RATE_12M = 0x0A
    WIFI_PHY_RATE_6M = 0x0B
    WIFI_PHY_RATE_54M = 0x0C
    WIFI_PHY_RATE_36M = 0x0D
    WIFI_PHY_RATE_18M = 0x0E
    WIFI_PHY_RATE_9M = 0x0F
    # MCS rates, LGI variants
    WIFI_PHY_RATE_MCS0_LGI = 0x10
    WIFI_PHY_RATE_MCS1_LGI = 0x11
    WIFI_PHY_RATE_MCS2_LGI = 0x12
    WIFI_PHY_RATE_MCS3_LGI = 0x13
    WIFI_PHY_RATE_MCS4_LGI = 0x14
    WIFI_PHY_RATE_MCS5_LGI = 0x15
    WIFI_PHY_RATE_MCS6_LGI = 0x16
    WIFI_PHY_RATE_MCS7_LGI = 0x17
    WIFI_PHY_RATE_MCS8_LGI = 0x18
    WIFI_PHY_RATE_MCS9_LGI = 0x19
    # MCS rates, SGI variants
    WIFI_PHY_RATE_MCS0_SGI = 0x1A
    WIFI_PHY_RATE_MCS1_SGI = 0x1B
    WIFI_PHY_RATE_MCS2_SGI = 0x1C
    WIFI_PHY_RATE_MCS3_SGI = 0x1D
    WIFI_PHY_RATE_MCS4_SGI = 0x1E
    WIFI_PHY_RATE_MCS5_SGI = 0x1F
    WIFI_PHY_RATE_MCS6_SGI = 0x20
    WIFI_PHY_RATE_MCS7_SGI = 0x21
    WIFI_PHY_RATE_MCS8_SGI = 0x22
    WIFI_PHY_RATE_MCS9_SGI = 0x23
    # LORA rates and end-of-range marker
    WIFI_PHY_RATE_LORA_250K = 0x29
    WIFI_PHY_RATE_LORA_500K = 0x2A
    WIFI_PHY_RATE_MAX = 0x2B
[docs]
class wifi_tx_power_t(IntEnum):
    """
    Transmit power settings.

    Each member is named after a power level in dBm; its value is four times that
    figure (e.g. 8 for 2 dBm, 34 for 8.5 dBm), i.e. the values are in steps of 0.25 dBm.
    """

    WIFI_TX_POWER_2_DBM = 8
    WIFI_TX_POWER_5_DBM = 20
    WIFI_TX_POWER_7_DBM = 28
    WIFI_TX_POWER_8_5_DBM = 34
    WIFI_TX_POWER_11_DBM = 44
    WIFI_TX_POWER_13_DBM = 52
    WIFI_TX_POWER_14_DBM = 56
    WIFI_TX_POWER_15_DBM = 60
    WIFI_TX_POWER_16_5_DBM = 66
    WIFI_TX_POWER_18_DBM = 72
    WIFI_TX_POWER_20_DBM = 80
[docs]
class rfswitch_state_t(IntEnum):
    """
    RF switch states of a sensor.

    ``SENSOR_RFSWITCH_UNKNOWN`` is the default that the TLV parsers in this file
    assign to ``rfswitch_state`` before any metadata record has been decoded.
    """

    SENSOR_RFSWITCH_ISOLATION = 0
    SENSOR_RFSWITCH_REFERENCE = 1
    SENSOR_RFSWITCH_ANTENNA_R = 2
    SENSOR_RFSWITCH_ANTENNA_L = 3
    SENSOR_RFSWITCH_ANTENNA_RANDOM = 4
    SENSOR_RFSWITCH_UNKNOWN = 255
#####################################################
# Common C Structures used by multiple PHY versions #
#####################################################
[docs]
class seq_ctrl_t(ctypes.LittleEndianStructure):
    """
    A ctypes structure representing the sequence control field of a Wi-Fi packet.

    The 16-bit field holds the 4-bit fragment number (``frag``, declared first) and
    the 12-bit sequence number (``seg``).

    :param buf: Bytes-like object of at least two bytes to decode. If ``None`` (the default),
                a zero-initialized structure is returned.
    :raises ValueError: If ``buf`` is shorter than the structure.
    """

    _pack_ = 1
    _fields_ = [("frag", ctypes.c_uint16, 4), ("seg", ctypes.c_uint16, 12)]

    def __new__(cls, buf=None):
        # Without a buffer, fall back to the zero-filled instance ctypes creates by default
        # instead of failing inside from_buffer_copy(None).
        if buf is None:
            return super().__new__(cls)
        return cls.from_buffer_copy(buf)

    def __init__(self, buf=None):
        # Deliberately empty: the inherited Structure.__init__ would interpret ``buf``
        # as the value of the first field.
        pass
########################################################################
# C Structures for Espressif PHY version 3 (e.g., ESP32-C5, ESP32-C61) #
########################################################################
[docs]
class wifi_pkt_rx_ctrl_v3_t(ctypes.LittleEndianStructure):
    """
    A ctypes structure representing the `wifi_pkt_rx_ctrl_t` as provided by the ESP32.
    See the related `esp-idf code <https://github.com/espressif/esp-idf/blob/master/components/esp_wifi/include/esp_wifi_he_types.h>`_ for details.
    Variant for Espressif PHY version 3.

    The structure consists of sixteen 32-bit words (64 bytes) of bit fields.

    :param buf: Bytes-like object of at least 64 bytes to decode. If ``None`` (the default),
                a zero-initialized structure is returned.
    :raises ValueError: If ``buf`` is shorter than the structure.
    """

    _pack_ = 1
    _fields_ = [
        # word 0
        ("rssi", ctypes.c_uint32, 8),
        ("rate", ctypes.c_uint32, 5),
        ("lsig_reserved", ctypes.c_uint32, 1),
        ("sig_mode", ctypes.c_uint32, 2),
        ("lsig_len", ctypes.c_uint32, 12),
        ("rxmatch0", ctypes.c_uint32, 1),
        ("rxmatch1", ctypes.c_uint32, 1),
        ("rxmatch2", ctypes.c_uint32, 1),
        ("rxmatch3", ctypes.c_uint32, 1),
        # word 1
        ("he_siga1", ctypes.c_uint32, 32),  # HE-SIGA1, HT-SIG, or VHT-SIG depending on cur_bb_format
        # word 2
        ("rxend_state", ctypes.c_uint32, 8),
        ("he_siga2", ctypes.c_uint32, 16),
        ("rxstart_time_cyc", ctypes.c_uint32, 7),
        ("is_group", ctypes.c_uint32, 1),
        # word 3
        ("timestamp", ctypes.c_uint32, 32),
        # word 4
        ("cfo_low_rate", ctypes.c_uint32, 15),
        ("cfo_high_rate", ctypes.c_uint32, 15),
        ("_reserved7", ctypes.c_uint32, 2),
        # word 5
        ("noise_floor", ctypes.c_uint32, 8),
        ("data_rssi", ctypes.c_uint32, 8),  # signed data-field RSSI estimate, exposed as raw uint8
        ("fft_gain", ctypes.c_uint32, 8),
        ("rx_gain", ctypes.c_uint32, 8),
        # word 6
        ("_reserved11", ctypes.c_uint32, 8),
        ("_reserved12", ctypes.c_uint32, 8),
        ("_reserved13", ctypes.c_uint32, 2),
        ("sigb_len", ctypes.c_uint32, 10),
        ("_reserved14", ctypes.c_uint32, 1),
        ("_reserved15", ctypes.c_uint32, 1),
        ("_reserved16", ctypes.c_uint32, 1),
        ("_reserved17", ctypes.c_uint32, 1),
        # word 7
        ("channel", ctypes.c_uint32, 8),
        ("second", ctypes.c_uint32, 8),
        ("_reserved18", ctypes.c_uint32, 4),
        ("_reserved19", ctypes.c_uint32, 4),
        ("_reserved20", ctypes.c_uint32, 1),
        ("_reserved21", ctypes.c_uint32, 7),
        # word 8
        ("_reserved22", ctypes.c_uint32, 2),
        ("_reserved23", ctypes.c_uint32, 4),
        ("_reserved24", ctypes.c_uint32, 2),
        ("rxstart_time_cyc_dec", ctypes.c_uint32, 11),
        ("_reserved26", ctypes.c_uint32, 1),
        ("_reserved27", ctypes.c_uint32, 12),
        # word 9
        ("_reserved28", ctypes.c_uint32, 12),
        ("cur_bb_format", ctypes.c_uint32, 4),
        ("rx_channel_estimate_len", ctypes.c_uint32, 10),
        ("rx_channel_estimate_info_vld", ctypes.c_uint32, 1),
        ("_reserved29", ctypes.c_uint32, 5),
        # word 10
        ("_reserved30", ctypes.c_uint32, 21),
        ("_reserved31", ctypes.c_uint32, 10),
        ("_reserved32", ctypes.c_uint32, 1),
        # word 11
        ("_reserved33", ctypes.c_uint32, 3),
        ("_reserved34", ctypes.c_uint32, 1),
        ("_reserved35", ctypes.c_uint32, 6),
        ("_reserved36", ctypes.c_uint32, 21),
        ("_reserved37", ctypes.c_uint32, 1),
        # word 12
        ("_reserved38", ctypes.c_uint32, 32),
        # word 13
        ("_reserved39", ctypes.c_uint32, 7),
        ("_reserved40", ctypes.c_uint32, 1),
        ("_reserved41", ctypes.c_uint32, 8),
        ("_reserved42", ctypes.c_uint32, 16),
        # word 14
        ("sig_len", ctypes.c_uint32, 14),
        ("_reserved43", ctypes.c_uint32, 2),
        ("dump_len", ctypes.c_uint32, 14),
        ("_reserved44", ctypes.c_uint32, 2),
        # word 15
        ("rx_state", ctypes.c_uint32, 8),
        ("_reserved45", ctypes.c_uint32, 8),
        ("_reserved46", ctypes.c_uint32, 16),
    ]

    def __new__(cls, buf=None):
        # Without a buffer, fall back to the zero-filled instance ctypes creates by default
        # instead of failing inside from_buffer_copy(None).
        if buf is None:
            return super().__new__(cls)
        return cls.from_buffer_copy(buf)

    def __init__(self, buf=None):
        # Deliberately empty: the inherited Structure.__init__ would interpret ``buf``
        # as the value of the first field.
        pass


# Import-time sanity check of the bit-field layout: sixteen 32-bit words.
assert ctypes.sizeof(wifi_pkt_rx_ctrl_v3_t) == 64
[docs]
class compressed_rx_ctrl_t(ctypes.LittleEndianStructure):
    """
    A ctypes structure for the compact (24-byte) RX control record carried in
    ``SERIALIZED_CSI_TLV_TYPE_RX_CTRL_COMPRESSED`` TLV records.

    It holds a subset of the fields of ``wifi_pkt_rx_ctrl_v3_t`` as byte-aligned integers,
    plus a ``flags`` bit mask (``SERIALIZED_CSI_TLV_RX_CTRL_COMPRESSED_FLAG_*``).

    :param buf: Bytes-like object of at least 24 bytes to decode. If ``None`` (the default),
                a zero-initialized structure is returned.
    :raises ValueError: If ``buf`` is shorter than the structure.
    """

    _pack_ = 1
    _fields_ = [
        ("rssi", ctypes.c_uint8),
        ("noise_floor", ctypes.c_uint8),
        ("channel", ctypes.c_uint8),
        ("secondary_channel", ctypes.c_int8),  # signed: only its sign is used when expanding
        ("cur_bb_format", ctypes.c_uint8),
        ("rate", ctypes.c_uint8),
        ("sig_mode", ctypes.c_uint8),
        ("rxstart_time_cyc", ctypes.c_uint8),
        ("rx_channel_estimate_len", ctypes.c_uint16),
        ("flags", ctypes.c_uint16),
        ("timestamp", ctypes.c_uint32),
        ("cfo_low_rate", ctypes.c_uint16),
        ("cfo_high_rate", ctypes.c_uint16),
        ("he_sig1_mcs", ctypes.c_uint8),
        ("reserved", ctypes.c_uint8),
        ("fft_gain", ctypes.c_uint8),
        ("rx_gain", ctypes.c_uint8),
    ]

    def __new__(cls, buf=None):
        # Without a buffer, fall back to the zero-filled instance ctypes creates by default
        # instead of failing inside from_buffer_copy(None).
        if buf is None:
            return super().__new__(cls)
        return cls.from_buffer_copy(buf)

    def __init__(self, buf=None):
        # Deliberately empty: the inherited Structure.__init__ would interpret ``buf``
        # as the value of the first field.
        pass


# Import-time sanity check of the packed layout.
assert ctypes.sizeof(compressed_rx_ctrl_t) == 24
# Shortest compressed RX control payload accepted by the CSI TLV parser. It is two bytes
# shorter than the structure, i.e. the trailing ``fft_gain`` / ``rx_gain`` bytes may be absent.
_COMPRESSED_RX_CTRL_MIN_SIZE = 22
def _build_rx_ctrl_v3_from_compressed(compact_raw: bytes) -> bytes:
    """
    Expand a compact RX control record into the byte image of a ``wifi_pkt_rx_ctrl_v3_t``.

    :param compact_raw: Serialized ``compressed_rx_ctrl_t``. Payloads shorter than the
                        structure are zero-padded on the right before decoding.
    :return: Bytes of a ``wifi_pkt_rx_ctrl_v3_t`` in which only the fields available in the
             compact record are populated; everything else is zero.
    """
    compact_size = ctypes.sizeof(compressed_rx_ctrl_t)
    # ljust only pads; buffers that are already long enough are passed through unchanged.
    compact = compressed_rx_ctrl_t(bytes(compact_raw).ljust(compact_size, b"\x00"))
    flags = compact.flags

    ctrl = wifi_pkt_rx_ctrl_v3_t(bytes(ctypes.sizeof(wifi_pkt_rx_ctrl_v3_t)))

    # Fields that carry over under the same name.
    same_name_fields = (
        "rssi",
        "rate",
        "sig_mode",
        "rxstart_time_cyc",
        "timestamp",
        "cfo_low_rate",
        "cfo_high_rate",
        "noise_floor",
        "channel",
        "cur_bb_format",
        "fft_gain",
        "rx_gain",
        "rx_channel_estimate_len",
    )
    for field_name in same_name_fields:
        setattr(ctrl, field_name, int(getattr(compact, field_name)))

    # The MCS byte goes into the low bits of he_siga1; bit 7 is set for HT40.
    siga1 = int(compact.he_sig1_mcs)
    if flags & SERIALIZED_CSI_TLV_RX_CTRL_COMPRESSED_FLAG_IS_HT40:
        siga1 |= 0x80
    ctrl.he_siga1 = siga1

    # Only the sign of the compact secondary channel is kept: >0 -> 1, <0 -> 2, 0 -> 0.
    secondary = compact.secondary_channel
    if secondary == 0:
        ctrl.second = 0
    else:
        ctrl.second = 1 if secondary > 0 else 2

    info_valid = flags & SERIALIZED_CSI_TLV_RX_CTRL_COMPRESSED_FLAG_CHANNEL_ESTIMATE_INFO_VLD
    ctrl.rx_channel_estimate_info_vld = 1 if info_valid else 0

    return bytes(ctrl)
[docs]
def gain_byte_to_signed(value: int) -> int:
    """
    Reinterpret the low 8 bits of a gain value from ``rx_ctrl`` as a signed byte.

    Gain fields are stored as bytes by the hardware. RX gain normally stays in the
    positive gain-table range, whereas FFT gain may be negative and is then reported
    in two's-complement form.

    :param value: Gain value; only its lowest 8 bits are considered.
    :return: The value in the range ``-128 .. 127``.
    """
    byte = int(value) & 0xFF
    if byte >= 0x80:
        return byte - 0x100
    return byte
# NOTE(review): ``csistream_fragment_header_t`` is not defined in the visible part of this
# file. Confirm that it is defined before this line; otherwise this import-time layout
# check raises NameError.
assert ctypes.sizeof(csistream_fragment_header_t) == 8
[docs]
class serialized_csi_tlv_t:
    """
    Parser for a serialized CSI packet in TLV form.

    A packet starts with a 4-byte little-endian type header, followed by records made of
    a 1-byte type, a 2-byte little-endian length and ``length`` value bytes. Records of
    unknown type are skipped. The packet must end with a CRC32 record whose checksum
    covers all bytes before that record, and it must contain RX control metadata
    (raw or compressed).

    :param buf: Bytes-like packet contents.
    :raises ValueError: If the packet is truncated or malformed, a record is too short,
                        the CRC32 is missing or wrong, or RX control metadata is missing.
    """

    def __init__(self, buf=None):
        packet = bytes(b"" if buf is None else buf)
        if len(packet) < 4:
            raise ValueError("CSI TLV packet too short")
        packet_len = len(packet)

        self._raw = packet
        self.type_header = int.from_bytes(packet[:4], byteorder="little")

        # Defaults for everything a record may (or may not) provide.
        self.source_mac = bytes(6)
        self.dest_mac = bytes(6)
        self.seq_ctrl = seq_ctrl_t(b"\x00\x00")
        self.frame_flags = 0
        self.timestamp = 0
        self.global_timestamp_us = 0
        self.acquire_flags = 0
        self.acquire_val_scale_cfg = 0
        self.rfswitch_state = rfswitch_state_t.SENSOR_RFSWITCH_UNKNOWN
        self.gain_table_entry_raw = bytes(12)
        self.gain_table_entry_valid = False
        self.antid = 0xFF
        self.rx_ctrl = bytes()
        self.buf = bytes()
        self.csi_len = 0
        self._is_compressed = False
        self.crc32 = None
        self._crc_valid = False
        self._raw_csi_tlv = None
        self._raw_csi_padded_len = 0

        cursor = 4
        while cursor < packet_len:
            # Every record needs a full 3-byte header (type + 16-bit length).
            if packet_len - cursor < 3:
                raise ValueError("Malformed CSI TLV header")
            record_start = cursor
            kind = packet[cursor]
            length = int.from_bytes(packet[cursor + 1 : cursor + 3], byteorder="little")
            cursor += 3
            record_end = cursor + length
            if record_end > packet_len:
                raise ValueError("Malformed CSI TLV length")
            body = packet[cursor:record_end]

            if kind == SERIALIZED_CSI_TLV_TYPE_FRAME_META:
                if length < 16:
                    raise ValueError("Invalid frame meta TLV")
                self.source_mac = bytes(body[0:6])
                self.dest_mac = bytes(body[6:12])
                self.seq_ctrl = seq_ctrl_t(body[12:14])
                self.frame_flags = int.from_bytes(body[14:16], byteorder="little")
            elif kind == SERIALIZED_CSI_TLV_TYPE_TIMING_META:
                if length < 8:
                    raise ValueError("Invalid timing meta TLV")
                self.global_timestamp_us = int.from_bytes(body[:8], byteorder="little")
            elif kind == SERIALIZED_CSI_TLV_TYPE_ACQUIRE_META:
                if length < 4:
                    raise ValueError("Invalid acquire meta TLV")
                self.acquire_flags = int.from_bytes(body[0:2], byteorder="little")
                self.acquire_val_scale_cfg = body[2]
                self.rfswitch_state = body[3]
            elif kind == SERIALIZED_CSI_TLV_TYPE_GAIN_TABLE_ENTRY:
                if length < 12:
                    raise ValueError("Invalid gain table entry TLV")
                self.gain_table_entry_raw = bytes(body[:12])
                self.gain_table_entry_valid = True
            elif kind == SERIALIZED_CSI_TLV_TYPE_RX_CTRL_RAW:
                self.rx_ctrl = bytes(body)
                # The timestamp is only extracted when the full structure is present.
                if len(self.rx_ctrl) >= ctypes.sizeof(wifi_pkt_rx_ctrl_v3_t):
                    self.timestamp = wifi_pkt_rx_ctrl_v3_t(self.rx_ctrl).timestamp
            elif kind == SERIALIZED_CSI_TLV_TYPE_RX_CTRL_COMPRESSED:
                if length < _COMPRESSED_RX_CTRL_MIN_SIZE:
                    raise ValueError("Invalid compressed RX CTRL TLV")
                compact = bytes(body[: ctypes.sizeof(compressed_rx_ctrl_t)])
                self.rx_ctrl = _build_rx_ctrl_v3_from_compressed(compact)
                self.timestamp = wifi_pkt_rx_ctrl_v3_t(self.rx_ctrl).timestamp
            elif kind in (SERIALIZED_CSI_TLV_TYPE_CSI_RAW, SERIALIZED_CSI_TLV_TYPE_CSI_COMPRESSED):
                self._raw_csi_tlv = bytes(body)
                self._raw_csi_padded_len = length
                self._is_compressed = kind == SERIALIZED_CSI_TLV_TYPE_CSI_COMPRESSED
            elif kind == SERIALIZED_CSI_TLV_TYPE_CRC32:
                if length != 4:
                    raise ValueError("Invalid CRC32 TLV")
                if record_end != packet_len:
                    raise ValueError("CRC32 TLV must be last")
                self.crc32 = int.from_bytes(body, byteorder="little")
                # The checksum covers everything in front of the CRC record's own header.
                computed_crc = binascii.crc32(packet[:record_start]) & 0xFFFFFFFF
                if computed_crc != self.crc32:
                    raise ValueError(f"CSI TLV CRC32 mismatch (expected 0x{self.crc32:08x}, computed 0x{computed_crc:08x})")
                self._crc_valid = True

            cursor = record_end

        if not self._crc_valid:
            raise ValueError("CSI TLV CRC32 missing")
        if not self.rx_ctrl:
            raise ValueError("CSI TLV missing RX CTRL metadata")
        if self._raw_csi_tlv is None:
            return

        # The CSI record may be padded; trim it to its logical length.
        if self._is_compressed:
            wanted_len = 1 + COMPRESSED_TAP_COUNT * 4
        else:
            wanted_len = wifi_pkt_rx_ctrl_v3_t(self.rx_ctrl).rx_channel_estimate_len
        logical_csi_len = min(wanted_len, self._raw_csi_padded_len)
        self.buf = self._raw_csi_tlv[:logical_csi_len]
        self.csi_len = logical_csi_len

    def __bytes__(self):
        """Return the packet bytes exactly as passed to the constructor."""
        return self._raw

    @property
    def is_radar(self):
        """Whether the frame flags have the ``IS_RADAR`` bit set."""
        return (self.frame_flags & SERIALIZED_CSI_TLV_FRAME_FLAG_IS_RADAR) != 0

    @property
    def is_calib(self):
        """Whether the frame flags have the ``IS_CALIB`` bit set."""
        return (self.frame_flags & SERIALIZED_CSI_TLV_FRAME_FLAG_IS_CALIB) != 0

    @property
    def first_word_invalid(self):
        """Whether the frame flags have the ``FIRST_WORD_INVALID`` bit set."""
        return (self.frame_flags & SERIALIZED_CSI_TLV_FRAME_FLAG_FIRST_WORD_INVALID) != 0

    @property
    def acquire_force_lltf(self):
        """Whether the acquire flags have the ``FORCE_LLTF`` bit set."""
        return (self.acquire_flags & SERIALIZED_CSI_TLV_ACQUIRE_FLAG_FORCE_LLTF) != 0

    @property
    def acquire_lltf_8bit_mode(self):
        """Whether the acquire flags have the ``LLTF_8BIT_MODE`` bit set."""
        return (self.acquire_flags & SERIALIZED_CSI_TLV_ACQUIRE_FLAG_LLTF_8BIT_MODE) != 0

    @property
    def acquire_lltf_bit_mode(self):
        """Alias of :attr:`acquire_lltf_8bit_mode`."""
        return self.acquire_lltf_8bit_mode

    @property
    def is_compressed(self):
        """Whether the last CSI record seen was of the compressed type."""
        return self._is_compressed
[docs]
class radar_tx_report_tlv_t:
    """
    Parser for a radar TX report packet in TLV form.

    A packet starts with the 4-byte little-endian type header
    ``SPI_TYPE_HEADER_RADAR_TX_REPORT``, followed by records made of a 1-byte type,
    a 2-byte little-endian length and ``length`` value bytes. Records of unknown type
    are skipped. The packet must end with a CRC32 record whose checksum covers all
    bytes before that record.

    :param buf: Bytes-like packet contents.
    :raises ValueError: If the packet is truncated or malformed, the type header does not
                        match, a record is too short, or the CRC32 is missing or wrong.
    """

    def __init__(self, buf=None):
        raw = bytes(buf if buf is not None else b"")
        if len(raw) < 4:
            raise ValueError("Radar TX report TLV packet too short")
        self._raw = raw
        self.type_header = int.from_bytes(raw[0:4], byteorder="little")
        if self.type_header != SPI_TYPE_HEADER_RADAR_TX_REPORT:
            raise ValueError("Unexpected radar TX report type header")

        # Defaults for everything a record may (or may not) provide.
        self.source_mac = bytes(6)
        self.dest_mac = bytes(6)
        self.seq_ctrl = seq_ctrl_t(b"\x00\x00")
        self.frame_len = 0
        self.software_enqueue_timestamp_us = 0
        self.tx_count = 0
        self.rfswitch_state = rfswitch_state_t.SENSOR_RFSWITCH_UNKNOWN
        self.tx_power = -1
        self.flags = 0
        self.tx_status = 0
        self.ifidx = 0
        self.descriptor_slot = 0xFF
        self.txdesc_word0 = 0
        self.txdesc_word4 = 0
        self.txdesc_word8 = 0
        self.txdesc_word10 = 0
        self.timestamp_reg0 = 0
        self.timestamp_reg1 = 0
        self.timestamp_reg2 = 0
        self.antid = 0xFF
        self.crc32 = None
        self._crc_valid = False

        offset = 4
        while offset < len(raw):
            # Every record needs a full 3-byte header (type + 16-bit length).
            if offset + 3 > len(raw):
                raise ValueError("Malformed radar TX report TLV header")
            tlv_type = raw[offset]
            tlv_len = int.from_bytes(raw[offset + 1 : offset + 3], byteorder="little")
            tlv_start = offset
            offset += 3
            tlv_end = offset + tlv_len
            if tlv_end > len(raw):
                raise ValueError("Malformed radar TX report TLV length")
            value = raw[offset:tlv_end]

            if tlv_type == RADAR_TX_REPORT_TLV_TYPE_FRAME_META:
                if tlv_len < 16:
                    raise ValueError("Invalid radar TX report frame meta TLV")
                self.source_mac = bytes(value[0:6])
                self.dest_mac = bytes(value[6:12])
                self.seq_ctrl = seq_ctrl_t(value[12:14])
                self.frame_len = int.from_bytes(value[14:16], byteorder="little")
            elif tlv_type == RADAR_TX_REPORT_TLV_TYPE_TIMING_META:
                if tlv_len < 8:
                    raise ValueError("Invalid radar TX report timing meta TLV")
                self.software_enqueue_timestamp_us = int.from_bytes(value[0:8], byteorder="little")
            elif tlv_type == RADAR_TX_REPORT_TLV_TYPE_RADAR_META:
                # At least 8 bytes are required although only the first 6 are decoded.
                if tlv_len < 8:
                    raise ValueError("Invalid radar TX report radar meta TLV")
                self.tx_count = int.from_bytes(value[0:4], byteorder="little")
                self.rfswitch_state = value[4]
                self.tx_power = value[5]
            elif tlv_type == RADAR_TX_REPORT_TLV_TYPE_TX_META:
                # At least 8 bytes are required although only the first 5 are decoded.
                if tlv_len < 8:
                    raise ValueError("Invalid radar TX report TX meta TLV")
                self.flags = int.from_bytes(value[0:2], byteorder="little")
                self.tx_status = value[2]
                self.ifidx = value[3]
                self.descriptor_slot = value[4]
            elif tlv_type == RADAR_TX_REPORT_TLV_TYPE_RAW_META:
                if tlv_len < 28:
                    raise ValueError("Invalid radar TX report raw meta TLV")
                # Seven consecutive little-endian 32-bit words.
                (
                    self.txdesc_word0,
                    self.txdesc_word4,
                    self.txdesc_word8,
                    self.txdesc_word10,
                    self.timestamp_reg0,
                    self.timestamp_reg1,
                    self.timestamp_reg2,
                ) = (int.from_bytes(value[pos : pos + 4], byteorder="little") for pos in range(0, 28, 4))
            elif tlv_type == RADAR_TX_REPORT_TLV_TYPE_CRC32:
                if tlv_len != 4:
                    raise ValueError("Invalid radar TX report CRC32 TLV")
                if tlv_end != len(raw):
                    raise ValueError("Radar TX report CRC32 TLV must be last")
                self.crc32 = int.from_bytes(value, byteorder="little")
                # The checksum covers everything in front of the CRC record's own header.
                computed_crc = binascii.crc32(raw[:tlv_start]) & 0xFFFFFFFF
                if computed_crc != self.crc32:
                    raise ValueError(f"Radar TX report TLV CRC32 mismatch (expected 0x{self.crc32:08x}, computed 0x{computed_crc:08x})")
                self._crc_valid = True

            offset = tlv_end

        if not self._crc_valid:
            raise ValueError("Radar TX report TLV CRC32 missing")

    def __bytes__(self):
        """Return the packet bytes exactly as passed to the constructor."""
        return self._raw

    @property
    def tx_succeeded(self):
        """
        Whether ``tx_status`` is nonzero.

        NOTE(review): this assumes the firmware reports a nonzero status on success — confirm.
        """
        return self.tx_status != 0

    @property
    def has_hardware_tx_timestamp(self):
        """Whether the ``HAS_HW_TIMESTAMP`` bit is set in ``flags``."""
        return bool(self.flags & RADAR_TX_REPORT_FLAG_HAS_HW_TIMESTAMP)

    def get_hardware_tx_timestamp_ns(self) -> float:
        """
        Decode the raw ESP32-C61 TX timestamp registers into sensor-local nanoseconds.
        The wire format intentionally keeps the raw register values. This helper mirrors
        the low-level recovery formula from the firmware so analysis code can choose
        whether and how to use the decoded timestamp.

        :return: The decoded timestamp, or ``nan`` if the report carries no hardware timestamp.
        """
        if not self.has_hardware_tx_timestamp:
            return float("nan")
        raw = (((int(self.timestamp_reg0) * 80) + (int(self.timestamp_reg1) & 0x7F)) - 640) << 3
        return float(raw) * 1.5625

    def get_hardware_tx_phase_raw(self) -> int:
        """
        Extract the apparent signed 11-bit phase-ish field from timestamp register 2.

        The field occupies bits 7..17 of the register and is sign-extended as a
        two's-complement number.

        :return: The field value in the range ``-1024 .. 1023``.
        """
        phase = (int(self.timestamp_reg2) >> 7) & 0x7FF
        if phase & 0x400:
            # Sign bit set: subtract 2**11 to obtain the negative two's-complement value.
            phase -= 0x800
        return phase
[docs]
def unpack_lltf8_values(buf, pair_count):
    """
    Decode ``pair_count`` complex values stored as pairs of signed bytes.

    Consecutive ``int8`` pairs are read as (real, imaginary); each resulting value
    ``v`` is returned as ``-1j * conj(v)``.

    :param buf: Buffer holding at least ``2 * pair_count`` bytes.
    :param pair_count: Number of complex values to decode.
    :return: Array of ``complex64`` values.
    """
    byte_count = pair_count * 2
    components = np.frombuffer(buf[:byte_count], dtype=np.int8).astype(np.float32)
    # Reinterpret consecutive float32 pairs as one complex64 each.
    pairs = components.view(np.complex64)
    return -1.0j * np.conj(pairs)
def _decode_wire_complex_i16_scaled(buf, pair_count, tap_scale: float):
right_shift = int(np.frombuffer(buf[:1], dtype=np.uint8)[0])
values = np.frombuffer(buf[1 : 1 + pair_count * 4], dtype="<i2").astype(np.float32)
values *= float(1 << right_shift) / tap_scale
return (values[0::2] + 1.0j * values[1::2]).astype(np.complex64)
[docs]
def unpack_lltf12_values(buf, value_count: int) -> np.ndarray:
    """
    Decode ``value_count`` signed 12-bit values stored in little-endian 16-bit words.

    Only the low 12 bits of each word are used; they are sign-extended to ``int16``.

    :param buf: Buffer holding at least ``2 * value_count`` bytes.
    :param value_count: Number of values to decode.
    :return: Array of ``int16`` values in the range ``-2048 .. 2047``.
    """
    octets = np.frombuffer(buf[: value_count * 2], dtype=np.uint8)
    low_bytes = octets[0::2].astype(np.uint16)
    high_bytes = octets[1::2].astype(np.uint16)
    words = (low_bytes | (high_bytes << 8)).astype(np.uint16)
    # Keep the low 12 bits, then sign-extend: flip the sign bit and remove its weight.
    low12 = (words & 0x0FFF).astype(np.int16)
    return ((low12 ^ 0x0800) - 0x0800).astype(np.int16)
def _ifftshift_1d(values: np.ndarray) -> np.ndarray:
half = values.shape[-1] // 2
return np.concatenate((values[..., half:], values[..., :half]), axis=-1)
def _fftshift_1d(values: np.ndarray) -> np.ndarray:
half = values.shape[-1] // 2
return np.concatenate((values[..., -half:], values[..., :-half]), axis=-1)
def _centered_fft(values: np.ndarray, fft_size: int) -> np.ndarray:
    """
    Compute an FFT whose input and output are both in centered order.

    :param values: Input samples in centered order.
    :param fft_size: Transform length passed to ``np.fft.fft`` as ``n``.
    :return: Centered spectrum as ``complex64``.
    """
    natural_order = _ifftshift_1d(values)
    spectrum = np.fft.fft(natural_order, n=fft_size)
    return _fftshift_1d(spectrum).astype(np.complex64)
def _active_slice(fft_size: int, active_count: int) -> slice:
start = (fft_size - active_count) // 2
return slice(start, start + active_count)
def _clamp_s32(value: int) -> int:
return max(min(int(value), (1 << 31) - 1), -(1 << 31))
def _reverse_bits16(x: int, order: int) -> int:
b = x & 0xFFFF
b = (((b & 0xFF00) >> 8) | ((b & 0x00FF) << 8)) & 0xFFFF
b = (((b & 0xF0F0) >> 4) | ((b & 0x0F0F) << 4)) & 0xFFFF
b = (((b & 0xCCCC) >> 2) | ((b & 0x3333) << 2)) & 0xFFFF
b = (((b & 0xAAAA) >> 1) | ((b & 0x5555) << 1)) & 0xFFFF
return b >> (16 - order)
def _fix32_mpy(a: int, b: int, precise_rounding: bool = False) -> int:
mul = int(a) * int(b)
if precise_rounding:
mulval = mul >> 30
tmp = mulval & 0x01
return int((mulval >> 1) + tmp)
return int(mul >> 32)
def _sensor_centered_spectrum_to_ht20_observed_taps_fix32(centered_spectrum: np.ndarray, precise_rounding: bool = False) -> np.ndarray:
    """
    Run the direct fixed-point tap model with the HT20 parameters.

    :param centered_spectrum: Centered spectrum with ``COMPRESSED_HT20_FFT_SIZE`` bins.
    :param precise_rounding: Forwarded to the direct model.
    :return: ``COMPRESSED_TAP_COUNT`` taps as ``complex64``.
    """
    ht20_parameters = {
        "fft_size": COMPRESSED_HT20_FFT_SIZE,
        "shift": COMPRESSED_HT20_FIX32_SHIFT,
        "tap_start": COMPRESSED_HT20_TAP_START,
    }
    return _sensor_centered_spectrum_to_direct_observed_taps_fix32(
        centered_spectrum,
        precise_rounding=precise_rounding,
        **ht20_parameters,
    )
def _sensor_centered_spectrum_to_direct_observed_taps_fix32(
    centered_spectrum: np.ndarray,
    fft_size: int,
    shift: int,
    tap_start: int,
    precise_rounding: bool = False,
) -> np.ndarray:
    """
    Model a fixed-point radix-2 FFT applied directly to a centered spectrum and
    return a window of the resulting taps.

    The spectrum is brought into natural order, quantized to integers with ``shift``
    fractional bits (both components saturated to the signed 32-bit range), bit-reverse
    permuted and run through in-place butterflies with twiddle factors
    ``exp(+2j * pi * m / istep)``. Intermediate results are saturated to 32 bits.
    NOTE(review): presumably this mirrors the sensor firmware's integer FFT — confirm.

    :param centered_spectrum: Centered spectrum with ``fft_size`` bins.
    :param fft_size: Transform length (a power of two).
    :param shift: Number of fractional bits used for quantization.
    :param tap_start: Index of the first returned tap in the centered output.
    :param precise_rounding: Selects the rounding variant of the fixed-point multiply
                             and the matching per-stage scaling.
    :return: ``COMPRESSED_TAP_COUNT`` taps as ``complex64``.
    """
    scale_divisor = 8.0
    quantization = 1 << shift
    twiddle_scale = np.iinfo(np.int32).max
    fr = np.zeros((fft_size,), dtype=np.int64)
    fi = np.zeros((fft_size,), dtype=np.int64)
    natural_input = np.conj(np.fft.ifftshift(np.asarray(centered_spectrum, dtype=np.complex64)))
    for fft_index, coeff in enumerate(natural_input):
        # Saturate both components, so real and imaginary parts are treated alike.
        fr[fft_index] = _clamp_s32(int(np.rint(coeff.real * quantization)))
        fi[fft_index] = _clamp_s32(-int(np.rint(coeff.imag * quantization)))

    # Bit-reversal permutation.
    order = int(np.log2(fft_size))
    for i in range(1, fft_size):
        j = _reverse_bits16(i, order)
        if j <= i:
            continue
        fr[i], fr[j] = fr[j], fr[i]
        fi[i], fi[j] = fi[j], fi[i]

    stage = 0
    half_span = 1
    while half_span < fft_size:
        istep = half_span << 1
        for m in range(half_span):
            angle = (2.0 * np.pi * m) / istep
            wr = int(np.rint(np.cos(angle) * twiddle_scale))
            wi = int(np.rint(np.sin(angle) * twiddle_scale))
            for i in range(m, fft_size, istep):
                j = i + half_span
                tmpr = int(fr[j])
                tmpi = int(fi[j])
                zr = _fix32_mpy(wr, tmpr, precise_rounding) - _fix32_mpy(wi, tmpi, precise_rounding)
                zi = _fix32_mpy(wr, tmpi, precise_rounding) + _fix32_mpy(wi, tmpr, precise_rounding)
                qr = int(fr[i])
                qi = int(fi[i])
                if stage & 1:
                    # Odd stages halve the result. The plain multiply already yields half
                    # the product, so only the precise variant needs an explicit shift.
                    if precise_rounding:
                        zr >>= 1
                        zi >>= 1
                    qr >>= 1
                    qi >>= 1
                else:
                    # Even stages keep the scale: undo the halving of the plain multiply.
                    if not precise_rounding:
                        zr <<= 1
                        zi <<= 1
                fr[j] = _clamp_s32(qr - zr)
                fi[j] = _clamp_s32(qi - zi)
                fr[i] = _clamp_s32(qr + zr)
                fi[i] = _clamp_s32(qi + zi)
        half_span = istep
        stage += 1

    centered_cir = np.fft.fftshift((fr + 1.0j * fi).astype(np.complex128) / float(quantization) / scale_divisor)
    tap_indices = tap_start + np.arange(COMPRESSED_TAP_COUNT)
    return centered_cir[tap_indices].astype(np.complex64)
def _sensor_centered_spectrum_to_ht40_observed_taps_fix32(centered_spectrum: np.ndarray) -> np.ndarray:
    """
    Model a fixed-point radix-2 FFT of an HT40 spectrum and return a window of the taps.

    The spectrum is first passed through ``_sensor_ht40_model_input``, brought into natural
    order, quantized with ``COMPRESSED_HT40_FIX32_SHIFT`` fractional bits and saturated to
    the signed 32-bit range. Butterflies use twiddle factors ``exp(+2j * pi * m / istep)``.
    NOTE(review): presumably this mirrors the sensor firmware's integer FFT — confirm.

    :param centered_spectrum: Centered spectrum with ``COMPRESSED_HT40_FFT_SIZE`` bins.
    :return: ``COMPRESSED_TAP_COUNT`` taps starting at ``COMPRESSED_HT40_TAP_START``, as ``complex64``.
    """
    fft_size = COMPRESSED_HT40_FFT_SIZE
    shift = COMPRESSED_HT40_FIX32_SHIFT
    scale_divisor = 8.0
    fr = np.zeros((fft_size,), dtype=np.int64)
    fi = np.zeros((fft_size,), dtype=np.int64)
    natural_input = np.conj(_ifftshift_1d(_sensor_ht40_model_input(np.asarray(centered_spectrum, dtype=np.complex64))))
    # Quantize; the conjugation above and the negation below cancel for the imaginary part.
    for fft_index, coeff in enumerate(natural_input):
        fr[fft_index] = _clamp_s32(int(np.rint(coeff.real * (1 << shift))))
        fi[fft_index] = _clamp_s32(-int(np.rint(coeff.imag * (1 << shift))))
    # Bit-reversal permutation.
    order = int(np.log2(fft_size))
    for i in range(1, fft_size):
        j = _reverse_bits16(i, order)
        if j <= i:
            continue
        fr[i], fr[j] = fr[j], fr[i]
        fi[i], fi[j] = fi[j], fi[i]
    stage = 0
    l = 1
    while l < fft_size:
        istep = l << 1
        for m in range(l):
            angle = (2.0 * np.pi * m) / istep
            wr = int(np.rint(np.cos(angle) * np.iinfo(np.int32).max))
            wi = int(np.rint(np.sin(angle) * np.iinfo(np.int32).max))
            for i in range(m, fft_size, istep):
                j = i + l
                tmpr = int(fr[j])
                tmpi = int(fi[j])
                # _fix32_mpy shifts the product right by 32 bits here.
                zr = _fix32_mpy(wr, tmpr) - _fix32_mpy(wi, tmpi)
                zi = _fix32_mpy(wr, tmpi) + _fix32_mpy(wi, tmpr)
                qr = int(fr[i])
                qi = int(fi[i])
                if stage & 1:
                    # Odd stages: halve the untouched operand as well.
                    qr >>= 1
                    qi >>= 1
                else:
                    # Even stages: double the rotated operand, saturating to 32 bits.
                    zr = _clamp_s32(zr << 1)
                    zi = _clamp_s32(zi << 1)
                fr[j] = _clamp_s32(qr - zr)
                fi[j] = _clamp_s32(qi - zi)
                fr[i] = _clamp_s32(qr + zr)
                fi[i] = _clamp_s32(qi + zi)
        l = istep
        stage += 1
    # Back to floating point and centered order, then cut out the tap window.
    centered_cir = _fftshift_1d((fr + 1.0j * fi).astype(np.complex128) / float(1 << shift) / scale_divisor)
    observed = np.zeros((COMPRESSED_TAP_COUNT,), dtype=np.complex64)
    for i in range(COMPRESSED_TAP_COUNT):
        centered_index = COMPRESSED_HT40_TAP_START + i
        observed[i] = np.complex64(centered_cir[centered_index])
    return observed
def _sensor_ht40_model_input(centered_spectrum: np.ndarray) -> np.ndarray:
    """
    Keep only the two HT40 sub-bands of a centered spectrum and zero everything else.

    The active region spans ``2 * HT_COEFFICIENTS_PER_CHANNEL + HT40_GAP_SUBCARRIERS`` bins
    centered in ``COMPRESSED_HT40_FFT_SIZE`` bins; the gap bins between the lower and the
    upper sub-band are left at zero.

    :param centered_spectrum: Centered spectrum.
    :return: ``complex64`` array of the same shape with only the sub-band bins copied over.
    """
    band = _active_slice(COMPRESSED_HT40_FFT_SIZE, HT_COEFFICIENTS_PER_CHANNEL * 2 + HT40_GAP_SUBCARRIERS)
    lower_stop = band.start + HT_COEFFICIENTS_PER_CHANNEL
    upper_start = lower_stop + HT40_GAP_SUBCARRIERS

    # Starts out all-zero, so the gap and the guard bins need no explicit clearing.
    modeled = np.zeros_like(centered_spectrum, dtype=np.complex64)
    for sub_band in (slice(band.start, lower_stop), slice(upper_start, band.stop)):
        modeled[sub_band] = centered_spectrum[sub_band]
    return modeled
def _sensor_centered_spectrum_to_lltf_force_observed_taps_fix32(centered_spectrum: np.ndarray) -> np.ndarray:
    """
    Model the fixed-point FFT for the "forced L-LTF" case and return a window of the taps.

    Only every second input coefficient is used: even-indexed coefficients (except the last)
    are copied, the last one is linearly extrapolated from the two preceding even-indexed
    ones, and odd-indexed coefficients are the mean of their two neighbours. The result is
    quantized with ``COMPRESSED_LLTF_FIX32_SHIFT`` fractional bits and run through the
    radix-2 butterflies with twiddle factors ``exp(+2j * pi * m / istep)``.
    NOTE(review): presumably this mirrors the sensor firmware's integer FFT — confirm.

    :param centered_spectrum: The ``LEGACY_COEFFICIENTS_PER_CHANNEL`` active coefficients.
    :return: ``COMPRESSED_TAP_COUNT`` taps starting at ``COMPRESSED_LLTF_TAP_START``, as ``complex64``.
    """
    fft_size = COMPRESSED_LLTF_FFT_SIZE
    shift = COMPRESSED_LLTF_FIX32_SHIFT
    scale_divisor = 8.0
    active_start = (fft_size - LEGACY_COEFFICIENTS_PER_CHANNEL) // 2
    fr = np.zeros((fft_size,), dtype=np.int64)
    fi = np.zeros((fft_size,), dtype=np.int64)
    coeff = np.zeros((LEGACY_COEFFICIENTS_PER_CHANNEL,), dtype=np.complex64)
    active_spectrum = np.asarray(centered_spectrum, dtype=np.complex64)
    # Copy even-indexed coefficients, extrapolate the last one, interpolate the odd ones.
    coeff[:-1:2] = active_spectrum[:-1:2]
    coeff[-1] = 2.0 * coeff[-3] - coeff[-5]
    coeff[1::2] = 0.5 * (coeff[0:-1:2] + coeff[2::2])
    # Quantize and place each coefficient at its natural-order FFT bin.
    # NOTE(review): unlike the non-forced L-LTF model, the quantized values are not saturated here.
    for i in range(LEGACY_COEFFICIENTS_PER_CHANNEL):
        centered_index = active_start + i
        fft_index = (centered_index + fft_size // 2) % fft_size
        fr[fft_index] = int(np.rint(coeff[i].real * (1 << shift)))
        fi[fft_index] = int(np.rint(coeff[i].imag * (1 << shift)))
    # Bit-reversal permutation.
    order = int(np.log2(fft_size))
    for i in range(1, fft_size):
        j = _reverse_bits16(i, order)
        if j <= i:
            continue
        fr[i], fr[j] = fr[j], fr[i]
        fi[i], fi[j] = fi[j], fi[i]
    stage = 0
    l = 1
    while l < fft_size:
        istep = l << 1
        for m in range(l):
            angle = (2.0 * np.pi * m) / istep
            wr = int(np.rint(np.cos(angle) * np.iinfo(np.int32).max))
            wi = int(np.rint(np.sin(angle) * np.iinfo(np.int32).max))
            for i in range(m, fft_size, istep):
                j = i + l
                tmpr = int(fr[j])
                tmpi = int(fi[j])
                # _fix32_mpy shifts the product right by 32 bits here.
                zr = _fix32_mpy(wr, tmpr) - _fix32_mpy(wi, tmpi)
                zi = _fix32_mpy(wr, tmpi) + _fix32_mpy(wi, tmpr)
                qr = int(fr[i])
                qi = int(fi[i])
                if stage & 1:
                    # Odd stages: halve the untouched operand as well.
                    qr >>= 1
                    qi >>= 1
                else:
                    # Even stages: double the rotated operand (no saturation at this point).
                    zr <<= 1
                    zi <<= 1
                fr[j] = _clamp_s32(qr - zr)
                fi[j] = _clamp_s32(qi - zi)
                fr[i] = _clamp_s32(qr + zr)
                fi[i] = _clamp_s32(qi + zi)
        l = istep
        stage += 1
    # Read the tap window straight from the natural-order result and convert to floating point.
    observed = np.zeros((COMPRESSED_TAP_COUNT,), dtype=np.complex64)
    for i in range(COMPRESSED_TAP_COUNT):
        centered_index = COMPRESSED_LLTF_TAP_START + i
        fft_index = (centered_index + fft_size // 2) % fft_size
        observed[i] = np.complex64((float(fr[fft_index]) + 1.0j * float(fi[fft_index])) / float(1 << shift) / scale_divisor)
    return observed
def _sensor_centered_spectrum_to_lltf_observed_taps_fix32(centered_spectrum: np.ndarray) -> np.ndarray:
    """
    Model the fixed-point FFT for the regular L-LTF case and return a window of the taps.

    Before the transform, the last coefficient is rebuilt from its own real part and the
    imaginary part of the third-from-last coefficient, the center coefficient is replaced
    by the mean of the coefficients two positions to either side, and every odd-indexed
    coefficient is replaced by the mean of its two neighbours. The result is quantized with
    ``COMPRESSED_LLTF_FIX32_SHIFT`` fractional bits, saturated to the signed 32-bit range
    and run through the radix-2 butterflies with twiddle factors ``exp(+2j * pi * m / istep)``.
    NOTE(review): presumably this mirrors the sensor firmware's integer FFT — confirm.

    :param centered_spectrum: The ``LEGACY_COEFFICIENTS_PER_CHANNEL`` active coefficients.
    :return: ``COMPRESSED_TAP_COUNT`` taps starting at ``COMPRESSED_LLTF_TAP_START``, as ``complex64``.
    """
    fft_size = COMPRESSED_LLTF_FFT_SIZE
    shift = COMPRESSED_LLTF_FIX32_SHIFT
    scale_divisor = 8.0
    active_start = (fft_size - LEGACY_COEFFICIENTS_PER_CHANNEL) // 2
    fr = np.zeros((fft_size,), dtype=np.int64)
    fi = np.zeros((fft_size,), dtype=np.int64)
    # Work on a copy so the caller's array is not modified.
    coeff = np.asarray(centered_spectrum, dtype=np.complex64).copy()
    coeff[LEGACY_COEFFICIENTS_PER_CHANNEL - 1] = np.complex64(coeff[-1].real + 1.0j * coeff[-3].imag)
    coeff[LEGACY_COEFFICIENTS_PER_CHANNEL // 2] = 0.5 * (coeff[LEGACY_COEFFICIENTS_PER_CHANNEL // 2 - 2] + coeff[LEGACY_COEFFICIENTS_PER_CHANNEL // 2 + 2])
    coeff[1::2] = 0.5 * (coeff[0:-1:2] + coeff[2::2])
    # Quantize, saturate and place each coefficient at its natural-order FFT bin.
    for i in range(LEGACY_COEFFICIENTS_PER_CHANNEL):
        centered_index = active_start + i
        fft_index = (centered_index + fft_size // 2) % fft_size
        fr[fft_index] = _clamp_s32(int(np.rint(coeff[i].real * (1 << shift))))
        fi[fft_index] = _clamp_s32(int(np.rint(coeff[i].imag * (1 << shift))))
    # Bit-reversal permutation.
    order = int(np.log2(fft_size))
    for i in range(1, fft_size):
        j = _reverse_bits16(i, order)
        if j <= i:
            continue
        fr[i], fr[j] = fr[j], fr[i]
        fi[i], fi[j] = fi[j], fi[i]
    stage = 0
    l = 1
    while l < fft_size:
        istep = l << 1
        for m in range(l):
            angle = (2.0 * np.pi * m) / istep
            wr = int(np.rint(np.cos(angle) * np.iinfo(np.int32).max))
            wi = int(np.rint(np.sin(angle) * np.iinfo(np.int32).max))
            for i in range(m, fft_size, istep):
                j = i + l
                tmpr = int(fr[j])
                tmpi = int(fi[j])
                # _fix32_mpy shifts the product right by 32 bits here.
                zr = _fix32_mpy(wr, tmpr) - _fix32_mpy(wi, tmpi)
                zi = _fix32_mpy(wr, tmpi) + _fix32_mpy(wi, tmpr)
                qr = int(fr[i])
                qi = int(fi[i])
                if stage & 1:
                    # Odd stages: halve the untouched operand as well.
                    qr >>= 1
                    qi >>= 1
                else:
                    # Even stages: double the rotated operand, saturating to 32 bits.
                    zr = _clamp_s32(zr << 1)
                    zi = _clamp_s32(zi << 1)
                fr[j] = _clamp_s32(qr - zr)
                fi[j] = _clamp_s32(qi - zi)
                fr[i] = _clamp_s32(qr + zr)
                fi[i] = _clamp_s32(qi + zi)
        l = istep
        stage += 1
    # Read the tap window straight from the natural-order result and convert to floating point.
    observed = np.zeros((COMPRESSED_TAP_COUNT,), dtype=np.complex64)
    for i in range(COMPRESSED_TAP_COUNT):
        centered_index = COMPRESSED_LLTF_TAP_START + i
        fft_index = (centered_index + fft_size // 2) % fft_size
        observed[i] = np.complex64((float(fr[fft_index]) + 1.0j * float(fi[fft_index])) / float(1 << shift) / scale_divisor)
    return observed
def _sensor_centered_spectrum_to_lltf_8bit_mode_observed_taps_fix32(centered_spectrum: np.ndarray) -> np.ndarray:
    """
    Run the direct fixed-point tap model for L-LTF data acquired in 8-bit mode.

    The active coefficients are embedded into a zeroed ``COMPRESSED_LLTF_FFT_SIZE``-bin
    spectrum and the center coefficient is cleared before the transform.

    :param centered_spectrum: The ``LEGACY_COEFFICIENTS_PER_CHANNEL`` active coefficients.
    :return: ``COMPRESSED_TAP_COUNT`` taps as ``complex64``.
    """
    band = _active_slice(COMPRESSED_LLTF_FFT_SIZE, LEGACY_COEFFICIENTS_PER_CHANNEL)
    center_bin = band.start + LEGACY_COEFFICIENTS_PER_CHANNEL // 2

    full_spectrum = np.zeros((COMPRESSED_LLTF_FFT_SIZE,), dtype=np.complex64)
    full_spectrum[band] = np.asarray(centered_spectrum, dtype=np.complex64)
    full_spectrum[center_bin] = 0.0

    return _sensor_centered_spectrum_to_direct_observed_taps_fix32(
        full_spectrum,
        fft_size=COMPRESSED_LLTF_FFT_SIZE,
        shift=COMPRESSED_LLTF_8BIT_MODE_FIX32_SHIFT,
        tap_start=COMPRESSED_LLTF_TAP_START,
    )
def _build_ht20_fix32_tap_correction() -> np.ndarray:
    """
    Build the tap correction matrix for the HT20 fixed-point model.

    For each tap position a unit impulse is transformed to a spectrum, masked to the
    active subcarriers without the center one and fed through the HT20 fixed-point model.
    The responses form the columns of a matrix whose pseudo-inverse is returned.

    :return: ``COMPRESSED_TAP_COUNT`` x ``COMPRESSED_TAP_COUNT`` matrix as ``complex64``.
    """
    band = _active_slice(COMPRESSED_HT20_FFT_SIZE, HT_COEFFICIENTS_PER_CHANNEL)
    center_bin = band.start + HT_COEFFICIENTS_PER_CHANNEL // 2
    response = np.zeros((COMPRESSED_TAP_COUNT, COMPRESSED_TAP_COUNT), dtype=np.complex64)

    for tap in range(COMPRESSED_TAP_COUNT):
        impulse = np.zeros((COMPRESSED_HT20_FFT_SIZE,), dtype=np.complex64)
        impulse[COMPRESSED_HT20_TAP_START + tap] = 1.0
        spectrum = _centered_fft(impulse, COMPRESSED_HT20_FFT_SIZE)

        masked = np.zeros((COMPRESSED_HT20_FFT_SIZE,), dtype=np.complex64)
        masked[band] = spectrum[band]
        masked[center_bin] = 0.0

        response[:, tap] = _sensor_centered_spectrum_to_ht20_observed_taps_fix32(masked)

    return np.linalg.pinv(response).astype(np.complex64)
def _build_he20_fix32_tap_correction() -> np.ndarray:
    """
    Build the tap correction matrix for compressed HE20 CSI.

    Each column holds the observed taps produced by the direct fix32 model for
    a unit impulse at one tap position, after the impulse spectrum has been
    limited to the active HE20 subcarriers and the three bins around the
    center have been zeroed. The pseudo-inverse of that matrix is returned.

    :return: ``COMPRESSED_TAP_COUNT x COMPRESSED_TAP_COUNT`` complex64 matrix.
    """
    fft_size = COMPRESSED_HE20_FFT_SIZE
    n_taps = COMPRESSED_TAP_COUNT
    band = _active_slice(fft_size, HE20_COEFFICIENTS_PER_CHANNEL)
    # The blanked region starts one bin below the middle active bin; its width
    # reuses the HT40 gap constant.
    hole_begin = band.start + (HE20_COEFFICIENTS_PER_CHANNEL // 2) - 1
    hole_end = hole_begin + HT40_GAP_SUBCARRIERS

    forward = np.zeros((n_taps, n_taps), dtype=np.complex64)
    for tap in range(n_taps):
        impulse = np.zeros((fft_size,), dtype=np.complex64)
        impulse[COMPRESSED_HE20_TAP_START + tap] = 1.0
        response = _centered_fft(impulse, fft_size)

        visible = np.zeros((fft_size,), dtype=np.complex64)
        visible[band] = response[band]
        visible[hole_begin:hole_end] = 0.0

        forward[:, tap] = _sensor_centered_spectrum_to_direct_observed_taps_fix32(
            visible,
            fft_size,
            COMPRESSED_HE20_FIX32_SHIFT,
            COMPRESSED_HE20_TAP_START,
        )
    return np.linalg.pinv(forward).astype(np.complex64)
def _build_lltf_force_fix32_tap_correction() -> np.ndarray:
    """
    Build the tap correction matrix for compressed L-LTF CSI in "force" mode.

    A unit impulse is placed at each tap position in turn; the active part of
    its centered FFT is passed through the force-L-LTF fix32 observed-tap
    model and stored as one matrix column. The pseudo-inverse is returned.

    :return: ``COMPRESSED_TAP_COUNT x COMPRESSED_TAP_COUNT`` complex64 matrix.
    """
    fft_size = COMPRESSED_LLTF_FFT_SIZE
    n_taps = COMPRESSED_TAP_COUNT
    band = _active_slice(fft_size, LEGACY_COEFFICIENTS_PER_CHANNEL)

    forward = np.zeros((n_taps, n_taps), dtype=np.complex64)
    for tap in range(n_taps):
        impulse = np.zeros((fft_size,), dtype=np.complex64)
        impulse[COMPRESSED_LLTF_TAP_START + tap] = 1.0
        response = _centered_fft(impulse, fft_size)
        forward[:, tap] = _sensor_centered_spectrum_to_lltf_force_observed_taps_fix32(response[band].copy())
    return np.linalg.pinv(forward).astype(np.complex64)
def _build_lltf_fix32_tap_correction() -> np.ndarray:
    """
    Build the tap correction matrix for compressed L-LTF CSI (default mode).

    A unit impulse is placed at each tap position in turn; the active part of
    its centered FFT is passed through the L-LTF fix32 observed-tap model and
    stored as one matrix column. The pseudo-inverse is returned.

    :return: ``COMPRESSED_TAP_COUNT x COMPRESSED_TAP_COUNT`` complex64 matrix.
    """
    fft_size = COMPRESSED_LLTF_FFT_SIZE
    n_taps = COMPRESSED_TAP_COUNT
    band = _active_slice(fft_size, LEGACY_COEFFICIENTS_PER_CHANNEL)

    forward = np.zeros((n_taps, n_taps), dtype=np.complex64)
    for tap in range(n_taps):
        impulse = np.zeros((fft_size,), dtype=np.complex64)
        impulse[COMPRESSED_LLTF_TAP_START + tap] = 1.0
        response = _centered_fft(impulse, fft_size)
        forward[:, tap] = _sensor_centered_spectrum_to_lltf_observed_taps_fix32(response[band].copy())
    return np.linalg.pinv(forward).astype(np.complex64)
def _build_lltf_8bit_mode_fix32_tap_correction() -> np.ndarray:
    """
    Build the tap correction matrix for compressed L-LTF CSI in 8-bit mode.

    A unit impulse is placed at each tap position in turn; the active part of
    its centered FFT is passed through the 8-bit-mode L-LTF fix32 observed-tap
    model and stored as one matrix column. The pseudo-inverse is returned.

    :return: ``COMPRESSED_TAP_COUNT x COMPRESSED_TAP_COUNT`` complex64 matrix.
    """
    fft_size = COMPRESSED_LLTF_FFT_SIZE
    n_taps = COMPRESSED_TAP_COUNT
    band = _active_slice(fft_size, LEGACY_COEFFICIENTS_PER_CHANNEL)

    forward = np.zeros((n_taps, n_taps), dtype=np.complex64)
    for tap in range(n_taps):
        impulse = np.zeros((fft_size,), dtype=np.complex64)
        impulse[COMPRESSED_LLTF_TAP_START + tap] = 1.0
        response = _centered_fft(impulse, fft_size)
        forward[:, tap] = _sensor_centered_spectrum_to_lltf_8bit_mode_observed_taps_fix32(response[band].copy())
    return np.linalg.pinv(forward).astype(np.complex64)
def _build_ht40_fix32_tap_correction() -> np.ndarray:
    """
    Build the tap correction matrix for compressed HT40 CSI.

    Each column holds the observed taps produced by the HT40 fix32 model for a
    unit impulse at one tap position, after the impulse spectrum has been
    limited to the active HT40 span and the gap between the two channels has
    been zeroed. The pseudo-inverse of that matrix is returned.

    :return: ``COMPRESSED_TAP_COUNT x COMPRESSED_TAP_COUNT`` complex64 matrix.
    """
    fft_size = COMPRESSED_HT40_FFT_SIZE
    n_taps = COMPRESSED_TAP_COUNT
    span_width = HT_COEFFICIENTS_PER_CHANNEL * 2 + HT40_GAP_SUBCARRIERS
    band = _active_slice(fft_size, span_width)
    # The gap starts right after the first channel's coefficients.
    hole_begin = band.start + HT_COEFFICIENTS_PER_CHANNEL
    hole_end = hole_begin + HT40_GAP_SUBCARRIERS

    forward = np.zeros((n_taps, n_taps), dtype=np.complex64)
    for tap in range(n_taps):
        impulse = np.zeros((fft_size,), dtype=np.complex64)
        impulse[COMPRESSED_HT40_TAP_START + tap] = 1.0
        response = _centered_fft(impulse, fft_size)

        visible = np.zeros((fft_size,), dtype=np.complex64)
        visible[band] = response[band]
        visible[hole_begin:hole_end] = 0.0

        forward[:, tap] = _sensor_centered_spectrum_to_ht40_observed_taps_fix32(visible)
    return np.linalg.pinv(forward).astype(np.complex64)
# Tap correction matrices, computed once when the module is imported.
# Each one is the pseudo-inverse returned by the matching builder and is
# applied to the observed taps by the decode_compressed_* functions.
# L-LTF variants: force mode, default mode, 8-bit mode.
_COMPRESSED_LLTF_FORCE_FIX32_CORRECTION = _build_lltf_force_fix32_tap_correction()
_COMPRESSED_LLTF_FIX32_CORRECTION = _build_lltf_fix32_tap_correction()
_COMPRESSED_LLTF_8BIT_MODE_FIX32_CORRECTION = _build_lltf_8bit_mode_fix32_tap_correction()
# HT20 / HT40 / HE20 variants.
_COMPRESSED_HT20_FIX32_CORRECTION = _build_ht20_fix32_tap_correction()
_COMPRESSED_HT40_FIX32_CORRECTION = _build_ht40_fix32_tap_correction()
_COMPRESSED_HE20_FIX32_CORRECTION = _build_he20_fix32_tap_correction()
[docs]
def interpolate_lltf_gap(csi_lltf: np.ndarray) -> None:
    """
    Replace the L-LTF DC subcarrier with the mean of its two neighbours.

    The array is modified in place; nothing is returned.

    :param csi_lltf: Complex L-LTF CSI array. The last dimension must contain
        :data:`LEGACY_COEFFICIENTS_PER_CHANNEL` subcarriers in ascending order
        ``-26..26``. Any leading dimensions are preserved.
    """
    mid = LEGACY_COEFFICIENTS_PER_CHANNEL // 2
    below = csi_lltf[..., mid - 1]
    above = csi_lltf[..., mid + 1]
    csi_lltf[..., mid] = 0.5 * (below + above)
[docs]
def interpolate_ht20ltf_gap(csi_ht20: np.ndarray) -> None:
    """
    Replace the HT20-LTF DC subcarrier with the mean of its two neighbours.

    The array is modified in place; nothing is returned.

    :param csi_ht20: Complex HT20-LTF CSI array. The last dimension must contain
        :data:`HT_COEFFICIENTS_PER_CHANNEL` subcarriers in ascending order
        ``-28..28``. Any leading dimensions are preserved.
    """
    mid = HT_COEFFICIENTS_PER_CHANNEL // 2
    below = csi_ht20[..., mid - 1]
    above = csi_ht20[..., mid + 1]
    csi_ht20[..., mid] = 0.5 * (below + above)
[docs]
def interpolate_ht40ltf_gap(csi_ht40: np.ndarray) -> None:
    """
    Linearly interpolate the HT40 gap subcarriers between the two channels.

    The gap bins are filled from the last bin before the gap and the first bin
    after it. The array is modified in place; nothing is returned.

    :param csi_ht40: Complex HT40-LTF CSI array. The last dimension must contain
        ``2 * HT_COEFFICIENTS_PER_CHANNEL + HT40_GAP_SUBCARRIERS`` subcarriers
        in ascending order ``-58..58``. Any leading dimensions are preserved.
    """
    anchor_lo = HT_COEFFICIENTS_PER_CHANNEL - 1
    anchor_hi = HT_COEFFICIENTS_PER_CHANNEL + HT40_GAP_SUBCARRIERS
    span = anchor_hi - anchor_lo

    # Offsets 1 .. span-1 measured from the lower anchor address the gap bins.
    offsets = np.arange(1, span)
    gap_bins = anchor_lo + offsets
    weight_hi = offsets / span

    lo_value = csi_ht40[..., anchor_lo]
    hi_value = csi_ht40[..., anchor_hi]
    csi_ht40[..., gap_bins] = weight_hi * hi_value[..., np.newaxis] + (1 - weight_hi) * lo_value[..., np.newaxis]
[docs]
def interpolate_he20ltf_gaps(csi_he20: np.ndarray) -> None:
    """
    Linearly interpolate the HE20 invalid subcarriers ``-1, 0, 1``.

    The three center bins are filled from the bins two positions below and
    above the center. The array is modified in place; nothing is returned.

    :param csi_he20: Complex HE20-LTF CSI array. The last dimension must contain
        :data:`HE20_COEFFICIENTS_PER_CHANNEL` subcarriers in ascending order
        ``-122..122``. Any leading dimensions are preserved.
    """
    mid = HE20_COEFFICIENTS_PER_CHANNEL // 2
    anchor_lo = mid - 2
    anchor_hi = mid + 2
    span = anchor_hi - anchor_lo

    # Offsets 1 .. span-1 measured from the lower anchor address the gap bins.
    offsets = np.arange(1, span)
    gap_bins = anchor_lo + offsets
    weight_hi = offsets / span

    lo_value = csi_he20[..., anchor_lo]
    hi_value = csi_he20[..., anchor_hi]
    csi_he20[..., gap_bins] = weight_hi * hi_value[..., np.newaxis] + (1 - weight_hi) * lo_value[..., np.newaxis]
def _decode_compressed_tap_window(
    buf,
    fft_size: int,
    tap_start: int,
    tap_count: int,
    active_count: int,
    correction: np.ndarray,
    tap_scale: float,
) -> np.ndarray:
    """
    Turn a buffer of compressed taps into the active part of a centered spectrum.

    The wire taps are decoded and scaled, multiplied by ``correction``, written
    into a zeroed centered CIR of length ``fft_size`` starting at ``tap_start``,
    and transformed with the centered FFT.

    :param buf: Wire buffer handed to the tap decoder.
    :param fft_size: Length of the centered CIR / spectrum.
    :param tap_start: Index of the first tap inside the centered CIR.
    :param tap_count: Number of taps to decode and place.
    :param active_count: Number of active subcarriers to return.
    :param correction: Matrix applied to the decoded taps.
    :param tap_scale: Scale passed to the tap decoder.
    :return: Copy of the active-subcarrier slice of the centered spectrum.
    """
    wire_taps = _decode_wire_complex_i16_scaled(buf, tap_count, tap_scale)
    fixed_taps = np.matmul(correction, wire_taps.astype(np.complex64))

    cir = np.zeros((fft_size,), dtype=np.complex64)
    window_end = tap_start + tap_count
    cir[tap_start:window_end] = fixed_taps

    spectrum = _centered_fft(cir, fft_size)
    band = _active_slice(fft_size, active_count)
    return spectrum[band].copy()
[docs]
def decode_compressed_lltf(buf, acquire_force_lltf: bool = False, lltf_8bit_mode: bool = False, **kwargs) -> np.ndarray:
    """
    Decode compressed L-LTF CSI taps into a spectrum over the active subcarriers.

    :param buf: Wire buffer containing the compressed taps.
    :param acquire_force_lltf: Select the "force" correction matrix and its
        post-processing. Ignored when 8-bit mode is active.
    :param lltf_8bit_mode: Select the 8-bit-mode correction matrix and shift.
    :param kwargs: Only ``lltf_bit_mode`` is accepted, as an alternate keyword
        for ``lltf_8bit_mode``; when given, it takes precedence.
    :return: Complex spectrum with :data:`LEGACY_COEFFICIENTS_PER_CHANNEL` entries.
    :raises TypeError: If any other keyword argument is passed.
    """
    if "lltf_bit_mode" in kwargs:
        lltf_8bit_mode = bool(kwargs.pop("lltf_bit_mode"))
    if kwargs:
        raise TypeError(f"unexpected keyword argument: {next(iter(kwargs))}")

    if lltf_8bit_mode:
        correction, shift = _COMPRESSED_LLTF_8BIT_MODE_FIX32_CORRECTION, COMPRESSED_LLTF_8BIT_MODE_FIX32_SHIFT
    elif acquire_force_lltf:
        correction, shift = _COMPRESSED_LLTF_FORCE_FIX32_CORRECTION, COMPRESSED_LLTF_FIX32_SHIFT
    else:
        correction, shift = _COMPRESSED_LLTF_FIX32_CORRECTION, COMPRESSED_LLTF_FIX32_SHIFT

    spectrum = _decode_compressed_tap_window(
        buf,
        COMPRESSED_LLTF_FFT_SIZE,
        COMPRESSED_LLTF_TAP_START,
        COMPRESSED_TAP_COUNT,
        LEGACY_COEFFICIENTS_PER_CHANNEL,
        correction,
        float((1 << shift) * 8.0),
    )

    # 8-bit mode only needs the DC bin filled in.
    if lltf_8bit_mode:
        interpolate_lltf_gap(spectrum)
        return spectrum

    if acquire_force_lltf:
        # Last bin: linear extrapolation from the two preceding even-offset bins.
        spectrum[-1] = 2.0 * spectrum[-3] - spectrum[-5]
    else:
        # Last bin keeps its real part and takes the imaginary part of bin [-3].
        spectrum[-1] = np.complex64(spectrum[-1].real + 1.0j * spectrum[-3].imag)
        # Middle bin: mean of the bins two positions to either side.
        mid = LEGACY_COEFFICIENTS_PER_CHANNEL // 2
        spectrum[mid] = 0.5 * (spectrum[mid - 2] + spectrum[mid + 2])

    # Both remaining modes rebuild every odd-index bin as the mean of its
    # even-index neighbours.
    # NOTE(review): presumably only even-index bins are trusted here — confirm.
    spectrum[1::2] = 0.5 * (spectrum[0:-1:2] + spectrum[2::2])
    return spectrum
[docs]
def decode_compressed_ht20(buf) -> np.ndarray:
    """
    Decode compressed HT20 CSI taps into a spectrum over the active subcarriers.

    :param buf: Wire buffer containing the compressed taps.
    :return: Complex spectrum with :data:`HT_COEFFICIENTS_PER_CHANNEL` entries,
        with the DC bin filled by linear interpolation.
    """
    tap_scale = float((1 << COMPRESSED_HT20_FIX32_SHIFT) * 8.0)
    decoded = _decode_compressed_tap_window(
        buf,
        fft_size=COMPRESSED_HT20_FFT_SIZE,
        tap_start=COMPRESSED_HT20_TAP_START,
        tap_count=COMPRESSED_TAP_COUNT,
        active_count=HT_COEFFICIENTS_PER_CHANNEL,
        correction=_COMPRESSED_HT20_FIX32_CORRECTION,
        tap_scale=tap_scale,
    )
    interpolate_ht20ltf_gap(decoded)
    return decoded
[docs]
def decode_compressed_ht40(buf) -> np.ndarray:
    """
    Decode compressed HT40 CSI taps into a spectrum over the active subcarriers.

    :param buf: Wire buffer containing the compressed taps.
    :return: Complex spectrum with
        ``2 * HT_COEFFICIENTS_PER_CHANNEL + HT40_GAP_SUBCARRIERS`` entries. The
        gap bins between the two channels are set to zero, not interpolated.
    """
    tap_scale = float((1 << COMPRESSED_HT40_FIX32_SHIFT) * 8.0)
    decoded = _decode_compressed_tap_window(
        buf,
        fft_size=COMPRESSED_HT40_FFT_SIZE,
        tap_start=COMPRESSED_HT40_TAP_START,
        tap_count=COMPRESSED_TAP_COUNT,
        active_count=HT_COEFFICIENTS_PER_CHANNEL * 2 + HT40_GAP_SUBCARRIERS,
        correction=_COMPRESSED_HT40_FIX32_CORRECTION,
        tap_scale=tap_scale,
    )
    # The gap begins directly after the first channel's coefficients.
    hole_begin = HT_COEFFICIENTS_PER_CHANNEL
    hole_end = hole_begin + HT40_GAP_SUBCARRIERS
    decoded[hole_begin:hole_end] = 0.0
    return decoded
[docs]
def decode_compressed_he20(buf) -> np.ndarray:
    """
    Decode compressed HE20 CSI taps into a spectrum over the active subcarriers.

    :param buf: Wire buffer containing the compressed taps.
    :return: Complex spectrum with :data:`HE20_COEFFICIENTS_PER_CHANNEL`
        entries, with the three center bins filled by linear interpolation.
    """
    tap_scale = float((1 << COMPRESSED_HE20_FIX32_SHIFT) * 8.0)
    decoded = _decode_compressed_tap_window(
        buf,
        fft_size=COMPRESSED_HE20_FFT_SIZE,
        tap_start=COMPRESSED_HE20_TAP_START,
        tap_count=COMPRESSED_TAP_COUNT,
        active_count=HE20_COEFFICIENTS_PER_CHANNEL,
        correction=_COMPRESSED_HE20_FIX32_CORRECTION,
        tap_scale=tap_scale,
    )
    interpolate_he20ltf_gaps(decoded)
    return decoded
def _extract_signed15(x: int) -> int:
x &= 0x7FFF
return x - 0x8000 if (x & 0x4000) else x
[docs]
def get_cfo_from_rx_ctrl(rx_ctrl) -> float:
    """
    Compute the CFO value (in Hz) from a `wifi_pkt_rx_ctrl_v3_t` byte buffer.

    This was reverse engineered from librftest.a (bb_common.o) and libphy.a (phy_feature.o).
    Espressif's RX metadata exposes two CFO fields. The low-rate field is used
    when the derived ``rate_index`` is below 8 (802.11b); otherwise the high-rate
    field is used (802.11g/n/ax).

    :param rx_ctrl: Either a ``wifi_pkt_rx_ctrl_v3_t`` instance or a value that
        ``wifi_pkt_rx_ctrl_v3_t`` can be constructed from.
    :return: The CFO as a float (both code paths produce a float, not an int).
    """
    ctrl = rx_ctrl if isinstance(rx_ctrl, wifi_pkt_rx_ctrl_v3_t) else wifi_pkt_rx_ctrl_v3_t(rx_ctrl)

    # Legacy frames use the rate field directly; every other signalling mode
    # combines the mode with the low 7 bits of he_siga1.
    if ctrl.sig_mode == wifi_sig_mode_t.SIG_MODE_LEGACY:
        rate_index = ctrl.rate
    else:
        rate_index = (ctrl.sig_mode << 4) + (ctrl.he_siga1 & 0x7F)

    # The evaluation order of the scaling below is kept as-is so the floating
    # point results stay bit-identical.
    if rate_index < 8:
        return float(_extract_signed15(ctrl.cfo_low_rate) / -48) * 25000 * 5 / 128
    return float((_extract_signed15(ctrl.cfo_high_rate) * -5) / 128) * 25000 * 5 / 128
[docs]
def deserialize_packet_buffer(revision, pktbuf):
    """
    Deserialize a raw stream payload into the packet structure selected by its type header.

    The first four bytes of ``pktbuf`` are read as a little-endian integer and
    compared against the known type headers.

    :param revision: Object providing ``type_header`` and ``serialized_csi_t``.
    :param pktbuf: Raw packet bytes, starting with the type header.
    :return: ``revision.serialized_csi_t(pktbuf)`` for a CSI packet, or a
        ``radar_tx_report_tlv_t`` for a radar TX report.
    :raises ValueError: If the type header matches neither packet kind.
    """
    magic = int.from_bytes(pktbuf[0:4], byteorder="little")
    if magic == revision.type_header:
        return revision.serialized_csi_t(pktbuf)
    if magic != SPI_TYPE_HEADER_RADAR_TX_REPORT:
        raise ValueError("Unexpected logical packet type header")
    return radar_tx_report_tlv_t(pktbuf)
[docs]
def csistream_uid_to_antid(uid: int) -> int:
    """
    Extract the sensor bit field from a CSI stream fragment UID.

    :param uid: Fragment UID.
    :return: ``uid`` shifted right by :data:`CSISTREAM_UID_SENSOR_SHIFT` and
        masked with :data:`CSISTREAM_UID_SENSOR_MASK`.
    """
    shifted = uid >> CSISTREAM_UID_SENSOR_SHIFT
    return shifted & CSISTREAM_UID_SENSOR_MASK
[docs]
def parse_csistream_jumbo_message(message: bytes) -> bytes:
    """
    Validate that a CSI stream message is a jumbo frame and return it as ``bytes``.

    :param message: Bytes-like message received from the CSI stream.
    :return: An immutable ``bytes`` copy of the message.
    :raises ValueError: If the message is shorter than the 4-byte type header
        or the header is not the jumbo frame type header.
    """
    if len(message) < 4:
        raise ValueError("CSI stream message too short")

    frame = bytes(message)
    magic = int.from_bytes(frame[:4], byteorder="little")
    if magic == SPI_TYPE_HEADER_JUMBO_FRAME:
        return frame
    raise ValueError("CSI stream message does not contain a jumbo frame")
[docs]
def iter_csistream_fragments(jumbo: bytes):
    """
    Iterate over the fragments packed into a jumbo frame.

    After the 4-byte type header the frame is read as a sequence of fragment
    headers, each followed by ``header.size`` payload bytes. Iteration stops
    at the first header whose ``uid`` equals
    :data:`JUMBO_FRAGMENT_TERMINATOR_UID`.

    :param jumbo: Complete jumbo frame, including the type header.
    :return: Generator yielding ``(header, payload)`` tuples.
    :raises ValueError: If the frame is too short, has the wrong type header,
        contains a fragment running past the end of the frame, or ends
        without a terminator header.
    """
    if len(jumbo) < 4:
        raise ValueError("Jumbo frame too short")
    if int.from_bytes(jumbo[:4], byteorder="little") != SPI_TYPE_HEADER_JUMBO_FRAME:
        raise ValueError("Invalid jumbo frame type header")

    header_size = ctypes.sizeof(csistream_fragment_header_t)
    cursor = 4
    while True:
        payload_start = cursor + header_size
        # No room left for another header: the terminator never showed up.
        if payload_start > len(jumbo):
            raise ValueError("Jumbo frame terminator missing")

        header = csistream_fragment_header_t(jumbo[cursor:payload_start])
        if header.uid == JUMBO_FRAGMENT_TERMINATOR_UID:
            return

        payload_end = payload_start + header.size
        if payload_end > len(jumbo):
            raise ValueError("Fragment exceeds jumbo frame boundary")
        yield header, jumbo[payload_start:payload_end]
        cursor = payload_end