forked from M-Labs/artiq
1
0
Fork 0
artiq/artiq/dashboard/moninj.py

1045 lines
38 KiB
Python

import asyncio
import logging
import textwrap
import os
from collections import namedtuple
from PyQt5 import QtCore, QtWidgets, QtGui
import pyqtgraph as pg
from sipyco import pyon
from artiq.coredevice.comm_moninj import CommMonInj, TTLOverride, TTLProbe
from artiq.coredevice.ad9912_reg import AD9912_SER_CONF
from artiq.gui.tools import LayoutWidget, get_open_file_name, get_save_file_name
from artiq.gui.models import DictSyncTreeSepModel
from artiq.tools import exc_to_warning
logger = logging.getLogger(__name__)
class _CancellableLineEdit(QtWidgets.QLineEdit):
    """Line edit that reports Escape key presses to an optional callback."""

    # Callback invoked with the key event when Escape is pressed. It stays
    # None until escapePressedConnect() has been called.
    esc_cb = None

    def escapePressedConnect(self, cb):
        """Register ``cb``; it is called with the key event on Escape."""
        self.esc_cb = cb

    def keyPressEvent(self, event):
        """Notify the Escape callback (if any), then run default handling."""
        # Guard against a missing callback: previously an Escape press on an
        # instance without a registered callback raised AttributeError.
        if event.key() == QtCore.Qt.Key_Escape and self.esc_cb is not None:
            self.esc_cb(event)
        QtWidgets.QLineEdit.keyPressEvent(self, event)
class _TTLWidget(QtWidgets.QFrame):
    """Framed single-row display and control for one TTL channel.

    The row holds, left to right: the channel title, a checkable "OVR"
    button, a checkable "LVL" button, a direction label and a value label.
    Button clicks are re-emitted through ``override_toggled`` and
    ``level_toggled``.
    """
    override_toggled = QtCore.pyqtSignal(bool)
    level_toggled = QtCore.pyqtSignal(bool)

    def __init__(self, title):
        """Build the row; ``title`` is both the label text and ``uid``."""
        QtWidgets.QFrame.__init__(self)

        self.setFrameShape(QtWidgets.QFrame.Box)
        self.setFrameShadow(QtWidgets.QFrame.Raised)
        self.uid = title

        grid = QtWidgets.QGridLayout()
        grid.setContentsMargins(2, 2, 2, 2)
        grid.setHorizontalSpacing(2)
        grid.setVerticalSpacing(0)
        self.setLayout(grid)

        label = QtWidgets.QLabel(title)
        grid.addWidget(label, 0, 0)

        self.override = QtWidgets.QToolButton()
        self.override.setText("OVR")
        self.override.setCheckable(True)
        self.override.setToolTip("Override")
        grid.addWidget(self.override, 0, 1)

        self.level = QtWidgets.QToolButton()
        self.level.setText("LVL")
        self.level.setCheckable(True)
        self.level.setToolTip("Level")
        grid.addWidget(self.level, 0, 2)

        self.direction = QtWidgets.QLabel()
        self.direction.setAlignment(QtCore.Qt.AlignCenter)
        grid.addWidget(self.direction, 0, 3)

        self.value = QtWidgets.QLabel()
        self.value.setAlignment(QtCore.Qt.AlignRight)
        grid.addWidget(self.value, 0, 4)

        # All five columns share the available width equally.
        for column in range(5):
            grid.setColumnStretch(column, 1)

        self.override.clicked.connect(self.override_toggled)
        self.level.clicked.connect(self.level_toggled)

    def setValueText(self, override, level):
        """Show the level as "1"/"0"; bold and red while overridden."""
        value_s = "1" if level else "0"
        if override:
            value_s = "<b>" + value_s + "</b>"
            color = " color=\"red\""
        else:
            color = ""
        self.value.setText("<font size=\"5\"{}>{}</font>".format(
            color, value_s))

    def setDirectionText(self, oe):
        """Show "OUT" when ``oe`` is true, "IN" otherwise."""
        direction = "OUT" if oe else "IN"
        self.direction.setText("<font size=\"2\">" + direction + "</font>")

    def setButtonsState(self, override, level):
        """Sync the checked state of the OVR/LVL buttons.

        The LVL button is only updated while the override is active.
        """
        self.override.setChecked(override)
        if override:
            # This widget has no stacked page to switch to: the former
            # ``self.stack.setCurrentIndex(1)`` call referenced an attribute
            # that is never created here and raised AttributeError.
            self.level.setChecked(level)
class _TTLHandler:
def __init__(self, dm, channel, force_out, title):
self.channel = channel
self.force_out = force_out
self.set_mode = dm.ttl_set_mode
self.title = title
self.uid = title
self.cur_level = False
self.cur_oe = False
self.cur_override = False
self.cur_override_level = False
self.widget = None
def override_toggled(self, override):
if override:
if self.widget.level.isChecked():
self.set_mode(self.channel, "1")
else:
self.set_mode(self.channel, "0")
else:
self.set_mode(self.channel, "exp")
def level_toggled(self, level):
if self.widget.override.isChecked():
if level:
self.set_mode(self.channel, "1")
else:
self.set_mode(self.channel, "0")
def refresh_display(self):
if self.widget is not None:
level = self.cur_override_level if self.cur_override else self.cur_level
oe = self.cur_oe or self.force_out
self.widget.setValueText(self.cur_override, level)
self.widget.setDirectionText(oe)
with QtCore.QSignalBlocker(self.widget):
self.widget.setButtonsState(self.cur_override, self.cur_level)
def delete_widget(self):
self.widget.deleteLater()
self.widget = None
def create_widget(self):
self.widget = _TTLWidget(self.title)
self.widget.override_toggled.connect(self.override_toggled)
self.widget.level_toggled.connect(self.level_toggled)
self.refresh_display()
def to_model_path(self):
return "ttl/{}".format(self.title)
class _DDSWidget(QtWidgets.QFrame):
    """Framed row showing one DDS channel's frequency with an inline editor.

    A stacked widget switches between page 0 (Set/Off buttons and the value
    label) and page 1 (Apply/Cancel buttons and a line edit). User actions
    are reported through the ``set_clicked``, ``off_clicked`` and
    ``apply_changes`` signals.
    """
    apply_changes = QtCore.pyqtSignal()
    off_clicked = QtCore.pyqtSignal()
    set_clicked = QtCore.pyqtSignal()

    def __init__(self, title, is_urukul):
        """Build the row; ``is_urukul`` adds the "Off" button."""
        QtWidgets.QFrame.__init__(self)
        self.uid = title

        self.setFrameShape(QtWidgets.QFrame.Box)
        self.setFrameShadow(QtWidgets.QFrame.Raised)

        outer = QtWidgets.QGridLayout()
        outer.setContentsMargins(2, 2, 2, 2)
        outer.setHorizontalSpacing(2)
        self.setLayout(outer)

        name_label = QtWidgets.QLabel(title)
        name_label.setAlignment(QtCore.Qt.AlignCenter)
        outer.addWidget(name_label, 0, 0)

        right_vcenter = QtCore.Qt.AlignRight | QtCore.Qt.AlignVCenter
        self.stack = QtWidgets.QStackedWidget()

        # page 1: display data
        display_page = LayoutWidget()
        display_page.layout.setContentsMargins(0, 0, 0, 0)
        display_page.layout.setHorizontalSpacing(0)

        set_btn = QtWidgets.QToolButton()
        set_btn.setText("Set")
        set_btn.setToolTip("Set frequency")

        # for urukuls also allow switching off RF
        if is_urukul:
            display_page.addWidget(set_btn, 0, 0, 1, 1)

            off_btn = QtWidgets.QToolButton()
            off_btn.setText("Off")
            off_btn.setToolTip("Switch off the output")
            display_page.addWidget(off_btn, 0, 1, 1, 1)
        else:
            display_page.addWidget(set_btn, 0, 0, 1, 2)

        self.value_label = QtWidgets.QLabel()
        self.value_label.setAlignment(right_vcenter)
        display_page.addWidget(self.value_label, 0, 2, 1, 4)
        self.stack.addWidget(display_page)

        # page 2: edit data
        edit_page = LayoutWidget()

        apply_btn = QtWidgets.QToolButton()
        apply_btn.setText("Apply")
        apply_btn.setToolTip("Apply changes")
        edit_page.addWidget(apply_btn, 0, 0, 1, 1)

        cancel_btn = QtWidgets.QToolButton()
        cancel_btn.setText("Cancel")
        cancel_btn.setToolTip("Cancel changes")
        edit_page.addWidget(cancel_btn, 0, 1, 1, 1)

        edit_page.layout.setContentsMargins(0, 0, 0, 0)
        edit_page.layout.setHorizontalSpacing(0)
        edit_page.layout.setVerticalSpacing(0)

        self.value_edit = _CancellableLineEdit(self)
        self.value_edit.setAlignment(QtCore.Qt.AlignRight)
        edit_page.addWidget(self.value_edit, 0, 2, 1, 4)
        self.stack.addWidget(edit_page)

        outer.addWidget(self.stack, 0, 1, 1, 6)

        unit_label = QtWidgets.QLabel("MHz")
        unit_label.setAlignment(right_vcenter)
        outer.addWidget(unit_label, 0, 7)

        # Signal wiring.
        set_btn.clicked.connect(self.set_clicked)
        apply_btn.clicked.connect(self.apply_changes)
        if is_urukul:
            off_btn.clicked.connect(self.off_clicked)
            # Replaces the short tooltip set when the button was created.
            off_btn.setToolTip(textwrap.dedent(
                "Note: If TTL RTIO sw for the channel is switched high,\n"
                "this button will not disable the channel.\n"
                "Use the TTL override instead."))
        self.value_edit.returnPressed.connect(self.apply_changes)

        def cancel_changes(_unused):
            # Used for both the Escape callback (receives the key event) and
            # the Cancel button (receives the clicked flag).
            self.set_page(0)

        self.value_edit.escapePressedConnect(cancel_changes)
        cancel_btn.clicked.connect(cancel_changes)

    def set_page(self, page):
        """Show stack page ``page`` (0 = display, 1 = edit)."""
        self.stack.setCurrentIndex(page)

    def get_value(self):
        """Return the edit field's text converted to ``float``."""
        text = self.value_edit.text()
        return float(text)

    def set_edit_value(self, value):
        """Put ``value`` into the edit field with seven decimals."""
        formatted = "{:.7f}".format(value)
        self.value_edit.setText(formatted)

    def set_value(self, value):
        """Update both the display label and the edit field."""
        self.value_label.setText(
            "<font size=\"4\">{:.7f}</font>".format(value))
        self.set_edit_value(value)

    def start_edit(self):
        """Focus the edit field and select its whole content."""
        editor = self.value_edit
        editor.setFocus()
        editor.selectAll()
