Source code for espargos.util

#!/usr/bin/env python3

import numpy as np
from . import constants
from . import csi

def csi_interp_iterative(csi, weights=None, iterations=10):
    """
    Interpolates CSI data (frequency-domain or time-domain) using an iterative algorithm.

    Alternates between two steps: forming a weighted, phase-derotated sum of all
    datapoints, and re-estimating each datapoint's phase offset relative to that sum.
    The aim is to add up the datapoints phase-coherently. The algorithm is described in
    section "IV. Linear Interpolation Baseline" of the paper
    "GAN-based Massive MIMO Channel Model Trained on Measured Data".

    :param csi: The CSI data to interpolate. Complex-valued NumPy array with arbitrary dimensions, but the first dimension must be the number of CSI datapoints.
    :param weights: One weight per CSI datapoint. If None, every datapoint gets the weight 1 / (number of datapoints).
    :param iterations: The number of iterations to perform. Default is 10.
    :return: The interpolated CSI, a complex-valued NumPy array with the shape of a single datapoint (:code:`csi.shape[1:]`). If :code:`iterations` is 0, no estimate is ever computed and None is returned.
    """
    datapoint_count = len(csi)
    if weights is None:
        weights = np.ones(datapoint_count, dtype=csi.dtype) / datapoint_count

    # Start with a phase offset of zero for every datapoint.
    phases = np.zeros_like(weights, dtype=csi.dtype)
    estimate = None

    for _ in range(iterations):
        # Undo the current per-datapoint phase estimate, then take the weighted sum.
        derotation = np.exp(-1.0j * phases)
        estimate = np.einsum("n,n,n...->...", weights, derotation, csi)

        # The phase of the inner product between the estimate and each datapoint
        # becomes that datapoint's new phase offset.
        csi_rows = csi.reshape(datapoint_count, -1)
        correlation = np.einsum("a,na->n", np.conj(estimate.flatten()), csi_rows)
        phases = np.angle(correlation)

    return estimate
def csi_interp_iterative_by_array(csi, weights=None, iterations=10):
    """
    Interpolates CSI data (frequency-domain or time-domain) using an iterative algorithm, one antenna array at a time.

    Behaves like :func:`csi_interp_iterative`, except that the second dimension of
    :code:`csi` is treated as the antenna array dimension: the interpolation is run
    independently on :code:`csi[:, b]` for every index :code:`b` along that dimension.

    :param csi: The CSI data to interpolate. Complex-valued NumPy array whose first dimension is the number of CSI datapoints and whose second dimension is the antenna array.
    :param weights: Passed through unchanged to :func:`csi_interp_iterative` for every array.
    :param iterations: Passed through unchanged to :func:`csi_interp_iterative` for every array.
    :return: NumPy array of shape :code:`csi.shape[1:]` and the dtype of :code:`csi`, holding the per-array interpolation results.
    """
    array_count = csi.shape[1]
    per_array_shape = csi.shape[2:]

    result = np.zeros((array_count, *per_array_shape), dtype=csi.dtype)
    for array_index in range(array_count):
        result[array_index] = csi_interp_iterative(
            csi[:, array_index],
            weights=weights,
            iterations=iterations,
        )

    return result
def csi_interp_eigenvec(csi, weights=None):
    """
    Interpolates CSI data (frequency-domain or time-domain) by finding the principal eigenvector of the covariance matrix.

    Every datapoint is flattened to a vector, the weighted sum of the outer products
    of these vectors with their complex conjugates is formed, and the eigenvector
    belonging to the largest eigenvalue of that matrix is returned.

    :param csi: The CSI data to interpolate. Complex-valued NumPy array with arbitrary dimensions, but the first dimension must be the number of CSI datapoints.
    :param weights: One weight per CSI datapoint. If None, every datapoint gets the weight 1 / (number of datapoints).
    :return: The principal eigenvector, reshaped to the shape of a single datapoint (:code:`csi.shape[1:]`).
    """
    if weights is None:
        weights = np.ones(len(csi)) / len(csi)

    datapoint_shape = csi.shape[1:]
    flat_csi = np.reshape(csi, (csi.shape[0], -1))

    # Weighted covariance matrix: sum_n weights[n] * flat_csi[n] * flat_csi[n]^H
    covariance = np.einsum("n,na,nb->ab", weights, flat_csi, np.conj(flat_csi))

    # eig is faster than eigh for small matrices like the one here
    eigenvalues, eigenvectors = np.linalg.eig(covariance)
    strongest = np.argmax(eigenvalues)

    return np.reshape(eigenvectors[:, strongest], datapoint_shape)
