master/scheduler: Fix priority/due date precedence order when waiting to prepare

See test case – previously, the highest-priority pending run would
be used to calculate the timeout, rather than the earliest one.

This probably managed to go undetected for that long as any unrelated
changes to the pipeline (e.g. new submissions, or experiments pausing)
would also cause _get_run() to be re-evaluated.
This commit is contained in:
David Nadlinger 2020-06-19 05:16:08 +01:00 committed by Sebastien Bourdeauducq
parent d6aeb03889
commit c667fe136f
2 changed files with 200 additions and 38 deletions

View File

@ -87,17 +87,12 @@ class Run:
self._notifier[self.rid]["status"] = self._status.name
self._state_changed.notify()
# The run with the largest priority_key is to be scheduled first
def priority_key(self, now=None):
if self.due_date is None:
due_date_k = 0
else:
due_date_k = -self.due_date
if now is not None and self.due_date is not None:
runnable = int(now > self.due_date)
else:
runnable = 1
return (runnable, self.priority, due_date_k, -self.rid)
def priority_key(self):
"""Return a comparable value that defines a run priority order.
Applies only to runs the due date of which has already elapsed.
"""
return (self.priority, -(self.due_date or 0), -self.rid)
async def close(self):
# called through pool
@ -162,36 +157,37 @@ class PrepareStage(TaskObject):
self.delete_cb = delete_cb
def _get_run(self):
"""If a run should get prepared now, return it.
Otherwise, return a float representing the time before the next timed
run becomes due, or None if there is no such run."""
"""If a run should get prepared now, return it. Otherwise, return a
float giving the time until the next check, or None if no time-based
check is required.
The latter can be the case if there are no due-date runs, or none
of them are going to become next-in-line before further pool state
changes (which will also cause a re-evaluation).
"""
pending_runs = list(
filter(lambda r: r.status == RunStatus.pending,
self.pool.runs.values()))
now = time()
pending_runs = filter(lambda r: r.status == RunStatus.pending,
self.pool.runs.values())
try:
candidate = max(pending_runs, key=lambda r: r.priority_key(now))
except ValueError:
# pending_runs is an empty sequence
return None
def is_runnable(r):
return (r.due_date or 0) < now
prepared_runs = filter(lambda r: r.status == RunStatus.prepare_done,
self.pool.runs.values())
try:
top_prepared_run = max(prepared_runs,
key=lambda r: r.priority_key())
except ValueError:
# there are no existing prepared runs - go ahead with <candidate>
pass
else:
# prepare <candidate> (as well) only if it has higher priority than
# the highest priority prepared run
if top_prepared_run.priority_key() >= candidate.priority_key():
return None
prepared_max = max((r.priority_key() for r in self.pool.runs.values()
if r.status == RunStatus.prepare_done),
default=None)
def takes_precedence(r):
return prepared_max is None or r.priority_key() > prepared_max
if candidate.due_date is None or candidate.due_date < now:
candidate = max(filter(is_runnable, pending_runs),
key=lambda r: r.priority_key(),
default=None)
if candidate is not None and takes_precedence(candidate):
return candidate
else:
return candidate.due_date - now
return min((r.due_date - now for r in pending_runs
if (not is_runnable(r) and takes_precedence(r))),
default=None)
async def _do(self):
while True:

View File

@ -28,7 +28,18 @@ class BackgroundExperiment(EnvExperiment):
sleep(0.2)
except TerminationRequested:
self.set_dataset("termination_ok", True,
broadcast=True, save=False)
broadcast=True, archive=False)
class CheckPauseBackgroundExperiment(EnvExperiment):
def build(self):
self.setattr_device("scheduler")
def run(self):
while True:
while not self.scheduler.check_pause():
sleep(0.2)
self.scheduler.pause()
def _get_expid(name):
@ -117,6 +128,161 @@ class SchedulerCase(unittest.TestCase):
scheduler.notifier.publish = None
loop.run_until_complete(scheduler.stop())
def test_pending_priority(self):
"""Check due dates take precedence over priorities when waiting to
prepare."""
loop = self.loop
handlers = {}
scheduler = Scheduler(_RIDCounter(0), handlers, None)
handlers["scheduler_check_pause"] = scheduler.check_pause
expid_empty = _get_expid("EmptyExperiment")
expid_bg = _get_expid("CheckPauseBackgroundExperiment")
# Suppress the SystemExit backtrace when worker process is killed.
expid_bg["log_level"] = logging.CRITICAL
high_priority = 3
middle_priority = 2
low_priority = 1
late = time() + 100000
early = time() + 1
expect = [
{
"path": [],
"action": "setitem",
"value": {
"repo_msg": None,
"priority": low_priority,
"pipeline": "main",
"due_date": None,
"status": "pending",
"expid": expid_bg,
"flush": False
},
"key": 0
},
{
"path": [],
"action": "setitem",
"value": {
"repo_msg": None,
"priority": high_priority,
"pipeline": "main",
"due_date": late,
"status": "pending",
"expid": expid_empty,
"flush": False
},
"key": 1
},
{
"path": [],
"action": "setitem",
"value": {
"repo_msg": None,
"priority": middle_priority,
"pipeline": "main",
"due_date": early,
"status": "pending",
"expid": expid_empty,
"flush": False
},
"key": 2
},
{
"path": [0],
"action": "setitem",
"value": "preparing",
"key": "status"
},
{
"path": [0],
"action": "setitem",
"value": "prepare_done",
"key": "status"
},
{
"path": [0],
"action": "setitem",
"value": "running",
"key": "status"
},
{
"path": [2],
"action": "setitem",
"value": "preparing",
"key": "status"
},
{
"path": [2],
"action": "setitem",
"value": "prepare_done",
"key": "status"
},
{
"path": [0],
"action": "setitem",
"value": "paused",
"key": "status"
},
{
"path": [2],
"action": "setitem",
"value": "running",
"key": "status"
},
{
"path": [2],
"action": "setitem",
"value": "run_done",
"key": "status"
},
{
"path": [0],
"action": "setitem",
"value": "running",
"key": "status"
},
{
"path": [2],
"action": "setitem",
"value": "analyzing",
"key": "status"
},
{
"path": [2],
"action": "setitem",
"value": "deleting",
"key": "status"
},
{
"path": [],
"action": "delitem",
"key": 2
},
]
done = asyncio.Event()
expect_idx = 0
def notify(mod):
nonlocal expect_idx
self.assertEqual(mod, expect[expect_idx])
expect_idx += 1
if expect_idx >= len(expect):
done.set()
scheduler.notifier.publish = notify
scheduler.start()
scheduler.submit("main", expid_bg, low_priority)
scheduler.submit("main", expid_empty, high_priority, late)
scheduler.submit("main", expid_empty, middle_priority, early)
loop.run_until_complete(done.wait())
scheduler.notifier.publish = None
loop.run_until_complete(scheduler.stop())
def test_pause(self):
loop = self.loop