1
0
forked from M-Labs/kirdy
kirdy-firmware/pykirdy/kirdy_qt.py
2024-09-04 15:39:27 +08:00

970 lines
48 KiB
Python

from PyQt6 import QtWidgets, QtGui, QtCore, uic
from PyQt6.QtCore import pyqtSignal, QObject, QSignalBlocker, pyqtSlot
import pyqtgraph.parametertree.parameterTypes as pTypes
from pyqtgraph.parametertree import Parameter, ParameterTree, ParameterItem, registerParameterType
import pyqtgraph as pg
pg.setConfigOptions(antialias=True)
from pyqtgraph import mkPen
from pyqtgraph.functions import siEval, siParse, SI_PREFIX_EXPONENTS, SI_PREFIXES
from pglive.sources.live_axis_range import LiveAxisRange
from pglive.sources.data_connector import DataConnector
from pglive.kwargs import Axis, LeadingLine
from pglive.sources.live_plot import LiveLinePlot
from pglive.sources.live_plot_widget import LivePlotWidget
from pglive.sources.live_axis import LiveAxis
import re
import sys
import os
import argparse
import logging
import asyncio
from driver.kirdy import Kirdy as Kirdy_Driver
import qasync
from qasync import asyncClose, asyncSlot
from collections import deque
from datetime import datetime, timezone, timedelta
from time import time
from typing import Any, Optional, List
from ui.ui_conn_settings_form import Ui_Conn_Settings_Form
from ui.ui_update_network_settings_form import Ui_Update_Network_Settings_Form
from dateutil import tz
import math
import socket
from pid_autotune import PIDAutotune, PIDAutotuneState
import importlib.resources
# Generic connection-timeout message (not referenced in the visible part of this file).
COMMON_ERROR_MSG = "Connection Timeout. Disconnecting."
# Pattern handed to siParse() by siConvert(): a signed number (decimal with an
# optional exponent, or nan/inf), optional whitespace, then optionally an SI
# prefix ('u' or any character of SI_PREFIXES) followed by a unit suffix that
# starts with a word character, '°' or '℃'.
FLOAT_REGEX = re.compile(r'(?P<number>[+-]?((((\d+(\.\d*)?)|(\d*\.\d+))([eE][+-]?\d+)?)|((?i:nan)|(inf))))\s*((?P<siPrefix>[u' + SI_PREFIXES + r']?)(?P<suffix>[\w°℃].*))?$')
def get_argparser():
    """Build the command-line argument parser of the Kirdy control panel.

    Returns:
        An ``argparse.ArgumentParser`` that understands:

        * ``--connect``: flag, ``None`` unless given, then ``True``;
        * ``IP`` and ``PORT``: optional positionals, kept as strings,
          ``None`` when omitted;
        * ``-l`` / ``--log``: logging level name, stored as ``logLevel``.
    """
    # The description and --connect help used to name other programs
    # ("ARTIQ master", "Thermostat"); they now describe this tool.
    parser = argparse.ArgumentParser(description="Kirdy Control Panel")
    parser.add_argument("--connect", default=None, action="store_true",
                        help="Automatically connect to the specified Kirdy in IP:port format")
    parser.add_argument('IP', metavar="ip", default=None, nargs='?')
    parser.add_argument('PORT', metavar="port", default=None, nargs='?')
    parser.add_argument("-l", "--log", dest="logLevel", choices=['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'],
                        help="Set the logging level")
    return parser
def siConvert(val, suffix, typ=float):
    """
    Express a value in the SI-prefixed unit named by ``suffix``.

    ``val`` and ``suffix`` are joined into one string and split by siParse()
    using FLOAT_REGEX; the numeric part is converted with ``typ`` and scaled
    by the inverse of the exponent of the detected SI prefix.

    Example::

        siConvert(0.1, "mA") # returns 100
    """
    number, prefix, _unit = siParse(str(val) + suffix, FLOAT_REGEX)
    converted = typ(number)
    if prefix == '':
        return converted
    exponent = -SI_PREFIX_EXPONENTS[prefix]
    if exponent > 0:
        return converted * 10**exponent
    if exponent < 0:
        # Dividing by a positive power keeps Decimal objects usable here
        return converted / 10**-exponent
    return converted
class Kirdy(QObject):
    """Qt-side wrapper around the Kirdy driver object.

    Hands its Qt signals to the driver, runs a timer that periodically
    requests the settings summary while connected, and exposes session
    control to the GUI.
    """
    # Given to the driver through set_connected_sig(); carries the connection state.
    connected_sig = pyqtSignal(bool)
    # Passed as `sig` to get_settings_summary() on every timer tick.
    setting_update_sig = pyqtSignal(dict)
    # Given to the driver through set_report_sig().
    report_update_sig = pyqtSignal(dict)

    def __init__(self, parent, kirdy, _poll_interval):
        """
        :param parent: Qt parent object.
        :param kirdy: driver instance to wrap.
        :param _poll_interval: settings polling period in seconds.
        """
        super().__init__(parent)
        self._poll_interval = _poll_interval
        self._kirdy = kirdy
        self._kirdy.set_connected_sig(self.connected_sig)
        self.connected_sig.connect(self.start_polling)
        self.connected_sig.connect(self.connected_setup)
        self._kirdy.set_report_sig(self.report_update_sig)
        self._timer = QtCore.QBasicTimer()

    def connected(self):
        """Return the driver's connected() result."""
        return self._kirdy.connected()

    def connecting(self):
        """Return the driver's connecting() result."""
        return self._kirdy.connecting()

    def start_session(self, host, port):
        """Ask the driver to start a session with ``host``:``port``."""
        self._kirdy.start_session(host=host, port=port)

    def end_session(self):
        """Stop polling and schedule the driver's end_session() coroutine."""
        if self._timer.isActive():
            self._timer.stop()
        asyncio.get_running_loop().create_task(self._kirdy.end_session())

    @pyqtSlot(bool)
    def connected_setup(self, connected):
        """Switch the device to active report mode once a connection is up."""
        if connected:
            self._kirdy.task_dispatcher(self._kirdy.device.set_active_report_mode(True))
            self._kirdy._report_mode_on = True

    def timerEvent(self, event):
        """Timer tick: request a settings summary delivered via setting_update_sig."""
        self._kirdy.task_dispatcher(self._kirdy.device.get_settings_summary(sig=self.setting_update_sig))

    @pyqtSlot(bool)
    def start_polling(self, start):
        """Start (``start`` true) or stop (``start`` false) the polling timer."""
        if start:
            if not self._timer.isActive():
                # QBasicTimer takes milliseconds; the interval is kept in seconds.
                self._timer.start(int(self._poll_interval*1000), self)
            else:
                logging.debug("Kirdy Polling Timer has been started already.")
        else:
            self._timer.stop()

    @pyqtSlot(float)
    def set_update_s(self, interval):
        """Set the polling period (seconds) and restart a running timer with it."""
        self._poll_interval = interval
        self.update_polling_rate()

    def update_polling_rate(self):
        """Restart an active polling timer so it picks up the current interval."""
        if self._timer.isActive():
            self._timer.stop()
            # start_polling() requires its `start` flag; calling it without an
            # argument raised TypeError and left the timer stopped.
            self.start_polling(True)
        else:
            logging.debug("Attempt to update polling timer when it is stopped")
class Graphs:
    """Live plots for LD current, PD power, TEC current and TEC temperature."""

    def __init__(self, ld_i_set_graph, pd_mon_pwr_graph, tec_i_graph, tec_temp_graph, max_samples=1000):
        """Create the plots, axes and data connectors on the four graph widgets."""
        self.graphs = [ld_i_set_graph, pd_mon_pwr_graph, tec_i_graph, tec_temp_graph]
        self.connectors = []

        self._pd_mon_pwr_plot = LiveLinePlot(pen=pg.mkPen('r'))
        self._ld_i_set_plot = LiveLinePlot(name="Set", pen=pg.mkPen('r'))
        self._tec_temp_plot = LiveLinePlot(pen=pg.mkPen('r'))
        self._tec_setpoint_plot = LiveLinePlot(pen=pg.mkPen('r'))
        self._tec_i_target_plot = LiveLinePlot(name="Target", pen=pg.mkPen('r'))
        self._tec_i_measure_plot = LiveLinePlot(name="Measure", pen=pg.mkPen('g'))

        setpoint_line = tec_temp_graph.getPlotItem().addLine(label='{value}', pen=pg.mkPen('g'))
        self._temp_setpoint_line = setpoint_line
        # Keep the setpoint line drawn above the temperature trace
        setpoint_line.setZValue(10)
        setpoint_line.setVisible(False)

        def relative_time_ticks(values: List, scale: float, spacing: float) -> List:
            labels = []
            for value in values:
                stamp = datetime.fromtimestamp(value/1000, tz=timezone.utc)
                labels.append(stamp.strftime("%H:%M:%S"))
            return labels

        for graph in self.graphs:
            bottom_axis = LiveAxis('bottom', text="Time since Kirdy Reset (Hr:Min:Sec)", tick_angle=-45, units="")
            # Show the relative timestamp as %H:%M:%S, independent of the local timezone
            bottom_axis.tickStrings = relative_time_ticks
            # Keep pyqtgraph from appending a scaling prefix to the axis label
            bottom_axis.autoSIPrefix = False
            bottom_axis.showLabel()
            graph.setAxisItems({'bottom': bottom_axis})
            graph.add_crosshair(pg.mkPen(color='red', width=1), {'color': 'green'})
            #TODO: x_range should not be updated on every tick
            graph.x_range_controller = LiveAxisRange(roll_on_tick=17, offset_left=4900)
            graph.x_range_controller.crop_left_offset_to_data = True
            # Registering under the title enables axis linking in the context menu
            graph.register(graph.getPlotItem().titleLabel.text) # Slight hack getting the title

        self.max_samples = max_samples

        def attach_left_axis(graph, text, units):
            left_axis = LiveAxis('left', text=text, units=units)
            left_axis.showLabel()
            graph.setAxisItems({'left': left_axis})

        # Laser diode current setpoint
        attach_left_axis(ld_i_set_graph, "Current", "A")
        ld_i_set_graph.addItem(self._ld_i_set_plot)
        self.ld_i_set_connector = DataConnector(self._ld_i_set_plot, max_points=self.max_samples)
        self.connectors.extend([self.ld_i_set_connector])

        # Photodiode monitor power
        attach_left_axis(pd_mon_pwr_graph, "Power", "W")
        pd_mon_pwr_graph.addItem(self._pd_mon_pwr_plot)
        self.pd_mon_pwr_connector = DataConnector(self._pd_mon_pwr_plot, max_points=self.max_samples)
        self.connectors.extend([self.pd_mon_pwr_connector])

        # TEC temperature and its setpoint
        attach_left_axis(tec_temp_graph, "Temperature", "")
        tec_temp_graph.addItem(self._tec_setpoint_plot)
        tec_temp_graph.addItem(self._tec_temp_plot)
        self.tec_setpoint_connector = DataConnector(self._tec_setpoint_plot, max_points=1)
        self.tec_temp_connector = DataConnector(self._tec_temp_plot, max_points=self.max_samples)
        self.connectors.extend([self.tec_temp_connector, self.tec_setpoint_connector])

        # TEC current: target and measurement share one graph
        attach_left_axis(tec_i_graph, "Current", "A")
        tec_i_graph.addLegend(brush=(50, 50, 200, 150))
        tec_i_graph.y_range_controller = LiveAxisRange(fixed_range=[-1.0, 1.0])
        tec_i_graph.addItem(self._tec_i_target_plot)
        tec_i_graph.addItem(self._tec_i_measure_plot)
        self.tec_i_target_connector = DataConnector(self._tec_i_target_plot, max_points=self.max_samples)
        self.tec_i_measure_connector = DataConnector(self._tec_i_measure_plot, max_points=self.max_samples)
        self.connectors.extend([self.tec_i_target_connector, self.tec_i_measure_connector])

    def set_max_samples(self, max_samples):
        """Apply a new sample limit to every connector, replacing its buffers with empty ones."""
        self.max_samples = max_samples
        for connector in self.connectors:
            with connector.data_lock:
                connector.max_points = self.max_samples
                limit = int(connector.max_points)
                connector.x = deque(maxlen=limit)
                connector.y = deque(maxlen=limit)

    def plot_append(self, report):
        """Append one report's values to the plots; failures are logged, not raised."""
        try:
            laser = report['laser']
            ld_i_set = laser['ld_i_set']
            pd_pwr = laser['pd_pwr']
            thermostat = report['thermostat']
            tec_i_set = thermostat['i_set']
            tec_i_measure = thermostat['tec_i']
            tec_temp = thermostat['temperature']
            ts = report['ts']

            self.ld_i_set_connector.cb_append_data_point(ld_i_set, ts)

            # A missing photodiode reading hides the trace and records 0.0
            if pd_pwr is None:
                self._pd_mon_pwr_plot.hide()
                pd_sample = 0.0
            else:
                self._pd_mon_pwr_plot.show()
                pd_sample = pd_pwr
            self.pd_mon_pwr_connector.cb_append_data_point(pd_sample, ts)

            # A missing temperature hides the trace and records -273.15
            if tec_temp is not None:
                self._tec_temp_plot.show()
            else:
                self._tec_temp_plot.hide()
                tec_temp = -273.15
            self.tec_temp_connector.cb_append_data_point(tec_temp, ts)

            line = self._temp_setpoint_line
            setpoint_sample = line.value() if line.isVisible() else tec_temp
            self.tec_setpoint_connector.cb_append_data_point(setpoint_sample, ts)

            if tec_i_measure is not None:
                self.tec_i_measure_connector.cb_append_data_point(tec_i_measure, ts)
            self.tec_i_target_connector.cb_append_data_point(tec_i_set, ts)
        except Exception:
            logging.error(f"Graph Value cannot be updated. Data:{report}", exc_info=True)

    def clear_data_pts(self):
        """Clear every connector and resume it."""
        for connector in self.connectors:
            connector.clear()
            connector.resume()

    def set_temp_setpoint_line(self, temp=None, visible=None):
        """Update the setpoint line's visibility and/or value; ``None`` leaves that aspect alone."""
        line = self._temp_setpoint_line
        if visible is not None:
            line.setVisible(visible)
        if temp is None:
            return
        line.setValue(temp)
        # PyQtGraph normally skips the label refresh while the line is
        # hidden, so write the text explicitly to avoid a stale value.
        line.label.setText(f"{temp}", color='g')
