Merge branch 'subprocess-termination'

* subprocess-termination:
  test_ctlmgr: fix
  Client: add note about timeout sideeffects
  hardware_testbench: full shutdown sequence for controllers
  worker: flake8 style cleanup
  ctlmgr: fix import
  pc_rpc.Client: support socket timeouts
  subprocesses: unify termination logic
This commit is contained in:
Robert Jördens 2016-02-14 22:28:11 +01:00
commit 055573a4af
5 changed files with 115 additions and 57 deletions

View File

@ -3,6 +3,7 @@ import logging
import subprocess import subprocess
import shlex import shlex
import socket import socket
import os
from artiq.protocols.sync_struct import Subscriber from artiq.protocols.sync_struct import Subscriber
from artiq.protocols.pc_rpc import AsyncioClient from artiq.protocols.pc_rpc import AsyncioClient
@ -106,30 +107,40 @@ class Controller:
await self._terminate() await self._terminate()
async def _terminate(self): async def _terminate(self):
logger.info("Terminating controller %s", self.name) if self.process is None or self.process.returncode is not None:
if self.process is not None and self.process.returncode is None: logger.info("Controller %s already terminated", self.name)
return
logger.debug("Terminating controller %s", self.name)
try:
await asyncio.wait_for(self.call("terminate"), self.term_timeout)
await asyncio.wait_for(self.process.wait(), self.term_timeout)
logger.info("Controller %s terminated", self.name)
return
except:
logger.warning("Controller %s did not exit on request, "
"ending the process", self.name)
if os.name != "nt":
try: try:
await asyncio.wait_for(self.call("terminate"), self.process.terminate()
self.term_timeout) except ProcessLookupError:
except: pass
logger.warning("Controller %s did not respond to terminate "
"command, killing", self.name)
try:
self.process.kill()
except ProcessLookupError:
pass
try: try:
await asyncio.wait_for(self.process.wait(), await asyncio.wait_for(self.process.wait(), self.term_timeout)
self.term_timeout) logger.info("Controller process %s terminated", self.name)
except: return
logger.warning("Controller %s failed to exit, killing", except asyncio.TimeoutError:
self.name) logger.warning("Controller process %s did not terminate, "
try: "killing", self.name)
self.process.kill() try:
except ProcessLookupError: self.process.kill()
pass except ProcessLookupError:
await self.process.wait() pass
logger.debug("Controller %s terminated", self.name) try:
await asyncio.wait_for(self.process.wait(), self.term_timeout)
logger.info("Controller process %s killed", self.name)
return
except asyncio.TimeoutError:
logger.warning("Controller process %s failed to die", self.name)
def get_ip_addresses(host): def get_ip_addresses(host):

View File

@ -5,7 +5,6 @@ import logging
import subprocess import subprocess
import traceback import traceback
import time import time
from functools import partial
from artiq.protocols import pipe_ipc, pyon from artiq.protocols import pipe_ipc, pyon
from artiq.protocols.logging import LogParser from artiq.protocols.logging import LogParser
@ -115,30 +114,38 @@ class Worker:
" (RID %s)", self.ipc.process.returncode, " (RID %s)", self.ipc.process.returncode,
self.rid) self.rid)
return return
obj = {"action": "terminate"}
try: try:
await self._send(obj, cancellable=False) await self._send({"action": "terminate"}, cancellable=False)
await asyncio.wait_for(self.ipc.process.wait(), term_timeout)
logger.debug("worker exited on request (RID %s)", self.rid)
return
except: except:
logger.debug("failed to send terminate command to worker" logger.debug("worker failed to exit on request"
" (RID %s), killing", self.rid, exc_info=True) " (RID %s), ending the process", self.rid,
exc_info=True)
if os.name != "nt":
try: try:
self.ipc.process.kill() self.ipc.process.terminate()
except ProcessLookupError: except ProcessLookupError:
pass pass
await self.ipc.process.wait() try:
return await asyncio.wait_for(self.ipc.process.wait(),
term_timeout)
logger.debug("worker terminated (RID %s)", self.rid)
return
except asyncio.TimeoutError:
logger.warning(
"worker did not terminate (RID %s), killing", self.rid)
try:
self.ipc.process.kill()
except ProcessLookupError:
pass
try: try:
await asyncio.wait_for(self.ipc.process.wait(), term_timeout) await asyncio.wait_for(self.ipc.process.wait(), term_timeout)
logger.debug("worker killed (RID %s)", self.rid)
return
except asyncio.TimeoutError: except asyncio.TimeoutError:
logger.debug("worker did not exit by itself (RID %s), killing", logger.warning("worker refuses to die (RID %s)", self.rid)
self.rid)
try:
self.ipc.process.kill()
except ProcessLookupError:
pass
await self.ipc.process.wait()
else:
logger.debug("worker exited by itself (RID %s)", self.rid)
finally: finally:
self.io_lock.release() self.io_lock.release()
@ -208,7 +215,8 @@ class Worker:
reply = {"status": "ok", "data": data} reply = {"status": "ok", "data": data}
except Exception as e: except Exception as e:
reply = {"status": "failed", reply = {"status": "failed",
"exception": traceback.format_exception_only(type(e), e)[0][:-1], "exception": traceback.format_exception_only(
type(e), e)[0][:-1],
"message": str(e), "message": str(e),
"traceback": traceback.format_tb(e.__traceback__)} "traceback": traceback.format_tb(e.__traceback__)}
await self.io_lock.acquire() await self.io_lock.acquire()
@ -235,7 +243,8 @@ class Worker:
del self.watchdogs[-1] del self.watchdogs[-1]
return completed return completed
async def build(self, rid, pipeline_name, wd, expid, priority, timeout=15.0): async def build(self, rid, pipeline_name, wd, expid, priority,
timeout=15.0):
self.rid = rid self.rid = rid
self.filename = os.path.basename(expid["file"]) self.filename = os.path.basename(expid["file"])
await self._create_process(expid["log_level"]) await self._create_process(expid["log_level"])
@ -280,6 +289,7 @@ class Worker:
await self._create_process(logging.WARNING) await self._create_process(logging.WARNING)
r = dict() r = dict()
def register(class_name, name, arginfo): def register(class_name, name, arginfo):
r[class_name] = {"name": name, "arginfo": arginfo} r[class_name] = {"name": name, "arginfo": arginfo}
self.register_experiment = register self.register_experiment = register

View File

@ -57,8 +57,7 @@ def _validate_target_name(target_name, target_names):
target_name = target_names[0] target_name = target_names[0]
elif target_name not in target_names: elif target_name not in target_names:
raise IncompatibleServer( raise IncompatibleServer(
"valid target name(s): " + "valid target name(s): " + " ".join(sorted(target_names)))
" ".join(sorted(target_names)))
return target_name return target_name
@ -92,9 +91,16 @@ class Client:
Use ``None`` to skip selecting a target. The list of targets can then Use ``None`` to skip selecting a target. The list of targets can then
be retrieved using ``get_rpc_id`` and then one can be selected later be retrieved using ``get_rpc_id`` and then one can be selected later
using ``select_rpc_target``. using ``select_rpc_target``.
:param timeout: Socket operation timeout. Use ``None`` for blocking
(default), ``0`` for non-blocking, and a finite value to raise
``socket.timeout`` if an operation does not complete within the
given time. See also ``socket.create_connection()`` and
``socket.settimeout()`` in the Python standard library. A timeout
in the middle of a RPC can break subsequent RPCs (from the same
client).
""" """
def __init__(self, host, port, target_name=AutoTarget): def __init__(self, host, port, target_name=AutoTarget, timeout=None):
self.__socket = socket.create_connection((host, port)) self.__socket = socket.create_connection((host, port), timeout)
try: try:
self.__socket.sendall(_init_string) self.__socket.sendall(_init_string)
@ -485,7 +491,8 @@ class Server(_AsyncioServer):
obj = {"status": "ok", "ret": doc} obj = {"status": "ok", "ret": doc}
elif obj["action"] == "call": elif obj["action"] == "call":
logger.debug("calling %s", _PrettyPrintCall(obj)) logger.debug("calling %s", _PrettyPrintCall(obj))
if self.builtin_terminate and obj["name"] == "terminate": if (self.builtin_terminate and obj["name"] ==
"terminate"):
self._terminate_request.set() self._terminate_request.set()
obj = {"status": "ok", "ret": None} obj = {"status": "ok", "ret": None}
else: else:

