NAC3 integration WIP

This commit is contained in:
Sebastien Bourdeauducq 2021-11-10 12:18:20 +08:00
parent 2f60a38a9c
commit c8ebd80fe2
4 changed files with 111 additions and 345 deletions

View File

@ -1,55 +1,29 @@
import os, sys import os, sys
import numpy from numpy import int32, int64
from artiq import __artiq_dir__ as artiq_dir import nac3artiq
from artiq.language.core import * from artiq.language.core import *
from artiq.language.types import * from artiq.language import core as core_language
from artiq.language.units import * from artiq.language.units import *
from artiq.compiler.module import Module
from artiq.compiler.embedding import Stitcher
from artiq.compiler.targets import RISCVTarget, CortexA9Target
from artiq.coredevice.comm_kernel import CommKernel, CommKernelDummy from artiq.coredevice.comm_kernel import CommKernel, CommKernelDummy
# Import for side effects (creating the exception classes).
from artiq.coredevice import exceptions
def _render_diagnostic(diagnostic, colored): @extern
def shorten_path(path): def rtio_init():
return path.replace(artiq_dir, "<artiq>")
lines = [shorten_path(path) for path in diagnostic.render(colored=colored)]
return "\n".join(lines)
colors_supported = os.name == "posix"
class _DiagnosticEngine(diagnostic.Engine):
def render_diagnostic(self, diagnostic):
sys.stderr.write(_render_diagnostic(diagnostic, colored=colors_supported) + "\n")
class CompileError(Exception):
def __init__(self, diagnostic):
self.diagnostic = diagnostic
def __str__(self):
# Prepend a newline so that the message shows up on after
# exception class name printed by Python.
return "\n" + _render_diagnostic(self.diagnostic, colored=colors_supported)
@syscall
def rtio_init() -> TNone:
raise NotImplementedError("syscall not simulated") raise NotImplementedError("syscall not simulated")
@syscall(flags={"nounwind", "nowrite"}) @extern
def rtio_get_destination_status(linkno: TInt32) -> TBool: def rtio_get_destination_status(destination: int32) -> bool:
raise NotImplementedError("syscall not simulated") raise NotImplementedError("syscall not simulated")
@syscall(flags={"nounwind", "nowrite"}) @extern
def rtio_get_counter() -> TInt64: def rtio_get_counter() -> int64:
raise NotImplementedError("syscall not simulated") raise NotImplementedError("syscall not simulated")
@nac3
class Core: class Core:
"""Core device driver. """Core device driver.
@ -64,89 +38,65 @@ class Core:
and the RTIO coarse timestamp frequency (e.g. SERDES multiplication and the RTIO coarse timestamp frequency (e.g. SERDES multiplication
factor). factor).
""" """
ref_period: KernelInvariant[float]
kernel_invariants = { ref_multiplier: KernelInvariant[int32]
"core", "ref_period", "coarse_ref_period", "ref_multiplier", coarse_ref_period: KernelInvariant[float]
}
def __init__(self, dmgr, host, ref_period, ref_multiplier=8, target="riscv"): def __init__(self, dmgr, host, ref_period, ref_multiplier=8, target="riscv"):
self.ref_period = ref_period self.ref_period = ref_period
self.ref_multiplier = ref_multiplier self.ref_multiplier = ref_multiplier
if target == "riscv":
self.target_cls = RISCVTarget
elif target == "cortexa9":
self.target_cls = CortexA9Target
else:
raise ValueError("Unsupported target")
self.coarse_ref_period = ref_period*ref_multiplier self.coarse_ref_period = ref_period*ref_multiplier
if host is None: if host is None:
self.comm = CommKernelDummy() self.comm = CommKernelDummy()
else: else:
self.comm = CommKernel(host) self.comm = CommKernel(host)
self.first_run = True self.first_run = True
self.dmgr = dmgr self.dmgr = dmgr
self.core = self self.core = self
self.comm.core = self self.comm.core = self
self.compiler = nac3artiq.NAC3(target)
def close(self): def close(self):
self.comm.close() self.comm.close()
def compile(self, function, args, kwargs, set_result=None, def compile(self, method, args, kwargs, file_output=None):
attribute_writeback=True, print_as_rpc=True): if core_language._allow_module_registration:
try: self.compiler.analyze_modules(core_language._registered_modules)
engine = _DiagnosticEngine(all_errors_are_fatal=True) core_language._allow_module_registration = False
stitcher = Stitcher(engine=engine, core=self, dmgr=self.dmgr, if hasattr(method, "__self__"):
print_as_rpc=print_as_rpc) obj = method.__self__
stitcher.stitch_call(function, args, kwargs, set_result) name = method.__name__
stitcher.finalize() else:
obj = method
name = ""
module = Module(stitcher, if file_output is None:
ref_period=self.ref_period, return self.compiler.compile_method_to_mem(obj, name, args)
attribute_writeback=attribute_writeback) else:
target = self.target_cls() self.compiler.compile_method_to_file(obj, name, args, file_output)
library = target.compile_and_link([module])
stripped_library = target.strip(library)
return stitcher.embedding_map, stripped_library, \
lambda addresses: target.symbolize(library, addresses), \
lambda symbols: target.demangle(symbols)
except diagnostic.Error as error:
raise CompileError(error.diagnostic) from error
def run(self, function, args, kwargs): def run(self, function, args, kwargs):
result = None kernel_library = self.compile(function, args, kwargs)
@rpc(flags={"async"})
def set_result(new_result):
nonlocal result
result = new_result
embedding_map, kernel_library, symbolizer, demangler = \
self.compile(function, args, kwargs, set_result)
if self.first_run: if self.first_run:
self.comm.check_system_info() self.comm.check_system_info()
self.first_run = False self.first_run = False
self.comm.load(kernel_library) self.comm.load(kernel_library)
self.comm.run() self.comm.run()
self.comm.serve(embedding_map, symbolizer, demangler) self.comm.serve(None, None, None)
return result return result
@portable @portable
def seconds_to_mu(self, seconds): def seconds_to_mu(self, seconds: float):
"""Convert seconds to the corresponding number of machine units """Convert seconds to the corresponding number of machine units
(RTIO cycles). (RTIO cycles).
:param seconds: time (in seconds) to convert. :param seconds: time (in seconds) to convert.
""" """
return numpy.int64(seconds//self.ref_period) return int64(seconds//self.ref_period)
@portable @portable
def mu_to_seconds(self, mu): def mu_to_seconds(self, mu: int64):
"""Convert machine units (RTIO cycles) to seconds. """Convert machine units (RTIO cycles) to seconds.
:param mu: cycle count to convert. :param mu: cycle count to convert.
@ -154,7 +104,11 @@ class Core:
return mu*self.ref_period return mu*self.ref_period
@kernel @kernel
def get_rtio_counter_mu(self): def delay(self, dt: float):
delay_mu(self.seconds_to_mu(dt))
@kernel
def get_rtio_counter_mu(self) -> int64:
"""Retrieve the current value of the hardware RTIO timeline counter. """Retrieve the current value of the hardware RTIO timeline counter.
As the timing of kernel code executed on the CPU is inherently As the timing of kernel code executed on the CPU is inherently
@ -167,7 +121,7 @@ class Core:
return rtio_get_counter() return rtio_get_counter()
@kernel @kernel
def wait_until_mu(self, cursor_mu): def wait_until_mu(self, cursor_mu: int64):
"""Block execution until the hardware RTIO counter reaches the given """Block execution until the hardware RTIO counter reaches the given
value (see :meth:`get_rtio_counter_mu`). value (see :meth:`get_rtio_counter_mu`).
@ -178,7 +132,7 @@ class Core:
pass pass
@kernel @kernel
def get_rtio_destination_status(self, destination): def get_rtio_destination_status(self, destination: int32):
"""Returns whether the specified RTIO destination is up. """Returns whether the specified RTIO destination is up.
This is particularly useful in startup kernels to delay This is particularly useful in startup kernels to delay
startup until certain DRTIO destinations are up.""" startup until certain DRTIO destinations are up."""
@ -190,7 +144,7 @@ class Core:
at the current value of the hardware RTIO counter plus a margin of at the current value of the hardware RTIO counter plus a margin of
125000 machine units.""" 125000 machine units."""
rtio_init() rtio_init()
at_mu(rtio_get_counter() + 125000) at_mu(rtio_get_counter() + int64(125000))
@kernel @kernel
def break_realtime(self): def break_realtime(self):
@ -199,6 +153,6 @@ class Core:
If the time cursor is already after that position, this function If the time cursor is already after that position, this function
does nothing.""" does nothing."""
min_now = rtio_get_counter() + 125000 min_now = rtio_get_counter() + int64(125000)
if now_mu() < min_now: if now_mu() < min_now:
at_mu(min_now) at_mu(min_now)

View File

@ -8,7 +8,6 @@ from artiq import __version__ as artiq_version
from artiq.master.databases import DeviceDB, DatasetDB from artiq.master.databases import DeviceDB, DatasetDB
from artiq.master.worker_db import DeviceManager, DatasetManager from artiq.master.worker_db import DeviceManager, DatasetManager
from artiq.language.environment import ProcessArgumentManager from artiq.language.environment import ProcessArgumentManager
from artiq.coredevice.core import CompileError
from artiq.tools import * from artiq.tools import *
@ -47,6 +46,11 @@ def main():
device_mgr = DeviceManager(DeviceDB(args.device_db)) device_mgr = DeviceManager(DeviceDB(args.device_db))
dataset_mgr = DatasetManager(DatasetDB(args.dataset_db)) dataset_mgr = DatasetManager(DatasetDB(args.dataset_db))
output = args.output
if output is None:
basename, ext = os.path.splitext(args.file)
output = "{}.elf".format(basename)
try: try:
module = file_import(args.file, prefix="artiq_run_") module = file_import(args.file, prefix="artiq_run_")
exp = get_experiment(module, args.class_name) exp = get_experiment(module, args.class_name)
@ -54,29 +58,12 @@ def main():
argument_mgr = ProcessArgumentManager(arguments) argument_mgr = ProcessArgumentManager(arguments)
exp_inst = exp((device_mgr, dataset_mgr, argument_mgr, {})) exp_inst = exp((device_mgr, dataset_mgr, argument_mgr, {}))
if not hasattr(exp.run, "artiq_embedded"): if not getattr(exp.run, "__artiq_kernel__", False):
raise ValueError("Experiment entry point must be a kernel") raise ValueError("Experiment entry point must be a kernel")
core_name = exp.run.artiq_embedded.core_name exp_inst.core.compile(exp_inst.run, [], {}, file_output=output)
core = getattr(exp_inst, core_name)
object_map, kernel_library, _, _ = \
core.compile(exp.run, [exp_inst], {},
attribute_writeback=False, print_as_rpc=False)
except CompileError as error:
return
finally: finally:
device_mgr.close_devices() device_mgr.close_devices()
if object_map.has_rpc():
raise ValueError("Experiment must not use RPC")
output = args.output
if output is None:
basename, ext = os.path.splitext(args.file)
output = "{}.elf".format(basename)
with open(output, "wb") as f:
f.write(kernel_library)
if __name__ == "__main__": if __name__ == "__main__":
main() main()

View File

@ -2,273 +2,95 @@
Core ARTIQ extensions to the Python language. Core ARTIQ extensions to the Python language.
""" """
from collections import namedtuple from typing import Generic, TypeVar
from functools import wraps from functools import wraps
import numpy from inspect import getfullargspec, getmodule
from types import SimpleNamespace
__all__ = ["kernel", "portable", "rpc", "syscall", "host_only", __all__ = [
"kernel_from_string", "set_time_manager", "set_watchdog_factory", "KernelInvariant",
"TerminationRequested"] "extern", "kernel", "portable", "nac3", "rpc",
"parallel", "sequential",
# global namespace for kernels "set_watchdog_factory", "watchdog", "TerminationRequested"
kernel_globals = ( ]
"sequential", "parallel", "interleave",
"delay_mu", "now_mu", "at_mu", "delay",
"watchdog"
)
__all__.extend(kernel_globals)
_ARTIQEmbeddedInfo = namedtuple("_ARTIQEmbeddedInfo", T = TypeVar('T')
"core_name portable function syscall forbidden flags") class KernelInvariant(Generic[T]):
pass
def kernel(arg=None, flags={}):
"""
This decorator marks an object's method for execution on the core
device.
When a decorated method is called from the Python interpreter, the :attr:`core` _allow_module_registration = True
attribute of the object is retrieved and used as core device driver. The _registered_modules = set()
core device driver will typically compile, transfer and run the method
(kernel) on the device.
When kernels call another method: def _register_module_of(obj):
assert _allow_module_registration
# Delay NAC3 analysis until all referenced variables are supposed to exist on the CPython side.
_registered_modules.add(getmodule(obj))
- if the method is a kernel for the same core device, it is compiled
and sent in the same binary. Calls between kernels happen entirely on
the device.
- if the method is a regular Python method (not a kernel), it generates
a remote procedure call (RPC) for execution on the host.
The decorator takes an optional parameter that defaults to :attr`core` and def extern(function):
specifies the name of the attribute to use as core device driver. """Decorates a function declaration defined by the core device runtime."""
_register_module_of(function)
return function
This decorator must be present in the global namespace of all modules using
it for the import cache to work properly. def kernel(function_or_method):
""" """Decorates a function or method to be executed on the core device."""
if isinstance(arg, str): _register_module_of(function_or_method)
def inner_decorator(function): argspec = getfullargspec(function_or_method)
@wraps(function) if argspec.args and argspec.args[0] == "self":
def run_on_core(self, *k_args, **k_kwargs): @wraps(function_or_method)
return getattr(self, arg).run(run_on_core, ((self,) + k_args), k_kwargs) def run_on_core(self, *args, **kwargs):
run_on_core.artiq_embedded = _ARTIQEmbeddedInfo( fake_method = SimpleNamespace(__self__=self, __name__=function_or_method.__name__)
core_name=arg, portable=False, function=function, syscall=None, self.core.run(fake_method, *args, **kwargs)
forbidden=False, flags=set(flags)) else:
@wraps(function_or_method)
def run_on_core(*args, **kwargs):
raise RuntimeError("Kernel functions need explicit core.run()")
run_on_core.__artiq_kernel__ = True
return run_on_core return run_on_core
return inner_decorator
elif arg is None:
def inner_decorator(function):
return kernel(function, flags)
return inner_decorator
else:
return kernel("core", flags)(arg)
def portable(arg=None, flags={}):
def portable(function):
"""Decorates a function or method to be executed on the same device (host/core device) as the caller."""
_register_module_of(function)
return function
def nac3(cls):
""" """
This decorator marks a function for execution on the same device as its Decorates a class to be analyzed by NAC3.
caller. All classes containing kernels or portable methods must use this decorator.
In other words, a decorated function called from the interpreter on the
host will be executed on the host (no compilation and execution on the
core device). A decorated function called from a kernel will be executed
on the core device (no RPC).
This decorator must be present in the global namespace of all modules using
it for the import cache to work properly.
""" """
if arg is None: _register_module_of(cls)
def inner_decorator(function): return cls
return portable(function, flags)
return inner_decorator
else:
arg.artiq_embedded = \
_ARTIQEmbeddedInfo(core_name=None, portable=True, function=arg, syscall=None,
forbidden=False, flags=set(flags))
return arg
def rpc(arg=None, flags={}): def rpc(arg=None, flags={}):
""" """
This decorator marks a function for execution on the host interpreter. This decorator marks a function for execution on the host interpreter.
This is also the default behavior of ARTIQ; however, this decorator allows
specifying additional flags.
""" """
if arg is None: if arg is None:
def inner_decorator(function): def inner_decorator(function):
return rpc(function, flags) return rpc(function, flags)
return inner_decorator return inner_decorator
else:
arg.artiq_embedded = \
_ARTIQEmbeddedInfo(core_name=None, portable=False, function=arg, syscall=None,
forbidden=False, flags=set(flags))
return arg
def syscall(arg=None, flags={}):
"""
This decorator marks a function as a system call. When executed on a core
device, a C function with the provided name (or the same name as
the Python function, if not provided) will be called. When executed on
host, the Python function will be called as usual.
Every argument and the return value must be annotated with ARTIQ types.
Only drivers should normally define syscalls.
"""
if isinstance(arg, str):
def inner_decorator(function):
function.artiq_embedded = \
_ARTIQEmbeddedInfo(core_name=None, portable=False, function=None,
syscall=arg, forbidden=False,
flags=set(flags))
return function
return inner_decorator
elif arg is None:
def inner_decorator(function):
return syscall(function.__name__, flags)(function)
return inner_decorator
else:
return syscall(arg.__name__)(arg)
def host_only(function):
"""
This decorator marks a function so that it can only be executed
in the host Python interpreter.
"""
function.artiq_embedded = \
_ARTIQEmbeddedInfo(core_name=None, portable=False, function=None, syscall=None,
forbidden=True, flags={})
return function
def kernel_from_string(parameters, body_code, decorator=kernel): @nac3
"""Build a kernel function from the supplied source code in string form, class KernelContextManager:
similar to ``exec()``/``eval()``. @kernel
Operating on pieces of source code as strings is a very brittle form of
metaprogramming; kernels generated like this are hard to debug, and
inconvenient to write. Nevertheless, this can sometimes be useful to work
around restrictions in ARTIQ Python. In that instance, care should be taken
to keep string-generated code to a minimum and cleanly separate it from
surrounding code.
The resulting function declaration is also evaluated using ``exec()`` for
use from host Python code. To encourage a modicum of code hygiene, no
global symbols are available by default; any objects accessed by the
function body must be passed in explicitly as parameters.
:param parameters: A list of parameter names the generated functions
accepts. Each entry can either be a string or a tuple of two strings;
if the latter, the second element specifies the type annotation.
:param body_code: The code for the function body, in string form.
``return`` statements can be used to return values, as usual.
:param decorator: One of ``kernel`` or ``portable`` (optionally with
parameters) to specify how the function will be executed.
:return: The function generated from the arguments.
"""
# Build complete function declaration.
decl = "def kernel_from_string_fn("
for p in parameters:
type_annotation = ""
if isinstance(p, tuple):
name, typ = p
type_annotation = ": " + typ
else:
name = p
decl += name + type_annotation + ","
decl += "):\n"
decl += "\n".join(" " + line for line in body_code.split("\n"))
# Evaluate to get host-side function declaration.
context = {}
try:
exec(decl, context)
except SyntaxError:
raise SyntaxError("Error parsing kernel function: '{}'".format(decl))
fn = decorator(context["kernel_from_string_fn"])
# Save source code for the compiler to pick up later.
fn.artiq_embedded = fn.artiq_embedded._replace(function=decl)
return fn
class _DummyTimeManager:
def _not_implemented(self, *args, **kwargs):
raise NotImplementedError(
"Attempted to interpret kernel without a time manager")
enter_sequential = _not_implemented
enter_parallel = _not_implemented
exit = _not_implemented
take_time_mu = _not_implemented
get_time_mu = _not_implemented
set_time_mu = _not_implemented
take_time = _not_implemented
_time_manager = _DummyTimeManager()
def set_time_manager(time_manager):
"""Set the time manager used for simulating kernels by running them
directly inside the Python interpreter. The time manager responds to the
entering and leaving of interleave/parallel/sequential blocks, delays, etc. and
provides a time-stamped logging facility for events.
"""
global _time_manager
_time_manager = time_manager
class _Sequential:
"""In a sequential block, statements are executed one after another, with
the time increasing as one moves down the statement list."""
def __enter__(self): def __enter__(self):
_time_manager.enter_sequential() pass
def __exit__(self, type, value, traceback): @kernel
_time_manager.exit() def __exit__(self):
sequential = _Sequential() pass
parallel = KernelContextManager()
sequential = KernelContextManager()
class _Parallel:
"""In a parallel block, all top-level statements start their execution at
the same time.
The execution time of a parallel block is the execution time of its longest
statement. A parallel block may contain sequential blocks, which themselves
may contain interleave blocks, etc.
"""
def __enter__(self):
_time_manager.enter_parallel()
def __exit__(self, type, value, traceback):
_time_manager.exit()
parallel = _Parallel()
interleave = _Parallel() # no difference in semantics on host
def delay_mu(duration):
"""Increases the RTIO time by the given amount (in machine units)."""
_time_manager.take_time_mu(duration)
def now_mu():
"""Retrieve the current RTIO timeline cursor, in machine units.
Note the conceptual difference between this and the current value of the
hardware RTIO counter; see e.g.
:meth:`artiq.coredevice.core.Core.get_rtio_counter_mu` for the latter.
"""
return _time_manager.get_time_mu()
def at_mu(time):
"""Sets the RTIO time to the specified absolute value, in machine units."""
_time_manager.set_time_mu(time)
def delay(duration):
"""Increases the RTIO time by the given amount (in seconds)."""
_time_manager.take_time(duration)
class _DummyWatchdog: class _DummyWatchdog:

View File

@ -80,6 +80,9 @@ def file_import(filename, prefix="file_import_"):
try: try:
spec = importlib.util.spec_from_file_location(modname, filename) spec = importlib.util.spec_from_file_location(modname, filename)
module = importlib.util.module_from_spec(spec) module = importlib.util.module_from_spec(spec)
# Add to sys.modules, otherwise inspect thinks it is a built-in module and often breaks.
# This must take place before module execution because NAC3 decorators call inspect.
sys.modules[modname] = module
spec.loader.exec_module(module) spec.loader.exec_module(module)
finally: finally:
sys.path.remove(path) sys.path.remove(path)