forked from M-Labs/artiq
354 lines
11 KiB
Python
354 lines
11 KiB
Python
from operator import itemgetter
|
|
import importlib.machinery
|
|
import linecache
|
|
import logging
|
|
import sys
|
|
import asyncio
|
|
import time
|
|
import collections
|
|
import os
|
|
import socket
|
|
import itertools
|
|
import atexit
|
|
import string
|
|
|
|
import numpy as np
|
|
|
|
from artiq.language.environment import is_experiment
|
|
from artiq.protocols import pyon
|
|
|
|
|
|
__all__ = ["parse_arguments", "elide", "short_format", "file_import",
|
|
"get_experiment", "verbosity_args", "simple_network_args", "init_logger",
|
|
"bind_address_from_args", "atexit_register_coroutine",
|
|
"exc_to_warning", "asyncio_wait_or_cancel",
|
|
"TaskObject", "Condition", "get_windows_drives"]
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def parse_arguments(arguments):
|
|
d = {}
|
|
for argument in arguments:
|
|
name, eq, value = argument.partition("=")
|
|
d[name] = pyon.decode(value)
|
|
return d
|
|
|
|
|
|
def elide(s, maxlen):
|
|
elided = False
|
|
if len(s) > maxlen:
|
|
s = s[:maxlen]
|
|
elided = True
|
|
try:
|
|
idx = s.index("\n")
|
|
except ValueError:
|
|
pass
|
|
else:
|
|
s = s[:idx]
|
|
elided = True
|
|
if elided:
|
|
maxlen -= 3
|
|
if len(s) > maxlen:
|
|
s = s[:maxlen]
|
|
s += "..."
|
|
return s
|
|
|
|
|
|
def short_format(v):
|
|
if v is None:
|
|
return "None"
|
|
t = type(v)
|
|
if t is bool or np.issubdtype(t, int) or np.issubdtype(t, float):
|
|
return str(v)
|
|
elif t is str:
|
|
return "\"" + elide(v, 50) + "\""
|
|
else:
|
|
r = t.__name__
|
|
if t is list or t is dict or t is set:
|
|
r += " ({})".format(len(v))
|
|
if t is np.ndarray:
|
|
r += " " + str(np.shape(v))
|
|
return r
|
|
|
|
|
|
def file_import(filename, prefix="file_import_"):
|
|
linecache.checkcache(filename)
|
|
|
|
modname = filename
|
|
i = modname.rfind("/")
|
|
if i > 0:
|
|
modname = modname[i+1:]
|
|
i = modname.find(".")
|
|
if i > 0:
|
|
modname = modname[:i]
|
|
modname = prefix + modname
|
|
|
|
path = os.path.dirname(os.path.realpath(filename))
|
|
sys.path.insert(0, path)
|
|
try:
|
|
loader = importlib.machinery.SourceFileLoader(modname, filename)
|
|
module = loader.load_module()
|
|
finally:
|
|
sys.path.remove(path)
|
|
|
|
return module
|
|
|
|
|
|
def get_experiment(module, experiment=None):
|
|
if experiment:
|
|
return getattr(module, experiment)
|
|
|
|
exps = [(k, v) for k, v in module.__dict__.items()
|
|
if k[0] != "_" and is_experiment(v)]
|
|
if not exps:
|
|
raise ValueError("No experiments in module")
|
|
if len(exps) > 1:
|
|
raise ValueError("More than one experiment found in module")
|
|
return exps[0][1]
|
|
|
|
|
|
def verbosity_args(parser):
|
|
group = parser.add_argument_group("verbosity")
|
|
group.add_argument("-v", "--verbose", default=0, action="count",
|
|
help="increase logging level")
|
|
group.add_argument("-q", "--quiet", default=0, action="count",
|
|
help="decrease logging level")
|
|
|
|
|
|
def simple_network_args(parser, default_port):
|
|
group = parser.add_argument_group("network server")
|
|
group.add_argument(
|
|
"--bind", default=[], action="append",
|
|
help="add an hostname or IP address to bind to")
|
|
group.add_argument(
|
|
"--no-localhost-bind", default=False, action="store_true",
|
|
help="do not implicitly bind to localhost addresses")
|
|
if isinstance(default_port, int):
|
|
group.add_argument("-p", "--port", default=default_port, type=int,
|
|
help="TCP port to listen to (default: %(default)d)")
|
|
else:
|
|
for name, purpose, default in default_port:
|
|
h = ("TCP port to listen to for {} (default: {})"
|
|
.format(purpose, default))
|
|
group.add_argument("--port-" + name, default=default, type=int,
|
|
help=h)
|
|
|
|
class MultilineFormatter(logging.Formatter):
|
|
def __init__(self):
|
|
logging.Formatter.__init__(
|
|
self, "%(levelname)s:%(name)s:%(message)s")
|
|
|
|
def format(self, record):
|
|
r = logging.Formatter.format(self, record)
|
|
linebreaks = r.count("\n")
|
|
if linebreaks:
|
|
i = r.index(":")
|
|
r = r[:i] + "<" + str(linebreaks + 1) + ">" + r[i:]
|
|
return r
|
|
|
|
|
|
def multiline_log_config(level):
|
|
root_logger = logging.getLogger()
|
|
root_logger.setLevel(level)
|
|
handler = logging.StreamHandler()
|
|
handler.setFormatter(MultilineFormatter())
|
|
root_logger.addHandler(handler)
|
|
|
|
|
|
def init_logger(args):
|
|
multiline_log_config(level=logging.WARNING + args.quiet*10 - args.verbose*10)
|
|
|
|
|
|
def bind_address_from_args(args):
|
|
if args.no_localhost_bind:
|
|
return args.bind
|
|
else:
|
|
return ["127.0.0.1", "::1"] + args.bind
|
|
|
|
|
|
def atexit_register_coroutine(coroutine, loop=None):
|
|
if loop is None:
|
|
loop = asyncio.get_event_loop()
|
|
atexit.register(lambda: loop.run_until_complete(coroutine()))
|
|
|
|
|
|
async def exc_to_warning(coro):
|
|
try:
|
|
await coro
|
|
except:
|
|
logger.warning("asyncio coroutine terminated with exception",
|
|
exc_info=True)
|
|
|
|
|
|
async def asyncio_wait_or_cancel(fs, **kwargs):
|
|
fs = [asyncio.ensure_future(f) for f in fs]
|
|
try:
|
|
d, p = await asyncio.wait(fs, **kwargs)
|
|
except:
|
|
for f in fs:
|
|
f.cancel()
|
|
raise
|
|
for f in p:
|
|
f.cancel()
|
|
await asyncio.wait([f])
|
|
return fs
|
|
|
|
|
|
class TaskObject:
|
|
def start(self):
|
|
self.task = asyncio.ensure_future(self._do())
|
|
|
|
async def stop(self):
|
|
self.task.cancel()
|
|
try:
|
|
await asyncio.wait_for(self.task, None)
|
|
except asyncio.CancelledError:
|
|
pass
|
|
del self.task
|
|
|
|
async def _do(self):
|
|
raise NotImplementedError
|
|
|
|
|
|
class Condition:
|
|
def __init__(self, *, loop=None):
|
|
if loop is not None:
|
|
self._loop = loop
|
|
else:
|
|
self._loop = asyncio.get_event_loop()
|
|
self._waiters = collections.deque()
|
|
|
|
async def wait(self):
|
|
"""Wait until notified."""
|
|
fut = asyncio.Future(loop=self._loop)
|
|
self._waiters.append(fut)
|
|
try:
|
|
await fut
|
|
finally:
|
|
self._waiters.remove(fut)
|
|
|
|
def notify(self):
|
|
for fut in self._waiters:
|
|
if not fut.done():
|
|
fut.set_result(False)
|
|
|
|
|
|
def get_windows_drives():
|
|
from ctypes import windll
|
|
|
|
drives = []
|
|
bitmask = windll.kernel32.GetLogicalDrives()
|
|
for letter in string.ascii_uppercase:
|
|
if bitmask & 1:
|
|
drives.append(letter)
|
|
bitmask >>= 1
|
|
return drives
|
|
|
|
if sys.version_info[:3] == (3, 5, 1):
|
|
# See https://github.com/m-labs/artiq/issues/253
|
|
@asyncio.coroutines.coroutine
|
|
def create_server(self, protocol_factory, host=None, port=None,
|
|
*,
|
|
family=socket.AF_UNSPEC,
|
|
flags=socket.AI_PASSIVE,
|
|
sock=None,
|
|
backlog=100,
|
|
ssl=None,
|
|
reuse_address=None,
|
|
reuse_port=None):
|
|
"""Create a TCP server.
|
|
The host parameter can be a string, in that case the TCP server is bound
|
|
to host and port.
|
|
The host parameter can also be a sequence of strings and in that case
|
|
the TCP server is bound to all hosts of the sequence. If a host
|
|
appears multiple times (possibly indirectly e.g. when hostnames
|
|
resolve to the same IP address), the server is only bound once to that
|
|
host.
|
|
Return a Server object which can be used to stop the service.
|
|
This method is a coroutine.
|
|
"""
|
|
if isinstance(ssl, bool):
|
|
raise TypeError('ssl argument must be an SSLContext or None')
|
|
if host is not None or port is not None:
|
|
if sock is not None:
|
|
raise ValueError(
|
|
'host/port and sock can not be specified at the same time')
|
|
|
|
AF_INET6 = getattr(socket, 'AF_INET6', 0)
|
|
if reuse_address is None:
|
|
reuse_address = os.name == 'posix' and sys.platform != 'cygwin'
|
|
sockets = []
|
|
if host == '':
|
|
hosts = [None]
|
|
elif (isinstance(host, str) or
|
|
not isinstance(host, collections.Iterable)):
|
|
hosts = [host]
|
|
else:
|
|
hosts = host
|
|
|
|
fs = [self._create_server_getaddrinfo(host, port, family=family,
|
|
flags=flags)
|
|
for host in hosts]
|
|
infos = yield from asyncio.tasks.gather(*fs, loop=self)
|
|
infos = set(itertools.chain.from_iterable(infos))
|
|
|
|
completed = False
|
|
try:
|
|
for res in infos:
|
|
af, socktype, proto, canonname, sa = res
|
|
try:
|
|
sock = socket.socket(af, socktype, proto)
|
|
except socket.error:
|
|
# Assume it's a bad family/type/protocol combination.
|
|
if self._debug:
|
|
asyncio.log.logger.warning('create_server() failed to create '
|
|
'socket.socket(%r, %r, %r)',
|
|
af, socktype, proto, exc_info=True)
|
|
continue
|
|
sockets.append(sock)
|
|
if reuse_address:
|
|
sock.setsockopt(
|
|
socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
|
|
if reuse_port:
|
|
if not hasattr(socket, 'SO_REUSEPORT'):
|
|
raise ValueError(
|
|
'reuse_port not supported by socket module')
|
|
else:
|
|
sock.setsockopt(
|
|
socket.SOL_SOCKET, socket.SO_REUSEPORT, True)
|
|
# Disable IPv4/IPv6 dual stack support (enabled by
|
|
# default on Linux) which makes a single socket
|
|
# listen on both address families.
|
|
if af == AF_INET6 and hasattr(socket, 'IPPROTO_IPV6'):
|
|
sock.setsockopt(socket.IPPROTO_IPV6,
|
|
socket.IPV6_V6ONLY,
|
|
True)
|
|
try:
|
|
sock.bind(sa)
|
|
except OSError as err:
|
|
raise OSError(err.errno, 'error while attempting '
|
|
'to bind on address %r: %s'
|
|
% (sa, err.strerror.lower()))
|
|
completed = True
|
|
finally:
|
|
if not completed:
|
|
for sock in sockets:
|
|
sock.close()
|
|
else:
|
|
if sock is None:
|
|
raise ValueError('Neither host/port nor sock were specified')
|
|
sockets = [sock]
|
|
|
|
server = asyncio.base_events.Server(self, sockets)
|
|
for sock in sockets:
|
|
sock.listen(backlog)
|
|
sock.setblocking(False)
|
|
self._start_serving(protocol_factory, sock, ssl, server)
|
|
if self._debug:
|
|
asyncio.log.logger.info("%r is serving", server)
|
|
return server
|
|
|
|
asyncio.base_events.BaseEventLoop.create_server = create_server
|