from PyQt6 import QtWidgets, QtGui, QtCore, uic from PyQt6.QtCore import pyqtSignal, QObject, QSignalBlocker, pyqtSlot import pyqtgraph.parametertree.parameterTypes as pTypes from pyqtgraph.parametertree import Parameter, ParameterTree, ParameterItem, registerParameterType import pyqtgraph as pg pg.setConfigOptions(antialias=True) from pyqtgraph import mkPen from pyqtgraph.functions import siEval, siParse, SI_PREFIX_EXPONENTS, SI_PREFIXES from pglive.sources.live_axis_range import LiveAxisRange from pglive.sources.data_connector import DataConnector from pglive.kwargs import Axis, LeadingLine from pglive.sources.live_plot import LiveLinePlot from pglive.sources.live_plot_widget import LivePlotWidget from pglive.sources.live_axis import LiveAxis import re import sys import os import argparse import logging import asyncio from driver.kirdy import Kirdy as Kirdy_Driver import qasync from qasync import asyncClose, asyncSlot from collections import deque from datetime import datetime, timezone, timedelta from time import time from typing import Any, Optional, List from ui.ui_conn_settings_form import Ui_Conn_Settings_Form from ui.ui_update_network_settings_form import Ui_Update_Network_Settings_Form from dateutil import tz import math import socket from pid_autotune import PIDAutotune, PIDAutotuneState import importlib.resources COMMON_ERROR_MSG = "Connection Timeout. Disconnecting." 
# Number + optional SI prefix + optional unit suffix, e.g. "0.1mA" or "25 ℃".
# siParse() reads the match through the named groups `number`, `siPrefix`
# and `suffix`, so those names must be present in the pattern.
# The suffix is allowed to start with "°" or "℃" in addition to word characters.
FLOAT_REGEX = re.compile(
    r'(?P<number>[+-]?((((\d+(\.\d*)?)|(\d*\.\d+))([eE][+-]?\d+)?)|((?i:nan)|(inf))))'
    r'\s*((?P<siPrefix>[u' + SI_PREFIXES + r']?)(?P<suffix>[\w°℃].*))?$'
)


def get_argparser():
    """Build the command-line parser for the GUI.

    Returns:
        An ``argparse.ArgumentParser`` accepting ``--log-file`` (optional
        value, default ``""``) and ``-l/--log`` (stored as ``logLevel``,
        restricted to the standard logging level names).
    """
    # NOTE(review): the description text looks copied from another project -
    # confirm before changing it, since it is user-facing.
    parser = argparse.ArgumentParser(description="ARTIQ master")
    parser.add_argument("--log-file", default="", nargs="?",
                        help="Store logs into a file; Set the base filename")
    parser.add_argument("-l", "--log", dest="logLevel",
                        choices=['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'],
                        help="Set the logging level")
    return parser


def siConvert(val, suffix, typ=float):
    """
    Convert a value written in SI notation according to given Si Scale.

    ``str(val) + suffix`` is parsed with ``siParse`` using ``FLOAT_REGEX``;
    the numeric part is converted with ``typ`` and rescaled by the inverse of
    the SI prefix found in ``suffix`` (no prefix means no rescaling).

    Example::

        siConvert(0.1, "mA") # returns 100

    Args:
        val: The numeric value; it is stringified before parsing.
        suffix: Unit string, optionally starting with an SI prefix.
        typ: Callable used to convert the parsed number (default ``float``).

    Returns:
        The rescaled value, of whatever type ``typ`` produces.

    Raises:
        Whatever ``siParse`` raises when the combined string does not match.
    """
    number, siprefix, _ = siParse(str(val) + suffix, FLOAT_REGEX)
    v = typ(number)

    n = -SI_PREFIX_EXPONENTS[siprefix] if siprefix != '' else 0
    if n > 0:
        return v * 10**n
    elif n < 0:
        # this case makes it possible to use Decimal objects here
        return v / 10**-n
    else:
        return v


class Kirdy(QObject):
    """Qt-facing wrapper around the Kirdy driver object.

    It hands its signals to the driver, forwards session control calls, and
    uses a ``QBasicTimer`` to periodically request the device's settings
    summary.
    """

    connected_sig = pyqtSignal(bool)
    setting_update_sig = pyqtSignal(dict)
    report_update_sig = pyqtSignal(dict)

    def __init__(self, parent, kirdy, _poll_interval):
        """
        Args:
            parent: Qt parent object passed to ``QObject.__init__``.
            kirdy: Driver object; must provide ``set_connected_sig``,
                ``set_report_sig`` and the other methods used in this class.
            _poll_interval: Polling period in seconds (it is multiplied by
                1000 before being handed to the timer).
        """
        super().__init__(parent)
        self._poll_interval = _poll_interval
        self._kirdy = kirdy
        self._kirdy.set_connected_sig(self.connected_sig)
        # Connection state changes start/stop polling and trigger device setup.
        self.connected_sig.connect(self.start_polling)
        self.connected_sig.connect(self.connected_setup)
        self._kirdy.set_report_sig(self.report_update_sig)
        self._timer = QtCore.QBasicTimer()

    def connected(self):
        """Return the driver's ``connected()`` result."""
        return self._kirdy.connected()

    def connecting(self):
        """Return the driver's ``connecting()`` result."""
        return self._kirdy.connecting()

    def start_session(self, host, port):
        """Ask the driver to start a session with ``host``:``port``."""
        self._kirdy.start_session(host=host, port=port)

    def end_session(self):
        """Stop polling and schedule the driver's ``end_session()`` coroutine.

        Requires a running asyncio event loop.
        """
        if self._timer.isActive():
            self._timer.stop()
        asyncio.get_running_loop().create_task(self._kirdy.end_session())

    @pyqtSlot(bool)
    def connected_setup(self, connected):
        """Switch the device to active report mode once connected."""
        if connected:
            self._kirdy.task_dispatcher(self._kirdy.device.set_active_report_mode(True))
            self._kirdy._report_mode_on = True

    def timerEvent(self, event):
        """Timer callback: dispatch a settings-summary request.

        ``setting_update_sig`` is passed to the driver as ``sig``; presumably
        the driver emits it with the result - confirm against the driver.
        """
        self._kirdy.task_dispatcher(
            self._kirdy.device.get_settings_summary(sig=self.setting_update_sig)
        )

    @pyqtSlot(bool)
    def start_polling(self, start):
        """Start (``start`` truthy) or stop (``start`` falsy) the poll timer."""
        if start:
            if not self._timer.isActive():
                self._timer.start(int(self._poll_interval*1000), self)
            else:
                logging.debug("Kirdy Polling Timer has been started already.")
        else:
            self._timer.stop()

    @pyqtSlot(float)
    def set_update_s(self, interval):
        """Store a new polling interval (seconds) and apply it."""
        self._poll_interval = interval
        self.update_polling_rate()

    def update_polling_rate(self):
        """Restart an active poll timer so it picks up ``_poll_interval``."""
        if self._timer.isActive():
            self._timer.stop()
            # start_polling() requires its `start` flag; calling it without
            # an argument raised TypeError.
            self.start_polling(True)
        else:
            logging.debug("Attempt to update polling timer when it is stopped")

    def get_hw_rev(self):
        """Return the driver's ``get_hw_rev()`` result."""
        return self._kirdy.get_hw_rev()


