API Reference

The API is unstable at this early stage of development.

espargos.board

class espargos.board.Board(host)[source]

Bases: object

add_consumer(clist, cv, *args)[source]

Adds a consumer to the CSI stream. A consumer is defined by a list, a condition variable and additional arguments. When a CSI packet is received, it will be appended to the list, and the condition variable will be notified.

Parameters:
  • clist – A list to which the CSI packet will be appended. The entry added to the list is a tuple (esp_num, serialized_csi, *args), where esp_num is the number of the sensor in the array, serialized_csi is the raw CSI packet and *args are the additional arguments.

  • cv – A condition variable that will be notified when a CSI packet is received

  • args – Additional arguments that will be added to the list along with the CSI packet

get_name()[source]

Returns the hostname of the ESPARGOS controller as configured in the web interface.

Returns:

The hostname of the ESPARGOS controller

set_calib(calibrate)[source]

Enables or disables calibration mode on the ESPARGOS controller.

Parameters:

calibrate – True to enable calibration mode, False to disable it

Raises:

EspargosUnexpectedResponseError – If the server at the given host is not an ESPARGOS controller

start()[source]

Starts the CSI stream thread for the ESPARGOS controller. The thread will run indefinitely until the stop() method is called.

stop()[source]

Stops the CSI stream thread for the ESPARGOS controller. The thread will stop after the current packet has been processed, or after a short timeout.

exception espargos.board.EspargosHTTPStatusError[source]

Bases: Exception

Raised when the ESPARGOS HTTP API returns an invalid status code

exception espargos.board.EspargosUnexpectedResponseError[source]

Bases: Exception

Raised when the server (ESPARGOS controller) provides unexpected response. Is the server really ESPARGOS?

espargos.pool

class espargos.pool.CSICalibration(channel_primary, channel_secondary, calibration_values_ht40, timestamp_calibration_values, board_cable_lengths=None, board_cable_vfs=None)[source]

Bases: object

apply_ht40(values)[source]

Apply phase calibration to the provided HT40 CSI data. Also accounts for subcarrier-specific phase offsets, e.g., due to low-pass filter characteristic of baseband signal path inside the ESP32, but can be less accurate if reference channel is not frequency-flat.

Parameters:

