diff --git a/artiq/management/scheduler.py b/artiq/management/scheduler.py index acf27fe5d..d4f8064a8 100644 --- a/artiq/management/scheduler.py +++ b/artiq/management/scheduler.py @@ -9,9 +9,9 @@ class Scheduler: def __init__(self, *args, **kwargs): self.worker = Worker(*args, **kwargs) self.next_rid = 0 - self.queued = Notifier([]) + self.queue = Notifier([]) self.queue_count = asyncio.Semaphore(0) - self.periodic = dict() + self.periodic = Notifier(dict()) self.periodic_modified = asyncio.Event() def new_rid(self): @@ -20,8 +20,8 @@ class Scheduler: return r def new_prid(self): - prids = set(range(len(self.periodic) + 1)) - prids -= set(self.periodic.keys()) + prids = set(range(len(self.periodic.backing_struct) + 1)) + prids -= set(self.periodic.backing_struct.keys()) return next(iter(prids)) @asyncio.coroutine @@ -38,14 +38,14 @@ class Scheduler: def run_once(self, run_params, timeout): rid = self.new_rid() - self.queued.append((rid, run_params, timeout)) + self.queue.append((rid, run_params, timeout)) self.queue_count.release() return rid def cancel_once(self, rid): - idx = next(idx for idx, (qrid, _, _) in enumerate(self.queued) + idx = next(idx for idx, (qrid, _, _) in enumerate(self.queue) if qrid == rid) - del self.queued[idx] + del self.queue[idx] def run_periodic(self, run_params, timeout, period): prid = self.new_prid() @@ -61,7 +61,7 @@ class Scheduler: while True: min_next_run = None min_prid = None - for prid, params in self.periodic.items(): + for prid, params in self.periodic.backing_struct.items(): if min_next_run is None or params[0] < min_next_run: min_next_run = params[0] min_prid = prid @@ -74,14 +74,15 @@ class Scheduler: if min_next_run > 0: return min_next_run - next_run, run_params, timeout, period = self.periodic[min_prid] + next_run, run_params, timeout, period = \ + self.periodic.backing_struct[min_prid] self.periodic[min_prid] = now + period, run_params, timeout, period rid = self.new_rid() - self.queued.insert(0, (rid, run_params, timeout)) + self.queue.insert(0, (rid, run_params, timeout)) result = yield from self.worker.run(run_params, timeout) print(prid, rid, result) - del self.queued[0] + del self.queue[0] @asyncio.coroutine def _schedule(self): @@ -99,7 +100,7 @@ class Scheduler: yield from self._run_periodic() if ev_queue in done: - rid, run_params, timeout = self.queued.backing_struct[0] + rid, run_params, timeout = self.queue.backing_struct[0] result = yield from self.worker.run(run_params, timeout) print(rid, result) - del self.queued[0] + del self.queue[0] diff --git a/artiq/management/sync_struct.py b/artiq/management/sync_struct.py index 83b7e8541..97a83e2c1 100644 --- a/artiq/management/sync_struct.py +++ b/artiq/management/sync_struct.py @@ -8,7 +8,8 @@ _init_string = b"ARTIQ sync_struct\n" class Subscriber: - def __init__(self, target_builder, notify_cb=None): + def __init__(self, notifier_name, target_builder, notify_cb=None): + self.notifier_name = notifier_name self.target_builder = target_builder self.notify_cb = notify_cb @@ -18,6 +19,7 @@ class Subscriber: yield from asyncio.open_connection(host, port) try: self._writer.write(_init_string) + self._writer.write((self.notifier_name + "\n").encode()) self.receive_task = asyncio.Task(self._receive_cr()) except: self._writer.close() @@ -56,8 +58,11 @@ class Subscriber: target.insert(obj["i"], obj["x"]) elif action == "pop": target.pop(obj["i"]) + elif action == "setitem": + target.__setitem__(obj["key"], obj["value"]) elif action == "delitem": target.__delitem__(obj["key"]) + if self.notify_cb is not None: self.notify_cb() @@ -73,32 +78,41 @@ class Notifier: def append(self, x): self.backing_struct.append(x) if self.publisher is not None: - self.publisher.publish({"action": "append", "x": x}) + self.publisher.publish(self, {"action": "append", "x": x}) def insert(self, i, x): self.backing_struct.insert(i, x) if self.publisher is not None: - self.publisher.publish({"action": "insert", "i": i, "x": x}) + self.publisher.publish(self, {"action": "insert", "i": i, "x": x}) def pop(self, i=-1): r = self.backing_struct.pop(i) if self.publisher is not None: - self.publisher.publish({"action": "pop", "i": i}) + self.publisher.publish(self, {"action": "pop", "i": i}) return r + def __setitem__(self, key, value): + self.backing_struct.__setitem__(key, value) + if self.publisher is not None: + self.publisher.publish(self, {"action": "setitem", + "key": key, + "value": value}) + def __delitem__(self, key): self.backing_struct.__delitem__(key) if self.publisher is not None: - self.publisher.publish({"action": "delitem", "key": key}) + self.publisher.publish(self, {"action": "delitem", "key": key}) class Publisher(AsyncioServer): - def __init__(self, notifier): + def __init__(self, notifiers): AsyncioServer.__init__(self) - self.notifier = notifier - self._recipients = set() + self.notifiers = notifiers + self._recipients = {k: set() for k in notifiers.keys()} + self._notifier_names = {id(v): k for k, v in notifiers.items()} - self.notifier.publisher = self + for notifier in notifiers.values(): + notifier.publisher = self @asyncio.coroutine def _handle_connection_cr(self, reader, writer): @@ -107,12 +121,22 @@ class Publisher(AsyncioServer): if line != _init_string: return - obj = {"action": "init", "struct": self.notifier.backing_struct} + line = yield from reader.readline() + if not line: + return + notifier_name = line.decode()[:-1] + + try: + notifier = self.notifiers[notifier_name] + except KeyError: + return + + obj = {"action": "init", "struct": notifier.backing_struct} line = pyon.encode(obj) + "\n" writer.write(line.encode()) queue = asyncio.Queue() - self._recipients.add(queue) + self._recipients[notifier_name].add(queue) try: while True: line = yield from queue.get() @@ -120,15 +144,16 @@ class Publisher(AsyncioServer): # raise exception on connection error yield from writer.drain() finally: - self._recipients.remove(queue) + self._recipients[notifier_name].remove(queue) except ConnectionResetError: # subscribers disconnecting are a normal occurence pass finally: writer.close() - def publish(self, obj): + def publish(self, notifier, obj): line = pyon.encode(obj) + "\n" line = line.encode() - for recipient in self._recipients: + notifier_name = self._notifier_names[id(notifier)] + for recipient in self._recipients[notifier_name]: recipient.put_nowait(line) diff --git a/frontend/artiq_client.py b/frontend/artiq_client.py index 8a64774aa..018ad5f8b 100755 --- a/frontend/artiq_client.py +++ b/frontend/artiq_client.py @@ -37,7 +37,6 @@ def _get_args(): help="unit to run") parser_add.add_argument("file", help="file containing the unit to run") - parser_cancel = subparsers.add_parser("cancel", help="cancel an experiment") parser_cancel.add_argument("-p", "--periodic", default=False, @@ -46,8 +45,11 @@ def _get_args(): parser_cancel.add_argument("rid", type=int, help="run identifier (RID/PRID)") - parser_show = subparsers.add_parser("show-queue", - help="show the experiment queue") + parser_show_queue = subparsers.add_parser( + "show-queue", help="show the experiment queue") + + parser_show_periodic = subparsers.add_parser( + "show-periodic", help="show the periodic experiment table") return parser.parse_args() @@ -93,7 +95,7 @@ def _show_periodic(periodic): if periodic: table = PrettyTable(["Next run", "PRID", "File", "Unit", "Function", "Timeout", "Period"]) - sp = sorted(periodic.items(), key=lambda x: x[1][0]) + sp = sorted(periodic.items(), key=lambda x: (x[1][0], x[0])) for prid, (next_run, run_params, timeout, period) in sp: row = [time.strftime("%m/%d %H:%M:%S", time.localtime(next_run)), prid, run_params["file"]] @@ -107,6 +109,8 @@ def _show_periodic(periodic): def _run_subscriber(host, port, subscriber): + if port is None: + port = 8887 loop = asyncio.get_event_loop() try: loop.run_until_complete(subscriber.connect(host, port)) @@ -127,9 +131,18 @@ def main(): def init_queue(x): queue[:] = x return queue - subscriber = Subscriber(init_queue, lambda: _show_queue(queue)) - port = 8887 if args.port is None else args.port - _run_subscriber(args.server, port, subscriber) + subscriber = Subscriber("queue", init_queue, + lambda: _show_queue(queue)) + _run_subscriber(args.server, args.port, subscriber) + elif args.action == "show-periodic": + periodic = dict() + def init_periodic(x): + periodic.clear() + periodic.update(x) + return periodic + subscriber = Subscriber("periodic", init_periodic, + lambda: _show_periodic(periodic)) + _run_subscriber(args.server, args.port, subscriber) else: port = 8888 if args.port is None else args.port remote = Client(args.server, port, "schedule_control") diff --git a/frontend/artiq_gui.py b/frontend/artiq_gui.py index d185d91e3..5ad88a6a4 100755 --- a/frontend/artiq_gui.py +++ b/frontend/artiq_gui.py @@ -2,6 +2,7 @@ import argparse import asyncio +import time import gbulb from gi.repository import Gtk @@ -19,8 +20,8 @@ class QueueStoreSyncer: def _convert(self, x): rid, run_params, timeout = x row = [rid, run_params["file"]] - for x in run_params["unit"], run_params["function"], timeout: - row.append("-" if x is None else str(x)) + for e in run_params["unit"], run_params["function"], timeout: + row.append("-" if e is None else str(e)) return row def append(self, x): @@ -33,9 +34,57 @@ class QueueStoreSyncer: del self.queue_store[key] +class PeriodicStoreSyncer: + def __init__(self, periodic_store, init): + self.periodic_store = periodic_store + self.periodic_store.clear() + self.order = [] + for prid, x in sorted(init.items(), key=lambda e: (e[1][0], e[0])): + self.periodic_store.append(self._convert(prid, x)) + self.order.append((x[0], prid)) + + def _convert(self, prid, x): + next_run, run_params, timeout, period = x + row = [time.strftime("%m/%d %H:%M:%S", time.localtime(next_run)), + prid, run_params["file"]] + for e in run_params["unit"], run_params["function"], timeout: + row.append("-" if e is None else str(e)) + row.append(str(period)) + return row + + def _find_index(self, prid): + for i, e in enumerate(self.periodic_store): + if e[1] == prid: + return i + raise KeyError + + def __setitem__(self, prid, x): + try: + i = self._find_index(prid) + except KeyError: + pass + else: + del self.periodic_store[i] + del self.order[i] + for i, o in enumerate(self.order): + if o > (x[0], prid): + break + self.periodic_store.insert(i, self._convert(prid, x)) + self.order.insert(i, (x[0], prid)) + + def __delitem__(self, key): + i = self._find_index(key) + del self.periodic_store[i] + del self.order[i] + + class SchedulerWindow(Gtk.Window): def __init__(self): Gtk.Window.__init__(self, title="Scheduler") + self.set_border_width(10) + + vpane = Gtk.VPaned() + self.add(vpane) self.queue_store = Gtk.ListStore(int, str, str, str, str) tree = Gtk.TreeView(self.queue_store) @@ -44,20 +93,44 @@ class SchedulerWindow(Gtk.Window): renderer = Gtk.CellRendererText() column = Gtk.TreeViewColumn(title, renderer, text=i) tree.append_column(column) - self.add(tree) + scroll = Gtk.ScrolledWindow() + scroll.add(tree) + vpane.add1(scroll) + + self.periodic_store = Gtk.ListStore(str, int, str, str, str, str, str) + tree = Gtk.TreeView(self.periodic_store) + for i, title in enumerate(["Next run", "PRID", "File", "Unit", + "Function", "Timeout", "Period"]): + renderer = Gtk.CellRendererText() + column = Gtk.TreeViewColumn(title, renderer, text=i) + tree.append_column(column) + scroll = Gtk.ScrolledWindow() + scroll.add(tree) + vpane.add2(scroll) @asyncio.coroutine def sub_connect(self, host, port): - self.subscriber = Subscriber(self.init_queue_store) - yield from self.subscriber.connect(host, port) + self.queue_subscriber = Subscriber("queue", self.init_queue_store) + yield from self.queue_subscriber.connect(host, port) + try: + self.periodic_subscriber = Subscriber( + "periodic", self.init_periodic_store) + yield from self.periodic_subscriber.connect(host, port) + except: + yield from self.queue_subscriber.close() + raise @asyncio.coroutine def sub_close(self): - yield from self.subscriber.close() + yield from self.periodic_subscriber.close() + yield from self.queue_subscriber.close() def init_queue_store(self, init): return QueueStoreSyncer(self.queue_store, init) + def init_periodic_store(self, init): + return PeriodicStoreSyncer(self.periodic_store, init) + def _get_args(): parser = argparse.ArgumentParser(description="ARTIQ GUI client") diff --git a/frontend/artiq_master.py b/frontend/artiq_master.py index 7a1e6f005..b5a49e637 100755 --- a/frontend/artiq_master.py +++ b/frontend/artiq_master.py @@ -33,9 +33,12 @@ def main(): loop.run_until_complete(schedule_control.start( args.bind, args.port_schedule_control)) try: - schedule_notify = Publisher(scheduler.queued) + schedule_notify = Publisher({ + "queue": scheduler.queue, + "periodic": scheduler.periodic + }) loop.run_until_complete(schedule_notify.start( - args.bind, args.port_schedule_notify)) + args.bind, args.port_schedule_notify)) try: loop.run_forever() finally: