diff --git a/artiq/coredevice/comm_analyzer.py b/artiq/coredevice/comm_analyzer.py index 3e3fcafba..84fe7cbfd 100644 --- a/artiq/coredevice/comm_analyzer.py +++ b/artiq/coredevice/comm_analyzer.py @@ -2,6 +2,8 @@ from operator import itemgetter from collections import namedtuple from itertools import count from contextlib import contextmanager +from sipyco import keepalive +import asyncio from enum import Enum import struct import logging @@ -131,6 +133,58 @@ def decode_dump(data): return DecodedDump(log_channel, bool(dds_onehot_sel), messages) +# simplified from sipyco broadcast Receiver +class AnalyzerProxyReceiver: + def __init__(self, receive_cb): + self.receive_cb = receive_cb + + async def connect(self, host, port): + self.reader, self.writer = \ + await keepalive.async_open_connection(host, port) + try: + self.receive_task = asyncio.ensure_future(self._receive_cr()) + except: + self.writer.close() + del self.reader + del self.writer + raise + + async def close(self): + try: + self.receive_task.cancel() + try: + await asyncio.wait_for(self.receive_task, None) + except asyncio.CancelledError: + pass + finally: + self.writer.close() + del self.reader + del self.writer + + async def _receive_cr(self): + try: + while True: + endian_byte = await self.reader.readexactly(1) + if endian_byte == b"E": + endian = '>' + elif endian_byte == b"e": + endian = '<' + else: + raise ValueError + payload_length_word = await self.reader.readexactly(4) + payload_length = struct.unpack(endian + "I", payload_length_word)[0] + if payload_length > 10 * 512 * 1024: + # 10x buffer size of firmware + raise ValueError + + # The remaining header length is 11 bytes. + remaining_data = await self.reader.readexactly(payload_length + 11) + data = endian_byte + payload_length_word + remaining_data + self.receive_cb(data) + finally: + pass + + def vcd_codes(): codechars = [chr(i) for i in range(33, 127)] for n in count():