diff --git a/artiq/frontend/artiq_master.py b/artiq/frontend/artiq_master.py index 37e3a6fde..bf3b6d0c5 100755 --- a/artiq/frontend/artiq_master.py +++ b/artiq/frontend/artiq_master.py @@ -37,7 +37,6 @@ def main(): ddb = FlatFileDB("ddb.pyon") pdb = FlatFileDB("pdb.pyon") rtr = Notifier(dict()) - repository = Repository() if os.name == "nt": loop = asyncio.ProactorEventLoop() @@ -61,12 +60,14 @@ def main(): "master_ddb": ddb, "master_pdb": pdb, "master_schedule": scheduler, - "master_repository": repository, }) loop.run_until_complete(server_control.start( args.bind, args.port_control)) atexit.register(lambda: loop.run_until_complete(server_control.stop())) + repository = Repository() + loop.run_until_complete(repository.scan()) + server_notify = Publisher({ "schedule": scheduler.notifier, "devices": ddb.data, diff --git a/artiq/master/repository.py b/artiq/master/repository.py index 3ddea5110..c4c098f2b 100644 --- a/artiq/master/repository.py +++ b/artiq/master/repository.py @@ -1,38 +1,61 @@ import os +import logging +import asyncio from artiq.protocols.sync_struct import Notifier -from artiq.tools import file_import -from artiq.language.environment import is_experiment +from artiq.master.worker import Worker -def scan_experiments(): +logger = logging.getLogger(__name__) + + +@asyncio.coroutine +def _scan_experiments(): r = dict() for f in os.listdir("repository"): if f.endswith(".py"): try: - m = file_import(os.path.join("repository", f)) - except: - continue - for k, v in m.__dict__.items(): - if is_experiment(v): - if v.__doc__ is None: - name = k - else: - name = v.__doc__.splitlines()[0].strip() - if name[-1] == ".": - name = name[:-1] + full_name = os.path.join("repository", f) + worker = Worker() + try: + description = yield from worker.examine(full_name) + finally: + yield from worker.close() + for class_name, class_desc in description.items(): + name = class_desc["name"] + arguments = class_desc["arguments"] + if name in r: + logger.warning("Duplicate experiment name: '%s'", name) + basename = name + i = 1 + while name in r: + name = basename + str(i) + i += 1 entry = { - "file": os.path.join("repository", f), - "experiment": k + "file": full_name, + "class_name": class_name, + "arguments": arguments } r[name] = entry + except: + logger.warning("Skipping file '%s'", f, exc_info=True) return r +def _sync_explist(target, source): + for k in list(target.read.keys()): + if k not in source: + del target[k] + for k in source.keys(): + if k not in target.read or target.read[k] != source[k]: + target[k] = source[k] + + class Repository: def __init__(self): - self.explist = Notifier(scan_experiments()) + self.explist = Notifier(dict()) - def get_data(self, filename): - with open(os.path.join("repository", filename)) as f: - return f.read() + @asyncio.coroutine + def scan(self): + new_explist = yield from _scan_experiments() + _sync_explist(self.explist, new_explist) diff --git a/artiq/master/worker.py b/artiq/master/worker.py index 923559d30..198c97afc 100644 --- a/artiq/master/worker.py +++ b/artiq/master/worker.py @@ -26,7 +26,7 @@ class WorkerError(Exception): class Worker: - def __init__(self, handlers, send_timeout=0.5): + def __init__(self, handlers=dict(), send_timeout=0.5): self.handlers = handlers self.send_timeout = send_timeout @@ -74,7 +74,7 @@ class Worker: worker process. This method should always be called by the user to clean up, even if - prepare() raises an exception.""" + build() or examine() raises an exception.""" self.closed.set() yield from self.io_lock.acquire() try: @@ -83,10 +83,10 @@ class Worker: logger.debug("worker was not created (RID %s)", self.rid) return if self.process.returncode is not None: - logger.debug("worker already terminated (RID %d)", self.rid) + logger.debug("worker already terminated (RID %s)", self.rid) if self.process.returncode != 0: logger.warning("worker finished with status code %d" - " (RID %d)", self.process.returncode, + " (RID %s)", self.process.returncode, self.rid) return obj = {"action": "terminate"} @@ -94,7 +94,7 @@ class Worker: yield from self._send(obj, cancellable=False) except: logger.warning("failed to send terminate command to worker" - " (RID %d), killing", self.rid, exc_info=True) + " (RID %s), killing", self.rid, exc_info=True) self.process.kill() yield from asyncio_process_wait(self.process) return @@ -102,11 +102,11 @@ class Worker: yield from asyncio_process_wait_timeout(self.process, term_timeout) except asyncio.TimeoutError: - logger.warning("worker did not exit (RID %d), killing", self.rid) + logger.warning("worker did not exit (RID %s), killing", self.rid) self.process.kill() yield from asyncio_process_wait(self.process) else: - logger.debug("worker exited gracefully (RID %d)", self.rid) + logger.debug("worker exited gracefully (RID %s)", self.rid) finally: self.io_lock.release() @@ -170,6 +170,8 @@ class Worker: func = self.create_watchdog elif action == "delete_watchdog": func = self.delete_watchdog + elif action == "register_experiment": + func = self.register_experiment else: func = self.handlers[action] try: @@ -245,3 +247,15 @@ class Worker: def write_results(self, timeout=15.0): yield from self._worker_action({"action": "write_results"}, timeout) + + @asyncio.coroutine + def examine(self, file, timeout=20.0): + yield from self._create_process() + r = dict() + def register(class_name, name, arguments): + r[class_name] = {"name": name, "arguments": arguments} + self.register_experiment = register + yield from self._worker_action({"action": "examine", + "file": file}, timeout) + del self.register_experiment + return r diff --git a/artiq/master/worker_impl.py b/artiq/master/worker_impl.py index eb0fec592..59df48a02 100644 --- a/artiq/master/worker_impl.py +++ b/artiq/master/worker_impl.py @@ -99,6 +99,39 @@ def get_exp(file, exp): return getattr(module, exp) +register_experiment = make_parent_action("register_experiment", + "class_name name arguments") + + +class DummyDMGR: + def get(self, name): + return None + + +class DummyPDB: + def get(self, name): + return None + + def set(self, name, value): + pass + + +def examine(dmgr, pdb, rdb, file): + module = file_import(file) + for class_name, exp_class in module.__dict__.items(): + if is_experiment(exp_class): + if exp_class.__doc__ is None: + name = class_name + else: + name = exp_class.__doc__.splitlines()[0].strip() + if name[-1] == ".": + name = name[:-1] + exp_inst = exp_class(dmgr, pdb, rdb) + arguments = {k: v.describe() + for k, v in exp_inst.requested_args.items()} + register_experiment(class_name, name, arguments) + + def main(): sys.stdout = sys.stderr @@ -143,6 +176,9 @@ def main(): finally: f.close() put_object({"action": "completed"}) + elif action == "examine": + examine(DummyDMGR(), DummyPDB(), ResultDB(), obj["file"]) + put_object({"action": "completed"}) elif action == "terminate": break finally: