master: expose scheduler API to experiments

This commit is contained in:
Sebastien Bourdeauducq 2015-02-19 12:09:11 -07:00
parent 9ffc370416
commit c69c4d5ce9
4 changed files with 22 additions and 8 deletions

View File

@ -51,13 +51,18 @@ def main():
def run_cb(rid, run_params): def run_cb(rid, run_params):
rtr.current_group = run_params["rtr_group"] rtr.current_group = run_params["rtr_group"]
scheduler = Scheduler({ scheduler = Scheduler(run_cb)
scheduler.worker.handlers = {
"req_device": ddb.request, "req_device": ddb.request,
"req_parameter": pdb.request, "req_parameter": pdb.request,
"set_parameter": pdb.set, "set_parameter": pdb.set,
"init_rt_results": rtr.init, "init_rt_results": rtr.init,
"update_rt_results": rtr.update "update_rt_results": rtr.update,
}, run_cb) "scheduler_run_queued": scheduler.run_queued,
"scheduler_cancel_queued": scheduler.cancel_queued,
"scheduler_run_timed": scheduler.run_timed,
"scheduler_cancel_timed": scheduler.cancel_timed,
}
loop.run_until_complete(scheduler.start()) loop.run_until_complete(scheduler.start())
atexit.register(lambda: loop.run_until_complete(scheduler.stop())) atexit.register(lambda: loop.run_until_complete(scheduler.stop()))

View File

@ -6,9 +6,9 @@ from artiq.master.worker import Worker
class Scheduler: class Scheduler:
def __init__(self, worker_handlers, run_cb): def __init__(self, run_cb):
self.run_cb = run_cb self.run_cb = run_cb
self.worker = Worker(worker_handlers) self.worker = Worker()
self.next_rid = 0 self.next_rid = 0
self.queue = Notifier([]) self.queue = Notifier([])
self.queue_modified = asyncio.Event() self.queue_modified = asyncio.Event()

View File

@ -16,9 +16,9 @@ class RunFailed(Exception):
class Worker: class Worker:
def __init__(self, handlers, def __init__(self,
send_timeout=0.5, start_reply_timeout=1.0, term_timeout=1.0): send_timeout=0.5, start_reply_timeout=1.0, term_timeout=1.0):
self.handlers = handlers self.handlers = dict()
self.send_timeout = send_timeout self.send_timeout = send_timeout
self.start_reply_timeout = start_reply_timeout self.start_reply_timeout = start_reply_timeout
self.term_timeout = term_timeout self.term_timeout = term_timeout

View File

@ -58,6 +58,15 @@ def publish_rt_results(notifier, data):
update_rt_results(data) update_rt_results(data)
class Scheduler:
run_queued = make_parent_action("scheduler_run_queued",
"run_params timeout")
cancel_queued = make_parent_action("scheduler_cancel_queued", "rid")
run_timed = make_parent_action("scheduler_run_timed",
"run_params timeout next_run")
cancel_timed = make_parent_action("scheduler_cancel_timed", "trid")
def get_unit(file, unit): def get_unit(file, unit):
module = file_import(file) module = file_import(file)
if unit is None: if unit is None:
@ -92,7 +101,7 @@ def run(obj):
dbh = DBHub(ParentDDB, ParentPDB, rdb) dbh = DBHub(ParentDDB, ParentPDB, rdb)
try: try:
try: try:
unit_inst = unit(dbh, **obj["arguments"]) unit_inst = unit(dbh, scheduler=Scheduler, **obj["arguments"])
unit_inst.run() unit_inst.run()
if hasattr(unit_inst, "analyze"): if hasattr(unit_inst, "analyze"):
unit_inst.analyze() unit_inst.analyze()