Merge pull request #1544 from airwoodix/dataset-compression

datasets: support compression in HDF5 archives
Sébastien Bourdeauducq committed on 2021-12-06 12:43:19 +08:00 (via GitHub)
commit 311a818a49
15 changed files with 267 additions and 50 deletions

RELEASE_NOTES.rst

@@ -34,6 +34,9 @@ Highlights:
 * Previously detected RTIO async errors are reported to the host after each kernel terminates and a
   warning is logged. The warning is additional to the one already printed in the core device log upon
   detection of the error.
+* HDF5 options can now be passed when creating datasets with ``set_dataset``. In
+  particular, this allows the use of transparent compression filters:
+  ``set_dataset(name, value, hdf5_options={"compression": "gzip"})``.

 Breaking changes:

@@ -46,6 +49,10 @@ Breaking changes:
   calling `ADF5356.init()`.
 * DRTIO: Changed message alignment from 32-bits to 64-bits.
 * The deprecated ``set_dataset(..., save=...)`` is no longer supported.
+* The internal dataset representation was changed to support tracking HDF5 options such as
+  a compression method. This requires changes to code reading the dataset persistence file
+  (``dataset_db.pyon``) and to custom applets.

 ARTIQ-6
 -------

@@ -117,6 +124,7 @@ Breaking changes:
 * Experiment classes with underscore-prefixed names are now ignored when ``artiq_client``
   determines which experiment to submit (consistent with ``artiq_run``).

 ARTIQ-5
 -------
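The new keyword is used from experiment code. A minimal sketch (the experiment class and
dataset name are hypothetical; only the ``hdf5_options`` keyword comes from the notes above):

    from artiq.experiment import EnvExperiment

    class Spectrum(EnvExperiment):  # hypothetical experiment
        def run(self):
            counts = [10, 12, 9, 14]
            # archived gzip-compressed in this run's HDF5 file
            self.set_dataset("spectrum.counts", counts,
                             hdf5_options={"compression": "gzip"})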

artiq/applets/big_number.py

@@ -13,7 +13,7 @@ class NumberWidget(QtWidgets.QLCDNumber):
     def data_changed(self, data, mods):
         try:
-            n = float(data[self.dataset_name][1])
+            n = float(data[self.dataset_name]["value"])
         except (KeyError, ValueError, TypeError):
             n = "---"
         self.display(n)
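This and the following applet changes are the same mechanical update: the dataset backing
store now maps each key to a dict (see ``make_dataset`` in the master databases module
below) instead of a ``(persist, value)`` tuple. Schematically (the dataset name is
illustrative):

    # before: key -> (persist, value)
    persist, value = data["counts"]

    # after: key -> dict with explicit fields
    value = data["counts"]["value"]
    persist = data["counts"]["persist"]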

artiq/applets/image.py

@@ -13,7 +13,7 @@ class Image(pyqtgraph.ImageView):
     def data_changed(self, data, mods):
         try:
-            img = data[self.args.img][1]
+            img = data[self.args.img]["value"]
         except KeyError:
             return
         self.setImage(img)

artiq/applets/plot_hist.py

@@ -17,11 +17,11 @@ class HistogramPlot(pyqtgraph.PlotWidget):
     def data_changed(self, data, mods, title):
         try:
-            y = data[self.args.y][1]
+            y = data[self.args.y]["value"]
             if self.args.x is None:
                 x = None
             else:
-                x = data[self.args.x][1]
+                x = data[self.args.x]["value"]
         except KeyError:
             return
         if x is None:

artiq/applets/plot_xy.py

@@ -6,6 +6,7 @@ from PyQt5.QtCore import QTimer
 import pyqtgraph

 from artiq.applets.simple import TitleApplet
+from artiq.master.databases import make_dataset as empty_dataset


 class XYPlot(pyqtgraph.PlotWidget):

@@ -21,14 +22,14 @@ class XYPlot(pyqtgraph.PlotWidget):
     def data_changed(self, data, mods, title):
         try:
-            y = data[self.args.y][1]
+            y = data[self.args.y]["value"]
         except KeyError:
             return
-        x = data.get(self.args.x, (False, None))[1]
+        x = data.get(self.args.x, empty_dataset())["value"]
         if x is None:
             x = np.arange(len(y))
-        error = data.get(self.args.error, (False, None))[1]
-        fit = data.get(self.args.fit, (False, None))[1]
+        error = data.get(self.args.error, empty_dataset())["value"]
+        fit = data.get(self.args.fit, empty_dataset())["value"]

         if not len(y) or len(y) != len(x):
             self.mismatch['X values'] = True

artiq/applets/plot_xy_hist.py

@@ -126,9 +126,9 @@ class XYHistPlot(QtWidgets.QSplitter):
     def data_changed(self, data, mods):
         try:
-            xs = data[self.args.xs][1]
-            histogram_bins = data[self.args.histogram_bins][1]
-            histograms_counts = data[self.args.histograms_counts][1]
+            xs = data[self.args.xs]["value"]
+            histogram_bins = data[self.args.histogram_bins]["value"]
+            histograms_counts = data[self.args.histograms_counts]["value"]
         except KeyError:
             return
         if len(xs) != histograms_counts.shape[0]:

artiq/applets/simple.py

@@ -10,6 +10,7 @@ from sipyco.sync_struct import Subscriber, process_mod
 from sipyco import pyon
 from sipyco.pipe_ipc import AsyncioChildComm

+from artiq.master.databases import make_dataset as empty_dataset

 logger = logging.getLogger(__name__)

@@ -251,7 +252,7 @@ class TitleApplet(SimpleApplet):
     def emit_data_changed(self, data, mod_buffer):
         if self.args.title is not None:
-            title_values = {k.replace(".", "/"): data.get(k, (False, None))[1]
+            title_values = {k.replace(".", "/"): data.get(k, empty_dataset())["value"]
                             for k in self.dataset_title}
             try:
                 title = self.args.title.format(**title_values)
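``make_dataset`` is imported as ``empty_dataset`` because, called with no arguments, it
produces the placeholder that replaces the old ``(False, None)`` fallback. A quick sketch
of that equivalence:

    from artiq.master.databases import make_dataset as empty_dataset

    d = empty_dataset()
    # d == {"persist": False, "value": None, "hdf5_options": {}}
    assert d["value"] is None  # same result as (False, None)[1] before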

artiq/browser/datasets.py

@@ -104,8 +104,8 @@ class DatasetsDock(QtWidgets.QDockWidget):
         idx = self.table_model_filter.mapToSource(idx[0])
         key = self.table_model.index_to_key(idx)
         if key is not None:
-            persist, value = self.table_model.backing_store[key]
-            asyncio.ensure_future(self._upload_dataset(key, value))
+            dataset = self.table_model.backing_store[key]
+            asyncio.ensure_future(self._upload_dataset(key, dataset["value"]))

     def save_state(self):
         return bytes(self.table.header().saveState())

artiq/dashboard/datasets.py

@@ -147,15 +147,15 @@ class Creator(QtWidgets.QDialog):
 class Model(DictSyncTreeSepModel):
     def __init__(self, init):
-        DictSyncTreeSepModel.__init__(self, ".",
-                                      ["Dataset", "Persistent", "Value"],
-                                      init)
+        DictSyncTreeSepModel.__init__(
+            self, ".", ["Dataset", "Persistent", "Value"], init
+        )

     def convert(self, k, v, column):
         if column == 1:
-            return "Y" if v[0] else "N"
+            return "Y" if v["persist"] else "N"
         elif column == 2:
-            return short_format(v[1])
+            return short_format(v["value"])
         else:
             raise ValueError

@@ -223,8 +223,8 @@ class DatasetsDock(QtWidgets.QDockWidget):
         idx = self.table_model_filter.mapToSource(idx[0])
         key = self.table_model.index_to_key(idx)
         if key is not None:
-            persist, value = self.table_model.backing_store[key]
-            t = type(value)
+            dataset = self.table_model.backing_store[key]
+            t = type(dataset["value"])
             if np.issubdtype(t, np.number):
                 dialog_cls = NumberEditor
             elif np.issubdtype(t, np.bool_):

@@ -235,7 +235,7 @@
                 logger.error("Cannot edit dataset %s: "
                              "type %s is not supported", key, t)
                 return
-            dialog_cls(self, self.dataset_ctl, key, value).open()
+            dialog_cls(self, self.dataset_ctl, key, dataset["value"]).open()

     def delete_clicked(self):
         idx = self.table.selectedIndexes()

artiq/language/environment.py

@@ -330,7 +330,8 @@ class HasEnvironment:
     @rpc(flags={"async"})
     def set_dataset(self, key, value,
-                    broadcast=False, persist=False, archive=True):
+                    broadcast=False, persist=False, archive=True,
+                    hdf5_options=None):
         """Sets the contents and handling modes of a dataset.

         Datasets must be scalars (``bool``, ``int``, ``float`` or NumPy scalar)

@@ -342,8 +343,16 @@ class HasEnvironment:
             broadcast.
         :param archive: the data is saved into the local storage of the current
             run (archived as a HDF5 file).
+        :param hdf5_options: dict of keyword arguments to pass to
+            :meth:`h5py.Group.create_dataset`. For example, pass ``{"compression": "gzip"}``
+            to enable transparent zlib compression of this dataset in the HDF5 archive.
+            See the `h5py documentation <https://docs.h5py.org/en/stable/high/group.html#h5py.Group.create_dataset>`_
+            for a list of valid options.
         """
-        self.__dataset_mgr.set(key, value, broadcast, persist, archive)
+        self.__dataset_mgr.set(
+            key, value, broadcast, persist, archive, hdf5_options
+        )

     @rpc(flags={"async"})
     def mutate_dataset(self, key, index, value):
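Per the new docstring, any ``h5py.Group.create_dataset`` keyword argument can be forwarded.
A hedged example combining several filters (the dataset name and ``samples`` array are
illustrative; the option values mirror the tests added below):

    self.set_dataset(
        "adc.trace", samples,
        hdf5_options=dict(
            compression="gzip",
            compression_opts=6,  # gzip level 0-9
            shuffle=True,        # byte-shuffle filter, often improves gzip ratio
            fletcher32=True,     # per-chunk checksums
        ),
    )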

artiq/master/databases.py

@@ -35,6 +35,15 @@ class DeviceDB:
         return desc


+def make_dataset(*, persist=False, value=None, hdf5_options=None):
+    "PYON-serializable representation of a dataset in the DatasetDB"
+    return {
+        "persist": persist,
+        "value": value,
+        "hdf5_options": hdf5_options or {},
+    }
+
+
 class DatasetDB(TaskObject):
     def __init__(self, persist_file, autosave_period=30):
         self.persist_file = persist_file

@@ -44,10 +53,23 @@ class DatasetDB(TaskObject):
             file_data = pyon.load_file(self.persist_file)
         except FileNotFoundError:
             file_data = dict()
-        self.data = Notifier({k: (True, v) for k, v in file_data.items()})
+        self.data = Notifier(
+            {
+                k: make_dataset(
+                    persist=True,
+                    value=v["value"],
+                    hdf5_options=v["hdf5_options"]
+                )
+                for k, v in file_data.items()
+            }
+        )

     def save(self):
-        data = {k: v[1] for k, v in self.data.raw_view.items() if v[0]}
+        data = {
+            k: d
+            for k, d in self.data.raw_view.items()
+            if d["persist"]
+        }
         pyon.store_file(self.persist_file, data)

     async def _do(self):

@@ -59,20 +81,23 @@ class DatasetDB(TaskObject):
         self.save()

     def get(self, key):
-        return self.data.raw_view[key][1]
+        return self.data.raw_view[key]

     def update(self, mod):
         process_mod(self.data, mod)

     # convenience functions (update() can be used instead)
-    def set(self, key, value, persist=None):
+    def set(self, key, value, persist=None, hdf5_options=None):
         if persist is None:
             if key in self.data.raw_view:
-                persist = self.data.raw_view[key][0]
+                persist = self.data.raw_view[key]["persist"]
             else:
                 persist = False
-        self.data[key] = (persist, value)
+        self.data[key] = make_dataset(
+            persist=persist,
+            value=value,
+            hdf5_options=hdf5_options,
+        )

     def delete(self, key):
         del self.data[key]
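With this change, an entry in ``dataset_db.pyon`` is a dict rather than a bare value,
which is the breaking change called out in the release notes. A persisted, compressed
dataset would be stored roughly as (key and value taken from the new tests below):

    {
        "key2": {
            "persist": True,
            "value": [0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
            "hdf5_options": {"compression": "gzip"},
        }
    }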

artiq/master/worker_db.py

@@ -8,9 +8,12 @@ from operator import setitem
 import importlib
 import logging

+import numpy as np
+
 from sipyco.sync_struct import Notifier
 from sipyco.pc_rpc import AutoTarget, Client, BestEffortClient

+from artiq.master.databases import make_dataset

 logger = logging.getLogger(__name__)

@@ -115,7 +118,8 @@ class DatasetManager:
         self.ddb = ddb
         self._broadcaster.publish = ddb.update

-    def set(self, key, value, broadcast=False, persist=False, archive=True):
+    def set(self, key, value, broadcast=False, persist=False, archive=True,
+            hdf5_options=None):
         if key in self.archive:
             logger.warning("Modifying dataset '%s' which is in archive, "
                            "archive will remain untouched",

@@ -125,12 +129,20 @@ class DatasetManager:
             broadcast = True

         if broadcast:
-            self._broadcaster[key] = persist, value
+            self._broadcaster[key] = make_dataset(
+                persist=persist,
+                value=value,
+                hdf5_options=hdf5_options,
+            )
         elif key in self._broadcaster.raw_view:
             del self._broadcaster[key]

         if archive:
-            self.local[key] = value
+            self.local[key] = make_dataset(
+                persist=persist,
+                value=value,
+                hdf5_options=hdf5_options,
+            )
         elif key in self.local:
             del self.local[key]

@@ -138,11 +150,11 @@ class DatasetManager:
         target = self.local.get(key, None)
         if key in self._broadcaster.raw_view:
             if target is not None:
-                assert target is self._broadcaster.raw_view[key][1]
-            return self._broadcaster[key][1]
+                assert target["value"] is self._broadcaster.raw_view[key]["value"]
+            return self._broadcaster[key]["value"]
         if target is None:
             raise KeyError("Cannot mutate nonexistent dataset '{}'".format(key))
-        return target
+        return target["value"]

     def mutate(self, key, index, value):
         target = self._get_mutation_target(key)

@@ -158,15 +170,15 @@ class DatasetManager:
     def get(self, key, archive=False):
         if key in self.local:
-            return self.local[key]
+            return self.local[key]["value"]

-        data = self.ddb.get(key)
+        dataset = self.ddb.get(key)
         if archive:
             if key in self.archive:
                 logger.warning("Dataset '%s' is already in archive, "
                                "overwriting", key, stack_info=True)
-            self.archive[key] = data
+            self.archive[key] = dataset
-        return data
+        return dataset["value"]

     def write_hdf5(self, f):
         datasets_group = f.create_group("datasets")

@@ -182,7 +194,7 @@ def _write(group, k, v):
     # Add context to exception message when the user writes a dataset that is
     # not representable in HDF5.
     try:
-        group[k] = v
+        group.create_dataset(k, data=v["value"], **v["hdf5_options"])
     except TypeError as e:
         raise TypeError("Error writing dataset '{}' of type '{}': {}".format(
-            k, type(v), e))
+            k, type(v["value"]), e))
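The archive write path now forwards the stored options verbatim to h5py. A standalone
sketch of what ``_write`` does per dataset (file and key names are illustrative):

    import h5py
    import numpy as np

    v = {"value": np.arange(1024), "hdf5_options": {"compression": "gzip"}}

    with h5py.File("results.h5", "w") as f:
        group = f.create_group("datasets")
        # equivalent to the new group.create_dataset call in _write()
        group.create_dataset("trace", data=v["value"], **v["hdf5_options"])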

artiq/test/test_dataset_db.py

@@ -0,0 +1,111 @@
+"""Test internal dataset representation (persistence, applets)"""
+import unittest
+import tempfile
+
+from artiq.master.databases import DatasetDB
+from sipyco import pyon
+
+KEY1 = "key1"
+KEY2 = "key2"
+KEY3 = "key3"
+DATA = list(range(10))
+COMP = "gzip"
+
+
+class TestDatasetDB(unittest.TestCase):
+    def setUp(self):
+        # empty dataset persistence file
+        self.persist_file = tempfile.NamedTemporaryFile(mode="w+")
+        print("{}", file=self.persist_file, flush=True)
+
+        self.ddb = DatasetDB(self.persist_file.name)
+
+        self.ddb.set(KEY1, DATA, persist=True)
+        self.ddb.set(KEY2, DATA, persist=True, hdf5_options=dict(compression=COMP))
+        self.ddb.set(KEY3, DATA, hdf5_options=dict(shuffle=True))
+
+        self.save_ddb_to_disk()
+
+    def save_ddb_to_disk(self):
+        self.ddb.save()
+        self.persist_file.flush()
+
+    def load_ddb_from_disk(self):
+        return pyon.load_file(self.persist_file.name)
+
+    def test_persist_format(self):
+        data = pyon.load_file(self.persist_file.name)
+        for key in [KEY1, KEY2]:
+            self.assertTrue(data[key]["persist"])
+            self.assertEqual(data[key]["value"], DATA)
+        self.assertEqual(data[KEY2]["hdf5_options"]["compression"], COMP)
+        self.assertEqual(data[KEY1]["hdf5_options"], dict())
+
+    def test_only_persist_marked_datasets(self):
+        data = self.load_ddb_from_disk()
+        with self.assertRaises(KeyError):
+            data[KEY3]
+
+    def test_memory_format(self):
+        ds = self.ddb.get(KEY2)
+        self.assertTrue(ds["persist"])
+        self.assertEqual(ds["value"], DATA)
+        self.assertEqual(ds["hdf5_options"]["compression"], COMP)
+
+        ds = self.ddb.get(KEY3)
+        self.assertFalse(ds["persist"])
+        self.assertEqual(ds["value"], DATA)
+        self.assertTrue(ds["hdf5_options"]["shuffle"])
+
+    def test_delete(self):
+        self.ddb.delete(KEY1)
+        self.save_ddb_to_disk()
+        data = self.load_ddb_from_disk()
+        with self.assertRaises(KeyError):
+            data[KEY1]
+        self.assertTrue(data[KEY2]["persist"])
+
+    def test_update(self):
+        self.assertFalse(self.ddb.get(KEY3)["persist"])
+        mod = {
+            "action": "setitem",
+            "path": [KEY3],
+            "key": "persist",
+            "value": True,
+        }
+        self.ddb.update(mod)
+        self.assertTrue(self.ddb.get(KEY3)["persist"])
+
+    def test_update_hdf5_options(self):
+        with self.assertRaises(KeyError):
+            self.ddb.get(KEY1)["hdf5_options"]["shuffle"]
+        mod = {
+            "action": "setitem",
+            "path": [KEY1, "hdf5_options"],
+            "key": "shuffle",
+            "value": False,
+        }
+        self.ddb.update(mod)
+        self.assertFalse(self.ddb.get(KEY1)["hdf5_options"]["shuffle"])
+
+    def test_reset_copies_persist(self):
+        self.assertTrue(self.ddb.get(KEY1)["persist"])
+        self.ddb.set(KEY1, DATA)
+        self.assertTrue(self.ddb.get(KEY1)["persist"])
+
+        self.assertFalse(self.ddb.get(KEY3)["persist"])
+        self.ddb.set(KEY3, DATA)
+        self.assertFalse(self.ddb.get(KEY3)["persist"])
+
+        self.ddb.set(KEY3, DATA, persist=True)
+        self.assertTrue(self.ddb.get(KEY3)["persist"])

artiq/test/test_datasets.py

@@ -3,6 +3,9 @@
 import copy
 import unittest

+import h5py
+import numpy as np
+
 from sipyco.sync_struct import process_mod

 from artiq.experiment import EnvExperiment

@@ -14,7 +17,7 @@ class MockDatasetDB:
         self.data = dict()

     def get(self, key):
-        return self.data[key][1]
+        return self.data[key]["value"]

     def update(self, mod):
         # Copy mod before applying to avoid sharing references to objects

@@ -82,9 +85,9 @@ class ExperimentDatasetCase(unittest.TestCase):
     def test_append_broadcast(self):
         self.exp.set(KEY, [], broadcast=True)
         self.exp.append(KEY, 0)
-        self.assertEqual(self.dataset_db.data[KEY][1], [0])
+        self.assertEqual(self.dataset_db.data[KEY]["value"], [0])
         self.exp.append(KEY, 1)
-        self.assertEqual(self.dataset_db.data[KEY][1], [0, 1])
+        self.assertEqual(self.dataset_db.data[KEY]["value"], [0, 1])

     def test_append_array(self):
         for broadcast in (True, False):

@@ -103,3 +106,44 @@ class ExperimentDatasetCase(unittest.TestCase):
         with self.assertRaises(KeyError):
             self.exp.append(KEY, 0)
+
+    def test_write_hdf5_options(self):
+        data = np.random.randint(0, 1024, 1024)
+        self.exp.set(
+            KEY,
+            data,
+            hdf5_options=dict(
+                compression="gzip",
+                compression_opts=6,
+                shuffle=True,
+                fletcher32=True
+            ),
+        )
+
+        with h5py.File("test.h5", "a", "core", backing_store=False) as f:
+            self.dataset_mgr.write_hdf5(f)
+
+            self.assertTrue(np.array_equal(f["datasets"][KEY][()], data))
+            self.assertEqual(f["datasets"][KEY].compression, "gzip")
+            self.assertEqual(f["datasets"][KEY].compression_opts, 6)
+            self.assertTrue(f["datasets"][KEY].shuffle)
+            self.assertTrue(f["datasets"][KEY].fletcher32)
+
+    def test_write_hdf5_no_options(self):
+        data = np.random.randint(0, 1024, 1024)
+        self.exp.set(KEY, data)
+
+        with h5py.File("test.h5", "a", "core", backing_store=False) as f:
+            self.dataset_mgr.write_hdf5(f)
+            self.assertTrue(np.array_equal(f["datasets"][KEY][()], data))
+            self.assertIsNone(f["datasets"][KEY].compression)
+
+    def test_write_hdf5_invalid_type(self):
+        class CustomType:
+            def __init__(self, x):
+                self.x = x
+
+        self.exp.set(KEY, CustomType(42))
+
+        with h5py.File("test.h5", "w", "core", backing_store=False) as f:
+            with self.assertRaisesRegex(TypeError, "CustomType"):
+                self.dataset_mgr.write_hdf5(f)

artiq/test/test_scheduler.py

@@ -7,6 +7,7 @@ from time import time, sleep
 from artiq.experiment import *
 from artiq.master.scheduler import Scheduler
+from artiq.master.databases import make_dataset


 class EmptyExperiment(EnvExperiment):

@@ -291,8 +292,13 @@ class SchedulerCase(unittest.TestCase):
             nonlocal termination_ok
             self.assertEqual(
                 mod,
-                {"action": "setitem", "key": "termination_ok",
-                 "value": (False, True), "path": []})
+                {
+                    "action": "setitem",
+                    "key": "termination_ok",
+                    "value": make_dataset(value=True),
+                    "path": []
+                }
+            )
             termination_ok = True
         handlers = {
             "update_dataset": check_termination