diff --git a/artiq/coredevice/comm_analyzer.py b/artiq/coredevice/comm_analyzer.py index ea4e8757a..9fad6666a 100644 --- a/artiq/coredevice/comm_analyzer.py +++ b/artiq/coredevice/comm_analyzer.py @@ -151,8 +151,6 @@ class AnalyzerProxyReceiver: def __init__(self, receive_cb, disconnect_cb=None): self.receive_cb = receive_cb self.disconnect_cb = disconnect_cb - self.receive_task = None - self.writer = None async def connect(self, host, port): self.reader, self.writer = \ @@ -162,17 +160,20 @@ class AnalyzerProxyReceiver: assert line == ANALYZER_MAGIC self.receive_task = asyncio.create_task(self._receive_cr()) except: - if self.writer is not None: - self.writer.close() - del self.reader - del self.writer + self.writer.close() + del self.reader + del self.writer raise - def close(self): + async def close(self): self.disconnect_cb = None - if self.receive_task is not None: + try: self.receive_task.cancel() - if self.writer is not None: + try: + await self.receive_task + except asyncio.CancelledError: + pass + finally: self.writer.close() del self.reader del self.writer @@ -200,6 +201,8 @@ class AnalyzerProxyReceiver: remaining_data = await self.reader.readexactly(payload_length + 11) data = endian_byte + payload_length_word + remaining_data self.receive_cb(data) + except Exception: + logger.error("analyzer receiver connection terminating with exception", exc_info=True) finally: if self.disconnect_cb is not None: self.disconnect_cb() diff --git a/artiq/dashboard/moninj.py b/artiq/dashboard/moninj.py index 3ee97b897..4810f4d34 100644 --- a/artiq/dashboard/moninj.py +++ b/artiq/dashboard/moninj.py @@ -5,8 +5,6 @@ from collections import namedtuple from PyQt5 import QtCore, QtWidgets, QtGui -from sipyco.sync_struct import Subscriber - from artiq.coredevice.comm_moninj import * from artiq.coredevice.ad9910 import ( _AD9910_REG_PROFILE0, _AD9910_REG_PROFILE7, @@ -458,9 +456,8 @@ class _DeviceManager: def init_ddb(self, ddb): self.ddb = ddb - return ddb - def notify(self, mod): + def notify_ddb(self, mod): mi_addr, mi_port, description = setup_from_ddb(self.ddb) if (mi_addr, mi_port) != (self.mi_addr, self.mi_port): @@ -786,12 +783,6 @@ class MonInj: self.dm.dac_cb = lambda: self.dac_dock.layout_widgets( self.dm.dac_widgets.values()) - self.subscriber = Subscriber("devices", self.dm.init_ddb, self.dm.notify) - - async def start(self, server, port): - await self.subscriber.connect(server, port) - async def stop(self): - await self.subscriber.close() if self.dm is not None: await self.dm.close() diff --git a/artiq/dashboard/waveform.py b/artiq/dashboard/waveform.py index 718321f4a..a0eda50a3 100644 --- a/artiq/dashboard/waveform.py +++ b/artiq/dashboard/waveform.py @@ -10,7 +10,6 @@ from PyQt5 import QtCore, QtWidgets, QtGui import pyqtgraph as pg import numpy as np -from sipyco.sync_struct import Subscriber from sipyco.pc_rpc import AsyncioClient from sipyco import pyon @@ -66,33 +65,31 @@ class ProxyClient(): remote.close_rpc() async def _reconnect(self): - try: - while True: - await self._reconnect_event.wait() - self._reconnect_event.clear() - if self.receiver is not None: - self.receiver.close() - self.receiver = None - self.receiver = comm_analyzer.AnalyzerProxyReceiver( - self.receive_cb, self.reconnect) - try: - if self.addr is not None: - await asyncio.wait_for(self.receiver.connect(self.addr, self.port_proxy), - self.timeout) - logger.info("connected to analyzer proxy %s:%d", self.addr, self.port_proxy) + while True: + await self._reconnect_event.wait() + self._reconnect_event.clear() + if self.receiver is not None: + await self.receiver.close() + self.receiver = None + new_receiver = comm_analyzer.AnalyzerProxyReceiver( + self.receive_cb, self.reconnect) + try: + if self.addr is not None: + await asyncio.wait_for(new_receiver.connect(self.addr, self.port_proxy), + self.timeout) + logger.info("connected to analyzer proxy %s:%d", self.addr, self.port_proxy) self.timer_cur = self.timer - continue - except: - logger.error("error connecting to analyzer proxy", exc_info=True) - try: - await asyncio.wait_for(self._reconnect_event.wait(), self.timer_cur) - except asyncio.TimeoutError: - self.timer_cur *= self.timer_backoff - self._reconnect_event.set() - else: - self.timer_cur = self.timer - except asyncio.CancelledError: - pass + self.receiver = new_receiver + continue + except Exception: + logger.error("error connecting to analyzer proxy", exc_info=True) + try: + await asyncio.wait_for(self._reconnect_event.wait(), self.timer_cur) + except asyncio.TimeoutError: + self.timer_cur *= self.timer_backoff + self._reconnect_event.set() + else: + self.timer_cur = self.timer async def close(self): self._reconnect_task.cancel() @@ -101,7 +98,7 @@ class ProxyClient(): except asyncio.CancelledError: pass if self.receiver is not None: - self.receiver.close() + await self.receiver.close() def reconnect(self): self._reconnect_event.set() @@ -715,8 +712,7 @@ class WaveformDock(QtWidgets.QDockWidget): self._current_dir = os.getcwd() - self.devices_sub = Subscriber("devices", self.init_ddb, self.update_ddb) - self.proxy_client = ProxyClient(self.on_dump_receive, + self.proxy_client = ProxyClient(self.on_dump_receive, timeout, timer, timer_backoff) @@ -909,5 +905,9 @@ class WaveformDock(QtWidgets.QDockWidget): self._process_ddb() return ddb - def update_ddb(self, mod): + def notify_ddb(self, mod): self._process_ddb() + + async def stop(self): + if self.proxy_client is not None: + await self.proxy_client.close() diff --git a/artiq/frontend/artiq_dashboard.py b/artiq/frontend/artiq_dashboard.py index e9c6269d3..05bc5e531 100755 --- a/artiq/frontend/artiq_dashboard.py +++ b/artiq/frontend/artiq_dashboard.py @@ -15,6 +15,7 @@ from sipyco.pc_rpc import AsyncioClient, Client from sipyco.broadcast import Receiver from sipyco import common_args from sipyco.asyncio_tools import atexit_register_coroutine +from sipyco.sync_struct import Subscriber from artiq import __artiq_dir__ as artiq_dir, __version__ as artiq_version from artiq.tools import get_user_config_dir @@ -226,7 +227,6 @@ def main(): broadcast_clients["ccb"].notify_cbs.append(d_applets.ccb_notify) d_ttl_dds = moninj.MonInj(rpc_clients["schedule"]) - loop.run_until_complete(d_ttl_dds.start(args.server, args.port_notify)) atexit_register_coroutine(d_ttl_dds.stop, loop=loop) d_waveform = waveform.WaveformDock( @@ -234,9 +234,15 @@ def main(): args.analyzer_proxy_timer, args.analyzer_proxy_timer_backoff ) - atexit_register_coroutine(d_waveform.proxy_client.close, loop=loop) - loop.run_until_complete(d_waveform.devices_sub.connect(args.server, args.port_notify)) - atexit_register_coroutine(d_waveform.devices_sub.close, loop=loop) + atexit_register_coroutine(d_waveform.stop, loop=loop) + + def init_cbs(ddb): + d_ttl_dds.dm.init_ddb(ddb) + d_waveform.init_ddb(ddb) + return ddb + devices_sub = Subscriber("devices", init_cbs, [d_ttl_dds.dm.notify_ddb, d_waveform.notify_ddb]) + loop.run_until_complete(devices_sub.connect(args.server, args.port_notify)) + atexit_register_coroutine(devices_sub.close, loop=loop) d_schedule = schedule.ScheduleDock( rpc_clients["schedule"], sub_clients["schedule"])