forked from M-Labs/artiq
148 lines
4.8 KiB
Python
148 lines
4.8 KiB
Python
import asyncio
|
|
|
|
import lmdb
|
|
|
|
from sipyco.sync_struct import (Notifier, process_mod, ModAction,
|
|
update_from_dict)
|
|
from sipyco import pyon
|
|
from sipyco.asyncio_tools import TaskObject
|
|
|
|
from artiq.tools import file_import
|
|
|
|
|
|
def device_db_from_file(filename):
    """Load ``filename`` with ``file_import`` and return its ``device_db``.

    Raises ``KeyError`` when the loaded module does not define
    ``device_db``.
    """
    module = file_import(filename)

    # Look the name up in the module namespace instead of using direct
    # attribute access: for backwards compatibility of the exception
    # interface, a missing device_db must raise KeyError and not
    # AttributeError.
    namespace = module.__dict__
    return namespace["device_db"]
|
|
|
|
|
|
class DeviceDB:
    """Device database loaded from a file and published via a ``Notifier``.

    The content returned by ``device_db_from_file(backing_file)`` is wrapped
    in a ``Notifier`` stored in ``self.data``; read accessors go through
    ``self.data.raw_view``.
    """

    def __init__(self, backing_file):
        """Load the device database from ``backing_file``."""
        self.backing_file = backing_file
        self.data = Notifier(device_db_from_file(self.backing_file))

    def scan(self):
        """Reload the backing file and merge its content into ``self.data``."""
        update_from_dict(self.data, device_db_from_file(self.backing_file))

    def get_device_db(self):
        """Return the raw (non-notifying) view of the whole database."""
        return self.data.raw_view

    def get(self, key, resolve_alias=False):
        """Return the entry stored under ``key``.

        An entry that is a string is an alias naming another entry. With
        ``resolve_alias=True`` aliases are followed until a non-string
        entry is found; otherwise the entry is returned as stored.

        Raises ``KeyError`` if ``key`` (or an alias target) is missing and
        ``ValueError`` if the alias chain loops back on itself.
        """
        devices = self.data.raw_view
        desc = devices[key]
        if resolve_alias:
            # Remember every name already visited so that a cyclic alias
            # chain is reported instead of looping forever.
            visited = {key}
            while isinstance(desc, str):
                if desc in visited:
                    raise ValueError(
                        "cyclic alias in device database: {!r}".format(key))
                visited.add(desc)
                desc = devices[desc]
        return desc

    def get_satellite_cpu_target(self, destination):
        """Return the ``"satellite_cpu_targets"`` entry for ``destination``."""
        return self.data.raw_view["satellite_cpu_targets"][destination]
|
|
|
|
|
|
class DatasetDB(TaskObject):
    """Dataset store published through a ``Notifier`` and persisted in LMDB.

    Every entry of ``self.data`` is a ``(persist, value, metadata)`` tuple.
    Keys modified since the last :meth:`save` are collected in
    ``self.pending_keys``; :meth:`save` writes the entries whose ``persist``
    flag is true and removes the others from the LMDB file.
    """

    def __init__(self, persist_file, autosave_period=30):
        """Open ``persist_file`` with LMDB and load every stored dataset.

        ``autosave_period`` is the delay handed to ``asyncio.sleep`` between
        two automatic saves in :meth:`_do`.
        """
        self.persist_file = persist_file
        self.autosave_period = autosave_period

        self.lmdb = lmdb.open(persist_file, subdir=False, map_size=2**30)
        stored = {}
        with self.lmdb.begin() as txn:
            for raw_key, raw_entry in txn.cursor():
                value, metadata = pyon.decode(raw_entry.decode())
                # Anything read back from the file is marked as persistent.
                stored[raw_key.decode()] = (True, value, metadata)
        self.data = Notifier(stored)
        self.pending_keys = set()

    def close_db(self):
        """Close the LMDB environment."""
        self.lmdb.close()

    def save(self):
        """Flush all pending keys to LMDB in one write transaction."""
        datasets = self.data.raw_view
        with self.lmdb.begin(write=True) as txn:
            for key in self.pending_keys:
                if key in datasets and datasets[key][0]:
                    entry = datasets[key]
                    payload = (entry[1], entry[2])
                    txn.put(key.encode(),
                            pyon.encode(payload).encode())
                else:
                    # Dataset was deleted or is not flagged as persistent.
                    txn.delete(key.encode())
        self.pending_keys.clear()

    async def _do(self):
        """Save every ``autosave_period``, and once more when the loop exits.

        NOTE(review): presumably the coroutine driven by ``TaskObject``;
        confirm against sipyco.
        """
        try:
            while True:
                await asyncio.sleep(self.autosave_period)
                self.save()
        finally:
            # Runs on any exit from the loop (e.g. cancellation) so that
            # pending changes are not lost.
            self.save()

    def get(self, key):
        """Return the value stored for dataset ``key``."""
        entry = self.data.raw_view[key]
        return entry[1]

    def get_metadata(self, key):
        """Return the metadata stored for dataset ``key``."""
        entry = self.data.raw_view[key]
        return entry[2]

    def update(self, mod):
        """Apply the modification ``mod`` and mark its dataset as pending."""
        path = mod["path"]
        if path:
            # Modification inside an existing dataset: the first path
            # element is the dataset key.
            key = path[0]
        else:
            assert mod["action"] in (ModAction.setitem.value,
                                     ModAction.delitem.value)
            key = mod["key"]
        self.pending_keys.add(key)
        process_mod(self.data, mod)

    # convenience functions (update() can be used instead)
    def set(self, key, value, persist=None, metadata=None):
        """Store ``value`` under ``key``.

        When ``persist`` or ``metadata`` is ``None``, the value of the
        existing entry is kept; for a new key they default to ``False`` and
        ``{}`` respectively.
        """
        datasets = self.data.raw_view
        if persist is None:
            persist = datasets[key][0] if key in datasets else False
        if metadata is None:
            metadata = datasets[key][2] if key in datasets else {}
        self.data[key] = (persist, value, metadata)
        self.pending_keys.add(key)

    def delete(self, key):
        """Remove dataset ``key`` and mark it as pending for the next save."""
        del self.data[key]
        self.pending_keys.add(key)
|
|
#
|
|
|
|
|
|
class InteractiveArgDB:
    """Registry of requests for interactive arguments, keyed by RID.

    ``self.pending`` publishes the description of every outstanding request
    through a ``Notifier``; ``self.futures`` holds the future each request
    is awaiting.
    """

    def __init__(self):
        self.pending = Notifier({})
        self.futures = {}

    async def get(self, rid, arglist_desc, title):
        """Register a request for ``rid`` and wait for its result.

        Returns whatever :meth:`supply` or :meth:`cancel` sets on the
        future (the supplied values, or ``None`` on cancel). The request is
        removed from ``pending`` and ``futures`` on exit, whether the wait
        succeeded or raised.
        """
        self.pending[rid] = {"title": title, "arglist_desc": arglist_desc}
        waiter = asyncio.get_running_loop().create_future()
        self.futures[rid] = waiter
        try:
            return await waiter
        finally:
            del self.pending[rid]
            del self.futures[rid]

    def supply(self, rid, values):
        """Hand ``values`` to the request registered for ``rid``.

        Raises ``ValueError`` if no request is waiting for ``rid`` or if the
        keys of ``values`` differ from the requested argument names.
        """
        # quick sanity checks
        waiter = self.futures.get(rid)
        if waiter is None or waiter.done():
            raise ValueError("no experiment with this RID is "
                             "waiting for interactive arguments")
        arglist_desc = self.pending.raw_view[rid]["arglist_desc"]
        requested = {entry[0] for entry in arglist_desc}
        if requested != set(values.keys()):
            raise ValueError("supplied and requested keys do not match")
        waiter.set_result(values)

    def cancel(self, rid):
        """Resolve the request registered for ``rid`` with ``None``.

        Raises ``ValueError`` if no request is waiting for ``rid``.
        """
        waiter = self.futures.get(rid)
        if waiter is None or waiter.done():
            raise ValueError("no experiment with this RID is "
                             "waiting for interactive arguments")
        waiter.set_result(None)
|