forked from M-Labs/artiq
1
0
Fork 0

split out artiq-comtools

This commit is contained in:
Sebastien Bourdeauducq 2019-11-14 15:21:51 +08:00
parent 7a92f443c2
commit 4707aef45c
18 changed files with 20 additions and 1034 deletions

View File

@ -9,10 +9,11 @@ import logging
from PyQt5 import QtCore, QtGui, QtWidgets
from quamash import QEventLoop
from sipyco.asyncio_tools import atexit_register_coroutine
from artiq import __version__ as artiq_version
from artiq import __artiq_dir__ as artiq_dir
from artiq.tools import (add_common_args, atexit_register_coroutine,
get_user_config_dir)
from artiq.tools import add_common_args, get_user_config_dir
from artiq.gui import state, applets, models, log
from artiq.browser import datasets, files, experiments

View File

@ -1,85 +0,0 @@
#!/usr/bin/env python3
import asyncio
import atexit
import argparse
import os
import logging
import platform
from sipyco.pc_rpc import Server
from sipyco.logging_tools import LogForwarder, SourceFilter
from sipyco import common_args
from artiq.tools import atexit_register_coroutine
from artiq.master.ctlmgr import ControllerManager
def get_argparser():
parser = argparse.ArgumentParser(description="ARTIQ controller manager")
common_args.verbosity_args(parser)
parser.add_argument(
"-s", "--server", default="::1",
help="hostname or IP of the master to connect to")
parser.add_argument(
"--port-notify", default=3250, type=int,
help="TCP port to connect to for notifications")
parser.add_argument(
"--port-logging", default=1066, type=int,
help="TCP port to connect to for logging")
parser.add_argument(
"--retry-master", default=5.0, type=float,
help="retry timer for reconnecting to master")
common_args.simple_network_args(parser, [("control", "control", 3249)])
return parser
def main():
args = get_argparser().parse_args()
root_logger = logging.getLogger()
root_logger.setLevel(logging.NOTSET)
source_adder = SourceFilter(logging.WARNING +
args.quiet*10 - args.verbose*10,
"ctlmgr({})".format(platform.node()))
console_handler = logging.StreamHandler()
console_handler.setFormatter(logging.Formatter(
"%(levelname)s:%(source)s:%(name)s:%(message)s"))
console_handler.addFilter(source_adder)
root_logger.addHandler(console_handler)
if os.name == "nt":
loop = asyncio.ProactorEventLoop()
asyncio.set_event_loop(loop)
else:
loop = asyncio.get_event_loop()
atexit.register(loop.close)
logfwd = LogForwarder(args.server, args.port_logging,
args.retry_master)
logfwd.addFilter(source_adder)
root_logger.addHandler(logfwd)
logfwd.start()
atexit_register_coroutine(logfwd.stop)
ctlmgr = ControllerManager(args.server, args.port_notify,
args.retry_master)
ctlmgr.start()
atexit_register_coroutine(ctlmgr.stop)
class CtlMgrRPC:
retry_now = ctlmgr.retry_now
rpc_target = CtlMgrRPC()
rpc_server = Server({"ctlmgr": rpc_target}, builtin_terminate=True)
loop.run_until_complete(rpc_server.start(common_args.bind_address_from_args(args),
args.port_control))
atexit_register_coroutine(rpc_server.stop)
loop.run_until_complete(rpc_server.wait_terminate())
if __name__ == "__main__":
main()

View File

