From b0f8141018f10d7c98f580d7051eb760b240c796 Mon Sep 17 00:00:00 2001 From: Sebastien Bourdeauducq Date: Thu, 28 May 2015 17:48:33 +0800 Subject: [PATCH] scheduler: cancel flush when run is cancelled --- artiq/master/scheduler.py | 20 +++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/artiq/master/scheduler.py b/artiq/master/scheduler.py index 94b3a9622..30c69352b 100644 --- a/artiq/master/scheduler.py +++ b/artiq/master/scheduler.py @@ -26,15 +26,15 @@ class RunStatus(Enum): def _mk_worker_method(name): @asyncio.coroutine def worker_method(self, *args, **kwargs): - if self._worker.closed.is_set(): + if self.worker.closed.is_set(): return True - m = getattr(self._worker, name) + m = getattr(self.worker, name) try: return (yield from m(*args, **kwargs)) except Exception as e: if isinstance(e, asyncio.CancelledError): raise - if self._worker.closed.is_set(): + if self.worker.closed.is_set(): logger.debug("suppressing worker exception of terminated run", exc_info=True) # Return completion on termination @@ -56,8 +56,9 @@ class Run: self.due_date = due_date self.flush = flush + self.worker = Worker(worker_handlers) + self._status = RunStatus.pending - self._worker = Worker(worker_handlers) self._notifier = notifier self._notifier[self.rid] = { @@ -76,7 +77,7 @@ class Run: @status.setter def status(self, value): self._status = value - if not self._worker.closed.is_set(): + if not self.worker.closed.is_set(): self._notifier[self.rid]["status"] = self._status.name # The run with the largest priority_key is to be scheduled first @@ -92,7 +93,7 @@ class Run: @asyncio.coroutine def close(self): # called through pool - yield from self._worker.close() + yield from self.worker.close() del self._notifier[self.rid] _prepare = _mk_worker_method("prepare") @@ -190,7 +191,12 @@ class PrepareStage(TaskObject): if run.due_date is None or run.due_date < now: if run.flush: run.status = RunStatus.flushing - yield from self.flush_tracker.wait_empty() + yield from asyncio_wait_or_cancel( + [self.flush_tracker.wait_empty(), + run.worker.closed.wait()], + return_when=asyncio.FIRST_COMPLETED) + if run.worker.closed.is_set(): + continue run.status = RunStatus.preparing self.flush_tracker.add(run.rid) try: