forked from M-Labs/thermostat
linuswck
726f1a3657
- Bugs fix: 1. Params Tree user input will not get overwritten by incoming report thermostat_data_model. 2. PID Autotune Sampling Period is now set according to Thermostat sampling interval 3. PID Autotune won't get stuck in Fail State 4. Various types disconnection related Bugs 5. Number of Samples stored in the plot cannot be set 6. Limit the max settable output current to be 2000mA - Improvement: 1. Params Tree settings can be changed with external json 2. Use a Tab system to show a single channel of config instead of two 3. Expose PID Autotune lookback params 4. Icon is changed to Artiq logo - Restructure: 1. Restructure the code to follow Model-View-Delegate Design Pattern
437 lines
16 KiB
Python
437 lines
16 KiB
Python
from view.zero_limits_warning import zero_limits_warning_view
|
|
from view.net_settings_input_diag import net_settings_input_diag
|
|
from view.thermostat_ctrl_menu import thermostat_ctrl_menu
|
|
from view.conn_menu import conn_menu
|
|
from view.plot_options_menu import plot_options_menu
|
|
from view.live_plot_view import LiveDataPlotter
|
|
from view.ctrl_panel import ctrl_panel
|
|
from view.info_box import info_box
|
|
from model.pid_autotuner import PIDAutoTuner
|
|
from model.thermostat_data_model import WrappedClient, Thermostat
|
|
import json
|
|
from autotune import PIDAutotuneState
|
|
from qasync import asyncSlot, asyncClose
|
|
import qasync
|
|
from pytec.aioclient import StoppedConnecting
|
|
import asyncio
|
|
import logging
|
|
import argparse
|
|
from PyQt6 import QtWidgets, QtGui, uic
|
|
from PyQt6.QtCore import QSignalBlocker, pyqtSlot
|
|
import pyqtgraph as pg
|
|
from functools import partial
|
|
import importlib.resources
|
|
|
|
|
|
pg.setConfigOptions(antialias=True)
|
|
|
|
|
|
def get_argparser():
|
|
parser = argparse.ArgumentParser(description="Thermostat Control Panel")
|
|
|
|
parser.add_argument(
|
|
"--connect",
|
|
default=None,
|
|
action="store_true",
|
|
help="Automatically connect to the specified Thermostat in IP:port format",
|
|
)
|
|
parser.add_argument("IP", metavar="ip", default=None, nargs="?")
|
|
parser.add_argument("PORT", metavar="port", default=None, nargs="?")
|
|
parser.add_argument(
|
|
"-l",
|
|
"--log",
|
|
dest="logLevel",
|
|
choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
|
|
help="Set the logging level",
|
|
)
|
|
parser.add_argument(
|
|
"-p",
|
|
"--param_tree",
|
|
default=importlib.resources.files("view").joinpath("param_tree.json"),
|
|
help="Param Tree Description JSON File",
|
|
)
|
|
|
|
return parser
|
|
|
|
|
|
class MainWindow(QtWidgets.QMainWindow):
|
|
NUM_CHANNELS = 2
|
|
|
|
def __init__(self, args):
|
|
super(MainWindow, self).__init__()
|
|
|
|
ui_file_path = importlib.resources.files("view").joinpath("tec_qt.ui")
|
|
uic.loadUi(ui_file_path, self)
|
|
|
|
self.show()
|
|
|
|
self.hw_rev_data = None
|
|
self.info_box = info_box()
|
|
|
|
self.client = WrappedClient(self)
|
|
self.client.connection_error.connect(self.bail)
|
|
|
|
self.thermostat = Thermostat(
|
|
self, self.client, self.report_refresh_spin.value()
|
|
)
|
|
|
|
self.autotuners = PIDAutoTuner(self, self.client, 2)
|
|
|
|
def get_ctrl_panel_config(args):
|
|
with open(args.param_tree, "r") as f:
|
|
return json.load(f)["ctrl_panel"]
|
|
|
|
param_tree_sigActivated_handles = [
|
|
[
|
|
[["Save to flash"], partial(self.thermostat.save_cfg, ch)],
|
|
[["Load from flash"], partial(self.thermostat.load_cfg, ch)],
|
|
[
|
|
["PID Config", "PID Auto Tune", "Run"],
|
|
partial(self.pid_auto_tune_request, ch),
|
|
],
|
|
]
|
|
for ch in range(self.NUM_CHANNELS)
|
|
]
|
|
self.thermostat.info_box_trigger.connect(self.info_box.display_info_box)
|
|
|
|
self.zero_limits_warning = zero_limits_warning_view(
|
|
self.style(), self.limits_warning
|
|
)
|
|
self.ctrl_panel_view = ctrl_panel(
|
|
[self.ch0_tree, self.ch1_tree],
|
|
get_ctrl_panel_config(args),
|
|
self.send_command,
|
|
param_tree_sigActivated_handles,
|
|
)
|
|
self.ctrl_panel_view.set_zero_limits_warning_sig.connect(
|
|
self.zero_limits_warning.set_limits_warning
|
|
)
|
|
|
|
self.thermostat.fan_update.connect(self.fan_update)
|
|
self.thermostat.report_update.connect(self.ctrl_panel_view.update_report)
|
|
self.thermostat.report_update.connect(self.autotuners.tick)
|
|
self.thermostat.report_update.connect(self.pid_autotune_handler)
|
|
self.thermostat.pid_update.connect(self.ctrl_panel_view.update_pid)
|
|
self.thermostat.pwm_update.connect(self.ctrl_panel_view.update_pwm)
|
|
self.thermostat.thermistor_update.connect(
|
|
self.ctrl_panel_view.update_thermistor
|
|
)
|
|
self.thermostat.postfilter_update.connect(
|
|
self.ctrl_panel_view.update_postfilter
|
|
)
|
|
self.thermostat.interval_update.connect(
|
|
self.autotuners.update_sampling_interval
|
|
)
|
|
self.report_apply_btn.clicked.connect(
|
|
lambda: self.thermostat.set_update_s(self.report_refresh_spin.value())
|
|
)
|
|
|
|
self.channel_graphs = LiveDataPlotter(
|
|
[
|
|
[getattr(self, f"ch{ch}_t_graph"), getattr(self, f"ch{ch}_i_graph")]
|
|
for ch in range(self.NUM_CHANNELS)
|
|
]
|
|
)
|
|
|
|
self.thermostat.report_update.connect(self.channel_graphs.update_report)
|
|
self.thermostat.pid_update.connect(self.channel_graphs.update_pid)
|
|
|
|
self.plot_options_menu = plot_options_menu()
|
|
self.plot_options_menu.clear.triggered.connect(self.clear_graphs)
|
|
self.plot_options_menu.samples_spinbox.valueChanged.connect(
|
|
self.channel_graphs.set_max_samples
|
|
)
|
|
self.plot_settings.setMenu(self.plot_options_menu)
|
|
|
|
self.conn_menu = conn_menu()
|
|
self.connect_btn.setMenu(self.conn_menu)
|
|
|
|
self.thermostat_ctrl_menu = thermostat_ctrl_menu(self.style())
|
|
self.thermostat_ctrl_menu.fan_set_act.connect(self.fan_set_request)
|
|
self.thermostat_ctrl_menu.fan_auto_set_act.connect(self.fan_auto_set_request)
|
|
self.thermostat_ctrl_menu.reset_act.connect(self.reset_request)
|
|
self.thermostat_ctrl_menu.dfu_act.connect(self.dfu_request)
|
|
self.thermostat_ctrl_menu.save_cfg_act.connect(self.save_cfg_request)
|
|
self.thermostat_ctrl_menu.load_cfg_act.connect(self.load_cfg_request)
|
|
self.thermostat_ctrl_menu.net_cfg_act.connect(self.net_settings_request)
|
|
|
|
self.thermostat.hw_rev_update.connect(self.thermostat_ctrl_menu.hw_rev)
|
|
self.thermostat_settings.setMenu(self.thermostat_ctrl_menu)
|
|
|
|
self.loading_spinner.hide()
|
|
|
|
if args.connect:
|
|
if args.IP:
|
|
self.host_set_line.setText(args.IP)
|
|
if args.PORT:
|
|
self.port_set_spin.setValue(int(args.PORT))
|
|
self.connect_btn.click()
|
|
|
|
def clear_graphs(self):
|
|
self.channel_graphs.clear_graphs()
|
|
|
|
async def _on_connection_changed(self, result):
|
|
self.graph_group.setEnabled(result)
|
|
self.report_group.setEnabled(result)
|
|
self.thermostat_settings.setEnabled(result)
|
|
|
|
self.conn_menu.host_set_line.setEnabled(not result)
|
|
self.conn_menu.port_set_spin.setEnabled(not result)
|
|
self.connect_btn.setText("Disconnect" if result else "Connect")
|
|
if result:
|
|
self.hw_rev_data = await self.thermostat.get_hw_rev()
|
|
logging.debug(self.hw_rev_data)
|
|
|
|
self._status(self.hw_rev_data)
|
|
self.thermostat.start_watching()
|
|
else:
|
|
self.status_lbl.setText("Disconnected")
|
|
self.background_task_lbl.setText("Ready.")
|
|
self.loading_spinner.hide()
|
|
self.loading_spinner.stop()
|
|
self.thermostat_ctrl_menu.fan_pwm_warning.setPixmap(QtGui.QPixmap())
|
|
self.thermostat_ctrl_menu.fan_pwm_warning.setToolTip("")
|
|
self.clear_graphs()
|
|
self.report_box.setChecked(False)
|
|
if not Thermostat.connecting or Thermostat.connected:
|
|
for ch in range(self.NUM_CHANNELS):
|
|
if self.autotuners.get_state(ch) != PIDAutotuneState.STATE_OFF:
|
|
await self.autotuners.stop_pid_from_running(ch)
|
|
await self.thermostat.set_report_mode(False)
|
|
self.thermostat.stop_watching()
|
|
|
|
def _status(self, hw_rev_d: dict):
|
|
logging.debug(hw_rev_d)
|
|
self.status_lbl.setText(
|
|
f"Connected to Thermostat v{hw_rev_d['rev']['major']}.{hw_rev_d['rev']['minor']}"
|
|
)
|
|
|
|
@pyqtSlot("QVariantMap")
|
|
def fan_update(self, fan_settings):
|
|
logging.debug(fan_settings)
|
|
if fan_settings is None:
|
|
return
|
|
with QSignalBlocker(self.thermostat_ctrl_menu.fan_power_slider):
|
|
self.thermostat_ctrl_menu.fan_power_slider.setValue(
|
|
fan_settings["fan_pwm"] or 100
|
|
) # 0 = PWM off = full strength
|
|
with QSignalBlocker(self.thermostat_ctrl_menu.fan_auto_box):
|
|
self.thermostat_ctrl_menu.fan_auto_box.setChecked(fan_settings["auto_mode"])
|
|
if not self.hw_rev_data["settings"]["fan_pwm_recommended"]:
|
|
self.thermostat_ctrl_menu.set_fan_pwm_warning()
|
|
|
|
@asyncSlot(int)
|
|
async def on_report_box_stateChanged(self, enabled):
|
|
await self.thermostat.set_report_mode(enabled)
|
|
|
|
@asyncClose
|
|
async def closeEvent(self, event):
|
|
try:
|
|
await self.bail()
|
|
except:
|
|
pass
|
|
|
|
@asyncSlot()
|
|
async def on_connect_btn_clicked(self):
|
|
host, port = (
|
|
self.conn_menu.host_set_line.text(),
|
|
self.conn_menu.port_set_spin.value(),
|
|
)
|
|
try:
|
|
if not (self.client.connecting() or self.client.connected()):
|
|
self.status_lbl.setText("Connecting...")
|
|
self.connect_btn.setText("Stop")
|
|
self.conn_menu.host_set_line.setEnabled(False)
|
|
self.conn_menu.port_set_spin.setEnabled(False)
|
|
|
|
try:
|
|
await self.client.start_session(host=host, port=port, timeout=5)
|
|
except StoppedConnecting:
|
|
return
|
|
await self._on_connection_changed(True)
|
|
else:
|
|
await self.bail()
|
|
|
|
# TODO: Remove asyncio.TimeoutError in Python 3.11
|
|
except (OSError, TimeoutError, asyncio.TimeoutError):
|
|
try:
|
|
await self.bail()
|
|
except ConnectionResetError:
|
|
pass
|
|
|
|
@asyncSlot()
|
|
async def bail(self):
|
|
await self._on_connection_changed(False)
|
|
await self.client.end_session()
|
|
|
|
@asyncSlot(object, object)
|
|
async def send_command(self, param, changes):
|
|
"""Translates parameter tree changes into thermostat set_param calls"""
|
|
ch = param.channel
|
|
|
|
for inner_param, change, data in changes:
|
|
if change == "value":
|
|
if inner_param.opts.get("param", None) is not None:
|
|
if inner_param.opts.get("suffix", None) == "mA":
|
|
data /= 1000 # Given in mA
|
|
|
|
thermostat_param = inner_param.opts["param"]
|
|
if thermostat_param[1] == "ch":
|
|
thermostat_param[1] = ch
|
|
|
|
if inner_param.name() == "Postfilter Rate" and data is None:
|
|
set_param_args = (*thermostat_param[:2], "off")
|
|
else:
|
|
set_param_args = (*thermostat_param, data)
|
|
param.child(*param.childPath(inner_param)).setOpts(lock=True)
|
|
await self.client.set_param(*set_param_args)
|
|
param.child(*param.childPath(inner_param)).setOpts(lock=False)
|
|
|
|
if inner_param.opts.get("pid_autotune", None) is not None:
|
|
auto_tuner_param = inner_param.opts["pid_autotune"][0]
|
|
if inner_param.opts["pid_autotune"][1] != "ch":
|
|
ch = inner_param.opts["pid_autotune"][1]
|
|
self.autotuners.set_params(auto_tuner_param, ch, data)
|
|
|
|
if inner_param.opts.get("activaters", None) is not None:
|
|
activater = inner_param.opts["activaters"][
|
|
inner_param.opts["limits"].index(data)
|
|
]
|
|
if activater is not None:
|
|
if activater[1] == "ch":
|
|
activater[1] = ch
|
|
await self.client.set_param(*activater)
|
|
|
|
@asyncSlot()
|
|
async def pid_auto_tune_request(self, ch=0):
|
|
match self.autotuners.get_state(ch):
|
|
case PIDAutotuneState.STATE_OFF | PIDAutotuneState.STATE_FAILED:
|
|
self.autotuners.load_params_and_set_ready(ch)
|
|
|
|
case PIDAutotuneState.STATE_READY | PIDAutotuneState.STATE_RELAY_STEP_UP | PIDAutotuneState.STATE_RELAY_STEP_DOWN:
|
|
await self.autotuners.stop_pid_from_running(ch)
|
|
# To Update the UI elements
|
|
self.pid_autotune_handler([])
|
|
|
|
@asyncSlot(list)
|
|
async def pid_autotune_handler(self, _):
|
|
ch_tuning = []
|
|
for ch in range(self.NUM_CHANNELS):
|
|
match self.autotuners.get_state(ch):
|
|
case PIDAutotuneState.STATE_OFF:
|
|
self.ctrl_panel_view.change_params_title(
|
|
ch, ("PID Config", "PID Auto Tune", "Run"), "Run"
|
|
)
|
|
case PIDAutotuneState.STATE_READY | PIDAutotuneState.STATE_RELAY_STEP_UP | PIDAutotuneState.STATE_RELAY_STEP_DOWN:
|
|
self.ctrl_panel_view.change_params_title(
|
|
ch, ("PID Config", "PID Auto Tune", "Run"), "Stop"
|
|
)
|
|
ch_tuning.append(ch)
|
|
|
|
case PIDAutotuneState.STATE_SUCCEEDED:
|
|
self.info_box.display_info_box(
|
|
"PID Autotune Success",
|
|
f"Channel {ch} PID Config has been loaded to Thermostat. Regulating temperature.",
|
|
)
|
|
self.info_box.show()
|
|
|
|
case PIDAutotuneState.STATE_FAILED:
|
|
self.info_box.display_info_box(
|
|
"PID Autotune Failed", f"Channel {ch} PID Autotune is failed."
|
|
)
|
|
self.info_box.show()
|
|
|
|
if len(ch_tuning) == 0:
|
|
self.background_task_lbl.setText("Ready.")
|
|
self.loading_spinner.hide()
|
|
self.loading_spinner.stop()
|
|
else:
|
|
self.background_task_lbl.setText(
|
|
"Autotuning channel {ch}...".format(ch=ch_tuning)
|
|
)
|
|
self.loading_spinner.start()
|
|
self.loading_spinner.show()
|
|
|
|
@asyncSlot(int)
|
|
async def fan_set_request(self, value):
|
|
if not self.client.connected():
|
|
return
|
|
if self.thermostat_ctrl_menu.fan_auto_box.isChecked():
|
|
with QSignalBlocker(self.thermostat_ctrl_menu.fan_auto_box):
|
|
self.thermostat_ctrl_menu.fan_auto_box.setChecked(False)
|
|
await self.client.set_fan(value)
|
|
if not self.hw_rev_data["settings"]["fan_pwm_recommended"]:
|
|
self.thermostat_ctrl_menu.set_fan_pwm_warning()
|
|
|
|
@asyncSlot(int)
|
|
async def fan_auto_set_request(self, enabled):
|
|
if not self.client.connected():
|
|
return
|
|
if enabled:
|
|
await self.client.set_fan("auto")
|
|
self.fan_update(await self.client.get_fan())
|
|
else:
|
|
await self.client.set_fan(
|
|
self.thermostat_ctrl_menu.fan_power_slider.value()
|
|
)
|
|
|
|
@asyncSlot(int)
|
|
async def save_cfg_request(self, ch):
|
|
await self.thermostat.save_cfg(str(ch))
|
|
|
|
@asyncSlot(int)
|
|
async def load_cfg_request(self, ch):
|
|
await self.thermostat.load_cfg(str(ch))
|
|
|
|
@asyncSlot(bool)
|
|
async def dfu_request(self, _):
|
|
await self._on_connection_changed(False)
|
|
await self.thermostat.dfu()
|
|
|
|
@asyncSlot(bool)
|
|
async def reset_request(self, _):
|
|
await self._on_connection_changed(False)
|
|
await self.thermostat.reset()
|
|
await asyncio.sleep(0.1) # Wait for the reset to start
|
|
|
|
self.connect_btn.click() # Reconnect
|
|
|
|
@asyncSlot(bool)
|
|
async def net_settings_request(self, _):
|
|
ipv4 = await self.thermostat.get_ipv4()
|
|
self.net_settings_input_diag = net_settings_input_diag(ipv4["addr"])
|
|
self.net_settings_input_diag.set_ipv4_act.connect(self.set_net_settings_request)
|
|
|
|
@asyncSlot(str)
|
|
async def set_net_settings_request(self, ipv4_settings):
|
|
await self.thermostat.set_ipv4(ipv4_settings)
|
|
await self.thermostat._client.end_session()
|
|
await self._on_connection_changed(False)
|
|
|
|
|
|
async def coro_main():
|
|
args = get_argparser().parse_args()
|
|
if args.logLevel:
|
|
logging.basicConfig(level=getattr(logging, args.logLevel))
|
|
|
|
app_quit_event = asyncio.Event()
|
|
|
|
app = QtWidgets.QApplication.instance()
|
|
app.aboutToQuit.connect(app_quit_event.set)
|
|
app.setWindowIcon(
|
|
QtGui.QIcon(str(importlib.resources.files("resources").joinpath("artiq.ico")))
|
|
)
|
|
|
|
main_window = MainWindow(args)
|
|
main_window.show()
|
|
|
|
await app_quit_event.wait()
|
|
|
|
|
|
def main():
|
|
qasync.run(coro_main())
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|