values – The CSI data to which the phase calibration should be applied, as a complex-valued numpy array of shape (boardcount, constants.ROWS_PER_BOARD, constants.ANTENNAS_PER_ROW, (csi.csi_buf_t.htltf_lower.size + csi.HT40_GAP_SUBCARRIERS * 2 + csi.csi_buf_t.htltf_higher.size) // 2)

Returns:

The phase-calibrated CSI data

apply_ht40_flat(values)[source]

Apply phase calibration to the provided HT40 CSI data. Assume constant phase offset over all subcarriers, i.e., ignore effects like low-pass characteristic.

Parameters:

values – The CSI data to which the phase calibration should be applied, as a complex-valued numpy array of shape (boardcount, constants.ROWS_PER_BOARD, constants.ANTENNAS_PER_ROW, (csi.csi_buf_t.htltf_lower.size + csi.HT40_GAP_SUBCARRIERS * 2 + csi.csi_buf_t.htltf_higher.size) // 2)

Returns:

The phase-calibrated CSI data

apply_timestamps(timestamps)[source]

Apply time offset calibration to the provided timestamps.

Parameters:

timestamps – The timestamps to which the calibration should be applied, as a numpy array of shape (boardcount, constants.ROWS_PER_BOARD, constants.ANTENNAS_PER_ROW)

Returns:

The calibrated timestamps

class espargos.pool.ClusteredCSI(source_mac, dest_mac, seq_ctrl, boardcount)[source]

Bases: object

A ClusteredCSI object represents a collection of CSI data estimated for the same WiFi packet.

The class clusters the CSI data from multiple ESPARGOS sensors (antennas), which may belong to the same or different ESPARGOS boards. It is used to store CSI data until it is complete and can be provided to a callback. CSI data may be from calibration packets or over-the-air packets.

add_csi(board_num, esp_num, serialized_csi, csi_cplx)[source]

Add CSI data to the cluster.

Parameters:
  • board_num – The number of the ESPARGOS board that received the CSI data

  • esp_num – The number of the ESPARGOS sensor within that board that received the CSI data

  • serialized_csi – The serialized CSI data

  • csi_cplx – The complex-valued CSI data

deserialize_csi_ht40()[source]

Deserialize the HT-LTF part of the CSI data.

Returns:

The HT-LTF part of the CSI data as a complex-valued numpy array of shape (boardcount, constants.ROWS_PER_BOARD, constants.ANTENNAS_PER_ROW, (csi.csi_buf_t.htltf_lower.size + csi.HT40_GAP_SUBCARRIERS * 2 + csi.csi_buf_t.htltf_higher.size) // 2)

deserialize_csi_lltf()[source]

Deserialize the L-LTF part of the CSI data.

Returns:

The L-LTF part of the CSI data as a complex-valued numpy array of shape (boardcount, constants.ROWS_PER_BOARD, constants.ANTENNAS_PER_ROW, csi.csi_buf_t.lltf.size // 2)

get_age()[source]

Get the age of the CSI data, in seconds.

The age is only approximate, it is based on the timestamp when the ClusteredCSI object was created, not on the sensor timestamps.

Returns:

The age of the CSI data, in seconds

get_completion()[source]

Get the completion state of the CSI data.

Returns:

A boolean numpy array of shape (boardcount, constants.ROWS_PER_BOARD, constants.ANTENNAS_PER_ROW) that indicates which sensors have provided CSI data

get_completion_all()[source]

Get the global completion state of the CSI data, i.e., whether all sensors have provided CSI data.

Returns:

True if all sensors have provided CSI data, False otherwise

get_host_timestamp()[source]

Get the timestamp at which the ClusteredCSI object was created, which is approximately when the first sensor received the CSI data.

Returns:

The timestamp at which the first sensor received the CSI data, in seconds since the epoch

get_primary_channel()[source]

Get the primary channel number.

Returns:

The primary channel number

get_rssi()[source]

Get the RSSI values of the WiFi packet.

get_secondary_channel()[source]

Get the secondary channel number.

Returns:

The secondary channel number

get_secondary_channel_relative()[source]

Get the relative position of the secondary channel with respect to the primary channel.

Returns:

0 if no secondary channel is used, 1 if the secondary channel is above the primary channel, -1 if the secondary channel is below the primary channel

get_sensor_timestamps()[source]

Get the (microsecond-precision) timestamps at which the CSI data was received by the sensors.

Returns:

A numpy array of shape (boardcount, constants.ROWS_PER_BOARD, constants.ANTENNAS_PER_ROW) that contains the sensor timestamps in seconds

get_seq_ctrl()[source]

Get the sequence control field of the WiFi packet.

Returns:

The sequence control field of the WiFi packet

get_source_mac()[source]

Get the source MAC address of the WiFi packet.

Returns:

The source MAC address of the WiFi packet

is_ht40()[source]

Check if the packet is a HT40 packet, i.e., if it uses channel bonding and hence occupies two 20 MHz channels.

class espargos.pool.Pool(boards, ota_cache_timeout=5)[source]

Bases: object

A Pool is a collection of ESPARGOS boards. The pool manages the clustering of CSI data from multiple ESPARGOS sensors (antennas) that belong to the same WiFi packet and provides :class:’ClusteredCSI’ objects to registered callbacks.

add_csi_callback(cb, cb_predicate=None)[source]

Register callback function that is invoked whenever a new CSI cluster is completed.

Parameters:
  • cb – The function to call, gets instance of class ClusteredCSI as parameter

  • cb_predicate – A function with signature (csi_completion_state, csi_age) that defines the conditions under which clustered CSI is regarded as completed and thus provided to the callback. csi_completion_state is a tensor of shape (boardcount, constants.ROWS_PER_BOARD, constants.ANTENNAS_PER_ROW), and csi_age is the age of the packet (relative to when any sensor first received it) in seconds If cb_predicate returns true, clusterd CSI is regarded as completed. If no predicate is provided, the default behavior is to trigger the callback when CSI has been received from all sensors on all boards. If calibrated is true (default), callback is provided CSI that is already phase-calibrated.

calibrate(per_board=True, duration=2, exithandler=None, cable_lengths=None, cable_velocity_factors=None)[source]

Run calibration for a specified duration.

Parameters:
  • per_board – True to calibrate each board separately, False to calibrate all boards together. Set to False if the same phase reference signal is used for all boards, otherwise set to True.

  • duration – The duration in seconds for which calibration should be run

  • exithandler – An optional exit handler that can be used to stop calibration prematurely if exithandler.running is set to False in a separate thread

  • cable_lengths – The lengths of the feeder cables that distribute the clock and phase calibration signal to the ESPARGOS boards, in meters. Only needed for phase-coherent multi-board setups, omit if all cables have the same length.

  • cable_velocity_factors – The velocity factors of the feeder cables that distribute the clock and phase calibration signal to the ESPARGOS boards Must be the same length as cable_lengths, and all entries should be in the range [0, 1].

get_calibration()[source]

Get the stored calibration values.

Returns:

The stored calibration values as a CSICalibration object

get_shape()[source]

Get the outer shape of the stored data, i.e., only the antenna dimensions and not subcarrier dimensions or similar.

get_stats()[source]

Get collected statistics about the pool.

run()[source]

Process incoming CSI data packets and call registered callbacks if CSI clusters are complete. Repeatedly call this function from your main loop or from a separate thread. May block for a short amount of time if no data is available.

set_calib(calibrate)[source]

Set calibration mode for all boards in the pool.

Parameters:

calibrate – True to enable calibration mode, False to disable it

start()[source]

Start the streaming of CSI data for all boards in the pool.

stop()[source]

Stop the streaming of CSI data for all boards in the pool.

espargos.backlog

class espargos.backlog.CSIBacklog(pool, enable_ht40=True, calibrate=True, size=100)[source]

Bases: object

CSI backlog class. Stores CSI data in a ringbuffer for processing when needed.

Parameters:
  • pool – CSI pool object to collect CSI data from

  • enable_ht40 – Enable storing CSI from HT40 frames (default: True)

  • calibrate – Apply calibration to CSI data (default: True)

  • size – Size of the ringbuffer (default: 100)

add_update_callback(cb)[source]

Add a callback that is called when new CSI data is added to the backlog

get_ht40()[source]

Retrieve HT40 CSI data from the ringbuffer

Returns:

HT40 CSI data, oldest first

get_latest_timestamp()[source]

Retrieve the timestamp of the most recent packet in the ringbuffer

Returns:

Timestamp of the most recent packet

get_rssi()[source]

Retrieve RSSI data from the ringbuffer

Returns:

RSSI data, oldest first

get_timestamps()[source]

Retrieve packet timestamps from the ringbuffer

Returns:

Timestamps, oldest first

nonempty()[source]

Check if the backlog is nonempty

Returns:

True if the backlog is nonempty

set_mac_filter(filter_regex)[source]

Set a MAC address filter for the backlog

Parameters:

filter_regex – MAC address filter regex

start()[source]

Start the CSI backlog thread, must be called before using the backlog

stop()[source]

Stop the CSI backlog thread

espargos.util

espargos.util.csi_interp_eigenvec(csi, weights=None)[source]

Interpolates CSI data (frequency-domain or time-domain) by finding the principal eigenvector of the covariance matrix.

Parameters:
  • csi – The CSI data to interpolate. Complex-valued NumPy array. Can be an array with arbitrary dimensions, but the first dimension must be the number of CSI datapoints.

  • weights – The weights to use for each CSI datapoint. If None, all datapoints are weighted equally.

espargos.util.csi_interp_iterative(csi, weights=None, iterations=10)[source]

Interpolates CSI data (frequency-domain or time-domain) using an iterative algorithm. Tries to sum up the CSI data phase-coherently with the least error. More details about the algorithm (which is quite straightforward) can be found in section “IV. Linear Interpolation Baseline” in the paper “GAN-based Massive MIMO Channel Model Trained on Measured Data”.

Parameters:
  • csi – The CSI data to interpolate. Complex-valued NumPy array. Can be an array with arbitrary dimensions, but the first dimension must be the number of CSI datapoints.

  • weights – The weights to use for each CSI datapoint. If None, all datapoints are weighted equally.

  • iterations – The number of iterations to perform. Default is 10.

Returns:

The interpolated CSI data. Complex-valued NumPy array with the same shape as the input CSI data.

espargos.util.csi_interp_iterative_by_array(csi, weights=None, iterations=10)[source]

Interpolates CSI data (frequency-domain or time-domain) using an iterative algorithm. Same as csi_interp_iterative(), but assumes that second dimension of csi is the antenna array dimension and performs the interpolation for each antenna array separately.

espargos.util.get_cable_wavelength(frequencies, velocity_factors)[source]

Returns the wavelength of the provided subcarrier frequencies on a cable with the given velocity factors.

Parameters:
  • frequencies – The frequencies of the subcarriers, in Hz, NumPy array.

  • velocity_factors – The velocity factors of the cable, NumPy array.

Returns:

The wavelengths of the subcarriers, in meters, NumPy array.

espargos.util.get_calib_trace_wavelength(frequencies)[source]

Returns the wavelength of the subcarriers on the calibration traces on the ESPARGOS sensor board.

Parameters:

frequencies – The frequencies of the subcarriers, in Hz, NumPy array.

Returns:

The wavelengths of the subcarriers, in meters, NumPy array.

espargos.util.get_frequencies_ht40(primary_channel, secondary_channel)[source]

Returns the frequencies of the subcarriers in an HT40 2.4GHz WiFi channel. :param primary_channel: The primary channel number. :param secondary_channel: The secondary channel number. :return: The frequencies of the subcarriers, in Hz, NumPy array.

espargos.util.interpolate_ht40_gap(csi_ht40)[source]

Apply linear interpolation to determine realistic values for the subcarrier channel coefficients in the gap between the bonded channels in an HT40 channel.

Parameters:

csi_ht40 – The CSI data for an HT40 channel. Complex-valued NumPy array with arbitrary shape, but the last dimension must be the subcarriers.

Returns:

The CSI data with the values in the gap filled in.

espargos.util.shift_to_firstpeak(csi_datapoints, max_delay_taps=3, search_resolution=40, peak_threshold=0.4)[source]

Shifts the CSI data so that the first peak of the channel impulse response is at time 0. Uses a simple but rather computation-efficient algorithm to find the first peak of the channel impulse response (as opposed to superresolution-based approach).

Parameters:
  • csi_datapoints – The CSI data to shift, frequency-domain. Complex-valued NumPy array with shape (datapoints, arrays, rows, columns, subcarriers).

  • max_delay_taps – The maximum number of time taps to shift the CSI data by.

  • search_resolution – The number of search points (granularity) to use for the time shift.

  • peak_threshold – The threshold for the peak detection, as a fraction of the maximum peak power.

Returns:

The frequency-domain CSI data with the first peak of the channel impulse response at time 0.

espargos.csi

espargos.csi.HT40_GAP_SUBCARRIERS = 3

Gap between primary and secondary channel in HT40 mode, in subcarriers

class espargos.csi.csi_buf_t(buf=None)[source]

Bases: Structure

A ctypes structure representing the CSI buffer as produced by the ESP32.

This structure is used to store the channel coefficients estimated from Wi-Fi packets, directly as provided in the buf field of wifi_csi_info_t by esp-idf, refer to the related esp-idf documentation for details. The structure is packed to ensure there is no padding between fields.

class espargos.csi.csistream_pkt_t(buf=None)[source]

Bases: Structure

A ctypes structure representing a CSI packet as received from the ESPARGOS controller, i.e., sensor number and the raw data buffer that should contain the serialized_csi_t structure if the type_header matches.

class espargos.csi.seq_ctrl_t(buf=None)[source]

Bases: Structure

A ctypes structure representing the sequence control field of a Wi-Fi packet.

This structure is used to store the sequence control field of a Wi-Fi packet, which contains the fragment number and the segment number.

class espargos.csi.serialized_csi_t(buf=None)[source]

Bases: Structure

A ctypes structure representing the CSI buffer and metadata as provided by the ESPARGOS firmware.

class espargos.csi.wifi_pkt_rx_ctrl_t(buf=None)[source]

Bases: Structure

A ctypes structure representing the wifi_pkt_rx_ctrl_t as provided by the ESP32. See the related esp-idf code for details.

espargos.constants

espargos.constants.ANTENNAS_PER_BOARD = 8

Number of antennas on one board

espargos.constants.ANTENNAS_PER_ROW = 4

Number of antennas per row / per SPI controller on the board

espargos.constants.ANTENNA_SEPARATION = 0.06

Distance between the centers of two antennas [m]

espargos.constants.CALIB_TRACE_DIELECTRIC_CONSTANT = 4.3

Dielectric constant of the sensor PCB material

espargos.constants.CALIB_TRACE_EFFECTIVE_DIELECTRIC_CONSTANT = 3.2283247007170446

Effective dielectric constant of the calibration trace

espargos.constants.CALIB_TRACE_HEIGHT = 0.119

Height of the calibration signal trace (distance between GND plane and microstrip), in m

espargos.constants.CALIB_TRACE_LENGTH = [[0.0708462, 0.0229349, 0.0786856, 0.14236], [0.0838888, 0.0295291, 0.0671322, 0.1308537]]

Calibration signal trace lengths on ESPARGOS PCB

espargos.constants.CALIB_TRACE_WIDTH = 0.2

Width of the calibration signal trace, in m

espargos.constants.ROWS_PER_BOARD = 2

Number of rows / SPI controllers on the board

espargos.constants.SPEED_OF_LIGHT = 299792458

Speed of light in a vacuum

espargos.constants.WIFI_CHANNEL1_FREQUENCY = 2412000000.0

Frequency of channel 1 in 2.4 GHz WiFi

espargos.constants.WIFI_CHANNEL_SPACING = 5000000.0

Frequency spacing of WiFi channels

espargos.constants.WIFI_SUBCARRIER_SPACING = 312500.0

Subcarrier spacing of WiFi (in Hz)

Common

class espargos.Logger[source]

Bases: object

Logger class for pyespargos. This class is a singleton and should be used to modify the logging level of the library.

classmethod get_level()[source]

Returns the current logging level of the logger.

logger = <Logger pyespargos (INFO)>
classmethod set_level(level)[source]

Sets the logging level of the logger.

stderrHandler = <StreamHandler <stderr> (NOTSET)>