mirror of https://github.com/m-labs/artiq.git
scheduler: add check_pause function
This commit is contained in:
parent
5853e31ac2
commit
03a69ec5b7
|
@ -0,0 +1,22 @@
|
||||||
|
from time import sleep
|
||||||
|
|
||||||
|
from artiq.experiment import *
|
||||||
|
|
||||||
|
|
||||||
|
class CorePause(EnvExperiment):
|
||||||
|
def build(self):
|
||||||
|
self.setattr_device("core")
|
||||||
|
self.setattr_device("scheduler")
|
||||||
|
|
||||||
|
@kernel
|
||||||
|
def k(self):
|
||||||
|
print("kernel starting")
|
||||||
|
while not self.scheduler.check_pause():
|
||||||
|
print("main kernel loop running...")
|
||||||
|
sleep(1)
|
||||||
|
print("kernel exiting")
|
||||||
|
|
||||||
|
def run(self):
|
||||||
|
while True:
|
||||||
|
self.k()
|
||||||
|
self.scheduler.pause()
|
|
@ -95,7 +95,8 @@ def main():
|
||||||
"scheduler_submit": scheduler.submit,
|
"scheduler_submit": scheduler.submit,
|
||||||
"scheduler_delete": scheduler.delete,
|
"scheduler_delete": scheduler.delete,
|
||||||
"scheduler_request_termination": scheduler.request_termination,
|
"scheduler_request_termination": scheduler.request_termination,
|
||||||
"scheduler_get_status": scheduler.get_status
|
"scheduler_get_status": scheduler.get_status,
|
||||||
|
"scheduler_check_pause": scheduler.check_pause
|
||||||
})
|
})
|
||||||
experiment_db.scan_repository_async()
|
experiment_db.scan_repository_async()
|
||||||
|
|
||||||
|
|
|
@ -13,6 +13,7 @@ import h5py
|
||||||
from llvmlite_artiq import binding as llvm
|
from llvmlite_artiq import binding as llvm
|
||||||
|
|
||||||
from artiq.language.environment import EnvExperiment, ProcessArgumentManager
|
from artiq.language.environment import EnvExperiment, ProcessArgumentManager
|
||||||
|
from artiq.language.types import TBool
|
||||||
from artiq.master.databases import DeviceDB, DatasetDB
|
from artiq.master.databases import DeviceDB, DatasetDB
|
||||||
from artiq.master.worker_db import DeviceManager, DatasetManager
|
from artiq.master.worker_db import DeviceManager, DatasetManager
|
||||||
from artiq.coredevice.core import CompileError, host_only
|
from artiq.coredevice.core import CompileError, host_only
|
||||||
|
@ -107,6 +108,9 @@ class DummyScheduler:
|
||||||
def get_status(self):
|
def get_status(self):
|
||||||
return dict()
|
return dict()
|
||||||
|
|
||||||
|
def check_pause(self, rid=None) -> TBool:
|
||||||
|
return False
|
||||||
|
|
||||||
@host_only
|
@host_only
|
||||||
def pause(self):
|
def pause(self):
|
||||||
pass
|
pass
|
||||||
|
|
|
@ -437,3 +437,30 @@ class Scheduler:
|
||||||
"""Returns a dictionary containing information about the runs currently
|
"""Returns a dictionary containing information about the runs currently
|
||||||
tracked by the scheduler."""
|
tracked by the scheduler."""
|
||||||
return self.notifier.read
|
return self.notifier.read
|
||||||
|
|
||||||
|
def check_pause(self, rid):
|
||||||
|
"""Returns ``True`` if there is a condition that could make ``pause``
|
||||||
|
not return immediately (termination requested or higher priority run).
|
||||||
|
|
||||||
|
The typical purpose of this function is to check from a kernel
|
||||||
|
whether returning control to the host and pausing would have an effect,
|
||||||
|
in order to avoid the cost of switching kernels in the common case
|
||||||
|
where ``pause`` does nothing.
|
||||||
|
"""
|
||||||
|
for pipeline in self._pipelines.values():
|
||||||
|
if rid in pipeline.pool.runs:
|
||||||
|
run = pipeline.pool.runs[rid]
|
||||||
|
if run.status != RunStatus.running:
|
||||||
|
return False
|
||||||
|
if run.termination_requested:
|
||||||
|
return True
|
||||||
|
|
||||||
|
prepared_runs = filter(lambda r: r.status == RunStatus.prepare_done,
|
||||||
|
pipeline.pool.runs.values())
|
||||||
|
try:
|
||||||
|
r = max(prepared_runs, key=lambda r: r.priority_key())
|
||||||
|
except ValueError:
|
||||||
|
# prepared_runs is an empty sequence
|
||||||
|
return False
|
||||||
|
return r.priority_key() > run.priority_key()
|
||||||
|
raise KeyError("RID not found")
|
||||||
|
|
|
@ -15,6 +15,7 @@ from artiq.master.worker_db import DeviceManager, DatasetManager
|
||||||
from artiq.language.environment import (is_experiment, TraceArgumentManager,
|
from artiq.language.environment import (is_experiment, TraceArgumentManager,
|
||||||
ProcessArgumentManager)
|
ProcessArgumentManager)
|
||||||
from artiq.language.core import set_watchdog_factory, TerminationRequested
|
from artiq.language.core import set_watchdog_factory, TerminationRequested
|
||||||
|
from artiq.language.types import TBool
|
||||||
from artiq.coredevice.core import CompileError, host_only, _render_diagnostic
|
from artiq.coredevice.core import CompileError, host_only, _render_diagnostic
|
||||||
from artiq import __version__ as artiq_version
|
from artiq import __version__ as artiq_version
|
||||||
|
|
||||||
|
@ -96,6 +97,12 @@ class Scheduler:
|
||||||
self.expid = expid
|
self.expid = expid
|
||||||
self.priority = priority
|
self.priority = priority
|
||||||
|
|
||||||
|
_check_pause = staticmethod(make_parent_action("scheduler_check_pause"))
|
||||||
|
def check_pause(self, rid=None) -> TBool:
|
||||||
|
if rid is None:
|
||||||
|
rid = self.rid
|
||||||
|
return self._check_pause(rid)
|
||||||
|
|
||||||
|
|
||||||
def get_exp(file, class_name):
|
def get_exp(file, class_name):
|
||||||
module = file_import(file, prefix="artiq_worker_")
|
module = file_import(file, prefix="artiq_worker_")
|
||||||
|
|
Loading…
Reference in New Issue