1
0
forked from M-Labs/artiq
artiq/artiq/frontend/artiq_ctlmgr.py

283 lines
9.8 KiB
Python
Raw Normal View History

2015-02-06 12:17:51 +08:00
#!/usr/bin/env python3
import asyncio
import atexit
2015-02-06 12:17:51 +08:00
import argparse
import os
import logging
import shlex
2015-02-09 20:03:34 +08:00
import socket
2015-02-06 12:17:51 +08:00
from artiq.protocols.sync_struct import Subscriber
from artiq.protocols.pc_rpc import AsyncioClient, Server
from artiq.tools import verbosity_args, init_logger
from artiq.tools import TaskObject, Condition
# Module-level logger shared by every class and function below.
logger = logging.getLogger(__name__)


def get_argparser():
    """Build the command-line parser for the ARTIQ controller manager.

    The options fall into two groups: how to reach the master (``--server``,
    ``--port``, ``--retry-master``) and where this manager exposes its own
    RPC control interface (``--bind``, ``--bind-port``). Verbosity options
    are added by ``verbosity_args``.
    """
    parser = argparse.ArgumentParser(description="ARTIQ controller manager")
    verbosity_args(parser)

    # (flags, keyword arguments) pairs, registered in this order so that the
    # generated help text lists them the same way.
    option_specs = [
        # Connection to the master.
        (("-s", "--server"),
         {"default": "::1",
          "help": "hostname or IP of the master to connect to"}),
        (("--port",),
         {"default": 3250, "type": int,
          "help": "TCP port to use to connect to the master"}),
        (("--retry-master",),
         {"default": 5.0, "type": float,
          "help": "retry timer for reconnecting to master"}),
        # Local RPC control endpoint of this manager.
        (("--bind",),
         {"default": "::1",
          "help": "hostname or IP address to bind to"}),
        (("--bind-port",),
         {"default": 3249, "type": int,
          "help": "TCP port to listen to for control (default: %(default)d)"}),
    ]
    for flags, settings in option_specs:
        parser.add_argument(*flags, **settings)
    return parser
2015-02-07 23:52:05 +08:00
class Controller:
    """Supervise one controller process: launch, ping, restart, terminate.

    Constructing an instance immediately schedules :meth:`launcher` as an
    asyncio task. :meth:`end` cancels that task, which terminates the process.
    """

    def __init__(self, name, ddb_entry):
        self.name = name
        # Command line, split with shlex.split before being executed.
        self.command = ddb_entry["command"]
        # Initial restart delay (seconds) and the factor it is multiplied by
        # after every restart.
        self.retry_timer = ddb_entry.get("retry_timer", 5)
        self.retry_timer_backoff = ddb_entry.get("retry_timer_backoff", 1.1)

        # RPC endpoint of the controller, used for "ping" and "terminate".
        self.host = ddb_entry["host"]
        self.port = ddb_entry["port"]
        # Seconds between pings, seconds to wait for a ping reply, and seconds
        # to wait for a graceful exit before killing the process.
        self.ping_timer = ddb_entry.get("ping_timer", 30)
        self.ping_timeout = ddb_entry.get("ping_timeout", 30)
        self.term_timeout = ddb_entry.get("term_timeout", 30)

        # Current restart delay; reset to retry_timer after a successful ping.
        self.retry_timer_cur = self.retry_timer
        # Notifying this cuts the pending restart delay short.
        self.retry_now = Condition()

        self.process = None
        self.launch_task = asyncio.Task(self.launcher())

    async def end(self):
        """Cancel the launcher and wait until it has terminated the process."""
        self.launch_task.cancel()
        await asyncio.wait_for(self.launch_task, None)

    async def _call_controller(self, method):
        """Call the no-argument RPC ``method`` on the controller.

        Connects to ``host:port``, selects the first advertised RPC target,
        awaits the call and returns its result. The connection is closed
        even if the call fails.
        """
        remote = AsyncioClient()
        await remote.connect_rpc(self.host, self.port, None)
        try:
            targets, _ = remote.get_rpc_id()
            remote.select_rpc_target(targets[0])
            result = await getattr(remote, method)()
        finally:
            remote.close_rpc()
        return result

    async def _ping(self):
        """Ping the controller and return the reply, or False on failure.

        A truthy reply resets the restart back-off to ``retry_timer``. Errors
        and timeouts are reported as ``False``. Task cancellation is
        propagated rather than being mistaken for a failed ping; otherwise
        :meth:`end` could never stop the launcher.
        """
        try:
            ok = await asyncio.wait_for(self._call_controller("ping"),
                                        self.ping_timeout)
        except asyncio.CancelledError:
            # Needed explicitly: before Python 3.8 CancelledError derives from
            # Exception and would be caught by the clause below.
            raise
        except Exception:
            return False
        if ok:
            self.retry_timer_cur = self.retry_timer
        return ok

    async def _wait_and_ping(self):
        """Wait for the process to exit, pinging it every ``ping_timer`` s.

        Returns when the process exits by itself, or after terminating it
        because a ping failed.
        """
        while True:
            try:
                await asyncio.wait_for(self.process.wait(),
                                       self.ping_timer)
            except asyncio.TimeoutError:
                logger.debug("pinging controller %s", self.name)
                ok = await self._ping()
                if not ok:
                    logger.warning("Controller %s ping failed", self.name)
                    await self._terminate()
                    return
            else:
                break

    async def launcher(self):
        """Run the controller, restarting it whenever it dies.

        Each restart is delayed by ``retry_timer_cur`` seconds, which is then
        multiplied by ``retry_timer_backoff``. Notifying ``retry_now`` ends
        the delay early. When cancelled, the process is terminated and the
        task finishes normally.
        """
        try:
            while True:
                logger.info("Starting controller %s with command: %s",
                            self.name, self.command)
                try:
                    self.process = await asyncio.create_subprocess_exec(
                        *shlex.split(self.command))
                except (OSError, ValueError) as e:
                    # OSError: missing or non-executable program, etc.
                    # ValueError: e.g. unbalanced quotes in the command.
                    # Letting these escape would end this task with an
                    # exception that end() later re-raises in its caller.
                    logger.warning("Controller %s failed to start: %s",
                                   self.name, e)
                else:
                    await self._wait_and_ping()
                    logger.warning("Controller %s exited", self.name)
                logger.warning("Restarting in %.1f seconds",
                               self.retry_timer_cur)
                try:
                    await asyncio.wait_for(self.retry_now.wait(),
                                           self.retry_timer_cur)
                except asyncio.TimeoutError:
                    pass
                self.retry_timer_cur *= self.retry_timer_backoff
        except asyncio.CancelledError:
            await self._terminate()

    async def _terminate(self):
        """Stop the controller process if it is still running.

        If this coroutine is itself cancelled, the process is killed before
        the cancellation propagates, so it is never left running.
        """
        logger.info("Terminating controller %s", self.name)
        if self.process is not None and self.process.returncode is None:
            try:
                await self._graceful_stop()
            except asyncio.CancelledError:
                self._kill()
                raise
        logger.debug("Controller %s terminated", self.name)

    async def _graceful_stop(self):
        """Ask the controller to exit via RPC, killing it if it does not."""
        try:
            await asyncio.wait_for(self._call_controller("terminate"),
                                   self.term_timeout)
        except asyncio.CancelledError:
            raise
        except Exception:
            logger.warning("Controller %s did not respond to terminate "
                           "command, killing", self.name)
            self._kill()
        try:
            await asyncio.wait_for(self.process.wait(),
                                   self.term_timeout)
        except asyncio.CancelledError:
            raise
        except Exception:
            logger.warning("Controller %s failed to exit, killing",
                           self.name)
            self._kill()
            await self.process.wait()

    def _kill(self):
        """Kill the process, tolerating one that has already gone away."""
        try:
            self.process.kill()
        except ProcessLookupError:
            pass
