Create GUI to Thermostat

Control the Thermostat with a Qt GUI, with plotting of key parameters.
This commit is contained in:
atse 2024-10-28 17:51:15 +08:00
parent ff09069424
commit fe0dc30a66
23 changed files with 2949 additions and 1 deletions

1
.gitignore vendored
View File

@ -3,3 +3,4 @@ result
*.bin
__pycache__/
*.pyc

View File

@ -57,6 +57,29 @@
dontFixup = true;
auditable = false;
};
pyqtgraph = pkgs.python3Packages.buildPythonPackage rec {
pname = "pyqtgraph";
version = "0.13.3";
format = "pyproject";
src = pkgs.fetchPypi {
inherit pname version;
hash = "sha256-WBCNhBHHBU4IQdi3ke6F4QH8KWubNZwOAd3jipj/Ks4=";
};
propagatedBuildInputs = with pkgs.python3Packages; [ numpy pyqt6 ];
};
pglive = pkgs.python3Packages.buildPythonPackage rec {
pname = "pglive";
version = "0.7.2";
format = "pyproject";
src = pkgs.fetchPypi {
inherit pname version;
hash = "sha256-jqj8X6H1N5mJQ4OrY5ANqRB0YJByqg/bNneEALWmH1A=";
};
buildInputs = [ pkgs.python3Packages.poetry-core ];
propagatedBuildInputs = [ pyqtgraph pkgs.python3Packages.numpy ];
};
in
{
packages.x86_64-linux = {
@ -81,7 +104,7 @@
]
++ (with python3Packages; [
numpy
matplotlib
matplotlib pyqtgraph setuptools pyqt6 qasync pglive
]);
shellHook = ''
export PYTHONPATH=`pwd`/pytec:$PYTHONPATH

View File

@ -68,6 +68,7 @@ class PIDAutotune:
def setReady(self):
self._state = PIDAutotuneState.STATE_READY
self._peak_count = 0
def setOff(self):
self._state = PIDAutotuneState.STATE_OFF

View File

@ -0,0 +1,36 @@
import asyncio
from contextlib import suppress
from pytec.aioclient import AsyncioClient
async def poll_for_info(tec):
while True:
print(tec.get_pwm())
print(tec.get_steinhart_hart())
print(tec.get_pid())
print(tec.get_postfilter())
print(tec.get_fan())
await asyncio.sleep(1)
async def main():
tec = AsyncioClient()
await tec.connect() # (host="192.168.1.26", port=23)
await tec.set_param("s-h", 1, "t0", 20)
print(await tec.get_output())
print(await tec.get_pid())
print(await tec.get_output())
print(await tec.get_postfilter())
print(await tec.get_b_parameter())
polling_task = asyncio.create_task(poll_for_info(tec))
async for data in tec.report_mode():
print(data)
polling_task.cancel()
with suppress(asyncio.CancelledError):
await polling_task
asyncio.run(main())

261
pytec/pytec/aioclient.py Normal file
View File

@ -0,0 +1,261 @@
import asyncio
import json
import logging
class CommandError(Exception):
pass
class AsyncioClient:
def __init__(self):
self._reader = None
self._writer = None
self._command_lock = asyncio.Lock()
self._report_mode_on = False
async def connect(self, host="192.168.1.26", port=23):
"""Connect to Thermostat at specified host and port.
Example::
client = AsyncioClient()
await client.connect()
"""
self._reader, self._writer = await asyncio.open_connection(host, port)
await self._check_zero_limits()
def connected(self):
"""Returns True if client is connected"""
return self._writer is not None
async def disconnect(self):
"""Disconnect from the Thermostat"""
if self._writer is None:
return
# Reader needn't be closed
self._writer.close()
await self._writer.wait_closed()
self._reader = None
self._writer = None
async def _check_zero_limits(self):
output_report = await self.get_output()
for output_channel in output_report:
for limit in ["max_i_neg", "max_i_pos", "max_v"]:
if output_channel[limit] == 0.0:
logging.warning(
"`{}` limit is set to zero on channel {}".format(
limit, output_channel["channel"]
)
)
async def _read_line(self):
# read 1 line
chunk = await self._reader.readline()
return chunk.decode("utf-8", errors="ignore")
async def _read_write(self, command):
self._writer.write(((" ".join(command)).strip() + "\n").encode("utf-8"))
await self._writer.drain()
return await self._read_line()
async def _command(self, *command):
async with self._command_lock:
line = await self._read_write(command)
response = json.loads(line)
if "error" in response:
raise CommandError(response["error"])
return response
async def _get_conf(self, topic):
result = [None, None]
for item in await self._command(topic):
result[int(item["channel"])] = item
return result
async def get_output(self):
"""Retrieve output limits for the TEC
Example::
[{'channel': 0,
'center': 'vref',
'i_set': -0.02002179650216762,
'max_i_neg': 2.0,
'max_v': : 3.988,
'max_i_pos': 2.0,
'polarity': 'normal'},
{'channel': 1,
'center': 'vref',
'i_set': -0.02002179650216762,
'max_i_neg': 2.0,
'max_v': : 3.988,
'max_i_pos': 2.0,
'polarity': 'normal'},
]
"""
return await self._get_conf("output")
async def get_pid(self):
"""Retrieve PID control state
Example::
[{'channel': 0,
'parameters': {
'kp': 10.0,
'ki': 0.02,
'kd': 0.0,
'output_min': 0.0,
'output_max': 3.0},
'target': 37.0},
{'channel': 1,
'parameters': {
'kp': 10.0,
'ki': 0.02,
'kd': 0.0,
'output_min': 0.0,
'output_max': 3.0},
'target': 36.5}]
"""
return await self._get_conf("pid")
async def get_b_parameter(self):
"""Retrieve B-Parameter equation parameters for resistance to temperature conversion
Example::
[{'params': {'b': 3800.0, 'r0': 10000.0, 't0': 298.15}, 'channel': 0},
{'params': {'b': 3800.0, 'r0': 10000.0, 't0': 298.15}, 'channel': 1}]
"""
return await self._get_conf("b-p")
async def get_postfilter(self):
"""Retrieve DAC postfilter configuration
Example::
[{'rate': None, 'channel': 0},
{'rate': 21.25, 'channel': 1}]
"""
return await self._get_conf("postfilter")
async def get_fan(self):
"""Get Thermostat current fan settings"""
return await self._command("fan")
async def report(self):
"""Obtain one-time report on measurement values"""
return await self._command("report")
async def report_mode(self):
"""Start reporting measurement values
Example of yielded data::
{'channel': 0,
'time': 2302524,
'adc': 0.6199188965423515,
'sens': 6138.519310282602,
'temperature': 36.87032392655527,
'pid_engaged': True,
'i_set': 2.0635816680889123,
'vref': 1.494,
'dac_value': 2.527790834044456,
'dac_feedback': 2.523,
'i_tec': 2.331,
'tec_i': 2.0925,
'tec_u_meas': 2.5340000000000003,
'pid_output': 2.067581958092247}
"""
await self._command("report mode", "on")
self._report_mode_on = True
while self._report_mode_on:
async with self._command_lock:
line = await self._read_line()
if not line:
break
try:
yield json.loads(line)
except json.decoder.JSONDecodeError:
pass
await self._command("report mode", "off")
def stop_report_mode(self):
self._report_mode_on = False
async def set_param(self, topic, channel, field="", value=""):
"""Set configuration parameters
Examples::
await tec.set_param("output", 0, "max_v", 2.0)
await tec.set_param("pid", 1, "output_max", 2.5)
await tec.set_param("s-h", 0, "t0", 20.0)
await tec.set_param("center", 0, "vref")
await tec.set_param("postfilter", 1, 21)
See the firmware's README.md for a full list.
"""
if type(value) is float:
value = "{:f}".format(value)
if type(value) is not str:
value = str(value)
await self._command(topic, str(channel), field, value)
async def set_fan(self, power="auto"):
"""Set fan power"""
await self._command("fan", str(power))
async def set_fcurve(self, a=1.0, b=0.0, c=0.0):
"""Set fan curve"""
await self._command("fcurve", str(a), str(b), str(c))
async def power_up(self, channel, target):
"""Start closed-loop mode"""
await self.set_param("pid", channel, "target", value=target)
await self.set_param("output", channel, "pid")
async def save_config(self, channel=""):
"""Save current configuration to EEPROM"""
await self._command("save", str(channel))
if channel == "":
await self._read_line() # Read the extra {}
async def load_config(self, channel=""):
"""Load current configuration from EEPROM"""
await self._command("load", str(channel))
if channel == "":
await self._read_line() # Read the extra {}
async def hw_rev(self):
"""Get Thermostat hardware revision"""
return await self._command("hwrev")
async def reset(self):
"""Reset the Thermostat
The client is disconnected as the TCP session is terminated.
"""
async with self._command_lock:
self._writer.write("reset\n".encode("utf-8"))
await self._writer.drain()
await self.disconnect()
async def dfu(self):
"""Put the Thermostat in DFU mode
The client is disconnected as the Thermostat stops responding to
TCP commands in DFU mode. To exit it, submit a DFU leave request
or power-cycle the Thermostat.
"""
async with self._command_lock:
self._writer.write("dfu\n".encode("utf-8"))
await self._writer.drain()
await self.disconnect()
async def ipv4(self):
"""Get the IPv4 settings of the Thermostat"""
return await self._command("ipv4")

View File

@ -1,6 +1,7 @@
import socket
import json
import logging
import time
@ -14,6 +15,10 @@ class Client:
self._lines = [""]
self._check_zero_limits()
def disconnect(self):
self._socket.shutdown(socket.SHUT_RDWR)
self._socket.close()
def _check_zero_limits(self):
output_report = self.get_output()
for output_channel in output_report:
@ -176,3 +181,11 @@ class Client:
def load_config(self):
"""Load current configuration from EEPROM"""
self._command("load")
def hw_rev(self):
"""Get Thermostat hardware revision"""
return self._command("hwrev")
def fan(self):
"""Get Thermostat current fan settings"""
return self._command("fan")

View File

@ -0,0 +1,84 @@
from PyQt6.QtCore import QObject, pyqtSlot, pyqtSignal
from qasync import asyncSlot
from autotune import PIDAutotuneState, PIDAutotune
class PIDAutoTuner(QObject):
autotune_state_changed = pyqtSignal(int, PIDAutotuneState)
def __init__(self, parent, thermostat, num_of_channel):
super().__init__(parent)
self._thermostat = thermostat
self._thermostat.report_update.connect(self.tick)
self.autotuners = [PIDAutotune(25) for _ in range(num_of_channel)]
self.target_temp = [20.0 for _ in range(num_of_channel)]
self.test_current = [1.0 for _ in range(num_of_channel)]
self.temp_swing = [1.5 for _ in range(num_of_channel)]
self.lookback = [3.0 for _ in range(num_of_channel)]
self.sampling_interval = [1 / 16.67 for _ in range(num_of_channel)]
def set_params(self, params_name, ch, val):
getattr(self, params_name)[ch] = val
def get_state(self, ch):
return self.autotuners[ch].state()
def load_params_and_set_ready(self, ch):
self.autotuners[ch].setParam(
self.target_temp[ch],
self.test_current[ch] / 1000,
self.temp_swing[ch],
1 / self.sampling_interval[ch],
self.lookback[ch],
)
self.autotuners[ch].setReady()
self.autotune_state_changed.emit(ch, self.autotuners[ch].state())
async def stop_pid_from_running(self, ch):
self.autotuners[ch].setOff()
self.autotune_state_changed.emit(ch, self.autotuners[ch].state())
if self._thermostat.connected():
await self._thermostat.set_param("pwm", ch, "i_set", 0)
@asyncSlot(list)
async def tick(self, report):
for channel_report in report:
ch = channel_report["channel"]
self.sampling_interval[ch] = channel_report["interval"]
# TODO: Skip when PID Autotune or emit error message if NTC is not connected
if channel_report["temperature"] is None:
continue
match self.autotuners[ch].state():
case (
PIDAutotuneState.STATE_READY
| PIDAutotuneState.STATE_RELAY_STEP_UP
| PIDAutotuneState.STATE_RELAY_STEP_DOWN
):
self.autotuners[ch].run(
channel_report["temperature"], channel_report["time"]
)
await self._thermostat.set_param(
"pwm", ch, "i_set", self.autotuners[ch].output()
)
case PIDAutotuneState.STATE_SUCCEEDED:
kp, ki, kd = self.autotuners[ch].get_tec_pid()
self.autotuners[ch].setOff()
self.autotune_state_changed.emit(ch, self.autotuners[ch].state())
await self._thermostat.set_param("pid", ch, "kp", kp)
await self._thermostat.set_param("pid", ch, "ki", ki)
await self._thermostat.set_param("pid", ch, "kd", kd)
await self._thermostat.set_param("pwm", ch, "pid")
await self._thermostat.set_param(
"pid", ch, "target", self.target_temp[ch]
)
case PIDAutotuneState.STATE_FAILED:
self.autotuners[ch].setOff()
self.autotune_state_changed.emit(ch, self.autotuners[ch].state())
await self._thermostat.set_param("pwm", ch, "i_set", 0)

View File

@ -0,0 +1,126 @@
# A Custom Class that allows defining a QObject Property Dynamically
# Adapted from: https://stackoverflow.com/questions/48425316/how-to-create-pyqt-properties-dynamically
from functools import wraps
from PyQt6.QtCore import QObject, pyqtProperty, pyqtSignal
class PropertyMeta(type(QObject)):
"""Lets a class succinctly define Qt properties."""
def __new__(cls, name, bases, attrs):
for key in list(attrs.keys()):
attr = attrs[key]
if not isinstance(attr, Property):
continue
types = {list: "QVariantList", dict: "QVariantMap"}
type_ = types.get(attr.type_, attr.type_)
notifier = pyqtSignal(type_)
attrs[f"{key}_update"] = notifier
attrs[key] = PropertyImpl(type_=type_, name=key, notify=notifier)
return super().__new__(cls, name, bases, attrs)
class Property:
"""Property definition.
Instances of this class will be replaced with their full
implementation by the PropertyMeta metaclass.
"""
def __init__(self, type_):
self.type_ = type_
class PropertyImpl(pyqtProperty):
"""Property implementation: gets, sets, and notifies of change."""
def __init__(self, type_, name, notify):
super().__init__(type_, self.getter, self.setter, notify=notify)
self.name = name
def getter(self, instance):
return getattr(instance, f"_{self.name}")
def setter(self, instance, value):
signal = getattr(instance, f"{self.name}_update")
if type(value) in {list, dict}:
value = make_notified(value, signal)
setattr(instance, f"_{self.name}", value)
signal.emit(value)
class MakeNotified:
"""Adds notifying signals to lists and dictionaries.
Creates the modified classes just once, on initialization.
"""
change_methods = {
list: [
"__delitem__",
"__iadd__",
"__imul__",
"__setitem__",
"append",
"extend",
"insert",
"pop",
"remove",
"reverse",
"sort",
],
dict: [
"__delitem__",
"__ior__",
"__setitem__",
"clear",
"pop",
"popitem",
"setdefault",
"update",
],
}
def __init__(self):
if not hasattr(dict, "__ior__"):
# Dictionaries don't have | operator in Python < 3.9.
self.change_methods[dict].remove("__ior__")
self.notified_class = {
type_: self.make_notified_class(type_) for type_ in [list, dict]
}
def __call__(self, seq, signal):
"""Returns a notifying version of the supplied list or dict."""
notified_class = self.notified_class[type(seq)]
notified_seq = notified_class(seq)
notified_seq.signal = signal
return notified_seq
@classmethod
def make_notified_class(cls, parent):
notified_class = type(f"notified_{parent.__name__}", (parent,), {})
for method_name in cls.change_methods[parent]:
original = getattr(notified_class, method_name)
notified_method = cls.make_notified_method(original, parent)
setattr(notified_class, method_name, notified_method)
return notified_class
@staticmethod
def make_notified_method(method, parent):
@wraps(method)
def notified_method(self, *args, **kwargs):
result = getattr(parent, method.__name__)(self, *args, **kwargs)
self.signal.emit(self)
return result
return notified_method
make_notified = MakeNotified()

View File

@ -0,0 +1,130 @@
from PyQt6.QtCore import pyqtSignal, QObject, pyqtSlot
from qasync import asyncSlot
from pytec.gui.model.property import Property, PropertyMeta
import asyncio
import logging
from enum import Enum
from pytec.aioclient import AsyncioClient
class ThermostatConnectionState(Enum):
DISCONNECTED = "disconnected"
CONNECTING = "connecting"
CONNECTED = "connected"
class Thermostat(QObject, metaclass=PropertyMeta):
connection_state = Property(ThermostatConnectionState)
hw_rev = Property(dict)
fan = Property(dict)
thermistor = Property(list)
pid = Property(list)
pwm = Property(list)
postfilter = Property(list)
report = Property(list)
connection_error = pyqtSignal()
NUM_CHANNELS = 2
def __init__(self, parent, update_s, disconnect_cb=None):
super().__init__(parent)
self._update_s = update_s
self._client = AsyncioClient()
self._watch_task = None
self._update_params_task = None
self.disconnect_cb = disconnect_cb
self.connection_state = ThermostatConnectionState.DISCONNECTED
async def start_session(self, host, port):
await self._client.connect(host, port)
self.hw_rev = await self._client.hw_rev()
@asyncSlot()
async def end_session(self):
self.stop_watching()
if self.disconnect_cb is not None:
if asyncio.iscoroutinefunction(self.disconnect_cb):
await self.disconnect_cb()
else:
self.disconnect_cb()
await self._client.disconnect()
def start_watching(self):
self._watch_task = asyncio.create_task(self.run())
def stop_watching(self):
if self._watch_task is not None:
self._watch_task.cancel()
self._watch_task = None
self._update_params_task.cancel()
self._update_params_task = None
async def run(self):
self._update_params_task = asyncio.create_task(self.update_params())
while True:
if self._update_params_task.done():
try:
self._update_params_task.result()
except OSError:
logging.error(
"Encountered an error while polling for information from Thermostat.",
exc_info=True,
)
await self.end_session()
self.connection_state = ThermostatConnectionState.DISCONNECTED
self.connection_error.emit()
return
self._update_params_task = asyncio.create_task(self.update_params())
await asyncio.sleep(self._update_s)
async def update_params(self):
self.fan, self.pwm, self.report, self.pid, self.thermistor, self.postfilter = (
await asyncio.gather(
self._client.get_fan(),
self._client.get_pwm(),
self._client.report(),
self._client.get_pid(),
self._client.get_steinhart_hart(),
self._client.get_postfilter(),
)
)
def connected(self):
return self._client.connected()
@pyqtSlot(float)
def set_update_s(self, update_s):
self._update_s = update_s
async def set_ipv4(self, ipv4):
await self._client.set_param("ipv4", ipv4)
async def get_ipv4(self):
return await self._client.ipv4()
@asyncSlot()
async def save_cfg(self, ch=""):
await self._client.save_config(ch)
@asyncSlot()
async def load_cfg(self, ch=""):
await self._client.load_config(ch)
async def dfu(self):
await self._client.dfu()
async def reset(self):
await self._client.reset()
async def set_fan(self, power="auto"):
await self._client.set_fan(power)
async def get_fan(self):
return await self._client.get_fan()
async def set_param(self, topic, channel, field="", value=""):
await self._client.set_param(topic, channel, field, value)

Binary file not shown.

After

Width:  |  Height:  |  Size: 131 KiB

View File

@ -0,0 +1,73 @@
from PyQt6 import QtWidgets, QtCore
from PyQt6.QtCore import pyqtSlot
from pytec.gui.model.thermostat import ThermostatConnectionState
class ConnectionDetailsMenu(QtWidgets.QMenu):
def __init__(self, thermostat, connect_btn):
super().__init__()
self._thermostat = thermostat
self._connect_btn = connect_btn
self._thermostat.connection_state_update.connect(
self.thermostat_state_change_handler
)
self.setTitle("Connection Settings")
self.host_set_line = QtWidgets.QLineEdit()
self.host_set_line.setMinimumSize(QtCore.QSize(160, 0))
self.host_set_line.setMaximumSize(QtCore.QSize(160, 16777215))
self.host_set_line.setMaxLength(15)
self.host_set_line.setClearButtonEnabled(True)
def connect_on_enter_press():
self._connect_btn.click()
self.hide()
self.host_set_line.returnPressed.connect(connect_on_enter_press)
self.host_set_line.setText("192.168.1.26")
self.host_set_line.setPlaceholderText("IP for the Thermostat")
host = QtWidgets.QWidgetAction(self)
host.setDefaultWidget(self.host_set_line)
self.addAction(host)
self.host = host
self.port_set_spin = QtWidgets.QSpinBox()
self.port_set_spin.setMinimumSize(QtCore.QSize(70, 0))
self.port_set_spin.setMaximumSize(QtCore.QSize(70, 16777215))
self.port_set_spin.setMaximum(65535)
self.port_set_spin.setValue(23)
def connect_only_if_enter_pressed():
if (
not self.port_set_spin.hasFocus()
): # Don't connect if the spinbox only lost focus
return
connect_on_enter_press()
self.port_set_spin.editingFinished.connect(connect_only_if_enter_pressed)
port = QtWidgets.QWidgetAction(self)
port.setDefaultWidget(self.port_set_spin)
self.addAction(port)
self.port = port
self.exit_button = QtWidgets.QPushButton()
self.exit_button.setText("Exit GUI")
self.exit_button.pressed.connect(QtWidgets.QApplication.instance().quit)
exit_action = QtWidgets.QWidgetAction(self.exit_button)
exit_action.setDefaultWidget(self.exit_button)
self.addAction(exit_action)
self.exit_action = exit_action
@pyqtSlot(ThermostatConnectionState)
def thermostat_state_change_handler(self, state):
self.host_set_line.setEnabled(
state == ThermostatConnectionState.DISCONNECTED
)
self.port_set_spin.setEnabled(
state == ThermostatConnectionState.DISCONNECTED
)

View File

@ -0,0 +1,307 @@
from functools import partial
from PyQt6.QtCore import pyqtSignal, QObject, QSignalBlocker, pyqtSlot
import pyqtgraph.parametertree.parameterTypes as pTypes
from pyqtgraph.parametertree import (
Parameter,
registerParameterType,
)
from qasync import asyncSlot
from autotune import PIDAutotuneState
class MutexParameter(pTypes.ListParameter):
"""
Mutually exclusive parameter where only one of its children is visible at a time, list selectable.
The ordering of the list items determines which children will be visible.
"""
def __init__(self, **opts):
super().__init__(**opts)
self.sigValueChanged.connect(self.show_chosen_child)
self.sigValueChanged.emit(self, self.opts["value"])
def _get_param_from_value(self, value):
if isinstance(self.opts["limits"], dict):
values_list = list(self.opts["limits"].values())
else:
values_list = self.opts["limits"]
return self.children()[values_list.index(value)]
@pyqtSlot(object, object)
def show_chosen_child(self, value):
for param in self.children():
param.hide()
child_to_show = self._get_param_from_value(value.value())
child_to_show.show()
if child_to_show.opts.get("triggerOnShow", None):
child_to_show.sigValueChanged.emit(child_to_show, child_to_show.value())
registerParameterType("mutex", MutexParameter)
class CtrlPanel(QObject):
def __init__(
self,
thermostat,
autotuners,
info_box,
trees_ui,
param_tree,
parent=None,
):
super().__init__(parent)
self.thermostat = thermostat
self.autotuners = autotuners
self.info_box = info_box
self.trees_ui = trees_ui
self.NUM_CHANNELS = len(trees_ui)
self.THERMOSTAT_PARAMETERS = [param_tree for i in range(self.NUM_CHANNELS)]
self.params = [
Parameter.create(
name=f"Thermostat Channel {ch} Parameters",
type="group",
value=ch,
children=self.THERMOSTAT_PARAMETERS[ch],
)
for ch in range(self.NUM_CHANNELS)
]
for i, param in enumerate(self.params):
param.channel = i
for i, tree in enumerate(self.trees_ui):
tree.setHeaderHidden(True)
tree.setParameters(self.params[i], showTop=False)
self.params[i].setValue = self._setValue
self.params[i].sigTreeStateChanged.connect(self.send_command)
self.params[i].child("Save to flash").sigActivated.connect(
partial(self.save_settings, i)
)
self.params[i].child("Load from flash").sigActivated.connect(
partial(self.load_settings, i)
)
self.params[i].child(
"PID Config", "PID Auto Tune", "Run"
).sigActivated.connect(partial(self.pid_auto_tune_request, i))
self.thermostat.pid_update.connect(self.update_pid)
self.thermostat.report_update.connect(self.update_report)
self.thermostat.thermistor_update.connect(self.update_thermistor)
self.thermostat.pwm_update.connect(self.update_pwm)
self.thermostat.postfilter_update.connect(self.update_postfilter)
self.autotuners.autotune_state_changed.connect(self.update_pid_autotune)
def _setValue(self, value, blockSignal=None):
"""
Implement 'lock' mechanism for Parameter Type
Modified from the source
"""
try:
if blockSignal is not None:
self.sigValueChanged.disconnect(blockSignal)
value = self._interpretValue(value)
if fn.eq(self.opts["value"], value):
return value
if "lock" in self.opts.keys():
if self.opts["lock"]:
return value
self.opts["value"] = value
self.sigValueChanged.emit(
self, value
) # value might change after signal is received by tree item
finally:
if blockSignal is not None:
self.sigValueChanged.connect(blockSignal)
return self.opts["value"]
def change_params_title(self, channel, path, title):
self.params[channel].child(*path).setOpts(title=title)
@asyncSlot(object, object)
async def send_command(self, param, changes):
"""Translates parameter tree changes into thermostat set_param calls"""
ch = param.channel
for inner_param, change, data in changes:
if change == "value":
new_value = data
if "thermostat:set_param" in inner_param.opts:
if inner_param.opts.get("suffix", None) == "mA":
new_value /= 1000 # Given in mA
thermostat_param = inner_param.opts["thermostat:set_param"]
# Handle thermostat command irregularities
match inner_param.name(), new_value:
case "Postfilter Rate", None:
thermostat_param = thermostat_param.copy()
thermostat_param["field"] = "off"
new_value = ""
case "Control Method", "Constant Current":
return
case "Control Method", "Temperature PID":
new_value = ""
inner_param.setOpts(lock=True)
await self.thermostat.set_param(
channel=ch, value=new_value, **thermostat_param
)
inner_param.setOpts(lock=False)
if "pid_autotune" in inner_param.opts:
auto_tuner_param = inner_param.opts["pid_autotune"]
self.autotuners.set_params(auto_tuner_param, ch, new_value)
@pyqtSlot(list)
def update_pid(self, pid_settings):
for settings in pid_settings:
channel = settings["channel"]
with QSignalBlocker(self.params[channel]):
self.params[channel].child("PID Config", "Kp").setValue(
settings["parameters"]["kp"]
)
self.params[channel].child("PID Config", "Ki").setValue(
settings["parameters"]["ki"]
)
self.params[channel].child("PID Config", "Kd").setValue(
settings["parameters"]["kd"]
)
self.params[channel].child(
"PID Config", "PID Output Clamping", "Minimum"
).setValue(settings["parameters"]["output_min"] * 1000)
self.params[channel].child(
"PID Config", "PID Output Clamping", "Maximum"
).setValue(settings["parameters"]["output_max"] * 1000)
self.params[channel].child(
"Output Config", "Control Method", "Set Temperature"
).setValue(settings["target"])
@pyqtSlot(list)
def update_report(self, report_data):
for settings in report_data:
channel = settings["channel"]
with QSignalBlocker(self.params[channel]):
self.params[channel].child("Output Config", "Control Method").setValue(
"Temperature PID" if settings["pid_engaged"] else "Constant Current"
)
self.params[channel].child(
"Output Config", "Control Method", "Set Current"
).setValue(settings["i_set"] * 1000)
if settings["temperature"] is not None:
self.params[channel].child("Temperature").setValue(
settings["temperature"]
)
if settings["tec_i"] is not None:
self.params[channel].child("Current through TEC").setValue(
settings["tec_i"] * 1000
)
@pyqtSlot(list)
def update_thermistor(self, sh_data):
for sh_param in sh_data:
channel = sh_param["channel"]
with QSignalBlocker(self.params[channel]):
self.params[channel].child("Thermistor Config", "T₀").setValue(
sh_param["params"]["t0"] - 273.15
)
self.params[channel].child("Thermistor Config", "R₀").setValue(
sh_param["params"]["r0"]
)
self.params[channel].child("Thermistor Config", "B").setValue(
sh_param["params"]["b"]
)
@pyqtSlot(list)
def update_pwm(self, pwm_data):
for pwm_params in pwm_data:
channel = pwm_params["channel"]
with QSignalBlocker(self.params[channel]):
self.params[channel].child(
"Output Config", "Limits", "Max Voltage Difference"
).setValue(pwm_params["max_v"]["value"])
self.params[channel].child(
"Output Config", "Limits", "Max Cooling Current"
).setValue(pwm_params["max_i_pos"]["value"] * 1000)
self.params[channel].child(
"Output Config", "Limits", "Max Heating Current"
).setValue(pwm_params["max_i_neg"]["value"] * 1000)
@pyqtSlot(list)
def update_postfilter(self, postfilter_data):
for postfilter_params in postfilter_data:
channel = postfilter_params["channel"]
with QSignalBlocker(self.params[channel]):
self.params[channel].child(
"Thermistor Config", "Postfilter Rate"
).setValue(postfilter_params["rate"])
def update_pid_autotune(self, ch, state):
match state:
case PIDAutotuneState.STATE_OFF:
self.change_params_title(
ch, ("PID Config", "PID Auto Tune", "Run"), "Run"
)
case (
PIDAutotuneState.STATE_READY
| PIDAutotuneState.STATE_RELAY_STEP_UP
| PIDAutotuneState.STATE_RELAY_STEP_DOWN
):
self.change_params_title(
ch, ("PID Config", "PID Auto Tune", "Run"), "Stop"
)
case PIDAutotuneState.STATE_SUCCEEDED:
self.info_box.display_info_box(
"PID Autotune Success",
f"Channel {ch} PID Config has been loaded to Thermostat. Regulating temperature.",
)
case PIDAutotuneState.STATE_FAILED:
self.info_box.display_info_box(
"PID Autotune Failed",
f"Channel {ch} PID Autotune has failed.",
)
@asyncSlot(int)
async def load_settings(self, ch):
await self.thermostat.load_cfg(ch)
self.info_box.display_info_box(
f"Channel {ch} settings loaded",
f"Channel {ch} settings has been loaded from flash.",
)
@asyncSlot(int)
async def save_settings(self, ch):
await self.thermostat.save_cfg(ch)
self.info_box.display_info_box(
f"Channel {ch} settings saved",
f"Channel {ch} settings has been saved to flash.\n"
"It will be loaded on Thermostat reset, or when settings are explicitly loaded.",
)
@asyncSlot()
async def pid_auto_tune_request(self, ch=0):
match self.autotuners.get_state(ch):
case PIDAutotuneState.STATE_OFF | PIDAutotuneState.STATE_FAILED:
self.autotuners.load_params_and_set_ready(ch)
case (
PIDAutotuneState.STATE_READY
| PIDAutotuneState.STATE_RELAY_STEP_UP
| PIDAutotuneState.STATE_RELAY_STEP_DOWN
):
await self.autotuners.stop_pid_from_running(ch)

View File

@ -0,0 +1,14 @@
from PyQt6 import QtWidgets
from PyQt6.QtCore import pyqtSlot
class InfoBox(QtWidgets.QMessageBox):
def __init__(self):
super().__init__()
self.setIcon(QtWidgets.QMessageBox.Icon.Information)
@pyqtSlot(str, str)
def display_info_box(self, title, text):
self.setWindowTitle(title)
self.setText(text)
self.show()

View File

@ -0,0 +1,180 @@
from PyQt6.QtCore import QObject, pyqtSlot
from pglive.sources.data_connector import DataConnector
from pglive.kwargs import Axis
from pglive.sources.live_plot import LiveLinePlot
from pglive.sources.live_axis import LiveAxis
from collections import deque
import pyqtgraph as pg
from pytec.gui.model.thermostat import ThermostatConnectionState
pg.setConfigOptions(antialias=True)
class LiveDataPlotter(QObject):
def __init__(self, thermostat, live_plots):
super().__init__()
self._thermostat = thermostat
self._thermostat.report_update.connect(self.update_report)
self._thermostat.pid_update.connect(self.update_pid)
self._thermostat.connection_state_update.connect(
self.thermostat_state_change_handler
)
self.NUM_CHANNELS = len(live_plots)
self.graphs = []
for i, live_plot in enumerate(live_plots):
live_plot[0].setTitle(f"Channel {i} Temperature")
live_plot[1].setTitle(f"Channel {i} Current")
self.graphs.append(_TecGraphs(live_plot[0], live_plot[1]))
@pyqtSlot(ThermostatConnectionState)
def thermostat_state_change_handler(self, state):
if state == ThermostatConnectionState.DISCONNECTED:
self.clear_graphs()
def _config_connector_max_pts(self, connector, samples):
connector.max_points = samples
connector.x = deque(maxlen=int(connector.max_points))
connector.y = deque(maxlen=int(connector.max_points))
@pyqtSlot(int)
def set_max_samples(self, samples: int):
for graph in self.graphs:
self._config_connector_max_pts(graph.t_connector, samples)
self._config_connector_max_pts(graph.i_connector, samples)
self._config_connector_max_pts(graph.iset_connector, samples)
@pyqtSlot()
def clear_graphs(self):
for graph in self.graphs:
graph.clear()
@pyqtSlot(list)
def update_pid(self, pid_settings):
for settings in pid_settings:
channel = settings["channel"]
self.graphs[channel].update_pid(settings)
@pyqtSlot(list)
def update_report(self, report_data):
for settings in report_data:
channel = settings["channel"]
self.graphs[channel].update_report(settings)
class _TecGraphs:
"""The maximum number of sample points to store."""
DEFAULT_MAX_SAMPLES = 1000
def __init__(self, t_widget, i_widget):
self._t_widget = t_widget
self._i_widget = i_widget
self._t_plot = LiveLinePlot()
self._i_plot = LiveLinePlot(name="Measured")
self._iset_plot = LiveLinePlot(name="Set", pen=pg.mkPen("r"))
self._t_line = self._t_widget.getPlotItem().addLine(label="{value} °C")
self._t_line.setVisible(False)
# Hack for keeping setpoint line in plot range
self._t_setpoint_plot = LiveLinePlot()
for graph in t_widget, i_widget:
time_axis = LiveAxis(
"bottom",
text="Time since Thermostat reset",
**{Axis.TICK_FORMAT: Axis.DURATION},
)
time_axis.showLabel()
graph.setAxisItems({"bottom": time_axis})
graph.add_crosshair(pg.mkPen(color="red", width=1), {"color": "green"})
# Enable linking of axes in the graph widget's context menu
graph.register(
graph.getPlotItem().titleLabel.text # Slight hack getting the title
)
temperature_axis = LiveAxis("left", text="Temperature", units="°C")
temperature_axis.showLabel()
t_widget.setAxisItems({"left": temperature_axis})
current_axis = LiveAxis("left", text="Current", units="A")
current_axis.showLabel()
i_widget.setAxisItems({"left": current_axis})
i_widget.addLegend(brush=(50, 50, 200, 150))
t_widget.addItem(self._t_plot)
t_widget.addItem(self._t_setpoint_plot)
i_widget.addItem(self._i_plot)
i_widget.addItem(self._iset_plot)
self.t_connector = DataConnector(
self._t_plot, max_points=self.DEFAULT_MAX_SAMPLES
)
self.t_setpoint_connector = DataConnector(self._t_setpoint_plot, max_points=1)
self.i_connector = DataConnector(
self._i_plot, max_points=self.DEFAULT_MAX_SAMPLES
)
self.iset_connector = DataConnector(
self._iset_plot, max_points=self.DEFAULT_MAX_SAMPLES
)
self.max_samples = self.DEFAULT_MAX_SAMPLES
def plot_append(self, report):
temperature = report["temperature"]
current = report["tec_i"]
iset = report["i_set"]
time = report["time"]
if temperature is not None:
self.t_connector.cb_append_data_point(temperature, time)
if self._t_line.isVisible():
self.t_setpoint_connector.cb_append_data_point(
self._t_line.value(), time
)
else:
self.t_setpoint_connector.cb_append_data_point(temperature, time)
if current is not None:
self.i_connector.cb_append_data_point(current, time)
self.iset_connector.cb_append_data_point(iset, time)
def set_max_sample(self, samples: int):
for connector in self.t_connector, self.i_connector, self.iset_connector:
connector.max_points(samples)
def clear(self):
for connector in self.t_connector, self.i_connector, self.iset_connector:
connector.clear()
def set_t_line(self, temp=None, visible=None):
if visible is not None:
self._t_line.setVisible(visible)
if temp is not None:
self._t_line.setValue(temp)
# PyQtGraph normally does not update this text when the line
# is not visible, so make sure that the temperature label
# gets updated always, and doesn't stay at an old value.
self._t_line.label.setText(f"{temp} °C")
def set_max_samples(self, samples: int):
for graph in self.graphs:
graph.t_connector.max_points = samples
graph.i_connector.max_points = samples
graph.iset_connector.max_points = samples
def clear_graphs(self):
for graph in self.graphs:
graph.clear()
def update_pid(self, pid_settings):
self.set_t_line(temp=round(pid_settings["target"], 6))
def update_report(self, report_data):
self.plot_append(report_data)
self.set_t_line(visible=report_data["pid_engaged"])

View File

@ -0,0 +1,36 @@
from PyQt6 import QtWidgets
from PyQt6.QtWidgets import QAbstractButton
from PyQt6.QtCore import pyqtSignal, pyqtSlot
class NetSettingsInputDiag(QtWidgets.QInputDialog):
set_ipv4_act = pyqtSignal(str)
def __init__(self, current_ipv4_settings):
super().__init__()
self.setWindowTitle("Network Settings")
self.setLabelText(
"Set the Thermostat's IPv4 address, netmask and gateway (optional)"
)
self.setTextValue(current_ipv4_settings)
self._new_ipv4 = ""
@pyqtSlot(str)
def set_ipv4(ipv4_settings):
self._new_ipv4 = ipv4_settings
sure = QtWidgets.QMessageBox(self)
sure.setWindowTitle("Set network?")
sure.setText(
f"Setting this as network and disconnecting:<br>{ipv4_settings}"
)
sure.buttonClicked.connect(self._emit_sig)
sure.show()
self.textValueSelected.connect(set_ipv4)
self.show()
@pyqtSlot(QAbstractButton)
def _emit_sig(self, _):
self.set_ipv4_act.emit(self._new_ipv4)

View File

@ -0,0 +1,335 @@
{
"ctrl_panel":[
{
"name":"Temperature",
"type":"float",
"format":"{value:.4f} °C",
"readonly":true
},
{
"name":"Current through TEC",
"type":"float",
"suffix":"mA",
"decimals":6,
"readonly":true
},
{
"name":"Output Config",
"expanded":true,
"type":"group",
"children":[
{
"name":"Control Method",
"type":"mutex",
"limits":[
"Constant Current",
"Temperature PID"
],
"thermostat:set_param":{
"topic":"pwm",
"field":"pid"
},
"children":[
{
"name":"Set Current",
"type":"float",
"value":0,
"step":100,
"limits":[
-2000,
2000
],
"triggerOnShow":true,
"decimals":6,
"suffix":"mA",
"thermostat:set_param":{
"topic":"pwm",
"field":"i_set"
},
"lock":false
},
{
"name":"Set Temperature",
"type":"float",
"value":25,
"step":0.1,
"limits":[
-273,
300
],
"format":"{value:.4f} °C",
"thermostat:set_param":{
"topic":"pid",
"field":"target"
},
"lock":false
}
]
},
{
"name":"Limits",
"expanded":true,
"type":"group",
"children":[
{
"name":"Max Cooling Current",
"type":"float",
"value":0,
"step":100,
"decimals":6,
"limits":[
0,
2000
],
"suffix":"mA",
"thermostat:set_param":{
"topic":"pwm",
"field":"max_i_pos"
},
"lock":false
},
{
"name":"Max Heating Current",
"type":"float",
"value":0,
"step":100,
"decimals":6,
"limits":[
0,
2000
],
"suffix":"mA",
"thermostat:set_param":{
"topic":"pwm",
"field":"max_i_neg"
},
"lock":false
},
{
"name":"Max Voltage Difference",
"type":"float",
"value":0,
"step":0.1,
"limits":[
0,
5
],
"siPrefix":true,
"suffix":"V",
"thermostat:set_param":{
"topic":"pwm",
"field":"max_v"
},
"lock":false
}
]
}
]
},
{
"name":"Thermistor Config",
"expanded":true,
"type":"group",
"children":[
{
"name":"T₀",
"type":"float",
"value":25,
"step":0.1,
"limits":[
-100,
100
],
"format":"{value:.4f} °C",
"thermostat:set_param":{
"topic":"s-h",
"field":"t0"
},
"lock":false
},
{
"name":"R₀",
"type":"float",
"value":10000,
"step":1,
"siPrefix":true,
"suffix":"Ω",
"thermostat:set_param":{
"topic":"s-h",
"field":"r0"
},
"lock":false
},
{
"name":"B",
"type":"float",
"value":3950,
"step":1,
"suffix":"K",
"decimals":4,
"thermostat:set_param":{
"topic":"s-h",
"field":"b"
},
"lock":false
},
{
"name":"Postfilter Rate",
"type":"list",
"value":16.67,
"thermostat:set_param":{
"topic":"postfilter",
"field":"rate"
},
"limits":{
"Off":null,
"16.67 Hz":16.67,
"20 Hz":20.0,
"21.25 Hz":21.25,
"27 Hz":27.0
},
"lock":false
}
]
},
{
"name":"PID Config",
"expanded":true,
"type":"group",
"children":[
{
"name":"Kp",
"type":"float",
"step":0.1,
"suffix":"",
"thermostat:set_param":{
"topic":"pid",
"field":"kp"
},
"lock":false
},
{
"name":"Ki",
"type":"float",
"step":0.1,
"suffix":"Hz",
"thermostat:set_param":{
"topic":"pid",
"field":"ki"
},
"lock":false
},
{
"name":"Kd",
"type":"float",
"step":0.1,
"suffix":"s",
"thermostat:set_param":{
"topic":"pid",
"field":"kd"
},
"lock":false
},
{
"name":"PID Output Clamping",
"expanded":true,
"type":"group",
"children":[
{
"name":"Minimum",
"type":"float",
"step":100,
"limits":[
-2000,
2000
],
"decimals":6,
"suffix":"mA",
"thermostat:set_param":{
"topic":"pid",
"field":"output_min"
},
"lock":false
},
{
"name":"Maximum",
"type":"float",
"step":100,
"limits":[
-2000,
2000
],
"decimals":6,
"suffix":"mA",
"thermostat:set_param":{
"topic":"pid",
"field":"output_max"
},
"lock":false
}
]
},
{
"name":"PID Auto Tune",
"expanded":false,
"type":"group",
"children":[
{
"name":"Target Temperature",
"type":"float",
"value":20,
"step":0.1,
"format":"{value:.4f} °C",
"pid_autotune":"target_temp"
},
{
"name":"Test Current",
"type":"float",
"value":0,
"decimals":6,
"step":100,
"limits":[
-2000,
2000
],
"suffix":"mA",
"pid_autotune":"test_current"
},
{
"name":"Temperature Swing",
"type":"float",
"value":1.5,
"step":0.1,
"prefix":"±",
"format":"{value:.4f} °C",
"pid_autotune":"temp_swing"
},
{
"name":"Lookback",
"type":"float",
"value":3.0,
"step":0.1,
"format":"{value:.4f} s",
"pid_autotune":"lookback"
},
{
"name":"Run",
"type":"action",
"tip":"Run"
}
]
}
]
},
{
"name":"Save to flash",
"type":"action",
"tip":"Save config to thermostat, applies on reset"
},
{
"name":"Load from flash",
"type":"action",
"tip":"Load config from flash"
}
]
}

View File

@ -0,0 +1,25 @@
from PyQt6 import QtWidgets, QtGui
class PlotOptionsMenu(QtWidgets.QMenu):
def __init__(self, channel_graphs, max_samples=1000):
super().__init__()
self.channel_graphs = channel_graphs
self.setTitle("Plot Settings")
clear = QtGui.QAction("Clear graphs", self)
self.addAction(clear)
self.clear = clear
self.clear.triggered.connect(self.channel_graphs.clear_graphs)
self.samples_spinbox = QtWidgets.QSpinBox()
self.samples_spinbox.setRange(2, 100000)
self.samples_spinbox.setSuffix(" samples")
self.samples_spinbox.setValue(max_samples)
self.samples_spinbox.valueChanged.connect(self.channel_graphs.set_max_samples)
limit_samples = QtWidgets.QWidgetAction(self)
limit_samples.setDefaultWidget(self.samples_spinbox)
self.addAction(limit_samples)
self.limit_samples = limit_samples

View File

@ -0,0 +1,572 @@
<?xml version="1.0" encoding="UTF-8"?>
<ui version="4.0">
<class>MainWindow</class>
<widget class="QMainWindow" name="MainWindow">
<property name="geometry">
<rect>
<x>0</x>
<y>0</y>
<width>1280</width>
<height>720</height>
</rect>
</property>
<property name="minimumSize">
<size>
<width>1280</width>
<height>720</height>
</size>
</property>
<property name="maximumSize">
<size>
<width>3840</width>
<height>2160</height>
</size>
</property>
<property name="windowTitle">
<string>Thermostat Control Panel</string>
</property>
<property name="windowIcon">
<iconset>
<normaloff>../resources/artiq.ico</normaloff>../resources/artiq.ico</iconset>
</property>
<widget class="QWidget" name="main_widget">
<property name="sizePolicy">
<sizepolicy hsizetype="Expanding" vsizetype="Expanding">
<horstretch>1</horstretch>
<verstretch>1</verstretch>
</sizepolicy>
</property>
<layout class="QGridLayout" name="gridLayout_2">
<property name="leftMargin">
<number>3</number>
</property>
<property name="topMargin">
<number>3</number>
</property>
<property name="rightMargin">
<number>3</number>
</property>
<property name="bottomMargin">
<number>3</number>
</property>
<property name="spacing">
<number>3</number>
</property>
<item row="0" column="1">
<layout class="QVBoxLayout" name="main_layout">
<property name="spacing">
<number>0</number>
</property>
<item>
<widget class="QFrame" name="graph_group">
<property name="enabled">
<bool>false</bool>
</property>
<property name="sizePolicy">
<sizepolicy hsizetype="Expanding" vsizetype="Expanding">
<horstretch>1</horstretch>
<verstretch>1</verstretch>
</sizepolicy>
</property>
<property name="frameShape">
<enum>QFrame::Shape::StyledPanel</enum>
</property>
<property name="frameShadow">
<enum>QFrame::Shadow::Raised</enum>
</property>
<layout class="QGridLayout" name="graphs_layout" rowstretch="1,1" columnstretch="1,1,1" rowminimumheight="100,100" columnminimumwidth="100,100,100">
<property name="sizeConstraint">
<enum>QLayout::SizeConstraint::SetDefaultConstraint</enum>
</property>
<property name="leftMargin">
<number>3</number>
</property>
<property name="topMargin">
<number>3</number>
</property>
<property name="rightMargin">
<number>3</number>
</property>
<property name="bottomMargin">
<number>3</number>
</property>
<property name="spacing">
<number>2</number>
</property>
<item row="1" column="1">
<widget class="LivePlotWidget" name="ch1_t_graph" native="true"/>
</item>
<item row="0" column="1">
<widget class="LivePlotWidget" name="ch0_t_graph" native="true"/>
</item>
<item row="0" column="2">
<widget class="LivePlotWidget" name="ch0_i_graph" native="true"/>
</item>
<item row="1" column="2">
<widget class="LivePlotWidget" name="ch1_i_graph" native="true"/>
</item>
<item row="0" column="0" rowspan="2">
<widget class="QTabWidget" name="tabWidget">
<property name="sizePolicy">
<sizepolicy hsizetype="Preferred" vsizetype="Preferred">
<horstretch>0</horstretch>
<verstretch>0</verstretch>
</sizepolicy>
</property>
<property name="currentIndex">
<number>0</number>
</property>
<widget class="QWidget" name="ch0_tab">
<property name="sizePolicy">
<sizepolicy hsizetype="Preferred" vsizetype="Preferred">
<horstretch>0</horstretch>
<verstretch>0</verstretch>
</sizepolicy>
</property>
<attribute name="title">
<string>Channel 0</string>
</attribute>
<layout class="QVBoxLayout" name="verticalLayout_2">
<item>
<widget class="ParameterTree" name="ch0_tree" native="true">
<property name="sizePolicy">
<sizepolicy hsizetype="Preferred" vsizetype="Preferred">
<horstretch>0</horstretch>
<verstretch>0</verstretch>
</sizepolicy>
</property>
</widget>
</item>
</layout>
</widget>
<widget class="QWidget" name="ch1_tab">
<property name="sizePolicy">
<sizepolicy hsizetype="Preferred" vsizetype="Preferred">
<horstretch>0</horstretch>
<verstretch>0</verstretch>
</sizepolicy>
</property>
<attribute name="title">
<string>Channel 1</string>
</attribute>
<layout class="QVBoxLayout" name="verticalLayout">
<item>
<widget class="ParameterTree" name="ch1_tree" native="true">
<property name="sizePolicy">
<sizepolicy hsizetype="Preferred" vsizetype="Preferred">
<horstretch>0</horstretch>
<verstretch>0</verstretch>
</sizepolicy>
</property>
</widget>
</item>
</layout>
</widget>
</widget>
</item>
</layout>
</widget>
</item>
<item>
<widget class="QFrame" name="bottom_settings_group">
<property name="sizePolicy">
<sizepolicy hsizetype="Expanding" vsizetype="Fixed">
<horstretch>0</horstretch>
<verstretch>0</verstretch>
</sizepolicy>
</property>
<property name="minimumSize">
<size>
<width>0</width>
<height>40</height>
</size>
</property>
<property name="maximumSize">
<size>
<width>16777215</width>
<height>40</height>
</size>
</property>
<property name="frameShape">
<enum>QFrame::Shape::StyledPanel</enum>
</property>
<property name="frameShadow">
<enum>QFrame::Shadow::Raised</enum>
</property>
<layout class="QHBoxLayout" name="horizontalLayout_2">
<property name="spacing">
<number>3</number>
</property>
<property name="leftMargin">
<number>3</number>
</property>
<property name="topMargin">
<number>3</number>
</property>
<property name="rightMargin">
<number>3</number>
</property>
<property name="bottomMargin">
<number>3</number>
</property>
<item>
<layout class="QHBoxLayout" name="settings_layout">
<item>
<widget class="QToolButton" name="connect_btn">
<property name="sizePolicy">
<sizepolicy hsizetype="Fixed" vsizetype="Expanding">
<horstretch>0</horstretch>
<verstretch>0</verstretch>
</sizepolicy>
</property>
<property name="minimumSize">
<size>
<width>100</width>
<height>0</height>
</size>
</property>
<property name="maximumSize">
<size>
<width>100</width>
<height>16777215</height>
</size>
</property>
<property name="baseSize">
<size>
<width>100</width>
<height>0</height>
</size>
</property>
<property name="text">
<string>Connect</string>
</property>
<property name="popupMode">
<enum>QToolButton::ToolButtonPopupMode::MenuButtonPopup</enum>
</property>
<property name="toolButtonStyle">
<enum>Qt::ToolButtonStyle::ToolButtonFollowStyle</enum>
</property>
</widget>
</item>
<item>
<widget class="QLabel" name="status_lbl">
<property name="sizePolicy">
<sizepolicy hsizetype="Fixed" vsizetype="Expanding">
<horstretch>0</horstretch>
<verstretch>0</verstretch>
</sizepolicy>
</property>
<property name="minimumSize">
<size>
<width>240</width>
<height>0</height>
</size>
</property>
<property name="maximumSize">
<size>
<width>120</width>
<height>16777215</height>
</size>
</property>
<property name="baseSize">
<size>
<width>120</width>
<height>50</height>
</size>
</property>
<property name="text">
<string>Disconnected</string>
</property>
</widget>
</item>
<item>
<widget class="QToolButton" name="thermostat_settings">
<property name="enabled">
<bool>false</bool>
</property>
<property name="text">
<string notr="true">⚙</string>
</property>
<property name="popupMode">
<enum>QToolButton::ToolButtonPopupMode::InstantPopup</enum>
</property>
</widget>
</item>
<item>
<widget class="QToolButton" name="plot_settings">
<property name="toolTip">
<string>Plot Settings</string>
</property>
<property name="text">
<string>📉</string>
</property>
<property name="popupMode">
<enum>QToolButton::ToolButtonPopupMode::InstantPopup</enum>
</property>
</widget>
</item>
<item>
<widget class="QLabel" name="limits_warning">
<property name="toolTipDuration">
<number>1000000000</number>
</property>
</widget>
</item>
<item>
<widget class="QLabel" name="background_task_lbl">
<property name="text">
<string>Ready.</string>
</property>
</widget>
</item>
<item>
<widget class="QtWaitingSpinner" name="loading_spinner" native="true"/>
</item>
<item>
<spacer name="horizontalSpacer">
<property name="orientation">
<enum>Qt::Orientation::Horizontal</enum>
</property>
<property name="sizeHint" stdset="0">
<size>
<width>40</width>
<height>20</height>
</size>
</property>
</spacer>
</item>
<item>
<widget class="QWidget" name="report_group" native="true">
<property name="enabled">
<bool>false</bool>
</property>
<property name="sizePolicy">
<sizepolicy hsizetype="Minimum" vsizetype="Expanding">
<horstretch>0</horstretch>
<verstretch>0</verstretch>
</sizepolicy>
</property>
<property name="minimumSize">
<size>
<width>40</width>
<height>0</height>
</size>
</property>
<layout class="QHBoxLayout" name="horizontalLayout_4">
<property name="spacing">
<number>0</number>
</property>
<property name="leftMargin">
<number>0</number>
</property>
<property name="topMargin">
<number>0</number>
</property>
<property name="rightMargin">
<number>0</number>
</property>
<property name="bottomMargin">
<number>0</number>
</property>
<item>
<layout class="QHBoxLayout" name="report_layout" stretch="0,1,1">
<property name="spacing">
<number>6</number>
</property>
<property name="sizeConstraint">
<enum>QLayout::SizeConstraint::SetDefaultConstraint</enum>
</property>
<property name="leftMargin">
<number>0</number>
</property>
<item>
<widget class="QLabel" name="report_lbl">
<property name="text">
<string>Poll every: </string>
</property>
<property name="alignment">
<set>Qt::AlignmentFlag::AlignRight|Qt::AlignmentFlag::AlignTrailing|Qt::AlignmentFlag::AlignVCenter</set>
</property>
</widget>
</item>
<item>
<widget class="QDoubleSpinBox" name="report_refresh_spin">
<property name="sizePolicy">
<sizepolicy hsizetype="Fixed" vsizetype="Expanding">
<horstretch>0</horstretch>
<verstretch>0</verstretch>
</sizepolicy>
</property>
<property name="minimumSize">
<size>
<width>70</width>
<height>0</height>
</size>
</property>
<property name="maximumSize">
<size>
<width>70</width>
<height>16777215</height>
</size>
</property>
<property name="baseSize">
<size>
<width>70</width>
<height>0</height>
</size>
</property>
<property name="suffix">
<string> s</string>
</property>
<property name="decimals">
<number>1</number>
</property>
<property name="minimum">
<double>0.100000000000000</double>
</property>
<property name="singleStep">
<double>0.100000000000000</double>
</property>
<property name="stepType">
<enum>QAbstractSpinBox::StepType::AdaptiveDecimalStepType</enum>
</property>
<property name="value">
<double>1.000000000000000</double>
</property>
</widget>
</item>
<item>
<widget class="QPushButton" name="report_apply_btn">
<property name="sizePolicy">
<sizepolicy hsizetype="Fixed" vsizetype="Expanding">
<horstretch>0</horstretch>
<verstretch>0</verstretch>
</sizepolicy>
</property>
<property name="minimumSize">
<size>
<width>80</width>
<height>0</height>
</size>
</property>
<property name="maximumSize">
<size>
<width>80</width>
<height>16777215</height>
</size>
</property>
<property name="baseSize">
<size>
<width>80</width>
<height>0</height>
</size>
</property>
<property name="text">
<string>Apply</string>
</property>
</widget>
</item>
</layout>
</item>
</layout>
</widget>
</item>
</layout>
</item>
</layout>
</widget>
</item>
</layout>
</item>
</layout>
</widget>
<action name="actionReset">
<property name="text">
<string>Reset</string>
</property>
<property name="toolTip">
<string>Reset the Thermostat</string>
</property>
<property name="menuRole">
<enum>QAction::MenuRole::NoRole</enum>
</property>
</action>
<action name="actionEnter_DFU_Mode">
<property name="text">
<string>Enter DFU Mode</string>
</property>
<property name="toolTip">
<string>Reset thermostat and enter USB device firmware update (DFU) mode</string>
</property>
<property name="menuRole">
<enum>QAction::MenuRole::NoRole</enum>
</property>
</action>
<action name="actionNetwork_Settings">
<property name="text">
<string>Network Settings</string>
</property>
<property name="toolTip">
<string>Configure IPv4 address, netmask length, and optional default gateway</string>
</property>
<property name="menuRole">
<enum>QAction::MenuRole::NoRole</enum>
</property>
</action>
<action name="actionAbout_Thermostat">
<property name="text">
<string>About Thermostat</string>
</property>
<property name="toolTip">
<string>Show Thermostat hardware revision, and settings related to i</string>
</property>
<property name="menuRole">
<enum>QAction::MenuRole::NoRole</enum>
</property>
</action>
<action name="actionLoad_all_configs">
<property name="text">
<string>Load all channel configs from flash</string>
</property>
<property name="toolTip">
<string>Restore configuration for all channels from flash</string>
</property>
<property name="menuRole">
<enum>QAction::MenuRole::NoRole</enum>
</property>
</action>
<action name="actionSave_all_configs">
<property name="text">
<string>Save all channel configs to flash</string>
</property>
<property name="toolTip">
<string>Save configuration for all channels to flash</string>
</property>
<property name="menuRole">
<enum>QAction::MenuRole::NoRole</enum>
</property>
</action>
</widget>
<customwidgets>
<customwidget>
<class>ParameterTree</class>
<extends>QWidget</extends>
<header>pyqtgraph.parametertree</header>
<container>1</container>
</customwidget>
<customwidget>
<class>LivePlotWidget</class>
<extends>QWidget</extends>
<header>pglive.sources.live_plot_widget</header>
<container>1</container>
</customwidget>
<customwidget>
<class>QtWaitingSpinner</class>
<extends>QWidget</extends>
<header>pytec.gui.view.waitingspinnerwidget</header>
<container>1</container>
</customwidget>
</customwidgets>
<resources/>
<connections/>
</ui>

View File

@ -0,0 +1,215 @@
import logging
from PyQt6 import QtWidgets, QtGui, QtCore
from PyQt6.QtCore import pyqtSignal, pyqtSlot, QSignalBlocker
from qasync import asyncSlot
from pytec.gui.view.net_settings_input_diag import NetSettingsInputDiag
from pytec.gui.model.thermostat import ThermostatConnectionState
class ThermostatSettingsMenu(QtWidgets.QMenu):
def __init__(self, thermostat, info_box, style):
super().__init__()
self._thermostat = thermostat
self._info_box = info_box
self._style = style
self.setTitle("Thermostat settings")
self.hw_rev_data = dict()
self._thermostat.hw_rev_update.connect(self.hw_rev)
self._thermostat.connection_state_update.connect(
self.thermostat_state_change_handler
)
self.fan_group = QtWidgets.QWidget()
self.fan_group.setEnabled(False)
self.fan_group.setMinimumSize(QtCore.QSize(40, 0))
self.fan_layout = QtWidgets.QHBoxLayout(self.fan_group)
self.fan_layout.setSpacing(9)
self.fan_lbl = QtWidgets.QLabel(parent=self.fan_group)
self.fan_lbl.setMinimumSize(QtCore.QSize(40, 0))
self.fan_lbl.setMaximumSize(QtCore.QSize(40, 16777215))
self.fan_lbl.setBaseSize(QtCore.QSize(40, 0))
self.fan_layout.addWidget(self.fan_lbl)
self.fan_power_slider = QtWidgets.QSlider(parent=self.fan_group)
self.fan_power_slider.setMinimumSize(QtCore.QSize(200, 0))
self.fan_power_slider.setMaximumSize(QtCore.QSize(200, 16777215))
self.fan_power_slider.setBaseSize(QtCore.QSize(200, 0))
self.fan_power_slider.setRange(1, 100)
self.fan_power_slider.setOrientation(QtCore.Qt.Orientation.Horizontal)
self.fan_layout.addWidget(self.fan_power_slider)
self.fan_auto_box = QtWidgets.QCheckBox(parent=self.fan_group)
self.fan_auto_box.setMinimumSize(QtCore.QSize(70, 0))
self.fan_auto_box.setMaximumSize(QtCore.QSize(70, 16777215))
self.fan_layout.addWidget(self.fan_auto_box)
self.fan_pwm_warning = QtWidgets.QLabel(parent=self.fan_group)
self.fan_pwm_warning.setMinimumSize(QtCore.QSize(16, 0))
self.fan_layout.addWidget(self.fan_pwm_warning)
self.fan_power_slider.valueChanged.connect(self.fan_set_request)
self.fan_auto_box.stateChanged.connect(self.fan_auto_set_request)
self._thermostat.fan_update.connect(self.fan_update)
self.fan_lbl.setToolTip("Adjust the fan")
self.fan_lbl.setText("Fan:")
self.fan_auto_box.setText("Auto")
fan = QtWidgets.QWidgetAction(self)
fan.setDefaultWidget(self.fan_group)
self.addAction(fan)
self.fan = fan
self.actionReset = QtGui.QAction("Reset Thermostat", self)
self.actionReset.triggered.connect(self.reset_request)
self.addAction(self.actionReset)
self.actionEnter_DFU_Mode = QtGui.QAction("Enter DFU Mode", self)
self.actionEnter_DFU_Mode.triggered.connect(self.dfu_request)
self.addAction(self.actionEnter_DFU_Mode)
self.actionnet_settings_input_diag = QtGui.QAction("Set IPV4 Settings", self)
self.actionnet_settings_input_diag.triggered.connect(self.net_settings_request)
self.addAction(self.actionnet_settings_input_diag)
@asyncSlot(bool)
async def load(_):
await self._thermostat.load_cfg()
self._info_box.display_info_box(
"Config loaded", "All channel configs have been loaded from flash."
)
self.actionLoad_all_configs = QtGui.QAction("Load Config", self)
self.actionLoad_all_configs.triggered.connect(load)
self.addAction(self.actionLoad_all_configs)
@asyncSlot(bool)
async def save(_):
await self._thermostat.save_cfg()
self._info_box.display_info_box(
"Config saved", "All channel configs have been saved to flash."
)
self.actionSave_all_configs = QtGui.QAction("Save Config", self)
self.actionSave_all_configs.triggered.connect(save)
self.addAction(self.actionSave_all_configs)
def about_thermostat():
QtWidgets.QMessageBox.about(
self,
"About Thermostat",
f"""
<h1>Sinara 8451 Thermostat v{self.hw_rev_data['rev']['major']}.{self.hw_rev_data['rev']['minor']}</h1>
<br>
<h2>Settings:</h2>
Default fan curve:
a = {self.hw_rev_data['settings']['fan_k_a']},
b = {self.hw_rev_data['settings']['fan_k_b']},
c = {self.hw_rev_data['settings']['fan_k_c']}
<br>
Fan PWM range:
{self.hw_rev_data['settings']['min_fan_pwm']} \u2013 {self.hw_rev_data['settings']['max_fan_pwm']}
<br>
Fan PWM frequency: {self.hw_rev_data['settings']['fan_pwm_freq_hz']} Hz
<br>
Fan available: {self.hw_rev_data['settings']['fan_available']}
<br>
Fan PWM recommended: {self.hw_rev_data['settings']['fan_pwm_recommended']}
""",
)
self.actionAbout_Thermostat = QtGui.QAction("About Thermostat", self)
self.actionAbout_Thermostat.triggered.connect(about_thermostat)
self.addAction(self.actionAbout_Thermostat)
@pyqtSlot("QVariantMap")
def fan_update(self, fan_settings):
logging.debug(fan_settings)
if fan_settings is None:
return
with QSignalBlocker(self.fan_power_slider):
self.fan_power_slider.setValue(
fan_settings["fan_pwm"] or 100 # 0 = PWM off = full strength
)
with QSignalBlocker(self.fan_auto_box):
self.fan_auto_box.setChecked(fan_settings["auto_mode"])
def set_fan_pwm_warning(self):
if self.fan_power_slider.value() != 100:
pixmapi = getattr(QtWidgets.QStyle.StandardPixmap, "SP_MessageBoxWarning")
icon = self._style.standardIcon(pixmapi)
self.fan_pwm_warning.setPixmap(icon.pixmap(16, 16))
self.fan_pwm_warning.setToolTip(
"Throttling the fan (not recommended on this hardware rev)"
)
else:
self.fan_pwm_warning.setPixmap(QtGui.QPixmap())
self.fan_pwm_warning.setToolTip("")
@pyqtSlot(ThermostatConnectionState)
def thermostat_state_change_handler(self, state):
if state == ThermostatConnectionState.DISCONNECTED:
self.fan_pwm_warning.setPixmap(QtGui.QPixmap())
self.fan_pwm_warning.setToolTip("")
@pyqtSlot("QVariantMap")
def hw_rev(self, hw_rev):
self.hw_rev_data = hw_rev
self.fan_group.setEnabled(self.hw_rev_data["settings"]["fan_available"])
@asyncSlot(int)
async def fan_set_request(self, value):
assert self._thermostat.connected()
if self.fan_auto_box.isChecked():
with QSignalBlocker(self.fan_auto_box):
self.fan_auto_box.setChecked(False)
await self._thermostat.set_fan(value)
if not self.hw_rev_data["settings"]["fan_pwm_recommended"]:
self.set_fan_pwm_warning()
@asyncSlot(int)
async def fan_auto_set_request(self, enabled):
assert self._thermostat.connected()
if enabled:
await self._thermostat.set_fan("auto")
self.fan_update(await self._thermostat.get_fan())
else:
await self.thermostat.set_fan(
self.fan_power_slider.value()
)
@asyncSlot(bool)
async def reset_request(self, _):
assert self._thermostat.connected()
await self._thermostat.reset()
await self._thermostat.end_session()
self._thermostat.connection_state = ThermostatConnectionState.DISCONNECTED
@asyncSlot(bool)
async def dfu_request(self, _):
assert self._thermostat.connected()
await self._thermostat.dfu()
await self._thermostat.end_session()
self._thermostat.connection_state = ThermostatConnectionState.DISCONNECTED
@asyncSlot(bool)
async def net_settings_request(self, _):
assert self._thermostat.connected()
ipv4 = await self._thermostat.get_ipv4()
self.net_settings_input_diag = NetSettingsInputDiag(ipv4["addr"])
self.net_settings_input_diag.set_ipv4_act.connect(self.set_net_settings_request)
@asyncSlot(str)
async def set_net_settings_request(self, ipv4_settings):
assert self._thermostat.connected()
await self._thermostat.set_ipv4(ipv4_settings)
await self._thermostat.end_session()
self._thermostat.connection_state = ThermostatConnectionState.DISCONNECTED

View File

@ -0,0 +1,212 @@
"""
The MIT License (MIT)
Copyright (c) 2012-2014 Alexander Turkin
Copyright (c) 2014 William Hallatt
Copyright (c) 2015 Jacob Dawid
Copyright (c) 2016 Luca Weiss
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
"""
import math
from PyQt6.QtCore import *
from PyQt6.QtGui import *
from PyQt6.QtWidgets import *
class QtWaitingSpinner(QWidget):
def __init__(self, parent=None):
super().__init__(parent)
# WAS IN initialize()
self._color = QColor(Qt.GlobalColor.black)
self._roundness = 100.0
self._minimumTrailOpacity = 3.14159265358979323846
self._trailFadePercentage = 80.0
self._revolutionsPerSecond = 1.57079632679489661923
self._numberOfLines = 20
self._lineLength = 5
self._lineWidth = 2
self._innerRadius = 5
self._currentCounter = 0
self._timer = QTimer(self)
self._timer.timeout.connect(self.rotate)
self.updateSize()
self.updateTimer()
# END initialize()
self.setAttribute(Qt.WidgetAttribute.WA_TranslucentBackground)
def paintEvent(self, QPaintEvent):
painter = QPainter(self)
painter.fillRect(self.rect(), Qt.GlobalColor.transparent)
painter.setRenderHint(QPainter.RenderHint.Antialiasing, True)
if self._currentCounter >= self._numberOfLines:
self._currentCounter = 0
painter.setPen(Qt.PenStyle.NoPen)
for i in range(0, self._numberOfLines):
painter.save()
painter.translate(
self._innerRadius + self._lineLength,
self._innerRadius + self._lineLength,
)
rotateAngle = float(360 * i) / float(self._numberOfLines)
painter.rotate(rotateAngle)
painter.translate(self._innerRadius, 0)
distance = self.lineCountDistanceFromPrimary(
i, self._currentCounter, self._numberOfLines
)
color = self.currentLineColor(
distance,
self._numberOfLines,
self._trailFadePercentage,
self._minimumTrailOpacity,
self._color,
)
painter.setBrush(color)
painter.drawRoundedRect(
QRect(0, int(-self._lineWidth / 2), self._lineLength, self._lineWidth),
self._roundness,
self._roundness,
Qt.SizeMode.RelativeSize,
)
painter.restore()
def start(self):
if not self._timer.isActive():
self._timer.start()
self._currentCounter = 0
def stop(self):
if self._timer.isActive():
self._timer.stop()
self._currentCounter = 0
def setNumberOfLines(self, lines):
self._numberOfLines = lines
self._currentCounter = 0
self.updateTimer()
def setLineLength(self, length):
self._lineLength = length
self.updateSize()
def setLineWidth(self, width):
self._lineWidth = width
self.updateSize()
def setInnerRadius(self, radius):
self._innerRadius = radius
self.updateSize()
def color(self):
return self._color
def roundness(self):
return self._roundness
def minimumTrailOpacity(self):
return self._minimumTrailOpacity
def trailFadePercentage(self):
return self._trailFadePercentage
def revolutionsPersSecond(self):
return self._revolutionsPerSecond
def numberOfLines(self):
return self._numberOfLines
def lineLength(self):
return self._lineLength
def lineWidth(self):
return self._lineWidth
def innerRadius(self):
return self._innerRadius
def setRoundness(self, roundness):
self._roundness = max(0.0, min(100.0, roundness))
def setColor(self, color=Qt.GlobalColor.black):
self._color = QColor(color)
def setRevolutionsPerSecond(self, revolutionsPerSecond):
self._revolutionsPerSecond = revolutionsPerSecond
self.updateTimer()
def setTrailFadePercentage(self, trail):
self._trailFadePercentage = trail
def setMinimumTrailOpacity(self, minimumTrailOpacity):
self._minimumTrailOpacity = minimumTrailOpacity
def rotate(self):
self._currentCounter += 1
if self._currentCounter >= self._numberOfLines:
self._currentCounter = 0
self.update()
def updateSize(self):
self.size = (self._innerRadius + self._lineLength) * 2
self.setFixedSize(self.size, self.size)
def updateTimer(self):
self._timer.setInterval(
int(1000 / (self._numberOfLines * self._revolutionsPerSecond))
)
def lineCountDistanceFromPrimary(self, current, primary, totalNrOfLines):
distance = primary - current
if distance < 0:
distance += totalNrOfLines
return distance
def currentLineColor(
self, countDistance, totalNrOfLines, trailFadePerc, minOpacity, colorinput
):
color = QColor(colorinput)
if countDistance == 0:
return color
minAlphaF = minOpacity / 100.0
distanceThreshold = int(math.ceil((totalNrOfLines - 1) * trailFadePerc / 100.0))
if countDistance > distanceThreshold:
color.setAlphaF(minAlphaF)
else:
alphaDiff = color.alphaF() - minAlphaF
gradient = alphaDiff / float(distanceThreshold + 1)
resultAlpha = color.alphaF() - gradient * countDistance
# If alpha is out of bounds, clip it.
resultAlpha = min(1.0, max(0.0, resultAlpha))
color.setAlphaF(resultAlpha)
return color
if __name__ == "__main__":
app = QApplication([])
waiting_spinner = QtWaitingSpinner()
waiting_spinner.show()
waiting_spinner.start()
app.exec()

View File

@ -0,0 +1,50 @@
from PyQt6.QtCore import pyqtSlot, QObject
from PyQt6 import QtWidgets, QtGui
class ZeroLimitsWarningView(QObject):
def __init__(self, thermostat, style, limit_warning):
super().__init__()
self._thermostat = thermostat
self._thermostat.pwm_update.connect(self.set_limits_warning)
self._lbl = limit_warning
self._style = style
@pyqtSlot(list)
def set_limits_warning(self, pwm_data: list):
channels_zeroed_limits = [set() for i in range(self._thermostat.NUM_CHANNELS)]
for pwm_params in pwm_data:
channel = pwm_params["channel"]
for limit in "max_i_pos", "max_i_neg", "max_v":
if pwm_params[limit]["value"] == 0.0:
channels_zeroed_limits[channel].add(limit)
channel_disabled = [False, False]
report_str = "The following output limit(s) are set to zero:\n"
for ch, zeroed_limits in enumerate(channels_zeroed_limits):
if {"max_i_pos", "max_i_neg"}.issubset(zeroed_limits):
report_str += "Max Cooling Current, Max Heating Current"
channel_disabled[ch] = True
if "max_v" in zeroed_limits:
if channel_disabled[ch]:
report_str += ", "
report_str += "Max Voltage Difference"
channel_disabled[ch] = True
if channel_disabled[ch]:
report_str += f" for Channel {ch}\n"
report_str += (
"\nThese limit(s) are restricting the channel(s) from producing current."
)
if True in channel_disabled:
pixmapi = getattr(QtWidgets.QStyle.StandardPixmap, "SP_MessageBoxWarning")
icon = self._style.standardIcon(pixmapi)
self._lbl.setPixmap(icon.pixmap(16, 16))
self._lbl.setToolTip(report_str)
else:
self._lbl.setPixmap(QtGui.QPixmap())
self._lbl.setToolTip(None)

254
pytec/tec_qt.py Normal file
View File

@ -0,0 +1,254 @@
"""GUI for the Sinara 8451 Thermostat"""
import json
import asyncio
import logging
import argparse
import importlib.resources
import qasync
from qasync import asyncSlot, asyncClose
from autotune import PIDAutotuneState
from PyQt6 import QtWidgets, QtGui, uic
from PyQt6.QtCore import pyqtSlot
from pytec.gui.model.thermostat import Thermostat, ThermostatConnectionState
from pytec.gui.model.pid_autotuner import PIDAutoTuner
from pytec.gui.view.zero_limits_warning_view import ZeroLimitsWarningView
from pytec.gui.view.thermostat_settings_menu import ThermostatSettingsMenu
from pytec.gui.view.connection_details_menu import ConnectionDetailsMenu
from pytec.gui.view.plot_options_menu import PlotOptionsMenu
from pytec.gui.view.live_plot_view import LiveDataPlotter
from pytec.gui.view.ctrl_panel import CtrlPanel
from pytec.gui.view.info_box import InfoBox
def get_argparser():
parser = argparse.ArgumentParser(description="Thermostat Control Panel")
parser.add_argument(
"--connect",
default=None,
action="store_true",
help="Automatically connect to the specified Thermostat in host:port format",
)
parser.add_argument("HOST", metavar="host", default=None, nargs="?")
parser.add_argument("PORT", metavar="port", default=None, nargs="?")
parser.add_argument(
"-l",
"--log",
dest="logLevel",
choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
help="Set the logging level",
)
parser.add_argument(
"-p",
"--param_tree",
default=importlib.resources.files("pytec.gui.view").joinpath("param_tree.json"),
help="Param Tree Description JSON File",
)
return parser
class MainWindow(QtWidgets.QMainWindow):
NUM_CHANNELS = 2
def __init__(self, args):
super().__init__()
ui_file_path = importlib.resources.files("pytec.gui.view").joinpath("tec_qt.ui")
uic.loadUi(ui_file_path, self)
self._info_box = InfoBox()
# Models
self._thermostat = Thermostat(self, self.report_refresh_spin.value())
self._connecting_task = None
self._thermostat.connection_state_update.connect(
self._on_connection_state_changed
)
self._autotuners = PIDAutoTuner(self, self._thermostat, 2)
self._autotuners.autotune_state_changed.connect(
self._on_pid_autotune_state_changed
)
# Handlers for disconnections
async def autotune_disconnect():
for ch in range(self.NUM_CHANNELS):
if self._autotuners.get_state(ch) != PIDAutotuneState.STATE_OFF:
await self._autotuners.stop_pid_from_running(ch)
self._thermostat.disconnect_cb = autotune_disconnect
@pyqtSlot()
def handle_connection_error():
self._info_box.display_info_box(
"Connection Error", "Thermostat connection lost. Is it unplugged?"
)
self._thermostat.connection_error.connect(handle_connection_error)
# Control Panel
def get_ctrl_panel_config(args):
with open(args.param_tree, "r", encoding="utf-8") as f:
return json.load(f)["ctrl_panel"]
self._ctrl_panel_view = CtrlPanel(
self._thermostat,
self._autotuners,
self._info_box,
[self.ch0_tree, self.ch1_tree],
get_ctrl_panel_config(args),
)
# Graphs
self._channel_graphs = LiveDataPlotter(
self._thermostat,
[
[getattr(self, f"ch{ch}_t_graph"), getattr(self, f"ch{ch}_i_graph")]
for ch in range(self.NUM_CHANNELS)
],
)
# Bottom bar menus
self.connection_details_menu = ConnectionDetailsMenu(
self._thermostat, self.connect_btn
)
self.connect_btn.setMenu(self.connection_details_menu)
self._thermostat_settings_menu = ThermostatSettingsMenu(
self._thermostat, self._info_box, self.style()
)
self.thermostat_settings.setMenu(self._thermostat_settings_menu)
self._plot_options_menu = PlotOptionsMenu(self._channel_graphs)
self.plot_settings.setMenu(self._plot_options_menu)
# Status line
self._zero_limits_warning_view = ZeroLimitsWarningView(
self._thermostat, self.style(), self.limits_warning
)
self.loading_spinner.hide()
self.report_apply_btn.clicked.connect(
lambda: self._thermostat.set_update_s(self.report_refresh_spin.value())
)
@asyncClose
async def closeEvent(self, _event):
try:
await self._thermostat.end_session()
self._thermostat.connection_state = ThermostatConnectionState.DISCONNECTED
except:
pass
@pyqtSlot(ThermostatConnectionState)
def _on_connection_state_changed(self, state):
self.graph_group.setEnabled(state == ThermostatConnectionState.CONNECTED)
self.thermostat_settings.setEnabled(
state == ThermostatConnectionState.CONNECTED
)
self.report_group.setEnabled(state == ThermostatConnectionState.CONNECTED)
match state:
case ThermostatConnectionState.CONNECTED:
self.connect_btn.setText("Disconnect")
self.status_lbl.setText(
"Connected to Thermostat v"
f"{self._thermostat.hw_rev['rev']['major']}."
f"{self._thermostat.hw_rev['rev']['minor']}"
)
case ThermostatConnectionState.CONNECTING:
self.connect_btn.setText("Stop")
self.status_lbl.setText("Connecting...")
case ThermostatConnectionState.DISCONNECTED:
self.connect_btn.setText("Connect")
self.status_lbl.setText("Disconnected")
@pyqtSlot(int, PIDAutotuneState)
def _on_pid_autotune_state_changed(self, _ch, _state):
autotuning_channels = []
for ch in range(self.NUM_CHANNELS):
if self._autotuners.get_state(ch) in {
PIDAutotuneState.STATE_READY,
PIDAutotuneState.STATE_RELAY_STEP_UP,
PIDAutotuneState.STATE_RELAY_STEP_DOWN,
}:
autotuning_channels.append(ch)
if len(autotuning_channels) == 0:
self.background_task_lbl.setText("Ready.")
self.loading_spinner.hide()
self.loading_spinner.stop()
else:
self.background_task_lbl.setText(
f"Autotuning channel {autotuning_channels}..."
)
self.loading_spinner.start()
self.loading_spinner.show()
@asyncSlot()
async def on_connect_btn_clicked(self):
match self._thermostat.connection_state:
case ThermostatConnectionState.DISCONNECTED:
self._connecting_task = asyncio.current_task()
self._thermostat.connection_state = ThermostatConnectionState.CONNECTING
await self._thermostat.start_session(
host=self.connection_details_menu.host_set_line.text(),
port=self.connection_details_menu.port_set_spin.value(),
)
self._connecting_task = None
self._thermostat.connection_state = ThermostatConnectionState.CONNECTED
self._thermostat.start_watching()
case ThermostatConnectionState.CONNECTING:
self._connecting_task.cancel()
self._connecting_task = None
await self._thermostat.end_session()
self._thermostat.connection_state = (
ThermostatConnectionState.DISCONNECTED
)
case ThermostatConnectionState.CONNECTED:
await self._thermostat.end_session()
self._thermostat.connection_state = (
ThermostatConnectionState.DISCONNECTED
)
async def coro_main():
args = get_argparser().parse_args()
if args.logLevel:
logging.basicConfig(level=getattr(logging, args.logLevel))
app_quit_event = asyncio.Event()
app = QtWidgets.QApplication.instance()
app.aboutToQuit.connect(app_quit_event.set)
app.setWindowIcon(
QtGui.QIcon(
str(importlib.resources.files("pytec.gui.resources").joinpath("artiq.ico"))
)
)
main_window = MainWindow(args)
main_window.show()
if args.connect:
if args.HOST:
main_window.connection_details_menu.host_set_line.setText(args.HOST)
if args.PORT:
main_window.connection_details_menu.port_set_spin.setValue(int(args.PORT))
main_window.connect_btn.click()
await app_quit_event.wait()
def main():
qasync.run(coro_main())
if __name__ == "__main__":
main()