thermostat/pytec/tec_qt.py

252 lines
8.6 KiB
Python
Raw Normal View History

from pytec.gui.view.zero_limits_warning import ZeroLimitsWarningView
from pytec.gui.view.thermostat_ctrl_menu import ThermostatCtrlMenu
from pytec.gui.view.conn_menu import ConnMenu
from pytec.gui.view.plot_options_menu import PlotOptionsMenu
from pytec.gui.view.live_plot_view import LiveDataPlotter
from pytec.gui.view.ctrl_panel import CtrlPanel
from pytec.gui.view.info_box import InfoBox
from pytec.gui.model.pid_autotuner import PIDAutoTuner
from pytec.gui.model.thermostat import Thermostat, ThermostatConnectionState
import json
from autotune import PIDAutotuneState
from qasync import asyncSlot, asyncClose
import qasync
import asyncio
import logging
import argparse
from PyQt6 import QtWidgets, QtGui, uic
2024-08-29 12:11:49 +08:00
from PyQt6.QtCore import pyqtSlot
import importlib.resources
def get_argparser():
    """Build the command-line argument parser for the control panel.

    Recognised arguments:

    * ``--connect``: flag; the parsed value is ``True`` when given and
      ``None`` otherwise.
    * ``host`` / ``port``: two separate optional positionals, stored as
      ``HOST`` and ``PORT`` (both strings, ``None`` when omitted).
    * ``-l`` / ``--log``: logging level name, stored as ``logLevel``.
    * ``-p`` / ``--param_tree``: path of the parameter-tree JSON file;
      defaults to ``param_tree.json`` inside the ``pytec.gui.view`` package.

    Returns:
        The configured ``argparse.ArgumentParser``.
    """
    parser = argparse.ArgumentParser(description="Thermostat Control Panel")

    parser.add_argument(
        "--connect",
        default=None,
        action="store_true",
        # Host and port are separate positionals, not a single "host:port"
        # token, so the help text must say so.
        help="Automatically connect to the Thermostat at the given host and port",
    )
    parser.add_argument(
        "HOST",
        metavar="host",
        default=None,
        nargs="?",
        help="Thermostat host name or address (used with --connect)",
    )
    parser.add_argument(
        "PORT",
        metavar="port",
        default=None,
        nargs="?",
        help="Thermostat port number (used with --connect)",
    )
    parser.add_argument(
        "-l",
        "--log",
        dest="logLevel",
        choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
        help="Set the logging level",
    )
    parser.add_argument(
        "-p",
        "--param_tree",
        default=importlib.resources.files("pytec.gui.view").joinpath("param_tree.json"),
        help="Param Tree Description JSON File",
    )

    return parser
class MainWindow(QtWidgets.QMainWindow):
NUM_CHANNELS = 2
2023-08-16 17:35:13 +08:00
def __init__(self, args):
2024-08-28 17:22:59 +08:00
super().__init__()
2023-08-16 17:35:13 +08:00
ui_file_path = importlib.resources.files("pytec.gui.view").joinpath("tec_qt.ui")
uic.loadUi(ui_file_path, self)
2023-08-16 17:35:13 +08:00
self.info_box = InfoBox()
2023-08-16 17:35:13 +08:00
2024-08-28 17:20:14 +08:00
# Models
self.thermostat = Thermostat(self, self.report_refresh_spin.value())
self._connecting_task = None
2024-09-03 15:53:19 +08:00
self.thermostat.connection_state_update.connect(self._on_connection_changed)
2024-08-28 17:20:14 +08:00
self.autotuners = PIDAutoTuner(self, self.thermostat, 2)
self.autotuners.autotune_state_changed.connect(self.pid_autotune_handler)
# Handlers for disconnections
async def autotune_disconnect():
for ch in range(self.NUM_CHANNELS):
if self.autotuners.get_state(ch) != PIDAutotuneState.STATE_OFF:
await self.autotuners.stop_pid_from_running(ch)
self.thermostat.disconnect_cb = autotune_disconnect
2024-08-29 12:11:49 +08:00
@pyqtSlot()
def handle_connection_error():
self.info_box.display_info_box(
"Connection Error", "Thermostat connection lost. Is it unplugged?"
)
self.thermostat.connection_error.connect(handle_connection_error)
2023-08-16 17:35:13 +08:00
2024-08-28 17:20:14 +08:00
# Control Panel
def get_ctrl_panel_config(args):
2024-08-14 11:54:15 +08:00
with open(args.param_tree, "r", encoding="utf-8") as f:
return json.load(f)["ctrl_panel"]
self.ctrl_panel_view = CtrlPanel(
2024-08-26 13:49:56 +08:00
self.thermostat,
self.autotuners,
self.info_box,
[self.ch0_tree, self.ch1_tree],
get_ctrl_panel_config(args),
)
2024-06-28 10:02:43 +08:00
2024-08-28 17:20:14 +08:00
# Graphs
self.channel_graphs = LiveDataPlotter(
self.thermostat,
[
[getattr(self, f"ch{ch}_t_graph"), getattr(self, f"ch{ch}_i_graph")]
for ch in range(self.NUM_CHANNELS)
]
)
2024-08-28 17:20:14 +08:00
# Bottom bar menus
self.conn_menu = ConnMenu(self.thermostat, self.connect_btn)
self.connect_btn.setMenu(self.conn_menu)
2023-08-16 17:35:13 +08:00
self.thermostat_ctrl_menu = ThermostatCtrlMenu(
self.thermostat, self.info_box, self.style()
)
self.thermostat_settings.setMenu(self.thermostat_ctrl_menu)
2023-08-16 17:35:13 +08:00
2024-08-28 17:20:14 +08:00
self.plot_options_menu = PlotOptionsMenu(self.channel_graphs)
self.plot_settings.setMenu(self.plot_options_menu)
# Status line
self.zero_limits_warning = ZeroLimitsWarningView(
2024-09-02 16:12:55 +08:00
self.thermostat, self.style(), self.limits_warning
2024-08-28 17:20:14 +08:00
)
2023-08-16 17:35:13 +08:00
self.loading_spinner.hide()
2024-08-28 17:20:14 +08:00
self.report_apply_btn.clicked.connect(
lambda: self.thermostat.set_update_s(self.report_refresh_spin.value())
)
2024-08-29 11:26:11 +08:00
@asyncClose
async def closeEvent(self, _event):
try:
await self.thermostat.end_session()
self.thermostat.connection_state = ThermostatConnectionState.DISCONNECTED
2024-08-29 11:26:11 +08:00
except:
pass
2024-08-29 12:51:18 +08:00
@pyqtSlot(ThermostatConnectionState)
def _on_connection_changed(self, state):
2024-08-28 10:24:37 +08:00
self.graph_group.setEnabled(state == ThermostatConnectionState.CONNECTED)
2024-08-28 10:24:25 +08:00
self.thermostat_settings.setEnabled(
2024-08-28 10:24:37 +08:00
state == ThermostatConnectionState.CONNECTED
2024-08-28 10:24:25 +08:00
)
2024-08-29 11:47:39 +08:00
self.report_group.setEnabled(state == ThermostatConnectionState.CONNECTED)
2024-08-28 10:24:25 +08:00
2024-08-28 10:24:37 +08:00
match state:
case ThermostatConnectionState.CONNECTED:
self.connect_btn.setText("Disconnect")
2024-08-27 18:01:27 +08:00
self.status_lbl.setText(
2024-08-28 16:32:31 +08:00
"Connected to Thermostat v"
f"{self.thermostat.hw_rev['rev']['major']}."
f"{self.thermostat.hw_rev['rev']['minor']}"
2024-08-27 18:01:27 +08:00
)
case ThermostatConnectionState.CONNECTING:
self.connect_btn.setText("Stop")
2024-08-28 10:50:48 +08:00
self.status_lbl.setText("Connecting...")
case ThermostatConnectionState.DISCONNECTED:
self.connect_btn.setText("Connect")
self.status_lbl.setText("Disconnected")
self.report_box.setChecked(False)
2024-08-29 12:51:18 +08:00
@pyqtSlot(int, PIDAutotuneState)
def pid_autotune_handler(self, _ch, _state):
2024-08-29 11:26:11 +08:00
ch_tuning = []
for ch in range(self.NUM_CHANNELS):
if self.autotuners.get_state(ch) in {
PIDAutotuneState.STATE_READY,
PIDAutotuneState.STATE_RELAY_STEP_UP,
PIDAutotuneState.STATE_RELAY_STEP_DOWN,
}:
ch_tuning.append(ch)
2024-08-29 11:26:11 +08:00
if len(ch_tuning) == 0:
self.background_task_lbl.setText("Ready.")
self.loading_spinner.hide()
self.loading_spinner.stop()
else:
self.background_task_lbl.setText(
"Autotuning channel {ch}...".format(ch=ch_tuning)
)
self.loading_spinner.start()
self.loading_spinner.show()
@asyncSlot()
async def on_connect_btn_clicked(self):
match self.thermostat.connection_state:
case ThermostatConnectionState.DISCONNECTED:
self._connecting_task = asyncio.create_task(
self.thermostat.start_session(
host=self.conn_menu.host_set_line.text(),
port=self.conn_menu.port_set_spin.value(),
)
)
self.thermostat.connection_state = ThermostatConnectionState.CONNECTING
await self._connecting_task
self._connecting_task = None
self.thermostat.connection_state = ThermostatConnectionState.CONNECTED
self.thermostat.start_watching()
case ThermostatConnectionState.CONNECTING:
self._connecting_task.cancel()
await self._thermostat.end_session()
self._thermostat.connection_state = (
ThermostatConnectionState.DISCONNECTED
)
self._connecting_task = None
case ThermostatConnectionState.CONNECTED:
await self.thermostat.end_session()
self.thermostat.connection_state = (
ThermostatConnectionState.DISCONNECTED
)
2024-08-29 11:26:11 +08:00
@asyncSlot(int)
async def on_report_box_stateChanged(self, enabled):
await self.thermostat.set_report_mode(enabled)
async def coro_main():
    """Run the GUI until the Qt application is about to quit.

    Parses the command line, optionally configures logging, shows the main
    window and, when ``--connect`` was given, fills in the host/port that
    were supplied and clicks the connect button.
    """
    cli_args = get_argparser().parse_args()
    if cli_args.logLevel:
        logging.basicConfig(level=getattr(logging, cli_args.logLevel))

    # Set by Qt's aboutToQuit signal; awaited at the end of this coroutine.
    quit_requested = asyncio.Event()

    qt_app = QtWidgets.QApplication.instance()
    qt_app.aboutToQuit.connect(quit_requested.set)

    icon_path = importlib.resources.files("pytec.gui.resources").joinpath("artiq.ico")
    qt_app.setWindowIcon(QtGui.QIcon(str(icon_path)))

    window = MainWindow(cli_args)
    window.show()

    if cli_args.connect:
        # Only override the menu values that were given on the command line.
        if cli_args.HOST:
            window.conn_menu.host_set_line.setText(cli_args.HOST)
        if cli_args.PORT:
            window.conn_menu.port_set_spin.setValue(int(cli_args.PORT))
        window.connect_btn.click()

    await quit_requested.wait()
def main():
    """Entry point: run ``coro_main`` through ``qasync.run``."""
    gui_coroutine = coro_main()
    qasync.run(gui_coroutine)


# Start the GUI only when this file is executed as a script.
if __name__ == "__main__":
    main()