forked from M-Labs/artiq
1
0
Fork 0

pc_rpc: document

This commit is contained in:
Sebastien Bourdeauducq 2014-10-27 13:50:32 +08:00
parent 27fc19e415
commit 934442bd07
2 changed files with 103 additions and 12 deletions

View File

@ -1,3 +1,11 @@
"""
This module provides a remote procedure call (RPC) mechanism over sockets
between conventional computers (PCs) running Python. It strives to be
transparent and uses ``artiq.management.pyon`` internally so that e.g. Numpy
arrays can be easily used.
"""
import socket import socket
import asyncio import asyncio
import traceback import traceback
@ -6,17 +14,50 @@ from artiq.management import pyon
class RemoteError(Exception): class RemoteError(Exception):
"""Exception raised when a RPC failed or raised an exception on the
remote (server) side.
"""
pass pass
class Client: class Client:
"""This class proxies the methods available on the server so that they
can be used as if they were local methods.
For example, if the server provides method ``foo``, and ``c`` is a local
``Client`` object, then the method can be called as: ::
result = c.foo(param1, param2)
The parameters and the result are automatically transferred with the
server.
Only methods are supported. Attributes must be accessed by providing and
using "get" and/or "set" methods on the server side.
At object initialization, the connection to the remote server is
automatically attempted. The user must call ``close_rpc`` to
free resources properly after initialization completes successfully.
:param host: Identifier of the server. The string can represent a
hostname or a IPv4 or IPv6 address (see
``socket.create_connection`` in the Python standard library).
:param port: TCP port to use.
"""
def __init__(self, host, port): def __init__(self, host, port):
self.socket = socket.create_connection((host, port)) self.socket = socket.create_connection((host, port))
def close_rpc(self): def close_rpc(self):
"""Closes the connection to the RPC server.
No further method calls should be done after this method is called.
"""
self.socket.close() self.socket.close()
def do_rpc(self, name, args, kwargs): def _do_rpc(self, name, args, kwargs):
obj = {"action": "call", "name": name, "args": args, "kwargs": kwargs} obj = {"action": "call", "name": name, "args": args, "kwargs": kwargs}
line = pyon.encode(obj) + "\n" line = pyon.encode(obj) + "\n"
self.socket.sendall(line.encode()) self.socket.sendall(line.encode())
@ -37,38 +78,66 @@ class Client:
def __getattr__(self, name): def __getattr__(self, name):
def proxy(*args, **kwargs): def proxy(*args, **kwargs):
return self.do_rpc(name, args, kwargs) return self._do_rpc(name, args, kwargs)
return proxy return proxy
class Server: class Server:
"""This class creates a TCP server that handles requests coming from
``Client`` objects.
The server is designed using ``asyncio`` so that it can easily support
multiple connections without the locking issues that arise in
multi-threaded applications. Multiple connection support is useful even in
simple cases: it allows new connections to be be accepted even when the
previous client failed to properly shut down its connection.
:param target: Object providing the RPC methods to be exposed to the
client.
"""
def __init__(self, target): def __init__(self, target):
self.target = target self.target = target
self.client_tasks = set() self._client_tasks = set()
@asyncio.coroutine @asyncio.coroutine
def start(self, host, port): def start(self, host, port):
self.server = yield from asyncio.start_server(self.handle_connection, """Starts the server.
The user must call ``stop`` to free resources properly after this
method completes successfully.
This method is a `coroutine`.
:param host: Bind address of the server (see ``asyncio.start_server``
from the Python standard library).
:param port: TCP port to bind to.
"""
self.server = yield from asyncio.start_server(self._handle_connection,
host, port) host, port)
@asyncio.coroutine @asyncio.coroutine
def stop(self): def stop(self):
for task in self.client_tasks: """Stops the server.
"""
for task in self._client_tasks:
task.cancel() task.cancel()
self.server.close() self.server.close()
yield from self.server.wait_closed() yield from self.server.wait_closed()
del self.server del self.server
def client_done(self, task): def _client_done(self, task):
self.client_tasks.remove(task) self._client_tasks.remove(task)
def handle_connection(self, reader, writer): def _handle_connection(self, reader, writer):
task = asyncio.Task(self.handle_connection_task(reader, writer)) task = asyncio.Task(self._handle_connection_task(reader, writer))
self.client_tasks.add(task) self._client_tasks.add(task)
task.add_done_callback(self.client_done) task.add_done_callback(self._client_done)
@asyncio.coroutine @asyncio.coroutine
def handle_connection_task(self, reader, writer): def _handle_connection_task(self, reader, writer):
try: try:
while True: while True:
line = yield from reader.readline() line = yield from reader.readline()
@ -92,12 +161,28 @@ class Server:
class WaitQuit: class WaitQuit:
"""Provides facilities to handle the termination of servers.
Server targets typically inherit from this class, with the method ``quit``
called via RPC.
"""
def __init__(self): def __init__(self):
self.terminate_notify = asyncio.Semaphore(0) self.terminate_notify = asyncio.Semaphore(0)
@asyncio.coroutine @asyncio.coroutine
def wait_quit(self): def wait_quit(self):
"""Waits until the `quit` method is called. This is typically used to
keep the `asyncio` loop running until the server is requested to
terminate.
This method is a `coroutine`.
"""
yield from self.terminate_notify.acquire() yield from self.terminate_notify.acquire()
def quit(self): def quit(self):
"""Quits the server.
"""
self.terminate_notify.release() self.terminate_notify.release()

View File

@ -6,3 +6,9 @@ Management reference
.. automodule:: artiq.management.pyon .. automodule:: artiq.management.pyon
:members: :members:
:mod:`artiq.management.pc_rpc` module
-------------------------------------
.. automodule:: artiq.management.pc_rpc
:members: