forked from M-Labs/artiq
1
0
Fork 0

scheduler: catch worker exceptions in prepare and analyze stages

This commit is contained in:
Sebastien Bourdeauducq 2015-05-24 20:23:49 +08:00
parent d6ced1c780
commit a21373841c
1 changed files with 20 additions and 6 deletions

View File

@ -160,7 +160,8 @@ class TaskObject:
class PrepareStage(TaskObject): class PrepareStage(TaskObject):
def __init__(self, pool, outq): def __init__(self, deleter, pool, outq):
self.deleter = deleter
self.pool = pool self.pool = pool
self.outq = outq self.outq = outq
@ -185,7 +186,13 @@ class PrepareStage(TaskObject):
return None return None
if run.due_date is None or run.due_date < now: if run.due_date is None or run.due_date < now:
run.status = RunStatus.preparing run.status = RunStatus.preparing
yield from run.prepare() try:
yield from run.prepare()
except:
logger.warning("got worker exception in prepare stage, "
"deleting RID %d",
run.rid, exc_info=True)
self.deleter.delete(run.rid)
run.status = RunStatus.prepare_done run.status = RunStatus.prepare_done
yield from self.outq.put(run) yield from self.outq.put(run)
else: else:
@ -235,7 +242,8 @@ class RunStage(TaskObject):
run.status = RunStatus.running run.status = RunStatus.running
completed = yield from run.run() completed = yield from run.run()
except: except:
logger.warning("got worker exception, deleting RID %d", logger.warning("got worker exception in run stage, "
"deleting RID %d",
run.rid, exc_info=True) run.rid, exc_info=True)
self.deleter.delete(run.rid) self.deleter.delete(run.rid)
else: else:
@ -257,8 +265,14 @@ class AnalyzeStage(TaskObject):
while True: while True:
run = yield from self.inq.get() run = yield from self.inq.get()
run.status = RunStatus.analyzing run.status = RunStatus.analyzing
yield from run.analyze() try:
yield from run.write_results() yield from run.analyze()
yield from run.write_results()
except:
logger.warning("got worker exception in analyze stage, "
"deleting RID %d",
run.rid, exc_info=True)
self.deleter.delete(run.rid)
run.status = RunStatus.analyze_done run.status = RunStatus.analyze_done
self.deleter.delete(run.rid) self.deleter.delete(run.rid)
@ -266,7 +280,7 @@ class AnalyzeStage(TaskObject):
class Pipeline: class Pipeline:
def __init__(self, ridc, deleter, worker_handlers, notifier): def __init__(self, ridc, deleter, worker_handlers, notifier):
self.pool = RunPool(ridc, worker_handlers, notifier) self.pool = RunPool(ridc, worker_handlers, notifier)
self._prepare = PrepareStage(self.pool, asyncio.Queue(maxsize=1)) self._prepare = PrepareStage(deleter, self.pool, asyncio.Queue(maxsize=1))
self._run = RunStage(deleter, self._prepare.outq, asyncio.Queue(maxsize=1)) self._run = RunStage(deleter, self._prepare.outq, asyncio.Queue(maxsize=1))
self._analyze = AnalyzeStage(deleter, self._run.outq) self._analyze = AnalyzeStage(deleter, self._run.outq)