@ -12,10 +12,11 @@ from quamash import QEventLoop
from sipyco.pc_rpc import AsyncioClient, Client
from sipyco.broadcast import Receiver
from sipyco import common_args
from sipyco.asyncio_tools import atexit_register_coroutine
from artiq import __version__ as artiq_version
from artiq import __artiq_dir__ as artiq_dir, __version__ as artiq_version
from artiq.tools import atexit_register_coroutine, get_user_config_dir
from artiq.tools import get_user_config_dir
from artiq.gui.models import ModelSubscriber
from artiq.gui import state, log
from artiq.dashboard import (experiments, shortcuts, explorer,

View File

@ -1,251 +0,0 @@
#!/usr/bin/env python3
import argparse
import logging
import asyncio
import atexit
import fnmatch
from functools import partial
import time
import numpy as np
import aiohttp
from sipyco import common_args
from sipyco.asyncio_tools import TaskObject
from sipyco.sync_struct import Subscriber
from sipyco.pc_rpc import Server
from sipyco import pyon
from artiq.tools import atexit_register_coroutine
logger = logging.getLogger(__name__)
def get_argparser():
parser = argparse.ArgumentParser(
description="ARTIQ data to InfluxDB bridge",
epilog="Pattern matching works as follows. "
"The default action on a key (dataset name) is to log it. "
"Then the patterns are traversed in order and glob-matched "
"with the key. "
"Optional + and - pattern prefixes specify to either ignore or "
"log keys matching the rest of the pattern. "
"Default (in the absence of prefix) is to ignore. Last matched "
"pattern takes precedence.")
master_group = parser.add_argument_group("master")
master_group.add_argument(
"--server-master", default="::1",
help="hostname or IP of the master to connect to")
master_group.add_argument(
"--port-master", default=3250, type=int,
help="TCP port to use to connect to the master (default: %(default)s")
master_group.add_argument(
"--retry-master", default=5.0, type=float,
help="retry timer for reconnecting to master")
database_group = parser.add_argument_group("database")
database_group.add_argument(
"--baseurl-db", default="http://localhost:8086",
help="base URL to access InfluxDB (default: %(default)s)")
database_group.add_argument(
"--user-db", default="", help="InfluxDB username")
database_group.add_argument(
"--password-db", default="", help="InfluxDB password")
database_group.add_argument(
"--database", default="db", help="database name to use")
database_group.add_argument(
"--table", default="lab", help="table name to use")
filter_group = parser.add_argument_group("filter")
filter_group.add_argument(
"--pattern-file", default="influxdb_patterns.cfg",
help="file to load the patterns from (default: %(default)s). "
"If the file is not found, no patterns are loaded "
"(everything is logged).")
common_args.simple_network_args(parser, [("control", "control", 3248)])
common_args.verbosity_args(parser)
return parser
def format_influxdb(v):
if np.issubdtype(type(v), np.bool_):
return "bool={}".format(v)
if np.issubdtype(type(v), np.integer):
return "int={}i".format(v)
if np.issubdtype(type(v), np.floating):
return "float={}".format(v)
if np.issubdtype(type(v), np.str_):
return "str=\"{}\"".format(v.replace('"', '\\"'))
return "pyon=\"{}\"".format(pyon.encode(v).replace('"', '\\"'))
class DBWriter(TaskObject):
def __init__(self, base_url, user, password, database, table):
self.base_url = base_url
self.user = user
self.password = password
self.database = database
self.table = table
self._queue = asyncio.Queue(100)
def update(self, k, v):
try:
self._queue.put_nowait((k, v, time.time()))
except asyncio.QueueFull:
logger.warning("failed to update dataset '%s': "
"too many pending updates", k)
async def _do(self):
async with aiohttp.ClientSession() as session:
while True:
k, v, t = await self._queue.get()
url = self.base_url + "/write"
params = {"u": self.user, "p": self.password,
"db": self.database, "precision": "ms"}
data = "{},dataset={} {} {}".format(
self.table, k, format_influxdb(v), round(t*1e3))
try:
response = await session.post(url, params=params,
data=data)
except Exception:
logger.warning("got exception trying to update '%s'",
k, exc_info=True)
else:
if response.status not in (200, 204):
content = (await response.content.read()).decode() \
.strip()
logger.warning("got HTTP status %d "
"trying to update '%s': %s",
response.status, k, content)
response.close()
class _Mock:
def __setitem__(self, k, v):
pass
def __getitem__(self, k):
return self
def __delitem__(self, k):
pass
def append(self, v):
pass
class Datasets:
def __init__(self, filter_function, writer, init):
self.filter_function = filter_function
self.writer = writer
def __setitem__(self, k, v):
if self.filter_function(k):
self.writer.update(k, v[1])
# ignore mutations
def __getitem__(self, k):
return _Mock()
# ignore deletions
def __delitem__(self, k):
pass
class MasterReader(TaskObject):
def __init__(self, server, port, retry, filter_function, writer):
self.server = server
self.port = port
self.retry = retry
self.filter_function = filter_function
self.writer = writer
async def _do(self):
subscriber = Subscriber(
"datasets",
partial(Datasets, self.filter_function, self.writer))
while True:
try:
await subscriber.connect(self.server, self.port)
try:
await asyncio.wait_for(subscriber.receive_task, None)
finally:
await subscriber.close()
except (ConnectionAbortedError, ConnectionError,
ConnectionRefusedError, ConnectionResetError) as e:
logger.warning("Connection to master failed (%s: %s)",
e.__class__.__name__, str(e))
else:
logger.warning("Connection to master lost")
logger.warning("Retrying in %.1f seconds", self.retry)
await asyncio.sleep(self.retry)
class Filter:
def __init__(self, pattern_file):
self.pattern_file = pattern_file
self.scan_patterns()
def scan_patterns(self):
"""(Re)load the patterns file."""
try:
with open(self.pattern_file, "r") as f:
self.patterns = []
for line in f:
line = line.rstrip()
if line:
self.patterns.append(line)
except FileNotFoundError:
logger.info("no pattern file found, logging everything")
self.patterns = []
# Privatize so that it is not shown in sipyco_rpctool list-methods.
def _filter(self, k):
take = "+"
for pattern in self.patterns:
sign = "-"
if pattern[0] in "+-":
sign, pattern = pattern[0], pattern[1:]
if fnmatch.fnmatchcase(k, pattern):
take = sign
return take == "+"
def get_patterns(self):
"""Show existing patterns."""
return self.patterns
def ping(self):
return True
def main():
args = get_argparser().parse_args()
common_args.init_logger_from_args(args)
loop = asyncio.get_event_loop()
atexit.register(loop.close)
writer = DBWriter(args.baseurl_db,
args.user_db, args.password_db,
args.database, args.table)
writer.start()
atexit_register_coroutine(writer.stop)
filter = Filter(args.pattern_file)
rpc_server = Server({"influxdb_filter": filter}, builtin_terminate=True)
loop.run_until_complete(rpc_server.start(common_args.bind_address_from_args(args),
args.port_control))
atexit_register_coroutine(rpc_server.stop)
reader = MasterReader(args.server_master, args.port_master,
args.retry_master, filter._filter, writer)
reader.start()
atexit_register_coroutine(reader.stop)
loop.run_until_complete(rpc_server.wait_terminate())
if __name__ == "__main__":
main()

View File

@ -1,242 +0,0 @@
#!/usr/bin/env python3
import argparse
import logging
import asyncio
import atexit
import time
import aiohttp
import numpy as np
from sipyco.sync_struct import Subscriber
from sipyco.pc_rpc import Server
from sipyco import pyon
from sipyco import common_args
from sipyco.asyncio_tools import TaskObject
from artiq.tools import atexit_register_coroutine
logger = logging.getLogger(__name__)
def get_argparser():
parser = argparse.ArgumentParser(
description="ARTIQ schedule InfluxDB logger bridge",
epilog="Listens to schedule updates on the master experiment schedule "
"and submits schedule additions and removals to the InfluxDB "
"database. Other schedule changes, such as transitions between "
"pipeline states (prepare, prepare_done, running, etc.) are "
"ignored. Typical high cardinality metadata is logged as "
"fields while low cardinality data is logged as tags. "
"The initially obtained complete state is logged as a 'clear' "
"entry followed by the addition of all entries.")
group = parser.add_argument_group("master")
group.add_argument(
"--server-master", default="::1",
help="hostname or IP of the master to connect to")
group.add_argument(
"--port-master", default=3250, type=int,
help="TCP port to use to connect to the master")
group.add_argument(
"--retry-master", default=5.0, type=float,
help="retry timer for reconnecting to master")
group = parser.add_argument_group("database")
group.add_argument(
"--baseurl-db", default="http://localhost:8086",
help="base URL to access InfluxDB (default: %(default)s)")
group.add_argument(
"--user-db", default="", help="InfluxDB username")
group.add_argument(
"--password-db", default="", help="InfluxDB password")
group.add_argument(
"--database", default="db", help="database name to use")
group.add_argument(
"--table", default="schedule", help="table name to use")
common_args.simple_network_args(parser, [("control", "control", 3275)])
common_args.verbosity_args(parser)
return parser
def format_influxdb(v, tag=True):
if np.issubdtype(type(v), np.bool_):
return "{}".format(v)
if np.issubdtype(type(v), np.integer):
return "{}i".format(v)
if np.issubdtype(type(v), np.floating):
return "{}".format(v)
if not np.issubdtype(type(v), np.str_):
v = pyon.encode(v)
if tag:
for i in ",= ":
v = v.replace(i, "\\" + i)
return v
else:
return "\"{}\"".format(v.replace('"', '\\"'))
class DBWriter(TaskObject):
def __init__(self, base_url, user, password, database, table):
self.base_url = base_url
self.user = user
self.password = password
self.database = database
self.table = table
self._queue = asyncio.Queue(100)
def update(self, fields, tags):
try:
self._queue.put_nowait((fields, tags, time.time()))
except asyncio.QueueFull:
logger.warning("failed to update schedule: "
"too many pending updates")
async def _do(self):
async with aiohttp.ClientSession() as session:
while True:
fields, tags, timestamp = await self._queue.get()
url = self.base_url + "/write"
params = {"u": self.user, "p": self.password,
"db": self.database, "precision": "ms"}
tags = ",".join("{}={}".format(
k, format_influxdb(v, tag=True))
for (k, v) in tags.items())
fields = ",".join("{}={}".format(
k, format_influxdb(v, tag=False))
for (k, v) in fields.items())
data = "{},{} {} {}".format(
self.table, tags, fields, round(timestamp*1e3))
try:
response = await session.post(
url, params=params, data=data)
except:
logger.warning("got exception trying to update schedule",
exc_info=True)
else:
if response.status not in (200, 204):
content = (
await response.content.read()).decode().strip()
logger.warning("got HTTP status %d "
"trying to update schedule: %s",
response.status, content)
response.close()
class Log(dict):
def __init__(self, writer):
self.writer = writer
def init(self, x):
self.clear()
self.update(x)
self.writer.update({"rid": -1}, {"status": "clear"})
for k, v in self.items():
self.notify_cb(dict(action="setitem", key=k, value=v))
return self
def notify_cb(self, mod):
if not mod.get("path"):
if mod["action"] == "setitem":
rid = mod["key"]
v = mod["value"]
logger.debug("added: %s: %s", rid, v)
self.writer.update(
fields={
"rid": rid,
"log_level": v["expid"]["log_level"],
"priority": v["priority"],
"due_date": v["due_date"] or -1.,
"arguments": v["expid"].get("arguments", {}),
"repo_rev": v["expid"]["repo_rev"],
"flush": v["flush"],
},
tags={
"status": "add",
"class_name": v["expid"]["class_name"],
"file": v["expid"]["file"],
"pipeline": v["pipeline"],
})
elif mod["action"] == "delitem":
rid = mod["key"]
logger.debug("removed: %s", rid)
self.writer.update({"rid": rid}, {"status": "remove"})
elif (mod["action"] == "setitem" and mod["key"] == "status"
and mod["value"] == "running"):
rid = mod["path"][0]
logger.debug("run: %s", rid)
def disconnect_cb(self):
logger.warn("disconnect")
class MasterReader(TaskObject):
def __init__(self, server, port, retry, writer):
self.server = server
self.port = port
self.retry = retry
self.writer = writer
async def _do(self):
subscriber = Subscriber(
"schedule",
target_builder=self.writer.init,
notify_cb=self.writer.notify_cb,
disconnect_cb=self.writer.disconnect_cb)
while True:
try:
await subscriber.connect(self.server, self.port)
try:
await asyncio.wait_for(subscriber.receive_task, None)
finally:
await subscriber.close()
except (ConnectionAbortedError, ConnectionError,
ConnectionRefusedError, ConnectionResetError) as e:
logger.warning("Connection to master failed (%s: %s)",
e.__class__.__name__, str(e))
except Exception as e:
logger.exception(e)
else:
logger.warning("Connection to master lost")
logger.warning("Retrying in %.1f seconds", self.retry)
await asyncio.sleep(self.retry)
class Logger:
def ping(self):
return True
def main():
args = get_argparser().parse_args()
common_args.init_logger_from_args(args)
loop = asyncio.get_event_loop()
atexit.register(loop.close)
writer = DBWriter(args.baseurl_db,
args.user_db, args.password_db,
args.database, args.table)
writer.start()
atexit_register_coroutine(writer.stop)
log = Log(writer)
server = Logger()
rpc_server = Server({"schedule_logger": server}, builtin_terminate=True)
loop.run_until_complete(rpc_server.start(
common_args.bind_address_from_args(args), args.port_control))
atexit_register_coroutine(rpc_server.stop)
reader = MasterReader(args.server_master, args.port_master,
args.retry_master, log)
reader.start()
atexit_register_coroutine(reader.stop)
loop.run_until_complete(rpc_server.wait_terminate())
if __name__ == "__main__":
main()

View File

@ -11,9 +11,9 @@ from sipyco.sync_struct import Publisher
from sipyco.logging_tools import Server as LoggingServer
from sipyco.broadcast import Broadcaster
from sipyco import common_args
from sipyco.asyncio_tools import atexit_register_coroutine
from artiq import __version__ as artiq_version
from artiq.tools import atexit_register_coroutine
from artiq.master.log import log_args, init_log
from artiq.master.databases import DeviceDB, DatasetDB
from artiq.master.scheduler import Scheduler

View File

@ -10,7 +10,9 @@ def get_argparser():
parser = argparse.ArgumentParser(
description="ARTIQ session manager. "
"Automatically runs the master, dashboard and "
"local controller manager on the current machine.")
"local controller manager on the current machine. "
"The latter requires the artiq-comtools package to "
"be installed.")
parser.add_argument("--version", action="version",
version="ARTIQ v{}".format(artiq_version),
help="print the ARTIQ version number")
@ -28,7 +30,7 @@ def main():
master_cmd = [sys.executable, "-u", "-m", "artiq.frontend.artiq_master"]
dashboard_cmd = [sys.executable, "-m", "artiq.frontend.artiq_dashboard"]
ctlmgr_cmd = [sys.executable, "-m", "artiq.frontend.artiq_ctlmgr"]
ctlmgr_cmd = [sys.executable, "-m", "artiq_comtools.artiq_ctlmgr"]
master_cmd += args.m
dashboard_cmd += args.d
ctlmgr_cmd += args.c

