move novatech409b out of tree (#887)

This commit is contained in:
Sebastien Bourdeauducq 2019-04-19 21:56:16 +08:00
parent 4c1fb0c2a1
commit 62e9b2d85e
7 changed files with 1 additions and 319 deletions

View File

@ -1,201 +0,0 @@
# Written by Joe Britton, 2015
import math
import logging
import asyncio
import asyncserial
logger = logging.getLogger(__name__)
class UnexpectedResponse(Exception):
pass
class Novatech409B:
"""Driver for Novatech 409B 4-channel DDS.
All output channels are in range [0, 1, 2, 3].
All frequencies are in Hz.
All phases are in turns.
All amplitudes are in volts.
"""
error_codes = {
"?0": "Unrecognized Command",
"?1": "Bad Frequency",
"?2": "Bad AM Command",
"?3": "Input line too long",
"?4": "Bad Phase",
"?5": "Bad Time",
"?6": "Bad Mode",
"?7": "Bad Amp",
"?8": "Bad Constant",
"?f": "Bad Byte"
}
def __init__(self, serial_dev):
if serial_dev is None:
self.simulation = True
else:
self.simulation = False
self.port = asyncserial.AsyncSerial(
serial_dev,
baudrate=19200,
bytesize=8,
parity="N",
stopbits=1,
xonxoff=0)
def close(self):
"""Close the serial port."""
if not self.simulation:
self.port.close()
async def _ser_readline(self):
c = await self.port.read(1)
r = c
while c != b"\n":
c = await self.port.read(1)
r += c
return r
async def _ser_send(self, cmd, get_response=True):
"""Send a string to the serial port."""
# Low-level routine for sending serial commands to device. It sends
# strings and listens for a response terminated by a carriage return.
# example:
# ser_send("F0 1.0") # sets the freq of channel 0 to 1.0 MHz
if self.simulation:
logger.info("simulation _ser_send(\"%s\")", cmd)
else:
logger.debug("_ser_send(\"%s\")", cmd)
self.port.ser.reset_input_buffer()
await self.port.write((cmd + "\r\n").encode())
if get_response:
result = (await self._ser_readline()).rstrip().decode()
logger.debug("got response from device: %s", result)
if result != "OK":
errstr = self.error_codes.get(result, "Unrecognized reply")
s = "Erroneous reply from device: {ec}, {ecs}".format(
ec=result, ecs=errstr)
raise ValueError(s)
else:
pass
async def reset(self):
"""Hardware reset of 409B."""
await self._ser_send("R", get_response=False)
await asyncio.sleep(1)
await self.setup()
async def setup(self):
"""Initial setup of 409B."""
# Setup the Novatech 409B with the following defaults:
# * command echo off ("E d")
# * external clock ("") 10 MHz sinusoid -1 to +7 dBm
await self._ser_send("E d", get_response=False)
await self.set_phase_continuous(True)
await self.set_simultaneous_update(False)
async def save_state_to_eeprom(self):
"""Save current state to EEPROM."""
await self._ser_send("S")
async def set_phase_continuous(self, is_continuous):
"""Toggle phase continuous mode.
Sends the "M n" command. This turns off the automatic
clearing of the phase register. In this mode, the phase
register is left intact when a command is performed.
Use this mode if you want frequency changes to remain
phase synchronous, with no phase discontinuities.
:param is_continuous: True or False
"""
if is_continuous:
await self._ser_send("M n")
else:
await self._ser_send("M a")
async def set_simultaneous_update(self, simultaneous):
"""Set simultaneous update mode.
Sends the "I m" command. In this mode an update
pulse will not be sent to the DDS chip until
an "I p" command is sent. This is useful when it is
important to change all the outputs to new values
simultaneously.
"""
if simultaneous:
await self._ser_send("I m")
else:
await self._ser_send("I a")
async def do_simultaneous_update(self):
"""Apply update in simultaneous update mode."""
await self._ser_send("I p")
async def set_freq(self, ch_no, freq):
"""Set frequency of one channel."""
# Novatech expects MHz
await self._ser_send("F{:d} {:f}".format(ch_no, freq/1e6))
async def set_phase(self, ch_no, phase):
"""Set phase of one channel."""
# phase word is required by device
# N is an integer from 0 to 16383. Phase is set to
# N*360/16384 deg; in ARTIQ represent phase in cycles [0, 1]
phase_word = round(phase*16383)
cmd = "P{:d} {:d}".format(ch_no, phase_word)
await self._ser_send(cmd)
async def set_gain(self, ch_no, volts):
"""Set amplitude of one channel."""
# due to error in Novatech it doesn't generate an error for
# dac_value>1024, so need to trap.
dac_value = int(math.floor(volts/0.51*1024))
if dac_value < 0 or dac_value > 1023:
s = "Amplitude out of range {v}".format(v=volts)
raise ValueError(s)
s = "V{:d} {:d}".format(ch_no, dac_value)
await self._ser_send(s)
async def get_status(self):
if self.simulation:
return ["00989680 2000 01F5 0000 00000000 00000000 000301",
"00989680 2000 01F5 0000 00000000 00000000 000301",
"00989680 2000 01F5 0000 00000000 00000000 000301",
"00989680 2000 01F5 0000 00000000 00000000 000301",
"80 BC0000 0000 0102 21"]
else:
self.port.ser.reset_input_buffer()
result = []
await self.port.write(("QUE" + "\r\n").encode())
for i in range(5):
m = (await self._ser_readline()).rstrip().decode()
result.append(m)
logger.debug("got device status: %s", result)
return result
async def ping(self):
try:
stat = await self.get_status()
except asyncio.CancelledError:
raise
except:
return False
# check that version number matches is "21"
if stat[4][20:] == "21":
logger.debug("ping successful")
return True
else:
return False

View File

@ -1,54 +0,0 @@
#!/usr/bin/env python3
# Written by Joe Britton, 2015
import argparse
import logging
import sys
import os
import asyncio
from artiq.devices.novatech409b.driver import Novatech409B
from artiq.protocols.pc_rpc import simple_server_loop
from artiq.tools import *
logger = logging.getLogger(__name__)
def get_argparser():
parser = argparse.ArgumentParser(
description="ARTIQ controller for the Novatech 409B 4-channel DDS box")
simple_network_args(parser, 3254)
parser.add_argument(
"-d", "--device", default=None,
help="serial port.")
parser.add_argument(
"--simulation", action="store_true",
help="Put the driver in simulation mode, even if --device is used.")
add_common_args(parser)
return parser
def main():
args = get_argparser().parse_args()
init_logger(args)
if os.name == "nt":
asyncio.set_event_loop(asyncio.ProactorEventLoop())
if not args.simulation and args.device is None:
print("You need to specify either --simulation or -d/--device "
"argument. Use --help for more information.")
sys.exit(1)
dev = Novatech409B(args.device if not args.simulation else None)
asyncio.get_event_loop().run_until_complete(dev.setup())
try:
simple_server_loop(
{"novatech409b": dev}, bind_address_from_args(args), args.port)
finally:
dev.close()
if __name__ == "__main__":
main()

View File

@ -10,8 +10,7 @@ class TestFrontends(unittest.TestCase):
# Skip tests for GUI programs on headless CI environments.
commands = {
"aqctl": [
"corelog", "lda", "novatech409b",
"thorlabs_tcube"
"corelog", "lda", "thorlabs_tcube"
],
"artiq": [
"client", "compile", "coreanalyzer", "coremgmt", "ctlmgr",

View File

@ -1,46 +0,0 @@
import sys
import unittest
from artiq.test.hardware_testbench import GenericControllerCase, ControllerCase
class GenericNovatech409BTest:
def test_parameters_readback(self):
# write sample data and read it back
for i in range(4):
self.driver.set_freq(i, 1e6)
self.driver.set_phase(i, 0.5)
self.driver.set_gain(i, 0.25)
result = self.driver.get_status()
# check for expected status message; ignore all but first 23 bytes
# compare with previous result extracted from Novatech
for i in range(4):
r = result[i]
self.assertEqual(r[0:23], "00989680 2000 01F5 0000")
class TestNovatech409B(GenericNovatech409BTest, ControllerCase):
def setUp(self):
ControllerCase.setUp(self)
self.start_controller("novatech409b")
self.driver = self.device_mgr.get("novatech409b")
class TestNovatech409BSim(GenericNovatech409BTest, GenericControllerCase):
def get_device_db(self):
return {
"novatech409b": {
"type": "controller",
"host": "::1",
"port": 3254,
"command": (sys.executable.replace("\\", "\\\\")
+ " -m artiq.frontend.aqctl_novatech409b "
+ "-p {port} --simulation")
}
}
def setUp(self):
GenericControllerCase.setUp(self)
self.start_controller("novatech409b")
self.driver = self.device_mgr.get("novatech409b")

View File

@ -46,21 +46,6 @@ You can choose the LDA model with the ``-P`` parameter. The default is LDA-102.
:ref: artiq.frontend.aqctl_lda.get_argparser
:prog: aqctl_lda
Novatech 409B
-------------
Driver
++++++
.. automodule:: artiq.devices.novatech409b.driver
:members:
Controller
++++++++++
.. argparse::
:ref: artiq.frontend.aqctl_novatech409b.get_argparser
:prog: aqctl_novatech409b
Thorlabs T-Cube
---------------

View File

@ -38,7 +38,6 @@ console_scripts = [
"aqctl_corelog = artiq.frontend.aqctl_corelog:main",
"aqctl_lda = artiq.frontend.aqctl_lda:main",
"aqctl_novatech409b = artiq.frontend.aqctl_novatech409b:main",
"aqctl_thorlabs_tcube = artiq.frontend.aqctl_thorlabs_tcube:main",
]