comm_analyzer: add AnalyzerProxyReceiver

This commit is contained in:
Simon Renblad 2023-12-11 11:43:27 +08:00
parent 669edf17c5
commit 9a11af03b4

View File

@ -2,6 +2,8 @@ from operator import itemgetter
from collections import namedtuple
from itertools import count
from contextlib import contextmanager
from sipyco import keepalive
import asyncio
from enum import Enum
import struct
import logging
@ -131,6 +133,58 @@ def decode_dump(data):
return DecodedDump(log_channel, bool(dds_onehot_sel), messages)
# simplified from sipyco broadcast Receiver
class AnalyzerProxyReceiver:
def __init__(self, receive_cb):
self.receive_cb = receive_cb
async def connect(self, host, port):
self.reader, self.writer = \
await keepalive.async_open_connection(host, port)
try:
self.receive_task = asyncio.ensure_future(self._receive_cr())
except:
self.writer.close()
del self.reader
del self.writer
raise
async def close(self):
try:
self.receive_task.cancel()
try:
await asyncio.wait_for(self.receive_task, None)
except asyncio.CancelledError:
pass
finally:
self.writer.close()
del self.reader
del self.writer
async def _receive_cr(self):
try:
while True:
endian_byte = await self.reader.readexactly(1)
if endian_byte == b"E":
endian = '>'
elif endian_byte == b"e":
endian = '<'
else:
raise ValueError
payload_length_word = await self.reader.readexactly(4)
payload_length = struct.unpack(endian + "I", payload_length_word)[0]
if payload_length > 10 * 512 * 1024:
# 10x buffer size of firmware
raise ValueError
# The remaining header length is 11 bytes.
remaining_data = await self.reader.readexactly(payload_length + 11)
data = endian_byte + payload_length_word + remaining_data
self.receive_cb(data)
finally:
pass
def vcd_codes():
codechars = [chr(i) for i in range(33, 127)]
for n in count():