forked from M-Labs/artiq
1
0
Fork 0
artiq/artiq/dashboard/moninj.py

995 lines
36 KiB
Python

import asyncio
import logging
import textwrap
import os
from collections import namedtuple
from PyQt5 import QtCore, QtWidgets, QtGui
import pyqtgraph as pg
from sipyco import pyon
from artiq.coredevice.comm_moninj import CommMonInj, TTLOverride, TTLProbe
from artiq.coredevice.ad9912_reg import AD9912_SER_CONF
from artiq.gui.tools import LayoutWidget, get_open_file_name, get_save_file_name
from artiq.gui.models import DictSyncTreeSepModel
logger = logging.getLogger(__name__)
class _CancellableLineEdit(QtWidgets.QLineEdit):
def escapePressedConnect(self, cb):
self.esc_cb = cb
def keyPressEvent(self, event):
key = event.key()
if key == QtCore.Qt.Key_Escape:
self.esc_cb(event)
QtWidgets.QLineEdit.keyPressEvent(self, event)
class _TTLWidget(QtWidgets.QFrame):
override_toggled = QtCore.pyqtSignal(bool)
level_toggled = QtCore.pyqtSignal(bool)
def __init__(self, title):
QtWidgets.QFrame.__init__(self)
self.setFrameShape(QtWidgets.QFrame.Box)
self.setFrameShadow(QtWidgets.QFrame.Raised)
self.uid = title
grid = QtWidgets.QGridLayout()
grid.setContentsMargins(2, 2, 2, 2)
grid.setHorizontalSpacing(2)
grid.setVerticalSpacing(0)
self.setLayout(grid)
label = QtWidgets.QLabel(title)
grid.addWidget(label, 0, 0)
self.stack = QtWidgets.QStackedWidget()
grid.addWidget(self.stack, 0, 1)
self.direction = QtWidgets.QLabel()
self.direction.setAlignment(QtCore.Qt.AlignCenter)
self.stack.addWidget(self.direction)
grid_cb = LayoutWidget()
grid_cb.layout.setContentsMargins(0, 0, 0, 0)
grid_cb.layout.setHorizontalSpacing(0)
grid_cb.layout.setVerticalSpacing(0)
self.override = QtWidgets.QToolButton()
self.override.setText("OVR")
self.override.setCheckable(True)
self.override.setToolTip("Override")
grid_cb.addWidget(self.override, 0, 0)
self.level = QtWidgets.QToolButton()
self.level.setText("LVL")
self.level.setCheckable(True)
self.level.setToolTip("Level")
grid_cb.addWidget(self.level, 0, 1)
self.stack.addWidget(grid_cb)
self.value = QtWidgets.QLabel()
self.value.setAlignment(QtCore.Qt.AlignRight)
grid.addWidget(self.value, 0, 2)
grid.setColumnStretch(0, 1)
grid.setColumnStretch(1, 1)
grid.setColumnStretch(2, 1)
self.override.clicked.connect(self.override_toggled)
self.level.clicked.connect(self.level_toggled)
def setValueText(self, override, level):
value_s = "1" if level else "0"
if override:
value_s = "<b>" + value_s + "</b>"
color = " color=\"red\""
else:
color = ""
self.value.setText("<font size=\"5\"{}>{}</font>".format(
color, value_s))
def setDirectionText(self, oe):
direction = "OUT" if oe else "IN"
self.direction.setText("<font size=\"2\">" + direction + "</font>")
def setButtonsState(self, override, level):
self.override.setChecked(override)
if override:
self.stack.setCurrentIndex(1)
self.level.setChecked(level)
def enterEvent(self, event):
self.stack.setCurrentIndex(1)
QtWidgets.QFrame.enterEvent(self, event)
def leaveEvent(self, event):
if not self.override.isChecked():
self.stack.setCurrentIndex(0)
QtWidgets.QFrame.leaveEvent(self, event)
class _TTLHandler:
def __init__(self, dm, channel, force_out, title):
self.channel = channel
self.force_out = force_out
self.set_mode = dm.ttl_set_mode
self.title = title
self.uid = title
self.cur_level = False
self.cur_oe = False
self.cur_override = False
self.cur_override_level = False
self.widget = None
def override_toggled(self, override):
if override:
if self.widget.level.isChecked():
self.set_mode(self.channel, "1")
else:
self.set_mode(self.channel, "0")
else:
self.set_mode(self.channel, "exp")
def level_toggled(self, level):
if self.widget.override.isChecked():
if level:
self.set_mode(self.channel, "1")
else:
self.set_mode(self.channel, "0")
def refresh_display(self):
if self.widget is not None:
level = self.cur_override_level if self.cur_override else self.cur_level
oe = self.cur_oe or self.force_out
self.widget.setValueText(self.cur_override, level)
self.widget.setDirectionText(oe)
with QtCore.QSignalBlocker(self.widget):
self.widget.setButtonsState(self.cur_override, self.cur_level)
def delete_widget(self):
self.widget.deleteLater()
self.widget = None
def create_widget(self):
self.widget = _TTLWidget(self.title)
self.widget.override_toggled.connect(self.override_toggled)
self.widget.level_toggled.connect(self.level_toggled)
self.refresh_display()
def to_model_path(self):
return "ttl/{}".format(self.title)
class _DDSWidget(QtWidgets.QFrame):
apply_changes = QtCore.pyqtSignal()
off_clicked = QtCore.pyqtSignal()
set_clicked = QtCore.pyqtSignal()
def __init__(self, title, is_urukul):
QtWidgets.QFrame.__init__(self)
self.uid = title
self.setFrameShape(QtWidgets.QFrame.Box)
self.setFrameShadow(QtWidgets.QFrame.Raised)
grid = QtWidgets.QGridLayout()
grid.setContentsMargins(2, 2, 2, 2)
grid.setHorizontalSpacing(2)
self.setLayout(grid)
label = QtWidgets.QLabel(title)
label.setAlignment(QtCore.Qt.AlignCenter)
grid.addWidget(label, 0, 0)
# FREQ DATA/EDIT FIELD
self.data_stack = QtWidgets.QStackedWidget()
# page 1: display data
grid_disp = LayoutWidget()
grid_disp.layout.setContentsMargins(0, 0, 0, 0)
grid_disp.layout.setHorizontalSpacing(0)
self.value_label = QtWidgets.QLabel()
self.value_label.setAlignment(QtCore.Qt.AlignRight | QtCore.Qt.AlignVCenter)
grid_disp.addWidget(self.value_label, 0, 0, 1, 4)
unit = QtWidgets.QLabel("MHz")
unit.setAlignment(QtCore.Qt.AlignCenter)
grid_disp.addWidget(unit, 0, 4, 1, 1)
self.data_stack.addWidget(grid_disp)
# page 2: edit data
grid_edit = LayoutWidget()
grid_edit.layout.setContentsMargins(0, 0, 0, 0)
grid_edit.layout.setHorizontalSpacing(0)
grid_edit.layout.setVerticalSpacing(0)
self.value_edit = _CancellableLineEdit(self)
self.value_edit.setAlignment(QtCore.Qt.AlignRight)
grid_edit.addWidget(self.value_edit, 0, 0, 1, 4)
unit = QtWidgets.QLabel("MHz")
unit.setAlignment(QtCore.Qt.AlignCenter)
grid_edit.addWidget(unit, 0, 4, 1, 1)
self.data_stack.addWidget(grid_edit)
grid.addWidget(self.data_stack, 0, 2)
# BUTTONS
self.button_stack = QtWidgets.QStackedWidget()
# page 1: SET button
set_grid = LayoutWidget()
set_btn = QtWidgets.QToolButton()
set_btn.setText("Set")
set_btn.setToolTip("Set frequency")
set_grid.addWidget(set_btn, 0, 0, 1, 1)
# for urukuls also allow switching off RF
if is_urukul:
off_btn = QtWidgets.QToolButton()
off_btn.setText("Off")
off_btn.setToolTip("Switch off the output")
set_grid.addWidget(off_btn, 0, 1, 1, 1)
self.button_stack.addWidget(set_grid)
# page 2: apply/cancel buttons
apply_grid = LayoutWidget()
apply = QtWidgets.QToolButton()
apply.setText("Apply")
apply.setToolTip("Apply changes")
apply_grid.addWidget(apply, 0, 0, 1, 1)
cancel = QtWidgets.QToolButton()
cancel.setText("Cancel")
cancel.setToolTip("Cancel changes")
apply_grid.addWidget(cancel, 0, 1, 1, 1)
self.button_stack.addWidget(apply_grid)
grid.addWidget(self.button_stack, 0, 1)
set_btn.clicked.connect(self.set_clicked)
apply.clicked.connect(self.apply_changes)
if is_urukul:
off_btn.clicked.connect(self.off_clicked)
off_btn.setToolTip(textwrap.dedent(
"""Note: If TTL RTIO sw for the channel is switched high,
this button will not disable the channel.
Use the TTL override instead."""))
self.value_edit.returnPressed.connect(self.apply_changes)
def cancel_changes(cancel):
self.set_page(0)
self.value_edit.escapePressedConnect(cancel_changes)
cancel.clicked.connect(cancel_changes)
def set_page(self, page):
self.data_stack.setCurrentIndex(page)
self.button_stack.setCurrentIndex(page)
def get_value(self):
return float(self.value_edit.text())
def set_edit_value(self, value):
self.value_edit.setText("{:.7f}".format(value))
def set_value(self, value):
self.value_label.setText("<font size=\"4\">{:.7f}</font>".format(value))
self.set_edit_value(value)
def start_edit(self):
self.value_edit.setFocus()
self.value_edit.selectAll()
class _DDSHandler:
def __init__(self, dm, title, bus_channel, channel, dds_type,
ref_clk, cpld=None, pll=1, clk_div=0):
self.dm = dm
self.title = title
self.uid = title
self.bus_channel = bus_channel
self.channel = channel
self.cur_frequency = 0
self.dds_name = title
self.cpld = cpld
self.cur_frequency = 0
self.cur_reg = 0
self.dds_type = dds_type
self.is_urukul = dds_type in ["AD9910", "AD9912"]
if dds_type == "AD9914":
self.ftw_per_hz = 2**32 / ref_clk
else:
if dds_type == "AD9910":
max_freq = 1 << 32
clk_mult = [4, 1, 2, 4]
elif dds_type == "AD9912": # AD9912
max_freq = 1 << 48
clk_mult = [1, 1, 2, 4]
else:
raise NotImplementedError
sysclk = ref_clk / clk_mult[clk_div] * pll
self.ftw_per_hz = 1 / sysclk * max_freq
self.widget = None
def create_widget(self):
self.widget = _DDSWidget(self.title, self.is_urukul)
self.widget.apply_changes.connect(self.apply_changes)
self.widget.off_clicked.connect(self.off_clicked)
self.widget.set_clicked.connect(self.set_clicked)
self.refresh_display()
def monitor_update(self, probe, value):
if self.dds_type == "AD9912":
value = value << 16
self.cur_frequency = self._ftw_to_freq(value)
def _ftw_to_freq(self, ftw):
return ftw / self.ftw_per_hz
def delete_widget(self):
self.widget.deleteLater()
self.widget = None
def refresh_display(self):
if self.widget is not None:
self.widget.set_value(self.cur_frequency / 1e6)
def apply_changes(self):
self.widget.set_page(0)
frequency = self.widget.get_value() * 1e6
self.dm.dds_set_frequency(self.dds_name, frequency)
def set_clicked(self):
self.widget.set_page(1)
self.widget.set_edit_value(self.cur_frequency / 1e6)
self.widget.start_edit()
def off_clicked(self):
self.dm.dds_channel_toggle(self.dds_name, sw=False)
def to_model_path(self):
return "dds/{}".format(self.title)
class _DACWidget(QtWidgets.QFrame):
def __init__(self, channel, title):
QtWidgets.QFrame.__init__(self)
self.uid = (title, channel)
self.setFrameShape(QtWidgets.QFrame.Box)
self.setFrameShadow(QtWidgets.QFrame.Raised)
grid = QtWidgets.QGridLayout()
grid.setContentsMargins(2, 2, 2, 2)
grid.setHorizontalSpacing(2)
self.setLayout(grid)
label = QtWidgets.QLabel("{} ch{}".format(title, channel))
label.setAlignment(QtCore.Qt.AlignLeft)
grid.addWidget(label, 0, 1)
self.value = QtWidgets.QLabel()
self.value.setAlignment(QtCore.Qt.AlignRight)
grid.addWidget(self.value, 0, 2, 1, 1)
def set_value(self, value):
self.value.setText("<font size=\"4\">{:.3f}</font><font size=\"2\"> %</font>"
.format(value))
class _DACHandler:
def __init__(self, dm, spi_channel, channel, title):
self.widget = None
self.cur_value = 0
self.spi_channel = spi_channel
self.channel = channel
self.title = title
self.uid = (title, channel)
def create_widget(self):
self.widget = _DACWidget(self.channel, self.title)
self.refresh_display()
def delete_widget(self):
self.widget.deleteLater()
self.widget = None
def refresh_display(self):
if self.widget is not None:
self.widget.set_value(self.cur_value * 100 / 2**16)
def to_model_path(self):
return "dac/{}/{}".format(self.title, self.channel)
class Model(DictSyncTreeSepModel):
def __init__(self, init):
DictSyncTreeSepModel.__init__(self, "/", ["Channels"], init)
def clear(self):
for k in self.backing_store:
self._del_item(self, k.split(self.separator))
self.backing_store.clear()
def update(self, d):
for k, v in d.items():
self[v.to_model_path()] = v
class _AddChannelDialog(QtWidgets.QDialog):
def __init__(self, parent, model):
QtWidgets.QDialog.__init__(self, parent=parent)
self.setContextMenuPolicy(QtCore.Qt.ActionsContextMenu)
self.setWindowTitle("Add channels")
layout = QtWidgets.QVBoxLayout()
self.setLayout(layout)
self._model = model
self._tree_view = QtWidgets.QTreeView()
self._tree_view.setHeaderHidden(True)
self._tree_view.setSelectionBehavior(
QtWidgets.QAbstractItemView.SelectItems)
self._tree_view.setSelectionMode(
QtWidgets.QAbstractItemView.ExtendedSelection)
self._tree_view.setModel(self._model)
layout.addWidget(self._tree_view)
self._button_box = QtWidgets.QDialogButtonBox(
QtWidgets.QDialogButtonBox.Ok | QtWidgets.QDialogButtonBox.Cancel
)
self._button_box.setCenterButtons(True)
self._button_box.accepted.connect(self.add_channels)
self._button_box.rejected.connect(self.reject)
layout.addWidget(self._button_box)
def add_channels(self):
selection = self._tree_view.selectedIndexes()
channels = []
for select in selection:
key = self._model.index_to_key(select)
if key is not None:
channels.append(self._model[key].ref)
self.channels = channels
self.accept()
_HandlerDesc = namedtuple("_HandlerDesc", "uid comment cls arguments")
def setup_from_ddb(ddb):
mi_addr = None
mi_port = None
dds_sysclk = None
description = set()
for k, v in ddb.items():
try:
if isinstance(v, dict):
comment = v.get("comment")
if v["type"] == "local":
if v["module"] == "artiq.coredevice.ttl":
if "ttl_urukul" in k:
continue
channel = v["arguments"]["channel"]
force_out = v["class"] == "TTLOut"
handler = _HandlerDesc(k, comment, _TTLHandler, (channel, force_out, k))
description.add(handler)
elif (v["module"] == "artiq.coredevice.ad9914" and v["class"] == "AD9914"):
bus_channel = v["arguments"]["bus_channel"]
channel = v["arguments"]["channel"]
dds_sysclk = v["arguments"]["sysclk"]
handler = _HandlerDesc(k, comment, _DDSHandler,
(k, bus_channel, channel, v["class"], dds_sysclk))
description.add(handler)
elif (v["module"] == "artiq.coredevice.ad9910" and v["class"] == "AD9910") or \
(v["module"] == "artiq.coredevice.ad9912" and v["class"] == "AD9912"):
channel = v["arguments"]["chip_select"] - 4
if channel < 0:
continue
dds_cpld = v["arguments"]["cpld_device"]
spi_dev = ddb[dds_cpld]["arguments"]["spi_device"]
bus_channel = ddb[spi_dev]["arguments"]["channel"]
pll = v["arguments"]["pll_n"]
refclk = ddb[dds_cpld]["arguments"]["refclk"]
clk_div = v["arguments"].get("clk_div", 0)
handler = _HandlerDesc(k, comment, _DDSHandler,
(k, bus_channel, channel, v["class"], refclk,
dds_cpld, pll, clk_div))
description.add(handler)
elif (v["module"] == "artiq.coredevice.ad53xx" and v["class"] == "AD53xx") or \
(v["module"] == "artiq.coredevice.zotino" and v["class"] == "Zotino"):
spi_device = v["arguments"]["spi_device"]
spi_device = ddb[spi_device]
while isinstance(spi_device, str):
spi_device = ddb[spi_device]
spi_channel = spi_device["arguments"]["channel"]
for channel in range(32):
handler = _HandlerDesc((k, channel), comment, _DACHandler,
(spi_channel, channel, k))
description.add(handler)
elif v["type"] == "controller" and k == "core_moninj":
mi_addr = v["host"]
mi_port = v.get("port_proxy", 1383)
except KeyError:
pass
return mi_addr, mi_port, description
class _DeviceManager:
def __init__(self, schedule_ctl):
self.mi_addr = None
self.mi_port = None
self.reconnect_mi = asyncio.Event()
self.mi_connection = None
self.mi_connector_task = asyncio.ensure_future(self.mi_connector())
self.schedule_ctl = schedule_ctl
self.ddb = dict()
self.description = set()
self.handlers_by_uid = dict()
self.dds_sysclk = 0
self.ttl_handlers = dict()
self.dds_handlers = dict()
self.dac_handlers = dict()
self.channels_cb = lambda: None
def init_ddb(self, ddb):
self.ddb = ddb
def notify_ddb(self, mod):
mi_addr, mi_port, description = setup_from_ddb(self.ddb)
if (mi_addr, mi_port) != (self.mi_addr, self.mi_port):
self.mi_addr = mi_addr
self.mi_port = mi_port
self.reconnect_mi.set()
for to_remove in self.description - description:
handler = self.handlers_by_uid[to_remove.uid]
del self.handlers_by_uid[to_remove.uid]
if isinstance(handler, _TTLHandler):
self.setup_ttl_monitoring(False, handler.channel)
handler.delete_widget()
del self.ttl_handlers[handler.channel]
elif isinstance(handler, _DDSHandler):
self.setup_dds_monitoring(False, handler.bus_channel, handler.channel)
handler.delete_widget()
del self.dds_handlers[(handler.bus_channel, handler.channel)]
elif isinstance(handler, _DACHandler):
self.setup_dac_monitoring(False, handler.spi_channel, handler.channel)
handler.delete_widget()
del self.dac_handlers[(handler.spi_channel, handler.channel)]
else:
raise ValueError
for to_add in description - self.description:
handler = to_add.cls(self, *to_add.arguments)
handler.create_widget()
if to_add.comment is not None:
handler.widget.setToolTip(to_add.comment)
self.handlers_by_uid[to_add.uid] = handler
if isinstance(handler, _TTLHandler):
self.ttl_handlers[handler.channel] = handler
self.setup_ttl_monitoring(True, handler.channel)
elif isinstance(handler, _DDSHandler):
self.dds_handlers[(handler.bus_channel, handler.channel)] = handler
self.setup_dds_monitoring(True, handler.bus_channel, handler.channel)
elif isinstance(handler, _DACHandler):
self.dac_handlers[(handler.spi_channel, handler.channel)] = handler
self.setup_dac_monitoring(True, handler.spi_channel, handler.channel)
else:
raise ValueError
if description != self.description:
self.channels_cb()
self.description = description
def ttl_set_mode(self, channel, mode):
if self.mi_connection is not None:
handler = self.ttl_handlers[channel]
if mode == "0":
handler.cur_override = True
handler.cur_level = False
self.mi_connection.inject(channel, TTLOverride.level.value, 0)
self.mi_connection.inject(channel, TTLOverride.oe.value, 1)
self.mi_connection.inject(channel, TTLOverride.en.value, 1)
elif mode == "1":
handler.cur_override = True
handler.cur_level = True
self.mi_connection.inject(channel, TTLOverride.level.value, 1)
self.mi_connection.inject(channel, TTLOverride.oe.value, 1)
self.mi_connection.inject(channel, TTLOverride.en.value, 1)
elif mode == "exp":
handler.cur_override = False
self.mi_connection.inject(channel, TTLOverride.en.value, 0)
else:
raise ValueError
# override state may have changed
handler.refresh_display()
async def _submit_by_content(self, content, class_name, title):
expid = {
"log_level": logging.WARNING,
"content": content,
"class_name": class_name,
"arguments": {}
}
scheduling = {
"pipeline_name": "main",
"priority": 0,
"due_date": None,
"flush": False
}
rid = await self.schedule_ctl.submit(
scheduling["pipeline_name"],
expid,
scheduling["priority"], scheduling["due_date"],
scheduling["flush"])
logger.info("Submitted '%s', RID is %d", title, rid)
def _dds_faux_injection(self, dds_channel, action, title, log_msg):
handler = self.handlers_by_uid[dds_channel]
# create kernel and fill it in and send-by-content
# initialize CPLD (if applicable)
if handler.is_urukul:
# urukuls need CPLD init and switch to on
cpld_dev = """self.setattr_device("core_cache")
self.setattr_device("{}")""".format(handler.cpld)
# `sta`/`rf_sw`` variables are guaranteed for urukuls
# so {action} can use it
# if there's no RF enabled, CPLD may have not been initialized
# but if there is, it has been initialised - no need to do again
cpld_init = """delay(15*ms)
was_init = self.core_cache.get("_{cpld}_init")
sta = self.{cpld}.sta_read()
rf_sw = urukul_sta_rf_sw(sta)
if rf_sw == 0 and len(was_init) == 0:
delay(15*ms)
self.{cpld}.init()
self.core_cache.put("_{cpld}_init", [1])
""".format(cpld=handler.cpld)
else:
cpld_dev = ""
cpld_init = ""
# AD9912/9910: init channel (if uninitialized)
if handler.dds_type == "AD9912":
# 0xFF before init, 0x99 after
channel_init = """
if self.{dds_channel}.read({cfgreg}, length=1) == 0xFF:
delay(10*ms)
self.{dds_channel}.init()
""".format(dds_channel=dds_channel, cfgreg=AD9912_SER_CONF)
elif handler.dds_type == "AD9910":
# -1 before init, 2 after
channel_init = """
if self.{dds_channel}.read32({cfgreg}) == -1:
delay(10*ms)
self.{dds_channel}.init()
""".format(dds_channel=dds_channel, cfgreg=AD9912_SER_CONF)
else:
channel_init = "self.{dds_channel}.init()".format(dds_channel=dds_channel)
dds_exp = textwrap.dedent("""
from artiq.experiment import *
from artiq.coredevice.urukul import *
class {title}(EnvExperiment):
def build(self):
self.setattr_device("core")
self.setattr_device("{dds_channel}")
{cpld_dev}
@kernel
def run(self):
self.core.break_realtime()
{cpld_init}
delay(10*ms)
{channel_init}
delay(15*ms)
{action}
""".format(title=title, action=action,
dds_channel=dds_channel,
cpld_dev=cpld_dev, cpld_init=cpld_init,
channel_init=channel_init))
asyncio.ensure_future(
self._submit_by_content(
dds_exp,
title,
log_msg))
def dds_set_frequency(self, dds_channel, freq):
handler = self.handlers_by_uid[dds_channel]
action = "self.{ch}.set({freq})".format(
freq=freq, ch=dds_channel)
if handler.is_urukul:
action += """
ch_no = self.{ch}.chip_select - 4
self.{cpld}.cfg_switches(rf_sw | 1 << ch_no)
""".format(ch=dds_channel, cpld=handler.cpld)
self._dds_faux_injection(
dds_channel,
action,
"SetDDS",
"Set DDS {} {}MHz".format(dds_channel, freq / 1e6))
def dds_channel_toggle(self, dds_channel, sw=True):
handler = self.handlers_by_uid[dds_channel]
# urukul only
if sw:
switch = "| 1 << ch_no"
else:
switch = "& ~(1 << ch_no)"
action = """
ch_no = self.{dds_channel}.chip_select - 4
self.{cpld}.cfg_switches(rf_sw {switch})
""".format(
dds_channel=dds_channel,
cpld=handler.cpld,
switch=switch
)
self._dds_faux_injection(
dds_channel,
action,
"ToggleDDS",
"Toggle DDS {} {}".format(dds_channel, "on" if sw else "off"))
def setup_ttl_monitoring(self, enable, channel):
if self.mi_connection is not None:
self.mi_connection.monitor_probe(enable, channel, TTLProbe.level.value)
self.mi_connection.monitor_probe(enable, channel, TTLProbe.oe.value)
self.mi_connection.monitor_injection(enable, channel, TTLOverride.en.value)
self.mi_connection.monitor_injection(enable, channel, TTLOverride.level.value)
if enable:
self.mi_connection.get_injection_status(channel, TTLOverride.en.value)
def setup_dds_monitoring(self, enable, bus_channel, channel):
if self.mi_connection is not None:
self.mi_connection.monitor_probe(enable, bus_channel, channel)
def setup_dac_monitoring(self, enable, spi_channel, channel):
if self.mi_connection is not None:
self.mi_connection.monitor_probe(enable, spi_channel, channel)
def monitor_cb(self, channel, probe, value):
if channel in self.ttl_handlers:
handler = self.ttl_handlers[channel]
if probe == TTLProbe.level.value:
handler.cur_level = bool(value)
elif probe == TTLProbe.oe.value:
handler.cur_oe = bool(value)
handler.refresh_display()
elif (channel, probe) in self.dds_handlers:
handler = self.dds_handlers[(channel, probe)]
handler.monitor_update(probe, value)
handler.refresh_display()
elif (channel, probe) in self.dac_handlers:
handler = self.dac_handlers[(channel, probe)]
handler.cur_value = value
handler.refresh_display()
def injection_status_cb(self, channel, override, value):
if channel in self.ttl_handlers:
handler = self.ttl_handlers[channel]
if override == TTLOverride.en.value:
handler.cur_override = bool(value)
if override == TTLOverride.level.value:
handler.cur_override_level = bool(value)
handler.refresh_display()
def disconnect_cb(self):
logger.error("lost connection to moninj")
self.reconnect_mi.set()
async def mi_connector(self):
while True:
await self.reconnect_mi.wait()
self.reconnect_mi.clear()
if self.mi_connection is not None:
await self.mi_connection.close()
self.mi_connection = None
new_mi_connection = CommMonInj(self.monitor_cb, self.injection_status_cb,
self.disconnect_cb)
try:
await new_mi_connection.connect(self.mi_addr, self.mi_port)
except Exception:
logger.error("failed to connect to moninj. Is aqctl_moninj_proxy running?",
exc_info=True)
await asyncio.sleep(10.)
self.reconnect_mi.set()
else:
logger.info("ARTIQ dashboard connected to moninj (%s)",
self.mi_addr)
self.mi_connection = new_mi_connection
for ttl_channel in self.ttl_handlers.keys():
self.setup_ttl_monitoring(True, ttl_channel)
for bus_channel, channel in self.dds_handlers.keys():
self.setup_dds_monitoring(True, bus_channel, channel)
for spi_channel, channel in self.dac_handlers.keys():
self.setup_dac_monitoring(True, spi_channel, channel)
async def close(self):
self.mi_connector_task.cancel()
try:
await asyncio.wait_for(self.mi_connector_task, None)
except asyncio.CancelledError:
pass
if self.mi_connection is not None:
await self.mi_connection.close()
class MoninjTreeWidget(pg.TreeWidget):
def __init__(self):
pg.TreeWidget.__init__(self)
self.setColumnCount(1)
self.setSelectionMode(QtWidgets.QAbstractItemView.SingleSelection)
palette = QtGui.QPalette()
palette.setColor(QtGui.QPalette.Highlight, QtGui.QColor("gray"))
self.setPalette(palette)
self._widgets = dict()
self.setContextMenuPolicy(QtCore.Qt.ActionsContextMenu)
delete_action = QtWidgets.QAction("Delete", self)
delete_action.triggered.connect(self._remove_item_action)
delete_action.setShortcut("DEL")
delete_action.setShortcutContext(QtCore.Qt.WidgetShortcut)
self.addAction(delete_action)
add_grp_action = QtWidgets.QAction("Add new group", self)
add_grp_action.triggered.connect(self.add_group)
self.addAction(add_grp_action)
def supportedDropActions(self):
# Moving items breaks widgets in pg.TreeWidget, perform only copies
return QtCore.Qt.CopyAction
def itemMoving(self, item, parent, index):
if parent is None:
return True
if self.indexOfTopLevelItem(parent) == -1:
return False
if self.itemWidget(item, 0) is None:
return False
if self.itemWidget(parent, 0) is not None:
return False
return True
def add_widget(self, widget):
item = pg.TreeWidgetItem()
item.setFlags(item.flags() & ~QtCore.Qt.ItemFlag.ItemIsDropEnabled)
self._widgets[widget.uid] = widget
self.addTopLevelItem(item)
self.setItemWidget(item, 0, widget)
def _remove_item_action(self):
if len(self.selectedItems()) == 0:
return
item = self.selectedItems()[0]
self.remove_item(item)
def remove_item(self, item):
if self.itemWidget(item, 0) is None:
self.remove_group_items(item)
else:
self.remove_widget(item)
root = self.invisibleRootItem()
(item.parent() or root).removeChild(item)
def remove_widget(self, item):
widget = self.itemWidget(item, 0)
del self._widgets[widget.uid]
def add_group(self):
group = "untitled_group"
group_item = pg.TreeWidgetItem([group])
group_item.setFlags(group_item.flags() | QtCore.Qt.ItemFlag.ItemIsEditable)
self.addTopLevelItem(group_item)
return group_item
def remove_group_items(self, group_item):
child_items = group_item.takeChildren()
for child in child_items:
self.remove_widget(child)
def extend(self, d):
pass
def clear(self):
pass
def export_list(self):
pass
class MonInjDock(QtWidgets.QDockWidget):
def __init__(self, schedule_ctl):
QtWidgets.QDockWidget.__init__(self, "MonInj")
self.setObjectName("MonInj")
self.setFeatures(QtWidgets.QDockWidget.DockWidgetMovable |
QtWidgets.QDockWidget.DockWidgetFloatable)
self.dm = _DeviceManager(schedule_ctl)
self.dm.channels_cb = lambda: self.set_channels(self.dm.handlers_by_uid)
layout = LayoutWidget()
self.setWidget(layout)
self._channel_model = Model({})
self.add_channel_dialog = _AddChannelDialog(self, self._channel_model)
self.add_channel_dialog.accepted.connect(
lambda: self.layout_widgets(self.add_channel_dialog.channels))
add_channel_btn = QtWidgets.QToolButton()
add_channel_btn.setToolTip("Add channels...")
add_channel_btn.setIcon(
QtWidgets.QApplication.style().standardIcon(
QtWidgets.QStyle.SP_FileDialogListView))
add_channel_btn.clicked.connect(self.add_channel_dialog.open)
layout.addWidget(add_channel_btn, 0, 0, colspan=1)
self.tree = MoninjTreeWidget()
layout.addWidget(self.tree, 1, 0, colspan=10)
def layout_widgets(self, handlers):
for handler in handlers:
handler.create_widget()
self.tree.add_widget(handler.widget)
handler.refresh_display()
def set_channels(self, handlers):
self._channel_model.update(handlers)
def _connect_config(self, config):
pass
async def load_configuration(self):
try:
filename = await get_open_file_name(
self,
"Load configuration",
self._current_dir,
"PYON files (*.pyon);;All files (*.*)")
except asyncio.CancelledError:
return
self._current_dir = os.path.dirname(filename)
try:
configuration = pyon.load_file(filename)
connected_config = self._connect_config(configuration)
connected_config = self.dm.connect_config(configuration)
self.tree.clear()
self.tree.extend(connected_config)
except Exception:
logger.error("Failed to load moninj configuration", exc_info=True)
async def save_configuration(self):
try:
filename = await get_save_file_name(
self,
"Save configuration",
self._current_dir,
"PYON files (*.pyon);;All files (*.*)")
except asyncio.CancelledError:
return
self._current_dir = os.path.dirname(filename)
try:
configuration = self.tree.export_list()
pyon.store_file(filename, configuration)
except Exception:
logger.error("Failed to save moninj configuration", exc_info=True)
async def stop(self):
if self.dm is not None:
await self.dm.close()