master: implement interactive arguments

Interaction with experiment termination (forceful and requested) still
needs some work.
This commit is contained in:
Sébastien Bourdeauducq 2024-02-28 11:49:33 +08:00
parent 42d3c3b4b2
commit de29db0b35
4 changed files with 62 additions and 4 deletions

View File

@ -15,7 +15,8 @@ from sipyco.asyncio_tools import atexit_register_coroutine, SignalHandler
from artiq import __version__ as artiq_version from artiq import __version__ as artiq_version
from artiq.master.log import log_args, init_log from artiq.master.log import log_args, init_log
from artiq.master.databases import DeviceDB, DatasetDB from artiq.master.databases import (DeviceDB, DatasetDB,
InteractiveArgDB)
from artiq.master.scheduler import Scheduler from artiq.master.scheduler import Scheduler
from artiq.master.rid_counter import RIDCounter from artiq.master.rid_counter import RIDCounter
from artiq.master.experiments import (FilesystemBackend, GitBackend, from artiq.master.experiments import (FilesystemBackend, GitBackend,
@ -95,6 +96,7 @@ def main():
atexit.register(dataset_db.close_db) atexit.register(dataset_db.close_db)
dataset_db.start(loop=loop) dataset_db.start(loop=loop)
atexit_register_coroutine(dataset_db.stop, loop=loop) atexit_register_coroutine(dataset_db.stop, loop=loop)
interactive_arg_db = InteractiveArgDB()
worker_handlers = dict() worker_handlers = dict()
if args.git: if args.git:
@ -110,11 +112,16 @@ def main():
scheduler.start(loop=loop) scheduler.start(loop=loop)
atexit_register_coroutine(scheduler.stop, loop=loop) atexit_register_coroutine(scheduler.stop, loop=loop)
# Python doesn't allow writing attributes to bound methods.
def get_interactive_arguments(*args, **kwargs):
return interactive_arg_db.get(*args, **kwargs)
get_interactive_arguments._worker_pass_rid = True
worker_handlers.update({ worker_handlers.update({
"get_device_db": device_db.get_device_db, "get_device_db": device_db.get_device_db,
"get_device": device_db.get, "get_device": device_db.get,
"get_dataset": dataset_db.get, "get_dataset": dataset_db.get,
"update_dataset": dataset_db.update, "update_dataset": dataset_db.update,
"get_interactive_arguments": get_interactive_arguments,
"scheduler_submit": scheduler.submit, "scheduler_submit": scheduler.submit,
"scheduler_delete": scheduler.delete, "scheduler_delete": scheduler.delete,
"scheduler_request_termination": scheduler.request_termination, "scheduler_request_termination": scheduler.request_termination,
@ -135,6 +142,7 @@ def main():
"master_management": master_management, "master_management": master_management,
"device_db": device_db, "device_db": device_db,
"dataset_db": dataset_db, "dataset_db": dataset_db,
"interactive_arg_db": interactive_arg_db,
"schedule": scheduler, "schedule": scheduler,
"experiment_db": experiment_db, "experiment_db": experiment_db,
}, allow_parallel=True) }, allow_parallel=True)
@ -146,8 +154,9 @@ def main():
"schedule": scheduler.notifier, "schedule": scheduler.notifier,
"devices": device_db.data, "devices": device_db.data,
"datasets": dataset_db.data, "datasets": dataset_db.data,
"interactive_args": interactive_arg_db.pending,
"explist": experiment_db.explist, "explist": experiment_db.explist,
"explist_status": experiment_db.status "explist_status": experiment_db.status,
}) })
loop.run_until_complete(server_notify.start( loop.run_until_complete(server_notify.start(
bind, args.port_notify)) bind, args.port_notify))

View File

@ -114,3 +114,34 @@ class DatasetDB(TaskObject):
del self.data[key] del self.data[key]
self.pending_keys.add(key) self.pending_keys.add(key)
# #
class InteractiveArgDB:
def __init__(self):
self.pending = Notifier(dict())
self.futures = dict()
async def get(self, rid, arglist_desc):
self.pending[rid] = arglist_desc
self.futures[rid] = asyncio.get_running_loop().create_future()
try:
value = await self.futures[rid]
finally:
del self.pending[rid]
del self.futures[rid]
return value
def supply(self, rid, values):
# quick sanity checks
if rid not in self.futures:
raise ValueError("no experiment with this RID is "
"waiting for interactive arguments")
if {i[0] for i in self.pending.raw_view[rid]} != set(values.keys()):
raise ValueError("supplied and requested keys do not match")
self.futures[rid].set_result(values)
def cancel(self, rid):
if rid not in self.futures:
raise ValueError("no experiment with this RID is "
"waiting for interactive arguments")
self.futures[rid].cancel()

View File

@ -226,7 +226,13 @@ class Worker:
else: else:
func = self.handlers[action] func = self.handlers[action]
try: try:
data = func(*obj["args"], **obj["kwargs"]) if getattr(func, "_worker_pass_rid", False):
args = [self.rid] + list(obj["args"])
else:
args = obj["args"]
data = func(*args, **obj["kwargs"])
if asyncio.iscoroutine(data):
data = await data
reply = {"status": "ok", "data": data} reply = {"status": "ok", "data": data}
except: except:
reply = { reply = {

View File

@ -215,6 +215,18 @@ def examine(device_mgr, dataset_mgr, file):
del sys.modules[key] del sys.modules[key]
class ArgumentManager(ProcessArgumentManager):
_get_interactive = make_parent_action("get_interactive_arguments")
def get_interactive(self, interactive_arglist):
arglist_desc = [(k, p.describe(), g, t)
for k, p, g, t in interactive_arglist]
arguments = ArgumentManager._get_interactive(arglist_desc)
for key, processor, _, _ in interactive_arglist:
arguments[key] = processor.process(arguments[key])
return arguments
def setup_diagnostics(experiment_file, repository_path): def setup_diagnostics(experiment_file, repository_path):
def render_diagnostic(self, diagnostic): def render_diagnostic(self, diagnostic):
message = "While compiling {}\n".format(experiment_file) + \ message = "While compiling {}\n".format(experiment_file) + \
@ -327,7 +339,7 @@ def main():
time.strftime("%H", start_local_time)) time.strftime("%H", start_local_time))
os.makedirs(dirname, exist_ok=True) os.makedirs(dirname, exist_ok=True)
os.chdir(dirname) os.chdir(dirname)
argument_mgr = ProcessArgumentManager(expid["arguments"]) argument_mgr = ArgumentManager(expid["arguments"])
exp_inst = exp((device_mgr, dataset_mgr, argument_mgr, {})) exp_inst = exp((device_mgr, dataset_mgr, argument_mgr, {}))
argument_mgr.check_unprocessed_arguments() argument_mgr.check_unprocessed_arguments()
put_completed() put_completed()