forked from M-Labs/artiq
201 lines
5.3 KiB
Python
201 lines
5.3 KiB
Python
import asyncio
|
|
import importlib.util
|
|
import importlib.machinery
|
|
import inspect
|
|
import logging
|
|
import os
|
|
import pathlib
|
|
import string
|
|
import sys
|
|
|
|
import numpy as np
|
|
|
|
from sipyco import pyon
|
|
|
|
from artiq import __version__ as artiq_version
|
|
from artiq.appdirs import user_config_dir
|
|
from artiq.language.environment import is_public_experiment
|
|
from artiq.language import units
|
|
|
|
|
|
# Public names exported by a star-import of this module.
__all__ = [
    "parse_arguments",
    "parse_devarg_override",
    "unparse_devarg_override",
    "elide",
    "scale_from_metadata",
    "short_format",
    "file_import",
    "get_experiment",
    "exc_to_warning",
    "asyncio_wait_or_cancel",
    "get_windows_drives",
    "get_user_config_dir",
]


# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def parse_arguments(arguments):
    """Turn ``name=value`` strings into a dictionary of decoded values.

    Every entry of ``arguments`` is split at its first ``=``. The text before
    it is used as the key and the text after it is passed to ``pyon.decode``.
    If a name appears several times, the last occurrence wins.
    """
    splits = (entry.partition("=") for entry in arguments)
    return {key: pyon.decode(encoded) for key, _, encoded in splits}
|
|
|
|
|
|
def parse_devarg_override(devarg_override):
    """Parse a whitespace-separated list of ``device:argument=value`` items.

    Returns a dictionary mapping each device name to a dictionary of its
    argument overrides, with every value decoded by ``pyon.decode``.

    Raises ``ValueError`` when an item has nothing after the ``:`` or
    nothing after the ``=``.
    """
    overrides_by_device = {}
    for token in devarg_override.split():
        device, _, assignment = token.partition(":")
        if assignment == "":
            raise ValueError
        argument, _, encoded = assignment.partition("=")
        if encoded == "":
            raise ValueError
        device_overrides = overrides_by_device.setdefault(device, {})
        device_overrides[argument] = pyon.decode(encoded)
    return overrides_by_device
|
|
|
|
|
|
def unparse_devarg_override(devarg_override):
    """Serialise a ``{device: {argument: value}}`` mapping to a string.

    Each override becomes ``device:argument=value`` with the value encoded
    by ``pyon.encode``; the items are joined with single spaces.
    """
    items = []
    for device, overrides in devarg_override.items():
        for argument, value in overrides.items():
            encoded = pyon.encode(value)
            items.append("{}:{}={}".format(device, argument, encoded))
    return " ".join(items)
|
|
|
|
|
|
def elide(s, maxlen):
    """Shorten ``s`` to a single line of at most ``maxlen`` characters.

    The string is cut at ``maxlen`` characters and at its first newline,
    whichever comes first. If anything was removed, the result is trimmed
    further to make room for a trailing ``"..."`` marker.

    :param s: string to shorten.
    :param maxlen: maximum length of the result. For values below 3 the
        result is just ``"..."``, which is longer than ``maxlen``.
    :return: ``s`` unchanged if it fits on one line within ``maxlen``,
        otherwise the shortened text followed by ``"..."``.
    """
    # Clamp at zero: a negative slice bound would count from the end of the
    # string and keep an arbitrary part of the text.
    limit = max(maxlen, 0)

    elided = len(s) > limit
    if elided:
        s = s[:limit]

    newline = s.find("\n")
    if newline != -1:
        s = s[:newline]
        elided = True

    if elided:
        # Reserve three characters for the ellipsis marker.
        s = s[:max(limit - 3, 0)] + "..."
    return s
|
|
|
|
def scale_from_metadata(metadata):
    """Return the scale factor described by ``metadata``.

    An explicit ``"scale"`` entry takes priority. Otherwise the ``"unit"``
    entry is looked up as an attribute of the ``units`` module, and 1 is
    used when there is no such attribute (or no unit at all).
    """
    unit_name = metadata.get("unit", "")
    return metadata.get("scale", getattr(units, unit_name, 1))
|
|
|
|
def short_format(v, metadata=None):
    """Return a short, human-readable string describing ``v``.

    :param v: value to format.
    :param metadata: optional mapping; the keys ``"unit"``, ``"scale"`` and
        ``"precision"`` are read. ``None`` is treated as an empty mapping.
    :return:
        * ``"None"`` when ``v`` is ``None``;
        * for numbers, the value divided by the scale from
          ``scale_from_metadata``, followed by the unit if one is set;
        * ``str(v)`` for booleans;
        * for strings, the text shortened by ``elide`` to 50 characters and
          wrapped in double quotes;
        * for ``numpy.ndarray`` instances, an abbreviated listing of the
          scaled elements, followed by the unit if one is set;
        * ``"<type name> (<length>)"`` for dicts and lists;
        * ``None`` for any other type.
    """
    # Avoid a shared mutable default argument.
    m = {} if metadata is None else metadata

    if v is None:
        return "None"

    unit = m.get("unit", "")
    precision = m.get("precision", None)
    suffix = " " + unit if unit else ""

    t = type(v)
    if np.issubdtype(t, np.number):
        # The scale is only needed for numeric values, so it is looked up
        # in the branches that use it.
        v_t = np.divide(v, scale_from_metadata(m))
        v_str = np.format_float_positional(v_t,
                                           precision=precision,
                                           trim='-',
                                           unique=True)
        return v_str + suffix
    elif np.issubdtype(t, np.bool_):
        return str(v)
    elif np.issubdtype(t, np.str_):
        # np.str_ replaces np.unicode_, which NumPy 2.0 removed.
        return "\"" + elide(v, 50) + "\""
    elif t is np.ndarray:
        v_t = np.divide(v, scale_from_metadata(m))
        v_str = np.array2string(v_t,
                                max_line_width=1000,
                                precision=precision,
                                suppress_small=True,
                                separator=', ',
                                threshold=4,
                                edgeitems=2,
                                floatmode='maxprec')
        return v_str + suffix
    elif isinstance(v, (dict, list)):
        return t.__name__ + " ({})".format(len(v))
    # Unsupported types have no short representation.
    return None
|
|
|
|
|
|
def file_import(filename, prefix="file_import_"):
    """Load the Python source file ``filename`` and return it as a module.

    The module is named ``prefix`` followed by the file's stem. While the
    file executes, its resolved parent directory sits at the front of
    ``sys.path``; that entry is removed again afterwards, even if execution
    raises.
    """
    source = pathlib.Path(filename)
    module_name = prefix + source.stem
    search_dir = str(source.resolve().parent)

    sys.path.insert(0, search_dir)
    try:
        loader = importlib.machinery.SourceFileLoader(module_name, str(source))
        spec = importlib.util.spec_from_loader(module_name, loader)
        loaded = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(loaded)
    finally:
        sys.path.remove(search_dir)

    return loaded
|
|
|
|
|
|
def get_experiment(module, class_name=None):
    """Pick an experiment out of ``module``.

    With ``class_name`` given, it is treated as a dotted attribute path and
    resolved step by step starting from ``module``. Without it, the module
    must contain exactly one member accepted by ``is_public_experiment``,
    which is returned.

    Raises ``ValueError`` when no ``class_name`` is given and the module
    contains zero or more than one such member.
    """
    if not class_name:
        candidates = inspect.getmembers(module, is_public_experiment)
        if not candidates:
            raise ValueError("No experiments in module")
        if len(candidates) > 1:
            raise ValueError("More than one experiment found in module")
        (_, experiment), = candidates
        return experiment

    target = module
    for attribute in class_name.split('.'):
        target = getattr(target, attribute)
    return target
|
|
|
|
|
|
async def exc_to_warning(coro):
    """Await ``coro`` and log any exception it raises as a warning.

    Exceptions derived from ``Exception`` are logged with their traceback
    and not re-raised, so the caller sees a normal return (of ``None``).

    ``asyncio.CancelledError``, ``KeyboardInterrupt`` and ``SystemExit`` are
    not caught: they derive from ``BaseException`` on the Python versions
    where cancellation is a ``BaseException``, and swallowing them would
    hide task cancellation and interpreter shutdown from the caller.

    :param coro: awaitable to run.
    """
    try:
        await coro
    except Exception:
        logger.warning("asyncio coroutine terminated with exception",
                       exc_info=True)
|
|
|
|
|
|
async def asyncio_wait_or_cancel(fs, **kwargs):
    """Wait on ``fs`` with ``asyncio.wait`` and cancel whatever is left over.

    Every entry of ``fs`` is first wrapped with ``asyncio.ensure_future``;
    ``kwargs`` are forwarded to ``asyncio.wait``. If the wait itself is
    interrupted by any exception, all futures are cancelled and the
    exception is re-raised. Otherwise each still-pending future is
    cancelled and awaited in turn.

    Returns the list of wrapped futures, in the order of ``fs``.
    """
    futures = [asyncio.ensure_future(awaitable) for awaitable in fs]
    try:
        _, pending = await asyncio.wait(futures, **kwargs)
    except BaseException:
        for future in futures:
            future.cancel()
        raise
    for future in pending:
        future.cancel()
        # Let the cancellation finish before moving on to the next future.
        await asyncio.wait([future])
    return futures
|
|
|
|
|
|
def get_windows_drives():
    """Return the drive letters reported by ``GetLogicalDrives``.

    The call returns a bitmask; bit 0 is matched with ``A``, bit 1 with
    ``B`` and so on through the uppercase alphabet. Letters whose bit is
    set are returned in alphabetical order.
    """
    # Imported here because ctypes.windll only exists on Windows.
    from ctypes import windll

    present = windll.kernel32.GetLogicalDrives()
    return [letter
            for bit, letter in enumerate(string.ascii_uppercase)
            if (present >> bit) & 1]
|
|
|
|
|
|
def get_user_config_dir():
    """Return the per-user configuration directory, creating it if needed.

    The path comes from ``user_config_dir`` called with ``"artiq"``,
    ``"m-labs"`` and the part of ``artiq_version`` before its first dot.
    """
    major_version, _, _ = artiq_version.partition(".")
    config_dir = user_config_dir("artiq", "m-labs", major_version)
    os.makedirs(config_dir, exist_ok=True)
    return config_dir
|