forked from M-Labs/nac3
93 lines
3.5 KiB
Python
93 lines
3.5 KiB
Python
class EmbeddingMap:
|
|
def __init__(self, old_embedding_map=None):
|
|
self.object_inverse_map = {}
|
|
self.object_map = {}
|
|
self.string_map = {}
|
|
self.string_reverse_map = {}
|
|
self.function_map = {}
|
|
self.attributes_writeback = []
|
|
self.subkernel_message_map = {}
|
|
|
|
if not old_embedding_map is None:
|
|
for key, obj_ref in old_embedding_map.subkernels().items():
|
|
self.object_map[key] = obj_ref
|
|
obj_id = id(obj_ref)
|
|
self.object_inverse_map[obj_id] = key
|
|
|
|
# preallocate exception names
|
|
self.preallocate_runtime_exception_names(["RuntimeError",
|
|
"RTIOUnderflow",
|
|
"RTIOOverflow",
|
|
"RTIODestinationUnreachable",
|
|
"DMAError",
|
|
"I2CError",
|
|
"CacheError",
|
|
"SPIError",
|
|
"0:ZeroDivisionError",
|
|
"0:IndexError",
|
|
"0:ValueError",
|
|
"0:RuntimeError",
|
|
"0:AssertionError",
|
|
"0:KeyError",
|
|
"0:NotImplementedError",
|
|
"0:OverflowError",
|
|
"0:IOError",
|
|
"0:UnwrapNoneError"])
|
|
|
|
def preallocate_runtime_exception_names(self, names):
|
|
for i, name in enumerate(names):
|
|
if ":" not in name:
|
|
name = "0:artiq.coredevice.exceptions." + name
|
|
exn_id = self.store_str(name)
|
|
assert exn_id == i
|
|
|
|
def store_function(self, key, fun):
|
|
self.function_map[key] = fun
|
|
return key
|
|
|
|
def store_object(self, obj):
|
|
obj_id = id(obj)
|
|
if obj_id in self.object_inverse_map:
|
|
return self.object_inverse_map[obj_id]
|
|
key = len(self.object_map) + 1
|
|
self.object_map[key] = obj
|
|
self.object_inverse_map[obj_id] = key
|
|
return key
|
|
|
|
def store_str(self, s):
|
|
if s in self.string_reverse_map:
|
|
return self.string_reverse_map[s]
|
|
key = len(self.string_map)
|
|
self.string_map[key] = s
|
|
self.string_reverse_map[s] = key
|
|
return key
|
|
|
|
def retrieve_function(self, key):
|
|
return self.function_map[key]
|
|
|
|
def retrieve_object(self, key):
|
|
return self.object_map[key]
|
|
|
|
def retrieve_str(self, key):
|
|
return self.string_map[key]
|
|
|
|
def subkernels(self):
|
|
subkernels = {}
|
|
for k, v in self.object_map.items():
|
|
if getattr(v, "__artiq_destination__") is not None:
|
|
subkernels[k] = v
|
|
return subkernels
|
|
|
|
def subkernel_messages(self):
|
|
messages = {}
|
|
for msg_id in self.subkernel_message_map.values():
|
|
messages[msg_id] = self.retrieve_object(msg_id)
|
|
return messages
|
|
|
|
def subkernel_messages_unpaired(self):
|
|
unpaired = []
|
|
for msg_id in self.subkernel_message_map.values():
|
|
msg_obj = self.retrieve_object(msg_id)
|
|
if msg_obj.send_loc is None or msg_obj.recv_loc is None:
|
|
unpaired.append(msg_obj)
|
|
return unpaired |