import numpy as np import argparse import os import datetime DATA_COLUMNS = np.array([ ["data_time_iso", "datetime64[s]"], ["phase_radian", "f"], ]) REPORT_PARAMS_KEYS = [ "rpt_time", "data_time_start", "data_time_end", "data_count", "wave_type", # type of DDS waveform "wave_freq", # frequency of the DDS waveform "phase_ps_mean", "phase_ps_min", "phase_ps_max", "phase_ps_std", # standard deviation "phase_ps_meanabsdev", # mean absolute deviation ] def report(*args, **kwargs): if len(args) != 1 or not isinstance(args[0], dict): raise SyntaxError("Must only pass 1 dict argument, or >=1 keyword arguments.") # Create and store lines of strings for the report rpt = [] prm = args[0] for k in REPORT_PARAMS_KEYS: if prm.get(k) is None and kwargs.get(k) is None: raise ValueError("Data missing for report item {}.".format(k)) # Replace or append item from kwargs onto the prm dict if kwargs.get(k) is not None: prm[k] = kwargs[k] # Report title rpt.append("Sayma DAC/TTL Data Analysis") rpt.append("") # Report header rpt.append("Report generated at: {}".format(prm["rpt_time"])) rpt.append("------") rpt.append("") # Report content rpt.append("Data collection started at: {}".format(prm["data_time_start"])) rpt.append(" ended at: {}".format(prm["data_time_end"])) rpt.append("Total number of records: {}".format(prm["data_count"])) rpt.append("") rpt.append("Target wave type: {}".format(prm["wave_type"])) rpt.append(" frequency: {:.2f} MHz".format(prm["wave_freq"]/1e6)) rpt.append("") rpt.append("Phase skew statistics:") rpt.append(" Mean: {:>10.4f} ps".format(prm["phase_ps_mean"])) rpt.append(" Minimum: {:>10.4f} ps".format(prm["phase_ps_min"])) rpt.append(" Maximum: {:>10.4f} ps".format(prm["phase_ps_max"])) rpt.append(" Standard Deviation: {:>10.4f} ps".format(prm["phase_ps_std"])) rpt.append(" Max Absolute Deviation: {:>10.4f} ps".format(prm["phase_ps_maxabsdev"])) rpt.append(" Mean Absolute Deviation: {:>10.4f} ps".format(prm["phase_ps_meanabsdev"])) rpt.append("") # TODO: Use jinja2 to produce a report return '\n'.join(rpt) def main(): # Get timestamp at script start now = datetime.datetime.now().replace( microsecond=0, # get rid of microseconds when printed ) now_iso = now.isoformat(sep=' ') parser = argparse.ArgumentParser(description="Data analysis tool for Sayma DAC/TTL") parser.add_argument("log", help="path of the log file containing the data records; " "must be a CSV in excel-tab dialect", type=str) parser.add_argument("--rpt", help="path of the file where the analysis will be reported (and overwrite)", type=str) args = parser.parse_args() with open(args.log, 'r') as f: print("Analysis of data records started at {}.".format(now_iso)) # Create a numpy array from the CSV data (it has no header row) # Currently, log files from plot_sayma_data.py uses TAB as delimiter data = np.loadtxt(f, delimiter='\t', dtype={'names': DATA_COLUMNS[:, 0], 'formats': DATA_COLUMNS[:, 1]}) # Clean-up: remove duplicate rows data_rows = len(data) # original row count w/ potential duplicates data = np.unique(data, axis=0) print("{} duplicate rows detected and ignored.".format(data_rows - len(data))) data_rows = len(data) # new row count without duplicates # Define dict of params for generating the report rpt_params = dict() rpt_params["rpt_time"] = now_iso # Analyse: data collection time & count rpt_params["data_time_start"] = str(data["data_time_iso"].min()).replace('T', ' ') rpt_params["data_time_end"] = str(data["data_time_iso"].max()).replace('T', ' ') rpt_params["data_count"] = data_rows # Analyse: phase stats # TODO: Check if square waves use the same calculation as sinusoids WAVE_TYPES = ["Sinusoid", "Square wave"] rpt_params["wave_type"] = WAVE_TYPES[int( input(">> Input: target wave type (use the index)\n" " ({}): ".format(", ".join(["{}={}".format(i,x) for i,x in enumerate(WAVE_TYPES)]))) )] rpt_params["wave_freq"] = float( input(">> Input: target wave frequency (in Hz): ") ) # Conversion formula: phase_time = (1/wave_freq) * (phase_angle/2/pi) data_phase_ps = data["phase_radian"] / rpt_params["wave_freq"] / 2 / np.pi * 1e12 rpt_params["phase_ps_mean"] = data_phase_ps.mean() rpt_params["phase_ps_min"] = data_phase_ps.min() rpt_params["phase_ps_max"] = data_phase_ps.max() rpt_params["phase_ps_std"] = data_phase_ps.std() rpt_params["phase_ps_maxabsdev"] = \ np.absolute(data_phase_ps - data_phase_ps.max()).mean() rpt_params["phase_ps_meanabsdev"] = \ np.absolute(data_phase_ps - data_phase_ps.mean()).mean() # Generate the report rpt = report(rpt_params) # Print the report print(rpt) # TODO: Implement report output feature if args.rpt is not None: raise NotImplementedError("Report file output feature has not been implemented.") if __name__ == "__main__": main()