2
0
mirror of https://github.com/m-labs/artiq.git synced 2025-01-24 17:38:13 +08:00

ctlmgr: ping controllers

This commit is contained in:
Sebastien Bourdeauducq 2015-08-09 17:30:46 +08:00
parent 479175870f
commit 3f68d0ba8f

View File

@ -9,6 +9,7 @@ import shlex
import socket
from artiq.protocols.sync_struct import Subscriber
from artiq.protocols.pc_rpc import AsyncioClient
from artiq.tools import verbosity_args, init_logger
from artiq.tools import asyncio_process_wait_timeout
@ -35,8 +36,18 @@ def get_argparser():
class Controller:
def __init__(self, name, command, retry):
self.launch_task = asyncio.Task(self.launcher(name, command, retry))
def __init__(self, name, ddb_entry):
self.name = name
self.command = ddb_entry["command"]
self.retry_timer = ddb_entry.get("retry_timer", 5)
self.host = ddb_entry["host"]
self.port = ddb_entry["port"]
self.ping_timer = ddb_entry.get("ping_timer", 30)
self.ping_timeout = ddb_entry.get("ping_timeout", 30)
self.process = None
self.launch_task = asyncio.Task(self.launcher())
@asyncio.coroutine
def end(self):
@ -44,33 +55,70 @@ class Controller:
yield from asyncio.wait_for(self.launch_task, None)
@asyncio.coroutine
def launcher(self, name, command, retry):
process = None
def _ping_notimeout(self):
remote = AsyncioClient()
yield from remote.connect_rpc(self.host, self.port, None)
try:
targets, _ = remote.get_rpc_id()
remote.select_rpc_target(targets[0])
ok = yield from remote.ping()
finally:
remote.close_rpc()
return ok
@asyncio.coroutine
def _ping(self):
try:
return (yield from asyncio.wait_for(
self._ping_notimeout(), self.ping_timeout))
except:
return False
@asyncio.coroutine
def _wait_and_ping(self):
while True:
try:
yield from asyncio_process_wait_timeout(self.process,
self.ping_timer)
except asyncio.TimeoutError:
logger.debug("pinging controller %s", self.name)
ok = yield from self._ping()
if not ok:
logger.warning("Controller %s ping failed", self.name)
yield from self._terminate()
return
@asyncio.coroutine
def launcher(self):
try:
while True:
logger.info("Starting controller %s with command: %s",
name, command)
self.name, self.command)
try:
process = yield from asyncio.create_subprocess_exec(
*shlex.split(command))
yield from asyncio.shield(process.wait())
self.process = yield from asyncio.create_subprocess_exec(
*shlex.split(self.command))
yield from self._wait_and_ping()
except FileNotFoundError:
logger.warning("Controller %s failed to start", name)
logger.warning("Controller %s failed to start", self.name)
else:
logger.warning("Controller %s exited", name)
logger.warning("Restarting in %.1f seconds", retry)
yield from asyncio.sleep(retry)
logger.warning("Controller %s exited", self.name)
logger.warning("Restarting in %.1f seconds", self.retry_timer)
yield from asyncio.sleep(self.retry_timer)
except asyncio.CancelledError:
logger.info("Terminating controller %s", name)
if process is not None and process.returncode is None:
process.send_signal(signal.SIGTERM)
logger.debug("Signal sent")
try:
yield from asyncio_process_wait_timeout(process, 5.0)
except asyncio.TimeoutError:
logger.warning("Controller %s did not respond to SIGTERM",
name)
process.send_signal(signal.SIGKILL)
yield from self._terminate()
@asyncio.coroutine
def _terminate(self):
logger.info("Terminating controller %s", self.name)
if self.process is not None and self.process.returncode is None:
self.process.send_signal(signal.SIGTERM)
logger.debug("Signal sent")
try:
yield from asyncio_process_wait_timeout(self.process, 5.0)
except asyncio.TimeoutError:
logger.warning("Controller %s did not respond to SIGTERM",
self.name)
self.process.send_signal(signal.SIGKILL)
def get_ip_addresses(host):
@ -82,8 +130,7 @@ def get_ip_addresses(host):
class Controllers:
def __init__(self, retry_command):
self.retry_command = retry_command
def __init__(self):
self.host_filter = None
self.active_or_queued = set()
self.queue = asyncio.Queue()
@ -95,10 +142,10 @@ class Controllers:
while True:
action, param = yield from self.queue.get()
if action == "set":
k, command = param
k, ddb_entry = param
if k in self.active:
yield from self.active[k].end()
self.active[k] = Controller(k, command, self.retry_command)
self.active[k] = Controller(k, ddb_entry)
elif action == "del":
yield from self.active[param].end()
del self.active[param]
@ -108,10 +155,10 @@ class Controllers:
def __setitem__(self, k, v):
if (isinstance(v, dict) and v["type"] == "controller"
and self.host_filter in get_ip_addresses(v["host"])):
command = v["command"].format(name=k,
bind=self.host_filter,
port=v["port"])
self.queue.put_nowait(("set", (k, command)))
v["command"] = v["command"].format(name=k,
bind=self.host_filter,
port=v["port"])
self.queue.put_nowait(("set", (k, v)))
self.active_or_queued.add(k)
def __delitem__(self, k):
@ -131,8 +178,8 @@ class Controllers:
class ControllerDB:
def __init__(self, retry_command):
self.current_controllers = Controllers(retry_command)
def __init__(self):
self.current_controllers = Controllers()
def set_host_filter(self, host_filter):
self.current_controllers.host_filter = host_filter
@ -146,8 +193,8 @@ class ControllerDB:
@asyncio.coroutine
def ctlmgr(server, port, retry_master, retry_command):
controller_db = ControllerDB(retry_command)
def ctlmgr(server, port, retry_master):
controller_db = ControllerDB()
try:
subscriber = Subscriber("devices", controller_db.sync_struct_init)
while True:
@ -187,7 +234,7 @@ def main():
try:
task = asyncio.Task(ctlmgr(
args.server, args.port, args.retry_master, args.retry_command))
args.server, args.port, args.retry_master))
try:
loop.run_forever()
finally: