merge parameters and results into datasets

This commit is contained in:
Sebastien Bourdeauducq 2015-10-12 17:18:23 +08:00
parent 36c3f022aa
commit 97accd2540
21 changed files with 349 additions and 351 deletions

View File

@ -63,24 +63,26 @@ def get_argparser():
parser_delete.add_argument("rid", type=int, parser_delete.add_argument("rid", type=int,
help="run identifier (RID)") help="run identifier (RID)")
parser_set_parameter = subparsers.add_parser( parser_set_dataset = subparsers.add_parser(
"set-parameter", help="add or modify a parameter") "set-dataset", help="add or modify a dataset")
parser_set_parameter.add_argument("name", help="name of the parameter") parser_set_dataset.add_argument("name", help="name of the dataset")
parser_set_parameter.add_argument("value", parser_set_dataset.add_argument("value",
help="value in PYON format") help="value in PYON format")
parser_set_dataset.add_argument("-p", "--persist", action="store_true",
help="make the dataset persistent")
parser_del_parameter = subparsers.add_parser( parser_del_dataset = subparsers.add_parser(
"del-parameter", help="delete a parameter") "del-dataset", help="delete a dataset")
parser_del_parameter.add_argument("name", help="name of the parameter") parser_del_dataset.add_argument("name", help="name of the dataset")
parser_show = subparsers.add_parser( parser_show = subparsers.add_parser(
"show", help="show schedule, log, devices or parameters") "show", help="show schedule, log, devices or datasets")
parser_show.add_argument( parser_show.add_argument(
"what", "what",
help="select object to show: schedule/log/devices/parameters") help="select object to show: schedule/log/devices/datasets")
subparsers.add_parser( subparsers.add_parser(
"scan-ddb", help="trigger a device database (re)scan") "scan-devices", help="trigger a device database (re)scan")
parser_scan_repos = subparsers.add_parser( parser_scan_repos = subparsers.add_parser(
"scan-repository", help="trigger a repository (re)scan") "scan-repository", help="trigger a repository (re)scan")
@ -129,15 +131,15 @@ def _action_delete(remote, args):
remote.delete(args.rid) remote.delete(args.rid)
def _action_set_parameter(remote, args): def _action_set_dataset(remote, args):
remote.set(args.name, pyon.decode(args.value)) remote.set(args.name, pyon.decode(args.value), args.persist)
def _action_del_parameter(remote, args): def _action_del_dataset(remote, args):
remote.delete(args.name) remote.delete(args.name)
def _action_scan_ddb(remote, args): def _action_scan_devices(remote, args):
remote.scan() remote.scan()
@ -186,11 +188,11 @@ def _show_devices(devices):
print(table) print(table)
def _show_parameters(parameters): def _show_datasets(datasets):
clear_screen() clear_screen()
table = PrettyTable(["Parameter", "Value"]) table = PrettyTable(["Dataset", "Persistent", "Value"])
for k, v in sorted(parameters.items(), key=itemgetter(0)): for k, (persist, value) in sorted(datasets.items(), key=itemgetter(0)):
table.add_row([k, str(v)]) table.add_row([k, "Y" if persist else "N", str(value)])
print(table) print(table)
@ -259,8 +261,8 @@ def main():
_show_log(args) _show_log(args)
elif args.what == "devices": elif args.what == "devices":
_show_dict(args, "devices", _show_devices) _show_dict(args, "devices", _show_devices)
elif args.what == "parameters": elif args.what == "datasets":
_show_dict(args, "parameters", _show_parameters) _show_dict(args, "datasets", _show_datasets)
else: else:
print("Unknown object to show, use -h to list valid names.") print("Unknown object to show, use -h to list valid names.")
sys.exit(1) sys.exit(1)
@ -269,9 +271,9 @@ def main():
target_name = { target_name = {
"submit": "master_schedule", "submit": "master_schedule",
"delete": "master_schedule", "delete": "master_schedule",
"set_parameter": "master_pdb", "set_dataset": "master_dataset_db",
"del_parameter": "master_pdb", "del_dataset": "master_dataset_db",
"scan_ddb": "master_ddb", "scan_devices": "master_device_db",
"scan_repository": "master_repository" "scan_repository": "master_repository"
}[action] }[action]
remote = Client(args.server, port, target_name) remote = Client(args.server, port, target_name)

View File

