forked from M-Labs/artiq
1
0
Fork 0

waveform, analyzer proxy fix connect errors

This commit is contained in:
Simon Renblad 2024-02-09 16:03:55 +08:00 committed by Sébastien Bourdeauducq
parent 720cbb4490
commit d085c1e4a4
4 changed files with 70 additions and 47 deletions

View File

@ -117,6 +117,8 @@ def decode_dump(data):
(sent_bytes, total_byte_count, (sent_bytes, total_byte_count,
error_occurred, log_channel, dds_onehot_sel) = parts error_occurred, log_channel, dds_onehot_sel) = parts
logger.debug("analyzer dump has length %d", sent_bytes)
expected_len = sent_bytes + 15 expected_len = sent_bytes + 15
if expected_len != len(data): if expected_len != len(data):
raise ValueError("analyzer dump has incorrect length " raise ValueError("analyzer dump has incorrect length "
@ -128,39 +130,47 @@ def decode_dump(data):
if total_byte_count > sent_bytes: if total_byte_count > sent_bytes:
logger.info("analyzer ring buffer has wrapped %d times", logger.info("analyzer ring buffer has wrapped %d times",
total_byte_count//sent_bytes) total_byte_count//sent_bytes)
if sent_bytes == 0:
logger.warning("analyzer dump is empty")
position = 15 position = 15
messages = [] messages = []
for _ in range(sent_bytes//32): for _ in range(sent_bytes//32):
messages.append(decode_message(data[position:position+32])) messages.append(decode_message(data[position:position+32]))
position += 32 position += 32
if len(messages) == 1 and isinstance(messages[0], StoppedMessage):
logger.warning("analyzer dump is empty aside from stop message")
return DecodedDump(log_channel, bool(dds_onehot_sel), messages) return DecodedDump(log_channel, bool(dds_onehot_sel), messages)
# simplified from sipyco broadcast Receiver # simplified from sipyco broadcast Receiver
class AnalyzerProxyReceiver: class AnalyzerProxyReceiver:
def __init__(self, receive_cb): def __init__(self, receive_cb, disconnect_cb=None):
self.receive_cb = receive_cb self.receive_cb = receive_cb
self.disconnect_cb = disconnect_cb
self.receive_task = None
self.writer = None
async def connect(self, host, port): async def connect(self, host, port):
self.reader, self.writer = \ self.reader, self.writer = \
await keepalive.async_open_connection(host, port) await keepalive.async_open_connection(host, port)
magic = get_analyzer_magic()
try: try:
self.receive_task = asyncio.ensure_future(self._receive_cr()) self.receive_task = asyncio.create_task(self._receive_cr())
except: except:
self.writer.close() if self.writer is not None:
del self.reader self.writer.close()
del self.writer del self.reader
del self.writer
raise raise
async def close(self): async def close(self):
try: self.disconnect_cb = None
if self.receive_task is not None:
self.receive_task.cancel() self.receive_task.cancel()
try: if self.writer is not None:
await asyncio.wait_for(self.receive_task, None)
except asyncio.CancelledError:
pass
finally:
self.writer.close() self.writer.close()
del self.reader del self.reader
del self.writer del self.writer
@ -168,11 +178,14 @@ class AnalyzerProxyReceiver:
async def _receive_cr(self): async def _receive_cr(self):
try: try:
while True: while True:
endian_byte = await self.reader.readexactly(1) endian_byte = await self.reader.read(1)
if endian_byte == b"E": if endian_byte == b"E":
endian = '>' endian = '>'
elif endian_byte == b"e": elif endian_byte == b"e":
endian = '<' endian = '<'
elif endian_byte == b"":
# EOF reached, connection lost
return
else: else:
raise ValueError raise ValueError
payload_length_word = await self.reader.readexactly(4) payload_length_word = await self.reader.readexactly(4)
@ -186,7 +199,8 @@ class AnalyzerProxyReceiver:
data = endian_byte + payload_length_word + remaining_data data = endian_byte + payload_length_word + remaining_data
self.receive_cb(data) self.receive_cb(data)
finally: finally:
pass if self.disconnect_cb is not None:
self.disconnect_cb()
def vcd_codes(): def vcd_codes():

View File

@ -353,6 +353,4 @@ class Core:
if self.analyzer_proxy is None: if self.analyzer_proxy is None:
raise IOError("No analyzer proxy configured") raise IOError("No analyzer proxy configured")
else: else:
success = self.analyzer_proxy.trigger() self.analyzer_proxy.trigger()
if not success:
raise IOError("Analyzer proxy reported failure")

View File

@ -37,8 +37,7 @@ class _BaseProxyClient:
self._reconnect_task = None self._reconnect_task = None
async def start(self): async def start(self):
self._reconnect_task = asyncio.ensure_future( self._reconnect_task = asyncio.create_task(self._reconnect())
exc_to_warning(self._reconnect()))
def update_address(self, addr, port): def update_address(self, addr, port):
self.addr = addr self.addr = addr
@ -50,32 +49,25 @@ class _BaseProxyClient:
while True: while True:
await self._reconnect_event.wait() await self._reconnect_event.wait()
self._reconnect_event.clear() self._reconnect_event.clear()
try:
await self.disconnect_cr()
except:
logger.error("Error caught when disconnecting proxy client", exc_info=True)
try: try:
await self.reconnect_cr() await self.reconnect_cr()
except Exception: except:
logger.error(
"Error caught when reconnecting proxy client, retrying...", exc_info=True)
await asyncio.sleep(5) await asyncio.sleep(5)
self._reconnect_event.set() self._reconnect_event.set()
except asyncio.CancelledError: except asyncio.CancelledError:
pass pass
async def close(self): async def close(self):
try: self._reconnect_task.cancel()
self._reconnect_task.cancel() await self.close_cr()
await asyncio.wait_for(self._reconnect_task, None)
await self.disconnect_cr() def reconnect(self):
except: self._reconnect_event.set()
logger.error("Error caught while closing proxy client", exc_info=True)
async def reconnect_cr(self): async def reconnect_cr(self):
raise NotImplementedError raise NotImplementedError
async def disconnect_cr(self): async def close_cr(self):
raise NotImplementedError raise NotImplementedError
@ -85,17 +77,26 @@ class RPCProxyClient(_BaseProxyClient):
self.client = AsyncioClient() self.client = AsyncioClient()
async def trigger_proxy_task(self): async def trigger_proxy_task(self):
if self.client.get_rpc_id()[0] is None: try:
raise AttributeError("Unable to identify RPC target. Is analyzer proxy connected?") await self.client.trigger()
await self.client.trigger() except:
logger.error("analyzer proxy reported failure", exc_info=True)
async def reconnect_cr(self): async def reconnect_cr(self):
await self.client.connect_rpc(self.addr, try:
self.port, await self.client.connect_rpc(self.addr,
"coreanalyzer_proxy_control") self.port,
"coreanalyzer_proxy_control")
logger.info("connected to analyzer proxy control %s:%d", self.addr, self.port)
except:
logger.error("error connecting to analyzer proxy control", exc_info=True)
raise
async def disconnect_cr(self): async def close_cr(self):
self.client.close_rpc() try:
self.client.close_rpc()
except:
logger.error("error closing connection with analyzer proxy control", exc_info=True)
class ReceiverProxyClient(_BaseProxyClient): class ReceiverProxyClient(_BaseProxyClient):
@ -104,10 +105,18 @@ class ReceiverProxyClient(_BaseProxyClient):
self.receiver = receiver self.receiver = receiver
async def reconnect_cr(self): async def reconnect_cr(self):
await self.receiver.connect(self.addr, self.port) try:
await self.receiver.connect(self.addr, self.port)
logger.info("listening to analyzer proxy %s:%d", self.addr, self.port)
except:
logger.error("error connecting to analyzer proxy", exc_info=True)
raise
async def disconnect_cr(self): async def close_cr(self):
await self.receiver.close() try:
await self.receiver.close()
except:
logger.error("error closing connection to analyzer proxy", exc_info=True)
class _BackgroundItem(pg.GraphicsWidgetAnchor, pg.GraphicsWidget): class _BackgroundItem(pg.GraphicsWidgetAnchor, pg.GraphicsWidget):
@ -707,7 +716,7 @@ class WaveformDock(QtWidgets.QDockWidget):
self.devices_sub = Subscriber("devices", self.init_ddb, self.update_ddb) self.devices_sub = Subscriber("devices", self.init_ddb, self.update_ddb)
self.rpc_client = RPCProxyClient() self.rpc_client = RPCProxyClient()
receiver = comm_analyzer.AnalyzerProxyReceiver( receiver = comm_analyzer.AnalyzerProxyReceiver(
self.on_dump_receive) self.on_dump_receive, self.on_proxy_disconnect)
self.receiver_client = ReceiverProxyClient(receiver) self.receiver_client = ReceiverProxyClient(receiver)
grid = LayoutWidget() grid = LayoutWidget()
@ -788,6 +797,10 @@ class WaveformDock(QtWidgets.QDockWidget):
self._waveform_view.setTimescale(self._waveform_data['timescale']) self._waveform_view.setTimescale(self._waveform_data['timescale'])
self._cursor_control.setTimescale(self._waveform_data['timescale']) self._cursor_control.setTimescale(self._waveform_data['timescale'])
def on_proxy_disconnect(self):
self.receiver_client.reconnect()
self.rpc_client.reconnect()
async def load_trace(self): async def load_trace(self):
try: try:
filename = await get_open_file_name( filename = await get_open_file_name(

View File

@ -60,9 +60,7 @@ class ProxyControl:
self.distribute_cb(dump) self.distribute_cb(dump)
except: except:
logger.warning("Trigger failed:", exc_info=True) logger.warning("Trigger failed:", exc_info=True)
return False raise
else:
return True
def get_argparser(): def get_argparser():