from PyQt6 import QtWidgets, QtGui, QtCore
from PyQt6.QtCore import pyqtSignal, QObject, QSignalBlocker, pyqtSlot
from pyqtgraph.parametertree import Parameter, ParameterTree, ParameterItem, registerParameterType
import pyqtgraph as pg
from pglive.sources.data_connector import DataConnector
from pglive.kwargs import Axis
from pglive.sources.live_plot import LiveLinePlot
from pglive.sources.live_plot_widget import LivePlotWidget
from pglive.sources.live_axis import LiveAxis
import sys
import argparse
import logging
import asyncio
from pytec.aioclient import Client, StoppedConnecting
import qasync
from qasync import asyncSlot, asyncClose

# pyuic6 -x tec_qt.ui -o ui_tec_qt.py
from ui_tec_qt import Ui_MainWindow


class CommandsParameter(Parameter):
    """Parameter subclass used as the root type of the per-channel trees.

    It adds no behaviour of its own in the code visible here.
    """


# Parameter-tree description for each of the two channels (ch = 0, 1).
# Every entry is a plain dict that is handed to CommandsParameter.create()
# below.  A 'commands' list holds command strings with the channel number
# already substituted; the doubled braces leave a literal "{value}"
# placeholder in the string (presumably filled in with the parameter's value
# by the code that sends the command -- that code is not visible here).
THERMOSTAT_PARAMETERS = [[
    {'name': 'Constant Current', 'type': 'float', 'value': 0, 'step': 0.1, 'limits': (-3, 3), 'siPrefix': True,
     'suffix': 'A', 'commands': [f'pwm {ch} i_set {{value}}']},
    {'name': 'Temperature PID', 'type': 'bool', 'value': False, 'commands': [f'pwm {ch} pid'], 'payload': ch,
     'children': [
        {'name': 'Set Temperature', 'type': 'float', 'value': 25, 'step': 0.1, 'limits': (-273, 300),
         'siPrefix': True, 'suffix': '°C', 'commands': [f'pid {ch} target {{value}}'], 'payload': ch},
    ]},
    {'name': 'Output Config', 'expanded': False, 'type': 'group', 'children': [
        {'name': 'Max Current', 'type': 'float', 'value': 0, 'step': 0.1, 'limits': (0, 3), 'siPrefix': True,
         'suffix': 'A',
         'commands': [f'pwm {ch} max_i_pos {{value}}', f'pwm {ch} max_i_neg {{value}}',
                      f'pid {ch} output_min -{{value}}', f'pid {ch} output_max {{value}}']},
        {'name': 'Max Voltage', 'type': 'float', 'value': 0, 'step': 0.1, 'limits': (0, 5), 'siPrefix': True,
         'suffix': 'V', 'commands': [f'pwm {ch} max_v {{value}}']},
    ]},
    {'name': 'Thermistor Config', 'expanded': False, 'type': 'group', 'children': [
        {'name': 'T₀', 'type': 'float', 'value': 25, 'step': 0.1, 'limits': (-100, 100), 'siPrefix': True,
         'suffix': '°C', 'commands': [f's-h {ch} t0 {{value}}']},
        {'name': 'R₀', 'type': 'float', 'value': 10000, 'step': 1, 'siPrefix': True, 'suffix': 'Ω',
         'commands': [f's-h {ch} r0 {{value}}']},
        {'name': 'β', 'type': 'float', 'value': 3950, 'step': 1, 'suffix': 'K',
         'commands': [f's-h {ch} b {{value}}']},
    ]},
    {'name': 'Postfilter Config', 'expanded': False, 'type': 'group', 'children': [
        {'name': 'Rate', 'type': 'float', 'value': 16.67, 'step': 0.01, 'suffix': 'Hz',
         'commands': [f'postfilter {ch} rate {{value}}']},
    ]},
    {'name': 'PID Config', 'expanded': False, 'type': 'group', 'children': [
        {'name': 'Kp', 'type': 'float', 'value': 0, 'step': 0.1, 'suffix': '°C/A',
         'commands': [f'pid {ch} kp {{value}}']},
        {'name': 'Ki', 'type': 'float', 'value': 0, 'step': 0.1, 'suffix': '°C/C',
         'commands': [f'pid {ch} ki {{value}}']},
        {'name': 'Kd', 'type': 'float', 'value': 0, 'step': 0.1, 'suffix': '°Cs²/C',
         'commands': [f'pid {ch} kd {{value}}']},
        {'name': 'PID Auto Tune', 'expanded': False, 'type': 'group', 'children': [
            {'name': 'Target Temperature', 'type': 'float', 'value': 20, 'step': 0.1, 'siPrefix': True,
             'suffix': '°C'},
            {'name': 'Test Current', 'type': 'float', 'value': 1, 'step': 0.1, 'siPrefix': True, 'suffix': 'A'},
            {'name': 'Temperature Swing', 'type': 'float', 'value': 1.5, 'step': 0.1, 'siPrefix': True,
             'prefix': '±', 'suffix': '°C'},
            {'name': 'Run', 'type': 'action', 'tip': 'Run'},
        ]},
    ]},
    {'name': 'Save to flash', 'type': 'action', 'tip': 'Save config to flash', 'commands': [f'save {ch}']}
] for ch in range(2)]


