From c3953d85d5c81a7847a77b096254fb9e1a99f8c6 Mon Sep 17 00:00:00 2001 From: Sebastien Bourdeauducq Date: Wed, 10 Dec 2014 19:11:13 +0800 Subject: [PATCH] master/client: periodic schedule support --- artiq/management/scheduler.py | 62 ++++++++++++++++++++++++++++++----- frontend/artiq_client.py | 24 +++++++++++--- 2 files changed, 74 insertions(+), 12 deletions(-) diff --git a/artiq/management/scheduler.py b/artiq/management/scheduler.py index d97a42efd..81ec8fb12 100644 --- a/artiq/management/scheduler.py +++ b/artiq/management/scheduler.py @@ -7,16 +7,23 @@ from artiq.management.worker import Worker class Scheduler: def __init__(self, *args, **kwargs): self.worker = Worker(*args, **kwargs) - self.currently_executing = None self.next_rid = 0 + self.currently_executing = None self.queued = [] self.queue_count = asyncio.Semaphore(0) + self.periodic = dict() + self.periodic_modified = asyncio.Event() def new_rid(self): r = self.next_rid self.next_rid += 1 return r + def new_prid(self): + prids = set(range(len(self.periodic) + 1)) + prids -= set(self.periodic.keys()) + return next(iter(prids)) + @asyncio.coroutine def start(self): self.task = asyncio.Task(self._schedule()) @@ -46,13 +53,16 @@ class Scheduler: else: rid, run_params, timeout, t = self.currently_executing ce = rid, run_params, timeout, time() - t - return ce, self.queued + return ce, self.queued, self.periodic def run_periodic(self, run_params, timeout, period): - raise NotImplementedError + prid = self.new_prid() + self.periodic[prid] = 0, run_params, timeout, period + self.periodic_modified.set() + return prid def cancel_periodic(self, prid): - raise NotImplementedError + del self.periodic[prid] @asyncio.coroutine def _run(self, rid, run_params, timeout): @@ -62,9 +72,45 @@ class Scheduler: return result @asyncio.coroutine - def _schedule(self): + def _run_periodic(self): while True: - yield from self.queue_count.acquire() - rid, run_params, timeout = self.queued.pop(0) + min_next_run = None + min_prid = None + for prid, params in self.periodic.items(): + if min_next_run is None or params[0] < min_next_run: + min_next_run = params[0] + min_prid = prid + + now = time() + + if min_next_run is None: + return None + min_next_run -= now + if min_next_run > 0: + return min_next_run + + next_run, run_params, timeout, period = self.periodic[min_prid] + self.periodic[min_prid] = now + period, run_params, timeout, period + + rid = self.new_rid() result = yield from self._run(rid, run_params, timeout) - print(rid, result) + print(prid, rid, result) + + @asyncio.coroutine + def _schedule(self): + next_periodic = yield from self._run_periodic() + while True: + ev_queue = asyncio.Task(self.queue_count.acquire()) + ev_periodic = asyncio.Task(self.periodic_modified.wait()) + done, pend = yield from asyncio.wait( + [ev_queue, ev_periodic], + timeout=next_periodic, + return_when=asyncio.FIRST_COMPLETED) + for t in pend: + t.cancel() + + next_periodic = yield from self._run_periodic() + if ev_queue in done: + rid, run_params, timeout = self.queued.pop(0) + result = yield from self._run(rid, run_params, timeout) + print(rid, result) diff --git a/frontend/artiq_client.py b/frontend/artiq_client.py index 6d81c2ca6..176c96590 100755 --- a/frontend/artiq_client.py +++ b/frontend/artiq_client.py @@ -1,6 +1,7 @@ #!/usr/bin/env python3 import argparse +import time from prettytable import PrettyTable @@ -70,10 +71,8 @@ def _action_cancel(remote, args): def _action_show(remote, args): - ce, queue = remote.get_schedule() - if ce is None and not queue: - print("Queue is empty") - else: + ce, queue, periodic = remote.get_schedule() + if ce is not None or queue: table = PrettyTable(["RID", "File", "Unit", "Function", "Timeout"]) if ce is not None: rid, run_params, timeout, t = ce @@ -89,6 +88,23 @@ def _action_show(remote, args): table.add_row(row) print("Run queue:") print(table) + else: + print("Queue is empty") + if periodic: + table = PrettyTable(["Next run", "PRID", "File", "Unit", "Function", + "Timeout", "Period"]) + print("Periodic schedule:") + sp = sorted(periodic.items(), key=lambda x: x[1][0]) + for prid, (next_run, run_params, timeout, period) in sp: + row = [time.strftime("%m/%d %H:%M:%S", time.localtime(next_run)), + prid, run_params["file"]] + for x in run_params["unit"], run_params["function"], timeout: + row.append("-" if x is None else x) + row.append(period) + table.add_row(row) + print(table) + else: + print("No periodic schedule") def main():