#!/usr/bin/env python
import numpy as np
import binascii
import time
from . import revisions
from . import constants
from . import csi
# Byte counts used to slice the raw (uncompressed) payload of each CSI format
# out of ``serialized_csi.buf``. Every count is twice the coefficient count.
_RAW_LLTF_BYTES = 2 * csi.LEGACY_COEFFICIENTS_PER_CHANNEL
_RAW_HT20_BYTES = 2 * csi.HT_COEFFICIENTS_PER_CHANNEL
# HT40 payload layout: lower HT20 block, gap subcarriers, upper HT20 block.
_RAW_HT40_BYTES = _RAW_HT20_BYTES + 2 * csi.HT40_GAP_SUBCARRIERS + _RAW_HT20_BYTES
_RAW_HE20_BYTES = 2 * csi.HE20_COEFFICIENTS_PER_CHANNEL
[docs]
class CSICluster(object):
    """
    A CSICluster object represents a collection of CSI data estimated for the same WiFi packet.

    The class clusters the CSI data from multiple ESPARGOS sensors (antennas), which may belong
    to the same or to different ESPARGOS boards.
    It is used to store CSI data until it is complete and can be provided to a callback.
    CSI data may be from calibration packets or over-the-air packets.
    """
def __init__(
    self,
    source_mac: str,
    dest_mac: str,
    seq_ctrl: csi.seq_ctrl_t,
    board_revisions: list[revisions.BoardRevision],
):
    """
    Create an empty cluster for a single WiFi packet.

    Every CSI entry added later belongs to the same packet and therefore shares the
    source MAC, destination MAC and sequence control field given here.
    All per-sensor buffers are allocated up front with one slot per
    (board, row, antenna) combination.

    :param source_mac: The source MAC address of the WiFi packet
    :param dest_mac: The destination MAC address of the WiFi packet
    :param seq_ctrl: The sequence control field of the WiFi packet
    :param board_revisions: The ESPARGOS board revisions in the pool
    """
    # Packet identity and host-side creation time
    self.source_mac = source_mac
    self.dest_mac = dest_mac
    self.seq_ctrl = seq_ctrl
    self.timestamp = time.time()
    self.board_revisions = board_revisions

    row_count = constants.ROWS_PER_BOARD
    antenna_count = constants.ANTENNAS_PER_ROW

    # One slot per sensor; None marks "no CSI received yet"
    self.serialized_csi_all = [[[None] * antenna_count for _ in range(row_count)] for _ in self.board_revisions]

    # Radar transmit metadata is attached later via set_radar_tx_report()
    self.radar_tx_report = None
    self.radar_tx_index = -1

    self.shape = (len(self.board_revisions), row_count, antenna_count)

    # Bookkeeping of which sensors have already delivered CSI
    self.csi_completion_state = np.full(self.shape, False)
    self.csi_completion_state_all = False

    def nan_grid():
        # Fresh float32 array with one NaN placeholder per sensor
        return np.full(self.shape, fill_value=np.nan, dtype=np.float32)

    # Per-sensor receive metadata, filled in by add_csi()
    self.rssi_all = nan_grid()
    self.rx_gain_all = nan_grid()
    self.fft_gain_all = nan_grid()
    self.rfswitch_state_all = np.full(self.shape, fill_value=csi.rfswitch_state_t.SENSOR_RFSWITCH_UNKNOWN, dtype=np.uint8)
    self.noise_floor_all = nan_grid()
    self.cfo_all = nan_grid()
    self.lltf_8bit_mode_all = np.full(self.shape, fill_value=False, dtype=np.bool_)
    # 12 raw bytes of gain-table entry per sensor, plus a validity mask
    self.gain_table_entry_raw_all = np.zeros(self.shape + (12,), dtype=np.uint8)
    self.gain_table_entry_valid_all = np.full(self.shape, fill_value=False)
