diff --git a/artiq/master/scheduler.py b/artiq/master/scheduler.py index e21fcda73..d1c19396a 100644 --- a/artiq/master/scheduler.py +++ b/artiq/master/scheduler.py @@ -69,6 +69,7 @@ class Scheduler: try: yield from worker.run() yield from worker.analyze() + yield from worker.write_results() finally: yield from worker.close() except Exception as e: diff --git a/artiq/master/worker.py b/artiq/master/worker.py index 45cd3d58e..1ddc63307 100644 --- a/artiq/master/worker.py +++ b/artiq/master/worker.py @@ -23,15 +23,20 @@ class WorkerError(Exception): class Worker: def __init__(self, handlers, - send_timeout=0.5, prepare_timeout=15.0, term_timeout=1.0): + send_timeout=0.5, term_timeout=1.0, + prepare_timeout=15.0, results_timeout=15.0): self.handlers = handlers self.send_timeout = send_timeout - self.prepare_timeout = prepare_timeout self.term_timeout = term_timeout + self.prepare_timeout = prepare_timeout + self.results_timeout = results_timeout self.watchdogs = dict() # wid -> expiration (using time.monotonic) def create_watchdog(self, t): - avail = set(range(len(self.watchdogs) + 1)) \ + n_user_watchdogs = len(self.watchdogs) + if -1 in self.watchdogs: + n_user_watchdogs -= 1 + avail = set(range(n_user_watchdogs + 1)) \ - set(self.watchdogs.keys()) wid = next(iter(avail)) self.watchdogs[wid] = time.monotonic() + strip_unit(t, "s") @@ -101,9 +106,12 @@ class Worker: return obj @asyncio.coroutine - def _handle_worker_requests(self, timeout_func): + def _handle_worker_requests(self): while True: - obj = yield from self._recv(timeout_func()) + try: + obj = yield from self._recv(self.watchdog_time()) + except WorkerTimeout: + raise WorkerWatchdogTimeout action = obj["action"] if action == "completed": return @@ -122,32 +130,40 @@ class Worker: "message": traceback.format_exc()} yield from self._send(reply, self.send_timeout) + @asyncio.coroutine + def _worker_action(self, obj, timeout=None): + if timeout is not None: + self.watchdogs[-1] = time.monotonic() + timeout + try: + yield from self._send(obj, self.send_timeout) + try: + yield from self._handle_worker_requests() + except WorkerTimeout: + raise WorkerWatchdogTimeout + finally: + if timeout is not None: + del self.watchdogs[-1] + @asyncio.coroutine def prepare(self, rid, run_params): yield from self._create_process() try: - obj = {"action": "prepare", "rid": rid, "run_params": run_params} - yield from self._send(obj, self.send_timeout) - end_time = time.monotonic() + self.prepare_timeout - yield from self._handle_worker_requests( - lambda: end_time - time.monotonic()) + yield from self._worker_action( + {"action": "prepare", "rid": rid, "run_params": run_params}, + self.prepare_timeout) except: yield from self.close() raise - @asyncio.coroutine - def _run_analyze(self, action): - obj = {"action": action} - yield from self._send(obj, self.send_timeout) - try: - yield from self._handle_worker_requests(self.watchdog_time) - except WorkerTimeout: - raise WorkerWatchdogTimeout - @asyncio.coroutine def run(self): - yield from self._run_analyze("run") + yield from self._worker_action({"action": "run"}) @asyncio.coroutine def analyze(self): - yield from self._run_analyze("analyze") + yield from self._worker_action({"action": "analyze"}) + + @asyncio.coroutine + def write_results(self): + yield from self._worker_action({"action": "write_results"}, + self.results_timeout) diff --git a/artiq/master/worker_impl.py b/artiq/master/worker_impl.py index 49ad8c132..f428093a4 100644 --- a/artiq/master/worker_impl.py +++ b/artiq/master/worker_impl.py @@ -118,6 +118,8 @@ def main(): put_object({"action": "completed"}) elif action == "analyze": exp_inst.analyze() + put_object({"action": "completed"}) + elif action == "write_results": f = get_hdf5_output(start_time, rid, exp.__name__) try: rdb.write_hdf5(f)