forked from M-Labs/artiq
1
0
Fork 0

ctlmgr: wait for processes to terminate before starting new ones

This commit is contained in:
Sebastien Bourdeauducq 2015-02-08 00:59:08 +08:00
parent e7d85c5b87
commit 96a01efc48
1 changed files with 28 additions and 6 deletions

View File

@ -35,8 +35,10 @@ class Controller:
def __init__(self, name, command, retry): def __init__(self, name, command, retry):
self.launch_task = asyncio.Task(self.launcher(name, command, retry)) self.launch_task = asyncio.Task(self.launcher(name, command, retry))
@asyncio.coroutine
def end(self): def end(self):
self.launch_task.cancel() self.launch_task.cancel()
yield from asyncio.wait_for(self.launch_task, None)
@asyncio.coroutine @asyncio.coroutine
def launcher(self, name, command, retry): def launcher(self, name, command, retry):
@ -67,7 +69,25 @@ class Controllers:
def __init__(self, retry_command): def __init__(self, retry_command):
self.retry_command = retry_command self.retry_command = retry_command
self.host_filter = None self.host_filter = None
self.active_or_queued = set()
self.queue = asyncio.Queue()
self.active = dict() self.active = dict()
self.process_task = asyncio.Task(self._process())
@asyncio.coroutine
def _process(self):
while True:
action, param = yield from self.queue.get()
if action == "set":
k, command = param
if k in self.active:
yield from self.active[k].end()
self.active[k] = Controller(k, command, self.retry_command)
elif action == "del":
yield from self.active[param].end()
del self.active[param]
else:
raise ValueError
def __setitem__(self, k, v): def __setitem__(self, k, v):
if (isinstance(v, dict) and v["type"] == "controller" if (isinstance(v, dict) and v["type"] == "controller"
@ -75,15 +95,16 @@ class Controllers:
command = v["command"].format(name=k, command = v["command"].format(name=k,
bind=self.host_filter, bind=self.host_filter,
port=v["port"]) port=v["port"])
self.active[k] = Controller(k, command, self.retry_command) self.queue.put_nowait(("set", (k, command)))
self.active_or_queued.add(k)
def __delitem__(self, k): def __delitem__(self, k):
if k in self.active: if k in self.active_or_queued:
self.active[k].end() self.queue.put_nowait(("del", k))
del self.active[k] self.active_or_queued.remove(k)
def delete_all(self): def delete_all(self):
for name in list(self.active.keys()): for name in set(self.active_or_queued):
del self[name] del self[name]
@ -99,6 +120,7 @@ class ControllerDB:
self.current_controllers.delete_all() self.current_controllers.delete_all()
for k, v in init.items(): for k, v in init.items():
self.current_controllers[k] = v self.current_controllers[k] = v
return self.current_controllers
@asyncio.coroutine @asyncio.coroutine
@ -114,7 +136,7 @@ def ctlmgr(server, port, retry_master, retry_command):
controller_db.set_host_filter(localhost) controller_db.set_host_filter(localhost)
yield from subscriber.connect(server, port, set_host_filter) yield from subscriber.connect(server, port, set_host_filter)
try: try:
yield from asyncio.wait([subscriber.receive_task]) yield from asyncio.wait_for(subscriber.receive_task, None)
finally: finally:
yield from subscriber.close() yield from subscriber.close()
except (ConnectionAbortedError, ConnectionError, except (ConnectionAbortedError, ConnectionError,