ctlmgr: basic working implementation

This commit is contained in:
Sebastien Bourdeauducq 2015-02-07 23:52:05 +08:00
parent 291ca0bf8d
commit e7d85c5b87
1 changed files with 76 additions and 27 deletions

View File

@ -4,6 +4,7 @@ import asyncio
import argparse import argparse
import os import os
import logging import logging
import signal
from artiq.protocols.sync_struct import Subscriber from artiq.protocols.sync_struct import Subscriber
from artiq.tools import verbosity_args, init_logger from artiq.tools import verbosity_args, init_logger
@ -24,30 +25,74 @@ def get_argparser():
parser.add_argument( parser.add_argument(
"--retry-master", default=5.0, type=float, "--retry-master", default=5.0, type=float,
help="retry timer for reconnecting to master") help="retry timer for reconnecting to master")
parser.add_argument(
"--retry-command", default=5.0, type=float,
help="retry timer for restarting a controller command")
return parser return parser
class Controller:
def __init__(self, name, command, retry):
self.launch_task = asyncio.Task(self.launcher(name, command, retry))
def end(self):
self.launch_task.cancel()
@asyncio.coroutine
def launcher(self, name, command, retry):
process = None
try:
while True:
logger.info("Starting controller %s with command: %s",
name, command)
process = yield from asyncio.create_subprocess_exec(*command.split())
yield from asyncio.shield(process.wait())
logger.warning("Controller %s exited", name)
logger.warning("Restarting in %.1f seconds", retry)
yield from asyncio.sleep(retry)
except asyncio.CancelledError:
logger.info("Terminating controller %s", name)
if process is not None and process.returncode is None:
process.send_signal(signal.SIGTERM)
logger.debug("Signal sent")
try:
yield from asyncio.wait_for(process.wait(), timeout=5.0)
except asyncio.TimeoutError:
logger.warning("Controller %s did not respond to SIGTERM",
name)
process.send_signal(signal.SIGKILL)
class Controllers: class Controllers:
def __init__(self): def __init__(self, retry_command):
self.retry_command = retry_command
self.host_filter = None self.host_filter = None
self.active = dict()
def __setitem__(self, k, v): def __setitem__(self, k, v):
if (isinstance(v, dict) and v["type"] == "controller" if (isinstance(v, dict) and v["type"] == "controller"
and v["host"] == self.host_filter): and v["host"] == self.host_filter):
command = v["command"].format(bind=self.host_filter, command = v["command"].format(name=k,
bind=self.host_filter,
port=v["port"]) port=v["port"])
print("start controller {}: {}".format(k, command)) self.active[k] = Controller(k, command, self.retry_command)
def __delitem__(self, k): def __delitem__(self, k):
print("del {}".format(k)) if k in self.active:
self.active[k].end()
del self.active[k]
def delete_all(self): def delete_all(self):
print("delete all") for name in list(self.active.keys()):
del self[name]
class ControllerDB: class ControllerDB:
def __init__(self): def __init__(self, retry_command):
self.current_controllers = Controllers() self.current_controllers = Controllers(retry_command)
def set_host_filter(self, host_filter):
self.current_controllers.host_filter = host_filter
def sync_struct_init(self, init): def sync_struct_init(self, init):
if self.current_controllers is not None: if self.current_controllers is not None:
@ -56,17 +101,9 @@ class ControllerDB:
self.current_controllers[k] = v self.current_controllers[k] = v
def main(): @asyncio.coroutine
args = get_argparser().parse_args() def ctlmgr(server, port, retry_master, retry_command):
init_logger(args) controller_db = ControllerDB(retry_command)
controller_db = ControllerDB()
if os.name == "nt":
loop = asyncio.ProactorEventLoop()
asyncio.set_event_loop(loop)
else:
loop = asyncio.get_event_loop()
try: try:
subscriber = Subscriber("devices", controller_db.sync_struct_init) subscriber = Subscriber("devices", controller_db.sync_struct_init)
while True: while True:
@ -74,25 +111,37 @@ def main():
def set_host_filter(): def set_host_filter():
s = subscriber.writer.get_extra_info("socket") s = subscriber.writer.get_extra_info("socket")
localhost = s.getsockname()[0] localhost = s.getsockname()[0]
controller_db.current_controllers.host_filter = localhost controller_db.set_host_filter(localhost)
loop.run_until_complete( yield from subscriber.connect(server, port, set_host_filter)
subscriber.connect(args.server, args.port,
set_host_filter))
try: try:
loop.run_until_complete(subscriber.receive_task) yield from asyncio.wait([subscriber.receive_task])
finally: finally:
loop.run_until_complete(subscriber.close()) yield from subscriber.close()
except (ConnectionAbortedError, ConnectionError, except (ConnectionAbortedError, ConnectionError,
ConnectionRefusedError, ConnectionResetError) as e: ConnectionRefusedError, ConnectionResetError) as e:
logger.warning("Connection to master failed (%s: %s)", logger.warning("Connection to master failed (%s: %s)",
e.__class__.__name__, str(e)) e.__class__.__name__, str(e))
else: else:
logger.warning("Connection to master lost") logger.warning("Connection to master lost")
logger.warning("Retrying in %.1f seconds", args.retry_master) logger.warning("Retrying in %.1f seconds", retry_master)
loop.run_until_complete(asyncio.sleep(args.retry_master)) yield from asyncio.sleep(retry_master)
finally:
controller_db.current_controllers.delete_all()
def main():
args = get_argparser().parse_args()
init_logger(args)
if os.name == "nt":
loop = asyncio.ProactorEventLoop()
asyncio.set_event_loop(loop)
else:
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(ctlmgr(args.server, args.port,
args.retry_master, args.retry_command))
finally: finally:
loop.close() loop.close()
controller_db.current_controllers.delete_all()
if __name__ == "__main__": if __name__ == "__main__":
main() main()