Revert "Merge pull request #1544 from airwoodix/dataset-compression"

This reverts commit 311a818a49, reversing
changes made to 7ffe4dc2e3.
datasets
Tom Harty 2022-01-24 10:39:16 +00:00
parent f58aa3bdf6
commit 301c734fb6
14 changed files with 55 additions and 156 deletions

View File

@ -32,9 +32,6 @@ Highlights:
* Previously detected RTIO async errors are reported to the host after each kernel terminates and a
warning is logged. The warning is additional to the one already printed in the core device log upon
detection of the error.
* HDF5 options can now be passed when creating datasets with ``set_dataset``. This allows
in particular to use transparent compression filters as follows:
``set_dataset(name, value, hdf5_options={"compression": "gzip"})``.
* Removed worker DB warning for writing a dataset that is also in the archive
Breaking changes:
@ -51,10 +48,6 @@ Breaking changes:
calling `ADF5356.init()`.
* DRTIO: Changed message alignment from 32-bits to 64-bits.
* The deprecated ``set_dataset(..., save=...)`` is no longer supported.
* The internal dataset representation was changed to support tracking HDF5 options like e.g.
a compression method. This requires changes to code reading the dataset persistence file
(``dataset_db.pyon``) and to custom applets.
ARTIQ-6
-------
@ -129,7 +122,6 @@ Breaking changes:
* Experiment classes with underscore-prefixed names are now ignored when ``artiq_client``
determines which experiment to submit (consistent with ``artiq_run``).
ARTIQ-5
-------

View File

@ -13,7 +13,7 @@ class NumberWidget(QtWidgets.QLCDNumber):
def data_changed(self, data, mods):
try:
n = float(data[self.dataset_name]["value"])
n = float(data[self.dataset_name][1])
except (KeyError, ValueError, TypeError):
n = "---"
self.display(n)

View File

@ -13,7 +13,7 @@ class Image(pyqtgraph.ImageView):
def data_changed(self, data, mods):
try:
img = data[self.args.img]["value"]
img = data[self.args.img][1]
except KeyError:
return
self.setImage(img)

View File

@ -17,11 +17,11 @@ class HistogramPlot(pyqtgraph.PlotWidget):
def data_changed(self, data, mods, title):
try:
y = data[self.args.y]["value"]
y = data[self.args.y][1]
if self.args.x is None:
x = None
else:
x = data[self.args.x]["value"]
x = data[self.args.x][1]
except KeyError:
return
if x is None:

View File

@ -6,7 +6,6 @@ from PyQt5.QtCore import QTimer
import pyqtgraph
from artiq.applets.simple import TitleApplet
from artiq.master.databases import make_dataset as empty_dataset
class XYPlot(pyqtgraph.PlotWidget):
@ -22,14 +21,14 @@ class XYPlot(pyqtgraph.PlotWidget):
def data_changed(self, data, mods, title):
try:
y = data[self.args.y]["value"]
y = data[self.args.y][1]
except KeyError:
return
x = data.get(self.args.x, empty_dataset())["value"]
x = data.get(self.args.x, (False, None))[1]
if x is None:
x = np.arange(len(y))
error = data.get(self.args.error, empty_dataset())["value"]
fit = data.get(self.args.fit, empty_dataset())["value"]
error = data.get(self.args.error, (False, None))[1]
fit = data.get(self.args.fit, (False, None))[1]
if not len(y) or len(y) != len(x):
self.mismatch['X values'] = True

View File

@ -126,9 +126,9 @@ class XYHistPlot(QtWidgets.QSplitter):
def data_changed(self, data, mods):
try:
xs = data[self.args.xs]["value"]
histogram_bins = data[self.args.histogram_bins]["value"]
histograms_counts = data[self.args.histograms_counts]["value"]
xs = data[self.args.xs][1]
histogram_bins = data[self.args.histogram_bins][1]
histograms_counts = data[self.args.histograms_counts][1]
except KeyError:
return
if len(xs) != histograms_counts.shape[0]:

