From de29db0b35b6841d1db372091fd7c2dc50235738 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=A9bastien=20Bourdeauducq?= Date: Wed, 28 Feb 2024 11:49:33 +0800 Subject: [PATCH] master: implement interactive arguments Interaction with experiment termination (forceful and requested) still needs some work. --- artiq/frontend/artiq_master.py | 13 +++++++++++-- artiq/master/databases.py | 31 +++++++++++++++++++++++++++++++ artiq/master/worker.py | 8 +++++++- artiq/master/worker_impl.py | 14 +++++++++++++- 4 files changed, 62 insertions(+), 4 deletions(-) diff --git a/artiq/frontend/artiq_master.py b/artiq/frontend/artiq_master.py index 074d50e48..dfd22565b 100755 --- a/artiq/frontend/artiq_master.py +++ b/artiq/frontend/artiq_master.py @@ -15,7 +15,8 @@ from sipyco.asyncio_tools import atexit_register_coroutine, SignalHandler from artiq import __version__ as artiq_version from artiq.master.log import log_args, init_log -from artiq.master.databases import DeviceDB, DatasetDB +from artiq.master.databases import (DeviceDB, DatasetDB, + InteractiveArgDB) from artiq.master.scheduler import Scheduler from artiq.master.rid_counter import RIDCounter from artiq.master.experiments import (FilesystemBackend, GitBackend, @@ -95,6 +96,7 @@ def main(): atexit.register(dataset_db.close_db) dataset_db.start(loop=loop) atexit_register_coroutine(dataset_db.stop, loop=loop) + interactive_arg_db = InteractiveArgDB() worker_handlers = dict() if args.git: @@ -110,11 +112,16 @@ def main(): scheduler.start(loop=loop) atexit_register_coroutine(scheduler.stop, loop=loop) + # Python doesn't allow writing attributes to bound methods. + def get_interactive_arguments(*args, **kwargs): + return interactive_arg_db.get(*args, **kwargs) + get_interactive_arguments._worker_pass_rid = True worker_handlers.update({ "get_device_db": device_db.get_device_db, "get_device": device_db.get, "get_dataset": dataset_db.get, "update_dataset": dataset_db.update, + "get_interactive_arguments": get_interactive_arguments, "scheduler_submit": scheduler.submit, "scheduler_delete": scheduler.delete, "scheduler_request_termination": scheduler.request_termination, @@ -135,6 +142,7 @@ def main(): "master_management": master_management, "device_db": device_db, "dataset_db": dataset_db, + "interactive_arg_db": interactive_arg_db, "schedule": scheduler, "experiment_db": experiment_db, }, allow_parallel=True) @@ -146,8 +154,9 @@ def main(): "schedule": scheduler.notifier, "devices": device_db.data, "datasets": dataset_db.data, + "interactive_args": interactive_arg_db.pending, "explist": experiment_db.explist, - "explist_status": experiment_db.status + "explist_status": experiment_db.status, }) loop.run_until_complete(server_notify.start( bind, args.port_notify)) diff --git a/artiq/master/databases.py b/artiq/master/databases.py index a24085446..c34a01300 100644 --- a/artiq/master/databases.py +++ b/artiq/master/databases.py @@ -114,3 +114,34 @@ class DatasetDB(TaskObject): del self.data[key] self.pending_keys.add(key) # + + +class InteractiveArgDB: + def __init__(self): + self.pending = Notifier(dict()) + self.futures = dict() + + async def get(self, rid, arglist_desc): + self.pending[rid] = arglist_desc + self.futures[rid] = asyncio.get_running_loop().create_future() + try: + value = await self.futures[rid] + finally: + del self.pending[rid] + del self.futures[rid] + return value + + def supply(self, rid, values): + # quick sanity checks + if rid not in self.futures: + raise ValueError("no experiment with this RID is " + "waiting for interactive arguments") + if {i[0] for i in self.pending.raw_view[rid]} != set(values.keys()): + raise ValueError("supplied and requested keys do not match") + self.futures[rid].set_result(values) + + def cancel(self, rid): + if rid not in self.futures: + raise ValueError("no experiment with this RID is " + "waiting for interactive arguments") + self.futures[rid].cancel() diff --git a/artiq/master/worker.py b/artiq/master/worker.py index ca83dc7db..ddcf57802 100644 --- a/artiq/master/worker.py +++ b/artiq/master/worker.py @@ -226,7 +226,13 @@ class Worker: else: func = self.handlers[action] try: - data = func(*obj["args"], **obj["kwargs"]) + if getattr(func, "_worker_pass_rid", False): + args = [self.rid] + list(obj["args"]) + else: + args = obj["args"] + data = func(*args, **obj["kwargs"]) + if asyncio.iscoroutine(data): + data = await data reply = {"status": "ok", "data": data} except: reply = { diff --git a/artiq/master/worker_impl.py b/artiq/master/worker_impl.py index d4b9893a6..72e5326ad 100644 --- a/artiq/master/worker_impl.py +++ b/artiq/master/worker_impl.py @@ -215,6 +215,18 @@ def examine(device_mgr, dataset_mgr, file): del sys.modules[key] +class ArgumentManager(ProcessArgumentManager): + _get_interactive = make_parent_action("get_interactive_arguments") + + def get_interactive(self, interactive_arglist): + arglist_desc = [(k, p.describe(), g, t) + for k, p, g, t in interactive_arglist] + arguments = ArgumentManager._get_interactive(arglist_desc) + for key, processor, _, _ in interactive_arglist: + arguments[key] = processor.process(arguments[key]) + return arguments + + def setup_diagnostics(experiment_file, repository_path): def render_diagnostic(self, diagnostic): message = "While compiling {}\n".format(experiment_file) + \ @@ -327,7 +339,7 @@ def main(): time.strftime("%H", start_local_time)) os.makedirs(dirname, exist_ok=True) os.chdir(dirname) - argument_mgr = ProcessArgumentManager(expid["arguments"]) + argument_mgr = ArgumentManager(expid["arguments"]) exp_inst = exp((device_mgr, dataset_mgr, argument_mgr, {})) argument_mgr.check_unprocessed_arguments() put_completed()