Graceful experiment termination. Closes #76

This commit is contained in:
Sebastien Bourdeauducq 2015-10-06 13:50:00 +08:00
parent d94f0211a6
commit 139072d402
7 changed files with 84 additions and 19 deletions

View File

@ -58,6 +58,8 @@ def get_argparser():
parser_delete = subparsers.add_parser("delete", parser_delete = subparsers.add_parser("delete",
help="delete an experiment " help="delete an experiment "
"from the schedule") "from the schedule")
parser_delete.add_argument("-g", action="store_true",
help="request graceful termination")
parser_delete.add_argument("rid", type=int, parser_delete.add_argument("rid", type=int,
help="run identifier (RID)") help="run identifier (RID)")
@ -121,7 +123,10 @@ def _action_submit(remote, args):
def _action_delete(remote, args): def _action_delete(remote, args):
remote.delete(args.rid) if args.g:
remote.request_termination(args.rid)
else:
remote.delete(args.rid)
def _action_set_parameter(remote, args): def _action_set_parameter(remote, args):

View File

@ -1,5 +1,6 @@
import asyncio import asyncio
import time import time
from functools import partial
from quamash import QtGui, QtCore from quamash import QtGui, QtCore
from pyqtgraph import dockarea from pyqtgraph import dockarea
@ -71,10 +72,14 @@ class ScheduleDock(dockarea.Dock):
self.addWidget(self.table) self.addWidget(self.table)
self.table.setContextMenuPolicy(QtCore.Qt.ActionsContextMenu) self.table.setContextMenuPolicy(QtCore.Qt.ActionsContextMenu)
request_termination_action = QtGui.QAction("Request termination", self.table)
request_termination_action.triggered.connect(partial(self.delete_clicked, True))
self.table.addAction(request_termination_action)
delete_action = QtGui.QAction("Delete", self.table) delete_action = QtGui.QAction("Delete", self.table)
delete_action.triggered.connect(self.delete_clicked) delete_action.triggered.connect(partial(self.delete_clicked, False))
self.table.addAction(delete_action) self.table.addAction(delete_action)
async def sub_connect(self, host, port): async def sub_connect(self, host, port):
self.subscriber = Subscriber("schedule", self.init_schedule_model) self.subscriber = Subscriber("schedule", self.init_schedule_model)
await self.subscriber.connect(host, port) await self.subscriber.connect(host, port)
@ -87,13 +92,16 @@ class ScheduleDock(dockarea.Dock):
self.table.setModel(self.table_model) self.table.setModel(self.table_model)
return self.table_model return self.table_model
async def delete(self, rid): async def delete(self, rid, graceful):
await self.schedule_ctl.delete(rid) if graceful:
await self.schedule_ctl.request_termination(rid)
else:
await self.schedule_ctl.delete(rid)
def delete_clicked(self): def delete_clicked(self, graceful):
idx = self.table.selectedIndexes() idx = self.table.selectedIndexes()
if idx: if idx:
row = idx[0].row() row = idx[0].row()
rid = self.table_model.row_to_key[row] rid = self.table_model.row_to_key[row]
self.status_bar.showMessage("Deleted RID {}".format(rid)) self.status_bar.showMessage("Deleted RID {}".format(rid))
asyncio.ensure_future(self.delete(rid)) asyncio.ensure_future(self.delete(rid, graceful))

View File

@ -6,7 +6,8 @@ from collections import namedtuple
from functools import wraps from functools import wraps
__all__ = ["int64", "round64", "kernel", "portable", __all__ = ["int64", "round64", "TerminationRequested",
"kernel", "portable",
"set_time_manager", "set_syscall_manager", "set_watchdog_factory", "set_time_manager", "set_syscall_manager", "set_watchdog_factory",
"RuntimeException", "EncodedException"] "RuntimeException", "EncodedException"]
@ -77,6 +78,11 @@ def round64(x):
return int64(round(x)) return int64(round(x))
class TerminationRequested(Exception):
"""Raised by ``pause`` when the user has requested termination."""
pass
_KernelFunctionInfo = namedtuple("_KernelFunctionInfo", "core_name k_function") _KernelFunctionInfo = namedtuple("_KernelFunctionInfo", "core_name k_function")

View File

@ -57,6 +57,7 @@ class Run:
self.flush = flush self.flush = flush
self.worker = Worker(pool.worker_handlers) self.worker = Worker(pool.worker_handlers)
self.termination_requested = False
self._status = RunStatus.pending self._status = RunStatus.pending
@ -267,7 +268,12 @@ class RunStage(TaskObject):
try: try:
if run.status == RunStatus.paused: if run.status == RunStatus.paused:
run.status = RunStatus.running run.status = RunStatus.running
completed = await run.resume() # clear "termination requested" flag now
# so that if it is set again during the resume, this
# results in another exception.
request_termination = run.termination_requested
run.termination_requested = False
completed = await run.resume(request_termination)
else: else:
run.status = RunStatus.running run.status = RunStatus.running
completed = await run.run() completed = await run.run()
@ -422,3 +428,13 @@ class Scheduler:
def delete(self, rid): def delete(self, rid):
self._deleter.delete(rid) self._deleter.delete(rid)
def request_termination(self, rid):
for pipeline in self._pipelines.values():
if rid in pipeline.pool.runs:
run = pipeline.pool.runs[rid]
if run.status == RunStatus.running or run.status == RunStatus.paused:
run.termination_requested = True
else:
self.delete(rid)
break

