master: store datasets in LMDB (#1743)

pull/2082/head
Sebastien Bourdeauducq 2023-04-24 17:34:30 +08:00
parent 9331911139
commit 4056168875
8 changed files with 88 additions and 55 deletions

View File

@ -25,6 +25,7 @@ Highlights:
* Full Python 3.10 support.
* Distributed DMA is now supported, allowing DMA to be run directly on satellites for corresponding
RTIO events, increasing bandwidth in scenarios with heavy satellite usage.
* Persistent datasets are now stored in a LMDB database for improved performance.
ARTIQ-7
-------

View File

@ -30,8 +30,9 @@ def main():
device_db_path = os.path.join(os.path.dirname(sys.argv[1]), "device_db.py")
device_mgr = DeviceManager(DeviceDB(device_db_path))
dataset_db_path = os.path.join(os.path.dirname(sys.argv[1]), "dataset_db.pyon")
dataset_mgr = DatasetManager(DatasetDB(dataset_db_path))
dataset_db_path = os.path.join(os.path.dirname(sys.argv[1]), "dataset_db.mdb")
dataset_db = DatasetDB(dataset_db_path)
dataset_mgr = DatasetManager()
argument_mgr = ProcessArgumentManager({})
@ -68,5 +69,7 @@ def main():
benchmark(lambda: target.strip(elf_shlib),
"Stripping debug information")
dataset_db.close_db()
if __name__ == "__main__":
main()

View File

@ -24,7 +24,7 @@ def get_argparser():
common_args.verbosity_args(parser)
parser.add_argument("--device-db", default="device_db.py",
help="device database file (default: '%(default)s')")
parser.add_argument("--dataset-db", default="dataset_db.pyon",
parser.add_argument("--dataset-db", default="dataset_db.mdb",
help="dataset file (default: '%(default)s')")
parser.add_argument("-c", "--class-name", default=None,
@ -45,29 +45,33 @@ def main():
common_args.init_logger_from_args(args)
device_mgr = DeviceManager(DeviceDB(args.device_db))
dataset_mgr = DatasetManager(DatasetDB(args.dataset_db))
dataset_db = DatasetDB(args.dataset_db)
try:
module = file_import(args.file, prefix="artiq_run_")
exp = get_experiment(module, args.class_name)
arguments = parse_arguments(args.arguments)
argument_mgr = ProcessArgumentManager(arguments)
exp_inst = exp((device_mgr, dataset_mgr, argument_mgr, {}))
argument_mgr.check_unprocessed_arguments()
dataset_mgr = DatasetManager(dataset_db)
try:
module = file_import(args.file, prefix="artiq_run_")
exp = get_experiment(module, args.class_name)
arguments = parse_arguments(args.arguments)
argument_mgr = ProcessArgumentManager(arguments)
exp_inst = exp((device_mgr, dataset_mgr, argument_mgr, {}))
argument_mgr.check_unprocessed_arguments()
if not hasattr(exp.run, "artiq_embedded"):
raise ValueError("Experiment entry point must be a kernel")
core_name = exp.run.artiq_embedded.core_name
core = getattr(exp_inst, core_name)
if not hasattr(exp.run, "artiq_embedded"):
raise ValueError("Experiment entry point must be a kernel")
core_name = exp.run.artiq_embedded.core_name
core = getattr(exp_inst, core_name)
object_map, kernel_library, _, _ = \
core.compile(exp.run, [exp_inst], {},
attribute_writeback=False, print_as_rpc=False)
except CompileError as error:
return
object_map, kernel_library, _, _ = \
core.compile(exp.run, [exp_inst], {},
attribute_writeback=False, print_as_rpc=False)
except CompileError as error:
return
finally:
device_mgr.close_devices()
finally:
device_mgr.close_devices()
dataset_db.close_db()
if object_map.has_rpc():
raise ValueError("Experiment must not use RPC")

View File

@ -40,7 +40,7 @@ def get_argparser():
group = parser.add_argument_group("databases")
group.add_argument("--device-db", default="device_db.py",
help="device database file (default: '%(default)s')")
group.add_argument("--dataset-db", default="dataset_db.pyon",
group.add_argument("--dataset-db", default="dataset_db.mdb",
help="dataset file (default: '%(default)s')")
group = parser.add_argument_group("repository")
@ -101,6 +101,7 @@ def main():
device_db = DeviceDB(args.device_db)
dataset_db = DatasetDB(args.dataset_db)
atexit.register(dataset_db.close_db)
dataset_db.start(loop=loop)
atexit_register_coroutine(dataset_db.stop, loop=loop)
worker_handlers = dict()

View File

@ -134,7 +134,7 @@ def get_argparser(with_file=True):
common_args.verbosity_args(parser)
parser.add_argument("--device-db", default="device_db.py",
help="device database file (default: '%(default)s')")
parser.add_argument("--dataset-db", default="dataset_db.pyon",
parser.add_argument("--dataset-db", default="dataset_db.mdb",
help="dataset file (default: '%(default)s')")
parser.add_argument("-c", "--class-name", default=None,
@ -197,29 +197,32 @@ def run(with_file=False):
virtual_devices={"scheduler": DummyScheduler(),
"ccb": DummyCCB()})
dataset_db = DatasetDB(args.dataset_db)
dataset_mgr = DatasetManager(dataset_db)
try:
exp_inst = _build_experiment(device_mgr, dataset_mgr, args)
exp_inst.prepare()
exp_inst.run()
exp_inst.analyze()
except CompileError as error:
return
except Exception as exn:
if hasattr(exn, "artiq_core_exception"):
print(exn.artiq_core_exception, file=sys.stderr)
raise exn
finally:
device_mgr.close_devices()
dataset_mgr = DatasetManager(dataset_db)
if args.hdf5 is not None:
with h5py.File(args.hdf5, "w") as f:
dataset_mgr.write_hdf5(f)
else:
for k, v in sorted(dataset_mgr.local.items(), key=itemgetter(0)):
print("{}: {}".format(k, v))
dataset_db.save()
try:
exp_inst = _build_experiment(device_mgr, dataset_mgr, args)
exp_inst.prepare()
exp_inst.run()
exp_inst.analyze()
except CompileError as error:
return
except Exception as exn:
if hasattr(exn, "artiq_core_exception"):
print(exn.artiq_core_exception, file=sys.stderr)
raise exn
finally:
device_mgr.close_devices()
if args.hdf5 is not None:
with h5py.File(args.hdf5, "w") as f:
dataset_mgr.write_hdf5(f)
else:
for k, v in sorted(dataset_mgr.local.items(), key=itemgetter(0)):
print("{}: {}".format(k, v))
dataset_db.save()
finally:
dataset_db.close_db()
def main():

View File

@ -1,11 +1,13 @@
import asyncio
from artiq.tools import file_import
import lmdb
from sipyco.sync_struct import Notifier, process_mod, update_from_dict
from sipyco.sync_struct import Notifier, process_mod, ModAction, update_from_dict
from sipyco import pyon
from sipyco.asyncio_tools import TaskObject
from artiq.tools import file_import
def device_db_from_file(filename):
mod = file_import(filename)
@ -40,15 +42,25 @@ class DatasetDB(TaskObject):
self.persist_file = persist_file
self.autosave_period = autosave_period
try:
file_data = pyon.load_file(self.persist_file)
except FileNotFoundError:
file_data = dict()
self.data = Notifier({k: (True, v) for k, v in file_data.items()})
self.lmdb = lmdb.open(persist_file, subdir=False, map_size=2**30)
data = dict()
with self.lmdb.begin() as txn:
for key, value in txn.cursor():
data[key.decode()] = (True, pyon.decode(value.decode()))
self.data = Notifier(data)
self.pending_keys = set()
def close_db(self):
self.lmdb.close()
def save(self):
data = {k: v[1] for k, v in self.data.raw_view.items() if v[0]}
pyon.store_file(self.persist_file, data)
with self.lmdb.begin(write=True) as txn:
for key in self.pending_keys:
if key not in self.data.raw_view or not self.data.raw_view[key][0]:
txn.delete(key.encode())
else:
txn.put(key.encode(), pyon.encode(self.data.raw_view[key][1]).encode())
self.pending_keys.clear()
async def _do(self):
try:
@ -62,6 +74,12 @@ class DatasetDB(TaskObject):
return self.data.raw_view[key][1]
def update(self, mod):
if mod["path"]:
key = mod["path"][0]
else:
assert(mod["action"] == ModAction.setitem.value or mod["action"] == ModAction.delitem.value)
key = mod["key"]
self.pending_keys.add(key)
process_mod(self.data, mod)
# convenience functions (update() can be used instead)
@ -72,7 +90,9 @@ class DatasetDB(TaskObject):
else:
persist = False
self.data[key] = (persist, value)
self.pending_keys.add(key)
def delete(self, key):
del self.data[key]
self.pending_keys.add(key)
#

View File

@ -21,13 +21,14 @@ class ExperimentCase(unittest.TestCase):
def setUp(self):
self.device_db = DeviceDB(os.path.join(artiq_root, "device_db.py"))
self.dataset_db = DatasetDB(
os.path.join(artiq_root, "dataset_db.pyon"))
os.path.join(artiq_root, "dataset_db.mdb"))
self.device_mgr = DeviceManager(
self.device_db, virtual_devices={"scheduler": DummyScheduler()})
self.dataset_mgr = DatasetManager(self.dataset_db)
def tearDown(self):
self.device_mgr.close_devices()
self.dataset_db.close_db()
def create(self, cls, *args, **kwargs):
try:

View File

@ -127,7 +127,7 @@
nativeBuildInputs = [ pkgs.qt5.wrapQtAppsHook ];
# keep llvm_x and lld_x in sync with llvmlite
propagatedBuildInputs = [ pkgs.llvm_11 pkgs.lld_11 sipyco.packages.x86_64-linux.sipyco pythonparser artiq-comtools.packages.x86_64-linux.artiq-comtools ]
++ (with pkgs.python3Packages; [ llvmlite pyqtgraph pygit2 numpy dateutil scipy prettytable pyserial levenshtein h5py pyqt5 qasync tqdm ]);
++ (with pkgs.python3Packages; [ llvmlite pyqtgraph pygit2 numpy dateutil scipy prettytable pyserial levenshtein h5py pyqt5 qasync tqdm lmdb ]);
dontWrapQtApps = true;
postFixup = ''