871 lines
42 KiB
Python
871 lines
42 KiB
Python
from PyQt6 import QtWidgets, QtGui, QtCore
|
|
from PyQt6.QtCore import pyqtSignal, QObject, QSignalBlocker, pyqtSlot
|
|
import pyqtgraph.parametertree.parameterTypes as pTypes
|
|
from pyqtgraph.parametertree import Parameter, ParameterTree, ParameterItem, registerParameterType
|
|
import pyqtgraph as pg
|
|
pg.setConfigOptions(antialias=True)
|
|
from pyqtgraph import mkPen
|
|
from pglive.sources.live_axis_range import LiveAxisRange
|
|
from pglive.sources.data_connector import DataConnector
|
|
from pglive.kwargs import Axis, LeadingLine
|
|
from pglive.sources.live_plot import LiveLinePlot
|
|
from pglive.sources.live_plot_widget import LivePlotWidget
|
|
from pglive.sources.live_axis import LiveAxis
|
|
import sys
|
|
import os
|
|
import argparse
|
|
import logging
|
|
import asyncio
|
|
from driver.kirdy_async import Kirdy, StoppedConnecting
|
|
import qasync
|
|
from qasync import asyncSlot, asyncClose
|
|
from collections import deque
|
|
from datetime import datetime, timezone, timedelta
|
|
from time import time
|
|
from typing import Any, Optional, List
|
|
from ui.ui_conn_settings_form import Ui_Conn_Settings_Form
|
|
from ui.ui_update_network_settings_form import Ui_Update_Network_Settings_Form
|
|
from ui.ui_kirdy_qt import Ui_MainWindow
|
|
from dateutil import tz
|
|
import math
|
|
import socket
|
|
|
|
# Logged by KirdyDataWatcher when polling or report mode fails, right before
# it emits connection_error_sig.
COMMON_ERROR_MSG = "Connection Timeout. Disconnecting."
|
|
|
|
def get_argparser():
    """Build the command-line parser of the Kirdy control panel.

    Returns:
        argparse.ArgumentParser: parser exposing ``--connect`` (``None`` when
        absent, ``True`` when given), the optional positionals ``IP`` and
        ``PORT`` (kept as strings) and ``-l/--log`` (stored as ``logLevel``).
    """
    # The description and help text used to be copy-paste leftovers that
    # talked about "ARTIQ master" and "Thermostat".
    parser = argparse.ArgumentParser(description="Kirdy control panel")

    parser.add_argument("--connect", default=None, action="store_true",
                        help="Automatically connect to the Kirdy at the given ip and port")
    parser.add_argument('IP', metavar="ip", default=None, nargs='?')
    parser.add_argument('PORT', metavar="port", default=None, nargs='?')
    parser.add_argument("-l", "--log", dest="logLevel",
                        choices=['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'],
                        help="Set the logging level")

    return parser
|
|
|
|
class KirdyDataWatcher(QObject):
    """
    This class provides various signals for Mainwindow to update various kinds of GUI objects

    Signals:
        connection_error_sig: emitted after a polling round or report mode fails.
        setting_update_sig (dict): carries the settings summary read from the device.
        report_update_sig (dict): carries a status report, either polled by
            signal_emitter() or streamed by report_mode().
    """
    connection_error_sig = pyqtSignal()
    setting_update_sig = pyqtSignal(dict)
    report_update_sig = pyqtSignal(dict)

    def __init__(self, parent, kirdy, update_s):
        """
        Args:
            parent: handed to QObject as the Qt parent.
            kirdy: driver object used for every device request.
            update_s: seconds slept between two checks of the polling task.
        """
        super().__init__(parent)
        self._update_s = update_s
        self._kirdy = kirdy
        self._watch_task = None
        self._report_mode_task = None
        # While report mode streams status reports, polling must not request them too.
        self._poll_for_report = True

    async def signal_emitter(self):
        """Fetch the settings summary (and a status report unless report mode is on) and emit them."""
        settings_summary = await self._kirdy.device.get_settings_summary()
        self.setting_update_sig.emit(settings_summary)
        if self._poll_for_report:
            status_report = await self._kirdy.device.get_status_report()
            self.report_update_sig.emit(status_report)

    async def run(self):
        """
        Keep one signal_emitter() task in flight until cancelled or until it fails.

        Cancellation is absorbed on purpose: stop_watching() awaits this task
        right after cancelling it and expects a normal return.
        """
        task = None
        try:
            task = asyncio.create_task(self.signal_emitter())
            while True:
                if task.done():
                    # Re-raises whatever the finished round raised.
                    task.result()
                    task = asyncio.create_task(self.signal_emitter())
                await asyncio.sleep(self._update_s)
        except asyncio.CancelledError:
            if task is not None:
                task.cancel()
        except Exception:
            logging.error(COMMON_ERROR_MSG)
            # Keep the cause available for troubleshooting without changing the error line.
            logging.debug("Polling failed", exc_info=True)
            self._kirdy.stop_report_mode()
            self.connection_error_sig.emit()

    def start_watching(self):
        """Start the background polling task."""
        self._watch_task = asyncio.create_task(self.run())

    async def stop_watching(self):
        """Cancel the polling task, wait for it to finish, then leave report mode."""
        if self._watch_task is not None:
            self._watch_task.cancel()
            await self._watch_task
            self._watch_task = None
            await self.set_report_mode(False)

    async def set_report_mode(self, enabled: bool):
        """
        Switch between polled status reports and streamed report mode.

        Args:
            enabled: True starts the report-mode task (and stops polling for
                reports); False stops it and waits for the task to finish.
        """
        self._poll_for_report = not enabled
        if enabled:
            running = self._report_mode_task
            # A second enable request must not overwrite (and thereby leak)
            # the handle of a stream that is still running.
            if running is None or running.done():
                self._report_mode_task = asyncio.create_task(self.report_mode())
        elif self._report_mode_task is not None:
            self._kirdy.stop_report_mode()
            await self._report_mode_task
            self._report_mode_task = None

    async def report_mode(self):
        """Forward every streamed status report; any failure is reported as a connection error."""
        try:
            async for status_report in self._kirdy.report_mode():
                if status_report["msg_type"] == "Exception":
                    raise TimeoutError("Connection Timeout")
                self.report_update_sig.emit(status_report)
        except Exception:
            logging.error(f"{COMMON_ERROR_MSG}")
            logging.debug("Report mode failed", exc_info=True)
            self.connection_error_sig.emit()

    @pyqtSlot(float)
    def set_update_s(self, update_s):
        """Change the sleep interval used by run(); takes effect on its next iteration."""
        self._update_s = update_s