[docs]
def add_csi(
    self,
    board_num: int,
    esp_num: int,
    serialized_csi: csi.serialized_csi_tlv_t,
):
    """
    Store the CSI reported by one sensor in this cluster.

    The packet identity (MAC addresses, sequence control) of the new entry must match
    the identity the cluster was created with. Besides the serialized CSI itself,
    the per-sensor receive metadata (RSSI, gains, noise floor, RF switch state, CFO,
    L-LTF 8-bit flag and gain-table entry) is extracted and stored.

    :param board_num: The number of the ESPARGOS board that received the CSI data
    :param esp_num: The number of the ESPARGOS sensor within that board that received the CSI data
    :param serialized_csi: The serialized CSI data
    :raises AssertionError: If the entry does not belong to this cluster's packet, or if a radar
        TX report is already attached and the entry is not flagged as radar
    """
    packet_source = binascii.hexlify(bytearray(serialized_csi.source_mac)).decode("utf-8")
    assert packet_source == self.source_mac
    packet_dest = binascii.hexlify(bytearray(serialized_csi.dest_mac)).decode("utf-8")
    assert packet_dest == self.dest_mac
    assert serialized_csi.seq_ctrl.seg == self.seq_ctrl.seg
    assert serialized_csi.seq_ctrl.frag == self.seq_ctrl.frag
    if self.radar_tx_report is not None:
        assert serialized_csi.is_radar

    # TODO: Assert that esp_num matches self-identified antenna ID

    # The esp_num -> (row, column) mapping may differ across board revisions
    row, col = self.board_revisions[board_num].esp_num_to_row_col(esp_num)
    sensor = (board_num, row, col)

    # Remember the entry and update the completion bookkeeping
    self.serialized_csi_all[board_num][row][col] = serialized_csi
    self.csi_completion_state[sensor] = True
    self.csi_completion_state_all = np.all(self.csi_completion_state)

    def as_signed_byte(raw):
        # RSSI and noise floor arrive as unsigned fields; reinterpret bit 7 as the sign bit
        return (raw - 0x100) if (raw & 0x80) else raw

    rx_ctrl = csi.wifi_pkt_rx_ctrl_v3_t(serialized_csi.rx_ctrl)
    self.rssi_all[sensor] = as_signed_byte(rx_ctrl.rssi)
    self.rx_gain_all[sensor] = csi.gain_byte_to_signed(rx_ctrl.rx_gain)
    self.fft_gain_all[sensor] = csi.gain_byte_to_signed(rx_ctrl.fft_gain)
    self.noise_floor_all[sensor] = as_signed_byte(rx_ctrl.noise_floor)
    self.rfswitch_state_all[sensor] = serialized_csi.rfswitch_state
    self.cfo_all[sensor] = csi.get_cfo_from_rx_ctrl(serialized_csi.rx_ctrl)
    self.lltf_8bit_mode_all[sensor] = serialized_csi.acquire_lltf_8bit_mode

    entry_valid = serialized_csi.gain_table_entry_valid
    self.gain_table_entry_valid_all[sensor] = bool(entry_valid)
    if entry_valid:
        self.gain_table_entry_raw_all[board_num, row, col, :] = np.frombuffer(serialized_csi.gain_table_entry_raw, dtype=np.uint8)
[docs]
def set_radar_tx_report(self, radar_tx_report: csi.radar_tx_report_tlv_t, board_num: int | None = None, esp_num: int | None = None):
    """
    Attach radar transmit metadata for the Wi-Fi packet represented by this cluster.

    :param radar_tx_report: The transmit-side report; its MAC addresses and sequence control
        must match this cluster
    :param board_num: Board number of the transmitting sensor, if known
    :param esp_num: Sensor number within that board, if known
    :raises AssertionError: If the report does not match the cluster, if CSI already stored is
        not flagged as radar, or if a different report was attached before
    """

    def mac_hex(raw_mac):
        return binascii.hexlify(bytearray(raw_mac)).decode("utf-8")

    assert mac_hex(radar_tx_report.source_mac) == self.source_mac
    assert mac_hex(radar_tx_report.dest_mac) == self.dest_mac
    assert radar_tx_report.seq_ctrl.seg == self.seq_ctrl.seg
    assert radar_tx_report.seq_ctrl.frag == self.seq_ctrl.frag

    # CSI that is already stored must come from a radar packet
    first_entry = self._first_complete_sensor()
    assert first_entry is None or first_entry.is_radar

    # A report may be set again only if it is byte-for-byte the same
    assert self.radar_tx_report is None or bytes(self.radar_tx_report) == bytes(radar_tx_report)

    self.radar_tx_report = radar_tx_report

    if board_num is None or esp_num is None:
        return

    # Flatten (board, row, column) into a single sensor index
    row, col = self.board_revisions[board_num].esp_num_to_row_col(esp_num)
    self.radar_tx_index = board_num * constants.ROWS_PER_BOARD * constants.ANTENNAS_PER_ROW + row * constants.ANTENNAS_PER_ROW + col
[docs]
def has_radar_tx_report(self) -> bool:
    """
    Tell whether transmit-side radar metadata has been attached to this cluster.

    :return: True if a radar TX report is stored, False otherwise
    """
    report = self.radar_tx_report
    return not (report is None)
[docs]
def get_radar_tx_info(self):
    """
    Get the transmit-side radar report stored for this packet.

    :return: The stored radar TX report, or None if none has been attached
    """
    report = self.radar_tx_report
    return report
[docs]
def get_radar_tx_index(self) -> int:
    """
    Get the flattened index of the transmitting sensor.

    :return: The stored TX sensor index converted to a plain ``int``; -1 means unknown
    """
    tx_index = int(self.radar_tx_index)
    return tx_index
[docs]
def deserialize_csi_lltf(self):
    """
    Deserialize the L-LTF part of the CSI data.

    Each complete sensor's payload is decoded into a ``complex64`` vector. Depending on the
    flags of the serialized entry, the payload is treated as compressed, 8-bit, forced 12-bit
    or native 12-bit L-LTF. Afterwards every sensor's vector is multiplied by a phase ramp
    derived from :meth:`get_sensor_timestamps`. Sensors that have not provided CSI have NaN
    timestamps, so their entries are not valid channel estimates.

    :return: The L-LTF part of the CSI data as a complex-valued numpy array of shape :code:`(boardcount, constants.ROWS_PER_BOARD, constants.ANTENNAS_PER_ROW, csi.LEGACY_COEFFICIENTS_PER_CHANNEL)`
    :raises AssertionError: If :meth:`has_lltf` returns False
    """
    assert self.has_lltf()
    csi_lltf = np.zeros(self.shape + (csi.LEGACY_COEFFICIENTS_PER_CHANNEL,), dtype=np.complex64)
    def deserialize_lltf_packet(b, r, a, serialized_csi):
        nonlocal csi_lltf
        # View into the output array: writes below land directly in csi_lltf
        csi_lltf_sensor = csi_lltf[b, r, a, :].view()
        if serialized_csi.is_compressed:
            # Compressed payloads are decoded completely by the csi helper
            csi_lltf_sensor[:] = csi.decode_compressed_lltf(serialized_csi.buf, serialized_csi.acquire_force_lltf, serialized_csi.acquire_lltf_8bit_mode)
            return
        lltf_bytes = np.frombuffer(serialized_csi.buf[:_RAW_LLTF_BYTES], dtype=np.uint8)
        if serialized_csi.acquire_lltf_8bit_mode:
            # 8-bit mode: all coefficients are unpacked at once, then the gap is interpolated
            csi_lltf_sensor[:] = csi.unpack_lltf8_values(serialized_csi.buf, csi.LEGACY_COEFFICIENTS_PER_CHANNEL)
            csi.interpolate_lltf_gap(csi_lltf_sensor)
        elif serialized_csi.acquire_force_lltf:
            # In forced LLTF mode the ESP32-C61 reports 52 signed 12-bit values:
            # 26 complex coefficients for every second subcarrier, including DC.
            # The last active subcarrier is not measured and must be extrapolated.
            lltf_all = csi.unpack_lltf12_values(lltf_bytes, 52)
            csi_lltf_sensor[:-1:2] = lltf_all.astype(np.float32).view(np.complex64)
            # Linear extrapolation from the two preceding even-indexed coefficients
            csi_lltf_sensor[-1] = 2 * csi_lltf_sensor[-3] - csi_lltf_sensor[-5]
        else:
            # Native 11g LLTF carries 26 complex coefficients for even-indexed
            # subcarriers plus a final real-only sample for the last subcarrier.
            lltf_all = csi.unpack_lltf12_values(lltf_bytes, 53)
            even_coeffs = lltf_all[:52].astype(np.float32).view(np.complex64)
            csi_lltf_sensor[0:52:2] = even_coeffs
            # The imaginary part of the last subcarrier is borrowed from its even-indexed neighbor
            csi_lltf_sensor[-1] = lltf_all[52].astype(np.float32) + 1.0j * csi_lltf_sensor[-3].imag
        # DC subcarrier. In sparse 12-bit LLTF this is only provided when
        # force LLTF is true. 8-bit LLTF was handled above like HT20.
        if not serialized_csi.acquire_force_lltf and not serialized_csi.acquire_lltf_8bit_mode:
            dc_subcarrier_index = csi.LEGACY_COEFFICIENTS_PER_CHANNEL // 2
            # Average of the two nearest even-indexed neighbors
            csi_lltf_sensor[dc_subcarrier_index] = (csi_lltf_sensor[dc_subcarrier_index - 2] + csi_lltf_sensor[dc_subcarrier_index + 2]) / 2.0
        # Fill the odd-indexed subcarriers with the mean of their even-indexed
        # neighbors to obtain the full set of subcarriers
        if not serialized_csi.acquire_lltf_8bit_mode:
            csi_lltf_sensor[1::2] = 0.5 * (csi_lltf_sensor[0:-1:2] + csi_lltf_sensor[2::2])
    self._foreach_complete_sensor(deserialize_lltf_packet)
    # Need to take timestamps into account to provide phase coherence across all sensors
    delay = self.get_sensor_timestamps()
    subcarrier_range = csi.get_csi_format_subcarrier_indices("lltf").astype(np.float64)[np.newaxis, np.newaxis, np.newaxis, :]
    # Need to adjust range if using 40MHz wide channel since LO is either above or below the primary channel that L-LTF is on
    subcarrier_range -= self.get_secondary_channel_relative() * int(2 * constants.WIFI_CHANNEL_SPACING / constants.WIFI_SUBCARRIER_SPACING)
    sto_delay_correction = np.exp(-1.0j * 2 * np.pi * delay[:, :, :, np.newaxis] * constants.WIFI_SUBCARRIER_SPACING * subcarrier_range)
    # Elementwise product of the coefficients with the per-sensor phase ramp
    csi_lltf = np.einsum("bras,bras->bras", csi_lltf, sto_delay_correction)
    return csi_lltf
