2
0
mirror of https://github.com/m-labs/artiq.git synced 2025-01-24 09:28:13 +08:00
artiq/artiq/management/pc_rpc.py

182 lines
5.7 KiB
Python
Raw Normal View History

2014-10-27 13:50:32 +08:00
"""
This module provides a remote procedure call (RPC) mechanism over sockets
between conventional computers (PCs) running Python. It strives to be
transparent and uses ``artiq.management.pyon`` internally so that e.g. Numpy
arrays can be easily used.
"""
2014-10-23 18:47:26 +08:00
import socket
import asyncio
2014-10-25 11:31:54 +08:00
import traceback
2014-10-23 18:47:26 +08:00
2014-10-25 11:35:21 +08:00
from artiq.management import pyon
2014-10-23 18:47:26 +08:00
class RemoteError(Exception):
2014-10-27 13:50:32 +08:00
"""Exception raised when a RPC failed or raised an exception on the
remote (server) side.
"""
2014-10-23 18:47:26 +08:00
pass
class Client:
2014-10-27 13:50:32 +08:00
"""This class proxies the methods available on the server so that they
can be used as if they were local methods.
For example, if the server provides method ``foo``, and ``c`` is a local
``Client`` object, then the method can be called as: ::
result = c.foo(param1, param2)
The parameters and the result are automatically transferred with the
server.
Only methods are supported. Attributes must be accessed by providing and
using "get" and/or "set" methods on the server side.
At object initialization, the connection to the remote server is
automatically attempted. The user must call ``close_rpc`` to
free resources properly after initialization completes successfully.
:param host: Identifier of the server. The string can represent a
hostname or a IPv4 or IPv6 address (see
``socket.create_connection`` in the Python standard library).
:param port: TCP port to use.
"""
2014-10-23 18:47:26 +08:00
def __init__(self, host, port):
self.socket = socket.create_connection((host, port))
def close_rpc(self):
2014-10-27 13:50:32 +08:00
"""Closes the connection to the RPC server.
No further method calls should be done after this method is called.
"""
2014-10-23 18:47:26 +08:00
self.socket.close()
2014-10-27 13:50:32 +08:00
def _do_rpc(self, name, args, kwargs):
2014-10-23 18:47:26 +08:00
obj = {"action": "call", "name": name, "args": args, "kwargs": kwargs}
2014-10-25 11:35:21 +08:00
line = pyon.encode(obj) + "\n"
2014-10-23 18:47:26 +08:00
self.socket.sendall(line.encode())
buf = self.socket.recv(4096).decode()
while "\n" not in buf:
more = self.socket.recv(4096)
if not more:
break
buf += more.decode()
2014-10-25 11:35:21 +08:00
obj = pyon.decode(buf)
2014-10-23 18:47:26 +08:00
if obj["result"] == "ok":
return obj["ret"]
elif obj["result"] == "error":
2014-10-25 11:31:54 +08:00
raise RemoteError(obj["message"] + "\n" + obj["traceback"])
2014-10-23 18:47:26 +08:00
else:
raise ValueError
def __getattr__(self, name):
def proxy(*args, **kwargs):
2014-10-27 13:50:32 +08:00
return self._do_rpc(name, args, kwargs)
2014-10-23 18:47:26 +08:00
return proxy
class Server:
2014-10-27 13:50:32 +08:00
"""This class creates a TCP server that handles requests coming from
``Client`` objects.
The server is designed using ``asyncio`` so that it can easily support
multiple connections without the locking issues that arise in
multi-threaded applications. Multiple connection support is useful even in
simple cases: it allows new connections to be be accepted even when the
previous client failed to properly shut down its connection.
:param target: Object providing the RPC methods to be exposed to the
client.
"""
2014-10-23 18:47:26 +08:00
def __init__(self, target):
self.target = target
2014-10-27 13:50:32 +08:00
self._client_tasks = set()
2014-10-23 18:47:26 +08:00
@asyncio.coroutine
def start(self, host, port):
2014-10-27 13:50:32 +08:00
"""Starts the server.
The user must call ``stop`` to free resources properly after this
method completes successfully.
This method is a `coroutine`.
:param host: Bind address of the server (see ``asyncio.start_server``
from the Python standard library).
:param port: TCP port to bind to.
"""
self.server = yield from asyncio.start_server(self._handle_connection,
2014-10-23 18:47:26 +08:00
host, port)
@asyncio.coroutine
def stop(self):
2014-10-27 13:50:32 +08:00
"""Stops the server.
"""
for task in self._client_tasks:
2014-10-23 18:47:26 +08:00
task.cancel()
self.server.close()
yield from self.server.wait_closed()
del self.server
2014-10-27 13:50:32 +08:00
def _client_done(self, task):
self._client_tasks.remove(task)
2014-10-23 18:47:26 +08:00
2014-10-27 13:50:32 +08:00
def _handle_connection(self, reader, writer):
task = asyncio.Task(self._handle_connection_task(reader, writer))
self._client_tasks.add(task)
task.add_done_callback(self._client_done)
2014-10-23 18:47:26 +08:00
@asyncio.coroutine
2014-10-27 13:50:32 +08:00
def _handle_connection_task(self, reader, writer):
2014-10-23 18:47:26 +08:00
try:
while True:
line = yield from reader.readline()
if not line:
break
2014-10-25 11:35:21 +08:00
obj = pyon.decode(line.decode())
2014-10-23 18:47:26 +08:00
action = obj["action"]
if action == "call":
try:
method = getattr(self.target, obj["name"])
2014-10-23 18:47:26 +08:00
ret = method(*obj["args"], **obj["kwargs"])
obj = {"result": "ok", "ret": ret}
except Exception as e:
obj = {"result": "error",
2014-10-25 11:31:54 +08:00
"message": type(e).__name__ + ": " + str(e),
"traceback": traceback.format_exc()}
line = pyon.encode(obj) + "\n"
2014-10-23 18:47:26 +08:00
writer.write(line.encode())
finally:
writer.close()
2014-10-27 20:37:37 +08:00
def simple_server_loop(target, host, port):
"""Runs a server until an exception is raised (e.g. the user hits Ctrl-C).
:param target: Object providing the RPC methods to be exposed to the
client.
:param host: Bind address of the server.
:param port: TCP port to bind to.
"""
loop = asyncio.get_event_loop()
try:
server = Server(target)
loop.run_until_complete(server.start(host, port))
try:
loop.run_forever()
finally:
loop.run_until_complete(server.stop())
finally:
loop.close()