From f2134fa4b230b7b88a97cc3ac107848601c14eec Mon Sep 17 00:00:00 2001
From: Sebastien Bourdeauducq
Date: Mon, 9 Mar 2015 23:34:09 +0100
Subject: [PATCH] master,worker: split prepare/run/analyze

---
Review notes (ignored by git-am):
  * Worker.close(): the exit-status check referenced the bare name
    `process`, which is undefined in that scope and would raise NameError
    when the worker had already exited. It now uses `self.process`.
  * The line structure of this patch was restored from a
    whitespace-collapsed copy; indentation and blank context lines were
    reconstructed to match the hunk headers and the diffstat below.

 artiq/master/scheduler.py   |  7 ++-
 artiq/master/worker.py      | 85 +++++++++++++++++++++++--------------
 artiq/master/worker_impl.py | 65 ++++++++++++++++------------
 3 files changed, 98 insertions(+), 59 deletions(-)

diff --git a/artiq/master/scheduler.py b/artiq/master/scheduler.py
index 8a359c42b..f028b105b 100644
--- a/artiq/master/scheduler.py
+++ b/artiq/master/scheduler.py
@@ -64,7 +64,12 @@ class Scheduler:
     def _run(self, rid, run_params):
         self.run_cb(rid, run_params)
         try:
-            yield from self.worker.run(rid, run_params)
+            yield from self.worker.prepare(rid, run_params)
+            try:
+                yield from self.worker.run()
+                yield from self.worker.analyze()
+            finally:
+                yield from self.worker.close()
         except Exception as e:
             print("RID {} failed:".format(rid))
             print(e)
diff --git a/artiq/master/worker.py b/artiq/master/worker.py
index 44bbdd302..8617c0046 100644
--- a/artiq/master/worker.py
+++ b/artiq/master/worker.py
@@ -1,8 +1,8 @@
 import sys
 import asyncio
 import subprocess
-import signal
 import traceback
+import time
 
 from artiq.protocols import pyon
 
@@ -13,10 +13,10 @@ class WorkerError(Exception):
 
 class Worker:
     def __init__(self,
-                 send_timeout=0.5, start_reply_timeout=1.0, term_timeout=1.0):
+                 send_timeout=0.5, prepare_timeout=15.0, term_timeout=1.0):
         self.handlers = dict()
         self.send_timeout = send_timeout
-        self.start_reply_timeout = start_reply_timeout
+        self.prepare_timeout = prepare_timeout
         self.term_timeout = term_timeout
 
     @asyncio.coroutine
@@ -26,15 +26,23 @@ class Worker:
             stdout=subprocess.PIPE, stdin=subprocess.PIPE)
 
     @asyncio.coroutine
-    def _end_process(self):
+    def close(self):
         if self.process.returncode is not None:
+            if self.process.returncode != 0:
+                raise WorkerError("Worker finished with status code {}"
+                                  .format(self.process.returncode))
+            return
+        obj = {"action": "terminate"}
+        try:
+            yield from self._send(obj, self.send_timeout)
+        except:
+            self.process.kill()
             return
-        self.process.send_signal(signal.SIGTERM)
         try:
             yield from asyncio.wait_for(
                 self.process.wait(), timeout=self.term_timeout)
         except asyncio.TimeoutError:
-            self.process.send_signal(signal.SIGKILL)
+            self.process.kill()
 
     @asyncio.coroutine
     def _send(self, obj, timeout):
@@ -58,7 +66,7 @@ class Worker:
         except asyncio.TimeoutError:
             raise WorkerError("Timeout receiving data from worker")
         if not line:
-            return None
+            raise WorkerError("Worker ended while attempting to receive data")
         try:
             obj = pyon.decode(line.decode())
         except:
@@ -66,30 +74,45 @@ class Worker:
         return obj
 
     @asyncio.coroutine
-    def run(self, rid, run_params):
-        yield from self._create_process()
+    def _handle_worker_requests(self, timeout):
+        if timeout is None:
+            end_time = None
+        else:
+            end_time = time.monotonic() + timeout
+        while True:
+            obj = yield from self._recv(None if end_time is None
+                                        else end_time - time.monotonic())
+            action = obj["action"]
+            if action == "completed":
+                return
+            del obj["action"]
+            try:
+                data = self.handlers[action](**obj)
+                reply = {"status": "ok", "data": data}
+            except:
+                reply = {"status": "failed",
+                         "message": traceback.format_exc()}
+            yield from self._send(reply, self.send_timeout)
 
+    @asyncio.coroutine
+    def prepare(self, rid, run_params):
+        yield from self._create_process()
         try:
-            obj = {"rid": rid, "run_params": run_params}
+            obj = {"action": "prepare", "rid": rid, "run_params": run_params}
             yield from self._send(obj, self.send_timeout)
-            obj = yield from self._recv(self.start_reply_timeout)
-            if obj != "ack":
-                raise WorkerError("Incorrect acknowledgement")
-            while True:
-                obj = yield from self._recv(None)
-                if obj is None:
-                    if self.process.returncode != 0:
-                        raise WorkerError("Worker finished with status code {}"
-                                          .format(self.process.returncode))
-                    break
-                action = obj["action"]
-                del obj["action"]
-                try:
-                    data = self.handlers[action](**obj)
-                    reply = {"status": "ok", "data": data}
-                except:
-                    reply = {"status": "failed",
-                             "message": traceback.format_exc()}
-                yield from self._send(reply, self.send_timeout)
-        finally:
-            yield from self._end_process()
+            yield from self._handle_worker_requests(self.prepare_timeout)
+        except:
+            yield from self.close()
+            raise
+
+    @asyncio.coroutine
+    def run(self):
+        obj = {"action": "run"}
+        yield from self._send(obj, self.send_timeout)
+        yield from self._handle_worker_requests(None)
+
+    @asyncio.coroutine
+    def analyze(self):
+        obj = {"action": "analyze"}
+        yield from self._send(obj, self.send_timeout)
+        yield from self._handle_worker_requests(None)
diff --git a/artiq/master/worker_impl.py b/artiq/master/worker_impl.py
index 76d56e6c5..7ce96eab7 100644
--- a/artiq/master/worker_impl.py
+++ b/artiq/master/worker_impl.py
@@ -72,36 +72,47 @@ def get_exp(file, exp):
     return getattr(module, exp)
 
 
-def run(rid, run_params):
-    start_time = time.localtime()
-    exp = get_exp(run_params["file"], run_params["experiment"])
-
-    rdb = ResultDB(init_rt_results, update_rt_results)
-    dbh = DBHub(ParentDDB, ParentPDB, rdb)
-    try:
-        exp_inst = exp(dbh,
-                       scheduler=Scheduler,
-                       run_params=run_params,
-                       **run_params["arguments"])
-        exp_inst.run()
-        exp_inst.analyze()
-    finally:
-        dbh.close()
-
-    f = get_hdf5_output(start_time, rid, exp.__name__)
-    try:
-        rdb.write_hdf5(f)
-    finally:
-        f.close()
-
-
 def main():
     sys.stdout = sys.stderr
 
-    obj = get_object()
-    put_object("ack")
-    run(obj["rid"], obj["run_params"])
-    put_object({"action": "report_completed"})
+    start_time = None
+    rid = None
+    run_params = None
+    exp = None
+    exp_inst = None
+
+    rdb = ResultDB(init_rt_results, update_rt_results)
+    dbh = DBHub(ParentDDB, ParentPDB, rdb)
+
+    try:
+        while True:
+            obj = get_object()
+            action = obj["action"]
+            if action == "prepare":
+                start_time = time.localtime()
+                rid = obj["rid"]
+                run_params = obj["run_params"]
+                exp = get_exp(run_params["file"], run_params["experiment"])
+                exp_inst = exp(dbh,
+                               scheduler=Scheduler,
+                               run_params=run_params,
+                               **run_params["arguments"])
+                put_object({"action": "completed"})
+            elif action == "run":
+                exp_inst.run()
+                put_object({"action": "completed"})
+            elif action == "analyze":
+                exp_inst.analyze()
+                f = get_hdf5_output(start_time, rid, exp.__name__)
+                try:
+                    rdb.write_hdf5(f)
+                finally:
+                    f.close()
+                put_object({"action": "completed"})
+            elif action == "terminate":
+                break
+    finally:
+        dbh.close()
 
 if __name__ == "__main__":
     main()