artiq/artiq/master/scheduler.py

517 lines
18 KiB
Python
Raw Normal View History

import asyncio
import csv
import logging
import os.path
from enum import Enum
from time import time

from sipyco.sync_struct import Notifier
from sipyco.asyncio_tools import TaskObject, Condition

from artiq.master.worker import Worker, log_worker_exception
from artiq.tools import asyncio_wait_or_cancel


logger = logging.getLogger(__name__)
class RunStatus(Enum):
    """Lifecycle state of a run as it moves through the pipeline stages.

    The numeric values are part of the notifier protocol and must not be
    changed.
    """
    pending = 0       # submitted, waiting to be picked up by PrepareStage
    flushing = 1      # waiting for higher/equal-priority runs to drain
    preparing = 2     # build/prepare in progress in the worker
    prepare_done = 3  # ready to be executed by RunStage
    running = 4       # run() or resume() in progress
    run_done = 5      # execution finished, waiting for AnalyzeStage
    analyzing = 6     # analyze() in progress
    deleting = 7      # queued for deletion by the Deleter
    paused = 8        # yielded to a higher-priority run, kept for resumption
def _mk_worker_method(name):
    """Return an async method that forwards *name* to ``self.worker``.

    The returned coroutine reports completion (``True``) instead of raising
    when the worker has been closed, either before the call or while it was
    in flight, so a terminated run unwinds cleanly.  Cancellation is always
    propagated.
    """
    async def worker_method(self, *args, **kwargs):
        if self.worker.closed.is_set():
            # Worker already terminated: report completion immediately.
            return True
        bound = getattr(self.worker, name)
        try:
            return await bound(*args, **kwargs)
        except Exception as exc:
            if isinstance(exc, asyncio.CancelledError):
                # Never swallow cancellation (CancelledError subclassed
                # Exception on older Python versions).
                raise
            if not self.worker.closed.is_set():
                raise
            # The run was terminated while the call was in flight; the
            # resulting worker error is expected noise.
            logger.debug("suppressing worker exception of terminated run",
                         exc_info=True)
            # Return completion on termination
            return True
    return worker_method
class Run:
    """A single experiment run owned by a :class:`RunPool`.

    Mirrors its scheduling state into the pool's notifier so clients can
    observe progress, and proxies the per-stage operations (build, prepare,
    run, resume, analyze) to its :class:`Worker`.
    """
    def __init__(self, rid, pipeline_name,
                 wd, expid, priority, due_date, flush,
                 pool, **kwargs):
        # called through pool
        self.rid = rid
        self.pipeline_name = pipeline_name
        self.wd = wd
        self.expid = expid
        self.priority = priority
        self.due_date = due_date
        self.flush = flush

        self.worker = Worker(pool.worker_handlers)
        self.termination_requested = False

        self._status = RunStatus.pending

        # Base notifier entry; extra keyword arguments (e.g. repo_msg)
        # override/extend it.
        entry = {
            "pipeline": self.pipeline_name,
            "expid": self.expid,
            "priority": self.priority,
            "due_date": self.due_date,
            "flush": self.flush,
            "status": self._status.name
        }
        entry.update(kwargs)
        self._notifier = pool.notifier
        self._notifier[self.rid] = entry
        self._state_changed = pool.state_changed

    @property
    def status(self):
        """Current :class:`RunStatus` of this run."""
        return self._status

    @status.setter
    def status(self, value):
        self._status = value
        # Once the worker is closed, the notifier entry may already be gone.
        if not self.worker.closed.is_set():
            self._notifier[self.rid]["status"] = self._status.name
        self._state_changed.notify()

    def priority_key(self):
        """Return a comparable value that defines a run priority order.

        Applies only to runs the due date of which has already elapsed.
        """
        # Higher priority wins; ties go to the earlier due date, then to
        # the earlier (smaller) RID.
        return (self.priority, -(self.due_date or 0), -self.rid)

    async def close(self):
        # called through pool
        await self.worker.close()
        del self._notifier[self.rid]

    _build = _mk_worker_method("build")

    async def build(self):
        """Build the experiment in the worker, passing run identification."""
        await self._build(self.rid, self.pipeline_name,
                          self.wd, self.expid,
                          self.priority)

    prepare = _mk_worker_method("prepare")
    run = _mk_worker_method("run")
    resume = _mk_worker_method("resume")
    analyze = _mk_worker_method("analyze")
