forked from M-Labs/artiq
1
0
Fork 0

management: add sync_struct

This commit is contained in:
Sebastien Bourdeauducq 2014-12-27 23:27:35 +08:00
parent f7232fd3d1
commit f033810e04
4 changed files with 191 additions and 39 deletions

View File

@ -0,0 +1,55 @@
import asyncio
from copy import copy
class AsyncioServer:
"""Generic TCP server based on asyncio.
Users of this class must derive from it and define the
``_handle_connection_cr`` method and coroutine.
"""
def __init__(self):
self._client_tasks = set()
@asyncio.coroutine
def start(self, host, port):
"""Starts the server.
The user must call ``stop`` to free resources properly after this
method completes successfully.
This method is a `coroutine`.
:param host: Bind address of the server (see ``asyncio.start_server``
from the Python standard library).
:param port: TCP port to bind to.
"""
self.server = yield from asyncio.start_server(self._handle_connection,
host, port)
@asyncio.coroutine
def stop(self):
"""Stops the server.
"""
wait_for = copy(self._client_tasks)
for task in self._client_tasks:
task.cancel()
for task in wait_for:
try:
yield from asyncio.wait_for(task, None)
except asyncio.CancelledError:
pass
self.server.close()
yield from self.server.wait_closed()
del self.server
def _client_done(self, task):
self._client_tasks.remove(task)
def _handle_connection(self, reader, writer):
task = asyncio.Task(self._handle_connection_cr(reader, writer))
self._client_tasks.add(task)
task.add_done_callback(self._client_done)

View File

@ -17,6 +17,7 @@ import asyncio
import traceback import traceback
from artiq.management import pyon from artiq.management import pyon
from artiq.management.network import AsyncioServer
class RemoteError(Exception): class RemoteError(Exception):
@ -123,7 +124,7 @@ class Client:
return proxy return proxy
class Server: class Server(AsyncioServer):
"""This class creates a TCP server that handles requests coming from """This class creates a TCP server that handles requests coming from
``Client`` objects. ``Client`` objects.
@ -142,49 +143,13 @@ class Server:
""" """
def __init__(self, target, id_type, id_parameters=None): def __init__(self, target, id_type, id_parameters=None):
AsyncioServer.__init__(self)
self.target = target self.target = target
self.id_type = id_type self.id_type = id_type
self.id_parameters = id_parameters self.id_parameters = id_parameters
self._client_tasks = set()
@asyncio.coroutine @asyncio.coroutine
def start(self, host, port): def _handle_connection_cr(self, reader, writer):
"""Starts the server.
The user must call ``stop`` to free resources properly after this
method completes successfully.
This method is a `coroutine`.
:param host: Bind address of the server (see ``asyncio.start_server``
from the Python standard library).
:param port: TCP port to bind to.
"""
self.server = yield from asyncio.start_server(self._handle_connection,
host, port)
@asyncio.coroutine
def stop(self):
"""Stops the server.
"""
for task in self._client_tasks:
task.cancel()
self.server.close()
yield from self.server.wait_closed()
del self.server
def _client_done(self, task):
self._client_tasks.remove(task)
def _handle_connection(self, reader, writer):
task = asyncio.Task(self._handle_connection_task(reader, writer))
self._client_tasks.add(task)
task.add_done_callback(self._client_done)
@asyncio.coroutine
def _handle_connection_task(self, reader, writer):
try: try:
line = yield from reader.readline() line = yield from reader.readline()
if line != _init_string: if line != _init_string:

View File

@ -0,0 +1,119 @@
import asyncio
from artiq.management import pyon
from artiq.management.network import AsyncioServer
_init_string = b"ARTIQ sync_struct\n"
class Subscriber:
def __init__(self, target_builder, error_cb, notify_cb=None):
self.target_builder = target_builder
self.error_cb = error_cb
self.notify_cb = notify_cb
@asyncio.coroutine
def connect(self, host, port):
self._reader, self._writer = \
yield from asyncio.open_connection(host, port)
try:
self._writer.write(_init_string)
self._receive_task = asyncio.Task(self._receive_cr())
except:
self._writer.close()
del self._reader
del self._writer
raise
@asyncio.coroutine
def close(self):
try:
self._receive_task.cancel()
try:
yield from asyncio.wait_for(self._receive_task, None)
except asyncio.CancelledError:
pass
finally:
self._writer.close()
del self._reader
del self._writer
@asyncio.coroutine
def _receive_cr(self):
try:
target = None
while True:
line = yield from self._reader.readline()
obj = pyon.decode(line.decode())
action = obj["action"]
if action == "init":
target = self.target_builder(obj["struct"])
elif action == "append":
target.append(obj["x"])
elif action == "pop":
target.pop(obj["i"])
elif action == "delitem":
target.__delitem__(obj["key"])
if self.notify_cb is not None:
self.notify_cb()
except:
self.error_cb()
raise
class Publisher(AsyncioServer):
def __init__(self, backing_struct):
AsyncioServer.__init__(self)
self.backing_struct = backing_struct
self._recipients = set()
@asyncio.coroutine
def _handle_connection_cr(self, reader, writer):
try:
line = yield from reader.readline()
if line != _init_string:
return
obj = {"action": "init", "struct": self.backing_struct}
line = pyon.encode(obj) + "\n"
writer.write(line.encode())
queue = asyncio.Queue()
self._recipients.add(queue)
try:
while True:
line = yield from queue.get()
writer.write(line)
# raise exception on connection error
yield from writer.drain()
finally:
self._recipients.remove(queue)
except ConnectionResetError:
# subscribers disconnecting are a normal occurence
pass
finally:
writer.close()
def _publish(self, obj):
line = pyon.encode(obj) + "\n"
line = line.encode()
for recipient in self._recipients:
recipient.put_nowait(line)
# Backing struct modification methods.
# All modifications must go through them!
def append(self, x):
self.backing_struct.append(x)
self._publish({"action": "append", "x": x})
def pop(self, i=-1):
r = self.backing_struct.pop(i)
self._publish({"action": "pop", "i": i})
return r
def __delitem__(self, key):
self.backing_struct.__delitem__(key)
self._publish({"action": "delitem", "key": key})

View File

@ -7,8 +7,21 @@ Management reference
.. automodule:: artiq.management.pyon .. automodule:: artiq.management.pyon
:members: :members:
:mod:`artiq.management.network` module
-------------------------------------
.. automodule:: artiq.management.network
:members:
:mod:`artiq.management.pc_rpc` module :mod:`artiq.management.pc_rpc` module
------------------------------------- -------------------------------------
.. automodule:: artiq.management.pc_rpc .. automodule:: artiq.management.pc_rpc
:members: :members:
:mod:`artiq.management.sync_struct` module
-------------------------------------
.. automodule:: artiq.management.sync_struct
:members: