artiq/artiq/master/worker_impl.py


import sys
import time
import os
import logging
from collections import OrderedDict

from artiq.protocols import pyon
from artiq.tools import file_import
from artiq.master.worker_db import (DeviceManager, DatasetManager,
                                    get_hdf5_output)
from artiq.language.environment import is_experiment
from artiq.language.core import set_watchdog_factory, TerminationRequested
from artiq.coredevice.core import CompileError, host_only
from artiq import __version__ as artiq_version
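

# Objects are exchanged with the master process as pyon-encoded lines, one
# object per line.  The raw sys.__stdin__/sys.__stdout__ streams are used so
# that the protocol keeps working after sys.stdout/sys.stderr are redirected
# to LogForwarder in main().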
def get_object():
    line = sys.__stdin__.readline()
    return pyon.decode(line)


def put_object(obj):
    ds = pyon.encode(obj)
    sys.__stdout__.write(ds)
    sys.__stdout__.write("\n")
    sys.__stdout__.flush()


class ParentActionError(Exception):
    pass
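

# make_parent_action() returns a stub that forwards a call to the master over
# the pipe and blocks until the reply arrives.  An error reply is re-raised
# locally as the given exception type, and a "terminate" message from the
# master ends the worker immediately.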
def make_parent_action(action, exception=ParentActionError):
    def parent_action(*args, **kwargs):
        request = {"action": action, "args": args, "kwargs": kwargs}
        put_object(request)
        reply = get_object()
        if "action" in reply:
            if reply["action"] == "terminate":
                sys.exit()
            else:
                raise ValueError
        if reply["status"] == "ok":
            return reply["data"]
        else:
            raise exception(reply["message"])
    return parent_action
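

# sys.stdout and sys.stderr are replaced by LogForwarder instances in main();
# complete lines written to them are sent to the master via the "log" action.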
class LogForwarder:
    def __init__(self):
        self.buffer = ""

    to_parent = staticmethod(make_parent_action("log"))

    def write(self, data):
        self.buffer += data
        while "\n" in self.buffer:
            i = self.buffer.index("\n")
            self.to_parent(self.buffer[:i])
            self.buffer = self.buffer[i+1:]

    def flush(self):
        pass
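

# Thin proxies for the master's device and dataset databases.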
class ParentDeviceDB:
    get_device_db = make_parent_action("get_device_db")
    get = make_parent_action("get_device", KeyError)


class ParentDatasetDB:
    get = make_parent_action("get_dataset", KeyError)
    update = make_parent_action("update_dataset")
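

# Watchdogs are created and deleted in the master process; the worker only
# keeps the watchdog id for the lifetime of the with-block.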
class Watchdog:
    _create = make_parent_action("create_watchdog")
    _delete = make_parent_action("delete_watchdog")

    def __init__(self, t):
        self.t = t

    def __enter__(self):
        self.wid = Watchdog._create(self.t)

    def __exit__(self, type, value, traceback):
        Watchdog._delete(self.wid)


set_watchdog_factory(Watchdog)
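

# Exposed to experiments as the virtual "scheduler" device; all operations are
# forwarded to the master's scheduler.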
class Scheduler:
    pause_noexc = staticmethod(make_parent_action("pause"))

    @host_only
    def pause(self):
        if self.pause_noexc():
            raise TerminationRequested

    submit = staticmethod(make_parent_action("scheduler_submit"))
    delete = staticmethod(make_parent_action("scheduler_delete"))
    request_termination = staticmethod(
        make_parent_action("scheduler_request_termination"))
    get_status = staticmethod(make_parent_action("scheduler_get_status"))

    def set_run_info(self, rid, pipeline_name, expid, priority):
        self.rid = rid
        self.pipeline_name = pipeline_name
        self.expid = expid
        self.priority = priority
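

# Import the experiment file and return the requested class, or the only
# experiment class in the module if no name is given.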
def get_exp(file, class_name):
    module = file_import(file, prefix="artiq_worker_")
    if class_name is None:
        exps = [v for k, v in module.__dict__.items()
                if is_experiment(v)]
        if len(exps) != 1:
            raise ValueError("Found {} experiments in module"
                             .format(len(exps)))
        return exps[0]
    else:
        return getattr(module, class_name)
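

# Stand-ins used by the "examine" action: experiments are instantiated only to
# collect their argument information, so no real devices or datasets are
# needed.  They are passed as classes rather than instances, which is why
# their methods take no self argument.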
register_experiment = make_parent_action("register_experiment")


class ExamineDeviceMgr:
    get_device_db = make_parent_action("get_device_db")

    def get(name):
        return None


class DummyDatasetMgr:
    def set(key, value, broadcast=False, persist=False, save=True):
        return None

    def get(key):
        pass
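

# Scan a file for experiment classes and register each one with the master,
# using the first line of its docstring (if any) as the display name.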
def examine(device_mgr, dataset_mgr, file):
    module = file_import(file)
    for class_name, exp_class in module.__dict__.items():
        if class_name[0] == "_":
            continue
        if is_experiment(exp_class):
            if exp_class.__doc__ is None:
                name = class_name
            else:
                name = exp_class.__doc__.splitlines()[0].strip()
                if name[-1] == ".":
                    name = name[:-1]
            exp_inst = exp_class(device_mgr, dataset_mgr,
                                 default_arg_none=True,
                                 enable_processors=True)
            arginfo = OrderedDict(
                (k, (proc.describe(), group))
                for k, (proc, group) in exp_inst.requested_args.items())
            register_experiment(class_name, name, arginfo)
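

# Store a Python string as a fixed-length byte string ("S<len>") scalar
# dataset in the HDF5 file.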
def string_to_hdf5(f, key, value):
    dtype = "S{}".format(len(value))
    dataset = f.create_dataset(key, (), dtype)
    dataset[()] = value.encode()
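

# Worker main loop: execute actions ("build", "prepare", "run", "analyze",
# "write_results", "examine", "terminate") received from the master, replying
# with "completed" after each one.  The log level is passed as the first
# command-line argument.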
def main():
    sys.stdout = LogForwarder()
    sys.stderr = LogForwarder()
    logging.basicConfig(level=int(sys.argv[1]))

    start_time = None
    rid = None
    expid = None
    exp = None
    exp_inst = None
    repository_path = None

    device_mgr = DeviceManager(ParentDeviceDB,
                               virtual_devices={"scheduler": Scheduler()})
    dataset_mgr = DatasetManager(ParentDatasetDB)

    try:
        while True:
            obj = get_object()
            action = obj["action"]
            if action == "build":
                start_time = time.localtime()
                rid = obj["rid"]
                expid = obj["expid"]
                if obj["wd"] is not None:
                    # Using repository
                    experiment_file = os.path.join(obj["wd"], expid["file"])
                    repository_path = obj["wd"]
                else:
                    experiment_file = expid["file"]
                exp = get_exp(experiment_file, expid["class_name"])
                device_mgr.virtual_devices["scheduler"].set_run_info(
                    rid, obj["pipeline_name"], expid, obj["priority"])
                exp_inst = exp(
                    device_mgr, dataset_mgr, enable_processors=True,
                    **expid["arguments"])
                put_object({"action": "completed"})
            elif action == "prepare":
                exp_inst.prepare()
                put_object({"action": "completed"})
            elif action == "run":
                exp_inst.run()
                put_object({"action": "completed"})
            elif action == "analyze":
                exp_inst.analyze()
                put_object({"action": "completed"})
            elif action == "write_results":
                f = get_hdf5_output(start_time, rid, exp.__name__)
                try:
                    dataset_mgr.write_hdf5(f)
                    string_to_hdf5(f, "artiq_version", artiq_version)
                    if "repo_rev" in expid:
                        string_to_hdf5(f, "repo_rev", expid["repo_rev"])
                finally:
                    f.close()
                put_object({"action": "completed"})
            elif action == "examine":
                examine(ExamineDeviceMgr, DummyDatasetMgr, obj["file"])
                put_object({"action": "completed"})
            elif action == "terminate":
                break
    except CompileError as exc:
        # TODO: This should be replaced with a proper DiagnosticEngine.
        message = ("Cannot compile {}\n".format(experiment_file)
                   + exc.render_string())
        if repository_path is not None:
            message = message.replace(repository_path, "<repository>")
        logging.error(message)
    except Exception as exc:
        short_exc_info = type(exc).__name__
        exc_str = str(exc)
        if exc_str:
            short_exc_info += ": " + exc_str
        logging.error("Terminating with exception (%s)",
                      short_exc_info, exc_info=True)
    finally:
        device_mgr.close_devices()


if __name__ == "__main__":
    main()