thermostat/pytec/tec_qt.py

227 lines
8.0 KiB
Python
Raw Normal View History

from PyQt6 import QtWidgets, QtGui
from PyQt6.QtCore import pyqtSignal, QObject, QSignalBlocker, pyqtSlot
from pyqtgraph import PlotWidget
from pyqtgraph.parametertree import Parameter, ParameterTree, ParameterItem, registerParameterType
import pyqtgraph as pg
import sys
import argparse
import logging
import asyncio
from pytec.aioclient import Client
import qasync
from qasync import asyncSlot, asyncClose
# pyuic6 -x tec_qt.ui -o ui_tec_qt.py
from ui_tec_qt import Ui_MainWindow
def get_argparser():
    """Build the command-line parser for the Thermostat control panel.

    Returns:
        argparse.ArgumentParser: parser producing a namespace with
        ``connect`` (``True`` when ``--connect`` is given, otherwise
        ``None``), ``IP`` and ``PORT`` (optional positional strings,
        ``None`` when omitted) and ``logLevel`` (one of the standard
        logging level names, or ``None``).
    """
    # The description previously read "ARTIQ master", a leftover from another
    # tool; it is shown in --help and must describe this program.
    parser = argparse.ArgumentParser(description="Thermostat Control Panel")

    parser.add_argument("--connect", default=None, action="store_true",
                        help="Automatically connect to the specified Thermostat in IP:port format")
    # Both positionals are optional (nargs='?') and are kept as strings;
    # any conversion is left to the caller.
    parser.add_argument('IP', metavar="ip", default=None, nargs='?',
                        help="IP address of the Thermostat to connect to")
    parser.add_argument('PORT', metavar="port", default=None, nargs='?',
                        help="TCP port of the Thermostat to connect to")
    parser.add_argument("-l", "--log", dest="logLevel",
                        choices=['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'],
                        help="Set the logging level")

    return parser
class ClientWatcher(QObject):
    """Poll a Thermostat client periodically and publish results as Qt signals.

    The polling loop runs as an asyncio task on the running event loop. Only
    ``fan_update`` is emitted by this class at present; the other signals are
    declared but not emitted here.
    """

    fan_update = pyqtSignal(dict)
    pwm_update = pyqtSignal(list)
    report_update = pyqtSignal(list)
    pid_update = pyqtSignal(list)

    def __init__(self, parent, client, update_s):
        """Create a watcher.

        Args:
            parent: Qt parent object passed to ``QObject``.
            client: object exposing an awaitable ``fan()`` method.
            update_s: target polling period in seconds.
        """
        # Initialise the QObject base before attaching any attributes.
        super().__init__(parent)
        self.update_s = update_s
        self.client = client
        self.watch_task = None

    async def run(self):
        """Poll forever, aiming for one update every ``update_s`` seconds."""
        loop = asyncio.get_running_loop()
        while True:
            started_at = loop.time()
            await self.update_params()
            elapsed = loop.time() - started_at
            # If an update took longer than the period there is nothing left
            # to wait for; clamp to zero so the intent is explicit.
            await asyncio.sleep(max(0.0, self.update_s - elapsed))

    async def update_params(self):
        """Fetch the current fan settings and emit them via ``fan_update``."""
        self.fan_update.emit(await self.client.fan())

    def start_watching(self):
        """Start the polling task, replacing any task that is already running."""
        # Cancel a previous task first so a repeated call cannot leave an
        # orphaned poller running in the background.
        self.stop_watching()
        self.watch_task = asyncio.create_task(self.run())

    @pyqtSlot()
    def stop_watching(self):
        """Cancel the polling task; a no-op when no task has been started."""
        if self.watch_task is not None:
            self.watch_task.cancel()
            self.watch_task = None

    @pyqtSlot(float)
    def set_update_s(self, update_s):
        """Change the polling period; takes effect from the next sleep."""
        self.update_s = update_s