View File

@ -1,265 +0,0 @@
import asyncio
import logging
import subprocess
import shlex
import socket
import os
from sipyco.sync_struct import Subscriber
from sipyco.pc_rpc import AsyncioClient
from sipyco.logging_tools import LogParser
from sipyco.asyncio_tools import TaskObject
from artiq.tools import Condition
logger = logging.getLogger(__name__)
class Controller:
def __init__(self, name, ddb_entry):
self.name = name
self.command = ddb_entry["command"]
self.retry_timer = ddb_entry.get("retry_timer", 5)
self.retry_timer_backoff = ddb_entry.get("retry_timer_backoff", 1.1)
self.host = ddb_entry["host"]
self.port = ddb_entry["port"]
self.ping_timer = ddb_entry.get("ping_timer", 30)
self.ping_timeout = ddb_entry.get("ping_timeout", 30)
self.term_timeout = ddb_entry.get("term_timeout", 30)
self.retry_timer_cur = self.retry_timer
self.retry_now = Condition()
self.process = None
self.launch_task = asyncio.ensure_future(self.launcher())
async def end(self):
self.launch_task.cancel()
await asyncio.wait_for(self.launch_task, None)
async def call(self, method, *args, **kwargs):
remote = AsyncioClient()
await remote.connect_rpc(self.host, self.port, None)
try:
targets, _ = remote.get_rpc_id()
await remote.select_rpc_target(targets[0])
r = await getattr(remote, method)(*args, **kwargs)
finally:
remote.close_rpc()
return r
async def _ping(self):
try:
ok = await asyncio.wait_for(self.call("ping"),
self.ping_timeout)
if ok:
self.retry_timer_cur = self.retry_timer
return ok
except:
return False
async def _wait_and_ping(self):
while True:
try:
await asyncio.wait_for(self.process.wait(),
self.ping_timer)
except asyncio.TimeoutError:
logger.debug("pinging controller %s", self.name)
ok = await self._ping()
if not ok:
logger.warning("Controller %s ping failed", self.name)
await self._terminate()
return
else:
break
def _get_log_source(self):
return "controller({})".format(self.name)
async def launcher(self):
try:
while True:
logger.info("Starting controller %s with command: %s",
self.name, self.command)
try:
env = os.environ.copy()
env["PYTHONUNBUFFERED"] = "1"
self.process = await asyncio.create_subprocess_exec(
*shlex.split(self.command),
stdout=subprocess.PIPE, stderr=subprocess.PIPE,
env=env, start_new_session=True)
asyncio.ensure_future(
LogParser(self._get_log_source).stream_task(
self.process.stdout))
asyncio.ensure_future(
LogParser(self._get_log_source).stream_task(
self.process.stderr))
await self._wait_and_ping()
except FileNotFoundError:
logger.warning("Controller %s failed to start", self.name)
else:
logger.warning("Controller %s exited", self.name)
logger.warning("Restarting in %.1f seconds",
self.retry_timer_cur)
try:
await asyncio.wait_for(self.retry_now.wait(),
self.retry_timer_cur)
except asyncio.TimeoutError:
pass
self.retry_timer_cur *= self.retry_timer_backoff
except asyncio.CancelledError:
await self._terminate()
async def _terminate(self):
if self.process is None or self.process.returncode is not None:
logger.info("Controller %s already terminated", self.name)
return
logger.debug("Terminating controller %s", self.name)
try:
await asyncio.wait_for(self.call("terminate"), self.term_timeout)
await asyncio.wait_for(self.process.wait(), self.term_timeout)
logger.info("Controller %s terminated", self.name)
return
except:
logger.warning("Controller %s did not exit on request, "
"ending the process", self.name)
if os.name != "nt":
try:
self.process.terminate()
except ProcessLookupError:
pass
try:
await asyncio.wait_for(self.process.wait(), self.term_timeout)
logger.info("Controller process %s terminated", self.name)
return
except asyncio.TimeoutError:
logger.warning("Controller process %s did not terminate, "
"killing", self.name)
try:
self.process.kill()
except ProcessLookupError:
pass
try:
await asyncio.wait_for(self.process.wait(), self.term_timeout)
logger.info("Controller process %s killed", self.name)
return
except asyncio.TimeoutError:
logger.warning("Controller process %s failed to die", self.name)
def get_ip_addresses(host):
try:
addrinfo = socket.getaddrinfo(host, None)
except:
return set()
return {info[4][0] for info in addrinfo}
class Controllers:
def __init__(self):
self.host_filter = None
self.active_or_queued = set()
self.queue = asyncio.Queue()
self.active = dict()
self.process_task = asyncio.ensure_future(self._process())
async def _process(self):
while True:
action, param = await self.queue.get()
if action == "set":
k, ddb_entry = param
if k in self.active:
await self.active[k].end()
self.active[k] = Controller(k, ddb_entry)
elif action == "del":
await self.active[param].end()
del self.active[param]
self.queue.task_done()
if action not in ("set", "del"):
raise ValueError
def __setitem__(self, k, v):
try:
if (isinstance(v, dict) and v["type"] == "controller" and
self.host_filter in get_ip_addresses(v["host"]) and
"command" in v):
v["command"] = v["command"].format(name=k,
bind=self.host_filter,
port=v["port"])
self.queue.put_nowait(("set", (k, v)))
self.active_or_queued.add(k)
except:
logger.error("Failed to process device database entry %s", k,
exc_info=True)
def __delitem__(self, k):
if k in self.active_or_queued:
self.queue.put_nowait(("del", k))
self.active_or_queued.remove(k)
def delete_all(self):
for name in set(self.active_or_queued):
del self[name]
async def shutdown(self):
self.process_task.cancel()
for c in self.active.values():
await c.end()
class ControllerDB:
def __init__(self):
self.current_controllers = Controllers()
def set_host_filter(self, host_filter):
self.current_controllers.host_filter = host_filter
def sync_struct_init(self, init):
if self.current_controllers is not None:
self.current_controllers.delete_all()
for k, v in init.items():
self.current_controllers[k] = v
return self.current_controllers
class ControllerManager(TaskObject):
def __init__(self, server, port, retry_master):
self.server = server
self.port = port
self.retry_master = retry_master
self.controller_db = ControllerDB()
async def _do(self):
try:
subscriber = Subscriber("devices",
self.controller_db.sync_struct_init)
while True:
try:
def set_host_filter():
s = subscriber.writer.get_extra_info("socket")
localhost = s.getsockname()[0]
self.controller_db.set_host_filter(localhost)
await subscriber.connect(self.server, self.port,
set_host_filter)
try:
await asyncio.wait_for(subscriber.receive_task, None)
finally:
await subscriber.close()
except (ConnectionAbortedError, ConnectionError,
ConnectionRefusedError, ConnectionResetError) as e:
logger.warning("Connection to master failed (%s: %s)",
e.__class__.__name__, str(e))
else:
logger.warning("Connection to master lost")
logger.warning("Retrying in %.1f seconds", self.retry_master)
await asyncio.sleep(self.retry_master)
except asyncio.CancelledError:
pass
finally:
await self.controller_db.current_controllers.shutdown()
def retry_now(self, k):
"""If a controller is disabled and pending retry, perform that retry
now."""
self.controller_db.current_controllers.active[k].retry_now.notify()

View File

@ -4,10 +4,10 @@ from enum import Enum
from time import time
from sipyco.sync_struct import Notifier
from sipyco.asyncio_tools import TaskObject
from sipyco.asyncio_tools import TaskObject, Condition
from artiq.master.worker import Worker, log_worker_exception
from artiq.tools import asyncio_wait_or_cancel, Condition
from artiq.tools import asyncio_wait_or_cancel
logger = logging.getLogger(__name__)

View File

@ -1,85 +0,0 @@
import os
import sys
import unittest
import logging
import asyncio
from sipyco.pc_rpc import AsyncioClient
from artiq.master.ctlmgr import Controllers
from artiq.tools import expect_no_log_messages
logger = logging.getLogger(__name__)
class ControllerCase(unittest.TestCase):
def setUp(self):
if os.name == "nt":
self.loop = asyncio.ProactorEventLoop()
else:
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)
self.addCleanup(self.loop.close)
self.controllers = Controllers()
self.controllers.host_filter = "::1"
self.addCleanup(
self.loop.run_until_complete, self.controllers.shutdown())
async def start(self, name, entry):
self.controllers[name] = entry
await self.controllers.queue.join()
await self.wait_for_ping(entry["host"], entry["port"])
async def get_client(self, host, port):
remote = AsyncioClient()
await remote.connect_rpc(host, port, None)
targets, _ = remote.get_rpc_id()
await remote.select_rpc_target(targets[0])
self.addCleanup(remote.close_rpc)
return remote
async def wait_for_ping(self, host, port, retries=5, timeout=2):
dt = timeout/retries
while timeout > 0:
try:
remote = await self.get_client(host, port)
ok = await asyncio.wait_for(remote.ping(), dt)
if not ok:
raise ValueError("unexcepted ping() response from "
"controller: `{}`".format(ok))
return ok
except asyncio.TimeoutError:
timeout -= dt
except (ConnectionAbortedError, ConnectionError,
ConnectionRefusedError, ConnectionResetError):
await asyncio.sleep(dt)
timeout -= dt
raise asyncio.TimeoutError
def test_start_ping_stop_controller(self):
entry = {
"type": "controller",
"host": "::1",
"port": 1068,
"command": (sys.executable.replace("\\", "\\\\")
+ " -m artiq.frontend.aqctl_corelog "
+ "-p {port} --simulation foo")
}
async def test():
await self.start("corelog", entry)
remote = await self.get_client(entry["host"], entry["port"])
await remote.ping()
self.loop.run_until_complete(test())
def test_no_command_controller(self):
entry = {
"type": "controller",
"host": "::1",
"port": 1068
}
with expect_no_log_messages(logging.ERROR):
self.controllers["corelog"] = entry
self.assertTrue(self.controllers.queue.empty())

