From d6ae1733398aa52da2f5e113660c1d256e3be08f Mon Sep 17 00:00:00 2001 From: Simon Renblad Date: Wed, 17 Jan 2024 15:55:55 +0800 Subject: [PATCH] waveform: add proxy clients --- artiq/dashboard/waveform.py | 97 +++++++++++++++++++++++++++++++++++-- 1 file changed, 92 insertions(+), 5 deletions(-) diff --git a/artiq/dashboard/waveform.py b/artiq/dashboard/waveform.py index 3bdf439ba..48012b3bc 100644 --- a/artiq/dashboard/waveform.py +++ b/artiq/dashboard/waveform.py @@ -6,6 +6,7 @@ from PyQt5 import QtCore, QtWidgets, QtGui from PyQt5.QtCore import Qt from sipyco.sync_struct import Subscriber +from sipyco.pc_rpc import AsyncioClient from artiq.tools import exc_to_warning from artiq.coredevice import comm_analyzer @@ -17,6 +18,87 @@ from artiq.gui.models import DictSyncTreeSepModel, LocalModelManager logger = logging.getLogger(__name__) +class _BaseProxyClient: + def __init__(self): + self.addr = None + self.port = None + self._reconnect_event = asyncio.Event() + self._reconnect_task = None + + async def start(self): + self._reconnect_task = asyncio.ensure_future( + exc_to_warning(self._reconnect())) + + def update_address(self, addr, port): + self.addr = addr + self.port = port + self._reconnect_event.set() + + async def _reconnect(self): + try: + while True: + await self._reconnect_event.wait() + self._reconnect_event.clear() + try: + await self.disconnect_cr() + except: + logger.error("Error caught when disconnecting proxy client.", exc_info=True) + try: + await self.reconnect_cr() + except Exception: + logger.error( + "Error caught when reconnecting proxy client. Retrying...", exc_info=True) + await asyncio.sleep(5) + self._reconnect_event.set() + except asyncio.CancelledError: + pass + + async def close(self): + try: + self._reconnect_task.cancel() + await asyncio.wait_for(self._reconnect_task, None) + await self.disconnect_cr() + except: + logger.error("Error caught while closing proxy client.", exc_info=True) + + async def reconnect_cr(self): + raise NotImplementedError + + async def disconnect_cr(self): + raise NotImplementedError + + +class RPCProxyClient(_BaseProxyClient): + def __init__(self): + _BaseProxyClient.__init__(self) + self.client = AsyncioClient() + + async def trigger_proxy_task(self): + if self.client.get_rpc_id()[0] is None: + raise AttributeError("Unable to identify RPC target. Is analyzer proxy connected?") + await self.client.trigger() + + async def reconnect_cr(self): + await self.client.connect_rpc(self.addr, + self.port, + "coreanalyzer_proxy_control") + + async def disconnect_cr(self): + self.client.close_rpc() + + +class ReceiverProxyClient(_BaseProxyClient): + def __init__(self, receiver): + _BaseProxyClient.__init__(self) + self.receiver = receiver + + async def reconnect_cr(self): + await self.receiver.connect(self.addr, self.port) + + async def disconnect_cr(self): + await self.receiver.close() + + class Model(DictSyncTreeSepModel): def __init__(self, init): DictSyncTreeSepModel.__init__(self, "/", ["Channels"], init) @@ -51,10 +133,11 @@ class WaveformDock(QtWidgets.QDockWidget): self._current_dir = os.getcwd() - devices_sub = Subscriber("devices", self.init_ddb, self.update_ddb) - - proxy_receiver = comm_analyzer.AnalyzerProxyReceiver( + self.devices_sub = Subscriber("devices", self.init_ddb, self.update_ddb) + self.rpc_client = RPCProxyClient() + receiver = comm_analyzer.AnalyzerProxyReceiver( self.on_dump_receive) + self.receiver_client = ReceiverProxyClient(receiver) grid = LayoutWidget() self.setWidget(grid) @@ -70,6 +153,8 @@ class WaveformDock(QtWidgets.QDockWidget): self._request_dump_btn.setIcon( QtWidgets.QApplication.style().standardIcon( QtWidgets.QStyle.SP_BrowserReload)) + self._request_dump_btn.clicked.connect( + lambda: asyncio.ensure_future(exc_to_warning(self.rpc_client.trigger_proxy_task()))) grid.addWidget(self._request_dump_btn, 0, 1) self._add_btn = QtWidgets.QToolButton() @@ -120,8 +205,10 @@ class WaveformDock(QtWidgets.QDockWidget): desc = self._ddb.get("core_analyzer") if desc is not None: addr = desc["host"] - port = desc.get("port_proxy", 1385) - port_control = desc.get("port_proxy_control", 1386) + port_proxy = desc.get("port_proxy", 1385) + port = desc.get("port", 1386) + self.receiver_client.update_address(addr, port_proxy) + self.rpc_client.update_address(addr, port) def init_ddb(self, ddb): self._ddb = ddb