[docs]
def deserialize_csi_ht20ltf(self):
    """
    Deserialize the HT20 (HT-LTF without channel bonding) part of the CSI data.

    For channel-bonded packets, only the block belonging to the primary channel is returned.
    After decoding, every sensor's coefficients are multiplied by a phase ramp derived
    from :meth:`get_sensor_timestamps`.

    :return: The HT-LTF part of the CSI data as a complex-valued numpy array of shape :code:`(boardcount, constants.ROWS_PER_BOARD, constants.ANTENNAS_PER_ROW, csi.HT_COEFFICIENTS_PER_CHANNEL)`
    :raises AssertionError: If :meth:`has_ht20ltf` returns False
    """
    assert self.has_ht20ltf()
    coefficients = np.zeros(self.shape + (csi.HT_COEFFICIENTS_PER_CHANNEL,), dtype=np.complex64)
    # Byte offset at which the upper HT20 block starts inside an HT40 payload
    upper_block_start = _RAW_HT20_BYTES + (csi.HT40_GAP_SUBCARRIERS * 2)

    def fill_sensor(b, r, a, serialized_csi):
        target = coefficients[b, r, a, :]
        if serialized_csi.is_compressed:
            target[:] = csi.decode_compressed_ht20(serialized_csi.buf)
            return

        # Bit 7 of he_siga1 distinguishes channel-bonded payloads here
        channel_bonded = (csi.wifi_pkt_rx_ctrl_v3_t(serialized_csi.rx_ctrl).he_siga1 & 0x80) != 0
        if channel_bonded:
            bonded_bytes = np.frombuffer(serialized_csi.buf[:_RAW_HT40_BYTES], dtype=np.int8)
            # Primary channel is the upper block when the secondary channel lies below it
            if self.get_secondary_channel_relative() == -1:
                raw_bytes = bonded_bytes[upper_block_start:_RAW_HT40_BYTES]
            else:
                raw_bytes = bonded_bytes[:_RAW_HT20_BYTES]
        else:
            raw_bytes = np.frombuffer(serialized_csi.buf[:_RAW_HT20_BYTES], dtype=np.int8)

        # The ESP32 provides CSI as int8_t values in (im, re) pairs (in this order!).
        # Reading them as (re, im) and then applying -1j * conj() swaps the two parts.
        target[:] = raw_bytes.astype(np.float32).view(np.complex64)
        target[:] = -1.0j * np.conj(target)

    self._foreach_complete_sensor(fill_sensor)

    # Timestamps are needed to provide phase coherence across all sensors
    timestamps = self.get_sensor_timestamps()
    tone_indices = csi.get_csi_format_subcarrier_indices("ht20").astype(np.float64)[np.newaxis, np.newaxis, np.newaxis, :]
    # With a 40MHz wide channel the LO is either above or below the primary channel that HT20 is on
    tone_indices -= self.get_secondary_channel_relative() * int(2 * constants.WIFI_CHANNEL_SPACING / constants.WIFI_SUBCARRIER_SPACING)
    phase_ramp = np.exp(-1.0j * 2 * np.pi * timestamps[:, :, :, np.newaxis] * constants.WIFI_SUBCARRIER_SPACING * tone_indices)
    return np.einsum("bras,bras->bras", coefficients, phase_ramp)
[docs]
def deserialize_csi_ht40ltf(self):
    """
    Deserialize the HT40 (HT-LTF with channel bonding) part of the CSI data.

    The output holds the lower HT20 block, the gap subcarriers (left at zero for raw payloads)
    and the upper HT20 block. After decoding, the first block is rotated by pi / 2 and every
    sensor's coefficients are multiplied by a phase ramp derived from
    :meth:`get_sensor_timestamps`.

    :return: The HT-LTF part of the CSI data as a complex-valued numpy array of shape :code:`(boardcount, constants.ROWS_PER_BOARD, constants.ANTENNAS_PER_ROW, csi.HT_COEFFICIENTS_PER_CHANNEL + csi.HT40_GAP_SUBCARRIERS + csi.HT_COEFFICIENTS_PER_CHANNEL)`
    :raises AssertionError: If :meth:`has_ht40ltf` returns False or no secondary channel is reported
    """
    assert self.has_ht40ltf()
    # loc is only used for this sanity check; it does not select which half is rotated below
    loc = self.get_secondary_channel_relative()
    assert loc != 0
    csi_ht40 = np.zeros(
        self.shape + (csi.HT_COEFFICIENTS_PER_CHANNEL + csi.HT40_GAP_SUBCARRIERS + csi.HT_COEFFICIENTS_PER_CHANNEL,),
        dtype=np.complex64,
    )
    def deserialize_ht40_packet(b, r, a, serialized_csi):
        nonlocal csi_ht40
        # Views into the output array: whole sensor vector, first block, last block
        csi_ht40_sensor = csi_ht40[b, r, a, :].view()
        csi_ht40_sensor_lower = csi_ht40[b, r, a, : csi.HT_COEFFICIENTS_PER_CHANNEL].view()
        csi_ht40_sensor_higher = csi_ht40[b, r, a, -csi.HT_COEFFICIENTS_PER_CHANNEL :].view()
        if serialized_csi.is_compressed:
            # Compressed payloads are decoded completely by the csi helper
            csi_ht40_sensor[:] = csi.decode_compressed_ht40(serialized_csi.buf)
            return
        # The ESP32 provides CSI as int8_t values in (im, re) pairs (in this order!)
        # To go from the (re, im) interpretation to (im, re), compute conjugate and multiply by 1.0j.
        ht40_bytes = np.frombuffer(serialized_csi.buf[:_RAW_HT40_BYTES], dtype=np.int8)
        # Raw layout: lower block, gap bytes (skipped), upper block
        csi_ht40_sensor_higher[:] = ht40_bytes[_RAW_HT20_BYTES + (csi.HT40_GAP_SUBCARRIERS * 2) : _RAW_HT40_BYTES].astype(np.float32).view(np.complex64)
        csi_ht40_sensor_lower[:] = ht40_bytes[:_RAW_HT20_BYTES].astype(np.float32).view(np.complex64)
        csi_ht40_sensor[:] = -1.0j * np.conj(csi_ht40_sensor)
    self._foreach_complete_sensor(deserialize_ht40_packet)
    # Secondary channel experiences phase shift by pi / 2
    # This is likely due to the pi / 2 phase shift specified for the pilot symbols,
    # see IEEE 80211-2012 section 20.3.9.3.4 L-LTF definition
    # NOTE(review): despite its name, this view selects the FIRST HT_COEFFICIENTS_PER_CHANNEL
    # entries, i.e. the same slice as csi_ht40_sensor_lower above - confirm that rotating
    # this half (independent of loc) is intended.
    csi_ht40_higher = csi_ht40[:, :, :, : csi.HT_COEFFICIENTS_PER_CHANNEL].view()
    csi_ht40_higher[:] = csi_ht40_higher * np.exp(1.0j * np.pi / 2)
    # Need to take timestamps into account to provide phase coherence across all sensors
    delay = self.get_sensor_timestamps()
    subcarrier_range = csi.get_csi_format_subcarrier_indices("ht40").astype(np.float64)[np.newaxis, np.newaxis, np.newaxis, :]
    sto_delay_correction = np.exp(-1.0j * 2 * np.pi * delay[:, :, :, np.newaxis] * constants.WIFI_SUBCARRIER_SPACING * subcarrier_range)
    # Elementwise product of the coefficients with the per-sensor phase ramp
    csi_ht40 = np.einsum("bras,bras->bras", csi_ht40, sto_delay_correction)
    return csi_ht40
[docs]
def deserialize_csi_he20ltf(self):
    """
    Deserialize the HE20 HE-LTF part of the CSI data.

    The internal HE20 ordering is ascending subcarrier index ``-122..122``.
    The invalid / null tones ``-1, 0, 1`` are explicitly zeroed because
    the raw PHY payload may contain meaningless values there.

    :return: Complex-valued numpy array with one vector of ``csi.HE20_COEFFICIENTS_PER_CHANNEL``
        coefficients per (board, row, antenna)
    :raises AssertionError: If :meth:`has_he20ltf` returns False
    """
    assert self.has_he20ltf()
    coefficients = np.zeros(self.shape + (csi.HE20_COEFFICIENTS_PER_CHANNEL,), dtype=np.complex64)

    def fill_sensor(b, r, a, serialized_csi):
        target = coefficients[b, r, a, :]
        if serialized_csi.is_compressed:
            target[:] = csi.decode_compressed_he20(serialized_csi.buf)
            return
        # int8 pairs are read as complex values, then -1j * conj() swaps real and imaginary parts
        raw_bytes = np.frombuffer(serialized_csi.buf[:_RAW_HE20_BYTES], dtype=np.int8)
        as_complex = raw_bytes.astype(np.float32).view(np.complex64)
        target[:] = -1.0j * np.conj(as_complex)

    self._foreach_complete_sensor(fill_sensor)

    # Per-sensor delay: sampling timestamp plus the HE20-specific fractional offset
    timestamps = self.get_sensor_timestamps()
    fractional = self._get_he20_fractional_timestamp_offsets()
    total_delay = timestamps + fractional
    tone_indices = csi.get_csi_format_subcarrier_indices("he20").astype(np.float64)[np.newaxis, np.newaxis, np.newaxis, :]
    # The phase ramp uses a quarter of the regular WiFi subcarrier spacing
    phase_ramp = np.exp(-1.0j * 2 * np.pi * total_delay[:, :, :, np.newaxis] * (constants.WIFI_SUBCARRIER_SPACING / 4.0) * tone_indices)
    corrected = np.einsum("bras,bras->bras", coefficients, phase_ramp)
    # Zero the null tones -1, 0, 1 (positions 121..123 in the -122..122 ordering)
    corrected[..., 121:124] = 0.0
    return corrected
[docs]
def has_lltf(self) -> bool:
    """
    Check if L-LTF channel estimates are available for all complete sensors.

    :return: True if there is L-LTF CSI data for all complete sensors, False otherwise
    """
    verdicts = []

    def sensor_has_lltf(serialized_csi):
        if serialized_csi.csi_len == 0:
            return False
        # With forced L-LTF acquisition the sensor always provides L-LTF
        if serialized_csi.acquire_force_lltf:
            return True
        # Otherwise the sensor module only provides L-LTF if the frame is 802.11g
        bb_format = csi.wifi_pkt_rx_ctrl_v3_t(serialized_csi.rx_ctrl).cur_bb_format
        return bb_format == csi.wifi_rx_bb_format_t.RX_BB_FORMAT_11G

    self._foreach_complete_sensor(lambda b, r, a, serialized_csi: verdicts.append(sensor_has_lltf(serialized_csi)))
    return all(verdicts)