@ -15,8 +15,7 @@ from artiq.protocols.pc_rpc import AsyncioClient
from artiq.gui.state import StateManager from artiq.gui.state import StateManager
from artiq.gui.explorer import ExplorerDock from artiq.gui.explorer import ExplorerDock
from artiq.gui.moninj import MonInj from artiq.gui.moninj import MonInj
from artiq.gui.results import ResultsDock from artiq.gui.datasets import DatasetsDock
from artiq.gui.parameters import ParametersDock
from artiq.gui.schedule import ScheduleDock from artiq.gui.schedule import ScheduleDock
from artiq.gui.log import LogDock from artiq.gui.log import LogDock
from artiq.gui.console import ConsoleDock from artiq.gui.console import ConsoleDock
@ -92,30 +91,24 @@ def main():
args.server, args.port_notify)) args.server, args.port_notify))
atexit.register(lambda: loop.run_until_complete(d_explorer.sub_close())) atexit.register(lambda: loop.run_until_complete(d_explorer.sub_close()))
d_results = ResultsDock(win, area) d_datasets = DatasetsDock(win, area)
smgr.register(d_results) smgr.register(d_datasets)
loop.run_until_complete(d_results.sub_connect( loop.run_until_complete(d_datasets.sub_connect(
args.server, args.port_notify)) args.server, args.port_notify))
atexit.register(lambda: loop.run_until_complete(d_results.sub_close())) atexit.register(lambda: loop.run_until_complete(d_datasets.sub_close()))
if os.name != "nt": if os.name != "nt":
d_ttl_dds = MonInj() d_ttl_dds = MonInj()
loop.run_until_complete(d_ttl_dds.start(args.server, args.port_notify)) loop.run_until_complete(d_ttl_dds.start(args.server, args.port_notify))
atexit.register(lambda: loop.run_until_complete(d_ttl_dds.stop())) atexit.register(lambda: loop.run_until_complete(d_ttl_dds.stop()))
d_params = ParametersDock()
loop.run_until_complete(d_params.sub_connect(
args.server, args.port_notify))
atexit.register(lambda: loop.run_until_complete(d_params.sub_close()))
if os.name != "nt": if os.name != "nt":
area.addDock(d_ttl_dds.dds_dock, "top") area.addDock(d_ttl_dds.dds_dock, "top")
area.addDock(d_ttl_dds.ttl_dock, "above", d_ttl_dds.dds_dock) area.addDock(d_ttl_dds.ttl_dock, "above", d_ttl_dds.dds_dock)
area.addDock(d_results, "above", d_ttl_dds.ttl_dock) area.addDock(d_datasets, "above", d_ttl_dds.ttl_dock)
else: else:
area.addDock(d_results, "top") area.addDock(d_datasets, "top")
area.addDock(d_params, "above", d_results) area.addDock(d_explorer, "above", d_datasets)
area.addDock(d_explorer, "above", d_params)
d_schedule = ScheduleDock(status_bar, schedule_ctl) d_schedule = ScheduleDock(status_bar, schedule_ctl)
loop.run_until_complete(d_schedule.sub_connect( loop.run_until_complete(d_schedule.sub_connect(
@ -127,16 +120,18 @@ def main():
args.server, args.port_notify)) args.server, args.port_notify))
atexit.register(lambda: loop.run_until_complete(d_log.sub_close())) atexit.register(lambda: loop.run_until_complete(d_log.sub_close()))
pdb = AsyncioClient() dataset_db = AsyncioClient()
loop.run_until_complete(pdb.connect_rpc( loop.run_until_complete(dataset_db.connect_rpc(
args.server, args.port_control, "master_pdb")) args.server, args.port_control, "master_dataset_db"))
atexit.register(lambda: pdb.close_rpc()) atexit.register(lambda: dataset_db.close_rpc())
def _get_parameter(k, v): def _set_dataset(k, v):
asyncio.ensure_future(pdb.set(k, v)) asyncio.ensure_future(dataset_db.set(k, v))
def _del_dataset(k):
asyncio.ensure_future(dataset_db.delete(k))
d_console = ConsoleDock( d_console = ConsoleDock(
d_params.get_parameter, d_datasets.get_dataset,
_get_parameter, _set_dataset,
d_results.get_result) _del_dataset)
area.addDock(d_console, "bottom") area.addDock(d_console, "bottom")
area.addDock(d_log, "above", d_console) area.addDock(d_log, "above", d_console)

View File

@ -93,7 +93,7 @@ class DBWriter(TaskObject):
try: try:
self._queue.put_nowait((k, v)) self._queue.put_nowait((k, v))
except asyncio.QueueFull: except asyncio.QueueFull:
logger.warning("failed to update parameter '%s': " logger.warning("failed to update dataset '%s': "
"too many pending updates", k) "too many pending updates", k)
async def _do(self): async def _do(self):
@ -103,7 +103,7 @@ class DBWriter(TaskObject):
params = {"u": self.user, "p": self.password, "db": self.database, params = {"u": self.user, "p": self.password, "db": self.database,
"consistency": "any", "precision": "n"} "consistency": "any", "precision": "n"}
fmt_ty, fmt_v = format_influxdb(v) fmt_ty, fmt_v = format_influxdb(v)
data = "{},parameter={} {}={}".format(self.table, k, fmt_ty, fmt_v) data = "{},dataset={} {}={}".format(self.table, k, fmt_ty, fmt_v)
try: try:
response = await aiohttp.request( response = await aiohttp.request(
"POST", url, params=params, data=data) "POST", url, params=params, data=data)
@ -121,15 +121,31 @@ class DBWriter(TaskObject):
response.close() response.close()
class Parameters: class _Mock:
def __setitem__(self, k, v):
pass
def __getitem__(self, k):
return self
def __delitem__(self, k):
pass
class Datasets:
def __init__(self, filter_function, writer, init): def __init__(self, filter_function, writer, init):
self.filter_function = filter_function self.filter_function = filter_function
self.writer = writer self.writer = writer
def __setitem__(self, k, v): def __setitem__(self, k, v):
if self.filter_function(k): if self.filter_function(k):
self.writer.update(k, v) self.writer.update(k, v[1])
# ignore mutations
def __getitem__(self, k):
return _Mock()
# ignore deletions
def __delitem__(self, k): def __delitem__(self, k):
pass pass
@ -145,8 +161,8 @@ class MasterReader(TaskObject):
async def _do(self): async def _do(self):
subscriber = Subscriber( subscriber = Subscriber(
"parameters", "datasets",
partial(Parameters, self.filter_function, self.writer)) partial(Datasets, self.filter_function, self.writer))
while True: while True:
try: try:
await subscriber.connect(self.server, self.port) await subscriber.connect(self.server, self.port)

View File

