From 7fd6dead8fd98798f2d023af921dfe65279e49e2 Mon Sep 17 00:00:00 2001 From: Sebastien Bourdeauducq Date: Wed, 11 Jan 2023 18:46:54 +0800 Subject: [PATCH] master: fix asyncio loop management --- artiq/frontend/artiq_master.py | 18 +++++++++--------- artiq/master/experiments.py | 4 ++-- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/artiq/frontend/artiq_master.py b/artiq/frontend/artiq_master.py index da57e6b3d..ced3c828c 100755 --- a/artiq/frontend/artiq_master.py +++ b/artiq/frontend/artiq_master.py @@ -86,7 +86,7 @@ def main(): server_broadcast = Broadcaster() loop.run_until_complete(server_broadcast.start( bind, args.port_broadcast)) - atexit_register_coroutine(server_broadcast.stop) + atexit_register_coroutine(server_broadcast.stop, loop=loop) log_forwarder.callback = (lambda msg: server_broadcast.broadcast("log", msg)) @@ -100,8 +100,8 @@ def main(): device_db = DeviceDB(args.device_db) dataset_db = DatasetDB(args.dataset_db) - dataset_db.start() - atexit_register_coroutine(dataset_db.stop) + dataset_db.start(loop=loop) + atexit_register_coroutine(dataset_db.stop, loop=loop) worker_handlers = dict() if args.git: @@ -113,8 +113,8 @@ def main(): atexit.register(experiment_db.close) scheduler = Scheduler(RIDCounter(), worker_handlers, experiment_db, args.log_submissions) - scheduler.start() - atexit_register_coroutine(scheduler.stop) + scheduler.start(loop=loop) + atexit_register_coroutine(scheduler.stop, loop=loop) config = MasterConfig(args.name) @@ -131,7 +131,7 @@ def main(): "scheduler_check_termination": scheduler.check_termination, "ccb_issue": ccb_issue, }) - experiment_db.scan_repository_async() + experiment_db.scan_repository_async(loop=loop) server_control = RPCServer({ "master_config": config, @@ -142,7 +142,7 @@ def main(): }, allow_parallel=True) loop.run_until_complete(server_control.start( bind, args.port_control)) - atexit_register_coroutine(server_control.stop) + atexit_register_coroutine(server_control.stop, loop=loop) server_notify = Publisher({ "schedule": scheduler.notifier, @@ -153,12 +153,12 @@ def main(): }) loop.run_until_complete(server_notify.start( bind, args.port_notify)) - atexit_register_coroutine(server_notify.stop) + atexit_register_coroutine(server_notify.stop, loop=loop) server_logging = LoggingServer() loop.run_until_complete(server_logging.start( bind, args.port_logging)) - atexit_register_coroutine(server_logging.stop) + atexit_register_coroutine(server_logging.stop, loop=loop) print("ARTIQ master is now ready.") loop.run_until_complete(signal_handler.wait_terminate()) diff --git a/artiq/master/experiments.py b/artiq/master/experiments.py index 098ddc334..df29f2a6b 100644 --- a/artiq/master/experiments.py +++ b/artiq/master/experiments.py @@ -124,9 +124,9 @@ class ExperimentDB: self._scanning = False self.status["scanning"] = False - def scan_repository_async(self, new_cur_rev=None): + def scan_repository_async(self, new_cur_rev=None, loop=None): asyncio.ensure_future( - exc_to_warning(self.scan_repository(new_cur_rev))) + exc_to_warning(self.scan_repository(new_cur_rev)), loop=loop) async def examine(self, filename, use_repository=True, revision=None): if use_repository: