From 96a01efc48bd998fefd9ee0563e233c4ff38acaa Mon Sep 17 00:00:00 2001 From: Sebastien Bourdeauducq Date: Sun, 8 Feb 2015 00:59:08 +0800 Subject: [PATCH] ctlmgr: wait for processes to terminate before starting new ones --- artiq/frontend/artiq_ctlmgr.py | 34 ++++++++++++++++++++++++++++------ 1 file changed, 28 insertions(+), 6 deletions(-) diff --git a/artiq/frontend/artiq_ctlmgr.py b/artiq/frontend/artiq_ctlmgr.py index bd9332405..437ab0b04 100755 --- a/artiq/frontend/artiq_ctlmgr.py +++ b/artiq/frontend/artiq_ctlmgr.py @@ -35,8 +35,10 @@ class Controller: def __init__(self, name, command, retry): self.launch_task = asyncio.Task(self.launcher(name, command, retry)) + @asyncio.coroutine def end(self): self.launch_task.cancel() + yield from asyncio.wait_for(self.launch_task, None) @asyncio.coroutine def launcher(self, name, command, retry): @@ -67,7 +69,25 @@ class Controllers: def __init__(self, retry_command): self.retry_command = retry_command self.host_filter = None + self.active_or_queued = set() + self.queue = asyncio.Queue() self.active = dict() + self.process_task = asyncio.Task(self._process()) + + @asyncio.coroutine + def _process(self): + while True: + action, param = yield from self.queue.get() + if action == "set": + k, command = param + if k in self.active: + yield from self.active[k].end() + self.active[k] = Controller(k, command, self.retry_command) + elif action == "del": + yield from self.active[param].end() + del self.active[param] + else: + raise ValueError def __setitem__(self, k, v): if (isinstance(v, dict) and v["type"] == "controller" @@ -75,15 +95,16 @@ class Controllers: command = v["command"].format(name=k, bind=self.host_filter, port=v["port"]) - self.active[k] = Controller(k, command, self.retry_command) + self.queue.put_nowait(("set", (k, command))) + self.active_or_queued.add(k) def __delitem__(self, k): - if k in self.active: - self.active[k].end() - del self.active[k] + if k in self.active_or_queued: + self.queue.put_nowait(("del", k)) + self.active_or_queued.remove(k) def delete_all(self): - for name in list(self.active.keys()): + for name in set(self.active_or_queued): del self[name] @@ -99,6 +120,7 @@ class ControllerDB: self.current_controllers.delete_all() for k, v in init.items(): self.current_controllers[k] = v + return self.current_controllers @asyncio.coroutine @@ -114,7 +136,7 @@ def ctlmgr(server, port, retry_master, retry_command): controller_db.set_host_filter(localhost) yield from subscriber.connect(server, port, set_host_filter) try: - yield from asyncio.wait([subscriber.receive_task]) + yield from asyncio.wait_for(subscriber.receive_task, None) finally: yield from subscriber.close() except (ConnectionAbortedError, ConnectionError,