class MainWindow(QtWidgets.QMainWindow, Ui_MainWindow):
    """Main window of the Thermostat GUI.

    Owns the connection to the device (``tec_client``) and the background
    poller (``client_watcher``), and keeps the widgets in sync with both.
    """

    def __init__(self, args):
        """Build the UI and optionally start connecting straight away.

        Args:
            args: parsed command-line namespace; ``connect``, ``IP`` and
                ``PORT`` are read here.
        """
        super().__init__()
        self.setupUi(self)

        self.connect_btn.clicked.connect(self.connect)
        self.fan_power_slider.valueChanged.connect(self.fan_set)
        self.fan_auto_box.stateChanged.connect(self.fan_auto_set)
        # Connected exactly once here. Wiring this up on every successful
        # connection would stack duplicate handlers on the button.
        self.report_apply_btn.clicked.connect(self._apply_update_s)

        self.fan_pwm_recommended = False

        self.tec_client: Client = None
        self.client_watcher: ClientWatcher = None
        # Task wrapping the pending Client.connect() call, if any.
        self.connect_task = None

        if args.connect:
            if args.IP:
                self.ip_set_line.setText(args.IP)
            if args.PORT:
                self.port_set_spin.setValue(int(args.PORT))
            self.connect_btn.click()

    def _on_connection_changed(self, result):
        """Enable/disable widgets and update labels for the connection state.

        Args:
            result: ``True`` when connected, ``False`` when disconnected.
        """
        self.graph_group.setEnabled(result)
        self.hw_rev_lbl.setEnabled(result)
        self.fan_group.setEnabled(result)
        self.report_group.setEnabled(result)

        self.ip_set_line.setEnabled(not result)
        self.port_set_spin.setEnabled(not result)
        self.status_lbl.setText("Connected" if result else "Disconnected")
        self.connect_btn.setText("Disconnect" if result else "Connect")
        if not result:
            # Reset everything that described the previous device.
            self.hw_rev_lbl.setText("Thermostat vX.Y")
            self.fan_pwm_warning.setPixmap(QtGui.QPixmap())
            self.fan_pwm_warning.setToolTip("")

    def _set_fan_pwm_warning(self):
        """Show a warning icon while the fan slider is not at 100, else clear it."""
        if self.fan_power_slider.value() != 100:
            pixmapi = QtWidgets.QStyle.StandardPixmap.SP_MessageBoxWarning
            icon = self.style().standardIcon(pixmapi)
            self.fan_pwm_warning.setPixmap(icon.pixmap(16, 16))
            self.fan_pwm_warning.setToolTip("Throttling the fan (not recommended on this hardware rev)")
        else:
            self.fan_pwm_warning.setPixmap(QtGui.QPixmap())
            self.fan_pwm_warning.setToolTip("")

    def _hw_rev(self, hw_rev_d: dict):
        """Apply hardware revision information to the UI.

        Args:
            hw_rev_d: dict with ``rev`` (``major``/``minor``) and ``settings``
                (``fan_available``/``fan_pwm_recommended``) entries.
        """
        logging.debug(hw_rev_d)
        self.hw_rev_lbl.setText(f"Thermostat v{hw_rev_d['rev']['major']}.{hw_rev_d['rev']['minor']}")
        self.fan_group.setEnabled(hw_rev_d["settings"]["fan_available"])
        self.fan_pwm_recommended = hw_rev_d["settings"]["fan_pwm_recommended"]

    @pyqtSlot()
    def _apply_update_s(self):
        """Push the refresh period from the spin box to the active watcher."""
        # The button may be clicked while no watcher exists; ignore it then.
        if self.client_watcher is not None:
            self.client_watcher.set_update_s(self.report_refresh_spin.value())

    @pyqtSlot(dict)
    def fan_update(self, fan_settings: dict):
        """Reflect fan settings reported by the device in the fan widgets.

        Args:
            fan_settings: dict with ``fan_pwm`` and ``auto_mode`` entries, or
                ``None`` (ignored).
        """
        logging.debug(fan_settings)
        if fan_settings is None:
            return
        # Block the widgets' signals so that mirroring device state does not
        # trigger fan_set/fan_auto_set and write back to the device.
        with QSignalBlocker(self.fan_power_slider):
            self.fan_power_slider.setValue(fan_settings["fan_pwm"] or 100)  # 0 = PWM off = full strength
            self.fan_power_slider.setEnabled(not fan_settings["auto_mode"])
        with QSignalBlocker(self.fan_auto_box):
            self.fan_auto_box.setChecked(fan_settings["auto_mode"])
        if not self.fan_pwm_recommended:
            self._set_fan_pwm_warning()

    @asyncSlot(int)
    async def fan_set(self, value):
        """Send a manual fan power value to the device.

        Does nothing when disconnected or when automatic mode is ticked.
        """
        if self.tec_client is None or self.fan_auto_box.isChecked():
            return
        await self.tec_client.set_param("fan", value)
        if not self.fan_pwm_recommended:
            self._set_fan_pwm_warning()

    @asyncSlot(int)
    async def fan_auto_set(self, enabled):
        """Switch the fan between automatic mode and the slider value.

        Args:
            enabled: check-box state; any truthy value selects automatic mode.
        """
        if self.tec_client is None:
            return
        self.fan_power_slider.setEnabled(not enabled)
        if enabled:
            await self.tec_client.set_param("fan", "auto")
            # Read back so the slider shows what the device reports.
            self.fan_update(await self.tec_client.fan())
        else:
            await self.tec_client.set_param("fan", self.fan_power_slider.value())

    @asyncClose
    async def closeEvent(self, event):
        """Stop polling and disconnect from the device when the window closes."""
        if self.client_watcher is not None:
            self.client_watcher.stop_watching()
        if self.tec_client is not None:
            await self.tec_client.disconnect()

    @asyncSlot()
    async def connect(self):
        """Handle the connect button, whose meaning depends on current state.

        * No client: start connecting, then query the device and start polling.
        * Client but no watcher (connection still pending): cancel the attempt.
        * Client and watcher: stop polling and disconnect.

        Communication errors are logged and leave the window disconnected.
        """
        ip, port = self.ip_set_line.text(), self.port_set_spin.value()
        try:
            if self.tec_client is None:
                self.status_lbl.setText("Connecting...")
                self.ip_set_line.setEnabled(False)
                self.port_set_spin.setEnabled(False)
                self.connect_btn.setText("Stop")

                self.tec_client = Client()
                self.connect_task = asyncio.create_task(
                    self.tec_client.connect(host=ip, port=port, timeout=30)
                )
                try:
                    await self.connect_task
                except asyncio.exceptions.CancelledError:
                    # Cancelled via the "Stop" branch below, which has already
                    # reset the state and the UI.
                    return
                self.connect_btn.setEnabled(True)
                self._on_connection_changed(True)
                self._hw_rev(await self.tec_client.hw_rev())
                self.fan_update(await self.tec_client.fan())

                self.client_watcher = ClientWatcher(self, self.tec_client, self.report_refresh_spin.value())
                self.client_watcher.fan_update.connect(self.fan_update)
                self.client_watcher.start_watching()
            else:
                if self.client_watcher is None:
                    # A connection attempt is still pending: abort it.
                    self.connect_task.cancel()
                    self.tec_client = None
                    # Fixed: this used to call the nonexistent
                    # on_connection_changed and raised AttributeError.
                    self._on_connection_changed(False)
                    return
                else:
                    self.client_watcher.stop_watching()
                    self.client_watcher = None
                await self.tec_client.disconnect()
                self.tec_client = None
                self._on_connection_changed(False)
        # asyncio.TimeoutError is a distinct class before Python 3.11.
        except (OSError, TimeoutError, asyncio.TimeoutError) as e:
            logging.error(f"Failed communicating to {ip}:{port}: {e}")
            if self.client_watcher is not None:
                self.client_watcher.stop_watching()
                self.client_watcher = None
            # Forget the failed client so the next click starts a fresh
            # connection instead of taking the disconnect/cancel branch.
            failed_client, self.tec_client = self.tec_client, None
            self._on_connection_changed(False)
            if failed_client is not None:
                await failed_client.disconnect()
async def coro_main():
    """Parse the command line, show the main window and wait for app quit."""
    cli_args = get_argparser().parse_args()
    requested_level = cli_args.logLevel
    if requested_level:
        logging.basicConfig(level=getattr(logging, requested_level))

    # Set when Qt announces that the application is about to quit.
    quit_requested = asyncio.Event()

    # NOTE(review): relies on a QApplication already existing at this point,
    # presumably created by qasync.run — confirm.
    QtWidgets.QApplication.instance().aboutToQuit.connect(quit_requested.set)

    window = MainWindow(cli_args)
    window.show()

    # Keep this coroutine (and the window reference) alive until quit.
    await quit_requested.wait()
def main():
    """Entry point: run ``coro_main`` through ``qasync.run``."""
    entry_coroutine = coro_main()
    qasync.run(entry_coroutine)


# Launch the GUI only when this file is executed as a script.
if __name__ == "__main__":
    main()