mirror of https://github.com/m-labs/artiq.git
ctlmgr: support immediate controller retry
This commit is contained in:
parent
80805407bf
commit
a6ab066c87
|
@ -1,6 +1,7 @@
|
||||||
#!/usr/bin/env python3
|
#!/usr/bin/env python3
|
||||||
|
|
||||||
import asyncio
|
import asyncio
|
||||||
|
import atexit
|
||||||
import argparse
|
import argparse
|
||||||
import os
|
import os
|
||||||
import logging
|
import logging
|
||||||
|
@ -8,9 +9,9 @@ import shlex
|
||||||
import socket
|
import socket
|
||||||
|
|
||||||
from artiq.protocols.sync_struct import Subscriber
|
from artiq.protocols.sync_struct import Subscriber
|
||||||
from artiq.protocols.pc_rpc import AsyncioClient
|
from artiq.protocols.pc_rpc import AsyncioClient, Server
|
||||||
from artiq.tools import verbosity_args, init_logger
|
from artiq.tools import verbosity_args, init_logger
|
||||||
from artiq.tools import asyncio_process_wait_timeout
|
from artiq.tools import TaskObject, asyncio_process_wait_timeout, Condition
|
||||||
|
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
@ -29,8 +30,11 @@ def get_argparser():
|
||||||
"--retry-master", default=5.0, type=float,
|
"--retry-master", default=5.0, type=float,
|
||||||
help="retry timer for reconnecting to master")
|
help="retry timer for reconnecting to master")
|
||||||
parser.add_argument(
|
parser.add_argument(
|
||||||
"--retry-command", default=5.0, type=float,
|
"--bind", default="::1",
|
||||||
help="retry timer for restarting a controller command")
|
help="hostname or IP address to bind to")
|
||||||
|
parser.add_argument(
|
||||||
|
"--bind-port", default=3249, type=int,
|
||||||
|
help="TCP port to listen to for control (default: %(default)d)")
|
||||||
return parser
|
return parser
|
||||||
|
|
||||||
|
|
||||||
|
@ -48,6 +52,7 @@ class Controller:
|
||||||
self.term_timeout = ddb_entry.get("term_timeout", 30)
|
self.term_timeout = ddb_entry.get("term_timeout", 30)
|
||||||
|
|
||||||
self.retry_timer_cur = self.retry_timer
|
self.retry_timer_cur = self.retry_timer
|
||||||
|
self.retry_now = Condition()
|
||||||
self.process = None
|
self.process = None
|
||||||
self.launch_task = asyncio.Task(self.launcher())
|
self.launch_task = asyncio.Task(self.launcher())
|
||||||
|
|
||||||
|
@ -109,8 +114,13 @@ class Controller:
|
||||||
logger.warning("Controller %s failed to start", self.name)
|
logger.warning("Controller %s failed to start", self.name)
|
||||||
else:
|
else:
|
||||||
logger.warning("Controller %s exited", self.name)
|
logger.warning("Controller %s exited", self.name)
|
||||||
logger.warning("Restarting in %.1f seconds", self.retry_timer_cur)
|
logger.warning("Restarting in %.1f seconds",
|
||||||
yield from asyncio.sleep(self.retry_timer_cur)
|
self.retry_timer_cur)
|
||||||
|
try:
|
||||||
|
yield from asyncio.wait_for(self.retry_now.wait(),
|
||||||
|
self.retry_timer_cur)
|
||||||
|
except asyncio.TimeoutError:
|
||||||
|
pass
|
||||||
self.retry_timer_cur *= self.retry_timer_backoff
|
self.retry_timer_cur *= self.retry_timer_backoff
|
||||||
except asyncio.CancelledError:
|
except asyncio.CancelledError:
|
||||||
yield from self._terminate()
|
yield from self._terminate()
|
||||||
|
@ -208,18 +218,26 @@ class ControllerDB:
|
||||||
return self.current_controllers
|
return self.current_controllers
|
||||||
|
|
||||||
|
|
||||||
@asyncio.coroutine
|
class ControllerManager(TaskObject):
|
||||||
def ctlmgr(server, port, retry_master):
|
def __init__(self, server, port, retry_master):
|
||||||
controller_db = ControllerDB()
|
self.server = server
|
||||||
|
self.port = port
|
||||||
|
self.retry_master = retry_master
|
||||||
|
self.controller_db = ControllerDB()
|
||||||
|
|
||||||
|
@asyncio.coroutine
|
||||||
|
def _do(self):
|
||||||
try:
|
try:
|
||||||
subscriber = Subscriber("devices", controller_db.sync_struct_init)
|
subscriber = Subscriber("devices",
|
||||||
|
self.controller_db.sync_struct_init)
|
||||||
while True:
|
while True:
|
||||||
try:
|
try:
|
||||||
def set_host_filter():
|
def set_host_filter():
|
||||||
s = subscriber.writer.get_extra_info("socket")
|
s = subscriber.writer.get_extra_info("socket")
|
||||||
localhost = s.getsockname()[0]
|
localhost = s.getsockname()[0]
|
||||||
controller_db.set_host_filter(localhost)
|
self.controller_db.set_host_filter(localhost)
|
||||||
yield from subscriber.connect(server, port, set_host_filter)
|
yield from subscriber.connect(self.server, self.port,
|
||||||
|
set_host_filter)
|
||||||
try:
|
try:
|
||||||
yield from asyncio.wait_for(subscriber.receive_task, None)
|
yield from asyncio.wait_for(subscriber.receive_task, None)
|
||||||
finally:
|
finally:
|
||||||
|
@ -230,12 +248,17 @@ def ctlmgr(server, port, retry_master):
|
||||||
e.__class__.__name__, str(e))
|
e.__class__.__name__, str(e))
|
||||||
else:
|
else:
|
||||||
logger.warning("Connection to master lost")
|
logger.warning("Connection to master lost")
|
||||||
logger.warning("Retrying in %.1f seconds", retry_master)
|
logger.warning("Retrying in %.1f seconds", self.retry_master)
|
||||||
yield from asyncio.sleep(retry_master)
|
yield from asyncio.sleep(self.retry_master)
|
||||||
except asyncio.CancelledError:
|
except asyncio.CancelledError:
|
||||||
pass
|
pass
|
||||||
finally:
|
finally:
|
||||||
yield from controller_db.current_controllers.shutdown()
|
yield from self.controller_db.current_controllers.shutdown()
|
||||||
|
|
||||||
|
def retry_now(self, k):
|
||||||
|
"""If a controller is disabled and pending retry, perform that retry
|
||||||
|
now."""
|
||||||
|
self.controller_db.current_controllers.active[k].retry_now.notify()
|
||||||
|
|
||||||
|
|
||||||
def main():
|
def main():
|
||||||
|
@ -247,18 +270,22 @@ def main():
|
||||||
asyncio.set_event_loop(loop)
|
asyncio.set_event_loop(loop)
|
||||||
else:
|
else:
|
||||||
loop = asyncio.get_event_loop()
|
loop = asyncio.get_event_loop()
|
||||||
|
atexit.register(lambda: loop.close())
|
||||||
|
|
||||||
try:
|
ctlmgr = ControllerManager(args.server, args.port, args.retry_master)
|
||||||
task = asyncio.Task(ctlmgr(
|
ctlmgr.start()
|
||||||
args.server, args.port, args.retry_master))
|
atexit.register(lambda: loop.run_until_complete(ctlmgr.stop()))
|
||||||
try:
|
|
||||||
loop.run_forever()
|
class CtlMgrRPC:
|
||||||
finally:
|
retry_now = ctlmgr.retry_now
|
||||||
task.cancel()
|
|
||||||
loop.run_until_complete(asyncio.wait_for(task, None))
|
rpc_target = CtlMgrRPC()
|
||||||
|
rpc_server = Server({"ctlmgr": rpc_target}, builtin_terminate=True)
|
||||||
|
loop.run_until_complete(rpc_server.start(args.bind, args.bind_port))
|
||||||
|
atexit.register(lambda: loop.run_until_complete(rpc_server.stop()))
|
||||||
|
|
||||||
|
loop.run_until_complete(rpc_server.wait_terminate())
|
||||||
|
|
||||||
finally:
|
|
||||||
loop.close()
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
main()
|
main()
|
||||||
|
|
|
@ -8,6 +8,8 @@ Default network ports
|
||||||
+--------------------------+--------------+
|
+--------------------------+--------------+
|
||||||
| Core device (mon/inj) | 3250 (UDP) |
|
| Core device (mon/inj) | 3250 (UDP) |
|
||||||
+--------------------------+--------------+
|
+--------------------------+--------------+
|
||||||
|
| Controller manager | 3249 |
|
||||||
|
+--------------------------+--------------+
|
||||||
| Master (notifications) | 3250 |
|
| Master (notifications) | 3250 |
|
||||||
+--------------------------+--------------+
|
+--------------------------+--------------+
|
||||||
| Master (control) | 3251 |
|
| Master (control) | 3251 |
|
||||||
|
|
Loading…
Reference in New Issue