artiq/artiq/master/worker_impl.py

391 lines
13 KiB
Python
Raw Normal View History

"""Worker process implementation.
This module contains the worker process main() function and the glue code
necessary to connect the global artefacts used from experiment code (scheduler,
device database, etc.) to their actual implementation in the parent master
process via IPC.
"""
import sys
import time
2015-08-07 15:51:56 +08:00
import os
import inspect
2015-10-20 18:11:50 +08:00
import logging
import traceback
from collections import OrderedDict
import importlib.util
import linecache
import h5py
2019-11-10 15:55:17 +08:00
from sipyco import pipe_ipc, pyon
from sipyco.packed_exceptions import raise_packed_exc
from sipyco.logging_tools import multiline_log_config
import artiq
from artiq import tools
from artiq.master.worker_db import DeviceManager, DatasetManager, DummyDevice
from artiq.language.environment import (
is_public_experiment, TraceArgumentManager, ProcessArgumentManager
)
from artiq.language.core import host_only, set_watchdog_factory, TerminationRequested
2016-06-27 14:37:29 +08:00
from artiq.language.types import TBool
from artiq.compiler import import_cache
from artiq.coredevice.core import CompileError, _render_diagnostic
from artiq import __version__ as artiq_version
2016-01-26 21:59:36 +08:00
ipc = None
2016-04-05 15:38:49 +08:00
2014-12-31 17:41:22 +08:00
def get_object():
2016-01-26 21:59:36 +08:00
line = ipc.readline().decode()
2014-12-31 17:41:22 +08:00
return pyon.decode(line)
def put_object(obj):
2014-10-25 16:31:34 +08:00
ds = pyon.encode(obj)
2016-01-26 21:59:36 +08:00
ipc.write((ds + "\n").encode())
def make_parent_action(action):
2015-10-28 17:35:57 +08:00
def parent_action(*args, **kwargs):
request = {"action": action, "args": args, "kwargs": kwargs}
2015-01-07 17:50:05 +08:00
put_object(request)
reply = get_object()
2015-05-17 16:11:00 +08:00
if "action" in reply:
if reply["action"] == "terminate":
sys.exit()
else:
raise ValueError
2015-01-07 17:50:05 +08:00
if reply["status"] == "ok":
return reply["data"]
else:
raise_packed_exc(reply["exception"])
2015-01-07 17:50:05 +08:00
return parent_action
class ParentDeviceDB:
2015-10-28 17:35:57 +08:00
get_device_db = make_parent_action("get_device_db")
get = make_parent_action("get_device")
class ParentDatasetDB:
get = make_parent_action("get_dataset")
2015-10-28 17:35:57 +08:00
update = make_parent_action("update_dataset")
2024-04-30 16:41:17 +08:00
get_metadata = make_parent_action("get_dataset_metadata")
2015-01-13 19:12:19 +08:00
class Watchdog:
2015-10-28 17:35:57 +08:00
_create = make_parent_action("create_watchdog")
_delete = make_parent_action("delete_watchdog")
def __init__(self, t):
self.t = t
def __enter__(self):
self.wid = Watchdog._create(self.t)
def __exit__(self, type, value, traceback):
Watchdog._delete(self.wid)
2015-04-28 23:23:59 +08:00
set_watchdog_factory(Watchdog)
class Scheduler:
def set_run_info(self, rid, pipeline_name, expid, priority):
self.rid = rid
2015-05-17 16:11:00 +08:00
self.pipeline_name = pipeline_name
self.expid = expid
self.priority = priority
pause_noexc = staticmethod(make_parent_action("pause"))
@host_only
def pause(self):
if self.pause_noexc():
raise TerminationRequested
2016-07-09 22:58:19 +08:00
_check_pause = staticmethod(make_parent_action("scheduler_check_pause"))
2016-06-27 14:37:29 +08:00
def check_pause(self, rid=None) -> TBool:
if rid is None:
rid = self.rid
return self._check_pause(rid)
_check_termination = staticmethod(make_parent_action("scheduler_check_termination"))
def check_termination(self, rid=None) -> TBool:
if rid is None:
rid = self.rid
return self._check_termination(rid)
_submit = staticmethod(make_parent_action("scheduler_submit"))
def submit(self, pipeline_name=None, expid=None, priority=None, due_date=None, flush=False):
if pipeline_name is None:
pipeline_name = self.pipeline_name
if expid is None:
expid = self.expid
if priority is None:
priority = self.priority
return self._submit(pipeline_name, expid, priority, due_date, flush)
delete = staticmethod(make_parent_action("scheduler_delete"))
request_termination = staticmethod(
make_parent_action("scheduler_request_termination"))
get_status = staticmethod(make_parent_action("scheduler_get_status"))
class CCB:
issue = staticmethod(make_parent_action("ccb_issue"))
def get_experiment_from_file(file, class_name):
module = tools.file_import(file, prefix="artiq_worker_")
return tools.get_experiment(module, class_name)
2015-01-13 19:12:19 +08:00
class StringLoader:
def __init__(self, fake_filename, content):
self.fake_filename = fake_filename
self.content = content
def get_source(self, fullname):
return self.content
def create_module(self, spec):
return None
def exec_module(self, module):
code = compile(self.get_source(self.fake_filename), self.fake_filename, "exec")
exec(code, module.__dict__)
def get_experiment_from_content(content, class_name):
fake_filename = "expcontent"
spec = importlib.util.spec_from_loader(
"expmodule",
StringLoader(fake_filename, content)
)
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
linecache.lazycache(fake_filename, module.__dict__)
return tools.get_experiment(module, class_name)
2015-10-28 17:35:57 +08:00
register_experiment = make_parent_action("register_experiment")
class ExamineDeviceMgr:
2015-10-28 17:35:57 +08:00
get_device_db = make_parent_action("get_device_db")
@staticmethod
2015-10-28 17:35:57 +08:00
def get(name):
return DummyDevice()
class ExamineDatasetMgr:
@staticmethod
def get(key, archive=False):
return ParentDatasetDB.get(key)
2024-04-30 16:41:17 +08:00
@staticmethod
def get_metadata(key):
return ParentDatasetDB.get_metadata(key)
def examine(device_mgr, dataset_mgr, file):
previous_keys = set(sys.modules.keys())
try:
module = tools.file_import(file)
for class_name, exp_class in inspect.getmembers(module, is_public_experiment):
if exp_class.__doc__ is None:
name = class_name
else:
name = exp_class.__doc__.strip().splitlines()[0].strip()
if name[-1] == ".":
name = name[:-1]
argument_mgr = TraceArgumentManager()
scheduler_defaults = {}
2024-02-27 10:39:37 +08:00
exp_class((device_mgr, dataset_mgr, argument_mgr, scheduler_defaults))
arginfo = OrderedDict(
(k, (proc.describe(), group, tooltip))
for k, (proc, group, tooltip) in argument_mgr.requested_args.items()
)
argument_ui = None
if hasattr(exp_class, "argument_ui"):
argument_ui = exp_class.argument_ui
register_experiment(class_name, name, arginfo, argument_ui, scheduler_defaults)
finally:
new_keys = set(sys.modules.keys())
for key in new_keys - previous_keys:
del sys.modules[key]
class ArgumentManager(ProcessArgumentManager):
_get_interactive = make_parent_action("get_interactive_arguments")
2024-03-11 17:07:25 +08:00
def get_interactive(self, interactive_arglist, title):
arglist_desc = [(k, p.describe(), g, t)
for k, p, g, t in interactive_arglist]
2024-03-11 17:07:25 +08:00
arguments = ArgumentManager._get_interactive(arglist_desc, title)
2024-03-22 17:30:56 +08:00
if arguments is not None:
for key, processor, _, _ in interactive_arglist:
arguments[key] = processor.process(arguments[key])
return arguments
def setup_diagnostics(experiment_file, repository_path):
def render_diagnostic(self, diagnostic):
message = "While compiling {}\n".format(experiment_file) + \
_render_diagnostic(diagnostic, colored=False)
if repository_path is not None:
message = message.replace(repository_path, "<repository>")
2016-01-17 01:47:35 +08:00
if diagnostic.level == "warning":
logging.warning(message)
else:
logging.error(message)
# This is kind of gross, but 1) we do not have any explicit connection
# between the worker and a coredevice.core.Core instance at all,
# and 2) the diagnostic engine really ought to be per-Core, since
# that's what uses it and the repository path is per-Core.
# So I don't know how to implement this properly for now.
#
# This hack is as good or bad as any other solution that involves
# putting inherently local objects (the diagnostic engine) into
# global slots, and there isn't any point in making it prettier by
# wrapping it in layers of indirection.
2016-04-05 15:38:49 +08:00
artiq.coredevice.core._DiagnosticEngine.render_diagnostic = \
render_diagnostic
def put_completed():
put_object({"action": "completed"})
def put_exception_report():
_, exc, _ = sys.exc_info()
# When we get CompileError, a more suitable diagnostic has already
# been printed.
if not isinstance(exc, CompileError):
short_exc_info = type(exc).__name__
exc_str = str(exc)
if exc_str:
short_exc_info += ": " + exc_str.splitlines()[0]
lines = ["Terminating with exception ("+short_exc_info+")\n"]
if hasattr(exc, "artiq_core_exception"):
lines.append(str(exc.artiq_core_exception))
if hasattr(exc, "parent_traceback"):
lines += exc.parent_traceback
lines += traceback.format_exception_only(type(exc), exc)
logging.error("".join(lines).rstrip(),
exc_info=not hasattr(exc, "parent_traceback"))
put_object({"action": "exception"})
2016-01-17 01:47:35 +08:00
def main():
2016-01-26 21:59:36 +08:00
global ipc
2016-01-27 04:30:28 +08:00
multiline_log_config(level=int(sys.argv[2]))
2016-01-26 21:59:36 +08:00
ipc = pipe_ipc.ChildComm(sys.argv[1])
start_time = None
2017-04-26 18:33:10 +08:00
run_time = None
rid = None
2015-05-17 16:11:00 +08:00
expid = None
exp = None
exp_inst = None
repository_path = None
2015-01-13 19:12:19 +08:00
def write_results():
filename = "{:09}-{}.h5".format(rid, exp.__name__)
with h5py.File(filename, "w") as f:
dataset_mgr.write_hdf5(f)
f["artiq_version"] = artiq_version
f["rid"] = rid
f["start_time"] = start_time
f["run_time"] = run_time
f["expid"] = pyon.encode(expid)
device_mgr = DeviceManager(ParentDeviceDB,
virtual_devices={"scheduler": Scheduler(),
"ccb": CCB()})
dataset_mgr = DatasetManager(ParentDatasetDB)
2015-01-13 19:12:19 +08:00
import_cache.install_hook()
try:
while True:
obj = get_object()
action = obj["action"]
2015-07-09 19:18:12 +08:00
if action == "build":
start_time = time.time()
rid = obj["rid"]
2015-05-17 16:11:00 +08:00
expid = obj["expid"]
2023-12-18 12:11:40 +08:00
if "devarg_override" in expid:
device_mgr.devarg_override = expid["devarg_override"]
if "file" in expid:
if obj["wd"] is not None:
# Using repository
experiment_file = os.path.join(obj["wd"], expid["file"])
repository_path = obj["wd"]
else:
experiment_file = expid["file"]
repository_path = None
setup_diagnostics(experiment_file, repository_path)
exp = get_experiment_from_file(experiment_file, expid["class_name"])
2015-08-07 15:51:56 +08:00
else:
setup_diagnostics("<none>", None)
exp = get_experiment_from_content(expid["content"], expid["class_name"])
device_mgr.virtual_devices["scheduler"].set_run_info(
rid, obj["pipeline_name"], expid, obj["priority"])
start_local_time = time.localtime(start_time)
dirname = os.path.join("results",
2024-02-27 10:39:37 +08:00
time.strftime("%Y-%m-%d", start_local_time),
time.strftime("%H", start_local_time))
os.makedirs(dirname, exist_ok=True)
os.chdir(dirname)
argument_mgr = ArgumentManager(expid["arguments"])
exp_inst = exp((device_mgr, dataset_mgr, argument_mgr, {}))
argument_mgr.check_unprocessed_arguments()
put_completed()
2015-07-09 19:18:12 +08:00
elif action == "prepare":
exp_inst.prepare()
put_completed()
elif action == "run":
run_time = time.time()
try:
exp_inst.run()
except:
# Only write results in run() on failure; on success wait
# for end of analyze stage.
write_results()
raise
2023-12-13 14:27:04 +08:00
finally:
device_mgr.notify_run_end()
put_completed()
elif action == "analyze":
try:
exp_inst.analyze()
finally:
# browser's analyze shouldn't write results,
# since it doesn't run the experiment and cannot have rid
if rid is not None:
write_results()
put_completed()
elif action == "examine":
examine(ExamineDeviceMgr, ExamineDatasetMgr, obj["file"])
put_completed()
elif action == "terminate":
break
except:
put_exception_report()
finally:
device_mgr.close_devices()
2016-01-26 21:59:36 +08:00
ipc.close()
2015-10-20 18:11:50 +08:00
if __name__ == "__main__":
main()