From 7955b63b002011497b05f715bb0c7eec4ba01a65 Mon Sep 17 00:00:00 2001 From: David Nadlinger Date: Mon, 15 Jun 2020 00:37:07 +0100 Subject: [PATCH] master: Always write results to HDF5 once run stage is reached MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Previously, a significant risk of losing experimental results would be associated with long-running experiments, as any stray exceptions while run()ing the experiment – for instance, due to infrequent network glitches or hardware reliability issue – would cause no HDF5 file to be written. This was especially troublesome as long experiments would suffer from a higher probability of unanticipated failures, while at the same time being more costly to re-take in terms of wall-clock time. Unanticipated uncaught exceptions like that were enough of an issue that several Oxford codebases had come up with their own half-baked mitigation strategies, from swallowing all exceptions in run() by convention, to always broadcasting all results to uniquely named datasets such that the partial results could be recovered and written to HDF5 by manually run recovery experiments. This commit addresses the problem at its source, changing the worker behaviour such that an HDF5 file is always written as soon as run() starts. --- RELEASE_NOTES.rst | 1 + artiq/master/scheduler.py | 10 ++-------- artiq/master/worker.py | 4 ---- artiq/master/worker_impl.py | 35 +++++++++++++++++++---------------- 4 files changed, 22 insertions(+), 28 deletions(-) diff --git a/RELEASE_NOTES.rst b/RELEASE_NOTES.rst index 503a6c675..bb287f4a5 100644 --- a/RELEASE_NOTES.rst +++ b/RELEASE_NOTES.rst @@ -16,6 +16,7 @@ Highlights: * `ad9910`: The maximum amplitude scale factor is now `0x3fff` (was `0x3ffe` before). * Applets now restart if they are running and a ccb call changes their spec +* Experiment results are now always saved to HDF5, even if run() fails. Breaking changes: diff --git a/artiq/master/scheduler.py b/artiq/master/scheduler.py index a1c16715f..7da7c401d 100644 --- a/artiq/master/scheduler.py +++ b/artiq/master/scheduler.py @@ -115,7 +115,6 @@ class Run: run = _mk_worker_method("run") resume = _mk_worker_method("resume") analyze = _mk_worker_method("analyze") - write_results = _mk_worker_method("write_results") class RunPool: @@ -309,13 +308,8 @@ class AnalyzeStage(TaskObject): try: await run.analyze() except: - logger.error("got worker exception in analyze stage of RID %d." - " Results will still be saved.", run.rid) - log_worker_exception() - try: - await run.write_results() - except: - logger.error("failed to write results of RID %d.", run.rid) + logger.error("got worker exception in analyze stage of RID %d.", + run.rid) log_worker_exception() self.delete_cb(run.rid) diff --git a/artiq/master/worker.py b/artiq/master/worker.py index 9480af8f4..36d5a202f 100644 --- a/artiq/master/worker.py +++ b/artiq/master/worker.py @@ -293,10 +293,6 @@ class Worker: async def analyze(self): await self._worker_action({"action": "analyze"}) - async def write_results(self, timeout=15.0): - await self._worker_action({"action": "write_results"}, - timeout) - async def examine(self, rid, file, timeout=20.0): self.rid = rid self.filename = os.path.basename(file) diff --git a/artiq/master/worker_impl.py b/artiq/master/worker_impl.py index 5ee78eb35..784e4297a 100644 --- a/artiq/master/worker_impl.py +++ b/artiq/master/worker_impl.py @@ -252,6 +252,16 @@ def main(): exp_inst = None repository_path = None + def write_results(): + filename = "{:09}-{}.h5".format(rid, exp.__name__) + with h5py.File(filename, "w") as f: + dataset_mgr.write_hdf5(f) + f["artiq_version"] = artiq_version + f["rid"] = rid + f["start_time"] = start_time + f["run_time"] = run_time + f["expid"] = pyon.encode(expid) + device_mgr = DeviceManager(ParentDeviceDB, virtual_devices={"scheduler": Scheduler(), "ccb": CCB()}) @@ -292,27 +302,20 @@ def main(): put_completed() elif action == "run": run_time = time.time() - exp_inst.run() + try: + exp_inst.run() + except: + # Only write results in run() on failure; on success wait + # for end of analyze stage. + write_results() + raise put_completed() elif action == "analyze": try: exp_inst.analyze() - except: - # make analyze failure non-fatal, as we may still want to - # write results afterwards - put_exception_report() - else: put_completed() - elif action == "write_results": - filename = "{:09}-{}.h5".format(rid, exp.__name__) - with h5py.File(filename, "w") as f: - dataset_mgr.write_hdf5(f) - f["artiq_version"] = artiq_version - f["rid"] = rid - f["start_time"] = start_time - f["run_time"] = run_time - f["expid"] = pyon.encode(expid) - put_completed() + finally: + write_results() elif action == "examine": examine(ExamineDeviceMgr, ExamineDatasetMgr, obj["file"]) put_completed()