diff --git a/artiq/master/scheduler.py b/artiq/master/scheduler.py index fefa8539f..0e1b0f1b6 100644 --- a/artiq/master/scheduler.py +++ b/artiq/master/scheduler.py @@ -4,8 +4,7 @@ from enum import Enum from time import time from artiq.master.worker import Worker -from artiq.tools import (asyncio_wait_or_cancel, asyncio_queue_peek, - TaskObject, WaitSet) +from artiq.tools import asyncio_wait_or_cancel, TaskObject, Condition from artiq.protocols.sync_struct import Notifier @@ -20,7 +19,7 @@ class RunStatus(Enum): running = 4 run_done = 5 analyzing = 6 - analyze_done = 7 + deleting = 7 paused = 8 @@ -48,7 +47,7 @@ def _mk_worker_method(name): class Run: def __init__(self, rid, pipeline_name, wd, expid, priority, due_date, flush, - worker_handlers, notifier, **kwargs): + pool, **kwargs): # called through pool self.rid = rid self.pipeline_name = pipeline_name @@ -58,7 +57,7 @@ class Run: self.due_date = due_date self.flush = flush - self.worker = Worker(worker_handlers) + self.worker = Worker(pool.worker_handlers) self._status = RunStatus.pending @@ -71,8 +70,9 @@ class Run: "status": self._status.name } notification.update(kwargs) - self._notifier = notifier + self._notifier = pool.notifier self._notifier[self.rid] = notification + self._state_changed = pool.state_changed @property def status(self): @@ -83,6 +83,7 @@ class Run: self._status = value if not self.worker.closed.is_set(): self._notifier[self.rid]["status"] = self._status.name + self._state_changed.notify() # The run with the largest priority_key is to be scheduled first def priority_key(self, now=None): @@ -130,28 +131,27 @@ class RIDCounter: class RunPool: def __init__(self, ridc, worker_handlers, notifier, repo_backend): self.runs = dict() - self.submitted_cb = None + self.state_changed = Condition() - self._ridc = ridc - self._worker_handlers = worker_handlers - self._notifier = notifier - self._repo_backend = repo_backend + self.ridc = ridc + self.worker_handlers = worker_handlers + self.notifier = notifier + self.repo_backend = repo_backend def submit(self, expid, priority, due_date, flush, pipeline_name): - # mutates expid to insert head repository revision if None - # called through scheduler - rid = self._ridc.get() + # mutates expid to insert head repository revision if None. + # called through scheduler. + rid = self.ridc.get() if "repo_rev" in expid: if expid["repo_rev"] is None: - expid["repo_rev"] = self._repo_backend.get_head_rev() - wd, repo_msg = self._repo_backend.request_rev(expid["repo_rev"]) + expid["repo_rev"] = self.repo_backend.get_head_rev() + wd, repo_msg = self.repo_backend.request_rev(expid["repo_rev"]) else: wd, repo_msg = None, None run = Run(rid, pipeline_name, wd, expid, priority, due_date, flush, - self._worker_handlers, self._notifier, repo_msg=repo_msg) + self, repo_msg=repo_msg) self.runs[rid] = run - if self.submitted_cb is not None: - self.submitted_cb() + self.state_changed.notify() return rid @asyncio.coroutine @@ -162,47 +162,72 @@ class RunPool: run = self.runs[rid] yield from run.close() if "repo_rev" in run.expid: - self._repo_backend.release_rev(run.expid["repo_rev"]) + self.repo_backend.release_rev(run.expid["repo_rev"]) del self.runs[rid] class PrepareStage(TaskObject): - def __init__(self, flush_tracker, delete_cb, pool, outq): - self.flush_tracker = flush_tracker - self.delete_cb = delete_cb + def __init__(self, pool, delete_cb): self.pool = pool - self.outq = outq + self.delete_cb = delete_cb - self.pool_submitted = asyncio.Event() - self.pool.submitted_cb = lambda: self.pool_submitted.set() + def _get_run(self): + """If a run should get prepared now, return it. + Otherwise, return a float representing the time before the next timed + run becomes due, or None if there is no such run.""" + now = time() + pending_runs = filter(lambda r: r.status == RunStatus.pending, + self.pool.runs.values()) + try: + candidate = max(pending_runs, key=lambda r: r.priority_key(now)) + except ValueError: + # pending_runs is an empty sequence + return None + + prepared_runs = filter(lambda r: r.status == RunStatus.prepare_done, + self.pool.runs.values()) + try: + top_prepared_run = max(prepared_runs, + key=lambda r: r.priority_key()) + except ValueError: + # there are no existing prepared runs - go ahead with + pass + else: + # prepare (as well) only if it has higher priority than + # the highest priority prepared run + if top_prepared_run.priority_key() >= candidate.priority_key(): + return None + + if candidate.due_date is None or candidate.due_date < now: + return candidate + else: + return candidate.due_date - now @asyncio.coroutine - def _push_runs(self): - """Pushes all runs that have no due date of have a due date in the - past. - - Returns the time before the next schedulable run, or None if the - pool is empty.""" + def _do(self): while True: - now = time() - pending_runs = filter(lambda r: r.status == RunStatus.pending, - self.pool.runs.values()) - try: - run = max(pending_runs, key=lambda r: r.priority_key(now)) - except ValueError: - # pending_runs is an empty sequence - return None - if run.due_date is None or run.due_date < now: + run = self._get_run() + if run is None: + yield from self.pool.state_changed.wait() + elif isinstance(run, float): + yield from asyncio_wait_or_cancel([self.pool.state_changed.wait()], + timeout=run) + else: if run.flush: run.status = RunStatus.flushing - yield from asyncio_wait_or_cancel( - [self.flush_tracker.wait_empty(), - run.worker.closed.wait()], - return_when=asyncio.FIRST_COMPLETED) + while not all(r.status in (RunStatus.pending, + RunStatus.deleting) + or r is run + for r in self.pool.runs.values()): + ev = [self.pool.state_changed.wait(), + run.worker.closed.wait()] + yield from asyncio_wait_or_cancel( + ev, return_when=asyncio.FIRST_COMPLETED) + if run.worker.closed.is_set(): + break if run.worker.closed.is_set(): - continue + continue run.status = RunStatus.preparing - self.flush_tracker.add(run.rid) try: yield from run.build() yield from run.prepare() @@ -211,44 +236,38 @@ class PrepareStage(TaskObject): "deleting RID %d", run.rid, exc_info=True) self.delete_cb(run.rid) - run.status = RunStatus.prepare_done - yield from self.outq.put(run) - else: - return run.due_date - now - - @asyncio.coroutine - def _do(self): - while True: - next_timed_in = yield from self._push_runs() - if next_timed_in is None: - # pool is empty - wait for something to be added to it - yield from self.pool_submitted.wait() - else: - # wait for next_timed_in seconds, or until the pool is modified - yield from asyncio_wait_or_cancel([self.pool_submitted.wait()], - timeout=next_timed_in) - self.pool_submitted.clear() + else: + run.status = RunStatus.prepare_done class RunStage(TaskObject): - def __init__(self, delete_cb, inq, outq): + def __init__(self, pool, delete_cb): + self.pool = pool self.delete_cb = delete_cb - self.inq = inq - self.outq = outq + + def _get_run(self): + prepared_runs = filter(lambda r: r.status == RunStatus.prepare_done, + self.pool.runs.values()) + try: + r = max(prepared_runs, key=lambda r: r.priority_key()) + except ValueError: + # prepared_runs is an empty sequence + r = None + return r @asyncio.coroutine def _do(self): stack = [] while True: - try: - next_irun = asyncio_queue_peek(self.inq) - except asyncio.QueueEmpty: - next_irun = None + next_irun = self._get_run() if not stack or ( next_irun is not None and next_irun.priority_key() > stack[-1].priority_key()): - stack.append((yield from self.inq.get())) + while next_irun is None: + yield from self.pool.state_changed.wait() + next_irun = self._get_run() + stack.append(next_irun) run = stack.pop() try: @@ -266,21 +285,33 @@ class RunStage(TaskObject): else: if completed: run.status = RunStatus.run_done - yield from self.outq.put(run) else: run.status = RunStatus.paused stack.append(run) class AnalyzeStage(TaskObject): - def __init__(self, delete_cb, inq): + def __init__(self, pool, delete_cb): + self.pool = pool self.delete_cb = delete_cb - self.inq = inq + + def _get_run(self): + run_runs = filter(lambda r: r.status == RunStatus.run_done, + self.pool.runs.values()) + try: + r = max(run_runs, key=lambda r: r.priority_key()) + except ValueError: + # run_runs is an empty sequence + r = None + return r @asyncio.coroutine def _do(self): while True: - run = yield from self.inq.get() + run = self._get_run() + while run is None: + yield from self.pool.state_changed.wait() + run = self._get_run() run.status = RunStatus.analyzing try: yield from run.analyze() @@ -290,22 +321,16 @@ class AnalyzeStage(TaskObject): "deleting RID %d", run.rid, exc_info=True) self.delete_cb(run.rid) - run.status = RunStatus.analyze_done - self.delete_cb(run.rid) + else: + self.delete_cb(run.rid) class Pipeline: def __init__(self, ridc, deleter, worker_handlers, notifier, repo_backend): - flush_tracker = WaitSet() - def delete_cb(rid): - deleter.delete(rid) - flush_tracker.discard(rid) self.pool = RunPool(ridc, worker_handlers, notifier, repo_backend) - self._prepare = PrepareStage(flush_tracker, delete_cb, - self.pool, asyncio.Queue(maxsize=1)) - self._run = RunStage(delete_cb, - self._prepare.outq, asyncio.Queue(maxsize=1)) - self._analyze = AnalyzeStage(delete_cb, self._run.outq) + self._prepare = PrepareStage(self.pool, deleter.delete) + self._run = RunStage(self.pool, deleter.delete) + self._analyze = AnalyzeStage(self.pool, deleter.delete) def start(self): self._prepare.start() @@ -327,6 +352,10 @@ class Deleter(TaskObject): def delete(self, rid): logger.debug("delete request for RID %d", rid) + for pipeline in self._pipelines.values(): + if rid in pipeline.pool.runs: + pipeline.pool.runs[rid].status = RunStatus.deleting + break self._queue.put_nowait(rid) @asyncio.coroutine diff --git a/artiq/test/scheduler.py b/artiq/test/scheduler.py index 580214725..a3783863e 100644 --- a/artiq/test/scheduler.py +++ b/artiq/test/scheduler.py @@ -50,7 +50,7 @@ def _get_basic_steps(rid, expid, priority=0, flush=False): "path": [rid]}, {"action": "setitem", "key": "status", "value": "analyzing", "path": [rid]}, - {"action": "setitem", "key": "status", "value": "analyze_done", + {"action": "setitem", "key": "status", "value": "deleting", "path": [rid]}, {"action": "delitem", "key": rid, "path": []} ] diff --git a/artiq/tools.py b/artiq/tools.py index de98adecd..844cbd291 100644 --- a/artiq/tools.py +++ b/artiq/tools.py @@ -5,6 +5,7 @@ import logging import sys import asyncio import time +import collections import os.path from artiq.language.environment import is_experiment @@ -125,14 +126,6 @@ def asyncio_wait_or_cancel(fs, **kwargs): return fs -def asyncio_queue_peek(q): - """Like q.get_nowait(), but does not remove the item from the queue.""" - if q._queue: - return q._queue[0] - else: - raise asyncio.QueueEmpty - - class TaskObject: def start(self): self.task = asyncio.async(self._do()) @@ -151,25 +144,25 @@ class TaskObject: raise NotImplementedError -class WaitSet: - def __init__(self): - self._s = set() - self._ev = asyncio.Event() - - def _update_ev(self): - if self._s: - self._ev.clear() +class Condition: + def __init__(self, *, loop=None): + if loop is not None: + self._loop = loop else: - self._ev.set() - - def add(self, e): - self._s.add(e) - self._update_ev() - - def discard(self, e): - self._s.discard(e) - self._update_ev() + self._loop = asyncio.get_event_loop() + self._waiters = collections.deque() @asyncio.coroutine - def wait_empty(self): - yield from self._ev.wait() + def wait(self): + """Wait until notified.""" + fut = asyncio.Future(loop=self._loop) + self._waiters.append(fut) + try: + yield from fut + finally: + self._waiters.remove(fut) + + def notify(self): + for fut in self._waiters: + if not fut.done(): + fut.set_result(False)