forked from M-Labs/artiq
master: watchdog support
Introduces a watchdog context manager for use in experiment code; it terminates the worker process with an error if the watched code times out. The syntax is:

    with self.scheduler.watchdog(20*s):
        ...

Watchdog timers are implemented by the master process (the worker communicates the necessary information about them) so that they can be enforced even if the worker crashes. They can be nested arbitrarily. During yields, all watchdog timers of the yielding worker are suspended [TODO]. Setting up watchdogs is not supported in kernels; however, a kernel can be called within a watchdog context (and terminating the worker will terminate the kernel [TODO]).

It is possible to implement a heartbeat mechanism using a watchdog, e.g.:

    for i in range(...):
        with self.scheduler.watchdog(...):
            ...

Crashes or freezes within the iterator or the loop management would not be detected, but they should be rare enough.
parent f2134fa4b2
commit d5795fd619
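
For illustration, a slightly fuller experiment-side sketch of the usage described in the commit message. The class and method names are placeholders; it is assumed that the seconds unit `s` comes from artiq.language.units (the module this commit imports `strip_unit` from) and that `self.scheduler` is provided to the experiment by the worker:

    from artiq.language.units import s

    class MyExperiment:
        # Placeholder experiment class; a real experiment derives from the
        # ARTIQ experiment base class and gets self.scheduler from the worker.
        def run(self):
            # Outer watchdog: the whole scan must finish within 10 minutes.
            with self.scheduler.watchdog(600*s):
                for i in range(100):
                    # Heartbeat: each point must finish within 20 s.
                    # Watchdog contexts nest arbitrarily.
                    with self.scheduler.watchdog(20*s):
                        self.measure_one_point(i)  # placeholder; may call a kernel
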
@@ -51,7 +51,7 @@ def main():
     def run_cb(rid, run_params):
         rtr.current_group = run_params["rtr_group"]
     scheduler = Scheduler(run_cb, get_last_rid() + 1)
-    scheduler.worker.handlers = {
+    scheduler.worker_handlers = {
         "req_device": ddb.request,
         "req_parameter": pdb.request,
         "set_parameter": pdb.set,
@@ -33,6 +33,17 @@ class SimpleParamLogger:
         print("Parameter change: {} -> {}".format(name, value))
 
 
+class DummyWatchdog:
+    def __init__(self, t):
+        pass
+
+    def __enter__(self):
+        pass
+
+    def __exit__(self, type, value, traceback):
+        pass
+
+
 class DummyScheduler:
     def __init__(self):
         self.next_rid = 0
@@ -57,6 +68,8 @@ class DummyScheduler:
     def cancel_timed(self, trid):
         print("Cancelling TRID {}".format(trid))
 
+    watchdog = DummyWatchdog
+
 
 def get_argparser():
     parser = argparse.ArgumentParser(
@@ -7,8 +7,8 @@ from artiq.master.worker import Worker
 
 class Scheduler:
     def __init__(self, run_cb, first_rid):
+        self.worker_handlers = dict()
         self.run_cb = run_cb
-        self.worker = Worker()
         self.next_rid = first_rid
         self.queue = Notifier([])
         self.queue_modified = asyncio.Event()
@@ -63,16 +63,17 @@ class Scheduler:
     @asyncio.coroutine
     def _run(self, rid, run_params):
         self.run_cb(rid, run_params)
+        worker = Worker(self.worker_handlers)
         try:
-            yield from self.worker.prepare(rid, run_params)
+            yield from worker.prepare(rid, run_params)
             try:
-                yield from self.worker.run()
-                yield from self.worker.analyze()
+                yield from worker.run()
+                yield from worker.analyze()
             finally:
-                yield from self.worker.close()
+                yield from worker.close()
         except Exception as e:
             print("RID {} failed:".format(rid))
-            print(e)
+            print("{}: {}".format(e.__class__.__name__, e))
         else:
             print("RID {} completed successfully".format(rid))
 
@@ -5,6 +5,16 @@ import traceback
 import time
 
 from artiq.protocols import pyon
+from artiq.language.units import strip_unit
+from artiq.tools import asyncio_process_wait_timeout
+
+
+class WorkerTimeout(Exception):
+    pass
+
+
+class WorkerWatchdogTimeout(Exception):
+    pass
 
 
 class WorkerError(Exception):
@@ -12,12 +22,29 @@ class WorkerError(Exception):
 
 
 class Worker:
-    def __init__(self,
+    def __init__(self, handlers,
                  send_timeout=0.5, prepare_timeout=15.0, term_timeout=1.0):
-        self.handlers = dict()
+        self.handlers = handlers
         self.send_timeout = send_timeout
         self.prepare_timeout = prepare_timeout
         self.term_timeout = term_timeout
+        self.watchdogs = dict()  # wid -> expiration (using time.monotonic)
+
+    def create_watchdog(self, t):
+        avail = set(range(len(self.watchdogs) + 1)) \
+                - set(self.watchdogs.keys())
+        wid = next(iter(avail))
+        self.watchdogs[wid] = time.monotonic() + strip_unit(t, "s")
+        return wid
+
+    def delete_watchdog(self, wid):
+        del self.watchdogs[wid]
+
+    def watchdog_time(self):
+        if self.watchdogs:
+            return min(self.watchdogs.values()) - time.monotonic()
+        else:
+            return None
 
     @asyncio.coroutine
     def _create_process(self):
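
The watchdog bookkeeping added to Worker above is small enough to exercise on its own. A standalone sketch of the same id-allocation and earliest-deadline logic, using plain floats for the timeouts instead of unit-carrying quantities:

    import time

    # wid -> absolute expiration time (time.monotonic()), mirroring
    # Worker.create_watchdog / delete_watchdog / watchdog_time.
    watchdogs = {}

    def create_watchdog(t):
        # Pick an id in 0..len(watchdogs) that is not in use; at least one
        # such id always exists.
        avail = set(range(len(watchdogs) + 1)) - set(watchdogs.keys())
        wid = next(iter(avail))
        watchdogs[wid] = time.monotonic() + t
        return wid

    def delete_watchdog(wid):
        del watchdogs[wid]

    def watchdog_time():
        # Seconds until the earliest watchdog expires, or None if none are set.
        if watchdogs:
            return min(watchdogs.values()) - time.monotonic()
        return None

    outer = create_watchdog(20.0)
    inner = create_watchdog(5.0)     # nested watchdog, expires first
    print(watchdog_time())           # ~5.0
    delete_watchdog(inner)
    print(watchdog_time())           # ~20.0
    delete_watchdog(outer)
    print(watchdog_time())           # None
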
@@ -39,8 +66,8 @@ class Worker:
             self.process.kill()
             return
         try:
-            yield from asyncio.wait_for(
-                self.process.wait(), timeout=self.term_timeout)
+            yield from asyncio_process_wait_timeout(self.process,
+                                                    self.term_timeout)
         except asyncio.TimeoutError:
             self.process.kill()
 
@@ -54,7 +81,7 @@ class Worker:
             if fut is not ():  # FIXME: why does Python return this?
                 yield from asyncio.wait_for(fut, timeout=timeout)
         except asyncio.TimeoutError:
-            raise WorkerError("Timeout sending data from worker")
+            raise WorkerTimeout("Timeout sending data from worker")
         except:
             raise WorkerError("Failed to send data to worker")
 
@@ -64,7 +91,7 @@ class Worker:
             line = yield from asyncio.wait_for(
                 self.process.stdout.readline(), timeout=timeout)
         except asyncio.TimeoutError:
-            raise WorkerError("Timeout receiving data from worker")
+            raise WorkerTimeout("Timeout receiving data from worker")
         if not line:
             raise WorkerError("Worker ended while attempting to receive data")
         try:
@@ -74,20 +101,21 @@
         return obj
 
     @asyncio.coroutine
-    def _handle_worker_requests(self, timeout):
-        if timeout is None:
-            end_time = None
-        else:
-            end_time = time.monotonic() + timeout
+    def _handle_worker_requests(self, timeout_func):
         while True:
-            obj = yield from self._recv(None if end_time is None
-                                        else end_time - time.monotonic())
+            obj = yield from self._recv(timeout_func())
             action = obj["action"]
             if action == "completed":
                 return
             del obj["action"]
+            if action == "create_watchdog":
+                func = self.create_watchdog
+            elif action == "delete_watchdog":
+                func = self.delete_watchdog
+            else:
+                func = self.handlers[action]
             try:
-                data = self.handlers[action](**obj)
+                data = func(**obj)
                 reply = {"status": "ok", "data": data}
             except:
                 reply = {"status": "failed",
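
Passing a callable instead of a fixed timeout lets the remaining budget be recomputed before every receive, so a single deadline (or the earliest watchdog) covers an arbitrary number of worker requests. A self-contained sketch of that pattern, with an invented fake_recv standing in for Worker._recv:

    import time

    def handle_requests(recv, timeout_func):
        # Same shape as _handle_worker_requests: ask for the remaining
        # budget before every blocking receive.
        while True:
            obj = recv(timeout_func())
            if obj["action"] == "completed":
                return

    end_time = time.monotonic() + 15.0   # e.g. the prepare deadline
    requests = iter([{"action": "req_parameter"}, {"action": "completed"}])

    def fake_recv(timeout):
        print("would wait at most {:.3f} s".format(timeout))
        return next(requests)

    handle_requests(fake_recv, lambda: end_time - time.monotonic())
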
@@ -100,19 +128,26 @@
         try:
             obj = {"action": "prepare", "rid": rid, "run_params": run_params}
             yield from self._send(obj, self.send_timeout)
-            yield from self._handle_worker_requests(self.prepare_timeout)
+            end_time = time.monotonic() + self.prepare_timeout
+            yield from self._handle_worker_requests(
+                lambda: end_time - time.monotonic())
         except:
             yield from self.close()
             raise
 
     @asyncio.coroutine
-    def run(self):
-        obj = {"action": "run"}
+    def _run_analyze(self, action):
+        obj = {"action": action}
         yield from self._send(obj, self.send_timeout)
-        yield from self._handle_worker_requests(None)
+        try:
+            yield from self._handle_worker_requests(self.watchdog_time)
+        except WorkerTimeout:
+            raise WorkerWatchdogTimeout
+
+    @asyncio.coroutine
+    def run(self):
+        yield from self._run_analyze("run")
 
     @asyncio.coroutine
     def analyze(self):
-        obj = {"action": "analyze"}
-        yield from self._send(obj, self.send_timeout)
-        yield from self._handle_worker_requests(None)
+        yield from self._run_analyze("analyze")
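
With this refactoring, a receive timeout during run or analyze means the earliest watchdog expired, and it surfaces as WorkerWatchdogTimeout rather than a plain WorkerTimeout. A hypothetical sketch of how a caller could tell the cases apart (this commit does not add such handling to the master; the function name is illustrative):

    import asyncio

    # Hypothetical caller-side handling: worker is a Worker instance, and
    # the exception classes are the ones introduced above.
    @asyncio.coroutine
    def run_with_reporting(rid, worker):
        try:
            yield from worker.run()
            yield from worker.analyze()
        except WorkerWatchdogTimeout:
            print("RID {}: a watchdog expired".format(rid))
        except WorkerError as e:
            print("RID {}: worker error: {}".format(rid, e))
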
@@ -51,12 +51,27 @@ init_rt_results = make_parent_action("init_rt_results", "description")
 update_rt_results = make_parent_action("update_rt_results", "mod")
 
 
+class Watchdog:
+    _create = make_parent_action("create_watchdog", "t")
+    _delete = make_parent_action("delete_watchdog", "wid")
+
+    def __init__(self, t):
+        self.t = t
+
+    def __enter__(self):
+        self.wid = Watchdog._create(self.t)
+
+    def __exit__(self, type, value, traceback):
+        Watchdog._delete(self.wid)
+
+
 class Scheduler:
     run_queued = make_parent_action("scheduler_run_queued", "run_params")
     cancel_queued = make_parent_action("scheduler_cancel_queued", "rid")
     run_timed = make_parent_action("scheduler_run_timed",
                                    "run_params next_run")
     cancel_timed = make_parent_action("scheduler_cancel_timed", "trid")
+    watchdog = Watchdog
 
 
 def get_exp(file, exp):
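
On the worker side, entering a watchdog block sends a create_watchdog request to the master and remembers the returned wid; leaving the block sends delete_watchdog. A runnable sketch of the same shape, with the parent actions replaced by a logging stand-in so it works outside the worker process:

    # Stand-in for make_parent_action: instead of sending a request over the
    # worker pipe, just log and allocate ids locally.
    class FakeParent:
        def __init__(self):
            self.next_wid = 0

        def create_watchdog(self, t):
            wid = self.next_wid
            self.next_wid += 1
            print("master: arm watchdog {} for {} s".format(wid, t))
            return wid

        def delete_watchdog(self, wid):
            print("master: disarm watchdog {}".format(wid))

    parent = FakeParent()

    class Watchdog:
        def __init__(self, t):
            self.t = t

        def __enter__(self):
            self.wid = parent.create_watchdog(self.t)

        def __exit__(self, type, value, traceback):
            parent.delete_watchdog(self.wid)

    with Watchdog(20.0):
        with Watchdog(5.0):   # watchdogs nest arbitrarily
            pass
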
@@ -3,6 +3,8 @@ import importlib.machinery
 import linecache
 import logging
 import sys
+import asyncio
+import time
 import os.path
 
 
@@ -58,3 +60,17 @@ def simple_network_args(parser, default_port):
 
 def init_logger(args):
     logging.basicConfig(level=logging.WARNING + args.quiet*10 - args.verbose*10)
+
+
+@asyncio.coroutine
+def asyncio_process_wait_timeout(process, timeout):
+    # In Python < 3.5, asyncio.wait_for(process.wait(), ...
+    # causes a futures.InvalidStateError inside asyncio if and when the
+    # process terminates after the timeout.
+    # Work around this problem.
+    end_time = time.monotonic() + timeout
+    r = True
+    while r:
+        r = yield from asyncio.wait_for(
+            process.stdout.read(1024),
+            timeout=end_time - time.monotonic())
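
A usage sketch of the helper above, written in the same pre-3.5 yield from coroutine style as the rest of the code. It assumes a POSIX sleep binary and mirrors the worker shutdown path changed earlier: kill the child if its stdout does not reach EOF within the grace period.

    import asyncio

    from artiq.tools import asyncio_process_wait_timeout

    @asyncio.coroutine
    def demo():
        process = yield from asyncio.create_subprocess_exec(
            "sleep", "10", stdout=asyncio.subprocess.PIPE)
        try:
            # sleep never writes to stdout, so this times out after ~1 s.
            yield from asyncio_process_wait_timeout(process, 1.0)
        except asyncio.TimeoutError:
            process.kill()
        yield from process.wait()

    loop = asyncio.get_event_loop()
    loop.run_until_complete(demo())
    loop.close()
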