forked from M-Labs/thermostat
480 lines
18 KiB
Python
480 lines
18 KiB
Python
from pytec.gui.view.zero_limits_warning import ZeroLimitsWarningView
|
|
from pytec.gui.view.net_settings_input_diag import NetSettingsInputDiag
|
|
from pytec.gui.view.thermostat_ctrl_menu import ThermostatCtrlMenu
|
|
from pytec.gui.view.conn_menu import ConnMenu
|
|
from pytec.gui.view.plot_options_menu import PlotOptionsMenu
|
|
from pytec.gui.view.live_plot_view import LiveDataPlotter
|
|
from pytec.gui.view.ctrl_panel import CtrlPanel
|
|
from pytec.gui.view.info_box import InfoBox
|
|
from pytec.gui.model.pid_autotuner import PIDAutoTuner
|
|
from pytec.gui.model.thermostat import Thermostat
|
|
from pytec.aioclient import Client
|
|
import json
|
|
from autotune import PIDAutotuneState
|
|
from qasync import asyncSlot, asyncClose
|
|
import qasync
|
|
from pytec.aioclient import StoppedConnecting
|
|
import asyncio
|
|
import logging
|
|
import argparse
|
|
from PyQt6 import QtWidgets, QtGui, uic
|
|
from PyQt6.QtCore import QSignalBlocker, pyqtSlot
|
|
import pyqtgraph as pg
|
|
from functools import partial
|
|
import importlib.resources
|
|
|
|
|
|
def get_argparser():
|
|
parser = argparse.ArgumentParser(description="Thermostat Control Panel")
|
|
|
|
parser.add_argument(
|
|
"--connect",
|
|
default=None,
|
|
action="store_true",
|
|
help="Automatically connect to the specified Thermostat in IP:port format",
|
|
)
|
|
parser.add_argument("IP", metavar="ip", default=None, nargs="?")
|
|
parser.add_argument("PORT", metavar="port", default=None, nargs="?")
|
|
parser.add_argument(
|
|
"-l",
|
|
"--log",
|
|
dest="logLevel",
|
|
choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
|
|
help="Set the logging level",
|
|
)
|
|
parser.add_argument(
|
|
"-p",
|
|
"--param_tree",
|
|
default=importlib.resources.files("pytec.gui.view").joinpath("param_tree.json"),
|
|
help="Param Tree Description JSON File",
|
|
)
|
|
|
|
return parser
|
|
|
|
|
|
class MainWindow(QtWidgets.QMainWindow):
|
|
NUM_CHANNELS = 2
|
|
|
|
def __init__(self, args):
|
|
super(MainWindow, self).__init__()
|
|
|
|
ui_file_path = importlib.resources.files("pytec.gui.view").joinpath("tec_qt.ui")
|
|
uic.loadUi(ui_file_path, self)
|
|
|
|
self.hw_rev_data = None
|
|
self.info_box = InfoBox()
|
|
|
|
self.client = Client()
|
|
|
|
self.thermostat = Thermostat(
|
|
self, self.client, self.report_refresh_spin.value()
|
|
)
|
|
|
|
def handle_connection_error():
|
|
self.info_box.display_info_box(
|
|
"Connection Error", "Thermostat connection lost. Is it unplugged?"
|
|
)
|
|
|
|
self.bail()
|
|
|
|
self.thermostat.connection_error.connect(handle_connection_error)
|
|
|
|
self.client.connection_error.connect(self.thermostat.timed_out)
|
|
self.client.connection_error.connect(self.bail)
|
|
|
|
self.autotuners = PIDAutoTuner(self, self.client, 2)
|
|
|
|
def get_ctrl_panel_config(args):
|
|
with open(args.param_tree, "r") as f:
|
|
return json.load(f)["ctrl_panel"]
|
|
|
|
param_tree_sigActivated_handles = [
|
|
[
|
|
[["save"], partial(self.thermostat.save_cfg, ch)],
|
|
[["load"], partial(self.thermostat.load_cfg, ch)],
|
|
[
|
|
["pid", "pid_autotune", "run_pid"],
|
|
partial(self.pid_auto_tune_request, ch),
|
|
],
|
|
]
|
|
for ch in range(self.NUM_CHANNELS)
|
|
]
|
|
self.thermostat.info_box_trigger.connect(self.info_box.display_info_box)
|
|
|
|
self.ctrl_panel_view = CtrlPanel(
|
|
[self.ch0_tree, self.ch1_tree],
|
|
get_ctrl_panel_config(args),
|
|
self.send_command,
|
|
param_tree_sigActivated_handles,
|
|
)
|
|
|
|
self.zero_limits_warning = ZeroLimitsWarningView(
|
|
self.style(), self.limits_warning
|
|
)
|
|
self.ctrl_panel_view.set_zero_limits_warning_sig.connect(
|
|
self.zero_limits_warning.set_limits_warning
|
|
)
|
|
|
|
self.thermostat.fan_update.connect(self.fan_update)
|
|
self.thermostat.report_update.connect(self.ctrl_panel_view.update_report)
|
|
self.thermostat.report_update.connect(self.autotuners.tick)
|
|
self.thermostat.report_update.connect(self.pid_autotune_handler)
|
|
self.thermostat.pid_update.connect(self.ctrl_panel_view.update_pid)
|
|
self.thermostat.pwm_update.connect(self.ctrl_panel_view.update_pwm)
|
|
self.thermostat.thermistor_update.connect(
|
|
self.ctrl_panel_view.update_thermistor
|
|
)
|
|
self.thermostat.postfilter_update.connect(
|
|
self.ctrl_panel_view.update_postfilter
|
|
)
|
|
self.thermostat.interval_update.connect(
|
|
self.autotuners.update_sampling_interval
|
|
)
|
|
self.report_apply_btn.clicked.connect(
|
|
lambda: self.thermostat.set_update_s(self.report_refresh_spin.value())
|
|
)
|
|
|
|
self.channel_graphs = LiveDataPlotter(
|
|
[
|
|
[getattr(self, f"ch{ch}_t_graph"), getattr(self, f"ch{ch}_i_graph")]
|
|
for ch in range(self.NUM_CHANNELS)
|
|
]
|
|
)
|
|
|
|
self.thermostat.report_update.connect(self.channel_graphs.update_report)
|
|
self.thermostat.pid_update.connect(self.channel_graphs.update_pid)
|
|
|
|
self.plot_options_menu = PlotOptionsMenu()
|
|
self.plot_options_menu.clear.triggered.connect(self.clear_graphs)
|
|
self.plot_options_menu.samples_spinbox.valueChanged.connect(
|
|
self.channel_graphs.set_max_samples
|
|
)
|
|
self.plot_settings.setMenu(self.plot_options_menu)
|
|
|
|
self.conn_menu = ConnMenu()
|
|
self.connect_btn.setMenu(self.conn_menu)
|
|
|
|
self.thermostat_ctrl_menu = ThermostatCtrlMenu(self.style())
|
|
self.thermostat_ctrl_menu.fan_set_act.connect(self.fan_set_request)
|
|
self.thermostat_ctrl_menu.fan_auto_set_act.connect(self.fan_auto_set_request)
|
|
self.thermostat_ctrl_menu.reset_act.connect(self.reset_request)
|
|
self.thermostat_ctrl_menu.dfu_act.connect(self.dfu_request)
|
|
self.thermostat_ctrl_menu.save_cfg_act.connect(self.save_cfg_request)
|
|
self.thermostat_ctrl_menu.load_cfg_act.connect(self.load_cfg_request)
|
|
self.thermostat_ctrl_menu.net_cfg_act.connect(self.net_settings_request)
|
|
|
|
self.thermostat.hw_rev_update.connect(self.thermostat_ctrl_menu.hw_rev)
|
|
self.thermostat_settings.setMenu(self.thermostat_ctrl_menu)
|
|
|
|
self.loading_spinner.hide()
|
|
|
|
if args.connect:
|
|
if args.IP:
|
|
self.host_set_line.setText(args.IP)
|
|
if args.PORT:
|
|
self.port_set_spin.setValue(int(args.PORT))
|
|
self.connect_btn.click()
|
|
|
|
def clear_graphs(self):
|
|
self.channel_graphs.clear_graphs()
|
|
|
|
async def _on_connection_changed(self, result):
|
|
self.graph_group.setEnabled(result)
|
|
self.report_group.setEnabled(result)
|
|
self.thermostat_settings.setEnabled(result)
|
|
|
|
self.conn_menu.host_set_line.setEnabled(not result)
|
|
self.conn_menu.port_set_spin.setEnabled(not result)
|
|
self.connect_btn.setText("Disconnect" if result else "Connect")
|
|
if result:
|
|
self.hw_rev_data = await self.thermostat.get_hw_rev()
|
|
logging.debug(self.hw_rev_data)
|
|
|
|
self._status(self.hw_rev_data)
|
|
self.thermostat.start_watching()
|
|
else:
|
|
self.status_lbl.setText("Disconnected")
|
|
self.background_task_lbl.setText("Ready.")
|
|
self.loading_spinner.hide()
|
|
self.loading_spinner.stop()
|
|
self.thermostat_ctrl_menu.fan_pwm_warning.setPixmap(QtGui.QPixmap())
|
|
self.thermostat_ctrl_menu.fan_pwm_warning.setToolTip("")
|
|
self.clear_graphs()
|
|
self.report_box.setChecked(False)
|
|
for ch in range(self.NUM_CHANNELS):
|
|
if self.autotuners.get_state(ch) != PIDAutotuneState.STATE_OFF:
|
|
if self.thermostat.connection_errored:
|
|
# Don't send any commands, just reset local state
|
|
self.autotuners.autotuners[ch].setOff()
|
|
else:
|
|
await self.autotuners.stop_pid_from_running(ch)
|
|
await self.thermostat.set_report_mode(False)
|
|
self.thermostat.stop_watching()
|
|
|
|
def _status(self, hw_rev_d: dict):
|
|
logging.debug(hw_rev_d)
|
|
self.status_lbl.setText(
|
|
f"Connected to Thermostat v{hw_rev_d['rev']['major']}.{hw_rev_d['rev']['minor']}"
|
|
)
|
|
|
|
@pyqtSlot("QVariantMap")
|
|
def fan_update(self, fan_settings):
|
|
logging.debug(fan_settings)
|
|
if fan_settings is None:
|
|
return
|
|
with QSignalBlocker(self.thermostat_ctrl_menu.fan_power_slider):
|
|
self.thermostat_ctrl_menu.fan_power_slider.setValue(
|
|
fan_settings["fan_pwm"] or 100 # 0 = PWM off = full strength
|
|
)
|
|
with QSignalBlocker(self.thermostat_ctrl_menu.fan_auto_box):
|
|
self.thermostat_ctrl_menu.fan_auto_box.setChecked(fan_settings["auto_mode"])
|
|
if not self.hw_rev_data["settings"]["fan_pwm_recommended"]:
|
|
self.thermostat_ctrl_menu.set_fan_pwm_warning()
|
|
|
|
@asyncSlot(int)
|
|
async def on_report_box_stateChanged(self, enabled):
|
|
await self.thermostat.set_report_mode(enabled)
|
|
|
|
@asyncClose
|
|
async def closeEvent(self, event):
|
|
try:
|
|
await self.bail()
|
|
except:
|
|
pass
|
|
|
|
@asyncSlot()
|
|
async def on_connect_btn_clicked(self):
|
|
host, port = (
|
|
self.conn_menu.host_set_line.text(),
|
|
self.conn_menu.port_set_spin.value(),
|
|
)
|
|
try:
|
|
if not (self.client.connecting() or self.client.connected()):
|
|
self.status_lbl.setText("Connecting...")
|
|
self.connect_btn.setText("Stop")
|
|
self.conn_menu.host_set_line.setEnabled(False)
|
|
self.conn_menu.port_set_spin.setEnabled(False)
|
|
|
|
try:
|
|
await self.client.start_session(host=host, port=port, timeout=5)
|
|
except StoppedConnecting:
|
|
return
|
|
await self._on_connection_changed(True)
|
|
else:
|
|
await self.bail()
|
|
|
|
# TODO: Remove asyncio.TimeoutError in Python 3.11
|
|
except (OSError, asyncio.TimeoutError):
|
|
try:
|
|
await self.bail()
|
|
except ConnectionResetError:
|
|
pass
|
|
|
|
@asyncSlot()
|
|
async def bail(self):
|
|
await self._on_connection_changed(False)
|
|
await self.client.end_session()
|
|
|
|
@asyncSlot(object, object)
|
|
async def send_command(self, param, changes):
|
|
"""Translates parameter tree changes into thermostat set_param calls"""
|
|
ch = param.channel
|
|
|
|
for inner_param, change, data in changes:
|
|
if change == "value":
|
|
if inner_param.opts.get("param", None) is not None:
|
|
if inner_param.opts.get("title", None).endswith(" (mA)"):
|
|
data /= 1000 # Given in mA
|
|
|
|
thermostat_param = inner_param.opts["param"]
|
|
if thermostat_param[1] == "$ch":
|
|
thermostat_param[1] = ch
|
|
|
|
if inner_param.name() == "rate" and data is None:
|
|
set_param_args = (*thermostat_param[:2], "off")
|
|
else:
|
|
set_param_args = (*thermostat_param, data)
|
|
param.child(*param.childPath(inner_param)).setOpts(lock=True)
|
|
await self.client.set_param(*set_param_args)
|
|
param.child(*param.childPath(inner_param)).setOpts(lock=False)
|
|
|
|
if inner_param.opts.get("pid_autotune", None) is not None:
|
|
auto_tuner_param = inner_param.opts["pid_autotune"][0]
|
|
if inner_param.opts["pid_autotune"][1] != "$ch":
|
|
ch = inner_param.opts["pid_autotune"][1]
|
|
self.autotuners.set_params(auto_tuner_param, ch, data)
|
|
|
|
if change == "auto":
|
|
is_auto = data
|
|
if is_auto:
|
|
if "auto_param" in inner_param.opts:
|
|
thermostat_param = inner_param.opts["auto_param"]
|
|
else:
|
|
thermostat_param = [
|
|
*inner_param.opts["param"],
|
|
inner_param.value() / 1000,
|
|
]
|
|
|
|
if thermostat_param[1] == "$ch":
|
|
thermostat_param[1] = ch
|
|
|
|
inner_param.setOpts(lock=True)
|
|
await self.client.set_param(*thermostat_param)
|
|
inner_param.setOpts(lock=False)
|
|
|
|
@asyncSlot()
|
|
async def pid_auto_tune_request(self, ch=0):
|
|
match self.autotuners.get_state(ch):
|
|
case PIDAutotuneState.STATE_OFF | PIDAutotuneState.STATE_FAILED:
|
|
self.autotuners.load_params_and_set_ready(ch)
|
|
|
|
case (
|
|
PIDAutotuneState.STATE_READY
|
|
| PIDAutotuneState.STATE_RELAY_STEP_UP
|
|
| PIDAutotuneState.STATE_RELAY_STEP_DOWN
|
|
):
|
|
await self.autotuners.stop_pid_from_running(ch)
|
|
# To Update the UI elements
|
|
self.pid_autotune_handler([])
|
|
|
|
@asyncSlot(list)
|
|
async def pid_autotune_handler(self, _):
|
|
ch_tuning = []
|
|
for ch in range(self.NUM_CHANNELS):
|
|
match self.autotuners.get_state(ch):
|
|
case PIDAutotuneState.STATE_OFF:
|
|
self.ctrl_panel_view.change_params_title(
|
|
ch, ("pid", "pid_autotune", "run_pid"), "Run"
|
|
)
|
|
case (
|
|
PIDAutotuneState.STATE_READY
|
|
| PIDAutotuneState.STATE_RELAY_STEP_UP
|
|
| PIDAutotuneState.STATE_RELAY_STEP_DOWN
|
|
):
|
|
self.ctrl_panel_view.change_params_title(
|
|
ch, ("pid", "pid_autotune", "run_pid"), "Stop"
|
|
)
|
|
ch_tuning.append(ch)
|
|
|
|
case PIDAutotuneState.STATE_SUCCEEDED:
|
|
self.info_box.display_info_box(
|
|
"PID Autotune Success",
|
|
f"Channel {ch} PID Settings has been loaded to Thermostat. Regulating temperature.",
|
|
)
|
|
self.info_box.show()
|
|
|
|
case PIDAutotuneState.STATE_FAILED:
|
|
self.info_box.display_info_box(
|
|
"PID Autotune Failed",
|
|
f"Channel {ch} PID Autotune has failed.",
|
|
)
|
|
self.info_box.show()
|
|
|
|
if len(ch_tuning) == 0:
|
|
self.background_task_lbl.setText("Ready.")
|
|
self.loading_spinner.hide()
|
|
self.loading_spinner.stop()
|
|
else:
|
|
self.background_task_lbl.setText(
|
|
"Autotuning channel {ch}...".format(ch=ch_tuning)
|
|
)
|
|
self.loading_spinner.start()
|
|
self.loading_spinner.show()
|
|
|
|
@asyncSlot(int)
|
|
async def fan_set_request(self, value):
|
|
assert self.client.connected()
|
|
|
|
if self.thermostat_ctrl_menu.fan_auto_box.isChecked():
|
|
with QSignalBlocker(self.thermostat_ctrl_menu.fan_auto_box):
|
|
self.thermostat_ctrl_menu.fan_auto_box.setChecked(False)
|
|
await self.client.set_fan(value)
|
|
if not self.hw_rev_data["settings"]["fan_pwm_recommended"]:
|
|
self.thermostat_ctrl_menu.set_fan_pwm_warning()
|
|
|
|
@asyncSlot(int)
|
|
async def fan_auto_set_request(self, enabled):
|
|
assert self.client.connected()
|
|
|
|
if enabled:
|
|
await self.client.set_fan("auto")
|
|
self.fan_update(await self.client.get_fan())
|
|
else:
|
|
await self.client.set_fan(
|
|
self.thermostat_ctrl_menu.fan_power_slider.value()
|
|
)
|
|
|
|
@asyncSlot(int)
|
|
async def save_cfg_request(self, ch):
|
|
assert self.thermostat.connected()
|
|
|
|
await self.thermostat.save_cfg(str(ch))
|
|
|
|
@asyncSlot(int)
|
|
async def load_cfg_request(self, ch):
|
|
assert self.thermostat.connected()
|
|
|
|
await self.thermostat.load_cfg(str(ch))
|
|
|
|
@asyncSlot(bool)
|
|
async def dfu_request(self, _):
|
|
assert self.thermostat.connected()
|
|
|
|
await self._on_connection_changed(False)
|
|
await self.thermostat.dfu()
|
|
|
|
@asyncSlot(bool)
|
|
async def reset_request(self, _):
|
|
assert self.thermostat.connected()
|
|
|
|
await self._on_connection_changed(False)
|
|
await self.thermostat.reset()
|
|
await asyncio.sleep(0.1) # Wait for the reset to start
|
|
|
|
self.connect_btn.click() # Reconnect
|
|
|
|
@asyncSlot(bool)
|
|
async def net_settings_request(self, _):
|
|
assert self.thermostat.connected()
|
|
|
|
ipv4 = await self.thermostat.get_ipv4()
|
|
self.net_settings_input_diag = NetSettingsInputDiag(ipv4["addr"])
|
|
self.net_settings_input_diag.set_ipv4_act.connect(self.set_net_settings_request)
|
|
|
|
@asyncSlot(str)
|
|
async def set_net_settings_request(self, ipv4_settings):
|
|
assert self.thermostat.connected()
|
|
|
|
await self.thermostat.set_ipv4(ipv4_settings)
|
|
await self.thermostat._client.end_session()
|
|
await self._on_connection_changed(False)
|
|
|
|
|
|
async def coro_main():
|
|
args = get_argparser().parse_args()
|
|
if args.logLevel:
|
|
logging.basicConfig(level=getattr(logging, args.logLevel))
|
|
|
|
app_quit_event = asyncio.Event()
|
|
|
|
app = QtWidgets.QApplication.instance()
|
|
app.aboutToQuit.connect(app_quit_event.set)
|
|
app.setWindowIcon(
|
|
QtGui.QIcon(
|
|
str(importlib.resources.files("pytec.gui.resources").joinpath("artiq.ico"))
|
|
)
|
|
)
|
|
|
|
main_window = MainWindow(args)
|
|
main_window.show()
|
|
|
|
await app_quit_event.wait()
|
|
|
|
|
|
def main():
|
|
qasync.run(coro_main())
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|