From a7405dea1c4851ae4e2889f8817ec0bbbc788254 Mon Sep 17 00:00:00 2001 From: Sebastien Bourdeauducq Date: Thu, 12 Feb 2015 00:14:20 +0800 Subject: [PATCH] protocols/pc_rpc: add best effort client --- artiq/protocols/pc_rpc.py | 97 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 97 insertions(+) diff --git a/artiq/protocols/pc_rpc.py b/artiq/protocols/pc_rpc.py index 4f852518a..330ab7972 100644 --- a/artiq/protocols/pc_rpc.py +++ b/artiq/protocols/pc_rpc.py @@ -15,6 +15,8 @@ client's list. import socket import asyncio import traceback +import threading +import time from artiq.protocols import pyon from artiq.protocols.asyncio_server import AsyncioServer as _AsyncioServer @@ -235,6 +237,101 @@ class AsyncioClient: return proxy +class BestEffortClient: + def __init__(self, host, port, target_name, + firstcon_timeout=0.5, retry=5.0): + self.__host = host + self.__port = port + self.__target_name = target_name + self.__retry = retry + + self.__conretry_terminate = False + self.__socket = None + try: + self.__coninit(firstcon_timeout) + except: + self.__start_conretry() + else: + self.__conretry_thread = None + + def __coninit(self, timeout): + if timeout is None: + self.__socket = socket.create_connection( + (self.__host, self.__port)) + else: + self.__socket = socket.create_connection( + (self.__host, self.__port), timeout) + self.__socket.sendall(_init_string) + server_identification = self.__recv() + if self.__target_name not in server_identification["targets"]: + raise IncompatibleServer + self.__socket.sendall((self.__target_name + "\n").encode()) + + def __start_conretry(self): + self.__conretry_thread = threading.Thread(target=self.__conretry) + self.__conretry_thread.start() + + def __conretry(self): + while True: + try: + self.__coninit(None) + except: + if self.__conretry_terminate: + break + time.sleep(self.__retry) + else: + break + if self.__conretry_terminate and self.__socket is not None: + self.__socket.close() + # must be after __socket.close() to avoid race condition + self.__conretry_thread = None + + def close_rpc(self): + if self.__conretry_thread is None: + if self.__socket is not None: + self.__socket.close() + else: + # Let the thread complete I/O and then do the socket closing. + # Python fails to provide a way to cancel threads... + self.__conretry_terminate = True + + def __send(self, obj): + line = pyon.encode(obj) + "\n" + self.__socket.sendall(line.encode()) + + def __recv(self): + buf = self.__socket.recv(4096).decode() + while "\n" not in buf: + more = self.__socket.recv(4096) + if not more: + break + buf += more.decode() + return pyon.decode(buf) + + def __do_rpc(self, name, args, kwargs): + if self.__conretry_thread is not None: + return None + + obj = {"action": "call", "name": name, "args": args, "kwargs": kwargs} + try: + self.__send(obj) + obj = self.__recv() + except: + self.__start_conretry() + else: + if obj["status"] == "ok": + return obj["ret"] + elif obj["status"] == "failed": + raise RemoteError(obj["message"]) + else: + raise ValueError + + def __getattr__(self, name): + def proxy(*args, **kwargs): + return self.__do_rpc(name, args, kwargs) + return proxy + + class Server(_AsyncioServer): """This class creates a TCP server that handles requests coming from ``Client`` objects.