mirror of https://github.com/m-labs/artiq.git
waveform: add proxy clients
This commit is contained in:
parent
9088ffa2ca
commit
73be2257d3
|
@ -6,6 +6,7 @@ from PyQt5 import QtCore, QtWidgets, QtGui
|
||||||
from PyQt5.QtCore import Qt
|
from PyQt5.QtCore import Qt
|
||||||
|
|
||||||
from sipyco.sync_struct import Subscriber
|
from sipyco.sync_struct import Subscriber
|
||||||
|
from sipyco.pc_rpc import AsyncioClient
|
||||||
|
|
||||||
from artiq.tools import exc_to_warning
|
from artiq.tools import exc_to_warning
|
||||||
from artiq.coredevice import comm_analyzer
|
from artiq.coredevice import comm_analyzer
|
||||||
|
@ -17,6 +18,87 @@ from artiq.gui.models import DictSyncTreeSepModel, LocalModelManager
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class _BaseProxyClient:
|
||||||
|
def __init__(self):
|
||||||
|
self.addr = None
|
||||||
|
self.port = None
|
||||||
|
self._reconnect_event = asyncio.Event()
|
||||||
|
self._reconnect_task = None
|
||||||
|
|
||||||
|
async def start(self):
|
||||||
|
self._reconnect_task = asyncio.ensure_future(
|
||||||
|
exc_to_warning(self._reconnect()))
|
||||||
|
|
||||||
|
def update_address(self, addr, port):
|
||||||
|
self.addr = addr
|
||||||
|
self.port = port
|
||||||
|
self._reconnect_event.set()
|
||||||
|
|
||||||
|
async def _reconnect(self):
|
||||||
|
try:
|
||||||
|
while True:
|
||||||
|
await self._reconnect_event.wait()
|
||||||
|
self._reconnect_event.clear()
|
||||||
|
try:
|
||||||
|
await self.disconnect_cr()
|
||||||
|
except:
|
||||||
|
logger.error("Error caught when disconnecting proxy client.", exc_info=True)
|
||||||
|
try:
|
||||||
|
await self.reconnect_cr()
|
||||||
|
except Exception:
|
||||||
|
logger.error(
|
||||||
|
"Error caught when reconnecting proxy client. Retrying...", exc_info=True)
|
||||||
|
await asyncio.sleep(5)
|
||||||
|
self._reconnect_event.set()
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
async def close(self):
|
||||||
|
try:
|
||||||
|
self._reconnect_task.cancel()
|
||||||
|
await asyncio.wait_for(self._reconnect_task, None)
|
||||||
|
await self.disconnect_cr()
|
||||||
|
except:
|
||||||
|
logger.error("Error caught while closing proxy client.", exc_info=True)
|
||||||
|
|
||||||
|
async def reconnect_cr(self):
|
||||||
|
raise NotImplementedError
|
||||||
|
|
||||||
|
async def disconnect_cr(self):
|
||||||
|
raise NotImplementedError
|
||||||
|
|
||||||
|
|
||||||
|
class RPCProxyClient(_BaseProxyClient):
|
||||||
|
def __init__(self):
|
||||||
|
_BaseProxyClient.__init__(self)
|
||||||
|
self.client = AsyncioClient()
|
||||||
|
|
||||||
|
async def trigger_proxy_task(self):
|
||||||
|
if self.client.get_rpc_id()[0] is None:
|
||||||
|
raise AttributeError("Unable to identify RPC target. Is analyzer proxy connected?")
|
||||||
|
await self.client.trigger()
|
||||||
|
|
||||||
|
async def reconnect_cr(self):
|
||||||
|
await self.client.connect_rpc(self.addr,
|
||||||
|
self.port,
|
||||||
|
"coreanalyzer_proxy_control")
|
||||||
|
|
||||||
|
async def disconnect_cr(self):
|
||||||
|
self.client.close_rpc()
|
||||||
|
|
||||||
|
|
||||||
|
class ReceiverProxyClient(_BaseProxyClient):
|
||||||
|
def __init__(self, receiver):
|
||||||
|
_BaseProxyClient.__init__(self)
|
||||||
|
self.receiver = receiver
|
||||||
|
|
||||||
|
async def reconnect_cr(self):
|
||||||
|
await self.receiver.connect(self.addr, self.port)
|
||||||
|
|
||||||
|
async def disconnect_cr(self):
|
||||||
|
await self.receiver.close()
|
||||||
|
|
||||||
|
|
||||||
class Model(DictSyncTreeSepModel):
|
class Model(DictSyncTreeSepModel):
|
||||||
def __init__(self, init):
|
def __init__(self, init):
|
||||||
DictSyncTreeSepModel.__init__(self, "/", ["Channels"], init)
|
DictSyncTreeSepModel.__init__(self, "/", ["Channels"], init)
|
||||||
|
@ -51,10 +133,11 @@ class WaveformDock(QtWidgets.QDockWidget):
|
||||||
|
|
||||||
self._current_dir = os.getcwd()
|
self._current_dir = os.getcwd()
|
||||||
|
|
||||||
devices_sub = Subscriber("devices", self.init_ddb, self.update_ddb)
|
self.devices_sub = Subscriber("devices", self.init_ddb, self.update_ddb)
|
||||||
|
self.rpc_client = RPCProxyClient()
|
||||||
proxy_receiver = comm_analyzer.AnalyzerProxyReceiver(
|
receiver = comm_analyzer.AnalyzerProxyReceiver(
|
||||||
self.on_dump_receive)
|
self.on_dump_receive)
|
||||||
|
self.receiver_client = ReceiverProxyClient(receiver)
|
||||||
|
|
||||||
grid = LayoutWidget()
|
grid = LayoutWidget()
|
||||||
self.setWidget(grid)
|
self.setWidget(grid)
|
||||||
|
@ -70,6 +153,8 @@ class WaveformDock(QtWidgets.QDockWidget):
|
||||||
self._request_dump_btn.setIcon(
|
self._request_dump_btn.setIcon(
|
||||||
QtWidgets.QApplication.style().standardIcon(
|
QtWidgets.QApplication.style().standardIcon(
|
||||||
QtWidgets.QStyle.SP_BrowserReload))
|
QtWidgets.QStyle.SP_BrowserReload))
|
||||||
|
self._request_dump_btn.clicked.connect(
|
||||||
|
lambda: asyncio.ensure_future(exc_to_warning(self.rpc_client.trigger_proxy_task())))
|
||||||
grid.addWidget(self._request_dump_btn, 0, 1)
|
grid.addWidget(self._request_dump_btn, 0, 1)
|
||||||
|
|
||||||
self._add_btn = QtWidgets.QToolButton()
|
self._add_btn = QtWidgets.QToolButton()
|
||||||
|
@ -120,8 +205,10 @@ class WaveformDock(QtWidgets.QDockWidget):
|
||||||
desc = self._ddb.get("core_analyzer")
|
desc = self._ddb.get("core_analyzer")
|
||||||
if desc is not None:
|
if desc is not None:
|
||||||
addr = desc["host"]
|
addr = desc["host"]
|
||||||
port = desc.get("port_proxy", 1385)
|
port_proxy = desc.get("port_proxy", 1385)
|
||||||
port_control = desc.get("port_proxy_control", 1386)
|
port = desc.get("port", 1386)
|
||||||
|
self.receiver_client.update_address(addr, port_proxy)
|
||||||
|
self.rpc_client.update_address(addr, port)
|
||||||
|
|
||||||
def init_ddb(self, ddb):
|
def init_ddb(self, ddb):
|
||||||
self._ddb = ddb
|
self._ddb = ddb
|
||||||
|
|
Loading…
Reference in New Issue