from PyQt6 import QtWidgets, QtGui, QtCore from PyQt6.QtCore import pyqtSignal, QObject, QSignalBlocker, pyqtSlot import pyqtgraph.parametertree.parameterTypes as pTypes from pyqtgraph.parametertree import Parameter, ParameterTree, ParameterItem, registerParameterType import pyqtgraph as pg pg.setConfigOptions(antialias=True) from pyqtgraph import mkPen from pglive.sources.live_axis_range import LiveAxisRange from pglive.sources.data_connector import DataConnector from pglive.kwargs import Axis, LeadingLine from pglive.sources.live_plot import LiveLinePlot from pglive.sources.live_plot_widget import LivePlotWidget from pglive.sources.live_axis import LiveAxis import sys import os import argparse import logging import asyncio from driver.kirdy_async import Kirdy, StoppedConnecting import qasync from qasync import asyncSlot, asyncClose from collections import deque from datetime import datetime, timezone, timedelta from time import time from typing import Any, Optional, List from ui.ui_conn_settings_form import Ui_Conn_Settings_Form from ui.ui_update_network_settings_form import Ui_Update_Network_Settings_Form from ui.ui_kirdy_qt import Ui_MainWindow from dateutil import tz import math import socket COMMON_ERROR_MSG = "Connection Timeout. Disconnecting." def get_argparser(): parser = argparse.ArgumentParser(description="ARTIQ master") parser.add_argument("--connect", default=None, action="store_true", help="Automatically connect to the specified Thermostat in IP:port format") parser.add_argument('IP', metavar="ip", default=None, nargs='?') parser.add_argument('PORT', metavar="port", default=None, nargs='?') parser.add_argument("-l", "--log", dest="logLevel", choices=['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'], help="Set the logging level") return parser class KirdyDataWatcher(QObject): """ This class provides various signals for Mainwindow to update various kinds of GUI objects """ connection_error_sig = pyqtSignal() setting_update_sig = pyqtSignal(dict) report_update_sig = pyqtSignal(dict) def __init__(self, parent, kirdy, update_s): self._update_s = update_s self._kirdy = kirdy self._watch_task = None self._report_mode_task = None self._poll_for_report = True super().__init__(parent) async def signal_emitter(self): settings_summary = await self._kirdy.device.get_settings_summary() self.setting_update_sig.emit(settings_summary) if self._poll_for_report: status_report = await self._kirdy.device.get_status_report() self.report_update_sig.emit(status_report) async def run(self): try: task = asyncio.create_task(self.signal_emitter()) while True: if task.done(): _ = task.result() task = asyncio.create_task(self.signal_emitter()) await asyncio.sleep(self._update_s) except asyncio.CancelledError: task.cancel() except Exception as e: logging.error(COMMON_ERROR_MSG) self._kirdy.stop_report_mode() self.connection_error_sig.emit() def start_watching(self): self._watch_task = asyncio.create_task(self.run()) async def stop_watching(self): if self._watch_task is not None: self._watch_task.cancel() await self._watch_task self._watch_task = None await self.set_report_mode(False) async def set_report_mode(self, enabled: bool): self._poll_for_report = not enabled if enabled: self._report_mode_task = asyncio.create_task(self.report_mode()) else: if self._report_mode_task is not None: self._kirdy.stop_report_mode() await self._report_mode_task self._report_mode_task = None async def report_mode(self): try: async for status_report in self._kirdy.report_mode(): if status_report["msg_type"] == "Exception": raise TimeoutError("Connection Timeout") self.report_update_sig.emit(status_report) except Exception as e: logging.error(f"{COMMON_ERROR_MSG}") self.connection_error_sig.emit() @pyqtSlot(float) def set_update_s(self, update_s): self._update_s = update_s class Graphs: def __init__(self, ld_i_set_graph, pd_mon_pwr_graph, tec_i_graph, tec_temp_graph, max_samples=1000): self.graphs = [ld_i_set_graph, pd_mon_pwr_graph, tec_i_graph, tec_temp_graph] self.connectors = [] self._pd_mon_pwr_plot = LiveLinePlot(pen=pg.mkPen('r')) self._ld_i_set_plot = LiveLinePlot(name="Set", pen=pg.mkPen('r')) self._tec_temp_plot = LiveLinePlot(pen=pg.mkPen('r')) self._tec_setpoint_plot = LiveLinePlot(pen=pg.mkPen('r')) self._tec_i_target_plot = LiveLinePlot(name="Target", pen=pg.mkPen('r')) self._tec_i_measure_plot = LiveLinePlot(name="Measure", pen=pg.mkPen('g')) self._temp_setpoint_line = tec_temp_graph.getPlotItem().addLine(label='{value} °C', pen=pg.mkPen('g')) # Render the temperature setpoint line on top of the temperature being plotted self._temp_setpoint_line.setZValue(10) self._temp_setpoint_line.setVisible(False) def tickStrings(values: List, scale: float, spacing: float) -> List: return [datetime.fromtimestamp(value/1000, tz=timezone.utc).strftime("%H:%M:%S") for value in values] for graph in ld_i_set_graph, pd_mon_pwr_graph, tec_i_graph, tec_temp_graph: time_axis = LiveAxis('bottom', text="Time since Kirdy Reset (Hr:Min:Sec)", tick_angle=-45, units="") # Display the relative ts in custom %H:%M:%S format without local timezone time_axis.tickStrings = tickStrings # Prevent scaling prefix being added to the back fo axis label time_axis.autoSIPrefix = False time_axis.showLabel() graph.setAxisItems({'bottom': time_axis}) graph.add_crosshair(pg.mkPen(color='red', width=1), {'color': 'green'}) #TODO: x_range should not be updated on every tick graph.x_range_controller = LiveAxisRange(roll_on_tick=17, offset_left=4900) graph.x_range_controller.crop_left_offset_to_data = True # Enable linking of axes in the graph widget's context menu graph.register(graph.getPlotItem().titleLabel.text) # Slight hack getting the title self.max_samples = max_samples ld_i_set_axis = LiveAxis('left', text="Current", units="A") ld_i_set_axis.showLabel() ld_i_set_graph.setAxisItems({'left': ld_i_set_axis}) ld_i_set_graph.addItem(self._ld_i_set_plot) self.ld_i_set_connector = DataConnector(self._ld_i_set_plot, max_points=self.max_samples) self.connectors += [self.ld_i_set_connector] pd_mon_pwr_axis = LiveAxis('left', text="Power", units="W") pd_mon_pwr_axis.showLabel() pd_mon_pwr_graph.setAxisItems({'left': pd_mon_pwr_axis}) pd_mon_pwr_graph.addItem(self._pd_mon_pwr_plot) self.pd_mon_pwr_connector = DataConnector(self._pd_mon_pwr_plot, max_points=self.max_samples) self.connectors += [self.pd_mon_pwr_connector] tec_temp_axis = LiveAxis('left', text="Temperature", units="°C") tec_temp_axis.showLabel() tec_temp_graph.setAxisItems({'left': tec_temp_axis}) tec_temp_graph.addItem(self._tec_setpoint_plot) tec_temp_graph.addItem(self._tec_temp_plot) self.tec_setpoint_connector = DataConnector(self._tec_setpoint_plot, max_points=1) self.tec_temp_connector = DataConnector(self._tec_temp_plot, max_points=self.max_samples) self.connectors += [self.tec_temp_connector, self.tec_setpoint_connector] tec_i_axis = LiveAxis('left', text="Current", units="A") tec_i_axis.showLabel() tec_i_graph.setAxisItems({'left': tec_i_axis}) tec_i_graph.addLegend(brush=(50, 50, 200, 150)) tec_i_graph.y_range_controller = LiveAxisRange(fixed_range=[-1.0, 1.0]) tec_i_graph.addItem(self._tec_i_target_plot) tec_i_graph.addItem(self._tec_i_measure_plot) self.tec_i_target_connector = DataConnector(self._tec_i_target_plot, max_points=self.max_samples) self.tec_i_measure_connector = DataConnector(self._tec_i_measure_plot, max_points=self.max_samples) self.connectors += [self.tec_i_target_connector, self.tec_i_measure_connector] def set_max_samples(self, max_samples): self.max_samples = max_samples for connector in self.connectors: with connector.data_lock: connector.max_points = self.max_samples connector.x = deque(maxlen=int(connector.max_points)) connector.y = deque(maxlen=int(connector.max_points)) def plot_append(self, report): try: ld_i_set = report['laser']['ld_i_set'] pd_pwr = report['laser']['pd_pwr'] tec_i_set = report['thermostat']['i_set'] tec_i_measure = report['thermostat']['tec_i'] tec_temp = report['thermostat']['temperature'] ts = report['ts'] self.ld_i_set_connector.cb_append_data_point(ld_i_set, ts) self.pd_mon_pwr_connector.cb_append_data_point(pd_pwr, ts) if tec_temp is not None: self.tec_temp_connector.cb_append_data_point(tec_temp, ts) if self._temp_setpoint_line.isVisible(): self.tec_setpoint_connector.cb_append_data_point(self._temp_setpoint_line.value(), ts) else: self.tec_setpoint_connector.cb_append_data_point(tec_temp, ts) if tec_i_measure is not None: self.tec_i_measure_connector.cb_append_data_point(tec_i_measure, ts) self.tec_i_target_connector.cb_append_data_point(tec_i_set, ts) except Exception as e: logging.error(f"Graph Value cannot be updated. Data:{report}", exc_info=True) def clear_data_pts(self): for connector in self.connectors: connector.clear() connector.resume() def set_temp_setpoint_line(self, temp=None, visible=None): if visible is not None: self._temp_setpoint_line.setVisible(visible) if temp is not None: self._temp_setpoint_line.setValue(temp) # PyQtGraph normally does not update this text when the line # is not visible, so make sure that the temperature label # gets updated always, and doesn't stay at an old value. self._temp_setpoint_line.label.setText(f"{temp} °C", color='g') class MutexParameter(pTypes.ListParameter): """ Mutually exclusive parameter where only one of its children is visible at a time, list selectable. The ordering of the list items determines which children will be visible. """ def __init__(self, **opts): super().__init__(**opts) self.sigValueChanged.connect(self.show_chosen_child) self.sigValueChanged.emit(self, self.opts['value']) def _get_param_from_value(self, value): if isinstance(self.opts['limits'], dict): values_list = list(self.opts['limits'].values()) else: values_list = self.opts['limits'] return self.children()[values_list.index(value)] @pyqtSlot(object, object) def show_chosen_child(self, value): for param in self.children(): param.hide() child_to_show = self._get_param_from_value(value.value()) child_to_show.show() if child_to_show.opts.get('triggerOnShow', None): child_to_show.sigValueChanged.emit(child_to_show, child_to_show.value()) registerParameterType('mutex', MutexParameter) class UpdateNetSettingsForm(QtWidgets.QDialog, Ui_Update_Network_Settings_Form): def __init__(self): super().__init__() self.setupUi(self) def get_net_settings(self): try: ip_addr = f"{self.addr_in_0.text()}.{self.addr_in_1.text()}.{self.addr_in_2.text()}.{self.addr_in_3.text()}" gateway_addr = f"{self.gateway_in_0.text()}.{self.gateway_in_1.text()}.{self.gateway_in_2.text()}.{self.gateway_in_3.text()}" socket.inet_aton(ip_addr) socket.inet_aton(gateway_addr) return { "ip_addr": ip_addr, "gateway_addr": gateway_addr, "prefix_len": int(self.prefix_len_in.text()), "port": int(self.port_in.text()), } except (OSError, ValueError): return None class ConnSettingsForm(QtWidgets.QDialog, Ui_Conn_Settings_Form): def __init__(self): super().__init__() self.setupUi(self) def get_net_settings(self): try: ip_addr = f"{self.addr_in_0.text()}.{self.addr_in_1.text()}.{self.addr_in_2.text()}.{self.addr_in_3.text()}" socket.inet_aton(ip_addr) return { "ip_addr": ip_addr, "port": int(self.port_in.text()) } except (OSError, ValueError): return None class MainWindow(QtWidgets.QMainWindow, Ui_MainWindow): """The maximum number of sample points to store.""" DEFAULT_MAX_SAMPLES = 1000 DEFAULT_IP_ADDR = '192.168.1.128' DEFAULT_PORT = 1337 LASER_DIODE_STATUS = [ {'name': 'Power', 'type': 'color', 'value': 'w', 'readonly': True}, {'name': 'Alarm', 'type': 'color', 'value': 'w', 'readonly': True}, ] LASER_DIODE_PARAMETERS = [ {'name': 'Readings', 'expanded': True, 'type': 'group', 'children': [ {'name': 'LD Current Set', 'type': 'float', 'suffix': 'A', 'siPrefix': True, 'readonly': True}, {'name': 'PD Current', 'type': 'float', 'suffix': 'A', 'siPrefix': True, 'readonly': True}, {'name': 'PD Power', 'type': 'float', 'suffix': 'W', 'siPrefix': True, 'readonly': True}, {'name': 'LF Mod Impedance', 'type': 'list', 'limits': ['Is50Ohm', 'Not50Ohm'], 'readonly': True} ]}, {'name': 'Output Config', 'expanded': True, 'type': 'group', 'children': [ {'name': 'LD Current Set', 'type': 'float', 'value': 0, 'step': 1, 'decimals': 6, 'limits': (0, 1), 'suffix': 'A', 'siPrefix': True, 'lock': False, 'target': 'laser', 'action': 'set_i'}, {'name': 'LD Current Set Soft Limit', 'type': 'float', 'value': 0, 'step': 1, 'decimals': 6, 'limits': (0, 1), 'suffix': 'A', 'siPrefix': True, 'lock': False, 'target': 'laser', 'action': 'set_i_soft_limit'}, {'name': 'LD Power Limit', 'type': 'float', 'value': 0, 'step': 1, 'decimals': 6, 'limits': (0, 0.3), 'suffix': 'W', 'siPrefix': True, 'lock': False, 'target': 'laser', 'action': 'set_ld_pwr_limit'}, {'name': 'LD Terminals Short', 'type': 'bool', 'value': False, 'lock': False, 'target': 'laser', 'action': 'set_ld_terms_short'}, {'name': 'Default Power On', 'type': 'bool', 'value': False, 'lock': False, 'target': 'laser', 'action': 'set_default_pwr_on'}, ]}, {'name': 'Photodiode Monitor Config', 'expanded': False, 'type': 'group', 'children': [ {'name': 'Responsitivity', 'type': 'float', 'value': 0, 'step': 1, 'decimals': 6, 'limits': (0, 3000), 'suffix': 'A/W', 'siPrefix': True, 'lock': False, 'target': 'laser', 'action': 'set_pd_mon_responsitivity'}, {'name': 'Dark Current', 'type': 'float', 'value': 0, 'step': 1, 'decimals': 6, 'limits': (0, 3000), 'suffix': 'A', 'siPrefix': True, 'lock': False, 'target': 'laser', 'action': 'set_pd_mon_dark_current'}, ]}, ] THERMOSTAT_STATUS = [ {'name': 'Power', 'type': 'color', 'value': 'w', 'readonly': True}, {'name': 'Alarm', 'type': 'color', 'value': 'w', 'readonly': True}, ] THERMOSTAT_PARAMETERS = [ {'name': 'Readings', 'expanded': True, 'type': 'group', 'children': [ {'name': 'Temperature', 'type': 'float', 'format': '{value:.4f} °C', 'readonly': True}, {'name': 'Current through TEC', 'type': 'float', 'suffix': 'A', 'siPrefix': True, 'decimals': 6, 'readonly': True}, ]}, {'name': 'Output Config', 'expanded': True, 'type': 'group', 'children': [ {'name': 'Control Method', 'type': 'mutex', 'limits': ['Constant Current', 'Temperature PID'], 'target_action_pair': [['thermostat', 'set_constant_current_control_mode'], ['thermostat', 'set_pid_control_mode']], 'children': [ {'name': 'Set Current', 'type': 'float', 'value': 0, 'step': 0.001, 'limits': (-1, 1), 'triggerOnShow': True, 'decimals': 6, 'suffix': 'A', 'siPrefix': True, 'lock': False, 'target': 'thermostat', 'action': 'set_tec_i_out'}, {'name': 'Set Temperature', 'type': 'float', 'value': 25, 'step': 0.1, 'limits': (-273, 300), 'format': '{value:.4f} °C', 'lock': False, 'target': 'thermostat', 'action': 'set_temperature_setpoint'}, ]}, {'name': 'Limits', 'expanded': False, 'type': 'group', 'children': [ {'name': 'Max Cooling Current', 'type': 'float', 'value': 0, 'step': 0.001, 'decimals': 6, 'limits': (0, 1), 'suffix': 'A', 'siPrefix': True, 'lock': False, 'target': 'thermostat', 'action': 'set_tec_max_cooling_i'}, {'name': 'Max Heating Current', 'type': 'float', 'value': 0, 'step': 0.001, 'decimals': 6, 'limits': (0, 1), 'suffix': 'A', 'siPrefix': True, 'lock': False, 'target': 'thermostat', 'action': 'set_tec_max_heating_i'}, {'name': 'Max Voltage Difference', 'type': 'float', 'value': 0, 'step': 0.1, 'limits': (0, 5), 'suffix': 'V', 'siPrefix': True, 'lock': False, 'target': 'thermostat', 'action': 'set_tec_max_v'}, ]}, {'name': 'Default Power On', 'type': 'bool', 'value': False, 'lock': False, 'target': 'thermostat', 'action': 'set_default_pwr_on'}, ]}, # TODO Temperature ADC Filter Settings {'name': 'Temperature Monitor Config', 'expanded': False, 'type': 'group', 'children': [ {'name': 'Upper Limit', 'type': 'float', 'value': 0, 'step': 1, 'decimals': 6, 'limits': (-273, 300), 'suffix': '°C', 'lock': False, 'target': 'thermostat', 'action': 'set_temp_mon_upper_limit'}, {'name': 'Lower Limit', 'type': 'float', 'value': 0, 'step': 1, 'decimals': 6, 'limits': (-273, 300), 'suffix': '°C', 'lock': False, 'target': 'thermostat', 'action': 'set_temp_mon_upper_limit'}, ]}, {'name': 'Thermistor Settings','expanded': False, 'type': 'group', 'children': [ {'name': 'T₀', 'type': 'float', 'value': 0, 'step': 1, 'decimals': 6, 'limits': (-273, 300), 'suffix': '°C', 'lock': False, 'target': 'thermostat', 'action': 'set_sh_t0'}, {'name': 'R₀', 'type': 'float', 'value': 0, 'step': 1, 'decimals': 6, 'suffix': 'Ω', 'siPrefix': True, 'lock': False, 'target': 'thermostat', 'action': 'set_sh_r0'}, {'name': 'B', 'type': 'float', 'value': 3950, 'step': 1, 'suffix': 'K', 'decimals': 4, 'lock': False, 'target': 'thermostat', 'action': 'set_sh_beta'}, ]}, {'name': 'PID Config', 'expanded': False, 'type': 'group', 'children': [ {'name': 'Kp', 'type': 'float', 'step': 0.1, 'suffix': '', 'lock': False, 'target': 'thermostat', 'action': 'set_pid_kp'}, {'name': 'Ki', 'type': 'float', 'step': 0.1, 'suffix': 'Hz', 'lock': False, 'target': 'thermostat', 'action': 'set_pid_ki'}, {'name': 'Kd', 'type': 'float', 'step': 0.1, 'suffix': 's', 'lock': False, 'target': 'thermostat', 'action': 'set_pid_kd'}, {'name': "PID Output Clamping", 'expanded': True, 'type': 'group', 'children': [ {'name': 'Minimum', 'type': 'float', 'step': 100, 'limits': (-1, 1), 'decimals': 6, 'suffix': 'A', 'lock': False, 'target': 'thermostat', 'action': 'set_pid_output_min'}, {'name': 'Maximum', 'type': 'float', 'step': 100, 'limits': (-1, 1), 'decimals': 6, 'suffix': 'A', 'lock': False, 'target': 'thermostat', 'action': 'set_pid_output_max'}, ]}, # TODO PID AutoTune ]}, ] def __init__(self, args): super().__init__() self.kirdy = Kirdy() self.setupUi(self) # Load Global QT Style Sheet Settings qss=os.path.join(os.path.dirname(__file__), "ui/mainwindow.qss") with open(qss,"r") as fh: self.setStyleSheet(fh.read()) self.ip_addr = self.DEFAULT_IP_ADDR self.port = self.DEFAULT_PORT self.conn_settings_form = ConnSettingsForm() self.conn_settings_form.accepted.connect(self.start_connecting) self.update_net_settings_form = UpdateNetSettingsForm() self.update_net_settings_form.accepted.connect(self.update_net_settings) self.max_samples = self.DEFAULT_MAX_SAMPLES self.setup_menu_bar() self._set_up_ctrl_btns() self._set_up_plot_menu() self.params = [ Parameter.create(name=f"Laser Diode Status", type='group', value=0, children=self.LASER_DIODE_STATUS), Parameter.create(name=f"Laser Diode Parameters", type='group', value=1, children=self.LASER_DIODE_PARAMETERS), Parameter.create(name=f"Thermostat Status", type='group', value=2, children=self.THERMOSTAT_STATUS), Parameter.create(name=f"Thermostat Parameters", type='group', value=3, children=self.THERMOSTAT_PARAMETERS), ] self._set_param_tree() self.tec_i_graph.setTitle("TEC Current") self.tec_temp_graph.setTitle("TEC Temperature") self.ld_i_set_graph.setTitle("LD Current Set") self.pd_mon_pwr_graph.setTitle("PD Mon Power") self.connect_btn.clicked.connect(self.show_conn_settings_form) self.report_box.stateChanged.connect(self.on_report_box_stateChanged) self.kirdy_data_watcher = KirdyDataWatcher(self, self.kirdy, self.report_refresh_spin.value()) self.kirdy_data_watcher.connection_error_sig.connect(self.bail) # TODO: Identify the usable range of set_update_s self.report_apply_btn.clicked.connect( lambda: self.kirdy_data_watcher.set_update_s(self.report_refresh_spin.value()) ) self.kirdy_data_watcher.setting_update_sig.connect(self.update_ld_ctrl_panel_settings) self.kirdy_data_watcher.setting_update_sig.connect(self.update_thermostat_ctrl_panel_settings) self.kirdy_data_watcher.report_update_sig.connect(self.update_ld_ctrl_panel_readings) self.kirdy_data_watcher.report_update_sig.connect(self.update_thermostat_ctrl_panel_readings) self.graphs = Graphs(self.ld_i_set_graph, self.pd_mon_pwr_graph, self.tec_i_graph, self.tec_temp_graph, max_samples=self.max_samples) self.kirdy_data_watcher.report_update_sig.connect(self.graphs.plot_append) self.loading_spinner.hide() def setup_menu_bar(self): @pyqtSlot(bool) def about_kirdy(_): # TODO: Replace the hardware revision placeholder QtWidgets.QMessageBox.about( self, "About Kirdy", f"""