#!/usr/bin/env python
import websockets.sync.client
import http.client
import threading
import logging
import socket
import ctypes
import json
import binascii
import time
import numpy as np
from . import revisions
from . import csi
from . import uart
# Port used by the controller as source port for UDP CSI packets
CSISTREAM_CONTROLLER_SRC_PORT = 53330
[docs]
class EspargosHTTPStatusError(Exception):
"Raised when the ESPARGOS HTTP API returns an invalid status code"
def __init__(self, status=None, path=None, body=None):
self.status = status
self.path = path
self.body = body
message = "ESPARGOS HTTP API returned an invalid status code"
if status is not None:
message += f" {status}"
if path is not None:
message += f" for /{path}"
if body:
message += f": {body}"
super().__init__(message)
[docs]
class EspargosUnexpectedResponseError(Exception):
"Raised when the server (ESPARGOS controller) provides unexpected response. Is the server really ESPARGOS?"
pass
[docs]
class EspargosCsiStreamConnectionError(Exception):
"Raised when the CSI stream connection could not be established (e.g. magic packet not received)"
pass
[docs]
class EspargosAPIVersionError(Exception):
"Raised when the ESPARGOS controller runs an unsupported API version"
pass
[docs]
class FragmentReassembler:
def __init__(self, timeout_s: float = 5.0, logger=None):
self.timeout_s = timeout_s
self.logger = logger
self._entries = {}
[docs]
def clear(self):
self._entries.clear()
def _drop_stale(self, now: float):
stale_keys = [key for key, entry in self._entries.items() if now - entry["timestamp"] > self.timeout_s]
for key in stale_keys:
self._entries.pop(key, None)
[docs]
def push(self, fragments, now: float | None = None):
if now is None:
now = time.monotonic()
self._drop_stale(now)
completed_packets = []
for header, payload in fragments:
uid = int(header.uid)
total_fragments = int(header.total_fragments)
fragment_index = int(header.fragment_index)
if total_fragments <= 0 or fragment_index >= total_fragments:
if self.logger is not None:
self.logger.debug(f"Ignoring invalid jumbo fragment index {fragment_index}/{total_fragments} for uid {uid}")
self._entries.pop(uid, None)
continue
packet_antid = csi.csistream_uid_to_antid(uid)
entry = self._entries.get(uid)
if entry is None or entry["total_fragments"] != total_fragments:
entry = {
"timestamp": now,
"antid": packet_antid,
"total_fragments": total_fragments,
"parts": {},
}
self._entries[uid] = entry
elif entry["antid"] != packet_antid:
if self.logger is not None:
self.logger.warning(f"Received jumbo fragments with inconsistent UID-derived antid for uid {uid}")
self._entries.pop(uid, None)
continue
entry["timestamp"] = now
entry["parts"][fragment_index] = bytes(payload)
if len(entry["parts"]) != entry["total_fragments"]:
continue
if any(index not in entry["parts"] for index in range(entry["total_fragments"])):
continue
completed_packets.append((entry["antid"], b"".join(entry["parts"][index] for index in range(entry["total_fragments"]))))
self._entries.pop(uid, None)
return completed_packets
# Magic bytes sent by the controller as the first WebSocket frame to confirm a valid CSI stream connection
CSISTREAM_MAGIC = bytes([0xE5, 0xA7, 0x60, 0x00])
# Only this major API version is supported
SUPPORTED_API_MAJOR = 3
[docs]
class Board(object):
_csistream_timeout = 5
# Defaults for controller configuration
DEFAULT_CSI_ACQUIRE_CONFIG = {
"enable": True,
"acquire_csi_legacy": True,
"acquire_csi_force_lltf": False,
"compress_csi": False,
"acquire_csi_ht20": True,
"acquire_csi_ht40": True,
"acquire_csi_vht": True,
"acquire_csi_su": True,
"acquire_csi_mu": True,
"acquire_csi_dcm": True,
"acquire_csi_beamformed": True,
"acquire_csi_he_stbc_mode": 2,
"val_scale_cfg": 2,
"dump_ack_en": True,
"lltf_8bit_mode": False,
}
DEFAULT_CFO_CORRECTION_CONFIG = {
"auto": True,
"value": 0,
}
DEFAULT_GAIN_SETTINGS = {
"fft_scale_enable": False,
"fft_scale_value": 0,
"rx_gain_enable": False,
"rx_gain_value": 0,
}
DEFAULT_WIFI_CHANNEL_OVERRIDES = {
"override_active": False,
"channel-primary": [1] * 8,
"channel-secondary": [0] * 8,
}
def _gain_value_for_controller(self, key: str, values):
if isinstance(values, (str, bytes)):
return values
if np.asarray(values).ndim == 0:
return values
return self.revision.sensor_values_to_antid_list(values, name=key)
def _gain_settings_for_controller(self, settings: dict) -> dict:
return {key: self._gain_value_for_controller(key, value) for key, value in settings.items()}
def __init__(self, host: str):
"""
Constructor for the Board class. Tries to connect to the ESPARGOS controller at the given host and fetches configuration information.
:param host: The IP address or hostname of the ESPARGOS controller
:raises TimeoutError: If the connection to the ESPARGOS controller times out
:raises EspargosUnexpectedResponseError: If the server at the given host is not an ESPARGOS controller or the request was invalid
"""
self.logger = logging.getLogger("pyespargos.board")
self.host = host
self._uart_client = None
self._transport_kind = "network"
if uart.is_uart_host(host):
self._transport_kind = "uart"
self._uart_client = uart.UARTClient(host)
self._uart_client.add_log_callback(self._handle_uart_log)
self._uart_client.connect()
try:
identification_raw = self._fetch("identify")
except TimeoutError:
self.logger.error(f"Could not connect to {self.host} to fetch identification information")
raise TimeoutError
if not "ESPARGOS-DENSIFLORUS" in identification_raw:
raise EspargosUnexpectedResponseError(f"Server at {self.host} does not look like an ESPARGOS controller. Check if the host is correct.")
try:
api_info_raw = self._fetch("api_info")
try:
api_info = json.loads(api_info_raw)
except (json.JSONDecodeError, TypeError):
raise EspargosUnexpectedResponseError(f"Server at {self.host} did not provide valid API information. Check if the host is correct and the server is running ESPARGOS firmware.")
except TimeoutError:
self.logger.error(f"Could not connect to {self.host} to fetch API information")
raise TimeoutError
except EspargosHTTPStatusError:
raise EspargosAPIVersionError(f"ESPARGOS controller at {self.host} did not provide API version information. " f"This version of pyespargos only supports API major version {SUPPORTED_API_MAJOR}. " "Please update the controller firmware.")
if "api-major" not in api_info or "api-minor" not in api_info:
raise EspargosUnexpectedResponseError(f"Server at {self.host} did not provide API version information in api_info response.")
api_major = api_info["api-major"]
api_minor = api_info.get("api-minor", 0)
if api_major != SUPPORTED_API_MAJOR:
raise EspargosAPIVersionError(
f"ESPARGOS controller at {self.host} runs API version {api_major}.{api_minor}, "
f"but this version of pyespargos only supports API major version {SUPPORTED_API_MAJOR}. " + ("Please update pyespargos." if api_major > SUPPORTED_API_MAJOR else "Please update the controller firmware.")
)
self.api_version = (api_major, api_minor)
self.revision = None
device = api_info.get("device", "")
revision_name = api_info.get("revision", "")
for rev in revisions.all_revisions:
if (device, revision_name) == rev.identification:
self.revision = rev
break
if self.revision is None:
raise EspargosUnexpectedResponseError(f"Unknown ESPARGOS revision: device={device!r}, revision={revision_name!r}")
self.netconf = json.loads(self._fetch("get_netconf"))
self.ip_info = json.loads(self._fetch("get_ip_info"))
self.wificonf = json.loads(self._fetch("get_wificonf"))
self.gain_settings = json.loads(self._fetch("get_gain_settings"))
self.csi_acquire_config = json.loads(self._fetch("get_csi_acquire_config"))
self.logger.info(f"Identified ESPARGOS at {self.ip_info['ip']} as {self.get_name()}")
self.csistream_connected = False
self.consumers = []
self._fragment_reassembler = FragmentReassembler(logger=self.logger)
[docs]
def get_name(self):
"""
Returns the hostname of the ESPARGOS controller as configured in the web interface.
:return: The hostname of the ESPARGOS controller
"""
return self.netconf["hostname"]
[docs]
def start(self, transports=None):
"""
Starts the CSI stream thread for the ESPARGOS controller. The thread will run indefinitely until the stop() method is called.
Supported transports:
- "udp": The controller will send CSI packets to a local UDP socket. This transport is lower-latency and more efficient (higher throughput), but may not work in all network environments.
- "websocket": The controller will send CSI packets over a WebSocket connection. This transport is more widely compatible but may have higher latency and overhead.
- "uart": The controller will stream CSI data over the local serial/UART link. This transport is only available for hosts specified as ``uart:<port>``.
:param transports: Optional list of transports to try, in order of preference. Valid values are "udp" and "websocket". If None (default), tries UDP first (if supported by API version) and then WebSocket.
:raises EspargosCsiStreamConnectionError: If neither UDP nor WebSocket CSI stream could be established
"""
if self._transport_kind == "uart":
transports = ["uart"] if transports is None else transports
elif transports is None:
transports = ["udp", "websocket"]
for transport in transports:
if transport == "uart":
uart_error = self._try_start_uart()
if uart_error is None:
return
self.logger.warning(f"UART CSI stream failed for {self.get_name()}: {uart_error}")
elif transport == "udp":
udp_error = self._try_start_udp()
if udp_error is None:
return
self.logger.warning(f"UDP CSI stream failed for {self.get_name()}: {udp_error}")
elif transport == "websocket":
ws_error = self._try_start_websocket()
if ws_error is None:
return
self.logger.warning(f"WebSocket CSI stream failed for {self.get_name()}: {ws_error}")
else:
self.logger.error(f"Unknown transport {transport} specified for {self.get_name()}, skipping")
raise EspargosCsiStreamConnectionError(f"Could not establish CSI stream to {self.host} via any of the enabled transports, tried transports: {transports}")
def _try_start_uart(self) -> str | None:
if self._uart_client is None:
return f"Host {self.host!r} is not a UART host"
self.logger.info(f"Trying UART CSI stream for {self.get_name()}")
def _callback(payload: bytes):
self._csistream_handle_message(payload)
self._uart_csi_callback = _callback
self._uart_client.add_csi_callback(self._uart_csi_callback)
try:
self._uart_client.enable_csi_stream()
except Exception as e:
self._uart_client.remove_csi_callback(self._uart_csi_callback)
return f"Could not enable UART CSI stream: {e}"
self._csistream_transport = "uart"
self.csistream_connected = True
self.logger.info(f"Started UART CSI stream for {self.get_name()} on {self.host}")
return None
def _try_start_udp(self) -> str | None:
"""
Try to start the CSI stream via UDP.
Returns None on success, or an error message string on failure.
"""
self.logger.info(f"Trying UDP CSI stream for {self.get_name()}")
# Resolve remote endpoint first so we can create a socket with the right family
try:
host_is_bracketed_ipv6 = self.host.startswith("[") and self.host.endswith("]")
udp_host = self.host[1:-1] if host_is_bracketed_ipv6 else self.host
host_for_ipv6_check = udp_host.split("%", 1)[0]
try:
socket.inet_pton(socket.AF_INET6, host_for_ipv6_check)
host_is_ipv6_literal = True
except OSError:
host_is_ipv6_literal = False
preferred_family = socket.AF_INET6 if (host_is_bracketed_ipv6 or host_is_ipv6_literal) else socket.AF_INET
udp_info = socket.getaddrinfo(udp_host, CSISTREAM_CONTROLLER_SRC_PORT, preferred_family, socket.SOCK_DGRAM)
if len(udp_info) == 0:
return f"Could not resolve UDP endpoint for host {self.host}"
udp_family, udp_socktype, udp_proto, _, udp_remote_addr = udp_info[0]
except OSError as e:
return f"Could not resolve UDP endpoint for host {self.host}: {e}"
# Open a local UDP socket on an ephemeral port
try:
udp_sock = socket.socket(udp_family, udp_socktype, udp_proto)
if udp_family == socket.AF_INET6:
udp_sock.bind(("::", 0, 0, 0))
else:
udp_sock.bind(("", 0))
udp_sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 16 * 1024 * 1024)
local_port = udp_sock.getsockname()[1]
except OSError as e:
return f"Could not create UDP socket: {e}"
# Send an empty packet to the controller's source port to punch a hole
# in the Windows firewall so that incoming UDP packets are allowed through
try:
udp_sock.sendto(b"", udp_remote_addr)
except OSError as e:
self.logger.warning(f"Could not send firewall-punch packet: {e}")
# Tell the server to start streaming to us via UDP
try:
res = self._fetch("csi_udp", json.dumps({"enable": True, "port": local_port}))
if res != "ok":
udp_sock.close()
return f"Server rejected UDP stream request: {res}"
except Exception as e:
udp_sock.close()
return f"HTTP request to enable UDP stream failed: {e}"
# Wait for the magic packet on the UDP socket
udp_sock.settimeout(3)
try:
data, addr = udp_sock.recvfrom(1024)
except socket.timeout:
udp_sock.close()
self._disable_udp_stream()
return "Timeout waiting for UDP magic packet"
except OSError as e:
udp_sock.close()
self._disable_udp_stream()
return f"Error receiving UDP magic packet: {e}"
if data != CSISTREAM_MAGIC:
udp_sock.close()
self._disable_udp_stream()
return f"Invalid UDP magic packet: expected {CSISTREAM_MAGIC.hex()}, got {data.hex()}"
# UDP stream established successfully
self._udp_sock = udp_sock
self._udp_remote_addr = udp_remote_addr
self._csistream_transport = "udp"
self.csistream_connected = True
self.csistream_thread = threading.Thread(target=self._csistream_loop_udp)
self.csistream_thread.start()
# Start keepalive thread that periodically sends empty packets to punch through the firewall
self._udp_keepalive_stop = threading.Event()
self._udp_keepalive_thread = threading.Thread(target=self._udp_keepalive_loop, daemon=True)
self._udp_keepalive_thread.start()
self.logger.info(f"Started UDP CSI stream for {self.get_name()} on local port {local_port}")
return None
def _try_start_websocket(self) -> str | None:
"""
Try to start the CSI stream via WebSocket.
Returns None on success, or an error message string on failure.
"""
self.logger.info(f"Trying WebSocket CSI stream for {self.get_name()}")
self._csistream_magic_event = threading.Event()
self._csistream_error = None
self._csistream_transport = "websocket"
self.csistream_thread = threading.Thread(target=self._csistream_loop_websocket)
self.csistream_thread.start()
# Only API version major 1 or greater sends magic packet
if not self._csistream_magic_event.wait(timeout=3):
self.csistream_connected = False
self.csistream_thread.join()
return "Did not receive WebSocket magic packet within 3 seconds"
if self._csistream_error is not None:
self.csistream_connected = False
self.csistream_thread.join()
return str(self._csistream_error)
self.logger.info(f"Started WebSocket CSI stream for {self.get_name()}")
return None
def _disable_udp_stream(self):
"""Tell the server to stop the UDP stream (best-effort)."""
try:
self._fetch("csi_udp", json.dumps({"enable": False}))
except Exception:
pass
[docs]
def stop(self):
"""
Stops the CSI stream thread for the ESPARGOS controller. The thread will stop after the current packet has been processed, or after a short timeout.
"""
if self.csistream_connected:
self.csistream_connected = False
if hasattr(self, "csistream_thread"):
self.csistream_thread.join()
if getattr(self, "_csistream_transport", None) == "udp":
if hasattr(self, "_udp_keepalive_stop"):
self._udp_keepalive_stop.set()
self._udp_keepalive_thread.join()
if hasattr(self, "_udp_sock"):
self._udp_sock.close()
self._disable_udp_stream()
elif getattr(self, "_csistream_transport", None) == "uart":
if hasattr(self, "_uart_csi_callback"):
self._uart_client.remove_csi_callback(self._uart_csi_callback)
if self._uart_client is not None:
self._uart_client.disable_csi_stream()
self.logger.info(f"Stopped CSI stream for {self.get_name()}")
[docs]
def close(self):
"""
Close transport resources associated with this board.
For UART-backed boards, this releases the serial port lock. Calling this on
network-backed boards is harmless.
"""
self.stop()
if self._uart_client is not None:
self._uart_client.close()
[docs]
def set_rfswitch(self, state: csi.rfswitch_state_t):
"""
Sets the RF switch state on the ESPARGOS controller for reception mode.
:param state: The RF switch state to set, must be one of :class:`csi.rfswitch_state_t`
:raises EspargosUnexpectedResponseError: If the server at the given host is not an ESPARGOS controller or the request was invalid
"""
res = self._fetch("set_rfswitch", str(int(state)))
if res != "ok":
self.logger.error(f"Invalid response: {res}")
raise EspargosUnexpectedResponseError
[docs]
def get_rfswitch(self) -> csi.rfswitch_state_t:
"""
Fetches the current RF switch state from the ESPARGOS controller.
:return: The current RF switch state as one of :class:`csi.rfswitch_state_t`
:raises EspargosUnexpectedResponseError: If the server at the given host is not an ESPARGOS controller or the request was invalid
"""
res = self._fetch("get_rfswitch")
try:
state_int = int(res)
# Check if valid enum value
if state_int not in [e.value for e in csi.rfswitch_state_t]:
raise EspargosUnexpectedResponseError("get_rfswitch returned invalid enum value")
return csi.rfswitch_state_t(state_int)
except (ValueError, KeyError):
self.logger.error(f"Invalid response: {res}")
raise EspargosUnexpectedResponseError
[docs]
def set_mac_filter(self, mac_filter: dict):
"""
Tell ESPARGOS board to only receive packets from transmitters with this sender MAC.
mac_filter is a dict with the following format::
{
"enable": true|false,
"mac": "00:11:22:33:44:55",
"mac_mask": "ff:ff:ff:ff:ff:ff"
}
The "enable" field toggles MAC filtering. When enabled, only packets from transmitters
whose MAC address matches the given "mac" (applying the "mac_mask") will be received.
"mac_mask" is a bitmask applied to both the configured MAC and the sender MAC before comparison.
Only provided fields will be changed; others will remain as previously configured.
:param mac_filter: MAC filter configuration dict
:raises EspargosUnexpectedResponseError: If the server at the given host is not an ESPARGOS controller or the request was invalid
"""
self._post_json_ok("set_mac_filter", mac_filter)
[docs]
def get_mac_filter(self) -> dict:
"""
Fetches the current MAC filter configuration from the ESPARGOS controller.
The returned JSON/dict matches what is configured via :meth:`set_mac_filter` / :meth:`clear_mac_filter`.
Format::
{
"enable": true|false,
"mac": "00:11:22:33:44:55",
"mac_mask": "ff:ff:ff:ff:ff:ff"
}
:return: MAC filter configuration dict
:raises EspargosUnexpectedResponseError: If the server at the given host is not an ESPARGOS controller or the request was invalid
"""
return self._get_json("get_mac_filter")
[docs]
def clear_mac_filter(self):
"""
Tell ESPARGOS board to receive packets from all transmitters.
:raises EspargosUnexpectedResponseError: If the server at the given host is not an ESPARGOS controller or the request was invalid
"""
self._post_json_ok("set_mac_filter", {"enable": False})
[docs]
def set_wificonf(self, wificonf: dict):
"""
Sets WiFi configuration on the ESPARGOS controller.
The controller expects a Python dict with fixed field names
using hyphenated keys. Expected format::
{
"calib-mode": 1,
"calib-source": 0,
"channel-primary": 13,
"channel-secondary": 2,
"country-code": "DE",
"calib-txpower": 34,
"calib-interval": 10
}
Field meanings / types (as used by the controller firmware):
- "calib-mode" (int): When to generate phase reference packets:
- 0: Never generate calibration packets
- 1: Generate calibration packets if receiver RF switch is in reference channel configuration
- 2: Always generate calibration packets
- "calib-source" (int): Configures REFIN / REFOUT ports of controller:
- 0: Use internal clock and phase reference for antennas
- 1: Output clock and phase reference on REFOUT port, antennas expect clock and calibration from REFIN port (master mode)
- 2: Antennas expect clock and calibration from REFIN port, do not output anything on REFOUT (slave mode)
- "channel-primary" (int): Primary WiFi channel.
- "channel-secondary" (int): Secondary channel selector (e.g. 0 = None, 1 = Above, 2 = Below).
- "country-code" (str): Two-letter country code (e.g. "DE").
- "calib-txpower" (int): TX power used for calibration packets (between 8 = 2dBm and 80 = 20dBm).
- "calib-interval" (int): Calibration interval (milliseconds).
:param wificonf: WiFi configuration dict
:raises EspargosUnexpectedResponseError: If the server at the given host is not an ESPARGOS controller or the request was invalid
"""
self._post_json_ok("set_wificonf", wificonf)
[docs]
def get_wificonf(self) -> dict:
"""
Fetches the current WiFi configuration from the ESPARGOS controller.
The returned JSON/dict uses the same hyphenated keys as accepted by :meth:`set_wificonf`,
e.g. contains fields like "channel-primary", "channel-secondary", "country-code", etc.
:return: WiFi configuration dict
:raises EspargosUnexpectedResponseError: If the server at the given host is not an ESPARGOS controller or the request was invalid
"""
return self._get_json("get_wificonf")
[docs]
def set_csi_acquire_config(self, config: dict):
"""
Sets the CSI acquisition configuration on the ESPARGOS controller.
The controller expects a JSON object (provided here as a Python dict) with integer
fields (use 0/1 for booleans). Field names are fixed.
Boolean toggles:
- enable: Enable to acquire CSI.
- acquire_csi_legacy: Enable to acquire L-LTF when receiving a 11g PPDU.
- acquire_csi_force_lltf: Force receiver to acquire L-LTF, regardless of PPDU type.
- compress_csi: Transform CSI to a time-domain CIR before transport.
- acquire_csi_ht20: Enable to acquire HT-LTF when receiving an HT20 PPDU.
- acquire_csi_ht40: Enable to acquire HT-LTF when receiving an HT40 PPDU.
- acquire_csi_vht: Present in the HTTP API; semantics depend on firmware build / PHY mode support.
- acquire_csi_su: Enable to acquire HE-LTF when receiving an HE20 SU PPDU.
- acquire_csi_mu: Enable to acquire HE-LTF when receiving an HE20 MU PPDU.
- acquire_csi_dcm: Enable to acquire HE-LTF when receiving an HE20 DCM applied PPDU.
- acquire_csi_beamformed: Enable to acquire HE-LTF when receiving an HE20 Beamformed applied PPDU.
- dump_ack_en: Enable to dump 802.11 ACK frame, default disabled.
- lltf_8bit_mode: Report L-LTF CSI as 8-bit values for every subcarrier instead of sparse 12-bit values.
Integer / enum fields:
- acquire_csi_he_stbc_mode: When receiving an STBC applied HE PPDU:
0 = acquire the complete HE-LTF1
1 = acquire the complete HE-LTF2
2 = sample evenly among the HE-LTF1 and HE-LTF2.
- val_scale_cfg: Value 0-8.
Example payload::
{
"enable": true,
"acquire_csi_legacy": true,
"acquire_csi_force_lltf": false,
"compress_csi": false,
"acquire_csi_ht20": true,
"acquire_csi_ht40": true,
"acquire_csi_vht": true,
"acquire_csi_su": true,
"acquire_csi_mu": true,
"acquire_csi_dcm": true,
"acquire_csi_beamformed": true,
"acquire_csi_he_stbc_mode": 2,
"val_scale_cfg": 2,
"dump_ack_en": true,
"lltf_8bit_mode": false
}
:param config: CSI acquisition configuration dict (will be JSON-encoded and POSTed to /set_csi_acquire_config)
:raises EspargosUnexpectedResponseError: If the server at the given host is not an ESPARGOS controller or the request was invalid
"""
payload = dict(config)
if "lltf_8bit_mode" in payload and "lltf_bit_mode" not in payload:
payload["lltf_bit_mode"] = payload["lltf_8bit_mode"]
self._post_json_ok("set_csi_acquire_config", payload)
[docs]
def get_csi_acquire_config(self) -> dict:
"""
Fetches the current CSI acquisition configuration from the ESPARGOS controller.
:return: CSI acquisition configuration dict
:raises EspargosUnexpectedResponseError: If the server at the given host is not an ESPARGOS controller or the request was invalid
"""
config = self._get_json("get_csi_acquire_config")
if "lltf_8bit_mode" not in config and "lltf_bit_mode" in config:
config["lltf_8bit_mode"] = config["lltf_bit_mode"]
return config
[docs]
def set_cfo_correction(self, auto: bool, value: int = 0):
"""
Configures receiver CFO correction on the ESPARGOS controller.
When ``auto`` is false, the receiver frequency-offset estimate is forced to
``value`` (signed 13-bit NRXFOE reg_foe_force field, -4096..4095). A value
of zero can reduce packet-to-packet phase noise in radar mode when TX and
RX share the same reference clock.
"""
self._post_json_ok("set_cfo_correction", {"auto": bool(auto), "value": int(value)})
[docs]
def get_cfo_correction(self) -> dict:
"""
Fetches receiver CFO correction configuration from the ESPARGOS controller.
"""
return self._get_json("get_cfo_correction")
[docs]
def set_gain_settings(self, settings: dict):
"""
Sets the gain settings on the ESPARGOS controller.
The gain settings are provided as a JSON object (here as a Python dict) with fixed field names.
Values may be scalars, or arrays/lists with shape ``(2, 4)`` to configure
each sensor individually in board ``(row, column)`` order.
- fft_scale_enable (bool): Enable manual FFT scaling (false = automatic/firmware default).
- fft_scale_value (int): FFT scale value (meaning/range depends on firmware; commonly 0 when disabled).
- rx_gain_enable (bool): Enable manual RX gain (false = automatic/firmware default).
- rx_gain_value (int): RX gain table index, 0..76 (commonly 0 when disabled).
- expert_mode_enable (bool): Enable raw expert gain-table entry override.
- expert_mode_raw (str): Raw gain-table entry as hexadecimal string.
Example payload::
{
"fft_scale_enable": false,
"fft_scale_value": 0,
"rx_gain_enable": false,
"rx_gain_value": 0
}
:param settings: Gain settings dict (will be JSON-encoded and POSTed to /set_gain_settings)
:raises EspargosUnexpectedResponseError: If the server at the given host is not an ESPARGOS controller or the request was invalid
"""
self._post_json_ok("set_gain_settings", self._gain_settings_for_controller(settings))
[docs]
def get_gain_settings(self) -> dict:
"""
Fetches the current gain settings from the ESPARGOS controller.
:return: Gain settings dict
:raises EspargosUnexpectedResponseError: If the server at the given host is not an ESPARGOS controller or the request was invalid
"""
return self._get_json("get_gain_settings")
[docs]
def set_wifi_channel_overrides(self, settings: dict):
"""
Sets per-sensor WiFi channel overrides on the ESPARGOS controller.
The payload mirrors the controller's ``/set_wifi_channel_overrides`` API:
- ``override_active`` (bool): Enable per-sensor channel overrides.
- ``channel-primary`` (list[int], length 8): Primary WiFi channel for each sensor.
- ``channel-secondary`` (list[int], length 8): Secondary channel selector for each sensor
(0 = none, 1 = above, 2 = below).
Passing ``{"override_active": False}`` disables the overrides.
:param settings: Per-sensor WiFi channel override settings dict
:raises EspargosUnexpectedResponseError: If the server at the given host is not an ESPARGOS controller or the request was invalid
"""
self._post_json_ok("set_wifi_channel_overrides", settings)
[docs]
def get_wifi_channel_overrides(self) -> dict:
"""
Fetches the current per-sensor WiFi channel overrides from the ESPARGOS controller.
:return: Per-sensor WiFi channel override settings dict
:raises EspargosUnexpectedResponseError: If the server at the given host is not an ESPARGOS controller or the request was invalid
"""
return self._get_json("get_wifi_channel_overrides")
[docs]
def set_radar_config(self, config: dict):
"""
Sets the low-level radar TX configuration on the ESPARGOS controller.
The payload mirrors the controller's ``/set_tx_control`` API. Supported fields are:
- ``rfswitch_state`` (int)
- ``active_by_antid`` (list[bool], length 8)
- ``start_by_antid`` (list[int], length 8)
- ``period_by_antid`` (list[int], length 8)
- ``mac_by_antid`` (list[str], length 8, MAC addresses like ``"72:61:64:61:72:00"``)
- ``tx_power`` (int)
- ``tx_phymode`` (int)
- ``tx_rate`` (int)
Only provided fields are changed; others remain unchanged on the controller.
:param config: Radar TX configuration dict
:raises EspargosUnexpectedResponseError: If the server at the given host is not an ESPARGOS controller or the request was invalid
"""
self._post_json_ok("set_tx_control", config)
[docs]
def get_radar_config(self) -> dict:
"""
Fetches the current low-level radar TX configuration from the ESPARGOS controller.
The returned dict mirrors the controller's ``/get_tx_control`` response and contains fields such as
``rfswitch_state``, ``active_by_antid``, ``start_by_antid``, ``period_by_antid``, ``mac_by_antid``,
``tx_power``, ``tx_phymode``, and ``tx_rate``.
:return: Radar TX configuration dict
:raises EspargosUnexpectedResponseError: If the server at the given host is not an ESPARGOS controller or the request was invalid
"""
return self._get_json("get_tx_control")
[docs]
def reboot(self):
"""
Trigger a controller reboot.
The controller responds with ``"ok"`` and then reboots shortly after
sending the reply.
:raises EspargosUnexpectedResponseError: If the server at the given host
is not an ESPARGOS controller or the request was invalid
"""
res = self._fetch("reboot")
if res != "ok":
self.logger.error(f"Invalid response: {res}")
raise EspargosUnexpectedResponseError(str(res))
[docs]
def add_consumer(self, clist: list, cv: threading.Condition, *args):
"""
Adds a consumer to the CSI stream.
A consumer is defined by a list, a condition variable and additional arguments.
When a CSI packet is received, it will be appended to the list, and the condition variable will be notified.
:param clist: A list to which the CSI packet will be appended. The entry added to the list is a tuple :code:`(esp_num, serialized_csi, *args)`,
where esp_num is the number of the sensor in the array, serialized_csi is the raw CSI packet and :code:`*args` are the additional arguments.
:param cv: A condition variable that will be notified when a CSI packet is received
:param args: Additional arguments that will be added to the list along with the CSI packet
"""
self.consumers.append((clist, cv, args))
def _csistream_handle_message(self, message):
try:
jumbo = csi.parse_csistream_jumbo_message(message)
fragments = list(csi.iter_csistream_fragments(jumbo))
except ValueError as exc:
self.logger.debug(f"Ignoring malformed CSI stream message: {exc}")
return
completed_packets = self._fragment_reassembler.push(fragments)
for packet_antid, packet_payload in completed_packets:
try:
serialized_csi = csi.deserialize_packet_buffer(self.revision, packet_payload)
except (AssertionError, ValueError):
self.logger.debug("Ignoring CSI payload with unexpected logical type header")
continue
serialized_csi.antid = packet_antid
packet_esp_num = self.revision.antid_to_esp_num[packet_antid]
for clist, cv, args in self.consumers:
with cv:
clist.append((packet_esp_num, serialized_csi, *args))
cv.notify()
def _udp_keepalive_loop(self):
"""Periodically send empty UDP packets to the controller to keep the firewall hole open."""
while not self._udp_keepalive_stop.wait(timeout=1.0):
try:
self._udp_sock.sendto(b"", self._udp_remote_addr)
except OSError:
break
def _csistream_loop_udp(self):
self._udp_sock.settimeout(0.2)
timeout_total = 0
while self.csistream_connected:
try:
data, addr = self._udp_sock.recvfrom(65535)
timeout_total = 0
self._csistream_handle_message(data)
except socket.timeout:
timeout_total += 0.2
except OSError as e:
self.logger.error(f"Board {self.host} has error in UDP socket: {e}")
self.csistream_connected = False
break
if timeout_total > self._csistream_timeout:
self.logger.warning("UDP timeout, disconnecting")
self.csistream_connected = False
def _csistream_loop_websocket(self):
try:
ws = websockets.sync.client.connect("ws://" + self.host + "/csi", close_timeout=0.5)
except Exception as e:
self._csistream_error = EspargosCsiStreamConnectionError(f"Could not connect to CSI stream WebSocket on {self.host}: {e}")
self._csistream_magic_event.set()
return
with ws as websocket:
# Do not wait for magic packet, no need for that when using websockets
self.csistream_connected = True
self._csistream_magic_event.set()
timeout_total = 0
timeout_once = 0.2
while self.csistream_connected:
try:
message = websocket.recv(timeout_once)
if message == CSISTREAM_MAGIC:
# Ignore magic packet, only relevant for UDP transport
continue
timeout_total = 0
self._csistream_handle_message(message)
except TimeoutError:
timeout_total = timeout_total + timeout_once
except Exception as e:
self.logger.error(f"Board {self.host} has error in websocket: {e}")
self.csistream_connected = False
break
if timeout_total > self._csistream_timeout:
self.logger.warning("WebSocket timeout, disconnecting")
self.csistream_connected = False
def _fetch(self, path, data=None):
method = "GET" if data is None else "POST"
if self._uart_client is not None:
response = self._uart_client.request(method, path, data, timeout=5)
if response.status != 200:
raise EspargosHTTPStatusError(response.status, path, response.body_text())
return response.body_text()
conn = http.client.HTTPConnection(self.host, timeout=5)
conn.request(method, "/" + path, data)
try:
res = conn.getresponse()
except TimeoutError:
self.logger.error(f"Timeout in HTTP request for {self.host}/{path}")
raise TimeoutError
if res.status != 200:
body = res.read().decode("utf-8", errors="replace")
raise EspargosHTTPStatusError(res.status, path, body)
return res.read().decode("utf-8")
def _post_json_ok(self, path: str, payload: dict):
"""
POST JSON payload to `/<path>` and require literal response 'ok'.
"""
res = self._fetch(path, json.dumps(payload))
if res != "ok":
self.logger.error(f"Invalid response: {res}")
raise EspargosUnexpectedResponseError(str(res))
def _get_json(self, path: str) -> dict:
"""
GET `/<path>` and parse response as JSON.
"""
res = self._fetch(path)
try:
return json.loads(res)
except json.JSONDecodeError:
self.logger.error(f"Invalid response: {res}")
raise EspargosUnexpectedResponseError(str(res))
def _handle_uart_log(self, message: str):
self.logger.info(f"[device] {message.rstrip()}")