@ -6,9 +6,8 @@ import atexit
import os import os
from artiq.protocols.pc_rpc import Server from artiq.protocols.pc_rpc import Server
from artiq.protocols.sync_struct import Notifier, Publisher, process_mod from artiq.protocols.sync_struct import Notifier, Publisher
from artiq.master.ddb import DDB from artiq.master.databases import DeviceDB, DatasetDB
from artiq.protocols.file_db import FlatFileDB
from artiq.master.scheduler import Scheduler from artiq.master.scheduler import Scheduler
from artiq.master.worker_db import get_last_rid from artiq.master.worker_db import get_last_rid
from artiq.master.repository import FilesystemBackend, GitBackend, Repository from artiq.master.repository import FilesystemBackend, GitBackend, Repository
@ -28,10 +27,10 @@ def get_argparser():
"--port-control", default=3251, type=int, "--port-control", default=3251, type=int,
help="TCP port to listen to for control (default: %(default)d)") help="TCP port to listen to for control (default: %(default)d)")
group = parser.add_argument_group("databases") group = parser.add_argument_group("databases")
group.add_argument("-d", "--ddb", default="ddb.pyon", group.add_argument("--device-db", default="device_db.pyon",
help="device database file") help="device database file (default: '%(default)s')")
group.add_argument("-p", "--pdb", default="pdb.pyon", group.add_argument("--dataset-db", default="dataset_db.pyon",
help="parameter database file") help="dataset file (default: '%(default)s')")
group = parser.add_argument_group("repository") group = parser.add_argument_group("repository")
group.add_argument( group.add_argument(
"-g", "--git", default=False, action="store_true", "-g", "--git", default=False, action="store_true",
@ -65,25 +64,25 @@ def main():
loop = asyncio.get_event_loop() loop = asyncio.get_event_loop()
atexit.register(lambda: loop.close()) atexit.register(lambda: loop.close())
ddb = DDB(args.ddb) device_db = DeviceDB(args.device_db)
pdb = FlatFileDB(args.pdb) dataset_db = DatasetDB(args.dataset_db)
rtr = Notifier(dict()) dataset_db.start()
atexit.register(lambda: loop.run_until_complete(dataset_db.stop()))
log = Log(1000) log = Log(1000)
if args.git: if args.git:
repo_backend = GitBackend(args.repository) repo_backend = GitBackend(args.repository)
else: else:
repo_backend = FilesystemBackend(args.repository) repo_backend = FilesystemBackend(args.repository)
repository = Repository(repo_backend, ddb.get_ddb, log.log) repository = Repository(repo_backend, device_db.get_ddb, log.log)
atexit.register(repository.close) atexit.register(repository.close)
repository.scan_async() repository.scan_async()
worker_handlers = { worker_handlers = {
"get_ddb": ddb.get_ddb, "get_ddb": device_db.get_ddb,
"get_device": ddb.get, "get_device": device_db.get,
"get_parameter": pdb.get, "get_dataset": dataset_db.get,
"set_parameter": pdb.set, "update_dataset": dataset_db.update,
"update_rt_results": lambda mod: process_mod(rtr, mod),
"log": log.log "log": log.log
} }
scheduler = Scheduler(get_last_rid() + 1, worker_handlers, repo_backend) scheduler = Scheduler(get_last_rid() + 1, worker_handlers, repo_backend)
@ -92,8 +91,8 @@ def main():
atexit.register(lambda: loop.run_until_complete(scheduler.stop())) atexit.register(lambda: loop.run_until_complete(scheduler.stop()))
server_control = Server({ server_control = Server({
"master_ddb": ddb, "master_device_db": device_db,
"master_pdb": pdb, "master_dataset_db": dataset_db,
"master_schedule": scheduler, "master_schedule": scheduler,
"master_repository": repository "master_repository": repository
}) })
@ -103,9 +102,8 @@ def main():
server_notify = Publisher({ server_notify = Publisher({
"schedule": scheduler.notifier, "schedule": scheduler.notifier,
"devices": ddb.data, "devices": device_db.data,
"parameters": pdb.data, "datasets": dataset_db.data,
"rt_results": rtr,
"explist": repository.explist, "explist": repository.explist,
"log": log.data "log": log.data
}) })

View File

@ -4,17 +4,14 @@
import argparse import argparse
import sys import sys
import time
from operator import itemgetter from operator import itemgetter
from itertools import chain
import logging import logging
import h5py import h5py
from artiq.language.environment import EnvExperiment from artiq.language.environment import EnvExperiment
from artiq.protocols.file_db import FlatFileDB from artiq.master.databases import DeviceDB, DatasetDB
from artiq.master.ddb import DDB from artiq.master.worker_db import DeviceManager, DatasetManager
from artiq.master.worker_db import DeviceManager, ResultDB
from artiq.tools import * from artiq.tools import *
@ -33,11 +30,6 @@ class ELFRunner(EnvExperiment):
self.core.comm.serve(dict(), dict()) self.core.comm.serve(dict(), dict())
class SimpleParamLogger:
def set(self, timestamp, name, value):
logger.info("Parameter change: {} = {}".format(name, value))
class DummyScheduler: class DummyScheduler:
def __init__(self): def __init__(self):
self.next_rid = 0 self.next_rid = 0
@ -63,10 +55,10 @@ def get_argparser(with_file=True):
description="Local experiment running tool") description="Local experiment running tool")
verbosity_args(parser) verbosity_args(parser)
parser.add_argument("-d", "--ddb", default="ddb.pyon", parser.add_argument("--device-db", default="device_db.pyon",
help="device database file") help="device database file (default: '%(default)s')")
parser.add_argument("-p", "--pdb", default="pdb.pyon", parser.add_argument("--dataset-db", default="dataset_db.pyon",
help="parameter database file") help="dataset file (default: '%(default)s')")
parser.add_argument("-e", "--experiment", default=None, parser.add_argument("-e", "--experiment", default=None,
help="experiment to run") help="experiment to run")
@ -82,7 +74,7 @@ def get_argparser(with_file=True):
return parser return parser
def _build_experiment(dmgr, pdb, rdb, args): def _build_experiment(device_mgr, dataset_mgr, args):
if hasattr(args, "file"): if hasattr(args, "file"):
if args.file.endswith(".elf"): if args.file.endswith(".elf"):
if args.arguments: if args.arguments:
@ -90,7 +82,7 @@ def _build_experiment(dmgr, pdb, rdb, args):
if args.experiment: if args.experiment:
raise ValueError("experiment-by-name not supported " raise ValueError("experiment-by-name not supported "
"for ELF kernels") "for ELF kernels")
return ELFRunner(dmgr, pdb, rdb, file=args.file) return ELFRunner(device_mgr, dataset_mgr, file=args.file)
else: else:
module = file_import(args.file) module = file_import(args.file)
file = args.file file = args.file
@ -104,35 +96,34 @@ def _build_experiment(dmgr, pdb, rdb, args):
"experiment": args.experiment, "experiment": args.experiment,
"arguments": arguments "arguments": arguments
} }
dmgr.virtual_devices["scheduler"].expid = expid device_mgr.virtual_devices["scheduler"].expid = expid
return exp(dmgr, pdb, rdb, **arguments) return exp(device_mgr, dataset_mgr, **arguments)
def run(with_file=False): def run(with_file=False):
args = get_argparser(with_file).parse_args() args = get_argparser(with_file).parse_args()
init_logger(args) init_logger(args)
dmgr = DeviceManager(DDB(args.ddb), device_mgr = DeviceManager(DeviceDB(args.device_db),
virtual_devices={"scheduler": DummyScheduler()}) virtual_devices={"scheduler": DummyScheduler()})
pdb = FlatFileDB(args.pdb) dataset_db = DatasetDB(args.dataset_db)
pdb.hooks.append(SimpleParamLogger()) dataset_mgr = DatasetManager(dataset_db)
rdb = ResultDB()
try: try:
exp_inst = _build_experiment(dmgr, pdb, rdb, args) exp_inst = _build_experiment(device_mgr, dataset_mgr, args)
exp_inst.prepare() exp_inst.prepare()
exp_inst.run() exp_inst.run()
exp_inst.analyze() exp_inst.analyze()
finally: finally:
dmgr.close_devices() device_mgr.close_devices()
if args.hdf5 is not None: if args.hdf5 is not None:
with h5py.File(args.hdf5, "w") as f: with h5py.File(args.hdf5, "w") as f:
rdb.write_hdf5(f) dataset_mgr.write_hdf5(f)
elif rdb.rt.read or rdb.nrt: else:
r = chain(rdb.rt.read.items(), rdb.nrt.items()) for k, v in sorted(dataset_mgr.local.items(), key=itemgetter(0)):
for k, v in sorted(r, key=itemgetter(0)):
print("{}: {}".format(k, v)) print("{}: {}".format(k, v))
dataset_db.save()
def main(): def main():

