Source code for espargos.board

#!/usr/bin/env python3

import websockets.sync.client
import http.client
import threading
import logging
import ctypes
import json
from . import csi


[docs] class EspargosHTTPStatusError(Exception): "Raised when the ESPARGOS HTTP API returns an invalid status code" pass
[docs] class EspargosUnexpectedResponseError(Exception): "Raised when the server (ESPARGOS controller) provides unexpected response. Is the server really ESPARGOS?" pass
[docs] class Board(object): _csistream_timeout = 5 def __init__(self, host): """ Conststructor for the Board class. Tries to connect to the ESPARGOS controller at the given host and fetches configuration information. :param host: The IP address or hostname of the ESPARGOS controller :raises TimeoutError: If the connection to the ESPARGOS controller times out :raises EspargosUnexpectedResponseError: If the server at the given host is not an ESPARGOS controller """ self.logger = logging.getLogger("pyespargos.board") self.host = host try: identification = self._fetch("identify") except TimeoutError: self.logger.error(f"Could not connect to {self.host}") raise TimeoutError if identification != "ESPARGOS": raise EspargosUnexpectedResponseError self.netconf = json.loads(self._fetch("get_netconf")) self.ip_info = json.loads(self._fetch("get_ip_info")) self.wificonf = json.loads(self._fetch("get_wificonf")) self.logger.info( f"Identified ESPARGOS at {self.ip_info['ip']} as {self.get_name()}") self.csistream_connected = True self.consumers = []
[docs] def get_name(self): """ Returns the hostname of the ESPARGOS controller as configured in the web interface. :return: The hostname of the ESPARGOS controller """ return self.netconf["hostname"]
[docs] def start(self): """ Starts the CSI stream thread for the ESPARGOS controller. The thread will run indefinitely until the stop() method is called. """ self.csistream_thread = threading.Thread(target=self._csistream_loop) self.csistream_thread.start() self.logger.info(f"Started CSI stream for {self.get_name()}")
[docs] def stop(self): """ Stops the CSI stream thread for the ESPARGOS controller. The thread will stop after the current packet has been processed, or after a short timeout. """ if self.csistream_connected: self.csistream_connected = False self.csistream_thread.join() self.logger.info(f"Stopped CSI stream for {self.get_name()}")
[docs] def set_calib(self, calibrate): """ Enables or disables calibration mode on the ESPARGOS controller. :param calibrate: True to enable calibration mode, False to disable it :raises EspargosUnexpectedResponseError: If the server at the given host is not an ESPARGOS controller """ res = self._fetch("set_calib", "1" if calibrate else "0") if res != "ok": self.logger.error(f"Invalid response: {res}") raise EspargosUnexpectedResponseError
[docs] def add_consumer(self, clist, cv, *args): """ Adds a consumer to the CSI stream. A consumer is defined by a list, a condition variable and additional arguments. When a CSI packet is received, it will be appended to the list, and the condition variable will be notified. :param clist: A list to which the CSI packet will be appended. The entry added to the list is a tuple :code:`(esp_num, serialized_csi, *args)`, where esp_num is the number of the sensor in the array, serialized_csi is the raw CSI packet and :code:`*args` are the additional arguments. :param cv: A condition variable that will be notified when a CSI packet is received :param args: Additional arguments that will be added to the list along with the CSI packet """ self.consumers.append((clist, cv, args))
def _csistream_handle_message(self, message): pktsize = ctypes.sizeof(csi.csistream_pkt_t) assert(len(message) % pktsize == 0) for i in range(0, len(message), pktsize): packet = csi.csistream_pkt_t(message[i:i + pktsize]) serialized_csi = csi.serialized_csi_t(packet.buf) if serialized_csi.type_header == csi._ESPARGOS_SPI_TYPE_HEADER_CSI: for clist, cv, args in self.consumers: with cv: clist.append((packet.esp_num, serialized_csi, *args)) cv.notify() def _csistream_loop(self): with websockets.sync.client.connect("ws://" + self.host + "/csi", close_timeout = 0.5) as websocket: self.csistream_connected = True timeout_total = 0 timeout_once = 0.2 while self.csistream_connected: try: message = websocket.recv(timeout_once) timeout_total = 0 self._csistream_handle_message(message) except TimeoutError: timeout_total = timeout_total + timeout_once if timeout_total > self._csistream_timeout: self.logger.warn("Websockets timeout, disconnecting") self.csistream_connected = False def _fetch(self, path, data=None): method = "GET" if data is None else "POST" conn = http.client.HTTPConnection(self.host, timeout=5) conn.request(method, "/" + path, data) try: res = conn.getresponse() except TimeoutError: self.logger.error(f"Timeout in HTTP request for {self.host}/{path}") raise TimeoutError if res.status != 200: raise EspargosHTTPStatusError return res.read().decode("utf-8")