From 7cff63e5394943b6f59aca5183f39f19d77add11 Mon Sep 17 00:00:00 2001 From: Sebastien Bourdeauducq Date: Fri, 27 May 2022 15:17:27 +0800 Subject: [PATCH] frontend: use sipyco SignalHandler (#1063) --- artiq/frontend/aqctl_corelog.py | 32 ++++++++++++++++--------- artiq/frontend/aqctl_moninj_proxy.py | 35 +++++++++++++++++----------- artiq/frontend/artiq_master.py | 7 ++++-- artiq/frontend/artiq_rtiomon.py | 25 +++++++++++++------- 4 files changed, 64 insertions(+), 35 deletions(-) diff --git a/artiq/frontend/aqctl_corelog.py b/artiq/frontend/aqctl_corelog.py index 95bbfe023..c86c276d5 100755 --- a/artiq/frontend/aqctl_corelog.py +++ b/artiq/frontend/aqctl_corelog.py @@ -9,6 +9,7 @@ import re from sipyco.pc_rpc import Server from sipyco import common_args from sipyco.logging_tools import log_with_name +from sipyco.asyncio_tools import SignalHandler from artiq.coredevice.comm_mgmt import Request, Reply @@ -77,21 +78,30 @@ def main(): loop = asyncio.get_event_loop() try: - get_logs_task = asyncio.ensure_future( - get_logs_sim(args.core_addr) if args.simulation else get_logs(args.core_addr)) + signal_handler = SignalHandler() + signal_handler.setup() try: - server = Server({"corelog": PingTarget()}, None, True) - loop.run_until_complete(server.start(common_args.bind_address_from_args(args), args.port)) + get_logs_task = asyncio.ensure_future( + get_logs_sim(args.core_addr) if args.simulation else get_logs(args.core_addr)) try: - loop.run_until_complete(server.wait_terminate()) + server = Server({"corelog": PingTarget()}, None, True) + loop.run_until_complete(server.start(common_args.bind_address_from_args(args), args.port)) + try: + _, pending = loop.run_until_complete(asyncio.wait( + [signal_handler.wait_terminate(), server.wait_terminate()], + return_when=asyncio.FIRST_COMPLETED)) + for task in pending: + task.cancel() + finally: + loop.run_until_complete(server.stop()) finally: - loop.run_until_complete(server.stop()) + get_logs_task.cancel() + try: + loop.run_until_complete(get_logs_task) + except asyncio.CancelledError: + pass finally: - get_logs_task.cancel() - try: - loop.run_until_complete(get_logs_task) - except asyncio.CancelledError: - pass + signal_handler.teardown() finally: loop.close() diff --git a/artiq/frontend/aqctl_moninj_proxy.py b/artiq/frontend/aqctl_moninj_proxy.py index 0ce317fba..5d7aca2a3 100755 --- a/artiq/frontend/aqctl_moninj_proxy.py +++ b/artiq/frontend/aqctl_moninj_proxy.py @@ -6,7 +6,7 @@ import asyncio import struct from enum import Enum -from sipyco.asyncio_tools import AsyncioServer +from sipyco.asyncio_tools import AsyncioServer, SignalHandler from sipyco.pc_rpc import Server from sipyco import common_args @@ -198,24 +198,33 @@ def main(): loop = asyncio.get_event_loop() try: - monitor_mux = MonitorMux() - comm_moninj = CommMonInj(monitor_mux.monitor_cb, monitor_mux.injection_status_cb) - monitor_mux.comm_moninj = comm_moninj - loop.run_until_complete(comm_moninj.connect(args.core_addr)) + signal_handler = SignalHandler() + signal_handler.setup() try: - proxy_server = ProxyServer(monitor_mux) - loop.run_until_complete(proxy_server.start(bind_address, args.port_proxy)) + monitor_mux = MonitorMux() + comm_moninj = CommMonInj(monitor_mux.monitor_cb, monitor_mux.injection_status_cb) + monitor_mux.comm_moninj = comm_moninj + loop.run_until_complete(comm_moninj.connect(args.core_addr)) try: - server = Server({"moninj_proxy": PingTarget()}, None, True) - loop.run_until_complete(server.start(bind_address, args.port_control)) + proxy_server = ProxyServer(monitor_mux) + loop.run_until_complete(proxy_server.start(bind_address, args.port_proxy)) try: - loop.run_until_complete(server.wait_terminate()) + server = Server({"moninj_proxy": PingTarget()}, None, True) + loop.run_until_complete(server.start(bind_address, args.port_control)) + try: + _, pending = loop.run_until_complete(asyncio.wait( + [signal_handler.wait_terminate(), server.wait_terminate()], + return_when=asyncio.FIRST_COMPLETED)) + for task in pending: + task.cancel() + finally: + loop.run_until_complete(server.stop()) finally: - loop.run_until_complete(server.stop()) + loop.run_until_complete(proxy_server.stop()) finally: - loop.run_until_complete(proxy_server.stop()) + loop.run_until_complete(comm_moninj.close()) finally: - loop.run_until_complete(comm_moninj.close()) + signal_handler.teardown() finally: loop.close() diff --git a/artiq/frontend/artiq_master.py b/artiq/frontend/artiq_master.py index 83c6615ae..2b8893d07 100755 --- a/artiq/frontend/artiq_master.py +++ b/artiq/frontend/artiq_master.py @@ -10,7 +10,7 @@ from sipyco.sync_struct import Publisher from sipyco.logging_tools import Server as LoggingServer from sipyco.broadcast import Broadcaster from sipyco import common_args -from sipyco.asyncio_tools import atexit_register_coroutine +from sipyco.asyncio_tools import atexit_register_coroutine, SignalHandler from artiq import __version__ as artiq_version from artiq.master.log import log_args, init_log @@ -76,6 +76,9 @@ def main(): log_forwarder = init_log(args) loop = asyncio.get_event_loop() atexit.register(loop.close) + signal_handler = SignalHandler() + signal_handler.setup() + atexit.register(signal_handler.teardown) bind = common_args.bind_address_from_args(args) server_broadcast = Broadcaster() @@ -155,7 +158,7 @@ def main(): atexit_register_coroutine(server_logging.stop) print("ARTIQ master is now ready.") - loop.run_forever() + loop.run_until_complete(signal_handler.wait_terminate()) if __name__ == "__main__": main() diff --git a/artiq/frontend/artiq_rtiomon.py b/artiq/frontend/artiq_rtiomon.py index eed6a9074..8649f8d5f 100755 --- a/artiq/frontend/artiq_rtiomon.py +++ b/artiq/frontend/artiq_rtiomon.py @@ -3,6 +3,8 @@ import argparse import asyncio +from sipyco.asyncio_tools import SignalHandler + from artiq.coredevice.comm_moninj import * @@ -18,19 +20,24 @@ def get_argparser(): def main(): args = get_argparser().parse_args() - + loop = asyncio.get_event_loop() try: - comm = CommMonInj( - lambda channel, probe, value: print("0x{:06x}: {}".format(channel, value)), - lambda channel, override, value: None) - loop.run_until_complete(comm.connect(args.core_addr)) + signal_handler = SignalHandler() + signal_handler.setup() try: - for channel in args.channel: - comm.monitor_probe(True, channel, 0) - loop.run_forever() + comm = CommMonInj( + lambda channel, probe, value: print("0x{:06x}: {}".format(channel, value)), + lambda channel, override, value: None) + loop.run_until_complete(comm.connect(args.core_addr)) + try: + for channel in args.channel: + comm.monitor_probe(True, channel, 0) + loop.run_until_complete(signal_handler.wait_terminate()) + finally: + loop.run_until_complete(comm.close()) finally: - loop.run_until_complete(comm.close()) + signal_handler.teardown() finally: loop.close()