diff --git a/artiq/frontend/artiq_ctlmgr.py b/artiq/frontend/artiq_ctlmgr.py index ce59bc7a4..878d6e649 100755 --- a/artiq/frontend/artiq_ctlmgr.py +++ b/artiq/frontend/artiq_ctlmgr.py @@ -1,6 +1,7 @@ #!/usr/bin/env python3 import asyncio +import atexit import argparse import os import logging @@ -8,9 +9,9 @@ import shlex import socket from artiq.protocols.sync_struct import Subscriber -from artiq.protocols.pc_rpc import AsyncioClient +from artiq.protocols.pc_rpc import AsyncioClient, Server from artiq.tools import verbosity_args, init_logger -from artiq.tools import asyncio_process_wait_timeout +from artiq.tools import TaskObject, asyncio_process_wait_timeout, Condition logger = logging.getLogger(__name__) @@ -29,8 +30,11 @@ def get_argparser(): "--retry-master", default=5.0, type=float, help="retry timer for reconnecting to master") parser.add_argument( - "--retry-command", default=5.0, type=float, - help="retry timer for restarting a controller command") + "--bind", default="::1", + help="hostname or IP address to bind to") + parser.add_argument( + "--bind-port", default=3249, type=int, + help="TCP port to listen to for control (default: %(default)d)") return parser @@ -48,6 +52,7 @@ class Controller: self.term_timeout = ddb_entry.get("term_timeout", 30) self.retry_timer_cur = self.retry_timer + self.retry_now = Condition() self.process = None self.launch_task = asyncio.Task(self.launcher()) @@ -109,8 +114,13 @@ class Controller: logger.warning("Controller %s failed to start", self.name) else: logger.warning("Controller %s exited", self.name) - logger.warning("Restarting in %.1f seconds", self.retry_timer_cur) - yield from asyncio.sleep(self.retry_timer_cur) + logger.warning("Restarting in %.1f seconds", + self.retry_timer_cur) + try: + yield from asyncio.wait_for(self.retry_now.wait(), + self.retry_timer_cur) + except asyncio.TimeoutError: + pass self.retry_timer_cur *= self.retry_timer_backoff except asyncio.CancelledError: yield from self._terminate() @@ -208,34 +218,47 @@ class ControllerDB: return self.current_controllers -@asyncio.coroutine -def ctlmgr(server, port, retry_master): - controller_db = ControllerDB() - try: - subscriber = Subscriber("devices", controller_db.sync_struct_init) - while True: - try: - def set_host_filter(): - s = subscriber.writer.get_extra_info("socket") - localhost = s.getsockname()[0] - controller_db.set_host_filter(localhost) - yield from subscriber.connect(server, port, set_host_filter) +class ControllerManager(TaskObject): + def __init__(self, server, port, retry_master): + self.server = server + self.port = port + self.retry_master = retry_master + self.controller_db = ControllerDB() + + @asyncio.coroutine + def _do(self): + try: + subscriber = Subscriber("devices", + self.controller_db.sync_struct_init) + while True: try: - yield from asyncio.wait_for(subscriber.receive_task, None) - finally: - yield from subscriber.close() - except (ConnectionAbortedError, ConnectionError, - ConnectionRefusedError, ConnectionResetError) as e: - logger.warning("Connection to master failed (%s: %s)", - e.__class__.__name__, str(e)) - else: - logger.warning("Connection to master lost") - logger.warning("Retrying in %.1f seconds", retry_master) - yield from asyncio.sleep(retry_master) - except asyncio.CancelledError: - pass - finally: - yield from controller_db.current_controllers.shutdown() + def set_host_filter(): + s = subscriber.writer.get_extra_info("socket") + localhost = s.getsockname()[0] + self.controller_db.set_host_filter(localhost) + yield from subscriber.connect(self.server, self.port, + set_host_filter) + try: + yield from asyncio.wait_for(subscriber.receive_task, None) + finally: + yield from subscriber.close() + except (ConnectionAbortedError, ConnectionError, + ConnectionRefusedError, ConnectionResetError) as e: + logger.warning("Connection to master failed (%s: %s)", + e.__class__.__name__, str(e)) + else: + logger.warning("Connection to master lost") + logger.warning("Retrying in %.1f seconds", self.retry_master) + yield from asyncio.sleep(self.retry_master) + except asyncio.CancelledError: + pass + finally: + yield from self.controller_db.current_controllers.shutdown() + + def retry_now(self, k): + """If a controller is disabled and pending retry, perform that retry + now.""" + self.controller_db.current_controllers.active[k].retry_now.notify() def main(): @@ -247,18 +270,22 @@ def main(): asyncio.set_event_loop(loop) else: loop = asyncio.get_event_loop() + atexit.register(lambda: loop.close()) - try: - task = asyncio.Task(ctlmgr( - args.server, args.port, args.retry_master)) - try: - loop.run_forever() - finally: - task.cancel() - loop.run_until_complete(asyncio.wait_for(task, None)) + ctlmgr = ControllerManager(args.server, args.port, args.retry_master) + ctlmgr.start() + atexit.register(lambda: loop.run_until_complete(ctlmgr.stop())) + + class CtlMgrRPC: + retry_now = ctlmgr.retry_now + + rpc_target = CtlMgrRPC() + rpc_server = Server({"ctlmgr": rpc_target}, builtin_terminate=True) + loop.run_until_complete(rpc_server.start(args.bind, args.bind_port)) + atexit.register(lambda: loop.run_until_complete(rpc_server.stop())) + + loop.run_until_complete(rpc_server.wait_terminate()) - finally: - loop.close() if __name__ == "__main__": main() diff --git a/doc/manual/default_network_ports.rst b/doc/manual/default_network_ports.rst index c5728e428..f9c4f9c09 100644 --- a/doc/manual/default_network_ports.rst +++ b/doc/manual/default_network_ports.rst @@ -8,6 +8,8 @@ Default network ports +--------------------------+--------------+ | Core device (mon/inj) | 3250 (UDP) | +--------------------------+--------------+ +| Controller manager | 3249 | ++--------------------------+--------------+ | Master (notifications) | 3250 | +--------------------------+--------------+ | Master (control) | 3251 |