From 139072d402d00a9fc78d92fbc0d21ab44e217ec8 Mon Sep 17 00:00:00 2001 From: Sebastien Bourdeauducq Date: Tue, 6 Oct 2015 13:50:00 +0800 Subject: [PATCH] Graceful experiment termination. Closes #76 --- artiq/frontend/artiq_client.py | 7 +++++- artiq/gui/schedule.py | 18 ++++++++++----- artiq/language/core.py | 8 ++++++- artiq/master/scheduler.py | 18 ++++++++++++++- artiq/master/worker.py | 4 ++-- artiq/master/worker_impl.py | 8 +++++-- artiq/test/scheduler.py | 40 ++++++++++++++++++++++++++++------ 7 files changed, 84 insertions(+), 19 deletions(-) diff --git a/artiq/frontend/artiq_client.py b/artiq/frontend/artiq_client.py index f3a15fa24..c5e153302 100755 --- a/artiq/frontend/artiq_client.py +++ b/artiq/frontend/artiq_client.py @@ -58,6 +58,8 @@ def get_argparser(): parser_delete = subparsers.add_parser("delete", help="delete an experiment " "from the schedule") + parser_delete.add_argument("-g", action="store_true", + help="request graceful termination") parser_delete.add_argument("rid", type=int, help="run identifier (RID)") @@ -121,7 +123,10 @@ def _action_submit(remote, args): def _action_delete(remote, args): - remote.delete(args.rid) + if args.g: + remote.request_termination(args.rid) + else: + remote.delete(args.rid) def _action_set_parameter(remote, args): diff --git a/artiq/gui/schedule.py b/artiq/gui/schedule.py index f1606caf9..53d5197e9 100644 --- a/artiq/gui/schedule.py +++ b/artiq/gui/schedule.py @@ -1,5 +1,6 @@ import asyncio import time +from functools import partial from quamash import QtGui, QtCore from pyqtgraph import dockarea @@ -71,10 +72,14 @@ class ScheduleDock(dockarea.Dock): self.addWidget(self.table) self.table.setContextMenuPolicy(QtCore.Qt.ActionsContextMenu) + request_termination_action = QtGui.QAction("Request termination", self.table) + request_termination_action.triggered.connect(partial(self.delete_clicked, True)) + self.table.addAction(request_termination_action) delete_action = QtGui.QAction("Delete", self.table) - delete_action.triggered.connect(self.delete_clicked) + delete_action.triggered.connect(partial(self.delete_clicked, False)) self.table.addAction(delete_action) + async def sub_connect(self, host, port): self.subscriber = Subscriber("schedule", self.init_schedule_model) await self.subscriber.connect(host, port) @@ -87,13 +92,16 @@ class ScheduleDock(dockarea.Dock): self.table.setModel(self.table_model) return self.table_model - async def delete(self, rid): - await self.schedule_ctl.delete(rid) + async def delete(self, rid, graceful): + if graceful: + await self.schedule_ctl.request_termination(rid) + else: + await self.schedule_ctl.delete(rid) - def delete_clicked(self): + def delete_clicked(self, graceful): idx = self.table.selectedIndexes() if idx: row = idx[0].row() rid = self.table_model.row_to_key[row] self.status_bar.showMessage("Deleted RID {}".format(rid)) - asyncio.ensure_future(self.delete(rid)) + asyncio.ensure_future(self.delete(rid, graceful)) diff --git a/artiq/language/core.py b/artiq/language/core.py index a4ca0b883..da362a0e5 100644 --- a/artiq/language/core.py +++ b/artiq/language/core.py @@ -6,7 +6,8 @@ from collections import namedtuple from functools import wraps -__all__ = ["int64", "round64", "kernel", "portable", +__all__ = ["int64", "round64", "TerminationRequested", + "kernel", "portable", "set_time_manager", "set_syscall_manager", "set_watchdog_factory", "RuntimeException", "EncodedException"] @@ -77,6 +78,11 @@ def round64(x): return int64(round(x)) +class TerminationRequested(Exception): + """Raised by ``pause`` when the user has requested termination.""" + pass + + _KernelFunctionInfo = namedtuple("_KernelFunctionInfo", "core_name k_function") diff --git a/artiq/master/scheduler.py b/artiq/master/scheduler.py index fdd925287..41abce3db 100644 --- a/artiq/master/scheduler.py +++ b/artiq/master/scheduler.py @@ -57,6 +57,7 @@ class Run: self.flush = flush self.worker = Worker(pool.worker_handlers) + self.termination_requested = False self._status = RunStatus.pending @@ -267,7 +268,12 @@ class RunStage(TaskObject): try: if run.status == RunStatus.paused: run.status = RunStatus.running - completed = await run.resume() + # clear "termination requested" flag now + # so that if it is set again during the resume, this + # results in another exception. + request_termination = run.termination_requested + run.termination_requested = False + completed = await run.resume(request_termination) else: run.status = RunStatus.running completed = await run.run() @@ -422,3 +428,13 @@ class Scheduler: def delete(self, rid): self._deleter.delete(rid) + + def request_termination(self, rid): + for pipeline in self._pipelines.values(): + if rid in pipeline.pool.runs: + run = pipeline.pool.runs[rid] + if run.status == RunStatus.running or run.status == RunStatus.paused: + run.termination_requested = True + else: + self.delete(rid) + break diff --git a/artiq/master/worker.py b/artiq/master/worker.py index e94018785..faa2dcf18 100644 --- a/artiq/master/worker.py +++ b/artiq/master/worker.py @@ -221,12 +221,12 @@ class Worker: self.yield_time = time.monotonic() return completed - async def resume(self): + async def resume(self, request_termination): stop_duration = time.monotonic() - self.yield_time for wid, expiry in self.watchdogs: self.watchdogs[wid] += stop_duration completed = await self._worker_action({"status": "ok", - "data": None}) + "data": request_termination}) if not completed: self.yield_time = time.monotonic() return completed diff --git a/artiq/master/worker_impl.py b/artiq/master/worker_impl.py index 8f1572cbb..db1229d2e 100644 --- a/artiq/master/worker_impl.py +++ b/artiq/master/worker_impl.py @@ -6,7 +6,7 @@ from artiq.protocols import pyon from artiq.tools import file_import from artiq.master.worker_db import DeviceManager, ResultDB, get_hdf5_output from artiq.language.environment import is_experiment -from artiq.language.core import set_watchdog_factory +from artiq.language.core import set_watchdog_factory, TerminationRequested def get_object(): @@ -93,7 +93,11 @@ set_watchdog_factory(Watchdog) class Scheduler: - pause = staticmethod(make_parent_action("pause", "")) + pause_noexc = staticmethod(make_parent_action("pause", "")) + + def pause(self): + if self.pause_noexc(): + raise TerminationRequested submit = staticmethod(make_parent_action("scheduler_submit", "pipeline_name expid priority due_date flush")) diff --git a/artiq/test/scheduler.py b/artiq/test/scheduler.py index 89e89e146..e92ac638c 100644 --- a/artiq/test/scheduler.py +++ b/artiq/test/scheduler.py @@ -21,9 +21,12 @@ class BackgroundExperiment(EnvExperiment): self.setattr_device("scheduler") def run(self): - while True: - self.scheduler.pause() - sleep(0.2) + try: + while True: + self.scheduler.pause() + sleep(0.2) + except TerminationRequested: + self.set_parameter("termination_ok", True) def _get_expid(name): @@ -103,13 +106,25 @@ class SchedulerCase(unittest.TestCase): def test_pause(self): loop = self.loop - scheduler = Scheduler(0, dict(), None) + + termination_ok = False + def check_termination(key, value): + nonlocal termination_ok + self.assertEqual(key, "termination_ok") + self.assertEqual(value, True) + termination_ok = True + handlers = { + "set_parameter": check_termination + } + scheduler = Scheduler(0, handlers, None) + expid_bg = _get_expid("BackgroundExperiment") expid = _get_expid("EmptyExperiment") expect = _get_basic_steps(1, expid) background_running = asyncio.Event() - done = asyncio.Event() + empty_completed = asyncio.Event() + background_completed = asyncio.Event() expect_idx = 0 def notify(mod): nonlocal expect_idx @@ -118,18 +133,29 @@ class SchedulerCase(unittest.TestCase): "key": "status", "action": "setitem"}: background_running.set() + if mod == {"path": [0], + "value": "deleting", + "key": "status", + "action": "setitem"}: + background_completed.set() if mod["path"] == [1] or (mod["path"] == [] and mod["key"] == 1): self.assertEqual(mod, expect[expect_idx]) expect_idx += 1 if expect_idx >= len(expect): - done.set() + empty_completed.set() scheduler.notifier.publish = notify scheduler.start() scheduler.submit("main", expid_bg, -99, None, False) loop.run_until_complete(background_running.wait()) scheduler.submit("main", expid, 0, None, False) - loop.run_until_complete(done.wait()) + loop.run_until_complete(empty_completed.wait()) + + self.assertFalse(termination_ok) + scheduler.request_termination(0) + loop.run_until_complete(background_completed.wait()) + self.assertTrue(termination_ok) + loop.run_until_complete(scheduler.stop()) def test_flush(self):