diff --git a/artiq/test/test_scheduler.py b/artiq/test/test_scheduler.py index b95a97d9b..e9cc3ceee 100644 --- a/artiq/test/test_scheduler.py +++ b/artiq/test/test_scheduler.py @@ -42,7 +42,7 @@ def _get_expid(name): def _get_basic_steps(rid, expid, priority=0, flush=False): return [ - {"action": "setitem", "key": rid, "value": + {"action": "setitem", "key": rid, "value": {"pipeline": "main", "status": "pending", "priority": priority, "expid": expid, "due_date": None, "flush": flush, "repo_msg": None}, @@ -152,7 +152,7 @@ class SchedulerCase(unittest.TestCase): if mod == {"path": [0], "value": "deleting", "key": "status", - "action": "setitem"}: + "action": "setitem"}: background_completed.set() if mod == {"path": [1], "value": "prepare_done", @@ -185,6 +185,49 @@ class SchedulerCase(unittest.TestCase): loop.run_until_complete(scheduler.stop()) + def test_close_with_active_runs(self): + """Check scheduler exits with experiments still running""" + loop = self.loop + + scheduler = Scheduler(_RIDCounter(0), {}, None) + + expid_bg = _get_expid("BackgroundExperiment") + # Suppress the SystemExit backtrace when worker process is killed. + expid_bg["log_level"] = logging.CRITICAL + expid = _get_expid("EmptyExperiment") + + background_running = asyncio.Event() + empty_ready = asyncio.Event() + background_completed = asyncio.Event() + def notify(mod): + if mod == {"path": [0], + "value": "running", + "key": "status", + "action": "setitem"}: + background_running.set() + if mod == {"path": [0], + "value": "deleting", + "key": "status", + "action": "setitem"}: + background_completed.set() + if mod == {"path": [1], + "value": "prepare_done", + "key": "status", + "action": "setitem"}: + empty_ready.set() + scheduler.notifier.publish = notify + + scheduler.start() + scheduler.submit("main", expid_bg, -99, None, False) + loop.run_until_complete(background_running.wait()) + + scheduler.submit("main", expid, 0, None, False) + loop.run_until_complete(empty_ready.wait()) + + # At this point, (at least) BackgroundExperiment is still running; make + # sure we can stop the scheduler without hanging. + loop.run_until_complete(scheduler.stop()) + def test_flush(self): loop = self.loop scheduler = Scheduler(_RIDCounter(0), dict(), None)