waveform: add WaveformProxyClient

This commit is contained in:
Simon Renblad 2024-01-09 15:44:16 +08:00
parent 6552a860d9
commit a6335a5206

View File

@ -1,6 +1,8 @@
from PyQt5 import QtCore, QtWidgets, QtGui
from PyQt5.QtCore import Qt
from sipyco.pc_rpc import AsyncioClient
from artiq.tools import exc_to_warning
from artiq.gui.models import DictSyncTreeSepModel, LocalModelManager
from artiq.gui.dndwidgets import DragDropSplitter, VDragScrollArea
@ -524,3 +526,99 @@ class WaveformArea(QtWidgets.QWidget):
for i in range(self._splitter.count()):
cw = self._splitter.widget(i)
cw.set_cursor_visible(self._cursor_visible)
class WaveformProxyClient:
def __init__(self, state, loop):
self._state = state
self._loop = loop
self.devices_sub = None
self.rpc_client = AsyncioClient()
self.proxy_receiver = None
self._proxy_addr = None
self._proxy_port = None
self._proxy_port_ctl = None
self._on_sub_reconnect = asyncio.Event()
self._on_rpc_reconnect = asyncio.Event()
self._reconnect_rpc_task = None
self._reconnect_receiver_task = None
async def trigger_proxy_task(self):
try:
if self.rpc_client.get_rpc_id()[0] is None:
raise AttributeError("Unable to identify RPC target. Is analyzer proxy connected?")
asyncio.ensure_future(self.rpc_client.trigger())
except Exception as e:
logger.warning("Failed to pull from device: %s", e)
def update_address(self, addr, port, port_control):
self._proxy_addr = addr
self._proxy_port = port
self._proxy_port_ctl = port_control
self._on_rpc_reconnect.set()
self._on_sub_reconnect.set()
# Proxy client connections
async def start(self, server, port):
try:
await self.devices_sub.connect(server, port)
self._reconnect_rpc_task = asyncio.ensure_future(
self.reconnect_rpc(), loop=self._loop)
self._reconnect_receiver_task = asyncio.ensure_future(
self.reconnect_receiver(), loop=self._loop)
except Exception as e:
logger.error("Failed to connect to master: %s", e)
async def reconnect_rpc(self):
try:
while True:
await self._on_rpc_reconnect.wait()
self._on_rpc_reconnect.clear()
logger.info("Attempting analyzer proxy RPC connection...")
try:
await self.rpc_client.connect_rpc(self._proxy_addr,
self._proxy_port_ctl,
"coreanalyzer_proxy_control")
except Exception:
logger.info("Analyzer proxy RPC timed out, trying again...")
await asyncio.sleep(5)
self._on_rpc_reconnect.set()
else:
logger.info("RPC connected to analyzer proxy on %s/%s",
self._proxy_addr, self._proxy_port_ctl)
except asyncio.CancelledError:
pass
async def reconnect_receiver(self):
try:
while True:
await self._on_sub_reconnect.wait()
self._on_sub_reconnect.clear()
logger.info("Setting up analyzer proxy receiver...")
try:
await self.proxy_receiver.connect(
self._proxy_addr, self._proxy_port)
except Exception:
logger.info("Failed to set up analyzer proxy receiver, reconnecting...")
await asyncio.sleep(5)
self._on_sub_reconnect.set()
else:
logger.info("Receiving from analyzer proxy on %s:%s",
self._proxy_addr, self._proxy_port)
except asyncio.CancelledError:
pass
async def stop(self):
try:
self._reconnect_rpc_task.cancel()
self._reconnect_receiver_task.cancel()
await asyncio.wait_for(self._reconnect_rpc_task, None)
await asyncio.wait_for(self._reconnect_receiver_task, None)
await self.devices_sub.close()
self.rpc_client.close_rpc()
await self.proxy_receiver.close()
except Exception as e:
logger.error("Error occurred while closing proxy connections: %s",
e, exc_info=True)