From 7770ab64f288bb42733841ca028ca5d3bda98cc8 Mon Sep 17 00:00:00 2001 From: Sebastien Bourdeauducq Date: Tue, 14 Jul 2015 23:43:08 +0200 Subject: [PATCH] worker: factor timeouts --- artiq/master/worker.py | 29 ++++++++++++----------------- 1 file changed, 12 insertions(+), 17 deletions(-) diff --git a/artiq/master/worker.py b/artiq/master/worker.py index e3b1827ed..923559d30 100644 --- a/artiq/master/worker.py +++ b/artiq/master/worker.py @@ -26,14 +26,9 @@ class WorkerError(Exception): class Worker: - def __init__(self, handlers, - send_timeout=0.5, term_timeout=1.0, - build_timeout=15.0, results_timeout=15.0): + def __init__(self, handlers, send_timeout=0.5): self.handlers = handlers self.send_timeout = send_timeout - self.term_timeout = term_timeout - self.build_timeout = build_timeout - self.results_timeout = results_timeout self.rid = None self.process = None @@ -74,7 +69,7 @@ class Worker: self.io_lock.release() @asyncio.coroutine - def close(self): + def close(self, term_timeout=1.0): """Interrupts any I/O with the worker process and terminates the worker process. @@ -96,7 +91,7 @@ class Worker: return obj = {"action": "terminate"} try: - yield from self._send(obj, self.send_timeout, cancellable=False) + yield from self._send(obj, cancellable=False) except: logger.warning("failed to send terminate command to worker" " (RID %d), killing", self.rid, exc_info=True) @@ -105,7 +100,7 @@ class Worker: return try: yield from asyncio_process_wait_timeout(self.process, - self.term_timeout) + term_timeout) except asyncio.TimeoutError: logger.warning("worker did not exit (RID %d), killing", self.rid) self.process.kill() @@ -116,7 +111,7 @@ class Worker: self.io_lock.release() @asyncio.coroutine - def _send(self, obj, timeout, cancellable=True): + def _send(self, obj, cancellable=True): assert self.io_lock.locked() line = pyon.encode(obj) self.process.stdin.write(line.encode()) @@ -125,7 +120,7 @@ class Worker: if cancellable: ifs.append(self.closed.wait()) fs = yield from asyncio_wait_or_cancel( - ifs, timeout=timeout, + ifs, timeout=self.send_timeout, return_when=asyncio.FIRST_COMPLETED) if all(f.cancelled() for f in fs): raise WorkerTimeout("Timeout sending data to worker") @@ -185,7 +180,7 @@ class Worker: "message": traceback.format_exc()} yield from self.io_lock.acquire() try: - yield from self._send(reply, self.send_timeout) + yield from self._send(reply) finally: self.io_lock.release() @@ -196,7 +191,7 @@ class Worker: try: yield from self.io_lock.acquire() try: - yield from self._send(obj, self.send_timeout) + yield from self._send(obj) finally: self.io_lock.release() try: @@ -209,7 +204,7 @@ class Worker: return completed @asyncio.coroutine - def build(self, rid, pipeline_name, expid, priority): + def build(self, rid, pipeline_name, expid, priority, timeout=15.0): self.rid = rid yield from self._create_process() yield from self._worker_action( @@ -218,7 +213,7 @@ class Worker: "pipeline_name": pipeline_name, "expid": expid, "priority": priority}, - self.build_timeout) + timeout) @asyncio.coroutine def prepare(self): @@ -247,6 +242,6 @@ class Worker: yield from self._worker_action({"action": "analyze"}) @asyncio.coroutine - def write_results(self): + def write_results(self, timeout=15.0): yield from self._worker_action({"action": "write_results"}, - self.results_timeout) + timeout)