From d6ced1c7809f2be9554c7c0fc50c6009b93c70cf Mon Sep 17 00:00:00 2001 From: Sebastien Bourdeauducq Date: Sun, 24 May 2015 01:09:22 +0800 Subject: [PATCH] scheduler: support priorities --- artiq/frontend/artiq_client.py | 20 ++++++++++++-------- artiq/gui/schedule.py | 14 ++++++++------ artiq/master/scheduler.py | 14 ++++++++------ 3 files changed, 28 insertions(+), 20 deletions(-) diff --git a/artiq/frontend/artiq_client.py b/artiq/frontend/artiq_client.py index 6259c90d3..443b72b40 100755 --- a/artiq/frontend/artiq_client.py +++ b/artiq/frontend/artiq_client.py @@ -32,12 +32,14 @@ def get_argparser(): subparsers.required = True parser_add = subparsers.add_parser("submit", help="submit an experiment") - parser_add.add_argument( - "-t", "--timed", default=None, type=str, - help="set a due date for the experiment") + parser_add.add_argument("-t", "--timed", default=None, type=str, + help="set a due date for the experiment") parser_add.add_argument("-p", "--pipeline", default="main", type=str, help="pipeline to run the experiment in " "(default: %(default)s)") + parser_add.add_argument("-P", "--priority", default=0, type=int, + help="priority (higher value means sooner " + "scheduling, default: %(default)s)") parser_add.add_argument("-e", "--experiment", default=None, help="experiment to run") parser_add.add_argument("file", @@ -104,7 +106,7 @@ def _action_submit(remote, args): due_date = None else: due_date = time.mktime(parse_date(args.timed).timetuple()) - rid = remote.submit(args.pipeline, expid, due_date) + rid = remote.submit(args.pipeline, expid, args.priority, due_date) print("RID: {}".format(rid)) @@ -132,11 +134,13 @@ def _show_schedule(schedule): clear_screen() if schedule: l = sorted(schedule.items(), - key=lambda x: (x[1]["due_date"] or 0, x[0])) - table = PrettyTable(["RID", "Pipeline", " Status ", "Due date", - "File", "Experiment", "Arguments"]) + key=lambda x: (x[1]["due_date"] or 0, + -x[1]["priority"], + x[0])) + table = PrettyTable(["RID", "Pipeline", " Status ", "Prio", + "Due date", "File", "Experiment", "Arguments"]) for rid, v in l: - row = [rid, v["pipeline"], v["status"]] + row = [rid, v["pipeline"], v["status"], v["priority"]] if v["due_date"] is None: row.append("") else: diff --git a/artiq/gui/schedule.py b/artiq/gui/schedule.py index 451d99be8..46f42d583 100644 --- a/artiq/gui/schedule.py +++ b/artiq/gui/schedule.py @@ -12,13 +12,13 @@ from artiq.tools import format_arguments class _ScheduleModel(DictSyncModel): def __init__(self, parent, init): DictSyncModel.__init__(self, - ["RID", "Pipeline", "Status", "Due date", + ["RID", "Pipeline", "Status", "Prio", "Due date", "File", "Experiment", "Arguments"], parent, init) def sort_key(self, k, v): - # order by due date, and then by RID - return (v["due_date"] or 0, k) + # order by due date, and then by priority and RID + return (v["due_date"] or 0, -v["priority"], k) def convert(self, k, v, column): if column == 0: @@ -28,19 +28,21 @@ class _ScheduleModel(DictSyncModel): elif column == 2: return v["status"] elif column == 3: + return str(v["priority"]) + elif column == 4: if v["due_date"] is None: return "" else: return time.strftime("%m/%d %H:%M:%S", time.localtime(v["due_date"])) - elif column == 4: - return v["expid"]["file"] elif column == 5: + return v["expid"]["file"] + elif column == 6: if v["expid"]["experiment"] is None: return "" else: return v["expid"]["experiment"] - elif column == 6: + elif column == 7: return format_arguments(v["expid"]["arguments"]) else: raise ValueError diff --git a/artiq/master/scheduler.py b/artiq/master/scheduler.py index fbe560606..91fe06b47 100644 --- a/artiq/master/scheduler.py +++ b/artiq/master/scheduler.py @@ -45,12 +45,13 @@ def _mk_worker_method(name): class Run: def __init__(self, rid, pipeline_name, - expid, due_date, + expid, priority, due_date, worker_handlers, notifier): # called through pool self.rid = rid self.pipeline_name = pipeline_name self.expid = expid + self.priority = priority self.due_date = due_date self._status = RunStatus.pending @@ -61,6 +62,7 @@ class Run: self._notifier[self.rid] = { "pipeline": self.pipeline_name, "expid": self.expid, + "priority": self.priority, "due_date": self.due_date, "status": self._status.name } @@ -83,7 +85,7 @@ class Run: else: overdue = int(now > self.due_date) due_date_k = -self.due_date - return (overdue, due_date_k, -self.rid) + return (overdue, self.priority, due_date_k, -self.rid) @asyncio.coroutine def close(self): @@ -123,10 +125,10 @@ class RunPool: self._worker_handlers = worker_handlers self._notifier = notifier - def submit(self, expid, due_date, pipeline_name): + def submit(self, expid, priority, due_date, pipeline_name): # called through scheduler rid = self._ridc.get() - run = Run(rid, pipeline_name, expid, due_date, + run = Run(rid, pipeline_name, expid, priority, due_date, self._worker_handlers, self._notifier) self.runs[rid] = run if self.submitted_callback is not None: @@ -349,7 +351,7 @@ class Scheduler: if self._pipelines: logger.warning("some pipelines were not garbage-collected") - def submit(self, pipeline_name, expid, due_date): + def submit(self, pipeline_name, expid, priority, due_date): if self._terminated: return try: @@ -360,7 +362,7 @@ class Scheduler: self._worker_handlers, self.notifier) self._pipelines[pipeline_name] = pipeline pipeline.start() - return pipeline.pool.submit(expid, due_date, pipeline_name) + return pipeline.pool.submit(expid, priority, due_date, pipeline_name) def delete(self, rid): self._deleter.delete(rid)