mirror of https://github.com/m-labs/artiq.git
comm_analyzer: add AnalyzerProxyReceiver
This commit is contained in:
parent
da15e94c22
commit
64567bc26f
|
@ -2,6 +2,8 @@ from operator import itemgetter
|
|||
from collections import namedtuple
|
||||
from itertools import count
|
||||
from contextlib import contextmanager
|
||||
from sipyco import keepalive
|
||||
import asyncio
|
||||
from enum import Enum
|
||||
import struct
|
||||
import logging
|
||||
|
@ -131,6 +133,58 @@ def decode_dump(data):
|
|||
return DecodedDump(log_channel, bool(dds_onehot_sel), messages)
|
||||
|
||||
|
||||
# simplified from sipyco broadcast Receiver
|
||||
class AnalyzerProxyReceiver:
|
||||
def __init__(self, receive_cb):
|
||||
self.receive_cb = receive_cb
|
||||
|
||||
async def connect(self, host, port):
|
||||
self.reader, self.writer = \
|
||||
await keepalive.async_open_connection(host, port)
|
||||
try:
|
||||
self.receive_task = asyncio.ensure_future(self._receive_cr())
|
||||
except:
|
||||
self.writer.close()
|
||||
del self.reader
|
||||
del self.writer
|
||||
raise
|
||||
|
||||
async def close(self):
|
||||
try:
|
||||
self.receive_task.cancel()
|
||||
try:
|
||||
await asyncio.wait_for(self.receive_task, None)
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
finally:
|
||||
self.writer.close()
|
||||
del self.reader
|
||||
del self.writer
|
||||
|
||||
async def _receive_cr(self):
|
||||
try:
|
||||
while True:
|
||||
endian_byte = await self.reader.readexactly(1)
|
||||
if endian_byte == b"E":
|
||||
endian = '>'
|
||||
elif endian_byte == b"e":
|
||||
endian = '<'
|
||||
else:
|
||||
raise ValueError
|
||||
payload_length_word = await self.reader.readexactly(4)
|
||||
payload_length = struct.unpack(endian + "I", payload_length_word)[0]
|
||||
if payload_length > 10 * 512 * 1024:
|
||||
# 10x buffer size of firmware
|
||||
raise ValueError
|
||||
|
||||
# The remaining header length is 11 bytes.
|
||||
remaining_data = await self.reader.readexactly(payload_length + 11)
|
||||
data = endian_byte + payload_length_word + remaining_data
|
||||
self.receive_cb(data)
|
||||
finally:
|
||||
pass
|
||||
|
||||
|
||||
def vcd_codes():
|
||||
codechars = [chr(i) for i in range(33, 127)]
|
||||
for n in count():
|
||||
|
|
Loading…
Reference in New Issue