From a21373841c864a4e143a1370143006acf518da90 Mon Sep 17 00:00:00 2001 From: Sebastien Bourdeauducq Date: Sun, 24 May 2015 20:23:49 +0800 Subject: [PATCH] scheduler: catch worker exceptions in prepare and analyze stages --- artiq/master/scheduler.py | 26 ++++++++++++++++++++------ 1 file changed, 20 insertions(+), 6 deletions(-) diff --git a/artiq/master/scheduler.py b/artiq/master/scheduler.py index 91fe06b47..30c19d3b4 100644 --- a/artiq/master/scheduler.py +++ b/artiq/master/scheduler.py @@ -160,7 +160,8 @@ class TaskObject: class PrepareStage(TaskObject): - def __init__(self, pool, outq): + def __init__(self, deleter, pool, outq): + self.deleter = deleter self.pool = pool self.outq = outq @@ -185,7 +186,13 @@ class PrepareStage(TaskObject): return None if run.due_date is None or run.due_date < now: run.status = RunStatus.preparing - yield from run.prepare() + try: + yield from run.prepare() + except: + logger.warning("got worker exception in prepare stage, " + "deleting RID %d", + run.rid, exc_info=True) + self.deleter.delete(run.rid) run.status = RunStatus.prepare_done yield from self.outq.put(run) else: @@ -235,7 +242,8 @@ class RunStage(TaskObject): run.status = RunStatus.running completed = yield from run.run() except: - logger.warning("got worker exception, deleting RID %d", + logger.warning("got worker exception in run stage, " + "deleting RID %d", run.rid, exc_info=True) self.deleter.delete(run.rid) else: @@ -257,8 +265,14 @@ class AnalyzeStage(TaskObject): while True: run = yield from self.inq.get() run.status = RunStatus.analyzing - yield from run.analyze() - yield from run.write_results() + try: + yield from run.analyze() + yield from run.write_results() + except: + logger.warning("got worker exception in analyze stage, " + "deleting RID %d", + run.rid, exc_info=True) + self.deleter.delete(run.rid) run.status = RunStatus.analyze_done self.deleter.delete(run.rid) @@ -266,7 +280,7 @@ class AnalyzeStage(TaskObject): class Pipeline: def __init__(self, ridc, deleter, worker_handlers, notifier): self.pool = RunPool(ridc, worker_handlers, notifier) - self._prepare = PrepareStage(self.pool, asyncio.Queue(maxsize=1)) + self._prepare = PrepareStage(deleter, self.pool, asyncio.Queue(maxsize=1)) self._run = RunStage(deleter, self._prepare.outq, asyncio.Queue(maxsize=1)) self._analyze = AnalyzeStage(deleter, self._run.outq)