forked from M-Labs/artiq
1
0
Fork 0

worker: factor timeouts

This commit is contained in:
Sebastien Bourdeauducq 2015-07-14 23:43:08 +02:00
parent a07f2473b0
commit 7770ab64f2
1 changed files with 12 additions and 17 deletions

View File

@ -26,14 +26,9 @@ class WorkerError(Exception):
class Worker: class Worker:
def __init__(self, handlers, def __init__(self, handlers, send_timeout=0.5):
send_timeout=0.5, term_timeout=1.0,
build_timeout=15.0, results_timeout=15.0):
self.handlers = handlers self.handlers = handlers
self.send_timeout = send_timeout self.send_timeout = send_timeout
self.term_timeout = term_timeout
self.build_timeout = build_timeout
self.results_timeout = results_timeout
self.rid = None self.rid = None
self.process = None self.process = None
@ -74,7 +69,7 @@ class Worker:
self.io_lock.release() self.io_lock.release()
@asyncio.coroutine @asyncio.coroutine
def close(self): def close(self, term_timeout=1.0):
"""Interrupts any I/O with the worker process and terminates the """Interrupts any I/O with the worker process and terminates the
worker process. worker process.
@ -96,7 +91,7 @@ class Worker:
return return
obj = {"action": "terminate"} obj = {"action": "terminate"}
try: try:
yield from self._send(obj, self.send_timeout, cancellable=False) yield from self._send(obj, cancellable=False)
except: except:
logger.warning("failed to send terminate command to worker" logger.warning("failed to send terminate command to worker"
" (RID %d), killing", self.rid, exc_info=True) " (RID %d), killing", self.rid, exc_info=True)
@ -105,7 +100,7 @@ class Worker:
return return
try: try:
yield from asyncio_process_wait_timeout(self.process, yield from asyncio_process_wait_timeout(self.process,
self.term_timeout) term_timeout)
except asyncio.TimeoutError: except asyncio.TimeoutError:
logger.warning("worker did not exit (RID %d), killing", self.rid) logger.warning("worker did not exit (RID %d), killing", self.rid)
self.process.kill() self.process.kill()
@ -116,7 +111,7 @@ class Worker:
self.io_lock.release() self.io_lock.release()
@asyncio.coroutine @asyncio.coroutine
def _send(self, obj, timeout, cancellable=True): def _send(self, obj, cancellable=True):
assert self.io_lock.locked() assert self.io_lock.locked()
line = pyon.encode(obj) line = pyon.encode(obj)
self.process.stdin.write(line.encode()) self.process.stdin.write(line.encode())
@ -125,7 +120,7 @@ class Worker:
if cancellable: if cancellable:
ifs.append(self.closed.wait()) ifs.append(self.closed.wait())
fs = yield from asyncio_wait_or_cancel( fs = yield from asyncio_wait_or_cancel(
ifs, timeout=timeout, ifs, timeout=self.send_timeout,
return_when=asyncio.FIRST_COMPLETED) return_when=asyncio.FIRST_COMPLETED)
if all(f.cancelled() for f in fs): if all(f.cancelled() for f in fs):
raise WorkerTimeout("Timeout sending data to worker") raise WorkerTimeout("Timeout sending data to worker")
@ -185,7 +180,7 @@ class Worker:
"message": traceback.format_exc()} "message": traceback.format_exc()}
yield from self.io_lock.acquire() yield from self.io_lock.acquire()
try: try:
yield from self._send(reply, self.send_timeout) yield from self._send(reply)
finally: finally:
self.io_lock.release() self.io_lock.release()
@ -196,7 +191,7 @@ class Worker:
try: try:
yield from self.io_lock.acquire() yield from self.io_lock.acquire()
try: try:
yield from self._send(obj, self.send_timeout) yield from self._send(obj)
finally: finally:
self.io_lock.release() self.io_lock.release()
try: try:
@ -209,7 +204,7 @@ class Worker:
return completed return completed
@asyncio.coroutine @asyncio.coroutine
def build(self, rid, pipeline_name, expid, priority): def build(self, rid, pipeline_name, expid, priority, timeout=15.0):
self.rid = rid self.rid = rid
yield from self._create_process() yield from self._create_process()
yield from self._worker_action( yield from self._worker_action(
@ -218,7 +213,7 @@ class Worker:
"pipeline_name": pipeline_name, "pipeline_name": pipeline_name,
"expid": expid, "expid": expid,
"priority": priority}, "priority": priority},
self.build_timeout) timeout)
@asyncio.coroutine @asyncio.coroutine
def prepare(self): def prepare(self):
@ -247,6 +242,6 @@ class Worker:
yield from self._worker_action({"action": "analyze"}) yield from self._worker_action({"action": "analyze"})
@asyncio.coroutine @asyncio.coroutine
def write_results(self): def write_results(self, timeout=15.0):
yield from self._worker_action({"action": "write_results"}, yield from self._worker_action({"action": "write_results"},
self.results_timeout) timeout)