From 40561688753cc74db06e61f578fc9481c0b3426b Mon Sep 17 00:00:00 2001 From: Sebastien Bourdeauducq Date: Mon, 24 Apr 2023 17:34:30 +0800 Subject: [PATCH] master: store datasets in LMDB (#1743) --- RELEASE_NOTES.rst | 1 + artiq/compiler/testbench/perf_embedding.py | 7 +++- artiq/frontend/artiq_compile.py | 42 ++++++++++--------- artiq/frontend/artiq_master.py | 3 +- artiq/frontend/artiq_run.py | 47 ++++++++++++---------- artiq/master/databases.py | 38 ++++++++++++----- artiq/test/hardware_testbench.py | 3 +- flake.nix | 2 +- 8 files changed, 88 insertions(+), 55 deletions(-) diff --git a/RELEASE_NOTES.rst b/RELEASE_NOTES.rst index 590ade183..cd41ac526 100644 --- a/RELEASE_NOTES.rst +++ b/RELEASE_NOTES.rst @@ -25,6 +25,7 @@ Highlights: * Full Python 3.10 support. * Distributed DMA is now supported, allowing DMA to be run directly on satellites for corresponding RTIO events, increasing bandwidth in scenarios with heavy satellite usage. +* Persistent datasets are now stored in a LMDB database for improved performance. ARTIQ-7 ------- diff --git a/artiq/compiler/testbench/perf_embedding.py b/artiq/compiler/testbench/perf_embedding.py index 75267cb5b..5d148cb34 100644 --- a/artiq/compiler/testbench/perf_embedding.py +++ b/artiq/compiler/testbench/perf_embedding.py @@ -30,8 +30,9 @@ def main(): device_db_path = os.path.join(os.path.dirname(sys.argv[1]), "device_db.py") device_mgr = DeviceManager(DeviceDB(device_db_path)) - dataset_db_path = os.path.join(os.path.dirname(sys.argv[1]), "dataset_db.pyon") - dataset_mgr = DatasetManager(DatasetDB(dataset_db_path)) + dataset_db_path = os.path.join(os.path.dirname(sys.argv[1]), "dataset_db.mdb") + dataset_db = DatasetDB(dataset_db_path) + dataset_mgr = DatasetManager() argument_mgr = ProcessArgumentManager({}) @@ -68,5 +69,7 @@ def main(): benchmark(lambda: target.strip(elf_shlib), "Stripping debug information") + dataset_db.close_db() + if __name__ == "__main__": main() diff --git a/artiq/frontend/artiq_compile.py b/artiq/frontend/artiq_compile.py index 918a58c65..fcba5297d 100755 --- a/artiq/frontend/artiq_compile.py +++ b/artiq/frontend/artiq_compile.py @@ -24,7 +24,7 @@ def get_argparser(): common_args.verbosity_args(parser) parser.add_argument("--device-db", default="device_db.py", help="device database file (default: '%(default)s')") - parser.add_argument("--dataset-db", default="dataset_db.pyon", + parser.add_argument("--dataset-db", default="dataset_db.mdb", help="dataset file (default: '%(default)s')") parser.add_argument("-c", "--class-name", default=None, @@ -45,29 +45,33 @@ def main(): common_args.init_logger_from_args(args) device_mgr = DeviceManager(DeviceDB(args.device_db)) - dataset_mgr = DatasetManager(DatasetDB(args.dataset_db)) - + dataset_db = DatasetDB(args.dataset_db) try: - module = file_import(args.file, prefix="artiq_run_") - exp = get_experiment(module, args.class_name) - arguments = parse_arguments(args.arguments) - argument_mgr = ProcessArgumentManager(arguments) - exp_inst = exp((device_mgr, dataset_mgr, argument_mgr, {})) - argument_mgr.check_unprocessed_arguments() + dataset_mgr = DatasetManager(dataset_db) + + try: + module = file_import(args.file, prefix="artiq_run_") + exp = get_experiment(module, args.class_name) + arguments = parse_arguments(args.arguments) + argument_mgr = ProcessArgumentManager(arguments) + exp_inst = exp((device_mgr, dataset_mgr, argument_mgr, {})) + argument_mgr.check_unprocessed_arguments() - if not hasattr(exp.run, "artiq_embedded"): - raise ValueError("Experiment entry point must be a kernel") - core_name = exp.run.artiq_embedded.core_name - core = getattr(exp_inst, core_name) + if not hasattr(exp.run, "artiq_embedded"): + raise ValueError("Experiment entry point must be a kernel") + core_name = exp.run.artiq_embedded.core_name + core = getattr(exp_inst, core_name) - object_map, kernel_library, _, _ = \ - core.compile(exp.run, [exp_inst], {}, - attribute_writeback=False, print_as_rpc=False) - except CompileError as error: - return + object_map, kernel_library, _, _ = \ + core.compile(exp.run, [exp_inst], {}, + attribute_writeback=False, print_as_rpc=False) + except CompileError as error: + return + finally: + device_mgr.close_devices() finally: - device_mgr.close_devices() + dataset_db.close_db() if object_map.has_rpc(): raise ValueError("Experiment must not use RPC") diff --git a/artiq/frontend/artiq_master.py b/artiq/frontend/artiq_master.py index 05959beb5..b5f725b47 100755 --- a/artiq/frontend/artiq_master.py +++ b/artiq/frontend/artiq_master.py @@ -40,7 +40,7 @@ def get_argparser(): group = parser.add_argument_group("databases") group.add_argument("--device-db", default="device_db.py", help="device database file (default: '%(default)s')") - group.add_argument("--dataset-db", default="dataset_db.pyon", + group.add_argument("--dataset-db", default="dataset_db.mdb", help="dataset file (default: '%(default)s')") group = parser.add_argument_group("repository") @@ -101,6 +101,7 @@ def main(): device_db = DeviceDB(args.device_db) dataset_db = DatasetDB(args.dataset_db) + atexit.register(dataset_db.close_db) dataset_db.start(loop=loop) atexit_register_coroutine(dataset_db.stop, loop=loop) worker_handlers = dict() diff --git a/artiq/frontend/artiq_run.py b/artiq/frontend/artiq_run.py index c3d548c26..948c34475 100755 --- a/artiq/frontend/artiq_run.py +++ b/artiq/frontend/artiq_run.py @@ -134,7 +134,7 @@ def get_argparser(with_file=True): common_args.verbosity_args(parser) parser.add_argument("--device-db", default="device_db.py", help="device database file (default: '%(default)s')") - parser.add_argument("--dataset-db", default="dataset_db.pyon", + parser.add_argument("--dataset-db", default="dataset_db.mdb", help="dataset file (default: '%(default)s')") parser.add_argument("-c", "--class-name", default=None, @@ -197,29 +197,32 @@ def run(with_file=False): virtual_devices={"scheduler": DummyScheduler(), "ccb": DummyCCB()}) dataset_db = DatasetDB(args.dataset_db) - dataset_mgr = DatasetManager(dataset_db) - try: - exp_inst = _build_experiment(device_mgr, dataset_mgr, args) - exp_inst.prepare() - exp_inst.run() - exp_inst.analyze() - except CompileError as error: - return - except Exception as exn: - if hasattr(exn, "artiq_core_exception"): - print(exn.artiq_core_exception, file=sys.stderr) - raise exn - finally: - device_mgr.close_devices() + dataset_mgr = DatasetManager(dataset_db) - if args.hdf5 is not None: - with h5py.File(args.hdf5, "w") as f: - dataset_mgr.write_hdf5(f) - else: - for k, v in sorted(dataset_mgr.local.items(), key=itemgetter(0)): - print("{}: {}".format(k, v)) - dataset_db.save() + try: + exp_inst = _build_experiment(device_mgr, dataset_mgr, args) + exp_inst.prepare() + exp_inst.run() + exp_inst.analyze() + except CompileError as error: + return + except Exception as exn: + if hasattr(exn, "artiq_core_exception"): + print(exn.artiq_core_exception, file=sys.stderr) + raise exn + finally: + device_mgr.close_devices() + + if args.hdf5 is not None: + with h5py.File(args.hdf5, "w") as f: + dataset_mgr.write_hdf5(f) + else: + for k, v in sorted(dataset_mgr.local.items(), key=itemgetter(0)): + print("{}: {}".format(k, v)) + dataset_db.save() + finally: + dataset_db.close_db() def main(): diff --git a/artiq/master/databases.py b/artiq/master/databases.py index 14cfae4cd..68b494f76 100644 --- a/artiq/master/databases.py +++ b/artiq/master/databases.py @@ -1,11 +1,13 @@ import asyncio -from artiq.tools import file_import +import lmdb -from sipyco.sync_struct import Notifier, process_mod, update_from_dict +from sipyco.sync_struct import Notifier, process_mod, ModAction, update_from_dict from sipyco import pyon from sipyco.asyncio_tools import TaskObject +from artiq.tools import file_import + def device_db_from_file(filename): mod = file_import(filename) @@ -40,15 +42,25 @@ class DatasetDB(TaskObject): self.persist_file = persist_file self.autosave_period = autosave_period - try: - file_data = pyon.load_file(self.persist_file) - except FileNotFoundError: - file_data = dict() - self.data = Notifier({k: (True, v) for k, v in file_data.items()}) + self.lmdb = lmdb.open(persist_file, subdir=False, map_size=2**30) + data = dict() + with self.lmdb.begin() as txn: + for key, value in txn.cursor(): + data[key.decode()] = (True, pyon.decode(value.decode())) + self.data = Notifier(data) + self.pending_keys = set() + + def close_db(self): + self.lmdb.close() def save(self): - data = {k: v[1] for k, v in self.data.raw_view.items() if v[0]} - pyon.store_file(self.persist_file, data) + with self.lmdb.begin(write=True) as txn: + for key in self.pending_keys: + if key not in self.data.raw_view or not self.data.raw_view[key][0]: + txn.delete(key.encode()) + else: + txn.put(key.encode(), pyon.encode(self.data.raw_view[key][1]).encode()) + self.pending_keys.clear() async def _do(self): try: @@ -62,6 +74,12 @@ class DatasetDB(TaskObject): return self.data.raw_view[key][1] def update(self, mod): + if mod["path"]: + key = mod["path"][0] + else: + assert(mod["action"] == ModAction.setitem.value or mod["action"] == ModAction.delitem.value) + key = mod["key"] + self.pending_keys.add(key) process_mod(self.data, mod) # convenience functions (update() can be used instead) @@ -72,7 +90,9 @@ class DatasetDB(TaskObject): else: persist = False self.data[key] = (persist, value) + self.pending_keys.add(key) def delete(self, key): del self.data[key] + self.pending_keys.add(key) # diff --git a/artiq/test/hardware_testbench.py b/artiq/test/hardware_testbench.py index 987a1cf6b..c5cebfeff 100644 --- a/artiq/test/hardware_testbench.py +++ b/artiq/test/hardware_testbench.py @@ -21,13 +21,14 @@ class ExperimentCase(unittest.TestCase): def setUp(self): self.device_db = DeviceDB(os.path.join(artiq_root, "device_db.py")) self.dataset_db = DatasetDB( - os.path.join(artiq_root, "dataset_db.pyon")) + os.path.join(artiq_root, "dataset_db.mdb")) self.device_mgr = DeviceManager( self.device_db, virtual_devices={"scheduler": DummyScheduler()}) self.dataset_mgr = DatasetManager(self.dataset_db) def tearDown(self): self.device_mgr.close_devices() + self.dataset_db.close_db() def create(self, cls, *args, **kwargs): try: diff --git a/flake.nix b/flake.nix index 2d28f6079..1336351bb 100644 --- a/flake.nix +++ b/flake.nix @@ -127,7 +127,7 @@ nativeBuildInputs = [ pkgs.qt5.wrapQtAppsHook ]; # keep llvm_x and lld_x in sync with llvmlite propagatedBuildInputs = [ pkgs.llvm_11 pkgs.lld_11 sipyco.packages.x86_64-linux.sipyco pythonparser artiq-comtools.packages.x86_64-linux.artiq-comtools ] - ++ (with pkgs.python3Packages; [ llvmlite pyqtgraph pygit2 numpy dateutil scipy prettytable pyserial levenshtein h5py pyqt5 qasync tqdm ]); + ++ (with pkgs.python3Packages; [ llvmlite pyqtgraph pygit2 numpy dateutil scipy prettytable pyserial levenshtein h5py pyqt5 qasync tqdm lmdb ]); dontWrapQtApps = true; postFixup = ''