forked from M-Labs/artiq
1
0
Fork 0

master/client: queue display and cancellations

This commit is contained in:
Sebastien Bourdeauducq 2014-12-10 13:04:18 +08:00
parent 0dc4eb02ae
commit 347410afa2
3 changed files with 102 additions and 17 deletions

View File

@ -1,4 +1,5 @@
import asyncio import asyncio
from time import time
from artiq.management.worker import Worker from artiq.management.worker import Worker
@ -6,9 +7,16 @@ from artiq.management.worker import Worker
class Scheduler: class Scheduler:
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
self.worker = Worker(*args, **kwargs) self.worker = Worker(*args, **kwargs)
self.currently_executing = None
self.next_rid = 0
self.queued = [] self.queued = []
self.queue_count = asyncio.Semaphore(0) self.queue_count = asyncio.Semaphore(0)
def new_rid(self):
r = self.next_rid
self.next_rid += 1
return r
@asyncio.coroutine @asyncio.coroutine
def start(self): def start(self):
self.task = asyncio.Task(self._schedule()) self.task = asyncio.Task(self._schedule())
@ -22,13 +30,41 @@ class Scheduler:
yield from self.worker.end_process() yield from self.worker.end_process()
def run_once(self, run_params, timeout): def run_once(self, run_params, timeout):
self.queued.append((run_params, timeout)) rid = self.new_rid()
self.queued.append((rid, run_params, timeout))
self.queue_count.release() self.queue_count.release()
return rid
def cancel_once(self, rid):
idx = next(idx for idx, (qrid, _, _) in enumerate(self.queued)
if qrid == rid)
del self.queued[idx]
def get_schedule(self):
if self.currently_executing is None:
ce = None
else:
rid, run_params, timeout, t = self.currently_executing
ce = rid, run_params, timeout, time() - t
return ce, self.queued
def run_periodic(self, run_params, timeout, period):
raise NotImplementedError
def cancel_periodic(self, prid):
raise NotImplementedError
@asyncio.coroutine
def _run(self, rid, run_params, timeout):
self.currently_executing = rid, run_params, timeout, time()
result = yield from self.worker.run(run_params, timeout)
self.currently_executing = None
return result
@asyncio.coroutine @asyncio.coroutine
def _schedule(self): def _schedule(self):
while True: while True:
yield from self.queue_count.acquire() yield from self.queue_count.acquire()
run_params, timeout = self.queued.pop(0) rid, run_params, timeout = self.queued.pop(0)
result = yield from self.worker.run(run_params, timeout) result = yield from self._run(rid, run_params, timeout)
print(result) print(rid, result)

View File

@ -2,6 +2,8 @@
import argparse import argparse
from prettytable import PrettyTable
from artiq.management.pc_rpc import Client from artiq.management.pc_rpc import Client
@ -15,8 +17,9 @@ def _get_args():
help="TCP port to use to connect to the master") help="TCP port to use to connect to the master")
subparsers = parser.add_subparsers(dest="action") subparsers = parser.add_subparsers(dest="action")
subparsers.required = True
parser_add = subparsers.add_parser("add", help="add an experiment") parser_add = subparsers.add_parser("submit", help="submit an experiment")
parser_add.add_argument( parser_add.add_argument(
"-p", "--periodic", default=None, type=float, "-p", "--periodic", default=None, type=float,
help="run the experiment periodically every given number of seconds") help="run the experiment periodically every given number of seconds")
@ -29,23 +32,70 @@ def _get_args():
help="unit to run") help="unit to run")
parser_add.add_argument("file", help="file containing the unit to run") parser_add.add_argument("file", help="file containing the unit to run")
parser_cancel = subparsers.add_parser("cancel",
help="cancel an experiment")
parser_cancel.add_argument("-p", "--periodic", default=False,
action="store_true",
help="cancel a periodic experiment")
parser_cancel.add_argument("rid", type=int,
help="run identifier (RID/PRID)")
parser_show = subparsers.add_parser("show",
help="show the experiment schedule")
return parser.parse_args() return parser.parse_args()
def _action_submit(remote, args):
run_params = {
"file": args.file,
"unit": args.unit,
"function": args.function
}
if args.periodic is None:
rid = remote.run_once(run_params, args.timeout)
print("RID: {}".format(rid))
else:
prid = remote.run_periodic(run_params, args.timeout,
args.periodic)
print("PRID: {}".format(prid))
def _action_cancel(remote, args):
if args.periodic:
remote.cancel_periodic(args.rid)
else:
remote.cancel_once(args.rid)
def _action_show(remote, args):
ce, queue = remote.get_schedule()
if ce is None and not queue:
print("Queue is empty")
else:
table = PrettyTable(["RID", "File", "Unit", "Function", "Timeout"])
if ce is not None:
rid, run_params, timeout, t = ce
print("Currently executing RID {} for {:.1f}s".format(rid, t))
row = [rid, run_params["file"]]
for x in run_params["unit"], run_params["function"], timeout:
row.append("-" if x is None else x)
table.add_row(row)
for rid, run_params, timeout in queue:
row = [rid, run_params["file"]]
for x in run_params["unit"], run_params["function"], timeout:
row.append("-" if x is None else x)
table.add_row(row)
print("Run queue:")
print(table)
def main(): def main():
args = _get_args() args = _get_args()
remote = Client(args.server, args.port, "master") remote = Client(args.server, args.port, "master")
try: try:
if args.action == "add": globals()["_action_" + args.action](remote, args)
if args.periodic is None:
remote.run_once(
{
"file": args.file,
"unit": args.unit,
"function": args.function
}, args.timeout)
else:
raise NotImplementedError
finally: finally:
remote.close_rpc() remote.close_rpc()

View File

@ -1,5 +1,4 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
# -*- coding: utf8 -*-
from setuptools import setup, find_packages from setuptools import setup, find_packages
from glob import glob from glob import glob
@ -16,7 +15,7 @@ setup(
long_description = open("README.rst").read(), long_description = open("README.rst").read(),
license = "BSD", license = "BSD",
install_requires = [ install_requires = [
"sphinx", "numpy", "scipy" "sphinx", "numpy", "scipy", "prettytable"
], ],
extras_require = {}, extras_require = {},
dependency_links = [], dependency_links = [],