From a6a476593e3c3a7306a74ce942d7497cb7c67497 Mon Sep 17 00:00:00 2001 From: Sebastien Bourdeauducq Date: Fri, 5 Jun 2015 00:37:26 +0800 Subject: [PATCH] worker: wait for process termination This prevents stray SIGCHLDs from crashing the program e.g. if the asyncio event loop is closed before the process actually terminates. --- artiq/master/worker.py | 5 ++++- artiq/tools.py | 9 +++++++++ 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/artiq/master/worker.py b/artiq/master/worker.py index bb89907f4..acdca7908 100644 --- a/artiq/master/worker.py +++ b/artiq/master/worker.py @@ -7,7 +7,8 @@ import time from artiq.protocols import pyon from artiq.language.units import strip_unit -from artiq.tools import asyncio_process_wait_timeout, asyncio_wait_or_cancel +from artiq.tools import (asyncio_process_wait_timeout, asyncio_process_wait, + asyncio_wait_or_cancel) logger = logging.getLogger(__name__) @@ -101,6 +102,7 @@ class Worker: logger.warning("failed to send terminate command to worker" " (RID %d), killing", self.rid, exc_info=True) self.process.kill() + yield from asyncio_process_wait(self.process) return try: yield from asyncio_process_wait_timeout(self.process, @@ -108,6 +110,7 @@ class Worker: except asyncio.TimeoutError: logger.warning("worker did not exit (RID %d), killing", self.rid) self.process.kill() + yield from asyncio_process_wait(self.process) else: logger.debug("worker exited gracefully (RID %d)", self.rid) finally: diff --git a/artiq/tools.py b/artiq/tools.py index a6ee390f8..b7e6e8dc8 100644 --- a/artiq/tools.py +++ b/artiq/tools.py @@ -18,6 +18,7 @@ def parse_arguments(arguments): d[name] = pyon.decode(value) return d + def format_arguments(arguments): fmtargs = [] for k, v in sorted(arguments.items(), key=itemgetter(0)): @@ -98,6 +99,14 @@ def asyncio_process_wait_timeout(process, timeout): timeout=end_time - time.monotonic()) +@asyncio.coroutine +def asyncio_process_wait(process): + r = True + while r: + f, p = yield from asyncio.wait([process.stdout.read(1024)]) + r = f.pop().result() + + @asyncio.coroutine def asyncio_wait_or_cancel(fs, **kwargs): fs = [asyncio.async(f) for f in fs]