diff --git a/artiq/devices/ctlmgr.py b/artiq/devices/ctlmgr.py index 83295cf69..c4873a66e 100644 --- a/artiq/devices/ctlmgr.py +++ b/artiq/devices/ctlmgr.py @@ -3,6 +3,7 @@ import logging import subprocess import shlex import socket +import os from artiq.protocols.sync_struct import Subscriber from artiq.protocols.pc_rpc import AsyncioClient @@ -106,30 +107,40 @@ class Controller: await self._terminate() async def _terminate(self): - logger.info("Terminating controller %s", self.name) - if self.process is not None and self.process.returncode is None: + if self.process is None or self.process.returncode is not None: + logger.info("Controller %s already terminated", self.name) + return + logger.debug("Terminating controller %s", self.name) + try: + await asyncio.wait_for(self.call("terminate"), self.term_timeout) + await asyncio.wait_for(self.process.wait(), self.term_timeout) + logger.info("Controller %s terminated", self.name) + return + except: + logger.warning("Controller %s did not exit on request, " + "ending the process", self.name) + if os.name != "nt": try: - await asyncio.wait_for(self.call("terminate"), - self.term_timeout) - except: - logger.warning("Controller %s did not respond to terminate " - "command, killing", self.name) - try: - self.process.kill() - except ProcessLookupError: - pass + self.process.terminate() + except ProcessLookupError: + pass try: - await asyncio.wait_for(self.process.wait(), - self.term_timeout) - except: - logger.warning("Controller %s failed to exit, killing", - self.name) - try: - self.process.kill() - except ProcessLookupError: - pass - await self.process.wait() - logger.debug("Controller %s terminated", self.name) + await asyncio.wait_for(self.process.wait(), self.term_timeout) + logger.info("Controller process %s terminated", self.name) + return + except asyncio.TimeoutError: + logger.warning("Controller process %s did not terminate, " + "killing", self.name) + try: + self.process.kill() + except ProcessLookupError: + pass + try: + await asyncio.wait_for(self.process.wait(), self.term_timeout) + logger.info("Controller process %s killed", self.name) + return + except asyncio.TimeoutError: + logger.warning("Controller process %s failed to die", self.name) def get_ip_addresses(host): diff --git a/artiq/master/worker.py b/artiq/master/worker.py index fb1752aed..668a68c91 100644 --- a/artiq/master/worker.py +++ b/artiq/master/worker.py @@ -5,7 +5,6 @@ import logging import subprocess import traceback import time -from functools import partial from artiq.protocols import pipe_ipc, pyon from artiq.protocols.logging import LogParser @@ -115,30 +114,38 @@ class Worker: " (RID %s)", self.ipc.process.returncode, self.rid) return - obj = {"action": "terminate"} try: - await self._send(obj, cancellable=False) + await self._send({"action": "terminate"}, cancellable=False) + await asyncio.wait_for(self.ipc.process.wait(), term_timeout) + logger.debug("worker exited on request (RID %s)", self.rid) + return except: - logger.debug("failed to send terminate command to worker" - " (RID %s), killing", self.rid, exc_info=True) + logger.debug("worker failed to exit on request" + " (RID %s), ending the process", self.rid, + exc_info=True) + if os.name != "nt": try: - self.ipc.process.kill() + self.ipc.process.terminate() except ProcessLookupError: pass - await self.ipc.process.wait() - return + try: + await asyncio.wait_for(self.ipc.process.wait(), + term_timeout) + logger.debug("worker terminated (RID %s)", self.rid) + return + except asyncio.TimeoutError: + logger.warning( + "worker did not terminate (RID %s), killing", self.rid) + try: + self.ipc.process.kill() + except ProcessLookupError: + pass try: await asyncio.wait_for(self.ipc.process.wait(), term_timeout) + logger.debug("worker killed (RID %s)", self.rid) + return except asyncio.TimeoutError: - logger.debug("worker did not exit by itself (RID %s), killing", - self.rid) - try: - self.ipc.process.kill() - except ProcessLookupError: - pass - await self.ipc.process.wait() - else: - logger.debug("worker exited by itself (RID %s)", self.rid) + logger.warning("worker refuses to die (RID %s)", self.rid) finally: self.io_lock.release() @@ -208,7 +215,8 @@ class Worker: reply = {"status": "ok", "data": data} except Exception as e: reply = {"status": "failed", - "exception": traceback.format_exception_only(type(e), e)[0][:-1], + "exception": traceback.format_exception_only( + type(e), e)[0][:-1], "message": str(e), "traceback": traceback.format_tb(e.__traceback__)} await self.io_lock.acquire() @@ -235,7 +243,8 @@ class Worker: del self.watchdogs[-1] return completed - async def build(self, rid, pipeline_name, wd, expid, priority, timeout=15.0): + async def build(self, rid, pipeline_name, wd, expid, priority, + timeout=15.0): self.rid = rid self.filename = os.path.basename(expid["file"]) await self._create_process(expid["log_level"]) @@ -280,6 +289,7 @@ class Worker: await self._create_process(logging.WARNING) r = dict() + def register(class_name, name, arginfo): r[class_name] = {"name": name, "arginfo": arginfo} self.register_experiment = register diff --git a/artiq/protocols/pc_rpc.py b/artiq/protocols/pc_rpc.py index d1de343a9..7f6140ecd 100644 --- a/artiq/protocols/pc_rpc.py +++ b/artiq/protocols/pc_rpc.py @@ -57,8 +57,7 @@ def _validate_target_name(target_name, target_names): target_name = target_names[0] elif target_name not in target_names: raise IncompatibleServer( - "valid target name(s): " + - " ".join(sorted(target_names))) + "valid target name(s): " + " ".join(sorted(target_names))) return target_name @@ -92,9 +91,16 @@ class Client: Use ``None`` to skip selecting a target. The list of targets can then be retrieved using ``get_rpc_id`` and then one can be selected later using ``select_rpc_target``. + :param timeout: Socket operation timeout. Use ``None`` for blocking + (default), ``0`` for non-blocking, and a finite value to raise + ``socket.timeout`` if an operation does not complete within the + given time. See also ``socket.create_connection()`` and + ``socket.settimeout()`` in the Python standard library. A timeout + in the middle of a RPC can break subsequent RPCs (from the same + client). """ - def __init__(self, host, port, target_name=AutoTarget): - self.__socket = socket.create_connection((host, port)) + def __init__(self, host, port, target_name=AutoTarget, timeout=None): + self.__socket = socket.create_connection((host, port), timeout) try: self.__socket.sendall(_init_string) @@ -485,7 +491,8 @@ class Server(_AsyncioServer): obj = {"status": "ok", "ret": doc} elif obj["action"] == "call": logger.debug("calling %s", _PrettyPrintCall(obj)) - if self.builtin_terminate and obj["name"] == "terminate": + if (self.builtin_terminate and obj["name"] == + "terminate"): self._terminate_request.set() obj = {"status": "ok", "ret": None} else: diff --git a/artiq/test/hardware_testbench.py b/artiq/test/hardware_testbench.py index a611e5905..4143da8c1 100644 --- a/artiq/test/hardware_testbench.py +++ b/artiq/test/hardware_testbench.py @@ -8,11 +8,13 @@ import logging import subprocess import shlex import time +import socket from artiq.master.databases import DeviceDB, DatasetDB from artiq.master.worker_db import DeviceManager, DatasetManager from artiq.coredevice.core import CompileError from artiq.frontend.artiq_run import DummyScheduler +from artiq.protocols.pc_rpc import AutoTarget, Client artiq_root = os.getenv("ARTIQ_ROOT") @@ -27,13 +29,13 @@ class ControllerCase(unittest.TestCase): self.controllers = {} def tearDown(self): - for name in self.controllers: - self.device_mgr.get(name).terminate() self.device_mgr.close_devices() for name in list(self.controllers): self.stop_controller(name) def start_controller(self, name, sleep=1): + if name in self.controllers: + raise ValueError("controller `{}` already started".format(name)) try: entry = self.device_db.get(name) except KeyError: @@ -46,15 +48,43 @@ class ControllerCase(unittest.TestCase): time.sleep(sleep) def stop_controller(self, name, default_timeout=1): - entry, proc = self.controllers[name] - t = entry.get("term_timeout", default_timeout) - proc.terminate() + desc, proc = self.controllers[name] + t = desc.get("term_timeout", default_timeout) + target_name = desc.get("target_name", None) + if target_name is None: + target_name = AutoTarget try: - proc.wait(t) - except subprocess.TimeoutExpired: - proc.kill() - proc.wait(t) - del self.controllers[name] + try: + client = Client(desc["host"], desc["port"], target_name, t) + try: + client.terminate() + finally: + client.close_rpc() + proc.wait(t) + return + except (socket.timeout, subprocess.TimeoutExpired): + logger.warning("Controller %s failed to exit on request", name) + try: + proc.terminate() + except ProcessLookupError: + pass + try: + proc.wait(t) + return + except subprocess.TimeoutExpired: + logger.warning("Controller %s failed to exit on terminate", + name) + try: + proc.kill() + except ProcessLookupError: + pass + try: + proc.wait(t) + return + except subprocess.TimeoutExpired: + logger.warning("Controller %s failed to die on kill", name) + finally: + del self.controllers[name] @unittest.skipUnless(artiq_root, "no ARTIQ_ROOT") diff --git a/artiq/test/test_ctlmgr.py b/artiq/test/test_ctlmgr.py index de205a677..7f081ca62 100644 --- a/artiq/test/test_ctlmgr.py +++ b/artiq/test/test_ctlmgr.py @@ -56,7 +56,6 @@ class ControllerCase(unittest.TestCase): raise asyncio.TimeoutError def test_start_ping_stop_controller(self): - command = sys.executable + " -m " entry = { "type": "controller", "host": "::1", @@ -67,5 +66,6 @@ class ControllerCase(unittest.TestCase): async def test(): await self.start("lda_sim", entry) remote = await self.get_client(entry["host"], entry["port"]) + await remote.ping() self.loop.run_until_complete(test())