"""Worker process implementation for the ARTIQ master (artiq/master/worker_impl.py).

Runs experiments in a subprocess and communicates with the master over a
line-based PYON protocol on the real stdin/stdout streams.
"""
import sys
import time
import traceback
from inspect import isclass

from artiq.protocols import pyon
from artiq.tools import file_import
from artiq.master.worker_db import DBHub, ResultDB
from artiq.master.results import get_hdf5_output
def get_object():
    """Read one line from the real stdin and decode it as a PYON object.

    The real ``sys.__stdin__`` is used because ``sys.stdout`` is redirected
    elsewhere in this module; stdin/stdout carry the IPC protocol with the
    master process.
    """
    line = sys.__stdin__.readline()
    return pyon.decode(line)
def put_object(obj):
    """Encode *obj* as PYON and send it to the master on the real stdout.

    Writes a single newline-terminated line and flushes immediately so the
    parent process sees the message without buffering delays.
    """
    ds = pyon.encode(obj)
    sys.__stdout__.write(ds)
    sys.__stdout__.write("\n")
    sys.__stdout__.flush()
class ParentActionError(Exception):
    """Raised when the parent (master) process reports a failed action."""
def make_parent_action(action, argnames, exception=ParentActionError):
    """Build a proxy function that forwards a call to the parent process.

    :param action: value of the ``"action"`` key sent in the request.
    :param argnames: space-separated parameter names; positional arguments
        of the returned function are mapped onto these names in order.
    :param exception: exception type raised when the parent replies with a
        non-"ok" status; the reply's ``"message"`` becomes the exception
        argument.
    :return: a function ``parent_action(*args)`` that sends the request,
        waits for the reply, and returns the reply's ``"data"`` field.
    """
    argnames = argnames.split()

    def parent_action(*args):
        request = {"action": action}
        for argname, arg in zip(argnames, args):
            request[argname] = arg
        put_object(request)
        reply = get_object()
        if reply["status"] == "ok":
            return reply["data"]
        else:
            raise exception(reply["message"])
    return parent_action
class ParentDDB:
    """Device database proxy: lookups are forwarded to the parent process."""
    # Raises KeyError (not ParentActionError) on unknown device names.
    request = make_parent_action("req_device", "name", KeyError)
class ParentPDB:
    """Parameter database proxy: reads/writes go through the parent process."""
    # Raises KeyError (not ParentActionError) on unknown parameter names.
    request = make_parent_action("req_parameter", "name", KeyError)
    # NOTE: "set" mirrors the parameter-DB interface name; it intentionally
    # shadows the builtin within this class namespace.
    set = make_parent_action("set_parameter", "name value")
# Realtime-result management, forwarded to the parent process.
init_rt_results = make_parent_action("init_rt_results", "description")
update_rt_results = make_parent_action("update_rt_results", "mod")
def publish_rt_results(notifier, data):
    """Publish hook installed on the result DB's realtime notifier.

    *notifier* is part of the publish-callback signature but unused here;
    the mod *data* is simply forwarded to the parent process.
    """
    update_rt_results(data)
class Scheduler:
    """Scheduler proxy handed to experiments; calls go to the parent process."""
    run_queued = make_parent_action("scheduler_run_queued", "run_params")
    cancel_queued = make_parent_action("scheduler_cancel_queued", "rid")
    run_timed = make_parent_action("scheduler_run_timed",
                                   "run_params next_run")
    cancel_timed = make_parent_action("scheduler_cancel_timed", "trid")
def get_unit(file, unit):
    """Import *file* and return the experiment unit class from it.

    :param file: path of the Python file to import via ``file_import``.
    :param unit: name of the unit class, or ``None`` to auto-detect.
    :raises ValueError: if *unit* is ``None`` and the module does not
        contain exactly one class with an ``__artiq_unit__`` attribute.
    """
    module = file_import(file)
    if unit is None:
        # Auto-detect: keys of module.__dict__ are unused, scan values only.
        units = [v for v in module.__dict__.values()
                 if isclass(v) and hasattr(v, "__artiq_unit__")]
        if len(units) != 1:
            raise ValueError("Found {} units in module".format(len(units)))
        return units[0]
    else:
        return getattr(module, unit)
def run(rid, run_params):
    """Execute one experiment run and report completion to the parent.

    :param rid: run identifier, used to name the HDF5 output file.
    :param run_params: dict with keys ``"file"``, ``"unit"`` and
        ``"arguments"`` describing the experiment to run.

    A ``report_completed`` message (status ``"ok"`` or ``"failed"`` with a
    traceback) is always sent to the parent; results are then written to
    HDF5 regardless of the run's outcome.
    """
    start_time = time.localtime()
    unit = get_unit(run_params["file"], run_params["unit"])

    realtime_results = unit.realtime_results()
    init_rt_results(realtime_results)

    # Keys of realtime_results may be single names or tuples of names;
    # flatten them into one set of individual result names.
    realtime_results_set = set()
    for rr in realtime_results:
        if isinstance(rr, tuple):
            realtime_results_set.update(rr)
        else:
            realtime_results_set.add(rr)
    rdb = ResultDB(realtime_results_set)
    rdb.realtime_data.publish = publish_rt_results
    dbh = DBHub(ParentDDB, ParentPDB, rdb)
    try:
        try:
            unit_inst = unit(dbh,
                             scheduler=Scheduler,
                             run_params=run_params,
                             **run_params["arguments"])
            unit_inst.run()
            # analyze() is optional on experiment units.
            if hasattr(unit_inst, "analyze"):
                unit_inst.analyze()
        except Exception:
            # Broad catch is deliberate: any experiment failure is reported
            # to the master rather than crashing the worker.
            put_object({"action": "report_completed",
                        "status": "failed",
                        "message": traceback.format_exc()})
        else:
            put_object({"action": "report_completed",
                        "status": "ok"})
    finally:
        dbh.close()

    # Persist whatever results were collected, even for failed runs.
    f = get_hdf5_output(start_time, rid, unit.__name__)
    try:
        rdb.write_hdf5(f)
    finally:
        f.close()
def main():
    """Worker main loop: receive run requests from the master forever.

    ``sys.stdout`` is redirected to stderr so that any ``print`` from
    experiment code cannot corrupt the PYON IPC stream, which uses the
    real ``sys.__stdout__``/``sys.__stdin__``.
    """
    sys.stdout = sys.stderr

    while True:
        obj = get_object()
        put_object("ack")
        run(obj["rid"], obj["run_params"])
# This module is executed as a subprocess by the ARTIQ master.
if __name__ == "__main__":
    main()