artiq/artiq/master/worker_impl.py

323 lines
11 KiB
Python
Raw Normal View History

"""Worker process implementation.
This module contains the worker process main() function and the glue code
necessary to connect the global artefacts used from experiment code (scheduler,
device database, etc.) to their actual implementation in the parent master
process via IPC.
"""
import sys
import time
2015-08-07 15:51:56 +08:00
import os
2015-10-20 18:11:50 +08:00
import logging
import traceback
from collections import OrderedDict
import h5py
import artiq
2016-01-26 21:59:36 +08:00
from artiq.protocols import pipe_ipc, pyon
from artiq.protocols.packed_exceptions import raise_packed_exc
from artiq.tools import multiline_log_config, file_import
from artiq.master.worker_db import DeviceManager, DatasetManager, DummyDevice
2016-04-16 19:31:07 +08:00
from artiq.language.environment import (is_experiment, TraceArgumentManager,
ProcessArgumentManager)
from artiq.language.core import set_watchdog_factory, TerminationRequested
2016-06-27 14:37:29 +08:00
from artiq.language.types import TBool
from artiq.compiler import import_cache
from artiq.coredevice.core import CompileError, host_only, _render_diagnostic
from artiq import __version__ as artiq_version
2016-01-26 21:59:36 +08:00
# Worker-side endpoint of the IPC pipe to the master process.
# Initialized in main() from the address passed on the command line.
ipc = None
2016-04-05 15:38:49 +08:00
2014-12-31 17:41:22 +08:00
def get_object():
    """Receive the next object sent by the master over the IPC pipe.

    Blocks until a complete line is available, then decodes it from PYON.
    """
    raw = ipc.readline()
    return pyon.decode(raw.decode())
def put_object(obj):
    """Send *obj* to the master over the IPC pipe.

    Objects are PYON-encoded, one per line (newline-terminated).
    """
    payload = pyon.encode(obj) + "\n"
    ipc.write(payload.encode())
def make_parent_action(action):
    """Return a callable that forwards its invocation to the master.

    The returned function sends an RPC-style request (action name, args,
    kwargs) over the IPC pipe and synchronously waits for the reply.
    The master may answer with:
      * an out-of-band "terminate" action — the worker exits;
      * status "ok" — the carried data is returned;
      * a packed exception — re-raised locally via raise_packed_exc.
    """
    def parent_action(*args, **kwargs):
        put_object({"action": action, "args": args, "kwargs": kwargs})
        reply = get_object()
        if "action" in reply:
            # Out-of-band message from the master instead of a normal reply.
            if reply["action"] != "terminate":
                raise ValueError
            sys.exit()
        if reply["status"] != "ok":
            raise_packed_exc(reply["exception"])
        return reply["data"]
    return parent_action
class ParentDeviceDB:
    """Device database proxy: all lookups are forwarded to the master."""
    get_device_db = make_parent_action("get_device_db")
    get = make_parent_action("get_device")
class ParentDatasetDB:
    """Dataset database proxy: reads and updates go through the master."""
    get = make_parent_action("get_dataset")
    update = make_parent_action("update_dataset")
2015-01-13 19:12:19 +08:00
class Watchdog:
    """Context manager arming a watchdog timer in the master process.

    Entering the context registers a watchdog with timeout ``t`` in the
    master; leaving it deletes that watchdog again.
    """
    # NOTE: these are plain functions, not staticmethods, so they must be
    # accessed through the class — instance access would bind ``self``.
    _create = make_parent_action("create_watchdog")
    _delete = make_parent_action("delete_watchdog")

    def __init__(self, t):
        self.t = t

    def __enter__(self):
        self.wid = Watchdog._create(self.t)

    def __exit__(self, exc_type, exc_value, exc_tb):
        Watchdog._delete(self.wid)


# Make experiment code use the IPC-backed watchdog implementation above.
set_watchdog_factory(Watchdog)
class Scheduler:
    """Proxy giving experiments access to the master's scheduler.

    Defaults for submit() and check_pause() come from information about
    the current run, recorded by set_run_info() during "build".
    """
    # Raw RPC endpoints exposed by the master's scheduler.
    pause_noexc = staticmethod(make_parent_action("pause"))
    _check_pause = staticmethod(make_parent_action("scheduler_check_pause"))
    _submit = staticmethod(make_parent_action("scheduler_submit"))
    delete = staticmethod(make_parent_action("scheduler_delete"))
    request_termination = staticmethod(
        make_parent_action("scheduler_request_termination"))
    get_status = staticmethod(make_parent_action("scheduler_get_status"))

    def set_run_info(self, rid, pipeline_name, expid, priority):
        # Remember the parameters of the current run so that scheduler
        # calls can default to "this run".
        self.rid = rid
        self.pipeline_name = pipeline_name
        self.expid = expid
        self.priority = priority

    @host_only
    def pause(self):
        """Yield to the scheduler; raise TerminationRequested if this run
        should terminate rather than resume."""
        if self.pause_noexc():
            raise TerminationRequested

    def check_pause(self, rid=None) -> TBool:
        """Return whether the run *rid* (default: this run) should pause."""
        if rid is None:
            rid = self.rid
        return self._check_pause(rid)

    def submit(self, pipeline_name=None, expid=None, priority=None,
               due_date=None, flush=False):
        """Submit an experiment; None arguments fall back to this run's
        pipeline name, expid and priority."""
        if pipeline_name is None:
            pipeline_name = self.pipeline_name
        if expid is None:
            expid = self.expid
        if priority is None:
            priority = self.priority
        return self._submit(pipeline_name, expid, priority, due_date, flush)
class CCB:
    """Client control broadcast proxy: issue requests go to the master."""
    issue = staticmethod(make_parent_action("ccb_issue"))
2015-07-15 17:08:12 +08:00
def get_exp(file, class_name):
    """Import *file* and return the experiment class to run.

    If *class_name* is given, that attribute of the module is returned.
    Otherwise the module must contain exactly one experiment class;
    ValueError is raised if it contains zero or several.
    """
    module = file_import(file, prefix="artiq_worker_")
    if class_name is not None:
        return getattr(module, class_name)
    exps = [v for v in module.__dict__.values() if is_experiment(v)]
    if len(exps) != 1:
        raise ValueError("Found {} experiments in module".format(len(exps)))
    return exps[0]
2015-01-13 19:12:19 +08:00
2015-10-28 17:35:57 +08:00
# Sends an experiment's metadata (class name, display name, argument info,
# scheduler defaults) to the master; used by examine() below.
register_experiment = make_parent_action("register_experiment")
class ExamineDeviceMgr:
    """Stand-in device manager used while examining (not running) experiments.

    The device database is still queried from the master, but get() hands
    out inert DummyDevice objects so no hardware is touched.
    """
    get_device_db = make_parent_action("get_device_db")

    @staticmethod
    def get(name):
        return DummyDevice()
class ExamineDatasetMgr:
    """Stand-in dataset manager used while examining experiments.

    Reads are forwarded to the master's dataset database; updates are
    deliberately ignored (examining must be side-effect free).
    """
    @staticmethod
    def get(key, archive=False):
        return ParentDatasetDB.get(key)

    @staticmethod
    def update(mod):
        # Fixed: this is a @staticmethod, so the former spurious ``self``
        # parameter made the one-argument call update(mod) a TypeError.
        pass
def examine(device_mgr, dataset_mgr, file):
    """Import *file* and register every experiment class it defines.

    For each public experiment class, the display name is the first line of
    its docstring (trailing period stripped), falling back to the class
    name; argument information is collected through a TraceArgumentManager.
    Modules imported as a side effect are removed from sys.modules
    afterwards so that subsequent examinations re-import fresh code.
    """
    previous_keys = set(sys.modules.keys())
    try:
        module = file_import(file)
        for class_name, exp_class in module.__dict__.items():
            if class_name[0] == "_":
                continue
            if is_experiment(exp_class):
                # Fixed: empty docstrings and docstrings starting with a
                # blank line used to crash (IndexError on name[-1] /
                # splitlines()[0]); both now fall back to the class name.
                if not exp_class.__doc__:
                    name = class_name
                else:
                    name = exp_class.__doc__.splitlines()[0].strip()
                    if name.endswith("."):
                        name = name[:-1]
                    if not name:
                        name = class_name
                argument_mgr = TraceArgumentManager()
                scheduler_defaults = {}
                # Instantiating with a TraceArgumentManager records the
                # requested arguments without running the experiment.
                exp_class((device_mgr, dataset_mgr, argument_mgr,
                           scheduler_defaults))
                arginfo = OrderedDict(
                    (k, (proc.describe(), group, tooltip))
                    for k, (proc, group, tooltip)
                    in argument_mgr.requested_args.items())
                register_experiment(class_name, name, arginfo,
                                    scheduler_defaults)
    finally:
        new_keys = set(sys.modules.keys())
        for key in new_keys - previous_keys:
            del sys.modules[key]
def setup_diagnostics(experiment_file, repository_path):
    """Route compiler diagnostics for *experiment_file* to the worker log.

    Installs a render_diagnostic implementation that prefixes each message
    with the experiment file being compiled and replaces the absolute
    repository path with "<repository>" to keep log output stable.
    """
    def render_diagnostic(self, diagnostic):
        message = "While compiling {}\n".format(experiment_file) + \
            _render_diagnostic(diagnostic, colored=False)
        if repository_path is not None:
            message = message.replace(repository_path, "<repository>")
        if diagnostic.level == "warning":
            # Fixed: logging.warn is a deprecated alias of logging.warning.
            logging.warning(message)
        else:
            logging.error(message)

    # This is kind of gross, but 1) we do not have any explicit connection
    # between the worker and a coredevice.core.Core instance at all,
    # and 2) the diagnostic engine really ought to be per-Core, since
    # that's what uses it and the repository path is per-Core.
    # So I don't know how to implement this properly for now.
    #
    # This hack is as good or bad as any other solution that involves
    # putting inherently local objects (the diagnostic engine) into
    # global slots, and there isn't any point in making it prettier by
    # wrapping it in layers of indirection.
    artiq.coredevice.core._DiagnosticEngine.render_diagnostic = \
        render_diagnostic
def put_exception_report():
    """Log the exception currently being handled and notify the master.

    Must be called from inside an ``except`` block. CompileError is not
    logged again, as the compiler diagnostics were already rendered.
    """
    _, exc, _ = sys.exc_info()
    if not isinstance(exc, CompileError):
        summary = type(exc).__name__
        exc_str = str(exc)
        if exc_str:
            summary += ": " + exc_str.splitlines()[0]
        lines = ["Terminating with exception (" + summary + ")\n"]
        if hasattr(exc, "artiq_core_exception"):
            lines.append(str(exc.artiq_core_exception))
        if hasattr(exc, "parent_traceback"):
            lines += exc.parent_traceback
        lines += traceback.format_exception_only(type(exc), exc)
        # The local traceback is only useful when there is no parent
        # (core device) traceback attached to the exception.
        logging.error("".join(lines).rstrip(),
                      exc_info=not hasattr(exc, "parent_traceback"))
    put_object({"action": "exception"})
2016-01-17 01:47:35 +08:00
def main():
    """Worker entry point: serve build/prepare/run/analyze/... requests.

    Command line: argv[1] is the IPC address, argv[2] the log level.
    Runs a request loop that receives action objects from the master over
    the IPC pipe and replies with "completed" (or an exception report).
    """
    global ipc

    multiline_log_config(level=int(sys.argv[2]))
    ipc = pipe_ipc.ChildComm(sys.argv[1])

    # Per-run state, filled in by the "build" action.
    start_time = None
    run_time = None
    rid = None
    expid = None
    exp = None
    exp_inst = None
    repository_path = None

    device_mgr = DeviceManager(ParentDeviceDB,
                               virtual_devices={"scheduler": Scheduler(),
                                                "ccb": CCB()})
    dataset_mgr = DatasetManager(ParentDatasetDB)

    import_cache.install_hook()

    try:
        while True:
            obj = get_object()
            action = obj["action"]
            if action == "build":
                start_time = time.time()
                rid = obj["rid"]
                expid = obj["expid"]
                if obj["wd"] is not None:
                    # Using repository
                    experiment_file = os.path.join(obj["wd"], expid["file"])
                    repository_path = obj["wd"]
                else:
                    experiment_file = expid["file"]
                    repository_path = None
                setup_diagnostics(experiment_file, repository_path)
                exp = get_exp(experiment_file, expid["class_name"])
                device_mgr.virtual_devices["scheduler"].set_run_info(
                    rid, obj["pipeline_name"], expid, obj["priority"])
                # Results go to results/<date>/<hour>/; chdir so that
                # relative paths used by the experiment land there too.
                start_local_time = time.localtime(start_time)
                dirname = os.path.join("results",
                                       time.strftime("%Y-%m-%d",
                                                     start_local_time),
                                       time.strftime("%H", start_local_time))
                os.makedirs(dirname, exist_ok=True)
                os.chdir(dirname)
                argument_mgr = ProcessArgumentManager(expid["arguments"])
                exp_inst = exp((device_mgr, dataset_mgr, argument_mgr, {}))
                put_object({"action": "completed"})
            elif action == "prepare":
                exp_inst.prepare()
                put_object({"action": "completed"})
            elif action == "run":
                run_time = time.time()
                exp_inst.run()
                put_object({"action": "completed"})
            elif action == "analyze":
                try:
                    exp_inst.analyze()
                except:
                    # make analyze failure non-fatal, as we may still want to
                    # write results afterwards
                    put_exception_report()
                else:
                    put_object({"action": "completed"})
            elif action == "write_results":
                filename = "{:09}-{}.h5".format(rid, exp.__name__)
                with h5py.File(filename, "w") as f:
                    dataset_mgr.write_hdf5(f)
                    f["artiq_version"] = artiq_version
                    f["rid"] = rid
                    f["start_time"] = start_time
                    f["run_time"] = run_time
                    f["expid"] = pyon.encode(expid)
                put_object({"action": "completed"})
            elif action == "examine":
                examine(ExamineDeviceMgr, ExamineDatasetMgr, obj["file"])
                put_object({"action": "completed"})
            elif action == "terminate":
                break
    except:
        # NOTE(review): bare except also catches SystemExit, which parent
        # actions raise when the master sends "terminate" mid-call —
        # presumably intentional so cleanup below always runs; confirm.
        put_exception_report()
    finally:
        device_mgr.close_devices()
        ipc.close()
2015-10-20 18:11:50 +08:00
# Started as a subprocess by the master; see main() for the argv contract.
if __name__ == "__main__":
    main()