2014-10-05 16:25:31 +08:00
|
|
|
import asyncio
|
2014-12-10 13:04:18 +08:00
|
|
|
from time import time
|
2014-10-05 16:25:31 +08:00
|
|
|
|
2014-12-28 18:56:26 +08:00
|
|
|
from artiq.management.sync_struct import Notifier
|
2014-10-05 16:25:31 +08:00
|
|
|
from artiq.management.worker import Worker
|
|
|
|
|
|
|
|
|
|
|
|
class Scheduler:
|
2014-12-31 17:41:22 +08:00
|
|
|
def __init__(self, worker_handlers):
|
|
|
|
self.worker = Worker(worker_handlers)
|
2014-12-10 13:04:18 +08:00
|
|
|
self.next_rid = 0
|
2014-12-29 18:44:50 +08:00
|
|
|
self.queue = Notifier([])
|
2014-12-09 16:26:50 +08:00
|
|
|
self.queue_count = asyncio.Semaphore(0)
|
2014-12-29 18:44:50 +08:00
|
|
|
self.periodic = Notifier(dict())
|
2014-12-10 19:11:13 +08:00
|
|
|
self.periodic_modified = asyncio.Event()
|
2014-10-05 16:25:31 +08:00
|
|
|
|
2014-12-10 13:04:18 +08:00
|
|
|
def new_rid(self):
|
|
|
|
r = self.next_rid
|
|
|
|
self.next_rid += 1
|
|
|
|
return r
|
|
|
|
|
2014-12-10 19:11:13 +08:00
|
|
|
def new_prid(self):
|
2014-12-29 18:44:50 +08:00
|
|
|
prids = set(range(len(self.periodic.backing_struct) + 1))
|
|
|
|
prids -= set(self.periodic.backing_struct.keys())
|
2014-12-10 19:11:13 +08:00
|
|
|
return next(iter(prids))
|
|
|
|
|
2014-10-23 18:48:03 +08:00
|
|
|
@asyncio.coroutine
|
|
|
|
def start(self):
|
|
|
|
self.task = asyncio.Task(self._schedule())
|
|
|
|
yield from self.worker.create_process()
|
2014-10-05 16:25:31 +08:00
|
|
|
|
2014-10-23 18:48:03 +08:00
|
|
|
@asyncio.coroutine
|
|
|
|
def stop(self):
|
|
|
|
self.task.cancel()
|
|
|
|
yield from asyncio.wait([self.task])
|
|
|
|
del self.task
|
|
|
|
yield from self.worker.end_process()
|
2014-10-05 16:25:31 +08:00
|
|
|
|
2014-10-23 18:48:03 +08:00
|
|
|
def run_once(self, run_params, timeout):
|
2014-12-10 13:04:18 +08:00
|
|
|
rid = self.new_rid()
|
2014-12-29 18:44:50 +08:00
|
|
|
self.queue.append((rid, run_params, timeout))
|
2014-12-09 16:26:50 +08:00
|
|
|
self.queue_count.release()
|
2014-12-10 13:04:18 +08:00
|
|
|
return rid
|
|
|
|
|
|
|
|
def cancel_once(self, rid):
|
2014-12-29 18:44:50 +08:00
|
|
|
idx = next(idx for idx, (qrid, _, _) in enumerate(self.queue)
|
2014-12-10 13:04:18 +08:00
|
|
|
if qrid == rid)
|
2014-12-29 18:44:50 +08:00
|
|
|
del self.queue[idx]
|
2014-12-10 13:04:18 +08:00
|
|
|
|
|
|
|
def run_periodic(self, run_params, timeout, period):
|
2014-12-10 19:11:13 +08:00
|
|
|
prid = self.new_prid()
|
|
|
|
self.periodic[prid] = 0, run_params, timeout, period
|
|
|
|
self.periodic_modified.set()
|
|
|
|
return prid
|
2014-12-10 13:04:18 +08:00
|
|
|
|
|
|
|
def cancel_periodic(self, prid):
|
2014-12-10 19:11:13 +08:00
|
|
|
del self.periodic[prid]
|
2014-12-10 13:04:18 +08:00
|
|
|
|
2014-12-31 17:41:22 +08:00
|
|
|
@asyncio.coroutine
|
|
|
|
def _run(self, rid, run_params, timeout):
|
|
|
|
try:
|
|
|
|
yield from self.worker.run(run_params, timeout)
|
|
|
|
except Exception as e:
|
|
|
|
print("RID {} failed:".format(rid))
|
|
|
|
print(e)
|
|
|
|
else:
|
|
|
|
print("RID {} completed successfully".format(rid))
|
|
|
|
|
2014-10-23 18:48:03 +08:00
|
|
|
@asyncio.coroutine
|
2014-12-10 19:11:13 +08:00
|
|
|
def _run_periodic(self):
|
2014-10-05 16:25:31 +08:00
|
|
|
while True:
|
2014-12-10 19:11:13 +08:00
|
|
|
min_next_run = None
|
|
|
|
min_prid = None
|
2014-12-29 18:44:50 +08:00
|
|
|
for prid, params in self.periodic.backing_struct.items():
|
2014-12-10 19:11:13 +08:00
|
|
|
if min_next_run is None or params[0] < min_next_run:
|
|
|
|
min_next_run = params[0]
|
|
|
|
min_prid = prid
|
|
|
|
|
|
|
|
now = time()
|
|
|
|
|
|
|
|
if min_next_run is None:
|
|
|
|
return None
|
|
|
|
min_next_run -= now
|
|
|
|
if min_next_run > 0:
|
|
|
|
return min_next_run
|
|
|
|
|
2014-12-29 18:44:50 +08:00
|
|
|
next_run, run_params, timeout, period = \
|
|
|
|
self.periodic.backing_struct[min_prid]
|
2014-12-10 19:11:13 +08:00
|
|
|
self.periodic[min_prid] = now + period, run_params, timeout, period
|
|
|
|
|
|
|
|
rid = self.new_rid()
|
2014-12-29 18:44:50 +08:00
|
|
|
self.queue.insert(0, (rid, run_params, timeout))
|
2014-12-31 17:41:22 +08:00
|
|
|
yield from self._run(rid, run_params, timeout)
|
2014-12-29 18:44:50 +08:00
|
|
|
del self.queue[0]
|
2014-12-10 19:11:13 +08:00
|
|
|
|
|
|
|
@asyncio.coroutine
|
|
|
|
def _schedule(self):
|
|
|
|
while True:
|
2014-12-11 15:57:41 +08:00
|
|
|
next_periodic = yield from self._run_periodic()
|
2014-12-10 19:11:13 +08:00
|
|
|
ev_queue = asyncio.Task(self.queue_count.acquire())
|
|
|
|
ev_periodic = asyncio.Task(self.periodic_modified.wait())
|
|
|
|
done, pend = yield from asyncio.wait(
|
|
|
|
[ev_queue, ev_periodic],
|
|
|
|
timeout=next_periodic,
|
|
|
|
return_when=asyncio.FIRST_COMPLETED)
|
2014-12-29 13:32:11 +08:00
|
|
|
self.periodic_modified.clear()
|
2014-12-10 19:11:13 +08:00
|
|
|
for t in pend:
|
|
|
|
t.cancel()
|
|
|
|
|
2014-12-11 15:57:41 +08:00
|
|
|
yield from self._run_periodic()
|
2014-12-10 19:11:13 +08:00
|
|
|
if ev_queue in done:
|
2014-12-29 18:44:50 +08:00
|
|
|
rid, run_params, timeout = self.queue.backing_struct[0]
|
2014-12-31 17:41:22 +08:00
|
|
|
yield from self._run(rid, run_params, timeout)
|
2014-12-29 18:44:50 +08:00
|
|
|
del self.queue[0]
|