From f033810e045529909c83867b366f1034a9c77ea3 Mon Sep 17 00:00:00 2001 From: Sebastien Bourdeauducq Date: Sat, 27 Dec 2014 23:27:35 +0800 Subject: [PATCH] management: add sync_struct --- artiq/management/network.py | 55 +++++++++++++ artiq/management/pc_rpc.py | 43 +--------- artiq/management/sync_struct.py | 119 ++++++++++++++++++++++++++++ doc/manual/management_reference.rst | 13 +++ 4 files changed, 191 insertions(+), 39 deletions(-) create mode 100644 artiq/management/network.py create mode 100644 artiq/management/sync_struct.py diff --git a/artiq/management/network.py b/artiq/management/network.py new file mode 100644 index 000000000..451bb272f --- /dev/null +++ b/artiq/management/network.py @@ -0,0 +1,55 @@ +import asyncio +from copy import copy + + +class AsyncioServer: + """Generic TCP server based on asyncio. + + Users of this class must derive from it and define the + ``_handle_connection_cr`` method and coroutine. + + """ + def __init__(self): + self._client_tasks = set() + + @asyncio.coroutine + def start(self, host, port): + """Starts the server. + + The user must call ``stop`` to free resources properly after this + method completes successfully. + + This method is a `coroutine`. + + :param host: Bind address of the server (see ``asyncio.start_server`` + from the Python standard library). + :param port: TCP port to bind to. + + """ + self.server = yield from asyncio.start_server(self._handle_connection, + host, port) + + @asyncio.coroutine + def stop(self): + """Stops the server. + + """ + wait_for = copy(self._client_tasks) + for task in self._client_tasks: + task.cancel() + for task in wait_for: + try: + yield from asyncio.wait_for(task, None) + except asyncio.CancelledError: + pass + self.server.close() + yield from self.server.wait_closed() + del self.server + + def _client_done(self, task): + self._client_tasks.remove(task) + + def _handle_connection(self, reader, writer): + task = asyncio.Task(self._handle_connection_cr(reader, writer)) + self._client_tasks.add(task) + task.add_done_callback(self._client_done) diff --git a/artiq/management/pc_rpc.py b/artiq/management/pc_rpc.py index 286baad33..5ca015b76 100644 --- a/artiq/management/pc_rpc.py +++ b/artiq/management/pc_rpc.py @@ -17,6 +17,7 @@ import asyncio import traceback from artiq.management import pyon +from artiq.management.network import AsyncioServer class RemoteError(Exception): @@ -123,7 +124,7 @@ class Client: return proxy -class Server: +class Server(AsyncioServer): """This class creates a TCP server that handles requests coming from ``Client`` objects. @@ -142,49 +143,13 @@ class Server: """ def __init__(self, target, id_type, id_parameters=None): + AsyncioServer.__init__(self) self.target = target self.id_type = id_type self.id_parameters = id_parameters - self._client_tasks = set() @asyncio.coroutine - def start(self, host, port): - """Starts the server. - - The user must call ``stop`` to free resources properly after this - method completes successfully. - - This method is a `coroutine`. - - :param host: Bind address of the server (see ``asyncio.start_server`` - from the Python standard library). - :param port: TCP port to bind to. - - """ - self.server = yield from asyncio.start_server(self._handle_connection, - host, port) - - @asyncio.coroutine - def stop(self): - """Stops the server. - - """ - for task in self._client_tasks: - task.cancel() - self.server.close() - yield from self.server.wait_closed() - del self.server - - def _client_done(self, task): - self._client_tasks.remove(task) - - def _handle_connection(self, reader, writer): - task = asyncio.Task(self._handle_connection_task(reader, writer)) - self._client_tasks.add(task) - task.add_done_callback(self._client_done) - - @asyncio.coroutine - def _handle_connection_task(self, reader, writer): + def _handle_connection_cr(self, reader, writer): try: line = yield from reader.readline() if line != _init_string: diff --git a/artiq/management/sync_struct.py b/artiq/management/sync_struct.py new file mode 100644 index 000000000..14255fa88 --- /dev/null +++ b/artiq/management/sync_struct.py @@ -0,0 +1,119 @@ +import asyncio + +from artiq.management import pyon +from artiq.management.network import AsyncioServer + + +_init_string = b"ARTIQ sync_struct\n" + + +class Subscriber: + def __init__(self, target_builder, error_cb, notify_cb=None): + self.target_builder = target_builder + self.error_cb = error_cb + self.notify_cb = notify_cb + + @asyncio.coroutine + def connect(self, host, port): + self._reader, self._writer = \ + yield from asyncio.open_connection(host, port) + try: + self._writer.write(_init_string) + self._receive_task = asyncio.Task(self._receive_cr()) + except: + self._writer.close() + del self._reader + del self._writer + raise + + @asyncio.coroutine + def close(self): + try: + self._receive_task.cancel() + try: + yield from asyncio.wait_for(self._receive_task, None) + except asyncio.CancelledError: + pass + finally: + self._writer.close() + del self._reader + del self._writer + + @asyncio.coroutine + def _receive_cr(self): + try: + target = None + while True: + line = yield from self._reader.readline() + obj = pyon.decode(line.decode()) + action = obj["action"] + + if action == "init": + target = self.target_builder(obj["struct"]) + elif action == "append": + target.append(obj["x"]) + elif action == "pop": + target.pop(obj["i"]) + elif action == "delitem": + target.__delitem__(obj["key"]) + if self.notify_cb is not None: + self.notify_cb() + except: + self.error_cb() + raise + + +class Publisher(AsyncioServer): + def __init__(self, backing_struct): + AsyncioServer.__init__(self) + self.backing_struct = backing_struct + self._recipients = set() + + @asyncio.coroutine + def _handle_connection_cr(self, reader, writer): + try: + line = yield from reader.readline() + if line != _init_string: + return + + obj = {"action": "init", "struct": self.backing_struct} + line = pyon.encode(obj) + "\n" + writer.write(line.encode()) + + queue = asyncio.Queue() + self._recipients.add(queue) + try: + while True: + line = yield from queue.get() + writer.write(line) + # raise exception on connection error + yield from writer.drain() + finally: + self._recipients.remove(queue) + except ConnectionResetError: + # subscribers disconnecting are a normal occurence + pass + finally: + writer.close() + + def _publish(self, obj): + line = pyon.encode(obj) + "\n" + line = line.encode() + for recipient in self._recipients: + recipient.put_nowait(line) + + # Backing struct modification methods. + # All modifications must go through them! + + def append(self, x): + self.backing_struct.append(x) + self._publish({"action": "append", "x": x}) + + def pop(self, i=-1): + r = self.backing_struct.pop(i) + self._publish({"action": "pop", "i": i}) + return r + + def __delitem__(self, key): + self.backing_struct.__delitem__(key) + self._publish({"action": "delitem", "key": key}) diff --git a/doc/manual/management_reference.rst b/doc/manual/management_reference.rst index 6f1bccafe..96582c044 100644 --- a/doc/manual/management_reference.rst +++ b/doc/manual/management_reference.rst @@ -7,8 +7,21 @@ Management reference .. automodule:: artiq.management.pyon :members: +:mod:`artiq.management.network` module +------------------------------------- + +.. automodule:: artiq.management.network + :members: + + :mod:`artiq.management.pc_rpc` module ------------------------------------- .. automodule:: artiq.management.pc_rpc :members: + +:mod:`artiq.management.sync_struct` module +------------------------------------- + +.. automodule:: artiq.management.sync_struct + :members: