pytec GUI: Implement Control Panel

Co-authored-by: linuswck <linuswck@m-labs.hk>
Co-authored-by: Egor Savkin <es@m-labs.hk>
This commit is contained in:
atse 2024-11-04 13:09:01 +08:00
parent 3648cab1f8
commit c5ad9a355e
3 changed files with 665 additions and 2 deletions

View File

@ -0,0 +1,306 @@
from functools import partial
from PyQt6.QtCore import pyqtSignal, QObject, QSignalBlocker, pyqtSlot
import pyqtgraph.parametertree.parameterTypes as pTypes
from pyqtgraph.parametertree import (
Parameter,
registerParameterType,
)
from qasync import asyncSlot
from autotune import PIDAutotuneState
class MutexParameter(pTypes.ListParameter):
"""
Mutually exclusive parameter where only one of its children is visible at a time, list selectable.
The ordering of the list items determines which children will be visible.
"""
def __init__(self, **opts):
super().__init__(**opts)
self.sigValueChanged.connect(self.show_chosen_child)
self.sigValueChanged.emit(self, self.opts["value"])
def _get_param_from_value(self, value):
if isinstance(self.opts["limits"], dict):
values_list = list(self.opts["limits"].values())
else:
values_list = self.opts["limits"]
return self.children()[values_list.index(value)]
@pyqtSlot(object, object)
def show_chosen_child(self, value):
for param in self.children():
param.hide()
child_to_show = self._get_param_from_value(value.value())
child_to_show.show()
if child_to_show.opts.get("triggerOnShow", None):
child_to_show.sigValueChanged.emit(child_to_show, child_to_show.value())
registerParameterType("mutex", MutexParameter)
class CtrlPanel(QObject):
def __init__(
self,
thermostat,
autotuners,
info_box,
trees_ui,
param_tree,
parent=None,
):
super().__init__(parent)
self.thermostat = thermostat
self.autotuners = autotuners
self.info_box = info_box
self.trees_ui = trees_ui
self.NUM_CHANNELS = len(trees_ui)
self.THERMOSTAT_PARAMETERS = [param_tree for i in range(self.NUM_CHANNELS)]
self.params = [
Parameter.create(
name=f"Thermostat Channel {ch} Parameters",
type="group",
value=ch,
children=self.THERMOSTAT_PARAMETERS[ch],
)
for ch in range(self.NUM_CHANNELS)
]
for i, param in enumerate(self.params):
param.channel = i
for i, tree in enumerate(self.trees_ui):
tree.setHeaderHidden(True)
tree.setParameters(self.params[i], showTop=False)
self.params[i].setValue = self._setValue
self.params[i].sigTreeStateChanged.connect(self.send_command)
self.params[i].child("Save to flash").sigActivated.connect(
partial(self.save_settings, i)
)
self.params[i].child("Load from flash").sigActivated.connect(
partial(self.load_settings, i)
)
self.params[i].child(
"PID Config", "PID Auto Tune", "Run"
).sigActivated.connect(partial(self.pid_auto_tune_request, i))
self.thermostat.pid_update.connect(self.update_pid)
self.thermostat.report_update.connect(self.update_report)
self.thermostat.thermistor_update.connect(self.update_thermistor)
self.thermostat.output_update.connect(self.update_output)
self.thermostat.postfilter_update.connect(self.update_postfilter)
self.autotuners.autotune_state_changed.connect(self.update_pid_autotune)
def _setValue(self, value, blockSignal=None):
"""
Implement 'lock' mechanism for Parameter Type
Modified from the source
"""
try:
if blockSignal is not None:
self.sigValueChanged.disconnect(blockSignal)
value = self._interpretValue(value)
if fn.eq(self.opts["value"], value):
return value
if "lock" in self.opts.keys():
if self.opts["lock"]:
return value
self.opts["value"] = value
self.sigValueChanged.emit(
self, value
) # value might change after signal is received by tree item
finally:
if blockSignal is not None:
self.sigValueChanged.connect(blockSignal)
return self.opts["value"]
def change_params_title(self, channel, path, title):
self.params[channel].child(*path).setOpts(title=title)
@asyncSlot(object, object)
async def send_command(self, param, changes):
"""Translates parameter tree changes into thermostat set_param calls"""
ch = param.channel
for inner_param, change, data in changes:
if change == "value":
new_value = data
if "thermostat:set_param" in inner_param.opts:
if inner_param.opts.get("suffix", None) == "mA":
new_value /= 1000 # Given in mA
thermostat_param = inner_param.opts["thermostat:set_param"]
# Handle thermostat command irregularities
match inner_param.name(), new_value:
case "Postfilter Rate", None:
thermostat_param = thermostat_param.copy()
thermostat_param["field"] = "off"
new_value = ""
case "Control Method", "Constant Current":
return
case "Control Method", "Temperature PID":
new_value = ""
inner_param.setOpts(lock=True)
await self.thermostat.set_param(
channel=ch, value=new_value, **thermostat_param
)
inner_param.setOpts(lock=False)
if "pid_autotune" in inner_param.opts:
auto_tuner_param = inner_param.opts["pid_autotune"]
self.autotuners.set_params(auto_tuner_param, ch, new_value)
@pyqtSlot(list)
def update_pid(self, pid_settings):
for settings in pid_settings:
channel = settings["channel"]
with QSignalBlocker(self.params[channel]):
self.params[channel].child("PID Config", "Kp").setValue(
settings["parameters"]["kp"]
)
self.params[channel].child("PID Config", "Ki").setValue(
settings["parameters"]["ki"]
)
self.params[channel].child("PID Config", "Kd").setValue(
settings["parameters"]["kd"]
)
self.params[channel].child(
"PID Config", "PID Output Clamping", "Minimum"
).setValue(settings["parameters"]["output_min"] * 1000)
self.params[channel].child(
"PID Config", "PID Output Clamping", "Maximum"
).setValue(settings["parameters"]["output_max"] * 1000)
self.params[channel].child(
"Output Config", "Control Method", "Set Temperature"
).setValue(settings["target"])
@pyqtSlot(list)
def update_report(self, report_data):
for settings in report_data:
channel = settings["channel"]
with QSignalBlocker(self.params[channel]):
self.params[channel].child("Output Config", "Control Method").setValue(
"Temperature PID" if settings["pid_engaged"] else "Constant Current"
)
self.params[channel].child(
"Output Config", "Control Method", "Set Current"
).setValue(settings["i_set"] * 1000)
if settings["temperature"] is not None:
self.params[channel].child("Temperature").setValue(
settings["temperature"]
)
if settings["tec_i"] is not None:
self.params[channel].child("Current through TEC").setValue(
settings["tec_i"] * 1000
)
@pyqtSlot(list)
def update_thermistor(self, sh_data):
for sh_param in sh_data:
channel = sh_param["channel"]
with QSignalBlocker(self.params[channel]):
self.params[channel].child("Thermistor Config", "T₀").setValue(
sh_param["params"]["t0"] - 273.15
)
self.params[channel].child("Thermistor Config", "R₀").setValue(
sh_param["params"]["r0"]
)
self.params[channel].child("Thermistor Config", "B").setValue(
sh_param["params"]["b"]
)
@pyqtSlot(list)
def update_output(self, output_data):
for output_params in output_data:
channel = output_params["channel"]
with QSignalBlocker(self.params[channel]):
self.params[channel].child(
"Output Config", "Limits", "Max Voltage Difference"
).setValue(output_params["max_v"])
self.params[channel].child(
"Output Config", "Limits", "Max Cooling Current"
).setValue(output_params["max_i_pos"] * 1000)
self.params[channel].child(
"Output Config", "Limits", "Max Heating Current"
).setValue(output_params["max_i_neg"] * 1000)
@pyqtSlot(list)
def update_postfilter(self, postfilter_data):
for postfilter_params in postfilter_data:
channel = postfilter_params["channel"]
with QSignalBlocker(self.params[channel]):
self.params[channel].child(
"Thermistor Config", "Postfilter Rate"
).setValue(postfilter_params["rate"])
def update_pid_autotune(self, ch, state):
match state:
case PIDAutotuneState.STATE_OFF:
self.change_params_title(
ch, ("PID Config", "PID Auto Tune", "Run"), "Run"
)
case (
PIDAutotuneState.STATE_READY
| PIDAutotuneState.STATE_RELAY_STEP_UP
| PIDAutotuneState.STATE_RELAY_STEP_DOWN
):
self.change_params_title(
ch, ("PID Config", "PID Auto Tune", "Run"), "Stop"
)
case PIDAutotuneState.STATE_SUCCEEDED:
self.info_box.display_info_box(
"PID Autotune Success",
f"Channel {ch} PID Config has been loaded to Thermostat. Regulating temperature.",
)
case PIDAutotuneState.STATE_FAILED:
self.info_box.display_info_box(
"PID Autotune Failed",
f"Channel {ch} PID Autotune has failed.",
)
@asyncSlot(int)
async def load_settings(self, ch):
await self.thermostat.load_cfg(ch)
self.info_box.display_info_box(
f"Channel {ch} settings loaded",
f"Channel {ch} settings has been loaded from flash.",
)
@asyncSlot(int)
async def save_settings(self, ch):
await self.thermostat.save_cfg(ch)
self.info_box.display_info_box(
f"Channel {ch} settings saved",
f"Channel {ch} settings has been saved to flash.\n"
"It will be loaded on Thermostat reset, or when settings are explicitly loaded.",
)
@asyncSlot()
async def pid_auto_tune_request(self, ch=0):
match self.autotuners.get_state(ch):
case PIDAutotuneState.STATE_OFF | PIDAutotuneState.STATE_FAILED:
self.autotuners.load_params_and_set_ready(ch)
case (
PIDAutotuneState.STATE_READY
| PIDAutotuneState.STATE_RELAY_STEP_UP
| PIDAutotuneState.STATE_RELAY_STEP_DOWN
):
await self.autotuners.stop_pid_from_running(ch)