class MutexParameter(pTypes.ListParameter):
    """
    List-selectable parameter that shows exactly one of its children at a time.
    The position of the selected item in the list picks the child at the same position.
    """

    def __init__(self, **opts):
        super().__init__(**opts)
        self.sigValueChanged.connect(self.show_chosen_child)
        # Fire once so the initial selection is applied immediately
        self.sigValueChanged.emit(self, self.opts['value'])

    def _get_param_from_value(self, value):
        """Return the child whose index equals the index of ``value`` in the limits."""
        limits = self.opts['limits']
        choices = list(limits.values()) if isinstance(limits, dict) else limits
        position = choices.index(value)
        return self.children()[position]

    @pyqtSlot(object, object)
    def show_chosen_child(self, value):
        """Hide all children, then show the one selected by ``value.value()``."""
        for child in self.children():
            child.hide()
        selected = self._get_param_from_value(value.value())
        selected.show()
        if selected.opts.get('triggerOnShow', None):
            # Re-announce the child's current value when it becomes visible
            selected.sigValueChanged.emit(selected, selected.value())
# Register MutexParameter under the type name 'mutex' used by the parameter specs below.
registerParameterType('mutex', MutexParameter)
class UpdateNetSettingsForm(QtWidgets.QDialog, Ui_Update_Network_Settings_Form):
    """Dialog collecting an IP address, gateway, prefix length and port."""

    def __init__(self):
        super().__init__()
        self.setupUi(self)

    def get_net_settings(self):
        """Return the entered settings as a dict, or ``None`` if any field is invalid."""
        addr_fields = (self.addr_in_0, self.addr_in_1, self.addr_in_2, self.addr_in_3)
        gateway_fields = (self.gateway_in_0, self.gateway_in_1, self.gateway_in_2, self.gateway_in_3)
        try:
            ip_addr = ".".join(field.text() for field in addr_fields)
            gateway_addr = ".".join(field.text() for field in gateway_fields)
            # inet_aton() raises OSError on a malformed address
            for address in (ip_addr, gateway_addr):
                socket.inet_aton(address)
            prefix_len = int(self.prefix_len_in.text())
            port = int(self.port_in.text())
        except (OSError, ValueError):
            return None
        return {
            "ip_addr": ip_addr,
            "gateway_addr": gateway_addr,
            "prefix_len": prefix_len,
            "port": port,
        }
class ConnSettingsForm(QtWidgets.QDialog, Ui_Conn_Settings_Form):
    """Dialog collecting the IP address and port to connect to."""

    def __init__(self):
        super().__init__()
        self.setupUi(self)

    def get_net_settings(self):
        """Return ``{"ip_addr": ..., "port": ...}``, or ``None`` if a field is invalid."""
        addr_fields = (self.addr_in_0, self.addr_in_1, self.addr_in_2, self.addr_in_3)
        try:
            ip_addr = ".".join(field.text() for field in addr_fields)
            # inet_aton() raises OSError on a malformed address
            socket.inet_aton(ip_addr)
            port = int(self.port_in.text())
        except (OSError, ValueError):
            return None
        return {
            "ip_addr": ip_addr,
            "port": port
        }
class MainWindow(QtWidgets.QMainWindow):
"""Main window of the Kirdy control panel."""
# The maximum number of sample points to store.
DEFAULT_MAX_SAMPLES = 1000
# Initial connection target; __init__ copies these into self.ip_addr / self.port.
DEFAULT_IP_ADDR = '192.168.1.128'
DEFAULT_PORT = 1337
# Parameter-tree spec of the laser diode status panel (children of the
# "Laser Diode Status" group created in __init__).
LASER_DIODE_STATUS = [
{'name': 'Status', 'title': 'Status: Power Off', 'expanded': True, 'type': 'group', 'children': [
{'name': 'Color', 'title': '', 'type': 'color', 'value': 'w', 'readonly': True, "compactHeight": False},
]}
]
# Parameter-tree spec of the laser diode panel (children of the
# "Laser Diode Parameters" group created in __init__).
# 'unit' is appended to the displayed title by __init__ and is used by
# setValuewithLock() to rescale incoming values via siConvert().
# 'target'/'action' presumably name the driver call issued by send_command()
# (defined outside this view) - verify there.
LASER_DIODE_PARAMETERS = [
{'name': 'Readings', 'expanded': True, 'type': 'group', 'children': [
{'name': 'LD Current Set', 'type': 'float', 'suffix': 'A', 'siPrefix': True, 'readonly': True, "compactHeight": False},
{'name': 'PD Current', 'type': 'float', 'suffix': 'A', 'siPrefix': True, 'readonly': True, "compactHeight": False},
{'name': 'PD Power', 'type': 'float', 'suffix': 'W', 'siPrefix': True, 'readonly': True, "compactHeight": False},
{'name': 'LF Mod Termination (50 Ohm)', 'type': 'list', 'limits': ['On', 'Off'], 'readonly': True, "compactHeight": False}
]},
{'name': 'Output Config', 'expanded': True, 'type': 'group', 'children': [
{'name': 'LD Current Set', 'type': 'float', 'value': 0, 'step': 0.001, 'decimals': 6, 'limits': (0, 300),
'unit': 'mA', 'lock': False, 'target': 'laser', 'action': 'set_i', "compactHeight": False},
{'name': 'LD Terminals Short', 'type': 'bool', 'value': False, 'lock': False, 'target': 'laser', 'action': 'set_ld_terms_short', "compactHeight": False},
{'name': 'Default Power On', 'type': 'bool', 'value': False, 'lock': False, 'target': 'laser', 'action': 'set_default_pwr_on', "compactHeight": False},
]},
{'name': 'Photodiode Monitor Config', 'expanded': False, 'type': 'group', 'children': [
{'name': 'LD Power Limit', 'type': 'float', 'value': 0, 'step': 0.001, 'decimals': 6, 'limits': (0, float("inf")),
'unit': 'mW', 'lock': False, 'target': 'laser', 'action': 'set_ld_pwr_limit', "compactHeight": False},
{'name': 'Responsitivity', 'type': 'float', 'value': 0, 'step': 0.001, 'decimals': 6, 'limits': (0, float("inf")),
'unit': 'mA/W', 'lock': False, 'target': 'laser', 'action': 'set_pd_mon_responsitivity', "compactHeight": False},
{'name': 'Dark Current', 'type': 'float', 'value': 0, 'step': 1, 'decimals': 6, 'limits': (0, float("inf")),
'unit': 'uA', 'lock': False, 'target': 'laser', 'action': 'set_pd_mon_dark_current', "compactHeight": False},
]},
]
# Parameter-tree spec of the thermostat status panel (children of the
# "Thermostat Status" group created in __init__).
THERMOSTAT_STATUS = [
{'name': 'Status', 'title': 'Status: Power Off', 'expanded': True, 'type': 'group', 'children': [
{'name': 'Color', 'title': '', 'type': 'color', 'value': 'w', 'readonly': True, "compactHeight": False},
]}
]
# Parameter-tree spec of the thermostat panel (children of the
# "Thermostat Parameters" group created in __init__).
# 'Control Method' uses the custom 'mutex' type: only the child matching the
# selected list entry is shown. The 'PID Auto Tune' group feeds the autotune
# handler wired up in _set_param_tree().
# 'target'/'action' and 'target_action_pair' are presumably consumed by
# send_command() (defined outside this view) - verify there.
THERMOSTAT_PARAMETERS = [
{'name': 'Readings', 'expanded': True, 'type': 'group', 'children': [
{'name': 'Temperature', 'type': 'float', 'format': '{value:.4f}', 'readonly': True, "compactHeight": False},
{'name': 'Current through TEC', 'type': 'float', 'suffix': 'A', 'siPrefix': True, 'decimals': 6, 'readonly': True, "compactHeight": False},
]},
{'name': 'Output Config', 'expanded': True, 'type': 'group', 'children': [
{'name': 'Control Method', 'type': 'mutex', 'limits': ['Constant Current', 'Temperature PID'],
'target_action_pair': [['thermostat', 'set_constant_current_control_mode'], ['thermostat', 'set_pid_control_mode']], 'children': [
{'name': 'Set Current', 'type': 'float', 'value': 0, 'step': 1, 'limits': (-1000, 1000), 'triggerOnShow': True, 'decimals': 6,
'unit': 'mA', 'lock': False, 'target': 'thermostat', 'action': 'set_tec_i_out', "compactHeight": False},
{'name': 'Set Temperature', 'type': 'float', 'value': 25, 'step': 0.0001, 'limits': (-273, 300), 'format': '{value:.4f}',
'unit': '', 'lock': False, 'target': 'thermostat', 'action': 'set_temperature_setpoint', "compactHeight": False},
]},
{'name': 'Limits', 'expanded': False, 'type': 'group', 'children': [
{'name': 'Max Cooling Current', 'type': 'float', 'value': 0, 'step': 1, 'decimals': 6, 'limits': (0, 1000),
'unit': 'mA', 'lock': False, 'target': 'thermostat', 'action': 'set_tec_max_cooling_i', "compactHeight": False},
{'name': 'Max Heating Current', 'type': 'float', 'value': 0, 'step': 1, 'decimals': 6, 'limits': (0, 1000),
'unit': 'mA', 'lock': False, 'target': 'thermostat', 'action': 'set_tec_max_heating_i', "compactHeight": False},
{'name': 'Max Voltage Difference', 'type': 'float', 'value': 0, 'step': 0.1, 'limits': (0, 4),
'unit': 'V', 'lock': False, 'target': 'thermostat', 'action': 'set_tec_max_v', "compactHeight": False},
]},
{'name': 'Default Power On', 'type': 'bool', 'value': False, 'lock': False, 'target': 'thermostat', 'action': 'set_default_pwr_on'},
]},
# TODO Temperature ADC Filter Settings
{'name': 'Temperature Monitor Config', 'expanded': False, 'type': 'group', 'children': [
{'name': 'Upper Limit', 'type': 'float', 'value': 0, 'step': 1, 'decimals': 6, 'limits': (-273, 300),
'unit': '', 'lock': False, 'target': 'thermostat', 'action': 'set_temp_mon_upper_limit', "compactHeight": False},
{'name': 'Lower Limit', 'type': 'float', 'value': 0, 'step': 1, 'decimals': 6, 'limits': (-273, 300),
'unit': '', 'lock': False, 'target': 'thermostat', 'action': 'set_temp_mon_lower_limit', "compactHeight": False},
]},
{'name': 'Thermistor Settings','expanded': False, 'type': 'group', 'children': [
{'name': 'T₀', 'type': 'float', 'value': 0, 'step': 1, 'decimals': 6, 'limits': (-273, 300),
'unit': '', 'lock': False, 'target': 'thermostat', 'action': 'set_sh_t0', "compactHeight": False},
{'name': 'R₀', 'type': 'float', 'value': 0, 'step': 1, 'decimals': 6,
'unit': '', 'lock': False, 'target': 'thermostat', 'action': 'set_sh_r0', "compactHeight": False},
{'name': 'B', 'type': 'float', 'value': 3950, 'step': 1, 'decimals': 4,
'unit': 'K', 'lock': False, 'target': 'thermostat', 'action': 'set_sh_beta', "compactHeight": False},
]},
{'name': 'PID Config', 'expanded': False, 'type': 'group', 'children': [
{'name': 'Kp', 'type': 'float', 'step': 0.1, 'decimals': 16, 'lock': False, 'target': 'thermostat', 'action': 'set_pid_kp', "compactHeight": False},
{'name': 'Ki', 'type': 'float', 'step': 0.1, 'decimals': 16, 'unit': 'Hz', 'lock': False, 'target': 'thermostat', 'action': 'set_pid_ki', "compactHeight": False},
{'name': 'Kd', 'type': 'float', 'step': 0.1, 'decimals': 16, 'unit': 's', 'lock': False, 'target': 'thermostat', 'action': 'set_pid_kd', "compactHeight": False},
{'name': "PID Output Clamping", 'expanded': True, 'type': 'group', 'children': [
{'name': 'Minimum', 'type': 'float', 'step': 1, 'limits': (-1000, 1000), 'decimals': 6,
'unit': 'mA', 'lock': False, 'target': 'thermostat', 'action': 'set_pid_output_min', "compactHeight": False},
{'name': 'Maximum', 'type': 'float', 'step': 1, 'limits': (-1000, 1000), 'decimals': 6,
'unit': 'mA', 'lock': False, 'target': 'thermostat', 'action': 'set_pid_output_max', "compactHeight": False},
]},
{'name': 'PID Auto Tune', 'expanded': False, 'type': 'group', 'children': [
{'name': 'Target Temperature', 'type': 'float', 'value': 20.0, 'step': 0.1, 'unit': '', 'format': '{value:.4f}', "compactHeight": False},
{'name': 'Test Current', 'type': 'float', 'value': 1000, 'decimals': 6, 'step': 100, 'limits': (-1000, 1000), 'unit': 'mA', "compactHeight": False},
{'name': 'Temperature Swing', 'type': 'float', 'value': 0.0, 'step': 0.0001, 'prefix': '±', 'unit': '', 'format': '{value:.4f}', "compactHeight": False},
{'name': 'Lookback', 'type': 'float', 'value': 5.0, 'step': 0.1, 'unit': 's', 'format': '{value:.4f}', "compactHeight": False},
{'name': 'Run', 'type': 'action', 'tip': 'Run'},
]},
]},
]
def __init__(self, args):
"""
Build the main window: load the UI and style sheet, create the dialogs,
parameter trees, Kirdy handler and graphs, and wire their signals.

`args` is accepted but not referenced anywhere in this method.
"""
super(MainWindow, self).__init__()
self.kirdy = Kirdy_Driver()
# The widgets referenced below (buttons, trees, graphs, menu actions) are
# attributes created by loading the .ui file.
ui_file_path = importlib.resources.files("ui").joinpath("kirdy_qt.ui")
uic.loadUi(ui_file_path, self)
self.info_box = QtWidgets.QMessageBox()
self.info_box.setIcon(QtWidgets.QMessageBox.Icon.Information)
# Load Global QT Style Sheet Settings
qss=os.path.join(os.path.dirname(__file__), "ui/mainwindow.qss")
with open(qss,"r") as fh:
self.setStyleSheet(fh.read())
self.ip_addr = self.DEFAULT_IP_ADDR
self.port = self.DEFAULT_PORT
self.conn_settings_form = ConnSettingsForm()
self.conn_settings_form.accepted.connect(self.start_connecting)
self.update_net_settings_form = UpdateNetSettingsForm()
self.update_net_settings_form.accepted.connect(self.update_net_settings)
self.max_samples = self.DEFAULT_MAX_SAMPLES
self.autotuner = PIDAutotune(25)
# These set-up methods only register closures; the closures look up
# self.kirdy_handler when they run, so it may be created further below.
self.setup_menu_bar()
self._set_up_ctrl_btns()
self._set_up_plot_menu()
# Setter patched onto every Parameter: skips parameters whose 'lock' option
# is truthy and rescales the value with siConvert() when a 'unit' is set.
def _setValuewithLock(self, value):
if not self.opts.get("lock", None):
if self.opts.get("unit", None) is not None:
self.setValue(siConvert(value, self.opts.get("unit", None)))
else:
self.setValue(value)
Parameter.setValuewithLock = _setValuewithLock
# Walks a parameter spec (list or dict) and appends " (<unit>)" to the title
# of every non-group entry that has a 'unit' key.
# NOTE(review): this mutates the class-level spec dicts in place, so building
# a second MainWindow would append the unit again - confirm this is acceptable.
def _add_unit_to_title(param_tree):
def _traverse(param):
if param["type"] == "group" or param["type"] == "mutex":
for param in param["children"]:
_add_unit_to_title(param)
else:
if "unit" in param.keys():
if not("title" in param.keys()):
param["title"] = param["name"]
param["title"] = param["title"] + " ({:})".format(param["unit"])
if isinstance(param_tree, list):
for param in param_tree:
_traverse(param)
else:
_traverse(param_tree)
_add_unit_to_title(self.LASER_DIODE_STATUS)
_add_unit_to_title(self.LASER_DIODE_PARAMETERS)
_add_unit_to_title(self.THERMOSTAT_STATUS)
_add_unit_to_title(self.THERMOSTAT_PARAMETERS)
# Index 0/1: laser diode status/parameters; index 2/3: thermostat status/parameters.
self.params = [
Parameter.create(name=f"Laser Diode Status", type='group', value=0, children=self.LASER_DIODE_STATUS),
Parameter.create(name=f"Laser Diode Parameters", type='group', value=1, children=self.LASER_DIODE_PARAMETERS),
Parameter.create(name=f"Thermostat Status", type='group', value=2, children=self.THERMOSTAT_STATUS),
Parameter.create(name=f"Thermostat Parameters", type='group', value=3, children=self.THERMOSTAT_PARAMETERS),
]
self._set_param_tree()
# Graphs.__init__ registers each graph under its title, so the titles are set first.
self.tec_i_graph.setTitle("TEC Current")
self.tec_temp_graph.setTitle("TEC Temperature")
self.ld_i_set_graph.setTitle("LD Current Set")
self.pd_mon_pwr_graph.setTitle("PD Mon Power")
self.connect_btn.clicked.connect(self.show_conn_settings_form)
# Handler polling interval: 1.0 s.
self.kirdy_handler = Kirdy(self, self.kirdy, 1.0)
self.kirdy_handler.setting_update_sig.connect(self.update_ld_ctrl_panel_settings)
self.kirdy_handler.setting_update_sig.connect(self.update_thermostat_ctrl_panel_settings)
self.kirdy_handler.report_update_sig.connect(self.update_ld_ctrl_panel_readings)
self.kirdy_handler.report_update_sig.connect(self.update_thermostat_ctrl_panel_readings)
self.graphs = Graphs(self.ld_i_set_graph, self.pd_mon_pwr_graph, self.tec_i_graph, self.tec_temp_graph, max_samples=self.max_samples)
self.kirdy_handler.report_update_sig.connect(self.graphs.plot_append)
self.loading_spinner.hide()
self.kirdy_handler.connected_sig.connect(self._on_connection_changed)
def setup_menu_bar(self):
    """Connect every menu-bar action to its handler."""

    def show_info(title, text):
        # Non-modal information box parented to the main window
        box = QtWidgets.QMessageBox(self)
        box.setWindowTitle(title)
        box.setText(text)
        box.setIcon(QtWidgets.QMessageBox.Icon.Information)
        box.show()

    @pyqtSlot(bool)
    def about_kirdy(_):
        # TODO: Replace the hardware revision placeholder
        QtWidgets.QMessageBox.about(
            self,
            "About Kirdy",
            '\n<h1>Sinara 1550 Kirdy v"major rev"."minor rev"</h1>\n',
        )

    @pyqtSlot(bool)
    def about_gui(_):
        # TODO: Replace the hardware revision placeholder
        QtWidgets.QMessageBox.about(
            self,
            "About Kirdy Control Panel",
            '\n<h1>Version: "Version"</h1>\n',
        )

    @pyqtSlot(bool)
    def dfu_mode(_):
        # Send the DFU request, then end the session
        self.kirdy.task_dispatcher(self.kirdy.device.dfu())
        self.kirdy_handler.end_session()

    @pyqtSlot(bool)
    def reset_kirdy(_):
        # Send the hard-reset request, then end the session
        self.kirdy.task_dispatcher(self.kirdy.device.hard_reset())
        self.kirdy_handler.end_session()

    @pyqtSlot(bool)
    def save_settings(_):
        self.kirdy.task_dispatcher(self.kirdy.device.save_current_settings_to_flash())
        show_info("Config saved", "Laser diode and thermostat configs have been saved into flash.")

    @pyqtSlot(bool)
    def load_settings(_):
        self.kirdy.task_dispatcher(self.kirdy.device.restore_settings_from_flash())
        show_info("Config loaded", "Laser Diode and Thermostat configs have been loaded from flash.")

    @pyqtSlot(bool)
    def show_update_net_settings_form(_):
        form = self.update_net_settings_form
        form.retranslateUi(form)
        form.show()

    handlers = (
        (self.menu_action_about_kirdy, about_kirdy),
        (self.menu_action_about_gui, about_gui),
        (self.menu_action_dfu_mode, dfu_mode),
        (self.menu_action_hard_reset, reset_kirdy),
        (self.menu_action_save, save_settings),
        (self.menu_action_load, load_settings),
        (self.menu_action_update_net_settings, show_update_net_settings_form),
    )
    for action, handler in handlers:
        action.triggered.connect(handler)
def show_conn_settings_form(self):
    """Pre-fill the connection dialog with the current address and port, then show it."""
    octets = self.ip_addr.split(".")
    form = self.conn_settings_form
    octet_inputs = (form.addr_in_0, form.addr_in_1, form.addr_in_2, form.addr_in_3)
    for index, field in enumerate(octet_inputs):
        field.setText(octets[index])
    form.port_in.setText(str(self.port))
    form.show()
def _set_up_ctrl_btns(self):
    """Wire the power on/off and clear-alarm buttons of both panels to the driver."""

    def dispatch_on_click(button, make_command):
        # The command is built at click time, then handed to the dispatcher
        @pyqtSlot(bool)
        def on_clicked(_):
            self.kirdy.task_dispatcher(make_command())
        button.clicked.connect(on_clicked)

    # Laser diode buttons
    dispatch_on_click(self.ld_pwr_on_btn, lambda: self.kirdy.laser.set_power_on(True))
    dispatch_on_click(self.ld_pwr_off_btn, lambda: self.kirdy.laser.set_power_on(False))
    dispatch_on_click(self.ld_clear_alarm_btn, lambda: self.kirdy.laser.clear_alarm())

    # Thermostat buttons
    dispatch_on_click(self.tec_pwr_on_btn, lambda: self.kirdy.thermostat.set_power_on(True))
    dispatch_on_click(self.tec_pwr_off_btn, lambda: self.kirdy.thermostat.set_power_on(False))
    dispatch_on_click(self.tec_clear_alarm_btn, lambda: self.kirdy.thermostat.clear_alarm())
def _set_up_plot_menu(self):
    """Build the "Plot Settings" menu: a clear action and a sample-limit spin box."""
    menu = QtWidgets.QMenu()
    self.plot_menu = menu
    menu.setTitle("Plot Settings")

    # "Clear graphs" entry
    clear_action = QtGui.QAction("Clear graphs", menu)
    clear_action.triggered.connect(self.clear_graphs)
    menu.addAction(clear_action)
    menu.clear = clear_action

    # Spin box selecting how many samples the graphs keep
    spinbox = QtWidgets.QSpinBox()
    self.samples_spinbox = spinbox
    spinbox.setRange(2, 100000)
    spinbox.setSuffix(' samples')
    spinbox.setValue(self.max_samples)
    spinbox.valueChanged.connect(self.set_max_samples)

    spinbox_action = QtWidgets.QWidgetAction(menu)
    spinbox_action.setDefaultWidget(spinbox)
    menu.addAction(spinbox_action)
    menu.limit_samples = spinbox_action

    self.plot_settings.setMenu(menu)
def _set_param_tree(self):
    """Attach the parameter groups to their tree widgets and wire the autotune button."""
    panels = (
        (self.ld_status, self.params[0], False),
        (self.ld_tree, self.params[1], True),
        (self.tec_status, self.params[2], False),
        (self.tec_tree, self.params[3], True),
    )
    for widget, group, sends_commands in panels:
        widget.setHeaderHidden(True)
        widget.setParameters(group, showTop=False)
        if sends_commands:
            # Only the editable trees forward changes to the device
            group.sigTreeStateChanged.connect(self.send_command)

    relay_states = (
        PIDAutotuneState.STATE_READY,
        PIDAutotuneState.STATE_RELAY_STEP_UP,
        PIDAutotuneState.STATE_RELAY_STEP_DOWN,
    )

    @asyncSlot()
    async def autotune(param):
        # Toggles autotuning: starts it when off, aborts it when in progress
        state = self.autotuner.state()
        if state == PIDAutotuneState.STATE_OFF:
            settings = await self.kirdy.device.get_settings_summary()
            target_temp = param.parent().child('Target Temperature').value()
            # The tree holds mA; the value is divided by 1000 for the autotuner
            test_current = param.parent().child('Test Current').value() / 1000
            temp_swing = param.parent().child('Temperature Swing').value()
            sample_period = 1.0 / settings['thermostat']['temp_adc_settings']['rate']
            lookback = param.parent().child('Lookback').value()
            self.autotuner.setParam(target_temp, test_current, temp_swing, sample_period, lookback)
            self.autotuner.setReady()
            param.setOpts(title="Stop")
            self.kirdy.task_dispatcher(self.kirdy.thermostat.set_constant_current_control_mode())
            self.kirdy_handler.report_update_sig.connect(self.autotune_tick)
            self.loading_spinner.show()
            self.loading_spinner.start()
            self.background_task_lbl.setText("Autotuning")
        elif state in relay_states:
            self.autotuner.setOff()
            param.setOpts(title="Run")
            self.kirdy.task_dispatcher(self.kirdy.thermostat.set_tec_i_out(0.0))
            self.kirdy_handler.report_update_sig.disconnect(self.autotune_tick)
            self.background_task_lbl.setText("Ready.")
            self.loading_spinner.stop()
            self.loading_spinner.hide()

    run_action = self.params[3].child('PID Config', 'PID Auto Tune', 'Run')
    run_action.sigActivated.connect(autotune)
@pyqtSlot(dict)
def autotune_tick(self, report):
    """Advance the PID autotuner with one report and handle its success or failure."""

    def wind_down(window_title, message):
        # Stop feeding reports, reset the status widgets and notify the user
        self.kirdy_handler.report_update_sig.disconnect(self.autotune_tick)
        self.background_task_lbl.setText("Ready.")
        self.loading_spinner.stop()
        self.loading_spinner.hide()
        self.info_box.setWindowTitle(window_title)
        self.info_box.setText(message)
        self.info_box.show()

    def reset_run_button():
        self.params[3].child('PID Config', 'PID Auto Tune', 'Run').setOpts(title="Run")

    relay_states = (
        PIDAutotuneState.STATE_READY,
        PIDAutotuneState.STATE_RELAY_STEP_UP,
        PIDAutotuneState.STATE_RELAY_STEP_DOWN,
    )
    state = self.autotuner.state()

    if state in relay_states:
        # Report timestamps are divided by 1000 before reaching the autotuner
        self.autotuner.run(report['thermostat']['temperature'], report['ts']/1000)
        self.kirdy.task_dispatcher(self.kirdy.thermostat.set_tec_i_out(self.autotuner.output()))
    elif state == PIDAutotuneState.STATE_SUCCEEDED:
        kp, ki, kd = self.autotuner.get_tec_pid()
        self.autotuner.setOff()
        reset_run_button()
        self.kirdy.task_dispatcher(self.kirdy.thermostat.set_pid_kp(kp))
        self.kirdy.task_dispatcher(self.kirdy.thermostat.set_pid_ki(ki))
        self.kirdy.task_dispatcher(self.kirdy.thermostat.set_pid_kd(kd))
        self.kirdy.task_dispatcher(self.kirdy.thermostat.set_pid_control_mode())
        target_temp = self.params[3].child('PID Config', 'PID Auto Tune', 'Target Temperature').value()
        self.kirdy.task_dispatcher(self.kirdy.thermostat.set_temperature_setpoint(target_temp))
        wind_down("PID AutoTune Success", "PID Config has been loaded to Thermostat.\nRegulating temperature.")
    elif state == PIDAutotuneState.STATE_FAILED:
        self.autotuner.setOff()
        reset_run_button()
        self.kirdy.task_dispatcher(self.kirdy.thermostat.set_tec_i_out(0.0))
        wind_down("PID Autotune Failed", "PID Autotune is failed.")
@pyqtSlot(bool)
def _on_connection_changed(self, result):
# Slot for the handler's connected_sig: enables or disables the control
# panels, menu actions and graphs according to `result`, then re-targets the
# connect button and updates the status label.
def ctrl_panel_setEnable(result):
self.ld_status.setEnabled(result)
self.ld_tree.setEnabled(result)
self.ld_pwr_on_btn.setEnabled(result)
self.ld_pwr_off_btn.setEnabled(result)
self.ld_clear_alarm_btn.setEnabled(result)
self.tec_status.setEnabled(result)
self.tec_tree.setEnabled(result)
self.tec_pwr_on_btn.setEnabled(result)
self.tec_pwr_off_btn.setEnabled(result)
self.tec_clear_alarm_btn.setEnabled(result)
ctrl_panel_setEnable(result)
def menu_bar_setEnable(result):
self.menu_action_about_kirdy.setEnabled(result)
self.menu_action_connect.setEnabled(result)
self.menu_action_dfu_mode.setEnabled(result)
self.menu_action_disconnect.setEnabled(result)
self.menu_action_hard_reset.setEnabled(result)
self.menu_action_save.setEnabled(result)
self.menu_action_load.setEnabled(result)
self.menu_action_update_net_settings.setEnabled(result)
menu_bar_setEnable(result)
def graph_group_setEnable(result):
self.ld_i_set_graph.setEnabled(result)
self.pd_mon_pwr_graph.setEnabled(result)
self.tec_i_graph.setEnabled(result)
self.tec_temp_graph.setEnabled(result)
graph_group_setEnable(result)
self.report_group.setEnabled(result)
# TODO: Use QStateMachine to manage connections
# Drop every existing click handler before attaching the one for the new state.
self.connect_btn.clicked.disconnect()
if result:
self.connect_btn.setText("Disconnect")
self.connect_btn.clicked.connect(self.kirdy_handler.end_session)
# TODO: self.hw_rev_data = self.kirdy.hw_rev()
self._status()
else:
# Not connected: either the handler is still trying to reconnect, or the
# session is over.
if self.kirdy_handler.connecting():
self.status_lbl.setText(f"Connection is dropped. Reconnecting to {self.ip_addr}:{self.port}.")
self.connect_btn.setText("Stop")
else:
self.connect_btn.setText("Connect")
self.connect_btn.clicked.connect(self.show_conn_settings_form)
self.clear_graphs()
self.status_lbl.setText(f"Disconnected from {self.ip_addr}:{self.port}.")
# NOTE(review): which branch this final connect() belongs to determines
# whether "Connect" also triggers end_session - confirm it is only meant for
# the reconnecting ("Stop") state.
self.connect_btn.clicked.connect(self.kirdy_handler.end_session)
def _status(self):
    """Write the "Connected to <ip>:<port>" message into the status label."""
    # TODO: Get rev no from Kirdy and then add revision into the text
    message = f"Connected to {self.ip_addr}:{self.port}"
    self.status_lbl.setText(message)
def clear_graphs(self):
    """Ask the graphs object to clear its data points."""
    graphs = self.graphs
    graphs.clear_data_pts()
@pyqtSlot(dict)
def graphs_update(self, report):
    """Hand a report over to the graphs object via plot_append()."""
    graphs = self.graphs
    graphs.plot_append(report)
@pyqtSlot(dict)
def update_ld_ctrl_panel_settings(self, settings):
    """Refresh the laser diode settings shown in the control panel.

    Takes the 'laser' entry of ``settings`` and writes its values into the
    parameter tree at ``self.params[1]`` while a QSignalBlocker is active on
    that tree. Any exception raised on the way is logged with its traceback
    and not propagated.
    """
    try:
        settings = settings['laser']
        ld_tree = self.params[1]

        def set_locked(path, value):
            # Resolve the child at `path` and store `value` through setValuewithLock().
            ld_tree.child(*path).setValuewithLock(value)

        with QSignalBlocker(ld_tree):
            set_locked(('Output Config', 'LD Current Set'), settings["ld_drive_current"]['value'])
            set_locked(('Output Config', 'LD Terminals Short'), settings["ld_terms_short"])
            set_locked(('Output Config', 'Default Power On'), settings["default_pwr_on"])
            set_locked(('Photodiode Monitor Config', 'LD Power Limit'), settings["ld_pwr_limit"])

            # A missing responsitivity is displayed as 0.
            responsitivity = settings["pd_mon_params"]["responsitivity"]
            set_locked(
                ('Photodiode Monitor Config', 'Responsitivity'),
                0 if responsitivity is None else responsitivity,
            )
            set_locked(('Photodiode Monitor Config', 'Dark Current'), settings["pd_mon_params"]["i_dark"])
    except Exception:
        logging.error(f"Params tree cannot be updated. Data:{settings}", exc_info=True)
@pyqtSlot(dict)
def update_ld_ctrl_panel_readings(self, report):
    """Refresh the laser diode status and readings in the control panel.

    Takes the 'laser' entry of ``report``. The status tree at
    ``self.params[0]`` gets a colour and title reflecting the alarm / power
    state, and the 'Readings' group of ``self.params[1]`` gets the reported
    values. Each tree is updated under its own QSignalBlocker. Any exception
    is logged with its traceback and not propagated.
    """
    try:
        report = report['laser']

        status_tree = self.params[0]
        with QSignalBlocker(status_tree):
            # The over-power alarm takes precedence over the power state.
            if report['pwr_excursion']:
                color, title = 'r', 'Status: OverPower Alarm'
            elif report['pwr_on']:
                color, title = 'g', 'Status: Power On'
            else:
                color, title = 'w', 'Status: Power Off'
            status_tree.child('Status', 'Color').setValuewithLock(color)
            status_tree.child('Status').setOpts(title=title)

        readings_tree = self.params[1]

        def set_reading(name, value):
            # Store `value` in the child `name` of the 'Readings' group.
            readings_tree.child('Readings', name).setValuewithLock(value)

        with QSignalBlocker(readings_tree):
            set_reading('LD Current Set', report["ld_i_set"])
            set_reading('PD Current', report["pd_i"])
            # A missing photodiode power is displayed as 0.
            pd_pwr = report["pd_pwr"]
            set_reading('PD Power', 0 if pd_pwr is None else pd_pwr)
            set_reading('LF Mod Termination (50 Ohm)', report["term_50ohm"])
    except Exception:
        logging.error(f"Params tree cannot be updated. Data:{report}", exc_info=True)
@pyqtSlot(dict)
def update_thermostat_ctrl_panel_settings(self, settings):
    """Refresh the thermostat settings shown in the control panel.

    Takes the 'thermostat' entry of ``settings`` and writes its values into
    the parameter tree at ``self.params[3]`` while a QSignalBlocker is active
    on that tree, then updates the temperature setpoint line of the graphs.
    Any exception is logged with its traceback and not propagated.
    """
    try:
        settings = settings['thermostat']
        tec_tree = self.params[3]

        def set_locked(path, value):
            # Resolve the child at `path` and store `value` through setValuewithLock().
            tec_tree.child(*path).setValuewithLock(value)

        with QSignalBlocker(tec_tree):
            control_method = "Temperature PID" if settings["pid_engaged"] else "Constant Current"
            set_locked(('Output Config', 'Control Method'), control_method)

            tec = settings["tec_settings"]
            set_locked(('Output Config', 'Control Method', 'Set Current'), tec['i_set']['value'])
            set_locked(('Output Config', 'Control Method', 'Set Temperature'), float(settings["temperature_setpoint"]))
            set_locked(('Output Config', 'Limits', 'Max Cooling Current'), tec['max_i_pos']['value'])
            set_locked(('Output Config', 'Limits', 'Max Heating Current'), tec['max_i_neg']['value'])
            set_locked(('Output Config', 'Limits', 'Max Voltage Difference'), tec['max_v']['value'])
            set_locked(('Output Config', 'Default Power On'), settings["default_pwr_on"])

            # TODO: Update the Temperature ADC Settings here as well
            temp_mon = settings["temp_mon_settings"]
            set_locked(('Temperature Monitor Config', 'Upper Limit'), temp_mon['upper_limit'])
            set_locked(('Temperature Monitor Config', 'Lower Limit'), temp_mon['lower_limit'])

            pid = settings["pid_params"]
            set_locked(('PID Config', 'Kp'), pid['kp'])
            set_locked(('PID Config', 'Ki'), pid['ki'])
            set_locked(('PID Config', 'Kd'), pid['kd'])
            set_locked(('PID Config', 'PID Output Clamping', 'Minimum'), pid['output_min'])
            set_locked(('PID Config', 'PID Output Clamping', 'Maximum'), pid['output_max'])

            thermistor = settings["thermistor_params"]
            set_locked(('Thermistor Settings', 'T₀'), thermistor['t0'])
            set_locked(('Thermistor Settings', 'R₀'), thermistor['r0'])
            set_locked(('Thermistor Settings', 'B'), thermistor['b'])

            # The setpoint line shows the rounded setpoint and is visible only
            # while the PID is engaged.
            setpoint = round(settings["temperature_setpoint"], 4)
            self.graphs.set_temp_setpoint_line(temp=setpoint)
            self.graphs.set_temp_setpoint_line(visible=settings['pid_engaged'])
    except Exception:
        logging.error(f"Params tree cannot be updated. Data:{settings}", exc_info=True)
@pyqtSlot(dict)
def update_thermostat_ctrl_panel_readings(self, report):
    """Refresh the thermostat status and readings in the control panel.

    Takes the 'thermostat' entry of ``report``. The status tree at
    ``self.params[2]`` gets a colour and title reflecting the over-temperature
    alarm / power state, and the 'Readings' group of ``self.params[3]`` gets
    the reported temperature and TEC current. Each tree is updated under its
    own QSignalBlocker. Any exception is logged with its traceback and not
    propagated.
    """
    try:
        report = report['thermostat']
        with QSignalBlocker(self.params[2]):
            if report['temp_mon_status']['over_temp_alarm']:
                self.params[2].child('Status', 'Color').setValuewithLock('r')
                self.params[2].child('Status').setOpts(title='Status: OverTemperature Alarm')
            else:
                self.params[2].child('Status', 'Color').setValuewithLock('g' if report['pwr_on'] else 'w')
                self.params[2].child('Status').setOpts(title='Status: Power On' if report['pwr_on'] else 'Status: Power Off')
        with QSignalBlocker(self.params[3]):
            # Identity check against the None singleton (was `== None`),
            # matching the `is not None` checks used for the laser readings.
            if report["temperature"] is None:
                # A missing temperature is displayed as -273.15.
                self.params[3].child('Readings', 'Temperature').setValuewithLock(-273.15)
            else:
                self.params[3].child('Readings', 'Temperature').setValuewithLock(report["temperature"])
            self.params[3].child('Readings', 'Current through TEC').setValuewithLock(report["tec_i"])
    except Exception:
        logging.error(f"Params tree cannot be updated. Data:{report}", exc_info=True)
@pyqtSlot(int)
def set_max_samples(self, samples: int) -> None:
    """Pass the requested sample count on to the graphs object."""
    graphs = self.graphs
    graphs.set_max_samples(samples)
@pyqtSlot()
def update_net_settings(self):
    """Send the network settings from the update form to the device.

    When the form returns ``None`` the status label reports invalid input and
    nothing is sent. Otherwise the result of ``device.set_ip_settings(...)``
    is handed to ``self.kirdy.task_dispatcher`` and the status label reports
    the update.
    """
    form_values = self.update_net_settings_form.get_net_settings()
    if form_values is not None:
        addr, port, prefix_len, gateway = (
            form_values[key]
            for key in ("ip_addr", "port", "prefix_len", "gateway_addr")
        )
        self.kirdy.task_dispatcher(self.kirdy.device.set_ip_settings(addr, port, prefix_len, gateway))
        message = "IP Settings is Updated"
    else:
        message = "Invalid IP Settings Input"
    self.status_lbl.setText(message)
@pyqtSlot()
def start_connecting(self):
# Slot that reads the target address from the connection settings form and
# starts a session through self.kirdy_handler.
#
# NOTE(review): this view of the file carries no indentation, so it is not
# visible which of the statements after the `if not (...)` line belong to its
# body. Confirm against the original source before restructuring.
net_settings = self.conn_settings_form.get_net_settings()
if net_settings is None:
# The form returned no settings: report it and leave everything else untouched.
self.status_lbl.setText("Invalid IP Settings Input")
return
# Keep the address on the instance; other methods use self.ip_addr and
# self.port in their status messages.
self.ip_addr = net_settings["ip_addr"]
self.port = net_settings["port"]
host = self.ip_addr
port = self.port
# Only start a new session when the handler is neither connecting nor connected.
if not (self.kirdy_handler.connecting() or self.kirdy_handler.connected()):
self.status_lbl.setText("Connecting...")
self.kirdy_handler.start_session(host=host, port=port)
# Turn the button into a "Stop" button that ends the session.
self.connect_btn.setText("Stop")
self.connect_btn.clicked.disconnect()
self.connect_btn.clicked.connect(self.kirdy_handler.end_session)
@pyqtSlot(object, object)
def send_command(self, param, changes):
    """Translate parameter-tree value changes into commands on ``self.kirdy``.

    ``changes`` is iterated as ``(inner_param, change, data)`` triples; only
    entries whose ``change`` equals ``'value'`` are handled. The command is
    looked up on ``self.kirdy`` from the parameter's options and passed to
    ``self.kirdy.task_dispatcher``. The parameter's ``lock`` option is set to
    True before dispatching and back to False afterwards.
    """
    for inner_param, change, data in changes:
        if change != 'value':
            continue
        opts = inner_param.opts

        # cmd translation from mutex type parameter: the position of the new
        # value within 'limits' selects the (target, action) pair to call.
        if opts.get('target_action_pair') is not None:
            target, action = opts['target_action_pair'][opts['limits'].index(data)]
            cmd = getattr(getattr(self.kirdy, target), action)
            param.child(*param.childPath(inner_param)).setOpts(lock=True)
            self.kirdy.task_dispatcher(cmd())
            param.child(*param.childPath(inner_param)).setOpts(lock=False)
            continue

        # cmd translation from non-mutex type parameter: both "target" and
        # "action" must be present, otherwise nothing is sent.
        if opts.get("target") is None or opts.get("action") is None:
            continue

        unit = opts.get("unit")
        if unit is not None:
            # Re-evaluate the value together with its unit via siParse/siEval.
            text = str(data) + unit
            _, _, suffix = siParse(text, regex=FLOAT_REGEX)
            data = siEval(text, regex=FLOAT_REGEX, suffix=suffix)

        cmd = getattr(getattr(self.kirdy, opts["target"]), opts["action"])
        param.child(*param.childPath(inner_param)).setOpts(lock=True)
        self.kirdy.task_dispatcher(cmd(data))
        param.child(*param.childPath(inner_param)).setOpts(lock=False)
async def coro_main():
    """Show the main window and wait until the application is about to quit.

    Parses the command line, configures logging when a log level was given,
    creates and shows ``MainWindow``, then waits on an event that is set by
    the application's ``aboutToQuit`` signal.
    """
    args = get_argparser().parse_args()
    level_name = args.logLevel
    if level_name:
        logging.basicConfig(level=getattr(logging, level_name))

    quit_requested = asyncio.Event()
    qt_app = QtWidgets.QApplication.instance()
    qt_app.aboutToQuit.connect(quit_requested.set)

    window = MainWindow(args)
    window.show()

    await quit_requested.wait()
def main():
    """Run coro_main() through qasync.run()."""
    entry_point = coro_main()
    qasync.run(entry_point)
# Start the GUI only when this file is executed as a script.
if "__main__" == __name__:
    main()