[docs]
def has_ht20ltf(self) -> bool:
    """
    Check if HT20 (HT-LTF without channel bonding) channel estimates are available for all complete sensors.

    :return: True if there is HT20 CSI data for all complete sensors, False otherwise
    """
    verdicts = []

    def sensor_has_ht20(serialized_csi):
        if serialized_csi.csi_len == 0:
            return False
        # With forced L-LTF acquisition the sensor only provides L-LTF, never HT20-LTF
        forced_lltf = serialized_csi.acquire_force_lltf
        bb_format = csi.wifi_pkt_rx_ctrl_v3_t(serialized_csi.rx_ctrl).cur_bb_format
        is_ht = bb_format == csi.wifi_rx_bb_format_t.RX_BB_FORMAT_HT
        return (not forced_lltf) and is_ht

    self._foreach_complete_sensor(lambda b, r, a, serialized_csi: verdicts.append(sensor_has_ht20(serialized_csi)))
    return all(verdicts)
[docs]
def has_ht40ltf(self) -> bool:
    """
    Check if HT40 (HT-LTF with 40MHz channel bonding) channel estimates are available for all complete sensors.

    :return: True if there is HT40 CSI data for all complete sensors, False otherwise
    """
    verdicts = []

    def sensor_has_ht40(serialized_csi):
        if serialized_csi.csi_len == 0:
            return False
        # With forced L-LTF acquisition the sensor only provides L-LTF, never HT40-LTF
        forced_lltf = serialized_csi.acquire_force_lltf
        rx_ctrl = csi.wifi_pkt_rx_ctrl_v3_t(serialized_csi.rx_ctrl)
        # The packet must be HT (HT20 or HT40) ...
        is_ht = rx_ctrl.cur_bb_format == csi.wifi_rx_bb_format_t.RX_BB_FORMAT_HT
        # ... and use channel bonding: he_siga1 is actually ht_sig1, which contains the CWB bit at bit 7
        channel_bonded = not (rx_ctrl.he_siga1 & 0x80 == 0)
        return (not forced_lltf) and is_ht and channel_bonded

    self._foreach_complete_sensor(lambda b, r, a, serialized_csi: verdicts.append(sensor_has_ht40(serialized_csi)))
    return all(verdicts)
[docs]
def has_he20ltf(self) -> bool:
    """
    Check if HE20 HE-LTF channel estimates are available for all complete sensors.

    :return: True if there is HE20 CSI data for all complete sensors, False otherwise
    """
    verdicts = []

    def sensor_has_he20(serialized_csi):
        # No CSI at all, or the sensor was forced to acquire L-LTF
        if serialized_csi.csi_len == 0 or serialized_csi.acquire_force_lltf:
            return False
        rx_ctrl = csi.wifi_pkt_rx_ctrl_v3_t(serialized_csi.rx_ctrl)
        if not self._is_he_format(rx_ctrl.cur_bb_format):
            return False
        # A secondary channel must not be in use
        if rx_ctrl.second != 0:
            return False
        # The channel estimate must be long enough to hold a full raw HE20 payload
        return not (rx_ctrl.rx_channel_estimate_len < _RAW_HE20_BYTES)

    self._foreach_complete_sensor(lambda b, r, a, serialized_csi: verdicts.append(sensor_has_he20(serialized_csi)))
    return all(verdicts)
[docs]
def get_secondary_channel_relative(self):
    """
    Get the relative position of the secondary channel with respect to the primary channel.

    The value is read from the rx_ctrl metadata of the first sensor that has provided CSI.

    :return: 0 if no secondary channel is used, 1 if the secondary channel is above the primary channel, -1 if the secondary channel is below the primary channel
    :raises ValueError: If the reported secondary channel value is not one of the known values
    """
    rx_ctrl = csi.wifi_pkt_rx_ctrl_v3_t(self._first_complete_sensor().rx_ctrl)

    # 802.11b packets: No secondary channel
    if rx_ctrl.cur_bb_format == csi.wifi_rx_bb_format_t.RX_BB_FORMAT_11B:
        return 0

    # Reported "second" field -> position relative to the primary channel
    relative_by_second = {0: 0, 1: 1, 2: -1}
    second = rx_ctrl.second
    if second in relative_by_second:
        return relative_by_second[second]
    raise ValueError("Unknown secondary channel value")
[docs]
def get_primary_channel(self) -> int:
    """
    Get the primary channel number.

    The value is read from the rx_ctrl metadata of the first sensor that has provided CSI.

    :return: The primary channel number
    """
    first_sensor = self._first_complete_sensor()
    rx_ctrl = csi.wifi_pkt_rx_ctrl_v3_t(first_sensor.rx_ctrl)
    return rx_ctrl.channel
