import types

from PyQt6.QtCore import pyqtSignal, QObject, QSignalBlocker, pyqtSlot
import pyqtgraph.parametertree.parameterTypes as pTypes
from pyqtgraph import functions as fn
from pyqtgraph.parametertree import (
    Parameter,
    registerParameterType,
)


class MutexParameter(pTypes.ListParameter):
    """
    Mutually exclusive parameter where only one of its children is visible at a
    time, list selectable.

    The ordering of the list items determines which children will be visible.
    """

    def __init__(self, **opts):
        """Create the parameter and show the child matching the initial value."""
        super().__init__(**opts)

        self.sigValueChanged.connect(self.show_chosen_child)
        # Emit once by hand so the initial selection is applied right away.
        self.sigValueChanged.emit(self, self.opts["value"])

    def _get_param_from_value(self, value):
        """
        Return the child whose position matches the position of ``value`` in
        the ``limits`` option.

        ``limits`` may be a dict (its values are used, in order) or a sequence.
        Raises ``ValueError`` if ``value`` is not among the limits and
        ``IndexError`` if there is no child at that position.
        """
        limits = self.opts["limits"]
        values_list = list(limits.values()) if isinstance(limits, dict) else limits
        return self.children()[values_list.index(value)]

    @pyqtSlot(object, object)
    def show_chosen_child(self, value):
        """
        Hide every child, then show the one selected by the current value.

        ``value`` is dereferenced with ``.value()``, i.e. it is used as the
        parameter object rather than as a plain value.
        NOTE(review): ``sigValueChanged`` is emitted with ``(param, value)``
        while this slot accepts a single argument; this presumably relies on
        the slot being invoked with only the first signal argument - confirm.
        """
        for param in self.children():
            param.hide()

        child_to_show = self._get_param_from_value(value.value())
        child_to_show.show()

        # Children flagged with "triggerOnShow" re-announce their value when
        # they become the visible one.
        if child_to_show.opts.get("triggerOnShow"):
            child_to_show.sigValueChanged.emit(child_to_show, child_to_show.value())


registerParameterType("mutex", MutexParameter)


class CtrlPanel(QObject):
    """
    Builds one parameter tree per channel and pushes incoming settings/report
    data into the matching tree entries.
    """

    # Emitted by update_pwm with a list holding, per channel, the set of limit
    # names whose reported value is 0.0.
    set_zero_limits_warning_sig = pyqtSignal(list)

    def __init__(
        self,
        trees_ui,
        param_tree,
        sigTreeStateChanged_handle,
        sigActivated_handles,
        parent=None,
    ):
        """
        :param trees_ui: sequence of tree widgets, one per channel; its length
            defines the number of channels.
        :param param_tree: children description handed to ``Parameter.create``
            for every channel (the same object is reused for each channel).
        :param sigTreeStateChanged_handle: callable connected to every
            channel's ``sigTreeStateChanged``.
        :param sigActivated_handles: per channel, an iterable of
            ``(child_path, callback)`` pairs; ``callback`` is connected to the
            ``sigActivated`` signal of the child found at ``child_path``.
        :param parent: optional QObject parent.
        """
        super().__init__(parent)

        self.trees_ui = trees_ui
        self.NUM_CHANNELS = len(trees_ui)

        self.THERMOSTAT_PARAMETERS = [param_tree for _ in range(self.NUM_CHANNELS)]

        self.params = [
            Parameter.create(
                name=f"Thermostat Channel {ch} Parameters",
                type="group",
                value=ch,
                children=self.THERMOSTAT_PARAMETERS[ch],
            )
            for ch in range(self.NUM_CHANNELS)
        ]

        for i, param in enumerate(self.params):
            param.channel = i

        for i, tree in enumerate(self.trees_ui):
            param = self.params[i]

            tree.setHeaderHidden(True)
            tree.setParameters(param, showTop=False)

            # Bind the replacement to the parameter itself: _setValue operates
            # on Parameter state (opts, sigValueChanged, _interpretValue), so
            # its ``self`` must be the parameter, not this panel.
            param.setValue = types.MethodType(type(self)._setValue, param)

            param.sigTreeStateChanged.connect(sigTreeStateChanged_handle)

            for handle in sigActivated_handles[i]:
                param.child(*handle[0]).sigActivated.connect(handle[1])

    def _setValue(self, value, blockSignal=None):
        """
        Implement 'lock' mechanism for Parameter Type

        Modified from the source

        This function is installed as ``setValue`` on a parameter, so ``self``
        is that parameter. It behaves like a regular value setter except that
        the stored value is left untouched while the parameter's ``lock``
        option is truthy.

        :param value: new value; passed through ``_interpretValue`` first.
        :param blockSignal: optional slot that is disconnected from
            ``sigValueChanged`` for the duration of the call and reconnected
            afterwards.
        :return: the interpreted value when nothing was stored (unchanged or
            locked), otherwise the stored value after the change signal ran.
        """
        try:
            if blockSignal is not None:
                self.sigValueChanged.disconnect(blockSignal)

            value = self._interpretValue(value)
            if fn.eq(self.opts["value"], value):
                return value

            if self.opts.get("lock"):
                return value

            self.opts["value"] = value
            # value might change after signal is received by tree item
            self.sigValueChanged.emit(self, value)
        finally:
            if blockSignal is not None:
                self.sigValueChanged.connect(blockSignal)

        return self.opts["value"]

    def change_params_title(self, channel, path, title):
        """Set the title of the child at ``path`` in ``channel``'s tree."""
        self.params[channel].child(*path).setOpts(title=title)

    @pyqtSlot("QVariantList")
    def update_pid(self, pid_settings):
        """
        Copy PID settings into the trees.

        Each entry is read via ``channel``, ``target`` and
        ``parameters[kp|ki|kd|output_min|output_max]``.
        """
        for settings in pid_settings:
            params = self.params[settings["channel"]]
            pid = settings["parameters"]
            with QSignalBlocker(params):
                params.child("PID Config", "Kp").setValue(pid["kp"])
                params.child("PID Config", "Ki").setValue(pid["ki"])
                params.child("PID Config", "Kd").setValue(pid["kd"])
                # x1000: presumably A -> mA for display; confirm against the
                # units declared in the parameter tree.
                params.child(
                    "PID Config", "PID Output Clamping", "Minimum"
                ).setValue(pid["output_min"] * 1000)
                params.child(
                    "PID Config", "PID Output Clamping", "Maximum"
                ).setValue(pid["output_max"] * 1000)
                params.child(
                    "Output Config", "Control Method", "Set Temperature"
                ).setValue(settings["target"])

    @pyqtSlot("QVariantList")
    def update_report(self, report_data):
        """
        Copy report data into the trees.

        Each entry is read via ``channel``, ``pid_engaged``, ``i_set``,
        ``temperature`` and ``tec_i``; the latter two are skipped when None.
        """
        for settings in report_data:
            params = self.params[settings["channel"]]
            with QSignalBlocker(params):
                params.child("Output Config", "Control Method").setValue(
                    "Temperature PID" if settings["pid_engaged"] else "Constant Current"
                )
                # x1000: presumably A -> mA for display; confirm.
                params.child(
                    "Output Config", "Control Method", "Set Current"
                ).setValue(settings["i_set"] * 1000)
                if settings["temperature"] is not None:
                    params.child("Temperature").setValue(settings["temperature"])
                if settings["tec_i"] is not None:
                    params.child("Current through TEC").setValue(
                        settings["tec_i"] * 1000
                    )

    @pyqtSlot("QVariantList")
    def update_thermistor(self, sh_data):
        """
        Copy thermistor parameters into the trees.

        Each entry is read via ``channel`` and ``params[t0|r0|b]``.
        """
        for sh_param in sh_data:
            params = self.params[sh_param["channel"]]
            values = sh_param["params"]
            with QSignalBlocker(params):
                # -273.15: presumably Kelvin -> degrees Celsius; confirm.
                params.child("Thermistor Config", "T₀").setValue(
                    values["t0"] - 273.15
                )
                params.child("Thermistor Config", "R₀").setValue(values["r0"])
                params.child("Thermistor Config", "B").setValue(values["b"])

    @pyqtSlot("QVariantList")
    def update_pwm(self, pwm_data):
        """
        Copy output limits into the trees and report limits that are zero.

        Each entry is read via ``channel`` and ``max_v``/``max_i_pos``/
        ``max_i_neg`` (each with a ``value`` key). After all entries are
        processed, ``set_zero_limits_warning_sig`` is emitted once with a list
        of per-channel sets naming the limits whose value equals 0.0.
        """
        channels_zeroed_limits = [set() for _ in range(self.NUM_CHANNELS)]

        for pwm_params in pwm_data:
            channel = pwm_params["channel"]
            params = self.params[channel]
            with QSignalBlocker(params):
                params.child(
                    "Output Config", "Limits", "Max Voltage Difference"
                ).setValue(pwm_params["max_v"]["value"])
                # x1000: presumably A -> mA for display; confirm.
                params.child(
                    "Output Config", "Limits", "Max Cooling Current"
                ).setValue(pwm_params["max_i_pos"]["value"] * 1000)
                params.child(
                    "Output Config", "Limits", "Max Heating Current"
                ).setValue(pwm_params["max_i_neg"]["value"] * 1000)

            for limit in "max_i_pos", "max_i_neg", "max_v":
                if pwm_params[limit]["value"] == 0.0:
                    channels_zeroed_limits[channel].add(limit)

        self.set_zero_limits_warning_sig.emit(channels_zeroed_limits)

    @pyqtSlot("QVariantList")
    def update_postfilter(self, postfilter_data):
        """
        Copy the postfilter rate into the trees.

        Each entry is read via ``channel`` and ``rate``.
        """
        for postfilter_params in postfilter_data:
            params = self.params[postfilter_params["channel"]]
            with QSignalBlocker(params):
                params.child("Thermistor Config", "Postfilter Rate").setValue(
                    postfilter_params["rate"]
                )