thermostat/pytec/tec_qt.py
atse 3648cab1f8 pytec GUI: Implement PlotSettingsMenu
Co-authored-by: linuswck <linuswck@m-labs.hk>
2024-11-11 12:40:52 +08:00

234 lines
8.1 KiB
Python
Executable File

"""GUI for the Sinara 8451 Thermostat"""
import asyncio
import logging
import argparse
import importlib.resources
from PyQt6 import QtWidgets, QtGui, uic
from PyQt6.QtCore import pyqtSlot
import qasync
from qasync import asyncSlot, asyncClose
from autotune import PIDAutotuneState
from pytec.gui.model.thermostat import Thermostat, ThermostatConnectionState
from pytec.gui.model.pid_autotuner import PIDAutoTuner
from pytec.gui.view.zero_limits_warning_view import ZeroLimitsWarningView
from pytec.gui.view.thermostat_settings_menu import ThermostatSettingsMenu
from pytec.gui.view.connection_details_menu import ConnectionDetailsMenu
from pytec.gui.view.plot_options_menu import PlotOptionsMenu
from pytec.gui.view.live_plot_view import LiveDataPlotter
from pytec.gui.view.info_box import InfoBox
def get_argparser():
parser = argparse.ArgumentParser(description="Thermostat Control Panel")
parser.add_argument(
"--connect",
default=None,
action="store_true",
help="Automatically connect to the specified Thermostat in host:port format",
)
parser.add_argument("HOST", metavar="host", default=None, nargs="?")
parser.add_argument("PORT", metavar="port", default=None, nargs="?")
parser.add_argument(
"-l",
"--log",
dest="logLevel",
choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
help="Set the logging level",
)
return parser
class MainWindow(QtWidgets.QMainWindow):
NUM_CHANNELS = 2
def __init__(self):
super().__init__()
ui_file_path = importlib.resources.files("pytec.gui.view").joinpath("tec_qt.ui")
uic.loadUi(ui_file_path, self)
self._info_box = InfoBox()
# Models
self._thermostat = Thermostat(self, self.report_refresh_spin.value())
self._connecting_task = None
self._thermostat.connection_state_update.connect(
self._on_connection_state_changed
)
self._autotuners = PIDAutoTuner(self, self._thermostat, 2)
self._autotuners.autotune_state_changed.connect(
self._on_pid_autotune_state_changed
)
# Handlers for disconnections
async def autotune_disconnect():
for ch in range(self.NUM_CHANNELS):
if self._autotuners.get_state(ch) != PIDAutotuneState.STATE_OFF:
await self._autotuners.stop_pid_from_running(ch)
self._thermostat.disconnect_cb = autotune_disconnect
@pyqtSlot()
def handle_connection_error():
self._info_box.display_info_box(
"Connection Error", "Thermostat connection lost. Is it unplugged?"
)
self._thermostat.connection_error.connect(handle_connection_error)
# Graphs
self._channel_graphs = LiveDataPlotter(
self._thermostat,
[
[getattr(self, f"ch{ch}_t_graph"), getattr(self, f"ch{ch}_i_graph")]
for ch in range(self.NUM_CHANNELS)
],
)
# Bottom bar menus
self.connection_details_menu = ConnectionDetailsMenu(
self._thermostat, self.connect_btn
)
self.connect_btn.setMenu(self.connection_details_menu)
self._thermostat_settings_menu = ThermostatSettingsMenu(
self._thermostat, self._info_box, self.style()
)
self.thermostat_settings.setMenu(self._thermostat_settings_menu)
self._plot_options_menu = PlotOptionsMenu(self._channel_graphs)
self.plot_settings.setMenu(self._plot_options_menu)
# Status line
self._zero_limits_warning_view = ZeroLimitsWarningView(
self._thermostat, self.style(), self.limits_warning
)
self.loading_spinner.hide()
self.report_apply_btn.clicked.connect(
lambda: self._thermostat.set_update_s(self.report_refresh_spin.value())
)
@asyncClose
async def closeEvent(self, _event):
try:
await self._thermostat.end_session()
self._thermostat.connection_state = ThermostatConnectionState.DISCONNECTED
except:
pass
@pyqtSlot(ThermostatConnectionState)
def _on_connection_state_changed(self, state):
self.graph_group.setEnabled(state == ThermostatConnectionState.CONNECTED)
self.thermostat_settings.setEnabled(
state == ThermostatConnectionState.CONNECTED
)
self.report_group.setEnabled(state == ThermostatConnectionState.CONNECTED)
match state:
case ThermostatConnectionState.CONNECTED:
self.connect_btn.setText("Disconnect")
self.status_lbl.setText(
"Connected to Thermostat v"
f"{self._thermostat.hw_rev['rev']['major']}."
f"{self._thermostat.hw_rev['rev']['minor']}"
)
case ThermostatConnectionState.CONNECTING:
self.connect_btn.setText("Stop")
self.status_lbl.setText("Connecting...")
case ThermostatConnectionState.DISCONNECTED:
self.connect_btn.setText("Connect")
self.status_lbl.setText("Disconnected")
@pyqtSlot(int, PIDAutotuneState)
def _on_pid_autotune_state_changed(self, _ch, _state):
autotuning_channels = []
for ch in range(self.NUM_CHANNELS):
if self._autotuners.get_state(ch) in {
PIDAutotuneState.STATE_READY,
PIDAutotuneState.STATE_RELAY_STEP_UP,
PIDAutotuneState.STATE_RELAY_STEP_DOWN,
}:
autotuning_channels.append(ch)
if len(autotuning_channels) == 0:
self.background_task_lbl.setText("Ready.")
self.loading_spinner.hide()
self.loading_spinner.stop()
else:
self.background_task_lbl.setText(
f"Autotuning channel {autotuning_channels}..."
)
self.loading_spinner.start()
self.loading_spinner.show()
@asyncSlot()
async def on_connect_btn_clicked(self):
match self._thermostat.connection_state:
case ThermostatConnectionState.DISCONNECTED:
self._connecting_task = asyncio.current_task()
self._thermostat.connection_state = ThermostatConnectionState.CONNECTING
await self._thermostat.start_session(
host=self.connection_details_menu.host_set_line.text(),
port=self.connection_details_menu.port_set_spin.value(),
)
self._connecting_task = None
self._thermostat.connection_state = ThermostatConnectionState.CONNECTED
self._thermostat.start_watching()
case ThermostatConnectionState.CONNECTING:
self._connecting_task.cancel()
self._connecting_task = None
await self._thermostat.end_session()
self._thermostat.connection_state = (
ThermostatConnectionState.DISCONNECTED
)
case ThermostatConnectionState.CONNECTED:
await self._thermostat.end_session()
self._thermostat.connection_state = (
ThermostatConnectionState.DISCONNECTED
)
async def coro_main():
args = get_argparser().parse_args()
if args.logLevel:
logging.basicConfig(level=getattr(logging, args.logLevel))
app_quit_event = asyncio.Event()
app = QtWidgets.QApplication.instance()
app.aboutToQuit.connect(app_quit_event.set)
app.setWindowIcon(
QtGui.QIcon(
str(importlib.resources.files("pytec.gui.resources").joinpath("artiq.ico"))
)
)
main_window = MainWindow()
main_window.show()
if args.connect:
if args.HOST:
main_window.connection_details_menu.host_set_line.setText(args.HOST)
if args.PORT:
main_window.connection_details_menu.port_set_spin.setValue(int(args.PORT))
main_window.connect_btn.click()
await app_quit_event.wait()
def main():
qasync.run(coro_main())
if __name__ == "__main__":
main()