artiq/artiq/frontend/aqctl_coreanalyzer_proxy.py

113 lines
3.4 KiB
Python
Raw Permalink Normal View History

#!/usr/bin/env python3
2023-12-08 18:55:07 +08:00
import argparse
import asyncio
import atexit
import logging
2023-12-08 18:56:10 +08:00
2023-12-08 18:55:07 +08:00
from sipyco.asyncio_tools import AsyncioServer, SignalHandler, atexit_register_coroutine
from sipyco.pc_rpc import Server
from sipyco import common_args
2024-02-14 17:39:14 +08:00
from artiq.coredevice.comm_analyzer import get_analyzer_dump, ANALYZER_MAGIC
2023-12-08 18:55:07 +08:00
2023-12-08 18:56:10 +08:00
2023-12-08 18:55:07 +08:00
logger = logging.getLogger(__name__)
# simplified version of sipyco Broadcaster
class ProxyServer(AsyncioServer):
2023-12-13 13:07:35 +08:00
def __init__(self, queue_limit=8):
2023-12-08 18:55:07 +08:00
AsyncioServer.__init__(self)
self._recipients = set()
self._queue_limit = queue_limit
async def _handle_connection_cr(self, reader, writer):
try:
2024-02-14 17:39:14 +08:00
writer.write(ANALYZER_MAGIC)
2023-12-08 18:55:07 +08:00
queue = asyncio.Queue(self._queue_limit)
self._recipients.add(queue)
try:
while True:
dump = await queue.get()
writer.write(dump)
# raise exception on connection error
await writer.drain()
finally:
self._recipients.remove(queue)
except (ConnectionResetError, ConnectionAbortedError, BrokenPipeError):
# receivers disconnecting are a normal occurence
pass
finally:
writer.close()
2023-12-13 13:07:35 +08:00
def distribute(self, dump):
2023-12-08 18:55:07 +08:00
for recipient in self._recipients:
2023-12-08 18:56:10 +08:00
recipient.put_nowait(dump)
2023-12-08 18:55:07 +08:00
class ProxyControl:
2023-12-13 13:07:35 +08:00
def __init__(self, distribute_cb, core_addr, core_port=1382):
self.distribute_cb = distribute_cb
2023-12-08 18:55:07 +08:00
self.core_addr = core_addr
self.core_port = core_port
def ping(self):
return True
2023-12-13 13:07:35 +08:00
def trigger(self):
2023-12-08 18:55:07 +08:00
try:
dump = get_analyzer_dump(self.core_addr, self.core_port)
2023-12-13 13:07:35 +08:00
self.distribute_cb(dump)
2023-12-08 18:55:07 +08:00
except:
2023-12-13 13:07:35 +08:00
logger.warning("Trigger failed:", exc_info=True)
raise
2023-12-08 18:55:07 +08:00
def get_argparser():
parser = argparse.ArgumentParser(
description="ARTIQ core analyzer proxy")
common_args.verbosity_args(parser)
common_args.simple_network_args(parser, [
("proxy", "proxying", 1385),
("control", "control", 1386)
])
parser.add_argument("core_addr", metavar="CORE_ADDR",
help="hostname or IP address of the core device")
return parser
def main():
args = get_argparser().parse_args()
common_args.init_logger_from_args(args)
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
atexit.register(loop.close)
signal_handler = SignalHandler()
signal_handler.setup()
atexit.register(signal_handler.teardown)
bind_address = common_args.bind_address_from_args(args)
proxy_server = ProxyServer()
loop.run_until_complete(proxy_server.start(bind_address, args.port_proxy))
atexit_register_coroutine(proxy_server.stop, loop=loop)
2023-12-13 13:07:35 +08:00
controller = ProxyControl(proxy_server.distribute, args.core_addr)
2023-12-08 18:55:07 +08:00
server = Server({"coreanalyzer_proxy_control": controller}, None, True)
loop.run_until_complete(server.start(bind_address, args.port_control))
atexit_register_coroutine(server.stop, loop=loop)
_, pending = loop.run_until_complete(asyncio.wait(
[loop.create_task(signal_handler.wait_terminate()),
loop.create_task(server.wait_terminate())],
return_when=asyncio.FIRST_COMPLETED))
for task in pending:
task.cancel()
if __name__ == "__main__":
main()