diff --git a/artiq/master/scheduler.py b/artiq/master/scheduler.py index a1c16715f..c5be11665 100644 --- a/artiq/master/scheduler.py +++ b/artiq/master/scheduler.py @@ -87,17 +87,12 @@ class Run: self._notifier[self.rid]["status"] = self._status.name self._state_changed.notify() - # The run with the largest priority_key is to be scheduled first - def priority_key(self, now=None): - if self.due_date is None: - due_date_k = 0 - else: - due_date_k = -self.due_date - if now is not None and self.due_date is not None: - runnable = int(now > self.due_date) - else: - runnable = 1 - return (runnable, self.priority, due_date_k, -self.rid) + def priority_key(self): + """Return a comparable value that defines a run priority order. + + Applies only to runs the due date of which has already elapsed. + """ + return (self.priority, -(self.due_date or 0), -self.rid) async def close(self): # called through pool @@ -162,36 +157,37 @@ class PrepareStage(TaskObject): self.delete_cb = delete_cb def _get_run(self): - """If a run should get prepared now, return it. - Otherwise, return a float representing the time before the next timed - run becomes due, or None if there is no such run.""" + """If a run should get prepared now, return it. Otherwise, return a + float giving the time until the next check, or None if no time-based + check is required. + + The latter can be the case if there are no due-date runs, or none + of them are going to become next-in-line before further pool state + changes (which will also cause a re-evaluation). + """ + pending_runs = list( + filter(lambda r: r.status == RunStatus.pending, + self.pool.runs.values())) + now = time() - pending_runs = filter(lambda r: r.status == RunStatus.pending, - self.pool.runs.values()) - try: - candidate = max(pending_runs, key=lambda r: r.priority_key(now)) - except ValueError: - # pending_runs is an empty sequence - return None + def is_runnable(r): + return (r.due_date or 0) < now - prepared_runs = filter(lambda r: r.status == RunStatus.prepare_done, - self.pool.runs.values()) - try: - top_prepared_run = max(prepared_runs, - key=lambda r: r.priority_key()) - except ValueError: - # there are no existing prepared runs - go ahead with - pass - else: - # prepare (as well) only if it has higher priority than - # the highest priority prepared run - if top_prepared_run.priority_key() >= candidate.priority_key(): - return None + prepared_max = max((r.priority_key() for r in self.pool.runs.values() + if r.status == RunStatus.prepare_done), + default=None) + def takes_precedence(r): + return prepared_max is None or r.priority_key() > prepared_max - if candidate.due_date is None or candidate.due_date < now: + candidate = max(filter(is_runnable, pending_runs), + key=lambda r: r.priority_key(), + default=None) + if candidate is not None and takes_precedence(candidate): return candidate - else: - return candidate.due_date - now + + return min((r.due_date - now for r in pending_runs + if (not is_runnable(r) and takes_precedence(r))), + default=None) async def _do(self): while True: diff --git a/artiq/test/test_scheduler.py b/artiq/test/test_scheduler.py index e9cc3ceee..ad4f243bd 100644 --- a/artiq/test/test_scheduler.py +++ b/artiq/test/test_scheduler.py @@ -28,7 +28,18 @@ class BackgroundExperiment(EnvExperiment): sleep(0.2) except TerminationRequested: self.set_dataset("termination_ok", True, - broadcast=True, save=False) + broadcast=True, archive=False) + + +class CheckPauseBackgroundExperiment(EnvExperiment): + def build(self): + self.setattr_device("scheduler") + + def run(self): + while True: + while not self.scheduler.check_pause(): + sleep(0.2) + self.scheduler.pause() def _get_expid(name): @@ -117,6 +128,161 @@ class SchedulerCase(unittest.TestCase): scheduler.notifier.publish = None loop.run_until_complete(scheduler.stop()) + def test_pending_priority(self): + """Check due dates take precedence over priorities when waiting to + prepare.""" + loop = self.loop + handlers = {} + scheduler = Scheduler(_RIDCounter(0), handlers, None) + handlers["scheduler_check_pause"] = scheduler.check_pause + + expid_empty = _get_expid("EmptyExperiment") + + expid_bg = _get_expid("CheckPauseBackgroundExperiment") + # Suppress the SystemExit backtrace when worker process is killed. + expid_bg["log_level"] = logging.CRITICAL + + high_priority = 3 + middle_priority = 2 + low_priority = 1 + late = time() + 100000 + early = time() + 1 + + expect = [ + { + "path": [], + "action": "setitem", + "value": { + "repo_msg": None, + "priority": low_priority, + "pipeline": "main", + "due_date": None, + "status": "pending", + "expid": expid_bg, + "flush": False + }, + "key": 0 + }, + { + "path": [], + "action": "setitem", + "value": { + "repo_msg": None, + "priority": high_priority, + "pipeline": "main", + "due_date": late, + "status": "pending", + "expid": expid_empty, + "flush": False + }, + "key": 1 + }, + { + "path": [], + "action": "setitem", + "value": { + "repo_msg": None, + "priority": middle_priority, + "pipeline": "main", + "due_date": early, + "status": "pending", + "expid": expid_empty, + "flush": False + }, + "key": 2 + }, + { + "path": [0], + "action": "setitem", + "value": "preparing", + "key": "status" + }, + { + "path": [0], + "action": "setitem", + "value": "prepare_done", + "key": "status" + }, + { + "path": [0], + "action": "setitem", + "value": "running", + "key": "status" + }, + { + "path": [2], + "action": "setitem", + "value": "preparing", + "key": "status" + }, + { + "path": [2], + "action": "setitem", + "value": "prepare_done", + "key": "status" + }, + { + "path": [0], + "action": "setitem", + "value": "paused", + "key": "status" + }, + { + "path": [2], + "action": "setitem", + "value": "running", + "key": "status" + }, + { + "path": [2], + "action": "setitem", + "value": "run_done", + "key": "status" + }, + { + "path": [0], + "action": "setitem", + "value": "running", + "key": "status" + }, + { + "path": [2], + "action": "setitem", + "value": "analyzing", + "key": "status" + }, + { + "path": [2], + "action": "setitem", + "value": "deleting", + "key": "status" + }, + { + "path": [], + "action": "delitem", + "key": 2 + }, + ] + done = asyncio.Event() + expect_idx = 0 + def notify(mod): + nonlocal expect_idx + self.assertEqual(mod, expect[expect_idx]) + expect_idx += 1 + if expect_idx >= len(expect): + done.set() + scheduler.notifier.publish = notify + + scheduler.start() + + scheduler.submit("main", expid_bg, low_priority) + scheduler.submit("main", expid_empty, high_priority, late) + scheduler.submit("main", expid_empty, middle_priority, early) + + loop.run_until_complete(done.wait()) + scheduler.notifier.publish = None + loop.run_until_complete(scheduler.stop()) + def test_pause(self): loop = self.loop