test_scheduler: Test for hang when exiting with running experiments

The respective code path in artiq.master.scheduler._mk_worker_method
wasn't previously covered.
This commit is contained in:
David Nadlinger 2019-01-19 21:36:42 +00:00
parent 0dab7ecd73
commit 4ba4e9c540
1 changed files with 45 additions and 2 deletions

View File

@ -42,7 +42,7 @@ def _get_expid(name):
def _get_basic_steps(rid, expid, priority=0, flush=False): def _get_basic_steps(rid, expid, priority=0, flush=False):
return [ return [
{"action": "setitem", "key": rid, "value": {"action": "setitem", "key": rid, "value":
{"pipeline": "main", "status": "pending", "priority": priority, {"pipeline": "main", "status": "pending", "priority": priority,
"expid": expid, "due_date": None, "flush": flush, "expid": expid, "due_date": None, "flush": flush,
"repo_msg": None}, "repo_msg": None},
@ -152,7 +152,7 @@ class SchedulerCase(unittest.TestCase):
if mod == {"path": [0], if mod == {"path": [0],
"value": "deleting", "value": "deleting",
"key": "status", "key": "status",
"action": "setitem"}: "action": "setitem"}:
background_completed.set() background_completed.set()
if mod == {"path": [1], if mod == {"path": [1],
"value": "prepare_done", "value": "prepare_done",
@ -185,6 +185,49 @@ class SchedulerCase(unittest.TestCase):
loop.run_until_complete(scheduler.stop()) loop.run_until_complete(scheduler.stop())
def test_close_with_active_runs(self):
"""Check scheduler exits with experiments still running"""
loop = self.loop
scheduler = Scheduler(_RIDCounter(0), {}, None)
expid_bg = _get_expid("BackgroundExperiment")
# Suppress the SystemExit backtrace when worker process is killed.
expid_bg["log_level"] = logging.CRITICAL
expid = _get_expid("EmptyExperiment")
background_running = asyncio.Event()
empty_ready = asyncio.Event()
background_completed = asyncio.Event()
def notify(mod):
if mod == {"path": [0],
"value": "running",
"key": "status",
"action": "setitem"}:
background_running.set()
if mod == {"path": [0],
"value": "deleting",
"key": "status",
"action": "setitem"}:
background_completed.set()
if mod == {"path": [1],
"value": "prepare_done",
"key": "status",
"action": "setitem"}:
empty_ready.set()
scheduler.notifier.publish = notify
scheduler.start()
scheduler.submit("main", expid_bg, -99, None, False)
loop.run_until_complete(background_running.wait())
scheduler.submit("main", expid, 0, None, False)
loop.run_until_complete(empty_ready.wait())
# At this point, (at least) BackgroundExperiment is still running; make
# sure we can stop the scheduler without hanging.
loop.run_until_complete(scheduler.stop())
def test_flush(self): def test_flush(self):
loop = self.loop loop = self.loop
scheduler = Scheduler(_RIDCounter(0), dict(), None) scheduler = Scheduler(_RIDCounter(0), dict(), None)