repository: load experiments in worker, list arguments

This commit is contained in:
Sebastien Bourdeauducq 2015-07-15 10:54:44 +02:00
parent 7770ab64f2
commit 9ed4dcd7d1
4 changed files with 103 additions and 29 deletions

View File

@ -37,7 +37,6 @@ def main():
ddb = FlatFileDB("ddb.pyon") ddb = FlatFileDB("ddb.pyon")
pdb = FlatFileDB("pdb.pyon") pdb = FlatFileDB("pdb.pyon")
rtr = Notifier(dict()) rtr = Notifier(dict())
repository = Repository()
if os.name == "nt": if os.name == "nt":
loop = asyncio.ProactorEventLoop() loop = asyncio.ProactorEventLoop()
@ -61,12 +60,14 @@ def main():
"master_ddb": ddb, "master_ddb": ddb,
"master_pdb": pdb, "master_pdb": pdb,
"master_schedule": scheduler, "master_schedule": scheduler,
"master_repository": repository,
}) })
loop.run_until_complete(server_control.start( loop.run_until_complete(server_control.start(
args.bind, args.port_control)) args.bind, args.port_control))
atexit.register(lambda: loop.run_until_complete(server_control.stop())) atexit.register(lambda: loop.run_until_complete(server_control.stop()))
repository = Repository()
loop.run_until_complete(repository.scan())
server_notify = Publisher({ server_notify = Publisher({
"schedule": scheduler.notifier, "schedule": scheduler.notifier,
"devices": ddb.data, "devices": ddb.data,

View File

@ -1,38 +1,61 @@
import os import os
import logging
import asyncio
from artiq.protocols.sync_struct import Notifier from artiq.protocols.sync_struct import Notifier
from artiq.tools import file_import from artiq.master.worker import Worker
from artiq.language.environment import is_experiment
def scan_experiments(): logger = logging.getLogger(__name__)
@asyncio.coroutine
def _scan_experiments():
r = dict() r = dict()
for f in os.listdir("repository"): for f in os.listdir("repository"):
if f.endswith(".py"): if f.endswith(".py"):
try: try:
m = file_import(os.path.join("repository", f)) full_name = os.path.join("repository", f)
except: worker = Worker()
continue try:
for k, v in m.__dict__.items(): description = yield from worker.examine(full_name)
if is_experiment(v): finally:
if v.__doc__ is None: yield from worker.close()
name = k for class_name, class_desc in description.items():
else: name = class_desc["name"]
name = v.__doc__.splitlines()[0].strip() arguments = class_desc["arguments"]
if name[-1] == ".": if name in r:
name = name[:-1] logger.warning("Duplicate experiment name: '%s'", name)
basename = name
i = 1
while name in r:
name = basename + str(i)
i += 1
entry = { entry = {
"file": os.path.join("repository", f), "file": full_name,
"experiment": k "class_name": class_name,
"arguments": arguments
} }
r[name] = entry r[name] = entry
except:
logger.warning("Skipping file '%s'", f, exc_info=True)
return r return r
def _sync_explist(target, source):
for k in list(target.read.keys()):
if k not in source:
del target[k]
for k in source.keys():
if k not in target.read or target.read[k] != source[k]:
target[k] = source[k]
class Repository: class Repository:
def __init__(self): def __init__(self):
self.explist = Notifier(scan_experiments()) self.explist = Notifier(dict())
def get_data(self, filename): @asyncio.coroutine
with open(os.path.join("repository", filename)) as f: def scan(self):
return f.read() new_explist = yield from _scan_experiments()
_sync_explist(self.explist, new_explist)

View File

@ -26,7 +26,7 @@ class WorkerError(Exception):
class Worker: class Worker:
def __init__(self, handlers, send_timeout=0.5): def __init__(self, handlers=dict(), send_timeout=0.5):
self.handlers = handlers self.handlers = handlers
self.send_timeout = send_timeout self.send_timeout = send_timeout
@ -74,7 +74,7 @@ class Worker:
worker process. worker process.
This method should always be called by the user to clean up, even if This method should always be called by the user to clean up, even if
prepare() raises an exception.""" build() or examine() raises an exception."""
self.closed.set() self.closed.set()
yield from self.io_lock.acquire() yield from self.io_lock.acquire()
try: try:
@ -83,10 +83,10 @@ class Worker:
logger.debug("worker was not created (RID %s)", self.rid) logger.debug("worker was not created (RID %s)", self.rid)
return return
if self.process.returncode is not None: if self.process.returncode is not None:
logger.debug("worker already terminated (RID %d)", self.rid) logger.debug("worker already terminated (RID %s)", self.rid)
if self.process.returncode != 0: if self.process.returncode != 0:
logger.warning("worker finished with status code %d" logger.warning("worker finished with status code %d"
" (RID %d)", self.process.returncode, " (RID %s)", self.process.returncode,
self.rid) self.rid)
return return
obj = {"action": "terminate"} obj = {"action": "terminate"}
@ -94,7 +94,7 @@ class Worker:
yield from self._send(obj, cancellable=False) yield from self._send(obj, cancellable=False)
except: except:
logger.warning("failed to send terminate command to worker" logger.warning("failed to send terminate command to worker"
" (RID %d), killing", self.rid, exc_info=True) " (RID %s), killing", self.rid, exc_info=True)
self.process.kill() self.process.kill()
yield from asyncio_process_wait(self.process) yield from asyncio_process_wait(self.process)
return return
@ -102,11 +102,11 @@ class Worker:
yield from asyncio_process_wait_timeout(self.process, yield from asyncio_process_wait_timeout(self.process,
term_timeout) term_timeout)
except asyncio.TimeoutError: except asyncio.TimeoutError:
logger.warning("worker did not exit (RID %d), killing", self.rid) logger.warning("worker did not exit (RID %s), killing", self.rid)
self.process.kill() self.process.kill()
yield from asyncio_process_wait(self.process) yield from asyncio_process_wait(self.process)
else: else:
logger.debug("worker exited gracefully (RID %d)", self.rid) logger.debug("worker exited gracefully (RID %s)", self.rid)
finally: finally:
self.io_lock.release() self.io_lock.release()
@ -170,6 +170,8 @@ class Worker:
func = self.create_watchdog func = self.create_watchdog
elif action == "delete_watchdog": elif action == "delete_watchdog":
func = self.delete_watchdog func = self.delete_watchdog
elif action == "register_experiment":
func = self.register_experiment
else: else:
func = self.handlers[action] func = self.handlers[action]
try: try:
@ -245,3 +247,15 @@ class Worker:
def write_results(self, timeout=15.0): def write_results(self, timeout=15.0):
yield from self._worker_action({"action": "write_results"}, yield from self._worker_action({"action": "write_results"},
timeout) timeout)
@asyncio.coroutine
def examine(self, file, timeout=20.0):
yield from self._create_process()
r = dict()
def register(class_name, name, arguments):
r[class_name] = {"name": name, "arguments": arguments}
self.register_experiment = register
yield from self._worker_action({"action": "examine",
"file": file}, timeout)
del self.register_experiment
return r

View File

@ -99,6 +99,39 @@ def get_exp(file, exp):
return getattr(module, exp) return getattr(module, exp)
register_experiment = make_parent_action("register_experiment",
"class_name name arguments")
class DummyDMGR:
def get(self, name):
return None
class DummyPDB:
def get(self, name):
return None
def set(self, name, value):
pass
def examine(dmgr, pdb, rdb, file):
module = file_import(file)
for class_name, exp_class in module.__dict__.items():
if is_experiment(exp_class):
if exp_class.__doc__ is None:
name = class_name
else:
name = exp_class.__doc__.splitlines()[0].strip()
if name[-1] == ".":
name = name[:-1]
exp_inst = exp_class(dmgr, pdb, rdb)
arguments = {k: v.describe()
for k, v in exp_inst.requested_args.items()}
register_experiment(class_name, name, arguments)
def main(): def main():
sys.stdout = sys.stderr sys.stdout = sys.stderr
@ -143,6 +176,9 @@ def main():
finally: finally:
f.close() f.close()
put_object({"action": "completed"}) put_object({"action": "completed"})
elif action == "examine":
examine(DummyDMGR(), DummyPDB(), ResultDB(), obj["file"])
put_object({"action": "completed"})
elif action == "terminate": elif action == "terminate":
break break
finally: finally: