master/client: use dpdb and file import

This commit is contained in:
Sebastien Bourdeauducq 2014-12-08 19:22:02 +08:00
parent 123656e2cd
commit e814da1ba3
6 changed files with 64 additions and 33 deletions

View File

@ -4,8 +4,8 @@ from artiq.management.worker import Worker
class Scheduler: class Scheduler:
def __init__(self): def __init__(self, *args, **kwargs):
self.worker = Worker() self.worker = Worker(*args, **kwargs)
self.queue = asyncio.Queue() self.queue = asyncio.Queue()
@asyncio.coroutine @asyncio.coroutine

View File

@ -11,8 +11,10 @@ class WorkerFailed(Exception):
class Worker: class Worker:
def __init__(self, send_timeout=0.5, start_reply_timeout=1.0, def __init__(self, ddb, pdb,
term_timeout=1.0): send_timeout=0.5, start_reply_timeout=1.0, term_timeout=1.0):
self.ddb = ddb
self.pdb = pdb
self.send_timeout = send_timeout self.send_timeout = send_timeout
self.start_reply_timeout = start_reply_timeout self.start_reply_timeout = start_reply_timeout
self.term_timeout = term_timeout self.term_timeout = term_timeout
@ -21,6 +23,7 @@ class Worker:
def create_process(self): def create_process(self):
self.process = yield from asyncio.create_subprocess_exec( self.process = yield from asyncio.create_subprocess_exec(
sys.executable, "-m", "artiq.management.worker_impl", sys.executable, "-m", "artiq.management.worker_impl",
self.ddb, self.pdb,
stdout=subprocess.PIPE, stdin=subprocess.PIPE) stdout=subprocess.PIPE, stdin=subprocess.PIPE)
@asyncio.coroutine @asyncio.coroutine

View File

@ -1,23 +1,28 @@
import sys import sys
import importlib from inspect import isclass
from artiq.management import pyon from artiq.management import pyon
from artiq.management.file_import import file_import
from artiq.language.context import AutoContext
from artiq.management.dpdb import DeviceParamDB
def import_in_folder(path, name): def run(dpdb, file, unit, function):
try: module = file_import(file)
del sys.modules[name] # force path search if unit is None:
except KeyError: units = [v for k, v in module.__dict__.items()
pass if k[0] != "_"
loader = importlib.find_loader(name, [path]) and isclass(v)
if loader is None: and issubclass(v, AutoContext)
raise ImportError("Could not find loader") and v is not AutoContext]
return loader.load_module() if len(units) != 1:
raise ValueError("Found {} units in module".format(len(units)))
unit = units[0]
def run(path, name): else:
module = import_in_folder(path, name) unit = getattr(module, unit)
module.main() unit_inst = unit(dpdb)
f = getattr(unit_inst, function)
f()
def put_object(obj): def put_object(obj):
@ -30,13 +35,17 @@ def put_object(obj):
def main(): def main():
sys.stdout = sys.stderr sys.stdout = sys.stderr
devices = pyon.load_file(sys.argv[1])
parameters = pyon.load_file(sys.argv[2])
dpdb = DeviceParamDB(devices, parameters)
while True: while True:
line = sys.__stdin__.readline() line = sys.__stdin__.readline()
obj = pyon.decode(line) obj = pyon.decode(line)
put_object("ack") put_object("ack")
try: try:
run(**obj) run(dpdb, **obj)
except Exception as e: except Exception as e:
put_object({"status": "failed", "message": str(e)}) put_object({"status": "failed", "message": str(e)})
else: else:

View File

@ -13,10 +13,22 @@ def _get_args():
parser.add_argument( parser.add_argument(
"--port", default=8888, type=int, "--port", default=8888, type=int,
help="TCP port to use to connect to the master") help="TCP port to use to connect to the master")
parser.add_argument(
"-o", "--run-once", default=[], nargs=3, subparsers = parser.add_subparsers(dest="action")
action="append",
help="run experiment once. arguments: <path> <name> <timeout>") parser_add = subparsers.add_parser("add", help="add an experiment")
parser_add.add_argument(
"-p", "--periodic", default=None, type=float,
help="run the experiment periodically every given number of seconds")
parser_add.add_argument(
"-t", "--timeout", default=None, type=float,
help="specify a timeout for the experiment to complete")
parser_add.add_argument("-f", "--function", default="run",
help="function to run")
parser_add.add_argument("-u", "--unit", default=None,
help="unit to run")
parser_add.add_argument("file", help="file containing the unit to run")
return parser.parse_args() return parser.parse_args()
@ -24,12 +36,16 @@ def main():
args = _get_args() args = _get_args()
remote = Client(args.server, args.port, "master") remote = Client(args.server, args.port, "master")
try: try:
for path, name, timeout in args.run_once: if args.action == "add":
if args.periodic is None:
remote.run_once( remote.run_once(
{ {
"path": path, "file": args.file,
"name": name "unit": args.unit,
}, int(timeout)) "function": args.function
}, args.timeout)
else:
raise NotImplementedError
finally: finally:
remote.close_rpc() remote.close_rpc()

View File

@ -8,7 +8,7 @@ from artiq.management.scheduler import Scheduler
def _get_args(): def _get_args():
parser = argparse.ArgumentParser(description="PDQ2 controller") parser = argparse.ArgumentParser(description="ARTIQ master")
parser.add_argument( parser.add_argument(
"--bind", default="::1", "--bind", default="::1",
help="hostname or IP address to bind to") help="hostname or IP address to bind to")
@ -22,7 +22,7 @@ def main():
args = _get_args() args = _get_args()
loop = asyncio.get_event_loop() loop = asyncio.get_event_loop()
try: try:
scheduler = Scheduler() scheduler = Scheduler("ddb.pyon", "pdb.pyon")
loop.run_until_complete(scheduler.start()) loop.run_until_complete(scheduler.start())
try: try:
server = Server(scheduler, "master") server = Server(scheduler, "master")

View File

@ -57,7 +57,10 @@ def main():
module = file_import(args.file) module = file_import(args.file)
if args.unit is None: if args.unit is None:
units = [(k, v) for k, v in module.__dict__.items() units = [(k, v) for k, v in module.__dict__.items()
if k[0] != "_" and isclass(v) and issubclass(v, AutoContext) and v is not AutoContext] if k[0] != "_"
and isclass(v)
and issubclass(v, AutoContext)
and v is not AutoContext]
l = len(units) l = len(units)
if l == 0: if l == 0:
print("No units found in module") print("No units found in module")