From d085c1e4a4e93b87d75839e3bce2bd6da5938b9b Mon Sep 17 00:00:00 2001 From: Simon Renblad Date: Fri, 9 Feb 2024 16:03:55 +0800 Subject: [PATCH] waveform, analyzer proxy fix connect errors --- artiq/coredevice/comm_analyzer.py | 40 +++++++++---- artiq/coredevice/core.py | 4 +- artiq/dashboard/waveform.py | 69 +++++++++++++--------- artiq/frontend/aqctl_coreanalyzer_proxy.py | 4 +- 4 files changed, 70 insertions(+), 47 deletions(-) diff --git a/artiq/coredevice/comm_analyzer.py b/artiq/coredevice/comm_analyzer.py index be6993bdd..9a175fc8c 100644 --- a/artiq/coredevice/comm_analyzer.py +++ b/artiq/coredevice/comm_analyzer.py @@ -117,6 +117,8 @@ def decode_dump(data): (sent_bytes, total_byte_count, error_occurred, log_channel, dds_onehot_sel) = parts + logger.debug("analyzer dump has length %d", sent_bytes) + expected_len = sent_bytes + 15 if expected_len != len(data): raise ValueError("analyzer dump has incorrect length " @@ -128,39 +130,47 @@ def decode_dump(data): if total_byte_count > sent_bytes: logger.info("analyzer ring buffer has wrapped %d times", total_byte_count//sent_bytes) + if sent_bytes == 0: + logger.warning("analyzer dump is empty") position = 15 messages = [] for _ in range(sent_bytes//32): messages.append(decode_message(data[position:position+32])) position += 32 + + if len(messages) == 1 and isinstance(messages[0], StoppedMessage): + logger.warning("analyzer dump is empty aside from stop message") + return DecodedDump(log_channel, bool(dds_onehot_sel), messages) # simplified from sipyco broadcast Receiver class AnalyzerProxyReceiver: - def __init__(self, receive_cb): + def __init__(self, receive_cb, disconnect_cb=None): self.receive_cb = receive_cb + self.disconnect_cb = disconnect_cb + self.receive_task = None + self.writer = None async def connect(self, host, port): self.reader, self.writer = \ await keepalive.async_open_connection(host, port) + magic = get_analyzer_magic() try: - self.receive_task = asyncio.ensure_future(self._receive_cr()) + self.receive_task = asyncio.create_task(self._receive_cr()) except: - self.writer.close() - del self.reader - del self.writer + if self.writer is not None: + self.writer.close() + del self.reader + del self.writer raise async def close(self): - try: + self.disconnect_cb = None + if self.receive_task is not None: self.receive_task.cancel() - try: - await asyncio.wait_for(self.receive_task, None) - except asyncio.CancelledError: - pass - finally: + if self.writer is not None: self.writer.close() del self.reader del self.writer @@ -168,11 +178,14 @@ class AnalyzerProxyReceiver: async def _receive_cr(self): try: while True: - endian_byte = await self.reader.readexactly(1) + endian_byte = await self.reader.read(1) if endian_byte == b"E": endian = '>' elif endian_byte == b"e": endian = '<' + elif endian_byte == b"": + # EOF reached, connection lost + return else: raise ValueError payload_length_word = await self.reader.readexactly(4) @@ -186,7 +199,8 @@ class AnalyzerProxyReceiver: data = endian_byte + payload_length_word + remaining_data self.receive_cb(data) finally: - pass + if self.disconnect_cb is not None: + self.disconnect_cb() def vcd_codes(): diff --git a/artiq/coredevice/core.py b/artiq/coredevice/core.py index d92351d57..ae28f98ab 100644 --- a/artiq/coredevice/core.py +++ b/artiq/coredevice/core.py @@ -353,6 +353,4 @@ class Core: if self.analyzer_proxy is None: raise IOError("No analyzer proxy configured") else: - success = self.analyzer_proxy.trigger() - if not success: - raise IOError("Analyzer proxy reported failure") + self.analyzer_proxy.trigger() diff --git a/artiq/dashboard/waveform.py b/artiq/dashboard/waveform.py index a20d65a32..f03a12e05 100644 --- a/artiq/dashboard/waveform.py +++ b/artiq/dashboard/waveform.py @@ -37,8 +37,7 @@ class _BaseProxyClient: self._reconnect_task = None async def start(self): - self._reconnect_task = asyncio.ensure_future( - exc_to_warning(self._reconnect())) + self._reconnect_task = asyncio.create_task(self._reconnect()) def update_address(self, addr, port): self.addr = addr @@ -50,32 +49,25 @@ class _BaseProxyClient: while True: await self._reconnect_event.wait() self._reconnect_event.clear() - try: - await self.disconnect_cr() - except: - logger.error("Error caught when disconnecting proxy client", exc_info=True) try: await self.reconnect_cr() - except Exception: - logger.error( - "Error caught when reconnecting proxy client, retrying...", exc_info=True) + except: await asyncio.sleep(5) self._reconnect_event.set() except asyncio.CancelledError: pass async def close(self): - try: - self._reconnect_task.cancel() - await asyncio.wait_for(self._reconnect_task, None) - await self.disconnect_cr() - except: - logger.error("Error caught while closing proxy client", exc_info=True) + self._reconnect_task.cancel() + await self.close_cr() + + def reconnect(self): + self._reconnect_event.set() async def reconnect_cr(self): raise NotImplementedError - async def disconnect_cr(self): + async def close_cr(self): raise NotImplementedError @@ -85,17 +77,26 @@ class RPCProxyClient(_BaseProxyClient): self.client = AsyncioClient() async def trigger_proxy_task(self): - if self.client.get_rpc_id()[0] is None: - raise AttributeError("Unable to identify RPC target. Is analyzer proxy connected?") - await self.client.trigger() + try: + await self.client.trigger() + except: + logger.error("analyzer proxy reported failure", exc_info=True) async def reconnect_cr(self): - await self.client.connect_rpc(self.addr, - self.port, - "coreanalyzer_proxy_control") + try: + await self.client.connect_rpc(self.addr, + self.port, + "coreanalyzer_proxy_control") + logger.info("connected to analyzer proxy control %s:%d", self.addr, self.port) + except: + logger.error("error connecting to analyzer proxy control", exc_info=True) + raise - async def disconnect_cr(self): - self.client.close_rpc() + async def close_cr(self): + try: + self.client.close_rpc() + except: + logger.error("error closing connection with analyzer proxy control", exc_info=True) class ReceiverProxyClient(_BaseProxyClient): @@ -104,10 +105,18 @@ class ReceiverProxyClient(_BaseProxyClient): self.receiver = receiver async def reconnect_cr(self): - await self.receiver.connect(self.addr, self.port) + try: + await self.receiver.connect(self.addr, self.port) + logger.info("listening to analyzer proxy %s:%d", self.addr, self.port) + except: + logger.error("error connecting to analyzer proxy", exc_info=True) + raise - async def disconnect_cr(self): - await self.receiver.close() + async def close_cr(self): + try: + await self.receiver.close() + except: + logger.error("error closing connection to analyzer proxy", exc_info=True) class _BackgroundItem(pg.GraphicsWidgetAnchor, pg.GraphicsWidget): @@ -707,7 +716,7 @@ class WaveformDock(QtWidgets.QDockWidget): self.devices_sub = Subscriber("devices", self.init_ddb, self.update_ddb) self.rpc_client = RPCProxyClient() receiver = comm_analyzer.AnalyzerProxyReceiver( - self.on_dump_receive) + self.on_dump_receive, self.on_proxy_disconnect) self.receiver_client = ReceiverProxyClient(receiver) grid = LayoutWidget() @@ -788,6 +797,10 @@ class WaveformDock(QtWidgets.QDockWidget): self._waveform_view.setTimescale(self._waveform_data['timescale']) self._cursor_control.setTimescale(self._waveform_data['timescale']) + def on_proxy_disconnect(self): + self.receiver_client.reconnect() + self.rpc_client.reconnect() + async def load_trace(self): try: filename = await get_open_file_name( diff --git a/artiq/frontend/aqctl_coreanalyzer_proxy.py b/artiq/frontend/aqctl_coreanalyzer_proxy.py index ec9891423..8a2bee56e 100755 --- a/artiq/frontend/aqctl_coreanalyzer_proxy.py +++ b/artiq/frontend/aqctl_coreanalyzer_proxy.py @@ -60,9 +60,7 @@ class ProxyControl: self.distribute_cb(dump) except: logger.warning("Trigger failed:", exc_info=True) - return False - else: - return True + raise def get_argparser():