forked from M-Labs/artiq
1
0
Fork 0
artiq/artiq/dashboard/moninj.py

879 lines
32 KiB
Python

import asyncio
import logging
import textwrap
from collections import namedtuple
from PyQt5 import QtCore, QtWidgets
from artiq.coredevice.comm_moninj import CommMonInj, TTLOverride, TTLProbe
from artiq.coredevice.ad9912_reg import AD9912_SER_CONF
from artiq.gui.tools import LayoutWidget
from artiq.gui.flowlayout import FlowLayout
from artiq.gui.models import DictSyncTreeSepModel
logger = logging.getLogger(__name__)
class _CancellableLineEdit(QtWidgets.QLineEdit):
def escapePressedConnect(self, cb):
self.esc_cb = cb
def keyPressEvent(self, event):
key = event.key()
if key == QtCore.Qt.Key_Escape:
self.esc_cb(event)
QtWidgets.QLineEdit.keyPressEvent(self, event)
class _TTLWidget(QtWidgets.QFrame):
override_toggled = QtCore.pyqtSignal(bool)
level_toggled = QtCore.pyqtSignal(bool)
def __init__(self, title):
QtWidgets.QFrame.__init__(self)
self.setFrameShape(QtWidgets.QFrame.Box)
self.setFrameShadow(QtWidgets.QFrame.Raised)
grid = QtWidgets.QGridLayout()
grid.setContentsMargins(0, 0, 0, 0)
grid.setHorizontalSpacing(0)
grid.setVerticalSpacing(0)
self.setLayout(grid)
label = QtWidgets.QLabel(title)
label.setAlignment(QtCore.Qt.AlignCenter)
label.setSizePolicy(QtWidgets.QSizePolicy.Ignored,
QtWidgets.QSizePolicy.Preferred)
grid.addWidget(label, 1, 1)
self.stack = QtWidgets.QStackedWidget()
grid.addWidget(self.stack, 2, 1)
self.direction = QtWidgets.QLabel()
self.direction.setAlignment(QtCore.Qt.AlignCenter)
self.stack.addWidget(self.direction)
grid_cb = LayoutWidget()
grid_cb.layout.setContentsMargins(0, 0, 0, 0)
grid_cb.layout.setHorizontalSpacing(0)
grid_cb.layout.setVerticalSpacing(0)
self.override = QtWidgets.QToolButton()
self.override.setText("OVR")
self.override.setCheckable(True)
self.override.setToolTip("Override")
grid_cb.addWidget(self.override, 3, 1)
self.level = QtWidgets.QToolButton()
self.level.setText("LVL")
self.level.setCheckable(True)
self.level.setToolTip("Level")
grid_cb.addWidget(self.level, 3, 2)
self.stack.addWidget(grid_cb)
self.value = QtWidgets.QLabel()
self.value.setAlignment(QtCore.Qt.AlignCenter)
grid.addWidget(self.value, 3, 1)
grid.setRowStretch(1, 1)
grid.setRowStretch(2, 0)
grid.setRowStretch(3, 0)
grid.setRowStretch(4, 1)
self.override.clicked.connect(self.override_toggled)
self.level.clicked.connect(self.level_toggled)
def setValueText(self, override, level):
value_s = "1" if level else "0"
if override:
value_s = "<b>" + value_s + "</b>"
color = " color=\"red\""
else:
color = ""
self.value.setText("<font size=\"5\"{}>{}</font>".format(
color, value_s))
def setDirectionText(self, oe):
direction = "OUT" if oe else "IN"
self.direction.setText("<font size=\"2\">" + direction + "</font>")
def setButtonsState(self, override, level):
self.override.setChecked(override)
if override:
self.stack.setCurrentIndex(1)
self.level.setChecked(level)
def enterEvent(self, event):
self.stack.setCurrentIndex(1)
QtWidgets.QFrame.enterEvent(self, event)
def leaveEvent(self, event):
if not self.override.isChecked():
self.stack.setCurrentIndex(0)
QtWidgets.QFrame.leaveEvent(self, event)
class _TTLHandler:
def __init__(self, dm, channel, force_out, title):
self.channel = channel
self.force_out = force_out
self.set_mode = dm.ttl_set_mode
self.title = title
self.cur_level = False
self.cur_oe = False
self.cur_override = False
self.cur_override_level = False
self.widget = None
def override_toggled(self, override):
if override:
if self.widget.level.isChecked():
self.set_mode(self.channel, "1")
else:
self.set_mode(self.channel, "0")
else:
self.set_mode(self.channel, "exp")
def level_toggled(self, level):
if self.widget.override.isChecked():
if level:
self.set_mode(self.channel, "1")
else:
self.set_mode(self.channel, "0")
def refresh_display(self):
if self.widget is not None:
level = self.cur_override_level if self.cur_override else self.cur_level
oe = self.cur_oe or self.force_out
self.widget.setValueText(self.cur_override, level)
self.widget.setDirectionText(oe)
with QtCore.QSignalBlocker(self.widget):
self.widget.setButtonsState(self.cur_override, self.cur_level)
def delete_widget(self):
self.widget.deleteLater()
self.widget = None
def create_widget(self):
self.widget = _TTLWidget(self.title)
self.widget.override_toggled.connect(self.override_toggled)
self.widget.level_toggled.connect(self.level_toggled)
self.refresh_display()
def to_model_path(self):
return "ttl/{}".format(self.title)
class _DDSWidget(QtWidgets.QFrame):
apply_changes = QtCore.pyqtSignal()
off_clicked = QtCore.pyqtSignal()
set_clicked = QtCore.pyqtSignal()
def __init__(self, title, is_urukul):
QtWidgets.QFrame.__init__(self)
self.setFrameShape(QtWidgets.QFrame.Box)
self.setFrameShadow(QtWidgets.QFrame.Raised)
grid = QtWidgets.QGridLayout()
grid.setContentsMargins(0, 0, 0, 0)
grid.setHorizontalSpacing(0)
grid.setVerticalSpacing(0)
self.setLayout(grid)
label = QtWidgets.QLabel(title)
label.setAlignment(QtCore.Qt.AlignCenter)
grid.addWidget(label, 1, 1)
# FREQ DATA/EDIT FIELD
self.data_stack = QtWidgets.QStackedWidget()
# page 1: display data
grid_disp = LayoutWidget()
grid_disp.layout.setContentsMargins(0, 0, 0, 0)
grid_disp.layout.setHorizontalSpacing(0)
grid_disp.layout.setVerticalSpacing(0)
self.value_label = QtWidgets.QLabel()
self.value_label.setAlignment(QtCore.Qt.AlignCenter)
grid_disp.addWidget(self.value_label, 0, 1, 1, 2)
unit = QtWidgets.QLabel("MHz")
unit.setAlignment(QtCore.Qt.AlignCenter)
grid_disp.addWidget(unit, 0, 3, 1, 1)
self.data_stack.addWidget(grid_disp)
# page 2: edit data
grid_edit = LayoutWidget()
grid_edit.layout.setContentsMargins(0, 0, 0, 0)
grid_edit.layout.setHorizontalSpacing(0)
grid_edit.layout.setVerticalSpacing(0)
self.value_edit = _CancellableLineEdit(self)
self.value_edit.setAlignment(QtCore.Qt.AlignRight)
grid_edit.addWidget(self.value_edit, 0, 1, 1, 2)
unit = QtWidgets.QLabel("MHz")
unit.setAlignment(QtCore.Qt.AlignCenter)
grid_edit.addWidget(unit, 0, 3, 1, 1)
self.data_stack.addWidget(grid_edit)
grid.addWidget(self.data_stack, 2, 1)
# BUTTONS
self.button_stack = QtWidgets.QStackedWidget()
# page 1: SET button
set_grid = LayoutWidget()
set_btn = QtWidgets.QToolButton()
set_btn.setText("Set")
set_btn.setToolTip("Set frequency")
set_grid.addWidget(set_btn, 0, 1, 1, 1)
# for urukuls also allow switching off RF
if is_urukul:
off_btn = QtWidgets.QToolButton()
off_btn.setText("Off")
off_btn.setToolTip("Switch off the output")
set_grid.addWidget(off_btn, 0, 2, 1, 1)
self.button_stack.addWidget(set_grid)
# page 2: apply/cancel buttons
apply_grid = LayoutWidget()
apply = QtWidgets.QToolButton()
apply.setText("Apply")
apply.setToolTip("Apply changes")
apply_grid.addWidget(apply, 0, 1, 1, 1)
cancel = QtWidgets.QToolButton()
cancel.setText("Cancel")
cancel.setToolTip("Cancel changes")
apply_grid.addWidget(cancel, 0, 2, 1, 1)
self.button_stack.addWidget(apply_grid)
grid.addWidget(self.button_stack, 3, 1)
grid.setRowStretch(1, 1)
grid.setRowStretch(2, 1)
grid.setRowStretch(3, 1)
set_btn.clicked.connect(self.set_clicked)
apply.clicked.connect(self.apply_changes)
if is_urukul:
off_btn.clicked.connect(self.off_clicked)
off_btn.setToolTip(textwrap.dedent(
"""Note: If TTL RTIO sw for the channel is switched high,
this button will not disable the channel.
Use the TTL override instead."""))
self.value_edit.returnPressed.connect(self.apply_changes)
def cancel_changes(cancel):
self.set_page(0)
self.value_edit.escapePressedConnect(cancel_changes)
cancel.clicked.connect(cancel_changes)
def set_page(self, page):
self.data_stack.setCurrentIndex(page)
self.button_stack.setCurrentIndex(page)
def get_value(self):
return float(self.value_edit.text())
def set_edit_value(self, value):
self.value_edit.setText("{:.7f}".format(value))
def set_value(self, value):
self.value_label.setText("<font size=\"4\">{:.7f}</font>".format(value))
self.set_edit_value(value)
def start_edit(self):
self.value_edit.setFocus()
self.value_edit.selectAll()
class _DDSHandler:
def __init__(self, dm, title, bus_channel, channel, dds_type,
ref_clk, cpld=None, pll=1, clk_div=0):
self.dm = dm
self.title = title
self.bus_channel = bus_channel
self.channel = channel
self.cur_frequency = 0
self.dds_name = title
self.cpld = cpld
self.cur_frequency = 0
self.cur_reg = 0
self.dds_type = dds_type
self.is_urukul = dds_type in ["AD9910", "AD9912"]
if dds_type == "AD9914":
self.ftw_per_hz = 2**32 / ref_clk
else:
if dds_type == "AD9910":
max_freq = 1 << 32
clk_mult = [4, 1, 2, 4]
elif dds_type == "AD9912": # AD9912
max_freq = 1 << 48
clk_mult = [1, 1, 2, 4]
else:
raise NotImplementedError
sysclk = ref_clk / clk_mult[clk_div] * pll
self.ftw_per_hz = 1 / sysclk * max_freq
self.widget = None
def create_widget(self):
self.widget = _DDSWidget(self.title, self.is_urukul)
self.widget.apply_changes.connect(self.apply_changes)
self.widget.off_clicked.connect(self.off_clicked)
self.widget.set_clicked.connect(self.set_clicked)
self.refresh_display()
def monitor_update(self, probe, value):
if self.dds_type == "AD9912":
value = value << 16
self.cur_frequency = self._ftw_to_freq(value)
def _ftw_to_freq(self, ftw):
return ftw / self.ftw_per_hz
def delete_widget(self):
self.widget.deleteLater()
self.widget = None
def refresh_display(self):
if self.widget is not None:
self.widget.set_value(self.cur_frequency / 1e6)
def apply_changes(self):
self.widget.set_page(0)
frequency = self.widget.get_value() * 1e6
self.dm.dds_set_frequency(self.dds_name, frequency)
def set_clicked(self):
self.widget.set_page(1)
self.widget.set_edit_value(self.cur_frequency / 1e6)
self.widget.start_edit()
def off_clicked(self):
self.dm.dds_channel_toggle(self.dds_name, sw=False)
def to_model_path(self):
return "dds/{}".format(self.title)
class _DACWidget(QtWidgets.QFrame):
def __init__(self, channel, title):
QtWidgets.QFrame.__init__(self)
self.setFrameShape(QtWidgets.QFrame.Box)
self.setFrameShadow(QtWidgets.QFrame.Raised)
grid = QtWidgets.QGridLayout()
grid.setContentsMargins(0, 0, 0, 0)
grid.setHorizontalSpacing(0)
grid.setVerticalSpacing(0)
self.setLayout(grid)
label = QtWidgets.QLabel("{} ch{}".format(title, channel))
label.setAlignment(QtCore.Qt.AlignCenter)
grid.addWidget(label, 1, 1)
self.value = QtWidgets.QLabel()
self.value.setAlignment(QtCore.Qt.AlignCenter)
grid.addWidget(self.value, 2, 1, 6, 1)
grid.setRowStretch(1, 1)
grid.setRowStretch(2, 0)
grid.setRowStretch(3, 1)
def set_value(self, value):
self.value.setText("<font size=\"4\">{:.3f}</font><font size=\"2\"> %</font>"
.format(value))
class _DACHandler:
def __init__(self, dm, spi_channel, channel, title):
self.widget = None
self.cur_value = 0
self.spi_channel = spi_channel
self.channel = channel
self.title = title
def create_widget(self):
self.widget = _DACWidget(self.channel, self.title)
self.refresh_display()
def delete_widget(self):
self.widget.deleteLater()
self.widget = None
def refresh_display(self):
if self.widget is not None:
self.widget.set_value(self.cur_value * 100 / 2**16)
def to_model_path(self):
return "dac/{}/{}".format(self.title, self.channel)
class Model(DictSyncTreeSepModel):
def __init__(self, init):
DictSyncTreeSepModel.__init__(self, "/", ["Channels"], init)
def clear(self):
for k in self.backing_store:
self._del_item(self, k.split(self.separator))
self.backing_store.clear()
def update(self, d):
for k, v in d.items():
self[v.to_model_path()] = v
class _AddChannelDialog(QtWidgets.QDialog):
def __init__(self, parent, model):
QtWidgets.QDialog.__init__(self, parent=parent)
self.setContextMenuPolicy(QtCore.Qt.ActionsContextMenu)
self.setWindowTitle("Add channels")
layout = QtWidgets.QVBoxLayout()
self.setLayout(layout)
self._model = model
self._tree_view = QtWidgets.QTreeView()
self._tree_view.setHeaderHidden(True)
self._tree_view.setSelectionBehavior(
QtWidgets.QAbstractItemView.SelectItems)
self._tree_view.setSelectionMode(
QtWidgets.QAbstractItemView.ExtendedSelection)
self._tree_view.setModel(self._model)
layout.addWidget(self._tree_view)
self._button_box = QtWidgets.QDialogButtonBox(
QtWidgets.QDialogButtonBox.Ok | QtWidgets.QDialogButtonBox.Cancel
)
self._button_box.setCenterButtons(True)
self._button_box.accepted.connect(self.add_channels)
self._button_box.rejected.connect(self.reject)
layout.addWidget(self._button_box)
def add_channels(self):
selection = self._tree_view.selectedIndexes()
channels = []
for select in selection:
key = self._model.index_to_key(select)
if key is not None:
channels.append(self._model[key].ref)
self.channels = channels
self.accept()
_HandlerDesc = namedtuple("_HandlerDesc", "uid comment cls arguments")
def setup_from_ddb(ddb):
mi_addr = None
mi_port = None
dds_sysclk = None
description = set()
for k, v in ddb.items():
try:
if isinstance(v, dict):
comment = v.get("comment")
if v["type"] == "local":
if v["module"] == "artiq.coredevice.ttl":
if "ttl_urukul" in k:
continue
channel = v["arguments"]["channel"]
force_out = v["class"] == "TTLOut"
handler = _HandlerDesc(k, comment, _TTLHandler, (channel, force_out, k))
description.add(handler)
elif (v["module"] == "artiq.coredevice.ad9914" and v["class"] == "AD9914"):
bus_channel = v["arguments"]["bus_channel"]
channel = v["arguments"]["channel"]
dds_sysclk = v["arguments"]["sysclk"]
handler = _HandlerDesc(k, comment, _DDSHandler,
(k, bus_channel, channel, v["class"], dds_sysclk))
description.add(handler)
elif (v["module"] == "artiq.coredevice.ad9910" and v["class"] == "AD9910") or \
(v["module"] == "artiq.coredevice.ad9912" and v["class"] == "AD9912"):
channel = v["arguments"]["chip_select"] - 4
if channel < 0:
continue
dds_cpld = v["arguments"]["cpld_device"]
spi_dev = ddb[dds_cpld]["arguments"]["spi_device"]
bus_channel = ddb[spi_dev]["arguments"]["channel"]
pll = v["arguments"]["pll_n"]
refclk = ddb[dds_cpld]["arguments"]["refclk"]
clk_div = v["arguments"].get("clk_div", 0)
handler = _HandlerDesc(k, comment, _DDSHandler,
(k, bus_channel, channel, v["class"], refclk,
dds_cpld, pll, clk_div))
description.add(handler)
elif (v["module"] == "artiq.coredevice.ad53xx" and v["class"] == "AD53xx") or \
(v["module"] == "artiq.coredevice.zotino" and v["class"] == "Zotino"):
spi_device = v["arguments"]["spi_device"]
spi_device = ddb[spi_device]
while isinstance(spi_device, str):
spi_device = ddb[spi_device]
spi_channel = spi_device["arguments"]["channel"]
for channel in range(32):
handler = _HandlerDesc((k, channel), comment, _DACHandler,
(spi_channel, channel, k))
description.add(handler)
elif v["type"] == "controller" and k == "core_moninj":
mi_addr = v["host"]
mi_port = v.get("port_proxy", 1383)
except KeyError:
pass
return mi_addr, mi_port, description
class _DeviceManager:
def __init__(self, schedule_ctl):
self.mi_addr = None
self.mi_port = None
self.reconnect_mi = asyncio.Event()
self.mi_connection = None
self.mi_connector_task = asyncio.ensure_future(self.mi_connector())
self.schedule_ctl = schedule_ctl
self.ddb = dict()
self.description = set()
self.handlers_by_uid = dict()
self.dds_sysclk = 0
self.ttl_handlers = dict()
self.dds_handlers = dict()
self.dac_handlers = dict()
self.channels_cb = lambda: None
def init_ddb(self, ddb):
self.ddb = ddb
def notify_ddb(self, mod):
mi_addr, mi_port, description = setup_from_ddb(self.ddb)
if (mi_addr, mi_port) != (self.mi_addr, self.mi_port):
self.mi_addr = mi_addr
self.mi_port = mi_port
self.reconnect_mi.set()
for to_remove in self.description - description:
handler = self.handlers_by_uid[to_remove.uid]
del self.handlers_by_uid[to_remove.uid]
if isinstance(handler, _TTLHandler):
self.setup_ttl_monitoring(False, handler.channel)
handler.delete_widget()
del self.ttl_handlers[handler.channel]
elif isinstance(handler, _DDSHandler):
self.setup_dds_monitoring(False, handler.bus_channel, handler.channel)
handler.delete_widget()
del self.dds_handlers[(handler.bus_channel, handler.channel)]
elif isinstance(handler, _DACHandler):
self.setup_dac_monitoring(False, handler.spi_channel, handler.channel)
handler.delete_widget()
del self.dac_handlers[(handler.spi_channel, handler.channel)]
else:
raise ValueError
for to_add in description - self.description:
handler = to_add.cls(self, *to_add.arguments)
handler.create_widget()
if to_add.comment is not None:
handler.widget.setToolTip(to_add.comment)
self.handlers_by_uid[to_add.uid] = handler
if isinstance(handler, _TTLHandler):
self.ttl_handlers[handler.channel] = handler
self.setup_ttl_monitoring(True, handler.channel)
elif isinstance(handler, _DDSHandler):
self.dds_handlers[(handler.bus_channel, handler.channel)] = handler
self.setup_dds_monitoring(True, handler.bus_channel, handler.channel)
elif isinstance(handler, _DACHandler):
self.dac_handlers[(handler.spi_channel, handler.channel)] = handler
self.setup_dac_monitoring(True, handler.spi_channel, handler.channel)
else:
raise ValueError
if description != self.description:
self.channels_cb()
self.description = description
def ttl_set_mode(self, channel, mode):
if self.mi_connection is not None:
handler = self.ttl_handlers[channel]
if mode == "0":
handler.cur_override = True
handler.cur_level = False
self.mi_connection.inject(channel, TTLOverride.level.value, 0)
self.mi_connection.inject(channel, TTLOverride.oe.value, 1)
self.mi_connection.inject(channel, TTLOverride.en.value, 1)
elif mode == "1":
handler.cur_override = True
handler.cur_level = True
self.mi_connection.inject(channel, TTLOverride.level.value, 1)
self.mi_connection.inject(channel, TTLOverride.oe.value, 1)
self.mi_connection.inject(channel, TTLOverride.en.value, 1)
elif mode == "exp":
handler.cur_override = False
self.mi_connection.inject(channel, TTLOverride.en.value, 0)
else:
raise ValueError
# override state may have changed
handler.refresh_display()
async def _submit_by_content(self, content, class_name, title):
expid = {
"log_level": logging.WARNING,
"content": content,
"class_name": class_name,
"arguments": {}
}
scheduling = {
"pipeline_name": "main",
"priority": 0,
"due_date": None,
"flush": False
}
rid = await self.schedule_ctl.submit(
scheduling["pipeline_name"],
expid,
scheduling["priority"], scheduling["due_date"],
scheduling["flush"])
logger.info("Submitted '%s', RID is %d", title, rid)
def _dds_faux_injection(self, dds_channel, action, title, log_msg):
handler = self.handlers_by_uid[dds_channel]
# create kernel and fill it in and send-by-content
# initialize CPLD (if applicable)
if handler.is_urukul:
# urukuls need CPLD init and switch to on
cpld_dev = """self.setattr_device("core_cache")
self.setattr_device("{}")""".format(handler.cpld)
# `sta`/`rf_sw`` variables are guaranteed for urukuls
# so {action} can use it
# if there's no RF enabled, CPLD may have not been initialized
# but if there is, it has been initialised - no need to do again
cpld_init = """delay(15*ms)
was_init = self.core_cache.get("_{cpld}_init")
sta = self.{cpld}.sta_read()
rf_sw = urukul_sta_rf_sw(sta)
if rf_sw == 0 and len(was_init) == 0:
delay(15*ms)
self.{cpld}.init()
self.core_cache.put("_{cpld}_init", [1])
""".format(cpld=handler.cpld)
else:
cpld_dev = ""
cpld_init = ""
# AD9912/9910: init channel (if uninitialized)
if handler.dds_type == "AD9912":
# 0xFF before init, 0x99 after
channel_init = """
if self.{dds_channel}.read({cfgreg}, length=1) == 0xFF:
delay(10*ms)
self.{dds_channel}.init()
""".format(dds_channel=dds_channel, cfgreg=AD9912_SER_CONF)
elif handler.dds_type == "AD9910":
# -1 before init, 2 after
channel_init = """
if self.{dds_channel}.read32({cfgreg}) == -1:
delay(10*ms)
self.{dds_channel}.init()
""".format(dds_channel=dds_channel, cfgreg=AD9912_SER_CONF)
else:
channel_init = "self.{dds_channel}.init()".format(dds_channel=dds_channel)
dds_exp = textwrap.dedent("""
from artiq.experiment import *
from artiq.coredevice.urukul import *
class {title}(EnvExperiment):
def build(self):
self.setattr_device("core")
self.setattr_device("{dds_channel}")
{cpld_dev}
@kernel
def run(self):
self.core.break_realtime()
{cpld_init}
delay(10*ms)
{channel_init}
delay(15*ms)
{action}
""".format(title=title, action=action,
dds_channel=dds_channel,
cpld_dev=cpld_dev, cpld_init=cpld_init,
channel_init=channel_init))
asyncio.ensure_future(
self._submit_by_content(
dds_exp,
title,
log_msg))
def dds_set_frequency(self, dds_channel, freq):
handler = self.handlers_by_uid[dds_channel]
action = "self.{ch}.set({freq})".format(
freq=freq, ch=dds_channel)
if handler.is_urukul:
action += """
ch_no = self.{ch}.chip_select - 4
self.{cpld}.cfg_switches(rf_sw | 1 << ch_no)
""".format(ch=dds_channel, cpld=handler.cpld)
self._dds_faux_injection(
dds_channel,
action,
"SetDDS",
"Set DDS {} {}MHz".format(dds_channel, freq / 1e6))
def dds_channel_toggle(self, dds_channel, sw=True):
handler = self.handlers_by_uid[dds_channel]
# urukul only
if sw:
switch = "| 1 << ch_no"
else:
switch = "& ~(1 << ch_no)"
action = """
ch_no = self.{dds_channel}.chip_select - 4
self.{cpld}.cfg_switches(rf_sw {switch})
""".format(
dds_channel=dds_channel,
cpld=handler.cpld,
switch=switch
)
self._dds_faux_injection(
dds_channel,
action,
"ToggleDDS",
"Toggle DDS {} {}".format(dds_channel, "on" if sw else "off"))
def setup_ttl_monitoring(self, enable, channel):
if self.mi_connection is not None:
self.mi_connection.monitor_probe(enable, channel, TTLProbe.level.value)
self.mi_connection.monitor_probe(enable, channel, TTLProbe.oe.value)
self.mi_connection.monitor_injection(enable, channel, TTLOverride.en.value)
self.mi_connection.monitor_injection(enable, channel, TTLOverride.level.value)
if enable:
self.mi_connection.get_injection_status(channel, TTLOverride.en.value)
def setup_dds_monitoring(self, enable, bus_channel, channel):
if self.mi_connection is not None:
self.mi_connection.monitor_probe(enable, bus_channel, channel)
def setup_dac_monitoring(self, enable, spi_channel, channel):
if self.mi_connection is not None:
self.mi_connection.monitor_probe(enable, spi_channel, channel)
def monitor_cb(self, channel, probe, value):
if channel in self.ttl_handlers:
handler = self.ttl_handlers[channel]
if probe == TTLProbe.level.value:
handler.cur_level = bool(value)
elif probe == TTLProbe.oe.value:
handler.cur_oe = bool(value)
handler.refresh_display()
elif (channel, probe) in self.dds_handlers:
handler = self.dds_handlers[(channel, probe)]
handler.monitor_update(probe, value)
handler.refresh_display()
elif (channel, probe) in self.dac_handlers:
handler = self.dac_handlers[(channel, probe)]
handler.cur_value = value
handler.refresh_display()
def injection_status_cb(self, channel, override, value):
if channel in self.ttl_handlers:
handler = self.ttl_handlers[channel]
if override == TTLOverride.en.value:
handler.cur_override = bool(value)
if override == TTLOverride.level.value:
handler.cur_override_level = bool(value)
handler.refresh_display()
def disconnect_cb(self):
logger.error("lost connection to moninj")
self.reconnect_mi.set()
async def mi_connector(self):
while True:
await self.reconnect_mi.wait()
self.reconnect_mi.clear()
if self.mi_connection is not None:
await self.mi_connection.close()
self.mi_connection = None
new_mi_connection = CommMonInj(self.monitor_cb, self.injection_status_cb,
self.disconnect_cb)
try:
await new_mi_connection.connect(self.mi_addr, self.mi_port)
except Exception:
logger.error("failed to connect to moninj. Is aqctl_moninj_proxy running?",
exc_info=True)
await asyncio.sleep(10.)
self.reconnect_mi.set()
else:
logger.info("ARTIQ dashboard connected to moninj (%s)",
self.mi_addr)
self.mi_connection = new_mi_connection
for ttl_channel in self.ttl_handlers.keys():
self.setup_ttl_monitoring(True, ttl_channel)
for bus_channel, channel in self.dds_handlers.keys():
self.setup_dds_monitoring(True, bus_channel, channel)
for spi_channel, channel in self.dac_handlers.keys():
self.setup_dac_monitoring(True, spi_channel, channel)
async def close(self):
self.mi_connector_task.cancel()
try:
await asyncio.wait_for(self.mi_connector_task, None)
except asyncio.CancelledError:
pass
if self.mi_connection is not None:
await self.mi_connection.close()
class MonInjDock(QtWidgets.QDockWidget):
def __init__(self, schedule_ctl):
QtWidgets.QDockWidget.__init__(self, "MonInj")
self.setObjectName("MonInj")
self.setFeatures(QtWidgets.QDockWidget.DockWidgetMovable |
QtWidgets.QDockWidget.DockWidgetFloatable)
self.dm = _DeviceManager(schedule_ctl)
self.dm.channels_cb = lambda: self.set_channels(self.dm.handlers_by_uid)
layout = LayoutWidget()
self.setWidget(layout)
self._channel_model = Model({})
self.add_channel_dialog = _AddChannelDialog(self, self._channel_model)
self.add_channel_dialog.accepted.connect(
lambda: self.layout_widgets(self.add_channel_dialog.channels))
add_channel_btn = QtWidgets.QToolButton()
add_channel_btn.setToolTip("Add channels...")
add_channel_btn.setIcon(
QtWidgets.QApplication.style().standardIcon(
QtWidgets.QStyle.SP_FileDialogListView))
add_channel_btn.clicked.connect(self.add_channel_dialog.open)
layout.addWidget(add_channel_btn, 0, 0, colspan=1)
scroll_area = QtWidgets.QScrollArea()
layout.addWidget(scroll_area, 1, 0, colspan=10)
self.grid = FlowLayout()
grid_widget = QtWidgets.QWidget()
grid_widget.setLayout(self.grid)
scroll_area.setWidgetResizable(True)
scroll_area.setWidget(grid_widget)
def layout_widgets(self, handlers):
for handler in handlers:
self.grid.addWidget(handler.widget)
def set_channels(self, handlers):
self._channel_model.update(handlers)
async def stop(self):
if self.dm is not None:
await self.dm.close()