Source code for espargos.pool

#!/usr/bin/env python

from weakref import WeakKeyDictionary
from collections import OrderedDict
from typing import Callable
import numpy as np
import threading
import binascii
import logging
import time
import json

from . import calibration
from . import constants
from . import cluster
from . import board
from . import util
from . import csi
from . import radar


[docs] class CalibrationError(RuntimeError): """Raised when ESPARGOS calibration cannot collect usable calibration CSI."""
class _CSICallback(object): def __init__( self, cb: Callable[[cluster.CSICluster], None], cb_predicate: Callable[[cluster.CSICluster], bool] = None, ): # By default, provide csi if CSI is available from all antennas self.cb_predicate = cb_predicate self.cb = cb # Track fired state per cluster.CSICluster object self.fired = WeakKeyDictionary() def try_call(self, csi_cluster: cluster.CSICluster): # Check if callback has already been fired for this cluster.CSICluster object if self.fired.get(csi_cluster, False): return True # Check if callback needs to be called: Use predicate function if defined, otherwise call if all antennas have CSI callback_required = False if self.cb_predicate is not None: callback_required = self.cb_predicate(csi_cluster) else: callback_required = csi_cluster.get_completion_all() if callback_required: self.cb(csi_cluster) # Mark as fired for this cluster.CSICluster object self.fired[csi_cluster] = True return True return False
[docs] class Pool(object): """ A Pool is a collection of ESPARGOS boards. The pool manages the clustering of CSI data from multiple ESPARGOS sensors (antennas) that belong to the same WiFi packet and provides :class:'cluster.CSICluster' objects to registered callbacks. """ def __init__(self, boards: list[board.Board], ota_cache_timeout=5, refgen_boards=None): """ Constructor for the Pool class. :param boards: A list of ESPARGOS boards that belong to the pool :param ota_cache_timeout: Optional. The timeout in seconds after which over-the-air CSI data is considered stale and discarded if the cluster is not complete :param refgen_boards: Optional. In some multi-board setups, the calibration signal is provided by (a) separate ESPARGOS device(s) that is / are not part of the pool (only controller is used to generate packets, sensors not used). If provided, sends calibration command to these boards, which will then generate the calibration signal during calibration phase. """ self.logger = logging.getLogger("pyespargos.pool") self.boards = boards self.refgen_boards = refgen_boards if refgen_boards is not None else [] self.ota_cache_timeout = ota_cache_timeout self.emit_calibration_csi = False # We have two caches: One for calibration packets, the other one for over-the-air packets self.cluster_cache_calib_lock = threading.Lock() self.cluster_cache_calib = OrderedDict[str, cluster.CSICluster]() self.cluster_cache_ota_lock = threading.Lock() self.cluster_cache_ota = OrderedDict[str, cluster.CSICluster]() self.input_list = list() self.input_cond = threading.Condition() for board_num, board in enumerate(self.boards): board.add_consumer(self.input_list, self.input_cond, board_num) self.callbacks: list[_CSICallback] = [] self.logger.info(f"Created new pool with {len(boards)} board(s)") self.stored_calibration: calibration.CSICalibration = None self.stats = dict() def _assert_same_across_boards(self, values: list, what: str): """ Ensure all entries in `values` are identical (after canonical JSON encoding for dict/list). """ if not values: raise ValueError(f"{what}: no boards in pool") def canon(v): if isinstance(v, (dict, list)): return json.dumps(v, sort_keys=True, separators=(",", ":")) return v c0 = canon(values[0]) for i, v in enumerate(values[1:], start=1): if canon(v) != c0: raise ValueError(f"{what}: mismatch between boards (board 0 != board {i})") def _assert_same_dict_across_boards(self, dicts: list[dict], what: str, ignore_keys: set[str] | None = None): """ Ensure all dicts are identical, optionally ignoring specific keys. """ if not dicts: raise ValueError(f"{what}: no boards in pool") ignore_keys = ignore_keys or set() def strip_ignored(d: dict): return {k: v for k, v in d.items() if k not in ignore_keys} stripped = [strip_ignored(d) for d in dicts] reference = stripped[0] for board_num, current in enumerate(stripped[1:], start=1): if current == reference: continue mismatch_lines = [] for key in sorted(set(reference) | set(current)): reference_value = reference.get(key, "<missing>") current_value = current.get(key, "<missing>") if current_value != reference_value: mismatch_lines.append(f"{key}: board 0={reference_value!r}, board {board_num}={current_value!r}") details = "; ".join(mismatch_lines) if mismatch_lines else "no differing top-level keys found" raise ValueError(f"{what}: mismatch between boards (board 0 != board {board_num}): {details}")
[docs] def set_rfswitch(self, state: csi.rfswitch_state_t): """ Set RF switch state for all boards in the pool. :param state: The RF switch state to set, must be one of :class:`csi.rfswitch_state_t` """ for board in self.boards + self.refgen_boards: board.set_rfswitch(state)
[docs] def get_rfswitch(self) -> csi.rfswitch_state_t: """ Get RF switch state from the first board in the pool. :return: The RF switch state of the first board in the pool """ if not self.boards: raise ValueError("No boards in pool to get RF switch state from") states = [b.get_rfswitch() for b in self.boards] self._assert_same_across_boards(states, "RF switch state") return states[0]
[docs] def set_mac_filter(self, mac_filter: dict): """ Set the MAC address filter for all boards in the pool. Will only accept packets from the specified MAC address. This is forwarded to :meth:`pyespargos.board.Board.set_mac_filter` for each board. """ for board in self.boards: board.set_mac_filter(mac_filter)
[docs] def clear_mac_filter(self): """ Clear the MAC address filter for all boards in the pool. """ for board in self.boards: board.clear_mac_filter()
[docs] def get_mac_filter(self) -> dict: """ Return MAC filter configuration; sanity-check all boards report the same value. This is forwarded to :meth:`pyespargos.board.Board.get_mac_filter` for each board. """ filters = [b.get_mac_filter() for b in self.boards] self._assert_same_across_boards(filters, "MAC filter") return filters[0]
[docs] def get_csi_acquire_config(self) -> dict: """ Return CSI acquire config; sanity-check all boards report the same value. """ cfgs = [b.get_csi_acquire_config() for b in self.boards] self._assert_same_across_boards(cfgs, "CSI acquire config") return cfgs[0]
[docs] def set_csi_acquire_config(self, config: dict): """ Set CSI acquisition configuration on all boards in this pool and sanity-check that all boards end up with the same config. This is forwarded to :meth:`pyespargos.board.Board.set_csi_acquire_config` for each board. For the expected JSON/dict format, refer to that method's documentation. :param config: CSI acquisition configuration dict to apply to all boards. :raises ValueError: If boards in the pool disagree on the resulting config after applying. :raises EspargosUnexpectedResponseError: If any board returns an unexpected response. """ for b in self.boards: b.set_csi_acquire_config(config) _ = self.get_csi_acquire_config()
[docs] def get_cfo_correction(self) -> dict: """ Return CFO correction config; sanity-check all boards report the same value. """ configs = [b.get_cfo_correction() for b in self.boards] self._assert_same_across_boards(configs, "CFO correction") return configs[0]
[docs] def set_cfo_correction(self, auto: bool, value: int = 0): """ Configure CFO correction on all boards in this pool. """ for b in self.boards: b.set_cfo_correction(auto, value) _ = self.get_cfo_correction()
[docs] def get_gain_settings(self) -> dict: """ Return gain settings; sanity-check all boards report the same value. """ settings = [b.get_gain_settings() for b in self.boards] self._assert_same_across_boards(settings, "Gain settings") return settings[0]
[docs] def set_gain_settings(self, settings: dict): """ Set gain settings on all boards in this pool. This is forwarded to :meth:`pyespargos.board.Board.set_gain_settings` for each board. Values may be scalars, board-local ``(row, column)`` arrays applied to every board, or pool-wide ``(board, row, column)`` arrays. :param settings: Gain settings dict to apply to all boards. :raises EspargosUnexpectedResponseError: If any board returns an unexpected response. """ per_board_settings = [dict() for _ in self.boards] for key, value in settings.items(): array = np.asarray(value) if array.shape == self.get_shape(): for board_index in range(len(self.boards)): per_board_settings[board_index][key] = array[board_index] else: for board_settings in per_board_settings: board_settings[key] = value for board_obj, board_settings in zip(self.boards, per_board_settings): board_obj.set_gain_settings(board_settings)
[docs] def get_wifi_channel_overrides(self) -> dict: """ Return per-sensor WiFi channel overrides; sanity-check all boards report the same value. """ settings = [b.get_wifi_channel_overrides() for b in self.boards] self._assert_same_across_boards(settings, "WiFi channel overrides") return settings[0]
[docs] def set_wifi_channel_overrides(self, settings: dict): """ Set per-sensor WiFi channel overrides on all boards in this pool and sanity-check that all boards end up with the same settings. This is forwarded to :meth:`pyespargos.board.Board.set_wifi_channel_overrides` for each board. For the expected JSON/dict format, refer to that method's documentation. :param settings: Per-sensor WiFi channel override settings dict to apply to all boards. :raises ValueError: If boards in the pool disagree on the resulting settings after applying. :raises EspargosUnexpectedResponseError: If any board returns an unexpected response. """ for b in self.boards: b.set_wifi_channel_overrides(settings) _ = self.get_wifi_channel_overrides()
[docs] def get_radar_configs(self) -> list[dict]: """ Return radar TX configuration for all boards in the pool. """ return [b.get_radar_config() for b in self.boards]
[docs] def get_radar_config(self) -> dict: """ Return radar TX configuration; sanity-check all boards report the same value. """ configs = self.get_radar_configs() self._assert_same_across_boards(configs, "Radar config") return configs[0]
[docs] def set_radar_config(self, config: dict | radar.RadarPoolConfig): """ Set radar TX configuration on the boards in this pool. ``config`` may either be a single controller config dict applied to every board, or a :class:`pyespargos.espargos.radar.RadarPoolConfig` containing one config per board. """ if isinstance(config, radar.RadarPoolConfig): if len(config.board_configs) != len(self.boards): raise ValueError(f"RadarPoolConfig contains {len(config.board_configs)} board configs, expected {len(self.boards)}") for board_obj, board_config in zip(self.boards, config.board_configs): board_obj.set_radar_config(board_config) return for b in self.boards: b.set_radar_config(config)
[docs] def get_wificonf(self) -> dict: """ Return WiFi config; sanity-check boards report the same value. Consistency check ignores "calib-source" and "calib-mode" (they may legitimately differ). """ wificonfs = [b.get_wificonf() for b in self.boards] self._assert_same_dict_across_boards( wificonfs, "WiFi config", ignore_keys={"calib-source", "calib-mode"}, ) return wificonfs[0]
[docs] def set_wificonf(self, wificonf: dict): """ Set WiFi config on all boards and sanity-check resulting configs match across boards. This is forwarded to :meth:`pyespargos.board.Board.set_wificonf` for each board. For the expected JSON/dict format, refer to that method's documentation. The values of "calib-source" and "calib-mode" are ignored and not propagated to the pool. If you need to set those, call :meth:`pyespargos.board.Board.set_wificonf` on each board individually. Consistency check also ignores "calib-source" and "calib-mode" (they may legitimately differ). :param wificonf: WiFi configuration dict to apply to all boards. :raises ValueError: If boards in the pool disagree on the resulting config after applying (excluding ignored keys). :raises EspargosUnexpectedResponseError: If any board returns an unexpected response. """ wificonf = dict(wificonf) # Make a copy wificonf.pop("calib-source", None) wificonf.pop("calib-mode", None) for b in self.boards: b.set_wificonf(wificonf) _ = self.get_wificonf()
[docs] def start(self): """ Start the streaming of CSI data for all boards in the pool. """ for board in self.boards: board.start()
[docs] def stop(self): """ Stop the streaming of CSI data for all boards in the pool. """ for board in self.boards: board.stop()
[docs] def reboot(self): """ Trigger a reboot on all boards in the pool. """ for board in self.boards + self.refgen_boards: board.reboot()
[docs] def add_csi_callback( self, cb: Callable[[cluster.CSICluster], None], cb_predicate: Callable[[cluster.CSICluster], bool] = None, ): """ Register callback function that is invoked whenever a new CSI cluster is completed. :param cb: The function to call, gets instance of class :class:`.cluster.CSICluster` as parameter :param cb_predicate: A function with signature :code:`(csi_cluster)` that defines the conditions under which clustered CSI is regarded as completed and thus provided to the callback. If :code:`cb_predicate` returns true, clustered CSI is regarded as completed. If no predicate is provided, the default behavior is to trigger the callback when CSI has been received from all sensors on all boards. By default, callbacks receive over-the-air/radar CSI only. Enable :meth:`set_emit_calibration_csi` to also emit calibration CSI (from internal reference generators) through this same callback path. :return: A callback handle that can be passed to :meth:`remove_csi_callback` """ callback = _CSICallback(cb, cb_predicate) self.callbacks.append(callback) return callback
[docs] def remove_csi_callback(self, callback: _CSICallback) -> bool: """ Remove a CSI callback previously returned by :meth:`add_csi_callback`. :param callback: Callback handle returned by :meth:`add_csi_callback` :return: True if the callback was registered and removed, False otherwise """ try: self.callbacks.remove(callback) return True except ValueError: return False
[docs] def set_emit_calibration_csi(self, enabled: bool): """ Control whether calibration CSI clusters are emitted through normal CSI callbacks. Calibration clusters remain marked as calibration packets via :meth:`espargos.cluster.CSICluster.is_calib`. """ self.emit_calibration_csi = bool(enabled)
[docs] def get_emit_calibration_csi(self) -> bool: """ Return whether calibration CSI clusters are emitted through normal CSI callbacks. """ return self.emit_calibration_csi
def _try_callbacks(self, csi_cluster: cluster.CSICluster) -> bool: all_callbacks_fired = True for cb in self.callbacks: all_callbacks_fired = all_callbacks_fired and cb.try_call(csi_cluster) return all_callbacks_fired def _clusters_to_calibration(self, board_num=None): """ Convert collected calibration clusters to phase calibration values. :param board_num: If provided, only process calibration clusters for the specified board number """ # Take snapshot of current calibration clusters under lock with self.cluster_cache_calib_lock: clusters = list(self.cluster_cache_calib.values()) # Collection of complete clusters (= reference CSI data from all antennas available): L-LTF, HT20-LTF, and HT40-LTF complete_clusters_lltf = [] complete_cluster_timestamps_lltf = [] complete_clusters_ht20 = [] complete_cluster_timestamps_ht20 = [] complete_clusters_ht40 = [] complete_cluster_timestamps = [] # Read wificonf to determine primary/secondary channel wificonf = self.get_wificonf() channel_primary = wificonf.get("channel-primary", None) channel_secondary = wificonf.get("channel-secondary", None) channel_secondary = -1 if channel_secondary == 2 else channel_secondary any_csi_count = 0 for cluster in clusters: cluster_channel_primary = cluster.get_primary_channel() cluster_channel_secondary = cluster.get_secondary_channel_relative() if channel_primary != cluster_channel_primary or channel_secondary != cluster_channel_secondary: self.logger.warning( f"Calibration cluster with differing channel settings detected (most likely stale data), expected primary channel {channel_primary} and secondary channel {channel_secondary}, but got primary channel {cluster_channel_primary} and secondary channel {cluster_channel_secondary}. Skipping cluster." ) continue completion = cluster.get_completion()[board_num] if board_num is not None else cluster.get_completion() if np.any(completion): any_csi_count = any_csi_count + 1 if np.all(completion): cluster_timestamps = cluster.get_sensor_timestamps()[board_num] if board_num is not None else cluster.get_sensor_timestamps() complete_cluster_timestamps.append(cluster_timestamps) if cluster.has_lltf(): complete_clusters_lltf.append(cluster.deserialize_csi_lltf()[board_num] if board_num is not None else cluster.deserialize_csi_lltf()) complete_cluster_timestamps_lltf.append(cluster_timestamps) if cluster.has_ht20ltf(): complete_clusters_ht20.append(cluster.deserialize_csi_ht20ltf()[board_num] if board_num is not None else cluster.deserialize_csi_ht20ltf()) complete_cluster_timestamps_ht20.append(cluster_timestamps) if cluster.has_ht40ltf(): complete_clusters_ht40.append(cluster.deserialize_csi_ht40ltf()[board_num] if board_num is not None else cluster.deserialize_csi_ht40ltf()) if board_num is not None: self.logger.info(f"Board {self.boards[board_num].get_name()}: Collected {any_csi_count} calibration clusters:") else: self.logger.info(f"Collected {any_csi_count} calibration clusters:") self.logger.info(f" - {len(complete_clusters_ht40)} complete clusters with HT40-LTF") self.logger.info(f" - {len(complete_clusters_ht20)} complete clusters with HT20-LTF") self.logger.info(f" - {len(complete_clusters_lltf)} complete clusters with L-LTF") if len(complete_clusters_ht20) > 0: # If we only have HT20 but no LLTF calibration, we can still proceed: Use corresponding subcarriers from HT20 for LLTF calibration if len(complete_clusters_lltf) == 0: self.logger.warning("No LLTF calibration clusters received, deriving LLTF calibration from HT20 calibration") complete_clusters_lltf.extend([util.extract_lltf_subcarriers_from_ht20(csi_ht20) for csi_ht20 in complete_clusters_ht20]) complete_cluster_timestamps_lltf.extend(complete_cluster_timestamps_ht20) complete_cluster_count = len(complete_cluster_timestamps) calibration_error = None if any_csi_count < 5: calibration_error = "too few calibration packets were received" elif complete_cluster_count == 0: calibration_error = "no packet contained CSI from the complete calibrated array" elif len(complete_clusters_lltf) == 0: calibration_error = "no complete packet contained L-LTF or HT20-LTF CSI" if calibration_error is not None: raise CalibrationError( f"ESPARGOS calibration failed: {calibration_error}. " f"Received {any_csi_count} calibration packets with any CSI, " f"{complete_cluster_count} complete packets, " f"{len(complete_clusters_ht20)} complete HT20-LTF packets, " f"and {len(complete_clusters_lltf)} complete L-LTF/HT20-derived packets. " "Calibration needs several packets with CSI from the complete array and at least one complete L-LTF or HT20-LTF packet. " "Check signal level, RX gain, packet filtering, and whether all sensors are receiving the calibration/reference signal." ) return ( np.asarray(complete_clusters_lltf), np.asarray(complete_clusters_ht20), np.asarray(complete_clusters_ht40), np.asarray(complete_cluster_timestamps), np.asarray(complete_cluster_timestamps_lltf), channel_primary, channel_secondary, ) def _compute_sensor_clock_offsets(self, complete_cluster_timestamps: np.ndarray) -> np.ndarray: """ Compute per-sensor clock offsets relative to sensor 0 from complete calibration clusters. :param complete_cluster_timestamps: Array of shape ``(clusters, boards, rows, columns)`` containing per-sensor timestamps in seconds. :return: Array of shape ``(boards, rows, columns)`` with offsets in seconds relative to sensor 0. """ if len(complete_cluster_timestamps) == 0: return np.full(self.get_shape(), np.nan, dtype=np.float64) sensor_clock_offsets = np.asarray(complete_cluster_timestamps, dtype=np.float64) sensor_clock_offsets -= sensor_clock_offsets[:, 0:1, 0:1, 0:1] return np.mean(sensor_clock_offsets, axis=0)
[docs] def calibrate( self, per_board=True, duration=2, exithandler=None, cable_lengths=None, cable_velocity_factors=None, run_in_thread=True, ): """ Run calibration for a specified duration. :param per_board: True to calibrate each board separately, False to calibrate all boards together. Set to False if the same phase reference signal is used for all boards, otherwise set to True. :param duration: The duration in seconds for which calibration should be run :param exithandler: An optional exit handler that can be used to stop calibration prematurely if :code:`exithandler.running` is set to False in a separate thread :param cable_lengths: The lengths of the feeder cables that distribute the clock and phase calibration signal to the ESPARGOS boards, in meters. Only needed for phase-coherent multi-board setups, omit if all cables have the same length. :param cable_velocity_factors: The velocity factors of the feeder cables that distribute the clock and phase calibration signal to the ESPARGOS boards Must be the same length as :code:`cable_lengths`, and all entries should be in the range [0, 1]. :param run_in_thread: If True, the pool handling will be performed in the current thread. Set to False in case the pool is already running in a separate thread (e.g., backlog is already active). """ # Clear calibration cache with self.cluster_cache_calib_lock: self.cluster_cache_calib.clear() # Back up and clear MAC filter previous_mac_filter = self.get_mac_filter() previous_rfswitch_state = self.get_rfswitch() try: self.clear_mac_filter() # Enable calibration mode self.logger.info("Starting calibration") self.set_rfswitch(csi.rfswitch_state_t.SENSOR_RFSWITCH_REFERENCE) # Run calibration for specified duration start = time.time() while (time.time() - start < duration) and (exithandler is None or exithandler.running): if run_in_thread: self.run() else: time.sleep(0.01) finally: # Disable calibration mode self.logger.info("Finished calibration") self.set_rfswitch(previous_rfswitch_state) self.set_mac_filter(previous_mac_filter) with self.cluster_cache_ota_lock: self.cluster_cache_ota.clear() # Collect calibration packets and compute calibration phases ( complete_clusters_lltf, complete_clusters_ht20, complete_clusters_ht40, complete_cluster_timestamps, complete_cluster_timestamps_lltf, channel_primary, channel_secondary, ) = self._clusters_to_calibration() sensor_clock_offsets = self._compute_sensor_clock_offsets(complete_cluster_timestamps) phase_calibration_he20 = util.derive_he20_calibration_from_lltf(complete_clusters_lltf, complete_cluster_timestamps_lltf, channel_secondary) if per_board: phase_calibrations_lltf = [] phase_calibrations_ht20 = [] phase_calibrations_ht40 = [] for board_num in range(len(self.boards)): ( complete_clusters_lltf, complete_clusters_ht20, complete_clusters_ht40, _complete_cluster_timestamps, _complete_cluster_timestamps_lltf, _channel_primary, _channel_secondary, ) = self._clusters_to_calibration(board_num) phase_calibrations_lltf.append( util.csi_interp_eigenvec_per_subcarrier(np.asarray(complete_clusters_lltf)) if len(complete_clusters_lltf) > 0 else np.full( self.get_shape()[1:] + (csi.LEGACY_COEFFICIENTS_PER_CHANNEL,), np.nan, ) ) phase_calibrations_ht20.append( util.csi_interp_eigenvec_per_subcarrier(np.asarray(complete_clusters_ht20)) if len(complete_clusters_ht20) > 0 else np.full( self.get_shape()[1:] + (csi.HT_COEFFICIENTS_PER_CHANNEL,), np.nan, ) ) phase_calibrations_ht40.append( util.csi_interp_eigenvec_per_subcarrier(np.asarray(complete_clusters_ht40)) if len(complete_clusters_ht40) > 0 else np.full( self.get_shape()[1:] + (csi.HT_COEFFICIENTS_PER_CHANNEL * 2 + csi.HT40_GAP_SUBCARRIERS,), np.nan, ) ) self.stored_calibration = calibration.CSICalibration( self.boards, channel_primary, channel_secondary, np.asarray(phase_calibrations_lltf), np.asarray(phase_calibrations_ht20), np.asarray(phase_calibrations_ht40), phase_calibration_he20, sensor_clock_offsets=sensor_clock_offsets, ) else: phase_calibrations_lltf = util.csi_interp_eigenvec_per_subcarrier(np.asarray(complete_clusters_lltf)) if len(complete_clusters_lltf) > 0 else np.full(self.get_shape() + (csi.LEGACY_COEFFICIENTS_PER_CHANNEL,), np.nan) phase_calibrations_ht20 = util.csi_interp_eigenvec_per_subcarrier(np.asarray(complete_clusters_ht20)) if len(complete_clusters_ht20) > 0 else np.full(self.get_shape() + (csi.HT_COEFFICIENTS_PER_CHANNEL,), np.nan) phase_calibration_ht40 = ( util.csi_interp_eigenvec_per_subcarrier(np.asarray(complete_clusters_ht40)) if len(complete_clusters_ht40) > 0 else np.full( self.get_shape() + (csi.HT_COEFFICIENTS_PER_CHANNEL * 2 + csi.HT40_GAP_SUBCARRIERS,), np.nan, ) ) # Each antenna just gets a delayed and phase-shifted version of the reference signal, # so frequency response is just a complex sinusoid over subcarrier axis. # Fit optimal complex sinusoid to the CSI of each antenna across subcarriers to extract the phase shift and delay, which we can then use for calibration. phase_calibrations_lltf = util.fit_complex_sinusoid(phase_calibrations_lltf) phase_calibrations_ht20 = util.fit_complex_sinusoid(phase_calibrations_ht20) phase_calibration_ht40 = util.fit_complex_sinusoid(phase_calibration_ht40) self.stored_calibration = calibration.CSICalibration( self.boards, channel_primary, channel_secondary, phase_calibrations_lltf, phase_calibrations_ht20, phase_calibration_ht40, phase_calibration_he20, sensor_clock_offsets=sensor_clock_offsets, board_cable_lengths=cable_lengths, board_cable_vfs=cable_velocity_factors, )
[docs] def get_calibration(self): """ Get the stored calibration values. :return: The stored calibration values as a :class:`.calibration.CSICalibration` object """ return self.stored_calibration
[docs] def get_shape(self): """ Get the outer shape of the stored data, i.e., only the antenna dimensions and not subcarrier dimensions or similar. """ return (len(self.boards), constants.ROWS_PER_BOARD, constants.ANTENNAS_PER_ROW)
[docs] def get_stats(self): """ Get collected statistics about the pool. """ return self.stats
[docs] def run(self): """ Process incoming CSI data packets and call registered callbacks if CSI clusters are complete. Repeatedly call this function from your main loop or from a separate thread. May block for a short amount of time if no data is available. """ with self.input_cond: self.input_cond.wait(timeout=0.5) packets = [p for p in self.input_list] self.input_list.clear() self._handle_packets(packets)
def _handle_packets(self, packets): self.stats["packet_backlog"] = len(packets) for pkt in packets: esp_num, stream_packet, board_num = pkt[0], pkt[1], pkt[2] source_mac_str = binascii.hexlify(bytearray(stream_packet.source_mac)).decode("utf-8") dest_mac_str = binascii.hexlify(bytearray(stream_packet.dest_mac)).decode("utf-8") # Identifier (here: MAC address & sequence control number) cluster_id = f"{source_mac_str}-{dest_mac_str}-{stream_packet.seq_ctrl.seg:03x}-{stream_packet.seq_ctrl.frag:01x}" if isinstance(stream_packet, csi.radar_tx_report_tlv_t): with self.cluster_cache_ota_lock: if cluster_id not in self.cluster_cache_ota: self.cluster_cache_ota[cluster_id] = cluster.CSICluster( source_mac_str, dest_mac_str, stream_packet.seq_ctrl, [b.revision for b in self.boards], ) self.cluster_cache_ota[cluster_id].set_radar_tx_report(stream_packet, board_num=board_num, esp_num=esp_num) continue serialized_csi = stream_packet # Prepare a cache entry for a new cluster with a different and add received data to the current cluster if serialized_csi.is_calib: calib_cluster = None with self.cluster_cache_calib_lock: if cluster_id not in self.cluster_cache_calib: self.cluster_cache_calib[cluster_id] = cluster.CSICluster( source_mac_str, dest_mac_str, serialized_csi.seq_ctrl, [b.revision for b in self.boards], ) self.cluster_cache_calib[cluster_id].add_csi(board_num, esp_num, serialized_csi) calib_cluster = self.cluster_cache_calib[cluster_id] if self.emit_calibration_csi: self._try_callbacks(calib_cluster) else: with self.cluster_cache_ota_lock: if cluster_id not in self.cluster_cache_ota: self.cluster_cache_ota[cluster_id] = cluster.CSICluster( source_mac_str, dest_mac_str, serialized_csi.seq_ctrl, [b.revision for b in self.boards], ) # Add received data for the antenna to the current cluster self.cluster_cache_ota[cluster_id].add_csi(board_num, esp_num, serialized_csi) # Check OTA cluster cache for packets where callback is due and for stale packets. # Snapshot under the lock so concurrent mutation (e.g. cache clear at the end of # calibration) cannot invalidate the iteration, then fire callbacks without holding # the lock to avoid blocking packet ingestion or deadlocking on re-entrant calls. with self.cluster_cache_ota_lock: ota_clusters = list(self.cluster_cache_ota.items()) stale = set() for id, csi_cluster in ota_clusters: all_callbacks_fired = self._try_callbacks(csi_cluster) if all_callbacks_fired and np.any(csi_cluster.get_completion()): stale.add(id) if csi_cluster.get_age() > self.ota_cache_timeout: stale.add(id) with self.cluster_cache_ota_lock: for id in stale: self.cluster_cache_ota.pop(id, None)