forked from M-Labs/thermostat
329 lines
12 KiB
Python
Executable File
329 lines
12 KiB
Python
Executable File
from pytec.gui.view.zero_limits_warning import ZeroLimitsWarningView
|
|
from pytec.gui.view.thermostat_ctrl_menu import ThermostatCtrlMenu
|
|
from pytec.gui.view.conn_menu import ConnMenu
|
|
from pytec.gui.view.plot_options_menu import PlotOptionsMenu
|
|
from pytec.gui.view.live_plot_view import LiveDataPlotter
|
|
from pytec.gui.view.ctrl_panel import CtrlPanel
|
|
from pytec.gui.view.info_box import InfoBox
|
|
from pytec.gui.model.pid_autotuner import PIDAutoTuner
|
|
from pytec.gui.model.thermostat import Thermostat, ThermostatConnectionState
|
|
import json
|
|
from autotune import PIDAutotuneState
|
|
from qasync import asyncSlot, asyncClose
|
|
import qasync
|
|
import asyncio
|
|
import logging
|
|
import argparse
|
|
from PyQt6 import QtWidgets, QtGui, uic
|
|
from PyQt6.QtCore import QSignalBlocker, pyqtSlot
|
|
import pyqtgraph as pg
|
|
from functools import partial
|
|
import importlib.resources
|
|
|
|
|
|
def get_argparser():
|
|
parser = argparse.ArgumentParser(description="Thermostat Control Panel")
|
|
|
|
parser.add_argument(
|
|
"--connect",
|
|
default=None,
|
|
action="store_true",
|
|
help="Automatically connect to the specified Thermostat in host:port format",
|
|
)
|
|
parser.add_argument("HOST", metavar="host", default=None, nargs="?")
|
|
parser.add_argument("PORT", metavar="port", default=None, nargs="?")
|
|
parser.add_argument(
|
|
"-l",
|
|
"--log",
|
|
dest="logLevel",
|
|
choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
|
|
help="Set the logging level",
|
|
)
|
|
parser.add_argument(
|
|
"-p",
|
|
"--param_tree",
|
|
default=importlib.resources.files("pytec.gui.view").joinpath("param_tree.json"),
|
|
help="Param Tree Description JSON File",
|
|
)
|
|
|
|
return parser
|
|
|
|
|
|
class MainWindow(QtWidgets.QMainWindow):
|
|
NUM_CHANNELS = 2
|
|
|
|
def __init__(self, args):
|
|
super(MainWindow, self).__init__()
|
|
|
|
ui_file_path = importlib.resources.files("pytec.gui.view").joinpath("tec_qt.ui")
|
|
uic.loadUi(ui_file_path, self)
|
|
|
|
self.info_box = InfoBox()
|
|
|
|
self.thermostat = Thermostat(
|
|
self, self.report_refresh_spin.value()
|
|
)
|
|
self._connecting_task = None
|
|
|
|
def handle_connection_error():
|
|
self.info_box.display_info_box(
|
|
"Connection Error", "Thermostat connection lost. Is it unplugged?"
|
|
)
|
|
|
|
self.thermostat.connection_error.connect(handle_connection_error)
|
|
|
|
self.thermostat.connection_error.connect(self.thermostat.timed_out)
|
|
self.thermostat.connection_error.connect(self.thermostat.end_session)
|
|
|
|
self.thermostat.connection_state_changed.connect(self._on_connection_changed)
|
|
|
|
self.autotuners = PIDAutoTuner(self, self.thermostat, 2)
|
|
|
|
def get_ctrl_panel_config(args):
|
|
with open(args.param_tree, "r", encoding="utf-8") as f:
|
|
return json.load(f)["ctrl_panel"]
|
|
|
|
param_tree_sigActivated_handles = [
|
|
[
|
|
[
|
|
["PID Config", "PID Auto Tune", "Run"],
|
|
partial(self.pid_auto_tune_request, ch),
|
|
],
|
|
]
|
|
for ch in range(self.NUM_CHANNELS)
|
|
]
|
|
|
|
self.ctrl_panel_view = CtrlPanel(
|
|
self.thermostat,
|
|
self.autotuners,
|
|
self.info_box,
|
|
[self.ch0_tree, self.ch1_tree],
|
|
get_ctrl_panel_config(args),
|
|
param_tree_sigActivated_handles,
|
|
)
|
|
|
|
self.zero_limits_warning = ZeroLimitsWarningView(
|
|
self.style(), self.limits_warning
|
|
)
|
|
self.ctrl_panel_view.set_zero_limits_warning_sig.connect(
|
|
self.zero_limits_warning.set_limits_warning
|
|
)
|
|
|
|
self.thermostat.report_update.connect(self.pid_autotune_handler)
|
|
|
|
self.thermostat.hw_rev_update.connect(self._status)
|
|
self.report_apply_btn.clicked.connect(
|
|
lambda: self.thermostat.set_update_s(self.report_refresh_spin.value())
|
|
)
|
|
|
|
self.channel_graphs = LiveDataPlotter(
|
|
[
|
|
[getattr(self, f"ch{ch}_t_graph"), getattr(self, f"ch{ch}_i_graph")]
|
|
for ch in range(self.NUM_CHANNELS)
|
|
]
|
|
)
|
|
|
|
self.thermostat.report_update.connect(self.channel_graphs.update_report)
|
|
self.thermostat.pid_update.connect(self.channel_graphs.update_pid)
|
|
|
|
self.plot_options_menu = PlotOptionsMenu(self.channel_graphs)
|
|
self.plot_settings.setMenu(self.plot_options_menu)
|
|
|
|
self.conn_menu = ConnMenu()
|
|
self.connect_btn.setMenu(self.conn_menu)
|
|
|
|
self.thermostat_ctrl_menu = ThermostatCtrlMenu(self.thermostat, self.style())
|
|
self.thermostat_ctrl_menu.reset_act.connect(self.reset_request)
|
|
|
|
self.thermostat_settings.setMenu(self.thermostat_ctrl_menu)
|
|
|
|
self.loading_spinner.hide()
|
|
|
|
if args.connect:
|
|
if args.IP:
|
|
self.host_set_line.setText(args.IP)
|
|
if args.PORT:
|
|
self.port_set_spin.setValue(int(args.PORT))
|
|
self.connect_btn.click()
|
|
|
|
@asyncSlot(ThermostatConnectionState)
|
|
async def _on_connection_changed(self, result):
|
|
match result:
|
|
case ThermostatConnectionState.CONNECTED:
|
|
self.graph_group.setEnabled(True)
|
|
self.report_group.setEnabled(True)
|
|
self.thermostat_settings.setEnabled(True)
|
|
|
|
self.conn_menu.host_set_line.setEnabled(False)
|
|
self.conn_menu.port_set_spin.setEnabled(False)
|
|
self.connect_btn.setText("Disconnect")
|
|
|
|
case ThermostatConnectionState.CONNECTING:
|
|
self.status_lbl.setText("Connecting...")
|
|
self.connect_btn.setText("Stop")
|
|
self.conn_menu.host_set_line.setEnabled(False)
|
|
self.conn_menu.port_set_spin.setEnabled(False)
|
|
|
|
case ThermostatConnectionState.DISCONNECTED:
|
|
self.graph_group.setEnabled(False)
|
|
self.report_group.setEnabled(False)
|
|
self.thermostat_settings.setEnabled(False)
|
|
|
|
self.conn_menu.host_set_line.setEnabled(True)
|
|
self.conn_menu.port_set_spin.setEnabled(True)
|
|
self.connect_btn.setText("Connect")
|
|
|
|
self.status_lbl.setText("Disconnected")
|
|
self.background_task_lbl.setText("Ready.")
|
|
self.loading_spinner.hide()
|
|
self.loading_spinner.stop()
|
|
self.thermostat_ctrl_menu.fan_pwm_warning.setPixmap(QtGui.QPixmap())
|
|
self.thermostat_ctrl_menu.fan_pwm_warning.setToolTip("")
|
|
self.channel_graphs.clear_graphs()
|
|
self.report_box.setChecked(False)
|
|
for ch in range(self.NUM_CHANNELS):
|
|
if self.autotuners.get_state(ch) != PIDAutotuneState.STATE_OFF:
|
|
if self.thermostat.connection_errored:
|
|
# Don't send any commands, just reset local state
|
|
self.autotuners.autotuners[ch].setOff()
|
|
else:
|
|
await self.autotuners.stop_pid_from_running(ch)
|
|
|
|
def _status(self, hw_rev_d: dict):
|
|
self.status_lbl.setText(
|
|
f"Connected to Thermostat v{hw_rev_d['rev']['major']}.{hw_rev_d['rev']['minor']}"
|
|
)
|
|
|
|
@asyncSlot(int)
|
|
async def on_report_box_stateChanged(self, enabled):
|
|
await self.thermostat.set_report_mode(enabled)
|
|
|
|
@asyncClose
|
|
async def closeEvent(self, _event):
|
|
try:
|
|
await self.thermostat.end_session()
|
|
except:
|
|
pass
|
|
|
|
@asyncSlot()
|
|
async def on_connect_btn_clicked(self):
|
|
if (self._connecting_task is None) and (not self.thermostat.connected()):
|
|
self._connecting_task = asyncio.create_task(
|
|
self.thermostat.start_session(
|
|
host=self.conn_menu.host_set_line.text(),
|
|
port=self.conn_menu.port_set_spin.value(),
|
|
)
|
|
)
|
|
try:
|
|
await self._connecting_task
|
|
except (OSError, asyncio.CancelledError) as exc:
|
|
await self.thermostat.end_session()
|
|
if isinstance(exc, asyncio.CancelledError):
|
|
return
|
|
raise
|
|
finally:
|
|
self._connecting_task = None
|
|
|
|
elif self._connecting_task is not None:
|
|
self._connecting_task.cancel()
|
|
else:
|
|
await self.thermostat.end_session()
|
|
|
|
@asyncSlot()
|
|
async def pid_auto_tune_request(self, ch=0):
|
|
match self.autotuners.get_state(ch):
|
|
case PIDAutotuneState.STATE_OFF | PIDAutotuneState.STATE_FAILED:
|
|
self.autotuners.load_params_and_set_ready(ch)
|
|
|
|
case (
|
|
PIDAutotuneState.STATE_READY
|
|
| PIDAutotuneState.STATE_RELAY_STEP_UP
|
|
| PIDAutotuneState.STATE_RELAY_STEP_DOWN
|
|
):
|
|
await self.autotuners.stop_pid_from_running(ch)
|
|
# To Update the UI elements
|
|
self.pid_autotune_handler([])
|
|
|
|
@asyncSlot(list)
|
|
async def pid_autotune_handler(self, _):
|
|
ch_tuning = []
|
|
for ch in range(self.NUM_CHANNELS):
|
|
match self.autotuners.get_state(ch):
|
|
case PIDAutotuneState.STATE_OFF:
|
|
self.ctrl_panel_view.change_params_title(
|
|
ch, ("PID Config", "PID Auto Tune", "Run"), "Run"
|
|
)
|
|
case (
|
|
PIDAutotuneState.STATE_READY
|
|
| PIDAutotuneState.STATE_RELAY_STEP_UP
|
|
| PIDAutotuneState.STATE_RELAY_STEP_DOWN
|
|
):
|
|
self.ctrl_panel_view.change_params_title(
|
|
ch, ("PID Config", "PID Auto Tune", "Run"), "Stop"
|
|
)
|
|
ch_tuning.append(ch)
|
|
|
|
case PIDAutotuneState.STATE_SUCCEEDED:
|
|
self.info_box.display_info_box(
|
|
"PID Autotune Success",
|
|
f"Channel {ch} PID Config has been loaded to Thermostat. Regulating temperature.",
|
|
)
|
|
self.info_box.show()
|
|
|
|
case PIDAutotuneState.STATE_FAILED:
|
|
self.info_box.display_info_box(
|
|
"PID Autotune Failed",
|
|
f"Channel {ch} PID Autotune has failed.",
|
|
)
|
|
self.info_box.show()
|
|
|
|
if len(ch_tuning) == 0:
|
|
self.background_task_lbl.setText("Ready.")
|
|
self.loading_spinner.hide()
|
|
self.loading_spinner.stop()
|
|
else:
|
|
self.background_task_lbl.setText(
|
|
"Autotuning channel {ch}...".format(ch=ch_tuning)
|
|
)
|
|
self.loading_spinner.start()
|
|
self.loading_spinner.show()
|
|
|
|
@asyncSlot(bool)
|
|
async def reset_request(self, _):
|
|
assert self.thermostat.connected()
|
|
|
|
await self.thermostat.reset()
|
|
await self.thermostat.end_session()
|
|
await asyncio.sleep(0.1) # Wait for the reset to start
|
|
|
|
self.connect_btn.click() # Reconnect
|
|
|
|
|
|
async def coro_main():
|
|
args = get_argparser().parse_args()
|
|
if args.logLevel:
|
|
logging.basicConfig(level=getattr(logging, args.logLevel))
|
|
|
|
app_quit_event = asyncio.Event()
|
|
|
|
app = QtWidgets.QApplication.instance()
|
|
app.aboutToQuit.connect(app_quit_event.set)
|
|
app.setWindowIcon(
|
|
QtGui.QIcon(
|
|
str(importlib.resources.files("pytec.gui.resources").joinpath("artiq.ico"))
|
|
)
|
|
)
|
|
|
|
main_window = MainWindow(args)
|
|
main_window.show()
|
|
|
|
await app_quit_event.wait()
|
|
|
|
|
|
def main():
|
|
qasync.run(coro_main())
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|