API Reference
The API is unstable at this early stage of development.
espargos.board
- class espargos.board.Board(host: str)[source]
Bases:
object
- DEFAULT_CFO_CORRECTION_CONFIG = {'auto': True, 'value': 0}
- DEFAULT_CSI_ACQUIRE_CONFIG = {'acquire_csi_beamformed': True, 'acquire_csi_dcm': True, 'acquire_csi_force_lltf': False, 'acquire_csi_he_stbc_mode': 2, 'acquire_csi_ht20': True, 'acquire_csi_ht40': True, 'acquire_csi_legacy': True, 'acquire_csi_mu': True, 'acquire_csi_su': True, 'acquire_csi_vht': True, 'compress_csi': False, 'dump_ack_en': True, 'enable': True, 'lltf_8bit_mode': False, 'val_scale_cfg': 2}
- DEFAULT_GAIN_SETTINGS = {'fft_scale_enable': False, 'fft_scale_value': 0, 'rx_gain_enable': False, 'rx_gain_value': 0}
- DEFAULT_WIFI_CHANNEL_OVERRIDES = {'channel-primary': [1, 1, 1, 1, 1, 1, 1, 1], 'channel-secondary': [0, 0, 0, 0, 0, 0, 0, 0], 'override_active': False}
- add_consumer(clist: list, cv: Condition, *args)[source]
Adds a consumer to the CSI stream. A consumer is defined by a list, a condition variable and additional arguments. When a CSI packet is received, it will be appended to the list, and the condition variable will be notified.
- Parameters:
clist – A list to which the CSI packet will be appended. The entry added to the list is a tuple
(esp_num, serialized_csi, *args), where esp_num is the number of the sensor in the array, serialized_csi is the raw CSI packet and *args are the additional arguments.
cv – A condition variable that will be notified when a CSI packet is received
args – Additional arguments that will be added to the list along with the CSI packet
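The list-plus-condition-variable pattern described for add_consumer() can be sketched with the standard library alone. In this sketch, fake_producer is a hypothetical stand-in for the board's receive thread, and wait_for_packets is an illustrative helper, not part of the library. Whether the real producer uses notify() or notify_all() is not stated above, so that detail is an assumption.

```python
import threading


def wait_for_packets(clist, cv, count, timeout=5.0):
    """Block until clist holds at least `count` entries, then drain and return them."""
    with cv:
        if not cv.wait_for(lambda: len(clist) >= count, timeout=timeout):
            return []  # timed out without enough packets
        packets = list(clist)
        clist.clear()
        return packets


def fake_producer(clist, cv, esp_num, serialized_csi, *args):
    """Hypothetical stand-in for the receive thread: append one entry and notify."""
    with cv:
        # Entry layout as documented: (esp_num, serialized_csi, *args)
        clist.append((esp_num, serialized_csi, *args))
        cv.notify_all()


clist = []
cv = threading.Condition()
producer = threading.Thread(
    target=fake_producer, args=(clist, cv, 3, b"\x00\x01", "board-0")
)
producer.start()
packets = wait_for_packets(clist, cv, count=1)
producer.join()
```

With a real board, the same clist and cv would be passed to add_consumer() instead of being fed by fake_producer.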
- clear_mac_filter()[source]
Tell ESPARGOS board to receive packets from all transmitters.
- Raises:
EspargosUnexpectedResponseError – If the server at the given host is not an ESPARGOS controller or the request was invalid
- close()[source]
Close transport resources associated with this board.
For UART-backed boards, this releases the serial port lock. Calling this on network-backed boards is harmless.
- get_cfo_correction() dict[source]
Fetches receiver CFO correction configuration from the ESPARGOS controller.
- get_csi_acquire_config() dict[source]
Fetches the current CSI acquisition configuration from the ESPARGOS controller.
- Returns:
CSI acquisition configuration dict
- Raises:
EspargosUnexpectedResponseError – If the server at the given host is not an ESPARGOS controller or the request was invalid
- get_gain_settings() dict[source]
Fetches the current gain settings from the ESPARGOS controller.
- Returns:
Gain settings dict
- Raises:
EspargosUnexpectedResponseError – If the server at the given host is not an ESPARGOS controller or the request was invalid
- get_mac_filter() dict[source]
Fetches the current MAC filter configuration from the ESPARGOS controller.
The returned JSON/dict matches what is configured via
set_mac_filter() / clear_mac_filter(). Format:
{ "enable": true|false, "mac": "00:11:22:33:44:55", "mac_mask": "ff:ff:ff:ff:ff:ff" }
- Returns:
MAC filter configuration dict
- Raises:
EspargosUnexpectedResponseError – If the server at the given host is not an ESPARGOS controller or the request was invalid
- get_name()[source]
Returns the hostname of the ESPARGOS controller as configured in the web interface.
- Returns:
The hostname of the ESPARGOS controller
- get_radar_config() dict[source]
Fetches the current low-level radar TX configuration from the ESPARGOS controller.
The returned dict mirrors the controller’s
/get_tx_control response and contains fields such as rfswitch_state, active_by_antid, start_by_antid, period_by_antid, mac_by_antid, tx_power, tx_phymode, and tx_rate.
- Returns:
Radar TX configuration dict
- Raises:
EspargosUnexpectedResponseError – If the server at the given host is not an ESPARGOS controller or the request was invalid
- get_rfswitch() rfswitch_state_t[source]
Fetches the current RF switch state from the ESPARGOS controller.
- Returns:
The current RF switch state as one of
csi.rfswitch_state_t
- Raises:
EspargosUnexpectedResponseError – If the server at the given host is not an ESPARGOS controller or the request was invalid
- get_wifi_channel_overrides() dict[source]
Fetches the current per-sensor WiFi channel overrides from the ESPARGOS controller.
- Returns:
Per-sensor WiFi channel override settings dict
- Raises:
EspargosUnexpectedResponseError – If the server at the given host is not an ESPARGOS controller or the request was invalid
- get_wificonf() dict[source]
Fetches the current WiFi configuration from the ESPARGOS controller.
The returned JSON/dict uses the same hyphenated keys as accepted by
set_wificonf(), for example “channel-primary”, “channel-secondary” and “country-code”.
- Returns:
WiFi configuration dict
- Raises:
EspargosUnexpectedResponseError – If the server at the given host is not an ESPARGOS controller or the request was invalid
- reboot()[source]
Trigger a controller reboot.
The controller responds with
"ok" and then reboots shortly after sending the reply.
- Raises:
EspargosUnexpectedResponseError – If the server at the given host is not an ESPARGOS controller or the request was invalid
- set_cfo_correction(auto: bool, value: int = 0)[source]
Configures receiver CFO correction on the ESPARGOS controller.
When auto is false, the receiver frequency-offset estimate is forced to value (signed 13-bit NRXFOE reg_foe_force field, -4096..4095). A value of zero can reduce packet-to-packet phase noise in radar mode when TX and RX share the same reference clock.
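The documented range -4096..4095 is what a signed 13-bit field can hold. The following sketch checks a value against that range and shows its two's-complement bit pattern. Both helper names are hypothetical, and whether the library or firmware encodes the field exactly this way is an assumption made for illustration.

```python
CFO_FORCE_MIN = -4096  # lowest value of a signed 13-bit field
CFO_FORCE_MAX = 4095   # highest value of a signed 13-bit field


def check_cfo_force_value(value):
    """Return value unchanged if it fits the documented range, else raise."""
    if isinstance(value, bool) or not isinstance(value, int):
        raise TypeError("value must be an int")
    if not CFO_FORCE_MIN <= value <= CFO_FORCE_MAX:
        raise ValueError(f"value {value} outside {CFO_FORCE_MIN}..{CFO_FORCE_MAX}")
    return value


def to_twos_complement_13bit(value):
    """Illustrative only: 13-bit two's-complement pattern of a checked value."""
    return check_cfo_force_value(value) & 0x1FFF
```

Validating before calling set_cfo_correction(auto=False, value=...) catches out-of-range values on the host side instead of relying on the controller to reject them.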
- set_csi_acquire_config(config: dict)[source]
Sets the CSI acquisition configuration on the ESPARGOS controller.
The controller expects a JSON object (provided here as a Python dict) with integer fields (use 0/1 for booleans). Field names are fixed.
- Boolean toggles:
enable: Enable to acquire CSI.
acquire_csi_legacy: Enable to acquire L-LTF when receiving a 11g PPDU.
acquire_csi_force_lltf: Force receiver to acquire L-LTF, regardless of PPDU type.
compress_csi: Transform CSI to a time-domain CIR before transport.
acquire_csi_ht20: Enable to acquire HT-LTF when receiving an HT20 PPDU.
acquire_csi_ht40: Enable to acquire HT-LTF when receiving an HT40 PPDU.
acquire_csi_vht: Present in the HTTP API; semantics depend on firmware build / PHY mode support.
acquire_csi_su: Enable to acquire HE-LTF when receiving an HE20 SU PPDU.
acquire_csi_mu: Enable to acquire HE-LTF when receiving an HE20 MU PPDU.
acquire_csi_dcm: Enable to acquire HE-LTF when receiving an HE20 DCM applied PPDU.
acquire_csi_beamformed: Enable to acquire HE-LTF when receiving an HE20 Beamformed applied PPDU.
dump_ack_en: Enable to dump 802.11 ACK frame, default disabled.
lltf_8bit_mode: Report L-LTF CSI as 8-bit values for every subcarrier instead of sparse 12-bit values.
- Integer / enum fields:
- acquire_csi_he_stbc_mode: When receiving an STBC applied HE PPDU:
0 = acquire the complete HE-LTF1
1 = acquire the complete HE-LTF2
2 = sample evenly among the HE-LTF1 and HE-LTF2
val_scale_cfg: Value 0-8.
Example payload:
{ "enable": true, "acquire_csi_legacy": true, "acquire_csi_force_lltf": false, "compress_csi": false, "acquire_csi_ht20": true, "acquire_csi_ht40": true, "acquire_csi_vht": true, "acquire_csi_su": true, "acquire_csi_mu": true, "acquire_csi_dcm": true, "acquire_csi_beamformed": true, "acquire_csi_he_stbc_mode": 2, "val_scale_cfg": 2, "dump_ack_en": true, "lltf_8bit_mode": false }
- Parameters:
config – CSI acquisition configuration dict (will be JSON-encoded and POSTed to /set_csi_acquire_config)
- Raises:
EspargosUnexpectedResponseError – If the server at the given host is not an ESPARGOS controller or the request was invalid
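One way to prepare a config dict for set_csi_acquire_config() is to start from the documented defaults, apply overrides, check the two non-boolean fields, and convert booleans to 0/1 as the description asks. This is a sketch: make_csi_acquire_config is a hypothetical helper, and the defaults are copied from DEFAULT_CSI_ACQUIRE_CONFIG above rather than imported.

```python
# Copied from Board.DEFAULT_CSI_ACQUIRE_CONFIG as listed in this reference.
DEFAULT_CSI_ACQUIRE_CONFIG = {
    "acquire_csi_beamformed": True, "acquire_csi_dcm": True,
    "acquire_csi_force_lltf": False, "acquire_csi_he_stbc_mode": 2,
    "acquire_csi_ht20": True, "acquire_csi_ht40": True,
    "acquire_csi_legacy": True, "acquire_csi_mu": True,
    "acquire_csi_su": True, "acquire_csi_vht": True,
    "compress_csi": False, "dump_ack_en": True, "enable": True,
    "lltf_8bit_mode": False, "val_scale_cfg": 2,
}


def make_csi_acquire_config(**overrides):
    """Hypothetical helper: defaults plus overrides, with all values as ints."""
    unknown = set(overrides) - set(DEFAULT_CSI_ACQUIRE_CONFIG)
    if unknown:
        raise KeyError(f"unknown field(s): {sorted(unknown)}")
    config = {**DEFAULT_CSI_ACQUIRE_CONFIG, **overrides}
    if config["acquire_csi_he_stbc_mode"] not in (0, 1, 2):
        raise ValueError("acquire_csi_he_stbc_mode must be 0, 1 or 2")
    if not 0 <= config["val_scale_cfg"] <= 8:
        raise ValueError("val_scale_cfg must be in 0..8")
    # Booleans become 0/1; integer fields pass through unchanged.
    return {key: int(value) for key, value in config.items()}


config = make_csi_acquire_config(compress_csi=True, acquire_csi_ht40=False)
```

The resulting dict could then be passed to set_csi_acquire_config(config).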
- set_gain_settings(settings: dict)[source]
Sets the gain settings on the ESPARGOS controller.
The gain settings are provided as a JSON object (here as a Python dict) with fixed field names. Values may be scalars, or arrays/lists with shape
(2, 4) to configure each sensor individually in board (row, column) order.
fft_scale_enable (bool): Enable manual FFT scaling (false = automatic/firmware default).
fft_scale_value (int): FFT scale value (meaning/range depends on firmware; commonly 0 when disabled).
rx_gain_enable (bool): Enable manual RX gain (false = automatic/firmware default).
rx_gain_value (int): RX gain table index, 0..76 (commonly 0 when disabled).
expert_mode_enable (bool): Enable raw expert gain-table entry override.
expert_mode_raw (str): Raw gain-table entry as hexadecimal string.
Example payload:
{ "fft_scale_enable": false, "fft_scale_value": 0, "rx_gain_enable": false, "rx_gain_value": 0 }
- Parameters:
settings – Gain settings dict (will be JSON-encoded and POSTed to /set_gain_settings)
- Raises:
EspargosUnexpectedResponseError – If the server at the given host is not an ESPARGOS controller or the request was invalid
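A per-sensor gain setting is a nested list of shape (2, 4) in (row, column) order. The sketch below builds such a list and checks the documented RX gain index range 0..76. per_sensor and check_rx_gain are hypothetical helpers, and the gain indices 30 and 40 are arbitrary example values.

```python
ROWS_PER_BOARD = 2     # from the documented (2, 4) shape
ANTENNAS_PER_ROW = 4
RX_GAIN_INDEX_RANGE = range(0, 77)  # documented table index range 0..76


def per_sensor(value_for):
    """Build a (2, 4) nested list by calling value_for(row, col) for each sensor."""
    return [
        [value_for(row, col) for col in range(ANTENNAS_PER_ROW)]
        for row in range(ROWS_PER_BOARD)
    ]


def check_rx_gain(value):
    """Accept a scalar or a nested list; raise if any index is out of range."""
    rows = value if isinstance(value, list) else [[value]]
    for row in rows:
        for index in row:
            if index not in RX_GAIN_INDEX_RANGE:
                raise ValueError(f"rx_gain_value {index} outside 0..76")
    return value


settings = {
    "fft_scale_enable": False,
    "fft_scale_value": 0,
    "rx_gain_enable": True,
    # Example: top row uses index 30, bottom row uses index 40.
    "rx_gain_value": check_rx_gain(per_sensor(lambda row, col: 30 if row == 0 else 40)),
}
```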
- set_mac_filter(mac_filter: dict)[source]
Tell ESPARGOS board to only receive packets from transmitters with this sender MAC.
mac_filter is a dict with the following format:
{ "enable": true|false, "mac": "00:11:22:33:44:55", "mac_mask": "ff:ff:ff:ff:ff:ff" }
The “enable” field toggles MAC filtering. When enabled, only packets from transmitters whose MAC address matches the given “mac” (applying the “mac_mask”) will be received. “mac_mask” is a bitmask applied to both the configured MAC and the sender MAC before comparison. Only provided fields will be changed; others will remain as previously configured.
- Parameters:
mac_filter – MAC filter configuration dict
- Raises:
EspargosUnexpectedResponseError – If the server at the given host is not an ESPARGOS controller or the request was invalid
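The masked comparison described above (mask applied to both the configured MAC and the sender MAC before comparing) can be reproduced on the host to predict which transmitters a filter would admit. This is a sketch of the described rule, not the controller's code; mac_to_bytes and mac_matches are hypothetical names.

```python
def mac_to_bytes(mac):
    """Parse 'aa:bb:cc:dd:ee:ff' into six bytes."""
    parts = mac.split(":")
    if len(parts) != 6:
        raise ValueError(f"not a MAC address: {mac!r}")
    return bytes(int(part, 16) for part in parts)


def mac_matches(sender, mac, mac_mask):
    """True if sender and mac agree on every bit that is set in mac_mask."""
    sender_b, mac_b, mask_b = (mac_to_bytes(m) for m in (sender, mac, mac_mask))
    return all((s & k) == (m & k) for s, m, k in zip(sender_b, mac_b, mask_b))
```

For example, a mask of "ff:ff:ff:00:00:00" compares only the first three bytes, so every sender sharing that prefix with "mac" passes the filter.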
- set_radar_config(config: dict)[source]
Sets the low-level radar TX configuration on the ESPARGOS controller.
The payload mirrors the controller’s
/set_tx_control API. Supported fields are:
rfswitch_state (int)
active_by_antid (list[bool], length 8)
start_by_antid (list[int], length 8)
period_by_antid (list[int], length 8)
mac_by_antid (list[str], length 8, MAC addresses like "72:61:64:61:72:00")
tx_power (int)
tx_phymode (int)
tx_rate (int)
Only provided fields are changed; others remain unchanged on the controller.
- Parameters:
config – Radar TX configuration dict
- Raises:
EspargosUnexpectedResponseError – If the server at the given host is not an ESPARGOS controller or the request was invalid
- set_rfswitch(state: rfswitch_state_t)[source]
Sets the RF switch state on the ESPARGOS controller for reception mode.
- Parameters:
state – The RF switch state to set, must be one of
csi.rfswitch_state_t
- Raises:
EspargosUnexpectedResponseError – If the server at the given host is not an ESPARGOS controller or the request was invalid
- set_wifi_channel_overrides(settings: dict)[source]
Sets per-sensor WiFi channel overrides on the ESPARGOS controller.
The payload mirrors the controller’s
/set_wifi_channel_overrides API:
override_active (bool): Enable per-sensor channel overrides.
channel-primary (list[int], length 8): Primary WiFi channel for each sensor.
channel-secondary (list[int], length 8): Secondary channel selector for each sensor (0 = none, 1 = above, 2 = below).
Passing
{"override_active": False} disables the overrides.
- Parameters:
settings – Per-sensor WiFi channel override settings dict
- Raises:
EspargosUnexpectedResponseError – If the server at the given host is not an ESPARGOS controller or the request was invalid
- set_wificonf(wificonf: dict)[source]
Sets WiFi configuration on the ESPARGOS controller.
The controller expects a Python dict with fixed field names using hyphenated keys. Expected format:
{ "calib-mode": 1, "calib-source": 0, "channel-primary": 13, "channel-secondary": 2, "country-code": "DE", "calib-txpower": 34, "calib-interval": 10 }
- Field meanings / types (as used by the controller firmware):
“calib-mode” (int): When to generate phase reference packets:
0: Never generate calibration packets
1: Generate calibration packets if receiver RF switch is in reference channel configuration
2: Always generate calibration packets
“calib-source” (int): Configures REFIN / REFOUT ports of controller:
0: Use internal clock and phase reference for antennas
1: Output clock and phase reference on REFOUT port, antennas expect clock and calibration from REFIN port (master mode)
2: Antennas expect clock and calibration from REFIN port, do not output anything on REFOUT (slave mode)
“channel-primary” (int): Primary WiFi channel.
“channel-secondary” (int): Secondary channel selector (e.g. 0 = None, 1 = Above, 2 = Below).
“country-code” (str): Two-letter country code (e.g. “DE”).
“calib-txpower” (int): TX power used for calibration packets (between 8 = 2 dBm and 80 = 20 dBm).
“calib-interval” (int): Calibration interval (milliseconds).
- Parameters:
wificonf – WiFi configuration dict
- Raises:
EspargosUnexpectedResponseError – If the server at the given host is not an ESPARGOS controller or the request was invalid
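The two reference points given for “calib-txpower” (8 = 2 dBm, 80 = 20 dBm) are consistent with a unit of 0.25 dBm. The conversion helpers below assume that the scale is linear between those points, which the text above does not state explicitly. Both function names are hypothetical.

```python
CALIB_TXPOWER_MIN = 8    # documented lower bound, 2 dBm
CALIB_TXPOWER_MAX = 80   # documented upper bound, 20 dBm


def dbm_to_calib_txpower(dbm):
    """Convert dBm to the raw field value, assuming 0.25 dBm per step."""
    value = round(dbm * 4)
    if not CALIB_TXPOWER_MIN <= value <= CALIB_TXPOWER_MAX:
        raise ValueError(f"{dbm} dBm is outside the documented 2..20 dBm range")
    return value


def calib_txpower_to_dbm(value):
    """Inverse conversion under the same linear-scale assumption."""
    return value / 4
```

Under this assumption, the value 34 in the example payload above corresponds to 8.5 dBm.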
- start(transports=None)[source]
Starts the CSI stream thread for the ESPARGOS controller. The thread will run indefinitely until the stop() method is called.
Supported transports:
“udp”: The controller will send CSI packets to a local UDP socket. This transport is lower-latency and more efficient (higher throughput), but may not work in all network environments.
“websocket”: The controller will send CSI packets over a WebSocket connection. This transport is more widely compatible but may have higher latency and overhead.
“uart”: The controller will stream CSI data over the local serial/UART link. This transport is only available for hosts specified as
uart:<port>.
- Parameters:
transports – Optional list of transports to try, in order of preference. Valid values are the transports listed above. If None (default), tries UDP first (if supported by API version) and then WebSocket.
- Raises:
EspargosCsiStreamConnectionError – If neither UDP nor WebSocket CSI stream could be established
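The "try in order of preference" behavior of start() follows a common fallback pattern, sketched below without any ESPARGOS code. The connector callables are hypothetical placeholders for whatever opens each stream, and ConnectionError stands in for EspargosCsiStreamConnectionError.

```python
def first_working_transport(transports, connectors):
    """Try each named transport in order; return (name, connection) for the first success."""
    errors = {}
    for name in transports:
        try:
            return name, connectors[name]()
        except OSError as exc:  # remember why this transport failed, then move on
            errors[name] = exc
    raise ConnectionError(f"no CSI transport could be established: {errors}")


def connect_udp():
    """Hypothetical connector that fails, e.g. because UDP is blocked."""
    raise OSError("UDP blocked by firewall")


def connect_websocket():
    """Hypothetical connector that succeeds and returns a connection object."""
    return object()


name, connection = first_working_transport(
    ["udp", "websocket"],
    {"udp": connect_udp, "websocket": connect_websocket},
)
```

Passing transports=["websocket"] to start() would correspond to skipping the UDP attempt entirely in this sketch.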
- exception espargos.board.EspargosAPIVersionError[source]
Bases:
Exception
Raised when the ESPARGOS controller runs an unsupported API version
- exception espargos.board.EspargosCsiStreamConnectionError[source]
Bases:
Exception
Raised when the CSI stream connection could not be established (e.g. magic packet not received)
- exception espargos.board.EspargosHTTPStatusError(status=None, path=None, body=None)[source]
Bases:
Exception
Raised when the ESPARGOS HTTP API returns an invalid status code
espargos.pool
- exception espargos.pool.CalibrationError[source]
Bases:
RuntimeError
Raised when ESPARGOS calibration cannot collect usable calibration CSI.
- class espargos.pool.Pool(boards: list[Board], ota_cache_timeout=5, refgen_boards=None)[source]
Bases:
object
A Pool is a collection of ESPARGOS boards. The pool manages the clustering of CSI data from multiple ESPARGOS sensors (antennas) that belong to the same WiFi packet and provides cluster.CSICluster objects to registered callbacks.
- add_csi_callback(cb: Callable[[CSICluster], None], cb_predicate: Callable[[CSICluster], bool] = None)[source]
Register callback function that is invoked whenever a new CSI cluster is completed.
- Parameters:
cb – The function to call, gets instance of class
cluster.CSICluster as parameter
cb_predicate – A function with signature
(csi_cluster) that defines the conditions under which clustered CSI is regarded as completed and thus provided to the callback. If cb_predicate returns true, clustered CSI is regarded as completed. If no predicate is provided, the default behavior is to trigger the callback when CSI has been received from all sensors on all boards. By default, callbacks receive over-the-air/radar CSI only. Enable set_emit_calibration_csi() to also emit calibration CSI (from internal reference generators) through this same callback path.
- Returns:
A callback handle that can be passed to
remove_csi_callback()
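A cb_predicate only needs to accept a cluster and return a truth value. The sketch below builds a predicate that accepts a cluster once it is complete or once it is older than a chosen age, using only the get_completion_all() and get_age() methods documented for CSICluster. FakeCluster is a hypothetical stub for testing the predicate offline, and when the pool re-evaluates predicates is not specified here.

```python
def complete_or_stale(max_age_s):
    """Return a predicate: True when the cluster is complete or older than max_age_s."""
    def predicate(csi_cluster):
        return bool(
            csi_cluster.get_completion_all() or csi_cluster.get_age() > max_age_s
        )
    return predicate


class FakeCluster:
    """Hypothetical stub exposing the two CSICluster methods the predicate uses."""

    def __init__(self, complete, age):
        self._complete = complete
        self._age = age

    def get_completion_all(self):
        return self._complete

    def get_age(self):
        return self._age


predicate = complete_or_stale(max_age_s=0.2)
```

Such a predicate would be passed as add_csi_callback(cb, cb_predicate=predicate), so that partially received clusters can still reach the callback.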
- calibrate(per_board=True, duration=2, exithandler=None, cable_lengths=None, cable_velocity_factors=None, run_in_thread=True)[source]
Run calibration for a specified duration.
- Parameters:
per_board – True to calibrate each board separately, False to calibrate all boards together. Set to False if the same phase reference signal is used for all boards, otherwise set to True.
duration – The duration in seconds for which calibration should be run
exithandler – An optional exit handler that can be used to stop calibration prematurely if
exithandler.running is set to False in a separate thread
cable_lengths – The lengths of the feeder cables that distribute the clock and phase calibration signal to the ESPARGOS boards, in meters. Only needed for phase-coherent multi-board setups, omit if all cables have the same length.
cable_velocity_factors – The velocity factors of the feeder cables that distribute the clock and phase calibration signal to the ESPARGOS boards. Must be the same length as
cable_lengths, and all entries should be in the range [0, 1].
run_in_thread – If True, the pool handling will be performed in the current thread. Set to False in case the pool is already running in a separate thread (e.g., backlog is already active).
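Cable length and velocity factor matter because together they determine how long the reference signal takes to reach each board: delay = length / (velocity factor × speed of light). The sketch below computes that delay per cable. It illustrates the physics only; how calibrate() uses these values internally is not described here, and cable_delays is a hypothetical helper.

```python
SPEED_OF_LIGHT = 299_792_458.0  # meters per second, in vacuum


def cable_delays(cable_lengths, cable_velocity_factors):
    """Propagation delay in seconds for each feeder cable."""
    if len(cable_lengths) != len(cable_velocity_factors):
        raise ValueError("cable_lengths and cable_velocity_factors differ in length")
    delays = []
    for length, vf in zip(cable_lengths, cable_velocity_factors):
        # A velocity factor of 0 would mean infinite delay, so it is rejected here.
        if not 0 < vf <= 1:
            raise ValueError(f"velocity factor {vf} outside (0, 1]")
        delays.append(length / (vf * SPEED_OF_LIGHT))
    return delays


delays = cable_delays([1.0, 2.0], [0.7, 0.7])
```

Two cables of different length therefore produce a fixed delay difference between boards, which is why equal-length cables need no correction.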
- get_calibration()[source]
Get the stored calibration values.
- Returns:
The stored calibration values as a
calibration.CSICalibration object
- get_cfo_correction() dict[source]
Return CFO correction config; sanity-check all boards report the same value.
- get_csi_acquire_config() dict[source]
Return CSI acquire config; sanity-check all boards report the same value.
- get_emit_calibration_csi() bool[source]
Return whether calibration CSI clusters are emitted through normal CSI callbacks.
- get_gain_settings() dict[source]
Return gain settings; sanity-check all boards report the same value.
- get_mac_filter() dict[source]
Return MAC filter configuration; sanity-check all boards report the same value.
This is forwarded to
pyespargos.board.Board.get_mac_filter() for each board.
- get_radar_config() dict[source]
Return radar TX configuration; sanity-check all boards report the same value.
- get_rfswitch() rfswitch_state_t[source]
Get RF switch state from the first board in the pool.
- Returns:
The RF switch state of the first board in the pool
- get_shape()[source]
Get the outer shape of the stored data, i.e., only the antenna dimensions and not subcarrier dimensions or similar.
- get_wifi_channel_overrides() dict[source]
Return per-sensor WiFi channel overrides; sanity-check all boards report the same value.
- get_wificonf() dict[source]
Return WiFi config; sanity-check boards report the same value.
Consistency check ignores “calib-source” and “calib-mode” (they may legitimately differ).
- remove_csi_callback(callback: _CSICallback) bool[source]
Remove a CSI callback previously returned by
add_csi_callback().
- Parameters:
callback – Callback handle returned by
add_csi_callback()
- Returns:
True if the callback was registered and removed, False otherwise
- run()[source]
Process incoming CSI data packets and call registered callbacks if CSI clusters are complete. Repeatedly call this function from your main loop or from a separate thread. May block for a short amount of time if no data is available.
- set_cfo_correction(auto: bool, value: int = 0)[source]
Configure CFO correction on all boards in this pool.
- set_csi_acquire_config(config: dict)[source]
Set CSI acquisition configuration on all boards in this pool and sanity-check that all boards end up with the same config.
This is forwarded to
pyespargos.board.Board.set_csi_acquire_config() for each board. For the expected JSON/dict format, refer to that method’s documentation.
- Parameters:
config – CSI acquisition configuration dict to apply to all boards.
- Raises:
ValueError – If boards in the pool disagree on the resulting config after applying.
EspargosUnexpectedResponseError – If any board returns an unexpected response.
- set_emit_calibration_csi(enabled: bool)[source]
Control whether calibration CSI clusters are emitted through normal CSI callbacks.
Calibration clusters remain marked as calibration packets via
espargos.cluster.CSICluster.is_calib().
- set_gain_settings(settings: dict)[source]
Set gain settings on all boards in this pool.
This is forwarded to
pyespargos.board.Board.set_gain_settings() for each board. Values may be scalars, board-local (row, column) arrays applied to every board, or pool-wide (board, row, column) arrays.
- Parameters:
settings – Gain settings dict to apply to all boards.
- Raises:
EspargosUnexpectedResponseError – If any board returns an unexpected response.
- set_mac_filter(mac_filter: dict)[source]
Set the MAC address filter for all boards in the pool. Will only accept packets from the specified MAC address.
This is forwarded to
pyespargos.board.Board.set_mac_filter() for each board.
- set_radar_config(config: dict | RadarPoolConfig)[source]
Set radar TX configuration on the boards in this pool.
config may either be a single controller config dict applied to every board, or a pyespargos.espargos.radar.RadarPoolConfig containing one config per board.
- set_rfswitch(state: rfswitch_state_t)[source]
Set RF switch state for all boards in the pool.
- Parameters:
state – The RF switch state to set, must be one of
csi.rfswitch_state_t
- set_wifi_channel_overrides(settings: dict)[source]
Set per-sensor WiFi channel overrides on all boards in this pool and sanity-check that all boards end up with the same settings.
This is forwarded to
pyespargos.board.Board.set_wifi_channel_overrides() for each board. For the expected JSON/dict format, refer to that method’s documentation.
- Parameters:
settings – Per-sensor WiFi channel override settings dict to apply to all boards.
- Raises:
ValueError – If boards in the pool disagree on the resulting settings after applying.
EspargosUnexpectedResponseError – If any board returns an unexpected response.
- set_wificonf(wificonf: dict)[source]
Set WiFi config on all boards and sanity-check resulting configs match across boards.
This is forwarded to
pyespargos.board.Board.set_wificonf() for each board. For the expected JSON/dict format, refer to that method’s documentation.
The values of “calib-source” and “calib-mode” are ignored and not propagated to the pool. If you need to set those, call
pyespargos.board.Board.set_wificonf() on each board individually. Consistency check also ignores “calib-source” and “calib-mode” (they may legitimately differ).
- Parameters:
wificonf – WiFi configuration dict to apply to all boards.
- Raises:
ValueError – If boards in the pool disagree on the resulting config after applying (excluding ignored keys).
EspargosUnexpectedResponseError – If any board returns an unexpected response.
espargos.backlog
- class espargos.backlog.BacklogFilter[source]
Bases:
object
Base class for CSI backlog filters.
Subclasses implement
matches() to decide whether a clustered CSI frame should be admitted to the backlog.
- class espargos.backlog.CSIBacklog(pool, fields=None, calibrate=True, cb_predicate=None, size=100)[source]
Bases:
object
CSI backlog class. Stores CSI data in a ringbuffer for processing when needed.
- Parameters:
pool – CSI pool object to collect CSI data from
fields – List of fields to store (default: all), e.g., [“lltf”, “ht40”, “rssi”, “rx_gain”, “fft_gain”, “cfo”, “lltf_8bit_mode”, “timestamp”, “host_timestamp”, “mac”, “radar_tx_timestamp”, “radar_tx_index”, “radar_tx_power”, “radar_tx_rfswitch_state”]
calibrate – Apply calibration to CSI data (default: True)
cb_predicate – A function that defines the conditions under which clustered CSI is regarded as completed and thus added to the backlog. See
espargos.pool.Pool.add_csi_callback() for more details.
size – Size of the ringbuffer (default: 100)
- DATA_FORMATS = {'cfo': {'dtype': <class 'numpy.float32'>, 'per_antenna': True, 'shape': ()}, 'fft_gain': {'dtype': <class 'numpy.float32'>, 'per_antenna': True, 'shape': ()}, 'he20': {'dtype': <class 'numpy.complex64'>, 'per_antenna': True, 'shape': (245,)}, 'host_timestamp': {'dtype': <class 'numpy.float64'>, 'per_antenna': False, 'shape': ()}, 'ht20': {'dtype': <class 'numpy.complex64'>, 'per_antenna': True, 'shape': (57,)}, 'ht40': {'dtype': <class 'numpy.complex64'>, 'per_antenna': True, 'shape': (117,)}, 'lltf': {'dtype': <class 'numpy.complex64'>, 'per_antenna': True, 'shape': (53,)}, 'lltf_8bit_mode': {'dtype': <class 'numpy.bool'>, 'per_antenna': True, 'shape': ()}, 'mac': {'dtype': <class 'numpy.uint8'>, 'per_antenna': False, 'shape': (6,)}, 'radar_tx_index': {'dtype': <class 'numpy.int16'>, 'per_antenna': False, 'shape': ()}, 'radar_tx_power': {'dtype': <class 'numpy.int16'>, 'per_antenna': False, 'shape': ()}, 'radar_tx_rfswitch_state': {'dtype': <class 'numpy.uint8'>, 'per_antenna': False, 'shape': ()}, 'radar_tx_timestamp': {'dtype': <class 'numpy.float64'>, 'per_antenna': False, 'shape': ()}, 'rfswitch_state': {'dtype': <class 'numpy.uint8'>, 'per_antenna': True, 'shape': ()}, 'rssi': {'dtype': <class 'numpy.float32'>, 'per_antenna': True, 'shape': ()}, 'rx_gain': {'dtype': <class 'numpy.float32'>, 'per_antenna': True, 'shape': ()}, 'timestamp': {'dtype': <class 'numpy.float64'>, 'per_antenna': True, 'shape': ()}}
- add_filter(backlog_filter)[source]
Add a filter to the backlog.
- Parameters:
backlog_filter – Instance of
BacklogFilter
- add_update_callback(cb)[source]
Add a callback that is called when new CSI data is added to the backlog
- clear()[source]
Clear all stored CSI datapoints from the ringbuffer.
This is useful after changing calibration or other receiver settings: entries already in the backlog were stored with the old interpretation and must not be mixed with freshly received CSI.
- count_valid_datapoints(key: str, allow_incomplete: bool = False) int[source]
Count datapoints for which a particular stored field (key) is valid (finite and nonzero).
The first axis of every backlog field is the datapoint axis. If
allow_incomplete is false, only datapoints with all finite values are counted. If true, datapoints with at least one finite value are counted. Non-floating fields are always finite, so each stored entry counts.
- Parameters:
key – Key of the data to inspect.
allow_incomplete – Whether partially valid datapoints count.
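The difference between the two allow_incomplete modes comes down to "all values finite" versus "at least one value finite" per datapoint. The sketch below reproduces only that finiteness rule on plain nested lists. count_valid is a hypothetical function, the input layout (one flat list of values per datapoint) is a simplification, and any additional nonzero check is left out.

```python
import cmath


def count_valid(datapoints, allow_incomplete=False):
    """Count datapoints whose values are all finite (or any finite, if allow_incomplete)."""
    combine = any if allow_incomplete else all
    return sum(
        1
        for values in datapoints
        # cmath.isfinite accepts both real and complex numbers.
        if combine(cmath.isfinite(value) for value in values)
    )


nan = float("nan")
datapoints = [
    [1.0 + 0j, 2.0 + 0j],  # complete datapoint
    [nan, 3.0],            # partially valid datapoint
    [nan, nan],            # no valid values
]
strict_count = count_valid(datapoints)
lenient_count = count_valid(datapoints, allow_incomplete=True)
```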
- get(key)[source]
Retrieve data from the ringbuffer
- Parameters:
key – Key of the data to retrieve (e.g., “lltf”, “ht40”, “rssi”, etc.)
- Returns:
Data corresponding to the key, oldest first
- get_fields()[source]
Get the list of fields currently stored in the backlog.
- Returns:
List of fields currently stored in the backlog
- get_filters()[source]
Get the list of currently active backlog filters.
- Returns:
List of
BacklogFilter instances
- get_latest(key)[source]
Retrieve the latest value for a key in the ringbuffer.
- Parameters:
key – Key of the data to retrieve
- Returns:
Latest value, or None if no data is available
- get_multiple(keys)[source]
Retrieve multiple data fields from the ringbuffer. You must use get_multiple to ensure consistency of data across multiple keys.
- Parameters:
keys – List of keys of the data to retrieve (e.g., [“lltf”, “ht40”, “rssi”], etc.)
- Returns:
Tuple of data arrays corresponding to the keys (in same order), contents are oldest first
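Reading several fields in one call matters because the ringbuffer may be written between two separate reads, which would leave the returned arrays misaligned. The toy ringbuffer below illustrates both ideas: oldest-first ordering and a single locked read across keys. TinyBacklog is a hypothetical teaching model, not the CSIBacklog implementation.

```python
import threading


class TinyBacklog:
    """Toy fixed-size ringbuffer holding several named fields per datapoint."""

    def __init__(self, fields, size):
        self._size = size
        self._data = {field: [None] * size for field in fields}
        self._head = 0    # index of the next slot to overwrite
        self._filled = 0  # number of slots that hold data
        self._lock = threading.Lock()

    def append(self, **values):
        with self._lock:
            for field, store in self._data.items():
                store[self._head] = values[field]
            self._head = (self._head + 1) % self._size
            self._filled = min(self._filled + 1, self._size)

    def get_multiple(self, keys):
        # One lock acquisition covers all keys, so every returned list
        # describes the same set of datapoints.
        with self._lock:
            start = (self._head - self._filled) % self._size
            order = [(start + i) % self._size for i in range(self._filled)]
            return tuple([self._data[key][i] for i in order] for key in keys)


backlog = TinyBacklog(["rssi", "timestamp"], size=3)
for i in range(5):
    backlog.append(rssi=-40 - i, timestamp=float(i))
rssi, timestamp = backlog.get_multiple(["rssi", "timestamp"])
```

After five appends into three slots, only the three most recent datapoints remain, returned oldest first.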
- remove_filter(backlog_filter)[source]
Remove a previously added filter from the backlog.
- Parameters:
backlog_filter – Instance of
BacklogFilter
- set_fields(new_fields)[source]
Set the fields to be stored in the backlog. Existing data will be preserved for fields that are still present.
- Parameters:
new_fields – New list of fields to store
- class espargos.backlog.Exclude11bFilter[source]
Bases:
BacklogFilter
Backlog filter that drops 802.11b packets, which do not carry CSI.
- class espargos.backlog.MacFilter(filter_regex)[source]
Bases:
BacklogFilter
Backlog filter that matches source MAC addresses against a regular expression.
- Parameters:
filter_regex – Regular expression applied to the source MAC string
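A regular expression for MAC filtering can be tried out on plain strings before it is handed to MacFilter. The sketch below uses re.match, which anchors at the start of the string. Whether MacFilter uses match, search or fullmatch, and how the MAC string is cased, is not stated above, so anchoring the pattern explicitly is the safer choice. RegexMacMatcher is a hypothetical stand-in.

```python
import re


class RegexMacMatcher:
    """Hypothetical stand-in that tests MAC strings against a compiled pattern."""

    def __init__(self, filter_regex):
        self._pattern = re.compile(filter_regex)

    def matches_mac(self, mac):
        return self._pattern.match(mac) is not None


# Admit any sender whose first five bytes are 72:61:64:61:72.
matcher = RegexMacMatcher(r"^72:61:64:61:72:[0-9a-f]{2}$")
```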
espargos.calibration
- class espargos.calibration.CSICalibration(boards: list[Board], channel_primary: int, channel_secondary: int, calibration_values_lltf: ndarray, calibration_values_ht20: ndarray, calibration_values_ht40: ndarray, calibration_values_he20: ndarray | None, sensor_clock_offsets: ndarray, board_cable_lengths=None, board_cable_vfs=None)[source]
Bases:
object
- apply_he20(values: ndarray) ndarray[source]
Apply phase calibration to the provided HE20 CSI data by oversampling the stored HT20 calibration.
- Parameters:
values – The CSI data to which the phase calibration should be applied, as a complex-valued numpy array of shape
(boardcount, constants.ROWS_PER_BOARD, constants.ANTENNAS_PER_ROW, csi.HE20_COEFFICIENTS_PER_CHANNEL)
- Returns:
The phase-calibrated CSI data
- apply_ht20(values: ndarray) ndarray[source]
Apply phase calibration to the provided HT20 CSI data. Also accounts for subcarrier-specific phase offsets, e.g., due to low-pass filter characteristic of baseband signal path inside the ESP32.
- Parameters:
values – The CSI data to which the phase calibration should be applied, as a complex-valued numpy array of shape
(boardcount, constants.ROWS_PER_BOARD, constants.ANTENNAS_PER_ROW, csi.HT_COEFFICIENTS_PER_CHANNEL)
- Returns:
The phase-calibrated CSI data
- apply_ht40(values: ndarray) ndarray[source]
Apply phase calibration to the provided HT40 CSI data. Also accounts for subcarrier-specific phase offsets, e.g., due to low-pass filter characteristic of baseband signal path inside the ESP32.
- Parameters:
values – The CSI data to which the phase calibration should be applied, as a complex-valued numpy array of shape
(boardcount, constants.ROWS_PER_BOARD, constants.ANTENNAS_PER_ROW, csi.HT_COEFFICIENTS_PER_CHANNEL + csi.HT40_GAP_SUBCARRIERS + csi.HT_COEFFICIENTS_PER_CHANNEL)
- Returns:
The phase-calibrated CSI data
- apply_lltf(values: ndarray) ndarray[source]
Apply phase calibration to the provided L-LTF CSI data. Also accounts for subcarrier-specific phase offsets, e.g., due to low-pass filter characteristic of baseband signal path inside the ESP32.
- Parameters:
values – The CSI data to which the phase calibration should be applied, as a complex-valued numpy array of shape
(boardcount, constants.ROWS_PER_BOARD, constants.ANTENNAS_PER_ROW, csi.LEGACY_COEFFICIENTS_PER_CHANNEL)
- Returns:
The phase-calibrated CSI data
- time_to_sensor_time(time)[source]
Convert a reference time into the corresponding local time for each sensor.
The provided reference time is interpreted relative to sensor 0. The return value therefore contains one time per sensor, offset by
sensor_clock_offsets.
- Parameters:
time – Reference time in seconds, relative to sensor 0. May be a scalar or an array broadcastable to the sensor layout.
- Returns:
Per-sensor time values as a numpy array with shape
(boardcount, constants.ROWS_PER_BOARD, constants.ANTENNAS_PER_ROW)
espargos.cluster
- class espargos.cluster.CSICluster(source_mac: str, dest_mac: str, seq_ctrl: seq_ctrl_t, board_revisions: list[BoardRevision])[source]
Bases:
objectA CSICluster object represents a collection of CSI data estimated for the same WiFi packet.
The class clusters the CSI data from multiple ESPARGOS sensors (antennas), which may belong to the same or different ESPARGOS boards. It is used to store CSI data until it is complete and can be provided to a callback. CSI data may be from calibration packets or over-the-air packets.
- add_csi(board_num: int, esp_num: int, serialized_csi: serialized_csi_tlv_t)[source]
Add CSI data to the cluster.
- Parameters:
board_num – The number of the ESPARGOS board that received the CSI data
esp_num – The number of the ESPARGOS sensor within that board that received the CSI data
serialized_csi – The serialized CSI data
csi_cplx – The complex-valued CSI data
- deserialize_csi_he20ltf()[source]
Deserialize the HE20 HE-LTF part of the CSI data.
The internal HE20 ordering is ascending subcarrier index
-122..122. The invalid / null tones -1, 0, 1 are explicitly zeroed because the raw PHY payload may contain meaningless values there.
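With subcarriers stored in ascending order from -122 to 122, a subcarrier index k maps to array position k + 122, giving 245 entries in total (matching the 'he20' shape in DATA_FORMATS). The sketch below shows that mapping and the zeroing of the null tones on a plain Python list. The function names are hypothetical and the library's own implementation may differ.

```python
HE20_MIN_SUBCARRIER = -122
HE20_MAX_SUBCARRIER = 122
HE20_NULL_TONES = (-1, 0, 1)
HE20_LENGTH = HE20_MAX_SUBCARRIER - HE20_MIN_SUBCARRIER + 1  # 245 entries


def subcarrier_to_index(k):
    """Array position of subcarrier k in ascending -122..122 ordering."""
    if not HE20_MIN_SUBCARRIER <= k <= HE20_MAX_SUBCARRIER:
        raise ValueError(f"subcarrier {k} outside -122..122")
    return k - HE20_MIN_SUBCARRIER


def zero_null_tones(csi):
    """Return a copy of a 245-entry CSI vector with tones -1, 0, 1 set to zero."""
    if len(csi) != HE20_LENGTH:
        raise ValueError(f"expected {HE20_LENGTH} entries, got {len(csi)}")
    cleaned = list(csi)
    for k in HE20_NULL_TONES:
        cleaned[subcarrier_to_index(k)] = 0j
    return cleaned


cleaned = zero_null_tones([1 + 1j] * HE20_LENGTH)
```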
- deserialize_csi_ht20ltf()[source]
Deserialize the HT20 (HT-LTF without channel bonding) part of the CSI data.
- Returns:
The HT-LTF part of the CSI data as a complex-valued numpy array of shape
(boardcount, constants.ROWS_PER_BOARD, constants.ANTENNAS_PER_ROW, csi.HT_COEFFICIENTS_PER_CHANNEL)
- deserialize_csi_ht40ltf()[source]
Deserialize the HT40 (HT-LTF with channel bonding) part of the CSI data.
- Returns:
The HT-LTF part of the CSI data as a complex-valued numpy array of shape
(boardcount, constants.ROWS_PER_BOARD, constants.ANTENNAS_PER_ROW, csi.HT_COEFFICIENTS_PER_CHANNEL + csi.HT40_GAP_SUBCARRIERS + csi.HT_COEFFICIENTS_PER_CHANNEL)
- deserialize_csi_lltf()[source]
Deserialize the L-LTF part of the CSI data.
- Returns:
The L-LTF part of the CSI data as a complex-valued numpy array of shape
(boardcount, constants.ROWS_PER_BOARD, constants.ANTENNAS_PER_ROW, csi.LEGACY_COEFFICIENTS_PER_CHANNEL)
- get_age()[source]
Get the age of the CSI data, in seconds.
The age is only approximate: it is based on the time at which the CSICluster object was created, not on the sensor timestamps.
- Returns:
The age of the CSI data, in seconds
- get_completion()[source]
Get the completion state of the CSI data.
- Returns:
A boolean numpy array of shape
(boardcount, constants.ROWS_PER_BOARD, constants.ANTENNAS_PER_ROW) that indicates which sensors have provided CSI data
- get_completion_all()[source]
Get the global completion state of the CSI data, i.e., whether all sensors have provided CSI data.
- Returns:
True if all sensors have provided CSI data, False otherwise
- get_gain_table_entry_raw()[source]
Get the raw 12-byte ESP32-C61 PHY gain-table entry used for each received packet.
- get_gain_table_entry_valid()[source]
Return a mask indicating whether a raw gain-table entry was reported for each sensor.
- get_host_timestamp()[source]
Get the timestamp at which the CSICluster object was created, which is approximately when the first sensor received the CSI data.
- Returns:
The timestamp at which the first sensor received the CSI data, in seconds since the epoch
- get_lltf_8bit_mode()[source]
Get whether each sensor reported LLTF CSI in 8-bit mode for this packet.
- get_noise_floor()[source]
Get the noise floor of the WiFi packet.
- Returns:
The noise floor of the WiFi packet
- get_primary_channel() int[source]
Get the primary channel number.
- Returns:
The primary channel number
- get_radar_tx_index() int[source]
Return the flattened TX sensor index derived from the CSI stream UID, or -1 if unknown.
- get_radar_tx_info()[source]
Return the transmit-side radar report for this packet, or None if not available.
- get_rfswitch_state()[source]
Get the RF switch state of all sensors when the WiFi packet was received.
- get_secondary_channel() int[source]
Get the secondary channel number.
- Returns:
The secondary channel number
- get_secondary_channel_relative()[source]
Get the relative position of the secondary channel with respect to the primary channel.
- Returns:
0 if no secondary channel is used, 1 if the secondary channel is above the primary channel, -1 if the secondary channel is below the primary channel
- get_sensor_timestamps()[source]
Get the (nanosecond-precision) timestamps at which the WiFi packet was sampled by the sensors. These timestamps do not include the offset that the chip derived from the CSI; they are only the sampling start times.
- Returns:
A numpy array of shape
(boardcount, constants.ROWS_PER_BOARD, constants.ANTENNAS_PER_ROW) that contains the sensor timestamps in seconds
- get_seq_ctrl()[source]
Get the sequence control field of the WiFi packet.
- Returns:
The sequence control field of the WiFi packet
- get_source_mac()[source]
Get the source MAC address of the WiFi packet.
- Returns:
The source MAC address of the WiFi packet
- has_he20ltf() bool[source]
Check if HE20 HE-LTF channel estimates are available for all complete sensors.
- has_ht20ltf() bool[source]
Check if HT20 (HT-LTF without channel bonding) channel estimates are available for all complete sensors.
- Returns:
True if there is HT20 CSI data for all complete sensors, False otherwise
- has_ht40ltf() bool[source]
Check if HT40 (HT-LTF with 40 MHz channel bonding) channel estimates are available for all complete sensors.
- Returns:
True if there is HT40 CSI data for all complete sensors, False otherwise
- has_lltf() bool[source]
Check if L-LTF channel estimates are available for all complete sensors.
- Returns:
True if there is L-LTF CSI data for all complete sensors, False otherwise
- has_radar_tx_report() bool[source]
Check whether this cluster has transmit-side radar metadata attached.
- is_11b() bool[source]
Check whether this packet uses the 802.11b baseband format.
- Returns:
True if the packet is 802.11b, False otherwise
- set_radar_tx_report(radar_tx_report: radar_tx_report_tlv_t, board_num: int | None = None, esp_num: int | None = None)[source]
Attach radar transmit metadata for the Wi-Fi packet represented by this cluster.
espargos.constants
- espargos.constants.ANTENNAS_PER_BOARD = 8
Number of antennas on one board
- espargos.constants.ANTENNAS_PER_ROW = 4
Number of antennas per row / per SPI controller on the board
- espargos.constants.ANTENNA_JONES_CROSSPOL_MATRIX = array([[0.69141227+0.j , 0.10475943+0.10475943j], [0.10475943+0.10475943j, 0.69141227+0.j ]])
Empirically determined cross-polarization component (due to intentional elliptical antenna polarization) of the Jones matrix
- espargos.constants.ANTENNA_JONES_MATRIX = array([[-0.4148262 +0.07407611j, 0.4148262 -0.07407611j], [ 0.56297841+0.07407611j, 0.56297841+0.07407611j]])
Jones matrix to convert from linear (H/V) to feed (R/L) polarization basis
- espargos.constants.ANTENNA_JONES_MATRIX_SIMPLE = array([[-0.70710678, 0.70710678], [ 0.70710678, 0.70710678]])
Simple Jones matrix to convert from linear (H/V) to feed (R/L) polarization basis
- espargos.constants.ANTENNA_SEPARATION = 0.06
Distance between the centers of two antennas [m]
- espargos.constants.FFT_GAIN_DB_PER_UNIT = 0.25
Reported FFT gain step size in dB
- espargos.constants.ROWS_PER_BOARD = 2
Number of rows / SPI controllers on the board
- espargos.constants.RX_GAIN_DB_PER_UNIT = 1.0
Reported RX gain step size in dB
- espargos.constants.SPEED_OF_LIGHT = 299792458
Speed of light in a vacuum
- espargos.constants.WIFI_CHANNEL1_FREQUENCY = 2412000000.0
Frequency of channel 1 in 2.4 GHz WiFi
- espargos.constants.WIFI_CHANNEL_SPACING = 5000000.0
Frequency spacing of WiFi channels
- espargos.constants.WIFI_SUBCARRIER_SPACING = 312500.0
Subcarrier spacing of WiFi (in Hz)
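As a quick plausibility check of how these constants relate, the sketch below copies the documented values into local variables (it does not import the package) and expresses the antenna spacing in wavelengths at channel 1. The derived variable names are illustrative only.

```python
# Values copied from the constants listed above.
ANTENNAS_PER_ROW = 4
ROWS_PER_BOARD = 2
ANTENNA_SEPARATION = 0.06            # m
SPEED_OF_LIGHT = 299792458           # m/s
WIFI_CHANNEL1_FREQUENCY = 2.412e9    # Hz

# Two rows of four antennas give the eight antennas of one board.
antennas_per_board = ROWS_PER_BOARD * ANTENNAS_PER_ROW

# Free-space wavelength at channel 1 and the element spacing relative to it.
wavelength = SPEED_OF_LIGHT / WIFI_CHANNEL1_FREQUENCY
spacing_in_wavelengths = ANTENNA_SEPARATION / wavelength
```

The spacing comes out slightly below half a wavelength at channel 1.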
espargos.csi
- espargos.csi.HE20_COEFFICIENTS_PER_CHANNEL = 245
Number of channel coefficients (active plus invalid subcarriers) for a 20 MHz HE-LTF
- espargos.csi.HT40_GAP_SUBCARRIERS = 3
Gap between primary and secondary channel in HT40 mode, in subcarriers
- espargos.csi.HT_COEFFICIENTS_PER_CHANNEL = 57
Number of channel coefficients (active subcarriers) per Wi-Fi channel in HT mode (HT-LTF)
- espargos.csi.LEGACY_COEFFICIENTS_PER_CHANNEL = 53
Number of channel coefficients (active subcarriers) per Wi-Fi channel in legacy mode (L-LTF)
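The coefficient counts above match the subcarrier index ranges quoted in the interpolation helpers of this module. A small consistency sketch, with the constants copied locally rather than imported:

```python
import numpy as np

# Values copied from the constants listed above.
HE20_COEFFICIENTS_PER_CHANNEL = 245
HT40_GAP_SUBCARRIERS = 3
HT_COEFFICIENTS_PER_CHANNEL = 57
LEGACY_COEFFICIENTS_PER_CHANNEL = 53

# Ascending subcarrier index ranges as documented for each format.
lltf_index = np.arange(-26, 27)     # L-LTF
ht20_index = np.arange(-28, 29)     # HT20
ht40_index = np.arange(-58, 59)     # HT40, including the 3 gap subcarriers
he20_index = np.arange(-122, 123)   # HE20, including the 3 invalid tones

ht40_length = 2 * HT_COEFFICIENTS_PER_CHANNEL + HT40_GAP_SUBCARRIERS
```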
- class espargos.csi.compressed_rx_ctrl_t(buf=None)[source]
Bases:
Structure
- cfo_high_rate
Structure/Union member
- cfo_low_rate
Structure/Union member
- channel
Structure/Union member
- cur_bb_format
Structure/Union member
- fft_gain
Structure/Union member
- flags
Structure/Union member
- he_sig1_mcs
Structure/Union member
- noise_floor
Structure/Union member
- rate
Structure/Union member
- reserved
Structure/Union member
- rssi
Structure/Union member
- rx_channel_estimate_len
Structure/Union member
- rx_gain
Structure/Union member
- rxstart_time_cyc
Structure/Union member
- secondary_channel
Structure/Union member
- sig_mode
Structure/Union member
- timestamp
Structure/Union member
- class espargos.csi.csistream_fragment_header_t(buf=None)[source]
Bases:
Structure
- fragment_index
Structure/Union member
- size
Structure/Union member
- total_fragments
Structure/Union member
- uid
Structure/Union member
- espargos.csi.decode_compressed_lltf(buf, acquire_force_lltf: bool = False, lltf_8bit_mode: bool = False, **kwargs) ndarray[source]
- espargos.csi.deserialize_packet_buffer(revision, pktbuf)[source]
Deserialize a raw stream payload into the appropriate packet structure based on the type header.
- espargos.csi.gain_byte_to_signed(value: int) int[source]
Interpret an 8-bit gain value reported in rx_ctrl as signed.
The hardware stores gain fields as bytes. RX gain normally lives in the positive gain-table range, while FFT gain can be negative and is therefore reported in two’s-complement form.
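Two's-complement reinterpretation of a byte can be sketched as follows. The function name is hypothetical and this is not necessarily how the library implements it.

```python
def byte_to_signed(value: int) -> int:
    """Reinterpret the low 8 bits of `value` as a two's-complement integer."""
    value &= 0xFF
    return value - 256 if value >= 128 else value
```

Bytes 0..127 map to themselves, while 128..255 map to -128..-1.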
- espargos.csi.get_cfo_from_rx_ctrl(rx_ctrl) int[source]
Compute the CFO value (in Hz) from a wifi_pkt_rx_ctrl_v3_t byte buffer.
This was reverse engineered from librftest.a (bb_common.o) and libphy.a (phy_feature.o).
Espressif’s RX metadata exposes two CFO fields. The low-rate field is used when the derived rate_index is below 8 (802.11b); otherwise the high-rate field is used (802.11g/n/ax).
- espargos.csi.interpolate_he20ltf_gaps(csi_he20: ndarray) None[source]
Fill the HE20 invalid subcarriers -1, 0, 1 by linear interpolation in place.
- Parameters:
csi_he20 – Complex HE20-LTF CSI array. The last dimension must contain HE20_COEFFICIENTS_PER_CHANNEL subcarriers in ascending order -122..122. Any leading dimensions are preserved.
- espargos.csi.interpolate_ht20ltf_gap(csi_ht20: ndarray) None[source]
Fill the HT20-LTF DC subcarrier by linear interpolation in place.
- Parameters:
csi_ht20 – Complex HT20-LTF CSI array. The last dimension must contain HT_COEFFICIENTS_PER_CHANNEL subcarriers in ascending order -28..28. Any leading dimensions are preserved.
- espargos.csi.interpolate_ht40ltf_gap(csi_ht40: ndarray) None[source]
Fill the three HT40 gap subcarriers between primary and secondary channel in place.
- Parameters:
csi_ht40 – Complex HT40-LTF CSI array. The last dimension must contain 2 * HT_COEFFICIENTS_PER_CHANNEL + HT40_GAP_SUBCARRIERS subcarriers in ascending order -58..58. Any leading dimensions are preserved.
- espargos.csi.interpolate_lltf_gap(csi_lltf: ndarray) None[source]
Fill the L-LTF DC subcarrier by linear interpolation in place.
- Parameters:
csi_lltf – Complex L-LTF CSI array. The last dimension must contain LEGACY_COEFFICIENTS_PER_CHANNEL subcarriers in ascending order -26..26. Any leading dimensions are preserved.
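All four gap-filling helpers above operate on tones centered in the last axis: one DC tone for L-LTF and HT20, three tones for HT40 and HE20. The sketch below shows one way such an in-place fill could work. The function name is hypothetical, and interpolating the complex values directly (rather than magnitude and phase separately) is an assumption.

```python
import numpy as np


def fill_center_gap(csi: np.ndarray, gap: int) -> None:
    """Linearly interpolate the `gap` centered tones of the last axis, in place."""
    n = csi.shape[-1]
    first = (n - gap) // 2                 # index of the first gap tone
    left = csi[..., first - 1].copy()      # last valid tone below the gap
    right = csi[..., first + gap].copy()   # first valid tone above the gap
    for i in range(gap):
        w = (i + 1) / (gap + 1)            # fractional position inside the gap
        csi[..., first + i] = (1 - w) * left + w * right


# HE20-sized demo: a linear ramp with the tones -1, 0, 1 zeroed out.
csi_he20 = np.tile(np.arange(245, dtype=complex), (2, 4, 1))
csi_he20[..., 121:124] = 0
fill_center_gap(csi_he20, gap=3)

# L-LTF-sized demo: only the DC tone is missing.
csi_lltf = np.tile(np.arange(53, dtype=complex), (2, 4, 1))
csi_lltf[..., 26] = 0
fill_center_gap(csi_lltf, gap=1)
```

Because the demo data is a linear ramp, the interpolation restores the zeroed tones exactly; on real CSI the result is only an approximation.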
- class espargos.csi.radar_tx_report_tlv_t(buf=None)[source]
Bases:
object
- get_hardware_tx_phase_raw() int[source]
Extract the apparent signed 11-bit phase-ish field from timestamp register 2.
- get_hardware_tx_timestamp_ns() float[source]
Decode the raw ESP32-C61 TX timestamp registers into sensor-local nanoseconds.
The wire format intentionally keeps the raw register values. This helper mirrors the low-level recovery formula from the firmware so analysis code can choose whether and how to use the decoded timestamp.
- property has_hardware_tx_timestamp
- property tx_succeeded
- class espargos.csi.rfswitch_state_t(*values)[source]
Bases:
IntEnum
- SENSOR_RFSWITCH_ANTENNA_L = 3
- SENSOR_RFSWITCH_ANTENNA_R = 2
- SENSOR_RFSWITCH_ANTENNA_RANDOM = 4
- SENSOR_RFSWITCH_ISOLATION = 0
- SENSOR_RFSWITCH_REFERENCE = 1
- SENSOR_RFSWITCH_UNKNOWN = 255
- class espargos.csi.seq_ctrl_t(buf=None)[source]
Bases:
Structure
A ctypes structure representing the sequence control field of a Wi-Fi packet.
This structure stores the sequence control field of a Wi-Fi packet, which contains the fragment number (frag) and the sequence number (seg).
- frag
Structure/Union member
- seg
Structure/Union member
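In IEEE 802.11 the Sequence Control field is 16 bits wide, with a 4-bit fragment number in the low bits and a 12-bit sequence number above it. A ctypes bit-field structure with that layout might look like the sketch below. The class name is hypothetical, and the assumption that seq_ctrl_t uses exactly these bit widths is not confirmed by this reference.

```python
import ctypes


class SeqCtrlSketch(ctypes.LittleEndianStructure):
    # ASSUMED layout, following the 802.11 Sequence Control field:
    # bits 0..3 fragment number, bits 4..15 sequence number.
    _fields_ = [
        ("frag", ctypes.c_uint16, 4),
        ("seg", ctypes.c_uint16, 12),
    ]


raw = (0x1234).to_bytes(2, "little")
seq_ctrl = SeqCtrlSketch.from_buffer_copy(raw)
```

With the value 0x1234, the low nibble 0x4 lands in frag and the remaining 0x123 in seg.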
- class espargos.csi.serialized_csi_tlv_t(buf=None)[source]
Bases:
object
- property acquire_force_lltf
- property acquire_lltf_8bit_mode
- property acquire_lltf_bit_mode
- property first_word_invalid
- property is_calib
- property is_compressed
- property is_radar
- class espargos.csi.wifi_phy_mode_t(*values)[source]
Bases:
IntEnum
- WIFI_PHY_MODE_11A = 3
- WIFI_PHY_MODE_11B = 1
- WIFI_PHY_MODE_11G = 2
- WIFI_PHY_MODE_HE20 = 6
- WIFI_PHY_MODE_HT20 = 4
- WIFI_PHY_MODE_HT40 = 5
- WIFI_PHY_MODE_LR = 0
- WIFI_PHY_MODE_VHT20 = 7
- class espargos.csi.wifi_phy_rate_t(*values)[source]
Bases:
IntEnum
- WIFI_PHY_RATE_11M_L = 3
- WIFI_PHY_RATE_11M_S = 7
- WIFI_PHY_RATE_12M = 10
- WIFI_PHY_RATE_18M = 14
- WIFI_PHY_RATE_1M_L = 0
- WIFI_PHY_RATE_24M = 9
- WIFI_PHY_RATE_2M_L = 1
- WIFI_PHY_RATE_2M_S = 5
- WIFI_PHY_RATE_36M = 13
- WIFI_PHY_RATE_48M = 8
- WIFI_PHY_RATE_54M = 12
- WIFI_PHY_RATE_5M_L = 2
- WIFI_PHY_RATE_5M_S = 6
- WIFI_PHY_RATE_6M = 11
- WIFI_PHY_RATE_9M = 15
- WIFI_PHY_RATE_LORA_250K = 41
- WIFI_PHY_RATE_LORA_500K = 42
- WIFI_PHY_RATE_MAX = 43
- WIFI_PHY_RATE_MCS0_LGI = 16
- WIFI_PHY_RATE_MCS0_SGI = 26
- WIFI_PHY_RATE_MCS1_LGI = 17
- WIFI_PHY_RATE_MCS1_SGI = 27
- WIFI_PHY_RATE_MCS2_LGI = 18
- WIFI_PHY_RATE_MCS2_SGI = 28
- WIFI_PHY_RATE_MCS3_LGI = 19
- WIFI_PHY_RATE_MCS3_SGI = 29
- WIFI_PHY_RATE_MCS4_LGI = 20
- WIFI_PHY_RATE_MCS4_SGI = 30
- WIFI_PHY_RATE_MCS5_LGI = 21
- WIFI_PHY_RATE_MCS5_SGI = 31
- WIFI_PHY_RATE_MCS6_LGI = 22
- WIFI_PHY_RATE_MCS6_SGI = 32
- WIFI_PHY_RATE_MCS7_LGI = 23
- WIFI_PHY_RATE_MCS7_SGI = 33
- WIFI_PHY_RATE_MCS8_LGI = 24
- WIFI_PHY_RATE_MCS8_SGI = 34
- WIFI_PHY_RATE_MCS9_LGI = 25
- WIFI_PHY_RATE_MCS9_SGI = 35
- class espargos.csi.wifi_pkt_rx_ctrl_v3_t(buf=None)[source]
Bases:
Structure
A ctypes structure representing the wifi_pkt_rx_ctrl_t as provided by the ESP32. See the related esp-idf code for details.
Variant for Espressif PHY version 3.
- cfo_high_rate
Structure/Union member
- cfo_low_rate
Structure/Union member
- channel
Structure/Union member
- cur_bb_format
Structure/Union member
- data_rssi
Structure/Union member
- dump_len
Structure/Union member
- fft_gain
Structure/Union member
- he_siga1
Structure/Union member
- he_siga2
Structure/Union member
- is_group
Structure/Union member
- lsig_len
Structure/Union member
- lsig_reserved
Structure/Union member
- noise_floor
Structure/Union member
- rate
Structure/Union member
- rssi
Structure/Union member
- rx_channel_estimate_info_vld
Structure/Union member
- rx_channel_estimate_len
Structure/Union member
- rx_gain
Structure/Union member
- rx_state
Structure/Union member
- rxend_state
Structure/Union member
- rxmatch0
Structure/Union member
- rxmatch1
Structure/Union member
- rxmatch2
Structure/Union member
- rxmatch3
Structure/Union member
- rxstart_time_cyc
Structure/Union member
- rxstart_time_cyc_dec
Structure/Union member
- second
Structure/Union member
- sig_len
Structure/Union member
- sig_mode
Structure/Union member
- sigb_len
Structure/Union member
- timestamp
Structure/Union member
- class espargos.csi.wifi_rx_bb_format_t(*values)[source]
Bases:
IntEnum
- RX_BB_FORMAT_11A = 1
- RX_BB_FORMAT_11B = 0
- RX_BB_FORMAT_11G = 1
- RX_BB_FORMAT_HE_ERSU = 6
- RX_BB_FORMAT_HE_MU = 5
- RX_BB_FORMAT_HE_SU = 4
- RX_BB_FORMAT_HE_TB = 7
- RX_BB_FORMAT_HT = 2
- RX_BB_FORMAT_VHT = 3
- class espargos.csi.wifi_sig_mode_t(*values)[source]
Bases:
IntEnum
- SIG_MODE_HE = 2
- SIG_MODE_HT = 1
- SIG_MODE_LEGACY = 0
- SIG_MODE_VHT = 3
- class espargos.csi.wifi_tx_power_t(*values)[source]
Bases:
IntEnum
- WIFI_TX_POWER_11_DBM = 44
- WIFI_TX_POWER_13_DBM = 52
- WIFI_TX_POWER_14_DBM = 56
- WIFI_TX_POWER_15_DBM = 60
- WIFI_TX_POWER_16_5_DBM = 66
- WIFI_TX_POWER_18_DBM = 72
- WIFI_TX_POWER_20_DBM = 80
- WIFI_TX_POWER_2_DBM = 8
- WIFI_TX_POWER_5_DBM = 20
- WIFI_TX_POWER_7_DBM = 28
- WIFI_TX_POWER_8_5_DBM = 34
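Every value listed above equals four times the dBm figure in its name, which is consistent with a 0.25 dBm step size. The sketch below copies a few members into a local enum for illustration. The class and helper names are hypothetical, and the step size is inferred from the listed values rather than stated in this reference.

```python
from enum import IntEnum


class TxPowerSketch(IntEnum):
    # Subset of the members listed above, copied for illustration.
    WIFI_TX_POWER_20_DBM = 80
    WIFI_TX_POWER_16_5_DBM = 66
    WIFI_TX_POWER_8_5_DBM = 34
    WIFI_TX_POWER_2_DBM = 8


def tx_power_dbm(power: int) -> float:
    """Convert a raw power value to dBm, ASSUMING 0.25 dBm per unit."""
    return int(power) / 4.0
```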
espargos.exithandler
espargos.radar
- class espargos.radar.RadarPoolConfig(board_configs: list[dict])[source]
Bases:
object
Low-level radar configuration for an entire pool, one controller config per board.
- board_configs: list[dict]
- espargos.radar.build_pool_config(calibration: CSICalibration, active_by_sensor, t0_by_sensor, period_by_sensor, tx_power: wifi_tx_power_t, tx_phymode: wifi_phy_mode_t, tx_rate: wifi_phy_rate_t, rfswitch_state: rfswitch_state_t, mac_by_sensor=None) RadarPoolConfig[source]
Build low-level per-board radar controller configuration from a reference-time schedule.
active_by_sensor, t0_by_sensor and period_by_sensor may be scalars, board-local (row, column) arrays, or pool-wide (board, row, column) arrays. t0_by_sensor is interpreted as a reference time in seconds relative to sensor 0. The returned board configs contain start_by_antid values in controller units (currently integer microseconds), converted to each sensor’s local clock using the stored sensor_clock_offsets from the provided calibration.
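The three accepted input forms all broadcast to the pool-wide (board, row, column) shape. The sketch below illustrates that broadcasting and the quantization to integer microseconds. The helper names and offset values are hypothetical, the sign convention (local = reference + offset) is an assumption, and the reordering into firmware antenna-id order is deliberately left out.

```python
import numpy as np

ROWS_PER_BOARD, ANTENNAS_PER_ROW = 2, 4


def to_pool_shape(value, boardcount):
    """Broadcast a scalar, (row, column) or (board, row, column) input."""
    shape = (boardcount, ROWS_PER_BOARD, ANTENNAS_PER_ROW)
    return np.broadcast_to(np.asarray(value), shape)


def start_times_us(t0_by_sensor, sensor_clock_offsets):
    """Reference start times -> per-sensor local start times in integer microseconds."""
    t0 = to_pool_shape(t0_by_sensor, sensor_clock_offsets.shape[0])
    # ASSUMED convention: local time = reference time + clock offset
    return np.rint((t0 + sensor_clock_offsets) * 1e6).astype(np.int64)


# Made-up clock offsets for a pool of two boards, in seconds.
offsets = np.arange(16, dtype=float).reshape(2, 2, 4) * 1e-6
start_us = start_times_us(0.5, offsets)
board_local = to_pool_shape(np.zeros((2, 4)), boardcount=3)
```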
- espargos.radar.correct_radar_csi_tx_timestamps(csi_data: ndarray, tx_timestamps_s, tx_sensor_indices, subcarrier_frequencies_hz, sensor_clock_offsets, *, tx_timestamp_offset_s: float = 0.0) ndarray[source]
Apply radar TX timestamp phase correction to frequency-domain CSI.
The CSI deserialization path already applies the receive-side timestamp/STO correction. This helper applies the complementary transmit-side correction using the radar TX report timestamp.
tx_timestamps_s are sensor-local TX timestamps; they are converted into the calibration reference clock by subtracting the transmitting sensor’s clock offset. subcarrier_frequencies_hz may be absolute RF frequencies or baseband subcarrier offsets. For consistency with the existing CSI deserialization timestamp correction, callers usually want baseband offsets.
The subcarrier axis is expected to be the last axis of csi_data.
- Parameters:
csi_data – Complex CSI array with subcarriers on the last axis.
tx_timestamps_s – TX timestamp(s), in seconds. Scalar or leading-shape array.
tx_sensor_indices – Flattened TX sensor index/indices matching tx_timestamps_s.
subcarrier_frequencies_hz – Frequency for each subcarrier, in Hz.
sensor_clock_offsets – Sensor clock offsets, in seconds, as an array that flattens in the same order as tx_sensor_indices.
tx_timestamp_offset_s – Constant offset added to each TX timestamp before correction. This accounts for packet-boundary conventions in the hardware TX timestamp source.
- Returns:
Corrected CSI array with the same shape as
csi_data.
- espargos.radar.ftm_get_phy_comp(*, responder: bool, primary_channel: int, secondary_channel: int = 0, mode: int = 0, sta_connected: bool = False) int[source]
Reimplementation of ESP32-C61 ftm_get_phy_comp from the Wi-Fi blob.
This raw-units version is kept as a reference to the recovered original control flow. New code should prefer get_ftm_tx_timestamp_reciprocity_delay_s(), which expresses the same logic in a table-driven form and returns seconds directly.
The returned value is in raw FTM timestamp units. Multiply by FTM_TIMESTAMP_UNIT_S to convert to seconds.
secondary_channel follows ESP-IDF’s wifi_second_chan_t convention: 0 none, 1 above, 2 below.
sta_connected only affects the initiator/nonzero-mode branch in the recovered implementation.
- espargos.radar.get_ftm_tx_timestamp_reciprocity_delay_s(*, responder: bool, primary_channel: int, secondary_channel: int = 0, home_channel_ht40: bool = False, mode: int | None = None, sta_connected: bool = False) float[source]
Return the PHY reciprocity delay used to align TX/RX FTM timestamps.
This is the same compensation modeled by ftm_get_phy_comp(), but expressed in terms of role, HT40 home-channel state, channel layout, and channel group instead of reproducing the recovered branch tree. The returned value is in seconds.
home_channel_ht40 is intentionally separate from secondary_channel. The blob does not use the nonzero mode branch as a generic “secondary channel present” test; it switches into a distinct PHY compensation regime used when the current/home channel context is HT40. That is why this flag remains separate even though primary_channel and secondary_channel are also provided.
mode is kept as a backward-compatible alias for the recovered blob parameter. When provided, only its zero/nonzero state matters.
secondary_channel follows ESP-IDF’s wifi_second_chan_t convention: 0 none, 1 above, 2 below.
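Two secondary-channel conventions appear in this reference: the wifi_second_chan_t values used here (0 none, 1 above, 2 below) and the relative form returned by CSICluster.get_secondary_channel_relative() (0, +1, -1). A hypothetical mapping between them, plus the resulting secondary channel number, is sketched below. The 4-channel distance follows from the 20 MHz offset of an HT40 secondary channel at 5 MHz channel spacing; returning None for "no secondary channel" is an illustrative choice, not the library's behavior.

```python
# wifi_second_chan_t value -> relative position of the secondary channel
SECOND_CHAN_TO_RELATIVE = {0: 0, 1: +1, 2: -1}


def secondary_channel_number(primary_channel: int, second_chan: int):
    """Channel number of the HT40 secondary channel, or None if there is none."""
    relative = SECOND_CHAN_TO_RELATIVE[second_chan]
    if relative == 0:
        return None
    # 20 MHz away from the primary channel = 4 channel numbers at 5 MHz spacing
    return primary_channel + 4 * relative
```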
espargos.revisions
- class espargos.revisions.BoardRevision[source]
Bases:
object
- antid_to_row_col(antid: int) tuple[source]
Convert firmware antenna id to (row, column) on this board revision.
- property calib_trace_delays: list
Calibration trace signal delays on ESPARGOS PCB [in s]
- property identification: tuple
- sensor_values_to_antid_list(values, name: str = 'values') list[source]
Convert a board-local (row, column) array to firmware antenna-id order.
- property serialized_csi_t: type
- property type_header: int
espargos.uart
- class espargos.uart.UARTClient(host: str, *, timeout: float = 5.0)[source]
Bases:
object
- request(method: str, path: str, body: bytes | str | None = None, timeout: float | None = None) UARTControlResponse[source]
- class espargos.uart.UARTControlResponse(status: int, content_type: str, body: bytes)[source]
Bases:
object
- exception espargos.uart.UARTProtocolError[source]
Bases:
Exception
Raised when the ESPARGOS UART protocol encounters malformed frames or unexpected responses.
espargos.util
- class espargos.util.AntennaOrientation(*values)[source]
Bases:
Enum
Orientation of a sub-array antenna in the combined array. Describes how the board-local coordinate system maps to the combined array coordinate system. The naming convention uses compass directions: the orientation indicates which direction the board’s row axis (short axis of the 2x4 sub-array) points in the combined array. The N orientation corresponds to the “default” orientation of the board (4 antennas for azimuth, 2 for elevation), where the row axis points downwards and the column axis points to the right (typically used when mounted on tripod).
- E = ((0, 1), (-1, 0))
- N = ((1, 0), (0, 1))
- S = ((-1, 0), (0, -1))
- W = ((0, -1), (1, 0))
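The four member values are 2x2 integer matrices, and each one is a rotation by a multiple of 90 degrees. The sketch below copies them into a local enum (a hypothetical mirror, not the library class) and checks how they compose. How the library applies these matrices to board coordinates is not shown here.

```python
from enum import Enum

import numpy as np


class OrientationSketch(Enum):
    # Values copied from the members listed above.
    E = ((0, 1), (-1, 0))
    N = ((1, 0), (0, 1))
    S = ((-1, 0), (0, -1))
    W = ((0, -1), (1, 0))

    @property
    def matrix(self) -> np.ndarray:
        return np.array(self.value)


# Applying E twice gives S; E followed by W gives the default orientation N.
e_twice = OrientationSketch.E.matrix @ OrientationSketch.E.matrix
e_then_w = OrientationSketch.W.matrix @ OrientationSketch.E.matrix
determinants = [round(np.linalg.det(o.matrix)) for o in OrientationSketch]
```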
- espargos.util.build_combined_array_data(indexing_matrix, input_data)[source]
Helper for combined array setups. Re-structures data from multiple subarrays into a single large array, using the provided indexing matrix. Typically, the input data is the CSI data of the subarrays, but it can also be anything else with shape (datapoints, boards, rows, columns, …).
- Parameters:
indexing_matrix – The indexing matrix to map the CSI data of the subarrays to the CSI data of the large array.
input_data – The data of the subarrays. Complex-valued NumPy array with shape (datapoints, boards, rows, columns, …).
- Returns:
The combined array data. Complex-valued NumPy array with shape (datapoints, rows, columns, subcarriers).
- espargos.util.build_jones_matrices(antenna_orientations: ndarray, base_jones_matrix: ndarray = None)[source]
Build per-antenna effective Jones matrices for a combined array, accounting for the physical rotation of each sub-array.
The effective Jones matrix for each antenna maps from the R/L feed basis to the global H/V linear polarization basis, taking into account the antenna’s physical orientation.
The physical model: the R/L feed probes are mounted on the antenna and rotate with it, while the incoming field has fixed global H/V polarization. Rotating the antenna by angle \(\theta\) therefore acts in the feed (output) space of the Jones matrix:
\[J_{\text{eff}} = R^T(\theta) \cdot J_{\text{base}}\]
Inverting to obtain the R/L → H/V mapping:
\[J_{\text{eff}}^{-1} = J_{\text{base}}^{-1} \cdot R(\theta)\]
where \(R(\theta)\) is the 2D rotation matrix for the antenna’s orientation and \(J_{\text{base}}\) is the base Jones matrix (H/V to R/L conversion for the default orientation).
- Parameters:
antenna_orientations – Array of AntennaOrientation values with shape (rows, cols).
base_jones_matrix – The base Jones matrix mapping H/V to R/L for the default (N) orientation. If None, uses constants.ANTENNA_JONES_MATRIX.
- Returns:
Array of effective inverse Jones matrices with shape (rows, cols, 2, 2). Multiply with R/L feed vector to obtain global H/V:
H_V = jones[r, c] @ R_L.
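The inversion step relies on the rotation matrix being orthogonal, so that the inverse of \(R^T(\theta) J_{\text{base}}\) is \(J_{\text{base}}^{-1} R(\theta)\). The sketch below checks this numerically for a single antenna, using the values of ANTENNA_JONES_MATRIX_SIMPLE as the base matrix. The function names and the choice of a 90 degree angle are illustrative; how each AntennaOrientation maps to an angle is not specified here.

```python
import numpy as np

# Same values as constants.ANTENNA_JONES_MATRIX_SIMPLE (H/V -> R/L).
J_BASE = np.array([[-1.0, 1.0], [1.0, 1.0]]) / np.sqrt(2)


def rotation(theta: float) -> np.ndarray:
    """2D rotation matrix R(theta)."""
    c, s = np.cos(theta), np.sin(theta)
    return np.array([[c, -s], [s, c]])


def effective_inverse_jones(theta: float, j_base: np.ndarray = J_BASE) -> np.ndarray:
    """J_eff^{-1} = J_base^{-1} . R(theta), mapping R/L feeds to global H/V."""
    return np.linalg.inv(j_base) @ rotation(theta)


theta = np.pi / 2                       # illustrative angle
j_eff = rotation(theta).T @ J_BASE      # forward model: H/V -> rotated feeds
j_eff_inv = effective_inverse_jones(theta)

# Round trip: a global H/V field passed through the feeds and back.
field_hv = np.array([1.0, 0.5j])
recovered = j_eff_inv @ (j_eff @ field_hv)
```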
- espargos.util.csi_interp_eigenvec(csi: ndarray, weights: ndarray = None)[source]
Interpolates CSI data (frequency-domain or time-domain) by finding the principal eigenvector of the covariance matrix.
- Parameters:
csi – The CSI data to interpolate. Complex-valued NumPy array. Can be an array with arbitrary dimensions, but the first dimension must be the number of CSI datapoints.
weights – The weights to use for each CSI datapoint. If None, all datapoints are weighted equally.
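The idea can be sketched as follows: flatten each CSI datapoint into a vector, build the (weighted) sample covariance matrix, and take its principal eigenvector. This is a minimal illustration with a hypothetical function name. Scaling by the square root of the eigenvalue is an assumption borrowed from the per-subcarrier variant, and the global phase of the result is arbitrary.

```python
import numpy as np


def principal_eigvec_interp(csi: np.ndarray, weights=None) -> np.ndarray:
    n = csi.shape[0]
    flat = csi.reshape(n, -1)
    w = np.ones(n) if weights is None else np.asarray(weights, dtype=float)
    # Weighted sample covariance R = sum_n w_n x_n x_n^H / sum_n w_n
    cov = np.einsum("n,ni,nj->ij", w, flat, np.conj(flat)) / np.sum(w)
    eigval, eigvec = np.linalg.eigh(cov)      # eigenvalues in ascending order
    principal = eigvec[:, -1] * np.sqrt(eigval[-1])
    return principal.reshape(csi.shape[1:])


# Demo: 20 noise-free snapshots of one channel, each with a random global phase.
rng = np.random.default_rng(0)
h = rng.normal(size=(2, 4, 13)) + 1j * rng.normal(size=(2, 4, 13))
phases = rng.uniform(-np.pi, np.pi, size=(20, 1, 1, 1))
snapshots = h[np.newaxis] * np.exp(1j * phases)
estimate = principal_eigvec_interp(snapshots)
```

In this noise-free demo the estimate equals the true channel up to one global phase factor, even though a plain average of the snapshots would largely cancel out.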
- espargos.util.csi_interp_eigenvec_per_subcarrier(csi: ndarray) ndarray[source]
Interpolates CSI data by finding the principal eigenvector of the per-subcarrier covariance matrix. Unlike csi_interp_eigenvec(), this function computes a separate covariance matrix for each subcarrier (last dimension), which preserves the frequency-domain structure of the CSI data.
The result is scaled by the square root of the principal eigenvalue and phase-referenced to the first antenna element (index 0).
- Parameters:
csi – Complex-valued CSI data with shape (n_samples, *antenna_shape, n_subcarriers). The first dimension is the number of CSI datapoints (e.g., calibration clusters), the last dimension is the number of subcarriers, and any intermediate dimensions describe the antenna array geometry.
- Returns:
Interpolated CSI data with shape (*antenna_shape, n_subcarriers).
- espargos.util.csi_interp_iterative(csi: ndarray, weights: ndarray = None, iterations=10)[source]
Coherently combines repeated CSI observations by iteratively phase-aligning them.
Each CSI snapshot is assumed to differ from the others mainly by a single global phase rotation. The algorithm alternates between two steps: estimating a combined CSI from the current phase offsets, and updating the phase offset of each snapshot to best match that combined CSI.
- Parameters:
csi – The CSI data to interpolate. Complex-valued NumPy array. Can be an array with arbitrary dimensions, but the first dimension must be the number of CSI datapoints.
weights – The weights to use for each CSI datapoint. If None, all datapoints are weighted equally.
iterations – The number of iterations to perform. Default is 10.
- Returns:
A phase-aligned weighted average of the input CSI data, with the same shape as one CSI datapoint.
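The two alternating steps can be sketched as below. The function name is hypothetical and the details (initialization with zero phase offsets, normalization of the weights) are assumptions, not the library's implementation.

```python
import numpy as np


def iterative_phase_align(csi: np.ndarray, weights=None, iterations: int = 10) -> np.ndarray:
    n = csi.shape[0]
    flat = csi.reshape(n, -1)
    w = np.ones(n) if weights is None else np.asarray(weights, dtype=float)
    w = w / np.sum(w)
    phi = np.zeros(n)                   # per-snapshot phase offsets
    for _ in range(iterations):
        # Step 1: combine the snapshots after removing the current offsets.
        combined = np.einsum("n,ni->i", w * np.exp(-1j * phi), flat)
        # Step 2: re-estimate each snapshot's phase relative to the combination.
        phi = np.angle(np.einsum("ni,i->n", flat, np.conj(combined)))
    combined = np.einsum("n,ni->i", w * np.exp(-1j * phi), flat)
    return combined.reshape(csi.shape[1:])


# Demo: noise-free snapshots that differ only by a global phase rotation.
rng = np.random.default_rng(1)
h = rng.normal(size=(2, 4, 13)) + 1j * rng.normal(size=(2, 4, 13))
phases = rng.uniform(-1.0, 1.0, size=(20, 1, 1, 1))
snapshots = h[np.newaxis] * np.exp(1j * phases)
aligned = iterative_phase_align(snapshots)
```

The result matches the true channel up to a single global phase, and it keeps the full magnitude that an unaligned average would partly lose.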
- espargos.util.csi_interp_iterative_by_array(csi: ndarray, weights: ndarray = None, iterations=10)[source]
Interpolates CSI data (frequency-domain or time-domain) using an iterative algorithm. Same as csi_interp_iterative(), but assumes that the second dimension of csi is the antenna array dimension and performs the interpolation for each antenna array separately.
- espargos.util.derive_he20_calibration_from_lltf(complete_clusters_lltf: ndarray, complete_cluster_timestamps: ndarray, secondary_channel_relative: int) ndarray[source]
Derive a phase calibration for HE20 CSI from calibration packets that only provide LLTF.
HE20 uses four times finer subcarrier spacing than LLTF / HT20. This means that a delay which is only observed on the coarse 312.5 kHz LLTF / HT20 grid is ambiguous when projected onto the denser 78.125 kHz HE20 grid: multiple HE20 phase slopes can agree on every fourth subcarrier while disagreeing on the intermediate HE20 tones. We therefore cannot obtain a reliable HE20 calibration by simply fitting a slope on the coarse grid and reusing it unchanged.
This helper resolves the problem by going back to “first principles” of calibration and estimating time and phase offset separately:
Estimate constant per-antenna phase offsets from the already STO-corrected LLTF calibration clusters using a principal-eigenvector estimate.
Undo the LLTF timestamp-based STO correction, recover the underlying per-antenna baseband timing offsets from the raw LLTF slope together with the calibration timestamps, and synthesize the corresponding HE20 phase slope on the denser HE20 subcarrier grid.
The final HE20 calibration is the combination of those per-antenna constant phase offsets and the timestamp-derived HE20 phase slope.
- Parameters:
complete_clusters_lltf – Complete LLTF calibration CSI clusters as a complex-valued NumPy array with shape (clusters, boards, rows, columns, subcarriers). These values are expected to come from CSICluster.deserialize_csi_lltf() and are therefore already STO-corrected using the forwarded hardware timestamps.
complete_cluster_timestamps – Per-sensor timestamps corresponding to complete_clusters_lltf, in seconds, as a NumPy array with shape (clusters, boards, rows, columns).
secondary_channel_relative – Relative position of the secondary channel used for the calibration packets. Use -1 for HT40 below, +1 for HT40 above, and 0 for a plain 20 MHz channel.
- Returns:
Complex-valued HE20 calibration array with shape
(boards, rows, columns, csi.HE20_COEFFICIENTS_PER_CHANNEL).
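The ambiguity described above can be reproduced numerically. Two delays that differ by one period of the coarse grid (1 / 312.5 kHz = 3.2 µs) produce identical phases on the 312.5 kHz grid but different phases on the 78.125 kHz grid. The delay values and the sign of the phase ramp are illustrative choices.

```python
import numpy as np

COARSE_SPACING_HZ = 312.5e3               # L-LTF / HT20 subcarrier spacing
FINE_SPACING_HZ = COARSE_SPACING_HZ / 4   # HE20 subcarrier spacing

k_coarse = np.arange(-26, 27)             # L-LTF subcarrier indices
k_fine = np.arange(-122, 123)             # HE20 subcarrier indices


def phase_ramp(delay_s, spacing_hz, k):
    """Frequency-domain phase ramp caused by a pure delay."""
    return np.exp(-2j * np.pi * delay_s * spacing_hz * k)


tau_a = 50e-9
tau_b = tau_a + 1.0 / COARSE_SPACING_HZ   # shifted by one coarse-grid period

same_on_coarse = np.allclose(
    phase_ramp(tau_a, COARSE_SPACING_HZ, k_coarse),
    phase_ramp(tau_b, COARSE_SPACING_HZ, k_coarse),
)
same_on_fine = np.allclose(
    phase_ramp(tau_a, FINE_SPACING_HZ, k_fine),
    phase_ramp(tau_b, FINE_SPACING_HZ, k_fine),
)
```

The coarse grid cannot tell the two delays apart, while on the fine grid they agree only on every fourth tone. This is why the timing offset has to be recovered from the timestamps rather than from the coarse slope alone.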
- espargos.util.estimate_toas_rootmusic(csi_fdomain: ndarray, max_source_count=2, chunksize=36, per_board_average=False)[source]
Estimate the time of arrivals (ToAs) of the LoS paths using the root-MUSIC algorithm.
- Parameters:
csi_fdomain – The frequency-domain CSI data. Complex-valued NumPy array with shape (datapoints, arrays, rows, columns, subcarriers).
max_source_count – The maximum number of sources to estimate. The number of sources is determined using the Rissanen MDL criterion, but this parameter can be used to limit the number of sources.
chunksize – The size of the chunks to use for the covariance matrix computation.
per_board_average – If True, compute the average ToA over all antennas per board. If False, return the ToAs for each antenna separately.
- Returns:
The estimated ToAs of the LoS paths, in seconds, NumPy array of shape
(boardcount, constants.ROWS_PER_BOARD, constants.ANTENNAS_PER_ROW).
- espargos.util.extract_ht20_subcarriers_from_ht40(csi_ht40: ndarray, secondary_channel_relative: int)[source]
Extract the HT20 subcarriers from HT40 CSI data.
- Parameters:
csi_ht40 – The HT40 CSI data. Complex-valued NumPy array with shape (…, subcarriers).
secondary_channel_relative – The relative position of the secondary channel to the primary channel. -1 for below, +1 for above.
- Returns:
The extracted HT20 CSI data. Complex-valued NumPy array with shape (datapoints, arrays, rows, columns, subcarriers).
- espargos.util.extract_lltf_subcarriers_from_ht20(csi_ht20: ndarray)[source]
Extract the LLTF subcarriers from HT20 CSI data.
- Parameters:
csi_ht20 – The HT20 CSI data. Complex-valued NumPy array with shape (…, subcarriers).
- Returns:
The extracted LLTF CSI data. Complex-valued NumPy array with shape (datapoints, arrays, rows, columns, subcarriers).
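Since HT20 covers subcarriers -28..28 and L-LTF covers -26..26 on the same 312.5 kHz grid, the extraction can be pictured as trimming two subcarriers from each edge. The sketch below is an assumption about the simplest possible behavior, with a hypothetical function name; the library may apply additional handling.

```python
import numpy as np

HT_COEFFICIENTS_PER_CHANNEL = 57
LEGACY_COEFFICIENTS_PER_CHANNEL = 53


def lltf_from_ht20_sketch(csi_ht20: np.ndarray) -> np.ndarray:
    """Keep the centered L-LTF subcarriers along the last axis."""
    n = csi_ht20.shape[-1]
    trim = (n - LEGACY_COEFFICIENTS_PER_CHANNEL) // 2
    return csi_ht20[..., trim:n - trim]


ht20_index = np.arange(-28, 29)
lltf_index = lltf_from_ht20_sketch(ht20_index)
batch = lltf_from_ht20_sketch(np.zeros((5, 1, 2, 4, 57), dtype=complex))
```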
- espargos.util.extract_lltf_subcarriers_from_ht40(csi_ht40: ndarray, secondary_channel_relative: int)[source]
Extract the LLTF subcarriers from HT40 CSI data.
- Parameters:
csi_ht40 – The HT40 CSI data. Complex-valued NumPy array with shape (…, subcarriers).
secondary_channel_relative – The relative position of the secondary channel to the primary channel. -1 for below, +1 for above.
- Returns:
The extracted LLTF CSI data. Complex-valued NumPy array with shape (datapoints, arrays, rows, columns, subcarriers).
- espargos.util.fdomain_to_tdomain_pdp_music(csi_fdomain: ndarray, source_count: int = None, chunksize=36, tap_min=-7, tap_max=7, resolution=200)[source]
Convert frequency-domain CSI data to a time-domain power delay profile (PDP) using MUSIC super-resolution.
- Parameters:
csi_fdomain – The frequency-domain CSI data. Complex-valued NumPy array with shape (datapoints, arrays, rows, columns, subcarriers).
- Returns:
The delays (in taps) and the PDPs of shape (datapoints, arrays, rows, columns, delays), as NumPy arrays.
- espargos.util.fdomain_to_tdomain_pdp_mvdr(csi_fdomain: ndarray, chunksize=36, tap_min=-7, tap_max=7, resolution=200)[source]
Convert frequency-domain CSI data to a time-domain power delay profile (PDP) using the MVDR beamformer.
- Parameters:
csi_fdomain – The frequency-domain CSI data. Complex-valued NumPy array with shape (datapoints, arrays, rows, columns, subcarriers).
- Returns:
The delays (in taps) and the PDPs of shape (datapoints, arrays, rows, columns, delays), as NumPy arrays.
- espargos.util.fit_complex_sinusoid(csi_data: ndarray) ndarray[source]
Fit a complex sinusoid (amplitude, phase offset, and linear phase slope) to CSI data along the subcarrier axis (last dimension).
Each antenna’s frequency response over a reference channel is modeled as:
\[H[k] = A \cdot \exp\!\bigl(j\,(\varphi_0 + \omega \, k)\bigr)\]
where k is the subcarrier index, A is the amplitude, \(\varphi_0\) is the phase offset, and \(\omega\) is the phase slope (proportional to propagation delay).
The function estimates the parameters per antenna element and returns the reconstructed (fitted) complex sinusoid evaluated at every subcarrier index.
- Parameters:
csi_data – Complex-valued CSI array with arbitrary leading dimensions (e.g. antenna geometry) and subcarriers as the last dimension. Shape (*antenna_shape, n_subcarriers).
- Returns:
Fitted complex sinusoid with the same shape as csi_data.
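One possible way to obtain such a fit is sketched below. It is not necessarily the estimator the library uses: the function name is hypothetical, the subcarrier index k is assumed to start at 0, and the phase slope is estimated from products of adjacent subcarriers.

```python
import numpy as np

def fit_complex_sinusoid_sketch(csi_data: np.ndarray) -> np.ndarray:
    """Hypothetical helper: fit A * exp(j * (phi0 + omega * k)) along the last axis."""
    n_sub = csi_data.shape[-1]
    k = np.arange(n_sub)  # assumed 0-based subcarrier index
    # Phase slope: angle of the summed products of adjacent subcarriers
    adjacent = csi_data[..., 1:] * np.conj(csi_data[..., :-1])
    omega = np.angle(np.sum(adjacent, axis=-1))
    # Remove the slope, then average to get the complex amplitude A * exp(j * phi0)
    derotated = csi_data * np.exp(-1.0j * omega[..., np.newaxis] * k)
    complex_amplitude = np.mean(derotated, axis=-1)
    # Evaluate the fitted sinusoid at every subcarrier index
    return complex_amplitude[..., np.newaxis] * np.exp(1.0j * omega[..., np.newaxis] * k)

# Noise-free input is reproduced by the fit
k = np.arange(53)
clean = 0.8 * np.exp(1.0j * (0.3 + 0.05 * k))
csi = np.broadcast_to(clean, (2, 4, 53)).copy()
fitted = fit_complex_sinusoid_sketch(csi)
```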
- espargos.util.get_cable_wavelength(frequencies: ndarray, velocity_factors: ndarray)[source]
Returns the wavelength of the provided subcarrier frequencies on a cable with the given velocity factors.
- Parameters:
frequencies – The frequencies of the subcarriers, in Hz, NumPy array.
velocity_factors – The velocity factors of the cable, NumPy array.
- Returns:
The wavelengths of the subcarriers, in meters, NumPy array.
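The underlying relation is \(\lambda = v_f \cdot c / f\), where \(v_f\) is the velocity factor and \(c\) the speed of light in vacuum. A minimal sketch follows; the function name and the output layout (one row per cable, one column per frequency) are assumptions, and the library may broadcast differently.

```python
import numpy as np

SPEED_OF_LIGHT = 299_792_458.0  # m/s, in vacuum

def cable_wavelength_sketch(frequencies: np.ndarray, velocity_factors: np.ndarray) -> np.ndarray:
    """Hypothetical helper: wavelength on each cable for each frequency."""
    # Assumed layout: rows index cables, columns index frequencies
    return SPEED_OF_LIGHT * velocity_factors[:, np.newaxis] / frequencies[np.newaxis, :]

frequencies = np.array([2.4e9, 2.5e9])
velocity_factors = np.array([1.0, 0.5])
wavelengths = cable_wavelength_sketch(frequencies, velocity_factors)
```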
- espargos.util.get_center_frequency(primary_channel: int, secondary_channel: int | None = None)[source]
Returns the RF center frequency for the provided Wi-Fi channel configuration.
If only primary_channel is given, this returns the center frequency of that 20 MHz channel. If secondary_channel is also given, this returns the center frequency halfway between primary and secondary, which corresponds to the HT40 LO.
- Parameters:
primary_channel – The primary Wi-Fi channel number.
secondary_channel – The secondary Wi-Fi channel number. If omitted or equal to primary_channel, the 20 MHz channel center is returned.
- Returns:
Center frequency in Hz.
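The behavior described above can be sketched as follows for 2.4 GHz channels 1 to 13, whose centers lie at 2412 MHz plus 5 MHz per channel step. The function names are hypothetical, and the restriction to channels 1 to 13 is an assumption of this sketch, not a documented property of the library.

```python
def channel_center_hz(channel: int) -> float:
    """Hypothetical helper: center of a 20 MHz channel in the 2.4 GHz band."""
    if not 1 <= channel <= 13:
        raise ValueError("this sketch only covers 2.4 GHz channels 1 to 13")
    return 2_412_000_000.0 + 5_000_000.0 * (channel - 1)

def center_frequency_sketch(primary_channel: int, secondary_channel=None) -> float:
    """Hypothetical helper mirroring the documented behavior."""
    if secondary_channel is None or secondary_channel == primary_channel:
        return channel_center_hz(primary_channel)
    # HT40: halfway between the primary and secondary channel centers
    return (channel_center_hz(primary_channel) + channel_center_hz(secondary_channel)) / 2

f_20mhz = center_frequency_sketch(1)     # 2.412 GHz
f_ht40 = center_frequency_sketch(1, 5)   # 2.422 GHz, halfway between channels 1 and 5
```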
- espargos.util.get_frequencies_he20(channel: int)[source]
Returns the frequencies of the subcarriers in a 2.4 GHz 802.11ax HE20 channel.
The raw HE-LTF reported by the ESP32-C61 covers subcarrier indices -122..122, where -1, 0, 1 are invalid / null tones.
- Parameters:
channel – The primary channel number.
- Returns:
The frequencies of the HE20 subcarriers, in Hz, NumPy array.
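For orientation, a sketch of the HE20 frequency grid: 802.11ax uses a subcarrier spacing of 78.125 kHz, so indices -122..122 give 245 frequencies. The function name and the separate validity mask are assumptions; whether the library keeps or drops the null tones in its return value is not stated here.

```python
import numpy as np

HE_SUBCARRIER_SPACING_HZ = 78.125e3  # 802.11ax subcarrier spacing

def frequencies_he20_sketch(channel: int):
    """Hypothetical helper: HE20 subcarrier frequencies plus a mask of usable tones."""
    center = 2.412e9 + 5e6 * (channel - 1)  # 2.4 GHz channels 1 to 13
    indices = np.arange(-122, 123)
    frequencies = center + indices * HE_SUBCARRIER_SPACING_HZ
    valid = np.abs(indices) > 1  # indices -1, 0, 1 are invalid / null tones
    return frequencies, valid

frequencies, valid = frequencies_he20_sketch(6)
```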
- espargos.util.get_frequencies_ht20(channel: int)[source]
Returns the frequencies of the subcarriers in a 2.4 GHz 802.11n 20 MHz wide Wi-Fi channel.
- Parameters:
channel – The channel number. A 20 MHz channel has no secondary channel, so this is also the primary channel.
- Returns:
The frequencies of the subcarriers, in Hz, NumPy array.
- espargos.util.get_frequencies_ht40(primary_channel: int, secondary_channel: int)[source]
Returns the frequencies of the subcarriers in an HT40 2.4 GHz Wi-Fi channel.
- Parameters:
primary_channel – The primary channel number.
secondary_channel – The secondary channel number.
- Returns:
The frequencies of the subcarriers, in Hz, NumPy array.
- espargos.util.get_frequencies_lltf(channel: int)[source]
Returns the frequencies of the subcarriers in a 2.4 GHz 802.11g 20 MHz wide Wi-Fi channel.
- Parameters:
channel – The channel number. A 20 MHz channel has no secondary channel, so this is also the primary channel.
- Returns:
The frequencies of the subcarriers, in Hz, NumPy array.
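The 20 MHz frequency grids can be sketched as follows. Legacy (802.11g) and HT (802.11n) OFDM use a subcarrier spacing of 312.5 kHz; the L-LTF occupies indices -26..26 and HT20 occupies -28..28. The function name, the `max_index` parameter, and the inclusion of the DC subcarrier are assumptions, so the array length returned by the library may differ.

```python
import numpy as np

SUBCARRIER_SPACING_HZ = 312.5e3  # 802.11g / 802.11n subcarrier spacing

def frequencies_20mhz_sketch(channel: int, max_index: int) -> np.ndarray:
    """Hypothetical helper: frequencies for indices -max_index..max_index (DC included)."""
    center = 2.412e9 + 5e6 * (channel - 1)  # 2.4 GHz channels 1 to 13
    indices = np.arange(-max_index, max_index + 1)
    return center + indices * SUBCARRIER_SPACING_HZ

frequencies_lltf = frequencies_20mhz_sketch(1, max_index=26)  # L-LTF grid
frequencies_ht20 = frequencies_20mhz_sketch(1, max_index=28)  # HT20 grid
```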
- espargos.util.mask_csi_by_feed(csidata: ndarray, rfswitch_states: ndarray, desired_feed: rfswitch_state_t)[source]
Mask the CSI data by the RF switch state, i.e., set the CSI data to 0 for all datapoints where the RF switch state is not the desired feed. Also applies scaling to the remaining datapoints to account for the fact that only a fraction of the datapoints are kept, so that the overall power level is preserved.
- Parameters:
csidata – The CSI data to mask. Complex-valued NumPy array with shape (datapoints, …, subcarriers), usually (datapoints, arrays, rows, columns, subcarriers).
rfswitch_states – The RF switch states for each antenna and datapoint. NumPy array with shape (datapoints, …), usually (datapoints, arrays, rows, columns).
desired_feed – The desired RF switch state to keep.
- Returns:
The masked CSI data. Complex-valued NumPy array with the same shape as the input CSI data. Returns None if no datapoints have the desired RF switch state for any antenna.
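The masking and rescaling described above might look like the sketch below. Several details are assumptions: the function name, the use of plain integer codes in place of rfswitch_state_t, the scaling factor sqrt(total / kept) per antenna (chosen so that the mean power per antenna is preserved), and the reading of the None case as "at least one antenna has no matching datapoint".

```python
import numpy as np

def mask_by_feed_sketch(csidata, rfswitch_states, desired_feed):
    """Hypothetical helper: zero out datapoints of other feeds and rescale the rest."""
    keep = rfswitch_states == desired_feed   # shape (datapoints, ...)
    kept_count = np.sum(keep, axis=0)        # matching datapoints per antenna
    if np.any(kept_count == 0):
        return None                          # assumed reading of the None case
    # Assumed scaling: preserves the mean power per antenna
    scale = np.sqrt(csidata.shape[0] / kept_count)
    return csidata * (keep * scale)[..., np.newaxis]

# Two antennas, four datapoints, feeds alternating between codes 0 and 1
csi = np.ones((4, 2, 3), dtype=complex)
states = np.array([[0, 1], [1, 0], [0, 1], [1, 0]])
masked = mask_by_feed_sketch(csi, states, desired_feed=0)
```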
- espargos.util.parse_combined_array_config(config_dict: dict)[source]
Parse the configuration for demos that use a combined array.
- Parameters:
config_dict – The configuration dictionary to parse.
- Return indexing_matrix:
The indexing matrix to map the CSI data of the subarrays to the CSI data of the large array.
- Return board_names_hosts:
The names of the boards and their hostnames.
- Return cable_lengths:
The lengths of the cables connecting the boards.
- Return cable_velocity_factors:
The velocity factors of the cables connecting the boards.
- Return n_rows:
The number of rows in the array.
- Return n_cols:
The number of columns in the array.
- Return antenna_orientations:
Array of AntennaOrientation values with shape (n_rows, n_cols), indicating the orientation of each antenna’s sub-array in the combined array.
- Raises:
ValueError – If the configuration is invalid (e.g., invalid antenna indices, missing/duplicate antennas, non-contiguous sub-arrays, invalid rotation).
- espargos.util.remove_mean_sto(csi_datapoints: ndarray)[source]
Removes the mean symbol timing offset (STO) from the CSI data by estimating the STO from the phase slope across subcarriers. All datapoints are corrected separately.
- Parameters:
csi_datapoints – The CSI data (multiple datapoints) to remove the mean STO from, frequency-domain. Complex-valued NumPy array with arbitrary shape as long as the first dimension is the datapoint dimension and the last dimension is the subcarrier dimension.
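A sketch of STO removal via the phase slope, under assumptions: the function name is hypothetical, the slope of each datapoint is estimated from products of adjacent subcarriers summed over all antennas, and the correction is applied around the center subcarrier. The library's estimator may differ.

```python
import numpy as np

def remove_mean_sto_sketch(csi_datapoints: np.ndarray) -> np.ndarray:
    """Hypothetical helper: remove the common phase slope of each datapoint."""
    n_sub = csi_datapoints.shape[-1]
    adjacent = csi_datapoints[..., 1:] * np.conj(csi_datapoints[..., :-1])
    # One slope per datapoint: sum over every axis except the first
    other_axes = tuple(range(1, csi_datapoints.ndim))
    slope = np.angle(np.sum(adjacent, axis=other_axes))
    slope = slope.reshape((-1,) + (1,) * (csi_datapoints.ndim - 1))
    # Subcarrier index centered on the middle of the band (assumption)
    k = np.arange(n_sub) - (n_sub - 1) / 2
    return csi_datapoints * np.exp(-1.0j * slope * k)

# Two datapoints with different phase slopes across 16 subcarriers
k0 = np.arange(16)
slopes = np.array([0.1, -0.2])
csi = np.exp(1.0j * slopes[:, np.newaxis, np.newaxis] * k0) * np.ones((2, 3, 16))
corrected = remove_mean_sto_sketch(csi)
```

After the correction, each datapoint in this synthetic example has a flat phase across subcarriers.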
- espargos.util.scale_csi_by_reported_gain(csi_data: ndarray, rx_gain: ndarray, fft_gain: ndarray) ndarray[source]
Compensate CSI amplitudes for the receiver gain reported by the ESP32.
The ESP32 reports RX gain in 1 dB units and FFT gain in 0.25 dB units. Since these are receiver-side gains, this helper divides raw CSI amplitudes by the reported gain factor. The scaling is valid for both automatic and manual gain mode, because the reported values are always meaningful.
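A sketch of the conversion from reported gain to a linear scaling factor. The function name is hypothetical. It is assumed here that both gains are amplitude gains in dB, so that the linear factor is 10^(dB/20), and that the gain arrays have the same shape as the CSI without the subcarrier axis.

```python
import numpy as np

def scale_by_reported_gain_sketch(csi_data, rx_gain, fft_gain):
    """Hypothetical helper: divide CSI by the linear receiver gain."""
    # RX gain is reported in 1 dB units, FFT gain in 0.25 dB units
    total_gain_db = rx_gain.astype(float) * 1.0 + fft_gain.astype(float) * 0.25
    gain_factor = 10.0 ** (total_gain_db / 20.0)  # assumed amplitude convention
    return csi_data / gain_factor[..., np.newaxis]

# Two antennas that both end up with 20 dB total gain
csi = np.ones((1, 2, 4), dtype=complex)
rx_gain = np.array([[20, 0]])   # 20 dB and 0 dB
fft_gain = np.array([[0, 80]])  # 0 dB and 80 * 0.25 dB = 20 dB
scaled = scale_by_reported_gain_sketch(csi, rx_gain, fft_gain)
```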
- espargos.util.separate_feeds(csidata: ndarray, rfswitch_state: ndarray)[source]
Separate the CSI data by antenna feeds (R/L) based on the RF switch states. Also takes care of scaling the CSI data for each feed to account for the fact that only a fraction of the datapoints are kept for each feed, so that the overall power level is preserved. Missing measurements for a feed (i.e., half of all measurements) are filled with zeros.
- Parameters:
csidata – The CSI data to separate. Complex-valued NumPy array with shape (datapoints, …, subcarriers), usually (datapoints, arrays, rows, columns, subcarriers).
rfswitch_state – The RF switch states for each antenna and datapoint. NumPy array with shape (datapoints, …), usually (datapoints, arrays, rows, columns).
- Returns:
The separated CSI data. Complex-valued NumPy array with shape (datapoints, …, subcarriers, 2), where the last dimension corresponds to the R/L feeds. Returns None if no datapoints have the desired RF switch state for any antenna.
- espargos.util.shift_to_firstpeak_sync(csi_datapoints: ndarray, max_delay_taps=3, search_resolution=40, peak_threshold=0.1)[source]
Shifts the CSI data so that the first peak of the channel impulse response is at time 0. All CSI datapoints are shifted by the same amount, which requires synchronized CSI.
- Parameters:
csi_datapoints – The CSI data to shift, frequency-domain. Complex-valued NumPy array with shape (datapoints, arrays, rows, columns, subcarriers).
max_delay_taps – The maximum number of time taps to shift the CSI data by.
search_resolution – The number of search points (granularity) to use for the time shift.
peak_threshold – The threshold for the peak detection, as a fraction of the maximum peak power.
- Returns:
The frequency-domain CSI data with the first peak of the channel impulse response at time 0.