From 97accd254042933a069000b892c31f7f0e312271 Mon Sep 17 00:00:00 2001 From: Sebastien Bourdeauducq Date: Mon, 12 Oct 2015 17:18:23 +0800 Subject: [PATCH] merge parameters and results into datasets --- artiq/frontend/artiq_client.py | 50 +++--- artiq/frontend/artiq_gui.py | 43 +++-- artiq/frontend/artiq_influxdb.py | 28 +++- artiq/frontend/artiq_master.py | 40 +++-- artiq/frontend/artiq_run.py | 49 +++--- artiq/gui/console.py | 14 +- artiq/gui/{results.py => datasets.py} | 30 ++-- artiq/language/environment.py | 111 ++++--------- artiq/master/databases.py | 62 +++++++ artiq/master/ddb.py | 24 --- artiq/master/worker_db.py | 154 +++++++++--------- artiq/master/worker_impl.py | 42 +++-- examples/master/dataset_db.pyon | 1 + examples/master/{ddb.pyon => device_db.pyon} | 0 examples/master/pdb.pyon | 1 - .../repository/flopping_f_simulation.py | 26 +-- .../master/repository/photon_histogram.py | 11 +- examples/master/repository/speed_benchmark.py | 12 +- examples/sim/al_spectroscopy.py | 2 +- examples/sim/{pdb.pyon => dataset_db.pyon} | 0 examples/sim/{ddb.pyon => device_db.pyon} | 0 21 files changed, 349 insertions(+), 351 deletions(-) rename artiq/gui/{results.py => datasets.py} (83%) create mode 100644 artiq/master/databases.py delete mode 100644 artiq/master/ddb.py create mode 100644 examples/master/dataset_db.pyon rename examples/master/{ddb.pyon => device_db.pyon} (100%) delete mode 100644 examples/master/pdb.pyon rename examples/sim/{pdb.pyon => dataset_db.pyon} (100%) rename examples/sim/{ddb.pyon => device_db.pyon} (100%) diff --git a/artiq/frontend/artiq_client.py b/artiq/frontend/artiq_client.py index c5e153302..b1bdd2bf3 100755 --- a/artiq/frontend/artiq_client.py +++ b/artiq/frontend/artiq_client.py @@ -63,24 +63,26 @@ def get_argparser(): parser_delete.add_argument("rid", type=int, help="run identifier (RID)") - parser_set_parameter = subparsers.add_parser( - "set-parameter", help="add or modify a parameter") - parser_set_parameter.add_argument("name", help="name of the parameter") - parser_set_parameter.add_argument("value", - help="value in PYON format") + parser_set_dataset = subparsers.add_parser( + "set-dataset", help="add or modify a dataset") + parser_set_dataset.add_argument("name", help="name of the dataset") + parser_set_dataset.add_argument("value", + help="value in PYON format") + parser_set_dataset.add_argument("-p", "--persist", action="store_true", + help="make the dataset persistent") - parser_del_parameter = subparsers.add_parser( - "del-parameter", help="delete a parameter") - parser_del_parameter.add_argument("name", help="name of the parameter") + parser_del_dataset = subparsers.add_parser( + "del-dataset", help="delete a dataset") + parser_del_dataset.add_argument("name", help="name of the dataset") parser_show = subparsers.add_parser( - "show", help="show schedule, log, devices or parameters") + "show", help="show schedule, log, devices or datasets") parser_show.add_argument( "what", - help="select object to show: schedule/log/devices/parameters") + help="select object to show: schedule/log/devices/datasets") subparsers.add_parser( - "scan-ddb", help="trigger a device database (re)scan") + "scan-devices", help="trigger a device database (re)scan") parser_scan_repos = subparsers.add_parser( "scan-repository", help="trigger a repository (re)scan") @@ -129,15 +131,15 @@ def _action_delete(remote, args): remote.delete(args.rid) -def _action_set_parameter(remote, args): - remote.set(args.name, pyon.decode(args.value)) +def _action_set_dataset(remote, args): + remote.set(args.name, pyon.decode(args.value), args.persist) -def _action_del_parameter(remote, args): +def _action_del_dataset(remote, args): remote.delete(args.name) -def _action_scan_ddb(remote, args): +def _action_scan_devices(remote, args): remote.scan() @@ -186,11 +188,11 @@ def _show_devices(devices): print(table) -def _show_parameters(parameters): +def _show_datasets(datasets): clear_screen() - table = PrettyTable(["Parameter", "Value"]) - for k, v in sorted(parameters.items(), key=itemgetter(0)): - table.add_row([k, str(v)]) + table = PrettyTable(["Dataset", "Persistent", "Value"]) + for k, (persist, value) in sorted(datasets.items(), key=itemgetter(0)): + table.add_row([k, "Y" if persist else "N", str(value)]) print(table) @@ -259,8 +261,8 @@ def main(): _show_log(args) elif args.what == "devices": _show_dict(args, "devices", _show_devices) - elif args.what == "parameters": - _show_dict(args, "parameters", _show_parameters) + elif args.what == "datasets": + _show_dict(args, "datasets", _show_datasets) else: print("Unknown object to show, use -h to list valid names.") sys.exit(1) @@ -269,9 +271,9 @@ def main(): target_name = { "submit": "master_schedule", "delete": "master_schedule", - "set_parameter": "master_pdb", - "del_parameter": "master_pdb", - "scan_ddb": "master_ddb", + "set_dataset": "master_dataset_db", + "del_dataset": "master_dataset_db", + "scan_devices": "master_device_db", "scan_repository": "master_repository" }[action] remote = Client(args.server, port, target_name) diff --git a/artiq/frontend/artiq_gui.py b/artiq/frontend/artiq_gui.py index f6f9db57c..10db8eb41 100755 --- a/artiq/frontend/artiq_gui.py +++ b/artiq/frontend/artiq_gui.py @@ -15,8 +15,7 @@ from artiq.protocols.pc_rpc import AsyncioClient from artiq.gui.state import StateManager from artiq.gui.explorer import ExplorerDock from artiq.gui.moninj import MonInj -from artiq.gui.results import ResultsDock -from artiq.gui.parameters import ParametersDock +from artiq.gui.datasets import DatasetsDock from artiq.gui.schedule import ScheduleDock from artiq.gui.log import LogDock from artiq.gui.console import ConsoleDock @@ -92,30 +91,24 @@ def main(): args.server, args.port_notify)) atexit.register(lambda: loop.run_until_complete(d_explorer.sub_close())) - d_results = ResultsDock(win, area) - smgr.register(d_results) - loop.run_until_complete(d_results.sub_connect( + d_datasets = DatasetsDock(win, area) + smgr.register(d_datasets) + loop.run_until_complete(d_datasets.sub_connect( args.server, args.port_notify)) - atexit.register(lambda: loop.run_until_complete(d_results.sub_close())) + atexit.register(lambda: loop.run_until_complete(d_datasets.sub_close())) if os.name != "nt": d_ttl_dds = MonInj() loop.run_until_complete(d_ttl_dds.start(args.server, args.port_notify)) atexit.register(lambda: loop.run_until_complete(d_ttl_dds.stop())) - d_params = ParametersDock() - loop.run_until_complete(d_params.sub_connect( - args.server, args.port_notify)) - atexit.register(lambda: loop.run_until_complete(d_params.sub_close())) - if os.name != "nt": area.addDock(d_ttl_dds.dds_dock, "top") area.addDock(d_ttl_dds.ttl_dock, "above", d_ttl_dds.dds_dock) - area.addDock(d_results, "above", d_ttl_dds.ttl_dock) + area.addDock(d_datasets, "above", d_ttl_dds.ttl_dock) else: - area.addDock(d_results, "top") - area.addDock(d_params, "above", d_results) - area.addDock(d_explorer, "above", d_params) + area.addDock(d_datasets, "top") + area.addDock(d_explorer, "above", d_datasets) d_schedule = ScheduleDock(status_bar, schedule_ctl) loop.run_until_complete(d_schedule.sub_connect( @@ -127,16 +120,18 @@ def main(): args.server, args.port_notify)) atexit.register(lambda: loop.run_until_complete(d_log.sub_close())) - pdb = AsyncioClient() - loop.run_until_complete(pdb.connect_rpc( - args.server, args.port_control, "master_pdb")) - atexit.register(lambda: pdb.close_rpc()) - def _get_parameter(k, v): - asyncio.ensure_future(pdb.set(k, v)) + dataset_db = AsyncioClient() + loop.run_until_complete(dataset_db.connect_rpc( + args.server, args.port_control, "master_dataset_db")) + atexit.register(lambda: dataset_db.close_rpc()) + def _set_dataset(k, v): + asyncio.ensure_future(dataset_db.set(k, v)) + def _del_dataset(k): + asyncio.ensure_future(dataset_db.delete(k)) d_console = ConsoleDock( - d_params.get_parameter, - _get_parameter, - d_results.get_result) + d_datasets.get_dataset, + _set_dataset, + _del_dataset) area.addDock(d_console, "bottom") area.addDock(d_log, "above", d_console) diff --git a/artiq/frontend/artiq_influxdb.py b/artiq/frontend/artiq_influxdb.py index f69da9b0e..36e7be2c3 100755 --- a/artiq/frontend/artiq_influxdb.py +++ b/artiq/frontend/artiq_influxdb.py @@ -93,7 +93,7 @@ class DBWriter(TaskObject): try: self._queue.put_nowait((k, v)) except asyncio.QueueFull: - logger.warning("failed to update parameter '%s': " + logger.warning("failed to update dataset '%s': " "too many pending updates", k) async def _do(self): @@ -103,7 +103,7 @@ class DBWriter(TaskObject): params = {"u": self.user, "p": self.password, "db": self.database, "consistency": "any", "precision": "n"} fmt_ty, fmt_v = format_influxdb(v) - data = "{},parameter={} {}={}".format(self.table, k, fmt_ty, fmt_v) + data = "{},dataset={} {}={}".format(self.table, k, fmt_ty, fmt_v) try: response = await aiohttp.request( "POST", url, params=params, data=data) @@ -121,15 +121,31 @@ class DBWriter(TaskObject): response.close() -class Parameters: +class _Mock: + def __setitem__(self, k, v): + pass + + def __getitem__(self, k): + return self + + def __delitem__(self, k): + pass + + +class Datasets: def __init__(self, filter_function, writer, init): self.filter_function = filter_function self.writer = writer def __setitem__(self, k, v): if self.filter_function(k): - self.writer.update(k, v) + self.writer.update(k, v[1]) + # ignore mutations + def __getitem__(self, k): + return _Mock() + + # ignore deletions def __delitem__(self, k): pass @@ -145,8 +161,8 @@ class MasterReader(TaskObject): async def _do(self): subscriber = Subscriber( - "parameters", - partial(Parameters, self.filter_function, self.writer)) + "datasets", + partial(Datasets, self.filter_function, self.writer)) while True: try: await subscriber.connect(self.server, self.port) diff --git a/artiq/frontend/artiq_master.py b/artiq/frontend/artiq_master.py index e0f3b740d..264658ef9 100755 --- a/artiq/frontend/artiq_master.py +++ b/artiq/frontend/artiq_master.py @@ -6,9 +6,8 @@ import atexit import os from artiq.protocols.pc_rpc import Server -from artiq.protocols.sync_struct import Notifier, Publisher, process_mod -from artiq.master.ddb import DDB -from artiq.protocols.file_db import FlatFileDB +from artiq.protocols.sync_struct import Notifier, Publisher +from artiq.master.databases import DeviceDB, DatasetDB from artiq.master.scheduler import Scheduler from artiq.master.worker_db import get_last_rid from artiq.master.repository import FilesystemBackend, GitBackend, Repository @@ -28,10 +27,10 @@ def get_argparser(): "--port-control", default=3251, type=int, help="TCP port to listen to for control (default: %(default)d)") group = parser.add_argument_group("databases") - group.add_argument("-d", "--ddb", default="ddb.pyon", - help="device database file") - group.add_argument("-p", "--pdb", default="pdb.pyon", - help="parameter database file") + group.add_argument("--device-db", default="device_db.pyon", + help="device database file (default: '%(default)s')") + group.add_argument("--dataset-db", default="dataset_db.pyon", + help="dataset file (default: '%(default)s')") group = parser.add_argument_group("repository") group.add_argument( "-g", "--git", default=False, action="store_true", @@ -65,25 +64,25 @@ def main(): loop = asyncio.get_event_loop() atexit.register(lambda: loop.close()) - ddb = DDB(args.ddb) - pdb = FlatFileDB(args.pdb) - rtr = Notifier(dict()) + device_db = DeviceDB(args.device_db) + dataset_db = DatasetDB(args.dataset_db) + dataset_db.start() + atexit.register(lambda: loop.run_until_complete(dataset_db.stop())) log = Log(1000) if args.git: repo_backend = GitBackend(args.repository) else: repo_backend = FilesystemBackend(args.repository) - repository = Repository(repo_backend, ddb.get_ddb, log.log) + repository = Repository(repo_backend, device_db.get_ddb, log.log) atexit.register(repository.close) repository.scan_async() worker_handlers = { - "get_ddb": ddb.get_ddb, - "get_device": ddb.get, - "get_parameter": pdb.get, - "set_parameter": pdb.set, - "update_rt_results": lambda mod: process_mod(rtr, mod), + "get_ddb": device_db.get_ddb, + "get_device": device_db.get, + "get_dataset": dataset_db.get, + "update_dataset": dataset_db.update, "log": log.log } scheduler = Scheduler(get_last_rid() + 1, worker_handlers, repo_backend) @@ -92,8 +91,8 @@ def main(): atexit.register(lambda: loop.run_until_complete(scheduler.stop())) server_control = Server({ - "master_ddb": ddb, - "master_pdb": pdb, + "master_device_db": device_db, + "master_dataset_db": dataset_db, "master_schedule": scheduler, "master_repository": repository }) @@ -103,9 +102,8 @@ def main(): server_notify = Publisher({ "schedule": scheduler.notifier, - "devices": ddb.data, - "parameters": pdb.data, - "rt_results": rtr, + "devices": device_db.data, + "datasets": dataset_db.data, "explist": repository.explist, "log": log.data }) diff --git a/artiq/frontend/artiq_run.py b/artiq/frontend/artiq_run.py index 3474f4740..b4477077d 100755 --- a/artiq/frontend/artiq_run.py +++ b/artiq/frontend/artiq_run.py @@ -4,17 +4,14 @@ import argparse import sys -import time from operator import itemgetter -from itertools import chain import logging import h5py from artiq.language.environment import EnvExperiment -from artiq.protocols.file_db import FlatFileDB -from artiq.master.ddb import DDB -from artiq.master.worker_db import DeviceManager, ResultDB +from artiq.master.databases import DeviceDB, DatasetDB +from artiq.master.worker_db import DeviceManager, DatasetManager from artiq.tools import * @@ -33,11 +30,6 @@ class ELFRunner(EnvExperiment): self.core.comm.serve(dict(), dict()) -class SimpleParamLogger: - def set(self, timestamp, name, value): - logger.info("Parameter change: {} = {}".format(name, value)) - - class DummyScheduler: def __init__(self): self.next_rid = 0 @@ -63,10 +55,10 @@ def get_argparser(with_file=True): description="Local experiment running tool") verbosity_args(parser) - parser.add_argument("-d", "--ddb", default="ddb.pyon", - help="device database file") - parser.add_argument("-p", "--pdb", default="pdb.pyon", - help="parameter database file") + parser.add_argument("--device-db", default="device_db.pyon", + help="device database file (default: '%(default)s')") + parser.add_argument("--dataset-db", default="dataset_db.pyon", + help="dataset file (default: '%(default)s')") parser.add_argument("-e", "--experiment", default=None, help="experiment to run") @@ -82,7 +74,7 @@ def get_argparser(with_file=True): return parser -def _build_experiment(dmgr, pdb, rdb, args): +def _build_experiment(device_mgr, dataset_mgr, args): if hasattr(args, "file"): if args.file.endswith(".elf"): if args.arguments: @@ -90,7 +82,7 @@ def _build_experiment(dmgr, pdb, rdb, args): if args.experiment: raise ValueError("experiment-by-name not supported " "for ELF kernels") - return ELFRunner(dmgr, pdb, rdb, file=args.file) + return ELFRunner(device_mgr, dataset_mgr, file=args.file) else: module = file_import(args.file) file = args.file @@ -104,35 +96,34 @@ def _build_experiment(dmgr, pdb, rdb, args): "experiment": args.experiment, "arguments": arguments } - dmgr.virtual_devices["scheduler"].expid = expid - return exp(dmgr, pdb, rdb, **arguments) + device_mgr.virtual_devices["scheduler"].expid = expid + return exp(device_mgr, dataset_mgr, **arguments) def run(with_file=False): args = get_argparser(with_file).parse_args() init_logger(args) - dmgr = DeviceManager(DDB(args.ddb), - virtual_devices={"scheduler": DummyScheduler()}) - pdb = FlatFileDB(args.pdb) - pdb.hooks.append(SimpleParamLogger()) - rdb = ResultDB() + device_mgr = DeviceManager(DeviceDB(args.device_db), + virtual_devices={"scheduler": DummyScheduler()}) + dataset_db = DatasetDB(args.dataset_db) + dataset_mgr = DatasetManager(dataset_db) try: - exp_inst = _build_experiment(dmgr, pdb, rdb, args) + exp_inst = _build_experiment(device_mgr, dataset_mgr, args) exp_inst.prepare() exp_inst.run() exp_inst.analyze() finally: - dmgr.close_devices() + device_mgr.close_devices() if args.hdf5 is not None: with h5py.File(args.hdf5, "w") as f: - rdb.write_hdf5(f) - elif rdb.rt.read or rdb.nrt: - r = chain(rdb.rt.read.items(), rdb.nrt.items()) - for k, v in sorted(r, key=itemgetter(0)): + dataset_mgr.write_hdf5(f) + else: + for k, v in sorted(dataset_mgr.local.items(), key=itemgetter(0)): print("{}: {}".format(k, v)) + dataset_db.save() def main(): diff --git a/artiq/gui/console.py b/artiq/gui/console.py index 38015ef29..b7b98e573 100644 --- a/artiq/gui/console.py +++ b/artiq/gui/console.py @@ -5,19 +5,19 @@ _help = """ This is an interactive Python console. The following functions are available: - get_parameter(key) - set_parameter(key, value) [asynchronous update] - get_result(key) [real-time results only] + get_dataset(key) + set_dataset(key, value, persist=False) [asynchronous update] + del_dataset(key) [asynchronous update] """ class ConsoleDock(dockarea.Dock): - def __init__(self, get_parameter, set_parameter, get_result): + def __init__(self, get_dataset, set_dataset, del_dataset): dockarea.Dock.__init__(self, "Console", size=(1000, 300)) ns = { - "get_parameter": get_parameter, - "set_parameter": set_parameter, - "get_result": get_result + "get_dataset": get_dataset, + "set_dataset": set_dataset, + "del_dataset": del_dataset } c = console.ConsoleWidget(namespace=ns, text=_help) self.addWidget(c) diff --git a/artiq/gui/results.py b/artiq/gui/datasets.py similarity index 83% rename from artiq/gui/results.py rename to artiq/gui/datasets.py index edd40e2ad..8d50f761d 100644 --- a/artiq/gui/results.py +++ b/artiq/gui/datasets.py @@ -15,9 +15,9 @@ from artiq.gui.displays import * logger = logging.getLogger(__name__) -class ResultsModel(DictSyncModel): +class DatasetsModel(DictSyncModel): def __init__(self, parent, init): - DictSyncModel.__init__(self, ["Result", "Value"], + DictSyncModel.__init__(self, ["Dataset", "Persistent", "Value"], parent, init) def sort_key(self, k, v): @@ -27,7 +27,9 @@ class ResultsModel(DictSyncModel): if column == 0: return k elif column == 1: - return short_format(v) + return "Y" if v[0] else "N" + elif column == 2: + return short_format(v[1]) else: raise ValueError @@ -38,9 +40,9 @@ def _get_display_type_name(display_cls): return name -class ResultsDock(dockarea.Dock): +class DatasetsDock(dockarea.Dock): def __init__(self, dialog_parent, dock_area): - dockarea.Dock.__init__(self, "Results", size=(1500, 500)) + dockarea.Dock.__init__(self, "Datasets", size=(1500, 500)) self.dialog_parent = dialog_parent self.dock_area = dock_area @@ -65,22 +67,26 @@ class ResultsDock(dockarea.Dock): self.displays = dict() - def get_result(self, key): - return self.table_model.backing_store[key] + def get_dataset(self, key): + return self.table_model.backing_store[key][1] async def sub_connect(self, host, port): - self.subscriber = Subscriber("rt_results", self.init_results_model, + self.subscriber = Subscriber("datasets", self.init_datasets_model, self.on_mod) await self.subscriber.connect(host, port) async def sub_close(self): await self.subscriber.close() - def init_results_model(self, init): - self.table_model = ResultsModel(self.table, init) + def init_datasets_model(self, init): + self.table_model = DatasetsModel(self.table, init) self.table.setModel(self.table_model) return self.table_model + def update_display_data(self, dsp): + dsp.update_data({k: self.table_model.backing_store[k][1] + for k in dsp.data_sources()}) + def on_mod(self, mod): if mod["action"] == "init": for display in self.displays.values(): @@ -96,7 +102,7 @@ class ResultsDock(dockarea.Dock): for display in self.displays.values(): if source in display.data_sources(): - display.update_data(self.table_model.backing_store) + self.update_display_data(display) def create_dialog(self, ty): dlg_class = display_types[ty][0] @@ -111,7 +117,7 @@ class ResultsDock(dockarea.Dock): dsp_class = display_types[ty][1] dsp = dsp_class(name, settings) self.displays[name] = dsp - dsp.update_data(self.table_model.backing_store) + self.update_display_data(dsp) def on_close(): del self.displays[name] diff --git a/artiq/language/environment.py b/artiq/language/environment.py index b2b1c2a1d..4801720f7 100644 --- a/artiq/language/environment.py +++ b/artiq/language/environment.py @@ -113,15 +113,13 @@ class StringValue(_SimpleArgProcessor): class HasEnvironment: """Provides methods to manage the environment of an experiment (devices, parameters, results, arguments).""" - def __init__(self, dmgr=None, pdb=None, rdb=None, *, parent=None, - param_override=dict(), default_arg_none=False, **kwargs): + def __init__(self, device_mgr=None, dataset_mgr=None, *, parent=None, + default_arg_none=False, **kwargs): self.requested_args = OrderedDict() - self.__dmgr = dmgr - self.__pdb = pdb - self.__rdb = rdb + self.__device_mgr = device_mgr + self.__dataset_mgr = dataset_mgr self.__parent = parent - self.__param_override = param_override self.__default_arg_none = default_arg_none self.__kwargs = kwargs @@ -143,17 +141,16 @@ class HasEnvironment: are set to ``None``.""" raise NotImplementedError - def dbs(self): - """Returns the device manager, the parameter database and the result - database, in this order. + def managers(self): + """Returns the device manager and the dataset manager, in this order. This is the same order that the constructor takes them, allowing sub-objects to be created with this idiom to pass the environment around: :: - sub_object = SomeLibrary(*self.dbs()) + sub_object = SomeLibrary(*self.managers()) """ - return self.__dmgr, self.__pdb, self.__rdb + return self.__device_mgr, self.__dataset_mgr def get_argument(self, key, processor=None, group=None): """Retrieves and returns the value of an argument. @@ -193,91 +190,45 @@ class HasEnvironment: """Returns the full contents of the device database.""" if self.__parent is not None: return self.__parent.get_ddb() - return self.__dmgr.get_ddb() + return self.__device_mgr.get_ddb() def get_device(self, key): """Creates and returns a device driver.""" if self.__parent is not None: return self.__parent.get_device(key) - if self.__dmgr is None: + if self.__device_mgr is None: raise ValueError("Device manager not present") - return self.__dmgr.get(key) + return self.__device_mgr.get(key) def setattr_device(self, key): """Sets a device driver as attribute. The names of the device driver and of the attribute are the same.""" setattr(self, key, self.get_device(key)) - def get_parameter(self, key, default=NoDefault): - """Retrieves and returns a parameter.""" - if self.__parent is not None and key not in self.__param_override: - return self.__parent.get_parameter(key, default) - if self.__pdb is None: - raise ValueError("Parameter database not present") - if key in self.__param_override: - return self.__param_override[key] + def set_dataset(self, key, value, + broadcast=False, persist=False, save=True): + if self.__parent is not None: + self.__parent.set_dataset(key, value, broadcast, persist, save) + return + if self.__dataset_mgr is None: + raise ValueError("Dataset manager not present") + return self.__dataset_mgr.set(key, value, broadcast, persist, save) + + def get_dataset(self, key, default=NoDefault): + if self.__parent is not None: + return self.__parent.get_dataset(key, default) + if self.__dataset_mgr is None: + raise ValueError("Dataset manager not present") try: - return self.__pdb.get(key) + return self.__dataset_mgr.get(key) except KeyError: - if default is not NoDefault: - return default - else: + if default is NoDefault: raise + else: + return default - def setattr_parameter(self, key, default=NoDefault): - """Sets a parameter as attribute. The names of the argument and of the - parameter are the same.""" - setattr(self, key, self.get_parameter(key, default)) - - def set_parameter(self, key, value): - """Writes the value of a parameter into the parameter database.""" - if self.__parent is not None: - self.__parent.set_parameter(key, value) - return - if self.__pdb is None: - raise ValueError("Parameter database not present") - self.__pdb.set(key, value) - - def set_result(self, key, value, realtime=False, store=True): - """Writes the value of a result. - - :param realtime: Marks the result as real-time, making it immediately - available to clients such as the user interface. Returns a - ``Notifier`` instance that can be used to modify mutable results - (such as lists) and synchronize the modifications with the clients. - :param store: Defines if the result should be stored permanently, - e.g. in HDF5 output. Default is to store. - """ - if self.__parent is not None: - self.__parent.set_result(key, value, realtime, store) - return - if self.__rdb is None: - raise ValueError("Result database not present") - if realtime: - if key in self.__rdb.nrt: - raise ValueError("Result is already non-realtime") - self.__rdb.rt[key] = value - notifier = self.__rdb.rt[key] - notifier.kernel_attr_init = False - self.__rdb.set_store(key, store) - return notifier - else: - if key in self.__rdb.rt.read: - raise ValueError("Result is already realtime") - self.__rdb.nrt[key] = value - self.__rdb.set_store(key, store) - - def get_result(self, key): - """Retrieves the value of a result. - - There is no difference between real-time and non-real-time results - (this function does not return ``Notifier`` instances). - """ - if self.__parent is not None: - return self.__parent.get_result(key) - if self.__rdb is None: - raise ValueError("Result database not present") - return self.__rdb.get(key) + def setattr_dataset(self, key, default=NoDefault): + setattr(self, key, self.get_dataset(key, default)) class Experiment: diff --git a/artiq/master/databases.py b/artiq/master/databases.py new file mode 100644 index 000000000..acbf1a37a --- /dev/null +++ b/artiq/master/databases.py @@ -0,0 +1,62 @@ +import asyncio + +from artiq.protocols.sync_struct import Notifier, process_mod +from artiq.protocols import pyon +from artiq.tools import TaskObject + + +class DeviceDB: + def __init__(self, backing_file): + self.backing_file = backing_file + self.data = Notifier(pyon.load_file(self.backing_file)) + + def scan(self): + new_data = pyon.load_file(self.backing_file) + + for k in list(self.data.read.keys()): + if k not in new_data: + del self.data[k] + for k in new_data.keys(): + if k not in self.data.read or self.data.read[k] != new_data[k]: + self.data[k] = new_data[k] + + def get_ddb(self): + return self.data.read + + def get(self, key): + return self.data.read[key] + + +class DatasetDB(TaskObject): + def __init__(self, persist_file, autosave_period=30): + self.persist_file = persist_file + self.autosave_period = autosave_period + + file_data = pyon.load_file(self.persist_file) + self.data = Notifier({k: (True, v) for k, v in file_data.items()}) + + def save(self): + data = {k: v[1] for k, v in self.data.read.items() if v[0]} + pyon.store_file(self.persist_file, data) + + async def _do(self): + try: + while True: + await asyncio.sleep(self.autosave_period) + self.save() + finally: + self.save() + + def get(self, key): + return self.data.read[key][1] + + def update(self, mod): + process_mod(self.data, mod) + + # convenience functions (update() can be used instead) + def set(self, key, value, persist=False): + self.data[key] = (persist, value) + + def delete(self, key): + del self.data[key] + # diff --git a/artiq/master/ddb.py b/artiq/master/ddb.py deleted file mode 100644 index b798422c8..000000000 --- a/artiq/master/ddb.py +++ /dev/null @@ -1,24 +0,0 @@ -from artiq.protocols.sync_struct import Notifier -from artiq.protocols import pyon - - -class DDB: - def __init__(self, backing_file): - self.backing_file = backing_file - self.data = Notifier(pyon.load_file(self.backing_file)) - - def scan(self): - new_data = pyon.load_file(self.backing_file) - - for k in list(self.data.read.keys()): - if k not in new_data: - del self.data[k] - for k in new_data.keys(): - if k not in self.data.read or self.data.read[k] != new_data[k]: - self.data[k] = new_data[k] - - def get_ddb(self): - return self.data.read - - def get(self, key): - return self.data.read[key] diff --git a/artiq/master/worker_db.py b/artiq/master/worker_db.py index fce88f3a6..f36371a1f 100644 --- a/artiq/master/worker_db.py +++ b/artiq/master/worker_db.py @@ -15,6 +15,64 @@ from artiq.protocols.pc_rpc import Client, BestEffortClient logger = logging.getLogger(__name__) +def _create_device(desc, device_mgr): + ty = desc["type"] + if ty == "local": + module = importlib.import_module(desc["module"]) + device_class = getattr(module, desc["class"]) + return device_class(device_mgr, **desc["arguments"]) + elif ty == "controller": + if desc["best_effort"]: + cl = BestEffortClient + else: + cl = Client + return cl(desc["host"], desc["port"], desc["target_name"]) + else: + raise ValueError("Unsupported type in device DB: " + ty) + + +class DeviceManager: + """Handles creation and destruction of local device drivers and controller + RPC clients.""" + def __init__(self, ddb, virtual_devices=dict()): + self.ddb = ddb + self.virtual_devices = virtual_devices + self.active_devices = OrderedDict() + + def get_ddb(self): + """Returns the full contents of the device database.""" + return self.ddb.get_ddb() + + def get(self, name): + """Get the device driver or controller client corresponding to a + device database entry.""" + if name in self.virtual_devices: + return self.virtual_devices[name] + if name in self.active_devices: + return self.active_devices[name] + else: + desc = self.ddb.get(name) + while isinstance(desc, str): + # alias + desc = self.ddb.get(desc) + dev = _create_device(desc, self) + self.active_devices[name] = dev + return dev + + def close_devices(self): + """Closes all active devices, in the opposite order as they were + requested.""" + for dev in reversed(list(self.active_devices.values())): + try: + if isinstance(dev, (Client, BestEffortClient)): + dev.close_rpc() + elif hasattr(dev, "close"): + dev.close() + except Exception as e: + logger.warning("Exception %r when closing device %r", e, dev) + self.active_devices.clear() + + def get_hdf5_output(start_time, rid, name): dirname = os.path.join("results", time.strftime("%Y-%m-%d", start_time), @@ -87,84 +145,30 @@ def result_dict_to_hdf5(f, rd): dataset[()] = data -class ResultDB: - def __init__(self): - self.rt = Notifier(dict()) - self.nrt = dict() - self.store = set() +class DatasetManager: + def __init__(self, ddb): + self.broadcast = Notifier(dict()) + self.local = dict() + + self.ddb = ddb + self.broadcast.publish = ddb.update + + def set(self, key, value, broadcast=False, persist=False, save=True): + if persist: + broadcast = True + r = None + if broadcast: + self.broadcast[key] = (persist, value) + r = self.broadcast[key][1] + if save: + self.local[key] = value + return r def get(self, key): try: - return self.nrt[key] + return self.local[key] except KeyError: - return self.rt[key].read - - def set_store(self, key, store): - if store: - self.store.add(key) - else: - self.store.discard(key) + return self.ddb.get(key) def write_hdf5(self, f): - result_dict_to_hdf5( - f, {k: v for k, v in self.rt.read.items() if k in self.store}) - result_dict_to_hdf5( - f, {k: v for k, v in self.nrt.items() if k in self.store}) - - -def _create_device(desc, dmgr): - ty = desc["type"] - if ty == "local": - module = importlib.import_module(desc["module"]) - device_class = getattr(module, desc["class"]) - return device_class(dmgr, **desc["arguments"]) - elif ty == "controller": - if desc["best_effort"]: - cl = BestEffortClient - else: - cl = Client - return cl(desc["host"], desc["port"], desc["target_name"]) - else: - raise ValueError("Unsupported type in device DB: " + ty) - - -class DeviceManager: - """Handles creation and destruction of local device drivers and controller - RPC clients.""" - def __init__(self, ddb, virtual_devices=dict()): - self.ddb = ddb - self.virtual_devices = virtual_devices - self.active_devices = OrderedDict() - - def get_ddb(self): - """Returns the full contents of the device database.""" - return self.ddb.get_ddb() - - def get(self, name): - """Get the device driver or controller client corresponding to a - device database entry.""" - if name in self.virtual_devices: - return self.virtual_devices[name] - if name in self.active_devices: - return self.active_devices[name] - else: - desc = self.ddb.get(name) - while isinstance(desc, str): - # alias - desc = self.ddb.get(desc) - dev = _create_device(desc, self) - self.active_devices[name] = dev - return dev - - def close_devices(self): - """Closes all active devices, in the opposite order as they were - requested.""" - for dev in reversed(list(self.active_devices.values())): - try: - if isinstance(dev, (Client, BestEffortClient)): - dev.close_rpc() - elif hasattr(dev, "close"): - dev.close() - except Exception as e: - logger.warning("Exception %r when closing device %r", e, dev) - self.active_devices.clear() + result_dict_to_hdf5(f, self.local) diff --git a/artiq/master/worker_impl.py b/artiq/master/worker_impl.py index db1229d2e..ff9a58a4c 100644 --- a/artiq/master/worker_impl.py +++ b/artiq/master/worker_impl.py @@ -4,7 +4,7 @@ import os from artiq.protocols import pyon from artiq.tools import file_import -from artiq.master.worker_db import DeviceManager, ResultDB, get_hdf5_output +from artiq.master.worker_db import DeviceManager, DatasetManager, get_hdf5_output from artiq.language.environment import is_experiment from artiq.language.core import set_watchdog_factory, TerminationRequested @@ -62,17 +62,14 @@ class LogForwarder: pass -class ParentDDB: +class ParentDeviceDB: get_ddb = make_parent_action("get_ddb", "") get = make_parent_action("get_device", "key", KeyError) -class ParentPDB: - get = make_parent_action("get_parameter", "key", KeyError) - set = make_parent_action("set_parameter", "key value") - - -update_rt_results = make_parent_action("update_rt_results", "mod") +class ParentDatasetDB: + get = make_parent_action("get_dataset", "key", KeyError) + update = make_parent_action("update_dataset", "mod") class Watchdog: @@ -126,22 +123,22 @@ register_experiment = make_parent_action("register_experiment", "class_name name arguments") -class ExamineDMGR: +class ExamineDeviceMgr: get_ddb = make_parent_action("get_ddb", "") def get(self, name): return None -class DummyPDB: - def get(self, name): +class DummyDatasetMgr: + def set(self, key, value, broadcast=False, persist=False, save=True): return None - def set(self, name, value): + def get(self, key): pass -def examine(dmgr, pdb, rdb, file): +def examine(device_mgr, dataset_mgr, file): module = file_import(file) for class_name, exp_class in module.__dict__.items(): if class_name[0] == "_": @@ -153,7 +150,7 @@ def examine(dmgr, pdb, rdb, file): name = exp_class.__doc__.splitlines()[0].strip() if name[-1] == ".": name = name[:-1] - exp_inst = exp_class(dmgr, pdb, rdb, default_arg_none=True) + exp_inst = exp_class(device_mgr, dataset_mgr, default_arg_none=True) arguments = [(k, (proc.describe(), group)) for k, (proc, group) in exp_inst.requested_args.items()] register_experiment(class_name, name, arguments) @@ -168,10 +165,9 @@ def main(): exp = None exp_inst = None - dmgr = DeviceManager(ParentDDB, - virtual_devices={"scheduler": Scheduler()}) - rdb = ResultDB() - rdb.rt.publish = update_rt_results + device_mgr = DeviceManager(ParentDeviceDB, + virtual_devices={"scheduler": Scheduler()}) + dataset_mgr = DatasetManager(ParentDatasetDB) try: while True: @@ -187,9 +183,9 @@ def main(): else: expf = expid["file"] exp = get_exp(expf, expid["class_name"]) - dmgr.virtual_devices["scheduler"].set_run_info( + device_mgr.virtual_devices["scheduler"].set_run_info( obj["pipeline_name"], expid, obj["priority"]) - exp_inst = exp(dmgr, ParentPDB, rdb, + exp_inst = exp(device_mgr, dataset_mgr, **expid["arguments"]) put_object({"action": "completed"}) elif action == "prepare": @@ -204,7 +200,7 @@ def main(): elif action == "write_results": f = get_hdf5_output(start_time, rid, exp.__name__) try: - rdb.write_hdf5(f) + dataset_mgr.write_hdf5(f) if "repo_rev" in expid: rr = expid["repo_rev"] dtype = "S{}".format(len(rr)) @@ -214,12 +210,12 @@ def main(): f.close() put_object({"action": "completed"}) elif action == "examine": - examine(ExamineDMGR(), DummyPDB(), ResultDB(), obj["file"]) + examine(ExamineDeviceMgr(), DummyDatasetMgr(), obj["file"]) put_object({"action": "completed"}) elif action == "terminate": break finally: - dmgr.close_devices() + device_mgr.close_devices() if __name__ == "__main__": main() diff --git a/examples/master/dataset_db.pyon b/examples/master/dataset_db.pyon new file mode 100644 index 000000000..00f15d739 --- /dev/null +++ b/examples/master/dataset_db.pyon @@ -0,0 +1 @@ +{"flopping_freq": 1499.9876804260716} diff --git a/examples/master/ddb.pyon b/examples/master/device_db.pyon similarity index 100% rename from examples/master/ddb.pyon rename to examples/master/device_db.pyon diff --git a/examples/master/pdb.pyon b/examples/master/pdb.pyon deleted file mode 100644 index 27f1e7665..000000000 --- a/examples/master/pdb.pyon +++ /dev/null @@ -1 +0,0 @@ -{"flopping_freq": 1500.0164816344934} diff --git a/examples/master/repository/flopping_f_simulation.py b/examples/master/repository/flopping_f_simulation.py index 90cb2468d..3624502c9 100644 --- a/examples/master/repository/flopping_f_simulation.py +++ b/examples/master/repository/flopping_f_simulation.py @@ -37,11 +37,11 @@ class FloppingF(EnvExperiment): self.setattr_device("scheduler") def run(self): - frequency = self.set_result("flopping_f_frequency", [], - realtime=True, store=False) - brightness = self.set_result("flopping_f_brightness", [], - realtime=True) - self.set_result("flopping_f_fit", [], realtime=True, store=False) + frequency = self.set_dataset("flopping_f_frequency", [], + broadcast=True, save=False) + brightness = self.set_dataset("flopping_f_brightness", [], + broadcast=True) + self.set_dataset("flopping_f_fit", [], broadcast=True, save=False) for f in self.frequency_scan: m_brightness = model(f, self.F0) + self.noise_amplitude*random.random() @@ -52,16 +52,16 @@ class FloppingF(EnvExperiment): self.scheduler.priority, time.time() + 20, False) def analyze(self): - # Use get_result so that analyze can be run stand-alone. - frequency = self.get_result("flopping_f_frequency") - brightness = self.get_result("flopping_f_brightness") + # Use get_dataset so that analyze can be run stand-alone. + frequency = self.get_dataset("flopping_f_frequency") + brightness = self.get_dataset("flopping_f_brightness") popt, pcov = curve_fit(model_numpy, frequency, brightness, - p0=[self.get_parameter("flopping_freq")]) + p0=[self.get_dataset("flopping_freq")]) perr = np.sqrt(np.diag(pcov)) if perr < 0.1: F0 = float(popt) - self.set_parameter("flopping_freq", F0) - self.set_result("flopping_f_fit", - [model(x, F0) for x in frequency], - realtime=True, store=False) + self.set_dataset("flopping_freq", F0, persist=True, save=False) + self.set_dataset("flopping_f_fit", + [model(x, F0) for x in frequency], + broadcast=True, save=False) diff --git a/examples/master/repository/photon_histogram.py b/examples/master/repository/photon_histogram.py index 99f66c388..a42552d53 100644 --- a/examples/master/repository/photon_histogram.py +++ b/examples/master/repository/photon_histogram.py @@ -16,9 +16,9 @@ class PhotonHistogram(EnvExperiment): self.setattr_argument("nbins", FreeValue(100)) self.setattr_argument("repeats", FreeValue(100)) - self.setattr_parameter("cool_f", 230*MHz) - self.setattr_parameter("detect_f", 220*MHz) - self.setattr_parameter("detect_t", 100*us) + self.setattr_dataset("cool_f", 230*MHz) + self.setattr_dataset("detect_f", 220*MHz) + self.setattr_dataset("detect_t", 100*us) @kernel def program_cooling(self): @@ -60,8 +60,9 @@ class PhotonHistogram(EnvExperiment): hist[n] += 1 total += n - self.set_result("cooling_photon_histogram", hist) - self.set_parameter("ion_present", total > 5*self.repeats) + self.set_dataset("cooling_photon_histogram", hist) + self.set_dataset("ion_present", total > 5*self.repeats, + broadcast=True) if __name__ == "__main__": diff --git a/examples/master/repository/speed_benchmark.py b/examples/master/repository/speed_benchmark.py index 8d8921bcb..11ad806b5 100644 --- a/examples/master/repository/speed_benchmark.py +++ b/examples/master/repository/speed_benchmark.py @@ -111,9 +111,9 @@ class SpeedBenchmark(EnvExperiment): self.scheduler.pause() end_time = time.monotonic() - self.set_result("benchmark_run_time", - (end_time-start_time)/self.nruns, - realtime=True) + self.set_dataset("benchmark_run_time", + (end_time-start_time)/self.nruns, + broadcast=True) def run(self): if self.mode == "Single experiment": @@ -133,6 +133,6 @@ class _Report(EnvExperiment): def run(self): end_time = time.monotonic() - self.set_result("benchmark_run_time", - (end_time-self.start_time)/self.nruns, - realtime=True) + self.set_dataset("benchmark_run_time", + (end_time-self.start_time)/self.nruns, + broadcast=True) diff --git a/examples/sim/al_spectroscopy.py b/examples/sim/al_spectroscopy.py index df038aa63..3cde9c72d 100644 --- a/examples/sim/al_spectroscopy.py +++ b/examples/sim/al_spectroscopy.py @@ -12,7 +12,7 @@ class AluminumSpectroscopy(EnvExperiment): self.setattr_device("spectroscopy_b") self.setattr_device("state_detection") self.setattr_device("pmt") - self.setattr_parameter("spectroscopy_freq", 432*MHz) + self.setattr_dataset("spectroscopy_freq", 432*MHz) self.setattr_argument("photon_limit_low", FreeValue(10)) self.setattr_argument("photon_limit_high", FreeValue(15)) diff --git a/examples/sim/pdb.pyon b/examples/sim/dataset_db.pyon similarity index 100% rename from examples/sim/pdb.pyon rename to examples/sim/dataset_db.pyon diff --git a/examples/sim/ddb.pyon b/examples/sim/device_db.pyon similarity index 100% rename from examples/sim/ddb.pyon rename to examples/sim/device_db.pyon