From 3f68d0ba8fbcaf0a360a9d68fe2070fe17144a17 Mon Sep 17 00:00:00 2001 From: Sebastien Bourdeauducq Date: Sun, 9 Aug 2015 17:30:46 +0800 Subject: [PATCH] ctlmgr: ping controllers --- artiq/frontend/artiq_ctlmgr.py | 117 +++++++++++++++++++++++---------- 1 file changed, 82 insertions(+), 35 deletions(-) diff --git a/artiq/frontend/artiq_ctlmgr.py b/artiq/frontend/artiq_ctlmgr.py index 45ab109c4..d59476855 100755 --- a/artiq/frontend/artiq_ctlmgr.py +++ b/artiq/frontend/artiq_ctlmgr.py @@ -9,6 +9,7 @@ import shlex import socket from artiq.protocols.sync_struct import Subscriber +from artiq.protocols.pc_rpc import AsyncioClient from artiq.tools import verbosity_args, init_logger from artiq.tools import asyncio_process_wait_timeout @@ -35,8 +36,18 @@ def get_argparser(): class Controller: - def __init__(self, name, command, retry): - self.launch_task = asyncio.Task(self.launcher(name, command, retry)) + def __init__(self, name, ddb_entry): + self.name = name + self.command = ddb_entry["command"] + self.retry_timer = ddb_entry.get("retry_timer", 5) + + self.host = ddb_entry["host"] + self.port = ddb_entry["port"] + self.ping_timer = ddb_entry.get("ping_timer", 30) + self.ping_timeout = ddb_entry.get("ping_timeout", 30) + + self.process = None + self.launch_task = asyncio.Task(self.launcher()) @asyncio.coroutine def end(self): @@ -44,33 +55,70 @@ class Controller: yield from asyncio.wait_for(self.launch_task, None) @asyncio.coroutine - def launcher(self, name, command, retry): - process = None + def _ping_notimeout(self): + remote = AsyncioClient() + yield from remote.connect_rpc(self.host, self.port, None) + try: + targets, _ = remote.get_rpc_id() + remote.select_rpc_target(targets[0]) + ok = yield from remote.ping() + finally: + remote.close_rpc() + return ok + + @asyncio.coroutine + def _ping(self): + try: + return (yield from asyncio.wait_for( + self._ping_notimeout(), self.ping_timeout)) + except: + return False + + @asyncio.coroutine + def _wait_and_ping(self): + while True: + try: + yield from asyncio_process_wait_timeout(self.process, + self.ping_timer) + except asyncio.TimeoutError: + logger.debug("pinging controller %s", self.name) + ok = yield from self._ping() + if not ok: + logger.warning("Controller %s ping failed", self.name) + yield from self._terminate() + return + + @asyncio.coroutine + def launcher(self): try: while True: logger.info("Starting controller %s with command: %s", - name, command) + self.name, self.command) try: - process = yield from asyncio.create_subprocess_exec( - *shlex.split(command)) - yield from asyncio.shield(process.wait()) + self.process = yield from asyncio.create_subprocess_exec( + *shlex.split(self.command)) + yield from self._wait_and_ping() except FileNotFoundError: - logger.warning("Controller %s failed to start", name) + logger.warning("Controller %s failed to start", self.name) else: - logger.warning("Controller %s exited", name) - logger.warning("Restarting in %.1f seconds", retry) - yield from asyncio.sleep(retry) + logger.warning("Controller %s exited", self.name) + logger.warning("Restarting in %.1f seconds", self.retry_timer) + yield from asyncio.sleep(self.retry_timer) except asyncio.CancelledError: - logger.info("Terminating controller %s", name) - if process is not None and process.returncode is None: - process.send_signal(signal.SIGTERM) - logger.debug("Signal sent") - try: - yield from asyncio_process_wait_timeout(process, 5.0) - except asyncio.TimeoutError: - logger.warning("Controller %s did not respond to SIGTERM", - name) - process.send_signal(signal.SIGKILL) + yield from self._terminate() + + @asyncio.coroutine + def _terminate(self): + logger.info("Terminating controller %s", self.name) + if self.process is not None and self.process.returncode is None: + self.process.send_signal(signal.SIGTERM) + logger.debug("Signal sent") + try: + yield from asyncio_process_wait_timeout(self.process, 5.0) + except asyncio.TimeoutError: + logger.warning("Controller %s did not respond to SIGTERM", + self.name) + self.process.send_signal(signal.SIGKILL) def get_ip_addresses(host): @@ -82,8 +130,7 @@ def get_ip_addresses(host): class Controllers: - def __init__(self, retry_command): - self.retry_command = retry_command + def __init__(self): self.host_filter = None self.active_or_queued = set() self.queue = asyncio.Queue() @@ -95,10 +142,10 @@ class Controllers: while True: action, param = yield from self.queue.get() if action == "set": - k, command = param + k, ddb_entry = param if k in self.active: yield from self.active[k].end() - self.active[k] = Controller(k, command, self.retry_command) + self.active[k] = Controller(k, ddb_entry) elif action == "del": yield from self.active[param].end() del self.active[param] @@ -108,10 +155,10 @@ class Controllers: def __setitem__(self, k, v): if (isinstance(v, dict) and v["type"] == "controller" and self.host_filter in get_ip_addresses(v["host"])): - command = v["command"].format(name=k, - bind=self.host_filter, - port=v["port"]) - self.queue.put_nowait(("set", (k, command))) + v["command"] = v["command"].format(name=k, + bind=self.host_filter, + port=v["port"]) + self.queue.put_nowait(("set", (k, v))) self.active_or_queued.add(k) def __delitem__(self, k): @@ -131,8 +178,8 @@ class Controllers: class ControllerDB: - def __init__(self, retry_command): - self.current_controllers = Controllers(retry_command) + def __init__(self): + self.current_controllers = Controllers() def set_host_filter(self, host_filter): self.current_controllers.host_filter = host_filter @@ -146,8 +193,8 @@ class ControllerDB: @asyncio.coroutine -def ctlmgr(server, port, retry_master, retry_command): - controller_db = ControllerDB(retry_command) +def ctlmgr(server, port, retry_master): + controller_db = ControllerDB() try: subscriber = Subscriber("devices", controller_db.sync_struct_init) while True: @@ -187,7 +234,7 @@ def main(): try: task = asyncio.Task(ctlmgr( - args.server, args.port, args.retry_master, args.retry_command)) + args.server, args.port, args.retry_master)) try: loop.run_forever() finally: