diff --git a/artiq/applets/simple.py b/artiq/applets/simple.py index e5776310a..196b8f1e3 100644 --- a/artiq/applets/simple.py +++ b/artiq/applets/simple.py @@ -64,9 +64,10 @@ class AppletIPCClient(AsyncioChildComm): exc_info=True) self.close_cb() - def subscribe(self, datasets, init_cb, mod_cb): + def subscribe(self, datasets, init_cb, mod_cb, dataset_prefixes=[]): self.write_pyon({"action": "subscribe", - "datasets": datasets}) + "datasets": datasets, + "dataset_prefixes": dataset_prefixes}) self.init_cb = init_cb self.mod_cb = mod_cb asyncio.ensure_future(self.listen()) @@ -113,6 +114,9 @@ class SimpleApplet: self.embed = os.getenv("ARTIQ_APPLET_EMBED") self.datasets = {getattr(self.args, arg.replace("-", "_")) for arg in self.dataset_args} + # Optional prefixes (dataset sub-trees) to match subscriptions against; + # currently only used by out-of-tree subclasses (ndscan). + self.dataset_prefixes = [] def qasync_init(self): app = QtWidgets.QApplication([]) @@ -162,6 +166,14 @@ class SimpleApplet: self.data = data return data + def is_dataset_subscribed(self, key): + if key in self.datasets: + return True + for prefix in self.dataset_prefixes: + if key.startswith(prefix): + return True + return False + def filter_mod(self, mod): if self.embed is not None: # the parent already filters for us @@ -170,9 +182,9 @@ class SimpleApplet: if mod["action"] == "init": return True if mod["path"]: - return mod["path"][0] in self.datasets + return self.is_dataset_subscribed(mod["path"][0]) elif mod["action"] in {"setitem", "delitem"}: - return mod["key"] in self.datasets + return self.is_dataset_subscribed(mod["key"]) else: return False @@ -204,7 +216,8 @@ class SimpleApplet: self.loop.run_until_complete(self.subscriber.connect( self.args.server, self.args.port)) else: - self.ipc.subscribe(self.datasets, self.sub_init, self.sub_mod) + self.ipc.subscribe(self.datasets, self.sub_init, self.sub_mod, + dataset_prefixes=self.dataset_prefixes) def unsubscribe(self): if self.embed is None: diff --git a/artiq/coredevice/comm_kernel.py b/artiq/coredevice/comm_kernel.py index 1e7febd0d..54f0300e4 100644 --- a/artiq/coredevice/comm_kernel.py +++ b/artiq/coredevice/comm_kernel.py @@ -526,6 +526,9 @@ class CommKernel: self._skip_rpc_value(tags) elif tag == "r": self._skip_rpc_value(tags) + elif tag == "a": + _ndims = tags.pop(0) + self._skip_rpc_value(tags) else: pass diff --git a/artiq/coredevice/comm_moninj.py b/artiq/coredevice/comm_moninj.py index b5c2ee40d..e02da526c 100644 --- a/artiq/coredevice/comm_moninj.py +++ b/artiq/coredevice/comm_moninj.py @@ -46,6 +46,9 @@ class CommMonInj: del self._writer raise + def wait_terminate(self): + return self._receive_task + async def close(self): self.disconnect_cb = None try: @@ -91,6 +94,10 @@ class CommMonInj: self.injection_status_cb(channel, override, value) else: raise ValueError("Unknown packet type", ty) + except asyncio.CancelledError: + raise + except: + logger.error("Moninj connection terminating with exception", exc_info=True) finally: if self.disconnect_cb is not None: self.disconnect_cb() diff --git a/artiq/dashboard/experiments.py b/artiq/dashboard/experiments.py index 871894a66..3c9d5edac 100644 --- a/artiq/dashboard/experiments.py +++ b/artiq/dashboard/experiments.py @@ -164,7 +164,7 @@ class _ArgumentEditor(QtWidgets.QTreeWidget): async def _recompute_argument(self, name): try: - expdesc = await self.manager.compute_expdesc(self.expurl) + expdesc, _ = await self.manager.compute_expdesc(self.expurl) except: logger.error("Could not recompute argument '%s' of '%s'", name, self.expurl, exc_info=True) @@ -216,6 +216,15 @@ class _ArgumentEditor(QtWidgets.QTreeWidget): pass self.verticalScrollBar().setValue(state["scroll"]) + # Hooks that allow user-supplied argument editors to react to imminent user + # actions. Here, we always keep the manager-stored submission arguments + # up-to-date, so no further action is required. + def about_to_submit(self): + pass + + def about_to_close(self): + pass + log_levels = ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"] @@ -241,7 +250,8 @@ class _ExperimentDock(QtWidgets.QMdiSubWindow): self.manager = manager self.expurl = expurl - self.argeditor = _ArgumentEditor(self.manager, self, self.expurl) + editor_class = self.manager.get_argument_editor_class(expurl) + self.argeditor = editor_class(self.manager, self, self.expurl) self.layout.addWidget(self.argeditor, 0, 0, 1, 5) self.layout.setRowStretch(0, 1) @@ -369,6 +379,7 @@ class _ExperimentDock(QtWidgets.QMdiSubWindow): self.hdf5_load_directory = os.path.expanduser("~") def submit_clicked(self): + self.argeditor.about_to_submit() try: self.manager.submit(self.expurl) except: @@ -391,7 +402,7 @@ class _ExperimentDock(QtWidgets.QMdiSubWindow): async def _recompute_arguments_task(self, overrides=dict()): try: - expdesc = await self.manager.compute_expdesc(self.expurl) + expdesc, ui_name = await self.manager.compute_expdesc(self.expurl) except: logger.error("Could not recompute experiment description of '%s'", self.expurl, exc_info=True) @@ -404,12 +415,13 @@ class _ExperimentDock(QtWidgets.QMdiSubWindow): arginfo[k][0]["default"].insert(0, v) else: arginfo[k][0]["default"] = v - self.manager.initialize_submission_arguments(self.expurl, arginfo) + self.manager.initialize_submission_arguments(self.expurl, arginfo, ui_name) argeditor_state = self.argeditor.save_state() self.argeditor.deleteLater() - self.argeditor = _ArgumentEditor(self.manager, self, self.expurl) + editor_class = self.manager.get_argument_editor_class(self.expurl) + self.argeditor = editor_class(self.manager, self, self.expurl) self.argeditor.restore_state(argeditor_state) self.layout.addWidget(self.argeditor, 0, 0, 1, 5) @@ -422,7 +434,7 @@ class _ExperimentDock(QtWidgets.QMdiSubWindow): async def _recompute_sched_options_task(self): try: - expdesc = await self.manager.compute_expdesc(self.expurl) + expdesc, _ = await self.manager.compute_expdesc(self.expurl) except: logger.error("Could not recompute experiment description of '%s'", self.expurl, exc_info=True) @@ -473,6 +485,7 @@ class _ExperimentDock(QtWidgets.QMdiSubWindow): await self._recompute_arguments_task(arguments) def closeEvent(self, event): + self.argeditor.about_to_close() self.sigClosed.emit() QtWidgets.QMdiSubWindow.closeEvent(self, event) @@ -544,7 +557,13 @@ class _QuickOpenDialog(QtWidgets.QDialog): class ExperimentManager: - def __init__(self, main_window, + #: Global registry for custom argument editor classes, indexed by the experiment + #: `argument_ui` string; can be populated by dashboard plugins such as ndscan. + #: If no handler for a requested UI name is found, the default built-in argument + #: editor will be used. + argument_ui_classes = dict() + + def __init__(self, main_window, dataset_sub, explist_sub, schedule_sub, schedule_ctl, experiment_db_ctl): self.main_window = main_window @@ -555,7 +574,10 @@ class ExperimentManager: self.submission_scheduling = dict() self.submission_options = dict() self.submission_arguments = dict() + self.argument_ui_names = dict() + self.datasets = dict() + dataset_sub.add_setmodel_callback(self.set_dataset_model) self.explist = dict() explist_sub.add_setmodel_callback(self.set_explist_model) self.schedule = dict() @@ -570,6 +592,9 @@ class ExperimentManager: quick_open_shortcut.setContext(QtCore.Qt.ApplicationShortcut) quick_open_shortcut.activated.connect(self.show_quick_open) + def set_dataset_model(self, model): + self.datasets = model + def set_explist_model(self, model): self.explist = model.backing_store @@ -586,6 +611,17 @@ class ExperimentManager: else: raise ValueError("Malformed experiment URL") + def get_argument_editor_class(self, expurl): + ui_name = self.argument_ui_names.get(expurl, None) + if not ui_name and expurl[:5] == "repo:": + ui_name = self.explist.get(expurl[5:], {}).get("argument_ui", None) + if ui_name: + result = self.argument_ui_classes.get(ui_name, None) + if result: + return result + logger.warning("Ignoring unknown argument UI '%s'", ui_name) + return _ArgumentEditor + def get_submission_scheduling(self, expurl): if expurl in self.submission_scheduling: return self.submission_scheduling[expurl] @@ -615,7 +651,7 @@ class ExperimentManager: self.submission_options[expurl] = options return options - def initialize_submission_arguments(self, expurl, arginfo): + def initialize_submission_arguments(self, expurl, arginfo, ui_name): arguments = OrderedDict() for name, (procdesc, group, tooltip) in arginfo.items(): state = procdesc_to_entry(procdesc).default_state(procdesc) @@ -626,6 +662,7 @@ class ExperimentManager: "state": state, # mutated by entries } self.submission_arguments[expurl] = arguments + self.argument_ui_names[expurl] = ui_name return arguments def get_submission_arguments(self, expurl): @@ -635,9 +672,9 @@ class ExperimentManager: if expurl[:5] != "repo:": raise ValueError("Submission arguments must be preinitialized " "when not using repository") - arginfo = self.explist[expurl[5:]]["arginfo"] - arguments = self.initialize_submission_arguments(expurl, arginfo) - return arguments + class_desc = self.explist[expurl[5:]] + return self.initialize_submission_arguments(expurl, + class_desc["arginfo"], class_desc.get("argument_ui", None)) def open_experiment(self, expurl): if expurl in self.open_experiments: @@ -739,13 +776,15 @@ class ExperimentManager: revision = None description = await self.experiment_db_ctl.examine( file, use_repository, revision) - return description[class_name] + class_desc = description[class_name] + return class_desc, class_desc.get("argument_ui", None) async def open_file(self, file): description = await self.experiment_db_ctl.examine(file, False) for class_name, class_desc in description.items(): expurl = "file:{}@{}".format(class_name, file) - self.initialize_submission_arguments(expurl, class_desc["arginfo"]) + self.initialize_submission_arguments(expurl, class_desc["arginfo"], + class_desc.get("argument_ui", None)) if expurl in self.open_experiments: self.open_experiments[expurl].close() self.open_experiment(expurl) @@ -758,6 +797,7 @@ class ExperimentManager: "options": self.submission_options, "arguments": self.submission_arguments, "docks": self.dock_states, + "argument_uis": self.argument_ui_names, "open_docks": set(self.open_experiments.keys()) } @@ -768,6 +808,7 @@ class ExperimentManager: self.submission_scheduling = state["scheduling"] self.submission_options = state["options"] self.submission_arguments = state["arguments"] + self.argument_ui_names = state.get("argument_uis", {}) for expurl in state["open_docks"]: self.open_experiment(expurl) diff --git a/artiq/dashboard/moninj.py b/artiq/dashboard/moninj.py index c624529af..df5854259 100644 --- a/artiq/dashboard/moninj.py +++ b/artiq/dashboard/moninj.py @@ -1,5 +1,6 @@ import asyncio import logging +import textwrap from collections import namedtuple from PyQt5 import QtCore, QtWidgets, QtGui @@ -13,6 +14,16 @@ from artiq.gui.flowlayout import FlowLayout logger = logging.getLogger(__name__) +class _CancellableLineEdit(QtWidgets.QLineEdit): + def escapePressedConnect(self, cb): + self.esc_cb = cb + + def keyPressEvent(self, event): + key = event.key() + if key == QtCore.Qt.Key_Escape: + self.esc_cb(event) + QtWidgets.QLineEdit.keyPressEvent(self, event) + class _TTLWidget(QtWidgets.QFrame): def __init__(self, dm, channel, force_out, title): @@ -168,15 +179,136 @@ class _SimpleDisplayWidget(QtWidgets.QFrame): raise NotImplementedError -class _DDSWidget(_SimpleDisplayWidget): - def __init__(self, dm, bus_channel, channel, title): +class _DDSWidget(QtWidgets.QFrame): + def __init__(self, dm, title, bus_channel=0, channel=0, cpld=None): + self.dm = dm self.bus_channel = bus_channel self.channel = channel + self.dds_name = title + self.cpld = cpld self.cur_frequency = 0 - _SimpleDisplayWidget.__init__(self, title) + + QtWidgets.QFrame.__init__(self) + + self.setFrameShape(QtWidgets.QFrame.Box) + self.setFrameShadow(QtWidgets.QFrame.Raised) + + grid = QtWidgets.QGridLayout() + grid.setContentsMargins(0, 0, 0, 0) + grid.setHorizontalSpacing(0) + grid.setVerticalSpacing(0) + self.setLayout(grid) + label = QtWidgets.QLabel(title) + label.setAlignment(QtCore.Qt.AlignCenter) + grid.addWidget(label, 1, 1) + + # FREQ DATA/EDIT FIELD + self.data_stack = QtWidgets.QStackedWidget() + + # page 1: display data + grid_disp = LayoutWidget() + grid_disp.layout.setContentsMargins(0, 0, 0, 0) + grid_disp.layout.setHorizontalSpacing(0) + grid_disp.layout.setVerticalSpacing(0) + + self.value_label = QtWidgets.QLabel() + self.value_label.setAlignment(QtCore.Qt.AlignCenter) + grid_disp.addWidget(self.value_label, 0, 1, 1, 2) + + unit = QtWidgets.QLabel("MHz") + unit.setAlignment(QtCore.Qt.AlignCenter) + grid_disp.addWidget(unit, 0, 3, 1, 1) + + self.data_stack.addWidget(grid_disp) + + # page 2: edit data + grid_edit = LayoutWidget() + grid_edit.layout.setContentsMargins(0, 0, 0, 0) + grid_edit.layout.setHorizontalSpacing(0) + grid_edit.layout.setVerticalSpacing(0) + + self.value_edit = _CancellableLineEdit(self) + self.value_edit.setAlignment(QtCore.Qt.AlignRight) + grid_edit.addWidget(self.value_edit, 0, 1, 1, 2) + unit = QtWidgets.QLabel("MHz") + unit.setAlignment(QtCore.Qt.AlignCenter) + grid_edit.addWidget(unit, 0, 3, 1, 1) + self.data_stack.addWidget(grid_edit) + + grid.addWidget(self.data_stack, 2, 1) + + # BUTTONS + self.button_stack = QtWidgets.QStackedWidget() + + # page 1: SET button + set_grid = LayoutWidget() + + set_btn = QtWidgets.QToolButton() + set_btn.setText("Set") + set_btn.setToolTip("Set frequency") + set_grid.addWidget(set_btn, 0, 1, 1, 1) + + # for urukuls also allow switching off RF + if self.cpld: + off_btn = QtWidgets.QToolButton() + off_btn.setText("Off") + off_btn.setToolTip("Switch off the output") + set_grid.addWidget(off_btn, 0, 2, 1, 1) + + self.button_stack.addWidget(set_grid) + + # page 2: apply/cancel buttons + apply_grid = LayoutWidget() + apply = QtWidgets.QToolButton() + apply.setText("Apply") + apply.setToolTip("Apply changes") + apply_grid.addWidget(apply, 0, 1, 1, 1) + cancel = QtWidgets.QToolButton() + cancel.setText("Cancel") + cancel.setToolTip("Cancel changes") + apply_grid.addWidget(cancel, 0, 2, 1, 1) + self.button_stack.addWidget(apply_grid) + grid.addWidget(self.button_stack, 3, 1) + + grid.setRowStretch(1, 1) + grid.setRowStretch(2, 1) + grid.setRowStretch(3, 1) + + set_btn.clicked.connect(self.set_clicked) + apply.clicked.connect(self.apply_changes) + if self.cpld: + off_btn.clicked.connect(self.off_clicked) + self.value_edit.returnPressed.connect(lambda: self.apply_changes(None)) + self.value_edit.escapePressedConnect(self.cancel_changes) + cancel.clicked.connect(self.cancel_changes) + + self.refresh_display() + + def set_clicked(self, set): + self.data_stack.setCurrentIndex(1) + self.button_stack.setCurrentIndex(1) + self.value_edit.setText("{:.7f}" + .format(self.cur_frequency/1e6)) + self.value_edit.setFocus() + self.value_edit.selectAll() + + def off_clicked(self, set): + self.dm.dds_channel_toggle(self.dds_name, self.cpld, sw=False) + + def apply_changes(self, apply): + self.data_stack.setCurrentIndex(0) + self.button_stack.setCurrentIndex(0) + frequency = float(self.value_edit.text())*1e6 + self.dm.dds_set_frequency(self.dds_name, self.cpld, frequency) + + def cancel_changes(self, cancel): + self.data_stack.setCurrentIndex(0) + self.button_stack.setCurrentIndex(0) def refresh_display(self): - self.value.setText("{:.7f} MHz" + self.value_label.setText("{:.7f}" + .format(self.cur_frequency/1e6)) + self.value_edit.setText("{:.7f}" .format(self.cur_frequency/1e6)) def sort_key(self): @@ -213,6 +345,8 @@ def setup_from_ddb(ddb): comment = v.get("comment") if v["type"] == "local": if v["module"] == "artiq.coredevice.ttl": + if "ttl_urukul" in k: + continue channel = v["arguments"]["channel"] force_out = v["class"] == "TTLOut" widget = _WidgetDesc(k, comment, _TTLWidget, (channel, force_out, k)) @@ -222,8 +356,20 @@ def setup_from_ddb(ddb): bus_channel = v["arguments"]["bus_channel"] channel = v["arguments"]["channel"] dds_sysclk = v["arguments"]["sysclk"] - widget = _WidgetDesc(k, comment, _DDSWidget, (bus_channel, channel, k)) + widget = _WidgetDesc(k, comment, _DDSWidget, (k, bus_channel, channel)) description.add(widget) + elif (v["module"] == "artiq.coredevice.ad9910" + and v["class"] == "AD9910") or \ + (v["module"] == "artiq.coredevice.ad9912" + and v["class"] == "AD9912"): + channel = v["arguments"]["chip_select"] - 4 + if channel < 0: + continue + dds_cpld = v["arguments"]["cpld_device"] + spi_dev = ddb[dds_cpld]["arguments"]["spi_device"] + bus_channel = ddb[spi_dev]["arguments"]["channel"] + widget = _WidgetDesc(k, comment, _DDSWidget, (k, bus_channel, channel, dds_cpld)) + description.add(widget) elif ( (v["module"] == "artiq.coredevice.ad53xx" and v["class"] == "AD53xx") or (v["module"] == "artiq.coredevice.zotino" and v["class"] == "Zotino")): spi_device = v["arguments"]["spi_device"] @@ -243,13 +389,15 @@ def setup_from_ddb(ddb): class _DeviceManager: - def __init__(self): + def __init__(self, schedule_ctl): self.mi_addr = None self.mi_port = None self.reconnect_mi = asyncio.Event() self.mi_connection = None self.mi_connector_task = asyncio.ensure_future(self.mi_connector()) + self.schedule_ctl = schedule_ctl + self.ddb = dict() self.description = set() self.widgets_by_uid = dict() @@ -344,6 +492,108 @@ class _DeviceManager: # override state may have changed widget.refresh_display() + async def _submit_by_content(self, content, class_name, title): + expid = { + "log_level": logging.WARNING, + "content": content, + "class_name": class_name, + "arguments": [] + } + scheduling = { + "pipeline_name": "main", + "priority": 0, + "due_date": None, + "flush": False + } + rid = await self.schedule_ctl.submit( + scheduling["pipeline_name"], + expid, + scheduling["priority"], scheduling["due_date"], + scheduling["flush"]) + logger.info("Submitted '%s', RID is %d", title, rid) + + def dds_set_frequency(self, dds_channel, dds_cpld, freq): + # create kernel and fill it in and send-by-content + if dds_cpld: + # urukuls need CPLD init and switch to on + # keep previous config if it was set already + cpld_dev = """self.setattr_device("core_cache") + self.setattr_device("{}")""".format(dds_cpld) + cpld_init = """cfg = self.core_cache.get("_{cpld}_cfg") + if len(cfg) > 0: + self.{cpld}.cfg_reg = cfg[0] + else: + self.{cpld}.init() + self.core_cache.put("_{cpld}_cfg", [self.{cpld}.cfg_reg]) + cfg = self.core_cache.get("_{cpld}_cfg") + """.format(cpld=dds_cpld) + cfg_sw = """self.{}.cfg_sw(True) + cfg[0] = self.{}.cfg_reg + """.format(dds_channel, dds_cpld) + else: + cpld_dev = "" + cpld_init = "" + cfg_sw = "" + dds_exp = textwrap.dedent(""" + from artiq.experiment import * + + class SetDDS(EnvExperiment): + def build(self): + self.setattr_device("core") + self.setattr_device("{dds_channel}") + {cpld_dev} + + @kernel + def run(self): + self.core.break_realtime() + delay(2*ms) + {cpld_init} + self.{dds_channel}.init() + self.{dds_channel}.set({freq}) + {cfg_sw} + """.format(dds_channel=dds_channel, freq=freq, + cpld_dev=cpld_dev, cpld_init=cpld_init, + cfg_sw=cfg_sw)) + asyncio.ensure_future( + self._submit_by_content( + dds_exp, + "SetDDS", + "Set DDS {} {}MHz".format(dds_channel, freq/1e6))) + + def dds_channel_toggle(self, dds_channel, dds_cpld, sw=True): + # urukul only + toggle_exp = textwrap.dedent(""" + from artiq.experiment import * + + class ToggleDDS(EnvExperiment): + def build(self): + self.setattr_device("core") + self.setattr_device("{ch}") + self.setattr_device("core_cache") + self.setattr_device("{cpld}") + + @kernel + def run(self): + self.core.break_realtime() + delay(2*ms) + cfg = self.core_cache.get("_{cpld}_cfg") + if len(cfg) > 0: + self.{cpld}.cfg_reg = cfg[0] + else: + self.{cpld}.init() + self.core_cache.put("_{cpld}_cfg", [self.{cpld}.cfg_reg]) + cfg = self.core_cache.get("_{cpld}_cfg") + self.{ch}.init() + self.{ch}.cfg_sw({sw}) + cfg[0] = self.{cpld}.cfg_reg + """.format(ch=dds_channel, cpld=dds_cpld, sw=sw)) + asyncio.ensure_future( + self._submit_by_content( + toggle_exp, + "ToggleDDS", + "Toggle DDS {} {}".format(dds_channel, "on" if sw else "off")) + ) + def setup_ttl_monitoring(self, enable, channel): if self.mi_connection is not None: self.mi_connection.monitor_probe(enable, channel, TTLProbe.level.value) @@ -410,6 +660,8 @@ class _DeviceManager: await asyncio.sleep(10.) self.reconnect_mi.set() else: + logger.info("ARTIQ dashboard connected to moninj proxy (%s)", + self.mi_addr) self.mi_connection = new_mi_connection for ttl_channel in self.ttl_widgets.keys(): self.setup_ttl_monitoring(True, ttl_channel) @@ -451,12 +703,12 @@ class _MonInjDock(QtWidgets.QDockWidget): class MonInj: - def __init__(self): + def __init__(self, schedule_ctl): self.ttl_dock = _MonInjDock("TTL") self.dds_dock = _MonInjDock("DDS") self.dac_dock = _MonInjDock("DAC") - self.dm = _DeviceManager() + self.dm = _DeviceManager(schedule_ctl) self.dm.ttl_cb = lambda: self.ttl_dock.layout_widgets( self.dm.ttl_widgets.values()) self.dm.dds_cb = lambda: self.dds_dock.layout_widgets( diff --git a/artiq/firmware/runtime/rtio_mgt.rs b/artiq/firmware/runtime/rtio_mgt.rs index 2869cf744..c4ca665d4 100644 --- a/artiq/firmware/runtime/rtio_mgt.rs +++ b/artiq/firmware/runtime/rtio_mgt.rs @@ -6,6 +6,9 @@ use board_misoc::clock; use board_artiq::drtio_routing; use sched::Io; use sched::Mutex; +const ASYNC_ERROR_COLLISION: u8 = 1 << 0; +const ASYNC_ERROR_BUSY: u8 = 1 << 1; +const ASYNC_ERROR_SEQUENCE_ERROR: u8 = 1 << 2; #[cfg(has_drtio)] pub mod drtio { @@ -211,12 +214,18 @@ pub mod drtio { Ok(drtioaux::Packet::DestinationDownReply) => destination_set_up(routing_table, up_destinations, destination, false), Ok(drtioaux::Packet::DestinationOkReply) => (), - Ok(drtioaux::Packet::DestinationSequenceErrorReply { channel }) => - error!("[DEST#{}] RTIO sequence error involving channel 0x{:04x}", destination, channel), - Ok(drtioaux::Packet::DestinationCollisionReply { channel }) => - error!("[DEST#{}] RTIO collision involving channel 0x{:04x}", destination, channel), - Ok(drtioaux::Packet::DestinationBusyReply { channel }) => - error!("[DEST#{}] RTIO busy error involving channel 0x{:04x}", destination, channel), + Ok(drtioaux::Packet::DestinationSequenceErrorReply { channel }) => { + error!("[DEST#{}] RTIO sequence error involving channel 0x{:04x}", destination, channel); + unsafe { SEEN_ASYNC_ERRORS |= ASYNC_ERROR_SEQUENCE_ERROR }; + } + Ok(drtioaux::Packet::DestinationCollisionReply { channel }) => { + error!("[DEST#{}] RTIO collision involving channel 0x{:04x}", destination, channel); + unsafe { SEEN_ASYNC_ERRORS |= ASYNC_ERROR_COLLISION }; + } + Ok(drtioaux::Packet::DestinationBusyReply { channel }) => { + error!("[DEST#{}] RTIO busy error involving channel 0x{:04x}", destination, channel); + unsafe { SEEN_ASYNC_ERRORS |= ASYNC_ERROR_BUSY }; + } Ok(packet) => error!("[DEST#{}] received unexpected aux packet: {:?}", destination, packet), Err(e) => error!("[DEST#{}] communication failed ({})", destination, e) } @@ -339,15 +348,15 @@ fn async_error_thread(io: Io) { unsafe { io.until(|| csr::rtio_core::async_error_read() != 0).unwrap(); let errors = csr::rtio_core::async_error_read(); - if errors & 1 != 0 { + if errors & ASYNC_ERROR_COLLISION != 0 { error!("RTIO collision involving channel {}", csr::rtio_core::collision_channel_read()); } - if errors & 2 != 0 { + if errors & ASYNC_ERROR_BUSY != 0 { error!("RTIO busy error involving channel {}", csr::rtio_core::busy_channel_read()); } - if errors & 4 != 0 { + if errors & ASYNC_ERROR_SEQUENCE_ERROR != 0 { error!("RTIO sequence error involving channel {}", csr::rtio_core::sequence_error_channel_read()); } diff --git a/artiq/frontend/aqctl_corelog.py b/artiq/frontend/aqctl_corelog.py index ca0827585..8c766ecf5 100755 --- a/artiq/frontend/aqctl_corelog.py +++ b/artiq/frontend/aqctl_corelog.py @@ -10,9 +10,11 @@ from sipyco.pc_rpc import Server from sipyco import common_args from sipyco.logging_tools import log_with_name from sipyco.asyncio_tools import SignalHandler +from sipyco.keepalive import async_open_connection from artiq.coredevice.comm_mgmt import Request, Reply +logger = logging.getLogger(__name__) def get_argparser(): parser = argparse.ArgumentParser( @@ -38,38 +40,49 @@ async def get_logs_sim(host): async def get_logs(host): - reader, writer = await asyncio.open_connection(host, 1380) - writer.write(b"ARTIQ management\n") - endian = await reader.readexactly(1) - if endian == b"e": - endian = "<" - elif endian == b"E": - endian = ">" - else: - raise IOError("Incorrect reply from device: expected e/E.") - writer.write(struct.pack("B", Request.PullLog.value)) - await writer.drain() + try: + reader, writer = await async_open_connection( + host, + 1380, + after_idle=1, + interval=1, + max_fails=3, + ) + writer.write(b"ARTIQ management\n") + endian = await reader.readexactly(1) + if endian == b"e": + endian = "<" + elif endian == b"E": + endian = ">" + else: + raise IOError("Incorrect reply from device: expected e/E.") + writer.write(struct.pack("B", Request.PullLog.value)) + await writer.drain() - while True: - length, = struct.unpack(endian + "l", await reader.readexactly(4)) - log = await reader.readexactly(length) + while True: + length, = struct.unpack(endian + "l", await reader.readexactly(4)) + log = await reader.readexactly(length) - for line in log.decode("utf-8").splitlines(): - m = re.match(r"^\[.+?\] (TRACE|DEBUG| INFO| WARN|ERROR)\((.+?)\): (.+)$", line) - levelname = m.group(1) - if levelname == 'TRACE': - level = logging.TRACE - elif levelname == 'DEBUG': - level = logging.DEBUG - elif levelname == ' INFO': - level = logging.INFO - elif levelname == ' WARN': - level = logging.WARN - elif levelname == 'ERROR': - level = logging.ERROR - name = 'firmware.' + m.group(2).replace('::', '.') - text = m.group(3) - log_with_name(name, level, text) + for line in log.decode("utf-8").splitlines(): + m = re.match(r"^\[.+?\] (TRACE|DEBUG| INFO| WARN|ERROR)\((.+?)\): (.+)$", line) + levelname = m.group(1) + if levelname == 'TRACE': + level = logging.TRACE + elif levelname == 'DEBUG': + level = logging.DEBUG + elif levelname == ' INFO': + level = logging.INFO + elif levelname == ' WARN': + level = logging.WARN + elif levelname == 'ERROR': + level = logging.ERROR + name = 'firmware.' + m.group(2).replace('::', '.') + text = m.group(3) + log_with_name(name, level, text) + except asyncio.CancelledError: + raise + except: + logger.error("Logging connection terminating with exception", exc_info=True) def main(): @@ -89,18 +102,16 @@ def main(): loop.run_until_complete(server.start(common_args.bind_address_from_args(args), args.port)) try: _, pending = loop.run_until_complete(asyncio.wait( - [signal_handler.wait_terminate(), server.wait_terminate()], + [signal_handler.wait_terminate(), + server.wait_terminate(), + get_logs_task], return_when=asyncio.FIRST_COMPLETED)) for task in pending: task.cancel() finally: loop.run_until_complete(server.stop()) finally: - get_logs_task.cancel() - try: - loop.run_until_complete(get_logs_task) - except asyncio.CancelledError: - pass + pass finally: signal_handler.teardown() finally: diff --git a/artiq/frontend/aqctl_moninj_proxy.py b/artiq/frontend/aqctl_moninj_proxy.py index 54d8d0833..8ce6581a9 100755 --- a/artiq/frontend/aqctl_moninj_proxy.py +++ b/artiq/frontend/aqctl_moninj_proxy.py @@ -214,7 +214,9 @@ def main(): loop.run_until_complete(server.start(bind_address, args.port_control)) try: _, pending = loop.run_until_complete(asyncio.wait( - [signal_handler.wait_terminate(), server.wait_terminate()], + [signal_handler.wait_terminate(), + server.wait_terminate(), + comm_moninj.wait_terminate()], return_when=asyncio.FIRST_COMPLETED)) for task in pending: task.cancel() diff --git a/artiq/frontend/artiq_dashboard.py b/artiq/frontend/artiq_dashboard.py index b1bfca59d..f620d8617 100755 --- a/artiq/frontend/artiq_dashboard.py +++ b/artiq/frontend/artiq_dashboard.py @@ -3,6 +3,7 @@ import argparse import asyncio import atexit +import importlib import os import logging import sys @@ -43,6 +44,9 @@ def get_argparser(): parser.add_argument( "--db-file", default=None, help="database file for local GUI settings") + parser.add_argument( + "-p", "--load-plugin", dest="plugin_modules", action="append", + help="Python module to load on startup") common_args.verbosity_args(parser) return parser @@ -95,6 +99,11 @@ def main(): args = get_argparser().parse_args() widget_log_handler = log.init_log(args, "dashboard") + # load any plugin modules first (to register argument_ui classes, etc.) + if args.plugin_modules: + for mod in args.plugin_modules: + importlib.import_module(mod) + if args.db_file is None: args.db_file = os.path.join(get_user_config_dir(), "artiq_dashboard_{server}_{port}.pyon".format( @@ -160,6 +169,7 @@ def main(): # create UI components expmgr = experiments.ExperimentManager(main_window, + sub_clients["datasets"], sub_clients["explist"], sub_clients["schedule"], rpc_clients["schedule"], @@ -179,12 +189,18 @@ def main(): rpc_clients["dataset_db"]) smgr.register(d_datasets) - d_applets = applets_ccb.AppletsCCBDock(main_window, sub_clients["datasets"]) + d_applets = applets_ccb.AppletsCCBDock(main_window, + sub_clients["datasets"], + extra_substitutes={ + "server": args.server, + "port_notify": args.port_notify, + "port_control": args.port_control, + }) atexit_register_coroutine(d_applets.stop) smgr.register(d_applets) broadcast_clients["ccb"].notify_cbs.append(d_applets.ccb_notify) - d_ttl_dds = moninj.MonInj() + d_ttl_dds = moninj.MonInj(rpc_clients["schedule"]) loop.run_until_complete(d_ttl_dds.start(args.server, args.port_notify)) atexit_register_coroutine(d_ttl_dds.stop) @@ -232,9 +248,9 @@ def main(): server_description = server_name + " ({})".format(args.server) else: server_description = args.server - logging.info("ARTIQ dashboard %s connected to %s", - artiq_version, server_description) - + logging.info("ARTIQ dashboard version: %s", + artiq_version) + logging.info("ARTIQ dashboard connected to moninj_proxy (%s)", server_description) # run main_window.show() loop.run_until_complete(main_window.exit_request.wait()) diff --git a/artiq/frontend/artiq_master.py b/artiq/frontend/artiq_master.py index 42a7a0a98..99c2fc9f1 100755 --- a/artiq/frontend/artiq_master.py +++ b/artiq/frontend/artiq_master.py @@ -53,12 +53,13 @@ def get_argparser(): "--experiment-subdir", default="", help=("path to the experiment folder from the repository root " "(default: '%(default)s')")) - log_args(parser) parser.add_argument("--name", help="friendly name, displayed in dashboards " "to identify master instead of server address") + parser.add_argument("--log-submissions", default=None, + help="set the filename to create the experiment subimission") return parser @@ -111,7 +112,7 @@ def main(): repo_backend, worker_handlers, args.experiment_subdir) atexit.register(experiment_db.close) - scheduler = Scheduler(RIDCounter(), worker_handlers, experiment_db) + scheduler = Scheduler(RIDCounter(), worker_handlers, experiment_db, args.log_submissions) scheduler.start() atexit_register_coroutine(scheduler.stop) diff --git a/artiq/gui/applets.py b/artiq/gui/applets.py index 940149631..b119d14bc 100644 --- a/artiq/gui/applets.py +++ b/artiq/gui/applets.py @@ -25,6 +25,7 @@ class AppletIPCServer(AsyncioParentComm): AsyncioParentComm.__init__(self) self.datasets_sub = datasets_sub self.datasets = set() + self.dataset_prefixes = [] def write_pyon(self, obj): self.write(pyon.encode(obj).encode() + b"\n") @@ -33,8 +34,16 @@ class AppletIPCServer(AsyncioParentComm): line = await self.readline() return pyon.decode(line.decode()) + def _is_dataset_subscribed(self, key): + if key in self.datasets: + return True + for prefix in self.dataset_prefixes: + if key.startswith(prefix): + return True + return False + def _synthesize_init(self, data): - struct = {k: v for k, v in data.items() if k in self.datasets} + struct = {k: v for k, v in data.items() if self._is_dataset_subscribed(k)} return {"action": "init", "struct": struct} @@ -43,10 +52,10 @@ class AppletIPCServer(AsyncioParentComm): mod = self._synthesize_init(mod["struct"]) else: if mod["path"]: - if mod["path"][0] not in self.datasets: + if not self._is_dataset_subscribed(mod["path"][0]): return elif mod["action"] in {"setitem", "delitem"}: - if mod["key"] not in self.datasets: + if not self._is_dataset_subscribed(mod["key"]): return self.write_pyon({"action": "mod", "mod": mod}) @@ -64,6 +73,7 @@ class AppletIPCServer(AsyncioParentComm): fix_initial_size_cb() elif action == "subscribe": self.datasets = obj["datasets"] + self.dataset_prefixes = obj["dataset_prefixes"] if self.datasets_sub.model is not None: mod = self._synthesize_init( self.datasets_sub.model.backing_store) @@ -93,7 +103,7 @@ class AppletIPCServer(AsyncioParentComm): class _AppletDock(QDockWidgetCloseDetect): - def __init__(self, datasets_sub, uid, name, spec): + def __init__(self, datasets_sub, uid, name, spec, extra_substitutes): QDockWidgetCloseDetect.__init__(self, "Applet: " + name) self.setObjectName("applet" + str(uid)) @@ -104,6 +114,7 @@ class _AppletDock(QDockWidgetCloseDetect): self.datasets_sub = datasets_sub self.applet_name = name self.spec = spec + self.extra_substitutes = extra_substitutes self.starting_stopping = False @@ -152,7 +163,8 @@ class _AppletDock(QDockWidgetCloseDetect): python = sys.executable.replace("\\", "\\\\") command = command_tpl.safe_substitute( python=python, - artiq_applet=python + " -m artiq.applets." + artiq_applet=python + " -m artiq.applets.", + **self.extra_substitutes ) logger.debug("starting command %s for %s", command, self.applet_name) await self.start_process(shlex.split(command), None) @@ -315,7 +327,11 @@ class _CompleterDelegate(QtWidgets.QStyledItemDelegate): class AppletsDock(QtWidgets.QDockWidget): - def __init__(self, main_window, datasets_sub): + def __init__(self, main_window, datasets_sub, extra_substitutes={}): + """ + :param extra_substitutes: Map of extra ``${strings}`` to substitute in applet + commands to their respective values. + """ QtWidgets.QDockWidget.__init__(self, "Applets") self.setObjectName("Applets") self.setFeatures(QtWidgets.QDockWidget.DockWidgetMovable | @@ -323,6 +339,7 @@ class AppletsDock(QtWidgets.QDockWidget): self.main_window = main_window self.datasets_sub = datasets_sub + self.extra_substitutes = extra_substitutes self.applet_uids = set() self.table = QtWidgets.QTreeWidget() @@ -421,7 +438,7 @@ class AppletsDock(QtWidgets.QDockWidget): self.table.itemChanged.connect(self.item_changed) def create(self, item, name, spec): - dock = _AppletDock(self.datasets_sub, item.applet_uid, name, spec) + dock = _AppletDock(self.datasets_sub, item.applet_uid, name, spec, self.extra_substitutes) self.main_window.addDockWidget(QtCore.Qt.RightDockWidgetArea, dock) dock.setFloating(True) asyncio.ensure_future(dock.start()) diff --git a/artiq/gui/entries.py b/artiq/gui/entries.py index b63be3902..d3d0ff8fb 100644 --- a/artiq/gui/entries.py +++ b/artiq/gui/entries.py @@ -427,9 +427,10 @@ class ScanEntry(LayoutWidget): "selected": "NoScan", "NoScan": {"value": 0.0, "repetitions": 1}, "RangeScan": {"start": 0.0, "stop": 100.0*scale, "npoints": 10, - "randomize": False}, + "randomize": False, "seed": None}, "CenterScan": {"center": 0.*scale, "span": 100.*scale, - "step": 10.*scale, "randomize": False}, + "step": 10.*scale, "randomize": False, + "seed": None}, "ExplicitScan": {"sequence": []} } if "default" in procdesc: diff --git a/artiq/language/units.py b/artiq/language/units.py index 13f6f1535..569fb964b 100644 --- a/artiq/language/units.py +++ b/artiq/language/units.py @@ -19,4 +19,4 @@ _register_unit("Hz", "m_kMG") _register_unit("dB", "_") _register_unit("V", "um_k") _register_unit("A", "um_") -_register_unit("W", "um_") +_register_unit("W", "num_") diff --git a/artiq/master/experiments.py b/artiq/master/experiments.py index 918c92dc8..098ddc334 100644 --- a/artiq/master/experiments.py +++ b/artiq/master/experiments.py @@ -30,7 +30,6 @@ class _RepoScanner: raise for class_name, class_desc in description.items(): name = class_desc["name"] - arginfo = class_desc["arginfo"] if "/" in name: logger.warning("Character '/' is not allowed in experiment " "name (%s)", name) @@ -47,7 +46,8 @@ class _RepoScanner: entry = { "file": filename, "class_name": class_name, - "arginfo": arginfo, + "arginfo": class_desc["arginfo"], + "argument_ui": class_desc["argument_ui"], "scheduler_defaults": class_desc["scheduler_defaults"] } entry_dict[name] = entry diff --git a/artiq/master/scheduler.py b/artiq/master/scheduler.py index d978fa2f9..d6d44acef 100644 --- a/artiq/master/scheduler.py +++ b/artiq/master/scheduler.py @@ -1,5 +1,6 @@ import asyncio import logging +import csv from enum import Enum from time import time @@ -113,7 +114,7 @@ class Run: class RunPool: - def __init__(self, ridc, worker_handlers, notifier, experiment_db): + def __init__(self, ridc, worker_handlers, notifier, experiment_db, log_submissions): self.runs = dict() self.state_changed = Condition() @@ -121,6 +122,13 @@ class RunPool: self.worker_handlers = worker_handlers self.notifier = notifier self.experiment_db = experiment_db + self.log_submissions = log_submissions + + def log_submission(self, rid, expid): + start_time = time() + with open(self.log_submissions, 'a', newline='') as f: + writer = csv.writer(f) + writer.writerow([rid, start_time, expid["file"]]) def submit(self, expid, priority, due_date, flush, pipeline_name): # mutates expid to insert head repository revision if None. @@ -135,6 +143,8 @@ class RunPool: wd, repo_msg = None, None run = Run(rid, pipeline_name, wd, expid, priority, due_date, flush, self, repo_msg=repo_msg) + if self.log_submissions is not None: + self.log_submission(rid, expid) self.runs[rid] = run self.state_changed.notify() return rid @@ -311,8 +321,8 @@ class AnalyzeStage(TaskObject): class Pipeline: - def __init__(self, ridc, deleter, worker_handlers, notifier, experiment_db): - self.pool = RunPool(ridc, worker_handlers, notifier, experiment_db) + def __init__(self, ridc, deleter, worker_handlers, notifier, experiment_db, log_submissions): + self.pool = RunPool(ridc, worker_handlers, notifier, experiment_db, log_submissions) self._prepare = PrepareStage(self.pool, deleter.delete) self._run = RunStage(self.pool, deleter.delete) self._analyze = AnalyzeStage(self.pool, deleter.delete) @@ -383,7 +393,7 @@ class Deleter(TaskObject): class Scheduler: - def __init__(self, ridc, worker_handlers, experiment_db): + def __init__(self, ridc, worker_handlers, experiment_db, log_submissions): self.notifier = Notifier(dict()) self._pipelines = dict() @@ -393,6 +403,7 @@ class Scheduler: self._ridc = ridc self._deleter = Deleter(self._pipelines) + self._log_submissions = log_submissions def start(self): self._deleter.start() @@ -423,7 +434,7 @@ class Scheduler: logger.debug("creating pipeline '%s'", pipeline_name) pipeline = Pipeline(self._ridc, self._deleter, self._worker_handlers, self.notifier, - self._experiment_db) + self._experiment_db, self._log_submissions) self._pipelines[pipeline_name] = pipeline pipeline.start() return pipeline.pool.submit(expid, priority, due_date, flush, pipeline_name) diff --git a/artiq/master/worker.py b/artiq/master/worker.py index 1267626ca..ca83dc7db 100644 --- a/artiq/master/worker.py +++ b/artiq/master/worker.py @@ -301,8 +301,14 @@ class Worker: await self._create_process(logging.WARNING) r = dict() - def register(class_name, name, arginfo, scheduler_defaults): - r[class_name] = {"name": name, "arginfo": arginfo, "scheduler_defaults": scheduler_defaults} + def register(class_name, name, arginfo, argument_ui, + scheduler_defaults): + r[class_name] = { + "name": name, + "arginfo": arginfo, + "argument_ui": argument_ui, + "scheduler_defaults": scheduler_defaults + } self.register_experiment = register await self._worker_action({"action": "examine", "file": file}, timeout) diff --git a/artiq/master/worker_impl.py b/artiq/master/worker_impl.py index 84c29a6d4..fdf876633 100644 --- a/artiq/master/worker_impl.py +++ b/artiq/master/worker_impl.py @@ -205,7 +205,10 @@ def examine(device_mgr, dataset_mgr, file): (k, (proc.describe(), group, tooltip)) for k, (proc, group, tooltip) in argument_mgr.requested_args.items() ) - register_experiment(class_name, name, arginfo, scheduler_defaults) + argument_ui = None + if hasattr(exp_class, "argument_ui"): + argument_ui = exp_class.argument_ui + register_experiment(class_name, name, arginfo, argument_ui, scheduler_defaults) finally: new_keys = set(sys.modules.keys()) for key in new_keys - previous_keys: diff --git a/artiq/test/test_scheduler.py b/artiq/test/test_scheduler.py index b4327e72e..854f17a93 100644 --- a/artiq/test/test_scheduler.py +++ b/artiq/test/test_scheduler.py @@ -90,7 +90,7 @@ class SchedulerCase(unittest.TestCase): def test_steps(self): loop = self.loop - scheduler = Scheduler(_RIDCounter(0), dict(), None) + scheduler = Scheduler(_RIDCounter(0), dict(), None, None) expid = _get_expid("EmptyExperiment") expect = _get_basic_steps(1, expid) @@ -129,7 +129,7 @@ class SchedulerCase(unittest.TestCase): prepare.""" loop = self.loop handlers = {} - scheduler = Scheduler(_RIDCounter(0), handlers, None) + scheduler = Scheduler(_RIDCounter(0), handlers, None, None) handlers["scheduler_check_pause"] = scheduler.check_pause expid_empty = _get_expid("EmptyExperiment") @@ -293,7 +293,7 @@ class SchedulerCase(unittest.TestCase): handlers = { "update_dataset": check_termination } - scheduler = Scheduler(_RIDCounter(0), handlers, None) + scheduler = Scheduler(_RIDCounter(0), handlers, None, None) expid_bg = _get_expid("BackgroundExperiment") expid = _get_expid("EmptyExperiment") @@ -351,7 +351,7 @@ class SchedulerCase(unittest.TestCase): """Check scheduler exits with experiments still running""" loop = self.loop - scheduler = Scheduler(_RIDCounter(0), {}, None) + scheduler = Scheduler(_RIDCounter(0), {}, None, None) expid_bg = _get_expid("BackgroundExperiment") # Suppress the SystemExit backtrace when worker process is killed. @@ -392,7 +392,7 @@ class SchedulerCase(unittest.TestCase): def test_flush(self): loop = self.loop - scheduler = Scheduler(_RIDCounter(0), dict(), None) + scheduler = Scheduler(_RIDCounter(0), dict(), None, None) expid = _get_expid("EmptyExperiment") expect = _get_basic_steps(1, expid, 1, True)