View File

@ -0,0 +1,336 @@
{
"ctrl_panel": [
{
"name": "Temperature",
"type": "float",
"format": "{value:.4f} °C",
"readonly": true
},
{
"name": "Current through TEC",
"type": "float",
"suffix": "mA",
"decimals": 6,
"readonly": true
},
{
"name": "Output Config",
"expanded": true,
"type": "group",
"children": [
{
"name": "Control Method",
"type": "mutex",
"limits": [
"Constant Current",
"Temperature PID"
],
"value": "Constant Current",
"thermostat:set_param": {
"topic": "output",
"field": "pid"
},
"children": [
{
"name": "Set Current",
"type": "float",
"value": 0,
"step": 100,
"limits": [
-2000,
2000
],
"triggerOnShow": true,
"decimals": 6,
"suffix": "mA",
"thermostat:set_param": {
"topic": "output",
"field": "i_set"
},
"lock": false
},
{
"name": "Set Temperature",
"type": "float",
"value": 25,
"step": 0.1,
"limits": [
-273,
300
],
"format": "{value:.4f} °C",
"thermostat:set_param": {
"topic": "pid",
"field": "target"
},
"lock": false
}
]
},
{
"name": "Limits",
"expanded": true,
"type": "group",
"children": [
{
"name": "Max Cooling Current",
"type": "float",
"value": 0,
"step": 100,
"decimals": 6,
"limits": [
0,
2000
],
"suffix": "mA",
"thermostat:set_param": {
"topic": "output",
"field": "max_i_pos"
},
"lock": false
},
{
"name": "Max Heating Current",
"type": "float",
"value": 0,
"step": 100,
"decimals": 6,
"limits": [
0,
2000
],
"suffix": "mA",
"thermostat:set_param": {
"topic": "output",
"field": "max_i_neg"
},
"lock": false
},
{
"name": "Max Voltage Difference",
"type": "float",
"value": 0,
"step": 0.1,
"limits": [
0,
5
],
"siPrefix": true,
"suffix": "V",
"thermostat:set_param": {
"topic": "output",
"field": "max_v"
},
"lock": false
}
]
}
]
},
{
"name": "Thermistor Config",
"expanded": true,
"type": "group",
"children": [
{
"name": "T₀",
"type": "float",
"value": 25,
"step": 0.1,
"limits": [
-100,
100
],
"format": "{value:.4f} °C",
"thermostat:set_param": {
"topic": "s-h",
"field": "t0"
},
"lock": false
},
{
"name": "R₀",
"type": "float",
"value": 10000,
"step": 1,
"siPrefix": true,
"suffix": "Ω",
"thermostat:set_param": {
"topic": "s-h",
"field": "r0"
},
"lock": false
},
{
"name": "B",
"type": "float",
"value": 3950,
"step": 1,
"suffix": "K",
"decimals": 4,
"thermostat:set_param": {
"topic": "s-h",
"field": "b"
},
"lock": false
},
{
"name": "Postfilter Rate",
"type": "list",
"value": 16.67,
"thermostat:set_param": {
"topic": "postfilter",
"field": "rate"
},
"limits": {
"Off": null,
"16.67 Hz": 16.67,
"20 Hz": 20.0,
"21.25 Hz": 21.25,
"27 Hz": 27.0
},
"lock": false
}
]
},
{
"name": "PID Config",
"expanded": true,
"type": "group",
"children": [
{
"name": "Kp",
"type": "float",
"step": 0.1,
"suffix": "",
"thermostat:set_param": {
"topic": "pid",
"field": "kp"
},
"lock": false
},
{
"name": "Ki",
"type": "float",
"step": 0.1,
"suffix": "Hz",
"thermostat:set_param": {
"topic": "pid",
"field": "ki"
},
"lock": false
},
{
"name": "Kd",
"type": "float",
"step": 0.1,
"suffix": "s",
"thermostat:set_param": {
"topic": "pid",
"field": "kd"
},
"lock": false
},
{
"name": "PID Output Clamping",
"expanded": true,
"type": "group",
"children": [
{
"name": "Minimum",
"type": "float",
"step": 100,
"limits": [
-2000,
2000
],
"decimals": 6,
"suffix": "mA",
"thermostat:set_param": {
"topic": "pid",
"field": "output_min"
},
"lock": false
},
{
"name": "Maximum",
"type": "float",
"step": 100,
"limits": [
-2000,
2000
],
"decimals": 6,
"suffix": "mA",
"thermostat:set_param": {
"topic": "pid",
"field": "output_max"
},
"lock": false
}
]
},
{
"name": "PID Auto Tune",
"expanded": false,
"type": "group",
"children": [
{
"name": "Target Temperature",
"type": "float",
"value": 20,
"step": 0.1,
"format": "{value:.4f} °C",
"pid_autotune": "target_temp"
},
{
"name": "Test Current",
"type": "float",
"value": 0,
"decimals": 6,
"step": 100,
"limits": [
-2000,
2000
],
"suffix": "mA",
"pid_autotune": "test_current"
},
{
"name": "Temperature Swing",
"type": "float",
"value": 1.5,
"step": 0.1,
"prefix": "±",
"format": "{value:.4f} °C",
"pid_autotune": "temp_swing"
},
{
"name": "Lookback",
"type": "float",
"value": 3.0,
"step": 0.1,
"format": "{value:.4f} s",
"pid_autotune": "lookback"
},
{
"name": "Run",
"type": "action",
"tip": "Run"
}
]
}
]
},
{
"name": "Save to flash",
"type": "action",
"tip": "Save config to thermostat, applies on reset"
},
{
"name": "Load from flash",
"type": "action",
"tip": "Load config from flash"
}
]
}

