From 47aa634ab56acacd950ab7c6ee1b754bc4dc532e Mon Sep 17 00:00:00 2001 From: Sebastien Bourdeauducq Date: Fri, 23 Jan 2015 00:23:00 +0800 Subject: [PATCH] replace periodic schedule with timed schedule --- artiq/frontend/artiq_client.py | 55 +++++++++++++++++--------------- artiq/frontend/artiq_master.py | 2 +- artiq/gui/scheduler.py | 58 +++++++++++++++------------------- artiq/master/scheduler.py | 53 ++++++++++++++++--------------- 4 files changed, 84 insertions(+), 84 deletions(-) diff --git a/artiq/frontend/artiq_client.py b/artiq/frontend/artiq_client.py index e7224e924..a77938ba0 100755 --- a/artiq/frontend/artiq_client.py +++ b/artiq/frontend/artiq_client.py @@ -5,6 +5,7 @@ import time import asyncio import sys from operator import itemgetter +from dateutil.parser import parse as parse_date from prettytable import PrettyTable @@ -32,8 +33,10 @@ def _get_args(): parser_add = subparsers.add_parser("submit", help="submit an experiment") parser_add.add_argument( - "-p", "--periodic", default=None, type=float, - help="run the experiment periodically every given number of seconds") + "-T", "--timed", default=None, type=str, + help="run the experiment in timed mode. " + "argument specifies the time of the first run, " + "use 'now' to run immediately") parser_add.add_argument( "-t", "--timeout", default=None, type=float, help="specify a timeout for the experiment to complete") @@ -45,11 +48,11 @@ def _get_args(): parser_cancel = subparsers.add_parser("cancel", help="cancel an experiment") - parser_cancel.add_argument("-p", "--periodic", default=False, + parser_cancel.add_argument("-T", "--timed", default=False, action="store_true", - help="cancel a periodic experiment") + help="cancel a timed experiment") parser_cancel.add_argument("rid", type=int, - help="run identifier (RID/PRID)") + help="run identifier (RID/TRID)") parser_set_device = subparsers.add_parser( "set-device", help="add or modify a device") @@ -75,7 +78,7 @@ def _get_args(): "show", help="show schedule, devices or parameters") parser_show.add_argument( "what", - help="select object to show: queue/periodic/devices/parameters") + help="select object to show: queue/timed/devices/parameters") return parser.parse_args() @@ -100,20 +103,23 @@ def _action_submit(remote, args): "unit": args.unit, "arguments": arguments } - if args.periodic is None: - rid = remote.run_once(run_params, args.timeout) + if args.timed is None: + rid = remote.run_queued(run_params, args.timeout) print("RID: {}".format(rid)) else: - prid = remote.run_periodic(run_params, args.timeout, - args.periodic) - print("PRID: {}".format(prid)) + if args.timed == "now": + next_time = None + else: + next_time = time.mktime(parse_date(args.timed).timetuple()) + trid = remote.run_timed(run_params, args.timeout, next_time) + print("TRID: {}".format(trid)) def _action_cancel(remote, args): - if args.periodic: - remote.cancel_periodic(args.rid) + if args.timed: + remote.cancel_timed(args.rid) else: - remote.cancel_once(args.rid) + remote.cancel_queued(args.rid) def _action_set_device(remote, args): @@ -147,23 +153,22 @@ def _show_queue(queue): print("Queue is empty") -def _show_periodic(periodic): +def _show_timed(timed): clear_screen() - if periodic: - table = PrettyTable(["Next run", "PRID", "File", "Unit", - "Timeout", "Period", "Arguments"]) - sp = sorted(periodic.items(), key=lambda x: (x[1][0], x[0])) - for prid, (next_run, run_params, timeout, period) in sp: + if timed: + table = PrettyTable(["Next run", "TRID", "File", "Unit", + "Timeout", "Arguments"]) + sp = sorted(timed.items(), key=lambda x: (x[1][0], x[0])) + for trid, (next_run, run_params, timeout) in sp: row = [time.strftime("%m/%d %H:%M:%S", time.localtime(next_run)), - prid, run_params["file"]] + trid, run_params["file"]] for x in run_params["unit"], timeout: row.append("-" if x is None else x) - row.append(period) row.append(format_run_arguments(run_params["arguments"])) table.add_row(row) print(table) else: - print("No periodic schedule") + print("No timed schedule") def _show_devices(devices): @@ -226,8 +231,8 @@ def main(): if action == "show": if args.what == "queue": _show_list(args, "queue", _show_queue) - elif args.what == "periodic": - _show_dict(args, "periodic", _show_periodic) + elif args.what == "timed": + _show_dict(args, "timed", _show_timed) elif args.what == "devices": _show_dict(args, "devices", _show_devices) elif args.what == "parameters": diff --git a/artiq/frontend/artiq_master.py b/artiq/frontend/artiq_master.py index 972b237b5..044b7b6b6 100755 --- a/artiq/frontend/artiq_master.py +++ b/artiq/frontend/artiq_master.py @@ -58,7 +58,7 @@ def main(): server_notify = Publisher({ "queue": scheduler.queue, - "periodic": scheduler.periodic, + "timed": scheduler.timed, "devices": ddb.data, "parameters": pdb.data, "parameters_simplehist": simplephist.history, diff --git a/artiq/gui/scheduler.py b/artiq/gui/scheduler.py index 3c45cd846..2e08c1fcb 100644 --- a/artiq/gui/scheduler.py +++ b/artiq/gui/scheduler.py @@ -18,18 +18,17 @@ class _QueueStoreSyncer(ListSyncer): return row -class _PeriodicStoreSyncer(DictSyncer): +class _TimedStoreSyncer(DictSyncer): def order_key(self, kv_pair): - # order by next run time, and then by PRID + # order by next run time, and then by TRID return (kv_pair[1][0], kv_pair[0]) - def convert(self, prid, x): - next_run, run_params, timeout, period = x + def convert(self, trid, x): + next_run, run_params, timeout = x row = [time.strftime("%m/%d %H:%M:%S", time.localtime(next_run)), - prid, run_params["file"]] + trid, run_params["file"]] for e in run_params["unit"], timeout: row.append("-" if e is None else str(e)) - row.append(str(period)) row.append(format_run_arguments(run_params["arguments"])) return row @@ -72,64 +71,59 @@ class SchedulerWindow(Window): button = Gtk.Button("Move down") hbox.pack_start(button, True, True, 0) button = Gtk.Button("Remove") - button.connect("clicked", self.remove_queue) + button.connect("clicked", self.remove_queued) hbox.pack_start(button, True, True, 0) vbox.pack_start(hbox, False, False, 0) vbox.set_border_width(6) notebook.insert_page(vbox, Gtk.Label("Queue"), -1) - self.periodic_store = Gtk.ListStore(str, int, str, str, str, str, str) - self.periodic_tree = Gtk.TreeView(self.periodic_store) - for i, title in enumerate(["Next run", "PRID", "File", "Unit", - "Timeout", "Period", "Arguments"]): + self.timed_store = Gtk.ListStore(str, int, str, str, str, str) + self.timed_tree = Gtk.TreeView(self.timed_store) + for i, title in enumerate(["Next run", "TRID", "File", "Unit", + "Timeout", "Arguments"]): renderer = Gtk.CellRendererText() column = Gtk.TreeViewColumn(title, renderer, text=i) - self.periodic_tree.append_column(column) + self.timed_tree.append_column(column) scroll = Gtk.ScrolledWindow() - scroll.add(self.periodic_tree) + scroll.add(self.timed_tree) vbox = Gtk.VBox(spacing=6) vbox.pack_start(scroll, True, True, 0) - hbox = Gtk.HBox(spacing=6) - button = Gtk.Button("Change period") - hbox.pack_start(button, True, True, 0) button = Gtk.Button("Remove") - button.connect("clicked", self.remove_periodic) - hbox.pack_start(button, True, True, 0) - vbox.pack_start(hbox, False, False, 0) + button.connect("clicked", self.remove_timed) + vbox.pack_start(button, False, False, 0) vbox.set_border_width(6) - notebook.insert_page(vbox, Gtk.Label("Periodic schedule"), -1) + notebook.insert_page(vbox, Gtk.Label("Timed schedule"), -1) - def remove_queue(self, widget): + def remove_queued(self, widget): store, selected = self.queue_tree.get_selection().get_selected() if selected is not None: rid = store[selected][0] - asyncio.Task(self.schedule_ctl.cancel_once(rid)) + asyncio.Task(self.schedule_ctl.cancel_queued(rid)) - def remove_periodic(self, widget): - store, selected = self.periodic_tree.get_selection().get_selected() + def remove_timed(self, widget): + store, selected = self.timed_tree.get_selection().get_selected() if selected is not None: - prid = store[selected][1] - asyncio.Task(self.schedule_ctl.cancel_periodic(prid)) + trid = store[selected][1] + asyncio.Task(self.schedule_ctl.cancel_timed(trid)) @asyncio.coroutine def sub_connect(self, host, port): self.queue_subscriber = Subscriber("queue", self.init_queue_store) yield from self.queue_subscriber.connect(host, port) try: - self.periodic_subscriber = Subscriber( - "periodic", self.init_periodic_store) - yield from self.periodic_subscriber.connect(host, port) + self.timed_subscriber = Subscriber("timed", self.init_timed_store) + yield from self.timed_subscriber.connect(host, port) except: yield from self.queue_subscriber.close() raise @asyncio.coroutine def sub_close(self): - yield from self.periodic_subscriber.close() + yield from self.timed_subscriber.close() yield from self.queue_subscriber.close() def init_queue_store(self, init): return _QueueStoreSyncer(self.queue_store, init) - def init_periodic_store(self, init): - return _PeriodicStoreSyncer(self.periodic_store, init) + def init_timed_store(self, init): + return _TimedStoreSyncer(self.timed_store, init) diff --git a/artiq/master/scheduler.py b/artiq/master/scheduler.py index ecc4215eb..3b2e0ab7b 100644 --- a/artiq/master/scheduler.py +++ b/artiq/master/scheduler.py @@ -11,18 +11,18 @@ class Scheduler: self.next_rid = 0 self.queue = Notifier([]) self.queue_modified = asyncio.Event() - self.periodic = Notifier(dict()) - self.periodic_modified = asyncio.Event() + self.timed = Notifier(dict()) + self.timed_modified = asyncio.Event() def new_rid(self): r = self.next_rid self.next_rid += 1 return r - def new_prid(self): - prids = set(range(len(self.periodic.read) + 1)) - prids -= set(self.periodic.read.keys()) - return next(iter(prids)) + def new_trid(self): + trids = set(range(len(self.timed.read) + 1)) + trids -= set(self.timed.read.keys()) + return next(iter(trids)) @asyncio.coroutine def start(self): @@ -36,13 +36,13 @@ class Scheduler: del self.task yield from self.worker.end_process() - def run_once(self, run_params, timeout): + def run_queued(self, run_params, timeout): rid = self.new_rid() self.queue.append((rid, run_params, timeout)) self.queue_modified.set() return rid - def cancel_once(self, rid): + def cancel_queued(self, rid): idx = next(idx for idx, (qrid, _, _) in enumerate(self.queue.read) if qrid == rid) @@ -51,14 +51,16 @@ class Scheduler: raise NotImplementedError del self.queue[idx] - def run_periodic(self, run_params, timeout, period): - prid = self.new_prid() - self.periodic[prid] = 0, run_params, timeout, period - self.periodic_modified.set() - return prid + def run_timed(self, run_params, timeout, next_run): + if next_run is None: + next_run = time() + trid = self.new_trid() + self.timed[trid] = next_run, run_params, timeout + self.timed_modified.set() + return trid - def cancel_periodic(self, prid): - del self.periodic[prid] + def cancel_timed(self, trid): + del self.timed[trid] @asyncio.coroutine def _run(self, rid, run_params, timeout): @@ -71,14 +73,14 @@ class Scheduler: print("RID {} completed successfully".format(rid)) @asyncio.coroutine - def _run_periodic(self): + def _run_timed(self): while True: min_next_run = None - min_prid = None - for prid, params in self.periodic.read.items(): + min_trid = None + for trid, params in self.timed.read.items(): if min_next_run is None or params[0] < min_next_run: min_next_run = params[0] - min_prid = prid + min_trid = trid now = time() @@ -88,9 +90,8 @@ class Scheduler: if min_next_run > 0: return min_next_run - next_run, run_params, timeout, period = \ - self.periodic.read[min_prid] - self.periodic[min_prid] = now + period, run_params, timeout, period + next_run, run_params, timeout = self.timed.read[min_trid] + del self.timed[min_trid] rid = self.new_rid() self.queue.insert(0, (rid, run_params, timeout)) @@ -100,20 +101,20 @@ class Scheduler: @asyncio.coroutine def _schedule(self): while True: - next_periodic = yield from self._run_periodic() + next_timed = yield from self._run_timed() if self.queue.read: rid, run_params, timeout = self.queue.read[0] yield from self._run(rid, run_params, timeout) del self.queue[0] else: self.queue_modified.clear() - self.periodic_modified.clear() + self.timed_modified.clear() t1 = asyncio.Task(self.queue_modified.wait()) - t2 = asyncio.Task(self.periodic_modified.wait()) + t2 = asyncio.Task(self.timed_modified.wait()) try: done, pend = yield from asyncio.wait( [t1, t2], - timeout=next_periodic, + timeout=next_timed, return_when=asyncio.FIRST_COMPLETED) except: t1.cancel()