from pytec.gui.view.zero_limits_warning import ZeroLimitsWarningView from pytec.gui.view.net_settings_input_diag import NetSettingsInputDiag from pytec.gui.view.thermostat_ctrl_menu import ThermostatCtrlMenu from pytec.gui.view.conn_menu import ConnMenu from pytec.gui.view.plot_options_menu import PlotOptionsMenu from pytec.gui.view.live_plot_view import LiveDataPlotter from pytec.gui.view.ctrl_panel import CtrlPanel from pytec.gui.view.info_box import InfoBox from pytec.gui.model.pid_autotuner import PIDAutoTuner from pytec.gui.model.thermostat import Thermostat, ThermostatConnectionState import json from autotune import PIDAutotuneState from qasync import asyncSlot, asyncClose import qasync import asyncio import logging import argparse from PyQt6 import QtWidgets, QtGui, uic from PyQt6.QtCore import QSignalBlocker, pyqtSlot import pyqtgraph as pg from functools import partial import importlib.resources def get_argparser(): parser = argparse.ArgumentParser(description="Thermostat Control Panel") parser.add_argument( "--connect", default=None, action="store_true", help="Automatically connect to the specified Thermostat in host:port format", ) parser.add_argument("HOST", metavar="host", default=None, nargs="?") parser.add_argument("PORT", metavar="port", default=None, nargs="?") parser.add_argument( "-l", "--log", dest="logLevel", choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"], help="Set the logging level", ) parser.add_argument( "-p", "--param_tree", default=importlib.resources.files("pytec.gui.view").joinpath("param_tree.json"), help="Param Tree Description JSON File", ) return parser class MainWindow(QtWidgets.QMainWindow): NUM_CHANNELS = 2 def __init__(self, args): super(MainWindow, self).__init__() ui_file_path = importlib.resources.files("pytec.gui.view").joinpath("tec_qt.ui") uic.loadUi(ui_file_path, self) self.hw_rev_data = None self.info_box = InfoBox() self.thermostat = Thermostat( self, self.report_refresh_spin.value() ) self._connecting_task = None def handle_connection_error(): self.info_box.display_info_box( "Connection Error", "Thermostat connection lost. Is it unplugged?" ) self.bail() self.thermostat.connection_error.connect(handle_connection_error) self.thermostat.connection_error.connect(self.thermostat.timed_out) self.thermostat.connection_error.connect(self.bail) self.autotuners = PIDAutoTuner(self, self.thermostat, 2) def get_ctrl_panel_config(args): with open(args.param_tree, "r", encoding="utf-8") as f: return json.load(f)["ctrl_panel"] param_tree_sigActivated_handles = [ [ [["Save to flash"], partial(self.thermostat.save_cfg, ch)], [["Load from flash"], partial(self.thermostat.load_cfg, ch)], [ ["PID Config", "PID Auto Tune", "Run"], partial(self.pid_auto_tune_request, ch), ], ] for ch in range(self.NUM_CHANNELS) ] self.thermostat.info_box_trigger.connect(self.info_box.display_info_box) self.ctrl_panel_view = CtrlPanel( [self.ch0_tree, self.ch1_tree], get_ctrl_panel_config(args), self.send_command, param_tree_sigActivated_handles, ) self.zero_limits_warning = ZeroLimitsWarningView( self.style(), self.limits_warning ) self.ctrl_panel_view.set_zero_limits_warning_sig.connect( self.zero_limits_warning.set_limits_warning ) self.thermostat.fan_update.connect(self.fan_update) self.thermostat.report_update.connect(self.ctrl_panel_view.update_report) self.thermostat.report_update.connect(self.autotuners.tick) self.thermostat.report_update.connect(self.pid_autotune_handler) self.thermostat.pid_update.connect(self.ctrl_panel_view.update_pid) self.thermostat.pwm_update.connect(self.ctrl_panel_view.update_pwm) self.thermostat.thermistor_update.connect( self.ctrl_panel_view.update_thermistor ) self.thermostat.postfilter_update.connect( self.ctrl_panel_view.update_postfilter ) self.thermostat.interval_update.connect( self.autotuners.update_sampling_interval ) self.report_apply_btn.clicked.connect( lambda: self.thermostat.set_update_s(self.report_refresh_spin.value()) ) self.channel_graphs = LiveDataPlotter( [ [getattr(self, f"ch{ch}_t_graph"), getattr(self, f"ch{ch}_i_graph")] for ch in range(self.NUM_CHANNELS) ] ) self.thermostat.report_update.connect(self.channel_graphs.update_report) self.thermostat.pid_update.connect(self.channel_graphs.update_pid) self.plot_options_menu = PlotOptionsMenu() self.plot_options_menu.clear.triggered.connect(self.clear_graphs) self.plot_options_menu.samples_spinbox.valueChanged.connect( self.channel_graphs.set_max_samples ) self.plot_settings.setMenu(self.plot_options_menu) self.conn_menu = ConnMenu() self.connect_btn.setMenu(self.conn_menu) self.thermostat_ctrl_menu = ThermostatCtrlMenu(self.style()) self.thermostat_ctrl_menu.fan_set_act.connect(self.fan_set_request) self.thermostat_ctrl_menu.fan_auto_set_act.connect(self.fan_auto_set_request) self.thermostat_ctrl_menu.reset_act.connect(self.reset_request) self.thermostat_ctrl_menu.dfu_act.connect(self.dfu_request) self.thermostat_ctrl_menu.save_cfg_act.connect(self.save_cfg_request) self.thermostat_ctrl_menu.load_cfg_act.connect(self.load_cfg_request) self.thermostat_ctrl_menu.net_cfg_act.connect(self.net_settings_request) self.thermostat.hw_rev_update.connect(self.thermostat_ctrl_menu.hw_rev) self.thermostat_settings.setMenu(self.thermostat_ctrl_menu) self.loading_spinner.hide() if args.connect: if args.IP: self.host_set_line.setText(args.IP) if args.PORT: self.port_set_spin.setValue(int(args.PORT)) self.connect_btn.click() def clear_graphs(self): self.channel_graphs.clear_graphs() async def _on_connection_changed(self, result): match result: case ThermostatConnectionState.CONNECTED: self.graph_group.setEnabled(True) self.report_group.setEnabled(True) self.thermostat_settings.setEnabled(True) self.conn_menu.host_set_line.setEnabled(False) self.conn_menu.port_set_spin.setEnabled(False) self.connect_btn.setText("Disconnect") self._status(self.hw_rev_data) case ThermostatConnectionState.CONNECTING: self.status_lbl.setText("Connecting...") self.connect_btn.setText("Stop") self.conn_menu.host_set_line.setEnabled(False) self.conn_menu.port_set_spin.setEnabled(False) case ThermostatConnectionState.DISCONNECTED: self.graph_group.setEnabled(False) self.report_group.setEnabled(False) self.thermostat_settings.setEnabled(False) self.conn_menu.host_set_line.setEnabled(True) self.conn_menu.port_set_spin.setEnabled(True) self.connect_btn.setText("Connect") self.status_lbl.setText("Disconnected") self.background_task_lbl.setText("Ready.") self.loading_spinner.hide() self.loading_spinner.stop() self.thermostat_ctrl_menu.fan_pwm_warning.setPixmap(QtGui.QPixmap()) self.thermostat_ctrl_menu.fan_pwm_warning.setToolTip("") self.clear_graphs() self.report_box.setChecked(False) for ch in range(self.NUM_CHANNELS): if self.autotuners.get_state(ch) != PIDAutotuneState.STATE_OFF: if self.thermostat.connection_errored: # Don't send any commands, just reset local state self.autotuners.autotuners[ch].setOff() else: await self.autotuners.stop_pid_from_running(ch) def _status(self, hw_rev_d: dict): self.status_lbl.setText( f"Connected to Thermostat v{hw_rev_d['rev']['major']}.{hw_rev_d['rev']['minor']}" ) @pyqtSlot("QVariantMap") def fan_update(self, fan_settings): logging.debug(fan_settings) if fan_settings is None: return with QSignalBlocker(self.thermostat_ctrl_menu.fan_power_slider): self.thermostat_ctrl_menu.fan_power_slider.setValue( fan_settings["fan_pwm"] or 100 # 0 = PWM off = full strength ) with QSignalBlocker(self.thermostat_ctrl_menu.fan_auto_box): self.thermostat_ctrl_menu.fan_auto_box.setChecked(fan_settings["auto_mode"]) if not self.hw_rev_data["settings"]["fan_pwm_recommended"]: self.thermostat_ctrl_menu.set_fan_pwm_warning() @asyncSlot(int) async def on_report_box_stateChanged(self, enabled): await self.thermostat.set_report_mode(enabled) @asyncClose async def closeEvent(self, _event): try: await self.bail() except: pass @asyncSlot() async def on_connect_btn_clicked(self): if (self._connecting_task is None) and (not self.thermostat.connected()): await self._on_connection_changed(ThermostatConnectionState.CONNECTING) self._connecting_task = asyncio.create_task( self.thermostat.start_session( host=self.conn_menu.host_set_line.text(), port=self.conn_menu.port_set_spin.value(), ) ) try: self.hw_rev_data = await self._connecting_task except (OSError, asyncio.CancelledError) as exc: await self.bail() if isinstance(exc, asyncio.CancelledError): return raise finally: self._connecting_task = None await self._on_connection_changed(ThermostatConnectionState.CONNECTED) elif self._connecting_task is not None: self._connecting_task.cancel() else: await self.bail() @asyncSlot() async def bail(self): await self._on_connection_changed(ThermostatConnectionState.DISCONNECTED) await self.thermostat.end_session() @asyncSlot(object, object) async def send_command(self, param, changes): """Translates parameter tree changes into thermostat set_param calls""" ch = param.channel for inner_param, change, data in changes: if change == "value": if inner_param.opts.get("param", None) is not None: if inner_param.opts.get("suffix", None) == "mA": data /= 1000 # Given in mA thermostat_param = inner_param.opts["param"] if thermostat_param[1] == "ch": thermostat_param[1] = ch if inner_param.name() == "Postfilter Rate" and data is None: set_param_args = (*thermostat_param[:2], "off") else: set_param_args = (*thermostat_param, data) param.child(*param.childPath(inner_param)).setOpts(lock=True) await self.thermostat.set_param(*set_param_args) param.child(*param.childPath(inner_param)).setOpts(lock=False) if inner_param.opts.get("pid_autotune", None) is not None: auto_tuner_param = inner_param.opts["pid_autotune"][0] if inner_param.opts["pid_autotune"][1] != "ch": ch = inner_param.opts["pid_autotune"][1] self.autotuners.set_params(auto_tuner_param, ch, data) if inner_param.opts.get("activaters", None) is not None: activater = inner_param.opts["activaters"][ inner_param.opts["limits"].index(data) ] if activater is not None: if activater[1] == "ch": activater[1] = ch await self.thermostat.set_param(*activater) @asyncSlot() async def pid_auto_tune_request(self, ch=0): match self.autotuners.get_state(ch): case PIDAutotuneState.STATE_OFF | PIDAutotuneState.STATE_FAILED: self.autotuners.load_params_and_set_ready(ch) case ( PIDAutotuneState.STATE_READY | PIDAutotuneState.STATE_RELAY_STEP_UP | PIDAutotuneState.STATE_RELAY_STEP_DOWN ): await self.autotuners.stop_pid_from_running(ch) # To Update the UI elements self.pid_autotune_handler([]) @asyncSlot(list) async def pid_autotune_handler(self, _): ch_tuning = [] for ch in range(self.NUM_CHANNELS): match self.autotuners.get_state(ch): case PIDAutotuneState.STATE_OFF: self.ctrl_panel_view.change_params_title( ch, ("PID Config", "PID Auto Tune", "Run"), "Run" ) case ( PIDAutotuneState.STATE_READY | PIDAutotuneState.STATE_RELAY_STEP_UP | PIDAutotuneState.STATE_RELAY_STEP_DOWN ): self.ctrl_panel_view.change_params_title( ch, ("PID Config", "PID Auto Tune", "Run"), "Stop" ) ch_tuning.append(ch) case PIDAutotuneState.STATE_SUCCEEDED: self.info_box.display_info_box( "PID Autotune Success", f"Channel {ch} PID Config has been loaded to Thermostat. Regulating temperature.", ) self.info_box.show() case PIDAutotuneState.STATE_FAILED: self.info_box.display_info_box( "PID Autotune Failed", f"Channel {ch} PID Autotune has failed.", ) self.info_box.show() if len(ch_tuning) == 0: self.background_task_lbl.setText("Ready.") self.loading_spinner.hide() self.loading_spinner.stop() else: self.background_task_lbl.setText( "Autotuning channel {ch}...".format(ch=ch_tuning) ) self.loading_spinner.start() self.loading_spinner.show() @asyncSlot(int) async def fan_set_request(self, value): assert self.thermostat.connected() if self.thermostat_ctrl_menu.fan_auto_box.isChecked(): with QSignalBlocker(self.thermostat_ctrl_menu.fan_auto_box): self.thermostat_ctrl_menu.fan_auto_box.setChecked(False) await self.thermostat.set_fan(value) if not self.hw_rev_data["settings"]["fan_pwm_recommended"]: self.thermostat_ctrl_menu.set_fan_pwm_warning() @asyncSlot(int) async def fan_auto_set_request(self, enabled): assert self.thermostat.connected() if enabled: await self.thermostat.set_fan("auto") self.fan_update(await self.thermostat.get_fan()) else: await self.thermostat.set_fan( self.thermostat_ctrl_menu.fan_power_slider.value() ) @asyncSlot(int) async def save_cfg_request(self, ch): assert self.thermostat.connected() await self.thermostat.save_cfg(str(ch)) @asyncSlot(int) async def load_cfg_request(self, ch): assert self.thermostat.connected() await self.thermostat.load_cfg(str(ch)) @asyncSlot(bool) async def dfu_request(self, _): assert self.thermostat.connected() await self._on_connection_changed(ThermostatConnectionState.DISCONNECTED) await self.thermostat.dfu() @asyncSlot(bool) async def reset_request(self, _): assert self.thermostat.connected() await self._on_connection_changed(ThermostatConnectionState.DISCONNECTED) await self.thermostat.reset() await asyncio.sleep(0.1) # Wait for the reset to start self.connect_btn.click() # Reconnect @asyncSlot(bool) async def net_settings_request(self, _): assert self.thermostat.connected() ipv4 = await self.thermostat.get_ipv4() self.net_settings_input_diag = NetSettingsInputDiag(ipv4["addr"]) self.net_settings_input_diag.set_ipv4_act.connect(self.set_net_settings_request) @asyncSlot(str) async def set_net_settings_request(self, ipv4_settings): assert self.thermostat.connected() await self.thermostat.set_ipv4(ipv4_settings) await self.bail() async def coro_main(): args = get_argparser().parse_args() if args.logLevel: logging.basicConfig(level=getattr(logging, args.logLevel)) app_quit_event = asyncio.Event() app = QtWidgets.QApplication.instance() app.aboutToQuit.connect(app_quit_event.set) app.setWindowIcon( QtGui.QIcon( str(importlib.resources.files("pytec.gui.resources").joinpath("artiq.ico")) ) ) main_window = MainWindow(args) main_window.show() await app_quit_event.wait() def main(): qasync.run(coro_main()) if __name__ == "__main__": main()