waveform: major refactor to proxy connectors

This commit is contained in:
Simon Renblad 2024-01-16 16:19:12 +08:00
parent 21c391865f
commit 70542b0a5b

View File

@ -1,6 +1,7 @@
from PyQt5 import QtCore, QtWidgets, QtGui
from PyQt5.QtCore import Qt
from sipyco.asyncio_tools import atexit_register_coroutine
from sipyco.sync_struct import Subscriber
from sipyco.pc_rpc import AsyncioClient
from sipyco import pyon
@ -88,7 +89,7 @@ class _AddChannelDialog(QtWidgets.QDialog):
self.accepted.emit(channels)
self.close()
# TODO: make private
class BackgroundItem(pg.GraphicsWidgetAnchor, pg.GraphicsWidget):
def __init__(self, parent, rect):
pg.GraphicsWidget.__init__(self, parent)
@ -278,14 +279,13 @@ class BitWaveform(Waveform):
display_x = []
previous_y = None
for x, y in zip(self.x_data, self.y_data):
state_unchanged = previous_y == y
if y == "X":
if y == "X": # TODO: replace with dictionary mapping
dis_y = DISPLAY_MID
elif y == "1":
dis_y = DISPLAY_HIGH
else:
dis_y = DISPLAY_LOW
if state_unchanged:
if previous_y == y: # TODO: extract to separate function
arw = pg.ArrowItem(pxMode=True, angle=90)
self.addItem(arw)
self._arrows.append(arw)
@ -309,7 +309,7 @@ class BitVectorWaveform(Waveform):
Waveform.__init__(self, channel, state, parent)
self._labels = []
hx = math.ceil(self.width / 4)
self._format_string = "{:0=" + str(hx) + "X}"
self._format_string = "{:0=" + str(hx) + "X}" # TODO: change method..
self.view_box.sigTransformChanged.connect(self._update_labels)
def _update_labels(self):
@ -339,7 +339,7 @@ class BitVectorWaveform(Waveform):
for x, y in zip(self.x_data, self.y_data):
display_x.append(x)
display_y.append(DISPLAY_LOW)
if "X" in y:
if "X" in y: # TODO change to using a dictionary
display_x.append(x)
display_y.append(DISPLAY_MID)
elif int(y) != 0:
@ -358,7 +358,7 @@ class BitVectorWaveform(Waveform):
self.plot_data_item.setData(x=[], y=[])
def format_cursor_label(self):
if "X" in self.cursor_y:
if "X" in self.cursor_y: # TODO: this will not happen in any current implementation of the bit vector handlers..
lbl = self.cursor_y
else:
lbl = self._format_string.format(int(self.cursor_y, 2))
@ -368,14 +368,17 @@ class BitVectorWaveform(Waveform):
class AnalogWaveform(Waveform):
def __init__(self, channel, state, parent=None):
Waveform.__init__(self, channel, state, parent)
self.plot_data_item.setDownsampling(ds=10, method="peak", auto=True)
self.plot_data_item.setDownsampling(ds=10, method="peak", auto=True)
# TODO: experiment with downsampling values for best performance
# TODO: potentially switch to not using a step connect
def extract_data_from_state(self):
try:
self.x_data, self.y_data = zip(*self.state['data'][self.name])
except:
logger.debug('Error caught when loading waveform data: {}'.format(self.name), exc_info=True)
# TODO: change to using max_, min_
def display(self):
try:
self.plot_data_item.setData(x=self.x_data, y=self.y_data)
@ -386,8 +389,9 @@ class AnalogWaveform(Waveform):
logger.debug('Error caught when displaying waveform: {}'.format(self.name), exc_info=True)
self.plot_data_item.setData(x=[0], y=[0])
# TODO: can also just leave it as None -> simpler
def format_cursor_label(self):
if self.cursor_y is None:
if self.cursor_y is None:
lbl = "nan"
else:
lbl = str(self.cursor_y)
@ -433,6 +437,7 @@ class WaveformArea(QtWidgets.QWidget):
scroll_area.setFrameShape(QtWidgets.QFrame.NoFrame)
layout.addWidget(scroll_area)
# TODO: name changed to VDragDropSplitter
self._splitter = DragDropSplitter(parent=scroll_area)
self._splitter.setHandleWidth(1)
scroll_area.setWidget(self._splitter)
@ -518,7 +523,7 @@ class WaveformArea(QtWidgets.QWidget):
cw.display()
cw.on_cursor_move(self._cursor_x_pos)
cw.update_x_max()
maximum = self._state["stopped_x"]
maximum = self._state["stopped_x"] # TODO: change maximum -> stopped_x
self._ref_axis.setLimits(xMax=maximum)
if maximum is not None:
self._ref_axis.setRange(xRange=(0, maximum))
@ -536,102 +541,7 @@ class WaveformArea(QtWidgets.QWidget):
cw.set_cursor_visible(self._cursor_visible)
class WaveformProxyClient:
def __init__(self, state, loop):
self._state = state
self._loop = loop
self.devices_sub = None
self.rpc_client = AsyncioClient()
self.proxy_receiver = None
self._proxy_addr = None
self._proxy_port = None
self._proxy_port_ctl = None
self._on_sub_reconnect = asyncio.Event()
self._on_rpc_reconnect = asyncio.Event()
self._reconnect_rpc_task = None
self._reconnect_receiver_task = None
async def trigger_proxy_task(self):
try:
if self.rpc_client.get_rpc_id()[0] is None:
raise AttributeError("Unable to identify RPC target. Is analyzer proxy connected?")
asyncio.ensure_future(self.rpc_client.trigger())
except Exception as e:
logger.warning("Failed to pull from device: %s", e)
def update_address(self, addr, port, port_control):
self._proxy_addr = addr
self._proxy_port = port
self._proxy_port_ctl = port_control
self._on_rpc_reconnect.set()
self._on_sub_reconnect.set()
# Proxy client connections
async def start(self, server, port):
try:
await self.devices_sub.connect(server, port)
self._reconnect_rpc_task = asyncio.ensure_future(
self.reconnect_rpc(), loop=self._loop)
self._reconnect_receiver_task = asyncio.ensure_future(
self.reconnect_receiver(), loop=self._loop)
except Exception as e:
logger.error("Failed to connect to master: %s", e)
async def reconnect_rpc(self):
try:
while True:
await self._on_rpc_reconnect.wait()
self._on_rpc_reconnect.clear()
logger.info("Attempting analyzer proxy RPC connection...")
try:
await self.rpc_client.connect_rpc(self._proxy_addr,
self._proxy_port_ctl,
"coreanalyzer_proxy_control")
except Exception:
logger.info("Analyzer proxy RPC timed out, trying again...")
await asyncio.sleep(5)
self._on_rpc_reconnect.set()
else:
logger.info("RPC connected to analyzer proxy on %s/%s",
self._proxy_addr, self._proxy_port_ctl)
except asyncio.CancelledError:
pass
async def reconnect_receiver(self):
try:
while True:
await self._on_sub_reconnect.wait()
self._on_sub_reconnect.clear()
logger.info("Setting up analyzer proxy receiver...")
try:
await self.proxy_receiver.connect(
self._proxy_addr, self._proxy_port)
except Exception:
logger.info("Failed to set up analyzer proxy receiver, reconnecting...")
await asyncio.sleep(5)
self._on_sub_reconnect.set()
else:
logger.info("Receiving from analyzer proxy on %s:%s",
self._proxy_addr, self._proxy_port)
except asyncio.CancelledError:
pass
async def stop(self):
try:
self._reconnect_rpc_task.cancel()
self._reconnect_receiver_task.cancel()
await asyncio.wait_for(self._reconnect_rpc_task, None)
await asyncio.wait_for(self._reconnect_receiver_task, None)
await self.devices_sub.close()
self.rpc_client.close_rpc()
await self.proxy_receiver.close()
except Exception as e:
logger.error("Error occurred while closing proxy connections: %s",
e, exc_info=True)
# TODO: move elsewhere -> this can be a late addition
class _CursorTimeControl(QtWidgets.QLineEdit):
submit = QtCore.pyqtSignal(float)
PRECISION = 15
@ -663,6 +573,80 @@ class _CursorTimeControl(QtWidgets.QLineEdit):
self.clearFocus()
class _BaseProxyClient:
def __init__(self):
self.addr = None
self.port = None
self._reconnect_event = asyncio.Event()
self._reconnect_task = None
async def start(self):
self._reconnect_task = asyncio.ensure_future(
exc_to_warning(self._reconnect()))
def update_address(self, addr, port):
self.addr = addr
self.port = port
self._reconnect_event.set()
async def _reconnect(self):
try:
while True:
await self._reconnect_event.wait()
self._reconnect_event.clear()
try:
await self.reconnect_cr()
except Exception:
await asyncio.sleep(5)
self._reconnect_event.set()
except asyncio.CancelledError:
pass
async def close(self):
try:
self._reconnect_task.cancel()
await asyncio.wait_for(self._reconnect_task, None)
await self.disconnect_cr()
except:
logger.error("Error caught while closing proxy client.", exc_info=True)
async def reconnect_cr(self):
raise NotImplementedError
async def disconnect_cr(self):
raise NotImplementedError
class RPCProxyClient(_BaseProxyClient):
def __init__(self):
_BaseProxyClient.__init__(self)
self.client = AsyncioClient()
async def trigger_proxy_task(self):
if self.client.get_rpc_id()[0] is None:
raise AttributeError("Unable to identify RPC target. Is analyzer proxy connected?")
await self.client.trigger()
async def reconnect_cr(self):
await self.client.connect_rpc(self.addr,
self.port,
"coreanalyzer_proxy_control")
async def disconnect_cr(self):
self.client.close_rpc()
class ReceiverProxyClient(_BaseProxyClient):
def __init__(self, receiver):
_BaseProxyClient.__init__(self)
self.receiver = receiver
async def reconnect_cr(self):
await self.receiver.connect(self.addr, self.port)
async def disconnect_cr(self):
await self.receiver.close()
class WaveformDock(QtWidgets.QDockWidget):
traceDataChanged = QtCore.pyqtSignal()
@ -685,14 +669,12 @@ class WaveformDock(QtWidgets.QDockWidget):
}
self._current_dir = os.getcwd()
self.proxy_client = WaveformProxyClient(self._state, loop)
devices_sub = Subscriber("devices", self.init_ddb, self.update_ddb)
proxy_receiver = comm_analyzer.AnalyzerProxyReceiver(
self.devices_sub = Subscriber("devices", self.init_ddb, self.update_ddb)
self.rpc_client = RPCProxyClient()
receiver = comm_analyzer.AnalyzerProxyReceiver(
self.on_dump_receive)
self.proxy_client.devices_sub = devices_sub
self.proxy_client.proxy_receiver = proxy_receiver
self.receiver_client = ReceiverProxyClient(receiver)
grid = LayoutWidget()
self.setWidget(grid)
@ -710,7 +692,7 @@ class WaveformDock(QtWidgets.QDockWidget):
QtWidgets.QStyle.SP_BrowserReload))
grid.addWidget(self._request_dump_btn, 0, 1)
self._request_dump_btn.clicked.connect(
lambda: asyncio.ensure_future(self.proxy_client.trigger_proxy_task()))
lambda: asyncio.ensure_future(self.rpc_client.trigger_proxy_task()))
self._waveform_area = WaveformArea(self, self._state,
self._channel_model)
@ -748,7 +730,7 @@ class WaveformDock(QtWidgets.QDockWidget):
def _update_log_channels(self):
for log in self._state['logs']:
self._channel_model[log] = (0, "log")
self._channel_model[log] = (0, WaveformType.LOG)
def on_dump_receive(self, data):
decoded_dump = comm_analyzer.decode_dump(data)
@ -858,7 +840,8 @@ class WaveformDock(QtWidgets.QDockWidget):
addr = desc["host"]
port = desc.get("port_proxy", 1385)
port_control = desc.get("port_proxy_control", 1386)
self.proxy_client.update_address(addr, port, port_control)
self.rpc_client.update_address(addr, port_control)
self.receiver_client.update_address(addr, port)
def init_ddb(self, ddb):
logger.info("init ddb")