master/client: periodic schedule support

This commit is contained in:
Sebastien Bourdeauducq 2014-12-10 19:11:13 +08:00
parent 347410afa2
commit c3953d85d5
2 changed files with 74 additions and 12 deletions

View File

@ -7,16 +7,23 @@ from artiq.management.worker import Worker
class Scheduler:
def __init__(self, *args, **kwargs):
self.worker = Worker(*args, **kwargs)
self.currently_executing = None
self.next_rid = 0
self.currently_executing = None
self.queued = []
self.queue_count = asyncio.Semaphore(0)
self.periodic = dict()
self.periodic_modified = asyncio.Event()
def new_rid(self):
r = self.next_rid
self.next_rid += 1
return r
def new_prid(self):
prids = set(range(len(self.periodic) + 1))
prids -= set(self.periodic.keys())
return next(iter(prids))
@asyncio.coroutine
def start(self):
self.task = asyncio.Task(self._schedule())
@ -46,13 +53,16 @@ class Scheduler:
else:
rid, run_params, timeout, t = self.currently_executing
ce = rid, run_params, timeout, time() - t
return ce, self.queued
return ce, self.queued, self.periodic
def run_periodic(self, run_params, timeout, period):
raise NotImplementedError
prid = self.new_prid()
self.periodic[prid] = 0, run_params, timeout, period
self.periodic_modified.set()
return prid
def cancel_periodic(self, prid):
raise NotImplementedError
del self.periodic[prid]
@asyncio.coroutine
def _run(self, rid, run_params, timeout):
@ -62,9 +72,45 @@ class Scheduler:
return result
@asyncio.coroutine
def _schedule(self):
def _run_periodic(self):
while True:
yield from self.queue_count.acquire()
rid, run_params, timeout = self.queued.pop(0)
min_next_run = None
min_prid = None
for prid, params in self.periodic.items():
if min_next_run is None or params[0] < min_next_run:
min_next_run = params[0]
min_prid = prid
now = time()
if min_next_run is None:
return None
min_next_run -= now
if min_next_run > 0:
return min_next_run
next_run, run_params, timeout, period = self.periodic[min_prid]
self.periodic[min_prid] = now + period, run_params, timeout, period
rid = self.new_rid()
result = yield from self._run(rid, run_params, timeout)
print(rid, result)
print(prid, rid, result)
@asyncio.coroutine
def _schedule(self):
next_periodic = yield from self._run_periodic()
while True:
ev_queue = asyncio.Task(self.queue_count.acquire())
ev_periodic = asyncio.Task(self.periodic_modified.wait())
done, pend = yield from asyncio.wait(
[ev_queue, ev_periodic],
timeout=next_periodic,
return_when=asyncio.FIRST_COMPLETED)
for t in pend:
t.cancel()
next_periodic = yield from self._run_periodic()
if ev_queue in done:
rid, run_params, timeout = self.queued.pop(0)
result = yield from self._run(rid, run_params, timeout)
print(rid, result)

View File

@ -1,6 +1,7 @@
#!/usr/bin/env python3
import argparse
import time
from prettytable import PrettyTable
@ -70,10 +71,8 @@ def _action_cancel(remote, args):
def _action_show(remote, args):
ce, queue = remote.get_schedule()
if ce is None and not queue:
print("Queue is empty")
else:
ce, queue, periodic = remote.get_schedule()
if ce is not None or queue:
table = PrettyTable(["RID", "File", "Unit", "Function", "Timeout"])
if ce is not None:
rid, run_params, timeout, t = ce
@ -89,6 +88,23 @@ def _action_show(remote, args):
table.add_row(row)
print("Run queue:")
print(table)
else:
print("Queue is empty")
if periodic:
table = PrettyTable(["Next run", "PRID", "File", "Unit", "Function",
"Timeout", "Period"])
print("Periodic schedule:")
sp = sorted(periodic.items(), key=lambda x: x[1][0])
for prid, (next_run, run_params, timeout, period) in sp:
row = [time.strftime("%m/%d %H:%M:%S", time.localtime(next_run)),
prid, run_params["file"]]
for x in run_params["unit"], run_params["function"], timeout:
row.append("-" if x is None else x)
row.append(period)
table.add_row(row)
print(table)
else:
print("No periodic schedule")
def main():