from PyQt6 import QtWidgets, QtGui, QtCore, uic from PyQt6.QtCore import pyqtSignal, QObject, QSignalBlocker, pyqtSlot import pyqtgraph.parametertree.parameterTypes as pTypes from pyqtgraph.parametertree import Parameter, ParameterTree, ParameterItem, registerParameterType import pyqtgraph as pg pg.setConfigOptions(antialias=True) from pyqtgraph import mkPen from pyqtgraph.functions import siEval, siParse, SI_PREFIX_EXPONENTS, SI_PREFIXES from pglive.sources.live_axis_range import LiveAxisRange from pglive.sources.data_connector import DataConnector from pglive.kwargs import Axis, LeadingLine from pglive.sources.live_plot import LiveLinePlot from pglive.sources.live_plot_widget import LivePlotWidget from pglive.sources.live_axis import LiveAxis import re import sys import os import argparse import logging import asyncio from driver.kirdy import FilterConfig, Kirdy as Kirdy_Driver import qasync from qasync import asyncClose, asyncSlot from collections import deque from datetime import datetime, timezone, timedelta from time import time from typing import Any, Optional, List from dateutil import tz import math import socket from pid_autotune import PIDAutotune, PIDAutotuneState import importlib.resources COMMON_ERROR_MSG = "Connection Timeout. Disconnecting." FLOAT_REGEX = re.compile(r'(?P[+-]?((((\d+(\.\d*)?)|(\d*\.\d+))([eE][+-]?\d+)?)|((?i:nan)|(inf))))\s*((?P[u' + SI_PREFIXES + r']?)(?P[\w°℃].*))?$') def get_argparser(): parser = argparse.ArgumentParser(description="ARTIQ master") parser.add_argument("--log-file", default="", nargs="?", help="Store logs into a file; Set the base filename") parser.add_argument("-l", "--log", dest="logLevel", choices=['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'], help="Set the logging level") return parser def siConvert(val, suffix, typ=float): """ Convert a value written in SI notation according to given Si Scale. Example:: siConvert(0.1, "mA") # returns 100 """ if val is None: val = 0.0 val, siprefix, suffix = siParse(str(val)+suffix, FLOAT_REGEX) v = typ(val) n = -SI_PREFIX_EXPONENTS[siprefix] if siprefix != '' else 0 if n > 0: return v * 10**n elif n < 0: # this case makes it possible to use Decimal objects here return v / 10**-n else: return v class Kirdy(QObject): connected_sig = pyqtSignal(bool) setting_update_sig = pyqtSignal(dict) report_update_sig = pyqtSignal(dict) cmd_fail_sig = pyqtSignal(str) cmd_warning_sig = pyqtSignal(str) def __init__(self, parent, kirdy, _poll_interval): super().__init__(parent) self._poll_interval = _poll_interval self._default_poll_interval = _poll_interval self._kirdy = kirdy self._kirdy.set_connected_sig(self.connected_sig) self.connected_sig.connect(self.start_polling) self._noti_info_box = QtWidgets.QMessageBox() self._noti_info_box.setIcon(QtWidgets.QMessageBox.Icon.Information) self._kirdy.set_err_msg_sig(self.cmd_fail_sig) self._kirdy.set_warning_msg_sig(self.cmd_warning_sig) self._poll_report_timer = QtCore.QTimer() self._poll_report_timer.timeout.connect(self.polling_event) self.poll_settings_timer = QtCore.QTimer() self.poll_settings_timer.setInterval(100) self.poll_settings_timer.timeout.connect(self.polling_settings_event) def connected(self): return self._kirdy.connected() def connecting(self): return self._kirdy.connecting() def start_session(self, host, port): self._kirdy.start_session(host=host, port=port) def end_session(self): if self._poll_report_timer.isActive(): self._poll_report_timer.stop() asyncio.get_running_loop().create_task(self._kirdy.end_session()) @pyqtSlot() def polling_settings_event(self): self._kirdy.task_dispatcher(self._kirdy.device.get_settings_summary(sig=self.setting_update_sig)) @pyqtSlot() def polling_event(self): success = True success &= self._kirdy.task_dispatcher(self._kirdy.device.get_status_report(sig=self.report_update_sig)) if not(success): self._noti_info_box.setWindowTitle(" ") self._noti_info_box.setText(f"Polling rate is too high. Kirdy cannot handle {1/(self._poll_interval)} Hz polling rate. Reset to default polling rate ({1/self._default_poll_interval} Hz)") self._noti_info_box.show() self.set_update_s(self._default_poll_interval) @pyqtSlot(bool) def start_polling(self, start): if start: if not(self._poll_report_timer.isActive()): self._poll_report_timer.setInterval(int(self._poll_interval*1000)) self._poll_report_timer.start() self.poll_settings_timer.start() else: logging.debug("Kirdy Polling Timer has been started already.") else: self._poll_report_timer.stop() self.poll_settings_timer.stop() @pyqtSlot(float) def set_update_s(self, interval): self._poll_interval = interval self.update_polling_rate() def update_polling_rate(self): if self._poll_report_timer.isActive(): self._poll_report_timer.stop() self.start_polling(True) else: logging.debug("Attempt to update polling timer when it is stopped") def get_hw_rev(self): return self._kirdy.get_hw_rev() def get_firmware_rev(self): return self._kirdy.get_firmware_rev() class Graphs: def __init__(self, ld_i_set_graph, pd_mon_pwr_graph, tec_i_graph, tec_temp_graph, max_samples=1000): self.graphs = [ld_i_set_graph, pd_mon_pwr_graph, tec_i_graph, tec_temp_graph] self.connectors = [] self._pd_mon_pwr_plot = LiveLinePlot(pen=pg.mkPen('r')) self._ld_i_set_plot = LiveLinePlot(name="Set", pen=pg.mkPen('r')) self._tec_temp_plot = LiveLinePlot(pen=pg.mkPen('r')) self._tec_setpoint_plot = LiveLinePlot(pen=pg.mkPen('r')) self._tec_i_target_plot = LiveLinePlot(name="Target", pen=pg.mkPen('r')) self._tec_i_measure_plot = LiveLinePlot(name="Measure", pen=pg.mkPen('g')) self._temp_setpoint_line = tec_temp_graph.getPlotItem().addLine(label='{value} ℃', pen=pg.mkPen('g')) # Render the temperature setpoint line on top of the temperature being plotted self._temp_setpoint_line.setZValue(10) self._temp_setpoint_line.setVisible(False) def tickStrings(values: List, scale: float, spacing: float) -> List: return [datetime.fromtimestamp(value/1000, tz=timezone.utc).strftime("%H:%M:%S") for value in values] for graph in ld_i_set_graph, pd_mon_pwr_graph, tec_i_graph, tec_temp_graph: time_axis = LiveAxis('bottom', text="Time since Kirdy Reset (Hr:Min:Sec)", tick_angle=-45, units="") # Display the relative ts in custom %H:%M:%S format without local timezone time_axis.tickStrings = tickStrings # Prevent scaling prefix being added to the back fo axis label time_axis.autoSIPrefix = False time_axis.showLabel() graph.setAxisItems({'bottom': time_axis}) graph.add_crosshair(pg.mkPen(color='red', width=1), {'color': 'green'}) #TODO: x_range should not be updated on every tick graph.x_range_controller = LiveAxisRange(roll_on_tick=17, offset_left=4900) graph.x_range_controller.crop_left_offset_to_data = True # Enable linking of axes in the graph widget's context menu graph.register(graph.getPlotItem().titleLabel.text) # Slight hack getting the title self.max_samples = max_samples ld_i_set_axis = LiveAxis('left', text="Current", units="A") ld_i_set_axis.showLabel() ld_i_set_graph.setAxisItems({'left': ld_i_set_axis}) ld_i_set_graph.addItem(self._ld_i_set_plot) ld_i_set_graph.y_range_controller = LiveAxisRange(fixed_range=[0.0, 0.4]) self.ld_i_set_connector = DataConnector(self._ld_i_set_plot, plot_rate=10.0, update_rate=10.0, max_points=self.max_samples) self.connectors += [self.ld_i_set_connector] pd_mon_pwr_axis = LiveAxis('left', text="Power", units="W") pd_mon_pwr_axis.showLabel() # Laser Diode Power Reading Graph Range Width: 5mW pd_mon_pwr_graph.y_range_controller = LiveAxisRange(y_range_width=0.005, y_bound=[0.0, float("inf")]) pd_mon_pwr_graph.setAxisItems({'left': pd_mon_pwr_axis}) pd_mon_pwr_graph.addItem(self._pd_mon_pwr_plot) self.pd_mon_pwr_connector = DataConnector(self._pd_mon_pwr_plot, plot_rate=10.0, update_rate=10.0, max_points=self.max_samples) self.connectors += [self.pd_mon_pwr_connector] tec_temp_axis = LiveAxis('left', text="Temperature", units="℃") tec_temp_axis.showLabel() # TEC Temperature Reading Graph Range Width: 2.5mK tec_temp_graph.y_range_controller = LiveAxisRange(y_range_width=0.0025) tec_temp_graph.setAxisItems({'left': tec_temp_axis}) tec_temp_graph.addItem(self._tec_setpoint_plot) tec_temp_graph.addItem(self._tec_temp_plot) self.tec_setpoint_connector = DataConnector(self._tec_setpoint_plot, plot_rate=10.0, update_rate=10.0, max_points=2) self.tec_temp_connector = DataConnector(self._tec_temp_plot, plot_rate=10.0, update_rate=10.0, max_points=self.max_samples) self.connectors += [self.tec_temp_connector, self.tec_setpoint_connector] tec_i_axis = LiveAxis('left', text="Current", units="A") tec_i_axis.showLabel() tec_i_graph.setAxisItems({'left': tec_i_axis}) tec_i_graph.addLegend(brush=(50, 50, 200, 150)) tec_i_graph.addItem(self._tec_i_target_plot) tec_i_graph.addItem(self._tec_i_measure_plot) # TEC Output Current Reading Graph Range Width: 50mA tec_i_graph.y_range_controller = LiveAxisRange(y_range_width=0.05) self.tec_i_target_connector = DataConnector(self._tec_i_target_plot, plot_rate=10.0, update_rate=10.0, max_points=self.max_samples) self.tec_i_measure_connector = DataConnector(self._tec_i_measure_plot, plot_rate=10.0, update_rate=10.0, max_points=self.max_samples) self.connectors += [self.tec_i_target_connector, self.tec_i_measure_connector] def set_max_samples(self, max_samples): self.max_samples = max_samples for connector in self.connectors: with connector.data_lock: connector.max_points = self.max_samples connector.x = deque(maxlen=int(connector.max_points)) connector.y = deque(maxlen=int(connector.max_points)) def plot_append(self, report): try: ld_i_set = report['laser']['ld_i_set'] pd_pwr = report['laser']['pd_pwr'] tec_i_set = report['thermostat']['i_set'] tec_i_measure = report['thermostat']['tec_i'] tec_temp = report['thermostat']['temperature'] ts = report['ts'] self.ld_i_set_connector.cb_append_data_point(ld_i_set, ts) if pd_pwr is not None: self._pd_mon_pwr_plot.show() self.pd_mon_pwr_connector.cb_append_data_point(pd_pwr, ts) else: self._pd_mon_pwr_plot.hide() self.pd_mon_pwr_connector.cb_append_data_point(0.0, ts) if tec_temp is None: self._tec_temp_plot.hide() tec_temp = -273.15 else: self._tec_temp_plot.show() self.tec_temp_connector.cb_append_data_point(tec_temp, ts) if self._temp_setpoint_line.isVisible(): self.tec_setpoint_connector.cb_set_data([self._temp_setpoint_line.value(), self._temp_setpoint_line.value()], [ts, ts]) else: self.tec_setpoint_connector.cb_set_data([tec_temp, tec_temp], [ts, ts]) if tec_i_measure is not None: self.tec_i_measure_connector.cb_append_data_point(tec_i_measure, ts) self.tec_i_target_connector.cb_append_data_point(tec_i_set, ts) except Exception as e: logging.error(f"Graph Value cannot be updated. Data:{report}", exc_info=True) def clear_data_pts(self): for connector in self.connectors: connector.clear() connector.resume() def set_temp_setpoint_line(self, temp=None, visible=None): if visible is not None: self._temp_setpoint_line.setVisible(visible) if temp is not None: self._temp_setpoint_line.setValue(temp) # PyQtGraph normally does not update this text when the line # is not visible, so make sure that the temperature label # gets updated always, and doesn't stay at an old value. self._temp_setpoint_line.label.setText(f"{temp} ℃", color='g') class MutexParameter(pTypes.ListParameter): """ Mutually exclusive parameter where only one of its children is visible at a time, list selectable. The ordering of the list items determines which children will be visible. """ def __init__(self, **opts): super().__init__(**opts) self.sigValueChanged.connect(self.show_chosen_child) self.sigValueChanged.emit(self, self.opts['value']) def _get_param_from_value(self, value): if isinstance(self.opts['limits'], dict): values_list = list(self.opts['limits'].values()) else: values_list = self.opts['limits'] return self.children()[values_list.index(value)] @pyqtSlot(object, object) def show_chosen_child(self, value): for param in self.children(): param.hide() child_to_show = self._get_param_from_value(value.value()) child_to_show.show() if child_to_show.opts.get('triggerOnShow', None): child_to_show.sigValueChanged.emit(child_to_show, child_to_show.value()) registerParameterType('mutex', MutexParameter) class UpdateNetSettingsForm(QtWidgets.QDialog): def __init__(self): super().__init__() ui_file_path = importlib.resources.files("ui").joinpath("update_network_settings_form.ui") uic.loadUi(ui_file_path, self) def get_net_settings(self): try: ip_addr = f"{self.addr_in_0.text()}.{self.addr_in_1.text()}.{self.addr_in_2.text()}.{self.addr_in_3.text()}" gateway_addr = f"{self.gateway_in_0.text()}.{self.gateway_in_1.text()}.{self.gateway_in_2.text()}.{self.gateway_in_3.text()}" socket.inet_aton(ip_addr) socket.inet_aton(gateway_addr) return { "ip_addr": ip_addr, "gateway_addr": gateway_addr, "prefix_len": int(self.prefix_len_in.text()), "port": int(self.port_in.text()), } except (OSError, ValueError): return None class CfgPdMonForm(QtWidgets.QDialog): def __init__(self): super().__init__() ui_file_path = importlib.resources.files("ui").joinpath("config_pd_mon_form.ui") uic.loadUi(ui_file_path, self) class ConnSettingsForm(QtWidgets.QDialog): def __init__(self): super().__init__() ui_file_path = importlib.resources.files("ui").joinpath("conn_settings_form.ui") uic.loadUi(ui_file_path, self) def get_net_settings(self): try: ip_addr = f"{self.addr_in_0.text()}.{self.addr_in_1.text()}.{self.addr_in_2.text()}.{self.addr_in_3.text()}" socket.inet_aton(ip_addr) return { "ip_addr": ip_addr, "port": int(self.port_in.text()) } except (OSError, ValueError): return None class ConfigAdcFilterForm(QtWidgets.QDialog): def __init__(self): super().__init__() ui_file_path = importlib.resources.files("ui").joinpath("config_adc_filter_form.ui") uic.loadUi(ui_file_path, self) self.filter_type_cbox.addItems(['Sinc5Sinc1With50hz60HzRejection', 'Sinc5Sinc1', 'Sinc3', 'Sinc3WithFineODR']) self.fine_filter_sampling_rate_spinbox.setVisible(False) self.fine_filter_sampling_rate_spinbox.setMinimum(FilterConfig.Sinc3WithFineODR.lower_limit) self.fine_filter_sampling_rate_spinbox.setMaximum(FilterConfig.Sinc3WithFineODR.upper_limit) self.filter_type_cbox.currentTextChanged.connect(self.sampling_rate_cbox_config) @pyqtSlot(str) def sampling_rate_cbox_config(self, filter_type): if filter_type == "": return if filter_type == "Sinc3WithFineODR": self.filter_sampling_rate_cbox.setVisible(False) self.fine_filter_sampling_rate_spinbox.setVisible(True) else: self.fine_filter_sampling_rate_spinbox.setVisible(False) self.filter_sampling_rate_cbox.setVisible(True) self.filter_sampling_rate_cbox.clear() self.filter_sampling_rate_cbox.addItems(getattr(FilterConfig, filter_type).get_list_of_settings()) def get_filter_settings(self): filter_type = self.filter_type_cbox.currentText() if filter_type == "Sinc3WithFineODR": return getattr(FilterConfig, filter_type)(self.fine_filter_sampling_rate_spinbox.value()) else: filter_type_val = getattr(FilterConfig, filter_type) filter_cfg = getattr(filter_type_val, self.filter_sampling_rate_cbox.currentText().lower()) return filter_cfg class MainWindow(QtWidgets.QMainWindow): """The maximum number of sample points to store.""" DEFAULT_MAX_SAMPLES = 1000 DEFAULT_IP_ADDR = '192.168.1.128' DEFAULT_PORT = 1550 LASER_DIODE_PARAMETERS = [ {'name': 'Readings', 'expanded': True, 'type': 'group', 'children': [ {'name': 'LD Current Set', 'type': 'float', 'unit': 'mA', 'readonly': True, "compactHeight": False}, {'name': 'PD Current', 'type': 'float', 'unit': 'uA', 'readonly': True, "compactHeight": False}, {'name': 'PD Power', 'type': 'float', 'unit': 'mW', 'readonly': True, "compactHeight": False}, {'name': 'LF Mod Termination (50 Ohm)', 'type': 'list', 'limits': ['On', 'Off'], 'readonly': True, "compactHeight": False} ]}, {'name': 'Output Config', 'expanded': True, 'type': 'group', 'children': [ {'name': 'LD Current Set', 'type': 'float', 'value': 0, 'step': 0.001, 'format': '{value:.4f}', 'decimals': 6, 'limits': (0, 300), 'unit': 'mA', 'lock': False, 'target': 'laser', 'action': 'set_i', "compactHeight": False}, {'name': 'LD Terminals Short', 'type': 'bool', 'value': False, 'lock': False, 'target': 'laser', 'action': 'set_ld_terms_short', "compactHeight": False}, {'name': 'Default Power On', 'type': 'bool', 'value': False, 'lock': False, 'target': 'laser', 'action': 'set_default_pwr_on', "compactHeight": False}, ]}, {'name': 'Photodiode Monitor Config', 'expanded': False, 'type': 'group', 'children': [ {'name': 'LD Power Limit', 'type': 'float', 'value': 0, 'step': 0.001, 'format': '{value:.4f}', 'decimals': 6, 'limits': (0, float("inf")), 'unit': 'mW', 'lock': False, 'target': 'laser', 'action': 'set_ld_pwr_limit', "compactHeight": False}, {'name': 'Responsitivity', 'type': 'float', 'value': 0, 'step': 0.001, 'format': '{value:.4f}', 'decimals': 6, 'limits': (0, float("inf")), 'unit': 'mA/W', 'lock': False, 'target': 'laser', 'action': 'set_pd_mon_responsitivity', "compactHeight": False, 'readonly': True}, {'name': 'Dark Current', 'type': 'float', 'value': 0, 'step': 1, 'format': '{value:.4f}', 'decimals': 6, 'limits': (0, float("inf")), 'unit': 'uA', 'lock': False, 'target': 'laser', 'action': 'set_pd_mon_dark_current', "compactHeight": False, 'readonly': True}, {'name': 'Configure Photodiode Monitor', 'type': 'action'}, ]}, ] THERMOSTAT_PARAMETERS = [ {'name': 'Readings', 'expanded': True, 'type': 'group', 'children': [ {'name': 'Temperature', 'type': 'float', 'unit': '℃', 'format': '{value:.4f}', 'readonly': True, "compactHeight": False}, {'name': 'Current through TEC', 'type': 'float', 'unit': 'mA', 'decimals': 6, 'readonly': True, "compactHeight": False}, ]}, {'name': 'Output Config', 'expanded': True, 'type': 'group', 'children': [ {'name': 'Control Method', 'type': 'mutex', 'limits': ['Constant Current', 'Temperature PID'], 'target_action_pair': [['thermostat', 'set_constant_current_control_mode'], ['thermostat', 'set_pid_control_mode']], 'children': [ {'name': 'Set Current', 'type': 'float', 'value': 0, 'step': 1, 'limits': (-3000, 3000), 'triggerOnShow': True, 'decimals': 6, 'unit': 'mA', 'lock': False, 'target': 'thermostat', 'action': 'set_tec_i_out', "compactHeight": False}, {'name': 'Set Temperature', 'type': 'float', 'value': 25, 'step': 0.0001, 'limits': (-273.15, 300), 'format': '{value:.4f}', 'unit': '℃', 'lock': False, 'target': 'thermostat', 'action': 'set_temperature_setpoint', "compactHeight": False}, ]}, {'name': 'Limits', 'expanded': False, 'type': 'group', 'children': [ {'name': 'Max Cooling Current', 'type': 'float', 'value': 0, 'step': 1, 'decimals': 6, 'limits': (0, 3000), 'unit': 'mA', 'lock': False, 'target': 'thermostat', 'action': 'set_tec_max_cooling_i', "compactHeight": False}, {'name': 'Max Heating Current', 'type': 'float', 'value': 0, 'step': 1, 'decimals': 6, 'limits': (0, 3000), 'unit': 'mA', 'lock': False, 'target': 'thermostat', 'action': 'set_tec_max_heating_i', "compactHeight": False}, {'name': 'Max Voltage Difference', 'type': 'float', 'value': 0, 'step': 0.1, 'limits': (0, 4.3), 'unit': 'V', 'lock': False, 'target': 'thermostat', 'action': 'set_tec_max_v', "compactHeight": False}, ]}, {'name': 'Default Power On', 'type': 'bool', 'value': False, 'lock': False, 'target': 'thermostat', 'action': 'set_default_pwr_on'}, ]}, {'name': 'Temperature Monitor Config', 'expanded': False, 'type': 'group', 'children': [ {'name': 'Upper Limit', 'type': 'float', 'value': 0, 'step': 1, 'decimals': 6, 'limits': (-273.15, 300), 'unit': '℃', 'lock': False, 'target': 'thermostat', 'action': 'set_temp_mon_upper_limit', "compactHeight": False}, {'name': 'Lower Limit', 'type': 'float', 'value': 0, 'step': 1, 'decimals': 6, 'limits': (-273.15, 300), 'unit': '℃', 'lock': False, 'target': 'thermostat', 'action': 'set_temp_mon_lower_limit', "compactHeight": False}, ]}, {'name': 'Temperature ADC Filter Settings', 'expanded': False, 'type': 'group', 'children': [ {'name': 'Filter Type', 'type': 'list', 'limits': ['Sinc5Sinc1With50hz60HzRejection', 'Sinc5Sinc1', 'Sinc3', 'Sinc3WithFineODR'], 'readonly': True, "compactHeight": False}, {'name': 'Sampling Rate', 'type': 'float', 'value': 16.67, 'decimals': 4, 'unit': 'Hz', 'readonly': True, "compactHeight": False}, {'name': 'Recorded Sampling Rate', 'type': 'float', 'value': 16.67, 'decimals': 4, 'unit': 'Hz', 'readonly': True, "compactHeight": False}, {'name': 'Configure ADC Filter', 'type': 'action'}, ]}, {'name': 'Thermistor Settings','expanded': False, 'type': 'group', 'children': [ {'name': 'T₀', 'type': 'float', 'value': 0, 'step': 1, 'decimals': 6, 'limits': (-273.15, 300), 'unit': '℃', 'lock': False, 'target': 'thermostat', 'action': 'set_sh_t0', "compactHeight": False}, {'name': 'R₀', 'type': 'float', 'value': 0, 'step': 1, 'decimals': 6, 'unit': 'kΩ', 'lock': False, 'target': 'thermostat', 'action': 'set_sh_r0', "compactHeight": False}, {'name': 'B', 'type': 'float', 'value': 3950, 'step': 1, 'decimals': 4, 'unit': 'K', 'lock': False, 'target': 'thermostat', 'action': 'set_sh_beta', "compactHeight": False}, ]}, {'name': 'PID Config', 'expanded': False, 'type': 'group', 'children': [ {'name': 'Kp', 'type': 'float', 'step': 0.1, 'decimals': 16, 'lock': False, 'target': 'thermostat', 'action': 'set_pid_kp', "compactHeight": False}, {'name': 'Ki', 'type': 'float', 'step': 0.1, 'decimals': 16, 'unit': 'Hz', 'lock': False, 'target': 'thermostat', 'action': 'set_pid_ki', "compactHeight": False}, {'name': 'Kd', 'type': 'float', 'step': 0.1, 'decimals': 16, 'unit': 's', 'lock': False, 'target': 'thermostat', 'action': 'set_pid_kd', "compactHeight": False}, {'name': "PID Output Clamping", 'expanded': True, 'type': 'group', 'children': [ {'name': 'Minimum', 'type': 'float', 'step': 1, 'limits': (-3000, 3000), 'decimals': 6, 'unit': 'mA', 'lock': False, 'target': 'thermostat', 'action': 'set_pid_output_min', "compactHeight": False}, {'name': 'Maximum', 'type': 'float', 'step': 1, 'limits': (-3000, 3000), 'decimals': 6, 'unit': 'mA', 'lock': False, 'target': 'thermostat', 'action': 'set_pid_output_max', "compactHeight": False}, ]}, {'name': 'PID Auto Tune', 'expanded': False, 'type': 'group', 'children': [ {'name': 'Target Temperature', 'type': 'float', 'value': 20.0, 'step': 0.1, 'unit': '℃', 'format': '{value:.4f}', "compactHeight": False}, {'name': 'Test Current', 'type': 'float', 'value': 1000, 'decimals': 6, 'step': 100, 'limits': (-3000, 3000), 'unit': 'mA', "compactHeight": False}, {'name': 'Temperature Swing', 'type': 'float', 'value': 0.0, 'step': 0.0001, 'prefix': '±', 'unit': '℃', 'format': '{value:.4f}', "compactHeight": False}, {'name': 'Lookback', 'type': 'float', 'value': 5.0, 'step': 0.1, 'unit': 's', 'format': '{value:.4f}', "compactHeight": False}, {'name': 'Run', 'type': 'action', 'tip': 'Run'}, ]}, ]}, ] def __init__(self, args): super(MainWindow, self).__init__() self.kirdy = Kirdy_Driver() ui_file_path = importlib.resources.files("ui").joinpath("kirdy_qt.ui") uic.loadUi(ui_file_path, self) self.info_box = QtWidgets.QMessageBox() self.info_box.setIcon(QtWidgets.QMessageBox.Icon.Information) # Load Global QT Style Sheet Settings qss=os.path.join(os.path.dirname(__file__), "ui/mainwindow.qss") with open(qss,"r") as fh: self.setStyleSheet(fh.read()) self.ip_addr = self.DEFAULT_IP_ADDR self.port = self.DEFAULT_PORT self.cfg_pd_mon_form = CfgPdMonForm() self.conn_settings_form = ConnSettingsForm() self.conn_settings_form.accepted.connect(self.start_connecting) self.update_net_settings_form = UpdateNetSettingsForm() self.update_net_settings_form.accepted.connect(self.update_net_settings) self.cfg_adc_filter_form = ConfigAdcFilterForm() self.max_samples = self.DEFAULT_MAX_SAMPLES self.autotuner = PIDAutotune(25) self.setup_menu_bar() self._set_up_ctrl_btns() self._set_up_plot_menu() def _setValuewithLock(self, value): if not self.opts.get("lock", None): if self.opts.get("unit", None) is not None: self.setValue(siConvert(value, self.opts.get("unit", None))) else: self.setValue(value) Parameter.setValuewithLock = _setValuewithLock def _add_unit_to_title(param_tree): def _traverse(param): if param["type"] == "group" or param["type"] == "mutex": for param in param["children"]: _add_unit_to_title(param) else: if "unit" in param.keys(): if not("title" in param.keys()): param["title"] = param["name"] param["title"] = param["title"] + " ({:})".format(param["unit"]) if isinstance(param_tree, list): for param in param_tree: _traverse(param) else: _traverse(param_tree) _add_unit_to_title(self.LASER_DIODE_PARAMETERS) _add_unit_to_title(self.THERMOSTAT_PARAMETERS) self.params = [ Parameter.create(name=f"Laser Diode Parameters", type='group', value=1, children=self.LASER_DIODE_PARAMETERS), Parameter.create(name=f"Thermostat Parameters", type='group', value=3, children=self.THERMOSTAT_PARAMETERS), ] self._set_param_tree() self._set_up_pd_mon_form() self._set_up_adc_filter_form() self.tec_i_graph.setTitle("TEC Current") self.tec_temp_graph.setTitle("TEC Temperature") self.ld_i_set_graph.setTitle("LD Current Set") self.pd_mon_pwr_graph.setTitle("PD Mon Power") self.connect_btn.clicked.connect(self.show_conn_settings_form) self.kirdy_handler = Kirdy(self, self.kirdy, 1/20.0) self.kirdy_handler.setting_update_sig.connect(self.update_ld_ctrl_panel_settings) self.kirdy_handler.setting_update_sig.connect(self.update_thermostat_ctrl_panel_settings) self.kirdy_handler.report_update_sig.connect(self.update_ld_ctrl_panel_readings) self.kirdy_handler.report_update_sig.connect(self.update_thermostat_ctrl_panel_readings) self.kirdy_handler.cmd_fail_sig.connect(self.show_err_cmd_msg) self.kirdy_handler.cmd_warning_sig.connect(self.show_warning_cmd_msg) self.graphs = Graphs(self.ld_i_set_graph, self.pd_mon_pwr_graph, self.tec_i_graph, self.tec_temp_graph, max_samples=self.max_samples) self.kirdy_handler.report_update_sig.connect(self.graphs.plot_append) self.loading_spinner.hide() self.kirdy_handler.connected_sig.connect(self._on_connection_changed) def setup_menu_bar(self): @pyqtSlot(bool) def about_kirdy(_): hw_rev = self.kirdy_handler.get_hw_rev() firmware_rev = self.kirdy_handler.get_firmware_rev() QtWidgets.QMessageBox.about( self, "About Kirdy", f"""

Sinara 1550 Kirdy

Hardware Revision: v{hw_rev["major"]}.{hw_rev["minor"]}
Firmware Revision: {firmware_rev}
""" ) self.menu_action_about_kirdy.triggered.connect(about_kirdy) @pyqtSlot(bool) def about_gui(_): # TODO: Replace the hardware revision placeholder QtWidgets.QMessageBox.about( self, "About Kirdy Control Panel", f"""

Version: "Version"

""" ) self.menu_action_about_gui.triggered.connect(about_gui) @asyncSlot(bool) async def dfu_mode(_): await self.kirdy.device.dfu() self.kirdy_handler.end_session() self.menu_action_dfu_mode.triggered.connect(dfu_mode) @asyncSlot(bool) async def reset_kirdy(_): await self.kirdy.device.hard_reset() self.kirdy_handler.end_session() self.menu_action_hard_reset.triggered.connect(reset_kirdy) @pyqtSlot(bool) def save_settings(_): self.kirdy.task_dispatcher(self.kirdy.device.save_current_settings_to_flash()) saved = QtWidgets.QMessageBox(self) saved.setWindowTitle(" ") saved.setText(f"Config saved. Laser diode and thermostat configs have been saved into flash.") saved.setIcon(QtWidgets.QMessageBox.Icon.Information) saved.show() self.menu_action_save.triggered.connect(save_settings) @pyqtSlot(bool) def load_settings(_): self.kirdy.task_dispatcher(self.kirdy.device.restore_settings_from_flash()) loaded = QtWidgets.QMessageBox(self) loaded.setWindowTitle(" ") loaded.setText(f"Config loaded. Laser Diode and Thermostat configs have been loaded from flash.") loaded.setIcon(QtWidgets.QMessageBox.Icon.Information) loaded.show() self.menu_action_load.triggered.connect(load_settings) @pyqtSlot(bool) def show_update_net_settings_form(_): self.update_net_settings_form.show() self.menu_action_update_net_settings.triggered.connect(show_update_net_settings_form) def update_pd_mon_form_readings(self, ld_settings): pwr_unit = self.params[0].child('Photodiode Monitor Config', 'LD Power Limit').opts.get("unit", None) self.params[0].child('Photodiode Monitor Config', 'LD Power Limit').setOpts(limits= (0, siConvert(ld_settings["ld_pwr_limit"]["max"], pwr_unit))) self.cfg_pd_mon_form.settable_pwr_range_display_lbl.setText(f" 0 - {siConvert(ld_settings['ld_pwr_limit']['max'], pwr_unit):.4f}") self.cfg_pd_mon_form.cfg_pwr_limit_spinbox.setMaximum(siConvert(ld_settings['ld_pwr_limit']['max'], pwr_unit)) responsitivity_unit = self.cfg_pd_mon_form.cfg_responsitivity_spinbox.unit self.cfg_pd_mon_form.cfg_responsitivity_reading.setText(f"{siConvert(ld_settings['pd_mon_params']['responsitivity'], responsitivity_unit):.4f}") i_dark_unit = self.cfg_pd_mon_form.cfg_dark_current_spinbox.unit self.cfg_pd_mon_form.cfg_dark_current_reading.setText(f"{siConvert(ld_settings['pd_mon_params']['i_dark'], i_dark_unit):.4f}") pwr_limit_unit = self.cfg_pd_mon_form.cfg_pwr_limit_spinbox.unit self.cfg_pd_mon_form.cfg_pwr_limit_reading.setText(f"{siConvert(ld_settings['ld_pwr_limit']['value'], pwr_limit_unit):.4f}") def update_adc_filter_form_readings(self, filter_type, filter_rate): self.cfg_adc_filter_form.filter_type_reading_lbl.setText(filter_type) self.cfg_adc_filter_form.filter_sampling_rate_reading_lbl.setText(str(filter_rate)) def show_conn_settings_form(self): ip_addr = self.ip_addr.split(".") self.conn_settings_form.addr_in_0.setText(ip_addr[0]) self.conn_settings_form.addr_in_1.setText(ip_addr[1]) self.conn_settings_form.addr_in_2.setText(ip_addr[2]) self.conn_settings_form.addr_in_3.setText(ip_addr[3]) self.conn_settings_form.port_in.setText(str(self.port)) self.conn_settings_form.show() def _set_up_ctrl_btns(self): @pyqtSlot(bool) def ld_pwr_on(_): self.kirdy.task_dispatcher(self.kirdy.laser.set_power_on(True)) self.ld_pwr_on_btn.clicked.connect(ld_pwr_on) @pyqtSlot(bool) def ld_pwr_off(_): self.kirdy.task_dispatcher(self.kirdy.laser.set_power_on(False)) self.ld_pwr_off_btn.clicked.connect(ld_pwr_off) @pyqtSlot(bool) def ld_clear_alarm(_): self.kirdy.task_dispatcher(self.kirdy.laser.clear_alarm()) self.ld_clear_alarm_btn.clicked.connect(ld_clear_alarm) @pyqtSlot(bool) def tec_pwr_on(_): self.kirdy.task_dispatcher(self.kirdy.thermostat.set_power_on(True)) self.tec_pwr_on_btn.clicked.connect(tec_pwr_on) @pyqtSlot(bool) def tec_pwr_off(_): self.kirdy.task_dispatcher(self.kirdy.thermostat.set_power_on(False)) self.tec_pwr_off_btn.clicked.connect(tec_pwr_off) @pyqtSlot(bool) def tec_clear_alarm(_): self.kirdy.task_dispatcher(self.kirdy.thermostat.clear_alarm()) self.tec_clear_alarm_btn.clicked.connect(tec_clear_alarm) @pyqtSlot(bool) def update_polling_rate(_): self.kirdy_handler.set_update_s(1/self.polling_rate_spinbox.value()) self.kirdy_handler.update_polling_rate() self.polling_rate_apply_btn.clicked.connect(update_polling_rate) def _set_up_plot_menu(self): self.plot_menu = QtWidgets.QMenu() self.plot_menu.setTitle("Plot Settings") clear = QtGui.QAction("Clear graphs", self.plot_menu) clear.triggered.connect(self.clear_graphs) self.plot_menu.addAction(clear) self.plot_menu.clear = clear self.samples_spinbox = QtWidgets.QSpinBox() self.samples_spinbox.setRange(2, 100000) self.samples_spinbox.setSuffix(' samples') self.samples_spinbox.setValue(self.max_samples) self.samples_spinbox.valueChanged.connect(self.set_max_samples) limit_samples = QtWidgets.QWidgetAction(self.plot_menu) limit_samples.setDefaultWidget(self.samples_spinbox) self.plot_menu.addAction(limit_samples) self.plot_menu.limit_samples = limit_samples self.plot_settings.setMenu(self.plot_menu) def _set_up_pd_mon_form(self): @pyqtSlot(bool) def ld_pwr_on(_): self.kirdy.task_dispatcher(self.kirdy.laser.clear_alarm()) self.kirdy.task_dispatcher(self.kirdy.laser.set_power_on(True)) self.cfg_pd_mon_form.pwr_on_btn.clicked.connect(ld_pwr_on) @pyqtSlot(bool) def ld_pwr_off(_): self.kirdy.task_dispatcher(self.kirdy.laser.set_power_on(False)) self.cfg_pd_mon_form.pwr_off_btn.clicked.connect(ld_pwr_off) def get_spinbox_value(spinbox): _, _, suffix = siParse(str(spinbox.value())+spinbox.unit, regex=FLOAT_REGEX) return siEval(str(spinbox.value())+spinbox.unit, regex=FLOAT_REGEX, suffix=suffix) def set_spinbox_value(spinbox, val): spinbox.setValue(siConvert(val, spinbox.unit)) @pyqtSlot(bool) def apply_pd_params(_): responsitivity = get_spinbox_value(self.cfg_pd_mon_form.cfg_responsitivity_spinbox) dark_current = get_spinbox_value(self.cfg_pd_mon_form.cfg_dark_current_spinbox) self.kirdy.task_dispatcher(self.kirdy.laser.set_pd_mon_responsitivity(responsitivity)) self.kirdy.task_dispatcher(self.kirdy.laser.set_pd_mon_dark_current(dark_current)) self.kirdy.task_dispatcher(self.kirdy.laser.apply_pd_params()) self.cfg_pd_mon_form.apply_pd_params_btn.clicked.connect(apply_pd_params) @pyqtSlot(bool) def apply_ld_pwr_limit(_): pwr_limit = get_spinbox_value(self.cfg_pd_mon_form.cfg_pwr_limit_spinbox) self.kirdy.task_dispatcher(self.kirdy.laser.set_ld_pwr_limit(pwr_limit)) self.cfg_pd_mon_form.apply_pwr_limit_btn.clicked.connect(apply_ld_pwr_limit) @pyqtSlot(bool) def rst_ld_pwr_limit(_): pwr_limit = self.cfg_pd_mon_form.cfg_pwr_limit_spinbox.value() self.kirdy.task_dispatcher(self.kirdy.laser.set_ld_pwr_limit(0)) self.cfg_pd_mon_form.rst_ld_pwr_limit_btn.clicked.connect(rst_ld_pwr_limit) @asyncSlot(bool) async def apply_ld_pwr_limit_max(_): settings = await self.kirdy.device.get_settings_summary() set_spinbox_value(self.cfg_pd_mon_form.cfg_pwr_limit_spinbox, settings['laser']['ld_pwr_limit']['max']) self.kirdy.task_dispatcher(self.kirdy.laser.set_ld_pwr_limit(settings['laser']['ld_pwr_limit']['max'])) self.cfg_pd_mon_form.apply_pwr_limit_max_btn.clicked.connect(apply_ld_pwr_limit_max) ld_pwr_limit_unit = self.params[0].child('Photodiode Monitor Config', 'LD Power Limit').opts["unit"] ld_pwr_limit_text = self.cfg_pd_mon_form.cfg_pwr_limit_lbl.text() self.cfg_pd_mon_form.cfg_pwr_limit_lbl.setText(ld_pwr_limit_text.replace(":", f" ({ld_pwr_limit_unit}):")) self.cfg_pd_mon_form.cfg_pwr_limit_spinbox.unit = ld_pwr_limit_unit settable_pwr_limit_text = self.cfg_pd_mon_form.settable_pwr_range_lbl.text() self.cfg_pd_mon_form.settable_pwr_range_lbl.setText(settable_pwr_limit_text.replace(":", f" ({ld_pwr_limit_unit}):")) pd_responsitivity_unit = self.params[0].child('Photodiode Monitor Config', 'Responsitivity').opts["unit"] pd_responsitivity_text = self.cfg_pd_mon_form.cfg_responsitivity_lbl.text() self.cfg_pd_mon_form.cfg_responsitivity_lbl.setText(pd_responsitivity_text.replace(":", f" ({pd_responsitivity_unit}):")) self.cfg_pd_mon_form.cfg_responsitivity_spinbox.unit = pd_responsitivity_unit pd_dark_current_unit = self.params[0].child('Photodiode Monitor Config', 'Dark Current').opts["unit"] pd_dark_current_text = self.cfg_pd_mon_form.cfg_dark_current_lbl.text() self.cfg_pd_mon_form.cfg_dark_current_lbl.setText(pd_dark_current_text.replace(":", f" ({pd_dark_current_unit}):")) self.cfg_pd_mon_form.cfg_dark_current_spinbox.unit = pd_dark_current_unit def _set_up_adc_filter_form(self): @pyqtSlot(bool) def apply_adc_filter_settings(): filter_cfg = self.cfg_adc_filter_form.get_filter_settings() self.kirdy.task_dispatcher(self.kirdy.thermostat.config_temp_adc_filter(filter_cfg)) self.cfg_adc_filter_form.apply_btn.clicked.connect(apply_adc_filter_settings) def _set_param_tree(self): self.ld_status.setStyleSheet("border: 3px solid #A1A1A1;") # Light Gray self.tec_status.setStyleSheet("border: 3px solid #A1A1A1;") # Light Gray tree = self.ld_tree tree.setHeaderHidden(True) tree.setParameters(self.params[0], showTop=False) self.params[0].sigTreeStateChanged.connect(self.send_command) tree = self.tec_tree tree.setHeaderHidden(True) tree.setParameters(self.params[1], showTop=False) self.params[1].sigTreeStateChanged.connect(self.send_command) self.prev_autotuner_state = None @asyncSlot() async def autotune(param): self.prev_autotuner_state = None match self.autotuner.state(): case PIDAutotuneState.STATE_OFF: settings = await self.kirdy.device.get_settings_summary() self.autotuner.setParam( param.parent().child('Target Temperature').value(), param.parent().child('Test Current').value() / 1000, param.parent().child('Temperature Swing').value(), 1.0 / settings['thermostat']['temp_adc_settings']['rate'], param.parent().child('Lookback').value()) print(param.parent().child('Lookback').value()) self.autotuner.setReady() param.setOpts(title="Stop") self.kirdy.task_dispatcher(self.kirdy.thermostat.set_constant_current_control_mode()) self.kirdy_handler.report_update_sig.connect(self.autotune_tick) self.loading_spinner.show() self.loading_spinner.start() self.background_task_lbl.setText("Autotuning") case PIDAutotuneState.STATE_READY | PIDAutotuneState.STATE_RELAY_STEP_UP | PIDAutotuneState.STATE_RELAY_STEP_DOWN: self.autotuner.setOff() param.setOpts(title="Run") self.kirdy.task_dispatcher(self.kirdy.thermostat.set_tec_i_out(0.0)) self.kirdy_handler.report_update_sig.disconnect(self.autotune_tick) self.background_task_lbl.setText("Ready.") self.loading_spinner.stop() self.loading_spinner.hide() self.params[1].child('PID Config', 'PID Auto Tune', 'Run').sigActivated.connect(autotune) @pyqtSlot() def show_pd_mon_cfg_form(param): ld_pwr_limit = self.params[0].child('Photodiode Monitor Config', 'LD Power Limit').value() pd_responsitivity = self.params[0].child('Photodiode Monitor Config', 'Responsitivity').value() pd_dark_current = self.params[0].child('Photodiode Monitor Config', 'Dark Current').value() self.cfg_pd_mon_form.cfg_responsitivity_spinbox.setValue(pd_responsitivity) self.cfg_pd_mon_form.cfg_pwr_limit_spinbox.setValue(ld_pwr_limit) self.cfg_pd_mon_form.cfg_dark_current_spinbox.setValue(pd_dark_current) self.cfg_pd_mon_form.show() self.params[0].child('Photodiode Monitor Config', 'Configure Photodiode Monitor').sigActivated.connect(show_pd_mon_cfg_form) @asyncSlot() async def show_adc_filter_cfg_form(param): settings = await self.kirdy.device.get_settings_summary() filter_type = settings['thermostat']['temp_adc_settings']['filter_type'] filter_rate = settings['thermostat']['temp_adc_settings'][getattr(getattr(FilterConfig, filter_type), "_odr_type")] self.cfg_adc_filter_form.filter_type_cbox.setCurrentIndex(self.cfg_adc_filter_form.filter_type_cbox.findText(filter_type)) self.cfg_adc_filter_form.sampling_rate_cbox_config(filter_type) if filter_type == "Sinc3WithFineODR": self.cfg_adc_filter_form.fine_filter_sampling_rate_spinbox.setValue(filter_rate) else: self.cfg_adc_filter_form.filter_sampling_rate_cbox.setCurrentIndex(self.cfg_adc_filter_form.filter_sampling_rate_cbox.findText(filter_rate)) self.cfg_adc_filter_form.show() self.params[1].child('Temperature ADC Filter Settings', 'Configure ADC Filter').sigActivated.connect(show_adc_filter_cfg_form) @pyqtSlot(str) def show_err_cmd_msg(self, kirdy_msg): self.info_box.setWindowTitle(" ") self.info_box.setText(f"Command fails to execute. {kirdy_msg}") self.info_box.show() @pyqtSlot(str) def show_warning_cmd_msg(self, kirdy_msg): self.info_box.setWindowTitle(" ") self.info_box.setText(f"Command executed but has warning message. {kirdy_msg}") self.info_box.show() @asyncSlot(dict) async def autotune_tick(self, report): self.autotuner.run(report['thermostat']['temperature'], report['ts']/1000) if self.prev_autotuner_state != self.autotuner.state(): match self.autotuner.state(): case PIDAutotuneState.STATE_READY | PIDAutotuneState.STATE_RELAY_STEP_UP | PIDAutotuneState.STATE_RELAY_STEP_DOWN: await self.kirdy.thermostat.set_tec_i_out(self.autotuner.output()) self.prev_autotuner_state = self.autotuner.state() case PIDAutotuneState.STATE_SUCCEEDED: kp, ki, kd = self.autotuner.get_tec_pid() self.autotuner.setOff() self.params[1].child('PID Config', 'PID Auto Tune', 'Run').setOpts(title="Run") self.kirdy.task_dispatcher(self.kirdy.thermostat.set_pid_kp(kp)) self.kirdy.task_dispatcher(self.kirdy.thermostat.set_pid_ki(ki)) self.kirdy.task_dispatcher(self.kirdy.thermostat.set_pid_kd(kd)) self.kirdy.task_dispatcher(self.kirdy.thermostat.set_pid_control_mode()) self.kirdy.task_dispatcher(self.kirdy.thermostat.set_temperature_setpoint(self.params[1].child('PID Config', 'PID Auto Tune', 'Target Temperature').value())) self.kirdy_handler.report_update_sig.disconnect(self.autotune_tick) self.background_task_lbl.setText("Ready.") self.loading_spinner.stop() self.loading_spinner.hide() self.info_box.setWindowTitle(" ") self.info_box.setText("PID AutoTune Success: PID Config has been loaded to Thermostat.\nRegulating temperature.") self.info_box.show() self.prev_autotuner_state = None case PIDAutotuneState.STATE_FAILED: self.autotuner.setOff() self.params[1].child('PID Config', 'PID Auto Tune', 'Run').setOpts(title="Run") self.kirdy.task_dispatcher(self.kirdy.thermostat.set_tec_i_out(0.0)) self.kirdy_handler.report_update_sig.disconnect(self.autotune_tick) self.background_task_lbl.setText("Ready.") self.loading_spinner.stop() self.loading_spinner.hide() self.info_box.setWindowTitle(" ") self.info_box.setText("PID Autotune failed.") self.info_box.show() self.prev_autotuner_state = None @pyqtSlot(bool) def _on_connection_changed(self, result): def ctrl_panel_setEnable(result): self.ld_status.setEnabled(result) self.ld_tree.setEnabled(result) self.ld_pwr_on_btn.setEnabled(result) self.ld_pwr_off_btn.setEnabled(result) self.ld_clear_alarm_btn.setEnabled(result) self.tec_status.setEnabled(result) self.tec_tree.setEnabled(result) self.tec_pwr_on_btn.setEnabled(result) self.tec_pwr_off_btn.setEnabled(result) self.tec_clear_alarm_btn.setEnabled(result) ctrl_panel_setEnable(result) def menu_bar_setEnable(result): self.menu_action_about_kirdy.setEnabled(result) self.menu_action_connect.setEnabled(result) self.menu_action_dfu_mode.setEnabled(result) self.menu_action_disconnect.setEnabled(result) self.menu_action_hard_reset.setEnabled(result) self.menu_action_save.setEnabled(result) self.menu_action_load.setEnabled(result) self.menu_action_update_net_settings.setEnabled(result) menu_bar_setEnable(result) def graph_group_setEnable(result): self.ld_i_set_graph.setEnabled(result) self.pd_mon_pwr_graph.setEnabled(result) self.tec_i_graph.setEnabled(result) self.tec_temp_graph.setEnabled(result) graph_group_setEnable(result) self.report_group.setEnabled(result) # TODO: Use QStateMachine to manage connections self.connect_btn.clicked.disconnect() if result: self.connect_btn.setText("Disconnect") self.connect_btn.clicked.connect(self.kirdy_handler.end_session) self._status() else: if self.kirdy_handler.connecting(): self.status_lbl.setText(f"Connection is dropped. Reconnecting to {self.ip_addr}:{self.port}.") self.connect_btn.setText("Stop") else: self.connect_btn.setText("Connect") self.connect_btn.clicked.connect(self.show_conn_settings_form) self.clear_graphs() self.status_lbl.setText(f"Disconnected from {self.ip_addr}:{self.port}.") self.connect_btn.clicked.connect(self.kirdy_handler.end_session) def _status(self): host = self.ip_addr port = self.port hw_rev = self.kirdy_handler.get_hw_rev() self.status_lbl.setText(f"Connected to Kirdy v{hw_rev['major']}.{hw_rev['minor']} @ {host}:{port}") def clear_graphs(self): self.graphs.clear_data_pts() @pyqtSlot(dict) def graphs_update(self, report): self.graphs.plot_append(report) @pyqtSlot(dict) def update_ld_ctrl_panel_settings(self, settings): try: settings = settings['laser'] with QSignalBlocker(self.params[0]): self.params[0].child('Output Config', 'LD Current Set').setValuewithLock(settings["ld_drive_current"]['value']) self.params[0].child('Output Config', 'LD Terminals Short').setValuewithLock(settings["ld_terms_short"]) self.params[0].child('Output Config', 'Default Power On').setValuewithLock(settings["default_pwr_on"]) self.params[0].child('Photodiode Monitor Config', 'LD Power Limit').setValuewithLock(settings["ld_pwr_limit"]["value"]) self.update_pd_mon_form_readings(settings) if settings["pd_mon_params"]["responsitivity"] is not None: self.params[0].child('Photodiode Monitor Config', 'Responsitivity').setValuewithLock(settings["pd_mon_params"]["responsitivity"]) else: self.params[0].child('Photodiode Monitor Config', 'Responsitivity').setValuewithLock(0) self.params[0].child('Photodiode Monitor Config', 'Dark Current').setValuewithLock(settings["pd_mon_params"]["i_dark"]) except Exception as e: logging.error(f"Params tree cannot be updated. Data:{settings}", exc_info=True) @pyqtSlot(dict) def update_ld_ctrl_panel_readings(self, report): try: report = report['laser'] with QSignalBlocker(self.params[0]): if report['pwr_excursion']: self.ld_status.setStyleSheet("border: 3px solid red;") self.ld_status.setText(' Status: OverPower Alarm') else: if report['pwr_on']: self.ld_status.setStyleSheet("border: 3px solid #44E62C;") # Light Green self.ld_status.setText(' Status: Power On') else: self.ld_status.setStyleSheet("border: 3px solid #A1A1A1;") # Light Gray self.ld_status.setText(' Status: Power Off') with QSignalBlocker(self.params[0]): self.params[0].child('Readings', 'LD Current Set').setValuewithLock(report["ld_i_set"]) self.params[0].child('Readings', 'PD Current').setValuewithLock(report["pd_i"]) if report["pd_pwr"] is not None: self.params[0].child('Readings', 'PD Power').setValuewithLock(report["pd_pwr"]) else: self.params[0].child('Readings', 'PD Power').setValuewithLock(0) self.params[0].child('Readings', 'LF Mod Termination (50 Ohm)').setValuewithLock(report["term_50ohm"]) except Exception as e: logging.error(f"Params tree cannot be updated. Data:{report}", exc_info=True) @pyqtSlot(dict) def update_thermostat_ctrl_panel_settings(self, settings): try: settings = settings['thermostat'] with QSignalBlocker(self.params[1]): self.params[1].child('Output Config', 'Control Method').setValuewithLock("Temperature PID" if settings["pid_engaged"] else "Constant Current") self.params[1].child('Output Config', 'Control Method', 'Set Current').setValuewithLock(settings["tec_settings"]['i_set']['value']) self.params[1].child('Output Config', 'Control Method', 'Set Temperature').setValuewithLock(float(settings["temperature_setpoint"])) self.params[1].child('Output Config', 'Limits', 'Max Cooling Current').setValuewithLock(settings["tec_settings"]['max_i_pos']['value']) self.params[1].child('Output Config', 'Limits', 'Max Heating Current').setValuewithLock(settings["tec_settings"]['max_i_neg']['value']) self.params[1].child('Output Config', 'Limits', 'Max Voltage Difference').setValuewithLock(settings["tec_settings"]['max_v']['value']) self.params[1].child('Output Config', 'Default Power On').setValuewithLock(settings["default_pwr_on"]) filter_type = settings['temp_adc_settings']['filter_type'] filter_rate = settings['temp_adc_settings'][getattr(getattr(FilterConfig, filter_type), "_odr_type")] self.update_adc_filter_form_readings(filter_type, filter_rate) self.params[1].child('Temperature ADC Filter Settings', 'Filter Type').setValue(filter_type) self.params[1].child('Temperature ADC Filter Settings', 'Sampling Rate').setValue(settings['temp_adc_settings']['rate']) self.params[1].child('Temperature Monitor Config', 'Upper Limit').setValuewithLock(settings["temp_mon_settings"]['upper_limit']) self.params[1].child('Temperature Monitor Config', 'Lower Limit').setValuewithLock(settings["temp_mon_settings"]['lower_limit']) self.params[1].child('PID Config', 'Kp').setValuewithLock(settings["pid_params"]['kp']) self.params[1].child('PID Config', 'Ki').setValuewithLock(settings["pid_params"]['ki']) self.params[1].child('PID Config', 'Kd').setValuewithLock(settings["pid_params"]['kd']) self.params[1].child('PID Config', 'PID Output Clamping', 'Minimum').setValuewithLock(settings["pid_params"]['output_min']) self.params[1].child('PID Config', 'PID Output Clamping', 'Maximum').setValuewithLock(settings["pid_params"]['output_max']) self.params[1].child('Thermistor Settings', 'T₀').setValuewithLock(settings["thermistor_params"]['t0']) self.params[1].child('Thermistor Settings', 'R₀').setValuewithLock(settings["thermistor_params"]['r0']) self.params[1].child('Thermistor Settings', 'B').setValuewithLock(settings["thermistor_params"]['b']) self.graphs.set_temp_setpoint_line(temp=round(settings["temperature_setpoint"], 4)) self.graphs.set_temp_setpoint_line(visible=settings['pid_engaged']) except Exception as e: logging.error(f"Params tree cannot be updated. Data:{settings}", exc_info=True) @pyqtSlot(dict) def update_thermostat_ctrl_panel_readings(self, report): try: report = report['thermostat'] if report['temp_mon_status']['over_temp_alarm']: self.tec_status.setStyleSheet("border: 3px solid red;") self.tec_status.setText(' Status: OverTemperature Alarm') else: if report['pwr_on']: self.tec_status.setStyleSheet("border: 3px solid #44E62C;") # Light Green self.tec_status.setText(' Status: Power On') else: self.tec_status.setStyleSheet("border: 3px solid #A1A1A1;") # Light Gray self.tec_status.setText(' Status: Power Off') with QSignalBlocker(self.params[1]): if report["temperature"] == None: self.params[1].child('Readings', 'Temperature').setValuewithLock(-273.15) else: self.params[1].child('Readings', 'Temperature').setValuewithLock(report["temperature"]) self.params[1].child('Readings', 'Current through TEC').setValuewithLock(report["tec_i"]) rate = 1 / (report['interval']['ms'] / 1e3 + report['interval']['us'] / 1e6) self.params[1].child('Temperature ADC Filter Settings', 'Recorded Sampling Rate').setValue(rate) self.cfg_adc_filter_form.recorded_sampling_rate_reading_lbl.setText(f"{rate:.2f}") except Exception as e: logging.error(f"Params tree cannot be updated. Data:{report}", exc_info=True) @pyqtSlot(int) def set_max_samples(self, samples: int): self.graphs.set_max_samples(samples) @pyqtSlot() def update_net_settings(self): net_settings = self.update_net_settings_form.get_net_settings() if net_settings is None: self.status_lbl.setText("Invalid IP Settings Input") return addr = net_settings["ip_addr"] port = net_settings["port"] prefix_len = net_settings["prefix_len"] gateway = net_settings["gateway_addr"] self.kirdy.task_dispatcher(self.kirdy.device.set_ip_settings(addr, port, prefix_len, gateway)) self.status_lbl.setText("IP Settings is Updated") @pyqtSlot() def start_connecting(self): net_settings = self.conn_settings_form.get_net_settings() if net_settings is None: self.status_lbl.setText("Invalid IP Settings Input") return self.ip_addr = net_settings["ip_addr"] self.port = net_settings["port"] host = self.ip_addr port = self.port if not (self.kirdy_handler.connecting() or self.kirdy_handler.connected()): self.status_lbl.setText("Connecting...") self.kirdy_handler.start_session(host=host, port=port) self.connect_btn.setText("Stop") self.connect_btn.clicked.disconnect() self.connect_btn.clicked.connect(self.kirdy_handler.end_session) @pyqtSlot(object, object) def send_command(self, param, changes): for inner_param, change, data in changes: if change == 'value': """ cmd translation from mutex type parameter """ if inner_param.opts.get('target_action_pair', None) is not None: target, action = inner_param.opts['target_action_pair'][inner_param.opts['limits'].index(data)] cmd = getattr(getattr(self.kirdy, target), action) param.child(*param.childPath(inner_param)).setOpts(lock=True) self.kirdy.task_dispatcher(cmd()) param.child(*param.childPath(inner_param)).setOpts(lock=False) continue """ cmd translation from non-mutex type parameter""" if inner_param.opts.get("target", None) is not None: if inner_param.opts.get("action", None) is not None: if inner_param.opts.get("unit", None) is not None: _, _, suffix = siParse(str(data)+inner_param.opts["unit"], regex=FLOAT_REGEX) data = siEval(str(data)+inner_param.opts["unit"], regex=FLOAT_REGEX, suffix=suffix) cmd = getattr(getattr(self.kirdy, inner_param.opts["target"]), inner_param.opts["action"]) param.child(*param.childPath(inner_param)).setOpts(lock=True) self.kirdy.task_dispatcher(cmd(data)) param.child(*param.childPath(inner_param)).setOpts(lock=False) continue async def coro_main(): args = get_argparser().parse_args() if args.log_file or args.log_file is None: filename = args.log_file if filename is None: time = datetime.now() filename = f"{time.year}-{time.day}-{time.month}-{time.hour}:{time.minute}:{time.second}.log" logging.basicConfig(filename=filename, filemode='a', format='%(asctime)s,%(msecs)d %(name)s %(levelname)s %(message)s', datefmt='%Y/%d/%m %H:%M:%S', level=logging.INFO) if args.logLevel: logging.basicConfig(level=getattr(logging, args.logLevel)) app_quit_event = asyncio.Event() app = QtWidgets.QApplication.instance() app.aboutToQuit.connect(app_quit_event.set) main_window = MainWindow(args) main_window.show() await app_quit_event.wait() def main(): qasync.run(coro_main()) if __name__ == '__main__': main()