1193 lines
61 KiB
Python
1193 lines
61 KiB
Python
from PyQt6 import QtWidgets, QtGui, QtCore, uic
|
|
from PyQt6.QtCore import pyqtSignal, QObject, QSignalBlocker, pyqtSlot
|
|
import pyqtgraph.parametertree.parameterTypes as pTypes
|
|
from pyqtgraph.parametertree import Parameter, ParameterTree, ParameterItem, registerParameterType
|
|
import pyqtgraph as pg
|
|
pg.setConfigOptions(antialias=True)
|
|
from pyqtgraph import mkPen
|
|
from pyqtgraph.functions import siEval, siParse, SI_PREFIX_EXPONENTS, SI_PREFIXES
|
|
from pglive.sources.live_axis_range import LiveAxisRange
|
|
from pglive.sources.data_connector import DataConnector
|
|
from pglive.kwargs import Axis, LeadingLine
|
|
from pglive.sources.live_plot import LiveLinePlot
|
|
from pglive.sources.live_plot_widget import LivePlotWidget
|
|
from pglive.sources.live_axis import LiveAxis
|
|
import re
|
|
import sys
|
|
import os
|
|
import argparse
|
|
import logging
|
|
import asyncio
|
|
from driver.kirdy import FilterConfig, Kirdy as Kirdy_Driver
|
|
import qasync
|
|
from qasync import asyncClose, asyncSlot
|
|
from collections import deque
|
|
from datetime import datetime, timezone, timedelta
|
|
from time import time
|
|
from typing import Any, Optional, List
|
|
from ui.ui_conn_settings_form import Ui_Conn_Settings_Form
|
|
from ui.ui_config_pd_mon_form import Ui_Cfg_Pd_Mon_Form
|
|
from ui.ui_update_network_settings_form import Ui_Update_Network_Settings_Form
|
|
from ui.ui_config_adc_filter_form import Ui_Cfg_Adc_Filter_Form
|
|
from dateutil import tz
|
|
import math
|
|
import socket
|
|
from pid_autotune import PIDAutotune, PIDAutotuneState
|
|
import importlib.resources
|
|
|
|
COMMON_ERROR_MSG = "Connection Timeout. Disconnecting."
|
|
FLOAT_REGEX = re.compile(r'(?P<number>[+-]?((((\d+(\.\d*)?)|(\d*\.\d+))([eE][+-]?\d+)?)|((?i:nan)|(inf))))\s*((?P<siPrefix>[u' + SI_PREFIXES + r']?)(?P<suffix>[\w°℃].*))?$')
|
|
|
|
def get_argparser():
|
|
parser = argparse.ArgumentParser(description="ARTIQ master")
|
|
|
|
parser.add_argument("--log-file", default="", nargs="?",
|
|
help="Store logs into a file; Set the base filename")
|
|
parser.add_argument("-l", "--log", dest="logLevel", choices=['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'],
|
|
help="Set the logging level")
|
|
|
|
return parser
|
|
|
|
def siConvert(val, suffix, typ=float):
|
|
"""
|
|
Convert a value written in SI notation according to given Si Scale.
|
|
|
|
Example::
|
|
|
|
siConvert(0.1, "mA") # returns 100
|
|
"""
|
|
if val is None:
|
|
val = 0.0
|
|
|
|
val, siprefix, suffix = siParse(str(val)+suffix, FLOAT_REGEX)
|
|
v = typ(val)
|
|
|
|
n = -SI_PREFIX_EXPONENTS[siprefix] if siprefix != '' else 0
|
|
if n > 0:
|
|
return v * 10**n
|
|
elif n < 0:
|
|
# this case makes it possible to use Decimal objects here
|
|
return v / 10**-n
|
|
else:
|
|
return v
|
|
|
|
class Kirdy(QObject):
|
|
connected_sig = pyqtSignal(bool)
|
|
setting_update_sig = pyqtSignal(dict)
|
|
report_update_sig = pyqtSignal(dict)
|
|
cmd_fail_sig = pyqtSignal(str)
|
|
|
|
def __init__(self, parent, kirdy, _poll_interval):
|
|
super().__init__(parent)
|
|
self._poll_interval = _poll_interval
|
|
self._default_poll_interval = _poll_interval
|
|
self._kirdy = kirdy
|
|
self._kirdy.set_connected_sig(self.connected_sig)
|
|
self.connected_sig.connect(self.start_polling)
|
|
self._noti_info_box = QtWidgets.QMessageBox()
|
|
self._noti_info_box.setIcon(QtWidgets.QMessageBox.Icon.Information)
|
|
|
|
self._kirdy.set_err_msg_sig(self.cmd_fail_sig)
|
|
self._poll_report_timer = QtCore.QTimer()
|
|
self._poll_report_timer.timeout.connect(self.polling_event)
|
|
|
|
self.poll_settings_timer = QtCore.QTimer()
|
|
self.poll_settings_timer.setInterval(100)
|
|
self.poll_settings_timer.timeout.connect(self.polling_settings_event)
|
|
|
|
|
|
def connected(self):
|
|
return self._kirdy.connected()
|
|
|
|
def connecting(self):
|
|
return self._kirdy.connecting()
|
|
|
|
def start_session(self, host, port):
|
|
self._kirdy.start_session(host=host, port=port)
|
|
|
|
def end_session(self):
|
|
if self._poll_report_timer.isActive():
|
|
self._poll_report_timer.stop()
|
|
asyncio.get_running_loop().create_task(self._kirdy.end_session())
|
|
|
|
@pyqtSlot()
|
|
def polling_settings_event(self):
|
|
self._kirdy.task_dispatcher(self._kirdy.device.get_settings_summary(sig=self.setting_update_sig))
|
|
|
|
@pyqtSlot()
|
|
def polling_event(self):
|
|
success = True
|
|
success &= self._kirdy.task_dispatcher(self._kirdy.device.get_status_report(sig=self.report_update_sig))
|
|
if not(success):
|
|
self._noti_info_box.setWindowTitle("Polling rate is too high")
|
|
self._noti_info_box.setText(f"Kirdy cannot handle {1/(self._poll_interval)} Hz polling rate. Reset to default polling rate ({1/self._default_poll_interval} Hz)")
|
|
self._noti_info_box.show()
|
|
|
|
self.set_update_s(self._default_poll_interval)
|
|
|
|
@pyqtSlot(bool)
|
|
def start_polling(self, start):
|
|
if start:
|
|
if not(self._poll_report_timer.isActive()):
|
|
self._poll_report_timer.setInterval(int(self._poll_interval*1000))
|
|
self._poll_report_timer.start()
|
|
self.poll_settings_timer.start()
|
|
else:
|
|
logging.debug("Kirdy Polling Timer has been started already.")
|
|
else:
|
|
self._poll_report_timer.stop()
|
|
self.poll_settings_timer.stop()
|
|
|
|
@pyqtSlot(float)
|
|
def set_update_s(self, interval):
|
|
self._poll_interval = interval
|
|
self.update_polling_rate()
|
|
|
|
def update_polling_rate(self):
|
|
if self._poll_report_timer.isActive():
|
|
self._poll_report_timer.stop()
|
|
self.start_polling(True)
|
|
else:
|
|
logging.debug("Attempt to update polling timer when it is stopped")
|
|
|
|
def get_hw_rev(self):
|
|
return self._kirdy.get_hw_rev()
|
|
|
|
class Graphs:
|
|
def __init__(self, ld_i_set_graph, pd_mon_pwr_graph, tec_i_graph, tec_temp_graph, max_samples=1000):
|
|
self.graphs = [ld_i_set_graph, pd_mon_pwr_graph, tec_i_graph, tec_temp_graph]
|
|
self.connectors = []
|
|
|
|
self._pd_mon_pwr_plot = LiveLinePlot(pen=pg.mkPen('r'))
|
|
self._ld_i_set_plot = LiveLinePlot(name="Set", pen=pg.mkPen('r'))
|
|
self._tec_temp_plot = LiveLinePlot(pen=pg.mkPen('r'))
|
|
self._tec_setpoint_plot = LiveLinePlot(pen=pg.mkPen('r'))
|
|
self._tec_i_target_plot = LiveLinePlot(name="Target", pen=pg.mkPen('r'))
|
|
self._tec_i_measure_plot = LiveLinePlot(name="Measure", pen=pg.mkPen('g'))
|
|
|
|
self._temp_setpoint_line = tec_temp_graph.getPlotItem().addLine(label='{value} ℃', pen=pg.mkPen('g'))
|
|
# Render the temperature setpoint line on top of the temperature being plotted
|
|
self._temp_setpoint_line.setZValue(10)
|
|
self._temp_setpoint_line.setVisible(False)
|
|
|
|
def tickStrings(values: List, scale: float, spacing: float) -> List:
|
|
return [datetime.fromtimestamp(value/1000, tz=timezone.utc).strftime("%H:%M:%S") for value in values]
|
|
|
|
for graph in ld_i_set_graph, pd_mon_pwr_graph, tec_i_graph, tec_temp_graph:
|
|
time_axis = LiveAxis('bottom', text="Time since Kirdy Reset (Hr:Min:Sec)", tick_angle=-45, units="")
|
|
# Display the relative ts in custom %H:%M:%S format without local timezone
|
|
time_axis.tickStrings = tickStrings
|
|
# Prevent scaling prefix being added to the back fo axis label
|
|
time_axis.autoSIPrefix = False
|
|
time_axis.showLabel()
|
|
|
|
graph.setAxisItems({'bottom': time_axis})
|
|
graph.add_crosshair(pg.mkPen(color='red', width=1), {'color': 'green'})
|
|
|
|
#TODO: x_range should not be updated on every tick
|
|
graph.x_range_controller = LiveAxisRange(roll_on_tick=17, offset_left=4900)
|
|
graph.x_range_controller.crop_left_offset_to_data = True
|
|
# Enable linking of axes in the graph widget's context menu
|
|
graph.register(graph.getPlotItem().titleLabel.text) # Slight hack getting the title
|
|
|
|
self.max_samples = max_samples
|
|
ld_i_set_axis = LiveAxis('left', text="Current", units="A")
|
|
ld_i_set_axis.showLabel()
|
|
ld_i_set_graph.setAxisItems({'left': ld_i_set_axis})
|
|
ld_i_set_graph.addItem(self._ld_i_set_plot)
|
|
ld_i_set_graph.y_range_controller = LiveAxisRange(fixed_range=[0.0, 0.4])
|
|
self.ld_i_set_connector = DataConnector(self._ld_i_set_plot, max_points=self.max_samples)
|
|
self.connectors += [self.ld_i_set_connector]
|
|
|
|
pd_mon_pwr_axis = LiveAxis('left', text="Power", units="W")
|
|
pd_mon_pwr_axis.showLabel()
|
|
pd_mon_pwr_graph.setAxisItems({'left': pd_mon_pwr_axis})
|
|
pd_mon_pwr_graph.addItem(self._pd_mon_pwr_plot)
|
|
self.pd_mon_pwr_connector = DataConnector(self._pd_mon_pwr_plot, max_points=self.max_samples)
|
|
self.connectors += [self.pd_mon_pwr_connector]
|
|
|
|
tec_temp_axis = LiveAxis('left', text="Temperature", units="℃")
|
|
tec_temp_axis.showLabel()
|
|
tec_temp_graph.setAxisItems({'left': tec_temp_axis})
|
|
tec_temp_graph.addItem(self._tec_setpoint_plot)
|
|
tec_temp_graph.addItem(self._tec_temp_plot)
|
|
self.tec_setpoint_connector = DataConnector(self._tec_setpoint_plot, max_points=1)
|
|
self.tec_temp_connector = DataConnector(self._tec_temp_plot, max_points=self.max_samples)
|
|
self.connectors += [self.tec_temp_connector, self.tec_setpoint_connector]
|
|
|
|
tec_i_axis = LiveAxis('left', text="Current", units="A")
|
|
tec_i_axis.showLabel()
|
|
tec_i_graph.setAxisItems({'left': tec_i_axis})
|
|
tec_i_graph.addLegend(brush=(50, 50, 200, 150))
|
|
tec_i_graph.addItem(self._tec_i_target_plot)
|
|
tec_i_graph.addItem(self._tec_i_measure_plot)
|
|
self.tec_i_target_connector = DataConnector(self._tec_i_target_plot, max_points=self.max_samples)
|
|
self.tec_i_measure_connector = DataConnector(self._tec_i_measure_plot, max_points=self.max_samples)
|
|
self.connectors += [self.tec_i_target_connector, self.tec_i_measure_connector]
|
|
|
|
def set_max_samples(self, max_samples):
|
|
self.max_samples = max_samples
|
|
for connector in self.connectors:
|
|
with connector.data_lock:
|
|
connector.max_points = self.max_samples
|
|
connector.x = deque(maxlen=int(connector.max_points))
|
|
connector.y = deque(maxlen=int(connector.max_points))
|
|
|
|
def plot_append(self, report):
|
|
try:
|
|
ld_i_set = report['laser']['ld_i_set']
|
|
pd_pwr = report['laser']['pd_pwr']
|
|
|
|
tec_i_set = report['thermostat']['i_set']
|
|
tec_i_measure = report['thermostat']['tec_i']
|
|
tec_temp = report['thermostat']['temperature']
|
|
|
|
ts = report['ts']
|
|
|
|
self.ld_i_set_connector.cb_append_data_point(ld_i_set, ts)
|
|
if pd_pwr is not None:
|
|
self._pd_mon_pwr_plot.show()
|
|
self.pd_mon_pwr_connector.cb_append_data_point(pd_pwr, ts)
|
|
else:
|
|
self._pd_mon_pwr_plot.hide()
|
|
self.pd_mon_pwr_connector.cb_append_data_point(0.0, ts)
|
|
|
|
if tec_temp is None:
|
|
self._tec_temp_plot.hide()
|
|
tec_temp = -273.15
|
|
else:
|
|
self._tec_temp_plot.show()
|
|
|
|
self.tec_temp_connector.cb_append_data_point(tec_temp, ts)
|
|
if self._temp_setpoint_line.isVisible():
|
|
self.tec_setpoint_connector.cb_append_data_point(self._temp_setpoint_line.value(), ts)
|
|
else:
|
|
self.tec_setpoint_connector.cb_append_data_point(tec_temp, ts)
|
|
if tec_i_measure is not None:
|
|
self.tec_i_measure_connector.cb_append_data_point(tec_i_measure, ts)
|
|
self.tec_i_target_connector.cb_append_data_point(tec_i_set, ts)
|
|
|
|
except Exception as e:
|
|
logging.error(f"Graph Value cannot be updated. Data:{report}", exc_info=True)
|
|
|
|
def clear_data_pts(self):
|
|
for connector in self.connectors:
|
|
connector.clear()
|
|
connector.resume()
|
|
|
|
def set_temp_setpoint_line(self, temp=None, visible=None):
|
|
if visible is not None:
|
|
self._temp_setpoint_line.setVisible(visible)
|
|
if temp is not None:
|
|
self._temp_setpoint_line.setValue(temp)
|
|
|
|
# PyQtGraph normally does not update this text when the line
|
|
# is not visible, so make sure that the temperature label
|
|
# gets updated always, and doesn't stay at an old value.
|
|
self._temp_setpoint_line.label.setText(f"{temp} ℃", color='g')
|
|
|
|
class MutexParameter(pTypes.ListParameter):
|
|
"""
|
|
Mutually exclusive parameter where only one of its children is visible at a time, list selectable.
|
|
|
|
The ordering of the list items determines which children will be visible.
|
|
"""
|
|
def __init__(self, **opts):
|
|
super().__init__(**opts)
|
|
|
|
self.sigValueChanged.connect(self.show_chosen_child)
|
|
self.sigValueChanged.emit(self, self.opts['value'])
|
|
|
|
def _get_param_from_value(self, value):
|
|
if isinstance(self.opts['limits'], dict):
|
|
values_list = list(self.opts['limits'].values())
|
|
else:
|
|
values_list = self.opts['limits']
|
|
|
|
return self.children()[values_list.index(value)]
|
|
|
|
@pyqtSlot(object, object)
|
|
def show_chosen_child(self, value):
|
|
for param in self.children():
|
|
param.hide()
|
|
|
|
child_to_show = self._get_param_from_value(value.value())
|
|
child_to_show.show()
|
|
|
|
if child_to_show.opts.get('triggerOnShow', None):
|
|
child_to_show.sigValueChanged.emit(child_to_show, child_to_show.value())
|
|
|
|
registerParameterType('mutex', MutexParameter)
|
|
|
|
class UpdateNetSettingsForm(QtWidgets.QDialog, Ui_Update_Network_Settings_Form):
|
|
def __init__(self):
|
|
super().__init__()
|
|
self.setupUi(self)
|
|
|
|
def get_net_settings(self):
|
|
try:
|
|
ip_addr = f"{self.addr_in_0.text()}.{self.addr_in_1.text()}.{self.addr_in_2.text()}.{self.addr_in_3.text()}"
|
|
gateway_addr = f"{self.gateway_in_0.text()}.{self.gateway_in_1.text()}.{self.gateway_in_2.text()}.{self.gateway_in_3.text()}"
|
|
socket.inet_aton(ip_addr)
|
|
socket.inet_aton(gateway_addr)
|
|
|
|
return {
|
|
"ip_addr": ip_addr,
|
|
"gateway_addr": gateway_addr,
|
|
"prefix_len": int(self.prefix_len_in.text()),
|
|
"port": int(self.port_in.text()),
|
|
}
|
|
except (OSError, ValueError):
|
|
return None
|
|
|
|
class CfgPdMonForm(QtWidgets.QDialog, Ui_Cfg_Pd_Mon_Form):
|
|
def __init__(self):
|
|
super().__init__()
|
|
self.setupUi(self)
|
|
|
|
class ConnSettingsForm(QtWidgets.QDialog, Ui_Conn_Settings_Form):
|
|
def __init__(self):
|
|
super().__init__()
|
|
self.setupUi(self)
|
|
|
|
def get_net_settings(self):
|
|
try:
|
|
ip_addr = f"{self.addr_in_0.text()}.{self.addr_in_1.text()}.{self.addr_in_2.text()}.{self.addr_in_3.text()}"
|
|
socket.inet_aton(ip_addr)
|
|
|
|
return {
|
|
"ip_addr": ip_addr,
|
|
"port": int(self.port_in.text())
|
|
}
|
|
except (OSError, ValueError):
|
|
return None
|
|
|
|
class ConfigAdcFilterForm(QtWidgets.QDialog, Ui_Cfg_Adc_Filter_Form):
|
|
def __init__(self):
|
|
super().__init__()
|
|
self.setupUi(self)
|
|
self.filter_type_cbox.addItems(['Sinc5Sinc1With50hz60HzRejection', 'Sinc5Sinc1', 'Sinc3', 'Sinc3WithFineODR'])
|
|
self.fine_filter_sampling_rate_spinbox.setVisible(False)
|
|
self.fine_filter_sampling_rate_spinbox.setMinimum(FilterConfig.Sinc3WithFineODR.lower_limit)
|
|
self.fine_filter_sampling_rate_spinbox.setMaximum(FilterConfig.Sinc3WithFineODR.upper_limit)
|
|
self.filter_type_cbox.currentTextChanged.connect(self.sampling_rate_cbox_config)
|
|
|
|
@pyqtSlot(str)
|
|
def sampling_rate_cbox_config(self, filter_type):
|
|
if filter_type == "":
|
|
return
|
|
if filter_type == "Sinc3WithFineODR":
|
|
self.filter_sampling_rate_cbox.setVisible(False)
|
|
self.fine_filter_sampling_rate_spinbox.setVisible(True)
|
|
else:
|
|
self.fine_filter_sampling_rate_spinbox.setVisible(False)
|
|
self.filter_sampling_rate_cbox.setVisible(True)
|
|
self.filter_sampling_rate_cbox.clear()
|
|
self.filter_sampling_rate_cbox.addItems(getattr(FilterConfig, filter_type).get_list_of_settings())
|
|
|
|
def get_filter_settings(self):
|
|
filter_type = self.filter_type_cbox.currentText()
|
|
if filter_type == "Sinc3WithFineODR":
|
|
return getattr(FilterConfig, filter_type)(self.fine_filter_sampling_rate_spinbox.value())
|
|
else:
|
|
filter_type_val = getattr(FilterConfig, filter_type)
|
|
filter_cfg = getattr(filter_type_val, self.filter_sampling_rate_cbox.currentText().lower())
|
|
return filter_cfg
|
|
|
|
class MainWindow(QtWidgets.QMainWindow):
|
|
"""The maximum number of sample points to store."""
|
|
DEFAULT_MAX_SAMPLES = 1000
|
|
DEFAULT_IP_ADDR = '192.168.1.128'
|
|
DEFAULT_PORT = 1337
|
|
|
|
LASER_DIODE_STATUS = [
|
|
{'name': 'Status', 'title': 'Status: Power Off', 'expanded': True, 'type': 'group', 'children': [
|
|
{'name': 'Color', 'title': '', 'type': 'color', 'value': 'w', 'readonly': True, "compactHeight": False},
|
|
]}
|
|
]
|
|
|
|
LASER_DIODE_PARAMETERS = [
|
|
{'name': 'Readings', 'expanded': True, 'type': 'group', 'children': [
|
|
{'name': 'LD Current Set', 'type': 'float', 'unit': 'mA', 'readonly': True, "compactHeight": False},
|
|
{'name': 'PD Current', 'type': 'float', 'unit': 'uA', 'siPrefix': True, 'readonly': True, "compactHeight": False},
|
|
{'name': 'PD Power', 'type': 'float', 'unit': 'mW', 'readonly': True, "compactHeight": False},
|
|
{'name': 'LF Mod Termination (50 Ohm)', 'type': 'list', 'limits': ['On', 'Off'], 'readonly': True, "compactHeight": False}
|
|
]},
|
|
{'name': 'Output Config', 'expanded': True, 'type': 'group', 'children': [
|
|
{'name': 'LD Current Set', 'type': 'float', 'value': 0, 'step': 0.001, 'format': '{value:.4f}', 'decimals': 6, 'limits': (0, 300),
|
|
'unit': 'mA', 'lock': False, 'target': 'laser', 'action': 'set_i', "compactHeight": False},
|
|
{'name': 'LD Terminals Short', 'type': 'bool', 'value': False, 'lock': False, 'target': 'laser', 'action': 'set_ld_terms_short', "compactHeight": False},
|
|
{'name': 'Default Power On', 'type': 'bool', 'value': False, 'lock': False, 'target': 'laser', 'action': 'set_default_pwr_on', "compactHeight": False},
|
|
]},
|
|
{'name': 'Photodiode Monitor Config', 'expanded': False, 'type': 'group', 'children': [
|
|
{'name': 'LD Power Limit', 'type': 'float', 'value': 0, 'step': 0.001, 'format': '{value:.4f}', 'decimals': 6, 'limits': (0, float("inf")),
|
|
'unit': 'mW', 'lock': False, 'target': 'laser', 'action': 'set_ld_pwr_limit', "compactHeight": False},
|
|
{'name': 'Responsitivity', 'type': 'float', 'value': 0, 'step': 0.001, 'format': '{value:.4f}', 'decimals': 6, 'limits': (0, float("inf")),
|
|
'unit': 'mA/W', 'lock': False, 'target': 'laser', 'action': 'set_pd_mon_responsitivity', "compactHeight": False, 'readonly': True},
|
|
{'name': 'Dark Current', 'type': 'float', 'value': 0, 'step': 1, 'format': '{value:.4f}', 'decimals': 6, 'limits': (0, float("inf")),
|
|
'unit': 'uA', 'lock': False, 'target': 'laser', 'action': 'set_pd_mon_dark_current', "compactHeight": False, 'readonly': True},
|
|
{'name': 'Configure Photodiode Monitor', 'type': 'action'},
|
|
]},
|
|
]
|
|
|
|
THERMOSTAT_STATUS = [
|
|
{'name': 'Status', 'title': 'Status: Power Off', 'expanded': True, 'type': 'group', 'children': [
|
|
{'name': 'Color', 'title': '', 'type': 'color', 'value': 'w', 'readonly': True, "compactHeight": False},
|
|
]}
|
|
]
|
|
|
|
THERMOSTAT_PARAMETERS = [
|
|
{'name': 'Readings', 'expanded': True, 'type': 'group', 'children': [
|
|
{'name': 'Temperature', 'type': 'float', 'unit': '℃', 'format': '{value:.4f}', 'readonly': True, "compactHeight": False},
|
|
{'name': 'Current through TEC', 'type': 'float', 'unit': 'mA', 'decimals': 6, 'readonly': True, "compactHeight": False},
|
|
]},
|
|
{'name': 'Output Config', 'expanded': True, 'type': 'group', 'children': [
|
|
{'name': 'Control Method', 'type': 'mutex', 'limits': ['Constant Current', 'Temperature PID'],
|
|
'target_action_pair': [['thermostat', 'set_constant_current_control_mode'], ['thermostat', 'set_pid_control_mode']], 'children': [
|
|
{'name': 'Set Current', 'type': 'float', 'value': 0, 'step': 1, 'limits': (-3000, 3000), 'triggerOnShow': True, 'decimals': 6,
|
|
'unit': 'mA', 'lock': False, 'target': 'thermostat', 'action': 'set_tec_i_out', "compactHeight": False},
|
|
{'name': 'Set Temperature', 'type': 'float', 'value': 25, 'step': 0.0001, 'limits': (-273, 300), 'format': '{value:.4f}',
|
|
'unit': '℃', 'lock': False, 'target': 'thermostat', 'action': 'set_temperature_setpoint', "compactHeight": False},
|
|
]},
|
|
{'name': 'Limits', 'expanded': False, 'type': 'group', 'children': [
|
|
{'name': 'Max Cooling Current', 'type': 'float', 'value': 0, 'step': 1, 'decimals': 6, 'limits': (0, 3000),
|
|
'unit': 'mA', 'lock': False, 'target': 'thermostat', 'action': 'set_tec_max_cooling_i', "compactHeight": False},
|
|
{'name': 'Max Heating Current', 'type': 'float', 'value': 0, 'step': 1, 'decimals': 6, 'limits': (0, 3000),
|
|
'unit': 'mA', 'lock': False, 'target': 'thermostat', 'action': 'set_tec_max_heating_i', "compactHeight": False},
|
|
{'name': 'Max Voltage Difference', 'type': 'float', 'value': 0, 'step': 0.1, 'limits': (0, 4.3),
|
|
'unit': 'V', 'lock': False, 'target': 'thermostat', 'action': 'set_tec_max_v', "compactHeight": False},
|
|
]},
|
|
{'name': 'Default Power On', 'type': 'bool', 'value': False, 'lock': False, 'target': 'thermostat', 'action': 'set_default_pwr_on'},
|
|
]},
|
|
{'name': 'Temperature Monitor Config', 'expanded': False, 'type': 'group', 'children': [
|
|
{'name': 'Upper Limit', 'type': 'float', 'value': 0, 'step': 1, 'decimals': 6, 'limits': (-273, 300),
|
|
'unit': '℃', 'lock': False, 'target': 'thermostat', 'action': 'set_temp_mon_upper_limit', "compactHeight": False},
|
|
{'name': 'Lower Limit', 'type': 'float', 'value': 0, 'step': 1, 'decimals': 6, 'limits': (-273, 300),
|
|
'unit': '℃', 'lock': False, 'target': 'thermostat', 'action': 'set_temp_mon_lower_limit', "compactHeight": False},
|
|
]},
|
|
{'name': 'Temperature ADC Filter Settings', 'expanded': False, 'type': 'group', 'children': [
|
|
{'name': 'Filter Type', 'type': 'list', 'limits': ['Sinc5Sinc1With50hz60HzRejection', 'Sinc5Sinc1', 'Sinc3', 'Sinc3WithFineODR'], 'readonly': True, "compactHeight": False},
|
|
{'name': 'Sampling Rate', 'type': 'float', 'value': 16.67, 'decimals': 4, 'unit': 'Hz', 'readonly': True, "compactHeight": False},
|
|
{'name': 'Recorded Sampling Rate', 'type': 'float', 'value': 16.67, 'decimals': 4, 'unit': 'Hz', 'readonly': True, "compactHeight": False},
|
|
{'name': 'Configure ADC Filter', 'type': 'action'},
|
|
]},
|
|
{'name': 'Thermistor Settings','expanded': False, 'type': 'group', 'children': [
|
|
{'name': 'T₀', 'type': 'float', 'value': 0, 'step': 1, 'decimals': 6, 'limits': (-273, 300),
|
|
'unit': '℃', 'lock': False, 'target': 'thermostat', 'action': 'set_sh_t0', "compactHeight": False},
|
|
{'name': 'R₀', 'type': 'float', 'value': 0, 'step': 1, 'decimals': 6,
|
|
'unit': 'kΩ', 'lock': False, 'target': 'thermostat', 'action': 'set_sh_r0', "compactHeight": False},
|
|
{'name': 'B', 'type': 'float', 'value': 3950, 'step': 1, 'decimals': 4,
|
|
'unit': 'K', 'lock': False, 'target': 'thermostat', 'action': 'set_sh_beta', "compactHeight": False},
|
|
]},
|
|
{'name': 'PID Config', 'expanded': False, 'type': 'group', 'children': [
|
|
{'name': 'Kp', 'type': 'float', 'step': 0.1, 'decimals': 16, 'lock': False, 'target': 'thermostat', 'action': 'set_pid_kp', "compactHeight": False},
|
|
{'name': 'Ki', 'type': 'float', 'step': 0.1, 'decimals': 16, 'unit': 'Hz', 'lock': False, 'target': 'thermostat', 'action': 'set_pid_ki', "compactHeight": False},
|
|
{'name': 'Kd', 'type': 'float', 'step': 0.1, 'decimals': 16, 'unit': 's', 'lock': False, 'target': 'thermostat', 'action': 'set_pid_kd', "compactHeight": False},
|
|
{'name': "PID Output Clamping", 'expanded': True, 'type': 'group', 'children': [
|
|
{'name': 'Minimum', 'type': 'float', 'step': 1, 'limits': (-1000, 1000), 'decimals': 6,
|
|
'unit': 'mA', 'lock': False, 'target': 'thermostat', 'action': 'set_pid_output_min', "compactHeight": False},
|
|
{'name': 'Maximum', 'type': 'float', 'step': 1, 'limits': (-1000, 1000), 'decimals': 6,
|
|
'unit': 'mA', 'lock': False, 'target': 'thermostat', 'action': 'set_pid_output_max', "compactHeight": False},
|
|
]},
|
|
{'name': 'PID Auto Tune', 'expanded': False, 'type': 'group', 'children': [
|
|
{'name': 'Target Temperature', 'type': 'float', 'value': 20.0, 'step': 0.1, 'unit': '℃', 'format': '{value:.4f}', "compactHeight": False},
|
|
{'name': 'Test Current', 'type': 'float', 'value': 1000, 'decimals': 6, 'step': 100, 'limits': (-1000, 1000), 'unit': 'mA', "compactHeight": False},
|
|
{'name': 'Temperature Swing', 'type': 'float', 'value': 0.0, 'step': 0.0001, 'prefix': '±', 'unit': '℃', 'format': '{value:.4f}', "compactHeight": False},
|
|
{'name': 'Lookback', 'type': 'float', 'value': 5.0, 'step': 0.1, 'unit': 's', 'format': '{value:.4f}', "compactHeight": False},
|
|
{'name': 'Run', 'type': 'action', 'tip': 'Run'},
|
|
]},
|
|
]},
|
|
]
|
|
def __init__(self, args):
|
|
super(MainWindow, self).__init__()
|
|
self.kirdy = Kirdy_Driver()
|
|
|
|
ui_file_path = importlib.resources.files("ui").joinpath("kirdy_qt.ui")
|
|
uic.loadUi(ui_file_path, self)
|
|
|
|
self.info_box = QtWidgets.QMessageBox()
|
|
self.info_box.setIcon(QtWidgets.QMessageBox.Icon.Information)
|
|
|
|
# Load Global QT Style Sheet Settings
|
|
qss=os.path.join(os.path.dirname(__file__), "ui/mainwindow.qss")
|
|
with open(qss,"r") as fh:
|
|
self.setStyleSheet(fh.read())
|
|
|
|
self.ip_addr = self.DEFAULT_IP_ADDR
|
|
self.port = self.DEFAULT_PORT
|
|
|
|
self.cfg_pd_mon_form = CfgPdMonForm()
|
|
|
|
self.conn_settings_form = ConnSettingsForm()
|
|
self.conn_settings_form.accepted.connect(self.start_connecting)
|
|
|
|
self.update_net_settings_form = UpdateNetSettingsForm()
|
|
self.update_net_settings_form.accepted.connect(self.update_net_settings)
|
|
|
|
self.cfg_adc_filter_form = ConfigAdcFilterForm()
|
|
|
|
self.max_samples = self.DEFAULT_MAX_SAMPLES
|
|
|
|
self.autotuner = PIDAutotune(25)
|
|
|
|
self.setup_menu_bar()
|
|
self._set_up_ctrl_btns()
|
|
self._set_up_plot_menu()
|
|
|
|
def _setValuewithLock(self, value):
|
|
if not self.opts.get("lock", None):
|
|
if self.opts.get("unit", None) is not None:
|
|
self.setValue(siConvert(value, self.opts.get("unit", None)))
|
|
else:
|
|
self.setValue(value)
|
|
Parameter.setValuewithLock = _setValuewithLock
|
|
|
|
def _add_unit_to_title(param_tree):
|
|
def _traverse(param):
|
|
if param["type"] == "group" or param["type"] == "mutex":
|
|
for param in param["children"]:
|
|
_add_unit_to_title(param)
|
|
else:
|
|
if "unit" in param.keys():
|
|
if not("title" in param.keys()):
|
|
param["title"] = param["name"]
|
|
param["title"] = param["title"] + " ({:})".format(param["unit"])
|
|
|
|
if isinstance(param_tree, list):
|
|
for param in param_tree:
|
|
_traverse(param)
|
|
else:
|
|
_traverse(param_tree)
|
|
|
|
_add_unit_to_title(self.LASER_DIODE_STATUS)
|
|
_add_unit_to_title(self.LASER_DIODE_PARAMETERS)
|
|
_add_unit_to_title(self.THERMOSTAT_STATUS)
|
|
_add_unit_to_title(self.THERMOSTAT_PARAMETERS)
|
|
|
|
self.params = [
|
|
Parameter.create(name=f"Laser Diode Status", type='group', value=0, children=self.LASER_DIODE_STATUS),
|
|
Parameter.create(name=f"Laser Diode Parameters", type='group', value=1, children=self.LASER_DIODE_PARAMETERS),
|
|
Parameter.create(name=f"Thermostat Status", type='group', value=2, children=self.THERMOSTAT_STATUS),
|
|
Parameter.create(name=f"Thermostat Parameters", type='group', value=3, children=self.THERMOSTAT_PARAMETERS),
|
|
]
|
|
self._set_param_tree()
|
|
self._set_up_pd_mon_form()
|
|
self._set_up_adc_filter_form()
|
|
|
|
self.tec_i_graph.setTitle("TEC Current")
|
|
self.tec_temp_graph.setTitle("TEC Temperature")
|
|
self.ld_i_set_graph.setTitle("LD Current Set")
|
|
self.pd_mon_pwr_graph.setTitle("PD Mon Power")
|
|
|
|
self.connect_btn.clicked.connect(self.show_conn_settings_form)
|
|
|
|
self.kirdy_handler = Kirdy(self, self.kirdy, 1/20.0)
|
|
|
|
self.kirdy_handler.setting_update_sig.connect(self.update_ld_ctrl_panel_settings)
|
|
self.kirdy_handler.setting_update_sig.connect(self.update_thermostat_ctrl_panel_settings)
|
|
self.kirdy_handler.report_update_sig.connect(self.update_ld_ctrl_panel_readings)
|
|
self.kirdy_handler.report_update_sig.connect(self.update_thermostat_ctrl_panel_readings)
|
|
self.kirdy_handler.cmd_fail_sig.connect(self.cmd_cannot_execute)
|
|
|
|
self.graphs = Graphs(self.ld_i_set_graph, self.pd_mon_pwr_graph, self.tec_i_graph, self.tec_temp_graph, max_samples=self.max_samples)
|
|
self.kirdy_handler.report_update_sig.connect(self.graphs.plot_append)
|
|
|
|
self.loading_spinner.hide()
|
|
self.kirdy_handler.connected_sig.connect(self._on_connection_changed)
|
|
|
|
def setup_menu_bar(self):
|
|
@pyqtSlot(bool)
|
|
def about_kirdy(_):
|
|
hw_rev = self.kirdy_handler.get_hw_rev()
|
|
QtWidgets.QMessageBox.about(
|
|
self,
|
|
"About Kirdy",
|
|
f"""
|
|
<h1>Sinara 1550 Kirdy v{hw_rev["major"]}.{hw_rev["minor"]}</h1>
|
|
"""
|
|
)
|
|
self.menu_action_about_kirdy.triggered.connect(about_kirdy)
|
|
|
|
@pyqtSlot(bool)
|
|
def about_gui(_):
|
|
# TODO: Replace the hardware revision placeholder
|
|
QtWidgets.QMessageBox.about(
|
|
self,
|
|
"About Kirdy Control Panel",
|
|
f"""
|
|
<h1>Version: "Version"</h1>
|
|
"""
|
|
)
|
|
self.menu_action_about_gui.triggered.connect(about_gui)
|
|
|
|
@asyncSlot(bool)
|
|
async def dfu_mode(_):
|
|
await self.kirdy.device.dfu()
|
|
self.kirdy_handler.end_session()
|
|
self.menu_action_dfu_mode.triggered.connect(dfu_mode)
|
|
|
|
@asyncSlot(bool)
|
|
async def reset_kirdy(_):
|
|
await self.kirdy.device.hard_reset()
|
|
self.kirdy_handler.end_session()
|
|
self.menu_action_hard_reset.triggered.connect(reset_kirdy)
|
|
|
|
@pyqtSlot(bool)
|
|
def save_settings(_):
|
|
self.kirdy.task_dispatcher(self.kirdy.device.save_current_settings_to_flash())
|
|
saved = QtWidgets.QMessageBox(self)
|
|
saved.setWindowTitle("Config saved")
|
|
saved.setText(f"Laser diode and thermostat configs have been saved into flash.")
|
|
saved.setIcon(QtWidgets.QMessageBox.Icon.Information)
|
|
saved.show()
|
|
self.menu_action_save.triggered.connect(save_settings)
|
|
|
|
@pyqtSlot(bool)
|
|
def load_settings(_):
|
|
self.kirdy.task_dispatcher(self.kirdy.device.restore_settings_from_flash())
|
|
loaded = QtWidgets.QMessageBox(self)
|
|
loaded.setWindowTitle("Config loaded")
|
|
loaded.setText(f"Laser Diode and Thermostat configs have been loaded from flash.")
|
|
loaded.setIcon(QtWidgets.QMessageBox.Icon.Information)
|
|
loaded.show()
|
|
self.menu_action_load.triggered.connect(load_settings)
|
|
|
|
@pyqtSlot(bool)
|
|
def show_update_net_settings_form(_):
|
|
self.update_net_settings_form.retranslateUi(self.update_net_settings_form)
|
|
self.update_net_settings_form.show()
|
|
self.menu_action_update_net_settings.triggered.connect(show_update_net_settings_form)
|
|
|
|
def update_pd_mon_form_readings(self, ld_settings):
|
|
pwr_unit = self.params[1].child('Photodiode Monitor Config', 'LD Power Limit').opts.get("unit", None)
|
|
self.params[1].child('Photodiode Monitor Config', 'LD Power Limit').setOpts(limits= (0, siConvert(ld_settings["ld_pwr_limit"]["max"], pwr_unit)))
|
|
self.cfg_pd_mon_form.settable_pwr_range_display_lbl.setText(f" 0 - {siConvert(ld_settings['ld_pwr_limit']['max'], pwr_unit):.4f}")
|
|
self.cfg_pd_mon_form.cfg_pwr_limit_spinbox.setMaximum(siConvert(ld_settings['ld_pwr_limit']['max'], pwr_unit))
|
|
|
|
responsitivity_unit = self.cfg_pd_mon_form.cfg_responsitivity_spinbox.unit
|
|
self.cfg_pd_mon_form.cfg_responsitivity_reading.setText(f"{siConvert(ld_settings['pd_mon_params']['responsitivity'], responsitivity_unit):.4f}")
|
|
|
|
i_dark_unit = self.cfg_pd_mon_form.cfg_dark_current_spinbox.unit
|
|
self.cfg_pd_mon_form.cfg_dark_current_reading.setText(f"{siConvert(ld_settings['pd_mon_params']['i_dark'], i_dark_unit):.4f}")
|
|
|
|
pwr_limit_unit = self.cfg_pd_mon_form.cfg_pwr_limit_spinbox.unit
|
|
self.cfg_pd_mon_form.cfg_pwr_limit_reading.setText(f"{siConvert(ld_settings['ld_pwr_limit']['value'], pwr_limit_unit):.4f}")
|
|
|
|
def update_adc_filter_form_readings(self, filter_type, filter_rate):
|
|
self.cfg_adc_filter_form.filter_type_reading_lbl.setText(filter_type)
|
|
self.cfg_adc_filter_form.filter_sampling_rate_reading_lbl.setText(str(filter_rate))
|
|
|
|
def show_conn_settings_form(self):
|
|
ip_addr = self.ip_addr.split(".")
|
|
self.conn_settings_form.addr_in_0.setText(ip_addr[0])
|
|
self.conn_settings_form.addr_in_1.setText(ip_addr[1])
|
|
self.conn_settings_form.addr_in_2.setText(ip_addr[2])
|
|
self.conn_settings_form.addr_in_3.setText(ip_addr[3])
|
|
self.conn_settings_form.port_in.setText(str(self.port))
|
|
self.conn_settings_form.show()
|
|
|
|
def _set_up_ctrl_btns(self):
|
|
@pyqtSlot(bool)
|
|
def ld_pwr_on(_):
|
|
self.kirdy.task_dispatcher(self.kirdy.laser.set_power_on(True))
|
|
self.ld_pwr_on_btn.clicked.connect(ld_pwr_on)
|
|
|
|
@pyqtSlot(bool)
|
|
def ld_pwr_off(_):
|
|
self.kirdy.task_dispatcher(self.kirdy.laser.set_power_on(False))
|
|
self.ld_pwr_off_btn.clicked.connect(ld_pwr_off)
|
|
|
|
@pyqtSlot(bool)
|
|
def ld_clear_alarm(_):
|
|
self.kirdy.task_dispatcher(self.kirdy.laser.clear_alarm())
|
|
self.ld_clear_alarm_btn.clicked.connect(ld_clear_alarm)
|
|
|
|
@pyqtSlot(bool)
|
|
def tec_pwr_on(_):
|
|
self.kirdy.task_dispatcher(self.kirdy.thermostat.set_power_on(True))
|
|
self.tec_pwr_on_btn.clicked.connect(tec_pwr_on)
|
|
|
|
@pyqtSlot(bool)
|
|
def tec_pwr_off(_):
|
|
self.kirdy.task_dispatcher(self.kirdy.thermostat.set_power_on(False))
|
|
self.tec_pwr_off_btn.clicked.connect(tec_pwr_off)
|
|
|
|
@pyqtSlot(bool)
|
|
def tec_clear_alarm(_):
|
|
self.kirdy.task_dispatcher(self.kirdy.thermostat.clear_alarm())
|
|
self.tec_clear_alarm_btn.clicked.connect(tec_clear_alarm)
|
|
|
|
@pyqtSlot(bool)
|
|
def update_polling_rate(_):
|
|
self.kirdy_handler.set_update_s(1/self.polling_rate_spinbox.value())
|
|
self.kirdy_handler.update_polling_rate()
|
|
self.polling_rate_apply_btn.clicked.connect(update_polling_rate)
|
|
|
|
def _set_up_plot_menu(self):
|
|
self.plot_menu = QtWidgets.QMenu()
|
|
self.plot_menu.setTitle("Plot Settings")
|
|
|
|
clear = QtGui.QAction("Clear graphs", self.plot_menu)
|
|
clear.triggered.connect(self.clear_graphs)
|
|
self.plot_menu.addAction(clear)
|
|
self.plot_menu.clear = clear
|
|
|
|
self.samples_spinbox = QtWidgets.QSpinBox()
|
|
self.samples_spinbox.setRange(2, 100000)
|
|
self.samples_spinbox.setSuffix(' samples')
|
|
self.samples_spinbox.setValue(self.max_samples)
|
|
self.samples_spinbox.valueChanged.connect(self.set_max_samples)
|
|
|
|
limit_samples = QtWidgets.QWidgetAction(self.plot_menu)
|
|
limit_samples.setDefaultWidget(self.samples_spinbox)
|
|
self.plot_menu.addAction(limit_samples)
|
|
self.plot_menu.limit_samples = limit_samples
|
|
|
|
self.plot_settings.setMenu(self.plot_menu)
|
|
|
|
def _set_up_pd_mon_form(self):
|
|
@pyqtSlot(bool)
|
|
def ld_pwr_on(_):
|
|
self.kirdy.task_dispatcher(self.kirdy.laser.clear_alarm())
|
|
self.kirdy.task_dispatcher(self.kirdy.laser.set_power_on(True))
|
|
self.cfg_pd_mon_form.pwr_on_btn.clicked.connect(ld_pwr_on)
|
|
|
|
@pyqtSlot(bool)
|
|
def ld_pwr_off(_):
|
|
self.kirdy.task_dispatcher(self.kirdy.laser.set_power_on(False))
|
|
self.cfg_pd_mon_form.pwr_off_btn.clicked.connect(ld_pwr_off)
|
|
|
|
def get_spinbox_value(spinbox):
|
|
_, _, suffix = siParse(str(spinbox.value())+spinbox.unit, regex=FLOAT_REGEX)
|
|
return siEval(str(spinbox.value())+spinbox.unit, regex=FLOAT_REGEX, suffix=suffix)
|
|
|
|
def set_spinbox_value(spinbox, val):
|
|
spinbox.setValue(siConvert(val, spinbox.unit))
|
|
|
|
@pyqtSlot(bool)
|
|
def apply_pd_params(_):
|
|
responsitivity = get_spinbox_value(self.cfg_pd_mon_form.cfg_responsitivity_spinbox)
|
|
dark_current = get_spinbox_value(self.cfg_pd_mon_form.cfg_dark_current_spinbox)
|
|
self.kirdy.task_dispatcher(self.kirdy.laser.set_pd_mon_responsitivity(responsitivity))
|
|
self.kirdy.task_dispatcher(self.kirdy.laser.set_pd_mon_dark_current(dark_current))
|
|
self.kirdy.task_dispatcher(self.kirdy.laser.apply_pd_params())
|
|
self.cfg_pd_mon_form.apply_pd_params_btn.clicked.connect(apply_pd_params)
|
|
|
|
@pyqtSlot(bool)
|
|
def apply_ld_pwr_limit(_):
|
|
pwr_limit = get_spinbox_value(self.cfg_pd_mon_form.cfg_pwr_limit_spinbox)
|
|
self.kirdy.task_dispatcher(self.kirdy.laser.set_ld_pwr_limit(pwr_limit))
|
|
self.cfg_pd_mon_form.apply_pwr_limit_btn.clicked.connect(apply_ld_pwr_limit)
|
|
|
|
@pyqtSlot(bool)
|
|
def rst_ld_pwr_limit(_):
|
|
pwr_limit = self.cfg_pd_mon_form.cfg_pwr_limit_spinbox.value()
|
|
self.kirdy.task_dispatcher(self.kirdy.laser.set_ld_pwr_limit(0))
|
|
self.cfg_pd_mon_form.rst_ld_pwr_limit_btn.clicked.connect(rst_ld_pwr_limit)
|
|
|
|
@asyncSlot(bool)
|
|
async def apply_ld_pwr_limit_max(_):
|
|
settings = await self.kirdy.device.get_settings_summary()
|
|
set_spinbox_value(self.cfg_pd_mon_form.cfg_pwr_limit_spinbox, settings['laser']['ld_pwr_limit']['max'])
|
|
self.kirdy.task_dispatcher(self.kirdy.laser.set_ld_pwr_limit(settings['laser']['ld_pwr_limit']['max']))
|
|
self.cfg_pd_mon_form.apply_pwr_limit_max_btn.clicked.connect(apply_ld_pwr_limit_max)
|
|
|
|
ld_pwr_limit_unit = self.params[1].child('Photodiode Monitor Config', 'LD Power Limit').opts["unit"]
|
|
ld_pwr_limit_text = self.cfg_pd_mon_form.cfg_pwr_limit_lbl.text()
|
|
self.cfg_pd_mon_form.cfg_pwr_limit_lbl.setText(ld_pwr_limit_text.replace(":", f" ({ld_pwr_limit_unit}):"))
|
|
self.cfg_pd_mon_form.cfg_pwr_limit_spinbox.unit = ld_pwr_limit_unit
|
|
settable_pwr_limit_text = self.cfg_pd_mon_form.settable_pwr_range_lbl.text()
|
|
self.cfg_pd_mon_form.settable_pwr_range_lbl.setText(settable_pwr_limit_text.replace(":", f" ({ld_pwr_limit_unit}):"))
|
|
|
|
pd_responsitivity_unit = self.params[1].child('Photodiode Monitor Config', 'Responsitivity').opts["unit"]
|
|
pd_responsitivity_text = self.cfg_pd_mon_form.cfg_responsitivity_lbl.text()
|
|
self.cfg_pd_mon_form.cfg_responsitivity_lbl.setText(pd_responsitivity_text.replace(":", f" ({pd_responsitivity_unit}):"))
|
|
self.cfg_pd_mon_form.cfg_responsitivity_spinbox.unit = pd_responsitivity_unit
|
|
|
|
pd_dark_current_unit = self.params[1].child('Photodiode Monitor Config', 'Dark Current').opts["unit"]
|
|
pd_dark_current_text = self.cfg_pd_mon_form.cfg_dark_current_lbl.text()
|
|
self.cfg_pd_mon_form.cfg_dark_current_lbl.setText(pd_dark_current_text.replace(":", f" ({pd_dark_current_unit}):"))
|
|
self.cfg_pd_mon_form.cfg_dark_current_spinbox.unit = pd_dark_current_unit
|
|
|
|
def _set_up_adc_filter_form(self):
|
|
@pyqtSlot(bool)
|
|
def apply_adc_filter_settings():
|
|
filter_cfg = self.cfg_adc_filter_form.get_filter_settings()
|
|
self.kirdy.task_dispatcher(self.kirdy.thermostat.config_temp_adc_filter(filter_cfg))
|
|
self.cfg_adc_filter_form.apply_btn.clicked.connect(apply_adc_filter_settings)
|
|
|
|
def _set_param_tree(self):
|
|
status = self.ld_status
|
|
status.setHeaderHidden(True)
|
|
status.setParameters(self.params[0], showTop=False)
|
|
|
|
tree = self.ld_tree
|
|
tree.setHeaderHidden(True)
|
|
tree.setParameters(self.params[1], showTop=False)
|
|
self.params[1].sigTreeStateChanged.connect(self.send_command)
|
|
|
|
status = self.tec_status
|
|
status.setHeaderHidden(True)
|
|
status.setParameters(self.params[2], showTop=False)
|
|
|
|
tree = self.tec_tree
|
|
tree.setHeaderHidden(True)
|
|
tree.setParameters(self.params[3], showTop=False)
|
|
self.params[3].sigTreeStateChanged.connect(self.send_command)
|
|
|
|
@asyncSlot()
|
|
async def autotune(param):
|
|
match self.autotuner.state():
|
|
case PIDAutotuneState.STATE_OFF:
|
|
settings = await self.kirdy.device.get_settings_summary()
|
|
self.autotuner.setParam(
|
|
param.parent().child('Target Temperature').value(),
|
|
param.parent().child('Test Current').value() / 1000,
|
|
param.parent().child('Temperature Swing').value(),
|
|
1.0 / settings['thermostat']['temp_adc_settings']['rate'],
|
|
param.parent().child('Lookback').value())
|
|
self.autotuner.setReady()
|
|
param.setOpts(title="Stop")
|
|
self.kirdy.task_dispatcher(self.kirdy.thermostat.set_constant_current_control_mode())
|
|
self.kirdy_handler.report_update_sig.connect(self.autotune_tick)
|
|
self.loading_spinner.show()
|
|
self.loading_spinner.start()
|
|
self.background_task_lbl.setText("Autotuning")
|
|
case PIDAutotuneState.STATE_READY | PIDAutotuneState.STATE_RELAY_STEP_UP | PIDAutotuneState.STATE_RELAY_STEP_DOWN:
|
|
self.autotuner.setOff()
|
|
param.setOpts(title="Run")
|
|
self.kirdy.task_dispatcher(self.kirdy.thermostat.set_tec_i_out(0.0))
|
|
self.kirdy_handler.report_update_sig.disconnect(self.autotune_tick)
|
|
self.background_task_lbl.setText("Ready.")
|
|
self.loading_spinner.stop()
|
|
self.loading_spinner.hide()
|
|
self.params[3].child('PID Config', 'PID Auto Tune', 'Run').sigActivated.connect(autotune)
|
|
|
|
@pyqtSlot()
|
|
def show_pd_mon_cfg_form(param):
|
|
ld_pwr_limit = self.params[1].child('Photodiode Monitor Config', 'LD Power Limit').value()
|
|
pd_responsitivity = self.params[1].child('Photodiode Monitor Config', 'Responsitivity').value()
|
|
pd_dark_current = self.params[1].child('Photodiode Monitor Config', 'Dark Current').value()
|
|
|
|
self.cfg_pd_mon_form.cfg_responsitivity_spinbox.setValue(pd_responsitivity)
|
|
self.cfg_pd_mon_form.cfg_pwr_limit_spinbox.setValue(ld_pwr_limit)
|
|
self.cfg_pd_mon_form.cfg_dark_current_spinbox.setValue(pd_dark_current)
|
|
|
|
self.cfg_pd_mon_form.show()
|
|
self.params[1].child('Photodiode Monitor Config', 'Configure Photodiode Monitor').sigActivated.connect(show_pd_mon_cfg_form)
|
|
|
|
@asyncSlot()
|
|
async def show_adc_filter_cfg_form(param):
|
|
settings = await self.kirdy.device.get_settings_summary()
|
|
filter_type = settings['thermostat']['temp_adc_settings']['filter_type']
|
|
filter_rate = settings['thermostat']['temp_adc_settings'][getattr(getattr(FilterConfig, filter_type), "_odr_type")]
|
|
|
|
self.cfg_adc_filter_form.filter_type_cbox.setCurrentIndex(self.cfg_adc_filter_form.filter_type_cbox.findText(filter_type))
|
|
self.cfg_adc_filter_form.sampling_rate_cbox_config(filter_type)
|
|
|
|
if filter_type == "Sinc3WithFineODR":
|
|
self.cfg_adc_filter_form.fine_filter_sampling_rate_spinbox.setValue(filter_rate)
|
|
else:
|
|
self.cfg_adc_filter_form.filter_sampling_rate_cbox.setCurrentIndex(self.cfg_adc_filter_form.filter_sampling_rate_cbox.findText(filter_rate))
|
|
self.cfg_adc_filter_form.show()
|
|
|
|
self.params[3].child('Temperature ADC Filter Settings', 'Configure ADC Filter').sigActivated.connect(show_adc_filter_cfg_form)
|
|
|
|
@pyqtSlot(str)
|
|
def cmd_cannot_execute(self, kirdy_msg):
|
|
self.info_box.setText(kirdy_msg)
|
|
self.info_box.setWindowTitle("Command fails to execute")
|
|
self.info_box.show()
|
|
|
|
@pyqtSlot(dict)
|
|
def autotune_tick(self, report):
|
|
match self.autotuner.state():
|
|
case PIDAutotuneState.STATE_READY | PIDAutotuneState.STATE_RELAY_STEP_UP | PIDAutotuneState.STATE_RELAY_STEP_DOWN:
|
|
self.autotuner.run(report['thermostat']['temperature'], report['ts']/1000)
|
|
self.kirdy.task_dispatcher(self.kirdy.thermostat.set_tec_i_out(self.autotuner.output()))
|
|
case PIDAutotuneState.STATE_SUCCEEDED:
|
|
kp, ki, kd = self.autotuner.get_tec_pid()
|
|
self.autotuner.setOff()
|
|
self.params[3].child('PID Config', 'PID Auto Tune', 'Run').setOpts(title="Run")
|
|
self.kirdy.task_dispatcher(self.kirdy.thermostat.set_pid_kp(kp))
|
|
self.kirdy.task_dispatcher(self.kirdy.thermostat.set_pid_ki(ki))
|
|
self.kirdy.task_dispatcher(self.kirdy.thermostat.set_pid_kd(kd))
|
|
self.kirdy.task_dispatcher(self.kirdy.thermostat.set_pid_control_mode())
|
|
self.kirdy.task_dispatcher(self.kirdy.thermostat.set_temperature_setpoint(self.params[3].child('PID Config', 'PID Auto Tune', 'Target Temperature').value()))
|
|
self.kirdy_handler.report_update_sig.disconnect(self.autotune_tick)
|
|
self.background_task_lbl.setText("Ready.")
|
|
self.loading_spinner.stop()
|
|
self.loading_spinner.hide()
|
|
self.info_box.setWindowTitle("PID AutoTune Success")
|
|
self.info_box.setText("PID Config has been loaded to Thermostat.\nRegulating temperature.")
|
|
self.info_box.show()
|
|
|
|
case PIDAutotuneState.STATE_FAILED:
|
|
self.autotuner.setOff()
|
|
self.params[3].child('PID Config', 'PID Auto Tune', 'Run').setOpts(title="Run")
|
|
self.kirdy.task_dispatcher(self.kirdy.thermostat.set_tec_i_out(0.0))
|
|
self.kirdy_handler.report_update_sig.disconnect(self.autotune_tick)
|
|
self.background_task_lbl.setText("Ready.")
|
|
self.loading_spinner.stop()
|
|
self.loading_spinner.hide()
|
|
self.info_box.setWindowTitle("PID Autotune Failed")
|
|
self.info_box.setText("PID Autotune is failed.")
|
|
self.info_box.show()
|
|
|
|
@pyqtSlot(bool)
|
|
def _on_connection_changed(self, result):
|
|
def ctrl_panel_setEnable(result):
|
|
self.ld_status.setEnabled(result)
|
|
self.ld_tree.setEnabled(result)
|
|
self.ld_pwr_on_btn.setEnabled(result)
|
|
self.ld_pwr_off_btn.setEnabled(result)
|
|
self.ld_clear_alarm_btn.setEnabled(result)
|
|
self.tec_status.setEnabled(result)
|
|
self.tec_tree.setEnabled(result)
|
|
self.tec_pwr_on_btn.setEnabled(result)
|
|
self.tec_pwr_off_btn.setEnabled(result)
|
|
self.tec_clear_alarm_btn.setEnabled(result)
|
|
ctrl_panel_setEnable(result)
|
|
|
|
def menu_bar_setEnable(result):
|
|
self.menu_action_about_kirdy.setEnabled(result)
|
|
self.menu_action_connect.setEnabled(result)
|
|
self.menu_action_dfu_mode.setEnabled(result)
|
|
self.menu_action_disconnect.setEnabled(result)
|
|
self.menu_action_hard_reset.setEnabled(result)
|
|
self.menu_action_save.setEnabled(result)
|
|
self.menu_action_load.setEnabled(result)
|
|
self.menu_action_update_net_settings.setEnabled(result)
|
|
menu_bar_setEnable(result)
|
|
|
|
def graph_group_setEnable(result):
|
|
self.ld_i_set_graph.setEnabled(result)
|
|
self.pd_mon_pwr_graph.setEnabled(result)
|
|
self.tec_i_graph.setEnabled(result)
|
|
self.tec_temp_graph.setEnabled(result)
|
|
graph_group_setEnable(result)
|
|
|
|
|
|
self.report_group.setEnabled(result)
|
|
# TODO: Use QStateMachine to manage connections
|
|
self.connect_btn.clicked.disconnect()
|
|
if result:
|
|
self.connect_btn.setText("Disconnect")
|
|
self.connect_btn.clicked.connect(self.kirdy_handler.end_session)
|
|
self._status()
|
|
else:
|
|
if self.kirdy_handler.connecting():
|
|
self.status_lbl.setText(f"Connection is dropped. Reconnecting to {self.ip_addr}:{self.port}.")
|
|
self.connect_btn.setText("Stop")
|
|
else:
|
|
self.connect_btn.setText("Connect")
|
|
self.connect_btn.clicked.connect(self.show_conn_settings_form)
|
|
self.clear_graphs()
|
|
self.status_lbl.setText(f"Disconnected from {self.ip_addr}:{self.port}.")
|
|
self.connect_btn.clicked.connect(self.kirdy_handler.end_session)
|
|
|
|
def _status(self):
|
|
host = self.ip_addr
|
|
port = self.port
|
|
hw_rev = self.kirdy_handler.get_hw_rev()
|
|
self.status_lbl.setText(f"Connected to Kirdy v{hw_rev['major']}.{hw_rev['minor']} @ {host}:{port}")
|
|
|
|
def clear_graphs(self):
|
|
self.graphs.clear_data_pts()
|
|
|
|
@pyqtSlot(dict)
|
|
def graphs_update(self, report):
|
|
self.graphs.plot_append(report)
|
|
|
|
@pyqtSlot(dict)
|
|
def update_ld_ctrl_panel_settings(self, settings):
|
|
try:
|
|
settings = settings['laser']
|
|
with QSignalBlocker(self.params[1]):
|
|
self.params[1].child('Output Config', 'LD Current Set').setValuewithLock(settings["ld_drive_current"]['value'])
|
|
self.params[1].child('Output Config', 'LD Terminals Short').setValuewithLock(settings["ld_terms_short"])
|
|
self.params[1].child('Output Config', 'Default Power On').setValuewithLock(settings["default_pwr_on"])
|
|
self.params[1].child('Photodiode Monitor Config', 'LD Power Limit').setValuewithLock(settings["ld_pwr_limit"]["value"])
|
|
self.update_pd_mon_form_readings(settings)
|
|
if settings["pd_mon_params"]["responsitivity"] is not None:
|
|
self.params[1].child('Photodiode Monitor Config', 'Responsitivity').setValuewithLock(settings["pd_mon_params"]["responsitivity"])
|
|
else:
|
|
self.params[1].child('Photodiode Monitor Config', 'Responsitivity').setValuewithLock(0)
|
|
self.params[1].child('Photodiode Monitor Config', 'Dark Current').setValuewithLock(settings["pd_mon_params"]["i_dark"])
|
|
except Exception as e:
|
|
logging.error(f"Params tree cannot be updated. Data:{settings}", exc_info=True)
|
|
|
|
@pyqtSlot(dict)
|
|
def update_ld_ctrl_panel_readings(self, report):
|
|
try:
|
|
report = report['laser']
|
|
with QSignalBlocker(self.params[0]):
|
|
if report['pwr_excursion']:
|
|
self.params[0].child('Status', 'Color').setValuewithLock('r')
|
|
self.params[0].child('Status').setOpts(title='Status: OverPower Alarm')
|
|
else:
|
|
self.params[0].child('Status', 'Color').setValuewithLock('g' if report['pwr_on'] else 'w')
|
|
self.params[0].child('Status').setOpts(title='Status: Power On' if report['pwr_on'] else 'Status: Power Off')
|
|
|
|
with QSignalBlocker(self.params[1]):
|
|
self.params[1].child('Readings', 'LD Current Set').setValuewithLock(report["ld_i_set"])
|
|
self.params[1].child('Readings', 'PD Current').setValuewithLock(report["pd_i"])
|
|
if report["pd_pwr"] is not None:
|
|
self.params[1].child('Readings', 'PD Power').setValuewithLock(report["pd_pwr"])
|
|
else:
|
|
self.params[1].child('Readings', 'PD Power').setValuewithLock(0)
|
|
self.params[1].child('Readings', 'LF Mod Termination (50 Ohm)').setValuewithLock(report["term_50ohm"])
|
|
except Exception as e:
|
|
logging.error(f"Params tree cannot be updated. Data:{report}", exc_info=True)
|
|
|
|
@pyqtSlot(dict)
|
|
def update_thermostat_ctrl_panel_settings(self, settings):
|
|
try:
|
|
settings = settings['thermostat']
|
|
with QSignalBlocker(self.params[3]):
|
|
self.params[3].child('Output Config', 'Control Method').setValuewithLock("Temperature PID" if settings["pid_engaged"] else "Constant Current")
|
|
self.params[3].child('Output Config', 'Control Method', 'Set Current').setValuewithLock(settings["tec_settings"]['i_set']['value'])
|
|
self.params[3].child('Output Config', 'Control Method', 'Set Temperature').setValuewithLock(float(settings["temperature_setpoint"]))
|
|
self.params[3].child('Output Config', 'Limits', 'Max Cooling Current').setValuewithLock(settings["tec_settings"]['max_i_pos']['value'])
|
|
self.params[3].child('Output Config', 'Limits', 'Max Heating Current').setValuewithLock(settings["tec_settings"]['max_i_neg']['value'])
|
|
self.params[3].child('Output Config', 'Limits', 'Max Voltage Difference').setValuewithLock(settings["tec_settings"]['max_v']['value'])
|
|
self.params[3].child('Output Config', 'Default Power On').setValuewithLock(settings["default_pwr_on"])
|
|
filter_type = settings['temp_adc_settings']['filter_type']
|
|
filter_rate = settings['temp_adc_settings'][getattr(getattr(FilterConfig, filter_type), "_odr_type")]
|
|
self.update_adc_filter_form_readings(filter_type, filter_rate)
|
|
self.params[3].child('Temperature ADC Filter Settings', 'Filter Type').setValue(filter_type)
|
|
self.params[3].child('Temperature ADC Filter Settings', 'Sampling Rate').setValue(settings['temp_adc_settings']['rate'])
|
|
self.params[3].child('Temperature Monitor Config', 'Upper Limit').setValuewithLock(settings["temp_mon_settings"]['upper_limit'])
|
|
self.params[3].child('Temperature Monitor Config', 'Lower Limit').setValuewithLock(settings["temp_mon_settings"]['lower_limit'])
|
|
self.params[3].child('PID Config', 'Kp').setValuewithLock(settings["pid_params"]['kp'])
|
|
self.params[3].child('PID Config', 'Ki').setValuewithLock(settings["pid_params"]['ki'])
|
|
self.params[3].child('PID Config', 'Kd').setValuewithLock(settings["pid_params"]['kd'])
|
|
self.params[3].child('PID Config', 'PID Output Clamping', 'Minimum').setValuewithLock(settings["pid_params"]['output_min'])
|
|
self.params[3].child('PID Config', 'PID Output Clamping', 'Maximum').setValuewithLock(settings["pid_params"]['output_max'])
|
|
self.params[3].child('Thermistor Settings', 'T₀').setValuewithLock(settings["thermistor_params"]['t0'])
|
|
self.params[3].child('Thermistor Settings', 'R₀').setValuewithLock(settings["thermistor_params"]['r0'])
|
|
self.params[3].child('Thermistor Settings', 'B').setValuewithLock(settings["thermistor_params"]['b'])
|
|
self.graphs.set_temp_setpoint_line(temp=round(settings["temperature_setpoint"], 4))
|
|
self.graphs.set_temp_setpoint_line(visible=settings['pid_engaged'])
|
|
except Exception as e:
|
|
logging.error(f"Params tree cannot be updated. Data:{settings}", exc_info=True)
|
|
|
|
@pyqtSlot(dict)
|
|
def update_thermostat_ctrl_panel_readings(self, report):
|
|
try:
|
|
report = report['thermostat']
|
|
with QSignalBlocker(self.params[2]):
|
|
if report['temp_mon_status']['over_temp_alarm']:
|
|
self.params[2].child('Status', 'Color').setValuewithLock('r')
|
|
self.params[2].child('Status').setOpts(title='Status: OverTemperature Alarm')
|
|
|
|
else:
|
|
self.params[2].child('Status', 'Color').setValuewithLock('g' if report['pwr_on'] else 'w')
|
|
self.params[2].child('Status').setOpts(title='Status: Power On' if report['pwr_on'] else 'Status: Power Off')
|
|
|
|
with QSignalBlocker(self.params[3]):
|
|
if report["temperature"] == None:
|
|
self.params[3].child('Readings', 'Temperature').setValuewithLock(-273.15)
|
|
else:
|
|
self.params[3].child('Readings', 'Temperature').setValuewithLock(report["temperature"])
|
|
self.params[3].child('Readings', 'Current through TEC').setValuewithLock(report["tec_i"])
|
|
rate = 1 / (report['interval']['ms'] / 1e3 + report['interval']['us'] / 1e6)
|
|
self.params[3].child('Temperature ADC Filter Settings', 'Recorded Sampling Rate').setValue(rate)
|
|
self.cfg_adc_filter_form.recorded_sampling_rate_reading_lbl.setText(f"{rate:.2f}")
|
|
except Exception as e:
|
|
logging.error(f"Params tree cannot be updated. Data:{report}", exc_info=True)
|
|
|
|
@pyqtSlot(int)
|
|
def set_max_samples(self, samples: int):
|
|
self.graphs.set_max_samples(samples)
|
|
|
|
|
|
@pyqtSlot()
|
|
def update_net_settings(self):
|
|
net_settings = self.update_net_settings_form.get_net_settings()
|
|
if net_settings is None:
|
|
self.status_lbl.setText("Invalid IP Settings Input")
|
|
return
|
|
addr = net_settings["ip_addr"]
|
|
port = net_settings["port"]
|
|
prefix_len = net_settings["prefix_len"]
|
|
gateway = net_settings["gateway_addr"]
|
|
self.kirdy.task_dispatcher(self.kirdy.device.set_ip_settings(addr, port, prefix_len, gateway))
|
|
self.status_lbl.setText("IP Settings is Updated")
|
|
|
|
@pyqtSlot()
|
|
def start_connecting(self):
|
|
net_settings = self.conn_settings_form.get_net_settings()
|
|
if net_settings is None:
|
|
self.status_lbl.setText("Invalid IP Settings Input")
|
|
return
|
|
self.ip_addr = net_settings["ip_addr"]
|
|
self.port = net_settings["port"]
|
|
|
|
host = self.ip_addr
|
|
port = self.port
|
|
|
|
if not (self.kirdy_handler.connecting() or self.kirdy_handler.connected()):
|
|
self.status_lbl.setText("Connecting...")
|
|
self.kirdy_handler.start_session(host=host, port=port)
|
|
|
|
self.connect_btn.setText("Stop")
|
|
self.connect_btn.clicked.disconnect()
|
|
self.connect_btn.clicked.connect(self.kirdy_handler.end_session)
|
|
|
|
@pyqtSlot(object, object)
|
|
def send_command(self, param, changes):
|
|
for inner_param, change, data in changes:
|
|
if change == 'value':
|
|
""" cmd translation from mutex type parameter """
|
|
if inner_param.opts.get('target_action_pair', None) is not None:
|
|
target, action = inner_param.opts['target_action_pair'][inner_param.opts['limits'].index(data)]
|
|
cmd = getattr(getattr(self.kirdy, target), action)
|
|
param.child(*param.childPath(inner_param)).setOpts(lock=True)
|
|
self.kirdy.task_dispatcher(cmd())
|
|
param.child(*param.childPath(inner_param)).setOpts(lock=False)
|
|
continue
|
|
""" cmd translation from non-mutex type parameter"""
|
|
if inner_param.opts.get("target", None) is not None:
|
|
if inner_param.opts.get("action", None) is not None:
|
|
if inner_param.opts.get("unit", None) is not None:
|
|
_, _, suffix = siParse(str(data)+inner_param.opts["unit"], regex=FLOAT_REGEX)
|
|
data = siEval(str(data)+inner_param.opts["unit"], regex=FLOAT_REGEX, suffix=suffix)
|
|
cmd = getattr(getattr(self.kirdy, inner_param.opts["target"]), inner_param.opts["action"])
|
|
param.child(*param.childPath(inner_param)).setOpts(lock=True)
|
|
self.kirdy.task_dispatcher(cmd(data))
|
|
param.child(*param.childPath(inner_param)).setOpts(lock=False)
|
|
continue
|
|
|
|
async def coro_main():
|
|
args = get_argparser().parse_args()
|
|
if args.log_file or args.log_file is None:
|
|
filename = args.log_file
|
|
if filename is None:
|
|
time = datetime.now()
|
|
filename = f"{time.year}-{time.day}-{time.month}-{time.hour}:{time.minute}:{time.second}.log"
|
|
logging.basicConfig(filename=filename,
|
|
filemode='a',
|
|
format='%(asctime)s,%(msecs)d %(name)s %(levelname)s %(message)s',
|
|
datefmt='%Y/%d/%m %H:%M:%S',
|
|
level=logging.INFO)
|
|
|
|
if args.logLevel:
|
|
logging.basicConfig(level=getattr(logging, args.logLevel))
|
|
|
|
app_quit_event = asyncio.Event()
|
|
|
|
app = QtWidgets.QApplication.instance()
|
|
app.aboutToQuit.connect(app_quit_event.set)
|
|
|
|
main_window = MainWindow(args)
|
|
main_window.show()
|
|
|
|
await app_quit_event.wait()
|
|
|
|
def main():
|
|
qasync.run(coro_main())
|
|
|
|
if __name__ == '__main__':
|
|
main()
|