PyThermostat GUI: Implement Control Panel

Co-authored-by: linuswck <linuswck@m-labs.hk>
Co-authored-by: Egor Savkin <es@m-labs.hk>
This commit is contained in:
atse 2024-11-04 13:09:01 +08:00
parent d948d1d0a0
commit 0d58d70607
3 changed files with 665 additions and 2 deletions

View File

@ -0,0 +1,306 @@
from functools import partial
from PyQt6.QtCore import pyqtSignal, QObject, QSignalBlocker, pyqtSlot
import pyqtgraph.parametertree.parameterTypes as pTypes
from pyqtgraph.parametertree import (
Parameter,
registerParameterType,
)
from qasync import asyncSlot
from autotune import PIDAutotuneState
class MutexParameter(pTypes.ListParameter):
"""
Mutually exclusive parameter where only one of its children is visible at a time, list selectable.
The ordering of the list items determines which children will be visible.
"""
def __init__(self, **opts):
super().__init__(**opts)
self.sigValueChanged.connect(self.show_chosen_child)
self.sigValueChanged.emit(self, self.opts["value"])
def _get_param_from_value(self, value):
if isinstance(self.opts["limits"], dict):
values_list = list(self.opts["limits"].values())
else:
values_list = self.opts["limits"]
return self.children()[values_list.index(value)]
@pyqtSlot(object, object)
def show_chosen_child(self, value):
for param in self.children():
param.hide()
child_to_show = self._get_param_from_value(value.value())
child_to_show.show()
if child_to_show.opts.get("triggerOnShow", None):
child_to_show.sigValueChanged.emit(child_to_show, child_to_show.value())
registerParameterType("mutex", MutexParameter)
class CtrlPanel(QObject):
def __init__(
self,
thermostat,
autotuners,
info_box,
trees_ui,
param_tree,
parent=None,
):
super().__init__(parent)
self.thermostat = thermostat
self.autotuners = autotuners
self.info_box = info_box
self.trees_ui = trees_ui
self.NUM_CHANNELS = len(trees_ui)
self.THERMOSTAT_PARAMETERS = [param_tree for i in range(self.NUM_CHANNELS)]
self.params = [
Parameter.create(
name=f"Thermostat Channel {ch} Parameters",
type="group",
value=ch,
children=self.THERMOSTAT_PARAMETERS[ch],
)
for ch in range(self.NUM_CHANNELS)
]
for i, param in enumerate(self.params):
param.channel = i
for i, tree in enumerate(self.trees_ui):
tree.setHeaderHidden(True)
tree.setParameters(self.params[i], showTop=False)
self.params[i].setValue = self._setValue
self.params[i].sigTreeStateChanged.connect(self.send_command)
self.params[i].child("Save to flash").sigActivated.connect(
partial(self.save_settings, i)
)
self.params[i].child("Load from flash").sigActivated.connect(
partial(self.load_settings, i)
)
self.params[i].child(
"PID Config", "PID Auto Tune", "Run"
).sigActivated.connect(partial(self.pid_auto_tune_request, i))
self.thermostat.pid_update.connect(self.update_pid)
self.thermostat.report_update.connect(self.update_report)
self.thermostat.thermistor_update.connect(self.update_thermistor)
self.thermostat.output_update.connect(self.update_output)
self.thermostat.postfilter_update.connect(self.update_postfilter)
self.autotuners.autotune_state_changed.connect(self.update_pid_autotune)
def _setValue(self, value, blockSignal=None):
"""
Implement 'lock' mechanism for Parameter Type
Modified from the source
"""
try:
if blockSignal is not None:
self.sigValueChanged.disconnect(blockSignal)
value = self._interpretValue(value)
if fn.eq(self.opts["value"], value):
return value
if "lock" in self.opts.keys():
if self.opts["lock"]:
return value
self.opts["value"] = value
self.sigValueChanged.emit(
self, value
) # value might change after signal is received by tree item
finally:
if blockSignal is not None:
self.sigValueChanged.connect(blockSignal)
return self.opts["value"]
def change_params_title(self, channel, path, title):
self.params[channel].child(*path).setOpts(title=title)
@asyncSlot(object, object)
async def send_command(self, param, changes):
"""Translates parameter tree changes into thermostat set_param calls"""
ch = param.channel
for inner_param, change, data in changes:
if change == "value":
new_value = data
if "thermostat:set_param" in inner_param.opts:
if inner_param.opts.get("suffix", None) == "mA":
new_value /= 1000 # Given in mA
thermostat_param = inner_param.opts["thermostat:set_param"]
# Handle thermostat command irregularities
match inner_param.name(), new_value:
case "Postfilter Rate", None:
thermostat_param = thermostat_param.copy()
thermostat_param["field"] = "off"
new_value = ""
case "Control Method", "Constant Current":
return
case "Control Method", "Temperature PID":
new_value = ""
inner_param.setOpts(lock=True)
await self.thermostat.set_param(
channel=ch, value=new_value, **thermostat_param
)
inner_param.setOpts(lock=False)
if "pid_autotune" in inner_param.opts:
auto_tuner_param = inner_param.opts["pid_autotune"]
self.autotuners.set_params(auto_tuner_param, ch, new_value)
@pyqtSlot(list)
def update_pid(self, pid_settings):
for settings in pid_settings:
channel = settings["channel"]
with QSignalBlocker(self.params[channel]):
self.params[channel].child("PID Config", "Kp").setValue(
settings["parameters"]["kp"]
)
self.params[channel].child("PID Config", "Ki").setValue(
settings["parameters"]["ki"]
)
self.params[channel].child("PID Config", "Kd").setValue(
settings["parameters"]["kd"]
)
self.params[channel].child(
"PID Config", "PID Output Clamping", "Minimum"
).setValue(settings["parameters"]["output_min"] * 1000)
self.params[channel].child(
"PID Config", "PID Output Clamping", "Maximum"
).setValue(settings["parameters"]["output_max"] * 1000)
self.params[channel].child(
"Output Config", "Control Method", "Set Temperature"
).setValue(settings["target"])
@pyqtSlot(list)
def update_report(self, report_data):
for settings in report_data:
channel = settings["channel"]
with QSignalBlocker(self.params[channel]):
self.params[channel].child("Output Config", "Control Method").setValue(
"Temperature PID" if settings["pid_engaged"] else "Constant Current"
)
self.params[channel].child(
"Output Config", "Control Method", "Set Current"
).setValue(settings["i_set"] * 1000)
if settings["temperature"] is not None:
self.params[channel].child("Temperature").setValue(
settings["temperature"]
)
if settings["tec_i"] is not None:
self.params[channel].child("Current through TEC").setValue(
settings["tec_i"] * 1000
)
@pyqtSlot(list)
def update_thermistor(self, sh_data):
for sh_param in sh_data:
channel = sh_param["channel"]
with QSignalBlocker(self.params[channel]):
self.params[channel].child("Thermistor Config", "T₀").setValue(
sh_param["params"]["t0"] - 273.15
)
self.params[channel].child("Thermistor Config", "R₀").setValue(
sh_param["params"]["r0"]
)
self.params[channel].child("Thermistor Config", "B").setValue(
sh_param["params"]["b"]
)
@pyqtSlot(list)
def update_output(self, output_data):
for output_params in output_data:
channel = output_params["channel"]
with QSignalBlocker(self.params[channel]):
self.params[channel].child(
"Output Config", "Limits", "Max Voltage Difference"
).setValue(output_params["max_v"])
self.params[channel].child(
"Output Config", "Limits", "Max Cooling Current"
).setValue(output_params["max_i_pos"] * 1000)
self.params[channel].child(
"Output Config", "Limits", "Max Heating Current"
).setValue(output_params["max_i_neg"] * 1000)
@pyqtSlot(list)
def update_postfilter(self, postfilter_data):
for postfilter_params in postfilter_data:
channel = postfilter_params["channel"]
with QSignalBlocker(self.params[channel]):
self.params[channel].child(
"Thermistor Config", "Postfilter Rate"
).setValue(postfilter_params["rate"])
def update_pid_autotune(self, ch, state):
match state:
case PIDAutotuneState.STATE_OFF:
self.change_params_title(
ch, ("PID Config", "PID Auto Tune", "Run"), "Run"
)
case (
PIDAutotuneState.STATE_READY
| PIDAutotuneState.STATE_RELAY_STEP_UP
| PIDAutotuneState.STATE_RELAY_STEP_DOWN
):
self.change_params_title(
ch, ("PID Config", "PID Auto Tune", "Run"), "Stop"
)
case PIDAutotuneState.STATE_SUCCEEDED:
self.info_box.display_info_box(
"PID Autotune Success",
f"Channel {ch} PID Config has been loaded to Thermostat. Regulating temperature.",
)
case PIDAutotuneState.STATE_FAILED:
self.info_box.display_info_box(
"PID Autotune Failed",
f"Channel {ch} PID Autotune has failed.",
)
@asyncSlot(int)
async def load_settings(self, ch):
await self.thermostat.load_cfg(ch)
self.info_box.display_info_box(
f"Channel {ch} settings loaded",
f"Channel {ch} settings has been loaded from flash.",
)
@asyncSlot(int)
async def save_settings(self, ch):
await self.thermostat.save_cfg(ch)
self.info_box.display_info_box(
f"Channel {ch} settings saved",
f"Channel {ch} settings has been saved to flash.\n"
"It will be loaded on Thermostat reset, or when settings are explicitly loaded.",
)
@asyncSlot()
async def pid_auto_tune_request(self, ch=0):
match self.autotuners.get_state(ch):
case PIDAutotuneState.STATE_OFF | PIDAutotuneState.STATE_FAILED:
self.autotuners.load_params_and_set_ready(ch)
case (
PIDAutotuneState.STATE_READY
| PIDAutotuneState.STATE_RELAY_STEP_UP
| PIDAutotuneState.STATE_RELAY_STEP_DOWN
):
await self.autotuners.stop_pid_from_running(ch)

View File

@ -0,0 +1,336 @@
{
"ctrl_panel": [
{
"name": "Temperature",
"type": "float",
"format": "{value:.4f} °C",
"readonly": true
},
{
"name": "Current through TEC",
"type": "float",
"suffix": "mA",
"decimals": 6,
"readonly": true
},
{
"name": "Output Config",
"expanded": true,
"type": "group",
"children": [
{
"name": "Control Method",
"type": "mutex",
"limits": [
"Constant Current",
"Temperature PID"
],
"value": "Constant Current",
"thermostat:set_param": {
"topic": "output",
"field": "pid"
},
"children": [
{
"name": "Set Current",
"type": "float",
"value": 0,
"step": 100,
"limits": [
-2000,
2000
],
"triggerOnShow": true,
"decimals": 6,
"suffix": "mA",
"thermostat:set_param": {
"topic": "output",
"field": "i_set"
},
"lock": false
},
{
"name": "Set Temperature",
"type": "float",
"value": 25,
"step": 0.1,
"limits": [
-273,
300
],
"format": "{value:.4f} °C",
"thermostat:set_param": {
"topic": "pid",
"field": "target"
},
"lock": false
}
]
},
{
"name": "Limits",
"expanded": true,
"type": "group",
"children": [
{
"name": "Max Cooling Current",
"type": "float",
"value": 0,
"step": 100,
"decimals": 6,
"limits": [
0,
2000
],
"suffix": "mA",
"thermostat:set_param": {
"topic": "output",
"field": "max_i_pos"
},
"lock": false
},
{
"name": "Max Heating Current",
"type": "float",
"value": 0,
"step": 100,
"decimals": 6,
"limits": [
0,
2000
],
"suffix": "mA",
"thermostat:set_param": {
"topic": "output",
"field": "max_i_neg"
},
"lock": false
},
{
"name": "Max Voltage Difference",
"type": "float",
"value": 0,
"step": 0.1,
"limits": [
0,
5
],
"siPrefix": true,
"suffix": "V",
"thermostat:set_param": {
"topic": "output",
"field": "max_v"
},
"lock": false
}
]
}
]
},
{
"name": "Thermistor Config",
"expanded": true,
"type": "group",
"children": [
{
"name": "T₀",
"type": "float",
"value": 25,
"step": 0.1,
"limits": [
-100,
100
],
"format": "{value:.4f} °C",
"thermostat:set_param": {
"topic": "s-h",
"field": "t0"
},
"lock": false
},
{
"name": "R₀",
"type": "float",
"value": 10000,
"step": 1,
"siPrefix": true,
"suffix": "Ω",
"thermostat:set_param": {
"topic": "s-h",
"field": "r0"
},
"lock": false
},
{
"name": "B",
"type": "float",
"value": 3950,
"step": 1,
"suffix": "K",
"decimals": 4,
"thermostat:set_param": {
"topic": "s-h",
"field": "b"
},
"lock": false
},
{
"name": "Postfilter Rate",
"type": "list",
"value": 16.67,
"thermostat:set_param": {
"topic": "postfilter",
"field": "rate"
},
"limits": {
"Off": null,
"16.67 Hz": 16.67,
"20 Hz": 20.0,
"21.25 Hz": 21.25,
"27 Hz": 27.0
},
"lock": false
}
]
},
{
"name": "PID Config",
"expanded": true,
"type": "group",
"children": [
{
"name": "Kp",
"type": "float",
"step": 0.1,
"suffix": "",
"thermostat:set_param": {
"topic": "pid",
"field": "kp"
},
"lock": false
},
{
"name": "Ki",
"type": "float",
"step": 0.1,
"suffix": "Hz",
"thermostat:set_param": {
"topic": "pid",
"field": "ki"
},
"lock": false
},
{
"name": "Kd",
"type": "float",
"step": 0.1,
"suffix": "s",
"thermostat:set_param": {
"topic": "pid",
"field": "kd"
},
"lock": false
},
{
"name": "PID Output Clamping",
"expanded": true,
"type": "group",
"children": [
{
"name": "Minimum",
"type": "float",
"step": 100,
"limits": [
-2000,
2000
],
"decimals": 6,
"suffix": "mA",
"thermostat:set_param": {
"topic": "pid",
"field": "output_min"
},
"lock": false
},
{
"name": "Maximum",
"type": "float",
"step": 100,
"limits": [
-2000,
2000
],
"decimals": 6,
"suffix": "mA",
"thermostat:set_param": {
"topic": "pid",
"field": "output_max"
},
"lock": false
}
]
},
{
"name": "PID Auto Tune",
"expanded": false,
"type": "group",
"children": [
{
"name": "Target Temperature",
"type": "float",
"value": 20,
"step": 0.1,
"format": "{value:.4f} °C",
"pid_autotune": "target_temp"
},
{
"name": "Test Current",
"type": "float",
"value": 0,
"decimals": 6,
"step": 100,
"limits": [
-2000,
2000
],
"suffix": "mA",
"pid_autotune": "test_current"
},
{
"name": "Temperature Swing",
"type": "float",
"value": 1.5,
"step": 0.1,
"prefix": "±",
"format": "{value:.4f} °C",
"pid_autotune": "temp_swing"
},
{
"name": "Lookback",
"type": "float",
"value": 3.0,
"step": 0.1,
"format": "{value:.4f} s",
"pid_autotune": "lookback"
},
{
"name": "Run",
"type": "action",
"tip": "Run"
}
]
}
]
},
{
"name": "Save to flash",
"type": "action",
"tip": "Save config to thermostat, applies on reset"
},
{
"name": "Load from flash",
"type": "action",
"tip": "Load config from flash"
}
]
}

