protocols/pc_rpc: add best effort client

This commit is contained in:
Sebastien Bourdeauducq 2015-02-12 00:14:20 +08:00
parent c91705c5d1
commit a7405dea1c

View File

@ -15,6 +15,8 @@ client's list.
import socket
import asyncio
import traceback
import threading
import time
from artiq.protocols import pyon
from artiq.protocols.asyncio_server import AsyncioServer as _AsyncioServer
@ -235,6 +237,101 @@ class AsyncioClient:
return proxy
class BestEffortClient:
def __init__(self, host, port, target_name,
firstcon_timeout=0.5, retry=5.0):
self.__host = host
self.__port = port
self.__target_name = target_name
self.__retry = retry
self.__conretry_terminate = False
self.__socket = None
try:
self.__coninit(firstcon_timeout)
except:
self.__start_conretry()
else:
self.__conretry_thread = None
def __coninit(self, timeout):
if timeout is None:
self.__socket = socket.create_connection(
(self.__host, self.__port))
else:
self.__socket = socket.create_connection(
(self.__host, self.__port), timeout)
self.__socket.sendall(_init_string)
server_identification = self.__recv()
if self.__target_name not in server_identification["targets"]:
raise IncompatibleServer
self.__socket.sendall((self.__target_name + "\n").encode())
def __start_conretry(self):
self.__conretry_thread = threading.Thread(target=self.__conretry)
self.__conretry_thread.start()
def __conretry(self):
while True:
try:
self.__coninit(None)
except:
if self.__conretry_terminate:
break
time.sleep(self.__retry)
else:
break
if self.__conretry_terminate and self.__socket is not None:
self.__socket.close()
# must be after __socket.close() to avoid race condition
self.__conretry_thread = None
def close_rpc(self):
if self.__conretry_thread is None:
if self.__socket is not None:
self.__socket.close()
else:
# Let the thread complete I/O and then do the socket closing.
# Python fails to provide a way to cancel threads...
self.__conretry_terminate = True
def __send(self, obj):
line = pyon.encode(obj) + "\n"
self.__socket.sendall(line.encode())
def __recv(self):
buf = self.__socket.recv(4096).decode()
while "\n" not in buf:
more = self.__socket.recv(4096)
if not more:
break
buf += more.decode()
return pyon.decode(buf)
def __do_rpc(self, name, args, kwargs):
if self.__conretry_thread is not None:
return None
obj = {"action": "call", "name": name, "args": args, "kwargs": kwargs}
try:
self.__send(obj)
obj = self.__recv()
except:
self.__start_conretry()
else:
if obj["status"] == "ok":
return obj["ret"]
elif obj["status"] == "failed":
raise RemoteError(obj["message"])
else:
raise ValueError
def __getattr__(self, name):
def proxy(*args, **kwargs):
return self.__do_rpc(name, args, kwargs)
return proxy
class Server(_AsyncioServer):
"""This class creates a TCP server that handles requests coming from
``Client`` objects.