diff --git a/artiq/language/db.py b/artiq/language/db.py index 6c7a0d154..cb7e23908 100644 --- a/artiq/language/db.py +++ b/artiq/language/db.py @@ -134,6 +134,10 @@ class AutoDB: else: raise ValueError + @classmethod + def get_realtime_results(): + return dict() + def build(self): """This is called by ``__init__`` after the parameter initialization is done. diff --git a/artiq/management/db.py b/artiq/management/db.py index 26e559e08..8dac97e0b 100644 --- a/artiq/management/db.py +++ b/artiq/management/db.py @@ -1,4 +1,4 @@ -from collections import OrderedDict, defaultdict +from collections import OrderedDict import importlib from time import time @@ -51,14 +51,25 @@ class SimpleHistory: class ResultDB: - def __init__(self): - self.data = defaultdict(list) + def __init__(self, realtime_results): + self.realtime_data = Notifier({x: [] for x in realtime_results}) + self.data = Notifier(dict()) def request(self, name): - return self.data[name] + try: + return self.realtime_data[name] + except KeyError: + try: + return self.data[name] + except KeyError: + self.data[name] = [] + return self.data[name] def set(self, name, value): - self.data[name] = value + if name in self.realtime_data: + self.realtime_data[name] = value + else: + self.data[name] = value def _create_device(desc, dbh): diff --git a/artiq/management/worker_impl.py b/artiq/management/worker_impl.py index c03dbb673..17d92857f 100644 --- a/artiq/management/worker_impl.py +++ b/artiq/management/worker_impl.py @@ -8,23 +8,6 @@ from artiq.language.db import AutoDB from artiq.management.db import DBHub, ResultDB -def run(dbh, file, unit, arguments): - module = file_import(file) - if unit is None: - units = [v for k, v in module.__dict__.items() - if k[0] != "_" - and isclass(v) - and issubclass(v, AutoDB) - and v is not AutoDB] - if len(units) != 1: - raise ValueError("Found {} units in module".format(len(units))) - unit = units[0] - else: - unit = getattr(module, unit) - unit_inst = unit(dbh, **arguments) - unit_inst.run() - - def get_object(): line = sys.__stdin__.readline() return pyon.decode(line) @@ -65,27 +48,68 @@ class ParentPDB: set = make_parent_action("set_parameter", "name value") +init_rt_results = make_parent_action("init_rt_results", "data") +update_rt_results = make_parent_action("update_rt_results", "data") + + +def publish_rt_results(notifier, data): + update_rt_results(data) + + +def get_unit(file, unit): + module = file_import(file) + if unit is None: + units = [v for k, v in module.__dict__.items() + if k[0] != "_" + and isclass(v) + and issubclass(v, AutoDB) + and v is not AutoDB] + if len(units) != 1: + raise ValueError("Found {} units in module".format(len(units))) + return units[0] + else: + return getattr(module, unit) + + +def run(obj): + unit = get_unit(obj["file"], obj["unit"]) + + realtime_results = unit.realtime_results() + init_rt_results(realtime_results) + + realtime_results_set = set() + for rr in realtime_results.keys(): + if isinstance(rr, tuple): + for e in rr: + realtime_results_set.add(e) + else: + realtime_results_set.add(rr) + rdb = ResultDB(realtime_results_set) + rdb.realtime_data.publish = publish_rt_results + + dbh = DBHub(ParentDDB, ParentPDB, rdb) + try: + try: + unit_inst = unit(dbh, **obj["arguments"]) + unit_inst.run() + except Exception: + put_object({"action": "report_completed", + "status": "failed", + "message": traceback.format_exc()}) + else: + put_object({"action": "report_completed", + "status": "ok"}) + finally: + dbh.close() + + def main(): sys.stdout = sys.stderr while True: obj = get_object() put_object("ack") - - rdb = ResultDB() - dbh = DBHub(ParentDDB, ParentPDB, rdb) - try: - try: - run(dbh, **obj) - except Exception: - put_object({"action": "report_completed", - "status": "failed", - "message": traceback.format_exc()}) - else: - put_object({"action": "report_completed", - "status": "ok"}) - finally: - dbh.close() + run(obj) if __name__ == "__main__": main() diff --git a/frontend/artiq_master.py b/frontend/artiq_master.py index dbe47c74c..e2ace48c8 100755 --- a/frontend/artiq_master.py +++ b/frontend/artiq_master.py @@ -24,6 +24,14 @@ def _get_args(): return parser.parse_args() +def init_rt_results(data): + print("init realtime results: " + str(data)) + + +def update_rt_results(data): + print("update realtime results: " + str(data)) + + def main(): args = _get_args() @@ -38,7 +46,9 @@ def main(): scheduler = Scheduler({ "req_device": ddb.request, "req_parameter": pdb.request, - "set_parameter": pdb.set + "set_parameter": pdb.set, + "init_rt_results": init_rt_results, + "update_rt_results": update_rt_results }) loop.run_until_complete(scheduler.start()) atexit.register(lambda: loop.run_until_complete(scheduler.stop())) diff --git a/frontend/artiq_run.py b/frontend/artiq_run.py index 1d968a84b..b8855e9f9 100755 --- a/frontend/artiq_run.py +++ b/frontend/artiq_run.py @@ -4,6 +4,7 @@ import argparse import sys from inspect import isclass from operator import itemgetter +from itertools import chain from artiq.management.file_import import file_import from artiq.language.db import * @@ -64,7 +65,7 @@ def main(): ddb = FlatFileDB(args.ddb) pdb = FlatFileDB(args.pdb) pdb.hooks.append(SimpleParamLogger()) - rdb = ResultDB() + rdb = ResultDB(set()) dbh = DBHub(ddb, pdb, rdb) try: if args.elf: @@ -105,9 +106,10 @@ def main(): unit_inst = unit(dbh, **arguments) unit_inst.run() - if rdb.data: + if rdb.data.read or rdb.realtime_data.read: print("Results:") - for k, v in rdb.data.items(): + for k, v in chain(rdb.realtime_data.read.items(), + rdb.data.read.items()): print("{}: {}".format(k, v)) finally: dbh.close()