forked from M-Labs/artiq
1
0
Fork 0

Merge branch 'master' into nac3

This commit is contained in:
Sebastien Bourdeauducq 2022-07-02 19:20:15 +08:00
commit 6483361e6a
18 changed files with 497 additions and 104 deletions

View File

@ -64,9 +64,10 @@ class AppletIPCClient(AsyncioChildComm):
exc_info=True) exc_info=True)
self.close_cb() self.close_cb()
def subscribe(self, datasets, init_cb, mod_cb): def subscribe(self, datasets, init_cb, mod_cb, dataset_prefixes=[]):
self.write_pyon({"action": "subscribe", self.write_pyon({"action": "subscribe",
"datasets": datasets}) "datasets": datasets,
"dataset_prefixes": dataset_prefixes})
self.init_cb = init_cb self.init_cb = init_cb
self.mod_cb = mod_cb self.mod_cb = mod_cb
asyncio.ensure_future(self.listen()) asyncio.ensure_future(self.listen())
@ -113,6 +114,9 @@ class SimpleApplet:
self.embed = os.getenv("ARTIQ_APPLET_EMBED") self.embed = os.getenv("ARTIQ_APPLET_EMBED")
self.datasets = {getattr(self.args, arg.replace("-", "_")) self.datasets = {getattr(self.args, arg.replace("-", "_"))
for arg in self.dataset_args} for arg in self.dataset_args}
# Optional prefixes (dataset sub-trees) to match subscriptions against;
# currently only used by out-of-tree subclasses (ndscan).
self.dataset_prefixes = []
def qasync_init(self): def qasync_init(self):
app = QtWidgets.QApplication([]) app = QtWidgets.QApplication([])
@ -162,6 +166,14 @@ class SimpleApplet:
self.data = data self.data = data
return data return data
def is_dataset_subscribed(self, key):
if key in self.datasets:
return True
for prefix in self.dataset_prefixes:
if key.startswith(prefix):
return True
return False
def filter_mod(self, mod): def filter_mod(self, mod):
if self.embed is not None: if self.embed is not None:
# the parent already filters for us # the parent already filters for us
@ -170,9 +182,9 @@ class SimpleApplet:
if mod["action"] == "init": if mod["action"] == "init":
return True return True
if mod["path"]: if mod["path"]:
return mod["path"][0] in self.datasets return self.is_dataset_subscribed(mod["path"][0])
elif mod["action"] in {"setitem", "delitem"}: elif mod["action"] in {"setitem", "delitem"}:
return mod["key"] in self.datasets return self.is_dataset_subscribed(mod["key"])
else: else:
return False return False
@ -204,7 +216,8 @@ class SimpleApplet:
self.loop.run_until_complete(self.subscriber.connect( self.loop.run_until_complete(self.subscriber.connect(
self.args.server, self.args.port)) self.args.server, self.args.port))
else: else:
self.ipc.subscribe(self.datasets, self.sub_init, self.sub_mod) self.ipc.subscribe(self.datasets, self.sub_init, self.sub_mod,
dataset_prefixes=self.dataset_prefixes)
def unsubscribe(self): def unsubscribe(self):
if self.embed is None: if self.embed is None:

View File

@ -526,6 +526,9 @@ class CommKernel:
self._skip_rpc_value(tags) self._skip_rpc_value(tags)
elif tag == "r": elif tag == "r":
self._skip_rpc_value(tags) self._skip_rpc_value(tags)
elif tag == "a":
_ndims = tags.pop(0)
self._skip_rpc_value(tags)
else: else:
pass pass

View File

@ -46,6 +46,9 @@ class CommMonInj:
del self._writer del self._writer
raise raise
def wait_terminate(self):
return self._receive_task
async def close(self): async def close(self):
self.disconnect_cb = None self.disconnect_cb = None
try: try:
@ -91,6 +94,10 @@ class CommMonInj:
self.injection_status_cb(channel, override, value) self.injection_status_cb(channel, override, value)
else: else:
raise ValueError("Unknown packet type", ty) raise ValueError("Unknown packet type", ty)
except asyncio.CancelledError:
raise
except:
logger.error("Moninj connection terminating with exception", exc_info=True)
finally: finally:
if self.disconnect_cb is not None: if self.disconnect_cb is not None:
self.disconnect_cb() self.disconnect_cb()

View File

@ -164,7 +164,7 @@ class _ArgumentEditor(QtWidgets.QTreeWidget):
async def _recompute_argument(self, name): async def _recompute_argument(self, name):
try: try:
expdesc = await self.manager.compute_expdesc(self.expurl) expdesc, _ = await self.manager.compute_expdesc(self.expurl)
except: except:
logger.error("Could not recompute argument '%s' of '%s'", logger.error("Could not recompute argument '%s' of '%s'",
name, self.expurl, exc_info=True) name, self.expurl, exc_info=True)
@ -216,6 +216,15 @@ class _ArgumentEditor(QtWidgets.QTreeWidget):
pass pass
self.verticalScrollBar().setValue(state["scroll"]) self.verticalScrollBar().setValue(state["scroll"])
# Hooks that allow user-supplied argument editors to react to imminent user
# actions. Here, we always keep the manager-stored submission arguments
# up-to-date, so no further action is required.
def about_to_submit(self):
pass
def about_to_close(self):
pass
log_levels = ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"] log_levels = ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"]
@ -241,7 +250,8 @@ class _ExperimentDock(QtWidgets.QMdiSubWindow):
self.manager = manager self.manager = manager
self.expurl = expurl self.expurl = expurl
self.argeditor = _ArgumentEditor(self.manager, self, self.expurl) editor_class = self.manager.get_argument_editor_class(expurl)
self.argeditor = editor_class(self.manager, self, self.expurl)
self.layout.addWidget(self.argeditor, 0, 0, 1, 5) self.layout.addWidget(self.argeditor, 0, 0, 1, 5)
self.layout.setRowStretch(0, 1) self.layout.setRowStretch(0, 1)
@ -369,6 +379,7 @@ class _ExperimentDock(QtWidgets.QMdiSubWindow):
self.hdf5_load_directory = os.path.expanduser("~") self.hdf5_load_directory = os.path.expanduser("~")
def submit_clicked(self): def submit_clicked(self):
self.argeditor.about_to_submit()
try: try:
self.manager.submit(self.expurl) self.manager.submit(self.expurl)
except: except:
@ -391,7 +402,7 @@ class _ExperimentDock(QtWidgets.QMdiSubWindow):
async def _recompute_arguments_task(self, overrides=dict()): async def _recompute_arguments_task(self, overrides=dict()):
try: try:
expdesc = await self.manager.compute_expdesc(self.expurl) expdesc, ui_name = await self.manager.compute_expdesc(self.expurl)
except: except:
logger.error("Could not recompute experiment description of '%s'", logger.error("Could not recompute experiment description of '%s'",
self.expurl, exc_info=True) self.expurl, exc_info=True)
@ -404,12 +415,13 @@ class _ExperimentDock(QtWidgets.QMdiSubWindow):
arginfo[k][0]["default"].insert(0, v) arginfo[k][0]["default"].insert(0, v)
else: else:
arginfo[k][0]["default"] = v arginfo[k][0]["default"] = v
self.manager.initialize_submission_arguments(self.expurl, arginfo) self.manager.initialize_submission_arguments(self.expurl, arginfo, ui_name)
argeditor_state = self.argeditor.save_state() argeditor_state = self.argeditor.save_state()
self.argeditor.deleteLater() self.argeditor.deleteLater()
self.argeditor = _ArgumentEditor(self.manager, self, self.expurl) editor_class = self.manager.get_argument_editor_class(self.expurl)
self.argeditor = editor_class(self.manager, self, self.expurl)
self.argeditor.restore_state(argeditor_state) self.argeditor.restore_state(argeditor_state)
self.layout.addWidget(self.argeditor, 0, 0, 1, 5) self.layout.addWidget(self.argeditor, 0, 0, 1, 5)
@ -422,7 +434,7 @@ class _ExperimentDock(QtWidgets.QMdiSubWindow):
async def _recompute_sched_options_task(self): async def _recompute_sched_options_task(self):
try: try:
expdesc = await self.manager.compute_expdesc(self.expurl) expdesc, _ = await self.manager.compute_expdesc(self.expurl)
except: except:
logger.error("Could not recompute experiment description of '%s'", logger.error("Could not recompute experiment description of '%s'",
self.expurl, exc_info=True) self.expurl, exc_info=True)
@ -473,6 +485,7 @@ class _ExperimentDock(QtWidgets.QMdiSubWindow):
await self._recompute_arguments_task(arguments) await self._recompute_arguments_task(arguments)
def closeEvent(self, event): def closeEvent(self, event):
self.argeditor.about_to_close()
self.sigClosed.emit() self.sigClosed.emit()
QtWidgets.QMdiSubWindow.closeEvent(self, event) QtWidgets.QMdiSubWindow.closeEvent(self, event)
@ -544,7 +557,13 @@ class _QuickOpenDialog(QtWidgets.QDialog):
class ExperimentManager: class ExperimentManager:
def __init__(self, main_window, #: Global registry for custom argument editor classes, indexed by the experiment
#: `argument_ui` string; can be populated by dashboard plugins such as ndscan.
#: If no handler for a requested UI name is found, the default built-in argument
#: editor will be used.
argument_ui_classes = dict()
def __init__(self, main_window, dataset_sub,
explist_sub, schedule_sub, explist_sub, schedule_sub,
schedule_ctl, experiment_db_ctl): schedule_ctl, experiment_db_ctl):
self.main_window = main_window self.main_window = main_window
@ -555,7 +574,10 @@ class ExperimentManager:
self.submission_scheduling = dict() self.submission_scheduling = dict()
self.submission_options = dict() self.submission_options = dict()
self.submission_arguments = dict() self.submission_arguments = dict()
self.argument_ui_names = dict()
self.datasets = dict()
dataset_sub.add_setmodel_callback(self.set_dataset_model)
self.explist = dict() self.explist = dict()
explist_sub.add_setmodel_callback(self.set_explist_model) explist_sub.add_setmodel_callback(self.set_explist_model)
self.schedule = dict() self.schedule = dict()
@ -570,6 +592,9 @@ class ExperimentManager:
quick_open_shortcut.setContext(QtCore.Qt.ApplicationShortcut) quick_open_shortcut.setContext(QtCore.Qt.ApplicationShortcut)
quick_open_shortcut.activated.connect(self.show_quick_open) quick_open_shortcut.activated.connect(self.show_quick_open)
def set_dataset_model(self, model):
self.datasets = model
def set_explist_model(self, model): def set_explist_model(self, model):
self.explist = model.backing_store self.explist = model.backing_store
@ -586,6 +611,17 @@ class ExperimentManager:
else: else:
raise ValueError("Malformed experiment URL") raise ValueError("Malformed experiment URL")
def get_argument_editor_class(self, expurl):
ui_name = self.argument_ui_names.get(expurl, None)
if not ui_name and expurl[:5] == "repo:":
ui_name = self.explist.get(expurl[5:], {}).get("argument_ui", None)
if ui_name:
result = self.argument_ui_classes.get(ui_name, None)
if result:
return result
logger.warning("Ignoring unknown argument UI '%s'", ui_name)
return _ArgumentEditor
def get_submission_scheduling(self, expurl): def get_submission_scheduling(self, expurl):
if expurl in self.submission_scheduling: if expurl in self.submission_scheduling:
return self.submission_scheduling[expurl] return self.submission_scheduling[expurl]
@ -615,7 +651,7 @@ class ExperimentManager:
self.submission_options[expurl] = options self.submission_options[expurl] = options
return options return options
def initialize_submission_arguments(self, expurl, arginfo): def initialize_submission_arguments(self, expurl, arginfo, ui_name):
arguments = OrderedDict() arguments = OrderedDict()
for name, (procdesc, group, tooltip) in arginfo.items(): for name, (procdesc, group, tooltip) in arginfo.items():
state = procdesc_to_entry(procdesc).default_state(procdesc) state = procdesc_to_entry(procdesc).default_state(procdesc)
@ -626,6 +662,7 @@ class ExperimentManager:
"state": state, # mutated by entries "state": state, # mutated by entries
} }
self.submission_arguments[expurl] = arguments self.submission_arguments[expurl] = arguments
self.argument_ui_names[expurl] = ui_name
return arguments return arguments
def get_submission_arguments(self, expurl): def get_submission_arguments(self, expurl):
@ -635,9 +672,9 @@ class ExperimentManager:
if expurl[:5] != "repo:": if expurl[:5] != "repo:":
raise ValueError("Submission arguments must be preinitialized " raise ValueError("Submission arguments must be preinitialized "
"when not using repository") "when not using repository")
arginfo = self.explist[expurl[5:]]["arginfo"] class_desc = self.explist[expurl[5:]]
arguments = self.initialize_submission_arguments(expurl, arginfo) return self.initialize_submission_arguments(expurl,
return arguments class_desc["arginfo"], class_desc.get("argument_ui", None))
def open_experiment(self, expurl): def open_experiment(self, expurl):
if expurl in self.open_experiments: if expurl in self.open_experiments:
@ -739,13 +776,15 @@ class ExperimentManager:
revision = None revision = None
description = await self.experiment_db_ctl.examine( description = await self.experiment_db_ctl.examine(
file, use_repository, revision) file, use_repository, revision)
return description[class_name] class_desc = description[class_name]
return class_desc, class_desc.get("argument_ui", None)
async def open_file(self, file): async def open_file(self, file):
description = await self.experiment_db_ctl.examine(file, False) description = await self.experiment_db_ctl.examine(file, False)
for class_name, class_desc in description.items(): for class_name, class_desc in description.items():
expurl = "file:{}@{}".format(class_name, file) expurl = "file:{}@{}".format(class_name, file)
self.initialize_submission_arguments(expurl, class_desc["arginfo"]) self.initialize_submission_arguments(expurl, class_desc["arginfo"],
class_desc.get("argument_ui", None))
if expurl in self.open_experiments: if expurl in self.open_experiments:
self.open_experiments[expurl].close() self.open_experiments[expurl].close()
self.open_experiment(expurl) self.open_experiment(expurl)
@ -758,6 +797,7 @@ class ExperimentManager:
"options": self.submission_options, "options": self.submission_options,
"arguments": self.submission_arguments, "arguments": self.submission_arguments,
"docks": self.dock_states, "docks": self.dock_states,
"argument_uis": self.argument_ui_names,
"open_docks": set(self.open_experiments.keys()) "open_docks": set(self.open_experiments.keys())
} }
@ -768,6 +808,7 @@ class ExperimentManager:
self.submission_scheduling = state["scheduling"] self.submission_scheduling = state["scheduling"]
self.submission_options = state["options"] self.submission_options = state["options"]
self.submission_arguments = state["arguments"] self.submission_arguments = state["arguments"]
self.argument_ui_names = state.get("argument_uis", {})
for expurl in state["open_docks"]: for expurl in state["open_docks"]:
self.open_experiment(expurl) self.open_experiment(expurl)

View File

@ -1,5 +1,6 @@
import asyncio import asyncio
import logging import logging
import textwrap
from collections import namedtuple from collections import namedtuple
from PyQt5 import QtCore, QtWidgets, QtGui from PyQt5 import QtCore, QtWidgets, QtGui
@ -13,6 +14,16 @@ from artiq.gui.flowlayout import FlowLayout
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class _CancellableLineEdit(QtWidgets.QLineEdit):
def escapePressedConnect(self, cb):
self.esc_cb = cb
def keyPressEvent(self, event):
key = event.key()
if key == QtCore.Qt.Key_Escape:
self.esc_cb(event)
QtWidgets.QLineEdit.keyPressEvent(self, event)
class _TTLWidget(QtWidgets.QFrame): class _TTLWidget(QtWidgets.QFrame):
def __init__(self, dm, channel, force_out, title): def __init__(self, dm, channel, force_out, title):
@ -168,15 +179,136 @@ class _SimpleDisplayWidget(QtWidgets.QFrame):
raise NotImplementedError raise NotImplementedError
class _DDSWidget(_SimpleDisplayWidget): class _DDSWidget(QtWidgets.QFrame):
def __init__(self, dm, bus_channel, channel, title): def __init__(self, dm, title, bus_channel=0, channel=0, cpld=None):
self.dm = dm
self.bus_channel = bus_channel self.bus_channel = bus_channel
self.channel = channel self.channel = channel
self.dds_name = title
self.cpld = cpld
self.cur_frequency = 0 self.cur_frequency = 0
_SimpleDisplayWidget.__init__(self, title)
QtWidgets.QFrame.__init__(self)
self.setFrameShape(QtWidgets.QFrame.Box)
self.setFrameShadow(QtWidgets.QFrame.Raised)
grid = QtWidgets.QGridLayout()
grid.setContentsMargins(0, 0, 0, 0)
grid.setHorizontalSpacing(0)
grid.setVerticalSpacing(0)
self.setLayout(grid)
label = QtWidgets.QLabel(title)
label.setAlignment(QtCore.Qt.AlignCenter)
grid.addWidget(label, 1, 1)
# FREQ DATA/EDIT FIELD
self.data_stack = QtWidgets.QStackedWidget()
# page 1: display data
grid_disp = LayoutWidget()
grid_disp.layout.setContentsMargins(0, 0, 0, 0)
grid_disp.layout.setHorizontalSpacing(0)
grid_disp.layout.setVerticalSpacing(0)
self.value_label = QtWidgets.QLabel()
self.value_label.setAlignment(QtCore.Qt.AlignCenter)
grid_disp.addWidget(self.value_label, 0, 1, 1, 2)
unit = QtWidgets.QLabel("MHz")
unit.setAlignment(QtCore.Qt.AlignCenter)
grid_disp.addWidget(unit, 0, 3, 1, 1)
self.data_stack.addWidget(grid_disp)
# page 2: edit data
grid_edit = LayoutWidget()
grid_edit.layout.setContentsMargins(0, 0, 0, 0)
grid_edit.layout.setHorizontalSpacing(0)
grid_edit.layout.setVerticalSpacing(0)
self.value_edit = _CancellableLineEdit(self)
self.value_edit.setAlignment(QtCore.Qt.AlignRight)
grid_edit.addWidget(self.value_edit, 0, 1, 1, 2)
unit = QtWidgets.QLabel("MHz")
unit.setAlignment(QtCore.Qt.AlignCenter)
grid_edit.addWidget(unit, 0, 3, 1, 1)
self.data_stack.addWidget(grid_edit)
grid.addWidget(self.data_stack, 2, 1)
# BUTTONS
self.button_stack = QtWidgets.QStackedWidget()
# page 1: SET button
set_grid = LayoutWidget()
set_btn = QtWidgets.QToolButton()
set_btn.setText("Set")
set_btn.setToolTip("Set frequency")
set_grid.addWidget(set_btn, 0, 1, 1, 1)
# for urukuls also allow switching off RF
if self.cpld:
off_btn = QtWidgets.QToolButton()
off_btn.setText("Off")
off_btn.setToolTip("Switch off the output")
set_grid.addWidget(off_btn, 0, 2, 1, 1)
self.button_stack.addWidget(set_grid)
# page 2: apply/cancel buttons
apply_grid = LayoutWidget()
apply = QtWidgets.QToolButton()
apply.setText("Apply")
apply.setToolTip("Apply changes")
apply_grid.addWidget(apply, 0, 1, 1, 1)
cancel = QtWidgets.QToolButton()
cancel.setText("Cancel")
cancel.setToolTip("Cancel changes")
apply_grid.addWidget(cancel, 0, 2, 1, 1)
self.button_stack.addWidget(apply_grid)
grid.addWidget(self.button_stack, 3, 1)
grid.setRowStretch(1, 1)
grid.setRowStretch(2, 1)
grid.setRowStretch(3, 1)
set_btn.clicked.connect(self.set_clicked)
apply.clicked.connect(self.apply_changes)
if self.cpld:
off_btn.clicked.connect(self.off_clicked)
self.value_edit.returnPressed.connect(lambda: self.apply_changes(None))
self.value_edit.escapePressedConnect(self.cancel_changes)
cancel.clicked.connect(self.cancel_changes)
self.refresh_display()
def set_clicked(self, set):
self.data_stack.setCurrentIndex(1)
self.button_stack.setCurrentIndex(1)
self.value_edit.setText("{:.7f}"
.format(self.cur_frequency/1e6))
self.value_edit.setFocus()
self.value_edit.selectAll()
def off_clicked(self, set):
self.dm.dds_channel_toggle(self.dds_name, self.cpld, sw=False)
def apply_changes(self, apply):
self.data_stack.setCurrentIndex(0)
self.button_stack.setCurrentIndex(0)
frequency = float(self.value_edit.text())*1e6
self.dm.dds_set_frequency(self.dds_name, self.cpld, frequency)
def cancel_changes(self, cancel):
self.data_stack.setCurrentIndex(0)
self.button_stack.setCurrentIndex(0)
def refresh_display(self): def refresh_display(self):
self.value.setText("<font size=\"4\">{:.7f}</font><font size=\"2\"> MHz</font>" self.value_label.setText("<font size=\"4\">{:.7f}</font>"
.format(self.cur_frequency/1e6))
self.value_edit.setText("{:.7f}"
.format(self.cur_frequency/1e6)) .format(self.cur_frequency/1e6))
def sort_key(self): def sort_key(self):
@ -213,6 +345,8 @@ def setup_from_ddb(ddb):
comment = v.get("comment") comment = v.get("comment")
if v["type"] == "local": if v["type"] == "local":
if v["module"] == "artiq.coredevice.ttl": if v["module"] == "artiq.coredevice.ttl":
if "ttl_urukul" in k:
continue
channel = v["arguments"]["channel"] channel = v["arguments"]["channel"]
force_out = v["class"] == "TTLOut" force_out = v["class"] == "TTLOut"
widget = _WidgetDesc(k, comment, _TTLWidget, (channel, force_out, k)) widget = _WidgetDesc(k, comment, _TTLWidget, (channel, force_out, k))
@ -222,7 +356,19 @@ def setup_from_ddb(ddb):
bus_channel = v["arguments"]["bus_channel"] bus_channel = v["arguments"]["bus_channel"]
channel = v["arguments"]["channel"] channel = v["arguments"]["channel"]
dds_sysclk = v["arguments"]["sysclk"] dds_sysclk = v["arguments"]["sysclk"]
widget = _WidgetDesc(k, comment, _DDSWidget, (bus_channel, channel, k)) widget = _WidgetDesc(k, comment, _DDSWidget, (k, bus_channel, channel))
description.add(widget)
elif (v["module"] == "artiq.coredevice.ad9910"
and v["class"] == "AD9910") or \
(v["module"] == "artiq.coredevice.ad9912"
and v["class"] == "AD9912"):
channel = v["arguments"]["chip_select"] - 4
if channel < 0:
continue
dds_cpld = v["arguments"]["cpld_device"]
spi_dev = ddb[dds_cpld]["arguments"]["spi_device"]
bus_channel = ddb[spi_dev]["arguments"]["channel"]
widget = _WidgetDesc(k, comment, _DDSWidget, (k, bus_channel, channel, dds_cpld))
description.add(widget) description.add(widget)
elif ( (v["module"] == "artiq.coredevice.ad53xx" and v["class"] == "AD53xx") elif ( (v["module"] == "artiq.coredevice.ad53xx" and v["class"] == "AD53xx")
or (v["module"] == "artiq.coredevice.zotino" and v["class"] == "Zotino")): or (v["module"] == "artiq.coredevice.zotino" and v["class"] == "Zotino")):
@ -243,13 +389,15 @@ def setup_from_ddb(ddb):
class _DeviceManager: class _DeviceManager:
def __init__(self): def __init__(self, schedule_ctl):
self.mi_addr = None self.mi_addr = None
self.mi_port = None self.mi_port = None
self.reconnect_mi = asyncio.Event() self.reconnect_mi = asyncio.Event()
self.mi_connection = None self.mi_connection = None
self.mi_connector_task = asyncio.ensure_future(self.mi_connector()) self.mi_connector_task = asyncio.ensure_future(self.mi_connector())
self.schedule_ctl = schedule_ctl
self.ddb = dict() self.ddb = dict()
self.description = set() self.description = set()
self.widgets_by_uid = dict() self.widgets_by_uid = dict()
@ -344,6 +492,108 @@ class _DeviceManager:
# override state may have changed # override state may have changed
widget.refresh_display() widget.refresh_display()
async def _submit_by_content(self, content, class_name, title):
expid = {
"log_level": logging.WARNING,
"content": content,
"class_name": class_name,
"arguments": []
}
scheduling = {
"pipeline_name": "main",
"priority": 0,
"due_date": None,
"flush": False
}
rid = await self.schedule_ctl.submit(
scheduling["pipeline_name"],
expid,
scheduling["priority"], scheduling["due_date"],
scheduling["flush"])
logger.info("Submitted '%s', RID is %d", title, rid)
def dds_set_frequency(self, dds_channel, dds_cpld, freq):
# create kernel and fill it in and send-by-content
if dds_cpld:
# urukuls need CPLD init and switch to on
# keep previous config if it was set already
cpld_dev = """self.setattr_device("core_cache")
self.setattr_device("{}")""".format(dds_cpld)
cpld_init = """cfg = self.core_cache.get("_{cpld}_cfg")
if len(cfg) > 0:
self.{cpld}.cfg_reg = cfg[0]
else:
self.{cpld}.init()
self.core_cache.put("_{cpld}_cfg", [self.{cpld}.cfg_reg])
cfg = self.core_cache.get("_{cpld}_cfg")
""".format(cpld=dds_cpld)
cfg_sw = """self.{}.cfg_sw(True)
cfg[0] = self.{}.cfg_reg
""".format(dds_channel, dds_cpld)
else:
cpld_dev = ""
cpld_init = ""
cfg_sw = ""
dds_exp = textwrap.dedent("""
from artiq.experiment import *
class SetDDS(EnvExperiment):
def build(self):
self.setattr_device("core")
self.setattr_device("{dds_channel}")
{cpld_dev}
@kernel
def run(self):
self.core.break_realtime()
delay(2*ms)
{cpld_init}
self.{dds_channel}.init()
self.{dds_channel}.set({freq})
{cfg_sw}
""".format(dds_channel=dds_channel, freq=freq,
cpld_dev=cpld_dev, cpld_init=cpld_init,
cfg_sw=cfg_sw))
asyncio.ensure_future(
self._submit_by_content(
dds_exp,
"SetDDS",
"Set DDS {} {}MHz".format(dds_channel, freq/1e6)))
def dds_channel_toggle(self, dds_channel, dds_cpld, sw=True):
# urukul only
toggle_exp = textwrap.dedent("""
from artiq.experiment import *
class ToggleDDS(EnvExperiment):
def build(self):
self.setattr_device("core")
self.setattr_device("{ch}")
self.setattr_device("core_cache")
self.setattr_device("{cpld}")
@kernel
def run(self):
self.core.break_realtime()
delay(2*ms)
cfg = self.core_cache.get("_{cpld}_cfg")
if len(cfg) > 0:
self.{cpld}.cfg_reg = cfg[0]
else:
self.{cpld}.init()
self.core_cache.put("_{cpld}_cfg", [self.{cpld}.cfg_reg])
cfg = self.core_cache.get("_{cpld}_cfg")
self.{ch}.init()
self.{ch}.cfg_sw({sw})
cfg[0] = self.{cpld}.cfg_reg
""".format(ch=dds_channel, cpld=dds_cpld, sw=sw))
asyncio.ensure_future(
self._submit_by_content(
toggle_exp,
"ToggleDDS",
"Toggle DDS {} {}".format(dds_channel, "on" if sw else "off"))
)
def setup_ttl_monitoring(self, enable, channel): def setup_ttl_monitoring(self, enable, channel):
if self.mi_connection is not None: if self.mi_connection is not None:
self.mi_connection.monitor_probe(enable, channel, TTLProbe.level.value) self.mi_connection.monitor_probe(enable, channel, TTLProbe.level.value)
@ -410,6 +660,8 @@ class _DeviceManager:
await asyncio.sleep(10.) await asyncio.sleep(10.)
self.reconnect_mi.set() self.reconnect_mi.set()
else: else:
logger.info("ARTIQ dashboard connected to moninj proxy (%s)",
self.mi_addr)
self.mi_connection = new_mi_connection self.mi_connection = new_mi_connection
for ttl_channel in self.ttl_widgets.keys(): for ttl_channel in self.ttl_widgets.keys():
self.setup_ttl_monitoring(True, ttl_channel) self.setup_ttl_monitoring(True, ttl_channel)
@ -451,12 +703,12 @@ class _MonInjDock(QtWidgets.QDockWidget):
class MonInj: class MonInj:
def __init__(self): def __init__(self, schedule_ctl):
self.ttl_dock = _MonInjDock("TTL") self.ttl_dock = _MonInjDock("TTL")
self.dds_dock = _MonInjDock("DDS") self.dds_dock = _MonInjDock("DDS")
self.dac_dock = _MonInjDock("DAC") self.dac_dock = _MonInjDock("DAC")
self.dm = _DeviceManager() self.dm = _DeviceManager(schedule_ctl)
self.dm.ttl_cb = lambda: self.ttl_dock.layout_widgets( self.dm.ttl_cb = lambda: self.ttl_dock.layout_widgets(
self.dm.ttl_widgets.values()) self.dm.ttl_widgets.values())
self.dm.dds_cb = lambda: self.dds_dock.layout_widgets( self.dm.dds_cb = lambda: self.dds_dock.layout_widgets(

View File

@ -6,6 +6,9 @@ use board_misoc::clock;
use board_artiq::drtio_routing; use board_artiq::drtio_routing;
use sched::Io; use sched::Io;
use sched::Mutex; use sched::Mutex;
const ASYNC_ERROR_COLLISION: u8 = 1 << 0;
const ASYNC_ERROR_BUSY: u8 = 1 << 1;
const ASYNC_ERROR_SEQUENCE_ERROR: u8 = 1 << 2;
#[cfg(has_drtio)] #[cfg(has_drtio)]
pub mod drtio { pub mod drtio {
@ -211,12 +214,18 @@ pub mod drtio {
Ok(drtioaux::Packet::DestinationDownReply) => Ok(drtioaux::Packet::DestinationDownReply) =>
destination_set_up(routing_table, up_destinations, destination, false), destination_set_up(routing_table, up_destinations, destination, false),
Ok(drtioaux::Packet::DestinationOkReply) => (), Ok(drtioaux::Packet::DestinationOkReply) => (),
Ok(drtioaux::Packet::DestinationSequenceErrorReply { channel }) => Ok(drtioaux::Packet::DestinationSequenceErrorReply { channel }) => {
error!("[DEST#{}] RTIO sequence error involving channel 0x{:04x}", destination, channel), error!("[DEST#{}] RTIO sequence error involving channel 0x{:04x}", destination, channel);
Ok(drtioaux::Packet::DestinationCollisionReply { channel }) => unsafe { SEEN_ASYNC_ERRORS |= ASYNC_ERROR_SEQUENCE_ERROR };
error!("[DEST#{}] RTIO collision involving channel 0x{:04x}", destination, channel), }
Ok(drtioaux::Packet::DestinationBusyReply { channel }) => Ok(drtioaux::Packet::DestinationCollisionReply { channel }) => {
error!("[DEST#{}] RTIO busy error involving channel 0x{:04x}", destination, channel), error!("[DEST#{}] RTIO collision involving channel 0x{:04x}", destination, channel);
unsafe { SEEN_ASYNC_ERRORS |= ASYNC_ERROR_COLLISION };
}
Ok(drtioaux::Packet::DestinationBusyReply { channel }) => {
error!("[DEST#{}] RTIO busy error involving channel 0x{:04x}", destination, channel);
unsafe { SEEN_ASYNC_ERRORS |= ASYNC_ERROR_BUSY };
}
Ok(packet) => error!("[DEST#{}] received unexpected aux packet: {:?}", destination, packet), Ok(packet) => error!("[DEST#{}] received unexpected aux packet: {:?}", destination, packet),
Err(e) => error!("[DEST#{}] communication failed ({})", destination, e) Err(e) => error!("[DEST#{}] communication failed ({})", destination, e)
} }
@ -339,15 +348,15 @@ fn async_error_thread(io: Io) {
unsafe { unsafe {
io.until(|| csr::rtio_core::async_error_read() != 0).unwrap(); io.until(|| csr::rtio_core::async_error_read() != 0).unwrap();
let errors = csr::rtio_core::async_error_read(); let errors = csr::rtio_core::async_error_read();
if errors & 1 != 0 { if errors & ASYNC_ERROR_COLLISION != 0 {
error!("RTIO collision involving channel {}", error!("RTIO collision involving channel {}",
csr::rtio_core::collision_channel_read()); csr::rtio_core::collision_channel_read());
} }
if errors & 2 != 0 { if errors & ASYNC_ERROR_BUSY != 0 {
error!("RTIO busy error involving channel {}", error!("RTIO busy error involving channel {}",
csr::rtio_core::busy_channel_read()); csr::rtio_core::busy_channel_read());
} }
if errors & 4 != 0 { if errors & ASYNC_ERROR_SEQUENCE_ERROR != 0 {
error!("RTIO sequence error involving channel {}", error!("RTIO sequence error involving channel {}",
csr::rtio_core::sequence_error_channel_read()); csr::rtio_core::sequence_error_channel_read());
} }

View File

@ -10,9 +10,11 @@ from sipyco.pc_rpc import Server
from sipyco import common_args from sipyco import common_args
from sipyco.logging_tools import log_with_name from sipyco.logging_tools import log_with_name
from sipyco.asyncio_tools import SignalHandler from sipyco.asyncio_tools import SignalHandler
from sipyco.keepalive import async_open_connection
from artiq.coredevice.comm_mgmt import Request, Reply from artiq.coredevice.comm_mgmt import Request, Reply
logger = logging.getLogger(__name__)
def get_argparser(): def get_argparser():
parser = argparse.ArgumentParser( parser = argparse.ArgumentParser(
@ -38,7 +40,14 @@ async def get_logs_sim(host):
async def get_logs(host): async def get_logs(host):
reader, writer = await asyncio.open_connection(host, 1380) try:
reader, writer = await async_open_connection(
host,
1380,
after_idle=1,
interval=1,
max_fails=3,
)
writer.write(b"ARTIQ management\n") writer.write(b"ARTIQ management\n")
endian = await reader.readexactly(1) endian = await reader.readexactly(1)
if endian == b"e": if endian == b"e":
@ -70,6 +79,10 @@ async def get_logs(host):
name = 'firmware.' + m.group(2).replace('::', '.') name = 'firmware.' + m.group(2).replace('::', '.')
text = m.group(3) text = m.group(3)
log_with_name(name, level, text) log_with_name(name, level, text)
except asyncio.CancelledError:
raise
except:
logger.error("Logging connection terminating with exception", exc_info=True)
def main(): def main():
@ -89,17 +102,15 @@ def main():
loop.run_until_complete(server.start(common_args.bind_address_from_args(args), args.port)) loop.run_until_complete(server.start(common_args.bind_address_from_args(args), args.port))
try: try:
_, pending = loop.run_until_complete(asyncio.wait( _, pending = loop.run_until_complete(asyncio.wait(
[signal_handler.wait_terminate(), server.wait_terminate()], [signal_handler.wait_terminate(),
server.wait_terminate(),
get_logs_task],
return_when=asyncio.FIRST_COMPLETED)) return_when=asyncio.FIRST_COMPLETED))
for task in pending: for task in pending:
task.cancel() task.cancel()
finally: finally:
loop.run_until_complete(server.stop()) loop.run_until_complete(server.stop())
finally: finally:
get_logs_task.cancel()
try:
loop.run_until_complete(get_logs_task)
except asyncio.CancelledError:
pass pass
finally: finally:
signal_handler.teardown() signal_handler.teardown()

View File

@ -214,7 +214,9 @@ def main():
loop.run_until_complete(server.start(bind_address, args.port_control)) loop.run_until_complete(server.start(bind_address, args.port_control))
try: try:
_, pending = loop.run_until_complete(asyncio.wait( _, pending = loop.run_until_complete(asyncio.wait(
[signal_handler.wait_terminate(), server.wait_terminate()], [signal_handler.wait_terminate(),
server.wait_terminate(),
comm_moninj.wait_terminate()],
return_when=asyncio.FIRST_COMPLETED)) return_when=asyncio.FIRST_COMPLETED))
for task in pending: for task in pending:
task.cancel() task.cancel()

View File

@ -3,6 +3,7 @@
import argparse import argparse
import asyncio import asyncio
import atexit import atexit
import importlib
import os import os
import logging import logging
import sys import sys
@ -43,6 +44,9 @@ def get_argparser():
parser.add_argument( parser.add_argument(
"--db-file", default=None, "--db-file", default=None,
help="database file for local GUI settings") help="database file for local GUI settings")
parser.add_argument(
"-p", "--load-plugin", dest="plugin_modules", action="append",
help="Python module to load on startup")
common_args.verbosity_args(parser) common_args.verbosity_args(parser)
return parser return parser
@ -95,6 +99,11 @@ def main():
args = get_argparser().parse_args() args = get_argparser().parse_args()
widget_log_handler = log.init_log(args, "dashboard") widget_log_handler = log.init_log(args, "dashboard")
# load any plugin modules first (to register argument_ui classes, etc.)
if args.plugin_modules:
for mod in args.plugin_modules:
importlib.import_module(mod)
if args.db_file is None: if args.db_file is None:
args.db_file = os.path.join(get_user_config_dir(), args.db_file = os.path.join(get_user_config_dir(),
"artiq_dashboard_{server}_{port}.pyon".format( "artiq_dashboard_{server}_{port}.pyon".format(
@ -160,6 +169,7 @@ def main():
# create UI components # create UI components
expmgr = experiments.ExperimentManager(main_window, expmgr = experiments.ExperimentManager(main_window,
sub_clients["datasets"],
sub_clients["explist"], sub_clients["explist"],
sub_clients["schedule"], sub_clients["schedule"],
rpc_clients["schedule"], rpc_clients["schedule"],
@ -179,12 +189,18 @@ def main():
rpc_clients["dataset_db"]) rpc_clients["dataset_db"])
smgr.register(d_datasets) smgr.register(d_datasets)
d_applets = applets_ccb.AppletsCCBDock(main_window, sub_clients["datasets"]) d_applets = applets_ccb.AppletsCCBDock(main_window,
sub_clients["datasets"],
extra_substitutes={
"server": args.server,
"port_notify": args.port_notify,
"port_control": args.port_control,
})
atexit_register_coroutine(d_applets.stop) atexit_register_coroutine(d_applets.stop)
smgr.register(d_applets) smgr.register(d_applets)
broadcast_clients["ccb"].notify_cbs.append(d_applets.ccb_notify) broadcast_clients["ccb"].notify_cbs.append(d_applets.ccb_notify)
d_ttl_dds = moninj.MonInj() d_ttl_dds = moninj.MonInj(rpc_clients["schedule"])
loop.run_until_complete(d_ttl_dds.start(args.server, args.port_notify)) loop.run_until_complete(d_ttl_dds.start(args.server, args.port_notify))
atexit_register_coroutine(d_ttl_dds.stop) atexit_register_coroutine(d_ttl_dds.stop)
@ -232,9 +248,9 @@ def main():
server_description = server_name + " ({})".format(args.server) server_description = server_name + " ({})".format(args.server)
else: else:
server_description = args.server server_description = args.server
logging.info("ARTIQ dashboard %s connected to %s", logging.info("ARTIQ dashboard version: %s",
artiq_version, server_description) artiq_version)
logging.info("ARTIQ dashboard connected to moninj_proxy (%s)", server_description)
# run # run
main_window.show() main_window.show()
loop.run_until_complete(main_window.exit_request.wait()) loop.run_until_complete(main_window.exit_request.wait())

View File

@ -53,12 +53,13 @@ def get_argparser():
"--experiment-subdir", default="", "--experiment-subdir", default="",
help=("path to the experiment folder from the repository root " help=("path to the experiment folder from the repository root "
"(default: '%(default)s')")) "(default: '%(default)s')"))
log_args(parser) log_args(parser)
parser.add_argument("--name", parser.add_argument("--name",
help="friendly name, displayed in dashboards " help="friendly name, displayed in dashboards "
"to identify master instead of server address") "to identify master instead of server address")
parser.add_argument("--log-submissions", default=None,
help="set the filename to create the experiment subimission")
return parser return parser
@ -111,7 +112,7 @@ def main():
repo_backend, worker_handlers, args.experiment_subdir) repo_backend, worker_handlers, args.experiment_subdir)
atexit.register(experiment_db.close) atexit.register(experiment_db.close)
scheduler = Scheduler(RIDCounter(), worker_handlers, experiment_db) scheduler = Scheduler(RIDCounter(), worker_handlers, experiment_db, args.log_submissions)
scheduler.start() scheduler.start()
atexit_register_coroutine(scheduler.stop) atexit_register_coroutine(scheduler.stop)

View File

@ -25,6 +25,7 @@ class AppletIPCServer(AsyncioParentComm):
AsyncioParentComm.__init__(self) AsyncioParentComm.__init__(self)
self.datasets_sub = datasets_sub self.datasets_sub = datasets_sub
self.datasets = set() self.datasets = set()
self.dataset_prefixes = []
def write_pyon(self, obj): def write_pyon(self, obj):
self.write(pyon.encode(obj).encode() + b"\n") self.write(pyon.encode(obj).encode() + b"\n")
@ -33,8 +34,16 @@ class AppletIPCServer(AsyncioParentComm):
line = await self.readline() line = await self.readline()
return pyon.decode(line.decode()) return pyon.decode(line.decode())
def _is_dataset_subscribed(self, key):
if key in self.datasets:
return True
for prefix in self.dataset_prefixes:
if key.startswith(prefix):
return True
return False
def _synthesize_init(self, data): def _synthesize_init(self, data):
struct = {k: v for k, v in data.items() if k in self.datasets} struct = {k: v for k, v in data.items() if self._is_dataset_subscribed(k)}
return {"action": "init", return {"action": "init",
"struct": struct} "struct": struct}
@ -43,10 +52,10 @@ class AppletIPCServer(AsyncioParentComm):
mod = self._synthesize_init(mod["struct"]) mod = self._synthesize_init(mod["struct"])
else: else:
if mod["path"]: if mod["path"]:
if mod["path"][0] not in self.datasets: if not self._is_dataset_subscribed(mod["path"][0]):
return return
elif mod["action"] in {"setitem", "delitem"}: elif mod["action"] in {"setitem", "delitem"}:
if mod["key"] not in self.datasets: if not self._is_dataset_subscribed(mod["key"]):
return return
self.write_pyon({"action": "mod", "mod": mod}) self.write_pyon({"action": "mod", "mod": mod})
@ -64,6 +73,7 @@ class AppletIPCServer(AsyncioParentComm):
fix_initial_size_cb() fix_initial_size_cb()
elif action == "subscribe": elif action == "subscribe":
self.datasets = obj["datasets"] self.datasets = obj["datasets"]
self.dataset_prefixes = obj["dataset_prefixes"]
if self.datasets_sub.model is not None: if self.datasets_sub.model is not None:
mod = self._synthesize_init( mod = self._synthesize_init(
self.datasets_sub.model.backing_store) self.datasets_sub.model.backing_store)
@ -93,7 +103,7 @@ class AppletIPCServer(AsyncioParentComm):
class _AppletDock(QDockWidgetCloseDetect): class _AppletDock(QDockWidgetCloseDetect):
def __init__(self, datasets_sub, uid, name, spec): def __init__(self, datasets_sub, uid, name, spec, extra_substitutes):
QDockWidgetCloseDetect.__init__(self, "Applet: " + name) QDockWidgetCloseDetect.__init__(self, "Applet: " + name)
self.setObjectName("applet" + str(uid)) self.setObjectName("applet" + str(uid))
@ -104,6 +114,7 @@ class _AppletDock(QDockWidgetCloseDetect):
self.datasets_sub = datasets_sub self.datasets_sub = datasets_sub
self.applet_name = name self.applet_name = name
self.spec = spec self.spec = spec
self.extra_substitutes = extra_substitutes
self.starting_stopping = False self.starting_stopping = False
@ -152,7 +163,8 @@ class _AppletDock(QDockWidgetCloseDetect):
python = sys.executable.replace("\\", "\\\\") python = sys.executable.replace("\\", "\\\\")
command = command_tpl.safe_substitute( command = command_tpl.safe_substitute(
python=python, python=python,
artiq_applet=python + " -m artiq.applets." artiq_applet=python + " -m artiq.applets.",
**self.extra_substitutes
) )
logger.debug("starting command %s for %s", command, self.applet_name) logger.debug("starting command %s for %s", command, self.applet_name)
await self.start_process(shlex.split(command), None) await self.start_process(shlex.split(command), None)
@ -315,7 +327,11 @@ class _CompleterDelegate(QtWidgets.QStyledItemDelegate):
class AppletsDock(QtWidgets.QDockWidget): class AppletsDock(QtWidgets.QDockWidget):
def __init__(self, main_window, datasets_sub): def __init__(self, main_window, datasets_sub, extra_substitutes={}):
"""
:param extra_substitutes: Map of extra ``${strings}`` to substitute in applet
commands to their respective values.
"""
QtWidgets.QDockWidget.__init__(self, "Applets") QtWidgets.QDockWidget.__init__(self, "Applets")
self.setObjectName("Applets") self.setObjectName("Applets")
self.setFeatures(QtWidgets.QDockWidget.DockWidgetMovable | self.setFeatures(QtWidgets.QDockWidget.DockWidgetMovable |
@ -323,6 +339,7 @@ class AppletsDock(QtWidgets.QDockWidget):
self.main_window = main_window self.main_window = main_window
self.datasets_sub = datasets_sub self.datasets_sub = datasets_sub
self.extra_substitutes = extra_substitutes
self.applet_uids = set() self.applet_uids = set()
self.table = QtWidgets.QTreeWidget() self.table = QtWidgets.QTreeWidget()
@ -421,7 +438,7 @@ class AppletsDock(QtWidgets.QDockWidget):
self.table.itemChanged.connect(self.item_changed) self.table.itemChanged.connect(self.item_changed)
def create(self, item, name, spec): def create(self, item, name, spec):
dock = _AppletDock(self.datasets_sub, item.applet_uid, name, spec) dock = _AppletDock(self.datasets_sub, item.applet_uid, name, spec, self.extra_substitutes)
self.main_window.addDockWidget(QtCore.Qt.RightDockWidgetArea, dock) self.main_window.addDockWidget(QtCore.Qt.RightDockWidgetArea, dock)
dock.setFloating(True) dock.setFloating(True)
asyncio.ensure_future(dock.start()) asyncio.ensure_future(dock.start())

View File

@ -427,9 +427,10 @@ class ScanEntry(LayoutWidget):
"selected": "NoScan", "selected": "NoScan",
"NoScan": {"value": 0.0, "repetitions": 1}, "NoScan": {"value": 0.0, "repetitions": 1},
"RangeScan": {"start": 0.0, "stop": 100.0*scale, "npoints": 10, "RangeScan": {"start": 0.0, "stop": 100.0*scale, "npoints": 10,
"randomize": False}, "randomize": False, "seed": None},
"CenterScan": {"center": 0.*scale, "span": 100.*scale, "CenterScan": {"center": 0.*scale, "span": 100.*scale,
"step": 10.*scale, "randomize": False}, "step": 10.*scale, "randomize": False,
"seed": None},
"ExplicitScan": {"sequence": []} "ExplicitScan": {"sequence": []}
} }
if "default" in procdesc: if "default" in procdesc:

View File

@ -19,4 +19,4 @@ _register_unit("Hz", "m_kMG")
_register_unit("dB", "_") _register_unit("dB", "_")
_register_unit("V", "um_k") _register_unit("V", "um_k")
_register_unit("A", "um_") _register_unit("A", "um_")
_register_unit("W", "um_") _register_unit("W", "num_")

View File

@ -30,7 +30,6 @@ class _RepoScanner:
raise raise
for class_name, class_desc in description.items(): for class_name, class_desc in description.items():
name = class_desc["name"] name = class_desc["name"]
arginfo = class_desc["arginfo"]
if "/" in name: if "/" in name:
logger.warning("Character '/' is not allowed in experiment " logger.warning("Character '/' is not allowed in experiment "
"name (%s)", name) "name (%s)", name)
@ -47,7 +46,8 @@ class _RepoScanner:
entry = { entry = {
"file": filename, "file": filename,
"class_name": class_name, "class_name": class_name,
"arginfo": arginfo, "arginfo": class_desc["arginfo"],
"argument_ui": class_desc["argument_ui"],
"scheduler_defaults": class_desc["scheduler_defaults"] "scheduler_defaults": class_desc["scheduler_defaults"]
} }
entry_dict[name] = entry entry_dict[name] = entry

View File

@ -1,5 +1,6 @@
import asyncio import asyncio
import logging import logging
import csv
from enum import Enum from enum import Enum
from time import time from time import time
@ -113,7 +114,7 @@ class Run:
class RunPool: class RunPool:
def __init__(self, ridc, worker_handlers, notifier, experiment_db): def __init__(self, ridc, worker_handlers, notifier, experiment_db, log_submissions):
self.runs = dict() self.runs = dict()
self.state_changed = Condition() self.state_changed = Condition()
@ -121,6 +122,13 @@ class RunPool:
self.worker_handlers = worker_handlers self.worker_handlers = worker_handlers
self.notifier = notifier self.notifier = notifier
self.experiment_db = experiment_db self.experiment_db = experiment_db
self.log_submissions = log_submissions
def log_submission(self, rid, expid):
start_time = time()
with open(self.log_submissions, 'a', newline='') as f:
writer = csv.writer(f)
writer.writerow([rid, start_time, expid["file"]])
def submit(self, expid, priority, due_date, flush, pipeline_name): def submit(self, expid, priority, due_date, flush, pipeline_name):
# mutates expid to insert head repository revision if None. # mutates expid to insert head repository revision if None.
@ -135,6 +143,8 @@ class RunPool:
wd, repo_msg = None, None wd, repo_msg = None, None
run = Run(rid, pipeline_name, wd, expid, priority, due_date, flush, run = Run(rid, pipeline_name, wd, expid, priority, due_date, flush,
self, repo_msg=repo_msg) self, repo_msg=repo_msg)
if self.log_submissions is not None:
self.log_submission(rid, expid)
self.runs[rid] = run self.runs[rid] = run
self.state_changed.notify() self.state_changed.notify()
return rid return rid
@ -311,8 +321,8 @@ class AnalyzeStage(TaskObject):
class Pipeline: class Pipeline:
def __init__(self, ridc, deleter, worker_handlers, notifier, experiment_db): def __init__(self, ridc, deleter, worker_handlers, notifier, experiment_db, log_submissions):
self.pool = RunPool(ridc, worker_handlers, notifier, experiment_db) self.pool = RunPool(ridc, worker_handlers, notifier, experiment_db, log_submissions)
self._prepare = PrepareStage(self.pool, deleter.delete) self._prepare = PrepareStage(self.pool, deleter.delete)
self._run = RunStage(self.pool, deleter.delete) self._run = RunStage(self.pool, deleter.delete)
self._analyze = AnalyzeStage(self.pool, deleter.delete) self._analyze = AnalyzeStage(self.pool, deleter.delete)
@ -383,7 +393,7 @@ class Deleter(TaskObject):
class Scheduler: class Scheduler:
def __init__(self, ridc, worker_handlers, experiment_db): def __init__(self, ridc, worker_handlers, experiment_db, log_submissions):
self.notifier = Notifier(dict()) self.notifier = Notifier(dict())
self._pipelines = dict() self._pipelines = dict()
@ -393,6 +403,7 @@ class Scheduler:
self._ridc = ridc self._ridc = ridc
self._deleter = Deleter(self._pipelines) self._deleter = Deleter(self._pipelines)
self._log_submissions = log_submissions
def start(self): def start(self):
self._deleter.start() self._deleter.start()
@ -423,7 +434,7 @@ class Scheduler:
logger.debug("creating pipeline '%s'", pipeline_name) logger.debug("creating pipeline '%s'", pipeline_name)
pipeline = Pipeline(self._ridc, self._deleter, pipeline = Pipeline(self._ridc, self._deleter,
self._worker_handlers, self.notifier, self._worker_handlers, self.notifier,
self._experiment_db) self._experiment_db, self._log_submissions)
self._pipelines[pipeline_name] = pipeline self._pipelines[pipeline_name] = pipeline
pipeline.start() pipeline.start()
return pipeline.pool.submit(expid, priority, due_date, flush, pipeline_name) return pipeline.pool.submit(expid, priority, due_date, flush, pipeline_name)

View File

@ -301,8 +301,14 @@ class Worker:
await self._create_process(logging.WARNING) await self._create_process(logging.WARNING)
r = dict() r = dict()
def register(class_name, name, arginfo, scheduler_defaults): def register(class_name, name, arginfo, argument_ui,
r[class_name] = {"name": name, "arginfo": arginfo, "scheduler_defaults": scheduler_defaults} scheduler_defaults):
r[class_name] = {
"name": name,
"arginfo": arginfo,
"argument_ui": argument_ui,
"scheduler_defaults": scheduler_defaults
}
self.register_experiment = register self.register_experiment = register
await self._worker_action({"action": "examine", "file": file}, await self._worker_action({"action": "examine", "file": file},
timeout) timeout)

View File

@ -205,7 +205,10 @@ def examine(device_mgr, dataset_mgr, file):
(k, (proc.describe(), group, tooltip)) (k, (proc.describe(), group, tooltip))
for k, (proc, group, tooltip) in argument_mgr.requested_args.items() for k, (proc, group, tooltip) in argument_mgr.requested_args.items()
) )
register_experiment(class_name, name, arginfo, scheduler_defaults) argument_ui = None
if hasattr(exp_class, "argument_ui"):
argument_ui = exp_class.argument_ui
register_experiment(class_name, name, arginfo, argument_ui, scheduler_defaults)
finally: finally:
new_keys = set(sys.modules.keys()) new_keys = set(sys.modules.keys())
for key in new_keys - previous_keys: for key in new_keys - previous_keys:

View File

@ -90,7 +90,7 @@ class SchedulerCase(unittest.TestCase):
def test_steps(self): def test_steps(self):
loop = self.loop loop = self.loop
scheduler = Scheduler(_RIDCounter(0), dict(), None) scheduler = Scheduler(_RIDCounter(0), dict(), None, None)
expid = _get_expid("EmptyExperiment") expid = _get_expid("EmptyExperiment")
expect = _get_basic_steps(1, expid) expect = _get_basic_steps(1, expid)
@ -129,7 +129,7 @@ class SchedulerCase(unittest.TestCase):
prepare.""" prepare."""
loop = self.loop loop = self.loop
handlers = {} handlers = {}
scheduler = Scheduler(_RIDCounter(0), handlers, None) scheduler = Scheduler(_RIDCounter(0), handlers, None, None)
handlers["scheduler_check_pause"] = scheduler.check_pause handlers["scheduler_check_pause"] = scheduler.check_pause
expid_empty = _get_expid("EmptyExperiment") expid_empty = _get_expid("EmptyExperiment")
@ -293,7 +293,7 @@ class SchedulerCase(unittest.TestCase):
handlers = { handlers = {
"update_dataset": check_termination "update_dataset": check_termination
} }
scheduler = Scheduler(_RIDCounter(0), handlers, None) scheduler = Scheduler(_RIDCounter(0), handlers, None, None)
expid_bg = _get_expid("BackgroundExperiment") expid_bg = _get_expid("BackgroundExperiment")
expid = _get_expid("EmptyExperiment") expid = _get_expid("EmptyExperiment")
@ -351,7 +351,7 @@ class SchedulerCase(unittest.TestCase):
"""Check scheduler exits with experiments still running""" """Check scheduler exits with experiments still running"""
loop = self.loop loop = self.loop
scheduler = Scheduler(_RIDCounter(0), {}, None) scheduler = Scheduler(_RIDCounter(0), {}, None, None)
expid_bg = _get_expid("BackgroundExperiment") expid_bg = _get_expid("BackgroundExperiment")
# Suppress the SystemExit backtrace when worker process is killed. # Suppress the SystemExit backtrace when worker process is killed.
@ -392,7 +392,7 @@ class SchedulerCase(unittest.TestCase):
def test_flush(self): def test_flush(self):
loop = self.loop loop = self.loop
scheduler = Scheduler(_RIDCounter(0), dict(), None) scheduler = Scheduler(_RIDCounter(0), dict(), None, None)
expid = _get_expid("EmptyExperiment") expid = _get_expid("EmptyExperiment")
expect = _get_basic_steps(1, expid, 1, True) expect = _get_basic_steps(1, expid, 1, True)