|
|
|
|
class Graphs:
    """
    Sets up the four live graph widgets and owns the data connectors that feed them.
    """

    def __init__(self, ld_i_set_graph, pd_mon_pwr_graph, tec_i_graph, tec_temp_graph, max_samples=1000):
        """
        Args:
            ld_i_set_graph, pd_mon_pwr_graph, tec_i_graph, tec_temp_graph:
                the plot widgets to configure.
            max_samples: number of points kept per connector.
        """
        self.graphs = [ld_i_set_graph, pd_mon_pwr_graph, tec_i_graph, tec_temp_graph]
        self.connectors = []

        self._pd_mon_pwr_plot = LiveLinePlot(pen=pg.mkPen('r'))
        self._ld_i_set_plot = LiveLinePlot(name="Set", pen=pg.mkPen('r'))
        self._tec_temp_plot = LiveLinePlot(pen=pg.mkPen('r'))
        self._tec_setpoint_plot = LiveLinePlot(pen=pg.mkPen('r'))
        self._tec_i_target_plot = LiveLinePlot(name="Target", pen=pg.mkPen('r'))
        self._tec_i_measure_plot = LiveLinePlot(name="Measure", pen=pg.mkPen('g'))

        setpoint_line = tec_temp_graph.getPlotItem().addLine(label='{value} °C', pen=pg.mkPen('g'))
        # Render the temperature setpoint line on top of the temperature being plotted
        setpoint_line.setZValue(10)
        setpoint_line.setVisible(False)
        self._temp_setpoint_line = setpoint_line

        def tickStrings(values: List, scale: float, spacing: float) -> List:
            labels = []
            for value in values:
                moment = datetime.fromtimestamp(value/1000, tz=timezone.utc)
                labels.append(moment.strftime("%H:%M:%S"))
            return labels

        def attach_left_axis(graph, text, units):
            # Give `graph` a labelled left axis.
            axis = LiveAxis('left', text=text, units=units)
            axis.showLabel()
            graph.setAxisItems({'left': axis})

        for graph in self.graphs:
            time_axis = LiveAxis('bottom', text="Time since Kirdy Reset (Hr:Min:Sec)", tick_angle=-45, units="")
            # Display the relative ts in custom %H:%M:%S format without local timezone
            time_axis.tickStrings = tickStrings
            # Prevent a scaling prefix from being added to the back of the axis label
            time_axis.autoSIPrefix = False
            time_axis.showLabel()

            graph.setAxisItems({'bottom': time_axis})
            graph.add_crosshair(pg.mkPen(color='red', width=1), {'color': 'green'})

            #TODO: x_range should not be updated on every tick
            x_range = LiveAxisRange(roll_on_tick=17, offset_left=4900)
            graph.x_range_controller = x_range
            x_range.crop_left_offset_to_data = True
            # Enable linking of axes in the graph widget's context menu
            graph.register(graph.getPlotItem().titleLabel.text) # Slight hack getting the title

        self.max_samples = max_samples

        # Laser diode current graph
        attach_left_axis(ld_i_set_graph, "Current", "A")
        ld_i_set_graph.addItem(self._ld_i_set_plot)
        self.ld_i_set_connector = DataConnector(self._ld_i_set_plot, max_points=self.max_samples)
        self.connectors.append(self.ld_i_set_connector)

        # Photodiode power graph
        attach_left_axis(pd_mon_pwr_graph, "Power", "W")
        pd_mon_pwr_graph.addItem(self._pd_mon_pwr_plot)
        self.pd_mon_pwr_connector = DataConnector(self._pd_mon_pwr_plot, max_points=self.max_samples)
        self.connectors.append(self.pd_mon_pwr_connector)

        # TEC temperature graph; the setpoint connector keeps a single point only
        attach_left_axis(tec_temp_graph, "Temperature", "°C")
        tec_temp_graph.addItem(self._tec_setpoint_plot)
        tec_temp_graph.addItem(self._tec_temp_plot)
        self.tec_setpoint_connector = DataConnector(self._tec_setpoint_plot, max_points=1)
        self.tec_temp_connector = DataConnector(self._tec_temp_plot, max_points=self.max_samples)
        self.connectors.extend([self.tec_temp_connector, self.tec_setpoint_connector])

        # TEC current graph with a fixed y range and a legend for its two traces
        attach_left_axis(tec_i_graph, "Current", "A")
        tec_i_graph.addLegend(brush=(50, 50, 200, 150))
        tec_i_graph.y_range_controller = LiveAxisRange(fixed_range=[-1.0, 1.0])
        tec_i_graph.addItem(self._tec_i_target_plot)
        tec_i_graph.addItem(self._tec_i_measure_plot)
        self.tec_i_target_connector = DataConnector(self._tec_i_target_plot, max_points=self.max_samples)
        self.tec_i_measure_connector = DataConnector(self._tec_i_measure_plot, max_points=self.max_samples)
        self.connectors.extend([self.tec_i_target_connector, self.tec_i_measure_connector])

    def set_max_samples(self, max_samples):
        """Apply a new point limit to every connector; their stored points are replaced by empty buffers."""
        self.max_samples = max_samples
        for conn in self.connectors:
            with conn.data_lock:
                conn.max_points = self.max_samples
                conn.x = deque(maxlen=int(conn.max_points))
                conn.y = deque(maxlen=int(conn.max_points))

    def plot_append(self, report):
        """Append one status report to the plots; a malformed report is logged and dropped."""
        try:
            laser = report['laser']
            ld_i_set = laser['ld_i_set']
            pd_pwr = laser['pd_pwr']

            thermostat = report['thermostat']
            tec_i_set = thermostat['i_set']
            tec_i_measure = thermostat['tec_i']
            tec_temp = thermostat['temperature']

            ts = report['ts']

            self.ld_i_set_connector.cb_append_data_point(ld_i_set, ts)
            self.pd_mon_pwr_connector.cb_append_data_point(pd_pwr, ts)

            if tec_temp is not None:
                self.tec_temp_connector.cb_append_data_point(tec_temp, ts)
                # With the setpoint line hidden, the setpoint trace follows the measured temperature.
                if self._temp_setpoint_line.isVisible():
                    setpoint_value = self._temp_setpoint_line.value()
                else:
                    setpoint_value = tec_temp
                self.tec_setpoint_connector.cb_append_data_point(setpoint_value, ts)
            if tec_i_measure is not None:
                self.tec_i_measure_connector.cb_append_data_point(tec_i_measure, ts)
            self.tec_i_target_connector.cb_append_data_point(tec_i_set, ts)
        except Exception:
            logging.error(f"Graph Value cannot be updated. Data:{report}", exc_info=True)

    def clear_data_pts(self):
        """Clear every connector and resume it."""
        for conn in self.connectors:
            conn.clear()
            conn.resume()

    def set_temp_setpoint_line(self, temp=None, visible=None):
        """Update the visibility and/or the value of the temperature setpoint line."""
        line = self._temp_setpoint_line
        if visible is not None:
            line.setVisible(visible)
        if temp is None:
            return
        line.setValue(temp)

        # PyQtGraph normally does not update this text when the line
        # is not visible, so make sure that the temperature label
        # gets updated always, and doesn't stay at an old value.
        line.label.setText(f"{temp} °C", color='g')
