#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Wed Jan 19 15:23:16 2022

@author: West Zhican Chen
"""

import sys
import numpy as np
import h5py
import matplotlib.pyplot as plt
import collections
import time
from find_lts import *
from optparse import OptionParser
from channel_analysis import *
import hdf5_lib
from hdf5_lib import *
import matplotlib


##################################################################################################
#
#                                       CSI PROCESSING
#
##################################################################################################

def process_hdf5(hdf5, cell_i = 0, sc_i = 7):
    """
    Process the pilot data from a collected hdf5 log to get csi values

    Input:
    ------
    hdf5: input hdf5 file as defined in class hdf5_lib in hdf5_lib.py
    cell_i: int, index of the hub where the base station is connected
    sc_i: int, index of the subcarrier used in the csi value

    Output:
    ------
    data_pilot: dict with the following keys and values
        chan_full: np.complex128, full channel matrix (spatial and frequency domain) with shape [num_cl, num_bs_ants, num_sc]
        chan_sci: np.complex128, channel matrix with respect to subcarrier sc_i (spatial domain) with shape [num_cl, num_bs_ants]
        csi: np.complex128, processed csi values (frequency domain) from pilot data with shape [num_frames, num_cells, num_cl, num_bs_ants, num_pilots, num_sc]
        samps: np.complex128, received samples (time domain) from pilot data with shape [num_frames, num_cells, num_cl, num_bs_ants, samps_per_slot]
        m_filt: np.complex64, matched filter (time domain) output with shape [num_frames, num_cells, num_cl, num_bs_ants, samps_per_slot]
        sf_start: int, index of starting sample with shape [num_frames, num_cells, num_cl, num_bs_ants]
        cmpx_pilots: raw complex IQ samples of received pilots (time domain) with shape [num_frames, num_cells, num_cl, num_bs_ants, samps_per_slot]
        pilots_rx_t_btc: raw complex IQ samples of received pilots (time domain) excluding the zero padding portion with shape [num_frames, num_cells, num_cl, num_bs_ants, samps_per_slot - z_padding]
        td_pwr_dbm_p: float, average pilot sample power in dBm with shape [num_frames, num_cells, num_pilots, num_bs_ants]
        mean_pwr_p: float, pilot sample power in dBm averaged over frames with shape [num_cells, num_pilots, num_bs_ants]
        std_pwr_p: float, standard deviation of pilot sample power in dBm across frames with shape [num_cells, num_pilots, num_bs_ants]

    data_noise: dict with the following keys and values
        samps_n: np.complex128, received noise samples (time domain) with shape [num_frames, num_cells, 1, num_bs_ants, samps_per_slot]
        td_pwr_dbm_n: float, average noise power in dBm with shape [num_frames, num_cells, 1, num_bs_ants]
        mean_pwr_n: float, noise power in dBm averaged over frames with shape [num_cells, 1, num_bs_ants]
        std_pwr_n: float, standard deviation of noise power in dBm across frames with shape [num_cells, 1, num_bs_ants]
    """
    plt.close("all")

    # Retrieve attributes
    n_frm_st = hdf5.n_frm_st  # index of starting frame
    n_frm_end = hdf5.n_frm_end  # index of ending frame
    metadata = hdf5.metadata  # metadata read from hdf5 file
    if 'SYMBOL_LEN' in metadata:  # symbol length including zero padding (legacy datasets)
        samps_per_slot = int(metadata['SYMBOL_LEN'])
    elif 'SLOT_SAMP_LEN' in metadata:  # symbol length including zero padding (recent datasets)
        samps_per_slot = int(metadata['SLOT_SAMP_LEN'])
    num_pilots = int(metadata['PILOT_NUM'])  # number of pilots (one per UE; appears to equal num_cl)
    num_cl = int(metadata['CL_NUM'])  # number of clients
    n_ue = num_cl
    prefix_len = int(metadata['PREFIX_LEN'])  # length of prefix
    postfix_len = int(metadata['POSTFIX_LEN'])  # length of postfix
    z_padding = prefix_len + postfix_len  # length of total zero padding
    # if offset < 0:  # if no offset is given use prefix from HDF5
    #     offset = int(prefix_len)
    offset = int(prefix_len)
    fft_size = int(metadata['FFT_SIZE'])  # size of FFT
    cp = int(metadata['CP_LEN'])  # length of Cyclic Prefix
    rate = int(metadata['RATE'])  # sampling rate
    pilot_type = metadata['PILOT_SEQ_TYPE'].astype(str)[0]  # pilot type
    nonzero_sc_size = fft_size  # size of nonzero subcarriers (same as FFT size for legacy datasets)
    if 'DATA_SUBCARRIER_NUM' in metadata:
        nonzero_sc_size = metadata['DATA_SUBCARRIER_NUM']  # size of nonzero subcarriers (recent datasets)

    ofdm_pilot = np.array(metadata['OFDM_PILOT'])  # OFDM pilot (time domain) with length [fft_size + cp]
    if "OFDM_PILOT_F" in metadata:  # OFDM pilot (frequency domain) with length [fft_size]
        ofdm_pilot_f = np.array(metadata['OFDM_PILOT_F'])
    else:
        if pilot_type == 'zadoff-chu':
            _, pilot_f = generate_training_seq(preamble_type='zadoff-chu', seq_length=nonzero_sc_size, cp=cp, upsample=1, reps=1)
        else:
            _, pilot_f = generate_training_seq(preamble_type='lts', cp=32, upsample=1)
        ofdm_pilot_f = pilot_f

    if "RECIPROCAL_CALIB" in metadata:  # bool value indicating whether to conduct reciprocity calibration between uplink and downlink
        reciprocal_calib = np.array(metadata['RECIPROCAL_CALIB'])
    else:
        reciprocal_calib = np.array(0)

    samps_per_slot_no_pad = samps_per_slot - z_padding  # symbol length excluding zero padding
    symbol_per_slot = ((samps_per_slot_no_pad) // (cp + fft_size))  # number of OFDM symbols per slot

    if 'UL_SYMS' in metadata:  # number of uplink data slots (legacy datasets)
        ul_slot_num = int(metadata['UL_SYMS'])
    elif 'UL_SLOTS' in metadata:  # number of uplink data slots (recent datasets)
        ul_slot_num = int(metadata['UL_SLOTS'])

    # initialize return value dicts
    data_noise = {}
    data_pilot = {}

    # checking pilot data
    pilot_data_avail = len(hdf5.pilot_samples)  # nonzero if pilot data is available
    if pilot_data_avail:
        pilot_samples = hdf5.pilot_samples

    # checking noise data
    noise_avail = len(hdf5.noise_samples)  # nonzero if noise data is available
    if noise_avail:
        noise_samples = hdf5.noise_samples
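        # The last axis of the logged samples interleaves I and Q components (even indices = I,
        # odd indices = Q); dividing by 2**15 scales the (presumably int16) full-scale range to
        # roughly [-1, 1) before forming complex baseband samples.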
        In = noise_samples[:, :, :, :, 0::2] / 2 ** 15
        Qn = noise_samples[:, :, :, :, 1::2] / 2 ** 15
        samps_n = In + (Qn * 1j)
        # compute noise power
        td_pwr_dbm_n, mean_pwr_n, std_pwr_n = samps2power(samps_n)
        # rms = np.sqrt(np.mean(samps_n * np.conj(samps_n), axis = -1))
        # td_pwr_lin = np.real(rms) ** 2
        # # td_pwr_dbm_n = 10 * np.log10(td_pwr_lin / 1e-3)
        # td_pwr_dbm_n = 10 * np.log10(td_pwr_lin)
        data_noise['samps_n'] = samps_n
        data_noise['td_pwr_dbm_n'] = td_pwr_dbm_n
        data_noise['mean_pwr_n'] = mean_pwr_n
        data_noise['std_pwr_n'] = std_pwr_n

    # inspecting pilots data
    if pilot_data_avail:
        print("samps_per_slot = {}, offset = {}, cp = {}, prefix_len = {}, postfix_len = {}, z_padding = {}, pilot_rep = {}".format(samps_per_slot, offset, cp, prefix_len, postfix_len, z_padding, symbol_per_slot))

        num_frames = pilot_samples.shape[0]
        num_cells = pilot_samples.shape[1]
        num_bs_ants = pilot_samples.shape[3]

        scale_factor = 2**-15
        samps_mat = np.reshape(pilot_samples, (num_frames, num_cells, num_pilots, num_bs_ants, samps_per_slot, 2))
        samps = scale_factor * (samps_mat[:, :, :, :, :, 0] + 1j * samps_mat[:, :, :, :, :, 1])
        td_pwr_dbm_p, mean_pwr_p, std_pwr_p = samps2power(samps)

        csi_result = csi_from_pilots(pilot_samples, z_padding = z_padding, fft_size = fft_size, cp = cp)
        csi, m_filt, sf_start, cmpx_pilots, k_lts, n_lts, pilots_rx_t_btc = csi_result

        # take averaged value from sc_i as spatial channel matrix
        chan_full, chan_sci = csi2chan(csi, cell_i = cell_i, sc_i = sc_i)

        data_pilot['chan_full'] = chan_full
        data_pilot['chan_sci'] = chan_sci
        data_pilot['csi'] = csi
        data_pilot['samps'] = samps
        data_pilot['td_pwr_dbm_p'] = td_pwr_dbm_p
        data_pilot['mean_pwr_p'] = mean_pwr_p
        data_pilot['std_pwr_p'] = std_pwr_p
        data_pilot['m_filt'] = m_filt
        data_pilot['sf_start'] = sf_start
        data_pilot['cmpx_pilots'] = cmpx_pilots
        data_pilot['pilots_rx_t_btc'] = pilots_rx_t_btc

    # return chan_full, chan_sci, csi, samps, m_filt, sf_start, cmpx_pilots, pilots_rx_t_btc
    return data_pilot, data_noise

    # samps_mat = np.reshape(pilot_samples, (num_frames, num_cells, num_pilots, num_bs_ants, samps_per_slot, 2))
    # samps = scale_factor * (samps_mat[:, :, :, :, :, 0] + 1j * samps_mat[:, :, :, :, :, 1])
    # csi, _ = hdf5_lib.samps2csi(pilot_samples,
    #                             num_pilots,
    #                             samps_per_slot,
    #                             fft_size = fft_size,
    #                             offset = offset,
    #                             bound = z_padding,
    #                             cp = cp,
    #                             pilot_f = ofdm_pilot_f)

    # # get out the specific cell csi data for analysis
    # cellCSI = csi[:, cell_i, :, :, :, :]

    # # extract the csi data corresponding to a specific ofdm symbol
    # if ofdm_sym_i >= symbol_per_slot:
    #     # if out of range index, do average
    #     userCSI = np.mean(cellCSI[:, :, :, :, :], 2)
    # else:
    #     userCSI = cellCSI[:, :, ofdm_sym_i, :, :]

    # if deep_inspect:
    #     match_filt, seq_num, seq_len, cmpx_pilots, seq_orig = hdf5_lib.filter_pilots(samps, z_padding, fft_size, cp, pilot_type, nonzero_sc_size)
    #     match_filt_clr, frame_map, f_st, peak_map = hdf5_lib.frame_sanity(match_filt, seq_num, seq_len, n_frm_st, frame_to_plot = frame_i, plt_ant = ant_i, cp = cp)
    #     return samps, csi, userCSI, match_filt, seq_num, seq_len, cmpx_pilots, seq_orig, match_filt_clr, frame_map, f_st, peak_map
    # return samps, csi, userCSI
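
# Illustrative usage sketch: example_csi_pipeline is a hypothetical helper showing how the
# process_hdf5() output is typically consumed downstream by chan2power(). It assumes an hdf5_lib
# object has already been constructed elsewhere (its constructor arguments are not shown in this
# file); only attributes already used above (pilot_samples, noise_samples, metadata) are relied on.
def example_csi_pipeline(hdf5, cell_i = 0, sc_i = 7):
    """ Sketch: run the pilot/noise processing and derive per-link channel power in dB. """
    data_pilot, data_noise = process_hdf5(hdf5, cell_i = cell_i, sc_i = sc_i)
    if 'chan_sci' not in data_pilot:  # no pilot data recorded in this log
        return None
    chan_sci = data_pilot['chan_sci']  # [num_cl, num_bs_ants] at subcarrier sc_i
    power_db = chan2power(chan_sci)  # per-link channel power in dB
    if 'td_pwr_dbm_n' in data_noise:
        print("mean noise power (dB): {}".format(np.mean(data_noise['td_pwr_dbm_n'])))
    return power_db
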
def csi2chan(csi, cell_i = 0, sc_i = 7, norm = False):
    """
    Process the calculated csi to get the channel matrix

    Input:
    ------
    csi: np.complex128, processed csi values (frequency domain) from pilot data with shape [num_frames, num_cells, num_cl, num_bs_ants, num_pilots, num_sc]
    cell_i: int, index of selected cell for channel calculation
    sc_i: int, index of selected subcarrier for channel calculation

    Output:
    ------
    chan_full: np.complex128, full channel matrix (spatial and frequency domain) with shape [num_cl, num_bs_ants, num_sc]
    chan_sci: np.complex128, channel matrix with respect to subcarrier sc_i (spatial domain) with shape [num_cl, num_bs_ants]
    """
    (num_frames, num_cells, num_cl, num_bs_ants, num_pilots, num_sc) = csi.shape
    assert cell_i < num_cells, "cell_i {} is out of range for num_cells {}".format(cell_i, num_cells)
    assert sc_i < num_sc, "sc_i {} is out of range for num_sc {}".format(sc_i, num_sc)

    # rearrange the axes of csi for further processing
    cellCSI = csi[:, cell_i, :, :, :, :]
    cellCSI = np.transpose(cellCSI, axes = [1, 2, 4, 0, 3])
    assert cellCSI.shape == (num_cl, num_bs_ants, num_sc, num_frames, num_pilots)
    cellCSI = cellCSI.reshape(num_cl, num_bs_ants, num_sc, -1)

    # # normalize power to max value 0 dB
    # if norm:
    #     # find out the index of normalization factor
    #     (ind_cl, ind_bs_ant, ind_sc) = np.unravel_index(np.argmax(abs(cellCSI[:, :, :, 0]), axis=None), cellCSI[:, :, :, 0].shape)
    #     # normalize csi to yield higher SNR
    #     cellCSI_norm_factor = cellCSI[ind_cl, ind_bs_ant, ind_sc, :]
    #     cellCSI_norm = cellCSI / cellCSI_norm_factor
    #     chan_full = np.mean(cellCSI_norm, axis = 3)
    # else:
    #     chan_full = np.mean(cellCSI, axis = 3)

    # phase normalization
    cellCSI_phase_normed = np.zeros(cellCSI.shape, dtype = np.complex128)
    chan_full = np.zeros((num_cl, num_bs_ants, num_sc), dtype = np.complex128)
    for cl_idx in range(num_cl):
        # find an antenna with non-zero receiving power
        ant_idx = np.argmax(abs(cellCSI[cl_idx, :, 0, int(cellCSI.shape[-1] / 2)]))
        for sc_idx in range(num_sc):
            for rep_idx in range(cellCSI.shape[-1]):
                # use the phase of the ant_idx-th antenna element as the normalization factor
                phase_norm = cellCSI[cl_idx, ant_idx, sc_idx, rep_idx] / abs(cellCSI[cl_idx, ant_idx, sc_idx, rep_idx])
                cellCSI_phase_normed[cl_idx, :, sc_idx, rep_idx] = cellCSI[cl_idx, :, sc_idx, rep_idx] / phase_norm
            # average over all the frames to get higher SNR
            chan_full[cl_idx, :, sc_idx] = np.mean(cellCSI_phase_normed[cl_idx, :, sc_idx, :], axis = -1)

    chan_sci = chan_full[:, :, sc_i]

    return chan_full, chan_sci
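
# Minimal self-contained sketch (hypothetical helper, not part of the processing flow) illustrating
# why the per-repetition phase normalization in csi2chan() matters: if every frame carries a random
# common phase offset, naive averaging across frames attenuates the channel estimate, while the
# phase-normalized average preserves its magnitude. All shapes and values below are synthetic.
def _demo_csi2chan_phase_normalization(num_frames = 64, num_cl = 2, num_bs_ants = 4, num_sc = 8):
    rng = np.random.default_rng(0)
    # fixed "true" channel; one cell and one pilot repetition per frame
    chan_true = rng.standard_normal((num_cl, num_bs_ants, num_sc)) + 1j * rng.standard_normal((num_cl, num_bs_ants, num_sc))
    csi = np.zeros((num_frames, 1, num_cl, num_bs_ants, 1, num_sc), dtype = np.complex128)
    for frm in range(num_frames):
        common_phase = np.exp(1j * 2 * np.pi * rng.random())  # random per-frame phase offset
        csi[frm, 0, :, :, 0, :] = chan_true * common_phase
    chan_full, _ = csi2chan(csi, cell_i = 0, sc_i = 0)
    naive_avg = np.mean(csi[:, 0, :, :, 0, :], axis = 0)
    print("true channel max magnitude:", np.max(np.abs(chan_true)))
    print("phase-normalized avg max magnitude (preserved):", np.max(np.abs(chan_full)))
    print("naive avg max magnitude (attenuated by random phases):", np.max(np.abs(naive_avg)))
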
##################################################################################################
#
#                         DATA QUALITY (CORRELATION PEAKS) ANALYSIS
#
##################################################################################################

def hdf52pmap(data_pilot, metadata, n_frm_st = 0):
    """
    Convert the hdf5 data into peak percentage maps for data quality inspection

    Input:
    ------
    data_pilot: dict with pilot data (see description above)
    metadata: metadata from hdf5 data
    n_frm_st: int, index of starting frame

    Output:
    ------
    seq_found: float, frame map with shape (num_frames, num_cells, num_cl, num_bs_ants), each element is the percentage of detected correlation peaks
    """
    # loading samps and metadata
    # data_pilot, data_noise = process_hdf5(hdf5, cell_i = cell_i, sc_i = sc_i)
    samps = data_pilot['samps']
    # metadata = hdf5.metadata
    prefix_len = int(metadata['PREFIX_LEN'])  # length of prefix
    postfix_len = int(metadata['POSTFIX_LEN'])  # length of postfix
    z_padding = prefix_len + postfix_len  # length of total zero padding
    fft_size = int(metadata['FFT_SIZE'])
    cp = int(metadata['CP_LEN'])  # length of Cyclic Prefix
    pilot_type = metadata['PILOT_SEQ_TYPE'].astype(str)[0]  # pilot type
    nonzero_sc_size = fft_size  # size of nonzero subcarriers (same as FFT size for legacy datasets)
    if 'SYMBOL_LEN' in metadata:  # symbol length including zero padding (legacy datasets)
        samps_per_slot = int(metadata['SYMBOL_LEN'])
    elif 'SLOT_SAMP_LEN' in metadata:  # symbol length including zero padding (recent datasets)
        samps_per_slot = int(metadata['SLOT_SAMP_LEN'])
    samps_per_slot_no_pad = samps_per_slot - z_padding  # symbol length excluding zero padding
    symbol_per_slot = ((samps_per_slot_no_pad) // (cp + fft_size))  # number of OFDM symbols per slot

    match_filt, seq_num, seq_len, cmpx_pilots, seq_orig = hdf5_lib.filter_pilots(samps, z_padding, fft_size, cp, pilot_type, nonzero_sc_size)
    match_filt_clr, frame_map, f_st, peak_map = hdf5_lib.frame_sanity(match_filt, seq_num, seq_len, n_frm_st, cp = cp)

    # construct seq_found map
    seq_found = 100 * peak_map / symbol_per_slot

    return seq_found


def pmap2pic(seq_found, n_frm_st, n_frm_end, fig_path = None):
    """
    Convert the calculated peak map into pictures and save them to the designated folder

    Input:
    ------
    seq_found: float, frame map with shape (num_frames, num_cells, num_cl, num_bs_ants), each element is the percentage of detected correlation peaks
    n_frm_st: int, index of starting frame
    n_frm_end: int, index of ending frame
    fig_path: str, path to save the figures

    Output:
    ------
    NA
    """
    (num_frames, num_cells, num_cl, num_bs_ants) = seq_found.shape

    # in every figure, we save two consecutive clients' info (assumes an even number of clients)
    num_fig = int(num_cl / 2)
    for n_fig in range(num_fig):
        # initialize the figure
        fig, axes = plt.subplots(nrows = 2, ncols = 1, squeeze = False)
        fig.suptitle('Pilot Map (Percentage of Detected Pilots Per Symbol) - NOTE: Might exceed 100% due to threshold')
        c = []
        # we only inspect 1 cell in this version
        n_c = 0
        for n_u in range(2):
            c.append(axes[n_u, n_c].imshow(seq_found[:, 0, 2 * n_fig + n_u, :].T, vmin=0, vmax=100, cmap='Blues', interpolation='nearest', extent=[n_frm_st, n_frm_end, num_bs_ants, 0], aspect="auto"))
            axes[n_u, n_c].set_title('Cell {} UE {}'.format(n_c, 2 * n_fig + n_u))
            axes[n_u, n_c].set_ylabel('Antenna #')
            axes[n_u, n_c].set_xlabel('Frame #')
            axes[n_u, n_c].set_xticks(np.arange(n_frm_st, n_frm_end, 1), minor=True)
            axes[n_u, n_c].set_yticks(np.arange(0, num_bs_ants, 1), minor=True)
            axes[n_u, n_c].grid(which='minor', color='0.75', linestyle='-', linewidth=0.05)

        cbar = plt.colorbar(c[-1], ax=axes.ravel().tolist(), ticks=np.linspace(0, 100, 11), orientation='horizontal')
        cbar.ax.set_xticklabels(['0%', '10%', '20%', '30%', '40%', '50%', '60%', '70%', '80%', '90%', '100%'])

        if fig_path:
            plt.savefig(fig_path + "pmap_{}".format(n_fig))
    if not fig_path:
        plt.show()
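
# Illustrative usage sketch: example_pilot_quality_check is a hypothetical helper chaining the two
# inspection steps above. It assumes an hdf5_lib object with the metadata and frame-index
# attributes already used by process_hdf5(); fig_dir is a placeholder output path.
def example_pilot_quality_check(hdf5, fig_dir = None):
    """ Sketch: compute the correlation-peak percentage map and render it. """
    data_pilot, _ = process_hdf5(hdf5)  # assumes the log contains pilot data
    seq_found = hdf52pmap(data_pilot, hdf5.metadata, n_frm_st = hdf5.n_frm_st)
    pmap2pic(seq_found, hdf5.n_frm_st, hdf5.n_frm_end, fig_path = fig_dir)
    return seq_found
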
##################################################################################################
#
#                                 CHANNEL POWER ANALYSIS
#
##################################################################################################

def power2plot(power, label = None):
    """
    Draw the heat map for the power plot

    Input:
    ------
    power: float, channel power with shape [num_cl, num_bs_ants]

    Output:
    ------
    None
    """
    plt.rcParams['font.size'] = '10'
    scplot = plt.imshow(power, cmap='bwr')
    cbar = plt.colorbar(scplot)
    if not label:
        cbar.set_label('Channel Power (dB)')
    else:
        cbar.set_label(label)
    # plt.xticks(list(range(num_tx_ant)))
    # plt.yticks(list(range(num_rx_ant)))
    plt.xlabel("rx antenna index")
    plt.ylabel("tx antenna index")
    plt.show()


def chan2power(chan_sci, plt = False):
    """
    Calculate the power from the channel matrix

    Input:
    ------
    chan_sci: np.complex128, channel matrix with respect to subcarrier sc_i (spatial domain) with shape [num_cl, num_bs_ants]
    plt: bool, whether to plot the power plot

    Output:
    ------
    power_chan_sci_dB: float, channel power with shape [num_cl, num_bs_ants]
    """
    # calculate the normalized channel power in dB scale
    power_chan_sci = np.abs(chan_sci)**2
    # power_chan_sci = power_chan_sci / max(power_chan_sci.reshape(-1,))
    power_chan_sci_dB = 10.0 * np.log10(power_chan_sci)
    if plt:
        power2plot(power_chan_sci_dB)
    return power_chan_sci_dB


def csi2power(csi, cell_i = 0, sc_i = 7, plt = False):
    """
    Directly calculate the self-coupling power from the csi values
    Note that here the frame average is performed over the self-coupling power instead of the channel matrix

    Input:
    ------
    csi: np.complex128, processed csi values (frequency domain) from pilot data with shape [num_frames, num_cells, num_cl, num_bs_ants, num_pilots, num_sc]
    cell_i: int, index of selected cell for channel calculation
    sc_i: int, index of selected subcarrier for channel calculation
    plt: bool, whether to plot the power plot

    Output:
    ------
    power_csi_full: float, per-repetition channel power (linear) with shape [num_cl, num_bs_ants, num_sc, num_frames * num_pilots]
    power_csi_full_dB: float, per-repetition channel power in dB with shape [num_cl, num_bs_ants, num_sc, num_frames * num_pilots]
    power_csi_full_mean_dB: float, channel power in dB averaged over frames and pilots with shape [num_cl, num_bs_ants, num_sc]
    power_csi_sci_dB: float, channel power in dB at subcarrier sc_i with shape [num_cl, num_bs_ants]
    """
    (num_frames, num_cells, num_cl, num_bs_ants, num_pilots, num_sc) = csi.shape
    assert cell_i < num_cells, "cell_i {} is out of range for num_cells {}".format(cell_i, num_cells)
    assert sc_i < num_sc, "sc_i {} is out of range for num_sc {}".format(sc_i, num_sc)

    # rearrange the axes of csi for further processing
    cellCSI = csi[:, cell_i, :, :, :, :]
    cellCSI = np.transpose(cellCSI, axes = [1, 2, 4, 0, 3])
    assert cellCSI.shape == (num_cl, num_bs_ants, num_sc, num_frames, num_pilots)
    cellCSI = cellCSI.reshape(num_cl, num_bs_ants, num_sc, -1)

    # calculate the normalized channel power in dB scale
    power_csi_full = np.abs(cellCSI)**2
    # power_csi_full = power_csi_full / max(power_csi_full.reshape(-1,))
    power_csi_full_dB = 10.0 * np.log10(power_csi_full)
    # average out the channel power
    power_csi_full_mean_dB = np.mean(power_csi_full_dB, axis = -1)
    power_csi_sci_dB = power_csi_full_mean_dB[:, :, sc_i]
    if plt:
        power2plot(power_csi_sci_dB)
    return power_csi_full, power_csi_full_dB, power_csi_full_mean_dB, power_csi_sci_dB


def samps2power(samps):
    """
    Calculate power from samps

    Input:
    ------
    samps: np.complex128, received samples (time domain) from pilot data with shape [num_frames, num_cells, num_cl, num_bs_ants, samps_per_slot]

    Output:
    ------
    td_pwr_dbm: float, average sample power in dBm with shape [num_frames, num_cells, num_cl, num_bs_ants]
    mean_pwr: float, mean sample power in dBm with shape [num_cells, num_cl, num_bs_ants]
    std_pwr: float, standard deviation of sample power in dBm with shape [num_cells, num_cl, num_bs_ants]
    """
    # compute average sample power (time domain)
    rms = np.sqrt(np.mean(samps * np.conj(samps), axis = -1))
    td_pwr_lin = np.real(rms) ** 2
    # td_pwr_dbm = 10 * np.log10(td_pwr_lin / 1e-3)
    td_pwr_dbm = 10 * np.log10(td_pwr_lin)
    td_pwr_dbm_trans = np.transpose(td_pwr_dbm, axes = [1, 2, 3, 0])
    mean_pwr = np.mean(td_pwr_dbm_trans, axis = -1)
    std_pwr = np.std(td_pwr_dbm_trans, axis = -1)
    return td_pwr_dbm, mean_pwr, std_pwr
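
# Minimal self-contained sketch (hypothetical helper, not part of the processing flow) showing the
# convention used by samps2power(): the mean of |x|^2 along the last (time) axis is converted to a
# log scale, then mean/std are taken across frames. Constant-magnitude samples of amplitude 0.5
# should therefore report 10*log10(0.25) ~= -6.02 dB. All shapes below are synthetic.
def _demo_samps2power():
    num_frames, num_cells, num_cl, num_bs_ants, samps_per_slot = 3, 1, 2, 4, 100
    samps = 0.5 * np.exp(1j * np.linspace(0, 2 * np.pi, samps_per_slot))  # constant magnitude 0.5
    samps = np.broadcast_to(samps, (num_frames, num_cells, num_cl, num_bs_ants, samps_per_slot))
    td_pwr_dbm, mean_pwr, std_pwr = samps2power(samps)
    print("per-slot power (dB):", td_pwr_dbm[0, 0, 0, 0])  # ~ -6.02
    print("std across frames (dB):", np.max(std_pwr))  # ~ 0 (identical frames)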