pc_rpc: multiple target support

This commit is contained in:
Sebastien Bourdeauducq 2014-12-31 20:13:10 +08:00
parent 9cd89a0c50
commit d2a5dfa1ec
10 changed files with 114 additions and 93 deletions

View File

@ -30,7 +30,7 @@ class RemoteError(Exception):
class IncompatibleServer(Exception): class IncompatibleServer(Exception):
"""Raised by the client when attempting to connect to a server that does """Raised by the client when attempting to connect to a server that does
not have the expected type. not have the expected target.
""" """
pass pass
@ -62,22 +62,38 @@ class Client:
hostname or a IPv4 or IPv6 address (see hostname or a IPv4 or IPv6 address (see
``socket.create_connection`` in the Python standard library). ``socket.create_connection`` in the Python standard library).
:param port: TCP port to use. :param port: TCP port to use.
:param expected_id_type: Server type to expect. ``IncompatibleServer`` is :param target_name: Target name to select. ``IncompatibleServer`` is
raised when the types do not match. Use ``None`` to accept any server raised if the target does not exist.
type. Use ``None`` to skip selecting a target. The list of targets can then
be retrieved using ``get_rpc_id`` and then one can be selected later
using ``select_rpc_target``.
""" """
def __init__(self, host, port, expected_id_type): def __init__(self, host, port, target_name):
self.socket = socket.create_connection((host, port)) self._socket = socket.create_connection((host, port))
self.socket.sendall(_init_string) self._socket.sendall(_init_string)
self._identify(expected_id_type)
def get_rpc_id(self): server_identification = self._recv()
"""Returns a dictionary containing the identification information of self._target_names = server_identification["targets"]
the server. self._id_parameters = server_identification["parameters"]
if target_name is not None:
self.select_rpc_target(target_name)
def select_rpc_target(self, target_name):
"""Selects a RPC target by name. This function should be called
exactly once if the object was created with ``target_name=None``.
""" """
return self._server_identification if target_name not in self._target_names:
raise IncompatibleServer
self._socket.sendall((target_name + "\n").encode())
def get_rpc_id(self):
"""Returns a tuple (target_names, id_parameters) containing the
identification information of the server.
"""
return (self._target_names, self._id_parameters)
def close_rpc(self): def close_rpc(self):
"""Closes the connection to the RPC server. """Closes the connection to the RPC server.
@ -85,15 +101,16 @@ class Client:
No further method calls should be done after this method is called. No further method calls should be done after this method is called.
""" """
self.socket.close() self._socket.close()
def _send_recv(self, obj): def _send(self, obj):
line = pyon.encode(obj) + "\n" line = pyon.encode(obj) + "\n"
self.socket.sendall(line.encode()) self._socket.sendall(line.encode())
buf = self.socket.recv(4096).decode() def _recv(self):
buf = self._socket.recv(4096).decode()
while "\n" not in buf: while "\n" not in buf:
more = self.socket.recv(4096) more = self._socket.recv(4096)
if not more: if not more:
break break
buf += more.decode() buf += more.decode()
@ -101,20 +118,15 @@ class Client:
return obj return obj
def _identify(self, expected_id_type):
obj = {"action": "identify"}
self._server_identification = self._send_recv(obj)
if (expected_id_type is not None
and self._server_identification["type"] != expected_id_type):
raise IncompatibleServer
def _do_rpc(self, name, args, kwargs): def _do_rpc(self, name, args, kwargs):
obj = {"action": "call", "name": name, "args": args, "kwargs": kwargs} obj = {"action": "call", "name": name, "args": args, "kwargs": kwargs}
obj = self._send_recv(obj) self._send(obj)
if obj["result"] == "ok":
obj = self._recv()
if obj["status"] == "ok":
return obj["ret"] return obj["ret"]
elif obj["result"] == "error": elif obj["status"] == "failed":
raise RemoteError(obj["message"] + "\n" + obj["traceback"]) raise RemoteError(obj["message"])
else: else:
raise ValueError raise ValueError
@ -134,18 +146,16 @@ class Server(AsyncioServer):
simple cases: it allows new connections to be be accepted even when the simple cases: it allows new connections to be be accepted even when the
previous client failed to properly shut down its connection. previous client failed to properly shut down its connection.
:param target: Object providing the RPC methods to be exposed to the :param targets: A dictionary of objects providing the RPC methods to be
client. exposed to the client. Keys are names identifying each object.
:param id_type: A string identifying the server type. Clients use it to Clients select one of these objects using its name upon connection.
verify that they are connected to the proper server.
:param id_parameters: An optional human-readable string giving more :param id_parameters: An optional human-readable string giving more
information about the parameters of the server. information about the parameters of the server.
""" """
def __init__(self, target, id_type, id_parameters=None): def __init__(self, targets, id_parameters=None):
AsyncioServer.__init__(self) AsyncioServer.__init__(self)
self.target = target self.targets = targets
self.id_type = id_type
self.id_parameters = id_parameters self.id_parameters = id_parameters
@asyncio.coroutine @asyncio.coroutine
@ -154,34 +164,41 @@ class Server(AsyncioServer):
line = yield from reader.readline() line = yield from reader.readline()
if line != _init_string: if line != _init_string:
return return
obj = {
"targets": sorted(self.targets.keys()),
"parameters": self.id_parameters
}
line = pyon.encode(obj) + "\n"
writer.write(line.encode())
line = yield from reader.readline()
if not line:
return
target_name = line.decode()[:-1]
try:
target = self.targets[target_name]
except KeyError:
return
while True: while True:
line = yield from reader.readline() line = yield from reader.readline()
if not line: if not line:
break break
obj = pyon.decode(line.decode()) obj = pyon.decode(line.decode())
action = obj["action"] try:
if action == "call": method = getattr(target, obj["name"])
try: ret = method(*obj["args"], **obj["kwargs"])
method = getattr(self.target, obj["name"]) obj = {"status": "ok", "ret": ret}
ret = method(*obj["args"], **obj["kwargs"]) except Exception:
obj = {"result": "ok", "ret": ret} obj = {"status": "failed",
except Exception as e: "message": traceback.format_exc()}
obj = {"result": "error", line = pyon.encode(obj) + "\n"
"message": type(e).__name__ + ": " + str(e), writer.write(line.encode())
"traceback": traceback.format_exc()}
line = pyon.encode(obj) + "\n"
writer.write(line.encode())
elif action == "identify":
obj = {"type": self.id_type}
if self.id_parameters is not None:
obj["parameters"] = self.id_parameters
line = pyon.encode(obj) + "\n"
writer.write(line.encode())
finally: finally:
writer.close() writer.close()
def simple_server_loop(target, id_type, host, port, id_parameters=None): def simple_server_loop(targets, host, port, id_parameters=None):
"""Runs a server until an exception is raised (e.g. the user hits Ctrl-C). """Runs a server until an exception is raised (e.g. the user hits Ctrl-C).
See ``Server`` for a description of the parameters. See ``Server`` for a description of the parameters.
@ -189,7 +206,7 @@ def simple_server_loop(target, id_type, host, port, id_parameters=None):
""" """
loop = asyncio.get_event_loop() loop = asyncio.get_event_loop()
try: try:
server = Server(target, id_type, id_parameters) server = Server(targets, id_parameters)
loop.run_until_complete(server.start(host, port)) loop.run_until_complete(server.start(host, port))
try: try:
loop.run_forever() loop.run_forever()

View File

@ -66,7 +66,7 @@ def run_server():
loop = asyncio.get_event_loop() loop = asyncio.get_event_loop()
try: try:
echo = Echo() echo = Echo()
server = pc_rpc.Server(echo, "test") server = pc_rpc.Server({"test": echo})
loop.run_until_complete(server.start(test_address, test_port)) loop.run_until_complete(server.start(test_address, test_port))
try: try:
loop.run_until_complete(echo.wait_quit()) loop.run_until_complete(echo.wait_quit())

View File

@ -30,12 +30,14 @@ Default TCP port list
When writing a new driver, choose a free TCP port and add it to this list. When writing a new driver, choose a free TCP port and add it to this list.
+-----------+--------------+ +--------------------------+--------------+
| Component | Default port | | Component | Default port |
+===========+==============+ +==========================+==============+
| Master | 8888 | | Master (notifications) | 8887 |
+-----------+--------------+ +--------------------------+--------------+
| PDQ2 | 8889 | | Master (control) | 8888 |
+-----------+--------------+ +--------------------------+--------------+
| LDA | 8890 | | PDQ2 | 8889 |
+-----------+--------------+ +--------------------------+--------------+
| LDA | 8890 |
+--------------------------+--------------+

View File

@ -23,7 +23,7 @@ To turn it into a server, we use :class:`artiq.management.pc_rpc`. Import the fu
and add a ``main`` function that is run when the program is executed: :: and add a ``main`` function that is run when the program is executed: ::
def main(): def main():
simple_server_loop(Hello(), "hello", "::1", 7777) simple_server_loop({"hello": Hello()}, "::1", 7777)
if __name__ == "__main__": if __name__ == "__main__":
main() main()
@ -49,10 +49,10 @@ and verify that you can connect to the TCP port: ::
:tip: Use the key combination Ctrl-AltGr-9 to get the ``telnet>`` prompt, and enter ``close`` to quit Telnet. Quit the controller with Ctrl-C. :tip: Use the key combination Ctrl-AltGr-9 to get the ``telnet>`` prompt, and enter ``close`` to quit Telnet. Quit the controller with Ctrl-C.
Also verify that you can get the type of the server (the "hello" string passed to ``simple_server_loop``) using the ``artiq_ctlid.py`` program from the ARTIQ front-end tools: :: Also verify that a target (service) named "hello" (as passed in the first argument to ``simple_server_loop``) exists using the ``artiq_ctlid.py`` program from the ARTIQ front-end tools: ::
$ artiq_ctlid.py ::1 7777 $ artiq_ctlid.py ::1 7777
Type: hello Target(s): hello
The client The client
---------- ----------

View File

@ -144,7 +144,7 @@ def main():
_run_subscriber(args.server, args.port, subscriber) _run_subscriber(args.server, args.port, subscriber)
else: else:
port = 8888 if args.port is None else args.port port = 8888 if args.port is None else args.port
remote = Client(args.server, port, "schedule_control") remote = Client(args.server, port, "master_schedule")
try: try:
globals()["_action_" + args.action](remote, args) globals()["_action_" + args.action](remote, args)
finally: finally:

View File

@ -19,12 +19,12 @@ def main():
args = _get_args() args = _get_args()
remote = Client(args.server, args.port, None) remote = Client(args.server, args.port, None)
try: try:
ident = remote.get_rpc_id() target_names, id_parameters = remote.get_rpc_id()
finally: finally:
remote.close_rpc() remote.close_rpc()
print("Type: " + ident["type"]) print("Target(s): " + ", ".join(target_names))
if "parameters" in ident: if id_parameters is not None:
print("Parameters: " + ident["parameters"]) print("Parameters: " + id_parameters)
if __name__ == "__main__": if __name__ == "__main__":
main() main()

View File

@ -16,11 +16,11 @@ def _get_args():
"-s", "--server", default="::1", "-s", "--server", default="::1",
help="hostname or IP of the master to connect to") help="hostname or IP of the master to connect to")
parser.add_argument( parser.add_argument(
"--port-schedule-control", default=8888, type=int, "--port-notify", default=8887, type=int,
help="TCP port to connect to for schedule control") help="TCP port to connect to for notifications")
parser.add_argument( parser.add_argument(
"--port-schedule-notify", default=8887, type=int, "--port-control", default=8888, type=int,
help="TCP port to connect to for schedule notifications") help="TCP port to connect to for control")
return parser.parse_args() return parser.parse_args()
@ -39,7 +39,7 @@ def main():
parameters_win.show_all() parameters_win.show_all()
loop.run_until_complete(scheduler_win.sub_connect( loop.run_until_complete(scheduler_win.sub_connect(
args.server, args.port_schedule_notify)) args.server, args.port_notify))
try: try:
loop.run_forever() loop.run_forever()
finally: finally:

View File

@ -16,11 +16,11 @@ def _get_args():
"--bind", default="::1", "--bind", default="::1",
help="hostname or IP address to bind to") help="hostname or IP address to bind to")
parser.add_argument( parser.add_argument(
"--port-schedule-control", default=8888, type=int, "--port-notify", default=8887, type=int,
help="TCP port to listen to for schedule control") help="TCP port to listen to for notifications")
parser.add_argument( parser.add_argument(
"--port-schedule-notify", default=8887, type=int, "--port-control", default=8888, type=int,
help="TCP port to listen to for schedule notifications") help="TCP port to listen to for control")
return parser.parse_args() return parser.parse_args()
@ -38,18 +38,20 @@ def main():
loop.run_until_complete(scheduler.start()) loop.run_until_complete(scheduler.start())
atexit.register(lambda: loop.run_until_complete(scheduler.stop())) atexit.register(lambda: loop.run_until_complete(scheduler.stop()))
schedule_control = Server(scheduler, "schedule_control") server_control = Server({
loop.run_until_complete(schedule_control.start( "master_schedule": scheduler
args.bind, args.port_schedule_control)) })
atexit.register(lambda: loop.run_until_complete(schedule_control.stop())) loop.run_until_complete(server_control.start(
args.bind, args.port_control))
atexit.register(lambda: loop.run_until_complete(server_control.stop()))
schedule_notify = Publisher({ server_notify = Publisher({
"queue": scheduler.queue, "queue": scheduler.queue,
"periodic": scheduler.periodic "periodic": scheduler.periodic
}) })
loop.run_until_complete(schedule_notify.start( loop.run_until_complete(server_notify.start(
args.bind, args.port_schedule_notify)) args.bind, args.port_notify))
atexit.register(lambda: loop.run_until_complete(schedule_notify.stop())) atexit.register(lambda: loop.run_until_complete(server_notify.stop()))
loop.run_forever() loop.run_forever()

View File

@ -23,7 +23,7 @@ def main():
else: else:
lda = Lda(args.serial, args.device) lda = Lda(args.serial, args.device)
simple_server_loop(lda, "lda", simple_server_loop({"lda": lda},
args.bind, args.port) args.bind, args.port)
if __name__ == "__main__": if __name__ == "__main__":

View File

@ -32,8 +32,8 @@ def main():
dev = Pdq2(serial=args.serial) dev = Pdq2(serial=args.serial)
try: try:
simple_server_loop(dev, "pdq2", args.bind, args.port, simple_server_loop({"pdq2": dev}, args.bind, args.port,
id_parameters="serial="+str(args.serial)) id_parameters="serial=" + str(args.serial))
finally: finally:
dev.close() dev.close()