creotech-sayma-testsuite/analyze_sayma_data.py

130 lines
5.4 KiB
Python

import numpy as np
import argparse
import os
import datetime
DATA_COLUMNS = np.array([
["data_time_iso", "datetime64[s]"],
["phase_radian", "f"],
])
REPORT_PARAMS_KEYS = [
"rpt_time",
"data_time_start", "data_time_end", "data_count",
"wave_type", # type of DDS waveform
"wave_freq", # frequency of the DDS waveform
"phase_ps_mean", "phase_ps_min", "phase_ps_max",
"phase_ps_std", # standard deviation
"phase_ps_meanabsdev", # mean absolute deviation
]
def report(*args, **kwargs):
if len(args) != 1 or not isinstance(args[0], dict):
raise SyntaxError("Must only pass 1 dict argument, or >=1 keyword arguments.")
# Create and store lines of strings for the report
rpt = []
prm = args[0]
for k in REPORT_PARAMS_KEYS:
if prm.get(k) is None and kwargs.get(k) is None:
raise ValueError("Data missing for report item {}.".format(k))
# Replace or append item from kwargs onto the prm dict
if kwargs.get(k) is not None:
prm[k] = kwargs[k]
# Report title
rpt.append("Sayma DAC/TTL Data Analysis")
rpt.append("")
# Report header
rpt.append("Report generated at: {}".format(prm["rpt_time"]))
rpt.append("------")
rpt.append("")
# Report content
rpt.append("Data collection started at: {}".format(prm["data_time_start"]))
rpt.append(" ended at: {}".format(prm["data_time_end"]))
rpt.append("Total number of records: {}".format(prm["data_count"]))
rpt.append("")
rpt.append("Target wave type: {}".format(prm["wave_type"]))
rpt.append(" frequency: {:.2f} MHz".format(prm["wave_freq"]/1e6))
rpt.append("")
rpt.append("Phase skew statistics:")
rpt.append(" Mean: {:>10.4f} ps".format(prm["phase_ps_mean"]))
rpt.append(" Minimum: {:>10.4f} ps".format(prm["phase_ps_min"]))
rpt.append(" Maximum: {:>10.4f} ps".format(prm["phase_ps_max"]))
rpt.append(" Standard Deviation: {:>10.4f} ps".format(prm["phase_ps_std"]))
rpt.append(" Max Absolute Deviation: {:>10.4f} ps".format(prm["phase_ps_maxabsdev"]))
rpt.append(" Mean Absolute Deviation: {:>10.4f} ps".format(prm["phase_ps_meanabsdev"]))
rpt.append("")
# TODO: Use jinja2 to produce a report
return '\n'.join(rpt)
def main():
# Get timestamp at script start
now = datetime.datetime.now().replace(
microsecond=0, # get rid of microseconds when printed
)
now_iso = now.isoformat(sep=' ')
parser = argparse.ArgumentParser(description="Data analysis tool for Sayma DAC/TTL")
parser.add_argument("log",
help="path of the log file containing the data records; "
"must be a CSV in excel-tab dialect",
type=str)
parser.add_argument("--rpt",
help="path of the file where the analysis will be reported (and overwrite)",
type=str)
args = parser.parse_args()
with open(args.log, 'r') as f:
print("Analysis of data records started at {}.".format(now_iso))
# Create a numpy array from the CSV data (it has no header row)
# Currently, log files from plot_sayma_data.py uses TAB as delimiter
data = np.loadtxt(f, delimiter='\t',
dtype={'names': DATA_COLUMNS[:, 0], 'formats': DATA_COLUMNS[:, 1]})
# Clean-up: remove duplicate rows
data_rows = len(data) # original row count w/ potential duplicates
data = np.unique(data, axis=0)
print("{} duplicate rows detected and ignored.".format(data_rows - len(data)))
data_rows = len(data) # new row count without duplicates
# Define dict of params for generating the report
rpt_params = dict()
rpt_params["rpt_time"] = now_iso
# Analyse: data collection time & count
rpt_params["data_time_start"] = str(data["data_time_iso"].min()).replace('T', ' ')
rpt_params["data_time_end"] = str(data["data_time_iso"].max()).replace('T', ' ')
rpt_params["data_count"] = data_rows
# Analyse: phase stats
# TODO: Check if square waves use the same calculation as sinusoids
WAVE_TYPES = ["Sinusoid", "Square wave"]
rpt_params["wave_type"] = WAVE_TYPES[int(
input(">> Input: target wave type (use the index)\n"
" ({}): ".format(", ".join(["{}={}".format(i,x) for i,x in enumerate(WAVE_TYPES)])))
)]
rpt_params["wave_freq"] = float(
input(">> Input: target wave frequency (in Hz): ")
)
# Conversion formula: phase_time = (1/wave_freq) * (phase_angle/2/pi)
data_phase_ps = data["phase_radian"] / rpt_params["wave_freq"] / 2 / np.pi * 1e12
rpt_params["phase_ps_mean"] = data_phase_ps.mean()
rpt_params["phase_ps_min"] = data_phase_ps.min()
rpt_params["phase_ps_max"] = data_phase_ps.max()
rpt_params["phase_ps_std"] = data_phase_ps.std()
rpt_params["phase_ps_maxabsdev"] = \
np.absolute(data_phase_ps - data_phase_ps.max()).mean()
rpt_params["phase_ps_meanabsdev"] = \
np.absolute(data_phase_ps - data_phase_ps.mean()).mean()
# Generate the report
rpt = report(rpt_params)
# Print the report
print(rpt)
# TODO: Implement report output feature
if args.rpt is not None:
raise NotImplementedError("Report file output feature has not been implemented.")
if __name__ == "__main__":
main()