diff --git a/artiq/devices/lda/lda.py b/artiq/devices/lda/lda.py new file mode 100755 index 000000000..95c460347 --- /dev/null +++ b/artiq/devices/lda/lda.py @@ -0,0 +1,152 @@ +#!/usr/bin/env python3 +import os +import logging +import atexit +import ctypes +import ctypes.util +import struct + +logger = logging.getLogger(__name__) + +if "." not in os.environ["PATH"].split(";"): + os.environ["PATH"] += ";." +dir = os.path.split(__file__)[0] +if dir not in os.environ["PATH"].split(";"): + os.environ["PATH"] += ";%s" % dir + +for n in "hidapi-libusb hidapi-hidraw hidapi".split(): + path = ctypes.util.find_library(n) + if path: + break +if not path: + raise ImportError("no hidapi library found") +hidapi = ctypes.CDLL(path) + + +class HidDeviceInfo(ctypes.Structure): + pass + + +HidDeviceInfo._fields_ = [ + ("path", ctypes.c_char_p), + ("vendor_id", ctypes.c_ushort), + ("product_id", ctypes.c_ushort), + ("serial", ctypes.c_wchar_p), + ("release", ctypes.c_ushort), + ("manufacturer", ctypes.c_wchar_p), + ("product", ctypes.c_wchar_p), + ("usage_page", ctypes.c_ushort), + ("usage", ctypes.c_ushort), + ("interface", ctypes.c_int), + ("next", ctypes.POINTER(HidDeviceInfo)), +] + + +hidapi.hid_enumerate.argtypes = [ctypes.c_ushort, ctypes.c_ushort] +hidapi.hid_enumerate.restype = ctypes.POINTER(HidDeviceInfo) +hidapi.hid_free_enumeration.argtypes = [ctypes.POINTER(HidDeviceInfo)] +hidapi.hid_open.argtypes = [ctypes.c_ushort, ctypes.c_ushort, + ctypes.c_wchar_p] +hidapi.hid_open.restype = ctypes.c_void_p +hidapi.hid_read_timeout.argtypes = [ctypes.c_void_p, ctypes.c_char_p, + ctypes.c_size_t, ctypes.c_int] +hidapi.hid_read.argtypes = [ctypes.c_void_p, ctypes.c_char_p, ctypes.c_size_t] +hidapi.hid_write.argtypes = [ctypes.c_void_p, ctypes.c_char_p, ctypes.c_size_t] +hidapi.hid_send_feature_report.argtypes = [ctypes.c_void_p, ctypes.c_char_p, + ctypes.c_size_t] +hidapi.hid_get_feature_report.argtypes = [ctypes.c_void_p, ctypes.c_char_p, + ctypes.c_size_t] +hidapi.hid_error.argtypes = [ctypes.c_void_p] +hidapi.hid_error.restype = ctypes.c_wchar_p + +atexit.register(hidapi.hid_exit) + + +class HidError(Exception): + pass + + +class Lda: + _vendor_id = 0x041f + _product_ids = { + "LDA-102": 0x1207, + "LDA-602": 0x1208, + } + + def __init__(self, serial=None, product="LDA-102"): + self.product = product + if serial is None and product != "sim": + serial = next(self.enumerate(product)) + self._dev = hidapi.hid_open(self._vendor_id, + self._product_ids[product], serial) + assert self._dev + else: + self._attenuation = None + + @classmethod + def enumerate(cls, product): + devs = hidapi.hid_enumerate(cls._vendor_id, + cls._product_ids[product]) + dev = devs + while dev: + yield dev[0].serial + dev = dev[0].next + yield None + hidapi.hid_free_enumeration(devs) + + def _check_error(self, ret): + if ret < 0: + err = hidapi.hid_error(self._dev) + raise HidError("%s: %s" % (ret, err)) + return ret + + def write(self, command, length, data=bytes()): + # 0 is report id/padding + buf = struct.pack("BBB6s", 0, command, length, data) + res = self._check_error(hidapi.hid_write(self._dev, buf, len(buf))) + assert res == len(buf), res + + def set(self, command, data): + assert command & 0x80 + assert data + self.write(command, len(data), data) + + def get(self, command, length, timeout=1000): + # FIXME: this can collide with the status reports that the + # device sends out by itself + assert not command & 0x80 + self.write(command, length) + buf = ctypes.create_string_buffer(8) + res = self._check_error(hidapi.hid_read_timeout(self._dev, + buf, len(buf), timeout)) + assert res == len(buf), res + command, length, data = struct.unpack("BB6s", buf.raw) + data = data[:length] + logger.info("%s %s %r", command, length, data) + return data + + def get_attenuation(self): + if self.product != "sim": + return ord(self.get(0x0d, 1))/4. + else: + return self._attenuation + + def set_attenuation(self, attenuation): + if self.product != "sim": + print("[{}] setting attenuation to {}".format(self.product, + attenuation)) + self.set(0x8d, bytes(chr(int(round(attenuation*4))), 'ascii')) + else: + attenuation = round(attenuation*4)/4. + print("[LDA-sim] setting attenuation to {}".format(attenuation)) + self._attenuation = attenuation + +if __name__ == "__main__": + logging.basicConfig(level=logging.DEBUG) + logger.info(list(Lda.enumerate("LDA-102"))) + l = Lda() + logger.info(l.get_attenuation()) + l.set_attenuation(50) + logger.info(l.get_attenuation()) + l.set_attenuation(60) + logger.info(l.get_attenuation()) diff --git a/artiq/devices/lda/lda_client.py b/artiq/devices/lda/lda_client.py new file mode 100755 index 000000000..f09da6e35 --- /dev/null +++ b/artiq/devices/lda/lda_client.py @@ -0,0 +1,33 @@ +#!/usr/bin/env python3 +from artiq.management.pc_rpc import Client +import argparse + + +def get(remote): + return remote.get_attenuation() + + +def set(remote, attenuation): + remote.set_attenuation(attenuation) + +if __name__ == "__main__": + + parser = argparse.ArgumentParser() + parser.add_argument('-s', '--server', default="::1", + help="The IP address or hostname of the controller") + parser.add_argument('-p', '--port', default=7777, type=int, + help="The TCP port the controller listens to") + parser.add_argument('-a', '--attenuation', type=float, + help="The attenuation value you want to set") + args = parser.parse_args() + + remote = Client(args.server, args.port, "lda") + + try: + if args.attenuation is None: + print("Current attenuation: {}".format(get(remote))) + else: + set(remote, args.attenuation) + except Exception as e: + print("exception: {}".format(e)) + remote.close_rpc() diff --git a/artiq/devices/lda/lda_controller.py b/artiq/devices/lda/lda_controller.py new file mode 100755 index 000000000..74da58808 --- /dev/null +++ b/artiq/devices/lda/lda_controller.py @@ -0,0 +1,19 @@ +#!/usr/bin/env python3 +from artiq.management.pc_rpc import simple_server_loop +from lda import Lda +import argparse + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument('-d', '--device', default="LDA-102", + choices=["LDA-102", "LDA-602", "sim"]) + parser.add_argument('--bind', default="::1", + help="hostname or IP address to bind to") + parser.add_argument('-p', '--port', default=7777, type=int, + help="TCP port to listen to") + parser.add_argument('-s', '--serial', default=None, + help="USB serial number of the device") + args = parser.parse_args() + + simple_server_loop(Lda(args.serial, args.device), "lda", + args.bind, args.port)