API Reference
The API is unstable at this early stage of development.
espargos.board
- class espargos.board.Board(host: str)[source]
Bases:
object
- add_consumer(clist: list, cv: Condition, *args)[source]
Adds a consumer to the CSI stream. A consumer is defined by a list, a condition variable and additional arguments. When a CSI packet is received, it will be appended to the list, and the condition variable will be notified.
- Parameters:
clist – A list to which the CSI packet will be appended. The entry added to the list is a tuple (esp_num, serialized_csi, *args), where esp_num is the number of the sensor in the array, serialized_csi is the raw CSI packet and *args are the additional arguments.
cv – A condition variable that will be notified when a CSI packet is received
args – Additional arguments that will be added to the list along with the CSI packet
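The consumer pattern described above can be sketched with the standard library alone. This is a minimal illustration, not the library's implementation: fake_csi_source is a hypothetical stand-in for the thread that receives CSI packets, and the byte strings are placeholders for real serialized CSI.

```python
import threading

def fake_csi_source(clist, cv, *args, packets=3):
    """Hypothetical producer: mimics what a board would do for each CSI packet."""
    for esp_num in range(packets):
        serialized_csi = bytes([esp_num])  # placeholder for a raw CSI packet
        with cv:
            # Entry layout as documented: (esp_num, serialized_csi, *args)
            clist.append((esp_num, serialized_csi, *args))
            cv.notify()

clist = []
cv = threading.Condition()
producer = threading.Thread(target=fake_csi_source, args=(clist, cv, "board0"))
producer.start()

received = []
with cv:
    while len(received) < 3:
        # wait_for checks the predicate first, so packets appended before
        # this thread started waiting are still picked up
        cv.wait_for(lambda: len(clist) > 0, timeout=1.0)
        while clist:
            received.append(clist.pop(0))
producer.join()
```

With a real board, the same list and condition variable would be handed to add_consumer(clist, cv, "board0") instead of the stand-in producer.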
- get_name()[source]
Returns the hostname of the ESPARGOS controller as configured in the web interface.
- Returns:
The hostname of the ESPARGOS controller
- set_calib(calibrate: bool)[source]
Enables or disables calibration mode on the ESPARGOS controller.
- Parameters:
calibrate – True to enable calibration mode, False to disable it
- Raises:
EspargosUnexpectedResponseError – If the server at the given host is not an ESPARGOS controller
espargos.pool
- class espargos.pool.CSICalibration(channel_primary: int, channel_secondary: int, calibration_values_ht40: ndarray, timestamp_calibration_values: ndarray, board_cable_lengths=None, board_cable_vfs=None)[source]
Bases:
object
- apply_ht40(values: ndarray, sensor_timestamps: ndarray) → ndarray [source]
Apply phase calibration to the provided HT40 CSI data. Also accounts for subcarrier-specific phase offsets, e.g., due to the low-pass filter characteristic of the baseband signal path inside the ESP32, but can be less accurate if the reference channel is not frequency-flat.
- Parameters:
values – The CSI data to which the phase calibration should be applied, as a complex-valued numpy array of shape
(boardcount, constants.ROWS_PER_BOARD, constants.ANTENNAS_PER_ROW, (csi.csi_buf_t.htltf_lower.size + csi.HT40_GAP_SUBCARRIERS * 2 + csi.csi_buf_t.htltf_higher.size) // 2)
sensor_timestamps – The precise time when the CSI data was sampled, as a numpy array of shape
(boardcount, constants.ROWS_PER_BOARD, constants.ANTENNAS_PER_ROW)
- Returns:
The phase-calibrated and time offset-calibrated CSI data
- apply_ht40_flat(values: ndarray) → ndarray [source]
Apply phase calibration to the provided HT40 CSI data. Assumes a constant phase offset over all subcarriers, i.e., ignores effects such as the low-pass characteristic of the baseband signal path.
- Parameters:
values – The CSI data to which the phase calibration should be applied, as a complex-valued numpy array of shape
(boardcount, constants.ROWS_PER_BOARD, constants.ANTENNAS_PER_ROW, (csi.csi_buf_t.htltf_lower.size + csi.HT40_GAP_SUBCARRIERS * 2 + csi.csi_buf_t.htltf_higher.size) // 2)
- Returns:
The phase-calibrated CSI data
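The idea of a flat (subcarrier-independent) phase calibration can be sketched as follows. This is an assumption-laden illustration, not the code behind apply_ht40_flat: it assumes one complex reference coefficient per antenna and removes only its phase. The function name, the array sizes and the random test data are all hypothetical.

```python
import numpy as np

def apply_flat_phase_calibration(values, calibration_values):
    """Remove one constant phase offset per antenna from all subcarriers."""
    # Assumption: calibration_values has shape (boards, rows, columns)
    # and only its phase matters
    correction = np.exp(-1j * np.angle(calibration_values))
    # Broadcast the per-antenna correction over the subcarrier axis
    return values * correction[..., np.newaxis]

rng = np.random.default_rng(0)
shape = (1, 2, 4, 16)  # (boards, rows, columns, subcarriers), arbitrary sizes
true_csi = rng.normal(size=shape) + 1j * rng.normal(size=shape)

# Each antenna sees the true channel rotated by an unknown constant phase
offsets = rng.uniform(-np.pi, np.pi, size=shape[:3])
measured = true_csi * np.exp(1j * offsets)[..., np.newaxis]

calibration_values = np.exp(1j * offsets)  # what a reference signal would reveal
calibrated = apply_flat_phase_calibration(measured, calibration_values)
```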
- apply_timestamps(timestamps: ndarray)[source]
Apply time offset calibration to the provided timestamps.
- Parameters:
timestamps – The timestamps to which the calibration should be applied, as a numpy array of shape
(boardcount, constants.ROWS_PER_BOARD, constants.ANTENNAS_PER_ROW)
- Returns:
The calibrated timestamps
- class espargos.pool.ClusteredCSI(source_mac: str, dest_mac: str, seq_ctrl: seq_ctrl_t, boardcount: int)[source]
Bases:
object
A ClusteredCSI object represents a collection of CSI data estimated for the same WiFi packet.
The class clusters the CSI data from multiple ESPARGOS sensors (antennas), which may belong to the same or different ESPARGOS boards. It is used to store CSI data until it is complete and can be provided to a callback. CSI data may be from calibration packets or over-the-air packets.
- add_csi(board_num: int, esp_num: int, serialized_csi: serialized_csi_t, csi_cplx: ndarray)[source]
Add CSI data to the cluster.
- Parameters:
board_num – The number of the ESPARGOS board that received the CSI data
esp_num – The number of the ESPARGOS sensor within that board that received the CSI data
serialized_csi – The serialized CSI data
csi_cplx – The complex-valued CSI data
- deserialize_csi_ht40()[source]
Deserialize the HT-LTF part of the CSI data.
- Returns:
The HT-LTF part of the CSI data as a complex-valued numpy array of shape
(boardcount, constants.ROWS_PER_BOARD, constants.ANTENNAS_PER_ROW, (csi.csi_buf_t.htltf_lower.size + csi.HT40_GAP_SUBCARRIERS * 2 + csi.csi_buf_t.htltf_higher.size) // 2)
- deserialize_csi_lltf()[source]
Deserialize the L-LTF part of the CSI data.
- Returns:
The L-LTF part of the CSI data as a complex-valued numpy array of shape
(boardcount, constants.ROWS_PER_BOARD, constants.ANTENNAS_PER_ROW, csi.csi_buf_t.lltf.size // 2)
- get_age()[source]
Get the age of the CSI data, in seconds.
The age is only approximate: it is based on the timestamp when the ClusteredCSI object was created, not on the sensor timestamps.
- Returns:
The age of the CSI data, in seconds
- get_completion()[source]
Get the completion state of the CSI data.
- Returns:
A boolean numpy array of shape
(boardcount, constants.ROWS_PER_BOARD, constants.ANTENNAS_PER_ROW)
that indicates which sensors have provided CSI data
- get_completion_all()[source]
Get the global completion state of the CSI data, i.e., whether all sensors have provided CSI data.
- Returns:
True if all sensors have provided CSI data, False otherwise
- get_host_timestamp()[source]
Get the timestamp at which the ClusteredCSI object was created, which is approximately when the first sensor received the CSI data.
- Returns:
The timestamp at which the first sensor received the CSI data, in seconds since the epoch
- get_noise_floor()[source]
Get the noise floor of the WiFi packet.
- Returns:
The noise floor of the WiFi packet
- get_primary_channel() → int [source]
Get the primary channel number.
- Returns:
The primary channel number
- get_secondary_channel() → int [source]
Get the secondary channel number.
- Returns:
The secondary channel number
- get_secondary_channel_relative()[source]
Get the relative position of the secondary channel with respect to the primary channel.
- Returns:
0 if no secondary channel is used, 1 if the secondary channel is above the primary channel, -1 if the secondary channel is below the primary channel
- get_sensor_timestamps()[source]
Get the (nanosecond-precision) timestamps at which the WiFi packet was received by the sensors. This timestamp includes the offset that the chip derived from the CSI.
- Returns:
A numpy array of shape
(boardcount, constants.ROWS_PER_BOARD, constants.ANTENNAS_PER_ROW)
that contains the sensor timestamps in seconds
- get_seq_ctrl()[source]
Get the sequence control field of the WiFi packet.
- Returns:
The sequence control field of the WiFi packet
- class espargos.pool.Pool(boards: list[Board], ota_cache_timeout=5)[source]
Bases:
object
A Pool is a collection of ESPARGOS boards. The pool manages the clustering of CSI data from multiple ESPARGOS sensors (antennas) that belong to the same WiFi packet and provides ClusteredCSI objects to registered callbacks.
- add_csi_callback(cb: Callable[[ClusteredCSI], None], cb_predicate: Callable[[ndarray, float], bool] = None)[source]
Register a callback function that is invoked whenever a new CSI cluster is completed.
- Parameters:
cb – The function to call; it receives an instance of class ClusteredCSI as its parameter
cb_predicate – A function with signature (csi_completion_state, csi_age) that defines the conditions under which clustered CSI is regarded as completed and thus provided to the callback. csi_completion_state is a tensor of shape (boardcount, constants.ROWS_PER_BOARD, constants.ANTENNAS_PER_ROW), and csi_age is the age of the packet (relative to when any sensor first received it) in seconds. If cb_predicate returns true, clustered CSI is regarded as completed. If no predicate is provided, the default behavior is to trigger the callback when CSI has been received from all sensors on all boards. If calibrated is true (default), callback is provided CSI that is already phase-calibrated.
- calibrate(per_board=True, duration=2, exithandler=None, cable_lengths=None, cable_velocity_factors=None)[source]
Run calibration for a specified duration.
- Parameters:
per_board – True to calibrate each board separately, False to calibrate all boards together. Set to False if the same phase reference signal is used for all boards, otherwise set to True.
duration – The duration in seconds for which calibration should be run
exithandler – An optional exit handler that can be used to stop calibration prematurely if exithandler.running is set to False in a separate thread
cable_lengths – The lengths of the feeder cables that distribute the clock and phase calibration signal to the ESPARGOS boards, in meters. Only needed for phase-coherent multi-board setups, omit if all cables have the same length.
cable_velocity_factors – The velocity factors of the feeder cables that distribute the clock and phase calibration signal to the ESPARGOS boards. Must be the same length as cable_lengths, and all entries should be in the range [0, 1].
- get_calibration()[source]
Get the stored calibration values.
- Returns:
The stored calibration values as a CSICalibration object
- get_shape()[source]
Get the outer shape of the stored data, i.e., only the antenna dimensions and not subcarrier dimensions or similar.
- run()[source]
Process incoming CSI data packets and call registered callbacks if CSI clusters are complete. Repeatedly call this function from your main loop or from a separate thread. May block for a short amount of time if no data is available.
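Calling run() repeatedly from a separate thread, as suggested above, can be sketched like this. The helper only relies on the object having a run() method; FakePool is a hypothetical stand-in used so the example runs without hardware, and it is not espargos.pool.Pool.

```python
import threading

def run_pool_in_thread(pool, stop_event):
    """Call pool.run() in a background thread until stop_event is set."""
    def worker():
        while not stop_event.is_set():
            pool.run()
    thread = threading.Thread(target=worker, daemon=True)
    thread.start()
    return thread

class FakePool:
    """Hypothetical stand-in that only counts how often run() was called."""
    def __init__(self):
        self.run_calls = 0
        self.enough = threading.Event()

    def run(self):
        self.run_calls += 1
        if self.run_calls >= 5:
            self.enough.set()

pool = FakePool()
stop_event = threading.Event()
thread = run_pool_in_thread(pool, stop_event)

pool.enough.wait(timeout=2.0)  # let the worker make a few calls
stop_event.set()               # ask the worker to stop
thread.join(timeout=2.0)
```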
espargos.backlog
- class espargos.backlog.CSIBacklog(pool, enable_ht40=True, calibrate=True, size=100)[source]
Bases:
object
CSI backlog class. Stores CSI data in a ringbuffer for processing when needed.
- Parameters:
pool – CSI pool object to collect CSI data from
enable_ht40 – Enable storing CSI from HT40 frames (default: True)
calibrate – Apply calibration to CSI data (default: True)
size – Size of the ringbuffer (default: 100)
- add_update_callback(cb)[source]
Add a callback that is called when new CSI data is added to the backlog
- get_latest_timestamp()[source]
Retrieve the mean (over all antennas) timestamp of the most recent packet in the ringbuffer
- Returns:
Timestamp of the most recent packet, scalar
- get_timestamps()[source]
Retrieve packet timestamps for all antennas from the ringbuffer
- Returns:
Timestamps, oldest first, shape (n_packets, n_boards, constants.ROWS_PER_BOARD, constants.ANTENNAS_PER_ROW)
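The ring buffer behavior described for CSIBacklog can be illustrated with collections.deque. TinyBacklog is a hypothetical, simplified model (flat timestamp lists instead of NumPy arrays, an explicit add method instead of a pool callback) and does not reflect the internals of the real class.

```python
from collections import deque

class TinyBacklog:
    """Simplified model of a fixed-size CSI backlog."""
    def __init__(self, size=100):
        # A deque with maxlen drops the oldest entry when it is full
        self._entries = deque(maxlen=size)
        self._callbacks = []

    def add_update_callback(self, cb):
        self._callbacks.append(cb)

    def add(self, timestamps, csi):
        # timestamps: one value per antenna (a flat list in this sketch)
        self._entries.append((timestamps, csi))
        for cb in self._callbacks:
            cb()

    def get_timestamps(self):
        # Oldest entry first
        return [timestamps for timestamps, _ in self._entries]

    def get_latest_timestamp(self):
        # Mean over all antennas of the most recent packet
        latest = self._entries[-1][0]
        return sum(latest) / len(latest)

backlog = TinyBacklog(size=3)
updates = []
backlog.add_update_callback(lambda: updates.append(1))
for t in range(5):
    backlog.add([float(t), float(t) + 0.5], csi=None)
```

After five additions to a backlog of size 3, only the three most recent packets remain.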
espargos.util
- espargos.util.build_combined_array_csi(indexing_matrix, input_csi)[source]
Build the combined array CSI data from the CSI data of the subarrays.
- Parameters:
indexing_matrix – The indexing matrix to map the CSI data of the subarrays to the CSI data of the large array.
input_csi – The CSI data of the subarrays. Complex-valued NumPy array with shape (datapoints, boards, rows, columns, subcarriers).
- Returns:
The combined array CSI data. Complex-valued NumPy array with shape (datapoints, rows, columns, subcarriers).
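One way such a mapping could work is shown below. The layout of the indexing matrix is an assumption made for this sketch: each entry is taken to be a flat antenna index (board, row, column flattened in that order). The function name combine_subarrays and the test data are hypothetical.

```python
import numpy as np

def combine_subarrays(indexing_matrix, input_csi):
    """Rearrange per-board CSI into one large array using flat antenna indices."""
    datapoints, boards, rows, cols, subcarriers = input_csi.shape
    # Flatten (boards, rows, cols) into a single antenna axis
    flat = input_csi.reshape(datapoints, boards * rows * cols, subcarriers)
    # Integer-array indexing replaces the antenna axis by the matrix shape
    return flat[:, indexing_matrix, :]

# Two 2 x 4 boards placed side by side form a 2 x 8 array
input_csi = np.arange(1 * 2 * 2 * 4 * 3).reshape(1, 2, 2, 4, 3).astype(complex)
board_index = np.arange(2 * 2 * 4).reshape(2, 2, 4)
indexing_matrix = np.concatenate([board_index[0], board_index[1]], axis=1)

combined = combine_subarrays(indexing_matrix, input_csi)
```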
- espargos.util.csi_interp_eigenvec(csi: ndarray, weights: ndarray = None)[source]
Interpolates CSI data (frequency-domain or time-domain) by finding the principal eigenvector of the covariance matrix.
- Parameters:
csi – The CSI data to interpolate. Complex-valued NumPy array. Can be an array with arbitrary dimensions, but the first dimension must be the number of CSI datapoints.
weights – The weights to use for each CSI datapoint. If None, all datapoints are weighted equally.
- espargos.util.csi_interp_iterative(csi: ndarray, weights: ndarray = None, iterations=10)[source]
Interpolates CSI data (frequency-domain or time-domain) using an iterative algorithm. Tries to sum up the CSI data phase-coherently with the least error. More details about the algorithm (which is quite straightforward) can be found in section “IV. Linear Interpolation Baseline” in the paper “GAN-based Massive MIMO Channel Model Trained on Measured Data”.
- Parameters:
csi – The CSI data to interpolate. Complex-valued NumPy array. Can be an array with arbitrary dimensions, but the first dimension must be the number of CSI datapoints.
weights – The weights to use for each CSI datapoint. If None, all datapoints are weighted equally.
iterations – The number of iterations to perform. Default is 10.
- Returns:
The interpolated CSI data. Complex-valued NumPy array with the same shape as the input CSI data.
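The principle of summing datapoints phase-coherently can be sketched as an alternating update: form a weighted sum, estimate each datapoint's phase relative to that sum, rotate and repeat. This is an illustrative sketch and may differ from the library's algorithm and from the cited paper. It returns a single combined estimate of shape csi.shape[1:], and the name interp_iterative_sketch is hypothetical.

```python
import numpy as np

def interp_iterative_sketch(csi, weights=None, iterations=10):
    """Combine datapoints that differ by unknown per-datapoint phase rotations."""
    n = csi.shape[0]
    if weights is None:
        weights = np.ones(n) / n
    phi = np.zeros(n)          # current phase estimate per datapoint
    flat = csi.reshape(n, -1)  # one row per datapoint
    for _ in range(iterations):
        # Weighted sum of the datapoints after undoing the estimated phases
        w = np.tensordot(weights * np.exp(-1j * phi), csi, axes=1)
        # Phase of each datapoint relative to the current estimate
        phi = np.angle(flat @ np.conj(w.ravel()))
    return w

rng = np.random.default_rng(1)
h = rng.normal(size=(2, 3)) + 1j * rng.normal(size=(2, 3))  # common channel
thetas = np.array([0.0, 0.5, 1.0, -0.7])                    # per-datapoint phases
csi = h[np.newaxis] * np.exp(1j * thetas)[:, np.newaxis, np.newaxis]

result = interp_iterative_sketch(csi)
```

In this noise-free example the result equals the common channel up to one global phase factor, so the magnitudes match exactly.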
- espargos.util.csi_interp_iterative_by_array(csi: ndarray, weights: ndarray = None, iterations=10)[source]
Interpolates CSI data (frequency-domain or time-domain) using an iterative algorithm. Same as csi_interp_iterative(), but assumes that the second dimension of csi is the antenna array dimension and performs the interpolation for each antenna array separately.
- espargos.util.estimate_toas_rootmusic(csi_fdomain: ndarray, max_source_count=2, chunksize=36, per_board_average=False)[source]
Estimate the times of arrival (ToAs) of the LoS paths using the root-MUSIC algorithm.
- Parameters:
csi_fdomain – The frequency-domain CSI data. Complex-valued NumPy array with shape (datapoints, arrays, rows, columns, subcarriers).
max_source_count – The maximum number of sources to estimate. The number of sources is determined using the Rissanen MDL criterion, but this parameter can be used to limit the number of sources.
chunksize – The size of the chunks to use for the covariance matrix computation.
per_board_average – If True, compute the average ToA over all antennas per board. If False, return the ToAs for each antenna separately.
- Returns:
The estimated ToAs of the LoS paths, in seconds, NumPy array of shape (boardcount, constants.ROWS_PER_BOARD, constants.ANTENNAS_PER_ROW).
- espargos.util.fdomain_to_tdomain_pdp_music(csi_fdomain: ndarray, source_count: int = None, chunksize=36, tap_min=-7, tap_max=7, resolution=200)[source]
Convert frequency-domain CSI data to a time-domain power delay profile (PDP) using MUSIC super-resolution.
- Parameters:
csi_fdomain – The frequency-domain CSI data. Complex-valued NumPy array with shape (datapoints, arrays, rows, columns, subcarriers).
- Returns:
The delays (in taps) and the PDPs of shape (datapoints, arrays, rows, columns, delays), as NumPy arrays.
- espargos.util.fdomain_to_tdomain_pdp_mvdr(csi_fdomain: ndarray, chunksize=36, tap_min=-7, tap_max=7, resolution=200)[source]
Convert frequency-domain CSI data to a time-domain power delay profile (PDP) using the MVDR beamformer.
- Parameters:
csi_fdomain – The frequency-domain CSI data. Complex-valued NumPy array with shape (datapoints, arrays, rows, columns, subcarriers).
- Returns:
The delays (in taps) and the PDPs of shape (datapoints, arrays, rows, columns, delays), as NumPy arrays.
- espargos.util.get_cable_wavelength(frequencies: ndarray, velocity_factors: ndarray)[source]
Returns the wavelength of the provided subcarrier frequencies on a cable with the given velocity factors.
- Parameters:
frequencies – The frequencies of the subcarriers, in Hz, NumPy array.
velocity_factors – The velocity factors of the cable, NumPy array.
- Returns:
The wavelengths of the subcarriers, in meters, NumPy array.
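The underlying relation is the standard one: a signal on a cable travels at the speed of light scaled by the velocity factor, so the wavelength is c · vf / f. The scalar helper below is a sketch of that relation only. The library function operates on NumPy arrays, and the velocity factor of 0.7 is an arbitrary example value.

```python
SPEED_OF_LIGHT = 299792458  # m/s, same value as espargos.constants.SPEED_OF_LIGHT

def cable_wavelength(frequency_hz, velocity_factor):
    """Wavelength in meters of a signal at frequency_hz on a cable."""
    return SPEED_OF_LIGHT * velocity_factor / frequency_hz

# Channel 1 center frequency on a cable with an assumed velocity factor of 0.7
wavelength = cable_wavelength(2.412e9, 0.7)
```

For these inputs the wavelength is roughly 8.7 cm.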
- espargos.util.get_calib_trace_wavelength(frequencies: ndarray)[source]
Returns the wavelength of the subcarriers on the calibration traces on the ESPARGOS sensor board.
- Parameters:
frequencies – The frequencies of the subcarriers, in Hz, NumPy array.
- Returns:
The wavelengths of the subcarriers, in meters, NumPy array.
- espargos.util.get_frequencies_ht40(primary_channel: int, secondary_channel: int)[source]
Returns the frequencies of the subcarriers in an HT40 2.4GHz WiFi channel.
- Parameters:
primary_channel – The primary channel number.
secondary_channel – The secondary channel number.
- Returns:
The frequencies of the subcarriers, in Hz, NumPy array.
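A plausible way to derive these frequencies from the documented constants is sketched below: the HT40 center lies midway between the primary and secondary channel centers, and subcarriers are spaced 312.5 kHz apart around it. The subcarrier count of 117 is an example value chosen for this sketch (in the library it follows from the csi_buf_t field sizes), and the symmetric index range is an assumption.

```python
WIFI_CHANNEL1_FREQUENCY = 2.412e9  # Hz, as in espargos.constants
WIFI_CHANNEL_SPACING = 5e6         # Hz
WIFI_SUBCARRIER_SPACING = 312.5e3  # Hz

def channel_center(channel):
    """Center frequency of a 2.4 GHz channel numbered from 1."""
    return WIFI_CHANNEL1_FREQUENCY + WIFI_CHANNEL_SPACING * (channel - 1)

def ht40_frequencies_sketch(primary_channel, secondary_channel, subcarrier_count):
    """Subcarrier frequencies centered between the two bonded channels."""
    center = (channel_center(primary_channel) + channel_center(secondary_channel)) / 2
    first = -(subcarrier_count // 2)  # index of the lowest subcarrier
    return [center + (first + k) * WIFI_SUBCARRIER_SPACING
            for k in range(subcarrier_count)]

# Primary channel 1 bonded with secondary channel 5
freqs = ht40_frequencies_sketch(1, 5, subcarrier_count=117)
```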
- espargos.util.interpolate_ht40_gap(csi_ht40: ndarray)[source]
Apply linear interpolation to determine realistic values for the subcarrier channel coefficients in the gap between the bonded channels in an HT40 channel.
- Parameters:
csi_ht40 – The CSI data for an HT40 channel. Complex-valued NumPy array with arbitrary shape, but the last dimension must be the subcarriers.
- Returns:
The CSI data with the values in the gap filled in.
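Linear interpolation across the gap can be sketched on a plain list of complex coefficients. The assumptions here are that the gap sits in the center of the subcarrier axis, spans HT40_GAP_SUBCARRIERS entries, and is interpolated directly on the complex values. The function name fill_center_gap is hypothetical.

```python
HT40_GAP_SUBCARRIERS = 3  # same value as espargos.csi.HT40_GAP_SUBCARRIERS

def fill_center_gap(csi, gap=HT40_GAP_SUBCARRIERS):
    """Linearly interpolate the gap entries from their two neighbors."""
    filled = list(csi)
    start = len(filled) // 2 - gap // 2  # first index inside the gap
    left = filled[start - 1]             # last valid value below the gap
    right = filled[start + gap]          # first valid value above the gap
    for i in range(gap):
        t = (i + 1) / (gap + 1)          # fractional position between neighbors
        filled[start + i] = left + (right - left) * t
    return filled

# A channel that varies linearly over 11 subcarriers, with the gap zeroed out
ramp = [complex(k, -k) for k in range(11)]
with_gap = list(ramp)
with_gap[4:7] = [0j, 0j, 0j]

filled = fill_center_gap(with_gap)
```

Because the example channel is itself linear, interpolation recovers the missing values exactly.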
- espargos.util.parse_combined_array_config(config_file)[source]
Parse the configuration file for demos that use a combined array.
- Parameters:
config_file – The path to the configuration file.
- Returns:
indexing_matrix – The indexing matrix to map the CSI data of the subarrays to the CSI data of the large array.
board_names_hosts – The names of the boards and their hostnames.
cable_lengths – The lengths of the cables connecting the boards.
cable_velocity_factors – The velocity factors of the cables connecting the boards.
n_rows – The number of rows in the array.
n_cols – The number of columns in the array.
- espargos.util.shift_to_firstpeak(csi_datapoints: ndarray, max_delay_taps=3, search_resolution=40, peak_threshold=0.4)[source]
Shifts the CSI data so that the first peak of the channel impulse response is at time 0. Each CSI datapoint is shifted by a different amount, i.e., this function can be used to synchronize CSI based on the LoS channel. Uses a simple but computationally efficient algorithm to find the first peak of the channel impulse response (as opposed to a super-resolution-based approach).
- Parameters:
csi_datapoints – The CSI data to shift, frequency-domain. Complex-valued NumPy array with shape (datapoints, arrays, rows, columns, subcarriers).
max_delay_taps – The maximum number of time taps to shift the CSI data by.
search_resolution – The number of search points (granularity) to use for the time shift.
peak_threshold – The threshold for the peak detection, as a fraction of the maximum peak power.
- Returns:
The frequency-domain CSI data with the first peak of the channel impulse response at time 0.
- espargos.util.shift_to_firstpeak_sync(csi_datapoints: ndarray, max_delay_taps=3, search_resolution=40, peak_threshold=0.1)[source]
Shifts the CSI data so that the first peak of the channel impulse response is at time 0. All CSI datapoints are shifted by the same amount, i.e., requires synchronized CSI.
- Parameters:
csi_datapoints – The CSI data to shift, frequency-domain. Complex-valued NumPy array with shape (datapoints, arrays, rows, columns, subcarriers).
max_delay_taps – The maximum number of time taps to shift the CSI data by.
search_resolution – The number of search points (granularity) to use for the time shift.
peak_threshold – The threshold for the peak detection, as a fraction of the maximum peak power.
- Returns:
The frequency-domain CSI data with the first peak of the channel impulse response at time 0.
espargos.csi
- espargos.csi.HT40_GAP_SUBCARRIERS = 3
Gap between primary and secondary channel in HT40 mode, in subcarriers
- class espargos.csi.csi_buf_t(buf=None)[source]
Bases:
Structure
A ctypes structure representing the CSI buffer as produced by the ESP32.
This structure is used to store the channel coefficients estimated from Wi-Fi packets, directly as provided in the buf field of wifi_csi_info_t by esp-idf. Refer to the related esp-idf documentation for details. The structure is packed to ensure there is no padding between fields.
- class espargos.csi.csistream_pkt_t(buf=None)[source]
Bases:
Structure
A ctypes structure representing a CSI packet as received from the ESPARGOS controller, i.e., sensor number and the raw data buffer that should contain the serialized_csi_t structure if the type_header matches.
- class espargos.csi.seq_ctrl_t(buf=None)[source]
Bases:
Structure
A ctypes structure representing the sequence control field of a Wi-Fi packet.
This structure is used to store the sequence control field of a Wi-Fi packet, which contains the fragment number and the sequence number.
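In IEEE 802.11, the 16-bit sequence control field carries a 4-bit fragment number in the low bits and a 12-bit sequence number in the high bits. A ctypes bitfield structure for that layout could look like the sketch below. The class and field names are hypothetical and need not match those of seq_ctrl_t.

```python
import ctypes

class SeqCtrlSketch(ctypes.LittleEndianStructure):
    """16-bit sequence control field: 4-bit fragment, 12-bit sequence number."""
    # Hypothetical field names; bitfields are allocated from the low bits upward
    _fields_ = [
        ("frag", ctypes.c_uint16, 4),
        ("seq", ctypes.c_uint16, 12),
    ]

# Example value 0x1234 as it would appear on the wire (little-endian)
raw = (0x1234).to_bytes(2, "little")
seq_ctrl = SeqCtrlSketch.from_buffer_copy(raw)
```

For the value 0x1234, the fragment number is 0x4 and the sequence number is 0x123.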
- class espargos.csi.serialized_csi_t(buf=None)[source]
Bases:
Structure
A ctypes structure representing the CSI buffer and metadata as provided by the ESPARGOS firmware.
- class espargos.csi.wifi_pkt_rx_ctrl_t(buf=None)[source]
Bases:
Structure
A ctypes structure representing the wifi_pkt_rx_ctrl_t as provided by the ESP32. See the related esp-idf code for details.
espargos.constants
- espargos.constants.ANTENNAS_PER_BOARD = 8
Number of antennas on one board
- espargos.constants.ANTENNAS_PER_ROW = 4
Number of antennas per row / per SPI controller on the board
- espargos.constants.ANTENNA_SEPARATION = 0.06
Distance between the centers of two antennas [m]
- espargos.constants.CALIB_TRACE_DIELECTRIC_CONSTANT = 4.3
Dielectric constant of the sensor PCB material
- espargos.constants.CALIB_TRACE_EFFECTIVE_DIELECTRIC_CONSTANT: float = 3.2283247007170446
Effective dielectric constant of the calibration trace
- espargos.constants.CALIB_TRACE_GROUP_VELOCITY: float = 166852261.88366285
Group velocity of the signal on the calibration trace, in m/s
- espargos.constants.CALIB_TRACE_HEIGHT = 0.119
Height of the calibration signal trace (distance between GND plane and microstrip), in mm
- espargos.constants.CALIB_TRACE_LENGTH = [[0.0708462, 0.0229349, 0.0786856, 0.14236], [0.0838888, 0.0295291, 0.0671322, 0.1308537]]
Calibration signal trace lengths on ESPARGOS PCB
- espargos.constants.CALIB_TRACE_WIDTH = 0.2
Width of the calibration signal trace, in mm
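The listed effective dielectric constant and group velocity are numerically consistent with a common closed-form microstrip approximation, in which only the height-to-width ratio of the trace matters. The sketch below reproduces both values from the other constants. Whether the library computes them this way is an assumption.

```python
import math

SPEED_OF_LIGHT = 299792458             # m/s
CALIB_TRACE_DIELECTRIC_CONSTANT = 4.3  # relative permittivity of the PCB material
CALIB_TRACE_WIDTH = 0.2                # same length unit as the height
CALIB_TRACE_HEIGHT = 0.119

# Closed-form microstrip approximation for the effective dielectric constant
er = CALIB_TRACE_DIELECTRIC_CONSTANT
ratio = CALIB_TRACE_HEIGHT / CALIB_TRACE_WIDTH
eps_eff = (er + 1) / 2 + (er - 1) / 2 / math.sqrt(1 + 12 * ratio)

# The signal propagates at c divided by the square root of eps_eff
group_velocity = SPEED_OF_LIGHT / math.sqrt(eps_eff)
```

The resulting velocity is about 56 % of the speed of light in a vacuum.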
- espargos.constants.ROWS_PER_BOARD = 2
Number of rows / SPI controllers on the board
- espargos.constants.SPEED_OF_LIGHT = 299792458
Speed of light in a vacuum, in m/s
- espargos.constants.WIFI_CHANNEL1_FREQUENCY = 2412000000.0
Frequency of channel 1 in 2.4 GHz WiFi, in Hz
- espargos.constants.WIFI_CHANNEL_SPACING = 5000000.0
Frequency spacing of WiFi channels, in Hz
- espargos.constants.WIFI_SUBCARRIER_SPACING = 312500.0
Subcarrier spacing of WiFi (in Hz)