From 4707aef45cbbb560bb874165e2f721093510d7e9 Mon Sep 17 00:00:00 2001 From: Sebastien Bourdeauducq Date: Thu, 14 Nov 2019 15:21:51 +0800 Subject: [PATCH] split out artiq-comtools --- artiq/frontend/artiq_browser.py | 5 +- artiq/frontend/artiq_ctlmgr.py | 85 ------- artiq/frontend/artiq_dashboard.py | 3 +- artiq/frontend/artiq_influxdb.py | 251 -------------------- artiq/frontend/artiq_influxdb_schedule.py | 242 -------------------- artiq/frontend/artiq_master.py | 2 +- artiq/frontend/artiq_session.py | 6 +- artiq/master/ctlmgr.py | 265 ---------------------- artiq/master/scheduler.py | 4 +- artiq/test/test_ctlmgr.py | 85 ------- artiq/test/test_frontends.py | 4 +- artiq/tools.py | 66 +----- doc/manual/conf.py | 2 +- doc/manual/installing.rst | 1 + doc/manual/management_system.rst | 12 +- doc/manual/utilities.rst | 14 -- install-with-conda.py | 1 + setup.py | 6 +- 18 files changed, 20 insertions(+), 1034 deletions(-) delete mode 100755 artiq/frontend/artiq_ctlmgr.py delete mode 100755 artiq/frontend/artiq_influxdb.py delete mode 100755 artiq/frontend/artiq_influxdb_schedule.py delete mode 100644 artiq/master/ctlmgr.py delete mode 100644 artiq/test/test_ctlmgr.py diff --git a/artiq/frontend/artiq_browser.py b/artiq/frontend/artiq_browser.py index e002cac83..4faa8fa47 100755 --- a/artiq/frontend/artiq_browser.py +++ b/artiq/frontend/artiq_browser.py @@ -9,10 +9,11 @@ import logging from PyQt5 import QtCore, QtGui, QtWidgets from quamash import QEventLoop +from sipyco.asyncio_tools import atexit_register_coroutine + from artiq import __version__ as artiq_version from artiq import __artiq_dir__ as artiq_dir -from artiq.tools import (add_common_args, atexit_register_coroutine, - get_user_config_dir) +from artiq.tools import add_common_args, get_user_config_dir from artiq.gui import state, applets, models, log from artiq.browser import datasets, files, experiments diff --git a/artiq/frontend/artiq_ctlmgr.py b/artiq/frontend/artiq_ctlmgr.py deleted file mode 100755 index 6612e8e9d..000000000 --- a/artiq/frontend/artiq_ctlmgr.py +++ /dev/null @@ -1,85 +0,0 @@ -#!/usr/bin/env python3 - -import asyncio -import atexit -import argparse -import os -import logging -import platform - -from sipyco.pc_rpc import Server -from sipyco.logging_tools import LogForwarder, SourceFilter -from sipyco import common_args - -from artiq.tools import atexit_register_coroutine -from artiq.master.ctlmgr import ControllerManager - - -def get_argparser(): - parser = argparse.ArgumentParser(description="ARTIQ controller manager") - - common_args.verbosity_args(parser) - - parser.add_argument( - "-s", "--server", default="::1", - help="hostname or IP of the master to connect to") - parser.add_argument( - "--port-notify", default=3250, type=int, - help="TCP port to connect to for notifications") - parser.add_argument( - "--port-logging", default=1066, type=int, - help="TCP port to connect to for logging") - parser.add_argument( - "--retry-master", default=5.0, type=float, - help="retry timer for reconnecting to master") - common_args.simple_network_args(parser, [("control", "control", 3249)]) - return parser - - -def main(): - args = get_argparser().parse_args() - - root_logger = logging.getLogger() - root_logger.setLevel(logging.NOTSET) - source_adder = SourceFilter(logging.WARNING + - args.quiet*10 - args.verbose*10, - "ctlmgr({})".format(platform.node())) - console_handler = logging.StreamHandler() - console_handler.setFormatter(logging.Formatter( - "%(levelname)s:%(source)s:%(name)s:%(message)s")) - 
console_handler.addFilter(source_adder) - root_logger.addHandler(console_handler) - - if os.name == "nt": - loop = asyncio.ProactorEventLoop() - asyncio.set_event_loop(loop) - else: - loop = asyncio.get_event_loop() - atexit.register(loop.close) - - logfwd = LogForwarder(args.server, args.port_logging, - args.retry_master) - logfwd.addFilter(source_adder) - root_logger.addHandler(logfwd) - logfwd.start() - atexit_register_coroutine(logfwd.stop) - - ctlmgr = ControllerManager(args.server, args.port_notify, - args.retry_master) - ctlmgr.start() - atexit_register_coroutine(ctlmgr.stop) - - class CtlMgrRPC: - retry_now = ctlmgr.retry_now - - rpc_target = CtlMgrRPC() - rpc_server = Server({"ctlmgr": rpc_target}, builtin_terminate=True) - loop.run_until_complete(rpc_server.start(common_args.bind_address_from_args(args), - args.port_control)) - atexit_register_coroutine(rpc_server.stop) - - loop.run_until_complete(rpc_server.wait_terminate()) - - -if __name__ == "__main__": - main() diff --git a/artiq/frontend/artiq_dashboard.py b/artiq/frontend/artiq_dashboard.py index d954b0190..ba0b9fd05 100755 --- a/artiq/frontend/artiq_dashboard.py +++ b/artiq/frontend/artiq_dashboard.py @@ -12,10 +12,11 @@ from quamash import QEventLoop from sipyco.pc_rpc import AsyncioClient, Client from sipyco.broadcast import Receiver from sipyco import common_args +from sipyco.asyncio_tools import atexit_register_coroutine from artiq import __version__ as artiq_version from artiq import __artiq_dir__ as artiq_dir, __version__ as artiq_version -from artiq.tools import atexit_register_coroutine, get_user_config_dir +from artiq.tools import get_user_config_dir from artiq.gui.models import ModelSubscriber from artiq.gui import state, log from artiq.dashboard import (experiments, shortcuts, explorer, diff --git a/artiq/frontend/artiq_influxdb.py b/artiq/frontend/artiq_influxdb.py deleted file mode 100755 index 2a1fd8720..000000000 --- a/artiq/frontend/artiq_influxdb.py +++ /dev/null @@ -1,251 +0,0 @@ -#!/usr/bin/env python3 - -import argparse -import logging -import asyncio -import atexit -import fnmatch -from functools import partial -import time - -import numpy as np -import aiohttp - -from sipyco import common_args -from sipyco.asyncio_tools import TaskObject -from sipyco.sync_struct import Subscriber -from sipyco.pc_rpc import Server -from sipyco import pyon - -from artiq.tools import atexit_register_coroutine - - -logger = logging.getLogger(__name__) - - -def get_argparser(): - parser = argparse.ArgumentParser( - description="ARTIQ data to InfluxDB bridge", - epilog="Pattern matching works as follows. " - "The default action on a key (dataset name) is to log it. " - "Then the patterns are traversed in order and glob-matched " - "with the key. " - "Optional + and - pattern prefixes specify to either ignore or " - "log keys matching the rest of the pattern. " - "Default (in the absence of prefix) is to ignore. 
Last matched " - "pattern takes precedence.") - master_group = parser.add_argument_group("master") - master_group.add_argument( - "--server-master", default="::1", - help="hostname or IP of the master to connect to") - master_group.add_argument( - "--port-master", default=3250, type=int, - help="TCP port to use to connect to the master (default: %(default)s") - master_group.add_argument( - "--retry-master", default=5.0, type=float, - help="retry timer for reconnecting to master") - database_group = parser.add_argument_group("database") - database_group.add_argument( - "--baseurl-db", default="http://localhost:8086", - help="base URL to access InfluxDB (default: %(default)s)") - database_group.add_argument( - "--user-db", default="", help="InfluxDB username") - database_group.add_argument( - "--password-db", default="", help="InfluxDB password") - database_group.add_argument( - "--database", default="db", help="database name to use") - database_group.add_argument( - "--table", default="lab", help="table name to use") - filter_group = parser.add_argument_group("filter") - filter_group.add_argument( - "--pattern-file", default="influxdb_patterns.cfg", - help="file to load the patterns from (default: %(default)s). " - "If the file is not found, no patterns are loaded " - "(everything is logged).") - common_args.simple_network_args(parser, [("control", "control", 3248)]) - common_args.verbosity_args(parser) - return parser - - -def format_influxdb(v): - if np.issubdtype(type(v), np.bool_): - return "bool={}".format(v) - if np.issubdtype(type(v), np.integer): - return "int={}i".format(v) - if np.issubdtype(type(v), np.floating): - return "float={}".format(v) - if np.issubdtype(type(v), np.str_): - return "str=\"{}\"".format(v.replace('"', '\\"')) - return "pyon=\"{}\"".format(pyon.encode(v).replace('"', '\\"')) - - -class DBWriter(TaskObject): - def __init__(self, base_url, user, password, database, table): - self.base_url = base_url - self.user = user - self.password = password - self.database = database - self.table = table - - self._queue = asyncio.Queue(100) - - def update(self, k, v): - try: - self._queue.put_nowait((k, v, time.time())) - except asyncio.QueueFull: - logger.warning("failed to update dataset '%s': " - "too many pending updates", k) - - async def _do(self): - async with aiohttp.ClientSession() as session: - while True: - k, v, t = await self._queue.get() - url = self.base_url + "/write" - params = {"u": self.user, "p": self.password, - "db": self.database, "precision": "ms"} - data = "{},dataset={} {} {}".format( - self.table, k, format_influxdb(v), round(t*1e3)) - try: - response = await session.post(url, params=params, - data=data) - except Exception: - logger.warning("got exception trying to update '%s'", - k, exc_info=True) - else: - if response.status not in (200, 204): - content = (await response.content.read()).decode() \ - .strip() - logger.warning("got HTTP status %d " - "trying to update '%s': %s", - response.status, k, content) - response.close() - - -class _Mock: - def __setitem__(self, k, v): - pass - - def __getitem__(self, k): - return self - - def __delitem__(self, k): - pass - - def append(self, v): - pass - - -class Datasets: - def __init__(self, filter_function, writer, init): - self.filter_function = filter_function - self.writer = writer - - def __setitem__(self, k, v): - if self.filter_function(k): - self.writer.update(k, v[1]) - - # ignore mutations - def __getitem__(self, k): - return _Mock() - - # ignore deletions - def __delitem__(self, k): - pass - - 
-class MasterReader(TaskObject): - def __init__(self, server, port, retry, filter_function, writer): - self.server = server - self.port = port - self.retry = retry - - self.filter_function = filter_function - self.writer = writer - - async def _do(self): - subscriber = Subscriber( - "datasets", - partial(Datasets, self.filter_function, self.writer)) - while True: - try: - await subscriber.connect(self.server, self.port) - try: - await asyncio.wait_for(subscriber.receive_task, None) - finally: - await subscriber.close() - except (ConnectionAbortedError, ConnectionError, - ConnectionRefusedError, ConnectionResetError) as e: - logger.warning("Connection to master failed (%s: %s)", - e.__class__.__name__, str(e)) - else: - logger.warning("Connection to master lost") - logger.warning("Retrying in %.1f seconds", self.retry) - await asyncio.sleep(self.retry) - - -class Filter: - def __init__(self, pattern_file): - self.pattern_file = pattern_file - self.scan_patterns() - - def scan_patterns(self): - """(Re)load the patterns file.""" - try: - with open(self.pattern_file, "r") as f: - self.patterns = [] - for line in f: - line = line.rstrip() - if line: - self.patterns.append(line) - except FileNotFoundError: - logger.info("no pattern file found, logging everything") - self.patterns = [] - - # Privatize so that it is not shown in sipyco_rpctool list-methods. - def _filter(self, k): - take = "+" - for pattern in self.patterns: - sign = "-" - if pattern[0] in "+-": - sign, pattern = pattern[0], pattern[1:] - if fnmatch.fnmatchcase(k, pattern): - take = sign - return take == "+" - - def get_patterns(self): - """Show existing patterns.""" - return self.patterns - - def ping(self): - return True - - -def main(): - args = get_argparser().parse_args() - common_args.init_logger_from_args(args) - - loop = asyncio.get_event_loop() - atexit.register(loop.close) - - writer = DBWriter(args.baseurl_db, - args.user_db, args.password_db, - args.database, args.table) - writer.start() - atexit_register_coroutine(writer.stop) - - filter = Filter(args.pattern_file) - rpc_server = Server({"influxdb_filter": filter}, builtin_terminate=True) - loop.run_until_complete(rpc_server.start(common_args.bind_address_from_args(args), - args.port_control)) - atexit_register_coroutine(rpc_server.stop) - - reader = MasterReader(args.server_master, args.port_master, - args.retry_master, filter._filter, writer) - reader.start() - atexit_register_coroutine(reader.stop) - - loop.run_until_complete(rpc_server.wait_terminate()) - - -if __name__ == "__main__": - main() diff --git a/artiq/frontend/artiq_influxdb_schedule.py b/artiq/frontend/artiq_influxdb_schedule.py deleted file mode 100755 index 23c391c87..000000000 --- a/artiq/frontend/artiq_influxdb_schedule.py +++ /dev/null @@ -1,242 +0,0 @@ -#!/usr/bin/env python3 - -import argparse -import logging -import asyncio -import atexit -import time - -import aiohttp -import numpy as np - -from sipyco.sync_struct import Subscriber -from sipyco.pc_rpc import Server -from sipyco import pyon -from sipyco import common_args -from sipyco.asyncio_tools import TaskObject - -from artiq.tools import atexit_register_coroutine - - -logger = logging.getLogger(__name__) - - -def get_argparser(): - parser = argparse.ArgumentParser( - description="ARTIQ schedule InfluxDB logger bridge", - epilog="Listens to schedule updates on the master experiment schedule " - "and submits schedule additions and removals to the InfluxDB " - "database. 
Other schedule changes, such as transitions between " - "pipeline states (prepare, prepare_done, running, etc.) are " - "ignored. Typical high cardinality metadata is logged as " - "fields while low cardinality data is logged as tags. " - "The initially obtained complete state is logged as a 'clear' " - "entry followed by the addition of all entries.") - group = parser.add_argument_group("master") - group.add_argument( - "--server-master", default="::1", - help="hostname or IP of the master to connect to") - group.add_argument( - "--port-master", default=3250, type=int, - help="TCP port to use to connect to the master") - group.add_argument( - "--retry-master", default=5.0, type=float, - help="retry timer for reconnecting to master") - group = parser.add_argument_group("database") - group.add_argument( - "--baseurl-db", default="http://localhost:8086", - help="base URL to access InfluxDB (default: %(default)s)") - group.add_argument( - "--user-db", default="", help="InfluxDB username") - group.add_argument( - "--password-db", default="", help="InfluxDB password") - group.add_argument( - "--database", default="db", help="database name to use") - group.add_argument( - "--table", default="schedule", help="table name to use") - common_args.simple_network_args(parser, [("control", "control", 3275)]) - common_args.verbosity_args(parser) - return parser - - -def format_influxdb(v, tag=True): - if np.issubdtype(type(v), np.bool_): - return "{}".format(v) - if np.issubdtype(type(v), np.integer): - return "{}i".format(v) - if np.issubdtype(type(v), np.floating): - return "{}".format(v) - if not np.issubdtype(type(v), np.str_): - v = pyon.encode(v) - if tag: - for i in ",= ": - v = v.replace(i, "\\" + i) - return v - else: - return "\"{}\"".format(v.replace('"', '\\"')) - - -class DBWriter(TaskObject): - def __init__(self, base_url, user, password, database, table): - self.base_url = base_url - self.user = user - self.password = password - self.database = database - self.table = table - - self._queue = asyncio.Queue(100) - - def update(self, fields, tags): - try: - self._queue.put_nowait((fields, tags, time.time())) - except asyncio.QueueFull: - logger.warning("failed to update schedule: " - "too many pending updates") - - async def _do(self): - async with aiohttp.ClientSession() as session: - while True: - fields, tags, timestamp = await self._queue.get() - url = self.base_url + "/write" - params = {"u": self.user, "p": self.password, - "db": self.database, "precision": "ms"} - tags = ",".join("{}={}".format( - k, format_influxdb(v, tag=True)) - for (k, v) in tags.items()) - fields = ",".join("{}={}".format( - k, format_influxdb(v, tag=False)) - for (k, v) in fields.items()) - data = "{},{} {} {}".format( - self.table, tags, fields, round(timestamp*1e3)) - try: - response = await session.post( - url, params=params, data=data) - except: - logger.warning("got exception trying to update schedule", - exc_info=True) - else: - if response.status not in (200, 204): - content = ( - await response.content.read()).decode().strip() - logger.warning("got HTTP status %d " - "trying to update schedule: %s", - response.status, content) - response.close() - - -class Log(dict): - def __init__(self, writer): - self.writer = writer - - def init(self, x): - self.clear() - self.update(x) - self.writer.update({"rid": -1}, {"status": "clear"}) - for k, v in self.items(): - self.notify_cb(dict(action="setitem", key=k, value=v)) - return self - - def notify_cb(self, mod): - if not mod.get("path"): - if mod["action"] == 
"setitem": - rid = mod["key"] - v = mod["value"] - logger.debug("added: %s: %s", rid, v) - self.writer.update( - fields={ - "rid": rid, - "log_level": v["expid"]["log_level"], - "priority": v["priority"], - "due_date": v["due_date"] or -1., - "arguments": v["expid"].get("arguments", {}), - "repo_rev": v["expid"]["repo_rev"], - "flush": v["flush"], - }, - tags={ - "status": "add", - "class_name": v["expid"]["class_name"], - "file": v["expid"]["file"], - "pipeline": v["pipeline"], - }) - elif mod["action"] == "delitem": - rid = mod["key"] - logger.debug("removed: %s", rid) - self.writer.update({"rid": rid}, {"status": "remove"}) - elif (mod["action"] == "setitem" and mod["key"] == "status" - and mod["value"] == "running"): - rid = mod["path"][0] - logger.debug("run: %s", rid) - - def disconnect_cb(self): - logger.warn("disconnect") - - -class MasterReader(TaskObject): - def __init__(self, server, port, retry, writer): - self.server = server - self.port = port - self.retry = retry - - self.writer = writer - - async def _do(self): - subscriber = Subscriber( - "schedule", - target_builder=self.writer.init, - notify_cb=self.writer.notify_cb, - disconnect_cb=self.writer.disconnect_cb) - while True: - try: - await subscriber.connect(self.server, self.port) - try: - await asyncio.wait_for(subscriber.receive_task, None) - finally: - await subscriber.close() - except (ConnectionAbortedError, ConnectionError, - ConnectionRefusedError, ConnectionResetError) as e: - logger.warning("Connection to master failed (%s: %s)", - e.__class__.__name__, str(e)) - except Exception as e: - logger.exception(e) - else: - logger.warning("Connection to master lost") - logger.warning("Retrying in %.1f seconds", self.retry) - await asyncio.sleep(self.retry) - - -class Logger: - def ping(self): - return True - - -def main(): - args = get_argparser().parse_args() - common_args.init_logger_from_args(args) - - loop = asyncio.get_event_loop() - atexit.register(loop.close) - - writer = DBWriter(args.baseurl_db, - args.user_db, args.password_db, - args.database, args.table) - writer.start() - atexit_register_coroutine(writer.stop) - - log = Log(writer) - - server = Logger() - rpc_server = Server({"schedule_logger": server}, builtin_terminate=True) - loop.run_until_complete(rpc_server.start( - common_args.bind_address_from_args(args), args.port_control)) - atexit_register_coroutine(rpc_server.stop) - - reader = MasterReader(args.server_master, args.port_master, - args.retry_master, log) - reader.start() - atexit_register_coroutine(reader.stop) - - loop.run_until_complete(rpc_server.wait_terminate()) - - -if __name__ == "__main__": - main() diff --git a/artiq/frontend/artiq_master.py b/artiq/frontend/artiq_master.py index ff9014b50..1a5073692 100755 --- a/artiq/frontend/artiq_master.py +++ b/artiq/frontend/artiq_master.py @@ -11,9 +11,9 @@ from sipyco.sync_struct import Publisher from sipyco.logging_tools import Server as LoggingServer from sipyco.broadcast import Broadcaster from sipyco import common_args +from sipyco.asyncio_tools import atexit_register_coroutine from artiq import __version__ as artiq_version -from artiq.tools import atexit_register_coroutine from artiq.master.log import log_args, init_log from artiq.master.databases import DeviceDB, DatasetDB from artiq.master.scheduler import Scheduler diff --git a/artiq/frontend/artiq_session.py b/artiq/frontend/artiq_session.py index be9fa4ae1..3a500c90f 100755 --- a/artiq/frontend/artiq_session.py +++ b/artiq/frontend/artiq_session.py @@ -10,7 +10,9 @@ def get_argparser(): 
parser = argparse.ArgumentParser( description="ARTIQ session manager. " "Automatically runs the master, dashboard and " - "local controller manager on the current machine.") + "local controller manager on the current machine. " + "The latter requires the artiq-comtools package to " + "be installed.") parser.add_argument("--version", action="version", version="ARTIQ v{}".format(artiq_version), help="print the ARTIQ version number") @@ -28,7 +30,7 @@ def main(): master_cmd = [sys.executable, "-u", "-m", "artiq.frontend.artiq_master"] dashboard_cmd = [sys.executable, "-m", "artiq.frontend.artiq_dashboard"] - ctlmgr_cmd = [sys.executable, "-m", "artiq.frontend.artiq_ctlmgr"] + ctlmgr_cmd = [sys.executable, "-m", "artiq_comtools.artiq_ctlmgr"] master_cmd += args.m dashboard_cmd += args.d ctlmgr_cmd += args.c diff --git a/artiq/master/ctlmgr.py b/artiq/master/ctlmgr.py deleted file mode 100644 index 075f08a41..000000000 --- a/artiq/master/ctlmgr.py +++ /dev/null @@ -1,265 +0,0 @@ -import asyncio -import logging -import subprocess -import shlex -import socket -import os - -from sipyco.sync_struct import Subscriber -from sipyco.pc_rpc import AsyncioClient -from sipyco.logging_tools import LogParser -from sipyco.asyncio_tools import TaskObject - -from artiq.tools import Condition - - -logger = logging.getLogger(__name__) - - -class Controller: - def __init__(self, name, ddb_entry): - self.name = name - self.command = ddb_entry["command"] - self.retry_timer = ddb_entry.get("retry_timer", 5) - self.retry_timer_backoff = ddb_entry.get("retry_timer_backoff", 1.1) - - self.host = ddb_entry["host"] - self.port = ddb_entry["port"] - self.ping_timer = ddb_entry.get("ping_timer", 30) - self.ping_timeout = ddb_entry.get("ping_timeout", 30) - self.term_timeout = ddb_entry.get("term_timeout", 30) - - self.retry_timer_cur = self.retry_timer - self.retry_now = Condition() - self.process = None - self.launch_task = asyncio.ensure_future(self.launcher()) - - async def end(self): - self.launch_task.cancel() - await asyncio.wait_for(self.launch_task, None) - - async def call(self, method, *args, **kwargs): - remote = AsyncioClient() - await remote.connect_rpc(self.host, self.port, None) - try: - targets, _ = remote.get_rpc_id() - await remote.select_rpc_target(targets[0]) - r = await getattr(remote, method)(*args, **kwargs) - finally: - remote.close_rpc() - return r - - async def _ping(self): - try: - ok = await asyncio.wait_for(self.call("ping"), - self.ping_timeout) - if ok: - self.retry_timer_cur = self.retry_timer - return ok - except: - return False - - async def _wait_and_ping(self): - while True: - try: - await asyncio.wait_for(self.process.wait(), - self.ping_timer) - except asyncio.TimeoutError: - logger.debug("pinging controller %s", self.name) - ok = await self._ping() - if not ok: - logger.warning("Controller %s ping failed", self.name) - await self._terminate() - return - else: - break - - def _get_log_source(self): - return "controller({})".format(self.name) - - async def launcher(self): - try: - while True: - logger.info("Starting controller %s with command: %s", - self.name, self.command) - try: - env = os.environ.copy() - env["PYTHONUNBUFFERED"] = "1" - self.process = await asyncio.create_subprocess_exec( - *shlex.split(self.command), - stdout=subprocess.PIPE, stderr=subprocess.PIPE, - env=env, start_new_session=True) - asyncio.ensure_future( - LogParser(self._get_log_source).stream_task( - self.process.stdout)) - asyncio.ensure_future( - LogParser(self._get_log_source).stream_task( - 
self.process.stderr)) - await self._wait_and_ping() - except FileNotFoundError: - logger.warning("Controller %s failed to start", self.name) - else: - logger.warning("Controller %s exited", self.name) - logger.warning("Restarting in %.1f seconds", - self.retry_timer_cur) - try: - await asyncio.wait_for(self.retry_now.wait(), - self.retry_timer_cur) - except asyncio.TimeoutError: - pass - self.retry_timer_cur *= self.retry_timer_backoff - except asyncio.CancelledError: - await self._terminate() - - async def _terminate(self): - if self.process is None or self.process.returncode is not None: - logger.info("Controller %s already terminated", self.name) - return - logger.debug("Terminating controller %s", self.name) - try: - await asyncio.wait_for(self.call("terminate"), self.term_timeout) - await asyncio.wait_for(self.process.wait(), self.term_timeout) - logger.info("Controller %s terminated", self.name) - return - except: - logger.warning("Controller %s did not exit on request, " - "ending the process", self.name) - if os.name != "nt": - try: - self.process.terminate() - except ProcessLookupError: - pass - try: - await asyncio.wait_for(self.process.wait(), self.term_timeout) - logger.info("Controller process %s terminated", self.name) - return - except asyncio.TimeoutError: - logger.warning("Controller process %s did not terminate, " - "killing", self.name) - try: - self.process.kill() - except ProcessLookupError: - pass - try: - await asyncio.wait_for(self.process.wait(), self.term_timeout) - logger.info("Controller process %s killed", self.name) - return - except asyncio.TimeoutError: - logger.warning("Controller process %s failed to die", self.name) - - -def get_ip_addresses(host): - try: - addrinfo = socket.getaddrinfo(host, None) - except: - return set() - return {info[4][0] for info in addrinfo} - - -class Controllers: - def __init__(self): - self.host_filter = None - self.active_or_queued = set() - self.queue = asyncio.Queue() - self.active = dict() - self.process_task = asyncio.ensure_future(self._process()) - - async def _process(self): - while True: - action, param = await self.queue.get() - if action == "set": - k, ddb_entry = param - if k in self.active: - await self.active[k].end() - self.active[k] = Controller(k, ddb_entry) - elif action == "del": - await self.active[param].end() - del self.active[param] - self.queue.task_done() - if action not in ("set", "del"): - raise ValueError - - def __setitem__(self, k, v): - try: - if (isinstance(v, dict) and v["type"] == "controller" and - self.host_filter in get_ip_addresses(v["host"]) and - "command" in v): - v["command"] = v["command"].format(name=k, - bind=self.host_filter, - port=v["port"]) - self.queue.put_nowait(("set", (k, v))) - self.active_or_queued.add(k) - except: - logger.error("Failed to process device database entry %s", k, - exc_info=True) - - def __delitem__(self, k): - if k in self.active_or_queued: - self.queue.put_nowait(("del", k)) - self.active_or_queued.remove(k) - - def delete_all(self): - for name in set(self.active_or_queued): - del self[name] - - async def shutdown(self): - self.process_task.cancel() - for c in self.active.values(): - await c.end() - - -class ControllerDB: - def __init__(self): - self.current_controllers = Controllers() - - def set_host_filter(self, host_filter): - self.current_controllers.host_filter = host_filter - - def sync_struct_init(self, init): - if self.current_controllers is not None: - self.current_controllers.delete_all() - for k, v in init.items(): - self.current_controllers[k] = 
v - return self.current_controllers - - -class ControllerManager(TaskObject): - def __init__(self, server, port, retry_master): - self.server = server - self.port = port - self.retry_master = retry_master - self.controller_db = ControllerDB() - - async def _do(self): - try: - subscriber = Subscriber("devices", - self.controller_db.sync_struct_init) - while True: - try: - def set_host_filter(): - s = subscriber.writer.get_extra_info("socket") - localhost = s.getsockname()[0] - self.controller_db.set_host_filter(localhost) - await subscriber.connect(self.server, self.port, - set_host_filter) - try: - await asyncio.wait_for(subscriber.receive_task, None) - finally: - await subscriber.close() - except (ConnectionAbortedError, ConnectionError, - ConnectionRefusedError, ConnectionResetError) as e: - logger.warning("Connection to master failed (%s: %s)", - e.__class__.__name__, str(e)) - else: - logger.warning("Connection to master lost") - logger.warning("Retrying in %.1f seconds", self.retry_master) - await asyncio.sleep(self.retry_master) - except asyncio.CancelledError: - pass - finally: - await self.controller_db.current_controllers.shutdown() - - def retry_now(self, k): - """If a controller is disabled and pending retry, perform that retry - now.""" - self.controller_db.current_controllers.active[k].retry_now.notify() diff --git a/artiq/master/scheduler.py b/artiq/master/scheduler.py index 78d90564b..a1c16715f 100644 --- a/artiq/master/scheduler.py +++ b/artiq/master/scheduler.py @@ -4,10 +4,10 @@ from enum import Enum from time import time from sipyco.sync_struct import Notifier -from sipyco.asyncio_tools import TaskObject +from sipyco.asyncio_tools import TaskObject, Condition from artiq.master.worker import Worker, log_worker_exception -from artiq.tools import asyncio_wait_or_cancel, Condition +from artiq.tools import asyncio_wait_or_cancel logger = logging.getLogger(__name__) diff --git a/artiq/test/test_ctlmgr.py b/artiq/test/test_ctlmgr.py deleted file mode 100644 index 728353033..000000000 --- a/artiq/test/test_ctlmgr.py +++ /dev/null @@ -1,85 +0,0 @@ -import os -import sys -import unittest -import logging -import asyncio - -from sipyco.pc_rpc import AsyncioClient - -from artiq.master.ctlmgr import Controllers -from artiq.tools import expect_no_log_messages - - -logger = logging.getLogger(__name__) - - -class ControllerCase(unittest.TestCase): - def setUp(self): - if os.name == "nt": - self.loop = asyncio.ProactorEventLoop() - else: - self.loop = asyncio.new_event_loop() - asyncio.set_event_loop(self.loop) - self.addCleanup(self.loop.close) - - self.controllers = Controllers() - self.controllers.host_filter = "::1" - self.addCleanup( - self.loop.run_until_complete, self.controllers.shutdown()) - - async def start(self, name, entry): - self.controllers[name] = entry - await self.controllers.queue.join() - await self.wait_for_ping(entry["host"], entry["port"]) - - async def get_client(self, host, port): - remote = AsyncioClient() - await remote.connect_rpc(host, port, None) - targets, _ = remote.get_rpc_id() - await remote.select_rpc_target(targets[0]) - self.addCleanup(remote.close_rpc) - return remote - - async def wait_for_ping(self, host, port, retries=5, timeout=2): - dt = timeout/retries - while timeout > 0: - try: - remote = await self.get_client(host, port) - ok = await asyncio.wait_for(remote.ping(), dt) - if not ok: - raise ValueError("unexcepted ping() response from " - "controller: `{}`".format(ok)) - return ok - except asyncio.TimeoutError: - timeout -= dt - except 
(ConnectionAbortedError, ConnectionError, - ConnectionRefusedError, ConnectionResetError): - await asyncio.sleep(dt) - timeout -= dt - raise asyncio.TimeoutError - - def test_start_ping_stop_controller(self): - entry = { - "type": "controller", - "host": "::1", - "port": 1068, - "command": (sys.executable.replace("\\", "\\\\") - + " -m artiq.frontend.aqctl_corelog " - + "-p {port} --simulation foo") - } - async def test(): - await self.start("corelog", entry) - remote = await self.get_client(entry["host"], entry["port"]) - await remote.ping() - - self.loop.run_until_complete(test()) - - def test_no_command_controller(self): - entry = { - "type": "controller", - "host": "::1", - "port": 1068 - } - with expect_no_log_messages(logging.ERROR): - self.controllers["corelog"] = entry - self.assertTrue(self.controllers.queue.empty()) diff --git a/artiq/test/test_frontends.py b/artiq/test/test_frontends.py index 0d88fb328..9bafbff4a 100644 --- a/artiq/test/test_frontends.py +++ b/artiq/test/test_frontends.py @@ -13,8 +13,8 @@ class TestFrontends(unittest.TestCase): "corelog" ], "artiq": [ - "client", "compile", "coreanalyzer", "coremgmt", "ctlmgr", - "netboot", "flash", "influxdb", "master", "mkfs", "route", + "client", "compile", "coreanalyzer", "coremgmt", + "netboot", "flash", "master", "mkfs", "route", "rtiomon", "run", "session" ] } diff --git a/artiq/tools.py b/artiq/tools.py index afeef8bc8..23095f251 100644 --- a/artiq/tools.py +++ b/artiq/tools.py @@ -1,7 +1,4 @@ import asyncio -import atexit -import collections -import contextlib import importlib.machinery import logging import os @@ -19,9 +16,7 @@ from artiq.language.environment import is_experiment __all__ = ["parse_arguments", "elide", "short_format", "file_import", "get_experiment", - "UnexpectedLogMessageError", "expect_no_log_messages", - "atexit_register_coroutine", "exc_to_warning", - "asyncio_wait_or_cancel", "Condition", + "exc_to_warning", "asyncio_wait_or_cancel", "get_windows_drives", "get_user_config_dir"] @@ -107,42 +102,6 @@ def get_experiment(module, class_name=None): return exps[0][1] -class UnexpectedLogMessageError(Exception): - pass - - -class FailingLogHandler(logging.Handler): - def emit(self, record): - raise UnexpectedLogMessageError("Unexpected log message: '{}'".format( - record.getMessage())) - - -@contextlib.contextmanager -def expect_no_log_messages(level, logger=None): - """Raise an UnexpectedLogMessageError if a log message of the given level - (or above) is emitted while the context is active. 
- - Example: :: - - with expect_no_log_messages(logging.ERROR): - do_stuff_that_should_not_log_errors() - """ - if logger is None: - logger = logging.getLogger() - handler = FailingLogHandler(level) - logger.addHandler(handler) - try: - yield - finally: - logger.removeHandler(handler) - - -def atexit_register_coroutine(coroutine, loop=None): - if loop is None: - loop = asyncio.get_event_loop() - atexit.register(lambda: loop.run_until_complete(coroutine())) - - async def exc_to_warning(coro): try: await coro @@ -165,29 +124,6 @@ async def asyncio_wait_or_cancel(fs, **kwargs): return fs -class Condition: - def __init__(self, *, loop=None): - if loop is not None: - self._loop = loop - else: - self._loop = asyncio.get_event_loop() - self._waiters = collections.deque() - - async def wait(self): - """Wait until notified.""" - fut = asyncio.Future(loop=self._loop) - self._waiters.append(fut) - try: - await fut - finally: - self._waiters.remove(fut) - - def notify(self): - for fut in self._waiters: - if not fut.done(): - fut.set_result(False) - - def get_windows_drives(): from ctypes import windll diff --git a/doc/manual/conf.py b/doc/manual/conf.py index 3b52040ad..6f9b76777 100644 --- a/doc/manual/conf.py +++ b/doc/manual/conf.py @@ -35,7 +35,7 @@ mock_modules = ["artiq.gui.waitingspinnerwidget", "quamash", "pyqtgraph", "matplotlib", "numpy", "dateutil", "dateutil.parser", "prettytable", "PyQt5", "h5py", "serial", "scipy", "scipy.interpolate", - "llvmlite_artiq", "Levenshtein", "aiohttp", "pythonparser"] + "llvmlite_artiq", "Levenshtein", "pythonparser"] for module in mock_modules: sys.modules[module] = Mock() diff --git a/doc/manual/installing.rst b/doc/manual/installing.rst index 5ce4f1259..33cd8498c 100644 --- a/doc/manual/installing.rst +++ b/doc/manual/installing.rst @@ -58,6 +58,7 @@ Installing multiple packages and making them visible to the ARTIQ commands requi (pkgs.python3.withPackages(ps: [ # List desired Python packages here. artiq-full.artiq + artiq-full.artiq-comtools # The board packages are also "Python" packages. You only need a board # package if you intend to reflash that board (those packages contain # only board firmware). diff --git a/doc/manual/management_system.rst b/doc/manual/management_system.rst index ebebe1b57..aafbb0361 100644 --- a/doc/manual/management_system.rst +++ b/doc/manual/management_system.rst @@ -16,7 +16,7 @@ The master is a headless component, and one or several clients (command-line or Controller manager ------------------ -:ref:`Controller managers ` are responsible for running and stopping controllers on a machine. There is one controller manager per network node that runs controllers. +Controller managers (started using the ``artiq_ctlmgr`` command that is part of the ``artiq-comtools`` package) are responsible for running and stopping controllers on a machine. There is one controller manager per network node that runs controllers. A controller manager connects to the master and uses the device database to determine what controllers need to be run. Changes in the device database are tracked by the manager and controllers are started and stopped accordingly. @@ -158,16 +158,6 @@ artiq_master :prog: artiq_master -.. _frontend-artiq-ctlmgr: - -artiq_ctlmgr ------------- - -.. argparse:: - :ref: artiq.frontend.artiq_ctlmgr.get_argparser - :prog: artiq_ctlmgr - - .. 
_frontend-artiq-client: artiq_client diff --git a/doc/manual/utilities.rst b/doc/manual/utilities.rst index 4946fea2a..9e056e55e 100644 --- a/doc/manual/utilities.rst +++ b/doc/manual/utilities.rst @@ -131,17 +131,3 @@ DRTIO routing table manipulation tool .. argparse:: :ref: artiq.frontend.artiq_route.get_argparser :prog: artiq_route - -Data to InfluxDB bridge ------------------------ - -.. argparse:: - :ref: artiq.frontend.artiq_influxdb.get_argparser - :prog: artiq_influxdb - -Schedule to InfluxDB logging bridge ------------------------------------ - -.. argparse:: - :ref: artiq.frontend.artiq_influxdb_schedule.get_argparser - :prog: artiq_influxdb_schedule diff --git a/install-with-conda.py b/install-with-conda.py index 2eab13866..fb09ad3ab 100644 --- a/install-with-conda.py +++ b/install-with-conda.py @@ -9,6 +9,7 @@ CONDA_ENV_NAME = "artiq" # The conda packages to download and install. CONDA_PACKAGES = [ "artiq", + "artiq-comtools", # Only install board packages if you plan to reflash the board. # The two lines below are just examples and probably not what you want. # Select the packages that correspond to your board, or remove them diff --git a/setup.py b/setup.py index fae2b91ae..3e0e722ad 100755 --- a/setup.py +++ b/setup.py @@ -14,7 +14,7 @@ if sys.version_info[:3] < (3, 5, 3): requirements = [ "numpy", "scipy", "python-dateutil", "prettytable", "h5py", - "quamash", "pyqtgraph", "pygit2", "aiohttp", + "quamash", "pyqtgraph", "pygit2", "llvmlite_artiq", "pythonparser", "python-Levenshtein", ] @@ -23,11 +23,8 @@ console_scripts = [ "artiq_compile = artiq.frontend.artiq_compile:main", "artiq_coreanalyzer = artiq.frontend.artiq_coreanalyzer:main", "artiq_coremgmt = artiq.frontend.artiq_coremgmt:main", - "artiq_ctlmgr = artiq.frontend.artiq_ctlmgr:main", "artiq_ddb_template = artiq.frontend.artiq_ddb_template:main", "artiq_netboot = artiq.frontend.artiq_netboot:main", - "artiq_influxdb = artiq.frontend.artiq_influxdb:main", - "artiq_influxdb_schedule = artiq.frontend.artiq_influxdb_schedule:main", "artiq_master = artiq.frontend.artiq_master:main", "artiq_mkfs = artiq.frontend.artiq_mkfs:main", "artiq_rtiomon = artiq.frontend.artiq_rtiomon:main", @@ -35,7 +32,6 @@ console_scripts = [ "artiq_route = artiq.frontend.artiq_route:main", "artiq_run = artiq.frontend.artiq_run:main", "artiq_flash = artiq.frontend.artiq_flash:main", - "aqctl_corelog = artiq.frontend.aqctl_corelog:main", ]
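
Migration sketch for downstream code affected by this split. This is a minimal
example based only on the substitutions made above (the async helpers now come
from sipyco.asyncio_tools, and the controller manager moved to the separate
artiq-comtools package, which must be installed); the "-s" master-host flag is
assumed to be carried over unchanged from the old artiq_ctlmgr frontend.

    # Helpers formerly in artiq.tools are now provided by sipyco:
    #   before: from artiq.tools import atexit_register_coroutine, Condition
    from sipyco.asyncio_tools import atexit_register_coroutine, Condition

    import subprocess
    import sys

    # The controller manager is no longer artiq.frontend.artiq_ctlmgr;
    # launch it from the artiq_comtools package instead, as artiq_session
    # now does:
    subprocess.Popen([sys.executable, "-m", "artiq_comtools.artiq_ctlmgr",
                      "-s", "::1"])  # "-s": hostname or IP of the master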