diff --git a/artiq/management/pc_rpc.py b/artiq/management/pc_rpc.py index 27527c83a..cb7bff0ed 100644 --- a/artiq/management/pc_rpc.py +++ b/artiq/management/pc_rpc.py @@ -30,7 +30,7 @@ class RemoteError(Exception): class IncompatibleServer(Exception): """Raised by the client when attempting to connect to a server that does - not have the expected type. + not have the expected target. """ pass @@ -62,22 +62,38 @@ class Client: hostname or a IPv4 or IPv6 address (see ``socket.create_connection`` in the Python standard library). :param port: TCP port to use. - :param expected_id_type: Server type to expect. ``IncompatibleServer`` is - raised when the types do not match. Use ``None`` to accept any server - type. + :param target_name: Target name to select. ``IncompatibleServer`` is + raised if the target does not exist. + Use ``None`` to skip selecting a target. The list of targets can then + be retrieved using ``get_rpc_id`` and then one can be selected later + using ``select_rpc_target``. """ - def __init__(self, host, port, expected_id_type): - self.socket = socket.create_connection((host, port)) - self.socket.sendall(_init_string) - self._identify(expected_id_type) + def __init__(self, host, port, target_name): + self._socket = socket.create_connection((host, port)) + self._socket.sendall(_init_string) - def get_rpc_id(self): - """Returns a dictionary containing the identification information of - the server. + server_identification = self._recv() + self._target_names = server_identification["targets"] + self._id_parameters = server_identification["parameters"] + if target_name is not None: + self.select_rpc_target(target_name) + + def select_rpc_target(self, target_name): + """Selects a RPC target by name. This function should be called + exactly once if the object was created with ``target_name=None``. """ - return self._server_identification + if target_name not in self._target_names: + raise IncompatibleServer + self._socket.sendall((target_name + "\n").encode()) + + def get_rpc_id(self): + """Returns a tuple (target_names, id_parameters) containing the + identification information of the server. + + """ + return (self._target_names, self._id_parameters) def close_rpc(self): """Closes the connection to the RPC server. @@ -85,15 +101,16 @@ class Client: No further method calls should be done after this method is called. """ - self.socket.close() + self._socket.close() - def _send_recv(self, obj): + def _send(self, obj): line = pyon.encode(obj) + "\n" - self.socket.sendall(line.encode()) + self._socket.sendall(line.encode()) - buf = self.socket.recv(4096).decode() + def _recv(self): + buf = self._socket.recv(4096).decode() while "\n" not in buf: - more = self.socket.recv(4096) + more = self._socket.recv(4096) if not more: break buf += more.decode() @@ -101,20 +118,15 @@ class Client: return obj - def _identify(self, expected_id_type): - obj = {"action": "identify"} - self._server_identification = self._send_recv(obj) - if (expected_id_type is not None - and self._server_identification["type"] != expected_id_type): - raise IncompatibleServer - def _do_rpc(self, name, args, kwargs): obj = {"action": "call", "name": name, "args": args, "kwargs": kwargs} - obj = self._send_recv(obj) - if obj["result"] == "ok": + self._send(obj) + + obj = self._recv() + if obj["status"] == "ok": return obj["ret"] - elif obj["result"] == "error": - raise RemoteError(obj["message"] + "\n" + obj["traceback"]) + elif obj["status"] == "failed": + raise RemoteError(obj["message"]) else: raise ValueError @@ -134,18 +146,16 @@ class Server(AsyncioServer): simple cases: it allows new connections to be be accepted even when the previous client failed to properly shut down its connection. - :param target: Object providing the RPC methods to be exposed to the - client. - :param id_type: A string identifying the server type. Clients use it to - verify that they are connected to the proper server. + :param targets: A dictionary of objects providing the RPC methods to be + exposed to the client. Keys are names identifying each object. + Clients select one of these objects using its name upon connection. :param id_parameters: An optional human-readable string giving more information about the parameters of the server. """ - def __init__(self, target, id_type, id_parameters=None): + def __init__(self, targets, id_parameters=None): AsyncioServer.__init__(self) - self.target = target - self.id_type = id_type + self.targets = targets self.id_parameters = id_parameters @asyncio.coroutine @@ -154,34 +164,41 @@ class Server(AsyncioServer): line = yield from reader.readline() if line != _init_string: return + + obj = { + "targets": sorted(self.targets.keys()), + "parameters": self.id_parameters + } + line = pyon.encode(obj) + "\n" + writer.write(line.encode()) + line = yield from reader.readline() + if not line: + return + target_name = line.decode()[:-1] + try: + target = self.targets[target_name] + except KeyError: + return + while True: line = yield from reader.readline() if not line: break obj = pyon.decode(line.decode()) - action = obj["action"] - if action == "call": - try: - method = getattr(self.target, obj["name"]) - ret = method(*obj["args"], **obj["kwargs"]) - obj = {"result": "ok", "ret": ret} - except Exception as e: - obj = {"result": "error", - "message": type(e).__name__ + ": " + str(e), - "traceback": traceback.format_exc()} - line = pyon.encode(obj) + "\n" - writer.write(line.encode()) - elif action == "identify": - obj = {"type": self.id_type} - if self.id_parameters is not None: - obj["parameters"] = self.id_parameters - line = pyon.encode(obj) + "\n" - writer.write(line.encode()) + try: + method = getattr(target, obj["name"]) + ret = method(*obj["args"], **obj["kwargs"]) + obj = {"status": "ok", "ret": ret} + except Exception: + obj = {"status": "failed", + "message": traceback.format_exc()} + line = pyon.encode(obj) + "\n" + writer.write(line.encode()) finally: writer.close() -def simple_server_loop(target, id_type, host, port, id_parameters=None): +def simple_server_loop(targets, host, port, id_parameters=None): """Runs a server until an exception is raised (e.g. the user hits Ctrl-C). See ``Server`` for a description of the parameters. @@ -189,7 +206,7 @@ def simple_server_loop(target, id_type, host, port, id_parameters=None): """ loop = asyncio.get_event_loop() try: - server = Server(target, id_type, id_parameters) + server = Server(targets, id_parameters) loop.run_until_complete(server.start(host, port)) try: loop.run_forever() diff --git a/artiq/test/pc_rpc.py b/artiq/test/pc_rpc.py index 2528954ab..63f7b8263 100644 --- a/artiq/test/pc_rpc.py +++ b/artiq/test/pc_rpc.py @@ -66,7 +66,7 @@ def run_server(): loop = asyncio.get_event_loop() try: echo = Echo() - server = pc_rpc.Server(echo, "test") + server = pc_rpc.Server({"test": echo}) loop.run_until_complete(server.start(test_address, test_port)) try: loop.run_until_complete(echo.wait_quit()) diff --git a/doc/manual/drivers_reference.rst b/doc/manual/drivers_reference.rst index 3f2bc8d5a..06cd0cf6f 100644 --- a/doc/manual/drivers_reference.rst +++ b/doc/manual/drivers_reference.rst @@ -30,12 +30,14 @@ Default TCP port list When writing a new driver, choose a free TCP port and add it to this list. -+-----------+--------------+ -| Component | Default port | -+===========+==============+ -| Master | 8888 | -+-----------+--------------+ -| PDQ2 | 8889 | -+-----------+--------------+ -| LDA | 8890 | -+-----------+--------------+ ++--------------------------+--------------+ +| Component | Default port | ++==========================+==============+ +| Master (notifications) | 8887 | ++--------------------------+--------------+ +| Master (control) | 8888 | ++--------------------------+--------------+ +| PDQ2 | 8889 | ++--------------------------+--------------+ +| LDA | 8890 | ++--------------------------+--------------+ diff --git a/doc/manual/writing_a_driver.rst b/doc/manual/writing_a_driver.rst index 432eabc1d..3fa1e6c43 100644 --- a/doc/manual/writing_a_driver.rst +++ b/doc/manual/writing_a_driver.rst @@ -23,7 +23,7 @@ To turn it into a server, we use :class:`artiq.management.pc_rpc`. Import the fu and add a ``main`` function that is run when the program is executed: :: def main(): - simple_server_loop(Hello(), "hello", "::1", 7777) + simple_server_loop({"hello": Hello()}, "::1", 7777) if __name__ == "__main__": main() @@ -49,10 +49,10 @@ and verify that you can connect to the TCP port: :: :tip: Use the key combination Ctrl-AltGr-9 to get the ``telnet>`` prompt, and enter ``close`` to quit Telnet. Quit the controller with Ctrl-C. -Also verify that you can get the type of the server (the "hello" string passed to ``simple_server_loop``) using the ``artiq_ctlid.py`` program from the ARTIQ front-end tools: :: +Also verify that a target (service) named "hello" (as passed in the first argument to ``simple_server_loop``) exists using the ``artiq_ctlid.py`` program from the ARTIQ front-end tools: :: $ artiq_ctlid.py ::1 7777 - Type: hello + Target(s): hello The client ---------- diff --git a/frontend/artiq_client.py b/frontend/artiq_client.py index d2056be40..142a93c52 100755 --- a/frontend/artiq_client.py +++ b/frontend/artiq_client.py @@ -144,7 +144,7 @@ def main(): _run_subscriber(args.server, args.port, subscriber) else: port = 8888 if args.port is None else args.port - remote = Client(args.server, port, "schedule_control") + remote = Client(args.server, port, "master_schedule") try: globals()["_action_" + args.action](remote, args) finally: diff --git a/frontend/artiq_ctlid.py b/frontend/artiq_ctlid.py index 0b4df1d07..d850fd69e 100755 --- a/frontend/artiq_ctlid.py +++ b/frontend/artiq_ctlid.py @@ -19,12 +19,12 @@ def main(): args = _get_args() remote = Client(args.server, args.port, None) try: - ident = remote.get_rpc_id() + target_names, id_parameters = remote.get_rpc_id() finally: remote.close_rpc() - print("Type: " + ident["type"]) - if "parameters" in ident: - print("Parameters: " + ident["parameters"]) + print("Target(s): " + ", ".join(target_names)) + if id_parameters is not None: + print("Parameters: " + id_parameters) if __name__ == "__main__": main() diff --git a/frontend/artiq_gui.py b/frontend/artiq_gui.py index 26e50faad..b303f1381 100755 --- a/frontend/artiq_gui.py +++ b/frontend/artiq_gui.py @@ -16,11 +16,11 @@ def _get_args(): "-s", "--server", default="::1", help="hostname or IP of the master to connect to") parser.add_argument( - "--port-schedule-control", default=8888, type=int, - help="TCP port to connect to for schedule control") + "--port-notify", default=8887, type=int, + help="TCP port to connect to for notifications") parser.add_argument( - "--port-schedule-notify", default=8887, type=int, - help="TCP port to connect to for schedule notifications") + "--port-control", default=8888, type=int, + help="TCP port to connect to for control") return parser.parse_args() @@ -39,7 +39,7 @@ def main(): parameters_win.show_all() loop.run_until_complete(scheduler_win.sub_connect( - args.server, args.port_schedule_notify)) + args.server, args.port_notify)) try: loop.run_forever() finally: diff --git a/frontend/artiq_master.py b/frontend/artiq_master.py index 560c2c7c4..32aab7ea7 100755 --- a/frontend/artiq_master.py +++ b/frontend/artiq_master.py @@ -16,11 +16,11 @@ def _get_args(): "--bind", default="::1", help="hostname or IP address to bind to") parser.add_argument( - "--port-schedule-control", default=8888, type=int, - help="TCP port to listen to for schedule control") + "--port-notify", default=8887, type=int, + help="TCP port to listen to for notifications") parser.add_argument( - "--port-schedule-notify", default=8887, type=int, - help="TCP port to listen to for schedule notifications") + "--port-control", default=8888, type=int, + help="TCP port to listen to for control") return parser.parse_args() @@ -38,18 +38,20 @@ def main(): loop.run_until_complete(scheduler.start()) atexit.register(lambda: loop.run_until_complete(scheduler.stop())) - schedule_control = Server(scheduler, "schedule_control") - loop.run_until_complete(schedule_control.start( - args.bind, args.port_schedule_control)) - atexit.register(lambda: loop.run_until_complete(schedule_control.stop())) + server_control = Server({ + "master_schedule": scheduler + }) + loop.run_until_complete(server_control.start( + args.bind, args.port_control)) + atexit.register(lambda: loop.run_until_complete(server_control.stop())) - schedule_notify = Publisher({ + server_notify = Publisher({ "queue": scheduler.queue, "periodic": scheduler.periodic }) - loop.run_until_complete(schedule_notify.start( - args.bind, args.port_schedule_notify)) - atexit.register(lambda: loop.run_until_complete(schedule_notify.stop())) + loop.run_until_complete(server_notify.start( + args.bind, args.port_notify)) + atexit.register(lambda: loop.run_until_complete(server_notify.stop())) loop.run_forever() diff --git a/frontend/lda_controller.py b/frontend/lda_controller.py index 48a8af568..dfc76a7ce 100755 --- a/frontend/lda_controller.py +++ b/frontend/lda_controller.py @@ -23,7 +23,7 @@ def main(): else: lda = Lda(args.serial, args.device) - simple_server_loop(lda, "lda", + simple_server_loop({"lda": lda}, args.bind, args.port) if __name__ == "__main__": diff --git a/frontend/pdq2_controller.py b/frontend/pdq2_controller.py index 340d10452..dc8e993a9 100755 --- a/frontend/pdq2_controller.py +++ b/frontend/pdq2_controller.py @@ -32,8 +32,8 @@ def main(): dev = Pdq2(serial=args.serial) try: - simple_server_loop(dev, "pdq2", args.bind, args.port, - id_parameters="serial="+str(args.serial)) + simple_server_loop({"pdq2": dev}, args.bind, args.port, + id_parameters="serial=" + str(args.serial)) finally: dev.close()