View File

@ -10,7 +10,6 @@ from sipyco.sync_struct import Subscriber, process_mod
from sipyco import pyon
from sipyco.pipe_ipc import AsyncioChildComm
from artiq.master.databases import make_dataset as empty_dataset
logger = logging.getLogger(__name__)
@ -252,7 +251,7 @@ class TitleApplet(SimpleApplet):
def emit_data_changed(self, data, mod_buffer):
if self.args.title is not None:
title_values = {k.replace(".", "/"): data.get(k, empty_dataset())["value"]
title_values = {k.replace(".", "/"): data.get(k, (False, None))[1]
for k in self.dataset_title}
try:
title = self.args.title.format(**title_values)

View File

@ -104,8 +104,8 @@ class DatasetsDock(QtWidgets.QDockWidget):
idx = self.table_model_filter.mapToSource(idx[0])
key = self.table_model.index_to_key(idx)
if key is not None:
dataset = self.table_model.backing_store[key]
asyncio.ensure_future(self._upload_dataset(key, dataset["value"]))
persist, value = self.table_model.backing_store[key]
asyncio.ensure_future(self._upload_dataset(key, value))
def save_state(self):
return bytes(self.table.header().saveState())

View File

@ -146,16 +146,16 @@ class Creator(QtWidgets.QDialog):
class Model(DictSyncTreeSepModel):
def __init__(self, init):
DictSyncTreeSepModel.__init__(
self, ".", ["Dataset", "Persistent", "Value"], init
)
def __init__(self, init):
DictSyncTreeSepModel.__init__(self, ".",
["Dataset", "Persistent", "Value"],
init)
def convert(self, k, v, column):
if column == 1:
return "Y" if v["persist"] else "N"
return "Y" if v[0] else "N"
elif column == 2:
return short_format(v["value"])
return short_format(v[1])
else:
raise ValueError
@ -223,8 +223,8 @@ class DatasetsDock(QtWidgets.QDockWidget):
idx = self.table_model_filter.mapToSource(idx[0])
key = self.table_model.index_to_key(idx)
if key is not None:
dataset = self.table_model.backing_store[key]
t = type(dataset["value"])
persist, value = self.table_model.backing_store[key]
t = type(value)
if np.issubdtype(t, np.number):
dialog_cls = NumberEditor
elif np.issubdtype(t, np.bool_):
@ -235,7 +235,7 @@ class DatasetsDock(QtWidgets.QDockWidget):
logger.error("Cannot edit dataset %s: "
"type %s is not supported", key, t)
return
dialog_cls(self, self.dataset_ctl, key, dataset["value"]).open()
dialog_cls(self, self.dataset_ctl, key, value).open()
def delete_clicked(self):
idx = self.table.selectedIndexes()

View File

@ -330,8 +330,7 @@ class HasEnvironment:
@rpc(flags={"async"})
def set_dataset(self, key, value,
broadcast=False, persist=False, archive=True,
hdf5_options=None):
broadcast=False, persist=False, archive=True):
"""Sets the contents and handling modes of a dataset.
Datasets must be scalars (``bool``, ``int``, ``float`` or NumPy scalar)
@ -343,16 +342,8 @@ class HasEnvironment:
broadcast.
:param archive: the data is saved into the local storage of the current
run (archived as a HDF5 file).
:param hdf5_options: dict of keyword arguments to pass to
:meth:`h5py.Group.create_dataset`. For example, pass ``{"compression": "gzip"}``
to enable transparent zlib compression of this dataset in the HDF5 archive.
See the `h5py documentation <https://docs.h5py.org/en/stable/high/group.html#h5py.Group.create_dataset>`_
for a list of valid options.
"""
self.__dataset_mgr.set(
key, value, broadcast, persist, archive, hdf5_options
)
self.__dataset_mgr.set(key, value, broadcast, persist, archive)
@rpc(flags={"async"})
def mutate_dataset(self, key, index, value):

View File

@ -35,15 +35,6 @@ class DeviceDB:
return desc
def make_dataset(*, persist=False, value=None, hdf5_options=None):
"PYON-serializable representation of a dataset in the DatasetDB"
return {
"persist": persist,
"value": value,
"hdf5_options": hdf5_options or {},
}
class DatasetDB(TaskObject):
def __init__(self, persist_file, autosave_period=30):
self.persist_file = persist_file
@ -53,23 +44,10 @@ class DatasetDB(TaskObject):
file_data = pyon.load_file(self.persist_file)
except FileNotFoundError:
file_data = dict()
self.data = Notifier(
{
k: make_dataset(
persist=True,
value=v["value"],
hdf5_options=v["hdf5_options"]
)
for k, v in file_data.items()
}
)
self.data = Notifier({k: (True, v) for k, v in file_data.items()})
def save(self):
data = {
k: d
for k, d in self.data.raw_view.items()
if d["persist"]
}
data = {k: v[1] for k, v in self.data.raw_view.items() if v[0]}
pyon.store_file(self.persist_file, data)
async def _do(self):
@ -81,23 +59,20 @@ class DatasetDB(TaskObject):
self.save()
def get(self, key):
return self.data.raw_view[key]
return self.data.raw_view[key][1]
def update(self, mod):
process_mod(self.data, mod)
# convenience functions (update() can be used instead)
def set(self, key, value, persist=None, hdf5_options=None):
def set(self, key, value, persist=None):
if persist is None:
if key in self.data.raw_view:
persist = self.data.raw_view[key]["persist"]
persist = self.data.raw_view[key][0]
else:
persist = False
self.data[key] = make_dataset(
persist=persist,
value=value,
hdf5_options=hdf5_options,
)
self.data[key] = (persist, value)
def delete(self, key):
del self.data[key]
#

View File

@ -8,12 +8,9 @@ from operator import setitem
import importlib
import logging
import numpy as np
from sipyco.sync_struct import Notifier
from sipyco.pc_rpc import AutoTarget, Client, BestEffortClient
from artiq.master.databases import make_dataset
logger = logging.getLogger(__name__)
@ -118,26 +115,22 @@ class DatasetManager:
self.ddb = ddb
self._broadcaster.publish = ddb.update
def set(self, key, value, broadcast=False, persist=False, archive=True,
hdf5_options=None):
def set(self, key, value, broadcast=False, persist=False, archive=True):
if key in self.archive:
logger.warning("Modifying dataset '%s' which is in archive, "
"archive will remain untouched",
key, stack_info=True)
if persist:
broadcast = True
if broadcast:
self._broadcaster[key] = make_dataset(
persist=persist,
value=value,
hdf5_options=hdf5_options,
)
self._broadcaster[key] = persist, value
elif key in self._broadcaster.raw_view:
del self._broadcaster[key]
if archive:
self.local[key] = make_dataset(
persist=persist,
value=value,
hdf5_options=hdf5_options,
)
self.local[key] = value
elif key in self.local:
del self.local[key]
@ -145,11 +138,11 @@ class DatasetManager:
target = self.local.get(key, None)
if key in self._broadcaster.raw_view:
if target is not None:
assert target["value"] is self._broadcaster.raw_view[key]["value"]
return self._broadcaster[key]["value"]
assert target is self._broadcaster.raw_view[key][1]
return self._broadcaster[key][1]
if target is None:
raise KeyError("Cannot mutate nonexistent dataset '{}'".format(key))
return target["value"]
return target
def mutate(self, key, index, value):
target = self._get_mutation_target(key)
@ -165,15 +158,15 @@ class DatasetManager:
def get(self, key, archive=False):
if key in self.local:
return self.local[key]["value"]
dataset = self.ddb.get(key)
return self.local[key]
data = self.ddb.get(key)
if archive:
if key in self.archive:
logger.warning("Dataset '%s' is already in archive, "
"overwriting", key, stack_info=True)
self.archive[key] = dataset
return dataset["value"]
self.archive[key] = data
return data
def write_hdf5(self, f):
datasets_group = f.create_group("datasets")
@ -189,7 +182,7 @@ def _write(group, k, v):
# Add context to exception message when the user writes a dataset that is
# not representable in HDF5.
try:
group.create_dataset(k, data=v["value"], **v["hdf5_options"])
group[k] = v
except TypeError as e:
raise TypeError("Error writing dataset '{}' of type '{}': {}".format(
k, type(v["value"]), e))
k, type(v), e))

View File

@ -3,9 +3,6 @@
import copy
import unittest
import h5py
import numpy as np
from sipyco.sync_struct import process_mod
from artiq.experiment import EnvExperiment
@ -17,7 +14,7 @@ class MockDatasetDB:
self.data = dict()
def get(self, key):
return self.data[key]["value"]
return self.data[key][1]
def update(self, mod):
# Copy mod before applying to avoid sharing references to objects
@ -85,9 +82,9 @@ class ExperimentDatasetCase(unittest.TestCase):
def test_append_broadcast(self):
self.exp.set(KEY, [], broadcast=True)
self.exp.append(KEY, 0)
self.assertEqual(self.dataset_db.data[KEY]["value"], [0])
self.assertEqual(self.dataset_db.data[KEY][1], [0])
self.exp.append(KEY, 1)
self.assertEqual(self.dataset_db.data[KEY]["value"], [0, 1])
self.assertEqual(self.dataset_db.data[KEY][1], [0, 1])
def test_append_array(self):
for broadcast in (True, False):
@ -106,44 +103,3 @@ class ExperimentDatasetCase(unittest.TestCase):
with self.assertRaises(KeyError):
self.exp.append(KEY, 0)
def test_write_hdf5_options(self):
data = np.random.randint(0, 1024, 1024)
self.exp.set(
KEY,
data,
hdf5_options=dict(
compression="gzip",
compression_opts=6,
shuffle=True,
fletcher32=True
),
)
with h5py.File("test.h5", "a", "core", backing_store=False) as f:
self.dataset_mgr.write_hdf5(f)
self.assertTrue(np.array_equal(f["datasets"][KEY][()], data))
self.assertEqual(f["datasets"][KEY].compression, "gzip")
self.assertEqual(f["datasets"][KEY].compression_opts, 6)
self.assertTrue(f["datasets"][KEY].shuffle)
self.assertTrue(f["datasets"][KEY].fletcher32)
def test_write_hdf5_no_options(self):
data = np.random.randint(0, 1024, 1024)
self.exp.set(KEY, data)
with h5py.File("test.h5", "a", "core", backing_store=False) as f:
self.dataset_mgr.write_hdf5(f)
self.assertTrue(np.array_equal(f["datasets"][KEY][()], data))
self.assertIsNone(f["datasets"][KEY].compression)
def test_write_hdf5_invalid_type(self):
class CustomType:
def __init__(self, x):
self.x = x
self.exp.set(KEY, CustomType(42))
with h5py.File("test.h5", "w", "core", backing_store=False) as f:
with self.assertRaisesRegex(TypeError, "CustomType"):
self.dataset_mgr.write_hdf5(f)

View File

@ -6,7 +6,6 @@ from time import time, sleep
from artiq.experiment import *
from artiq.master.scheduler import Scheduler
from artiq.master.databases import make_dataset
class EmptyExperiment(EnvExperiment):
@ -288,13 +287,8 @@ class SchedulerCase(unittest.TestCase):
nonlocal termination_ok
self.assertEqual(
mod,
{
"action": "setitem",
"key": "termination_ok",
"value": make_dataset(value=True),
"path": []
}
)
{"action": "setitem", "key": "termination_ok",
"value": (False, True), "path": []})
termination_ok = True
handlers = {
"update_dataset": check_termination