master: expose more scheduler APIs to the experiments

This commit is contained in:
Sebastien Bourdeauducq 2015-10-30 13:41:18 +08:00
parent 0e375e4980
commit 2c77c80b4f
6 changed files with 62 additions and 9 deletions

View File

@ -83,7 +83,12 @@ def main():
"log": log_worker "log": log_worker
} }
scheduler = Scheduler(get_last_rid() + 1, worker_handlers, repo_backend) scheduler = Scheduler(get_last_rid() + 1, worker_handlers, repo_backend)
worker_handlers["scheduler_submit"] = scheduler.submit worker_handlers.update({
"scheduler_submit": scheduler.submit,
"scheduler_delete": scheduler.delete,
"scheduler_request_termination": scheduler.request_termination,
"scheduler_get_status": scheduler.get_status
})
scheduler.start() scheduler.start()
atexit.register(lambda: loop.run_until_complete(scheduler.stop())) atexit.register(lambda: loop.run_until_complete(scheduler.stop()))

View File

@ -32,20 +32,28 @@ class ELFRunner(EnvExperiment):
class DummyScheduler: class DummyScheduler:
def __init__(self): def __init__(self):
self.next_rid = 0 self.rid = 0
self.pipeline_name = "main" self.pipeline_name = "main"
self.priority = 0 self.priority = 0
self.expid = None self.expid = None
self._next_rid = 1
def submit(self, pipeline_name, expid, priority, due_date, flush): def submit(self, pipeline_name, expid, priority, due_date, flush):
rid = self.next_rid rid = self._next_rid
self.next_rid += 1 self._next_rid += 1
logger.info("Submitting: %s, RID=%s", expid, rid) logger.info("Submitting: %s, RID=%s", expid, rid)
return rid return rid
def delete(self, rid): def delete(self, rid):
logger.info("Deleting RID %s", rid) logger.info("Deleting RID %s", rid)
def request_termination(self, rid):
logger.info("Requesting termination of RID %s", rid)
def get_status(self):
return dict()
def pause(self): def pause(self):
pass pass

View File

@ -412,6 +412,7 @@ class Scheduler:
logger.warning("some pipelines were not garbage-collected") logger.warning("some pipelines were not garbage-collected")
def submit(self, pipeline_name, expid, priority, due_date, flush): def submit(self, pipeline_name, expid, priority, due_date, flush):
"""Submits a new run."""
# mutates expid to insert head repository revision if None # mutates expid to insert head repository revision if None
if self._terminated: if self._terminated:
return return
@ -427,9 +428,11 @@ class Scheduler:
return pipeline.pool.submit(expid, priority, due_date, flush, pipeline_name) return pipeline.pool.submit(expid, priority, due_date, flush, pipeline_name)
def delete(self, rid): def delete(self, rid):
"""Kills the run with the specified RID."""
self._deleter.delete(rid) self._deleter.delete(rid)
def request_termination(self, rid): def request_termination(self, rid):
"""Requests graceful termination of the run with the specified RID."""
for pipeline in self._pipelines.values(): for pipeline in self._pipelines.values():
if rid in pipeline.pool.runs: if rid in pipeline.pool.runs:
run = pipeline.pool.runs[rid] run = pipeline.pool.runs[rid]
@ -438,3 +441,8 @@ class Scheduler:
else: else:
self.delete(rid) self.delete(rid)
break break
def get_status(self):
"""Returns a dictionary containing information about the runs currently
tracked by the scheduler."""
return self.notifier.read

View File

@ -95,9 +95,13 @@ class Scheduler:
raise TerminationRequested raise TerminationRequested
submit = staticmethod(make_parent_action("scheduler_submit")) submit = staticmethod(make_parent_action("scheduler_submit"))
cancel = staticmethod(make_parent_action("scheduler_cancel")) delete = staticmethod(make_parent_action("scheduler_delete"))
request_termination = staticmethod(
make_parent_action("scheduler_request_termination"))
get_status = staticmethod(make_parent_action("scheduler_get_status"))
def set_run_info(self, pipeline_name, expid, priority): def set_run_info(self, rid, pipeline_name, expid, priority):
self.rid = rid
self.pipeline_name = pipeline_name self.pipeline_name = pipeline_name
self.expid = expid self.expid = expid
self.priority = priority self.priority = priority
@ -182,7 +186,7 @@ def main():
expf = expid["file"] expf = expid["file"]
exp = get_exp(expf, expid["class_name"]) exp = get_exp(expf, expid["class_name"])
device_mgr.virtual_devices["scheduler"].set_run_info( device_mgr.virtual_devices["scheduler"].set_run_info(
obj["pipeline_name"], expid, obj["priority"]) rid, obj["pipeline_name"], expid, obj["priority"])
exp_inst = exp(device_mgr, dataset_mgr, exp_inst = exp(device_mgr, dataset_mgr,
**expid["arguments"]) **expid["arguments"])
put_object({"action": "completed"}) put_object({"action": "completed"})

View File

@ -111,8 +111,18 @@ Push commits containing experiments to the bare repository using e.g. Git over S
The GUI always runs experiments from the repository. The command-line client, by default, runs experiment from the raw filesystem (which is useful for iterating rapidly without creating many disorganized commits). If you want to use the repository instead, simply pass the ``-R`` option. The GUI always runs experiments from the repository. The command-line client, by default, runs experiment from the raw filesystem (which is useful for iterating rapidly without creating many disorganized commits). If you want to use the repository instead, simply pass the ``-R`` option.
Reference Scheduler API reference
********* ***********************
The scheduler is exposed to the experiments via a virtual device called ``scheduler``. It can be requested like any regular device, and then the methods below can be called on the returned object.
The scheduler virtual device also contains the attributes ``rid``, ``pipeline_name``, ``priority`` and ``expid`` that contain the corresponding information about the current run.
.. autoclass:: artiq.master.scheduler.Scheduler
:members:
Front-end tool reference
************************
.. argparse:: .. argparse::
:ref: artiq.frontend.artiq_master.get_argparser :ref: artiq.frontend.artiq_master.get_argparser

View File

@ -0,0 +1,18 @@
from artiq import *
class TerminateAll(EnvExperiment):
def build(self):
self.setattr_device("scheduler")
self.setattr_argument("graceful_termination", BooleanValue(True))
def run(self):
if self.graceful_termination:
terminate = self.scheduler.request_termination
else:
terminate = self.scheduler.delete
print("our RID", self.scheduler.rid)
for rid in self.scheduler.get_status().keys():
if rid != self.scheduler.rid:
terminate(rid)