View File

@ -4,6 +4,7 @@ import asyncio
import logging
import argparse
import importlib.resources
import json
from PyQt6 import QtWidgets, QtGui, uic
from PyQt6.QtCore import pyqtSlot
import qasync
@ -12,6 +13,7 @@ from pythermostat.autotune import PIDAutotuneState
from pythermostat.gui.model.thermostat import Thermostat, ThermostatConnectionState
from pythermostat.gui.model.pid_autotuner import PIDAutoTuner
from pythermostat.gui.view.connection_details_menu import ConnectionDetailsMenu
from pythermostat.gui.view.ctrl_panel import CtrlPanel
from pythermostat.gui.view.info_box import InfoBox
from pythermostat.gui.view.live_plot_view import LiveDataPlotter
from pythermostat.gui.view.plot_options_menu import PlotOptionsMenu
@ -37,6 +39,12 @@ def get_argparser():
choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
help="Set the logging level",
)
parser.add_argument(
"-p",
"--param_tree",
default=importlib.resources.files("pythermostat.gui.view").joinpath("param_tree.json"),
help="Param Tree Description JSON File",
)
return parser
@ -44,7 +52,7 @@ def get_argparser():
class MainWindow(QtWidgets.QMainWindow):
NUM_CHANNELS = 2
def __init__(self):
def __init__(self, args):
super().__init__()
ui_file_path = importlib.resources.files("pythermostat.gui.view").joinpath("MainWindow.ui")
@ -80,6 +88,19 @@ class MainWindow(QtWidgets.QMainWindow):
self._thermostat.connection_error.connect(handle_connection_error)
# Control Panel
def get_ctrl_panel_config(args):
with open(args.param_tree, "r", encoding="utf-8") as f:
return json.load(f)["ctrl_panel"]
self._ctrl_panel_view = CtrlPanel(
self._thermostat,
self._autotuners,
self._info_box,
[self.ch0_tree, self.ch1_tree],
get_ctrl_panel_config(args),
)
# Graphs
self._channel_graphs = LiveDataPlotter(
self._thermostat,
@ -212,7 +233,7 @@ async def coro_main():
)
)
main_window = MainWindow()
main_window = MainWindow(args)
main_window.show()
if args.connect: