forked from M-Labs/artiq
1
0
Fork 0

master: handle dpdb in master process

This commit is contained in:
Sebastien Bourdeauducq 2014-12-31 17:41:22 +08:00
parent 0aa450ad5d
commit 9cd89a0c50
5 changed files with 171 additions and 68 deletions

View File

@ -2,6 +2,8 @@ from collections import OrderedDict
import importlib import importlib
from artiq.language.context import * from artiq.language.context import *
from artiq.management import pyon
from artiq.management.sync_struct import Notifier
def create_device(desc, mvs): def create_device(desc, mvs):
@ -10,35 +12,46 @@ def create_device(desc, mvs):
return device_class(mvs, **desc["parameters"]) return device_class(mvs, **desc["parameters"])
class DeviceParamDB: class DeviceParamSupplier:
def __init__(self, devices, parameters): """Supplies devices and parameters to AutoContext objects.
self.devices = devices
self.parameters = parameters """
def __init__(self, req_device, req_parameter):
self.req_device = req_device
self.req_parameter = req_parameter
self.active_devices = OrderedDict() self.active_devices = OrderedDict()
def get_missing_value(self, name, kind, requester): def get_missing_value(self, name, kind, requester):
if isinstance(kind, Device): if isinstance(kind, Device):
if name in self.active_devices: if name in self.active_devices:
return self.active_devices[name] return self.active_devices[name]
elif name in self.devices: else:
desc = self.devices[name] try:
while isinstance(desc, str): desc = self.req_device(name)
# alias except KeyError:
desc = self.devices[desc] raise KeyError(
"Unknown device '{}' of type '{}' requested by {}"
.format(name, kind.type_hint, requester))
try:
while isinstance(desc, str):
# alias
desc = self.req_device(desc)
except KeyError:
raise KeyError(
"Unknown alias '{}' for device '{}' of type '{}'"
" requested by {}"
.format(desc, name, kind.type_hint, requester))
dev = create_device(desc, self) dev = create_device(desc, self)
self.active_devices[name] = dev self.active_devices[name] = dev
return dev return dev
else:
raise KeyError("Unknown device '{}' of type '{}'"
" requested by {}"
.format(name, kind.type_hint, requester))
elif isinstance(kind, Parameter): elif isinstance(kind, Parameter):
if name in self.parameters: try:
return self.parameters[name] return self.req_parameter(name)
elif kind.default is not NoDefault: except KeyError:
return kind.default if kind.default is not NoDefault:
else: return kind.default
raise KeyError("Unknown parameter: " + name) else:
raise KeyError("Unknown parameter: " + name)
else: else:
raise NotImplementedError raise NotImplementedError
@ -47,3 +60,39 @@ class DeviceParamDB:
if hasattr(dev, "close"): if hasattr(dev, "close"):
dev.close() dev.close()
self.active_devices = OrderedDict() self.active_devices = OrderedDict()
class DeviceParamDB:
def __init__(self, ddb_file, pdb_file):
self.ddb_file = ddb_file
self.pdb_file = pdb_file
self.ddb = Notifier(pyon.load_file(self.ddb_file))
self.pdb = Notifier(pyon.load_file(self.pdb_file))
def save_ddb(self):
pyon.store_file(self.ddb_file, self.ddb)
def save_pdb(self):
pyon.store_file(self.pdb_file, self.pdb)
def req_device(self, name):
return self.ddb.backing_struct[name]
def set_device(self, name, description):
self.ddb[name] = description
self.save_ddb()
def del_device(self, name):
del self.ddb[name]
self.save_ddb()
def req_parameter(self, name):
return self.pdb.backing_struct[name]
def set_parameter(self, name, value):
self.pdb[name] = value
self.save_pdb()
def del_parameter(self, name):
del self.pdb[name]
self.save_pdb()

View File

@ -6,8 +6,8 @@ from artiq.management.worker import Worker
class Scheduler: class Scheduler:
def __init__(self, *args, **kwargs): def __init__(self, worker_handlers):
self.worker = Worker(*args, **kwargs) self.worker = Worker(worker_handlers)
self.next_rid = 0 self.next_rid = 0
self.queue = Notifier([]) self.queue = Notifier([])
self.queue_count = asyncio.Semaphore(0) self.queue_count = asyncio.Semaphore(0)
@ -56,6 +56,16 @@ class Scheduler:
def cancel_periodic(self, prid): def cancel_periodic(self, prid):
del self.periodic[prid] del self.periodic[prid]
@asyncio.coroutine
def _run(self, rid, run_params, timeout):
try:
yield from self.worker.run(run_params, timeout)
except Exception as e:
print("RID {} failed:".format(rid))
print(e)
else:
print("RID {} completed successfully".format(rid))
@asyncio.coroutine @asyncio.coroutine
def _run_periodic(self): def _run_periodic(self):
while True: while True:
@ -80,8 +90,7 @@ class Scheduler:
rid = self.new_rid() rid = self.new_rid()
self.queue.insert(0, (rid, run_params, timeout)) self.queue.insert(0, (rid, run_params, timeout))
result = yield from self.worker.run(run_params, timeout) yield from self._run(rid, run_params, timeout)
print(prid, rid, result)
del self.queue[0] del self.queue[0]
@asyncio.coroutine @asyncio.coroutine
@ -101,6 +110,5 @@ class Scheduler:
yield from self._run_periodic() yield from self._run_periodic()
if ev_queue in done: if ev_queue in done:
rid, run_params, timeout = self.queue.backing_struct[0] rid, run_params, timeout = self.queue.backing_struct[0]
result = yield from self.worker.run(run_params, timeout) yield from self._run(rid, run_params, timeout)
print(rid, result)
del self.queue[0] del self.queue[0]

View File

@ -10,11 +10,14 @@ class WorkerFailed(Exception):
pass pass
class RunFailed(Exception):
pass
class Worker: class Worker:
def __init__(self, ddb, pdb, def __init__(self, handlers,
send_timeout=0.5, start_reply_timeout=1.0, term_timeout=1.0): send_timeout=0.5, start_reply_timeout=1.0, term_timeout=1.0):
self.ddb = ddb self.handlers = handlers
self.pdb = pdb
self.send_timeout = send_timeout self.send_timeout = send_timeout
self.start_reply_timeout = start_reply_timeout self.start_reply_timeout = start_reply_timeout
self.term_timeout = term_timeout self.term_timeout = term_timeout
@ -23,7 +26,6 @@ class Worker:
def create_process(self): def create_process(self):
self.process = yield from asyncio.create_subprocess_exec( self.process = yield from asyncio.create_subprocess_exec(
sys.executable, "-m", "artiq.management.worker_impl", sys.executable, "-m", "artiq.management.worker_impl",
self.ddb, self.pdb,
stdout=subprocess.PIPE, stdin=subprocess.PIPE) stdout=subprocess.PIPE, stdin=subprocess.PIPE)
@asyncio.coroutine @asyncio.coroutine
@ -62,8 +64,22 @@ class Worker:
obj = yield from self._recv(self.start_reply_timeout) obj = yield from self._recv(self.start_reply_timeout)
if obj != "ack": if obj != "ack":
raise WorkerFailed("Incorrect acknowledgement") raise WorkerFailed("Incorrect acknowledgement")
result = yield from self._recv(result_timeout) while True:
return result obj = yield from self._recv(result_timeout)
action = obj["action"]
if action == "report_completed":
if obj["status"] != "ok":
raise RunFailed(obj["message"])
else:
return
else:
del obj["action"]
try:
data = self.handlers[action](**obj)
reply = {"status": "ok", "data": data}
except:
reply = {"status": "failed"}
yield from self._send(reply, self.send_timeout)
@asyncio.coroutine @asyncio.coroutine
def end_process(self): def end_process(self):

View File

@ -1,13 +1,14 @@
import sys import sys
from inspect import isclass from inspect import isclass
import traceback
from artiq.management import pyon from artiq.management import pyon
from artiq.management.file_import import file_import from artiq.management.file_import import file_import
from artiq.language.context import AutoContext from artiq.language.context import AutoContext
from artiq.management.dpdb import DeviceParamDB from artiq.management.dpdb import DeviceParamSupplier
def run(dpdb, file, unit, function): def run(dps, file, unit, function):
module = file_import(file) module = file_import(file)
if unit is None: if unit is None:
units = [v for k, v in module.__dict__.items() units = [v for k, v in module.__dict__.items()
@ -20,11 +21,16 @@ def run(dpdb, file, unit, function):
unit = units[0] unit = units[0]
else: else:
unit = getattr(module, unit) unit = getattr(module, unit)
unit_inst = unit(dpdb) unit_inst = unit(dps)
f = getattr(unit_inst, function) f = getattr(unit_inst, function)
f() f()
def get_object():
line = sys.__stdin__.readline()
return pyon.decode(line)
def put_object(obj): def put_object(obj):
ds = pyon.encode(obj) ds = pyon.encode(obj)
sys.__stdout__.write(ds) sys.__stdout__.write(ds)
@ -32,24 +38,45 @@ def put_object(obj):
sys.__stdout__.flush() sys.__stdout__.flush()
def req_device(name):
put_object({"action": "req_device", "name": name})
obj = get_object()
if obj["status"] == "ok":
return obj["data"]
else:
raise KeyError
def req_parameter(name):
put_object({"action": "req_parameter", "name": name})
obj = get_object()
if obj["status"] == "ok":
return obj["data"]
else:
raise KeyError
def main(): def main():
sys.stdout = sys.stderr sys.stdout = sys.stderr
devices = pyon.load_file(sys.argv[1]) dps = DeviceParamSupplier(req_device, req_parameter)
parameters = pyon.load_file(sys.argv[2])
dpdb = DeviceParamDB(devices, parameters)
while True: while True:
line = sys.__stdin__.readline() obj = get_object()
obj = pyon.decode(line)
put_object("ack") put_object("ack")
try: try:
run(dpdb, **obj) try:
except Exception as e: run(dps, **obj)
put_object({"status": "failed", "message": str(e)}) except Exception:
else: put_object({"action": "report_completed",
put_object({"status": "ok"}) "status": "failed",
"message": traceback.format_exc()})
else:
put_object({"action": "report_completed",
"status": "ok"})
finally:
dps.close()
if __name__ == "__main__": if __name__ == "__main__":
main() main()

View File

@ -2,9 +2,11 @@
import asyncio import asyncio
import argparse import argparse
import atexit
from artiq.management.pc_rpc import Server from artiq.management.pc_rpc import Server
from artiq.management.sync_struct import Publisher from artiq.management.sync_struct import Publisher
from artiq.management.dpdb import DeviceParamDB
from artiq.management.scheduler import Scheduler from artiq.management.scheduler import Scheduler
@ -24,31 +26,32 @@ def _get_args():
def main(): def main():
args = _get_args() args = _get_args()
dpdb = DeviceParamDB("ddb.pyon", "pdb.pyon")
loop = asyncio.get_event_loop() loop = asyncio.get_event_loop()
try: atexit.register(lambda: loop.close())
scheduler = Scheduler("ddb.pyon", "pdb.pyon")
loop.run_until_complete(scheduler.start()) scheduler = Scheduler({
try: "req_device": dpdb.req_device,
schedule_control = Server(scheduler, "schedule_control") "req_parameter": dpdb.req_parameter
loop.run_until_complete(schedule_control.start( })
args.bind, args.port_schedule_control)) loop.run_until_complete(scheduler.start())
try: atexit.register(lambda: loop.run_until_complete(scheduler.stop()))
schedule_notify = Publisher({
"queue": scheduler.queue, schedule_control = Server(scheduler, "schedule_control")
"periodic": scheduler.periodic loop.run_until_complete(schedule_control.start(
}) args.bind, args.port_schedule_control))
loop.run_until_complete(schedule_notify.start( atexit.register(lambda: loop.run_until_complete(schedule_control.stop()))
args.bind, args.port_schedule_notify))
try: schedule_notify = Publisher({
loop.run_forever() "queue": scheduler.queue,
finally: "periodic": scheduler.periodic
loop.run_until_complete(schedule_notify.stop()) })
finally: loop.run_until_complete(schedule_notify.start(
loop.run_until_complete(schedule_control.stop()) args.bind, args.port_schedule_notify))
finally: atexit.register(lambda: loop.run_until_complete(schedule_notify.stop()))
loop.run_until_complete(scheduler.stop())
finally: loop.run_forever()
loop.close()
if __name__ == "__main__": if __name__ == "__main__":
main() main()