From 9cd89a0c504a1e9c593451e94f9d0e95a19081f7 Mon Sep 17 00:00:00 2001 From: Sebastien Bourdeauducq Date: Wed, 31 Dec 2014 17:41:22 +0800 Subject: [PATCH] master: handle dpdb in master process --- artiq/management/dpdb.py | 87 ++++++++++++++++++++++++++------- artiq/management/scheduler.py | 20 +++++--- artiq/management/worker.py | 28 ++++++++--- artiq/management/worker_impl.py | 53 +++++++++++++++----- frontend/artiq_master.py | 51 ++++++++++--------- 5 files changed, 171 insertions(+), 68 deletions(-) diff --git a/artiq/management/dpdb.py b/artiq/management/dpdb.py index dc4a3cfd7..f6868148b 100644 --- a/artiq/management/dpdb.py +++ b/artiq/management/dpdb.py @@ -2,6 +2,8 @@ from collections import OrderedDict import importlib from artiq.language.context import * +from artiq.management import pyon +from artiq.management.sync_struct import Notifier def create_device(desc, mvs): @@ -10,35 +12,46 @@ def create_device(desc, mvs): return device_class(mvs, **desc["parameters"]) -class DeviceParamDB: - def __init__(self, devices, parameters): - self.devices = devices - self.parameters = parameters +class DeviceParamSupplier: + """Supplies devices and parameters to AutoContext objects. + + """ + def __init__(self, req_device, req_parameter): + self.req_device = req_device + self.req_parameter = req_parameter self.active_devices = OrderedDict() def get_missing_value(self, name, kind, requester): if isinstance(kind, Device): if name in self.active_devices: return self.active_devices[name] - elif name in self.devices: - desc = self.devices[name] - while isinstance(desc, str): - # alias - desc = self.devices[desc] + else: + try: + desc = self.req_device(name) + except KeyError: + raise KeyError( + "Unknown device '{}' of type '{}' requested by {}" + .format(name, kind.type_hint, requester)) + try: + while isinstance(desc, str): + # alias + desc = self.req_device(desc) + except KeyError: + raise KeyError( + "Unknown alias '{}' for device '{}' of type '{}'" + " requested by {}" + .format(desc, name, kind.type_hint, requester)) dev = create_device(desc, self) self.active_devices[name] = dev return dev - else: - raise KeyError("Unknown device '{}' of type '{}'" - " requested by {}" - .format(name, kind.type_hint, requester)) elif isinstance(kind, Parameter): - if name in self.parameters: - return self.parameters[name] - elif kind.default is not NoDefault: - return kind.default - else: - raise KeyError("Unknown parameter: " + name) + try: + return self.req_parameter(name) + except KeyError: + if kind.default is not NoDefault: + return kind.default + else: + raise KeyError("Unknown parameter: " + name) else: raise NotImplementedError @@ -47,3 +60,39 @@ class DeviceParamDB: if hasattr(dev, "close"): dev.close() self.active_devices = OrderedDict() + + +class DeviceParamDB: + def __init__(self, ddb_file, pdb_file): + self.ddb_file = ddb_file + self.pdb_file = pdb_file + self.ddb = Notifier(pyon.load_file(self.ddb_file)) + self.pdb = Notifier(pyon.load_file(self.pdb_file)) + + def save_ddb(self): + pyon.store_file(self.ddb_file, self.ddb) + + def save_pdb(self): + pyon.store_file(self.pdb_file, self.pdb) + + def req_device(self, name): + return self.ddb.backing_struct[name] + + def set_device(self, name, description): + self.ddb[name] = description + self.save_ddb() + + def del_device(self, name): + del self.ddb[name] + self.save_ddb() + + def req_parameter(self, name): + return self.pdb.backing_struct[name] + + def set_parameter(self, name, value): + self.pdb[name] = value + self.save_pdb() + + def del_parameter(self, name): + del self.pdb[name] + self.save_pdb() diff --git a/artiq/management/scheduler.py b/artiq/management/scheduler.py index d4f8064a8..46b99a1e7 100644 --- a/artiq/management/scheduler.py +++ b/artiq/management/scheduler.py @@ -6,8 +6,8 @@ from artiq.management.worker import Worker class Scheduler: - def __init__(self, *args, **kwargs): - self.worker = Worker(*args, **kwargs) + def __init__(self, worker_handlers): + self.worker = Worker(worker_handlers) self.next_rid = 0 self.queue = Notifier([]) self.queue_count = asyncio.Semaphore(0) @@ -56,6 +56,16 @@ class Scheduler: def cancel_periodic(self, prid): del self.periodic[prid] + @asyncio.coroutine + def _run(self, rid, run_params, timeout): + try: + yield from self.worker.run(run_params, timeout) + except Exception as e: + print("RID {} failed:".format(rid)) + print(e) + else: + print("RID {} completed successfully".format(rid)) + @asyncio.coroutine def _run_periodic(self): while True: @@ -80,8 +90,7 @@ class Scheduler: rid = self.new_rid() self.queue.insert(0, (rid, run_params, timeout)) - result = yield from self.worker.run(run_params, timeout) - print(prid, rid, result) + yield from self._run(rid, run_params, timeout) del self.queue[0] @asyncio.coroutine @@ -101,6 +110,5 @@ class Scheduler: yield from self._run_periodic() if ev_queue in done: rid, run_params, timeout = self.queue.backing_struct[0] - result = yield from self.worker.run(run_params, timeout) - print(rid, result) + yield from self._run(rid, run_params, timeout) del self.queue[0] diff --git a/artiq/management/worker.py b/artiq/management/worker.py index e20dc6f36..e401fc6de 100644 --- a/artiq/management/worker.py +++ b/artiq/management/worker.py @@ -10,11 +10,14 @@ class WorkerFailed(Exception): pass +class RunFailed(Exception): + pass + + class Worker: - def __init__(self, ddb, pdb, + def __init__(self, handlers, send_timeout=0.5, start_reply_timeout=1.0, term_timeout=1.0): - self.ddb = ddb - self.pdb = pdb + self.handlers = handlers self.send_timeout = send_timeout self.start_reply_timeout = start_reply_timeout self.term_timeout = term_timeout @@ -23,7 +26,6 @@ class Worker: def create_process(self): self.process = yield from asyncio.create_subprocess_exec( sys.executable, "-m", "artiq.management.worker_impl", - self.ddb, self.pdb, stdout=subprocess.PIPE, stdin=subprocess.PIPE) @asyncio.coroutine @@ -62,8 +64,22 @@ class Worker: obj = yield from self._recv(self.start_reply_timeout) if obj != "ack": raise WorkerFailed("Incorrect acknowledgement") - result = yield from self._recv(result_timeout) - return result + while True: + obj = yield from self._recv(result_timeout) + action = obj["action"] + if action == "report_completed": + if obj["status"] != "ok": + raise RunFailed(obj["message"]) + else: + return + else: + del obj["action"] + try: + data = self.handlers[action](**obj) + reply = {"status": "ok", "data": data} + except: + reply = {"status": "failed"} + yield from self._send(reply, self.send_timeout) @asyncio.coroutine def end_process(self): diff --git a/artiq/management/worker_impl.py b/artiq/management/worker_impl.py index d3682f08d..6026a2d2c 100644 --- a/artiq/management/worker_impl.py +++ b/artiq/management/worker_impl.py @@ -1,13 +1,14 @@ import sys from inspect import isclass +import traceback from artiq.management import pyon from artiq.management.file_import import file_import from artiq.language.context import AutoContext -from artiq.management.dpdb import DeviceParamDB +from artiq.management.dpdb import DeviceParamSupplier -def run(dpdb, file, unit, function): +def run(dps, file, unit, function): module = file_import(file) if unit is None: units = [v for k, v in module.__dict__.items() @@ -20,11 +21,16 @@ def run(dpdb, file, unit, function): unit = units[0] else: unit = getattr(module, unit) - unit_inst = unit(dpdb) + unit_inst = unit(dps) f = getattr(unit_inst, function) f() +def get_object(): + line = sys.__stdin__.readline() + return pyon.decode(line) + + def put_object(obj): ds = pyon.encode(obj) sys.__stdout__.write(ds) @@ -32,24 +38,45 @@ def put_object(obj): sys.__stdout__.flush() +def req_device(name): + put_object({"action": "req_device", "name": name}) + obj = get_object() + if obj["status"] == "ok": + return obj["data"] + else: + raise KeyError + + +def req_parameter(name): + put_object({"action": "req_parameter", "name": name}) + obj = get_object() + if obj["status"] == "ok": + return obj["data"] + else: + raise KeyError + + def main(): sys.stdout = sys.stderr - devices = pyon.load_file(sys.argv[1]) - parameters = pyon.load_file(sys.argv[2]) - dpdb = DeviceParamDB(devices, parameters) + dps = DeviceParamSupplier(req_device, req_parameter) while True: - line = sys.__stdin__.readline() - obj = pyon.decode(line) + obj = get_object() put_object("ack") try: - run(dpdb, **obj) - except Exception as e: - put_object({"status": "failed", "message": str(e)}) - else: - put_object({"status": "ok"}) + try: + run(dps, **obj) + except Exception: + put_object({"action": "report_completed", + "status": "failed", + "message": traceback.format_exc()}) + else: + put_object({"action": "report_completed", + "status": "ok"}) + finally: + dps.close() if __name__ == "__main__": main() diff --git a/frontend/artiq_master.py b/frontend/artiq_master.py index b5a49e637..560c2c7c4 100755 --- a/frontend/artiq_master.py +++ b/frontend/artiq_master.py @@ -2,9 +2,11 @@ import asyncio import argparse +import atexit from artiq.management.pc_rpc import Server from artiq.management.sync_struct import Publisher +from artiq.management.dpdb import DeviceParamDB from artiq.management.scheduler import Scheduler @@ -24,31 +26,32 @@ def _get_args(): def main(): args = _get_args() + dpdb = DeviceParamDB("ddb.pyon", "pdb.pyon") + loop = asyncio.get_event_loop() - try: - scheduler = Scheduler("ddb.pyon", "pdb.pyon") - loop.run_until_complete(scheduler.start()) - try: - schedule_control = Server(scheduler, "schedule_control") - loop.run_until_complete(schedule_control.start( - args.bind, args.port_schedule_control)) - try: - schedule_notify = Publisher({ - "queue": scheduler.queue, - "periodic": scheduler.periodic - }) - loop.run_until_complete(schedule_notify.start( - args.bind, args.port_schedule_notify)) - try: - loop.run_forever() - finally: - loop.run_until_complete(schedule_notify.stop()) - finally: - loop.run_until_complete(schedule_control.stop()) - finally: - loop.run_until_complete(scheduler.stop()) - finally: - loop.close() + atexit.register(lambda: loop.close()) + + scheduler = Scheduler({ + "req_device": dpdb.req_device, + "req_parameter": dpdb.req_parameter + }) + loop.run_until_complete(scheduler.start()) + atexit.register(lambda: loop.run_until_complete(scheduler.stop())) + + schedule_control = Server(scheduler, "schedule_control") + loop.run_until_complete(schedule_control.start( + args.bind, args.port_schedule_control)) + atexit.register(lambda: loop.run_until_complete(schedule_control.stop())) + + schedule_notify = Publisher({ + "queue": scheduler.queue, + "periodic": scheduler.periodic + }) + loop.run_until_complete(schedule_notify.start( + args.bind, args.port_schedule_notify)) + atexit.register(lambda: loop.run_until_complete(schedule_notify.stop())) + + loop.run_forever() if __name__ == "__main__": main()