from PyQt6.QtCore import QObject, pyqtSlot
from qasync import asyncSlot
from autotune import PIDAutotuneState, PIDAutotune


class PIDAutoTuner(QObject):
    """Drive one ``PIDAutotune`` instance per channel and push results to the client.

    Every tuning parameter is stored as a list indexed by channel number.
    """

    def __init__(self, parent, client, num_of_channel):
        """Create the per-channel autotuners and their default parameters.

        Args:
            parent: Accepted for call-site compatibility; it is not forwarded
                to ``QObject.__init__``.
            client: Object exposing an awaitable ``set_param(...)``.
            num_of_channel: Number of channels to allocate state for.
        """
        super().__init__()

        self._client = client

        # Each channel needs its own tuner object, so build them one by one.
        self.autotuners = [PIDAutotune(25) for _ in range(num_of_channel)]

        # Plain float defaults, one entry per channel.
        self.target_temp = [20.0] * num_of_channel
        self.test_current = [1.0] * num_of_channel
        self.temp_swing = [1.5] * num_of_channel
        self.lookback = [3.0] * num_of_channel
        self.sampling_interval = [1 / 16.67] * num_of_channel

    @pyqtSlot(list)
    def update_sampling_interval(self, interval):
        """Replace the whole per-channel sampling interval list."""
        self.sampling_interval = interval

    def set_params(self, params_name, ch, val):
        """Store ``val`` at index ``ch`` of the attribute named ``params_name``."""
        per_channel_values = getattr(self, params_name)
        per_channel_values[ch] = val

    def get_state(self, ch):
        """Return the current autotune state of channel ``ch``."""
        tuner = self.autotuners[ch]
        return tuner.state()

    def load_params_and_set_ready(self, ch):
        """Push the stored parameters of channel ``ch`` into its tuner and arm it."""
        tuner = self.autotuners[ch]

        target = self.target_temp[ch]
        # NOTE(review): the /1000 looks like a mA -> A conversion - confirm.
        scaled_current = self.test_current[ch] / 1000
        swing = self.temp_swing[ch]
        # The tuner receives the reciprocal of the stored sampling interval.
        inverse_interval = 1 / self.sampling_interval[ch]
        lookback = self.lookback[ch]

        tuner.setParam(target, scaled_current, swing, inverse_interval, lookback)
        tuner.setReady()

    async def stop_pid_from_running(self, ch):
        """Switch the tuner of channel ``ch`` off and set its ``i_set`` to 0."""
        tuner = self.autotuners[ch]
        tuner.setOff()
        await self._client.set_param("pwm", ch, "i_set", 0)

    @asyncSlot(list)
    async def tick(self, report):
        """Advance every channel's autotuner using the latest report.

        Args:
            report: Iterable of per-channel mappings; the keys read here are
                ``"temperature"``, ``"channel"`` and ``"time"``.
        """
        for channel_report in report:
            temperature = channel_report["temperature"]
            # TODO: Skip when PID Autotune or emit error message if NTC is not connected
            if temperature is None:
                continue

            ch = channel_report["channel"]
            tuner = self.autotuners[ch]
            state = tuner.state()

            if state in (
                PIDAutotuneState.STATE_READY,
                PIDAutotuneState.STATE_RELAY_STEP_UP,
                PIDAutotuneState.STATE_RELAY_STEP_DOWN,
            ):
                # Tuning in progress: feed the sample, then apply the output.
                tuner.run(temperature, channel_report["time"])
                await self._client.set_param("pwm", ch, "i_set", tuner.output())
            elif state == PIDAutotuneState.STATE_SUCCEEDED:
                # Tuning finished: upload the gains and switch to PID control.
                kp, ki, kd = tuner.get_tec_pid()
                tuner.setOff()
                for gain_name, gain in (("kp", kp), ("ki", ki), ("kd", kd)):
                    await self._client.set_param("pid", ch, gain_name, gain)
                await self._client.set_param("pwm", ch, "pid")
                await self._client.set_param(
                    "pid", ch, "target", self.target_temp[ch]
                )
            elif state == PIDAutotuneState.STATE_FAILED:
                # Tuning failed: switch the tuner off and zero the set current.
                tuner.setOff()
                await self._client.set_param("pwm", ch, "i_set", 0)