|
|
|
|
class MutexParameter(pTypes.ListParameter):
    """
    List parameter that shows exactly one of its children at a time.

    The selected list entry decides which child is visible: the n-th entry of
    'limits' corresponds to the n-th child.
    """
    def __init__(self, **opts):
        super().__init__(**opts)

        self.sigValueChanged.connect(self.show_chosen_child)
        # Emit once so the initial selection is applied to the children.
        self.sigValueChanged.emit(self, self.opts['value'])

    def _get_param_from_value(self, value):
        """Return the child whose position matches `value`'s position in 'limits'."""
        limits = self.opts['limits']
        choices = list(limits.values()) if isinstance(limits, dict) else limits
        return self.children()[choices.index(value)]

    @pyqtSlot(object, object)
    def show_chosen_child(self, value):
        """Hide all children, then show the one matching the current selection.

        `value` is the object whose ``.value()`` yields the selected entry.
        """
        for child in self.children():
            child.hide()

        chosen = self._get_param_from_value(value.value())
        chosen.show()

        # Children flagged 'triggerOnShow' re-announce their value when revealed.
        if chosen.opts.get('triggerOnShow', None):
            chosen.sigValueChanged.emit(chosen, chosen.value())
|
|
# Make MutexParameter selectable in parameter definitions via 'type': 'mutex'.
registerParameterType('mutex', MutexParameter)
|
|
|
|
class UpdateNetSettingsForm(QtWidgets.QDialog, Ui_Update_Network_Settings_Form):
    """Dialog for entering an IP address, gateway, prefix length and port."""

    def __init__(self):
        super().__init__()
        self.setupUi(self)

    def get_net_settings(self):
        """
        Read and validate the values typed into the form.

        Returns:
            dict with the keys "ip_addr", "gateway_addr" (dotted strings),
            "prefix_len" and "port" (ints), or None when any field is invalid.
        """
        try:
            ip_addr = ".".join(
                field.text()
                for field in (self.addr_in_0, self.addr_in_1, self.addr_in_2, self.addr_in_3)
            )
            gateway_addr = ".".join(
                field.text()
                for field in (self.gateway_in_0, self.gateway_in_1, self.gateway_in_2, self.gateway_in_3)
            )
            # inet_aton raises OSError for text that is not an IPv4 address.
            socket.inet_aton(ip_addr)
            socket.inet_aton(gateway_addr)
            prefix_len = int(self.prefix_len_in.text())
            port = int(self.port_in.text())
        except (OSError, ValueError):
            return None

        # Numbers that parse but cannot be an IPv4 prefix length or a TCP port
        # are rejected the same way as malformed input.
        if not 0 <= prefix_len <= 32 or not 1 <= port <= 65535:
            return None

        return {
            "ip_addr": ip_addr,
            "gateway_addr": gateway_addr,
            "prefix_len": prefix_len,
            "port": port,
        }
|
|
|
|
|
|
class ConnSettingsForm(QtWidgets.QDialog, Ui_Conn_Settings_Form):
    """Dialog for entering the IP address and port to connect to."""

    def __init__(self):
        super().__init__()
        self.setupUi(self)

    def get_net_settings(self):
        """
        Read and validate the values typed into the form.

        Returns:
            dict with the keys "ip_addr" (dotted string) and "port" (int),
            or None when any field is invalid.
        """
        try:
            ip_addr = ".".join(
                field.text()
                for field in (self.addr_in_0, self.addr_in_1, self.addr_in_2, self.addr_in_3)
            )
            # inet_aton raises OSError for text that is not an IPv4 address.
            socket.inet_aton(ip_addr)
            port = int(self.port_in.text())
        except (OSError, ValueError):
            return None

        # A number outside the TCP port range is as unusable as malformed input.
        if not 1 <= port <= 65535:
            return None

        return {
            "ip_addr": ip_addr,
            "port": port
        }
|
|
|
|
class MainWindow(QtWidgets.QMainWindow, Ui_MainWindow):
|
|
"""Main window of the Kirdy control panel."""

# The maximum number of sample points to store.
# (This sentence used to sit in docstring position and was therefore taken
# as the class docstring instead of describing the constant.)
DEFAULT_MAX_SAMPLES = 1000
# Initial values of self.ip_addr / self.port, shown in the connection dialog.
DEFAULT_IP_ADDR = '192.168.1.128'
DEFAULT_PORT = 1337
|
|
|
|
# Read-only colour indicators shown in the laser diode status tree.
LASER_DIODE_STATUS = [
    dict(name=label, type='color', value='w', readonly=True)
    for label in ('Power', 'Alarm')
]
|
|
|
|
# Parameter-tree definition of the laser diode panel. Entries carrying
# 'target' and 'action' name the driver call associated with the parameter.
LASER_DIODE_PARAMETERS = [
    dict(name='Readings', expanded=True, type='group', children=[
        dict(name='LD Current Set', type='float', suffix='A', siPrefix=True, readonly=True),
        dict(name='PD Current', type='float', suffix='A', siPrefix=True, readonly=True),
        dict(name='PD Power', type='float', suffix='W', siPrefix=True, readonly=True),
        dict(name='LF Mod Impedance', type='list', limits=['Is50Ohm', 'Not50Ohm'], readonly=True),
    ]),
    dict(name='Output Config', expanded=True, type='group', children=[
        dict(name='LD Current Set', type='float', value=0, step=1, decimals=6, limits=(0, 1),
             suffix='A', siPrefix=True, lock=False, target='laser', action='set_i'),
        dict(name='LD Current Set Soft Limit', type='float', value=0, step=1, decimals=6, limits=(0, 1),
             suffix='A', siPrefix=True, lock=False, target='laser', action='set_i_soft_limit'),
        dict(name='LD Power Limit', type='float', value=0, step=1, decimals=6, limits=(0, 0.3),
             suffix='W', siPrefix=True, lock=False, target='laser', action='set_ld_pwr_limit'),
        dict(name='LD Terminals Short', type='bool', value=False, lock=False,
             target='laser', action='set_ld_terms_short'),
        dict(name='Default Power On', type='bool', value=False, lock=False,
             target='laser', action='set_default_pwr_on'),
    ]),
    dict(name='Photodiode Monitor Config', expanded=False, type='group', children=[
        dict(name='Responsitivity', type='float', value=0, step=1, decimals=6, limits=(0, 3000),
             suffix='A/W', siPrefix=True, lock=False, target='laser', action='set_pd_mon_responsitivity'),
        dict(name='Dark Current', type='float', value=0, step=1, decimals=6, limits=(0, 3000),
             suffix='A', siPrefix=True, lock=False, target='laser', action='set_pd_mon_dark_current'),
    ]),
]
|
|
|
|
# Read-only colour indicators shown in the thermostat status tree.
THERMOSTAT_STATUS = [
    dict(name=label, type='color', value='w', readonly=True)
    for label in ('Power', 'Alarm')
]
|
|
|
|
# Parameter-tree definition of the thermostat panel. Entries carrying
# 'target' and 'action' name the driver call associated with the parameter;
# the 'mutex' entry maps each list choice to a call via 'target_action_pair'.
THERMOSTAT_PARAMETERS = [
    {'name': 'Readings', 'expanded': True, 'type': 'group', 'children': [
        {'name': 'Temperature', 'type': 'float', 'format': '{value:.4f} °C', 'readonly': True},
        {'name': 'Current through TEC', 'type': 'float', 'suffix': 'A', 'siPrefix': True, 'decimals': 6, 'readonly': True},
    ]},
    {'name': 'Output Config', 'expanded': True, 'type': 'group', 'children': [
        {'name': 'Control Method', 'type': 'mutex', 'limits': ['Constant Current', 'Temperature PID'],
         'target_action_pair': [['thermostat', 'set_constant_current_control_mode'], ['thermostat', 'set_pid_control_mode']], 'children': [
            {'name': 'Set Current', 'type': 'float', 'value': 0, 'step': 0.001, 'limits': (-1, 1), 'triggerOnShow': True,
             'decimals': 6, 'suffix': 'A', 'siPrefix': True, 'lock': False, 'target': 'thermostat', 'action': 'set_tec_i_out'},
            {'name': 'Set Temperature', 'type': 'float', 'value': 25, 'step': 0.1, 'limits': (-273, 300),
             'format': '{value:.4f} °C', 'lock': False, 'target': 'thermostat', 'action': 'set_temperature_setpoint'},
        ]},
        {'name': 'Limits', 'expanded': False, 'type': 'group', 'children': [
            {'name': 'Max Cooling Current', 'type': 'float', 'value': 0, 'step': 0.001, 'decimals': 6, 'limits': (0, 1),
             'suffix': 'A', 'siPrefix': True, 'lock': False, 'target': 'thermostat', 'action': 'set_tec_max_cooling_i'},
            {'name': 'Max Heating Current', 'type': 'float', 'value': 0, 'step': 0.001, 'decimals': 6, 'limits': (0, 1),
             'suffix': 'A', 'siPrefix': True, 'lock': False, 'target': 'thermostat', 'action': 'set_tec_max_heating_i'},
            {'name': 'Max Voltage Difference', 'type': 'float', 'value': 0, 'step': 0.1, 'limits': (0, 5),
             'suffix': 'V', 'siPrefix': True, 'lock': False, 'target': 'thermostat', 'action': 'set_tec_max_v'},
        ]},
        {'name': 'Default Power On', 'type': 'bool', 'value': False, 'lock': False, 'target': 'thermostat', 'action': 'set_default_pwr_on'},
    ]},
    # TODO Temperature ADC Filter Settings
    {'name': 'Temperature Monitor Config', 'expanded': False, 'type': 'group', 'children': [
        {'name': 'Upper Limit', 'type': 'float', 'value': 0, 'step': 1, 'decimals': 6, 'limits': (-273, 300),
         'suffix': '°C', 'lock': False, 'target': 'thermostat', 'action': 'set_temp_mon_upper_limit'},
        # This entry used to name the *upper* limit setter, so editing the
        # lower limit changed the upper limit instead.
        # NOTE(review): confirm the driver exposes set_temp_mon_lower_limit.
        {'name': 'Lower Limit', 'type': 'float', 'value': 0, 'step': 1, 'decimals': 6, 'limits': (-273, 300),
         'suffix': '°C', 'lock': False, 'target': 'thermostat', 'action': 'set_temp_mon_lower_limit'},
    ]},
    {'name': 'Thermistor Settings','expanded': False, 'type': 'group', 'children': [
        {'name': 'T₀', 'type': 'float', 'value': 0, 'step': 1, 'decimals': 6, 'limits': (-273, 300),
         'suffix': '°C', 'lock': False, 'target': 'thermostat', 'action': 'set_sh_t0'},
        {'name': 'R₀', 'type': 'float', 'value': 0, 'step': 1, 'decimals': 6,
         'suffix': 'Ω', 'siPrefix': True, 'lock': False, 'target': 'thermostat', 'action': 'set_sh_r0'},
        {'name': 'B', 'type': 'float', 'value': 3950, 'step': 1, 'suffix': 'K', 'decimals': 4, 'lock': False, 'target': 'thermostat', 'action': 'set_sh_beta'},
    ]},
    {'name': 'PID Config', 'expanded': False, 'type': 'group', 'children': [
        {'name': 'Kp', 'type': 'float', 'step': 0.1, 'suffix': '', 'lock': False, 'target': 'thermostat', 'action': 'set_pid_kp'},
        {'name': 'Ki', 'type': 'float', 'step': 0.1, 'suffix': 'Hz', 'lock': False, 'target': 'thermostat', 'action': 'set_pid_ki'},
        {'name': 'Kd', 'type': 'float', 'step': 0.1, 'suffix': 's', 'lock': False, 'target': 'thermostat', 'action': 'set_pid_kd'},
        {'name': "PID Output Clamping", 'expanded': True, 'type': 'group', 'children': [
            {'name': 'Minimum', 'type': 'float', 'step': 100, 'limits': (-1, 1), 'decimals': 6, 'suffix': 'A', 'lock': False, 'target': 'thermostat', 'action': 'set_pid_output_min'},
            {'name': 'Maximum', 'type': 'float', 'step': 100, 'limits': (-1, 1), 'decimals': 6, 'suffix': 'A', 'lock': False, 'target': 'thermostat', 'action': 'set_pid_output_max'},
        ]},
        # TODO PID AutoTune
    ]},
]
|
|
def __init__(self, args):
    """
    Build the main window: widgets, dialogs, parameter trees, graphs and
    the data watcher, and connect their signals.

    Args:
        args: accepted for the caller's benefit; not read by this method.
    """
    super().__init__()
    self.kirdy = Kirdy()
    self.setupUi(self)

    # Load Global QT Style Sheet Settings
    stylesheet_path = os.path.join(os.path.dirname(__file__), "ui/mainwindow.qss")
    with open(stylesheet_path, "r") as stylesheet:
        self.setStyleSheet(stylesheet.read())

    self.ip_addr = self.DEFAULT_IP_ADDR
    self.port = self.DEFAULT_PORT

    conn_form = ConnSettingsForm()
    conn_form.accepted.connect(self.start_connecting)
    self.conn_settings_form = conn_form

    net_form = UpdateNetSettingsForm()
    net_form.accepted.connect(self.update_net_settings)
    self.update_net_settings_form = net_form

    self.max_samples = self.DEFAULT_MAX_SAMPLES

    self.setup_menu_bar()
    self._set_up_ctrl_btns()
    self._set_up_plot_menu()

    # One root group per tree; 'value' is the position in self.params.
    tree_definitions = (
        ("Laser Diode Status", self.LASER_DIODE_STATUS),
        ("Laser Diode Parameters", self.LASER_DIODE_PARAMETERS),
        ("Thermostat Status", self.THERMOSTAT_STATUS),
        ("Thermostat Parameters", self.THERMOSTAT_PARAMETERS),
    )
    self.params = [
        Parameter.create(name=title, type='group', value=position, children=definition)
        for position, (title, definition) in enumerate(tree_definitions)
    ]
    self._set_param_tree()

    # Titles must be set before Graphs() is created, which reads them back.
    for graph, title in (
        (self.tec_i_graph, "TEC Current"),
        (self.tec_temp_graph, "TEC Temperature"),
        (self.ld_i_set_graph, "LD Current Set"),
        (self.pd_mon_pwr_graph, "PD Mon Power"),
    ):
        graph.setTitle(title)

    self.connect_btn.clicked.connect(self.show_conn_settings_form)
    self.report_box.stateChanged.connect(self.on_report_box_stateChanged)

    watcher = KirdyDataWatcher(self, self.kirdy, self.report_refresh_spin.value())
    self.kirdy_data_watcher = watcher
    watcher.connection_error_sig.connect(self.bail)
    # TODO: Identify the usable range of set_update_s
    self.report_apply_btn.clicked.connect(
        lambda: self.kirdy_data_watcher.set_update_s(self.report_refresh_spin.value())
    )
    for settings_slot in (self.update_ld_ctrl_panel_settings,
                          self.update_thermostat_ctrl_panel_settings):
        watcher.setting_update_sig.connect(settings_slot)
    for readings_slot in (self.update_ld_ctrl_panel_readings,
                          self.update_thermostat_ctrl_panel_readings):
        watcher.report_update_sig.connect(readings_slot)

    self.graphs = Graphs(self.ld_i_set_graph, self.pd_mon_pwr_graph, self.tec_i_graph,
                         self.tec_temp_graph, max_samples=self.max_samples)
    watcher.report_update_sig.connect(self.graphs.plot_append)

    self.loading_spinner.hide()
