1
0
forked from M-Labs/artiq

move lda out of tree (#887)

This commit is contained in:
Sebastien Bourdeauducq 2019-04-19 23:43:14 +08:00
parent d4781e9a8a
commit eaec519ac8
8 changed files with 7 additions and 383 deletions

View File

@ -1,229 +0,0 @@
import logging
import ctypes
import struct
from artiq.language.units import dB
logger = logging.getLogger("lda")
class HidError(Exception):
pass
class Ldasim:
"""Lab Brick Digital Attenuator simulation driver."""
def __init__(self):
self._attenuation = None
self._att_max = 63*dB
self._att_step_size = 0.25*dB
def get_att_max(self):
return self._att_max
def get_att_step_size(self):
return self._att_step_size
def close(self):
"""Close the device."""
pass
def get_attenuation(self):
"""Reads last attenuation value set to the simulated device.
:return: Returns the attenuation value in dB, or None if it was
never set.
:rtype: float
"""
return self._attenuation
def set_attenuation(self, attenuation):
"""Stores the new attenuation value.
:param attenuation: The attenuation value in dB.
"""
step = self.get_att_step_size()
att = round(attenuation/step)*step
if att > self.get_att_max():
raise ValueError("Cannot set attenuation {} > {}"
.format(att, self.get_att_max()))
elif att < 0*dB:
raise ValueError("Cannot set attenuation {} < 0".format(att))
else:
att = round(att*4)/4. * dB
self._attenuation = att
def ping(self):
return True
class Lda:
"""Lab Brick Digital Attenuator driver.
This driver depends on the hidapi library.
On Linux you should install hidapi-libusb shared library in a directory
listed in your LD_LIBRARY_PATH or in the conventional places (/usr/lib,
/lib, /usr/local/lib). This can be done either from hidapi sources
or by installing the libhidapi-libusb0 binary package on Debian-like OS.
On Windows you should put hidapi.dll shared library in the
artiq\\\\devices\\\\lda folder.
"""
_vendor_id = 0x041f
_product_ids = {
"LDA-102": 0x1207,
"LDA-602": 0x1208,
"LDA-302P-1": 0x120E,
}
_att_max = {
"LDA-102": 63*dB,
"LDA-602": 63*dB,
"LDA-302P-1": 63*dB
}
_att_step_size = {
"LDA-102": 0.5*dB,
"LDA-602": 0.5*dB,
"LDA-302P-1": 1.0*dB
}
def __init__(self, serial=None, product="LDA-102"):
"""
:param serial: The serial number.
:param product: The product identifier string: LDA-102, LDA-602.
"""
from artiq.devices.lda.hidapi import hidapi
self.hidapi = hidapi
self.product = product
self.serial = serial
if self.serial is None:
self.serial = next(self.enumerate(self.product))
self._dev = self.hidapi.hid_open(self._vendor_id,
self._product_ids[self.product],
self.serial)
if not self._dev:
raise IOError("Device not found")
def close(self):
"""Close the device."""
self.hidapi.hid_close(self._dev)
def get_att_step_size(self):
return self._att_step_size[self.product]
def get_att_max(self):
return self._att_max[self.product]
@classmethod
def enumerate(cls, product):
from artiq.devices.lda.hidapi import hidapi
devs = hidapi.hid_enumerate(cls._vendor_id,
cls._product_ids[product])
try:
dev = devs
while dev:
yield dev[0].serial
dev = dev[0].next
finally:
hidapi.hid_free_enumeration(devs)
def _check_error(self, ret):
if ret < 0:
err = self.hidapi.hid_error(self._dev)
raise HidError("{}: {}".format(ret, err))
return ret
def write(self, command, length, data=bytes()):
"""Writes a command to the Lab Brick device.
:param command: command ID.
:param length: number of meaningful bytes in the data array.
:param data: a byte array containing the payload of the command.
"""
# 0 is report id/padding
buf = struct.pack("BBB6s", 0, command, length, data)
res = self._check_error(self.hidapi.hid_write(self._dev, buf,
len(buf)))
if res != len(buf):
raise IOError
def set(self, command, data):
"""Sends a SET command to the Lab Brick device.
:param command: command ID, must have most significant bit set.
:param data: payload of the command.
"""
if not data:
raise ValueError("Data is empty")
if not (command & 0x80):
raise ValueError("Set commands must have most significant bit set")
self.write(command, len(data), data)
def get(self, command, length, timeout=1000):
"""Sends a GET command to read back some value of the Lab Brick device.
:param command: Command ID, most significant bit must be cleared.
:param length: Length of the command, "count" in the datasheet.
:param timeout: Timeout of the HID read in ms.
:return: Returns the value read from the device.
:rtype: bytes
"""
if command & 0x80:
raise ValueError("Get commands must not have most significant bit"
" set")
status = None
self.write(command, length)
buf = ctypes.create_string_buffer(8)
while status != command:
res = self._check_error(self.hidapi.hid_read_timeout(self._dev,
buf, len(buf), timeout))
if res != len(buf):
raise IOError
status, length, data = struct.unpack("BB6s", buf.raw)
data = data[:length]
logger.info("%s %s %r", command, length, data)
return data
def get_attenuation(self):
"""Reads attenuation value from Lab Brick device.
:return: Returns the attenuation value in dB.
:rtype: float
"""
return (ord(self.get(0x0d, 1))/4.) * dB
def set_attenuation(self, attenuation):
"""Sets attenuation value of the Lab Brick device.
:param attenuation: Attenuation value in dB.
"""
step = self.get_att_step_size()
att = round(attenuation/step)*step
if att > self.get_att_max():
raise ValueError("Cannot set attenuation {} > {}"
.format(att, self.get_att_max()))
elif att < 0*dB:
raise ValueError("Cannot set attenuation {} < 0".format(att))
else:
self.set(0x8d, bytes([int(round(att*4))]))
def ping(self):
try:
self.get_attenuation()
except:
return False
return True

View File

@ -1,58 +0,0 @@
import os
import atexit
import ctypes
import ctypes.util
if "." not in os.environ["PATH"].split(";"):
os.environ["PATH"] += ";."
dir = os.path.split(__file__)[0]
if dir not in os.environ["PATH"].split(";"):
os.environ["PATH"] += ";{}".format(dir)
for n in "hidapi-libusb hidapi-hidraw hidapi".split():
path = ctypes.util.find_library(n)
if path:
break
if not path:
raise ImportError("no hidapi library found")
hidapi = ctypes.CDLL(path)
class HidDeviceInfo(ctypes.Structure):
pass
HidDeviceInfo._fields_ = [
("path", ctypes.c_char_p),
("vendor_id", ctypes.c_ushort),
("product_id", ctypes.c_ushort),
("serial", ctypes.c_wchar_p),
("release", ctypes.c_ushort),
("manufacturer", ctypes.c_wchar_p),
("product", ctypes.c_wchar_p),
("usage_page", ctypes.c_ushort),
("usage", ctypes.c_ushort),
("interface", ctypes.c_int),
("next", ctypes.POINTER(HidDeviceInfo)),
]
hidapi.hid_enumerate.argtypes = [ctypes.c_ushort, ctypes.c_ushort]
hidapi.hid_enumerate.restype = ctypes.POINTER(HidDeviceInfo)
hidapi.hid_free_enumeration.argtypes = [ctypes.POINTER(HidDeviceInfo)]
hidapi.hid_open.argtypes = [ctypes.c_ushort, ctypes.c_ushort,
ctypes.c_wchar_p]
hidapi.hid_open.restype = ctypes.c_void_p
hidapi.hid_close.argtypes = [ctypes.c_void_p]
hidapi.hid_read_timeout.argtypes = [ctypes.c_void_p, ctypes.c_char_p,
ctypes.c_size_t, ctypes.c_int]
hidapi.hid_read.argtypes = [ctypes.c_void_p, ctypes.c_char_p, ctypes.c_size_t]
hidapi.hid_write.argtypes = [ctypes.c_void_p, ctypes.c_char_p, ctypes.c_size_t]
hidapi.hid_send_feature_report.argtypes = [ctypes.c_void_p, ctypes.c_char_p,
ctypes.c_size_t]
hidapi.hid_get_feature_report.argtypes = [ctypes.c_void_p, ctypes.c_char_p,
ctypes.c_size_t]
hidapi.hid_error.argtypes = [ctypes.c_void_p]
hidapi.hid_error.restype = ctypes.c_wchar_p
atexit.register(hidapi.hid_exit)

View File

@ -1,44 +0,0 @@
#!/usr/bin/env python3
import argparse
from artiq.devices.lda.driver import Lda, Ldasim
from artiq.protocols.pc_rpc import simple_server_loop
from artiq.tools import *
def get_argparser():
parser = argparse.ArgumentParser(
description="ARTIQ controller for the Lab Brick Digital Attenuator")
parser.add_argument("-P", "--product", default="LDA-102",
help="product type (default: %(default)s)",
choices=["LDA-102", "LDA-602"])
simple_network_args(parser, 3253)
parser.add_argument("-d", "--device", default=None,
help="USB serial number of the device. "
"The serial number is written on a sticker under "
"the device, you should write for example "
"-d \"SN:03461\". You must prepend enough 0s for "
"it to be 5 digits. If omitted, the first "
"available device will be used.")
parser.add_argument("--simulation", action="store_true",
help="Put the driver in simulation mode.")
add_common_args(parser)
return parser
def main():
args = get_argparser().parse_args()
init_logger(args)
if args.simulation:
lda = Ldasim()
else:
lda = Lda(args.device, args.product)
try:
simple_server_loop({"lda": lda},
bind_address_from_args(args), args.port)
finally:
lda.close()
if __name__ == "__main__":
main()

View File

@ -60,13 +60,13 @@ class ControllerCase(unittest.TestCase):
entry = { entry = {
"type": "controller", "type": "controller",
"host": "::1", "host": "::1",
"port": 3253, "port": 1068,
"command": (sys.executable.replace("\\", "\\\\") "command": (sys.executable.replace("\\", "\\\\")
+ " -m artiq.frontend.aqctl_lda " + " -m artiq.frontend.aqctl_corelog "
+ "-p {port} --simulation") + "-p {port} --simulation foo")
} }
async def test(): async def test():
await self.start("lda_sim", entry) await self.start("corelog", entry)
remote = await self.get_client(entry["host"], entry["port"]) remote = await self.get_client(entry["host"], entry["port"])
await remote.ping() await remote.ping()
@ -76,8 +76,8 @@ class ControllerCase(unittest.TestCase):
entry = { entry = {
"type": "controller", "type": "controller",
"host": "::1", "host": "::1",
"port": 3253 "port": 1068
} }
with expect_no_log_messages(logging.ERROR): with expect_no_log_messages(logging.ERROR):
self.controllers["lda_sim"] = entry self.controllers["corelog"] = entry
self.assertTrue(self.controllers.queue.empty()) self.assertTrue(self.controllers.queue.empty())

View File

@ -10,7 +10,7 @@ class TestFrontends(unittest.TestCase):
# Skip tests for GUI programs on headless CI environments. # Skip tests for GUI programs on headless CI environments.
commands = { commands = {
"aqctl": [ "aqctl": [
"corelog", "lda", "thorlabs_tcube" "corelog", "thorlabs_tcube"
], ],
"artiq": [ "artiq": [
"client", "compile", "coreanalyzer", "coremgmt", "ctlmgr", "client", "compile", "coreanalyzer", "coremgmt", "ctlmgr",

View File

@ -1,44 +0,0 @@
import unittest
import sys
from artiq.devices.lda.driver import Ldasim
from artiq.language.units import dB
from artiq.test.hardware_testbench import GenericControllerCase, ControllerCase
class GenericLdaTest:
def test_attenuation(self):
step = self.cont.get_att_step_size()
attmax = self.cont.get_att_max()
test_vector = [i*step*dB for i in range(0, int(attmax*int(1/step)+1))]
for i in test_vector:
with self.subTest(i=i):
self.cont.set_attenuation(i)
j = self.cont.get_attenuation()
self.assertEqual(i, j)
class TestLda(ControllerCase, GenericLdaTest):
def setUp(self):
ControllerCase.setUp(self)
self.start_controller("lda")
self.cont = self.device_mgr.get("lda")
class TestLdaSim(GenericControllerCase, GenericLdaTest):
def get_device_db(self):
return {
"lda": {
"type": "controller",
"host": "::1",
"port": 3253,
"command": (sys.executable.replace("\\", "\\\\")
+ " -m artiq.frontend.aqctl_lda "
+ "-p {port} --simulation")
}
}
def setUp(self):
GenericControllerCase.setUp(self)
self.start_controller("lda")
self.cont = self.device_mgr.get("lda")

View File

@ -37,7 +37,6 @@ console_scripts = [
"artiq_flash = artiq.frontend.artiq_flash:main", "artiq_flash = artiq.frontend.artiq_flash:main",
"aqctl_corelog = artiq.frontend.aqctl_corelog:main", "aqctl_corelog = artiq.frontend.aqctl_corelog:main",
"aqctl_lda = artiq.frontend.aqctl_lda:main",
"aqctl_thorlabs_tcube = artiq.frontend.aqctl_thorlabs_tcube:main", "aqctl_thorlabs_tcube = artiq.frontend.aqctl_thorlabs_tcube:main",
] ]