[docs]
def is_11b(self) -> bool:
    """
    Check whether this packet uses the 802.11b baseband format.

    The format is read from the rx_ctrl metadata of the first sensor that has provided CSI.
    If no sensor has provided CSI yet, the method returns False instead of failing,
    matching the behavior of :meth:`is_radar` and :meth:`is_calib`.

    :return: True if the packet is 802.11b, False otherwise (including when no CSI is stored yet)
    """
    serialized_csi = self._first_complete_sensor()
    if serialized_csi is None:
        # Nothing received yet: there is no rx_ctrl to inspect
        return False
    bb_format = csi.wifi_pkt_rx_ctrl_v3_t(serialized_csi.rx_ctrl).cur_bb_format
    return bb_format == csi.wifi_rx_bb_format_t.RX_BB_FORMAT_11B
[docs]
def get_secondary_channel(self) -> int:
    """
    Get the secondary channel number.

    The secondary channel is located four channel numbers above or below the primary
    channel, depending on :meth:`get_secondary_channel_relative`.

    :return: The secondary channel number (equal to the primary channel if no secondary channel is used)
    """
    primary = self.get_primary_channel()
    channel_offset = 4 * self.get_secondary_channel_relative()
    return primary + channel_offset
[docs]
def get_completion(self):
    """
    Get the per-sensor completion state of the CSI data.

    :return: A boolean numpy array of shape :code:`(boardcount, constants.ROWS_PER_BOARD, constants.ANTENNAS_PER_ROW)` that indicates which sensors have provided CSI data
    """
    completion = self.csi_completion_state
    return completion
[docs]
def get_completion_all(self):
    """
    Get the global completion state of the CSI data.

    :return: True if all sensors have provided CSI data, False otherwise
    """
    all_complete = self.csi_completion_state_all
    return all_complete
[docs]
def get_age(self):
    """
    Get the approximate age of the CSI data, in seconds.

    The age is measured from the host time at which the :class:`.CSICluster` object was
    created, not from the sensor timestamps.

    :return: The age of the CSI data, in seconds
    """
    now = time.time()
    return now - self.timestamp
[docs]
def get_sensor_timestamps(self):
    """
    Get the (nanosecond-precision) timestamps at which the WiFi packet was sampled by the sensors.

    This timestamp does not include the offset that the chip derived from the CSI, it is only the sampling start time.
    Sensors that have not provided CSI keep the NaN placeholder.

    :return: A numpy array of shape :code:`(boardcount, constants.ROWS_PER_BOARD, constants.ANTENNAS_PER_ROW)` that contains the sensor timestamps in seconds
    """
    timestamps_s = np.full(self.shape, np.nan, dtype=np.float64)

    def store_timestamp(b, r, a, serialized_csi):
        # Convert nanoseconds to seconds in double precision
        nanoseconds = np.float64(self._nanosecond_timestamp(serialized_csi))
        timestamps_s[b, r, a] = nanoseconds / 1e9

    self._foreach_complete_sensor(store_timestamp)
    return timestamps_s
[docs]
def get_host_timestamp(self):
    """
    Get the host time at which the :class:`.CSICluster` object was created, which is approximately when the first sensor received the CSI data.

    :return: The creation timestamp, in seconds since the epoch
    """
    created_at = self.timestamp
    return created_at
[docs]
def get_rx_gain(self):
    """
    Get the signed RX gain values reported by the sensors for the WiFi packet.

    :return: The per-sensor RX gain array stored in this cluster
    """
    rx_gain = self.rx_gain_all
    return rx_gain
[docs]
def get_fft_gain(self):
    """
    Get the signed FFT gain values reported by the sensors for the WiFi packet.

    :return: The per-sensor FFT gain array stored in this cluster
    """
    fft_gain = self.fft_gain_all
    return fft_gain
[docs]
def get_rfswitch_state(self):
    """
    Get the RF switch state of all sensors when the WiFi packet was received.

    :return: The per-sensor RF switch state array stored in this cluster
    """
    rfswitch_state = self.rfswitch_state_all
    return rfswitch_state
[docs]
def get_cfo(self):
    """
    Get the CFO values decoded from the sensor rx_ctrl metadata.

    :return: The per-sensor CFO array stored in this cluster
    """
    cfo = self.cfo_all
    return cfo
[docs]
def get_lltf_8bit_mode(self):
    """
    Get whether each sensor reported LLTF CSI in 8-bit mode for this packet.

    :return: The per-sensor boolean 8-bit-mode array stored in this cluster
    """
    lltf_8bit_mode = self.lltf_8bit_mode_all
    return lltf_8bit_mode
[docs]
def get_gain_table_entry_raw(self):
    """
    Get the raw 12-byte ESP32-C61 PHY gain-table entry used for each received packet.

    :return: The per-sensor array of raw gain-table bytes stored in this cluster
    """
    raw_entries = self.gain_table_entry_raw_all
    return raw_entries
[docs]
def get_gain_table_entry_valid(self):
    """
    Get the mask indicating whether a raw gain-table entry was reported for each sensor.

    :return: The per-sensor validity mask stored in this cluster
    """
    valid_mask = self.gain_table_entry_valid_all
    return valid_mask
