diff --git a/artiq/dashboard/waveform.py b/artiq/dashboard/waveform.py index f97601501..ff796fed4 100644 --- a/artiq/dashboard/waveform.py +++ b/artiq/dashboard/waveform.py @@ -28,97 +28,82 @@ WAVEFORM_MIN_HEIGHT = 50 WAVEFORM_MAX_HEIGHT = 200 -class _BaseProxyClient: - def __init__(self): +class ProxyClient(): + def __init__(self, receive_cb, disconnect_cb): + self.receive_cb = receive_cb + self.disconnect_cb = disconnect_cb + self.receiver = None self.addr = None + self.port_proxy = None self.port = None self._reconnect_event = asyncio.Event() self._reconnect_task = None - async def start(self): + async def start(self, timeout=5, timer=5, timer_backoff=1.1): + self.timeout = timeout + self.timer = timer + self.timer_cur = timer + self.timer_backoff = timer_backoff self._reconnect_task = asyncio.create_task(self._reconnect()) - def update_address(self, addr, port): + def update_address(self, addr, port, port_proxy): self.addr = addr self.port = port - self._reconnect_event.set() + self.port_proxy = port_proxy + self.reconnect() + + async def trigger_proxy_task(self): + remote = AsyncioClient() + try: + try: + await remote.connect_rpc(self.addr, self.port, "coreanalyzer_proxy_control") + except: + logger.error("error connecting to analyzer proxy control", exc_info=True) + return + await remote.trigger() + except: + logger.error("analyzer proxy reported failure", exc_info=True) + finally: + remote.close_rpc() async def _reconnect(self): try: while True: await self._reconnect_event.wait() self._reconnect_event.clear() + if self.receiver is not None: + await self.receiver.close() + self.receiver = None + self.receiver = comm_analyzer.AnalyzerProxyReceiver( + self.receive_cb, self.disconnect_cb) try: - await asyncio.wait_for(self.reconnect_cr(), timeout=5) - except asyncio.CancelledError: - raise + await asyncio.wait_for(self.receiver.connect(self.addr, self.port_proxy), + self.timeout) + logger.info("connected to analyzer proxy %s:%d", self.addr, self.port_proxy) + self.timer_cur = self.timer + continue except: - await asyncio.sleep(5) + logger.error("error connecting to analyzer proxy", exc_info=True) + try: + await asyncio.wait_for(self._reconnect_event.wait(), self.timer_cur) + except asyncio.TimeoutError: + self.timer_cur *= self.timer_backoff self._reconnect_event.set() + else: + self.timer_cur = self.timer except asyncio.CancelledError: pass async def close(self): self._reconnect_task.cancel() - await self.close_cr() - - def reconnect(self): - self._reconnect_event.set() - - async def reconnect_cr(self): - raise NotImplementedError - - async def close_cr(self): - raise NotImplementedError - - -class RPCProxyClient(_BaseProxyClient): - def __init__(self): - _BaseProxyClient.__init__(self) - self.client = AsyncioClient() - - async def trigger_proxy_task(self): - try: - await self.client.trigger() - except: - logger.error("analyzer proxy reported failure", exc_info=True) - - async def reconnect_cr(self): - try: - await self.client.connect_rpc(self.addr, - self.port, - "coreanalyzer_proxy_control") - logger.info("connected to analyzer proxy control %s:%d", self.addr, self.port) - except: - logger.error("error connecting to analyzer proxy control", exc_info=True) - raise - - async def close_cr(self): - try: - self.client.close_rpc() - except: - logger.error("error closing connection with analyzer proxy control", exc_info=True) - - -class ReceiverProxyClient(_BaseProxyClient): - def __init__(self, receiver): - _BaseProxyClient.__init__(self) - self.receiver = receiver - - async def reconnect_cr(self): - try: - await self.receiver.connect(self.addr, self.port) - logger.info("listening to analyzer proxy %s:%d", self.addr, self.port) - except: - logger.error("error connecting to analyzer proxy", exc_info=True) - raise - - async def close_cr(self): try: await self.receiver.close() except: logger.error("error closing connection to analyzer proxy", exc_info=True) + def reconnect(self): + self._reconnect_event.set() + class _BackgroundItem(pg.GraphicsWidgetAnchor, pg.GraphicsWidget): def __init__(self, parent, rect): @@ -733,10 +718,7 @@ class WaveformDock(QtWidgets.QDockWidget): self._current_dir = os.getcwd() self.devices_sub = Subscriber("devices", self.init_ddb, self.update_ddb) - self.rpc_client = RPCProxyClient() - receiver = comm_analyzer.AnalyzerProxyReceiver( - self.on_dump_receive, self.on_proxy_disconnect) - self.receiver_client = ReceiverProxyClient(receiver) + self.proxy_client = ProxyClient(self.on_dump_receive, self.on_proxy_disconnect) grid = LayoutWidget() self.setWidget(grid) @@ -753,7 +735,7 @@ class WaveformDock(QtWidgets.QDockWidget): QtWidgets.QApplication.style().standardIcon( QtWidgets.QStyle.SP_BrowserReload)) self._request_dump_btn.clicked.connect( - lambda: asyncio.ensure_future(exc_to_warning(self.rpc_client.trigger_proxy_task()))) + lambda: asyncio.ensure_future(exc_to_warning(self.proxy_client.trigger_proxy_task()))) grid.addWidget(self._request_dump_btn, 0, 1) self._add_channel_dialog = _AddChannelDialog(self, self._channel_model) @@ -818,8 +800,7 @@ class WaveformDock(QtWidgets.QDockWidget): self._cursor_control.setTimescale(self._waveform_data['timescale']) def on_proxy_disconnect(self): - self.receiver_client.reconnect() - self.rpc_client.reconnect() + self.proxy_client.reconnect() async def load_trace(self): try: @@ -921,8 +902,7 @@ class WaveformDock(QtWidgets.QDockWidget): addr = desc["host"] port_proxy = desc.get("port_proxy", 1385) port = desc.get("port", 1386) - self.receiver_client.update_address(addr, port_proxy) - self.rpc_client.update_address(addr, port) + self.proxy_client.update_address(addr, port, port_proxy) def init_ddb(self, ddb): self._ddb = ddb diff --git a/artiq/frontend/artiq_dashboard.py b/artiq/frontend/artiq_dashboard.py index 17ec0527d..f6a1c4504 100755 --- a/artiq/frontend/artiq_dashboard.py +++ b/artiq/frontend/artiq_dashboard.py @@ -48,6 +48,15 @@ def get_argparser(): parser.add_argument( "-p", "--load-plugin", dest="plugin_modules", action="append", help="Python module to load on startup") + parser.add_argument( + "--analyzer-proxy-timeout", default=5, type=float, + help="connection timeout to core analyzer proxy") + parser.add_argument( + "--analyzer-proxy-timer", default=5, type=float, + help="retry timer to core analyzer proxy") + parser.add_argument( + "--analyzer-proxy-timer-backoff", default=1.1, type=float, + help="retry timer backoff multiplier to core analyzer proxy") common_args.verbosity_args(parser) return parser @@ -221,12 +230,14 @@ def main(): atexit_register_coroutine(d_ttl_dds.stop, loop=loop) d_waveform = waveform.WaveformDock() + loop.run_until_complete(d_waveform.proxy_client.start( + args.analyzer_proxy_timeout, + args.analyzer_proxy_timer, + args.analyzer_proxy_timer_backoff + )) + atexit_register_coroutine(d_waveform.proxy_client.close, loop=loop) loop.run_until_complete(d_waveform.devices_sub.connect(args.server, args.port_notify)) atexit_register_coroutine(d_waveform.devices_sub.close, loop=loop) - for name in ["rpc_client", "receiver_client"]: - client = getattr(d_waveform, name) - loop.run_until_complete(client.start()) - atexit_register_coroutine(client.close, loop=loop) d_schedule = schedule.ScheduleDock( rpc_clients["schedule"], sub_clients["schedule"])