From 2c77c80b4f9af43cb0b6dccd809a807b68b70b00 Mon Sep 17 00:00:00 2001 From: Sebastien Bourdeauducq Date: Fri, 30 Oct 2015 13:41:18 +0800 Subject: [PATCH] master: expose more scheduler APIs to the experiments --- artiq/frontend/artiq_master.py | 7 ++++++- artiq/frontend/artiq_run.py | 14 +++++++++++--- artiq/master/scheduler.py | 8 ++++++++ artiq/master/worker_impl.py | 10 +++++++--- doc/manual/management_system.rst | 14 ++++++++++++-- examples/master/repository/terminate_all.py | 18 ++++++++++++++++++ 6 files changed, 62 insertions(+), 9 deletions(-) create mode 100644 examples/master/repository/terminate_all.py diff --git a/artiq/frontend/artiq_master.py b/artiq/frontend/artiq_master.py index 186d14e68..4659cf1a0 100755 --- a/artiq/frontend/artiq_master.py +++ b/artiq/frontend/artiq_master.py @@ -83,7 +83,12 @@ def main(): "log": log_worker } scheduler = Scheduler(get_last_rid() + 1, worker_handlers, repo_backend) - worker_handlers["scheduler_submit"] = scheduler.submit + worker_handlers.update({ + "scheduler_submit": scheduler.submit, + "scheduler_delete": scheduler.delete, + "scheduler_request_termination": scheduler.request_termination, + "scheduler_get_status": scheduler.get_status + }) scheduler.start() atexit.register(lambda: loop.run_until_complete(scheduler.stop())) diff --git a/artiq/frontend/artiq_run.py b/artiq/frontend/artiq_run.py index b4477077d..0db16a4d6 100755 --- a/artiq/frontend/artiq_run.py +++ b/artiq/frontend/artiq_run.py @@ -32,20 +32,28 @@ class ELFRunner(EnvExperiment): class DummyScheduler: def __init__(self): - self.next_rid = 0 + self.rid = 0 self.pipeline_name = "main" self.priority = 0 self.expid = None + self._next_rid = 1 + def submit(self, pipeline_name, expid, priority, due_date, flush): - rid = self.next_rid - self.next_rid += 1 + rid = self._next_rid + self._next_rid += 1 logger.info("Submitting: %s, RID=%s", expid, rid) return rid def delete(self, rid): logger.info("Deleting RID %s", rid) + def request_termination(self, rid): + logger.info("Requesting termination of RID %s", rid) + + def get_status(self): + return dict() + def pause(self): pass diff --git a/artiq/master/scheduler.py b/artiq/master/scheduler.py index 11eb6384e..b0987d361 100644 --- a/artiq/master/scheduler.py +++ b/artiq/master/scheduler.py @@ -412,6 +412,7 @@ class Scheduler: logger.warning("some pipelines were not garbage-collected") def submit(self, pipeline_name, expid, priority, due_date, flush): + """Submits a new run.""" # mutates expid to insert head repository revision if None if self._terminated: return @@ -427,9 +428,11 @@ class Scheduler: return pipeline.pool.submit(expid, priority, due_date, flush, pipeline_name) def delete(self, rid): + """Kills the run with the specified RID.""" self._deleter.delete(rid) def request_termination(self, rid): + """Requests graceful termination of the run with the specified RID.""" for pipeline in self._pipelines.values(): if rid in pipeline.pool.runs: run = pipeline.pool.runs[rid] @@ -438,3 +441,8 @@ class Scheduler: else: self.delete(rid) break + + def get_status(self): + """Returns a dictionary containing information about the runs currently + tracked by the scheduler.""" + return self.notifier.read diff --git a/artiq/master/worker_impl.py b/artiq/master/worker_impl.py index 722fcb75c..e4e617e80 100644 --- a/artiq/master/worker_impl.py +++ b/artiq/master/worker_impl.py @@ -95,9 +95,13 @@ class Scheduler: raise TerminationRequested submit = staticmethod(make_parent_action("scheduler_submit")) - cancel = staticmethod(make_parent_action("scheduler_cancel")) + delete = staticmethod(make_parent_action("scheduler_delete")) + request_termination = staticmethod( + make_parent_action("scheduler_request_termination")) + get_status = staticmethod(make_parent_action("scheduler_get_status")) - def set_run_info(self, pipeline_name, expid, priority): + def set_run_info(self, rid, pipeline_name, expid, priority): + self.rid = rid self.pipeline_name = pipeline_name self.expid = expid self.priority = priority @@ -182,7 +186,7 @@ def main(): expf = expid["file"] exp = get_exp(expf, expid["class_name"]) device_mgr.virtual_devices["scheduler"].set_run_info( - obj["pipeline_name"], expid, obj["priority"]) + rid, obj["pipeline_name"], expid, obj["priority"]) exp_inst = exp(device_mgr, dataset_mgr, **expid["arguments"]) put_object({"action": "completed"}) diff --git a/doc/manual/management_system.rst b/doc/manual/management_system.rst index b3072ad49..84a471cce 100644 --- a/doc/manual/management_system.rst +++ b/doc/manual/management_system.rst @@ -111,8 +111,18 @@ Push commits containing experiments to the bare repository using e.g. Git over S The GUI always runs experiments from the repository. The command-line client, by default, runs experiment from the raw filesystem (which is useful for iterating rapidly without creating many disorganized commits). If you want to use the repository instead, simply pass the ``-R`` option. -Reference -********* +Scheduler API reference +*********************** + +The scheduler is exposed to the experiments via a virtual device called ``scheduler``. It can be requested like any regular device, and then the methods below can be called on the returned object. + +The scheduler virtual device also contains the attributes ``rid``, ``pipeline_name``, ``priority`` and ``expid`` that contain the corresponding information about the current run. + +.. autoclass:: artiq.master.scheduler.Scheduler + :members: + +Front-end tool reference +************************ .. argparse:: :ref: artiq.frontend.artiq_master.get_argparser diff --git a/examples/master/repository/terminate_all.py b/examples/master/repository/terminate_all.py new file mode 100644 index 000000000..e9f7c9242 --- /dev/null +++ b/examples/master/repository/terminate_all.py @@ -0,0 +1,18 @@ +from artiq import * + + +class TerminateAll(EnvExperiment): + def build(self): + self.setattr_device("scheduler") + self.setattr_argument("graceful_termination", BooleanValue(True)) + + def run(self): + if self.graceful_termination: + terminate = self.scheduler.request_termination + else: + terminate = self.scheduler.delete + + print("our RID", self.scheduler.rid) + for rid in self.scheduler.get_status().keys(): + if rid != self.scheduler.rid: + terminate(rid)