# thermostat/pytec/tec_qt.py
# (repository file view: 368 lines, 13 KiB, Python)

from pytec.gui.view.zero_limits_warning import ZeroLimitsWarningView
from pytec.gui.view.thermostat_ctrl_menu import ThermostatCtrlMenu
from pytec.gui.view.conn_menu import ConnMenu
from pytec.gui.view.plot_options_menu import PlotOptionsMenu
from pytec.gui.view.live_plot_view import LiveDataPlotter
from pytec.gui.view.ctrl_panel import CtrlPanel
from pytec.gui.view.info_box import InfoBox
from pytec.gui.model.pid_autotuner import PIDAutoTuner
from pytec.gui.model.thermostat import Thermostat, ThermostatConnectionState
import json
from autotune import PIDAutotuneState
from qasync import asyncSlot, asyncClose
import qasync
import asyncio
import logging
import argparse
from PyQt6 import QtWidgets, QtGui, uic
from PyQt6.QtCore import QSignalBlocker, pyqtSlot
import pyqtgraph as pg
from functools import partial
import importlib.resources
def get_argparser():
    """Build the command-line parser for the Thermostat control panel.

    Returns:
        argparse.ArgumentParser: parser whose namespace carries
        ``connect`` (``True`` when ``--connect`` is given, else ``None``),
        ``HOST`` and ``PORT`` (optional positionals, ``None`` when omitted),
        ``logLevel`` (one of the standard logging level names, or ``None``)
        and ``param_tree`` (path of the parameter-tree JSON description).
    """
    parser = argparse.ArgumentParser(description="Thermostat Control Panel")

    # host and port are two separate optional positionals, so the help text
    # must not advertise a single "host:port" argument.
    parser.add_argument(
        "--connect",
        default=None,
        action="store_true",
        help="Automatically connect to the Thermostat at the given host and port",
    )
    parser.add_argument(
        "HOST",
        metavar="host",
        default=None,
        nargs="?",
        help="Host of the Thermostat to connect to",
    )
    parser.add_argument(
        "PORT",
        metavar="port",
        default=None,
        nargs="?",
        help="Port of the Thermostat to connect to",
    )
    parser.add_argument(
        "-l",
        "--log",
        dest="logLevel",
        choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
        help="Set the logging level",
    )
    parser.add_argument(
        "-p",
        "--param_tree",
        # Default to the description file shipped inside the view package.
        default=importlib.resources.files("pytec.gui.view").joinpath("param_tree.json"),
        help="Param Tree Description JSON File",
    )

    return parser
