API Reference

The API is unstable at this early stage of development.

espargos.board

class espargos.board.Board(host: str)[source]

Bases: object

DEFAULT_CFO_CORRECTION_CONFIG = {'auto': True, 'value': 0}
DEFAULT_CSI_ACQUIRE_CONFIG = {'acquire_csi_beamformed': True, 'acquire_csi_dcm': True, 'acquire_csi_force_lltf': False, 'acquire_csi_he_stbc_mode': 2, 'acquire_csi_ht20': True, 'acquire_csi_ht40': True, 'acquire_csi_legacy': True, 'acquire_csi_mu': True, 'acquire_csi_su': True, 'acquire_csi_vht': True, 'compress_csi': False, 'dump_ack_en': True, 'enable': True, 'lltf_8bit_mode': False, 'val_scale_cfg': 2}
DEFAULT_GAIN_SETTINGS = {'fft_scale_enable': False, 'fft_scale_value': 0, 'rx_gain_enable': False, 'rx_gain_value': 0}
DEFAULT_WIFI_CHANNEL_OVERRIDES = {'channel-primary': [1, 1, 1, 1, 1, 1, 1, 1], 'channel-secondary': [0, 0, 0, 0, 0, 0, 0, 0], 'override_active': False}
add_consumer(clist: list, cv: Condition, *args)[source]

Adds a consumer to the CSI stream. A consumer is defined by a list, a condition variable and additional arguments. When a CSI packet is received, it will be appended to the list, and the condition variable will be notified.

Parameters:
  • clist – A list to which the CSI packet will be appended. The entry added to the list is a tuple (esp_num, serialized_csi, *args), where esp_num is the number of the sensor in the array, serialized_csi is the raw CSI packet and *args are the additional arguments.

  • cv – A condition variable that will be notified when a CSI packet is received

  • args – Additional arguments that will be added to the list along with the CSI packet
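The consumer pattern described above can be sketched without any hardware. The following is a minimal illustration, not the library's implementation: `fake_board_deliver` is a hypothetical stand-in for what the board does internally when a packet arrives, and it assumes the condition variable is notified while its lock is held.

```python
import threading

def fake_board_deliver(clist, cv, esp_num, serialized_csi, *args):
    # Hypothetical stand-in for the board side: append the tuple
    # (esp_num, serialized_csi, *args) and notify the condition variable.
    with cv:
        clist.append((esp_num, serialized_csi, *args))
        cv.notify_all()

clist = []
cv = threading.Condition()

def producer():
    # Simulates three incoming CSI packets from sensors 0, 1 and 2.
    for esp_num in range(3):
        fake_board_deliver(clist, cv, esp_num, b"raw-csi", "board-0")

received = []
thread = threading.Thread(target=producer)
thread.start()

# Consumer side: wait until the list is non-empty, then drain it.
while len(received) < 3:
    with cv:
        cv.wait_for(lambda: len(clist) > 0, timeout=1.0)
        received.extend(clist)
        clist.clear()

thread.join()
```

With a real board, the same `clist` and `cv` would be passed to add_consumer() instead of being fed by the producer thread.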

clear_mac_filter()[source]

Tell the ESPARGOS board to receive packets from all transmitters.

Raises:

EspargosUnexpectedResponseError – If the server at the given host is not an ESPARGOS controller or the request was invalid

close()[source]

Close transport resources associated with this board.

For UART-backed boards, this releases the serial port lock. Calling this on network-backed boards is harmless.

get_cfo_correction() dict[source]

Fetches receiver CFO correction configuration from the ESPARGOS controller.

get_csi_acquire_config() dict[source]

Fetches the current CSI acquisition configuration from the ESPARGOS controller.

Returns:

CSI acquisition configuration dict

Raises:

EspargosUnexpectedResponseError – If the server at the given host is not an ESPARGOS controller or the request was invalid

get_gain_settings() dict[source]

Fetches the current gain settings from the ESPARGOS controller.

Returns:

Gain settings dict

Raises:

EspargosUnexpectedResponseError – If the server at the given host is not an ESPARGOS controller or the request was invalid

get_mac_filter() dict[source]

Fetches the current MAC filter configuration from the ESPARGOS controller.

The returned JSON/dict matches what is configured via set_mac_filter() / clear_mac_filter(). Format:

{
  "enable": true|false,
  "mac": "00:11:22:33:44:55",
  "mac_mask": "ff:ff:ff:ff:ff:ff"
}
Returns:

MAC filter configuration dict

Raises:

EspargosUnexpectedResponseError – If the server at the given host is not an ESPARGOS controller or the request was invalid

get_name()[source]

Returns the hostname of the ESPARGOS controller as configured in the web interface.

Returns:

The hostname of the ESPARGOS controller

get_radar_config() dict[source]

Fetches the current low-level radar TX configuration from the ESPARGOS controller.

The returned dict mirrors the controller’s /get_tx_control response and contains fields such as rfswitch_state, active_by_antid, start_by_antid, period_by_antid, mac_by_antid, tx_power, tx_phymode, and tx_rate.

Returns:

Radar TX configuration dict

Raises:

EspargosUnexpectedResponseError – If the server at the given host is not an ESPARGOS controller or the request was invalid

get_rfswitch() rfswitch_state_t[source]

Fetches the current RF switch state from the ESPARGOS controller.

Returns:

The current RF switch state as one of csi.rfswitch_state_t

Raises:

EspargosUnexpectedResponseError – If the server at the given host is not an ESPARGOS controller or the request was invalid

get_wifi_channel_overrides() dict[source]

Fetches the current per-sensor WiFi channel overrides from the ESPARGOS controller.

Returns:

Per-sensor WiFi channel override settings dict

Raises:

EspargosUnexpectedResponseError – If the server at the given host is not an ESPARGOS controller or the request was invalid

get_wificonf() dict[source]

Fetches the current WiFi configuration from the ESPARGOS controller.

The returned JSON/dict uses the same hyphenated keys as accepted by set_wificonf(), e.g. “channel-primary”, “channel-secondary”, and “country-code”.

Returns:

WiFi configuration dict

Raises:

EspargosUnexpectedResponseError – If the server at the given host is not an ESPARGOS controller or the request was invalid

reboot()[source]

Trigger a controller reboot.

The controller responds with "ok" and then reboots shortly after sending the reply.

Raises:

EspargosUnexpectedResponseError – If the server at the given host is not an ESPARGOS controller or the request was invalid

set_cfo_correction(auto: bool, value: int = 0)[source]

Configures receiver CFO correction on the ESPARGOS controller.

When auto is false, the receiver frequency-offset estimate is forced to value (signed 13-bit NRXFOE reg_foe_force field, -4096..4095). A value of zero can reduce packet-to-packet phase noise in radar mode when TX and RX share the same reference clock.
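A small range check can catch out-of-range values before they reach the controller. This sketch only relies on the -4096..4095 range stated above; the helper names are hypothetical, and the two's complement encoding is shown purely to illustrate what "signed 13-bit" means, not how the firmware stores the value.

```python
CFO_MIN, CFO_MAX = -4096, 4095  # signed 13-bit range from the text above

def check_cfo_value(value: int) -> int:
    """Return value unchanged if it fits a signed 13-bit field, else raise."""
    if not CFO_MIN <= value <= CFO_MAX:
        raise ValueError(f"CFO value {value} outside {CFO_MIN}..{CFO_MAX}")
    return value

def to_twos_complement_13(value: int) -> int:
    """Illustrative only: encode the value as a 13-bit two's complement word."""
    return check_cfo_value(value) & 0x1FFF

# Arguments one might pass to set_cfo_correction() for a forced zero offset.
manual_zero = {"auto": False, "value": check_cfo_value(0)}
```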

set_csi_acquire_config(config: dict)[source]

Sets the CSI acquisition configuration on the ESPARGOS controller.

The controller expects a JSON object (provided here as a Python dict) with integer fields (use 0/1 for booleans). Field names are fixed.

Boolean toggles:
  • enable: Enable to acquire CSI.

  • acquire_csi_legacy: Enable to acquire L-LTF when receiving an 11g PPDU.

  • acquire_csi_force_lltf: Force receiver to acquire L-LTF, regardless of PPDU type.

  • compress_csi: Transform CSI to a time-domain CIR before transport.

  • acquire_csi_ht20: Enable to acquire HT-LTF when receiving an HT20 PPDU.

  • acquire_csi_ht40: Enable to acquire HT-LTF when receiving an HT40 PPDU.

  • acquire_csi_vht: Present in the HTTP API; semantics depend on firmware build / PHY mode support.

  • acquire_csi_su: Enable to acquire HE-LTF when receiving an HE20 SU PPDU.

  • acquire_csi_mu: Enable to acquire HE-LTF when receiving an HE20 MU PPDU.

  • acquire_csi_dcm: Enable to acquire HE-LTF when receiving an HE20 PPDU with DCM applied.

  • acquire_csi_beamformed: Enable to acquire HE-LTF when receiving a beamformed HE20 PPDU.

  • dump_ack_en: Enable to dump 802.11 ACK frame, default disabled.

  • lltf_8bit_mode: Report L-LTF CSI as 8-bit values for every subcarrier instead of sparse 12-bit values.

Integer / enum fields:
  • acquire_csi_he_stbc_mode: When receiving an STBC applied HE PPDU:

    0 = acquire the complete HE-LTF1
    1 = acquire the complete HE-LTF2
    2 = sample evenly among the HE-LTF1 and HE-LTF2

  • val_scale_cfg: Value 0-8.

Example payload:

{
  "enable": true,
  "acquire_csi_legacy": true,
  "acquire_csi_force_lltf": false,
  "compress_csi": false,
  "acquire_csi_ht20": true,
  "acquire_csi_ht40": true,
  "acquire_csi_vht": true,
  "acquire_csi_su": true,
  "acquire_csi_mu": true,
  "acquire_csi_dcm": true,
  "acquire_csi_beamformed": true,
  "acquire_csi_he_stbc_mode": 2,
  "val_scale_cfg": 2,
  "dump_ack_en": true,
  "lltf_8bit_mode": false
}
Parameters:

config – CSI acquisition configuration dict (will be JSON-encoded and POSTed to /set_csi_acquire_config)

Raises:

EspargosUnexpectedResponseError – If the server at the given host is not an ESPARGOS controller or the request was invalid
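Since the controller is described as expecting integer fields, a configuration written with Python booleans can be normalized before sending. The helper below is a hypothetical sketch; whether set_csi_acquire_config() already performs this conversion internally is not stated here, so treat it as an optional pre-processing step. The range checks follow the field descriptions above.

```python
import json

def to_controller_payload(config: dict) -> dict:
    """Convert bool/int fields to plain ints and check the documented ranges."""
    payload = {key: int(value) for key, value in config.items()}
    if payload.get("acquire_csi_he_stbc_mode", 0) not in (0, 1, 2):
        raise ValueError("acquire_csi_he_stbc_mode must be 0, 1 or 2")
    if not 0 <= payload.get("val_scale_cfg", 0) <= 8:
        raise ValueError("val_scale_cfg must be in 0..8")
    return payload

config = {
    "enable": True,
    "acquire_csi_legacy": True,
    "acquire_csi_force_lltf": False,
    "acquire_csi_he_stbc_mode": 2,
    "val_scale_cfg": 2,
}
payload = to_controller_payload(config)
body = json.dumps(payload)  # JSON text with 0/1 instead of true/false
```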

set_gain_settings(settings: dict)[source]

Sets the gain settings on the ESPARGOS controller.

The gain settings are provided as a JSON object (here as a Python dict) with fixed field names. Values may be scalars, or arrays/lists with shape (2, 4) to configure each sensor individually in board (row, column) order.

  • fft_scale_enable (bool): Enable manual FFT scaling (false = automatic/firmware default).

  • fft_scale_value (int): FFT scale value (meaning/range depends on firmware; commonly 0 when disabled).

  • rx_gain_enable (bool): Enable manual RX gain (false = automatic/firmware default).

  • rx_gain_value (int): RX gain table index, 0..76 (commonly 0 when disabled).

  • expert_mode_enable (bool): Enable raw expert gain-table entry override.

  • expert_mode_raw (str): Raw gain-table entry as hexadecimal string.

Example payload:

{
  "fft_scale_enable": false,
  "fft_scale_value": 0,
  "rx_gain_enable": false,
  "rx_gain_value": 0
}
Parameters:

settings – Gain settings dict (will be JSON-encoded and POSTed to /set_gain_settings)

Raises:

EspargosUnexpectedResponseError – If the server at the given host is not an ESPARGOS controller or the request was invalid
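To illustrate the per-sensor form, the sketch below builds a settings dict in which rx_gain_value is a nested list of shape (2, 4). The helper names and the chosen gain indices are arbitrary examples; only the field names, the (2, 4) layout, and the 0..76 range come from the description above.

```python
ROWS, COLS = 2, 4  # per-board sensor layout in (row, column) order

def per_sensor(value_fn):
    """Build a (2, 4) nested list by calling value_fn(row, col) per sensor."""
    return [[value_fn(r, c) for c in range(COLS)] for r in range(ROWS)]

def check_rx_gain(values):
    """Accept a scalar or a (2, 4) nested list with entries in 0..76."""
    if isinstance(values, int):
        flat = [values]
    else:
        if len(values) != ROWS or any(len(row) != COLS for row in values):
            raise ValueError("expected shape (2, 4)")
        flat = [v for row in values for v in row]
    if any(not 0 <= v <= 76 for v in flat):
        raise ValueError("rx_gain_value must be in 0..76")
    return values

settings = {
    "fft_scale_enable": False,
    "fft_scale_value": 0,
    "rx_gain_enable": True,
    # Arbitrary example: a different gain table index for every sensor.
    "rx_gain_value": check_rx_gain(per_sensor(lambda r, c: 40 + 4 * r + c)),
}
```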

set_mac_filter(mac_filter: dict)[source]

Tell the ESPARGOS board to only receive packets from transmitters with this sender MAC.

mac_filter is a dict with the following format:

{
  "enable": true|false,
  "mac": "00:11:22:33:44:55",
  "mac_mask": "ff:ff:ff:ff:ff:ff"
}

The “enable” field toggles MAC filtering. When enabled, only packets from transmitters whose MAC address matches the given “mac” (applying the “mac_mask”) will be received. “mac_mask” is a bitmask applied to both the configured MAC and the sender MAC before comparison. Only provided fields will be changed; others will remain as previously configured.

Parameters:

mac_filter – MAC filter configuration dict

Raises:

EspargosUnexpectedResponseError – If the server at the given host is not an ESPARGOS controller or the request was invalid
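The masked comparison described above can be reproduced on the host to predict which senders a given filter would admit. This is a sketch of the stated rule (mask both addresses, then compare), with hypothetical helper names; it does not talk to the controller.

```python
def mac_to_bytes(mac: str) -> bytes:
    """Parse 'aa:bb:cc:dd:ee:ff' into six raw bytes."""
    return bytes(int(part, 16) for part in mac.split(":"))

def mac_matches(sender: str, mac: str, mac_mask: str) -> bool:
    """Apply mac_mask to both addresses and compare the results."""
    s, m, k = (mac_to_bytes(x) for x in (sender, mac, mac_mask))
    return all((a & c) == (b & c) for a, b, c in zip(s, m, k))

# Exact match: every bit of the address is compared.
exact = {"enable": True, "mac": "00:11:22:33:44:55", "mac_mask": "ff:ff:ff:ff:ff:ff"}

# Prefix match: only the first three bytes are compared.
prefix = {"enable": True, "mac": "00:11:22:00:00:00", "mac_mask": "ff:ff:ff:00:00:00"}
```

A mask of all zeros would match every sender, since no bits are compared.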

set_radar_config(config: dict)[source]

Sets the low-level radar TX configuration on the ESPARGOS controller.

The payload mirrors the controller’s /set_tx_control API. Supported fields are:

  • rfswitch_state (int)

  • active_by_antid (list[bool], length 8)

  • start_by_antid (list[int], length 8)

  • period_by_antid (list[int], length 8)

  • mac_by_antid (list[str], length 8, MAC addresses like "72:61:64:61:72:00")

  • tx_power (int)

  • tx_phymode (int)

  • tx_rate (int)

Only provided fields are changed; others remain unchanged on the controller.

Parameters:

config – Radar TX configuration dict

Raises:

EspargosUnexpectedResponseError – If the server at the given host is not an ESPARGOS controller or the request was invalid
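Because only the provided fields are changed, a partial payload is sufficient. The sketch below checks the list lengths and MAC format given above before such a dict would be passed to set_radar_config(). The helper is hypothetical and the values are examples; units and valid ranges of the integer fields are not specified here and are therefore not checked.

```python
import re

MAC_RE = re.compile(r"^[0-9a-f]{2}(:[0-9a-f]{2}){5}$", re.IGNORECASE)
LIST_FIELDS = ("active_by_antid", "start_by_antid", "period_by_antid", "mac_by_antid")

def check_radar_config(config: dict) -> dict:
    """Validate the per-antenna list lengths and MAC address strings."""
    for key in LIST_FIELDS:
        if key in config and len(config[key]) != 8:
            raise ValueError(f"{key} must have length 8")
    for mac in config.get("mac_by_antid", []):
        if not MAC_RE.match(mac):
            raise ValueError(f"malformed MAC address: {mac!r}")
    return config

# Partial update: enable only antenna 0 and give each antenna its own MAC.
partial = check_radar_config({
    "active_by_antid": [True] + [False] * 7,
    "mac_by_antid": ["72:61:64:61:72:%02x" % i for i in range(8)],
})
```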

set_rfswitch(state: rfswitch_state_t)[source]

Sets the RF switch state on the ESPARGOS controller for reception mode.

Parameters:

state – The RF switch state to set, must be one of csi.rfswitch_state_t

Raises:

EspargosUnexpectedResponseError – If the server at the given host is not an ESPARGOS controller or the request was invalid

set_wifi_channel_overrides(settings: dict)[source]

Sets per-sensor WiFi channel overrides on the ESPARGOS controller.

The payload mirrors the controller’s /set_wifi_channel_overrides API:

  • override_active (bool): Enable per-sensor channel overrides.

  • channel-primary (list[int], length 8): Primary WiFi channel for each sensor.

  • channel-secondary (list[int], length 8): Secondary channel selector for each sensor (0 = none, 1 = above, 2 = below).

Passing {"override_active": False} disables the overrides.

Parameters:

settings – Per-sensor WiFi channel override settings dict

Raises:

EspargosUnexpectedResponseError – If the server at the given host is not an ESPARGOS controller or the request was invalid
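A payload for the override API can be assembled as follows. This is a sketch with a hypothetical helper; the channel numbers are examples, and how list positions map to physical sensors is an assumption left to the controller documentation.

```python
NUM_SENSORS = 8

def make_channel_overrides(primary, secondary=0) -> dict:
    """Build an override payload; scalars are repeated for all 8 sensors."""
    def expand(value):
        values = [value] * NUM_SENSORS if isinstance(value, int) else list(value)
        if len(values) != NUM_SENSORS:
            raise ValueError("expected one entry per sensor (8)")
        return values

    secondary_list = expand(secondary)
    if any(s not in (0, 1, 2) for s in secondary_list):
        raise ValueError("channel-secondary entries must be 0, 1 or 2")
    return {
        "override_active": True,
        "channel-primary": expand(primary),
        "channel-secondary": secondary_list,
    }

# Example: first four list entries on channel 1, remaining four on channel 6.
overrides = make_channel_overrides([1, 1, 1, 1, 6, 6, 6, 6])
disable = {"override_active": False}
```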

set_wificonf(wificonf: dict)[source]

Sets WiFi configuration on the ESPARGOS controller.

The controller expects a Python dict with fixed field names using hyphenated keys. Expected format:

{
  "calib-mode": 1,
  "calib-source": 0,
  "channel-primary": 13,
  "channel-secondary": 2,
  "country-code": "DE",
  "calib-txpower": 34,
  "calib-interval": 10
}
Field meanings / types (as used by the controller firmware):
  • “calib-mode” (int): When to generate phase reference packets:

    - 0: Never generate calibration packets
    - 1: Generate calibration packets if receiver RF switch is in reference channel configuration
    - 2: Always generate calibration packets

  • “calib-source” (int): Configures REFIN / REFOUT ports of controller:

    - 0: Use internal clock and phase reference for antennas
    - 1: Output clock and phase reference on REFOUT port, antennas expect clock and calibration from REFIN port (master mode)
    - 2: Antennas expect clock and calibration from REFIN port, do not output anything on REFOUT (slave mode)

  • “channel-primary” (int): Primary WiFi channel.

  • “channel-secondary” (int): Secondary channel selector (e.g. 0 = None, 1 = Above, 2 = Below).

  • “country-code” (str): Two-letter country code (e.g. “DE”).

  • “calib-txpower” (int): TX power used for calibration packets (between 8 = 2dBm and 80 = 20dBm).

  • “calib-interval” (int): Calibration interval (milliseconds).

Parameters:

wificonf – WiFi configuration dict

Raises:

EspargosUnexpectedResponseError – If the server at the given host is not an ESPARGOS controller or the request was invalid
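The two endpoints given for “calib-txpower” (8 = 2 dBm, 80 = 20 dBm) are consistent with a step size of 0.25 dBm. Under that assumption, which is an inference from the endpoints and not a documented guarantee, the conversion can be sketched with two hypothetical helpers:

```python
def dbm_to_calib_txpower(dbm: float) -> int:
    """Convert dBm to the raw field value, assuming 0.25 dBm per step."""
    raw = round(dbm * 4)
    if not 8 <= raw <= 80:
        raise ValueError("calib-txpower must be between 8 (2 dBm) and 80 (20 dBm)")
    return raw

def calib_txpower_to_dbm(raw: int) -> float:
    """Inverse conversion under the same 0.25 dBm assumption."""
    return raw / 4

wificonf = {
    "channel-primary": 13,
    "channel-secondary": 2,
    "country-code": "DE",
    "calib-txpower": dbm_to_calib_txpower(8.5),
    "calib-interval": 10,
}
```

Under this reading, the value 34 in the expected-format example corresponds to 8.5 dBm.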

start(transports=None)[source]

Starts the CSI stream thread for the ESPARGOS controller. The thread will run indefinitely until the stop() method is called.

Supported transports:

  • “udp”: The controller will send CSI packets to a local UDP socket. This transport is lower-latency and more efficient (higher throughput), but may not work in all network environments.

  • “websocket”: The controller will send CSI packets over a WebSocket connection. This transport is more widely compatible but may have higher latency and overhead.

  • “uart”: The controller will stream CSI data over the local serial/UART link. This transport is only available for hosts specified as uart:<port>.

Parameters:

transports – Optional list of transports to try, in order of preference. Valid values are “udp” and “websocket”. If None (default), tries UDP first (if supported by API version) and then WebSocket.

Raises:

EspargosCsiStreamConnectionError – If neither UDP nor WebSocket CSI stream could be established

stop()[source]

Stops the CSI stream thread for the ESPARGOS controller. The thread will stop after the current packet has been processed, or after a short timeout.

exception espargos.board.EspargosAPIVersionError[source]

Bases: Exception

Raised when the ESPARGOS controller runs an unsupported API version

exception espargos.board.EspargosCsiStreamConnectionError[source]

Bases: Exception

Raised when the CSI stream connection could not be established (e.g. magic packet not received)

exception espargos.board.EspargosHTTPStatusError(status=None, path=None, body=None)[source]

Bases: Exception

Raised when the ESPARGOS HTTP API returns an invalid status code

exception espargos.board.EspargosUnexpectedResponseError[source]

Bases: Exception

Raised when the server (ESPARGOS controller) provides an unexpected response. Is the server really ESPARGOS?

class espargos.board.FragmentReassembler(timeout_s: float = 5.0, logger=None)[source]

Bases: object

clear()[source]
push(fragments, now: float | None = None)[source]

espargos.pool

exception espargos.pool.CalibrationError[source]

Bases: RuntimeError

Raised when ESPARGOS calibration cannot collect usable calibration CSI.

class espargos.pool.Pool(boards: list[Board], ota_cache_timeout=5, refgen_boards=None)[source]

Bases: object

A Pool is a collection of ESPARGOS boards. The pool manages the clustering of CSI data from multiple ESPARGOS sensors (antennas) that belong to the same WiFi packet and provides cluster.CSICluster objects to registered callbacks.

add_csi_callback(cb: Callable[[CSICluster], None], cb_predicate: Callable[[CSICluster], bool] = None)[source]

Register callback function that is invoked whenever a new CSI cluster is completed.

Parameters:
  • cb – The function to call, gets instance of class cluster.CSICluster as parameter

  • cb_predicate – A function with signature (csi_cluster) that defines the conditions under which clustered CSI is regarded as completed and thus provided to the callback. If cb_predicate returns true, clustered CSI is regarded as completed. If no predicate is provided, the default behavior is to trigger the callback when CSI has been received from all sensors on all boards. By default, callbacks receive over-the-air/radar CSI only. Enable set_emit_calibration_csi() to also emit calibration CSI (from internal reference generators) through this same callback path.

Returns:

A callback handle that can be passed to remove_csi_callback()

calibrate(per_board=True, duration=2, exithandler=None, cable_lengths=None, cable_velocity_factors=None, run_in_thread=True)[source]

Run calibration for a specified duration.

Parameters:
  • per_board – True to calibrate each board separately, False to calibrate all boards together. Set to False if the same phase reference signal is used for all boards, otherwise set to True.

  • duration – The duration in seconds for which calibration should be run

  • exithandler – An optional exit handler that can be used to stop calibration prematurely if exithandler.running is set to False in a separate thread

  • cable_lengths – The lengths of the feeder cables that distribute the clock and phase calibration signal to the ESPARGOS boards, in meters. Only needed for phase-coherent multi-board setups, omit if all cables have the same length.

  • cable_velocity_factors – The velocity factors of the feeder cables that distribute the clock and phase calibration signal to the ESPARGOS boards. Must be the same length as cable_lengths, and all entries should be in the range [0, 1].

  • run_in_thread – If True, the pool handling will be performed in the current thread. Set to False in case the pool is already running in a separate thread (e.g., backlog is already active).
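To see why cable_lengths and cable_velocity_factors matter, consider the propagation delay of the reference signal along each feeder cable: delay = length / (velocity_factor × c). The sketch below computes these delays and the resulting phase rotation at a given carrier frequency. It illustrates the physics only; the helper names, the example values, and the sign convention of the phase are assumptions, and this is not the library's calibration code.

```python
import math

C = 299_792_458.0  # speed of light in vacuum, m/s

def cable_delay(length_m: float, velocity_factor: float) -> float:
    """Propagation delay in seconds for one feeder cable."""
    if not 0.0 < velocity_factor <= 1.0:
        raise ValueError("velocity factor must be in (0, 1]")
    return length_m / (velocity_factor * C)

def cable_phase(length_m: float, velocity_factor: float, freq_hz: float) -> float:
    """Phase rotation in radians along the cable, wrapped to [-pi, pi]."""
    phase = -2.0 * math.pi * freq_hz * cable_delay(length_m, velocity_factor)
    return math.atan2(math.sin(phase), math.cos(phase))

# Example values: three boards, the third with a longer feeder cable.
cable_lengths = [1.0, 1.0, 2.5]
cable_velocity_factors = [0.66, 0.66, 0.66]

delays = [cable_delay(l, vf) for l, vf in zip(cable_lengths, cable_velocity_factors)]
relative_delays = [d - delays[0] for d in delays]
phases = [cable_phase(l, vf, 2.472e9)  # 2.472 GHz = center of WiFi channel 13
          for l, vf in zip(cable_lengths, cable_velocity_factors)]
```

Cables of equal length and velocity factor yield zero relative delay, which matches the note that cable_lengths can be omitted in that case. A velocity factor of exactly 0 is rejected here because it would imply an infinite delay.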

clear_mac_filter()[source]

Clear the MAC address filter for all boards in the pool.

get_calibration()[source]

Get the stored calibration values.

Returns:

The stored calibration values as a calibration.CSICalibration object

get_cfo_correction() dict[source]

Return CFO correction config; sanity-check all boards report the same value.

get_csi_acquire_config() dict[source]

Return CSI acquire config; sanity-check all boards report the same value.

get_emit_calibration_csi() bool[source]

Return whether calibration CSI clusters are emitted through normal CSI callbacks.

get_gain_settings() dict[source]

Return gain settings; sanity-check all boards report the same value.

get_mac_filter() dict[source]

Return MAC filter configuration; sanity-check all boards report the same value.

This is forwarded to espargos.board.Board.get_mac_filter() for each board.

get_radar_config() dict[source]

Return radar TX configuration; sanity-check all boards report the same value.

get_radar_configs() list[dict][source]

Return radar TX configuration for all boards in the pool.

get_rfswitch() rfswitch_state_t[source]

Get RF switch state from the first board in the pool.

Returns:

The RF switch state of the first board in the pool

get_shape()[source]

Get the outer shape of the stored data, i.e., only the antenna dimensions and not subcarrier dimensions or similar.

get_stats()[source]

Get collected statistics about the pool.

get_wifi_channel_overrides() dict[source]

Return per-sensor WiFi channel overrides; sanity-check all boards report the same value.

get_wificonf() dict[source]

Return WiFi config; sanity-check boards report the same value.

Consistency check ignores “calib-source” and “calib-mode” (they may legitimately differ).

reboot()[source]

Trigger a reboot on all boards in the pool.

remove_csi_callback(callback: _CSICallback) bool[source]

Remove a CSI callback previously returned by add_csi_callback().

Parameters:

callback – Callback handle returned by add_csi_callback()

Returns:

True if the callback was registered and removed, False otherwise

run()[source]

Process incoming CSI data packets and call registered callbacks if CSI clusters are complete. Repeatedly call this function from your main loop or from a separate thread. May block for a short amount of time if no data is available.

set_cfo_correction(auto: bool, value: int = 0)[source]

Configure CFO correction on all boards in this pool.

set_csi_acquire_config(config: dict)[source]

Set CSI acquisition configuration on all boards in this pool and sanity-check that all boards end up with the same config.

This is forwarded to espargos.board.Board.set_csi_acquire_config() for each board. For the expected JSON/dict format, refer to that method’s documentation.

Parameters:

config – CSI acquisition configuration dict to apply to all boards.

Raises:
  • ValueError – If boards in the pool disagree on the resulting config after applying.

  • EspargosUnexpectedResponseError – If any board returns an unexpected response.

set_emit_calibration_csi(enabled: bool)[source]

Control whether calibration CSI clusters are emitted through normal CSI callbacks.

Calibration clusters remain marked as calibration packets via espargos.cluster.CSICluster.is_calib().

set_gain_settings(settings: dict)[source]

Set gain settings on all boards in this pool.

This is forwarded to espargos.board.Board.set_gain_settings() for each board. Values may be scalars, board-local (row, column) arrays applied to every board, or pool-wide (board, row, column) arrays.

Parameters:

settings – Gain settings dict to apply to all boards.

Raises:

EspargosUnexpectedResponseError – If any board returns an unexpected response.

set_mac_filter(mac_filter: dict)[source]

Set the MAC address filter for all boards in the pool. Will only accept packets from the specified MAC address.

This is forwarded to espargos.board.Board.set_mac_filter() for each board.

set_radar_config(config: dict | RadarPoolConfig)[source]

Set radar TX configuration on the boards in this pool.

config may either be a single controller config dict applied to every board, or an espargos.radar.RadarPoolConfig containing one config per board.

set_rfswitch(state: rfswitch_state_t)[source]

Set RF switch state for all boards in the pool.

Parameters:

state – The RF switch state to set, must be one of csi.rfswitch_state_t

set_wifi_channel_overrides(settings: dict)[source]

Set per-sensor WiFi channel overrides on all boards in this pool and sanity-check that all boards end up with the same settings.

This is forwarded to espargos.board.Board.set_wifi_channel_overrides() for each board. For the expected JSON/dict format, refer to that method’s documentation.

Parameters:

settings – Per-sensor WiFi channel override settings dict to apply to all boards.

Raises:
  • ValueError – If boards in the pool disagree on the resulting settings after applying.

  • EspargosUnexpectedResponseError – If any board returns an unexpected response.

set_wificonf(wificonf: dict)[source]

Set WiFi config on all boards and sanity-check resulting configs match across boards.

This is forwarded to espargos.board.Board.set_wificonf() for each board. For the expected JSON/dict format, refer to that method’s documentation.

The values of “calib-source” and “calib-mode” are ignored and not propagated to the boards in the pool. If you need to set those, call espargos.board.Board.set_wificonf() on each board individually. The consistency check also ignores “calib-source” and “calib-mode” (they may legitimately differ).

Parameters:

wificonf – WiFi configuration dict to apply to all boards.

Raises:
  • ValueError – If boards in the pool disagree on the resulting config after applying (excluding ignored keys).

  • EspargosUnexpectedResponseError – If any board returns an unexpected response.

start()[source]

Start the streaming of CSI data for all boards in the pool.

stop()[source]

Stop the streaming of CSI data for all boards in the pool.

espargos.backlog

class espargos.backlog.BacklogFilter[source]

Bases: object

Base class for CSI backlog filters.

Subclasses implement matches() to decide whether a clustered CSI frame should be admitted to the backlog.

matches(clustered_csi)[source]
class espargos.backlog.CSIBacklog(pool, fields=None, calibrate=True, cb_predicate=None, size=100)[source]

Bases: object

CSI backlog class. Stores CSI data in a ringbuffer for processing when needed.

Parameters:
  • pool – CSI pool object to collect CSI data from

  • fields – List of fields to store (default: all), e.g., [“lltf”, “ht40”, “rssi”, “rx_gain”, “fft_gain”, “cfo”, “lltf_8bit_mode”, “timestamp”, “host_timestamp”, “mac”, “radar_tx_timestamp”, “radar_tx_index”, “radar_tx_power”, “radar_tx_rfswitch_state”]

  • calibrate – Apply calibration to CSI data (default: True)

  • cb_predicate – A function that defines the conditions under which clustered CSI is regarded as completed and thus added to the backlog. See espargos.pool.Pool.add_csi_callback() for more details.

  • size – Size of the ringbuffer (default: 100)

DATA_FORMATS = {'cfo': {'dtype': <class 'numpy.float32'>, 'per_antenna': True, 'shape': ()}, 'fft_gain': {'dtype': <class 'numpy.float32'>, 'per_antenna': True, 'shape': ()}, 'he20': {'dtype': <class 'numpy.complex64'>, 'per_antenna': True, 'shape': (245,)}, 'host_timestamp': {'dtype': <class 'numpy.float64'>, 'per_antenna': False, 'shape': ()}, 'ht20': {'dtype': <class 'numpy.complex64'>, 'per_antenna': True, 'shape': (57,)}, 'ht40': {'dtype': <class 'numpy.complex64'>, 'per_antenna': True, 'shape': (117,)}, 'lltf': {'dtype': <class 'numpy.complex64'>, 'per_antenna': True, 'shape': (53,)}, 'lltf_8bit_mode': {'dtype': <class 'numpy.bool'>, 'per_antenna': True, 'shape': ()}, 'mac': {'dtype': <class 'numpy.uint8'>, 'per_antenna': False, 'shape': (6,)}, 'radar_tx_index': {'dtype': <class 'numpy.int16'>, 'per_antenna': False, 'shape': ()}, 'radar_tx_power': {'dtype': <class 'numpy.int16'>, 'per_antenna': False, 'shape': ()}, 'radar_tx_rfswitch_state': {'dtype': <class 'numpy.uint8'>, 'per_antenna': False, 'shape': ()}, 'radar_tx_timestamp': {'dtype': <class 'numpy.float64'>, 'per_antenna': False, 'shape': ()}, 'rfswitch_state': {'dtype': <class 'numpy.uint8'>, 'per_antenna': True, 'shape': ()}, 'rssi': {'dtype': <class 'numpy.float32'>, 'per_antenna': True, 'shape': ()}, 'rx_gain': {'dtype': <class 'numpy.float32'>, 'per_antenna': True, 'shape': ()}, 'timestamp': {'dtype': <class 'numpy.float64'>, 'per_antenna': True, 'shape': ()}}
add_filter(backlog_filter)[source]

Add a filter to the backlog.

Parameters:

backlog_filter – Instance of BacklogFilter

add_update_callback(cb)[source]

Add a callback that is called when new CSI data is added to the backlog

clear()[source]

Clear all stored CSI datapoints from the ringbuffer.

This is useful after changing calibration or other receiver settings: entries already in the backlog were stored with the old interpretation and must not be mixed with freshly received CSI.

clear_filters()[source]

Remove all filters from the backlog.

count_valid_datapoints(key: str, allow_incomplete: bool = False) int[source]

Count datapoints for which a particular stored field (key) is valid (finite and nonzero).

The first axis of every backlog field is the datapoint axis. If allow_incomplete is false, only datapoints with all finite values are counted. If true, datapoints with at least one finite value are counted. Non-floating fields are always finite, so each stored entry counts.

Parameters:
  • key – Key of the data to inspect.

  • allow_incomplete – Whether partially valid datapoints count.
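The effect of allow_incomplete can be illustrated on plain Python lists. The sketch follows the finiteness rule from the paragraph above (all values finite versus at least one finite value) and uses a hypothetical helper; the real method operates on the backlog's stored arrays.

```python
import math

def is_finite(x) -> bool:
    """True if a real or complex number has no NaN/inf component."""
    return math.isfinite(x.real) and math.isfinite(x.imag)

def count_valid(datapoints, allow_incomplete: bool = False) -> int:
    """Count datapoints (outer list entries) that qualify as valid."""
    combine = any if allow_incomplete else all
    return sum(1 for dp in datapoints if combine(is_finite(v) for v in dp))

nan = float("nan")
data = [
    [1 + 1j, 2 + 0j],  # fully valid datapoint
    [nan, 1.0],        # partially valid datapoint
    [nan, nan],        # no valid values
]
strict = count_valid(data)                          # only fully finite datapoints
lenient = count_valid(data, allow_incomplete=True)  # at least one finite value
```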

get(key)[source]

Retrieve data from the ringbuffer

Parameters:

key – Key of the data to retrieve (e.g., “lltf”, “ht40”, “rssi”, etc.)

Returns:

Data corresponding to the key, oldest first

get_fields()[source]

Get the list of fields currently stored in the backlog.

Returns:

List of fields currently stored in the backlog

get_filters()[source]

Get the list of currently active backlog filters.

Returns:

List of BacklogFilter instances

get_latest(key)[source]

Retrieve the latest value for a key in the ringbuffer.

Parameters:

key – Key of the data to retrieve

Returns:

Latest value, or None if no data is available

get_multiple(keys)[source]

Retrieve multiple data fields from the ringbuffer. You must use get_multiple to ensure consistency of data across multiple keys.

Parameters:

keys – List of keys of the data to retrieve (e.g., [“lltf”, “ht40”, “rssi”], etc.)

Returns:

Tuple of data arrays corresponding to the keys (in same order), contents are oldest first
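Why get_multiple matters can be seen in a toy ring buffer: if new data arrives between two separate get() calls, the returned arrays may no longer line up entry by entry. The class below is a hypothetical miniature, not the CSIBacklog implementation; it takes one snapshot under a lock so that all requested keys refer to the same datapoints.

```python
import threading
from collections import deque

class TinyBacklog:
    """Toy ring buffer storing one tuple per datapoint, oldest first."""

    def __init__(self, fields, size=100):
        self.fields = list(fields)
        self.lock = threading.Lock()
        self.rows = deque(maxlen=size)  # oldest entries drop out automatically

    def add(self, **values):
        with self.lock:
            self.rows.append(tuple(values[f] for f in self.fields))

    def get_multiple(self, keys):
        # Take a single snapshot so all keys describe the same datapoints.
        with self.lock:
            snapshot = list(self.rows)
        indices = [self.fields.index(k) for k in keys]
        return tuple([row[i] for row in snapshot] for i in indices)

backlog = TinyBacklog(["rssi", "timestamp"], size=3)
for n in range(5):
    backlog.add(rssi=-40 - n, timestamp=float(n))

rssi, timestamps = backlog.get_multiple(["rssi", "timestamp"])
```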

get_size()[source]

Get the size of the backlog ringbuffer

Returns:

Size of the backlog ringbuffer

nonempty()[source]

Check if the backlog is nonempty

Returns:

True if the backlog is nonempty

remove_filter(backlog_filter)[source]

Remove a previously added filter from the backlog.

Parameters:

backlog_filter – Instance of BacklogFilter

set_fields(new_fields)[source]

Set the fields to be stored in the backlog. Existing data will be preserved for fields that are still present.

Parameters:

new_fields – New list of fields to store

set_size(new_size)[source]

Resize the backlog ringbuffer. If there are existing entries, they will be preserved up to the new size.

Parameters:

new_size – New size of the backlog ringbuffer

start()[source]

Start the CSI backlog thread, must be called before using the backlog

stop()[source]

Stop the CSI backlog thread

class espargos.backlog.Exclude11bFilter[source]

Bases: BacklogFilter

Backlog filter that drops 802.11b packets, which do not carry CSI.

matches(clustered_csi)[source]
class espargos.backlog.MacFilter(filter_regex)[source]

Bases: BacklogFilter

Backlog filter that matches source MAC addresses against a regular expression.

Parameters:

filter_regex – Regular expression applied to the source MAC string

matches(clustered_csi)[source]
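A regular expression for MacFilter can be tried out on the host first. The sketch assumes that the source MAC is formatted as a lowercase, colon-separated string and that the expression is applied from the start of the string; both are assumptions, since the exact string format and matching function are not specified here.

```python
import re

def mac_matches_regex(filter_regex: str, source_mac: str) -> bool:
    """Return True if the regex matches at the start of the MAC string."""
    return re.match(filter_regex, source_mac) is not None

# Match any transmitter whose address starts with 00:11:22.
prefix_regex = r"00:11:22:"

# Match exactly one of two transmitters.
either_regex = r"(00:11:22:33:44:55|66:77:88:99:aa:bb)$"
```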

espargos.calibration

class espargos.calibration.CSICalibration(boards: list[Board], channel_primary: int, channel_secondary: int, calibration_values_lltf: ndarray, calibration_values_ht20: ndarray, calibration_values_ht40: ndarray, calibration_values_he20: ndarray | None, sensor_clock_offsets: ndarray, board_cable_lengths=None, board_cable_vfs=None)[source]

Bases: object

apply_he20(values: ndarray) ndarray[source]

Apply phase calibration to the provided HE20 CSI data by oversampling the stored HT20 calibration.

Parameters:

values – The CSI data to which the phase calibration should be applied, as a complex-valued numpy array of shape (boardcount, constants.ROWS_PER_BOARD, constants.ANTENNAS_PER_ROW, csi.HE20_COEFFICIENTS_PER_CHANNEL)

Returns:

The phase-calibrated CSI data

apply_ht20(values: ndarray) ndarray[source]

Apply phase calibration to the provided HT20 CSI data. Also accounts for subcarrier-specific phase offsets, e.g., due to the low-pass filter characteristic of the baseband signal path inside the ESP32.

Parameters:

values – The CSI data to which the phase calibration should be applied, as a complex-valued numpy array of shape (boardcount, constants.ROWS_PER_BOARD, constants.ANTENNAS_PER_ROW, csi.HT_COEFFICIENTS_PER_CHANNEL)

Returns:

The phase-calibrated CSI data

apply_ht40(values: ndarray) ndarray[source]

Apply phase calibration to the provided HT40 CSI data. Also accounts for subcarrier-specific phase offsets, e.g., due to the low-pass filter characteristic of the baseband signal path inside the ESP32.

Parameters:

values – The CSI data to which the phase calibration should be applied, as a complex-valued numpy array of shape (boardcount, constants.ROWS_PER_BOARD, constants.ANTENNAS_PER_ROW, csi.HT_COEFFICIENTS_PER_CHANNEL + csi.HT40_GAP_SUBCARRIERS + csi.HT_COEFFICIENTS_PER_CHANNEL)

Returns:

The phase-calibrated CSI data

apply_lltf(values: ndarray) ndarray[source]

Apply phase calibration to the provided L-LTF CSI data. Also accounts for subcarrier-specific phase offsets, e.g., due to the low-pass filter characteristic of the baseband signal path inside the ESP32.

Parameters:

values – The CSI data to which the phase calibration should be applied, as a complex-valued numpy array of shape (boardcount, constants.ROWS_PER_BOARD, constants.ANTENNAS_PER_ROW, csi.LEGACY_COEFFICIENTS_PER_CHANNEL)

Returns:

The phase-calibrated CSI data

time_to_sensor_time(time)[source]

Convert a reference time into the corresponding local time for each sensor.

The provided reference time is interpreted relative to sensor 0. The return value therefore contains one time per sensor, offset by sensor_clock_offsets.

Parameters:

time – Reference time in seconds, relative to sensor 0. May be a scalar or an array broadcastable to the sensor layout.

Returns:

Per-sensor time values as a numpy array with shape (boardcount, constants.ROWS_PER_BOARD, constants.ANTENNAS_PER_ROW)
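
The conversion described above can be sketched without the library. This is a minimal illustration, not the actual implementation: it assumes that a sensor's local time equals the reference time plus that sensor's entry in sensor_clock_offsets, and the offset values and the helper name to_sensor_time are hypothetical.

```python
# Hypothetical clock offsets for one board, in seconds.
# Layout: (boardcount=1, ROWS_PER_BOARD=2, ANTENNAS_PER_ROW=4).
sensor_clock_offsets = [[[0.0, 1.5e-6, -2.0e-6, 0.5e-6],
                         [3.0e-6, -1.0e-6, 0.0, 2.5e-6]]]


def to_sensor_time(reference_time, offsets):
    """Return one local time per sensor for a scalar reference time.

    Assumption: local time = reference time + clock offset of that sensor.
    """
    return [[[reference_time + offset for offset in row] for row in board]
            for board in offsets]


local_times = to_sensor_time(10.0, sensor_clock_offsets)
print(local_times[0][0])  # local times of the first row of board 0
```

Sensor 0 has an offset of zero in this sketch, which matches the statement that the reference time is interpreted relative to sensor 0.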

espargos.cluster

class espargos.cluster.CSICluster(source_mac: str, dest_mac: str, seq_ctrl: seq_ctrl_t, board_revisions: list[BoardRevision])[source]

Bases: object

A CSICluster object represents a collection of CSI data estimated for the same WiFi packet.

The class clusters the CSI data from multiple ESPARGOS sensors (antennas), which may belong to the same or different ESPARGOS boards. It is used to store CSI data until it is complete and can be provided to a callback. CSI data may be from calibration packets or over-the-air packets.

add_csi(board_num: int, esp_num: int, serialized_csi: serialized_csi_tlv_t)[source]

Add CSI data to the cluster.

Parameters:
  • board_num – The number of the ESPARGOS board that received the CSI data

  • esp_num – The number of the ESPARGOS sensor within that board that received the CSI data

  • serialized_csi – The serialized CSI data

  • csi_cplx – The complex-valued CSI data

deserialize_csi_he20ltf()[source]

Deserialize the HE20 HE-LTF part of the CSI data.

The internal HE20 ordering is ascending subcarrier index -122..122. The invalid / null tones -1, 0, 1 are explicitly zeroed because the raw PHY payload may contain meaningless values there.

deserialize_csi_ht20ltf()[source]

Deserialize the HT20 (HT-LTF without channel bonding) part of the CSI data.

Returns:

The HT-LTF part of the CSI data as a complex-valued numpy array of shape (boardcount, constants.ROWS_PER_BOARD, constants.ANTENNAS_PER_ROW, csi.HT_COEFFICIENTS_PER_CHANNEL)

deserialize_csi_ht40ltf()[source]

Deserialize the HT40 (HT-LTF with channel bonding) part of the CSI data.

Returns:

The HT-LTF part of the CSI data as a complex-valued numpy array of shape (boardcount, constants.ROWS_PER_BOARD, constants.ANTENNAS_PER_ROW, csi.HT_COEFFICIENTS_PER_CHANNEL + csi.HT40_GAP_SUBCARRIERS + csi.HT_COEFFICIENTS_PER_CHANNEL)

deserialize_csi_lltf()[source]

Deserialize the L-LTF part of the CSI data.

Returns:

The L-LTF part of the CSI data as a complex-valued numpy array of shape (boardcount, constants.ROWS_PER_BOARD, constants.ANTENNAS_PER_ROW, csi.LEGACY_COEFFICIENTS_PER_CHANNEL)

get_age()[source]

Get the age of the CSI data, in seconds.

The age is only approximate: it is based on the timestamp at which the CSICluster object was created, not on the sensor timestamps.

Returns:

The age of the CSI data, in seconds

get_cfo()[source]

Get the CFO values decoded from the sensor rx_ctrl metadata.

get_completion()[source]

Get the completion state of the CSI data.

Returns:

A boolean numpy array of shape (boardcount, constants.ROWS_PER_BOARD, constants.ANTENNAS_PER_ROW) that indicates which sensors have provided CSI data

get_completion_all()[source]

Get the global completion state of the CSI data, i.e., whether all sensors have provided CSI data.

Returns:

True if all sensors have provided CSI data, False otherwise

get_fft_gain()[source]

Get the signed FFT gain values reported for the WiFi packet.

get_gain_table_entry_raw()[source]

Get the raw 12-byte ESP32-C61 PHY gain-table entry used for each received packet.

get_gain_table_entry_valid()[source]

Return a mask indicating whether a raw gain-table entry was reported for each sensor.

get_host_timestamp()[source]

Get the timestamp at which the CSICluster object was created, which is approximately when the first sensor received the CSI data.

Returns:

The timestamp at which the CSICluster object was created (approximately when the first sensor received the CSI data), in seconds since the epoch

get_lltf_8bit_mode()[source]

Get whether each sensor reported LLTF CSI in 8-bit mode for this packet.

get_noise_floor()[source]

Get the noise floor of the WiFi packet.

Returns:

The noise floor of the WiFi packet

get_primary_channel() int[source]

Get the primary channel number.

Returns:

The primary channel number

get_radar_tx_index() int[source]

Return the flattened TX sensor index derived from the CSI stream UID, or -1 if unknown.

get_radar_tx_info()[source]

Return the transmit-side radar report for this packet, or None if not available.

get_rfswitch_state()[source]

Get the RF switch state of all sensors when the WiFi packet was received.

get_rssi()[source]

Get the RSSI values of the WiFi packet.

get_rx_gain()[source]

Get the signed RX gain values reported for the WiFi packet.

get_secondary_channel() int[source]

Get the secondary channel number.

Returns:

The secondary channel number

get_secondary_channel_relative()[source]

Get the relative position of the secondary channel with respect to the primary channel.

Returns:

0 if no secondary channel is used, 1 if the secondary channel is above the primary channel, -1 if the secondary channel is below the primary channel
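
A possible mapping behind this return value is sketched below. It assumes that the stored secondary channel value follows ESP-IDF's wifi_second_chan_t convention (0 none, 1 above, 2 below), which this reference documents for the radar helpers; whether get_secondary_channel() uses exactly that encoding is an assumption, and the function name is hypothetical.

```python
def secondary_channel_relative_sketch(secondary_channel: int) -> int:
    """Map an assumed wifi_second_chan_t value to -1, 0 or +1."""
    mapping = {
        0: 0,   # no secondary channel (plain 20 MHz)
        1: 1,   # secondary channel above the primary channel
        2: -1,  # secondary channel below the primary channel
    }
    if secondary_channel not in mapping:
        raise ValueError(f"unexpected secondary channel value: {secondary_channel}")
    return mapping[secondary_channel]


print([secondary_channel_relative_sketch(v) for v in (0, 1, 2)])
```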

get_sensor_timestamps()[source]

Get the (nanosecond-precision) timestamps at which the WiFi packet was sampled by the sensors. This timestamp does not include the offset that the chip derived from the CSI; it is only the sampling start time.

Returns:

A numpy array of shape (boardcount, constants.ROWS_PER_BOARD, constants.ANTENNAS_PER_ROW) that contains the sensor timestamps in seconds

get_seq_ctrl()[source]

Get the sequence control field of the WiFi packet.

Returns:

The sequence control field of the WiFi packet

get_source_mac()[source]

Get the source MAC address of the WiFi packet.

Returns:

The source MAC address of the WiFi packet

has_he20ltf() bool[source]

Check if HE20 HE-LTF channel estimates are available for all complete sensors.

has_ht20ltf() bool[source]

Check if HT20 (HT-LTF without channel bonding) channel estimates are available for all complete sensors.

Returns:

True if there is HT20 CSI data for all complete sensors, False otherwise

has_ht40ltf() bool[source]

Check if HT40 (HT-LTF with 40 MHz channel bonding) channel estimates are available for all complete sensors.

Returns:

True if there is HT40 CSI data for all complete sensors, False otherwise

has_lltf() bool[source]

Check if L-LTF channel estimates are available for all complete sensors.

Returns:

True if there is L-LTF CSI data for all complete sensors, False otherwise

has_radar_tx_report() bool[source]

Check whether this cluster has transmit-side radar metadata attached.

is_11b() bool[source]

Check whether this packet uses the 802.11b baseband format.

Returns:

True if the packet is 802.11b, False otherwise

is_calib() bool[source]

Check whether this cluster corresponds to a calibration packet.

is_radar() bool[source]

Check whether this cluster corresponds to a radar packet.

set_radar_tx_report(radar_tx_report: radar_tx_report_tlv_t, board_num: int | None = None, esp_num: int | None = None)[source]

Attach radar transmit metadata for the Wi-Fi packet represented by this cluster.

espargos.constants

espargos.constants.ANTENNAS_PER_BOARD = 8

Number of antennas on one board

espargos.constants.ANTENNAS_PER_ROW = 4

Number of antennas per row / per SPI controller on the board

espargos.constants.ANTENNA_JONES_CROSSPOL_MATRIX = array([[0.69141227+0.j        , 0.10475943+0.10475943j],        [0.10475943+0.10475943j, 0.69141227+0.j        ]])

Empirically determined cross-polarization component (due to intentional elliptical antenna polarization) of the Jones matrix

espargos.constants.ANTENNA_JONES_MATRIX = array([[-0.4148262 +0.07407611j,  0.4148262 -0.07407611j],        [ 0.56297841+0.07407611j,  0.56297841+0.07407611j]])

Jones matrix to convert from linear (H/V) to feed (R/L) polarization basis

espargos.constants.ANTENNA_JONES_MATRIX_SIMPLE = array([[-0.70710678,  0.70710678],        [ 0.70710678,  0.70710678]])

Simple Jones matrix to convert from linear (H/V) to feed (R/L) polarization basis

espargos.constants.ANTENNA_SEPARATION = 0.06

Distance between the centers of two antennas [m]

espargos.constants.FFT_GAIN_DB_PER_UNIT = 0.25

Reported FFT gain step size in dB

espargos.constants.ROWS_PER_BOARD = 2

Number of rows / SPI controllers on the board

espargos.constants.RX_GAIN_DB_PER_UNIT = 1.0

Reported RX gain step size in dB

espargos.constants.SPEED_OF_LIGHT = 299792458

Speed of light in a vacuum [m/s]

espargos.constants.WIFI_CHANNEL1_FREQUENCY = 2412000000.0

Center frequency of channel 1 in 2.4 GHz WiFi [Hz]

espargos.constants.WIFI_CHANNEL_SPACING = 5000000.0

Frequency spacing between adjacent 2.4 GHz WiFi channels [Hz]

espargos.constants.WIFI_SUBCARRIER_SPACING = 312500.0

Subcarrier spacing of WiFi (in Hz)
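
As an illustration of how these constants combine, the following sketch computes the center frequency of a 2.4 GHz channel and expresses the antenna separation in wavelengths. The constant values are copied from this page; the helper name channel_center_frequency is hypothetical, and the formula is only valid for channels 1 to 13.

```python
# Values copied from espargos.constants as documented above.
SPEED_OF_LIGHT = 299792458
WIFI_CHANNEL1_FREQUENCY = 2412000000.0
WIFI_CHANNEL_SPACING = 5000000.0
ANTENNA_SEPARATION = 0.06


def channel_center_frequency(channel: int) -> float:
    """Center frequency in Hz for 2.4 GHz channels 1..13."""
    if not 1 <= channel <= 13:
        raise ValueError("formula only holds for channels 1 to 13")
    return WIFI_CHANNEL1_FREQUENCY + (channel - 1) * WIFI_CHANNEL_SPACING


frequency = channel_center_frequency(6)
wavelength = SPEED_OF_LIGHT / frequency
spacing_in_wavelengths = ANTENNA_SEPARATION / wavelength
print(frequency, wavelength, spacing_in_wavelengths)
```

On channel 6 the antenna separation comes out slightly below half a wavelength.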

espargos.csi

espargos.csi.HE20_COEFFICIENTS_PER_CHANNEL = 245

Number of channel coefficients (active plus invalid subcarriers) for a 20 MHz HE-LTF

espargos.csi.HT40_GAP_SUBCARRIERS = 3

Gap between primary and secondary channel in HT40 mode, in subcarriers

espargos.csi.HT_COEFFICIENTS_PER_CHANNEL = 57

Number of channel coefficients (active subcarriers) per Wi-Fi channel in HT mode (HT-LTF)

espargos.csi.LEGACY_COEFFICIENTS_PER_CHANNEL = 53

Number of channel coefficients (active subcarriers) per Wi-Fi channel in legacy mode (L-LTF)
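
The coefficient counts above line up with the symmetric subcarrier index ranges that this module documents for the interpolate_* helpers. The sketch below derives those ranges from the counts; the helper name centered_indices is hypothetical, and it assumes an ascending, DC-centered ordering.

```python
# Values copied from espargos.csi as documented above.
HE20_COEFFICIENTS_PER_CHANNEL = 245
HT40_GAP_SUBCARRIERS = 3
HT_COEFFICIENTS_PER_CHANNEL = 57
LEGACY_COEFFICIENTS_PER_CHANNEL = 53


def centered_indices(count: int) -> list[int]:
    """Ascending subcarrier indices centered on DC for an odd count."""
    half = (count - 1) // 2
    return list(range(-half, half + 1))


lltf = centered_indices(LEGACY_COEFFICIENTS_PER_CHANNEL)
ht20 = centered_indices(HT_COEFFICIENTS_PER_CHANNEL)
ht40 = centered_indices(2 * HT_COEFFICIENTS_PER_CHANNEL + HT40_GAP_SUBCARRIERS)
he20 = centered_indices(HE20_COEFFICIENTS_PER_CHANNEL)

for name, indices in (("lltf", lltf), ("ht20", ht20), ("ht40", ht40), ("he20", he20)):
    print(name, len(indices), indices[0], indices[-1])
```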

class espargos.csi.compressed_rx_ctrl_t(buf=None)[source]

Bases: Structure

cfo_high_rate

Structure/Union member

cfo_low_rate

Structure/Union member

channel

Structure/Union member

cur_bb_format

Structure/Union member

fft_gain

Structure/Union member

flags

Structure/Union member

he_sig1_mcs

Structure/Union member

noise_floor

Structure/Union member

rate

Structure/Union member

reserved

Structure/Union member

rssi

Structure/Union member

rx_channel_estimate_len

Structure/Union member

rx_gain

Structure/Union member

rxstart_time_cyc

Structure/Union member

secondary_channel

Structure/Union member

sig_mode

Structure/Union member

timestamp

Structure/Union member

class espargos.csi.csistream_fragment_header_t(buf=None)[source]

Bases: Structure

fragment_index

Structure/Union member

size

Structure/Union member

total_fragments

Structure/Union member

uid

Structure/Union member

espargos.csi.csistream_uid_to_antid(uid: int) int[source]
espargos.csi.decode_compressed_he20(buf) ndarray[source]
espargos.csi.decode_compressed_ht20(buf) ndarray[source]
espargos.csi.decode_compressed_ht40(buf) ndarray[source]
espargos.csi.decode_compressed_lltf(buf, acquire_force_lltf: bool = False, lltf_8bit_mode: bool = False, **kwargs) ndarray[source]
espargos.csi.deserialize_packet_buffer(revision, pktbuf)[source]

Deserialize a raw stream payload into the appropriate packet structure based on the type header.

espargos.csi.gain_byte_to_signed(value: int) int[source]

Interpret an 8-bit gain value reported in rx_ctrl as signed.

The hardware stores gain fields as bytes. RX gain normally lives in the positive gain-table range, while FFT gain can be negative and is therefore reported in two’s-complement form.
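
The two's-complement interpretation can be sketched as follows. The conversion itself is standard; scaling the result with FFT_GAIN_DB_PER_UNIT (value copied from espargos.constants) is an assumption about how the step-size constant is meant to be used, and the function name is hypothetical.

```python
FFT_GAIN_DB_PER_UNIT = 0.25  # copied from espargos.constants


def byte_to_signed_sketch(value: int) -> int:
    """Interpret an unsigned byte (0..255) as a two's-complement int8."""
    if not 0 <= value <= 255:
        raise ValueError("expected a single byte")
    return value - 256 if value >= 128 else value


raw_fft_gain = 0xF8                            # hypothetical raw byte
signed_fft_gain = byte_to_signed_sketch(raw_fft_gain)
fft_gain_db = signed_fft_gain * FFT_GAIN_DB_PER_UNIT  # assumed scaling
print(signed_fft_gain, fft_gain_db)
```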

espargos.csi.get_cfo_from_rx_ctrl(rx_ctrl) int[source]

Compute the CFO value (in Hz) from a wifi_pkt_rx_ctrl_v3_t byte buffer.

This was reverse engineered from librftest.a (bb_common.o) and libphy.a (phy_feature.o).

Espressif’s RX metadata exposes two CFO fields. The low-rate field is used when the derived rate_index is below 8 (802.11b); otherwise the high-rate field is used (802.11g/n/ax).

espargos.csi.get_csi_format_subcarrier_count(preamble_format: str) int[source]
espargos.csi.get_csi_format_subcarrier_indices(preamble_format: str) ndarray[source]
espargos.csi.interpolate_he20ltf_gaps(csi_he20: ndarray) None[source]

Fill the HE20 invalid subcarriers -1, 0, 1 by linear interpolation in place.

Parameters:

csi_he20 – Complex HE20-LTF CSI array. The last dimension must contain HE20_COEFFICIENTS_PER_CHANNEL subcarriers in ascending order -122..122. Any leading dimensions are preserved.

espargos.csi.interpolate_ht20ltf_gap(csi_ht20: ndarray) None[source]

Fill the HT20-LTF DC subcarrier by linear interpolation in place.

Parameters:

csi_ht20 – Complex HT20-LTF CSI array. The last dimension must contain HT_COEFFICIENTS_PER_CHANNEL subcarriers in ascending order -28..28. Any leading dimensions are preserved.

espargos.csi.interpolate_ht40ltf_gap(csi_ht40: ndarray) None[source]

Fill the three HT40 gap subcarriers between primary and secondary channel in place.

Parameters:

csi_ht40 – Complex HT40-LTF CSI array. The last dimension must contain 2 * HT_COEFFICIENTS_PER_CHANNEL + HT40_GAP_SUBCARRIERS subcarriers in ascending order -58..58. Any leading dimensions are preserved.

espargos.csi.interpolate_lltf_gap(csi_lltf: ndarray) None[source]

Fill the L-LTF DC subcarrier by linear interpolation in place.

Parameters:

csi_lltf – Complex L-LTF CSI array. The last dimension must contain LEGACY_COEFFICIENTS_PER_CHANNEL subcarriers in ascending order -26..26. Any leading dimensions are preserved.
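
The gap-filling idea shared by the four interpolate_* helpers can be sketched on plain Python lists. This sketch assumes linear interpolation of the complex values between the two valid neighbours; whether the library interpolates real and imaginary parts or magnitude and phase is not stated here. The helper name interpolate_gap is hypothetical.

```python
def interpolate_gap(values: list, first: int, last: int) -> None:
    """Fill values[first..last] in place by linear interpolation
    between values[first - 1] and values[last + 1]."""
    left = values[first - 1]
    right = values[last + 1]
    intervals = last - first + 2  # intervals between the two neighbours
    for step, index in enumerate(range(first, last + 1), start=1):
        values[index] = left + (right - left) * step / intervals


# L-LTF: 53 subcarriers -26..26, DC sits at list index 26.
lltf = [complex(k + 30, 2 * k) for k in range(-26, 27)]
lltf[26] = 0j
interpolate_gap(lltf, 26, 26)

# HE20: 245 subcarriers -122..122, tones -1, 0, 1 sit at indices 121..123.
he20 = [complex(k, 0) for k in range(-122, 123)]
he20[121:124] = [0j, 0j, 0j]
interpolate_gap(he20, 121, 123)

print(lltf[26], he20[121:124])
```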

espargos.csi.iter_csistream_fragments(jumbo: bytes)[source]
espargos.csi.parse_csistream_jumbo_message(message: bytes) bytes[source]
class espargos.csi.radar_tx_report_tlv_t(buf=None)[source]

Bases: object

get_hardware_tx_phase_raw() int[source]

Extract what appears to be a signed 11-bit phase field from timestamp register 2.

get_hardware_tx_timestamp_ns() float[source]

Decode the raw ESP32-C61 TX timestamp registers into sensor-local nanoseconds.

The wire format intentionally keeps the raw register values. This helper mirrors the low-level recovery formula from the firmware so analysis code can choose whether and how to use the decoded timestamp.

property has_hardware_tx_timestamp
property tx_succeeded
class espargos.csi.rfswitch_state_t(*values)[source]

Bases: IntEnum

SENSOR_RFSWITCH_ANTENNA_L = 3
SENSOR_RFSWITCH_ANTENNA_R = 2
SENSOR_RFSWITCH_ANTENNA_RANDOM = 4
SENSOR_RFSWITCH_ISOLATION = 0
SENSOR_RFSWITCH_REFERENCE = 1
SENSOR_RFSWITCH_UNKNOWN = 255
class espargos.csi.seq_ctrl_t(buf=None)[source]

Bases: Structure

A ctypes structure representing the sequence control field of a Wi-Fi packet.

This structure is used to store the sequence control field of a Wi-Fi packet, which contains the fragment number (frag) and the sequence number (seg).

frag

Structure/Union member

seg

Structure/Union member
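
A bitfield structure of this kind can be sketched with ctypes as shown below. The field widths (4 bits for frag, 12 bits for seg) follow the IEEE 802.11 sequence control layout and are an assumption about seq_ctrl_t; the class name SeqCtrlSketch is hypothetical.

```python
import ctypes


class SeqCtrlSketch(ctypes.LittleEndianStructure):
    """16-bit sequence control field: low 4 bits frag, high 12 bits seg."""
    _fields_ = [
        ("frag", ctypes.c_uint16, 4),   # fragment number
        ("seg", ctypes.c_uint16, 12),   # sequence number
    ]


raw = (0x1234).to_bytes(2, "little")      # hypothetical field value
seq = SeqCtrlSketch.from_buffer_copy(raw)
print(hex(seq.frag), hex(seq.seg))
```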

class espargos.csi.serialized_csi_tlv_t(buf=None)[source]

Bases: object

property acquire_force_lltf
property acquire_lltf_8bit_mode
property acquire_lltf_bit_mode
property first_word_invalid
property is_calib
property is_compressed
property is_radar
espargos.csi.unpack_lltf12_values(buf, value_count: int) ndarray[source]
espargos.csi.unpack_lltf8_values(buf, pair_count)[source]
class espargos.csi.wifi_phy_mode_t(*values)[source]

Bases: IntEnum

WIFI_PHY_MODE_11A = 3
WIFI_PHY_MODE_11B = 1
WIFI_PHY_MODE_11G = 2
WIFI_PHY_MODE_HE20 = 6
WIFI_PHY_MODE_HT20 = 4
WIFI_PHY_MODE_HT40 = 5
WIFI_PHY_MODE_LR = 0
WIFI_PHY_MODE_VHT20 = 7
class espargos.csi.wifi_phy_rate_t(*values)[source]

Bases: IntEnum

WIFI_PHY_RATE_11M_L = 3
WIFI_PHY_RATE_11M_S = 7
WIFI_PHY_RATE_12M = 10
WIFI_PHY_RATE_18M = 14
WIFI_PHY_RATE_1M_L = 0
WIFI_PHY_RATE_24M = 9
WIFI_PHY_RATE_2M_L = 1
WIFI_PHY_RATE_2M_S = 5
WIFI_PHY_RATE_36M = 13
WIFI_PHY_RATE_48M = 8
WIFI_PHY_RATE_54M = 12
WIFI_PHY_RATE_5M_L = 2
WIFI_PHY_RATE_5M_S = 6
WIFI_PHY_RATE_6M = 11
WIFI_PHY_RATE_9M = 15
WIFI_PHY_RATE_LORA_250K = 41
WIFI_PHY_RATE_LORA_500K = 42
WIFI_PHY_RATE_MAX = 43
WIFI_PHY_RATE_MCS0_LGI = 16
WIFI_PHY_RATE_MCS0_SGI = 26
WIFI_PHY_RATE_MCS1_LGI = 17
WIFI_PHY_RATE_MCS1_SGI = 27
WIFI_PHY_RATE_MCS2_LGI = 18
WIFI_PHY_RATE_MCS2_SGI = 28
WIFI_PHY_RATE_MCS3_LGI = 19
WIFI_PHY_RATE_MCS3_SGI = 29
WIFI_PHY_RATE_MCS4_LGI = 20
WIFI_PHY_RATE_MCS4_SGI = 30
WIFI_PHY_RATE_MCS5_LGI = 21
WIFI_PHY_RATE_MCS5_SGI = 31
WIFI_PHY_RATE_MCS6_LGI = 22
WIFI_PHY_RATE_MCS6_SGI = 32
WIFI_PHY_RATE_MCS7_LGI = 23
WIFI_PHY_RATE_MCS7_SGI = 33
WIFI_PHY_RATE_MCS8_LGI = 24
WIFI_PHY_RATE_MCS8_SGI = 34
WIFI_PHY_RATE_MCS9_LGI = 25
WIFI_PHY_RATE_MCS9_SGI = 35
class espargos.csi.wifi_pkt_rx_ctrl_v3_t(buf=None)[source]

Bases: Structure

A ctypes structure representing the wifi_pkt_rx_ctrl_t as provided by the ESP32. See the related ESP-IDF code for details.

Variant for Espressif PHY version 3.

cfo_high_rate

Structure/Union member

cfo_low_rate

Structure/Union member

channel

Structure/Union member

cur_bb_format

Structure/Union member

data_rssi

Structure/Union member

dump_len

Structure/Union member

fft_gain

Structure/Union member

he_siga1

Structure/Union member

he_siga2

Structure/Union member

is_group

Structure/Union member

lsig_len

Structure/Union member

lsig_reserved

Structure/Union member

noise_floor

Structure/Union member

rate

Structure/Union member

rssi

Structure/Union member

rx_channel_estimate_info_vld

Structure/Union member

rx_channel_estimate_len

Structure/Union member

rx_gain

Structure/Union member

rx_state

Structure/Union member

rxend_state

Structure/Union member

rxmatch0

Structure/Union member

rxmatch1

Structure/Union member

rxmatch2

Structure/Union member

rxmatch3

Structure/Union member

rxstart_time_cyc

Structure/Union member

rxstart_time_cyc_dec

Structure/Union member

second

Structure/Union member

sig_len

Structure/Union member

sig_mode

Structure/Union member

sigb_len

Structure/Union member

timestamp

Structure/Union member

class espargos.csi.wifi_rx_bb_format_t(*values)[source]

Bases: IntEnum

RX_BB_FORMAT_11A = 1
RX_BB_FORMAT_11B = 0
RX_BB_FORMAT_11G = 1
RX_BB_FORMAT_HE_ERSU = 6
RX_BB_FORMAT_HE_MU = 5
RX_BB_FORMAT_HE_SU = 4
RX_BB_FORMAT_HE_TB = 7
RX_BB_FORMAT_HT = 2
RX_BB_FORMAT_VHT = 3
class espargos.csi.wifi_sig_mode_t(*values)[source]

Bases: IntEnum

SIG_MODE_HE = 2
SIG_MODE_HT = 1
SIG_MODE_LEGACY = 0
SIG_MODE_VHT = 3
class espargos.csi.wifi_tx_power_t(*values)[source]

Bases: IntEnum

WIFI_TX_POWER_11_DBM = 44
WIFI_TX_POWER_13_DBM = 52
WIFI_TX_POWER_14_DBM = 56
WIFI_TX_POWER_15_DBM = 60
WIFI_TX_POWER_16_5_DBM = 66
WIFI_TX_POWER_18_DBM = 72
WIFI_TX_POWER_20_DBM = 80
WIFI_TX_POWER_2_DBM = 8
WIFI_TX_POWER_5_DBM = 20
WIFI_TX_POWER_7_DBM = 28
WIFI_TX_POWER_8_5_DBM = 34
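
The enum values are consistent with a unit of 0.25 dBm, the unit ESP-IDF uses for its maximum TX power setting. The sketch below converts a member back to dBm under that assumption; it reproduces only three members, and the names TxPowerSketch and tx_power_dbm are hypothetical.

```python
from enum import IntEnum


class TxPowerSketch(IntEnum):
    """Subset of wifi_tx_power_t, values copied from the listing above."""
    WIFI_TX_POWER_8_5_DBM = 34
    WIFI_TX_POWER_16_5_DBM = 66
    WIFI_TX_POWER_20_DBM = 80


def tx_power_dbm(setting: IntEnum) -> float:
    """Convert an enum member to dBm, assuming 0.25 dBm per unit."""
    return setting.value * 0.25


for member in TxPowerSketch:
    print(member.name, tx_power_dbm(member))
```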

espargos.exithandler

class espargos.exithandler.ExitHandler[source]

Bases: object

handler(*args)[source]
kill(*args, **kwargs)[source]
running = True

espargos.radar

class espargos.radar.RadarPoolConfig(board_configs: list[dict])[source]

Bases: object

Low-level radar configuration for an entire pool, one controller config per board.

board_configs: list[dict]
espargos.radar.build_pool_config(calibration: CSICalibration, active_by_sensor, t0_by_sensor, period_by_sensor, tx_power: wifi_tx_power_t, tx_phymode: wifi_phy_mode_t, tx_rate: wifi_phy_rate_t, rfswitch_state: rfswitch_state_t, mac_by_sensor=None) RadarPoolConfig[source]

Build low-level per-board radar controller configuration from a reference-time schedule.

active_by_sensor, t0_by_sensor and period_by_sensor may be scalars, board-local (row, column) arrays, or pool-wide (board, row, column) arrays.

t0_by_sensor is interpreted as a reference time in seconds relative to sensor 0. The returned board configs contain start_by_antid values in controller units (currently integer microseconds), converted to each sensor’s local clock using the stored sensor_clock_offsets from the provided calibration.

espargos.radar.correct_radar_csi_tx_timestamps(csi_data: ndarray, tx_timestamps_s, tx_sensor_indices, subcarrier_frequencies_hz, sensor_clock_offsets, *, tx_timestamp_offset_s: float = 0.0) ndarray[source]

Apply radar TX timestamp phase correction to frequency-domain CSI.

The CSI deserialization path already applies the receive-side timestamp/STO correction. This helper applies the complementary transmit-side correction using the radar TX report timestamp. tx_timestamps_s are sensor-local TX timestamps; they are converted into the calibration reference clock by subtracting the transmitting sensor’s clock offset.

subcarrier_frequencies_hz may be absolute RF frequencies or baseband subcarrier offsets. For consistency with the existing CSI deserialization timestamp correction, callers usually want baseband offsets.

The subcarrier axis is expected to be the last axis of csi_data.

Parameters:
  • csi_data – Complex CSI array with subcarriers on the last axis.

  • tx_timestamps_s – TX timestamp(s), in seconds. Scalar or leading-shape array.

  • tx_sensor_indices – Flattened TX sensor index/indices matching tx_timestamps_s.

  • subcarrier_frequencies_hz – Frequency for each subcarrier, in Hz.

  • sensor_clock_offsets – Sensor clock offsets, in seconds, as an array that flattens in the same order as tx_sensor_indices.

  • tx_timestamp_offset_s – Constant offset added to each TX timestamp before correction. This accounts for packet-boundary conventions in the hardware TX timestamp source.

Returns:

Corrected CSI array with the same shape as csi_data.
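
The correction described above can be illustrated for a single CSI vector. This is a sketch under stated assumptions, not the library's code: it assumes that a transmit delay tau shows up as a factor exp(-j*2*pi*f*tau) on each subcarrier, so the correction multiplies by the conjugate factor. The sign convention and the function name correct_tx_timestamp_sketch are assumptions.

```python
import cmath


def correct_tx_timestamp_sketch(csi_row, tx_timestamp_s, tx_sensor_index,
                                subcarrier_frequencies_hz, sensor_clock_offsets,
                                tx_timestamp_offset_s=0.0):
    """Remove the phase slope caused by the TX timestamp from one CSI vector."""
    # Convert the sensor-local TX timestamp into the reference clock by
    # subtracting the transmitting sensor's clock offset.
    reference_time = (tx_timestamp_s + tx_timestamp_offset_s
                      - sensor_clock_offsets[tx_sensor_index])
    # Assumed sign convention: undo exp(-j*2*pi*f*tau) with its conjugate.
    return [value * cmath.exp(2j * cmath.pi * f * reference_time)
            for value, f in zip(csi_row, subcarrier_frequencies_hz)]


# Hypothetical data: baseband offsets for subcarriers -2..2 and a 100 ns delay.
frequencies = [k * 312500.0 for k in range(-2, 3)]
clock_offsets = [0.0, 5e-8]          # flattened per-sensor offsets, in seconds
true_delay = 1e-7
csi = [cmath.exp(-2j * cmath.pi * f * true_delay) for f in frequencies]

corrected = correct_tx_timestamp_sketch(
    csi, tx_timestamp_s=true_delay + clock_offsets[1], tx_sensor_index=1,
    subcarrier_frequencies_hz=frequencies, sensor_clock_offsets=clock_offsets)
print(corrected)
```

With these inputs every corrected coefficient is close to 1, i.e. the phase slope introduced by the delay is removed.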

espargos.radar.ftm_get_phy_comp(*, responder: bool, primary_channel: int, secondary_channel: int = 0, mode: int = 0, sta_connected: bool = False) int[source]

Reimplementation of ESP32-C61 ftm_get_phy_comp from the Wi-Fi blob.

This raw-units version is kept as a reference to the recovered original control flow. New code should prefer get_ftm_tx_timestamp_reciprocity_delay_s(), which expresses the same logic in a table-driven form and returns seconds directly.

The returned value is in raw FTM timestamp units. Multiply by FTM_TIMESTAMP_UNIT_S to convert to seconds. secondary_channel follows ESP-IDF’s wifi_second_chan_t convention: 0 none, 1 above, 2 below.

sta_connected only affects the initiator/nonzero-mode branch in the recovered implementation.

espargos.radar.get_ftm_tx_timestamp_reciprocity_delay_s(*, responder: bool, primary_channel: int, secondary_channel: int = 0, home_channel_ht40: bool = False, mode: int | None = None, sta_connected: bool = False) float[source]

Return the PHY reciprocity delay used to align TX/RX FTM timestamps.

This is the same compensation modeled by ftm_get_phy_comp(), but expressed in terms of role, HT40 home-channel state, channel layout, and channel group instead of reproducing the recovered branch tree. The returned value is in seconds.

home_channel_ht40 is intentionally separate from secondary_channel. The blob does not use the nonzero mode branch as a generic “secondary channel present” test; it switches into a distinct PHY compensation regime used when the current/home channel context is HT40. That is why this flag remains separate even though primary_channel and secondary_channel are also provided.

mode is kept as a backward-compatible alias for the recovered blob parameter. When provided, only its zero/nonzero state matters.

secondary_channel follows ESP-IDF’s wifi_second_chan_t convention: 0 none, 1 above, 2 below.

espargos.revisions

class espargos.revisions.BoardRevision[source]

Bases: object

antid_to_row_col(antid: int) tuple[source]

Convert firmware antenna id to (row, column) on this board revision.

property calib_trace_delays: list

Calibration trace signal delays on ESPARGOS PCB [in s]

esp_num_to_row_col(esp_num: int) tuple[source]

Convert ESP number to (row, column) on the board

property identification: tuple
sensor_values_to_antid_list(values, name: str = 'values') list[source]

Convert a board-local (row, column) array to firmware antenna-id order.

property serialized_csi_t: type
property type_header: int
class espargos.revisions.BoardRevisionDensiflorus[source]

Bases: BoardRevision

property antid_to_esp_num: dict
esp_num_to_row_col(esp_num: int) tuple[source]

Convert ESP number to (row, column) on the board

property identification: tuple
property serialized_csi_t: type
property type_header: int

espargos.uart

class espargos.uart.UARTClient(host: str, *, timeout: float = 5.0)[source]

Bases: object

add_csi_callback(callback)[source]
add_log_callback(callback)[source]
close()[source]
connect()[source]
disable_csi_stream()[source]
disable_transport_mode()[source]
enable_csi_stream()[source]
hello(timeout: float | None = None) dict[source]
remove_csi_callback(callback)[source]
remove_log_callback(callback)[source]
request(method: str, path: str, body: bytes | str | None = None, timeout: float | None = None) UARTControlResponse[source]
class espargos.uart.UARTControlResponse(status: int, content_type: str, body: bytes)[source]

Bases: object

body_text() str[source]
exception espargos.uart.UARTProtocolError[source]

Bases: Exception

Raised when the ESPARGOS UART protocol encounters malformed frames or unexpected responses.

exception espargos.uart.UARTTimeoutError[source]

Bases: TimeoutError

Raised when a UART request times out.

espargos.uart.cobs_decode(data: bytes) bytes[source]
espargos.uart.cobs_encode(data: bytes) bytes[source]
espargos.uart.is_uart_host(host: str) bool[source]
espargos.uart.parse_uart_host(host: str) tuple[str, dict][source]
espargos.uart.validate_csistream_payload(payload: bytes, revision) bool[source]
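
cobs_encode() and cobs_decode() are listed above without descriptions. Their names suggest Consistent Overhead Byte Stuffing (COBS), a framing scheme that removes zero bytes from a payload so that a zero byte can serve as frame delimiter. The sketch below shows textbook COBS without the trailing delimiter; whether the ESPARGOS UART transport matches it in every detail is an assumption, and the *_sketch names are hypothetical.

```python
def cobs_encode_sketch(data: bytes) -> bytes:
    """Encode data so that the result contains no zero bytes."""
    out = bytearray()
    block = bytearray()
    for byte in data:
        if byte == 0:
            out.append(len(block) + 1)  # code byte: distance to the next zero
            out += block
            block.clear()
        else:
            block.append(byte)
            if len(block) == 254:       # maximum block length reached
                out.append(255)
                out += block
                block.clear()
    out.append(len(block) + 1)
    out += block
    return bytes(out)


def cobs_decode_sketch(encoded: bytes) -> bytes:
    """Inverse of cobs_encode_sketch."""
    out = bytearray()
    i = 0
    while i < len(encoded):
        code = encoded[i]
        if code == 0:
            raise ValueError("zero byte inside COBS frame")
        block = encoded[i + 1:i + code]
        if len(block) != code - 1:
            raise ValueError("truncated COBS frame")
        out += block
        i += code
        # A code below 255 stands for a zero byte, except at the very end.
        if code < 255 and i < len(encoded):
            out.append(0)
    return bytes(out)


payload = b"\x11\x22\x00\x33"
frame = cobs_encode_sketch(payload)
print(frame.hex(), cobs_decode_sketch(frame) == payload)
```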

espargos.util

class espargos.util.AntennaOrientation(*values)[source]

Bases: Enum

Orientation of a sub-array antenna in the combined array. Describes how the board-local coordinate system maps to the combined array coordinate system. The naming convention uses compass directions: the orientation indicates which direction the board’s row axis (short axis of the 2x4 sub-array) points in the combined array. The N orientation corresponds to the “default” orientation of the board (4 antennas for azimuth, 2 for elevation), where the row axis points downwards and the column axis points to the right (typically used when mounted on a tripod).

E = ((0, 1), (-1, 0))
N = ((1, 0), (0, 1))
S = ((-1, 0), (0, -1))
W = ((0, -1), (1, 0))
rotation_matrix()[source]

Returns the 2x2 rotation matrix that maps from the board-local coordinate system to the combined array coordinate system. This is the same as the stride matrix.
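
The four enum values form the group of quarter-turn rotations, which the following sketch checks with plain tuples. The matrix values are copied from the enum above; applying a matrix to a (row, column) step as matrix times vector is an assumption about how the library uses it, and the helper names are hypothetical.

```python
# Matrix values copied from AntennaOrientation.
ORIENTATIONS = {
    "N": ((1, 0), (0, 1)),
    "E": ((0, 1), (-1, 0)),
    "S": ((-1, 0), (0, -1)),
    "W": ((0, -1), (1, 0)),
}


def matmul2(a, b):
    """Product of two 2x2 matrices given as nested tuples."""
    return tuple(
        tuple(sum(a[i][k] * b[k][j] for k in range(2)) for j in range(2))
        for i in range(2)
    )


def apply_matrix(matrix, vector):
    """Matrix-vector product for a 2x2 matrix and a 2-element vector."""
    return tuple(sum(matrix[i][k] * vector[k] for k in range(2)) for i in range(2))


east = ORIENTATIONS["E"]
two_turns = matmul2(east, east)        # equals S
three_turns = matmul2(east, two_turns)  # equals W
four_turns = matmul2(east, three_turns)  # equals N (identity)
print(two_turns, three_turns, four_turns)
print(apply_matrix(east, (1, 0)))      # one board-local row step under E
```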

espargos.util.build_combined_array_data(indexing_matrix, input_data)[source]

Helper for combined array setups. Re-structures data from multiple subarrays into a single large array, using the provided indexing matrix. Typically, the input data is the CSI data of the subarrays, but it can also be anything else with shape (datapoints, boards, rows, columns, …).

Parameters:
  • indexing_matrix – The indexing matrix to map the CSI data of the subarrays to the CSI data of the large array.

  • input_data – The data of the subarrays. Complex-valued NumPy array with shape (datapoints, boards, rows, columns, …).

Returns:

The combined array data. Complex-valued NumPy array with shape (datapoints, rows, columns, subcarriers).

espargos.util.build_jones_matrices(antenna_orientations: ndarray, base_jones_matrix: ndarray = None)[source]

Build per-antenna effective Jones matrices for a combined array, accounting for the physical rotation of each sub-array.

The effective Jones matrix for each antenna maps from the R/L feed basis to the global H/V linear polarization basis, taking into account the antenna’s physical orientation.

The physical model: the R/L feed probes are mounted on the antenna and rotate with it, while the incoming field has fixed global H/V polarization. Rotating the antenna by angle \(\theta\) therefore acts in the feed (output) space of the Jones matrix:

\[J_{\text{eff}} = R^T(\theta) \cdot J_{\text{base}}\]

Inverting to obtain the R/L → H/V mapping:

\[J_{\text{eff}}^{-1} = J_{\text{base}}^{-1} \cdot R(\theta)\]

where \(R(\theta)\) is the 2D rotation matrix for the antenna’s orientation and \(J_{\text{base}}\) is the base Jones matrix (H/V to R/L conversion for the default orientation).
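
The inversion step relies on \(R(\theta)\) being orthogonal, i.e. \(R^T(\theta) \cdot R(\theta) = I\), which holds for any 2D rotation matrix. Written out with the same symbols as above:

\[J_{\text{eff}}^{-1} = \left(R^T(\theta) \cdot J_{\text{base}}\right)^{-1} = J_{\text{base}}^{-1} \cdot \left(R^T(\theta)\right)^{-1} = J_{\text{base}}^{-1} \cdot R(\theta)\]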

Parameters:
  • antenna_orientations – Array of AntennaOrientation values with shape (rows, cols).

  • base_jones_matrix – The base Jones matrix mapping H/V to R/L for the default (N) orientation. If None, uses constants.ANTENNA_JONES_MATRIX.

Returns:

Array of effective inverse Jones matrices with shape (rows, cols, 2, 2). Multiply with R/L feed vector to obtain global H/V: H_V = jones[r, c] @ R_L.

espargos.util.csi_interp_eigenvec(csi: ndarray, weights: ndarray = None)[source]

Interpolates CSI data (frequency-domain or time-domain) by finding the principal eigenvector of the covariance matrix.

Parameters:
  • csi – The CSI data to interpolate. Complex-valued NumPy array. Can be an array with arbitrary dimensions, but the first dimension must be the number of CSI datapoints.

  • weights – The weights to use for each CSI datapoint. If None, all datapoints are weighted equally.

espargos.util.csi_interp_eigenvec_per_subcarrier(csi: ndarray) ndarray[source]

Interpolates CSI data by finding the principal eigenvector of the per-subcarrier covariance matrix. Unlike csi_interp_eigenvec(), this function computes a separate covariance matrix for each subcarrier (last dimension), which preserves the frequency-domain structure of the CSI data.

The result is scaled by the square root of the principal eigenvalue and phase-referenced to the first antenna element (index 0).

Parameters:

csi – Complex-valued CSI data with shape (n_samples, *antenna_shape, n_subcarriers). The first dimension is the number of CSI datapoints (e.g., calibration clusters), the last dimension is the number of subcarriers, and any intermediate dimensions describe the antenna array geometry.

Returns:

Interpolated CSI data with shape (*antenna_shape, n_subcarriers).

espargos.util.csi_interp_iterative(csi: ndarray, weights: ndarray = None, iterations=10)[source]

Coherently combines repeated CSI observations by iteratively phase-aligning them.

Each CSI snapshot is assumed to differ from the others mainly by a single global phase rotation. The algorithm alternates between two steps: estimating a combined CSI from the current phase offsets, and updating the phase offset of each snapshot to best match that combined CSI.

Parameters:
  • csi – The CSI data to interpolate. Complex-valued NumPy array. Can be an array with arbitrary dimensions, but the first dimension must be the number of CSI datapoints.

  • weights – The weights to use for each CSI datapoint. If None, all datapoints are weighted equally.

  • iterations – The number of iterations to perform. Default is 10.

Returns:

A phase-aligned weighted average of the input CSI data, with the same shape as one CSI datapoint.
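
The alternating procedure can be sketched for flattened CSI snapshots stored as lists of complex numbers. This is an illustration of the two steps described above, not the library's implementation: the function name align_and_average and the choice of update rule (rotate each snapshot by the phase of its inner product with the current estimate) are assumptions.

```python
import cmath


def align_and_average(snapshots, weights=None, iterations=10):
    """Phase-align complex snapshots and return their weighted average."""
    count = len(snapshots)
    if weights is None:
        weights = [1.0 / count] * count
    rotations = [1 + 0j] * count  # unit-magnitude phase correction per snapshot
    combined = []
    for _ in range(iterations):
        # Step 1: combine the snapshots using the current phase corrections.
        combined = [
            sum(w * r * snap[k] for w, r, snap in zip(weights, rotations, snapshots))
            for k in range(len(snapshots[0]))
        ]
        # Step 2: update each correction so the snapshot best matches the estimate.
        for n, snap in enumerate(snapshots):
            corr = sum(c * s.conjugate() for c, s in zip(combined, snap))
            rotations[n] = corr / abs(corr) if abs(corr) > 0 else 1 + 0j
    return combined


# Hypothetical data: one channel vector observed with three global phases.
base = [1 + 0j, 1j, -0.5 + 0.5j]
snapshots = [[cmath.exp(1j * phase) * value for value in base]
             for phase in (0.0, 1.0, 2.5)]
result = align_and_average(snapshots)
print(result)
```

In this noise-free example the result equals the underlying vector up to one global phase rotation, which is the ambiguity the method cannot and need not resolve.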

espargos.util.csi_interp_iterative_by_array(csi: ndarray, weights: ndarray = None, iterations=10)[source]

Interpolates CSI data (frequency-domain or time-domain) using an iterative algorithm. Same as csi_interp_iterative(), but assumes that the second dimension of csi is the antenna array dimension and performs the interpolation for each antenna array separately.

espargos.util.derive_he20_calibration_from_lltf(complete_clusters_lltf: ndarray, complete_cluster_timestamps: ndarray, secondary_channel_relative: int) ndarray[source]

Derive a phase calibration for HE20 CSI from calibration packets that only provide LLTF.

HE20 uses four times finer subcarrier spacing than LLTF / HT20. This means that a delay which is only observed on the coarse 312.5 kHz LLTF / HT20 grid is ambiguous when projected onto the denser 78.125 kHz HE20 grid: multiple HE20 phase slopes can agree on every fourth subcarrier while disagreeing on the intermediate HE20 tones. We therefore cannot obtain a reliable HE20 calibration by simply fitting a slope on the coarse grid and reusing it unchanged.

This helper resolves the problem by going back to “first principles” of calibration and estimating time and phase offset separately:

  1. Estimate constant per-antenna phase offsets from the already STO-corrected LLTF calibration clusters using a principal-eigenvector estimate.

  2. Undo the LLTF timestamp-based STO correction, recover the underlying per-antenna baseband timing offsets from the raw LLTF slope together with the calibration timestamps, and synthesize the corresponding HE20 phase slope on the denser HE20 subcarrier grid.

The final HE20 calibration is the combination of those per-antenna constant phase offsets and the timestamp-derived HE20 phase slope.

Parameters:
  • complete_clusters_lltf – Complete LLTF calibration CSI clusters as a complex-valued NumPy array with shape (clusters, boards, rows, columns, subcarriers). These values are expected to come from CSICluster.deserialize_csi_lltf() and are therefore already STO-corrected using the forwarded hardware timestamps.

  • complete_cluster_timestamps – Per-sensor timestamps corresponding to complete_clusters_lltf, in seconds, as a NumPy array with shape (clusters, boards, rows, columns).

  • secondary_channel_relative – Relative position of the secondary channel used for the calibration packets. Use -1 for HT40 below, +1 for HT40 above, and 0 for a plain 20 MHz channel.

Returns:

Complex-valued HE20 calibration array with shape (boards, rows, columns, csi.HE20_COEFFICIENTS_PER_CHANNEL).
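
The grid ambiguity that motivates this helper can be reproduced numerically. The sketch below does not use the library; the 50 ns delay and the subcarrier range are arbitrary illustrative values. Two delays that differ by one period of the coarse grid (1 / 312.5 kHz = 3.2 µs) produce identical phases on every fourth HE20 subcarrier, but different phases on the tones in between.

```python
import numpy as np

COARSE_SPACING_HZ = 312.5e3   # LLTF / HT20 subcarrier spacing
FINE_SPACING_HZ = 78.125e3    # HE20 subcarrier spacing

# Illustrative range of HE20 subcarrier indices around the carrier
k_fine = np.arange(-8, 9)
f_fine = k_fine * FINE_SPACING_HZ

tau_a = 50e-9                             # arbitrary example delay
tau_b = tau_a + 1.0 / COARSE_SPACING_HZ   # alias of tau_a on the coarse grid

resp_a = np.exp(-2j * np.pi * f_fine * tau_a)
resp_b = np.exp(-2j * np.pi * f_fine * tau_b)

on_coarse_grid = (k_fine % 4 == 0)   # tones shared with the coarse grid
agree = np.isclose(resp_a, resp_b)   # where both delays look identical
```

Here agree is True exactly where on_coarse_grid is True, which is why a slope fitted on the coarse grid alone cannot be reused unchanged on the HE20 grid.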

espargos.util.estimate_toas_rootmusic(csi_fdomain: ndarray, max_source_count=2, chunksize=36, per_board_average=False)[source]

Estimate the times of arrival (ToAs) of the LoS paths using the root-MUSIC algorithm.

Parameters:
  • csi_fdomain – The frequency-domain CSI data. Complex-valued NumPy array with shape (datapoints, arrays, rows, columns, subcarriers).

  • max_source_count – The maximum number of sources to estimate. The number of sources is determined using the Rissanen MDL criterion, but this parameter can be used to limit the number of sources.

  • chunksize – The size of the chunks to use for the covariance matrix computation.

  • per_board_average – If True, compute the average ToA over all antennas per board. If False, return the ToAs for each antenna separately.

Returns:

The estimated ToAs of the LoS paths, in seconds, NumPy array of shape (boardcount, constants.ROWS_PER_BOARD, constants.ANTENNAS_PER_ROW).

espargos.util.extract_ht20_subcarriers_from_ht40(csi_ht40: ndarray, secondary_channel_relative: int)[source]

Extract the HT20 subcarriers from HT40 CSI data.

Parameters:
  • csi_ht40 – The HT40 CSI data. Complex-valued NumPy array with shape (…, subcarriers).

  • secondary_channel_relative – The relative position of the secondary channel to the primary channel. -1 for below, +1 for above.

Returns:

The extracted HT20 CSI data. Complex-valued NumPy array with shape (datapoints, arrays, rows, columns, subcarriers).

espargos.util.extract_lltf_subcarriers_from_ht20(csi_ht20: ndarray)[source]

Extract the LLTF subcarriers from HT20 CSI data.

Parameters:

csi_ht20 – The HT20 CSI data. Complex-valued NumPy array with shape (…, subcarriers).

Returns:

The extracted LLTF CSI data. Complex-valued NumPy array with shape (datapoints, arrays, rows, columns, subcarriers).

espargos.util.extract_lltf_subcarriers_from_ht40(csi_ht40: ndarray, secondary_channel_relative: int)[source]

Extract the LLTF subcarriers from HT40 CSI data.

Parameters:
  • csi_ht40 – The HT40 CSI data. Complex-valued NumPy array with shape (…, subcarriers).

  • secondary_channel_relative – The relative position of the secondary channel to the primary channel. -1 for below, +1 for above.

Returns:

The extracted LLTF CSI data. Complex-valued NumPy array with shape (datapoints, arrays, rows, columns, subcarriers).

espargos.util.fdomain_to_tdomain_pdp_music(csi_fdomain: ndarray, source_count: int = None, chunksize=36, tap_min=-7, tap_max=7, resolution=200)[source]

Convert frequency-domain CSI data to a time-domain power delay profile (PDP) using MUSIC super-resolution.

Parameters:

csi_fdomain – The frequency-domain CSI data. Complex-valued NumPy array with shape (datapoints, arrays, rows, columns, subcarriers).

Returns:

The delays (in taps) and the PDPs of shape (datapoints, arrays, rows, columns, delays), as NumPy arrays.

espargos.util.fdomain_to_tdomain_pdp_mvdr(csi_fdomain: ndarray, chunksize=36, tap_min=-7, tap_max=7, resolution=200)[source]

Convert frequency-domain CSI data to a time-domain power delay profile (PDP) using the MVDR beamformer.

Parameters:

csi_fdomain – The frequency-domain CSI data. Complex-valued NumPy array with shape (datapoints, arrays, rows, columns, subcarriers).

Returns:

The delays (in taps) and the PDPs of shape (datapoints, arrays, rows, columns, delays), as NumPy arrays.

espargos.util.fit_complex_sinusoid(csi_data: ndarray) ndarray[source]

Fit a complex sinusoid (amplitude, phase offset, and linear phase slope) to CSI data along the subcarrier axis (last dimension).

Each antenna’s frequency response over a reference channel is modeled as:

\[H[k] = A \cdot \exp\!\bigl(j\,(\varphi_0 + \omega \, k)\bigr)\]

where k is the subcarrier index, A is the amplitude, \(\varphi_0\) is the phase offset, and \(\omega\) is the phase slope (proportional to propagation delay).

The function estimates the parameters per antenna element and returns the reconstructed (fitted) complex sinusoid evaluated at every subcarrier index.

Parameters:

csi_data – Complex-valued CSI array with arbitrary leading dimensions (e.g. antenna geometry) and subcarriers as the last dimension. Shape (*antenna_shape, n_subcarriers).

Returns:

Fitted complex sinusoid with the same shape as csi_data.
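
One simple way to fit the model above is sketched here. It is not necessarily the estimator used by the library: the function name is hypothetical, the slope is taken from the summed products of adjacent subcarriers, and the subcarrier index k is counted from 0 (a different index origin only changes the phase offset, not the fitted curve).

```python
import numpy as np

def fit_complex_sinusoid_sketch(csi_data):
    """Illustrative fit of A * exp(j * (phi0 + omega * k)) along the last axis."""
    csi = np.asarray(csi_data, dtype=complex)
    k = np.arange(csi.shape[-1])
    # Phase slope: angle of the summed lag-1 products along the subcarrier axis
    omega = np.angle(np.sum(csi[..., 1:] * np.conj(csi[..., :-1]), axis=-1))
    # Remove the slope; the mean of the remainder estimates A * exp(j * phi0)
    derotated = csi * np.exp(-1j * omega[..., None] * k)
    a_phi0 = np.mean(derotated, axis=-1)
    # Reconstruct the fitted sinusoid at every subcarrier index
    return a_phi0[..., None] * np.exp(1j * omega[..., None] * k)
```

For noise-free input that already follows the model, the sketch reproduces the input exactly; for noisy input it returns a smoothed version.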

espargos.util.get_cable_wavelength(frequencies: ndarray, velocity_factors: ndarray)[source]

Returns the wavelength of the provided subcarrier frequencies on a cable with the given velocity factors.

Parameters:
  • frequencies – The frequencies of the subcarriers, in Hz, NumPy array.

  • velocity_factors – The velocity factors of the cable, NumPy array.

Returns:

The wavelengths of the subcarriers, in meters, NumPy array.
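
The underlying relation is wavelength = velocity factor × speed of light / frequency. A minimal sketch follows; the function name is hypothetical, and how the library broadcasts the two input arrays against each other is not stated here, so the caller arranges the axes explicitly.

```python
import numpy as np

SPEED_OF_LIGHT = 299_792_458.0  # m/s, in vacuum

def cable_wavelength_sketch(frequencies, velocity_factors):
    """Illustrative wavelength on a cable, using plain NumPy broadcasting."""
    f = np.asarray(frequencies, dtype=float)
    vf = np.asarray(velocity_factors, dtype=float)
    return SPEED_OF_LIGHT * vf / f

# Example: three cables (rows) evaluated at two frequencies (columns)
freqs = np.array([2.412e9, 2.437e9])
vf = np.array([0.66, 0.7, 0.85])
wavelengths = cable_wavelength_sketch(freqs[np.newaxis, :], vf[:, np.newaxis])
```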

espargos.util.get_center_frequency(primary_channel: int, secondary_channel: int | None = None)[source]

Returns the RF center frequency for the provided Wi-Fi channel configuration.

If only primary_channel is given, this returns the center frequency of that 20 MHz channel. If secondary_channel is also given, this returns the center frequency halfway between primary and secondary, which corresponds to the HT40 LO.

Parameters:
  • primary_channel – The primary Wi-Fi channel number.

  • secondary_channel – The secondary Wi-Fi channel number. If omitted or equal to primary_channel, the 20 MHz channel center is returned.

Returns:

Center frequency in Hz.
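
The computation can be sketched as follows, assuming the usual 2.4 GHz channel plan for channels 1 to 13 (channel 1 at 2412 MHz, 5 MHz per channel step). The function name is hypothetical and the library may handle other channels differently.

```python
def center_frequency_sketch(primary_channel, secondary_channel=None):
    """Illustrative RF center frequency in Hz for 2.4 GHz channels 1..13."""
    def channel_center_hz(channel):
        # Assumption: standard 2.4 GHz numbering, channel 14 not handled
        return 2.412e9 + 5e6 * (channel - 1)

    if secondary_channel is None:
        secondary_channel = primary_channel
    # Halfway between primary and secondary; equals the 20 MHz channel
    # center when both are the same
    return (channel_center_hz(primary_channel)
            + channel_center_hz(secondary_channel)) / 2
```

With primary channel 1 and secondary channel 5, for example, the sketch returns 2.422 GHz, halfway between the two 20 MHz channel centers.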

espargos.util.get_frequencies_he20(channel: int)[source]

Returns the frequencies of the subcarriers in a 2.4 GHz 802.11ax HE20 channel.

The raw HE-LTF reported by the ESP32-C61 covers subcarrier indices -122..122, where -1, 0, 1 are invalid / null tones.

Parameters:

channel – The primary channel number.

Returns:

The frequencies of the HE20 subcarriers, in Hz, NumPy array.
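
As a sketch of the subcarrier grid described above: with the 78.125 kHz HE20 spacing, indices -122..122 give 245 tones around the channel center. The function name is hypothetical, the channel center assumes the usual 2.4 GHz plan (channel 1 at 2412 MHz, 5 MHz steps), and whether the library keeps or drops the three invalid tones in its return value is not stated here, so the sketch returns them together with a validity mask.

```python
import numpy as np

HE20_SPACING_HZ = 78.125e3

def frequencies_he20_sketch(channel):
    """Illustrative HE20 subcarrier frequencies plus a mask of valid tones."""
    center_hz = 2.412e9 + 5e6 * (channel - 1)  # assumption: channels 1..13
    k = np.arange(-122, 123)                   # subcarrier indices -122..122
    frequencies = center_hz + k * HE20_SPACING_HZ
    valid = np.abs(k) > 1                      # -1, 0, 1 are invalid / null
    return frequencies, valid
```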

espargos.util.get_frequencies_ht20(channel: int)[source]

Returns the frequencies of the subcarriers in a 2.4 GHz 802.11n 20 MHz wide Wi-Fi channel.

Parameters:

channel – The channel number (this is the primary channel, since a 20 MHz channel has only one).

Returns:

The frequencies of the subcarriers, in Hz, NumPy array.

espargos.util.get_frequencies_ht40(primary_channel: int, secondary_channel: int)[source]

Returns the frequencies of the subcarriers in an HT40 2.4 GHz Wi-Fi channel.

Parameters:
  • primary_channel – The primary channel number.

  • secondary_channel – The secondary channel number.

Returns:

The frequencies of the subcarriers, in Hz, NumPy array.

espargos.util.get_frequencies_lltf(channel: int)[source]

Returns the frequencies of the subcarriers in a 2.4 GHz 802.11g 20 MHz wide Wi-Fi channel.

Parameters:

channel – The channel number (this is the primary channel, since a 20 MHz channel has only one).

Returns:

The frequencies of the subcarriers, in Hz, NumPy array.

espargos.util.mask_csi_by_feed(csidata: ndarray, rfswitch_states: ndarray, desired_feed: rfswitch_state_t)[source]

Mask the CSI data by the RF switch state, i.e., set the CSI data to 0 for all datapoints where the RF switch state is not the desired feed. Also applies scaling to the remaining datapoints to account for the fact that only a fraction of the datapoints are kept, so that the overall power level is preserved.

Parameters:
  • csidata – The CSI data to mask. Complex-valued NumPy array with shape (datapoints, …, subcarriers), usually (datapoints, arrays, rows, columns, subcarriers).

  • rfswitch_states – The RF switch states for each antenna and datapoint. NumPy array with shape (datapoints, …), usually (datapoints, arrays, rows, columns).

  • desired_feed – The desired RF switch state to keep.

Returns:

The masked CSI data. Complex-valued NumPy array with the same shape as the input CSI data. Returns None if no datapoints have the desired RF switch state for any antenna.

espargos.util.parse_combined_array_config(config_dict: dict)[source]

Parse the configuration file for demos that use a combined array.

Parameters:

config_dict – The configuration dictionary to parse.

Returns:
  • indexing_matrix – The indexing matrix to map the CSI data of the subarrays to the CSI data of the large array.

  • board_names_hosts – The names of the boards and their hostnames.

  • cable_lengths – The lengths of the cables connecting the boards.

  • cable_velocity_factors – The velocity factors of the cables connecting the boards.

  • n_rows – The number of rows in the array.

  • n_cols – The number of columns in the array.

  • antenna_orientations – Array of AntennaOrientation values with shape (n_rows, n_cols), indicating the orientation of each antenna’s sub-array in the combined array.

Raises:

ValueError – If the configuration is invalid (e.g., invalid antenna indices, missing/duplicate antennas, non-contiguous sub-arrays, invalid rotation).

espargos.util.remove_mean_sto(csi_datapoints: ndarray)[source]

Removes the mean symbol timing offset (STO) from the CSI data by estimating the STO from the phase slope across subcarriers. All datapoints are corrected separately.

Parameters:

csi_datapoints – The CSI data (multiple datapoints) to remove the mean STO from, frequency-domain. Complex-valued NumPy array with arbitrary shape as long as the first dimension is the datapoint dimension and the last dimension is the subcarrier dimension.
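
A minimal sketch of this kind of correction is shown below. It is not the library's implementation: the function name is hypothetical, the slope is estimated from the summed products of adjacent subcarriers over all antennas of a datapoint, and the subcarrier index is centered so that the correction leaves the phase at the band center unchanged.

```python
import numpy as np

def remove_mean_sto_sketch(csi_datapoints):
    """Illustrative removal of the mean phase slope, one slope per datapoint."""
    csi = np.asarray(csi_datapoints, dtype=complex)
    n_sub = csi.shape[-1]
    k = np.arange(n_sub) - (n_sub - 1) / 2  # centered subcarrier index
    # Lag-1 products along the subcarrier axis, summed over everything
    # except the datapoint axis
    products = csi[..., 1:] * np.conj(csi[..., :-1])
    slope = np.angle(np.sum(products, axis=tuple(range(1, products.ndim))))
    # Broadcast the per-datapoint slope over all remaining axes
    slope = slope.reshape((-1,) + (1,) * (csi.ndim - 1))
    return csi * np.exp(-1j * slope * k)
```

Only the phase is changed; the amplitudes of the CSI coefficients are left as they are.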

espargos.util.scale_csi_by_reported_gain(csi_data: ndarray, rx_gain: ndarray, fft_gain: ndarray) ndarray[source]

Compensate CSI amplitudes for the receiver gain reported by the ESP32.

The ESP32 reports RX gain in 1 dB units and FFT gain in 0.25 dB units. Since these are receiver-side gains, this helper divides raw CSI amplitudes by the reported gain factor. The scaling is valid for both automatic and manual gain mode, because the reported values are always meaningful.
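
The unit conversion can be sketched as follows. Two assumptions are made that this reference does not confirm: the reported decibel values describe amplitude gain (factor 10^(dB / 20)), and the gain arrays have the shape of csi_data without the trailing subcarrier axis. The function name is hypothetical.

```python
import numpy as np

def scale_by_reported_gain_sketch(csi_data, rx_gain, fft_gain):
    """Illustrative gain compensation: divide CSI by the linear gain factor."""
    # RX gain is reported in 1 dB units, FFT gain in 0.25 dB units
    gain_db = (np.asarray(rx_gain, dtype=float)
               + 0.25 * np.asarray(fft_gain, dtype=float))
    # Assumption: dB values refer to amplitude, hence the division by 20
    factor = 10.0 ** (gain_db / 20.0)
    # Broadcast the per-antenna factor over the subcarrier axis
    return np.asarray(csi_data) / factor[..., np.newaxis]
```

Under these assumptions, a reported RX gain of 20 or a reported FFT gain of 80 (80 × 0.25 dB = 20 dB) both divide the CSI amplitude by 10.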

espargos.util.separate_feeds(csidata: ndarray, rfswitch_state: ndarray)[source]

Separate the CSI data by antenna feeds (R/L) based on the RF switch states. Also takes care of scaling the CSI data for each feed to account for the fact that only a fraction of the datapoints are kept for each feed, so that the overall power level is preserved. Missing measurements for a feed (i.e., half of all measurements) are filled with zeros.

Parameters:
  • csidata – The CSI data to separate. Complex-valued NumPy array with shape (datapoints, …, subcarriers), usually (datapoints, arrays, rows, columns, subcarriers).

  • rfswitch_state – The RF switch states for each antenna and datapoint. NumPy array with shape (datapoints, …), usually (datapoints, arrays, rows, columns).

Returns:

The separated CSI data. Complex-valued NumPy array with shape (datapoints, …, subcarriers, 2), where the last dimension corresponds to the R/L feeds. Returns None if no datapoints have the desired RF switch state for any antenna.

espargos.util.shift_to_firstpeak_sync(csi_datapoints: ndarray, max_delay_taps=3, search_resolution=40, peak_threshold=0.1)[source]

Shifts the CSI data so that the first peak of the channel impulse response is at time 0. All CSI datapoints are shifted by the same amount, i.e., requires synchronized CSI.

Parameters:
  • csi_datapoints – The CSI data to shift, frequency-domain. Complex-valued NumPy array with shape (datapoints, arrays, rows, columns, subcarriers).

  • max_delay_taps – The maximum number of time taps to shift the CSI data by.

  • search_resolution – The number of search points (granularity) to use for the time shift.

  • peak_threshold – The threshold for the peak detection, as a fraction of the maximum peak power.

Returns:

The frequency-domain CSI data with the first peak of the channel impulse response at time 0.

Common

class espargos.Logger[source]

Bases: object

Logger class for pyespargos. This class is a singleton and should be used to modify the logging level of the library.

classmethod get_level()[source]

Returns the current logging level of the logger.

logger = <Logger pyespargos (INFO)>
classmethod set_level(level)[source]

Sets the logging level of the logger.

stderrHandler = <StreamHandler <stderr> (NOTSET)>