waveform: add proxy clients

This commit is contained in:
Simon Renblad 2024-01-17 15:55:55 +08:00
parent 9088ffa2ca
commit d6ae173339

View File

@ -6,6 +6,7 @@ from PyQt5 import QtCore, QtWidgets, QtGui
from PyQt5.QtCore import Qt
from sipyco.sync_struct import Subscriber
from sipyco.pc_rpc import AsyncioClient
from artiq.tools import exc_to_warning
from artiq.coredevice import comm_analyzer
@ -17,6 +18,87 @@ from artiq.gui.models import DictSyncTreeSepModel, LocalModelManager
logger = logging.getLogger(__name__)
class _BaseProxyClient:
def __init__(self):
self.addr = None
self.port = None
self._reconnect_event = asyncio.Event()
self._reconnect_task = None
async def start(self):
self._reconnect_task = asyncio.ensure_future(
exc_to_warning(self._reconnect()))
def update_address(self, addr, port):
self.addr = addr
self.port = port
self._reconnect_event.set()
async def _reconnect(self):
try:
while True:
await self._reconnect_event.wait()
self._reconnect_event.clear()
try:
await self.disconnect_cr()
except:
logger.error("Error caught when disconnecting proxy client.", exc_info=True)
try:
await self.reconnect_cr()
except Exception:
logger.error(
"Error caught when reconnecting proxy client. Retrying...", exc_info=True)
await asyncio.sleep(5)
self._reconnect_event.set()
except asyncio.CancelledError:
pass
async def close(self):
try:
self._reconnect_task.cancel()
await asyncio.wait_for(self._reconnect_task, None)
await self.disconnect_cr()
except:
logger.error("Error caught while closing proxy client.", exc_info=True)
async def reconnect_cr(self):
raise NotImplementedError
async def disconnect_cr(self):
raise NotImplementedError
class RPCProxyClient(_BaseProxyClient):
def __init__(self):
_BaseProxyClient.__init__(self)
self.client = AsyncioClient()
async def trigger_proxy_task(self):
if self.client.get_rpc_id()[0] is None:
raise AttributeError("Unable to identify RPC target. Is analyzer proxy connected?")
await self.client.trigger()
async def reconnect_cr(self):
await self.client.connect_rpc(self.addr,
self.port,
"coreanalyzer_proxy_control")
async def disconnect_cr(self):
self.client.close_rpc()
class ReceiverProxyClient(_BaseProxyClient):
def __init__(self, receiver):
_BaseProxyClient.__init__(self)
self.receiver = receiver
async def reconnect_cr(self):
await self.receiver.connect(self.addr, self.port)
async def disconnect_cr(self):
await self.receiver.close()
class Model(DictSyncTreeSepModel):
def __init__(self, init):
DictSyncTreeSepModel.__init__(self, "/", ["Channels"], init)
@ -51,10 +133,11 @@ class WaveformDock(QtWidgets.QDockWidget):
self._current_dir = os.getcwd()
devices_sub = Subscriber("devices", self.init_ddb, self.update_ddb)
proxy_receiver = comm_analyzer.AnalyzerProxyReceiver(
self.devices_sub = Subscriber("devices", self.init_ddb, self.update_ddb)
self.rpc_client = RPCProxyClient()
receiver = comm_analyzer.AnalyzerProxyReceiver(
self.on_dump_receive)
self.receiver_client = ReceiverProxyClient(receiver)
grid = LayoutWidget()
self.setWidget(grid)
@ -70,6 +153,8 @@ class WaveformDock(QtWidgets.QDockWidget):
self._request_dump_btn.setIcon(
QtWidgets.QApplication.style().standardIcon(
QtWidgets.QStyle.SP_BrowserReload))
self._request_dump_btn.clicked.connect(
lambda: asyncio.ensure_future(exc_to_warning(self.rpc_client.trigger_proxy_task())))
grid.addWidget(self._request_dump_btn, 0, 1)
self._add_btn = QtWidgets.QToolButton()
@ -120,8 +205,10 @@ class WaveformDock(QtWidgets.QDockWidget):
desc = self._ddb.get("core_analyzer")
if desc is not None:
addr = desc["host"]
port = desc.get("port_proxy", 1385)
port_control = desc.get("port_proxy_control", 1386)
port_proxy = desc.get("port_proxy", 1385)
port = desc.get("port", 1386)
self.receiver_client.update_address(addr, port_proxy)
self.rpc_client.update_address(addr, port)
def init_ddb(self, ddb):
self._ddb = ddb