|
|
|
|
def setup_menu_bar(self):
    """Create the handlers of all menu-bar actions and connect them."""

    def show_info(title, text):
        # Non-blocking information box parented to the main window.
        box = QtWidgets.QMessageBox(self)
        box.setWindowTitle(title)
        box.setText(text)
        box.setIcon(QtWidgets.QMessageBox.Icon.Information)
        box.show()

    @pyqtSlot(bool)
    def on_about_kirdy(_):
        # TODO: Replace the hardware revision placeholder
        QtWidgets.QMessageBox.about(
            self,
            "About Kirdy",
            """
            <h1>Sinara 1550 Kirdy v"major rev"."minor rev"</h1>
            """
        )

    @pyqtSlot(bool)
    def on_about_gui(_):
        # TODO: Replace the hardware revision placeholder
        QtWidgets.QMessageBox.about(
            self,
            "About Kirdy Control Panel",
            """
            <h1>Version: "Version"</h1>
            """
        )

    @asyncSlot(bool)
    async def on_dfu_mode(_):
        await self.kirdy.device.dfu()
        await self._on_connection_changed(False)

    @asyncSlot(bool)
    async def on_hard_reset(_):
        await self._on_connection_changed(False, hard_reset=True)

    @asyncSlot(bool)
    async def on_save(_):
        await self.kirdy.device.save_current_settings_to_flash()
        show_info("Config saved",
                  "Laser diode and thermostat configs have been saved into flash.")

    @asyncSlot(bool)
    async def on_load(_):
        await self.kirdy.device.load_current_settings_from_flash()
        show_info("Config loaded",
                  "Laser Diode and Thermostat configs have been loaded from flash.")

    @asyncSlot(bool)
    async def on_update_net_settings(_):
        form = self.update_net_settings_form
        form.retranslateUi(form)
        form.show()

    self.menu_action_about_kirdy.triggered.connect(on_about_kirdy)
    self.menu_action_about_gui.triggered.connect(on_about_gui)
    self.menu_action_dfu_mode.triggered.connect(on_dfu_mode)
    self.menu_action_hard_reset.triggered.connect(on_hard_reset)
    self.menu_action_save.triggered.connect(on_save)
    self.menu_action_load.triggered.connect(on_load)
    self.menu_action_update_net_settings.triggered.connect(on_update_net_settings)
