From f9d4056ee93eb50a5f85387f540cf50ba4e1d2ba Mon Sep 17 00:00:00 2001 From: Sebastien Bourdeauducq Date: Mon, 5 Jan 2015 19:41:40 +0800 Subject: [PATCH] scheduler: fix cancellations --- artiq/management/scheduler.py | 33 ++++++++++++++++++--------------- 1 file changed, 18 insertions(+), 15 deletions(-) diff --git a/artiq/management/scheduler.py b/artiq/management/scheduler.py index 46b99a1e7..9d9448783 100644 --- a/artiq/management/scheduler.py +++ b/artiq/management/scheduler.py @@ -10,7 +10,7 @@ class Scheduler: self.worker = Worker(worker_handlers) self.next_rid = 0 self.queue = Notifier([]) - self.queue_count = asyncio.Semaphore(0) + self.queue_modified = asyncio.Event() self.periodic = Notifier(dict()) self.periodic_modified = asyncio.Event() @@ -39,12 +39,16 @@ class Scheduler: def run_once(self, run_params, timeout): rid = self.new_rid() self.queue.append((rid, run_params, timeout)) - self.queue_count.release() + self.queue_modified.set() return rid def cancel_once(self, rid): - idx = next(idx for idx, (qrid, _, _) in enumerate(self.queue) + idx = next(idx for idx, (qrid, _, _) + in enumerate(self.queue.backing_struct) if qrid == rid) + if idx == 0: + # Cannot cancel when already running + raise NotImplementedError del self.queue[idx] def run_periodic(self, run_params, timeout, period): @@ -97,18 +101,17 @@ class Scheduler: def _schedule(self): while True: next_periodic = yield from self._run_periodic() - ev_queue = asyncio.Task(self.queue_count.acquire()) - ev_periodic = asyncio.Task(self.periodic_modified.wait()) - done, pend = yield from asyncio.wait( - [ev_queue, ev_periodic], - timeout=next_periodic, - return_when=asyncio.FIRST_COMPLETED) - self.periodic_modified.clear() - for t in pend: - t.cancel() - - yield from self._run_periodic() - if ev_queue in done: + if self.queue.backing_struct: rid, run_params, timeout = self.queue.backing_struct[0] yield from self._run(rid, run_params, timeout) del self.queue[0] + else: + self.queue_modified.clear() + self.periodic_modified.clear() + done, pend = yield from asyncio.wait( + [ + self.queue_modified.wait(), + self.periodic_modified.wait() + ], + timeout=next_periodic, + return_when=asyncio.FIRST_COMPLETED)