scheduler: fix cancellations

This commit is contained in:
Sebastien Bourdeauducq 2015-01-05 19:41:40 +08:00
parent f695715d56
commit f9d4056ee9
1 changed files with 18 additions and 15 deletions

View File

@ -10,7 +10,7 @@ class Scheduler:
self.worker = Worker(worker_handlers) self.worker = Worker(worker_handlers)
self.next_rid = 0 self.next_rid = 0
self.queue = Notifier([]) self.queue = Notifier([])
self.queue_count = asyncio.Semaphore(0) self.queue_modified = asyncio.Event()
self.periodic = Notifier(dict()) self.periodic = Notifier(dict())
self.periodic_modified = asyncio.Event() self.periodic_modified = asyncio.Event()
@ -39,12 +39,16 @@ class Scheduler:
def run_once(self, run_params, timeout): def run_once(self, run_params, timeout):
rid = self.new_rid() rid = self.new_rid()
self.queue.append((rid, run_params, timeout)) self.queue.append((rid, run_params, timeout))
self.queue_count.release() self.queue_modified.set()
return rid return rid
def cancel_once(self, rid): def cancel_once(self, rid):
idx = next(idx for idx, (qrid, _, _) in enumerate(self.queue) idx = next(idx for idx, (qrid, _, _)
in enumerate(self.queue.backing_struct)
if qrid == rid) if qrid == rid)
if idx == 0:
# Cannot cancel when already running
raise NotImplementedError
del self.queue[idx] del self.queue[idx]
def run_periodic(self, run_params, timeout, period): def run_periodic(self, run_params, timeout, period):
@ -97,18 +101,17 @@ class Scheduler:
def _schedule(self): def _schedule(self):
while True: while True:
next_periodic = yield from self._run_periodic() next_periodic = yield from self._run_periodic()
ev_queue = asyncio.Task(self.queue_count.acquire()) if self.queue.backing_struct:
ev_periodic = asyncio.Task(self.periodic_modified.wait())
done, pend = yield from asyncio.wait(
[ev_queue, ev_periodic],
timeout=next_periodic,
return_when=asyncio.FIRST_COMPLETED)
self.periodic_modified.clear()
for t in pend:
t.cancel()
yield from self._run_periodic()
if ev_queue in done:
rid, run_params, timeout = self.queue.backing_struct[0] rid, run_params, timeout = self.queue.backing_struct[0]
yield from self._run(rid, run_params, timeout) yield from self._run(rid, run_params, timeout)
del self.queue[0] del self.queue[0]
else:
self.queue_modified.clear()
self.periodic_modified.clear()
done, pend = yield from asyncio.wait(
[
self.queue_modified.wait(),
self.periodic_modified.wait()
],
timeout=next_periodic,
return_when=asyncio.FIRST_COMPLETED)