class _DDSHandler:
def __init__(self, dm, title, bus_channel, channel, dds_type,
ref_clk, cpld=None, pll=1, clk_div=0):
self.dm = dm
self.title = title
self.uid = title
self.bus_channel = bus_channel
self.channel = channel
self.cur_frequency = 0
self.dds_name = title
self.cpld = cpld
self.cur_frequency = 0
self.cur_reg = 0
self.dds_type = dds_type
self.is_urukul = dds_type in ["AD9910", "AD9912"]
if dds_type == "AD9914":
self.ftw_per_hz = 2**32 / ref_clk
else:
if dds_type == "AD9910":
max_freq = 1 << 32
clk_mult = [4, 1, 2, 4]
elif dds_type == "AD9912": # AD9912
max_freq = 1 << 48
clk_mult = [1, 1, 2, 4]
else:
raise NotImplementedError
sysclk = ref_clk / clk_mult[clk_div] * pll
self.ftw_per_hz = 1 / sysclk * max_freq
self.widget = None
def create_widget(self):
self.widget = _DDSWidget(self.title, self.is_urukul)
self.widget.apply_changes.connect(self.apply_changes)
self.widget.off_clicked.connect(self.off_clicked)
self.widget.set_clicked.connect(self.set_clicked)
self.refresh_display()
def monitor_update(self, probe, value):
if self.dds_type == "AD9912":
value = value << 16
self.cur_frequency = self._ftw_to_freq(value)
def _ftw_to_freq(self, ftw):
return ftw / self.ftw_per_hz
def delete_widget(self):
self.widget.deleteLater()
self.widget = None
def refresh_display(self):
if self.widget is not None:
self.widget.set_value(self.cur_frequency / 1e6)
def apply_changes(self):
self.widget.set_page(0)
frequency = self.widget.get_value() * 1e6
self.dm.dds_set_frequency(self.dds_name, frequency)
def set_clicked(self):
self.widget.set_page(1)
self.widget.set_edit_value(self.cur_frequency / 1e6)
self.widget.start_edit()
def off_clicked(self):
self.dm.dds_channel_toggle(self.dds_name, sw=False)
def to_model_path(self):
return "dds/{}".format(self.title)
class _DACWidget(QtWidgets.QFrame):
    """Framed row showing one DAC channel's value as a percentage."""

    def __init__(self, channel, title):
        """Build the row; ``uid`` is the ``(title, channel)`` pair."""
        QtWidgets.QFrame.__init__(self)
        self.uid = (title, channel)

        self.setFrameShape(QtWidgets.QFrame.Box)
        self.setFrameShadow(QtWidgets.QFrame.Raised)

        layout = QtWidgets.QGridLayout()
        layout.setContentsMargins(2, 2, 2, 2)
        layout.setHorizontalSpacing(2)
        self.setLayout(layout)

        caption = QtWidgets.QLabel("{} ch{}".format(title, channel))
        caption.setAlignment(QtCore.Qt.AlignLeft)
        layout.addWidget(caption, 0, 1)

        self.value = QtWidgets.QLabel()
        self.value.setAlignment(QtCore.Qt.AlignRight)
        layout.addWidget(self.value, 0, 2, 1, 1)

    def set_value(self, value):
        """Display ``value`` with three decimals followed by a "%" sign."""
        template = '<font size="4">{:.3f}</font><font size="2"> %</font>'
        self.value.setText(template.format(value))
class _DACHandler:
def __init__(self, dm, spi_channel, channel, title):
self.widget = None
self.cur_value = 0
self.spi_channel = spi_channel
self.channel = channel
self.title = title
self.uid = (title, channel)
def create_widget(self):
self.widget = _DACWidget(self.channel, self.title)
self.refresh_display()
def delete_widget(self):
self.widget.deleteLater()
self.widget = None
def refresh_display(self):
if self.widget is not None:
self.widget.set_value(self.cur_value * 100 / 2**16)
def to_model_path(self):
return "dac/{} ch{}".format(self.title, self.channel)
class Model(DictSyncTreeSepModel):
    """Tree model of the available channels, keyed by "/"-separated paths."""

    def __init__(self, init):
        DictSyncTreeSepModel.__init__(self, "/", ["Channels"], init)

    def clear(self):
        """Remove every item from the tree and empty the backing store."""
        separator = self.separator
        for key in self.backing_store:
            self._del_item(self, key.split(separator))
        self.backing_store.clear()

    def update(self, d):
        """Insert each handler in ``d`` under its ``to_model_path()`` key."""
        # Only the values are used; the keys of ``d`` are ignored.
        for handler in d.values():
            self[handler.to_model_path()] = handler
class _AddChannelDialog(QtWidgets.QDialog):
    """Dialog presenting the channel model in a multi-selection tree.

    On acceptance the chosen entries are available as ``self.channels``.
    """

    def __init__(self, parent, model):
        QtWidgets.QDialog.__init__(self, parent=parent)
        self.setContextMenuPolicy(QtCore.Qt.ActionsContextMenu)
        self.setWindowTitle("Add channels")

        vbox = QtWidgets.QVBoxLayout()
        self.setLayout(vbox)

        self._model = model

        tree_view = QtWidgets.QTreeView()
        tree_view.setHeaderHidden(True)
        tree_view.setSelectionBehavior(
            QtWidgets.QAbstractItemView.SelectItems)
        tree_view.setSelectionMode(
            QtWidgets.QAbstractItemView.ExtendedSelection)
        tree_view.setModel(self._model)
        self._tree_view = tree_view
        vbox.addWidget(tree_view)

        buttons = QtWidgets.QDialogButtonBox(
            QtWidgets.QDialogButtonBox.Ok | QtWidgets.QDialogButtonBox.Cancel
        )
        buttons.setCenterButtons(True)
        buttons.accepted.connect(self.add_channels)
        buttons.rejected.connect(self.reject)
        self._button_box = buttons
        vbox.addWidget(buttons)

    def add_channels(self):
        """Collect the selected entries into ``self.channels`` and accept.

        Selected indexes for which the model returns no key are skipped.
        """
        model = self._model
        keys = (model.index_to_key(index)
                for index in self._tree_view.selectedIndexes())
        self.channels = [model[key].ref for key in keys if key is not None]
        self.accept()
_HandlerDesc = namedtuple("_HandlerDesc", "uid comment cls arguments")
def setup_from_ddb(ddb):
    """Extract the moninj endpoint and channel descriptions from ``ddb``.

    Returns ``(mi_addr, mi_port, description)``: the host and port taken from
    the ``core_moninj`` controller entry (``None`` when absent; the port
    defaults to 1383) and a set of ``_HandlerDesc`` tuples for the TTL, DDS
    and DAC devices found. An entry that raises ``KeyError`` while being
    examined is skipped.
    """
    urukul_kinds = (
        ("artiq.coredevice.ad9910", "AD9910"),
        ("artiq.coredevice.ad9912", "AD9912"),
    )
    dac_kinds = (
        ("artiq.coredevice.ad53xx", "AD53xx"),
        ("artiq.coredevice.zotino", "Zotino"),
    )

    mi_addr = None
    mi_port = None
    description = set()

    for name, entry in ddb.items():
        try:
            # Non-dict entries are not device descriptions; ignore them.
            if not isinstance(entry, dict):
                continue
            comment = entry.get("comment")
            dev_type = entry["type"]

            if dev_type != "local":
                if dev_type == "controller" and name == "core_moninj":
                    mi_addr = entry["host"]
                    mi_port = entry.get("port_proxy", 1383)
                continue

            module = entry["module"]
            if module == "artiq.coredevice.ttl":
                # Entries with "ttl_urukul" in their name are not listed.
                if "ttl_urukul" in name:
                    continue
                channel = entry["arguments"]["channel"]
                force_out = entry["class"] == "TTLOut"
                description.add(_HandlerDesc(
                    name, comment, _TTLHandler, (channel, force_out, name)))
                continue

            kind = (module, entry["class"])
            arguments = entry["arguments"]

            if kind == ("artiq.coredevice.ad9914", "AD9914"):
                bus_channel = arguments["bus_channel"]
                channel = arguments["channel"]
                sysclk = arguments["sysclk"]
                description.add(_HandlerDesc(
                    name, comment, _DDSHandler,
                    (name, bus_channel, channel, entry["class"], sysclk)))
            elif kind in urukul_kinds:
                channel = arguments["chip_select"] - 4
                if channel < 0:
                    continue
                cpld_name = arguments["cpld_device"]
                cpld_args = ddb[cpld_name]["arguments"]
                bus_channel = ddb[cpld_args["spi_device"]]["arguments"]["channel"]
                pll = arguments["pll_n"]
                refclk = cpld_args["refclk"]
                clk_div = arguments.get("clk_div", 0)
                description.add(_HandlerDesc(
                    name, comment, _DDSHandler,
                    (name, bus_channel, channel, entry["class"], refclk,
                     cpld_name, pll, clk_div)))
            elif kind in dac_kinds:
                # Follow string aliases until the SPI device entry is found.
                spi_device = ddb[arguments["spi_device"]]
                while isinstance(spi_device, str):
                    spi_device = ddb[spi_device]
                spi_channel = spi_device["arguments"]["channel"]
                description.update(
                    _HandlerDesc((name, dac_channel), comment, _DACHandler,
                                 (spi_channel, dac_channel, name))
                    for dac_channel in range(32))
        except KeyError:
            pass

    return mi_addr, mi_port, description
class _DeviceManager:
def __init__(self, schedule_ctl):
self.mi_addr = None
self.mi_port = None
self.reconnect_mi = asyncio.Event()
self.mi_connection = None
self.mi_connector_task = asyncio.ensure_future(self.mi_connector())
self.schedule_ctl = schedule_ctl
self.ddb = dict()
self.description = set()
self.handlers_by_uid = dict()
self.dds_sysclk = 0
self.ttl_handlers = dict()
self.dds_handlers = dict()
self.dac_handlers = dict()
self.channels_cb = lambda: None
def init_ddb(self, ddb):
self.ddb = ddb
def notify_ddb(self, mod):
mi_addr, mi_port, description = setup_from_ddb(self.ddb)
if (mi_addr, mi_port) != (self.mi_addr, self.mi_port):
self.mi_addr = mi_addr
self.mi_port = mi_port
self.reconnect_mi.set()
for to_remove in self.description - description:
handler = self.handlers_by_uid[to_remove.uid]
del self.handlers_by_uid[to_remove.uid]
if isinstance(handler, _TTLHandler):
self.setup_ttl_monitoring(False, handler.channel)
handler.delete_widget()
del self.ttl_handlers[handler.channel]
elif isinstance(handler, _DDSHandler):
self.setup_dds_monitoring(False, handler.bus_channel, handler.channel)
handler.delete_widget()
del self.dds_handlers[(handler.bus_channel, handler.channel)]
elif isinstance(handler, _DACHandler):
self.setup_dac_monitoring(False, handler.spi_channel, handler.channel)
handler.delete_widget()
del self.dac_handlers[(handler.spi_channel, handler.channel)]
else:
raise ValueError
for to_add in description - self.description:
handler = to_add.cls(self, *to_add.arguments)
handler.create_widget()
if to_add.comment is not None:
handler.widget.setToolTip(to_add.comment)
self.handlers_by_uid[to_add.uid] = handler
if isinstance(handler, _TTLHandler):
self.ttl_handlers[handler.channel] = handler
self.setup_ttl_monitoring(True, handler.channel)
elif isinstance(handler, _DDSHandler):
self.dds_handlers[(handler.bus_channel, handler.channel)] = handler
self.setup_dds_monitoring(True, handler.bus_channel, handler.channel)
elif isinstance(handler, _DACHandler):
self.dac_handlers[(handler.spi_channel, handler.channel)] = handler
self.setup_dac_monitoring(True, handler.spi_channel, handler.channel)
else:
raise ValueError
if description != self.description:
self.channels_cb()
self.description = description
def ttl_set_mode(self, channel, mode):
if self.mi_connection is not None:
handler = self.ttl_handlers[channel]
if mode == "0":
handler.cur_override = True
handler.cur_level = False
self.mi_connection.inject(channel, TTLOverride.level.value, 0)
self.mi_connection.inject(channel, TTLOverride.oe.value, 1)
self.mi_connection.inject(channel, TTLOverride.en.value, 1)
elif mode == "1":
handler.cur_override = True
handler.cur_level = True
self.mi_connection.inject(channel, TTLOverride.level.value, 1)
self.mi_connection.inject(channel, TTLOverride.oe.value, 1)
self.mi_connection.inject(channel, TTLOverride.en.value, 1)
elif mode == "exp":
handler.cur_override = False
self.mi_connection.inject(channel, TTLOverride.en.value, 0)
else:
raise ValueError
# override state may have changed
handler.refresh_display()
async def _submit_by_content(self, content, class_name, title):
expid = {
"log_level": logging.WARNING,
"content": content,
"class_name": class_name,
"arguments": {}
}
scheduling = {
"pipeline_name": "main",
"priority": 0,
"due_date": None,
"flush": False
}
rid = await self.schedule_ctl.submit(
scheduling["pipeline_name"],
expid,
scheduling["priority"], scheduling["due_date"],
scheduling["flush"])
logger.info("Submitted '%s', RID is %d", title, rid)
def _dds_faux_injection(self, dds_channel, action, title, log_msg):
handler = self.handlers_by_uid[dds_channel]
# create kernel and fill it in and send-by-content
# initialize CPLD (if applicable)
if handler.is_urukul:
# urukuls need CPLD init and switch to on
cpld_dev = """self.setattr_device("core_cache")
self.setattr_device("{}")""".format(handler.cpld)
# `sta`/`rf_sw`` variables are guaranteed for urukuls
# so {action} can use it
# if there's no RF enabled, CPLD may have not been initialized
# but if there is, it has been initialised - no need to do again
cpld_init = """delay(15*ms)
was_init = self.core_cache.get("_{cpld}_init")
sta = self.{cpld}.sta_read()
rf_sw = urukul_sta_rf_sw(sta)
if rf_sw == 0 and len(was_init) == 0:
delay(15*ms)
self.{cpld}.init()
self.core_cache.put("_{cpld}_init", [1])
""".format(cpld=handler.cpld)
else:
cpld_dev = ""
cpld_init = ""
# AD9912/9910: init channel (if uninitialized)
if handler.dds_type == "AD9912":
# 0xFF before init, 0x99 after
channel_init = """
if self.{dds_channel}.read({cfgreg}, length=1) == 0xFF:
delay(10*ms)
self.{dds_channel}.init()
""".format(dds_channel=dds_channel, cfgreg=AD9912_SER_CONF)
elif handler.dds_type == "AD9910":
# -1 before init, 2 after
channel_init = """
if self.{dds_channel}.read32({cfgreg}) == -1:
delay(10*ms)
self.{dds_channel}.init()
""".format(dds_channel=dds_channel, cfgreg=AD9912_SER_CONF)
else:
channel_init = "self.{dds_channel}.init()".format(dds_channel=dds_channel)
dds_exp = textwrap.dedent("""
from artiq.experiment import *
from artiq.coredevice.urukul import *
class {title}(EnvExperiment):
def build(self):
self.setattr_device("core")
self.setattr_device("{dds_channel}")
{cpld_dev}
@kernel
def run(self):
self.core.break_realtime()
{cpld_init}
delay(10*ms)
{channel_init}
delay(15*ms)
{action}
""".format(title=title, action=action,
dds_channel=dds_channel,
cpld_dev=cpld_dev, cpld_init=cpld_init,
channel_init=channel_init))
asyncio.ensure_future(
self._submit_by_content(
dds_exp,
title,
log_msg))
def dds_set_frequency(self, dds_channel, freq):
handler = self.handlers_by_uid[dds_channel]
action = "self.{ch}.set({freq})".format(
freq=freq, ch=dds_channel)
if handler.is_urukul:
action += """
ch_no = self.{ch}.chip_select - 4
self.{cpld}.cfg_switches(rf_sw | 1 << ch_no)
""".format(ch=dds_channel, cpld=handler.cpld)
self._dds_faux_injection(
dds_channel,
action,
"SetDDS",
"Set DDS {} {}MHz".format(dds_channel, freq / 1e6))
def dds_channel_toggle(self, dds_channel, sw=True):
handler = self.handlers_by_uid[dds_channel]
# urukul only
if sw:
switch = "| 1 << ch_no"
else:
switch = "& ~(1 << ch_no)"
action = """
ch_no = self.{dds_channel}.chip_select - 4
self.{cpld}.cfg_switches(rf_sw {switch})
""".format(
dds_channel=dds_channel,
cpld=handler.cpld,
switch=switch
)
self._dds_faux_injection(
dds_channel,
action,
"ToggleDDS",
"Toggle DDS {} {}".format(dds_channel, "on" if sw else "off"))
def setup_ttl_monitoring(self, enable, channel):
if self.mi_connection is not None:
self.mi_connection.monitor_probe(enable, channel, TTLProbe.level.value)
self.mi_connection.monitor_probe(enable, channel, TTLProbe.oe.value)
self.mi_connection.monitor_injection(enable, channel, TTLOverride.en.value)
self.mi_connection.monitor_injection(enable, channel, TTLOverride.level.value)
if enable:
self.mi_connection.get_injection_status(channel, TTLOverride.en.value)
def setup_dds_monitoring(self, enable, bus_channel, channel):
if self.mi_connection is not None:
self.mi_connection.monitor_probe(enable, bus_channel, channel)
def setup_dac_monitoring(self, enable, spi_channel, channel):
if self.mi_connection is not None:
self.mi_connection.monitor_probe(enable, spi_channel, channel)
def monitor_cb(self, channel, probe, value):
if channel in self.ttl_handlers:
handler = self.ttl_handlers[channel]
if probe == TTLProbe.level.value:
handler.cur_level = bool(value)
elif probe == TTLProbe.oe.value:
handler.cur_oe = bool(value)
handler.refresh_display()
elif (channel, probe) in self.dds_handlers:
handler = self.dds_handlers[(channel, probe)]
handler.monitor_update(probe, value)
handler.refresh_display()
elif (channel, probe) in self.dac_handlers:
handler = self.dac_handlers[(channel, probe)]
handler.cur_value = value
handler.refresh_display()
def injection_status_cb(self, channel, override, value):
if channel in self.ttl_handlers:
handler = self.ttl_handlers[channel]
if override == TTLOverride.en.value:
handler.cur_override = bool(value)
if override == TTLOverride.level.value:
handler.cur_override_level = bool(value)
handler.refresh_display()
def disconnect_cb(self):
logger.error("lost connection to moninj")
self.reconnect_mi.set()
async def mi_connector(self):
while True:
await self.reconnect_mi.wait()
self.reconnect_mi.clear()
if self.mi_connection is not None:
await self.mi_connection.close()
self.mi_connection = None
new_mi_connection = CommMonInj(self.monitor_cb, self.injection_status_cb,
self.disconnect_cb)
try:
await new_mi_connection.connect(self.mi_addr, self.mi_port)
except Exception:
logger.error("failed to connect to moninj. Is aqctl_moninj_proxy running?",
exc_info=True)
await asyncio.sleep(10.)
self.reconnect_mi.set()
else:
logger.info("ARTIQ dashboard connected to moninj (%s)",
self.mi_addr)
self.mi_connection = new_mi_connection
for ttl_channel in self.ttl_handlers.keys():
self.setup_ttl_monitoring(True, ttl_channel)
for bus_channel, channel in self.dds_handlers.keys():
self.setup_dds_monitoring(True, bus_channel, channel)
for spi_channel, channel in self.dac_handlers.keys():
self.setup_dac_monitoring(True, spi_channel, channel)
async def close(self):
self.mi_connector_task.cancel()
try:
await asyncio.wait_for(self.mi_connector_task, None)
except asyncio.CancelledError:
pass
if self.mi_connection is not None:
await self.mi_connection.close()
class MoninjTreeWidget(pg.TreeWidget):
    """Single-column tree holding channel widgets, optionally in groups.

    Top-level items are either channel widgets or editable group items;
    group items contain channel widgets as children.
    """

    def __init__(self):
        pg.TreeWidget.__init__(self)
        self.setColumnCount(1)
        self.setSelectionMode(QtWidgets.QAbstractItemView.SingleSelection)
        self.header().setVisible(False)
        self.setHorizontalScrollMode(self.ScrollPerPixel)
        self.setVerticalScrollMode(self.ScrollPerPixel)

        palette = QtGui.QPalette()
        palette.setColor(QtGui.QPalette.Highlight, QtGui.QColor("gray"))
        self.setPalette(palette)

        self._widgets = dict()

        self.setContextMenuPolicy(QtCore.Qt.ActionsContextMenu)

        delete_action = QtWidgets.QAction("Delete", self)
        delete_action.triggered.connect(self._remove_item_action)
        delete_action.setShortcut("DEL")
        delete_action.setShortcutContext(QtCore.Qt.WidgetShortcut)
        self.addAction(delete_action)

        add_grp_action = QtWidgets.QAction("Insert group", self)
        add_grp_action.triggered.connect(self._add_group_action)
        add_grp_action.setShortcut("CTRL+SHIFT+N")
        add_grp_action.setShortcutContext(QtCore.Qt.WidgetShortcut)
        self.addAction(add_grp_action)

    def supportedDropActions(self):
        # Moving items breaks widgets in pg.TreeWidget, perform only copies
        return QtCore.Qt.CopyAction

    def itemMoving(self, item, parent, index):
        """Decide whether ``item`` may be dropped under ``parent``.

        Anything may go to the top level. Below that, only items carrying a
        widget may be dropped, and only onto top-level items that do not
        carry a widget themselves (i.e. groups).
        NOTE(review): presumably consulted by pg.TreeWidget during
        drag-and-drop - confirm against the pyqtgraph version in use.
        """
        if parent is None:
            return True
        if self.indexOfTopLevelItem(parent) == -1:
            return False
        if self.itemWidget(item, 0) is None:
            return False
        if self.itemWidget(parent, 0) is not None:
            return False
        return True

    def _find_group(self, group):
        """Return the first top-level item whose text is ``group``.

        Falls back to the invisible root item when there is no match.
        """
        for i in range(self.topLevelItemCount()):
            item = self.topLevelItem(i)
            if item.text(0) == group:
                return item
        return self.invisibleRootItem()

    @staticmethod
    def _new_widget_item():
        """Create a tree item for a channel widget (not a drop target)."""
        item = pg.TreeWidgetItem()
        item.setFlags(item.flags() & ~QtCore.Qt.ItemFlag.ItemIsDropEnabled)
        return item

    @staticmethod
    def _new_group_item(name):
        """Create an editable group item labelled ``name``."""
        group_item = pg.TreeWidgetItem([name])
        group_item.setFlags(group_item.flags() | QtCore.Qt.ItemFlag.ItemIsEditable)
        return group_item

    def add_widget(self, widget, group=None):
        """Add ``widget`` at the top level or inside the group named ``group``."""
        item = self._new_widget_item()
        if group is None:
            self.addTopLevelItem(item)
        else:
            group_item = self._find_group(group)
            group_item.addChild(item)
        self.setItemWidget(item, 0, widget)

    def _remove_item_action(self):
        """Remove the selected item, if any."""
        selected = self.selectedItems()
        if len(selected) == 0:
            return
        self.remove_item(selected[0])

    def remove_item(self, item):
        """Remove ``item``; an item without a widget first drops its children."""
        if self.itemWidget(item, 0) is None:
            item.takeChildren()
        root = self.invisibleRootItem()
        (item.parent() or root).removeChild(item)

    def _add_group_action(self):
        """Insert a new group after the selected top-level item.

        With no selection the group is appended; when a child is selected the
        group goes after the child's parent.
        """
        selected = self.selectedItems()
        if len(selected) == 0:
            self.add_group()
            return
        item = selected[0]
        index = self.indexOfTopLevelItem(item)
        if index != -1:
            self.insert_group(index + 1)
        else:
            parent = item.parent()
            index = self.indexOfTopLevelItem(parent)
            self.insert_group(index + 1)

    def add_group(self, name="untitled"):
        """Append a group named ``name`` and return its item."""
        group_item = self._new_group_item(name)
        self.addTopLevelItem(group_item)
        return group_item

    def insert_group(self, index, name="untitled"):
        """Insert a group named ``name`` at top-level ``index``; return it."""
        group_item = self._new_group_item(name)
        self.insertTopLevelItem(index, group_item)
        return group_item

    def extend(self, d):
        """Append the entries of ``d`` to the tree.

        Each entry is either a widget (added at the top level) or a dict with
        a ``"name"`` and a list of ``"widgets"`` (added as a new group).
        """
        for item in d:
            if isinstance(item, dict):
                # Attach the children to the item just created instead of
                # looking the group up by name: group names are not unique
                # (the default is "untitled"), and a lookup by name put every
                # widget into the first group carrying that name.
                group_item = self.add_group(item["name"])
                for widget in item["widgets"]:
                    child = self._new_widget_item()
                    group_item.addChild(child)
                    self.setItemWidget(child, 0, widget)
            else:
                self.add_widget(item)

    def export_list(self):
        """Return the tree layout as a list of uids and group dicts.

        Top-level items without a widget are exported as
        ``{"name": ..., "widgets": [uid, ...]}``; others as the widget's
        ``uid``. This is the structure ``extend`` accepts once the uids have
        been replaced by widgets.
        """
        export_list = list()
        for i in range(self.topLevelItemCount()):
            item = self.topLevelItem(i)
            widget = self.itemWidget(item, 0)
            if widget is None:
                group = dict()
                group["name"] = item.text(0)
                group["widgets"] = list()
                for j in range(item.childCount()):
                    child = item.child(j)
                    widget = self.itemWidget(child, 0)
                    group["widgets"].append(widget.uid)
                export_list.append(group)
            else:
                export_list.append(widget.uid)
        return export_list
