2
0
mirror of https://github.com/m-labs/artiq.git synced 2024-12-26 11:48:27 +08:00

dashboard: change analyzer proxy client

This commit is contained in:
Simon Renblad 2024-02-21 13:37:47 +08:00 committed by Sébastien Bourdeauducq
parent 7c1274f254
commit 0a24d72b9f
2 changed files with 67 additions and 76 deletions

View File

@ -28,97 +28,82 @@ WAVEFORM_MIN_HEIGHT = 50
WAVEFORM_MAX_HEIGHT = 200 WAVEFORM_MAX_HEIGHT = 200
class _BaseProxyClient: class ProxyClient():
def __init__(self): def __init__(self, receive_cb, disconnect_cb):
self.receive_cb = receive_cb
self.disconnect_cb = disconnect_cb
self.receiver = None
self.addr = None self.addr = None
self.port_proxy = None
self.port = None self.port = None
self._reconnect_event = asyncio.Event() self._reconnect_event = asyncio.Event()
self._reconnect_task = None self._reconnect_task = None
async def start(self): async def start(self, timeout=5, timer=5, timer_backoff=1.1):
self.timeout = timeout
self.timer = timer
self.timer_cur = timer
self.timer_backoff = timer_backoff
self._reconnect_task = asyncio.create_task(self._reconnect()) self._reconnect_task = asyncio.create_task(self._reconnect())
def update_address(self, addr, port): def update_address(self, addr, port, port_proxy):
self.addr = addr self.addr = addr
self.port = port self.port = port
self._reconnect_event.set() self.port_proxy = port_proxy
self.reconnect()
async def trigger_proxy_task(self):
remote = AsyncioClient()
try:
try:
await remote.connect_rpc(self.addr, self.port, "coreanalyzer_proxy_control")
except:
logger.error("error connecting to analyzer proxy control", exc_info=True)
return
await remote.trigger()
except:
logger.error("analyzer proxy reported failure", exc_info=True)
finally:
remote.close_rpc()
async def _reconnect(self): async def _reconnect(self):
try: try:
while True: while True:
await self._reconnect_event.wait() await self._reconnect_event.wait()
self._reconnect_event.clear() self._reconnect_event.clear()
if self.receiver is not None:
await self.receiver.close()
self.receiver = None
self.receiver = comm_analyzer.AnalyzerProxyReceiver(
self.receive_cb, self.disconnect_cb)
try: try:
await asyncio.wait_for(self.reconnect_cr(), timeout=5) await asyncio.wait_for(self.receiver.connect(self.addr, self.port_proxy),
except asyncio.CancelledError: self.timeout)
raise logger.info("connected to analyzer proxy %s:%d", self.addr, self.port_proxy)
self.timer_cur = self.timer
continue
except: except:
await asyncio.sleep(5) logger.error("error connecting to analyzer proxy", exc_info=True)
try:
await asyncio.wait_for(self._reconnect_event.wait(), self.timer_cur)
except asyncio.TimeoutError:
self.timer_cur *= self.timer_backoff
self._reconnect_event.set() self._reconnect_event.set()
else:
self.timer_cur = self.timer
except asyncio.CancelledError: except asyncio.CancelledError:
pass pass
async def close(self): async def close(self):
self._reconnect_task.cancel() self._reconnect_task.cancel()
await self.close_cr()
def reconnect(self):
self._reconnect_event.set()
async def reconnect_cr(self):
raise NotImplementedError
async def close_cr(self):
raise NotImplementedError
class RPCProxyClient(_BaseProxyClient):
def __init__(self):
_BaseProxyClient.__init__(self)
self.client = AsyncioClient()
async def trigger_proxy_task(self):
try:
await self.client.trigger()
except:
logger.error("analyzer proxy reported failure", exc_info=True)
async def reconnect_cr(self):
try:
await self.client.connect_rpc(self.addr,
self.port,
"coreanalyzer_proxy_control")
logger.info("connected to analyzer proxy control %s:%d", self.addr, self.port)
except:
logger.error("error connecting to analyzer proxy control", exc_info=True)
raise
async def close_cr(self):
try:
self.client.close_rpc()
except:
logger.error("error closing connection with analyzer proxy control", exc_info=True)
class ReceiverProxyClient(_BaseProxyClient):
def __init__(self, receiver):
_BaseProxyClient.__init__(self)
self.receiver = receiver
async def reconnect_cr(self):
try:
await self.receiver.connect(self.addr, self.port)
logger.info("listening to analyzer proxy %s:%d", self.addr, self.port)
except:
logger.error("error connecting to analyzer proxy", exc_info=True)
raise
async def close_cr(self):
try: try:
await self.receiver.close() await self.receiver.close()
except: except:
logger.error("error closing connection to analyzer proxy", exc_info=True) logger.error("error closing connection to analyzer proxy", exc_info=True)
def reconnect(self):
self._reconnect_event.set()
class _BackgroundItem(pg.GraphicsWidgetAnchor, pg.GraphicsWidget): class _BackgroundItem(pg.GraphicsWidgetAnchor, pg.GraphicsWidget):
def __init__(self, parent, rect): def __init__(self, parent, rect):
@ -733,10 +718,7 @@ class WaveformDock(QtWidgets.QDockWidget):
self._current_dir = os.getcwd() self._current_dir = os.getcwd()
self.devices_sub = Subscriber("devices", self.init_ddb, self.update_ddb) self.devices_sub = Subscriber("devices", self.init_ddb, self.update_ddb)
self.rpc_client = RPCProxyClient() self.proxy_client = ProxyClient(self.on_dump_receive, self.on_proxy_disconnect)
receiver = comm_analyzer.AnalyzerProxyReceiver(
self.on_dump_receive, self.on_proxy_disconnect)
self.receiver_client = ReceiverProxyClient(receiver)
grid = LayoutWidget() grid = LayoutWidget()
self.setWidget(grid) self.setWidget(grid)
@ -753,7 +735,7 @@ class WaveformDock(QtWidgets.QDockWidget):
QtWidgets.QApplication.style().standardIcon( QtWidgets.QApplication.style().standardIcon(
QtWidgets.QStyle.SP_BrowserReload)) QtWidgets.QStyle.SP_BrowserReload))
self._request_dump_btn.clicked.connect( self._request_dump_btn.clicked.connect(
lambda: asyncio.ensure_future(exc_to_warning(self.rpc_client.trigger_proxy_task()))) lambda: asyncio.ensure_future(exc_to_warning(self.proxy_client.trigger_proxy_task())))
grid.addWidget(self._request_dump_btn, 0, 1) grid.addWidget(self._request_dump_btn, 0, 1)
self._add_channel_dialog = _AddChannelDialog(self, self._channel_model) self._add_channel_dialog = _AddChannelDialog(self, self._channel_model)
@ -818,8 +800,7 @@ class WaveformDock(QtWidgets.QDockWidget):
self._cursor_control.setTimescale(self._waveform_data['timescale']) self._cursor_control.setTimescale(self._waveform_data['timescale'])
def on_proxy_disconnect(self): def on_proxy_disconnect(self):
self.receiver_client.reconnect() self.proxy_client.reconnect()
self.rpc_client.reconnect()
async def load_trace(self): async def load_trace(self):
try: try:
@ -921,8 +902,7 @@ class WaveformDock(QtWidgets.QDockWidget):
addr = desc["host"] addr = desc["host"]
port_proxy = desc.get("port_proxy", 1385) port_proxy = desc.get("port_proxy", 1385)
port = desc.get("port", 1386) port = desc.get("port", 1386)
self.receiver_client.update_address(addr, port_proxy) self.proxy_client.update_address(addr, port, port_proxy)
self.rpc_client.update_address(addr, port)
def init_ddb(self, ddb): def init_ddb(self, ddb):
self._ddb = ddb self._ddb = ddb

