master: fix asyncio loop management

This commit is contained in:
Sebastien Bourdeauducq 2023-01-11 18:46:54 +08:00
parent 73a4ef89ec
commit 7fd6dead8f
2 changed files with 11 additions and 11 deletions

View File

@ -86,7 +86,7 @@ def main():
server_broadcast = Broadcaster() server_broadcast = Broadcaster()
loop.run_until_complete(server_broadcast.start( loop.run_until_complete(server_broadcast.start(
bind, args.port_broadcast)) bind, args.port_broadcast))
atexit_register_coroutine(server_broadcast.stop) atexit_register_coroutine(server_broadcast.stop, loop=loop)
log_forwarder.callback = (lambda msg: log_forwarder.callback = (lambda msg:
server_broadcast.broadcast("log", msg)) server_broadcast.broadcast("log", msg))
@ -100,8 +100,8 @@ def main():
device_db = DeviceDB(args.device_db) device_db = DeviceDB(args.device_db)
dataset_db = DatasetDB(args.dataset_db) dataset_db = DatasetDB(args.dataset_db)
dataset_db.start() dataset_db.start(loop=loop)
atexit_register_coroutine(dataset_db.stop) atexit_register_coroutine(dataset_db.stop, loop=loop)
worker_handlers = dict() worker_handlers = dict()
if args.git: if args.git:
@ -113,8 +113,8 @@ def main():
atexit.register(experiment_db.close) atexit.register(experiment_db.close)
scheduler = Scheduler(RIDCounter(), worker_handlers, experiment_db, args.log_submissions) scheduler = Scheduler(RIDCounter(), worker_handlers, experiment_db, args.log_submissions)
scheduler.start() scheduler.start(loop=loop)
atexit_register_coroutine(scheduler.stop) atexit_register_coroutine(scheduler.stop, loop=loop)
config = MasterConfig(args.name) config = MasterConfig(args.name)
@ -131,7 +131,7 @@ def main():
"scheduler_check_termination": scheduler.check_termination, "scheduler_check_termination": scheduler.check_termination,
"ccb_issue": ccb_issue, "ccb_issue": ccb_issue,
}) })
experiment_db.scan_repository_async() experiment_db.scan_repository_async(loop=loop)
server_control = RPCServer({ server_control = RPCServer({
"master_config": config, "master_config": config,
@ -142,7 +142,7 @@ def main():
}, allow_parallel=True) }, allow_parallel=True)
loop.run_until_complete(server_control.start( loop.run_until_complete(server_control.start(
bind, args.port_control)) bind, args.port_control))
atexit_register_coroutine(server_control.stop) atexit_register_coroutine(server_control.stop, loop=loop)
server_notify = Publisher({ server_notify = Publisher({
"schedule": scheduler.notifier, "schedule": scheduler.notifier,
@ -153,12 +153,12 @@ def main():
}) })
loop.run_until_complete(server_notify.start( loop.run_until_complete(server_notify.start(
bind, args.port_notify)) bind, args.port_notify))
atexit_register_coroutine(server_notify.stop) atexit_register_coroutine(server_notify.stop, loop=loop)
server_logging = LoggingServer() server_logging = LoggingServer()
loop.run_until_complete(server_logging.start( loop.run_until_complete(server_logging.start(
bind, args.port_logging)) bind, args.port_logging))
atexit_register_coroutine(server_logging.stop) atexit_register_coroutine(server_logging.stop, loop=loop)
print("ARTIQ master is now ready.") print("ARTIQ master is now ready.")
loop.run_until_complete(signal_handler.wait_terminate()) loop.run_until_complete(signal_handler.wait_terminate())

View File

@ -124,9 +124,9 @@ class ExperimentDB:
self._scanning = False self._scanning = False
self.status["scanning"] = False self.status["scanning"] = False
def scan_repository_async(self, new_cur_rev=None): def scan_repository_async(self, new_cur_rev=None, loop=None):
asyncio.ensure_future( asyncio.ensure_future(
exc_to_warning(self.scan_repository(new_cur_rev))) exc_to_warning(self.scan_repository(new_cur_rev)), loop=loop)
async def examine(self, filename, use_repository=True, revision=None): async def examine(self, filename, use_repository=True, revision=None):
if use_repository: if use_repository: