From 70542b0a5b3a15c598dfd3f7836708ba04144c35 Mon Sep 17 00:00:00 2001 From: Simon Renblad Date: Tue, 16 Jan 2024 16:19:12 +0800 Subject: [PATCH] waveform: major refactor to proxy connectors --- artiq/dashboard/waveform.py | 217 +++++++++++++++++------------------- 1 file changed, 100 insertions(+), 117 deletions(-) diff --git a/artiq/dashboard/waveform.py b/artiq/dashboard/waveform.py index 9f1d58a1a..5f702efeb 100644 --- a/artiq/dashboard/waveform.py +++ b/artiq/dashboard/waveform.py @@ -1,6 +1,7 @@ from PyQt5 import QtCore, QtWidgets, QtGui from PyQt5.QtCore import Qt +from sipyco.asyncio_tools import atexit_register_coroutine from sipyco.sync_struct import Subscriber from sipyco.pc_rpc import AsyncioClient from sipyco import pyon @@ -88,7 +89,7 @@ class _AddChannelDialog(QtWidgets.QDialog): self.accepted.emit(channels) self.close() - +# TODO: make private class BackgroundItem(pg.GraphicsWidgetAnchor, pg.GraphicsWidget): def __init__(self, parent, rect): pg.GraphicsWidget.__init__(self, parent) @@ -278,14 +279,13 @@ class BitWaveform(Waveform): display_x = [] previous_y = None for x, y in zip(self.x_data, self.y_data): - state_unchanged = previous_y == y - if y == "X": + if y == "X": # TODO: replace with dictionary mapping dis_y = DISPLAY_MID elif y == "1": dis_y = DISPLAY_HIGH else: dis_y = DISPLAY_LOW - if state_unchanged: + if previous_y == y: # TODO: extract to separate function arw = pg.ArrowItem(pxMode=True, angle=90) self.addItem(arw) self._arrows.append(arw) @@ -309,7 +309,7 @@ class BitVectorWaveform(Waveform): Waveform.__init__(self, channel, state, parent) self._labels = [] hx = math.ceil(self.width / 4) - self._format_string = "{:0=" + str(hx) + "X}" + self._format_string = "{:0=" + str(hx) + "X}" # TODO: change method.. self.view_box.sigTransformChanged.connect(self._update_labels) def _update_labels(self): @@ -339,7 +339,7 @@ class BitVectorWaveform(Waveform): for x, y in zip(self.x_data, self.y_data): display_x.append(x) display_y.append(DISPLAY_LOW) - if "X" in y: + if "X" in y: # TODO change to using a dictionary display_x.append(x) display_y.append(DISPLAY_MID) elif int(y) != 0: @@ -358,7 +358,7 @@ class BitVectorWaveform(Waveform): self.plot_data_item.setData(x=[], y=[]) def format_cursor_label(self): - if "X" in self.cursor_y: + if "X" in self.cursor_y: # TODO: this will not happen in any current implementation of the bit vector handlers.. lbl = self.cursor_y else: lbl = self._format_string.format(int(self.cursor_y, 2)) @@ -368,14 +368,17 @@ class BitVectorWaveform(Waveform): class AnalogWaveform(Waveform): def __init__(self, channel, state, parent=None): Waveform.__init__(self, channel, state, parent) - self.plot_data_item.setDownsampling(ds=10, method="peak", auto=True) + self.plot_data_item.setDownsampling(ds=10, method="peak", auto=True) + # TODO: experiment with downsampling values for best performance + # TODO: potentially switch to not using a step connect def extract_data_from_state(self): try: self.x_data, self.y_data = zip(*self.state['data'][self.name]) except: logger.debug('Error caught when loading waveform data: {}'.format(self.name), exc_info=True) - + + # TODO: change to using max_, min_ def display(self): try: self.plot_data_item.setData(x=self.x_data, y=self.y_data) @@ -386,8 +389,9 @@ class AnalogWaveform(Waveform): logger.debug('Error caught when displaying waveform: {}'.format(self.name), exc_info=True) self.plot_data_item.setData(x=[0], y=[0]) + # TODO: can also just leave it as None -> simpler def format_cursor_label(self): - if self.cursor_y is None: + if self.cursor_y is None: lbl = "nan" else: lbl = str(self.cursor_y) @@ -433,6 +437,7 @@ class WaveformArea(QtWidgets.QWidget): scroll_area.setFrameShape(QtWidgets.QFrame.NoFrame) layout.addWidget(scroll_area) + # TODO: name changed to VDragDropSplitter self._splitter = DragDropSplitter(parent=scroll_area) self._splitter.setHandleWidth(1) scroll_area.setWidget(self._splitter) @@ -518,7 +523,7 @@ class WaveformArea(QtWidgets.QWidget): cw.display() cw.on_cursor_move(self._cursor_x_pos) cw.update_x_max() - maximum = self._state["stopped_x"] + maximum = self._state["stopped_x"] # TODO: change maximum -> stopped_x self._ref_axis.setLimits(xMax=maximum) if maximum is not None: self._ref_axis.setRange(xRange=(0, maximum)) @@ -536,102 +541,7 @@ class WaveformArea(QtWidgets.QWidget): cw.set_cursor_visible(self._cursor_visible) -class WaveformProxyClient: - def __init__(self, state, loop): - self._state = state - self._loop = loop - - self.devices_sub = None - self.rpc_client = AsyncioClient() - self.proxy_receiver = None - - self._proxy_addr = None - self._proxy_port = None - self._proxy_port_ctl = None - self._on_sub_reconnect = asyncio.Event() - self._on_rpc_reconnect = asyncio.Event() - self._reconnect_rpc_task = None - self._reconnect_receiver_task = None - - async def trigger_proxy_task(self): - try: - if self.rpc_client.get_rpc_id()[0] is None: - raise AttributeError("Unable to identify RPC target. Is analyzer proxy connected?") - asyncio.ensure_future(self.rpc_client.trigger()) - except Exception as e: - logger.warning("Failed to pull from device: %s", e) - - def update_address(self, addr, port, port_control): - self._proxy_addr = addr - self._proxy_port = port - self._proxy_port_ctl = port_control - self._on_rpc_reconnect.set() - self._on_sub_reconnect.set() - - # Proxy client connections - async def start(self, server, port): - try: - await self.devices_sub.connect(server, port) - self._reconnect_rpc_task = asyncio.ensure_future( - self.reconnect_rpc(), loop=self._loop) - self._reconnect_receiver_task = asyncio.ensure_future( - self.reconnect_receiver(), loop=self._loop) - except Exception as e: - logger.error("Failed to connect to master: %s", e) - - async def reconnect_rpc(self): - try: - while True: - await self._on_rpc_reconnect.wait() - self._on_rpc_reconnect.clear() - logger.info("Attempting analyzer proxy RPC connection...") - try: - await self.rpc_client.connect_rpc(self._proxy_addr, - self._proxy_port_ctl, - "coreanalyzer_proxy_control") - except Exception: - logger.info("Analyzer proxy RPC timed out, trying again...") - await asyncio.sleep(5) - self._on_rpc_reconnect.set() - else: - logger.info("RPC connected to analyzer proxy on %s/%s", - self._proxy_addr, self._proxy_port_ctl) - except asyncio.CancelledError: - pass - - async def reconnect_receiver(self): - try: - while True: - await self._on_sub_reconnect.wait() - self._on_sub_reconnect.clear() - logger.info("Setting up analyzer proxy receiver...") - try: - await self.proxy_receiver.connect( - self._proxy_addr, self._proxy_port) - except Exception: - logger.info("Failed to set up analyzer proxy receiver, reconnecting...") - await asyncio.sleep(5) - self._on_sub_reconnect.set() - else: - logger.info("Receiving from analyzer proxy on %s:%s", - self._proxy_addr, self._proxy_port) - except asyncio.CancelledError: - pass - - async def stop(self): - try: - self._reconnect_rpc_task.cancel() - self._reconnect_receiver_task.cancel() - await asyncio.wait_for(self._reconnect_rpc_task, None) - await asyncio.wait_for(self._reconnect_receiver_task, None) - await self.devices_sub.close() - self.rpc_client.close_rpc() - await self.proxy_receiver.close() - except Exception as e: - logger.error("Error occurred while closing proxy connections: %s", - e, exc_info=True) - - +# TODO: move elsewhere -> this can be a late addition class _CursorTimeControl(QtWidgets.QLineEdit): submit = QtCore.pyqtSignal(float) PRECISION = 15 @@ -663,6 +573,80 @@ class _CursorTimeControl(QtWidgets.QLineEdit): self.clearFocus() +class _BaseProxyClient: + def __init__(self): + self.addr = None + self.port = None + self._reconnect_event = asyncio.Event() + self._reconnect_task = None + + async def start(self): + self._reconnect_task = asyncio.ensure_future( + exc_to_warning(self._reconnect())) + + def update_address(self, addr, port): + self.addr = addr + self.port = port + self._reconnect_event.set() + + async def _reconnect(self): + try: + while True: + await self._reconnect_event.wait() + self._reconnect_event.clear() + try: + await self.reconnect_cr() + except Exception: + await asyncio.sleep(5) + self._reconnect_event.set() + except asyncio.CancelledError: + pass + + async def close(self): + try: + self._reconnect_task.cancel() + await asyncio.wait_for(self._reconnect_task, None) + await self.disconnect_cr() + except: + logger.error("Error caught while closing proxy client.", exc_info=True) + + async def reconnect_cr(self): + raise NotImplementedError + + async def disconnect_cr(self): + raise NotImplementedError + + +class RPCProxyClient(_BaseProxyClient): + def __init__(self): + _BaseProxyClient.__init__(self) + self.client = AsyncioClient() + + async def trigger_proxy_task(self): + if self.client.get_rpc_id()[0] is None: + raise AttributeError("Unable to identify RPC target. Is analyzer proxy connected?") + await self.client.trigger() + + async def reconnect_cr(self): + await self.client.connect_rpc(self.addr, + self.port, + "coreanalyzer_proxy_control") + + async def disconnect_cr(self): + self.client.close_rpc() + + +class ReceiverProxyClient(_BaseProxyClient): + def __init__(self, receiver): + _BaseProxyClient.__init__(self) + self.receiver = receiver + + async def reconnect_cr(self): + await self.receiver.connect(self.addr, self.port) + + async def disconnect_cr(self): + await self.receiver.close() + class WaveformDock(QtWidgets.QDockWidget): traceDataChanged = QtCore.pyqtSignal() @@ -685,14 +669,12 @@ class WaveformDock(QtWidgets.QDockWidget): } self._current_dir = os.getcwd() - - self.proxy_client = WaveformProxyClient(self._state, loop) - devices_sub = Subscriber("devices", self.init_ddb, self.update_ddb) - - proxy_receiver = comm_analyzer.AnalyzerProxyReceiver( + + self.devices_sub = Subscriber("devices", self.init_ddb, self.update_ddb) + self.rpc_client = RPCProxyClient() + receiver = comm_analyzer.AnalyzerProxyReceiver( self.on_dump_receive) - self.proxy_client.devices_sub = devices_sub - self.proxy_client.proxy_receiver = proxy_receiver + self.receiver_client = ReceiverProxyClient(receiver) grid = LayoutWidget() self.setWidget(grid) @@ -710,7 +692,7 @@ class WaveformDock(QtWidgets.QDockWidget): QtWidgets.QStyle.SP_BrowserReload)) grid.addWidget(self._request_dump_btn, 0, 1) self._request_dump_btn.clicked.connect( - lambda: asyncio.ensure_future(self.proxy_client.trigger_proxy_task())) + lambda: asyncio.ensure_future(self.rpc_client.trigger_proxy_task())) self._waveform_area = WaveformArea(self, self._state, self._channel_model) @@ -748,7 +730,7 @@ class WaveformDock(QtWidgets.QDockWidget): def _update_log_channels(self): for log in self._state['logs']: - self._channel_model[log] = (0, "log") + self._channel_model[log] = (0, WaveformType.LOG) def on_dump_receive(self, data): decoded_dump = comm_analyzer.decode_dump(data) @@ -858,7 +840,8 @@ class WaveformDock(QtWidgets.QDockWidget): addr = desc["host"] port = desc.get("port_proxy", 1385) port_control = desc.get("port_proxy_control", 1386) - self.proxy_client.update_address(addr, port, port_control) + self.rpc_client.update_address(addr, port_control) + self.receiver_client.update_address(addr, port) def init_ddb(self, ddb): logger.info("init ddb")