forked from M-Labs/artiq
scheduler: cancel flush when run is cancelled
This commit is contained in:
parent
e752e57fa5
commit
b0f8141018
@ -26,15 +26,15 @@ class RunStatus(Enum):
|
|||||||
def _mk_worker_method(name):
|
def _mk_worker_method(name):
|
||||||
@asyncio.coroutine
|
@asyncio.coroutine
|
||||||
def worker_method(self, *args, **kwargs):
|
def worker_method(self, *args, **kwargs):
|
||||||
if self._worker.closed.is_set():
|
if self.worker.closed.is_set():
|
||||||
return True
|
return True
|
||||||
m = getattr(self._worker, name)
|
m = getattr(self.worker, name)
|
||||||
try:
|
try:
|
||||||
return (yield from m(*args, **kwargs))
|
return (yield from m(*args, **kwargs))
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
if isinstance(e, asyncio.CancelledError):
|
if isinstance(e, asyncio.CancelledError):
|
||||||
raise
|
raise
|
||||||
if self._worker.closed.is_set():
|
if self.worker.closed.is_set():
|
||||||
logger.debug("suppressing worker exception of terminated run",
|
logger.debug("suppressing worker exception of terminated run",
|
||||||
exc_info=True)
|
exc_info=True)
|
||||||
# Return completion on termination
|
# Return completion on termination
|
||||||
@ -56,8 +56,9 @@ class Run:
|
|||||||
self.due_date = due_date
|
self.due_date = due_date
|
||||||
self.flush = flush
|
self.flush = flush
|
||||||
|
|
||||||
|
self.worker = Worker(worker_handlers)
|
||||||
|
|
||||||
self._status = RunStatus.pending
|
self._status = RunStatus.pending
|
||||||
self._worker = Worker(worker_handlers)
|
|
||||||
|
|
||||||
self._notifier = notifier
|
self._notifier = notifier
|
||||||
self._notifier[self.rid] = {
|
self._notifier[self.rid] = {
|
||||||
@ -76,7 +77,7 @@ class Run:
|
|||||||
@status.setter
|
@status.setter
|
||||||
def status(self, value):
|
def status(self, value):
|
||||||
self._status = value
|
self._status = value
|
||||||
if not self._worker.closed.is_set():
|
if not self.worker.closed.is_set():
|
||||||
self._notifier[self.rid]["status"] = self._status.name
|
self._notifier[self.rid]["status"] = self._status.name
|
||||||
|
|
||||||
# The run with the largest priority_key is to be scheduled first
|
# The run with the largest priority_key is to be scheduled first
|
||||||
@ -92,7 +93,7 @@ class Run:
|
|||||||
@asyncio.coroutine
|
@asyncio.coroutine
|
||||||
def close(self):
|
def close(self):
|
||||||
# called through pool
|
# called through pool
|
||||||
yield from self._worker.close()
|
yield from self.worker.close()
|
||||||
del self._notifier[self.rid]
|
del self._notifier[self.rid]
|
||||||
|
|
||||||
_prepare = _mk_worker_method("prepare")
|
_prepare = _mk_worker_method("prepare")
|
||||||
@ -190,7 +191,12 @@ class PrepareStage(TaskObject):
|
|||||||
if run.due_date is None or run.due_date < now:
|
if run.due_date is None or run.due_date < now:
|
||||||
if run.flush:
|
if run.flush:
|
||||||
run.status = RunStatus.flushing
|
run.status = RunStatus.flushing
|
||||||
yield from self.flush_tracker.wait_empty()
|
yield from asyncio_wait_or_cancel(
|
||||||
|
[self.flush_tracker.wait_empty(),
|
||||||
|
run.worker.closed.wait()],
|
||||||
|
return_when=asyncio.FIRST_COMPLETED)
|
||||||
|
if run.worker.closed.is_set():
|
||||||
|
continue
|
||||||
run.status = RunStatus.preparing
|
run.status = RunStatus.preparing
|
||||||
self.flush_tracker.add(run.rid)
|
self.flush_tracker.add(run.rid)
|
||||||
try:
|
try:
|
||||||
|
Loading…
Reference in New Issue
Block a user