from PyQt6 import QtWidgets from PyQt6.QtCore import pyqtSignal, QObject, QSignalBlocker, pyqtSlot from pyqtgraph import PlotWidget from pyqtgraph.parametertree import Parameter, ParameterTree, ParameterItem, registerParameterType import pyqtgraph as pg import sys import argparse import logging import asyncio from pytec.aioclient import Client import qasync from qasync import asyncSlot, asyncClose # pyuic6 -x tec_qt.ui -o ui_tec_qt.py from ui_tec_qt import Ui_MainWindow def get_argparser(): parser = argparse.ArgumentParser(description="ARTIQ master") parser.add_argument("--connect", default=None, action="store_true", help="Automatically connect to the specified Thermostat in IP:port format") parser.add_argument('IP', metavar="ip", default=None, nargs='?') parser.add_argument('PORT', metavar="port", default=None, nargs='?') parser.add_argument("-l", "--log", dest="logLevel", choices=['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'], help="Set the logging level") return parser class ClientWatcher(QObject): fan_update = pyqtSignal(object) pwm_update = pyqtSignal(object) report_update = pyqtSignal(object) pid_update = pyqtSignal(object) def __init__(self, parent, client, update_s): self.update_s = update_s self.running = True self.client = client super().__init__(parent) async def run(self): loop = asyncio.get_running_loop() while self.running: time = loop.time() await self.update_params() await asyncio.sleep(self.update_s - (loop.time() - time)) async def update_params(self): self.fan_update.emit(await self.client.fan()) @pyqtSlot() def stop_watching(self): self.running = False @pyqtSlot() def set_update_s(self, update_s): self.update_s = update_s class MainWindow(QtWidgets.QMainWindow, Ui_MainWindow): def __init__(self, args): super().__init__() self.setupUi(self) self.connect_btn.clicked.connect(self.connect) self.fan_power_slider.valueChanged.connect(self.fan_set) self.fan_auto_box.stateChanged.connect(self.fan_auto_set) self.tec_client: Client = None self.client_watcher: ClientWatcher = None self.client_watcher_task = None if args.connect: if args.IP: self.ip_set_line.setText(args.IP) if args.PORT: self.port_set_spin.setValue(int(args.PORT)) self.connect_btn.click() def _on_connection_changed(self, result): self.graph_group.setEnabled(result) self.hw_rev_lbl.setEnabled(result) self.fan_group.setEnabled(result) self.report_group.setEnabled(result) self.ip_set_line.setEnabled(not result) self.port_set_spin.setEnabled(not result) self.status_lbl.setText("Connected" if result else "Disconnected") self.connect_btn.setText("Disconnect" if result else "Connect") if not result: self.hw_rev_lbl.setText("Thermostat vX.Y") self.fan_group.setStyleSheet("") def _hw_rev(self, hw_rev_d: dict): logging.debug(hw_rev_d) self.hw_rev_lbl.setText(f"Thermostat v{hw_rev_d['rev']['major']}.{hw_rev_d['rev']['minor']}") self.fan_group.setEnabled(hw_rev_d["settings"]["fan_available"]) if hw_rev_d["settings"]["fan_pwm_recommended"]: self.fan_group.setStyleSheet("") self.fan_group.setToolTip("") else: self.fan_group.setStyleSheet("background-color: yellow") self.fan_group.setToolTip("fan_pwm not recommended on this hardware revision") def fan_update(self, fan_settings): logging.debug(fan_settings) if fan_settings is None: return with QSignalBlocker(self.fan_power_slider): self.fan_power_slider.setValue(fan_settings["fan_pwm"]) self.fan_power_slider.setEnabled(not fan_settings["auto_mode"]) with QSignalBlocker(self.fan_auto_box): self.fan_auto_box.setChecked(fan_settings["auto_mode"]) @asyncSlot() async def fan_set(self): if self.tec_client is None or self.fan_auto_box.isChecked(): return await self.tec_client.set_param("fan", self.fan_power_slider.value()) @asyncSlot(int) async def fan_auto_set(self, enabled): if self.tec_client is None: return self.fan_power_slider.setEnabled(not enabled) if enabled: await self.tec_client.set_param("fan", "auto") else: await self.tec_client.set_param("fan", self.fan_power_slider.value()) @asyncSlot() async def connect(self): ip, port = self.ip_set_line.text(), self.port_set_spin.value() try: if self.tec_client is None: self.status_lbl.setText("Connecting...") self.tec_client = Client() await self.tec_client.connect(host=ip, port=port, timeout=30) self._on_connection_changed(True) self._hw_rev(await self.tec_client.hw_rev()) # self.fan_update(await self.tec_client.fan()) self.client_watcher = ClientWatcher(self.main_widget, self.tec_client, self.report_refresh_spin.value()) self.client_watcher.fan_update.connect(self.fan_update) self.report_apply_btn.clicked.connect( lambda: self.client_watcher.set_update_s(self.report_refresh_spin.value()) ) QtWidgets.QApplication.instance().aboutToQuit.connect(self.client_watcher.stop_watching) self.client_watcher_task = asyncio.create_task(self.client_watcher.run()) else: await self.tec_client.disconnect() self.tec_client = None self._on_connection_changed(False) self.client_watcher.stop_watching() self.client_watcher = None await self.client_watcher_task self.client_watcher_task = None except Exception as e: logging.error(f"Failed communicating to the {ip}:{port}: {e}") self._on_connection_changed(False) async def coro_main(): args = get_argparser().parse_args() if args.logLevel: logging.basicConfig(level=getattr(logging, args.logLevel)) app_quit_event = asyncio.Event() app = QtWidgets.QApplication.instance() app.aboutToQuit.connect(app_quit_event.set) main_window = MainWindow(args) main_window.show() await app_quit_event.wait() def main(): qasync.run(coro_main()) if __name__ == '__main__': main()