[docs]
def get_source_mac(self):
    """
    Get the source MAC address of the WiFi packet.

    :return: The source MAC address the cluster was created with
    """
    mac = self.source_mac
    return mac
[docs]
def is_radar(self) -> bool:
    """
    Check whether this cluster corresponds to a radar packet.

    :return: The ``is_radar`` flag of the first sensor that has provided CSI, or False if no
        sensor has provided CSI yet
    """
    first_entry = self._first_complete_sensor()
    if first_entry is None:
        return False
    return first_entry.is_radar
[docs]
def is_calib(self) -> bool:
    """
    Check whether this cluster corresponds to a calibration packet.

    :return: The ``is_calib`` flag of the first sensor that has provided CSI, or False if no
        sensor has provided CSI yet
    """
    first_entry = self._first_complete_sensor()
    if first_entry is None:
        return False
    return first_entry.is_calib
[docs]
def get_noise_floor(self):
    """
    Get the noise floor reported by the sensors for the WiFi packet.

    :return: The per-sensor noise floor array stored in this cluster
    """
    noise_floor = self.noise_floor_all
    return noise_floor
[docs]
def get_seq_ctrl(self):
    """
    Get the sequence control field of the WiFi packet.

    :return: The sequence control field the cluster was created with
    """
    sequence_control = self.seq_ctrl
    return sequence_control
# Internal helper functions
def _foreach_complete_sensor(self, cb):
    """
    Invoke a callback for every sensor that has already provided CSI.

    Sensors are visited in board, row, antenna order.

    :param cb: Callable invoked as ``cb(board_index, row_index, antenna_index, serialized_csi)``
    """
    # Lazily flatten the nested per-board / per-row lists, skipping empty slots
    populated_slots = (
        (b, r, a, entry)
        for b, board in enumerate(self.serialized_csi_all)
        for r, row in enumerate(board)
        for a, entry in enumerate(row)
        if entry is not None
    )
    for slot in populated_slots:
        cb(*slot)
def _first_complete_sensor(self):
    """
    Find the first sensor (in board, row, antenna order) that has provided CSI.

    :return: The serialized CSI of that sensor, or None if no sensor has provided CSI yet
    """
    populated_entries = (
        entry
        for board in self.serialized_csi_all
        for row in board
        for entry in row
        if entry is not None
    )
    return next(populated_entries, None)
def _nanosecond_timestamp(self, serialized_csi):
    """
    Compute the sampling start time of one sensor's CSI entry, in nanoseconds.

    The hardware-latched microsecond timestamp is converted to nanoseconds, reduced by a
    fixed lag, and refined with the cycle counter from rx_ctrl.

    :param serialized_csi: The serialized CSI entry of one sensor
    :return: The timestamp in nanoseconds
    """
    # "official" formula by Espressif:
    # timestamp_ns = np.float128(serialized_csi.timestamp * 1000 + ((rxstart_time_cyc * 12500) // 1000) + ((rxstart_time_cyc_dec * 1562) // 1000) - 20800)
    # The formula below is probably more accurate.
    CYC_PERIOD_NS = 1 / 80e6 * 1e9
    HW_TIMESTAMP_LAG_NS = 20800

    latched_ns = serialized_csi.global_timestamp_us * 1000
    cycle_count = csi.wifi_pkt_rx_ctrl_v3_t(serialized_csi.rx_ctrl).rxstart_time_cyc
    cycle_offset_ns = cycle_count * CYC_PERIOD_NS
    return (latched_ns - HW_TIMESTAMP_LAG_NS) + cycle_offset_ns
def _get_he20_fractional_timestamp_offsets(self):
    """
    Collect the fractional timestamp offset of every sensor that has provided CSI.

    The offset is derived from the ``rxstart_time_cyc_dec`` field of rx_ctrl: values of 1024
    and above are folded to ``2048 - value``, and the result is divided by 640e6.
    Sensors without CSI keep the NaN placeholder.

    :return: A float64 numpy array with one offset per (board, row, antenna)
    """
    offsets = np.full(self.shape, np.nan, dtype=np.float64)

    def store_offset(b, r, a, serialized_csi):
        cyc_dec = csi.wifi_pkt_rx_ctrl_v3_t(serialized_csi.rx_ctrl).rxstart_time_cyc_dec
        if cyc_dec >= 1024:
            cyc_dec = 2048 - cyc_dec
        offsets[b, r, a] = float(cyc_dec) / 640e6

    self._foreach_complete_sensor(store_offset)
    return offsets
@staticmethod
def _is_he_format(bb_format: int) -> bool:
    """
    Tell whether a baseband format value denotes one of the HE formats.

    :param bb_format: The baseband format value to classify
    :return: True for HE SU, HE MU, HE ER SU and HE TB, False otherwise
    """
    formats = csi.wifi_rx_bb_format_t
    he_formats = (
        formats.RX_BB_FORMAT_HE_SU,
        formats.RX_BB_FORMAT_HE_MU,
        formats.RX_BB_FORMAT_HE_ERSU,
        formats.RX_BB_FORMAT_HE_TB,
    )
    return bb_format in he_formats