class MainWindow(QtWidgets.QMainWindow):
NUM_CHANNELS = 2
2023-08-16 17:35:13 +08:00
def __init__(self, args):
2024-08-28 17:22:59 +08:00
super().__init__()
2023-08-16 17:35:13 +08:00
ui_file_path = importlib.resources.files("pytec.gui.view").joinpath("tec_qt.ui")
uic.loadUi(ui_file_path, self)
2023-08-16 17:35:13 +08:00
self.hw_rev_data = None
self.info_box = InfoBox()
2023-08-16 17:35:13 +08:00
self.thermostat = Thermostat(self, self.report_refresh_spin.value())
self._connecting_task = None
def handle_connection_error():
self.info_box.display_info_box(
"Connection Error", "Thermostat connection lost. Is it unplugged?"
)
self.thermostat.connection_error.connect(handle_connection_error)
2023-08-16 17:35:13 +08:00
self.thermostat.connection_error.connect(self.thermostat.timed_out)
self.thermostat.connection_error.connect(self.thermostat.end_session)
self.thermostat.connection_state_changed.connect(self._on_connection_changed)
self.autotuners = PIDAutoTuner(self, self.thermostat, 2)
def get_ctrl_panel_config(args):
2024-08-14 11:54:15 +08:00
with open(args.param_tree, "r", encoding="utf-8") as f:
return json.load(f)["ctrl_panel"]
param_tree_sigActivated_handles = [
[
[
["PID Config", "PID Auto Tune", "Run"],
partial(self.pid_auto_tune_request, ch),
],
]
for ch in range(self.NUM_CHANNELS)
]
self.thermostat.info_box_trigger.connect(self.info_box.display_info_box)
self.ctrl_panel_view = CtrlPanel(
2024-08-26 13:49:56 +08:00
self.thermostat,
self.autotuners,
[self.ch0_tree, self.ch1_tree],
get_ctrl_panel_config(args),
param_tree_sigActivated_handles,
)
2024-06-28 10:02:43 +08:00
self.zero_limits_warning = ZeroLimitsWarningView(
self.style(), self.limits_warning
)
self.ctrl_panel_view.set_zero_limits_warning_sig.connect(
self.zero_limits_warning.set_limits_warning
)
self.thermostat.fan_update.connect(self.fan_update)
self.thermostat.report_update.connect(self.autotuners.tick)
self.thermostat.report_update.connect(self.pid_autotune_handler)
self.thermostat.interval_update.connect(
self.autotuners.update_sampling_interval
)
self.report_apply_btn.clicked.connect(
lambda: self.thermostat.set_update_s(self.report_refresh_spin.value())
)
self.channel_graphs = LiveDataPlotter(
[
[getattr(self, f"ch{ch}_t_graph"), getattr(self, f"ch{ch}_i_graph")]
for ch in range(self.NUM_CHANNELS)
]
)
self.thermostat.report_update.connect(self.channel_graphs.update_report)
self.thermostat.pid_update.connect(self.channel_graphs.update_pid)
self.plot_options_menu = PlotOptionsMenu()
self.plot_options_menu.clear.triggered.connect(self.clear_graphs)
self.plot_options_menu.samples_spinbox.valueChanged.connect(
self.channel_graphs.set_max_samples
)
self.plot_settings.setMenu(self.plot_options_menu)
self.conn_menu = ConnMenu()
self.connect_btn.setMenu(self.conn_menu)
2023-08-16 17:35:13 +08:00
2024-08-26 18:01:01 +08:00
self.thermostat_ctrl_menu = ThermostatCtrlMenu(self.thermostat, self.style())
self.thermostat_ctrl_menu.reset_act.connect(self.reset_request)
self.thermostat_ctrl_menu.save_cfg_act.connect(self.save_cfg_request)
self.thermostat_ctrl_menu.load_cfg_act.connect(self.load_cfg_request)
2023-08-16 17:35:13 +08:00
self.thermostat.hw_rev_update.connect(self.thermostat_ctrl_menu.hw_rev)
self.thermostat_settings.setMenu(self.thermostat_ctrl_menu)
2023-08-16 17:35:13 +08:00
self.loading_spinner.hide()
def clear_graphs(self):
self.channel_graphs.clear_graphs()
@asyncSlot(ThermostatConnectionState)
async def _on_connection_changed(self, result):
match result:
case ThermostatConnectionState.CONNECTED:
self.graph_group.setEnabled(True)
self.report_group.setEnabled(True)
self.thermostat_settings.setEnabled(True)
self.conn_menu.host_set_line.setEnabled(False)
self.conn_menu.port_set_spin.setEnabled(False)
self.connect_btn.setText("Disconnect")
self._status(self.hw_rev_data)
case ThermostatConnectionState.CONNECTING:
self.status_lbl.setText("Connecting...")
self.connect_btn.setText("Stop")
self.conn_menu.host_set_line.setEnabled(False)
self.conn_menu.port_set_spin.setEnabled(False)
case ThermostatConnectionState.DISCONNECTED:
self.graph_group.setEnabled(False)
self.report_group.setEnabled(False)
self.thermostat_settings.setEnabled(False)
self.conn_menu.host_set_line.setEnabled(True)
self.conn_menu.port_set_spin.setEnabled(True)
self.connect_btn.setText("Connect")
self.status_lbl.setText("Disconnected")
self.background_task_lbl.setText("Ready.")
self.loading_spinner.hide()
self.loading_spinner.stop()
self.thermostat_ctrl_menu.fan_pwm_warning.setPixmap(QtGui.QPixmap())
self.thermostat_ctrl_menu.fan_pwm_warning.setToolTip("")
self.clear_graphs()
self.report_box.setChecked(False)
for ch in range(self.NUM_CHANNELS):
if self.autotuners.get_state(ch) != PIDAutotuneState.STATE_OFF:
if self.thermostat.connection_errored:
# Don't send any commands, just reset local state
self.autotuners.autotuners[ch].setOff()
else:
await self.autotuners.stop_pid_from_running(ch)
def _status(self, hw_rev_d: dict):
self.status_lbl.setText(
f"Connected to Thermostat v{hw_rev_d['rev']['major']}.{hw_rev_d['rev']['minor']}"
)
@pyqtSlot("QVariantMap")
def fan_update(self, fan_settings):
logging.debug(fan_settings)
if fan_settings is None:
return
with QSignalBlocker(self.thermostat_ctrl_menu.fan_power_slider):
self.thermostat_ctrl_menu.fan_power_slider.setValue(
2024-06-20 12:07:19 +08:00
fan_settings["fan_pwm"] or 100 # 0 = PWM off = full strength
)
with QSignalBlocker(self.thermostat_ctrl_menu.fan_auto_box):
self.thermostat_ctrl_menu.fan_auto_box.setChecked(fan_settings["auto_mode"])
2023-08-16 17:35:13 +08:00
if not self.hw_rev_data["settings"]["fan_pwm_recommended"]:
self.thermostat_ctrl_menu.set_fan_pwm_warning()
2023-08-16 17:35:13 +08:00
@asyncSlot(int)
async def on_report_box_stateChanged(self, enabled):
await self.thermostat.set_report_mode(enabled)
@asyncClose
2024-08-14 11:54:21 +08:00
async def closeEvent(self, _event):
try:
await self.thermostat.end_session()
except:
pass
@asyncSlot()
async def on_connect_btn_clicked(self):
if (self._connecting_task is None) and (not self.thermostat.connected()):
self._connecting_task = asyncio.create_task(
2024-08-22 16:51:58 +08:00
self.thermostat.start_session(
host=self.conn_menu.host_set_line.text(),
port=self.conn_menu.port_set_spin.value(),
)
)
try:
2024-08-23 16:11:27 +08:00
self.hw_rev_data = await self._connecting_task
2024-08-22 16:51:58 +08:00
except (OSError, asyncio.CancelledError) as exc:
await self.thermostat.end_session()
if isinstance(exc, asyncio.CancelledError):
return
raise
finally:
self._connecting_task = None
elif self._connecting_task is not None:
self._connecting_task.cancel()
else:
await self.thermostat.end_session()
@asyncSlot()
async def pid_auto_tune_request(self, ch=0):
match self.autotuners.get_state(ch):
case PIDAutotuneState.STATE_OFF | PIDAutotuneState.STATE_FAILED:
self.autotuners.load_params_and_set_ready(ch)
2024-07-03 14:40:13 +08:00
case (
PIDAutotuneState.STATE_READY
| PIDAutotuneState.STATE_RELAY_STEP_UP
| PIDAutotuneState.STATE_RELAY_STEP_DOWN
):
await self.autotuners.stop_pid_from_running(ch)
# To Update the UI elements
self.pid_autotune_handler([])
2023-08-16 17:35:13 +08:00
@asyncSlot(list)
async def pid_autotune_handler(self, _):
ch_tuning = []
for ch in range(self.NUM_CHANNELS):
match self.autotuners.get_state(ch):
case PIDAutotuneState.STATE_OFF:
self.ctrl_panel_view.change_params_title(
ch, ("PID Config", "PID Auto Tune", "Run"), "Run"
)
2024-07-03 14:40:13 +08:00
case (
PIDAutotuneState.STATE_READY
| PIDAutotuneState.STATE_RELAY_STEP_UP
| PIDAutotuneState.STATE_RELAY_STEP_DOWN
):
self.ctrl_panel_view.change_params_title(
ch, ("PID Config", "PID Auto Tune", "Run"), "Stop"
)
ch_tuning.append(ch)
2023-08-16 17:35:13 +08:00
case PIDAutotuneState.STATE_SUCCEEDED:
self.info_box.display_info_box(
"PID Autotune Success",
f"Channel {ch} PID Config has been loaded to Thermostat. Regulating temperature.",
)
self.info_box.show()
2023-08-16 17:35:13 +08:00
case PIDAutotuneState.STATE_FAILED:
self.info_box.display_info_box(
2024-07-03 14:40:13 +08:00
"PID Autotune Failed",
f"Channel {ch} PID Autotune has failed.",
)
self.info_box.show()
if len(ch_tuning) == 0:
self.background_task_lbl.setText("Ready.")
self.loading_spinner.hide()
self.loading_spinner.stop()
else:
self.background_task_lbl.setText(
"Autotuning channel {ch}...".format(ch=ch_tuning)
)
self.loading_spinner.start()
self.loading_spinner.show()
async def save_cfg_request(self, ch):
2024-07-08 11:18:02 +08:00
assert self.thermostat.connected()
await self.thermostat.save_cfg(str(ch))
@asyncSlot(int)
async def load_cfg_request(self, ch):
2024-07-08 11:18:02 +08:00
assert self.thermostat.connected()
await self.thermostat.load_cfg(str(ch))
@asyncSlot(bool)
async def reset_request(self, _):
2024-07-08 11:18:02 +08:00
assert self.thermostat.connected()
await self.thermostat.reset()
await self.thermostat.end_session()
await asyncio.sleep(0.1) # Wait for the reset to start
self.connect_btn.click() # Reconnect
async def coro_main():
    """Run the control panel until the Qt application is about to quit.

    Parses the command line, configures logging when a level was given,
    shows the main window, optionally pre-fills host/port and triggers a
    connection (``--connect``), then waits for ``aboutToQuit``.
    """
    args = get_argparser().parse_args()
    if args.logLevel:
        logging.basicConfig(level=getattr(logging, args.logLevel))

    app_quit_event = asyncio.Event()

    app = QtWidgets.QApplication.instance()
    app.aboutToQuit.connect(app_quit_event.set)
    app.setWindowIcon(
        QtGui.QIcon(
            str(importlib.resources.files("pytec.gui.resources").joinpath("artiq.ico"))
        )
    )

    main_window = MainWindow(args)
    main_window.show()

    if args.connect:
        # The host/port widgets are reached through the connection menu,
        # which is where MainWindow reads them when connecting.
        if args.HOST:
            main_window.conn_menu.host_set_line.setText(args.HOST)
        if args.PORT:
            main_window.conn_menu.port_set_spin.setValue(int(args.PORT))
        main_window.connect_btn.click()

    await app_quit_event.wait()
def main():
    """Entry point: drive ``coro_main`` to completion with qasync."""
    entry_coroutine = coro_main()
    qasync.run(entry_coroutine)


# Start the application only when executed as a script.
if __name__ == "__main__":
    main()