def get_frequencies_ht40(primary_channel, secondary_channel):
    """
    Returns the frequencies of the subcarriers in an HT40 2.4GHz WiFi channel.

    The HT40 center frequency is the midpoint between the center frequencies of the
    primary and the secondary channel. The subcarrier frequencies are placed
    symmetrically around that center, spaced by :code:`constants.WIFI_SUBCARRIER_SPACING`.

    :param primary_channel: The primary channel number.
    :param secondary_channel: The secondary channel number.
    :return: The frequencies of the subcarriers, in Hz, NumPy array.
    """
    center_primary = constants.WIFI_CHANNEL1_FREQUENCY + constants.WIFI_CHANNEL_SPACING * (primary_channel - 1)
    center_secondary = constants.WIFI_CHANNEL1_FREQUENCY + constants.WIFI_CHANNEL_SPACING * (secondary_channel - 1)
    center_ht40 = (center_primary + center_secondary) / 2

    # NOTE(review): the division by two presumably accounts for two buffer entries
    # (real / imaginary part) per subcarrier -- confirm against csi.csi_buf_t.
    ht40_subcarrier_count = (
        csi.csi_buf_t.htltf_lower.size
        + csi.HT40_GAP_SUBCARRIERS * 2
        + csi.csi_buf_t.htltf_higher.size
    ) // 2
    assert ht40_subcarrier_count % 2 == 1

    # The count is odd, so the indices must run symmetrically from -half_span to
    # +half_span with index 0 on the center frequency. The previous expression
    # `-count // 2` parses as `(-count) // 2`, which floors towards minus infinity
    # and shifted every index (and thus every frequency) down by one subcarrier.
    half_span = ht40_subcarrier_count // 2
    subcarrier_indices = np.arange(-half_span, half_span + 1)

    return center_ht40 + subcarrier_indices * constants.WIFI_SUBCARRIER_SPACING
def get_calib_trace_wavelength(frequencies):
    """
    Returns the wavelength of the subcarriers on the calibration traces on the ESPARGOS sensor board.

    The wavelength is the speed of light divided by the product of the frequency and
    the square root of :code:`constants.CALIB_TRACE_EFFECTIVE_DIELECTRIC_CONSTANT`.

    :param frequencies: The frequencies of the subcarriers, in Hz, NumPy array.
    :return: The wavelengths of the subcarriers, in meters, NumPy array.
    """
    refractive_index = np.sqrt(constants.CALIB_TRACE_EFFECTIVE_DIELECTRIC_CONSTANT)
    scaled_frequencies = frequencies * refractive_index
    return constants.SPEED_OF_LIGHT / scaled_frequencies
def get_cable_wavelength(frequencies, velocity_factors):
    """
    Returns the wavelength of the provided subcarrier frequencies on a cable with the given velocity factors.

    The free-space wavelength of every frequency is multiplied by every velocity
    factor, so the result is two-dimensional: velocity factors along the first axis,
    frequencies along the second axis.

    :param frequencies: The frequencies of the subcarriers, in Hz, one-dimensional NumPy array.
    :param velocity_factors: The velocity factors of the cable, one-dimensional NumPy array.
    :return: The wavelengths of the subcarriers, in meters, NumPy array of shape (velocity factors, frequencies).
    """
    frequency_row = frequencies[np.newaxis, :]
    factor_column = velocity_factors[:, np.newaxis]

    free_space_wavelength = constants.SPEED_OF_LIGHT / frequency_row
    return free_space_wavelength * factor_column