class MonInjDock(QtWidgets.QDockWidget):
    """Dock showing a configurable tree of moninj channel widgets.

    Channels are picked through an "add channels" dialog, and the resulting
    layout can be saved to and loaded from PYON files.
    """

    def __init__(self, schedule_ctl):
        QtWidgets.QDockWidget.__init__(self, "MonInj")
        self.setObjectName("MonInj")
        self.setFeatures(QtWidgets.QDockWidget.DockWidgetMovable |
                         QtWidgets.QDockWidget.DockWidgetFloatable)

        self.dm = _DeviceManager(schedule_ctl)
        self.dm.channels_cb = lambda: self.set_channels(self.dm.handlers_by_uid)

        container = LayoutWidget()
        self.setWidget(container)

        self._current_dir = os.getcwd()

        # Button opening the file menu (populated further below).
        self._menu_btn = QtWidgets.QPushButton()
        self._menu_btn.setIcon(
            QtWidgets.QApplication.style().standardIcon(
                QtWidgets.QStyle.SP_FileDialogStart))
        container.addWidget(self._menu_btn, 0, 0)

        self._channel_model = Model({})
        dialog = _AddChannelDialog(self, self._channel_model)
        self.add_channel_dialog = dialog
        dialog.accepted.connect(
            lambda: self.layout_widgets(self.add_channel_dialog.channels))

        add_channel_btn = QtWidgets.QToolButton()
        add_channel_btn.setToolTip("Add channels...")
        add_channel_btn.setIcon(
            QtWidgets.QApplication.style().standardIcon(
                QtWidgets.QStyle.SP_FileDialogListView))
        add_channel_btn.clicked.connect(dialog.open)
        container.addWidget(add_channel_btn, 0, 1)

        self.tree = MoninjTreeWidget()
        container.addWidget(self.tree, 1, 0, colspan=10)

        self._file_menu = QtWidgets.QMenu()
        for label, coro in (
                ("Open configuration...", self.load_configuration),
                ("Save configuration...", self.save_configuration)):
            self._add_async_action(label, coro)
        self._menu_btn.setMenu(self._file_menu)

    def _add_async_action(self, label, coro):
        """Add a file-menu entry that schedules ``coro()`` when triggered."""
        def run_coro():
            # Exceptions raised by the coroutine are handled by
            # exc_to_warning.
            asyncio.ensure_future(exc_to_warning(coro()))

        action = QtWidgets.QAction(label, self)
        action.triggered.connect(run_coro)
        self._file_menu.addAction(action)

    def layout_widgets(self, handlers):
        """Create a widget for each handler and append it to the tree."""
        for handler in handlers:
            handler.create_widget()
            self.tree.add_widget(handler.widget)
            handler.refresh_display()

    def set_channels(self, handlers):
        """Publish ``handlers`` in the model used by the add-channel dialog."""
        self._channel_model.update(handlers)

    def _get_widget_from_handler(self, uid):
        """Return a new widget for ``uid``, or None (with a warning) if unknown."""
        try:
            handler = self.dm.handlers_by_uid[uid]
        except KeyError:
            logger.warning("skipping moninj widget with uid {}:"
                           " description incorrect/missing from device db".format(uid))
            return None
        handler.create_widget()
        return handler.widget

    def _connect_config(self, config):
        """Replace the uids in a loaded configuration by widgets.

        Unknown uids are dropped. Group dicts are updated in place and
        reused in the returned list.
        """
        connected = []
        for entry in config:
            if isinstance(entry, dict):
                resolved = (self._get_widget_from_handler(uid)
                            for uid in entry["widgets"])
                entry["widgets"] = [w for w in resolved if w is not None]
                connected.append(entry)
                continue
            widget = self._get_widget_from_handler(entry)
            if widget is not None:
                connected.append(widget)
        return connected

    async def load_configuration(self):
        """Ask for a PYON file and replace the tree with its layout.

        Returns silently if the file dialog is cancelled; failures while
        loading are logged.
        """
        try:
            filename = await get_open_file_name(
                self,
                "Load configuration",
                self._current_dir,
                "PYON files (*.pyon);;All files (*.*)")
        except asyncio.CancelledError:
            return
        self._current_dir = os.path.dirname(filename)

        try:
            loaded = pyon.load_file(filename)
            widgets = self._connect_config(loaded)
            self.tree.clear()
            self.tree.extend(widgets)
        except Exception:
            logger.error("Failed to load moninj configuration", exc_info=True)

    async def save_configuration(self):
        """Ask for a PYON file and store the current tree layout in it.

        Returns silently if the file dialog is cancelled; failures while
        saving are logged.
        """
        try:
            filename = await get_save_file_name(
                self,
                "Save configuration",
                self._current_dir,
                "PYON files (*.pyon);;All files (*.*)")
        except asyncio.CancelledError:
            return
        self._current_dir = os.path.dirname(filename)

        try:
            layout = self.tree.export_list()
            pyon.store_file(filename, layout)
        except Exception:
            logger.error("Failed to save moninj configuration", exc_info=True)

    async def stop(self):
        """Shut down the device manager, if there is one."""
        if self.dm is None:
            return
        await self.dm.close()