WIP: GUI: Thermostat Control Panel #147

Draft
atse wants to merge 10 commits from atse/thermostat:GUI into master
3 changed files with 144 additions and 0 deletions
Showing only changes of commit ca7a14f969 - Show all commits

View File

@ -18,6 +18,7 @@ class PIDAutotuneState(Enum):
STATE_RELAY_STEP_DOWN = 'relay step down'
STATE_SUCCEEDED = 'succeeded'
STATE_FAILED = 'failed'
STATE_READY = "ready"
class PIDAutotune:
@ -57,6 +58,21 @@ class PIDAutotune:
self._Ku = 0
self._Pu = 0
def setParam(self, target, step, noiseband, sampletime, lookback):
self._setpoint = target
self._outputstep = step
self._out_max = step
self._out_min = -step
self._noiseband = noiseband
self._inputs = deque(maxlen=round(lookback / sampletime))
def setReady(self):
atse marked this conversation as resolved Outdated
Outdated
Review

Do not mix camel case and underscore naming schemes in method names for the same class.

Do not mix camel case and underscore naming schemes in method names for the same class.
Outdated
Review

Corrected in #162.

Corrected in #162.
self._state = PIDAutotuneState.STATE_READY
self._peak_count = 0
def setOff(self):
self._state = PIDAutotuneState.STATE_OFF
def state(self):
"""Get the current state."""
return self._state
@ -82,6 +98,13 @@ class PIDAutotune:
kd = divisors[2] * self._Ku * self._Pu
return PIDAutotune.PIDParams(kp, ki, kd)
def get_tec_pid(self):
divisors = self._tuning_rules["tyreus-luyben"]
kp = self._Ku * divisors[0]
ki = divisors[1] * self._Ku / self._Pu
kd = divisors[2] * self._Ku * self._Pu
return kp, ki, kd
Outdated
Review

Split all autotune changes from GUI PR

Split all autotune changes from GUI PR
Outdated
Review

Now in #162.

Now in #162.
def run(self, input_val, time_input):
"""To autotune a system, this method must be called periodically.

View File

@ -0,0 +1,84 @@
from PyQt6.QtCore import QObject, pyqtSlot, pyqtSignal
from qasync import asyncSlot
from pythermostat.autotune import PIDAutotuneState, PIDAutotune
class PIDAutoTuner(QObject):
autotune_state_changed = pyqtSignal(int, PIDAutotuneState)
def __init__(self, parent, thermostat, num_of_channel):
super().__init__(parent)
self._thermostat = thermostat
self._thermostat.report_update.connect(self.tick)
self.autotuners = [PIDAutotune(25) for _ in range(num_of_channel)]
self.target_temp = [20.0 for _ in range(num_of_channel)]
self.test_current = [1.0 for _ in range(num_of_channel)]
self.temp_swing = [1.5 for _ in range(num_of_channel)]
self.lookback = [3.0 for _ in range(num_of_channel)]
self.sampling_interval = [1 / 16.67 for _ in range(num_of_channel)]
def set_params(self, params_name, ch, val):
getattr(self, params_name)[ch] = val
def get_state(self, ch):
return self.autotuners[ch].state()
def load_params_and_set_ready(self, ch):
self.autotuners[ch].setParam(
self.target_temp[ch],
self.test_current[ch] / 1000,
self.temp_swing[ch],
1 / self.sampling_interval[ch],
self.lookback[ch],
)
self.autotuners[ch].setReady()
self.autotune_state_changed.emit(ch, self.autotuners[ch].state())
async def stop_pid_from_running(self, ch):
self.autotuners[ch].setOff()
self.autotune_state_changed.emit(ch, self.autotuners[ch].state())
if self._thermostat.connected():
await self._thermostat.set_param("output", ch, "i_set", 0)
@asyncSlot(list)
async def tick(self, report):
for channel_report in report:
ch = channel_report["channel"]
self.sampling_interval[ch] = channel_report["interval"]
# TODO: Skip when PID Autotune or emit error message if NTC is not connected
if channel_report["temperature"] is None:
continue
match self.autotuners[ch].state():
case (
PIDAutotuneState.STATE_READY
| PIDAutotuneState.STATE_RELAY_STEP_UP
| PIDAutotuneState.STATE_RELAY_STEP_DOWN
):
self.autotuners[ch].run(
channel_report["temperature"], channel_report["time"]
)
await self._thermostat.set_param(
"output", ch, "i_set", self.autotuners[ch].output()
)
case PIDAutotuneState.STATE_SUCCEEDED:
kp, ki, kd = self.autotuners[ch].get_tec_pid()
self.autotuners[ch].setOff()
self.autotune_state_changed.emit(ch, self.autotuners[ch].state())
await self._thermostat.set_param("pid", ch, "kp", kp)
await self._thermostat.set_param("pid", ch, "ki", ki)
await self._thermostat.set_param("pid", ch, "kd", kd)
await self._thermostat.set_param("output", ch, "pid")
await self._thermostat.set_param(
"pid", ch, "target", self.target_temp[ch]
)
case PIDAutotuneState.STATE_FAILED:
self.autotuners[ch].setOff()
self.autotune_state_changed.emit(ch, self.autotuners[ch].state())
await self._thermostat.set_param("output", ch, "i_set", 0)

View File

@ -8,7 +8,9 @@ from PyQt6 import QtWidgets, QtGui, uic
from PyQt6.QtCore import pyqtSlot
import qasync
from qasync import asyncSlot, asyncClose
from pythermostat.autotune import PIDAutotuneState
from pythermostat.gui.model.thermostat import Thermostat, ThermostatConnectionState
from pythermostat.gui.model.pid_autotuner import PIDAutoTuner
from pythermostat.gui.view.connection_details_menu import ConnectionDetailsMenu
from pythermostat.gui.view.info_box import InfoBox
from pythermostat.gui.view.zero_limits_warning_view import ZeroLimitsWarningView
@ -54,6 +56,19 @@ class MainWindow(QtWidgets.QMainWindow):
self._on_connection_state_changed
)
self._autotuners = PIDAutoTuner(self, self._thermostat, 2)
self._autotuners.autotune_state_changed.connect(
self._on_pid_autotune_state_changed
)
# Handlers for disconnections
async def autotune_disconnect():
for ch in range(self.NUM_CHANNELS):
if self._autotuners.get_state(ch) != PIDAutotuneState.STATE_OFF:
await self._autotuners.stop_pid_from_running(ch)
self._thermostat.disconnect_cb = autotune_disconnect
@pyqtSlot()
def handle_connection_error():
self._info_box.display_info_box(
@ -111,6 +126,28 @@ class MainWindow(QtWidgets.QMainWindow):
self.connect_btn.setText("Connect")
self.status_lbl.setText("Disconnected")
@pyqtSlot(int, PIDAutotuneState)
def _on_pid_autotune_state_changed(self, _ch, _state):
autotuning_channels = []
for ch in range(self.NUM_CHANNELS):
if self._autotuners.get_state(ch) in {
PIDAutotuneState.STATE_READY,
PIDAutotuneState.STATE_RELAY_STEP_UP,
PIDAutotuneState.STATE_RELAY_STEP_DOWN,
}:
autotuning_channels.append(ch)
if len(autotuning_channels) == 0:
self.background_task_lbl.setText("Ready.")
self.loading_spinner.hide()
self.loading_spinner.stop()
else:
self.background_task_lbl.setText(
f"Autotuning channel {autotuning_channels}..."
)
self.loading_spinner.start()
self.loading_spinner.show()
@asyncSlot()
async def on_connect_btn_clicked(self):
match self._thermostat.connection_state: