Source code for espargos.uart

#!/usr/bin/env python

import errno
import logging
import os
import queue
import struct
import threading
import time
import termios

import serial
import serial.tools.list_ports

from . import csi

UART_PROTOCOL_VERSION = 1

FRAME_TYPE_HELLO_REQ = 0x01
FRAME_TYPE_HELLO_RESP = 0x02
FRAME_TYPE_RPC_REQ = 0x10
FRAME_TYPE_RPC_RESP = 0x11
FRAME_TYPE_CSI_DATA = 0x20
FRAME_TYPE_STREAM_CTRL = 0x21
FRAME_TYPE_LOG = 0x30
UART_ACTIVATION_TOKEN = b"ESPARGOS-UART-MODE\n"

STREAM_ID_CSI = 1
STREAM_ID_TRANSPORT = 0

RPC_METHOD_GET = 0
RPC_METHOD_POST = 1

DEFAULT_UART_BAUDRATE = 3000000
FT231XQ_UART_BAUDRATE = 2000000
CP2102C_UART_BAUDRATE = 3000000
DEFAULT_BOOT_BAUDRATE = 115200
DEFAULT_TIMEOUT = 5.0
DEFAULT_READ_TIMEOUT = 0.001
DEFAULT_LATENCY_TIMER_MS = 1
DEFAULT_ACTIVATION_RETRY_INTERVAL = 0.05
DEFAULT_KEEPALIVE_INTERVAL = 1.0
DEFAULT_MODEM_IDLE_DTR = False
DEFAULT_MODEM_IDLE_RTS = False
USB_UART_BAUDRATES = {
    0x6015: FT231XQ_UART_BAUDRATE,  # FTDI FT231XQ USB UART
    0xEA64: CP2102C_UART_BAUDRATE,  # SiLabs CP2102C USB to UART Bridge Controller
}
NO_RESET_MODEM_STATES = {
    0x6015: (True, True),  # FTDI FT231X USB UART
    0xEA64: (True, True),  # SiLabs CP2102C USB to UART Bridge Controller
}


[docs] class UARTProtocolError(Exception): "Raised when the ESPARGOS UART protocol encounters malformed frames or unexpected responses." pass
[docs] class UARTTimeoutError(TimeoutError): "Raised when a UART request times out." pass
[docs] def is_uart_host(host: str) -> bool: return host.startswith("uart:")
[docs] def parse_uart_host(host: str) -> tuple[str, dict]: if not is_uart_host(host): raise ValueError(f"Unsupported UART host specifier: {host!r}") spec = host[len("uart:") :] if not spec: raise ValueError("UART host specifier is empty") device = spec params = {} if "@" in device: device, baud = device.rsplit("@", 1) if not baud or not baud.isdigit(): raise ValueError(f"Invalid UART baudrate in host specifier: {host!r}") params["baud"] = baud if not device: raise ValueError("UART device path is empty") return device, params
[docs] def cobs_encode(data: bytes) -> bytes: if not data: return b"\x01" out = bytearray() code_index = 0 out.append(0) code = 1 for b in data: if b == 0: out[code_index] = code code_index = len(out) out.append(0) code = 1 else: out.append(b) code += 1 if code == 0xFF: out[code_index] = code code_index = len(out) out.append(0) code = 1 out[code_index] = code return bytes(out)
[docs] def cobs_decode(data: bytes) -> bytes: if not data: raise UARTProtocolError("Received empty COBS frame") out = bytearray() idx = 0 length = len(data) while idx < length: code = data[idx] if code == 0: raise UARTProtocolError("COBS frame contains zero byte") idx += 1 end = idx + code - 1 if end > length: raise UARTProtocolError("COBS frame is truncated") out.extend(data[idx:end]) idx = end if code != 0xFF and idx < length: out.append(0) return bytes(out)
def _build_frame(frame_type: int, request_id: int, payload: bytes) -> bytes: header = struct.pack("<BBHI", UART_PROTOCOL_VERSION, frame_type, request_id & 0xFFFF, len(payload)) return cobs_encode(header + payload) + b"\x00" def _parse_frame(frame: bytes) -> tuple[int, int, bytes]: decoded = cobs_decode(frame) if len(decoded) < 8: raise UARTProtocolError("Frame too short") version, frame_type, request_id, payload_len = struct.unpack("<BBHI", decoded[:8]) if version != UART_PROTOCOL_VERSION: raise UARTProtocolError(f"Unsupported UART protocol version {version}") payload = decoded[8:] if len(payload) != payload_len: raise UARTProtocolError("Frame payload length mismatch") return frame_type, request_id, payload
[docs] class UARTControlResponse: def __init__(self, status: int, content_type: str, body: bytes): self.status = status self.content_type = content_type self.body = body
[docs] def body_text(self) -> str: return self.body.decode("utf-8")
[docs] class UARTClient: def __init__(self, host: str, *, timeout: float = DEFAULT_TIMEOUT): self.logger = logging.getLogger("pyespargos.uart") self.timeout = timeout self.device, params = parse_uart_host(host) self.boot_baudrate = int(params.get("boot_baud", DEFAULT_BOOT_BAUDRATE)) self.baudrate = int(params.get("baud", self._detect_default_baudrate())) self.read_timeout = float(params.get("read_timeout", DEFAULT_READ_TIMEOUT)) self.latency_timer_ms = int(params.get("latency_ms", DEFAULT_LATENCY_TIMER_MS)) self.activation_retry_interval = float(params.get("activation_interval", DEFAULT_ACTIVATION_RETRY_INTERVAL)) self.keepalive_interval = float(params.get("keepalive_interval", DEFAULT_KEEPALIVE_INTERVAL)) self._serial = None self._reader_thread = None self._keepalive_thread = None self._stop_event = threading.Event() self._write_lock = threading.Lock() self._request_lock = threading.Lock() self._pending: dict[int, queue.Queue] = {} self._pending_lock = threading.Lock() self._reqid = 1 self._reqid_lock = threading.Lock() self._csi_callbacks = [] self._log_callbacks = [] self._rx_buffer = bytearray() self._connected = False self._reader_running = False self._latency_timer_restore_path = None self._latency_timer_restore_value = None self._modem_idle_state = self._detect_modem_idle_state()
[docs] def connect(self): if self._connected: return self._stop_event.clear() self._serial = self._open_serial(self.baudrate) try: self.hello(timeout=min(1.0, self.timeout)) except (UARTProtocolError, UARTTimeoutError, serial.SerialException): self._activate_transport_mode() self.logger.info(f"Connected to ESPARGOS UART on {self.device}") self._connected = True self._start_keepalive_thread()
def _activate_transport_mode(self) -> None: if self._serial is None: raise UARTProtocolError("UART is not connected") self._serial.baudrate = self.boot_baudrate self._serial.reset_input_buffer() self._serial.reset_output_buffer() self._serial.write(f"ESPARGOS-UART-MODE:{self.baudrate}\n".encode("ascii")) self._serial.flush() time.sleep(max(self.activation_retry_interval, 0.1)) self._serial.baudrate = self.baudrate time.sleep(0.1) self._serial.reset_input_buffer() self._serial.reset_output_buffer() self.hello()
[docs] def close(self): if self._serial is None: return self._stop_event.set() if self._keepalive_thread is not None: self._keepalive_thread.join(timeout=1.0) self._keepalive_thread = None try: self.disable_csi_stream() except Exception: pass try: self.disable_transport_mode() except Exception: pass if self._reader_thread is not None: self._reader_thread.join(timeout=1.0) self._reader_thread = None self._reader_running = False try: try: self._apply_modem_idle_state(self._serial) except Exception: pass self._serial.close() finally: self._restore_latency_timer() self._serial = None self._connected = False
[docs] def add_csi_callback(self, callback): self._csi_callbacks.append(callback)
[docs] def remove_csi_callback(self, callback): if callback in self._csi_callbacks: self._csi_callbacks.remove(callback)
[docs] def add_log_callback(self, callback): self._log_callbacks.append(callback)
[docs] def remove_log_callback(self, callback): if callback in self._log_callbacks: self._log_callbacks.remove(callback)
[docs] def hello(self, timeout: float | None = None) -> dict: payload = self._request_frame(FRAME_TYPE_HELLO_REQ, b"", timeout=timeout) if len(payload) == 0: return {} if len(payload) < 8: raise UARTProtocolError("HELLO response too short") device_len, revision_len, api_major, api_minor = struct.unpack("<HHHH", payload[:8]) expected = 8 + device_len + revision_len if len(payload) != expected: raise UARTProtocolError("HELLO response has invalid payload length") device = payload[8 : 8 + device_len].decode("utf-8") revision = payload[8 + device_len : expected].decode("utf-8") return { "device": device, "revision": revision, "api-major": api_major, "api-minor": api_minor, }
[docs] def request(self, method: str, path: str, body: bytes | str | None = None, timeout: float | None = None) -> UARTControlResponse: if body is None: body_bytes = b"" elif isinstance(body, bytes): body_bytes = body else: body_bytes = str(body).encode("utf-8") method_id = RPC_METHOD_GET if method.upper() == "GET" else RPC_METHOD_POST path_bytes = path.encode("utf-8") payload = struct.pack("<BHI", method_id, len(path_bytes), len(body_bytes)) + path_bytes + body_bytes response = self._request_frame(FRAME_TYPE_RPC_REQ, payload, timeout=timeout) if len(response) < 8: raise UARTProtocolError("RPC response too short") status, content_type_len, body_len = struct.unpack("<HHI", response[:8]) expected = 8 + content_type_len + body_len if len(response) != expected: raise UARTProtocolError("RPC response length mismatch") content_type = response[8 : 8 + content_type_len].decode("utf-8") body = response[8 + content_type_len : expected] return UARTControlResponse(status, content_type, body)
[docs] def enable_csi_stream(self): self._ensure_reader_thread() self._send_frame(FRAME_TYPE_STREAM_CTRL, 0, struct.pack("<BB", STREAM_ID_CSI, 1))
[docs] def disable_csi_stream(self): self._send_frame(FRAME_TYPE_STREAM_CTRL, 0, struct.pack("<BB", STREAM_ID_CSI, 0))
[docs] def disable_transport_mode(self): self._send_frame(FRAME_TYPE_STREAM_CTRL, 0, struct.pack("<BB", STREAM_ID_TRANSPORT, 0)) if self._serial is not None: self._serial.flush() time.sleep(0.05)
def _send_keepalive(self): self._send_frame(FRAME_TYPE_STREAM_CTRL, 0, struct.pack("<BB", STREAM_ID_TRANSPORT, 1)) def _request_frame(self, frame_type: int, payload: bytes, timeout: float | None = None) -> bytes: if self._reader_running: return self._request_frame_async(frame_type, payload, timeout) return self._request_frame_sync(frame_type, payload, timeout) def _request_frame_async(self, frame_type: int, payload: bytes, timeout: float | None = None) -> bytes: reqid = self._allocate_request_id() q = queue.Queue(maxsize=1) with self._pending_lock: self._pending[reqid] = q try: self._send_frame(frame_type, reqid, payload) try: response_type, response_payload = q.get(timeout=self.timeout if timeout is None else timeout) except queue.Empty as exc: raise UARTTimeoutError(f"Timed out waiting for UART response to request {reqid}") from exc expected_type = FRAME_TYPE_HELLO_RESP if frame_type == FRAME_TYPE_HELLO_REQ else FRAME_TYPE_RPC_RESP if response_type != expected_type: raise UARTProtocolError(f"Unexpected response type 0x{response_type:02x} for request {reqid}") return response_payload finally: with self._pending_lock: self._pending.pop(reqid, None) def _request_frame_sync(self, frame_type: int, payload: bytes, timeout: float | None = None) -> bytes: reqid = self._allocate_request_id() expected_type = FRAME_TYPE_HELLO_RESP if frame_type == FRAME_TYPE_HELLO_REQ else FRAME_TYPE_RPC_RESP timeout = self.timeout if timeout is None else timeout with self._request_lock: self._send_frame(frame_type, reqid, payload) end_time = time.monotonic() + timeout while True: remaining = end_time - time.monotonic() if remaining <= 0: raise UARTTimeoutError(f"Timed out waiting for UART response to request {reqid}") frame_type_rx, request_id_rx, payload_rx = self._read_one_frame(timeout=min(0.5, remaining)) if frame_type_rx is None: continue if request_id_rx == reqid and frame_type_rx == expected_type: return payload_rx self._handle_frame(frame_type_rx, request_id_rx, payload_rx) def _allocate_request_id(self) -> int: with self._reqid_lock: reqid = self._reqid self._reqid = 1 if self._reqid == 0xFFFF else self._reqid + 1 return reqid def _send_frame(self, frame_type: int, request_id: int, payload: bytes): if self._serial is None: raise UARTProtocolError("UART is not connected") data = _build_frame(frame_type, request_id, payload) with self._write_lock: self._serial.write(data) self._serial.flush() def _ensure_reader_thread(self): if self._reader_running: return self._stop_event.clear() self._reader_thread = threading.Thread(target=self._reader_loop, name=f"uart-reader-{self.device}", daemon=True) self._reader_thread.start() self._reader_running = True def _start_keepalive_thread(self): if self.keepalive_interval <= 0 or self._keepalive_thread is not None: return self._keepalive_thread = threading.Thread(target=self._keepalive_loop, name=f"uart-keepalive-{self.device}", daemon=True) self._keepalive_thread.start() def _keepalive_loop(self): while not self._stop_event.wait(self.keepalive_interval): try: self._send_keepalive() except Exception as exc: self.logger.debug(f"Stopping UART keepalive for {self.device}: {exc}") break def _open_serial(self, baudrate: int) -> serial.Serial: # Known ESPARGOS USB-UART adapters need specific modem-control levels # applied before opening the port, otherwise the standard ESP32 auto- # reset circuitry reboots the board as soon as the port is opened. ser = serial.Serial( port=None, baudrate=baudrate, timeout=self.read_timeout, exclusive=True, xonxoff=False, rtscts=False, dsrdtr=False, ) self._apply_modem_idle_state(ser) ser.port = self.device ser.open() self._configure_termios(ser) self._apply_modem_idle_state(ser) self._apply_low_latency_tuning() ser.reset_input_buffer() ser.reset_output_buffer() return ser def _configure_termios(self, ser: serial.Serial): if os.name != "posix": return try: attrs = termios.tcgetattr(ser.fileno()) except Exception as exc: self.logger.debug(f"Could not read termios state for {self.device}: {exc}") return attrs[2] |= termios.CLOCAL if hasattr(termios, "HUPCL"): attrs[2] &= ~termios.HUPCL try: termios.tcsetattr(ser.fileno(), termios.TCSANOW, attrs) except Exception as exc: self.logger.debug(f"Could not update termios state for {self.device}: {exc}") def _apply_modem_idle_state(self, ser: serial.Serial): ser.dtr, ser.rts = self._modem_idle_state def _matching_port_info(self): device_realpath = os.path.realpath(self.device) for portinfo in serial.tools.list_ports.comports(): if os.path.realpath(portinfo.device) != device_realpath: continue return portinfo return None def _detect_default_baudrate(self) -> int: portinfo = self._matching_port_info() if portinfo is not None and portinfo.pid in USB_UART_BAUDRATES: baudrate = USB_UART_BAUDRATES[portinfo.pid] self.logger.debug(f"Using {baudrate} baud for USB-UART adapter PID 0x{portinfo.pid:04x}") return baudrate return DEFAULT_UART_BAUDRATE def _detect_modem_idle_state(self) -> tuple[bool, bool]: portinfo = self._matching_port_info() if portinfo is not None: if portinfo.pid in NO_RESET_MODEM_STATES: return NO_RESET_MODEM_STATES[portinfo.pid] return DEFAULT_MODEM_IDLE_DTR, DEFAULT_MODEM_IDLE_RTS def _apply_low_latency_tuning(self): if os.name != "posix": return devname = os.path.basename(os.path.realpath(self.device)) latency_path = f"/sys/class/tty/{devname}/device/latency_timer" if not os.path.exists(latency_path): return try: current = open(latency_path, "r", encoding="ascii").read().strip() except OSError as exc: self.logger.debug(f"Could not read UART latency timer for {self.device}: {exc}") return if current == str(self.latency_timer_ms): return try: with open(latency_path, "w", encoding="ascii") as f: f.write(str(self.latency_timer_ms)) except OSError as exc: if exc.errno not in (errno.EACCES, errno.EROFS, errno.EPERM): self.logger.debug(f"Could not set UART latency timer for {self.device}: {exc}") return self._latency_timer_restore_path = latency_path self._latency_timer_restore_value = current self.logger.info(f"Set UART latency timer for {self.device} to {self.latency_timer_ms} ms") def _restore_latency_timer(self): if self._latency_timer_restore_path is None or self._latency_timer_restore_value is None: return try: with open(self._latency_timer_restore_path, "w", encoding="ascii") as f: f.write(self._latency_timer_restore_value) except OSError: pass finally: self._latency_timer_restore_path = None self._latency_timer_restore_value = None def _read_one_frame(self, timeout: float) -> tuple[int | None, int | None, bytes | None]: end_time = time.monotonic() + timeout while True: try: delimiter = self._rx_buffer.index(0) except ValueError: delimiter = -1 if delimiter >= 0: raw_frame = bytes(self._rx_buffer[:delimiter]) del self._rx_buffer[: delimiter + 1] if not raw_frame: continue try: return _parse_frame(raw_frame) except UARTProtocolError as exc: self.logger.debug(f"Ignoring invalid UART frame while resynchronizing: {exc}") continue remaining = end_time - time.monotonic() if remaining <= 0: return None, None, None chunk = self._serial.read(self._serial.in_waiting or 4096) if not chunk: continue self._rx_buffer.extend(chunk) def _reader_loop(self): while not self._stop_event.is_set(): try: frame_type, request_id, payload = self._read_one_frame(timeout=0.2) except serial.SerialException as exc: self.logger.error(f"UART read error on {self.device}: {exc}") break if frame_type is None: continue self._handle_frame(frame_type, request_id, payload) self._reader_running = False def _handle_frame(self, frame_type: int, request_id: int, payload: bytes): if frame_type in (FRAME_TYPE_HELLO_RESP, FRAME_TYPE_RPC_RESP): with self._pending_lock: q = self._pending.get(request_id) if q is not None: q.put((frame_type, payload)) return if frame_type == FRAME_TYPE_CSI_DATA: for callback in list(self._csi_callbacks): callback(payload) return if frame_type == FRAME_TYPE_LOG: text = payload.decode("utf-8", errors="replace") for callback in list(self._log_callbacks): callback(text) return self.logger.debug(f"Ignoring UART frame type 0x{frame_type:02x}")
[docs] def validate_csistream_payload(payload: bytes, revision) -> bool: del revision if len(payload) < 4: return False try: jumbo = csi.parse_csistream_jumbo_message(payload) for _header, _fragment in csi.iter_csistream_fragments(jumbo): pass except ValueError: return False return True