diff --git a/artiq/frontend/artiq_master.py b/artiq/frontend/artiq_master.py index e38dfc8ec..2b37717e2 100755 --- a/artiq/frontend/artiq_master.py +++ b/artiq/frontend/artiq_master.py @@ -51,13 +51,18 @@ def main(): def run_cb(rid, run_params): rtr.current_group = run_params["rtr_group"] - scheduler = Scheduler({ + scheduler = Scheduler(run_cb) + scheduler.worker.handlers = { "req_device": ddb.request, "req_parameter": pdb.request, "set_parameter": pdb.set, "init_rt_results": rtr.init, - "update_rt_results": rtr.update - }, run_cb) + "update_rt_results": rtr.update, + "scheduler_run_queued": scheduler.run_queued, + "scheduler_cancel_queued": scheduler.cancel_queued, + "scheduler_run_timed": scheduler.run_timed, + "scheduler_cancel_timed": scheduler.cancel_timed, + } loop.run_until_complete(scheduler.start()) atexit.register(lambda: loop.run_until_complete(scheduler.stop())) diff --git a/artiq/master/scheduler.py b/artiq/master/scheduler.py index e05af2e3a..8698bcc6a 100644 --- a/artiq/master/scheduler.py +++ b/artiq/master/scheduler.py @@ -6,9 +6,9 @@ from artiq.master.worker import Worker class Scheduler: - def __init__(self, worker_handlers, run_cb): + def __init__(self, run_cb): self.run_cb = run_cb - self.worker = Worker(worker_handlers) + self.worker = Worker() self.next_rid = 0 self.queue = Notifier([]) self.queue_modified = asyncio.Event() diff --git a/artiq/master/worker.py b/artiq/master/worker.py index 168c11050..63d2a4201 100644 --- a/artiq/master/worker.py +++ b/artiq/master/worker.py @@ -16,9 +16,9 @@ class RunFailed(Exception): class Worker: - def __init__(self, handlers, + def __init__(self, send_timeout=0.5, start_reply_timeout=1.0, term_timeout=1.0): - self.handlers = handlers + self.handlers = dict() self.send_timeout = send_timeout self.start_reply_timeout = start_reply_timeout self.term_timeout = term_timeout diff --git a/artiq/master/worker_impl.py b/artiq/master/worker_impl.py index 1d7ac5889..592e089ad 100644 --- a/artiq/master/worker_impl.py +++ b/artiq/master/worker_impl.py @@ -58,6 +58,15 @@ def publish_rt_results(notifier, data): update_rt_results(data) +class Scheduler: + run_queued = make_parent_action("scheduler_run_queued", + "run_params timeout") + cancel_queued = make_parent_action("scheduler_cancel_queued", "rid") + run_timed = make_parent_action("scheduler_run_timed", + "run_params timeout next_run") + cancel_timed = make_parent_action("scheduler_cancel_timed", "trid") + + def get_unit(file, unit): module = file_import(file) if unit is None: @@ -92,7 +101,7 @@ def run(obj): dbh = DBHub(ParentDDB, ParentPDB, rdb) try: try: - unit_inst = unit(dbh, **obj["arguments"]) + unit_inst = unit(dbh, scheduler=Scheduler, **obj["arguments"]) unit_inst.run() if hasattr(unit_inst, "analyze"): unit_inst.analyze()