artiq/artiq/frontend/artiq_client.py

408 lines
15 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Client to send commands to :mod:`artiq_master` and display results locally.
The client can perform actions such as accessing/setting datasets,
scanning devices, scheduling experiments, and looking for experiments/devices.
"""
import argparse
import logging
import time
import asyncio
import sys
import os
from operator import itemgetter
from dateutil.parser import parse as parse_date
import numpy as np
from prettytable import PrettyTable
from sipyco.pc_rpc import Client
from sipyco.sync_struct import Subscriber
from sipyco.broadcast import Receiver
from sipyco import common_args, pyon
from sipyco.asyncio_tools import SignalHandler
from artiq.tools import (scale_from_metadata, short_format, parse_arguments,
parse_devarg_override)
from artiq import __version__ as artiq_version
def clear_screen():
    """Clear the terminal.

    On Windows (``os.name == "nt"``) the ``cls`` shell command is run;
    on every other platform the ANSI "erase display" and "cursor home"
    escape sequences are written to standard output.
    """
    if os.name != "nt":
        sys.stdout.write("\x1b[2J\x1b[H")
        return
    os.system("cls")
def get_argparser():
    """Build the command-line parser of the ARTIQ client.

    The top-level parser accepts the master address (``-s/--server`` and
    ``--port``) and ``--version``. Every action is a mandatory sub-command
    whose name is stored in ``args.action``. Verbosity options are added
    last through ``common_args.verbosity_args``.

    Returns the configured :class:`argparse.ArgumentParser`.
    """
    root = argparse.ArgumentParser(description="ARTIQ CLI client")
    root.add_argument(
        "-s", "--server",
        default="::1",
        help="hostname or IP of the master to connect to")
    root.add_argument(
        "--port",
        default=None, type=int,
        help="TCP port to use to connect to the master")
    root.add_argument(
        "--version",
        action="version",
        version="ARTIQ v{}".format(artiq_version),
        help="print the ARTIQ version number")

    # A sub-command is mandatory; its name ends up in ``args.action``.
    actions = root.add_subparsers(dest="action", required=True)

    # --- submit ---
    submit = actions.add_parser("submit", help="submit an experiment")
    submit.add_argument(
        "-p", "--pipeline", default="main", type=str,
        help="pipeline to run the experiment in "
             "(default: %(default)s)")
    submit.add_argument(
        "-P", "--priority", default=0, type=int,
        help="priority (higher value means sooner "
             "scheduling, default: %(default)s)")
    submit.add_argument(
        "-t", "--timed", default=None, type=str,
        help="set a due date for the experiment")
    submit.add_argument(
        "-f", "--flush", default=False, action="store_true",
        help="flush the pipeline before preparing "
             "the experiment")
    submit.add_argument(
        "-R", "--repository", default=False, action="store_true",
        help="use the experiment repository")
    submit.add_argument(
        "-r", "--revision", default=None,
        help="use a specific repository revision "
             "(defaults to head, ignored without -R)")
    submit.add_argument(
        "--devarg-override", default="",
        help="specify device arguments to override")
    submit.add_argument(
        "--content", default=False, action="store_true",
        help="submit by content")
    submit.add_argument(
        "-c", "--class-name", default=None,
        help="name of the class to run")
    submit.add_argument(
        "file", metavar="FILE",
        help="file containing the experiment to run")
    submit.add_argument(
        "arguments", metavar="ARGUMENTS", nargs="*",
        help="run arguments")

    # --- delete ---
    delete = actions.add_parser(
        "delete",
        help="delete an experiment "
             "from the schedule")
    delete.add_argument(
        "-g", action="store_true",
        help="request graceful termination")
    delete.add_argument(
        "rid", metavar="RID", type=int,
        help="run identifier (RID)")

    # --- set-dataset ---
    set_dataset = actions.add_parser(
        "set-dataset", help="add or modify a dataset")
    set_dataset.add_argument(
        "name", metavar="NAME",
        help="name of the dataset")
    set_dataset.add_argument(
        "value", metavar="VALUE",
        help="value in PYON format")
    set_dataset.add_argument(
        "--unit", default=None, type=str,
        help="physical unit of the dataset")
    set_dataset.add_argument(
        "--scale", default=None, type=float,
        help="factor to multiply value of dataset in displays")
    set_dataset.add_argument(
        "--precision", default=None, type=int,
        help="maximum number of decimals to print in displays")
    # --persist and --no-persist cannot be given together.
    persistence = set_dataset.add_mutually_exclusive_group()
    persistence.add_argument(
        "-p", "--persist", action="store_true",
        help="make the dataset persistent")
    persistence.add_argument(
        "-n", "--no-persist", action="store_true",
        help="make the dataset non-persistent")

    # --- del-dataset ---
    del_dataset = actions.add_parser(
        "del-dataset", help="delete a dataset")
    del_dataset.add_argument("name", help="name of the dataset")

    # --- supply-interactive ---
    supply = actions.add_parser(
        "supply-interactive", help="supply interactive arguments")
    supply.add_argument(
        "rid", metavar="RID", type=int,
        help="RID of target experiment")
    supply.add_argument(
        "arguments", metavar="ARGUMENTS", nargs="*",
        help="interactive arguments")

    # --- cancel-interactive ---
    cancel = actions.add_parser(
        "cancel-interactive", help="cancel interactive arguments")
    cancel.add_argument(
        "rid", metavar="RID", type=int,
        help="RID of target experiment")

    # --- show ---
    show = actions.add_parser(
        "show", help="show schedule, log, devices or datasets")
    show.add_argument(
        "what", metavar="WHAT",
        choices=["schedule", "log", "ccb", "devices", "datasets",
                 "interactive-args"],
        help="select object to show: %(choices)s")

    # --- scan-devices ---
    actions.add_parser(
        "scan-devices", help="trigger a device database (re)scan")

    # --- scan-repository ---
    scan_repository = actions.add_parser(
        "scan-repository", help="trigger a repository (re)scan")
    scan_repository.add_argument(
        "--async", action="store_true",
        help="trigger scan and return immediately")
    scan_repository.add_argument(
        "revision", metavar="REVISION", default=None, nargs="?",
        help="use a specific repository revision "
             "(defaults to head)")

    # --- ls ---
    ls = actions.add_parser(
        "ls", help="list a directory on the master")
    ls.add_argument("directory", default="", nargs="?")

    # --- terminate ---
    actions.add_parser("terminate", help="terminate the ARTIQ master")

    common_args.verbosity_args(root)
    return root
def _action_submit(remote, args):
    """Submit an experiment through ``remote.submit`` and print its RID.

    The experiment description (``expid``) is assembled from the parsed
    command line: device argument overrides, log level, class name and run
    arguments, plus either the file content (``--content``) or the file
    name and, with ``-R``, the repository revision.

    :param remote: RPC client exposing ``submit(pipeline, expid, priority,
        due_date, flush)``.
    :param args: namespace produced by the ``submit`` sub-parser.
    :raises ValueError: if the run arguments cannot be parsed, or if
        ``--content`` is combined with ``-R``.
    """
    try:
        arguments = parse_arguments(args.arguments)
    except Exception as err:
        raise ValueError("Failed to parse run arguments") from err

    expid = {
        "devarg_override": parse_devarg_override(args.devarg_override),
        # Start from WARNING; each quiet step adds 10, each verbose step
        # subtracts 10.
        "log_level": logging.WARNING + args.quiet*10 - args.verbose*10,
        "class_name": args.class_name,
        "arguments": arguments,
    }
    if args.content:
        # Reject the invalid option combination before doing any file I/O.
        if args.repository:
            raise ValueError("Repository cannot be used when submitting by content")
        with open(args.file, "r") as f:
            expid["content"] = f.read()
    else:
        expid["file"] = args.file
        if args.repository:
            expid["repo_rev"] = args.revision

    if args.timed is None:
        due_date = None
    else:
        # datetime.timestamp() treats a naive date as local time (as
        # time.mktime did) but also honours an explicit UTC offset in the
        # parsed date, which mktime(timetuple()) silently ignored.
        due_date = parse_date(args.timed).timestamp()

    rid = remote.submit(args.pipeline, expid,
                        args.priority, due_date, args.flush)
    print("RID: {}".format(rid))
def _action_delete(remote, args):
    """Remove the experiment ``args.rid`` from the schedule.

    With ``-g`` set, ``remote.request_termination`` is called instead of
    ``remote.delete``.
    """
    remove = remote.request_termination if args.g else remote.delete
    remove(args.rid)
def _action_set_dataset(remote, args):
    """Create or update a dataset through ``remote.set``.

    ``persist`` is ``True`` for ``--persist``, ``False`` for
    ``--no-persist`` and ``None`` when neither flag is given. Only the
    metadata options actually supplied (``unit``, ``scale``,
    ``precision``) are forwarded. The PYON-decoded value is multiplied by
    the factor returned by ``scale_from_metadata`` when it is a NumPy
    array or of a numeric type.
    """
    if args.no_persist:
        persist = False
    elif args.persist:
        persist = True
    else:
        persist = None

    # Keep only the metadata fields given on the command line.
    metadata = {
        field: getattr(args, field)
        for field in ("unit", "scale", "precision")
        if getattr(args, field) is not None
    }

    scale = scale_from_metadata(metadata)
    value = pyon.decode(args.value)
    value_type = type(value)
    scalable = (np.issubdtype(value_type, np.number)
                or value_type is np.ndarray)
    if scalable:
        value = value * scale
    remote.set(args.name, value, persist, metadata)
def _action_del_dataset(remote, args):
    """Delete the dataset named ``args.name`` through ``remote.delete``."""
    delete = remote.delete
    delete(args.name)
def _action_scan_devices(remote, args):
    """Trigger a device database scan by calling ``remote.scan``.

    ``args`` is accepted for signature uniformity with the other actions
    and is not used.
    """
    rescan = remote.scan
    rescan()
def _action_supply_interactive(remote, args):
    """Send interactive arguments for experiment ``args.rid``.

    The raw command-line arguments are first converted with
    ``parse_arguments`` and then handed to ``remote.supply``.
    """
    supplied = parse_arguments(args.arguments)
    target_rid = args.rid
    remote.supply(target_rid, supplied)
def _action_cancel_interactive(remote, args):
    """Cancel the interactive argument request of experiment ``args.rid``."""
    cancel = remote.cancel
    cancel(args.rid)
def _action_scan_repository(remote, args):
    """Trigger a repository scan for ``args.revision``.

    With ``--async`` the call goes to ``remote.scan_repository_async``,
    otherwise to ``remote.scan_repository``.
    """
    # "async" is a reserved word, so the flag cannot be read as an
    # ordinary attribute and has to go through getattr().
    if getattr(args, "async"):
        scan = remote.scan_repository_async
    else:
        scan = remote.scan_repository
    scan(args.revision)
def _action_ls(remote, args):
    """Print the entries of ``args.directory`` as listed by the master.

    Entries whose last character is a path separator (``/`` or ``\\``)
    are printed first; within each group the order is alphabetical.
    """
    def separator_terminated_first(entry):
        # False sorts before True, so entries ending in a separator lead.
        return entry[-1] not in "\\/", entry

    listing = remote.list_directory(args.directory)
    ordered = sorted(listing, key=separator_terminated_first)
    for entry in ordered:
        print(entry)
def _action_terminate(remote, _args):
    """Ask the master to terminate by calling ``remote.terminate``.

    The second parameter is unused.
    """
    terminate = remote.terminate
    terminate()
def _show_schedule(schedule):
    """Clear the screen and print the schedule as a table.

    ``schedule`` maps RIDs to entry dictionaries. Rows are ordered by
    descending priority, then by due date (a missing due date counts as
    0), then by RID. An empty schedule prints ``Schedule is empty``.
    """
    clear_screen()
    if not schedule:
        print("Schedule is empty")
        return

    def ordering(item):
        rid, entry = item
        return -entry["priority"], entry["due_date"] or 0, rid

    ordered = sorted(schedule.items(), key=ordering)
    table = PrettyTable(["RID", "Pipeline", "    Status    ", "Prio",
                         "Due date", "Revision", "File", "Class name"])
    for rid, entry in ordered:
        pipeline, status, priority = (entry["pipeline"], entry["status"],
                                      entry["priority"])
        due = entry["due_date"]
        if due is None:
            due_text = ""
        else:
            due_text = time.strftime("%m/%d %H:%M:%S", time.localtime(due))

        expid = entry["expid"]
        revision = expid.get("repo_rev", "Outside repo.")
        file_name = expid.get("file", "<none>")
        class_name = expid["class_name"]
        if class_name is None:
            class_name = ""

        table.add_row([rid, pipeline, status, priority,
                       due_text, revision, file_name, class_name])
    print(table)
def _show_devices(devices):
    """Clear the screen and print the devices as a table sorted by name.

    Each description is rendered with ``pyon.encode(value, True)`` and
    left-aligned in its column.
    """
    clear_screen()
    table = PrettyTable(["Name", "Description"])
    table.align["Description"] = "l"
    for name in sorted(devices):
        description = pyon.encode(devices[name], True)
        table.add_row([name, description])
    print(table)
def _show_datasets(datasets):
    """Clear the screen and print the datasets as a table sorted by name.

    Every dataset entry is a ``(persist, value, metadata)`` triple; the
    persistence flag is shown as ``Y``/``N`` and the value is rendered by
    ``short_format``.
    """
    clear_screen()
    table = PrettyTable(["Dataset", "Persistent", "Value"])
    for name in sorted(datasets):
        persist, value, metadata = datasets[name]
        flag = "Y" if persist else "N"
        table.add_row([name, flag, short_format(value, metadata)])
    print(table)
def _show_interactive_args(interactive_args):
    """Clear the screen and print pending interactive argument requests.

    ``interactive_args`` maps RIDs to request dictionaries; one table row
    is produced per entry of the request's ``arglist_desc``, in ascending
    RID order.
    """
    clear_screen()
    table = PrettyTable(["RID", "Title", "Key", "Type", "Group", "Tooltip"])
    for rid in sorted(interactive_args):
        request = interactive_args[rid]
        title = request["title"]
        for key, procdesc, group, tooltip in request["arglist_desc"]:
            arg_type = procdesc["ty"]
            table.add_row([rid, title, key, arg_type, group, tooltip])
    print(table)
def _run_subscriber(host, port, subscriber):
    """Run ``subscriber`` on a fresh event loop until it stops.

    The subscriber is connected to ``host``/``port`` and the function then
    blocks until either the signal handler's ``wait_terminate`` completes
    or ``subscriber.receive_task`` finishes. Whatever is still pending at
    that point is cancelled. The subscriber is closed, the signal handler
    torn down and the loop closed, in that order, even on error.
    """
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        signals = SignalHandler()
        signals.setup()
        try:
            loop.run_until_complete(subscriber.connect(host, port))
            try:
                terminate_task = loop.create_task(signals.wait_terminate())
                first_finished = asyncio.wait(
                    [terminate_task, subscriber.receive_task],
                    return_when=asyncio.FIRST_COMPLETED)
                _done, unfinished = loop.run_until_complete(first_finished)
                for leftover in unfinished:
                    leftover.cancel()
            finally:
                loop.run_until_complete(subscriber.close())
        finally:
            signals.teardown()
    finally:
        loop.close()
def _show_dict(args, notifier_name, display_fun):
    """Subscribe to ``notifier_name`` and redisplay it on every update.

    A local dictionary mirrors the remote structure: it is refilled from
    the initial data the subscriber provides and passed to ``display_fun``
    on each notification. The port defaults to 3250 when ``args.port`` is
    not set.
    """
    mirror = {}

    def load_initial(initial):
        # Refill the same dict object so the display callback keeps
        # referring to live data.
        mirror.clear()
        mirror.update(initial)
        return mirror

    def redraw(mod):
        return display_fun(mirror)

    subscriber = Subscriber(notifier_name, load_initial, redraw)
    port = args.port if args.port is not None else 3250
    _run_subscriber(args.server, port, subscriber)
def _print_log_record(record):
    """Print one log record on a single line.

    ``record`` is a ``(level, source, time, message)`` 4-tuple; the time
    is formatted as local ``MM/DD HH:MM:SS``.
    """
    level, source, timestamp, message = record
    stamp = time.strftime("%m/%d %H:%M:%S", time.localtime(timestamp))
    print(level, source, stamp, message)
def _show_log(args):
    """Receive the ``log`` broadcast and print every record.

    The port defaults to 1067 when ``args.port`` is not set.
    """
    callbacks = [_print_log_record]
    receiver = Receiver("log", callbacks)
    port = args.port if args.port is not None else 1067
    _run_subscriber(args.server, port, receiver)
def _show_ccb(args):
    """Receive the ``ccb`` broadcast and print every request.

    Each message is printed as its ``service`` followed by its ``args``
    and ``kwargs``. The port defaults to 1067 when ``args.port`` is not
    set.
    """
    def print_request(request):
        print(request["service"],
              "args:", request["args"],
              "kwargs:", request["kwargs"])

    receiver = Receiver("ccb", [print_request])
    port = args.port if args.port is not None else 1067
    _run_subscriber(args.server, port, receiver)
def main():
    """Parse the command line and run the selected action.

    ``show`` actions subscribe to the master and display updates until
    interrupted. Every other action opens an RPC client to the matching
    master target (port 3251 unless ``--port`` is given), calls the
    corresponding ``_action_<name>`` function and always closes the
    connection afterwards.
    """
    args = get_argparser().parse_args()
    # Sub-command names use dashes; function names use underscores.
    action = args.action.replace("-", "_")

    if action == "show":
        what = args.what
        # Views backed by a synchronized dictionary:
        # WHAT -> (notifier name, display function).
        dict_views = {
            "schedule": ("schedule", _show_schedule),
            "devices": ("devices", _show_devices),
            "datasets": ("datasets", _show_datasets),
            "interactive-args": ("interactive_args", _show_interactive_args),
        }
        if what in dict_views:
            notifier_name, display_fun = dict_views[what]
            _show_dict(args, notifier_name, display_fun)
        elif what == "log":
            _show_log(args)
        elif what == "ccb":
            _show_ccb(args)
        else:
            raise ValueError
        return

    port = args.port if args.port is not None else 3251
    rpc_targets = {
        "submit": "schedule",
        "delete": "schedule",
        "set_dataset": "dataset_db",
        "del_dataset": "dataset_db",
        "scan_devices": "device_db",
        "supply_interactive": "interactive_arg_db",
        "cancel_interactive": "interactive_arg_db",
        "scan_repository": "experiment_db",
        "ls": "experiment_db",
        "terminate": "master_management",
    }
    target_name = rpc_targets[action]
    remote = Client(args.server, port, target_name)
    try:
        # Handlers are looked up by name among the module-level functions.
        handler = globals()["_action_" + action]
        handler(remote, args)
    finally:
        remote.close_rpc()
# Run the client only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == "__main__":
    main()