artiq/artiq/master/scheduler.py

367 lines
11 KiB
Python
Raw Normal View History

import asyncio
2015-05-17 16:11:00 +08:00
import logging
from enum import Enum
from time import time
2015-01-14 12:16:49 +08:00
from artiq.master.worker import Worker
2015-05-17 16:11:00 +08:00
from artiq.tools import asyncio_wait_or_cancel, asyncio_queue_peek
from artiq.protocols.sync_struct import Notifier
2015-05-17 16:11:00 +08:00
logger = logging.getLogger(__name__)
class RunStatus(Enum):
pending = 0
preparing = 1
prepare_done = 2
running = 3
run_done = 4
analyzing = 5
analyze_done = 6
paused = 7
def _mk_worker_method(name):
@asyncio.coroutine
def worker_method(self, *args, **kwargs):
if self._terminated:
return True
m = getattr(self._worker, name)
try:
return (yield from m(*args, **kwargs))
except Exception as e:
if isinstance(e, asyncio.CancelledError):
raise
if self._terminated:
logger.debug("suppressing worker exception of terminated run",
exc_info=True)
# Return completion on termination
return True
else:
raise
return worker_method
class Run:
def __init__(self, rid, pipeline_name,
expid, due_date,
worker_handlers, notifier):
# called through pool
self.rid = rid
self.pipeline_name = pipeline_name
self.expid = expid
self.due_date = due_date
self._status = RunStatus.pending
self._terminated = False
self._worker = Worker(worker_handlers)
self._notifier = notifier
self._notifier[self.rid] = {
"pipeline": self.pipeline_name,
"expid": self.expid,
"due_date": self.due_date,
"status": self._status.name
}
@property
def status(self):
return self._status
@status.setter
def status(self, value):
self._status = value
if not self._terminated:
self._notifier[self.rid]["status"] = self._status.name
# The run with the largest priority_key is to be scheduled first
def priority_key(self, now):
if self.due_date is None:
overdue = 0
due_date_k = 0
else:
overdue = int(now > self.due_date)
due_date_k = -self.due_date
return (overdue, due_date_k, -self.rid)
@asyncio.coroutine
def close(self):
# called through pool
self._terminated = True
yield from self._worker.close()
del self._notifier[self.rid]
_prepare = _mk_worker_method("prepare")
@asyncio.coroutine
def prepare(self):
yield from self._prepare(self.rid, self.pipeline_name, self.expid)
2015-05-17 16:11:00 +08:00
run = _mk_worker_method("run")
resume = _mk_worker_method("resume")
analyze = _mk_worker_method("analyze")
write_results = _mk_worker_method("write_results")
class RIDCounter:
def __init__(self, next_rid):
self._next_rid = next_rid
def get(self):
rid = self._next_rid
self._next_rid += 1
return rid
class RunPool:
def __init__(self, ridc, worker_handlers, notifier):
self.runs = dict()
self.submitted_callback = None
self._ridc = ridc
self._worker_handlers = worker_handlers
self._notifier = notifier
def submit(self, expid, due_date, pipeline_name):
# called through scheduler
rid = self._ridc.get()
run = Run(rid, pipeline_name, expid, due_date,
self._worker_handlers, self._notifier)
self.runs[rid] = run
if self.submitted_callback is not None:
self.submitted_callback()
return rid
@asyncio.coroutine
def delete(self, rid):
# called through deleter
if rid not in self.runs:
return
yield from self.runs[rid].close()
del self.runs[rid]
class TaskObject:
2014-10-23 18:48:03 +08:00
def start(self):
2015-05-17 16:11:00 +08:00
self.task = asyncio.async(self._do())
2014-10-23 18:48:03 +08:00
@asyncio.coroutine
def stop(self):
self.task.cancel()
yield from asyncio.wait([self.task])
del self.task
2015-05-17 16:11:00 +08:00
@asyncio.coroutine
def _do(self):
raise NotImplementedError
2015-05-17 16:11:00 +08:00
class PrepareStage(TaskObject):
def __init__(self, pool, outq):
self.pool = pool
self.outq = outq
self.pool_submitted = asyncio.Event()
self.pool.submitted_callback = lambda: self.pool_submitted.set()
@asyncio.coroutine
def _push_runs(self):
"""Pushes all runs that have no due date of have a due date in the
past.
Returns the time before the next schedulable run, or None if the
pool is empty."""
while True:
now = time()
pending_runs = filter(lambda r: r.status == RunStatus.pending,
self.pool.runs.values())
try:
2015-05-17 16:11:00 +08:00
run = max(pending_runs, key=lambda r: r.priority_key(now))
except ValueError:
# pending_runs is an empty sequence
return None
if run.due_date is None or run.due_date < now:
run.status = RunStatus.preparing
yield from run.prepare()
run.status = RunStatus.prepare_done
yield from self.outq.put(run)
else:
return run.due_date - now
2014-12-31 17:41:22 +08:00
2014-10-23 18:48:03 +08:00
@asyncio.coroutine
2015-05-17 16:11:00 +08:00
def _do(self):
while True:
2015-05-17 16:11:00 +08:00
next_timed_in = yield from self._push_runs()
if next_timed_in is None:
# pool is empty - wait for something to be added to it
yield from self.pool_submitted.wait()
else:
# wait for next_timed_in seconds, or until the pool is modified
yield from asyncio_wait_or_cancel([self.pool_submitted.wait()],
timeout=next_timed_in)
self.pool_submitted.clear()
2015-05-17 16:11:00 +08:00
class RunStage(TaskObject):
def __init__(self, deleter, inq, outq):
self.deleter = deleter
self.inq = inq
self.outq = outq
@asyncio.coroutine
def _do(self):
stack = []
while True:
try:
next_irun = asyncio_queue_peek(self.inq)
except asyncio.QueueEmpty:
next_irun = None
now = time()
2015-05-17 16:11:00 +08:00
if not stack or (
next_irun is not None and
next_irun.priority_key(now) > stack[-1].priority_key(now)):
stack.append((yield from self.inq.get()))
2015-05-17 16:11:00 +08:00
run = stack.pop()
try:
if run.status == RunStatus.paused:
run.status = RunStatus.running
completed = yield from run.resume()
else:
run.status = RunStatus.running
completed = yield from run.run()
except:
logger.warning("got worker exception, deleting RID %d",
run.rid, exc_info=True)
self.deleter.delete(run.rid)
else:
if completed:
run.status = RunStatus.run_done
yield from self.outq.put(run)
else:
run.status = RunStatus.paused
stack.append(run)
2015-05-17 16:11:00 +08:00
class AnalyzeStage(TaskObject):
def __init__(self, deleter, inq):
self.deleter = deleter
self.inq = inq
@asyncio.coroutine
2015-05-17 16:11:00 +08:00
def _do(self):
while True:
2015-05-17 16:11:00 +08:00
run = yield from self.inq.get()
run.status = RunStatus.analyzing
yield from run.analyze()
yield from run.write_results()
run.status = RunStatus.analyze_done
self.deleter.delete(run.rid)
class Pipeline:
def __init__(self, ridc, deleter, worker_handlers, notifier):
self.pool = RunPool(ridc, worker_handlers, notifier)
self._prepare = PrepareStage(self.pool, asyncio.Queue(maxsize=1))
self._run = RunStage(deleter, self._prepare.outq, asyncio.Queue(maxsize=1))
self._analyze = AnalyzeStage(deleter, self._run.outq)
def start(self):
self._prepare.start()
self._run.start()
self._analyze.start()
@asyncio.coroutine
def stop(self):
# NB: restart of a stopped pipeline is not supported
yield from self._analyze.stop()
yield from self._run.stop()
yield from self._prepare.stop()
class Deleter(TaskObject):
def __init__(self, pipelines):
self._pipelines = pipelines
self._queue = asyncio.JoinableQueue()
def delete(self, rid):
logger.debug("delete request for RID %d", rid)
self._queue.put_nowait(rid)
@asyncio.coroutine
def join(self):
yield from self._queue.join()
@asyncio.coroutine
def _delete(self, rid):
for pipeline in self._pipelines.values():
if rid in pipeline.pool.runs:
logger.debug("deleting RID %d...", rid)
yield from pipeline.pool.delete(rid)
logger.debug("deletion of RID %d completed", rid)
break
@asyncio.coroutine
def _gc_pipelines(self):
pipeline_names = list(self._pipelines.keys())
for name in pipeline_names:
if not self._pipelines[name].pool.runs:
logger.debug("garbage-collecting pipeline '%s'...", name)
yield from self._pipelines[name].stop()
del self._pipelines[name]
logger.debug("garbage-collection of pipeline '%s' completed",
name)
@asyncio.coroutine
def _do(self):
while True:
rid = yield from self._queue.get()
yield from self._delete(rid)
yield from self._gc_pipelines()
self._queue.task_done()
class Scheduler:
def __init__(self, next_rid, worker_handlers):
self.notifier = Notifier(dict())
self._pipelines = dict()
self._worker_handlers = worker_handlers
self._terminated = False
self._ridc = RIDCounter(next_rid)
self._deleter = Deleter(self._pipelines)
def start(self):
self._deleter.start()
@asyncio.coroutine
def stop(self):
# NB: restart of a stopped scheduler is not supported
self._terminated = True # prevent further runs from being created
for pipeline in self._pipelines.values():
for rid in pipeline.pool.runs.keys():
self._deleter.delete(rid)
yield from self._deleter.join()
yield from self._deleter.stop()
if self._pipelines:
logger.warning("some pipelines were not garbage-collected")
def submit(self, pipeline_name, expid, due_date):
if self._terminated:
return
try:
pipeline = self._pipelines[pipeline_name]
except KeyError:
logger.debug("creating pipeline '%s'", pipeline_name)
pipeline = Pipeline(self._ridc, self._deleter,
self._worker_handlers, self.notifier)
self._pipelines[pipeline_name] = pipeline
pipeline.start()
return pipeline.pool.submit(expid, due_date, pipeline_name)
def delete(self, rid):
self._deleter.delete(rid)