diff --git a/pythermostat/pythermostat/autotune.py b/pythermostat/pythermostat/autotune.py index dada469..7272952 100644 --- a/pythermostat/pythermostat/autotune.py +++ b/pythermostat/pythermostat/autotune.py @@ -18,6 +18,7 @@ class PIDAutotuneState(Enum): STATE_RELAY_STEP_DOWN = 'relay step down' STATE_SUCCEEDED = 'succeeded' STATE_FAILED = 'failed' + STATE_READY = "ready" class PIDAutotune: @@ -57,6 +58,21 @@ class PIDAutotune: self._Ku = 0 self._Pu = 0 + def setParam(self, target, step, noiseband, sampletime, lookback): + self._setpoint = target + self._outputstep = step + self._out_max = step + self._out_min = -step + self._noiseband = noiseband + self._inputs = deque(maxlen=round(lookback / sampletime)) + + def setReady(self): + self._state = PIDAutotuneState.STATE_READY + self._peak_count = 0 + + def setOff(self): + self._state = PIDAutotuneState.STATE_OFF + def state(self): """Get the current state.""" return self._state @@ -82,6 +98,13 @@ class PIDAutotune: kd = divisors[2] * self._Ku * self._Pu return PIDAutotune.PIDParams(kp, ki, kd) + def get_tec_pid(self): + divisors = self._tuning_rules["tyreus-luyben"] + kp = self._Ku * divisors[0] + ki = divisors[1] * self._Ku / self._Pu + kd = divisors[2] * self._Ku * self._Pu + return kp, ki, kd + def run(self, input_val, time_input): """To autotune a system, this method must be called periodically. diff --git a/pythermostat/pythermostat/gui/model/pid_autotuner.py b/pythermostat/pythermostat/gui/model/pid_autotuner.py new file mode 100644 index 0000000..c916a21 --- /dev/null +++ b/pythermostat/pythermostat/gui/model/pid_autotuner.py @@ -0,0 +1,84 @@ +from PyQt6.QtCore import QObject, pyqtSlot, pyqtSignal +from qasync import asyncSlot +from autotune import PIDAutotuneState, PIDAutotune + + +class PIDAutoTuner(QObject): + autotune_state_changed = pyqtSignal(int, PIDAutotuneState) + + def __init__(self, parent, thermostat, num_of_channel): + super().__init__(parent) + + self._thermostat = thermostat + self._thermostat.report_update.connect(self.tick) + + self.autotuners = [PIDAutotune(25) for _ in range(num_of_channel)] + self.target_temp = [20.0 for _ in range(num_of_channel)] + self.test_current = [1.0 for _ in range(num_of_channel)] + self.temp_swing = [1.5 for _ in range(num_of_channel)] + self.lookback = [3.0 for _ in range(num_of_channel)] + self.sampling_interval = [1 / 16.67 for _ in range(num_of_channel)] + + def set_params(self, params_name, ch, val): + getattr(self, params_name)[ch] = val + + def get_state(self, ch): + return self.autotuners[ch].state() + + def load_params_and_set_ready(self, ch): + self.autotuners[ch].setParam( + self.target_temp[ch], + self.test_current[ch] / 1000, + self.temp_swing[ch], + 1 / self.sampling_interval[ch], + self.lookback[ch], + ) + self.autotuners[ch].setReady() + self.autotune_state_changed.emit(ch, self.autotuners[ch].state()) + + async def stop_pid_from_running(self, ch): + self.autotuners[ch].setOff() + self.autotune_state_changed.emit(ch, self.autotuners[ch].state()) + if self._thermostat.connected(): + await self._thermostat.set_param("output", ch, "i_set", 0) + + @asyncSlot(list) + async def tick(self, report): + for channel_report in report: + ch = channel_report["channel"] + + self.sampling_interval[ch] = channel_report["interval"] + + # TODO: Skip when PID Autotune or emit error message if NTC is not connected + if channel_report["temperature"] is None: + continue + + match self.autotuners[ch].state(): + case ( + PIDAutotuneState.STATE_READY + | PIDAutotuneState.STATE_RELAY_STEP_UP + | PIDAutotuneState.STATE_RELAY_STEP_DOWN + ): + self.autotuners[ch].run( + channel_report["temperature"], channel_report["time"] + ) + await self._thermostat.set_param( + "output", ch, "i_set", self.autotuners[ch].output() + ) + case PIDAutotuneState.STATE_SUCCEEDED: + kp, ki, kd = self.autotuners[ch].get_tec_pid() + self.autotuners[ch].setOff() + self.autotune_state_changed.emit(ch, self.autotuners[ch].state()) + + await self._thermostat.set_param("pid", ch, "kp", kp) + await self._thermostat.set_param("pid", ch, "ki", ki) + await self._thermostat.set_param("pid", ch, "kd", kd) + await self._thermostat.set_param("output", ch, "pid") + + await self._thermostat.set_param( + "pid", ch, "target", self.target_temp[ch] + ) + case PIDAutotuneState.STATE_FAILED: + self.autotuners[ch].setOff() + self.autotune_state_changed.emit(ch, self.autotuners[ch].state()) + await self._thermostat.set_param("output", ch, "i_set", 0) diff --git a/pythermostat/pythermostat/thermostat_qt.py b/pythermostat/pythermostat/thermostat_qt.py index 31be0a1..ef707e6 100755 --- a/pythermostat/pythermostat/thermostat_qt.py +++ b/pythermostat/pythermostat/thermostat_qt.py @@ -8,7 +8,9 @@ from PyQt6 import QtWidgets, QtGui, uic from PyQt6.QtCore import pyqtSlot import qasync from qasync import asyncSlot, asyncClose +from pythermostat.autotune import PIDAutotuneState from pythermostat.gui.model.thermostat import Thermostat, ThermostatConnectionState +from pythermostat.gui.model.pid_autotuner import PIDAutoTuner from pythermostat.gui.view.connection_details_menu import ConnectionDetailsMenu from pythermostat.gui.view.info_box import InfoBox from pythermostat.gui.view.thermostat_settings_menu import ThermostatSettingsMenu @@ -55,6 +57,19 @@ class MainWindow(QtWidgets.QMainWindow): self._on_connection_state_changed ) + self._autotuners = PIDAutoTuner(self, self._thermostat, 2) + self._autotuners.autotune_state_changed.connect( + self._on_pid_autotune_state_changed + ) + + # Handlers for disconnections + async def autotune_disconnect(): + for ch in range(self.NUM_CHANNELS): + if self._autotuners.get_state(ch) != PIDAutotuneState.STATE_OFF: + await self._autotuners.stop_pid_from_running(ch) + + self._thermostat.disconnect_cb = autotune_disconnect + @pyqtSlot() def handle_connection_error(): self._info_box.display_info_box( @@ -117,6 +132,28 @@ class MainWindow(QtWidgets.QMainWindow): self.connect_btn.setText("Connect") self.status_lbl.setText("Disconnected") + @pyqtSlot(int, PIDAutotuneState) + def _on_pid_autotune_state_changed(self, _ch, _state): + autotuning_channels = [] + for ch in range(self.NUM_CHANNELS): + if self._autotuners.get_state(ch) in { + PIDAutotuneState.STATE_READY, + PIDAutotuneState.STATE_RELAY_STEP_UP, + PIDAutotuneState.STATE_RELAY_STEP_DOWN, + }: + autotuning_channels.append(ch) + + if len(autotuning_channels) == 0: + self.background_task_lbl.setText("Ready.") + self.loading_spinner.hide() + self.loading_spinner.stop() + else: + self.background_task_lbl.setText( + f"Autotuning channel {autotuning_channels}..." + ) + self.loading_spinner.start() + self.loading_spinner.show() + @asyncSlot() async def on_connect_btn_clicked(self): match self._thermostat.connection_state: