artiq/artiq/master/databases.py

148 lines
4.8 KiB
Python

import asyncio
import lmdb
from sipyco.sync_struct import (Notifier, process_mod, ModAction,
update_from_dict)
from sipyco import pyon
from sipyco.asyncio_tools import TaskObject
from artiq.tools import file_import
def device_db_from_file(filename):
    """Load ``filename`` through ``file_import`` and return its ``device_db``.

    Raises ``KeyError`` when the loaded module has no ``device_db`` entry.
    """
    # The module namespace is indexed (instead of using attribute access) so
    # that a missing ``device_db`` surfaces as KeyError rather than
    # AttributeError, keeping the exception interface backwards compatible.
    namespace = file_import(filename).__dict__
    return namespace["device_db"]
class DeviceDB:
    """Device database read from a backing file and wrapped in a ``Notifier``."""

    def __init__(self, backing_file):
        """Load the device database from ``backing_file``."""
        self.backing_file = backing_file
        self.data = Notifier(device_db_from_file(self.backing_file))

    def scan(self):
        """Re-read the backing file and apply its contents to ``self.data``."""
        update_from_dict(self.data, device_db_from_file(self.backing_file))

    def get_device_db(self):
        """Return the raw (unwrapped) view of the whole device database."""
        return self.data.raw_view

    def get(self, key, resolve_alias=False):
        """Return the database entry stored under ``key``.

        An entry that is a string is treated as an alias, i.e. the name of
        another entry. With ``resolve_alias=True`` such aliases are followed
        until a non-string entry is reached; otherwise the entry is returned
        unchanged.

        Raises ``KeyError`` if ``key`` (or any alias target) is missing, or if
        the alias chain loops back onto a name that was already visited.
        """
        view = self.data.raw_view
        desc = view[key]
        if resolve_alias:
            # Remember every name already followed: without this, a cyclic
            # alias chain (e.g. "a" -> "b" -> "a") would spin forever.
            visited = {key}
            while isinstance(desc, str):
                if desc in visited:
                    raise KeyError(
                        "alias cycle detected while resolving device "
                        "{!r}".format(key))
                visited.add(desc)
                desc = view[desc]
        return desc

    def get_satellite_cpu_target(self, destination):
        """Return the ``satellite_cpu_targets`` entry for ``destination``."""
        return self.data.raw_view["satellite_cpu_targets"][destination]
class DatasetDB(TaskObject):
    """Dataset store backed by an LMDB file.

    ``self.data`` is a ``Notifier`` around a dict that maps each dataset key
    to a ``(persist, value, metadata)`` tuple. Keys touched since the last
    save are collected in ``self.pending_keys`` and written out by ``save()``.
    """

    def __init__(self, persist_file, autosave_period=30):
        """Open ``persist_file`` and load every stored dataset.

        Entries read from the file are marked as persistent.
        """
        self.persist_file = persist_file
        self.autosave_period = autosave_period
        self.lmdb = lmdb.open(persist_file, subdir=False, map_size=2**30)

        loaded = {}
        with self.lmdb.begin() as txn:
            for raw_key, raw_entry in txn.cursor():
                value, metadata = pyon.decode(raw_entry.decode())
                loaded[raw_key.decode()] = (True, value, metadata)

        self.data = Notifier(loaded)
        self.pending_keys = set()

    def close_db(self):
        """Close the LMDB environment."""
        self.lmdb.close()

    def save(self):
        """Write all pending keys to the LMDB file in one write transaction.

        A pending key that still exists with its persist flag set is stored
        as the PYON-encoded ``(value, metadata)`` pair; any other pending key
        is deleted from the file.
        """
        view = self.data.raw_view
        with self.lmdb.begin(write=True) as txn:
            for key in self.pending_keys:
                if key in view and view[key][0]:
                    entry = view[key]
                    payload = (entry[1], entry[2])
                    txn.put(key.encode(), pyon.encode(payload).encode())
                else:
                    txn.delete(key.encode())
        self.pending_keys.clear()

    async def _do(self):
        # Save every ``autosave_period`` seconds; the ``finally`` clause
        # performs one last save when the loop is left for any reason.
        try:
            while True:
                await asyncio.sleep(self.autosave_period)
                self.save()
        finally:
            self.save()

    def get(self, key):
        """Return the value stored under ``key``."""
        entry = self.data.raw_view[key]
        return entry[1]

    def get_metadata(self, key):
        """Return the metadata stored under ``key``."""
        entry = self.data.raw_view[key]
        return entry[2]

    def update(self, mod):
        """Apply the modification ``mod`` and mark the affected key pending."""
        path = mod["path"]
        if path:
            # Modification inside an existing entry: the dataset key is the
            # first path element.
            key = path[0]
        else:
            assert mod["action"] in (ModAction.setitem.value,
                                     ModAction.delitem.value)
            key = mod["key"]
        self.pending_keys.add(key)
        process_mod(self.data, mod)

    # convenience functions (update() can be used instead)
    def set(self, key, value, persist=None, metadata=None):
        """Store ``value`` under ``key``.

        When ``persist`` or ``metadata`` is ``None``, the setting of an
        existing entry is kept; for a new key they default to ``False`` and
        ``{}`` respectively.
        """
        view = self.data.raw_view
        if persist is None:
            persist = view[key][0] if key in view else False
        if metadata is None:
            metadata = view[key][2] if key in view else {}
        self.data[key] = (persist, value, metadata)
        self.pending_keys.add(key)

    def delete(self, key):
        """Remove ``key`` and mark it pending so the next save drops it."""
        del self.data[key]
        self.pending_keys.add(key)
#
class InteractiveArgDB:
    """Registry of experiments waiting for interactive arguments.

    ``pending`` is a ``Notifier`` around a dict mapping each RID to the
    request's title and argument description; ``futures`` maps each RID to
    the future awaited by ``get()``.
    """

    def __init__(self):
        self.pending = Notifier({})
        self.futures = {}

    async def get(self, rid, arglist_desc, title):
        """Register a request for ``rid`` and wait until it is answered.

        Returns whatever ``supply()`` or ``cancel()`` sets as the result.
        The request is unregistered again however the wait ends.
        """
        self.pending[rid] = {"title": title, "arglist_desc": arglist_desc}
        waiter = asyncio.get_running_loop().create_future()
        self.futures[rid] = waiter
        try:
            return await waiter
        finally:
            del self.pending[rid]
            del self.futures[rid]

    def supply(self, rid, values):
        """Answer the request for ``rid`` with ``values``.

        Raises ``ValueError`` if no request for ``rid`` is waiting, or if the
        keys of ``values`` differ from the requested argument names.
        """
        # quick sanity checks
        waiter = self.futures.get(rid)
        if waiter is None or waiter.done():
            raise ValueError("no experiment with this RID is "
                             "waiting for interactive arguments")
        arglist_desc = self.pending.raw_view[rid]["arglist_desc"]
        requested = {entry[0] for entry in arglist_desc}
        supplied = set(values.keys())
        if requested != supplied:
            raise ValueError("supplied and requested keys do not match")
        waiter.set_result(values)

    def cancel(self, rid):
        """Answer the request for ``rid`` with ``None``.

        Raises ``ValueError`` if no request for ``rid`` is waiting.
        """
        waiter = self.futures.get(rid)
        if waiter is None or waiter.done():
            raise ValueError("no experiment with this RID is "
                             "waiting for interactive arguments")
        waiter.set_result(None)