dashboard: fix device subscriber connections

This commit is contained in:
Simon Renblad 2024-02-28 17:04:51 +08:00 committed by Sébastien Bourdeauducq
parent 692572a3b9
commit e56331248e
4 changed files with 54 additions and 54 deletions

View File

@ -151,8 +151,6 @@ class AnalyzerProxyReceiver:
def __init__(self, receive_cb, disconnect_cb=None): def __init__(self, receive_cb, disconnect_cb=None):
self.receive_cb = receive_cb self.receive_cb = receive_cb
self.disconnect_cb = disconnect_cb self.disconnect_cb = disconnect_cb
self.receive_task = None
self.writer = None
async def connect(self, host, port): async def connect(self, host, port):
self.reader, self.writer = \ self.reader, self.writer = \
@ -162,17 +160,20 @@ class AnalyzerProxyReceiver:
assert line == ANALYZER_MAGIC assert line == ANALYZER_MAGIC
self.receive_task = asyncio.create_task(self._receive_cr()) self.receive_task = asyncio.create_task(self._receive_cr())
except: except:
if self.writer is not None:
self.writer.close() self.writer.close()
del self.reader del self.reader
del self.writer del self.writer
raise raise
def close(self): async def close(self):
self.disconnect_cb = None self.disconnect_cb = None
if self.receive_task is not None: try:
self.receive_task.cancel() self.receive_task.cancel()
if self.writer is not None: try:
await self.receive_task
except asyncio.CancelledError:
pass
finally:
self.writer.close() self.writer.close()
del self.reader del self.reader
del self.writer del self.writer
@ -200,6 +201,8 @@ class AnalyzerProxyReceiver:
remaining_data = await self.reader.readexactly(payload_length + 11) remaining_data = await self.reader.readexactly(payload_length + 11)
data = endian_byte + payload_length_word + remaining_data data = endian_byte + payload_length_word + remaining_data
self.receive_cb(data) self.receive_cb(data)
except Exception:
logger.error("analyzer receiver connection terminating with exception", exc_info=True)
finally: finally:
if self.disconnect_cb is not None: if self.disconnect_cb is not None:
self.disconnect_cb() self.disconnect_cb()

View File

@ -5,8 +5,6 @@ from collections import namedtuple
from PyQt5 import QtCore, QtWidgets, QtGui from PyQt5 import QtCore, QtWidgets, QtGui
from sipyco.sync_struct import Subscriber
from artiq.coredevice.comm_moninj import * from artiq.coredevice.comm_moninj import *
from artiq.coredevice.ad9910 import ( from artiq.coredevice.ad9910 import (
_AD9910_REG_PROFILE0, _AD9910_REG_PROFILE7, _AD9910_REG_PROFILE0, _AD9910_REG_PROFILE7,
@ -458,9 +456,8 @@ class _DeviceManager:
def init_ddb(self, ddb): def init_ddb(self, ddb):
self.ddb = ddb self.ddb = ddb
return ddb
def notify(self, mod): def notify_ddb(self, mod):
mi_addr, mi_port, description = setup_from_ddb(self.ddb) mi_addr, mi_port, description = setup_from_ddb(self.ddb)
if (mi_addr, mi_port) != (self.mi_addr, self.mi_port): if (mi_addr, mi_port) != (self.mi_addr, self.mi_port):
@ -786,12 +783,6 @@ class MonInj:
self.dm.dac_cb = lambda: self.dac_dock.layout_widgets( self.dm.dac_cb = lambda: self.dac_dock.layout_widgets(
self.dm.dac_widgets.values()) self.dm.dac_widgets.values())
self.subscriber = Subscriber("devices", self.dm.init_ddb, self.dm.notify)
async def start(self, server, port):
await self.subscriber.connect(server, port)
async def stop(self): async def stop(self):
await self.subscriber.close()
if self.dm is not None: if self.dm is not None:
await self.dm.close() await self.dm.close()

View File

@ -10,7 +10,6 @@ from PyQt5 import QtCore, QtWidgets, QtGui
import pyqtgraph as pg import pyqtgraph as pg
import numpy as np import numpy as np
from sipyco.sync_struct import Subscriber
from sipyco.pc_rpc import AsyncioClient from sipyco.pc_rpc import AsyncioClient
from sipyco import pyon from sipyco import pyon
@ -66,23 +65,23 @@ class ProxyClient():
remote.close_rpc() remote.close_rpc()
async def _reconnect(self): async def _reconnect(self):
try:
while True: while True:
await self._reconnect_event.wait() await self._reconnect_event.wait()
self._reconnect_event.clear() self._reconnect_event.clear()
if self.receiver is not None: if self.receiver is not None:
self.receiver.close() await self.receiver.close()
self.receiver = None self.receiver = None
self.receiver = comm_analyzer.AnalyzerProxyReceiver( new_receiver = comm_analyzer.AnalyzerProxyReceiver(
self.receive_cb, self.reconnect) self.receive_cb, self.reconnect)
try: try:
if self.addr is not None: if self.addr is not None:
await asyncio.wait_for(self.receiver.connect(self.addr, self.port_proxy), await asyncio.wait_for(new_receiver.connect(self.addr, self.port_proxy),
self.timeout) self.timeout)
logger.info("connected to analyzer proxy %s:%d", self.addr, self.port_proxy) logger.info("connected to analyzer proxy %s:%d", self.addr, self.port_proxy)
self.timer_cur = self.timer self.timer_cur = self.timer
self.receiver = new_receiver
continue continue
except: except Exception:
logger.error("error connecting to analyzer proxy", exc_info=True) logger.error("error connecting to analyzer proxy", exc_info=True)
try: try:
await asyncio.wait_for(self._reconnect_event.wait(), self.timer_cur) await asyncio.wait_for(self._reconnect_event.wait(), self.timer_cur)
@ -91,8 +90,6 @@ class ProxyClient():
self._reconnect_event.set() self._reconnect_event.set()
else: else:
self.timer_cur = self.timer self.timer_cur = self.timer
except asyncio.CancelledError:
pass
async def close(self): async def close(self):
self._reconnect_task.cancel() self._reconnect_task.cancel()
@ -101,7 +98,7 @@ class ProxyClient():
except asyncio.CancelledError: except asyncio.CancelledError:
pass pass
if self.receiver is not None: if self.receiver is not None:
self.receiver.close() await self.receiver.close()
def reconnect(self): def reconnect(self):
self._reconnect_event.set() self._reconnect_event.set()
@ -715,7 +712,6 @@ class WaveformDock(QtWidgets.QDockWidget):
self._current_dir = os.getcwd() self._current_dir = os.getcwd()
self.devices_sub = Subscriber("devices", self.init_ddb, self.update_ddb)
self.proxy_client = ProxyClient(self.on_dump_receive, self.proxy_client = ProxyClient(self.on_dump_receive,
timeout, timeout,
timer, timer,
@ -909,5 +905,9 @@ class WaveformDock(QtWidgets.QDockWidget):
self._process_ddb() self._process_ddb()
return ddb return ddb
def update_ddb(self, mod): def notify_ddb(self, mod):
self._process_ddb() self._process_ddb()
async def stop(self):
if self.proxy_client is not None:
await self.proxy_client.close()

View File

@ -15,6 +15,7 @@ from sipyco.pc_rpc import AsyncioClient, Client
from sipyco.broadcast import Receiver from sipyco.broadcast import Receiver
from sipyco import common_args from sipyco import common_args
from sipyco.asyncio_tools import atexit_register_coroutine from sipyco.asyncio_tools import atexit_register_coroutine
from sipyco.sync_struct import Subscriber
from artiq import __artiq_dir__ as artiq_dir, __version__ as artiq_version from artiq import __artiq_dir__ as artiq_dir, __version__ as artiq_version
from artiq.tools import get_user_config_dir from artiq.tools import get_user_config_dir
@ -226,7 +227,6 @@ def main():
broadcast_clients["ccb"].notify_cbs.append(d_applets.ccb_notify) broadcast_clients["ccb"].notify_cbs.append(d_applets.ccb_notify)
d_ttl_dds = moninj.MonInj(rpc_clients["schedule"]) d_ttl_dds = moninj.MonInj(rpc_clients["schedule"])
loop.run_until_complete(d_ttl_dds.start(args.server, args.port_notify))
atexit_register_coroutine(d_ttl_dds.stop, loop=loop) atexit_register_coroutine(d_ttl_dds.stop, loop=loop)
d_waveform = waveform.WaveformDock( d_waveform = waveform.WaveformDock(
@ -234,9 +234,15 @@ def main():
args.analyzer_proxy_timer, args.analyzer_proxy_timer,
args.analyzer_proxy_timer_backoff args.analyzer_proxy_timer_backoff
) )
atexit_register_coroutine(d_waveform.proxy_client.close, loop=loop) atexit_register_coroutine(d_waveform.stop, loop=loop)
loop.run_until_complete(d_waveform.devices_sub.connect(args.server, args.port_notify))
atexit_register_coroutine(d_waveform.devices_sub.close, loop=loop) def init_cbs(ddb):
d_ttl_dds.dm.init_ddb(ddb)
d_waveform.init_ddb(ddb)
return ddb
devices_sub = Subscriber("devices", init_cbs, [d_ttl_dds.dm.notify_ddb, d_waveform.notify_ddb])
loop.run_until_complete(devices_sub.connect(args.server, args.port_notify))
atexit_register_coroutine(devices_sub.close, loop=loop)
d_schedule = schedule.ScheduleDock( d_schedule = schedule.ScheduleDock(
rpc_clients["schedule"], sub_clients["schedule"]) rpc_clients["schedule"], sub_clients["schedule"])