2015-02-07 23:52:05 +08:00
2015-02-09 20:03:34 +08:00
def get_ip_addresses(host):
    """Return the set of IP address strings that ``host`` resolves to.

    ``host`` may be a hostname or a literal IPv4/IPv6 address. If resolution
    fails, an empty set is returned instead of raising, so the result can be
    used directly in membership tests.
    """
    try:
        addrinfo = socket.getaddrinfo(host, None)
    except (OSError, ValueError, TypeError):
        # OSError covers socket.gaierror (unresolvable name); ValueError
        # covers UnicodeError from IDNA-encoding a malformed name; TypeError
        # covers a host that is neither str, bytes nor None.
        return set()
    # Each entry is (family, type, proto, canonname, sockaddr); sockaddr[0]
    # is the address string for both IPv4 and IPv6.
    return {info[4][0] for info in addrinfo}
class Controllers:
    """Device-database mirror restricted to controllers that run on this host.

    Item assignment and deletion are turned into "set"/"del" actions. These
    are queued and applied one at a time by a background task, so starting
    and stopping controllers never overlap.
    """

    def __init__(self):
        # Local address used to decide which controllers run here.
        self.host_filter = None
        # Names whose "set" has been queued or applied and whose "del" has
        # not been queued yet.
        self.active_or_queued = set()
        self.queue = asyncio.Queue()
        # Running Controller objects, keyed by name.
        self.active = dict()
        self.process_task = asyncio.Task(self._process())

    async def _process(self):
        """Apply queued ("set", (name, entry)) and ("del", name) actions."""
        while True:
            action, param = await self.queue.get()
            if action == "set":
                k, ddb_entry = param
                if k in self.active:
                    await self.active[k].end()
                self.active[k] = Controller(k, ddb_entry)
            elif action == "del":
                await self.active[param].end()
                del self.active[param]
            else:
                raise ValueError

    def __setitem__(self, k, v):
        """Start or restart controller ``k`` if ``v`` describes one for here.

        A matching entry has ``"{name}"``, ``"{bind}"`` and ``"{port}"`` in
        its command substituted before it is queued. If ``v`` is not (or is
        no longer) a controller for this host but ``k`` was started before,
        the old controller is stopped. Malformed entries are logged and
        treated as non-controllers instead of raising into the caller.
        """
        try:
            is_local = (isinstance(v, dict) and v["type"] == "controller"
                        and self.host_filter in get_ip_addresses(v["host"]))
            if is_local:
                command = v["command"].format(name=k,
                                              bind=self.host_filter,
                                              port=v["port"])
        except (KeyError, IndexError, ValueError, AttributeError, TypeError):
            # Missing keys, a non-string command, or a bad format string.
            logger.error("Invalid device database entry %s, ignoring", k,
                         exc_info=True)
            is_local = False
        if is_local:
            v["command"] = command
            self.queue.put_nowait(("set", (k, v)))
            self.active_or_queued.add(k)
        elif k in self.active_or_queued:
            # The entry changed into something that is not a local
            # controller: stop the instance started from the old entry.
            del self[k]

    def __delitem__(self, k):
        """Queue stopping of controller ``k``; unknown names are ignored."""
        if k in self.active_or_queued:
            self.queue.put_nowait(("del", k))
            self.active_or_queued.remove(k)

    def delete_all(self):
        """Queue stopping of every started or pending controller."""
        # Iterate over a copy: __delitem__ removes names from the set.
        for name in set(self.active_or_queued):
            del self[name]

    async def shutdown(self):
        """Stop processing actions and end every running controller."""
        self.process_task.cancel()
        # Snapshot the values so the loop is unaffected by changes to the
        # dict made while we are suspended in end().
        for c in list(self.active.values()):
            await c.end()
2015-02-08 21:44:49 +08:00
class ControllerDB:
    """Subscription target that loads the device database into Controllers."""

    def __init__(self):
        self.current_controllers = Controllers()

    def set_host_filter(self, host_filter):
        """Choose which controllers run here by their resolved host address."""
        self.current_controllers.host_filter = host_filter

    def sync_struct_init(self, init):
        """Replace all known entries with those of ``init``.

        Every previously started controller is queued for deletion first,
        then each entry of ``init`` is assigned in order. Returns the
        Controllers object, which then receives further updates.
        """
        controllers = self.current_controllers
        if controllers is not None:
            controllers.delete_all()
        for name, entry in init.items():
            controllers[name] = entry
        return controllers
