From 4166f4e928915797e3e7bb4b98f443e5e34410b6 Mon Sep 17 00:00:00 2001 From: Sebastien Bourdeauducq Date: Wed, 11 Nov 2015 16:22:12 +0800 Subject: [PATCH] frontend: use atexit_register_coroutine in other tools --- artiq/frontend/artiq_ctlmgr.py | 10 +++++----- artiq/frontend/artiq_gui.py | 8 +------- artiq/frontend/artiq_influxdb.py | 11 +++++------ artiq/frontend/artiq_master.py | 13 +++++++------ artiq/tools.py | 13 +++++++++++++ 5 files changed, 31 insertions(+), 24 deletions(-) diff --git a/artiq/frontend/artiq_ctlmgr.py b/artiq/frontend/artiq_ctlmgr.py index b8cf0a16e..9c672d187 100755 --- a/artiq/frontend/artiq_ctlmgr.py +++ b/artiq/frontend/artiq_ctlmgr.py @@ -15,7 +15,7 @@ from artiq.protocols.pc_rpc import AsyncioClient, Server from artiq.protocols.logging import (LogForwarder, parse_log_message, log_with_name, SourceFilter) -from artiq.tools import TaskObject, Condition +from artiq.tools import TaskObject, Condition, atexit_register_coroutine logger = logging.getLogger(__name__) @@ -311,19 +311,19 @@ def main(): asyncio.set_event_loop(loop) else: loop = asyncio.get_event_loop() - atexit.register(lambda: loop.close()) + atexit.register(loop.close) logfwd = LogForwarder(args.server, args.port_logging, args.retry_master) logfwd.addFilter(source_adder) root_logger.addHandler(logfwd) logfwd.start() - atexit.register(lambda: loop.run_until_complete(logfwd.stop())) + atexit_register_coroutine(logfwd.stop) ctlmgr = ControllerManager(args.server, args.port_notify, args.retry_master) ctlmgr.start() - atexit.register(lambda: loop.run_until_complete(ctlmgr.stop())) + atexit_register_coroutine(ctlmgr.stop) class CtlMgrRPC: retry_now = ctlmgr.retry_now @@ -331,7 +331,7 @@ def main(): rpc_target = CtlMgrRPC() rpc_server = Server({"ctlmgr": rpc_target}, builtin_terminate=True) loop.run_until_complete(rpc_server.start(args.bind, args.bind_port)) - atexit.register(lambda: loop.run_until_complete(rpc_server.stop())) + atexit_register_coroutine(rpc_server.stop) loop.run_until_complete(rpc_server.wait_terminate()) diff --git a/artiq/frontend/artiq_gui.py b/artiq/frontend/artiq_gui.py index 963877cc0..a55a5e532 100755 --- a/artiq/frontend/artiq_gui.py +++ b/artiq/frontend/artiq_gui.py @@ -10,7 +10,7 @@ import os from quamash import QEventLoop, QtGui, QtCore from pyqtgraph import dockarea -from artiq.tools import verbosity_args, init_logger, artiq_dir +from artiq.tools import * from artiq.protocols.pc_rpc import AsyncioClient from artiq.gui.models import ModelSubscriber from artiq.gui import state, explorer, moninj, datasets, schedule, log, console @@ -52,12 +52,6 @@ class MainWindow(QtGui.QMainWindow): self.restoreGeometry(QtCore.QByteArray(state)) -def atexit_register_coroutine(coroutine, loop=None): - if loop is None: - loop = asyncio.get_event_loop() - atexit.register(lambda: loop.run_until_complete(coroutine())) - - def main(): args = get_argparser().parse_args() init_logger(args) diff --git a/artiq/frontend/artiq_influxdb.py b/artiq/frontend/artiq_influxdb.py index 558435ef3..5fa189a97 100755 --- a/artiq/frontend/artiq_influxdb.py +++ b/artiq/frontend/artiq_influxdb.py @@ -10,8 +10,7 @@ from functools import partial import numpy as np import aiohttp -from artiq.tools import verbosity_args, init_logger -from artiq.tools import TaskObject +from artiq.tools import * from artiq.protocols.sync_struct import Subscriber from artiq.protocols.pc_rpc import Server from artiq.protocols import pyon @@ -239,23 +238,23 @@ def main(): init_logger(args) loop = asyncio.get_event_loop() - atexit.register(lambda: loop.close()) + atexit.register(loop.close) writer = DBWriter(args.baseurl_db, args.user_db, args.password_db, args.database, args.table) writer.start() - atexit.register(lambda: loop.run_until_complete(writer.stop())) + atexit_register_coroutine(writer.stop) filter = Filter(args.pattern_file) rpc_server = Server({"influxdb_filter": filter}, builtin_terminate=True) loop.run_until_complete(rpc_server.start(args.bind, args.bind_port)) - atexit.register(lambda: loop.run_until_complete(rpc_server.stop())) + atexit_register_coroutine(rpc_server.stop) reader = MasterReader(args.server_master, args.port_master, args.retry_master, filter._filter, writer) reader.start() - atexit.register(lambda: loop.run_until_complete(reader.stop())) + atexit_register_coroutine(reader.stop) loop.run_until_complete(rpc_server.wait_terminate()) diff --git a/artiq/frontend/artiq_master.py b/artiq/frontend/artiq_master.py index 3b1c89dbe..77ec35a38 100755 --- a/artiq/frontend/artiq_master.py +++ b/artiq/frontend/artiq_master.py @@ -5,6 +5,7 @@ import argparse import atexit import os +from artiq.tools import atexit_register_coroutine from artiq.protocols.pc_rpc import Server as RPCServer from artiq.protocols.sync_struct import Publisher from artiq.protocols.logging import Server as LoggingServer @@ -59,12 +60,12 @@ def main(): asyncio.set_event_loop(loop) else: loop = asyncio.get_event_loop() - atexit.register(lambda: loop.close()) + atexit.register(loop.close) device_db = DeviceDB(args.device_db) dataset_db = DatasetDB(args.dataset_db) dataset_db.start() - atexit.register(lambda: loop.run_until_complete(dataset_db.stop())) + atexit_register_coroutine(dataset_db.stop) if args.git: repo_backend = GitBackend(args.repository) @@ -90,7 +91,7 @@ def main(): "scheduler_get_status": scheduler.get_status }) scheduler.start() - atexit.register(lambda: loop.run_until_complete(scheduler.stop())) + atexit_register_coroutine(scheduler.stop) server_control = RPCServer({ "master_device_db": device_db, @@ -100,7 +101,7 @@ def main(): }) loop.run_until_complete(server_control.start( args.bind, args.port_control)) - atexit.register(lambda: loop.run_until_complete(server_control.stop())) + atexit_register_coroutine(server_control.stop) server_notify = Publisher({ "schedule": scheduler.notifier, @@ -111,12 +112,12 @@ def main(): }) loop.run_until_complete(server_notify.start( args.bind, args.port_notify)) - atexit.register(lambda: loop.run_until_complete(server_notify.stop())) + atexit_register_coroutine(server_notify.stop) server_logging = LoggingServer() loop.run_until_complete(server_logging.start( args.bind, args.port_logging)) - atexit.register(lambda: loop.run_until_complete(server_logging.stop())) + atexit_register_coroutine(server_logging.stop) loop.run_forever() diff --git a/artiq/tools.py b/artiq/tools.py index cde22a629..dae3c2424 100644 --- a/artiq/tools.py +++ b/artiq/tools.py @@ -7,6 +7,7 @@ import asyncio import time import collections import os +import atexit import numpy as np @@ -14,6 +15,12 @@ from artiq.language.environment import is_experiment from artiq.protocols import pyon +__all__ = ["artiq_dir", "parse_arguments", "elide", "short_format", "file_import", + "get_experiment", "verbosity_args", "simple_network_args", "init_logger", + "atexit_register_coroutine", "exc_to_warning", "asyncio_wait_or_cancel", + "TaskObject", "Condition", "workaround_asyncio263"] + + logger = logging.getLogger(__name__) artiq_dir = os.path.join(os.path.abspath(os.path.dirname(__file__))) @@ -120,6 +127,12 @@ def init_logger(args): logging.basicConfig(level=logging.WARNING + args.quiet*10 - args.verbose*10) +def atexit_register_coroutine(coroutine, loop=None): + if loop is None: + loop = asyncio.get_event_loop() + atexit.register(lambda: loop.run_until_complete(coroutine())) + + async def exc_to_warning(coro): try: await coro