artiq/artiq/tools.py

201 lines
5.3 KiB
Python
Raw Permalink Normal View History

import asyncio
2021-12-21 13:20:11 +08:00
import importlib.util
import importlib.machinery
import inspect
import logging
import os
import pathlib
import string
import sys
2015-10-12 19:46:14 +08:00
import numpy as np
2019-11-10 15:55:17 +08:00
from sipyco import pyon
from artiq import __version__ as artiq_version
from artiq.appdirs import user_config_dir
from artiq.language.environment import is_public_experiment
from artiq.language import units
2019-11-10 15:55:17 +08:00
__all__ = ["parse_arguments",
"parse_devarg_override", "unparse_devarg_override",
"elide", "scale_from_metadata",
"short_format", "file_import",
2019-11-10 15:55:17 +08:00
"get_experiment",
2019-11-14 15:21:51 +08:00
"exc_to_warning", "asyncio_wait_or_cancel",
2016-07-19 00:16:56 +08:00
"get_windows_drives", "get_user_config_dir"]
logger = logging.getLogger(__name__)
def parse_arguments(arguments):
d = {}
for argument in arguments:
name, eq, value = argument.partition("=")
d[name] = pyon.decode(value)
return d
def parse_devarg_override(devarg_override):
devarg_override_dict = {}
for item in devarg_override.split():
device, _, override = item.partition(":")
if not override:
raise ValueError
if device not in devarg_override_dict:
devarg_override_dict[device] = {}
argument, _, value = override.partition("=")
if not value:
raise ValueError
devarg_override_dict[device][argument] = pyon.decode(value)
return devarg_override_dict
def unparse_devarg_override(devarg_override):
devarg_override_strs = [
"{}:{}={}".format(device, argument, pyon.encode(value))
for device, overrides in devarg_override.items()
for argument, value in overrides.items()]
return " ".join(devarg_override_strs)
def elide(s, maxlen):
elided = False
if len(s) > maxlen:
s = s[:maxlen]
elided = True
try:
idx = s.index("\n")
except ValueError:
pass
else:
s = s[:idx]
elided = True
if elided:
maxlen -= 3
if len(s) > maxlen:
s = s[:maxlen]
s += "..."
return s
def scale_from_metadata(metadata):
unit = metadata.get("unit", "")
default_scale = getattr(units, unit, 1)
return metadata.get("scale", default_scale)
def short_format(v, metadata={}):
m = metadata
unit = m.get("unit", "")
scale = scale_from_metadata(m)
precision = m.get("precision", None)
if v is None:
return "None"
t = type(v)
if np.issubdtype(t, np.number):
v_t = np.divide(v, scale)
v_str = np.format_float_positional(v_t,
precision=precision,
trim='-',
unique=True)
v_str += " " + unit if unit else ""
return v_str
elif np.issubdtype(t, np.bool_):
return str(v)
elif np.issubdtype(t, np.unicode_):
return "\"" + elide(v, 50) + "\""
elif t is np.ndarray:
v_t = np.divide(v, scale)
v_str = np.array2string(v_t,
max_line_width=1000,
precision=precision,
suppress_small=True,
separator=', ',
threshold=4,
edgeitems=2,
floatmode='maxprec')
v_str += " " + unit if unit else ""
return v_str
elif isinstance(v, (dict, list)):
r = t.__name__ + " ({})".format(len(v))
return r
def file_import(filename, prefix="file_import_"):
filename = pathlib.Path(filename)
modname = prefix + filename.stem
path = str(filename.resolve().parent)
sys.path.insert(0, path)
try:
spec = importlib.util.spec_from_loader(
modname,
importlib.machinery.SourceFileLoader(modname, str(filename)),
)
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
finally:
sys.path.remove(path)
return module
2015-01-28 21:44:15 +08:00
def get_experiment(module, class_name=None):
if class_name:
obj = module
for name in class_name.split('.'):
obj = getattr(obj, name)
return obj
exps = inspect.getmembers(module, is_public_experiment)
if not exps:
raise ValueError("No experiments in module")
if len(exps) > 1:
raise ValueError("More than one experiment found in module")
return exps[0][1]
2015-10-03 19:28:57 +08:00
async def exc_to_warning(coro):
try:
2015-10-03 19:28:57 +08:00
await coro
except:
logger.warning("asyncio coroutine terminated with exception",
exc_info=True)
2015-10-03 19:28:57 +08:00
async def asyncio_wait_or_cancel(fs, **kwargs):
2015-10-03 14:37:02 +08:00
fs = [asyncio.ensure_future(f) for f in fs]
2015-05-17 16:11:00 +08:00
try:
2015-10-03 19:28:57 +08:00
d, p = await asyncio.wait(fs, **kwargs)
2015-05-17 16:11:00 +08:00
except:
for f in fs:
f.cancel()
raise
for f in p:
f.cancel()
2015-10-03 19:28:57 +08:00
await asyncio.wait([f])
2015-05-17 16:11:00 +08:00
return fs
def get_windows_drives():
from ctypes import windll
drives = []
bitmask = windll.kernel32.GetLogicalDrives()
for letter in string.ascii_uppercase:
if bitmask & 1:
drives.append(letter)
bitmask >>= 1
return drives
def get_user_config_dir():
major = artiq_version.split(".")[0]
dir = user_config_dir("artiq", "m-labs", major)
os.makedirs(dir, exist_ok=True)
return dir