From 347410afa287daefd32b88259309a172b5d69aeb Mon Sep 17 00:00:00 2001 From: Sebastien Bourdeauducq Date: Wed, 10 Dec 2014 13:04:18 +0800 Subject: [PATCH] master/client: queue display and cancellations --- artiq/management/scheduler.py | 44 +++++++++++++++++++-- frontend/artiq_client.py | 72 +++++++++++++++++++++++++++++------ setup.py | 3 +- 3 files changed, 102 insertions(+), 17 deletions(-) diff --git a/artiq/management/scheduler.py b/artiq/management/scheduler.py index c415ecc0e..d97a42efd 100644 --- a/artiq/management/scheduler.py +++ b/artiq/management/scheduler.py @@ -1,4 +1,5 @@ import asyncio +from time import time from artiq.management.worker import Worker @@ -6,9 +7,16 @@ from artiq.management.worker import Worker class Scheduler: def __init__(self, *args, **kwargs): self.worker = Worker(*args, **kwargs) + self.currently_executing = None + self.next_rid = 0 self.queued = [] self.queue_count = asyncio.Semaphore(0) + def new_rid(self): + r = self.next_rid + self.next_rid += 1 + return r + @asyncio.coroutine def start(self): self.task = asyncio.Task(self._schedule()) @@ -22,13 +30,41 @@ class Scheduler: yield from self.worker.end_process() def run_once(self, run_params, timeout): - self.queued.append((run_params, timeout)) + rid = self.new_rid() + self.queued.append((rid, run_params, timeout)) self.queue_count.release() + return rid + + def cancel_once(self, rid): + idx = next(idx for idx, (qrid, _, _) in enumerate(self.queued) + if qrid == rid) + del self.queued[idx] + + def get_schedule(self): + if self.currently_executing is None: + ce = None + else: + rid, run_params, timeout, t = self.currently_executing + ce = rid, run_params, timeout, time() - t + return ce, self.queued + + def run_periodic(self, run_params, timeout, period): + raise NotImplementedError + + def cancel_periodic(self, prid): + raise NotImplementedError + + @asyncio.coroutine + def _run(self, rid, run_params, timeout): + self.currently_executing = rid, run_params, timeout, time() + result = yield from self.worker.run(run_params, timeout) + self.currently_executing = None + return result @asyncio.coroutine def _schedule(self): while True: yield from self.queue_count.acquire() - run_params, timeout = self.queued.pop(0) - result = yield from self.worker.run(run_params, timeout) - print(result) + rid, run_params, timeout = self.queued.pop(0) + result = yield from self._run(rid, run_params, timeout) + print(rid, result) diff --git a/frontend/artiq_client.py b/frontend/artiq_client.py index a75f1f260..6d81c2ca6 100755 --- a/frontend/artiq_client.py +++ b/frontend/artiq_client.py @@ -2,6 +2,8 @@ import argparse +from prettytable import PrettyTable + from artiq.management.pc_rpc import Client @@ -15,8 +17,9 @@ def _get_args(): help="TCP port to use to connect to the master") subparsers = parser.add_subparsers(dest="action") + subparsers.required = True - parser_add = subparsers.add_parser("add", help="add an experiment") + parser_add = subparsers.add_parser("submit", help="submit an experiment") parser_add.add_argument( "-p", "--periodic", default=None, type=float, help="run the experiment periodically every given number of seconds") @@ -29,23 +32,70 @@ def _get_args(): help="unit to run") parser_add.add_argument("file", help="file containing the unit to run") + + parser_cancel = subparsers.add_parser("cancel", + help="cancel an experiment") + parser_cancel.add_argument("-p", "--periodic", default=False, + action="store_true", + help="cancel a periodic experiment") + parser_cancel.add_argument("rid", type=int, + help="run identifier (RID/PRID)") + + parser_show = subparsers.add_parser("show", + help="show the experiment schedule") + return parser.parse_args() +def _action_submit(remote, args): + run_params = { + "file": args.file, + "unit": args.unit, + "function": args.function + } + if args.periodic is None: + rid = remote.run_once(run_params, args.timeout) + print("RID: {}".format(rid)) + else: + prid = remote.run_periodic(run_params, args.timeout, + args.periodic) + print("PRID: {}".format(prid)) + + +def _action_cancel(remote, args): + if args.periodic: + remote.cancel_periodic(args.rid) + else: + remote.cancel_once(args.rid) + + +def _action_show(remote, args): + ce, queue = remote.get_schedule() + if ce is None and not queue: + print("Queue is empty") + else: + table = PrettyTable(["RID", "File", "Unit", "Function", "Timeout"]) + if ce is not None: + rid, run_params, timeout, t = ce + print("Currently executing RID {} for {:.1f}s".format(rid, t)) + row = [rid, run_params["file"]] + for x in run_params["unit"], run_params["function"], timeout: + row.append("-" if x is None else x) + table.add_row(row) + for rid, run_params, timeout in queue: + row = [rid, run_params["file"]] + for x in run_params["unit"], run_params["function"], timeout: + row.append("-" if x is None else x) + table.add_row(row) + print("Run queue:") + print(table) + + def main(): args = _get_args() remote = Client(args.server, args.port, "master") try: - if args.action == "add": - if args.periodic is None: - remote.run_once( - { - "file": args.file, - "unit": args.unit, - "function": args.function - }, args.timeout) - else: - raise NotImplementedError + globals()["_action_" + args.action](remote, args) finally: remote.close_rpc() diff --git a/setup.py b/setup.py index 94d9f4cf4..16a252225 100755 --- a/setup.py +++ b/setup.py @@ -1,5 +1,4 @@ #!/usr/bin/env python3 -# -*- coding: utf8 -*- from setuptools import setup, find_packages from glob import glob @@ -16,7 +15,7 @@ setup( long_description = open("README.rst").read(), license = "BSD", install_requires = [ - "sphinx", "numpy", "scipy" + "sphinx", "numpy", "scipy", "prettytable" ], extras_require = {}, dependency_links = [],