forked from M-Labs/artiq
1
0
Fork 0

dashboard fix moninj, analyzer clients

This commit is contained in:
Simon Renblad 2024-02-27 10:37:00 +08:00 committed by Sébastien Bourdeauducq
parent c151f0c3ce
commit a21805598a
3 changed files with 26 additions and 23 deletions

View File

@ -168,7 +168,7 @@ class AnalyzerProxyReceiver:
del self.writer del self.writer
raise raise
async def close(self): def close(self):
self.disconnect_cb = None self.disconnect_cb = None
if self.receive_task is not None: if self.receive_task is not None:
self.receive_task.cancel() self.receive_task.cancel()

View File

@ -29,22 +29,18 @@ WAVEFORM_MAX_HEIGHT = 200
class ProxyClient(): class ProxyClient():
def __init__(self, receive_cb, disconnect_cb): def __init__(self, receive_cb, timeout=5, timer=5, timer_backoff=1.1):
self.receive_cb = receive_cb self.receive_cb = receive_cb
self.disconnect_cb = disconnect_cb
self.receiver = None self.receiver = None
self.addr = None self.addr = None
self.port_proxy = None self.port_proxy = None
self.port = None self.port = None
self._reconnect_event = asyncio.Event() self._reconnect_event = asyncio.Event()
self._reconnect_task = None
async def start(self, timeout=5, timer=5, timer_backoff=1.1):
self.timeout = timeout self.timeout = timeout
self.timer = timer self.timer = timer
self.timer_cur = timer self.timer_cur = timer
self.timer_backoff = timer_backoff self.timer_backoff = timer_backoff
self._reconnect_task = asyncio.create_task(self._reconnect()) self._reconnect_task = asyncio.ensure_future(self._reconnect())
def update_address(self, addr, port, port_proxy): def update_address(self, addr, port, port_proxy):
self.addr = addr self.addr = addr
@ -56,6 +52,9 @@ class ProxyClient():
remote = AsyncioClient() remote = AsyncioClient()
try: try:
try: try:
if self.addr is None:
logger.error("missing core_analyzer host in device db")
return
await remote.connect_rpc(self.addr, self.port, "coreanalyzer_proxy_control") await remote.connect_rpc(self.addr, self.port, "coreanalyzer_proxy_control")
except: except:
logger.error("error connecting to analyzer proxy control", exc_info=True) logger.error("error connecting to analyzer proxy control", exc_info=True)
@ -72,14 +71,15 @@ class ProxyClient():
await self._reconnect_event.wait() await self._reconnect_event.wait()
self._reconnect_event.clear() self._reconnect_event.clear()
if self.receiver is not None: if self.receiver is not None:
await self.receiver.close() self.receiver.close()
self.receiver = None self.receiver = None
self.receiver = comm_analyzer.AnalyzerProxyReceiver( self.receiver = comm_analyzer.AnalyzerProxyReceiver(
self.receive_cb, self.disconnect_cb) self.receive_cb, self.reconnect)
try: try:
await asyncio.wait_for(self.receiver.connect(self.addr, self.port_proxy), if self.addr is not None:
self.timeout) await asyncio.wait_for(self.receiver.connect(self.addr, self.port_proxy),
logger.info("connected to analyzer proxy %s:%d", self.addr, self.port_proxy) self.timeout)
logger.info("connected to analyzer proxy %s:%d", self.addr, self.port_proxy)
self.timer_cur = self.timer self.timer_cur = self.timer
continue continue
except: except:
@ -97,9 +97,11 @@ class ProxyClient():
async def close(self): async def close(self):
self._reconnect_task.cancel() self._reconnect_task.cancel()
try: try:
await self.receiver.close() await asyncio.wait_for(self._reconnect_task, None)
except: except asyncio.CancelledError:
logger.error("error closing connection to analyzer proxy", exc_info=True) pass
if self.receiver is not None:
self.receiver.close()
def reconnect(self): def reconnect(self):
self._reconnect_event.set() self._reconnect_event.set()
@ -692,7 +694,7 @@ class _AddChannelDialog(QtWidgets.QDialog):
class WaveformDock(QtWidgets.QDockWidget): class WaveformDock(QtWidgets.QDockWidget):
def __init__(self): def __init__(self, timeout, timer, timer_backoff):
QtWidgets.QDockWidget.__init__(self, "Waveform") QtWidgets.QDockWidget.__init__(self, "Waveform")
self.setObjectName("Waveform") self.setObjectName("Waveform")
self.setFeatures( self.setFeatures(
@ -714,7 +716,10 @@ class WaveformDock(QtWidgets.QDockWidget):
self._current_dir = os.getcwd() self._current_dir = os.getcwd()
self.devices_sub = Subscriber("devices", self.init_ddb, self.update_ddb) self.devices_sub = Subscriber("devices", self.init_ddb, self.update_ddb)
self.proxy_client = ProxyClient(self.on_dump_receive, self.on_proxy_disconnect) self.proxy_client = ProxyClient(self.on_dump_receive,
timeout,
timer,
timer_backoff)
grid = LayoutWidget() grid = LayoutWidget()
self.setWidget(grid) self.setWidget(grid)
@ -795,9 +800,6 @@ class WaveformDock(QtWidgets.QDockWidget):
self._waveform_view.setTimescale(self._waveform_data['timescale']) self._waveform_view.setTimescale(self._waveform_data['timescale'])
self._cursor_control.setTimescale(self._waveform_data['timescale']) self._cursor_control.setTimescale(self._waveform_data['timescale'])
def on_proxy_disconnect(self):
self.proxy_client.reconnect()
async def load_trace(self): async def load_trace(self):
try: try:
filename = await get_open_file_name( filename = await get_open_file_name(
@ -899,6 +901,8 @@ class WaveformDock(QtWidgets.QDockWidget):
port_proxy = desc.get("port_proxy", 1385) port_proxy = desc.get("port_proxy", 1385)
port = desc.get("port", 1386) port = desc.get("port", 1386)
self.proxy_client.update_address(addr, port, port_proxy) self.proxy_client.update_address(addr, port, port_proxy)
else:
self.proxy_client.update_address(None, None, None)
def init_ddb(self, ddb): def init_ddb(self, ddb):
self._ddb = ddb self._ddb = ddb

View File

@ -229,12 +229,11 @@ def main():
loop.run_until_complete(d_ttl_dds.start(args.server, args.port_notify)) loop.run_until_complete(d_ttl_dds.start(args.server, args.port_notify))
atexit_register_coroutine(d_ttl_dds.stop, loop=loop) atexit_register_coroutine(d_ttl_dds.stop, loop=loop)
d_waveform = waveform.WaveformDock() d_waveform = waveform.WaveformDock(
loop.run_until_complete(d_waveform.proxy_client.start(
args.analyzer_proxy_timeout, args.analyzer_proxy_timeout,
args.analyzer_proxy_timer, args.analyzer_proxy_timer,
args.analyzer_proxy_timer_backoff args.analyzer_proxy_timer_backoff
)) )
atexit_register_coroutine(d_waveform.proxy_client.close, loop=loop) atexit_register_coroutine(d_waveform.proxy_client.close, loop=loop)
loop.run_until_complete(d_waveform.devices_sub.connect(args.server, args.port_notify)) loop.run_until_complete(d_waveform.devices_sub.connect(args.server, args.port_notify))
atexit_register_coroutine(d_waveform.devices_sub.close, loop=loop) atexit_register_coroutine(d_waveform.devices_sub.close, loop=loop)