From e7d85c5b87aa242924d993a24bb8ca04af919751 Mon Sep 17 00:00:00 2001 From: Sebastien Bourdeauducq Date: Sat, 7 Feb 2015 23:52:05 +0800 Subject: [PATCH] ctlmgr: basic working implementation --- artiq/frontend/artiq_ctlmgr.py | 103 ++++++++++++++++++++++++--------- 1 file changed, 76 insertions(+), 27 deletions(-) diff --git a/artiq/frontend/artiq_ctlmgr.py b/artiq/frontend/artiq_ctlmgr.py index 74b2b5191..bd9332405 100755 --- a/artiq/frontend/artiq_ctlmgr.py +++ b/artiq/frontend/artiq_ctlmgr.py @@ -4,6 +4,7 @@ import asyncio import argparse import os import logging +import signal from artiq.protocols.sync_struct import Subscriber from artiq.tools import verbosity_args, init_logger @@ -24,30 +25,74 @@ def get_argparser(): parser.add_argument( "--retry-master", default=5.0, type=float, help="retry timer for reconnecting to master") + parser.add_argument( + "--retry-command", default=5.0, type=float, + help="retry timer for restarting a controller command") return parser +class Controller: + def __init__(self, name, command, retry): + self.launch_task = asyncio.Task(self.launcher(name, command, retry)) + + def end(self): + self.launch_task.cancel() + + @asyncio.coroutine + def launcher(self, name, command, retry): + process = None + try: + while True: + logger.info("Starting controller %s with command: %s", + name, command) + process = yield from asyncio.create_subprocess_exec(*command.split()) + yield from asyncio.shield(process.wait()) + logger.warning("Controller %s exited", name) + logger.warning("Restarting in %.1f seconds", retry) + yield from asyncio.sleep(retry) + except asyncio.CancelledError: + logger.info("Terminating controller %s", name) + if process is not None and process.returncode is None: + process.send_signal(signal.SIGTERM) + logger.debug("Signal sent") + try: + yield from asyncio.wait_for(process.wait(), timeout=5.0) + except asyncio.TimeoutError: + logger.warning("Controller %s did not respond to SIGTERM", + name) + process.send_signal(signal.SIGKILL) + + class Controllers: - def __init__(self): + def __init__(self, retry_command): + self.retry_command = retry_command self.host_filter = None + self.active = dict() def __setitem__(self, k, v): if (isinstance(v, dict) and v["type"] == "controller" and v["host"] == self.host_filter): - command = v["command"].format(bind=self.host_filter, + command = v["command"].format(name=k, + bind=self.host_filter, port=v["port"]) - print("start controller {}: {}".format(k, command)) + self.active[k] = Controller(k, command, self.retry_command) def __delitem__(self, k): - print("del {}".format(k)) + if k in self.active: + self.active[k].end() + del self.active[k] def delete_all(self): - print("delete all") + for name in list(self.active.keys()): + del self[name] class ControllerDB: - def __init__(self): - self.current_controllers = Controllers() + def __init__(self, retry_command): + self.current_controllers = Controllers(retry_command) + + def set_host_filter(self, host_filter): + self.current_controllers.host_filter = host_filter def sync_struct_init(self, init): if self.current_controllers is not None: @@ -56,17 +101,9 @@ class ControllerDB: self.current_controllers[k] = v -def main(): - args = get_argparser().parse_args() - init_logger(args) - - controller_db = ControllerDB() - - if os.name == "nt": - loop = asyncio.ProactorEventLoop() - asyncio.set_event_loop(loop) - else: - loop = asyncio.get_event_loop() +@asyncio.coroutine +def ctlmgr(server, port, retry_master, retry_command): + controller_db = ControllerDB(retry_command) try: subscriber = Subscriber("devices", controller_db.sync_struct_init) while True: @@ -74,25 +111,37 @@ def main(): def set_host_filter(): s = subscriber.writer.get_extra_info("socket") localhost = s.getsockname()[0] - controller_db.current_controllers.host_filter = localhost - loop.run_until_complete( - subscriber.connect(args.server, args.port, - set_host_filter)) + controller_db.set_host_filter(localhost) + yield from subscriber.connect(server, port, set_host_filter) try: - loop.run_until_complete(subscriber.receive_task) + yield from asyncio.wait([subscriber.receive_task]) finally: - loop.run_until_complete(subscriber.close()) + yield from subscriber.close() except (ConnectionAbortedError, ConnectionError, ConnectionRefusedError, ConnectionResetError) as e: logger.warning("Connection to master failed (%s: %s)", e.__class__.__name__, str(e)) else: logger.warning("Connection to master lost") - logger.warning("Retrying in %.1f seconds", args.retry_master) - loop.run_until_complete(asyncio.sleep(args.retry_master)) + logger.warning("Retrying in %.1f seconds", retry_master) + yield from asyncio.sleep(retry_master) + finally: + controller_db.current_controllers.delete_all() + + +def main(): + args = get_argparser().parse_args() + init_logger(args) + if os.name == "nt": + loop = asyncio.ProactorEventLoop() + asyncio.set_event_loop(loop) + else: + loop = asyncio.get_event_loop() + try: + loop.run_until_complete(ctlmgr(args.server, args.port, + args.retry_master, args.retry_command)) finally: loop.close() - controller_db.current_controllers.delete_all() if __name__ == "__main__": main()