forked from M-Labs/artiq
1
0
Fork 0

master: Always write results to HDF5 once run stage is reached

Previously, a significant risk of losing experimental results would
be associated with long-running experiments, as any stray exceptions
while run()ing the experiment – for instance, due to infrequent
network glitches or hardware reliability issue – would cause no
HDF5 file to be written. This was especially troublesome as long
experiments would suffer from a higher probability of unanticipated
failures, while at the same time being more costly to re-take in
terms of wall-clock time.

Unanticipated uncaught exceptions like that were enough of an issue
that several Oxford codebases had come up with their own half-baked
mitigation strategies, from swallowing all exceptions in run() by
convention, to always broadcasting all results to uniquely named
datasets such that the partial results could be recovered and written
to HDF5 by manually run recovery experiments.

This commit addresses the problem at its source, changing the worker
behaviour such that an HDF5 file is always written as soon as run()
starts.
This commit is contained in:
David Nadlinger 2020-06-15 00:37:07 +01:00
parent d87042597a
commit 7955b63b00
4 changed files with 22 additions and 28 deletions

View File

@ -16,6 +16,7 @@ Highlights:
* `ad9910`: The maximum amplitude scale factor is now `0x3fff` (was `0x3ffe` * `ad9910`: The maximum amplitude scale factor is now `0x3fff` (was `0x3ffe`
before). before).
* Applets now restart if they are running and a ccb call changes their spec * Applets now restart if they are running and a ccb call changes their spec
* Experiment results are now always saved to HDF5, even if run() fails.
Breaking changes: Breaking changes:

View File

@ -115,7 +115,6 @@ class Run:
run = _mk_worker_method("run") run = _mk_worker_method("run")
resume = _mk_worker_method("resume") resume = _mk_worker_method("resume")
analyze = _mk_worker_method("analyze") analyze = _mk_worker_method("analyze")
write_results = _mk_worker_method("write_results")
class RunPool: class RunPool:
@ -309,13 +308,8 @@ class AnalyzeStage(TaskObject):
try: try:
await run.analyze() await run.analyze()
except: except:
logger.error("got worker exception in analyze stage of RID %d." logger.error("got worker exception in analyze stage of RID %d.",
" Results will still be saved.", run.rid) run.rid)
log_worker_exception()
try:
await run.write_results()
except:
logger.error("failed to write results of RID %d.", run.rid)
log_worker_exception() log_worker_exception()
self.delete_cb(run.rid) self.delete_cb(run.rid)

View File

@ -293,10 +293,6 @@ class Worker:
async def analyze(self): async def analyze(self):
await self._worker_action({"action": "analyze"}) await self._worker_action({"action": "analyze"})
async def write_results(self, timeout=15.0):
await self._worker_action({"action": "write_results"},
timeout)
async def examine(self, rid, file, timeout=20.0): async def examine(self, rid, file, timeout=20.0):
self.rid = rid self.rid = rid
self.filename = os.path.basename(file) self.filename = os.path.basename(file)

View File

@ -252,6 +252,16 @@ def main():
exp_inst = None exp_inst = None
repository_path = None repository_path = None
def write_results():
filename = "{:09}-{}.h5".format(rid, exp.__name__)
with h5py.File(filename, "w") as f:
dataset_mgr.write_hdf5(f)
f["artiq_version"] = artiq_version
f["rid"] = rid
f["start_time"] = start_time
f["run_time"] = run_time
f["expid"] = pyon.encode(expid)
device_mgr = DeviceManager(ParentDeviceDB, device_mgr = DeviceManager(ParentDeviceDB,
virtual_devices={"scheduler": Scheduler(), virtual_devices={"scheduler": Scheduler(),
"ccb": CCB()}) "ccb": CCB()})
@ -292,27 +302,20 @@ def main():
put_completed() put_completed()
elif action == "run": elif action == "run":
run_time = time.time() run_time = time.time()
exp_inst.run() try:
exp_inst.run()
except:
# Only write results in run() on failure; on success wait
# for end of analyze stage.
write_results()
raise
put_completed() put_completed()
elif action == "analyze": elif action == "analyze":
try: try:
exp_inst.analyze() exp_inst.analyze()
except:
# make analyze failure non-fatal, as we may still want to
# write results afterwards
put_exception_report()
else:
put_completed() put_completed()
elif action == "write_results": finally:
filename = "{:09}-{}.h5".format(rid, exp.__name__) write_results()
with h5py.File(filename, "w") as f:
dataset_mgr.write_hdf5(f)
f["artiq_version"] = artiq_version
f["rid"] = rid
f["start_time"] = start_time
f["run_time"] = run_time
f["expid"] = pyon.encode(expid)
put_completed()
elif action == "examine": elif action == "examine":
examine(ExamineDeviceMgr, ExamineDatasetMgr, obj["file"]) examine(ExamineDeviceMgr, ExamineDatasetMgr, obj["file"])
put_completed() put_completed()