View File

@ -8,11 +8,13 @@ import logging
import subprocess import subprocess
import shlex import shlex
import time import time
import socket
from artiq.master.databases import DeviceDB, DatasetDB from artiq.master.databases import DeviceDB, DatasetDB
from artiq.master.worker_db import DeviceManager, DatasetManager from artiq.master.worker_db import DeviceManager, DatasetManager
from artiq.coredevice.core import CompileError from artiq.coredevice.core import CompileError
from artiq.frontend.artiq_run import DummyScheduler from artiq.frontend.artiq_run import DummyScheduler
from artiq.protocols.pc_rpc import AutoTarget, Client
artiq_root = os.getenv("ARTIQ_ROOT") artiq_root = os.getenv("ARTIQ_ROOT")
@ -27,13 +29,13 @@ class ControllerCase(unittest.TestCase):
self.controllers = {} self.controllers = {}
def tearDown(self): def tearDown(self):
for name in self.controllers:
self.device_mgr.get(name).terminate()
self.device_mgr.close_devices() self.device_mgr.close_devices()
for name in list(self.controllers): for name in list(self.controllers):
self.stop_controller(name) self.stop_controller(name)
def start_controller(self, name, sleep=1): def start_controller(self, name, sleep=1):
if name in self.controllers:
raise ValueError("controller `{}` already started".format(name))
try: try:
entry = self.device_db.get(name) entry = self.device_db.get(name)
except KeyError: except KeyError:
@ -46,15 +48,43 @@ class ControllerCase(unittest.TestCase):
time.sleep(sleep) time.sleep(sleep)
def stop_controller(self, name, default_timeout=1): def stop_controller(self, name, default_timeout=1):
entry, proc = self.controllers[name] desc, proc = self.controllers[name]
t = entry.get("term_timeout", default_timeout) t = desc.get("term_timeout", default_timeout)
proc.terminate() target_name = desc.get("target_name", None)
if target_name is None:
target_name = AutoTarget
try: try:
proc.wait(t) try:
except subprocess.TimeoutExpired: client = Client(desc["host"], desc["port"], target_name, t)
proc.kill() try:
proc.wait(t) client.terminate()
del self.controllers[name] finally:
client.close_rpc()
proc.wait(t)
return
except (socket.timeout, subprocess.TimeoutExpired):
logger.warning("Controller %s failed to exit on request", name)
try:
proc.terminate()
except ProcessLookupError:
pass
try:
proc.wait(t)
return
except subprocess.TimeoutExpired:
logger.warning("Controller %s failed to exit on terminate",
name)
try:
proc.kill()
except ProcessLookupError:
pass
try:
proc.wait(t)
return
except subprocess.TimeoutExpired:
logger.warning("Controller %s failed to die on kill", name)
finally:
del self.controllers[name]
@unittest.skipUnless(artiq_root, "no ARTIQ_ROOT") @unittest.skipUnless(artiq_root, "no ARTIQ_ROOT")

View File

@ -56,7 +56,6 @@ class ControllerCase(unittest.TestCase):
raise asyncio.TimeoutError raise asyncio.TimeoutError
def test_start_ping_stop_controller(self): def test_start_ping_stop_controller(self):
command = sys.executable + " -m "
entry = { entry = {
"type": "controller", "type": "controller",
"host": "::1", "host": "::1",
@ -67,5 +66,6 @@ class ControllerCase(unittest.TestCase):
async def test(): async def test():
await self.start("lda_sim", entry) await self.start("lda_sim", entry)
remote = await self.get_client(entry["host"], entry["port"]) remote = await self.get_client(entry["host"], entry["port"])
await remote.ping()
self.loop.run_until_complete(test()) self.loop.run_until_complete(test())