From 55006119c8fb2cd62d51d7be2cc0508dd1ebc671 Mon Sep 17 00:00:00 2001 From: Robert Jordens Date: Mon, 1 Feb 2016 01:52:31 -0700 Subject: [PATCH 1/7] subprocesses: unify termination logic --- artiq/devices/ctlmgr.py | 54 ++++++++++++++++++++++++----------------- artiq/master/worker.py | 39 +++++++++++++++++------------ 2 files changed, 55 insertions(+), 38 deletions(-) diff --git a/artiq/devices/ctlmgr.py b/artiq/devices/ctlmgr.py index 83295cf69..785f67267 100644 --- a/artiq/devices/ctlmgr.py +++ b/artiq/devices/ctlmgr.py @@ -106,30 +106,40 @@ class Controller: await self._terminate() async def _terminate(self): - logger.info("Terminating controller %s", self.name) - if self.process is not None and self.process.returncode is None: + if self.process is None or self.process.returncode is not None: + logger.info("Controller %s already terminated", self.name) + return + logger.debug("Terminating controller %s", self.name) + try: + await asyncio.wait_for(self.call("terminate"), self.term_timeout) + await asyncio.wait_for(self.process.wait(), self.term_timeout) + logger.info("Controller %s terminated", self.name) + return + except: + logger.warning("Controller %s did not exit on request, " + "ending the process", self.name) + if os.name != "nt": try: - await asyncio.wait_for(self.call("terminate"), - self.term_timeout) - except: - logger.warning("Controller %s did not respond to terminate " - "command, killing", self.name) - try: - self.process.kill() - except ProcessLookupError: - pass + self.process.terminate() + except ProcessLookupError: + pass try: - await asyncio.wait_for(self.process.wait(), - self.term_timeout) - except: - logger.warning("Controller %s failed to exit, killing", - self.name) - try: - self.process.kill() - except ProcessLookupError: - pass - await self.process.wait() - logger.debug("Controller %s terminated", self.name) + await asyncio.wait_for(self.process.wait(), self.term_timeout) + logger.info("Controller process %s terminated", self.name) + return + except asyncio.TimeoutError: + logger.warning("Controller process %s did not terminate, " + "killing", self.name) + try: + self.process.kill() + except ProcessLookupError: + pass + try: + await asyncio.wait_for(self.process.wait(), self.term_timeout) + logger.info("Controller process %s killed", self.name) + return + except asyncio.TimeoutError: + logger.warning("Controller process %s failed to die", self.name) def get_ip_addresses(host): diff --git a/artiq/master/worker.py b/artiq/master/worker.py index fb1752aed..12d05c796 100644 --- a/artiq/master/worker.py +++ b/artiq/master/worker.py @@ -115,30 +115,37 @@ class Worker: " (RID %s)", self.ipc.process.returncode, self.rid) return - obj = {"action": "terminate"} try: - await self._send(obj, cancellable=False) + await self._send({"action": "terminate"}, cancellable=False) + await asyncio.wait_for(self.ipc.process.wait(), term_timeout) + logger.debug("worker exited on request (RID %s)", self.rid) + return except: - logger.debug("failed to send terminate command to worker" - " (RID %s), killing", self.rid, exc_info=True) + logger.debug("worker failed to exit on request" + " (RID %s), ending the process", self.rid, + exc_info=True) + if os.name != "nt": try: - self.ipc.process.kill() + self.ipc.process.terminate() except ProcessLookupError: pass - await self.ipc.process.wait() - return + try: + await asyncio.wait_for(self.ipc.process.wait(), term_timeout) + logger.debug("worker terminated (RID %s)", self.rid) + return + except asyncio.TimeoutError: + logger.warning("worker did not terminate (RID %s), killing", + self.rid) + try: + self.ipc.process.kill() + except ProcessLookupError: + pass try: await asyncio.wait_for(self.ipc.process.wait(), term_timeout) + logger.debug("worker killed (RID %s)", self.rid) + return except asyncio.TimeoutError: - logger.debug("worker did not exit by itself (RID %s), killing", - self.rid) - try: - self.ipc.process.kill() - except ProcessLookupError: - pass - await self.ipc.process.wait() - else: - logger.debug("worker exited by itself (RID %s)", self.rid) + logger.warning("worker refuses to die (RID %s)", self.rid) finally: self.io_lock.release() From c105949155fca6219f35b9fafbd861b288f2b4ad Mon Sep 17 00:00:00 2001 From: Robert Jordens Date: Tue, 2 Feb 2016 15:14:24 -0700 Subject: [PATCH 2/7] pc_rpc.Client: support socket timeouts ... and fix two flake8 errors. --- artiq/protocols/pc_rpc.py | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/artiq/protocols/pc_rpc.py b/artiq/protocols/pc_rpc.py index d1de343a9..a861b48cd 100644 --- a/artiq/protocols/pc_rpc.py +++ b/artiq/protocols/pc_rpc.py @@ -57,8 +57,7 @@ def _validate_target_name(target_name, target_names): target_name = target_names[0] elif target_name not in target_names: raise IncompatibleServer( - "valid target name(s): " + - " ".join(sorted(target_names))) + "valid target name(s): " + " ".join(sorted(target_names))) return target_name @@ -92,9 +91,14 @@ class Client: Use ``None`` to skip selecting a target. The list of targets can then be retrieved using ``get_rpc_id`` and then one can be selected later using ``select_rpc_target``. + :param timeout: Socket operation timeout. Use ``None`` for blocking + (default), ``0`` for non-blocking, and a finite value to raise + ``socket.timeout`` if an operation does not complete within the + given time. See also ``socket.create_connection()`` and + ``socket.settimeout()`` in the Python standard library. """ - def __init__(self, host, port, target_name=AutoTarget): - self.__socket = socket.create_connection((host, port)) + def __init__(self, host, port, target_name=AutoTarget, timeout=None): + self.__socket = socket.create_connection((host, port), timeout) try: self.__socket.sendall(_init_string) @@ -485,7 +489,8 @@ class Server(_AsyncioServer): obj = {"status": "ok", "ret": doc} elif obj["action"] == "call": logger.debug("calling %s", _PrettyPrintCall(obj)) - if self.builtin_terminate and obj["name"] == "terminate": + if (self.builtin_terminate and obj["name"] == + "terminate"): self._terminate_request.set() obj = {"status": "ok", "ret": None} else: From 7636952496887cb7efbc86d1776d38c7ad5f6f71 Mon Sep 17 00:00:00 2001 From: Robert Jordens Date: Tue, 2 Feb 2016 15:31:06 -0700 Subject: [PATCH 3/7] ctlmgr: fix import --- artiq/devices/ctlmgr.py | 1 + 1 file changed, 1 insertion(+) diff --git a/artiq/devices/ctlmgr.py b/artiq/devices/ctlmgr.py index 785f67267..c4873a66e 100644 --- a/artiq/devices/ctlmgr.py +++ b/artiq/devices/ctlmgr.py @@ -3,6 +3,7 @@ import logging import subprocess import shlex import socket +import os from artiq.protocols.sync_struct import Subscriber from artiq.protocols.pc_rpc import AsyncioClient From 53e5d0a7bb974dd6d8fc08b0087f49254421f5c8 Mon Sep 17 00:00:00 2001 From: Robert Jordens Date: Tue, 2 Feb 2016 15:31:40 -0700 Subject: [PATCH 4/7] worker: flake8 style cleanup --- artiq/master/worker.py | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/artiq/master/worker.py b/artiq/master/worker.py index 12d05c796..668a68c91 100644 --- a/artiq/master/worker.py +++ b/artiq/master/worker.py @@ -5,7 +5,6 @@ import logging import subprocess import traceback import time -from functools import partial from artiq.protocols import pipe_ipc, pyon from artiq.protocols.logging import LogParser @@ -130,12 +129,13 @@ class Worker: except ProcessLookupError: pass try: - await asyncio.wait_for(self.ipc.process.wait(), term_timeout) + await asyncio.wait_for(self.ipc.process.wait(), + term_timeout) logger.debug("worker terminated (RID %s)", self.rid) return except asyncio.TimeoutError: - logger.warning("worker did not terminate (RID %s), killing", - self.rid) + logger.warning( + "worker did not terminate (RID %s), killing", self.rid) try: self.ipc.process.kill() except ProcessLookupError: @@ -215,7 +215,8 @@ class Worker: reply = {"status": "ok", "data": data} except Exception as e: reply = {"status": "failed", - "exception": traceback.format_exception_only(type(e), e)[0][:-1], + "exception": traceback.format_exception_only( + type(e), e)[0][:-1], "message": str(e), "traceback": traceback.format_tb(e.__traceback__)} await self.io_lock.acquire() @@ -242,7 +243,8 @@ class Worker: del self.watchdogs[-1] return completed - async def build(self, rid, pipeline_name, wd, expid, priority, timeout=15.0): + async def build(self, rid, pipeline_name, wd, expid, priority, + timeout=15.0): self.rid = rid self.filename = os.path.basename(expid["file"]) await self._create_process(expid["log_level"]) @@ -287,6 +289,7 @@ class Worker: await self._create_process(logging.WARNING) r = dict() + def register(class_name, name, arginfo): r[class_name] = {"name": name, "arginfo": arginfo} self.register_experiment = register From f7df39324804968baa1d7641b42143aaf35e9751 Mon Sep 17 00:00:00 2001 From: Robert Jordens Date: Tue, 2 Feb 2016 15:32:17 -0700 Subject: [PATCH 5/7] hardware_testbench: full shutdown sequence for controllers --- artiq/test/hardware_testbench.py | 50 +++++++++++++++++++++++++------- 1 file changed, 40 insertions(+), 10 deletions(-) diff --git a/artiq/test/hardware_testbench.py b/artiq/test/hardware_testbench.py index a611e5905..4143da8c1 100644 --- a/artiq/test/hardware_testbench.py +++ b/artiq/test/hardware_testbench.py @@ -8,11 +8,13 @@ import logging import subprocess import shlex import time +import socket from artiq.master.databases import DeviceDB, DatasetDB from artiq.master.worker_db import DeviceManager, DatasetManager from artiq.coredevice.core import CompileError from artiq.frontend.artiq_run import DummyScheduler +from artiq.protocols.pc_rpc import AutoTarget, Client artiq_root = os.getenv("ARTIQ_ROOT") @@ -27,13 +29,13 @@ class ControllerCase(unittest.TestCase): self.controllers = {} def tearDown(self): - for name in self.controllers: - self.device_mgr.get(name).terminate() self.device_mgr.close_devices() for name in list(self.controllers): self.stop_controller(name) def start_controller(self, name, sleep=1): + if name in self.controllers: + raise ValueError("controller `{}` already started".format(name)) try: entry = self.device_db.get(name) except KeyError: @@ -46,15 +48,43 @@ class ControllerCase(unittest.TestCase): time.sleep(sleep) def stop_controller(self, name, default_timeout=1): - entry, proc = self.controllers[name] - t = entry.get("term_timeout", default_timeout) - proc.terminate() + desc, proc = self.controllers[name] + t = desc.get("term_timeout", default_timeout) + target_name = desc.get("target_name", None) + if target_name is None: + target_name = AutoTarget try: - proc.wait(t) - except subprocess.TimeoutExpired: - proc.kill() - proc.wait(t) - del self.controllers[name] + try: + client = Client(desc["host"], desc["port"], target_name, t) + try: + client.terminate() + finally: + client.close_rpc() + proc.wait(t) + return + except (socket.timeout, subprocess.TimeoutExpired): + logger.warning("Controller %s failed to exit on request", name) + try: + proc.terminate() + except ProcessLookupError: + pass + try: + proc.wait(t) + return + except subprocess.TimeoutExpired: + logger.warning("Controller %s failed to exit on terminate", + name) + try: + proc.kill() + except ProcessLookupError: + pass + try: + proc.wait(t) + return + except subprocess.TimeoutExpired: + logger.warning("Controller %s failed to die on kill", name) + finally: + del self.controllers[name] @unittest.skipUnless(artiq_root, "no ARTIQ_ROOT") From c28b938471d831d9ee5d7a057482793b37a77917 Mon Sep 17 00:00:00 2001 From: Robert Jordens Date: Tue, 2 Feb 2016 15:42:47 -0700 Subject: [PATCH 6/7] Client: add note about timeout sideeffects --- artiq/protocols/pc_rpc.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/artiq/protocols/pc_rpc.py b/artiq/protocols/pc_rpc.py index a861b48cd..7f6140ecd 100644 --- a/artiq/protocols/pc_rpc.py +++ b/artiq/protocols/pc_rpc.py @@ -95,7 +95,9 @@ class Client: (default), ``0`` for non-blocking, and a finite value to raise ``socket.timeout`` if an operation does not complete within the given time. See also ``socket.create_connection()`` and - ``socket.settimeout()`` in the Python standard library. + ``socket.settimeout()`` in the Python standard library. A timeout + in the middle of a RPC can break subsequent RPCs (from the same + client). """ def __init__(self, host, port, target_name=AutoTarget, timeout=None): self.__socket = socket.create_connection((host, port), timeout) From 912274c6af9c8deb9fbb43ef5e25a5999969cdf8 Mon Sep 17 00:00:00 2001 From: Robert Jordens Date: Fri, 5 Feb 2016 15:08:49 -0700 Subject: [PATCH 7/7] test_ctlmgr: fix --- artiq/test/test_ctlmgr.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/artiq/test/test_ctlmgr.py b/artiq/test/test_ctlmgr.py index de205a677..7f081ca62 100644 --- a/artiq/test/test_ctlmgr.py +++ b/artiq/test/test_ctlmgr.py @@ -56,7 +56,6 @@ class ControllerCase(unittest.TestCase): raise asyncio.TimeoutError def test_start_ping_stop_controller(self): - command = sys.executable + " -m " entry = { "type": "controller", "host": "::1", @@ -67,5 +66,6 @@ class ControllerCase(unittest.TestCase): async def test(): await self.start("lda_sim", entry) remote = await self.get_client(entry["host"], entry["port"]) + await remote.ping() self.loop.run_until_complete(test())