# One root group parameter per channel, built from the descriptions above.
params = [
    CommandsParameter.create(name='Thermostat Params 0', type='group', children=THERMOSTAT_PARAMETERS[0]),
    CommandsParameter.create(name='Thermostat Params 1', type='group', children=THERMOSTAT_PARAMETERS[1]),
]


def get_argparser():
    """Build the command-line parser for the GUI.

    Returns:
        An ``argparse.ArgumentParser`` exposing ``--connect`` (flag), the
        optional positionals ``IP`` and ``PORT`` (both default to ``None``)
        and ``-l/--log`` (stored as ``logLevel``).
    """
    # The description used to read "ARTIQ master", a leftover from another
    # program; it is what ``--help`` prints, so it names this tool instead.
    parser = argparse.ArgumentParser(description="Thermostat Control Panel")

    parser.add_argument("--connect", default=None, action="store_true",
                        help="Automatically connect to the specified Thermostat in IP:port format")
    parser.add_argument('IP', metavar="ip", default=None, nargs='?')
    parser.add_argument('PORT', metavar="port", default=None, nargs='?')
    parser.add_argument("-l", "--log", dest="logLevel", choices=['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'],
                        help="Set the logging level")

    return parser


class ClientWatcher(QObject):
    """Poll the client on a fixed period and publish the results as Qt signals.

    Each ``*_update`` signal carries the object returned by the matching
    client call made in :meth:`update_params`.
    """

    fan_update = pyqtSignal(dict)
    pwm_update = pyqtSignal(list)
    report_update = pyqtSignal(list)
    pid_update = pyqtSignal(list)
    thermistor_update = pyqtSignal(list)
    postfilter_update = pyqtSignal(list)

    def __init__(self, parent, client, update_s):
        """Create an idle watcher; polling starts with :meth:`start_watching`.

        Args:
            parent: Parent object forwarded to ``QObject.__init__``.
            client: Object whose coroutine methods ``fan``, ``get_pwm``,
                ``_command``, ``get_pid``, ``get_steinhart_hart`` and
                ``get_postfilter`` are awaited on every poll.
            update_s: Polling period in seconds.
        """
        # Initialise the QObject base before attaching any instance state.
        super().__init__(parent)

        self.update_s = update_s
        self.client = client
        self.watch_task = None
        self.poll_for_report = True

    async def run(self):
        """Poll forever, aiming for one refresh every ``update_s`` seconds."""
        loop = asyncio.get_running_loop()
        while True:
            started = loop.time()
            await self.update_params()
            # Sleep only for what is left of the period.  A refresh that took
            # longer than the period yields a non-positive remainder, which
            # asyncio.sleep() treats as "yield once and continue"; the clamp
            # just makes that intent explicit.
            remaining = self.update_s - (loop.time() - started)
            await asyncio.sleep(max(0.0, remaining))

    async def update_params(self):
        """Query the client once and emit every update signal in turn."""
        self.fan_update.emit(await self.client.fan())
        self.pwm_update.emit(await self.client.get_pwm())
        # The report query is skipped while report polling is switched off.
        if self.poll_for_report:
            self.report_update.emit(await self.client._command("report"))
        self.pid_update.emit(await self.client.get_pid())
        self.thermistor_update.emit(await self.client.get_steinhart_hart())
        self.postfilter_update.emit(await self.client.get_postfilter())

    def start_watching(self):
        """Start the polling task on the running event loop.

        A task that is still registered is cancelled first, so calling this
        twice never leaves two pollers running side by side.
        """
        self.stop_watching()
        self.watch_task = asyncio.create_task(self.run())

    def is_watching(self):
        """Return True while a polling task is registered."""
        return self.watch_task is not None

    @pyqtSlot()
    def stop_watching(self):
        """Cancel the polling task, if any, and forget it."""
        if self.watch_task is not None:
            self.watch_task.cancel()
            self.watch_task = None

    def set_report_polling(self, enabled: bool):
        """Enable or disable the "report" query made on each poll."""
        self.poll_for_report = enabled

    @pyqtSlot(float)
    def set_update_s(self, update_s):
        """Change the polling period (seconds); read again on the next cycle."""
        self.update_s = update_s


class MainWindow(QtWidgets.QMainWindow, Ui_MainWindow):
    """Main application window.

    Builds the menus, live plots and parameter tree, and connects the
    ``ClientWatcher`` update signals to this window's update slots.
    """

    # The maximum number of sample points to store.
    DEFAULT_MAX_SAMPLES = 1000

    def __init__(self, args):
        """Set up the UI and, if requested, connect straight away.

        Args:
            args: Parsed command-line namespace; the attributes ``connect``,
                ``IP`` and ``PORT`` are read here.
        """
        super().__init__()

        self.setupUi(self)

        self.max_samples = self.DEFAULT_MAX_SAMPLES

        self._set_up_connection_menu()
        self._set_up_thermostat_menu()
        self._set_up_plot_menu()

        self._set_param_tree()

        # One temperature (t) and one current (i) live plot per channel.
        self.ch0_t_plot = LiveLinePlot()
        self.ch0_i_plot = LiveLinePlot()
        self.ch1_t_plot = LiveLinePlot()
        self.ch1_i_plot = LiveLinePlot()

        # Labelled line on each temperature graph; hidden until needed.
        self.ch0_t_line = self.ch0_t_graph.getPlotItem().addLine(label='{value} °C')
        self.ch0_t_line.setVisible(False)
        self.ch1_t_line = self.ch1_t_graph.getPlotItem().addLine(label='{value} °C')
        self.ch1_t_line.setVisible(False)

        self._set_up_graphs()

        self.ch0_t_connector = DataConnector(self.ch0_t_plot, max_points=self.DEFAULT_MAX_SAMPLES)
        self.ch0_i_connector = DataConnector(self.ch0_i_plot, max_points=self.DEFAULT_MAX_SAMPLES)
        self.ch1_t_connector = DataConnector(self.ch1_t_plot, max_points=self.DEFAULT_MAX_SAMPLES)
        self.ch1_i_connector = DataConnector(self.ch1_i_plot, max_points=self.DEFAULT_MAX_SAMPLES)

        self.hw_rev_data = None

        self.client = Client()
        # The initial polling period comes from the refresh spin box.
        self.client_watcher = ClientWatcher(self, self.client, self.report_refresh_spin.value())
        self.client_watcher.fan_update.connect(self.fan_update)
        self.client_watcher.report_update.connect(self.plot)
        self.client_watcher.report_update.connect(self.update_report)
        self.client_watcher.pid_update.connect(self.update_pid)
        self.client_watcher.pwm_update.connect(self.update_pwm)
        self.client_watcher.thermistor_update.connect(self.update_thermistor)
        self.client_watcher.postfilter_update.connect(self.update_postfilter)
        # "Apply" pushes the current spin-box value to the watcher.
        self.report_apply_btn.clicked.connect(
            lambda: self.client_watcher.set_update_s(self.report_refresh_spin.value())
        )

        self.report_mode_task = None

        if args.connect:
            # Command-line host/port override the widget defaults before the
            # connect button is clicked programmatically.
            if args.IP:
                self.host_set_line.setText(args.IP)
            if args.PORT:
                self.port_set_spin.setValue(int(args.PORT))
            self.connect_btn.click()

    def _set_up_connection_menu(self):
        """Build the menu attached to ``connect_btn``.

        The menu embeds a host line edit (initial text "192.168.1.26") and a
        port spin box (initial value 23, maximum 65535).
        """
        _translate = QtCore.QCoreApplication.translate

        self.connection_menu = QtWidgets.QMenu()
        self.connection_menu.setTitle('Connection Settings')

        self.host_set_line = QtWidgets.QLineEdit()
        sizePolicy = QtWidgets.QSizePolicy(QtWidgets.QSizePolicy.Policy.Fixed,
                                           QtWidgets.QSizePolicy.Policy.Expanding)
        sizePolicy.setHorizontalStretch(0)
        sizePolicy.setVerticalStretch(0)
        sizePolicy.setHeightForWidth(self.host_set_line.sizePolicy().hasHeightForWidth())
        self.host_set_line.setSizePolicy(sizePolicy)
        self.host_set_line.setMinimumSize(QtCore.QSize(160, 0))
        self.host_set_line.setMaximumSize(QtCore.QSize(160, 16777215))
        self.host_set_line.setMaxLength(15)
        self.host_set_line.setClearButtonEnabled(True)
        self.host_set_line.setObjectName("host_set_line")

        self.host_set_line.setText("192.168.1.26")
        self.host_set_line.setPlaceholderText(_translate("MainWindow", "IP for the Thermostat"))

        # Wrap the widget in an action so it can live inside the menu.
        host = QtWidgets.QWidgetAction(self.connection_menu)
        host.setDefaultWidget(self.host_set_line)
        self.connection_menu.addAction(host)
        # Also stored as an attribute on the menu (presumably to keep a
        # Python reference to the action alive).
        self.connection_menu.host = host

        self.port_set_spin = QtWidgets.QSpinBox()
        sizePolicy = QtWidgets.QSizePolicy(QtWidgets.QSizePolicy.Policy.Fixed,
                                           QtWidgets.QSizePolicy.Policy.Fixed)
        sizePolicy.setHorizontalStretch(0)
        sizePolicy.setVerticalStretch(0)
        sizePolicy.setHeightForWidth(self.port_set_spin.sizePolicy().hasHeightForWidth())
        self.port_set_spin.setSizePolicy(sizePolicy)
        self.port_set_spin.setMinimumSize(QtCore.QSize(70, 0))
        self.port_set_spin.setMaximumSize(QtCore.QSize(70, 16777215))
        self.port_set_spin.setMaximum(65535)
        self.port_set_spin.setProperty("value", 23)
        self.port_set_spin.setObjectName("port_set_spin")

        port = QtWidgets.QWidgetAction(self.connection_menu)
        port.setDefaultWidget(self.port_set_spin)
        self.connection_menu.addAction(port)
        self.connection_menu.port = port

        self.connect_btn.setMenu(self.connection_menu)

    def _set_up_thermostat_menu(self):
        _translate = QtCore.QCoreApplication.translate

        self.thermostat_menu = QtWidgets.QMenu()
        self.thermostat_menu.setTitle('Thermostat settings')

        self.fan_group = QtWidgets.QWidget()
        self.fan_group.setEnabled(False)
        self.fan_group.setMinimumSize(QtCore.QSize(40, 0))
        self.fan_group.setObjectName("fan_group")
        self.horizontalLayout_6 = QtWidgets.QHBoxLayout(self.fan_group)
        self.horizontalLayout_6.setContentsMargins(0, 0, 0, 0)
        self.horizontalLayout_6.setSpacing(0)
