Source code for espargos.csi

from typing import TypedDict
import numpy as np
import ctypes

from . import constants

# Internal constants
_ESPARGOS_SPI_BUFFER_SIZE = 512
_ESPARGOS_SPI_TYPE_HEADER_CSI = 0x5a1f19b1

# Other constants
HT40_GAP_SUBCARRIERS = 3
"Gap between primary and secondary channel in HT40 mode, in subcarriers"

[docs] class wifi_pkt_rx_ctrl_t(ctypes.LittleEndianStructure): """ A ctypes structure representing the `wifi_pkt_rx_ctrl_t` as provided by the ESP32. See the related `esp-idf code <https://github.com/espressif/esp-idf/blob/master/components/esp_wifi/include/local/esp_wifi_types_native.h>`_ for details. """ _pack_ = 1 _fields_ = [ ("rssi", ctypes.c_int8, 8), ("rate", ctypes.c_uint8, 5), ("reserved1", ctypes.c_uint8, 1), ("sig_mode", ctypes.c_uint8, 2), ("reserved2", ctypes.c_uint16, 16), ("mcs", ctypes.c_uint8, 7), ("cwb", ctypes.c_uint8, 1), ("reserved3", ctypes.c_uint16, 16), ("smoothing", ctypes.c_uint8, 1), ("not_sounding", ctypes.c_uint8, 1), ("reserved4", ctypes.c_uint8, 1), ("aggregation", ctypes.c_uint8, 1), ("stbc", ctypes.c_uint8, 2), ("fec_coding", ctypes.c_uint8, 1), ("sgi", ctypes.c_uint8, 1), ("reserved5", ctypes.c_uint8, 8), ("ampdu_cnt", ctypes.c_uint8, 8), ("channel", ctypes.c_uint8, 4), ("secondary_channel", ctypes.c_uint8, 4), ("reserved6", ctypes.c_uint8, 8), ("timestamp", ctypes.c_uint32, 32), ("reserved7", ctypes.c_uint32, 32), ("reserved8", ctypes.c_uint32, 32), ("reserved9", ctypes.c_uint32, 31), ("ant", ctypes.c_uint8, 1), ("noise_floor", ctypes.c_int8, 8), ("reserved10", ctypes.c_uint32, 24), ("sig_len", ctypes.c_uint16, 12), ("reserved11", ctypes.c_uint16, 12), ("rx_state", ctypes.c_uint8, 8), ] def __new__(self, buf=None): if buf: buf = bytearray(buf) buf.append(0) # zero-pad to fix a bug, see below return self.from_buffer_copy(buf) def __init__(self, buf=None): pass
# Bug in ctypes, see https://bugs.python.org/issue29753 and https://github.com/python/cpython/pull/19850, need to manually specify size # We need to write some hacky workarounds here... wifi_pkt_rx_ctrl_t_size = ctypes.sizeof(wifi_pkt_rx_ctrl_t) # = 37 wifi_pkt_rx_ctrl_t_size = 36 # 0-5: lltf_guard_below # 6-58: lltf # 60-65: lltf_guard_above # 66-122: htltf primary # 123-133: htltf_guard # 134-190: htltf secondary
[docs] class csi_buf_t(ctypes.LittleEndianStructure): """ A ctypes structure representing the CSI buffer as produced by the ESP32. This structure is used to store the channel coefficients estimated from Wi-Fi packets, directly as provided in the :code:`buf` field of :code:`wifi_csi_info_t` by esp-idf, refer to the related `esp-idf documentation <https://docs.espressif.com/projects/esp-idf/en/stable/esp32/api-guides/wifi.html#wi-fi-channel-state-information>`_ for details. The structure is packed to ensure there is no padding between fields. """ _pack_ = 1 _fields_ = [ ("lltf_guard_below", ctypes.c_int8 * (6 * 2)), # all zeros ("lltf", ctypes.c_int8 * (53 * 2)), ("lltf_guard_above", ctypes.c_int8 * (7 * 2)), # all zeros ("htltf_higher", ctypes.c_int8 * (57 * 2)), ("htltf_guard", ctypes.c_int8 * (11 * 2)), # all zeros ("htltf_lower", ctypes.c_int8 * (57 * 2)) ] def __new__(self, buf=None): return self.from_buffer_copy(buf) def __init__(self, buf=None): pass
[docs] class seq_ctrl_t(ctypes.LittleEndianStructure): """ A ctypes structure representing the sequence control field of a Wi-Fi packet. This structure is used to store the sequence control field of a Wi-Fi packet, which contains the fragment number and the segment number. """ _pack_ = 1 _fields_ = [ ("frag", ctypes.c_uint8, 4), ("seg", ctypes.c_uint16, 12) ] def __new__(self, buf=None): return self.from_buffer_copy(buf) def __init__(self, buf=None): pass
[docs] class serialized_csi_t(ctypes.LittleEndianStructure): """ A ctypes structure representing the CSI buffer and metadata as provided by the ESPARGOS firmware. """ _pack_ = 1 _fields_ = [ ("type_header", ctypes.c_uint32), ("rx_ctrl", ctypes.c_uint8 * wifi_pkt_rx_ctrl_t_size), ("source_mac", ctypes.c_uint8 * 6), ("dest_mac", ctypes.c_uint8 * 6), ("seq_ctrl", seq_ctrl_t), ("timestamp", ctypes.c_uint32), ("is_calib", ctypes.c_bool), ("first_word_invalid", ctypes.c_bool), ("buf", ctypes.c_int8 * (ctypes.sizeof(csi_buf_t))) ] def __new__(self, buf=None): return self.from_buffer_copy(buf) def __init__(self, buf=None): pass
[docs] class csistream_pkt_t(ctypes.LittleEndianStructure): """ A ctypes structure representing a CSI packet as received from the ESPARGOS controller, i.e., sensor number and the raw data buffer that should contain the serialized_csi_t structure if the type_header matches. """ _pack_ = 1 _fields_ = [ ("esp_num", ctypes.c_uint32), ("buf", ctypes.c_uint8 * _ESPARGOS_SPI_BUFFER_SIZE), ] def __new__(self, buf=None): return self.from_buffer_copy(buf) def __init__(self, buf=None): pass