class ControllerManager(TaskObject):
    """Keep local controllers in sync with the master's "devices" notifier.

    Reconnects to the master every ``retry_master`` seconds after the
    connection fails or is lost. When the task ends, all controllers are
    shut down.
    """

    def __init__(self, server, port, retry_master):
        self.server = server
        self.port = port
        self.retry_master = retry_master
        self.controller_db = ControllerDB()

    async def _do(self):
        """Task body: subscribe to the master, reconnecting forever.

        Only ends when cancelled or when an unexpected exception escapes; in
        either case every controller is shut down in ``finally``.
        """
        try:
            subscriber = Subscriber("devices",
                                    self.controller_db.sync_struct_init)

            def set_host_filter():
                # Passed to connect() as a callback (it reads
                # subscriber.writer, so presumably runs once the socket is
                # open). Controllers are selected by the local address of
                # the socket connected to the master.
                s = subscriber.writer.get_extra_info("socket")
                localhost = s.getsockname()[0]
                self.controller_db.set_host_filter(localhost)

            while True:
                try:
                    await subscriber.connect(self.server, self.port,
                                             set_host_filter)
                    try:
                        await asyncio.wait_for(subscriber.receive_task, None)
                    finally:
                        await subscriber.close()
                except OSError as e:
                    # OSError covers every ConnectionError subclass as well
                    # as name-resolution (socket.gaierror) and "network
                    # unreachable" failures, which would otherwise end the
                    # manager instead of being retried.
                    logger.warning("Connection to master failed (%s: %s)",
                                   e.__class__.__name__, str(e))
                else:
                    logger.warning("Connection to master lost")
                logger.warning("Retrying in %.1f seconds", self.retry_master)
                await asyncio.sleep(self.retry_master)
        except asyncio.CancelledError:
            pass
        finally:
            await self.controller_db.current_controllers.shutdown()

    def retry_now(self, k):
        """If a controller is disabled and pending retry, perform that retry
        now.

        Raises KeyError if ``k`` is not a currently active controller.
        """
        self.controller_db.current_controllers.active[k].retry_now.notify()
2015-02-06 12:17:51 +08:00
2015-02-07 23:52:05 +08:00
def main():
    """Entry point: start the controller manager and its RPC control server.

    Blocks in ``rpc_server.wait_terminate()``; the server is created with
    ``builtin_terminate=True``, so presumably it returns when a client calls
    its terminate RPC. Cleanup runs from atexit handlers, which execute in
    reverse registration order: RPC server stop, manager stop, then loop
    close.
    """
    args = get_argparser().parse_args()
    init_logger(args)

    if os.name == "nt":
        # asyncio subprocess support on Windows needs the proactor loop.
        loop = asyncio.ProactorEventLoop()
    else:
        # new_event_loop() instead of get_event_loop(): the latter is
        # deprecated when no loop is running.
        loop = asyncio.new_event_loop()
    # Make the loop current: constructors below create asyncio tasks and
    # queues before the loop is running, and look the loop up implicitly.
    asyncio.set_event_loop(loop)
    atexit.register(lambda: loop.close())

    ctlmgr = ControllerManager(args.server, args.port, args.retry_master)
    ctlmgr.start()
    atexit.register(lambda: loop.run_until_complete(ctlmgr.stop()))

    class CtlMgrRPC:
        # Only retry_now is exposed over RPC.
        retry_now = ctlmgr.retry_now

    rpc_target = CtlMgrRPC()
    rpc_server = Server({"ctlmgr": rpc_target}, builtin_terminate=True)
    loop.run_until_complete(rpc_server.start(args.bind, args.bind_port))
    atexit.register(lambda: loop.run_until_complete(rpc_server.stop()))
    loop.run_until_complete(rpc_server.wait_terminate())


if __name__ == "__main__":
    main()