From 03a69ec5b7c67f3586a4da0e5563307d9aaadebf Mon Sep 17 00:00:00 2001 From: Sebastien Bourdeauducq Date: Mon, 27 Jun 2016 14:37:29 +0800 Subject: [PATCH] scheduler: add check_pause function --- .../examples/master/repository/core_pause.py | 22 +++++++++++++++ artiq/frontend/artiq_master.py | 3 ++- artiq/frontend/artiq_run.py | 4 +++ artiq/master/scheduler.py | 27 +++++++++++++++++++ artiq/master/worker_impl.py | 7 +++++ 5 files changed, 62 insertions(+), 1 deletion(-) create mode 100644 artiq/examples/master/repository/core_pause.py diff --git a/artiq/examples/master/repository/core_pause.py b/artiq/examples/master/repository/core_pause.py new file mode 100644 index 000000000..f483f5f6b --- /dev/null +++ b/artiq/examples/master/repository/core_pause.py @@ -0,0 +1,22 @@ +from time import sleep + +from artiq.experiment import * + + +class CorePause(EnvExperiment): + def build(self): + self.setattr_device("core") + self.setattr_device("scheduler") + + @kernel + def k(self): + print("kernel starting") + while not self.scheduler.check_pause(): + print("main kernel loop running...") + sleep(1) + print("kernel exiting") + + def run(self): + while True: + self.k() + self.scheduler.pause() diff --git a/artiq/frontend/artiq_master.py b/artiq/frontend/artiq_master.py index 5d3d4e11e..105ca612c 100755 --- a/artiq/frontend/artiq_master.py +++ b/artiq/frontend/artiq_master.py @@ -95,7 +95,8 @@ def main(): "scheduler_submit": scheduler.submit, "scheduler_delete": scheduler.delete, "scheduler_request_termination": scheduler.request_termination, - "scheduler_get_status": scheduler.get_status + "scheduler_get_status": scheduler.get_status, + "scheduler_check_pause": scheduler.check_pause }) experiment_db.scan_repository_async() diff --git a/artiq/frontend/artiq_run.py b/artiq/frontend/artiq_run.py index ef3004ada..a1a0323cd 100755 --- a/artiq/frontend/artiq_run.py +++ b/artiq/frontend/artiq_run.py @@ -13,6 +13,7 @@ import h5py from llvmlite_artiq import binding as llvm from artiq.language.environment import EnvExperiment, ProcessArgumentManager +from artiq.language.types import TBool from artiq.master.databases import DeviceDB, DatasetDB from artiq.master.worker_db import DeviceManager, DatasetManager from artiq.coredevice.core import CompileError, host_only @@ -107,6 +108,9 @@ class DummyScheduler: def get_status(self): return dict() + def check_pause(self, rid=None) -> TBool: + return False + @host_only def pause(self): pass diff --git a/artiq/master/scheduler.py b/artiq/master/scheduler.py index 6e41a8d48..2ae575c95 100644 --- a/artiq/master/scheduler.py +++ b/artiq/master/scheduler.py @@ -437,3 +437,30 @@ class Scheduler: """Returns a dictionary containing information about the runs currently tracked by the scheduler.""" return self.notifier.read + + def check_pause(self, rid): + """Returns ``True`` if there is a condition that could make ``pause`` + not return immediately (termination requested or higher priority run). + + The typical purpose of this function is to check from a kernel + whether returning control to the host and pausing would have an effect, + in order to avoid the cost of switching kernels in the common case + where ``pause`` does nothing. + """ + for pipeline in self._pipelines.values(): + if rid in pipeline.pool.runs: + run = pipeline.pool.runs[rid] + if run.status != RunStatus.running: + return False + if run.termination_requested: + return True + + prepared_runs = filter(lambda r: r.status == RunStatus.prepare_done, + pipeline.pool.runs.values()) + try: + r = max(prepared_runs, key=lambda r: r.priority_key()) + except ValueError: + # prepared_runs is an empty sequence + return False + return r.priority_key() > run.priority_key() + raise KeyError("RID not found") diff --git a/artiq/master/worker_impl.py b/artiq/master/worker_impl.py index 272b271d4..e1c6eb07a 100644 --- a/artiq/master/worker_impl.py +++ b/artiq/master/worker_impl.py @@ -15,6 +15,7 @@ from artiq.master.worker_db import DeviceManager, DatasetManager from artiq.language.environment import (is_experiment, TraceArgumentManager, ProcessArgumentManager) from artiq.language.core import set_watchdog_factory, TerminationRequested +from artiq.language.types import TBool from artiq.coredevice.core import CompileError, host_only, _render_diagnostic from artiq import __version__ as artiq_version @@ -96,6 +97,12 @@ class Scheduler: self.expid = expid self.priority = priority + _check_pause = staticmethod(make_parent_action("scheduler_check_pause")) + def check_pause(self, rid=None) -> TBool: + if rid is None: + rid = self.rid + return self._check_pause(rid) + def get_exp(file, class_name): module = file_import(file, prefix="artiq_worker_")