From 3fd6962bd2b34707679e313b8502a8bd557b7dc1 Mon Sep 17 00:00:00 2001 From: Sebastien Bourdeauducq Date: Sun, 10 Nov 2019 15:55:17 +0800 Subject: [PATCH] use sipyco (#585) --- artiq/applets/simple.py | 6 +- artiq/browser/datasets.py | 3 +- artiq/browser/experiments.py | 3 +- artiq/browser/files.py | 3 +- artiq/dashboard/experiments.py | 3 +- artiq/dashboard/moninj.py | 3 +- artiq/examples/artiq_ipython_notebook.ipynb | 4 +- .../repository/remote_exec_demo.py | 3 +- artiq/examples/remote_exec_controller.py | 2 +- artiq/frontend/aqctl_corelog.py | 15 +- artiq/frontend/artiq_client.py | 16 +- artiq/frontend/artiq_compile.py | 6 +- artiq/frontend/artiq_coreanalyzer.py | 7 +- artiq/frontend/artiq_coremgmt.py | 7 +- artiq/frontend/artiq_ctlmgr.py | 15 +- artiq/frontend/artiq_dashboard.py | 11 +- artiq/frontend/artiq_flash.py | 7 +- artiq/frontend/artiq_influxdb.py | 24 +- artiq/frontend/artiq_influxdb_schedule.py | 21 +- artiq/frontend/artiq_master.py | 17 +- artiq/frontend/artiq_rpctool.py | 121 ---- artiq/frontend/artiq_run.py | 6 +- artiq/gui/applets.py | 7 +- artiq/gui/log.py | 2 +- artiq/gui/models.py | 2 +- artiq/gui/state.py | 4 +- artiq/language/environment.py | 3 +- artiq/master/ctlmgr.py | 10 +- artiq/master/databases.py | 6 +- artiq/master/experiments.py | 3 +- artiq/master/log.py | 2 +- artiq/master/scheduler.py | 6 +- artiq/master/worker.py | 7 +- artiq/master/worker_db.py | 4 +- artiq/master/worker_impl.py | 8 +- artiq/monkey_patches.py | 37 -- artiq/protocols/asyncio_server.py | 54 -- artiq/protocols/broadcast.py | 109 --- artiq/protocols/logging.py | 187 ------ artiq/protocols/packed_exceptions.py | 42 -- artiq/protocols/pc_rpc.py | 625 ------------------ artiq/protocols/pipe_ipc.py | 218 ------ artiq/protocols/pyon.py | 227 ------- artiq/protocols/remote_exec.py | 116 ---- artiq/protocols/sync_struct.py | 328 --------- artiq/test/hardware_testbench.py | 79 --- artiq/test/test_ctlmgr.py | 4 +- artiq/test/test_datasets.py | 3 +- artiq/test/test_fire_and_forget.py | 17 + artiq/test/test_frontends.py | 2 +- artiq/test/test_pc_rpc.py | 166 ----- artiq/test/test_pipe_ipc.py | 80 --- artiq/test/test_rpctool.py | 39 -- artiq/test/test_serialization.py | 56 -- artiq/test/test_sync_struct.py | 72 -- artiq/tools.py | 108 +-- doc/manual/developing_a_ndsp.rst | 38 +- doc/manual/environment.rst | 4 +- doc/manual/index.rst | 1 - doc/manual/protocols_reference.rst | 43 -- doc/manual/utilities.rst | 65 -- setup.py | 1 - 62 files changed, 188 insertions(+), 2900 deletions(-) delete mode 100755 artiq/frontend/artiq_rpctool.py delete mode 100644 artiq/monkey_patches.py delete mode 100644 artiq/protocols/asyncio_server.py delete mode 100644 artiq/protocols/broadcast.py delete mode 100644 artiq/protocols/logging.py delete mode 100644 artiq/protocols/packed_exceptions.py delete mode 100644 artiq/protocols/pc_rpc.py delete mode 100644 artiq/protocols/pipe_ipc.py delete mode 100644 artiq/protocols/pyon.py delete mode 100644 artiq/protocols/remote_exec.py delete mode 100644 artiq/protocols/sync_struct.py create mode 100644 artiq/test/test_fire_and_forget.py delete mode 100644 artiq/test/test_pc_rpc.py delete mode 100644 artiq/test/test_pipe_ipc.py delete mode 100644 artiq/test/test_rpctool.py delete mode 100644 artiq/test/test_serialization.py delete mode 100644 artiq/test/test_sync_struct.py delete mode 100644 doc/manual/protocols_reference.rst diff --git a/artiq/applets/simple.py b/artiq/applets/simple.py index c05d86384..71c2d34fd 100644 --- a/artiq/applets/simple.py +++ b/artiq/applets/simple.py @@ -6,9 +6,9 @@ import string from quamash import QEventLoop, QtWidgets, QtCore -from artiq.protocols.sync_struct import Subscriber, process_mod -from artiq.protocols import pyon -from artiq.protocols.pipe_ipc import AsyncioChildComm +from sipyco.sync_struct import Subscriber, process_mod +from sipyco import pyon +from sipyco.pipe_ipc import AsyncioChildComm logger = logging.getLogger(__name__) diff --git a/artiq/browser/datasets.py b/artiq/browser/datasets.py index 4f3e9e181..b66b18216 100644 --- a/artiq/browser/datasets.py +++ b/artiq/browser/datasets.py @@ -3,10 +3,11 @@ import asyncio from PyQt5 import QtCore, QtWidgets +from sipyco.pc_rpc import AsyncioClient as RPCClient + from artiq.tools import short_format from artiq.gui.tools import LayoutWidget, QRecursiveFilterProxyModel from artiq.gui.models import DictSyncTreeSepModel -from artiq.protocols.pc_rpc import AsyncioClient as RPCClient # reduced read-only version of artiq.dashboard.datasets diff --git a/artiq/browser/experiments.py b/artiq/browser/experiments.py index da501e038..6b580a7ed 100644 --- a/artiq/browser/experiments.py +++ b/artiq/browser/experiments.py @@ -7,10 +7,11 @@ from collections import OrderedDict from PyQt5 import QtCore, QtGui, QtWidgets import h5py +from sipyco import pyon + from artiq import __artiq_dir__ as artiq_dir from artiq.gui.tools import LayoutWidget, log_level_to_name, get_open_file_name from artiq.gui.entries import procdesc_to_entry -from artiq.protocols import pyon from artiq.master.worker import Worker, log_worker_exception logger = logging.getLogger(__name__) diff --git a/artiq/browser/files.py b/artiq/browser/files.py index 1dfbac28e..ea36777d2 100644 --- a/artiq/browser/files.py +++ b/artiq/browser/files.py @@ -5,7 +5,8 @@ from datetime import datetime import h5py from PyQt5 import QtCore, QtWidgets, QtGui -from artiq.protocols import pyon +from sipyco import pyon + logger = logging.getLogger(__name__) diff --git a/artiq/dashboard/experiments.py b/artiq/dashboard/experiments.py index 1b6637f39..57aaf6af6 100644 --- a/artiq/dashboard/experiments.py +++ b/artiq/dashboard/experiments.py @@ -7,9 +7,10 @@ from collections import OrderedDict from PyQt5 import QtCore, QtGui, QtWidgets import h5py +from sipyco import pyon + from artiq.gui.tools import LayoutWidget, log_level_to_name, get_open_file_name from artiq.gui.entries import procdesc_to_entry, ScanEntry -from artiq.protocols import pyon logger = logging.getLogger(__name__) diff --git a/artiq/dashboard/moninj.py b/artiq/dashboard/moninj.py index 5de911826..cea227cb0 100644 --- a/artiq/dashboard/moninj.py +++ b/artiq/dashboard/moninj.py @@ -4,7 +4,8 @@ from collections import namedtuple from PyQt5 import QtCore, QtWidgets, QtGui -from artiq.protocols.sync_struct import Subscriber +from sipyco.sync_struct import Subscriber + from artiq.coredevice.comm_moninj import * from artiq.gui.tools import LayoutWidget from artiq.gui.flowlayout import FlowLayout diff --git a/artiq/examples/artiq_ipython_notebook.ipynb b/artiq/examples/artiq_ipython_notebook.ipynb index 6bab291fe..50b0b0905 100644 --- a/artiq/examples/artiq_ipython_notebook.ipynb +++ b/artiq/examples/artiq_ipython_notebook.ipynb @@ -34,8 +34,8 @@ "import pandas as pd\n", "import h5py\n", "\n", - "from artiq.protocols.pc_rpc import (Client, AsyncioClient,\n", - " BestEffortClient, AutoTarget)\n", + "from sipyco.pc_rpc import (Client, AsyncioClient,\n", + " BestEffortClient, AutoTarget)\n", "from artiq.master.databases import DeviceDB\n", "from artiq.master.worker_db import DeviceManager" ] diff --git a/artiq/examples/no_hardware/repository/remote_exec_demo.py b/artiq/examples/no_hardware/repository/remote_exec_demo.py index c347d43be..f7998bd1d 100644 --- a/artiq/examples/no_hardware/repository/remote_exec_demo.py +++ b/artiq/examples/no_hardware/repository/remote_exec_demo.py @@ -1,8 +1,9 @@ import time import inspect +from sipyco.remote_exec import connect_global_rpc + from artiq.experiment import * -from artiq.protocols.remote_exec import connect_global_rpc import remote_exec_processing diff --git a/artiq/examples/remote_exec_controller.py b/artiq/examples/remote_exec_controller.py index 421202414..7c97a09c6 100755 --- a/artiq/examples/remote_exec_controller.py +++ b/artiq/examples/remote_exec_controller.py @@ -4,7 +4,7 @@ import numpy as np from numba import jit import logging -from artiq.protocols.remote_exec import simple_rexec_server_loop +from sipyco.remote_exec import simple_rexec_server_loop @jit(nopython=True) diff --git a/artiq/frontend/aqctl_corelog.py b/artiq/frontend/aqctl_corelog.py index a59998160..b0b396efd 100755 --- a/artiq/frontend/aqctl_corelog.py +++ b/artiq/frontend/aqctl_corelog.py @@ -6,17 +6,18 @@ import struct import logging import re -from artiq import tools -from artiq.protocols.pc_rpc import Server -from artiq.protocols.logging import log_with_name +from sipyco.pc_rpc import Server +from sipyco import common_args +from sipyco.logging_tools import log_with_name + from artiq.coredevice.comm_mgmt import Request, Reply def get_argparser(): parser = argparse.ArgumentParser( description="ARTIQ controller for core device logs") - tools.add_common_args(parser) - tools.simple_network_args(parser, 1068) + common_args.verbosity_args(parser) + common_args.simple_network_args(parser, 1068) parser.add_argument("--simulation", action="store_true", help="Simulation - does not connect to device") parser.add_argument("core_addr", metavar="CORE_ADDR", @@ -65,7 +66,7 @@ async def get_logs(host): def main(): args = get_argparser().parse_args() - tools.init_logger(args) + common_args.init_logger_from_args(args) loop = asyncio.get_event_loop() try: @@ -73,7 +74,7 @@ def main(): get_logs_sim(args.core_addr) if args.simulation else get_logs(args.core_addr)) try: server = Server({"corelog": PingTarget()}, None, True) - loop.run_until_complete(server.start(tools.bind_address_from_args(args), args.port)) + loop.run_until_complete(server.start(common_args.bind_address_from_args(args), args.port)) try: loop.run_until_complete(server.wait_terminate()) finally: diff --git a/artiq/frontend/artiq_client.py b/artiq/frontend/artiq_client.py index d0c7101f4..03b49d2dc 100755 --- a/artiq/frontend/artiq_client.py +++ b/artiq/frontend/artiq_client.py @@ -17,11 +17,13 @@ from dateutil.parser import parse as parse_date from prettytable import PrettyTable -from artiq.protocols.pc_rpc import Client -from artiq.protocols.sync_struct import Subscriber -from artiq.protocols.broadcast import Receiver -from artiq.protocols import pyon -from artiq.tools import short_format, add_common_args, parse_arguments +from sipyco.pc_rpc import Client +from sipyco.sync_struct import Subscriber +from sipyco.broadcast import Receiver +from sipyco import pyon + +from artiq.tools import short_format, parse_arguments +from artiq import __version__ as artiq_version def clear_screen(): @@ -39,6 +41,9 @@ def get_argparser(): parser.add_argument( "--port", default=None, type=int, help="TCP port to use to connect to the master") + parser.add_argument("--version", action="version", + version="ARTIQ v{}".format(artiq_version), + help="print the ARTIQ version number") subparsers = parser.add_subparsers(dest="action") subparsers.required = True @@ -64,7 +69,6 @@ def get_argparser(): "(defaults to head, ignored without -R)") parser_add.add_argument("-c", "--class-name", default=None, help="name of the class to run") - add_common_args(parser) parser_add.add_argument("file", metavar="FILE", help="file containing the experiment to run") parser_add.add_argument("arguments", metavar="ARGUMENTS", nargs="*", diff --git a/artiq/frontend/artiq_compile.py b/artiq/frontend/artiq_compile.py index cf7335474..2ebe0a657 100755 --- a/artiq/frontend/artiq_compile.py +++ b/artiq/frontend/artiq_compile.py @@ -2,6 +2,8 @@ import os, sys, logging, argparse +from sipyco import common_args + from artiq.master.databases import DeviceDB, DatasetDB from artiq.master.worker_db import DeviceManager, DatasetManager from artiq.language.environment import ProcessArgumentManager @@ -15,7 +17,7 @@ logger = logging.getLogger(__name__) def get_argparser(): parser = argparse.ArgumentParser(description="ARTIQ static compiler") - add_common_args(parser) + common_args.verbosity_args(parser) parser.add_argument("--device-db", default="device_db.py", help="device database file (default: '%(default)s')") parser.add_argument("--dataset-db", default="dataset_db.pyon", @@ -36,7 +38,7 @@ def get_argparser(): def main(): args = get_argparser().parse_args() - init_logger(args) + common_args.init_logger_from_args(args) device_mgr = DeviceManager(DeviceDB(args.device_db)) dataset_mgr = DatasetManager(DatasetDB(args.dataset_db)) diff --git a/artiq/frontend/artiq_coreanalyzer.py b/artiq/frontend/artiq_coreanalyzer.py index 2bd965004..96a058b4d 100755 --- a/artiq/frontend/artiq_coreanalyzer.py +++ b/artiq/frontend/artiq_coreanalyzer.py @@ -3,7 +3,8 @@ import argparse import sys -from artiq.tools import add_common_args, init_logger +from sipyco import common_args + from artiq.master.databases import DeviceDB from artiq.master.worker_db import DeviceManager from artiq.coredevice.comm_analyzer import (get_analyzer_dump, @@ -14,7 +15,7 @@ def get_argparser(): parser = argparse.ArgumentParser(description="ARTIQ core device " "RTIO analysis tool") - add_common_args(parser) + common_args.verbosity_args(parser) parser.add_argument("--device-db", default="device_db.py", help="device database file (default: '%(default)s')") @@ -37,7 +38,7 @@ def get_argparser(): def main(): args = get_argparser().parse_args() - init_logger(args) + common_args.init_logger_from_args(args) if (not args.print_decoded and args.write_vcd is None and args.write_dump is None): diff --git a/artiq/frontend/artiq_coremgmt.py b/artiq/frontend/artiq_coremgmt.py index 270e01cc4..5c7594cff 100755 --- a/artiq/frontend/artiq_coremgmt.py +++ b/artiq/frontend/artiq_coremgmt.py @@ -3,7 +3,8 @@ import argparse import struct -from artiq.tools import add_common_args, init_logger +from sipyco import common_args + from artiq.master.databases import DeviceDB from artiq.coredevice.comm_kernel import CommKernel from artiq.coredevice.comm_mgmt import CommMgmt @@ -14,7 +15,7 @@ def get_argparser(): parser = argparse.ArgumentParser(description="ARTIQ core device " "management tool") - add_common_args(parser) + common_args.verbosity_args(parser) parser.add_argument("--device-db", default="device_db.py", help="device database file (default: '%(default)s')") parser.add_argument("-D", "--device", default=None, @@ -134,7 +135,7 @@ def get_argparser(): def main(): args = get_argparser().parse_args() - init_logger(args) + common_args.init_logger_from_args(args) if args.device is None: core_addr = DeviceDB(args.device_db).get("core")["arguments"]["host"] diff --git a/artiq/frontend/artiq_ctlmgr.py b/artiq/frontend/artiq_ctlmgr.py index bfad03ad7..6612e8e9d 100755 --- a/artiq/frontend/artiq_ctlmgr.py +++ b/artiq/frontend/artiq_ctlmgr.py @@ -7,17 +7,18 @@ import os import logging import platform -from artiq.protocols.pc_rpc import Server -from artiq.protocols.logging import LogForwarder, SourceFilter -from artiq.tools import (simple_network_args, atexit_register_coroutine, - bind_address_from_args, add_common_args) +from sipyco.pc_rpc import Server +from sipyco.logging_tools import LogForwarder, SourceFilter +from sipyco import common_args + +from artiq.tools import atexit_register_coroutine from artiq.master.ctlmgr import ControllerManager def get_argparser(): parser = argparse.ArgumentParser(description="ARTIQ controller manager") - add_common_args(parser) + common_args.verbosity_args(parser) parser.add_argument( "-s", "--server", default="::1", @@ -31,7 +32,7 @@ def get_argparser(): parser.add_argument( "--retry-master", default=5.0, type=float, help="retry timer for reconnecting to master") - simple_network_args(parser, [("control", "control", 3249)]) + common_args.simple_network_args(parser, [("control", "control", 3249)]) return parser @@ -73,7 +74,7 @@ def main(): rpc_target = CtlMgrRPC() rpc_server = Server({"ctlmgr": rpc_target}, builtin_terminate=True) - loop.run_until_complete(rpc_server.start(bind_address_from_args(args), + loop.run_until_complete(rpc_server.start(common_args.bind_address_from_args(args), args.port_control)) atexit_register_coroutine(rpc_server.stop) diff --git a/artiq/frontend/artiq_dashboard.py b/artiq/frontend/artiq_dashboard.py index ee5f132b8..6af277019 100755 --- a/artiq/frontend/artiq_dashboard.py +++ b/artiq/frontend/artiq_dashboard.py @@ -9,11 +9,12 @@ import logging from PyQt5 import QtCore, QtGui, QtWidgets from quamash import QEventLoop +from sipyco.pc_rpc import AsyncioClient, Client +from sipyco.broadcast import Receiver +from sipyco import common_args + from artiq import __artiq_dir__ as artiq_dir, __version__ as artiq_version -from artiq.tools import (atexit_register_coroutine, add_common_args, - get_user_config_dir) -from artiq.protocols.pc_rpc import AsyncioClient, Client -from artiq.protocols.broadcast import Receiver +from artiq.tools import atexit_register_coroutine, get_user_config_dir from artiq.gui.models import ModelSubscriber from artiq.gui import state, log from artiq.dashboard import (experiments, shortcuts, explorer, @@ -37,7 +38,7 @@ def get_argparser(): parser.add_argument( "--db-file", default=None, help="database file for local GUI settings") - add_common_args(parser) + common_args.verbosity_args(parser) return parser diff --git a/artiq/frontend/artiq_flash.py b/artiq/frontend/artiq_flash.py index 81e0e6b2c..f2dd3d727 100755 --- a/artiq/frontend/artiq_flash.py +++ b/artiq/frontend/artiq_flash.py @@ -10,8 +10,9 @@ import atexit from functools import partial from collections import defaultdict +from sipyco import common_args + from artiq import __artiq_dir__ as artiq_dir -from artiq.tools import add_common_args, init_logger from artiq.remoting import SSHClient, LocalClient from artiq.frontend.bit2bin import bit2bin @@ -41,7 +42,7 @@ Prerequisites: plugdev group: 'sudo adduser $USER plugdev' and re-login. """) - add_common_args(parser) + common_args.verbosity_args(parser) parser.add_argument("-n", "--dry-run", default=False, action="store_true", @@ -295,7 +296,7 @@ class ProgrammerMetlino(Programmer): def main(): args = get_argparser().parse_args() - init_logger(args) + common_args.init_logger_from_args(args) config = { "kasli": { diff --git a/artiq/frontend/artiq_influxdb.py b/artiq/frontend/artiq_influxdb.py index 0dbd740c5..2a1fd8720 100755 --- a/artiq/frontend/artiq_influxdb.py +++ b/artiq/frontend/artiq_influxdb.py @@ -11,13 +11,13 @@ import time import numpy as np import aiohttp -from artiq.tools import ( - simple_network_args, add_common_args, atexit_register_coroutine, - bind_address_from_args, init_logger, TaskObject -) -from artiq.protocols.sync_struct import Subscriber -from artiq.protocols.pc_rpc import Server -from artiq.protocols import pyon +from sipyco import common_args +from sipyco.asyncio_tools import TaskObject +from sipyco.sync_struct import Subscriber +from sipyco.pc_rpc import Server +from sipyco import pyon + +from artiq.tools import atexit_register_coroutine logger = logging.getLogger(__name__) @@ -62,8 +62,8 @@ def get_argparser(): help="file to load the patterns from (default: %(default)s). " "If the file is not found, no patterns are loaded " "(everything is logged).") - simple_network_args(parser, [("control", "control", 3248)]) - add_common_args(parser) + common_args.simple_network_args(parser, [("control", "control", 3248)]) + common_args.verbosity_args(parser) return parser @@ -201,7 +201,7 @@ class Filter: logger.info("no pattern file found, logging everything") self.patterns = [] - # Privatize so that it is not shown in artiq_rpctool list-methods. + # Privatize so that it is not shown in sipyco_rpctool list-methods. def _filter(self, k): take = "+" for pattern in self.patterns: @@ -222,7 +222,7 @@ class Filter: def main(): args = get_argparser().parse_args() - init_logger(args) + common_args.init_logger_from_args(args) loop = asyncio.get_event_loop() atexit.register(loop.close) @@ -235,7 +235,7 @@ def main(): filter = Filter(args.pattern_file) rpc_server = Server({"influxdb_filter": filter}, builtin_terminate=True) - loop.run_until_complete(rpc_server.start(bind_address_from_args(args), + loop.run_until_complete(rpc_server.start(common_args.bind_address_from_args(args), args.port_control)) atexit_register_coroutine(rpc_server.stop) diff --git a/artiq/frontend/artiq_influxdb_schedule.py b/artiq/frontend/artiq_influxdb_schedule.py index bf8f3a79a..23c391c87 100755 --- a/artiq/frontend/artiq_influxdb_schedule.py +++ b/artiq/frontend/artiq_influxdb_schedule.py @@ -9,12 +9,13 @@ import time import aiohttp import numpy as np -from artiq.protocols.sync_struct import Subscriber -from artiq.tools import (add_common_args, simple_network_args, TaskObject, - init_logger, atexit_register_coroutine, - bind_address_from_args) -from artiq.protocols.pc_rpc import Server -from artiq.protocols import pyon +from sipyco.sync_struct import Subscriber +from sipyco.pc_rpc import Server +from sipyco import pyon +from sipyco import common_args +from sipyco.asyncio_tools import TaskObject + +from artiq.tools import atexit_register_coroutine logger = logging.getLogger(__name__) @@ -53,8 +54,8 @@ def get_argparser(): "--database", default="db", help="database name to use") group.add_argument( "--table", default="schedule", help="table name to use") - simple_network_args(parser, [("control", "control", 3275)]) - add_common_args(parser) + common_args.simple_network_args(parser, [("control", "control", 3275)]) + common_args.verbosity_args(parser) return parser @@ -210,7 +211,7 @@ class Logger: def main(): args = get_argparser().parse_args() - init_logger(args) + common_args.init_logger_from_args(args) loop = asyncio.get_event_loop() atexit.register(loop.close) @@ -226,7 +227,7 @@ def main(): server = Logger() rpc_server = Server({"schedule_logger": server}, builtin_terminate=True) loop.run_until_complete(rpc_server.start( - bind_address_from_args(args), args.port_control)) + common_args.bind_address_from_args(args), args.port_control)) atexit_register_coroutine(rpc_server.stop) reader = MasterReader(args.server_master, args.port_master, diff --git a/artiq/frontend/artiq_master.py b/artiq/frontend/artiq_master.py index 4583ed6d4..373fd4da4 100755 --- a/artiq/frontend/artiq_master.py +++ b/artiq/frontend/artiq_master.py @@ -6,12 +6,13 @@ import atexit import os import logging -from artiq.tools import (simple_network_args, atexit_register_coroutine, - bind_address_from_args) -from artiq.protocols.pc_rpc import Server as RPCServer -from artiq.protocols.sync_struct import Publisher -from artiq.protocols.logging import Server as LoggingServer -from artiq.protocols.broadcast import Broadcaster +from sipyco.pc_rpc import Server as RPCServer +from sipyco.sync_struct import Publisher +from sipyco.logging_tools import Server as LoggingServer +from sipyco.broadcast import Broadcaster +from sipyco import common_args + +from artiq.tools import atexit_register_coroutine from artiq.master.log import log_args, init_log from artiq.master.databases import DeviceDB, DatasetDB from artiq.master.scheduler import Scheduler @@ -25,7 +26,7 @@ logger = logging.getLogger(__name__) def get_argparser(): parser = argparse.ArgumentParser(description="ARTIQ master") - simple_network_args(parser, [ + common_args.simple_network_args(parser, [ ("notify", "notifications", 3250), ("control", "control", 3251), ("logging", "remote logging", 1066), @@ -72,7 +73,7 @@ def main(): else: loop = asyncio.get_event_loop() atexit.register(loop.close) - bind = bind_address_from_args(args) + bind = common_args.bind_address_from_args(args) server_broadcast = Broadcaster() loop.run_until_complete(server_broadcast.start( diff --git a/artiq/frontend/artiq_rpctool.py b/artiq/frontend/artiq_rpctool.py deleted file mode 100755 index fb9844f29..000000000 --- a/artiq/frontend/artiq_rpctool.py +++ /dev/null @@ -1,121 +0,0 @@ -#!/usr/bin/env python3 - -import argparse -import textwrap -import sys -import traceback -import numpy as np # Needed to use numpy in RPC call arguments on cmd line -import pprint -import inspect - -from artiq.protocols.pc_rpc import AutoTarget, Client - - -def get_argparser(): - parser = argparse.ArgumentParser( - description="ARTIQ RPC tool") - parser.add_argument("server", metavar="SERVER", - help="hostname or IP of the controller to connect to") - parser.add_argument("port", metavar="PORT", type=int, - help="TCP port to use to connect to the controller") - subparsers = parser.add_subparsers(dest="action") - subparsers.add_parser("list-targets", help="list existing targets") - parser_list_methods = subparsers.add_parser("list-methods", - help="list target's methods") - parser_list_methods.add_argument("-t", "--target", help="target name") - parser_call = subparsers.add_parser("call", help="call a target's method") - parser_call.add_argument("-t", "--target", help="target name") - parser_call.add_argument("method", metavar="METHOD", help="method name") - parser_call.add_argument("args", metavar="ARGS", nargs=argparse.REMAINDER, - help="arguments") - parser_interactive = subparsers.add_parser("interactive", - help="enter interactive mode " - "(default)") - parser_interactive.add_argument("-t", "--target", help="target name") - return parser - - -def list_targets(target_names, description): - print("Target(s): " + ", ".join(target_names)) - if description is not None: - print("Description: " + description) - - -def list_methods(remote): - doc = remote.get_rpc_method_list() - if doc["docstring"] is not None: - print(doc["docstring"]) - print() - for name, (argspec, docstring) in sorted(doc["methods"].items()): - print(name + inspect.formatargspec(**argspec)) - if docstring is not None: - print(textwrap.indent(docstring, " ")) - print() - - -def call_method(remote, method_name, args): - method = getattr(remote, method_name) - ret = method(*[eval(arg) for arg in args]) - if ret is not None: - pprint.pprint(ret) - - -def interactive(remote): - try: - import readline # This makes input() nicer - except ImportError: - print("Warning: readline not available. " - "Install it to add line editing capabilities.") - - while True: - try: - cmd = input("({}) ".format(remote.get_selected_target())) - except EOFError: - return - class RemoteDict: - def __getitem__(self, k): - if k == "np": - return np - else: - return getattr(remote, k) - try: - ret = eval(cmd, {}, RemoteDict()) - except Exception as e: - if hasattr(e, "parent_traceback"): - print("Remote exception:") - print(traceback.format_exception_only(type(e), e)[0].rstrip()) - for l in e.parent_traceback: - print(l.rstrip()) - else: - traceback.print_exc() - else: - if ret is not None: - pprint.pprint(ret) - - -def main(): - args = get_argparser().parse_args() - if not args.action: - args.target = None - - remote = Client(args.server, args.port, None) - targets, description = remote.get_rpc_id() - if args.action != "list-targets": - if not args.target: - remote.select_rpc_target(AutoTarget) - else: - remote.select_rpc_target(args.target) - - if args.action == "list-targets": - list_targets(targets, description) - elif args.action == "list-methods": - list_methods(remote) - elif args.action == "call": - call_method(remote, args.method, args.args) - elif args.action == "interactive" or not args.action: - interactive(remote) - else: - print("Unrecognized action: {}".format(args.action)) - -if __name__ == "__main__": - main() diff --git a/artiq/frontend/artiq_run.py b/artiq/frontend/artiq_run.py index 76954e56c..c235387c9 100755 --- a/artiq/frontend/artiq_run.py +++ b/artiq/frontend/artiq_run.py @@ -12,6 +12,8 @@ import h5py from llvmlite_artiq import binding as llvm +from sipyco import common_args + from artiq.language.environment import EnvExperiment, ProcessArgumentManager from artiq.language.types import TBool from artiq.master.databases import DeviceDB, DatasetDB @@ -126,7 +128,7 @@ def get_argparser(with_file=True): parser = argparse.ArgumentParser( description="Local experiment running tool") - add_common_args(parser) + common_args.verbosity_args(parser) parser.add_argument("--device-db", default="device_db.py", help="device database file (default: '%(default)s')") parser.add_argument("--dataset-db", default="dataset_db.pyon", @@ -184,7 +186,7 @@ def _build_experiment(device_mgr, dataset_mgr, args): def run(with_file=False): args = get_argparser(with_file).parse_args() - init_logger(args) + common_args.init_logger_from_args(args) device_mgr = DeviceManager(DeviceDB(args.device_db), virtual_devices={"scheduler": DummyScheduler(), diff --git a/artiq/gui/applets.py b/artiq/gui/applets.py index ebf735595..432fb458d 100644 --- a/artiq/gui/applets.py +++ b/artiq/gui/applets.py @@ -10,9 +10,10 @@ from itertools import count from PyQt5 import QtCore, QtGui, QtWidgets -from artiq.protocols.pipe_ipc import AsyncioParentComm -from artiq.protocols.logging import LogParser -from artiq.protocols import pyon +from sipyco.pipe_ipc import AsyncioParentComm +from sipyco.logging_tools import LogParser +from sipyco import pyon + from artiq.gui.tools import QDockWidgetCloseDetect, LayoutWidget diff --git a/artiq/gui/log.py b/artiq/gui/log.py index 2b5d7185f..9bf9d7efa 100644 --- a/artiq/gui/log.py +++ b/artiq/gui/log.py @@ -6,7 +6,7 @@ from functools import partial from PyQt5 import QtCore, QtGui, QtWidgets -from artiq.protocols.logging import SourceFilter +from sipyco.logging_tools import SourceFilter from artiq.gui.tools import (LayoutWidget, log_level_to_name, QDockWidgetCloseDetect) diff --git a/artiq/gui/models.py b/artiq/gui/models.py index 9b9551593..4d6f19c22 100644 --- a/artiq/gui/models.py +++ b/artiq/gui/models.py @@ -1,6 +1,6 @@ from PyQt5 import QtCore -from artiq.protocols.sync_struct import Subscriber, process_mod +from sipyco.sync_struct import Subscriber, process_mod class ModelManager: diff --git a/artiq/gui/state.py b/artiq/gui/state.py index fc19f7919..3a3f21be8 100644 --- a/artiq/gui/state.py +++ b/artiq/gui/state.py @@ -2,8 +2,8 @@ import asyncio from collections import OrderedDict import logging -from artiq.tools import TaskObject -from artiq.protocols import pyon +from sipyco.asyncio_tools import TaskObject +from sipyco import pyon logger = logging.getLogger(__name__) diff --git a/artiq/language/environment.py b/artiq/language/environment.py index 574cd8b16..766779177 100644 --- a/artiq/language/environment.py +++ b/artiq/language/environment.py @@ -2,7 +2,8 @@ import warnings from collections import OrderedDict from inspect import isclass -from artiq.protocols import pyon +from sipyco import pyon + from artiq.language import units from artiq.language.core import rpc diff --git a/artiq/master/ctlmgr.py b/artiq/master/ctlmgr.py index 93306df6d..075f08a41 100644 --- a/artiq/master/ctlmgr.py +++ b/artiq/master/ctlmgr.py @@ -5,10 +5,12 @@ import shlex import socket import os -from artiq.protocols.sync_struct import Subscriber -from artiq.protocols.pc_rpc import AsyncioClient -from artiq.protocols.logging import LogParser -from artiq.tools import Condition, TaskObject +from sipyco.sync_struct import Subscriber +from sipyco.pc_rpc import AsyncioClient +from sipyco.logging_tools import LogParser +from sipyco.asyncio_tools import TaskObject + +from artiq.tools import Condition logger = logging.getLogger(__name__) diff --git a/artiq/master/databases.py b/artiq/master/databases.py index c96deb9ab..0eb59bf0e 100644 --- a/artiq/master/databases.py +++ b/artiq/master/databases.py @@ -1,9 +1,9 @@ import asyncio import tokenize -from artiq.protocols.sync_struct import Notifier, process_mod, update_from_dict -from artiq.protocols import pyon -from artiq.tools import TaskObject +from sipyco.sync_struct import Notifier, process_mod, update_from_dict +from sipyco import pyon +from sipyco.asyncio_tools import TaskObject def device_db_from_file(filename): diff --git a/artiq/master/experiments.py b/artiq/master/experiments.py index 892d6d30a..b9a46d7ba 100644 --- a/artiq/master/experiments.py +++ b/artiq/master/experiments.py @@ -5,7 +5,8 @@ import shutil import time import logging -from artiq.protocols.sync_struct import Notifier, update_from_dict +from sipyco.sync_struct import Notifier, update_from_dict + from artiq.master.worker import (Worker, WorkerInternalException, log_worker_exception) from artiq.tools import get_windows_drives, exc_to_warning diff --git a/artiq/master/log.py b/artiq/master/log.py index d759d1309..945e44d86 100644 --- a/artiq/master/log.py +++ b/artiq/master/log.py @@ -1,7 +1,7 @@ import logging import logging.handlers -from artiq.protocols.logging import SourceFilter +from sipyco.logging_tools import SourceFilter class LogForwarder(logging.Handler): diff --git a/artiq/master/scheduler.py b/artiq/master/scheduler.py index 0f5d9c8c8..78d90564b 100644 --- a/artiq/master/scheduler.py +++ b/artiq/master/scheduler.py @@ -3,9 +3,11 @@ import logging from enum import Enum from time import time +from sipyco.sync_struct import Notifier +from sipyco.asyncio_tools import TaskObject + from artiq.master.worker import Worker, log_worker_exception -from artiq.tools import asyncio_wait_or_cancel, TaskObject, Condition -from artiq.protocols.sync_struct import Notifier +from artiq.tools import asyncio_wait_or_cancel, Condition logger = logging.getLogger(__name__) diff --git a/artiq/master/worker.py b/artiq/master/worker.py index 293d0f180..9480af8f4 100644 --- a/artiq/master/worker.py +++ b/artiq/master/worker.py @@ -5,9 +5,10 @@ import logging import subprocess import time -from artiq.protocols import pipe_ipc, pyon -from artiq.protocols.logging import LogParser -from artiq.protocols.packed_exceptions import current_exc_packed +from sipyco import pipe_ipc, pyon +from sipyco.logging_tools import LogParser +from sipyco.packed_exceptions import current_exc_packed + from artiq.tools import asyncio_wait_or_cancel diff --git a/artiq/master/worker_db.py b/artiq/master/worker_db.py index 3a3f7ef25..6d3799576 100644 --- a/artiq/master/worker_db.py +++ b/artiq/master/worker_db.py @@ -9,8 +9,8 @@ from collections import OrderedDict import importlib import logging -from artiq.protocols.sync_struct import Notifier -from artiq.protocols.pc_rpc import AutoTarget, Client, BestEffortClient +from sipyco.sync_struct import Notifier +from sipyco.pc_rpc import AutoTarget, Client, BestEffortClient logger = logging.getLogger(__name__) diff --git a/artiq/master/worker_impl.py b/artiq/master/worker_impl.py index 539b09b90..e53ea9376 100644 --- a/artiq/master/worker_impl.py +++ b/artiq/master/worker_impl.py @@ -15,10 +15,12 @@ from collections import OrderedDict import h5py +from sipyco import pipe_ipc, pyon +from sipyco.packed_exceptions import raise_packed_exc +from sipyco.logging_tools import multiline_log_config + import artiq -from artiq.protocols import pipe_ipc, pyon -from artiq.protocols.packed_exceptions import raise_packed_exc -from artiq.tools import multiline_log_config, file_import +from artiq.tools import file_import from artiq.master.worker_db import DeviceManager, DatasetManager, DummyDevice from artiq.language.environment import (is_experiment, TraceArgumentManager, ProcessArgumentManager) diff --git a/artiq/monkey_patches.py b/artiq/monkey_patches.py deleted file mode 100644 index f6c31ad48..000000000 --- a/artiq/monkey_patches.py +++ /dev/null @@ -1,37 +0,0 @@ -import sys -import socket - - -__all__ = [] - - -if sys.version_info[:3] >= (3, 5, 2) and sys.version_info[:3] <= (3, 6, 6): - import asyncio - - # See https://github.com/m-labs/artiq/issues/506 - def _ipaddr_info(host, port, family, type, proto): - return None - asyncio.base_events._ipaddr_info = _ipaddr_info - - # See https://github.com/m-labs/artiq/issues/1016 - @asyncio.coroutine - def sock_connect(self, sock, address): - """Connect to a remote socket at address. - - This method is a coroutine. - """ - if self._debug and sock.gettimeout() != 0: - raise ValueError("the socket must be non-blocking") - - if not hasattr(socket, 'AF_UNIX') or sock.family != socket.AF_UNIX: - socktype = sock.type & 0xf # WA https://bugs.python.org/issue21327 - resolved = asyncio.base_events._ensure_resolved( - address, family=sock.family, type=socktype, proto=sock.proto, loop=self) - if not resolved.done(): - yield from resolved - _, _, _, _, address = resolved.result()[0] - - fut = self.create_future() - self._sock_connect(fut, sock, address) - return (yield from fut) - asyncio.selector_events.BaseSelectorEventLoop.sock_connect = sock_connect diff --git a/artiq/protocols/asyncio_server.py b/artiq/protocols/asyncio_server.py deleted file mode 100644 index 4eef0374b..000000000 --- a/artiq/protocols/asyncio_server.py +++ /dev/null @@ -1,54 +0,0 @@ -import asyncio -from copy import copy - - -class AsyncioServer: - """Generic TCP server based on asyncio. - - Users of this class must derive from it and define the - :meth:`~artiq.protocols.asyncio_server.AsyncioServer._handle_connection_cr` - method/coroutine. - """ - def __init__(self): - self._client_tasks = set() - - async def start(self, host, port): - """Starts the server. - - The user must call :meth:`stop` - to free resources properly after this method completes successfully. - - This method is a *coroutine*. - - :param host: Bind address of the server (see ``asyncio.start_server`` - from the Python standard library). - :param port: TCP port to bind to. - """ - self.server = await asyncio.start_server(self._handle_connection, - host, port, - limit=4*1024*1024) - - async def stop(self): - """Stops the server.""" - wait_for = copy(self._client_tasks) - for task in self._client_tasks: - task.cancel() - for task in wait_for: - try: - await asyncio.wait_for(task, None) - except asyncio.CancelledError: - pass - self.server.close() - await self.server.wait_closed() - del self.server - - def _client_done(self, task): - self._client_tasks.remove(task) - - def _handle_connection(self, reader, writer): - task = asyncio.ensure_future(self._handle_connection_cr(reader, writer)) - self._client_tasks.add(task) - task.add_done_callback(self._client_done) - - async def _handle_connection_cr(self, reader, writer): - raise NotImplementedError diff --git a/artiq/protocols/broadcast.py b/artiq/protocols/broadcast.py deleted file mode 100644 index 354a1b560..000000000 --- a/artiq/protocols/broadcast.py +++ /dev/null @@ -1,109 +0,0 @@ -import asyncio - -from artiq.monkey_patches import * -from artiq.protocols import pyon -from artiq.protocols.asyncio_server import AsyncioServer - - -_init_string = b"ARTIQ broadcast\n" - - -class Receiver: - def __init__(self, name, notify_cb, disconnect_cb=None): - self.name = name - if not isinstance(notify_cb, list): - notify_cb = [notify_cb] - self.notify_cbs = notify_cb - self.disconnect_cb = disconnect_cb - - async def connect(self, host, port): - self.reader, self.writer = \ - await asyncio.open_connection(host, port, limit=100*1024*1024) - try: - self.writer.write(_init_string) - self.writer.write((self.name + "\n").encode()) - self.receive_task = asyncio.ensure_future(self._receive_cr()) - except: - self.writer.close() - del self.reader - del self.writer - raise - - async def close(self): - self.disconnect_cb = None - try: - self.receive_task.cancel() - try: - await asyncio.wait_for(self.receive_task, None) - except asyncio.CancelledError: - pass - finally: - self.writer.close() - del self.reader - del self.writer - - async def _receive_cr(self): - try: - target = None - while True: - line = await self.reader.readline() - if not line: - return - obj = pyon.decode(line.decode()) - - for notify_cb in self.notify_cbs: - notify_cb(obj) - finally: - if self.disconnect_cb is not None: - self.disconnect_cb() - - -class Broadcaster(AsyncioServer): - def __init__(self, queue_limit=1024): - AsyncioServer.__init__(self) - self._queue_limit = queue_limit - self._recipients = dict() - - async def _handle_connection_cr(self, reader, writer): - try: - line = await reader.readline() - if line != _init_string: - return - - line = await reader.readline() - if not line: - return - name = line.decode()[:-1] - - queue = asyncio.Queue(self._queue_limit) - if name in self._recipients: - self._recipients[name].add(queue) - else: - self._recipients[name] = {queue} - try: - while True: - line = await queue.get() - writer.write(line) - # raise exception on connection error - await writer.drain() - finally: - self._recipients[name].remove(queue) - if not self._recipients[name]: - del self._recipients[name] - except (ConnectionResetError, ConnectionAbortedError, BrokenPipeError): - # receivers disconnecting are a normal occurence - pass - finally: - writer.close() - - def broadcast(self, name, obj): - if name in self._recipients: - line = pyon.encode(obj) + "\n" - line = line.encode() - for recipient in self._recipients[name]: - try: - recipient.put_nowait(line) - except asyncio.QueueFull: - # do not log: log messages may be sent back to us - # as broadcasts, and cause infinite recursion. - pass diff --git a/artiq/protocols/logging.py b/artiq/protocols/logging.py deleted file mode 100644 index 542e8da35..000000000 --- a/artiq/protocols/logging.py +++ /dev/null @@ -1,187 +0,0 @@ -import asyncio -import logging -import re - -from artiq.monkey_patches import * -from artiq.protocols.asyncio_server import AsyncioServer -from artiq.tools import TaskObject, MultilineFormatter - - -logging.TRACE = 5 -logging.addLevelName(logging.TRACE, 'TRACE') - - -logger = logging.getLogger(__name__) -_fwd_logger = logging.getLogger("fwd") - - -def log_with_name(name, *args, **kwargs): - _fwd_logger.name = name - _fwd_logger.log(*args, **kwargs) - - -_name_to_level = { - "CRITICAL": logging.CRITICAL, - "ERROR": logging.ERROR, - "WARN": logging.WARNING, - "WARNING": logging.WARNING, - "INFO": logging.INFO, - "DEBUG": logging.DEBUG, - "TRACE": logging.TRACE, -} - - -def parse_log_message(msg): - lr = "|".join(_name_to_level.keys()) - m = re.fullmatch('('+lr+')(<\d+>)?:([^:]*):(.*)', msg) - if m is None: - return 0, logging.INFO, "print", msg - level = _name_to_level[m.group(1)] - if m.group(2): - multiline = int(m.group(2)[1:-1]) - 1 - else: - multiline = 0 - name = m.group(3) - message = m.group(4) - return multiline, level, name, message - - -class LogParser: - def __init__(self, source_cb): - self.source_cb = source_cb - self.multiline_count = 0 - self.multiline_level = None - self.multiline_name = None - self.multiline_message = None - - def line_input(self, msg): - if self.multiline_count: - self.multiline_message += "\n" + msg - self.multiline_count -= 1 - if not self.multiline_count: - log_with_name( - self.multiline_name, - self.multiline_level, - self.multiline_message, - extra={"source": self.source_cb()}) - self.multiline_level = None - self.multiline_name = None - self.multiline_message = None - else: - multiline, level, name, message = parse_log_message(msg) - if multiline: - self.multiline_count = multiline - self.multiline_level = level - self.multiline_name = name - self.multiline_message = message - else: - log_with_name(name, level, message, - extra={"source": self.source_cb()}) - - async def stream_task(self, stream): - while True: - try: - entry = (await stream.readline()) - if not entry: - break - self.line_input(entry.decode().rstrip("\r\n")) - except: - logger.debug("exception in log forwarding", exc_info=True) - break - logger.debug("stopped log forwarding of stream %s of %s", - stream, self.source_cb()) - - -_init_string = b"ARTIQ logging\n" - - -class Server(AsyncioServer): - """Remote logging TCP server. - - Log entries are in the format: - source:levelno:name:message - continuation... - ...continuation - """ - async def _handle_connection_cr(self, reader, writer): - try: - line = await reader.readline() - if line != _init_string: - return - - source = None - parser = LogParser(lambda: source) - - while True: - line = await reader.readline() - if not line: - break - try: - line = line.decode() - except: - return - line = line[:-1] - if parser.multiline_count: - parser.line_input(line) - else: - linesplit = line.split(":", maxsplit=1) - if len(linesplit) != 2: - logger.warning("received improperly formatted message, " - "dropping connection") - return - source, remainder = linesplit - parser.line_input(remainder) - except (ConnectionResetError, ConnectionAbortedError, BrokenPipeError): - # May happens on Windows when client disconnects - pass - finally: - writer.close() - - -class SourceFilter: - def __init__(self, local_level, local_source): - self.local_level = local_level - self.local_source = local_source - - def filter(self, record): - if not hasattr(record, "source"): - record.source = self.local_source - if record.source == self.local_source: - return record.levelno >= self.local_level - else: - # log messages that are forwarded from a source have already - # been filtered, and may have a level below the local level. - return True - - -class LogForwarder(logging.Handler, TaskObject): - def __init__(self, host, port, reconnect_timer=5.0, queue_size=1000, - **kwargs): - logging.Handler.__init__(self, **kwargs) - self.host = host - self.port = port - self.setFormatter(MultilineFormatter()) - self._queue = asyncio.Queue(queue_size) - self.reconnect_timer = reconnect_timer - - def emit(self, record): - self._queue.put_nowait(record.source + ":" + self.format(record)) - - async def _do(self): - reader = writer = None - while True: - try: - reader, writer = await asyncio.open_connection(self.host, - self.port) - writer.write(_init_string) - while True: - message = await self._queue.get() + "\n" - writer.write(message.encode()) - await writer.drain() - except asyncio.CancelledError: - return - except: - await asyncio.sleep(self.reconnect_timer) - finally: - if writer is not None: - writer.close() diff --git a/artiq/protocols/packed_exceptions.py b/artiq/protocols/packed_exceptions.py deleted file mode 100644 index 2c453bf80..000000000 --- a/artiq/protocols/packed_exceptions.py +++ /dev/null @@ -1,42 +0,0 @@ -import inspect -import builtins -import traceback -import sys - - -__all__ = ["GenericRemoteException", "current_exc_packed", "raise_packed_exc"] - - -class GenericRemoteException(Exception): - pass - - -builtin_exceptions = {v: k for k, v in builtins.__dict__.items() - if inspect.isclass(v) and issubclass(v, BaseException)} - - -def current_exc_packed(): - exc_class, exc, exc_tb = sys.exc_info() - if exc_class in builtin_exceptions: - return { - "class": builtin_exceptions[exc_class], - "message": str(exc), - "traceback": traceback.format_tb(exc_tb) - } - else: - message = traceback.format_exception_only(exc_class, exc)[0].rstrip() - return { - "class": "GenericRemoteException", - "message": message, - "traceback": traceback.format_tb(exc_tb) - } - - -def raise_packed_exc(pack): - if pack["class"] == "GenericRemoteException": - cls = GenericRemoteException - else: - cls = getattr(builtins, pack["class"]) - exc = cls(pack["message"]) - exc.parent_traceback = pack["traceback"] - raise exc diff --git a/artiq/protocols/pc_rpc.py b/artiq/protocols/pc_rpc.py deleted file mode 100644 index 5fa6f0820..000000000 --- a/artiq/protocols/pc_rpc.py +++ /dev/null @@ -1,625 +0,0 @@ -""" -This module provides a remote procedure call (RPC) mechanism over sockets -between conventional computers (PCs) running Python. It strives to be -transparent and uses :mod:`artiq.protocols.pyon` internally so that e.g. Numpy -arrays can be easily used. - -Note that the server operates on copies of objects provided by the client, -and modifications to mutable types are not written back. For example, if the -client passes a list as a parameter of an RPC method, and that method -``append()s`` an element to the list, the element is not appended to the -client's list. -""" - -import asyncio -import inspect -import logging -import socket -import threading -import time -from operator import itemgetter - -from artiq.monkey_patches import * -from artiq.protocols import pyon -from artiq.protocols.asyncio_server import AsyncioServer as _AsyncioServer -from artiq.protocols.packed_exceptions import * - -logger = logging.getLogger(__name__) - - -class AutoTarget: - """Use this as target value in clients for them to automatically connect - to the target exposed by the server. Servers must have only one target.""" - pass - - -class IncompatibleServer(Exception): - """Raised by the client when attempting to connect to a server that does - not have the expected target.""" - pass - - -_init_string = b"ARTIQ pc_rpc\n" - - -def _validate_target_name(target_name, target_names): - if target_name is AutoTarget: - if len(target_names) > 1: - raise ValueError("Server has multiple targets: " + - " ".join(sorted(target_names))) - else: - target_name = target_names[0] - elif target_name not in target_names: - raise IncompatibleServer( - "valid target name(s): " + " ".join(sorted(target_names))) - return target_name - - -class Client: - """This class proxies the methods available on the server so that they - can be used as if they were local methods. - - For example, if the server provides method ``foo``, and ``c`` is a local - :class:`.Client` object, then the method can be called as: :: - - result = c.foo(param1, param2) - - The parameters and the result are automatically transferred from the - server. - - Only methods are supported. Attributes must be accessed by providing and - using "get" and/or "set" methods on the server side. - - At object initialization, the connection to the remote server is - automatically attempted. The user must call :meth:`~artiq.protocols.pc_rpc.Client.close_rpc` to - free resources properly after initialization completes successfully. - - :param host: Identifier of the server. The string can represent a - hostname or a IPv4 or IPv6 address (see - ``socket.create_connection`` in the Python standard library). - :param port: TCP port to use. - :param target_name: Target name to select. ``IncompatibleServer`` is - raised if the target does not exist. - Use :class:`.AutoTarget` for automatic selection if the server has only one - target. - Use ``None`` to skip selecting a target. The list of targets can then - be retrieved using :meth:`~artiq.protocols.pc_rpc.Client.get_rpc_id` - and then one can be selected later using :meth:`~artiq.protocols.pc_rpc.Client.select_rpc_target`. - :param timeout: Socket operation timeout. Use ``None`` for blocking - (default), ``0`` for non-blocking, and a finite value to raise - ``socket.timeout`` if an operation does not complete within the - given time. See also ``socket.create_connection()`` and - ``socket.settimeout()`` in the Python standard library. A timeout - in the middle of a RPC can break subsequent RPCs (from the same - client). - """ - def __init__(self, host, port, target_name=AutoTarget, timeout=None): - self.__socket = socket.create_connection((host, port), timeout) - - try: - self.__socket.sendall(_init_string) - - server_identification = self.__recv() - self.__target_names = server_identification["targets"] - self.__description = server_identification["description"] - self.__selected_target = None - self.__valid_methods = set() - if target_name is not None: - self.select_rpc_target(target_name) - except: - self.__socket.close() - raise - - def select_rpc_target(self, target_name): - """Selects a RPC target by name. This function should be called - exactly once if the object was created with ``target_name=None``.""" - target_name = _validate_target_name(target_name, self.__target_names) - self.__socket.sendall((target_name + "\n").encode()) - self.__selected_target = target_name - self.__valid_methods = self.__recv() - - def get_selected_target(self): - """Returns the selected target, or ``None`` if no target has been - selected yet.""" - return self.__selected_target - - def get_rpc_id(self): - """Returns a tuple (target_names, description) containing the - identification information of the server.""" - return (self.__target_names, self.__description) - - def get_local_host(self): - """Returns the address of the local end of the connection.""" - return self.__socket.getsockname()[0] - - def close_rpc(self): - """Closes the connection to the RPC server. - - No further method calls should be done after this method is called. - """ - self.__socket.close() - - def __send(self, obj): - line = pyon.encode(obj) + "\n" - self.__socket.sendall(line.encode()) - - def __recv(self): - buf = self.__socket.recv(4096).decode() - while "\n" not in buf: - more = self.__socket.recv(4096) - if not more: - break - buf += more.decode() - return pyon.decode(buf) - - def __do_action(self, action): - self.__send(action) - - obj = self.__recv() - if obj["status"] == "ok": - return obj["ret"] - elif obj["status"] == "failed": - raise_packed_exc(obj["exception"]) - else: - raise ValueError - - def __do_rpc(self, name, args, kwargs): - obj = {"action": "call", "name": name, "args": args, "kwargs": kwargs} - return self.__do_action(obj) - - def get_rpc_method_list(self): - obj = {"action": "get_rpc_method_list"} - return self.__do_action(obj) - - def __getattr__(self, name): - if name not in self.__valid_methods: - raise AttributeError - def proxy(*args, **kwargs): - return self.__do_rpc(name, args, kwargs) - return proxy - - -class AsyncioClient: - """This class is similar to :class:`artiq.protocols.pc_rpc.Client`, but - uses ``asyncio`` instead of blocking calls. - - All RPC methods are coroutines. - - Concurrent access from different asyncio tasks is supported; all calls - use a single lock. - """ - def __init__(self): - self.__lock = asyncio.Lock() - self.__reader = None - self.__writer = None - self.__target_names = None - self.__description = None - - async def connect_rpc(self, host, port, target_name): - """Connects to the server. This cannot be done in __init__ because - this method is a coroutine. See :class:`artiq.protocols.pc_rpc.Client` for a description of the - parameters.""" - self.__reader, self.__writer = \ - await asyncio.open_connection(host, port, limit=100*1024*1024) - try: - self.__writer.write(_init_string) - server_identification = await self.__recv() - self.__target_names = server_identification["targets"] - self.__description = server_identification["description"] - self.__selected_target = None - self.__valid_methods = set() - if target_name is not None: - await self.select_rpc_target(target_name) - except: - self.close_rpc() - raise - - async def select_rpc_target(self, target_name): - """Selects a RPC target by name. This function should be called - exactly once if the connection was created with ``target_name=None``. - """ - target_name = _validate_target_name(target_name, self.__target_names) - self.__writer.write((target_name + "\n").encode()) - self.__selected_target = target_name - self.__valid_methods = await self.__recv() - - def get_selected_target(self): - """Returns the selected target, or ``None`` if no target has been - selected yet.""" - return self.__selected_target - - def get_local_host(self): - """Returns the address of the local end of the connection.""" - return self.__writer.get_extra_info("socket").getsockname()[0] - - def get_rpc_id(self): - """Returns a tuple (target_names, description) containing the - identification information of the server.""" - return (self.__target_names, self.__description) - - def close_rpc(self): - """Closes the connection to the RPC server. - - No further method calls should be done after this method is called. - """ - if self.__writer is not None: - self.__writer.close() - self.__reader = None - self.__writer = None - self.__target_names = None - self.__description = None - - def __send(self, obj): - line = pyon.encode(obj) + "\n" - self.__writer.write(line.encode()) - - async def __recv(self): - line = await self.__reader.readline() - return pyon.decode(line.decode()) - - async def __do_rpc(self, name, args, kwargs): - await self.__lock.acquire() - try: - obj = {"action": "call", "name": name, - "args": args, "kwargs": kwargs} - self.__send(obj) - - obj = await self.__recv() - if obj["status"] == "ok": - return obj["ret"] - elif obj["status"] == "failed": - raise_packed_exc(obj["exception"]) - else: - raise ValueError - finally: - self.__lock.release() - - def __getattr__(self, name): - if name not in self.__valid_methods: - raise AttributeError - async def proxy(*args, **kwargs): - res = await self.__do_rpc(name, args, kwargs) - return res - return proxy - - -class BestEffortClient: - """This class is similar to :class:`artiq.protocols.pc_rpc.Client`, but - network errors are suppressed and connections are retried in the - background. - - RPC calls that failed because of network errors return ``None``. Other RPC - calls are blocking and return the correct value. - - :param firstcon_timeout: Timeout to use during the first (blocking) - connection attempt at object initialization. - :param retry: Amount of time to wait between retries when reconnecting - in the background. - """ - def __init__(self, host, port, target_name, - firstcon_timeout=1.0, retry=5.0): - self.__host = host - self.__port = port - self.__target_name = target_name - self.__retry = retry - - self.__conretry_terminate = False - self.__socket = None - self.__valid_methods = set() - try: - self.__coninit(firstcon_timeout) - except: - logger.warning("first connection attempt to %s:%d[%s] failed, " - "retrying in the background", - self.__host, self.__port, self.__target_name, - exc_info=True) - self.__start_conretry() - else: - self.__conretry_thread = None - - def __coninit(self, timeout): - if timeout is None: - self.__socket = socket.create_connection( - (self.__host, self.__port)) - else: - self.__socket = socket.create_connection( - (self.__host, self.__port), timeout) - self.__socket.settimeout(None) - self.__socket.sendall(_init_string) - server_identification = self.__recv() - target_name = _validate_target_name(self.__target_name, - server_identification["targets"]) - self.__socket.sendall((target_name + "\n").encode()) - self.__valid_methods = self.__recv() - - def __start_conretry(self): - self.__conretry_thread = threading.Thread(target=self.__conretry) - self.__conretry_thread.start() - - def __conretry(self): - while True: - try: - self.__coninit(None) - except: - if self.__conretry_terminate: - break - time.sleep(self.__retry) - else: - break - if not self.__conretry_terminate: - logger.warning("connection to %s:%d[%s] established in " - "the background", - self.__host, self.__port, self.__target_name) - if self.__conretry_terminate and self.__socket is not None: - self.__socket.close() - # must be after __socket.close() to avoid race condition - self.__conretry_thread = None - - def close_rpc(self): - """Closes the connection to the RPC server. - - No further method calls should be done after this method is called. - """ - if self.__conretry_thread is None: - if self.__socket is not None: - self.__socket.close() - else: - # Let the thread complete I/O and then do the socket closing. - # Python fails to provide a way to cancel threads... - self.__conretry_terminate = True - - def __send(self, obj): - line = pyon.encode(obj) + "\n" - self.__socket.sendall(line.encode()) - - def __recv(self): - buf = self.__socket.recv(4096).decode() - while "\n" not in buf: - more = self.__socket.recv(4096) - if not more: - break - buf += more.decode() - return pyon.decode(buf) - - def __do_rpc(self, name, args, kwargs): - if self.__conretry_thread is not None: - return None - - obj = {"action": "call", "name": name, "args": args, "kwargs": kwargs} - try: - self.__send(obj) - obj = self.__recv() - except: - logger.warning("connection failed while attempting " - "RPC to %s:%d[%s], re-establishing connection " - "in the background", - self.__host, self.__port, self.__target_name) - self.__start_conretry() - return None - else: - if obj["status"] == "ok": - return obj["ret"] - elif obj["status"] == "failed": - raise_packed_exc(obj["exception"]) - else: - raise ValueError - - def __getattr__(self, name): - if name not in self.__valid_methods: - raise AttributeError - def proxy(*args, **kwargs): - return self.__do_rpc(name, args, kwargs) - return proxy - - def get_selected_target(self): - raise NotImplementedError - - def get_local_host(self): - raise NotImplementedError - - -def _format_arguments(arguments): - fmtargs = [] - for k, v in sorted(arguments.items(), key=itemgetter(0)): - fmtargs.append(k + "=" + repr(v)) - if fmtargs: - return ", ".join(fmtargs) - else: - return "" - - -class _PrettyPrintCall: - def __init__(self, obj): - self.obj = obj - - def __str__(self): - r = self.obj["name"] + "(" - args = ", ".join([repr(a) for a in self.obj["args"]]) - r += args - kwargs = _format_arguments(self.obj["kwargs"]) - if args and kwargs: - r += ", " - r += kwargs - r += ")" - return r - - -class Server(_AsyncioServer): - """This class creates a TCP server that handles requests coming from - *Client* objects (whether :class:`.Client`, :class:`.BestEffortClient`, - or :class:`.AsyncioClient`). - - The server is designed using ``asyncio`` so that it can easily support - multiple connections without the locking issues that arise in - multi-threaded applications. Multiple connection support is useful even in - simple cases: it allows new connections to be be accepted even when the - previous client failed to properly shut down its connection. - - If a target method is a coroutine, it is awaited and its return value - is sent to the RPC client. If ``allow_parallel`` is true, multiple - target coroutines may be executed in parallel (one per RPC client), - otherwise a lock ensures that the calls from several clients are executed - sequentially. - - :param targets: A dictionary of objects providing the RPC methods to be - exposed to the client. Keys are names identifying each object. - Clients select one of these objects using its name upon connection. - :param description: An optional human-readable string giving more - information about the server. - :param builtin_terminate: If set, the server provides a built-in - ``terminate`` method that unblocks any tasks waiting on - ``wait_terminate``. This is useful to handle server termination - requests from clients. - :param allow_parallel: Allow concurrent asyncio calls to the target's - methods. - """ - def __init__(self, targets, description=None, builtin_terminate=False, - allow_parallel=False): - _AsyncioServer.__init__(self) - self.targets = targets - self.description = description - self.builtin_terminate = builtin_terminate - if builtin_terminate: - self._terminate_request = asyncio.Event() - if allow_parallel: - self._noparallel = None - else: - self._noparallel = asyncio.Lock() - - @staticmethod - def _document_function(function): - """ - Turn a function into a tuple of its arguments and documentation. - - Allows remote inspection of what methods are available on a local device. - - Args: - function (Callable): a Python function to be documented. - - Returns: - Tuple[dict, str]: tuple of (argument specifications, - function documentation). - Any type annotations are converted to strings (for PYON serialization). - """ - argspec_dict = dict(inspect.getfullargspec(function)._asdict()) - # Fix issue #1186: PYON can't serialize type annotations. - if any(argspec_dict.get("annotations", {})): - argspec_dict["annotations"] = str(argspec_dict["annotations"]) - return argspec_dict, inspect.getdoc(function) - - async def _process_action(self, target, obj): - if self._noparallel is not None: - await self._noparallel.acquire() - try: - if obj["action"] == "get_rpc_method_list": - members = inspect.getmembers(target, inspect.ismethod) - doc = { - "docstring": inspect.getdoc(target), - "methods": {} - } - for name, method in members: - if name.startswith("_"): - continue - method = getattr(target, name) - doc["methods"][name] = self._document_function(method) - if self.builtin_terminate: - doc["methods"]["terminate"] = ( - { - "args": ["self"], - "defaults": None, - "varargs": None, - "varkw": None, - "kwonlyargs": [], - "kwonlydefaults": [], - }, - "Terminate the server.") - logger.debug("RPC docs for %s: %s", target, doc) - return {"status": "ok", "ret": doc} - elif obj["action"] == "call": - logger.debug("calling %s", _PrettyPrintCall(obj)) - if (self.builtin_terminate and obj["name"] == - "terminate"): - self._terminate_request.set() - return {"status": "ok", "ret": None} - else: - method = getattr(target, obj["name"]) - ret = method(*obj["args"], **obj["kwargs"]) - if inspect.iscoroutine(ret): - ret = await ret - return {"status": "ok", "ret": ret} - else: - raise ValueError("Unknown action: {}" - .format(obj["action"])) - except asyncio.CancelledError: - raise - except: - return { - "status": "failed", - "exception": current_exc_packed() - } - finally: - if self._noparallel is not None: - self._noparallel.release() - - async def _handle_connection_cr(self, reader, writer): - try: - line = await reader.readline() - if line != _init_string: - return - - obj = { - "targets": sorted(self.targets.keys()), - "description": self.description - } - line = pyon.encode(obj) + "\n" - writer.write(line.encode()) - line = await reader.readline() - if not line: - return - target_name = line.decode()[:-1] - try: - target = self.targets[target_name] - except KeyError: - return - - if callable(target): - target = target() - - valid_methods = inspect.getmembers(target, inspect.ismethod) - valid_methods = {m[0] for m in valid_methods} - if self.builtin_terminate: - valid_methods.add("terminate") - writer.write((pyon.encode(valid_methods) + "\n").encode()) - - while True: - line = await reader.readline() - if not line: - break - reply = await self._process_action(target, pyon.decode(line.decode())) - writer.write((pyon.encode(reply) + "\n").encode()) - except (ConnectionResetError, ConnectionAbortedError, BrokenPipeError): - # May happens on Windows when client disconnects - pass - finally: - writer.close() - - async def wait_terminate(self): - await self._terminate_request.wait() - - -def simple_server_loop(targets, host, port, description=None): - """Runs a server until an exception is raised (e.g. the user hits Ctrl-C) - or termination is requested by a client. - - See :class:`artiq.protocols.pc_rpc.Server` for a description of the parameters. - """ - loop = asyncio.get_event_loop() - try: - server = Server(targets, description, True) - loop.run_until_complete(server.start(host, port)) - try: - loop.run_until_complete(server.wait_terminate()) - finally: - loop.run_until_complete(server.stop()) - finally: - loop.close() diff --git a/artiq/protocols/pipe_ipc.py b/artiq/protocols/pipe_ipc.py deleted file mode 100644 index e81176c4e..000000000 --- a/artiq/protocols/pipe_ipc.py +++ /dev/null @@ -1,218 +0,0 @@ -import os -import asyncio -from asyncio.streams import FlowControlMixin - - -__all__ = ["AsyncioParentComm", "AsyncioChildComm", "ChildComm"] - - -class _BaseIO: - def write(self, data): - self.writer.write(data) - - async def drain(self): - await self.writer.drain() - - async def readline(self): - return await self.reader.readline() - - async def read(self, n): - return await self.reader.read(n) - - -if os.name != "nt": - async def _fds_to_asyncio(rfd, wfd, loop): - reader = asyncio.StreamReader(loop=loop, limit=100*1024*1024) - reader_protocol = asyncio.StreamReaderProtocol(reader, loop=loop) - rf = open(rfd, "rb", 0) - rt, _ = await loop.connect_read_pipe(lambda: reader_protocol, rf) - - wf = open(wfd, "wb", 0) - wt, _ = await loop.connect_write_pipe(FlowControlMixin, wf) - writer = asyncio.StreamWriter(wt, reader_protocol, None, loop) - - return rt, reader, writer - - - class AsyncioParentComm(_BaseIO): - def __init__(self): - self.c_rfd, self.p_wfd = os.pipe() - self.p_rfd, self.c_wfd = os.pipe() - - def get_address(self): - return "{},{}".format(self.c_rfd, self.c_wfd) - - async def _autoclose(self): - await self.process.wait() - self.reader_transport.close() - self.writer.close() - - async def create_subprocess(self, *args, **kwargs): - loop = asyncio.get_event_loop() - self.process = await asyncio.create_subprocess_exec( - *args, pass_fds={self.c_rfd, self.c_wfd}, **kwargs) - os.close(self.c_rfd) - os.close(self.c_wfd) - - self.reader_transport, self.reader, self.writer = \ - await _fds_to_asyncio(self.p_rfd, self.p_wfd, loop) - asyncio.ensure_future(self._autoclose()) - - - class AsyncioChildComm(_BaseIO): - def __init__(self, address): - self.address = address - - async def connect(self): - rfd, wfd = self.address.split(",", maxsplit=1) - self.reader_transport, self.reader, self.writer = \ - await _fds_to_asyncio(int(rfd), int(wfd), - asyncio.get_event_loop()) - - def close(self): - self.reader_transport.close() - self.writer.close() - - - class ChildComm: - def __init__(self, address): - rfd, wfd = address.split(",", maxsplit=1) - self.rf = open(int(rfd), "rb", 0) - self.wf = open(int(wfd), "wb", 0) - - def read(self, n): - return self.rf.read(n) - - def readline(self): - return self.rf.readline() - - def write(self, data): - return self.wf.write(data) - - def close(self): - self.rf.close() - self.wf.close() - - -else: # windows - import itertools - - - _pipe_count = itertools.count() - - - class AsyncioParentComm: - """Requires ProactorEventLoop""" - def __init__(self): - # We cannot use anonymous pipes on Windows, because we do not know - # in advance if the child process wants a handle open in overlapped - # mode or not. - self.address = "\\\\.\\pipe\\artiq-{}-{}".format(os.getpid(), - next(_pipe_count)) - self.ready = asyncio.Event() - self.write_buffer = b"" - - def get_address(self): - return self.address - - async def _autoclose(self): - await self.process.wait() - self.server[0].close() - del self.server - if self.ready.is_set(): - self.writer.close() - del self.reader - del self.writer - - async def create_subprocess(self, *args, **kwargs): - loop = asyncio.get_event_loop() - - def factory(): - reader = asyncio.StreamReader(loop=loop, limit=100*1024*1024) - protocol = asyncio.StreamReaderProtocol(reader, - self._child_connected, - loop=loop) - return protocol - self.server = await loop.start_serving_pipe( - factory, self.address) - - self.process = await asyncio.create_subprocess_exec( - *args, **kwargs) - asyncio.ensure_future(self._autoclose()) - - def _child_connected(self, reader, writer): - # HACK: We should shut down the pipe server here. - # However, self.server[0].close() is racy, and will cause an - # invalid handle error if loop.start_serving_pipe has not finished - # its work in the background. - # The bug manifests itself here frequently as the event loop is - # reopening the server as soon as a new client connects. - # There is still a race condition in the AsyncioParentComm - # creation/destruction, but it is unlikely to cause problems - # in most practical cases. - if self.ready.is_set(): - # A child already connected before. We should have shut down - # the server, but asyncio won't let us do that. - # Drop connections immediately instead. - writer.close() - return - self.reader = reader - self.writer = writer - if self.write_buffer: - self.writer.write(self.write_buffer) - self.write_buffer = b"" - self.ready.set() - - def write(self, data): - if self.ready.is_set(): - self.writer.write(data) - else: - self.write_buffer += data - - async def drain(self): - await self.ready.wait() - await self.writer.drain() - - async def readline(self): - await self.ready.wait() - return await self.reader.readline() - - async def read(self, n): - await self.ready.wait() - return await self.reader.read(n) - - - class AsyncioChildComm(_BaseIO): - """Requires ProactorEventLoop""" - def __init__(self, address): - self.address = address - - async def connect(self): - loop = asyncio.get_event_loop() - self.reader = asyncio.StreamReader(loop=loop, limit=100*1024*1024) - reader_protocol = asyncio.StreamReaderProtocol( - self.reader, loop=loop) - transport, _ = await loop.create_pipe_connection( - lambda: reader_protocol, self.address) - self.writer = asyncio.StreamWriter(transport, reader_protocol, - self.reader, loop) - - def close(self): - self.writer.close() - - - class ChildComm: - def __init__(self, address): - self.f = open(address, "a+b", 0) - - def read(self, n): - return self.f.read(n) - - def readline(self): - return self.f.readline() - - def write(self, data): - return self.f.write(data) - - def close(self): - self.f.close() diff --git a/artiq/protocols/pyon.py b/artiq/protocols/pyon.py deleted file mode 100644 index 2ad6755fd..000000000 --- a/artiq/protocols/pyon.py +++ /dev/null @@ -1,227 +0,0 @@ -""" -This module provides serialization and deserialization functions for Python -objects. Its main features are: - -* Human-readable format compatible with the Python syntax. -* Each object is serialized on a single line, with only ASCII characters. -* Supports all basic Python data structures: None, booleans, integers, - floats, complex numbers, strings, tuples, lists, dictionaries. -* Those data types are accurately reconstructed (unlike JSON where e.g. tuples - become lists, and dictionary keys are turned into strings). -* Supports Numpy arrays. - -The main rationale for this new custom serializer (instead of using JSON) is -that JSON does not support Numpy and more generally cannot be extended with -other data types while keeping a concise syntax. Here we can use the Python -function call syntax to express special data types. -""" - - -from operator import itemgetter -import base64 -from fractions import Fraction -from collections import OrderedDict -import os -import tempfile - -import numpy - - -_encode_map = { - type(None): "none", - bool: "bool", - int: "number", - float: "number", - complex: "number", - str: "str", - bytes: "bytes", - tuple: "tuple", - list: "list", - set: "set", - dict: "dict", - slice: "slice", - Fraction: "fraction", - OrderedDict: "ordereddict", - numpy.ndarray: "nparray" -} - -_numpy_scalar = { - "int8", "int16", "int32", "int64", - "uint8", "uint16", "uint32", "uint64", - "float16", "float32", "float64", - "complex64", "complex128", -} - - -for _t in _numpy_scalar: - _encode_map[getattr(numpy, _t)] = "npscalar" - - -_str_translation = { - ord("\""): "\\\"", - ord("\\"): "\\\\", - ord("\n"): "\\n", - ord("\r"): "\\r", -} - - -class _Encoder: - def __init__(self, pretty): - self.pretty = pretty - self.indent_level = 0 - - def indent(self): - return " "*self.indent_level - - def encode_none(self, x): - return "null" - - def encode_bool(self, x): - if x: - return "true" - else: - return "false" - - def encode_number(self, x): - return repr(x) - - def encode_str(self, x): - # Do not use repr() for JSON compatibility. - return "\"" + x.translate(_str_translation) + "\"" - - def encode_bytes(self, x): - return repr(x) - - def encode_tuple(self, x): - if len(x) == 1: - return "(" + self.encode(x[0]) + ", )" - else: - r = "(" - r += ", ".join([self.encode(item) for item in x]) - r += ")" - return r - - def encode_list(self, x): - r = "[" - r += ", ".join([self.encode(item) for item in x]) - r += "]" - return r - - def encode_set(self, x): - r = "{" - r += ", ".join([self.encode(item) for item in x]) - r += "}" - return r - - def encode_dict(self, x): - if self.pretty and all(k.__class__ == str for k in x.keys()): - items = lambda: sorted(x.items(), key=itemgetter(0)) - else: - items = x.items - - r = "{" - if not self.pretty or len(x) < 2: - r += ", ".join([self.encode(k) + ": " + self.encode(v) - for k, v in items()]) - else: - self.indent_level += 1 - r += "\n" - first = True - for k, v in items(): - if not first: - r += ",\n" - first = False - r += self.indent() + self.encode(k) + ": " + self.encode(v) - r += "\n" # no ',' - self.indent_level -= 1 - r += self.indent() - r += "}" - return r - - def encode_slice(self, x): - return repr(x) - - def encode_fraction(self, x): - return "Fraction({}, {})".format(self.encode(x.numerator), - self.encode(x.denominator)) - - def encode_ordereddict(self, x): - return "OrderedDict(" + self.encode(list(x.items())) + ")" - - def encode_nparray(self, x): - r = "nparray(" - r += self.encode(x.shape) + ", " - r += self.encode(x.dtype.str) + ", " - r += self.encode(base64.b64encode(x.data)) - r += ")" - return r - - def encode_npscalar(self, x): - r = "npscalar(" - r += self.encode(x.dtype.str) + ", " - r += self.encode(base64.b64encode(x.data)) - r += ")" - return r - - def encode(self, x): - ty = _encode_map.get(type(x), None) - if ty is None: - raise TypeError("`{!r}` ({}) is not PYON serializable" - .format(x, type(x))) - return getattr(self, "encode_" + ty)(x) - - -def encode(x, pretty=False): - """Serializes a Python object and returns the corresponding string in - Python syntax.""" - return _Encoder(pretty).encode(x) - - -def _nparray(shape, dtype, data): - a = numpy.frombuffer(base64.b64decode(data), dtype=dtype) - a = a.copy() - return a.reshape(shape) - - -def _npscalar(ty, data): - return numpy.frombuffer(base64.b64decode(data), dtype=ty)[0] - - -_eval_dict = { - "__builtins__": {}, - - "null": None, - "false": False, - "true": True, - "inf": numpy.inf, - "slice": slice, - "nan": numpy.nan, - - "Fraction": Fraction, - "OrderedDict": OrderedDict, - "nparray": _nparray, - "npscalar": _npscalar -} - - -def decode(s): - """Parses a string in the Python syntax, reconstructs the corresponding - object, and returns it.""" - return eval(s, _eval_dict, {}) - - -def store_file(filename, x): - """Encodes a Python object and writes it to the specified file.""" - contents = encode(x, True) - directory = os.path.abspath(os.path.dirname(filename)) - with tempfile.NamedTemporaryFile("w", dir=directory, delete=False) as f: - f.write(contents) - f.write("\n") - tmpname = f.name - os.replace(tmpname, filename) - - -def load_file(filename): - """Parses the specified file and returns the decoded Python object.""" - with open(filename, "r") as f: - return decode(f.read()) diff --git a/artiq/protocols/remote_exec.py b/artiq/protocols/remote_exec.py deleted file mode 100644 index ae9870c30..000000000 --- a/artiq/protocols/remote_exec.py +++ /dev/null @@ -1,116 +0,0 @@ -""" -This module provides facilities for experiment to execute code remotely on -controllers. - -The remotely executed code has direct access to the driver, so it can transfer -large amounts of data with it, and only exchange higher-level, processed data -with the experiment (and over the network). - -Controllers with support for remote execution contain an additional target -that gives RPC access to instances of :class:`.RemoteExecServer`. One such instance -is created per client (experiment) connection and manages one Python namespace -in which the experiment can execute arbitrary code by calling the methods of -:class:`.RemoteExecServer`. - -The namespaces are initialized with the following global values: - - * ``controller_driver`` - the driver instance of the controller. - * ``controller_initial_namespace`` - a controller-wide dictionary copied - when initializing a new namespace. - * all values from ``controller_initial_namespace``. - -Access to a controller with support for remote execution is done through an -additional device database entry of this form: :: - - "$REXEC_DEVICE_NAME": { - "type": "controller_aux_target", - "controller": "$CONTROLLER_DEVICE_NAME", - "target_name": "$TARGET_NAME_FOR_REXEC" - } - -Specifying ``target_name`` is mandatory in all device database entries for all -controllers with remote execution support. - -""" - -from functools import partial -import inspect - -from artiq.protocols.pc_rpc import simple_server_loop - - -__all__ = ["RemoteExecServer", "simple_rexec_server_loop", "connect_global_rpc"] - - -class RemoteExecServer: - """RPC target created at each connection by controllers with remote - execution support. Manages one Python namespace and provides RPCs - for code execution. - """ - def __init__(self, initial_namespace): - self.namespace = dict(initial_namespace) - # The module actually has to exist, otherwise it breaks e.g. Numba - self.namespace["__name__"] = "artiq.protocols.remote_exec" - - def add_code(self, code): - """Executes the specified code in the namespace. - - :param code: a string containing valid Python code - """ - exec(code, self.namespace) - - def call(self, function, *args, **kwargs): - """Calls a function in the namespace, passing it positional and - keyword arguments, and returns its value. - - :param function: a string containing the name of the function to - execute. - """ - return self.namespace[function](*args, **kwargs) - - -def simple_rexec_server_loop(target_name, target, host, port, - description=None): - """Runs a server with remote execution support, until an exception is - raised (e.g. the user hits Ctrl-C) or termination is requested by a client. - """ - initial_namespace = {"controller_driver": target} - initial_namespace["controller_initial_namespace"] = initial_namespace - targets = { - target_name: target, - target_name + "_rexec": lambda: RemoteExecServer(initial_namespace) - } - simple_server_loop(targets, host, port, description) - - -def connect_global_rpc(controller_rexec, host=None, port=3251, - target="master_dataset_db", name="dataset_db"): - """Creates a global RPC client in a controller that is used across - all remote execution connections. With the default parameters, it connects - to the dataset database (i.e. gives direct dataset access to experiment - code remotely executing in controllers). - - If a global object with the same name already exists, the function does - nothing. - - :param controller_rexec: the RPC client connected to the controller's - remote execution interface. - :param host: the host name to connect the RPC client to. Default is the - local end of the remote execution interface (typically, the ARTIQ - master). - :param port: TCP port to connect the RPC client to. - :param target: name of the RPC target. - :param name: name of the object to insert into the global namespace. - """ - if host is None: - host = controller_rexec.get_local_host() - code = """ -if "{name}" not in controller_initial_namespace: - import atexit - from artiq.protocols.pc_rpc import Client - - {name} = Client("{host}", {port}, "{target}") - atexit.register({name}.close_rpc) - controller_initial_namespace["{name}"] = {name} -""".format(host=host, port=port, target=target, name=name) - controller_rexec.add_code(code) diff --git a/artiq/protocols/sync_struct.py b/artiq/protocols/sync_struct.py deleted file mode 100644 index 02db70ffc..000000000 --- a/artiq/protocols/sync_struct.py +++ /dev/null @@ -1,328 +0,0 @@ -"""This module helps synchronizing a mutable Python structure owned and -modified by one process (the *publisher*) with copies of it (the -*subscribers*) in different processes and possibly different machines. - -Synchronization is achieved by sending a full copy of the structure to each -subscriber upon connection (*initialization*), followed by dictionaries -describing each modification made to the structure (*mods*, see -:class:`ModAction`). - -Structures must be PYON serializable and contain only lists, dicts, and -immutable types. Lists and dicts can be nested arbitrarily. -""" - -import asyncio -from enum import Enum, unique -from operator import getitem -from functools import partial - -from artiq.monkey_patches import * -from artiq.protocols import pyon -from artiq.protocols.asyncio_server import AsyncioServer - - -_protocol_banner = b"ARTIQ sync_struct\n" - - -@unique -class ModAction(Enum): - """Describes the type of incremental modification. - - `Mods` are represented by a dictionary ``m``. ``m["action"]`` describes - the type of modification, as per this enum, serialized as a string if - required. - - The path (member field) the change applies to is given in - ``m["path"]`` as a list; elements give successive levels of indexing. - (There is no ``path`` on initial initialization.) - - Details on the modification are stored in additional data fields specific - to each type. - - For example, this represents appending the value ``42`` to an array - ``data.counts[0]``: :: - - { - "action": "append", - "path": ["data", "counts", 0], - "x": 42 - } - """ - - #: A full copy of the data is sent in `struct`; no `path` given. - init = "init" - - #: Appends `x` to target list. - append = "append" - - #: Inserts `x` into target list at index `i`. - insert = "insert" - - #: Removes index `i` from target list. - pop = "pop" - - #: Sets target's `key` to `value`. - setitem = "setitem" - - #: Removes target's `key`. - delitem = "delitem" - - -# Handlers to apply a given mod to a target dict, invoked with (target, mod). -_mod_appliers = { - ModAction.append: lambda t, m: t.append(m["x"]), - ModAction.insert: lambda t, m: t.insert(m["i"], m["x"]), - ModAction.pop: lambda t, m: t.pop(m["i"]), - ModAction.setitem: lambda t, m: t.__setitem__(m["key"], m["value"]), - ModAction.delitem: lambda t, m: t.__delitem__(m["key"]) -} - - -def process_mod(target, mod): - """Apply a *mod* to the target, mutating it.""" - for key in mod["path"]: - target = getitem(target, key) - - _mod_appliers[ModAction(mod["action"])](target, mod) - - -class Subscriber: - """An asyncio-based client to connect to a ``Publisher``. - - :param notifier_name: Name of the notifier to subscribe to. - :param target_builder: A function called during initialization that takes - the object received from the publisher and returns the corresponding - local structure to use. Can be identity. - :param notify_cb: An optional function called every time a mod is received - from the publisher. The mod is passed as parameter. The function is - called after the mod has been processed. - A list of functions may also be used, and they will be called in turn. - :param disconnect_cb: An optional function called when disconnection - happens from external causes (i.e. not when ``close`` is called). - """ - def __init__(self, notifier_name, target_builder, notify_cb=None, - disconnect_cb=None): - self.notifier_name = notifier_name - self.target_builder = target_builder - if notify_cb is None: - notify_cb = [] - if not isinstance(notify_cb, list): - notify_cb = [notify_cb] - self.notify_cbs = notify_cb - self.disconnect_cb = disconnect_cb - - async def connect(self, host, port, before_receive_cb=None): - self.reader, self.writer = \ - await asyncio.open_connection(host, port, limit=100*1024*1024) - try: - if before_receive_cb is not None: - before_receive_cb() - self.writer.write(_protocol_banner) - self.writer.write((self.notifier_name + "\n").encode()) - self.receive_task = asyncio.ensure_future(self._receive_cr()) - except: - self.writer.close() - del self.reader - del self.writer - raise - - async def close(self): - self.disconnect_cb = None - try: - self.receive_task.cancel() - try: - await asyncio.wait_for(self.receive_task, None) - except asyncio.CancelledError: - pass - finally: - self.writer.close() - del self.reader - del self.writer - - async def _receive_cr(self): - try: - target = None - while True: - line = await self.reader.readline() - if not line: - return - mod = pyon.decode(line.decode()) - - if mod["action"] == "init": - target = self.target_builder(mod["struct"]) - else: - process_mod(target, mod) - - for notify_cb in self.notify_cbs: - notify_cb(mod) - except ConnectionError: - pass - finally: - if self.disconnect_cb is not None: - self.disconnect_cb() - - -class Notifier: - """Encapsulates a structure whose changes need to be published. - - All mutations to the structure must be made through the :class:`.Notifier`. - The original structure must only be accessed for reads. - - In addition to the list methods below, the :class:`.Notifier` supports the - index syntax for modification and deletion of elements. Modification of - nested structures can be also done using the index syntax, for example: - - >>> n = Notifier([]) - >>> n.append([]) - >>> n[0].append(42) - >>> n.raw_view - [[42]] - - This class does not perform any network I/O and is meant to be used with - e.g. the :class:`.Publisher` for this purpose. Only one publisher at most - can be associated with a :class:`.Notifier`. - - :param backing_struct: Structure to encapsulate. - """ - def __init__(self, backing_struct, root=None, path=[]): - #: The raw data encapsulated (read-only!). - self.raw_view = backing_struct - - if root is None: - self.root = self - self.publish = None - else: - self.root = root - self._backing_struct = backing_struct - self._path = path - - # Backing struct modification methods. - # All modifications must go through them! - - def append(self, x): - """Append to a list.""" - self._backing_struct.append(x) - if self.root.publish is not None: - self.root.publish({"action": ModAction.append.value, - "path": self._path, - "x": x}) - - def insert(self, i, x): - """Insert an element into a list.""" - self._backing_struct.insert(i, x) - if self.root.publish is not None: - self.root.publish({"action": ModAction.insert.value, - "path": self._path, - "i": i, "x": x}) - - def pop(self, i=-1): - """Pop an element from a list. The returned element is not - encapsulated in a :class:`.Notifier` and its mutations are no longer - tracked.""" - r = self._backing_struct.pop(i) - if self.root.publish is not None: - self.root.publish({"action": ModAction.pop.value, - "path": self._path, - "i": i}) - return r - - def __setitem__(self, key, value): - self._backing_struct.__setitem__(key, value) - if self.root.publish is not None: - self.root.publish({"action": ModAction.setitem.value, - "path": self._path, - "key": key, - "value": value}) - - def __delitem__(self, key): - self._backing_struct.__delitem__(key) - if self.root.publish is not None: - self.root.publish({"action": ModAction.delitem.value, - "path": self._path, - "key": key}) - - def __getitem__(self, key): - item = getitem(self._backing_struct, key) - return Notifier(item, self.root, self._path + [key]) - - -def update_from_dict(target, source): - """Updates notifier contents from given source dictionary. - - Only the necessary changes are performed; unchanged fields are not written. - (Currently, modifications are only performed at the top level. That is, - whenever there is a change to a child array/struct the entire member is - updated instead of choosing a more optimal set of mods.) - """ - curr = target.raw_view - - # Delete removed keys. - for k in list(curr.keys()): - if k not in source: - del target[k] - - # Insert/update changed data. - for k in source.keys(): - if k not in curr or curr[k] != source[k]: - target[k] = source[k] - - -class Publisher(AsyncioServer): - """A network server that publish changes to structures encapsulated in - a :class:`.Notifier`. - - :param notifiers: A dictionary containing the notifiers to associate with - the :class:`.Publisher`. The keys of the dictionary are the names of - the notifiers to be used with :class:`.Subscriber`. - """ - def __init__(self, notifiers): - AsyncioServer.__init__(self) - self.notifiers = notifiers - self._recipients = {k: set() for k in notifiers.keys()} - self._notifier_names = {id(v): k for k, v in notifiers.items()} - - for notifier in notifiers.values(): - notifier.publish = partial(self.publish, notifier) - - async def _handle_connection_cr(self, reader, writer): - try: - line = await reader.readline() - if line != _protocol_banner: - return - - line = await reader.readline() - if not line: - return - notifier_name = line.decode()[:-1] - - try: - notifier = self.notifiers[notifier_name] - except KeyError: - return - - obj = {"action": ModAction.init.value, "struct": notifier.raw_view} - line = pyon.encode(obj) + "\n" - writer.write(line.encode()) - - queue = asyncio.Queue() - self._recipients[notifier_name].add(queue) - try: - while True: - line = await queue.get() - writer.write(line) - # raise exception on connection error - await writer.drain() - finally: - self._recipients[notifier_name].remove(queue) - except (ConnectionError, TimeoutError): - # subscribers disconnecting are a normal occurrence - pass - finally: - writer.close() - - def publish(self, notifier, mod): - line = pyon.encode(mod) + "\n" - line = line.encode() - notifier_name = self._notifier_names[id(notifier)] - for recipient in self._recipients[notifier_name]: - recipient.put_nowait(line) diff --git a/artiq/test/hardware_testbench.py b/artiq/test/hardware_testbench.py index d13a2f68e..987a1cf6b 100644 --- a/artiq/test/hardware_testbench.py +++ b/artiq/test/hardware_testbench.py @@ -5,96 +5,17 @@ import os import sys import unittest import logging -import subprocess -import shlex -import time -import socket from artiq.master.databases import DeviceDB, DatasetDB from artiq.master.worker_db import DeviceManager, DatasetManager, DeviceError from artiq.coredevice.core import CompileError from artiq.frontend.artiq_run import DummyScheduler -from artiq.protocols.pc_rpc import AutoTarget, Client artiq_root = os.getenv("ARTIQ_ROOT") logger = logging.getLogger(__name__) -class GenericControllerCase(unittest.TestCase): - def get_device_db(self): - raise NotImplementedError - - def setUp(self): - self.device_db = self.get_device_db() - self.device_mgr = DeviceManager(self.device_db) - self.controllers = {} - - def tearDown(self): - self.device_mgr.close_devices() - for name in list(self.controllers): - self.stop_controller(name) - - def start_controller(self, name, sleep=1): - if name in self.controllers: - raise ValueError("controller `{}` already started".format(name)) - try: - entry = self.device_db.get(name) - except KeyError: - raise unittest.SkipTest( - "controller `{}` not found".format(name)) - entry["command"] = entry["command"].format( - name=name, bind=entry["host"], port=entry["port"]) - proc = subprocess.Popen(shlex.split(entry["command"])) - self.controllers[name] = entry, proc - time.sleep(sleep) - - def stop_controller(self, name, default_timeout=1): - desc, proc = self.controllers[name] - t = desc.get("term_timeout", default_timeout) - target_name = desc.get("target_name", None) - if target_name is None: - target_name = AutoTarget - try: - try: - client = Client(desc["host"], desc["port"], target_name, t) - try: - client.terminate() - finally: - client.close_rpc() - proc.wait(t) - return - except (socket.timeout, subprocess.TimeoutExpired): - logger.warning("Controller %s failed to exit on request", name) - try: - proc.terminate() - except ProcessLookupError: - pass - try: - proc.wait(t) - return - except subprocess.TimeoutExpired: - logger.warning("Controller %s failed to exit on terminate", - name) - try: - proc.kill() - except ProcessLookupError: - pass - try: - proc.wait(t) - return - except subprocess.TimeoutExpired: - logger.warning("Controller %s failed to die on kill", name) - finally: - del self.controllers[name] - - -@unittest.skipUnless(artiq_root, "no ARTIQ_ROOT") -class ControllerCase(GenericControllerCase): - def get_device_db(self): - return DeviceDB(os.path.join(artiq_root, "device_db.py")) - - @unittest.skipUnless(artiq_root, "no ARTIQ_ROOT") class ExperimentCase(unittest.TestCase): def setUp(self): diff --git a/artiq/test/test_ctlmgr.py b/artiq/test/test_ctlmgr.py index bc9031bce..728353033 100644 --- a/artiq/test/test_ctlmgr.py +++ b/artiq/test/test_ctlmgr.py @@ -4,10 +4,12 @@ import unittest import logging import asyncio +from sipyco.pc_rpc import AsyncioClient + from artiq.master.ctlmgr import Controllers -from artiq.protocols.pc_rpc import AsyncioClient from artiq.tools import expect_no_log_messages + logger = logging.getLogger(__name__) diff --git a/artiq/test/test_datasets.py b/artiq/test/test_datasets.py index 4ea48cc6e..871568a2a 100644 --- a/artiq/test/test_datasets.py +++ b/artiq/test/test_datasets.py @@ -3,9 +3,10 @@ import copy import unittest +from sipyco.sync_struct import process_mod + from artiq.experiment import EnvExperiment from artiq.master.worker_db import DatasetManager -from artiq.protocols.sync_struct import process_mod class MockDatasetDB: diff --git a/artiq/test/test_fire_and_forget.py b/artiq/test/test_fire_and_forget.py new file mode 100644 index 000000000..fb8e67555 --- /dev/null +++ b/artiq/test/test_fire_and_forget.py @@ -0,0 +1,17 @@ +import unittest + +from artiq.protocols import fire_and_forget + + +class FireAndForgetCase(unittest.TestCase): + def _set_ok(self): + self.ok = True + + def test_fire_and_forget(self): + self.ok = False + p = fire_and_forget.FFProxy(self) + p._set_ok() + with self.assertRaises(AttributeError): + p.non_existing_method + p.ff_join() + self.assertTrue(self.ok) diff --git a/artiq/test/test_frontends.py b/artiq/test/test_frontends.py index 243f42ac3..0d88fb328 100644 --- a/artiq/test/test_frontends.py +++ b/artiq/test/test_frontends.py @@ -15,7 +15,7 @@ class TestFrontends(unittest.TestCase): "artiq": [ "client", "compile", "coreanalyzer", "coremgmt", "ctlmgr", "netboot", "flash", "influxdb", "master", "mkfs", "route", - "rpctool", "rtiomon", "run", "session" + "rtiomon", "run", "session" ] } diff --git a/artiq/test/test_pc_rpc.py b/artiq/test/test_pc_rpc.py deleted file mode 100644 index 8b095fe9e..000000000 --- a/artiq/test/test_pc_rpc.py +++ /dev/null @@ -1,166 +0,0 @@ -import asyncio -import inspect -import subprocess -import sys -import time -import unittest - -import numpy as np - -from artiq.protocols import fire_and_forget, pc_rpc, pyon - -test_address = "::1" -test_port = 7777 -test_object = [5, 2.1, None, True, False, - {"a": 5, 2: np.linspace(0, 10, 1)}, - (4, 5), (10,), "ab\nx\"'"] - - -class RPCCase(unittest.TestCase): - def _run_server_and_test(self, test, *args): - # running this file outside of unittest starts the echo server - with subprocess.Popen([sys.executable, - sys.modules[__name__].__file__]) as proc: - try: - test(*args) - finally: - try: - proc.wait(timeout=1) - except subprocess.TimeoutExpired: - proc.kill() - raise - - def _blocking_echo(self, target): - for attempt in range(100): - time.sleep(.2) - try: - remote = pc_rpc.Client(test_address, test_port, - target) - except ConnectionRefusedError: - pass - else: - break - try: - test_object_back = remote.echo(test_object) - self.assertEqual(test_object, test_object_back) - test_object_back = remote.async_echo(test_object) - self.assertEqual(test_object, test_object_back) - with self.assertRaises(AttributeError): - remote.non_existing_method - remote.terminate() - finally: - remote.close_rpc() - - def test_blocking_echo(self): - self._run_server_and_test(self._blocking_echo, "test") - - def test_blocking_echo_autotarget(self): - self._run_server_and_test(self._blocking_echo, pc_rpc.AutoTarget) - - async def _asyncio_echo(self, target): - remote = pc_rpc.AsyncioClient() - for attempt in range(100): - await asyncio.sleep(.2) - try: - await remote.connect_rpc(test_address, test_port, target) - except ConnectionRefusedError: - pass - else: - break - try: - test_object_back = await remote.echo(test_object) - self.assertEqual(test_object, test_object_back) - test_object_back = await remote.async_echo(test_object) - self.assertEqual(test_object, test_object_back) - with self.assertRaises(AttributeError): - await remote.non_existing_method - await remote.terminate() - finally: - remote.close_rpc() - - def _loop_asyncio_echo(self, target): - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - try: - loop.run_until_complete(self._asyncio_echo(target)) - finally: - loop.close() - - def test_asyncio_echo(self): - self._run_server_and_test(self._loop_asyncio_echo, "test") - - def test_asyncio_echo_autotarget(self): - self._run_server_and_test(self._loop_asyncio_echo, pc_rpc.AutoTarget) - - def test_rpc_encode_function(self): - """Test that `pc_rpc` can encode a function properly. - - Used in `get_rpc_method_list` part of - :meth:`artiq.protocols.pc_rpc.Server._process_action` - """ - - def _annotated_function( - arg1: str, arg2: np.ndarray = np.array([1,]) - ) -> np.ndarray: - """Sample docstring.""" - return arg1 - - argspec_documented, docstring = pc_rpc.Server._document_function( - _annotated_function - ) - self.assertEqual(docstring, "Sample docstring.") - - # purposefully ignore how argspec["annotations"] is treated. - # allows option to change PYON later to encode annotations. - argspec_master = dict(inspect.getfullargspec(_annotated_function)._asdict()) - argspec_without_annotation = argspec_master.copy() - del argspec_without_annotation["annotations"] - # check if all items (excluding annotations) are same in both dictionaries - self.assertLessEqual( - argspec_without_annotation.items(), argspec_documented.items() - ) - self.assertDictEqual( - argspec_documented, pyon.decode(pyon.encode(argspec_documented)) - ) - - -class FireAndForgetCase(unittest.TestCase): - def _set_ok(self): - self.ok = True - - def test_fire_and_forget(self): - self.ok = False - p = fire_and_forget.FFProxy(self) - p._set_ok() - with self.assertRaises(AttributeError): - p.non_existing_method - p.ff_join() - self.assertTrue(self.ok) - - -class Echo: - def echo(self, x): - return x - - async def async_echo(self, x): - await asyncio.sleep(0.01) - return x - - -def run_server(): - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - try: - echo = Echo() - server = pc_rpc.Server({"test": echo}, builtin_terminate=True) - loop.run_until_complete(server.start(test_address, test_port)) - try: - loop.run_until_complete(server.wait_terminate()) - finally: - loop.run_until_complete(server.stop()) - finally: - loop.close() - - -if __name__ == "__main__": - run_server() diff --git a/artiq/test/test_pipe_ipc.py b/artiq/test/test_pipe_ipc.py deleted file mode 100644 index b066d4276..000000000 --- a/artiq/test/test_pipe_ipc.py +++ /dev/null @@ -1,80 +0,0 @@ -import unittest -import sys -import asyncio -import os - -from artiq.protocols import pipe_ipc - - -class IPCCase(unittest.TestCase): - def setUp(self): - if os.name == "nt": - self.loop = asyncio.ProactorEventLoop() - else: - self.loop = asyncio.new_event_loop() - asyncio.set_event_loop(self.loop) - - def tearDown(self): - self.loop.close() - - async def _coro_test(self, child_asyncio): - ipc = pipe_ipc.AsyncioParentComm() - await ipc.create_subprocess(sys.executable, - sys.modules[__name__].__file__, - str(child_asyncio), - ipc.get_address()) - for i in range(10): - ipc.write("{}\n".format(i).encode()) - await ipc.drain() - s = (await ipc.readline()).decode() - self.assertEqual(int(s), i+1) - ipc.write(b"-1\n") - await ipc.process.wait() - - def test_blocking(self): - self.loop.run_until_complete(self._coro_test(False)) - - def test_asyncio(self): - self.loop.run_until_complete(self._coro_test(True)) - - -def run_child_blocking(): - child_comm = pipe_ipc.ChildComm(sys.argv[2]) - while True: - x = int(child_comm.readline().decode()) - if x < 0: - break - child_comm.write((str(x+1) + "\n").encode()) - child_comm.close() - - -async def coro_child(): - child_comm = pipe_ipc.AsyncioChildComm(sys.argv[2]) - await child_comm.connect() - while True: - x = int((await child_comm.readline()).decode()) - if x < 0: - break - child_comm.write((str(x+1) + "\n").encode()) - await child_comm.drain() - child_comm.close() - - -def run_child_asyncio(): - if os.name == "nt": - loop = asyncio.ProactorEventLoop() - asyncio.set_event_loop(loop) - else: - loop = asyncio.get_event_loop() - loop.run_until_complete(coro_child()) - loop.close() - - -def run_child(): - if sys.argv[1] == "True": - run_child_asyncio() - else: - run_child_blocking() - -if __name__ == "__main__": - run_child() diff --git a/artiq/test/test_rpctool.py b/artiq/test/test_rpctool.py deleted file mode 100644 index fb74395bf..000000000 --- a/artiq/test/test_rpctool.py +++ /dev/null @@ -1,39 +0,0 @@ -import os -import sys -import asyncio -import unittest - -from artiq.protocols.pc_rpc import Server - - -class Target: - def output_value(self): - return 4125380 - - -class TestRPCTool(unittest.TestCase): - async def check_value(self): - proc = await asyncio.create_subprocess_exec( - sys.executable, "-m", "artiq.frontend.artiq_rpctool", "::1", "7777", "call", "output_value", - stdout = asyncio.subprocess.PIPE) - (value, err) = await proc.communicate() - self.assertEqual(value.decode('ascii').rstrip(), '4125380') - await proc.wait() - - async def do_test(self): - server = Server({"target": Target()}) - await server.start("::1", 7777) - await self.check_value() - await server.stop() - - def test_rpc(self): - if os.name == "nt": - loop = asyncio.ProactorEventLoop() - else: - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - try: - loop.run_until_complete(self.do_test()) - finally: - loop.close() - diff --git a/artiq/test/test_serialization.py b/artiq/test/test_serialization.py deleted file mode 100644 index 803e42cde..000000000 --- a/artiq/test/test_serialization.py +++ /dev/null @@ -1,56 +0,0 @@ -import unittest -import json -from fractions import Fraction - -import numpy as np - -from artiq.protocols import pyon - - -_pyon_test_object = { - (1, 2): [(3, 4.2), (2, )], - "slice": slice(3), - Fraction(3, 4): np.linspace(5, 10, 1), - "set": {"testing", "sets"}, - "a": np.int8(9), "b": np.int16(-98), "c": np.int32(42), "d": np.int64(-5), - "e": np.uint8(8), "f": np.uint16(5), "g": np.uint32(4), "h": np.uint64(9), - "x": np.float16(9.0), "y": np.float32(9.0), "z": np.float64(9.0), - 1j: 1-9j, - "q": np.complex128(1j), -} - - -class PYON(unittest.TestCase): - def test_encdec(self): - for enc in pyon.encode, lambda x: pyon.encode(x, True): - with self.subTest(enc=enc): - self.assertEqual(pyon.decode(enc(_pyon_test_object)), - _pyon_test_object) - # NaNs don't compare equal, so test separately. - assert np.isnan(pyon.decode(enc(np.nan))) - - def test_encdec_array(self): - orig = {k: (np.array(v), np.array([v])) - for k, v in _pyon_test_object.items() - if np.isscalar(v)} - for enc in pyon.encode, lambda x: pyon.encode(x, True): - result = pyon.decode(enc(orig)) - for k in orig: - with self.subTest(enc=enc, k=k, v=orig[k]): - np.testing.assert_equal(result[k], orig[k]) - - -_json_test_object = { - "a": "b", - "x": [1, 2, {}], - "foo\nbaz\\qux\"\r2": ["bar", 1.2, {"x": "y"}], - "bar": [True, False, None] -} - - -class JSONPYON(unittest.TestCase): - def test_encdec(self): - for enc in pyon.encode, lambda x: pyon.encode(x, True), json.dumps: - for dec in pyon.decode, json.loads: - self.assertEqual(dec(enc(_json_test_object)), - _json_test_object) diff --git a/artiq/test/test_sync_struct.py b/artiq/test/test_sync_struct.py deleted file mode 100644 index 2352508ac..000000000 --- a/artiq/test/test_sync_struct.py +++ /dev/null @@ -1,72 +0,0 @@ -import unittest -import asyncio -import numpy as np - -from artiq.protocols import sync_struct - -test_address = "::1" -test_port = 7777 - - -def write_test_data(test_dict): - test_values = [5, 2.1, None, True, False, - {"a": 5, 2: np.linspace(0, 10, 1)}, - (4, 5), (10,), "ab\nx\"'"] - for i in range(10): - test_dict[str(i)] = i - for key, value in enumerate(test_values): - test_dict[key] = value - test_dict[1.5] = 1.5 - test_dict["list"] = [] - test_dict["list"][:] = [34, 31] - test_dict["list"].append(42) - test_dict["list"].insert(1, 1) - test_dict[100] = 0 - test_dict[100] = 1 - test_dict[101] = 1 - test_dict.pop(101) - test_dict[102] = 1 - del test_dict[102] - test_dict["array"] = np.zeros(1) - test_dict["array"][0] = 10 - test_dict["finished"] = True - - -class SyncStructCase(unittest.TestCase): - def init_test_dict(self, init): - self.received_dict = init - return init - - def notify(self, mod): - if ((mod["action"] == "init" and "finished" in mod["struct"]) - or (mod["action"] == "setitem" and mod["key"] == "finished")): - self.receiving_done.set() - - def setUp(self): - self.loop = asyncio.new_event_loop() - asyncio.set_event_loop(self.loop) - - async def _do_test_recv(self): - self.receiving_done = asyncio.Event() - - test_dict = sync_struct.Notifier(dict()) - publisher = sync_struct.Publisher({"test": test_dict}) - await publisher.start(test_address, test_port) - - subscriber = sync_struct.Subscriber("test", self.init_test_dict, - self.notify) - await subscriber.connect(test_address, test_port) - - write_test_data(test_dict) - await self.receiving_done.wait() - - await subscriber.close() - await publisher.stop() - - self.assertEqual(self.received_dict, test_dict.raw_view) - - def test_recv(self): - self.loop.run_until_complete(self._do_test_recv()) - - def tearDown(self): - self.loop.close() diff --git a/artiq/tools.py b/artiq/tools.py index d7d20bae1..afeef8bc8 100644 --- a/artiq/tools.py +++ b/artiq/tools.py @@ -10,17 +10,18 @@ import sys import numpy as np +from sipyco import pyon + from artiq import __version__ as artiq_version from artiq.appdirs import user_config_dir from artiq.language.environment import is_experiment -from artiq.protocols import pyon + __all__ = ["parse_arguments", "elide", "short_format", "file_import", - "get_experiment", "add_common_args", "simple_network_args", + "get_experiment", "UnexpectedLogMessageError", "expect_no_log_messages", - "multiline_log_config", "init_logger", "bind_address_from_args", "atexit_register_coroutine", "exc_to_warning", - "asyncio_wait_or_cancel", "TaskObject", "Condition", + "asyncio_wait_or_cancel", "Condition", "get_windows_drives", "get_user_config_dir"] @@ -106,44 +107,6 @@ def get_experiment(module, class_name=None): return exps[0][1] -def add_common_args(parser): - """Add common utility arguments to the cmd parser. - - Arguments added: - * `-v`/`-q`: increase or decrease the default logging levels. - Repeat for higher levels. - * `--version`: print the ARTIQ version - """ - group = parser.add_argument_group("common") - group.add_argument("-v", "--verbose", default=0, action="count", - help="increase logging level") - group.add_argument("-q", "--quiet", default=0, action="count", - help="decrease logging level") - group.add_argument("--version", action="version", - version="ARTIQ v{}".format(artiq_version), - help="print the ARTIQ version number") - - -def simple_network_args(parser, default_port): - group = parser.add_argument_group("network server") - group.add_argument( - "--bind", default=[], action="append", - help="additional hostname or IP address to bind to; " - "use '*' to bind to all interfaces (default: %(default)s)") - group.add_argument( - "--no-localhost-bind", default=False, action="store_true", - help="do not implicitly also bind to localhost addresses") - if isinstance(default_port, int): - group.add_argument("-p", "--port", default=default_port, type=int, - help="TCP port to listen on (default: %(default)d)") - else: - for name, purpose, default in default_port: - h = ("TCP port for {} connections (default: {})" - .format(purpose, default)) - group.add_argument("--port-" + name, default=default, type=int, - help=h) - - class UnexpectedLogMessageError(Exception): pass @@ -174,42 +137,6 @@ def expect_no_log_messages(level, logger=None): logger.removeHandler(handler) -class MultilineFormatter(logging.Formatter): - def __init__(self): - logging.Formatter.__init__( - self, "%(levelname)s:%(name)s:%(message)s") - - def format(self, record): - r = logging.Formatter.format(self, record) - linebreaks = r.count("\n") - if linebreaks: - i = r.index(":") - r = r[:i] + "<" + str(linebreaks + 1) + ">" + r[i:] - return r - - -def multiline_log_config(level): - root_logger = logging.getLogger() - root_logger.setLevel(level) - handler = logging.StreamHandler() - handler.setFormatter(MultilineFormatter()) - root_logger.addHandler(handler) - - -def init_logger(args): - multiline_log_config( - level=logging.WARNING + args.quiet*10 - args.verbose*10) - - -def bind_address_from_args(args): - if "*" in args.bind: - return None - if args.no_localhost_bind: - return args.bind - else: - return ["127.0.0.1", "::1"] + args.bind - - def atexit_register_coroutine(coroutine, loop=None): if loop is None: loop = asyncio.get_event_loop() @@ -238,31 +165,6 @@ async def asyncio_wait_or_cancel(fs, **kwargs): return fs -class TaskObject: - def start(self): - async def log_exceptions(awaitable): - try: - return await awaitable() - except asyncio.CancelledError: - raise - except Exception: - logger.error("Unhandled exception in TaskObject task body", exc_info=True) - raise - - self.task = asyncio.ensure_future(log_exceptions(self._do)) - - async def stop(self): - self.task.cancel() - try: - await asyncio.wait_for(self.task, None) - except asyncio.CancelledError: - pass - del self.task - - async def _do(self): - raise NotImplementedError - - class Condition: def __init__(self, *, loop=None): if loop is not None: diff --git a/doc/manual/developing_a_ndsp.rst b/doc/manual/developing_a_ndsp.rst index 1d786ca97..18e3a995a 100644 --- a/doc/manual/developing_a_ndsp.rst +++ b/doc/manual/developing_a_ndsp.rst @@ -1,7 +1,7 @@ Developing a Network Device Support Package (NDSP) ================================================== -Most ARTIQ devices are interfaced through "controllers" that expose RPC interfaces to the network (based on :class:`artiq.protocols.pc_rpc`). The master never does direct I/O to the devices, but issues RPCs to the controllers when needed. As opposed to running everything on the master, this architecture has those main advantages: +Most ARTIQ devices are interfaced through "controllers" that expose RPC interfaces to the network (based on SiPyCo). The master never does direct I/O to the devices, but issues RPCs to the controllers when needed. As opposed to running everything on the master, this architecture has those main advantages: * Each driver can be run on a different machine, which alleviates cabling issues and OS compatibility problems. * Reduces the impact of driver crashes. @@ -15,13 +15,13 @@ A network device support package (NDSP) is composed of several parts: 1. The `driver`, which contains the Python API functions to be called over the network, and performs the I/O to the device. The top-level module of the driver is called ``artiq.devices.XXX.driver``. 2. The `controller`, which instantiates, initializes and terminates the driver, and sets up the RPC server. The controller is a front-end command-line tool to the user and is called ``artiq.frontend.aqctl_XXX``. A ``setup.py`` entry must also be created to install it. -3. An optional `client`, which connects to the controller and exposes the functions of the driver as a command-line interface. Clients are front-end tools (called ``artiq.frontend.aqcli_XXX``) that have ``setup.py`` entries. In most cases, a custom client is not needed and the generic ``artiq_rpctool`` utility can be used instead. Custom clients are only required when large amounts of data must be transferred over the network API, that would be unwieldy to pass as ``artiq_rpctool`` command-line parameters. +3. An optional `client`, which connects to the controller and exposes the functions of the driver as a command-line interface. Clients are front-end tools (called ``artiq.frontend.aqcli_XXX``) that have ``setup.py`` entries. In most cases, a custom client is not needed and the generic ``sipyco_rpctool`` utility can be used instead. Custom clients are only required when large amounts of data must be transferred over the network API, that would be unwieldy to pass as ``sipyco_rpctool`` command-line parameters. 4. An optional `mediator`, which is code executed on the client that supplements the network API. A mediator may contain kernels that control real-time signals such as TTL lines connected to the device. Simple devices use the network API directly and do not have a mediator. Mediator modules are called ``artiq.devices.XXX.mediator`` and their public classes are exported at the ``artiq.devices.XXX`` level (via ``__init__.py``) for direct import and use by the experiments. The driver and controller ------------------------- -A controller is a piece of software that receives commands from a client over the network (or the ``localhost`` interface), drives a device, and returns information about the device to the client. The mechanism used is remote procedure calls (RPCs) using :class:`artiq.protocols.pc_rpc`, which makes the network layers transparent for the driver's user. +A controller is a piece of software that receives commands from a client over the network (or the ``localhost`` interface), drives a device, and returns information about the device to the client. The mechanism used is remote procedure calls (RPCs) using ``sipyco.pc_rpc``, which makes the network layers transparent for the driver's user. The controller we will develop is for a "device" that is very easy to work with: the console from which the controller is run. The operation that the driver will implement is writing a message to that console. @@ -33,9 +33,9 @@ For using RPC, the functions that a driver provides must be the methods of a sin For a more complex driver, you would put this class definition into a separate Python module called ``driver``. -To turn it into a server, we use :class:`artiq.protocols.pc_rpc`. Import the function we will use: :: +To turn it into a server, we use ``sipyco.pc_rpc``. Import the function we will use: :: - from artiq.protocols.pc_rpc import simple_server_loop + from sipyco.pc_rpc import simple_server_loop and add a ``main`` function that is run when the program is executed: :: @@ -68,24 +68,24 @@ and verify that you can connect to the TCP port: :: :tip: Use the key combination Ctrl-AltGr-9 to get the ``telnet>`` prompt, and enter ``close`` to quit Telnet. Quit the controller with Ctrl-C. -Also verify that a target (service) named "hello" (as passed in the first argument to ``simple_server_loop``) exists using the ``artiq_rpctool`` program from the ARTIQ front-end tools: :: +Also verify that a target (service) named "hello" (as passed in the first argument to ``simple_server_loop``) exists using the ``sipyco_rpctool`` program from the ARTIQ front-end tools: :: - $ artiq_rpctool ::1 3249 list-targets + $ sipyco_rpctool ::1 3249 list-targets Target(s): hello The client ---------- -Clients are small command-line utilities that expose certain functionalities of the drivers. The ``artiq_rpctool`` utility contains a generic client that can be used in most cases, and developing a custom client is not required. Try these commands :: +Clients are small command-line utilities that expose certain functionalities of the drivers. The ``sipyco_rpctool`` utility contains a generic client that can be used in most cases, and developing a custom client is not required. Try these commands :: - $ artiq_rpctool ::1 3249 list-methods - $ artiq_rpctool ::1 3249 call message test + $ sipyco_rpctool ::1 3249 list-methods + $ sipyco_rpctool ::1 3249 call message test In case you are developing a NDSP that is complex enough to need a custom client, we will see how to develop one. Create a ``aqcli_hello.py`` file with the following contents: :: #!/usr/bin/env python3 - from artiq.protocols.pc_rpc import Client + from sipyco.pc_rpc import Client def main(): @@ -112,11 +112,11 @@ Command-line arguments Use the Python ``argparse`` module to make the bind address(es) and port configurable on the controller, and the server address, port and message configurable on the client. -We suggest naming the controller parameters ``--bind`` (which adds a bind address in addition to a default binding to localhost), ``--no-bind-localhost`` (which disables the default binding to localhost), and ``--port``, so that those parameters stay consistent across controllers. Use ``-s/--server`` and ``--port`` on the client. The ``artiq.tools.simple_network_args`` library function adds such arguments for the controller, and the ``artiq.tools.bind_address_from_args`` function processes them. +We suggest naming the controller parameters ``--bind`` (which adds a bind address in addition to a default binding to localhost), ``--no-bind-localhost`` (which disables the default binding to localhost), and ``--port``, so that those parameters stay consistent across controllers. Use ``-s/--server`` and ``--port`` on the client. The ``sipyco.common_args.simple_network_args`` library function adds such arguments for the controller, and the ``sipyco.common_args.bind_address_from_args`` function processes them. The controller's code would contain something similar to this: :: - from artiq.tools import simple_network_args + from sipyco.common_args import simple_network_args def get_argparser(): parser = argparse.ArgumentParser(description="Hello world controller") @@ -132,14 +132,14 @@ We suggest that you define a function ``get_argparser`` that returns the argumen Logging ------- -For the debug, information and warning messages, use the ``logging`` Python module and print the log on the standard error output (the default setting). The logging level is by default "WARNING", meaning that only warning messages and more critical messages will get printed (and no debug nor information messages). By calling :func:`artiq.tools.add_common_args` with the parser as argument, you add support for the ``--verbose`` (``-v``) and ``--quiet`` (``-q``) arguments in the parser. Each occurence of ``-v`` (resp. ``-q``) in the arguments will increase (resp. decrease) the log level of the logging module. For instance, if only one ``-v`` is present in the arguments, then more messages (info, warning and above) will get printed. If only one ``-q`` is present in the arguments, then only errors and critical messages will get printed. If ``-qq`` is present in the arguments, then only critical messages will get printed, but no debug/info/warning/error. +For the debug, information and warning messages, use the ``logging`` Python module and print the log on the standard error output (the default setting). The logging level is by default "WARNING", meaning that only warning messages and more critical messages will get printed (and no debug nor information messages). By calling ``sipyco.common_args.verbosity_args`` with the parser as argument, you add support for the ``--verbose`` (``-v``) and ``--quiet`` (``-q``) arguments in the parser. Each occurence of ``-v`` (resp. ``-q``) in the arguments will increase (resp. decrease) the log level of the logging module. For instance, if only one ``-v`` is present in the arguments, then more messages (info, warning and above) will get printed. If only one ``-q`` is present in the arguments, then only errors and critical messages will get printed. If ``-qq`` is present in the arguments, then only critical messages will get printed, but no debug/info/warning/error. The program below exemplifies how to use logging: :: import argparse import logging - from artiq.tools import add_common_args, init_logger + from sipyco.common_args import verbosity_args, init_logger_from_args # get a logger that prints the module name @@ -151,13 +151,13 @@ The program below exemplifies how to use logging: :: parser.add_argument("--someargument", help="some argument") # [...] - add_common_args(parser) # This adds the -q and -v handling + add_verbosity_args(parser) # This adds the -q and -v handling return parser def main(): args = get_argparser().parse_args() - init_logger(args) # This initializes logging system log level according to -v/-q args + init_logger_from_args(args) # This initializes logging system log level according to -v/-q args logger.debug("this is a debug message") logger.info("this is an info message") @@ -172,7 +172,7 @@ The program below exemplifies how to use logging: :: Remote execution support ------------------------ -If you wish to support remote execution in your controller, you may do so by simply replacing ``simple_server_loop`` with :class:`artiq.protocols.remote_exec.simple_rexec_server_loop`. +If you wish to support remote execution in your controller, you may do so by simply replacing ``simple_server_loop`` with :class:`sipyco.remote_exec.simple_rexec_server_loop`. General guidelines ------------------ @@ -184,5 +184,5 @@ General guidelines * Controllers must be able to operate in "simulation" mode, where they behave properly even if the associated hardware is not connected. For example, they can print the data to the console instead of sending it to the device, or dump it into a file. * The simulation mode is entered whenever the ``--simulation`` option is specified. * Keep command line parameters consistent across clients/controllers. When adding new command line options, look for a client/controller that does a similar thing and follow its use of ``argparse``. If the original client/controller could use ``argparse`` in a better way, improve it. -* Use docstrings for all public methods of the driver (note that those will be retrieved by ``artiq_rpctool``). +* Use docstrings for all public methods of the driver (note that those will be retrieved by ``sipyco_rpctool``). * Choose a free default TCP port and add it to the default port list in this manual. diff --git a/doc/manual/environment.rst b/doc/manual/environment.rst index 5a2df7a89..cf2cf7308 100644 --- a/doc/manual/environment.rst +++ b/doc/manual/environment.rst @@ -22,9 +22,9 @@ Local device entries are dictionaries that contain a ``type`` field set to ``loc Controllers +++++++++++ -Controller entries are dictionaries whose ``type`` field is set to ``controller``. When an experiment requests such a device, a RPC client (see :class:`artiq.protocols.pc_rpc`) is created and connected to the appropriate controller. Controller entries are also used by controller managers to determine what controllers to run. +Controller entries are dictionaries whose ``type`` field is set to ``controller``. When an experiment requests such a device, a RPC client (see ``sipyco.pc_rpc``) is created and connected to the appropriate controller. Controller entries are also used by controller managers to determine what controllers to run. -The ``best_effort`` field is a boolean that determines whether to use :class:`artiq.protocols.pc_rpc.Client` or :class:`artiq.protocols.pc_rpc.BestEffortClient`. The ``host`` and ``port`` fields configure the TCP connection. The ``target`` field contains the name of the RPC target to use (you may use ``artiq_rpctool`` on a controller to list its targets). Controller managers run the ``command`` field in a shell to launch the controller, after replacing ``{port}`` and ``{bind}`` by respectively the TCP port the controller should listen to (matches the ``port`` field) and an appropriate bind address for the controller's listening socket. +The ``best_effort`` field is a boolean that determines whether to use ``sipyco.pc_rpc.Client`` or ``sipyco.pc_rpc.BestEffortClient``. The ``host`` and ``port`` fields configure the TCP connection. The ``target`` field contains the name of the RPC target to use (you may use ``sipyco_rpctool`` on a controller to list its targets). Controller managers run the ``command`` field in a shell to launch the controller, after replacing ``{port}`` and ``{bind}`` by respectively the TCP port the controller should listen to (matches the ``port`` field) and an appropriate bind address for the controller's listening socket. Aliases +++++++ diff --git a/doc/manual/index.rst b/doc/manual/index.rst index 3465bc0ac..c7ecde30f 100644 --- a/doc/manual/index.rst +++ b/doc/manual/index.rst @@ -20,7 +20,6 @@ Contents: drtio core_language_reference core_drivers_reference - protocols_reference list_of_ndsps developing_a_ndsp utilities diff --git a/doc/manual/protocols_reference.rst b/doc/manual/protocols_reference.rst deleted file mode 100644 index 247d834b7..000000000 --- a/doc/manual/protocols_reference.rst +++ /dev/null @@ -1,43 +0,0 @@ -Protocols reference -=================== - -:mod:`artiq.protocols.asyncio_server` module --------------------------------------------- - -.. automodule:: artiq.protocols.asyncio_server - :members: - - -:mod:`artiq.protocols.pyon` module ----------------------------------- - -.. automodule:: artiq.protocols.pyon - :members: - - -:mod:`artiq.protocols.pc_rpc` module ------------------------------------- - -.. automodule:: artiq.protocols.pc_rpc - :members: - - -:mod:`artiq.protocols.fire_and_forget` module ---------------------------------------------- - -.. automodule:: artiq.protocols.fire_and_forget - :members: - - -:mod:`artiq.protocols.sync_struct` module ------------------------------------------ - -.. automodule:: artiq.protocols.sync_struct - :members: - - -:mod:`artiq.protocols.remote_exec` module ------------------------------------------ - -.. automodule:: artiq.protocols.remote_exec - :members: diff --git a/doc/manual/utilities.rst b/doc/manual/utilities.rst index abac93f95..4946fea2a 100644 --- a/doc/manual/utilities.rst +++ b/doc/manual/utilities.rst @@ -12,71 +12,6 @@ Local running tool :ref: artiq.frontend.artiq_run.get_argparser :prog: artiq_run -Remote Procedure Call tool --------------------------- - -.. argparse:: - :ref: artiq.frontend.artiq_rpctool.get_argparser - :prog: artiq_rpctool - -This tool is the preferred way of handling simple ARTIQ controllers. -Instead of writing a client for very simple cases you can just use this tool -in order to call remote functions of an ARTIQ controller. - -* Listing existing targets - - The ``list-targets`` sub-command will print to standard output the - target list of the remote server:: - - $ artiq_rpctool hostname port list-targets - -* Listing callable functions - - The ``list-methods`` sub-command will print to standard output a sorted - list of the functions you can call on the remote server's target. - - The list will contain function names, signatures (arguments) and - docstrings. - - If the server has only one target, you can do:: - - $ artiq_rpctool hostname port list-methods - - Otherwise you need to specify the target, using the ``-t target`` - option:: - - $ artiq_rpctool hostname port list-methods -t target_name - -* Remotely calling a function - - The ``call`` sub-command will call a function on the specified remote - server's target, passing the specified arguments. - Like with the previous sub-command, you only need to provide the target - name (with ``-t target``) if the server hosts several targets. - - The following example will call the ``set_attenuation`` method of the - Lda controller with the argument ``5``:: - - $ artiq_rpctool ::1 3253 call -t lda set_attenuation 5 - - In general, to call a function named ``f`` with N arguments named - respectively ``x1, x2, ..., xN`` you can do:: - - $ artiq_rpctool hostname port call -t target f x1 x2 ... xN - - You can use Python syntax to compute arguments as they will be passed - to the ``eval()`` primitive. The numpy package is available in the namespace - as ``np``. Beware to use quotes to separate arguments which use spaces:: - - $ artiq_rpctool hostname port call -t target f '3 * 4 + 2' True '[1, 2]' - $ artiq_rpctool ::1 3256 call load_sample_values 'np.array([1.0, 2.0], dtype=float)' - - If the called function has a return value, it will get printed to - the standard output if the value is not None like in the standard - python interactive console:: - - $ artiq_rpctool ::1 3253 call get_attenuation - 5.0 dB Static compiler --------------- diff --git a/setup.py b/setup.py index a2b470e0a..fae2b91ae 100755 --- a/setup.py +++ b/setup.py @@ -33,7 +33,6 @@ console_scripts = [ "artiq_rtiomon = artiq.frontend.artiq_rtiomon:main", "artiq_session = artiq.frontend.artiq_session:main", "artiq_route = artiq.frontend.artiq_route:main", - "artiq_rpctool = artiq.frontend.artiq_rpctool:main", "artiq_run = artiq.frontend.artiq_run:main", "artiq_flash = artiq.frontend.artiq_flash:main",