View File

@ -13,8 +13,8 @@ class TestFrontends(unittest.TestCase):
"corelog"
],
"artiq": [
"client", "compile", "coreanalyzer", "coremgmt", "ctlmgr",
"netboot", "flash", "influxdb", "master", "mkfs", "route",
"client", "compile", "coreanalyzer", "coremgmt",
"netboot", "flash", "master", "mkfs", "route",
"rtiomon", "run", "session"
]
}

View File

@ -1,7 +1,4 @@
import asyncio
import atexit
import collections
import contextlib
import importlib.machinery
import logging
import os
@ -19,9 +16,7 @@ from artiq.language.environment import is_experiment
__all__ = ["parse_arguments", "elide", "short_format", "file_import",
"get_experiment",
"UnexpectedLogMessageError", "expect_no_log_messages",
"atexit_register_coroutine", "exc_to_warning",
"asyncio_wait_or_cancel", "Condition",
"exc_to_warning", "asyncio_wait_or_cancel",
"get_windows_drives", "get_user_config_dir"]
@ -107,42 +102,6 @@ def get_experiment(module, class_name=None):
return exps[0][1]
class UnexpectedLogMessageError(Exception):
pass
class FailingLogHandler(logging.Handler):
def emit(self, record):
raise UnexpectedLogMessageError("Unexpected log message: '{}'".format(
record.getMessage()))
@contextlib.contextmanager
def expect_no_log_messages(level, logger=None):
"""Raise an UnexpectedLogMessageError if a log message of the given level
(or above) is emitted while the context is active.
Example: ::
with expect_no_log_messages(logging.ERROR):
do_stuff_that_should_not_log_errors()
"""
if logger is None:
logger = logging.getLogger()
handler = FailingLogHandler(level)
logger.addHandler(handler)
try:
yield
finally:
logger.removeHandler(handler)
def atexit_register_coroutine(coroutine, loop=None):
if loop is None:
loop = asyncio.get_event_loop()
atexit.register(lambda: loop.run_until_complete(coroutine()))
async def exc_to_warning(coro):
try:
await coro
@ -165,29 +124,6 @@ async def asyncio_wait_or_cancel(fs, **kwargs):
return fs
class Condition:
def __init__(self, *, loop=None):
if loop is not None:
self._loop = loop
else:
self._loop = asyncio.get_event_loop()
self._waiters = collections.deque()
async def wait(self):
"""Wait until notified."""
fut = asyncio.Future(loop=self._loop)
self._waiters.append(fut)
try:
await fut
finally:
self._waiters.remove(fut)
def notify(self):
for fut in self._waiters:
if not fut.done():
fut.set_result(False)
def get_windows_drives():
from ctypes import windll

View File

@ -35,7 +35,7 @@ mock_modules = ["artiq.gui.waitingspinnerwidget",
"quamash", "pyqtgraph", "matplotlib",
"numpy", "dateutil", "dateutil.parser", "prettytable", "PyQt5",
"h5py", "serial", "scipy", "scipy.interpolate",
"llvmlite_artiq", "Levenshtein", "aiohttp", "pythonparser"]
"llvmlite_artiq", "Levenshtein", "pythonparser"]
for module in mock_modules:
sys.modules[module] = Mock()

View File

@ -58,6 +58,7 @@ Installing multiple packages and making them visible to the ARTIQ commands requi
(pkgs.python3.withPackages(ps: [
# List desired Python packages here.
artiq-full.artiq
artiq-full.artiq-comtools
# The board packages are also "Python" packages. You only need a board
# package if you intend to reflash that board (those packages contain
# only board firmware).

View File

@ -16,7 +16,7 @@ The master is a headless component, and one or several clients (command-line or
Controller manager
------------------
:ref:`Controller managers <frontend-artiq-ctlmgr>` are responsible for running and stopping controllers on a machine. There is one controller manager per network node that runs controllers.
Controller managers (started using the ``artiq_ctlmgr`` command that is part of the ``artiq-comtools`` package) are responsible for running and stopping controllers on a machine. There is one controller manager per network node that runs controllers.
A controller manager connects to the master and uses the device database to determine what controllers need to be run. Changes in the device database are tracked by the manager and controllers are started and stopped accordingly.
@ -158,16 +158,6 @@ artiq_master
:prog: artiq_master
.. _frontend-artiq-ctlmgr:
artiq_ctlmgr
------------
.. argparse::
:ref: artiq.frontend.artiq_ctlmgr.get_argparser
:prog: artiq_ctlmgr
.. _frontend-artiq-client:
artiq_client

View File

@ -131,17 +131,3 @@ DRTIO routing table manipulation tool
.. argparse::
:ref: artiq.frontend.artiq_route.get_argparser
:prog: artiq_route
Data to InfluxDB bridge
-----------------------
.. argparse::
:ref: artiq.frontend.artiq_influxdb.get_argparser
:prog: artiq_influxdb
Schedule to InfluxDB logging bridge
-----------------------------------
.. argparse::
:ref: artiq.frontend.artiq_influxdb_schedule.get_argparser
:prog: artiq_influxdb_schedule

View File

@ -9,6 +9,7 @@ CONDA_ENV_NAME = "artiq"
# The conda packages to download and install.
CONDA_PACKAGES = [
"artiq",
"artiq-comtools",
# Only install board packages if you plan to reflash the board.
# The two lines below are just examples and probably not what you want.
# Select the packages that correspond to your board, or remove them

View File

@ -14,7 +14,7 @@ if sys.version_info[:3] < (3, 5, 3):
requirements = [
"numpy", "scipy",
"python-dateutil", "prettytable", "h5py",
"quamash", "pyqtgraph", "pygit2", "aiohttp",
"quamash", "pyqtgraph", "pygit2",
"llvmlite_artiq", "pythonparser", "python-Levenshtein",
]
@ -23,11 +23,8 @@ console_scripts = [
"artiq_compile = artiq.frontend.artiq_compile:main",
"artiq_coreanalyzer = artiq.frontend.artiq_coreanalyzer:main",
"artiq_coremgmt = artiq.frontend.artiq_coremgmt:main",
"artiq_ctlmgr = artiq.frontend.artiq_ctlmgr:main",
"artiq_ddb_template = artiq.frontend.artiq_ddb_template:main",
"artiq_netboot = artiq.frontend.artiq_netboot:main",
"artiq_influxdb = artiq.frontend.artiq_influxdb:main",
"artiq_influxdb_schedule = artiq.frontend.artiq_influxdb_schedule:main",
"artiq_master = artiq.frontend.artiq_master:main",
"artiq_mkfs = artiq.frontend.artiq_mkfs:main",
"artiq_rtiomon = artiq.frontend.artiq_rtiomon:main",
@ -35,7 +32,6 @@ console_scripts = [
"artiq_route = artiq.frontend.artiq_route:main",
"artiq_run = artiq.frontend.artiq_run:main",
"artiq_flash = artiq.frontend.artiq_flash:main",
"aqctl_corelog = artiq.frontend.aqctl_corelog:main",
]