View File

@ -5,19 +5,19 @@ _help = """
This is an interactive Python console. This is an interactive Python console.
The following functions are available: The following functions are available:
get_parameter(key) get_dataset(key)
set_parameter(key, value) [asynchronous update] set_dataset(key, value, persist=False) [asynchronous update]
get_result(key) [real-time results only] del_dataset(key) [asynchronous update]
""" """
class ConsoleDock(dockarea.Dock): class ConsoleDock(dockarea.Dock):
def __init__(self, get_parameter, set_parameter, get_result): def __init__(self, get_dataset, set_dataset, del_dataset):
dockarea.Dock.__init__(self, "Console", size=(1000, 300)) dockarea.Dock.__init__(self, "Console", size=(1000, 300))
ns = { ns = {
"get_parameter": get_parameter, "get_dataset": get_dataset,
"set_parameter": set_parameter, "set_dataset": set_dataset,
"get_result": get_result "del_dataset": del_dataset
} }
c = console.ConsoleWidget(namespace=ns, text=_help) c = console.ConsoleWidget(namespace=ns, text=_help)
self.addWidget(c) self.addWidget(c)

View File

@ -15,9 +15,9 @@ from artiq.gui.displays import *
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class ResultsModel(DictSyncModel): class DatasetsModel(DictSyncModel):
def __init__(self, parent, init): def __init__(self, parent, init):
DictSyncModel.__init__(self, ["Result", "Value"], DictSyncModel.__init__(self, ["Dataset", "Persistent", "Value"],
parent, init) parent, init)
def sort_key(self, k, v): def sort_key(self, k, v):
@ -27,7 +27,9 @@ class ResultsModel(DictSyncModel):
if column == 0: if column == 0:
return k return k
elif column == 1: elif column == 1:
return short_format(v) return "Y" if v[0] else "N"
elif column == 2:
return short_format(v[1])
else: else:
raise ValueError raise ValueError
@ -38,9 +40,9 @@ def _get_display_type_name(display_cls):
return name return name
class ResultsDock(dockarea.Dock): class DatasetsDock(dockarea.Dock):
def __init__(self, dialog_parent, dock_area): def __init__(self, dialog_parent, dock_area):
dockarea.Dock.__init__(self, "Results", size=(1500, 500)) dockarea.Dock.__init__(self, "Datasets", size=(1500, 500))
self.dialog_parent = dialog_parent self.dialog_parent = dialog_parent
self.dock_area = dock_area self.dock_area = dock_area
@ -65,22 +67,26 @@ class ResultsDock(dockarea.Dock):
self.displays = dict() self.displays = dict()
def get_result(self, key): def get_dataset(self, key):
return self.table_model.backing_store[key] return self.table_model.backing_store[key][1]
async def sub_connect(self, host, port): async def sub_connect(self, host, port):
self.subscriber = Subscriber("rt_results", self.init_results_model, self.subscriber = Subscriber("datasets", self.init_datasets_model,
self.on_mod) self.on_mod)
await self.subscriber.connect(host, port) await self.subscriber.connect(host, port)
async def sub_close(self): async def sub_close(self):
await self.subscriber.close() await self.subscriber.close()
def init_results_model(self, init): def init_datasets_model(self, init):
self.table_model = ResultsModel(self.table, init) self.table_model = DatasetsModel(self.table, init)
self.table.setModel(self.table_model) self.table.setModel(self.table_model)
return self.table_model return self.table_model
def update_display_data(self, dsp):
dsp.update_data({k: self.table_model.backing_store[k][1]
for k in dsp.data_sources()})
def on_mod(self, mod): def on_mod(self, mod):
if mod["action"] == "init": if mod["action"] == "init":
for display in self.displays.values(): for display in self.displays.values():
@ -96,7 +102,7 @@ class ResultsDock(dockarea.Dock):
for display in self.displays.values(): for display in self.displays.values():
if source in display.data_sources(): if source in display.data_sources():
display.update_data(self.table_model.backing_store) self.update_display_data(display)
def create_dialog(self, ty): def create_dialog(self, ty):
dlg_class = display_types[ty][0] dlg_class = display_types[ty][0]
@ -111,7 +117,7 @@ class ResultsDock(dockarea.Dock):
dsp_class = display_types[ty][1] dsp_class = display_types[ty][1]
dsp = dsp_class(name, settings) dsp = dsp_class(name, settings)
self.displays[name] = dsp self.displays[name] = dsp
dsp.update_data(self.table_model.backing_store) self.update_display_data(dsp)
def on_close(): def on_close():
del self.displays[name] del self.displays[name]

View File

@ -113,15 +113,13 @@ class StringValue(_SimpleArgProcessor):
class HasEnvironment: class HasEnvironment:
"""Provides methods to manage the environment of an experiment (devices, """Provides methods to manage the environment of an experiment (devices,
parameters, results, arguments).""" parameters, results, arguments)."""
def __init__(self, dmgr=None, pdb=None, rdb=None, *, parent=None, def __init__(self, device_mgr=None, dataset_mgr=None, *, parent=None,
param_override=dict(), default_arg_none=False, **kwargs): default_arg_none=False, **kwargs):
self.requested_args = OrderedDict() self.requested_args = OrderedDict()
self.__dmgr = dmgr self.__device_mgr = device_mgr
self.__pdb = pdb self.__dataset_mgr = dataset_mgr
self.__rdb = rdb
self.__parent = parent self.__parent = parent
self.__param_override = param_override
self.__default_arg_none = default_arg_none self.__default_arg_none = default_arg_none
self.__kwargs = kwargs self.__kwargs = kwargs
@ -143,17 +141,16 @@ class HasEnvironment:
are set to ``None``.""" are set to ``None``."""
raise NotImplementedError raise NotImplementedError
def dbs(self): def managers(self):
"""Returns the device manager, the parameter database and the result """Returns the device manager and the dataset manager, in this order.
database, in this order.
This is the same order that the constructor takes them, allowing This is the same order that the constructor takes them, allowing
sub-objects to be created with this idiom to pass the environment sub-objects to be created with this idiom to pass the environment
around: :: around: ::
sub_object = SomeLibrary(*self.dbs()) sub_object = SomeLibrary(*self.managers())
""" """
return self.__dmgr, self.__pdb, self.__rdb return self.__device_mgr, self.__dataset_mgr
def get_argument(self, key, processor=None, group=None): def get_argument(self, key, processor=None, group=None):
"""Retrieves and returns the value of an argument. """Retrieves and returns the value of an argument.
@ -193,91 +190,45 @@ class HasEnvironment:
"""Returns the full contents of the device database.""" """Returns the full contents of the device database."""
if self.__parent is not None: if self.__parent is not None:
return self.__parent.get_ddb() return self.__parent.get_ddb()
return self.__dmgr.get_ddb() return self.__device_mgr.get_ddb()
def get_device(self, key): def get_device(self, key):
"""Creates and returns a device driver.""" """Creates and returns a device driver."""
if self.__parent is not None: if self.__parent is not None:
return self.__parent.get_device(key) return self.__parent.get_device(key)
if self.__dmgr is None: if self.__device_mgr is None:
raise ValueError("Device manager not present") raise ValueError("Device manager not present")
return self.__dmgr.get(key) return self.__device_mgr.get(key)
def setattr_device(self, key): def setattr_device(self, key):
"""Sets a device driver as attribute. The names of the device driver """Sets a device driver as attribute. The names of the device driver
and of the attribute are the same.""" and of the attribute are the same."""
setattr(self, key, self.get_device(key)) setattr(self, key, self.get_device(key))
def get_parameter(self, key, default=NoDefault): def set_dataset(self, key, value,
"""Retrieves and returns a parameter.""" broadcast=False, persist=False, save=True):
if self.__parent is not None and key not in self.__param_override: if self.__parent is not None:
return self.__parent.get_parameter(key, default) self.__parent.set_dataset(key, value, broadcast, persist, save)
if self.__pdb is None: return
raise ValueError("Parameter database not present") if self.__dataset_mgr is None:
if key in self.__param_override: raise ValueError("Dataset manager not present")
return self.__param_override[key] return self.__dataset_mgr.set(key, value, broadcast, persist, save)
def get_dataset(self, key, default=NoDefault):
if self.__parent is not None:
return self.__parent.get_dataset(key, default)
if self.__dataset_mgr is None:
raise ValueError("Dataset manager not present")
try: try:
return self.__pdb.get(key) return self.__dataset_mgr.get(key)
except KeyError: except KeyError:
if default is not NoDefault: if default is NoDefault:
return default
else:
raise raise
else:
return default
def setattr_parameter(self, key, default=NoDefault): def setattr_dataset(self, key, default=NoDefault):
"""Sets a parameter as attribute. The names of the argument and of the setattr(self, key, self.get_dataset(key, default))
parameter are the same."""
setattr(self, key, self.get_parameter(key, default))
def set_parameter(self, key, value):
"""Writes the value of a parameter into the parameter database."""
if self.__parent is not None:
self.__parent.set_parameter(key, value)
return
if self.__pdb is None:
raise ValueError("Parameter database not present")
self.__pdb.set(key, value)
def set_result(self, key, value, realtime=False, store=True):
"""Writes the value of a result.
:param realtime: Marks the result as real-time, making it immediately
available to clients such as the user interface. Returns a
``Notifier`` instance that can be used to modify mutable results
(such as lists) and synchronize the modifications with the clients.
:param store: Defines if the result should be stored permanently,
e.g. in HDF5 output. Default is to store.
"""
if self.__parent is not None:
self.__parent.set_result(key, value, realtime, store)
return
if self.__rdb is None:
raise ValueError("Result database not present")
if realtime:
if key in self.__rdb.nrt:
raise ValueError("Result is already non-realtime")
self.__rdb.rt[key] = value
notifier = self.__rdb.rt[key]
notifier.kernel_attr_init = False
self.__rdb.set_store(key, store)
return notifier
else:
if key in self.__rdb.rt.read:
raise ValueError("Result is already realtime")
self.__rdb.nrt[key] = value
self.__rdb.set_store(key, store)
def get_result(self, key):
"""Retrieves the value of a result.
There is no difference between real-time and non-real-time results
(this function does not return ``Notifier`` instances).
"""
if self.__parent is not None:
return self.__parent.get_result(key)
if self.__rdb is None:
raise ValueError("Result database not present")
return self.__rdb.get(key)
class Experiment: class Experiment:

62
artiq/master/databases.py Normal file
View File

@ -0,0 +1,62 @@
import asyncio
from artiq.protocols.sync_struct import Notifier, process_mod
from artiq.protocols import pyon
from artiq.tools import TaskObject
class DeviceDB:
def __init__(self, backing_file):
self.backing_file = backing_file
self.data = Notifier(pyon.load_file(self.backing_file))
def scan(self):
new_data = pyon.load_file(self.backing_file)
for k in list(self.data.read.keys()):
if k not in new_data:
del self.data[k]
for k in new_data.keys():
if k not in self.data.read or self.data.read[k] != new_data[k]:
self.data[k] = new_data[k]
def get_ddb(self):
return self.data.read
def get(self, key):
return self.data.read[key]
class DatasetDB(TaskObject):
def __init__(self, persist_file, autosave_period=30):
self.persist_file = persist_file
self.autosave_period = autosave_period
file_data = pyon.load_file(self.persist_file)
self.data = Notifier({k: (True, v) for k, v in file_data.items()})
def save(self):
data = {k: v[1] for k, v in self.data.read.items() if v[0]}
pyon.store_file(self.persist_file, data)
async def _do(self):
try:
while True:
await asyncio.sleep(self.autosave_period)
self.save()
finally:
self.save()
def get(self, key):
return self.data.read[key][1]
def update(self, mod):
process_mod(self.data, mod)
# convenience functions (update() can be used instead)
def set(self, key, value, persist=False):
self.data[key] = (persist, value)
def delete(self, key):
del self.data[key]
#

View File

@ -1,24 +0,0 @@
from artiq.protocols.sync_struct import Notifier
from artiq.protocols import pyon
class DDB:
def __init__(self, backing_file):
self.backing_file = backing_file
self.data = Notifier(pyon.load_file(self.backing_file))
def scan(self):
new_data = pyon.load_file(self.backing_file)
for k in list(self.data.read.keys()):
if k not in new_data:
del self.data[k]
for k in new_data.keys():
if k not in self.data.read or self.data.read[k] != new_data[k]:
self.data[k] = new_data[k]
def get_ddb(self):
return self.data.read
def get(self, key):
return self.data.read[key]

View File

@ -15,6 +15,64 @@ from artiq.protocols.pc_rpc import Client, BestEffortClient
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
def _create_device(desc, device_mgr):
ty = desc["type"]
if ty == "local":
module = importlib.import_module(desc["module"])
device_class = getattr(module, desc["class"])
return device_class(device_mgr, **desc["arguments"])
elif ty == "controller":
if desc["best_effort"]:
cl = BestEffortClient
else:
cl = Client
return cl(desc["host"], desc["port"], desc["target_name"])
else:
raise ValueError("Unsupported type in device DB: " + ty)
class DeviceManager:
"""Handles creation and destruction of local device drivers and controller
RPC clients."""
def __init__(self, ddb, virtual_devices=dict()):
self.ddb = ddb
self.virtual_devices = virtual_devices
self.active_devices = OrderedDict()
def get_ddb(self):
"""Returns the full contents of the device database."""
return self.ddb.get_ddb()
def get(self, name):
"""Get the device driver or controller client corresponding to a
device database entry."""
if name in self.virtual_devices:
return self.virtual_devices[name]
if name in self.active_devices:
return self.active_devices[name]
else:
desc = self.ddb.get(name)
while isinstance(desc, str):
# alias
desc = self.ddb.get(desc)
dev = _create_device(desc, self)
self.active_devices[name] = dev
return dev
def close_devices(self):
"""Closes all active devices, in the opposite order as they were
requested."""
for dev in reversed(list(self.active_devices.values())):
try:
if isinstance(dev, (Client, BestEffortClient)):
dev.close_rpc()
elif hasattr(dev, "close"):
dev.close()
except Exception as e:
logger.warning("Exception %r when closing device %r", e, dev)
self.active_devices.clear()
def get_hdf5_output(start_time, rid, name): def get_hdf5_output(start_time, rid, name):
dirname = os.path.join("results", dirname = os.path.join("results",
time.strftime("%Y-%m-%d", start_time), time.strftime("%Y-%m-%d", start_time),
@ -87,84 +145,30 @@ def result_dict_to_hdf5(f, rd):
dataset[()] = data dataset[()] = data
class ResultDB: class DatasetManager:
def __init__(self): def __init__(self, ddb):
self.rt = Notifier(dict()) self.broadcast = Notifier(dict())
self.nrt = dict() self.local = dict()
self.store = set()
self.ddb = ddb
self.broadcast.publish = ddb.update
def set(self, key, value, broadcast=False, persist=False, save=True):
if persist:
broadcast = True
r = None
if broadcast:
self.broadcast[key] = (persist, value)
r = self.broadcast[key][1]
if save:
self.local[key] = value
return r
def get(self, key): def get(self, key):
try: try:
return self.nrt[key] return self.local[key]
except KeyError: except KeyError:
return self.rt[key].read return self.ddb.get(key)
def set_store(self, key, store):
if store:
self.store.add(key)
else:
self.store.discard(key)
def write_hdf5(self, f): def write_hdf5(self, f):
result_dict_to_hdf5( result_dict_to_hdf5(f, self.local)
f, {k: v for k, v in self.rt.read.items() if k in self.store})
result_dict_to_hdf5(
f, {k: v for k, v in self.nrt.items() if k in self.store})
def _create_device(desc, dmgr):
ty = desc["type"]
if ty == "local":
module = importlib.import_module(desc["module"])
device_class = getattr(module, desc["class"])
return device_class(dmgr, **desc["arguments"])
elif ty == "controller":
if desc["best_effort"]:
cl = BestEffortClient
else:
cl = Client
return cl(desc["host"], desc["port"], desc["target_name"])
else:
raise ValueError("Unsupported type in device DB: " + ty)
class DeviceManager:
"""Handles creation and destruction of local device drivers and controller
RPC clients."""
def __init__(self, ddb, virtual_devices=dict()):
self.ddb = ddb
self.virtual_devices = virtual_devices
self.active_devices = OrderedDict()
def get_ddb(self):
"""Returns the full contents of the device database."""
return self.ddb.get_ddb()
def get(self, name):
"""Get the device driver or controller client corresponding to a
device database entry."""
if name in self.virtual_devices:
return self.virtual_devices[name]
if name in self.active_devices:
return self.active_devices[name]
else:
desc = self.ddb.get(name)
while isinstance(desc, str):
# alias
desc = self.ddb.get(desc)
dev = _create_device(desc, self)
self.active_devices[name] = dev
return dev
def close_devices(self):
"""Closes all active devices, in the opposite order as they were
requested."""
for dev in reversed(list(self.active_devices.values())):
try:
if isinstance(dev, (Client, BestEffortClient)):
dev.close_rpc()
elif hasattr(dev, "close"):
dev.close()
except Exception as e:
logger.warning("Exception %r when closing device %r", e, dev)
self.active_devices.clear()

View File

@ -4,7 +4,7 @@ import os
from artiq.protocols import pyon from artiq.protocols import pyon
from artiq.tools import file_import from artiq.tools import file_import
from artiq.master.worker_db import DeviceManager, ResultDB, get_hdf5_output from artiq.master.worker_db import DeviceManager, DatasetManager, get_hdf5_output
from artiq.language.environment import is_experiment from artiq.language.environment import is_experiment
from artiq.language.core import set_watchdog_factory, TerminationRequested from artiq.language.core import set_watchdog_factory, TerminationRequested
@ -62,17 +62,14 @@ class LogForwarder:
pass pass
class ParentDDB: class ParentDeviceDB:
get_ddb = make_parent_action("get_ddb", "") get_ddb = make_parent_action("get_ddb", "")
get = make_parent_action("get_device", "key", KeyError) get = make_parent_action("get_device", "key", KeyError)
class ParentPDB: class ParentDatasetDB:
get = make_parent_action("get_parameter", "key", KeyError) get = make_parent_action("get_dataset", "key", KeyError)
set = make_parent_action("set_parameter", "key value") update = make_parent_action("update_dataset", "mod")
update_rt_results = make_parent_action("update_rt_results", "mod")
class Watchdog: class Watchdog:
@ -126,22 +123,22 @@ register_experiment = make_parent_action("register_experiment",
"class_name name arguments") "class_name name arguments")
class ExamineDMGR: class ExamineDeviceMgr:
get_ddb = make_parent_action("get_ddb", "") get_ddb = make_parent_action("get_ddb", "")
def get(self, name): def get(self, name):
return None return None
class DummyPDB: class DummyDatasetMgr:
def get(self, name): def set(self, key, value, broadcast=False, persist=False, save=True):
return None return None
def set(self, name, value): def get(self, key):
pass pass
def examine(dmgr, pdb, rdb, file): def examine(device_mgr, dataset_mgr, file):
module = file_import(file) module = file_import(file)
for class_name, exp_class in module.__dict__.items(): for class_name, exp_class in module.__dict__.items():
if class_name[0] == "_": if class_name[0] == "_":
@ -153,7 +150,7 @@ def examine(dmgr, pdb, rdb, file):
name = exp_class.__doc__.splitlines()[0].strip() name = exp_class.__doc__.splitlines()[0].strip()
if name[-1] == ".": if name[-1] == ".":
name = name[:-1] name = name[:-1]
exp_inst = exp_class(dmgr, pdb, rdb, default_arg_none=True) exp_inst = exp_class(device_mgr, dataset_mgr, default_arg_none=True)
arguments = [(k, (proc.describe(), group)) arguments = [(k, (proc.describe(), group))
for k, (proc, group) in exp_inst.requested_args.items()] for k, (proc, group) in exp_inst.requested_args.items()]
register_experiment(class_name, name, arguments) register_experiment(class_name, name, arguments)
@ -168,10 +165,9 @@ def main():
exp = None exp = None
exp_inst = None exp_inst = None
dmgr = DeviceManager(ParentDDB, device_mgr = DeviceManager(ParentDeviceDB,
virtual_devices={"scheduler": Scheduler()}) virtual_devices={"scheduler": Scheduler()})
rdb = ResultDB() dataset_mgr = DatasetManager(ParentDatasetDB)
rdb.rt.publish = update_rt_results
try: try:
while True: while True:
@ -187,9 +183,9 @@ def main():
else: else:
expf = expid["file"] expf = expid["file"]
exp = get_exp(expf, expid["class_name"]) exp = get_exp(expf, expid["class_name"])
dmgr.virtual_devices["scheduler"].set_run_info( device_mgr.virtual_devices["scheduler"].set_run_info(
obj["pipeline_name"], expid, obj["priority"]) obj["pipeline_name"], expid, obj["priority"])
exp_inst = exp(dmgr, ParentPDB, rdb, exp_inst = exp(device_mgr, dataset_mgr,
**expid["arguments"]) **expid["arguments"])
put_object({"action": "completed"}) put_object({"action": "completed"})
elif action == "prepare": elif action == "prepare":
@ -204,7 +200,7 @@ def main():
elif action == "write_results": elif action == "write_results":
f = get_hdf5_output(start_time, rid, exp.__name__) f = get_hdf5_output(start_time, rid, exp.__name__)
try: try:
rdb.write_hdf5(f) dataset_mgr.write_hdf5(f)
if "repo_rev" in expid: if "repo_rev" in expid:
rr = expid["repo_rev"] rr = expid["repo_rev"]
dtype = "S{}".format(len(rr)) dtype = "S{}".format(len(rr))
@ -214,12 +210,12 @@ def main():
f.close() f.close()
put_object({"action": "completed"}) put_object({"action": "completed"})
elif action == "examine": elif action == "examine":
examine(ExamineDMGR(), DummyPDB(), ResultDB(), obj["file"]) examine(ExamineDeviceMgr(), DummyDatasetMgr(), obj["file"])
put_object({"action": "completed"}) put_object({"action": "completed"})
elif action == "terminate": elif action == "terminate":
break break
finally: finally:
dmgr.close_devices() device_mgr.close_devices()
if __name__ == "__main__": if __name__ == "__main__":
main() main()

View File

@ -0,0 +1 @@
{"flopping_freq": 1499.9876804260716}

View File

@ -1 +0,0 @@
{"flopping_freq": 1500.0164816344934}

View File

@ -37,11 +37,11 @@ class FloppingF(EnvExperiment):
self.setattr_device("scheduler") self.setattr_device("scheduler")
def run(self): def run(self):
frequency = self.set_result("flopping_f_frequency", [], frequency = self.set_dataset("flopping_f_frequency", [],
realtime=True, store=False) broadcast=True, save=False)
brightness = self.set_result("flopping_f_brightness", [], brightness = self.set_dataset("flopping_f_brightness", [],
realtime=True) broadcast=True)
self.set_result("flopping_f_fit", [], realtime=True, store=False) self.set_dataset("flopping_f_fit", [], broadcast=True, save=False)
for f in self.frequency_scan: for f in self.frequency_scan:
m_brightness = model(f, self.F0) + self.noise_amplitude*random.random() m_brightness = model(f, self.F0) + self.noise_amplitude*random.random()
@ -52,16 +52,16 @@ class FloppingF(EnvExperiment):
self.scheduler.priority, time.time() + 20, False) self.scheduler.priority, time.time() + 20, False)
def analyze(self): def analyze(self):
# Use get_result so that analyze can be run stand-alone. # Use get_dataset so that analyze can be run stand-alone.
frequency = self.get_result("flopping_f_frequency") frequency = self.get_dataset("flopping_f_frequency")
brightness = self.get_result("flopping_f_brightness") brightness = self.get_dataset("flopping_f_brightness")
popt, pcov = curve_fit(model_numpy, popt, pcov = curve_fit(model_numpy,
frequency, brightness, frequency, brightness,
p0=[self.get_parameter("flopping_freq")]) p0=[self.get_dataset("flopping_freq")])
perr = np.sqrt(np.diag(pcov)) perr = np.sqrt(np.diag(pcov))
if perr < 0.1: if perr < 0.1:
F0 = float(popt) F0 = float(popt)
self.set_parameter("flopping_freq", F0) self.set_dataset("flopping_freq", F0, persist=True, save=False)
self.set_result("flopping_f_fit", self.set_dataset("flopping_f_fit",
[model(x, F0) for x in frequency], [model(x, F0) for x in frequency],
realtime=True, store=False) broadcast=True, save=False)

View File

@ -16,9 +16,9 @@ class PhotonHistogram(EnvExperiment):
self.setattr_argument("nbins", FreeValue(100)) self.setattr_argument("nbins", FreeValue(100))
self.setattr_argument("repeats", FreeValue(100)) self.setattr_argument("repeats", FreeValue(100))
self.setattr_parameter("cool_f", 230*MHz) self.setattr_dataset("cool_f", 230*MHz)
self.setattr_parameter("detect_f", 220*MHz) self.setattr_dataset("detect_f", 220*MHz)
self.setattr_parameter("detect_t", 100*us) self.setattr_dataset("detect_t", 100*us)
@kernel @kernel
def program_cooling(self): def program_cooling(self):
@ -60,8 +60,9 @@ class PhotonHistogram(EnvExperiment):
hist[n] += 1 hist[n] += 1
total += n total += n
self.set_result("cooling_photon_histogram", hist) self.set_dataset("cooling_photon_histogram", hist)
self.set_parameter("ion_present", total > 5*self.repeats) self.set_dataset("ion_present", total > 5*self.repeats,
broadcast=True)
if __name__ == "__main__": if __name__ == "__main__":

View File

@ -111,9 +111,9 @@ class SpeedBenchmark(EnvExperiment):
self.scheduler.pause() self.scheduler.pause()
end_time = time.monotonic() end_time = time.monotonic()
self.set_result("benchmark_run_time", self.set_dataset("benchmark_run_time",
(end_time-start_time)/self.nruns, (end_time-start_time)/self.nruns,
realtime=True) broadcast=True)
def run(self): def run(self):
if self.mode == "Single experiment": if self.mode == "Single experiment":
@ -133,6 +133,6 @@ class _Report(EnvExperiment):
def run(self): def run(self):
end_time = time.monotonic() end_time = time.monotonic()
self.set_result("benchmark_run_time", self.set_dataset("benchmark_run_time",
(end_time-self.start_time)/self.nruns, (end_time-self.start_time)/self.nruns,
realtime=True) broadcast=True)

View File

@ -12,7 +12,7 @@ class AluminumSpectroscopy(EnvExperiment):
self.setattr_device("spectroscopy_b") self.setattr_device("spectroscopy_b")
self.setattr_device("state_detection") self.setattr_device("state_detection")
self.setattr_device("pmt") self.setattr_device("pmt")
self.setattr_parameter("spectroscopy_freq", 432*MHz) self.setattr_dataset("spectroscopy_freq", 432*MHz)
self.setattr_argument("photon_limit_low", FreeValue(10)) self.setattr_argument("photon_limit_low", FreeValue(10))
self.setattr_argument("photon_limit_high", FreeValue(15)) self.setattr_argument("photon_limit_high", FreeValue(15))