class Graphs:
    """Owns the live plots and data connectors for the four graph widgets."""

    def __init__(self, ld_i_set_graph, pd_mon_pwr_graph, tec_i_graph, tec_temp_graph, max_samples=1000):
        """Configure axes, plots and data connectors on the given widgets.

        Args:
            ld_i_set_graph: Plot widget for the laser diode current setpoint.
            pd_mon_pwr_graph: Plot widget for the photodiode monitor power.
            tec_i_graph: Plot widget for the TEC target/measured current.
            tec_temp_graph: Plot widget for the TEC temperature.
            max_samples: Maximum points kept per connector (the setpoint
                connector is fixed at 1 point).
        """
        self.graphs = [ld_i_set_graph, pd_mon_pwr_graph, tec_i_graph, tec_temp_graph]
        self.connectors = []

        self._pd_mon_pwr_plot = LiveLinePlot(pen=pg.mkPen('r'))
        self._ld_i_set_plot = LiveLinePlot(name="Set", pen=pg.mkPen('r'))
        self._tec_temp_plot = LiveLinePlot(pen=pg.mkPen('r'))
        self._tec_setpoint_plot = LiveLinePlot(pen=pg.mkPen('r'))
        self._tec_i_target_plot = LiveLinePlot(name="Target", pen=pg.mkPen('r'))
        self._tec_i_measure_plot = LiveLinePlot(name="Measure", pen=pg.mkPen('g'))

        self._temp_setpoint_line = tec_temp_graph.getPlotItem().addLine(label='{value} ℃', pen=pg.mkPen('g'))
        # Render the temperature setpoint line on top of the temperature being plotted
        self._temp_setpoint_line.setZValue(10)
        self._temp_setpoint_line.setVisible(False)

        def tickStrings(values: List, scale: float, spacing: float) -> List:
            # Tick values are divided by 1000 before being read as UTC
            # timestamps, i.e. they are treated as milliseconds.
            return [datetime.fromtimestamp(value/1000, tz=timezone.utc).strftime("%H:%M:%S") for value in values]

        for graph in ld_i_set_graph, pd_mon_pwr_graph, tec_i_graph, tec_temp_graph:
            time_axis = LiveAxis('bottom', text="Time since Kirdy Reset (Hr:Min:Sec)", tick_angle=-45, units="")
            # Display the relative ts in custom %H:%M:%S format without local timezone
            time_axis.tickStrings = tickStrings
            # Prevent scaling prefix being added to the back of axis label
            time_axis.autoSIPrefix = False
            time_axis.showLabel()
            graph.setAxisItems({'bottom': time_axis})

            graph.add_crosshair(pg.mkPen(color='red', width=1), {'color': 'green'})

            #TODO: x_range should not be updated on every tick
            graph.x_range_controller = LiveAxisRange(roll_on_tick=17, offset_left=4900)
            graph.x_range_controller.crop_left_offset_to_data = True

            # Enable linking of axes in the graph widget's context menu
            graph.register(graph.getPlotItem().titleLabel.text) # Slight hack getting the title

        self.max_samples = max_samples

        ld_i_set_axis = LiveAxis('left', text="Current", units="A")
        ld_i_set_axis.showLabel()
        ld_i_set_graph.setAxisItems({'left': ld_i_set_axis})
        ld_i_set_graph.addItem(self._ld_i_set_plot)
        self.ld_i_set_connector = DataConnector(self._ld_i_set_plot, max_points=self.max_samples)
        self.connectors += [self.ld_i_set_connector]

        pd_mon_pwr_axis = LiveAxis('left', text="Power", units="W")
        pd_mon_pwr_axis.showLabel()
        pd_mon_pwr_graph.setAxisItems({'left': pd_mon_pwr_axis})
        pd_mon_pwr_graph.addItem(self._pd_mon_pwr_plot)
        self.pd_mon_pwr_connector = DataConnector(self._pd_mon_pwr_plot, max_points=self.max_samples)
        self.connectors += [self.pd_mon_pwr_connector]

        tec_temp_axis = LiveAxis('left', text="Temperature", units="℃")
        tec_temp_axis.showLabel()
        tec_temp_graph.setAxisItems({'left': tec_temp_axis})
        tec_temp_graph.addItem(self._tec_setpoint_plot)
        tec_temp_graph.addItem(self._tec_temp_plot)
        # The setpoint connector keeps a single point only.
        self.tec_setpoint_connector = DataConnector(self._tec_setpoint_plot, max_points=1)
        self.tec_temp_connector = DataConnector(self._tec_temp_plot, max_points=self.max_samples)
        self.connectors += [self.tec_temp_connector, self.tec_setpoint_connector]

        tec_i_axis = LiveAxis('left', text="Current", units="A")
        tec_i_axis.showLabel()
        tec_i_graph.setAxisItems({'left': tec_i_axis})
        tec_i_graph.addLegend(brush=(50, 50, 200, 150))
        tec_i_graph.y_range_controller = LiveAxisRange(fixed_range=[-1.0, 1.0])
