forked from M-Labs/thermostat
pytec GUI: Incorporate autotuning
Co-authored-by: topquark12 <aw@m-labs.hk> Co-authored-by: linuswck <linuswck@m-labs.hk> Co-authored-by: Egor Savkin <es@m-labs.hk>
This commit is contained in:
parent
0de3c9452e
commit
c77e6f73f1
@ -17,6 +17,7 @@ class PIDAutotuneState(Enum):
|
|||||||
STATE_RELAY_STEP_DOWN = 'relay step down'
|
STATE_RELAY_STEP_DOWN = 'relay step down'
|
||||||
STATE_SUCCEEDED = 'succeeded'
|
STATE_SUCCEEDED = 'succeeded'
|
||||||
STATE_FAILED = 'failed'
|
STATE_FAILED = 'failed'
|
||||||
|
STATE_READY = "ready"
|
||||||
|
|
||||||
|
|
||||||
class PIDAutotune:
|
class PIDAutotune:
|
||||||
@ -56,6 +57,21 @@ class PIDAutotune:
|
|||||||
self._Ku = 0
|
self._Ku = 0
|
||||||
self._Pu = 0
|
self._Pu = 0
|
||||||
|
|
||||||
|
def setParam(self, target, step, noiseband, sampletime, lookback):
|
||||||
|
self._setpoint = target
|
||||||
|
self._outputstep = step
|
||||||
|
self._out_max = step
|
||||||
|
self._out_min = -step
|
||||||
|
self._noiseband = noiseband
|
||||||
|
self._inputs = deque(maxlen=round(lookback / sampletime))
|
||||||
|
|
||||||
|
def setReady(self):
|
||||||
|
self._state = PIDAutotuneState.STATE_READY
|
||||||
|
self._peak_count = 0
|
||||||
|
|
||||||
|
def setOff(self):
|
||||||
|
self._state = PIDAutotuneState.STATE_OFF
|
||||||
|
|
||||||
def state(self):
|
def state(self):
|
||||||
"""Get the current state."""
|
"""Get the current state."""
|
||||||
return self._state
|
return self._state
|
||||||
@ -81,6 +97,13 @@ class PIDAutotune:
|
|||||||
kd = divisors[2] * self._Ku * self._Pu
|
kd = divisors[2] * self._Ku * self._Pu
|
||||||
return PIDAutotune.PIDParams(kp, ki, kd)
|
return PIDAutotune.PIDParams(kp, ki, kd)
|
||||||
|
|
||||||
|
def get_tec_pid(self):
|
||||||
|
divisors = self._tuning_rules["tyreus-luyben"]
|
||||||
|
kp = self._Ku * divisors[0]
|
||||||
|
ki = divisors[1] * self._Ku / self._Pu
|
||||||
|
kd = divisors[2] * self._Ku * self._Pu
|
||||||
|
return kp, ki, kd
|
||||||
|
|
||||||
def run(self, input_val, time_input):
|
def run(self, input_val, time_input):
|
||||||
"""To autotune a system, this method must be called periodically.
|
"""To autotune a system, this method must be called periodically.
|
||||||
|
|
||||||
|
84
pytec/pytec/gui/model/pid_autotuner.py
Normal file
84
pytec/pytec/gui/model/pid_autotuner.py
Normal file
@ -0,0 +1,84 @@
|
|||||||
|
from PyQt6.QtCore import QObject, pyqtSlot, pyqtSignal
|
||||||
|
from qasync import asyncSlot
|
||||||
|
from autotune import PIDAutotuneState, PIDAutotune
|
||||||
|
|
||||||
|
|
||||||
|
class PIDAutoTuner(QObject):
|
||||||
|
autotune_state_changed = pyqtSignal(int, PIDAutotuneState)
|
||||||
|
|
||||||
|
def __init__(self, parent, thermostat, num_of_channel):
|
||||||
|
super().__init__(parent)
|
||||||
|
|
||||||
|
self._thermostat = thermostat
|
||||||
|
self._thermostat.report_update.connect(self.tick)
|
||||||
|
|
||||||
|
self.autotuners = [PIDAutotune(25) for _ in range(num_of_channel)]
|
||||||
|
self.target_temp = [20.0 for _ in range(num_of_channel)]
|
||||||
|
self.test_current = [1.0 for _ in range(num_of_channel)]
|
||||||
|
self.temp_swing = [1.5 for _ in range(num_of_channel)]
|
||||||
|
self.lookback = [3.0 for _ in range(num_of_channel)]
|
||||||
|
self.sampling_interval = [1 / 16.67 for _ in range(num_of_channel)]
|
||||||
|
|
||||||
|
def set_params(self, params_name, ch, val):
|
||||||
|
getattr(self, params_name)[ch] = val
|
||||||
|
|
||||||
|
def get_state(self, ch):
|
||||||
|
return self.autotuners[ch].state()
|
||||||
|
|
||||||
|
def load_params_and_set_ready(self, ch):
|
||||||
|
self.autotuners[ch].setParam(
|
||||||
|
self.target_temp[ch],
|
||||||
|
self.test_current[ch] / 1000,
|
||||||
|
self.temp_swing[ch],
|
||||||
|
1 / self.sampling_interval[ch],
|
||||||
|
self.lookback[ch],
|
||||||
|
)
|
||||||
|
self.autotuners[ch].setReady()
|
||||||
|
self.autotune_state_changed.emit(ch, self.autotuners[ch].state())
|
||||||
|
|
||||||
|
async def stop_pid_from_running(self, ch):
|
||||||
|
self.autotuners[ch].setOff()
|
||||||
|
self.autotune_state_changed.emit(ch, self.autotuners[ch].state())
|
||||||
|
if self._thermostat.connected():
|
||||||
|
await self._thermostat.set_param("pwm", ch, "i_set", 0)
|
||||||
|
|
||||||
|
@asyncSlot(list)
|
||||||
|
async def tick(self, report):
|
||||||
|
for channel_report in report:
|
||||||
|
ch = channel_report["channel"]
|
||||||
|
|
||||||
|
self.sampling_interval[ch] = channel_report["interval"]
|
||||||
|
|
||||||
|
# TODO: Skip when PID Autotune or emit error message if NTC is not connected
|
||||||
|
if channel_report["temperature"] is None:
|
||||||
|
continue
|
||||||
|
|
||||||
|
match self.autotuners[ch].state():
|
||||||
|
case (
|
||||||
|
PIDAutotuneState.STATE_READY
|
||||||
|
| PIDAutotuneState.STATE_RELAY_STEP_UP
|
||||||
|
| PIDAutotuneState.STATE_RELAY_STEP_DOWN
|
||||||
|
):
|
||||||
|
self.autotuners[ch].run(
|
||||||
|
channel_report["temperature"], channel_report["time"]
|
||||||
|
)
|
||||||
|
await self._thermostat.set_param(
|
||||||
|
"pwm", ch, "i_set", self.autotuners[ch].output()
|
||||||
|
)
|
||||||
|
case PIDAutotuneState.STATE_SUCCEEDED:
|
||||||
|
kp, ki, kd = self.autotuners[ch].get_tec_pid()
|
||||||
|
self.autotuners[ch].setOff()
|
||||||
|
self.autotune_state_changed.emit(ch, self.autotuners[ch].state())
|
||||||
|
|
||||||
|
await self._thermostat.set_param("pid", ch, "kp", kp)
|
||||||
|
await self._thermostat.set_param("pid", ch, "ki", ki)
|
||||||
|
await self._thermostat.set_param("pid", ch, "kd", kd)
|
||||||
|
await self._thermostat.set_param("pwm", ch, "pid")
|
||||||
|
|
||||||
|
await self._thermostat.set_param(
|
||||||
|
"pid", ch, "target", self.target_temp[ch]
|
||||||
|
)
|
||||||
|
case PIDAutotuneState.STATE_FAILED:
|
||||||
|
self.autotuners[ch].setOff()
|
||||||
|
self.autotune_state_changed.emit(ch, self.autotuners[ch].state())
|
||||||
|
await self._thermostat.set_param("pwm", ch, "i_set", 0)
|
@ -8,7 +8,9 @@ from PyQt6 import QtWidgets, QtGui, uic
|
|||||||
from PyQt6.QtCore import pyqtSlot
|
from PyQt6.QtCore import pyqtSlot
|
||||||
import qasync
|
import qasync
|
||||||
from qasync import asyncSlot, asyncClose
|
from qasync import asyncSlot, asyncClose
|
||||||
|
from autotune import PIDAutotuneState
|
||||||
from pytec.gui.model.thermostat import Thermostat, ThermostatConnectionState
|
from pytec.gui.model.thermostat import Thermostat, ThermostatConnectionState
|
||||||
|
from pytec.gui.model.pid_autotuner import PIDAutoTuner
|
||||||
from pytec.gui.view.zero_limits_warning_view import ZeroLimitsWarningView
|
from pytec.gui.view.zero_limits_warning_view import ZeroLimitsWarningView
|
||||||
from pytec.gui.view.thermostat_settings_menu import ThermostatSettingsMenu
|
from pytec.gui.view.thermostat_settings_menu import ThermostatSettingsMenu
|
||||||
from pytec.gui.view.connection_details_menu import ConnectionDetailsMenu
|
from pytec.gui.view.connection_details_menu import ConnectionDetailsMenu
|
||||||
@ -55,6 +57,19 @@ class MainWindow(QtWidgets.QMainWindow):
|
|||||||
self._on_connection_state_changed
|
self._on_connection_state_changed
|
||||||
)
|
)
|
||||||
|
|
||||||
|
self._autotuners = PIDAutoTuner(self, self._thermostat, 2)
|
||||||
|
self._autotuners.autotune_state_changed.connect(
|
||||||
|
self._on_pid_autotune_state_changed
|
||||||
|
)
|
||||||
|
|
||||||
|
# Handlers for disconnections
|
||||||
|
async def autotune_disconnect():
|
||||||
|
for ch in range(self.NUM_CHANNELS):
|
||||||
|
if self._autotuners.get_state(ch) != PIDAutotuneState.STATE_OFF:
|
||||||
|
await self._autotuners.stop_pid_from_running(ch)
|
||||||
|
|
||||||
|
self._thermostat.disconnect_cb = autotune_disconnect
|
||||||
|
|
||||||
@pyqtSlot()
|
@pyqtSlot()
|
||||||
def handle_connection_error():
|
def handle_connection_error():
|
||||||
self._info_box.display_info_box(
|
self._info_box.display_info_box(
|
||||||
@ -117,6 +132,28 @@ class MainWindow(QtWidgets.QMainWindow):
|
|||||||
self.connect_btn.setText("Connect")
|
self.connect_btn.setText("Connect")
|
||||||
self.status_lbl.setText("Disconnected")
|
self.status_lbl.setText("Disconnected")
|
||||||
|
|
||||||
|
@pyqtSlot(int, PIDAutotuneState)
|
||||||
|
def _on_pid_autotune_state_changed(self, _ch, _state):
|
||||||
|
autotuning_channels = []
|
||||||
|
for ch in range(self.NUM_CHANNELS):
|
||||||
|
if self._autotuners.get_state(ch) in {
|
||||||
|
PIDAutotuneState.STATE_READY,
|
||||||
|
PIDAutotuneState.STATE_RELAY_STEP_UP,
|
||||||
|
PIDAutotuneState.STATE_RELAY_STEP_DOWN,
|
||||||
|
}:
|
||||||
|
autotuning_channels.append(ch)
|
||||||
|
|
||||||
|
if len(autotuning_channels) == 0:
|
||||||
|
self.background_task_lbl.setText("Ready.")
|
||||||
|
self.loading_spinner.hide()
|
||||||
|
self.loading_spinner.stop()
|
||||||
|
else:
|
||||||
|
self.background_task_lbl.setText(
|
||||||
|
f"Autotuning channel {autotuning_channels}..."
|
||||||
|
)
|
||||||
|
self.loading_spinner.start()
|
||||||
|
self.loading_spinner.show()
|
||||||
|
|
||||||
@asyncSlot()
|
@asyncSlot()
|
||||||
async def on_connect_btn_clicked(self):
|
async def on_connect_btn_clicked(self):
|
||||||
match self._thermostat.connection_state:
|
match self._thermostat.connection_state:
|
||||||
|
Loading…
Reference in New Issue
Block a user