From 1fdad21f081705ea07cacf938f6ced79d3e76b55 Mon Sep 17 00:00:00 2001 From: Sebastien Bourdeauducq Date: Sun, 28 Dec 2014 18:56:26 +0800 Subject: [PATCH] master/client: queue pubsub --- artiq/management/pc_rpc.py | 2 +- artiq/management/scheduler.py | 28 ++---- artiq/management/sync_struct.py | 105 ++++++++++++---------- artiq/management/{network.py => tools.py} | 5 ++ frontend/artiq_client.py | 63 ++++++++----- frontend/artiq_master.py | 23 +++-- 6 files changed, 134 insertions(+), 92 deletions(-) rename artiq/management/{network.py => tools.py} (95%) diff --git a/artiq/management/pc_rpc.py b/artiq/management/pc_rpc.py index 5ca015b76..27527c83a 100644 --- a/artiq/management/pc_rpc.py +++ b/artiq/management/pc_rpc.py @@ -17,7 +17,7 @@ import asyncio import traceback from artiq.management import pyon -from artiq.management.network import AsyncioServer +from artiq.management.tools import AsyncioServer class RemoteError(Exception): diff --git a/artiq/management/scheduler.py b/artiq/management/scheduler.py index 25f28dcee..292b406ac 100644 --- a/artiq/management/scheduler.py +++ b/artiq/management/scheduler.py @@ -1,6 +1,7 @@ import asyncio from time import time +from artiq.management.sync_struct import Notifier from artiq.management.worker import Worker @@ -8,8 +9,7 @@ class Scheduler: def __init__(self, *args, **kwargs): self.worker = Worker(*args, **kwargs) self.next_rid = 0 - self.currently_executing = None - self.queued = [] + self.queued = Notifier([]) self.queue_count = asyncio.Semaphore(0) self.periodic = dict() self.periodic_modified = asyncio.Event() @@ -47,14 +47,6 @@ class Scheduler: if qrid == rid) del self.queued[idx] - def get_schedule(self): - if self.currently_executing is None: - ce = None - else: - rid, run_params, timeout, t = self.currently_executing - ce = rid, run_params, timeout, time() - t - return ce, self.queued, self.periodic - def run_periodic(self, run_params, timeout, period): prid = self.new_prid() self.periodic[prid] = 0, run_params, timeout, period @@ -64,13 +56,6 @@ class Scheduler: def cancel_periodic(self, prid): del self.periodic[prid] - @asyncio.coroutine - def _run(self, rid, run_params, timeout): - self.currently_executing = rid, run_params, timeout, time() - result = yield from self.worker.run(run_params, timeout) - self.currently_executing = None - return result - @asyncio.coroutine def _run_periodic(self): while True: @@ -93,8 +78,10 @@ class Scheduler: self.periodic[min_prid] = now + period, run_params, timeout, period rid = self.new_rid() - result = yield from self._run(rid, run_params, timeout) + self.queued.insert(0, (rid, run_params, timeout)) + result = yield from self.worker.run(run_params, timeout) print(prid, rid, result) + del self.queued[0] @asyncio.coroutine def _schedule(self): @@ -111,6 +98,7 @@ class Scheduler: yield from self._run_periodic() if ev_queue in done: - rid, run_params, timeout = self.queued.pop(0) - result = yield from self._run(rid, run_params, timeout) + rid, run_params, timeout = self.queued.backing_struct[0] + result = yield from self.worker.run(run_params, timeout) print(rid, result) + del self.queued[0] diff --git a/artiq/management/sync_struct.py b/artiq/management/sync_struct.py index 14255fa88..83b7e8541 100644 --- a/artiq/management/sync_struct.py +++ b/artiq/management/sync_struct.py @@ -1,16 +1,15 @@ import asyncio from artiq.management import pyon -from artiq.management.network import AsyncioServer +from artiq.management.tools import AsyncioServer _init_string = b"ARTIQ sync_struct\n" class Subscriber: - def __init__(self, target_builder, error_cb, notify_cb=None): + def __init__(self, target_builder, notify_cb=None): self.target_builder = target_builder - self.error_cb = error_cb self.notify_cb = notify_cb @asyncio.coroutine @@ -19,7 +18,7 @@ class Subscriber: yield from asyncio.open_connection(host, port) try: self._writer.write(_init_string) - self._receive_task = asyncio.Task(self._receive_cr()) + self.receive_task = asyncio.Task(self._receive_cr()) except: self._writer.close() del self._reader @@ -29,9 +28,9 @@ class Subscriber: @asyncio.coroutine def close(self): try: - self._receive_task.cancel() + self.receive_task.cancel() try: - yield from asyncio.wait_for(self._receive_task, None) + yield from asyncio.wait_for(self.receive_task, None) except asyncio.CancelledError: pass finally: @@ -41,34 +40,66 @@ class Subscriber: @asyncio.coroutine def _receive_cr(self): - try: - target = None - while True: - line = yield from self._reader.readline() - obj = pyon.decode(line.decode()) - action = obj["action"] + target = None + while True: + line = yield from self._reader.readline() + if not line: + return + obj = pyon.decode(line.decode()) + action = obj["action"] - if action == "init": - target = self.target_builder(obj["struct"]) - elif action == "append": - target.append(obj["x"]) - elif action == "pop": - target.pop(obj["i"]) - elif action == "delitem": - target.__delitem__(obj["key"]) - if self.notify_cb is not None: - self.notify_cb() - except: - self.error_cb() - raise + if action == "init": + target = self.target_builder(obj["struct"]) + elif action == "append": + target.append(obj["x"]) + elif action == "insert": + target.insert(obj["i"], obj["x"]) + elif action == "pop": + target.pop(obj["i"]) + elif action == "delitem": + target.__delitem__(obj["key"]) + if self.notify_cb is not None: + self.notify_cb() + + +class Notifier: + def __init__(self, backing_struct): + self.backing_struct = backing_struct + self.publisher = None + + # Backing struct modification methods. + # All modifications must go through them! + + def append(self, x): + self.backing_struct.append(x) + if self.publisher is not None: + self.publisher.publish({"action": "append", "x": x}) + + def insert(self, i, x): + self.backing_struct.insert(i, x) + if self.publisher is not None: + self.publisher.publish({"action": "insert", "i": i, "x": x}) + + def pop(self, i=-1): + r = self.backing_struct.pop(i) + if self.publisher is not None: + self.publisher.publish({"action": "pop", "i": i}) + return r + + def __delitem__(self, key): + self.backing_struct.__delitem__(key) + if self.publisher is not None: + self.publisher.publish({"action": "delitem", "key": key}) class Publisher(AsyncioServer): - def __init__(self, backing_struct): + def __init__(self, notifier): AsyncioServer.__init__(self) - self.backing_struct = backing_struct + self.notifier = notifier self._recipients = set() + self.notifier.publisher = self + @asyncio.coroutine def _handle_connection_cr(self, reader, writer): try: @@ -76,7 +107,7 @@ class Publisher(AsyncioServer): if line != _init_string: return - obj = {"action": "init", "struct": self.backing_struct} + obj = {"action": "init", "struct": self.notifier.backing_struct} line = pyon.encode(obj) + "\n" writer.write(line.encode()) @@ -96,24 +127,8 @@ class Publisher(AsyncioServer): finally: writer.close() - def _publish(self, obj): + def publish(self, obj): line = pyon.encode(obj) + "\n" line = line.encode() for recipient in self._recipients: recipient.put_nowait(line) - - # Backing struct modification methods. - # All modifications must go through them! - - def append(self, x): - self.backing_struct.append(x) - self._publish({"action": "append", "x": x}) - - def pop(self, i=-1): - r = self.backing_struct.pop(i) - self._publish({"action": "pop", "i": i}) - return r - - def __delitem__(self, key): - self.backing_struct.__delitem__(key) - self._publish({"action": "delitem", "key": key}) diff --git a/artiq/management/network.py b/artiq/management/tools.py similarity index 95% rename from artiq/management/network.py rename to artiq/management/tools.py index 451bb272f..470ec7180 100644 --- a/artiq/management/network.py +++ b/artiq/management/tools.py @@ -1,7 +1,12 @@ import asyncio +import sys from copy import copy +def clear_screen(): + sys.stdout.write("\x1b[2J\x1b[H") + + class AsyncioServer: """Generic TCP server based on asyncio. diff --git a/frontend/artiq_client.py b/frontend/artiq_client.py index 176c96590..961e86d54 100755 --- a/frontend/artiq_client.py +++ b/frontend/artiq_client.py @@ -2,10 +2,14 @@ import argparse import time +import sys +import asyncio from prettytable import PrettyTable from artiq.management.pc_rpc import Client +from artiq.management.sync_struct import Subscriber +from artiq.management.tools import clear_screen def _get_args(): @@ -14,7 +18,7 @@ def _get_args(): "-s", "--server", default="::1", help="hostname or IP of the master to connect to") parser.add_argument( - "--port", default=8888, type=int, + "--port", default=None, type=int, help="TCP port to use to connect to the master") subparsers = parser.add_subparsers(dest="action") @@ -42,8 +46,8 @@ def _get_args(): parser_cancel.add_argument("rid", type=int, help="run identifier (RID/PRID)") - parser_show = subparsers.add_parser("show", - help="show the experiment schedule") + parser_show = subparsers.add_parser("show-queue", + help="show the experiment queue") return parser.parse_args() @@ -70,30 +74,25 @@ def _action_cancel(remote, args): remote.cancel_once(args.rid) -def _action_show(remote, args): - ce, queue, periodic = remote.get_schedule() - if ce is not None or queue: +def _show_queue(queue): + clear_screen() + if queue: table = PrettyTable(["RID", "File", "Unit", "Function", "Timeout"]) - if ce is not None: - rid, run_params, timeout, t = ce - print("Currently executing RID {} for {:.1f}s".format(rid, t)) - row = [rid, run_params["file"]] - for x in run_params["unit"], run_params["function"], timeout: - row.append("-" if x is None else x) - table.add_row(row) for rid, run_params, timeout in queue: row = [rid, run_params["file"]] for x in run_params["unit"], run_params["function"], timeout: row.append("-" if x is None else x) table.add_row(row) - print("Run queue:") print(table) else: print("Queue is empty") + + +def _show_periodic(periodic): + clear_screen() if periodic: table = PrettyTable(["Next run", "PRID", "File", "Unit", "Function", "Timeout", "Period"]) - print("Periodic schedule:") sp = sorted(periodic.items(), key=lambda x: x[1][0]) for prid, (next_run, run_params, timeout, period) in sp: row = [time.strftime("%m/%d %H:%M:%S", time.localtime(next_run)), @@ -107,13 +106,37 @@ def _action_show(remote, args): print("No periodic schedule") +def _run_subscriber(host, port, subscriber): + loop = asyncio.get_event_loop() + try: + loop.run_until_complete(subscriber.connect(host, port)) + try: + loop.run_until_complete(asyncio.wait_for(subscriber.receive_task, + None)) + print("Connection to master lost") + finally: + loop.run_until_complete(subscriber.close()) + finally: + loop.close() + + def main(): args = _get_args() - remote = Client(args.server, args.port, "master") - try: - globals()["_action_" + args.action](remote, args) - finally: - remote.close_rpc() + if args.action == "show-queue": + queue = [] + def init_queue(x): + queue[:] = x + return queue + subscriber = Subscriber(init_queue, lambda: _show_queue(queue)) + port = 8887 if args.port is None else args.port + _run_subscriber(args.server, port, subscriber) + else: + port = 8888 if args.port is None else args.port + remote = Client(args.server, port, "schedule_control") + try: + globals()["_action_" + args.action](remote, args) + finally: + remote.close_rpc() if __name__ == "__main__": main() diff --git a/frontend/artiq_master.py b/frontend/artiq_master.py index 615c1b052..7a1e6f005 100755 --- a/frontend/artiq_master.py +++ b/frontend/artiq_master.py @@ -4,6 +4,7 @@ import asyncio import argparse from artiq.management.pc_rpc import Server +from artiq.management.sync_struct import Publisher from artiq.management.scheduler import Scheduler @@ -13,8 +14,11 @@ def _get_args(): "--bind", default="::1", help="hostname or IP address to bind to") parser.add_argument( - "--port", default=8888, type=int, - help="TCP port to listen to") + "--port-schedule-control", default=8888, type=int, + help="TCP port to listen to for schedule control") + parser.add_argument( + "--port-schedule-notify", default=8887, type=int, + help="TCP port to listen to for schedule notifications") return parser.parse_args() @@ -25,12 +29,19 @@ def main(): scheduler = Scheduler("ddb.pyon", "pdb.pyon") loop.run_until_complete(scheduler.start()) try: - server = Server(scheduler, "master") - loop.run_until_complete(server.start(args.bind, args.port)) + schedule_control = Server(scheduler, "schedule_control") + loop.run_until_complete(schedule_control.start( + args.bind, args.port_schedule_control)) try: - loop.run_forever() + schedule_notify = Publisher(scheduler.queued) + loop.run_until_complete(schedule_notify.start( + args.bind, args.port_schedule_notify)) + try: + loop.run_forever() + finally: + loop.run_until_complete(schedule_notify.stop()) finally: - loop.run_until_complete(server.stop()) + loop.run_until_complete(schedule_control.stop()) finally: loop.run_until_complete(scheduler.stop()) finally: