artiq/artiq/tools.py

143 lines
3.2 KiB
Python
Raw Normal View History

import asyncio
import importlib.util
import inspect
import logging
import os
import pathlib
import string
import sys
2015-10-12 19:46:14 +08:00
import numpy as np
2019-11-10 15:55:17 +08:00
from sipyco import pyon
from artiq import __version__ as artiq_version
from artiq.appdirs import user_config_dir
from artiq.language.environment import is_public_experiment
2019-11-10 15:55:17 +08:00
__all__ = ["parse_arguments", "elide", "short_format", "file_import",
2019-11-10 15:55:17 +08:00
"get_experiment",
2019-11-14 15:21:51 +08:00
"exc_to_warning", "asyncio_wait_or_cancel",
2016-07-19 00:16:56 +08:00
"get_windows_drives", "get_user_config_dir"]
logger = logging.getLogger(__name__)
def parse_arguments(arguments):
d = {}
for argument in arguments:
name, eq, value = argument.partition("=")
d[name] = pyon.decode(value)
return d
def elide(s, maxlen):
elided = False
if len(s) > maxlen:
s = s[:maxlen]
elided = True
try:
idx = s.index("\n")
except ValueError:
pass
else:
s = s[:idx]
elided = True
if elided:
maxlen -= 3
if len(s) > maxlen:
s = s[:maxlen]
s += "..."
return s
def short_format(v):
if v is None:
return "None"
t = type(v)
if np.issubdtype(t, np.number) or np.issubdtype(t, np.bool_):
return str(v)
elif np.issubdtype(t, np.unicode_):
return "\"" + elide(v, 50) + "\""
else:
r = t.__name__
if t is list or t is dict or t is set:
r += " ({})".format(len(v))
if t is np.ndarray:
r += " " + str(np.shape(v))
return r
def file_import(filename, prefix="file_import_"):
filename = pathlib.Path(filename)
modname = prefix + filename.stem
path = str(filename.resolve().parent)
sys.path.insert(0, path)
try:
spec = importlib.util.spec_from_file_location(modname, filename)
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
finally:
sys.path.remove(path)
return module
2015-01-28 21:44:15 +08:00
def get_experiment(module, class_name=None):
if class_name:
return getattr(module, class_name)
exps = inspect.getmembers(module, is_public_experiment)
if not exps:
raise ValueError("No experiments in module")
if len(exps) > 1:
raise ValueError("More than one experiment found in module")
return exps[0][1]
2015-10-03 19:28:57 +08:00
async def exc_to_warning(coro):
try:
2015-10-03 19:28:57 +08:00
await coro
except:
logger.warning("asyncio coroutine terminated with exception",
exc_info=True)
2015-10-03 19:28:57 +08:00
async def asyncio_wait_or_cancel(fs, **kwargs):
2015-10-03 14:37:02 +08:00
fs = [asyncio.ensure_future(f) for f in fs]
2015-05-17 16:11:00 +08:00
try:
2015-10-03 19:28:57 +08:00
d, p = await asyncio.wait(fs, **kwargs)
2015-05-17 16:11:00 +08:00
except:
for f in fs:
f.cancel()
raise
for f in p:
f.cancel()
2015-10-03 19:28:57 +08:00
await asyncio.wait([f])
2015-05-17 16:11:00 +08:00
return fs
def get_windows_drives():
from ctypes import windll
drives = []
bitmask = windll.kernel32.GetLogicalDrives()
for letter in string.ascii_uppercase:
if bitmask & 1:
drives.append(letter)
bitmask >>= 1
return drives
def get_user_config_dir():
major = artiq_version.split(".")[0]
dir = user_config_dir("artiq", "m-labs", major)
os.makedirs(dir, exist_ok=True)
return dir