forked from M-Labs/artiq
1
0
Fork 0

collect realtime results in master

This commit is contained in:
Sebastien Bourdeauducq 2015-01-13 19:12:19 +08:00
parent 77a7e592cb
commit 8ed6aeb64a
5 changed files with 92 additions and 41 deletions

View File

@ -134,6 +134,10 @@ class AutoDB:
else: else:
raise ValueError raise ValueError
@classmethod
def get_realtime_results():
return dict()
def build(self): def build(self):
"""This is called by ``__init__`` after the parameter initialization """This is called by ``__init__`` after the parameter initialization
is done. is done.

View File

@ -1,4 +1,4 @@
from collections import OrderedDict, defaultdict from collections import OrderedDict
import importlib import importlib
from time import time from time import time
@ -51,14 +51,25 @@ class SimpleHistory:
class ResultDB: class ResultDB:
def __init__(self): def __init__(self, realtime_results):
self.data = defaultdict(list) self.realtime_data = Notifier({x: [] for x in realtime_results})
self.data = Notifier(dict())
def request(self, name): def request(self, name):
return self.data[name] try:
return self.realtime_data[name]
except KeyError:
try:
return self.data[name]
except KeyError:
self.data[name] = []
return self.data[name]
def set(self, name, value): def set(self, name, value):
self.data[name] = value if name in self.realtime_data:
self.realtime_data[name] = value
else:
self.data[name] = value
def _create_device(desc, dbh): def _create_device(desc, dbh):

View File

@ -8,23 +8,6 @@ from artiq.language.db import AutoDB
from artiq.management.db import DBHub, ResultDB from artiq.management.db import DBHub, ResultDB
def run(dbh, file, unit, arguments):
module = file_import(file)
if unit is None:
units = [v for k, v in module.__dict__.items()
if k[0] != "_"
and isclass(v)
and issubclass(v, AutoDB)
and v is not AutoDB]
if len(units) != 1:
raise ValueError("Found {} units in module".format(len(units)))
unit = units[0]
else:
unit = getattr(module, unit)
unit_inst = unit(dbh, **arguments)
unit_inst.run()
def get_object(): def get_object():
line = sys.__stdin__.readline() line = sys.__stdin__.readline()
return pyon.decode(line) return pyon.decode(line)
@ -65,27 +48,68 @@ class ParentPDB:
set = make_parent_action("set_parameter", "name value") set = make_parent_action("set_parameter", "name value")
init_rt_results = make_parent_action("init_rt_results", "data")
update_rt_results = make_parent_action("update_rt_results", "data")
def publish_rt_results(notifier, data):
update_rt_results(data)
def get_unit(file, unit):
module = file_import(file)
if unit is None:
units = [v for k, v in module.__dict__.items()
if k[0] != "_"
and isclass(v)
and issubclass(v, AutoDB)
and v is not AutoDB]
if len(units) != 1:
raise ValueError("Found {} units in module".format(len(units)))
return units[0]
else:
return getattr(module, unit)
def run(obj):
unit = get_unit(obj["file"], obj["unit"])
realtime_results = unit.realtime_results()
init_rt_results(realtime_results)
realtime_results_set = set()
for rr in realtime_results.keys():
if isinstance(rr, tuple):
for e in rr:
realtime_results_set.add(e)
else:
realtime_results_set.add(rr)
rdb = ResultDB(realtime_results_set)
rdb.realtime_data.publish = publish_rt_results
dbh = DBHub(ParentDDB, ParentPDB, rdb)
try:
try:
unit_inst = unit(dbh, **obj["arguments"])
unit_inst.run()
except Exception:
put_object({"action": "report_completed",
"status": "failed",
"message": traceback.format_exc()})
else:
put_object({"action": "report_completed",
"status": "ok"})
finally:
dbh.close()
def main(): def main():
sys.stdout = sys.stderr sys.stdout = sys.stderr
while True: while True:
obj = get_object() obj = get_object()
put_object("ack") put_object("ack")
run(obj)
rdb = ResultDB()
dbh = DBHub(ParentDDB, ParentPDB, rdb)
try:
try:
run(dbh, **obj)
except Exception:
put_object({"action": "report_completed",
"status": "failed",
"message": traceback.format_exc()})
else:
put_object({"action": "report_completed",
"status": "ok"})
finally:
dbh.close()
if __name__ == "__main__": if __name__ == "__main__":
main() main()

View File

@ -24,6 +24,14 @@ def _get_args():
return parser.parse_args() return parser.parse_args()
def init_rt_results(data):
print("init realtime results: " + str(data))
def update_rt_results(data):
print("update realtime results: " + str(data))
def main(): def main():
args = _get_args() args = _get_args()
@ -38,7 +46,9 @@ def main():
scheduler = Scheduler({ scheduler = Scheduler({
"req_device": ddb.request, "req_device": ddb.request,
"req_parameter": pdb.request, "req_parameter": pdb.request,
"set_parameter": pdb.set "set_parameter": pdb.set,
"init_rt_results": init_rt_results,
"update_rt_results": update_rt_results
}) })
loop.run_until_complete(scheduler.start()) loop.run_until_complete(scheduler.start())
atexit.register(lambda: loop.run_until_complete(scheduler.stop())) atexit.register(lambda: loop.run_until_complete(scheduler.stop()))

View File

@ -4,6 +4,7 @@ import argparse
import sys import sys
from inspect import isclass from inspect import isclass
from operator import itemgetter from operator import itemgetter
from itertools import chain
from artiq.management.file_import import file_import from artiq.management.file_import import file_import
from artiq.language.db import * from artiq.language.db import *
@ -64,7 +65,7 @@ def main():
ddb = FlatFileDB(args.ddb) ddb = FlatFileDB(args.ddb)
pdb = FlatFileDB(args.pdb) pdb = FlatFileDB(args.pdb)
pdb.hooks.append(SimpleParamLogger()) pdb.hooks.append(SimpleParamLogger())
rdb = ResultDB() rdb = ResultDB(set())
dbh = DBHub(ddb, pdb, rdb) dbh = DBHub(ddb, pdb, rdb)
try: try:
if args.elf: if args.elf:
@ -105,9 +106,10 @@ def main():
unit_inst = unit(dbh, **arguments) unit_inst = unit(dbh, **arguments)
unit_inst.run() unit_inst.run()
if rdb.data: if rdb.data.read or rdb.realtime_data.read:
print("Results:") print("Results:")
for k, v in rdb.data.items(): for k, v in chain(rdb.realtime_data.read.items(),
rdb.data.read.items()):
print("{}: {}".format(k, v)) print("{}: {}".format(k, v))
finally: finally:
dbh.close() dbh.close()