View File

@ -221,12 +221,12 @@ class Worker:
self.yield_time = time.monotonic() self.yield_time = time.monotonic()
return completed return completed
async def resume(self): async def resume(self, request_termination):
stop_duration = time.monotonic() - self.yield_time stop_duration = time.monotonic() - self.yield_time
for wid, expiry in self.watchdogs: for wid, expiry in self.watchdogs:
self.watchdogs[wid] += stop_duration self.watchdogs[wid] += stop_duration
completed = await self._worker_action({"status": "ok", completed = await self._worker_action({"status": "ok",
"data": None}) "data": request_termination})
if not completed: if not completed:
self.yield_time = time.monotonic() self.yield_time = time.monotonic()
return completed return completed

View File

@ -6,7 +6,7 @@ from artiq.protocols import pyon
from artiq.tools import file_import from artiq.tools import file_import
from artiq.master.worker_db import DeviceManager, ResultDB, get_hdf5_output from artiq.master.worker_db import DeviceManager, ResultDB, get_hdf5_output
from artiq.language.environment import is_experiment from artiq.language.environment import is_experiment
from artiq.language.core import set_watchdog_factory from artiq.language.core import set_watchdog_factory, TerminationRequested
def get_object(): def get_object():
@ -93,7 +93,11 @@ set_watchdog_factory(Watchdog)
class Scheduler: class Scheduler:
pause = staticmethod(make_parent_action("pause", "")) pause_noexc = staticmethod(make_parent_action("pause", ""))
def pause(self):
if self.pause_noexc():
raise TerminationRequested
submit = staticmethod(make_parent_action("scheduler_submit", submit = staticmethod(make_parent_action("scheduler_submit",
"pipeline_name expid priority due_date flush")) "pipeline_name expid priority due_date flush"))

View File

@ -21,9 +21,12 @@ class BackgroundExperiment(EnvExperiment):
self.setattr_device("scheduler") self.setattr_device("scheduler")
def run(self): def run(self):
while True: try:
self.scheduler.pause() while True:
sleep(0.2) self.scheduler.pause()
sleep(0.2)
except TerminationRequested:
self.set_parameter("termination_ok", True)
def _get_expid(name): def _get_expid(name):
@ -103,13 +106,25 @@ class SchedulerCase(unittest.TestCase):
def test_pause(self): def test_pause(self):
loop = self.loop loop = self.loop
scheduler = Scheduler(0, dict(), None)
termination_ok = False
def check_termination(key, value):
nonlocal termination_ok
self.assertEqual(key, "termination_ok")
self.assertEqual(value, True)
termination_ok = True
handlers = {
"set_parameter": check_termination
}
scheduler = Scheduler(0, handlers, None)
expid_bg = _get_expid("BackgroundExperiment") expid_bg = _get_expid("BackgroundExperiment")
expid = _get_expid("EmptyExperiment") expid = _get_expid("EmptyExperiment")
expect = _get_basic_steps(1, expid) expect = _get_basic_steps(1, expid)
background_running = asyncio.Event() background_running = asyncio.Event()
done = asyncio.Event() empty_completed = asyncio.Event()
background_completed = asyncio.Event()
expect_idx = 0 expect_idx = 0
def notify(mod): def notify(mod):
nonlocal expect_idx nonlocal expect_idx
@ -118,18 +133,29 @@ class SchedulerCase(unittest.TestCase):
"key": "status", "key": "status",
"action": "setitem"}: "action": "setitem"}:
background_running.set() background_running.set()
if mod == {"path": [0],
"value": "deleting",
"key": "status",
"action": "setitem"}:
background_completed.set()
if mod["path"] == [1] or (mod["path"] == [] and mod["key"] == 1): if mod["path"] == [1] or (mod["path"] == [] and mod["key"] == 1):
self.assertEqual(mod, expect[expect_idx]) self.assertEqual(mod, expect[expect_idx])
expect_idx += 1 expect_idx += 1
if expect_idx >= len(expect): if expect_idx >= len(expect):
done.set() empty_completed.set()
scheduler.notifier.publish = notify scheduler.notifier.publish = notify
scheduler.start() scheduler.start()
scheduler.submit("main", expid_bg, -99, None, False) scheduler.submit("main", expid_bg, -99, None, False)
loop.run_until_complete(background_running.wait()) loop.run_until_complete(background_running.wait())
scheduler.submit("main", expid, 0, None, False) scheduler.submit("main", expid, 0, None, False)
loop.run_until_complete(done.wait()) loop.run_until_complete(empty_completed.wait())
self.assertFalse(termination_ok)
scheduler.request_termination(0)
loop.run_until_complete(background_completed.wait())
self.assertTrue(termination_ok)
loop.run_until_complete(scheduler.stop()) loop.run_until_complete(scheduler.stop())
def test_flush(self): def test_flush(self):