diff --git a/artiq/frontend/artiq_master.py b/artiq/frontend/artiq_master.py index d7f23686b..f62ffbaa5 100755 --- a/artiq/frontend/artiq_master.py +++ b/artiq/frontend/artiq_master.py @@ -62,7 +62,7 @@ def main(): "scheduler_run_timed": scheduler.run_timed, "scheduler_cancel_timed": scheduler.cancel_timed, } - loop.run_until_complete(scheduler.start()) + scheduler.start() atexit.register(lambda: loop.run_until_complete(scheduler.stop())) server_control = Server({ diff --git a/artiq/master/scheduler.py b/artiq/master/scheduler.py index d95f0038c..8a359c42b 100644 --- a/artiq/master/scheduler.py +++ b/artiq/master/scheduler.py @@ -25,17 +25,14 @@ class Scheduler: trids -= set(self.timed.read.keys()) return next(iter(trids)) - @asyncio.coroutine def start(self): self.task = asyncio.Task(self._schedule()) - yield from self.worker.create_process() @asyncio.coroutine def stop(self): self.task.cancel() yield from asyncio.wait([self.task]) del self.task - yield from self.worker.end_process() def run_queued(self, run_params): rid = self.new_rid() diff --git a/artiq/master/worker.py b/artiq/master/worker.py index 11ebccb80..44bbdd302 100644 --- a/artiq/master/worker.py +++ b/artiq/master/worker.py @@ -7,11 +7,7 @@ import traceback from artiq.protocols import pyon -class WorkerFailed(Exception): - pass - - -class RunFailed(Exception): +class WorkerError(Exception): pass @@ -24,11 +20,22 @@ class Worker: self.term_timeout = term_timeout @asyncio.coroutine - def create_process(self): + def _create_process(self): self.process = yield from asyncio.create_subprocess_exec( sys.executable, "-m", "artiq.master.worker_impl", stdout=subprocess.PIPE, stdin=subprocess.PIPE) + @asyncio.coroutine + def _end_process(self): + if self.process.returncode is not None: + return + self.process.send_signal(signal.SIGTERM) + try: + yield from asyncio.wait_for( + self.process.wait(), timeout=self.term_timeout) + except asyncio.TimeoutError: + self.process.send_signal(signal.SIGKILL) + @asyncio.coroutine def _send(self, obj, timeout): line = pyon.encode(obj) @@ -39,9 +46,9 @@ class Worker: if fut is not (): # FIXME: why does Python return this? yield from asyncio.wait_for(fut, timeout=timeout) except asyncio.TimeoutError: - raise WorkerFailed("Timeout sending data from worker") + raise WorkerError("Timeout sending data from worker") except: - raise WorkerFailed("Failed to send data to worker") + raise WorkerError("Failed to send data to worker") @asyncio.coroutine def _recv(self, timeout): @@ -49,32 +56,33 @@ class Worker: line = yield from asyncio.wait_for( self.process.stdout.readline(), timeout=timeout) except asyncio.TimeoutError: - raise WorkerFailed("Timeout receiving data from worker") + raise WorkerError("Timeout receiving data from worker") if not line: - raise WorkerFailed( - "Worker ended unexpectedly while trying to receive data") + return None try: obj = pyon.decode(line.decode()) except: - raise WorkerFailed("Worker sent invalid PYON data") + raise WorkerError("Worker sent invalid PYON data") return obj @asyncio.coroutine def run(self, rid, run_params): - obj = {"rid": rid, "run_params": run_params} - yield from self._send(obj, self.send_timeout) - obj = yield from self._recv(self.start_reply_timeout) - if obj != "ack": - raise WorkerFailed("Incorrect acknowledgement") - while True: - obj = yield from self._recv(None) - action = obj["action"] - if action == "report_completed": - if obj["status"] != "ok": - raise RunFailed(obj["message"]) - else: - return - else: + yield from self._create_process() + + try: + obj = {"rid": rid, "run_params": run_params} + yield from self._send(obj, self.send_timeout) + obj = yield from self._recv(self.start_reply_timeout) + if obj != "ack": + raise WorkerError("Incorrect acknowledgement") + while True: + obj = yield from self._recv(None) + if obj is None: + if self.process.returncode != 0: + raise WorkerError("Worker finished with status code {}" + .format(self.process.returncode)) + break + action = obj["action"] del obj["action"] try: data = self.handlers[action](**obj) @@ -83,14 +91,5 @@ class Worker: reply = {"status": "failed", "message": traceback.format_exc()} yield from self._send(reply, self.send_timeout) - - @asyncio.coroutine - def end_process(self): - if self.process.returncode is not None: - return - self.process.send_signal(signal.SIGTERM) - try: - yield from asyncio.wait_for( - self.process.wait(), timeout=self.term_timeout) - except asyncio.TimeoutError: - self.process.send_signal(signal.SIGKILL) + finally: + yield from self._end_process() diff --git a/artiq/master/worker_impl.py b/artiq/master/worker_impl.py index 6f92a871e..76d56e6c5 100644 --- a/artiq/master/worker_impl.py +++ b/artiq/master/worker_impl.py @@ -1,6 +1,5 @@ import sys import time -import traceback from artiq.protocols import pyon from artiq.tools import file_import @@ -80,20 +79,12 @@ def run(rid, run_params): rdb = ResultDB(init_rt_results, update_rt_results) dbh = DBHub(ParentDDB, ParentPDB, rdb) try: - try: - exp_inst = exp(dbh, - scheduler=Scheduler, - run_params=run_params, - **run_params["arguments"]) - exp_inst.run() - exp_inst.analyze() - except Exception: - put_object({"action": "report_completed", - "status": "failed", - "message": traceback.format_exc()}) - else: - put_object({"action": "report_completed", - "status": "ok"}) + exp_inst = exp(dbh, + scheduler=Scheduler, + run_params=run_params, + **run_params["arguments"]) + exp_inst.run() + exp_inst.analyze() finally: dbh.close() @@ -107,10 +98,10 @@ def run(rid, run_params): def main(): sys.stdout = sys.stderr - while True: - obj = get_object() - put_object("ack") - run(obj["rid"], obj["run_params"]) + obj = get_object() + put_object("ack") + run(obj["rid"], obj["run_params"]) + put_object({"action": "report_completed"}) if __name__ == "__main__": main()