class RunPool:
    """Holds the runs of one pipeline and creates/destroys them on demand."""
    def __init__(self, ridc, worker_handlers, notifier, experiment_db, log_submissions):
        self.runs = dict()
        self.state_changed = Condition()

        self.ridc = ridc
        self.worker_handlers = worker_handlers
        self.notifier = notifier
        self.experiment_db = experiment_db
        self.log_submissions = log_submissions

    def log_submission(self, rid, expid):
        # Append one CSV row (rid, submission timestamp, experiment file)
        # to the submission log.
        timestamp = time()
        with open(self.log_submissions, 'a', newline='') as f:
            csv.writer(f).writerow([rid, timestamp, expid["file"]])

    def submit(self, expid, priority, due_date, flush, pipeline_name):
        """
        Submits an experiment to be run by this pool

        If expid has the attribute `repo_rev`, treat it as a git revision or
        reference and resolve into a unique git hash before submission
        """
        # mutates expid to insert head repository revision if None and
        # replaces relative path with the absolute one.
        # called through scheduler.
        rid = self.ridc.get()
        if "repo_rev" in expid:
            rev_or_ref = expid["repo_rev"] or self.experiment_db.cur_rev
            wd, repo_msg, resolved_rev = \
                self.experiment_db.repo_backend.request_rev(rev_or_ref)
            # Mutate expid's repo_rev to that returned from request_rev, in case
            # a branch was passed instead of a hash
            expid["repo_rev"] = resolved_rev
        else:
            if "file" in expid:
                expid["file"] = os.path.abspath(expid["file"])
            wd = repo_msg = None

        run = Run(rid, pipeline_name, wd, expid, priority, due_date, flush,
                  self, repo_msg=repo_msg)
        if self.log_submissions is not None:
            self.log_submission(rid, expid)
        self.runs[rid] = run
        self.state_changed.notify()
        return rid

    async def delete(self, rid):
        # called through deleter
        run = self.runs.get(rid)
        if run is None:
            return
        await run.close()
        if "repo_rev" in run.expid:
            # Let the repository backend release its checkout of the revision.
            self.experiment_db.repo_backend.release_rev(run.expid["repo_rev"])
        del self.runs[rid]
class PrepareStage(TaskObject):
    """Background task that builds and prepares pending runs."""
    def __init__(self, pool, delete_cb):
        self.pool = pool
        self.delete_cb = delete_cb

    def _get_run(self):
        """If a run should get prepared now, return it. Otherwise, return a
        float giving the time until the next check, or None if no time-based
        check is required.

        The latter can be the case if there are no due-date runs, or none
        of them are going to become next-in-line before further pool state
        changes (which will also cause a re-evaluation).
        """
        pending = [r for r in self.pool.runs.values()
                   if r.status == RunStatus.pending]
        now = time()

        def is_runnable(r):
            # A run without a due date (None) is immediately runnable.
            return (r.due_date or 0) < now

        prepared_max = max((r.priority_key() for r in self.pool.runs.values()
                            if r.status == RunStatus.prepare_done),
                           default=None)

        def takes_precedence(r):
            return prepared_max is None or r.priority_key() > prepared_max

        best = max((r for r in pending if is_runnable(r)),
                   key=lambda r: r.priority_key(),
                   default=None)
        if best is not None and takes_precedence(best):
            return best
        # Time until the earliest relevant due date, if any.
        return min((r.due_date - now for r in pending
                    if not is_runnable(r) and takes_precedence(r)),
                   default=None)

    def _flush_done(self, run):
        # Flushing is complete once every other run of at least equal
        # priority is still pending or already being deleted.
        return all(r is run
                   or r.priority < run.priority
                   or r.status in (RunStatus.pending, RunStatus.deleting)
                   for r in self.pool.runs.values())

    async def _do(self):
        while True:
            selection = self._get_run()
            if selection is None:
                # Nothing to do until the pool changes.
                await self.pool.state_changed.wait()
                continue
            if isinstance(selection, float):
                # Re-check when the next due date elapses or the pool
                # changes, whichever happens first.
                await asyncio_wait_or_cancel([self.pool.state_changed.wait()],
                                             timeout=selection)
                continue
            run = selection
            if run.flush:
                run.status = RunStatus.flushing
                while not self._flush_done(run):
                    waits = [self.pool.state_changed.wait(),
                             run.worker.closed.wait()]
                    await asyncio_wait_or_cancel(
                        waits, return_when=asyncio.FIRST_COMPLETED)
                    if run.worker.closed.is_set():
                        break
                if run.worker.closed.is_set():
                    # The run was terminated while flushing; skip it.
                    continue
            run.status = RunStatus.preparing
            try:
                await run.build()
                await run.prepare()
            except Exception:
                logger.error("got worker exception in prepare stage, "
                             "deleting RID %d", run.rid)
                log_worker_exception()
                self.delete_cb(run.rid)
            else:
                run.status = RunStatus.prepare_done
class RunStage(TaskObject):
    """Background task that executes prepared runs.

    Keeps a stack of paused runs so that lower-priority work is resumed
    once the preempting run completes or pauses in turn.
    """
    def __init__(self, pool, delete_cb):
        self.pool = pool
        self.delete_cb = delete_cb

    def _get_run(self):
        # Highest-priority run that finished preparation, or None.
        return max((r for r in self.pool.runs.values()
                    if r.status == RunStatus.prepare_done),
                   key=lambda r: r.priority_key(),
                   default=None)

    async def _do(self):
        paused_stack = []
        while True:
            newcomer = self._get_run()
            if not paused_stack or (
                    newcomer is not None and
                    newcomer.priority_key() > paused_stack[-1].priority_key()):
                # Nothing paused, or the newcomer preempts the paused top:
                # wait until a prepared run is available and push it.
                while newcomer is None:
                    await self.pool.state_changed.wait()
                    newcomer = self._get_run()
                paused_stack.append(newcomer)
            run = paused_stack.pop()
            try:
                if run.status == RunStatus.paused:
                    run.status = RunStatus.running
                    # clear "termination requested" flag now
                    # so that if it is set again during the resume, this
                    # results in another exception.
                    terminate = run.termination_requested
                    run.termination_requested = False
                    completed = await run.resume(terminate)
                else:
                    run.status = RunStatus.running
                    completed = await run.run()
            except Exception:
                logger.error("got worker exception in run stage, "
                             "deleting RID %d", run.rid)
                log_worker_exception()
                self.delete_cb(run.rid)
            else:
                if completed:
                    run.status = RunStatus.run_done
                else:
                    # The run yielded; keep it for later resumption.
                    run.status = RunStatus.paused
                    paused_stack.append(run)
class AnalyzeStage(TaskObject):
    """Background task that analyzes completed runs and hands them to the
    deleter."""
    def __init__(self, pool, delete_cb):
        self.pool = pool
        self.delete_cb = delete_cb

    def _get_run(self):
        # Highest-priority run whose execution is done, or None.
        return max((r for r in self.pool.runs.values()
                    if r.status == RunStatus.run_done),
                   key=lambda r: r.priority_key(),
                   default=None)

    async def _do(self):
        while True:
            run = self._get_run()
            while run is None:
                await self.pool.state_changed.wait()
                run = self._get_run()
            run.status = RunStatus.analyzing
            try:
                await run.analyze()
            except Exception:
                # Analysis failures are logged but do not block deletion.
                logger.error("got worker exception in analyze stage of RID %d.",
                             run.rid)
                log_worker_exception()
            self.delete_cb(run.rid)
class Pipeline:
    """Bundles one :class:`RunPool` with its prepare/run/analyze stage
    tasks."""
    def __init__(self, ridc, deleter, worker_handlers, notifier, experiment_db, log_submissions):
        self.pool = RunPool(ridc, worker_handlers, notifier, experiment_db, log_submissions)
        self._prepare = PrepareStage(self.pool, deleter.delete)
        self._run = RunStage(self.pool, deleter.delete)
        self._analyze = AnalyzeStage(self.pool, deleter.delete)

    def start(self, *, loop=None):
        """Start all three stage tasks on *loop*."""
        for stage in (self._prepare, self._run, self._analyze):
            stage.start(loop=loop)

    async def stop(self):
        # NB: restart of a stopped pipeline is not supported
        # Stages are stopped in reverse order of start.
        await self._analyze.stop()
        await self._run.stop()
        await self._prepare.stop()
class Deleter(TaskObject):
    """Provides a synchronous interface for instigating deletion of runs.

    :meth:`RunPool.delete` is an async function (it needs to close the worker
    connection, etc.), so we maintain a queue of RIDs to delete on a
    background task.
    """
    def __init__(self, pipelines):
        self._pipelines = pipelines
        self._queue = asyncio.Queue()

    def delete(self, rid):
        """Delete the run with the given RID.

        Multiple calls for the same RID are silently ignored.
        """
        logger.debug("delete request for RID %d", rid)
        for pipeline in self._pipelines.values():
            if rid in pipeline.pool.runs:
                # Mark immediately so the stages stop considering this run.
                pipeline.pool.runs[rid].status = RunStatus.deleting
                break
        self._queue.put_nowait(rid)

    async def join(self):
        """Wait until all queued deletions have been processed."""
        await self._queue.join()

    async def _delete(self, rid):
        # By looking up the run by RID, we implicitly make sure to delete each
        # run only once.
        for pipeline in self._pipelines.values():
            if rid in pipeline.pool.runs:
                logger.debug("deleting RID %d...", rid)
                await pipeline.pool.delete(rid)
                logger.debug("deletion of RID %d completed", rid)
                break

    async def _gc_pipelines(self):
        # Tear down pipelines whose pools have become empty.
        for name in list(self._pipelines.keys()):
            if not self._pipelines[name].pool.runs:
                logger.debug("garbage-collecting pipeline '%s'...", name)
                await self._pipelines[name].stop()
                del self._pipelines[name]
                logger.debug("garbage-collection of pipeline '%s' completed",
                             name)

    async def _do(self):
        while True:
            rid = await self._queue.get()
            await self._delete(rid)
            await self._gc_pipelines()
            self._queue.task_done()
class Scheduler:
    """Top-level scheduler.

    Owns one :class:`Pipeline` per pipeline name (created lazily on
    submission, garbage-collected by the :class:`Deleter` when empty) and
    exposes submission, deletion, termination and status queries.
    """
    def __init__(self, ridc, worker_handlers, experiment_db, log_submissions):
        self.notifier = Notifier(dict())

        self._pipelines = dict()
        self._worker_handlers = worker_handlers
        self._experiment_db = experiment_db
        self._terminated = False

        self._ridc = ridc
        self._deleter = Deleter(self._pipelines)
        self._log_submissions = log_submissions

    def start(self, *, loop=None):
        """Start the background deleter task on *loop* (kept for starting
        pipelines created later)."""
        self._loop = loop
        self._deleter.start(loop=self._loop)

    async def stop(self):
        # NB: restart of a stopped scheduler is not supported
        self._terminated = True  # prevent further runs from being created
        for pipeline in self._pipelines.values():
            for rid in pipeline.pool.runs.keys():
                self._deleter.delete(rid)
        await self._deleter.join()
        await self._deleter.stop()
        if self._pipelines:
            logger.warning("some pipelines were not garbage-collected")

    def submit(self, pipeline_name, expid, priority=0, due_date=None, flush=False):
        """Submits a new run.

        When called through an experiment, the default values of
        ``pipeline_name``, ``expid`` and ``priority`` correspond to those of
        the current run."""
        # mutates expid to insert head repository revision if None, and
        # replaces relative file path with absolute one
        if self._terminated:
            return
        try:
            pipeline = self._pipelines[pipeline_name]
        except KeyError:
            # First submission to this pipeline: create and start it.
            logger.debug("creating pipeline '%s'", pipeline_name)
            pipeline = Pipeline(self._ridc, self._deleter,
                                self._worker_handlers, self.notifier,
                                self._experiment_db, self._log_submissions)
            self._pipelines[pipeline_name] = pipeline
            pipeline.start(loop=self._loop)
        return pipeline.pool.submit(expid, priority, due_date, flush, pipeline_name)

    def delete(self, rid):
        """Kills the run with the specified RID."""
        self._deleter.delete(rid)

    def request_termination(self, rid):
        """Requests graceful termination of the run with the specified RID."""
        for pipeline in self._pipelines.values():
            if rid in pipeline.pool.runs:
                run = pipeline.pool.runs[rid]
                if run.status == RunStatus.running or run.status == RunStatus.paused:
                    # The run is executing; set the flag it polls via
                    # check_pause/check_termination.
                    run.termination_requested = True
                else:
                    # Not executing yet/anymore: delete outright.
                    self.delete(rid)
                break

    def get_status(self):
        """Returns a dictionary containing information about the runs currently
        tracked by the scheduler.

        Must not be modified."""
        return self.notifier.raw_view

    def check_pause(self, rid):
        """Returns ``True`` if there is a condition that could make ``pause``
        not return immediately (termination requested or higher priority run).

        The typical purpose of this function is to check from a kernel
        whether returning control to the host and pausing would have an effect,
        in order to avoid the cost of switching kernels in the common case
        where ``pause`` does nothing.

        This function does not have side effects, and does not have to be
        followed by a call to ``pause``.

        :raises KeyError: if the RID is not tracked by any pipeline.
        """
        for pipeline in self._pipelines.values():
            if rid in pipeline.pool.runs:
                run = pipeline.pool.runs[rid]
                if run.status != RunStatus.running:
                    return False
                if run.termination_requested:
                    return True
                prepared_runs = [r for r in pipeline.pool.runs.values()
                                 if r.status == RunStatus.prepare_done]
                if not prepared_runs:
                    # No prepared run could preempt this one.
                    return False
                best = max(prepared_runs, key=lambda r: r.priority_key())
                return best.priority_key() > run.priority_key()
        raise KeyError("RID not found")

    def check_termination(self, rid):
        """Returns ``True`` if termination is requested."""
        for pipeline in self._pipelines.values():
            if rid in pipeline.pool.runs:
                run = pipeline.pool.runs[rid]
                if run.termination_requested:
                    return True
        # Unknown or non-terminating RID: report False explicitly rather
        # than falling off the loop and implicitly returning None.
        return False