View File

@ -4,6 +4,7 @@ import asyncio
import logging import logging
import argparse import argparse
import importlib.resources import importlib.resources
import json
from PyQt6 import QtWidgets, QtGui, uic from PyQt6 import QtWidgets, QtGui, uic
from PyQt6.QtCore import pyqtSlot from PyQt6.QtCore import pyqtSlot
import qasync import qasync
@ -16,6 +17,7 @@ from pytec.gui.view.thermostat_settings_menu import ThermostatSettingsMenu
from pytec.gui.view.connection_details_menu import ConnectionDetailsMenu from pytec.gui.view.connection_details_menu import ConnectionDetailsMenu
from pytec.gui.view.plot_options_menu import PlotOptionsMenu from pytec.gui.view.plot_options_menu import PlotOptionsMenu
from pytec.gui.view.live_plot_view import LiveDataPlotter from pytec.gui.view.live_plot_view import LiveDataPlotter
from pytec.gui.view.ctrl_panel import CtrlPanel
from pytec.gui.view.info_box import InfoBox from pytec.gui.view.info_box import InfoBox
@ -37,6 +39,12 @@ def get_argparser():
choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"], choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
help="Set the logging level", help="Set the logging level",
) )
parser.add_argument(
"-p",
"--param_tree",
default=importlib.resources.files("pytec.gui.view").joinpath("param_tree.json"),
help="Param Tree Description JSON File",
)
return parser return parser
@ -44,7 +52,7 @@ def get_argparser():
class MainWindow(QtWidgets.QMainWindow): class MainWindow(QtWidgets.QMainWindow):
NUM_CHANNELS = 2 NUM_CHANNELS = 2
def __init__(self): def __init__(self, args):
super().__init__() super().__init__()
ui_file_path = importlib.resources.files("pytec.gui.view").joinpath("tec_qt.ui") ui_file_path = importlib.resources.files("pytec.gui.view").joinpath("tec_qt.ui")
@ -80,6 +88,19 @@ class MainWindow(QtWidgets.QMainWindow):
self._thermostat.connection_error.connect(handle_connection_error) self._thermostat.connection_error.connect(handle_connection_error)
# Control Panel
def get_ctrl_panel_config(args):
with open(args.param_tree, "r", encoding="utf-8") as f:
return json.load(f)["ctrl_panel"]
self._ctrl_panel_view = CtrlPanel(
self._thermostat,
self._autotuners,
self._info_box,
[self.ch0_tree, self.ch1_tree],
get_ctrl_panel_config(args),
)
# Graphs # Graphs
self._channel_graphs = LiveDataPlotter( self._channel_graphs = LiveDataPlotter(
self._thermostat, self._thermostat,
@ -212,7 +233,7 @@ async def coro_main():
) )
) )
main_window = MainWindow() main_window = MainWindow(args)
main_window.show() main_window.show()
if args.connect: if args.connect: