environment,worker_db: mutate datasets from experiments via dedicated method instead of Notifier. Closes #345

This commit is contained in:
Sebastien Bourdeauducq 2016-03-29 16:26:14 +08:00
parent a545598d4c
commit 84d50c4caf
4 changed files with 40 additions and 22 deletions

View File

@ -9,6 +9,8 @@ unreleased
* The CPU speed in the pipistrello gateware has been reduced from 83 1/3 MHz to * The CPU speed in the pipistrello gateware has been reduced from 83 1/3 MHz to
75 MHz. This will reduce the achievable sustained pulse rate and latency 75 MHz. This will reduce the achievable sustained pulse rate and latency
accordingly. ISE was intermittently failing to meet timing (#341). accordingly. ISE was intermittently failing to meet timing (#341).
* set_dataset in broadcast mode no longer returns a Notifier. Mutating datasets
should be done with mutate_dataset instead (#345).
1.0rc1 1.0rc1

View File

@ -221,13 +221,11 @@ class HasEnvironment:
broadcast=False, persist=False, save=True): broadcast=False, persist=False, save=True):
"""Sets the contents and handling modes of a dataset. """Sets the contents and handling modes of a dataset.
If the dataset is broadcasted, it must be PYON-serializable. Datasets must be scalars (``bool``, ``int``, ``float`` or NumPy scalar)
If the dataset is saved, it must be a scalar (``bool``, ``int``, or NumPy arrays.
``float`` or NumPy scalar) or a NumPy array.
:param broadcast: the data is sent in real-time to the master, which :param broadcast: the data is sent in real-time to the master, which
dispatches it. Returns a Notifier that can be used to mutate the dispatches it.
dataset.
:param persist: the master should store the data on-disk. Implies :param persist: the master should store the data on-disk. Implies
broadcast. broadcast.
:param save: the data is saved into the local storage of the current :param save: the data is saved into the local storage of the current
@ -238,7 +236,19 @@ class HasEnvironment:
return return
if self.__dataset_mgr is None: if self.__dataset_mgr is None:
raise ValueError("Dataset manager not present") raise ValueError("Dataset manager not present")
return self.__dataset_mgr.set(key, value, broadcast, persist, save) self.__dataset_mgr.set(key, value, broadcast, persist, save)
def mutate_dataset(self, key, index, value):
"""Mutate an existing dataset at the given index (e.g. set a value at
a given position in a NumPy array)
If the dataset was created in broadcast mode, the modification is
immediately transmitted."""
if self.__parent is not None:
self.__parent.mutate_dataset(key, index, value)
if self.__dataset_mgr is None:
raise ValueError("Dataset manager not present")
self.__dataset_mgr.mutate(key, index, value)
def get_dataset(self, key, default=NoDefault): def get_dataset(self, key, default=NoDefault):
"""Returns the contents of a dataset. """Returns the contents of a dataset.

View File

@ -228,20 +228,26 @@ class DatasetManager:
def set(self, key, value, broadcast=False, persist=False, save=True): def set(self, key, value, broadcast=False, persist=False, save=True):
if persist: if persist:
broadcast = True broadcast = True
r = None
if broadcast: if broadcast:
self.broadcast[key] = (persist, value) self.broadcast[key] = persist, value
r = self.broadcast[key][1]
if save: if save:
self.local[key] = value self.local[key] = value
return r
def mutate(self, key, index, value):
target = None
if key in self.local:
target = self.local[key]
if key in self.broadcast.read:
target = self.broadcast[key][1]
if target is None:
raise KeyError("Cannot mutate non-existing dataset")
target[index] = value
def get(self, key): def get(self, key):
try: if key in self.local:
return self.local[key] return self.local[key]
except KeyError: else:
pass return self.ddb.get(key)
return self.ddb.get(key)
def write_hdf5(self, f): def write_hdf5(self, f):
result_dict_to_hdf5(f, self.local) result_dict_to_hdf5(f, self.local)

View File

@ -38,19 +38,19 @@ class FloppingF(EnvExperiment):
def run(self): def run(self):
l = len(self.frequency_scan) l = len(self.frequency_scan)
frequency = self.set_dataset("flopping_f_frequency", self.set_dataset("flopping_f_frequency",
np.full(l, np.nan), np.full(l, np.nan),
broadcast=True, save=False) broadcast=True, save=False)
brightness = self.set_dataset("flopping_f_brightness", self.set_dataset("flopping_f_brightness",
np.full(l, np.nan), np.full(l, np.nan),
broadcast=True) broadcast=True)
self.set_dataset("flopping_f_fit", np.full(l, np.nan), self.set_dataset("flopping_f_fit", np.full(l, np.nan),
broadcast=True, save=False) broadcast=True, save=False)
for i, f in enumerate(self.frequency_scan): for i, f in enumerate(self.frequency_scan):
m_brightness = model(f, self.F0) + self.noise_amplitude*random.random() m_brightness = model(f, self.F0) + self.noise_amplitude*random.random()
frequency[i] = f self.mutate_dataset("flopping_f_frequency", i, f)
brightness[i] = m_brightness self.mutate_dataset("flopping_f_brightness", i, m_brightness)
time.sleep(0.1) time.sleep(0.1)
self.scheduler.submit(self.scheduler.pipeline_name, self.scheduler.expid, self.scheduler.submit(self.scheduler.pipeline_name, self.scheduler.expid,
self.scheduler.priority, time.time() + 20, False) self.scheduler.priority, time.time() + 20, False)