1
0
forked from M-Labs/nac3
nac3/nac3artiq/demo/embedding_map.py

93 lines
3.5 KiB
Python
Raw Normal View History

2022-02-12 21:17:37 +08:00
class EmbeddingMap:
    """Integer-keyed tables for host objects, strings and functions.

    Objects and strings are stored in paired forward/reverse dictionaries so
    that storing the same value twice returns the key assigned the first
    time. Objects are deduplicated by identity (``id``), strings by equality.

    Attributes:
        object_map: key -> stored object.
        object_inverse_map: ``id(obj)`` -> key of a stored object.
        string_map: key -> stored string.
        string_reverse_map: stored string -> key.
        function_map: caller-chosen key -> function.
        attributes_writeback: list, created empty; not used by this class.
        subkernel_message_map: dict, created empty; its values are read as
            keys into ``object_map`` by the ``subkernel_messages*`` methods.
    """

    def __init__(self, old_embedding_map=None):
        """Create the tables and preallocate the runtime exception names.

        Args:
            old_embedding_map: optional map providing a ``subkernels()``
                method. Every entry it returns is copied into this map
                under the same key.
        """
        self.object_inverse_map = {}
        self.object_map = {}
        self.string_map = {}
        self.string_reverse_map = {}
        self.function_map = {}
        self.attributes_writeback = []
        self.subkernel_message_map = {}

        if old_embedding_map is not None:
            # Carry subkernels over under their existing keys.
            for key, obj_ref in old_embedding_map.subkernels().items():
                self.object_map[key] = obj_ref
                self.object_inverse_map[id(obj_ref)] = key

        # preallocate exception names
        self.preallocate_runtime_exception_names([
            "RuntimeError",
            "RTIOUnderflow",
            "RTIOOverflow",
            "RTIODestinationUnreachable",
            "DMAError",
            "I2CError",
            "CacheError",
            "SPIError",
            "0:ZeroDivisionError",
            "0:IndexError",
            "0:ValueError",
            "0:RuntimeError",
            "0:AssertionError",
            "0:KeyError",
            "0:NotImplementedError",
            "0:OverflowError",
            "0:IOError",
            "0:UnwrapNoneError",
        ])

    def preallocate_runtime_exception_names(self, names):
        """Store ``names`` so that each one's string key equals its index.

        A name without a ``":"`` is first prefixed with
        ``"0:artiq.coredevice.exceptions."``.

        Raises:
            AssertionError: if a name is assigned a key different from its
                position in ``names`` (for instance because other strings
                were stored earlier, or a name is repeated).
        """
        for i, name in enumerate(names):
            if ":" not in name:
                name = "0:artiq.coredevice.exceptions." + name
            exn_id = self.store_str(name)
            assert exn_id == i

    def store_function(self, key, fun):
        """Register ``fun`` under ``key`` (replacing any previous entry) and return ``key``."""
        self.function_map[key] = fun
        return key

    def store_object(self, obj):
        """Return the key for ``obj``, allocating a new one on first use.

        Objects are matched by identity, so storing the same object again
        returns the key it already has.
        """
        obj_id = id(obj)
        if obj_id in self.object_inverse_map:
            return self.object_inverse_map[obj_id]

        key = len(self.object_map) + 1
        # Entries copied from an old map keep their original keys, which may
        # lie above the current length; skip them instead of overwriting.
        while key in self.object_map:
            key += 1

        self.object_map[key] = obj
        self.object_inverse_map[obj_id] = key
        return key

    def store_str(self, s):
        """Return the key for string ``s``, allocating the next index if new."""
        if s in self.string_reverse_map:
            return self.string_reverse_map[s]
        key = len(self.string_map)
        self.string_map[key] = s
        self.string_reverse_map[s] = key
        return key

    def retrieve_function(self, key):
        """Return the function stored under ``key``; raises ``KeyError`` if absent."""
        return self.function_map[key]

    def retrieve_object(self, key):
        """Return the object stored under ``key``; raises ``KeyError`` if absent."""
        return self.object_map[key]

    def retrieve_str(self, key):
        """Return the string stored under ``key``; raises ``KeyError`` if absent."""
        return self.string_map[key]

    def subkernels(self):
        """Return ``{key: obj}`` for stored objects that have a destination.

        An object qualifies when its ``__artiq_destination__`` attribute is
        not ``None``. Objects without the attribute are skipped.
        """
        return {
            key: obj
            for key, obj in self.object_map.items()
            if getattr(obj, "__artiq_destination__", None) is not None
        }

    def subkernel_messages(self):
        """Return ``{msg_id: obj}`` for every id in ``subkernel_message_map``'s values."""
        return {
            msg_id: self.retrieve_object(msg_id)
            for msg_id in self.subkernel_message_map.values()
        }

    def subkernel_messages_unpaired(self):
        """Return the message objects whose ``send_loc`` or ``recv_loc`` is ``None``."""
        unpaired = []
        for msg_id in self.subkernel_message_map.values():
            msg_obj = self.retrieve_object(msg_id)
            if msg_obj.send_loc is None or msg_obj.recv_loc is None:
                unpaired.append(msg_obj)
        return unpaired