forked from M-Labs/artiq
1
0
Fork 0

worker_impl: port to NAC3

This commit is contained in:
Sebastien Bourdeauducq 2022-02-26 17:39:59 +08:00
parent 892b96892f
commit 7b02918a43
1 changed files with 4 additions and 30 deletions

View File

@ -20,6 +20,8 @@ from sipyco import pipe_ipc, pyon
from sipyco.packed_exceptions import raise_packed_exc from sipyco.packed_exceptions import raise_packed_exc
from sipyco.logging_tools import multiline_log_config from sipyco.logging_tools import multiline_log_config
from nac3artiq import CompileError
import artiq import artiq
from artiq import tools from artiq import tools
from artiq.master.worker_db import DeviceManager, DatasetManager, DummyDevice from artiq.master.worker_db import DeviceManager, DatasetManager, DummyDevice
@ -27,9 +29,7 @@ from artiq.language.environment import (
is_public_experiment, TraceArgumentManager, ProcessArgumentManager is_public_experiment, TraceArgumentManager, ProcessArgumentManager
) )
from artiq.language.core import set_watchdog_factory, TerminationRequested from artiq.language.core import set_watchdog_factory, TerminationRequested
from artiq.language.types import TBool
from artiq.language import import_cache from artiq.language import import_cache
from artiq.coredevice.core import CompileError, host_only, _render_diagnostic
from artiq import __version__ as artiq_version from artiq import __version__ as artiq_version
@ -98,13 +98,13 @@ class Scheduler:
self.priority = priority self.priority = priority
pause_noexc = staticmethod(make_parent_action("pause")) pause_noexc = staticmethod(make_parent_action("pause"))
@host_only
def pause(self): def pause(self):
if self.pause_noexc(): if self.pause_noexc():
raise TerminationRequested raise TerminationRequested
_check_pause = staticmethod(make_parent_action("scheduler_check_pause")) _check_pause = staticmethod(make_parent_action("scheduler_check_pause"))
def check_pause(self, rid=None) -> TBool: def check_pause(self, rid=None) -> bool:
if rid is None: if rid is None:
rid = self.rid rid = self.rid
return self._check_pause(rid) return self._check_pause(rid)
@ -182,31 +182,6 @@ def examine(device_mgr, dataset_mgr, file):
del sys.modules[key] del sys.modules[key]
def setup_diagnostics(experiment_file, repository_path):
def render_diagnostic(self, diagnostic):
message = "While compiling {}\n".format(experiment_file) + \
_render_diagnostic(diagnostic, colored=False)
if repository_path is not None:
message = message.replace(repository_path, "<repository>")
if diagnostic.level == "warning":
logging.warning(message)
else:
logging.error(message)
# This is kind of gross, but 1) we do not have any explicit connection
# between the worker and a coredevice.core.Core instance at all,
# and 2) the diagnostic engine really ought to be per-Core, since
# that's what uses it and the repository path is per-Core.
# So I don't know how to implement this properly for now.
#
# This hack is as good or bad as any other solution that involves
# putting inherently local objects (the diagnostic engine) into
# global slots, and there isn't any point in making it prettier by
# wrapping it in layers of indirection.
artiq.coredevice.core._DiagnosticEngine.render_diagnostic = \
render_diagnostic
def put_completed(): def put_completed():
put_object({"action": "completed"}) put_object({"action": "completed"})
@ -278,7 +253,6 @@ def main():
else: else:
experiment_file = expid["file"] experiment_file = expid["file"]
repository_path = None repository_path = None
setup_diagnostics(experiment_file, repository_path)
exp = get_experiment(experiment_file, expid["class_name"]) exp = get_experiment(experiment_file, expid["class_name"])
device_mgr.virtual_devices["scheduler"].set_run_info( device_mgr.virtual_devices["scheduler"].set_run_info(
rid, obj["pipeline_name"], expid, obj["priority"]) rid, obj["pipeline_name"], expid, obj["priority"])