def interpolate_ht40_gap(csi_ht40):
    """
    Apply linear interpolation to determine realistic values for the subcarrier channel coefficients
    in the gap between the bonded channels in an HT40 channel.

    The gap subcarriers are overwritten **in place** with values interpolated linearly
    between the last subcarrier before the gap and the first subcarrier after it.
    The same array is also returned, so the function can be used in expressions.

    :param csi_ht40: The CSI data for an HT40 channel. Complex-valued NumPy array with arbitrary shape, but the last dimension must be the subcarriers. Modified in place.
    :return: The CSI data with the values in the gap filled in (the same array object as :code:`csi_ht40`).
    """
    # Last valid subcarrier index before the gap and first valid one after it.
    # NOTE(review): `size // 2` presumably converts buffer entries to subcarriers
    # (two entries per subcarrier) -- confirm against csi.csi_buf_t.
    index_left = csi.csi_buf_t.htltf_lower.size // 2 - 1
    index_right = csi.csi_buf_t.htltf_lower.size // 2 + csi.HT40_GAP_SUBCARRIERS
    missing_indices = np.arange(index_left + 1, index_right)

    left = csi_ht40[..., index_left]
    right = csi_ht40[..., index_right]

    # Interpolation factor per missing subcarrier: 0 at the left edge, 1 at the right edge.
    interp = (missing_indices - index_left) / (index_right - index_left)
    csi_ht40[..., missing_indices] = interp * right[..., np.newaxis] + (1 - interp) * left[..., np.newaxis]

    # The docstring always promised a return value, but the function used to return None.
    return csi_ht40
def shift_to_firstpeak(csi_datapoints, max_delay_taps = 3, search_resolution = 40, peak_threshold = 0.4):
    """
    Shifts the CSI data so that the first peak of the channel impulse response is at time 0.

    Uses a simple grid search instead of a superresolution-based approach: a set of
    candidate time shifts between :code:`-max_delay_taps` and 0 is applied to every
    channel, and the first candidate whose summed response magnitude exceeds
    :code:`peak_threshold` times the largest magnitude over all candidates is chosen.

    :param csi_datapoints: The CSI data to shift, frequency-domain. Complex-valued NumPy array with shape (datapoints, arrays, rows, columns, subcarriers).
    :param max_delay_taps: The maximum number of time taps to shift the CSI data by.
    :param search_resolution: The number of search points (granularity) to use for the time shift.
    :param peak_threshold: The threshold for the peak detection, as a fraction of the largest response magnitude found for the respective channel.
    :return: The frequency-domain CSI data with the first peak of the channel impulse response at time 0.
    """
    subcarrier_count = csi_datapoints.shape[-1]

    # Candidate shifts (in taps), ordered from the largest shift down to no shift at all.
    candidate_shifts = np.linspace(-max_delay_taps, 0, search_resolution)

    # A time shift is a linear phase ramp over the subcarrier indices.
    subcarrier_indices = np.arange(-subcarrier_count // 2, subcarrier_count // 2) + 1
    phase_per_tap = 2 * np.pi * subcarrier_indices / subcarrier_count
    candidate_ramps = np.exp(1.0j * np.outer(candidate_shifts, phase_per_tap))

    # Response magnitude of every channel for every candidate shift.
    responses = np.einsum("lbrms,ds->lbrmd", csi_datapoints, candidate_ramps)
    magnitudes = np.abs(responses)
    strongest = np.max(magnitudes, axis = -1)

    # argmax over a boolean array yields the index of the first True entry.
    exceeds_threshold = magnitudes > peak_threshold * strongest[:, :, :, :, np.newaxis]
    first_index = np.argmax(exceeds_threshold, axis = -1)

    # Select the phase ramp of the chosen candidate for every channel and apply it.
    selected_ramps = candidate_ramps[first_index]
    return selected_ramps * csi_datapoints