master: fix asyncio loop management

This commit is contained in:
Sebastien Bourdeauducq 2023-01-11 18:46:54 +08:00
parent 73a4ef89ec
commit 7fd6dead8f
2 changed files with 11 additions and 11 deletions

View File

@ -86,7 +86,7 @@ def main():
server_broadcast = Broadcaster()
loop.run_until_complete(server_broadcast.start(
bind, args.port_broadcast))
atexit_register_coroutine(server_broadcast.stop)
atexit_register_coroutine(server_broadcast.stop, loop=loop)
log_forwarder.callback = (lambda msg:
server_broadcast.broadcast("log", msg))
@ -100,8 +100,8 @@ def main():
device_db = DeviceDB(args.device_db)
dataset_db = DatasetDB(args.dataset_db)
dataset_db.start()
atexit_register_coroutine(dataset_db.stop)
dataset_db.start(loop=loop)
atexit_register_coroutine(dataset_db.stop, loop=loop)
worker_handlers = dict()
if args.git:
@ -113,8 +113,8 @@ def main():
atexit.register(experiment_db.close)
scheduler = Scheduler(RIDCounter(), worker_handlers, experiment_db, args.log_submissions)
scheduler.start()
atexit_register_coroutine(scheduler.stop)
scheduler.start(loop=loop)
atexit_register_coroutine(scheduler.stop, loop=loop)
config = MasterConfig(args.name)
@ -131,7 +131,7 @@ def main():
"scheduler_check_termination": scheduler.check_termination,
"ccb_issue": ccb_issue,
})
experiment_db.scan_repository_async()
experiment_db.scan_repository_async(loop=loop)
server_control = RPCServer({
"master_config": config,
@ -142,7 +142,7 @@ def main():
}, allow_parallel=True)
loop.run_until_complete(server_control.start(
bind, args.port_control))
atexit_register_coroutine(server_control.stop)
atexit_register_coroutine(server_control.stop, loop=loop)
server_notify = Publisher({
"schedule": scheduler.notifier,
@ -153,12 +153,12 @@ def main():
})
loop.run_until_complete(server_notify.start(
bind, args.port_notify))
atexit_register_coroutine(server_notify.stop)
atexit_register_coroutine(server_notify.stop, loop=loop)
server_logging = LoggingServer()
loop.run_until_complete(server_logging.start(
bind, args.port_logging))
atexit_register_coroutine(server_logging.stop)
atexit_register_coroutine(server_logging.stop, loop=loop)
print("ARTIQ master is now ready.")
loop.run_until_complete(signal_handler.wait_terminate())

View File

@ -124,9 +124,9 @@ class ExperimentDB:
self._scanning = False
self.status["scanning"] = False
def scan_repository_async(self, new_cur_rev=None):
def scan_repository_async(self, new_cur_rev=None, loop=None):
asyncio.ensure_future(
exc_to_warning(self.scan_repository(new_cur_rev)))
exc_to_warning(self.scan_repository(new_cur_rev)), loop=loop)
async def examine(self, filename, use_repository=True, revision=None):
if use_repository: