import numpy as np
import threading
import logging
import re
from . import csi
class CSIBacklog(object):
    """
    CSI backlog class. Stores CSI data in a ringbuffer for processing when needed.

    :param pool: CSI pool object to collect CSI data from
    :param enable_ht40: Enable storing CSI from HT40 frames (default: True)
    :param calibrate: Apply calibration to CSI data (default: True)
    :param size: Size of the ringbuffer (default: 100)
    :raises ValueError: If ``size`` is smaller than 1
    """

    def __init__(self, pool, enable_ht40=True, calibrate=True, size=100):
        # A ringbuffer without slots cannot store anything and would only fail
        # later, inside the CSI callback. Fail early instead.
        if size < 1:
            raise ValueError("CSI backlog size must be at least 1")

        self.logger = logging.getLogger("pyespargos.backlog")
        self.pool = pool
        self.size = size
        self.enable_ht40 = enable_ht40
        self.calibrate = calibrate

        # Length of the last axis of the HT40 storage: lower and higher HT-LTF
        # field sizes plus twice the gap constant, halved.
        # NOTE(review): the halving presumably reflects two raw values per
        # complex sample - confirm against the csi module.
        ht40_length = (csi.csi_buf_t.htltf_lower.size + csi.HT40_GAP_SUBCARRIERS * 2 + csi.csi_buf_t.htltf_higher.size) // 2
        self.storage_ht40 = np.zeros((size,) + self.pool.get_shape() + (ht40_length,), dtype=np.complex64)
        self.storage_timestamps = np.zeros(size)
        self.storage_rssi = np.zeros((size,) + self.pool.get_shape(), dtype=np.float32)

        # Ringbuffer bookkeeping: "head" is the slot written next, "latest" the
        # slot written last (None while empty), "filllevel" the number of valid slots.
        self.head = 0
        self.latest = None
        self.filllevel = 0

        self.callbacks = []
        self.mac_filter = None
        self.running = True
        self.thread = None

        # Register with the pool only after all state used by the callback
        # exists, so that a callback fired right away finds a complete object.
        self.pool.add_csi_callback(self._on_new_csi)

    def _on_new_csi(self, clustered_csi):
        """
        Store one clustered CSI entry in the ringbuffer and notify update callbacks.
        Registered with the pool via ``add_csi_callback``, do not call directly.

        :param clustered_csi: Clustered CSI object handed over by the pool
        """
        # Check MAC address if filter is installed
        if self.mac_filter is not None and not self.mac_filter.match(clustered_csi.get_source_mac()):
            return

        # Fetch the calibration once per packet instead of once per use
        calibration = None
        if self.calibrate:
            calibration = self.pool.get_calibration()
            assert calibration is not None

        # Store timestamp (mean over all sensor timestamps)
        sensor_timestamps = clustered_csi.get_sensor_timestamps()
        if calibration is not None:
            sensor_timestamps = calibration.apply_timestamps(sensor_timestamps)
        self.storage_timestamps[self.head] = np.mean(sensor_timestamps)

        # Store HT40 CSI if applicable
        # NOTE(review): for packets that are not HT40, the HT40 slot keeps
        # whatever was stored there before - confirm that this is intended.
        if self.enable_ht40 and clustered_csi.is_ht40():
            csi_ht40 = clustered_csi.deserialize_csi_ht40()
            if calibration is not None:
                csi_ht40 = calibration.apply_ht40(csi_ht40)
            self.storage_ht40[self.head] = csi_ht40

        # Store RSSI
        self.storage_rssi[self.head] = clustered_csi.get_rssi()

        # Advance ringbuffer head
        self.latest = self.head
        self.head = (self.head + 1) % self.size
        self.filllevel = min(self.filllevel + 1, self.size)

        for cb in self.callbacks:
            cb()

    def _oldest_first(self, storage):
        """
        Return the valid entries of a ringbuffer storage array, oldest first.

        :param storage: Storage array with the ringbuffer slots along axis 0
        :return: Array containing only the filled slots, oldest first
        """
        rotated = np.roll(storage, -self.head, axis=0)
        # Slice from an explicit start index: a negative index of "-0" would
        # select the whole buffer while the backlog is still empty.
        return rotated[storage.shape[0] - self.filllevel:]

    def add_update_callback(self, cb):
        """
        Add a callback that is called when new CSI data is added to the backlog

        :param cb: Callable without arguments
        """
        self.callbacks.append(cb)

    def get_ht40(self):
        """
        Retrieve HT40 CSI data from the ringbuffer

        :return: HT40 CSI data, oldest first (empty if nothing was stored yet)
        """
        assert self.enable_ht40
        return self._oldest_first(self.storage_ht40)

    def get_timestamps(self):
        """
        Retrieve packet timestamps from the ringbuffer

        :return: Timestamps, oldest first (empty if nothing was stored yet)
        """
        return self._oldest_first(self.storage_timestamps)

    def get_latest_timestamp(self):
        """
        Retrieve the timestamp of the most recent packet in the ringbuffer

        :return: Timestamp of the most recent packet, None if the backlog is empty
        """
        if self.latest is None:
            return None

        return self.storage_timestamps[self.latest]

    def nonempty(self):
        """
        Check if the backlog is nonempty

        :return: True if the backlog is nonempty
        """
        return self.latest is not None

    def start(self):
        """
        Start the CSI backlog thread, must be called before using the backlog
        """
        # Re-arm the run flag so that the backlog can be started again after stop()
        self.running = True
        self.thread = threading.Thread(target=self.__run)
        self.thread.start()
        self.logger.info("Started CSI backlog thread")

    def stop(self):
        """
        Stop the CSI backlog thread. Does nothing besides clearing the run flag
        if the thread was never started.
        """
        self.running = False
        if self.thread is not None:
            self.thread.join()

    def set_mac_filter(self, filter_regex):
        """
        Set a MAC address filter for the backlog

        :param filter_regex: MAC address filter regex, or None to remove the filter
        """
        self.mac_filter = None if filter_regex is None else re.compile(filter_regex)

    def __run(self):
        """
        CSI backlog thread main loop, do not call directly.
        This function runs in a separate thread and repeatedly calls the pool's run() method
        until the backlog is stopped.
        """
        while self.running:
            self.pool.run()