diff --git a/artiq/frontend/artiq_master.py b/artiq/frontend/artiq_master.py index f62ffbaa5..a41091feb 100755 --- a/artiq/frontend/artiq_master.py +++ b/artiq/frontend/artiq_master.py @@ -51,7 +51,7 @@ def main(): def run_cb(rid, run_params): rtr.current_group = run_params["rtr_group"] scheduler = Scheduler(run_cb, get_last_rid() + 1) - scheduler.worker.handlers = { + scheduler.worker_handlers = { "req_device": ddb.request, "req_parameter": pdb.request, "set_parameter": pdb.set, diff --git a/artiq/frontend/artiq_run.py b/artiq/frontend/artiq_run.py index b530f4e02..f982dff1b 100755 --- a/artiq/frontend/artiq_run.py +++ b/artiq/frontend/artiq_run.py @@ -33,6 +33,17 @@ class SimpleParamLogger: print("Parameter change: {} -> {}".format(name, value)) +class DummyWatchdog: + def __init__(self, t): + pass + + def __enter__(self): + pass + + def __exit__(self, type, value, traceback): + pass + + class DummyScheduler: def __init__(self): self.next_rid = 0 @@ -57,6 +68,8 @@ class DummyScheduler: def cancel_timed(self, trid): print("Cancelling TRID {}".format(trid)) + watchdog = DummyWatchdog + def get_argparser(): parser = argparse.ArgumentParser( diff --git a/artiq/master/scheduler.py b/artiq/master/scheduler.py index f028b105b..e21fcda73 100644 --- a/artiq/master/scheduler.py +++ b/artiq/master/scheduler.py @@ -7,8 +7,8 @@ from artiq.master.worker import Worker class Scheduler: def __init__(self, run_cb, first_rid): + self.worker_handlers = dict() self.run_cb = run_cb - self.worker = Worker() self.next_rid = first_rid self.queue = Notifier([]) self.queue_modified = asyncio.Event() @@ -63,16 +63,17 @@ class Scheduler: @asyncio.coroutine def _run(self, rid, run_params): self.run_cb(rid, run_params) + worker = Worker(self.worker_handlers) try: - yield from self.worker.prepare(rid, run_params) + yield from worker.prepare(rid, run_params) try: - yield from self.worker.run() - yield from self.worker.analyze() + yield from worker.run() + yield from worker.analyze() finally: - yield from self.worker.close() + yield from worker.close() except Exception as e: print("RID {} failed:".format(rid)) - print(e) + print("{}: {}".format(e.__class__.__name__, e)) else: print("RID {} completed successfully".format(rid)) diff --git a/artiq/master/worker.py b/artiq/master/worker.py index 8617c0046..45cd3d58e 100644 --- a/artiq/master/worker.py +++ b/artiq/master/worker.py @@ -5,6 +5,16 @@ import traceback import time from artiq.protocols import pyon +from artiq.language.units import strip_unit +from artiq.tools import asyncio_process_wait_timeout + + +class WorkerTimeout(Exception): + pass + + +class WorkerWatchdogTimeout(Exception): + pass class WorkerError(Exception): @@ -12,12 +22,29 @@ class WorkerError(Exception): class Worker: - def __init__(self, + def __init__(self, handlers, send_timeout=0.5, prepare_timeout=15.0, term_timeout=1.0): - self.handlers = dict() + self.handlers = handlers self.send_timeout = send_timeout self.prepare_timeout = prepare_timeout self.term_timeout = term_timeout + self.watchdogs = dict() # wid -> expiration (using time.monotonic) + + def create_watchdog(self, t): + avail = set(range(len(self.watchdogs) + 1)) \ + - set(self.watchdogs.keys()) + wid = next(iter(avail)) + self.watchdogs[wid] = time.monotonic() + strip_unit(t, "s") + return wid + + def delete_watchdog(self, wid): + del self.watchdogs[wid] + + def watchdog_time(self): + if self.watchdogs: + return min(self.watchdogs.values()) - time.monotonic() + else: + return None @asyncio.coroutine def _create_process(self): @@ -39,8 +66,8 @@ class Worker: self.process.kill() return try: - yield from asyncio.wait_for( - self.process.wait(), timeout=self.term_timeout) + yield from asyncio_process_wait_timeout(self.process, + self.term_timeout) except asyncio.TimeoutError: self.process.kill() @@ -54,7 +81,7 @@ class Worker: if fut is not (): # FIXME: why does Python return this? yield from asyncio.wait_for(fut, timeout=timeout) except asyncio.TimeoutError: - raise WorkerError("Timeout sending data from worker") + raise WorkerTimeout("Timeout sending data from worker") except: raise WorkerError("Failed to send data to worker") @@ -64,7 +91,7 @@ class Worker: line = yield from asyncio.wait_for( self.process.stdout.readline(), timeout=timeout) except asyncio.TimeoutError: - raise WorkerError("Timeout receiving data from worker") + raise WorkerTimeout("Timeout receiving data from worker") if not line: raise WorkerError("Worker ended while attempting to receive data") try: @@ -74,20 +101,21 @@ class Worker: return obj @asyncio.coroutine - def _handle_worker_requests(self, timeout): - if timeout is None: - end_time = None - else: - end_time = time.monotonic() + timeout + def _handle_worker_requests(self, timeout_func): while True: - obj = yield from self._recv(None if end_time is None - else end_time - time.monotonic()) + obj = yield from self._recv(timeout_func()) action = obj["action"] if action == "completed": return del obj["action"] + if action == "create_watchdog": + func = self.create_watchdog + elif action == "delete_watchdog": + func = self.delete_watchdog + else: + func = self.handlers[action] try: - data = self.handlers[action](**obj) + data = func(**obj) reply = {"status": "ok", "data": data} except: reply = {"status": "failed", @@ -100,19 +128,26 @@ class Worker: try: obj = {"action": "prepare", "rid": rid, "run_params": run_params} yield from self._send(obj, self.send_timeout) - yield from self._handle_worker_requests(self.prepare_timeout) + end_time = time.monotonic() + self.prepare_timeout + yield from self._handle_worker_requests( + lambda: end_time - time.monotonic()) except: yield from self.close() raise @asyncio.coroutine - def run(self): - obj = {"action": "run"} + def _run_analyze(self, action): + obj = {"action": action} yield from self._send(obj, self.send_timeout) - yield from self._handle_worker_requests(None) + try: + yield from self._handle_worker_requests(self.watchdog_time) + except WorkerTimeout: + raise WorkerWatchdogTimeout + + @asyncio.coroutine + def run(self): + yield from self._run_analyze("run") @asyncio.coroutine def analyze(self): - obj = {"action": "analyze"} - yield from self._send(obj, self.send_timeout) - yield from self._handle_worker_requests(None) + yield from self._run_analyze("analyze") diff --git a/artiq/master/worker_impl.py b/artiq/master/worker_impl.py index 7ce96eab7..49ad8c132 100644 --- a/artiq/master/worker_impl.py +++ b/artiq/master/worker_impl.py @@ -51,12 +51,27 @@ init_rt_results = make_parent_action("init_rt_results", "description") update_rt_results = make_parent_action("update_rt_results", "mod") +class Watchdog: + _create = make_parent_action("create_watchdog", "t") + _delete = make_parent_action("delete_watchdog", "wid") + + def __init__(self, t): + self.t = t + + def __enter__(self): + self.wid = Watchdog._create(self.t) + + def __exit__(self, type, value, traceback): + Watchdog._delete(self.wid) + + class Scheduler: run_queued = make_parent_action("scheduler_run_queued", "run_params") cancel_queued = make_parent_action("scheduler_cancel_queued", "rid") run_timed = make_parent_action("scheduler_run_timed", "run_params next_run") cancel_timed = make_parent_action("scheduler_cancel_timed", "trid") + watchdog = Watchdog def get_exp(file, exp): diff --git a/artiq/tools.py b/artiq/tools.py index e63d4b109..5455996e4 100644 --- a/artiq/tools.py +++ b/artiq/tools.py @@ -3,6 +3,8 @@ import importlib.machinery import linecache import logging import sys +import asyncio +import time import os.path @@ -58,3 +60,17 @@ def simple_network_args(parser, default_port): def init_logger(args): logging.basicConfig(level=logging.WARNING + args.quiet*10 - args.verbose*10) + + +@asyncio.coroutine +def asyncio_process_wait_timeout(process, timeout): + # In Python < 3.5, asyncio.wait_for(process.wait(), ... + # causes a futures.InvalidStateError inside asyncio if and when the + # process terminates after the timeout. + # Work around this problem. + end_time = time.monotonic() + timeout + r = True + while r: + r = yield from asyncio.wait_for( + process.stdout.read(1024), + timeout=end_time - time.monotonic())