|
|
|
|
def show_conn_settings_form(self):
    """Pre-fill the connection dialog with the current address and port, then show it."""
    form = self.conn_settings_form
    octet_fields = (form.addr_in_0, form.addr_in_1, form.addr_in_2, form.addr_in_3)
    octets = self.ip_addr.split(".")
    for position, field in enumerate(octet_fields):
        field.setText(octets[position])
    form.port_in.setText(str(self.port))
    form.show()
|
|
|
|
def _set_up_ctrl_btns(self):
    """Connect the laser diode and TEC power / clear-alarm buttons to their driver calls."""

    def bind(button, request):
        # `request` builds the coroutine at click time, so self.kirdy is
        # looked up when the button is pressed, not when it is connected.
        @asyncSlot(bool)
        async def on_clicked(_):
            await request()
        button.clicked.connect(on_clicked)

    bind(self.ld_pwr_on_btn, lambda: self.kirdy.laser.set_power_on(True))
    bind(self.ld_pwr_off_btn, lambda: self.kirdy.laser.set_power_on(False))
    bind(self.ld_clear_alarm_btn, lambda: self.kirdy.laser.clear_alarm())

    bind(self.tec_pwr_on_btn, lambda: self.kirdy.thermostat.set_power_on(True))
    bind(self.tec_pwr_off_btn, lambda: self.kirdy.thermostat.set_power_on(False))
    bind(self.tec_clear_alarm_btn, lambda: self.kirdy.thermostat.clear_alarm())
|
|
|
|
def _set_up_plot_menu(self):
    """Build the "Plot Settings" menu: a clear action and a sample-limit spin box."""
    menu = QtWidgets.QMenu()
    self.plot_menu = menu
    menu.setTitle("Plot Settings")

    clear_action = QtGui.QAction("Clear graphs", menu)
    clear_action.triggered.connect(self.clear_graphs)
    menu.addAction(clear_action)
    # The actions are also stored as attributes on the menu object.
    menu.clear = clear_action

    spinbox = QtWidgets.QSpinBox()
    self.samples_spinbox = spinbox
    spinbox.setRange(2, 100000)
    spinbox.setSuffix(' samples')
    spinbox.setValue(self.max_samples)
    spinbox.valueChanged.connect(self.set_max_samples)

    limit_action = QtWidgets.QWidgetAction(menu)
    limit_action.setDefaultWidget(spinbox)
    menu.addAction(limit_action)
    menu.limit_samples = limit_action

    self.plot_settings.setMenu(menu)
|
|
|
|
def _set_param_tree(self):
    """
    Attach the four parameter groups in self.params to their tree widgets.

    The two editable trees (laser diode and thermostat parameters) also get
    their change signal connected to self.send_command and a setValue
    override that honours the 'lock' option.
    """
    import types  # local import: only needed to bind the override below

    def _setValue(self, value, blockSignal=None):
        """
        Implement 'lock' mechanism for Parameter Type

        Modified from the source
        """
        try:
            if blockSignal is not None:
                self.sigValueChanged.disconnect(blockSignal)
            value = self._interpretValue(value)
            # The original referenced `fn.eq`, but `fn` is not imported in
            # this file; the same function is reachable through `pg`.
            if pg.functions.eq(self.opts["value"], value):
                return value

            # A locked parameter keeps its current value.
            if self.opts.get("lock", False):
                return value
            self.opts["value"] = value
            self.sigValueChanged.emit(
                self, value
            )  # value might change after signal is received by tree item
        finally:
            if blockSignal is not None:
                self.sigValueChanged.connect(blockSignal)

        return self.opts["value"]

    tree_setup = (
        (self.ld_status, self.params[0], False),
        (self.ld_tree, self.params[1], True),
        (self.tec_status, self.params[2], False),
        (self.tec_tree, self.params[3], True),
    )
    for tree, root, editable in tree_setup:
        tree.setHeaderHidden(True)
        tree.setParameters(root, showTop=False)
        if editable:
            root.sigTreeStateChanged.connect(self.send_command)
            # Bind the function to the parameter: assigned unbound (as before),
            # a call would have received the new value in place of `self`.
            # NOTE(review): only the root group is overridden here; child
            # parameters keep the stock setValue, so their 'lock' option is
            # not enforced by this override - confirm whether that is intended.
            root.setValue = types.MethodType(_setValue, root)
|
|
|
|
async def _on_connection_changed(self, result, hard_reset=False):
    """
    Bring the window in line with a new connection state.

    Args:
        result: True after a connection was established, False when the
            connection is being closed.
        hard_reset: when disconnecting, request a hard reset from the
            device before the session is ended.
    """
    connection_dependent = (
        # Control panel
        self.ld_status, self.ld_tree,
        self.ld_pwr_on_btn, self.ld_pwr_off_btn, self.ld_clear_alarm_btn,
        self.tec_status, self.tec_tree,
        self.tec_pwr_on_btn, self.tec_pwr_off_btn, self.tec_clear_alarm_btn,
        # Menu bar
        self.menu_action_about_kirdy, self.menu_action_connect,
        self.menu_action_dfu_mode, self.menu_action_disconnect,
        self.menu_action_hard_reset, self.menu_action_save,
        self.menu_action_load, self.menu_action_update_net_settings,
        # Graphs
        self.ld_i_set_graph, self.pd_mon_pwr_graph,
        self.tec_i_graph, self.tec_temp_graph,
        # Report controls (the refresh spin box used to be toggled twice)
        self.report_refresh_spin, self.report_group,
        self.report_box, self.report_apply_btn,
    )
    for item in connection_dependent:
        item.setEnabled(result)

    # TODO: Use QStateMachine to manage connections
    # The connect button doubles as the disconnect button, so its single
    # click handler is swapped on every state change.
    self.connect_btn.clicked.disconnect()
    if result:
        self.connect_btn.setText("Disconnect")
        self.connect_btn.clicked.connect(self.bail)

        # TODO: self.hw_rev_data = await self.kirdy.hw_rev()
        self._status()
        self.kirdy_data_watcher.start_watching()
        return

    self.connect_btn.setText("Connect")
    self.connect_btn.clicked.connect(self.show_conn_settings_form)

    self.clear_graphs()
    self.report_box.setChecked(False)
    await self.kirdy_data_watcher.stop_watching()
    if hard_reset:
        await self.kirdy.device.hard_reset()
    await self.kirdy.end_session()
    self.status_lbl.setText("Disconnected")
