diff --git a/artiq/coredevice/comm_kernel.py b/artiq/coredevice/comm_kernel.py index 3d5b8dea9..b6ffb8ee7 100644 --- a/artiq/coredevice/comm_kernel.py +++ b/artiq/coredevice/comm_kernel.py @@ -23,6 +23,8 @@ class Request(Enum): RPCReply = 7 RPCException = 8 + SubkernelUpload = 9 + class Reply(Enum): SystemInfo = 2 @@ -208,6 +210,7 @@ class CommKernel: self.unpack_float64 = struct.Struct(self.endian + "d").unpack self.pack_header = struct.Struct(self.endian + "lB").pack + self.pack_int8 = struct.Struct(self.endian + "B").pack self.pack_int32 = struct.Struct(self.endian + "l").pack self.pack_int64 = struct.Struct(self.endian + "q").pack self.pack_float64 = struct.Struct(self.endian + "d").pack @@ -322,7 +325,7 @@ class CommKernel: self._write(chunk) def _write_int8(self, value): - self._write(value) + self._write(self.pack_int8(value)) def _write_int32(self, value): self._write(self.pack_int32(value)) @@ -382,6 +385,19 @@ class CommKernel: else: self._read_expect(Reply.LoadCompleted) + def upload_subkernel(self, kernel_library, id, destination): + self._write_header(Request.SubkernelUpload) + self._write_int32(id) + self._write_int8(destination) + self._write_bytes(kernel_library) + self._flush() + + self._read_header() + if self._read_type == Reply.LoadFailed: + raise LoadError(self._read_string()) + else: + self._read_expect(Reply.LoadCompleted) + def run(self): self._write_empty(Request.RunKernel) self._flush() diff --git a/artiq/coredevice/core.py b/artiq/coredevice/core.py index 928d7eb17..f5a872335 100644 --- a/artiq/coredevice/core.py +++ b/artiq/coredevice/core.py @@ -1,5 +1,6 @@ import os, sys import numpy +from inspect import getfullargspec from functools import wraps from pythonparser import diagnostic @@ -103,12 +104,13 @@ class Core: def compile(self, function, args, kwargs, set_result=None, attribute_writeback=True, print_as_rpc=True, - target=None): + target=None, destination=0, subkernel_arg_types=[]): try: engine = _DiagnosticEngine(all_errors_are_fatal=True) stitcher = Stitcher(engine=engine, core=self, dmgr=self.dmgr, - print_as_rpc=print_as_rpc) + print_as_rpc=print_as_rpc, + destination=destination, subkernel_arg_types=subkernel_arg_types) stitcher.stitch_call(function, args, kwargs, set_result) stitcher.finalize() @@ -122,7 +124,8 @@ class Core: return stitcher.embedding_map, stripped_library, \ lambda addresses: target.symbolize(library, addresses), \ - lambda symbols: target.demangle(symbols) + lambda symbols: target.demangle(symbols), \ + module.subkernel_arg_types except diagnostic.Error as error: raise CompileError(error.diagnostic) from error @@ -140,11 +143,32 @@ class Core: def set_result(new_result): nonlocal result result = new_result - embedding_map, kernel_library, symbolizer, demangler = \ + embedding_map, kernel_library, symbolizer, demangler, subkernel_arg_types = \ self.compile(function, args, kwargs, set_result) + self.compile_subkernels(embedding_map, args, subkernel_arg_types) self._run_compiled(kernel_library, embedding_map, symbolizer, demangler) return result + def compile_subkernels(self, embedding_map, args, subkernel_arg_types): + for sid, subkernel_fn in embedding_map.subkernels().items(): + # pass self to subkernels (if applicable) + # assuming the first argument is self + subkernel_args = getfullargspec(subkernel_fn.artiq_embedded.function) + self_arg = [] + if len(subkernel_args[0]) > 0: + if subkernel_args[0][0] == 'self': + self_arg = args[:1] + destination = subkernel_fn.artiq_embedded.destination + destination_tgt = self.dmgr.ddb.get_satellite_cpu_target(destination) + target = get_target_cls(destination_tgt)(subkernel_id=sid) + object_map, kernel_library, _, _, _ = \ + self.compile(subkernel_fn, self_arg, {}, attribute_writeback=False, + print_as_rpc=False, target=target, destination=destination, + subkernel_arg_types=subkernel_arg_types.get(sid, [])) + if object_map.has_rpc_or_subkernel(): + raise ValueError("Subkernel must not use RPC or subkernels in other destinations") + self.comm.upload_subkernel(kernel_library, sid, destination) + def precompile(self, function, *args, **kwargs): """Precompile a kernel and return a callable that executes it on the core device at a later time. @@ -153,7 +177,7 @@ class Core: as additional positional and keyword arguments. The returned callable accepts no arguments. - Precompiled kernels may use RPCs. + Precompiled kernels may use RPCs and subkernels. Object attributes at the beginning of a precompiled kernel execution have the values they had at precompilation time. If up-to-date values are required, @@ -178,8 +202,9 @@ class Core: nonlocal result result = new_result - embedding_map, kernel_library, symbolizer, demangler = \ + embedding_map, kernel_library, symbolizer, demangler, subkernel_arg_types = \ self.compile(function, args, kwargs, set_result, attribute_writeback=False) + self.compile_subkernels(embedding_map, args, subkernel_arg_types) @wraps(function) def run_precompiled(): diff --git a/artiq/coredevice/exceptions.py b/artiq/coredevice/exceptions.py index 7b6967743..b40d3b552 100644 --- a/artiq/coredevice/exceptions.py +++ b/artiq/coredevice/exceptions.py @@ -148,6 +148,13 @@ class DMAError(Exception): artiq_builtin = True +class SubkernelError(Exception): + """Raised when an operation regarding a subkernel is invalid + or cannot be completed. + """ + artiq_builtin = True + + class ClockFailure(Exception): """Raised when RTIO PLL has lost lock.""" diff --git a/artiq/master/databases.py b/artiq/master/databases.py index db5760d31..ccdd6d022 100644 --- a/artiq/master/databases.py +++ b/artiq/master/databases.py @@ -36,6 +36,9 @@ class DeviceDB: desc = self.data.raw_view[desc] return desc + def get_satellite_cpu_target(self, destination): + return self.data.raw_view["satellite_cpu_targets"][destination] + class DatasetDB(TaskObject): def __init__(self, persist_file, autosave_period=30):