self.horizontalLayout_6.setObjectName("horizontalLayout_6") self.gan_layout = QtWidgets.QHBoxLayout() self.gan_layout.setSpacing(9) self.gan_layout.setObjectName("gan_layout") self.fan_lbl = QtWidgets.QLabel(parent=self.fan_group) sizePolicy = QtWidgets.QSizePolicy(QtWidgets.QSizePolicy.Policy.Fixed, QtWidgets.QSizePolicy.Policy.Expanding) sizePolicy.setHorizontalStretch(0) sizePolicy.setVerticalStretch(0) sizePolicy.setHeightForWidth(self.fan_lbl.sizePolicy().hasHeightForWidth()) self.fan_lbl.setSizePolicy(sizePolicy) self.fan_lbl.setMinimumSize(QtCore.QSize(40, 0)) self.fan_lbl.setMaximumSize(QtCore.QSize(40, 16777215)) self.fan_lbl.setBaseSize(QtCore.QSize(40, 0)) self.fan_lbl.setObjectName("fan_lbl") self.gan_layout.addWidget(self.fan_lbl) self.fan_power_slider = QtWidgets.QSlider(parent=self.fan_group) sizePolicy = QtWidgets.QSizePolicy(QtWidgets.QSizePolicy.Policy.Fixed, QtWidgets.QSizePolicy.Policy.Expanding) sizePolicy.setHorizontalStretch(0) sizePolicy.setVerticalStretch(0) sizePolicy.setHeightForWidth(self.fan_power_slider.sizePolicy().hasHeightForWidth()) self.fan_power_slider.setSizePolicy(sizePolicy) self.fan_power_slider.setMinimumSize(QtCore.QSize(200, 0)) self.fan_power_slider.setMaximumSize(QtCore.QSize(200, 16777215)) self.fan_power_slider.setBaseSize(QtCore.QSize(200, 0)) self.fan_power_slider.setMinimum(1) self.fan_power_slider.setMaximum(100) self.fan_power_slider.setOrientation(QtCore.Qt.Orientation.Horizontal) self.fan_power_slider.setObjectName("fan_power_slider") self.gan_layout.addWidget(self.fan_power_slider) self.fan_auto_box = QtWidgets.QCheckBox(parent=self.fan_group) sizePolicy = QtWidgets.QSizePolicy(QtWidgets.QSizePolicy.Policy.Fixed, QtWidgets.QSizePolicy.Policy.Expanding) sizePolicy.setHorizontalStretch(0) sizePolicy.setVerticalStretch(0) sizePolicy.setHeightForWidth(self.fan_auto_box.sizePolicy().hasHeightForWidth()) self.fan_auto_box.setSizePolicy(sizePolicy) self.fan_auto_box.setMinimumSize(QtCore.QSize(70, 0)) 
self.fan_auto_box.setMaximumSize(QtCore.QSize(70, 16777215)) self.fan_auto_box.setObjectName("fan_auto_box") self.gan_layout.addWidget(self.fan_auto_box) self.fan_pwm_warning = QtWidgets.QLabel(parent=self.fan_group) self.fan_pwm_warning.setMinimumSize(QtCore.QSize(16, 0)) self.fan_pwm_warning.setText("") self.fan_pwm_warning.setObjectName("fan_pwm_warning") self.gan_layout.addWidget(self.fan_pwm_warning) self.horizontalLayout_6.addLayout(self.gan_layout) self.fan_power_slider.valueChanged.connect(self.fan_set) self.fan_auto_box.stateChanged.connect(self.fan_auto_set) self.fan_lbl.setToolTip(_translate("MainWindow", "Adjust the fan")) self.fan_lbl.setText(_translate("MainWindow", "Fan:")) self.fan_auto_box.setText(_translate("MainWindow", "Auto")) fan = QtWidgets.QWidgetAction(self.thermostat_menu) fan.setDefaultWidget(self.fan_group) self.thermostat_menu.addAction(fan) self.thermostat_menu.fan = fan @asyncSlot(bool) async def reset_thermostat(_): await self._on_connection_changed(False) await self.client.reset() await asyncio.sleep(0.1) # Wait for the reset to start self.connect_btn.click() # Reconnect self.actionReset.triggered.connect(reset_thermostat) self.thermostat_menu.addAction(self.actionReset) @asyncSlot(bool) async def dfu_mode(_): await self._on_connection_changed(False) await self.client.dfu() # TODO: add a firmware flashing GUI? 
self.actionEnter_DFU_Mode.triggered.connect(dfu_mode) self.thermostat_menu.addAction(self.actionEnter_DFU_Mode) self.thermostat_menu.addAction(self.actionNetwork_Settings) @asyncSlot(bool) async def load(_): await self.client.load_config() self.actionLoad_all_configs.triggered.connect(load) self.thermostat_menu.addAction(self.actionLoad_all_configs) @asyncSlot(bool) async def save(_): await self.client.save_config() self.actionSave_all_configs.triggered.connect(save) self.thermostat_menu.addAction(self.actionSave_all_configs) def about_thermostat(): QtWidgets.QMessageBox.about( self, _translate("MainWindow","About Thermostat"), f"""