from pytec.gui.view.zero_limits_warning import ZeroLimitsWarningView from pytec.gui.view.thermostat_ctrl_menu import ThermostatCtrlMenu from pytec.gui.view.conn_menu import ConnMenu from pytec.gui.view.plot_options_menu import PlotOptionsMenu from pytec.gui.view.live_plot_view import LiveDataPlotter from pytec.gui.view.ctrl_panel import CtrlPanel from pytec.gui.view.info_box import InfoBox from pytec.gui.model.pid_autotuner import PIDAutoTuner from pytec.gui.model.thermostat import Thermostat, ThermostatConnectionState import json from autotune import PIDAutotuneState from qasync import asyncSlot, asyncClose import qasync import asyncio import logging import argparse from PyQt6 import QtWidgets, QtGui, uic from PyQt6.QtCore import pyqtSlot import importlib.resources def get_argparser(): parser = argparse.ArgumentParser(description="Thermostat Control Panel") parser.add_argument( "--connect", default=None, action="store_true", help="Automatically connect to the specified Thermostat in host:port format", ) parser.add_argument("HOST", metavar="host", default=None, nargs="?") parser.add_argument("PORT", metavar="port", default=None, nargs="?") parser.add_argument( "-l", "--log", dest="logLevel", choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"], help="Set the logging level", ) parser.add_argument( "-p", "--param_tree", default=importlib.resources.files("pytec.gui.view").joinpath("param_tree.json"), help="Param Tree Description JSON File", ) return parser class MainWindow(QtWidgets.QMainWindow): NUM_CHANNELS = 2 def __init__(self, args): super().__init__() ui_file_path = importlib.resources.files("pytec.gui.view").joinpath("tec_qt.ui") uic.loadUi(ui_file_path, self) self._info_box = InfoBox() # Models self._thermostat = Thermostat(self, self.report_refresh_spin.value()) self._connecting_task = None self._thermostat.connection_state_update.connect( self._on_connection_state_changed ) self._autotuners = PIDAutoTuner(self, self._thermostat, 2) self._autotuners.autotune_state_changed.connect( self._on_pid_autotune_state_changed ) # Handlers for disconnections async def autotune_disconnect(): for ch in range(self.NUM_CHANNELS): if self._autotuners.get_state(ch) != PIDAutotuneState.STATE_OFF: await self._autotuners.stop_pid_from_running(ch) self._thermostat.disconnect_cb = autotune_disconnect @pyqtSlot() def handle_connection_error(): self._info_box.display_info_box( "Connection Error", "Thermostat connection lost. Is it unplugged?" ) self._thermostat.connection_error.connect(handle_connection_error) # Control Panel def get_ctrl_panel_config(args): with open(args.param_tree, "r", encoding="utf-8") as f: return json.load(f)["ctrl_panel"] self._ctrl_panel_view = CtrlPanel( self._thermostat, self._autotuners, self._info_box, [self.ch0_tree, self.ch1_tree], get_ctrl_panel_config(args), ) # Graphs self._channel_graphs = LiveDataPlotter( self._thermostat, [ [getattr(self, f"ch{ch}_t_graph"), getattr(self, f"ch{ch}_i_graph")] for ch in range(self.NUM_CHANNELS) ] ) # Bottom bar menus self.conn_menu = ConnMenu(self._thermostat, self.connect_btn) self.connect_btn.setMenu(self.conn_menu) self._thermostat_ctrl_menu = ThermostatCtrlMenu( self._thermostat, self._info_box, self.style() ) self.thermostat_settings.setMenu(self._thermostat_ctrl_menu) self._plot_options_menu = PlotOptionsMenu(self._channel_graphs) self.plot_settings.setMenu(self._plot_options_menu) # Status line self._zero_limits_warning = ZeroLimitsWarningView( self._thermostat, self.style(), self.limits_warning ) self.loading_spinner.hide() self.report_apply_btn.clicked.connect( lambda: self._thermostat.set_update_s(self.report_refresh_spin.value()) ) @asyncClose async def closeEvent(self, _event): try: await self._thermostat.end_session() self._thermostat.connection_state = ThermostatConnectionState.DISCONNECTED except: pass @pyqtSlot(ThermostatConnectionState) def _on_connection_state_changed(self, state): self.graph_group.setEnabled(state == ThermostatConnectionState.CONNECTED) self.thermostat_settings.setEnabled( state == ThermostatConnectionState.CONNECTED ) self.report_group.setEnabled(state == ThermostatConnectionState.CONNECTED) match state: case ThermostatConnectionState.CONNECTED: self.connect_btn.setText("Disconnect") self.status_lbl.setText( "Connected to Thermostat v" f"{self._thermostat.hw_rev['rev']['major']}." f"{self._thermostat.hw_rev['rev']['minor']}" ) case ThermostatConnectionState.CONNECTING: self.connect_btn.setText("Stop") self.status_lbl.setText("Connecting...") case ThermostatConnectionState.DISCONNECTED: self.connect_btn.setText("Connect") self.status_lbl.setText("Disconnected") @pyqtSlot(int, PIDAutotuneState) def _on_pid_autotune_state_changed(self, _ch, _state): ch_tuning = [] for ch in range(self.NUM_CHANNELS): if self._autotuners.get_state(ch) in { PIDAutotuneState.STATE_READY, PIDAutotuneState.STATE_RELAY_STEP_UP, PIDAutotuneState.STATE_RELAY_STEP_DOWN, }: ch_tuning.append(ch) if len(ch_tuning) == 0: self.background_task_lbl.setText("Ready.") self.loading_spinner.hide() self.loading_spinner.stop() else: self.background_task_lbl.setText( "Autotuning channel {ch}...".format(ch=ch_tuning) ) self.loading_spinner.start() self.loading_spinner.show() @asyncSlot() async def on_connect_btn_clicked(self): match self._thermostat.connection_state: case ThermostatConnectionState.DISCONNECTED: self._connecting_task = asyncio.create_task( self._thermostat.start_session( host=self.conn_menu.host_set_line.text(), port=self.conn_menu.port_set_spin.value(), ) ) self._thermostat.connection_state = ThermostatConnectionState.CONNECTING await self._connecting_task self._connecting_task = None self._thermostat.connection_state = ThermostatConnectionState.CONNECTED self._thermostat.start_watching() case ThermostatConnectionState.CONNECTING: self._connecting_task.cancel() self._connecting_task = None await self._thermostat.end_session() self._thermostat.connection_state = ( ThermostatConnectionState.DISCONNECTED ) case ThermostatConnectionState.CONNECTED: await self._thermostat.end_session() self._thermostat.connection_state = ( ThermostatConnectionState.DISCONNECTED ) async def coro_main(): args = get_argparser().parse_args() if args.logLevel: logging.basicConfig(level=getattr(logging, args.logLevel)) app_quit_event = asyncio.Event() app = QtWidgets.QApplication.instance() app.aboutToQuit.connect(app_quit_event.set) app.setWindowIcon( QtGui.QIcon( str(importlib.resources.files("pytec.gui.resources").joinpath("artiq.ico")) ) ) main_window = MainWindow(args) main_window.show() if args.connect: if args.HOST: main_window.conn_menu.host_set_line.setText(args.HOST) if args.PORT: main_window.conn_menu.port_set_spin.setValue(int(args.PORT)) main_window.connect_btn.click() await app_quit_event.wait() def main(): qasync.run(coro_main()) if __name__ == "__main__": main()