worker_db: write hdf5 dataset metadata

pull/2130/head
Simon Renblad 2023-06-30 16:54:39 +08:00 committed by Sébastien Bourdeauducq
parent bf38fc8b0f
commit bfbe13e51b
1 changed files with 17 additions and 5 deletions

View File

@ -111,11 +111,12 @@ class DatasetManager:
self._broadcaster = Notifier(dict())
self.local = dict()
self.archive = dict()
self.metadata = dict()
self.ddb = ddb
self._broadcaster.publish = ddb.update
def set(self, key, value, broadcast=False, persist=False, archive=True):
def set(self, key, value, metadata, broadcast, persist, archive):
if persist:
broadcast = True
@ -123,7 +124,7 @@ class DatasetManager:
logger.warning(f"Dataset '{key}' will not be stored. Both 'broadcast' and 'archive' are set to False.")
if broadcast:
self._broadcaster[key] = persist, value
self._broadcaster[key] = persist, value, metadata
elif key in self._broadcaster.raw_view:
del self._broadcaster[key]
@ -131,6 +132,8 @@ class DatasetManager:
self.local[key] = value
elif key in self.local:
del self.local[key]
self.metadata[key] = metadata
def _get_mutation_target(self, key):
target = self.local.get(key, None)
@ -166,21 +169,30 @@ class DatasetManager:
self.archive[key] = data
return data
def get_metadata(self, key):
if key in self.metadata:
return self.metadata[key]
return self.ddb.get_metadata(key)
def write_hdf5(self, f):
datasets_group = f.create_group("datasets")
for k, v in self.local.items():
_write(datasets_group, k, v)
m = self.metadata.get(k, {})
_write(datasets_group, k, v, m)
archive_group = f.create_group("archive")
for k, v in self.archive.items():
_write(archive_group, k, v)
m = self.metadata.get(k, {})
_write(archive_group, k, v, m)
def _write(group, k, v):
def _write(group, k, v, m):
# Add context to exception message when the user writes a dataset that is
# not representable in HDF5.
try:
group[k] = v
for key, val in m.items():
group[k].attrs[key] = val
except TypeError as e:
raise TypeError("Error writing dataset '{}' of type '{}': {}".format(
k, type(v), e))