View File

@ -48,6 +48,15 @@ def get_argparser():
parser.add_argument( parser.add_argument(
"-p", "--load-plugin", dest="plugin_modules", action="append", "-p", "--load-plugin", dest="plugin_modules", action="append",
help="Python module to load on startup") help="Python module to load on startup")
parser.add_argument(
"--analyzer-proxy-timeout", default=5, type=float,
help="connection timeout to core analyzer proxy")
parser.add_argument(
"--analyzer-proxy-timer", default=5, type=float,
help="retry timer to core analyzer proxy")
parser.add_argument(
"--analyzer-proxy-timer-backoff", default=1.1, type=float,
help="retry timer backoff multiplier to core analyzer proxy")
common_args.verbosity_args(parser) common_args.verbosity_args(parser)
return parser return parser
@ -221,12 +230,14 @@ def main():
atexit_register_coroutine(d_ttl_dds.stop, loop=loop) atexit_register_coroutine(d_ttl_dds.stop, loop=loop)
d_waveform = waveform.WaveformDock() d_waveform = waveform.WaveformDock()
loop.run_until_complete(d_waveform.proxy_client.start(
args.analyzer_proxy_timeout,
args.analyzer_proxy_timer,
args.analyzer_proxy_timer_backoff
))
atexit_register_coroutine(d_waveform.proxy_client.close, loop=loop)
loop.run_until_complete(d_waveform.devices_sub.connect(args.server, args.port_notify)) loop.run_until_complete(d_waveform.devices_sub.connect(args.server, args.port_notify))
atexit_register_coroutine(d_waveform.devices_sub.close, loop=loop) atexit_register_coroutine(d_waveform.devices_sub.close, loop=loop)
for name in ["rpc_client", "receiver_client"]:
client = getattr(d_waveform, name)
loop.run_until_complete(client.start())
atexit_register_coroutine(client.close, loop=loop)
d_schedule = schedule.ScheduleDock( d_schedule = schedule.ScheduleDock(
rpc_clients["schedule"], sub_clients["schedule"]) rpc_clients["schedule"], sub_clients["schedule"])