diff --git a/artiq/dashboard/waveform.py b/artiq/dashboard/waveform.py index 581c45e01..3ca3f4ab5 100644 --- a/artiq/dashboard/waveform.py +++ b/artiq/dashboard/waveform.py @@ -1,6 +1,8 @@ from PyQt5 import QtCore, QtWidgets, QtGui from PyQt5.QtCore import Qt +from sipyco.pc_rpc import AsyncioClient + from artiq.tools import exc_to_warning from artiq.gui.models import DictSyncTreeSepModel, LocalModelManager from artiq.gui.dndwidgets import DragDropSplitter, VDragScrollArea @@ -524,3 +526,99 @@ class WaveformArea(QtWidgets.QWidget): for i in range(self._splitter.count()): cw = self._splitter.widget(i) cw.set_cursor_visible(self._cursor_visible) + + +class WaveformProxyClient: + def __init__(self, state, loop): + self._state = state + self._loop = loop + + self.devices_sub = None + self.rpc_client = AsyncioClient() + self.proxy_receiver = None + + self._proxy_addr = None + self._proxy_port = None + self._proxy_port_ctl = None + self._on_sub_reconnect = asyncio.Event() + self._on_rpc_reconnect = asyncio.Event() + self._reconnect_rpc_task = None + self._reconnect_receiver_task = None + + async def trigger_proxy_task(self): + try: + if self.rpc_client.get_rpc_id()[0] is None: + raise AttributeError("Unable to identify RPC target. Is analyzer proxy connected?") + asyncio.ensure_future(self.rpc_client.trigger()) + except Exception as e: + logger.warning("Failed to pull from device: %s", e) + + def update_address(self, addr, port, port_control): + self._proxy_addr = addr + self._proxy_port = port + self._proxy_port_ctl = port_control + self._on_rpc_reconnect.set() + self._on_sub_reconnect.set() + + # Proxy client connections + async def start(self, server, port): + try: + await self.devices_sub.connect(server, port) + self._reconnect_rpc_task = asyncio.ensure_future( + self.reconnect_rpc(), loop=self._loop) + self._reconnect_receiver_task = asyncio.ensure_future( + self.reconnect_receiver(), loop=self._loop) + except Exception as e: + logger.error("Failed to connect to master: %s", e) + + async def reconnect_rpc(self): + try: + while True: + await self._on_rpc_reconnect.wait() + self._on_rpc_reconnect.clear() + logger.info("Attempting analyzer proxy RPC connection...") + try: + await self.rpc_client.connect_rpc(self._proxy_addr, + self._proxy_port_ctl, + "coreanalyzer_proxy_control") + except Exception: + logger.info("Analyzer proxy RPC timed out, trying again...") + await asyncio.sleep(5) + self._on_rpc_reconnect.set() + else: + logger.info("RPC connected to analyzer proxy on %s/%s", + self._proxy_addr, self._proxy_port_ctl) + except asyncio.CancelledError: + pass + + async def reconnect_receiver(self): + try: + while True: + await self._on_sub_reconnect.wait() + self._on_sub_reconnect.clear() + logger.info("Setting up analyzer proxy receiver...") + try: + await self.proxy_receiver.connect( + self._proxy_addr, self._proxy_port) + except Exception: + logger.info("Failed to set up analyzer proxy receiver, reconnecting...") + await asyncio.sleep(5) + self._on_sub_reconnect.set() + else: + logger.info("Receiving from analyzer proxy on %s:%s", + self._proxy_addr, self._proxy_port) + except asyncio.CancelledError: + pass + + async def stop(self): + try: + self._reconnect_rpc_task.cancel() + self._reconnect_receiver_task.cancel() + await asyncio.wait_for(self._reconnect_rpc_task, None) + await asyncio.wait_for(self._reconnect_receiver_task, None) + await self.devices_sub.close() + self.rpc_client.close_rpc() + await self.proxy_receiver.close() + except Exception as e: + logger.error("Error occurred while closing proxy connections: %s", + e, exc_info=True)