|
|
|
|
def _status(self):
    """Show the address and port of the connected Kirdy in the status label."""
    # TODO: Get rev no from Kirdy and then add revision into the text
    self.status_lbl.setText(f"Connected to Kirdy @ {self.ip_addr}:{self.port}")
|
|
|
|
def clear_graphs(self):
    """Drop all data points currently held by the graphs."""
    graphs = self.graphs
    graphs.clear_data_pts()
|
|
|
|
@asyncSlot(dict)
async def graphs_update(self, report):
    """Slot taking a status report dict and appending it to the graphs."""
    graphs = self.graphs
    graphs.plot_append(report)
|
|
|
|
@asyncSlot(dict)
async def update_ld_ctrl_panel_settings(self, settings):
    """
    Copy the 'laser' part of a settings summary into the laser diode tree.

    Signals of the tree are blocked while the values are written. Any
    failure is logged and otherwise ignored.
    """
    try:
        settings = settings['laser']
        ld_tree = self.params[1]
        with QSignalBlocker(ld_tree):
            output = ld_tree.child('Output Config')
            output.child('LD Current Set').setValue(settings["ld_drive_current"]['value'])
            output.child('LD Current Set Soft Limit').setValue(settings["ld_drive_current_limit"]['value'])
            output.child('LD Power Limit').setValue(settings["ld_pwr_limit"])
            output.child('LD Terminals Short').setValue(settings["ld_terms_short"])
            output.child('Default Power On').setValue(settings["default_pwr_on"])

            pd_monitor = ld_tree.child('Photodiode Monitor Config')
            responsitivity = settings["pd_mon_params"]["responsitivity"]
            # A missing responsitivity is displayed as 0.
            pd_monitor.child('Responsitivity').setValue(
                0 if responsitivity is None else responsitivity
            )
            pd_monitor.child('Dark Current').setValue(settings["pd_mon_params"]["i_dark"])
    except Exception:
        logging.error(f"Params tree cannot be updated. Data:{settings}", exc_info=True)
|
|
|
|
@asyncSlot(dict)
async def update_ld_ctrl_panel_readings(self, report):
    """
    Copy the 'laser' part of a status report into the laser diode status
    indicators and the 'Readings' group.

    Signals of the affected trees are blocked while the values are written.
    Any failure is logged and otherwise ignored.
    """
    try:
        report = report['laser']

        status_tree = self.params[0]
        with QSignalBlocker(status_tree):
            power_colour = 'g' if report['pwr_on'] else 'w'
            status_tree.child('Power').setValue(power_colour)
            alarm_colour = 'r' if report['pwr_excursion'] else 'w'
            status_tree.child('Alarm').setValue(alarm_colour)

        ld_tree = self.params[1]
        with QSignalBlocker(ld_tree):
            readings = ld_tree.child('Readings')
            readings.child('LD Current Set').setValue(report["ld_i_set"])
            readings.child('PD Current').setValue(report["pd_i"])
            pd_pwr = report["pd_pwr"]
            # A missing photodiode power is displayed as 0.
            readings.child('PD Power').setValue(0 if pd_pwr is None else pd_pwr)
            readings.child('LF Mod Impedance').setValue(report["term_status"])
    except Exception:
        logging.error(f"Params tree cannot be updated. Data:{report}", exc_info=True)
|
|
|
|
@asyncSlot(dict)
async def update_thermostat_ctrl_panel_settings(self, settings):
    """Load the thermostat settings into the control panel and the graph.

    Only the ``'thermostat'`` entry of ``settings`` is used. Every value is
    written into the ``self.params[3]`` tree while its Qt signals are
    blocked, then the temperature set-point line of the graphs is moved
    and shown/hidden according to ``pid_engaged``.

    Any exception is logged (with traceback and the data at hand) and is
    not propagated.
    """
    try:
        settings = settings['thermostat']
        tree = self.params[3]
        with QSignalBlocker(tree):
            # --- Output configuration ---
            tree.child('Output Config', 'Control Method').setValue(
                "Temperature PID" if settings["pid_engaged"] else "Constant Current"
            )
            tree.child('Output Config', 'Control Method', 'Set Current').setValue(
                settings["tec_settings"]['i_set']['value']
            )
            tree.child('Output Config', 'Control Method', 'Set Temperature').setValue(
                float(settings["temperature_setpoint"])
            )
            for label, key in (
                ('Max Cooling Current', 'max_i_pos'),
                ('Max Heating Current', 'max_i_neg'),
                ('Max Voltage Difference', 'max_v'),
            ):
                tree.child('Output Config', 'Limits', label).setValue(settings["tec_settings"][key]['value'])
            tree.child('Output Config', 'Default Power On').setValue(settings["default_pwr_on"])

            # TODO: Update the Temperature ADC Settings here as well
            # --- Temperature monitor ---
            for label, key in (('Upper Limit', 'upper_limit'), ('Lower Limit', 'lower_limit')):
                tree.child('Temperature Monitor Config', label).setValue(settings["temp_mon_settings"][key])

            # --- PID gains and output clamping ---
            for label, key in (('Kp', 'kp'), ('Ki', 'ki'), ('Kd', 'kd')):
                tree.child('PID Config', label).setValue(settings["pid_params"][key])
            for label, key in (('Minimum', 'output_min'), ('Maximum', 'output_max')):
                tree.child('PID Config', 'PID Output Clamping', label).setValue(settings["pid_params"][key])

            # --- Thermistor model parameters ---
            for label, key in (('T₀', 't0'), ('R₀', 'r0'), ('B', 'b')):
                tree.child('Thermistor Settings', label).setValue(settings["thermistor_params"][key])

            # Keep the plotted set-point line in sync with the settings.
            self.graphs.set_temp_setpoint_line(temp=round(settings["temperature_setpoint"], 6))
            self.graphs.set_temp_setpoint_line(visible=settings['pid_engaged'])
    except Exception:
        logging.error(f"Params tree cannot be updated. Data:{settings}", exc_info=True)
|
|
|
|
@asyncSlot(dict)
async def update_thermostat_ctrl_panel_readings(self, report):
    """Refresh the thermostat indicators and readings from a status report.

    Only the ``'thermostat'`` entry of ``report`` is used. The status
    indicators live in ``self.params[2]`` and the ``'Readings'`` group in
    ``self.params[3]``; Qt signals of each tree are blocked while it is
    being written.

    Any exception is logged (with traceback and the data at hand) and is
    not propagated.
    """
    try:
        report = report['thermostat']

        status_tree = self.params[2]
        with QSignalBlocker(status_tree):
            status_tree.child('Power').setValue('g' if report['pwr_on'] else 'w')
            status_tree.child('Alarm').setValue(
                'r' if report['temp_mon_status']['over_temp_alarm'] else 'w'
            )

        ctrl_tree = self.params[3]
        with QSignalBlocker(ctrl_tree):
            for label, key in (('Temperature', "temperature"), ('Current through TEC', "tec_i")):
                ctrl_tree.child('Readings', label).setValue(report[key])
    except Exception:
        logging.error(f"Params tree cannot be updated. Data:{report}", exc_info=True)
|
|
|
|
@pyqtSlot(int)
def set_max_samples(self, samples: int) -> None:
    """Forward the requested sample count to ``self.graphs``.

    Args:
        samples: Value passed unchanged to ``self.graphs.set_max_samples``.
    """
    graphs = self.graphs
    graphs.set_max_samples(samples)
|
|
|
|
@asyncSlot(int)
async def on_report_box_stateChanged(self, enabled):
    """Pass the report check-box state on to the data watcher.

    Args:
        enabled: Integer state delivered by the signal; handed unchanged
            to ``self.kirdy_data_watcher.set_report_mode``.
    """
    watcher = self.kirdy_data_watcher
    await watcher.set_report_mode(enabled)
|
|
|
|
@asyncSlot()
async def update_net_settings(self):
    """Send the network settings entered in the update form to the device.

    If the form yields ``None`` the status label reports invalid input and
    nothing is sent. Otherwise the address, port, prefix length and gateway
    are handed to ``self.kirdy.device.set_ip_settings`` and the status label
    is updated afterwards.
    """
    form_values = self.update_net_settings_form.get_net_settings()
    if form_values is None:
        self.status_lbl.setText("Invalid IP Settings Input")
        return

    # Read the fields in the positional order expected by set_ip_settings.
    ip_args = [
        form_values[key]
        for key in ("ip_addr", "port", "prefix_len", "gateway_addr")
    ]
    await self.kirdy.device.set_ip_settings(*ip_args)
    self.status_lbl.setText("IP Settings is Updated")
|
|
|
|
@asyncSlot()
async def start_connecting(self):
    """Connect to the address from the connection form, or bail out.

    The chosen address and port are stored on ``self.ip_addr`` /
    ``self.port``. When no connection is in progress or established, a new
    session is started and the connection-changed handler is invoked with
    ``True``; otherwise the current connection is torn down via
    :meth:`bail`. Socket-level failures are logged, followed by a bail-out
    and a status-label message.
    """
    form_values = self.conn_settings_form.get_net_settings()
    if form_values is None:
        self.status_lbl.setText("Invalid IP Settings Input")
        return

    self.ip_addr = form_values["ip_addr"]
    self.port = form_values["port"]
    host, port = self.ip_addr, self.port

    try:
        if self.kirdy.connecting() or self.kirdy.connected():
            # Already connecting/connected: tear the connection down.
            await self.bail()
        else:
            self.status_lbl.setText("Connecting...")
            await self.kirdy.start_session(host=host, port=port, timeout=0.1)
            await self._on_connection_changed(True)
    except (OSError, TimeoutError, ConnectionResetError) as err:
        logging.error(f"Failed communicating to {host}:{port}: {err}")
        await self.bail()
        self.status_lbl.setText(f"Cannot connect to Kirdy@ {host}:{port}")
|
|
|
|
@asyncSlot()
async def bail(self):
    """Tear down the current connection.

    Invokes the connection-changed handler with ``False`` first, then ends
    the Kirdy session. The order of the two awaits is kept as-is.
    """
    await self._on_connection_changed(False)
    await self.kirdy.end_session()
|
|
|
|
@asyncSlot(object, object)
async def send_command(self, param, changes):
    """Translate parameter-tree value changes into Kirdy driver calls.

    Args:
        param: Parameter the change notification belongs to; used to look
            up the changed child by its path.
        changes: Iterable of ``(inner_param, change, data)`` tuples.

    Only entries whose ``change`` is ``'value'`` are acted upon:

    * If the changed parameter has a ``target_action_pair`` option, the
      pair at the index of ``data`` within its ``limits`` option selects
      the ``(target, action)`` to call without arguments. The parameter is
      flagged ``lock=True`` for the duration of the call.
    * Otherwise, if both ``target`` and ``action`` options are set, that
      action is called with ``data``.
    """
    for inner_param, change, data in changes:
        if change != 'value':
            continue

        opts = inner_param.opts

        # Command translation for mutex-type parameters.
        action_pairs = opts.get('target_action_pair', None)
        if action_pairs is not None:
            target, action = action_pairs[opts['limits'].index(data)]
            cmd = getattr(getattr(self.kirdy, target), action)
            locked_param = param.child(*param.childPath(inner_param))
            locked_param.setOpts(lock=True)
            try:
                await cmd()
            finally:
                # Release the lock even when the command fails; otherwise
                # the parameter would stay locked after an error.
                locked_param.setOpts(lock=False)
            continue

        # Command translation for non-mutex parameters.
        target = opts.get("target", None)
        action = opts.get("action", None)
        if target is not None and action is not None:
            cmd = getattr(getattr(self.kirdy, target), action)
            await cmd(data)
|
|
|
|
async def coro_main():
    """Application coroutine: set up logging, show the window, wait for quit.

    Parses the command line, configures the logging level when one was
    given, creates and shows the main window, and then blocks until the
    running ``QApplication`` emits ``aboutToQuit``.
    """
    cli_args = get_argparser().parse_args()
    log_level = cli_args.logLevel
    if log_level:
        logging.basicConfig(level=getattr(logging, log_level))

    # Bridge Qt's aboutToQuit signal into an asyncio event we can await.
    quit_requested = asyncio.Event()
    qt_app = QtWidgets.QApplication.instance()
    qt_app.aboutToQuit.connect(quit_requested.set)

    window = MainWindow(cli_args)
    window.show()

    await quit_requested.wait()
|
|
|
|
def main():
    """Entry point: run :func:`coro_main` through ``qasync.run``."""
    app_coroutine = coro_main()
    qasync.run(app_coroutine)
|
|
|
|
|
|
# Start the GUI only when this file is executed as a script.
if __name__ == '__main__':
    main()
|