From 966ed5d0135cd32f7f4cdbba049cc28a394c6884 Mon Sep 17 00:00:00 2001 From: David Nadlinger Date: Fri, 19 Jun 2020 05:16:08 +0100 Subject: [PATCH] master/scheduler: Fix priority/due date precedence order when waiting to prepare MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit See test case – previously, the highest-priority pending run would be used to calculate the timeout, rather than the earliest one. This probably managed to go undetected for that long as any unrelated changes to the pipeline (e.g. new submissions, or experiments pausing) would also cause _get_run() to be re-evaluated. --- artiq/master/scheduler.py | 70 +++++++-------- artiq/test/test_scheduler.py | 168 ++++++++++++++++++++++++++++++++++- 2 files changed, 200 insertions(+), 38 deletions(-) diff --git a/artiq/master/scheduler.py b/artiq/master/scheduler.py index 7da7c401d..d978fa2f9 100644 --- a/artiq/master/scheduler.py +++ b/artiq/master/scheduler.py @@ -87,17 +87,12 @@ class Run: self._notifier[self.rid]["status"] = self._status.name self._state_changed.notify() - # The run with the largest priority_key is to be scheduled first - def priority_key(self, now=None): - if self.due_date is None: - due_date_k = 0 - else: - due_date_k = -self.due_date - if now is not None and self.due_date is not None: - runnable = int(now > self.due_date) - else: - runnable = 1 - return (runnable, self.priority, due_date_k, -self.rid) + def priority_key(self): + """Return a comparable value that defines a run priority order. + + Applies only to runs the due date of which has already elapsed. + """ + return (self.priority, -(self.due_date or 0), -self.rid) async def close(self): # called through pool @@ -161,36 +156,37 @@ class PrepareStage(TaskObject): self.delete_cb = delete_cb def _get_run(self): - """If a run should get prepared now, return it. - Otherwise, return a float representing the time before the next timed - run becomes due, or None if there is no such run.""" + """If a run should get prepared now, return it. Otherwise, return a + float giving the time until the next check, or None if no time-based + check is required. + + The latter can be the case if there are no due-date runs, or none + of them are going to become next-in-line before further pool state + changes (which will also cause a re-evaluation). + """ + pending_runs = list( + filter(lambda r: r.status == RunStatus.pending, + self.pool.runs.values())) + now = time() - pending_runs = filter(lambda r: r.status == RunStatus.pending, - self.pool.runs.values()) - try: - candidate = max(pending_runs, key=lambda r: r.priority_key(now)) - except ValueError: - # pending_runs is an empty sequence - return None + def is_runnable(r): + return (r.due_date or 0) < now - prepared_runs = filter(lambda r: r.status == RunStatus.prepare_done, - self.pool.runs.values()) - try: - top_prepared_run = max(prepared_runs, - key=lambda r: r.priority_key()) - except ValueError: - # there are no existing prepared runs - go ahead with - pass - else: - # prepare (as well) only if it has higher priority than - # the highest priority prepared run - if top_prepared_run.priority_key() >= candidate.priority_key(): - return None + prepared_max = max((r.priority_key() for r in self.pool.runs.values() + if r.status == RunStatus.prepare_done), + default=None) + def takes_precedence(r): + return prepared_max is None or r.priority_key() > prepared_max - if candidate.due_date is None or candidate.due_date < now: + candidate = max(filter(is_runnable, pending_runs), + key=lambda r: r.priority_key(), + default=None) + if candidate is not None and takes_precedence(candidate): return candidate - else: - return candidate.due_date - now + + return min((r.due_date - now for r in pending_runs + if (not is_runnable(r) and takes_precedence(r))), + default=None) async def _do(self): while True: diff --git a/artiq/test/test_scheduler.py b/artiq/test/test_scheduler.py index e9cc3ceee..ad4f243bd 100644 --- a/artiq/test/test_scheduler.py +++ b/artiq/test/test_scheduler.py @@ -28,7 +28,18 @@ class BackgroundExperiment(EnvExperiment): sleep(0.2) except TerminationRequested: self.set_dataset("termination_ok", True, - broadcast=True, save=False) + broadcast=True, archive=False) + + +class CheckPauseBackgroundExperiment(EnvExperiment): + def build(self): + self.setattr_device("scheduler") + + def run(self): + while True: + while not self.scheduler.check_pause(): + sleep(0.2) + self.scheduler.pause() def _get_expid(name): @@ -117,6 +128,161 @@ class SchedulerCase(unittest.TestCase): scheduler.notifier.publish = None loop.run_until_complete(scheduler.stop()) + def test_pending_priority(self): + """Check due dates take precedence over priorities when waiting to + prepare.""" + loop = self.loop + handlers = {} + scheduler = Scheduler(_RIDCounter(0), handlers, None) + handlers["scheduler_check_pause"] = scheduler.check_pause + + expid_empty = _get_expid("EmptyExperiment") + + expid_bg = _get_expid("CheckPauseBackgroundExperiment") + # Suppress the SystemExit backtrace when worker process is killed. + expid_bg["log_level"] = logging.CRITICAL + + high_priority = 3 + middle_priority = 2 + low_priority = 1 + late = time() + 100000 + early = time() + 1 + + expect = [ + { + "path": [], + "action": "setitem", + "value": { + "repo_msg": None, + "priority": low_priority, + "pipeline": "main", + "due_date": None, + "status": "pending", + "expid": expid_bg, + "flush": False + }, + "key": 0 + }, + { + "path": [], + "action": "setitem", + "value": { + "repo_msg": None, + "priority": high_priority, + "pipeline": "main", + "due_date": late, + "status": "pending", + "expid": expid_empty, + "flush": False + }, + "key": 1 + }, + { + "path": [], + "action": "setitem", + "value": { + "repo_msg": None, + "priority": middle_priority, + "pipeline": "main", + "due_date": early, + "status": "pending", + "expid": expid_empty, + "flush": False + }, + "key": 2 + }, + { + "path": [0], + "action": "setitem", + "value": "preparing", + "key": "status" + }, + { + "path": [0], + "action": "setitem", + "value": "prepare_done", + "key": "status" + }, + { + "path": [0], + "action": "setitem", + "value": "running", + "key": "status" + }, + { + "path": [2], + "action": "setitem", + "value": "preparing", + "key": "status" + }, + { + "path": [2], + "action": "setitem", + "value": "prepare_done", + "key": "status" + }, + { + "path": [0], + "action": "setitem", + "value": "paused", + "key": "status" + }, + { + "path": [2], + "action": "setitem", + "value": "running", + "key": "status" + }, + { + "path": [2], + "action": "setitem", + "value": "run_done", + "key": "status" + }, + { + "path": [0], + "action": "setitem", + "value": "running", + "key": "status" + }, + { + "path": [2], + "action": "setitem", + "value": "analyzing", + "key": "status" + }, + { + "path": [2], + "action": "setitem", + "value": "deleting", + "key": "status" + }, + { + "path": [], + "action": "delitem", + "key": 2 + }, + ] + done = asyncio.Event() + expect_idx = 0 + def notify(mod): + nonlocal expect_idx + self.assertEqual(mod, expect[expect_idx]) + expect_idx += 1 + if expect_idx >= len(expect): + done.set() + scheduler.notifier.publish = notify + + scheduler.start() + + scheduler.submit("main", expid_bg, low_priority) + scheduler.submit("main", expid_empty, high_priority, late) + scheduler.submit("main", expid_empty, middle_priority, early) + + loop.run_until_complete(done.wait()) + scheduler.notifier.publish = None + loop.run_until_complete(scheduler.stop()) + def test_pause(self): loop = self.loop