tec_i_graph.addItem(self._tec_i_target_plot) tec_i_graph.addItem(self._tec_i_measure_plot) self.tec_i_target_connector = DataConnector(self._tec_i_target_plot, max_points=self.max_samples) self.tec_i_measure_connector = DataConnector(self._tec_i_measure_plot, max_points=self.max_samples) self.connectors += [self.tec_i_target_connector, self.tec_i_measure_connector] def set_max_samples(self, max_samples): self.max_samples = max_samples for connector in self.connectors: with connector.data_lock: connector.max_points = self.max_samples connector.x = deque(maxlen=int(connector.max_points)) connector.y = deque(maxlen=int(connector.max_points)) def plot_append(self, report): try: ld_i_set = report['laser']['ld_i_set'] pd_pwr = report['laser']['pd_pwr'] tec_i_set = report['thermostat']['i_set'] tec_i_measure = report['thermostat']['tec_i'] tec_temp = report['thermostat']['temperature'] ts = report['ts'] self.ld_i_set_connector.cb_append_data_point(ld_i_set, ts) if pd_pwr is not None: self._pd_mon_pwr_plot.show() self.pd_mon_pwr_connector.cb_append_data_point(pd_pwr, ts) else: self._pd_mon_pwr_plot.hide() self.pd_mon_pwr_connector.cb_append_data_point(0.0, ts) if tec_temp is None: self._tec_temp_plot.hide() tec_temp = -273.15 else: self._tec_temp_plot.show() self.tec_temp_connector.cb_append_data_point(tec_temp, ts) if self._temp_setpoint_line.isVisible(): self.tec_setpoint_connector.cb_append_data_point(self._temp_setpoint_line.value(), ts) else: self.tec_setpoint_connector.cb_append_data_point(tec_temp, ts) if tec_i_measure is not None: self.tec_i_measure_connector.cb_append_data_point(tec_i_measure, ts) self.tec_i_target_connector.cb_append_data_point(tec_i_set, ts) except Exception as e: logging.error(f"Graph Value cannot be updated. 
Data:{report}", exc_info=True) def clear_data_pts(self): for connector in self.connectors: connector.clear() connector.resume() def set_temp_setpoint_line(self, temp=None, visible=None): if visible is not None: self._temp_setpoint_line.setVisible(visible) if temp is not None: self._temp_setpoint_line.setValue(temp) # PyQtGraph normally does not update this text when the line # is not visible, so make sure that the temperature label # gets updated always, and doesn't stay at an old value. self._temp_setpoint_line.label.setText(f"{temp} ℃", color='g') class MutexParameter(pTypes.ListParameter): """ Mutually exclusive parameter where only one of its children is visible at a time, list selectable. The ordering of the list items determines which children will be visible. """ def __init__(self, **opts): super().__init__(**opts) self.sigValueChanged.connect(self.show_chosen_child) self.sigValueChanged.emit(self, self.opts['value']) def _get_param_from_value(self, value): if isinstance(self.opts['limits'], dict): values_list = list(self.opts['limits'].values()) else: values_list = self.opts['limits'] return self.children()[values_list.index(value)] @pyqtSlot(object, object) def show_chosen_child(self, value): for param in self.children(): param.hide() child_to_show = self._get_param_from_value(value.value()) child_to_show.show() if child_to_show.opts.get('triggerOnShow', None): child_to_show.sigValueChanged.emit(child_to_show, child_to_show.value()) registerParameterType('mutex', MutexParameter) class UpdateNetSettingsForm(QtWidgets.QDialog, Ui_Update_Network_Settings_Form): def __init__(self): super().__init__() self.setupUi(self) def get_net_settings(self): try: ip_addr = f"{self.addr_in_0.text()}.{self.addr_in_1.text()}.{self.addr_in_2.text()}.{self.addr_in_3.text()}" gateway_addr = f"{self.gateway_in_0.text()}.{self.gateway_in_1.text()}.{self.gateway_in_2.text()}.{self.gateway_in_3.text()}" socket.inet_aton(ip_addr) socket.inet_aton(gateway_addr) return { "ip_addr": 
ip_addr, "gateway_addr": gateway_addr, "prefix_len": int(self.prefix_len_in.text()), "port": int(self.port_in.text()), } except (OSError, ValueError): return None class ConnSettingsForm(QtWidgets.QDialog, Ui_Conn_Settings_Form): def __init__(self): super().__init__() self.setupUi(self) def get_net_settings(self): try: ip_addr = f"{self.addr_in_0.text()}.{self.addr_in_1.text()}.{self.addr_in_2.text()}.{self.addr_in_3.text()}" socket.inet_aton(ip_addr) return { "ip_addr": ip_addr, "port": int(self.port_in.text()) } except (OSError, ValueError): return None class MainWindow(QtWidgets.QMainWindow): """The maximum number of sample points to store.""" DEFAULT_MAX_SAMPLES = 1000 DEFAULT_IP_ADDR = '192.168.1.128' DEFAULT_PORT = 1337 LASER_DIODE_STATUS = [ {'name': 'Status', 'title': 'Status: Power Off', 'expanded': True, 'type': 'group', 'children': [ {'name': 'Color', 'title': '', 'type': 'color', 'value': 'w', 'readonly': True, "compactHeight": False}, ]} ] LASER_DIODE_PARAMETERS = [ {'name': 'Readings', 'expanded': True, 'type': 'group', 'children': [ {'name': 'LD Current Set', 'type': 'float', 'suffix': 'A', 'siPrefix': True, 'readonly': True, "compactHeight": False}, {'name': 'PD Current', 'type': 'float', 'suffix': 'A', 'siPrefix': True, 'readonly': True, "compactHeight": False}, {'name': 'PD Power', 'type': 'float', 'suffix': 'W', 'siPrefix': True, 'readonly': True, "compactHeight": False}, {'name': 'LF Mod Termination (50 Ohm)', 'type': 'list', 'limits': ['On', 'Off'], 'readonly': True, "compactHeight": False} ]}, {'name': 'Output Config', 'expanded': True, 'type': 'group', 'children': [ {'name': 'LD Current Set', 'type': 'float', 'value': 0, 'step': 0.001, 'decimals': 6, 'limits': (0, 300), 'unit': 'mA', 'lock': False, 'target': 'laser', 'action': 'set_i', "compactHeight": False}, {'name': 'LD Terminals Short', 'type': 'bool', 'value': False, 'lock': False, 'target': 'laser', 'action': 'set_ld_terms_short', "compactHeight": False}, {'name': 'Default Power On', 
'type': 'bool', 'value': False, 'lock': False, 'target': 'laser', 'action': 'set_default_pwr_on', "compactHeight": False}, ]}, {'name': 'Photodiode Monitor Config', 'expanded': False, 'type': 'group', 'children': [ {'name': 'LD Power Limit', 'type': 'float', 'value': 0, 'step': 0.001, 'decimals': 6, 'limits': (0, float("inf")), 'unit': 'mW', 'lock': False, 'target': 'laser', 'action': 'set_ld_pwr_limit', "compactHeight": False}, {'name': 'Responsitivity', 'type': 'float', 'value': 0, 'step': 0.001, 'decimals': 6, 'limits': (0, float("inf")), 'unit': 'mA/W', 'lock': False, 'target': 'laser', 'action': 'set_pd_mon_responsitivity', "compactHeight": False}, {'name': 'Dark Current', 'type': 'float', 'value': 0, 'step': 1, 'decimals': 6, 'limits': (0, float("inf")), 'unit': 'uA', 'lock': False, 'target': 'laser', 'action': 'set_pd_mon_dark_current', "compactHeight": False}, ]}, ] THERMOSTAT_STATUS = [ {'name': 'Status', 'title': 'Status: Power Off', 'expanded': True, 'type': 'group', 'children': [ {'name': 'Color', 'title': '', 'type': 'color', 'value': 'w', 'readonly': True, "compactHeight": False}, ]} ] THERMOSTAT_PARAMETERS = [ {'name': 'Readings', 'expanded': True, 'type': 'group', 'children': [ {'name': 'Temperature', 'type': 'float', 'format': '{value:.4f} ℃', 'readonly': True, "compactHeight": False}, {'name': 'Current through TEC', 'type': 'float', 'suffix': 'A', 'siPrefix': True, 'decimals': 6, 'readonly': True, "compactHeight": False}, ]}, {'name': 'Output Config', 'expanded': True, 'type': 'group', 'children': [ {'name': 'Control Method', 'type': 'mutex', 'limits': ['Constant Current', 'Temperature PID'], 'target_action_pair': [['thermostat', 'set_constant_current_control_mode'], ['thermostat', 'set_pid_control_mode']], 'children': [ {'name': 'Set Current', 'type': 'float', 'value': 0, 'step': 1, 'limits': (-1000, 1000), 'triggerOnShow': True, 'decimals': 6, 'unit': 'mA', 'lock': False, 'target': 'thermostat', 'action': 'set_tec_i_out', "compactHeight": 
False}, {'name': 'Set Temperature', 'type': 'float', 'value': 25, 'step': 0.0001, 'limits': (-273, 300), 'format': '{value:.4f}', 'unit': '℃', 'lock': False, 'target': 'thermostat', 'action': 'set_temperature_setpoint', "compactHeight": False}, ]}, {'name': 'Limits', 'expanded': False, 'type': 'group', 'children': [ {'name': 'Max Cooling Current', 'type': 'float', 'value': 0, 'step': 1, 'decimals': 6, 'limits': (0, 1000), 'unit': 'mA', 'lock': False, 'target': 'thermostat', 'action': 'set_tec_max_cooling_i', "compactHeight": False}, {'name': 'Max Heating Current', 'type': 'float', 'value': 0, 'step': 1, 'decimals': 6, 'limits': (0, 1000), 'unit': 'mA', 'lock': False, 'target': 'thermostat', 'action': 'set_tec_max_heating_i', "compactHeight": False}, {'name': 'Max Voltage Difference', 'type': 'float', 'value': 0, 'step': 0.1, 'limits': (0, 4), 'unit': 'V', 'lock': False, 'target': 'thermostat', 'action': 'set_tec_max_v', "compactHeight": False}, ]}, {'name': 'Default Power On', 'type': 'bool', 'value': False, 'lock': False, 'target': 'thermostat', 'action': 'set_default_pwr_on'}, ]}, # TODO Temperature ADC Filter Settings {'name': 'Temperature Monitor Config', 'expanded': False, 'type': 'group', 'children': [ {'name': 'Upper Limit', 'type': 'float', 'value': 0, 'step': 1, 'decimals': 6, 'limits': (-273, 300), 'unit': '℃', 'lock': False, 'target': 'thermostat', 'action': 'set_temp_mon_upper_limit', "compactHeight": False}, {'name': 'Lower Limit', 'type': 'float', 'value': 0, 'step': 1, 'decimals': 6, 'limits': (-273, 300), 'unit': '℃', 'lock': False, 'target': 'thermostat', 'action': 'set_temp_mon_lower_limit', "compactHeight": False}, ]}, {'name': 'Thermistor Settings','expanded': False, 'type': 'group', 'children': [ {'name': 'T₀', 'type': 'float', 'value': 0, 'step': 1, 'decimals': 6, 'limits': (-273, 300), 'unit': '℃', 'lock': False, 'target': 'thermostat', 'action': 'set_sh_t0', "compactHeight": False}, {'name': 'R₀', 'type': 'float', 'value': 0, 'step': 1, 
'decimals': 6, 'unit': 'kΩ', 'lock': False, 'target': 'thermostat', 'action': 'set_sh_r0', "compactHeight": False}, {'name': 'B', 'type': 'float', 'value': 3950, 'step': 1, 'decimals': 4, 'unit': 'K', 'lock': False, 'target': 'thermostat', 'action': 'set_sh_beta', "compactHeight": False}, ]}, {'name': 'PID Config', 'expanded': False, 'type': 'group', 'children': [ {'name': 'Kp', 'type': 'float', 'step': 0.1, 'decimals': 16, 'lock': False, 'target': 'thermostat', 'action': 'set_pid_kp', "compactHeight": False}, {'name': 'Ki', 'type': 'float', 'step': 0.1, 'decimals': 16, 'unit': 'Hz', 'lock': False, 'target': 'thermostat', 'action': 'set_pid_ki', "compactHeight": False}, {'name': 'Kd', 'type': 'float', 'step': 0.1, 'decimals': 16, 'unit': 's', 'lock': False, 'target': 'thermostat', 'action': 'set_pid_kd', "compactHeight": False}, {'name': "PID Output Clamping", 'expanded': True, 'type': 'group', 'children': [ {'name': 'Minimum', 'type': 'float', 'step': 1, 'limits': (-1000, 1000), 'decimals': 6, 'unit': 'mA', 'lock': False, 'target': 'thermostat', 'action': 'set_pid_output_min', "compactHeight": False}, {'name': 'Maximum', 'type': 'float', 'step': 1, 'limits': (-1000, 1000), 'decimals': 6, 'unit': 'mA', 'lock': False, 'target': 'thermostat', 'action': 'set_pid_output_max', "compactHeight": False}, ]}, {'name': 'PID Auto Tune', 'expanded': False, 'type': 'group', 'children': [ {'name': 'Target Temperature', 'type': 'float', 'value': 20.0, 'step': 0.1, 'unit': '℃', 'format': '{value:.4f}', "compactHeight": False}, {'name': 'Test Current', 'type': 'float', 'value': 1000, 'decimals': 6, 'step': 100, 'limits': (-1000, 1000), 'unit': 'mA', "compactHeight": False}, {'name': 'Temperature Swing', 'type': 'float', 'value': 0.0, 'step': 0.0001, 'prefix': '±', 'unit': '℃', 'format': '{value:.4f}', "compactHeight": False}, {'name': 'Lookback', 'type': 'float', 'value': 5.0, 'step': 0.1, 'unit': 's', 'format': '{value:.4f}', "compactHeight": False}, {'name': 'Run', 'type': 
'action', 'tip': 'Run'}, ]}, ]}, ] def __init__(self, args): super(MainWindow, self).__init__() self.kirdy = Kirdy_Driver() ui_file_path = importlib.resources.files("ui").joinpath("kirdy_qt.ui") uic.loadUi(ui_file_path, self) self.info_box = QtWidgets.QMessageBox() self.info_box.setIcon(QtWidgets.QMessageBox.Icon.Information) # Load Global QT Style Sheet Settings qss=os.path.join(os.path.dirname(__file__), "ui/mainwindow.qss") with open(qss,"r") as fh: self.setStyleSheet(fh.read()) self.ip_addr = self.DEFAULT_IP_ADDR self.port = self.DEFAULT_PORT self.conn_settings_form = ConnSettingsForm() self.conn_settings_form.accepted.connect(self.start_connecting) self.update_net_settings_form = UpdateNetSettingsForm() self.update_net_settings_form.accepted.connect(self.update_net_settings) self.max_samples = self.DEFAULT_MAX_SAMPLES self.autotuner = PIDAutotune(25) self.setup_menu_bar() self._set_up_ctrl_btns() self._set_up_plot_menu() def _setValuewithLock(self, value): if not self.opts.get("lock", None): if self.opts.get("unit", None) is not None: self.setValue(siConvert(value, self.opts.get("unit", None))) else: self.setValue(value) Parameter.setValuewithLock = _setValuewithLock def _add_unit_to_title(param_tree): def _traverse(param): if param["type"] == "group" or param["type"] == "mutex": for param in param["children"]: _add_unit_to_title(param) else: if "unit" in param.keys(): if not("title" in param.keys()): param["title"] = param["name"] param["title"] = param["title"] + " ({:})".format(param["unit"]) if isinstance(param_tree, list): for param in param_tree: _traverse(param) else: _traverse(param_tree) _add_unit_to_title(self.LASER_DIODE_STATUS) _add_unit_to_title(self.LASER_DIODE_PARAMETERS) _add_unit_to_title(self.THERMOSTAT_STATUS) _add_unit_to_title(self.THERMOSTAT_PARAMETERS) self.params = [ Parameter.create(name=f"Laser Diode Status", type='group', value=0, children=self.LASER_DIODE_STATUS), Parameter.create(name=f"Laser Diode Parameters", type='group', 
value=1, children=self.LASER_DIODE_PARAMETERS), Parameter.create(name=f"Thermostat Status", type='group', value=2, children=self.THERMOSTAT_STATUS), Parameter.create(name=f"Thermostat Parameters", type='group', value=3, children=self.THERMOSTAT_PARAMETERS), ] self._set_param_tree() self.tec_i_graph.setTitle("TEC Current") self.tec_temp_graph.setTitle("TEC Temperature") self.ld_i_set_graph.setTitle("LD Current Set") self.pd_mon_pwr_graph.setTitle("PD Mon Power") self.connect_btn.clicked.connect(self.show_conn_settings_form) self.kirdy_handler = Kirdy(self, self.kirdy, 1.0) self.kirdy_handler.setting_update_sig.connect(self.update_ld_ctrl_panel_settings) self.kirdy_handler.setting_update_sig.connect(self.update_thermostat_ctrl_panel_settings) self.kirdy_handler.report_update_sig.connect(self.update_ld_ctrl_panel_readings) self.kirdy_handler.report_update_sig.connect(self.update_thermostat_ctrl_panel_readings) self.graphs = Graphs(self.ld_i_set_graph, self.pd_mon_pwr_graph, self.tec_i_graph, self.tec_temp_graph, max_samples=self.max_samples) self.kirdy_handler.report_update_sig.connect(self.graphs.plot_append) self.loading_spinner.hide() self.kirdy_handler.connected_sig.connect(self._on_connection_changed) def setup_menu_bar(self): @pyqtSlot(bool) def about_kirdy(_): hw_rev = self.kirdy_handler.get_hw_rev() QtWidgets.QMessageBox.about( self, "About Kirdy", f"""

Sinara 1550 Kirdy v{hw_rev["major"]}.{hw_rev["minor"]}

""" ) self.menu_action_about_kirdy.triggered.connect(about_kirdy) @pyqtSlot(bool) def about_gui(_): # TODO: Replace the hardware revision placeholder QtWidgets.QMessageBox.about( self, "About Kirdy Control Panel", f"""

Version: "Version"

""" ) self.menu_action_about_gui.triggered.connect(about_gui) @pyqtSlot(bool) def dfu_mode(_): self.kirdy.task_dispatcher(self.kirdy.device.dfu()) self.kirdy_handler.end_session() self.menu_action_dfu_mode.triggered.connect(dfu_mode) @pyqtSlot(bool) def reset_kirdy(_): self.kirdy.task_dispatcher(self.kirdy.device.hard_reset()) self.kirdy_handler.end_session() self.menu_action_hard_reset.triggered.connect(reset_kirdy) @pyqtSlot(bool) def save_settings(_): self.kirdy.task_dispatcher(self.kirdy.device.save_current_settings_to_flash()) saved = QtWidgets.QMessageBox(self) saved.setWindowTitle("Config saved") saved.setText(f"Laser diode and thermostat configs have been saved into flash.") saved.setIcon(QtWidgets.QMessageBox.Icon.Information) saved.show() self.menu_action_save.triggered.connect(save_settings) @pyqtSlot(bool) def load_settings(_): self.kirdy.task_dispatcher(self.kirdy.device.restore_settings_from_flash()) loaded = QtWidgets.QMessageBox(self) loaded.setWindowTitle("Config loaded") loaded.setText(f"Laser Diode and Thermostat configs have been loaded from flash.") loaded.setIcon(QtWidgets.QMessageBox.Icon.Information) loaded.show() self.menu_action_load.triggered.connect(load_settings) @pyqtSlot(bool) def show_update_net_settings_form(_): self.update_net_settings_form.retranslateUi(self.update_net_settings_form) self.update_net_settings_form.show() self.menu_action_update_net_settings.triggered.connect(show_update_net_settings_form) def show_conn_settings_form(self): ip_addr = self.ip_addr.split(".") self.conn_settings_form.addr_in_0.setText(ip_addr[0]) self.conn_settings_form.addr_in_1.setText(ip_addr[1]) self.conn_settings_form.addr_in_2.setText(ip_addr[2]) self.conn_settings_form.addr_in_3.setText(ip_addr[3]) self.conn_settings_form.port_in.setText(str(self.port)) self.conn_settings_form.show() def _set_up_ctrl_btns(self): @pyqtSlot(bool) def ld_pwr_on(_): self.kirdy.task_dispatcher(self.kirdy.laser.set_power_on(True)) 
self.ld_pwr_on_btn.clicked.connect(ld_pwr_on) @pyqtSlot(bool) def ld_pwr_off(_): self.kirdy.task_dispatcher(self.kirdy.laser.set_power_on(False)) self.ld_pwr_off_btn.clicked.connect(ld_pwr_off) @pyqtSlot(bool) def ld_clear_alarm(_): self.kirdy.task_dispatcher(self.kirdy.laser.clear_alarm()) self.ld_clear_alarm_btn.clicked.connect(ld_clear_alarm) @pyqtSlot(bool) def tec_pwr_on(_): self.kirdy.task_dispatcher(self.kirdy.thermostat.set_power_on(True)) self.tec_pwr_on_btn.clicked.connect(tec_pwr_on) @pyqtSlot(bool) def tec_pwr_off(_): self.kirdy.task_dispatcher(self.kirdy.thermostat.set_power_on(False)) self.tec_pwr_off_btn.clicked.connect(tec_pwr_off) @pyqtSlot(bool) def tec_clear_alarm(_): self.kirdy.task_dispatcher(self.kirdy.thermostat.clear_alarm()) self.tec_clear_alarm_btn.clicked.connect(tec_clear_alarm) def _set_up_plot_menu(self): self.plot_menu = QtWidgets.QMenu() self.plot_menu.setTitle("Plot Settings") clear = QtGui.QAction("Clear graphs", self.plot_menu) clear.triggered.connect(self.clear_graphs) self.plot_menu.addAction(clear) self.plot_menu.clear = clear self.samples_spinbox = QtWidgets.QSpinBox() self.samples_spinbox.setRange(2, 100000) self.samples_spinbox.setSuffix(' samples') self.samples_spinbox.setValue(self.max_samples) self.samples_spinbox.valueChanged.connect(self.set_max_samples) limit_samples = QtWidgets.QWidgetAction(self.plot_menu) limit_samples.setDefaultWidget(self.samples_spinbox) self.plot_menu.addAction(limit_samples) self.plot_menu.limit_samples = limit_samples self.plot_settings.setMenu(self.plot_menu) def _set_param_tree(self): status = self.ld_status status.setHeaderHidden(True) status.setParameters(self.params[0], showTop=False) tree = self.ld_tree tree.setHeaderHidden(True) tree.setParameters(self.params[1], showTop=False) self.params[1].sigTreeStateChanged.connect(self.send_command) status = self.tec_status status.setHeaderHidden(True) status.setParameters(self.params[2], showTop=False) tree = self.tec_tree 
tree.setHeaderHidden(True) tree.setParameters(self.params[3], showTop=False) self.params[3].sigTreeStateChanged.connect(self.send_command) @asyncSlot() async def autotune(param): match self.autotuner.state(): case PIDAutotuneState.STATE_OFF: settings = await self.kirdy.device.get_settings_summary() self.autotuner.setParam( param.parent().child('Target Temperature').value(), param.parent().child('Test Current').value() / 1000, param.parent().child('Temperature Swing').value(), 1.0 / settings['thermostat']['temp_adc_settings']['rate'], param.parent().child('Lookback').value()) self.autotuner.setReady() param.setOpts(title="Stop") self.kirdy.task_dispatcher(self.kirdy.thermostat.set_constant_current_control_mode()) self.kirdy_handler.report_update_sig.connect(self.autotune_tick) self.loading_spinner.show() self.loading_spinner.start() self.background_task_lbl.setText("Autotuning") case PIDAutotuneState.STATE_READY | PIDAutotuneState.STATE_RELAY_STEP_UP | PIDAutotuneState.STATE_RELAY_STEP_DOWN: self.autotuner.setOff() param.setOpts(title="Run") self.kirdy.task_dispatcher(self.kirdy.thermostat.set_tec_i_out(0.0)) self.kirdy_handler.report_update_sig.disconnect(self.autotune_tick) self.background_task_lbl.setText("Ready.") self.loading_spinner.stop() self.loading_spinner.hide() self.params[3].child('PID Config', 'PID Auto Tune', 'Run').sigActivated.connect(autotune) @pyqtSlot(dict) def autotune_tick(self, report): match self.autotuner.state(): case PIDAutotuneState.STATE_READY | PIDAutotuneState.STATE_RELAY_STEP_UP | PIDAutotuneState.STATE_RELAY_STEP_DOWN: self.autotuner.run(report['thermostat']['temperature'], report['ts']/1000) self.kirdy.task_dispatcher(self.kirdy.thermostat.set_tec_i_out(self.autotuner.output())) case PIDAutotuneState.STATE_SUCCEEDED: kp, ki, kd = self.autotuner.get_tec_pid() self.autotuner.setOff() self.params[3].child('PID Config', 'PID Auto Tune', 'Run').setOpts(title="Run") self.kirdy.task_dispatcher(self.kirdy.thermostat.set_pid_kp(kp)) 
self.kirdy.task_dispatcher(self.kirdy.thermostat.set_pid_ki(ki)) self.kirdy.task_dispatcher(self.kirdy.thermostat.set_pid_kd(kd)) self.kirdy.task_dispatcher(self.kirdy.thermostat.set_pid_control_mode()) self.kirdy.task_dispatcher(self.kirdy.thermostat.set_temperature_setpoint(self.params[3].child('PID Config', 'PID Auto Tune', 'Target Temperature').value())) self.kirdy_handler.report_update_sig.disconnect(self.autotune_tick) self.background_task_lbl.setText("Ready.") self.loading_spinner.stop() self.loading_spinner.hide() self.info_box.setWindowTitle("PID AutoTune Success") self.info_box.setText("PID Config has been loaded to Thermostat.\nRegulating temperature.") self.info_box.show() case PIDAutotuneState.STATE_FAILED: self.autotuner.setOff() self.params[3].child('PID Config', 'PID Auto Tune', 'Run').setOpts(title="Run") self.kirdy.task_dispatcher(self.kirdy.thermostat.set_tec_i_out(0.0)) self.kirdy_handler.report_update_sig.disconnect(self.autotune_tick) self.background_task_lbl.setText("Ready.") self.loading_spinner.stop() self.loading_spinner.hide() self.info_box.setWindowTitle("PID Autotune Failed") self.info_box.setText("PID Autotune is failed.") self.info_box.show() @pyqtSlot(bool) def _on_connection_changed(self, result): def ctrl_panel_setEnable(result): self.ld_status.setEnabled(result) self.ld_tree.setEnabled(result) self.ld_pwr_on_btn.setEnabled(result) self.ld_pwr_off_btn.setEnabled(result) self.ld_clear_alarm_btn.setEnabled(result) self.tec_status.setEnabled(result) self.tec_tree.setEnabled(result) self.tec_pwr_on_btn.setEnabled(result) self.tec_pwr_off_btn.setEnabled(result) self.tec_clear_alarm_btn.setEnabled(result) ctrl_panel_setEnable(result) def menu_bar_setEnable(result): self.menu_action_about_kirdy.setEnabled(result) self.menu_action_connect.setEnabled(result) self.menu_action_dfu_mode.setEnabled(result) self.menu_action_disconnect.setEnabled(result) self.menu_action_hard_reset.setEnabled(result) self.menu_action_save.setEnabled(result) 
self.menu_action_load.setEnabled(result) self.menu_action_update_net_settings.setEnabled(result) menu_bar_setEnable(result) def graph_group_setEnable(result): self.ld_i_set_graph.setEnabled(result) self.pd_mon_pwr_graph.setEnabled(result) self.tec_i_graph.setEnabled(result) self.tec_temp_graph.setEnabled(result) graph_group_setEnable(result) self.report_group.setEnabled(result) # TODO: Use QStateMachine to manage connections self.connect_btn.clicked.disconnect() if result: self.connect_btn.setText("Disconnect") self.connect_btn.clicked.connect(self.kirdy_handler.end_session) self._status() else: if self.kirdy_handler.connecting(): self.status_lbl.setText(f"Connection is dropped. Reconnecting to {self.ip_addr}:{self.port}.") self.connect_btn.setText("Stop") else: self.connect_btn.setText("Connect") self.connect_btn.clicked.connect(self.show_conn_settings_form) self.clear_graphs() self.status_lbl.setText(f"Disconnected from {self.ip_addr}:{self.port}.") self.connect_btn.clicked.connect(self.kirdy_handler.end_session) def _status(self): host = self.ip_addr port = self.port hw_rev = self.kirdy_handler.get_hw_rev() self.status_lbl.setText(f"Connected to Kirdy v{hw_rev['major']}.{hw_rev['minor']} @ {host}:{port}") def clear_graphs(self): self.graphs.clear_data_pts() @pyqtSlot(dict) def graphs_update(self, report): self.graphs.plot_append(report) @pyqtSlot(dict) def update_ld_ctrl_panel_settings(self, settings): try: settings = settings['laser'] with QSignalBlocker(self.params[1]): self.params[1].child('Output Config', 'LD Current Set').setValuewithLock(settings["ld_drive_current"]['value']) self.params[1].child('Output Config', 'LD Terminals Short').setValuewithLock(settings["ld_terms_short"]) self.params[1].child('Output Config', 'Default Power On').setValuewithLock(settings["default_pwr_on"]) self.params[1].child('Photodiode Monitor Config', 'LD Power Limit').setValuewithLock(settings["ld_pwr_limit"]) if settings["pd_mon_params"]["responsitivity"] is not None: 
self.params[1].child('Photodiode Monitor Config', 'Responsitivity').setValuewithLock(settings["pd_mon_params"]["responsitivity"]) else: self.params[1].child('Photodiode Monitor Config', 'Responsitivity').setValuewithLock(0) self.params[1].child('Photodiode Monitor Config', 'Dark Current').setValuewithLock(settings["pd_mon_params"]["i_dark"]) except Exception as e: logging.error(f"Params tree cannot be updated. Data:{settings}", exc_info=True) @pyqtSlot(dict) def update_ld_ctrl_panel_readings(self, report): try: report = report['laser'] with QSignalBlocker(self.params[0]): if report['pwr_excursion']: self.params[0].child('Status', 'Color').setValuewithLock('r') self.params[0].child('Status').setOpts(title='Status: OverPower Alarm') else: self.params[0].child('Status', 'Color').setValuewithLock('g' if report['pwr_on'] else 'w') self.params[0].child('Status').setOpts(title='Status: Power On' if report['pwr_on'] else 'Status: Power Off') with QSignalBlocker(self.params[1]): self.params[1].child('Readings', 'LD Current Set').setValuewithLock(report["ld_i_set"]) self.params[1].child('Readings', 'PD Current').setValuewithLock(report["pd_i"]) if report["pd_pwr"] is not None: self.params[1].child('Readings', 'PD Power').setValuewithLock(report["pd_pwr"]) else: self.params[1].child('Readings', 'PD Power').setValuewithLock(0) self.params[1].child('Readings', 'LF Mod Termination (50 Ohm)').setValuewithLock(report["term_50ohm"]) except Exception as e: logging.error(f"Params tree cannot be updated. 
Data:{report}", exc_info=True) @pyqtSlot(dict) def update_thermostat_ctrl_panel_settings(self, settings): try: settings = settings['thermostat'] with QSignalBlocker(self.params[3]): self.params[3].child('Output Config', 'Control Method').setValuewithLock("Temperature PID" if settings["pid_engaged"] else "Constant Current") self.params[3].child('Output Config', 'Control Method', 'Set Current').setValuewithLock(settings["tec_settings"]['i_set']['value']) self.params[3].child('Output Config', 'Control Method', 'Set Temperature').setValuewithLock(float(settings["temperature_setpoint"])) self.params[3].child('Output Config', 'Limits', 'Max Cooling Current').setValuewithLock(settings["tec_settings"]['max_i_pos']['value']) self.params[3].child('Output Config', 'Limits', 'Max Heating Current').setValuewithLock(settings["tec_settings"]['max_i_neg']['value']) self.params[3].child('Output Config', 'Limits', 'Max Voltage Difference').setValuewithLock(settings["tec_settings"]['max_v']['value']) self.params[3].child('Output Config', 'Default Power On').setValuewithLock(settings["default_pwr_on"]) # TODO: Update the Temperature ADC Settings here as well self.params[3].child('Temperature Monitor Config', 'Upper Limit').setValuewithLock(settings["temp_mon_settings"]['upper_limit']) self.params[3].child('Temperature Monitor Config', 'Lower Limit').setValuewithLock(settings["temp_mon_settings"]['lower_limit']) self.params[3].child('PID Config', 'Kp').setValuewithLock(settings["pid_params"]['kp']) self.params[3].child('PID Config', 'Ki').setValuewithLock(settings["pid_params"]['ki']) self.params[3].child('PID Config', 'Kd').setValuewithLock(settings["pid_params"]['kd']) self.params[3].child('PID Config', 'PID Output Clamping', 'Minimum').setValuewithLock(settings["pid_params"]['output_min']) self.params[3].child('PID Config', 'PID Output Clamping', 'Maximum').setValuewithLock(settings["pid_params"]['output_max']) self.params[3].child('Thermistor Settings', 
'T₀').setValuewithLock(settings["thermistor_params"]['t0']) self.params[3].child('Thermistor Settings', 'R₀').setValuewithLock(settings["thermistor_params"]['r0']) self.params[3].child('Thermistor Settings', 'B').setValuewithLock(settings["thermistor_params"]['b']) self.graphs.set_temp_setpoint_line(temp=round(settings["temperature_setpoint"], 4)) self.graphs.set_temp_setpoint_line(visible=settings['pid_engaged']) except Exception as e: logging.error(f"Params tree cannot be updated. Data:{settings}", exc_info=True) @pyqtSlot(dict) def update_thermostat_ctrl_panel_readings(self, report): try: report = report['thermostat'] with QSignalBlocker(self.params[2]): if report['temp_mon_status']['over_temp_alarm']: self.params[2].child('Status', 'Color').setValuewithLock('r') self.params[2].child('Status').setOpts(title='Status: OverTemperature Alarm') else: self.params[2].child('Status', 'Color').setValuewithLock('g' if report['pwr_on'] else 'w') self.params[2].child('Status').setOpts(title='Status: Power On' if report['pwr_on'] else 'Status: Power Off') with QSignalBlocker(self.params[3]): if report["temperature"] == None: self.params[3].child('Readings', 'Temperature').setValuewithLock(-273.15) else: self.params[3].child('Readings', 'Temperature').setValuewithLock(report["temperature"]) self.params[3].child('Readings', 'Current through TEC').setValuewithLock(report["tec_i"]) except Exception as e: logging.error(f"Params tree cannot be updated. 
Data:{report}", exc_info=True) @pyqtSlot(int) def set_max_samples(self, samples: int): self.graphs.set_max_samples(samples) @pyqtSlot() def update_net_settings(self): net_settings = self.update_net_settings_form.get_net_settings() if net_settings is None: self.status_lbl.setText("Invalid IP Settings Input") return addr = net_settings["ip_addr"] port = net_settings["port"] prefix_len = net_settings["prefix_len"] gateway = net_settings["gateway_addr"] self.kirdy.task_dispatcher(self.kirdy.device.set_ip_settings(addr, port, prefix_len, gateway)) self.status_lbl.setText("IP Settings is Updated") @pyqtSlot() def start_connecting(self): net_settings = self.conn_settings_form.get_net_settings() if net_settings is None: self.status_lbl.setText("Invalid IP Settings Input") return self.ip_addr = net_settings["ip_addr"] self.port = net_settings["port"] host = self.ip_addr port = self.port if not (self.kirdy_handler.connecting() or self.kirdy_handler.connected()): self.status_lbl.setText("Connecting...") self.kirdy_handler.start_session(host=host, port=port) self.connect_btn.setText("Stop") self.connect_btn.clicked.disconnect() self.connect_btn.clicked.connect(self.kirdy_handler.end_session) @pyqtSlot(object, object) def send_command(self, param, changes): for inner_param, change, data in changes: if change == 'value': """ cmd translation from mutex type parameter """ if inner_param.opts.get('target_action_pair', None) is not None: target, action = inner_param.opts['target_action_pair'][inner_param.opts['limits'].index(data)] cmd = getattr(getattr(self.kirdy, target), action) param.child(*param.childPath(inner_param)).setOpts(lock=True) self.kirdy.task_dispatcher(cmd()) param.child(*param.childPath(inner_param)).setOpts(lock=False) continue """ cmd translation from non-mutex type parameter""" if inner_param.opts.get("target", None) is not None: if inner_param.opts.get("action", None) is not None: if inner_param.opts.get("unit", None) is not None: _, _, suffix = 
siParse(str(data)+inner_param.opts["unit"], regex=FLOAT_REGEX) data = siEval(str(data)+inner_param.opts["unit"], regex=FLOAT_REGEX, suffix=suffix) cmd = getattr(getattr(self.kirdy, inner_param.opts["target"]), inner_param.opts["action"]) param.child(*param.childPath(inner_param)).setOpts(lock=True) self.kirdy.task_dispatcher(cmd(data)) param.child(*param.childPath(inner_param)).setOpts(lock=False) continue async def coro_main(): args = get_argparser().parse_args() if args.log_file or args.log_file is None: filename = args.log_file if filename is None: time = datetime.now() filename = f"{time.year}-{time.day}-{time.month}-{time.hour}:{time.minute}:{time.second}.log" logging.basicConfig(filename=filename, filemode='a', format='%(asctime)s,%(msecs)d %(name)s %(levelname)s %(message)s', datefmt='%Y/%d/%m %H:%M:%S', level=logging.INFO) if args.logLevel: logging.basicConfig(level=getattr(logging, args.logLevel)) app_quit_event = asyncio.Event() app = QtWidgets.QApplication.instance() app.aboutToQuit.connect(app_quit_event.set) main_window = MainWindow(args) main_window.show() await app_quit_event.wait() def main(): qasync.run(coro_main()) if __name__ == '__main__': main()