From 69d96b015801266f89a08c8dc59f5aa1d98dcf35 Mon Sep 17 00:00:00 2001 From: Sebastien Bourdeauducq Date: Tue, 18 Oct 2016 17:08:36 +0800 Subject: [PATCH] master: archive input datasets. Closes #587 --- RELEASE_NOTES.rst | 3 +++ artiq/browser/files.py | 10 ++++++++- .../repository/flopping_f_simulation.py | 5 +++-- artiq/language/environment.py | 14 ++++++++---- artiq/master/worker_db.py | 22 ++++++++++++++++--- artiq/master/worker_impl.py | 2 +- 6 files changed, 45 insertions(+), 11 deletions(-) diff --git a/RELEASE_NOTES.rst b/RELEASE_NOTES.rst index ca452af70..16bda10f0 100644 --- a/RELEASE_NOTES.rst +++ b/RELEASE_NOTES.rst @@ -13,6 +13,9 @@ Release notes * Dynamic __getattr__'s returning RPC target methods are not supported anymore. Controller driver classes must define all their methods intended for RPC as members. +* Datasets requested by experiments are by default archived into their HDF5 + output. If this behavior is undesirable, turn it off by passing + ``archive=False`` to ``get_dataset``. 2.0 diff --git a/artiq/browser/files.py b/artiq/browser/files.py index 1a45c7938..1dfbac28e 100644 --- a/artiq/browser/files.py +++ b/artiq/browser/files.py @@ -187,8 +187,16 @@ class FilesDock(QtWidgets.QDockWidget): except: logger.warning("unable to read metadata from %s", info.filePath(), exc_info=True) + rd = dict() + if "archive" in f: + rd = {k: (True, v.value) for k, v in f["archive"].items()} if "datasets" in f: - rd = {k: (True, v.value) for k, v in f["datasets"].items()} + for k, v in f["datasets"].items(): + if k in rd: + logger.warning("dataset '%s' is both in archive and " + "outputs", k) + rd[k] = (True, v.value) + if rd: self.datasets.init(rd) self.dataset_changed.emit(info.filePath()) diff --git a/artiq/examples/master/repository/flopping_f_simulation.py b/artiq/examples/master/repository/flopping_f_simulation.py index 1af331999..1693ab201 100644 --- a/artiq/examples/master/repository/flopping_f_simulation.py +++ b/artiq/examples/master/repository/flopping_f_simulation.py @@ -58,7 +58,7 @@ class FloppingF(EnvExperiment): # Use get_dataset so that analyze can be run stand-alone. brightness = self.get_dataset("flopping_f_brightness") try: - frequency = self.get_dataset("flopping_f_frequency") + frequency = self.get_dataset("flopping_f_frequency", archive=False) except KeyError: # Since flopping_f_frequency is not saved, it is missing if # analyze() is run on HDF5 data. But assuming that the arguments @@ -68,7 +68,8 @@ class FloppingF(EnvExperiment): self.set_dataset("flopping_f_frequency", frequency, broadcast=True, save=False) popt, pcov = curve_fit(model, frequency, brightness, - p0=[self.get_dataset("flopping_freq", 1500.0)]) + p0=[self.get_dataset("flopping_freq", 1500.0, + archive=False)]) perr = np.sqrt(np.diag(pcov)) if perr < 0.1: F0 = float(popt) diff --git a/artiq/language/environment.py b/artiq/language/environment.py index 2fd2b39b1..822279e48 100644 --- a/artiq/language/environment.py +++ b/artiq/language/environment.py @@ -303,7 +303,7 @@ class HasEnvironment: as ``slice(*sub_tuple)`` (multi-dimensional slicing).""" self.__dataset_mgr.mutate(key, index, value) - def get_dataset(self, key, default=NoDefault): + def get_dataset(self, key, default=NoDefault, archive=True): """Returns the contents of a dataset. The local storage is searched first, followed by the master storage @@ -312,19 +312,25 @@ class HasEnvironment: If the dataset does not exist, returns the default value. If no default is provided, raises ``KeyError``. + + By default, datasets obtained by this method are archived into the output + HDF5 file of the experiment. If an archived dataset is requested more + than one time (and therefore its value has potentially changed) or is + modified, a warning is emitted. Archival can be turned off by setting + the ``archive`` argument to ``False``. """ try: - return self.__dataset_mgr.get(key) + return self.__dataset_mgr.get(key, archive) except KeyError: if default is NoDefault: raise else: return default - def setattr_dataset(self, key, default=NoDefault): + def setattr_dataset(self, key, default=NoDefault, archive=True): """Sets the contents of a dataset as attribute. The names of the dataset and of the attribute are the same.""" - setattr(self, key, self.get_dataset(key, default)) + setattr(self, key, self.get_dataset(key, default, archive)) class Experiment: diff --git a/artiq/master/worker_db.py b/artiq/master/worker_db.py index e00da8a1e..29d4f11c7 100644 --- a/artiq/master/worker_db.py +++ b/artiq/master/worker_db.py @@ -181,11 +181,17 @@ class DatasetManager: def __init__(self, ddb): self.broadcast = Notifier(dict()) self.local = dict() + self.archive = dict() self.ddb = ddb self.broadcast.publish = ddb.update def set(self, key, value, broadcast=False, persist=False, save=True): + if key in self.archive: + logger.warning("Modifying dataset '%s' which is in archive, " + "archive will remain untouched", + key, stack_info=True) + if persist: broadcast = True if broadcast: @@ -211,12 +217,22 @@ class DatasetManager: index = slice(*index) setitem(target, index, value) - def get(self, key): + def get(self, key, archive): if key in self.local: return self.local[key] else: - return self.ddb.get(key) + data = self.ddb.get(key) + if archive: + if key in self.archive: + logger.warning("Dataset '%s' is already in archive, " + "overwriting", key, stack_info=True) + self.archive[key] = data + return data def write_hdf5(self, f): + datasets_group = f.create_group("datasets") for k, v in self.local.items(): - f[k] = v + datasets_group[k] = v + archive_group = f.create_group("archive") + for k, v in self.archive.items(): + archive_group[k] = v diff --git a/artiq/master/worker_impl.py b/artiq/master/worker_impl.py index 0c99c5238..854d1a31f 100644 --- a/artiq/master/worker_impl.py +++ b/artiq/master/worker_impl.py @@ -245,7 +245,7 @@ def main(): elif action == "write_results": filename = "{:09}-{}.h5".format(rid, exp.__name__) with h5py.File(filename, "w") as f: - dataset_mgr.write_hdf5(f.create_group("datasets")) + dataset_mgr.write_hdf5(f) f["artiq_version"] = artiq_version f["rid"] = rid f["start_time"] = int(time.mktime(start_time))